RabbitMQ入門教程(六):路由選擇Routing
阿新 • • 發佈:2019-02-18
簡介
本節主要演示如何使用直連(direct)型別的交換器,將多個路由鍵繫結到同一個佇列上。也可以將同一個路由鍵繫結到多個佇列上(多重繫結,multiple bindings),此時所有繫結鍵與訊息路由鍵相符的佇列都能收到該訊息;若沒有任何佇列的繫結鍵相符,訊息會被直接丟棄。
生產者
/**
 * Demo producer for the "routing" tutorial: publishes messages to a direct
 * exchange, using a randomly chosen log level as the routing key.
 */
public class Producer {

    /**
     * Connects to the broker on 127.0.0.1, declares the direct exchange and
     * publishes 20 messages, each with a routing key picked at random from
     * {@code debug}, {@code info}, {@code warning} and {@code error}.
     *
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if connecting to or closing the broker times out
     */
    @Test
    public void testBasicPublish() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday");
        factory.setPassword("mengday");

        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();

            // Routing uses a direct exchange. Declare it here as well, so the
            // producer does not depend on a consumer having created it first:
            // publishing to a missing exchange makes the broker close the channel.
            // The two-argument overload declares a non-durable exchange, matching
            // the declaration made by the consumers.
            String exchangeName = "exchange.direct.routing";
            channel.exchangeDeclare(exchangeName, "direct");

            String[] routingKeys = {"debug", "info", "warning", "error"};
            for (int i = 0; i < 20; i++) {
                // Derive the bound from the array so adding a key cannot go out of sync.
                int random = (int) (Math.random() * routingKeys.length);
                String routingKey = routingKeys[random];
                String message = "Hello RabbitMQ - " + routingKey + " - " + i;
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes("UTF-8"));
            }

            channel.close();
        } finally {
            // Release the connection even when a call above throws. Skip the
            // close if the connection is already shut down, so the original
            // failure is not masked by an "already closed" error.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消費者1
/**
 * Demo consumer for the "routing" tutorial: binds one server-named queue to
 * the direct exchange with the routing keys {@code debug} and {@code info}.
 */
public class Consumer1 {

    /**
     * Declares the exchange and a server-named queue, binds the queue once per
     * routing key, then prints every delivered message until the sleep ends.
     *
     * @throws Exception if a broker operation fails or the waiting thread is interrupted
     */
    @Test
    public void testBasicConsumer1() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday");
        factory.setPassword("mengday");

        Connection connection = factory.newConnection();
        try {
            final Channel channel = connection.createChannel();
            String exchangeName = "exchange.direct.routing";

            // The no-argument queueDeclare() lets the broker generate the queue
            // name; the queue is removed once its consumer disconnects.
            String queueName = channel.queueDeclare().getQueue();
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);

            // Bindings are created on the consumer side: one queue, several routing keys.
            String[] routingKeys = {"debug", "info"};
            for (String routingKey : routingKeys) {
                channel.queueBind(queueName, exchangeName, routingKey);
            }

            System.out.println("Consumer Wating Receive Message");

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [C] Received '" + message + "', 處理業務中...");
                }
            };
            // Second argument true = automatic acknowledgement.
            channel.basicConsume(queueName, true, consumer);

            // Keep the test thread alive so deliveries can keep arriving.
            Thread.sleep(1000000);
        } finally {
            // Close the connection when the wait ends or any step fails;
            // the original never closed it.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消費者2
/**
 * Demo consumer for the "routing" tutorial: binds one server-named queue to
 * the direct exchange with the routing keys {@code warning} and {@code error}.
 */
public class Consumer2 {

    /**
     * Declares the exchange and a server-named queue, binds the queue once per
     * routing key, then prints every delivered message until the sleep ends.
     *
     * @throws Exception if a broker operation fails or the waiting thread is interrupted
     */
    @Test
    public void testBasicConsumer2() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday");
        factory.setPassword("mengday");

        Connection connection = factory.newConnection();
        try {
            final Channel channel = connection.createChannel();
            String exchangeName = "exchange.direct.routing";

            // Let the broker generate a random queue name.
            String queueName = channel.queueDeclare().getQueue();
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);

            // Bindings are created on the consumer side: one queue, several routing keys.
            String[] routingKeys = {"warning", "error"};
            for (String routingKey : routingKeys) {
                channel.queueBind(queueName, exchangeName, routingKey);
            }

            System.out.println("Consumer Wating Receive Message");

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [C] Received '" + message + "', 處理業務中...");
                }
            };
            // Second argument true = automatic acknowledgement.
            channel.basicConsume(queueName, true, consumer);

            // Keep the test thread alive so deliveries can keep arriving.
            Thread.sleep(1000000);
        } finally {
            // Close the connection when the wait ends or any step fails;
            // the original never closed it.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}