1. 程式人生 > >如何在MQ中實現支持任意延遲的消息?

如何在MQ中實現支持任意延遲的消息?

ima sch 版本 未來 req .com mes pro ons

什麽是定時消息和延遲消息?

  • 定時消息:Producer 將消息發送到 MQ 服務端,但並不期望這條消息立馬投遞,而是推遲到在當前時間點之後的某一個時間投遞到 Consumer 進行消費,該消息即定時消息。
  • 延遲消息:Producer 將消息發送到 MQ 服務端,但並不期望這條消息立馬投遞,而是延遲一定時間後才投遞到 Consumer 進行消費,該消息即延時消息。

定時消息與延遲消息在代碼配置上存在一些差異,但是最終達到的效果相同:消息在發送到 MQ 服務端後並不會立馬投遞,而是根據消息中的屬性延遲固定時間後才投遞給消費者。

目前業界MQ對定時消息和延遲消息的支持情況

技術分享圖片

上圖是阿裏雲上對業界MQ功能的對比,其中開源產品中只有阿裏的RocketMQ支持延遲消息,且是固定的18個Level。

固定Level的含義是延遲是特定級別的,比如支持3秒、5秒的Level,那麽用戶只能發送3秒延遲或者5秒延遲,不能發送8秒延遲的消息。

消息隊列RocketMQ的阿裏雲版本(收費版本)才支持到精確到秒級別的延遲消息(沒有特定Level的限制)。

技術分享圖片

上圖是CMQ中對MQ功能的對比,其中標明騰訊的CMQ支持延遲消息,但是沒有具體寫明支持到什麽精度,支持任意時間還是特定的Level。

技術分享圖片

通過騰訊雲上CMQ的API文檔可以看到有一個秒級別的delaySeconds,應該是支持任意級別的延遲,即和收費版本的RocketMQ一致。

總結

  • 開源版本中,只有RocketMQ支持延遲消息,且只支持18個特定級別的延遲
  • 付費版本中,阿裏雲和騰訊雲上的MQ產品都支持精度為秒級別的延遲消息

(真是有錢能使鬼推磨啊,有錢就能發任意延遲的消息了,沒錢最多只能發特定Level了)

任意延遲的消息難點在哪裏?

開源版本沒有支持任意延遲的消息,我想可能有以下幾個原因:

  1. 任意延遲的消息的需求不強烈
  2. 可能是一個比較有技術含量的點,不願意開源

需求不強

對支持任意延遲的需求確實不強,因為:

  1. 延遲並不是MQ場景的核心功能,業務單獨做一個替代方案的成本不大
  2. 業務上一般對延遲的需求都是固定的,比如下單後半小時check是否付款,發貨後7天check是否收貨

在我司,MQ上線一年多後才有業務方希望我能支持延遲消息,且不要求任意延遲,只要求和RocketMQ開源版本一致,支持一些業務上的級別即可。

不願意開源

為了差異化(好在雲上賣錢),只能降開源版本的功能進行閹割,所以開源版本的RocketMQ變成了只支持特定Level的延遲。

難點在哪裏?

既然業務有需求,我們肯定也要去支持。

首先,我們先劃清楚定義和邊界:

在我們的系統範圍內,支持任意延遲的消息指的是:

  1. 精度支持到秒級別

  2. 最大支持30天的延遲

本著對自己的高要求,我們並不滿足於開源RocketMQ的18個Level的方案。那麽,如果我們自己要去實現一個支持任意延遲的消息隊列,難點在哪裏呢?

  1. 排序
  2. 消息存儲

首先,支持任意延遲意味著消息是需要在服務端進行排序的。

比如用戶先發了一條延遲1分鐘的消息,一秒後發了一條延遲3秒的消息,顯然延遲3秒的消息需要先被投遞出去。那麽服務端在收到消息後需要對消息進行排序後再投遞出去。

在MQ中,為了保證可靠性,消息是需要落盤的,且對性能和延遲的要求,決定了在服務端對消息進行排序是完全不可接受的。

其次,目前MQ的方案中都是基於WAL的方式實現的(RocketMQ、Kafka),日誌文件會被過期刪除,一般會保留最近一段時間的數據。

支持任意級別的延遲,那麽需要保存最近30天的消息。

阿裏內部 1000+ 核心應用使用,每天流轉幾千億條消息,經過雙11交易、商品等核心鏈路真實場景的驗證,穩定可靠。

考慮一下一天幾千億的消息,保存30天的話需要堆多少服務器,顯然是無法做到的。

知己知彼

雖然決定自己做,但是依舊需要先了解開源的實現,那麽就只能看看RocketMQ開源版本中,支持18個Level是怎麽實現的,希望能從中得到一些靈感。

技術分享圖片

上圖是通過RocketMQ源碼分析後簡化一個實現原理方案示意圖。

分為兩個部分:

  1. 消息的寫入
  2. 消息的Schedule

消息寫入中:

  1. 在寫入CommitLog之前,如果是延遲消息,替換掉消息的Topic和queueId(被替換為延遲消息特定的Topic,queueId則為延遲級別對應的id)
  2. 消息寫入CommitLog之後,提交dispatchRequest到DispatchService
  3. 因為在第①步中Topic和QueueId被替換了,所以寫入的ConsumeQueue實際上非真正消息應該所屬的ConsumeQueue,而是寫入到ScheduledConsumeQueue中(這個特定的Queue存放不會被消費)

Schedule過程中:

  1. 給每個Level設置定時器,從ScheduledConsumeQueue中讀取信息
  2. 如果ScheduledConsumeQueue中的元素已近到時,那麽從CommitLog中讀取消息內容,恢復成正常的消息內容寫入CommitLog
  3. 寫入CommitLog後提交dispatchRequest給DispatchService
  4. 因為在寫入CommitLog前已經恢復了Topic等屬性,所以此時DispatchService會將消息投遞到正確的ConsumeQueue中

回顧一下這個方案,最大的優點就是沒有了排序:

  • 先發一條level是5s的消息,再發一條level是3s的消息,因為他們會屬於不同的ScheduleQueue所以投遞順序能保持正確
  • 如果先後發兩條level相同的消息,那麽他們的處於同一個ConsumeQueue且保持發送順序
  • 因為level數固定,每個level的有自己獨立的定時器,開銷也不會很大
  • ScheduledConsumeQueue其實是一個普通的ConsumeQueue,所以可靠性等都可以按照原系統的M-S結構等得到保障

但是這個方案也有一些問題:

  • 固定了Level,不夠靈活,最多只能支持18個Level
  • 業務是會變的,但是Level需要提前劃分,不支持修改
  • 如果要支持30天的延遲,CommitLog的量會很大,這塊怎麽處理沒有看到

站在巨人的肩膀上

總結RocketMQ的方案,通過劃分Level的方式,將排序操作轉換為了O(1)的ConsumeQueue 的append操作。

我們去支持任意延遲的消息,必然也需要通過類似的方式避免掉排序。

此時我們想到了TimeWheel:《Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility 》

Netty中也是用TimeWheel來優化I/O超時的操作。

TimeWheel

TimeWheel的大致原理如下:

技術分享圖片

  • 箭頭按照一定方向固定頻率移動(如手表指針),每一次跳動稱為一個tick。ticksPerWheel表示一個定時輪上的tick數。

如每次tick為1秒,ticksPerWheel為60,那麽這就和現實中的秒針走動完全一致。

TimeWheel應用到延遲消息中

無論定時消息還是延遲消息,最終都是投遞後延遲一段時間對用戶可見。

假設這個延遲時間為X秒,那麽X%(ticksPerWheel * tick)可以計算出X所屬的TimeWheel中位置。

這裏存在一個問題,以上圖為例,TimeWheel的size為8,那麽延遲1秒和9秒的消息都處在一個鏈表中。如果用戶先發了延遲9秒的消息再發了延遲1秒的消息,他們在一個鏈表中所以延遲1秒的消息會需要等待延遲9秒的消息先投遞。顯然這是不能接受的,那麽如何解決這個問題?

排序

顯然,如果對TimeWheel一個tick中的元素進行排序顯然就解決了上面的問題。但是顯而易見的是排序是不可能的。

擴大時間輪

最直觀的方式,我們能不能通過擴大時間輪的方式避免延遲9和延遲1落到一個tick位置上?

假設支持30天,精度為1秒,那麽ticksPerWheel=30 * 24 * 60 * 60,這樣每一個tick上的延遲都是一致的,不存在上述的問題(類似於將RocketMQ的Level提升到了30 * 24 * 60 * 60個)。但是TimeWheel需要被加載到內存操作,這顯然是無法接受的。

多級時間輪

單個TimeWheel無法支持,那麽能否顯示中的時針、分針的形式,構建多級時間輪來解決呢?

技術分享圖片

多級時間輪解決了上述的問題,但是又引入了新的問題:

  1. 在整點(tick指向0的位置)需要加載大量的數據會導致延遲,比如第二個時間輪到整點需要加載未來一天的數據
  2. 時間輪需要載入到內存,這個開銷是不可接受的

延遲加載

多級定時輪的問題在於需要加載大量數據到內存,那麽能否優化一下將這裏的數據延遲加載到內存來解決內存開銷的問題呢?

在多級定時輪的方案中,顯然對於未來一小時或者未來一天的數據可以不加載到內存,而可以只加載延遲時間臨近的消息。

進一步優化,可以將數據按照固定延遲間隔劃分,那麽每次加載的數據量是大致相同的,不會出tick約大的定時輪需要加載越多的數據,那麽方案如下:

技術分享圖片

基於上述的方案,那麽TimeWheel中存儲未來30分鐘需要投遞的消息的索引,索引為一個long型,那麽數據量為:30 * 60 * 8 * TPS,相對來說內存開銷是可以接受的,比如TPS為1w那麽大概開銷為200M+。

之後的數據按照每30分鐘一個塊的形式寫入文件,那麽每個整點時的操作就是計算一下將30分鐘的消息Hash到對應的TimeWheel上,那麽排序問題就解決了。

到此為止就只剩下一個問題,如何保存30天的數據?

CommitLog保存超長延遲的數據

CommitLog是有時效性的,比如在我們只保存最近7天的消息,過期數據將被刪除。對於延遲消息,可能需要30天之後投遞,顯然是不能被刪除的。

那麽我們怎麽保存延遲消息呢?

直觀的方法就是將延遲消息從CommitLog中剝離出來,獨立存儲以保存更長的時間。

技術分享圖片

通過DispatchService將WAL中的延遲消息寫入到獨立的文件中。這些文件按照延遲時間組成一個鏈表。

鏈表長度為最大延遲時間/每個文件保存的時間長度。

那麽WAL可以按照正常的策略進行過期刪除,Delay Msg File則在一個文件投遞完之後進行刪除。

唯一的問題是這裏會有Delay Msg File帶來的隨機寫問題,但是這個對系統整體性能不會有很大影響,在可接受範圍內。

BOUNS

結合TimeWheel和CommitLog保存超長延遲數據的方案,加上一些優化手段,基本就完成了支持任意延遲時間的方案:

技術分享圖片

  • 消息寫入WAL
  • Dispatcher處理延遲消息
    • 延遲消息一定時間的直接寫入TimeWheel
    • 延遲超過一定時間寫入DelayMessageStorage
  • DelayMessageStorage對DelayMsgFile構建一層索引,這樣在映射到TimeWheel時只需要做一次Hash操作
  • 通過TimeWheel將消息投遞到ConsumeQueue中完成對Consumer的可見

通過這個方案解決了最初提出來的任意延遲消息的兩個難點:

  1. 消息的排序問題
  2. 超長延遲消息的存儲問題

最後

本文從延遲消息的概念出發,了解業界的支持情況,確定延遲消息的難點和支持邊界,最後通過一步步推導完成了一個相對來說從內存開銷和性能上都可以滿足期望的方案。

對本文有任何問題歡迎通過公公眾號留言或添加我的微信交流。

技術分享圖片

如何在MQ中實現支持任意延遲的消息?