1. 程式人生 > 其它 >[RabbitMQ]AMQP 0-9-1:模型

[RabbitMQ]AMQP 0-9-1:模型

上一篇文章(RabbitMQ:下載 & 安裝)中,我們下載並且安裝了RabbitMQ,並且成功註冊了RabbitMQ服務。本文我們將學習RabbitMQ中最基礎、最重要的概念:AMQP 0-9-1協議模型。

0 前言

要學好一項技術,千萬不要死記硬背那些呼叫API,而是要理解它的執行邏輯。

RabbitMQ的執行邏輯本質上是它所支援的通訊協議。

RabbitMQ支援很多通訊協議,包括AMQP 0-9-1、AMQP 1.0、MQTT和STOMP等。其中,最重要、最常用的是AMQP 0-9-1(預設)。我們只需要充分理解這個協議,就能夠解決日常工作中絕大部分RabbitMQ相關的問題。

AMQP(Advanced Message Queuing Protocol,高階訊息佇列協議)是二進位制訊息協議,即底層傳輸的訊息是二進位制資料。它的版本劃分方式為major-minor[-revision]

major.minor[.revision]。AMQP 0-9-1是AMQP的0-9-1版本(major=0,minor=9,rivision=1)。

從本質上來看,AMQP 0-9-1具備兩個核心功能:

  • 接收訊息。
  • 轉發訊息。

為了實現這兩個功能,AMQP 0-9-1提出了兩方面規範:

  • 在服務端層面,提出了AMQ Model(Advanced Message Queuing Protocol
    Model,高階訊息佇列協議模型):由一系列具備路由/儲存訊息功能的元件,以及元件之間互動的規則組成。
  • 在網路傳輸層面,提出了在AMQ模型下客戶端與服務端以及元件之間互動的規範,具體可以分成兩部分:
    • 方法層(Functional Layer):規範了客戶端與服務端之間、AMQ模型之間的命令。
    • 傳輸層(Transport Layer):規範了二進位制資料(包括命令和訊息)的傳輸格式。

因此,學習AMQP 0-9-1的核心內容可以分成上述三個部分:AMQ 模型、方法和傳輸資料格式。

本文介紹的是第一部分:AMQ 模型。其餘兩部分可檢視後續文章。

1 AMQ模型

再次回顧AMQ Model的定義:由一系列具備路由/儲存訊息功能的元件,以及元件之間互動的規則組成。

其中,核心元件包括:

  • Exchange(交換機)。
  • Message Queue(訊息佇列)。
  • Binding(繫結)

其他重要的基本概念包括:Message、Connection、Channel以及Virtual Host。

訊息中介軟體工作方式是典型的C/S模式,通常將伺服器中執行的訊息中介軟體程式稱為Broker,而釋出和接收訊息的客戶端應用程式分別稱為Publisher和Consumer。

AMQ Model的元件就工作於Broker的訊息中介軟體(如rabbitmq-server)程序中。為了方便統一管理,在建立時需要為這些元件指定對應的Virtual Host(類似於Java中的包)。

因此,AMQ模型的結構圖以及工作流程如下:

  1. Publisher和Consumer使用客戶端工具(如rabbitmq的amqp-client),通過TCP/IP協議連線到指定的Broker。
  2. 服務端或客戶端建立Exchange和Queue,並使用Binding元件進行繫結。
  3. Consumer監聽Queue。
  4. Publisher指定某個Virtual Host下的Exchange和路由規則,傳送訊息。
  5. Broker中對應的Exchange接收到訊息後,根據路由規則將訊息轉發給所繫結的某個/某些Queue。
  6. Queue將訊息推送給正在監聽它的Consumer。

AMQP協議的主要特點是將路由和儲存訊息的功能分給交給Exchange和Message Queue元件,而在AMQP之前的訊息佇列協議通常將路由和儲存訊息定義在同一個功能元件中。這樣做的好處在於可以實現更靈活、多樣和健壯的訊息中介軟體系統。

2 Exchange

2.1 工作流程

Exchange的作用是:接收從Publisher發來的訊息,然後根據路由規則轉發訊息給Message Queue。

需要注意的是,在這個過程中Exchange並不會儲存訊息。

如果能找到匹配的Message Queue,那麼訊息就能夠成功轉發。但是如果匹配失敗,Exchange會根據自身是否存在替補交換機(Alternate Exchange)進行重新分發訊息,也會根據Publisher釋出訊息是否指定為mandatory進行丟棄或返回給Publisher。

同時,為了保證資料安全,如果手動開啟了Publisher確認機制,當訊息被Broker中的Exchange接收時會返回一個確認訊息basic.ack,如果沒有Exchange能夠接收則會響應異常資訊。後續文章會深入討論確認機制的細節。

因此,考慮到所有常見情況,RabbitMQ中Exchange的基本工作流程是:

  1. Broker監聽伺服器的5682(non-ssl)或5671(ssl)埠。
  2. Publisher指定Exchange和路由規則,傳送訊息給Broker。
  3. Broker接收到訊息:
    1. 未找到Exchange:響應Channel級別異常給Publisher,終止流程。
    2. 找到Exchange:分發訊息給對應的Exchange,同時響應basic.ack給Publisher,進行下一步。
  4. Exchange解析路由規則,遍歷Binding列表:
    1. 找到Message Queue:轉發訊息到對應的Message Queue,終止流程。
    2. 未找到Message Queue,判斷是否存在替補交換機:
      1. 存在:重新分發訊息給替補交換機,替補交換機按照自己的方式解析路由規則。
      2. 不存在,判斷mandatory屬性:
        1. false,丟棄訊息,終止流程。
        2. true:返回訊息給Publisher,終止流程。

2.2 交換機型別

Exchange本質上是一種路由演算法,它會分析釋出訊息時指定的routingKey或headers,從Binding列表中找到匹配的Queue進行轉發。

為了適應不同的路由情況,AMQP 0-9-1預先定義了幾種不同型別的Exchange,分別對應不同的路由演算法:

  • Direct Exchange
  • Faout Exchange
  • Topic Exchange
  • Headers Exchange
  • System Exchange

這些交換機型別足以滿足日常工作中的各種場景。

RabbitMQ實現了前四種,並在Broker中預設建立瞭如下Exchange例項:

  • (AMQP default):Direct Exchange,釋出訊息不指定交換機時會預設傳送到該交換機。
  • amq.direct:Direct Exchange。
  • amq.faout:Faout Exchange。
  • amq.headers:Headers Exchange。
  • amq.match:Headers Exchange。
  • amq.rabbitmq.trace:Topic Exchange。
  • amq.topic:Topic Exchange。

1、Direct Exchange

Direct Exchange工作原理:

  1. Queue繫結到Direct Exchange時需要指定routingKey
  2. Publisher釋出訊息給Direct Exchange時需要指定routingKey的值。
  3. Direct Exchange接收到訊息時,會執行如下路由演算法:
    1. 讀取routingKey值。
    2. 遍歷Binding列表,找到所有routingKey匹配的Queue。
    3. 轉發訊息給匹配Queue。

很明顯,上面原理圖中,Direct Exchange會將訊息轉發給routingKey=red的Queue。

特別需要注意的是,Direct Exchange路由演算法的唯一指標是routingKey。同時,它會將訊息轉發給所有匹配的Queue,而不是說找到了一個匹配的Queue就停止遍歷了。

RabbitMQ預設建立了名為 (空字串)的預設Direct Exchange,它會自動與所有Queue繫結,並指定routingKey為佇列名。因此,在向預設交換機發送訊息時可以直接將routingKey指定為佇列名。

此外,日常工作中也習慣將routingKey與佇列名使用相同值,所以容易引起混淆,讓人們誤以為Direct Exchange是根據佇列名進行路由的。

2、Faout Exchange

Faout Exchange工作原理:

  1. Queue繫結到Faout Exchange時不需要指定引數。
  2. Publisher釋出訊息給Faout Exchange時也不需要指定引數。
  3. 遍歷Binding列表,找到與之繫結的Queue。
  4. Faout Exchange會無條件將訊息轉發給所有與它繫結的Queue。

很明顯,上面原理圖中,Faout Exchange會將訊息同時轉發給與它繫結的三個Queue。

特別需要注意的是,Faout Exchange路由演算法沒有路由指標,它會將訊息轉發給所有與它繫結的Queue。

Faout Exchange的原理與計算機網路中的組播類似,通常用於實現釋出/訂閱場景。

3、Topic Exchange

Topic Exchange工作原理:

  1. Queue繫結到Topic Exchange時需要指定routingKey,其值通常為以“.”分隔的多個單詞。使用萬用字元#*進行模糊匹配:
    1. #:匹配零或多個單詞。
    2. *:匹配一個單詞。
  2. Publisher釋出訊息給Topic Exchange時需要指定routingKey,其值為確定值(即沒有萬用字元的概念)。
  3. Topic Exchange接收到訊息時,會執行如下路由演算法:
    1. 讀取routingKey值。
    2. 遍歷Binding列表,找到所有routingKey匹配的Queue。
    3. 轉發訊息給匹配Queue。

很明顯,上面原理圖中,Topic Exchange會將兩條訊息都轉發給routingKey=#.black.*的Queue。

特別需要注意的是,Topic Exchange路由演算法的唯一指標也是routingKey。同時,它會將訊息轉發給所有匹配的Queue,而不是說找到了一個匹配的Queue就停止遍歷了。

將Direct Exchange和Topic Exchange進行對比,可以很明顯地發現:

  • Direct Exchange是低配版的Topic Exchange,routingKey與Queue之間為一對一關係:一個Queue只能接收routingKey唯一對應的訊息。
  • Topic Exchange是高配版的Direct Exchange,routingKey與Queue之間為多對一關係:一個Queue可以接收多種routingKey的訊息。

4、Headers Exchange

Headers Exchange工作原理:

  1. Queue繫結到Headers Exchange時需要指定arguement作為匹配條件,其值為key-value鍵值對。多個key-value鍵值對時,可以使用x-match指定多條件匹配關係:
    1. all:所有key-value鍵值對都要匹配才會進行轉發,即匹配條件之間為“且”的關係。預設值。
    2. any:只要有一個key-value鍵值對匹配就會進行轉發,即匹配條件之間為“或”的關係。
  2. Publisher釋出訊息給Headers Exchange時需要指定headers,此時不需要新增x-match
  3. headers Exchange接收到訊息時,會執行如下路由演算法:
    1. 讀取請求headers
    2. 遍歷Binding列表,讀取繫結arguement
    3. 判斷繫結arguementx-match值:
      1. all或沒有宣告x-match:繫結arguement中所有key-value在請求headers中都存在且匹配則成功,否則失敗。
      2. any:繫結arguement中只要有一個key-value鍵值對在請求headers中存在且匹配就成功,所有繫結arguement的key-value鍵值對在請求headers中都不存在或不匹配才失敗。
    4. 轉發訊息給匹配Queue。

很明顯,上面原理圖中,Headers Exchange會將訊息轉發給bigOrBlackblack佇列。

5、System Exchange

System Exchange的工作原理為:

  1. Publisher向System Exchange傳送routingKey=S的訊息。
  2. System Exchange會將該訊息轉發給名為S的系統服務。

RabbitMQ預設沒有支援該型別交換機,所以在這裡不進行過多講解。

2.3 交換機屬性

通過匯出RabbitMQ的Definitions,我們可以得到Broker中的許多配置資訊,從中我們可以找到交換機資料結構的儲存格式如下:

"exchanges": [
  {
    "name": "test.exchange",
    "vhost": "/",
    "type": "direct",
    "durable": true,
    "auto_delete": false,
    "internal": false,
    "arguments": {
      "alternate-exchange": "amq.direct",
      "testArgumentsName": "testArgumentsValue"
    }
  }
]
  • exchanges:存放交換機例項的陣列,內部每一個物件表示一個交換機例項。
  • name:交換機名字。
  • vhost:交換機所屬Virtual Host。
  • type:交換機型別,RabbitMQ中可選值為directfaouttopicheaders
  • durable:是否可以持久化,可選值為true(持久化)和false(非持久化)。持久化交換機會儲存到本地磁碟,Broker重啟後能獲取原有交換機資料。
  • auto_delete:是否自動刪除,可選值為truefalse
    • true:當該交換機沒有與之繫結的訊息佇列時,會被自動刪除。
    • false:當該交換機沒有與之繫結的訊息佇列時,不會被刪除,仍然可以獨立存在。
  • internal:是否時內部使用的交換機,可選值為truefalse
    • true:內部使用交換機,Publisher不能指定傳送訊息給內部交換機。
    • false:外部使用交換機,Publisher可以將訊息傳送給外部交換機。通常我們宣告的都是外部使用交換機。
  • arguments:可選引數,內部為key-value鍵值對,可用於完成Exchange的特定功能。例如,alternate-exchange可指定替補交換機。

3 Message Queue

3.1 工作流程

Message Queue是FIFO(First In First Out,先進先出)佇列,它的作用是:

  • 接收訊息(from Exchange)
  • 儲存訊息
  • 傳送訊息(to Consumer)

RabbitMQ中Message Queue的基本工作流程是:

  1. Message Queue接收到Exchange轉發的訊息後,將訊息儲存到記憶體/磁碟中。
  2. Consumer通過訂閱/拉取的方式向Message Queue獲取訊息。
  3. Message Queue將佇列頭部訊息複製併發送給Consumer。
  4. 在開啟手動確認模式後,Consumer會響應ack/reject/nack給Message Queue(可以在業務處理之前或之後):
    1. ack:確認接收並處理訊息(Message Queue會刪除佇列中儲存的該訊息)。
    2. reject:拒絕一條訊息。
    3. nack:拒絕一條/多條訊息。
  5. 接收reject/nack響應後,Message Queue會根據響應是否requeue進行下一步處理:
    1. true:不刪除佇列中該訊息,之後可以將該訊息發給另外的Consumer處理。
    2. false:刪除佇列中該訊息,可能會造成訊息丟失。
  6. 接收reject/nack響應後,如果設定x-dead-letter-exchange還可以重新轉發給替補交換機。

3.2 訊息佇列屬性

通過匯出RabbitMQ的Definitions,我們可以得到Broker中的許多配置資訊,從中我們可以找到訊息佇列資料結構的儲存格式如下(其中不包含訊息佇列的內容):

"queues": [
    {
        "name": "testQueue",
        "vhost": "/",
        "durable": true,
        "auto_delete": false,
        "arguments": {
            "x-queue-type": "classic"
        }
    }
]
  • queues:存放訊息佇列例項的陣列,內部每一個物件表示一個訊息佇列例項。

  • name:訊息佇列名字。

  • vhost:訊息佇列所屬Virtual Host。

  • durable:是否可以持久化,可選值為true(持久化)和false(非持久化)。持久化訊息佇列會儲存到本地磁碟,Broker重啟後能獲取原有訊息佇列資料(包括其中的訊息)。

  • exclusive:是否排他,可選值為truefalse

    • true:訊息佇列獨屬於當前Channel,Channel關閉時訊息佇列會被刪除。
    • false:訊息佇列可以多個Channel共享。
  • auto_delete:是否自動刪除,可選值為truefalse

    • true:當沒有Consumer訂閱該訊息佇列時,會被自動刪除。
    • false:當沒有Consumer訂閱該訊息佇列時,不會被刪除,仍然可以獨立存在。
  • arguments:可選引數,內部為key-value鍵值對,可用於完成Message Queue的特定功能。例如:

    • x-message-ttl:沒有被Consumer消費的情況下,訊息能夠在佇列中存活的時間(毫秒)。
    • x-expires:沒有被Consumer訂閱的情況下,訊息佇列能夠存活的時間(毫秒)。
    • x-single-active-consumer:值為true/false,一次只有一個Consumer從佇列中獲取訊息。
    • x-dead-letter-exchange:指定訊息過期或被reject(死信)後,用來重新轉發訊息的交換機的名字。
    • x-dead-letter-routing-key:死信在交換機路由過程中所使用的routingKey,如果沒有指定則使用原先的routingKey
    • x-max-length:訊息佇列最大能存放訊息的數量。
    • x-max-length-bytes:訊息佇列最大能存放訊息的位元組長度。
    • x-max-priority:訊息佇列能提供的最小優先順序。如果沒有指定則不能提供訊息優先順序功能。
    • x-queue-mode:是否為懶模式:
      • lazy:訊息佇列會盡可能將訊息儲存到磁碟,而不是記憶體中。
      • default(未設定):訊息佇列儘可能將訊息儲存到記憶體中,以實現訊息的快速傳送。
    • x-queue-master-locator:設定訊息佇列叢集中主節點的地址。
    • x-queue-type:設定訊息佇列的型別,可選值為classicquorumstream

4 Binding

Exchange和Message Queue並沒有儲存對方的資訊,那麼Exchange在轉發過程中是如何找到正確的Message Queue的呢?這需要藉助Binding元件。

Binding中儲存著sourcedestination屬性,可以將交換機作為訊息源,交換機/訊息佇列作為轉發地址。當交換機路由訊息時,會遍歷Binding陣列,找到source為自身的繫結關係,判斷訊息屬性是否滿足routing_keyarguments進行轉發。

通過匯出RabbitMQ的Definitions,我們可以得到Broker中的許多配置資訊,從中我們可以找到Binding資料結構的儲存格式如下:

"bindings": [
    {
        "source": "amq.headers",
        "vhost": "/",
        "destination": "bigAndBlue",
        "destination_type": "queue",
        "routing_key": "",
        "arguments": {
            "color": "blue",
            "size": "big",
            "x-match": "all"
        }
    }
]
  • source:資料來源,只能是交換機。
  • vhost:繫結關係所屬Virtual Host。。
  • destination:資料轉發地址,可以是交換機或訊息佇列。
  • destination_type:資料轉發地址型別。queue-訊息佇列,exchange-交換機。
  • routing_keyroutingKey
  • arguments:路由引數。

5 Message

AMQP 0-9-1中傳輸資料的基本結構是Frame(幀),分成三類:

  • 方法幀(Method Frame),RabbitMQ伺服器會按照以下步驟執行方法:
    1. 讀取方法幀載荷。
    2. 解析載荷到對應資料結構。
    3. 校驗許可權和引數格式。
    4. 執行方法。
  • 內容幀(Content Frame),封裝具體業務訊息,包含訊息頭幀(content header frame)和訊息體幀(content body frame)。
  • 心跳幀(Heartbeat Frame),維持TCP/IP連線的心跳資料。

6 Connection & Channel

簡單來說,Connection表示客戶端與RabbitMQ伺服器一個TCP連線。

為了對TCP連線進行多路複用,一個Connection內部可以建立多個Channel,用於不同的業務。客戶端中不同Channel在傳送或接收資料時,使用的都是同一個Connection

7 Virtual Host

Virtual Host的作用類似Java中Package(包)的概念,它的作用是:建立名稱空間,用來分隔交換機和訊息佇列。我們可以在不同Virtual Host下宣告同名的交換機或訊息佇列。

Virtual Host本質上只是一個字串:

"vhosts": [
    {
        "name": "/"
    }
]