無界佇列的使用誤區
本文目錄:
1. 概述
我們這裡的佇列都指執行緒池使用的阻塞佇列 BlockingQueue 的實現。
什麼是有界佇列?就是有固定大小的佇列。比如設定了固定大小的 LinkedBlockingQueue,又或者大小為 0,只是在生產者和消費者中做中轉用的 SynchronousQueue。
什麼是無界佇列?指的是沒有設定固定大小的佇列。這些佇列的特點是可以直接入列,直到溢位。當然現實幾乎不會有到這麼大的容量(超過 Integer.MAX_VALUE),所以從使用者的體驗上,就相當於 “無界”。比如沒有設定固定大小的 LinkedBlockingQueue。
所以無界佇列的特點就是可以一直入列,不存在佇列滿負荷的現象。
這個特性,在我們自定義執行緒池的使用中非常容易出錯。而出錯的根本原因是對執行緒池內部原理的不瞭解。
比如有這麼一個案例,我們使用了無界佇列建立了這樣一個執行緒池:
ExecutorService executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
配置的引數如下:
- 核心執行緒數 2
- 最大執行緒數 4
- 空閒執行緒保活時間 60s
- 使用無界佇列 LinkedBlockingQueue
然後對這個執行緒池我們提出一個問題:使用過程中,是否會達到最大執行緒數 4?
2. 驗證
我們寫了個 Demo 驗證一下,設定有 10 個任務,每個任務執行 10s。
任務的執行程式碼如下,用 Thread.sleep 操作模擬執行任務的阻塞耗時。
/**
* @author lidiqing
* @since 17/9/17.
*/
public class BlockRunnable implements Runnable {
private final String mName;
public BlockRunnable(String name) {
mName = name;
}
public void run() {
System.out.println(String.format("[%s] %s 執行", Thread.currentThread().getName(), mName));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
然後在 main 方法中把這 10 個任務扔進剛剛設計好的執行緒池中:
public static void main(String[] args) {
ExecutorService executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
for (int i = 0; i < 10; i++) {
executor.execute(new BlockRunnable(String.valueOf(i)));
}
}
結果輸出如下:
[pool-1-thread-2] 1 執行
[pool-1-thread-1] 0 執行
[pool-1-thread-2] 2 執行
[pool-1-thread-1] 3 執行
[pool-1-thread-1] 5 執行
[pool-1-thread-2] 4 執行
[pool-1-thread-2] 7 執行
[pool-1-thread-1] 6 執行
[pool-1-thread-1] 8 執行
[pool-1-thread-2] 9 執行
發現了什麼問題?這裡最多出現兩個執行緒。當放開到更多的任務時,也依然是這樣。
3. 剖析
我們回到執行緒池 ThreadPoolExecutor 的 execute 方法來找原因。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
上面程式碼的核心就是任務進入等待佇列 workQueue 的時機。答案就是,執行 execute 方法時,如果發現核心執行緒數已滿,是會先執行 workQueue.offer(command)
來入列。
也就是 當核心執行緒數滿了後,任務優先進入等待佇列。如果等待佇列也滿了後,才會去建立新的非核心執行緒 。
所以我們上面設計的執行緒池,使用了無界佇列,會直接導致最大執行緒數的配置失效。
可以用一張圖來展示整個 execute 階段的過程:
所以上面的執行緒池,實際使用的執行緒數的最大值始終是 corePoolSize
,即便設定了 maximumPoolSize
也沒有生效。 要用上 maximumPoolSize
,允許在核心執行緒滿負荷下,繼續建立新執行緒來工作 ,就需要選用有界任務佇列。可以給 LinkedBlockingQueue 設定容量,比如 new LinkedBlockingQueue(128)
,也可以換成 SynchronousQueue。
舉個例子,用來做非同步任務的 AsyncTask 的內建併發執行器的執行緒池設計如下:
public abstract class AsyncTask<Params, Progress, Result> {
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
// We want at least 2 threads and at most 4 threads in the core pool,
// preferring to have 1 less than the CPU count to avoid saturating
// the CPU with background work
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
/**
* An {@link Executor} that can be used to execute tasks in parallel.
*/
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
...
}
我們可以看到,AsyncTask 的這個執行緒池設計,是希望在達到核心執行緒數之後,能夠繼續增加工作執行緒,最大達到 CPU_COUNT * 2 + 1
個執行緒,所以使用了有界佇列,限制了任務佇列最大數量為 128 個。
所以使用 AsyncTask 的併發執行緒池的時候要注意,不適宜短時間同時大量觸發大量任務的場景。
因為當核心執行緒、任務佇列、非核心執行緒全部滿負荷工作的情況下,下一個進來的任務會觸發 ThreaPoolExecutor 的 reject
操作,預設會使用 AbortPolicy
策略,丟擲 RejectedExecutionException
異常。