1. 程式人生 > >RocketMQ訊息重複消費的問題

RocketMQ訊息重複消費的問題

重複消費的問題的一個可能的問題:消費者消費訊息時產生了異常,並沒有返回CONSUME_SUCCESS標誌。
我急於尋找解決方法,結果百度的結果都是一期多個消費者問題云云,根本沒有解決我的問題。
我發現重複消費的訊息和第一次消費的訊息不同,多了一些重複消費的資訊:
reconsumeTimes=1,2,…10
REAL_TOPIC也會是:%RETRY%XXXXX
這就是因為訊息處理異常導致的訊息重新消費,無路時重啟服務端,還是通過mqadmin刪除都沒用,RocketMQ可以很好的保持訊息,一定要消費成功才可以!
官方對comsumerMessage方法的實現建議是:

It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure

無論如何,都不要丟擲異常,如果需要重新消費,可以返回RECONSUME_LATER主動要求重新消費。
下面是我的程式碼,我加入了catch Exception根異常來捕獲業務處理的異常。

consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    logger.debug
(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); MessagePack msgpack = new MessagePack(); for (MessageExt msg : msgs){ byte[] data = msg.getBody(); try { RTMsgPack rtmsg = msgpack.read(data, RTMsgPack.class
); logger.debug("Receive a message:" + rtmsg); anlysisRTMsgPack(rtmsg, engine); } catch (IOException e) { logger.error("Unpack RTMsg:", e); } catch (Exception e1){ logger.warn("Unexcepted exception.", e1); } } logger.debug("RETURN CONSUME SUCCESS."); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });