Java執行緒池技術之二 Java自帶執行緒池實現
一,介紹
類檢視如下:
自Java 1.5後,Java對執行緒相關的庫做了很大的拓展,執行緒池就是其中之一。Java執行緒的新特性多數在java.util.concurrent,其包含眾多的介面和類。其中java.util.concurrent.Executor是這些類的最頂級介面。其是執行執行緒的一個工具。ExecutorService在Executor的基礎上增加了一些方法,以向執行緒池提交任務。包括實現Runnable介面的任務和實現Callable介面的任務。兩個方法為:
Future<?> submit(Runnable task)
Future<T> submit(Callable<T> task)
實現Runnable介面的任務,其run方法的返回值為void,因此submit的返回值為Future<?>型別,?表示匹配任意一種型別。實現Callable<T>介面的任務,其call方法的返回值型別為T,對應submit的返回值型別則是Future<T>。具有返回值的Callable的任務對於多個執行緒中傳遞狀態和結果是非常有用的。另外使用這兩個方法返回的Future物件可以阻塞當前執行緒直到提交的任務執行完畢以獲取結果,也可以用以取消任務的執行,或者檢測任務是否被取消或者是否執行完畢。如果不使用Future,我們檢測一個執行緒是否執行完畢通常使用Thread.join()或者對狀態標識進行輪詢。
阻塞等待結果,或者在某個時候獲取結果的方法是,呼叫Futrue的get()方法。這樣呼叫執行緒便會等待線上程執行完成並將結果反饋到Futrue物件。之後呼叫執行緒可以對處理結果進行判別處理。isDone(),isCancelled()可以用以判斷當前的任務執行狀態(注意被打斷或取消也是Done的一種)。cancel(Boolean)方法則可以用以取消執行緒執行。如果引數是true表示即使任務已經執行也將試圖取消它,如果是false則表示如果任務沒有被執行則取消,執行了則不取消執行。
可以使用Executor的靜態方法建立實現ExecutorService的執行緒池類。對應方法建立的執行緒池描述如下:
1,newSingleThreadExecutor,單執行緒執行緒池。單個執行緒執行器,內有一個執行緒執行Runnable的任務。如果該執行緒發生異常終了,則建立新的進行補充。可以保證任務順 序的執行。
2,newFixedThreadPool,建立固定執行緒數目(引數指定)的執行緒池。每加入一個任務,建立一個執行緒,直到達到固定數目後,將會有固定大小的執行緒執行Runnable的任務。如果有執行緒異常終了,則建立新的執行緒來進行補充。
3,newCachedThreadPool, 緩衝執行緒池,產生一個大小可變的執行緒池。當執行緒池的執行緒多於執行任務所需要的執行緒的時候,對空閒執行緒(即60s沒有任務執行)進行回收;當執行任務的執行緒數不足的時候,自動拓展執行緒數量。因此執行緒數量是JVM可建立執行緒的最大數目。
4,newSingleThreadScheduledExecutor,單個執行緒排程執行器,產生單個執行緒執行任務,採用schedule方法可以延期或者定期的執行任務
5,newScheduledThreadPool,排程執行緒執行器,當有任務的時候,建立執行緒,直到預設執行緒數量(引數指定),當執行任務所需的執行緒多於該數目的時候,自動拓展執行緒數量,沒有上限;如果執行緒空閒則被回收,直到預設執行緒數量。
6,另外可以自定義執行緒池,請參見使用程式碼例子。
另外注意CompletionService介面,其能夠根據任務的執行先後順序得到執行結果。
二,使用例子
//兩種任務
package poolmanager;
public class RunnableTask implements Runnable {
int i = 0;
public RunnableTask(int i) {
this.i = i;
}
public void run() {
System.out.println("******************");
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task is :" + i);
}
}
package poolmanager;
import java.util.concurrent.Callable;
public class CallableTask implements Callable<String> {
public String ok = "OK!";
public String ng = "Error Happened in Running!";
int i = 0;
public CallableTask(int i){
this.i = i;
}
public String call(){
System.out.println("******************");
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task is :" + i);
return ok;
}
}
//執行緒管理類
package poolmanager;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class JavaThreadPoolManager {
int ThreadSize = 2;
int ThreadMaxSize = 2*ThreadSize;
ExecutorService executorSingleThreadExecutor = null;
ExecutorService executorFixedThreadPool = null;
ExecutorService executorCachedThreadPool = null;
ScheduledExecutorService executorSingleThreadScheduledExcutor = null;
ScheduledExecutorService executorScheduledThreadPool = null;
//自定義執行緒池
BlockingQueue<Runnable> bqueue = null;
ThreadPoolExecutor pool = null;
public void startJavaThreadPoolManager(){
//單個執行緒執行器,內有一個執行緒執行Runnable的任務。
//如果該執行緒發生異常終了,則建立新的進行補充。
//可以保證任務順序的執行。
executorSingleThreadExecutor = Executors.newSingleThreadExecutor();
//固定數目的執行緒執行器。
//每加入一個任務,建立一個執行緒,直到達到固定數目後,
//將會有固定大小的執行緒執行Runnable的任務。
//如果有執行緒異常終了,則建立新的執行緒來進行補充。
executorFixedThreadPool = Executors.newFixedThreadPool(ThreadSize);
//緩衝執行緒執行器,產生一個大小可變的執行緒池。
//當執行緒池的執行緒多於執行任務所需要的執行緒的時候,
//對空閒執行緒(即60s沒有任務執行)進行回收;
//當執行任務的執行緒數不足的時候,自動拓展執行緒數量。因此執行緒數量是JVM
//可建立執行緒的最大數目。
executorCachedThreadPool = Executors.newCachedThreadPool();
//單個執行緒排程執行器,產生單個執行緒執行任務,採用schedule方法可以延期或者定期的執行任務
executorSingleThreadScheduledExcutor = Executors
.newSingleThreadScheduledExecutor();
//排程執行緒執行器,當有任務的時候,建立執行緒,直到預設執行緒數量(引數指定),
//當執行任務所需的執行緒多於該數目的時候,
//自動拓展執行緒數量,沒有上限;
//如果執行緒空閒則被回收,直到預設執行緒數量。
executorScheduledThreadPool = Executors.newScheduledThreadPool(ThreadSize);
//自定義執行緒池
bqueue = new ArrayBlockingQueue<Runnable>(20);
//第三個引數是執行緒池執行緒所允許的空閒時間
//第四個引數是執行緒池執行緒空閒時間的單位
//第五個引數是緩衝任務佇列
//第六個引數是緩衝區滿的時候,對任務的處理策略
//第六個引數有如下幾種選擇:
//ThreadPoolExecutor.AbortPolicy()
//丟擲java.util.concurrent.RejectedExecutionException異常
//ThreadPoolExecutor.CallerRunsPolicy()
//重試添加當前的任務,即自動以任務為引數,再次呼叫execute()方法
//ThreadPoolExecutor.DiscardOldestPolicy()
//拋棄一個已有的任務(拋棄當前任務佇列的頭部任務,即最開始加入的任務)
//ThreadPoolExecutor.DiscardPolicy()
//拋棄當前這個任務
pool = new ThreadPoolExecutor(
ThreadSize,ThreadMaxSize,2,TimeUnit.MILLISECONDS,bqueue
,new ThreadPoolExecutor.DiscardOldestPolicy());
}
public void endJavaThreadPoolManager(){
executorSingleThreadExecutor.shutdown();
executorFixedThreadPool.shutdown();
executorCachedThreadPool.shutdown();
executorSingleThreadScheduledExcutor.shutdown();
executorScheduledThreadPool.shutdown();
pool.shutdown();
}
}
//使用
package poolmanager;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class MainThread {
public static void main(String[] args) throws InterruptedException {
JavaThreadPoolManager jThreadPoolManager = new JavaThreadPoolManager();
jThreadPoolManager.startJavaThreadPoolManager();
RunnableTask command1 = new RunnableTask(1);
CallableTask command2 = new CallableTask(2);
jThreadPoolManager.executorSingleThreadExecutor.execute(command1);
jThreadPoolManager.executorSingleThreadExecutor.submit(command2);
jThreadPoolManager.executorFixedThreadPool.execute(command1);
jThreadPoolManager.executorFixedThreadPool.submit(command2);
jThreadPoolManager.executorCachedThreadPool.execute(command1);
jThreadPoolManager.executorCachedThreadPool.submit(command2);
jThreadPoolManager.executorScheduledThreadPool.execute(command1);
jThreadPoolManager.executorScheduledThreadPool.submit(command2);
jThreadPoolManager.executorSingleThreadScheduledExcutor.execute(command1);
jThreadPoolManager.executorSingleThreadScheduledExcutor.submit(command2);
jThreadPoolManager.executorScheduledThreadPool.execute(command1);
jThreadPoolManager.executorScheduledThreadPool.schedule(command1,60, TimeUnit.SECONDS);
jThreadPoolManager.executorScheduledThreadPool.schedule(command2,60, TimeUnit.SECONDS);
jThreadPoolManager.executorSingleThreadScheduledExcutor.scheduleWithFixedDelay(command1, 0, 20, TimeUnit.SECONDS);
Future<String> returnFuture =
jThreadPoolManager.executorSingleThreadScheduledExcutor.submit(command2);
System.out.println(returnFuture.isCancelled());
System.out.println(returnFuture.isDone());
returnFuture.cancel(true);
System.out.println(returnFuture.isCancelled());
System.out.println(returnFuture.isDone());
String resultString =null;
if(!returnFuture.isCancelled()){
try {
resultString = returnFuture.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println(resultString);
}
jThreadPoolManager.pool.execute(command1);
}
}
三,再討論
我們看到自定義執行緒池的時候,可以自定義任務佇列,其中有如下幾種任務佇列的自定義方法:
1,直接提交 選擇任務佇列為 SynchronousQueue
的,則預設的佇列大小為1,在這種情況下,如果沒有空閒執行緒,則往往直接建立新的執行緒執行該任務,因此往往需要不設立執行緒的數目上限。該策略可以避免在處理可能具有內部依賴的請求集時出現鎖,因為其任務執行順序必然是先加入的先被執行。
2,無界佇列 選擇任務佇列為LinkedBlockingQueue 其意義即任務佇列大小無限制。這樣建立的執行緒往往不超過corePoolSize。因此maximumPoolSize的值就沒有什麼意義。適合於任務相互獨立。
3,有界佇列 使用具有最大執行緒數目限制maximumPoolSize的時候,有界佇列例如ArrayBlockingQueue有助於防止資源耗盡,但可能較難控制。
使用大佇列小池子有利於降低CPU使用,上下文切換,但是吞吐量不高;如果小佇列大池子,則CPU使用率高,但是可能排程開銷大,也會降低吞吐量。
另外,選擇上述任務佇列的方案,需要與corePoolSize和maximumPoolSizes這對引數配合。
因此,執行緒池的選擇要根據任務的型別,比如任務是數量大、單個執行時間短,還是任務數量小、單個執行長,還是任務數量又多,執行時間又長,來進行不同的選擇。而自定義執行緒池也是需要根據任務的型別選擇任務佇列、corePoolSize和maximumPoolSizes。
此外,keepAliveTime和maximumPoolSize及BlockingQueue的型別均有關係。如果BlockingQueue是無界的,那麼永遠不會觸發maximumPoolSize,自然keepAliveTime也就沒有了意義。如果很容易就產生執行緒的回收,也會導致效能下降。
參考:
1,JDK Document
2, http://www.cnblogs.com/jersey/archive/2011/03/30/2000231.html