1. 程式人生 > >執行緒池與執行緒佇列分析-優

執行緒池與執行緒佇列分析-優

·  執行緒池是物件池的一個有用的例子,它能夠節省在建立它們時候的資源開銷。並且執行緒池對系統中的執行緒數量也起到了很好的限制作用。

·  執行緒池中的執行緒數量必須仔細的設定,否則冒然增加執行緒數量只會帶來效能的下降。

·  在定製ThreadPoolExecutor時,遵循KISS原則,通常情況下會提供最好的效能。

執行緒池類為java.util.concurrent.ThreadPollExecutor,常用構造方法為:ThreadPoolExecutor(

int corePoolSize,

int maxnumPollSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

RejectedExecutionHandler handler),各個屬性介紹:

corePoolSize: 執行緒池維護執行緒的最少數量

maxnumPoolSize:執行緒池維護執行緒的最大數量

keepAliveTime:執行緒池維護執行緒所允許的空閒時間

unit:執行緒池維護執行緒所允許的空閒時間的單位

workQueue:執行緒池所使用的緩衝佇列

handler:執行緒池對拒絕任務的處理策略

      一個任務通過execute(Runnable)方法被新增到執行緒池,任務就是一個Runnable型別的物件,任務的執行方法就是Runnable型別物件的run()方法。

當一個任務通過execute(Runnable)方法想新增到執行緒池時:

1.      如果此時執行緒池中數量小於corePoolSize,即使執行緒池中的執行緒都處於空閒狀態,也要建立新的執行緒來處理被新增的任務。

2.      如果此時執行緒池中的數量等於corePoolSize,但是緩衝佇列workQueue未滿,那麼任務放入緩衝佇列

3.      如果此時執行緒池中的數量大於corePoolSize,緩衝佇列workQueue滿,並且執行緒池中的數量小於maximumPoolSize,建新的執行緒來處理被新增的任務。

4.      如果此時執行緒池中的數量大於corePoolSize,緩衝佇列workQueue 滿,並且執行緒池中的數量等於maximumPoolSize,那麼通過 handler所指定的策略來處理此任務。也就是:處理任務的優先順序為:核心執行緒 corePoolSize、任務佇列workQueue、最大執行緒maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務

5.      當執行緒池中的執行緒數量大於 corePoolSize時,如果某執行緒空閒時間超過keepAliveTime,執行緒將被終止。這樣,執行緒池可以動態的調整池中的執行緒數。

unit可選的引數為java.util.concurrent.TimeUnit中的幾個靜態屬性:

NANOSECONDS、

MICROSECONDS、

MILLISECONDS、

SECONDS。

workQueue常用的是:java.util.concurrent.ArrayBlockingQueue

handler有四個選擇:

ThreadPoolExecutor.AbortPolicy()

丟擲java.util.concurrent.RejectedExecutionException異常

ThreadPoolExecutor.CallerRunsPolicy()

當丟擲RejectedExecutionException異常時,會呼叫rejectedExecution方法

(如果主執行緒沒有關閉,則主執行緒呼叫run方法,原始碼如下

publicvoid rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

)

ThreadPoolExecutor.DiscardOldestPolicy()

拋棄舊的任務

ThreadPoolExecutor.DiscardPolicy()

拋棄當前的任務

執行緒池可以解決兩個不同問題:由於減少了每個任務呼叫的開銷,它們通常可以在執行大 量非同步任務時提供增強的效能,並且還可以提供繫結和管理資源(包括執行集合任務時使用的執行緒)的方法。每個ThreadPoolExecutor 還維護 著一些基本的統計資料,如完成的任務數。

排隊有三種通用策略:

直接提交。工作佇列的預設選項是 SynchronousQueue,它將任務直接提 交給執行緒而不保持它們。在此,如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有 內部依賴性的請求集合時出現鎖定。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過佇列所能處理的平均數 連續到達時,此策略允許無界執行緒具有增長的可能性。

無界佇列。使用無界佇列(例如,不具有預定義容量 的 LinkedBlockingQueue)將導致在所有 corePoolSize 執行緒都忙的情況下將新任務加入佇列。這樣,建立的執行緒就不會超 過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合 於使用無界佇列;例如,在 Web 頁伺服器中。這種排隊可用於處理瞬態突發請求,當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有 增長的可能性。

有界佇列。當使用有限的 maximumPoolSizes 時,有界佇列 (如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型 池可以最大限度地降低CPU 使用率、作業系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊 界),則系統可能為超過您許可的更多執行緒安排時間。使用小型佇列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的排程開銷,這樣也會降 低吞吐量。

掛鉤方法

此類提供 protected 可重寫 的 beforeExecute(java.lang.Thread,java.lang.Runnable) 和 afterExecute(java.lang.Runnable,java.lang.Throwable) 方法,這兩種方法分別在執行每個任務之前和之後呼叫。它們可用於操縱執行環境;例如,重新初始化 ThreadLocal、蒐集統計資訊或新增日誌條目。此外,還可以重寫方法 terminated() 來執行 Executor 完全終止後需要完成 的所有特殊處理。

如果掛鉤或回撥方法丟擲異常,則內部輔助執行緒將依次失敗並突然終止。

佇列維護

方法 getQueue() 允許出於監控和除錯目的而訪問工作佇列。強烈反對出於其他任何目的而使用此方法。remove(java.lang.Runnable) 和 purge() 這兩種方法可用於在取消大量已排隊任務時幫助進行儲存回收。

強烈建議程式設計師使用較為方便的 Executors 工廠方 法 Executors.newCachedThreadPool()(無界執行緒池,可以進行自動執行緒回收)、Executors.newFixedThreadPool(int)(固定大小執行緒池) 和 Executors.newSingleThreadExecutor()(單個後臺執行緒),它們均為大多數使用場景預定義了設定。

  newFixedThreadPool和newSingleThreadExectuor預設使用的是一個無限的LinkedBlockingQueue。如果所有的工作者執行緒都處於忙碌狀態,任務會在佇列中等候。如果任務持續快速到達,超過了它們被執行的速度,隊 列也會無限制地增加。穩妥的策略是使用有限佇列,比如ArrayBlockingQueue或有限的LinkedBlockingQueue以及 PriorityBlockingQueue。newCachedThreadPool使用SynchronousQueue,完全繞開佇列,直接將任務由生產者交給工作者執行緒。非常重要的一點,可以使用PriorityBlockingQueue通過優先順序安排任務,這樣幾個子任務嚴格按照順序執行的時候,可以用這個來實現哈。

newFixedThreadPool()/newSingleThreadPool()

建立執行緒數固定大小的執行緒池。 由於使用了LinkedBlockingQueue所以maximumPoolSize 沒用(除非用ArrayBlockingQueue或者指定linkedBlockingQueue的長度),當corePoolSize滿了之後就加入到LinkedBlockingQueue佇列中。每當某個執行緒執行完成之後就從LinkedBlockingQueue佇列中取一個。所以這個是建立固定大小的執行緒池。

無界限的佇列可以是諸如LinkedBlockingQueue這種型別,在這種情況下,任何被提交的任務都不會被拒絕。但是執行緒池會忽略最大執行緒數這一引數,意味著執行緒池的最大執行緒數就變成了設定的最小執行緒數。所以在使用這種佇列時,通常會將最大執行緒數設定的和最小執行緒數相等。這就相當於使用了一個固定 了執行緒數量的執行緒池。

newCachedThreadPool()

建立可緩衝的執行緒池。沒有大小限制。由於corePoolSize為0所以任務會放入SynchronousQueue佇列中,SynchronousQueue只能存放大小為1,所以會立刻新起執行緒,由於maxumumPoolSize為Integer.MAX_VALUE所以可以認為大小為2147483647。受記憶體大小限制。任務佇列是 SynchronousQueue 這個佇列的特點是,它並不能放置任何任務在其佇列中,當有任務被提交時,使用SynchronousQueue的執行緒池會立即為該任務建立一個執行緒(如果 執行緒數量沒有達到最大時,如果達到了最大,那麼該任務會被拒絕)。這種佇列適合於當任務數量較小時採用。也就是說,在使用這種佇列時,未被執行的任務沒有 一個容器來暫時儲存。

關於BlockingQueue補充:

BlockingQueue也是java.util.concurrent下的主要用來控制執行緒同步的工具。

BlockingQueue有四個具體的實現類,根據不同需求,選擇不同的實現類
1、ArrayBlockingQueue:一個由陣列支援的有界阻塞佇列,規定大小的BlockingQueue,其建構函式必須帶一個int引數來指明其大小.其所含的物件是以FIFO(先入先出)順序排序的。


2、LinkedBlockingQueue:大小不定的BlockingQueue,若其建構函式帶一個規定大小的引數,生成的BlockingQueue有大小限制,若不帶大小引數,所生成的 BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的物件是以FIFO(先入先出)順序排序的。


3、PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含物件的排序不是FIFO,而是依據物件的自然排序順序或者是建構函式的Comparator決定的順序。


4、SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的。

LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,預設最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在佇列滿的時候 會阻塞直到有佇列成員被消費,take方法在佇列空的時候會阻塞,直到有佇列成員被放進來

Java7中引入了一種新的執行緒池:ForkJoinPool。

它同ThreadPoolExecutor一樣,也實現了Executor和ExecutorService介面。它使用了一個無限佇列來儲存需要執行的 任務,而執行緒的數量則是通過建構函式傳入,如果沒有向建構函式中傳入希望的執行緒數量,那麼當前計算機可用的CPU數量會被設定為執行緒數量作為預設值。

ForkJoinPool主要用來使用分治法(Divide-and-ConquerAlgorithm)來解決問題。典型的應用比如快速排序演算法。這裡的要點在於,ForkJoinPool需要使用相對少的執行緒來處理大量的任務。比如要對1000萬個資料進行排序,那麼會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬資料的合併任務。以此類推,對於500萬的資料也會 做出同樣的分割處理,到最後會設定一個閾值來規定當資料規模到多少時,停止這樣的分割處理。比如,當元素的數量小於10時,會停止分割,轉而使用插入排序 對它們進行排序。

那麼到最後,所有的任務加起來會有大概2000000+個。問題的關鍵在於,對於一個任務而言,只有當它所有的子任務完成之後,它才能夠被執行。

所以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的執行緒無法像任務佇列中再新增一個任務並且在等待該任務完成之後再繼續執行。而使用ForkJoinPool時,就能夠讓其中的執行緒建立新的任務,並掛起當前的任務,此時執行緒就能夠從佇列中選擇子任務執行

ForkJoinPool的另外一個特性是它能夠實現工作竊取(Work Stealing),在該執行緒池的每個執行緒中會維護一個佇列來存放需要被執行的任務。當執行緒自身佇列中的任務都執行完畢後,它會從別的執行緒中拿到未被執行的任務並幫助它執行。

可以通過以下的程式碼來測試ForkJoinPool的Work Stealing特性:

for (int i = first; i <= last; i++) {

    if (d[i] <0.5) {

        subCount++;

    }

    for (int j = 0;j < d.length - i; j++) {

        for (int k= 0; k < 100; k++) {

            dummy =j * k + i; // dummy is volatile, so multiple writes occur

            d[i] =dummy;

        }

    }

}

因為裡層的迴圈次數(j)是依賴於外層的i的值的,所以這段程式碼的執行時間依賴於i的值。當i = 0時,執行時間最長,而i= last時執行時間最短。也就意味著任務的工作量是不一樣的,當i的值較小時,任務的工作量大,隨著i逐漸增加,任務的工作量變小。因此這是一個典型的任 務負載不均衡的場景。

這時,選擇ThreadPoolExecutor就不合適了,因為它其中的執行緒並不會關注每個任務之間任務量的差異。當執行任務量最小的任務的執行緒執行完畢後,它就會處於空閒的狀態(Idle),等待任務量最大的任務執行完畢。

而ForkJoinPool的情況就不同了,即使任務的工作量有差別,當某個執行緒在執行工作量大的任務時,其他的空閒執行緒會幫助它完成剩下的任務。因此,提高了執行緒的利用率,從而提高了整體效能。

自動並行化(Automatic Parallelization)

在Java8中,引入了自動並行化的概念。它能夠讓一部分Java程式碼自動地以並行的方式執行,前提是使用了ForkJoinPool。

Java 8為ForkJoinPool添加了一個通用執行緒池,這個執行緒池用來處理那些沒有被顯式提交到任何執行緒池的任務。它是ForkJoinPool型別上的一個靜態元素,它擁有的預設執行緒數量等於執行計算機上的處理器數量。

當呼叫Arrays類上新增的新方法時,自動並行化就會發生。比如用來排序一個數組的並行快速排序,用來對一個數組中的元素進行並行遍歷。自動並行化也被運用在Java 8新新增的Stream API中。

比如下面的程式碼用來遍歷列表中的元素並執行需要的計算:

Stream stream = arrayList.parallelStream();

stream.forEach(a -> {

    String symbol =StockPriceUtils.makeSymbol(a);

   StockPriceHistory sph = new StockPriceHistoryImpl(symbol, startDate,endDate, entityManager);

});

對於列表中的元素的計算都會以並行的方式執行。forEach方法會為每個元素的計算操作建立一個任務,該任務會被前文中提到的ForkJoinPool中的通用執行緒池處理。以上的平行計算邏輯當然也可以使用ThreadPoolExecutor完成,但是就程式碼的可讀性和程式碼量而言,使用 ForkJoinPool明顯更勝一籌。

對於ForkJoinPool通用執行緒池的執行緒數量,通常使用預設值就可以了,即執行時計算機的處理器數量。如果需要調整執行緒數量,可以通過設定系統屬性:-Djava.util.concurrent.ForkJoinPool.common.parallelism=N

下面的一組資料用來比較使用ThreadPoolExecutor和ForkJoinPool中的通用執行緒池來完成上面簡單計算時的效能:

執行緒數

ThreadPoolExecutor(秒)

ForkJoinPool Common Pool(秒)

1

255.6

135.4

2

134.8

110.2

4

77.0

96.5

8

81.7

84.0

16

85.6

84.6

注意到當執行緒數為1,2,4時,效能差異的比較明顯。執行緒數為1的ForkJoinPool通用執行緒池和執行緒數為2的ThreadPoolExecutor的效能十分接近。

出現這種現象的原因是,forEach方法用了一些小把戲。它會將執行forEach本身的執行緒也作為執行緒池中的一個工作執行緒。因此,即使將 ForkJoinPool的通用執行緒池的執行緒數量設定為1,實際上也會有2個工作執行緒。因此在使用forEach的時候,執行緒數為1的 ForkJoinPool通用執行緒池和執行緒數為2的ThreadPoolExecutor是等價的。

所以當ForkJoinPool通用執行緒池實際需要4個工作執行緒時,可以將它設定成3,那麼在執行時可用的工作執行緒就是4了