RocketMQ(五)——消費模式
宣告:
目錄:
一叢集消費
二廣播消費
內容補充
一、叢集消費
之前的部落格中,啟動的都是單個Consumer,如果啟動多個呢?
RocketMQ-叢集消費
其實,對於RocketMQ而言,通過ConsumeGroup的機制,實現了天然的訊息負載均衡!通俗點來說,RocketMQ中的訊息通過ConsumeGroup實現了將訊息分發到C1/C2/C3/……的機制,這意味著我們將非常方便的通過加機器來實現水平擴充套件!
我們考慮一下這種情況:比如C2發生了重啟,一條訊息發往C3進行消費,但是這條訊息的處理需要0.1S,而此時C2剛好完成重啟,那麼C2是否可能會收到這條訊息呢?答案是肯定的,也就是consume broker的重啟,或者水平擴容,或者不遵守先訂閱後生產訊息,都可能導致訊息的重複消費!關於去重的話題會在後續中予以介紹!
至於訊息分發到C1/C2/C3,其實也是可以設定策略的:
RocketMQ-訊息負載策略
使用哪種策略,只需要例項化對應的物件即可,如:
AllocateMessageQueueStrategy aqs = new AllocateMessageQueueAveragelyByCircle();
consumer.setAllocateMessageQueueStrategy(aqs);
上面內容,其實是一種消費模式——叢集消費。
RocketMQ的消費模式有2種,檢視一下原始碼:
public enum MessageModel { /** * broadcast */ BROADCASTING, /** * clustering */ CLUSTERING; }
在預設情況下,就是叢集消費(CLUSTERING),也就是上面提及的訊息的負載均衡消費。另一種消費模式,是廣播消費(BROADCASTING)。
二、廣播消費
廣播消費,類似於ActiveMQ中的釋出訂閱模式,訊息會發給Consume Group中的每一個消費者進行消費。
RocketMQ-廣播消費模式設定
/** * Consumer,訂閱訊息 */ public class Consumer2 { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name"); consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876"); consumer.setConsumeMessageBatchMaxSize(10); // 設定為廣播消費模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { System.out.println(" Receive New Messages: " + msg); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 重試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 成功 } }); consumer.start(); System.out.println("Consumer Started."); } }
內容補充
《RocketMQ(三)——HelloWorld》那篇部落格的最後提到了單批次訊息消費數量 ,本文既然提到了叢集消費,那就針對這兩個內容再進行一次補充吧。
如果我們有2臺節點,Producerw往MQ上寫入20條資料 其中Consumer1中拉取了12條 。Consumer2中拉取了8 條,這種情況下,假如Consumer1宕機,那麼我們消費資料的時候,只能消費到Consumer2中的8條,Consumer1中的12條已經持久化了。需要Consumer1恢復之後這12條資料才能繼續被消費。其實這種先啟動producer往MQ上寫資料,然後再啟動Consumer的情況本來就是違規操作,正確的情況應該是先啟動Consumer後再啟動producer。