RocketMQ實戰之 訊息重試
阿新 • • 發佈:2020-06-24
叢集消費模式下,訊息消費失敗後,Broker會通過訊息重試機制重新投遞訊息。
只有消費模式為 MessageModel.CLUSTERING(叢集模式) 時,Broker才會自動進行重試,廣播模式不會重試。
什麼情況屬於訊息消費失敗?
- 消費端返回ConsumeConcurrentlyStatus.RECONSUME_LATER
- 消費端返回null
- 消費端丟擲異常並且未被捕獲
對於一直消費不成功的訊息達到最大重試次數(16次)後,會投遞到死信佇列並設定為消費成功
訊息消費重試
// 訊息延遲級別定義
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" ;
重試次數為1時,對應第一次傳送時間間隔為10s,然後依次類推
複製程式碼
RocketMQ採用了“時間衰減策略”進行訊息的重複投遞,即重試次數越多,訊息消費成功的可能性越小。
對於重試訊息,會複製一個新的訊息並備份Topic和QueueId投遞到主題SCHEDULE_TOPIC_XXXX下,然後由定時任務進行排程重試,重試時在恢復到原來的佇列
訊息傳送重試
訊息傳送失敗後會自動重試,預設重試2次。
可通過訊息記錄表記錄訊息的資訊和傳送狀態,對傳送失敗的訊息進行補償
/**設定訊息傳送失敗時最大重試次數,預設2次*/
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}
/**同步傳送訊息,並指定超時時間*/
public SendResult send(Message msg,long timeout) throws MQClientException,RemotingException,MQBrokerException,InterruptedException {
return this.defaultMQProducerImpl.send(msg,timeout);
}
複製程式碼