RabbitMQ系列-- 廣播
阿新 • • 發佈:2020-12-21
技術標籤:中介軟體# RabbitMQ
在上一篇部落格中,我們實現了工作佇列,並且我們的工作佇列中的一個任務只會發給一個工作者,除非某個工作者未完成任務意外被殺死,會轉發給另外的工作者。 在這篇部落格中,我們將實現將一個訊息發給多個消費者,這種模式稱之為廣播。
本質上來說,就是釋出的訊息會轉發給所有的接收者。
交換機(Exchanges)
前面的部落格中我們都是通過生產者傳送訊息給佇列,接收者從佇列中接收訊息。 接下來我們將引入Exchanges。
生產者只能將訊息傳送給Exchanges。 Exchanges一邊從生產者接收訊息,另一邊將訊息推送到佇列中。我們可以通過定義轉發器的型別進行定義它處理訊息的方式。 可用的交換機的型別如下:
- Direct
- Topic
- headers
- fanout
目前我們只關注fanout。 fanout型別交換機特別簡單,把所有它接收到的訊息,廣播到它所繫結的佇列中。
在廣播模式下,訊息傳送流程是這樣的:
- 可以有多個消費者
- 每個消費者有自己的queue(佇列)
- 每個佇列都要繫結到Exchange(交換機)
- 生產者傳送的訊息,只能傳送到交換機,交換機來決定要傳送給哪個佇列,生產者無法決定。
- 交換機把訊息傳送給繫結過的所有佇列
- 佇列的消費者都能拿到訊息。實現一條訊息被多個消費者消費。
程式碼實現
1.生產者:
public class Provider {
public static void main (String[] args) throws IOException {
//獲取連線物件
Connection connection = RabbitMQUtils.getConnection();
//獲取通道
Channel channel = connection.createChannel();
//將通道宣告指定交換機
//引數1:交換機名稱 引數2:交換機的型別 fanout 廣播型別
//沒有交換機會建立一共名為logs的交換機
channel.exchangeDeclare ("logs","fanout");
//傳送訊息
channel.basicPublish("logs","",null,"fanout type message".getBytes());
//關閉連線和通道
RabbitMQUtils.closeChannelAndConnection(channel,connection);
}
}
2.消費者1
public class Customer1 {
public static void main(String[] args) throws IOException {
//獲取連線物件
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道繫結交換機
channel.exchangeDeclare("logs","fanout");
//建立一個臨時的、唯一的佇列
//返回的是 臨時佇列名
String queueName = channel.queueDeclare().getQueue();
//繫結交換機和佇列
//引數1: 佇列名稱 引數2:交換機名稱 引數3:路由名稱
channel.queueBind(queueName,"logs","");
//消費訊息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1:"+new String(body));
}
});
}
}
3.消費者2
public class Customer2 {
public static void main(String[] args) throws IOException {
//獲取連線物件
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道繫結交換機
channel.exchangeDeclare("logs","fanout");
//建立一個臨時的、唯一的佇列
//返回的是 臨時佇列名
String queueName = channel.queueDeclare().getQueue();
//繫結交換機和佇列
//引數1: 佇列名稱 引數2:交換機名稱 引數3:路由名稱
channel.queueBind(queueName,"logs","");
//消費訊息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2:"+new String(body));
}
});
}
}
4.消費者3
public class Customer3 {
public static void main(String[] args) throws IOException {
//獲取連線物件
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道繫結交換機
channel.exchangeDeclare("logs","fanout");
//建立一個臨時的、唯一的佇列
//返回的是 臨時佇列名
String queueName = channel.queueDeclare().getQueue();
//繫結交換機和佇列
//引數1: 佇列名稱 引數2:交換機名稱 引數3:路由名稱
channel.queueBind(queueName,"logs","");
//消費訊息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者3:"+new String(body));
}
});
}
}
先執行3個消費者,在執行生產者,可以傳送3個消費者都接收到了生產者傳送的訊息。