RocketMQ訊息重複消費的問題
阿新 • • 發佈:2019-01-05
重複消費的問題的一個可能的問題:消費者消費訊息時產生了異常,並沒有返回CONSUME_SUCCESS標誌。
我急於尋找解決方法,結果百度的結果都是一期多個消費者問題云云,根本沒有解決我的問題。
我發現重複消費的訊息和第一次消費的訊息不同,多了一些重複消費的資訊:
reconsumeTimes=1,2,…10
REAL_TOPIC也會是:%RETRY%XXXXX
這就是因為訊息處理異常導致的訊息重新消費,無路時重啟服務端,還是通過mqadmin刪除都沒用,RocketMQ可以很好的保持訊息,一定要消費成功才可以!
官方對comsumerMessage方法的實現建議是:
It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure
無論如何,都不要丟擲異常,如果需要重新消費,可以返回RECONSUME_LATER主動要求重新消費。
下面是我的程式碼,我加入了catch Exception根異常來捕獲業務處理的異常。
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
logger.debug (Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
MessagePack msgpack = new MessagePack();
for (MessageExt msg : msgs){
byte[] data = msg.getBody();
try {
RTMsgPack rtmsg = msgpack.read(data, RTMsgPack.class );
logger.debug("Receive a message:" + rtmsg);
anlysisRTMsgPack(rtmsg, engine);
} catch (IOException e) {
logger.error("Unpack RTMsg:", e);
} catch (Exception e1){
logger.warn("Unexcepted exception.", e1);
}
}
logger.debug("RETURN CONSUME SUCCESS.");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});