1. 程式人生 > >JAVA執行緒池,ThreadPoolExecutor實現的四種執行緒池

JAVA執行緒池,ThreadPoolExecutor實現的四種執行緒池

執行緒池

在JAVA中,我們使用執行緒的時候就去建立一個執行緒,這樣實現起來非常簡便。但是就會有一個問題,如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。

那麼有沒有一種辦法使得執行緒可以複用,就是執行完一個任務,並不被銷燬,而是可以繼續執行其他的任務?這樣Java執行緒池出現了。

new Thread的弊端:

a. 每次new Thread新建物件效能差。

b. 執行緒缺乏統一管理,可能無限制新建執行緒,相互之間競爭,及可能佔用過多系統資源導致宕機或oom。

c. 缺乏更多功能,如定時執行、定期執行、執行緒中斷。

相比new Thread,執行緒池的好處:

a. 重用存在的執行緒,減少物件建立、消亡的開銷,效能佳。

b. 可有效控制最大併發執行緒數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。

c. 提供定時執行、定期執行、單執行緒、併發數控制等功能。

ThreadPoolExecutor詳解

ThreadPoolExecutor完整構造方法是:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 

corePoolSize - 核心池的大小,這個引數跟後面講述的執行緒池的實現原理有非常大的關係。在建立了執行緒池後,預設情況下,執行緒池中並沒有任何執行緒,而是等待有任務到來才建立執行緒去執行任務,除非呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預建立執行緒的意思,即在沒有任務到來之前就建立corePoolSize個執行緒或者一個執行緒。預設情況下,在建立了執行緒池後,執行緒池中的執行緒數為0,當有任務來之後,就會建立一個執行緒去執行任務,當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取隊列當中。

maximumPoolSize - 執行緒池最大執行緒數,這個引數也是一個非常重要的引數,它表示線上程池中最多能建立多少個執行緒。

keepAliveTime - 表示執行緒沒有任務執行時最多保持多久時間會終止。預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用,直到執行緒池中的執行緒數不大於corePoolSize,即當執行緒池中的執行緒數大於corePoolSize時,如果一個執行緒空閒的時間達到keepAliveTime,則會終止,直到執行緒池中的執行緒數不超過corePoolSize。但是如果呼叫了allowCoreThreadTimeOut(boolean)方法,線上程池中的執行緒數不大於corePoolSize時,keepAliveTime引數也會起作用,直到執行緒池中的執行緒數為0。

unit - 引數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒

workQueue - 一個阻塞佇列,用來儲存等待執行的任務,這個引數的選擇也很重要,會對執行緒池的執行過程產生重大影響,一般來說,這裡的阻塞佇列有以下幾種選擇:

ArrayBlockingQueue;
LinkedBlockingQueue;

SynchronousQueue;

ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。執行緒池的排隊策略與BlockingQueue有關。

threadFactory - 執行程式建立新執行緒時使用的工廠,主要用來建立執行緒。

handler - 表示當拒絕處理任務時的策略,有以下四種取值:

ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務

Java通過Executors提供四種執行緒池,底層均是由ThreadPoolExecutor實現的,分別為:

newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。

newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。

newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。

newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

(1)newCachedThreadPool

建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。示例程式碼如下:

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();  
for (int i = 0; i < 10; i++) {  
    final int index = i;  
    try {  
        Thread.sleep(index * 1000);  
    } catch (InterruptedException e) {  
        e.printStackTrace();  
    }  
  
    cachedThreadPool.execute(new Runnable() {  
  
        @Override  
        public void run() {  
            System.out.println(index);  
        }  
    });  
}

執行緒池為無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的執行緒,而不用每次新建執行緒。

(2)newFixedThreadPool

建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。示例程式碼如下:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);  
for (int i = 0; i < 10; i++) {  
    final int index = i;  
    fixedThreadPool.execute(new Runnable() {  
  
  
        @Override  
        public void run() {  
            try {  
                System.out.println(index);  
                Thread.sleep(2000);  
            } catch (InterruptedException e) {  
                // TODO Auto-generated catch block  
                e.printStackTrace();  
            }  
        }  
    });  
}  
因為執行緒池大小為3,每個任務輸出index後sleep 2秒,所以每兩秒列印3個數字。

定長執行緒池的大小最好根據系統資源進行設定。如Runtime.getRuntime().availableProcessors()。可參考PreloadDataCache。

(3)newScheduledThreadPool

建立一個定長執行緒池,支援定時及週期性任務執行。延遲執行示例程式碼如下:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);  
scheduledThreadPool.schedule(new Runnable() {  
  
    @Override  
    public void run() {  
        System.out.println("delay 3 seconds");  
    }  
}, 3, TimeUnit.SECONDS);  

表示延遲3秒執行。定期執行示例程式碼如下:

scheduledThreadPool.scheduleAtFixedRate(new Runnable() {  
  
    @Override  
    public void run() {  
        System.out.println("delay 1 seconds, and excute every 3 seconds");  
    }  
}, 1, 3, TimeUnit.SECONDS);  

表示延遲1秒後每3秒執行一次。

(4)newSingleThreadExecutor

建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。示例程式碼如下:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();  
for (int i = 0; i < 10; i++) {  
    final int index = i;  
    singleThreadExecutor.execute(new Runnable() {  
  
        @Override  
        public void run() {  
            try {  
                System.out.println(index);  
                Thread.sleep(2000);  
            } catch (InterruptedException e) {  
                // TODO Auto-generated catch block  
                e.printStackTrace();  
            }  
        }  
    });  
}  

結果依次輸出,相當於順序執行各個任務。

ThreadPoolExecutor例子一:使用直接提交策略,也即SynchronousQueue

首先SynchronousQueue是無界的,也就是說他存數任務的能力是沒有限制的,但是由於該Queue本身的特性,在某次新增元素後必須等待其他執行緒取走後才能繼續新增。在這裡不是核心執行緒便是新建立的執行緒,但是我們試想一樣下,下面的場景。

我們使用一下引數構造ThreadPoolExecutor:

new ThreadPoolExecutor(   
2, 3, 30, TimeUnit.SECONDS,    
new  SynchronousQueue<Runnable>(),    
new RecorderThreadFactory("CookieRecorderPool"),    
new ThreadPoolExecutor.CallerRunsPolicy());  

當核心執行緒已經有2個正在執行。

此時繼續來了一個任務(A),根據前面介紹的“如果執行的執行緒等於或多於 corePoolSize,則Executor始終首選將請求加入佇列,而不新增新的執行緒。”,所以A被新增到queue中。

又來了一個任務(B),且核心2個執行緒還沒有忙完,OK,接下來首先嚐試1中描述,但是由於使用的SynchronousQueue,所以一定無法加入進去。

此時便滿足了上面提到的“如果無法將請求加入佇列,則建立新的執行緒,除非建立此執行緒超出maximumPoolSize,在這種情況下,任務將被拒絕。”,所以必然會新建一個執行緒來執行這個任務。

暫時還可以,但是如果這三個任務都還沒完成,連續來了兩個任務,第一個新增入queue中,後一個呢?queue中無法插入,而執行緒數達到了maximumPoolSize,所以只好執行異常策略了。

所以在使用SynchronousQueue通常要求maximumPoolSize是無界的,這樣就可以避免上述情況發生(如果希望限制就直接使用有界佇列)。對於使用SynchronousQueue的作用jdk中寫的很清楚:此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。

什麼意思?如果你的任務A1,A2有內部關聯,A1需要先執行,那麼先提交A1,再提交A2,當使用SynchronousQueue我們可以保證,A1必定先被執行,在A1麼有被執行前,A2不可能新增入queue中。

ThreadPoolExecutor例子二:使用無界佇列策略,即LinkedBlockingQueue

這個就拿newFixedThreadPool來說,根據前文提到的規則:

如果執行的執行緒少於 corePoolSize,則 Executor 始終首選新增新的執行緒,而不進行排隊。那麼當任務繼續增加,會發生什麼呢?

如果執行的執行緒等於或多於 corePoolSize,則 Executor 始終首選將請求加入佇列,而不新增新的執行緒。OK,此時任務變加入佇列之中了,那什麼時候才會新增新執行緒呢?

如果無法將請求加入佇列,則建立新的執行緒,除非建立此執行緒超出 maximumPoolSize,在這種情況下,任務將被拒絕。這裡就很有意思了,可能會出現無法加入佇列嗎?不像SynchronousQueue那樣有其自身的特點,對於無界佇列來說,總是可以加入的(資源耗盡,當然另當別論)。換句說,永遠也不會觸發產生新的執行緒!corePoolSize大小的執行緒數會一直執行,忙完當前的,就從佇列中拿任務開始執行。所以要防止任務瘋長,比如任務執行的實行比較長,而新增任務的速度遠遠超過處理任務的時間,而且還不斷增加,不一會兒就爆了。

ThreadPoolExecutor例子三:有界佇列,使用ArrayBlockingQueue

這個是最為複雜的使用,所以JDK不推薦使用也有些道理。與上面的相比,最大的特點便是可以防止資源耗盡的情況發生。

舉例來說,請看如下構造方法:

new ThreadPoolExecutor(   
2, 4, 30, TimeUnit.SECONDS,    
new ArrayBlockingQueue<Runnable>(2),    
new RecorderThreadFactory("CookieRecorderPool"),    
new ThreadPoolExecutor.CallerRunsPolicy());

假設,所有的任務都永遠無法執行完。

對於首先來的A,B來說直接執行,接下來,如果來了C,D,他們會被放到queue中,如果接下來再來E,F,則增加執行緒執行E,F。但是如果再來任務,佇列無法再接受了,執行緒數也到達最大的限制了,所以就會使用拒絕策略來處理。

拒絕策略:

keepAliveTime

jdk中的解釋是:當執行緒數大於核心時,此為終止前多餘的空閒執行緒等待新任務的最長時間。

有點拗口,其實這個不難理解,在使用了“池”的應用中,大多都有類似的引數需要配置。比如資料庫連線池,DBCP中的maxIdle,minIdle引數。

什麼意思?接著上面的解釋,後來向老闆派來的工人始終是“借來的”,俗話說“有借就有還”,但這裡的問題就是什麼時候還了,如果借來的工人剛完成一個任務就還回去,後來發現任務還有,那豈不是又要去借?這一來一往,老闆肯定頭也大死了。

合理的策略:既然借了,那就多借一會兒。直到“某一段”時間後,發現再也用不到這些工人時,便可以還回去了。這裡的某一段時間便是keepAliveTime的含義,TimeUnit為keepAliveTime值的度量。

RejectedExecutionHandler

另一種情況便是,即使向老闆借了工人,但是任務還是繼續過來,還是忙不過來,這時整個隊伍只好拒絕接受了。

RejectedExecutionHandler介面提供了對於拒絕任務的處理的自定方法的機會。在ThreadPoolExecutor中已經預設包含了4中策略,因為原始碼非常簡單,這裡直接貼出來。

CallerRunsPolicy

執行緒呼叫執行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
	if (!e.isShutdown()) {
		r.run();
	}
}

這個策略顯然不想放棄執行任務。但是由於池中已經沒有任何資源了,那麼就直接使用呼叫該execute的執行緒本身來執行。

AbortPolicy

處理程式遭到拒絕將丟擲執行時RejectedExecutionException。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
	throw new RejectedExecutionException();
}

這種策略直接丟擲異常,丟棄任務。

DiscardPolicy

不能執行的任務將被刪除。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

}

這種策略和AbortPolicy幾乎一樣,也是丟棄任務,只不過他不丟擲異常。

DiscardOldestPolicy

如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程)

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
	if (!e.isShutdown()) {
		e.getQueue().poll();
		e.execute(r);
	}
}

該策略就稍微複雜一些,在pool沒有關閉的前提下首先丟掉快取在佇列中的最早的任務,然後重新嘗試執行該任務。這個策略需要適當小心。

設想:如果其他執行緒都還在執行,那麼新來任務踢掉舊任務,快取在queue中,再來一個任務又會踢掉queue中最老任務。

策略總結:

keepAliveTime和maximumPoolSize及BlockingQueue的型別均有關係。如果BlockingQueue是無界的,那麼永遠不會觸發maximumPoolSize,自然keepAliveTime也就沒有了意義。

public static ExecutorService newFixedThreadPool(int nThreads) {
	return new ThreadPoolExecutor(nThreads, nThreads,
	0L, TimeUnit.MILLISECONDS,
	new LinkedBlockingQueue<Runnable>());
}

反之,如果核心數較小,有界BlockingQueue數值又較小,同時keepAliveTime又設的很小,如果任務頻繁,那麼系統就會頻繁的申請回收執行緒。

那麼如何合理配置執行緒池的大小?一般需要根據任務的型別來配置執行緒池大小:

1、如果是CPU密集型任務,就需要儘量壓榨CPU,參考值可以設為 NCPU+1。

2、如果是IO密集型任務,參考值可以設定為2*NCPU。

當然,這只是一個參考值,具體的設定還需要根據實際情況進行調整,比如可以先將執行緒池大小設定為參考值,再觀察任務執行情況和系統負載、資源利用率來進行適當調整。