1. 程式人生 > >Executor框架(九)

Executor框架(九)

Java的執行緒既是工作單元,也是執行機制。從JDK5開始,把工作單元與執行機制分離開來。工作單元包括Runnable和Callable,而執行機制由Executor框架提供。

Executor框架介紹

Executor框架的兩極排程模型

在HotSpot VM的執行緒模型中,Java執行緒(java.lang.Thread)被一對一對映為本地作業系統執行緒。Java執行緒啟動時會建立一個本地作業系統執行緒;當該Java執行緒終止時,這個作業系統執行緒也會被回收。作業系統會排程所有執行緒並將它們分配給可用的CPU。

在上層,Java多執行緒程式通常把應用分解為若干個任務,然後使用使用者級的排程器
(Executor框架)將這些任務對映為固定數量的執行緒;在底層,作業系統核心將這些執行緒對映到硬體處理器上。

這裡寫圖片描述

應用程式通過Executor框架控制上層的排程;而下層的排程由作業系統核心控制,下層的排程不受應用程式的控制。

Executor框架的結構與成員

1.Executor框架的結構

Executor框架主要由3大部分組成:
·任務。包括被執行任務需要實現的介面:Runnable介面或Callable介面。
·任務的執行。包括任務執行機制的核心介面Executor,以及繼承自Executor的
ExecutorService介面。Executor框架有兩個關鍵類實現了ExecutorService介面
(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
·非同步計算的結果。包括介面Future和實現Future介面的FutureTask類。

Executor框架包含的主要的類與介面如圖:

這裡寫圖片描述

·Executor是一個介面,它是Executor框架的基礎,它將任務的提交與任務的執行分離開來。
·ThreadPoolExecutor是執行緒池的核心實現類,用來執行被提交的任務。
·ScheduledThreadPoolExecutor是一個實現類,可以在給定的延遲後執行命令,或者定期執行命令。ScheduledThreadPoolExecutor比Timer更靈活,功能更強大。
·Future介面和實現Future介面的FutureTask類,代表非同步計算的結果。
·Runnable介面和Callable介面的實現類,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor執行。

Executor框架的使用示意圖:

這裡寫圖片描述

主執行緒首先要建立實現Runnable或者Callable介面的任務物件。工具類Executors可以把一個Runnable物件封裝為一個Callable物件(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object resule))。

然後可以把Runnable物件直接交給ExecutorService執行(ExecutorService.execute(Runnable command));或者也可以把Runnable物件或Callable物件提交給ExecutorService執行(Executor-
Service.submit(Runnable task)或ExecutorService.submit(Callabletask))。

如果執行ExecutorService.submit(…),ExecutorService將返回一個實現Future介面的物件(到目前為止的JDK中,返回的是FutureTask物件)。由於FutureTask實現了Runnable,程式設計師也可以建立FutureTask,然後直接交給ExecutorService執行。

最後,主執行緒可以執行FutureTask.get()方法來等待任務執行完成。主執行緒也可以執行
FutureTask.cancel(boolean mayInterruptIfRunning)來取消此任務的執行。

2.Executor框架的成員

本節將介紹Executor框架的主要成員:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future介面、Runnable介面、Callable介面和Executors。

(1)ThreadPoolExecutor

ThreadPoolExecutor通常使用工廠類Executors來建立。Executors可以建立3種類型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。

1) FixedThreadPool。
是Executors提供的,建立使用固定執行緒數的FixedThreadPool的API:

public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

FixedThreadPool適用於為了滿足資源管理的需求,而需要限制當前執行緒數量的應用場景,它適用於負載比較重的伺服器。

2)SingleThreadExecutor。
Executors提供的,建立使用單個執行緒的SingleThreadExecutor的API:

public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

SingleThreadExecutor適用於需要保證順序地執行各個任務;並且在任意時間點,不會有多個執行緒是活動的應用場景。

3) CachedThreadPool。
Executors提供的,建立一個會根據需要建立新執行緒的CachedThreadPool的API:

public static ExecutorService newCachedThreadPool()
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

CachedThreadPool是大小無界的執行緒池,適用於執行很多的短期非同步任務的小程式,或者是負載較輕的伺服器。

(2)ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor通常使用工廠類Executors來建立。Executors可以建立2種類型的ScheduledThreadPoolExecutor。

·ScheduledThreadPoolExecutor。包含若干個執行緒的ScheduledThreadPoolExecutor。
·SingleThreadScheduledExecutor。只包含一個執行緒的ScheduledThreadPoolExecutor。

工廠類Executors提供的,建立固定個數執行緒的ScheduledThreadPoolExecutor的API:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,ThreadFactory threadFactory)

ScheduledThreadPoolExecutor適用於需要多個後臺執行緒執行週期任務,同時為了滿足資源管理的需求而需要限制後臺執行緒的數量的應用場景。下面是Executors提供的,建立單個執行緒的SingleThreadScheduledExecutor的API:

public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newSingleThreadScheduledExecutor
(ThreadFactory threadFactory)

SingleThreadScheduledExecutor適用於需要單個後臺執行緒執行週期任務,同時需要保證順序地執行各個任務的應用場景。

ThreadFactory介面只有一個方法呼叫 newThread()。它接收一個 Runnable物件作為引數,並返回一個 Thread物件。當你實現一個 ThreadFactory介面,您必須實現該介面並覆蓋此方法。

(3)Future介面

Future介面和實現Future介面的FutureTask類用來表示非同步計算的結果。當我們把Runnable介面或Callable介面的實現類提交(submit)給ThreadPoolExecutor或
ScheduledThreadPoolExecutor時,ThreadPoolExecutor或ScheduledThreadPoolExecutor會向我們返回一個FutureTask物件。

<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
Future<> submit(Runnable task)

有一點需要讀者注意,到目前最新的JDK 8為止,Java通過上述API返回的是一個
FutureTask物件。但從API可以看到,Java僅僅保證返回的是一個實現了Future介面的物件。在將來的JDK實現中,返回的可能不一定是FutureTask。

(4)Runnable介面和Callable介面

Runnable介面和Callable介面的實現類,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor執行。它們之間的區別是Runnable不會返回結果,而Callable可以返回結果。

Callable介面與Runnable介面區別
相同點:
Callable和Runnable都是介面
Callable和Runnable都可以應用於Executors
不同點:
Callable要實現call方法,Runnable要實現run方法
call方法可以返回值,run方法不能
call方法可以丟擲checked exception,run方法不能
Runnable介面在jdk1.1就有了,Callable在Jdk1.5才有

除了可以自己建立實現Callable介面的物件外,還可以使用工廠類Executors來把一個
Runnable包裝成一個Callable。

下面是Executors提供的,把一個Runnable包裝成一個Callable的API:

public static Callable<Object> callable(Runnable task) // 假設返回物件Callable1

下面是Executors提供的,把一個Runnable和一個待返回的結果包裝成一個Callable的API:

public static <T> Callable<T> callable(Runnable task, T result) // 假設返回物件Callable2

當我們把一個Callable物件(比如上面的Callable1或Callable2)提交給
ThreadPoolExecutor或ScheduledThreadPoolExecutor執行時,submit(…)會向我們返回一個FutureTask物件。我們可以執行FutureTask.get()方法來等待任務執行完成。當任務成功完成後FutureTask.get()將返回該任務的結果。例如,如果提交的是物件Callable1,FutureTask.get()方法將返回null;如果提交的是物件Callable2,FutureTask.get()方法將返回result物件。

ThreadPoolExecutor詳解

Executor框架最核心的類是ThreadPoolExecutor,它是執行緒池的實現類,主要由下列4個元件構成。
·corePool:核心執行緒池的大小。
·maximumPool:最大執行緒池的大小。
·BlockingQueue:用來暫時儲存任務的工作佇列。
·RejectedExecutionHandler:當ThreadPoolExecutor已經關閉或ThreadPoolExecutor已經飽和時(達到了最大執行緒池大小且工作佇列已滿),execute()方法將要呼叫的Handler。

通過Executor框架的工具類Executors,可以建立3種類型的ThreadPoolExecutor:
·FixedThreadPool。
·SingleThreadExecutor。
·CachedThreadPool。

FixedThreadPool詳解

FixedThreadPool被稱為可重用固定執行緒數的執行緒池。

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

FixedThreadPool的corePoolSize和maximumPoolSize都被設定為建立FixedThreadPool時指定的引數nThreads。

FixedThreadPool的execute()方法的執行示意圖:

這裡寫圖片描述

1)如果當前執行的執行緒數少於corePoolSize,則建立新執行緒來執行任務。
2)線上程池完成預熱之後(當前執行的執行緒數等於corePoolSize),將任務加入
LinkedBlockingQueue。
3)執行緒執行完1中的任務後,會在迴圈中反覆從LinkedBlockingQueue獲取任務來執行。

FixedThreadPool使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列(佇列的容量為Integer.MAX_VALUE)。使用無界佇列作為工作佇列會對執行緒池帶來如下影響:

1)當執行緒池中的執行緒數達到corePoolSize後,新任務將在無界佇列中等待,因此執行緒池中
的執行緒數不會超過corePoolSize。
2)由於1,使用無界佇列時maximumPoolSize將是一個無效引數。
3)由於1和2,使用無界佇列時keepAliveTime將是一個無效引數。
4)由於使用無界佇列,執行中的FixedThreadPool(未執行方法shutdown()或
shutdownNow())不會拒絕任務(不會呼叫RejectedExecutionHandler.rejectedExecution方法)。

SingleThreadExecutor詳解

SingleThreadExecutor是使用單個worker執行緒的Executor。

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

SingleThreadExecutor的corePoolSize和maximumPoolSize被設定為1。其他引數與FixedThreadPool相同。SingleThreadExecutor使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列(佇列的容量為Integer.MAX_VALUE)。SingleThreadExecutor使用無界佇列作為工作佇列對執行緒池帶來的影響與FixedThreadPool相同。

這裡寫圖片描述

1)如果當前執行的執行緒數少於corePoolSize(即執行緒池中無執行的執行緒),則建立一個新執行緒來執行任務。
2)線上程池完成預熱之後(當前執行緒池中有一個執行的執行緒),將任務加入Linked-
BlockingQueue。
3)執行緒執行完1中的任務後,會在一個無限迴圈中反覆從LinkedBlockingQueue獲取任務來執行。

CachedThreadPool詳解

CachedThreadPool是一個會根據需要建立新執行緒的執行緒池。下面是建立CachedThreadPool的原始碼:

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

CachedThreadPool的corePoolSize被設定為0,即corePool為空;maximumPoolSize被設定為Integer.MAX_VALUE,即maximumPool是無界的。這裡把keepAliveTime設定為60L,意味著CachedThreadPool中的空閒執行緒等待新任務的最長時間為60秒,空閒執行緒超過60秒後將會被終止。

FixedThreadPool和SingleThreadExecutor使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列。CachedThreadPool使用沒有容量的SynchronousQueue作為執行緒池的工作佇列,但CachedThreadPool的maximumPool是無界的。這意味著,如果主執行緒提交任務的速度高於maximumPool中執行緒處理任務的速度時,CachedThreadPool會不斷建立新執行緒。極端情況下,CachedThreadPool會因為建立過多執行緒而耗盡CPU和記憶體資源。

CachedThreadPool的execute()方法的執行示意圖:

這裡寫圖片描述

1)首先執行SynchronousQueue.offer(Runnable task)。如果當前maximumPool中有空閒執行緒正在執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那麼主執行緒執行offer操作與空閒執行緒執行的poll操作配對成功,主執行緒把任務交給空閒執行緒執行,execute()方法執行完成;否則執行下面的步驟2)。

2)當初始maximumPool為空,或者maximumPool中當前沒有空閒執行緒時,將沒有執行緒執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這種情況下,步驟1)將失敗。此時CachedThreadPool會建立一個新執行緒執行任務,execute()方法執行完成。

3)在步驟2)中新建立的執行緒將任務執行完後,會執行
SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這個poll操作會讓空閒執行緒最多在SynchronousQueue中等待60秒鐘。如果60秒鐘內主執行緒提交了一個新任務(主執行緒執行步驟1)),那麼這個空閒執行緒將執行主執行緒提交的新任務;否則,這個空閒執行緒將終止。由於空閒60秒的空閒執行緒會被終止,因此長時間保持空閒的CachedThreadPool不會使用任何資源。

SynchronousQueue是一個沒有容量的阻塞佇列。每個插入操作必須等待另一
個執行緒的對應移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主執行緒提交的任務傳遞給空閒執行緒執行。

這裡寫圖片描述

ScheduledThreadPoolExecutor詳解

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor。它主要用來在給定的延遲之後執行任務,或者定期執行任務。ScheduledThreadPoolExecutor的功能與Timer類似,但ScheduledThreadPoolExecutor功能更強大、更靈活。Timer對應的是單個後臺執行緒,而ScheduledThreadPoolExecutor可以在建構函式中指定多個對應的後臺執行緒數。

ScheduledThreadPoolExecutor的執行機制

ScheduledThreadPoolExecutor的執行示意圖:

這裡寫圖片描述

DelayQueue是一個無界佇列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中沒有什麼意義。

ScheduledThreadPoolExecutor的執行主要分為兩大部分:
1)當呼叫ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法時,會向ScheduledThreadPoolExecutor的DelayQueue新增一個實現了RunnableScheduledFuture介面的ScheduledFutureTask。
2)執行緒池中的執行緒從DelayQueue中獲取ScheduledFutureTask,然後執行任務。

ScheduledThreadPoolExecutor為了實現週期性的執行任務,對ThreadPoolExecutor做了如下的修改:

·使用DelayQueue作為任務佇列。
·獲取任務的方式不同。
·執行週期任務後,增加了額外的處理。

ScheduledThreadPoolExecutor的實現

ScheduledThreadPoolExecutor會把待排程的任務(ScheduledFutureTask)
放到一個DelayQueue中。

ScheduledFutureTask主要包含3個成員變數:
·long型成員變數time,表示這個任務將要被執行的具體時間。
·long型成員變數sequenceNumber,表示這個任務被新增到ScheduledThreadPoolExecutor中的序號。
·long型成員變數period,表示任務執行的間隔週期。

DelayQueue封裝了一個PriorityQueue,這個PriorityQueue會對佇列中的ScheduledFutureTask進行排序。排序時,time小的排在前面(時間早的任務將被先執行)。如果兩個ScheduledFutureTask的time相同,就比較sequenceNumber,sequenceNumber小的排在前面(也就是說,如果兩個任務的執行時間相同,那麼先提交的任務將被先執行)。

首先,讓我們看看ScheduledThreadPoolExecutor中的執行緒執行週期任務的過程:

這裡寫圖片描述

1)執行緒1從DelayQueue中獲取已到期的ScheduledFutureTask(DelayQueue.take())。到期任務是指ScheduledFutureTask的time大於等於當前時間。
2)執行緒1執行這個ScheduledFutureTask。
3)執行緒1修改ScheduledFutureTask的time變數為下次將要被執行的時間。
4)執行緒1把這個修改time之後的ScheduledFutureTask放回DelayQueue中(Delay-
Queue.add())。

上面的步驟1)獲取任務的過程。下面是DelayQueue.take()方法的源代
碼實現。

public E take() throws InterruptedException{
    final ReentrantLock lock = this.lock();
    lock.lockInterruptibly();                 //1
    try{
        for(;;){
            E first = q.peek();
            if(first == null){
                available.await();           //2.1
            }else{
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if(delay > 0){
                    long tl = available.awaitNanos(delay); //2.2
                }else{
                    E x = q.poll();           //2.3.1
                    assert x != null;     
                    if(q.size() != 0)
                        available.signalAll(); //2.3.2
                    return x;
                }
            }
        }
    }finally{
        lock.unlock();
    }
}

是DelayQueue.take()的執行示意圖:

這裡寫圖片描述

獲取任務分為3大步驟:
1)獲取Lock。
2)獲取週期任務。
·如果PriorityQueue為空,當前執行緒到Condition中等待;否則執行下面的2.2。
·如果PriorityQueue的頭元素的time時間比當前時間大,到Condition中等待到time時間;否則執行下面的2.3。
·獲取PriorityQueue的頭元素(2.3.1);如果PriorityQueue不為空,則喚醒在Condition中等待的所有執行緒(2.3.2)。
3)釋放Lock。

ScheduledThreadPoolExecutor在一個迴圈中執行步驟2,直到執行緒從PriorityQueue獲取到一個元素之後(執行2.3.1之後),才會退出無限迴圈(結束步驟2)。

最後,讓我們看看ScheduledThreadPoolExecutor中的執行緒執行任務的步驟4,把
ScheduledFutureTask放入DelayQueue中的過程。下面是DelayQueue.add()的原始碼:

public boolean offer(E e){
    final ReentrantLock lock = this.lock;
    lock.lock;              //1
    try{
        E first = q.peek();
        q.offer(e);         //2.1
        if(first == null || e.compareTo(first)<0)
            available.signalAll();  //2.2
        return true;
    }finally{
        lock.unlock();       //3
    }
}

這裡寫圖片描述

如圖所示,新增任務分為3大步驟:

1)獲取Lock。
2)新增任務。
·向PriorityQueue新增任務。
·如果在上面2.1中新增的任務是PriorityQueue的頭元素,喚醒在Condition中等待的所有執行緒。
3)釋放Lock。

FutureTask詳解

Future介面和實現Future介面的FutureTask類,代表非同步計算的結果。

FutureTask簡介

FutureTask除了實現Future介面外,還實現了Runnable介面。因此,FutureTask可以交給Executor執行,也可以由呼叫執行緒直接執行(FutureTask.run())。根據FutureTask.run()方法被執行的時機,FutureTask可以處於下面3種狀態。

1)未啟動。FutureTask.run()方法還沒有被執行之前,FutureTask處於未啟動狀態。當建立一個FutureTask,且沒有執行FutureTask.run()方法之前,這個FutureTask處於未啟動狀態。
2)已啟動。FutureTask.run()方法被執行的過程中,FutureTask處於已啟動狀態。
3)已完成。FutureTask.run()方法執行完後正常結束,或被取消(FutureTask.cancel(…)),或執行FutureTask.run()方法時丟擲異常而異常結束,FutureTask處於已完成狀態。

當FutureTask處於未啟動或已啟動狀態時,執行FutureTask.get()方法將導致呼叫執行緒阻塞;
當FutureTask處於已完成狀態時,執行FutureTask.get()方法將導致呼叫執行緒立即返回結果或丟擲異常。

當FutureTask處於未啟動狀態時,執行FutureTask.cancel()方法將導致此任務永遠不會被執行;當FutureTask處於已啟動狀態時,執行FutureTask.cancel(true)方法將以中斷執行此任務執行緒的方式來試圖停止任務;當FutureTask處於已啟動狀態時,執行FutureTask.cancel(false)方法將不會對正在執行此任務的執行緒產生影響(讓正在執行的任務執行完成);當FutureTask處於已完成狀態時,執行FutureTask.cancel(…)方法將返回false。

這裡寫圖片描述

FutureTask的使用

可以把FutureTask交給Executor執行;也可以通過ExecutorService.submit(…)方法返回一個FutureTask,然後執行FutureTask.get()方法或FutureTask.cancel(…)方法。除此以外,還可以單獨使用FutureTask。

當一個執行緒需要等待另一個執行緒把某個任務執行完後它才能繼續執行,此時可以使用
FutureTask。假設有多個執行緒執行若干任務,每個任務最多隻能被執行一次。當多個執行緒試圖同時執行同一個任務時,只允許一個執行緒執行任務,其他執行緒需要等待這個任務執行完後才能繼續執行。

FutureTask的實現

FutureTask的實現基於AbstractQueuedSynchronizer(以下簡稱為AQS)。java.util.concurrent中的很多可阻塞類(比如ReentrantLock)都是基於AQS來實現的。AQS是一個同步框架,它提供通用機制來原子性管理同步狀態、阻塞和喚醒執行緒,以及維護被阻塞執行緒的佇列。JDK 6中AQS被廣泛使用,基於AQS實現的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch和FutureTask。

每一個基於AQS實現的同步器都會包含兩種型別的操作,如下:

  • 至少一個acquire操作。這個操作阻塞呼叫執行緒,除非/直到AQS的狀態允許這個執行緒繼續執行。FutureTask的acquire操作為get()/get(long timeout,TimeUnit unit)方法呼叫。
  • 至少一個release操作。這個操作改變AQS的狀態,改變後的狀態可允許一個或多個阻塞執行緒被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法。

基於“複合優先於繼承”的原則,FutureTask聲明瞭一個內部私有的繼承於AQS的子類
Sync,對FutureTask所有公有方法的呼叫都會委託給這個內部子類。

AQS被作為“模板方法模式”的基礎類提供給FutureTask的內部子類Sync,這個內部子類只需要實現狀態檢查和狀態更新的方法即可,這些方法將控制FutureTask的獲取和釋放操作。具體來說,Sync實現了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通過這兩個方法來檢查和更新同步狀態。