Java併發程式設計:執行緒池的使用
Thread和Runnable
首先,多執行緒的實現方式兩種:一種是繼承Thread類,另一種是實現Runnable介面。
那麼這兩種方法的區別何在?該如何選擇?
第一:他們之間的關係
檢視J2EE的API看到
Thread類中: public class Thread extends Object implements Runnable
Runnable介面:public interfaceRunnable
明顯可知兩者:Thread類是Runnable介面的一個實現類,那麼Runnable介面是何用?
文件解釋:
Runnable 介面應該由那些打算通過某一執行緒執行其例項的類來實現。類必須定義一個稱為 run 的無引數方法。
設計該介面的目的是為希望在活動時執行程式碼的物件提供一個公共協議。例如,Thread 類實現了 Runnable。啟用的意思是說某個執行緒已啟動並且尚未停止。
也就是說Runnable提供的是一種執行緒執行規範,具體執行執行緒需要通過它的實現類
第二:通過原始碼分析
我們以public class Thread1 extends Thread 這個自定義執行緒來追本溯源:首先檢視Thread類
其中定義了一個private Runnable target; 定義了Runnable型別的屬性target,檢視哪裡有引用
幾個方法:
private void init(ThreadGroup g, Runnable target, String name, long stackSize) {。。。} public Thread() { init(null, null, "Thread-" + nextThreadNum(), 0); } public Thread(Runnable target) { init(null, target, "Thread-" + nextThreadNum(), 0); } public Thread(ThreadGroup group, Runnable target) { init(group, target, "Thread-" + nextThreadNum(), 0); } public Thread(String name) { init(null, null, name, 0); } public Thread(ThreadGroup group, String name) { init(group, null, name, 0); } public Thread(Runnable target, String name) { init(null, target, name, 0); } public Thread(ThreadGroup group, Runnable target, String name) { init(group, target, name, 0); } public Thread(ThreadGroup group, Runnable target, String name, long stackSize) { init(group, target, name, stackSize); }
以上列出了Thread的一個初始化方法init()和所有的構造方法:可以知道,構造方法需要呼叫init()方法初始化執行緒物件,有一個Runnable型別的target物件也參與初始化。
我們所知道的Thread類進行執行執行緒時是呼叫start()方法,我們也來檢視這個方法:
public synchronized void start() { /** * This method is not invoked for the main method thread or "system" * group threads created/set up by the VM. Any new functionality added * to this method in the future may have to also be added to the VM. * * A zero status value corresponds to state "NEW". */ if (threadStatus != 0) throw new IllegalThreadStateException(); group.add(this); start0(); if (stopBeforeStart) { stop0(throwableFromStop); } }
可知:當呼叫這個start()方法時,使該執行緒開始執行;Java 虛擬機器呼叫該執行緒的 run 方法。結果是兩個執行緒併發地執行;當前執行緒(從呼叫返回給 start 方法)和另一個執行緒(執行其 run 方法)。
這個方法僅作了執行緒狀態的判斷(保證一個執行緒不多次啟動,多次啟動在JVM看來這是非法的),然後把該執行緒新增到執行緒組(不多做解釋)等待執行。
那麼繼續看run()方法:thread的run只是呼叫了runable的對像,再呼叫runable物件的方法
public void run() {
if (target != null) {
target.run();
}
}
可知,當Runnable實現類物件沒有內容為null,則方法什麼都不執行,如果有實現類物件,就呼叫它實現類物件實現的run()方法。這讓我們想到了一個經典的設計模式:代理模式
到此我們知道:執行緒的執行是依靠Thread類中的start()方法執行,並且由虛擬機器呼叫run()方法,所以我們必須實現run()方法
那麼還是要看一下Runnable介面的設計:
public
interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
可知它只有一個抽象的run()方法,完全是實實在在的執行緒執行規範
第三:通過他們之間的設計模式:代理模式 再次深入
代理模式如圖:
可知,Thread也是Runnable介面的子類,但其沒有完全實現run()方法,所以說如果繼承Thread類實現多執行緒,仍舊需要覆寫run()方法。
看兩種實現多執行緒的基本方式
繼承Thread:
class MyThread extends Thread{
private int ticket = 5;
public void run(){
for(int i = 0;i<100;i++){
if(ticket>0){//判斷是否還有剩餘票
System.out.println("賣票,ticket = "+ticket--);
}
}
}
};
public class ThreadDemo04{
public static void main(String args[]){
MyThread mt1 = new MyThread();
MyThread mt2 = new MyThread();
MyThread mt3 = new MyThread();
mt1.start();//呼叫執行緒主體讓其執行
mt2.start();//三個地方同時賣票
mt3.start();
}
};
實現Runnable:
class MyThread implements Runnable{
private int ticket=5;
public void run(){
for(int i = 0;i<100;i++){
if(ticket>0){
System.out.println("賣票,ticket = "+ticket--);
}
}
}
};
public class RunnableDemo02{
public static void main(String args[]){
MyThread my1 = new MyThread();
new Thread(my1).start(); //啟動三個執行緒
new Thread(my1).start(); //共享my1中資源
new Thread(my1).start();
}
};
可知最後:無論哪種方法都需要實現run()方法,run方法是執行緒的執行主體。並且,執行緒的執行都是呼叫Thread的start()方法。
那麼代理模式中Thread類就充當了代理類,它線上程執行主體執行前作了一些操作然後才執行執行緒的run()。首先說一下代理模式的基本特徵就是對【代理目標進行增強】代理模式就不在這裡詳述。總之,Thread提供了很多有關執行緒執行前、後的操作,然後通過它的start()方法讓JVM自動呼叫目標的run()方法
第四:繼承Thread與實現Runnable介面方法區別
首先看一段程式碼:
new Thread(
new Runnable(){
public void run() {
while(true){
try {
Thread.sleep(500);
} catch (InterruptedException e) {e.printStackTrace(); }
System.out.println("runnable :" + Thread.currentThread().getName());
}
}
}
){
public void run() {
while(true){
try {
Thread.sleep(500);
} catch (InterruptedException e) {e.printStackTrace();}
System.out.println("thread :" + Thread.currentThread().getName());
}
}
}.start();
可以預測一下這段程式碼是執行哪一個run()方法?
根據以前java中基礎知識可知:執行start()方法後,JVM去找run()方法,然後它找到了自己的run()方法,那麼就直接執行自己的run()方法。如果找不到自己的方法它才會去找被代理的run()方法。所以它應該執行的是"thread:。。。"程式碼部分。可以把它放到一個main方法中,通過測試驗證推斷正確:
想說明的是一個面向物件的思想:即如果沒有上方第二個run()塊,那麼它執行的就是匿名Runnable實現類的run()方法。這說明什麼,說明,Thread相當於一個執行者,而執行的程式碼塊在Runnable實現類中定義好。這樣實現執行與原始碼的分離,體現了面向物件的思想。這也是他們之間的一個比較大的區別。
其他區別:
實現Runnable介面可以實現資源共享,Thread無法完成資源共享 ----- 討論第三點的兩段程式碼中:繼承Thread的:結果賣出了15張,各有各的票數。實現Runnable介面的方法:賣出5張,共享了資源
實現Runnable介面比繼承Thread類來實現多執行緒有如下明顯優點:
適合多個相同程式程式碼使用共同資源;
避免由單繼承侷限帶來的影響;
增強程式的健壯性,程式碼能夠被多個執行緒共享,程式碼資料是獨立的;
使用的選擇
通過比對區別可知:
由於面向物件的思想,以及資源共享,程式碼健壯性等,一般都是使用實現Runnable介面來實現多執行緒,也比較推薦
Java併發程式設計:執行緒池的使用
文章的主題是基於下面的舊版的一些執行緒池的講解,再加上1.8的情況與一些補充,下面為舊版原文連結,基本思想沒變,僅進行一些對比解釋
http://www.cnblogs.com/dolphin0520/p/3932921.html
在前面的文章中,我們使用執行緒的時候就去建立一個執行緒,這樣實現起來非常簡便,但是就會有一個問題:
如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。
那麼有沒有一種辦法使得執行緒可以複用,就是執行完一個任務,並不被銷燬,而是可以繼續執行其他的任務?
在Java中可以通過執行緒池來達到這樣的效果。今天我們就來詳細講解一下Java的執行緒池,首先我們從最核心的ThreadPoolExecutor類中的方法講起,然後再講述它的實現原理,接著給出了它的使用示例,最後討論了一下如何合理配置執行緒池的大小。
以下是本文的目錄大綱:
一.Java中的ThreadPoolExecutor類
二.深入剖析執行緒池實現原理
三.使用示例
四.如何合理配置執行緒池的大小
若有不正之處請多多諒解,並歡迎批評指正。
一.Java中的ThreadPoolExecutor類
java.uitl.concurrent.ThreadPoolExecutor類是執行緒池中最核心的一個類,因此如果要透徹地瞭解Java中的執行緒池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現原始碼。
在ThreadPoolExecutor類中提供了四個構造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
從上面的程式碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,通過觀察每個構造器的原始碼具體實現,發現前面三個構造器都是呼叫的第四個構造器進行的初始化工作。
下面解釋下一下構造器中各個引數的含義:
- corePoolSize:核心池的大小,這個引數跟後面講述的執行緒池的實現原理有非常大的關係。在建立了執行緒池後,預設情況下,執行緒池中並沒有任何執行緒,而是等待有任務到來才建立執行緒去執行任務,除非呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預建立執行緒的意思,即在沒有任務到來之前就建立corePoolSize個執行緒或者一個執行緒。預設情況下,在建立了執行緒池後,執行緒池中的執行緒數為0,當有任務來之後,就會建立一個執行緒去執行任務,當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取隊列當中;
- maximumPoolSize:執行緒池最大執行緒數,這個引數也是一個非常重要的引數,它表示線上程池中最多能建立多少個執行緒;
- keepAliveTime:表示執行緒沒有任務執行時最多保持多久時間會終止。預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用,直到執行緒池中的執行緒數不大於corePoolSize,即當執行緒池中的執行緒數大於corePoolSize時,如果一個執行緒空閒的時間達到keepAliveTime,則會終止,直到執行緒池中的執行緒數不超過corePoolSize。但是如果呼叫了allowCoreThreadTimeOut(boolean)方法,線上程池中的執行緒數不大於corePoolSize時,keepAliveTime引數也會起作用,直到執行緒池中的執行緒數為0;
- unit:引數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
- workQueue:一個阻塞佇列,用來儲存等待執行的任務,這個引數的選擇也很重要,會對執行緒池的執行過程產生重大影響,一般來說,這裡的阻塞佇列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。執行緒池的排隊策略與BlockingQueue有關。
- threadFactory:執行緒工廠,主要用來建立執行緒;
- handler:表示當拒絕處理任務時的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
從上面給出的ThreadPoolExecutor類的程式碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實現:
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
};
}
AbstractExecutorService是一個抽象類,它實現了ExecutorService介面。
我們接著看ExecutorService介面的實現:
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
而ExecutorService又是繼承了Executor介面,我們看一下Executor介面的實現:
public interface Executor {
void execute(Runnable command);
}
到這裡,大家應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關係了。
Executor是一個頂層介面,在它裡面只聲明瞭一個方法execute(Runnable),返回值為void,引數為Runnable型別,從字面意思可以理解,就是用來執行傳進去的任務的;
然後ExecutorService介面繼承了Executor介面,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象類AbstractExecutorService實現了ExecutorService介面,基本實現了ExecutorService中宣告的所有方法;
然後ThreadPoolExecutor繼承了類AbstractExecutorService。
在ThreadPoolExecutor類中有幾個非常重要的方法:
execute()
submit()
shutdown()
shutdownNow()
execute()方法實際上是Executor中宣告的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向執行緒池提交一個任務,交由執行緒池去執行。
submit()方法是在ExecutorService中宣告的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並沒有對其進行重寫,這個方法也是用來向執行緒池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是呼叫的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。
shutdown()和shutdownNow()是用來關閉執行緒池的。
還有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與執行緒池相關屬性的方法,有興趣的朋友可以自行查閱API。
二.深入剖析執行緒池實現原理
在上一節我們從巨集觀上介紹了ThreadPoolExecutor,下面我們來深入解析一下執行緒池的具體實現原理,將從下面幾個方面講解:
1.執行緒池狀態
2.任務的執行
3.執行緒池中的執行緒初始化
4.任務快取佇列及排隊策略
5.任務拒絕策略
6.執行緒池的關閉
7.執行緒池容量的動態調整
1.執行緒池狀態
在ThreadPoolExecutor中定義了一個volatile變數,另外定義了幾個static final變量表示執行緒池的各個狀態
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
- runState表示當前執行緒池的狀態,它是一個volatile變數用來保證執行緒之間的可見性;
下面的幾個static final變量表示runState可能的幾個取值。
當建立執行緒池後,初始時,執行緒池處於RUNNING狀態;
-
如果呼叫了shutdown()方法,則執行緒池處於SHUTDOWN狀態,此時執行緒池不能夠接受新的任務,它會等待所有任務執行完畢;
-
如果呼叫了shutdownNow()方法,則執行緒池處於STOP狀態,此時執行緒池不能接受新的任務,並且會去嘗試終止正在執行的任務;
-
當執行緒池處於SHUTDOWN或STOP狀態,並且所有工作執行緒已經銷燬,任務快取佇列已經清空或執行結束後,執行緒池被設定為TERMINATED狀態。
2.任務的執行
在瞭解將任務提交給執行緒池到任務執行完畢整個過程之前,我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變數:
private final BlockingQueue<Runnable> workQueue; //任務快取佇列,用來存放等待執行的任務
private final ReentrantLock mainLock = new ReentrantLock(); //執行緒池的主要狀態鎖,對執行緒池狀態(比如執行緒池大小 )(runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工作集
private volatile long keepAliveTime; //執行緒存貨時間
private volatile boolean allowCoreThreadTimeOut; //是否允許為核心執行緒設定存活時間
private volatile int corePoolSize; //核心池的大小(即執行緒池中的執行緒數目大於這個引數時,提交的任務會被放進任務快取佇列)
private volatile int maximumPoolSize; //執行緒池最大能容忍的執行緒數
private volatile int poolSize; //執行緒池中當前的執行緒數
private volatile RejectedExecutionHandler handler; //任務拒絕策略
private volatile ThreadFactory threadFactory; //執行緒工廠,用來建立執行緒
private int largestPoolSize; //用來記錄執行緒池中曾經出現過的最大執行緒數
private long completedTaskCount; //用來記錄已經執行完畢的任務個數
每個變數的作用都已經標明出來了,這裡要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變數。
- corePoolSize在很多地方被翻譯成核心池大小,其實我的理解這個就是執行緒池的大小。舉個簡單的例子:
假如有一個工廠,工廠裡面有10個工人,每個工人同時只能做一件任務。
因此只要當10個工人中有工人是空閒的,來了任務就分配給空閒的工人做;
當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待;
如果說新任務數目增長的速度遠遠大於工人做任務的速度,那麼此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來;
然後就將任務也分配給這4個臨時工人做;
如果說著14個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。
當這14個工人當中有人空閒時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。
-
這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
-
也就是說corePoolSize就是執行緒池大小,maximumPoolSize在我看來是執行緒池的一種補救措施,即任務量突然過大時的一種補救措施。
不過為了方便理解,在本文後面還是將corePoolSize翻譯成核心池大小。
- largestPoolSize只是一個用來起記錄作用的變數,用來記錄執行緒池中曾經有過的最大執行緒數目,跟執行緒池的容量沒有任何關係。
下面我們進入正題,看一下任務從提交到最終執行完畢經歷了哪些過程。
在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務,但是實際上submit方法裡面最終呼叫的還是execute()方法,所以我們只需要研究execute()方法的實現原理即可:
public void execute(Runnable command) {//舊版
if (command == null)//檢視
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
public void execute(Runnable command) {//新版,本質上差不多,將addIfUnderCorePoolSize(command)直接加到了addworker裡面
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
上面的程式碼可能看起來不是那麼容易理解,下面我們一句一句解釋:
首先,判斷提交的任務command是否為null,若是null,則丟擲空指標異常;
接著是這句,這句要好好理解一下:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
由於是或條件運算子,所以先計算前半部分的值,如果執行緒池中當前執行緒數不小於核心池大小,那麼就會直接進入下面的if語句塊了。
如果執行緒池中當前執行緒數小於核心池大小,則接著執行後半部分,也就是執行
addIfUnderCorePoolSize(command)
如果執行完addIfUnderCorePoolSize這個方法返回false,則繼續執行下面的if語句塊,否則整個方法就直接執行完畢了。
如果執行完addIfUnderCorePoolSize這個方法返回false,然後接著判斷:
if (runState == RUNNING && workQueue.offer(command))
如果當前執行緒池處於RUNNING狀態,則將任務放入任務快取佇列;如果當前執行緒池不處於RUNNING狀態或者任務放入快取佇列失敗,則執行:
addIfUnderMaximumPoolSize(command)
如果執行addIfUnderMaximumPoolSize方法失敗,則執行reject()方法進行任務拒絕處理。
回到前面:
if (runState == RUNNING && workQueue.offer(command))
這句的執行,如果說當前執行緒池處於RUNNING狀態且將任務放入任務快取佇列成功,則繼續進行判斷:
if (runState != RUNNING || poolSize == 0)
這句判斷是為了防止在將此任務新增進任務快取佇列的同時其他執行緒突然呼叫shutdown或者shutdownNow方法關閉了執行緒池的一種應急措施。如果是這樣就執行:
ensureQueuedTaskHandled(command)
進行應急處理,從名字可以看出是保證 新增到任務快取佇列中的任務得到處理。
我們接著看2個關鍵方法的實現:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:重點
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask); //建立執行緒去執行firstTask任務
} finally {
mainLock.unlock();
}
if (t == null) return false;
t.start();//注意了。執行緒是在這裡啟動的
return true;
}
private boolean addWorker(Runnable firstTask, boolean core) {//新版,將 addIfUnderCorePoolSize與addThread結合了起來。
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//執行緒是在這裡啟動,但是可能會有疑惑下面新版worker程式碼中的runWorker有什麼用
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
這個是addIfUnderCorePoolSize方法的具體實現,從名字可以看出它的意圖就是當低於核心吃大小時執行的方法。下面看其具體實現,首先獲取到鎖,因為這地方涉及到執行緒池狀態的變化,先通過if語句判斷當前執行緒池中的執行緒數目是否小於核心池大小,有朋友也許會有疑問:前面在execute()方法中不是已經判斷過了嗎,只有執行緒池當前執行緒數目小於核心池大小才會執行addIfUnderCorePoolSize方法的,為何這地方還要繼續判斷?原因很簡單,前面的判斷過程中並沒有加鎖,因此可能在execute方法判斷的時候poolSize小於corePoolSize,而判斷完之後,在其他執行緒中又向執行緒池提交了任務,就可能導致poolSize不小於corePoolSize了,所以需要在這個地方繼續判斷。然後接著判斷執行緒池的狀態是否為RUNNING,原因也很簡單,因為有可能在其他執行緒中呼叫了shutdown或者shutdownNow方法。然後就是執行
t = addThread(firstTask);
這個方法也非常關鍵,傳進去的引數為提交的任務,返回值為Thread型別。然後接著在下面判斷t是否為空,為空則表明建立執行緒失敗(即poolSize>=corePoolSize或者runState不等於RUNNING),否則呼叫t.start()方法啟動執行緒。
我們來看一下addThread方法的實現:
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w); //建立一個執行緒,執行任務
if (t != null) {
w.thread = t; //將建立的執行緒的引用賦值為w的成員變數
workers.add(w);
int nt = ++poolSize; //當前執行緒數加1
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}
在addThread方法中,首先用提交的任務建立了一個Worker物件,然後呼叫執行緒工廠threadFactory建立了一個新的執行緒t,然後將執行緒t的引用賦值給了Worker物件的成員變數thread,接著通過workers.add(w)將Worker物件新增到工作集當中。
下面我們看一下Worker類的實現:重點
private final class Worker implements Runnable {
private final ReentrantLock runLock = new ReentrantLock();
private Runnable firstTask;
volatile long completedTasks;
Thread thread;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
boolean isActive() {
return runLock.isLocked();
}
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) {
try {
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
void interruptNow() {
thread.interrupt();
}
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
boolean ran = false;
beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,使用者可以根據
//自己需要過載這個方法和後面的afterExecute方法來進行一些統計資訊,比如某個任務的執行時間等
try {
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this); //當任務佇列中沒有任務時,進行清理工作
}
}
}
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable//新版,其中與舊版一樣,執行緒是再addworker(addThread)後啟動的,那麼worker中的run方法以及方法中的runWorker又是如何實現的呢? 關鍵就在於構造方法中的this.thread = getThreadFactory().newThread(this)這個語句,其中過的this為當前的worker物件本身,而上面可以看出worker是實現了Runable介面的,因此addworker中的執行緒t是基於Worker物件本身啟動的,而thread的run方法就是對runable物件的run的呼叫,而worker重寫了run方法,因此就能start0()呼叫JVM後呼叫run時呼叫runworker方法,繼而呼叫gettask等。
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
它實際上實現了Runnable介面,因此上面的Thread t = threadFactory.newThread(w);效果跟下面這句的效果基本一樣:
Thread t = new Thread(w);
相當於傳進去了一個Runnable任務,線上程t中執行這個Runnable。
既然Worker實現了Runnable介面,那麼自然最核心的方法便是run()方法了:
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
final void runWorker(Worker w) {//新版 worker自己作為runnable物件(以便thread啟動時基於worker啟動來實現自己需要的邏輯)時接收了runnable物件,最後還是用接收的runnable物件run。
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
從run方法的實現可以看出,它首先執行的是通過構造器傳進來的任務firstTask,在呼叫runTask()執行完firstTask之後,在while迴圈裡面不斷通過getTask()去取新的任務來執行,那麼去哪裡取呢?自然是從任務快取佇列裡面去取,getTask是ThreadPoolExecutor類中的方法,並不是Worker類中的方法,下面是getTask方法的實現:
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果執行緒數大於核心池大小或者允許為核心池執行緒設定空閒時間,
//則通過poll取任務,若等待一定的時間取不到任務,則返回null
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) { //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers(); //中斷處於空閒狀態的worker
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
private Runnable getTask() {//新版
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
在getTask中,先判斷當前執行緒池狀態,如果runState大於SHUTDOWN(即為STOP或者TERMINATED),則直接返回null。
如果runState為SHUTDOWN或者RUNNING,則從任務快取佇列取任務。
如果當前執行緒池的執行緒數大於核心池大小corePoolSize或者允許為核心池中的執行緒設定空閒存活時間,則呼叫poll(time,timeUnit)來取任務,這個方法會等待一定的時間,如果取不到任務就返回null。
然後判斷取到的任務r是否為null,為null則通過呼叫workerCanExit()方法來判斷當前worker是否可以退出,我們看一下workerCanExit()的實現:
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
//如果runState大於等於STOP,或者任務快取佇列為空了
//或者 允許為核心池執行緒設定空閒存活時間並且執行緒池中的執行緒數目大於1
try {
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize));
} finally {
mainLock.unlock();
}
return canExit;
}
也就是說如果執行緒池處於STOP狀態、或者任務佇列已為空或者允許為核心池執行緒設定空閒存活時間並且執行緒數大於1時,允許worker退出。如果允許worker退出,則呼叫interruptIdleWorkers()中斷處於空閒狀態的worker,我們看一下interruptIdleWorkers()的實現:
void interruptIdleWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) //實際上呼叫的是worker的interruptIfIdle()方法
w.interruptIfIdle();
} finally {
mainLock.unlock();
}
}
從實現可以看出,它實際上呼叫的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) { //注意這裡,是呼叫tryLock()來獲取鎖的,因為如果當前worker正在執行任務,鎖已經被獲取了,是無法獲取到鎖的
//如果成功獲取了鎖,說明當前worker處於空閒狀態
try {
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
這裡有一個非常巧妙的設計方式,假如我們來設計執行緒池,可能會有一個任務分派執行緒,當發現有執行緒空閒時,就從任務快取佇列中取一個任務交給空閒執行緒執行。但是在這裡,並沒有採用這樣的方式,因為這樣會要額外地對任務分派執行緒進行管理,無形地會增加難度和複雜度,這裡直接讓執行完任務的執行緒去任務快取佇列裡面取任務來執行。
我們再看addIfUnderMaximumPoolSize方法的實現,這個方法的實現思想和addIfUnderCorePoolSize方法的實現思想非常相似,唯一的區別在於addIfUnderMaximumPoolSize方法是線上程池中的執行緒數達到了核心池大小並且往任務佇列中新增任務失敗的情況下執行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
看到沒有,其實它和addIfUnderCorePoolSize方法的實現基本一模一樣,只是if語句判斷條件中的poolSize < maximumPoolSize不同而已。
到這裡,大部分朋友應該對任務提交給執行緒池之後到被執行的整個過程有了一個基本的瞭解,下面總結一下:
1)首先,要清楚corePoolSize和maximumPoolSize的含義;
2)其次,要知道Worker是用來起到什麼作用的;
3)要知道任務提交給執行緒池之後的處理策略,這裡總結一下主要有4點:
如果當前執行緒池中的執行緒數目小於corePoolSize,則每來一個任務,就會建立一個執行緒去執行這個任務;
如果當前執行緒池中的執行緒數目>=corePoolSize,則每來一個任務,會嘗試將其新增到任務快取隊列當中,若新增成功,則該任務會等待空閒執行緒將其取出去執行;若新增失敗(一般來說是任務快取佇列已滿),則會嘗試建立新的執行緒去執行這個任務;
如果當前執行緒池中的執行緒數目達到maximumPoolSize,則會採取任務拒絕策略進行處理;
如果執行緒池中的執行緒數量大於 corePoolSize時,如果某執行緒空閒時間超過keepAliveTime,執行緒將被終止,直至執行緒池中的執行緒數目不大於corePoolSize;如果允許為核心池中的執行緒設定存活時間,那麼核心池中的執行緒空閒時間超過keepAliveTime,執行緒也會被終止。
3.執行緒池中的執行緒初始化
預設情況下,建立執行緒池之後,執行緒池中是沒有執行緒的,需要提交任務之後才會建立執行緒。
在實際中如果需要執行緒池建立之後立即建立執行緒,可以通過以下兩個方法辦到:
prestartCoreThread():初始化一個核心執行緒;
prestartAllCoreThreads():初始化所有核心執行緒
下面是這2個方法的實現:
public boolean prestartCoreThread() {
return addIfUnderCorePoolSize(null); //注意傳進去的引數是null
}
public int prestartAllCoreThreads() {
int n = 0;
while (addIfUnderCorePoolSize(null))//注意傳進去的引數是null
++n;
return n;
}
注意上面傳進去的引數是null,根據第2小節的分析可知如果傳進去的引數為null,則最後執行執行緒會阻塞在getTask方法中的
1
r = workQueue.take();
即等待任務佇列中有任務。
4.任務快取佇列及排隊策略
在前面我們多次提到了任務快取佇列,即workQueue,它用來存放等待執行的任務。
workQueue的型別為BlockingQueue
1)ArrayBlockingQueue:基於陣列的先進先出佇列,此佇列建立時必須指定大小;
2)LinkedBlockingQueue:基於連結串列的先進先出佇列,如果建立時沒有指定此佇列大小,則預設為Integer.MAX_VALUE;
3)synchronousQueue:這個佇列比較特殊,它不會儲存提交的任務,而是將直接新建一個執行緒來執行新來的任務。
5.任務拒絕策略
當執行緒池的任務快取佇列已滿並且執行緒池中的執行緒數目達到maximumPoolSize,如果還有任務到來就會採取任務拒絕策略,通常有以下四種策略:
ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
從runworker方法中我們可以看到對應的Exception都是儲存在thrownn中,在finally中交給了 afterExecute
進行了處理。
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
6.執行緒池的關閉
ThreadPoolExecutor提供了兩個方法,用於執行緒池的關閉,分別是shutdown()和shutdownNow(),其中:
shutdown():不會立即終止執行緒池,而是要等所有任務快取佇列中的任務都執行完後才終止,但再也不會接受新的任務
shutdownNow():立即終止執行緒池,並嘗試打斷正在執行的任務,並且清空任務快取佇列,返回尚未執行的任務
7.執行緒池容量的動態調整
ThreadPoolExecutor提供了動態調整執行緒池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
setCorePoolSize:設定核心池大小
setMaximumPoolSize:設定執行緒池最大能建立的執行緒數目大小
當上述引數從小變大時,ThreadPoolExecutor進行執行緒賦值,還可能立即建立新的執行緒來執行任務。
三.使用示例
前面我們討論了關於執行緒池的實現原理,這一節我們來看一下它的具體使用:
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5)
for(int i=0;i<15;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("執行緒池中執行緒數目:"+executor.getPoolSize()+",佇列中等待執行的任務數目:"+
executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在執行task "+taskNum);
try {
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"執行完畢");
}
}
執行結果:
正在執行task 0
執行緒池中執行緒數目:1,佇列中等待執行的任務數目:0,已執行玩別的任務數目:0
執行緒池中執行緒數目:2,佇列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 1
執行緒池中執行緒數目:3,佇列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 2
執行緒池中執行緒數目:4,佇列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 3
執行緒池中執行緒數目:5,佇列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 4
執行緒池中執行緒數目:5,佇列中等待執行的任務數目:1,已執行玩別的任務數目:0
執行緒池中執行緒數目:5,佇列中等待執行的任務數目:2,已執行玩別的任務數目:0
執行緒池中執行緒數目:5,佇列中等待執行的任務數目:3,已執行玩別的任務數目:0
執行緒池中執行緒數目:5,佇列中等待執行的任務數目:4,已執行玩別的任務數目:0
執行緒池中執行緒數目:5,佇列中等待執行的任務數目:5,已執行玩別的任務數目:0
執行緒池中執行緒數目:6,佇列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 10
執行緒池中執行緒數目:7,佇列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 11
執行緒池中執行緒數目:8,佇列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 12
執行緒池中執行緒數目:9,佇列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 13
執行緒池中執行緒數目:10,佇列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 14
task 3執行完畢
task 0執行完畢
task 2執行完畢
task 1執行完畢
正在執行task 8
正在執行task 7
正在執行task 6
正在執行task 5
task 4執行完畢
task 10執行完畢
task 11執行完畢
task 13執行完畢
task 12執行完畢
正在執行task 9
task 14執行完畢
task 8執行完畢
task 5執行完畢
task 7執行完畢
task 6執行完畢
task 9執行完畢
從執行結果可以看出,當執行緒池中執行緒的數目大於5時,便將任務放入任務快取佇列裡面,當任務快取佇列滿了之後,便建立新的執行緒。如果上面程式中,將for迴圈中改成執行20個任務,就會丟擲任務拒絕異常了。
不過在java doc中,並不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來建立執行緒池:
Executors.newCachedThreadPool(); //建立一個緩衝池,緩衝池容量大小為Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //建立容量為1的緩衝池
Executors.newFixedThreadPool(int); //建立固定容量大小的緩衝池
下面是這三個靜態方法的具體實現;
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
從它們的具體實現來看,它們實際上也是呼叫了ThreadPoolExecutor,只不過引數都已配置好了。
newFixedThreadPool建立的執行緒池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設定為1,也使用的LinkedBlockingQueue;
newCachedThreadPool將corePoolSize設定為0,將maximumPoolSize設定為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就建立執行緒執行,當執行緒空閒超過60秒,就銷燬執行緒。
實際中,如果Executors提供的三個靜態方法能滿足要求,就儘量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的引數有點麻煩,要根據實際任務的型別和數量來進行配置。
另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。
四.如何合理配置執行緒池的大小
本節來討論一個比較重要的話題:如何合理配置執行緒池大小,僅供參考。
一般需要根據任務的型別來配置執行緒池大小:
如果是CPU密集型任務,就需要儘量壓榨CPU,參考值可以設為 NCPU+1
如果是IO密集型任務,參考值可以設定為2*NCPU
當然,這只是一個參考值,具體的設定還需要根據實際情況進行調整,比如可以先將執行緒池大小設定為參考值,再觀察任務執行情況和系統負載、資源利用率來進行適當調整。