1. 程式人生 > >rabbitmq-routing直接路由模型

rabbitmq-routing直接路由模型

rabbitmq中直接路由模型屬於釋出-訂閱模型中的一種,它是採用direct交換機,通過路由key精確匹配。

直接路由模型和主題路由模型非常相似,它們唯一的區別在於:直接路由模型對於路由key是精確匹配的,而主題路由模型對於路由key是模糊匹配的(使用萬用字元 * 和 #)。

示例程式碼  

日誌生產者 :釋出了info、warning、error級別的訊息各10條。

package com.tingcream.rabbitmq.routing;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Log producer of the direct-routing example.
 *
 * <p>Declares the {@code routing_logs} exchange with type {@code direct} and publishes
 * ten messages for each of the routing keys {@code info}, {@code warning} and
 * {@code error}, echoing every message to standard output.
 */
public class RoutingLogProducer {

	/** Name of the exchange every message is published to. */
	private static final String EXCHANGE_NAME = "routing_logs";

	/** Number of messages published per routing key. */
	private static final int MESSAGES_PER_LEVEL = 10;

	/**
	 * Connects to the broker, declares the exchange and publishes the messages.
	 *
	 * @param args unused
	 * @throws Exception if connecting, declaring the exchange or publishing fails
	 */
	public static void main(String[] args) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();

		// host, port, vhost, user name, password
		factory.setHost("192.168.9.102");
		factory.setUsername("rabbitmq");
		factory.setPassword("rabbitmq123");
		factory.setPort(AMQP.PROTOCOL.PORT);
		factory.setVirtualHost("/");

		// try-with-resources closes the channel first and the connection second,
		// also when declaring or publishing throws.
		try (Connection connection = factory.newConnection();
				Channel channel = connection.createChannel()) {
			// Short overload; the full form is
			// exchangeDeclare(exchange, type, durable, autoDelete, arguments).
			channel.exchangeDeclare(EXCHANGE_NAME, "direct");

			// info, warning, error
			publishLevel(channel, "info");
			publishLevel(channel, "warning");
			publishLevel(channel, "error");
		}
	}

	/**
	 * Publishes {@link #MESSAGES_PER_LEVEL} messages using {@code level} as routing key.
	 *
	 * @param channel open channel to publish on
	 * @param level   routing key, also embedded in the message text
	 * @throws IOException if publishing fails
	 */
	private static void publishLevel(Channel channel, String level) throws IOException {
		for (int i = 0; i < MESSAGES_PER_LEVEL; i++) {
			String message = "你好,這是" + level + "級別訊息 " + i;
			// Encode explicitly as UTF-8: the receivers decode with "UTF-8", and the
			// platform default charset is not guaranteed to match.
			channel.basicPublish(EXCHANGE_NAME, level, null, message.getBytes(StandardCharsets.UTF_8));
			System.out.println("RoutingLogProducer Send: " + message);
		}
	}

}

日誌消費者A、B、C分別只消費info、warning、error級別的日誌。

RoutingLogReceiverA.java

package com.tingcream.rabbitmq.routing;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Log consumer A of the direct-routing example.
 *
 * <p>Binds a freshly declared queue to the {@code routing_logs} direct exchange with
 * routing key {@code info} and prints every message delivered to it.
 */
public class RoutingLogReceiverA {

	/** Name of the exchange; must match the one used by the producer. */
	private static final String EXCHANGE_NAME = "routing_logs";

	/**
	 * Connects to the broker, binds the queue and starts consuming.
	 *
	 * @param args unused
	 * @throws Exception if connecting, declaring, binding or subscribing fails
	 */
	public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ConnectionFactory();

		// host, port, vhost, user name, password
		connectionFactory.setHost("192.168.9.102");
		connectionFactory.setUsername("rabbitmq");
		connectionFactory.setPassword("rabbitmq123");
		connectionFactory.setPort(AMQP.PROTOCOL.PORT);
		connectionFactory.setVirtualHost("/");

		Connection conn = connectionFactory.newConnection();
		Channel ch = conn.createChannel();

		ch.exchangeDeclare(EXCHANGE_NAME, "direct");

		// Obtain a randomly named queue.
		String boundQueue = ch.queueDeclare().getQueue();

		// queueBind(queue, exchange, routingKey)
		ch.queueBind(boundQueue, EXCHANGE_NAME, "info");
		System.out.println("RoutingLogReceiverA Waiting for messages");

		// The consumer is handed straight to basicConsume; it decodes each body as
		// UTF-8 and prints it.
		ch.basicConsume(boundQueue, true, new DefaultConsumer(ch) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					AMQP.BasicProperties properties, byte[] body) throws IOException {
				System.out.println("RoutingLogReceiverA接收到訊息:" + new String(body, "UTF-8"));
			}
		});
	}

}

 RoutingLogReceiverB.java

package com.tingcream.rabbitmq.routing;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Log consumer B of the direct-routing example.
 *
 * <p>Binds a freshly declared queue to the {@code routing_logs} direct exchange with
 * routing key {@code warning} and prints every message delivered to it.
 */
public class RoutingLogReceiverB {

	/** Name of the exchange; must match the one used by the producer. */
	private static final String EXCHANGE_NAME = "routing_logs";

	/**
	 * Opens a channel, binds the queue and starts consuming.
	 *
	 * @param args unused
	 * @throws Exception if connecting, declaring, binding or subscribing fails
	 */
	public static void main(String[] args) throws Exception {
		Channel channel = openChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "direct");

		// Obtain a randomly named queue, then bind it:
		// queueBind(queue, exchange, routingKey)
		String randomQueue = channel.queueDeclare().getQueue();
		channel.queueBind(randomQueue, EXCHANGE_NAME, "warning");
		System.out.println("RoutingLogReceiverB Waiting for messages");

		Consumer printer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					AMQP.BasicProperties properties, byte[] body) throws IOException {
				String text = new String(body, "UTF-8");
				System.out.println("RoutingLogReceiverB接收到訊息:" + text);
			}
		};
		channel.basicConsume(randomQueue, true, printer);
	}

	/**
	 * Creates a connection with the example's broker settings and opens a channel on it.
	 *
	 * @return the newly created channel
	 * @throws Exception if the connection or the channel cannot be created
	 */
	private static Channel openChannel() throws Exception {
		ConnectionFactory factory = new ConnectionFactory();

		// host, port, vhost, user name, password
		factory.setHost("192.168.9.102");
		factory.setUsername("rabbitmq");
		factory.setPassword("rabbitmq123");
		factory.setPort(AMQP.PROTOCOL.PORT);
		factory.setVirtualHost("/");

		Connection connection = factory.newConnection();
		return connection.createChannel();
	}

}

RoutingLogReceiverC.java

package com.tingcream.rabbitmq.routing;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Log consumer C of the direct-routing example.
 *
 * <p>Binds a freshly declared queue to the {@code routing_logs} direct exchange with
 * routing key {@code error} and prints every message delivered to it.
 */
public class RoutingLogReceiverC {

	/** Name of the exchange; must match the one used by the producer. */
	private static final String EXCHANGE_NAME = "routing_logs";

	/** Consumer that decodes each delivered body as UTF-8 and prints it. */
	private static final class PrintingConsumer extends DefaultConsumer {

		PrintingConsumer(Channel channel) {
			super(channel);
		}

		@Override
		public void handleDelivery(String consumerTag, Envelope envelope,
				AMQP.BasicProperties properties, byte[] body) throws IOException {
			String received = new String(body, "UTF-8");
			System.out.println("RoutingLogReceiverC接收到訊息:" + received);
		}
	}

	/**
	 * Connects to the broker, binds the queue and starts consuming.
	 *
	 * @param args unused
	 * @throws Exception if connecting, declaring, binding or subscribing fails
	 */
	public static void main(String[] args) throws Exception {
		ConnectionFactory cf = new ConnectionFactory();

		// host, port, vhost, user name, password
		cf.setHost("192.168.9.102");
		cf.setUsername("rabbitmq");
		cf.setPassword("rabbitmq123");
		cf.setPort(AMQP.PROTOCOL.PORT);
		cf.setVirtualHost("/");

		Connection link = cf.newConnection();
		Channel errorChannel = link.createChannel();

		errorChannel.exchangeDeclare(EXCHANGE_NAME, "direct");

		// Obtain a randomly named queue.
		String errorQueue = errorChannel.queueDeclare().getQueue();

		// queueBind(queue, exchange, routingKey)
		errorChannel.queueBind(errorQueue, EXCHANGE_NAME, "error");
		System.out.println("RoutingLogReceiverC Waiting for messages");

		Consumer consumer = new PrintingConsumer(errorChannel);
		errorChannel.basicConsume(errorQueue, true, consumer);
	}

}

注:

日誌消費者中,宣告使用的交換機型別為direct,交換機名稱為routing_logs(與生產者保持一致),queue的名稱也是隨機生成的。

但channel.queueBind(queueName, EXCHANGE_NAME, "info")程式碼中所繫結的路由key不同。消費者A、B、C分別繫結的路由key為info、warning和error 。

 

分別啟動消費者A、B、C和生產者程式。

我們再看看管理後臺,其中的Exchanges標籤欄中,又多出了一個交換機,這就是我們剛宣告的一個direct型別的交換機,名叫routing_logs。