1. 程式人生 > 實用技巧 >延時任務的實現

延時任務的實現

前言

延時任務介紹:

比如你在某寶上下了一個訂單,卻沒有支付,過了半個小時後這個訂單自動取消了。

設計思路比較方法可以通過效能,能否持久化,拓展分散式等。當然要根據你的業務來。

1. 基於資料庫輪訓

此方案很easy,即將延時任務存進資料庫的表中,然後通過一個執行緒定時的去掃描資料庫,不斷的將任務的觸發時間和當前時間進行比較,如果達到任務的觸發時間,就執行任務!

優點:簡單易行,支援叢集操作

缺點:

(1)對伺服器記憶體資源、cpu資源消耗大

(2)存在延遲,比如你每隔3分鐘掃描一次,那最壞的延遲時間就是3分鐘

(3)在網際網路專案中,經常會遇到有幾千萬條延時任務在跑。那麼,資料庫裡延時任務表裡就有幾千萬條記錄,每隔幾分鐘這樣掃描一次,資料庫損耗極大

不推薦

2. 基於JDK延遲佇列

DelayQueue

該方案是利用JDK自帶的DelayQueue來實現,這是一個無界阻塞佇列,該佇列只有在延遲期滿的時候才能從中獲取元素,放入DelayQueue中的物件,是必須實現Delayed介面的。

消費者通過 poll()/take()方法獲取一個任務。

其中Poll():獲取並移除佇列的超時元素,沒有則返回空

take():獲取並移除佇列的超時元素,如果沒有則wait當前執行緒,直到有元素滿足超時條件,返回結果。

優點::效率高,任務觸發時間延遲低

缺點::(1)伺服器重啟後,資料全部消失,怕宕機。要滿足高可用場景,需要hook執行緒二次開發;

(2)叢集擴充套件相當麻煩

(3)因為記憶體條件限制的原因,比如在網際網路專案中,延時任務通常十分的多,如果全丟JVM,記憶體容易OOM

(4)程式碼複雜度較高

3. 基於時間輪

包含兩個重要的資料結構:
(1)環形佇列,例如可以建立一個包含3600個slot的環形佇列(本質是個陣列);
(2)任務集合,環上每一個slot是一個Set

同時,啟動一個timer:
(1)此timer每隔1s,在環形佇列中移動一格;
(2)用一個Current Index來標識正在檢測的slot;

Task結構中有兩個很重要的屬性:
(1)Cycle-Num:當Current Index第幾圈掃描到這個Slot時,執行任務;
(2)Task-Function:需要執行的任務函式;

如上圖,假設當前Current Index指向第一格,當有延時訊息到達之後,例如希望3610秒之後,觸發一個延時訊息任務,只需:
(1)計算這個Task應該放在哪一個slot,現在指向1,3610秒之後,應該是第11格,所以這個Task應該放在第11個slot的Set中;
(2)計算這個Task的Cycle-Num,由於環形佇列是3600格(每秒移動一格,正好1小時),這個任務是3610秒後執行,所以應該繞3610/3600=1圈之後再執行,於是Cycle-Num=1;

Current Index不停的移動,每秒移動一格,當移動到一個新slot,遍歷這個slot中對應的Set,每個Task看Cycle-Num是不是0:
(1)如果不是0,說明還需要多移動幾圈,將Cycle-Num減1;
(2)如果是0,說明馬上要執行這個Task了,取出Task-Funciton執行,丟給工作執行緒執行,並把這個Task從Set中刪除;

注意,不要用timer來執行任務,否則timer會越來越不準。

優點::效率高,任務觸發時間延遲時間比delayQueue低,程式碼複雜度比delayQueue低。

缺點::(1)伺服器重啟後,資料全部消失,怕宕機。(可拓展持久化方案實現高可用,Netty kafak akka均有使用)

   (2)叢集擴充套件相當麻煩

   (3)這種情況也是把任務丟JVM記憶體,因為記憶體條件限制的原因,,那麼很容易就出現OOM異常

netty 中有 HashedWheelTimer 工具原理類似

4. Redis

redis zset

zset是一個有序集合,每一個元素(member)都關聯了一個score,通過score排序來取集合中的值。
具體如下圖所示,我們將超時時間戳與延時任務分別設定為score和member,系統掃描第一個元素判斷是否超時,具體如下圖所示

取出score最小的元素,與當前時間進行比較,如果發現已經到達時間,則執行任務。

鍵空間機制

該方案使用redis的Keyspace Notifications,中文翻譯就是鍵空間機制,就是利用該機制可以在key失效之後,提供一個回撥,實際上是redis會給客戶端傳送一個訊息。是需要redis版本2.8以上。
做法很簡單:
(1)給key設定一個超時時間

(2)給key超時事件訂閱一個處理方法

(3)key超時了,redis將回調步驟(2)中訂閱的方法

ps:官網不推薦使用該機制。因為Redis的釋出/訂閱目前是即發即棄(fire and forget)模式的,因此無法實現事件的可靠通知。也就是說,如果釋出/訂閱的客戶端斷鏈之後,此時剛好key的失效期到了,如果此時客戶端又無法連線到,那麼該延時任務就將丟失。即使客戶端又恢復了連線,也不會再次回撥。

優點:(1)由於使用Redis作為訊息通道,訊息都儲存在Redis中。如果傳送程式或者任務處理程式掛了,重啟之後,還有重新處理資料的可能性。

(2)做叢集擴充套件相當方便

(3)時間準確度高

缺點:(1)需要額外進行redis維護

5. MQ 基於訊息佇列

利用訊息佇列的某些特性實現延時佇列,例如我們可以實現rabbitMQ的延時佇列。

優點: 高效,可以利用rabbitmq的分散式特性輕易的進行橫向擴充套件,訊息支援持久化增加了可靠性。

缺點:本身的易用度要依賴於rabbitMq的運維.因為要引用rabbitMq,所以複雜度和成本變高

mq有些場景可能不太適合,比如對這個延遲任務取消

推薦使用此方案

6. 基於執行緒池

ScheduledThreadPoolExecutor

1.5 引入了 ScheduledThreadPoolExecutor,它是一個具有更多功能的 Timer 的替代品,允許多個服務執行緒。如果設定一個服務執行緒和 Timer 沒啥差別。

ScheduledThreadPoolExecutor繼承了 ThreadPoolExecutor,實現了 ScheduledExecutorService。可以定性操作就是正常執行緒池差不多了。區別就在於兩點,一個是 ScheduledFutureTask ,一個是 DelayedWorkQueue。

其實 DelayedWorkQueue 就是優先佇列,也是利用陣列實現的小頂堆。而 ScheduledFutureTask 繼承自 FutureTask 重寫了 run 方法,實現了週期性任務的需求。

 /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
         */
        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic) // 不是週期性任務 run
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime(); // 設定下一次執行時間
                reExecutePeriodic(outerTask); // 重新入佇列
            }
        }

ScheduledThreadPoolExecutor 大致的流程和 Timer 差不多,也是維護一個優先佇列,然後通過重寫 task 的 run 方法來實現週期性任務,主要差別在於能多執行緒執行任務,不會單執行緒阻塞。

並且 Java 執行緒池的設定是 task 出錯會把錯誤吃了,無聲無息的。因此一個任務出錯也不會影響之後的任務。