Java執行緒池原始碼解讀
執行緒池
使用
我們看一看執行緒池引數最全的建立方式
public ThreadPoolExecutor( int corePoolSize, //核心執行緒數大小 int maximumPoolSize, //最大執行緒數大小 long keepAliveTime, //空閒執行緒存活時間 TimeUnit unit, //時間單位 BlockingQueue<Runnable> workQueue, //工作佇列 ThreadFactory threadFactory, //執行緒工廠 RejectedExecutionHandler handler) { //拒絕策略 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) //校驗基本引數 throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
四種原生局決策略:
1、AbortPolicy策略:該策略直接丟擲異常,阻止系統工作
2、CallerRunsPolicy策略:只要執行緒池未關閉,該策略直接在呼叫者執行緒中運行當前被丟棄的任務。顯然這樣不會真的丟棄任務,但是,呼叫者執行緒效能可能急劇下降。
3、DiscardOledestPolicy策略:丟棄最老的一個請求任務,也就是丟棄一個即將被執行的任務,並嘗試再次提交當前任務。
4、DiscardPolicy策略:默默的丟棄無法處理的任務,不予任何處理。
應用場景:
1、AbortPolicy策略:這個就沒有特殊的場景了,但是一點要正確處理丟擲的異常。
2、CallerRunsPolicy策略:一般在不允許失敗的、對效能要求不高、併發量較小的場景下使用,因為執行緒池一般情況下不會關閉,也就是提交的任務一定會被執行,但是由於是呼叫者執行緒自己執行的,當多次提交任務時,就會阻塞後續任務執行,效能和效率自然就慢了。
3、DiscardOledestPolicy策略:釋出訊息,和修改訊息,當訊息釋出出去後,還未執行,此時更新的訊息又來了,這個時候未執行的訊息的版本比現在提交的訊息版本要低就可以被丟棄了
4、DiscardPolicy策略:如果你提交的任務無關緊要,你就可以使用它 。因為它就是個空實現,會悄無聲息的吞噬你的的任務。所以這個策略基本上不用了
//建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。 ExecutorService executorService = Executors.newCachedThreadPool(); //建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。 ExecutorService executorService1 = Executors.newFixedThreadPool(1); //建立一個定長執行緒池,支援定時及週期性任務執行。 ScheduledExecutorService executorService2 = Executors.newScheduledThreadPool(1); //建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。 ExecutorService executorService3 = Executors.newSingleThreadExecutor();
這裡不建議我們這樣建立執行緒池,比如說建立一個無界佇列執行緒池,我們的任務接二連三的來,對列大小無限大導致記憶體溢位。
1.1、execute()方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
*分三步進行:
1. 如果正在執行的執行緒小於corePoolSize,請嘗試執行此操作
以給定的命令作為第一個命令啟動一個新執行緒
的任務。對addWorker的呼叫會自動檢查runState和
workerCount,從而防止新增的假警報
執行緒,通過返回false。
2. 如果一個任務可以成功排隊,那麼我們仍然需要
再次檢查是否應該新增執行緒
(因為現有的在上次檢查後就死了)或者其他的
進入此方法後,池關閉。所以我們
重新檢查狀態,並在必要時回滾排隊if
停止,如果沒有執行緒,則啟動一個新執行緒。
3.如果我們不能對任務進行排隊,那麼我們嘗試新增一個新的
執行緒。如果它失敗了,我們知道我們關閉了或飽和了
所以拒絕這個任務。
*/
int c = ctl.get(); //獲取執行緒池的執行狀態和有效執行緒
if (workerCountOf(c) < corePoolSize) { //獲取當前執行緒數與核心執行緒數比較,如果小於
if (addWorker(command, true)) //呼叫addworker(任務,是否是核心執行緒)來新增工作執行緒
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//有效執行緒數不小於核心執行緒數,檢查執行緒池是否是RUNNING狀態;是,將任務物件新增到任務佇列中
int recheck = ctl.get();//再次獲取執行緒池的執行狀態和有效執行緒數
if (! isRunning(recheck) && remove(command))//當前執行緒池不處於RUNNING狀態,移除任務佇列workQueue中的任務物件,並執行拒絕策略
reject(command);//執行拒絕策略
else if (workerCountOf(recheck) == 0)
addWorker(null, false);//當前執行緒池中的worker數為0,則直接建立一個(非核心)執行緒,task為空的執行緒在執行時,會直接到任務佇列中去獲取任務
}
else if (!addWorker(command, false))
reject(command);//將任務物件新增到任務佇列中失敗,則新增到執行緒池的有效執行緒中,如果失敗,執行拒絕策略
}
1.1.1、ctl
我們在上圖看到了ctl,這裡這個ctl是什麼?
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl作為AtomicInteger類存放了類中的兩種資訊,在其中由高3位來儲存執行緒池的狀態,後29位來儲存此時執行緒池中的Woker類執行緒數量(由此可知,執行緒池中的執行緒數量最高可以接受大約在五億左右)。由此可見給出的runStateOf()和workerCountOf()方法分別給出了檢視執行緒狀態和執行緒數量的方法。
RUNNING狀態可以接受新進來的任務,同時也會執行佇列裡的任務。
SHUTDOWN狀態已經不會再接受新任務,但仍舊會處理佇列中的任務。
STOP狀態在之前的基礎上,不會處理佇列中的人物,在執行的任務也會直接被打斷。
TIDYING狀態在之前的基礎上,所有任務都已經終止,池中的Worker執行緒都已經為0,也就是stop狀態在清理完所有工作執行緒之後就會進入該狀態,同時在shutdown狀態在佇列空以及工作執行緒清理完畢之後也會直接進入這個階段,這一階段會迴圈執行terminated()方法。
TERMINATED 狀態作為最後的狀態,在之前的基礎上terminated()方法也業已執行完畢,才會從上個狀態進入這個狀態,代表執行緒池已經完全停止。
1.1.2、addWork()方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry://retry後面跟迴圈,標記這個迴圈的位置。break後面加retry表示要跳出這個標記的迴圈,continue後面加retry表示跳過這個標記的迴圈的本次迴圈。
for (;;) {
int c = ctl.get();//獲取執行緒池執行狀態和有效執行緒數
int rs = runStateOf(c);//獲取執行緒池執行狀態
//不能新增有效執行緒的情況,直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);//獲取執行緒池中有效執行緒數
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//CAS操作,使得有效執行緒數+1
break retry;//CAS操作成功,跳出retry迴圈
c = ctl.get(); // 再次獲取執行緒池執行狀態和有效執行緒數
if (runStateOf(c) != rs)//執行緒池執行狀態改變
continue retry;//跳過本次迴圈,繼續retry標記的迴圈
}
}
boolean workerStarted = false;//新執行緒成功啟動標識
boolean workerAdded = false;//新執行緒新增到workers中的標識
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;//可重入鎖
w = new Worker(firstTask);//將firstTask封裝成Worker物件
final Thread t = w.thread;//w對應的執行緒
if (t != null) {
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) //檢查執行緒t是否已經啟動
throw new IllegalThreadStateException();
workers.add(w);//w新增到workers中
int s = workers.size();//workers的大小
if (s > largestPoolSize)//如果s大於largestPoolSize,則需要將s賦值給largestPoolSize
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);//將執行緒新增到workers中失敗,需要回滾,
}
return workerStarted;
}
首先判斷當前執行緒池的狀態,如果已經狀態不是shutdown或者running,或者已經為shutdown但是工作佇列已經為空,那麼這個時候直接返回新增工作失敗。接下來是對執行緒池執行緒數量的判斷,根據呼叫時的core的值來判斷是跟corePoolSize還是 maximumPoolSize判斷。
在確認了執行緒池狀態以及執行緒池中工作執行緒數量之後,才真正開始新增工作執行緒。
新建立一個worker類(執行緒池的內部類,具體的工作執行緒),將要執行的具體執行緒做為構造方法中的引數傳遞進去,接下來將其加入執行緒池的工作執行緒容器workers,並且更新工作執行緒最大量,最後呼叫worker工作執行緒的start()方法,就完成了工作執行緒的建立與啟動。
讓我們回到execute()方法,如果我們在一開始的執行緒數量就大於corePoolSize,或者我們在呼叫addworker()方法的過程中出現了問題導致新增工作執行緒數量失敗,那麼我們會繼續執行接下來的邏輯。
在判斷完畢執行緒池的狀態後,則會將任務通過workQueue.offer())方法試圖加進任務佇列。Offer()方法的具體實現會根據線上程池構造方法中選取的任務佇列種類而產生變化。
但是如果成功加入了任務佇列,仍舊需要注意判斷如果執行緒池的狀態如果已經不是running那麼會拒絕執行這一任務並執行相應的拒絕策略。在最後需要記得成功加入佇列成功後如果執行緒池中如果已經沒有了工作執行緒,需要重新建立一個工作執行緒去執行仍舊在任務佇列中等待執行的任務。
如果在之前的前提下加入任務佇列也失敗了(比如任務佇列已滿),則會在不超過執行緒池最大執行緒數量的前提下建立一個工作執行緒來處理。
如果在最後的建立工作執行緒也失敗了,那麼我們只有很遺憾的執行任務的拒絕策略了。
1.1.2.1、Worker類
繼承AQS,實現Runnable,提供了執行緒中斷機制;
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//執行任務的執行緒
final Thread thread;
//需要執行的任務,可為空
Runnable firstTask;
//執行的任務書
volatile long completedTasks;
//構造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);//將worker物件傳入到執行緒中,執行緒啟動後,會執行下面的run方法
}
public void run() {
runWorker(this);
}
//鎖的狀態,1:加鎖狀態;0:未加鎖狀態
protected boolean isHeldExclusively() {
return getState() != 0;
}
//搶佔鎖
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//釋放鎖
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//中斷執行緒
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
抽離一下worker類的內部成員:
thread作為worker的工作執行緒空間,由執行緒池中所設定的執行緒工廠生成。
firstTask則是worker在構造方法中所接受到的所要執行的任務。
completedTasks作為該worker類所執行完畢的任務總數。
我們建立完worker,就會呼叫worker的工作執行緒空間就來到了run方法
public void run() {
runWorker(this);
}
繼續追蹤
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();//獲取當前執行緒
Runnable task = w.firstTask;//獲取Worker物件中的任務
w.firstTask = null;
w.unlock(); // 允許中斷
boolean completedAbruptly = true;
try {
//任務不為空或者從任務佇列中獲取的任務不為空,則進入while迴圈
while (task != null || (task = getTask()) != null) {
w.lock();
//檢查當前執行緒是否需要中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//沒有具體實現,使用者根據業務場景進行自定義
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//類似beforeExecute
}
} finally {
task = null; //設定為null,在執行完畢後,工作執行緒的使命並沒有真正宣告段落。在while部分worker仍舊會通過getTask()方法試圖取得新的任務。下面是getTask()的實現。
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);//處理worker退出的流程
}
如果這個worker還沒有執行過在構造方法就傳入的任務,那麼在這個方法中,會直接執行這一任務,如果沒有,則會嘗試去從任務隊列當中去取的新的任務。
但是在真正呼叫任務之前,仍舊會判斷執行緒池的狀態,如果已經不是running亦或是shutdwon,則會直接確保執行緒被中斷。如果沒有,將會繼續執行並確保不被中斷。
接下來可見,我們所需要的任務,直接在工作執行緒中直接以run()方式以非執行緒的方式所呼叫,這裡也就是我們所需要的任務真正執行的地方。
在執行完畢後,工作執行緒的使命並沒有真正宣告段落。在while部分worker仍舊會通過getTask()方法試圖取得新的任務。下面是getTask()的實現。
1、getTask(); ThreadPoolExecutor中方法,從任務佇列中獲取任務。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// 1)、當前執行緒池的執行狀態是STOP、TIDYING、TERMINATED中的一個;2)、當前執行緒池的執行狀態是SHUTDOWN,任務佇列已空;
decrementWorkerCount();//有效執行緒數減1
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;//核心執行緒設定了存活時間或者有效執行緒數大於核心執行緒數(存在非核心執行緒)
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
//根據timed的值,對r進行賦值。timed為true,呼叫workQueue.poll(),如果在限定時間內沒有取到任務,執行timedOut = true;timed為false,呼叫workQueue.take(),如果沒有立即拿到任務,執行緒會被阻塞,直到從任務佇列拿到任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
首先仍舊會判斷執行緒池的狀態是否是running還是shutdown以及stop狀態下佇列是否仍舊有需要等待執行的任務。如果狀態沒有問題,則會跟據allowCoreThreadTimeOut和corePoolSize的值通過對前面這兩個屬性解釋的方式來選擇從任務佇列中獲得任務的方式(是否設定timeout)。其中的timedOut保證了確認前一次試圖取任務時超時發生的記錄,以確保工作執行緒的回收。
2、processWorkerExist()方法來執行工作執行緒的回收
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
在這一方法中,首先確保已經重新更新了執行緒池中工作執行緒的數量,之後從執行緒池中的工作執行緒容器移去當前工作執行緒,並且將完成的任務總數加到執行緒池的任務總數當中。
在最後仍舊要確保執行緒池中依舊存在大於等於最小執行緒數量的工作執行緒數量存在,如果沒有,則重新建立工作執行緒去等待處理任務佇列中任務。
1.2、submit()方法:
基本使用
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 5, 4, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("測試");
return t;
}
});
Callable<Integer> callable = () -> {
System.out.println("我是callable");
return 66;
};
FutureTask<Integer> futureTask = new FutureTask<>(callable);
executor.submit(futureTask);
System.out.println(futureTask.get());
Future<Integer> submit = executor.submit(callable);
System.out.println(submit.get());
}
}
方法過載:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
我們看看這個方法的父類
1.2.1、前景複習
Callable類
package java.util.concurrent;
@FunctionalInterface
public interface Callable<V> {
//返回結果,如果無法執行,則丟擲異常。
V call() throws Exception;
}
這是函式式介面
Future類
package java.util.concurrent;
public interface Future<V> {
//試圖取消此任務的執行。如果任務已完成、已取消或由於某些原因無法取消,則嘗試取消失敗。如果任務未啟動時嘗試取消成功,那麼這個任務就永遠不能執行。如果任務已經啟動,則根據mayInterruptIfRunning引數確定是否要中斷該任務的執行緒,阻止任務的執行。
boolean cancel(boolean mayInterruptIfRunning);
//如果任務在正常完成之前被取消,則返回true
boolean isCancelled();
//任務是否完成,完成返回true
boolean isDone();
//等待計算完成,然後檢索其結果。
V get() throws InterruptedException, ExecutionException;
//在給定的時間等待計算完成,然後檢索其結果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
FutureTask類
FutureTask重寫了Runnable的run方法。
1.2.2、newTaskFor
這三個方法都用到了newTaskFor方法,我們看看newTaskFor方法
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
我們返回給定可呼叫任務的RunnableFuture物件;
1.2.2.1、FutureTask(Callable callable)
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // 確保可呼叫的可見性
1.2.2.2、 FutureTask中的任務狀態
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL 正常
* NEW -> COMPLETING -> EXCEPTIONAL 異常
* NEW -> CANCELLED 取消
* NEW -> INTERRUPTING -> INTERRUPTED 中斷
//state用來記錄FutureTask內部任務的執行狀態
private volatile int state;
//新任務的初始狀態
private static final int NEW = 0;
//任務已經執行完成或者執行任務過程中發生異常,執行結果還沒有賦值給outcome欄位時的狀態
private static final int COMPLETING = 1;
//任務已經執行完成並且任務執行結果已經儲存到outcome欄位時的狀態
private static final int NORMAL = 2;
//任務執行發生異常並且異常原因已經儲存到outcome欄位時的狀態
private static final int EXCEPTIONAL = 3;
//任務未執行或者進行中,呼叫cancel(false)方法取消任務但是不中斷任務執行執行緒時的狀態
private static final int CANCELLED = 4;
//任務未執行或者進行中,呼叫cancel(false)方法取消任務,要中斷任務執行執行緒還沒有中斷時的狀態
private static final int INTERRUPTING = 5;
//中斷任務執行執行緒後的狀態
private static final int INTERRUPTED = 6;
1.2.3、FutureTask的run方法
剩下的就是放到execute()中去執行了,大體邏輯和前邊差不多,我們看看封裝為FutureTask的run方法有什麼不同?
public void run() {
//狀態如果不是NEW,說明任務已執行/已被取消;狀態如果是NEW,嘗試把當前執行執行緒儲存在runner欄位中,儲存失敗;以上兩種情況直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;//獲取前面初始化賦值的callable任務
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();//任務執行
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);//異常,這就是submit為啥不報錯的原因,內部已經消化。
}
if (ran)
set(result);
}
} finally {
// 執行器必須為非null,直到解決狀態為止,以防止同時呼叫run()
runner = null;
// 清空執行器後必須重新讀取狀態,以防止洩漏中斷
int s = state;
if (s >= INTERRUPTING)//任務被中斷,執行中斷操作
handlePossibleCancellationInterrupt(s);
}
}
1.2.3.1、setException(Throwable t)
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//狀態從NEW改成COMPLETING
outcome = t;//將異常賦值給outcome欄位,這個欄位就是get()呼叫的返回值。
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 狀態改成EXCEPTIONAL
finishCompletion();//移除並通知所有等待的執行緒
}
}
這段程式碼的主要作用是:任務執行異常,修改狀態並儲存異常資訊;
1.2.3.1.1、finishCompletion()
private void finishCompletion() {
// assert state > COMPLETING;
//依次遍歷waiters連結串列,喚醒節點中的執行緒,然後把callable置空
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // 丟失引用幫助GC
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
刪除併發出所有等待執行緒的訊號,呼叫done(),並使callable無效。
1.2.3.2、set(V v)
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//狀態從NEW改成COMPLETING
outcome = v;//將結果賦值給outcome 欄位,這跟異常資訊是同一個欄位
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); //狀態設定為NORMAL
finishCompletion();
}
}
1.2.3.2、handlePossibleCancellationInterrupt(s);
private void handlePossibleCancellationInterrupt(int s) {
// 我們傳進來的狀態等於INTERRUPTING就迴圈等待,知道狀態變更
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// 我們想清除從cancel(true)收到的所有中斷。但是,允許將中斷用作任務與其呼叫方進行通訊的獨立機制,並且無法清除清除中斷。
//
// Thread.interrupted();
}
確保來自可能的cancel(true)的任何中斷僅在執行或runAndReset時才傳遞給任務。
1.2.4、get()方法
當我們的任務執行完的時候,我們就回去獲取執行結果,這個執行結果就是通過get()方法來獲取的。
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)//如果是初始化和完成狀態則執行
s = awaitDone(false, 0L);
return report(s);
}
1.2.4.1、awaitDone(false, 0L)
private int awaitDone(boolean timed, long nanos) //是否超時等待,如果超時等待時間是多少
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (; ; ) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
} else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
} else
LockSupport.park(this);
}
}
阻塞等待任務執行完成
1.2.4.2、report(s)
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)//如果是正常狀態
return (V) x;
if (s >= CANCELLED)//如果不是正常狀態
throw new CancellationException();
throw new ExecutionException((Throwable) x);
}
異常處理
當我們的執行緒有任務出錯了,我們應該如何處理,接下來我就告訴大家幾種方法。
首先我們要知道execute方法和submit對待異常是不一樣的,execute是會丟擲異常的,而submit不會丟擲異常,原因前面也提到了,就是callable封裝為FutureTask,而FutureTask重寫了Runable的run方法,異常在內部補貨了,放在了outcome,通過get()方法獲取。
1、try catch
executorService.submit(()->{
try{
int i=1/0;
}catch (Exception ex){
System.out.println("sumbit提交"+ex.getMessage());
}
});
executorService.submit(()->{
System.out.println("當執行緒池丟擲異常後繼續新的任務");
});
我們為每一個執行緒任務加上try catch。我們就可以看到執行的異常了,這樣是很麻煩的,我們就需要另一個方式。
2、UncaughtExceptionHandler
@FunctionalInterface
public interface UncaughtExceptionHandler {
/**
* Method invoked when the given thread terminates due to the
* given uncaught exception.
* <p>Any exception thrown by this method will be ignored by the
* Java Virtual Machine.
*
* @param t the thread
* @param e the exception
*/
void uncaughtException(Thread t, Throwable e);
}
UncaughtExceptionHandler 是Thread類一個內部類,也是一個函式式介面。
內部的uncaughtException是一個處理執行緒內發生的異常的方法,引數為執行緒物件t和異常物件e。
我們應該如何使用那,直接給Thread執行緒繫結就可以了。
//建立執行緒物件 內部會丟擲異常
Thread thread=new Thread(()->{
int i=1/0;
});
//設定該物件的預設異常處理器
thread.setDefaultUncaughtExceptionHandler((Thread t, Throwable e)->{
System.out.println("exceptionHandler"+e.getMessage());
});
//啟動執行緒
thread.start();
如果我們不想在每個執行緒的任務裡面都加try-catch的話,可以自己實現的一個執行緒池,重寫它的執行緒工廠方法,在建立執行緒的時候,都賦予UncaughtExceptionHandler處理器物件。
//1.實現一個自己的執行緒池工廠
ThreadFactory factory = (Runnable r) -> {
//建立一個執行緒
Thread t = new Thread(r);
//給建立的執行緒設定UncaughtExceptionHandler物件 裡面實現異常的預設邏輯
t.setDefaultUncaughtExceptionHandler((Thread thread1, Throwable e) -> {
System.out.println("執行緒工廠設定的exceptionHandler" + e.getMessage());
});
return t;
};
//2.建立一個自己定義的執行緒池,使用自己定義的執行緒工廠
ExecutorService service = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue(10),factory);
//3.提交任務
service.execute(()->{
int i=1/0;
});
我們來看看這個方法是什麼時候呼叫的。
private void dispatchUncaughtException(Throwable e) {
getUncaughtExceptionHandler().uncaughtException(this, e);
}
向處理程式排程一個未捕獲的異常。此方法僅應由JVM呼叫。
同理submit()方法也不管用:submit方法內部已經捕獲了異常, 只是沒有打印出來,也因為異常已經被捕獲,因此jvm也就不會去呼叫Thread的UncaughtExceptionHandler去處理異常。就和我們前邊的出的結論一致了。
希望大家多多指正