1. 程式人生 > >時間輪演算法(TimingWheel)是如何實現的?

時間輪演算法(TimingWheel)是如何實現的?

前言

我在2. SOFAJRaft原始碼分析—JRaft的定時任務排程器是怎麼做的?這篇文章裡已經講解過時間輪演算法在JRaft中是怎麼應用的,但是我感覺我並沒有講解清楚這個東西,導致看了這篇文章依然和沒看是一樣的,所以我打算重新說透時間輪演算法。

時間輪的應用並非 JRaft 獨有,其應用場景還有很多,在 Netty、Akka、Quartz、ZooKeeper 、Kafka等元件中都存在時間輪的蹤影。

我們下面講解的時間輪的實現以JRaft中的為例子進行講解,因為JRaft這部分的程式碼是參考Netty的,所以大家也可以去Netty中去尋找原始碼實現。

時間輪用來解決什麼問題?

如果一個系統中存在著大量的排程任務,而大量的排程任務如果每一個都使用自己的排程器來管理任務的生命週期的話,浪費cpu的資源並且很低效。

時間輪是一種高效來利用執行緒資源來進行批量化排程的一種排程模型。把大批量的排程任務全部都繫結到同一個的排程器上面,使用這一個排程器來進行所有任務的管理(manager),觸發(trigger)以及執行(runnable)。能夠高效的管理各種延時任務,週期任務,通知任務等等。

不過,時間輪排程器的時間精度可能不是很高,對於精度要求特別高的排程任務可能不太適合。因為時間輪演算法的精度取決於,時間段“指標”單元的最小粒度大小,比如時間輪的格子是一秒跳一次,那麼排程精度小於一秒的任務就無法被時間輪所排程。

時間輪結構

如圖,JRaft中時間輪(HashedWheelTimer)是一個儲存定時任務的環形佇列,底層採用陣列實現,陣列中的每個元素可以存放一個定時任務列表(HashedWheelBucket),HashedWheelBucket是一個環形的雙向連結串列,連結串列中的每一項表示的都是定時任務項(HashedWheelTimeout),其中封裝了真正的定時任務(TimerTask)。

時間輪由多個時間格組成,每個時間格代表當前時間輪的基本時間跨度(tickDuration)。時間輪的時間格個數是固定的,可用 wheel.length 來表示。

時間輪還有一個錶盤指標(tick),用來表示時間輪當前指標跳動的次數,可以用tickDuration * (tick + 1)來表示下一次到期的任務,需要處理此時間格所對應的 HashedWheelBucket 中的所有任務。

時間輪執行邏輯

時間輪在啟動的時候會記錄一下當前啟動的時間賦值給startTime。時間輪在新增任務的時候首先會計算延遲時間(deadline),比如一個任務的延遲時間為24ms,那麼會將當前的時間(currentTime)+24ms-時間輪啟動時的時間(startTime)。然後將任務封裝成HashedWheelTimeout加入到timeouts佇列中,作為快取。

時間輪在執行的時候會將timeouts中快取的HashedWheelTimeout任務取10萬個出來進行遍歷。
然後需要計算出幾個引數值:

  1. HashedWheelTimeout的總共延遲的次數:將每個任務的延遲時間(deadline)/tickDuration 計算出tick需要總共跳動的次數;
  2. 計算時間輪round次數:根據計算的需要走的(總次數- 當前tick數量)/ 時間格個數(wheel.length)。比如tickDuration為1ms,時間格個數為20個,那麼時間輪走一圈需要20ms,那麼新增進一個延時為24ms的資料,如果當前的tick為0,那麼計算出的輪數為1,指標沒執行一圈就會將round取出來減一,所以需要轉動到第二輪之後才可以將輪數round減為0之後才會執行
  3. 計算出該任務需要放置到時間輪(wheel)的槽位,然後加入到槽位連結串列最後

將timeouts中的資料放置到時間輪wheel中之後,計算出當前時針走到的槽位的位置,並取出槽位中的連結串列資料,將deadline和當前的時間做對比,執行過期的資料。

原始碼分析

構造器

public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
                        long maxPendingTimeouts) {

    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    //unit = MILLISECONDS
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    if (tickDuration <= 0) {
        throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
    }
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }

    // Normalize ticksPerWheel to power of two and initialize the wheel.
    // 建立一個HashedWheelBucket陣列
    // 建立時間輪基本的資料結構,一個數組。長度為不小於ticksPerWheel的最小2的n次方
    wheel = createWheel(ticksPerWheel);
    // 這是一個標示符,用來快速計算任務應該呆的格子。
    // 我們知道,給定一個deadline的定時任務,其應該呆的格子=deadline%wheel.length.但是%操作是個相對耗時的操作,所以使用一種變通的位運算代替:
    // 因為一圈的長度為2的n次方,mask = 2^n-1後低位將全部是1,然後deadline&mast == deadline%wheel.length
    // java中的HashMap在進行hash之後,進行index的hash定址定址的演算法也是和這個一樣的
    mask = wheel.length - 1;

    // Convert tickDuration to nanos.
    //tickDuration傳入是1的話,這裡會轉換成1000000
    this.tickDuration = unit.toNanos(tickDuration);

    // Prevent overflow.
    // 校驗是否存在溢位。即指標轉動的時間間隔不能太長而導致tickDuration*wheel.length>Long.MAX_VALUE
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
            "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE
                                                                                        / wheel.length));
    }
    //將worker包裝成thread
    workerThread = threadFactory.newThread(worker);
    //maxPendingTimeouts = -1
    this.maxPendingTimeouts = maxPendingTimeouts;

    //如果HashedWheelTimer例項太多,那麼就會列印一個error日誌
    if (instanceCounter.incrementAndGet() > INSTANCE_COUNT_LIMIT
        && warnedTooManyInstances.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}

在這個構造器中有幾個細節需要注意:

  1. 呼叫createWheel方法建立的wheel陣列一定是2次方數,比如傳入的ticksPerWheel是6,那麼初始化的wheel長度一定是8。這樣做是為了讓mask & tick 來計算出槽位
  2. tickDuration用的是納秒
  3. 在構造裡面並不會裡面啟動時間輪,而是要等到有第一個任務加入到時間輪的時候才啟動。在構造器裡面會將工作執行緒worker封裝成workerThread

放入任務到時間輪中

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
                                             + ") is greater than or equal to maximum allowed pending "
                                             + "timeouts (" + maxPendingTimeouts + ")");
    }
    // 如果時間輪沒有啟動,則啟動
    start();

    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Guard against overflow.
    //在delay為正數的情況下,deadline是不可能為負數
    //如果為負數,那麼說明超過了long的最大值
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    // 這裡定時任務不是直接加到對應的格子中,而是先加入到一個佇列裡,然後等到下一個tick的時候,
    // 會從佇列裡取出最多100000個任務加入到指定的格子中
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    //Worker會去處理timeouts佇列裡面的資料
    timeouts.add(timeout);
    return timeout;
}
  1. 如果時間輪沒有啟動,那麼就呼叫start方法啟動時間輪,啟動時間輪之後會為startTime設定為當前時間
  2. 計算延遲時間deadline
  3. 將task任務封裝到HashedWheelTimeout中,然後新增到timeouts佇列中進行快取

start

private final CountDownLatch                                     startTimeInitialized   = new CountDownLatch(1);

public void start() {
    //workerState一開始的時候是0(WORKER_STATE_INIT),然後才會設定為1(WORKER_STATE_STARTED)
    switch (workerStateUpdater.get(this)) {
        case WORKER_STATE_INIT:
            //使用cas來獲取啟動排程的權力,只有競爭到的執行緒允許來進行例項啟動
            if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                //如果成功設定了workerState,那麼就呼叫workerThread執行緒
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    // 等待worker執行緒初始化時間輪的啟動時間
    // Wait until the startTime is initialized by the worker.
    while (startTime == 0) {
        try {
            //這裡使用countDownLauch來確保排程的執行緒已經被啟動
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}

start方法會根據當前的workerState狀態來啟動時間輪。並且用了startTimeInitialized來控制執行緒的執行,如果workerThread沒有啟動起來,那麼newTimeout方法會一直阻塞在執行start方法中。如果不阻塞,newTimeout方法會獲取不到startTime。

啟動時間輪

時間輪的啟動在HashedWheelTimer的內部類Worker中。呼叫workerThread#start方法會呼叫Worker的run方法啟動時間輪。

下面我們看時間輪啟動做了什麼,下面的分析不考慮任務被取消的情況。

Worker#run

public void run() {
    // Initialize the startTime.
    startTime = System.nanoTime();
    if (startTime == 0) {
        // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
        startTime = 1;
    }

    //HashedWheelTimer的start方法會繼續往下執行
    // Notify the other threads waiting for the initialization at start().
    startTimeInitialized.countDown();

    do {
        //返回的是當前的nanoTime- startTime
        //也就是返回的是 每 tick 一次的時間間隔
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            //算出時間輪的槽位
            int idx = (int) (tick & mask);
            //移除cancelledTimeouts中的bucket
            // 從bucket中移除timeout
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            // 將newTimeout()方法中加入到待處理定時任務佇列中的任務加入到指定的格子中
            transferTimeoutsToBuckets();
            bucket.expireTimeouts(deadline);
            tick++;
        }
    //    校驗如果workerState是started狀態,那麼就一直迴圈
    } while (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // Fill the unprocessedTimeouts so we can return them from stop() method.
    for (HashedWheelBucket bucket : wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (;;) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        //如果有沒有被處理的timeout,那麼加入到unprocessedTimeouts對列中
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    //處理被取消的任務
    processCancelledTasks();
}
  1. 時間輪執行的時候首先會記錄一下啟動時間(startTime),然後呼叫startTimeInitialized釋放外層的等待執行緒;
  2. 進入dowhile迴圈,呼叫waitForNextTick睡眠等待到下一次的tick指標的跳動,並返回當前時間減去startTime作為deadline
  3. 由於mask= wheel.length -1 ,wheel是2的次方數,所以可以直接用tick & mask 計算出此次在wheel中的槽位
  4. 呼叫processCancelledTasks將cancelledTimeouts佇列中的任務取出來,並將當前的任務從時間輪中移除
  5. 呼叫transferTimeoutsToBuckets方法將timeouts佇列中快取的資料取出加入到時間輪中
  6. 執行目前指標指向的槽位中的bucket連結串列資料

時間輪指標跳動

waitForNextTick

//sleep, 直到下次tick到來, 然後返回該次tick和啟動時間之間的時長
private long waitForNextTick() {
    //tickDuration這裡是100000
    //tick表示總tick數
    long deadline = tickDuration * (tick + 1);

    for (;;) {
        final long currentTime = System.nanoTime() - startTime;
        // 計算需要sleep的時間, 之所以加999999後再除10000000,前面是1所以這裡需要減去1,
        // 才能計算準確,還有通過這裡可以看到 其實執行緒是以睡眠一定的時候再來執行下一個ticket的任務的,
        //這樣如果ticket的間隔設定的太小的話,系統會頻繁的睡眠然後啟動,
        //其實感覺影響部分的效能,所以為了更好的利用系統資源步長可以稍微設定大點
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
        //sleepTimeMs小於零表示走到了下一個時間輪位置
        if (sleepTimeMs <= 0) {
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                return currentTime;
            }
        }

        // Check if we run on windows, as if thats the case we will need
        // to round the sleepTime as workaround for a bug that only affect
        // the JVM if it runs on windows.
        //
        // See https://github.com/netty/netty/issues/356
        if (Platform.isWindows()) {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }

        try {
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            if (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}

可以想象一下在時鐘的秒鐘上面秒與秒之間的時間是需要等待的,那麼waitForNextTick這個方法就是根據當前的時間計算出跳動到下個時間的間隔時間,並進行sleep操作,然後返回當前時間距離時間輪啟動時間的時間段。

轉移任務到時間輪中

在呼叫時間輪的方法加入任務的時候並沒有直接加入到時間輪中,而是快取到了timeouts佇列中,所以在執行的時候需要將timeouts佇列中的任務轉移到時間輪資料的連結串列中

transferTimeoutsToBuckets

private void transferTimeoutsToBuckets() {
    // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
    // adds new timeouts in a loop.
    // 每次tick只處理10w個任務,以免阻塞worker執行緒
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        //已經被取消了;
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            // Was cancelled in the meantime.
            continue;
        }
        //calculated = tick 次數
        long calculated = timeout.deadline / tickDuration;
        // 計算剩餘的輪數, 只有 timer 走夠輪數, 並且到達了 task 所在的 slot, task 才會過期
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        //如果任務在timeouts佇列裡面放久了, 以至於已經過了執行時間, 這個時候就使用當前tick, 也就是放到當前bucket, 此方法呼叫完後就會被執行
        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        //// 算出任務應該插入的 wheel 的 slot, slotIndex = tick 次數 & mask, mask = wheel.length - 1
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        //將timeout加入到bucket連結串列中
        bucket.addTimeout(timeout);
    }
}

在這個轉移方法中,寫死了一個迴圈,每次都只轉移10萬個任務。

然後根據HashedWheelTimeout的deadline延遲時間計算出時間輪需要執行多少次才能運行當前的任務,如果當前的任務延遲時間大於時間輪跑一圈所需要的時間,那麼就計算需要跑幾圈才能到這個任務執行。

最後計算出該任務在時間輪中的槽位,新增到時間輪的連結串列中。

執行時間輪中的任務

當指標跳到時間輪的槽位的時間,會將槽位的HashedWheelBucket取出來,然後遍歷連結串列,執行其中到期的任務。

expireTimeouts

// 過期並執行格子中的到期任務,tick到該格子的時候,worker執行緒會呼叫這個方法
//根據deadline和remainingRounds判斷任務是否過期
public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    // process all timeouts
    //遍歷格子中的所有定時任務
    while (timeout != null) {
        // 先儲存next,因為移除後next將被設定為null
        HashedWheelTimeout next = timeout.next;
        if (timeout.remainingRounds <= 0) {
            //從bucket連結串列中移除當前timeout,並返回連結串列中下一個timeout
            next = remove(timeout);
            //如果timeout的時間小於當前的時間,那麼就呼叫expire執行task
            if (timeout.deadline <= deadline) {
                timeout.expire();
            } else {
                //不可能發生的情況,就是說round已經為0了,deadline卻>當前槽的deadline
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)",
                        timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
        } else {
            //因為當前的槽位已經過了,說明已經走了一圈了,把輪數減一
            timeout.remainingRounds--;
        }
        //把指標放置到下一個timeout
        timeout = next;
    }
}

HashedWheelBucket是一個連結串列,所以我們需要從head節點往下進行遍歷。如果連結串列沒有遍歷到連結串列尾部那麼就繼續往下遍歷。

獲取的timeout節點節點,如果剩餘輪數remainingRounds大於0,那麼就說明要到下一圈才能執行,所以將剩餘輪數減一;

如果當前剩餘輪數小於等於零了,那麼就將當前節點從bucket連結串列中移除,並判斷一下當前的時間是否大於timeout的延遲時間,如果是則呼叫timeout的expire執行任務