1. 程式人生 > >Android 執行緒池—ThreadPoolExecutor理解與使用

Android 執行緒池—ThreadPoolExecutor理解與使用

使用執行緒池的好處可以歸納為3點:

  1. 重用執行緒池中的執行緒, 避免因為執行緒的建立和銷燬所帶來的效能開銷.
  2. 有效控制執行緒池中的最大併發數,避免大量執行緒之間因為相互搶佔系統資源而導致的阻塞現象.
  3. 能夠對執行緒進行簡單的管理,可提供定時執行和按照指定時間間隔迴圈執行等功能.

ThreadPoolExecutor是Executors類的底層實現。android中執行緒池的概念來源於java中的Executor,執行緒池真正的實現類是ThreadPoolExecutor,它間接實現了Executor介面。ThreadPoolExecutor提供了一系列引數來配置執行緒池,通過不同的引數配置實現不同功能特性的執行緒池,android中的Executors類提供了4個工廠方法用於建立4種不同特性的執行緒池給開發者用.
android中的執行緒池都是直接或是間接通過配置ThreadPoolExecutor來實現的

public class Executors {
    ...
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                              0L, TimeUnit.MILLISECONDS,
                              new LinkedBlockingQueue<Runnable>());
    }
    ...
}

ThreadPoolExecutor 的配置引數

public ThreadPoolExecutor(int corePoolSize,
                      int maximumPoolSize,
                      long keepAliveTime,
                      TimeUnit unit,
                      BlockingQueue<Runnable> workQueue,
                      ThreadFactory threadFactory,
                      RejectedExecutionHandler handler) {
    this
.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } corePoolSize: 執行緒池的核心執行緒數,預設情況下, 核心執行緒會線上程池中一直存活, 即使處於閒置狀態. 但如果將allowCoreThreadTimeOut設定為true的話, 那麼核心執行緒也會有超時機制, 在keepAliveTime設定的時間過後, 核心執行緒也會被終止. maximumPoolSize: 最大的執行緒數, 包括核心執行緒, 也包括非核心執行緒, 線上程數達到這個值後,新來的任務將會被阻塞. keepAliveTime: 超時的時間, 閒置的非核心執行緒超過這個時長,講會被銷燬回收, 當allowCoreThreadTimeOut為true時,這個值也作用於核心執行緒. unit:超時時間的時間單位. workQueue:執行緒池的任務佇列, 通過execute方法提交的runnable物件會儲存在這個佇列中. threadFactory: 執行緒工廠, 為執行緒池提供建立新執行緒的功能. handler: 任務無法執行時,回撥handler的rejectedExecution方法來通知呼叫者.

ThreadPoolExecutor的執行過程

  • 如果執行的執行緒少於 corePoolSize,則 Executor 始終首選新增新的執行緒,而不進行排隊。(什麼意思?如果當前執行的執行緒小於corePoolSize,則任務根本不會存放,新增到queue中,而是直接抄傢伙(thread)開始執行)
  • 如果執行的執行緒等於或多於 corePoolSize,則 Executor 始終首選將請求加入佇列,而不新增新的執行緒。
  • 如果無法將請求加入佇列,則建立新的執行緒,除非建立此執行緒超出 maximumPoolSize,在這種情況下,任務將被拒絕。

用currentSize表示執行緒池中當前執行緒數量,將上述過程可以表示如下
1. 當currentSize < corePoolSize時,直接啟動一個核心執行緒並執行任務。
2. 當currentSize>=corePoolSize、並且workQueue未滿時,新增進來的任務會被安排到workQueue中等待執行。
3. 當workQueue已滿,但是currentSize < maximumPoolSize時,會立即開啟一個非核心執行緒來執行任務。
4. 當currentSize >= corePoolSize、workQueue已滿、並且currentSize > maximumPoolSize時,呼叫handler預設丟擲RejectExecutionExpection異常。

一句話概括來說就是先裝滿核心執行緒,再裝滿workQueue,再裝滿非核心執行緒,都裝滿則拒絕任務。

任務佇列workQueue

在上述執行過程中有個關鍵的承載工具workQueue,任務佇列queue的種類有三種:

  1. 直接提交。工作佇列的預設選項是 SynchronousQueue,它將任務直接提交給執行緒而不保持它們。在此,如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。
  2. 無界佇列。使用無界佇列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 執行緒都忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列;例如,在 Web 頁伺服器中。這種排隊可用於處理瞬態突發請求,當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。
  3. 有界佇列。當使用有限的 maximumPoolSizes 時,有界佇列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型池可以最大限度地降低 CPU 使用率、作業系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多執行緒安排時間。使用小型佇列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的排程開銷,這樣也會降低吞吐量。

Android中的4種常用的執行緒池

通過配置不同引數的ThreadPoolExecutor, 有4類常用的執行緒池, Executors類提供了4個工廠方法用於建立4種不同特性的執行緒池給開發者用.

1.FixedThreadPool

建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
用法:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(num);
fixedThreadPool.execute(runnable物件);
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                              0L, TimeUnit.MILLISECONDS,
                              new LinkedBlockingQueue<Runnable>());
}

特點:只有核心執行緒數,並且沒有超時機制,因此核心執行緒即使閒置時,也不會被回收,因此能更快的響應外界的請求.

2.CachedThreadPool

池裡的執行緒數量並不是固定的,理論上可以無限大,任務不需要排隊,如果有空閒的執行緒,則複用,無則新建執行緒。
用法:

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
cachedThreadPool.execute(runnable物件);
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                              60L, TimeUnit.SECONDS,
                              new SynchronousQueue<Runnable>());
}

特點:沒有核心執行緒,非核心執行緒數量沒有限制, 超時為60秒.
適用於執行大量耗時較少的任務,當執行緒閒置超過60秒時就會被系統回收掉,當所有執行緒都被系統回收後,它幾乎不佔用任何系統資源.

3.ScheduledThreadPool

建立一個定長執行緒池,支援定時及週期性任務執行。
用法:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);
scheduledThreadPool.schedule(runnable物件, 2000, TimeUnit.MILLISECONDS);
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

特點:核心執行緒數是固定的,非核心執行緒數量沒有限制, 沒有超時機制.
主要用於執行定時任務和具有固定週期的重複任務.

4.SingleThreadExecutor

建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
singleThreadExecutor.execute(runnable物件);
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
}

特點:只有一個核心執行緒,並沒有超時機制.
意義在於統一所有的外界任務到一個執行緒中, 這使得在這些任務之間不需要處理執行緒同步的問題.

下面是一個對執行緒池的簡單封裝,管理專案中的執行緒。

/**
 * 執行緒池管理
 */
public class UPThreadPoolManager {

    private static UPThreadPoolManager mInstance = new UPThreadPoolManager();

    public static final synchronized UPThreadPoolManager getInstance() {
        if (mInstance == null) {
            mInstance = new UPThreadPoolManager();
        }
        return mInstance;
    }

    private int corePoolSize; // 核心執行緒池的數量,同時能夠執行的執行緒數量
    private int maximumPoolSize;// 最大執行緒池數量,表示當緩衝佇列滿的時候能繼續容納的等待任務的數量
    private long keepAliveTime = 3;// 存活時間
    private TimeUnit unit = TimeUnit.MINUTES;
    private ThreadPoolExecutor executor;

    private UPThreadPoolManager() {
        /**
         * 給corePoolSize賦值:當前裝置可用處理器核心數*2 + 1,能夠讓cpu的效率得到最大程度執行
         */
        corePoolSize = Runtime.getRuntime().availableProcessors() * 2 + 1;
        maximumPoolSize = corePoolSize; // 雖然maximumPoolSize用不到,但是需要賦值,否則報錯
        executor = new ThreadPoolExecutor(corePoolSize, // 當某個核心任務執行完畢,會依次從緩衝佇列中取出等待任務
                maximumPoolSize, // 5,先corePoolSize,然後new
                                 // LinkedBlockingQueue<Runnable>(),然後maximumPoolSize,但是它的數量是包含了corePoolSize的
                keepAliveTime, // 表示的是maximumPoolSize當中等待任務的存活時間
                unit, new LinkedBlockingQueue<Runnable>(), // 緩衝佇列,用於存放等待任務,Linked的先進先出(無界佇列)
                Executors.defaultThreadFactory(), // 建立執行緒的工廠
                new ThreadPoolExecutor.AbortPolicy() // 用來對超出maximumPoolSize的任務的處理策略
        );
        executor.allowCoreThreadTimeOut(true); // keepAliveTime同樣作用於核心執行緒
    }

    /**
     * 執行任務
     */
    public void execute(Runnable runnable) {
        if (runnable == null)
            return;

        executor.execute(runnable);
    }

    /**
     * 從執行緒池中移除任務
     */
    public void remove(Runnable runnable) {
        if (runnable == null)
            return;

        executor.remove(runnable);
    }
}