1. 程式人生 > >【RabbitMQ】3.工作佇列及釋出訂閱

【RabbitMQ】3.工作佇列及釋出訂閱

一、工作佇列

(一個任務只發給一個消費者,根據設定,若消費者異常,才可轉發給另一個消費者)

當有的消費者(Consumer)需要大量的運算時,RabbitMQ Server需要一定的分發機制來balance(平衡)每個Consumer(生產者)的load,即負載均衡。通過建立一個工作佇列用來在consumer(生產者)間分發耗時任務。試想一下,對於web application來說,在一個很多的HTTP request裡是沒有時間來處理複雜的運算的,只能通過後臺的一些工作執行緒來完成。

應用場景就是RabbitMQ Server會將queue的Message分發給不同的Consumer以處理計算密集型的任務:

工作佇列的主要任務是:避免立刻執行資源密集型任務,然後必須等待其完成。相反地,我們進行任務排程:我們把任務封裝為訊息傳送給佇列。工作進行在後臺執行並不斷的從佇列中取出任務然後執行。當你運行了多個工作程序時,任務佇列中的任務將會被工作程序共享執行。

在現實應用中,Consumer有可能做的是一個圖片的resize,或者是pdf檔案的渲染或者內容提取。但是作為Demo,還是用字串模擬吧:通過字串中的.的數量來決定計算的複雜度,每個.都會消耗1s,即sleep(1)。

我們使用Thread.sleep來模擬耗時的任務。我們在傳送到佇列的訊息的末尾新增一定數量的點,每個點代表在工作執行緒中需要耗時1秒,例如hello…將會需要等待3秒。

1.Round-robin dispatching 迴圈分發

RabbitMQ的分發機制非常適合擴充套件,而且它是專門為併發程式設計的。如果現在load加重,那麼只需要建立更多的Consumer(消費者或工作者)來進行任務處理即可。當然了,對於負載還要加大怎麼辦?我沒有遇到過這種情況,那就可以建立多個virtual Host,細化不同的通訊類別了。

下面我們先執行3個工作者(Work.java)例項,然後執行NewTask.java,3個工作者例項都會得到資訊。但是如何分配呢?讓我們來看輸出結果:[x] Sent 'helloworld.1'
[x] Sent 'helloworld..2'
[x] Sent 'helloworld...3'
[x] Sent 'helloworld....4'
[x] Sent 'helloworld.....5'
[x] Sent 'helloworld......6'
[x] Sent 'helloworld.......7'
[x] Sent 'helloworld........8'
[x] Sent 'helloworld.........9'
[x] Sent 'helloworld..........10'

工作者1:
605645 [*] Waiting for messages...
605645 [x] Received 'helloworld.1'
605645 [x] Done
605645 [x] Received 'helloworld....4'
605645 [x] Done
605645 [x] Received 'helloworld.......7'
605645 [x] Done
605645 [x] Received 'helloworld..........10'
605645 [x] Done

工作者2:
18019860 [*] Waiting for messages...
18019860 [x] Received 'helloworld..2'
18019860 [x] Done
18019860 [x] Received 'helloworld.....5'
18019860 [x] Done
18019860 [x] Received 'helloworld........8'
18019860 [x] Done

工作者3:
18019860 [*] Waiting for messages... 
18019860 [x] Received 'helloworld...3'
18019860 [x] Done
18019860 [x] Received 'helloworld......6'
18019860 [x] Done
18019860 [x] Received 'helloworld.........9'
18019860 [x] Done
可以看到,預設的,RabbitMQ會一個一個的傳送資訊給下一個消費者(consumer),而不考慮每個任務的時長等等,且是一次性分配,並非一個一個分配。平均的每個消費者將會獲得相等數量的訊息。這樣分發訊息的方式叫做round-robin。

2.message acknowledgments 訊息應答(確認)機制

執行一個任務需要花費幾秒鐘。你可能會擔心當一個工作者在執行任務時發生中斷。我們上面的程式碼,一旦RabbItMQ交付了一個資訊給消費者,會馬上從記憶體中移除這個資訊。在這種情況下,如果殺死正在執行任務的某個工作者,我們會丟失它正在處理的資訊。我們也會丟失已經轉發給這個工作者且它還未執行的訊息。
上面的例子,我們首先開啟兩個任務,然後執行傳送任務的程式碼(NewTask.java),然後立即關閉第二個任務,結果為:
工作者2:

31054905 [*] Waiting for messages...
31054905 [x] Received 'helloworld..2'
31054905 [x] Done
31054905 [x] Received 'helloworld....4'

工作者1:
18019860 [*] Waiting for messages...
18019860 [x] Received 'helloworld.1'
18019860 [x] Done
18019860 [x] Received 'helloworld...3'
18019860 [x] Done
18019860 [x] Received 'helloworld.....5'
18019860 [x] Done
18019860 [x] Received 'helloworld.......7'
18019860 [x] Done
18019860 [x] Received 'helloworld.........9'
18019860 [x] Done
可以看到,第二個工作者至少丟失了6,8,10號任務,且4號任務未完成。

但是,我們不希望丟失任何任務(資訊)。當某個工作者(接收者)被殺死時,我們希望將任務傳遞給另一個工作者。
為了保證訊息永遠不會丟失,RabbitMQ支援訊息應答(message acknowledgments)。消費者傳送應答給RabbitMQ,告訴它資訊已經被接收和處理,然後RabbitMQ可以自由的進行資訊刪除。
如果消費者被殺死而沒有傳送應答,RabbitMQ會認為該資訊沒有被完全的處理,然後將會重新轉發給別的消費者。通過這種方式,你可以確認資訊不會被丟失,即使消者偶爾被殺死。
這種機制並沒有超時時間這麼一說,RabbitMQ只有在消費者連線斷開時重新轉發此資訊。如果消費者處理一個資訊需要耗費特別特別長的時間是允許的。
訊息應答預設是開啟的。上面的程式碼中我們通過顯示的設定autoAsk=true關閉了這種機制。下面我們修改程式碼

boolean ack = false ; //開啟應答機制
channel.basicConsume(QUEUE_NAME, ack, consumer);
//另外需要在每次處理完成一個訊息後,手動傳送一次應答。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

完整的程式碼為:

package com.bj.rabbitmq.study.second;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Work {
	//佇列名稱
		private final static String QUEUE_NAME = "workqueue";
	 
		public static void main(String[] argv) throws java.io.IOException,
				java.lang.InterruptedException
		{
			//區分不同工作程序的輸出
			int hashCode = Work.class.hashCode();
			//建立連線和頻道
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost("localhost");
			Connection connection = factory.newConnection();
			Channel channel = connection.createChannel();
			//宣告佇列
			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
			System.out.println(hashCode
					+ " [*] Waiting for messages...");
		
			QueueingConsumer consumer = new QueueingConsumer(channel);
			// 指定消費佇列
            boolean ack = false ; //開啟應答機制
		    channel.basicConsume(QUEUE_NAME, ack, consumer);
			while (true)
			{
				QueueingConsumer.Delivery delivery = consumer.nextDelivery();
				String message = new String(delivery.getBody());
	 
				System.out.println(hashCode + " [x] Received '" + message + "'");
				doWork(message);
				System.out.println(hashCode + " [x] Done");
                //傳送應答
    			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
			}
	 
		}
	 
		/**
		 * 每個點耗時1s
		 * @param task
		 * @throws InterruptedException
		 */
		private static void doWork(String task) throws InterruptedException {
			char[] taskarray = task.toCharArray();
			for (char ch : taskarray){
				if (ch == '.')
					Thread.sleep(1000);
			}
		}
}

測試:
我們把訊息數量改為5,然後先開啟兩個消費者(Work.java),然後傳送任務(NewTask.java),立即關閉一個消費者,觀察輸出:
[x] Sent 'helloworld.1'
[x] Sent 'helloworld..2'
[x] Sent 'helloworld...3'
[x] Sent 'helloworld....4'
[x] Sent 'helloworld.....5'

工作者2
18019860 [*] Waiting for messages...
18019860 [x] Received 'helloworld..2'
18019860 [x] Done
18019860 [x] Received 'helloworld....4'

工作者1
31054905 [*] Waiting for messages...
31054905 [x] Received 'helloworld.1'
31054905 [x] Done
31054905 [x] Received 'helloworld...3'
31054905 [x] Done
31054905 [x] Received 'helloworld.....5'
31054905 [x] Done
31054905 [x] Received 'helloworld....4'
31054905 [x] Done

可以看到工作者2沒有完成的任務4,重新轉發給工作者1進行完成了。

如果忘記了ack,那麼後果很嚴重。當Consumer退出時,Message會重新分發。然後RabbitMQ會佔用越來越多的記憶體,由於RabbitMQ會長時間執行,因此這個“記憶體洩漏”是致命的。去除錯這種錯誤,可以通過一下命令列印un-acked Messages。

3、 訊息持久化(Message durability)

已經學習了即使消費者被殺死,訊息也不會被丟失。但是如果此時RabbitMQ服務被停止,我們的訊息仍然會丟失。

當RabbitMQ退出或者異常退出,將會丟失所有的佇列和資訊,除非你告訴它不要丟失。我們需要做兩件事來確保資訊不會被丟失:我們需要給所有的佇列訊息設定持久化的標誌。
第一, 我們需要確認RabbitMQ永遠不會丟失我們的佇列。為了這樣,我們需要宣告它為持久化的。
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
注:RabbitMQ不允許使用不同的引數重新定義一個佇列,所以已經存在的佇列,我們無法修改其屬性。
第二, 我們需要標識我們的資訊為持久化的。通過設定MessageProperties(implements BasicProperties)值為PERSISTENT_TEXT_PLAIN。
channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
現在你可以執行一個傳送訊息的程式,然後關閉服務,再重新啟動服務,執行消費者程式做下實驗。

 

4、公平轉發(Fair dispatch)

或許會發現,目前的訊息轉發機制(Round-robin)並非是我們想要的。例如,這樣一種情況,對於兩個消費者,有一系列的任務,奇數任務特別耗時,而偶數任務卻很輕鬆,這樣造成一個消費者一直繁忙,另一個消費者卻很快執行完任務後等待。
造成這樣的原因是因為RabbitMQ僅僅是當訊息到達佇列進行轉發訊息。並不在乎有多少任務消費者並未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。
為了解決這樣的問題,我們可以使用basicQos方法,傳遞引數為prefetchCount = 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條訊息。換句話說,只有在消費者空閒的時候會發送下一條資訊。

channel.basic_qos(prefetch_count=1)

注:如果所有的工作者都處於繁忙狀態,你的佇列有可能被填充滿。你可能會觀察佇列的使用情況,然後增加工作者,或者使用別的什麼策略。

生產者:

package com.bj.rabbitmq.study.second;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class NewsTask {
	//佇列名稱
	public final static String QUEUE_NAME = "workqueue";
	
	public static void main(String[] args) throws IOException {
		//建立連線 連線到rabbitmq訊息佇列
		ConnectionFactory cf = new ConnectionFactory();
		//設定RabbitMQ所在主機ip或者主機名
		cf.setHost("localhost");
		//建立一個連線
		Connection c = cf.newConnection();
		//建立一個頻道
		Channel ch = c.createChannel();
		//指定一個佇列
        //設定佇列持久化
        boolean durable = true;// 1、設定佇列持久化
		ch.queueDeclare(QUEUE_NAME, durable, false, false, null);
		//往佇列中釋出10調訊息,依次在其後增加1至10個點
		for (int i = 0; i < 10; i++)
		{
			String dots = "";
			for (int j = 0; j <= i; j++)
			{
				dots += ".";
			}
			String message = "helloworld" + dots+dots.length();
            //設定訊息持久化MessageProperties
			ch.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");
		}
		
		ch.close();
		c.close();
	}
}

消費者:

package com.bj.rabbitmq.study.second;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Work {
	//佇列名稱
	private final static String QUEUE_NAME = "workqueue";
	 
	public static void main(String[] argv) throws java.io.IOException,
			java.lang.InterruptedException{
		//區分不同工作程序的輸出
		int hashCode = Work.class.hashCode();
		//建立連線和頻道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		//宣告佇列
        boolean durable = true;
		channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
		System.out.println(hashCode
				+ " [*] Waiting for messages...");
		//設定最大服務轉發訊息數量
	    int prefetchCount = 1;
	    channel.basicQos(prefetchCount);
		QueueingConsumer consumer = new QueueingConsumer(channel);
		// 指定消費佇列
        boolean ack = false; // 開啟應答機制
		channel.basicConsume(QUEUE_NAME, ack, consumer);
		while (true){
		    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
	 
			System.out.println(hashCode + " [x] Received '" + message + "'");
			doWork(message);
			System.out.println(hashCode + " [x] Done");
	 
		}
	 
	}
	 
	/**
	 * 每個點耗時1s
	 * @param task
	 * @throws InterruptedException
	 */
	private static void doWork(String task) throws InterruptedException {
		char[] taskarray = task.toCharArray();
		for (char ch : taskarray){
			if (ch == '.')
				Thread.sleep(1000);
		}
	}
}

 

二、釋出/訂閱

把一個訊息發給多個消費者,這種模式稱之為釋出/訂閱(類似觀察者模式)

通過建立一個日誌系統,來驗證這種模式。它包含兩個部分:第一個部分是發出log(Producer),第二個部分接收到並列印(Consumer)。 我們將構建兩個Consumer,第一個將log寫到物理磁碟上;第二個將log輸出的螢幕。

就是釋出的日誌訊息會轉發給所有的接收者。

1.Exchanges(交換器/轉發器)

     之前主要介紹的都是傳送者傳送訊息給佇列,接收者從佇列接收訊息,現在我們引入exchanges,展示RabbitMQ的完整的訊息模型。

     RabbitMQ訊息模型的核心理念是生產者永遠不會直接傳送任何訊息給佇列,一般的情況生產者甚至不知道訊息應該傳送到哪些佇列。相反的,生產者只能傳送訊息給轉發器(Exchange)。

     轉發器是非常簡單的,一邊接收從生產者發來的訊息,另一邊把訊息推送到佇列中。轉發器必須清楚的知道訊息如何處理它收到的每一條訊息。是否應該追加到一個指定的佇列?是否應該追加到多個佇列?或者是否應該丟棄?這些規則通過轉發器的型別進行定義。

轉發器的型別:Direct、Topic、Headers、Fanout(廣播模式)

我們示例通過fanout來實現。channel.exchangeDeclare("logs","fanout");

fanout型別轉發器特別簡單,把所有它介紹到的訊息,廣播到所有它所知道的佇列。不過這正是我們前述的日誌系統所需要的

2、匿名轉發器(nameless exchange)

之前講解中並沒有使用到轉發器,我們仍可以傳送和接收訊息,是因為使用了一個預設的轉發器,它的識別符號為””。之前傳送訊息的程式碼:channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

其中第一個引數即為轉發器的名稱,第二個引數為routingKey,若存在routingKey,則由其決定將訊息傳送到哪個佇列。

3、臨時佇列(Temporary queues)

       截至現在,我們用的queue都是有名字的,都為佇列指定了一個特定的名稱。能夠為佇列命名對我們來說是很關鍵的,我們需要指定消費者為某個佇列。當我們希望在生產者和消費者間共享佇列時,為佇列命名是很重要的。
       不過,對於我們的日誌系統我們並不關心佇列的名稱。我們想要接收到所有的log訊息,而且我們也只對當前正在傳遞的資料感興趣。為了滿足我們的需求,需要做兩件事:
       第一, 無論什麼時間Consumer連線到RabbitMQ Server時,我們都需要一個新的空的佇列。為了實現,我們可以使用隨機數建立佇列,或者更好的,讓伺服器給我們提供一個隨機的名稱。

     Java中我們可以使用queueDeclare()方法,不傳遞任何引數,來建立一個非持久的、唯一的、自動刪除的佇列且佇列名稱由伺服器隨機產生。
           String queueName = channel.queueDeclare().getQueue();
       第二, 一旦消費者與RabbitMQ Server斷開,消費者所接收的那個佇列應該被自動刪除。
           String queueName = channel.queueDeclare().getQueue();
一般情況這個名稱與amq.gen-JzTY20BRgKO-HjmUJj0wLg 類似。

4、繫結(Bindings)

我們已經建立了一個fanout轉發器和佇列,我們現在需要通過binding告訴轉發器把訊息傳送給我們的佇列。
channel.queueBind(queueName, “logs”, ””)引數1:佇列名稱 ;引數2:轉發器名稱

5、完整的例子

日誌傳送端

與之前的例子不同之處在於publish通過了exchange而不是routing_key。即宣告佇列的程式碼,改為宣告轉發器了,同樣的訊息的傳遞也交給了轉發器。若沒有佇列繫結該轉發器,則該日誌資訊將會被拋棄。

package com.bj.rabbitmq.study.third;

import java.io.IOException;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class LogTask {
	//轉發器
	public final static String EXCHANGE_NAME = "workqueue";
	
	public static void main(String[] args) throws IOException {
		//建立連線 連線到rabbitmq訊息佇列
		ConnectionFactory cf = new ConnectionFactory();
		//設定RabbitMQ所在主機ip或者主機名
		cf.setHost("localhost");
		//建立一個連線
		Connection c = cf.newConnection();
		//建立一個頻道
		Channel ch = c.createChannel();
		// 宣告轉發器和型別
		ch.exchangeDeclare(EXCHANGE_NAME, "fanout" );
		
		String message = new Date().toLocaleString()+" : log something";
		// 往轉發器上傳送訊息
		ch.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
		ch.close();
		c.close();
	}
}

接收端

隨機建立一個佇列,然後將佇列與轉發器繫結,然後將消費者與該佇列繫結。

接收端1輸出到控制檯

package com.bj.rabbitmq.study.third;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class LogWorkConsole {
	//轉發器
	public final static String EXCHANGE_NAME = "workqueue";
 
	public static void main(String[] argv) throws Exception {
		//區分不同工作程序的輸出
		int hashCode = LogWorkConsole.class.hashCode();
		//建立連線和頻道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		// 建立一個非持久的、唯一的且自動刪除的佇列
		String queueName = channel.queueDeclare().getQueue();
		// 為轉發器指定佇列,設定binding
		channel.queueBind(queueName, EXCHANGE_NAME, "");
		
		System.out.println(hashCode + " [*] Waiting for messages...");
	
		QueueingConsumer consumer = new QueueingConsumer(channel);
		// 指定消費佇列,第二個引數為自動應答,無需手動應答
		channel.basicConsume(queueName, true, consumer);
		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println(hashCode + " [x] Received '" + message + "'");
		}
	}
}

接收端2 儲存到檔案

package com.bj.rabbitmq.study.third;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class LogWorkFile {
	//轉發器
	public final static String EXCHANGE_NAME = "workqueue";
 
	public static void main(String[] argv) throws Exception {
		//區分不同工作程序的輸出
		int hashCode = LogWorkFile.class.hashCode();
		//建立連線和頻道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		// 建立一個非持久的、唯一的且自動刪除的佇列
		String queueName = channel.queueDeclare().getQueue();
		// 為轉發器指定佇列,設定binding
		channel.queueBind(queueName, EXCHANGE_NAME, "");
		
		System.out.println(hashCode + " [*] Waiting for messages...");
	
		QueueingConsumer consumer = new QueueingConsumer(channel);
		// 指定消費佇列,第二個引數為自動應答,無需手動應答
		channel.basicConsume(queueName, true, consumer);
		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			print2File(message);
		}
	}
	private static void print2File(String msg) {
		try {
			String dir = LogWorkFile.class.getClassLoader().getResource("").getPath();
			String logFileName = new SimpleDateFormat("yyyy-MM-dd")
					.format(new Date());
			File file = new File(dir, logFileName+".txt");
			FileOutputStream fos = new FileOutputStream(file, true);
			fos.write((msg + "\r\n").getBytes());
			fos.flush();
			fos.close();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

 

參考:

https://blog.csdn.net/anzhsoft2008/column/info/rabbitmq

https://blog.csdn.net/lmj623565791/article/list/7