手把手實現一條延時訊息
前言
近期在維護公司的排程平臺,其中有個關鍵功能那就是定時任務;定時任務大家平時肯定接觸的不少,比如 JDK
中的 Timer
、ScheduledExecutorService
、排程框架 Quartz
等。
通常用於實現 XX 時間後的延時任務,或週期性任務;
比如一個常見的業務場景:使用者下單 N 分鐘未能支付便自動取消訂單。
實現這類需求通常有兩種方式:
- 輪詢定時任務:給定週期內掃描所有未支付的訂單,檢視時間是否到期。
- 延時訊息:訂單建立的時候傳送一條 N 分鐘到期的資訊,一旦訊息消費後便可判斷訂單是否可以取消。
先看第一種,這類方式實現較為簡單,只需要啟動一個定時任務即可;但缺點同樣也很明顯,這個間隔掃描的時間不好控制。
給短了會造成很多無意義的掃描,增大資料庫壓力,給長了又會使得誤差較大。
當然最大的問題還是效率較低,隨著訂單增多耗時會呈線性增長,最差的情況甚至會出現上一波輪詢還沒有掃描完,下一波排程又來了。
這時第二種方案就要顯得靠譜多了,通過延時訊息可以去掉不必要的訂單掃描,實時性也比較高。
延時訊息
這裡我們不過多討論這類需求如何實現;重點聊聊這個延時訊息,看它是如何實現的,基於實現延時訊息的資料結構還能實現定時任務。
我在之前的開源 IM 專案中也加入了此類功能,可以很直觀的傳送一條延時訊息,效果如下:
使用 :delay hahah 2
傳送了一條兩秒鐘的延時訊息,另外一個客戶端將會在兩秒鐘之後收到該訊息。
具體的實現步驟會在後文繼續分析。
時間輪
要實現延時訊息就不得不提到一種資料結構【時間輪
】,時間輪聽這名字可以很直觀的抽象出它的資料結構。
其實本質上它就是一個環形的陣列,如圖所示,假設我們建立了一個長度為 8 的時間輪。
task0
= 當我們需要新建一個 5s 延時訊息,則只需要將它放到下標為 5 的那個槽中。
task1
= 而如果是一個 10s 的延時訊息,則需要將它放到下標為 2 的槽中,但同時需要記錄它所對應的圈數,不然就和 2 秒的延時訊息重複了。
task2
= 當建立一個 21s 的延時訊息時,它所在的位置就和 task0
相同了,都在下標為 5 的槽中,所以為了區別需要為他加上圈數為 2。
通過這張圖可以更直觀的理解。
當我們需要取出延時訊息時,只需要每秒往下移動這個指標,然後取出該位置的所有任務即可。
當然取出任務之前還得判斷圈數是否為 0 ,不為 0 時說明該任務還得再輪幾圈,同時需要將圈數 -1 。
這樣就可避免輪詢所有的任務,不過如果時間輪的槽比較少,導致某一個槽上的任務非常多那效率也比較低,這就和 HashMap
的 hash
衝突是一樣的。
編碼實現
理論講完後我們來看看實際的編碼實現,為此我建立了一個 RingBufferWheel
類。
它的主要功能如下:
- 可以新增指定時間的延時任務,在這個任務中可以實現自己的業務邏輯。
- 停止執行(包含強制停止和所有任務完成後停止)。
- 檢視待執行任務數量。
首先直接看看這個類是如何使用的。
我在這裡建立了 65 個延時任務,每個任務都比前一個延後 1s 執行;同時自定義了一個 Job
類來實現自己的業務邏輯,最後呼叫 stop(false)
會在所有任務執行完畢後退出。
建構函式
先來看看其中的建構函式,這裡一共有兩個建構函式,用於接收一個執行緒池及時間輪的大小。
執行緒池的作用會在後面講到。
這裡的時間輪大小也是有講究的,它的長度必須得是 2∧n
,至於為什麼有這個要求後面也會講到。
預設情況下會初始化一個長度為 64 的陣列。
新增任務
下面來看看新增任務的邏輯,根據我們之前的那張抽象圖其實很容易實現。
首先我們要定義一個 Task
類,用於抽象任務;它本身也是一個執行緒,一旦延時到期便會執行其中的 run 函式,所以使用時便可繼承該類,將業務邏輯寫在 run()
中即可。
它其中還有兩個成員變數,也很好理解。
cycleNum
用於記錄該任務所在時間輪的圈數。key
在這裡其實就是延時時間。
//通過 key 計算應該存放的位置
private Set<Task> get(int key) {
int index = mod(key, bufferSize);
return (Set<Task>) ringBuffer[index];
}
private int mod(int target, int mod) {
// equals target % mod
target = target + tick.get() ;
return target & (mod - 1);
}
首先是根據延時時間 (key
) 計算出所在的位置,其實就和 HashMap
一樣的取模運算,只不過這裡使用了位運算替代了取模,同時效率會高上不少。
這樣也解釋了為什麼陣列長度一定得是
2∧n
。
然後檢視該位置上是否存在任務,不存在就新建一個;存在自然就是將任務寫入這個集合並更新回去。
private int cycleNum(int target, int mod) {
//equals target/mod
return target >> Integer.bitCount(mod - 1);
}
其中的
cycleNum()
自然是用於計算該任務所處的圈數,也是考慮到效率問題,使用位運算替代了除法。
private void put(int key, Set<Task> tasks) {
int index = mod(key, bufferSize);
ringBuffer[index] = tasks;
}
而 put()
函式就非常簡單了,就是將任務寫入指定陣列下標即可。
啟動時間輪
任務寫進去後下一步便是啟動這個時間輪了,我這裡定義了一個 start()
函式。
其實本質上就是開啟了一個後臺執行緒來做這個事情:
它會一直從時間輪中取出任務來執行,而執行這些任務的執行緒便是我們在初始化時傳入的執行緒池;所以所有的延時任務都是由自定義的執行緒池排程完成的,這樣可以避免時間輪的阻塞。
這裡呼叫的 remove(index)
很容易猜到是用於獲取當前陣列中的所有任務。
邏輯很簡單就不再贅述,不過其中的 size2Notify()
倒是值得說一下。
他是用於在停止任務時,主執行緒等待所有延時任務執行完畢的喚醒條件。這類用法幾乎是所有執行緒間通訊的常規套路,值得收入技能包。
停止時間輪
剛才提到的喚醒主執行緒得配合這裡的停止方法使用:
如果是強制停止那便什麼也不管,直接更新停止標誌,同時關閉執行緒池即可。
但如果是軟停止(等待所有任務執行完畢)時,那就得通過上文提到的方式阻塞主執行緒,直到任務執行完畢後被喚醒。
CIM 中的應用
介紹了核心原理和基本 API
後,我們來看看實際業務場景如何結合使用(背景是一個即時通訊專案)。
我這裡所使用的場景在文初也提到了,就是真的傳送一條延時訊息;
現有的訊息都是實時訊息,所以要實現一個延時訊息便是在現有的傳送客戶端處將延時訊息放入到這個時間輪中,在任務到期時再執行真正的訊息傳送邏輯。
由於專案本身結合了 Spring
,所以第一步自然是配置 bean
。
bean
配置好後其實就可以使用了。
每當傳送的是延時訊息時,只需要將這個訊息封裝為一個 Job
放到時間輪中,然後在自己的業務類中完成業務即可。
後續可以優化下
api
,不用每次新增任務都要呼叫start()
方法。
這樣一個延時訊息的應用便完成了。
總結
時間輪這樣的應用還非常多,比如 Netty
中的 HashedWheelTimer
工具原理也差不多,可以用於維護長連線心跳資訊。
甚至 Kafka
在這基礎上還優化出了層級時間輪,這些都是後話了,大家感興趣的話可以自行搜尋資料或者抽時間我再完善一次。
這篇文章從前期準備到擼碼實現還是花了不少時間,如果對你有幫助的話還請點贊轉發。
本文的所有原始碼都可在此處查閱:
https://github.com/crossoverJie/cim
你的點贊與分享是對我最大的支援