靈感來襲,基於Redis的分散式延遲佇列
延遲佇列
延遲佇列,也就是一定時間之後將訊息體放入佇列,然後消費者才能正常消費。比如1分鐘之後傳送簡訊,傳送郵件,檢測資料狀態等。
Redisson Delayed Queue
如果你專案中使用了redisson,那麼恭喜你,使用延遲佇列將非常的簡單。
基於Redis的Redisson分散式延遲佇列(Delayed Queue)結構的RDelayedQueue
Java物件在實現了RQueue
介面的基礎上提供了向佇列按要求延遲新增專案的功能。該功能可以用來實現訊息傳送延遲按幾何增長或幾何衰減的傳送策略。
RQueue<String> distinationQueue = ... RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue); // 10秒鐘以後將訊息傳送到指定佇列 delayedQueue.offer("msg1", 10, TimeUnit.SECONDS); // 一分鐘以後將訊息傳送到指定佇列 delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
在該物件不再需要的情況下,應該主動銷燬。僅在相關的Redisson物件也需要關閉的時候可以不用主動銷燬。
Java DelayQueue
DelayQueue它本質上是一個佇列,而這個佇列裡也只有存放Delayed的子類才有意義。
延遲佇列demo
public class DelayTask implements Delayed { private long startDate; public DelayTask(Long delayMillions) { this.startDate = System.currentTimeMillis() + delayMillions; } @Override public int compareTo(Delayed o) { Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS)); } @Override public long getDelay(TimeUnit unit) { return this.startDate - System.currentTimeMillis(); } public static void main(String[] args) throws Exception { BlockingQueue<DelayTask> queue = new DelayQueue<>(); DelayTask delayTask = new DelayTask(1000 * 5L); queue.put(delayTask); while (queue.size()>0){ queue.take(); } } }
延遲佇列消費原理
原始碼中出現了三次await字眼:- 第一次是當佇列為空時,等待;
- 第二次等待是因為,發現有任務,沒有到執行時間,並且有準備執行的執行緒(leader),那不好意思,還得接續等待直到下一個可執行的任務。
- 第三次是真正延時的地方了,available.awaitNanos(delay),此時也沒有別的執行緒要執行,也就是我將要執行,等待剩下的延遲時間即可。
延遲佇列生產原理
為保證消費者正常消費,如果優先佇列頭元素和當前放入元素相等,則說明當前元素消費的優先順序高,重置準備消費的執行緒(leader)為null,喚醒消費者執行緒重新執行take方法邏輯。
手寫一個Redis延遲佇列
Redis延遲佇列設計
延遲訊息體設計
延遲訊息體Message實現了Delayed介面,這樣Java DelayQueue就知道什麼時候取出訊息體。
Redis延遲佇列實現
RedisDelayQueue建構函式依賴redis操作快取服務物件和目標佇列名稱(redis key)。
offer方法傳入member(具體訊息),delay(延遲時間),timeUnit(時間單位),然後封裝成延遲訊息體Message物件,放入Java DelayQueue中。
run方法是一個迴圈體,不斷的從Java DelayQueue物件中獲取訊息體,然後放入redis對應的目標佇列裡。
延遲佇列測試demo
控制檯列印效果
思考
這種方案實現比較簡單,使用的時候一定要謹慎,應用於延遲小,訊息量不大的場景是沒問題的,畢竟Java DelayQueue是佔用記憶體的。另外也可以考慮利用Redis的sorted set 結構實現延遲佇列,使用TimeStamp作為score,比如你的任務是要延遲5分鐘,那麼就在當前時間上加5分鐘作為 score ,輪詢任務每秒只輪詢 score 大於當前時間的 key即可,如果任務支援有誤差,那麼當沒有掃描到有效資料的時候可以休眠對應時間再繼續輪