1. 程式人生 > 程式設計 >詳細分析JAVA 執行緒池

詳細分析JAVA 執行緒池

系統啟動一個新執行緒的成本是比較高的,因為它涉及與作業系統互動。在這種情形下,使用執行緒池可以很好地提高效能,尤其是當程式中需要建立大量生存期很短暫的執行緒時,更應該考慮使用執行緒池。

與資料庫連線池類似的是,執行緒池在系統啟動時即建立大量空閒的執行緒,程式將一個 Runnable 物件或 Callable 物件傳給執行緒池,執行緒池就會啟動一個執行緒來執行它們的 run() 或 call() 方法,當 run() 或 call() 方法執行結束後,該執行緒並不會死亡,而是再次返回執行緒池成為空閒狀態,等待執行下一個 Runnable 物件的 run() 或 call() 方法。

除此之外,使用執行緒池可以有效地控制系統中併發執行緒的數量,當系統中包含大量併發執行緒時,會導致系統性能劇烈下降,甚至導致 JVM 崩潰,而執行緒池的最大執行緒數引數可以控制系統中併發執行緒數不超過此數。

Java 8 改進的執行緒池

在 Java 5 以前,開發者必須手動實現自己的執行緒池;從 Java 5 開始, Java 內建支援執行緒池。 Java 5 新增了一個 Executors 工廠類來產生執行緒池,該工廠類包含如下幾個靜態工廠方法來建立執行緒池。

  • newCachedThreadPool():建立一個具有快取功能的執行緒池,系統根據需要建立執行緒,這些執行緒將會被快取線上程池中。
  • newFixedThreadPool(int nThreads):建立一個可重用的、具有固定執行緒數的執行緒池。
  • newSingleThreadExecutor():建立一個只有單執行緒的執行緒池,它相當於呼叫 newFixedThreadPool() 方法時傳入引數為1。
  • newScheduledThreadPool(int corePoolSize):建立具有指定執行緒數的執行緒池,它可以在指定延遲後執行執行緒任務。 corePoolSize 指池中所儲存的執行緒數,即使執行緒是空閒的也被儲存線上程池內。
  • newSingleThreadScheduledExecutor():建立只有一個執行緒的執行緒池,它可以在指定延遲後執行執行緒任務。
  • ExecutorService newWorkStealingPool(int parallelism):建立持有足夠的執行緒的執行緒池來支援給定的並行級別,該方法還會使用多個佇列來減少競爭。
  • ExecutorService newWorkStealingPool():該方法是前一個方法的簡化版本。如果當前機器有 4 個CPU,則目標並行級別被設定為 4,也就是相當於為前一個方法傳入 4 作為引數。

上面7個方法中的前三個方法返回一個 ExecutorService 物件,該物件代表一個執行緒池,它可以執行 Runnable 物件或 Callable 物件所代表的執行緒;而中間兩個方法返回一個 ScheduledExecutorService 執行緒池,它是 ExecutorService 的子類,它可以在指定延遲後執行執行緒任務;最後兩個方法則是 Java 8 新增的,這兩個方法可充分利用多 CPU 並行的能力。這兩個方法生成的 work stealing 池,都相當於後臺執行緒池,如果所有的前臺執行緒都死亡了,work stealing 池中的執行緒會自動死亡。

由於目前計算機硬體的發展日新月異,即使普通使用者使用的電腦通常也都是多核 CPU,因此 Java 8 線上程支援上也增加了利用多 CPU 並行的能力,這樣可以更好地發揮底層硬體的效能。

ExecutorService 代表儘快執行執行緒的執行緒池(只要執行緒池中有空閒執行緒,就立即執行執行緒任務),程式只要將一個 Runnable 物件或 Callable 物件(代表執行緒任務)提交給該執行緒池,該執行緒池就會盡快執行該任務。 ExecutorService 裡提供瞭如下三個方法。

  • Future <?> submit(Runnable task):將一個 Runnable 物件提交給指定的執行緒池,執行緒池將在有空閒執行緒時執行 Runnable 物件代表的任務。其中 Future 物件代表 Runnable 任務的返回值,但 run() 方法沒有返回值,所以 Future 物件將在 run() 方法執行結束後返回 null 。但可以呼叫 Future 的 isDone()、 isCancelled() 方法來獲得 Runnable 物件的執行狀態。
  • <T> Future <T> submit(Runnable task,T result):將一個 Runnable 物件提交給指定的執行緒池,執行緒池將在有空閒執行緒時執行 Runnable 物件代表的任務。其中 result 顯式指定執行緒執行結束後的返回值,所以 Future 物件將在 run() 方法執行結束後返回 result 。
  • <T> Future <T> submit(Callable <T> task):將一個 Callable 物件提交給指定的執行緒池,執行緒池將在有空閒執行緒時執行 Callable 物件代表的任務。其中 Future 代表 Callable 物件裡 call() 方法的返回值。

ScheduledExecutorService 代表可在指定延遲後或週期性地執行執行緒任務的執行緒池,它提供瞭如下4個方法。

  • ScheduledFuture<V> schedule (Callable<V> callable,long delay,TimeUnit unit):指定 callable 任務將在 delay 延遲後執行。
  • ScheduledFuture<?> schedule(Runnable command,TimeUnit unit):指定 command 任務將在 delay 延遲後執行。
  • ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) : 指定 command 任務將在 delay 延遲後執行,而且以設定頻率重複執行。也就是說,在 initialDelay 後開始執行,依次在 initialDelay + period 、 initialDelay +2* period …處重複執行,依此類推。
  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,TimeUnit unit):建立並執行一個在給定初始延遲後首次啟用的定期操作,隨後在每一次執行終止和下一次執行開始之間都存在給定的延遲。如果任務在任一次執行時遇到異常,就會取消後續執行;否則,只能通過程式來顯式取消或終止該任務。

用完一個執行緒池後,應該呼叫該執行緒池的 shutdown() 方法,該方法將啟動執行緒池的關閉序列,呼叫 shutdown() 方法後的執行緒池不再接收新任務,但會將以前所有已提交任務執行完成。當執行緒池中的所有任務都執行完成後,池中的所有執行緒都會死亡;另外也可以呼叫執行緒池的 shutdownNow() 方法來關閉執行緒池,該方法試圖停止所有正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行的任務列表。

使用執行緒池來執行執行緒任務的步驟如下。

  • 呼叫 Executors 類的靜態工廠方法建立一個 ExecutorService 物件,該物件代表一個執行緒池。
  • 建立 Runnable 實現類或 Callable 實現類的例項,作為執行緒執行任務。
  • 呼叫 ExecutorService 物件的 submit() 方法來提交 Runnable 例項或 Callable 例項。
  • 當不想提交任何任務時,呼叫 ExecutorService 物件的 shutdown() 方法來關閉執行緒池。

下面程式使用執行緒池來執行指定 Runnable 物件所代表的任務。

//實現Runnable介面來定義一個簡單的
class TestThread implements Runnable{
 public void run(){
  for (int i = 0; i < 100 ; i++ ){
   System.out.println(Thread.currentThread().getName()
    + "的i值為:" + i);
  }
 }
}

public class ThreadPoolTest{
 public static void main(String[] args) {
  //建立一個具有固定執行緒數(6)的執行緒池
  ExecutorService pool = Executors.newFixedThreadPool(6);
  //向執行緒池中提交2個執行緒
  pool.submit(new TestThread());
  pool.submit(new TestThread());
  //關閉執行緒池
  pool.shutdown();
 }
}

上面程式中建立 Runnable 實現類與最開始建立執行緒池並沒有太大差別,建立了 Runnable 實現類之後程式沒有直接建立執行緒、啟動執行緒來執行該 Runnable 任務,而是通過執行緒池來執行該任務,使用執行緒池來執行 Runnable 任務的程式碼如程式中粗體字程式碼所示。執行上面程式,將看到兩個執行緒交替執行的效果,如下圖所示。

詳細分析JAVA 執行緒池

Java 8 增強的 ForkJoinPool

現在計算機大多已向多 CPU 方向發展,即使普通 PC ,甚至小型智慧裝置(如手機)、多核處理器也已被廣泛應用。在未來的日子裡,處理器的核心數將會發展到更多。

雖然硬體上的多核 CPU 已經十分成熟,但很多應用程式並未為這種多核 CPU 做好準備,因此並不能很好地利用多核 CPU 的效能優勢。

為了充分利用多 CPU 、多核 CPU 的效能優勢,計算機軟體系統應該可以充分“挖掘”每個 CPU 的計算能力,絕不能讓某個 CPU 處於“空閒”狀態。為了充分利用多 CPU 、多核 CPU 的優勢,可以考慮把一個任務拆分成多個“小任務”,把多個“小任務”放到多個處理器核心上並行執行;當多個“小任務”執行完成之後,再將這些執行結果合併起來即可。

Java 7 提供了 ForkJoinPool 來支援將一個任務拆分成多個“小任務”平行計算,再把多個“小任務”的結果合併成總的計算結果。 ForkJoinPool 是 ExecutorService 的實現類,因此是一種特殊的執行緒池。ForkJoinPool 提供瞭如下兩個常用的構造器。

  • ForkJoinPool(int parallelism):建立一個包含 parallelism 個並行執行緒的 ForkJoinPool 。
  • ForkJoinPool():以 Runtime.availableProcessors() 方法的返回值作為 parallelism 引數來建立 ForkJoinPool

Java 8 進一步擴充套件了 ForkJoinPool 的功能 ,Java 8 為 ForkJoinPool 增加了通用池功能。 ForkJoinPool 類通過如下兩個靜態方法提供通用池功能。

  • ForkJoinPool commonPool():該方法返回一個通用池,通用池的執行狀態不會受 shutdown() 或 shutdownNow() 方法的影響。當然,如果程式直接執行 System.exit(0); 來終止虛擬機器,通用池以及通用池中正在執行的任務都會被自動終止。
  • int getCommonPoolParallelism():該方法返回通用池的並行級別。

建立了 ForkJoinPool 例項之後,就可呼叫 ForkJoinPool 的 submit(ForkJoinTask task) 或 invoke(ForkJoinTask task) 方法來執行指定任務了。其中 ForkJoinTask 代表一個可以並行、合併的任務。ForkJoinTask 是一個抽象類,它還有兩個抽象子類 : RecursiveAction 和 RecursiveTask 。其中 RecursiveTask 代表有返回值的任務,而 RecursiveAction 代表沒有返回值的任務。

下面以執行沒有返回值的“大任務”(簡單地列印0〜300的數值)為例,程式將一個“大任務”拆分成多個“小任務”,並將任務交給 ForkJoinPool 來執行。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

class PrintTask extends RecursiveAction{
 // 每個“小任務”最多隻列印50個數
 private static final int THRESHOLD = 50;
 private int start;
 private int end;
 
 // 列印從 start 到 end 的任務
 public PrintTask(int start,int end) {
  this.start = start;
  this.end = end;
 }
 
 @Override
 protected void compute() {
  // 當 end 與 start 之間的差小於 THRESHOLD 時,開始列印
  if(end-start<THRESHOLD) {
   for(int i=start;i<end;i++) {
    System.out.println(Thread.currentThread().getName()+"的 i 值:"+i);
   }
  }else {
   // 當 end 與 start 之間的差大於 THRESHOLD 時,即要列印的數超過50個時
   // 將大任務分解成兩個“小任務”
   int middle = (start+end)/2;
   PrintTask left = new PrintTask(start,middle);
   PrintTask right = new PrintTask(middle,end);
   // 並行執行兩個“小任務”
   left.fork();
   right.fork();
  }
 }
}

public class ForkJoinPoolTest {
 public static void main(String[] args) throws Exception {
  ForkJoinPool pool = new ForkJoinPool();
  // 提交可分解的 PrintTask 任務
  pool.submit(new PrintTask(0,300));
  pool.awaitTermination(2,TimeUnit.SECONDS);
  // 關閉執行緒池
  pool.shutdown();
 }
}

上面程式中的粗體字程式碼實現了對指定列印任務的分解,分解後的任務分別呼叫 fork() 方法開始並行執行。執行上面程式,可以看到如下圖所示的結果。

詳細分析JAVA 執行緒池

從如上圖所示的執行結果來看, ForkJoinPool 啟動了 4個執行緒來執行這個列印任務——這是因為測試計算機的 CPU 是4核的。不僅如此,讀者可以看到程式雖然列印了 0〜299這300個數字,但並不是連續列印的,這是因為程式將這個列印任務進行了分解,分解後的任務會並行執行,所以不會按順序從0列印到299。

上面定義的任務是一個沒有返回值的列印任務,如果大任務是有返回值的任務,則可以讓任務繼承 RecursiveTask<T>,其中泛型引數 T 就代表了該任務的返回值型別。下面程式示範了使用 RecursiveTask 對一個長度為100的陣列的元素值進行累加。

package com.jwen.demo4;

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.function.Function;

class CalTask extends RecursiveTask<Integer>{

 
 // 每個“小任務”最多隻累加20個數
 private static final int THRESHOLD = 20;
 private int arr[];
 private int start;
 private int end;
 // 累加從 start 到 end 的陣列元素
 public CalTask(int[] arr,int start,int end) {
  this.arr = arr;
  this.start = start;
  this.end = end;
 }
 
 @Override
 protected Integer compute() {
  int sum = 0;
  // 當 end 與 start 之間的差小於 THRESHOLD 時,開始進行實際累加
  if(end-start<THRESHOLD) {
   for(int i=start;i<end;i++) {
    sum+=arr[i];
   }
   return sum;
  }else {
   // 當 end 與 start 之間的差大於 THRESHOLD 時,即要累加的數超過20個時
   // 將大任務分解成兩個“小任務”
   int middle = (start+end)/2;
   CalTask left = new CalTask(arr,start,middle);
   CalTask right = new CalTask(arr,middle,end);
   // 並行執行兩個“小任務”
   left.fork();
   right.fork();
   // 把兩個“小任務”累加的結果合併起來
   return left.join()+right.join(); // ①
  }
 }
}

public class Sum {
 public static void main(String[] args) throws Exception{
  int[] arr = new int[100];
  Random rand = new Random();
  int total = 0;
  // 初始化100個數字元素
  for(int i=0,len = arr.length;i<len;i++) {
   int tmp = rand.nextInt(20);
   // 對陣列元素賦值,並將陣列元素的值新增到 sum 總和中
   total +=(arr[i]=tmp);
  }
  System.out.println(total);
  // 建立一個通用池
  ForkJoinPool pool = ForkJoinPool.commonPool();
  // 提交可分解的 CalTask 任務
  Future<Integer> future = pool.submit(new CalTask(arr,arr.length));
  System.out.println(future.get());
  // 關閉執行緒池
  pool.shutdown();
 }
}

上面程式與前一個程式基本相似,同樣是將任務進行了分解,並呼叫分解後的任務的 fork() 方法使它們並行執行。與前一個程式不同的是,現在任務是帶返回值的,因此程式還在①號程式碼處將兩個分解後的“小任務”的返回值進行了合併。

執行上面程式,將可以看到程式通過 CalTask 計算出來的總和,與初始化陣列元素時統計出來的總和總是相等,這表明程式一切正常。

Java 的確是一門非常優秀的程式語言,在多 CPU、多核 CPU 時代來到時,Java 語言的多執行緒已經為多核 CPU 做好了準備。

以上就是詳細分析JAVA 執行緒池的詳細內容,更多關於JAVA 執行緒池的資料請關注我們其它相關文章!