1. 程式人生 > 實用技巧 >RabbitMQ總體介紹

RabbitMQ總體介紹

歷史-從開始到現在

RabbitMQ是一個Erlang開發的AMQP(Advanced Message Queuing Protocol )的開源實現。AMQP 的出現其實也是應了廣大人民群眾的需求,雖然在同步訊息通訊的世界裡有很多公開標準(如 Cobar)的 IIOP ,或者是 SOAP 等),但是在非同步訊息處理中卻不是這樣,只有大企業有一些商業實現(如微軟的 MSMQ ,IBM 的 WebSphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Red Hat、iMatix 等聯合制定了 AMQP 的公開標準。

RabbitMQ由RabbitMQ Technologies Ltd開發並且提供商業支援的。該公司在2010年4月被SpringSource(VMware的一個部門)收購。在2013年5月被併入Pivotal。其實VMware,Pivotal和EMC本質上是一家的。不同的是,VMware是獨立上市子公司,而Pivotal是整合了EMC的某些資源,現在並沒有上市。

RabbitMQ官網:http://www.rabbitmq.com

一、應用場景

言歸正傳。RabbitMQ,或者說AMQP解決了什麼問題,或者說它的應用場景是什麼?

對於一個大型的軟體系統來說,它會有很多的元件或者說模組,又或者說子系統。那這些模組又如何通訊?這和傳統的IPC有很大的區別。傳統的IPC很多都是在單一系統上的,模組耦合性很大,不適合擴充套件(Scalability)。如果使用Socket,那麼不同的模組的確可以部署到不同的機器上,但是還是有很多問題需要解決。比如:

  • 資訊的傳送者和接收者如何維持這個連線,如果一方的連線中斷,這期間的資料是以什麼方式丟失?

  • 如何降低傳送者和接收者的耦合度?

  • 如何讓Priority高的接收者先接到資料?

  • 如何做到Load Balance?有效均衡接收者的負載?

  • 如何有效的將資料傳送到相關的接收者?也就是說將接收者subscribe 不同的資料,如何做有效的filter。

  • 如何做到可擴充套件,甚至將這個通訊模組發到cluster上?

  • 如何保證接收者接收到了完整,正確的資料?

AMQP協議解決了以上的問題,而RabbitMQ實現了AMQP。

二、系統架構

RabbitMQ Server

也叫Broker Server,它不是運送食物的卡車,而是一種傳輸服務。原話是RabbitMQ isn't a food truck, it's a delivery service. 它的角色就是維護一條從Producer到Consumer的路線,保證資料能夠按照指定的方式進行傳輸。雖然這個保證也不是100%的保證,但是對於普通的應用來說這已經足夠了。當然對於商業系統來說,可以再做一層資料一致性的guard,就可以徹底保證系統的一致性了。

Client P

也叫Producer,資料的傳送方。Create messages and publish (send) them to a Broker Server (RabbitMQ)。一個Message有兩個部分:payload(有效載荷)和label(標籤)。payload顧名思義就是傳輸的資料。label是exchange的名字或者說是一個tag,它描述了payload,而且RabbitMQ也是通過這個label來決定把這個Message發給哪個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規則。

Client C

也叫Consumer,資料的接收方。Consumers attach to a Broker Server (RabbitMQ) and subscribe to a queue。把queue比作是一個有名字的郵箱。當有Message到達某個郵箱後,RabbitMQ把它傳送給它的某個訂閱者即Consumer。當然可能會把同一個Message傳送給很多的Consumer。在這個Message中,只有payload,label已經被刪掉了。對於Consumer來說,它是不知道誰傳送的這個資訊的,就是協議本身不支援。當然了,如果Producer傳送的payload包含了Producer的資訊就另當別論了。

對於一個數據從Producer到Consumer的正確傳遞,還有三個概念需要明確:exchanges, queues and bindings。

  • Exchanges are where producers publish their messages.

  • Queues are where the messages end up and are received by consumers.

  • Bindings are how the messages get routed from the exchange to particular queues.

還有幾個概念是上述圖中沒有標明的,那就是Connection(連線)和Channel(通道,頻道)。

Connection

就是一個TCP的連線。Producer和Consumer都是通過TCP連線到RabbitMQ Server的。以後我們可以看到,程式的起始處就是建立這個TCP連線。

Channel

虛擬連線。它建立在上述的TCP連線中。資料流動都是在Channel中進行的。也就是說,一般情況是程式起始建立TCP連線,第二步就是建立這個Channel。

那麼,為什麼使用Channel,而不是直接使用TCP連線?

對於OS來說,建立和關閉TCP連線是有代價的,頻繁的建立關閉TCP連線對於系統的效能有很大的影響,而且TCP的連線數也有限制,這也限制了系統處理高併發的能力。但是,在TCP連線中建立Channel是沒有上述代價的。對於Producer或者Consumer來說,可以併發的使用多個Channel進行Publish或者Receive。有實驗表明,1s的資料可以Publish10K的資料包。當然對於不同的硬體環境,不同的資料包大小這個資料肯定不一樣,但是我只想說明,對於普通的Consumer或者Producer來說,這已經足夠了。如果不夠用,你考慮的應該是如何細化SPLIT你的設計。

相關定義:

  • Broker: 簡單來說就是訊息佇列伺服器實體

  • Exchange: 訊息交換機,它指定訊息按什麼規則,路由到哪個佇列

  • Queue: 訊息佇列載體,每個訊息都會被投入到一個或多個佇列

  • Binding: 繫結,它的作用就是把exchange和queue按照路由規則繫結起來

  • Routing Key: 路由關鍵字,exchange根據這個關鍵字進行訊息投遞

  • VHost: 虛擬主機,一個broker裡可以開設多個vhost,用作不同使用者的許可權分離。

  • Producer: 訊息生產者,就是投遞訊息的程式

  • Consumer: 訊息消費者,就是接受訊息的程式

  • Channel: 訊息通道,在客戶端的每個連線裡,可建立多個channel,每個channel代表一個會話任務

由Exchange、Queue、RoutingKey三個才能決定一個從Exchange到Queue的唯一的線路。

三、基本概念

Connection Factory、Connection、Channel都是RabbitMQ對外提供的API中最基本的物件。Connection是RabbitMQ的socket連結,它封裝了socket協議相關部分邏輯。Connection Factory則是Connection的製造工廠。

Channel是我們與RabbitMQ打交道的最重要的一個介面,我們大部分的業務操作是在Channel這個介面中完成的,包括定義Queue、定義Exchange、繫結Queue與Exchange、釋出訊息等。

Queue

Queue(佇列)是RabbitMQ的內部物件,用於儲存訊息,如下圖表示。

RabbitMQ中的訊息都只能儲存在Queue中,生產者(下圖中的P)生產訊息並最終投遞到Queue中,消費者(下圖中的C)可以從Queue中獲取訊息並消費。

多個消費者可以訂閱同一個Queue,這時Queue中的訊息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的訊息並處理。

Message acknowledgment

在實際應用中,可能會發生消費者收到Queue中的訊息,但沒有處理完成就宕機(或出現其他意外)的情況,這種情況下就可能會導致訊息丟失。為了避免這種情況發生,我們可以要求消費者在消費完訊息後傳送一個回執給RabbitMQ,RabbitMQ收到訊息回執(Message acknowledgment)後才將該訊息從Queue中移除。

如果RabbitMQ沒有收到回執並檢測到消費者的RabbitMQ連線斷開,則RabbitMQ會將該訊息傳送給其他消費者(如果存在多個消費者)進行處理。這裡不存在timeout,一個消費者處理訊息時間再長也不會導致該訊息被髮送給其他消費者,除非它的RabbitMQ連線斷開。

這裡會產生另外一個問題,如果我們的開發人員在處理完業務邏輯後,忘記傳送回執給RabbitMQ,這將會導致嚴重的bug——Queue中堆積的訊息會越來越多。消費者重啟後會重複消費這些訊息並重復執行業務邏輯。

另外publish message 是沒有ACK的。

Message durability

如果我們希望即使在RabbitMQ服務重啟的情況下,也不會丟失訊息,我們可以將Queue與Message都設定為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ訊息不會丟失。但依然解決不了小概率丟失事件的發生(比如RabbitMQ伺服器已經接收到生產者的訊息,但還沒來得及持久化該訊息時RabbitMQ伺服器就斷電了),如果我們需要對這種小概率事件也要管理起來,那麼我們要用到事務。由於這裡僅為RabbitMQ的簡單介紹,所以這裡將不講解RabbitMQ相關的事務。

Prefetch count

前面我們講到如果有多個消費者同時訂閱同一個Queue中的訊息,Queue中的訊息會被平攤給多個消費者。這時如果每個訊息的處理時間不同,就有可能會導致某些消費者一直在忙,而另外一些消費者很快就處理完手頭工作並一直空閒的情況。我們可以通過設定Prefetch count來限制Queue每次傳送給每個消費者的訊息數,比如我們設定prefetchCount=1,則Queue每次給每個消費者傳送一條訊息;消費者處理完這條訊息後Queue會再給該消費者傳送一條訊息。

Exchange

在上一節我們看到生產者將訊息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生產者將訊息傳送到Exchange(交換器,下圖中的X),由Exchange將訊息路由到一個或多個Queue中(或者丟棄)。

Exchange是按照什麼邏輯將訊息路由到Queue的?這個將在Binding一節中介紹。

RabbitMQ中的Exchange有四種類型,不同的型別有著不同的路由策略,這將在Exchange Types一節介紹。

Routing Key

生產者在將訊息傳送給Exchange的時候,一般會指定一個Routing Key,來指定這個訊息的路由規則,而這個Routing Key需要與Exchange Type及Binding key聯合使用才能最終生效。

在Exchange Type與Binding key固定的情況下(在正常使用時一般這些內容都是固定配置好的),我們的生產者就可以在傳送訊息給Exchange時,通過指定Routing Key來決定訊息流向哪裡。

RabbitMQ為Routing Key設定的長度限制為255 bytes。

Binding

RabbitMQ中通過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將訊息路由到指定的Queue了。

Binding key

在繫結(Binding)Exchange與Queue的同時,一般會指定一個Binding key。消費者將訊息傳送給Exchange時,一般會指定一個Routing Key。當 Binding key與Routing Key相匹配時,訊息將會被路由到對應的Queue中。這個將在Exchange Types章節會列舉實際的例子加以說明。

在繫結多個Queue到同一個Exchange的時候,這些Binding允許使用相同的Binding key。

Binding key並不是在所有情況下都生效,它依賴於Exchange Type,比如fanout型別的Exchange就會無視Binding key,而是將訊息路由到所有繫結到該Exchange的Queue。

Exchange Types

RabbitMQ常用的Exchange Type有fanout、direct、topic、headers這四種(AMQP規範裡還提到兩種Exchange Type,分別為system與自定義,這裡不予以描述),下面分別進行介紹。

fanout

fanout型別的Exchange路由規則非常簡單,它會把所有傳送到該Exchange的訊息路由到所有與它繫結的Queue中。

上圖中,生產者(P)傳送到Exchange(X)的所有訊息都會路由到圖中的兩個Queue,並最終被兩個消費者(C1與C2)消費。

direct

direct型別的Exchange路由規則也很簡單,它會把訊息路由到那些Binding key與Routing key完全匹配的Queue中。

以上圖的配置為例,我們以routingKey="error"傳送訊息到Exchange,則訊息會路由到Queue1(amqp.gen-S9b…,這是由RabbitMQ自動生成的Queue名稱)和Queue2(amqp.gen-Agl…);如果我們以Routing Key="info"或routingKey="warning"來發送訊息,則訊息只會路由到Queue2。如果我們以其他Routing Key傳送訊息,則訊息不會路由到這兩個Queue中。

topic

前面講到direct型別的Exchange路由規則是完全匹配Binding Key與Routing Key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。topic型別的Exchange在匹配規則上進行了擴充套件,它與direct型別的Exchage相似,也是將訊息路由到Binding Key與Routing Key相匹配的Queue中,但這裡的匹配規則有些不同,它約定:

Routing Key為一個句點號“.”分隔的字串(我們將被句點號". "分隔開的每一段獨立的字串稱為一個單詞),如"stock.usd.nyse"、"nyse.vmw"、"quick.orange.rabbit"。Binding Key與Routing Key一樣也是句點號“. ”分隔的字串。

Binding Key中可以存在兩種特殊字元"*""#",用於做模糊匹配,其中"*"用於匹配一個單詞,"#"用於匹配多個單詞(可以是零個)。

以上圖中的配置為例,routingKey=”quick.orange.rabbit”的訊息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的訊息會路由到Q1,routingKey=”lazy.brown.fox”的訊息會路由到Q2,routingKey=”lazy.pink.rabbit”的訊息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的訊息將會被丟棄,因為它們沒有匹配任何bindingKey。

headers

headers型別的Exchange不依賴於Routing Key與Binding Key的匹配規則來路由訊息,而是根據傳送的訊息內容中的headers屬性進行匹配。

在繫結Queue與Exchange時指定一組鍵值對;當訊息傳送到Exchange時,RabbitMQ會取到該訊息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配Queue與Exchange繫結時指定的鍵值對。如果完全匹配則訊息會路由到該Queue,否則不會路由到該Queue。

該型別的Exchange沒有用到過(不過也應該很有用武之地),所以不做介紹。

RPC

MQ本身是基於非同步的訊息處理,前面的示例中所有的生產者(P)將訊息傳送到RabbitMQ後不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條訊息都不知道)。

但實際的應用場景中,我們很可能需要一些同步處理,需要同步等待服務端將我的訊息處理完成後再進行下一步處理。這相當於RPC(Remote Procedure Call,遠端過程呼叫)。在RabbitMQ中也支援RPC。

RabbitMQ中實現RPC的機制是:

客戶端傳送請求(訊息)時,在訊息的屬性(Message Properties,在AMQP協議中定義了14種properties,這些屬性會隨著訊息一起傳送)中設定兩個值replyTo(一個Queue名稱,用於告訴伺服器處理完成後將通知我的訊息傳送到這個Queue中)和correlationId(此次請求的標識號,伺服器處理完成後需要將此屬性返還,客戶端將根據這個id瞭解哪條請求被成功執行了或執行失敗)。伺服器端收到訊息處理完後,將生成一條應答訊息到replyTo指定的Queue,同時帶上correlationId屬性。客戶端之前已訂閱replyTo指定的Queue,從中收到伺服器的應答訊息後,根據其中的correlationId屬性分析哪條請求被執行了,根據執行結果進行後續業務處理。

四、細節闡明

使用ACK確認Message的正確傳遞

預設情況下,如果Message 已經被某個Consumer正確的接收到了,那麼該Message就會被從Queue中移除。當然也可以讓同一個Message傳送到很多的Consumer。

如果一個Queue沒被任何的Consumer Subscribe(訂閱),當有資料到達時,這個資料會被cache,不會被丟棄。當有Consumer時,這個資料會被立即傳送到這個Consumer。這個資料被Consumer正確收到時,這個資料就被從Queue中刪除。

那麼什麼是正確收到呢?通過ACK。每個Message都要被acknowledged(確認,ACK)。我們可以顯示的在程式中去ACK,也可以自動的ACK。如果有資料沒有被ACK,那麼RabbitMQ Server會把這個資訊傳送到下一個Consumer。

如果這個APP有bug,忘記了ACK,那麼RabbitMQ Server不會再發送資料給它,因為Server認為這個Consumer處理能力有限。而且ACK的機制可以起到限流的作用(Benefitto throttling):在Consumer處理完成資料後傳送ACK,甚至在額外的延時後傳送ACK,將有效的balance Consumer的load。

當然對於實際的例子,比如我們可能會對某些資料進行merge,比如merge 4s內的資料,然後sleep 4s後再獲取資料。特別是在監聽系統的state,我們不希望所有的state實時的傳遞上去,而是希望有一定的延時。這樣可以減少某些IO,而且終端使用者也不會感覺到。

Reject a message

有兩種方式,第一種的Reject可以讓RabbitMQ Server將該Message 傳送到下一個Consumer。第二種是從Queue中立即刪除該Message。

Creating a queue

Consumer和Procuder都可以通過 queue.declare 建立queue。對於某個Channel來說,Consumer不能declare一個queue,卻訂閱其他的queue。當然也可以建立私有的queue。這樣只有APP本身才可以使用這個queue。queue也可以自動刪除,被標為auto-delete的queue在最後一個Consumer unsubscribe後就會被自動刪除。那麼如果是建立一個已經存在的queue呢?那麼不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次建立如果引數和第一次不一樣,那麼該操作雖然成功,但是queue的屬性並不會被修改。

那麼誰應該負責建立這個queue呢?是Consumer,還是Producer?

如果queue不存在,當然Consumer不會得到任何的Message。那麼Producer Publish的Message會被丟棄。所以,還是為了資料不丟失,Consumer和Producer都try to create the queue!反正不管怎麼樣,這個介面都不會出問題。

queue對load balance的處理是完美的。對於多個Consumer來說,RabbitMQ 使用迴圈的方式(round-robin)的方式均衡的傳送給不同的Consumer。

Exchanges

從架構圖可以看出,Procuder Publish的Message進入了Exchange。接著通過"routing keys”, RabbitMQ會找到應該把這個Message放到哪個queue裡。queue也是通過這個routing keys來做的繫結。

有三種類型的Exchanges:direct, fanout,topic。 每個實現了不同的路由演算法(routing algorithm)。

  • Direct exchange:如果 routing key 匹配,那麼Message就會被傳遞到相應的queue中。其實在queue建立時,它會自動的以queue的名字作為routing key來繫結那個exchange。

  • Fanout exchange: 會向響應的queue廣播。

  • Topic exchange:對key進行模式匹配,比如ab可以傳遞到所有ab的queue。

Virtual hosts

每個virtual host本質上都是一個RabbitMQ Server,擁有它自己的queue,exchagne,和bings rule等等。這保證了你可以在多個不同的Application中使用RabbitMQ。