RabbitMQ(五)—路由選擇
在前篇博文中,我們建立了一個簡單的日誌系統。可以廣播訊息給多個消費者。本篇博文,我們將新增新的特性——我們可以只訂閱部分訊息。比如:我們可以接收Error級別的訊息寫入檔案。同時仍然可以在控制檯列印所有日誌。
Bindings(繫結)
在上一篇部落格中我們已經使用過繫結。類似下面的程式碼:
channel.queueBind(queueName, EXCHANGE_NAME, "");
繫結表示轉換器與佇列之間的關係。可以簡單的人為:佇列對該轉發器上的訊息感興趣。
繫結可以設定額外的routingKey引數。為了與避免basicPublish方法(釋出訊息的方法)的引數混淆,我們準備
把它稱作繫結鍵(binding key)。下面展示如何使用繫結鍵(binding key)來建立一個繫結:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
繫結鍵關鍵取決於轉換器的型別。對於fanout型別,忽略此引數。
Direct exchange(直接轉發)
前面講到我們的日誌系統廣播訊息給所有的消費者。我們想對其擴充套件,根據訊息的嚴重性來過濾訊息。例如:我們希望將致命錯誤的日誌訊息記錄到檔案,而不是把磁碟空間浪費在warn和info型別的日誌上。我們使用的fanout轉發器,不能給我們太多的靈活性。它僅僅只是盲目的廣播而已。我們使用direct轉發器進行代替,其背後的演算法很簡單——訊息會被推送至繫結鍵(binding key)和訊息釋出附帶的選擇鍵(routing key)完全匹配的佇列。
在上圖中,我們可以看到direct型別的轉發器與2個佇列進行了繫結。第一個佇列使用的繫結鍵是orange,第二個佇列繫結鍵為black和green。這樣當訊息釋出到轉發器是,附帶orange繫結鍵的訊息將被路由到佇列Q1中去。附帶black和green繫結鍵的訊息被路由到Q2中去。其他訊息全部丟棄。
Multiple bindings(多重繫結)
使用一個繫結鍵繫結多個佇列是完全合法的。如上圖,繫結鍵black綁定了2個佇列——Q1和Q2。
Emitting logs(傳送日誌)
我們將這種模式用於日誌系統,傳送訊息給direct型別的轉發器。我們將 提供日誌嚴重性做為繫結鍵。那樣,接收程式可以選擇性的接收嚴重性的訊息。首先關注傳送日誌的程式碼:
像往常一樣首先建立一個轉換器:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
然後為傳送訊息做準備:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
為了簡化程式碼,我們假定日誌的嚴重性是‘info’,‘warning’,‘error’中之一。
Subscribing(訂閱)
接收訊息跟前面博文中的一樣。我們僅需要修改一個地方:為每一個我們感興趣的嚴重性的訊息,建立一個新的繫結。
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
完整的例子
傳送端程式碼(EmitLogDirect.java)
public class EmitLogDirect {
private final static String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException {
/**
* 建立連線連線到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
// 設定MabbitMQ所在主機ip或者主機名
factory.setHost("127.0.0.1");
// 建立一個連線
Connection connection = factory.newConnection();
// 建立一個頻道
Channel channel = connection.createChannel();
// 指定轉發——廣播
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//所有日誌嚴重性級別
String[] severities={"error","info","warning"};
for(int i=0;i<3;i++){
String severity = severities[i%3];//每一次傳送一條不同嚴重性的日誌
// 傳送的訊息
String message = "Hello World"+Strings.repeat(".", i+1);
//引數1:exchange name
//引數2:routing key
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity +"':'"+ message + "'");
}
// 關閉頻道和連線
channel.close();
connection.close();
}
}
消費者1(ReceiveLogs2Console.java)
public class ReceiveLogs2Console {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 開啟連線和建立頻道,與傳送端一樣
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 宣告一個隨機佇列
String queueName = channel.queueDeclare().getQueue();
//所有日誌嚴重性級別
String[] severities={"error","info","warning"};
for (String severity : severities) {
//關注所有級別的日誌(多重繫結)
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 建立佇列消費者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
消費者2(ReceiveLogs2File.java)
public class ReceiveLogs2File {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 開啟連線和建立頻道,與傳送端一樣
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 宣告一個隨機佇列
String queueName = channel.queueDeclare().getQueue();
String severity="error";//只關注error級別的日誌,然後記錄到檔案中去。
channel.queueBind(queueName, EXCHANGE_NAME, severity);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 建立佇列消費者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
//記錄日誌到檔案:
print2File( "["+ envelope.getRoutingKey() + "] "+message);
}
};
channel.basicConsume(queueName, true, consumer);
}
private static void print2File(String msg) {
try {
String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();
String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
File file = new File(dir, logFileName + ".log");
FileOutputStream fos = new FileOutputStream(file, true);
fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes());
fos.flush();
fos.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
最終結果:
羅哩羅嗦的說這麼多,其實就是說了這麼一件事:我們可以使用Direct exchange+routingKey來過濾自己感興趣的訊息。一個佇列可以繫結多個routingKey。這就是我們今天的主題——路由選擇。