Java異步編程——深入源碼分析FutureTask
Java的異步編程是一項非常常用的多線程技術。
之前通過源碼詳細分析了ThreadPoolExecutor《你真的懂ThreadPoolExecutor線程池技術嗎?看了源碼你會有全新的認識》。通過創建一個ThreadPoolExecutor,往裏面丟任務就可以實現多線程異步執行了。
但之前的任務主要傾向於線程池,並沒有講到異步編程方面的內容。本文將通過介紹Executor+Future框架(FutureTask是實現的核心),來深入了解下Java的異步編程。
萬事從示例開始,我們先通過示例Demo有一個直觀的印象,再深入去了解概念與原理。
使用示例
Demo:
使用上比較簡單,
運行結果:
任務1異步執行:0
任務2異步執行:0
任務2異步執行:1
...
任務2異步執行:45
同步代碼
任務2異步執行:24
...
任務1異步執行:199
任務1:執行完成
...
任務2異步執行:199
任務2:執行完成
假若你多次執行這個程序,會發現結果大大的不一樣,因為兩個任務和同步代碼是異步由多條線程執行的,打印的結果當然是隨機的。
回顧這個Demo做了什麽,
- 構建了一個線程池
- 往線程池裏面丟兩個需要執行的任務
- 最後獲取這兩個任務的結果
其中第二點是異步執行兩個任務,這兩個任務和主線程分別是用了三個線程並發執行的,第三點是在主線程中同步等待兩個任務的結果。
很容易看出來,異步編程的好處就在於可以讓不相幹的任務異步執行,不阻塞主線程。若是主線程需要異步執行的結果,此時再去等待結果會更加高效,提高程序的執行效率。
下面來看看整個流程的實現原理。
源碼分析
一般在實際項目中,都會有配置有自己的線程池,建議大家在用異步編程時,配置一個專用的線程池,做好線程隔離,避免異步線程影響到其他模塊的工作。Demo中為了方便,直接調用Exectors的方法生成一個臨時的線程池,日常不建議使用。
我們從這個ExecutorService.submit()
方法入手,看看整體實現。
ExecutorService.submit()
定義一個接口。這個接口接收一個Callable參數(執行的任務),返回一個Future(計算結果)。
Callable
,相當於一個需要執行的任務。它不接收任何參數,可以返回結果,可以拋出異常。相類似的還有Runnable
Callable
更像一個方法的調用,Runnable
則是一個不需要理會結果的調用。在JDK 8以後,它們都可以通過Lamda表達式寫法去替代內部類的寫法(詳見Demo)。
Future
,一個異步計算的結果。調用get()
方法可以得到對應的計算結果,如果調用時沒有異步計算完,會阻塞等待計算的結果。同時它還提供方法可以嘗試取消任務的執行。
看回ExecutorService.submit()
的實現,代碼在實現類AbstractExecutorService
中。
除了它接口的實現,還提供了兩種變形。原來接口只接收Callable
參數,實現類中還新增了接收Runnable
參數的。
如果看過之前寫的《你真的懂ThreadPoolExecutor線程池技術嗎?看了源碼你會有全新的認識》,應該了解ThreadPoolExecutor
執行任務是可以調用execute()
方法的。而這裏面submit()
方法則是為Callable/Runnable
加多一層FutureTask
,從而
使執行結果有一個存放的地方,同時也添加一個可以取消的功能。原本的execute()
只能執行任務,不會返回結果的,具體實現原理可以看看之前的文章分析。
FutureTask
是RunnableFuture
的實現。而RunnableFuture
是繼承Future
和Runnable
接口的,定義run()
接口。
因為FutureTask
有run()
接口,所以可以直接用一個Callable/Runnable
創建一個FutureTask
單獨執行。但這樣並沒有異步的效果,因為沒有啟用新的線程去跑,而是在原來的線程阻塞執行的。
到這裏我們清楚知道了,submit()
方法重點是利用Callable/Runnable
創建一個FutureTask
,然後多線程執行run()
方法,達到異步處理並且得到結果的效果。而FutureTask
的重點則是run()
方法如何持有保存計算的結果。
FutureTask.run()
首先判斷futureTask
對象的state
狀態,如果不是NEW的話,證明已經開始運行過了,則退出執行。同時futureTask
對象通過CAS,把當前線程賦值給變量runner
(是Thread類型,說明對象使用哪個線程執行的),如果CAS失敗則退出。
外層try{}
代碼塊中,對callable
判空和state
狀態必須是NEW。內層try{}
代碼真正調用callable
,開始執行任務。若執行成功,則把ran
變量設為true,保存結果在result
變量中,證明已跑成功過了;若拋異常了,則設為false,result
為空,並且調用setException()
保存異常。最後如果ran
為true的話,則調用set()
保存result
結果。
看下setException()
和set()
的實現。
兩者的基本流程一樣,CAS置換狀態,保存結果在outcome
變量道中,但setException()
保存的結果類型固定是Throwable
。另外一個不同在於最終state
狀態,一個是EXCEPTION,一個是NORMAL。
這兩個方法最後都調用了finishCompletion()
。這個方法主要是配合線程池喚醒下一個任務。
FutureTask.get()
從上面run()
方法得知,最後執行的結果放在了outcome
變量中。那最終怎麽從其中取出結果來,我們來看看get()
方法。
從源碼可知,get()
方法分兩步。第一步,先判斷狀態,如果計算為完成,則需要阻塞地等待完成。第二步,如果完成了,則調用report()
方法獲取結果並返回。
先看看awaitDone()
阻塞等待完成。該方法可以選用超時功能。
在自旋的for()循環中,
- 先判斷是否線程被中斷,中斷的話拋異常退出。
- 然後開始判斷運行的
state
值,如果state
大於COMPLETING
,證明計算已經是終態了,此時返回終態變量。 - 若
state
等於COMPLETING
,證明已經開始計算,並且還在計算中。此時為了避免過多的CPU時間放在這個for循環的自旋上,程序執行Thread.yield()
,把線程從運行態降為就緒態,讓出CPU時間。 - 若以上狀態都不是,則證明
state
為NEW
,還沒開始執行。那麽程序在當前循環現在會新增一個WaitNode
,在下一個循環裏面調用LockSupport.park()
把當前線程阻塞。當run()
方法結束的時候,會再次喚醒此線程,避免自旋消耗CPU時間。 - 如果選用了超時功能,在阻塞和自旋過程中超時了,則會返回當前超時的狀態。
第二步的report()
方法比較簡單。
- 如果狀態是
NORMAL
,正常結束的話,則把outcome
變量返回; - 如果是取消或者中斷狀態的,則拋出取消異常;
- 如果是
EXCEPTION
,則把outcome
當作異常拋出(之前setException()
保存的類型就是Throwable
)。從而整個get()
會有一個異常拋出。
總結
至此我們已經比較完整地了解Executor+Future的框架原理了,而FutureTask則是該框架的主要實現。下面總結下要點
Executor.sumbit()
方法異步執行一個任務,並且返回一個Future結果。submit()
的原理是利用Callable
創建一個FutureTask
對象,然後執行對象的run()
方法,把結果保存在outcome
中。- 調用
get()
獲取outcome
時,如果任務未完成,會阻塞線程,等待執行完畢。 - 異常和正常結果都放在
outcome
中,調用get()
獲取結果或拋出異常。
更多技術文章、精彩幹貨,請關註
博客:zackku.com
微信公眾號:Zack說碼
Java異步編程——深入源碼分析FutureTask