1. 程式人生 > 其它 >netty和dubbo的hashwheel時間輪定時器原理解析

netty和dubbo的hashwheel時間輪定時器原理解析

技術標籤:nettydubbojava時間輪演算法nettydubbo

時間輪定時器

時間輪定時器的論文ppt參考:論文ppt:http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt

hash時間輪結構

時間輪結構

基本過程跟時鐘類似,秒針每間隔一段時間跳一格,每格桶判斷儲存的任務列表中的任務是否超時。涉及的兩個重要概念:
1.Tick Duration

時間輪跳動的時間間隔,預設是100ms,不是精確的時間。

2.Ticks per Wheel (Wheel Size)
時間輪的格數預設是 512.

時間輪處理過程

理解時間輪的工作原理需要理解這幾個過程:

  1. 時間指標的跳動

  2. 指標跳動對應到時間輪上桶的位置

  3. 桶內部如何增加移除任務

  4. 桶內部任務如何進行超時判斷

  5. 任務需要等待多少圈數及圈數的維護

Worker & WorkerThread

時間輪的核心邏輯都在workerThread的Worker中執行, 建立timer時建立worker執行緒執行worker任務,死迴圈。包括:

  1. 時間指標的跳動
    1. waitForNextTick
  2. 指標跳動對應到時間輪上桶的位置
    1. int stopIndex = (int) (ticks & mask);
  3. 任務需要等待多少圈數及圈數的維護
    1.

private final class Worker
implements Runnable { private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); private long tick; @Override public void run() { // Initialize the startTime. startTime = System.nanoTime(); if (startTime ==
0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; } // Notify the other threads waiting for the initialization at start(). startTimeInitialized.countDown(); do { //計算下次跳動的起始時間,為當前桶任務最大的截止時間 final long deadline = waitForNextTick(); if (deadline > 0) { int idx = (int) (tick & mask); processCancelledTasks(); HashedWheelBucket bucket = wheel[idx]; transferTimeoutsToBuckets(); bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); //timer被stop了 // Fill the unprocessedTimeouts so we can return them from stop() method. for (HashedWheelBucket bucket: wheel) { bucket.clearTimeouts(unprocessedTimeouts); } for (;;) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } processCancelledTasks(); } //從timeouts佇列新增到時間輪的桶中,定位到桶,然後插入到連結串列 private void transferTimeoutsToBuckets() { // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just // adds new timeouts in a loop. for (int i = 0; i < 100000; i++) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { // Was cancelled in the meantime. continue; } //根據超時時間計算需要經過多少輪過期 long calculated = timeout.deadline / tickDuration; timeout.remainingRounds = (calculated - tick) / wheel.length; final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past. //定位到桶的index,ticks mod 時間輪桶個數 mask=buckets.size-1 int stopIndex = (int) (ticks & mask); HashedWheelBucket bucket = wheel[stopIndex]; bucket.addTimeout(timeout); } } private void processCancelledTasks() { for (;;) { HashedWheelTimeout timeout = cancelledTimeouts.poll(); if (timeout == null) { // all processed break; } try { timeout.remove(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown while process a cancellation task", t); } } } } /** * calculate goal nanoTime from startTime and current tick number, * then wait until that goal has been reached. * @return Long.MIN_VALUE if received a shutdown request, * current time otherwise (with Long.MIN_VALUE changed by +1) 等待下次跳動指標的時間,時間間隔 */ private long waitForNextTick() { // 計算下次跳動的時間,為當前桶任務最大的截止時間 long deadline = tickDuration * (tick + 1); for (;;) { final long currentTime = System.nanoTime() - startTime; // (deadline-currenttime)ms + 1ms long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; //到時間了返回,tick+1 if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { //curenttime>=deadline return currentTime; } } // Check if we run on windows, as if thats the case we will need // to round the sleepTime as workaround for a bug that only affect // the JVM if it runs on windows. // // See https://github.com/netty/netty/issues/356 if (PlatformDependent.isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; } try { //沒到時間睡眠一會 Thread.sleep(sleepTimeMs); } catch (InterruptedException ignored) { if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } } public Set<Timeout> unprocessedTimeouts() { return Collections.unmodifiableSet(unprocessedTimeouts); } }

HashedWheelBucket && Timeout

Timeout是任務的抽象,bucket是任務的容器。
HashedWheelBucket 類似連結串列,儲存每個刻度的任務,expireTimeouts方法移除過期任務列表

  1. 桶內部如何增加移除任務
  2. 桶內部任務如何進行超時判斷操作
    1. 任務的輪數維護也在這裡處理

兩個過程在桶內部實現,見expireTimeouts程式碼

public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    // process all timeouts
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        //如果任務是到期的任務,直接移除,remaininrounds指任務還需要等幾輪才到期
        if (timeout.remainingRounds <= 0) {
            next = remove(timeout);
            if (timeout.deadline <= deadline) {
                timeout.expire();
            } else {
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
        } else {
            //每輪經過一輪,等待輪數減1
            timeout.remainingRounds --;
        }
        timeout = next;
    }
}