資料庫優化(索引、分割槽)
前言
如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。
那麼有沒有一種辦法使得執行緒可以複用,就是執行完一個任務,並不被銷燬,而是可以繼續執行其他的任務?
在Java中可以通過執行緒池來達到這樣的效果。今天我們就來詳細講解一下Java的執行緒池,首先我們從最核心的ThreadPoolExecutor類中的方法講起,然後再講述它的實現原理,接著給出了它的使用示例,最後討論了一下如何合理配置執行緒池的大小。
Java中的ThreadPoolExecutor類
java.uitl.concurrent.ThreadPoolExecutor類是執行緒池中最核心的一個類,因此如果要透徹地瞭解Java中的執行緒池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現原始碼。
構造方法
public class ThreadPoolExecutor extends AbstractExecutorService {
........
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory);
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler);
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
........
}
|
從上面的原始碼中我們可以看到,ThreadPoolExecutor是繼承於AbstractExecutorService類,並提供了四個構造器,事實上,前三個構造器都是通過第四個構造器進行初始化工作的。
構造方法引數含義
corePoolSize:執行緒池核心執行緒數大小
這個引數與後面講述的執行緒池工作原理有很大的關係。在建立了執行緒池之後,預設是沒有任何執行緒的,而是等待有任務到來時才會建立執行緒去執行任務,除非呼叫 prestartAllCoreThreads() 或 者prestartCoreThread() 方法時,會建立全部或者一個核心執行緒。在預設情況下,當有任務到來時,建立工作執行緒去執行任務,當執行緒池中的執行緒數目到達corePoolSize時,就會把提交的任務放到等待佇列中。
maximumPoolSize:執行緒池最大執行緒數
它標識執行緒池中最大可建立的多少個執行緒
keepAliveTime:空閒執行緒存活時間
這個引數只有執行緒池中的執行緒數達到corePoolSize時才會起效,直到執行緒池中的執行緒數小於等於corePoolSize。如果呼叫allowCoreThreadTimeOut(boolean value)方法,則當執行緒池中的執行緒數小於等於corePoolSize,keepAliveTime引數也會起作用,直到執行緒池中的執行緒數為0
unit:空閒執行緒存活時間單位
該引數的值是TimeUnit的7個靜態屬性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
|
workQueue:任務等待佇列
一個阻塞佇列,用來儲存等待執行的任務。這個引數非常重要,會對執行緒池的執行產生很大的影響。一般來說,阻塞佇列有如下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
|
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。執行緒池的排隊策略與BlockingQueue有關。
threadFactory:執行緒工廠
主要用來建立執行緒
handler:表示當拒絕處理任務時的策略
有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
|
原始碼解析
從上面給出的ThreadPoolExecutor類的程式碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實現:
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
};
}
|
AbstractExecutorService是一個抽象類,它實現了ExecutorService介面。
public interface ExecutorService extends Executor {
// 關閉執行緒池,佇列中已經存在的任務還可以被繼續執行
void shutdown();
// 關閉執行緒池,中斷正在執行的任務
List<Runnable> shutdownNow();
// 判斷執行緒池是否關閉
boolean isShutdown();
// 判斷執行緒池是否終止
boolean isTerminated();
// 設定超時終止
boolean awaitTermination( long timeout, TimeUnit unit) throws InterruptedException;
// 提交Callable任務
<T> Future<T> submit(Callable<T> task);
// 提交Runnable任務,並帶返回值
<T> Future<T> submit(Runnable task, T result);
// 提交Runnable任務,不帶返回值
Future<?> submit(Runnable task);
// invokeAll是同步的,需要等待其他任務完成才會返回結果。而Submit是非同步的
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
// invokeAny是取第一個任務的返回值,並中斷其他任務
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
|
而ExecutorService又是繼承了Executor介面,我們看一下Executor介面的實現:
public interface Executor {
// 啟動任務
void execute(Runnable command);
}
|
到這裡,我們應該可以看到ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關係了。
Executor是一個頂層介面,它只聲明瞭一個execute(Runnable)方法,返回值是void,引數是Runnable,從字面意思可以理解,這個是用來執行傳入的執行緒任務的。
然後ExecutorService介面繼承了Executor介面,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象類AbstractExecutorService實現了ExecutorService介面,基本實現了ExecutorService中宣告的所有方法;
然後ThreadPoolExecutor繼承了類AbstractExecutorService。整體類圖如下:
在ThreadPoolExecutor類中有幾個非常重要的方法:
// Executor中宣告的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向執行緒池提交一個任務,交由執行緒池去執行
execute()
// ExecutorService中宣告的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並沒有對其進行重寫,這個方法也是用來向執行緒池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是呼叫的execute()方法,只不過它利用了Future來獲取任務執行結果
submit()
// 關閉執行緒池,會線上程池中的執行緒執行完成後關閉,執行完該方法後,不再接受新的任務
shutdown()
// 立即關閉執行緒池
shutdownNow()
|
深入剖析執行緒池的實現原理
執行緒池狀態
在ThreadPoolExecutor中定義了幾個static final變量表示執行緒池的各個狀態:
// 執行緒池使用一個int來儲存執行緒池當前的狀態和工作執行緒數
// int是4位元組,32位,用高三位儲存執行緒池的工作狀態,低29位儲存執行緒池的工作執行緒數
// 為什麼這樣?節省空間,一個int可以表述兩個含義
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 ));
// 29
private static final int COUNT_BITS = Integer.SIZE - 3 ;
// 建立執行緒池後,初始時的執行緒池狀態
private static final int RUNNING = - 1 << COUNT_BITS;
// 呼叫了shutdown()方法後,執行緒池狀態。此時執行緒池不再接受新任務,它會等待所有任務執行完畢
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 呼叫了shotdownNow()方法後,執行緒池狀態。此時執行緒池不再接收新任務,並嘗試主動終止正在執行的任務
private static final int STOP = 1 << COUNT_BITS;
// 臨時過渡狀態,所有任務都執行完了,當執行緒池的有效執行緒數為0時,這個時候為該狀態。執行terminated()方法,流轉下一個狀態
private static final int TIDYING = 2 << COUNT_BITS;
// 終止狀態,terminated()方法完成後的狀態
private static final int TERMINATED = 3 << COUNT_BITS;
|
任務的執行
在瞭解任務提交到執行緒池到任務執行完成的整個過程前,我們先來看下ThreadPoolExecutor中比較重要的成員變數:
// 任務快取佇列,用來存放等待執行的任務
private final BlockingQueue<Runnable> workQueue;
// 執行緒池的主要狀態鎖,對執行緒池的狀態及屬性的變化,都要依賴這個鎖來保證同步
private final ReentrantLock mainLock = new ReentrantLock();
// 用來存放工作集
private final HashSet<Worker> workers = new HashSet<Worker>();
// 執行緒空閒時間
private volatile long keepAliveTime;
// 是否允許為核心執行緒設定存活時間
private volatile boolean allowCoreThreadTimeOut;
// 核心池的大小(即執行緒池中的執行緒數目大於這個引數時,提交的任務會被放進任務快取佇列)
private volatile int corePoolSize;
// 執行緒池最大能容忍的執行緒數
private volatile int maximumPoolSize;
// 當前執行緒池的執行緒數
private volatile int poolSize;
// 執行緒池的拒絕策略
private volatile RejectedExecutionHandler handler;
// 執行緒工廠,用來建立執行緒
private volatile ThreadFactory threadFactory;
// 用來記錄執行緒池中曾經出現過的最大執行緒數
private int largestPoolSize;
// 用來記錄已經執行完畢的任務個數
private long completedTaskCount;
|
每個變數的作用都已經標明出來了,這裡要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變數。
corePoolSize在很多地方都被翻譯成核心執行緒數量,但是我的理解是這個值就是執行緒池的大小。舉個例子:假如一個工廠有10個工人,當有人空閒的時候,就分配任務給一個工人。如果10個工人都在忙碌,就把任務置放在等待佇列。如果任務繁多,就招臨時工進行工作(maximumPoolSize)。當任務增長緩和,就辭退臨時工,只保持10個工人。不過為了保持認知的一致性,本文還是繼續將corePoolSize解釋為核心執行緒數。
largestPoolSize只是一個用來記錄的變數,和執行緒池的容量沒有關係。
任務執行的生命週期
在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務,但是實際上submit方法裡面最終呼叫的還是execute()方法,所以我們只需要研究execute()方法的實現原理即可。
execute()方法
public void execute(Runnable command) {
// 判斷提交的任務是否為空
if (command == null )
throw new NullPointerException();
// 獲取執行緒池狀態和工作執行緒數量結合體(下文統稱為ctl)
int c = ctl.get();
// 判斷工作執行緒數量是否小於核心執行緒數
if (workerCountOf(c) < corePoolSize) {
// 把任務新增Worker,新增成功則返回
if (addWorker(command, true ))
return ;
// 新增失敗,再次獲取ctl
c = ctl.get();
}
// 如果執行緒池的狀態是執行中,並且向等待佇列提交成功
if (isRunning(c) && workQueue.offer(command)) {
// double-check機制,如果執行緒池已經不是running狀態,就要把提交的任務移除並拒絕
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0 )
// 如果核心執行緒都超時退出,因為任務已經放入佇列,所以不需要再提交一個任務,同時建立一個執行緒並啟動
addWorker( null , false );
}
// 執行到這裡有兩種情況
// 1. 執行緒池已經不是RUNNING狀態 2.等待佇列的長度已經超過定義
else if (!addWorker(command, false ))
reject(command);
}
|
addWorker()方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 獲取執行緒池執行狀態
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果執行緒池的狀態是:STOP TYDING TERMINATD狀態,直接返回false,任務新增失敗
// 如果執行緒池的狀態為SHUTDOWN 同時first!=null 或者 workQueue為空,任務新增失敗。此時代表:執行緒池已經停止,正在等待僅有的一個任務執行完成
// 為什麼要做兩次判斷?有可能任務是線上程池RUNNING狀態的時候將任務到佇列中,但是放入完成後狀態變為SHUTDOWN,此時不應該再執行新的任務
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false ;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false ;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false ;
boolean workerAdded = false ;
Worker w = null ;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null ) {
final ReentrantLock mainLock = this .mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null )) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true ;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true ;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
|