1. 程式人生 > >啃不透--執行緒池

啃不透--執行緒池

引子   

  上一篇文章《火焰圖--記一次cpu降溫過程》提到接手了單車投放的應用,上週已經有大量流量切到新應用,然而心中還是惴惴不安的,投放動作業務邏輯複雜,使用了執行緒池非同步處理。對於執行緒池認知只是停留在一個模糊的狀態。這是一個什麼狀態呢:感覺什麼都懂,就是說不出來(就像考試前啥都懂,考試時咬筆桿)。每次使用執行緒遇到問題都不能用已經有知識去思考,比如3個核心引數應該如何設定,阻塞佇列應該用什麼...通過搜尋引擎查詢後,又感覺自己懂了,實際上只是做了一次無用的迴圈,這次輸入的內容,因為沒有體系根基,很快又被新的內容沖刷得毫無痕跡。

  最近加入同事文博組織的虛擬PMO團隊--Thor,大家在交流分享結構化思維時,方才意識到自己在學習執行緒池上花了很多時間,還是不能清楚的描述它,更本原因就是沒有從巨集觀上認識,建立初始知識體系,沒有這個基礎,零散學習只是增加了很多無效時間。這也就是為啥有的人學習快,有些慢;有的人可以舉一反三,有的確不能。

  一個良好的學習過程應該是:

  1,找到初始化知識體系。

      2,補充豐富知識體系。

      我們都知道系統學習可以建立初始化知識體系,比如閱讀一本相關書籍,寫一些文章。那麼如何補充豐富呢,一句話:拿到錘子,看到什麼都是釘子。那應該如何理解這句話呢:這篇文章就是這個的實踐。

  

                                        文博分享的關於結構化思維的腦圖

 

為什麼要用執行緒池

  隨著處理器的核心越來越多,利用多執行緒技術可以把計算邏輯拆分成多個片段,分配到多個核心上,可以顯著減少處理時間,提高效率;複雜的業務邏輯,可以使用多執行緒併發處理,縮短響應時間,提高使用者體驗。java的執行緒機制是搶佔式協作多執行緒, 呼叫機制會週期性的中斷執行緒,將上下文切換到另一個程序,執行緒越多,競爭會越多,切換會更頻繁。所以增加執行緒帶來的效能增加不是線性的,這就是amdahl定律。

  再者,執行緒的建立與銷燬,上下文切換都不是免費的。《併發程式設計實戰》一書中對於執行緒建立與銷燬開銷說明:

Thread lifecycle overhead. Thread creation and teardown are not free. The actual overhead varies across platforms, but thread creation takes time, introducing latency into request processing, and requires some processing activity by the JVM and OS. If requests are frequent and lightweight, as in most server applications, creating a new thread for each request can consume significative computing resources.

  大意如下:“執行緒生命週期開銷:建立和銷燬都是有代價的。實際開銷雖因平臺有所不同,但是都要消耗時間,jvm和os 需要執行一些處理程式;在大數請求頻繁的服務端應用中,如果為每個請求建立一個執行緒將消耗非常可觀的計算機資源”。以上概念層的開銷,那一個java執行緒的建立實際開銷則是這樣的:  

  • A large block of memory has to be allocated and initialized for the thread stack. 為執行緒棧分配記憶體
  • System calls need to be made to create / register the native thread with the host OS.  為os 建立和註冊本地執行緒進行系統呼叫
  • Descriptors needs to be created, initialized and added to JVM internal data structures.  建立和初始化描述符,新增到jvm內部的資料結構。

  上下問切換(context switching)也是有開銷的,需要分配記憶體儲存當前狀態,克隆系統呼叫等,具體可以參考文末參考資料[2]

 

    正是因為建立執行緒的代價是如此昂貴的(expensive),所以執行緒池出現了, 它以“池化”思想來管理資源,按需建立,分配,回收;並重複利用已有的執行緒資源。既然大家都用執行緒池,那麼它的”真面目“是怎麼樣的呢-- 從源開開始。

 

原始碼分析

  java為多執行緒程式設計提供了良好的,考究並且一致的程式設計模型,讓我們只需關注問題本身,而ThreadPoolExecutor類就是java為我們提供的執行緒池模型,其繼承體系如下圖,頂層介面定義了統一的行為,並將任務提交與任務執行的策略解藕開來;而AbstractExecutorService 抽象任務執行流程並串連起來;如此,子類只用關注某個具體方法了。

                  

 

   一般而言 ThreadPoolExecutor.execute(Runnable()) 是我們使用執行緒池的入口

 

public void execute(Runnable command) {
if (command == null) // 三種情況: int c = ctl.get(); //1,執行緒數 少於 核心執行緒 直接建立執行緒 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //執行緒數數超過 核心執行緒,但是blockqueue 未滿,enqueue. if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // queue 已經滿,直接建立執行緒(超過max reject) else if (!addWorker(command, false)) reject(command); }

  execute方法的三個分支,決定了執行緒池中執行緒的建立執行策略(面試中經常碰到的場景就是:添加了多個任務時,建立了多少個執行緒):

  1,執行緒數 少於 核心執行緒 直接建立執行緒   2,執行緒數數超過 核心執行緒,但是blockqueue 未滿,enqueue.   3, queue 已經滿,直接建立執行緒(超過max reject)      下圖展示了執行緒的建立過程

  

   上面的程式碼中的判斷條件中有兩個:workerCountOf(c) -- 獲取當前執行緒數; isRunning(c)  -- 執行緒池是否是執行狀態。這兩個方法的引數都是一個int型別,那麼一個int是如何能同時表示兩個型別呢。一個int 4個位元組,32位,這裡就是用指定位數(3位)來表示狀態,剩下的29位表示執行緒數,下圖展示了這個關係。jdk中還有一些其他類也同步用了這樣方法,比如:ReentrantReadWriteLock,高16位表示共享鎖的數量,低16位表示互斥鎖的數量。

 

  

  

// CAPACITY= 00011111111111111111111111111111(29個1)
// 獲取當前執行緒數 // 執行緒池的最大數就是2^29-1 private static int workerCountOf(int c) { return c & CAPACITY; }

 

  執行緒池做為一個物件,有自己的狀態機,其狀態變化是有內部事件驅動的。下圖展示了每個狀態以及對應值(狀態值是3位二進位制),及對應的行為。這裡有個插曲:以前面試被問到執行緒池shutwon和stop的差別。當時認識不清說得特別含糊,其實從這兩個狀態的英文單詞的含義就可以看出7,8分了。 showdown 牛津翻譯為:the act of closing a factory or business or stopping a large machine from working, either temporarily or permanently。體現的是進行時,closing,stopping;stop 意思比較多,但都是表示的一個意思:end /  not continue。大師的變數名命名那真是相當精確的,要不怎麼都提倡程式設計師學好英語呢。

  

   看完了執行緒池的排程入口,瞭解了執行緒池的狀態機,我們繼續來看下方法 addWorker(Runnable firstTask, boolean core),前文說到執行緒池的把任務的提交和執行解藕,那就是如何串連的呢,addWorker方法就很好的完成的這個串連。這個方法主要分兩個部分:

  1,根據執行緒池狀態及執行緒數判斷是返回還是繼續。其中第一個 if 條件尤為複雜,已經有註釋。

  2,建立工作程序物件 Worker w ,並執行其持有的執行緒物件thread 的start 方法。順利讓解藕的執行部分開始工作。

  這裡的程式碼邏輯不復雜,有一個標記還是有意思的: retry:(標記,可以寫成任意如:abc:) / continue retry ;(跳出當前迴圈) /break retry; (跳出外層迴圈)。 以後跳出雙重迴圈是不是也可以這樣寫?

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
       
        // 如果 是shutdown 以上, 後在有三個條件 都滿足才可以接續執行         1, shutdown 執行原有任務,可能加新任務。         2, firstTask 必須為空。         3, queue 不能為空(有任務才能接續執行。)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

 

   接下來任務的執行就交給了工作執行緒 Worker w 了,這是一個內部類實現了介面 Runnable,建構函式中對的 屬性thread初始化傳是this,  如此 addWorker 方法中的 t.start(); 就順利呼叫了Worker的run 方法了,而run方法又呼叫 runWorker。所以真正執行任務的最終方法在這裡 -- runWorker。

Worker 
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // 注意:這裡,這個執行緒 傳的runnable 是this, 也就是 worker本身, 所以start()後進入runnable狀態,等到獲取時間片後,就執行 run方法。
    this.thread = getThreadFactory().newThread(this);
}
}
/** Delegates main run loop to outer runWorker  */
public void run() {
runWorker(this);
}
 

  我們繼續來讀最關鍵的方法runWorker,我刪除了一些判斷以及異常處理的程式碼,讓我們可以清晰看到處理邏輯:獲取任務,執行,回收執行緒。獲取任務有兩種情況:1,執行緒數小於核心數和佇列滿了但執行緒未到最大執行緒數時直接傳入了任務;2,從阻塞獲取任務,getTask()方法完成了這一任務

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();try {
                    Throwable thrown = null;
                    try {
                        task.run();
                    }
            afterExecute(task, thrown);

} finally { task = null;
            // 統計完成任務數
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
//回收工作執行緒,嘗試更新狀態。 processWorkerExit(w, completedAbruptly); } }

  

   ThreadPoolExecutor 中定義了 HashSet<Worker>worker 工作執行緒佇列,BlockingQueue<Runnable>workQueue 任務佇列 來實現了工作執行緒和任務管理與解藕。到裡執行緒的任務新增流程和執行過程就分析完了,當然中間拋棄了大量細節,比如鎖的使用,比如狀態機的變化等等。還是如前文所說,先建立初始化知識體系,後面再研究細節補充體系,每次的投入都是在強化它,再也不是無效時間了。簡版呼叫時序圖如下:

  

 

執行緒池監控

   文章開頭提到流量增大,心中不安,很大一部分原因,就是因為無法監控到線上執行緒池的狀態,比如阻塞佇列中的任務數,活躍執行緒數,執行緒池大小等等。當然這也是原於早前的無知,平時我們寫程式碼主要分成兩部分:功能性程式碼,實現業務功能;運維性程式碼,監控程式狀態,分析問題。大師的程式碼也不例外,只是優雅很多。ThreadPoolExecutor 中有提供了相關運維程式碼,並在runWorker 中使用模板方法設計模式,為我們獲取執行緒池狀態等資訊提供介面了,比如:beforeExecute(wt, task);  afterExecute(task, thrown);   ThreadPoolExecutor中這兩個方法都是空實現,我們可以繼承,並重寫完成狀態的獲取。獲取執行緒池運維狀態提代瞭如下方法下圖。

  

  參考了一位網友的程式碼(忘記出處了),繼承ThreadPoolExecutor ,重寫afterExecute,列印執行緒池相關資訊

@Slf4j
public class ThreadPoolMonitor  extends ThreadPoolExecutor {

    private String poolName;

    /**
     * 呼叫執行緒池的構造方法,並記錄執行緒池名
     *
     * @param corePoolSize    執行緒池核心執行緒數
     * @param maximumPoolSize 執行緒池最大執行緒數
     * @param keepAliveTime   執行緒的最大空閒時間
     * @param unit            空閒時間的單位
     * @param workQueue       儲存被提交任務的佇列
     * @param poolName        執行緒池名稱
     */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                             TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                new HamThreadFactory(poolName));
        this.poolName = poolName;
    }

    /**
     * 任務執行之後,將相關狀態記錄日誌。
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // 統計任務耗時、初始執行緒數、核心執行緒數、正在執行的任務數量、
        // 已完成任務數量、任務總數、佇列裡快取的任務數量、池中存在的最大執行緒數、
        // 最大允許的執行緒數、執行緒空閒時間、執行緒池是否關閉、執行緒池是否終止
        log.info("{}-pool-monitor: " +
                        " PoolSize: {}, CorePoolSize: {}, Active: {}, " +
                        "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
                        "MaximumPoolSize: {},  KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
                this.poolName, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
                this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
                this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
    }

}
View Code

 

 結語

  最近學習一直停留在輸入(看)層面,所看內容無法轉化成自己的知識體系,因而很多東西都無法深入,我們當然知道原因,但是總是說忙得沒時間整理。入職哈囉後看到很多優秀的人都是每天大量記錄心得感想。等到文博和我們分享他的讀書筆記時,從震撼到懺愧。知識只有經歷了 輸入-消化-輸出 才會最終成為我們擁有的。為此文博還在Thor團隊發起對賭打卡 -- 哈哈哈,我們當然要應戰。

  文章到這裡就結束了,因為個人經驗還有很多不足,文章中的分析也比較粗淺,甚至有錯誤的地方,希望大家可以拍磚,狠狠的拍。

 

   成為一名優秀的程式設計師!

   參考資料:

  [1] https://intellipaat.com/community/36170/why-is-creating-a-thread-said-to-be-expensive

  [2] https://eli.thegreenplace.net/2018/measuring-context-switching-and-memory-overheads-for-linux-threads/

  [3] https://mp.weixin.qq.com/s/baYuX8aCwQ9PP6k7TDl2Ww

  [4] 《java併發程式設計》

&n