1. 程式人生 > >Executor Framework分析 (二) ThreadPoolExecutor主要引數分析

Executor Framework分析 (二) ThreadPoolExecutor主要引數分析

本篇部落格主要記錄ThreadPoolExecutor主要引數的含義,
並分析相關介面的具體實現。

很多時候,當我們不需要指定執行緒池的執行細節時,
會直接利用工具類Executors建立執行緒池,例如:

public class Executors {
    //建立固定大小的執行緒池
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0
L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } ........ //建立可複用的執行緒池 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60
L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); } ...... }

從上述程式碼可以看出,Executors提供的方法,
實際上就是呼叫了ThreadPoolExecutor的建構函式,
只不過傳入了不同的引數。

因此,我們有必要分析下ThreadPoolExecutor的建構函式,
瞭解下引數的具體含義,以便分析ThreadPoolExecutor的設計意圖。

ThreadPoolExecutor的建構函式如下所示:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        ...................
    }

接下來,我們來看看每個引數的含義。

一、corePoolSize和maximumPoolSize
corePoolSize和maximumPoolSize決定了執行緒池建立執行緒的方式和數量。

根據ThreadPoolExecutor的描述來看:
當新的任務被提交給執行緒池時,執行緒池會檢視當前執行緒的數量。

如果當前執行緒的數量小於corePoolSize時,那麼執行緒池就會創建出新的執行緒,
即使此時有空閒的執行緒存在。

如果當前執行緒的數量大於等於corePoolSize,且小於maximumPoolSize,
只有執行緒池的快取佇列滿了時,才會建立新的執行緒。

為了驗證這段描述,我們可以看看相關的原始碼:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .........
    public void execute(Runnable command) {
        .......
        int c = ctl.get();
        //當前執行緒數小於corePoolSize時,
        if (workerCountOf(c) < corePoolSize) {
            //直接addWorker,增加執行緒,引數為true
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        //執行緒數大於corePoolSize時,先嚐試將task加入佇列workQueue
        if (isRunning(c) && workQueue.offer(command)) {
            ..............
        }
        //加入佇列失敗後,即佇列是滿的,同樣呼叫addWorker方法
        //引數為false
        else if (!addWorker(command, false))
            //建立失敗,呼叫reject函式處理
            reject(command);
    }

    ...............

我們跟進一下addWorker函式:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            ...........
            for (;;) {
                //得到當前執行緒數量
                int wc = workerCountOf(c);

                //若引數為true時,表示建立核心執行緒,總數必須小於corePoolSize
                //若引數為false時,表示建立非核心執行緒,總數必須小於maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;

                //增加執行緒數量,利用標號跳出迴圈
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                ...............
            }
        }

        //建立實際的worker,即工作執行緒
        ..............
    }
    ................
}

從上面的程式碼可以看出,當ThreadPoolExecutor收到新的任務時,
即execute介面被呼叫後,才會呼叫addWorker函式建立執行緒。

如果想盡快地創建出核心執行緒,ThreadPoolExecutor提供了
prestartCoreThread和prestartAllCoreThreads函式,如下所示:

public class ThreadPoolExecutor extends AbstractExecutorService {
    ..........
    //建立一個core thread
    public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    }
    ..........
    //創建出所有的core thread
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }
    ..........
}

二、keepAliveTime
當執行緒的數量超過corePoolSize後,為了減少對系統資源的消耗,
一旦執行緒池發現某個執行緒空閒的時間超過了keepAliveTime,
就會主動結束該空閒執行緒,以釋放系統資源。

一般情況下,只有執行緒數量超過corePoolSize時,
才會試圖回收空閒執行緒的資源(執行緒數量小於corePoolSize時,停止回收)。
不過,如果執行緒池呼叫了allowCoreThreadTimeOut介面,
則可以回收所有空閒的執行緒資源執行緒。

接下來,我們來看看對應的程式碼:

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...............
    //我們仍然從addWorker入手
    private boolean addWorker(Runnable firstTask, boolean core) {
        .........
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //建立worker, 並得到其中的thread
            w = new Worker(firstTask);
            final Thread t = w.thread;

            if (t != null) {
                //判斷執行緒池狀態,決定是否能增加worker
                ..............
                if (workerAdded) {
                    //若增加了worker,則呼叫對應執行緒的start方法
                    t.start();
                    workerStarted = true;
                }
            }
        } .........
        ...........
    }
    ............

我們看看Worker類的實現:

    //Worker實現了Runnable介面
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        ..........
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;

            //呼叫getThreadFactory獲取ThreadPoolExecutor對應的ThreadFactory
            //建立thread時,傳入了worker作為runnable
            //因此,前文的執行緒start後,會呼叫worker的run方法
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            //繼續跟進runWorker函式
            runWorker(this);
        }
        .........
    }

跟進一下runWorker函式:

    ............

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                //不斷獲取task並執行
                ............
            }
            completedAbruptly = false;
        } finally {
            //沒有task可執行時,就會移除worker,即回收對應的資源
            processWorkerExit(w, completedAbruptly);
        }
    }
    ............
}

從上面的程式碼可以看出,worker被建立後就會一直獲取Task來執行。
如果獲取不到task,worker就會被移除,對應的執行緒將會被回收。

最後,我們跟進一下getTask函式:

private Runnable getTask() {
    boolean timedOut = false;

    for (;;) {
        ...........
        int wc = workerCountOf(c);

        //在不呼叫allowCoreThreadTimeOut的條件下
        //執行緒池中執行緒數量大於corePoolSize時,就會判斷生存時間
        //呼叫了allowCoreThreadTimeOut後,可將allowCoreThreadTimeOut置為true
        //所有執行緒都需要判斷生存時間
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //這裡我們關注timedOut
        //後面的程式碼決定了它的值
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //從workQueue中取出待執行的task
            //此處說明,當需要判斷生存時間時,僅等待keepAliveTime
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            //沒獲取到task時,將timedOut置為true
            //於是下一輪for迴圈時,return null
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

根據上面的程式碼應該不難理解keepAliveTime的含義。
執行緒池中的執行緒創建出來後,都是不斷地嘗試從同一個workQueue中獲取任務。
keepAliveTime就是控制執行緒空閒時的生命週期。

三、WorkQueue
WorkQueue主要用於儲存提交到執行緒池的任務。

根據第一部分提及的execute函式的原始碼,
我們可以得出以下結論:
1、當執行緒數量小於corePoolSize時,執行緒池會優先建立執行緒,
不會將任務加入到WorkQueue中;
2、當執行緒數大於corePoolSize時,執行緒池會優先將任務加入到WorkQueue中,
而不是建立新的執行緒;
3、當WorkQueue中任務數量達到上限時,執行緒池才會繼續建立執行緒,
直到執行緒數達到maximumPoolSize;
4、如果執行緒數達到了maximumPoolSize,且WorkQueue中任務數量達到上限,
那麼新來的任務將被丟棄掉。

常用的WorkQueue型別主要有以下三種:
3.1 無緩衝的佇列
這種佇列的代表是SynchronousQueue,其特點是:
收到任務後,會立即轉交給工作執行緒處理,即佇列本身不會快取任務。

當執行緒池使用該型別佇列時,如果有新任務到來,
但沒有空閒執行緒,那麼會立即嘗試建立新的執行緒。
因此使用這種型別的佇列時,為了避免任務被拒絕掉,
maximumPoolSizes一般被設定為Integer.MAX_VALUE。

3.2 無限快取容量的佇列
這種佇列的代表是不設定容量上限的LinkedBlockingQueue(容量的上限比較大)。

當執行緒池使用該型別佇列時,一旦執行緒的數量達到corePoolSize,
那麼新到來的任務就會被加入到佇列中。
於是,執行緒池建立執行緒的數量上限就是corePoolSize(maximumPoolSize沒有意義)。

3.3 有限容量的佇列
這種佇列的代表是ArrayBlockingQueue。

當執行緒池使用該型別佇列時,處理任務的行為,
與第三部分開頭描述的完全一致。
即在佇列滿時,執行緒池就需要建立新的執行緒,
直到執行緒數量達到maximumPoolSizes。

容易看出,使用有限容量佇列可以有效地節省系統資源。
不過,需要仔細平衡佇列容量和maximumPoolSizes的值:
當佇列容量較大,maximumPoolSizes較小時,雖然可以降低CPU使用率,
降低消耗的系統資源及執行緒切換帶來的開銷,不過此時工作的吞吐率將會降低;
相對地,當佇列容量較小,就需要更大的maximumPoolSizes,
此時會提高CPU使用率,但增大執行緒切換的開銷。

四、ThreadFactory
ThreadFactory負責建立實際的執行緒。

我們直接看Worker類相關的程式碼:

public class ThreadPoolExecutor extends AbstractExecutorService {
    ............
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        ..........
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //呼叫getThreadFactory獲取ThreadPoolExecutor對應的ThreadFactory
            this.thread = getThreadFactory().newThread(this);
        }
        .........
    }
    ............
}

利用工具類Executors建立ThreadPoolExecutor時,
如果不顯示指定,一般預設使用DefaultThreadFactory。

public class Executors {
    ..........
    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        //決定了執行緒的group, 名稱字首
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        //建立實際執行緒的方法
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);

            //強行定義了thread的daemon屬性及優先順序
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    ..........
}

可以根據需求,自行定義ThreadFactory。

五、RejectedExecutionHandler
RejectedExecutionHandler主要負責處理被拒絕的任務。

從第一部分execute的程式碼,我們知道當任務處理失敗時,
將會呼叫reject方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    ..........
    final void reject(Runnable command) {
        //呼叫RejectedExecutionHandler的rejectedExecution方法
        handler.rejectedExecution(command, this);
    }
    ..........
}

常見的RejectedExecutionHandler有以下幾種:
5.1 AbortPolicy

public static class AbortPolicy implements RejectedExecutionHandler {
    ..........
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

容易看出使用AbortPolicy時,一旦遇到任務無法處理的情況,將直接丟擲異常。

5.2 CallerRunsPolicy

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    ..........
    //呼叫執行緒池的execute方法時, 才有可能被拒絕,從而執行rejectedExecution方法
    //這是一個同步的過程,因此r.run將執行在呼叫者所在的執行緒中
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        //呼叫前提是執行緒池沒有被shutdown
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

顧名思義,當使用CallerRunsPolicy時,一旦遇到任務無法處理的情況,
任務將直接在呼叫執行緒中執行。
因此使用CallerRunsPolicy時,最好不要在類似UI執行緒裡直接執行execute函式,
避免UI執行緒被任務阻塞。

5.3 DiscardPolicy

public static class DiscardPolicy implements RejectedExecutionHandler {
    ............
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        //直接丟棄
    }
}

當使用DiscardPolicy時,一旦遇到任務無法處理的情況,執行緒池直接丟棄該任務。

5.4 DiscardOldestPolicy

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    ............
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            //移除workQueue中最前面的任務
            e.getQueue().poll();
            //重新呼叫execute,仍然有可能觸發rejectedExecution
            e.execute(r);
        }
    }
}

當使用DiscardOldestPolicy時,一旦遇到任務無法處理的情況,
執行緒池直接將丟棄任務佇列中最舊的任務,並重新呼叫execute介面處理任務。

六、總結
至此,我們已經分析完了ThreadPoolExecutor主要引數的含義。

結合引數的含義,分析ThreadPoolExecutor的execute函式,
應該比較容易掌握ThreadPoolExecutor的工作原理。

下一篇部落格,我們再來分析ThreadPoolExecutor中其它值得一看的程式碼。

相關推薦

Executor Framework分析 () ThreadPoolExecutor主要引數分析

本篇部落格主要記錄ThreadPoolExecutor主要引數的含義, 並分析相關介面的具體實現。 很多時候,當我們不需要指定執行緒池的執行細節時, 會直接利用工具類Executors建立執行緒池,例如: public class Executo

ThreadPoolExecutor分析()

ase 任務 second nbsp while循環 clear ech 嘗試 turn 說明:本作者是文章的原創作者,轉載請註明出處:本文地址:http://www.cnblogs.com/qm-article/p/7859620.html 內部類Worker的分析 從源

Django rest framework 權限操作(源碼分析)

prop 源碼 display rtc body app ffi 代碼 tin 知識回顧 這一篇是基於上一篇寫的,上一篇謝了認證的具體流程,看懂了上一篇這一篇才能看懂, 當用戶訪問是 首先執行dispatch函數,當執行當第二部時: #2.處理版本信息

Django rest framework 許可權操作(原始碼分析)

知識回顧  這一篇是基於上一篇寫的,上一篇謝了認證的具體流程,看懂了上一篇這一篇才能看懂, 當用戶訪問是 首先執行dispatch函式,當執行當第二部時: #2.處理版本資訊 處理認證資訊 處理許可權資訊 對使用者的訪問頻率進行限制 self

Java執行緒池ThreadPoolExecutor使用和分析()

   相關文章目錄:     execute()是 java.util.concurrent.Executor介面中唯一的方法,JDK註釋中的描述是“在未來的某一時刻執行命令command”,即向執行緒池中提交任務,在未來某個時刻執行,提交的任務必須實現

維高斯分佈(Two-dimensional Gaussian distribution)的引數分析

  最近在看高斯混合模型(Gaussian Mixture Model, GMM),涉及到高斯分佈的引數。為此特意回顧了概率論的二維高斯分佈的相關概念,並分析了引數對二維高斯分佈曲面的影響。 1、多維高斯分佈的概率密度函式       多維變數X

Executor執行框架原始碼分析(一)——executor、threadFactory、ThreadPoolExecutor 、Future元件的關係及作用

       executor執行框架是JDK1.5新增的,用於專注於任務執行的框架。其最大的特點就是將任務的建立和任務的執行分離,鬆耦合,已達到最大限度的利用計算機資源(執行緒和記憶體等)。在併發程式設計中,executor是一個必備的工具。     在分析原始碼之前,首先

百度post引數分析)完結,dv、traceid的js來源

上一篇已經找到了post引數中的dv來源,今天繼續往下看,回顧一上一篇中dv的相關js var a = document.getElementById("dv_Input") , c = {

ABP原始碼分析十七:ABP.Entity Framework

IRepository:介面定義了Repository常見的方法 AbpRepositoryBase:實現了IRepository介面的常見方法 EfRepositoryBase:實現了AbpRepositoryBase中定義的抽象方法:GetAll,Insert,Delete,Update。在實

(數字IC)低功耗設計入門()——功耗的分析

layout 變化 監視 merge obj source divide 傳播 總結   前面學習了進行低功耗的目的個功耗的構成,今天就來分享一下功耗的分析。由於是面向數字IC前端設計的學習,所以這裏的功耗分析是基於DC中的power compiler工具;更精確的功耗分析

guava eventbus代碼分析()

.get 實現類 ava bject () sync 技術 cdi alt ---恢復內容開始--- 我們分析下EventBus的核心方法 post方法,直接貼代碼 1 public void post(Object event) { 2 Iterator

thinkphp5 源碼分析 框架引導

ase 文件 loader esp register behavior llb err filename 框架引導文件源代碼 (/thinkphp/start.php) 1. 引入基礎文件(/thinkphp/base.php) // 加載基礎文件 require __D

傳奇源碼分析-客戶端(遊戲邏輯處理源分析)

ltp 網關 message 魔法 tco 獲取 creat proc 分數 5.接受登錄成功後,接收GameSrv服務器發送的消息:接收GameGate發送的消息:CClientSocket::OnSocketMessage的FD_READ事件中,PacketQ.Push

day32 Python與金融量化分析()

fill 最小 all copy data oat 模型 主板 解析 第一部分:金融與量化投資 股票: 股票是股份公司發給出資人的一種憑證,股票的持有者就是股份公司的股東。 股票的面值與市值 面值表示票面金額 市值表示市場價值 上市/IPO: 企業通過證券交易所公

Python數據分析(): Numpy技巧 (3/4)

targe 工具 由於 ref 數據分析 技術分享 添加 pan note numpy、pandas、matplotlib(+seaborn)是python數據分析/機器學習的基本工具。 numpy的內容特別豐富,我這裏只能介紹一下比較常見的方法和屬性。 昨天晚上發

Python數據分析(): Numpy技巧 (4/4)

div 基本 images atp 工具 cnblogs note 屬性。 html numpy、pandas、matplotlib(+seaborn)是python數據分析/機器學習的基本工具。 numpy的內容特別豐富,我這裏只能介紹一下比較常見的方法和屬性。

第十至十章 算法分析--高階數據結構

需要 while 目的 pub 特殊 lar pan 編碼 void 1.貪婪算法的第二個應用為 哈夫曼編碼 來進行文件壓縮。 文件壓縮的主要問題是給文件中的所有字符分配能唯一識別的編碼(n個比特),如果我們事先知道所有字符出現的頻率,把頻率最高的放在最上層,頻率低的放在左

病毒分析第二講,分析病毒的主要功能

進入 cnblogs and 定位 mic 1-1 ftw com png ---恢復內容開始---              病毒分析第二講,分析病毒的主要功能 經過昨天病毒分析第一講,得出一個被註入的DLL 開始分析DLL主要功能 PS: IDA中,DLL會有各種初始化

Linux VFS分析()

系統內存 生命期 需要 tar 創建 重新 方法 沒有 引用計數 inode的管理:Inode-cache hash表inode_hashtable索引節點緩存 dentry的管理:我們知道,若幹dentry描繪了一個樹型的目錄結構,這就是用戶所看到的目錄結構,每個den

銳捷CCNA系列() Wireshark抓包分析Ping過程

銳捷 CCNA 銳捷實戰 銳捷CCNA 數通 實訓目的 初步了解Wireshark的使用。 能分析Ping過程。 實訓背景 PING(Packet Internet Groper, 因特網包探索器),用於測試網絡是否連通的程序,在Windows、Linux、Unix下都是標配程序,Pi