rabbitmq routing and binding relation
binding代表了exchange和queue之間的關係,說明了queue對哪些訊息感興趣
channel.queueBind(queueName,EXCHANGE_NAME,"black")
Direct exchange
上面是將所有的訊息傳送給消費者,我們想要通過特殊的標示去篩除一些訊息,這樣可以減少磁碟的浪費,可以關注更多特定的訊息
這時可以使用direct exchange binding key matches routing key
x綁定了兩個queue,一個queue 的binding key是 orange ,另一個是queue 的binding key是 black,green
其他訊息將會被丟棄
將多個佇列繫結同一個binding key 也是允許的,這就相當於一種廣播的形式
日誌型別訊息模型如下
public class RemitLogDirect {
public static final String EXCHANGE_NAME = "direct_log";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String serverity = getServerity(args);
String message = getMessage(args);
channel.basicPublish(EXCHANGE_NAME, serverity, null, message.getBytes(Charset.defaultCharset()));
System.out.println(" [x] send '" + serverity + ":" + message + "'");
}
}
private static String getMessage(String[] args) {
Preconditions.checkNotNull(args, "傳入引數%s不能為空", args);
String backMessage = "";
for (String type : args) {
//some info
}
return backMessage;
}
private static String getServerity(String[] args) {
Preconditions.checkNotNull(args, "傳入引數%s不能為空", args);
for (String type : args) {
if (type.equals("error")) {
return "error";
}
if (type.equals("info")) {
return "info";
}
if (type.equals("warn")) {
return "warn";
}
}
return "undefined";
}
}
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (args.length < 1) {
System.out.println("Usages ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for (String serverity : args) {
channel.queueBind(queueName, EXCHANGE_NAME, serverity);
}
System.out.println("[x] waiting for messages ,to exit press c");
DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] received ...." + message);
});
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});