1. 程式人生 > 其它 >RabbitMQ系列-- 廣播

RabbitMQ系列-- 廣播

技術標籤:中介軟體# RabbitMQ

在上一篇部落格中,我們實現了工作佇列,並且我們的工作佇列中的一個任務只會發給一個工作者,除非某個工作者未完成任務意外被殺死,會轉發給另外的工作者。 在這篇部落格中,我們將實現將一個訊息發給多個消費者,這種模式稱之為廣播。

本質上來說,就是釋出的訊息會轉發給所有的接收者。

交換機(Exchanges)

前面的部落格中我們都是通過生產者傳送訊息給佇列,接收者從佇列中接收訊息。 接下來我們將引入Exchanges。

生產者只能將訊息傳送給Exchanges。 Exchanges一邊從生產者接收訊息,另一邊將訊息推送到佇列中。我們可以通過定義轉發器的型別進行定義它處理訊息的方式。 可用的交換機的型別如下:

  1. Direct
  2. Topic
  3. headers
  4. fanout

目前我們只關注fanout。 fanout型別交換機特別簡單,把所有它接收到的訊息,廣播到它所繫結的佇列中。

在這裡插入圖片描述
在廣播模式下,訊息傳送流程是這樣的:

  • 可以有多個消費者
  • 每個消費者有自己的queue(佇列)
  • 每個佇列都要繫結到Exchange(交換機)
  • 生產者傳送的訊息,只能傳送到交換機,交換機來決定要傳送給哪個佇列,生產者無法決定。
  • 交換機把訊息傳送給繫結過的所有佇列
  • 佇列的消費者都能拿到訊息。實現一條訊息被多個消費者消費。

程式碼實現

1.生產者:

public class Provider {
    public static void main
(String[] args) throws IOException { //獲取連線物件 Connection connection = RabbitMQUtils.getConnection(); //獲取通道 Channel channel = connection.createChannel(); //將通道宣告指定交換機 //引數1:交換機名稱 引數2:交換機的型別 fanout 廣播型別 //沒有交換機會建立一共名為logs的交換機 channel.exchangeDeclare
("logs","fanout"); //傳送訊息 channel.basicPublish("logs","",null,"fanout type message".getBytes()); //關閉連線和通道 RabbitMQUtils.closeChannelAndConnection(channel,connection); } }

2.消費者1

public class Customer1 {
    public static void main(String[] args) throws IOException {
        //獲取連線物件
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道繫結交換機
        channel.exchangeDeclare("logs","fanout");

        //建立一個臨時的、唯一的佇列
        //返回的是 臨時佇列名
        String queueName = channel.queueDeclare().getQueue();

        //繫結交換機和佇列
        //引數1: 佇列名稱  引數2:交換機名稱  引數3:路由名稱
        channel.queueBind(queueName,"logs","");

        //消費訊息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1:"+new String(body));
            }
        });
    }
}

3.消費者2

public class Customer2 {
    public static void main(String[] args) throws IOException {
        //獲取連線物件
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道繫結交換機
        channel.exchangeDeclare("logs","fanout");

        //建立一個臨時的、唯一的佇列
        //返回的是 臨時佇列名
        String queueName = channel.queueDeclare().getQueue();

        //繫結交換機和佇列
        //引數1: 佇列名稱  引數2:交換機名稱  引數3:路由名稱
        channel.queueBind(queueName,"logs","");

        //消費訊息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2:"+new String(body));
            }
        });
    }
}

4.消費者3

public class Customer3 {
    public static void main(String[] args) throws IOException {
        //獲取連線物件
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道繫結交換機
        channel.exchangeDeclare("logs","fanout");

        //建立一個臨時的、唯一的佇列
        //返回的是 臨時佇列名
        String queueName = channel.queueDeclare().getQueue();

        //繫結交換機和佇列
        //引數1: 佇列名稱  引數2:交換機名稱  引數3:路由名稱
        channel.queueBind(queueName,"logs","");

        //消費訊息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者3:"+new String(body));
            }
        });
    }
}

先執行3個消費者,在執行生產者,可以傳送3個消費者都接收到了生產者傳送的訊息。
在這裡插入圖片描述
在這裡插入圖片描述
在這裡插入圖片描述