1. 程式人生 > >併發:ThreadPoolExecutor詳解。

併發:ThreadPoolExecutor詳解。

Executor框架最核心的類是ThreadPoolExecutor,他是執行緒池的實現類,主要由下列4個元件構成。

  • corePool:核心執行緒池的大小。
  • maximumPool:最大執行緒池的大小。
  • BlockingQueue:用來暫時儲存任務的工作佇列。
  • RegjectedExecutionHandler:當ThreadPoolExecutor已經關閉或ThreadPoolExecutor已經飽和時(達到了最大執行緒池大小且工作佇列已滿),execute()方法將要呼叫的Handler。

通過Executor框架的工具類Executors,可以建立3種類型的ThreadPoolExecutor。

  • FixedThreadPool。
  • SingleThreadExecutor。
  • CacheThreadPool。

下面將分別介紹這3種ThreadPoolExecutor。

FixedThreadPool詳解

FixedThreadPool被稱為可重用固定執行緒數的執行緒池。下面是FixedThreadPool的原始碼實現。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

FixedThreadPool的corePoolSize和maximumPoolSize都被設定為建立FixedThreadPool時指定的引數nThreads。

當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime為多餘的空閒執行緒等待新任務的最長時間,超過這個時間後多餘的執行緒將被終止。這裡把keepAliveTime設定為0L,意味著多餘的空閒執行緒會被立即終止。
FixedThreadPool的execute()方法的執行示意圖如下所示。

對上圖的說明如下。

  1. 如果當前執行得執行緒數少於corePoolSize,則建立執行緒來執行任務。
  2. 線上程池完成預熱之後(當前執行的執行緒數等於corePoolSize),將任務加入LinkedBlockingQueue。
  3. 執行緒執行完1中的任務後,會在迴圈中反覆從LinkedBlockingQueue獲取任務來執行。

FixedThreadPool使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列(佇列的容量為Integer.MAX_VALUE)。使用無界佇列作為工作佇列會對執行緒池帶來如下影響。

  1. 當執行緒池中的執行緒數達到corePoolSize後,新任務將在無界佇列中等待,因此執行緒池中的執行緒數不會超過corePoolSize。
  2. 由於1,使用無界佇列時maximumPoolSize將是一個無效引數。
  3. 由於1和2,使用無界佇列時keepAliveTime將是一個無效引數。
  4. 由於使用無界佇列,執行中的FixedThreadPool(未執行方法shutdown()或shutdownNow())不會拒絕任務(不會呼叫RejectedExecutionHandler.rejectedExecution方法)。

SingleThreadExecutor詳解

SingleThreadExecutor是使用單個worker執行緒的Executor。下面是SingleThreadExecutor的原始碼實現。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

SingleThreadExecutor的corePoolSize和maximumPoolSize被設定為1。其他引數與FixedThreadPool相同。SingleThreadExecutor使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列(佇列的容量為Integer.MAX_VALUE)。SingleThreadExecutor使用無界佇列作為工作佇列對執行緒池帶來的影響與FixedThreadPool相同,這裡就不贅述了。
SingleThreadExecutor的執行示意圖如下所示。

對上圖的說明如下。

  1. 如果當前執行的執行緒數少於corePoolSize(即執行緒池中無執行的執行緒),則建立一個新執行緒來執行任務。
  2. 線上程池完成預熱之後(當前執行緒池中有一個執行的執行緒),將任務加入LinkedBlockingQueue。
  3. 執行緒執行完1中的任務後,會在一個無限迴圈中反覆從LinkedBlockingQueue獲取任務來執行。

CacheThreadPool詳解

CacheThreadPool是一個會根據需要建立新執行緒的執行緒池。下面是建立CacheThreadPool的原始碼。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CacheThreadPool的corePoolSize被設定為0,即corePool為空;maximumPoolSize被設定為Integer.MAX_VALUE,即maximumPool是無界的。這裡把keepAliveTime設定為60L,意味著CacheThreadPool中的空閒執行緒等待新任務的最長時間為60秒,空閒執行緒超過60秒後將會被終止。

FixedThreadPool和SingleThreadExecutor使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列。CacheThreadPool使用沒有容量的SynchronousQueue作為執行緒池的工作佇列,但CacheThreadPool的maximumPool是無界的。這意味著,如果主執行緒提交任務的速度高於maximumPool中執行緒處理任務的速度時,CacheThreadPool會不斷建立新執行緒。極端情況下,CacheThreadPool會因為建立過多執行緒而耗盡CPU和記憶體資源。

CacheThreadPool的execute()方法的執行示意圖如下所示。

對上圖的說明如下。

  1. 首先執行SynchronousQueue.offer(Runnable task)。如果當前maximumPool中有空閒執行緒正在執行SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),那麼主執行緒執行offer操作與空閒執行緒執行的poll操作配對成功,主執行緒把任務交給空閒執行緒執行,execute()方法執行完成;否則執行下面的步驟2。
  2. 當初始maximumPool為空,或者maximumPool中當前沒有空閒執行緒時,將沒有執行緒執行SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)。這種情況下,步驟1將失敗。此時CachedThreadPool會建立一個新執行緒執行任務,execute()方法執行完成。
  3. 在步驟2中新建立的執行緒將任務執行完後,會執行SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)。這個poll操作會讓空閒執行緒最多在SynchronousQueue中等待60秒鐘。如果60秒鐘內主執行緒提交了一個新任務(主執行緒執行步驟1),那麼這個空閒執行緒將執行主執行緒提交的新任務;否則,這個空閒執行緒將終止。由於空閒60秒的空閒執行緒會被終止,因此長時間保持空閒的CachedThreadPool不會使用任務資源。

前面提到過,SynchronousQueue是一個沒有容量的阻塞佇列。每個插入操作必須等待另一個執行緒的對應移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主執行緒提交的任務傳遞給空閒執行緒執行。CachedThreadPool中任務傳遞的示意圖如下所示。