1. 程式人生 > >RabbitMQ學習第三記:發布/訂閱模式(Publish/Subscribe)

RabbitMQ學習第三記:發布/訂閱模式(Publish/Subscribe)

font image 直接 email err spl 回調方法 byte []

  工作隊列模式是直接在生產者與消費者裏聲明好一個隊列,這種情況下消息只會對應同類型的消費者。

  舉個用戶註冊的列子:用戶在註冊完後一般都會發送消息通知用戶註冊成功(失敗)。如果在一個系統中,用戶註冊信息有郵箱、手機號,那麽在註冊完後會向郵箱和手機號都發送註冊完成信息。利用MQ實現業務異步處理,如果是用工作隊列的話,就會聲明一個註冊信息隊列。註冊完成之後生產者會向隊列提交一條註冊數據,消費者取出數據同時向郵箱以及手機號發送兩條消息。但是實際上郵箱和手機號信息發送實際上是不同的業務邏輯,不應該放在一塊處理。這個時候就可以利用發布/訂閱模式將消息發送到轉換機(EXCHANGE),聲明兩個不同的隊列(郵箱、手機),並綁定到交換機。這樣生產者只需要發布一次消息,兩個隊列都會接收到消息發給對應的消費者。

1、什麽是發布/訂閱模式(Publish/Subscribe)

  簡單解釋就是,可以將消息發送給不同類型的消費者。做到發布一次,消費多個。下圖取自於官方網站(RabbitMQ)的發布/訂閱模式的圖例

技術分享圖片

P:消息的生產者

X:交換機

紅色:隊列

C1,C2:消息消費者

下面是利用用戶註冊解釋的該模式。(先運行兩個消費者,在運行生產者。如果沒有提前將隊列綁定到交換機,那麽直接運行生產者的話,消息是不會發到任何隊列裏的

2、生產者(Send)代碼

public class Send
{
    private final static String EXCHANGE_NAME = "test_exchange_fanout
"; public static void main(String[] args) { try { //獲取連接 Connection connection = ConnectionUtil.getConnection(); //從連接中獲取一個通道 Channel channel = connection.createChannel(); //聲明交換機(分發:發布/訂閱模式) channel.exchangeDeclare(EXCHANGE_NAME, "
fanout"); //發送消息 for (int i = 0; i < 10; i++) { String message = "this is user registe message" + i; System.out.println("[send]:" + message); //發送消息 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8")); Thread.sleep(5 * i); } channel.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } } }

運行結果:

[send]:this is user registe message0
[send]:this is user registe message1
[send]:this is user registe message2
[send]:this is user registe message3
[send]:this is user registe message4
[send]:this is user registe message5
[send]:this is user registe message6
[send]:this is user registe message7
[send]:this is user registe message8
[send]:this is user registe message9

3、消費者1(ReceiveEmail)

public class ReceiveEmail
{
    //交換機名稱
    private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
    //隊列名稱
    private static final String QUEUE_NAME    = "test_queue_email";
    
    public static void main(String[] args)
    {
        try
        {
            
            //獲取連接
            Connection connection = ConnectionUtil.getConnection();
            //從連接中獲取一個通道
            final Channel channel = connection.createChannel();
            //聲明交換機(分發:發布/訂閱模式)
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //將隊列綁定到交換機
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            //保證一次只分發一個  
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);
            //定義消費者
            DefaultConsumer consumer = new DefaultConsumer(channel)
            {
                //當消息到達時執行回調方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException
                {
                    String message = new String(body, "utf-8");
                    System.out.println("[email] Receive message:" + message);
                    try
                    {
                        //消費者休息2s處理業務
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    finally
                    {
                        System.out.println("[1] done");
                        //手動應答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //設置手動應答
            boolean autoAck = false;
            //監聽隊列
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
    
}

運行結果:
[email] Receive message:this is user registe message0
[1] done
[email] Receive message:this is user registe message1
[1] done
[email] Receive message:this is user registe message2
[1] done
[email] Receive message:this is user registe message3
[1] done
[email] Receive message:this is user registe message4
[1] done
[email] Receive message:this is user registe message5
[1] done
[email] Receive message:this is user registe message6
[1] done
[email] Receive message:this is user registe message7
[1] done
[email] Receive message:this is user registe message8
[1] done
[email] Receive message:this is user registe message9
[1] done

4、消費者2(ReceivePhone)

public class ReceivePhone
{
    //交換機名稱
    private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
    //隊列名稱
    private static final String QUEUE_NAME    = "test_queue_phone";
    
    public static void main(String[] args)
    {
        try
        {
            
            //獲取連接
            Connection connection = ConnectionUtil.getConnection();
            //從連接中獲取一個通道
            final Channel channel = connection.createChannel();
            //聲明交換機(分發:發布/訂閱模式)
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //將隊列綁定到交換機
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            //保證一次只分發一個  
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);
            //定義消費者
            DefaultConsumer consumer = new DefaultConsumer(channel)
            {
                //當消息到達時執行回調方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException
                {
                    String message = new String(body, "utf-8");
                    System.out.println("[phone] Receive message:" + message);
                    try
                    {
                        //消費者休息2s處理業務
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    finally
                    {
                        System.out.println("[2] done");
                        //手動應答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //設置手動應答
            boolean autoAck = false;
            //監聽隊列
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
    
}

運行結果:
[phone] Receive message:this is user registe message0
[2] done
[phone] Receive message:this is user registe message1
[2] done
[phone] Receive message:this is user registe message2
[2] done
[phone] Receive message:this is user registe message3
[2] done
[phone] Receive message:this is user registe message4
[2] done
[phone] Receive message:this is user registe message5
[2] done
[phone] Receive message:this is user registe message6
[2] done
[phone] Receive message:this is user registe message7
[2] done
[phone] Receive message:this is user registe message8
[2] done
[phone] Receive message:this is user registe message9
[2] done

總結:

  1、該模式下生產者並不是直接操作隊列,而是將數據發送給交換機,由交換機將數據發送給與之綁定的隊列。從運行結果中可以看到,兩中類型的消費者(Email,Phone)都收到相同數量的消息。

  2、該模式必須聲明交換機,並且設置模式:channel.exchangeDeclare(EXCHANGE_NAME, "fanout")  fanout指分發模式(將每一條消息都發送到與交換機綁定的隊列。

  3、 隊列必須綁定交換機:channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

註意:本文僅代表個人理解和看法喲!和本人所在公司和團體無任何關系!

RabbitMQ學習第三記:發布/訂閱模式(Publish/Subscribe)