併發新特性—Executor 框架與執行緒池
併發新特性—Executor 框架與執行緒池
Executor 框架簡介
在 Java 5 之後,併發程式設計引入了一堆新的啟動、排程和管理執行緒的API。Executor 框架便是 Java 5 中引入的,其內部使用了執行緒池機制,它在 java.util.cocurrent 包下,通過該框架來控制執行緒的啟動、執行和關閉,可以簡化併發程式設計的操作。因此,在 Java 5之後,通過 Executor 來啟動執行緒比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用執行緒池實現,節約開銷)外,還有關鍵的一點:有助於避免 this 逃逸問題——如果我們在構造器中啟動一個執行緒,因為另一個任務可能會在構造器結束之前開始執行,此時可能會訪問到初始化了一半的物件用 Executor 在構造器中。
Executor 框架包括:執行緒池,Executor,Executors,ExecutorService,CompletionService,Future,Callable 等。
Executor 介面中之定義了一個方法 execute(Runnable command),該方法接收一個 Runable 例項,它用來執行一個任務,任務即一個實現了 Runnable 介面的類。ExecutorService 介面繼承自 Executor 介面,它提供了更豐富的實現多執行緒的方法,比如,ExecutorService 提供了關閉自己的方法,以及可為跟蹤一個或多個非同步任務執行狀況而生成 Future 的方法。 可以呼叫 ExecutorService 的 shutdown()方法來平滑地關閉 ExecutorService,呼叫該方法後,將導致 ExecutorService 停止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另一類是還沒有開始執行的),當所有已經提交的任務執行完畢後將會關閉 ExecutorService。因此我們一般用該介面來實現和管理多執行緒。
ExecutorService 的生命週期包括三種狀態:執行、關閉、終止。建立後便進入執行狀態,當呼叫了 shutdown()方法時,便進入關閉狀態,此時意味著 ExecutorService 不再接受新的任務,但它還在執行已經提交了的任務,當素有已經提交了的任務執行完後,便到達終止狀態。如果不呼叫 shutdown()方法,ExecutorService 會一直處在執行狀態,不斷接收新的任務,執行新的任務,伺服器端一般不需要關閉它,保持一直執行即可。
Executors 提供了一系列工廠方法用於創先執行緒池,返回的執行緒池都實現了 ExecutorService 介面。
public static ExecutorService newFixedThreadPool(int nThreads)
建立固定數目執行緒的執行緒池。
public static ExecutorService newCachedThreadPool()
建立一個可快取的執行緒池,呼叫execute將重用以前構造的執行緒(如果執行緒可用)。如果現有執行緒沒有可用的,則建立一個新線 程並新增到池中。終止並從快取中移除那些已有 60 秒鐘未被使用的執行緒。
public static ExecutorService newSingleThreadExecutor()
建立一個單執行緒化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
建立一個支援定時及週期性的任務執行的執行緒池,多數情況下可用來替代Timer類。
這四種方法都是用的 Executors 中的 ThreadFactory 建立的執行緒,下面就以上四個方法做個比較:
newCachedThreadPool()
- 快取型池子,先檢視池中有沒有以前建立的執行緒,如果有,就 reuse 如果沒有,就建一個新的執行緒加入池中
- 快取型池子通常用於執行一些生存期很短的非同步型任務 因此在一些面向連線的 daemon 型 SERVER 中用得不多。但對於生存期短的非同步任務,它是 Executor 的首選。
-
能 reuse 的執行緒,必須是 timeout IDLE 內的池中執行緒,預設 timeout 是 60s,超過這個 IDLE 時長,執行緒例項將被終止及移出池。
注意,放入 CachedThreadPool 的執行緒不必擔心其結束,超過 TIMEOUT 不活動,其會自動被終止。
newFixedThreadPool(int)
- newFixedThreadPool 與 cacheThreadPool 差不多,也是能 reuse 就用,但不能隨時建新的執行緒。
- 其獨特之處:任意時間點,最多隻能有固定數目的活動執行緒存在,此時如果有新的執行緒要建立,只能放在另外的佇列中等待,直到當前的執行緒中某個執行緒終止直接被移出池子。
- 和 cacheThreadPool 不同,FixedThreadPool 沒有 IDLE 機制(可能也有,但既然文件沒提,肯定非常長,類似依賴上層的 TCP 或 UDP IDLE 機制之類的),所以 FixedThreadPool 多數針對一些很穩定很固定的正規併發執行緒,多用於伺服器。
- 從方法的原始碼看,cache池和fixed 池呼叫的是同一個底層 池,只不過引數不同:
- fixed 池執行緒數固定,並且是0秒IDLE(無IDLE)。
- cache 池執行緒數支援 0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60 秒 IDLE 。
newScheduledThreadPool(int)
- 排程型執行緒池
- 這個池子裡的執行緒可以按 schedule 依次 delay 執行,或週期執行
SingleThreadExecutor()
- 單例執行緒,任意時間池中只能有一個執行緒
- 用的是和 cache 池和 fixed 池相同的底層池,但執行緒數目是 1-1,0 秒 IDLE(無 IDLE)
一般來說,CachedTheadPool 在程式執行過程中通常會建立與所需數量相同的執行緒,然後在它回收舊執行緒時停止建立新執行緒,因此它是合理的 Executor 的首選,只有當這種方式會引發問題時(比如需要大量長時間面向連線的執行緒時),才需要考慮用 FixedThreadPool。(該段話摘自《Thinking in Java》第四版)
Executor 執行 Runnable 任務
通過 Executors 的以上四個靜態工廠方法獲得 ExecutorService 例項,而後呼叫該例項的 execute(Runnable command)方法即可。一旦 Runnable 任務傳遞到 execute()方法,該方法便會自動在一個執行緒上執行。下面是 Executor 執行 Runnable 任務的示例程式碼:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCachedThreadPool{
public static void main(String[] args){
ExecutorService executorService = Executors.newCachedThreadPool();
// ExecutorService executorService = Executors.newFixedThreadPool(5);
// ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++){
executorService.execute(new TestRunnable());
System.out.println("************* a" + i + " *************");
}
executorService.shutdown();
}
}
class TestRunnable implements Runnable{
public void run(){
System.out.println(Thread.currentThread().getName() + "執行緒被呼叫了。");
}
}
執行後的結果如下:
從結果中可以看出,pool-1-thread-1 和 pool-1-thread-2 均被呼叫了兩次,這是隨機的,execute 會首先線上程池中選擇一個已有空閒執行緒來執行任務,如果執行緒池中沒有空閒執行緒,它便會建立一個新的執行緒來執行任務。
Executor 執行 Callable 任務
在 Java 5 之後,任務分兩類:一類是實現了 Runnable 介面的類,一類是實現了 Callable 介面的類。兩者都可以被 ExecutorService 執行,但是 Runnable 任務沒有返回值,而 Callable 任務有返回值。並且 Callable 的 call()方法只能通過 ExecutorService 的 submit(Callable
Callable 介面類似於 Runnable,兩者都是為那些其例項可能被另一個執行緒執行的類設計的。但是 Runnable 不會返回結果,並且無法丟擲經過檢查的異常而 Callable 又返回結果,而且當獲取返回結果時可能會丟擲異常。Callable 中的 call()方法類似 Runnable 的 run()方法,區別同樣是有返回值,後者沒有。
當將一個 Callable 的物件傳遞給 ExecutorService 的 submit 方法,則該 call 方法自動在一個執行緒上執行,並且會返回執行結果 Future 物件。同樣,將 Runnable 的物件傳遞給 ExecutorService 的 submit 方法,則該 run 方法自動在一個執行緒上執行,並且會返回執行結果 Future 物件,但是在該 Future 物件上呼叫 get 方法,將返回 null。
下面給出一個 Executor 執行 Callable 任務的示例程式碼:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class CallableDemo{
public static void main(String[] args){
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<String>> resultList = new ArrayList<Future<String>>();
//建立10個任務並執行
for (int i = 0; i < 10; i++){
//使用ExecutorService執行Callable型別的任務,並將結果儲存在future變數中
Future<String> future = executorService.submit(new TaskWithResult(i));
//將任務執行結果儲存到List中
resultList.add(future);
}
//遍歷任務的結果
for (Future<String> fs : resultList){
try{
while(!fs.isDone);//Future返回如果沒有完成,則一直迴圈等待,直到Future返回完成
System.out.println(fs.get()); //列印各個執行緒(任務)執行的結果
}catch(InterruptedException e){
e.printStackTrace();
}catch(ExecutionException e){
e.printStackTrace();
}finally{
//啟動一次順序關閉,執行以前提交的任務,但不接受新任務
executorService.shutdown();
}
}
}
}
class TaskWithResult implements Callable<String>{
private int id;
public TaskWithResult(int id){
this.id = id;
}
/**
* 任務的具體過程,一旦任務傳給ExecutorService的submit方法,
* 則該方法自動在一個執行緒上執行
*/
public String call() throws Exception {
System.out.println("call()方法被自動呼叫!!! " + Thread.currentThread().getName());
//該返回結果將被Future的get方法得到
return "call()方法被自動呼叫,任務返回的結果是:" + id + " " + Thread.currentThread().getName();
}
}
執行結果如下:
從結果中可以同樣可以看出,submit 也是首先選擇空閒執行緒來執行任務,如果沒有,才會建立新的執行緒來執行任務。另外,需要注意:如果 Future 的返回尚未完成,則 get()方法會阻塞等待,直到 Future 完成返回,可以通過呼叫 isDone()方法判斷 Future 是否完成了返回。
自定義執行緒池
自定義執行緒池,可以用 ThreadPoolExecutor 類建立,它有多個構造方法來建立執行緒池,用該類很容易實現自定義的執行緒池,這裡先貼上示例程式:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest{
public static void main(String[] args){
//建立等待佇列
BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);
//建立執行緒池,池中儲存的執行緒數為3,允許的最大執行緒數為5
ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);
//建立七個任務
Runnable t1 = new MyThread();
Runnable t2 = new MyThread();
Runnable t3 = new MyThread();
Runnable t4 = new MyThread();
Runnable t5 = new MyThread();
Runnable t6 = new MyThread();
Runnable t7 = new MyThread();
//每個任務會在一個執行緒上執行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
pool.execute(t6);
pool.execute(t7);
//關閉執行緒池
pool.shutdown();
}
}
class MyThread implements Runnable{
@Override
public void run(){
System.out.println(Thread.currentThread().getName() + "正在執行。。。");
try{
Thread.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
執行結果如下:
從結果中可以看出,七個任務是線上程池的三個執行緒上執行的。這裡簡要說明下用到的 ThreadPoolExecuror 類的構造方法中各個引數的含義。
public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)
-
corePoolSize:執行緒池中所儲存的核心執行緒數,包括空閒執行緒。
-
maximumPoolSize:池中允許的最大執行緒數。
-
keepAliveTime:執行緒池中的空閒執行緒所能持續的最長時間。
-
unit:持續時間的單位。
- workQueue:任務執行前儲存任務的佇列,僅儲存由 execute 方法提交的 Runnable 任務。
根據 ThreadPoolExecutor 原始碼前面大段的註釋,我們可以看出,當試圖通過 excute 方法講一個 Runnable 任務新增到執行緒池中時,按照如下順序來處理:
-
如果執行緒池中的執行緒數量少於 corePoolSize,即使執行緒池中有空閒執行緒,也會建立一個新的執行緒來執行新新增的任務;
-
如果執行緒池中的執行緒數量大於等於 corePoolSize,但緩衝佇列 workQueue 未滿,則將新新增的任務放到 workQueue 中,按照 FIFO 的原則依次等待執行(執行緒池中有執行緒空閒出來後依次將緩衝佇列中的任務交付給空閒的執行緒執行);
-
如果執行緒池中的執行緒數量大於等於 corePoolSize,且緩衝佇列 workQueue 已滿,但執行緒池中的執行緒數量小於 maximumPoolSize,則會建立新的執行緒來處理被新增的任務;
- 如果執行緒池中的執行緒數量等於了 maximumPoolSize,有 4 種才處理方式(該構造方法呼叫了含有 5 個引數的構造方法,並將最後一個構造方法為 RejectedExecutionHandler 型別,它在處理執行緒溢位時有 4 種方式,這裡不再細說,要了解的,自己可以閱讀下原始碼)。
總結起來,也即是說,當有新的任務要處理時,先看執行緒池中的執行緒數量是否大於 corePoolSize,再看緩衝佇列 workQueue 是否滿,最後看執行緒池中的執行緒數量是否大於 maximumPoolSize。
另外,當執行緒池中的執行緒數量大於 corePoolSize 時,如果裡面有執行緒的空閒時間超過了 keepAliveTime,就將其移除執行緒池,這樣,可以動態地調整執行緒池中執行緒的數量。
我們大致來看下 Executors 的原始碼,newCachedThreadPool 的不帶 RejectedExecutionHandler 引數(即第五個引數,執行緒數量超過 maximumPoolSize 時,指定處理方式)的構造方法如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
它將 corePoolSize 設定為 0,而將 maximumPoolSize 設定為了 Integer 的最大值,執行緒空閒超過 60 秒,將會從執行緒池中移除。由於核心執行緒數為 0,因此每次新增任務,都會先從執行緒池中找空閒執行緒,如果沒有就會建立一個執行緒(SynchronousQueue
再來看 newFixedThreadPool 的不帶 RejectedExecutionHandler 引數的構造方法,如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
它將 corePoolSize 和 maximumPoolSize 都設定為了 nThreads,這樣便實現了執行緒池的大小的固定,不會動態地擴大,另外,keepAliveTime 設定為了 0,也就是說執行緒只要空閒下來,就會被移除執行緒池,敢於 LinkedBlockingQueue 下面會說。
幾種排隊的策略
-
直接提交。緩衝佇列採用 SynchronousQueue,它將任務直接交給執行緒處理而不保持它們。如果不存在可用於立即執行任務的執行緒(即執行緒池中的執行緒都在工作),則試圖把任務加入緩衝佇列將會失敗,因此會構造一個新的執行緒來處理新新增的任務,並將其加入到執行緒池中。直接提交通常要求無界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒絕新提交的任務。newCachedThreadPool 採用的便是這種策略。
-
無界佇列。使用無界佇列(典型的便是採用預定義容量的 LinkedBlockingQueue,理論上是該緩衝佇列可以對無限多的任務排隊)將導致在所有 corePoolSize 執行緒都工作的情況下將新任務加入到緩衝佇列中。這樣,建立的執行緒就不會超過 corePoolSize,也因此,maximumPoolSize 的值也就無效了。當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列。newFixedThreadPool採用的便是這種策略。
- 有界佇列。當使用有限的 maximumPoolSizes 時,有界佇列(一般緩衝佇列使用 ArrayBlockingQueue,並制定佇列的最大長度)有助於防止資源耗盡,但是可能較難調整和控制,佇列大小和最大池大小需要相互折衷,需要設定合理的引數。