延時任務的實現
前言
延時任務介紹:
比如你在某寶上下了一個訂單,卻沒有支付,過了半個小時後這個訂單自動取消了。
設計思路比較方法可以通過效能,能否持久化,拓展分散式等。當然要根據你的業務來。
1. 基於資料庫輪訓
此方案很easy,即將延時任務存進資料庫的表中,然後通過一個執行緒定時的去掃描資料庫,不斷的將任務的觸發時間和當前時間進行比較,如果達到任務的觸發時間,就執行任務!
優點:簡單易行,支援叢集操作
缺點:
(1)對伺服器記憶體資源、cpu資源消耗大
(2)存在延遲,比如你每隔3分鐘掃描一次,那最壞的延遲時間就是3分鐘
(3)在網際網路專案中,經常會遇到有幾千萬條延時任務在跑。那麼,資料庫裡延時任務表裡就有幾千萬條記錄,每隔幾分鐘這樣掃描一次,資料庫損耗極大
不推薦
2. 基於JDK延遲佇列
DelayQueue
該方案是利用JDK自帶的DelayQueue來實現,這是一個無界阻塞佇列,該佇列只有在延遲期滿的時候才能從中獲取元素,放入DelayQueue中的物件,是必須實現Delayed介面的。
消費者通過 poll()/take()方法獲取一個任務。
其中Poll():獲取並移除佇列的超時元素,沒有則返回空
take():獲取並移除佇列的超時元素,如果沒有則wait當前執行緒,直到有元素滿足超時條件,返回結果。
優點::效率高,任務觸發時間延遲低
缺點::(1)伺服器重啟後,資料全部消失,怕宕機。要滿足高可用場景,需要hook執行緒二次開發;
(2)叢集擴充套件相當麻煩
(3)因為記憶體條件限制的原因,比如在網際網路專案中,延時任務通常十分的多,如果全丟JVM,記憶體容易OOM
(4)程式碼複雜度較高
3. 基於時間輪
包含兩個重要的資料結構:
(1)環形佇列,例如可以建立一個包含3600個slot的環形佇列(本質是個陣列);
(2)任務集合,環上每一個slot是一個Set
同時,啟動一個timer:
(1)此timer每隔1s,在環形佇列中移動一格;
(2)用一個Current Index來標識正在檢測的slot;
Task結構中有兩個很重要的屬性:
(1)Cycle-Num:當Current Index第幾圈掃描到這個Slot時,執行任務;
(2)Task-Function:需要執行的任務函式;
如上圖,假設當前Current Index指向第一格,當有延時訊息到達之後,例如希望3610秒之後,觸發一個延時訊息任務,只需:
(1)計算這個Task應該放在哪一個slot,現在指向1,3610秒之後,應該是第11格,所以這個Task應該放在第11個slot的Set
(2)計算這個Task的Cycle-Num,由於環形佇列是3600格(每秒移動一格,正好1小時),這個任務是3610秒後執行,所以應該繞3610/3600=1圈之後再執行,於是Cycle-Num=1;
Current Index不停的移動,每秒移動一格,當移動到一個新slot,遍歷這個slot中對應的Set
(1)如果不是0,說明還需要多移動幾圈,將Cycle-Num減1;
(2)如果是0,說明馬上要執行這個Task了,取出Task-Funciton執行,丟給工作執行緒執行,並把這個Task從Set
注意,不要用timer來執行任務,否則timer會越來越不準。
優點::效率高,任務觸發時間延遲時間比delayQueue低,程式碼複雜度比delayQueue低。
缺點::(1)伺服器重啟後,資料全部消失,怕宕機。(可拓展持久化方案實現高可用,Netty kafak akka均有使用)
(2)叢集擴充套件相當麻煩
(3)這種情況也是把任務丟JVM記憶體,因為記憶體條件限制的原因,,那麼很容易就出現OOM異常
netty 中有 HashedWheelTimer 工具原理類似
4. Redis
redis zset
zset是一個有序集合,每一個元素(member)都關聯了一個score,通過score排序來取集合中的值。
具體如下圖所示,我們將超時時間戳與延時任務分別設定為score和member,系統掃描第一個元素判斷是否超時,具體如下圖所示
取出score最小的元素,與當前時間進行比較,如果發現已經到達時間,則執行任務。
鍵空間機制
該方案使用redis的Keyspace Notifications,中文翻譯就是鍵空間機制,就是利用該機制可以在key失效之後,提供一個回撥,實際上是redis會給客戶端傳送一個訊息。是需要redis版本2.8以上。
做法很簡單:
(1)給key設定一個超時時間
(2)給key超時事件訂閱一個處理方法
(3)key超時了,redis將回調步驟(2)中訂閱的方法
ps:官網不推薦使用該機制。因為Redis的釋出/訂閱目前是即發即棄(fire and forget)模式的,因此無法實現事件的可靠通知。也就是說,如果釋出/訂閱的客戶端斷鏈之後,此時剛好key的失效期到了,如果此時客戶端又無法連線到,那麼該延時任務就將丟失。即使客戶端又恢復了連線,也不會再次回撥。
優點:(1)由於使用Redis作為訊息通道,訊息都儲存在Redis中。如果傳送程式或者任務處理程式掛了,重啟之後,還有重新處理資料的可能性。
(2)做叢集擴充套件相當方便
(3)時間準確度高
缺點:(1)需要額外進行redis維護
5. MQ 基於訊息佇列
利用訊息佇列的某些特性實現延時佇列,例如我們可以實現rabbitMQ的延時佇列。
優點: 高效,可以利用rabbitmq的分散式特性輕易的進行橫向擴充套件,訊息支援持久化增加了可靠性。
缺點:本身的易用度要依賴於rabbitMq的運維.因為要引用rabbitMq,所以複雜度和成本變高
mq有些場景可能不太適合,比如對這個延遲任務取消
推薦使用此方案
6. 基於執行緒池
ScheduledThreadPoolExecutor
1.5 引入了 ScheduledThreadPoolExecutor,它是一個具有更多功能的 Timer 的替代品,允許多個服務執行緒。如果設定一個服務執行緒和 Timer 沒啥差別。
ScheduledThreadPoolExecutor繼承了 ThreadPoolExecutor,實現了 ScheduledExecutorService。可以定性操作就是正常執行緒池差不多了。區別就在於兩點,一個是 ScheduledFutureTask ,一個是 DelayedWorkQueue。
其實 DelayedWorkQueue 就是優先佇列,也是利用陣列實現的小頂堆。而 ScheduledFutureTask 繼承自 FutureTask 重寫了 run 方法,實現了週期性任務的需求。
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic) // 不是週期性任務 run
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime(); // 設定下一次執行時間
reExecutePeriodic(outerTask); // 重新入佇列
}
}
ScheduledThreadPoolExecutor 大致的流程和 Timer 差不多,也是維護一個優先佇列,然後通過重寫 task 的 run 方法來實現週期性任務,主要差別在於能多執行緒執行任務,不會單執行緒阻塞。
並且 Java 執行緒池的設定是 task 出錯會把錯誤吃了,無聲無息的。因此一個任務出錯也不會影響之後的任務。