1. 程式人生 > 其它 >定時器常見實現方式(時間堆、時間輪)

定時器常見實現方式(時間堆、時間輪)

談談兩種定時器結構設計——時間堆和時間輪 簡單實現了設計並聊聊其中一些有意思的小細節

需求

介面

首先需求場景主要有這幾種(簡化):

  1. 在 n 秒以後執行一個任務 X
  2. 每隔 n 秒執行一次任務 X
  3. 取消一個已經新增的定時器

根據上面的簡化需求,得到需要的主要介面:

  1. 新增一個定時器
  2. 定時器過期執行(可能需要重複執行)
  3. 取消一個定時器

資料結構

最後,就是考慮用來存放定時器的資料結構(也是定時器設計的核心)
上面的介面可以簡單的看成這幾個操作:

  1. 新增
  2. 刪除
  3. 查詢(獲取最近需要執行的一個)

對於這幾個操作,常用的複雜度比較均衡的資料結構:

  1. 紅黑樹
  2. 優先佇列(最小堆)

另外跳錶(本質類似紅黑樹)也是可用的;還有一種比較巧妙的hash結構時間輪,這是一個類似鐘錶指標的結構,將需要管理的定時器根據時間hash到陣列中,以提高查詢和新增的效率

這幾種實現,現在流行的開源框架都各有采用,比如 Linux 核心用的就是時間輪的實現

實現

下面簡單實現一下采用最小堆和時間輪作為資料結構的定時器,方便起見,編碼演示使用python

時間堆

所謂的“時間堆”並不是什麼稀奇東西,只是用小堆來管理以時間作為關鍵字的定時器集合
實現思路:

  1. 使用一個結構來儲存定時器物件的資料,包括過期時間、回撥函式等
  2. 所有的定時器物件儲存在一個優先佇列中(也就是最小堆),鍵值為時間
  3. 新增定時器即為向佇列中插入新項
  4. 刪除定時器可以使用惰性刪除,首先給對應的定時器物件置位表示其已經取消,當達到一定條件時(比如取消的定時器數量達到總數量的1/2),進行一個清理操作
  5. 按照固定的時間間隔來tick,每次tick都需要看看有沒有需要執行的定時器物件
  6. 因為物件由優先佇列管理,因此,當隊首元素不需要執行時,後面的元素則都不需要執行
  7. 一個物件需要執行時,可能需要將其刪除;也可能是需要重複執行的,則需要設定好時間後再次加入

實現程式碼如下(僅為演示):

class Timer():
    def __init__(self, owner, timestamp, callback, callargs, loop):
        self.owner: Timers = owner
        self.timestamp = timestamp
        self.callback = callback
        self.callargs = callargs
        self.cancelled = False
        self.loop = loop

    def __lt__(self, other):
        return self.timestamp < other.timestamp

    @property
    def Cancelled(self): return self.cancelled
    @property
    def Time(self): return self.timestamp

    def cancel(self):
        self.cancelled = True
        self.owner.onCancel()

    def trigger(self, now):
        if not self.cancelled:
            self.callback(now, *self.callargs)
            if self.loop > 0:
                self.timestamp += self.loop
            else:
                self.cancel()

class Timers():
    def __init__(self):
        self.timerQue = []
        self.cancelCount = 0

    def tick(self):
        now = int(time.time())
        self.process(now)

    def process(self, now):
        while len(self.timerQue) > 0:
            timer: Timer = self.timerQue[0]

            # 到期或取消的定時器需要處理
            if timer.Time <= now or timer.Cancelled:

                if not timer.Cancelled:
                    timer.trigger(now)

                if not timer.Cancelled:
                    heapq.heapreplace(self.timerQue, timer)
                else:
                    heapq.heappop(self.timerQue)
                    self.cancelCount -= 1
            else:
                break

    def onCancel(self):
        self.cancelCount += 1
        if self.cancelCount > len(self.timerQue)/2:
            self.purge()

    # 這裡處理的時候參考c++ std partition 操作,先將沒用的都放到陣列最後
    # 然後,將前面有用的重新進行一次建隊操作
    def purge(self):
        # patition
        l = 0
        r = len(self.timerQue) - 1
        while l < r:
            while l < r and not self.timerQue[l].Cancelled: l += 1
            while l < r and self.timerQue[r].Cancelled: r -= 1

            self.timerQue[l], self.timerQue[r] = self.timerQue[r], self.timerQue[l]
            l += 1
            r -= 1

        # remove
        mid = math.ceil((l + r) / 2)
        if not self.timerQue[mid].Cancelled:
            mid += 1
        del self.timerQue[mid:]

        # heapmake
        heapq.heapify(self.timerQue)

    def add(self, duration, callback, callargs, looptime):
        now = int(time.time())
        timer: Timer = Timer(self, now + duration, callback, callargs, looptime)
        heapq.heappush(self.timerQue, timer)
        return timer

# test
def func(callback):
    waitEvent = threading.Event()
    while not waitEvent.wait(1):
        callback()
def func_a(now):
    print('in func a', now)
def func_b(now):
    print('in func b', now)
def func_c(a, b):
    print('in func c: before sleep')
    time.sleep(30)
    print('in func c: after sleep')
    a.cancel()
    b.cancel()

timers = Timers()
threading.Thread(target=func, args=(lambda:timers.tick(), )).start()
timer_a = timers.add(0, func_a, (), 10)
timer_b = timers.add(5, func_b, (), 10)
threading.Thread(target=func_c, args=(timer_a, timer_b)).start()

時間輪

時間輪是一個比較巧妙地hash結構,性質有點類似鐘錶指標
首先考慮一個簡單的大小為60陣列t,每一位表示對應時間的集合:第0秒需要處理的事務都在t[0],第5秒需要處理的事務都在t[5],依次類推
這樣一來,當處在 x 秒時只需要去對應的陣列元素即可;但是同樣也帶來了一個問題:最多隻能處理一分鐘內的事務(陣列大小隻有60)

為了解決這個問題,有一個簡單的方案:在事務物件加一個欄位 nRound 用來表示迴圈使用這個陣列,當迴圈次數為 nRound 時才執行本事務
這樣一來就解決了陣列容量問題,但是隨之而來的就是效率變得低下了;本來使用hash的目的就是為了避免不必要的遍歷,可以直截了當地獲取當前需要處理地任務,而現在又要遍歷 t[n] 來判斷 nRound 是否為當前輪次了

基於此,就有了這種更優美地解決方案時間輪:再加一個大小為60陣列d,每一位還是表示對應時間地集合:第0分鐘要處理地事務都在d[0]中,第5分鐘需要處理地事務都在d[5]中,依次類推
當然,因為0-59秒的事務都放在了陣列t裡了,所以d[0]為空即可;當時間來到第1分鐘時,再將d[1]中的事務放置到t中對應位置即可
這樣一來,就已經可以處理1個小時內的任務了,可想而知,再加上更多的陣列就可以處理更長的時間跨度了
(很明顯,這裡的陣列大小和陣列數量只是一個進位制關係而已)

工作原理如下:

  1. 一級輪(就是一個數組)的每個格對應一個時間間隔
  2. 一級輪指標每次tick加1;當指標指向一級輪的某一格時,即表示這一格里的定時器都到期了
  3. 二級輪(包括更多級同理)的每個格對應一級輪的一圈
  4. 二級輪指標每當一級輪指標轉一圈加1;當指標指向某一個時,即表示接下來需要處理這一格的定時器了(分散到一級輪裡)

後面給出了一個簡單的實現,簡單起見就沒有再處理重複執行的情況
這裡再談談一些可以優化的地方:

  1. 指標和陣列大小:在Linux核心定時器的實現裡,採用了一個很巧妙的設計,一共5個數組,大小分別為 64(a) 64(b) 64(c) 64(d) 255(e) (64是2的6次方,255是2的8次方,也就總共佔了32位);這樣一來,只需利用整數的進位就可以自然的處理陣列之間的進位制關係了,這體現在,只需要一個32bit的指標和對應的位操作即可表示5個數組中的情況了,不再需要每個陣列分配一個指標(例如末8位表示陣列e的指標,之前的6位表示陣列d的指標)
  2. 陣列元素:更高階的陣列對應的時間刻度就越長,就看可能有更多甚至非常大量的事務擠在一個格里,這時候特殊需求的操作(比如查詢),可能就不能很好地支援,因此可以採用合適的資料結構來管理每個格里的事務;當然通常情況是不需要的,因為一般操作只是要把當前格里的事務hash到下一級的數組裡
  3. 取消定時器:還是跟上面時間輪的實現一樣,可以考慮採用惰性刪除的策略
class Timer():
    def __init__(self, delay, callback, callargs):
        self.delay = delay
        self.callback = callback
        self.callargs = callargs
        self.cancelled = False

    @property
    def Cancelled(self): return self.cancelled

    def cancel(self):
        self.cancelled = True

    def trigger(self):
        if not self.cancelled:
            self.callback(*self.callargs)
            self.cancelled = True

class Wheels():
    WHEEL_NUM = 3
    SOLT_NUM = 8
    MAX_NUM = int(math.pow(SOLT_NUM, WHEEL_NUM))
    def __init__(self):
        self.pointer = []
        self.wheels = []
        for i in range(self.WHEEL_NUM):
            self.pointer.append(0)
            self.wheels.append([])
            for j in range(self.SOLT_NUM):
                self.wheels[i].append([])
    
    def add(self, delay, func, args):
        if delay >= self.MAX_NUM: return

        past = delay
        wheel = self.WHEEL_NUM - 1
        while past >= self.SOLT_NUM:
            past = past // self.SOLT_NUM
            wheel -= 1
        solt = (self.pointer[wheel] + past) % self.SOLT_NUM
        delay = delay % int(math.pow(self.SOLT_NUM, self.WHEEL_NUM - wheel - 1))

        self.wheels[wheel][solt].append(Timer(delay, func, args))

    def tick(self):
        print('tick')
        for i in range(self.WHEEL_NUM - 1):
            while self.wheels[i][self.pointer[i]]:
                timer = self.wheels[i][self.pointer[i]].pop()
                self.add(timer.delay, timer.callback, timer.callargs)

        idx = self.WHEEL_NUM - 1
        while self.wheels[idx][self.pointer[idx]]:
            timer = self.wheels[idx][self.pointer[idx]].pop()
            timer.trigger()

        for i in range(self.WHEEL_NUM - 1, -1, -1):
            self.pointer[i] = (self.pointer[i] + 1) % self.SOLT_NUM
            if self.pointer[i] > 0:
                break

def func(callback):
    waitEvent = threading.Event()
    while not waitEvent.wait(1):
        callback()
wheels = Wheels()
threading.Thread(target=func, args=(lambda:wheels.tick(), )).start()

# test
def func_a():
    print('in func a')
def func_b():
    print('in func b')

timer_a = wheels.add(5, func_a, ())
timer_b = wheels.add(10, func_b, ())

總結

到這裡,關於定時器的實現就講完了;實現本身可能並不精彩,秒的是一些細節處的設計思想,最後再來回顧一下:

  1. 惰性刪除和清理操作:參考了開源引擎kbengine中定時器的實現;惰性刪除可以節省高頻操作的開銷,還可以減少頻繁的記憶體操作;在清理操作的細節中,先將有用和無用的定時器分離(分別放置到陣列前後),可以有效提高記憶體操作的效率
  2. 時間輪結構的設計:時間輪結構的設計非常巧妙,源於hash也兼顧了空間,並且將高頻操作(最近時間段)和低頻操作(較遠時間段)分離了開來
  3. 時間輪指標設計:利用整數進位,簡化時間輪指標