【搞定Java併發程式設計】第29篇:Executor 框架詳解
上一篇:Java中的執行緒池詳解
本文目錄:
3、ScheduledThreadPoolExecutor 詳解
3.1、ScheduledThreadPoolExecutor 的執行機制
3.2、ScheduledThreadPoolExecutor 的實現
本文的內容來自於學習《Java併發程式設計的藝術》一書的學習筆記。
Java中的執行緒即是工作單元也是執行機制,從JDK 5後,工作單元與執行機制被分離。工作單元包括Runnable和Callable,執行機制由JDK 5中增加的java.util.concurrent包中Executor框架提供。
1、Executor 框架簡介
1.1、Executor 框架的兩級排程模型
在 HotSpot VM 的執行緒模型中,Java執行緒(java.lang.Thread)被一對一對映為本地作業系統執行緒。Java執行緒啟動時會建立一個本地作業系統執行緒;當該Java執行緒終止時,這個作業系統執行緒也被回收。作業系統會排程所有執行緒並將它們分配給可用的CPU。
在上層,Java多執行緒程式通常把應用分解為若干個任務,然後使用使用者級的排程器(Executor框架)將這些任務對映為固定的數量的執行緒;在底層,作業系統核心將這些執行緒對映到硬體處理器上。這兩級的排程模型的示意圖如下所示:
從圖中可以看出,應用程式通過 Executor 框架控制上層的排程;而下層的排程由作業系統核心控制,下層的排程不受應用程式的控制。
1.2、Executor 框架的結構與成員
1.2.1、Executor 框架的結構
Executor 框架主要由3大部分組成如下:
1、任務
2、任務的執行:包括任務執行機制的核心介面Executor,以及繼承自 Executor 的 ExecutorService 介面。Executor 框架有兩個關鍵類實現了 ExecutorService 介面,即:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor;
3、非同步計算的結果:包括介面 Future 和實現 Future 介面的 FutureTask 類。
Executor 框架包含的主要類和介面如下圖所示:
1、Executor:是一個介面,它是 Executor 框架的基礎,它將任務的提交與任務的執行分離開來;
2、ThreadPoolExecutor:是執行緒池的核心實現類,用來執行被提交的任務;
3、ScheduledThreadPoolExecutor:是一個實現類,可以在給定的延遲後執行命令,或者定期執行命令。ScheduledThreadPoolExecutor 比 Timer 更加靈活,功能更強大;
4、Future 介面和實現 Future 介面的 FutureTask 類,代表非同步計算的結果;
5、Runnable 介面和 Callable 介面的實現類,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 執行。
Executor 框架的使用示意圖如下圖所示:
主執行緒首先要建立實現Runnable或者Callable介面的任務物件。工具類Executors可以把一個Runnable物件封裝為一個Callable物件(Executors.callable(Runnable task) 或 Executors.callable(Runnable task, Object result))。
然後可以把Runnbale物件直接交給 ExecutorService 執行(ExecutorService.execute(Runnable command));或者也可以把Runnable或者Callable物件提交給 ExecutorService 執行(ExecutorService.submit(Runnable task) 或 ExcutorService.submit(Callable<T> task))。
如果執行ExecutorService.submit( ... ),ExecutorService 將返回一個實現 Future 介面的物件。由於 FutureTask 實現了 Runnable,程式設計師也可以建立 FutureTask,然後直接交給 ExecutorService 執行。
最後,主執行緒可以執行 FutureTask.get() 方法來等待任務執行完成。主執行緒也可以執行 FutureTask.cancel(boolean mayInterruptIfRunning) 來取消此任務的執行。
1.2.2、Executor 框架的成員
Executor 框架的主要成員包括:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future介面、Runnable介面、Callable介面和Executors。
- 1、ThreadPoolExecutor
ThreadPoolExecutor 通常使用工廠類 Executors 來建立。Executors 可以建立3種類型的 ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool 和 CachedThreadPool。
- 2、ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 通常使用工廠類 Executors 來建立。Executors可以建立2種類型的 ScheduledThreadPoolExecutor,如下:
- ScheduledThreadPoolExecutor:包含若干個執行緒的ScheduledThreadPoolExecutor;
- SingleThreadScheduledExecutor:包含一個執行緒的ScheduledThreadPoolExecutor。
- 3、Future介面
Future 介面和實現 Future 介面的 FutureTask 類用來表示非同步計算的結果。當我們把 Runnable 介面或 Callable 介面的實現類提交(submit)給 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 時,ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 會向我們返回一個 FutureTask 物件。
- 4、Runnable 介面和 Callable 介面
Runnable 介面和 Callable 介面的實現類,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 執行。它們之間的區別是 Runnable 不會返回結果,而 Callable 可以返回結果。
2、ThreadPoolExecutor 詳解
其實關於 ThreadPoolExecutor 在上一篇文章:Java中的執行緒池詳解已經進行了原始碼講解,這裡僅介紹下它的3種類型:SingleThreadExecutor、FixedThreadPool 和 CachedThreadPool。
2.1、FixedThreadPool
建立固定長度的執行緒池,每次提交任務建立一個執行緒,直到達到執行緒池的最大數量,執行緒池的大小不再變化。
這個執行緒池可以建立固定執行緒數的執行緒池。特點就是可以重用固定數量執行緒的執行緒池。它的構造原始碼如下:
ublic static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- FixedThreadPool的corePoolSize和maxiumPoolSize都被設定為建立FixedThreadPool時指定的引數nThreads;
- 0L則表示當執行緒池中的執行緒數量操作核心執行緒的數量時,多餘的執行緒將被立即停止;
- 最後一個引數表示FixedThreadPool使用了無界佇列LinkedBlockingQueue作為執行緒池的做工佇列,由於是無界的,當執行緒池的執行緒數達到corePoolSize後,新任務將在無界佇列中等待,因此執行緒池的執行緒數量不會超過corePoolSize,同時maxiumPoolSize也就變成了一個無效的引數,並且執行中的執行緒池並不會拒絕任務。
FixedThreadPool執行圖如下:
執行過程如下:
1.如果當前工作中的執行緒數量少於corePool的數量,就建立新的執行緒來執行任務。
2.當執行緒池的工作中的執行緒數量達到了corePool,則將任務加入LinkedBlockingQueue。
3.執行緒執行完1中的任務後會從佇列中去任務。
注意:LinkedBlockingQueue是無界佇列,所以可以一直新增新任務到執行緒池。
2.2、SingleThreadExecutor
SingleThreadExecutor是使用單個worker執行緒的Executor。特點是使用單個工作執行緒執行任務。它的構造原始碼如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor 的 corePoolSize 和 maxiumPoolSize 都被設定1。其他引數均與 FixedThreadPool 相同,其執行圖如下:
執行過程如下:
1.如果當前工作中的執行緒數量少於corePool的數量,就建立一個新的執行緒來執行任務。
2.當執行緒池的工作中的執行緒數量達到了corePool,則將任務加入LinkedBlockingQueue。
3.執行緒執行完1中的任務後會從佇列中去任務。
注意:由於線上程池中只有一個工作執行緒,所以任務可以按照新增順序執行。
2. 3、CachedThreadPool
CachedThreadPool是一個”無限“容量的執行緒池,它會根據需要建立新執行緒。特點是可以根據需要來建立新的執行緒執行任務,沒有特定的corePool。下面是它的構造方法:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool的corePoolSize被設定為0,即corePool為空;maximumPoolSize被設定為Integer.MAX_VALUE,即maximum是無界的。這裡keepAliveTime設定為60秒,意味著空閒的執行緒最多可以等待任務60秒,否則將被回收。
CachedThreadPool使用沒有容量的SynchronousQueue作為主執行緒池的工作佇列,它是一個沒有容量的阻塞佇列。每個插入操作必須等待另一個執行緒的對應移除操作。這意味著,如果主執行緒提交任務的速度高於執行緒池中處理任務的速度時,CachedThreadPool會不斷建立新執行緒。極端情況下,CachedThreadPool會因為建立過多執行緒而耗盡CPU資源。其執行圖如下:
執行過程如下:
1.首先執行SynchronousQueue.offer(Runnable task)。如果在當前的執行緒池中有空閒的執行緒正在執行SynchronousQueue.poll(),那麼主執行緒執行的offer操作與空閒執行緒執行的poll操作配對成功,主執行緒把任務交給空閒執行緒執行。,execute()方法執行成功,否則執行步驟2;
2.當執行緒池為空(初始maximumPool為空)或沒有空閒執行緒時,配對失敗,將沒有執行緒執行SynchronousQueue.poll操作。這種情況下,執行緒池會建立一個新的執行緒執行任務;
3.在建立完新的執行緒以後,將會執行poll操作。當步驟2的執行緒執行完成後,將等待60秒,如果此時主執行緒提交了一個新任務,那麼這個空閒執行緒將執行新任務,否則被回收。因此長時間不提交任務的CachedThreadPool不會佔用系統資源。
SynchronousQueue是一個不儲存元素阻塞佇列,每次要進行offer操作時必須等待poll操作,否則不能繼續新增元素。
2.4、具體應用案例
1、newCachedThreadPool
建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。示例程式碼如下:
public class Demo1 {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for(int i = 0; i < 10; i++){
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable(){
@Override
public void run() {
System.out.println(index);
}
});
}
}
}
執行緒池為無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的執行緒,而不用每次新建執行緒。
2、newFixedThreadPool
建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。示例程式碼如下:
public class Demo2 {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable(){
@Override
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
因為執行緒池大小為3,每個任務輸出index後sleep 2秒,所以每兩秒列印3個數字。定長執行緒池的大小最好根據系統資源進行設定。如Runtime.getRuntime().availableProcessors()。可參考PreloadDataCache。
3、newScheduledThreadPool
建立一個定長執行緒池,支援定時及週期性任務執行。延遲執行示例程式碼如下:
public class Demo3 {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable(){
@Override
public void run() {
System.out.println("延遲3秒");
}
}, 3, TimeUnit.SECONDS);
}
}
表示延遲3秒執行。定期執行示例程式碼如下:
public class Demo4 {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(new Runnable(){
@Override
public void run() {
System.out.println("延遲1秒,每3秒執行1次");
}
}, 1, 3, TimeUnit.SECONDS);
}
}
表示延遲1秒後每3秒執行一次。ScheduledExecutorService比Timer更安全,功能更強大,後面會有一篇單獨進行對比。
4、newSingleThreadExecutor
建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。示例程式碼如下:
public class Demo5 {
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable(){
@Override
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
結果依次輸出,相當於順序執行各個任務。現行大多數GUI程式都是單執行緒的。Android中單執行緒可用於資料庫操作,檔案操作,應用批量安裝,應用批量刪除等不適合併發但可能IO阻塞性及影響UI執行緒響應的操作。
3、ScheduledThreadPoolExecutor 詳解
我們知道Timer與TimerTask雖然可以實現執行緒的週期和延遲排程,但是Timer與TimerTask存在一些缺陷,所以對於這種定期、週期執行任務的排程策略,我們一般都是推薦ScheduledThreadPoolExecutor來實現。下面就深入分析ScheduledThreadPoolExecutor是如何來實現執行緒的週期、延遲排程的。
ScheduledThreadPoolExecutor,繼承ThreadPoolExecutor且實現了ScheduledExecutorService介面,它就相當於提供了“延遲”和“週期執行”功能的ThreadPoolExecutor。在JDK API中是這樣定義它的:ScheduledThreadPoolExecutor,它可另行安排在給定的延遲後執行命令,或者定期執行命令。需要多個輔助執行緒時,或者要求 ScheduledThreadPoolExecutor具有額外的靈活性或功能時,此類要優於 Timer。 一旦啟用已延遲的任務就執行它,但是有關何時啟用,啟用後何時執行則沒有任何實時保證。按照提交的先進先出 (FIFO) 順序來啟用那些被安排在同一執行時間的任務。
3.1、ScheduledThreadPoolExecutor 的執行機制
ScheduledThreadPoolExecutor 的執行示意圖如下圖所示:
DelayQueue 是一個無界佇列,所以 ThreadPoolExecutor 的 maximumPoolSize 在 ScheduledThreadPoolExecutor 中沒有什麼意義。
ScheduledThreadPoolExecutor 的執行主要分為兩大部分:
1、當呼叫 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者 scheduledWithFixedDelay() 方法時,會向 ScheduledThreadPoolExecutor 的 DelayQueue 新增一個實現了 RunnableScheduledFuture 介面的 ScheduledFutureTask。
2、執行緒池中的執行緒從 DelayQueue 中獲取 ScheduledFutureTask,然後執行任務。
ScheduledThreadPoolExecutor 為了實現週期性的執行任務,對 ThreadPoolExecutor 做了如下的修改:
1、使用 DelayQueue 作為任務佇列;
2、獲取任務的方式不同(後文會講解到);
3、執行週期任務後,增加了額外的處理(後文會講解到)。
3.2、ScheduledThreadPoolExecutor 的實現
先來看下ScheduledThreadPoolExecutor類中的主要結構:
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
implements ScheduledExecutorService {
private static final AtomicLong sequencer = new AtomicLong(0);
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
...
}
private void delayedExecute(RunnableScheduledFuture<?> task) { ... }
void reExecutePeriodic(RunnableScheduledFuture<?> task) { ... }
protected <V> RunnableScheduledFuture<V> decorateTask(...){...}
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {...}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {...}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {...}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {...}
public void execute(Runnable command) {...}
public Future<?> submit(Runnable task) {...}
public <T> Future<T> submit(Runnable task, T result) {...}
public <T> Future<T> submit(Callable<T> task) {...}
public void shutdown() {...}
public List<Runnable> shutdownNow() {...}
public BlockingQueue<Runnable> getQueue() {...}
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {...}
......
}
ScheduledThreadPoolExecutor 會把排程的任務(ScheduledFutureTask)放到一個DelayQueue中。下面來看下ScheduledFutureTask主要包含的3個成員變數:
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
private final long sequenceNumber;
private long time;
private final long period;
...
}
1、long型成員變數time:表示這個任務將要執行的具體時間;
2、long型成員變數sequenceNumber,表示這個任務被新增到ScheduledThreadPoolExecutor中的序號;
3、long型成員變數period,表示任務執行的間隔週期。
DelayQueue 封裝了一個 PriorityQueue,這個 PriorityQueue 會對佇列中的 ScheduledEutureTask 進行排序。排序時,time 小的排在前面(時間早的任務將被先執行)。如果兩個 ScheduledFutureTask 的 time 相同,就比較 sequenceNumber,sequenceNumber 小的排在前面,也就是說,如果兩個任務的執行時間相同,那麼先執行提交早的任務。
下圖所示的是:ScheduledThreadPoolExecutor 中的執行緒1執行某個週期任務的4個步驟:
1、執行緒1從 DelayQueue 中獲取已到期的 ScheduledFutureTask(DealyQueue.take())。到其任務是指 ScheduledFutureTask 的 time 大於等於當前時間;
2、執行緒1執行這個 ScheduledFutureTask;
3、執行緒1修改 ScheduledFutureTask 的 time 變數為下次將要被執行的時間;
4、執行緒1把這個修改 time 之後的 ScheduledFutureTask 放回 DelayQueue 中(DelayQueue.add())。
下面就看下DelayQueue.take()方法的原始碼實現:【在原始碼中:DelayQueue 就是 DelayedWorkQueue】
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
public RunnableScheduledFuture take() throws InterruptedException {
// 獲取lock
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture first = queue[0]; // 獲取任務
if (first == null)
available.await(); // 如果佇列為空,則等待
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
else if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
}
DelayQueue.take()的執行示意圖如下所示:
如上圖所示的過程,大致可以分為3個步驟:
1、獲取Lock;
2、獲取週期任務;
- 2.1、如果 PriorityQueue 為空,當前執行緒到 Condition 中等待,否則執行下面的2.2;
- 2.2、如果 PriorityQueue 的頭元素的 time 時間比當前時間大,到 Condition 中等待到 time 時間,否則執行2.3;
- 2.3、獲取 PriorityQueue 的頭元素,如果 PriorityQueue 不為空,則喚醒在 Condition 中等待的所有執行緒。
3、釋放Lock。
ScheduledThreadFutureTask 在一個迴圈中執行步驟2,直到執行緒從 PriorityQueue 獲取到一個元素之後,才會退出無限迴圈。
下面看下 ScheduledThreadFutureTask 中的執行緒把 ScheduledFutureTask 放入 DelayQueue 中的過程。下面是 DelayQueue.add() 的原始碼實現:
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture e = (RunnableScheduledFuture)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
......
}
下圖是 DelayQueue.add() 的執行示意圖:
如上圖所示,新增任務分為3大步驟:
1、獲取 Lock;
2、新增任務;
- 2.1、向 PriorityQueue 新增任務;
- 2.2、如果在上面2.1 中新增的任務是 PriorityQueue 的頭元素,則喚醒在 Conditon 中等待的所有執行緒;
3、釋放 Lock。
4、FutureTask 詳解
Future 介面和實現 Future 介面的 FutureTask 類,代表非同步計算的結果。
4.1、FutureTask 簡介
FutureTask 除了實現了 Future 介面外,還實現了 Runnable介面。那麼我們就先看下這兩個介面的內部結構。
- Future 介面的內部結構
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
- Runnable 介面的內部結構
public interface Runnable {
public abstract void run();
}
由於FutureTask 除了實現了 Future 介面外,還實現了 Runnable介面。因此,FutureTask 可以交給 Executor 執行,也可以呼叫執行緒直接執行(FutureTask.run())。
根據 FutureTask.run()方法被執行的時機,FutureTask可以處於下面3種狀態:
1、未啟動:FutureTask.run()方法還沒有被執行之前,FutureTask 處於未啟動狀態。當建立一個 FutureTask,且沒有執行 FutureTask.run() 方法之前,這個 FutureTask 處於未啟動狀態;
2、已啟動:FutureTask.run()方法被執行的過程中,FutureTask 處於已啟動狀態;
3、已完成:FutureTask.run()方法執行完成後正常結束,或被取消(FutureTask.cancel(...)),或執行FutureTask.run()方法時丟擲異常而異常結束,FutureTask 處於已完成狀態。
FutureTask 的狀態遷移的示意圖如下所示:
FutureTask 的 get 和 cancel 的執行示意圖如下所示:
4.2、FutureTask 的實現
先看下 FutureTask 的內部結構:
public class FutureTask<V> implements RunnableFuture<V> {
private final Sync sync;
// 建構函式1 Callable
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
sync = new Sync(callable);
}
// 建構函式2 Runnable
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}
// 呼叫的是sync中的innerCancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
return sync.innerCancel(mayInterruptIfRunning);
}
// 呼叫的是sync中的innerGet方法
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}
// 呼叫的是sync中的innerGet方法
public V get(long timeout, TimeUnit unit){
throws InterruptedException, ExecutionException, TimeoutException {
return sync.innerGet(unit.toNanos(timeout));
}
// 呼叫的是sync中的innerRun方法
public void run() {
sync.innerRun();
}
private final class Sync extends AbstractQueuedSynchronizer {...}
// .......
}
從 FutureTask 的原始碼中可以看出來,它的實現是基於 AbstractQueuedSynchronizer 。AQS 是一個同步框架,它提供通用機制來原子性管理同步狀態、阻塞和喚醒執行緒,以及維護被阻塞執行緒的佇列。基於 AQS 實現的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch 和 FutureTask。
每一個基於 AQS 實現的同步器都會包含兩種型別的操作,如下:
1、至少一個 acquire 操作:這個操作阻塞呼叫執行緒,除非 / 直到 AQS 的狀態允許這個執行緒繼續執行。 FutureTask 的 acquire 操作為 get() / get(long timeout, TimeUnit unit)方法呼叫;
2、至少一個 release 操作:這個操作改變 AQS 的狀態,改變後的狀態可允許一個或多個阻塞執行緒被解除阻塞。FutureTask 的 release 操作包括 run() 方法和 cancel(...) 方法。
基於“複合優先繼承”的原則,FutureTask 聲明瞭一個內部私有的繼承於 AQS 的子類 Sync,對 FutureTask 所有公有方法的呼叫都會委託給這個內部子類。
AQS 被作為“模板方法模式”的基礎類提供給 FutureTask 的內部子類 Sync,這個內部子類只需要實現狀態檢測和狀態更新的方法即可,這些方法將控制 FutureTask 的獲取和釋放操作。具體來說,Sync實現了 AQS 的 tryAcquireShared(int)方法和 tryReleaseShared(int)方法,Sync 通過這兩個方法來檢查和更新同步狀態。
FutureTask 的設計示意圖如下圖所示:
如圖所示,Sync 是 FutureTask 的內部私有類,它繼承自 AQS。建立 FutureTask 時會建立內部私有的成員物件 Sync,FutureTask 所有的公有方法都直接委託給了內部私有的 Sync。
下面對 FutureTask 中主要的幾個方法進行呼叫過程分析:
4.2.1、FutureTask.get() 方法
- 第1步:呼叫 FutureTask 中的 get() 方法
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}
從原始碼中很清楚的看到 get() 方法內部是由 sync 的 innerGet()方法實現的。
- 第2步:呼叫 Sync 中的 innerGet()方法
V innerGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0);
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}
- 第3步:呼叫 AQS.acquireSharedInterruptibly(int args)方法。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
- 第4步:呼叫Sync.tryAcquireShared方法
- 第5步:呼叫 AQS.doAcquireSharedIntrruptibly方法
這個方法首先會在子類 Sync 中實現的 tryAcquireShared()方法來判斷 acquire 操作是否可以成功,acquire 操作可以成功的條件為:state 為執行完成狀態RAN 或取消狀態 CANCELLED,且 runner 不為null。
【至於tryAcquireShared和doAcquireSharedIntrruptibly方法,這裡不再做原始碼分析了,前面文章已經分析過多次了】
如果成功則立即返回,如果失敗則到執行緒等待佇列中去等待其他執行緒執行 release 操作。
當其他執行緒執行 release 操作(比如:FutureTask.run() 或 FutureTask.cancel(...))喚醒當前執行緒後,當前執行緒再次執行 tryAcquiredShared() 將返回正值 1,當前執行緒將離開執行緒等待佇列,並喚醒它的後繼節點執行緒。
最後返回計算的結果或者丟擲異常。
4.2.2、FutureTask.run() 方法
- 第1步:呼叫了 FutureTask.run() 方法
public void run() {
sync.innerRun();
}
可以看到 run() 方法內部仍然是呼叫了 Sync.innerRun() 方法。
- 第2步:呼叫 Sync.innerRun() 方法
void innerRun() {
if (!compareAndSetState(READY, RUNNING))
return;
runner = Thread.currentThread();
if (getState() == RUNNING) { // recheck after setting thread
V result;
try {
result = callable.call();
} catch (Throwable ex) {
setException(ex);
return;
}
set(result);
} else {
releaseShared(0); // cancel
}
}
Sync.innerRun() 方法中以原子的方式更新同步狀態(呼叫AQS.compareAndSetState(READY, RUNNING),將 state 值設定為 RUNNING 狀態)。如果這個原子操作成功,就設定代表計算結果的變數 result 的值為 Callable.call() 的返回值,然後呼叫AQS.releaseShared(int args)方法。
- 第3步:呼叫AQS.releaseShared(int args)方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
AQS.releaseShared(int args)首先會回撥子類 Sync 中實現的 tryReleaseShared(int args)方法來執行 release操作。
- 第4步:呼叫 Sync.tryReleaseShared(int args) 方法
protected boolean tryReleaseShared(int ignore) {
runner = null;
return true;
}
設定允許任務執行緒 runner 為 null,然後返回 true。
- 第5步:呼叫AQS.doReleaseShared() 方法
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
喚醒執行緒等待佇列中的第一個執行緒。
全文完!
上一篇:Java中的執行緒池詳解