RabbitMQ 之 訂閱模式 Publish/Subscribe
阿新 • • 發佈:2018-12-04
模型圖
我們之前學習的都是一個訊息只能被一個消費者消費,那麼如果我想發一個訊息 能被多個消費者消費,這時候怎麼辦? 這時候我們就得用到了訊息中的釋出訂閱模型
在前面的教程中,我們建立了一個工作佇列,都是一個任務只交給一個消費者。這次我們做 將訊息傳送給多個消費者。這種模式叫做“釋出/訂閱”。
舉列:
類似微信訂閱號 釋出文章訊息 就可以廣播給所有的接收者。(訂閱者)
那麼咱們來看一下圖,我們學過前兩種有一些不一樣,work 模式 是不是同一個佇列 多個消費者,而 ps 這種模式呢,是一個佇列對應一個消費者,pb 模式還多了一個 X(交換機 轉發器) ,這時候我們要獲取訊息 就需要佇列繫結到交換機上,交換機把訊息傳送到佇列 , 消費者才能獲取佇列的訊息
解讀:
1、1 個生產者,多個消費者
2、每一個消費者都有自己的一個佇列
3、生產者沒有將訊息直接傳送到佇列,而是傳送到了交換機(轉發器)
4、每個佇列都要繫結到交換機
5、生產者傳送的訊息,經過交換機,到達佇列,實現,一個訊息被多個消費者獲取的目的
生產者
1 package cn.wh.simple; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 7 import cn.wh.util.RabbitMqConnectionUtil;8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection; 10 11 public class Send { 12 13 private static final String EXCHANGE_NAME="test_exchange_fanout"; 14 public static void main(String[] args) throws IOException, TimeoutException { 15 16 Connection connection = RabbitMqConnectionUtil.getConnection();17 18 Channel channel = connection.createChannel(); 19 20 //宣告交換機 21 channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//分發 22 23 //傳送訊息 24 String msg="hello ps"; 25 26 channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); 27 28 System.out.println("Send :"+msg); 29 30 channel.close(); 31 connection.close(); 32 } 33 }
消費者1
1 package cn.wh.simple; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import cn.wh.util.RabbitMqConnectionUtil; 7 8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection; 10 import com.rabbitmq.client.Consumer; 11 import com.rabbitmq.client.DefaultConsumer; 12 import com.rabbitmq.client.Envelope; 13 import com.rabbitmq.client.AMQP.BasicProperties; 14 15 public class Recv1 { 16 17 private static final String QUEUE_NAME="test_queue_fanout_email"; 18 private static final String EXCHANGE_NAME="test_exchange_fanout"; 19 public static void main(String[] args) throws IOException, TimeoutException { 20 Connection connection = RabbitMqConnectionUtil.getConnection(); 21 final Channel channel = connection.createChannel(); 22 23 //佇列宣告 24 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 25 26 //繫結佇列到交換機 轉發器 27 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 28 29 30 channel.basicQos(1);//保證一次只分發一個 31 32 //定義一個消費者 33 Consumer consumer=new DefaultConsumer(channel){ 34 //訊息到達 觸發這個方法 35 @Override 36 public void handleDelivery(String consumerTag, Envelope envelope, 37 BasicProperties properties, byte[] body) throws IOException { 38 39 String msg=new String(body,"utf-8"); 40 System.out.println("[1] Recv msg:"+msg); 41 42 try { 43 Thread.sleep(2000); 44 } catch (InterruptedException e) { 45 e.printStackTrace(); 46 }finally{ 47 System.out.println("[1] done "); 48 channel.basicAck(envelope.getDeliveryTag(), false); 49 } 50 } 51 }; 52 53 boolean autoAck=false;//自動應答 false 54 channel.basicConsume(QUEUE_NAME,autoAck , consumer); 55 } 56 }
消費者2
package cn.wh.simple; import java.io.IOException; import java.util.concurrent.TimeoutException; import cn.wh.util.RabbitMqConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class Recv2 { private static final String QUEUE_NAME="test_queue_fanout_sms"; private static final String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMqConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); //佇列宣告 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //繫結佇列到交換機 轉發器 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1);//保證一次只分發一個 //定義一個消費者 Consumer consumer=new DefaultConsumer(channel){ //訊息到達 觸發這個方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg=new String(body,"utf-8"); System.out.println("[2] Recv msg:"+msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("[2] done "); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck=false;//自動應答 false channel.basicConsume(QUEUE_NAME,autoAck , consumer); } }
測試
一個訊息 可以被多個消費者