RabbitMQ的幾種模型和spring整合
阿新 • • 發佈:2021-01-28
如何從零搭建RabbitMQ,詳見此處
依賴:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version >
</properties>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</ groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
連線工具類:
public class ConnectionUtil {
/**
* 建立與RabbitMQ的連線
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定義連線工廠
ConnectionFactory factory = new ConnectionFactory();
//設定服務地址
factory.setHost("192.168.233.128");
//埠
factory.setPort(5672);
//設定賬號資訊,使用者名稱、密碼、vhost
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("111111");
// 通過工程獲取連線
Connection connection = factory.newConnection();
return connection;
}
}
1.work模型
最簡單的模型我這就不列舉了,work解決的是佇列和消費者之間的堆擠,和下面的幾種模型不衝突
demo程式碼:
生產者
// 生產者
public class Send {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 迴圈釋出任務
for (int i = 0; i < 50; i++) {
// 訊息內容
String message = "task .. " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 2);
}
// 關閉通道和連線
channel.close();
connection.close();
}
}
消費者1
// 消費者1
public class Recv {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
final Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 設定每個消費者同時只能處理一條訊息,能者多勞
channel.basicQos(1);
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [消費者1] received : " + msg + "!");
try {
// 模擬完成任務的耗時:1000ms
Thread.sleep(1000);
} catch (InterruptedException e) {
}
// 手動ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 監聽佇列。
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
消費者2
//消費者2
public class Recv2 {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
final Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 設定每個消費者同時只能處理一條訊息(能者多勞模式)
channel.basicQos(1);
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [消費者2] received : " + msg + "!");
// 手動ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 監聽佇列。
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
2.fanout模型
一個消費者將訊息首先發送到交換器,交換器繫結到多個佇列,然後被監聽該佇列的消費者所接收並消費。
生產者
public class Send {
private final static String EXCHANGE_NAME = "fanout_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告exchange,指定型別為fanout
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 訊息內容
String message = "Hello everyone";
// 釋出訊息到Exchange
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [生產者] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消費者1
//消費者1
public class Recv {
private final static String QUEUE_NAME = "fanout_exchange_queue_1";
private final static String EXCHANGE_NAME = "fanout_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 繫結佇列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [消費者1] received : " + msg + "!");
}
};
// 監聽佇列,自動返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消費者2
// 消費者2
public class Recv2 {
private final static String QUEUE_NAME = "fanout_exchange_queue_2";
private final static String EXCHANGE_NAME = "fanout_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 繫結佇列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [消費者2] received : " + msg + "!");
}
};
// 監聽佇列,手動返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
3.direct模型
生成者生產訊息的時候帶了一個routingKey,佇列繫結到交換機的時候也要設定一個routingKey,如果這2個routingKey一致就可以把訊息推入佇列
生產者
public class Send {
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告exchange,指定型別為direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);
// 訊息內容
String message = "商品新增了, id = 1001";
// 傳送訊息,並且指定routing key 為:insert ,代表新增商品
channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
System.out.println(" [商品服務:] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消費者1
public class Recv {
private final static String QUEUE_NAME = "direct_exchange_queue_1";
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 繫結佇列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete訊息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [消費者1] received : " + msg + "!");
}
};
// 監聽佇列,自動ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消費者2
public class Recv2 {
private final static String QUEUE_NAME = "direct_exchange_queue_2";
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 繫結佇列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [消費者2] received : " + msg + "!");
}
};
// 監聽佇列,自動ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
生產者的routingKey是insert,消費者1沒有這個routingKey,無法接收,消費者2可以
4. topic模型
topic模型其實就是direct模型的升級版,direct模型的routingKey是要完全一致的,topic模型可以用類似正則的方式匹配,item.*可以代表item.delete,item.insert,但不能代表item.group.insert;item.#則可以代表item.insert或者item.group.insert
生產者
public class Send {
private final static String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告exchange,指定型別為topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);
// 訊息內容
String message = "新增商品 : id = 1001";
// 傳送訊息,並且指定routing key 為:insert ,代表新增商品 MessageProperties.PERSISTENT_TEXT_PLAIN持久化
channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [商品服務:] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消費者1
public class Recv {
private final static String QUEUE_NAME = "topic_exchange_queue_1";
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 繫結佇列到交換機,同時指定需要訂閱的routing key。需要 update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [消費者1] received : " + msg + "!");
}
};
// 監聽佇列,自動ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消費者2
public class Recv2 {
private final static String QUEUE_NAME = "topic_exchange_queue_22";
private final static String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列(durable:訊息持久化)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 繫結佇列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [消費者2] received : " + msg + "!");
}
};
// 監聽佇列,自動ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
5.spring整合RabbitMQ
配置檔案
spring:
rabbitmq:
host: 192.168.233.128
username: admin
password: 111111
virtual-host: /
生產者:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqDemo {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testSend() throws InterruptedException {
String msg = "100,商品更新成功";
this.amqpTemplate.convertAndSend("tntab_ex","item.update", msg);
// 等待10秒後再結束
Thread.sleep(10000);
}
}
消費者
@Component
public class Listener {
// 建立交換機佇列然後繫結
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "tntab_queue", durable = "true"),
exchange = @Exchange(
value = "tntab_ex",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"#.#"}))
// @RabbitListener(queues = "topic_exchange_queue_2")
public void listen(String msg){
System.out.println("接收到訊息:" + msg);
}
}