1. 程式人生 > >理解Java併發工具包執行緒池的設計

理解Java併發工具包執行緒池的設計

為什麼需要執行緒池?

答:主要原因是因為建立一個執行緒開銷太大,尤其是對大量的小任務需要執行這種場景。

在Java裡面建立一個執行緒,需要包含的東西:

(1)它為一個執行緒堆疊分配記憶體,該堆疊為每個執行緒方法呼叫儲存一個幀

(2)每個幀由區域性變數陣列,返回值,運算元棧,常量池組成

(3)某些JVM會為本地方法分配一個本地棧

(4)每個執行緒有一個程式計數器,用來告訴程序當前的指令執行到什麼地方

(5)作業系統建立一個本機執行緒與java執行緒相對應

(6)檔案描述符需要被建立,初始化然後新增到JVM內部的資料結構裡面

(7)執行緒共享堆和方法區的記憶體

建立執行緒的流程依賴底層的作業系統,不同的作業系統可能不一樣,此外更多的執行緒意味著 OS排程需要做更多的工作來決定哪一個執行緒可以訪問資源,並且要通過OS排程切換維護執行緒的各種狀態。

執行緒池的優點

(1)降低資源消耗。通過重複利用已經建立的執行緒來降低各種資源消耗包括(執行緒的建立,銷燬,狀態的切換)

(2)提高響應速度。請求或者任務到達時直接處理。

(3)將任務的提交與任務執行分離,降低耦合。

(4)提高執行緒的可管理性。 使用執行緒池進行資源的統一分配,調優和監控。

Java執行緒池的相關設計

執行緒池有關的介面和類

Java併發包在Java語言層面實現了自己的執行緒池,抽象封裝了執行緒池的相關內容,從而可以做到更細粒度的資源控制:

與執行緒池相關的介面和類如下:

Executor介面:一個介面僅僅包含一個方法execute用來執行Runnable任務,主要定義了:

(1)通過這個介面就可以實現將任務提交和任務執行解耦,包括執行緒的詳細使用,排程任務等等。

ExecutorService介面繼承Executor介面:這個介面的主要定義了

(1)執行緒的執行緒池的關閉策略

shutdown() 告訴執行緒池,不能在接受新的任務,但是已經提交的任何或在等待執行的任務會繼續執行

shutdownNow() 直接傳送打斷訊號,讓執行緒優雅的停止,如果忽略了中斷訊號,那麼這個方法和shutdown方法作用一樣

執行緒池關閉後,不會有任務還在執行,也不會有任務在等待執行,並且也不會有新的任務可以被提交,對於 不使用的 ExecutorService我們應該將其關閉,並回收其資源

(2)可以產生一個Future介面,用來跟蹤一個或多個非同步任務的執行進展

這個介面的submit方法,相當於是對Executor.execute(Runnable)介面方法的擴充套件,這個方法在提交任務之後,可以返回 一個Future介面,用於取消任務或者等待完成。 此外這個介面的invokeAny方法和invokeAll 方法可以用來執行一批任務, 然後等待至少一個或者全部任務完成。

最後這個介面還有一個awaitTermination方法,因為shutDown方法執行後,並不會阻塞到完成,所以我們可以使用這個方法來阻塞指定的時間,如果沒有終止,就可以使用shutdownNow來發送打算訊號,然後繼續阻塞等待一定的時間,如果還沒有終止,在指定的超時後,可以採用其他的辦法,如強制退出虛擬機器等,其間如果自身被打斷,可以捕捉中斷異常,再次關閉執行緒池,如果直到正常關閉後,保留中斷的訊號。

ScheduledExecutorService介面繼承了ExecutorService介面,除了擁有父介面的定義外,該介面主要定義了:

可以排程任務在指定的延遲後執行一次或者週期性執行。

schedule方法可以建立不同的延遲任務,並返回一個task物件可以用來取消任務或者檢查執行。

scheduleAtFixedRate和scheduleWithFixedDelay可以用來建立週期性的排程任務,直到被取消。

前者是每隔固定的延遲時間執行,後者也是間隔固定的時間,但是受前一個任務的完成時間影響,只有前者完成了後者才能執行。

AbstractExecutorService抽象類繼承了ExecutorService介面:

提供了ExecutorService介面方法submit, invokeAny 和 invokeAll的預設實現,並對每一個執行任務的Runnable執行緒通過newTaskFor 方法產生了對應的RunnableFuture返回值用來跟蹤任務的執行情況。

類圖如下:

執行緒池的核心屬性

ThreadPoolExecutor執行緒池的核心類繼承了AbstractExecutorService類:

這個類定義了執行緒池核心引數和配置:

corePoolSize:執行緒池裡面的最小的保持存活的worker數量,不允許超時,除非設定了allowCoreThreadTimeOut,最小是0;可以被動態改變

maximumPoolSize:執行緒池的最大數量,注意這個取決於阻塞佇列的大小。可以被動態設定

keepAliveTime:執行緒的保持存活時間,如果超過這個時間值,沒有任務提交就關閉自己,預設情況下 核心執行緒是不受這個引數影響的,除非設定了allowCoreThreadTimeOut=true。

threadFactory:除非使用者實現自己的執行緒工廠類,否則新執行緒的建立使用Executors.defaultThreadFactory(),通過預設工廠 創建出來的執行緒具有同樣的組,優先順序,和非守護程序的狀態。提供一個不同的實現你可以修改,執行緒的名字,執行緒的組, 優先順序,守護狀態等。

handler:當阻塞佇列滿了之後的,制定的拒絕策略。

ThreadPoolExecutor類的主要構造方法如下:


new ThreadPoolExecutor
(
int corePoolSize, // 核心執行緒池的數量
int maximumPoolSize,//最大執行緒池的數量
long keepAliveTime, // 執行緒長期不用時,最大存活時間
TimeUnit unit,   // 指定存活時間的單位
BlockingQueue<Runnable> workQueue, //指定的阻塞佇列
ThreadFactory threadFactory, //指定的執行緒工廠類
RejectedExecutionHandler handler //如果佇列滿了使用的拒絕策略
)

引數比較多,所以在Executors類裡面通過靜態工廠方法,已經給我們涉及好了幾種實現,我們可以 直接使用,下面我們詳細看下最常見的幾種執行緒池,如下:

(1)固定數量的執行緒池

擁有固定數量的執行緒來處理任務,如果全部都在處理任務, 新來的任務將會進入阻塞佇列等待執行

newFixedThreadPool(int nThreads)//固定數量的執行緒池數
###這個方法的實際底層引數如下
corePoolSize=maximumPoolSize,//核心數量=最大執行緒池數量=固定數量
keepAliveTime=0//永遠不銷燬執行緒,等於0的情況,核心執行緒在不使用的時候不能被銷燬
unit=TimeUnit.MILLISECONDS//毫秒單位
workQueue=new LinkedBlockingQueue<Runnable>() //無界阻塞佇列
threadFactory=new DefaultThreadFactory()//預設的工廠
handler =new AbortPolicy() //佇列滿了,直接丟擲異常

(2)單個執行緒的執行緒池

只有1個執行緒來處理任務,如果這個執行緒正在處理任務,新來的任務將會進入阻塞佇列等待執行

newSingleThreadExecutor()//只建立一個執行緒來執行任務
###這個方法的實際底層引數如下
corePoolSize=maximumPoolSize=1,//核心數量=最大執行緒池數量=1
keepAliveTime=0//永遠不銷燬執行緒,等於0的情況,核心執行緒在不使用的時候不能被銷燬
unit=TimeUnit.MILLISECONDS//毫秒單位
workQueue=new LinkedBlockingQueue<Runnable>() //無界阻塞佇列
threadFactory=new DefaultThreadFactory()//預設的工廠
handler =new AbortPolicy() //佇列滿了,直接丟擲異常

(3)擁有快取效果的執行緒池

這個執行緒池在大量的執行時間短的非同步任務時候,效能很高,內部的用的佇列是Synchronous通過一對一 直接傳遞,可快速處理請求,並且處理完任務的執行緒會被快取60秒,期間如果還有新任務到來,可以 複用先前的執行緒直接來處理,如果當前沒有任務,那麼這些執行緒超過60秒後會自動終止。

newSingleThreadExecutor()//只建立一個執行緒來執行任務
###這個方法的實際底層引數如下
corePoolSize=0//不維護核心執行緒
maximumPoolSize=Integer.MAX_VALUE,//允許最大的執行緒數
keepAliveTime=60//超過60秒自動銷燬
unit=TimeUnit.SECONDS//單位秒
workQueue=new SynchronousQueue<Runnable>()) //無界阻塞佇列
threadFactory=new DefaultThreadFactory()//預設的工廠
handler =new AbortPolicy() //佇列滿了,直接丟擲異常

(4)延遲指定時間後執行一次任務或者週期執行任務的只擁有單個執行緒的執行緒池

這個執行緒池其實是ThreadPoolExecutor的子類ScheduledThreadPoolExecutor提供的功能:

newScheduledThreadPool//只建立一個執行緒來執行任務
###這個方法的實際底層引數如下
corePoolSize=1//核心執行緒只有一個
maximumPoolSize=Integer.MAX_VALUE,//允許最大的執行緒數
keepAliveTime=0//永遠不銷燬執行緒,等於0的情況,核心執行緒在不使用的時候不能被銷燬
unit=TimeUnit.NANOSECONDS//單位納秒
workQueue=new DelayedWorkQueue() //類似DelayQueue佇列的,內部採用堆結構實現
threadFactory=new DefaultThreadFactory()//預設的工廠
handler =new AbortPolicy() //佇列滿了,直接丟擲異常

(5)延遲指定時間後執行一次任務或者週期執行任務的只擁有執行執行緒的執行緒池

這個執行緒池其實是ThreadPoolExecutor的子類ScheduledThreadPoolExecutor提供的功能:

newScheduledThreadPool//只建立一個執行緒來執行任務
###這個方法的實際底層引數如下
corePoolSize=n//指定的數量
maximumPoolSize=Integer.MAX_VALUE,//允許最大的執行緒數
keepAliveTime=0//永遠不銷燬執行緒,等於0的情況,核心執行緒在不使用的時候不能被銷燬
unit=TimeUnit.NANOSECONDS//單位秒
workQueue=new DelayedWorkQueue() //類似DelayQueue佇列的,內部採用堆結構實現
threadFactory=new DefaultThreadFactory()//預設的工廠
handler =new AbortPolicy() //佇列滿了,直接丟擲異常

(6)ForkJoinPool基於工作竊取演算法( work-stealing)的執行緒池

這個執行緒池其實是ThreadPoolExecutor的子類ForkJoinPool類提供的功能:

newWorkStealingPool(nThreads) //預設獲取伺服器的cpu的個數,也可以自己指定並行度
###這個方法的實際底層引數如下

parallelism=n // 並行任務個數
factory=ForkJoinWorkerThreadFactory  //forkjoin的預設工廠
handler=null //不存在拒絕策略
asyncMode (true=FIFO先進先出模式處理佇列任務,false=LIFO後進先出的模式處理任務)
workerNamePrefix  //執行緒組的字首

工作竊取(work-stealing)演算法是指某個執行緒從其他佇列裡竊取任務來執行。那麼,為什麼 需要使用工作竊取演算法呢?假如我們需要做一個比較大的任務,可以把這個任務分割為若干 互不依賴的子任務,為了減少執行緒間的競爭,把這些子任務分別放到不同的佇列裡,併為每個 佇列建立一個單獨的執行緒來執行佇列裡的任務,執行緒和佇列一一對應。比如A執行緒負責處理A 佇列裡的任務。但是,有的執行緒會先把自己佇列裡的任務幹完,而其他執行緒對應的佇列裡還有 任務等待處理。幹完活的執行緒與其等著,不如去幫其他執行緒幹活,於是它就去其他執行緒的佇列 裡竊取一個任務來執行。而在這時它們會訪問同一個佇列,所以為了減少竊取任務執行緒和被 竊取任務執行緒之間的競爭,通常會使用雙端佇列,被竊取任務執行緒永遠從雙端佇列的頭部拿 任務執行,而竊取任務的執行緒永遠從雙端佇列的尾部拿任務執行。

image

執行緒池的阻塞佇列

queue:阻塞佇列BlockingQueue介面可以用來轉換和儲存提交的任務,

當一個任務來的時候,如果當前的執行緒數小於corePoolSize,Executors會新建執行緒來處理,即使有其他的 空閒執行緒,如果corePoolSize滿了,Executors把新的任務新增到阻塞佇列裡面,而不是新建立執行緒,如果佇列 也滿了,但是當前的任務總數小於maximumPoolSize,那麼新的執行緒會被建立,如果總個數大於maximumPoolSize,那麼 新的任務會被拒絕。

如下圖示:

阻塞佇列主要包括三種:

(1)直接交付 (SynchronousQueue)

(2)無邊界佇列(LinkedBlockingQueue)

(3)有邊界佇列 (ArrayBlockingQueue)

workerCount的值2的29次方-1約500萬,代表這個執行緒池總共出現執行過的執行緒數量

執行緒池的狀態

runState:代表執行緒池的狀態

(1)RUNNING 當前接受新任務提交,並且也處理佇列裡面的任務

(2)SHUTDOWN 不接受新任務 , 但是處理佇列裡面的任務

(3)STOP 不接受新任務,不處理佇列任務,並且打斷正在執行的任務

(4)TIDYING 所有的任務已經終止,workerCount是0,此時執行緒的狀態切為TIDYING, 結束時可以執行鉤子方法terminated()

(5)TERMINATED terminated方法呼叫完成。

狀態轉變:

RUNNING => SHUTDOWN 呼叫shutdown方法

RUNNING 或者 SHUTDOWN => 呼叫shutdownNow方法

SHUTDOWN => TIDYING 當佇列和執行緒池都為空的情況下

STOP => TIDYING 當執行緒池是空的時候。

TIDYING =>TERMINATED 當terminated()鉤子方法呼叫完成的時候。

另外注意如果呼叫了awaitTermination方法,只有當執行緒池的狀態 轉變為TERMINATED的時候,這個方法才會返回。

執行緒池的拒絕策略

關於執行緒池如果滿了或者執行緒池已經關閉時的拒絕策略:

AbortPolicy:直接丟擲一個RejectedExecutionException異常

CallerRunsPolicy:直接使用呼叫者執行緒用來執行任務,提供了一個簡單的反壓控制機制 ,從而降低新任務提交的速度。

DiscardPolicy:直接丟棄任務。

DiscardOldestPolicy: 如果executor沒有關閉,丟棄頭部的任務,然後執行重試 ,如果重試失敗,則重複嘗試。

鉤子方法:

beforeExecute(Thread, Runnable)

afterExecute(Runnable, Throwable)

可以線上程的執行前後做一些處理,比如新增log日誌, 收集統計資料等。 此外還可以覆蓋重寫terminated(),線上程池徹底關閉時做一些處理工作。

關於佇列的維護:

ThreadPoolExecutor類提供了一個getQueue()方法,允許訪問當前的工作佇列去監控和除錯,此外 這個類還有兩個方法:remove(Runnable) 和 purge() 用來刪除或者取消任務來輔助回收資源的, 不建議使用這些方法。

資源釋放

如果一個執行緒池長時間不再活動,或者使用者忘記呼叫shutdown方法,我們也希望回收資源,這個時候我們可以設定 allowCoreThreadTimeOut(boolean)這個引數使得核心執行緒也能在長時間不用時回收掉。

非同步任務的獲取Future

Future用來代表對於非同步任務的結果,可以通過Future來檢查任務是否執行完成, 以及阻塞等待其完成,獲取結算結果,取消任務(已經完成的任務不能取消)等

總結

本篇文章主要了Java執行緒池的出現的意義及Java執行緒池的相關設計與相關內容的概述,通過執行緒池我們可以將任務的提交與執行分離,從而降低與程式的耦合,此外利用執行緒池我們還可以降低資源的消耗,提高執行緒的可管理性,進行資源的統一分配,調優和監控。