RocketMQ原理學習--延時訊息實現原理
RocketMQ提供了延時訊息型別,簡單來說就是生產者在傳送訊息的時候指定一個延時時間,當到達延時時間之後訊息才能夠被投送到消費者。
首先我們可以考慮一下RocketMQ的延時訊息是如何實現:
(1)生產者將延時訊息傳送到Broker,Broker是如何區分普通訊息和延時訊息(訊息型別)
(2)訊息如何保證不被消費者拉取到的(RocketMQ將訊息以SCHEDULE_TOPIC_XXXX為topic將延時訊息持久化,等到達延時訊息之後再以原有的topic重新儲存)。
1、簡單示例
(1)RocketMQ目前指定的延時時間間隔有1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h,用等級來表示時間間隔。
public class DelayProducer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("rmq-group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); try { for (int i = 0; i < 3; i++) { Message msg = new Message("TopicA-test",// topic "TagA",// tag (new Date() + "Hello RocketMQ ,QuickStart 11" + i) .getBytes()// body ); //1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。 // level=0,表示不延時。level=1,表示 1 級延時,對應延時 1s。level=2 表示 2 級延時,對應5s,以此類推 msg.setDelayTimeLevel(2); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
2、Broker持久化延時訊息
Broker對於接收到的訊息首先會判斷一下是不是延時訊息,如果是延時訊息會將訊息以SCHEDULE_TOPIC_XXXX為topic替換原有的topic名稱進行持久化,實現方法在CommitLog的putMessage中。
首先會判斷msg的延時標準如果大於0,則重新設定訊息的topic名稱和queueId,之後將訊息以SCHEDULE_TOPIC_XXXX為topic,以延時時間的等級為queueId持久化到commitlog檔案中。
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
3、延時訊息處理
Broker將延時訊息以SCHEDULE_TOPIC_XXXX為topic名稱將訊息進行持久化,接下來我們看看Broker是如何將訊息在延時訊息到達之後進行訊息還原的。
RocketMQ提供了定時任務服務ScheduleMessageService,通過定時任務的方式不斷的讀取topic為SCHEDULE_TOPIC_XXXX何queueId為延時等級的訊息進行訊息還原處理,這樣訊息被還原之後消費者就可以拉取訊息了。
每個消費等級有個定時任務DeliverDelayedMessageTimerTask:
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
在DeliverDelayedMessageTimerTask中根據SCHEDULE_TOPIC_XXXX名稱和延時等級對應的queueId獲取訊息佇列,然後從commitlog中讀取訊息,還原訊息的原有資訊(訊息的原topic資訊)再將訊息持久化到commitlog檔案中,這樣消費者就可以拉取訊息了。
public void executeOnTimeup() {
//獲取消費者訊息
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
//讀取訊息
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown <= 0) {
//獲取訊息
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
//還原訊息資訊topic名稱等
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
//重新將訊息持久化到commitlog中
PutMessageResult putMessageResult =
ScheduleMessageService.this.defaultMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/*
* XXX: warn and notify me
*/
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
總結:
延時訊息的實現還是挺精巧的,首先將延時訊息換了一個topic名稱進行持久化,這樣消費者就無法獲取訊息,然後有定時任務,會將訊息還原到原有的topic資訊,這樣消費者又可以重新拉取訊息了。