1. 程式人生 > >RabbitMQ 之 訂閱模式 Publish/Subscribe

RabbitMQ 之 訂閱模式 Publish/Subscribe

模型圖

我們之前學習的都是一個訊息只能被一個消費者消費,那麼如果我想發一個訊息 能被多個消費者消費,這時候怎麼辦? 這時候我們就得用到了訊息中的釋出訂閱模型

在前面的教程中,我們建立了一個工作佇列,都是一個任務只交給一個消費者。這次我們做 將訊息傳送給多個消費者。這種模式叫做“釋出/訂閱”。

舉列:    

類似微信訂閱號 釋出文章訊息 就可以廣播給所有的接收者。(訂閱者)

那麼咱們來看一下圖,我們學過前兩種有一些不一樣,work 模式 是不是同一個佇列 多個消費者,而 ps 這種模式呢,是一個佇列對應一個消費者,pb 模式還多了一個 X(交換機 轉發器) ,這時候我們要獲取訊息 就需要佇列繫結到交換機上,交換機把訊息傳送到佇列 , 消費者才能獲取佇列的訊息

解讀:
1、1 個生產者,多個消費者
2、每一個消費者都有自己的一個佇列
3、生產者沒有將訊息直接傳送到佇列,而是傳送到了交換機(轉發器)
4、每個佇列都要繫結到交換機
5、生產者傳送的訊息,經過交換機,到達佇列,實現,一個訊息被多個消費者獲取的目的

生產者

 1 package cn.wh.simple;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 
 7 import cn.wh.util.RabbitMqConnectionUtil;
8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection; 10 11 public class Send { 12 13 private static final String EXCHANGE_NAME="test_exchange_fanout"; 14 public static void main(String[] args) throws IOException, TimeoutException { 15 16 Connection connection = RabbitMqConnectionUtil.getConnection();
17 18 Channel channel = connection.createChannel(); 19 20 //宣告交換機 21 channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//分發 22 23 //傳送訊息 24 String msg="hello ps"; 25 26 channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); 27 28 System.out.println("Send :"+msg); 29 30 channel.close(); 31 connection.close(); 32 } 33 }

消費者1

 1 package cn.wh.simple;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import cn.wh.util.RabbitMqConnectionUtil;
 7 
 8 import com.rabbitmq.client.Channel;
 9 import com.rabbitmq.client.Connection;
10 import com.rabbitmq.client.Consumer;
11 import com.rabbitmq.client.DefaultConsumer;
12 import com.rabbitmq.client.Envelope;
13 import com.rabbitmq.client.AMQP.BasicProperties;
14 
15 public class Recv1 {
16     
17     private static final String QUEUE_NAME="test_queue_fanout_email";
18     private static final String  EXCHANGE_NAME="test_exchange_fanout";
19     public static void main(String[] args) throws IOException, TimeoutException {
20         Connection connection = RabbitMqConnectionUtil.getConnection();
21         final Channel channel = connection.createChannel();
22         
23         //佇列宣告
24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25         
26         //繫結佇列到交換機 轉發器
27         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
28         
29         
30         channel.basicQos(1);//保證一次只分發一個  
31         
32         //定義一個消費者
33         Consumer consumer=new DefaultConsumer(channel){
34             //訊息到達 觸發這個方法
35             @Override
36             public void handleDelivery(String consumerTag, Envelope envelope,
37                     BasicProperties properties, byte[] body) throws IOException {
38              
39                 String msg=new String(body,"utf-8");
40                 System.out.println("[1] Recv msg:"+msg);
41                 
42                 try {
43                     Thread.sleep(2000);
44                 } catch (InterruptedException e) {
45                     e.printStackTrace();
46                 }finally{
47                     System.out.println("[1] done ");
48                     channel.basicAck(envelope.getDeliveryTag(), false);
49                 }
50             }
51         };
52         
53         boolean autoAck=false;//自動應答 false
54         channel.basicConsume(QUEUE_NAME,autoAck , consumer);
55     }
56 }

消費者2

package cn.wh.simple;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


import cn.wh.util.RabbitMqConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Recv2 {
    
    private static final String QUEUE_NAME="test_queue_fanout_sms";
    private static final String  EXCHANGE_NAME="test_exchange_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMqConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        //佇列宣告
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //繫結佇列到交換機 轉發器
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        channel.basicQos(1);//保證一次只分發一個

        //定義一個消費者
        Consumer consumer=new DefaultConsumer(channel){
            //訊息到達 觸發這個方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {

                String msg=new String(body,"utf-8");
                System.out.println("[2] Recv msg:"+msg);

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    System.out.println("[2] done ");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean autoAck=false;//自動應答 false
        channel.basicConsume(QUEUE_NAME,autoAck , consumer);
    }
}

 測試

一個訊息 可以被多個消費者