routing
阿新 • • 發佈:2017-06-17
pan ctrl wait 類型 utf-8 override declare rop next
綁定是exchange和queue之間的一種關系,這可以簡單的理解為:這個queue對這個exchange中的消息感興趣。
channel.queueBind(queueName, EXCHANGE_NAME, "");
綁定可以使用一個額外的routingKey參數。為了避免與basicPublish的routingKey參數混淆,我們稱它為binding key。我們可以這樣使用key來創建一個綁定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
binding key的含義取決於不同的exchange類型,我們之前使用的fanout類型會直接忽略這個值。
direct類型的exchange的路由算法很簡單——消息將會被傳遞到binding key與該消息的routing key完全相同的queue中。
1 package com.rabbitmq.www.publish_subscribe.direct; 2 3 import java.util.Random; 4 5 import com.rabbitmq.client.BuiltinExchangeType; 6 import com.rabbitmq.client.ConnectionFactory; 7 import com.rabbitmq.client.Connection;8 import com.rabbitmq.client.Channel; 9 10 public class EmitLogDirect { 11 12 private static final String EXCHANGE_NAME = "direct_logs"; 13 private final static String HOST_ADDR = "172.18.112.102"; 14 15 public static void main(String[] argv) throws Exception { 16 17 ConnectionFactory factory = newConnectionFactory(); 18 factory.setHost(HOST_ADDR); 19 Connection connection = factory.newConnection(); 20 Channel channel = connection.createChannel(); 21 //申明exchange 類型direct 22 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); 23 24 for(int i=0;i<=10;i++){ 25 String message = "helloworld"+i; 26 Random random = new Random(); 27 String severity = "info"; 28 if(random.nextInt(2)==1){ 29 severity = "debug"; 30 } 31 //信息發送給申明的exchange,指明routingkey 32 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); 33 System.out.println(" [x] Sent ‘" + severity + "‘:‘" + message + "‘"); 34 } 35 36 37 38 channel.close(); 39 connection.close(); 40 } 41 42 43 }
package com.rabbitmq.www.publish_subscribe.direct; import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; private final static String HOST_ADDR = "172.18.112.102"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_ADDR); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); //direct類型exchange申明routekey debug channel.queueBind(queueName, EXCHANGE_NAME, "debug"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘"); } }; channel.basicConsume(queueName, true, consumer); } }
routing