1. 程式人生 > 實用技巧 >03 執行緒池:業務程式碼最常用也最容易犯錯誤的元件

03 執行緒池:業務程式碼最常用也最容易犯錯誤的元件

  1. 執行緒池的宣告需要手動進行new ThreadPoolExecutor形式。生產中常因為使用Executors類下方法,如newFixedThreadPool和newCachedThreadPool導致OOM問題。

  • newFixedThreadPool中雖然執行緒數可以限制,但任務數量是無限的,因為使用的是LinkendBlockingQueue無限佇列來儲存任務,任務不能及時處理,過多堆積,最終OOM;

  • newCachedThreadPool執行緒數是無上界的,而工作佇列是SynchronousQueue沒有儲存空間的阻塞佇列,即有請求到來就建立一個執行緒處理任務(執行緒是需要儲存空間的),最終OOM無法建立執行緒;

  1. 手動建立執行緒的原因:

  • 根據自己的場景、併發情況來評估執行緒池的幾個核心引數,如核心執行緒數、最大執行緒數、執行緒回收策略、工作佇列型別以及拒絕策略,確保執行緒池工作行為符合要求,一般都需要設定有界工作佇列和可控執行緒數

  • 任何時候,都應該為自定義執行緒池指定有意義的名稱,以方便排查問題。當出現執行緒數暴增、執行緒死鎖、執行緒佔用大量CPU、執行緒執行出現異常等問題時,往往會抓取執行緒棧。此時有意義執行緒名稱,有助於定位問題。

  • 要用一些監控手段來觀察執行緒池狀態。

  1. 執行緒池預設工作行為如下:

  • 不會初始化corePoolSize個執行緒,有任務來了才會建立工作執行緒,即使核心執行緒有空閒,在沒有到核心執行緒數前,對新任務建立新執行緒;

  • 當核心執行緒滿了之後不會立即擴容執行緒池,而是把任務堆積到工作佇列中;

  • 當工作佇列滿了後擴容執行緒池,一直到執行緒個數達到maximumPoolSize為止;

  • 如果佇列已滿且達到了最大執行緒後還有任務進來,按照拒絕策略處理;

  • 當執行緒數大於核心執行緒數時,執行緒等待keepAliveTime後沒有任務要處理的話,收縮執行緒到核心執行緒數;

  1. 通過如下手段改變執行緒池預設工作行為:

  • 宣告執行緒池後立即呼叫prestartAllCoreThreads方法,來啟動所有核心執行緒;

  • 傳入true給allowCoreThreadTimeOut方法,來讓執行緒池在空閒的時候同樣回收核心執行緒。

  1. 瞭解了執行緒池工作行為,如何設計一個不等任務佇列滿,而優先建立執行緒的執行緒池呢,如下兩步:

  • 由於執行緒池在佇列滿,即無法加入佇列情況下才擴容執行緒池,那麼就可以重寫佇列offer方法,造成佇列已滿的假象;offer方法是往隊尾插入任務,

  • 佇列已滿後,同時達到最大執行緒數,那麼會觸發執行緒池拒絕策略。我們可以自定義拒絕策略,當佇列"已滿"(假象)後,再真正把任務加入到佇列中,通過執行緒池獲取到佇列,再通過put加入任務。

  • 參考stackoverflowtomcat

  • 程式碼案例

  static BlockingQueue queue = new LinkedBlockingQueue(10) {
@Override
public boolean offer(Object o) {
if (size() == 0) {
return super.offer(o);
} else {
return false;
}
}
};
static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 5,
60, TimeUnit.SECONDS, queue, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
if (executor.isShutdown()) {
throw new RejectedExecutionException(
"Task " + r + " rejected from ");
}
} catch (Exception ex) {
ex.printStackTrace();
Thread.currentThread().interrupt();
return;
}
}
});
public static void main(String[] args) throws Exception{
IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
threadPoolExecutor.execute(() -> {
System.out.println(i);
});
});
}
  1. 執行緒池是用來複用的,一個系統根據業務類別用到幾個執行緒池還好,但是要注意使用的時候不要根據請求或其他方法呼叫來建立執行緒池,避免造成建立太多執行緒池。一般就是系統初始化過程,執行緒池已經建立好,直接使用。

  2. 注意執行緒池不要混用,根據程式功能不同,不同策略要配置不同引數的執行緒池:

  • 對於執行慢、數量不大的IO任務,或許要考慮更多的執行緒數,而不需要太大的佇列;

  • 對於吞吐量較大的計算型任務,執行緒數量不宜過多,可以是CPU核數或核數*2(過多執行緒造成的執行緒切換開銷,並不能提高吞吐量),但可能需要較長的佇列來做緩衝。

  1. Linux系統中可以使用wrk壓測工具,這款工具可以呼叫lua指令碼,還可以統計吞吐量、延遲、每次請求資料大小等

  2. Java8中的parallel stream功能,如IntStream.rangeClosed,可以很方便的並行處理集合中元素,其背後是共享同一個ForkJoinPool,預設並行度是CPU核數-1。對於CPU繫結的任務使用這樣的配置比較合適。 如果集合操作設計同步IO操作(資料庫操作、外部服務呼叫等)的話,建議自定義一個ForkJoinPool或普通執行緒池。

  public static void main(String[] args) throws Exception{
ForkJoinPool forkJoinPool=new ForkJoinPool(3);
//注意parallel()方法是將操作分成多個任務,多個任務一起完成這個迴圈
//如果不使用這個方法,execute中的引數就是一個任務裡面做了10次迴圈
forkJoinPool.execute(()-> IntStream.rangeClosed(1,10).parallel().forEach(i->{
System.out.println(Thread.currentThread().getName());
System.out.println(i);
}));
forkJoinPool.shutdown();
//下面方法是等待在執行shutdown之後,所有任務執行完就終止,否則就等待指定時間後終止
forkJoinPool.awaitTermination(10, TimeUnit.SECONDS);
}