1. 程式人生 > 實用技巧 >《java併發》之執行緒池

《java併發》之執行緒池

執行緒池有什麼好處

降低資源消耗,複用已建立的執行緒,降低開銷、控制最大併發數。

隔離執行緒環境,可以配置獨立執行緒池,將較慢的執行緒與較快的隔離開,避免相互影響。

實現任務執行緒佇列緩衝策略和拒絕機制。

實現某些與時間相關的功能,如定時執行、週期執行等。


執行緒池處理任務的流程

當提交一個新任務到執行緒池是

核心執行緒池未滿,建立一個新的執行緒執行任務

② 如果核心執行緒池已滿,工作佇列未滿,將執行緒儲存在工作佇列

③ 如果工作佇列已滿,執行緒數小於最大執行緒數就建立一個新執行緒處理任務

④ 如果超過大小執行緒數,按照拒絕策略來處理任務

執行緒池建立執行緒時,會將執行緒封裝成工作執行緒 Worker,Worker 在執行完任務後還會迴圈獲取工作佇列中的任務來執行。


建立執行緒池的方法

可以通過 Executors 的靜態工廠方法建立執行緒池:

newFixedThreadPool,固定大小的執行緒池,核心執行緒數也是最大執行緒數,不存在空閒執行緒,keepAliveTime = 0。該執行緒池使用的工作佇列是無界阻塞佇列 LinkedBlockingQueue,適用於負載較重的伺服器。

newSingleThreadExecutor,使用單執行緒,相當於單執行緒序列執行所有任務,適用於需要保證順序執行任務的場景。阻塞佇列為LinkedBlockingQueue

newCachedThreadPool,maximumPoolSize 設定為 Integer 最大值,是高度可伸縮的執行緒池。

該執行緒池使用的工作佇列是沒有容量的 SynchronousQueue,如果主執行緒提交任務的速度高於執行緒處理的速度,執行緒池會不斷建立新執行緒,極端情況下會建立過多執行緒而耗盡CPU 和記憶體資源。適用於執行很多短期非同步任務的小程式或負載較輕的伺服器。

newScheduledThreadPool:執行緒數最大為 Integer 最大值,存在 OOM 風險。支援定期及週期性任務執行,適用需要多個後臺執行緒執行週期任務,同時需要限制執行緒數量的場景。相比 Timer 更安全,功能更強,與 newCachedThreadPool 的區別是不回收工作執行緒。

newWorkStealingPool

:JDK8 引入,建立持有足夠執行緒的執行緒池支援給定的並行度,通過多個佇列減少競爭。


建立執行緒池引數

① corePoolSize:核心執行緒池大小

② maximumPoolSize:最大執行緒池大小

keepAliveTime:執行緒池中超過 corePoolSize 數目的空閒執行緒最大存活時間

④ unit:keepAliveTime 的時間單位。

⑤ workQueue:工作佇列,當執行緒請求數大於等於 corePoolSize 時執行緒會進入阻塞佇列。

⑥ threadFactory:執行緒工廠,用來生產一組相同任務的執行緒。可以給執行緒命名,有利於分析錯誤。

⑦ handler:拒絕策略

  • AbortPolicy: 直接丟擲異常(預設)
  • DiscardPolicy : 不處理,丟棄掉
  • CallerRunsPolicy : 用呼叫者所在的執行緒執行任務
  • DiscardOldestPolicy :丟棄佇列中最近的一個任務,並執行當前任務


如何關閉執行緒池

可以呼叫 shutdownshutdownNow 方法關閉執行緒池

原理是遍歷執行緒池中的工作執行緒,然後逐個呼叫執行緒的 interrupt 方法中斷執行緒,無法響應中斷的任務可能永遠無法終止。

  • shutdownNow 首先將執行緒池的狀態設為 STOP,然後嘗試停止正在執行或暫停任務的執行緒,並返回等待執行任務的列表。
  • shutdown 只是將執行緒池的狀態設為 SHUTDOWN,然後中斷沒有正在執行任務的執行緒。

通常呼叫 shutdown 來關閉執行緒池,如果任務不一定要執行完可呼叫 shutdownNow


執行緒池的選擇策略

可以從以下角度分析:

  • ①任務性質:CPU 密集型、IO 密集型和混合型。
  • ②任務優先順序。
  • ③任務執行時間。
  • ④任務依賴性:是否依賴其他資源,如資料庫連線。

性質不同的任務可用不同規模的執行緒池處理

  • CPU 密集型任務應配置儘可能小的執行緒,如配置 Ncpu+1 個執行緒的執行緒池。
  • IO 密集型任務執行緒並不是一直在執行任務,應配置儘可能多的執行緒,如 2*Ncpu
  • 混合型的任務,如果可以拆分,將其拆分為一個 CPU 密集型任務和一個 IO 密集型任務,只要兩個任務執行的時間相差不大那麼分解後的吞吐量將高於序列執行的吞吐量,如果相差太大則沒必要分解。

優先順序不同的任務可以使用優先順序佇列 PriorityBlockingQueue 處理。

執行時間不同的任務可以交給不同規模的執行緒池處理,或者使用優先順序佇列讓執行時間短的任務先執行。

依賴資料庫連線池的任務,由於執行緒提交 SQL 後需要等待資料庫返回的結果,等待的時間越長 CPU 空閒的時間就越長,因此執行緒數應該儘可能地設定大一些,提高 CPU 的利用率。

建議使用有界佇列,能增加系統的穩定性和預警能力,可以根據需要設定的稍微大一些。


阻塞佇列

阻塞佇列支援阻塞插入和移除,當佇列滿時,阻塞插入元素的執行緒直到佇列不滿。當佇列為空時,獲取元素的執行緒會被阻塞直到佇列非空。阻塞佇列常用於生產者和消費者的場景,阻塞佇列就是生產者用來存放元素,消費者用來獲取元素的容器。

Java 中的阻塞佇列

ArrayBlockingQueue,由陣列組成的有界阻塞佇列,預設情況下不保證執行緒公平,有可能先阻塞的執行緒最後才訪問佇列。

LinkedBlockingQueue,由連結串列組成的有界阻塞佇列,佇列的預設和最大長度為 Integer 最大值。

PriorityBlockingQueue支援優先順序無界阻塞佇列,預設情況下元素按照升序排序。可自定義 compareTo 方法指定排序規則,或者初始化時指定 Comparator 排序,不能保證同優先順序元素的順序。

DelayQueue,支援延時獲取元素無界阻塞佇列,使用優先順序佇列實現。建立元素時可以指定多久才能從佇列中獲取當前元素,只有延遲期滿時才能從佇列中獲取元素,適用於快取和定時排程。

SynchronousQueue不儲存元素的阻塞佇列,每一個 put 必須等待一個 take。預設使用非公平策略,也支援公平策略,適用於傳遞性場景,吞吐量高。

LinkedTransferQueue連結串列組成的無界阻塞佇列,相對於其他阻塞佇列多了 tryTransfer transfer 方法。

  • transfer方法:如果當前有消費者正等待接收元素,可以把生產者傳入的元素立刻傳輸給消費者,否則會將元素放在佇列的尾節點並等到該元素被消費者消費才返回。
  • tryTransfer 方法用來試探生產者傳入的元素能否直接傳給消費者,如果沒有消費者等待接收元素則返回 false,和 transfer 的區別是無論消費者是否消費都會立即返回。

LinkedBlockingDeque,連結串列組成的雙向阻塞佇列,可從佇列的兩端插入和移出元素,多執行緒同時入隊時減少了競爭。

實現原理

使用通知模式實現,生產者往滿的佇列裡新增元素時會阻塞,當消費者消費後,會通知生產者當前佇列可用。

當往佇列裡插入一個元素,如果佇列不可用,阻塞生產者主要通過 LockSupport 的 park 方法實現,不同作業系統中實現方式不同,

在 Linux 下使用的是系統方法 pthread_cond_wait 實現。