1. 程式人生 > 其它 >延時佇列 基於Redis實現延時佇列服務

延時佇列 基於Redis實現延時佇列服務

基於Redis實現延時佇列服務

背景

在業務發展過程中,會出現一些需要延時處理的場景,比如:

a.訂單下單之後超過30分鐘使用者未支付,需要取消訂單
b.訂單一些評論,如果48h使用者未對商家評論,系統會自動產生一條預設評論
c.點我達訂單下單後,超過一定時間訂單未派出,需要超時取消訂單等。。。
處理這類需求,比較直接簡單的方式就是定時任務輪訓掃表。這種處理方式在資料量不大的場景下是完全沒問題,但是當資料量大的時候高頻的輪訓資料庫就會比較的耗資源,導致資料庫的慢查或者查詢超時。所以在處理這類需求時候,採用了延時佇列來完成。


幾種延時佇列

延時佇列就是一種帶有延遲功能的訊息佇列。下面會介紹幾種目前已有的延時佇列:
1.Java中java.util.concurrent.DelayQueue
優點:JDK自身實現,使用方便,量小適用
缺點:佇列訊息處於jvm記憶體,不支援分散式執行和訊息持久化
2.Rocketmq延時佇列
優點:訊息持久化,分散式
缺點:不支援任意時間精度,只支援特定level的延時訊息
3.Rabbitmq延時佇列(TTL+DLX實現)
優點:訊息持久化,分散式
缺點:延時相同的訊息必須扔在同一個佇列

根據自身業務和公司情況,如果實現一個自己的延時佇列服務需要考慮一下幾點:

* 訊息儲存
* 過期延時訊息實時獲取
* 高可用性

 基於Redis實現

1.0版本

功能特性

* 訊息可靠性,訊息持久化,訊息至少被消費一次
* 實時性:存在一定的時間誤差(定時任務間隔)
* 支援指定訊息remove
* 高可用性

整體結構

- Messages Pool所有的延時訊息存放,結構為KV結構,key為訊息ID,value為一個具體的message(這裡選擇Redis Hash結構主要是因為hash結構能儲存較大的資料量,資料較多時候會進行漸進式rehash擴容,並且對於HSET和HGET命令來說時間複雜度都是O(1))
- Delayed Queue是16個有序佇列(佇列支援水平擴充套件),結構為ZSET,value為messages pool中訊息ID,score為過期時間(分為多個佇列是為了提高掃描的速度)
- Timed Task定時任務,負責掃描處理每個佇列過期訊息

 訊息結構

每個延時訊息必須包括以下引數:

* tags:訊息過期之後傳送mq的tags
* keys:訊息過期之後傳送mq的keys
* body:訊息過期之後傳送mq的body,提供給消費這做具體的訊息處理
* delayTime:延時傳送時間(預設,delayTime、expectDate有一個即可)
* expectDate:期望傳送時間

流程


注:上圖1、2、3或者2、3是一個事務操作
取出過期訊息過程是通過一個外部定時任務每隔1min分鐘去查詢佇列中過期的訊息,然後傳送mq && remove

2.0版本

1.0上有一個可改進的地方就是佇列中過期的訊息是通過定時任務觸發查詢。所有有了2.0
2.0版本在1.0上做了一個優化,廢棄掉了1min定時任務觸發過期訊息傳送,採用了java Lock await/singlal方式實現過期訊息的實時傳送低延時

多節點部署結構:

- pull job:這裡分別為每一個佇列建立了一個pull job thread,功能很簡單,就是負責去佇列中拉取過期的訊息資料(這裡保證一個佇列有且只有一個pull job)
- worker:pull job拉取到的過期訊息會交給一個worker thread去處理,這樣的好處是處理過期的訊息實時性更高(pull job不必等去除過期訊息全部處理完成在繼續去拉取新的過期資料)
- zookeeper coordinate:通過zk的操作來完成對佇列的重新分配工作,daemon thread監聽zk節點的建立和刪除

主要流程:


服務啟動會註冊zk,獲取分配處理的queues,啟動後臺執行緒監聽zk 
為每個分配queue建立一個pull job 
pull job首先會去queue中查詢是否有過期訊息: 
Y:將取出訊息交給worker處理
N:查詢queue中最後一個成員(zset結構預設按score遞增排序),如果為空,則await;不為空則await(成員score-System.currentTimeMillis())

由於過期訊息傳送成功才會從佇列中remove,所以pull job會記錄上一次查詢佇列的一個offset,每次獲取到過期訊息會將offset向前偏移,過期訊息交給worker處理,當worker由於某些異常原因處理失敗會重置pull job中offset,這樣可以避免訊息傳送一次失敗之後沒辦法在繼續處理(除了新節點add || remove時候)
當部署服務有新增,延時佇列服務會重新計算得到當前處理佇列,並將之前建立pull job cancel,為新處理佇列重新建立pull job。刪除同理。
</ol>

背景

在業務發展過程中,會出現一些需要延時處理的場景,比如:

a.訂單下單之後超過30分鐘使用者未支付,需要取消訂單
b.訂單一些評論,如果48h使用者未對商家評論,系統會自動產生一條預設評論
c.點我達訂單下單後,超過一定時間訂單未派出,需要超時取消訂單等。。。
處理這類需求,比較直接簡單的方式就是定時任務輪訓掃表。這種處理方式在資料量不大的場景下是完全沒問題,但是當資料量大的時候高頻的輪訓資料庫就會比較的耗資源,導致資料庫的慢查或者查詢超時。所以在處理這類需求時候,採用了延時佇列來完成。


幾種延時佇列

延時佇列就是一種帶有延遲功能的訊息佇列。下面會介紹幾種目前已有的延時佇列:
1.Java中java.util.concurrent.DelayQueue
優點:JDK自身實現,使用方便,量小適用
缺點:佇列訊息處於jvm記憶體,不支援分散式執行和訊息持久化
2.Rocketmq延時佇列
優點:訊息持久化,分散式
缺點:不支援任意時間精度,只支援特定level的延時訊息
3.Rabbitmq延時佇列(TTL+DLX實現)
優點:訊息持久化,分散式
缺點:延時相同的訊息必須扔在同一個佇列

根據自身業務和公司情況,如果實現一個自己的延時佇列服務需要考慮一下幾點:

* 訊息儲存
* 過期延時訊息實時獲取
* 高可用性

 基於Redis實現

1.0版本

功能特性

* 訊息可靠性,訊息持久化,訊息至少被消費一次
* 實時性:存在一定的時間誤差(定時任務間隔)
* 支援指定訊息remove
* 高可用性

整體結構

- Messages Pool所有的延時訊息存放,結構為KV結構,key為訊息ID,value為一個具體的message(這裡選擇Redis Hash結構主要是因為hash結構能儲存較大的資料量,資料較多時候會進行漸進式rehash擴容,並且對於HSET和HGET命令來說時間複雜度都是O(1))
- Delayed Queue是16個有序佇列(佇列支援水平擴充套件),結構為ZSET,value為messages pool中訊息ID,score為過期時間(分為多個佇列是為了提高掃描的速度)
- Timed Task定時任務,負責掃描處理每個佇列過期訊息

 訊息結構

每個延時訊息必須包括以下引數:

* tags:訊息過期之後傳送mq的tags
* keys:訊息過期之後傳送mq的keys
* body:訊息過期之後傳送mq的body,提供給消費這做具體的訊息處理
* delayTime:延時傳送時間(預設,delayTime、expectDate有一個即可)
* expectDate:期望傳送時間

流程


注:上圖1、2、3或者2、3是一個事務操作
取出過期訊息過程是通過一個外部定時任務每隔1min分鐘去查詢佇列中過期的訊息,然後傳送mq && remove

2.0版本

1.0上有一個可改進的地方就是佇列中過期的訊息是通過定時任務觸發查詢。所有有了2.0
2.0版本在1.0上做了一個優化,廢棄掉了1min定時任務觸發過期訊息傳送,採用了java Lock await/singlal方式實現過期訊息的實時傳送低延時

多節點部署結構:

- pull job:這裡分別為每一個佇列建立了一個pull job thread,功能很簡單,就是負責去佇列中拉取過期的訊息資料(這裡保證一個佇列有且只有一個pull job)
- worker:pull job拉取到的過期訊息會交給一個worker thread去處理,這樣的好處是處理過期的訊息實時性更高(pull job不必等去除過期訊息全部處理完成在繼續去拉取新的過期資料)
- zookeeper coordinate:通過zk的操作來完成對佇列的重新分配工作,daemon thread監聽zk節點的建立和刪除

主要流程:


服務啟動會註冊zk,獲取分配處理的queues,啟動後臺執行緒監聽zk 
為每個分配queue建立一個pull job 
pull job首先會去queue中查詢是否有過期訊息: 
Y:將取出訊息交給worker處理
N:查詢queue中最後一個成員(zset結構預設按score遞增排序),如果為空,則await;不為空則await(成員score-System.currentTimeMillis())

由於過期訊息傳送成功才會從佇列中remove,所以pull job會記錄上一次查詢佇列的一個offset,每次獲取到過期訊息會將offset向前偏移,過期訊息交給worker處理,當worker由於某些異常原因處理失敗會重置pull job中offset,這樣可以避免訊息傳送一次失敗之後沒辦法在繼續處理(除了新節點add || remove時候)
當部署服務有新增,延時佇列服務會重新計算得到當前處理佇列,並將之前建立pull job cancel,為新處理佇列重新建立pull job。刪除同理。
</ol>