1. 程式人生 > >有贊延遲隊列設計

有贊延遲隊列設計

發現 json格式 xxxx true pytho java 強調 生命 信息

技術分享圖片

延遲隊列,顧名思義它是一種帶有延遲功能的消息隊列。 那麽,是在什麽場景下我才需要這樣的隊列呢?

背景

我們先看看以下業務場景:

  • 當訂單一直處於未支付狀態時,如何及時的關閉訂單,並退還庫存?
  • 如何定期檢查處於退款狀態的訂單是否已經退款成功?
  • 新創建店鋪,N天內沒有上傳商品,系統如何知道該信息,並發送激活短信?等等

為了解決以上問題,最簡單直接的辦法就是定時去掃表。每個業務都要維護一個自己的掃表邏輯。 當業務越來越多時,我們會發現掃表部分的邏輯會非常類似。我們可以考慮將這部分邏輯從具體的業務邏輯裏面抽出來,變成一個公共的部分。
那麽開源界是否已有現成的方案呢?答案是肯定的。Beanstalkd(http://kr.github.io/beanstalkd/), 它基本上已經滿足以上需求。但是,在刪除消息的時候不是特別方便,需要更多的成本。而且,它是基於C語言開發的,當時我們團隊主流是PHP和Java,沒法做二次開發。於是我們借鑒了它的設計思路,用Java重新實現了一個延遲隊列。

設計目標

  • 消息傳輸可靠性:消息進入到延遲隊列後,保證至少被消費一次。
  • Client支持豐富:由於業務上的需求,至少支持PHP和Python。
  • 高可用性:至少得支持多實例部署。掛掉一個實例後,還有後備實例繼續提供服務。
  • 實時性:允許存在一定的時間誤差。
  • 支持消息刪除:業務使用方,可以隨時刪除指定消息。

整體結構

整個延遲隊列由4個部分組成:

  • Job Pool用來存放所有Job的元信息。
  • Delay Bucket是一組以時間為維度的有序隊列,用來存放所有需要延遲的/已經被reserve的Job(這裏只存放Job Id)。
  • Timer負責實時掃描各個Bucket,並將delay時間大於等於當前時間的Job放入到對應的Ready Queue。
  • Ready Queue存放處於Ready狀態的Job(這裏只存放Job Id),以供消費程序消費。

如下圖表述:技術分享圖片

設計要點

基本概念

  • Job:需要異步處理的任務,是延遲隊列裏的基本單元。與具體的Topic關聯在一起。
  • Topic:一組相同類型Job的集合(隊列)。供消費者來訂閱。

消息結構

每個Job必須包含一下幾個屬性:

  • Topic:Job類型。可以理解成具體的業務名稱。
  • Id:Job的唯一標識。用來檢索和刪除指定的Job信息。
  • Delay:Job需要延遲的時間。單位:秒。(服務端會將其轉換為絕對時間)
  • TTR(time-to-run):Job執行超時時間。單位:秒。
  • Body:Job的內容,供消費者做具體的業務處理,以json格式存儲。

具體結構如下圖表示:技術分享圖片TTR的設計目的是為了保證消息傳輸的可靠性。

消息狀態轉換

每個Job只會處於某一個狀態下:

  • ready:可執行狀態,等待消費。
  • delay:不可執行狀態,等待時鐘周期。
  • reserved:已被消費者讀取,但還未得到消費者的響應(delete、finish)。
  • deleted:已被消費完成或者已被刪除。

下面是四個狀態的轉換示意圖:技術分享圖片

消息存儲

在選擇存儲介質之前,先來確定下具體的數據結構:

  • Job Poll存放的Job元信息,只需要K/V形式的結構即可。key為job id,value為job struct。
  • Delay Bucket是一個有序隊列。
  • Ready Queue是一個普通list或者隊列都行。

能夠同時滿足以上需求的,非redis莫屬了。
bucket的數據結構就是redis的zset,將其分為多個bucket是為了提高掃描速度,降低消息延遲。

通信協議

為了滿足多語言Client的支持,我們選擇Http通信方式,通過文本協議(json)來實現與Client端的交互。 目前支持以下協議:

  • 添加:{‘command’:’add’, ’topic’:’xxx’, ‘id’: ‘xxx’, ‘delay’: 30, ’TTR’: 60, ‘body’:‘xxx‘}
  • 獲取:{‘command’:’pop’, ’topic’:’xxx‘}
  • 完成:{‘command’:’finish’, ‘id’:’xxx‘}
  • 刪除:{‘command’:’delete’, ‘id’:’xxx‘}

body也是一個json串。
Response結構:{’success’:true/false, ‘error’:’error reason’, ‘id’:’xxx’, ‘value’:’job body‘}
強調一下:job id是由業務使用方決定的,一定要保證全局唯一性。這裏建議采用topic+業務唯一id的組合。

舉例說明一個Job的生命周期

  • 用戶對某個商品下單,系統創建訂單成功,同時往延遲隊列裏put一個job。job結構為:{‘topic‘:‘orderclose’, ‘id‘:‘ordercloseorderNoXXX’, ‘delay’:1800 ,’TTR‘:60 , ‘body‘:’XXXXXXX’}
  • 延遲隊列收到該job後,先往job pool中存入job信息,然後根據delay計算出絕對執行時間,並以輪詢(round-robbin)的方式將job id放入某個bucket。
  • timer每時每刻都在輪詢各個bucket,當1800秒(30分鐘)過後,檢查到上面的job的執行時間到了,取得job id從job pool中獲取元信息。如果這時該job處於deleted狀態,則pass,繼續做輪詢;如果job處於非deleted狀態,首先再次確認元信息中delay是否大於等於當前時間,如果滿足則根據topic將job id放入對應的ready queue,然後從bucket中移除;如果不滿足則重新計算delay時間,再次放入bucket,並將之前的job id從bucket中移除。
  • 消費端輪詢對應的topic的ready queue(這裏仍然要判斷該job的合理性),獲取job後做自己的業務邏輯。與此同時,服務端將已經被消費端獲取的job按照其設定的TTR,重新計算執行時間,並將其放入bucket。
  • 消費端處理完業務後向服務端響應finish,服務端根據job id刪除對應的元信息。

現有物理拓撲

技術分享圖片目前采用的是集中存儲機制,在多實例部署時Timer程序可能會並發執行,導致job被重復放入ready queue。為了解決這個問題,我們使用了redis的setnx命令實現了簡單的分布式鎖,以保證每個bucket每次只有一個timer thread來掃描。

設計不足的地方

timer是通過獨立線程的無限循環來實現,在沒有ready job的時候會對CPU造成一定的浪費。
消費端在reserve job的時候,采用的是http短輪詢的方式,且每次只能取的一個job。如果ready job較多的時候會加大網絡I/O的消耗。
數據存儲使用的redis,消息在持久化上受限於redis的特性。
scale-out的時候依賴第三方(nginx)。

未來架構方向

基於wait/notify方式的Timer實現。
提供TCP長連的API,實現push或者long-polling的消息reserve方法。
擁有自己的存儲方案(內嵌數據庫、自定義數據結構寫文件),確保消息的持久化。
實現自己的name-server。
考慮提供周期性任務的直接支持。

有贊延遲隊列設計