1. 程式人生 > >理解RabbitMQ中的AMQP-0-9-1模型

理解RabbitMQ中的AMQP-0-9-1模型

前提

之前有個打算在學習RabbitMQ之前,把AMQP詳細閱讀一次,挑出裡面的重點內容。後來找了下RabbitMQ的官方文件,發現了有一篇文件專門介紹了RabbitMQ中實現的AMQP模型部分,於是直接基於此文件和個人理解寫下這篇文章。

AMQP協議

AMQP全稱是Advanced Message Queuing Protocol,它是一個(分散式)訊息傳遞協議,使用和符合此協議的客戶端能夠基於使用和符合此協議的訊息傳遞中介軟體代理(Broker,也就是經紀人,個人感覺叫代理合口一些)進行通訊。AMQP目前已經推出協議1.0,實現此協議的比較知名的產品有StormMQ、RabbitMQ、Apache Qpid等。RabbitMQ實現的AMQP版本是0.9.1,官方文件中也提供了該協議pdf文字下載,有興趣可以翻閱一下。

訊息中介軟體代理的職責

Messaging Broker,這裡稱為訊息中介軟體代理。它的職責是從釋出者(Publisher,或者有些時候稱為Producer,生產者)接收訊息,然後把訊息路由到消費者(Consumer,或者有些時候稱為Listener,監聽者)。

因為訊息中介軟體代理、釋出者客戶端和消費者客戶端都是基於AMQP這一網路訊息協議,所以訊息中介軟體代理、釋出者客戶端和消費者客戶端可以在不同的機器上,從而實現分散式通訊和服務解耦。

訊息中介軟體代理不僅僅提供了訊息接收和訊息路由這兩個基本功能,還有其他高階的特性如訊息持久化功能、監控功能等等。

AMQP-0-9-1在RabbitMQ中的基本模型

AMQP-0-9-1模型的基本檢視是:訊息釋出者訊息釋出到交換器(Exchange)中,交換器的角色有點類似於日常見到的郵局或者信箱。然後,交換器把訊息的副本分發到佇列(Queue)中,分發訊息的時候遵循的規則叫做繫結(Binding)。接著,訊息中介軟體代理向訂閱佇列的消費者傳送訊息(push模式),或者消費者也可以主動從佇列中拉取訊息(fetch/pull模式)。

釋出者在釋出訊息的時候可以指定訊息屬性(訊息元資料),某些訊息元資料可能由訊息中介軟體代理使用,其他訊息元資料對於訊息中介軟體代理而言是不透明的,僅供訊息消費者使用。

由於網路是不可靠的,客戶端可能無法接收訊息或者處理訊息失敗,這個時候訊息中介軟體代理無法感知訊息是否正確傳遞到消費者中,因此AMQP模型提供了訊息確認(Message Acknowledgement)的概念:當訊息傳遞到消費者,消費者可以自動向訊息中介軟體代理確認訊息已經接收成功或者由應用程式開發者選擇手動確認訊息已經接收成功並且向訊息中介軟體代理確認訊息,訊息中介軟體代理只有在接收到該訊息(或者訊息組)的確認通知後才會從佇列中完全刪除該訊息。

在某些情況下,交換器無法正確路由到佇列中,那麼該訊息就會返回給釋出者,或者丟棄,或者如果訊息中介軟體代理實現了"死信佇列(Dead Letter Queue)"擴充套件,訊息會被放置到死信佇列中。訊息釋出者可以選擇使用對應的引數控制路由失敗的處理策略。

交換器和交換器型別

互動器(Exchange)是訊息傳送的第一站目的地,它的作用就是就收訊息並且將其路由到零個或者多個佇列。路由訊息的演算法取決於互動器的型別和路由規則(也就是Binding)。RabbitMQ訊息中介軟體代理支援四種類型的互動器,分別是:

交換器型別 Broker預設預宣告的交換器
Direct (空字串[(AMQP default)])和amq.direct
Fanout amq.fanout
Topic amq.topic
Headers amq.match (和RabbitMQ中的amq.headers)

宣告互動器的時候需要提供一些列的屬性,其中比較重要的屬性如下:

  • Name:互動器的名稱。
  • Type:交換器的型別。
  • Durability:(交換器)持久化特性,如果啟動此特性,則Broker重啟後交換器依然存在,否則交換器會被刪除。
  • Auto-delete:是否自動刪除,如果啟用此特性,當最後一個佇列解除與交換器的繫結關係,交換器會被刪除。
  • Arguments:可選引數,一般配合外掛或者Broker的特性使用。

之所以存在Durability和Auto-delete特性是因為併發所有的場景和用例都要求互動器是持久化的。

Direct交換器

Direct型別的交換器基於訊息路由鍵(RoutingKey)把訊息傳遞到佇列中。Direct交換器是訊息單播路由的理想實現(當然,用於多播路由也可以),它的工作原理如下:

  • 佇列使用路由鍵K繫結到交換器。
  • 當具有路由鍵R的新訊息到達交換器的時候,如果K = R,那麼交換器會把訊息傳遞到佇列中。

預設交換器

預設交換器(Default Exchange)是一種特殊的Direct互動器,它的名稱是空字串(也就是""),它由訊息中介軟體代理預宣告,在RabbitMQ Broker中,它在Web管理介面中的名稱是(AMQP default)。每個新建立的佇列都會繫結到預設交換器,路由鍵就是該佇列的佇列名,也就是所有的佇列都可以通過預設交換器進行訊息投遞,只需要指定路由鍵為相應的佇列名即可。

Fanout交換器

Fanout其實是一個組合單詞,fan也就是扇形,out就是向外發散的意思,Fanout交換器可以想象為"扇形"交換器。Fanout交換器會忽略路由鍵,它會路由訊息到所有繫結到它的佇列。也就是說,如果有N個佇列繫結到一個Fanout交換器,當一個新的訊息釋出到該Fanout交換器,那麼這條新訊息的一個副本會分發到這N個佇列中。Fanout交換器是訊息廣播路由的理想實現。

Topic交換器

Topic交換器基於路由鍵和繫結佇列和交換器的模式進行匹配從而把訊息路由到一個或者多個佇列。繫結佇列和交換器的Topic模式(這個模式串其實就是宣告繫結時候的路由鍵,和訊息釋出的路由鍵並非同一個)一般使用點號(dot,也就是'.')分隔,例如source.target.key,繫結模式支援萬用字元:

  • 符號'#'匹配一個或者多個詞,例如:source.target.#可以匹配source.target.dogesource.target.doge.throwable等等。
  • 符號'*'只能匹配一個詞,例如:source.target.*可以匹配source.target.dogesource.target.throwable等等。

對每一條訊息,Topic交換器會遍歷所有的繫結關係,檢查訊息指定的路由鍵是否匹配繫結關係中的路由鍵,如果匹配,則將訊息推送到相應佇列。

Topic交換器是訊息多播路由的理想實現。

Headers交換器

Headers交換器是一種不常用的交換器,它使用多個屬性進行路由,這些屬性一般稱為訊息頭,它不使用路由鍵進行訊息路由。訊息頭(Message Headers)是訊息屬性(訊息元資料)部分,因此,使用Headers交換器在建立佇列和交換器的繫結關係的時候需要指定一組鍵值對,傳送訊息到Headers交換器時候,需要在訊息屬性中攜帶一組鍵值對作為訊息頭。訊息頭屬性支援匹配規則x-match如下:

  • x-match = all:表示所有的鍵值對都匹配才能接受到訊息。
  • x-match = any:表示只要存在鍵值對匹配就能接受到訊息。

Headers交換器也是忽略路由鍵的,只依賴於訊息屬性中的訊息頭進行訊息路由。

佇列

AMQP 0-9-1模型中的佇列與其他訊息或者任務佇列系統中的佇列非常相似:它們儲存應用程式所使用的訊息。佇列和交換器的基本屬性有類似的地方:

  • Name:佇列名稱。
  • Durable:是否持久化,開啟持久化意味著訊息中介軟體代理重啟後佇列依然存在,否則佇列會被刪除。
  • Exclusive:是否獨佔的,開啟佇列獨佔特性意味著佇列只能被一個連線使用並且連線關閉之後佇列會被刪除。
  • Auto-delete:是否自動刪除,開啟自動刪除特性意味著佇列至少有一個消費者並且最後一個消費者解除訂閱狀態(一般是消費者對應的通道關閉)後佇列會自動刪除。
  • Arguments:佇列引數,一般和訊息中介軟體代理或者外掛的特性相關,如訊息的過期時間(Message TTL)和佇列長度等。

一個佇列只有被宣告(Declare)了才能使用,也就是佇列的第一次宣告就是佇列的建立操作(因為第一次宣告的時候佇列並不存在)。如果使用相同的引數再次宣告已經存在的佇列,那麼此次宣告會不生效(當然也不會出現異常)。但是如果使用不相同的引數再次宣告已經存在的佇列,那麼會丟擲通道級別的異常,異常程式碼是406(PRECONDITION_FAILED)。

佇列名稱

佇列名必須由255位元組(bytes)長度以內的UTF-8編碼字元組成。實現AMQP 0-9-1規範的訊息中介軟體代理具備自動生成隨機佇列名的功能,也就是在宣告佇列的時候,佇列名指定為空字串,那麼訊息中介軟體代理會自動生成一個佇列名,並且在佇列宣告的返回結果中帶上對應的佇列名。

以"amq."開頭的佇列是由訊息中介軟體代理內部生成的,有其特殊的作用,因此不能宣告此類名稱的新佇列,否則會導致通道級別的異常,異常程式碼為403(ACCESS_REFUSED)。

佇列的持久化特性

持久化的佇列會持久化到磁碟中,這種佇列在訊息中介軟體代理重啟後不會被刪除。不開啟持久化特性的佇列稱為瞬時(transient)佇列,並非所有的場景都需要開啟佇列的持久化特性。

佇列的持久化特性並不意味著路由到它上面的訊息是持久化的,也就是佇列的持久化跟訊息的持久化是兩回事。如果息中介軟體代理掛了,它重啟後會重新宣告開啟了持久化特性的佇列,這些佇列中只有使用了訊息持久化特性的訊息會被恢復。

繫結

繫結(Binding)是交換器路由訊息到佇列的規則。例如交換器E可以路由訊息到佇列Q,那麼Q必須通過一定的規則繫結到E。繫結中使用的某些交換器的型別決定了它可以使用可選的路由鍵(RoutingKey)。路由鍵的作用類似於過濾器,可以篩選某些釋出到交換器的訊息路由到目標佇列。

如果釋出的訊息沒有路由到任意一個目標佇列,例如,訊息已經發布到交換器,交換器中沒有任何繫結,這個時候訊息會被丟棄或者返回給釋出者,取決於訊息釋出者釋出訊息時候使用的引數。

消費者

如果佇列只有釋出者生產訊息,那麼是沒有意義的,必須有消費者對訊息進行使用,或者叫這個操作為訊息消費,訊息消費的方式有兩種:

  • 訊息代理中介軟體向消費者推送訊息(推模式,代表方法是basic.consume)。
  • 消費者主動向訊息代理中介軟體拉取訊息(拉模式,代表方法是basic.get)。

使用推模式的情況下,消費者必須指定需要訂閱的佇列。每個佇列可以存在多個消費者,或者僅僅註冊一個獨佔的消費者。

每個消費者(訂閱者)都有一個稱為消費者標籤(consumer tag)的識別符號,消費者標籤是一個字串。通過消費者標籤可以實現取消訂閱的操作。

訊息確認

消費者應用程式有可能在接收和處理訊息的時候崩潰,也有可能因為網路原因導致訊息中介軟體代理投遞訊息到消費者的時候失敗了,這樣就會催生一個問題:AMQP訊息中介軟體代理應該在什麼時候從佇列中刪除訊息?因此,AMQP 0-9-1規範提供了兩種選擇:

  • 訊息中介軟體代理嚮應用程式傳送訊息(使用AMQP方法basic.deliverbasic.get-ok)。
  • 應用程式收到訊息後向訊息中介軟體代理髮送確認(使用AMQP方法basic.ack <= 個人感覺這個地方少寫了basic.nackbasic.reject)

前一種稱為自動確認模型(動作觸發的同時進行了訊息確認),後一種稱為顯式確認模型。顯式確認模型中,需要消費者主動向訊息中介軟體代理進行訊息主動確認,這個訊息主動確認動作的執行時機完全由應用程式控制。訊息主動確認有三種方式:積極確認(ack)、消極確認(nack)和拒絕(reject)。

預取訊息

預取訊息(Prefetching Messages)是一個特性。對於多個消費者共享同一個佇列的情況,能夠告知訊息中介軟體代理在傳送下一個確認之前指定每個消費者一次可以接收訊息的訊息量。這個特性可以理解為簡單的負載均衡技術,在批量釋出訊息的場景下能夠提高吞吐量。

訊息屬性和有效負載

AMQP模型中,訊息具有屬性值。AMQP 0-9-1規範定義了一些常見的屬性,一般開發人員不需要太關注這些屬性:

  • Content type
  • Content encoding
  • Routing key
  • Delivery mode (persistent or not)
  • Message priority
  • Message publishing timestamp
  • Expiration period
  • Publisher application id

這些通用的屬性一般是訊息中介軟體代理使用的,還有可以定製的可選屬性header,形式是鍵值對,類似於HTTP中的請求頭。訊息屬性是在釋出訊息的時候設定的。

AMQP訊息還有一個有效載荷(payload,其實就是訊息資料體),AMQP代理將其視為不透明的位元組陣列,也就是AMQP代理不會檢查或者修改訊息的有效載荷。有些訊息可能只包含屬性而沒有有效負載。通常使用序列化格式(如JSON,Thrift,Protocol Buffers和MessagePack)來序列化和結構化資料,以便將其作為訊息有效負載釋出。在一般約定下,訊息屬性中的Content typeContent encoding一般可以表明其序列化的方式。

訊息釋出支援訊息的持久化特性,訊息持久化特性開啟後,訊息中介軟體代理會把訊息儲存到磁碟中,如果重啟代理訊息也不會丟失。開啟訊息持久化特性將會影響效能,主要是因為涉及到刷盤操作。

AMQP-0-9-1方法

AMQP 0-9-1定義了一些方法,對應了客戶端和訊息中介軟體代理之間互動的一些操作方法,這些操作方法的設計跟面向物件程式語言中的方法沒有任何共同之處。常用的交換器相關的操作方法有:

  • exchange.declare
  • exchange.declare-OK
  • exchange.delete
  • exchange.delete-OK

在邏輯上,上面幾個操作方法在客戶端和訊息中介軟體代理之間的互動如下:

對於佇列,也有類似的操作方法:

  • queue.declare
  • queue.declare-OK
  • queue.delete
  • queue.delete-OK

並非所有的AMQP操作方法都有響應結果操作方法,像訊息釋出方法basic.publish的使用是最廣泛的,此操作方法沒有對應的響應結果操作方法。有些操作方法可能有多個響應結果(操作方法),例如basic.get

連線(Connection)

AMQP的連線(Connection)通常是長期存在的。AMQP是一種使用TCP進行可靠傳遞的應用程式級協議。AMQP連線使用使用者身份驗證,可以使用TLS(SSL)進行保護。當應用程式不再需要連線到AMQP代理時,它應該正常關閉AMQP連線,而不是突然關閉底層TCP連線。

通道(Channel)

某些應用程式需要與AMQP代理程式建立多個連線。但是,不希望同時開啟許多TCP連線,因為這樣做會消耗系統資源並使配置防火牆變得十分困難。通道(Channel)可以認為是"共享一個單獨的TCP連線的輕量級連線",一個AMQP連線可以擁有多個通道。

對於使用了多執行緒處理的應用程式,有一種使用場景十分普遍:每個執行緒開啟一個新的通道使用,這些通道是執行緒間隔離的。

另外,每個特定的通道和其他通道是相互隔離的,每個執行的AMQP操作方法(包括響應)都攜帶一個通道的唯一標識,這樣客戶端就能通過該通道的唯一標識得知操作方法是對應哪個通道發生的。

虛擬主機(Virtual Host)

為了使單個訊息中介軟體代理可以託管多個完全隔離的"環境"(這裡的隔離指的是使用者組、互動器、佇列等),AMQP提供了虛擬主機(Virtual Host)的概念。多個虛擬主機類似於許多主流的Web伺服器的虛擬主機,提供了AMQP元件完全隔離的環境。AMQP客戶端可以在連線訊息中介軟體代理時指定需要連線的虛擬主機。

個人理解

關於Exchange、Queue和Binding

理解RabbitMQ中的AMQP模型,其實從開發者的角度來看,最重要的是Exchange、Queue、Binding三者的關係,這裡談談個人的見解。訊息的釋出第一站總是Exchange,從模型上看,訊息釋出無法直接傳送到佇列中。Exchange本身不儲存訊息,它在接收到訊息之後,會基於路由規則也就是Binding,把訊息路由到目標Queue中。從實際操作來看,宣告路由規則總是在釋出訊息和消費訊息之前,也就是一般步驟如下:

  • 1、宣告Exchange。
  • 2、宣告Queue。
  • 3、基於Exchange和Queue宣告Binding,這個過程有可能自定義一個RoutingKey。
  • 4、通過Exchange訊息釋出,這個過程有可能使用到上一步定義的RoutingKey。
  • 5、通過Queue消費訊息。

我們最關注的兩個階段,訊息釋出和訊息消費中,訊息釋出實際上只跟Exchange有關,而訊息消費實際上只跟Queue有關。Binding實際上就是Exchange和Queue的契約關係,會直接影響訊息釋出階段的訊息路由。那麼,路由失敗一般是什麼情況導致的?路由失敗,其實就是訊息已經發布到Exchange,而Exchange中從既有的Binding中無法找到存在的目標Queue用於傳遞訊息副本(一般而言,很少人會發送訊息到一個不存在的Exchange)。訊息路由失敗,從理解AMQP的模型來看,可以從根本上避免的,除非是訊息釋出者故意胡亂使用或者人為錯誤使用了未存在的RoutingKey、Exchange或者說是Binding關係而導致的。

關於Exchange的型別

AMQP-0-9-1模型中支援了四種交換器direct(單播)、fanout(廣播)、topic(多播)、headers,實際上,從使用者角度來看,四種交換器的功能是可以相互取代的。例如可以使用fanout型別交換器實現廣播,其實使用direct型別交換器也是可以實現廣播的,只是對應的direct型別交換器需要通過多個路由鍵繫結到多個目標佇列中。在面對生產環境的技術選型的時候,我們需要考慮效能、維護難度、合理性等角度去考慮選擇什麼型別的交換器,就上面的廣播訊息的例子,顯然使用fanout型別交換器可以避免宣告多個繫結關係,這樣在效能、合理性上是更優的選擇。

關於負載均衡

在AMQP-0-9-1模型中,負載均衡的實現是基於消費者而不是基於佇列(準確來說應該是訊息傳遞到佇列的方式)。實際上,出現訊息生產速度大大超過消費者的消費速度的時候,佇列中有可能會出現訊息積壓。AMQP-0-9-1模型中沒有提供基於佇列負載均衡的特性,也就是出現訊息生產速度大大超過消費者的消費速度時候,並不會把訊息路由到多個佇列中,而是通過預取訊息(Prefetching Messages)的特性,確定訊息者的消費能力,從而調整訊息中介軟體代理推送訊息到對應消費者的數量,這樣就能夠實現消費速度快的消費者能夠消費更多的訊息,減少產生有消費者處於飢餓狀態和有消費者長期處於忙碌狀態的問題。

關於訊息確認機制

AMQP中提供的訊息確認機制主要包括積極確認(一般叫ack,Acknowledgement)、消極確認(一般叫nack,Negative Acknowledgement)和拒絕(reject)。訊息確認機制是保證訊息不丟失的重要措施,當消費者接收到訊息中介軟體代理推送的訊息時候,需要主動通知訊息中介軟體代理訊息已經確認投遞成功,然後訊息中介軟體代理才會從佇列中刪除對應的訊息。沒有主動確認的訊息就會變為"nack"狀態,可以想象為暫存在佇列的"nack區"中,這些訊息不會投遞到消費者,直到消費者重啟後,"nack區"中的訊息會重新變為"ready"狀態,可以重新投遞給消費者。關於訊息確認機制其實場景比較複雜,後面再做一篇文章專門分析。

小結

參考資料:

  • AMQP 0-9-1 Model Explained

個人部落格

  • Throwable's Blog

(本文完 e-a-20181125 c-7-d