定時器常見實現方式(時間堆、時間輪)
需求
介面
首先需求場景主要有這幾種(簡化):
- 在 n 秒以後執行一個任務 X
- 每隔 n 秒執行一次任務 X
- 取消一個已經新增的定時器
根據上面的簡化需求,得到需要的主要介面:
- 新增一個定時器
- 定時器過期執行(可能需要重複執行)
- 取消一個定時器
資料結構
最後,就是考慮用來存放定時器的資料結構(也是定時器設計的核心)
上面的介面可以簡單的看成這幾個操作:
- 新增
- 刪除
- 查詢(獲取最近需要執行的一個)
對於這幾個操作,常用的複雜度比較均衡的資料結構:
- 紅黑樹
- 優先佇列(最小堆)
另外跳錶(本質類似紅黑樹)也是可用的;還有一種比較巧妙的hash結構時間輪
,這是一個類似鐘錶指標的結構,將需要管理的定時器根據時間hash到陣列中,以提高查詢和新增的效率
這幾種實現,現在流行的開源框架都各有采用,比如 Linux 核心用的就是時間輪的實現
實現
下面簡單實現一下采用最小堆和時間輪作為資料結構的定時器,方便起見,編碼演示使用python
時間堆
所謂的“時間堆”並不是什麼稀奇東西,只是用小堆來管理以時間作為關鍵字的定時器集合
實現思路:
- 使用一個結構來儲存定時器物件的資料,包括過期時間、回撥函式等
- 所有的定時器物件儲存在一個優先佇列中(也就是最小堆),鍵值為時間
- 新增定時器即為向佇列中插入新項
- 刪除定時器可以使用惰性刪除,首先給對應的定時器物件置位表示其已經取消,當達到一定條件時(比如取消的定時器數量達到總數量的1/2),進行一個清理操作
- 按照固定的時間間隔來tick,每次tick都需要看看有沒有需要執行的定時器物件
- 因為物件由優先佇列管理,因此,當隊首元素不需要執行時,後面的元素則都不需要執行
- 一個物件需要執行時,可能需要將其刪除;也可能是需要重複執行的,則需要設定好時間後再次加入
實現程式碼如下(僅為演示):
class Timer(): def __init__(self, owner, timestamp, callback, callargs, loop): self.owner: Timers = owner self.timestamp = timestamp self.callback = callback self.callargs = callargs self.cancelled = False self.loop = loop def __lt__(self, other): return self.timestamp < other.timestamp @property def Cancelled(self): return self.cancelled @property def Time(self): return self.timestamp def cancel(self): self.cancelled = True self.owner.onCancel() def trigger(self, now): if not self.cancelled: self.callback(now, *self.callargs) if self.loop > 0: self.timestamp += self.loop else: self.cancel() class Timers(): def __init__(self): self.timerQue = [] self.cancelCount = 0 def tick(self): now = int(time.time()) self.process(now) def process(self, now): while len(self.timerQue) > 0: timer: Timer = self.timerQue[0] # 到期或取消的定時器需要處理 if timer.Time <= now or timer.Cancelled: if not timer.Cancelled: timer.trigger(now) if not timer.Cancelled: heapq.heapreplace(self.timerQue, timer) else: heapq.heappop(self.timerQue) self.cancelCount -= 1 else: break def onCancel(self): self.cancelCount += 1 if self.cancelCount > len(self.timerQue)/2: self.purge() # 這裡處理的時候參考c++ std partition 操作,先將沒用的都放到陣列最後 # 然後,將前面有用的重新進行一次建隊操作 def purge(self): # patition l = 0 r = len(self.timerQue) - 1 while l < r: while l < r and not self.timerQue[l].Cancelled: l += 1 while l < r and self.timerQue[r].Cancelled: r -= 1 self.timerQue[l], self.timerQue[r] = self.timerQue[r], self.timerQue[l] l += 1 r -= 1 # remove mid = math.ceil((l + r) / 2) if not self.timerQue[mid].Cancelled: mid += 1 del self.timerQue[mid:] # heapmake heapq.heapify(self.timerQue) def add(self, duration, callback, callargs, looptime): now = int(time.time()) timer: Timer = Timer(self, now + duration, callback, callargs, looptime) heapq.heappush(self.timerQue, timer) return timer # test def func(callback): waitEvent = threading.Event() while not waitEvent.wait(1): callback() def func_a(now): print('in func a', now) def func_b(now): print('in func b', now) def func_c(a, b): print('in func c: before sleep') time.sleep(30) print('in func c: after sleep') a.cancel() b.cancel() timers = Timers() threading.Thread(target=func, args=(lambda:timers.tick(), )).start() timer_a = timers.add(0, func_a, (), 10) timer_b = timers.add(5, func_b, (), 10) threading.Thread(target=func_c, args=(timer_a, timer_b)).start()
時間輪
時間輪是一個比較巧妙地hash結構,性質有點類似鐘錶指標
首先考慮一個簡單的大小為60陣列t
,每一位表示對應時間的集合:第0秒需要處理的事務都在t[0]
,第5秒需要處理的事務都在t[5]
,依次類推
這樣一來,當處在 x 秒時只需要去對應的陣列元素即可;但是同樣也帶來了一個問題:最多隻能處理一分鐘內的事務(陣列大小隻有60)
為了解決這個問題,有一個簡單的方案:在事務物件加一個欄位 nRound 用來表示迴圈使用這個陣列,當迴圈次數為 nRound 時才執行本事務
這樣一來就解決了陣列容量問題,但是隨之而來的就是效率變得低下了;本來使用hash的目的就是為了避免不必要的遍歷,可以直截了當地獲取當前需要處理地任務,而現在又要遍歷 t[n]
來判斷 nRound 是否為當前輪次了
基於此,就有了這種更優美地解決方案時間輪
:再加一個大小為60陣列d
,每一位還是表示對應時間地集合:第0分鐘要處理地事務都在d[0]
中,第5分鐘需要處理地事務都在d[5]
中,依次類推
當然,因為0-59秒的事務都放在了陣列t
裡了,所以d[0]
為空即可;當時間來到第1分鐘時,再將d[1]
中的事務放置到t
中對應位置即可
這樣一來,就已經可以處理1個小時內的任務了,可想而知,再加上更多的陣列就可以處理更長的時間跨度了
(很明顯,這裡的陣列大小和陣列數量只是一個進位制關係而已)
工作原理如下:
- 一級輪(就是一個數組)的每個格對應一個時間間隔
- 一級輪指標每次tick加1;當指標指向一級輪的某一格時,即表示這一格里的定時器都到期了
- 二級輪(包括更多級同理)的每個格對應一級輪的一圈
- 二級輪指標每當一級輪指標轉一圈加1;當指標指向某一個時,即表示接下來需要處理這一格的定時器了(分散到一級輪裡)
後面給出了一個簡單的實現,簡單起見就沒有再處理重複執行的情況
這裡再談談一些可以優化的地方:
- 指標和陣列大小:在Linux核心定時器的實現裡,採用了一個很巧妙的設計,一共5個數組,大小分別為 64(a) 64(b) 64(c) 64(d) 255(e) (64是2的6次方,255是2的8次方,也就總共佔了32位);這樣一來,只需利用整數的進位就可以自然的處理陣列之間的進位制關係了,這體現在,只需要一個32bit的指標和對應的位操作即可表示5個數組中的情況了,不再需要每個陣列分配一個指標(例如末8位表示陣列e的指標,之前的6位表示陣列d的指標)
- 陣列元素:更高階的陣列對應的時間刻度就越長,就看可能有更多甚至非常大量的事務擠在一個格里,這時候特殊需求的操作(比如查詢),可能就不能很好地支援,因此可以採用合適的資料結構來管理每個格里的事務;當然通常情況是不需要的,因為一般操作只是要把當前格里的事務hash到下一級的數組裡
- 取消定時器:還是跟上面時間輪的實現一樣,可以考慮採用惰性刪除的策略
class Timer():
def __init__(self, delay, callback, callargs):
self.delay = delay
self.callback = callback
self.callargs = callargs
self.cancelled = False
@property
def Cancelled(self): return self.cancelled
def cancel(self):
self.cancelled = True
def trigger(self):
if not self.cancelled:
self.callback(*self.callargs)
self.cancelled = True
class Wheels():
WHEEL_NUM = 3
SOLT_NUM = 8
MAX_NUM = int(math.pow(SOLT_NUM, WHEEL_NUM))
def __init__(self):
self.pointer = []
self.wheels = []
for i in range(self.WHEEL_NUM):
self.pointer.append(0)
self.wheels.append([])
for j in range(self.SOLT_NUM):
self.wheels[i].append([])
def add(self, delay, func, args):
if delay >= self.MAX_NUM: return
past = delay
wheel = self.WHEEL_NUM - 1
while past >= self.SOLT_NUM:
past = past // self.SOLT_NUM
wheel -= 1
solt = (self.pointer[wheel] + past) % self.SOLT_NUM
delay = delay % int(math.pow(self.SOLT_NUM, self.WHEEL_NUM - wheel - 1))
self.wheels[wheel][solt].append(Timer(delay, func, args))
def tick(self):
print('tick')
for i in range(self.WHEEL_NUM - 1):
while self.wheels[i][self.pointer[i]]:
timer = self.wheels[i][self.pointer[i]].pop()
self.add(timer.delay, timer.callback, timer.callargs)
idx = self.WHEEL_NUM - 1
while self.wheels[idx][self.pointer[idx]]:
timer = self.wheels[idx][self.pointer[idx]].pop()
timer.trigger()
for i in range(self.WHEEL_NUM - 1, -1, -1):
self.pointer[i] = (self.pointer[i] + 1) % self.SOLT_NUM
if self.pointer[i] > 0:
break
def func(callback):
waitEvent = threading.Event()
while not waitEvent.wait(1):
callback()
wheels = Wheels()
threading.Thread(target=func, args=(lambda:wheels.tick(), )).start()
# test
def func_a():
print('in func a')
def func_b():
print('in func b')
timer_a = wheels.add(5, func_a, ())
timer_b = wheels.add(10, func_b, ())
總結
到這裡,關於定時器的實現就講完了;實現本身可能並不精彩,秒的是一些細節處的設計思想,最後再來回顧一下:
- 惰性刪除和清理操作:參考了開源引擎kbengine中定時器的實現;惰性刪除可以節省高頻操作的開銷,還可以減少頻繁的記憶體操作;在清理操作的細節中,先將有用和無用的定時器分離(分別放置到陣列前後),可以有效提高記憶體操作的效率
- 時間輪結構的設計:時間輪結構的設計非常巧妙,源於hash也兼顧了空間,並且將高頻操作(最近時間段)和低頻操作(較遠時間段)分離了開來
- 時間輪指標設計:利用整數進位,簡化時間輪指標