1. 程式人生 > >RocketMQ實戰(三)之生產者、消費者

RocketMQ實戰(三)之生產者、消費者

一:Maven配置

加入rocketmq-client依賴

二:生產者、消費者

1:生產者

2:消費者

DefaultMQPushConsumer和DefaultMQProducer需要設定三個引數:

一是這個Consumer的GroupName,二是NameServer的地址和埠號,三是Topic的名稱。

無論生產者、消費者都必須給出GroupName,而且具有唯一性!

生產到哪個Topic的哪個Tag下,消費者也是從Topic的哪個Tag進行消費,可見這個Tag有點類似於JMS Selector機制,即實現訊息的過濾。

生產者、消費者需要設定NameServer地址。

三:執行結果

生產者的執行結果:

生產者可以自動實現了訊息的負載均衡!

消費者執行結果:

消費者消費訊息是沒有什麼順序的,以後我們在來談訊息的順序性。

我們再來看一看管控臺:

四:初步瞭解訊息失敗重試機制

訊息失敗,無非涉及到2端:從生產者端發往MQ的失敗;消費者端從MQ消費訊息的失敗

1:生產端的失敗重試:

生產者端的訊息失敗:比如網路抖動導致生產者傳送訊息到MQ失敗。

上圖程式碼示例的處理手段是:如果該條訊息在1S內沒有傳送成功,那麼重試3次。

2:消費端的失敗重試

消費者端的失敗,分為2種情況,一個是timeout,一個是exception

timeout,比如由於網路原因導致訊息壓根就沒有從MQ到消費者上,在RocketMQ內部會不斷的嘗試傳送這條訊息,直至傳送成功為止!(比如叢集中一個broker失敗,就嘗試另一個broker)

exception,訊息正常的到了消費者,結果消費者發生異常,處理失敗了。這裡涉及到一些問題,需要我們思考下,比如,消費者消費訊息的狀態有哪些定義?如果失敗,MQ將採取什麼策略進行重試?假設一次性批量PUSH了10條,其中某條資料消費異常,那麼訊息重試是10條呢,還是1條呢?而且在重試的過程中,需要保證不重複消費嗎?

訊息消費的狀態,有2種,一個是成功(CONSUME_SUCCESS),一個是失敗&稍後重試(RECONSUME_LATER

)

注意了,對於消費訊息而言,存在2種指定的狀態(成功 OR 失敗重試),如果一條訊息在消費端處理沒有返回這2個狀態,那麼相當於這條訊息沒有達到消費者,勢必會再次傳送給消費者!也即是訊息的處理必須有返回值,否則就進行重發。

對於RocketMQ而言,通過ConsumeGroup的機制,實現了天然的訊息負載均衡!通俗點來說,RocketMQ中的訊息通過ConsumeGroup實現了將訊息分發到C1/C2/C3/......的機制,這意味著我們將非常方便的通過加機器來實現水平擴充套件!

我們考慮一下這種情況:比如C2發生了重啟,一條訊息發往C3進行消費,但是這條訊息的處理需要0.1S,而此時C2剛好完成重啟,那麼C2是否可能會收到這條訊息呢?答案是肯定的,也就是consume broker的重啟,或者水平擴容,或者不遵守先訂閱後生產訊息,都可能導致訊息的重複消費!關於去重的話題會在後續中予以介紹!

至於訊息分發到C1/C2/C3,其實也是可以設定策略的。

五:叢集消費 AND 廣播消費

RocketMQ的消費方式有2種,在預設情況下,就是叢集消費,也就是訊息的負載均衡消費。另一種消費模式,是廣播消費。廣播消費,類似於ActiveMQ中的釋出訂閱模式,訊息會發給Consume Group中的每一個消費者進行消費。