1. 程式人生 > 其它 >9.java中的執行緒池

9.java中的執行緒池

Java中的執行緒池是運用場景最多的併發框架,幾乎所有需要非同步或併發執行任務的程式都可以使用執行緒池。在開發過程中,合理地使用執行緒池能夠帶來3個好處。

第一:降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。

第二:提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。

第三:提高執行緒的可管理性。執行緒是稀缺資源,如果無限制地建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一分配、調優和監控。但是,要做到合理利用執行緒池,必須對其實現原理了如指掌。

一、執行緒池的實現原理

當向執行緒池提交一個任務之後,執行緒池是如何處理這個任務的呢?本節來看一下執行緒池的主要處理流程,處理流程圖如圖9-1所示。

從圖中可以看出,當提交一個新任務到執行緒池時,執行緒池的處理流程如下。

  • 1)執行緒池判斷核心執行緒池裡的執行緒是否都在執行任務。如果不是,則建立一個新的工作執行緒來執行任務。如果核心執行緒池裡的執行緒都在執行任務,則進入下個流程。

  • 2)執行緒池判斷工作(阻塞)佇列是否已經滿。如果工作佇列沒有滿,則將新提交的任務儲存在這個工作佇列裡。如果工作佇列滿了,則進入下個流程。

  • 3)執行緒池判斷執行緒池的處於工作的執行緒是否達到了Max引數。如果沒有,則建立一個新的工作執行緒來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。

ThreadPoolExecutor執行execute()方法的示意圖,如圖9-2所示

ThreadPoolExecutor執行execute方法分下面4種情況。

  • 1)如果當前執行的執行緒少於corePoolSize,則建立新執行緒來執行任務(注意,執行這一步驟需要獲取全域性鎖)。

  • 2)如果執行的執行緒等於或多於corePoolSize,則將任務加入BlockingQueue。

  • 3)如果無法將任務加入BlockingQueue(佇列已滿),則建立新的執行緒來處理任務(注意,執行這一步驟需要獲取全域性鎖)。

  • 4)如果建立新執行緒將使當前執行的執行緒超出maximumPoolSize,任務將被拒絕,並呼叫RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor採取上述步驟的總體設計思路,是為了在執行execute()方法時,儘可能地避免獲取全域性鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之後(當前執行的執行緒數大於等於corePoolSize),幾乎所有的execute()方法呼叫都是執行步驟2,而步驟2不需要獲取全域性鎖。

原始碼分析:上面的流程分析讓我們很直觀地瞭解了執行緒池的工作原理,讓我們再通過原始碼來看看是如何實現的,執行緒池執行任務的方法如下。

public void execute(Runnable command) {

	if (command == null)

		throw new NullPointerException();

	// 如果執行緒數小於基本執行緒數,則建立執行緒並執行當前任務

	if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

		// 如執行緒數大於等於基本執行緒數或執行緒建立失敗,則將當前任務放到工作佇列中。

		if (runState == RUNNING && workQueue.offer(command)) {

			if (runState != RUNNING || poolSize == 0)

				ensureQueuedTaskHandled(command);

		}

		// 如果執行緒池不處於執行中或任務無法放入佇列,並且當前執行緒數量小於最大允許的執行緒數量,

		// 則建立一個執行緒執行任務。

		else if (!addIfUnderMaximumPoolSize(command))

		// 丟擲RejectedExecutionException異常

			reject(command); // is shutdown or saturated

	}

}

工作執行緒:執行緒池建立執行緒時,會將執行緒封裝成工作執行緒Worker,Worker在執行完任務後,還會迴圈獲取工作佇列裡的任務來執行。我們可以從Worker類的run()方法裡看到這點

public void run() {

	try {

		Runnable task = firstTask;

		firstTask = null;

		while (task != null || (task = getTask()) != null) {

			runTask(task);task = null;

		}

	} finally {

		workerDone(this);

	}

}

ThreadPoolExecutor中執行緒執行任務的示意圖如圖9-3所示。

執行緒池中的執行緒執行任務分兩種情況,如下。

  • 1)在execute()方法中建立一個執行緒時,會讓這個執行緒執行當前任務。

  • 2)這個執行緒執行完上圖中1的任務後,會反覆從BlockingQueue獲取任務來執行。

二、執行緒池的使用

2.1、執行緒池的建立

我們可以通過ThreadPoolExecutor來建立一個執行緒池。

public ThreadPoolExecutor(int corePoolSize,  //核心執行緒池大小
                          int maximumPoolSize,  //最大執行緒池大小
                          long keepAliveTime,  //超時了沒有人呼叫就會釋放
                          TimeUnit unit,  //超時單位
                          BlockingQueue<Runnable> workQueue,  //阻塞佇列
                          ThreadFactory threadFactory,  //執行緒工廠
                          RejectedExecutionHandler handler//拒絕策略
						  ) {  
    if (corePoolSize < 0 ||  
        maximumPoolSize <= 0 ||  
        maximumPoolSize < corePoolSize ||  
        keepAliveTime < 0)  
        throw new IllegalArgumentException();  
    if (workQueue == null || threadFactory == null || handler == null)  
        throw new NullPointerException();  
    this.acc = System.getSecurityManager() == null ?  
            null :  
            AccessController.getContext();  
    this.corePoolSize = corePoolSize;  
    this.maximumPoolSize = maximumPoolSize;  
    this.workQueue = workQueue;  
    this.keepAliveTime = unit.toNanos(keepAliveTime);  
    this.threadFactory = threadFactory;  
    this.handler = handler;  
}

5)RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。在JDK 1.5中Java執行緒池框架提供了以下4種策略。

  • ·AbortPolicy:直接丟擲異常。

  • ·CallerRunsPolicy:只用呼叫者所線上程來執行任務。

  • ·DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。

  • ·DiscardPolicy:不處理,丟棄掉。

2.2、向執行緒池提交任務

可以使用兩個方法向執行緒池提交任務,分別為execute()和submit()方法

execute()方法用於提交不需要返回值的任務,所以無法判斷任務是否被執行緒池執行成功。通過以下程式碼可知execute()方法輸入的任務是一個Runnable類的例項。

threadsPool.execute(new Runnable() {

	@Override

	public void run() {

		// TODO Auto-generated method stub

	}

});

submit()方法用於提交需要返回值的任務。執行緒池會返回一個future型別的物件,通過這個future物件可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前執行緒直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前執行緒一段時間後立即返回,這時候有可能任務沒有執行完。

Future<Object> future = executor.submit(harReturnValuetask);

try {

	Object s = future.get();

} catch (InterruptedException e) {

	// 處理中斷異常

} catch (ExecutionException e) {

	// 處理無法執行任務異常

} finally {

	// 關閉執行緒池

	executor.shutdown();

}

2.3、關閉執行緒池

可以通過呼叫執行緒池的shutdown或shutdownNow方法來關閉執行緒池。它們的原理是遍歷執行緒池中的工作執行緒,然後逐個呼叫執行緒的interrupt方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法終止。但是它們存在一定的區別,shutdownNow首先將執行緒池的狀態設定成STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表,而shutdown只是將執行緒池的狀態設定成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的執行緒

只要呼叫了這兩個關閉方法中的任意一個,isShutdown方法就會返回true。當所有的任務都已關閉後,才表示執行緒池關閉成功,這時呼叫isTerminaed方法會返回true。至於應該呼叫哪一種方法來關閉執行緒池,應該由提交到執行緒池的任務特性決定,通常呼叫shutdown方法來關閉執行緒池,如果任務不一定要執行完,則可以呼叫shutdownNow方法

2.4、合理地配置執行緒池

要想合理地配置執行緒池,就必須首先分析任務特性,可以從以下幾個角度來分析。

  • 任務的性質:CPU密集型任務(CPU數量)、IO密集型任務(CPU數量*2)和混合型任務(CPU數量)

  • 任務的優先順序:高、中和低。

  • 任務的執行時間:長、中和短。

  • 任務的依賴性:是否依賴其他系統資源,如資料庫連線。

性質不同的任務可以用不同規模的執行緒池分開處理。CPU密集型任務應配置儘可能小的執行緒,如配置Ncpu+1個執行緒的執行緒池。由於IO密集型任務執行緒並不是一直在執行任務,則應配置儘可能多的執行緒,如2*Ncpu。混合型的任務,如果可以拆分,將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量將高於序列執行的吞吐量。如果這兩個任務執行時間相差太大,則沒必要進行分解。可以通過Runtime.getRuntime().availableProcessors()方法獲得當前裝置的CPU個數。

優先順序不同的任務可以使用優先順序佇列PriorityBlockingQueue來處理。它可以讓優先順序高的任務先執行。

注意 如果一直有優先順序高的任務提交到佇列裡,那麼優先順序低的任務可能永遠不能執行。

執行時間不同的任務可以交給不同規模的執行緒池來處理,或者可以使用優先順序佇列,讓執行時間短的任務先執行。

依賴資料庫連線池的任務,因為執行緒提交SQL後需要等待資料庫返回結果,等待的時間越長,則CPU空閒時間就越長,那麼執行緒數應該設定得越大,這樣才能更好地利用CPU。建議使用有界佇列。有界佇列能增加系統的穩定性和預警能力,可以根據需要設大一點兒,比如幾千。有一次,我們系統裡後臺任務執行緒池的佇列和執行緒池全滿了,不斷丟擲拋棄任務的異常,通過排查發現是資料庫出現了問題,導致執行SQL變得非常緩慢,因為後臺任務執行緒池裡的任務全是需要向資料庫查詢和插入資料的,所以導致執行緒池裡的工作執行緒全部阻塞,任務積壓線上程池裡。如果當時我們設定成無界佇列,那麼執行緒池的佇列就會越來越多,有可能會撐滿記憶體,導致整個系統不可用,而不只是後臺任務出現問題。當然,我們的系統所有的任務是用單獨的伺服器部署的,我們使用不同規模的執行緒池完成不同型別的任務,但是出現這樣問題時也會影響到其他任務。

2.5、執行緒池的監控

如果在系統中大量使用執行緒池,則有必要對執行緒池進行監控,方便在出現問題時,可以根據執行緒池的使用狀況快速定位問題。可以通過執行緒池提供的引數進行監控,在監控執行緒池的時候可以使用以下屬性。

  • ·taskCount:執行緒池需要執行的任務數量。

  • ·completedTaskCount:執行緒池在執行過程中已完成的任務數量,小於或等於taskCount。

  • ·largestPoolSize:執行緒池裡曾經建立過的最大執行緒數量。通過這個資料可以知道執行緒池是否曾經滿過。如該數值等於執行緒池的最大大小,則表示執行緒池曾經滿過。

  • ·getPoolSize:執行緒池的執行緒數量。如果執行緒池不銷燬的話,執行緒池裡的執行緒不會自動銷燬,所以這個大小隻增不減。

  • ·getActiveCount:獲取活動的執行緒數。
    通過擴充套件執行緒池進行監控。可以通過繼承執行緒池來自定義執行緒池,重寫執行緒池的

beforeExecute、afterExecute和terminated方法,也可以在任務執行前、執行後和執行緒池關閉前執行一些程式碼來進行監控(類似於前置通知那一套)。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。這幾個方法線上程池裡是空方法。

protected void beforeExecute(Thread t, Runnable r) { }

三、本章小結

在工作中我經常發現,很多人因為不瞭解執行緒池的實現原理,把執行緒池配置錯誤,從而導致了各種問題。本章介紹了為什麼要使用執行緒池、如何使用執行緒池和執行緒池的使用原理,相信閱讀完本章之後,讀者能更準確、更有效地使用執行緒池