理解執行緒池的原理
1.關於執行緒池
執行緒池的技術背景
在面向物件程式設計中,建立和銷燬物件是很費時間的,因為建立一個物件要獲取記憶體資源或者其它更多資源。在Java中更是如此,虛擬機器將試圖跟蹤每一個物件,以便能夠在物件銷燬後進行垃圾回收。
所以提高服務程式效率的一個手段就是儘可能減少建立和銷燬物件的次數,特別是一些很耗資源的物件建立和銷燬。如何利用已有物件來服務就是一個需要解決的關鍵問題,其實這就是一些”池化資源”技術產生的原因。
例如Android中常見到的很多通用元件一般都離不開”池”的概念,如各種圖片載入庫,網路請求庫,即使Android的訊息傳遞機制中的Meaasge當使用Meaasge.obtain()就是使用的Meaasge池中的物件,因此這個概念很重要。本文將介紹的執行緒池技術同樣符合這一思想。
執行緒池的優點:
- 重用執行緒池中的執行緒,減少因物件建立,銷燬所帶來的效能開銷;
- 能有效的控制執行緒的最大併發數,提高系統資源利用率,同時避免過多的資源競爭,避免堵塞;
- 能夠多執行緒進行簡單的管理,使執行緒的使用簡單、高效。
java中的執行緒池是通過Executor框架實現的,Executor 框架包括類:Executor,Executors,ExecutorService,ThreadPoolExecutor ,Callable和Future、FutureTask的使用等。
Executor: 所有執行緒池的介面,只有一個方法。
public interface Executor {
void execute(Runnable command);
}
ExecutorService: 增加Executor的行為,是Executor實現類的最直接介面。
Executors: 提供了一系列工廠方法用於創先執行緒池,返回的執行緒池都實現了ExecutorService 介面。
ThreadPoolExecutor:執行緒池的具體實現類,一般用的各種執行緒池都是基於這個類實現的。
構造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
- corePoolSize:執行緒池的核心執行緒數,執行緒池中執行的執行緒數也永遠不會超過 corePoolSize 個,預設情況下可以一直存活。可以通過設定allowCoreThreadTimeOut為True,此時 核心執行緒數就是0,此時keepAliveTime控制所有執行緒的超時時間。
- maximumPoolSize:執行緒池允許的最大執行緒數;
- keepAliveTime: 指的是空閒執行緒結束的超時時間;
- unit :是一個列舉,表示 keepAliveTime 的單位;
- workQueue:表示存放任務的BlockingQueue<Runnable佇列。
- BlockingQueue:阻塞佇列(BlockingQueue)是java.util.concurrent下的主要用來控制執行緒同步的工具。如果BlockQueue是空的,從BlockingQueue取東西的操作將會被阻斷進入等待狀態,直到BlockingQueue進了東西才會被喚醒。同樣,如果BlockingQueue是滿的,任何試圖往裡存東西的操作也會被阻斷進入等待狀態,直到BlockingQueue裡有空間才會被喚醒繼續操作。
阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。具體的實現類有LinkedBlockingQueue,ArrayBlockingQueued等。一般其內部的都是通過Lock和Condition(顯示鎖(Lock)及Condition的學習與使用)來實現阻塞和喚醒。
執行緒池的工作過程如下:
- 執行緒池剛建立時,裡面沒有一個執行緒。任務佇列是作為引數傳進來的。不過,就算佇列裡面有任務,執行緒池也不會馬上執行它們。
- 當呼叫 execute() 方法新增一個任務時,執行緒池會做如下判斷:
- 如果正在執行的執行緒數量小於 corePoolSize,那麼馬上建立執行緒執行這個任務;
- 如果正在執行的執行緒數量大於或等於 corePoolSize,那麼將這個任務放入佇列;
- 如果這時候佇列滿了,而且正在執行的執行緒數量小於 maximumPoolSize,那麼還是要建立非核心執行緒立刻執行這個任務;
- 如果佇列滿了,而且正在執行的執行緒數量大於或等於 maximumPoolSize,那麼執行緒池會丟擲異常RejectExecutionException。
- 當一個執行緒完成任務時,它會從佇列中取下一個任務來執行。
- 當一個執行緒無事可做,超過一定的時間(keepAliveTime)時,執行緒池會判斷,如果當前執行的執行緒數大於 corePoolSize,那麼這個執行緒就被停掉。所以執行緒池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。
執行緒池的建立和使用
生成執行緒池採用了工具類Executors的靜態方法,以下是幾種常見的執行緒池。
SingleThreadExecutor:單個後臺執行緒 (其緩衝佇列是無界的)
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService (
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
建立一個單執行緒的執行緒池。這個執行緒池只有一個核心執行緒在工作,也就是相當於單執行緒序列執行所有任務。如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。此執行緒池保證所有任務的執行順序按照任務的提交順序執行。
FixedThreadPool:只有核心執行緒的執行緒池,大小固定 (其緩衝佇列是無界的) 。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
建立固定大小的執行緒池。每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小。執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。
CachedThreadPool:無界執行緒池,可以進行自動執行緒回收。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。SynchronousQueue是一個是緩衝區為1的阻塞佇列。
ScheduledThreadPool:核心執行緒池固定,大小無限的執行緒池。此執行緒池支援定時以及週期性執行任務的需求。
public static ExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPool(corePoolSize,
Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
建立一個週期性執行任務的執行緒池。如果閒置,非核心執行緒池會在DEFAULT_KEEPALIVEMILLIS時間內回收。
執行緒池最常用的提交任務的方法有兩種:
execute:
ExecutorService.execute(Runnable runable);
submit:
FutureTask task = ExecutorService.submit(Runnable runnable);
FutureTask<T> task = ExecutorService.submit(Runnable runnable,T Result);
FutureTask<T> task = ExecutorService.submit(Callable<T> callable);
submit(Callable callable)的實現,submit(Runnable runnable)同理。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
FutureTask<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
可以看出submit開啟的是有返回結果的任務,會返回一個FutureTask物件,這樣就能通過get()方法得到結果。submit最終呼叫的也是execute(Runnable runable),submit只是將Callable物件或Runnable封裝成一個FutureTask物件,因為FutureTask是個Runnable,所以可以在execute中執行。關於Callable物件和Runnable怎麼封裝成FutureTask物件,見Callable和Future、FutureTask的使用。
執行緒池實現的原理
如果只講執行緒池的使用,那這篇部落格沒有什麼大的價值,充其量也就是熟悉Executor相關API的過程。執行緒池的實現過程沒有用到Synchronized關鍵字,用的都是,Lock和同步(阻塞)佇列,Atomic相關類,FutureTask等等,因為後者的效能更優。理解的過程可以很好的學習原始碼中併發控制的思想。
在開篇提到過執行緒池的優點是可總結為以下三點:
- 執行緒複用
- 控制最大併發數
- 管理執行緒
1.執行緒複用過程
理解執行緒複用原理首先應瞭解執行緒生命週期。
線上程的生命週期中,它要經過新建(New)、就緒(Runnable)、執行(Running)、阻塞(Blocked)和死亡(Dead)5種狀態。
Thread通過new來新建一個執行緒,這個過程是是初始化一些執行緒資訊,如執行緒名,id,執行緒所屬group等,可以認為只是個普通的物件。呼叫Thread的start()後Java虛擬機器會為其建立方法呼叫棧和程式計數器,同時將hasBeenStarted為true,之後呼叫start方法就會有異常。
處於這個狀態中的執行緒並沒有開始執行,只是表示該執行緒可以運行了。至於該執行緒何時開始執行,取決於JVM裡執行緒排程器的排程。當執行緒獲取cpu後,run()方法會被呼叫。不要自己去呼叫Thread的run()方法。之後根據CPU的排程在就緒——執行——阻塞間切換,直到run()方法結束或其他方式停止執行緒,進入dead狀態。
所以實現執行緒複用的原理應該就是要保持執行緒處於存活狀態(就緒,執行或阻塞)。接下來來看下ThreadPoolExecutor是怎麼實現執行緒複用的。
在ThreadPoolExecutor主要Worker類來控制執行緒的複用。看下Worker類簡化後的程式碼,這樣方便理解:
private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null){
task.run();
}
}
Worker是一個Runnable,同時擁有一個thread,這個thread就是要開啟的執行緒,在新建Worker物件時同時新建一個Thread物件,同時將Worker自己作為引數傳入TThread,這樣當Thread的start()方法呼叫時,執行的實際上是Worker的run()方法,接著到runWorker()中,有個while迴圈,一直從getTask()裡得到Runnable物件,順序執行。getTask()又是怎麼得到Runnable物件的呢?
依舊是簡化後的程式碼:
private Runnable getTask() {
if(一些特殊情況) {
return null;
}
Runnable r = workQueue.take();
return r;
}
這個workQueue就是初始化ThreadPoolExecutor時存放任務的BlockingQueue佇列,這個佇列裡的存放的都是將要執行的Runnable任務。因為BlockingQueue是個阻塞佇列,BlockingQueue.take()得到如果是空,則進入等待狀態直到BlockingQueue有新的物件被加入時喚醒阻塞的執行緒。所以一般情況Thread的run()方法就不會結束,而是不斷執行從workQueue裡的Runnable任務,這就達到了執行緒複用的原理了。
2.控制最大併發數
那Runnable是什麼時候放入workQueue?Worker又是什麼時候建立,Worker裡的Thread的又是什麼時候呼叫start()開啟新執行緒來執行Worker的run()方法的呢?有上面的分析看出Worker裡的runWorker()執行任務時是一個接一個,序列進行的,那併發是怎麼體現的呢?
很容易想到是在execute(Runnable runnable)時會做上面的一些任務。看下execute裡是怎麼做的。
execute:
簡化後的程式碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 當前執行緒數 < corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 直接啟動新的執行緒。
if (addWorker(command, true))
return;
c = ctl.get();
}
// 活動執行緒數 >= corePoolSize
// runState為RUNNING && 佇列未滿
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次檢驗是否為RUNNING狀態
// 非RUNNING狀態 則從workQueue中移除任務並拒絕
if (!isRunning(recheck) && remove(command))
reject(command);// 採用執行緒池指定的策略拒絕任務
// 兩種情況:
// 1.非RUNNING狀態拒絕新的任務
// 2.佇列滿了啟動新的執行緒失敗(workCount > maximumPoolSize)
} else if (!addWorker(command, false))
reject(command);
}
addWorker:
簡化後的程式碼
private boolean addWorker(Runnable firstTask, boolean core) {
int wc = workerCountOf(c);
if (wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
w = new Worker(firstTask);
final Thread t = w.thread;
t.start();
}
根據程式碼再來看上面提到的執行緒池工作過程中的新增任務的情況:
* 如果正在執行的執行緒數量小於 corePoolSize,那麼馬上建立執行緒執行這個任務;
* 如果正在執行的執行緒數量大於或等於 corePoolSize,那麼將這個任務放入佇列;
* 如果這時候佇列滿了,而且正在執行的執行緒數量小於 maximumPoolSize,那麼還是要建立非核心執行緒立刻執行這個任務;
* 如果佇列滿了,而且正在執行的執行緒數量大於或等於 maximumPoolSize,那麼執行緒池會丟擲異常RejectExecutionException。
這就是Android的AsyncTask在並行執行是在超出最大任務數是丟擲RejectExecutionException的原因所在,詳見基於最新版本的AsyncTask原始碼解讀及AsyncTask的黑暗面
通過addWorker如果成功建立新的執行緒成功,則通過start()開啟新執行緒,同時將firstTask作為這個Worker裡的run()中執行的第一個任務。
雖然每個Worker的任務是序列處理,但如果建立了多個Worker,因為共用一個workQueue,所以就會並行處理了。
所以根據corePoolSize和maximumPoolSize來控制最大併發數。大致過程可用下圖表示。
上面的講解和圖來可以很好的理解的這個過程。
如果是做Android開發的,並且對Handler原理比較熟悉,你可能會覺得這個圖挺熟悉,其中的一些過程和Handler,Looper,Meaasge使用中,很相似。Handler.send(Message)相當於execute(Runnuble),Looper中維護的Meaasge佇列相當於BlockingQueue,只不過需要自己通過同步來維護這個佇列,Looper中的loop()函式迴圈從Meaasge佇列取Meaasge和Worker中的runWork()不斷從BlockingQueue取Runnable是同樣的道理。
3.管理執行緒
通過執行緒池可以很好的管理執行緒的複用,控制併發數,以及銷燬等過程,執行緒的複用和控制併發上面已經講了,而執行緒的管理過程已經穿插在其中了,也很好理解。
在ThreadPoolExecutor有個ctl的AtomicInteger變數。通過這一個變數儲存了兩個內容:
- 所有執行緒的數量
- 每個執行緒所處的狀態
其中低29位存執行緒數,高3位存runState,通過位運算來得到不同的值。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//得到執行緒的狀態
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
//得到Worker的的數量
private static int workerCountOf(int c) {
return c & CAPACITY;
}
// 判斷執行緒是否在執行
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
這裡主要通過shutdown和shutdownNow()來分析執行緒池的關閉過程。首先執行緒池有五種狀態來控制任務新增與執行。主要介紹以下三種:
- RUNNING狀態:執行緒池正常執行,可以接受新的任務並處理佇列中的任務;
- SHUTDOWN狀態:不再接受新的任務,但是會執行佇列中的任務;
- STOP狀態:不再接受新任務,不處理佇列中的任務
shutdown這個方法會將runState置為SHUTDOWN,會終止所有空閒的執行緒,而仍在工作的執行緒不受影響,所以佇列中的任務人會被執行。shutdownNow方法將runState置為STOP。和shutdown方法的區別,這個方法會終止所有的執行緒,所以佇列中的任務也不會被執行了。
總結
通過對ThreadPoolExecutor原始碼的分析,從總體上了解了執行緒池的建立,任務的新增,執行等過程,熟悉這些過程,使用執行緒池就會更輕鬆了。
而從中學到的一些對併發控制,以及生產者——消費者模型任務處理的使用,對以後理解或解決其他相關問題會有很大的幫助。比如Android中的Handler機制,而Looper中的Messager佇列用一個BlookQueue來處理同樣是可以的,這寫就是讀原始碼的收穫吧。
深入理解執行緒池及其原理
我們使用執行緒的時候就去建立一個執行緒,這樣實現起來非常簡便,但是就會有一個問題:
如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。
那麼有沒有一種辦法使得執行緒可以複用,就是執行完一個任務,並不被銷燬,而是可以繼續執行其他的任務?
在Java中可以通過執行緒池來達到這樣的效果。今天我們就來詳細講解一下Java的執行緒池,首先我們從最核心的ThreadPool類中的方法講起,然後再講述它的實現原理,接著給出了它的使用示例,最後討論了一下如何合理配置執行緒池的大小。
以下是本文的目錄大綱:
- 一.Java中的ThreadPoolExecutor類
- 二.深入剖析執行緒池實現原理
- 三.使用示例
- 四.如何合理配置執行緒池的大小
若有不正之處請多多諒解,並歡迎批評指正。
一.Java中的ThreadPoolExecutor類
java.uitl.concurrent.ThreadPoolExecutor類是執行緒池中最核心的一個類,因此如果要透徹地瞭解Java中的執行緒池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現原始碼。
在ThreadPoolExecutor類中提供了四個構造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
從上面的程式碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,通過觀察每個構造器的原始碼具體實現,發現前面三個構造器都是呼叫的第四個構造器進行的初始化工作。
下面解釋下一下構造器中各個引數的含義:
- corePoolSize:核心池的大小,這個引數跟後面講述的執行緒池的實現原理有非常大的關係。在建立了執行緒池後,預設情況下,執行緒池中並沒有任何執行緒,而是等待有任務到來才建立執行緒去執行任務,除非呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預建立執行緒的意思,即在沒有任務到來之前就建立corePoolSize個執行緒或者一個執行緒。預設情況下,在建立了執行緒池後,執行緒池中的執行緒數為0,當有任務來之後,就會建立一個執行緒去執行任務,當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取隊列當中;
- maximumPoolSize:執行緒池最大執行緒數,這個引數也是一個非常重要的引數,它表示線上程池中最多能建立多少個執行緒;
- keepAliveTime:表示執行緒沒有任務執行時最多保持多久時間會終止。預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用,直到執行緒池中的執行緒數不大於corePoolSize,即當執行緒池中的執行緒數大於corePoolSize時,如果一個執行緒空閒的時間達到keepAliveTime,則會終止,直到執行緒池中的執行緒數不超過corePoolSize。但是如果呼叫了allowCoreThreadTimeOut(boolean)方法,線上程池中的執行緒數不大於corePoolSize時,keepAliveTime引數也會起作用,直到執行緒池中的執行緒數為0;
- unit:引數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
- workQueue:一個阻塞佇列,用來儲存等待執行的任務,這個引數的選擇也很重要,會對執行緒池的執行過程產生重大影響,一般來說,這裡的阻塞佇列有以下幾種選擇:
ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue;
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。執行緒池的排隊策略與BlockingQueue有關。
- threadFactory:執行緒工廠,主要用來建立執行緒;
- handler:表示當拒絕處理任務時的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
具體引數的配置與執行緒池的關係將在下一節講述。
從上面給出的ThreadPoolExecutor類的程式碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實現:
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Futu