RabbitMQ學習總結(5)——釋出和訂閱例項詳解
阿新 • • 發佈:2019-01-26
一、Publish/Subscribe(釋出/訂閱)(using the Java Client)
在前面的教程中,我們建立了一個work Queue(工作佇列)。工作佇列背後的假設是每個任務是交付給一個工作者(worker) 也就是均勻分給每個消費者。在本部分,我們將做一些完全不同的事情,我們將提供一個訊息到多個消費者。這種模式被稱為“釋出/訂閱”。
為了說明這個模式,我們將構建一個簡單的日誌系統。它將包括兩個專案:
- 第一個將發出日誌訊息
- 第二個將接收並列印它們。
在我們的日誌系統,每執行一次,接收器專案將得到訊息的副本。這樣我們能夠執行一個接收機並且可以直接記錄到磁碟,同時我們可以執行另一個接收器,看到螢幕上的日誌。注:從本質上講,發表日誌訊息廣播給所有的接收者。
下面讓我們腦中帶幾個問題,讓我們一步一步去解決:
- 如果我把訊息分配給所有的消費者,我們將怎麼做呢?
二、Exchanges(交換機)
在前部分的教程中,我們從一個佇列傳送和接收訊息。現在是時候讓Rabbit推出完整的訊息模型。 讓我們快速複習我們前面的教程::- 生產者是一個使用者傳送訊息的應用程式。
- 一個佇列是儲存訊息的緩衝區。
- 消費者是一個使用者應用程式接收訊息。
相反,生產者只能傳送Exchanges(訊息交換區)
- 它應該被加到一個特定的佇列嗎?
- 它應該被加到多佇列?
- 或者它應該丟棄嗎?
channel.exchangeDeclare("logs", "fanout");fanout交換非常簡單。你大概可以猜到的名字,只是廣播所有的訊息接收佇列它知道。而這正是我們需要為我們的記錄器。 問題:
$ sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done.在此列表中有一些amq* 交換器 與預設(匿名)交換。這些都是預設建立的,但可能你不需要使用它們。 ② 預設名字的 exchange(交換機)
在前部分的教程中我們對exchange 一無所知,,但仍然能夠將訊息傳送到佇列。這是可能的,因為我們是使用一個預設的交換,我們確定的空字串(" ")。 記得之前我們釋出一個訊息:
channel.basicPublish("", "hello", null, message.getBytes());第一個引數是該交換區的名稱;空字串表示預設或無名的交換,:如果routingKey存在的話,訊息路由到指定的佇列的名稱。 現在,我們可以釋出我們的交換器:
channel.basicPublish( "logs", "", null, message.getBytes());
三、Temporary queues(臨時佇列)
你可能記得以前我們使用的佇列都是指定名稱的(還記得hello和task_queue嗎?)。對我們來說命名一個佇列是至關重要的,
當你想在生產者和消費者中分享佇列的時候,給一個佇列的名稱是必須的。
但是那些都不是日誌記錄系統所需要的,我們希望能夠獲得所有的日誌資訊,而不只是其中的一部分,而且我們只對當前正在傳遞的資訊感興趣, 對舊的日誌資訊不感興趣,要解決這些問題,我們需要分兩個步驟:
- 首先當我們連結到RabbitMQ伺服器的時候,需要一個新的、空的佇列,為了做到這點,可以建立一個隨機名的佇列,
- 其次,當斷開與佇列的連線時,消費者應該被自動刪除掉。
String queueName = channel.queueDeclare().getQueue();在這點上,queueName包含了一個隨機佇列名稱。例如它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
四、Bindings(繫結)
我們已經建立了一個fanout exchange和一個佇列,現在我們需要告訴exchange去傳送訊息到佇列中,exchange和佇列之間的關係被稱為一個繫結(binding)。channel.queueBind(queueName, "logs", "");注意:從現在開始我們從logs exchange將被新增訊息到佇列中,使用rabbitmqctl list_bingdins能列出所有的繫結。
五、Putting it all together(釋出者/訂閱者 實現)
生產者程式碼和之前的傳送訊息的程式碼並沒有太大的區別,最重要的變化是,我們現在要將釋出的訊息傳遞給logs exchange來代替無名的exchange(之前的是""), 在傳送訊息時需要提供一個routingKey,它對於fanout exchange是非常重要的,不能被忽視的,這裡的EmitLog.java程式碼如下
[java] view plaincopyprint?
- </pre><pre name="code"class="java">import java.io.IOException;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.Channel;
- publicclass EmitLog {
- privatestaticfinal String EXCHANGE_NAME = "logs";
- publicstaticvoid main(String[] argv)
- throws java.io.IOException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- String message = getMessage(argv);
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
- channel.close();
- connection.close();
- }
- //...
- }
接收端: [java] view plaincopyprint?
- import java.io.IOException;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.QueueingConsumer;
- publicclass ReceiveLogs {
- privatestaticfinal String EXCHANGE_NAME = "logs";
- publicstaticvoid main(String[] argv)
- throws java.io.IOException,
- java.lang.InterruptedException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- String queueName = channel.queueDeclare().getQueue();
- channel.queueBind(queueName, EXCHANGE_NAME, "");
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println(" [x] Received '" + message + "'");
- }
- }
- }
像以前一樣,我們開始做編譯
$ javac -cp rabbitmq-client.jar EmitLog.java ReceiveLogs.java
如果你想將日誌儲存到一個檔案,開啟一個控制檯並執行$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log
如果你想看到日誌在你的螢幕上,產生一個新的終端並執行:$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs
釋出日誌型別:$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar EmitLog
使用rabbitmqctl list_bindings實際上您可以驗證繫結和佇列的程式碼是否是我們想要的? 有兩個ReceiveLogs。$ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done.