1. 程式人生 > >RocketMQ(五)——消費模式

RocketMQ(五)——消費模式

宣告:

目錄:
一叢集消費
二廣播消費
內容補充

一、叢集消費

之前的部落格中,啟動的都是單個Consumer,如果啟動多個呢?

RocketMQ-叢集消費 
RocketMQ-叢集消費

其實,對於RocketMQ而言,通過ConsumeGroup的機制,實現了天然的訊息負載均衡!通俗點來說,RocketMQ中的訊息通過ConsumeGroup實現了將訊息分發到C1/C2/C3/……的機制,這意味著我們將非常方便的通過加機器來實現水平擴充套件!

我們考慮一下這種情況:比如C2發生了重啟,一條訊息發往C3進行消費,但是這條訊息的處理需要0.1S,而此時C2剛好完成重啟,那麼C2是否可能會收到這條訊息呢?答案是肯定的,也就是consume broker的重啟,或者水平擴容,或者不遵守先訂閱後生產訊息,都可能導致訊息的重複消費!關於去重的話題會在後續中予以介紹!

至於訊息分發到C1/C2/C3,其實也是可以設定策略的: 

RocketMQ-訊息負載策略 
RocketMQ-訊息負載策略

使用哪種策略,只需要例項化對應的物件即可,如:

AllocateMessageQueueStrategy aqs = new AllocateMessageQueueAveragelyByCircle();
consumer.setAllocateMessageQueueStrategy(aqs);

上面內容,其實是一種消費模式——叢集消費。 
RocketMQ的消費模式有2種,檢視一下原始碼:

public enum MessageModel {
    /**
     * broadcast
     */
    BROADCASTING,
    /**
     * clustering
     */
    CLUSTERING;
}

在預設情況下,就是叢集消費(CLUSTERING),也就是上面提及的訊息的負載均衡消費。另一種消費模式,是廣播消費(BROADCASTING)。

二、廣播消費

廣播消費,類似於ActiveMQ中的釋出訂閱模式,訊息會發給Consume Group中的每一個消費者進行消費。 

RocketMQ-廣播消費模式設定 
RocketMQ-廣播消費模式設定

/**
 * Consumer,訂閱訊息
 */
public class Consumer2 {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
        consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");
        consumer.setConsumeMessageBatchMaxSize(10);

        // 設定為廣播消費模式
        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        System.out.println(" Receive New Messages: " + msg);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;   // 重試
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;       // 成功
            }
        });

        consumer.start();
        System.out.println("Consumer Started.");
    }
}

內容補充

《RocketMQ(三)——HelloWorld》那篇部落格的最後提到了單批次訊息消費數量 ,本文既然提到了叢集消費,那就針對這兩個內容再進行一次補充吧。 
如果我們有2臺節點,Producerw往MQ上寫入20條資料 其中Consumer1中拉取了12條 。Consumer2中拉取了8 條,這種情況下,假如Consumer1宕機,那麼我們消費資料的時候,只能消費到Consumer2中的8條,Consumer1中的12條已經持久化了。需要Consumer1恢復之後這12條資料才能繼續被消費。其實這種先啟動producer往MQ上寫資料,然後再啟動Consumer的情況本來就是違規操作,正確的情況應該是先啟動Consumer後再啟動producer。