RabbitMQ中 exchange、route、queue的關係
從AMQP協議可以看出,MessageQueue、Exchange和Binding構成了AMQP協議的核心,下面我們就圍繞這三個主要元件 從應用使用的角度全面的介紹如何利用Rabbit MQ構建訊息佇列以及使用過程中的注意事項。
-
1. 宣告MessageQueue
在Rabbit MQ中,無論是生產者傳送訊息還是消費者接受訊息,都首先需要宣告一個MessageQueue。這就存在一個問題,是生產者宣告還是消費者宣告呢?要解決這個問題,首先需要明確:
a)消費者是無法訂閱或者獲取不存在的MessageQueue中資訊。
b)訊息被Exchange接受以後,如果沒有匹配的Queue,則會被丟棄。
在明白了上述兩點以後,就容易理解如果是消費者去宣告Queue,就有可能會出現在宣告Queue之前,生產者已傳送的訊息被丟棄的隱患。如果應用能夠通過訊息重發的機制允許訊息丟失,則使用此方案沒有任何問題。但是如果不能接受該方案,這就需要無論是生產者還是消費者,在傳送或者接受訊息前,都需要去嘗試建立訊息佇列。這裡有一點需要明確,如果客戶端嘗試建立一個已經存在的訊息佇列,Rabbit MQ不會做任何事情,並返回客戶端建立成功的。
如果一個消費者在一個通道中正在監聽某一個佇列的訊息,Rabbit MQ是不允許該消費者在同一個channel去宣告其他佇列的。Rabbit MQ中,可以通過queue.declare命令宣告一個佇列,可以設定該佇列以下屬性:
a) Exclusive:排他佇列,如果一個佇列被宣告為排他佇列,該佇列僅對首次宣告它的連線可見,並在連線斷開時自動刪除。這裡需要注意三點:其一,排他佇列是基於連線可見的,同一連線的不同通道是可以同時訪問同一個連線建立的排他佇列的。其二,“首次”,如果一個連線已經聲明瞭一個排他佇列,其他連線是不允許建立同名的排他佇列的,這個與普通佇列不同。其三,即使該佇列是持久化的,一旦連線關閉或者客戶端退出,該排他佇列都會被自動刪除的。這種佇列適用於只限於一個客戶端傳送讀取訊息的應用場景。
b) Auto-delete:自動刪除,如果該佇列沒有任何訂閱的消費者的話,該佇列會被自動刪除。這種佇列適用於臨時佇列。
c) Durable:持久化,這個會在後面作為專門一個章節討論。
d) 其他選項,例如如果使用者僅僅想查詢某一個佇列是否已存在,如果不存在,不想建立該佇列,仍然可以呼叫queue.declare,只不過需要將引數passive設為true,傳給queue.declare,如果該佇列已存在,則會返回true;如果不存在,則會返回Error,但是不會建立新的佇列。
-
2. 生產者傳送訊息
在AMQP模型中,Exchange是接受生產者訊息並將訊息路由到訊息佇列的關鍵元件。ExchangeType和Binding決定了訊息的路由規則。所以生產者想要傳送訊息,首先必須要宣告一個Exchange和該Exchange對應的Binding。可以通過 ExchangeDeclare和BindingDeclare完成。在Rabbit MQ中,宣告一個Exchange需要三個引數:ExchangeName,ExchangeType和Durable。ExchangeName是該Exchange的名字,該屬性在建立Binding和生產者通過publish推送訊息時需要指定。ExchangeType,指Exchange的型別,在RabbitMQ中,有三種類型的Exchange:direct ,fanout和topic,不同的Exchange會表現出不同路由行為。Durable是該Exchange的持久化屬性,這個會在訊息持久化章節討論。宣告一個Binding需要提供一個QueueName,ExchangeName和BindingKey。下面我們就分析一下不同的ExchangeType表現出的不同路由規則。
生產者在傳送訊息時,都需要指定一個RoutingKey和Exchange,Exchange在接到該RoutingKey以後,會判斷該ExchangeType:
a) 如果是Direct型別,則會將訊息中的RoutingKey與該Exchange關聯的所有Binding中的BindingKey進行比較,如果相等,則傳送到該Binding對應的Queue中。
b) 如果是 Fanout 型別,則會將訊息傳送給所有與該 Exchange 定義過 Binding 的所有 Queues 中去,其實是一種廣播行為。
c)如果是Topic型別,則會按照正則表示式,對RoutingKey與BindingKey進行匹配,如果匹配成功,則傳送到對應的Queue中。
-
3. 消費者訂閱訊息
在RabbitMQ中消費者有2種方式獲取佇列中的訊息:
a) 一種是通過basic.consume命令,訂閱某一個佇列中的訊息,channel會自動在處理完上一條訊息之後,接收下一條訊息。(同一個channel訊息處理是序列的)。除非關閉channel或者取消訂閱,否則客戶端將會一直接收佇列的訊息。
b) 另外一種方式是通過basic.get命令主動獲取佇列中的訊息,但是絕對不可以通過迴圈呼叫basic.get來代替basic.consume,這是因為basic.get RabbitMQ在實際執行的時候,是首先consume某一個佇列,然後檢索第一條訊息,然後再取消訂閱。如果是高吞吐率的消費者,最好還是建議使用basic.consume。
如果有多個消費者同時訂閱同一個佇列的話,RabbitMQ是採用迴圈的方式分發訊息的,每一條訊息只能被一個訂閱者接收。例如,有佇列Queue,其中ClientA和ClientB都Consume了該佇列,MessageA到達佇列後,被分派到ClientA,ClientA伺服器收到響應,伺服器刪除MessageA;再有一條訊息MessageB抵達佇列,伺服器根據“迴圈推送”原則,將訊息會發給ClientB,然後收到ClientB的確認後,刪除MessageB;等到再下一條訊息時,伺服器會再將訊息傳送給ClientA。
這裡我們可以看出,消費者再接到訊息以後,都需要給伺服器傳送一條確認命令,這個即可以在handleDelivery裡顯示的呼叫basic.ack實現,也可以在Consume某個佇列的時候,設定autoACK屬性為true實現。這個ACK僅僅是通知伺服器可以安全的刪除該訊息,而不是通知生產者,與RPC不同。 如果消費者在接到訊息以後還沒來得及返回ACK就斷開了連線,訊息伺服器會重傳該訊息給下一個訂閱者,如果沒有訂閱者就會儲存該訊息。
既然RabbitMQ提供了ACK某一個訊息的命令,當然也提供了Reject某一個訊息的命令。當客戶端發生錯誤,呼叫basic.reject命令拒絕某一個訊息時,可以設定一個requeue的屬性,如果為true,則訊息伺服器會重傳該訊息給下一個訂閱者;如果為false,則會直接刪除該訊息。當然,也可以通過ack,讓訊息伺服器直接刪除該訊息並且不會重傳。
-
4. 持久化:
Rabbit MQ預設是不持久佇列、Exchange、Binding以及佇列中的訊息的,這意味著一旦訊息伺服器重啟,所有已宣告的佇列,Exchange,Binding以及佇列中的訊息都會丟失。通過設定Exchange和MessageQueue的durable屬性為true,可以使得佇列和Exchange持久化,但是這還不能使得佇列中的訊息持久化,這需要生產者在傳送訊息的時候,將delivery mode設定為2,只有這3個全部設定完成後,才能保證伺服器重啟不會對現有的佇列造成影響。這裡需要注意的是,只有durable為true的Exchange和durable為ture的Queues才能繫結,否則在繫結時,RabbitMQ都會拋錯的。持久化會對RabbitMQ的效能造成比較大的影響,可能會下降10倍不止。
-
5. 事務:
對事務的支援是AMQP協議的一個重要特性。假設當生產者將一個持久化訊息傳送給伺服器時,因為consume命令本身沒有任何Response返回,所以即使伺服器崩潰,沒有持久化該訊息,生產者也無法獲知該訊息已經丟失。如果此時使用事務,即通過txSelect()開啟一個事務,然後傳送訊息給伺服器,然後通過txCommit()提交該事務,即可以保證,如果txCommit()提交了,則該訊息一定會持久化,如果txCommit()還未提交即伺服器崩潰,則該訊息不會伺服器就收。當然Rabbit MQ也提供了txRollback()命令用於回滾某一個事務。
-
6. Confirm機制:
使用事務固然可以保證只有提交的事務,才會被伺服器執行。但是這樣同時也將客戶端與訊息伺服器同步起來,這背離了訊息佇列解耦的本質。Rabbit MQ提供了一個更加輕量級的機制來保證生產者可以感知伺服器訊息是否已被路由到正確的佇列中——Confirm。如果設定channel為confirm狀態,則通過該channel傳送的訊息都會被分配一個唯一的ID,然後一旦該訊息被正確的路由到匹配的佇列中後,伺服器會返回給生產者一個Confirm,該Confirm包含該訊息的ID,這樣生產者就會知道該訊息已被正確分發。對於持久化訊息,只有該訊息被持久化後,才會返回Confirm。Confirm機制的最大優點在於非同步,生產者在傳送訊息以後,即可繼續執行其他任務。而伺服器返回Confirm後,會觸發生產者的回撥函式,生產者在回撥函式中處理Confirm資訊。如果訊息伺服器發生異常,導致該訊息丟失,會返回給生產者一個nack,表示訊息已經丟失,這樣生產者就可以通過重發訊息,保證訊息不丟失。Confirm機制在效能上要比事務優越很多。但是Confirm機制,無法進行回滾,就是一旦伺服器崩潰,生產者無法得到Confirm資訊,生產者其實本身也不知道該訊息吃否已經被持久化,只有繼續重發來保證訊息不丟失,但是如果原先已經持久化的訊息,並不會被回滾,這樣佇列中就會存在兩條相同的訊息,系統需要支援去重。
-
其他:
Broker:簡單來說就是訊息佇列伺服器實體。
Exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列。
Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列。
Binding:繫結,它的作用就是把exchange和queue按照路由規則繫結起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。
vhost:虛擬主機,一個broker裡可以開設多個vhost,用作不同使用者的許可權分離。
producer:訊息生產者,就是投遞訊息的程式。
consumer:訊息消費者,就是接受訊息的程式。
channel:訊息通道,在客戶端的每個連線裡,可建立多個channel,每個channel代表一個會話任務。
訊息佇列的使用過程大概如下:
(1)客戶端連線到訊息佇列伺服器,開啟一個channel。
(2)客戶端宣告一個exchange,並設定相關屬性。
(3)客戶端宣告一個queue,並設定相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好繫結關係。
(5)客戶端投遞訊息到exchange。
Exchanges, queues, and bindings
exchanges, queues, and bindings是三個基礎的概念, 他們的作用是: exchanges are where producers publish their messages, queues are where the messages end up and are received by consumers, and bindings are how the messages get routed from the exchange to particular queues. 下面我們用一副簡單的思維導圖把上面的概念組織起來: 上面還提到了一個vhost的概念,vhost是為了組織exchanges, queues, and bindings提出的概念,我們就從它開始講起:VHost
Vhosts也是AMQP的一個基礎概念,連線到RabbitMQ預設就有一個名為"/"的vhost可用,本地除錯的時候可以直接使用這個預設的vhost.這個"/"的訪問可以使用guest使用者名稱(密碼guest)訪問.可以使用rabbitmqctl工具修改這個賬戶的許可權和密碼,這在生產環境是必須要關注的. 出於安全和可移植性的考慮,一個vhost內的exchange不能繫結到其他的vhost. 可以按照業務功能組來規劃vhost,在叢集環境中只要在某個節點建立vhost就會在整個叢集內的節點都建立該vhost.VHost和許可權都不能通過AMQP協議建立,在RabbitMQ中都是使用rabbitmqctl進行建立,管理. 如何建立vhost vhost和permission(許可權)資訊是並不是通過AMQP建立而是通過rabbitmqctl工具來新增,管理的. 說完vhost我們就來看看重中之重的訊息:MessageMessage
訊息由兩部分組成: payload and label. "payload"是實際要傳輸的資料,至於資料的格式RabbitMQ並不關心,"label"描述payload,包括exchange name 和可選的topic tag.訊息一旦到了consumer那裡就只有payload部分了,label部分並沒有帶過來.RabbitMQ並不告訴你訊息是誰發出的.這好比你收到一封信但是信封上是空白的.當然想知道是誰發的還是有辦法的,在訊息內容中包含傳送者的資訊就可以了. 訊息的consumer和producer對應的概念是sending和receiving並不對應client和server.通過channel我們可以建立很多並行的傳輸 TCP連結不再成為瓶頸,我們可以把RabbitMQ當做應用程式級別的路由器. Consumer訊息的接收方式 Consumer有兩種方式接收訊息: 通過 basic.consume 訂閱佇列.channel將進入接收模式直到你取消訂閱.訂閱模式下Consumer只要上一條訊息處理完成(ACK或拒絕),就會主動接收新訊息.如果訊息到達queue就希望得到儘快處理,也應該使用basic.consume命令. 還有一種情況,我們不需要一直保持訂閱,只要使用basic.get命令主動獲取訊息即可.當前訊息處理完成之後,繼續獲取訊息需要主動執行basic.get 不要"在迴圈中使用basic.ge"t當做另外一種形式的basic.consume,因為這種做法相比basic.consume有額外的成本:basic.get本質上就是先訂閱queue取回一條訊息之後取消訂閱.Consumer吞吐量大的情況下通常都會使用basic.consume. 要是沒有Consumer怎麼辦? 如果訊息沒有Consumer就會老老實實呆在佇列裡面. 多個Consumer訂閱同一個佇列 只要Consumer訂閱了queue,訊息就會發送到該Consumer.我們的問題是這種情況下queue中的訊息是如何分發的? 如果一個rabbit queue有多個consumer,具體到佇列中的某條訊息只會傳送到其中的一個Consumer. 訊息確認 所有接收到的訊息都要求傳送響應訊息(ACK).這裡有兩種方式一種是Consumer使用basic.ack明確傳送ACK,一種是訂閱queue的時候指定auto_ack為true,這樣訊息一到Consumer那裡RabbitMQ就會認為訊息已經得到ACK. 要注意的是這裡的響應和訊息的傳送者沒有絲毫關係,ACK只是Consumer向RabbitMQ確認訊息已經正確的接收到訊息,RabbitMQ可以安全移除該訊息,僅此而已. 沒有正確響應怎麼辦 如果Consumer接收了一個訊息就還沒有傳送ACK就與RabbitMQ斷開了,RabbitMQ會認為這條訊息沒有投遞成功會重新投遞到別的Consumer.如果你的應用程式崩掉了,你可以設定備用程式來繼續完成訊息的處理. 如果Consumer本身邏輯有問題沒有傳送ACK的處理,RabbitMQ不會再向該Consumer傳送訊息.RabbitMQ會認為這個Consumer還沒有處理完上一條訊息,沒有能力繼續接收新訊息.我們可以善加利用這一機制,如果需要處理過程是相當複雜的,應用程式可以延遲傳送ACK直到處理完成為止.這可以有效控制應用程式這邊的負載,不致於被大量訊息衝擊. 拒絕訊息 由於要拒絕訊息,所以ACK響應訊息還沒有發出,所以這裡拒絕訊息可以有兩種選擇: 1.Consumer直接斷開RabbitMQ 這樣RabbitMQ將把這條訊息重新排隊,交由其它Consumer處理.這個方法在RabbitMQ各版本都支援.這樣做的壞處就是連線斷開增加了RabbitMQ的額外負擔,特別是consumer出現異常每條訊息都無法正常處理的時候. 2. RabbitMQ 2.0.0可以使用 basic.reject 命令,收到該命令RabbitMQ會重新投遞到其它的Consumer.如果設定requeue為false,RabbitMQ會直接將訊息從queue中移除. 其實還有一種選擇就是直接忽略這條訊息併發送ACK,當你明確直到這條訊息是異常的不會有Consumer能處理,可以這樣做拋棄異常資料.為什麼要傳送basic.reject訊息而不是ACK?RabbitMQ後面的版本可能會引入"dead letter"佇列,如果想利用dead letter做點文章就使用basic.reject並設定requeue為false. 訊息持久化 訊息的持久化需要在訊息投遞的時候設定delivery mode值為2.由於訊息實際儲存於queue之中,"皮之不存毛將焉附"邏輯上,訊息持久化同時要求exchange和queue也是持久化的.這是訊息持久化必須滿足的三個條件. 持久化的代價就是效能損失,磁碟IO遠遠慢於RAM(使用SSD會顯著提高訊息持久化的效能) , 持久化會大大降低RabbitMQ每秒可處理的訊息.兩者的效能差距可能在10倍以上. 訊息恢復 consumer從durable queue中取回一條訊息之後併發回了ACK訊息,RabbitMQ就會將其標記,方便後續垃圾回收.如果一條持久化的訊息沒有被consumer取走,RabbitMQ重啟之後會自動重建exchange和queue(以及bingding關係),訊息通過持久化日誌重建再次進入對應的queues,exchanges. 皮之不存,毛將焉附?緊接著我們看看訊息實際存放的地方:QueueQueue
Queues是Massage的落腳點和等待接收的地方,訊息除非被扔進黑洞否則就會被安置在一個Queue裡面.Queue很適合做負載均衡,RabbitMQ可以在若干consumer中間實現輪流排程(Round-Robin). 如何建立佇列 consumer和producer都可以建立Queue,如果consumer來建立,避免consumer訂閱一個不存在的Queue的情況,但是這裡要承擔一種風險:訊息已經投遞但是consumer尚未建立佇列,那麼訊息就會被扔到黑洞,換句話說訊息丟了;避免這種情況的好辦法就是producer和consumer都嘗試建立一下queue. 如果consumer在已經訂閱了另外一個Queue的情況下無法完成新Queue的建立,必須取消之前的訂閱將Channel置為傳輸模式("transmit")才能建立新的Channel. 建立Queue的時候通常要指定名字,名字方便consumer訂閱.即使你不指定Rabbit會給它分配一個隨機的名字,這在使用臨時匿名佇列完成RPC-over-AMQP呼叫時會非常有用. 建立Queue的時候還有兩個非常有用的選項: exclusive—When set to true, your queue becomes private and can only be consumed by your app. This is useful when you need to limit a queue to only one consumer. auto-delete—The queue is automatically deleted when the last consumer unsubscribes. 如果要建立只有一個consumer使用的臨時queue可以組合使用auto-delete和 exclusive.consumer一旦斷開連線該佇列自動刪除. 重複建立Queue會怎樣?如果Queue建立的選項完全一致的話,RabbitMQ直接返回成功,如果名稱相同但是建立選項不一致就會返回建立失敗.如果是想檢查Queue是否存在,可以設定queue.declare命令的passive 選項為true:如果佇列存在就會返回成功,如果佇列不存在會報錯且不會執行建立邏輯. 訊息是如何從動態路由到不同的佇列的?這就看下面的內容了bindings and exchanges
訊息如何傳送到佇列 訊息是如何傳送到佇列的?這就要說到AMQP bindings and exchanges. 投遞訊息到queue都是經由exchange完成的,和生活中的郵件投遞一樣也需要遵循一定的規則,在RabbitMQ中規則是通過routing key把queue繫結到exchange上,這種繫結關係即binding.訊息傳送到RabbitMQ都會攜帶一個routing key(哪怕是空的key),RabbitMQ會根據bindings匹配routing key,如果匹配成功訊息會轉發到指定Queue,如果沒有匹配到queue訊息就會被扔到黑洞. 如何傳送到多個佇列 訊息是分發到多個佇列的?AMQP協議裡面定義了幾種不同型別的exchange:direct, fanout, topic, and headers. 每一種都實現了一種 routing 演算法. header的路由訊息並不依賴routing key而是去匹配AMQP訊息的header部分,這和下面提到的direct exchange如出一轍,但是效能要差很多,在實際場景中幾乎不會被用到. direct exchange routing key完全匹配才轉發fanout exchange 不理會routing key,訊息直接廣播到所有繫結的queue
topic exchange 對routing key模式匹配 exchange持久化 建立queue和exchange預設情況下都是沒有持久化的,節點重啟之後queue和exchange就會消失,這裡需要特別指定queue和exchange的durable屬性. Consumer是直接建立TCP連結到RabbitMQ嗎?下面就是答案: