1. 程式人生 > >routing

routing

pan ctrl wait 類型 utf-8 override declare rop next

綁定是exchange和queue之間的一種關係,這可以簡單地理解為:這個queue對這個exchange中的消息感興趣。

channel.queueBind(queueName, EXCHANGE_NAME, "");

綁定可以使用一個額外的routingKey參數,為了避免和basic_publish參數混淆,我們稱它為binding key。 我們可以這樣來使用key創建一個綁定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

binding key的含義取決於不同的exchange類型,我們之前使用的fanout類型會直接忽略這個值。

direct類型的exchange的路由算法很簡單——消息將會被傳遞到binding key與該消息的routing key完全相同的queue中。

 1 package com.rabbitmq.www.publish_subscribe.direct;
 2 
 3 import java.util.Random;
 4 
 5 import com.rabbitmq.client.BuiltinExchangeType;
 6 import com.rabbitmq.client.ConnectionFactory;
 7 import com.rabbitmq.client.Connection;
8 import com.rabbitmq.client.Channel; 9 10 public class EmitLogDirect { 11 12 private static final String EXCHANGE_NAME = "direct_logs"; 13 private final static String HOST_ADDR = "172.18.112.102"; 14 15 public static void main(String[] argv) throws Exception { 16 17 ConnectionFactory factory = new
ConnectionFactory(); 18 factory.setHost(HOST_ADDR); 19 Connection connection = factory.newConnection(); 20 Channel channel = connection.createChannel(); 21 //申明exchange 類型direct 22 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); 23 24 for(int i=0;i<=10;i++){ 25 String message = "helloworld"+i; 26 Random random = new Random(); 27 String severity = "info"; 28 if(random.nextInt(2)==1){ 29 severity = "debug"; 30 } 31 //信息發送給申明的exchange,指明routingkey 32 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); 33 System.out.println(" [x] Sent ‘" + severity + "‘:‘" + message + "‘"); 34 } 35 36 37 38 channel.close(); 39 connection.close(); 40 } 41 42 43 }
package com.rabbitmq.www.publish_subscribe.direct;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Consumer side of the direct-exchange routing example.
 *
 * <p>Declares the {@code direct_logs} direct exchange, binds a freshly declared
 * queue to it with the binding key {@code "debug"}, and prints every message
 * delivered to that queue together with the routing key it arrived with.
 */
public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";
  private static final String HOST_ADDR = "172.18.112.102";

  /**
   * Connects to the broker at {@code HOST_ADDR}, sets up the exchange, queue and
   * binding, and registers the printing consumer. The connection and channel are
   * not closed here.
   *
   * @param argv command-line arguments (unused)
   * @throws Exception if connecting or any of the declare/bind/consume calls fails
   */
  public static void main(String[] argv) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(HOST_ADDR);
    Channel ch = connectionFactory.newConnection().createChannel();

    ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

    // Bind the newly declared queue to the direct exchange with binding key "debug".
    String boundQueue = ch.queueDeclare().getQueue();
    ch.queueBind(boundQueue, EXCHANGE_NAME, "debug");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    ch.basicConsume(boundQueue, true, printingConsumer(ch));
  }

  /**
   * Builds a consumer that decodes each delivered body as UTF-8 and prints it
   * alongside the routing key taken from the delivery envelope.
   */
  private static Consumer printingConsumer(Channel channel) {
    return new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String routingKey = envelope.getRoutingKey();
        String text = new String(body, "UTF-8");
        System.out.println(" [x] Received ‘" + routingKey + "‘:‘" + text + "‘");
      }
    };
  }
}

routing