【RabbitMQ】3.工作佇列及釋出訂閱
一、工作佇列
(一個任務只發給一個消費者,根據設定,若消費者異常,才可轉發給另一個消費者)
當有的消費者(Consumer)需要大量的運算時,RabbitMQ Server需要一定的分發機制來balance(平衡)每個Consumer(生產者)的load,即負載均衡。通過建立一個工作佇列用來在consumer(生產者)間分發耗時任務。試想一下,對於web application來說,在一個很多的HTTP request裡是沒有時間來處理複雜的運算的,只能通過後臺的一些工作執行緒來完成。
應用場景就是RabbitMQ Server會將queue的Message分發給不同的Consumer以處理計算密集型的任務:
工作佇列的主要任務是:避免立刻執行資源密集型任務,然後必須等待其完成。相反地,我們進行任務排程:我們把任務封裝為訊息傳送給佇列。工作進行在後臺執行並不斷的從佇列中取出任務然後執行。當你運行了多個工作程序時,任務佇列中的任務將會被工作程序共享執行。
在現實應用中,Consumer有可能做的是一個圖片的resize,或者是pdf檔案的渲染或者內容提取。但是作為Demo,還是用字串模擬吧:通過字串中的.的數量來決定計算的複雜度,每個.都會消耗1s,即sleep(1)。
我們使用Thread.sleep來模擬耗時的任務。我們在傳送到佇列的訊息的末尾新增一定數量的點,每個點代表在工作執行緒中需要耗時1秒,例如hello…將會需要等待3秒。
1.Round-robin dispatching 迴圈分發
RabbitMQ的分發機制非常適合擴充套件,而且它是專門為併發程式設計的。如果現在load加重,那麼只需要建立更多的Consumer(消費者或工作者)來進行任務處理即可。當然了,對於負載還要加大怎麼辦?我沒有遇到過這種情況,那就可以建立多個virtual Host,細化不同的通訊類別了。
下面我們先執行3個工作者(Work.java)例項,然後執行NewTask.java,3個工作者例項都會得到資訊。但是如何分配呢?讓我們來看輸出結果:[x] Sent 'helloworld.1'
[x] Sent 'helloworld..2'
[x] Sent 'helloworld...3'
[x] Sent 'helloworld....4'
[x] Sent 'helloworld.....5'
[x] Sent 'helloworld......6'
[x] Sent 'helloworld.......7'
[x] Sent 'helloworld........8'
[x] Sent 'helloworld.........9'
[x] Sent 'helloworld..........10'
工作者1:
605645 [*] Waiting for messages...
605645 [x] Received 'helloworld.1'
605645 [x] Done
605645 [x] Received 'helloworld....4'
605645 [x] Done
605645 [x] Received 'helloworld.......7'
605645 [x] Done
605645 [x] Received 'helloworld..........10'
605645 [x] Done
工作者2:
18019860 [*] Waiting for messages...
18019860 [x] Received 'helloworld..2'
18019860 [x] Done
18019860 [x] Received 'helloworld.....5'
18019860 [x] Done
18019860 [x] Received 'helloworld........8'
18019860 [x] Done
工作者3:
18019860 [*] Waiting for messages...
18019860 [x] Received 'helloworld...3'
18019860 [x] Done
18019860 [x] Received 'helloworld......6'
18019860 [x] Done
18019860 [x] Received 'helloworld.........9'
18019860 [x] Done
可以看到,預設的,RabbitMQ會一個一個的傳送資訊給下一個消費者(consumer),而不考慮每個任務的時長等等,且是一次性分配,並非一個一個分配。平均的每個消費者將會獲得相等數量的訊息。這樣分發訊息的方式叫做round-robin。
2.message acknowledgments 訊息應答(確認)機制
執行一個任務需要花費幾秒鐘。你可能會擔心當一個工作者在執行任務時發生中斷。我們上面的程式碼,一旦RabbItMQ交付了一個資訊給消費者,會馬上從記憶體中移除這個資訊。在這種情況下,如果殺死正在執行任務的某個工作者,我們會丟失它正在處理的資訊。我們也會丟失已經轉發給這個工作者且它還未執行的訊息。
上面的例子,我們首先開啟兩個任務,然後執行傳送任務的程式碼(NewTask.java),然後立即關閉第二個任務,結果為:
工作者2:
31054905 [*] Waiting for messages...
31054905 [x] Received 'helloworld..2'
31054905 [x] Done
31054905 [x] Received 'helloworld....4'
工作者1:
18019860 [*] Waiting for messages...
18019860 [x] Received 'helloworld.1'
18019860 [x] Done
18019860 [x] Received 'helloworld...3'
18019860 [x] Done
18019860 [x] Received 'helloworld.....5'
18019860 [x] Done
18019860 [x] Received 'helloworld.......7'
18019860 [x] Done
18019860 [x] Received 'helloworld.........9'
18019860 [x] Done
可以看到,第二個工作者至少丟失了6,8,10號任務,且4號任務未完成。
但是,我們不希望丟失任何任務(資訊)。當某個工作者(接收者)被殺死時,我們希望將任務傳遞給另一個工作者。
為了保證訊息永遠不會丟失,RabbitMQ支援訊息應答(message acknowledgments)。消費者傳送應答給RabbitMQ,告訴它資訊已經被接收和處理,然後RabbitMQ可以自由的進行資訊刪除。
如果消費者被殺死而沒有傳送應答,RabbitMQ會認為該資訊沒有被完全的處理,然後將會重新轉發給別的消費者。通過這種方式,你可以確認資訊不會被丟失,即使消者偶爾被殺死。
這種機制並沒有超時時間這麼一說,RabbitMQ只有在消費者連線斷開時重新轉發此資訊。如果消費者處理一個資訊需要耗費特別特別長的時間是允許的。
訊息應答預設是開啟的。上面的程式碼中我們通過顯示的設定autoAsk=true關閉了這種機制。下面我們修改程式碼
boolean ack = false ; //開啟應答機制
channel.basicConsume(QUEUE_NAME, ack, consumer);
//另外需要在每次處理完成一個訊息後,手動傳送一次應答。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
完整的程式碼為:
package com.bj.rabbitmq.study.second;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Work {
//佇列名稱
private final static String QUEUE_NAME = "workqueue";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException
{
//區分不同工作程序的輸出
int hashCode = Work.class.hashCode();
//建立連線和頻道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(hashCode
+ " [*] Waiting for messages...");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消費佇列
boolean ack = false ; //開啟應答機制
channel.basicConsume(QUEUE_NAME, ack, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(hashCode + " [x] Received '" + message + "'");
doWork(message);
System.out.println(hashCode + " [x] Done");
//傳送應答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
/**
* 每個點耗時1s
* @param task
* @throws InterruptedException
*/
private static void doWork(String task) throws InterruptedException {
char[] taskarray = task.toCharArray();
for (char ch : taskarray){
if (ch == '.')
Thread.sleep(1000);
}
}
}
測試:
我們把訊息數量改為5,然後先開啟兩個消費者(Work.java),然後傳送任務(NewTask.java),立即關閉一個消費者,觀察輸出:
[x] Sent 'helloworld.1'
[x] Sent 'helloworld..2'
[x] Sent 'helloworld...3'
[x] Sent 'helloworld....4'
[x] Sent 'helloworld.....5'
工作者2
18019860 [*] Waiting for messages...
18019860 [x] Received 'helloworld..2'
18019860 [x] Done
18019860 [x] Received 'helloworld....4'
工作者1
31054905 [*] Waiting for messages...
31054905 [x] Received 'helloworld.1'
31054905 [x] Done
31054905 [x] Received 'helloworld...3'
31054905 [x] Done
31054905 [x] Received 'helloworld.....5'
31054905 [x] Done
31054905 [x] Received 'helloworld....4'
31054905 [x] Done
可以看到工作者2沒有完成的任務4,重新轉發給工作者1進行完成了。
如果忘記了ack,那麼後果很嚴重。當Consumer退出時,Message會重新分發。然後RabbitMQ會佔用越來越多的記憶體,由於RabbitMQ會長時間執行,因此這個“記憶體洩漏”是致命的。去除錯這種錯誤,可以通過一下命令列印un-acked Messages。
3、 訊息持久化(Message durability)
已經學習了即使消費者被殺死,訊息也不會被丟失。但是如果此時RabbitMQ服務被停止,我們的訊息仍然會丟失。
當RabbitMQ退出或者異常退出,將會丟失所有的佇列和資訊,除非你告訴它不要丟失。我們需要做兩件事來確保資訊不會被丟失:我們需要給所有的佇列和訊息設定持久化的標誌。
第一, 我們需要確認RabbitMQ永遠不會丟失我們的佇列。為了這樣,我們需要宣告它為持久化的。
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
注:RabbitMQ不允許使用不同的引數重新定義一個佇列,所以已經存在的佇列,我們無法修改其屬性。
第二, 我們需要標識我們的資訊為持久化的。通過設定MessageProperties(implements BasicProperties)值為PERSISTENT_TEXT_PLAIN。
channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
現在你可以執行一個傳送訊息的程式,然後關閉服務,再重新啟動服務,執行消費者程式做下實驗。
4、公平轉發(Fair dispatch)
或許會發現,目前的訊息轉發機制(Round-robin)並非是我們想要的。例如,這樣一種情況,對於兩個消費者,有一系列的任務,奇數任務特別耗時,而偶數任務卻很輕鬆,這樣造成一個消費者一直繁忙,另一個消費者卻很快執行完任務後等待。
造成這樣的原因是因為RabbitMQ僅僅是當訊息到達佇列進行轉發訊息。並不在乎有多少任務消費者並未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。
為了解決這樣的問題,我們可以使用basicQos方法,傳遞引數為prefetchCount = 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條訊息。換句話說,只有在消費者空閒的時候會發送下一條資訊。
channel.basic_qos(prefetch_count=1)
注:如果所有的工作者都處於繁忙狀態,你的佇列有可能被填充滿。你可能會觀察佇列的使用情況,然後增加工作者,或者使用別的什麼策略。
生產者:
package com.bj.rabbitmq.study.second;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class NewsTask {
//佇列名稱
public final static String QUEUE_NAME = "workqueue";
public static void main(String[] args) throws IOException {
//建立連線 連線到rabbitmq訊息佇列
ConnectionFactory cf = new ConnectionFactory();
//設定RabbitMQ所在主機ip或者主機名
cf.setHost("localhost");
//建立一個連線
Connection c = cf.newConnection();
//建立一個頻道
Channel ch = c.createChannel();
//指定一個佇列
//設定佇列持久化
boolean durable = true;// 1、設定佇列持久化
ch.queueDeclare(QUEUE_NAME, durable, false, false, null);
//往佇列中釋出10調訊息,依次在其後增加1至10個點
for (int i = 0; i < 10; i++)
{
String dots = "";
for (int j = 0; j <= i; j++)
{
dots += ".";
}
String message = "helloworld" + dots+dots.length();
//設定訊息持久化MessageProperties
ch.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
ch.close();
c.close();
}
}
消費者:
package com.bj.rabbitmq.study.second;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Work {
//佇列名稱
private final static String QUEUE_NAME = "workqueue";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException{
//區分不同工作程序的輸出
int hashCode = Work.class.hashCode();
//建立連線和頻道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//宣告佇列
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
System.out.println(hashCode
+ " [*] Waiting for messages...");
//設定最大服務轉發訊息數量
int prefetchCount = 1;
channel.basicQos(prefetchCount);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消費佇列
boolean ack = false; // 開啟應答機制
channel.basicConsume(QUEUE_NAME, ack, consumer);
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(hashCode + " [x] Received '" + message + "'");
doWork(message);
System.out.println(hashCode + " [x] Done");
}
}
/**
* 每個點耗時1s
* @param task
* @throws InterruptedException
*/
private static void doWork(String task) throws InterruptedException {
char[] taskarray = task.toCharArray();
for (char ch : taskarray){
if (ch == '.')
Thread.sleep(1000);
}
}
}
二、釋出/訂閱
把一個訊息發給多個消費者,這種模式稱之為釋出/訂閱(類似觀察者模式)
通過建立一個日誌系統,來驗證這種模式。它包含兩個部分:第一個部分是發出log(Producer),第二個部分接收到並列印(Consumer)。 我們將構建兩個Consumer,第一個將log寫到物理磁碟上;第二個將log輸出的螢幕。
就是釋出的日誌訊息會轉發給所有的接收者。
1.Exchanges(交換器/轉發器)
之前主要介紹的都是傳送者傳送訊息給佇列,接收者從佇列接收訊息,現在我們引入exchanges,展示RabbitMQ的完整的訊息模型。
RabbitMQ訊息模型的核心理念是生產者永遠不會直接傳送任何訊息給佇列,一般的情況生產者甚至不知道訊息應該傳送到哪些佇列。相反的,生產者只能傳送訊息給轉發器(Exchange)。
轉發器是非常簡單的,一邊接收從生產者發來的訊息,另一邊把訊息推送到佇列中。轉發器必須清楚的知道訊息如何處理它收到的每一條訊息。是否應該追加到一個指定的佇列?是否應該追加到多個佇列?或者是否應該丟棄?這些規則通過轉發器的型別進行定義。
轉發器的型別:Direct、Topic、Headers、Fanout(廣播模式)
我們示例通過fanout來實現。channel.exchangeDeclare("logs","fanout");
fanout型別轉發器特別簡單,把所有它介紹到的訊息,廣播到所有它所知道的佇列。不過這正是我們前述的日誌系統所需要的
2、匿名轉發器(nameless exchange)
之前講解中並沒有使用到轉發器,我們仍可以傳送和接收訊息,是因為使用了一個預設的轉發器,它的識別符號為””。之前傳送訊息的程式碼:channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
其中第一個引數即為轉發器的名稱,第二個引數為routingKey,若存在routingKey,則由其決定將訊息傳送到哪個佇列。
3、臨時佇列(Temporary queues)
截至現在,我們用的queue都是有名字的,都為佇列指定了一個特定的名稱。能夠為佇列命名對我們來說是很關鍵的,我們需要指定消費者為某個佇列。當我們希望在生產者和消費者間共享佇列時,為佇列命名是很重要的。
不過,對於我們的日誌系統我們並不關心佇列的名稱。我們想要接收到所有的log訊息,而且我們也只對當前正在傳遞的資料感興趣。為了滿足我們的需求,需要做兩件事:
第一, 無論什麼時間Consumer連線到RabbitMQ Server時,我們都需要一個新的空的佇列。為了實現,我們可以使用隨機數建立佇列,或者更好的,讓伺服器給我們提供一個隨機的名稱。
Java中我們可以使用queueDeclare()方法,不傳遞任何引數,來建立一個非持久的、唯一的、自動刪除的佇列且佇列名稱由伺服器隨機產生。
String queueName = channel.queueDeclare().getQueue();
第二, 一旦消費者與RabbitMQ Server斷開,消費者所接收的那個佇列應該被自動刪除。
String queueName = channel.queueDeclare().getQueue();
一般情況這個名稱與amq.gen-JzTY20BRgKO-HjmUJj0wLg 類似。
4、繫結(Bindings)
我們已經建立了一個fanout轉發器和佇列,我們現在需要通過binding告訴轉發器把訊息傳送給我們的佇列。
channel.queueBind(queueName, “logs”, ””)引數1:佇列名稱 ;引數2:轉發器名稱
5、完整的例子
日誌傳送端
與之前的例子不同之處在於publish通過了exchange而不是routing_key。即宣告佇列的程式碼,改為宣告轉發器了,同樣的訊息的傳遞也交給了轉發器。若沒有佇列繫結該轉發器,則該日誌資訊將會被拋棄。
package com.bj.rabbitmq.study.third;
import java.io.IOException;
import java.util.Date;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class LogTask {
//轉發器
public final static String EXCHANGE_NAME = "workqueue";
public static void main(String[] args) throws IOException {
//建立連線 連線到rabbitmq訊息佇列
ConnectionFactory cf = new ConnectionFactory();
//設定RabbitMQ所在主機ip或者主機名
cf.setHost("localhost");
//建立一個連線
Connection c = cf.newConnection();
//建立一個頻道
Channel ch = c.createChannel();
// 宣告轉發器和型別
ch.exchangeDeclare(EXCHANGE_NAME, "fanout" );
String message = new Date().toLocaleString()+" : log something";
// 往轉發器上傳送訊息
ch.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
ch.close();
c.close();
}
}
接收端
隨機建立一個佇列,然後將佇列與轉發器繫結,然後將消費者與該佇列繫結。
接收端1輸出到控制檯
package com.bj.rabbitmq.study.third;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class LogWorkConsole {
//轉發器
public final static String EXCHANGE_NAME = "workqueue";
public static void main(String[] argv) throws Exception {
//區分不同工作程序的輸出
int hashCode = LogWorkConsole.class.hashCode();
//建立連線和頻道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 建立一個非持久的、唯一的且自動刪除的佇列
String queueName = channel.queueDeclare().getQueue();
// 為轉發器指定佇列,設定binding
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(hashCode + " [*] Waiting for messages...");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消費佇列,第二個引數為自動應答,無需手動應答
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(hashCode + " [x] Received '" + message + "'");
}
}
}
接收端2 儲存到檔案
package com.bj.rabbitmq.study.third;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class LogWorkFile {
//轉發器
public final static String EXCHANGE_NAME = "workqueue";
public static void main(String[] argv) throws Exception {
//區分不同工作程序的輸出
int hashCode = LogWorkFile.class.hashCode();
//建立連線和頻道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 建立一個非持久的、唯一的且自動刪除的佇列
String queueName = channel.queueDeclare().getQueue();
// 為轉發器指定佇列,設定binding
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(hashCode + " [*] Waiting for messages...");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消費佇列,第二個引數為自動應答,無需手動應答
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
print2File(message);
}
}
private static void print2File(String msg) {
try {
String dir = LogWorkFile.class.getClassLoader().getResource("").getPath();
String logFileName = new SimpleDateFormat("yyyy-MM-dd")
.format(new Date());
File file = new File(dir, logFileName+".txt");
FileOutputStream fos = new FileOutputStream(file, true);
fos.write((msg + "\r\n").getBytes());
fos.flush();
fos.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
參考: