深入理解AMQP協議
文章目錄
一、AMQP 是什麼
AMQP(Advanced Message Queuing Protocol,高階訊息佇列協議)是一個程序間傳遞非同步訊息
二、AMQP模型
工作過程
釋出者(Publisher)釋出訊息(Message),經由交換機(Exchange)。
交換機根據路由規則將收到的訊息分發給與該交換機繫結的佇列(Queue)。
最後 AMQP 代理會將訊息投遞給訂閱了此佇列的消費者,或者消費者按照需求自行獲取。
深入理解
1、釋出者、交換機、佇列、消費者都可以有多個。同時因為 AMQP 是一個網路協議,所以這個過程中的釋出者,消費者,訊息代理 可以分別存在於不同的裝置上。
2、釋出者釋出訊息時可以給訊息指定各種訊息屬性(Message Meta-data)。有些屬性有可能會被訊息代理(Brokers)使用,然而其他的屬性則是完全不透明的,它們只能被接收訊息的應用所使用。
3、從安全形度考慮,網路是不可靠的,又或是消費者在處理訊息的過程中意外掛掉,這樣沒有處理成功的訊息就會丟失。基於此原因,AMQP 模組包含了一個訊息確認(Message Acknowledgements)機制:當一個訊息從佇列中投遞給消費者後,不會立即從佇列中刪除,直到它收到來自消費者的確認回執(Acknowledgement)後,才完全從佇列中刪除。
4、在某些情況下,例如當一個訊息無法被成功路由時(無法從交換機分發到佇列),訊息或許會被返回給釋出者並被丟棄。或者,如果訊息代理執行了延期操作,訊息會被放入一個所謂的死信佇列中。此時,訊息釋出者可以選擇某些引數來處理這些特殊情況。
三、Exchange交換機
交換機是用來發送訊息的 AMQP 實體。
交換機拿到一個訊息之後將它路由給一個或零個佇列。
它使用哪種路由演算法是由交換機型別和繫結(Bindings)規則所決定的。
AMQP 0-9-1 的代理提供了四種交換機:
交換機可以有兩個狀態:持久(durable)、暫存(transient)。
持久化的交換機會在訊息代理(broker)重啟後依舊存在,而暫存的交換機則不會(它們需要在代理再次上線後重新被宣告)。
並不是所有的應用場景都需要持久化的交換機。
預設交換機
預設交換機(default exchange)實際上是一個由訊息代理預先宣告好的沒有名字(名字為空字串)的直連交換機(direct exchange)。
它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每個新建佇列(queue)都會自動繫結到預設交換機上,繫結的路由鍵(routing key)名稱與佇列名稱相同。
舉個栗子:當你聲明瞭一個名為 “search-indexing-online” 的佇列,AMQP 代理會自動將其繫結到預設交換機上,繫結(binding)的路由鍵名稱也是為 “search-indexing-online”。因此,當攜帶著名為 “search-indexing-online” 的路由鍵的訊息被髮送到預設交換機的時候,此訊息會被預設交換機路由至名為 “search-indexing-online” 的佇列中。換句話說,預設交換機看起來貌似能夠直接將訊息投遞給佇列,儘管技術上並沒有做相關的操作。
直連交換機
直連型交換機(direct exchange)是根據訊息攜帶的路由鍵(routing key)將訊息投遞給對應繫結鍵的佇列。直連交換機用來處理訊息的單播路由(unicast routing)(儘管它也可以處理多播路由)。下邊介紹它是如何工作的:
1)將一個佇列繫結到某個交換機上時,賦予該繫結一個繫結鍵(Binding Key),假設為R;
2)當一個攜帶著路由鍵(Routing Key)為R的訊息被髮送給直連交換機時,交換機會把它路由給繫結鍵為R的佇列。
直連交換機的佇列通常是迴圈分發任務給多個消費者(我們稱之為輪詢)。比如說有3個消費者,4個任務。分別分發每個消費者一個任務後,第4個任務又分發給了第一個消費者。綜上,我們很容易得出一個結論,在 AMQP 0-9-1 中,訊息的負載均衡是發生在消費者(consumer)之間的,而不是佇列(queue)之間。
直連型交換機圖例:
當生產者(P)傳送訊息時 Rotuing key=booking 時,這時候將訊息傳送給 Exchange,Exchange 獲取到生產者傳送過來訊息後,會根據自身的規則進行與匹配相應的 Queue,這時發現 Queue1 和 Queue2 都符合,就會將訊息傳送給這兩個佇列。
如果我們以 Rotuing key=create 和 Rotuing key=confirm 傳送訊息時,這時訊息只會被推送到 Queue2 佇列中,其他 Routing Key 的訊息將會被丟棄。
扇型交換機
扇型交換機(funout exchange)將訊息路由給繫結到它身上的所有佇列,而不理會繫結的路由鍵。如果 N 個佇列繫結到某個扇型交換機上,當有訊息傳送給此扇型交換機時,交換機會將訊息的拷貝分別傳送給這所有的 N 個佇列。扇型用來交換機處理訊息的廣播路由(broadcast routing)。
因為扇型交換機投遞訊息的拷貝到所有繫結到它的佇列,所以他的應用案例都極其相似:
- 大規模多使用者線上(MMO)遊戲可以使用它來處理排行榜更新等全域性事件
- 體育新聞網站可以用它來近乎實時地將比分更新分發給移動客戶端
- 分發系統使用它來廣播各種狀態和配置更新
- 在群聊的時候,它被用來分發訊息給參與群聊的使用者。(AMQP 沒有內建 presence 的概念,因此 XMPP 可能會是個更好的選擇)
扇型交換機圖例:
上圖所示,生產者(P)生產訊息 1 將訊息 1 推送到 Exchange,由於 Exchange Type=fanout 這時候會遵循 fanout 的規則將訊息推送到所有與它繫結 Queue,也就是圖上的兩個 Queue 最後兩個消費者消費。
主題交換機
前面提到的 direct 規則是嚴格意義上的匹配,換言之 Routing Key 必須與 Binding Key 相匹配的時候才將訊息傳送給 Queue.
而Topic 的路由規則是一種模糊匹配,可以通過萬用字元滿足一部分規則就可以傳送。
它的約定是:
1)binding key 中可以存在兩種特殊字元 “” 與“#”,用於做模糊匹配,其中 “” 用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)
2)routing key 為一個句點號 “.” 分隔的字串(我們將被句點號 “. ” 分隔開的每一段獨立的字串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
binding key 與 routing key 一樣也是句點號 “.” 分隔的字串
主題交換機圖例:
當生產者傳送訊息 Routing Key=F.C.E 的時候,這時候只滿足 Queue1,所以會被路由到 Queue 中,如果 Routing Key=A.C.E 這時候會被同是路由到 Queue1 和 Queue2 中,如果 Routing Key=A.F.B 時,這裡只會傳送一條訊息到 Queue2 中。
主題交換機擁有非常廣泛的使用者案例。無論何時,當一個問題涉及到那些想要有針對性的選擇需要接收訊息的 多消費者 / 多應用(multiple consumers/applications) 的時候,主題交換機都可以被列入考慮範圍。
使用案例:
- 分發有關於特定地理位置的資料,例如銷售點
- 由多個工作者(workers)完成的後臺任務,每個工作者負責處理某些特定的任務
- 股票價格更新(以及其他型別的金融資料更新)
- 涉及到分類或者標籤的新聞更新(例如,針對特定的運動專案或者隊伍)
- 雲端的不同種類服務的協調
- 分散式架構 / 基於系統的軟體封裝,其中每個構建者僅能處理一個特定的架構或者系統。
頭交換機
headers 型別的 Exchange 不依賴於 routing key 與 binding key 的匹配規則來路由訊息,而是根據傳送的訊息內容中的 headers 屬性進行匹配。
頭交換機可以視為直連交換機的另一種表現形式。但直連交換機的路由鍵必須是一個字串,而頭屬性值則沒有這個約束,它們甚至可以是整數或者雜湊值(字典)等。靈活性更強(但實際上我們很少用到頭交換機)。工作流程:
1)繫結一個佇列到頭交換機上時,會同時繫結多個用於匹配的頭(header)。
2)傳來的訊息會攜帶header,以及會有一個 “x-match” 引數。當 “x-match” 設定為 “any” 時,訊息頭的任意一個值被匹配就可以滿足條件,而當 “x-match” 設定為 “all” 的時候,就需要訊息頭的所有值都匹配成功。
交換機小結
型別名稱 | 路由規則 |
---|---|
Default | 自動命名的直交換機 |
Direct | Routing Key==Binding Key,嚴格匹配 |
Fanout | 把傳送到該 Exchange 的訊息路由到所有與它繫結的 Queue 中 |
Topic | Routing Key==Binding Key,模糊匹配 |
Headers | 根據傳送的訊息內容中的 headers 屬性進行匹配 |
四、Queue佇列
AMQP 中的佇列(queue)跟其他訊息佇列或任務佇列中的佇列是很相似的:它們儲存著即將被應用消費掉的訊息。
佇列屬性
佇列跟交換機共享某些屬性,但是佇列也有一些另外的屬性。
- Name
- Durable(訊息代理重啟後,佇列依舊存在)
- Exclusive(只被一個連線(connection)使用,而且當連線關閉後佇列即被刪除)
- Auto-delete(當最後一個消費者退訂後即被刪除)
- Arguments(一些訊息代理用他來完成類似與 TTL 的某些額外功能)
佇列建立
佇列在宣告(declare)後才能被使用。如果一個佇列尚不存在,宣告一個佇列會建立它。如果宣告的佇列已經存在,並且屬性完全相同,那麼此次宣告不會對原有佇列產生任何影響。如果宣告中的屬性與已存在佇列的屬性有差異,那麼一個錯誤程式碼為 406 的通道級異常就會被丟擲。
佇列持久化
持久化佇列(Durable queues)會被儲存在磁碟上,當訊息代理(broker)重啟的時候,它依舊存在。沒有被持久化的佇列稱作暫存佇列(Transient queues)。並不是所有的場景和案例都需要將佇列持久化。
持久化的佇列並不會使得路由到它的訊息也具有永續性。倘若訊息代理掛掉了,重新啟動,那麼在重啟的過程中持久化佇列會被重新宣告,無論怎樣,只有經過持久化的訊息才能被重新恢復。
五、Consumer消費者
訊息如果只是儲存在佇列裡是沒有任何用處的。被應用消費掉,訊息的價值才能夠體現。在 AMQP 0-9-1 模型中,有兩種途徑可以達到此目的:
1)將訊息投遞給應用 (“push API”)
2)應用根據需要主動獲取訊息 (“pull API”)
使用 push API,應用(application)需要明確表示出它在某個特定佇列裡所感興趣的,想要消費的訊息。如是,我們可以說應用註冊了一個消費者,或者說訂閱了一個佇列。一個佇列可以註冊多個消費者,也可以註冊一個獨享的消費者(當獨享消費者存在時,其他消費者即被排除在外)。
每個消費者(訂閱者)都有一個叫做消費者標籤的識別符號。它可以被用來退訂訊息。消費者標籤實際上是一個字串。
六、訊息機制
訊息確認
消費者應用(Consumer applications) - 用來接受和處理訊息的應用 - 在處理訊息的時候偶爾會失敗或者有時會直接崩潰掉。而且網路原因也有可能引起各種問題。這就給我們出了個難題,AMQP 代理在什麼時候刪除訊息才是正確的?AMQP 0-9-1 規範給我們兩種建議:
1)自動確認模式:當訊息代理(broker)將訊息傳送給應用後立即刪除。(使用 AMQP 方法:basic.deliver 或 basic.get-ok))
2)顯式確認模式:待應用(application)傳送一個確認回執(acknowledgement)後再刪除訊息。(使用 AMQP 方法:basic.ack)
如果一個消費者在尚未傳送確認回執的情況下掛掉了,那 AMQP 代理會將訊息重新投遞給另一個消費者。如果當時沒有可用的消費者了,訊息代理會死等下一個註冊到此佇列的消費者,然後再次嘗試投遞。
拒絕訊息
當一個消費者接收到某條訊息後,處理過程有可能成功,有可能失敗。應用可以向訊息代理表明,本條訊息由於 “拒絕訊息(Rejecting Messages)” 的原因處理失敗了(或者未能在此時完成)。
當拒絕某條訊息時,應用可以告訴訊息代理如何處理這條訊息——銷燬它或者重新放入佇列。
當此佇列只有一個消費者時,請確認不要由於拒絕訊息並且選擇了重新放入佇列的行為而引起訊息在同一個消費者身上無限迴圈的情況發生。
在 AMQP 中,basic.reject 方法用來執行拒絕訊息的操作。但 basic.reject 有個限制:你不能使用它決絕多個帶有確認回執(acknowledgements)的訊息。但是如果你使用的是 RabbitMQ,那麼你可以使用被稱作 negative acknowledgements(也叫 nacks)的 AMQP 0-9-1 擴充套件來解決這個問題。
預取訊息
在多個消費者共享一個佇列的案例中,明確指定在收到下一個確認回執前每個消費者一次可以接受多少條訊息是非常有用的。這可以在試圖批量釋出訊息的時候起到簡單的負載均衡和提高訊息吞吐量的作用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,如果生產應用每分鐘才傳送一條訊息,這說明處理工作尚在執行。)
注意,RabbitMQ 只支援通道級的預取計數,而不是連線級的或者基於大小的預取。
訊息屬性
AMQP 模型中的訊息(Message)物件是帶有屬性(Attributes)的。有些屬性及其常見,以至於 AMQP 0-9-1 明確的定義了它們,並且應用開發者們無需費心思思考這些屬性名字所代表的具體含義。例如:
- Content type(內容型別)
- Content encoding(內容編碼)
- Routing key(路由鍵)
- Delivery mode (persistent or not)
- 投遞模式(持久化 或 非持久化)
- Message priority(訊息優先權)
- Message publishing timestamp(訊息釋出的時間戳)
- Expiration period(訊息有效期)
- Publisher application id(釋出應用的 ID)
有些屬性是被 AMQP 代理所使用的,但是大多數是開放給接收它們的應用直譯器用的。有些屬性是可選的也被稱作訊息頭(headers)。他們跟 HTTP 協議的 X-Headers 很相似。訊息屬性需要在訊息被髮布的時候定義。
訊息主體
AMQP 的訊息除屬性外,也含有一個有效載荷 - Payload(訊息實際攜帶的資料),它被 AMQP 代理當作不透明的位元組陣列來對待。
訊息代理不會檢查或者修改有效載荷。訊息可以只包含屬性而不攜帶有效載荷。它通常會使用類似 JSON 這種序列化的格式資料,為了節省,協議緩衝器和 MessagePack 將結構化資料序列化,以便以訊息的有效載荷的形式釋出。AMQP 及其同行者們通常使用 “content-type” 和 “content-encoding” 這兩個欄位來與訊息溝通進行有效載荷的辨識工作,但這僅僅是基於約定而已。
訊息持久化
訊息能夠以持久化的方式釋出,AMQP 代理會將此訊息儲存在磁碟上。如果伺服器重啟,系統會確認收到的持久化訊息未丟失。
簡單地將訊息傳送給一個持久化的交換機或者路由給一個持久化的佇列,並不會使得此訊息具有持久化性質:它完全取決與訊息本身的持久模式(persistence mode)。將訊息以持久化方式釋出時,會對效能造成一定的影響(就像資料庫操作一樣,健壯性的存在必定造成一些效能犧牲)。
七、其他
連線
AMQP 連線通常是長連線。AMQP 是一個使用 TCP 提供可靠投遞的應用層協議。AMQP 使用認證機制並且提供 TLS(SSL)保護。當一個應用不再需要連線到 AMQP 代理的時候,需要優雅的釋放掉 AMQP 連線,而不是直接將 TCP 連線關閉。
通道
有些應用需要與 AMQP 代理建立多個連線。無論怎樣,同時開啟多個 TCP 連線都是不合適的,因為這樣做會消耗掉過多的系統資源並且使得防火牆的配置更加困難。AMQP 0-9-1 提供了通道(channels)來處理多連線,可以把通道理解成共享一個 TCP 連線的多個輕量化連線。
在涉及多執行緒 / 程序的應用中,為每個執行緒 / 程序開啟一個通道(channel)是很常見的,並且這些通道不能被執行緒 / 程序共享。
一個特定通道上的通訊與其他通道上的通訊是完全隔離的,因此每個 AMQP 方法都需要攜帶一個通道號,這樣客戶端就可以指定此方法是為哪個通道準備的。
虛擬主機
為了在一個單獨的代理上實現多個隔離的環境(使用者、使用者組、交換機、佇列 等),AMQP 提供了一個虛擬主機(virtual hosts - vhosts)的概念。這跟 Web servers 虛擬主機概念非常相似,這為 AMQP 實體提供了完全隔離的環境。當連線被建立的時候,AMQP 客戶端來指定使用哪個虛擬主機。
AMQP 是可擴充套件的
AMQP 0-9-1 擁有多個擴充套件點:
1)定製化交換機型別:可以讓開發者們實現一些開箱即用的交換機型別尚未很好覆蓋的路由方案。例如 geodata-based routing。)
2)交換機和佇列的宣告中可以包含一些訊息代理能夠用到的額外屬性。例如 RabbitMQ 中的 per-queue message TTL 即是使用該方式實現。)
3)特定訊息代理的協議擴充套件。例如 RabbitMQ 所實現的擴充套件。
新的 AMQP 0-9-1 方法類可被引入。)
4)訊息代理可以被其他的外掛擴充套件,例如 RabbitMQ 的管理前端 和 已經被外掛化的 HTTP API。
這些特性使得 AMQP 0-9-1 模型更加靈活,並且能夠適用於解決更加寬泛的問題。
AMQP 0-9-1 客戶端生態系統
AMQP 0-9-1 擁有眾多的適用於各種流行語言和框架的客戶端。其中一部分嚴格遵循 AMQP 規範,提供 AMQP 方法的實現。另一部分提供了額外的技術,方便使用的方法和抽象。有些客戶端是非同步的(非阻塞的),有些是同步的(阻塞的),有些將這兩者同時實現。有些客戶端支援 “供應商的特定擴充套件”(例如 RabbitMQ 的特定擴充套件)。
因為 AMQP 的主要目標之一就是實現互動性,所以對於開發者來講,瞭解協議的操作方法而不是隻停留在弄懂特定客戶端的庫就顯得十分重要。這樣一來,開發者使用不同型別的庫與協議進行溝通時就會容易的多。
參考資料
[1]:RabbitMQ中文文件
[2]:https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html