1. 程式人生 > >RabbitMQ (四) 路由選擇 Routing

RabbitMQ (四) 路由選擇 Routing

               

上一篇部落格我們建立了一個簡單的日誌系統,我們能夠廣播日誌訊息給所有你的接收者,如果你不瞭解,請檢視:RabbitMQ (三) 釋出/訂閱。本篇部落格我們準備給日誌系統新增新的特性,讓日誌接收者能夠訂閱部分訊息。例如,我們可以僅僅將致命的錯誤寫入日誌檔案,然而仍然在控制面板上打印出所有的其他型別的日誌訊息。

1、繫結(Bindings)
在上一篇部落格中我們已經使用過繫結。類似下面的程式碼:
channel.queueBind(queueName, EXCHANGE_NAME, "");
繫結表示轉發器與佇列之間的關係。我們也可以簡單的認為:佇列對該轉發器上的訊息感興趣。
繫結可以附帶一個額外的引數routingKey。為了與避免basicPublish方法(釋出訊息的方法)的引數混淆,我們準備把它稱作繫結鍵(binding key)。下面展示如何使用繫結鍵(binding key)來建立一個繫結:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
繫結鍵的意義依賴於轉發器的型別。對於fanout型別,忽略此引數。
2、直接轉發(Direct exchange)
上一篇的日誌系統廣播所有的訊息給所有的消費者。我們希望可以對其擴充套件,來允許根據日誌的嚴重性進行過濾日誌。例如:我們可能希望把致命型別的錯誤寫入硬碟,而不把硬碟空間浪費在警告或者訊息型別的日誌上。
之前我們使用fanout型別的轉發器,但是並沒有給我們帶來更多的靈活性:僅僅可以愚蠢的轉發。
我們將會使用direct型別的轉發器進行替代。direct型別的轉發器背後的路由轉發演算法很簡單:訊息會被推送至繫結鍵(binding key)和訊息釋出附帶的選擇鍵(routing key)完全匹配的佇列。
圖解:

上圖,我們可以看到direct型別的轉發器與兩個佇列繫結。第一個佇列與繫結鍵orange繫結,第二個佇列與轉發器間有兩個繫結,一個與繫結鍵black繫結,另一個與green繫結鍵繫結。
這樣的話,當一個訊息附帶一個選擇鍵(routing key) orange釋出至轉發器將會被導向到佇列Q1。訊息附帶一個選擇鍵(routing key)black或者green將會被導向到Q2.所有的其他的訊息將會被丟棄。

3、多重繫結(multiple bindings)

使用一個繫結鍵(binding key)繫結多個佇列是完全合法的。如上圖,一個附帶選擇鍵(routing key)的訊息將會被轉發到Q1和Q2。

4、傳送日誌(Emittinglogs)

我們準備將這種模式用於我們的日誌系統。我們將訊息傳送到direct型別的轉發器而不是fanout型別。我們將把日誌的嚴重性作為選擇鍵(routing key)。這樣的話,接收程式可以根據嚴重性來選擇接收。我們首先關注傳送日誌的程式碼:

像以前一樣,我們需要先建立一個轉發器:

channel.exchangeDeclare(EXCHANGE_NAME,"direct");

然後我們準備傳送一條訊息:

channel.basicPublish(EXCHANGE_NAME,severity, null, message.getBytes());

為了簡化程式碼,我們假定‘severity’是‘info’,‘warning’,‘error’中的一個。

5、訂閱

接收訊息的程式碼和前面的部落格的中類似,只有一點不同:我們給我們所感興趣的嚴重性型別的日誌建立一個繫結。

StringqueueName = channel.queueDeclare().getQueue();

for(Stringseverity : argv)

{

channel.queueBind(queueName, EXCHANGE_NAME, severity);

}

6、完整的例項
傳送端:EmitLogDirect.java
package com.zhy.rabbit._04_binding_key;import java.util.Random;import java.util.UUID;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class EmitLogDirectprivate static final String EXCHANGE_NAME = "ex_logs_direct"private static final String[] SEVERITIES = { "info", "warning", "error" }; public static void main(String[] argv) throws java.io.IOException {  // 建立連線和頻道  ConnectionFactory factory = new ConnectionFactory();  factory.setHost("localhost");  Connection connection = factory.newConnection();  Channel channel = connection.createChannel();  // 宣告轉發器的型別  channel.exchangeDeclare(EXCHANGE_NAME, "direct");  //傳送6條訊息  for (int i = 0; i < 6; i++)  {   String severity = getSeverity();   String message = severity + "_log :" + UUID.randomUUID().toString();   // 釋出訊息至轉發器,指定routingkey   channel.basicPublish(EXCHANGE_NAME, severity, null, message     .getBytes());   System.out.println(" [x] Sent '" + message + "'");  }  channel.close();  connection.close(); } /**  * 隨機產生一種日誌型別  *   * @return  */ private static String getSeverity() {  Random random = new Random();  int ranVal = random.nextInt(3);  return SEVERITIES[ranVal]; }}

隨機發送6條隨機型別(routing key)的日誌給轉發器~~接收端:ReceiveLogsDirect.java
package com.zhy.rabbit._04_binding_key;import java.util.Random;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;public class ReceiveLogsDirectprivate static final String EXCHANGE_NAME = "ex_logs_direct"private static final String[] SEVERITIES = { "info", "warning", "error" }; public static void main(String[] argv) throws java.io.IOException,   java.lang.InterruptedException {  // 建立連線和頻道  ConnectionFactory factory = new ConnectionFactory();  factory.setHost("localhost");  Connection connection = factory.newConnection();  Channel channel = connection.createChannel();  // 宣告direct型別轉發器  channel.exchangeDeclare(EXCHANGE_NAME, "direct");  String queueName = channel.queueDeclare().getQueue();  String severity = getSeverity();  // 指定binding_key  channel.queueBind(queueName, EXCHANGE_NAME, severity);  System.out.println(" [*] Waiting for "+severity+" logs. To exit press CTRL+C");  QueueingConsumer consumer = new QueueingConsumer(channel);  channel.basicConsume(queueName, true, consumer);  while (true)  {   QueueingConsumer.Delivery delivery = consumer.nextDelivery();   String message = new String(delivery.getBody());   System.out.println(" [x] Received '" + message + "'");  } } /**  * 隨機產生一種日誌型別  *   * @return  */ private static String getSeverity() {  Random random = new Random();  int ranVal = random.nextInt(3);  return SEVERITIES[ranVal]; }}

接收端隨機設定一個日誌嚴重級別(binding_key)。。。我開啟了3個接收端程式,兩個準備接收error型別日誌,一個接收info型別日誌,然後執行傳送端程式執行結果: [x] Sent 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c'
 [x] Sent 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce'
 [x] Sent 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9'
 [x] Sent 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'
 [x] Sent 'info_log :a6c1bc87-efb0-43eb-8314-8a74c345ed05'
 [x] Sent 'info_log :b6a84b6a-353e-4e88-8c23-c791d93b44be'
------------------------------------------------------------------------------------ [*] Waiting for error logs. To exit press CTRL+C
 [x] Received 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c'
 [x] Received 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce'
 [x] Received 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9'
 [x] Received 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'
------------------------------------------------------------------------------------
 [*] Waiting for error logs. To exit press CTRL+C
 [x] Received 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c'
 [x] Received 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce'
 [x] Received 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9'
 [x] Received 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'
------------------------------------------------------------------------------------
 [*] Waiting for info logs. To exit press CTRL+C
 [x] Received 'info_log :a6c1bc87-efb0-43eb-8314-8a74c345ed05'
 [x] Received 'info_log :b6a84b6a-353e-4e88-8c23-c791d93b44be'
可以看到我們實現了博文開頭所描述的特性,接收者可以自定義自己感興趣型別的日誌。其實文章這麼長就在說:傳送訊息時可以設定routing_key,接收佇列與轉發器間可以設定binding_key,接收者接收與binding_key與routing_key相同的訊息。
           

相關推薦

RabbitMQ 路由選擇 Routing

                上一篇部落格我們建立了一個簡單的日誌系統,我們能夠廣播日誌訊息給所有你的接收者,如果你不瞭解,請檢視:RabbitMQ (三) 釋出/訂閱。本篇部落格我們準備給日誌系統新增新的特性,讓日誌接收者能夠訂閱部分訊息。例如,我們可以僅僅將致命的錯誤寫入日誌檔案,然而仍然在控制面板上打

輕松搞定RabbitMQ——路由選擇

byte[] view 轉發器 ews 磁盤空間 表示 info 直接 net 轉自 http://blog.csdn.net/xiaoxian8023/article/details/48733249 翻譯地址:http://www.rabbitmq.com/tutori

RabbitMQ ——路由

路由 key eve lar enter align queue bit 機會 RabbitMQ(四) ——路由 (轉載請附上本文鏈接——linhxx) 一、概述 路由模式(routing)是交換機不將消息廣播到全部的隊列中,而是采用交換機的另一種模式——direc

RabbitMQ路由選擇

在前篇博文中,我們建立了一個簡單的日誌系統。可以廣播訊息給多個消費者。本篇博文,我們將新增新的特性——我們可以只訂閱部分訊息。比如:我們可以接收Error級別的訊息寫入檔案。同時仍然可以在控制檯列印所有日誌。 Bindings(繫結) 在上一篇部落格中我

學習yaf路由

protect 根據 route 構造 bar mod enc ble function 使用路由 使用路由既可以讓之很復雜,同時也能讓它很簡單,這是歸於你的應用。然而使用一個路由是很簡單的,你可以添加你的路由協議給路由器,這樣就OK了! 不同的路

python採用pika庫使用rabbitmq訊息確認Message acknowledgment

從上篇文章可知,每個工作者,都會依次分配到任務。那麼如果一個工作者,在處理任務的時候掛掉,這個任務就沒有完成,應當交由其他工作者處理。所以應當有一種機制,當一個工作者完成任務時,會反饋訊息。 訊息確認就是當工作者完成任務後,會反饋給rabbitmq 修改receive.py的內容: 1 def c

python採用pika庫使用rabbitmq選擇的接收訊息(exchange type=direct)

RabbitMQ還支援根據關鍵字傳送,即:佇列繫結關鍵字,傳送者將資料根據關鍵字傳送到訊息exchange,exchange根據 關鍵字 判定應該將資料傳送至指定佇列。 1 import pika 2 import sys 3 4 connection = pika.B

排序演算法選擇排序 —— 簡單選擇排序 和 堆排序

1、簡單選擇排序簡單選擇排序思想是:從頭到尾(從後往前也行)遍歷序列,先固定第一個位置的資料,將該位置後面的資料,依次和這個位置的資料進行比較,如果比固定位置的資料大,就交換。這樣,進行一趟排序以後,第一個位置就是最小的數了。然後重複進行,第 2 次遍歷並且比較後,第二個位置

RabbitMQRabbitMQ死信郵箱DLX

DLX,Dead-Letter-Exchange(死信郵箱) 利用DLX,當訊息在一個佇列中變成死信後,它能被重新publish到另一個Exchange,這個Exchange就是DLX。訊息變成死信一向有以下幾種情況: 訊息被拒絕(basic.reject or basi

學習 Laravel 那些坑 路由

位置 5.4的時候,路由還在 app/Http/routes.php 5.6的時候就挪到 app 目錄外的 routes/web.php 對於一個 MVC 框架,如何解析路由是非常重要的問題,這樣變來變去真得好嗎?

spring cloud快速入門教程路由閘道器Zuul

現在服務也統一註冊管理了,配置也統一管理了,我們就可以瘋狂的開發各項微服務了,是不是還覺得少了點什麼?前端怎麼訪問到相應服務?這就用到路由網關了。 路由閘道器就是整個微服務的統一入口,看看第一張的架構圖,專案的前端做成了動靜分離,靜態檔案、html頁面、css檔案和js檔案

iOS路由設計路由設計思路分析

前言 隨著使用者的需求越來越多,對App的使用者體驗也變的要求越來越高。為了更好的應對各種需求,開發人員從軟體工程的角度,將App架構由原來簡單的MVC變成MVVM,VIPER等複雜架構。更換適合業務的架構,是為了後期能更好的維護專案。 但是使用者依舊不滿意,繼續對開發人員提出了更多更高的要求,不

RabbitMQ遠端連線RabbitMQ

為了避免汙染宿主系統環境,於是在虛擬機器中搭建了一個linux環境並且安裝了rabbitmq-server。然後在遠端連線的時候一直連線失敗。官網上面給的例子都是在本地使用系統預設的guest使用者連線的。沒有給出遠端連線的例子,於是閱讀文件發現: When th

開始一個React專案路由例項v4

前言 在開始一個React專案(三)路由基礎(v4)中我大概總結了一下web應用的路由,這一篇我會接著上一篇分享一些例子。 簡單的路由示例 一個最簡單的網站結構是首頁和幾個獨立的二級頁面,假如我們有三個獨立的二級頁面分別為:新聞頁、課程頁、加入我們,路

Java語言基礎選擇結構之 if 語句

在Java中,選擇結構有:if語句和switch語句。 if語句的語法格式1: if(比較表示式) { 語句體; } 它的執行流程: 1.先計算比較表示式的值,看其返回值是true還是fal

RabbitMQ:使用Docker構建RabbitMQ高可用負載均衡叢集

本文使用Docker搭建RabbitMQ叢集,然後使用HAProxy做負載均衡,最後使用KeepAlived實現叢集高可用,從而搭建起來一個完成了RabbitMQ高可用負載均衡叢集。受限於自身條件,本文使用VMware虛擬機器的克隆功能克隆了兩臺伺服器進行操作,僅作為一個demo,開發中可根據實際情況進行調整

訊息中介軟體——RabbitMQ命令列與管控臺的基本操作!

前言 在前面的文章中我們介紹過RabbitMQ的搭建:RabbitMQ的安裝過以及各大主流訊息中介軟體的對比:,本章就主要來介紹下我們之前安裝的管控臺是如何使用以及如何通過命令列進行操作。 1. 命令列操作 1.1 基礎服務的命令操作 rabbitmqctl stop_app:關閉應用 rabbitm

RabbitMq學習筆記—— 訊息路由Routing

//宣告直連交換器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 獲取匿名佇列名稱 String queueName = channel.queueDeclare().getQueue()

【c#】RabbitMQ學習文檔Routing路由

sin chan command 接收 color prop html 記錄 blank 原文:【c#】RabbitMQ學習文檔(四)Routing(路由) (使用Net客戶端)在上一個教程中,我們構建了一個簡單的日誌系統,我們能夠向許多消息接受者廣播發送日誌消息。在本教

路由交換基礎——ACL訪問控制列表

per not 由器 地址 同時 擴展 數據包 而不是 需要 一、ACL1.作用訪問控制列表(Access Control List),是路由器和交換機接口的指令列表,用來控制端口進出的數據包。ACL可以過濾網絡中的流量,是控制訪問的一種網絡技術手段。配置ACL後,可以限制