1. 程式人生 > >RocketMQ延時訊息

RocketMQ延時訊息

一個延時訊息被髮出到消費成功經歷以下幾個過程:

  1. 設定訊息的延時級別delayLevel
  2. producer傳送訊息
  3. broker收到訊息在準備將訊息寫入儲存的時候,判斷是延時訊息則更改Message的topic為延時訊息佇列的topic,也就是將訊息投遞到延時訊息佇列
  4. 有定時任務從延時佇列中讀取訊息,拿到訊息後判斷是否達到延時時間,如果到了則修改topic為原始topic。並將訊息投遞到原始topic的佇列
  5. consumer像消費其他訊息一樣從broker拉取訊息進行消費

注意:批量訊息是不支援延時訊息的

tips:下文中說到的延時佇列可以理解為一個ConsumeQueue

producer傳送延時訊息

在producer中傳送訊息的時候,設定Message的delayLevel

// org.apache.rocketmq.common.message.Message
publicvoidsetDelayTimeLevel(int level) {
    this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}

呼叫上面的方法設定延時等級的時候,會向message新增"DELAY"屬性,後面broker處理延時訊息就是依賴該屬性進行特別的處理。

接下來發送訊息的流程和正常傳送訊息的流程基本一致,只是會將該訊息標記為延時訊息型別

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl 
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); }

broker處理延時訊息

broker收到延時訊息和正常訊息在前置的處理流程是一致的,對於延時訊息的特殊處理體現在將訊息寫入儲存(記憶體或檔案)的時候

// org.apache.rocketmq.store.CommitLog#putMessage
public PutMessageResult 
putMessage(final MessageExtBrokerInner msg) { // 省略中間程式碼... StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); // 拿到原始topic和對應的queueId String topic = msg.getTopic(); int queueId = msg.getQueueId(); final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); // 非事務訊息和事務的commit訊息才會進一步判斷delayLevel if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery if (msg.getDelayTimeLevel() > 0) { // 糾正設定過大的level,就是delayLevel設定都大於延時時間等級的最大級 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } // 設定為延時佇列的topic topic = ScheduleMessageService.SCHEDULE_TOPIC; // 每一個延時等級一個queue,queueId = delayLevel - 1 queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId // 備份原始的topic和queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); // 更新properties msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } } // 省略中間程式碼... }

上面的SCHEDULE_TOPIC是:

public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

這個topic是一個特殊的topic,和正常的topic不同的地方是:

  1. 不會建立TopicCOnfig,因為也不需要consumer直接消費這個topic下的訊息
  2. 不會將topic註冊到namesrv
  3. 這個topic的佇列個數和延時等級的個數是相同的

後面訊息寫入的過程和普通的又是一致的。

上面將訊息寫入延時佇列中了,接下來就是處理延時佇列中的訊息,然後重新發送回原始topic的佇列中。

在此之前先說明下至今還有疑問的一個個概念——delayLevel。這個概念和我們接下要需要用到的的類org.apache.rocketmq.store.schedule.ScheduleMessageService有關,這個類的欄位delayLevelTable裡面儲存了具體的延時等級

private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);

看下這個欄位的初始化過程

// org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel
publicbooleanparseDelayLevel() {
    HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
    // 每個延時等級延時時間的單位對應的ms數
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);

    // 延時等級在MessageStoreConfig中配置
    // private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
        // 根據空格將配置分隔出每個等級
        String[] levelArray = levelString.split(" ");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            String ch = value.substring(value.length() - 1);
            // 時間單位對應的ms數
            Long tu = timeUnitTable.get(ch);

            // 延時等級從1開始
            int level = i + 1;
            if (level > this.maxDelayLevel) {
                // 找出最大的延時等級
                this.maxDelayLevel = level;
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            long delayTimeMillis = tu * num;
            this.delayLevelTable.put(level, delayTimeMillis);
    // 省略部分程式碼...
}

上面這個load方法在broker啟動的時候DefaultMessageStore會呼叫來初始化延時等級。

接下來就應該解決怎麼處理延時訊息佇列中的訊息的問題了。處理延時訊息的服務是:ScheduleMessageService。

還是broker啟動的時候DefaultMessageStore會呼叫org.apache.rocketmq.store.schedule.ScheduleMessageService#start來啟動處理延時訊息佇列的服務:

publicvoidstart() {

    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
        Integer level = entry.getKey();
        Long timeDelay = entry.getValue();
        // 記錄佇列的處理進度
        Long offset = this.offsetTable.get(level);
        if (null == offset) {
            offset = 0L;
        }

        if (timeDelay != null) {
            // 每個延時佇列啟動一個定時任務來處理該佇列的延時訊息
            this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
        }
    }

    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        publicvoidrun() {
            try {
                // 持久化offsetTable(儲存了每個延時佇列對應的處理進度offset)
                ScheduleMessageService.this.persist();
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate flush exception", e);
            }
        }
    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}

DeliverDelayedMessageTimerTask是一個TimerTask,啟動以後不斷處理延時佇列中的訊息,直到出現異常則終止該執行緒重新啟動一個新的TimerTask

publicvoidexecuteOnTimeup() {
    // 找到該延時等級對應的ConsumeQueue
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));
    // 記錄異常情況下一次啟動TimerTask開始處理的offset
    long failScheduleOffset = offset;

    if (cq != null) {
        // 找到offset所處的MappedFile中offset後面的buffer
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // 下面三個欄位資訊是ConsumeQueue物理儲存的資訊
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    // 注意這個tagCode,不再是普通的tag的hashCode,而是該延時訊息到期的時間
                    long tagsCode = bufferCQ.getByteBuffer().getLong();
                    // 省略中間程式碼....
                    long now = System.currentTimeMillis();
                    // 計算應該投遞該訊息的時間,如果已經超時則立即投遞
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                    // 計算下一個訊息的開始位置,用來尋找下一個訊息位置(如果有的話)
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    // 判斷延時訊息是否到期
                    long countdown = deliverTimestamp - now;

                    if (countdown <= 0) {
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy);

                        if (msgExt != null) {
                            try {
                                // 將訊息恢復到原始訊息的格式,恢復topic、queueId、tagCode等,清除屬性"DELAY"
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.defaultMessageStore
                                        .putMessage(msgInner);

                                if (putMessageResult != null
                                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    // 投遞成功,處理下一個
                                    continue;
                                } else {
                                    // XXX: warn and notify me
                                    log.error(
                                        "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                        msgExt.getTopic(), msgExt.getMsgId());
                                    // 投遞失敗,結束當前task,重新啟動TimerTask,從下一個訊息開始處理,也就是說當前訊息丟棄
                                    // 更新offsetTable中當前佇列的offset為下一個訊息的offset
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel,
                                            nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                        nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                // 重新投遞期間出現任何異常,結束當前task,重新啟動TimerTask,從當前訊息開始重試
                                /*                                 * XXX: warn and notify me                                 */
                                log.error(
                                    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                        + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    } else {
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for
                // 處理完當前MappedFile中的訊息後,重新啟動TimerTask,從下一個訊息開始處理
                // 更新offsetTable中當前佇列的offset為下一個訊息的offset
                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {

                bufferCQ.release();
            }
        } // end of if (bufferCQ != null)
        else {
            // 如果根據offsetTable中的offset沒有找到對應的訊息(可能被刪除了),則按照當前ConsumeQueue的最小offset開始處理
            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                    + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    } // end of if (cq != null)
    
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}

對於上面的tagCode做一下特別說明,延時訊息的tagCode和普通訊息不一樣:

  • 延時訊息的tagCode:儲存的是訊息到期的時間
  • 非延時訊息的tagCode:tags字串的hashCode

對延時訊息的tagCode的特別處理是在下面這個方法中完成的,也就是在build ConsumeQueue資訊的時候

org.apache.rocketmq.store.CommitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean, boolean)

http://www.jlh2124.cn/
http://www.bjl7141.cn/
http://news.azd6793.cn/
http://www.cun5054.cn/
http://news.akb6775.cn/
http://www.mtr5072.cn/
http://www.zlq5247.cn/
http://www.rxi1689.cn/
http://www.itb0047.cn/
http://news.buu7798.cn/
http://news.tzn6024.cn/
http://news.ath0401.cn/
http://www.miv2453.cn/
http://www.ath0401.cn/
http://www.gky0942.cn/
http://news.axs9870.cn/
http://www.vme0237.top/
http://news.ybc8953.cn/
http://news.bah1564.cn/
http://www.win4778.top/
http://news.kbs9896.cn/
http://www.alj9141.cn/
http://www.ncc8754.cn/
http://www.mtl1611.cn/
http://www.kpq2047.cn/
http://www.bti6873.cn/
http://news.uzu8211.cn/
http://www.yvr8830.cn/
http://www.tyo9948.cn/
http://www.ymg3874.cn/
http://www.sdb0307.cn/
http://news.xgs5975.cn/
http://news.miv2453.cn/
http://www.usx0758.cn/
http://news.jiq1934.cn/
http://www.hwp3498.cn/
http://www.dep9137.cn/
http://www.zbb7727.cn/
http://www.nlc4773.cn/
http://news.vpk8803.cn/
http://www.hij5984.cn/
http://www.ryc2010.top/
http://www.qld9407.cn/
http://news.gmx2930.cn/
http://www.nrr1653.cn/
http://news.cxb4532.cn/
http://news.bin6562.cn/
http://www.ric5056.cn/
http://www.bjb5476.cn/
http://www.eyf3292.cn/
http://news.xcd5039.cn/
http://www.xtl7183.cn/
http://news.fzl7156.cn/
http://news.mco2769.cn/
http://www.bmh5849.top/
http://www.ogr7085.cn/
http://www.nwz4782.top/
http://www.zri8413.cn/

相關推薦

RocketMQ訊息的使用和級別的配置

RocketMQ 支援定時訊息,但是不支援任意時間精度,僅支援特定的 level,例如定時 5s, 10s, 1m 等。 其中,level=0 級表示不延時,level=1 表示 1 級延時,level=2 表示 2 級延時,以此類推。 如果要支援任意的時間精度,在Broker層面,必須要做訊息排序,

RocketMQ訊息

一個延時訊息被髮出到消費成功經歷以下幾個過程:設定訊息的延時級別delayLevelproducer傳送訊息broker收到訊息在準備將訊息寫入儲存的時候,判斷是延時訊息則更改Message的topic為延時訊息佇列的topic,也就是將訊息投遞到延時訊息佇列有定時任務從延時

RocketMQ學習(七)——RocketMQ訊息

使用者傳送的訊息也可以指定延時時間(更準確的說是延時等級),然後在指定延時時間之後投遞訊息,然後被consumer消費。阿里雲的ons還支援定時訊息,而且延時訊息是直接指定延時時間,其實阿里雲的延時訊息也是定時訊息的另一種表述方式,都是通過設定訊息被投遞的時間

RocketMQ原理學習--訊息實現原理

         RocketMQ提供了延時訊息型別,簡單來說就是生產者在傳送訊息的時候指定一個延時時間,當到達延時時間之後訊息才能夠被投送到消費者。       首先我們可以考慮一下RocketMQ的延時訊息是

Spring boot實戰專案整合阿里雲RocketMQ (非開源版)訊息佇列實現傳送普通訊息訊息 --附程式碼

一.為什麼選擇RocketMQ訊息佇列? 首先RocketMQ是阿里巴巴自研出來的,也已開源。其效能和穩定性從雙11就能看出來,借用阿里的一句官方介紹:歷年雙 11 購物狂歡節零點千萬級 TPS、萬億級資料洪峰,創造了全球最大的業務訊息併發以及流轉紀錄(日誌類訊息除外);  在始終保證高效能前提下

rocketmq消息

string 解決 print log void ride reg .sh art rocketmq提供一種延時消息的解決方案,就是在特定的時間到了,消息才會被投遞出去供consumer消費。總體來是簡單的場景是滿足了,但是需要註意的是延時的時間是需要按照默認配置的延時級別

訊息佇列

下面程式碼按需要填寫 @Bean public Queue delayQueuePerMessageTTL() { Map<String, Object> argument = new HashMap<>(); argument.put(“x-message-ttl

RabbitMQ 訊息設計

 問題背景 所謂"延時訊息"是指當訊息被髮送以後,並不想讓消費者立即拿到訊息,而是等待指定時間後,消費者才拿到這個訊息進行消費。 場景一:客戶A在十二點下了一個訂單,我想半個小時後來檢查一下這個訂單的付款狀態,根據付款狀態來作下一步的處理。 a. 針對場景一,建議採用方案資料庫儲存+sc

Springboot2(28)整合rabbitmq實現訊息

原始碼地址 springboot2教程系列 rabbitmq實現訊息的確認機制和延時訊息的傳送 訊息生產者程式碼實現的主要配置 @Configuration @Slf4j public class PrividerRabbitm

Redis訊息佇列、非同步訊息佇列的實現

package list; import java.lang.reflect.Type; import java.util.Set; import java.util.UUID; import com.alibaba.fastjson.JSON; import com.a

rabbitmq發訊息以及通過一個exchange發到不同的queue

public static void main(String[] args) throws Exception { producer(1); producer(2); producer(3); } private static void

阿里MQ普通+順序+訊息 整合Spring

前言 由於公司專案需要,研究了下AliWareMQ。阿里mq的普通訊息和延時訊息還是挺簡單的。不過在順序訊息的時候出現了一些瓶頸。後來查閱原始碼和依據demo整理了一版融合Spring的版本。 例項 mq配置檔案(Spring) 主要是順序訊息的

RocketMQ 級別配置

RocketMQ 支援定時訊息,但是不支援任意時間精度,僅支援特定的 level,例如定時 5s, 10s, 1m 等。其中,level=0 級表示不延時,level=1 表示 1 級延時,level=2 表示 2 級延時,以此類推。 如何配置: 在伺服器端(rock

Java關於DelayQueue做訊息推送

最近比較閒,看某專案原始碼時看到有用DelayQueue類來做延時的訊息推送。 DelayQueue是Delayed元素的一個無界阻塞佇列,只有在延遲期滿時才能從中提取元素。該佇列的頭部 是延遲期滿後儲存時間最長的Delayed元素。如果延遲都還沒有期滿,則佇列沒有頭部,並

手把手實現一條訊息

前言 近期在維護公司的排程平臺,其中有個關鍵功能那就是定時任務;定時任務大家平時肯定接觸的不少,比如 JDK 中的 Timer、ScheduledExecutorService、排程框架 Quartz 等。 通常用於實現 XX 時間後的延時任務,或週期性任務; 比如一個常見的業務場景:使用者下單 N 分鐘

玩轉redis-訊息佇列

上一篇基於redis的list實現了一個簡單的訊息佇列:玩轉redis-簡單訊息佇列 原始碼地址 使用demo 產品經理經常說的一句話,我們不光要有X功能,還要Y功能,這樣客戶才能更滿意。同樣的,只有簡單訊息佇列是不夠的,還要有延時訊息佇列才能算是一個完整的訊息佇列。 看看redis的命令,放眼望去,的有序

Redis 非同步訊息佇列與佇列

        訊息中介軟體,大家都會想到  Rabbitmq 和 Kafka 作為訊息佇列中介軟體,來給應用程式之間增加非同步訊息傳遞功能。這兩個中介軟體都是專業的訊息佇列中介軟體,特性之多超出了大多數人的理解能力。但是這種屬於重量級的應

效能提升五十倍:訊息佇列聚合通知的重要性

前言 這個話題對我而言,是影響很久的事情。從第一次使用訊息佇列開始,業務背景是報名系統通知到我們的系統。正常流量下資料都能正常通知過來,但遇到匯入報名人時,採用了Task非同步通知,資料量一大,佇列就死了。當時是儘量採用同步方式,減少併發量。  後來業務上有了專門的營銷系統

Redis實現可靠低訊息佇列

不過因為使用的是decrBy會導致一種情況出現,當前庫存還剩1個,2個執行緒同時請求,一個請求減去1個庫存,另一個請求減去2個庫存,,如果減去2個庫存的先執行,他會返回一個-1,然後我會加回去,但是在加回去的之前,減1庫存的執行緒執行了,會返回-2,依然沒有辦法減庫存成功,所以在這中情況下,我採用當減庫存返回

Active MQ傳送訊息的Java示例程式碼段

import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import java