1. 程式人生 > >戲(細)說Executor框架執行緒池任務執行全過程(上)

戲(細)說Executor框架執行緒池任務執行全過程(上)

內容綜述

基於Executor介面中將任務提交和任務執行解耦的設計,ExecutorService和其各種功能強大的實現類提供了非常簡便方式來提交任務並獲取任務執行結果,封裝了任務執行的全部過程。本文嘗試通過對j.u.c.下該部分原始碼的解析以ThreadPoolExecutor為例來追蹤任務提交、執行、獲取執行結果的整個過程。為了避免陷入枯燥的原始碼解釋,將該過程和過程中涉及的角色與我們工作中的場景和場景中涉及的角色進行對映,力圖生動和深入淺出。

一、前言

1.5後引入的Executor框架的最大優點是把任務的提交和執行解耦。要執行任務的人只需把Task描述清楚,然後提交即可。這個Task是怎麼被執行的,被誰執行的,什麼時候執行的,提交的人就不用關心了。具體點講,提交一個Callable物件給ExecutorService(如最常用的執行緒池ThreadPoolExecutor),將得到一個Future物件,呼叫Future物件的get方法等待執行結果就好了。

經過這樣的封裝,對於使用者來說,提交任務獲取結果的過程大大簡化,呼叫者直接從提交的地方就可以等待獲取執行結果。而封裝最大的效果是使得真正執行任務的執行緒們變得不為人知。有沒有覺得這個場景似曾相識?我們工作中當老大的老大(且稱作LD^2)把一個任務交給我們老大(LD)的時候,到底是LD自己幹,還是轉過身來拉來一幫苦逼的兄弟加班加點幹,那LD^2是不管的。LD^2只用把人描述清楚提及給LD,然後喝著咖啡等著收LD的report即可。等LD一封郵件非常優雅地報告LD^2report結果時,實際操作中是碼農A和碼農B幹了一個月,還是碼農ABCDE加班幹了一個禮拜,大多是不用體現的。這套機制的優點就是LD^2找個合適的LD出來提交任務即可,介面友好有效,不用為具體怎麼幹費神費力。

二、 一個最簡單的例子

看上去這個執行過程是這個樣子。呼叫這段程式碼的是老大的老大了,他所需要乾的所有事情就是找到一個合適的老大(如下面例子中laodaA就榮幸地被選中了),提交任務就好了。

// 一個有7個作業執行緒的執行緒池,老大的老大找到一個管7個人的小團隊的老大
       ExecutorService laodaA = Executors.newFixedThreadPool(7);
		 //提交作業給老大,作業內容封裝在Callable中,約定好了輸出的型別是String。
			String outputs = laoda.submit(
			         new Callable<String>() {
			             public String call() throws Exception
			             {
			                 return "I am a task, which submited by the so called laoda, and run by those anonymous workers";
			             }
			             //提交後就等著結果吧,到底是手下7個作業中誰領到任務了,老大是不關心的。
			         }).get();

			System.out.println(outputs);

使用上非常簡單,其實只有兩行語句來完成所有功能:建立一個執行緒池,提交任務並等待獲取執行結果。

例子中生成執行緒池採用了工具類Executors的靜態方法。除了newFixedThreadPool可以生成固定大小的執行緒池,newCachedThreadPool可以生成一個無界、可以自動回收的執行緒池,newSingleThreadScheduledExecutor可以生成一個單個執行緒的執行緒池。newScheduledThreadPool還可以生成支援週期任務的執行緒池。一般使用者場景下各種不同設定要求的執行緒池都可以這樣生成,不用自己new一個執行緒池出來。

三、程式碼剖析

這套機制怎麼用,上面兩句語句就做到了,非常方便和友好。但是submit的task是怎麼被執行的?是誰執行的?如何做到在呼叫的時候只有等待執行結束才能get到結果。這些都是1.5之後Executor介面下的執行緒池、Future介面下的可獲得執行結果的的任務,配合AQS和原有的Runnable來做到的。在下文中我們嘗試通過剖析每部分的程式碼來了解Task提交,Task執行,獲取Task執行結果等幾個主要步驟。為了控制篇幅,突出主要邏輯,文章中引用的程式碼片段去掉了異常捕獲、非主要條件判斷、非主要操作。文中只是以最常用的ThreadPoolExecutor執行緒池舉例,其實ExecutorService介面下定義了很多功能豐富的其他型別,有各自的特點,但風格類似。本文重點是介紹任務提交的過程,過程中涉及的ExecutorService、ThreadPoolExecutor、AQS、Future、FutureTask等只會介紹該過程中用到的內容,不會對每個類都詳細展開。

1、 任務提交

從類圖上可以看到,介面ExecutorService繼承自Executor。不像Executor中只定義了一個方法來執行任務,在ExecutorService中,正如其名字暗示的一樣,定義了一個服務,定義了完整的執行緒池的行為,可以接受提交任務、執行任務、關閉服務。抽象類AbstractExecutorService類實現了ExecutorService介面,也實現了介面定義的預設行為。

1-Executor

AbstractExecutorService任務提交的submit方法有三個實現。第一個接收一個Runnable的Task,沒有執行結果;第二個是兩個引數:一個任務,一個執行結果;第三個一個Callable,本身就包含執任務內容和執行結果。 submit方法的返回結果是Future型別,呼叫該介面定義的get方法即可獲得執行結果。 V get() 方法的返回值型別V是在提交任務時就約定好了的。

除了submit任務的方法外,作為對服務的管理,在ExecutorService介面中還定義了服務的關閉方法shutdown和shutdownNow方法,可以平緩或者立即關閉執行服務,實現該方法的子類根據自身特徵支援該定義。在ThreadPoolExecutor中,維護了RUNNING、SHUTDOWN、STOP、TERMINATED四種狀態來實現對執行緒池的管理。執行緒池的完整執行機制不是本文的重點,重點還是關注submit過程中的邏輯。

1) 看AbstractExecutorService中程式碼提交部分,構造好一個FutureTask物件後,呼叫execute()方法執行任務。我們知道這個方法是頂級介面Executor中定義的最重要的方法。。FutureTask型別實現了Runnable介面,因此滿足Executor中execute()方法的約定。同時比較有意思的是,該物件在execute執行後,就又作為submit方法的返回值返回,因為FutureTask同時又實現了Future介面,滿足Future介面的約定。

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

2) Submit傳入的引數都被封裝成了FutureTask型別來execute的,對應前面三個不同的引數型別都會封裝成FutureTask。

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

3) Executor介面中定義的execute方法的作用就是執行提交的任務,該方法在抽象類AbstractExecutorService中沒有實現,留到子類中實現。我們觀察下子類ThreadPoolExecutor,使用最廣泛的執行緒池如何來execute那些submit的任務的。這個方法看著比較簡單,但是執行緒池什麼時候建立新的作業執行緒來處理任務,什麼時候只接收任務不建立作業執行緒,另外什麼時候拒絕任務。執行緒池的接收任務、維護工作執行緒的策略都要在其中體現。

作為必要的預備知識,先補充下ThreadPoolExecutor有兩個最重要的集合屬性,分別是儲存接收任務的任務佇列和用來幹活的作業集合。

//任務佇列
private final BlockingQueue<Runnable> workQueue;
//作業執行緒集合
private final HashSet<Worker> workers = new HashSet<Worker>();

其中阻塞佇列workQueue是來儲存待執行的任務的,在構造執行緒池時可以選擇滿足該BlockingQueue 介面定義的SynchronousQueue、LinkedBlockingQueue或者DelayedWorkQueue等不同阻塞佇列來實現不同特徵的執行緒池。

關注下execute(Runnable command)方法中呼叫到的addIfUnderCorePoolSize,workQueue.offer(command) , ensureQueuedTaskHandled(command),addIfUnderMaximumPoolSize(command)這幾個操作。尤其幾個名字較長的private方法,把方法名的駝峰式的單詞分開,加上對方法上下文的瞭解就能理解其功能。

因為前面說到的幾個方法在裡面即是操作,又返回一個布林值,影響後面的邏輯,所以不大方便在方法體中為每條語句加註釋來說明,需要大致關聯起來看。所以首先需要把execute方法的主要邏輯說明下,再看其中各自方法的作用。

  • 如果執行緒池的狀態是RUNNING,執行緒池的大小小於配置的核心執行緒數,說明還可以建立新執行緒,則啟動新的執行緒執行這個任務。
  • 如果執行緒池的狀態是RUNNING ,執行緒池的大小小於配置的最大執行緒數,並且任務佇列已經滿了,說明現有執行緒已經不能支援當前的任務了,並且執行緒池還有繼續擴充的空間,就可以建立一個新的執行緒來處理提交的任務。
  • 如果執行緒池的狀態是RUNNING,當前執行緒池的大小大於等於配置的核心執行緒數,說明根據配置當前的執行緒數已經夠用,不用建立新執行緒,只需把任務加入任務佇列即可。如果任務佇列不滿,則提交的任務在任務佇列中等待處理;如果任務佇列滿了則需要考慮是否要擴充套件執行緒池的容量。
  • 當執行緒池已經關閉或者上面的條件都不能滿足時,則進行拒絕策略,拒絕策略在RejectedExecutionHandler介面中定義,可以有多種不同的實現。

上面其實也是對最主要思路的解析,詳細展開可能還會更復雜。簡單梳理下思路:構建執行緒池時定義了一個額定大小,當執行緒池內工作執行緒數小於額定大小,有新任務進來就建立新工作執行緒,如果超過該閾值,則一般就不建立了,只是把接收任務加到任務佇列裡面。但是如果任務佇列裡的任務實在太多了,那還是要申請額外的工作執行緒來幫忙。如果還是不夠用就拒絕服務。這個場景其實也是每天我們工作中會碰到的場景。我們管人的老大,手裡都有一定HC(Head Count),當上面老大有活分下來,手裡人不夠,但是不超過HC,我們就自己招人;如果超過了還是忙不過來,那就向上門老大申請借調人手來幫忙;如果還是幹不完,那就沒辦法了,新任務咱就不接了。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
}

4) addIfUnderCorePoolSize方法檢查如果當前執行緒池的大小小於配置的核心執行緒數,說明還可以建立新執行緒,則啟動新的執行緒執行這個任務。

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
       //如果當前執行緒池的大小小於配置的核心執行緒數,說明還可以建立新執行緒
            if (poolSize < corePoolSize && runState == RUNNING)
      // 則啟動新的執行緒執行這個任務
                t = addThread(firstTask);
        return t != null;
    }

5)  和上一個方法類似,addIfUnderMaximumPoolSize檢查如果執行緒池的大小小於配置的最大執行緒數,並且任務佇列已經滿了(就是execute方法試圖把當前執行緒加入任務佇列時不成功),說明現有執行緒已經不能支援當前的任務了,但執行緒池還有繼續擴充的空間,就可以建立一個新的執行緒來處理提交的任務。

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
           // 如果執行緒池的大小小於配置的最大執行緒數,並且任務佇列已經滿了(就是execute方法中試圖把當前執行緒加入任務佇列workQueue.offer(command)時候不成功),說明現有執行緒已經不能支援當前的任務了,但執行緒池還有繼續擴充的空間
            if (poolSize < maximumPoolSize && runState == RUNNING)
          //就可以建立一個新的執行緒來處理提交的任務
            t = addThread(firstTask);
        return t != null;
    }

6)  在ensureQueuedTaskHandled方法中,判斷如果當前狀態不是RUNING,則當前任務不加入到任務佇列中,判斷如果狀態是停止,執行緒數小於允許的最大數,且任務佇列還不空,則加入一個新的工作執行緒到執行緒池來幫助處理還未處理完的任務。

private void ensureQueuedTaskHandled(Runnable command) {
            //  如果當前狀態不是RUNING,則當前任務不加入到任務佇列中,判斷如果狀態是停止,執行緒數小於允許的最大數,且任務佇列還不空
             if (state < STOP &&
                     poolSize < Math.max(corePoolSize, 1) &&
                     !workQueue.isEmpty())
            //則加入一個新的工作執行緒到執行緒池來幫助處理還未處理完的任務
                t = addThread(null);
        if (reject)
            reject(command);
    }

7)   在前面方法中都會呼叫adThread方法建立一個工作執行緒,差別是建立的有些工作執行緒上面關聯接收到的任務firstTask,有些沒有。該方法為當前接收到的任務firstTask建立Worker,並將Worker新增到作業集合HashSet<Worker> workers中,並啟動作業。

private Thread addThread(Runnable firstTask) {
        //為當前接收到的任務firstTask建立Worker
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
            w.thread = t;
       //將Worker新增到作業集合HashSet&lt;Worker&gt; workers中,並啟動作業
            workers.add(w);
                t.start();
        return t;
    }

至此,任務提交過程簡單描述完畢,並介紹了任務提交後ExecutorService框架下執行緒池的主要應對邏輯,其實就是接收任務,根據需要建立或者維護管理執行緒。

維護這些工作執行緒幹什麼用?先不用看後面的程式碼,想想我們老大每月辛苦地把老闆豐厚的薪水遞到我們手裡,定期還要領著大家出去happy下,又是定期的關心下個人生活,所有做的這些都是為什麼呢?木訥的程式碼工不往這邊使勁動腦子,但是猜還是能猜的到的,就讓幹活唄。本文想著重表達細節,諸如執行緒池裡的Worker是怎麼工作的,Task到底是不是在這些工作執行緒中執行的,如何保證執行完成後,外面等待任務的老大拿到想要結果,我們將在下篇文章中詳細介紹。