RabbitMQ入門使用
RabbitMQ(Rabbit Message Queue),即訊息佇列系統,它是一款開源訊息佇列中介軟體,採用Erlang語言開發,RabbitMQ是AMQP(Advanced Message Queueing Protocol)的標準實現。
AMQP是一個公開發布的非同步訊息的規範,是提供統一訊息服務的應用層標準高階訊息佇列協議,為面向訊息的中介軟體設計,訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道訊息使用者的存在,反之亦然。相對於JMS(Java Message Service)規範來說,JMS使用的是特定的APIs,而訊息格式可自由定義,而AMQP對訊息的格式和傳輸是有要求的,但實現不會受作業系統、開發語言以及平臺等的限制。
JMS和AMQP還有一個較大的區別:JMS有佇列(Queues)和主題(Topics)兩種形式,傳送到JMS佇列的訊息最多隻能被一個Client消費,傳送到JMS主題的訊息可能會被多個Clients消費;AMQP只有佇列(Queues),佇列的訊息只能被單個接受者消費,傳送者並不直接把訊息傳送到佇列中,而是傳送到Exchange中,該Exchage會與一個或多個佇列繫結,能夠實現與JMS佇列和主題同樣的功能。
RabbitMQ的主要宣傳點在於其健壯性好、易於使用、高效能、高併發、叢集易擴充套件以及強大的開源社群支援,鑑於些,在實際專案中使用還是比較可靠有價值的。接下來就介紹一下RabbitMQ從安裝到簡單示例的入門教程。
以Java為例整合RabbitMQ,針對不同應用場景使用其對應的功能。
應用場景一:Simple Queue “Hello Word”
一個Producer傳送訊息到Queue中,一個Consumer從Queue讀取訊息並列印。
應用場景二:Work Queues
將訊息分配給多個Consumer進行處理,可以避免在執行資源密集性任務時同步處理導致阻塞等待的問題,從而在一定程度上提升並行能力,通常稱該類Consumer為Work,多個Work在後臺接收分配到的任務並處理。
應用場景三:Publish/Subscribe
前兩個場景都是把一個訊息傳遞給一個Consumer/Worker,而這裡的Publish/Subscribe需要把訊息傳遞給多個Consumer。這裡Producer不會將訊息直接傳送到佇列,事實上,Producer也並不知道訊息會傳遞給任何的Queue,而是將訊息傳送到一個Exchange上,Exchange的作用在於收到Producer的訊息並推送給繫結的Queue,這樣Exchange就將訊息傳遞給繫結的Queues及其以對應的Consumer了,這裡使用的Exchange是fanout,其實就是廣播功能。Exchange通常分為四種:
- fanout:該型別路由規則非常簡單,會把所有傳送到該Exchange的訊息路由到所有與它繫結的Queue中,相當於廣播功能
- direct:該型別路由規則會將訊息路由到binding key與routing key完全匹配的Queue中,在場景四中會用到
- topic:與direct型別相似,只是規則沒有那麼嚴格,可以模糊匹配和多條件匹配,在場景五中會進一步解釋
- headers:該型別不依賴於routing key與binding key的匹配規則來路由訊息,而是根據傳送的訊息內容中的headers屬性進行匹配
比如一個列印Log的系統,當所有接收Log的Consumer接收到訊息後都可以列印Log,有興趣可參閱示例程式碼:Publish/Subscribe Demo。
應用場景四:Routing
在上一場景中,只是一個簡單的Log系統,相當於廣播功能,更進一步,可以針對不同的日誌傳送到不同的Consumer進行不同的處理,比如有的寫檔案,有的列印控制檯等,那麼就需要定義Routing路由了。為了使用Routing功能,可以將Exchange定義為direct型別,需要設定routing_key繫結到Queue上,然後就可以傳送訊息了。
應用場景五:Topics
在以上場景中,使用direct型別的Exchange可以實現對不同訊息的路由,但只是支援單一的條件,為了支援多種條件,比如不僅針對Log的級別,還要針對其來源進行分發訊息,這時候可以使用Topic來實現了,其中的邏輯和direct的類似,只是routing_key支援多個用點.分隔的值用於匹配路由資訊,其中路由Key可以使用*替代一個詞,#匹配0個或多個詞。
應用場景六:RPC(Remote procedure call)
在前面的場景二中,能夠實現使用Work Queues分發處理運算任務,但如果需要將任務傳送到遠端伺服器上執行處理,然後等待返回運算結果呢?那就需要RPC遠端過程回調了。這裡描述的場景將和之前的完全不一樣,需要構建一個Client和RPC Server,Client作為遠端呼叫的發起者攜帶一些如reply_to,correlation_id等的特定資訊傳送請求到rpc_queue上,RPC Server接收到請求後執行處理函式並將運算結果返回給Callback Queue,Client就可以接收到對應correlationId的返回結果了。