RocketMQ學習(七)——RocketMQ延時訊息
使用者傳送的訊息也可以指定延時時間(更準確的說是延時等級),然後在指定延時時間之後投遞訊息,然後被consumer消費。阿里雲的ons還支援定時訊息,而且延時訊息是直接指定延時時間,其實阿里雲的延時訊息也是定時訊息的另一種表述方式,都是通過設定訊息被投遞的時間來實現的,但是Apache RocketMQ在版本4.2.0中尚不支援指定時間的延時,只能通過配置延時等級和延時等級對應的時間來實現延時。
一個延時訊息被髮出到消費成功經歷以下幾個過程:
1) 設定訊息的延時級別delayLevel。
2) producer傳送訊息。
3) broker收到訊息在準備將訊息寫入儲存的時候,判斷是延時訊息則更改Message的topic為延時訊息佇列的topic,也就是將訊息投遞到延時訊息佇列。
4)
5) consumer像消費其他訊息一樣從broker拉取訊息進行消費。
注意: 批量訊息是不支援延時訊息的。
producer傳送延時訊息
在producer中傳送訊息的時候,設定Message的delayLevel。
// org.apache.rocketmq.common.message.Message
public void setDelayTimeLevel(int level) {
this.putProperty(MessageConst. PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}
呼叫上面的方法設定延時等級的時候,會向message新增"DELAY"屬性,後面broker處理延時訊息就是依賴該屬性進行特別的處理。接下來發送訊息的流程和正常傳送訊息的流程基本一致,只是會將該訊息標記為延時訊息型別。
// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
if (msg.getProperty("__STARTDELIVERTIME") != null || msg. getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
broker處理延時訊息
broker收到延時訊息和正常訊息在前置的處理流程是一致的,對於延時訊息的特殊處理體現在將訊息寫入儲存(記憶體或檔案)的時候。
// org.apache.rocketmq.store.CommitLog#putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// 省略中間程式碼...
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
// 拿到原始topic和對應的queueId
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 非事務訊息和事務的commit訊息才會進一步判斷delayLevel
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
// 糾正設定過大的level,就是delayLevel設定都大於延時時間等級的最大級
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 設定為延時佇列的topic
topic = ScheduleMessageService.SCHEDULE_TOPIC;
// 每一個延時等級一個queue,queueId = delayLevel - 1
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
// 備份原始的topic和queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
// 更新properties
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// 省略中間程式碼...
}
上面的SCHEDULE_TOPIC是:
public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
這個topic是一個特殊的topic,和正常的topic不同的地方是:
1、不會建立TopicConfig,因為也不需要consumer直接消費這個topic下的訊息。
2、不會將topic註冊到namesrv。
3、這個topic的佇列個數和延時等級的個數是相同的。
後面訊息寫入的過程和普通的又是一致的。上面將訊息寫入延時佇列中了,接下來就是處理延時佇列中的訊息,然後重新發送回原始topic的佇列中。
在此之前先說明下至今還有疑問的一個個概念——delayLevel。這個概念和我們接下要需要用到的的類ScheduleMessageService有關,這個類的欄位delayLevelTable裡面儲存了具體的延時等級。
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);
看下這個欄位的初始化過程:
// org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel
public boolean parseDelayLevel() {
HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
// 每個延時等級延時時間的單位對應的ms數
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
// 延時等級在MessageStoreConfig中配置
// private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
try {
// 根據空格將配置分隔出每個等級
String[] levelArray = levelString.split(" ");
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
String ch = value.substring(value.length() - 1);
// 時間單位對應的ms數
Long tu = timeUnitTable.get(ch);
// 延時等級從1開始
int level = i + 1;
if (level > this.maxDelayLevel) {
// 找出最大的延時等級
this.maxDelayLevel = level;
}
long num = Long.parseLong(value.substring(0, value.length() - 1));
long delayTimeMillis = tu * num;
this.delayLevelTable.put(level, delayTimeMillis);
// 省略部分程式碼...
}
上面這個load方法在broker啟動的時候DefaultMessageStore會呼叫來初始化延時等級。接下來就應該解決怎麼處理延時訊息佇列中的訊息的問題了。處理延時訊息的服務是:ScheduleMessageService。是broker啟動的時候DefaultMessageStore會呼叫ScheduleMessageService#start來啟動處理延時訊息佇列的服務:
public void start() {
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
// 記錄佇列的處理進度
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
// 每個延時佇列啟動一個定時任務來處理該佇列的延時訊息
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// 持久化offsetTable(儲存了每個延時佇列對應的處理進度offset)
ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
DeliverDelayedMessageTimerTask是一個TimerTask,啟動以後不斷處理延時佇列中的訊息,直到出現異常則終止該執行緒重新啟動一個新的TimerTask,它會呼叫executeOnTimeup方法。
public void executeOnTimeup() {
// 找到該延時等級對應的ConsumeQueue
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
// 記錄異常情況下一次啟動TimerTask開始處理的offset
long failScheduleOffset = offset;
if (cq != null) {
// 找到offset所處的MappedFile中offset後面的buffer
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// 下面三個欄位資訊是ConsumeQueue物理儲存的資訊
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
// 注意這個tagCode,不再是普通的tag的hashCode,而是該延時訊息到期的時間
long tagsCode = bufferCQ.getByteBuffer().getLong();
// 省略中間程式碼....
long now = System.currentTimeMillis();
// 計算應該投遞該訊息的時間,如果已經超時則立即投遞
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
// 計算下一個訊息的開始位置,用來尋找下一個訊息位置(如果有的話)
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
// 判斷延時訊息是否到期
long countdown = deliverTimestamp - now;
if (countdown <= 0) {
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
// 將訊息恢復到原始訊息的格式,恢復topic、queueId、tagCode等,清除屬性"DELAY"
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult =
ScheduleMessageService.this.defaultMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
// 投遞成功,處理下一個
continue;
} else {
// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
// 投遞失敗,結束當前task,重新啟動TimerTask,從下一個訊息開始處理,也就是說當前訊息丟棄
// 更新offsetTable中當前佇列的offset為下一個訊息的offset
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
// 重新投遞期間出現任何異常,結束當前task,重新啟動TimerTask,從當前訊息開始重試
/*
* XXX: warn and notify me
*/
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
// 處理完當前MappedFile中的訊息後,重新啟動TimerTask,從下一個訊息開始處理
// 更新offsetTable中當前佇列的offset為下一個訊息的offset
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
// 如果根據offsetTable中的offset沒有找到對應的訊息(可能被刪除了),則按照當前ConsumeQueue的最小offset開始處理
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
對於上面的tagCode做一下特別說明,延時訊息的tagCode和普通訊息不一樣:
- 延時訊息的tagCode:儲存的是訊息到期的時間。
- 非延時訊息的tagCode:tags字串的hashCode。
總結
以上就是RocketMQ延時訊息的實現方式,上面沒有詳說的是重試訊息的延時是怎麼實現的,其實就是在consumer將延時訊息傳送回broker的時候設定了(使用者可以自己設定,如果沒有自己設定預設是0)delayLevel,到了broker處理重試訊息的時候如果delayLevel是0(也就是說是預設的延時等級)的時候會在原來的基礎上加3,後面的處理就和上面說的延時訊息一樣了,儲存的時候將訊息投遞到延時佇列,等待延時到期後再重新投遞到原始topic佇列中等到consumer消費。