1. 程式人生 > >RocketMQ系列(四)順序消費

RocketMQ系列(四)順序消費

折騰了好長時間才寫這篇文章,順序消費,看上去挺好理解的,就是消費的時候按照佇列中的順序一個一個消費;而併發消費,則是消費者同時從佇列中取訊息,同時消費,沒有先後順序。RocketMQ也有這兩種方式的實現,但是在實踐的過程中,就是不能順序消費,好不容易能夠實現順序消費了,發現採用併發消費的方式,消費的結果也是順序的,頓時就蒙圈了,到底怎麼回事?哪裡出了問題?百思不得其解。 經過多次除錯,檢視資料,debug跟蹤程式,最後終於搞清楚了,但是又不知道怎麼去寫這篇文章,是按部就班的講原理,講如何配置到最後實現,還是按照我的除錯過程去寫呢?我覺得還是按照我的除錯過程去寫這篇文章吧,因為我的調成過程應該和大多數人的理解思路是一致的,大家也更容易重視。 ## 環境回顧 我們先來回顧一下前面搭建的RocketMQ的環境,這對於我們理解RocketMQ的順序消費是至關重要的。我們的RocketMQ環境是一個兩主兩從的非同步叢集,其中有兩個broker,broker-a和broker-b,另外,我們建立了兩個Topic,“cluster-topic”,這個Topic我們在建立的時候指定的是叢集,也就是說我們傳送訊息的時候,如果Topic指定為“cluster-topic”,那麼這個訊息應該在broker-a和broker-b之間負載;另外建立的一個Topic是“broker-a-topic”,這個Topic我們在建立的時候指定的是broker-a,當我們傳送這個Topic的訊息時,這個訊息只會在broker-a當中,不會出現在broker-b中。 和大家羅嗦了這麼多,大家只要記住,我們的環境中有兩個broker,“broker-a”和“broker-b”,有兩個Topic,“cluster-topic”和“broker-a-topic”就可以了。 ## cluster-topic可以順序消費嗎 我們傳送的訊息,如果指定Topic為“cluster-topic”,那麼這種訊息將在broker-a和broker-b直接負載,這種情況能夠做到順序消費嗎?我們試驗一下, 消費端的程式碼如下: ```java @Bean(name = "pushConsumerOrderly", initMethod = "start",destroyMethod = "shutdown") public DefaultMQPushConsumer pushConsumerOrderly() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pushConsumerOrderly"); consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;"); consumer.subscribe("cluster-topic","*"); consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { Random random = new Random(); try { Thread.sleep(random.nextInt(5) * 1000); } catch (InterruptedException e) { e.printStackTrace(); } for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; }); return consumer; } ``` * 消費者組的名稱,連線的NameServer,訂閱的Topic,這裡就不多說了; * 再來看一下注冊的訊息監聽器,它是MessageListenerOrderly,順序消費,具體實現裡我們打印出了訊息體的內容,最後返回消費成功ConsumeOrderlyStatus.SUCCESS。 * **重點看一下列印語句之前的隨機休眠,這是非常重要的一步,它可以驗證訊息是否是順序消費的,如果消費者是消費完一個訊息以後,再去取下一個訊息,那麼順序是沒有問題,但是如果消費者是併發地取訊息,但是每個消費者的休眠時間又不一樣,那麼打印出來的就是亂序** 生產端我們採用同步傳送的方式,程式碼如下: ```java @Test public void producerTest() throws Exception { for (int i = 0;i<5;i++) { Message message = new Message(); message.setTopic("cluster-topic"); message.setKeys("key-"+i); message.setBody(("this is simpleMQ,my NO is "+i+"---"+new Date()).getBytes()); SendResult sendResult = defaultMQProducer.send(message); System.out.println("i=" + i); System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName()); } } ``` 和前面一樣,我們傳送5個訊息,並且打印出i的值和broker的名稱,傳送訊息的順序是0,1,2,3,4,傳送完成後,我們觀察一下消費端的日誌,如果順序也是0,1,2,3,4,那麼就是順序消費。我們執行一下,看看結果吧。 生產者的傳送日誌如下: ```shell i=0 BrokerName:broker-a i=1 BrokerName:broker-a i=2 BrokerName:broker-a i=3 BrokerName:broker-a i=4 BrokerName:broker-b ``` 傳送5個訊息,其中4個在broker-a,1個在broker-b。再來看看消費端的日誌: ```shell this is simpleMQ,my NO is 3---Wed Jun 10 13:48:57 CST 2020 this is simpleMQ,my NO is 2---Wed Jun 10 13:48:57 CST 2020 this is simpleMQ,my NO is 4---Wed Jun 10 13:48:57 CST 2020 this is simpleMQ,my NO is 1---Wed Jun 10 13:48:57 CST 2020 this is simpleMQ,my NO is 0---Wed Jun 10 13:48:56 CST 2020 ``` 順序是亂的?怎麼回事?說明消費者在並不是一個消費完再去消費另一個,而是拉取了一個訊息以後,並沒有消費完就去拉取下一個訊息了,那這不是併發消費嗎?可是我們程式中設定的是順序消費啊。**這裡我們就開始懷疑是broker的問題,難道是因為兩個broker引起的?順序消費只能在一個broker裡才能實現嗎?那我們使用broker-a-topic這個試一下吧。** ## broker-a-topic可以順序消費嗎? 我們把上面的程式稍作修改,只把訂閱的Topic和傳送訊息時訊息的Topic改為broker-a-topic即可。程式碼在這裡就不給大家重複寫了,重啟一下程式,傳送訊息看看日誌吧。 生產者端的日誌如下: ```shell i=0 BrokerName:broker-a i=1 BrokerName:broker-a i=2 BrokerName:broker-a i=3 BrokerName:broker-a i=4 BrokerName:broker-a ``` 我們看到5個訊息都發送到了broker-a中,再來看看消費端的日誌, ```shell this is simpleMQ,my NO is 0---Wed Jun 10 14:00:28 CST 2020 this is simpleMQ,my NO is 2---Wed Jun 10 14:00:29 CST 2020 this is simpleMQ,my NO is 3---Wed Jun 10 14:00:29 CST 2020 this is simpleMQ,my NO is 4---Wed Jun 10 14:00:29 CST 2020 this is simpleMQ,my NO is 1---Wed Jun 10 14:00:29 CST 2020 ``` 消費的順序還是亂的,這是怎麼回事?訊息都在broker-a中了,為什麼消費時順序還是亂的?程式有問題嗎?review了好幾遍沒有發現問題。 ## 問題排查 問題卡在這個地方,卡了好長時間,最後在官網的示例中發現,它在傳送訊息時,使用了一個MessageQueueSelector,我們也實現一下試試吧,改造一下發送端的程式,如下: ```java SendResult sendResult = defaultMQProducer.send(message, new MessageQueueSelector() { @Override public MessageQueue sel