1. 程式人生 > >無界佇列的使用誤區

無界佇列的使用誤區

本文目錄:

1. 概述

我們這裡的佇列都指執行緒池使用的阻塞佇列 BlockingQueue 的實現。

什麼是有界佇列?就是有固定大小的佇列。比如設定了固定大小的 LinkedBlockingQueue,又或者大小為 0,只是在生產者和消費者中做中轉用的 SynchronousQueue。

什麼是無界佇列?指的是沒有設定固定大小的佇列。這些佇列的特點是可以直接入列,直到溢位。當然現實幾乎不會有到這麼大的容量(超過 Integer.MAX_VALUE),所以從使用者的體驗上,就相當於 “無界”。比如沒有設定固定大小的 LinkedBlockingQueue。

所以無界佇列的特點就是可以一直入列,不存在佇列滿負荷的現象。

這個特性,在我們自定義執行緒池的使用中非常容易出錯。而出錯的根本原因是對執行緒池內部原理的不瞭解。

比如有這麼一個案例,我們使用了無界佇列建立了這樣一個執行緒池:

ExecutorService executor =  new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

配置的引數如下:

  • 核心執行緒數 2
  • 最大執行緒數 4
  • 空閒執行緒保活時間 60s
  • 使用無界佇列 LinkedBlockingQueue

然後對這個執行緒池我們提出一個問題:使用過程中,是否會達到最大執行緒數 4?

2. 驗證

我們寫了個 Demo 驗證一下,設定有 10 個任務,每個任務執行 10s。

任務的執行程式碼如下,用 Thread.sleep 操作模擬執行任務的阻塞耗時。

/**
 * @author lidiqing
 * @since 17/9/17.
 */
public class BlockRunnable implements Runnable {
    private final String mName;

    public BlockRunnable(String name) {
        mName = name;
    }

    public
void run() { System.out.println(String.format("[%s] %s 執行", Thread.currentThread().getName(), mName)); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }

然後在 main 方法中把這 10 個任務扔進剛剛設計好的執行緒池中:

 public static void main(String[] args) {
        ExecutorService executor =  new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 10; i++) {
            executor.execute(new BlockRunnable(String.valueOf(i)));
        }
    }

結果輸出如下:

[pool-1-thread-2] 1 執行
[pool-1-thread-1] 0 執行
[pool-1-thread-2] 2 執行
[pool-1-thread-1] 3 執行
[pool-1-thread-1] 5 執行
[pool-1-thread-2] 4 執行
[pool-1-thread-2] 7 執行
[pool-1-thread-1] 6 執行
[pool-1-thread-1] 8 執行
[pool-1-thread-2] 9 執行

發現了什麼問題?這裡最多出現兩個執行緒。當放開到更多的任務時,也依然是這樣。

3. 剖析

我們回到執行緒池 ThreadPoolExecutor 的 execute 方法來找原因。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

上面程式碼的核心就是任務進入等待佇列 workQueue 的時機。答案就是,執行 execute 方法時,如果發現核心執行緒數已滿,是會先執行 workQueue.offer(command) 來入列。

也就是 當核心執行緒數滿了後,任務優先進入等待佇列。如果等待佇列也滿了後,才會去建立新的非核心執行緒

所以我們上面設計的執行緒池,使用了無界佇列,會直接導致最大執行緒數的配置失效。

可以用一張圖來展示整個 execute 階段的過程:

執行機制-execute流程

所以上面的執行緒池,實際使用的執行緒數的最大值始終是 corePoolSize ,即便設定了 maximumPoolSize 也沒有生效。 要用上 maximumPoolSize ,允許在核心執行緒滿負荷下,繼續建立新執行緒來工作 ,就需要選用有界任務佇列。可以給 LinkedBlockingQueue 設定容量,比如 new LinkedBlockingQueue(128) ,也可以換成 SynchronousQueue。

舉個例子,用來做非同步任務的 AsyncTask 的內建併發執行器的執行緒池設計如下:

public abstract class AsyncTask<Params, Progress, Result> {     
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    // We want at least 2 threads and at most 4 threads in the core pool,
    // preferring to have 1 less than the CPU count to avoid saturating
    // the CPU with background work
    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    private static final int KEEP_ALIVE_SECONDS = 30;

    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };

    private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);

    /**
     * An {@link Executor} that can be used to execute tasks in parallel.
     */
    public static final Executor THREAD_POOL_EXECUTOR;

    static {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }

    ...
}

我們可以看到,AsyncTask 的這個執行緒池設計,是希望在達到核心執行緒數之後,能夠繼續增加工作執行緒,最大達到 CPU_COUNT * 2 + 1 個執行緒,所以使用了有界佇列,限制了任務佇列最大數量為 128 個。

所以使用 AsyncTask 的併發執行緒池的時候要注意,不適宜短時間同時大量觸發大量任務的場景。

因為當核心執行緒、任務佇列、非核心執行緒全部滿負荷工作的情況下,下一個進來的任務會觸發 ThreaPoolExecutor 的 reject 操作,預設會使用 AbortPolicy 策略,丟擲 RejectedExecutionException 異常。