RabbitMQ routing and binding relation

A binding represents the relationship between an exchange and a queue: it states which messages the queue is interested in.

channel.queueBind(queueName, EXCHANGE_NAME, "black");

Direct exchange

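The setup above sends every message to every consumer. We want to filter messages by a specific marker instead, which saves disk space and lets each consumer focus on the messages it actually cares about.

For this we can use a direct exchange: a message is delivered to the queues whose binding key exactly matches the message's routing key.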

 

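The exchange X has two queues bound to it: one queue is bound with the binding key orange, the other with two binding keys, black and green.

Messages with any other routing key are discarded.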

 

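Binding several queues with the same binding key is also allowed; the exchange then delivers a matching message to every one of those queues, which works like a form of broadcast.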
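The matching rule described above can be sketched without a broker. The class below is only an in-memory model of direct-exchange routing, not the RabbitMQ client API; the class name DirectExchangeSketch and the queue names Q1 and Q2 are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory model of direct-exchange routing; no broker or client library involved.
public class DirectExchangeSketch {

    // binding key -> names of the queues bound with that key (in binding order)
    private final Map<String, Set<String>> bindings = new HashMap<>();

    // Plays the role of queueBind(queue, exchange, bindingKey) for this single exchange.
    public void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queue);
    }

    // Returns the queues whose binding key equals the routing key.
    // An empty list means the message would be discarded.
    public List<String> route(String routingKey) {
        Set<String> queues = bindings.getOrDefault(routingKey, Collections.<String>emptySet());
        return new ArrayList<>(queues);
    }

    public static void main(String[] args) {
        DirectExchangeSketch x = new DirectExchangeSketch();
        x.bind("Q1", "orange");
        x.bind("Q2", "black");
        x.bind("Q2", "green");

        System.out.println(x.route("orange")); // [Q1]
        System.out.println(x.route("green"));  // [Q2]
        System.out.println(x.route("purple")); // [] (no matching binding, message dropped)

        // Binding a second queue with the same key: both queues now receive "black".
        x.bind("Q1", "black");
        System.out.println(x.route("black"));  // [Q2, Q1]
    }
}
```

Keeping a set of queues per binding key is what makes the broadcast-like case fall out naturally: several queues under one key all appear in the result.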

The message model for log messages looks like this:

 

 

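import com.google.common.base.Preconditions;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class RemitLogDirect {

    // Must be the same exchange name that the receiver declares.
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // try-with-resources closes the channel and the connection on exit.
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            String severity = getSeverity(args);
            String message = getMessage(args);
            // The severity is used as the routing key; encode as UTF-8 to match the receiver.
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] send '" + severity + ":" + message + "'");
        }
    }

    private static String getMessage(String[] args) {
        Preconditions.checkNotNull(args, "args must not be null");
        String backMessage = "";
        for (String type : args) {
            // some info (message assembly not shown; backMessage stays empty)
        }
        return backMessage;
    }

    // Returns the first recognized severity among the arguments, or "undefined" if none is present.
    private static String getSeverity(String[] args) {
        Preconditions.checkNotNull(args, "args must not be null");

        for (String type : args) {
            if (type.equals("error")) {
                return "error";
            }
            if (type.equals("info")) {
                return "info";
            }
            if (type.equals("warn")) {
                return "warn";
            }
        }
        return "undefined";
    }
}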
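The sender's getMessage only contains a placeholder comment, so the published body is always empty. One possible way to fill it in, shown purely as an assumption and not as the author's implementation, is to join every argument that is not a severity keyword. The class name LogMessageSketch and the method buildMessage are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: one way the message text could be assembled from the arguments.
public class LogMessageSketch {

    // The severity keywords recognized by the sender above.
    private static final List<String> SEVERITIES = Arrays.asList("error", "info", "warn");

    // Joins all arguments that are not severity keywords with single spaces.
    static String buildMessage(String[] args) {
        List<String> words = new ArrayList<>();
        for (String arg : args) {
            if (!SEVERITIES.contains(arg)) {
                words.add(arg);
            }
        }
        return String.join(" ", words);
    }

    public static void main(String[] args) {
        // prints "disk full"
        System.out.println(buildMessage(new String[] {"error", "disk", "full"}));
    }
}
```

With this approach the severity keyword only selects the routing key and never appears in the message body; if no other arguments are given, the body is an empty string.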

 

 

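import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // The connection and channel stay open so the consumer keeps receiving messages.
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // Server-named, exclusive, auto-delete queue.
        String queueName = channel.queueDeclare().getQueue();

        if (args.length < 1) {
            System.out.println("Usage: ReceiveLogsDirect [info] [warn] [error]");
            System.exit(1);
        }

        // One binding per requested severity; each severity is used as a binding key.
        for (String severity : args) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println("[x] waiting for messages, to exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("[x] received ...." + message);
        };

        // autoAck = true; the cancel callback does nothing.
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });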