1. 程式人生 > >併發:FutureTask詳解。

併發:FutureTask詳解。

FutureTask介面和實現Future介面的FutureTask類,代表非同步計算的結果。

FutureTask簡介

FutureTask除了實現Future介面外,還實現了Runnable介面。因此,FutureTask可以交給Executor執行也可以由當呼叫執行緒直接執行(FutureTask.run())。根據FutureTask.run()方法被執行的時機,FutureTask可以處於下面3種狀態。

  • 未啟動。FutureTask.run()方法還沒有被執行之前,FutureTask處於未啟動狀態。當建立一個FutureTask,且沒有執行FutureTask.run()方法之前,這個FutureTask處於未啟動狀態。
  • 已啟動。FutureTask.run()方法被執行的過程中,FutureTask處於已啟動狀態。
  • 已完成。FutureTask.run()方法執行完後正常結束,或被取消(FutureTask.cancel(...)),或執行FutureTask.run()方法時丟擲異常而異常結束,FutureTask處於已完成狀態。

下圖是FutureTask的狀態遷移的示意圖。

當FutureTask處於未啟動或已啟動狀態時,執行FutureTask.get()方法將導致呼叫執行緒阻塞;當FutureTask處於已完成狀態時,執行FutureTask.get()方法將導致呼叫執行緒立即返回結果或丟擲異常。

當FutureTask處於未啟動狀態時,執行FutureTask.cancel()方法將導致此任務永遠不會被執行;當FutureTask處於已啟動狀態時,執行FutureTask.cancel(true)方法將以中斷執行此任務執行緒的方式來試圖停止任務;當FutureTask處於已啟動狀態時,執行FutureTask.cancel(false)方法將不會對正在執行此任務的執行緒產生影響(讓正在執行的任務執行完成);當FutureTask處於已完成狀態時,執行FutureTask.cancel(...)方法將返回false。

下圖是get方法和cancel方法的執行示意圖。

FutureTask的使用

可以把FutureTask交給Executor執行;也可以通過ExecutorService.submit(...)方法返回一個FutureTask,然後執行FutureTask.get()方法或FutureTask.cancel(...)方法。除此以外,還可以單獨使用FutureTask。

當一個執行緒需要等待另一個執行緒把某個任務執行完後他才能繼續執行,此時可以使用FutureTask。假設有多個執行緒執行若干任務,每個任務最多隻能被執行一次。當多個執行緒試圖同時執行同一個任務時,只允許一個執行緒執行任務,其他執行緒需要等待這個任務執行完後才能繼續執行。下面是對應的示例程式碼。

public class FutureTaskTest {
	private final ConcurrentHashMap<Object, Future<String>> taskCache = new ConcurrentHashMap<Object, Future<String>>();
	public final String executionTask(final String taskName) {
		while (true) {
			Future<String> future = taskCache.get(taskName);	// 1.1,2.1
			if(future == null) {
				Callable<String> task = new Callable<String>() {
					@Override
					public String call() throws Exception {
						return taskName;
					}
					
				};
				// 1.2 建立任務
				FutureTask<String> futureTask = new FutureTask<String>(task);
				future = taskCache.putIfAbsent(taskName, futureTask);	// 1.3
				if(future == null) {
					future = futureTask;
					futureTask.run(); // 1.4 執行任務
				}
			}
			try {
				return future.get();	// 1.5,2.2執行緒在此等待任務執行完成
			} catch (InterruptedException | ExecutionException e) {
				taskCache.remove(taskName, future);
			}
		}
	}
}

上述程式碼的執行示意圖如下所示。

當兩個執行緒試圖同時執行同一個任務時,如果Thread 1執行1.3後Thread 2 執行2.1,那麼接下來Thread 2jiang在2.2等待,直到Thread 1執行完1.4後Thread 2才能從2.2(FutureTask.get())返回。

FutureTask的實現

FutureTask的實現基於AbstractQueuedSynchronizer(以下簡稱為AQS)。java.util.concurrent中的很多可阻塞類(比如ReentrantLock)都是基於AQS來實現的。AQS是一個同步框架,他提供通用機制來原子性管理同步狀態、阻塞和喚醒執行緒,以及維護被阻塞執行緒的佇列。JDK 6中AQL被廣泛使用,基於AQS實現的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch和FutureTask。

每一個基於AQL實現的同步器都會包含兩種型別的操作,如下。

  • 至少一個acquire操作。這個操作阻塞呼叫執行緒,除非/直到AQS的狀態允許這個執行緒繼續執行。FutureTask的acquire操作為get()/get(long timeout, TimeUnit unit)方法呼叫。

  • 至少一個release操作。這個操作改變AQS的改變,改變後的狀態可允許一個或多個阻塞執行緒被解除阻塞。FutureTask的release操作包括run()方法和cancel(...)方法。

基於“符合優先於繼承”的原則,FutureTask聲明瞭一個內部私有的繼承於AQS的子類Sync,對FutureTask所有公有方法的呼叫都會委託給這個內部子類。

AQS被作為“模板方法模式”的基礎類提供給FutureTask的內部子類Sync,這個內部子類只需要實現檢查和狀態更新的方法即可,這些方法將控制FutureTask的獲取和釋放操作。具體來說,Sync是新啊了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通過這兩個方法來檢查和更新同步狀態。

FutureTask的設計示意圖如下所示。

如上圖所示,Sync是FutureTask的內部私有類,他繼承自AQS。建立FutureTask時會建立內部私有的成員物件Sync,FutureTask所有的公有方法都直接委託給了內部私有的Sync。

FutureTask.get()方法會呼叫AQS.acquireSharedInterruptibly(int arg)方法,這個方法的執行過程如下。

  1. 呼叫AQS.acquireSharedInterruptibly(int arg)方法,這個方法首先會回撥在子類Sync中實現的tryAcquireShared()方法來判斷acquire操作是否可以成功。acquire操作可以成功的條件為:state為執行完成狀態RAN或已取消狀態CANCELLED,且runner不為null。

  2. 如果成功則get()方法立即返回。如果失敗則到執行緒等待佇列中去等待其他執行緒執行release操作。

  3. 當其他執行緒執行release操作(比如FutureTask.run()或FutureTask.cancel(...))喚醒當前執行緒後,當前執行緒再次執行tryAcquireShared()將返回正值1,當前執行緒將歷開執行緒等待佇列並喚醒他的後繼執行緒(這裡會產生級聯喚醒的效果)。

  4. 最後返回計算的結構或丟擲異常。

FutureTask.run()的執行過程如下。

  1. 執行在建構函式中指定的任務(Callable.call())。

  2. 以原子方式來更新同步狀態(呼叫AQS.compareAndSetState(int expect, int update),設定state為執行完成狀態RAN)。如果這個原子操作成功,就設定代表計算結果的變數result的值為Callable.call()的返回值,然後呼叫AQS.releaseShared(int arg);

  3. AQS.releaseShared(int arg)首先會回撥在子類Sync中實現的tryReleaseShared(arg)來執行release操作(設定執行任務的執行緒runner為null,然後返回true);AQS.releaseShared(int arg),然後喚醒執行緒等待佇列中的第一個執行緒。

  4. 呼叫FutureTask.done()。

當執行FutureTask.get()方法,如果FutureTask不是處於執行完成狀態RAN或已取消狀態CANCELLED,當前執行執行緒將到AQS的執行緒等待佇列中等待(見下圖的執行緒A、B、C和D)。當某個執行緒執行FutureTask.run()方法或FutureTask.cancel(...)方法時,會喚醒執行緒等待佇列的第一個執行緒(見下圖所示的執行緒E喚醒執行緒A)。

假設開始時FutureTask處於未啟動狀態或已啟動狀態,等待佇列中已經有3個執行緒(A、B和C)在等待。此時,執行緒D執行get()方法將導致執行緒D也到等待佇列中去等待。

當執行緒E執行run()方法時,會喚醒佇列中的第一個執行緒A。執行緒A被喚醒後,首先把自己從佇列中刪除,然後喚醒他的後繼執行緒B,最後執行緒A從get()方法返回。執行緒B、C和D重複A執行緒的處理流程。最終,在佇列中等待的所有執行緒都被級聯喚醒並從get()方法返回。