1. 程式人生 > >如何在MQ中實現支援任意延遲的訊息?

如何在MQ中實現支援任意延遲的訊息?

什麼是定時訊息和延遲訊息?

  • 定時訊息:Producer 將訊息傳送到 MQ 服務端,但並不期望這條訊息立馬投遞,而是推遲到在當前時間點之後的某一個時間投遞到 Consumer 進行消費,該訊息即定時訊息。
  • 延遲訊息:Producer 將訊息傳送到 MQ 服務端,但並不期望這條訊息立馬投遞,而是延遲一定時間後才投遞到 Consumer 進行消費,該訊息即延時訊息。

定時訊息與延遲訊息在程式碼配置上存在一些差異,但是最終達到的效果相同:訊息在傳送到 MQ 服務端後並不會立馬投遞,而是根據訊息中的屬性延遲固定時間後才投遞給消費者。

 

 

目前業界MQ對定時訊息和延遲訊息的支援情況

上圖是阿里雲上對業界MQ功能的對比,其中開源產品中只有阿里的RocketMQ支援延遲訊息,且是固定的18個Level。

固定Level的含義是延遲是特定級別的,比如支援3秒、5秒的Level,那麼使用者只能傳送3秒延遲或者5秒延遲,不能傳送8秒延遲的訊息。

訊息佇列RocketMQ的阿里雲版本(收費版本)才支援到精確到秒級別的延遲訊息(沒有特定Level的限制)。

上圖是CMQ中對MQ功能的對比,其中標明騰訊的CMQ支援延遲訊息,但是沒有具體寫明支援到什麼精度,支援任意時間還是特定的Level。

 

通過騰訊雲上CMQ的API文件可以看到有一個秒級別的delaySeconds,應該是支援任意級別的延遲,即和收費版本的RocketMQ一致。

總結

  • 開源版本中,只有RocketMQ支援延遲訊息,且只支援18個特定級別的延遲
  • 付費版本中,阿里雲和騰訊雲上的MQ產品都支援精度為秒級別的延遲訊息

(真是有錢能使鬼推磨啊,有錢就能發任意延遲的訊息了,沒錢最多隻能發特定Level了)

 

 

任意延遲的訊息難點在哪裡?

開源版本沒有支援任意延遲的訊息,我想可能有以下幾個原因:

  1. 任意延遲的訊息的需求不強烈
  2. 可能是一個比較有技術含量的點,不願意開源

需求不強

對支援任意延遲的需求確實不強,因為:

  1. 延遲並不是MQ場景的核心功能,業務單獨做一個替代方案的成本不大
  2. 業務上一般對延遲的需求都是固定的,比如下單後半小時check是否付款,發貨後7天check是否收貨

在我司,MQ上線一年多後才有業務方希望我能支援延遲訊息,且不要求任意延遲,只要求和RocketMQ開源版本一致,支援一些業務上的級別即可。

不願意開源

為了差異化(好在雲上賣錢),只能降開源版本的功能進行閹割,所以開源版本的RocketMQ變成了只支援特定Level的延遲。

難點在哪裡?

既然業務有需求,我們肯定也要去支援。

首先,我們先劃清楚定義和邊界:

在我們的系統範圍內,支援任意延遲的訊息指的是:

  1. 精度支援到秒級別

  2. 最大支援30天的延遲

本著對自己的高要求,我們並不滿足於開源RocketMQ的18個Level的方案。那麼,如果我們自己要去實現一個支援任意延遲的訊息佇列,難點在哪裡呢?

  1. 排序
  2. 訊息儲存

首先,支援任意延遲意味著訊息是需要在服務端進行排序的。

比如使用者先發了一條延遲1分鐘的訊息,一秒後發了一條延遲3秒的訊息,顯然延遲3秒的訊息需要先被投遞出去。那麼服務端在收到訊息後需要對訊息進行排序後再投遞出去。

在MQ中,為了保證可靠性,訊息是需要落盤的,且對效能和延遲的要求,決定了在服務端對訊息進行排序是完全不可接受的。

其次,目前MQ的方案中都是基於WAL的方式實現的(RocketMQ、Kafka),日誌檔案會被過期刪除,一般會保留最近一段時間的資料。

 

支援任意級別的延遲,那麼需要儲存最近30天的訊息。

阿里內部 1000+ 核心應用使用,每天流轉幾千億條訊息,經過雙11交易、商品等核心鏈路真實場景的驗證,穩定可靠。

考慮一下一天幾千億的訊息,儲存30天的話需要堆多少伺服器,顯然是無法做到的。

 

 

知己知彼

雖然決定自己做,但是依舊需要先了解開源的實現,那麼就只能看看RocketMQ開源版本中,支援18個Level是怎麼實現的,希望能從中得到一些靈感。

上圖是通過RocketMQ原始碼分析後簡化一個實現原理方案示意圖。

分為兩個部分:

  1. 訊息的寫入
  2. 訊息的Schedule

訊息寫入中:

  1. 在寫入CommitLog之前,如果是延遲訊息,替換掉訊息的Topic和queueId(被替換為延遲訊息特定的Topic,queueId則為延遲級別對應的id)
  2. 訊息寫入CommitLog之後,提交dispatchRequest到DispatchService
  3. 因為在第①步中Topic和QueueId被替換了,所以寫入的ConsumeQueue實際上非真正訊息應該所屬的ConsumeQueue,而是寫入到ScheduledConsumeQueue中(這個特定的Queue存放不會被消費)

Schedule過程中:

  1. 給每個Level設定定時器,從ScheduledConsumeQueue中讀取資訊
  2. 如果ScheduledConsumeQueue中的元素已近到時,那麼從CommitLog中讀取訊息內容,恢復成正常的訊息內容寫入CommitLog
  3. 寫入CommitLog後提交dispatchRequest給DispatchService
  4. 因為在寫入CommitLog前已經恢復了Topic等屬性,所以此時DispatchService會將訊息投遞到正確的ConsumeQueue中

回顧一下這個方案,最大的優點就是沒有了排序:

  • 先發一條level是5s的訊息,再發一條level是3s的訊息,因為他們會屬於不同的ScheduleQueue所以投遞順序能保持正確
  • 如果先後發兩條level相同的訊息,那麼他們的處於同一個ConsumeQueue且保持傳送順序
  • 因為level數固定,每個level的有自己獨立的定時器,開銷也不會很大
  • ScheduledConsumeQueue其實是一個普通的ConsumeQueue,所以可靠性等都可以按照原系統的M-S結構等得到保障

但是這個方案也有一些問題:

  • 固定了Level,不夠靈活,最多隻能支援18個Level
  • 業務是會變的,但是Level需要提前劃分,不支援修改
  • 如果要支援30天的延遲,CommitLog的量會很大,這塊怎麼處理沒有看到

 

 

站在巨人的肩膀上

總結RocketMQ的方案,通過劃分Level的方式,將排序操作轉換為了O(1)的ConsumeQueue 的append操作。

我們去支援任意延遲的訊息,必然也需要通過類似的方式避免掉排序。

此時我們想到了TimeWheel:《Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility 》

Netty中也是用TimeWheel來優化I/O超時的操作。

TimeWheel

TimeWheel的大致原理如下:

  • 箭頭按照一定方向固定頻率移動(如手錶指標),每一次跳動稱為一個tick。ticksPerWheel表示一個定時輪上的tick數。

如每次tick為1秒,ticksPerWheel為60,那麼這就和現實中的秒針走動完全一致。

TimeWheel應用到延遲訊息中

無論定時訊息還是延遲訊息,最終都是投遞後延遲一段時間對使用者可見。

假設這個延遲時間為X秒,那麼X%(ticksPerWheel * tick)可以計算出X所屬的TimeWheel中位置。

這裡存在一個問題,以上圖為例,TimeWheel的size為8,那麼延遲1秒和9秒的訊息都處在一個連結串列中。如果使用者先發了延遲9秒的訊息再發了延遲1秒的訊息,他們在一個連結串列中所以延遲1秒的訊息會需要等待延遲9秒的訊息先投遞。顯然這是不能接受的,那麼如何解決這個問題?

排序

顯然,如果對TimeWheel一個tick中的元素進行排序顯然就解決了上面的問題。但是顯而易見的是排序是不可能的。

擴大時間輪

最直觀的方式,我們能不能通過擴大時間輪的方式避免延遲9和延遲1落到一個tick位置上?

假設支援30天,精度為1秒,那麼ticksPerWheel=30 * 24 * 60 * 60,這樣每一個tick上的延遲都是一致的,不存在上述的問題(類似於將RocketMQ的Level提升到了30 * 24 * 60 * 60個)。但是TimeWheel需要被載入到記憶體操作,這顯然是無法接受的。

多級時間輪

單個TimeWheel無法支援,那麼能否顯示中的時針、分針的形式,構建多級時間輪來解決呢?

多級時間輪解決了上述的問題,但是又引入了新的問題:

  1. 在整點(tick指向0的位置)需要載入大量的資料會導致延遲,比如第二個時間輪到整點需要載入未來一天的資料
  2. 時間輪需要載入到記憶體,這個開銷是不可接受的

延遲載入

多級定時輪的問題在於需要載入大量資料到記憶體,那麼能否優化一下將這裡的資料延遲載入到記憶體來解決記憶體開銷的問題呢?

 

在多級定時輪的方案中,顯然對於未來一小時或者未來一天的資料可以不載入到記憶體,而可以只加載延遲時間臨近的訊息。

進一步優化,可以將資料按照固定延遲間隔劃分,那麼每次載入的資料量是大致相同的,不會出tick約大的定時輪需要載入越多的資料,那麼方案如下:

基於上述的方案,那麼TimeWheel中儲存未來30分鐘需要投遞的訊息的索引,索引為一個long型,那麼資料量為:30 * 60 * 8 * TPS,相對來說記憶體開銷是可以接受的,比如TPS為1w那麼大概開銷為200M+。

之後的資料按照每30分鐘一個塊的形式寫入檔案,那麼每個整點時的操作就是計算一下將30分鐘的訊息Hash到對應的TimeWheel上,那麼排序問題就解決了。

到此為止就只剩下一個問題,如何儲存30天的資料?

CommitLog儲存超長延遲的資料

CommitLog是有時效性的,比如在我們只儲存最近7天的訊息,過期資料將被刪除。對於延遲訊息,可能需要30天之後投遞,顯然是不能被刪除的。

那麼我們怎麼儲存延遲訊息呢?

直觀的方法就是將延遲訊息從CommitLog中剝離出來,獨立儲存以儲存更長的時間。

通過DispatchService將WAL中的延遲訊息寫入到獨立的檔案中。這些檔案按照延遲時間組成一個連結串列。

連結串列長度為最大延遲時間/每個檔案儲存的時間長度。

那麼WAL可以按照正常的策略進行過期刪除,Delay Msg File則在一個檔案投遞完之後進行刪除。

唯一的問題是這裡會有Delay Msg File帶來的隨機寫問題,但是這個對系統整體效能不會有很大影響,在可接受範圍內。

 

 

BOUNS

結合TimeWheel和CommitLog儲存超長延遲資料的方案,加上一些優化手段,基本就完成了支援任意延遲時間的方案:

  • 訊息寫入WAL
  • Dispatcher處理延遲訊息
    • 延遲訊息一定時間的直接寫入TimeWheel
    • 延遲超過一定時間寫入DelayMessageStorage
  • DelayMessageStorage對DelayMsgFile構建一層索引,這樣在對映到TimeWheel時只需要做一次Hash操作
  • 通過TimeWheel將訊息投遞到ConsumeQueue中完成對Consumer的可見

通過這個方案解決了最初提出來的任意延遲訊息的兩個難點:

  1. 訊息的排序問題
  2. 超長延遲訊息的儲存問題

 

 

最後

本文從延遲訊息的概念出發,瞭解業界的支援情況,確定延遲訊息的難點和支援邊界,最後通過一步步推導完成了一個相對來說從記憶體開銷和效能上都可以滿足期望的方案。

對本文有任何問題歡迎通過公公眾號留言或新增我的微信交流。