1. 程式人生 > >週期性執行緒池與主要原始碼解析

週期性執行緒池與主要原始碼解析

之前學習ThreadPool的使用以及原始碼剖析,並且從面試的角度去介紹知識點的解答。今天小強帶來週期性執行緒池的使用和重點原始碼剖析。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor:用來處理延時任務或定時任務
定時執行緒池類的類結構圖

ScheduledThreadPoolExecutor接收ScheduleFutureTask型別的任務,是執行緒池排程任務的最小單位。
它採用DelayQueue儲存等待的任務:
1、DelayQueue內部封裝成一個PriorityQueue,它會根據time的先後時間順序,如果time相同則根絕sequenceNumber排序;
2、DelayQueue是無界佇列;

ScheduleFutureTask

接收的引數:

private final long sequenceNumber;//任務的序號
private long time;//任務開始的時間
private final long period;//任務執行的時間間隔

工作執行緒的的執行過程:
工作執行緒會從DelayQueue取出已經到期的任務去執行;
執行結束後重新設定任務的到期時間,再次放回DelayQueue;

ScheduledThreadPoolExecutor會把待執行的任務放到工作佇列DelayQueue中,DelayQueue封裝了一個PriorityQueue,PriorityQueue會對佇列中的ScheduledFutureTask進行排序,具體的排序演算法實現如下:

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        //首先按照time排序,time小的排到前面,time大的排到後面
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        //time相同,按照sequenceNumber排序;
        //sequenceNumber小的排在前面,sequenceNumber大的排在後面 
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

接下來看看ScheduledFutureTask的run方法實現, run方法是排程task的核心,task的執行實際上是run方法的執行。

public void run() {
    //是否是週期性的
    boolean periodic = isPeriodic();
    //執行緒池是shundown狀態不支援處理新任務,直接取消任務
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    //如果不需要執行執行週期性任務,直接執行run方法結束
    else if (!periodic)
        ScheduledFutureTask.super.run();
    //如果需要週期性執行,則在執行任務完成後,設定下一次執行時間
    else if (ScheduledFutureTask.super.runAndReset()) {
        //設定下一次執行該任務的時間
        setNextRunTime();
        //重複執行該任務
        reExecutePeriodic(outerTask);
    }
}

run方法的執行步驟:

  • 1、如果執行緒池是shundown狀態不支援處理新任務,直接取消任務,否則步驟2;
  • 2、如果不是週期性任務,直接呼叫ScheduledFutureTask的run方法執行,會設定執行結果,然後直接返回,否則步驟3;
  • 3、如果是週期性任務,呼叫ScheduledFutureTask的runAndset方法執行,不會設定執行結果,然後直接返回,否則執行步驟4和步驟5;
  • 4、計算下一次執行該任務的時間;
  • 5、重複執行該任務;

接下來看下reExecutePeriodic方法的執行步驟:

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

由於已經執行過一次週期性任務,所以不會reject當前任務,同時傳入的任務一定是週期性任務。

週期性執行緒池任務的提交方式

週期性有三種提交的方式:schedule、sceduleAtFixedRate、schedlueWithFixedDelay。下面從使用和原始碼兩個方面進行說明,首先是如果提交任務:

pool.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("延遲執行");
    }
},1, TimeUnit.SECONDS);

/**
 * 這個執行週期是固定,不管任務執行多長時間,每過3秒中就會產生一個新的任務
 */
pool.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        //這個業務邏輯需要很長的時間,超過了3秒
        System.out.println("重複執行");
    }
},1,3,TimeUnit.SECONDS);

pool.shutdown();

/**
 * 假如run方法30min後執行完成,然後間隔3秒,再週期性執行下一個任務
 */
pool.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
        //30min
        System.out.println("重複執行");
    }
},1,3,TimeUnit.SECONDS);

知道了如何提交週期性任務,接下來原始碼是如何執行的,首先是schedule方法,該方法是指任務在指定延遲時間到達後觸發,只會執行一次。

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    //把任務封裝成ScheduledFutureTask,之後呼叫decorateTask進行包裝;
    //decorateTask方法是空方法,留給使用者去實現的;
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    //包裝好任務之後,進行任務的提交                                  
    delayedExecute(t);
    return t;
}

任務提交方法:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //如果執行緒池不是RUNNING狀態,則使用拒絕策略把提交任務拒絕掉
    if (isShutdown())
        reject(task);
    else {
        //與ThreadPoolExecutor不同,這裡直接把任務加入延遲佇列
        super.getQueue().add(task);
        //如果當前狀態無法執行任務,則取消
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
        //和ThreadPoolExecutor不一樣,corePoolSize沒有達到會增加Worker;
        //增加Worker,確保提交的任務能夠被執行
            ensurePrestart();
    }
}

還沒關注我的公眾號?

  • 掃文末二維碼關注公眾號【小強的進階之路】可領取如下:
  • 學習資料: 1T視訊教程:涵蓋Javaweb前後端教學視訊、機器學習/人工智慧教學視訊、Linux系統教程視訊、雅思考試視訊教程;
  • 100多本書:包含C/C++、Java、Python三門程式語言的經典必看圖書、LeetCode題解大全;
  • 軟體工具:幾乎包括你在程式設計道路上的可能會用到的大部分軟體;
  • 專案原始碼:20個JavaWeb專案原始碼。