1. 程式人生 > >RabbitMQ札記-訊息分發策略

RabbitMQ札記-訊息分發策略

今天來學習RabbitMQ的訊息分發策略。

RabbitMQ札記-RabbitMQ入門一文中,我們曾學習過RabbitMQ的概念模型,其中就介紹過訊息分發策略。

Exchange表示交換器,用來接收生產者傳送的訊息並將這些訊息路由給佇列。從圖中可以看出,Procuder釋出的Message進入了Exchange。Exchange通過Routing key與Queue繫結在一起。通過Routing key, RabbitMQ可以得知應該把這個Message放到哪個Queue裡。Exchange分發訊息時根據型別的不同分發策略有區別,目前共五種型別:fanout、direct、topic、headers、x-delayed-message。

fanout。每個發到 fanout 型別交換器的訊息都會分到所有繫結的佇列上去。 direct。如果Routing key匹配, 那麼Message就會被傳遞到相應的Queue中。比如key可以傳遞到Routing key為key的Queue。 topic。對Routing key進行模式匹配,比如key*可以傳遞到Routing key為key1、key2、key3的Queue。

為了理解訊息分發策略,我們將建立一個簡單的日誌系統。它將包含兩種程式,一個Producer和兩個Consumer。Producer將傳送日誌訊息,Consumer將接收並列印訊息。

fanoutx

在前面的文章中,每個Message都是deliver給一個Consumer。今天我們學習如何將一個Message傳遞給多個Consumer,這種模式被稱為“釋出/訂閱

”模式。

分發策略中的fanout就是廣播模式,每個發到 fanout 型別Exchange的訊息都會發到所有繫結的佇列上去。讓我們建立一個這種型別的Exchange,並將其命名為logs:

channel.exchangeDeclare(“logs”,“fanout”);

現在,我們可以釋出Message到名為logs的Exchange了:

channel.basicPublish("logs","",null,message.getBytes());
臨時佇列

截至現在,我們用的queue都是有名字的,如testQueue。但是對於我們將要構建的日誌系統,並不需要有名字的queue。當我們不給queueDeclare()提供引數時,將用一個自動生成的名稱建立一個非持久的,獨佔的自動刪除佇列:

String queueName = channel.queueDeclare().getQueue();

此時queueName是一個隨機的佇列名稱,它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

繫結

我們已經學習瞭如何建立fanout 型別的Exchange和臨時佇列。現在我們需要將兩者繫結起來。

channel.queueBind(queueName,"logs","");
最終版本

Producer.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

	private final static String EXCHANGE_NAME = "logs";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 建立一個到伺服器的連線
		ConnectionFactory factory = new ConnectionFactory();
		factory.setUsername("yst");
		factory.setPassword("yst");
		factory.setHost("192.168.17.64");
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();

		//釋出訊息到我們的exchange,而不是佇列
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

		// 傳送的訊息
		String message = "Hello World.";
		// 發訊息
		channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
		System.out.println("Sent:" + message);

		// 關閉渠道和連線;
		channel.close();
		conn.close();
	}
}

Consumer.java

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

	private final static String EXCHANGE_NAME = "logs";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 建立一個到伺服器的連線
		ConnectionFactory factory = new ConnectionFactory();
		factory.setUsername("yst");
		factory.setPassword("yst");
		factory.setHost("192.168.17.64");
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		//獲取channel繫結的佇列的名字
		String queueName = channel.queueDeclare().getQueue();
		//將queue與exchange繫結
		channel.queueBind(queueName, EXCHANGE_NAME, "");

		System.out.println("Waiting for messages.");

		// 建立佇列消費者
		final DefaultConsumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
					byte[] body) throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("Received:" + message);
			}
		};
		channel.basicConsume(queueName, true, consumer);
	}
}

啟動Consumer的兩個例項C1和C2,再執行Producer傳送訊息,檢視結果 C1:

Waiting for messages.
Received:Hello World.

C2:

Waiting for messages.
Received:Hello World.

從結果中可以看出,兩個Consumer都收到了訊息。從管理平臺中可以看到,logs交換器確實綁定了兩個queue。 MarkdownPhotos/master/CSDNBlogs/RabbitMQ/pubsubDemoResult.png

direct

在上文中,我們建立了一個簡單的日誌系統,它可以將訊息廣播給多個Consumer。在本文中,我們將通過direct模式為其新增一個功能。Consumer將只能訂閱一部分訊息。

direct意為如果Routing key匹配, 那麼Message就會被傳遞到相應的Queue中。比如Routing key為key的Exchange可以分發訊息到Routing key為key的Queue。 MarkdownPhotos/master/CSDNBlogs/RabbitMQ/directExchangeModel.png 從圖中我們可以看到兩個佇列繫結的直接交換。第一個佇列用繫結鍵key1繫結,第二個佇列有兩個繫結,一個繫結鍵為key2,另一個為key3。通過路由鍵key1釋出到Exchange的訊息將被路由到佇列Queue1。通過路由鍵key2或key3釋出到Exchange的訊息將被路由到佇列Queue2。其他訊息將被丟棄。

繫結

繫結是exchange和queue之間的關係。在前面的例子中,我們已經建立了繫結,queueName為佇列的名字,“logs”為Exchange的名字。

channel.queueBind(queueName,"logs","");

繫結可以採用額外的routingKey引數。

channel.queueBind(queueName,"logs",routingKey);

對於fanout的exchange來說,這個引數是被忽略的。

多個繫結

繫結多個佇列是完全合法的。 MarkdownPhotos/master/CSDNBlogs/RabbitMQ/multiBindExchangeModel.png 在這種情況下,直接交換就像廣播一樣,將訊息廣播到所有的匹配佇列。

最終版本

Producer.java

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

	private final static String EXCHANGE_NAME = "direct_logs";
	private final static String ROUTING_KEY = "key1";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 建立一個到伺服器的連線
		ConnectionFactory factory = new ConnectionFactory();
		factory.setUsername("yst");
		factory.setPassword("yst");
		factory.setHost("192.168.17.64");
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();

		//釋出訊息到我們的exchange,而不是佇列
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

		// 傳送的訊息
		String message = "Hello World.";
		// 發訊息
		channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
		System.out.println("Sent:" + ROUTING_KEY + ":" + message);

		// 關閉渠道和連線;
		channel.close();
		conn.close();
	}
}

Consumer.java

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

	private final static String EXCHANGE_NAME = "direct_logs";
	private final static String ROUTING_KEY = "key1";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 建立一個到伺服器的連線
		ConnectionFactory factory = new ConnectionFactory();
		factory.setUsername("yst");
		factory.setPassword("yst");
		factory.setHost("192.168.17.64");
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
		//獲取channel繫結的佇列的名字
		String queueName = channel.queueDeclare().getQueue();
		//將queue與exchange繫結
		channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);

		System.out.println("Waiting for messages.");

		// 建立佇列消費者
		final DefaultConsumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
					byte[] body) throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("Received:" + envelope.getRoutingKey() + "':'" + message);
			}
		};
		channel.basicConsume(queueName, true, consumer);
	}
}

Consumer2.java 將Consumer.java複製一份,命名為Consumer2.java。將其中的ROUTING_KEY改為“key2”。

執行兩個Consumer例項,稱其為C1和C2,執行一個Consumer2例項,稱其為C3。執行Producer傳送訊息。觀察執行結果 C1:

Waiting for messages.
Received:key1':'Hello World.

C2:

Waiting for messages.
Received:key1':'Hello World.

C3:

Waiting for messages.

結果驗證了上述直接交換和多個繫結中的內容。

topic

在上文中,我們改進了我們的日誌系統。我們使用了直接交換,使Consumer支援選擇性地接收日誌。儘管如此,它仍然有侷限性。為了提高日誌系統中的靈活性,我們需要了解更復雜的話題交換。

topic可以對Routing key進行模式匹配,比如key*可以傳遞到Routing key為key1、key2、key3的Queue。Routing key的值是有限制的,它必須是由“.”分隔的單詞列表,比如“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。Routing key最長不能超過255 bytes。

Routing key中可能有兩個特殊字元:

  • *可以代替一個字。
  • #可以代替零個或多個單詞。

請看下面一個例子 MarkdownPhotos/master/CSDNBlogs/RabbitMQ/topicExchangeModel.png 在這裡我們建立了兩個繫結: Queue1 的binding key 是".orange."; Queue2的binding key 是 “..rabbit” 和 “lazy.#”。

  • Queue1 對所有orange顏色的動物感興趣
  • Queue2 對所有的rabbits和所有的lazy的感興趣

比如Routing key是 “quick.orange.rabbit"將會發送到Queue1和Queue2中。訊息"lazy.orange.elephant” 也會發送到Queue1和Queue2。但是"quick.orange.fox" 會發送到Queue1;"lazy.brown.fox"會發送到Queue2。“lazy.pink.rabbit” 也會發送到Queue2,但是儘管兩個routing_key都匹配,它也只是傳送一次。“quick.brown.fox” 會被丟棄。

Topic exchange功能強大,可以轉化為其他的exchange。 如果binding_key 是 “#” ,它會接收所有的Message,不管Routing key是什麼,就像是fanout exchange。 如果 "*“和”#"沒有被使用,那麼topic exchange就變成了direct exchange。

參考資料: