RabbitMQ使用教程(六):更強大的交換機—Topics
一、Topics交換機
之前我們學習了釋出/訂閱模式、路由模式,其中一個使用了最簡單的fanout交換機,一個使用了帶個性化的direct交換機,儘管direct在一定程度上提供了個性化操作入口,改善了我們的日誌系統,但是還遠遠不夠,因為需求總是千奇百怪的,direct的限制在於:不支援多重標準。
還是以我們的日誌系統為例,我們不僅需要根據嚴重級別來處理日誌,還可能需要根據不同的來源作出不同的處理,這時候就需要使用更大的交換機:Topics。Topics交換機可以實現更加複雜的訊息傳送規則,即傳送訊息時,指定更為複雜的routingKey。
二、Topics的配置規則
使用Topics的交換機,我們就不能使用任意名字的routingKey,其必須遵守其規則:以一串words命名,使用逗號分隔。
比如:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.
可以隨意地增加routingKey的words個數,但是最大長度為255個位元組。
Topics的邏輯跟direct大同小異,它與direct一樣,可以發給多個特定的消費者,但是其有兩點值得關注的:
①:*可以代表任何一個單詞
②:#可以代表0個或多個單詞
如下圖:
我們的生產者打算髮送關於動物的訊息:
Q1佇列綁定了:* . orange . *
Q2佇列綁定了:* . * . rabbit、lazy . #
這裡說明Q1對橘色的動物感興趣,而Q2對兔子、懶的動物感興趣。
一下是一些訊息指定的routingKey,以及它將會發送到的佇列:
quick.orange.rabbit:二者
“lazy.orange.elephant”:二者
“quick.orange.fox”:Q1
“lazy.brown.fox” :Q2
“quick.brown.fox”:無(該訊息將會被丟棄)
“orange”:無
“quick.orange.male.rabbit”:無(位數過長,訊息將被丟棄)
“lazy.orange.male.rabbit”:Q2
三、特殊的Topics
Topics交換機非常的強大,它也可以實現fanout與direct那樣的使用效果,當一個佇列的routingKey繫結為:“#”時,它接收所有訊息;當其繫結為“*”、“#”都沒有使用時,其相當於direct。
四、程式碼實現
Topics交換機非常的強大,它也可以實現fanout與direct那樣的使用效果,當一個佇列的routingKey繫結為:“#”時,它接收所有訊息;當其繫結為“*”、“#”都沒有使用時,其相當於direct。
我們將error日誌分層處理,sql異常的儲存在sql異常相關的磁碟,service異常儲存在service相關的目錄,為了簡單,我們使用列印語句代替將要執行的操作。
EmitTopicLog.java:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class EmitTopicLog {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//debug日誌
for (int i = 1; i <= 3; i++) {
String message = "debug_message" + i;
channel.basicPublish(EXCHANGE_NAME, "debug", null, message.getBytes());
}
//info日誌
for (int i = 1; i <= 3; i++) {
String message = "info_message" + i;
channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes());
}
//sql error日誌
for (int i = 1; i <= 3; i++) {
String message = "error_message" + i;
channel.basicPublish(EXCHANGE_NAME, "userQuery.sql.error", null, message.getBytes());
}
//sql error日誌
for (int i = 1; i <= 3; i++) {
String message = "error_message" + i;
channel.basicPublish(EXCHANGE_NAME, "userUpdate.sql.error", null, message.getBytes());
}
//service error日誌
for (int i = 1; i <= 3; i++) {
String message = "error_message" + i;
channel.basicPublish(EXCHANGE_NAME, "userLogin.service.error", null, message.getBytes());
}
channel.close();
connection.close();
}
}
ReceiveLog1.java:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLog1 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "debug");
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(message);
}
};
channel.basicConsume(queueName,true,consumer);
}
}
ReceiveLog2.java:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLog2 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "info");
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(message);
}
};
channel.basicConsume(queueName,true,consumer);
}
}
ReceiveLog3.java:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLog3 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "*.sql.error");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("將"+message+"記錄到sql異常相關的目錄");
}
};
channel.basicConsume(queueName,true,consumer);
}
}
ReceiveLog4.java:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLog4 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "*.service.error");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
FileUtil.save(message);
System.out.println("將"+message+"記錄到service異常相關的目錄");
}
};
channel.basicConsume(queueName,true,consumer);
}
}
執行結果:
五、總結
將釋出/訂閱模式、路由模式、Topic模式進行對比,很容易發現以下結論:
Publish/Subcribe、Routing、Topics都是圍繞交換機進行的,差別僅是交換機的型別不同。 |
下篇將和大家一起來探索遠端呼叫RPC