1. 程式人生 > >java ThreadPoolExecutor初探

java ThreadPoolExecutor初探

 導讀:執行緒池是開發中使用頻率比較高的元件之一,但是又有多少人真正瞭解其內部機制呢。

關鍵詞:執行緒池

前言

執行緒池是大家開發過程中使用頻率比較高的元件之一,但是其內部原理又有多少人真正清楚呢。最近抽時間去了解了一下其內部實現細節,感覺略有收穫,遂以ThreadPoolExecuter為例將自己的心得體會分享出來和大家一起交流,如有不妥之處,煩請大家積極指正。

我的疑問

  1. 執行任務的工作機制是怎樣的?
  2. 是什麼時機清理空閒執行緒的?
  3. ThreadPoolExecutor.CallerRunsPolicy 什麼場景下會用到?
  4. 執行緒池終止的時候都做了什麼?(過兩天補上)

問題1-執行任務的工作機制是怎樣的?

開始之前,我先試圖用一個真實的場景來描述一下,2015年我們公司boss開始創立公司,創立初期公司業務比較少,他一個人(corePoolSize=1)乾的有條不紊,有聲有色的,沒過多久,業務量上來了,他一個人幹不過來,分身乏力,那怎麼辦呢?其實很簡單,排隊唄,就這樣boss將待辦的任務都新增到備忘錄(BlockIngQueue)裡面,boss又開始愉快的工作,但是客戶的耐心終歸有限,過了幾天發現自己交給我們公司的業務還沒完成,客戶一氣之下打電話給boss“我的活你幹完沒有,沒幹的話就停下來(shutdown/shutdownNow)吧,我找別人了”,這時候boss慌了,默默的點上一根菸,在網上發了招聘通知,就這樣幹活的人又多了起來(addWorker),公司在boss的帶領下風生水起,就這樣不知不覺中公司走過了五個年頭,本以為可以大幹一場的我們,卻偏偏趕上了2020年的新冠,復工日期一拖再拖,客戶需求一少再少,唯獨公司養的員工沒少,這是公司目前最大的開支了。長痛不如短痛,boss下發了一個政策,如果員工本月(keepAliveTime=一個月)kpi完不成,假定kpi為“單月完成任務數大於0“,那麼就會被淘汰(空閒執行緒被清理),最後撐了兩個月,公司又回到解放前,boss又成了光桿司令。

文字描述

提交任務步驟1:啟動核心工作執行緒

觸發條件:工作執行緒數小於核心執行緒數

步驟描述:啟動核心工作執行緒,並將任務作為工作執行緒的firstTask,如果啟動失敗會走到“提交任務步驟2”

 

提交任務步驟2:往阻塞佇列中堆積任務

觸發條件:工作執行緒數大於核心執行緒數

步驟描述:嘗試將任務堆積到佇列中,如果堆積失敗,會走到“提交任務步驟3”。如果堆積成功會做兩個雙重檢查,分別是狀態的檢查和工作執行緒數的檢查,如果狀態不合法就嘗試刪除剛新增成功的任務,刪除成功呼叫reject方法(為什麼刪除失敗不需要呼叫reject方法呢?因為刪除失敗意味著task已經被處理過了,不能謊報軍情);如果工作執行緒數等於0就補充工作執行緒,什麼情況下工作執行緒會變成0呢,後面會單獨說這個問題

 

提交任務步驟3:啟動工作執行緒

觸發條件:核心執行緒數已達到執行緒池設定的閾值corePoolSize而且佇列裡已經堆積不了了

步驟描述:嘗試啟動工作執行緒,如果這時候啟動失敗就呼叫reject方法通知呼叫者

時序圖描述

 

 

 

 

問題2-是什麼時機清理空閒執行緒的?

 我們先來回顧下執行緒的執行狀態,如下圖:

 

 

 回到我們的問題,只需要讓“空閒執行緒真的空閒下來”它自然就被清理了,那怎麼能讓它真的閒下來呢。前面時序圖的第5步我提到worker會一直迴圈從BlockingQueue裡面獲取task執行,如果沒有task返回給worker,那說明它真的是閒了(getTask返回null,意味著會退出while迴圈,很快worker的run方法就執行結束,執行緒從RUNNABLE到TERMINATED),它需要自己退出歷史的長河中了,縱使它曾經立下汗馬功勞,但是誰讓它不是核心人員呢(你處於corePoolSize之外),讓我們讀一下getTask的程式碼:

private Runnable getTask() {
       //標記位,獲取任務是否超時
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
           
            int wc = workerCountOf(c);//worker的數量

            // 標記位,可以簡單理解為是否允許超時回收,允許的條件為allowCoreThreadTimeOut(是否允許核心執行緒被回收)或者worker的數量大於核心執行緒數
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            //滿足以下幾個條件時會返回null,1.worker的數量大於最大執行緒池的數量;
2.允許超時回收(timed==true)&&上一次獲取任務超時(timedOut==true)&&當前沒有堆積任務(workQueue.isEmpty),說明worker確實空閒了 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //如果允許超時回收就使用poll,否則使用take Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //poll結束,佇列中當時沒有任務,將timedOut置為true,下一趟迴圈時會滿足回收的第二個條件,getTask返回null timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

  

問題3-ThreadPoolExecutor.CallerRunsPolicy 什麼場景下會用到?

前面我們提到,如果執行緒池的處理能力已經飽和,那麼就會呼叫reject方法執行RejectedExecutionHandler,jdk已經為我們提供了四種拒絕策略,分別是AbortPolicy,DiscardOldestPolicy,DiscardPolicy,CallerRunsPolicy,前三種都好理解,簡單來說就是丟棄,唯獨最後一種我想了好久沒想到使用場景,前不久在看同事程式碼的時候發現他有用到這個策略,來達到一種限流的效果,簡單畫個流程圖說明一下邏輯:

DelayQueuePollingTask負責定時從資料庫拉取延遲佇列,然後封裝成task扔到執行緒池去執行,task使用httpclient向isv推送資料,由於isv的服務吞吐量比較低,經常會觸發超時,進而導致執行緒池被跑滿,為了讓DelayQueuePollingTask可以感知到執行緒池處於飽和狀態,而且又不至於它空等待,所以使用了CallerRunsPolicy這個策略,當執行緒池滿負荷的時候由DelayQueuePollingTask所在的執行緒負責執行task。

 

 

 

 

 

 

總結

 

上週末帶著自己之前使用執行緒池過程中的一些疑問學習了一下ThreadPoolExecutor原始碼,雖說沒有完全吃透,但也帶給自己不少收穫,在這裡將自己的思考過程分享出來,但願能幫到一部分人。

如果覺得有用,請點個推薦