1. 程式人生 > 實用技巧 >【RabbitMQ】叢集

【RabbitMQ】叢集

叢集

叢集搭建

環境準備

  • 準備三臺虛擬機器,主機名分別為node1、node2、node3
$ vi /etc/hosts

## 新增ip 主機名對映

192.168.0.90 node1
192.168.0.91 node2
192.168.0.81 node3
  • 分別安裝rabbitmq
## 新增erlang 源至yum儲存庫
$ rpm -Uvh https://download.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm

## wget https://packages.erlang-solutions.com/erlang-19.0.4-1.el7.centos.x86_64.rpm
## rpm -Uvh erlang-19.0.4-1.el7.centos.x86_64.rpm

## 安裝erlang
$ yum -y install erlang

## 匯入RabbitMQ源
$ wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.8/rabbitmq-server-3.6.8-1.el7.noarch.rpm
$ rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.8/rabbitmq-server-3.6.8-1.el7.noarch.rpm

## 安裝RabbitMQ公共庫祕鑰
$ rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

## 安裝RabbitMQ
$ yum install rabbitmq-server-3.6.8-1.el7.noarch.rpm

## 新增mq開機自啟
$ chkconfig rabbitmq-server on

## 以守護程序方式後臺執行
$ rabbitmq-server -detached

## 啟動RabbitMQ服務 埠5672
$ service rabbitmq-server start

## 關閉RabbitMQ服務
$ service rabbitmq-server stop
$ rabbitmqctl stop

## 檢視RabbitMQ服務狀態
$ service rabbitmq-server status
$ rabbitmqctl status

## 檢視已開啟外掛
$ rabbitmq-plugins list

## 開啟管理頁面 埠15672
$ rabbitmq-plugins enable rabbitmq_management

## 刪除預設使用者 guest/guest
$ rabbitmqctl delete_user guest
 
## 新增使用者
$ rabbitmqctl add_user  {username} {password}
 
## 設定tag
$ rabbitmqctl set_user_tags {username} administrator
 
## 賦予許可權(最大)
$ rabbitmqctl set_permissions -p / {username} ".*" ".*" ".*"
 
## 檢視確認許可權賦予是否成功
$ rabbitmqctl list_user_permissions {username}

## 修改密碼
$ rabbitmqctl change_password {username} {new_password}

交換金鑰令牌

編輯 RabbitMQ cookie 檔案,以確保各個節點的 cookie 檔案使用的是同一個值。

可以讀取 node1 節點的 cookie 然後將其複製到 node2 node3 節點中 cookie 檔案預設路徑為 /var/lib/rabbitmq/.erlang.cookie。

cookie 相當於金鑰令牌,叢集中的 RabbitMQ 節點需要通過交換金鑰令牌以獲得相互認證,如果節點的金鑰令牌不一致,那麼在配置節點時就會報錯。

配置叢集

配置叢集有三種方式:

  • 通過 rabbitmqctl 工具的方式配置叢集,這種方式也是最常用的方式,下面的演示也將使用rabbitmqctl 進行配置
  • rabbitmq.config 配置檔案配置
  • 通過 rabbitmq-autocluster 外掛配置

任選一個節點(以node1為例)為基準,將另外的節點加入選中節點叢集

## node2 & node3

## 關閉RabbitMQ 應用
$ rabbitmqctl stop_app
## 將節點重置還原到最初狀態。包括從原來所在的叢集中刪除此節點,從管理資料庫中刪除所有的配置資料
$ rabbitmqctl reset
## 將節點加入指定叢集中
$ rabbitmqctl join_cluster rabbit@node1
## 啟動RabbitMQ 應用
$ rabbitmqctl start_app

檢視各節點狀態

$ rabbitmqctl cluster_status

如果關閉了叢集中的所有節點,則需要確保在啟動的時候最後關閉的那個節點是第一個啟動。如果第一個啟動的不是最後關閉的節點,那麼這個節點會等待最後關閉的節點啟動。這個等待時間是30 秒,如果沒有等到,那麼這個先啟動的節點也會失敗。在最新的版本中會有重試機制,預設重試30 秒以等待最後關閉的節點啟動。

如果最後一個關閉的節點最終由於某些異常而無法啟動,則可以通過 rabbitmqctl forget_cluster_node 命令來將此節點剔出當前叢集。

例如,叢集中節點按照 node3 node2 node1 順序關閉,此時如果要啟動叢集,就要先啟動 node1 節點。這裡可以在 node2 節點中執行命令將 node1 節點剔除出當前叢集:

## --offline 引數可以在非執行狀態下將 node1 剝離出當前叢集
$ rabbitmqctl forget_cluster_node rabbit@node1 --offline

如果叢集中的所有節點由於某些非正常因素,比如斷電而關閉,那麼叢集中的節點都會認為還有其他節點在它後面關閉,此時需要呼叫以下命令來啟動一個節點,之後叢集才能正常啟動。

$ rabbitmqctl force_boot

叢集節點型別

在使用 rabbitmqctl cluster_status 命令來檢視叢集狀態時會有 {nodes [{disc, [rabbit@nodel, rabbit@node2, rabbit@node3]}]} 一項資訊,其中的 disc 標註了RabbitMQ 節點的型別。

  • disc 磁碟節點
  • ram 記憶體節點

記憶體節點將所有的佇列、交換器、繫結關係、使用者、許可權和 host 的元資料定義都儲存在記憶體中,而磁碟節點則將這些資訊儲存到磁碟中。

單節點的叢集中必然只有磁碟型別的節點,否則當重啟MQ之後,所有關於系統的配置資訊都會丟失。不過在叢集中,可以選擇配置部分節點為記憶體節點,這樣可以獲得更高的效能。

比如將node2 節點加入node1 節點的時候可以指定node2 節點的型別為記憶體節點(預設磁碟節點):

$ rabbitmqctl join_cluster rabbit@node1 --ram

如果叢集已經搭建好了,可以使用rabbitmqctl change_cluster_node_type {disc ram}命令來切換節點的型別:

$ rabbitmqctl stop_app
## 將node2 從記憶體節點轉換為磁碟節點
$ rabbitmqctl change_cluster_node_type disc
$ rabbitmqctl start app

剔除單個節點

有兩種方式將 node2 剝離出當前叢集:

第一種: 在 node2 節點上執行rabbitmqctl stop_app或者rabbitmqctl stop 命令來關閉RabbitMQ 服務。之後再在 node1 節點或者 node3 節點上執行rabbitmqctl forget_cluster_node rabbit@node2命令將 node1 節點剔除出去。這種方式適合 node2 節點不再執行RabbitMQ 情況。

## node2
$ rabbitmqctl stop_app
## node1 | node3
$ rabbitmqctl forget_cluster_node rabbit@node2 

第二種: 在 node2 上執行 rabbitmqctl reset 命令。如果不是由於啟動順序的緣故而不得不刪除一個叢集節點,建議採用這種方式。

$ rabbitmqctl forget_cluster_node rabbit@node1 --offline

rabbitmqctl reset 命令將清空節點的狀態並將其恢復到空白狀態。當重設的節點是叢集中的一部分時,該命令也會和叢集中的磁碟節點進行通訊,告訴它們該節點正在離開叢集。不然叢集會認為該節點出了故障 並期望其最終能夠恢復過來。

叢集節點升級

單節點

如果 RabbitMQ 叢集由單獨的一個節點組成,那麼升級版本很容易,只需關閉原來的服務,然後解壓新的版本再執行即可。不過要確保原節點的 Mnesia 中的資料不被變更,且新節點中的 Mnesia 路徑的指向要與原節點中的相同。或者說保留原節點 Mnesia 資料 然後解壓新版本到相應的目錄,再將新版本的 Mnesia 路徑指向保留的 Mnesia資料的路徑(也可以直接複製保留 Mnesia 資料到新版本中相應的目錄),最後啟動新版本的服務即可。

多節點

如果 RabbitMQ 叢集由多個節點組成,那麼也可以參考單個節點的情形。具體步驟如下:

  1. 關閉所有節點的服務 注意採用 rabbitmqctl stop 命令關閉。
  2. 儲存各個節點的 Mnesia 資料
  3. 解壓新版本的 RabbitMQ 到指定的目錄
  4. 指定新版本的 Mnesia 路徑為步驟2儲存的 Mnesia 資料路徑
  5. 啟動新版本的服務,注意先重啟原版本中最後關閉的那個節點

其中步驟4步驟5可以一起操作,比如執行 RABBITMQ MNESIA BASE=/opt/mnesia rabbitmq-server-detached 命令,其中 /opt/mnesia 為原版本儲存 Mnesia 資料的路徑。

服務日誌

RabbitMQ 的日誌預設存放在$RABBITMQ_HOME/var/log/rabbitmq 資料夾內。在這個資料夾內 RabbitMQ 會建立兩個日誌檔案 RABBITMQ_NODENAME-sasl.log 和 RABBITMQ_NODENAME.log 。

  • RABBITMQ_NODENAME-sasl.log 記錄 Erlang 相關資訊,例如檢視 Erlang 崩潰報告。
  • RABBITMQ_NODENAME.log 記錄 RbbitMQ 應用服務日誌。

單節點故障恢復

RabbitMQ 使用過程中,或多或少都會遇到一些故障,對於叢集層面來說,更多的是單點故障。所謂的單點故障是指叢集中單個節點發生了故障,有可能會引起叢集服務不可用、資料丟失等異常。配置資料節點冗餘(映象佇列)可以有效地防止由於單點故障而降低整個叢集的可用性、可靠性。

單節點故障包括:機器硬體故障、機器掉電、網路異常、服務程序異常。

機器硬體故障

單節點機器硬體故障包括機器硬碟、記憶體、主機板等故障造成的宕機,無法從軟體角度來恢復,此時需要在叢集中的其他節點中執行rabbitmqctl forget_cluster_node {nodename} 命令來將故障節點剔除。

如果之前有客戶端連線到此故障節點上,在故障發生時會有異常報出,此時需要將故障節點的ip地址從連線列表裡刪除,並讓客戶端重新與叢集中的節點建立連線,以恢復整個應用。如果此故障機器修復或者原本有備用機器,那麼也可以選擇性的新增到叢集中。

機器掉電故障

當遇到機器掉電故障,需要等待電源接通之後重啟機器。此時這個機器節點上的 RabbitMQ 處於 stop 狀態,但是此時不要盲目重啟服務,否則可能會引起網路分割槽。

此時同樣需要在其他節點上執行 rabbitmqctl forget_cluster_node {nodename} 命令將此節點從叢集中剔除,然後刪除當前故障機器的 RabbitMQ 中的 Mnesia
資料(相當於重置),然後再重啟 RabbitMQ 服務,最後再將此節點作為一個新的節點加入到當前叢集中。

網路異常

網線鬆動或者網絡卡損壞都會引起網路故障的發生。

  • 對於網線鬆動,無論是徹底斷開,還是“藕斷絲連”,只要它不降速,RabbitMQ 叢集就沒有任何影響,但是為了保險起見,建議先關閉故障機器的 RabbitMQ 程序,然後對網線進行更換或者修復操作,之後再考慮是否重新開啟RabbitMQ 程序。

  • 而網絡卡故障極易引起網路分割槽的發生,如果監控到網絡卡故障而網路分割槽尚未發生時,理應第一時間關閉此機器節點上的 RabbitMQ 程序,在網絡卡修復之前不建議再次開啟,如果己經發生了網路分割槽,可以進行手動恢復網路分割槽。

服務程序異常

對於服務程序異常,如 RabbitMQ 程序非預期終止,需要預先思考相關風險是否在可控範圍之內。如果風險不可控,可以選擇拋棄這個節點。一般情況下,重新啟動 RabbitMQ 服務程序即可。

叢集遷移

元資料重建

元資料重建是指在新的叢集中建立原叢集的佇列、交換器、繫結關係、host 、使用者、許可權和Parameter 等資料資訊。元資料重建之後才可將原叢集中的訊息及客戶端連線遷移過來。

有很多種方法可以重建元資料,比如通過手工建立或者使用客戶端建立。通過人工的方式來整理元資料是極其煩瑣、低效的,且時效性太差,不到萬不得已不建議使用,可以通過 Web 管理介面的方式重建,直接在 Import / export definitions 下載叢集的元資料資訊json檔案。然後匯入新叢集。

這種方式需要考慮三個問題:

1. 如果原叢集突發故障,又或者開啟 RabbitMQ Management 外掛的那個節點機器故障不可修復,就無法匯出原叢集的元資料。

這個問題 很好解決,採取一個通用的備份任務在元資料有變更或者達到某個儲存週期時將最新的元資料配置備份至另一處安全的地方。這樣在遇到需要叢集遷移時,可以獲取到最新的元資料。

2. 如果新舊叢集的 RabbitMQ 版本不一致時會出現異常情況。

比如新建立了3.6.10 版本的叢集,舊叢集版本為3.5.7 ,這兩個版本元資料就不相同。3.5.7 版本中的user 項的內容 3.6.10 版本的加密演算法是不一樣。

這裡可以簡單地在 Shell 控制檯輸入變更密碼的方式來解決這個問題:

$ rabbitmqctl change_password {username} {new_password}

如果還是不能成功上傳元資料,那麼就需要進一步採取措施。首先對於使用者、策略、許可權這種元資料來說內容相對固定,且內容較少,手工重建的代價較小。相反叢集中元資料最多且最複雜的要數佇列、交換器和繫結這三項的內容,這三項內容還涉及其內容的引數設定,如果採用人工重建的方式代價太大,重建元資料的意義其實就在於重建佇列、交換器及繫結這三項的相關資訊。

  • 這裡有個小竅門,可以將3.6.10 的元資料從 queues 這一項前面的內容,包括 rabbit_version 、users、vhosts、permissions、parameters、global_parameters和policies
    這幾項內容複製後替換 3.5.7 版本中的 queues 這一項前面的所有內容然後再儲存。之後將修改
    並儲存過後的 3.5.7 版本的元資料 JSON 檔案上傳到新叢集 3.6.10 版本的 Web 管理介面中,至此就完成了叢集的元資料重建。

3. 如果採用上面的方法將元資料在新叢集上重建,則所有的佇列都只會落到同一個叢集節點上,而其他節點處於空置狀態,這樣所有的壓力將會集中到這單臺節點之上。

處理這個問題,有兩種方式,都是通過程式(或者指令碼)的方式在新叢集上建立元資料,而非簡單地在頁面上上傳元資料檔案而己。

  • 第一種方式是通過 HTTPAPI 介面建立相應的資料
  • 第二種方式是隨機連線叢集中不同的節點的地址,然後再建立佇列。與前一種方式需要節點名稱的列表不同,這裡需要的是節點IP地址列表。

資料遷移和客戶端連線切換

元資料重建為叢集遷移前必要的準備工作,在遷移過程中的主要工作步驟如下:

生產者

首先需要將生產者的客戶端與原 RabbitMQ 叢集的連線斷開,然後再與新的叢集建立新的連線,這樣就可以將新的訊息流轉入到新的叢集中。

消費者

一種是等待原叢集中的訊息全部消費完之後再將連線斷開,然後與新叢集建立連線進行消費作業。可以通過 Web 頁面檢視訊息是否消費完成。也可以通過 rabbitmqctl list_queues name messages messages_ready messages_unacknowledged 命令來檢視是否有未被消費的訊息。

當原叢集服務不可用或者出現故障造成服務質量下降而需要迅速將訊息流切換到新的叢集中時,此時就不能等待消費完原叢集中的訊息,這裡需要及時將消費者客戶端的連線切換到新的叢集中,那麼在原叢集中就會殘留部分未被消費的訊息,此時需要做進一步的處理。如果原叢集損壞,可以等待修復之後將資料遷移到新叢集中,否則會丟失資料。

資料遷移原理

資料遷移的主要原理是先從原叢集中將資料消費出來,然後存入一個快取區中,另一個執行緒讀取快取區中的訊息再發布到新的叢集中完成資料遷移。

RabbitMQ 本身提供的 Federation Shove 外掛都可以實現此功能,確切地說 Shove 外掛更貼近,不過自定義的遷移工具(可以稱之為RabbitMQ ForwarMaker)可以讓遷移系統更加高效、靈活。

自動化遷移

要實現叢集自動化遷移,需要在使用相關資源時就做好一些準備工作,方便在自動化遷移過程中進行無縫切換。

與生產者和消費者客戶端相關的是交換器、佇列及叢集的資訊,如果這種型別的資源發生改變時需要讓客戶端迅速感知,以便進行相應的處理,則可以通過將相應的資源載入到 ZooKeeper 的相應節點中,然後在客戶端為對應的資源節點加入 watcher 來感知變化,當然這個功能使用 etc 或者整合到公司層面的資源配置中心中會更加標準、高效。

如圖所示,將整個 RabbitMQ 叢集資源的使用分為三個部分:客戶端、叢集、 ZooKeeper配置管理。

在叢集中建立元資料資源時都需要在 ZooKeeper 生成相應的配置,比如在 cluster1 叢集中建立交換器 exchange1 之後,需要在 /rmqNode/exchanges 路徑下建立實節點 exchange1 並賦予節點的資料內容為:

cluster=cluster1 # 表示此交換器所在的叢集名稱
exchangeType=direct # 表示此交換器的型別
vhost=vhost1 # 表示此交換器所在的 vhost
username=root # 表示使用者名稱
password=123 # 表示密碼

同樣,在 cluster1 叢集中建立佇列 queue1 之後,需要在 /rmqNode/queues 路徑下建立實節點 queue1 ,並賦予節點的資料內容為:

cluster=cluster1 
bindings=exchange1 # 表示此佇列所繫結的交換器
# 如果有需要,也可以新增一些其他資訊,比如路由鍵等
vhost=vhost1
userni me=root
password=123

對應叢集的資料在 /rmqNode/clusters 路徑下,比如 cluster1 叢集,其對應節點的資料內容包含 IP 地址列表資訊:

ipList=192.168.0.1 ,192.168.0.2, 192.168.0.3 # 叢集中各個節點的IP地址資訊

客戶端程式如果與其上的交換器或者佇列進行互動,那麼需要在相應的 ZooKeeper 節點中新增 watcher ,以便在資料發生變更時進行相應的變更,從而達到自動化遷移的目的。

生產者客戶端在傳送訊息之前需要先連線 ZooKeeper ,然後根據指定的交換器名稱如exchange1 到相應的路徑/rmqNode/exchanges 中尋找 exchange1 的節點,之後再讀取節點中的資料,並同時對此節點新增 watcher 。在節點的資料第一條 “cluster=cluster1” 中找到交換器所在的叢集名稱,然後再從路徑 /rmqNode/clusters 中尋找 cluster1 節點,然後讀取其對應IP 地址列表資訊。這樣整個傳送端所需要的連線串資料(IP地址列表、vhost、usename、password等)都已獲取,接下就可以與 RabbitMQ 叢集 cluster1 建立連線然後傳送資料了。

對於消費者客戶端而言,同樣需要連線ZooKeeper,之後根據指定的佇列名稱(queue1)到相應的路徑 /rmqNode/queues 中尋找 queue1 節點,繼而找到相應的連線串,然後與RabbitMQ 叢集cluster1 建立連線進行消費。當然對 /rmqNode/queues/queue1 節點的 watcher 必不可少。

當cluster1 叢集需要遷移到 cluster2 叢集時,首先需要將 cluster1 叢集中的元資料在 cluster2 叢集中重建。之後通過修改 channel 和 queue 元資料資訊,比如原 cluster1 叢集中有交換器exchange1、exchange2 和佇列 queue1、queue2,現在通過指令碼或者程式將其中的"cluster=cluster1"資料修改為"cluster=cluster2"。客戶端會立刻感知節點的變化,然後迅速關閉當前連線之後再與新叢集 cluster2 建立新的連線後生產和消費訊息,在此切換客戶端連線的過程中是可以保證資料零丟失的。遷移之後,生產者和消費者都會與cluster2 叢集進行互通,此時原 cluster1 叢集中可能還有未被消費完的資料,此時需要使用前文中描述的自定義遷移工具(RabbitMQ ForwarMaker)將cluster1 叢集中未被消費完的資料同步到 cluster2 叢集中。

如果沒有準備 RabbitMQ ForwardMaker 工具,也不想使用 Federation 或者 Shovel 外掛,那麼在變更完交換器相關的 ZooKeeper 中的節點資料之後,需要等待原叢集中的所有佇列都消費完全之後,再將佇列相關的 ZooKeeper 中的節點資料變更,進而使得消費者的連線能夠順利遷移到新的叢集之上。可以通過下面的命令來檢視是否有佇列中的訊息未被消費完:

$ rabbitmqctl list_queues -p / -q | awk '{if($2>0} print $0}'

其他

重置資料

## 刪除原有的資料
$ rm -rf /var/lib/rabbitmq/mnesia/*

## 重啟服務
$ rabbitmq-server -detached

殺程序重啟

## 查詢mq的程序
$ ps -ef | grep rabbitmq

## 將mq的程序殺掉
$ ps -ef | grep rabbitmq | grep -v grep | awk '{print $2}' | xargs kill -9

## 啟動mq
$ rabbitmq-server -detached

## 查詢mq的狀態
$ rabbitmqctl status

跨越叢集界限

RabbitMQ 可以通過3 種方式實現分散式部署:

  • 叢集
  • Federation
  • Shovel

這3 種方式不是互斥的,可以根據需要選擇其中的一種或者以幾種方式的組合來達到分散式部署的目的。Federation 、Shovel 可以為RabbitMQ 的分散式部署提供更高的靈活性,但同時也提高了部署的複雜性。

Federation

Federation 外掛的設計目標是使 RabbitMQ 在不同的 Broker 節點之間進行訊息傳遞而無須建立叢集,該功能在很多場景下都非常有用:

  • Federation 外掛能夠在不同管理域(可能設定了不同的使用者和 vhost ,也可能執行在不同版本的 RabbitMQ Erlang 上)中的 Broker 或者叢集之間傳遞訊息。
  • Federation 外掛基於 AMQP 0-9-1 協議在不同的Broker 之間進行通訊,並設計成能夠容忍不穩定的網路連線情況。
  • 一個Broker 節點中可以同時存在聯邦交換器(或佇列)或者本地交換器(或佇列),只需要對特定的交換器(或佇列)建立 Federation 連線(Federation link)。
  • Federation 需要在 Broker 節點之間建立 O(N^2) 個連線(儘管這是最簡單的使用方式),這也就意味 Federation 在使用時更容易擴充套件。

Federation 外掛可以讓多個交換器或者多個佇列進行聯邦:

  • 一個聯邦交換器(federated exchange)或者一個聯邦佇列(federated queue)接收上游(upstream)的訊息,這裡的上游是指位於其他 Broker 上的交換器或者佇列。
  • 聯邦交換器能夠將原本傳送給上游交換器(upstream exchange)的訊息路由到本地的某個佇列中。
  • 聯邦佇列允許一個本地消費者接收到來自上游佇列(upstream queue)的訊息。

聯邦交換器

假設下圖中 broker1 部署在北京,broker2 部署在上海,而 broker3 部署在廣州,彼此之間相距甚遠,網路延遲是一個不得不面對的問題。

例如:有一個在廣州的業務 ClientA 需要連線broker3 ,井向其中的交換器 exchangeA 傳送訊息,此時的網路延遲很小,ClientA 可以迅速將訊息傳送至 exchangeA 中,就算在開啟了 publisher confirm 機制或者事務機制的情況下,也可以迅速收到確認資訊。此時又有一個在北京的業務ClientB 需要向 exchangeA 傳送訊息,那麼 ClientB broker3 之間有很大的網路延遲,ClientB 將傳送訊息至exchangeA 會經歷一定的延遲,尤其是在開啟了 publisher confirm 機制或者事務機制的情況下,ClientB 會等待很長的延遲時間來接收 broker3 的確認資訊,進而必然造成這條傳送執行緒的效能降低,甚至造成一定程度上的阻塞。

使用 Federation 外掛就可以很好地解決這個問題:

如下圖所示,在 broker3 中為交換器exchangeA(broker3 中的佇列 queueA 通過 "rkA" 與 exchangeA 進行了繫結)與廣州的 broker1 之間建立一條單向的 Federation link 。

此時 Federation 外掛會在 broker1 上會建立一個同名的交換器 exchangeA (這個名稱可以配置,默認同名),同時建立一個內部的交換器 "exchangeA -> broker3 B" ,並通過路由鍵 "rkA" 將這兩個交換器繫結起來。這個交換器"exchangeA -> broker3 B" 名字中的 broker3 是叢集名,可以通過 rabbitmqctl set cluster name {new name} 命令進行修改。

與此同時 Federation 外掛還會在 broker1 上建立一個佇列 "federation: exchangeA -> broker3 B" ,並與交換器 "exchangeA -> broker3 B" 進行繫結。Federation 外掛會在佇列 "federation: exchangeA -> broker3 B" broker3 中的交換器 exchangeA 之間建立一條 AMQP 連線來實時地消費佇列 "federation: exchangeA -> broker3 B" 中的資料。

這些操作都是內部的,對外部業務客戶端來說這條 Federation link 建立在broker1 exchangeA 與broker3 exchangeA 之間。

回到前面的問題,部署在北京的業務 ClientB 可以連線 broker1 並向 exchangeA 傳送訊息,這樣 ClientB 可以迅速傳送完訊息並收到確認資訊,而之後訊息通過 Federation link 轉發到 broker3 交換器 exchangeA,最終訊息會存入與 exchangeA 繫結的佇列 queueA 中,消費者最終可以消費佇列 queueA 中的訊息。經過 Federation link 轉發的訊息會帶有特殊的 headers 性標記。

聯邦佇列

除了聯邦交換器,RabbitMQ 還可以支援聯邦佇列 (federated queue)。聯邦佇列可以在多個 Broker 節點(或者叢集)之間為單個佇列提供均衡負載的功能。一個聯邦佇列可以連線一個或者多個上游佇列 (upstream queue),並從這些上游佇列中獲取訊息以滿足本地消費者消費訊息的需求。

下圖演示了:

  1. 位於兩個 Broker 中的幾個聯邦佇列(灰色)和非聯邦佇列(白色) 佇列 queue1、queue2 原本在 broker2 中,由於某種需求將其配置為 federated queue 並將 broker1 作為 upstream

  2. Federation 外掛會在 broker1 上建立同名的佇列 queue1、queue2,與 broker2 中的佇列 queue1、queue2 分別建立兩條單向獨立的 Federation link

  3. 當有消費者 ClinetA 連線 broker2 並通過Basic.Consume 消費佇列 queue1 (或 queue2) 中的訊息時:

    • 如果佇列 queue1 (或 queue2)本身有若干訊息堆積,那麼 ClientA 直接消費這些訊息,此時 broker2 中的 queue1 (或 queue2)並不會拉取 broker1 中的 queue1 (或 queue2) 的訊息;
    • 如果佇列 queue1 (或 queue2) 中沒有訊息堆積或者訊息被消費完了,那麼它會通過 Federation link 拉取在 broker1 中的上游佇列 queue1 (或queue2) 中的訊息(如果有訊息),然後儲存到本地,之後再被消費者 ClientA 進行消費。

消費者既可以消費 broker2 中的佇列,又可以消費 broker1 中的佇列,Federation 的這種分散式佇列的部署可以提升單個佇列的容量。如果在 broker1 端部署的消費者來不及消費佇列queue1 中的訊息,那麼 broker2 端部署的消費者可以為其分擔消費,也可以達到某種意義上的負載均衡。

與federated exchange 不同,一條訊息可以在聯邦佇列間轉發無限次,如圖中兩個佇列queue 互為聯邦佇列。

Federation 的使用

為了能夠使用 Federation 功能, 需要配置以下兩個內容:

  1. 需要配置一個或多個 upstream,每個 upstream 均定義了到其他節點的 Federation link,這個配置可以通過設定執行時的引數 (Runtime Parameter) 來完成,也可以通過 federation management 外掛來完成。

  2. 需要定義匹配交換器或者佇列的一種/多種策略 (Policy)。

Federation 外掛預設在 RabbitMQ 釋出包中,開啟 Federation 功能:

$ rabbitmq-plugins enable rabbitmq_federation

Federation 內部基於 AMQP 協議拉取資料,所以在開啟 rabbitmq federation 外掛的時候,預設會開啟 amqp_c lient 外掛。如果要開啟 Federation 的管理外掛,需要執行 rabbitmq-plugins enable rabbitmq_federation _management 命令。

注意:

當需要在叢集中使用 Federation 功能的時候,叢集中所有的節點都應該開啟 Federation 外掛。

Shovel

與 Federation 具備的資料轉發功能類似,Shovel 能夠可靠、持續地從一個 Broker 中的佇列(作為源端,即 source)拉取資料並轉發至另一個 Broker 中的交換器(作為目的端,即 destination)作為源端的佇列和作為目的端的交換器可以同時位於同一個 Broker 上,也可以位於不同的 Broker 上。 Shovel 可以翻譯為 "鏟子",這個"鏟子"可以將訊息從一方"挖到"另一方。Shovel 的行為就像優秀的客戶端應用程式能夠負責連線源和目的地、負責訊息的讀寫及負責連線失敗問題的處理。

Shovel 的主要優勢在於:

  • 鬆耦合。Shovel 可以移動位於不同管理域中的 Broker(或者叢集)上的訊息,這些 Broker (或者叢集)可以包含不同的使用者和 vhost,也可以使用不同的 RabbitMQ Erlang 版本。
  • 支援廣域網。Shovel 外掛同樣基於 AMQP 協議 Broker 之間進行通訊,被設計成可以容忍時斷時續的連通情形,並且能夠保證訊息的可靠性。
  • 高度定製。當 Shovel 成功連線後,可以對其進行配置以執行相關的 AMQP 命令。

Shovel 原理

下圖為 Shovel 的結構示意圖:

這裡有兩個 Broker: broker1、broker2,broker1 中有交換器 exchange1 和佇列 queue1,且這兩者通過路由鍵 "rk1" 進行繫結;broker2 中有交換器 exchange2 和佇列 queue2 ,且這兩者通過路由鍵"rk2" 進行繫結。在佇列 queue1 和交換器 exchange2 之間配置一個 Shovel link。

當一條內容為 "shovel test payload" 的訊息從客戶端傳送至交換器 exchange1 的時候,這條訊息會經過圖圖示中的資料流轉最後儲存在佇列 queue2 中。如果在配置 Shovel link 時設定了
add-forward-headers 引數為 true,則在消費到佇列 queue2 中這條訊息的時候會有特殊headers 屬性標記。

通常情況下,使用 shovel 時配置佇列作為源端,交換器作為目的端。同樣可以將佇列配置為目的端,如下圖所示:

雖然看起來佇列 queue2 是通過 Shovel link 直接將訊息轉發至 queue2 ,其實中間也是經由 brokr2 的交換器轉發,只不過這個交換器是預設的交換器而己。

如下圖所示,配置交換器為源端也是可行的。雖然看起來交換器 exchange1 是通過 Shovel link 直接將訊息轉發至exchange2 上的,實際上在 broker1 中會新建一個佇列(名稱由 RabbitMQ 自定義,比如圖中的 "amq.gen-ZwolUsoUchY6a7xaPyrZZH") 並繫結 exchange1,訊息從交換器 exchange1 過來先儲存在這個佇列中,然後 Shovel 再從這個佇列中拉取訊息進而轉發至換器 exchange2。

前面所闡述的 broker1 broker2 中的 exchange1 queue1 exchange2 queue2 都可以在 Shovel 成功連線源端或者目的端 Broker 之後再第一次建立(執行一系列相應的 AMQP 配置宣告時),它們並不一定需要在 Shovel link 建立之前建立。Shovel 可以為源端或者目的端配置多個 Broker 的地址,這樣可以使得源端或者目的端的 Broker 失效後能夠重連到其他 Broker 之上(隨機挑選),可以設定 reconnect_delay 引數以避免由於重連行為導致的網路泛洪,或者可以在重連失敗後直接停止連線。針對源端和目的端的所有配置宣告連成功之後被新發送。

Shovel 使用

Shovel 外掛預設 RabbitMQ 釋出包中,開啟方式:

rabbitmq-plugins enable rabbitmq_shovel

Shovel 內部也是基於 AMQP 協議轉發資料的,所以在開啟 rabbitmq_shovel 外掛的時候也是預設開啟 amqp_client 外掛。

同時,如果要開啟 Shovel 的管理外掛需要執行:

rabbitmq-plugins enable rabbitmq_shovel_management