1. 程式人生 > >rabbitmq--topic主題路由模型

rabbitmq--topic主題路由模型

上一篇,我們介紹了直接路由模型,這一篇我們介紹下主題路由模型。

就如同上一篇所說,直接路由模型和主題路由模型非常相似,它們唯一區別在於:直接路由模型對於路由key是精確匹配的,而主題路由模型對於路由key是模糊匹配的(使用萬用字元 )。

因此當我們瞭解上一篇中的直接路由模型,也不難理解本篇中的主題路由模型了。如圖:

那麼topic中的路由key的匹配規則是什麼呢? 

topic模式中路由key中存在兩種萬用字元  * 和 # 

  • * (star) can substitute for exactly one word.       *匹配明確的一個單詞
  • # (hash) can substitute for zero or more words.    #匹配0到多個單詞

 

topic模式中若路由key為"#" ,則表示對所有佇列感興趣 ,相當於fanout 

topic模式中若路由key中不含有*和#字元時,則表示只對精確匹配的路由key佇列感興趣,則相對於routing 
 

示例程式碼:

生產者 

package com.tingcream.rabbitmq.topic;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class TopicLogProducer {
	
	private static String EXCHANGE_NAME="topic_logs";
	
	public static void main(String[] args) throws Exception {
		
		
		 ConnectionFactory factory = new ConnectionFactory();
         
			//主機  埠  vhost 使用者名稱 密碼
			factory.setHost("192.168.9.102");
			factory.setUsername("rabbitmq");
			factory.setPassword("rabbitmq123");
			factory.setPort(AMQP.PROTOCOL.PORT);
			factory.setVirtualHost("/");
         
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();
        

//        lazy.black.elephant    懶惰的黑色的象
//        lazy.white.sheep       懶惰的白色的羊
//        quick.black.horse      快速的黑色的馬
//        quick.red.horse        快速的紅色的馬
        
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");  
        
        for (int i=0;i<10;i++){
            String message="你好,這是lazy.black.elephant級別訊息 "+i;
             channel.basicPublish(EXCHANGE_NAME,"lazy.black.elephant",null,message.getBytes());
             System.out.println("TopicLogProducer Send: " + message );
        }
        for (int i=0;i<10;i++){
        	String message="你好,這是lazy.white.sheep級別訊息 "+i;
        	channel.basicPublish(EXCHANGE_NAME,"lazy.white.sheep",null,message.getBytes());
        	System.out.println("TopicLogProducer Send: " + message );
        }
        for (int i=0;i<10;i++){
        	String message="你好,這是quick.black.horse級別訊息 "+i;
        	channel.basicPublish(EXCHANGE_NAME,"quick.black.horse",null,message.getBytes());
        	System.out.println("TopicLogProducer Send: " + message );
        }
        for (int i=0;i<10;i++){
        	String message="你好,這是quick.red.horse級別訊息 "+i;
        	channel.basicPublish(EXCHANGE_NAME,"quick.red.horse",null,message.getBytes());
        	System.out.println("TopicLogProducer Send: " + message );
        }
        channel.close();
        connection.close();
    }

}

消費者A

package com.tingcream.rabbitmq.topic;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class TopicLogReceiverA {
	private static    String EXCHANGE_NAME="topic_logs";
	
	public static void main(String[] args) throws Exception {
		
		ConnectionFactory factory = new ConnectionFactory();
        
		//主機  埠  vhost 使用者名稱 密碼
		factory.setHost("192.168.9.102");
		factory.setUsername("rabbitmq");
		factory.setPassword("rabbitmq123");
		factory.setPort(AMQP.PROTOCOL.PORT);
		factory.setVirtualHost("/");
	    Connection connection=factory.newConnection();
	    Channel channel=connection.createChannel();
	    
	    
	    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
	    
	    //獲取一個隨機的佇列名稱   
	    String queueName = channel.queueDeclare().getQueue();
	    
	    
	    
	    
	    //  繫結routingKey "#" 表示對所有佇列感興趣 ,相當於fanout 
	    //  繫結routingKey 若不含"#"和"*",表示只對精確匹配的路由key佇列感興趣,則相對於routing 
	    
	    
	   // channel.queueBind(queue, exchange, routingKey)
	    channel.queueBind(queueName, EXCHANGE_NAME, "*.black.*"); 
	    System.out.println("TopicLogReceiverA Waiting for messages");
	    
	      Consumer consumer = new DefaultConsumer(channel) {
	        @Override
	        public void handleDelivery(String consumerTag, Envelope envelope,
	                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
	          String message = new String(body, "UTF-8");
	          System.out.println( "TopicLogReceiverA接收到訊息:" + message );   
	        }
	      };
	      channel.basicConsume(queueName, true, consumer);
	}

}

消費者B

 

package com.tingcream.rabbitmq.topic;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class TopicLogReceiverB {
	private static    String EXCHANGE_NAME="topic_logs";
	
	public static void main(String[] args) throws Exception {
		
		ConnectionFactory factory = new ConnectionFactory();
        
		//主機  埠  vhost 使用者名稱 密碼
		factory.setHost("192.168.9.102");
		factory.setUsername("rabbitmq");
		factory.setPassword("rabbitmq123");
		factory.setPort(AMQP.PROTOCOL.PORT);
		factory.setVirtualHost("/");
	    Connection connection=factory.newConnection();
	    Channel channel=connection.createChannel();
	    
	    
	    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
	    
	    //獲取一個隨機的佇列名稱   
	    String queueName = channel.queueDeclare().getQueue();
	    
	    
	   // channel.queueBind(queue, exchange, routingKey)  //bind方法可以呼叫多次,繫結多個
	    channel.queueBind(queueName, EXCHANGE_NAME, "quick.#"); 
	    channel.queueBind(queueName, EXCHANGE_NAME, "*.white.*"); 
	    
	    
	    
	    System.out.println("TopicLogReceiverB Waiting for messages");
	    
	      Consumer consumer = new DefaultConsumer(channel) {
	        @Override
	        public void handleDelivery(String consumerTag, Envelope envelope,
	                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
	          String message = new String(body, "UTF-8");
	          System.out.println( "TopicLogReceiverB接收到訊息:" + message );   
	        }
	      };
	      channel.basicConsume(queueName, true, consumer);
	}

}

 

依次啟動消費者A、B和生產者。

消費者A輸出

TopicLogReceiverA Waiting for messages
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 0
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 1
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 2
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 3
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 4
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 5
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 6
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 7
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 8
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 9
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 0
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 1
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 2
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 3
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 4
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 5
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 6
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 7
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 8
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 9

消費者B輸出:

TopicLogReceiverB Waiting for messages
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 0
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 1
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 2
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 3
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 4
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 5
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 6
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 7
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 8
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 9
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 0
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 1
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 2
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 3
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 4
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 5
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 6
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 7
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 8
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 9
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 0
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 1
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 2
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 3
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 4
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 5
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 6
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 7
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 8
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 9
 

管理後臺中 exchange標籤項中多出了一個topic交換機,名叫 topic_logs ,這就是我們剛宣告的topic型別交換機。