1. 程式人生 > >RabbitMQ (三) 釋出/訂閱

RabbitMQ (三) 釋出/訂閱

本系列教程主要來自於官網入門教程的翻譯,然後自己進行了部分的修改與實驗,內容僅供參考。 

上一篇部落格中,我們實現了工作佇列,並且我們的工作佇列中的一個任務只會發給一個工作者,除非某個工作者未完成任務意外被殺死,會轉發給另外的工作者,如果你還不瞭解:RabbitMQ (二)工作佇列。這篇部落格中,我們會做一些改變,就是把一個訊息發給多個消費者,這種模式稱之為釋出/訂閱(類似觀察者模式)。

         為了驗證這種模式,我們準備構建一個簡單的日誌系統。這個系統包含兩類程式,一類程式發動日誌,另一類程式接收和處理日誌。

         在我們的日誌系統中,每一個執行的接收者程式都會收到日誌。然後我們實現,一個接收者將接收到的資料寫到硬碟上,與此同時,另一個接收者把接收到的訊息展現在螢幕上。

         本質上來說,就是釋出的日誌訊息會轉發給所有的接收者。

1、轉發器(Exchanges)

前面的部落格中我們主要的介紹都是傳送者傳送訊息給佇列,接收者從佇列接收訊息。下面我們會引入Exchanges,展示RabbitMQ的完整的訊息模型。

RabbitMQ訊息模型的核心理念是生產者永遠不會直接傳送任何訊息給佇列,一般的情況生產者甚至不知道訊息應該傳送到哪些佇列。

相反的,生產者只能傳送訊息給轉發器(Exchange)。轉發器是非常簡單的,一邊接收從生產者發來的訊息,另一邊把訊息推送到佇列中。轉發器必須清楚的知道訊息如何處理它收到的每一條訊息。是否應該追加到一個指定的佇列?是否應該追加到多個佇列?或者是否應該丟棄?這些規則通過轉發器的型別進行定義。

下面列出一些可用的轉發器型別:

Direct

Topic

Headers

Fanout

目前我們關注最後一個fanout,宣告轉發器型別的程式碼:

channel.exchangeDeclare("logs","fanout");

fanout型別轉發器特別簡單,把所有它介紹到的訊息,廣播到所有它所知道的佇列。不過這正是我們前述的日誌系統所需要的。

2、匿名轉發器(nameless exchange)

前面說到生產者只能傳送訊息給轉發器(Exchange),但是我們前兩篇部落格中的例子並沒有使用到轉發器,我們仍然可以傳送和接收訊息。這是因為我們使用了一個預設的轉發器,它的識別符號為””。之前傳送訊息的程式碼:

channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

第一個引數為轉發器的名稱,我們設定為”” : 如果存在routingKey(第二個引數),訊息由routingKey決定傳送到哪個佇列。

現在我們可以指定訊息傳送到的轉發器:

channel.basicPublish( "logs","", null, message.getBytes());

3、臨時佇列(Temporary queues)

前面的部落格中我們都為佇列指定了一個特定的名稱。能夠為佇列命名對我們來說是很關鍵的,我們需要指定消費者為某個佇列。當我們希望在生產者和消費者間共享佇列時,為佇列命名是很重要的。 不過,對於我們的日誌系統我們並不關心佇列的名稱。我們想要接收到所有的訊息,而且我們也只對當前正在傳遞的資料的感興趣。為了滿足我們的需求,需要做兩件事: 第一, 無論什麼時間連線到Rabbit我們都需要一個新的空的佇列。為了實現,我們可以使用隨機數建立佇列,或者更好的,讓伺服器給我們提供一個隨機的名稱。 第二, 一旦消費者與Rabbit斷開,消費者所接收的那個佇列應該被自動刪除。 Java中我們可以使用queueDeclare()方法,不傳遞任何引數,來建立一個非持久的、唯一的、自動刪除的佇列且佇列名稱由伺服器隨機產生。 String queueName = channel.queueDeclare().getQueue(); 一般情況這個名稱與amq.gen-JzTY20BRgKO-HjmUJj0wLg 類似。

4、繫結(Bindings)

我們已經建立了一個fanout轉發器和佇列,我們現在需要通過binding告訴轉發器把訊息傳送給我們的佇列。 channel.queueBind(queueName, “logs”, ””)引數1:佇列名稱 ;引數2:轉發器名稱  

5、完整的例子

日誌傳送端:

package com.zhy.rabbit._03_bindings_exchanges;
 
import java.io.IOException;
import java.util.Date;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
public class EmitLog
{
	private final static String EXCHANGE_NAME = "ex_log";
 
	public static void main(String[] args) throws IOException
	{
		// 建立連線和頻道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		// 宣告轉發器和型別
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );
		
		String message = new Date().toLocaleString()+" : log something";
		// 往轉發器上傳送訊息
		channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
 
		System.out.println(" [x] Sent '" + message + "'");
 
		channel.close();
		connection.close();
 
	}
 
}
沒什麼太大的改變,宣告佇列的程式碼,改為宣告轉發器了,同樣的訊息的傳遞也交給了轉發器。

接收端1 :ReceiveLogsToSave.java:

package com.zhy.rabbit._03_bindings_exchanges;
 
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
 
public class ReceiveLogsToSave
{
	private final static String EXCHANGE_NAME = "ex_log";
 
	public static void main(String[] argv) throws java.io.IOException,
			java.lang.InterruptedException
	{
		// 建立連線和頻道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
 
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		// 建立一個非持久的、唯一的且自動刪除的佇列
		String queueName = channel.queueDeclare().getQueue();
		// 為轉發器指定佇列,設定binding
		channel.queueBind(queueName, EXCHANGE_NAME, "");
 
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
 
		QueueingConsumer consumer = new QueueingConsumer(channel);
		// 指定接收者,第二個引數為自動應答,無需手動應答
		channel.basicConsume(queueName, true, consumer);
 
		while (true)
		{
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
 
			print2File(message);
		}
 
	}
 
	private static void print2File(String msg)
	{
		try
		{
			String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();
			String logFileName = new SimpleDateFormat("yyyy-MM-dd")
					.format(new Date());
			File file = new File(dir, logFileName+".txt");
			FileOutputStream fos = new FileOutputStream(file, true);
			fos.write((msg + "\r\n").getBytes());
			fos.flush();
			fos.close();
		} catch (FileNotFoundException e)
		{
			e.printStackTrace();
		} catch (IOException e)
		{
			e.printStackTrace();
		}
	}
}

隨機建立一個佇列,然後將佇列與轉發器繫結,然後將消費者與該佇列繫結,然後寫入日誌檔案。

接收端2:ReceiveLogsToConsole.java

package com.zhy.rabbit._03_bindings_exchanges;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
 
public class ReceiveLogsToConsole
{
	private final static String EXCHANGE_NAME = "ex_log";
 
	public static void main(String[] argv) throws java.io.IOException,
			java.lang.InterruptedException
	{
		// 建立連線和頻道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
 
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		// 建立一個非持久的、唯一的且自動刪除的佇列
		String queueName = channel.queueDeclare().getQueue();
		// 為轉發器指定佇列,設定binding
		channel.queueBind(queueName, EXCHANGE_NAME, "");
 
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
 
		QueueingConsumer consumer = new QueueingConsumer(channel);
		// 指定接收者,第二個引數為自動應答,無需手動應答
		channel.basicConsume(queueName, true, consumer);
 
		while (true)
		{
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println(" [x] Received '" + message + "'");
 
		}
 
	}
 
}

隨機建立一個佇列,然後將佇列與轉發器繫結,然後將消費者與該佇列繫結,然後列印到控制檯。

現在把兩個接收端執行,然後執行3次傳送端:

輸出結果:

傳送端:

 [x] Sent '2014-7-10 16:04:54 : log something'

 [x] Sent '2014-7-10 16:04:58 : log something'

 [x] Sent '2014-7-10 16:05:02 : log something'

接收端1:

接收端2:

 [*] Waiting for messages. To exit press CTRL+C  [x] Received '2014-7-10 16:04:54 : log something'  [x] Received '2014-7-10 16:04:58 : log something'  [x] Received '2014-7-10 16:05:02 : log something'

這個例子實現了我們文章開頭所描述的日誌系統,利用了轉發器的型別:fanout。

本篇說明了,生產者將訊息傳送至轉發器,轉發器決定將訊息傳送至哪些佇列,消費者繫結佇列獲取訊息。