rabbitmq-routing直接路由模型
阿新 • • 發佈:2018-12-22
rabbitmq中直接路由模型屬於釋出-訂閱模型中的一種,它是採用direct交換機,通過路由key精確匹配。
直接路由模型和主題路由模型非常相似,它們唯一區別在於:直接路由模型對於路由key是精確匹配的,而主題路由模型對於路由key是模糊匹配的(使用萬用字元 * 和 #)。
示例程式碼
日誌生產者 :釋出了info、warning、error級別的訊息各10條。
package com.tingcream.rabbitmq.routing;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Log producer for the direct-routing example.
 *
 * <p>Declares the {@code routing_logs} direct exchange and publishes ten
 * messages for each of the routing keys {@code info}, {@code warning} and
 * {@code error}, in that order.
 */
public class RoutingLogProducer {

    /** Name of the direct exchange the messages are published to. */
    private static final String EXCHANGE_NAME = "routing_logs";

    /** Number of messages published for every log level. */
    private static final int MESSAGES_PER_LEVEL = 10;

    /**
     * Connects to the broker, publishes the three batches of messages and
     * closes the channel and the connection.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if connecting, declaring the exchange, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host, port, vhost, user name and password.
        factory.setHost("192.168.9.102");
        factory.setUsername("rabbitmq");
        factory.setPassword("rabbitmq123");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();

            // A longer overload also exists:
            // exchangeDeclare(exchange, type, durable, autoDelete, arguments)
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            publishBatch(channel, "info");
            publishBatch(channel, "warning");
            publishBatch(channel, "error");

            channel.close();
        } finally {
            // Release the connection even when declaring or publishing fails.
            connection.close();
        }
    }

    /**
     * Publishes {@link #MESSAGES_PER_LEVEL} messages for one log level, using
     * the level name as the routing key.
     *
     * @param channel channel used for publishing
     * @param level   log level; used both in the message text and as the routing key
     * @throws IOException if publishing fails
     */
    private static void publishBatch(Channel channel, String level) throws IOException {
        for (int i = 0; i < MESSAGES_PER_LEVEL; i++) {
            String message = "你好,這是" + level + "級別訊息 " + i;
            // Encode explicitly as UTF-8: the receivers decode the body as
            // UTF-8, and the platform default charset may differ.
            channel.basicPublish(EXCHANGE_NAME, level, null,
                    message.getBytes(StandardCharsets.UTF_8));
            System.out.println("RoutingLogProducer Send: " + message);
        }
    }
}
日誌消費者A、B、C分別只消費info、warning、error級別的日誌。
RoutingLogReceiverA.java
package com.tingcream.rabbitmq.routing;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Log consumer A for the direct-routing example.
 *
 * <p>Binds its own queue to the {@code routing_logs} direct exchange with the
 * routing key {@code info} and prints every message it receives.
 */
public class RoutingLogReceiverA {

    /** Name of the direct exchange; must match the one used by the producer. */
    private static final String EXCHANGE_NAME = "routing_logs";

    /**
     * Connects to the broker, binds a queue for the {@code info} routing key
     * and starts consuming. The connection is not closed here, so deliveries
     * keep arriving after this method returns.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host, port, vhost, user name and password.
        factory.setHost("192.168.9.102");
        factory.setUsername("rabbitmq");
        factory.setPassword("rabbitmq123");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // Obtain a queue with a randomly generated name.
        String queueName = channel.queueDeclare().getQueue();

        // Signature: queueBind(queue, exchange, routingKey)
        channel.queueBind(queueName, EXCHANGE_NAME, "info");

        System.out.println("RoutingLogReceiverA Waiting for messages");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("RoutingLogReceiverA接收到訊息:" + message);
            }
        };

        // Second argument is autoAck: deliveries are acknowledged automatically.
        channel.basicConsume(queueName, true, consumer);
    }
}
RoutingLogReceiverB.java
package com.tingcream.rabbitmq.routing;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Log consumer B for the direct-routing example.
 *
 * <p>Binds its own queue to the {@code routing_logs} direct exchange with the
 * routing key {@code warning} and prints every message it receives.
 */
public class RoutingLogReceiverB {

    /** Name of the direct exchange; must match the one used by the producer. */
    private static final String EXCHANGE_NAME = "routing_logs";

    /**
     * Connects to the broker, binds a queue for the {@code warning} routing
     * key and starts consuming. The connection is not closed here, so
     * deliveries keep arriving after this method returns.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host, port, vhost, user name and password.
        factory.setHost("192.168.9.102");
        factory.setUsername("rabbitmq");
        factory.setPassword("rabbitmq123");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // Obtain a queue with a randomly generated name.
        String queueName = channel.queueDeclare().getQueue();

        // Signature: queueBind(queue, exchange, routingKey)
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");

        System.out.println("RoutingLogReceiverB Waiting for messages");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("RoutingLogReceiverB接收到訊息:" + message);
            }
        };

        // Second argument is autoAck: deliveries are acknowledged automatically.
        channel.basicConsume(queueName, true, consumer);
    }
}
RoutingLogReceiverC.java
package com.tingcream.rabbitmq.routing;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class RoutingLogReceiverC {
private static String EXCHANGE_NAME="routing_logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//主機 埠 vhost 使用者名稱 密碼
factory.setHost("192.168.9.102");
factory.setUsername("rabbitmq");
factory.setPassword("rabbitmq123");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setVirtualHost("/");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//獲取一個隨機的佇列名稱
String queueName = channel.queueDeclare().getQueue();
// channel.queueBind(queue, exchange, routingKey)
channel.queueBind(queueName, EXCHANGE_NAME, "error");
System.out.println("RoutingLogReceiverC Waiting for messages");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println( "RoutingLogReceiverC接收到訊息:" + message );
}
};
channel.basicConsume(queueName, true, consumer);
}
}
注:
日誌消費者中,宣告使用的交換機型別為direct,交換機名稱為routing_logs(與生產者保持一致),queue的名稱也是隨機生成的。
但channel.queueBind(queueName, EXCHANGE_NAME, "info")程式碼中所繫結的路由key不同。消費者A、B、C分別繫結的路由key為info、warning和error 。
先分別啟動消費者A、B、C,再啟動生產者程式。
我們再看看管理後臺,其中的Exchanges標籤欄中,又多出了一個交換機,這就是我們剛宣告的一個direct型別的交換機,名叫routing_logs。