1. 程式人生 > 其它 >java執行緒池深入講解

java執行緒池深入講解

目錄

1 執行緒池介紹

1.1 執行緒池概念

SunJava5中,對Java執行緒的類庫做了大量的擴充套件,其中執行緒池就是Java5的新特徵之一,除了執行緒池之外,還有很多多執行緒相關的內容,為多執行緒的程式設計帶來了極大便利。為了編寫高效穩定可靠的多執行緒程式,執行緒部分的新增內容顯得尤為重要。

有關Java5執行緒新特徵的內容全部在java.util.concurrent下面,裡面包含數目眾多的介面和類,熟悉這部分API特徵是一項艱難的學習過程

執行緒池的基本思想還是一種物件池的思想,開闢一塊記憶體空間,裡面存放了眾多(未死亡)的執行緒,池中執行緒執行排程由池管理器來處理。當有執行緒任務時,從池中取一個,執行完成後執行緒物件歸池,這樣可以避免反覆建立執行緒物件所帶來的效能開銷,節省了系統的資源。

1.2 執行緒池好處

合理利用執行緒池能夠帶來三個好處
第一:降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗
第二:提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行
第三:提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。但是要做到合理的利用執行緒池,必須對其原理了如指掌。

2 執行緒池的使用

2.1 執行緒池的建立

2.1.1 通過ThreadPoolExecutor建立

我們可以通過ThreadPoolExecutor來建立一個執行緒池

new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime, milliseconds,
runnableTaskQueue, threadFactory,handler);

建立一個執行緒池需要輸入幾個引數:

  • corePoolSize(執行緒池的基本大小):當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。如果呼叫了執行緒池的prestartAllCoreThreads方法,執行緒池會提前建立並啟動所有基本執行緒。
  • runnableTaskQueue(任務佇列):用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列:
    ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按 FIFO(先進先出)原則對元素進行排序。
    LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列。
    SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列。
    PriorityBlockingQueue:一個具有優先順序得無限阻塞佇列。
  • maximumPoolSize(執行緒池最大大小):執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是如果使用了無界的任務佇列這個引數就沒什麼效果。
  • ThreadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字,Debug和定位問題時非常又幫助。
  • RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。以下是JDK1.5提供的四種策略:
    AbortPolicy:直接丟擲異常。
    CallerRunsPolicy:只用呼叫者所線上程來執行任務。
    DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
    DiscardPolicy:不處理,丟棄掉。
    當然也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化不能處理的任務。
  • keepAliveTime(執行緒活動保持時間):執行緒池的工作執行緒空閒後,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高執行緒的利用率。
  • TimeUnit(執行緒活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)

自定義連線池稍微麻煩些,不過通過建立的ThreadPoolExecutor執行緒池物件,可以獲取到當前執行緒池的尺寸、正在執行任務的執行緒數、工作佇列等等

2.1.2 通過Executors方式建立

Java通過Executors(jdk1.5併發包)提供四種執行緒池,分別為:

  • newCachedThreadPool:建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒
ExecutorService pool = Executors.newCachedThreadPool(); 
  • newFixedThreadPool:建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待
ExecutorService pool = Executors.newFixedThreadPool(2);
  • newScheduledThreadPool:建立一個定長執行緒池,支援定時及週期性任務執行
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2); 
將執行緒放入池中進行執行 
pool.execute(new MyThread()); 
使用延遲執行風格的方法 
pool.schedule(new MyThread(), 10, TimeUnit.MILLISECONDS); 
class MyThread extends Thread { 
        @Override 
        public void run() { 
                System.out.println(Thread.currentThread().getName() + "正在執行。。。"); 
        } 
}
  • newSingleThreadExecutor:建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行
ExecutorService pool = Executors.newSingleThreadExecutor(); 

2.2 執行緒池提交的返回值

2.2.1 無返回值

2.2.1.1 execute提交

可以使用execute提交的任務,但是execute方法沒有返回值,所以無法判斷任務是否被執行緒池執行成功。通過以下程式碼可知execute方法輸入的任務是一個Runnable類的例項

threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});

2.2.1.2 實現Runnable介面

無返回值的任務必須Runnable介面重寫run方法,方法的異常只能在內部消化,不能繼續上拋

2.2.2 有返回值

2.2.2.1 submit提交

我們也可以使用submit方法來提交任務,它會返回一個future,那麼我們可以通過這個future來判斷任務是否執行成功,通過futureget方法來獲取返回值, get方法會阻塞住直到任務完成 ,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間後立即返回,這時有可能任務沒有執行完。

try {
Object s = future.get();
} catch (InterruptedException e) {
// 處理中斷異常
} catch (ExecutionException e) {
// 處理無法執行任務異常
} finally {
// 關閉執行緒池
executor.shutdown();
}

2.2.2.2 實現Callable介面

返回值的任務必須實現Callable介面重寫call方法且方法允許丟擲異常
要想有返回值,那麼submit的組合:

  • submit(Callable<T> task)能獲取到它的返回值,通過future.get()獲取(阻塞直到任務執行完)。一般使用FutureTask+Callable配合使用
  • submit(Runnable task, T result)能通過傳入的載體result間接獲得執行緒的返回值。
  • submit(Runnable task)則是沒有返回值的,就算獲取它的返回值也是null

2.2.3 使用例子

public void testFuture() throws InterruptedException {
       ExecutorService executor = Executors.newCachedThreadPool();
       Task task = new Task();
       NewTask newTask = new NewTask();
       Future<Integer> result = executor.submit(task);
       Future<String> ends = executor.submit(newTask);
       executor.shutdown();
       System.out.println("主執行緒開始執行");
       System.out.println("主執行緒做一些複雜任務");
       Thread.sleep(10000);
       System.out.println("主執行緒需要子執行緒的計算結果");
       try {
               System.out.println("主執行緒得到子執行緒的結果:"+result.get());
               System.out.println("主執行緒需要第二個子執行緒的資料:"+ends.get());
        } catch (InterruptedException e) {
               e.printStackTrace();
        } catch (ExecutionException e) {
               e.printStackTrace();
        }
        System.out.println("所有均完畢");
}
class Task implements Callable<Integer>{
        public Integer call() throws Exception {
                System.out.println("子執行緒計算開始");
                Thread.sleep(3000);
                int sum = 0;
                for (int i=0;i<100000;i++){
                        sum += i ;
                }
                System.out.println("子執行緒已經計算完畢");
                return sum;
        }
}
class  NewTask implements  Callable<String>{
        public String call() throws Exception {
                System.out.println("第二個子執行緒已經執行完畢");
                return "success";
        }
}

2.3 執行緒池的關閉

我們可以通過呼叫執行緒池的shutdownshutdownNow方法來關閉執行緒池,但是它們的實現原理不同:
shutdown的原理是隻是將執行緒池的狀態設定成SHUTDOWN狀態,然後 中斷所有沒有正在執行任務的執行緒
shutdownNow的原理是遍歷執行緒池中的工作執行緒,然後逐個呼叫執行緒的interrupt方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法終止。shutdownNow會首先將執行緒池的狀態設定成STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表。
只要呼叫了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當所有的任務都已關閉後,才表示執行緒池關閉成功,這時呼叫isTerminaed方法會返回true。至於我們應該呼叫哪一種方法來關閉執行緒池,應該由提交到執行緒池的任務特性決定,通常呼叫shutdown來關閉執行緒池,如果任務不一定要執行完,則可以呼叫shutdownNow

3 執行緒池的分析

3.1 流程分析

Java執行緒池主要工作流程:
當提交一個新任務到執行緒池時,執行緒池的處理流程如下:

  • 首先執行緒池判斷基本執行緒池是否已滿?沒滿,建立一個工作執行緒來執行任務。滿了,則進入下個流程。
  • 其次執行緒池判斷工作佇列是否已滿?沒滿,則將新提交的任務儲存在工作佇列裡。滿了,則進入下個流程。
  • 最後執行緒池判斷整個執行緒池是否已滿?沒滿,則建立一個新的工作執行緒來執行任務,滿了,則交給飽和策略來處理這個任務。

另外:當執行緒池中超過corePoolSize執行緒,空閒時間達到keepAliveTime時,關閉空閒執行緒
當設定allowCoreThreadTimeOut(true)時,執行緒池中corePoolSize執行緒空閒時間達到keepAliveTime也將關閉

3.2 原始碼分析

上面的流程分析讓我們很直觀的瞭解的執行緒池的工作原理,讓我們再通過原始碼來看看是如何實現的。執行緒池執行任務的方法如下:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果執行緒數小於基本執行緒數,則建立執行緒並執行當前任務
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如執行緒數大於等於基本執行緒數或執行緒建立失敗,則將當前任務放到工作佇列中。
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
//如果執行緒池不處於執行中或任務無法放入佇列,並且當前執行緒數量小於最大允許的執行緒數量,則建立一個執行緒執行任務。
else if (!addIfUnderMaximumPoolSize(command))
//丟擲RejectedExecutionException異常
reject(command); // is shutdown or saturated
}
}

工作執行緒
執行緒池建立執行緒時,會將執行緒封裝成工作執行緒WorkerWorker在執行完任務後,還會無限迴圈獲取工作佇列裡的任務來執行。我們可以從Workerrun方法裡看到這點:

public void run() {
     try {
          Runnable task = firstTask;
          firstTask = null;
          while (task != null || (task = getTask()) != null) {
                   runTask(task);
                    task = null;
           }
      } finally {
             workerDone(this);
    }
}

4 合理的配置執行緒池

4.1 執行緒池分析

要想合理的配置執行緒池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:

  • 任務的性質CPU密集型任務,IO密集型任務和混合型任務。
  • 任務的優先順序:高,中和低。
  • 任務的執行時間:長,中和短。
  • 任務的依賴性:是否依賴其他系統資源,如資料庫連線。

任務性質不同的任務可以用不同規模的執行緒池分開處理。CPU密集型任務配置儘可能少的執行緒數量,如配置Ncpu+1個執行緒的執行緒池。IO密集型任務則由於需要等待IO操作,執行緒並不是一直在執行任務,則配置儘可能多的執行緒,如2*Ncpu。混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於序列執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前裝置的CPU個數。

優先順序不同的任務可以使用優先順序佇列PriorityBlockingQueue來處理。它可以讓優先順序高的任務先得到執行,需要注意的是如果一直有優先順序高的任務提交到佇列裡,那麼優先順序低的任務可能永遠不能執行。

執行時間不同的任務可以交給不同規模的執行緒池來處理,或者也可以使用優先順序佇列,讓執行時間短的任務先執行。

依賴資料庫連線池的任務,因為執行緒提交SQL後需要等待資料庫返回結果,如果等待的時間越長CPU空閒時間就越長,那麼執行緒數應該設定越大,這樣才能更好的利用CPU

4.2 有界佇列

建議使用有界佇列,有界佇列能增加系統的穩定性和預警能力,可以根據需要設大一點,比如幾千。有一次使用的後臺任務執行緒池的佇列和執行緒池全滿了,不斷的丟擲拋棄任務的異常,通過排查發現是資料庫出現了問題,導致執行SQL變得非常緩慢,因為後臺任務執行緒池裡的任務全是需要向資料庫查詢和插入資料的,所以導致執行緒池裡的工作執行緒全部阻塞住,任務積壓線上程池裡。如果當時我們設定成無界佇列,執行緒池的佇列就會越來越多,有可能會撐滿記憶體,導致整個系統不可用,而不只是後臺任務出現問題

5 執行緒池的監控

5.1 通過執行緒池提供引數監控

執行緒池裡有一些屬性在監控執行緒池的時候可以使用:

  • taskCount:執行緒池需要執行的任務數量。
  • completedTaskCount:執行緒池在執行過程中已完成的任務數量。小於或等於taskCount
  • largestPoolSize:執行緒池曾經建立過的最大執行緒數量。通過這個資料可以知道執行緒池是否滿過。如等於執行緒池的最大大小,則表示執行緒池曾經滿了。
  • getPoolSize:執行緒池的執行緒數量。如果執行緒池不銷燬的話,池裡的執行緒不會自動銷燬,所以這個大小隻增不減。
  • getActiveCount:獲取活動的執行緒數。

5.2 通過擴充套件執行緒池進行監控

通過繼承執行緒池並重寫執行緒池的beforeExecuteafterExecuteterminated方法,我們可以在任務執行前執行後執行緒池關閉前幹一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法線上程池裡是空方法。如:

protected void beforeExecute(Thread t, Runnable r) { }

附:Spring執行緒池ThreadPoolTaskExecutor的使用