1. 程式人生 > >基於java語言使用RabbitMQ開發過程記錄

基於java語言使用RabbitMQ開發過程記錄

此處不再一一貼上程式碼,只記錄開發學習過程中遇到的一些問題和解決辦法,還有一些理論知識,留著以後面試的時候不啦不啦~~

剛才使用簡單佇列實現了生產者和消費者模式(基於舊 的API和新的API兩種方式)

簡單佇列

一個生產者對應一個消費者,耦合性高,無法實現讓多個消費者共同消費一個生產者,而且若想修改佇列名稱,就得全部修改。

實際場景中,生產者生產訊息很快,一般都是插入一條資料之類的;而消費者則需要根據業務邏輯處理很多流程,所以簡單佇列的一對一場景滿足不了現實的需要,如果同時起兩個簡單佇列,就會發現兩個消費者所接收的訊息數量一致,不管它們各自消費一個訊息需要多久,它們接收的訊息數量都是一樣的,均分的,輪詢分發,round-robin

,是一次全部發送出去,不管消費者有沒有處理完成;

若一開始生產者給每一個消費者都只發一個訊息,等消費者處理完成以後通知生產者再發送下一個,這樣就提高整體的效率。這種叫做公平分發。公平分發就不能使用自動應答,而需要手動應答,由消費者在消費完成的時候再應答。

生產者和消費者都要宣告:

消費者的應答模式

以及在處理完成(callback)最後手動應答

訊息應答(針對於消費者)

若是自動應答,即生產者將訊息傳送出去,rabbitMq將訊息分發給消費者之後,就會從記憶體中刪除;如果消費者剛接收到訊息就宕機了,那麼這個訊息沒有處理完成就丟失了。應為應答是在接收之時自動發出的,而不是真正完成時發出的。

所以,手動應答,若一個消費者在處理過程中掛了,則將它正在處理未完成的訊息重新分發給另一個消費者。

訊息持久化

將訊息持久化可以避免rabbitMq服務宕機訊息丟失的問題。

若是將上述方法,第二個引數直接修改成true(持久化),啟動報錯,因為佇列是已經宣告好的未持久化的(),rabbitmq不允許使用不同的引數重新定義一個已經存在的佇列。

                                                           (work-queue已經存在,以未持久化的方式存在)

解決辦法:在控制檯中將這個佇列刪除,然後重新建立;或者重新建立一個新的佇列。

???

上面講述的都是只有一個佇列一個訊息只能由一個消費者消費。下面引入一個新的概念:交換器

中間X就是交換器。生產者將訊息傳送給交換器,由交換器通過路由分發給佇列;而佇列和消費者是一一對應的。

交換器可以將同一個訊息發給一個佇列或多個佇列,這樣一個訊息就可以由多個消費者共同消費了。

注意:交換器沒有儲存能力,所以交換機必須與佇列繫結。

如果控制檯中還沒有該交換器,就先執行與該交換器繫結的消費者,那麼啟動將報錯;如果與該交換器繫結的消費者還沒有進行監聽,那麼生產者傳送的訊息將丟失。

交換器分發型別

fanout:每一個消費者接收全部訊息,不處理路由鍵。

生產者要宣告交換器,不需要宣告隊列了

傳送訊息時使用交換器引數,第二個引數路由鍵為空。

注意:簡單佇列不使用交換器時,生產者傳送訊息的路由鍵與佇列名稱一致。

消費宣告佇列,且要與交換器繫結

 queueBind方法的最後一個引數路由鍵為空

direct:訊息的路由鍵與佇列匹配,交換器則將該訊息分發給該佇列

因為消費者的佇列可以繫結多個路由鍵,所以同一個訊息還是可以被多個消費者接收。

direct模式要求不管是生產者還是消費者都要有自己明確的路由鍵,而在實際複雜的業務中,可以需要定義多個路由鍵,這樣一一繫結,可能會比較麻煩。

topic :將路由鍵與某個模式進行匹配,相當於direct的模糊匹配

#匹配一個或多個字元

*匹配一個字元

這裡有一個問題:生產者的路由鍵必須由XXX.XXX格式組成,消費者的路由鍵若使用topic模式,格式為XXX.#或#.XXX,否則就接收不到訊息。

若在控制檯中手動解綁,則被解綁的佇列及其消費者就不能再接收訊息了

訊息確認機制

生產者將訊息傳送出去以後,到底有沒有成功發給rabbitMq呢?注意,不是消費者有沒有成功處理完成。

1、可以通過AMQP協議的事務機制進行確認

發訊息後提交

出現異常回滾

每一次都要兩到三步向伺服器傳送事務,會降低伺服器的吞吐量,效率比較低。

2、rabbitMq訊息確認機制之confirm模式(序列)

注意:若佇列、通道之前已經是事務模式的,不能直接修改為confirm模式。

先開啟confirm確認模式

傳送訊息--確認

若是批量傳送的訊息,確認時若是有一條訊息傳送失敗,就會確認失敗。

3、rabbitMq訊息確認機制之confirm模式(非同步)

在訊息傳送成功時,刪除set中的標識;訊息傳送失敗時,可以重新發送,根據自己的業務去實現。

注意:此時若在生產者處關閉連線,則不能再進行監聽。