RabbitMQ(一)-RabbitMQ原理及簡介
一、什麼是RabbitMQ
RabbitMQ是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支援多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支援AJAX。用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。
AMQP,即Advanced Message Queuing Protocol,高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道訊息使用者的存在,反之亦然。
AMQP的主要特徵是面向訊息、佇列、路由(包括點對點和釋出/訂閱)、可靠性、安全。
二、RabbitMQ特點
RabbitMQ 最初起源於金融系統,用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。具體特點包括:
-
可靠性(Reliability)
RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、釋出確認。
-
靈活的路由(Flexible Routing)
在訊息進入佇列之前,通過 Exchange 來路由訊息的。對於典型的路由功能,RabbitMQ 已經提供了一些內建的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 繫結在一起,也通過外掛機制實現自己的 Exchange 。
-
訊息叢集(Clustering)
多個 RabbitMQ 伺服器可以組成一個叢集,形成一個邏輯 Broker 。
-
高可用(Highly Available Queues)
佇列可以在叢集中的機器上進行映象,使得在部分節點出問題的情況下佇列仍然可用。
-
多種協議(Multi-protocol)
RabbitMQ 支援多種訊息佇列協議,比如 STOMP、MQTT 等等。
-
多語言客戶端(Many Clients)
RabbitMQ 幾乎支援所有常用語言,比如 Java、.NET、Ruby 等等。
-
管理介面(Management UI)
RabbitMQ 提供了一個易用的使用者介面,使得使用者可以監控和管理訊息 Broker 的許多方面。
-
跟蹤機制(Tracing)
如果訊息異常,RabbitMQ 提供了訊息跟蹤機制,使用者可以找出發生了什麼。
-
外掛機制(Plugin System)
RabbitMQ 提供了許多外掛,來從多方面進行擴充套件,也可以編寫自己的外掛。
三、原理
RabbitMQ 內部結構圖:
Publisher
訊息的生產者,也是一個向交換器釋出訊息的客戶端應用程式。
Exchange
Exchange類似於資料通訊網路中的交換機,提供訊息路由策略。rabbitmq中,producer不是通過通道直接將訊息傳送給queue,而是先發送給Exchange。一個Exchange可以和多個Queue進行繫結,producer在傳遞訊息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由演算法,將訊息路由給指定的queue。和Queue一樣,Exchange也可設定為持久化,臨時或者自動刪除。
Exchange有4種類型:direct(預設),fanout, topic, 和headers,不同型別的Exchange轉發訊息的策略有所區別:
-
Direct
直接交換器,工作方式類似於單播,Exchange會將訊息傳送完全匹配ROUTING_KEY的Queue
-
fanout
廣播是式交換器,不管訊息的ROUTING_KEY設定為什麼,Exchange都會將訊息轉發給所有繫結的Queue。
-
topic
主題交換器,工作方式類似於組播,Exchange會將訊息轉發和ROUTING_KEY匹配模式相同的所有佇列,比如,ROUTING_KEY為user.stock的Message會轉發給繫結匹配模式為 * .stock,user.stock, * . * 和#.user.stock.#的佇列。( * 表是匹配一個任意片語,#表示匹配0個或多個片語)
-
headers
訊息體的header匹配(ignore)
Queue
訊息佇列,提供了FIFO的處理機制,具有快取訊息的能力。rabbitmq中,佇列訊息可以設定為持久化,臨時或者自動刪除。
- 設定為持久化的佇列,queue中的訊息會在server本地硬碟儲存一份,防止系統crash,資料丟失
- 設定為臨時佇列,queue中的資料在系統重啟之後就會丟失
- 設定為自動刪除的佇列,當不存在使用者連線到server,佇列中的資料會被自動刪除
Binding
所謂繫結就是將一個特定的 Exchange 和一個特定的 Queue 繫結起來。Exchange 和Queue的繫結可以是多對多的關係。
virtual host
在rabbitmq server上可以建立多個虛擬的message broker,又叫做virtual hosts (vhosts)。每一個vhost本質上是一個mini-rabbitmq server,分別管理各自的exchange,和bindings。vhost相當於物理的server,可以為不同app提供邊界隔離,使得應用安全的執行在不同的vhost例項上,相互之間不會干擾。producer和consumer連線rabbit server需要指定一個vhost。
Broker
表示訊息佇列伺服器實體。
Connection
網路連線,比如一個TCP連線。
Channel
通道,多路複用連線中的一條獨立的雙向資料流通道。通道是建立在真實的TCP連線內地虛擬連線,AMQP 命令都是通過通道發出去的,不管是釋出訊息、訂閱佇列還是接收訊息,這些動作都是通過通道完成。因為對於作業系統來說建立和銷燬 TCP 都是非常昂貴的開銷,所以引入了通道的概念,以複用一條 TCP 連線。
Consumer
訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式。
四、核心概念Exchange
那麼為什麼我們需要 Exchange 而不是直接將訊息傳送至佇列呢?
AMQP 協議中的核心思想就是生產者和消費者的解耦,生產者從不直接將訊息傳送給佇列。生產者通常不知道是否一個訊息會被髮送到佇列中,只是將訊息傳送到一個交換機。先由 Exchange 來接收,然後 Exchange 按照特定的策略轉發到 Queue 進行儲存。Exchange 就類似於一個交換機,將各個訊息分發到相應的佇列中。
在實際應用中我們只需要定義好 Exchange 的路由策略,而生產者則不需要關心訊息會發送到哪個 Queue 或被哪些 Consumer 消費。在這種模式下生產者只面向 Exchange 釋出訊息,消費者只面向 Queue 消費訊息,Exchange 定義了訊息路由到 Queue 的規則,將各個層面的訊息傳遞隔離開,使每一層只需要關心自己面向的下一層,降低了整體的耦合度。
Exchange型別:
1.fanout
fanout型別的Exchange路由規則非常簡單,它會把所有傳送到該Exchange的訊息路由到所有與它繫結的Queue中。
上圖所示,生產者(P)生產訊息1將訊息1推送到Exchange,由於Exchange Type=fanout這時候會遵循fanout的規則將訊息推送到所有與它繫結Queue,也就是圖上的兩個Queue最後兩個消費者消費。
應用場景
以日誌系統為例:假設我們定義了一個 Exchange 來接收日誌訊息,同時定義了兩個 Queue 來儲存訊息:一個記錄將被列印到控制檯的日誌訊息;另一個記錄將被寫入磁碟檔案的日誌訊息。我們希望 Exchange 接收到的每一條訊息都會同時被轉發到兩個 Queue,這種場景下就可以使用 Fanout Exchange 來廣播訊息到所有繫結的 Queue。
2.direct
direct型別的Exchange路由規則也很簡單,它會把訊息路由到那些binding key與routing key完全匹配的Queue中
當生產者(P)傳送訊息時Rotuing key=booking時,這時候將訊息傳送給Exchange,Exchange獲取到生產者傳送過來訊息後,會根據自身的規則進行與匹配相應的Queue,這時發現Queue1和Queue2都符合,就會將訊息傳送給這兩個佇列,如果我們以Rotuing key=create和Rotuing key=confirm傳送訊息時,這時訊息只會被推送到Queue2佇列中,其他Routing Key的訊息將會被丟棄。
應用場景
現在我們考慮只把重要的日誌訊息寫入磁碟檔案,例如只把 Error 級別的日誌傳送給負責記錄寫入磁碟檔案的 Queue。這種場景下我們可以使用指定的 RoutingKey(例如 error)將寫入磁碟檔案的 Queue 繫結到 Direct Exchange 上。
3.topic
前面提到的direct規則是嚴格意義上的匹配,換言之Routing Key必須與Binding Key相匹配的時候才將訊息傳送給Queue,那麼topic這個規則就是模糊匹配,可以通過萬用字元滿足一部分規則就可以傳送。它的約定是:
- routing key為一個句點號“. ”分隔的字串(我們將被句點號“. ”分隔開的每一段獨立的字串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
- binding key與routing key一樣也是句點號“. ”分隔的字串
- binding key中可以存在兩種特殊字元“*”與“#”,用於做模糊匹配,其中“*”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)
當生產者傳送訊息Routing Key=F.C.E的時候,這時候只滿足Queue1,所以會被路由到Queue中,如果Routing Key=A.C.E這時候會被同是路由到Queue1和Queue2中,如果Routing Key=A.F.B時,這裡只會傳送一條訊息到Queue2中。
應用場景
假設我們的訊息路由規則除了需要根據日誌級別來分發之外還需要根據訊息來源分發,可以將 RoutingKey 定義為 訊息來源.級別
如 order.info
、user.error
等。處理所有來源為 user
的 Queue 就可以通過 user.*
繫結到 Topic Exchange 上,而處理所有日誌級別為 info
的 Queue 可以通過 *.info
繫結到 Exchange上。
4.headers
headers型別的Exchange不依賴於routing key與binding key的匹配規則來路由訊息,而是根據傳送的訊息內容中的headers屬性進行匹配。
在繫結Queue與Exchange時指定一組鍵值對;當訊息傳送到Exchange時,RabbitMQ會取到該訊息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配Queue與Exchange繫結時指定的鍵值對;如果完全匹配則訊息會路由到該Queue,否則不會路由到該Queue。
該型別的Exchange沒有用到過(不過也應該很有用武之地),所以不做介紹。
這裡在對其進行簡要的表格整理:
型別名稱 | 型別描述 |
fanout | 把所有傳送到該Exchange的訊息路由到所有與它繫結的Queue中 |
direct | Routing Key==Binding Key |
topic | 我這裡自己總結的簡稱模糊匹配 |
headers | Exchange不依賴於routing key與binding key的匹配規則來路由訊息,而是根據傳送的訊息內容中的headers屬性進行匹配。 |
五、通訊過程
假設P1和C1註冊了相同的Broker,Exchange和Queue。P1傳送的訊息最終會被C1消費。基本的通訊流程大概如下所示:
- P1生產訊息,傳送給伺服器端的Exchange
- Exchange收到訊息,根據ROUTINKEY,將訊息轉發給匹配的Queue1
- Queue1收到訊息,將訊息傳送給訂閱者C1
- C1收到訊息,傳送ACK給佇列確認收到訊息
- Queue1收到ACK,刪除佇列中快取的此條訊息
Consumer收到訊息時需要顯式的向rabbit broker傳送basic.ack訊息或者consumer訂閱訊息時設定auto_ack引數為true。在通訊過程中,佇列對ACK的處理有以下幾種情況:
- 如果consumer接收了訊息,傳送ack,rabbitmq會刪除佇列中這個訊息,傳送另一條訊息給consumer。
- 如果cosumer接受了訊息, 但在傳送ack之前斷開連線,rabbitmq會認為這條訊息沒有被deliver,在consumer在次連線的時候,這條訊息會被redeliver。
- 如果consumer接受了訊息,但是程式中有bug,忘記了ack,rabbitmq不會重複傳送訊息。
- rabbitmq2.0.0和之後的版本支援consumer reject某條(類)訊息,可以通過設定requeue引數中的reject為true達到目地,那麼rabbitmq將會把訊息傳送給下一個註冊的consumer。