1. 程式人生 > >rabbitmq--釋出-訂閱模型

rabbitmq--釋出-訂閱模型

上兩篇中我們介紹了rabbitmq中的收發模型和工作佇列模型,它們都一個共同的特點——exchange是透明的,似乎不存在,收、發雙方都是通過queue名稱產生關聯。

本篇中我們來著重介紹下rabbitmq中的exchange ,rabbitmq中涉及exchange(非透明)的收發模型我們稱之為釋出-訂閱模型。在釋出-訂閱模型中,訊息生產者和消費者不再是直接面對queue(佇列名稱),而是直面exchange,都需要經過exchange來進行訊息的傳送和接收。 

釋出-訂閱模型又可以分為以下幾種:

fanout: 廣播交換機,廣播模型,不關心queue名稱,不關心路由key

direct:  直接交換機,直接模型,根據路由key字串精確匹配, 不關心queue名稱,關心路由key

topic:主題交換機,主題模型,根據路由key字串模糊(萬用字元)匹配, 不關心queue名稱,關心路由key

headers:頭交換機,訊息頭模型(使用較少),不關心queue名稱,不關心路由key,關心訊息頭中key-value對。

本篇,我們來介紹下第一種釋出訂閱模型——fanout廣播模型。

廣播模型的特點是:所有發往同一個fanout交換機的訊息都會被所有監聽這個交換機的消費者接收到,而不關心具體的路由key。廣播交換機就如同一個廣播地址,只要有消費者監聽這個廣播地址,則一定都會收到來自這個廣播地址收到的所有訊息。如圖:

示例程式碼如下 

日誌生產者:

package com.tingcream.rabbitmq.broadcast;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class LogProducer {
	
	private static String EXCHANGE_NAME="logs";
	
	public static void main(String[] args) throws Exception {
		
		
		 ConnectionFactory factory = new ConnectionFactory();
         
			//主機  埠  vhost 使用者名稱 密碼
			factory.setHost("192.168.9.102");
			factory.setUsername("rabbitmq");
			factory.setPassword("rabbitmq123");
			factory.setPort(AMQP.PROTOCOL.PORT);
			factory.setVirtualHost("/");
         
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();
        
        //    宣告一個fanout 廣播交換機,名稱為logs  ,  宣告的fanout佇列     autoDelete為true, exclude 為true
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //amq.gen-xxxxxxxxxx   AD EXCL      
       //  channel.exchangeDeclare(exchange, type, durable, autoDelete, arguments)
         
        for (int i=0;i<10;i++){
            String message="你好 World "+i;
             // channel.basicPublish(exchange, routingKey, props, body);
             //釋出訊息時指定routingKey為""
             channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
             System.out.println("LogProducer Send :" + message );
        }
        channel.close();
        connection.close();
    }

}

日誌生產者程式碼中聲明瞭一個名叫logs的fanout型別交換機,並使用這個交換機來發送訊息,並且傳送訊息時設定路由key為"",因為在廣播的場景中根本就不關心路由key。

日誌消費者A:

package com.tingcream.rabbitmq.broadcast;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class LogReceiverA {
	private static    String EXCHANGE_NAME="logs";
	
	public static void main(String[] args) throws Exception {
		
		ConnectionFactory factory = new ConnectionFactory();
        
		//主機  埠  vhost 使用者名稱 密碼
		factory.setHost("192.168.9.102");
		factory.setUsername("rabbitmq");
		factory.setPassword("rabbitmq123");
		factory.setPort(AMQP.PROTOCOL.PORT);
		factory.setVirtualHost("/");
	    Connection connection=factory.newConnection();
	    Channel channel=connection.createChannel();
	    
	    //重要 與生產者使用同一個交換機
	    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
	    
	    //獲取一個隨機的佇列名稱   
	    String queueName = channel.queueDeclare().getQueue();
	    
	    //關聯 exchange 和 queue ,因為是廣播無需指定routekey,routingKey設定為空字串
	   // channel.queueBind(queue, exchange, routingKey)
	    channel.queueBind(queueName, EXCHANGE_NAME, ""); 
	    System.out.println("LogReceiverA Waiting for messages");
	    
	    
	      Consumer consumer = new DefaultConsumer(channel) {
	        @Override
	        public void handleDelivery(String consumerTag, Envelope envelope,
	                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
	          String message = new String(body, "UTF-8");
	          System.out.println( "LogReceiverA接收到訊息:" + message );   
	        }
	        
	      };
	      channel.basicConsume(queueName, true, consumer);
 
		
	}

}

日誌消費者B:

package com.tingcream.rabbitmq.broadcast;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class LogReceiverB {
	private static    String EXCHANGE_NAME="logs";
	
	public static void main(String[] args) throws Exception {
		
		ConnectionFactory factory = new ConnectionFactory();
        
		//主機  埠  vhost 使用者名稱 密碼
		factory.setHost("192.168.9.102");
		factory.setUsername("rabbitmq");
		factory.setPassword("rabbitmq123");
		factory.setPort(AMQP.PROTOCOL.PORT);
		factory.setVirtualHost("/");
	    Connection connection=factory.newConnection();
	    Channel channel=connection.createChannel();
	    
	    //重要 與生產者使用同一個交換機
	    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
	    //獲取一個隨機的佇列名稱
	    String queueName = channel.queueDeclare().getQueue();
	    
	    //關聯 exchange 和 queue ,因為是廣播無需指定routekey,routingKey設定為空字串
	   // channel.queueBind(queue, exchange, routingKey)
	     channel.queueBind(queueName, EXCHANGE_NAME, ""); 
	     System.out.println("LogReceiverB Waiting for messages");
	    
	      Consumer consumer = new DefaultConsumer(channel) {
	        @Override
	        public void handleDelivery(String consumerTag, Envelope envelope,
	                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
	          String message = new String(body, "UTF-8");
	          System.out.println( "LogReceiverA接收到訊息:" + message );   
	        }
	        
	      };
	      //true 自動回覆ack
	      channel.basicConsume(queueName, true, consumer);
 
		
	}

}

注意:

日誌消費者A、B程式碼中使用了隨機生成的queue名稱(在本地隨機產生一個名稱),然後繫結這個queue名稱到logs交換機上了。這是因為所有釋出-訂閱模型對queue名稱都不關心,只要queue名稱唯一、不重複即可。繫結queue到交換機是一種慣例的做法,實際上就是從交換機上取出一條訊息並關聯上一個指定的queue,然後我們就可以對這個queue進行監聽等等,當然本例中queue的名稱是隨機生成。

先啟動消費者A、B, 再啟動生產者,執行下,登入管理後臺。

登入管理後臺後,我們發現exchange標籤項中最後一行多出了一個名叫logs交換機,它的型別為fanout(廣播),這就是我們程式碼中自定義的一個交換機。其上的7個交換機都是rabbitmq中內建的交換機。(最上面一個我們已經很熟悉了,收-發模型和工作佇列模型就是採用的這種exchange)

queue標籤項中多出了兩條隨機生成的佇列名稱。