RabbitMQ學習第三記:發布/訂閱模式(Publish/Subscribe)
工作隊列模式是直接在生產者與消費者裏聲明好一個隊列,這種情況下消息只會對應同類型的消費者。
舉個用戶註冊的列子:用戶在註冊完後一般都會發送消息通知用戶註冊成功(失敗)。如果在一個系統中,用戶註冊信息有郵箱、手機號,那麽在註冊完後會向郵箱和手機號都發送註冊完成信息。利用MQ實現業務異步處理,如果是用工作隊列的話,就會聲明一個註冊信息隊列。註冊完成之後生產者會向隊列提交一條註冊數據,消費者取出數據同時向郵箱以及手機號發送兩條消息。但是實際上郵箱和手機號信息發送實際上是不同的業務邏輯,不應該放在一塊處理。這個時候就可以利用發布/訂閱模式將消息發送到轉換機(EXCHANGE),聲明兩個不同的隊列(郵箱、手機),並綁定到交換機。這樣生產者只需要發布一次消息,兩個隊列都會接收到消息發給對應的消費者。
1、什麽是發布/訂閱模式(Publish/Subscribe)
簡單解釋就是,可以將消息發送給不同類型的消費者。做到發布一次,消費多個。下圖取自於官方網站(RabbitMQ)的發布/訂閱模式的圖例
P:消息的生產者
X:交換機
紅色:隊列
C1,C2:消息消費者
下面是利用用戶註冊解釋的該模式。(先運行兩個消費者,在運行生產者。如果沒有提前將隊列綁定到交換機,那麽直接運行生產者的話,消息是不會發到任何隊列裏的)
2、生產者(Send)代碼
public class Send { private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) { try { //獲取連接 Connection connection = ConnectionUtil.getConnection(); //從連接中獲取一個通道 Channel channel = connection.createChannel(); //聲明交換機(分發:發布/訂閱模式) channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //發送消息 for (int i = 0; i < 10; i++) { String message = "this is user registe message" + i; System.out.println("[send]:" + message); //發送消息 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8")); Thread.sleep(5 * i); } channel.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } } }
運行結果:
[send]:this is user registe message0
[send]:this is user registe message1
[send]:this is user registe message2
[send]:this is user registe message3
[send]:this is user registe message4
[send]:this is user registe message5
[send]:this is user registe message6
[send]:this is user registe message7
[send]:this is user registe message8
[send]:this is user registe message9
3、消費者1(ReceiveEmail)
public class ReceiveEmail { //交換機名稱 private final static String EXCHANGE_NAME = "test_exchange_fanout"; //隊列名稱 private static final String QUEUE_NAME = "test_queue_email"; public static void main(String[] args) { try { //獲取連接 Connection connection = ConnectionUtil.getConnection(); //從連接中獲取一個通道 final Channel channel = connection.createChannel(); //聲明交換機(分發:發布/訂閱模式) channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //將隊列綁定到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); //保證一次只分發一個 int prefetchCount = 1; channel.basicQos(prefetchCount); //定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { //當消息到達時執行回調方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "utf-8"); System.out.println("[email] Receive message:" + message); try { //消費者休息2s處理業務 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done"); //手動應答 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //設置手動應答 boolean autoAck = false; //監聽隊列 channel.basicConsume(QUEUE_NAME, autoAck, consumer); } catch (IOException e) { e.printStackTrace(); } } }
運行結果:
[email] Receive message:this is user registe message0
[1] done
[email] Receive message:this is user registe message1
[1] done
[email] Receive message:this is user registe message2
[1] done
[email] Receive message:this is user registe message3
[1] done
[email] Receive message:this is user registe message4
[1] done
[email] Receive message:this is user registe message5
[1] done
[email] Receive message:this is user registe message6
[1] done
[email] Receive message:this is user registe message7
[1] done
[email] Receive message:this is user registe message8
[1] done
[email] Receive message:this is user registe message9
[1] done
4、消費者2(ReceivePhone)
public class ReceivePhone { //交換機名稱 private final static String EXCHANGE_NAME = "test_exchange_fanout"; //隊列名稱 private static final String QUEUE_NAME = "test_queue_phone"; public static void main(String[] args) { try { //獲取連接 Connection connection = ConnectionUtil.getConnection(); //從連接中獲取一個通道 final Channel channel = connection.createChannel(); //聲明交換機(分發:發布/訂閱模式) channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //將隊列綁定到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); //保證一次只分發一個 int prefetchCount = 1; channel.basicQos(prefetchCount); //定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { //當消息到達時執行回調方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "utf-8"); System.out.println("[phone] Receive message:" + message); try { //消費者休息2s處理業務 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done"); //手動應答 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //設置手動應答 boolean autoAck = false; //監聽隊列 channel.basicConsume(QUEUE_NAME, autoAck, consumer); } catch (IOException e) { e.printStackTrace(); } } }
運行結果:
[phone] Receive message:this is user registe message0
[2] done
[phone] Receive message:this is user registe message1
[2] done
[phone] Receive message:this is user registe message2
[2] done
[phone] Receive message:this is user registe message3
[2] done
[phone] Receive message:this is user registe message4
[2] done
[phone] Receive message:this is user registe message5
[2] done
[phone] Receive message:this is user registe message6
[2] done
[phone] Receive message:this is user registe message7
[2] done
[phone] Receive message:this is user registe message8
[2] done
[phone] Receive message:this is user registe message9
[2] done
總結:
1、該模式下生產者並不是直接操作隊列,而是將數據發送給交換機,由交換機將數據發送給與之綁定的隊列。從運行結果中可以看到,兩中類型的消費者(Email,Phone)都收到相同數量的消息。
2、該模式必須聲明交換機,並且設置模式:channel.exchangeDeclare(EXCHANGE_NAME, "fanout") fanout指分發模式(將每一條消息都發送到與交換機綁定的隊列。
3、 隊列必須綁定交換機:channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
註意:本文僅代表個人理解和看法喲!和本人所在公司和團體無任何關系!
RabbitMQ學習第三記:發布/訂閱模式(Publish/Subscribe)