1. 程式人生 > >Java 執行緒池詳解及例項程式碼

Java 執行緒池詳解及例項程式碼

這篇文章主要介紹了Java 執行緒池的相關資料,並符例項程式碼,幫助大家學習參考,需要的朋友可以參考下

執行緒池的技術背景

在面向物件程式設計中,建立和銷燬物件是很費時間的,因為建立一個物件要獲取記憶體資源或者其它更多資源。在Java中更是如此,虛擬機器將試圖跟蹤每一個物件,以便能夠在物件銷燬後進行垃圾回收。

所以提高服務程式效率的一個手段就是儘可能減少建立和銷燬物件的次數,特別是一些很耗資源的物件建立和銷燬。如何利用已有物件來服務就是一個需要解決的關鍵問題,其實這就是一些”池化資源”技術產生的原因。

例如Android中常見到的很多通用元件一般都離不開”池”的概念,如各種圖片載入庫,網路請求庫,即使Android的訊息傳遞機制中的Meaasge當使用Meaasge.obtain()就是使用的Meaasge池中的物件,因此這個概念很重要。本文將介紹的執行緒池技術同樣符合這一思想。

執行緒池的優點:

1.重用執行緒池中的執行緒,減少因物件建立,銷燬所帶來的效能開銷;

2.能有效的控制執行緒的最大併發數,提高系統資源利用率,同時避免過多的資源競爭,避免堵塞;

3.能夠多執行緒進行簡單的管理,使執行緒的使用簡單、高效。

執行緒池框架Executor

java中的執行緒池是通過Executor框架實現的,Executor 框架包括類:Executor,Executors,ExecutorService,ThreadPoolExecutor ,Callable和Future、FutureTask的使用等。

Executor: 所有執行緒池的介面,只有一個方法。

public interface Executor {  
 void execute(Runnable command);  
}

ExecutorService: 增加Executor的行為,是Executor實現類的最直接介面。

Executors: 提供了一系列工廠方法用於創先執行緒池,返回的執行緒池都實現了ExecutorService 介面。

ThreadPoolExecutor:執行緒池的具體實現類,一般用的各種執行緒池都是基於這個類實現的。 構造方法如下:

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

    Executors.defaultThreadFactory(), defaultHandler);

}

corePoolSize:執行緒池的核心執行緒數,執行緒池中執行的執行緒數也永遠不會超過 corePoolSize 個,預設情況下可以一直存活。可以通過設定allowCoreThreadTimeOut為True,此時 核心執行緒數就是0,此時keepAliveTime控制所有執行緒的超時時間。

maximumPoolSize:執行緒池允許的最大執行緒數;

keepAliveTime: 指的是空閒執行緒結束的超時時間;

unit :是一個列舉,表示 keepAliveTime 的單位;

workQueue:表示存放任務的BlockingQueue<Runnable佇列。

BlockingQueue:阻塞佇列(BlockingQueue)是java.util.concurrent下的主要用來控制執行緒同步的工具。如果BlockQueue是空的,從BlockingQueue取東西的操作將會被阻斷進入等待狀態,直到BlockingQueue進了東西才會被喚醒。同樣,如果BlockingQueue是滿的,任何試圖往裡存東西的操作也會被阻斷進入等待狀態,直到BlockingQueue裡有空間才會被喚醒繼續操作。 阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。具體的實現類有LinkedBlockingQueue,ArrayBlockingQueued等。一般其內部的都是通過Lock和Condition(顯示鎖(Lock)及Condition的學習與使用)來實現阻塞和喚醒。

執行緒池的工作過程如下:

執行緒池剛建立時,裡面沒有一個執行緒。任務佇列是作為引數傳進來的。不過,就算佇列裡面有任務,執行緒池也不會馬上執行它們。

當呼叫 execute() 方法新增一個任務時,執行緒池會做如下判斷:

如果正在執行的執行緒數量小於 corePoolSize,那麼馬上建立執行緒執行這個任務;

如果正在執行的執行緒數量大於或等於 corePoolSize,那麼將這個任務放入佇列;

如果這時候佇列滿了,而且正在執行的執行緒數量小於 maximumPoolSize,那麼還是要建立非核心執行緒立刻執行這個任務;

如果佇列滿了,而且正在執行的執行緒數量大於或等於 maximumPoolSize,那麼執行緒池會丟擲異常RejectExecutionException。

當一個執行緒完成任務時,它會從佇列中取下一個任務來執行。

當一個執行緒無事可做,超過一定的時間(keepAliveTime)時,執行緒池會判斷,如果當前執行的執行緒數大於 corePoolSize,那麼這個執行緒就被停掉。所以執行緒池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。

執行緒池的建立和使用

生成執行緒池採用了工具類Executors的靜態方法,以下是幾種常見的執行緒池。

SingleThreadExecutor:單個後臺執行緒 (其緩衝佇列是無界的)

public static ExecutorService newSingleThreadExecutor() {  
 return new FinalizableDelegatedExecutorService (
  new ThreadPoolExecutor(1, 1,         
  0L, TimeUnit.MILLISECONDS,         
  new LinkedBlockingQueue<Runnable>())); 
}

建立一個單執行緒的執行緒池。這個執行緒池只有一個核心執行緒在工作,也就是相當於單執行緒序列執行所有任務。如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。此執行緒池保證所有任務的執行順序按照任務的提交順序執行。

FixedThreadPool:只有核心執行緒的執行緒池,大小固定 (其緩衝佇列是無界的) 。

public static ExecutorService newFixedThreadPool(int nThreads) {         
        return new ThreadPoolExecutor(nThreads, nThreads,                                       
            0L, TimeUnit.MILLISECONDS,                                         
            new LinkedBlockingQueue<Runnable>());     
}

建立固定大小的執行緒池。每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小。執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。

CachedThreadPool:無界執行緒池,可以進行自動執行緒回收。

public static ExecutorService newCachedThreadPool() {   
 return new ThreadPoolExecutor(0,Integer.MAX_VALUE,           
   60L, TimeUnit.SECONDS,          
   new SynchronousQueue<Runnable>());  
}

如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。SynchronousQueue是一個是緩衝區為1的阻塞佇列。

ScheduledThreadPool:核心執行緒池固定,大小無限的執行緒池。此執行緒池支援定時以及週期性執行任務的需求。

public static ExecutorService newScheduledThreadPool(int corePoolSize) {   
 return new ScheduledThreadPool(corePoolSize, 
    Integer.MAX_VALUE,             
    DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,             
    new DelayedWorkQueue()); 
}

建立一個週期性執行任務的執行緒池。如果閒置,非核心執行緒池會在DEFAULT_KEEPALIVEMILLIS時間內回收。

執行緒池最常用的提交任務的方法有兩種:

execute:

ExecutorService.execute(Runnable runable);

submit:

FutureTask task = ExecutorService.submit(Runnable runnable);
FutureTask<T> task = ExecutorService.submit(Runnable runnable,T Result);

FutureTask<T> task = ExecutorService.submit(Callable<T> callable);

submit(Callable callable)的實現,submit(Runnable runnable)同理。

public <T> Future<T> submit(Callable<T> task) {
 if (task == null) throw new NullPointerException();
 FutureTask<T> ftask = newTaskFor(task);
 execute(ftask);
 return ftask;
}

可以看出submit開啟的是有返回結果的任務,會返回一個FutureTask物件,這樣就能通過get()方法得到結果。submit最終呼叫的也是execute(Runnable runable),submit只是將Callable物件或Runnable封裝成一個FutureTask物件,因為FutureTask是個Runnable,所以可以在execute中執行。關於Callable物件和Runnable怎麼封裝成FutureTask物件,見Callable和Future、FutureTask的使用。

執行緒池實現的原理

如果只講執行緒池的使用,那這篇部落格沒有什麼大的價值,充其量也就是熟悉Executor相關API的過程。執行緒池的實現過程沒有用到Synchronized關鍵字,用的都是Volatile,Lock和同步(阻塞)佇列,Atomic相關類,FutureTask等等,因為後者的效能更優。理解的過程可以很好的學習原始碼中併發控制的思想。

在開篇提到過執行緒池的優點是可總結為以下三點:

執行緒複用

控制最大併發數

管理執行緒

1.執行緒複用過程

理解執行緒複用原理首先應瞭解執行緒生命週期。

線上程的生命週期中,它要經過新建(New)、就緒(Runnable)、執行(Running)、阻塞(Blocked)和死亡(Dead)5種狀態。

Thread通過new來新建一個執行緒,這個過程是是初始化一些執行緒資訊,如執行緒名,id,執行緒所屬group等,可以認為只是個普通的物件。呼叫Thread的start()後Java虛擬機器會為其建立方法呼叫棧和程式計數器,同時將hasBeenStarted為true,之後呼叫start方法就會有異常。

處於這個狀態中的執行緒並沒有開始執行,只是表示該執行緒可以運行了。至於該執行緒何時開始執行,取決於JVM裡執行緒排程器的排程。當執行緒獲取cpu後,run()方法會被呼叫。不要自己去呼叫Thread的run()方法。之後根據CPU的排程在就緒——執行——阻塞間切換,直到run()方法結束或其他方式停止執行緒,進入dead狀態。

所以實現執行緒複用的原理應該就是要保持執行緒處於存活狀態(就緒,執行或阻塞)。接下來來看下ThreadPoolExecutor是怎麼實現執行緒複用的。

在ThreadPoolExecutor主要Worker類來控制執行緒的複用。看下Worker類簡化後的程式碼,這樣方便理解:

private final class Worker implements Runnable {
final Thread thread;

Runnable firstTask;

Worker(Runnable firstTask) {

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this);

}

public void run() {

runWorker(this);

}

final void runWorker(Worker w) {

Runnable task = w.firstTask;

w.firstTask = null;

while (task != null || (task = getTask()) != null){

task.run();

}

}

Worker是一個Runnable,同時擁有一個thread,這個thread就是要開啟的執行緒,在新建Worker物件時同時新建一個Thread物件,同時將Worker自己作為引數傳入TThread,這樣當Thread的start()方法呼叫時,執行的實際上是Worker的run()方法,接著到runWorker()中,有個while迴圈,一直從getTask()裡得到Runnable物件,順序執行。getTask()又是怎麼得到Runnable物件的呢?

依舊是簡化後的程式碼:

private Runnable getTask() {
 if(一些特殊情況) {
  return null;
 }
Runnable r = workQueue.take();

return r;

}

這個workQueue就是初始化ThreadPoolExecutor時存放任務的BlockingQueue佇列,這個佇列裡的存放的都是將要執行的Runnable任務。因為BlockingQueue是個阻塞佇列,BlockingQueue.take()得到如果是空,則進入等待狀態直到BlockingQueue有新的物件被加入時喚醒阻塞的執行緒。所以一般情況Thread的run()方法就不會結束,而是不斷執行從workQueue裡的Runnable任務,這就達到了執行緒複用的原理了。

2.控制最大併發數

那Runnable是什麼時候放入workQueue?Worker又是什麼時候建立,Worker裡的Thread的又是什麼時候呼叫start()開啟新執行緒來執行Worker的run()方法的呢?有上面的分析看出Worker裡的runWorker()執行任務時是一個接一個,序列進行的,那併發是怎麼體現的呢?

很容易想到是在execute(Runnable runnable)時會做上面的一些任務。看下execute裡是怎麼做的。

execute:

簡化後的程式碼

public void execute(Runnable command) {
 if (command == null)
  throw new NullPointerException();
int c = ctl.get();

// 當前執行緒數 < corePoolSize

if (workerCountOf(c) < corePoolSize) {

// 直接啟動新的執行緒。

if (addWorker(command, true))

return;

c = ctl.get();

}

// 活動執行緒數 >= corePoolSize

// runState為RUNNING && 佇列未滿

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

// 再次檢驗是否為RUNNING狀態

// 非RUNNING狀態 則從workQueue中移除任務並拒絕

if (!isRunning(recheck) && remove(command))

reject(command);// 採用執行緒池指定的策略拒絕任務

// 兩種情況:

// 1.非RUNNING狀態拒絕新的任務

// 2.佇列滿了啟動新的執行緒失敗(workCount > maximumPoolSize)

} else if (!addWorker(command, false))

reject(command);

}

addWorker:

簡化後的程式碼

private boolean addWorker(Runnable firstTask, boolean core) {
int wc = workerCountOf(c);

if (wc >= (core ? corePoolSize : maximumPoolSize)) {

return false;

}

w = new Worker(firstTask);

final Thread t = w.thread;

t.start();

}

根據程式碼再來看上面提到的執行緒池工作過程中的新增任務的情況:

* 如果正在執行的執行緒數量小於 corePoolSize,那麼馬上建立執行緒執行這個任務;   
* 如果正在執行的執行緒數量大於或等於 corePoolSize,那麼將這個任務放入佇列;
* 如果這時候佇列滿了,而且正在執行的執行緒數量小於 maximumPoolSize,那麼還是要建立非核心執行緒立刻執行這個任務;
* 如果佇列滿了,而且正在執行的執行緒數量大於或等於 maximumPoolSize,那麼執行緒池會丟擲異常RejectExecutionException。

這就是Android的AsyncTask在並行執行是在超出最大任務數是丟擲RejectExecutionException的原因所在,詳見基於最新版本的AsyncTask原始碼解讀及AsyncTask的黑暗面

通過addWorker如果成功建立新的執行緒成功,則通過start()開啟新執行緒,同時將firstTask作為這個Worker裡的run()中執行的第一個任務。

雖然每個Worker的任務是序列處理,但如果建立了多個Worker,因為共用一個workQueue,所以就會並行處理了。

所以根據corePoolSize和maximumPoolSize來控制最大併發數。大致過程可用下圖表示。

上面的講解和圖來可以很好的理解的這個過程。

如果是做Android開發的,並且對Handler原理比較熟悉,你可能會覺得這個圖挺熟悉,其中的一些過程和Handler,Looper,Meaasge使用中,很相似。Handler.send(Message)相當於execute(Runnuble),Looper中維護的Meaasge佇列相當於BlockingQueue,只不過需要自己通過同步來維護這個佇列,Looper中的loop()函式迴圈從Meaasge佇列取Meaasge和Worker中的runWork()不斷從BlockingQueue取Runnable是同樣的道理。

3.管理執行緒

通過執行緒池可以很好的管理執行緒的複用,控制併發數,以及銷燬等過程,執行緒的複用和控制併發上面已經講了,而執行緒的管理過程已經穿插在其中了,也很好理解。

在ThreadPoolExecutor有個ctl的AtomicInteger變數。通過這一個變數儲存了兩個內容:

所有執行緒的數量 每個執行緒所處的狀態 其中低29位存執行緒數,高3位存runState,通過位運算來得到不同的值。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//得到執行緒的狀態

private static int runStateOf(int c) {

return c & ~CAPACITY;

}

//得到Worker的的數量

private static int workerCountOf(int c) {

return c & CAPACITY;

}

// 判斷執行緒是否在執行

private static boolean isRunning(int c) {

return c < SHUTDOWN;

}

這裡主要通過shutdown和shutdownNow()來分析執行緒池的關閉過程。首先執行緒池有五種狀態來控制任務新增與執行。主要介紹以下三種:

RUNNING狀態:執行緒池正常執行,可以接受新的任務並處理佇列中的任務;

SHUTDOWN狀態:不再接受新的任務,但是會執行佇列中的任務;

STOP狀態:不再接受新任務,不處理佇列中的任務 shutdown這個方法會將runState置為SHUTDOWN,會終止所有空閒的執行緒,而仍在工作的執行緒不受影響,所以佇列中的任務人會被執行。

shutdownNow方法將runState置為STOP。和shutdown方法的區別,這個方法會終止所有的執行緒,所以佇列中的任務也不會被執行了。

總結
通過對ThreadPoolExecutor原始碼的分析,從總體上了解了執行緒池的建立,任務的新增,執行等過程,熟悉這些過程,使用執行緒池就會更輕鬆了。

而從中學到的一些對併發控制,以及生產者——消費者模型任務處理的使用,對以後理解或解決其他相關問題會有很大的幫助。比如Android中的Handler機制,而Looper中的Messager佇列用一個BlookQueue來處理同樣是可以的,這寫就是讀原始碼的收穫吧。