RabbitMQ入門學習——Topics(主題)
在前面的章節中,我們改善了我們的日誌程式碼。我們使用direct直播代替fanout廣播,並且可以選擇性惡接受日誌。儘管使用direct直播改善了我們的日誌程式碼,但是它還有許多侷限性。比如:不能給予多種標準來路由。
在我們的日誌系統中,我們也許即希望按照日誌的嚴重程度來訂閱,也希望按照日誌的來源定於。你也許知道unix syslog工具的概念,它是給予嚴重程度和裝置來路由日誌的。這將給我們許多的靈活性——我們也許既希望監聽嚴重的錯誤,同時也不希望錯過來自kern的日誌。為了實現這種目的,我們需要學習一種更復雜的交換區——topic。
1:Topic exchange(主題交換區)
傳送到topic
Binding key必須要有相同的形式。Topic交換區和direct有些類似——它們都通過一個特定的routing key傳遞給多個binding key相匹配的佇列。對於binding key有兩點非常重要:
*可以代替一個單詞
#可以代替
下圖是一個最簡單的例子
在這個例子中,我們要傳送的訊息全部是用來描述動物的。該訊息將會被路由到一個包含三個單詞的routing key上(兩個點分隔)。Routing key中的第一個單詞表示速度,第二個表示顏色,第三個種類:“<speed>.<color>.<species>”。
我們建立了三個binding:Q1通過“*.orange.*”繫結,Q2通過“*.*.rabbit”和“lazy.#”繫結。
這三種繫結可以描述為:
Q1對所有黃顏色的動物感興趣
Q2對所有的兔子和比較懶的動物感興趣
通過使用routing key“quick.orange.rabbit
如果我們沒有按照約定來發送訊息(比如“orange”,“quick.orange.male.rabbit”),那將會怎麼樣呢?這些訊息由於沒有匹配到任何的binding,所以他們將會被丟棄。雖然“lazy.orange.male.rabbit”有四個單詞,單是匹配到了最後一個binding,所以會發送給Q2。
Topic交換區功能非常強大,他可以實現其他交換區的功能。
當一個佇列通過binding key “#”進行繫結,它會接受所有的訊息,忽略routing key。類似fanout交換區
當特殊字元“*”和“#”沒有在binding中使用,topic交換區就和direct交換區類似。
2:Putting it all together
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv)
throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRouting(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
connection.close();
}
//...
}
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv)
throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1){
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for(String bindingKey : argv){
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}