Kotlin協程的使用
前言
本篇是在Android
官網對Kotlin
協程的學習記錄。記錄Kotlin Coroutines
在Android
上的特點、應用等
協程概述
一、協程是什麼?
協程是一種併發的設計模式,可以使用它來簡化非同步執行的程式碼,它可以幫助管理一些耗時的任務,以防耗時任務阻塞主執行緒。協程可以用同步的方式寫出非同步程式碼,代替了傳統的回撥方式,讓程式碼更具有可讀性。
二、協程的特點?
-
輕量(Lightweight):其實這裡的輕量是相對執行緒阻塞而言的,協程支援掛起,掛起的時候並不會阻塞當前執行緒,也就是"非阻塞式掛起",在協程掛起的時候執行緒可以做其它的事情,而執行緒的阻塞期間是無法做其他事情的。所以協程的"非阻塞式掛起"可以節省系統的資源。
-
記憶體洩漏更少(Fewer memory leaks):使用者關閉頁面的時候,後臺執行緒可能仍然有還在執行的任務,如果使用傳統的執行緒進行後臺請求,可能沒有很好的辦法讓執行緒及時地停止執行,使用協程的話,可以通過
Job::cancel
讓協程及時地停止執行,並且可以通過協程作用域CoroutineScope
對協程進行統一管理,例如對通過CoroutineScope
啟動的協程統一進行cancel
,這種就稱作結構化併發,它讓我們的程式有更少的協程洩漏,協程洩漏可以看做是一種記憶體洩露。 -
內建取消支援(Built-in cancellation support):
Cancellation
會自動在執行中的整個協程層次結構內傳播。 -
Jetpack
和第三方框架支援:一些比如Room
、ViewModel
等Jetpack
元件,第三方框架Retrofit
等有對Kt
協程提供支援。
關於協程作用域:協程必須執行在CoroutineScope
裡(協程作用域),一個 CoroutineScope
管理一個或多個相關的協程。例如viewmodel-ktx
包下面有 viewModelScope
,viewModelScope
管理通過它啟動的協程,如果viewModel
被銷燬,那麼viewModelScope
會自動被取消,通過viewModelScope
啟動的正在執行的協程也會被取消。
掛起與恢復
協程有suspend
和resume
兩項概念:
suspend
(掛起):暫停執行當前協程,並儲存所有區域性變數。resume
(恢復):用於讓已掛起的協程從掛起處繼續執行。
協程中有一個suspend
關鍵字,它和剛剛提到的suspend
概念要區分一下,剛剛提到的suspend(掛起)
是一個概念,而suspend
關鍵字可以修飾一個函式,但是僅這個關鍵字沒有讓協程掛起的作用,一般suspend
關鍵字是提醒呼叫者該函式需要直接或間接地在協程下面執行,起到一個標記與提醒的作用。
suspend
關鍵字的標記與提醒有什麼作用?在以前,開發者很難判斷一個方法是否是耗時的,如果錯誤地在主執行緒呼叫一個耗時方法,那麼會造成主執行緒卡頓,有了suspend
關鍵字,耗時函式的建立者可以將耗時方法使用suspend
關鍵字修飾,並且在方法內部將耗時程式碼使用withContext{Dispatchers.IO}
等方式放到IO
執行緒等執行,開發者只需要直接或間接地在協程下面呼叫它即可,這樣就可以避免耗時任務在主執行緒中執行從而造成主執行緒卡頓了。
下面通過官方的一個例子,對協程的suspend
和resume
兩個概念進行說明:
suspend fun fetchDocs() { // Dispatchers.Main
val result = get("https://developer.android.com") // Dispatchers.IO for `get`
show(result) // Dispatchers.Main
}
suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }
我們假設在協程中呼叫fetchDocs
方法,該協程提供了一個主執行緒環境(如啟動協程時通過Dispatchers.Main
指定),另外,get
方法執行耗時任務,它使用掛起函式withContext{Dispatchers.IO}
將耗時任務放到了IO
執行緒中執行。
在fetchDocs
方法裡,當執行到get
方法開始進行網路請求的時候,它會掛起(suspend
)所在的協程,當網路請求完成時,get
會恢復(resume
)已掛起的協程,而不是使用回撥通知主執行緒。
Kotlin
使用棧幀(stack frame
)管理正在執行的函式以及它的區域性變數,當掛起一個協程的時候,系統會複製並儲存當前的棧幀以供稍後使用。協程恢復時,會將棧幀從其儲存位置複製回來,然後函式再次開始執行。
排程器
Kotlin
協程必須執行在dispatcher
裡面,協程可以將自己suspend
,dispatcher
負責resume
它們。
有下面三種Dispatcher
:
Dispatchers.Main
:在主執行緒執行協程。Dispatchers.IO
:該dispatcher
適合執行磁碟或網路I/O
,並且經過優化。Dispatchers.Default
:該dispatcher
適合執行佔用大量CPU
資源的工作(對列表排序和解析JSON
),並且經過優化。
啟動協程
有以下兩種方式啟動協程:
launch
:啟動新協程,launch
的返回值為Job
,協程的執行結果不會返回給呼叫方。async
:啟動新協程,async
的返回值為Deferred
,Deferred
繼承至Job
,可通過呼叫Deferred::await
獲取協程的執行結果,其中await
是掛起函式。
在一個常規函式啟動協程,通常使用的是launch
,因為常規函式無法呼叫Deferred::await
,在一個協程或者掛起函式內部開啟協程可以使用async
。
launch
與async
的區別:
launch
啟動的協程沒有返回結果;async
啟動的協程有返回結果。launch
啟動的協程有異常會立即丟擲;async
啟動的協程的異常不會立即丟擲,會等到呼叫Deferred::await
的時候才將異常丟擲。async
適合於一些併發任務的執行,例如有這樣的業務:做兩個網路請求,等兩個請求都完成後,一起顯示請求結果。使用async
可以這樣實現
interface IUser {
@GET("/users/{nickname}")
suspend fun getUser(@Path("nickname") nickname: String): User
@GET("/users/{nickname}")
fun getUserRx(@Path("nickname") nickname: String): Observable<User>
}
val iUser = ServiceCreator.create(IUser::class.java)
GlobalScope.launch(Dispatchers.Main) {
val one = async {
Log.d(TAG, "one: ${threadName()}")
iUser.getUser("giagor")
}
val two = async {
Log.d(TAG, "two: ${threadName()}")
iUser.getUser("google")
}
Log.d(TAG, "giagor:${one.await()} , google:${two.await()} ")
}
協程概念
CoroutineScope
CoroutineScope
會跟蹤它使用launch
或async
建立的所有協程,可以呼叫scope.cancel()
取消該作用域下所有正在執行的協程。在ktx
中,為我們提供了一些已經定義好的CoroutineScope
,如ViewModel
的viewModelScope
,Lifecycle
的lifecycleScope
,具體可以檢視Android KTX | Android Developers。
viewModelScope會在ViewModel的onCleared()方法中被取消
可以自己創捷CoroutineScope
,如下:
class MainActivity : AppCompatActivity() {
val scope = CoroutineScope(Job() + Dispatchers.Main)
override fun onCreate(savedInstanceState: Bundle?) {
...
scope.launch {
Log.d(TAG, "onCreate: ${threadName()}") // main
fetchDoc1()
}
scope.launch {
...
}
}
suspend fun fetchDoc1() = withContext(Dispatchers.IO) {...}
override fun onDestroy() {
scope.cancel()
super.onDestroy()
}
}
建立scope
的時候,將Job
和Dispatcher
聯合起來,作為一個CoroutineContext
,作為CoroutineScope
的構造引數。當scope.cancel
的時候,通過scope
開啟的所有協程都會被自動取消,並且之後無法使用scope
來開啟協程(不會報錯但是協程開啟無效)。
也可以通過傳入CoroutineScope
的Job
來取消協程:
val job = Job()
val scope = CoroutineScope(job + Dispatchers.Main)
scope.launch {...}
...
job.cancel()
使用Job
取消了協程,之後也是無法通過scope
來開啟協程的。
其實檢視原始碼,可以發現CoroutineScope.cancel
方法內部就是通過Job
進行cancel
的:
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
job.cancel(cause)
}
關於協程的取消後面還會再進行介紹。
Job
當我們使用launch
或者async
建立一個協程的時候,都會獲取到一個Job
例項,這個Job
例項唯一地標識這個協程,並且管理這個協程地生命週期。Job
有點類似Java
中的Thread
類。
Java
中Thread
類的部分方法:
它可以對所建立的執行緒進行管理。
Job
類還有部分擴充套件函式如下:
CoroutineContext
CoroutineContext
使用下面的幾種元素定義了協程的行為:
Job
:控制協程的生命週期。CoroutineDispatcher
:將工作分派到適當的執行緒。CoroutineName
:協程的名稱,可用於除錯。CoroutineExceptionHandler
:處理未捕獲的異常。
對於在作用域內建立的新協程,系統會為新協程分配一個新的 Job
例項,而從包含協程的作用域繼承其他 CoroutineContext
元素。可以通過向 launch
或 async
函式傳遞新的 CoroutineContext
替換繼承的元素。請注意,將 Job
傳遞給 launch
或 async
不會產生任何效果,因為系統始終會向新協程分配 Job
的新例項。
例如:
val scope = CoroutineScope(Job() + Dispatchers.Main + CoroutineName("Top Scope"))
scope.launch(Dispatchers.IO) {
Log.d(TAG, "onCreate: ${coroutineContext[CoroutineName]}")
}
D/abcde: onCreate: CoroutineName(Top Scope)
新建立的協程從外部的scope
繼承了CoroutineName
等元素,但注意,CoroutineDispatcher
元素被重寫了,在新建立的協程裡,CoroutineDispatcher
元素被指定為Dispatchers.IO
。
避免使用GlobalScope
官方文件中,對於不提倡使用GlobalScope
,給出了三個原因:
- (一)Promotes hard-coding values. If you hardcode
GlobalScope
, you might be hard-codingDispatchers
as well. - (二)Makes testing very hard as your code is executed in an uncontrolled scope, you won't be able to control its execution.
- (三)You can't have a common
CoroutineContext
to execute for all coroutines built into the scope itself.
關於第二點和第三點的解釋如下:我們自己建立的CoroutineScope
可以進行結構化併發的操作,例如我們可以呼叫CoroutineScope.cancel
去取消該作用域下所有正在執行的協程,cancel
方法如下:
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
job.cancel(cause)
}
它內部先獲取CoroutineContext
的Job
,然後第哦啊有Job
的cancel
方法,實現協程的取消。我們手動建立的CoroutineScope
的CoroutineContext
中都是有Job
的,例如:
val scope = CoroutineScope(Job() + Dispatchers.Main + CoroutineName("Top Scope"))
它的構造方法為:
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
ContextScope(if (context[Job] != null) context else context + Job())
構造方法中,若傳入的CoroutineContext
沒有Job
,則會建立一個Job
新增到CoroutineContext
中。但是GlobalScope
是全域性(單例)的,它的CoroutineContext
是一個EmptyCoroutineContext
,裡面沒有Job
成員
public object GlobalScope : CoroutineScope {
/**
* Returns [EmptyCoroutineContext].
*/
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
我們在呼叫GlobalScope.launch
時,可以指定本次啟動的協程的CoroutineContext
。當我們在呼叫GlobalScope.cancel()
的時候,會報下面的錯誤:
java.lang.IllegalStateException: Scope cannot be cancelled because it does not have a job: kotlinx.coroutines.GlobalScope@11b671b
可以看出,報錯的原因就是因為GlobalScope
沒有Job
。
協程的取消
官方文件的原話:
Cancellation in coroutines is cooperative, which means that when a coroutine's Job is cancelled, the coroutine isn't cancelled until it suspends or checks for cancellation. If you do blocking operations in a coroutine, make sure that the coroutine is cancellable.
可以得出:
- 協程的取消是協作式
- 外部對當前正在執行的協程的取消,協程不會立即取消,當下面兩種情況之一發生時,協程才會取消
- 該協程的配合檢查,協同進行取消,這和停止一個執行緒的執行類似(需要執行緒的配合檢查)。
- 當協程
suspend
的時候,協程也會被取消。
主動檢查
舉個例子:
val scope = CoroutineScope(Job() + Dispatchers.IO + CoroutineName("Top Scope"))
bn1.setOnClickListener {
scope.launch {
Thread.sleep(2000)
Log.d(TAG, "onCreate: $isActive")
Log.d(TAG, "onCreate: ${threadName()},${coroutineContext[CoroutineName]?.name}")
}
}
bn2.setOnClickListener {
scope.cancel()
}
假如我們只點擊bn1
開啟協程,但是不點選bn2
去取消協程,那麼輸出為
D/abcde: onCreate: true
D/abcde: onCreate: DefaultDispatcher-worker-1,Top Scope
假設我們點選bn1
開啟協程後,立即點選bn2
取消協程(此時協程仍然在Thread.sleep
期間),那麼輸出為
D/abcde: onCreate: false
D/abcde: onCreate: DefaultDispatcher-worker-2,Top Scope
可以看到,協程的isActive
的值變為false
,但是協程仍然會執行(雖然之後無法通過scope
再去啟動新的協程)。
上面的例子中,已經呼叫了scope.cancel
,但是當前協程仍然還在執行,說明協程的真正取消需要協程內部的配合,其中一個方法就是呼叫ensureActive()
函式,ensureActive
的作用大致上相當於:
if (!isActive) {
throw CancellationException()
}
我們修改下上面的例子:
val scope = CoroutineScope(Job() + Dispatchers.IO + CoroutineName("Top Scope"))
bn1.setOnClickListener {
scope.launch {
Thread.sleep(2000)
Log.d(TAG, "onCreate: $isActive")
// 檢查協程是否取消
ensureActive()
Log.d(TAG, "onCreate: ${threadName()},${coroutineContext[CoroutineName]?.name}")
}
}
bn2.setOnClickListener {
scope.cancel()
}
我們點選bn1
開啟協程後,立即點選bn2
取消協程(此時協程仍然在Thread.sleep
期間),那麼輸出為
D/abcde: onCreate: false
可以看到,當前協程內部的ensureActive()
函式配合外部的cancel
操作,成功地將協程取消了。
當然,也可以通過其它的方式在協程內部進行協作式地取消操作。
協程掛起
外部對協程cancel
之後,執行的協程被suspend
的時候,協程也會被取消。
對上面的例子改造一下:
val scope = CoroutineScope(Job() + Dispatchers.IO + CoroutineName("Top Scope"))
bn1.setOnClickListener {
scope.launch {
Thread.sleep(2000)
Log.d(TAG, "onCreate: $isActive")
withContext(Dispatchers.Main) {
Log.d(TAG,
"onCreate: ${threadName()},${coroutineContext[CoroutineName]?.name}")
}
}
}
bn2.setOnClickListener {
scope.cancel()
}
假如我們只點擊bn1
開啟協程,但是不點選bn2
去取消協程,那麼輸出為
D/abcde: onCreate: true
D/abcde: onCreate: main,Top Scope
假設我們點選bn1
開啟協程後,立即點選bn2
取消協程(此時協程仍然在Thread.sleep
期間),那麼輸出為
D/abcde: onCreate: false
可以看出,withContext
在suspend
當前協程的時候,協程被取消了。
kotlinx.coroutines
中的所有suspend
函式都是可取消的(cancellable
),例如withContext
and delay
(上面的例子中,不使用withContext
,使用delay
函式也是可以實現協程的取消的)。如果協程中呼叫了這些掛起函式,就不需要做任何其它的額外工作。
異常的處理
對於協程中的異常,可以使用try...catch...
進行捕獲,也可以使用CoroutineExceptionHandler
。
CoroutineExceptionHandler
是CoroutineContext
中的一種
協程中使用try...catch...
捕獲異常:
class LoginViewModel(
private val loginRepository: LoginRepository
) : ViewModel() {
fun login(username: String, token: String) {
viewModelScope.launch {
try {
loginRepository.login(username, token)
// Notify view user logged in successfully
} catch (error: Throwable) {
// Notify view login attempt failed
}
}
}
}
其它掛起函式
coroutineScope
掛起函式coroutineScope
:建立一個CoroutineScope
,並且在這個scope
裡面呼叫特定的suspend block
,建立的CoroutineScope
繼承外部scope
的CoroutineContext
(CoroutineContext
中的Job
會被重寫)。
這個函式為parallel decomposition
而設計,當這個scope
的任何子協程fail
,這個scope
裡面其它的子協程也會fail
,這個scope
也fail
了(感覺有點結構化併發的感覺)。
當使用coroutineScope
的時候,外部的協程會被掛起,直到coroutineScope
裡面的程式碼和scope
裡面的協程執行結束的時候,掛起函式coroutineScope
的外部協程就會恢復執行。
一個例子:
GlobalScope.launch(Dispatchers.Main) {
fetchTwoDocs()
Log.d(TAG, "Under fetchTwoDocs()")
}
suspend fun fetchTwoDocs() {
coroutineScope {
Log.d(TAG, "fetchTwoDocs: ${threadName()}")
val deferredOne = async {
Log.d(TAG, "async1 start: ${threadName()}")
fetchDoc1()
Log.d(TAG, "async1 end: ${threadName()}")
}
val deferredTwo = async {
Log.d(TAG, "async2: start:${threadName()}")
fetchDoc2()
Log.d(TAG, "async2 end: ${threadName()}")
}
deferredOne.await()
deferredTwo.await()
}
}
suspend fun fetchDoc1() = withContext(Dispatchers.IO) {
Thread.sleep(2000L)
}
suspend fun fetchDoc2() = withContext(Dispatchers.IO) {
Thread.sleep(1000L)
}
D/abcde: fetchTwoDocs: main
D/abcde: async1 start: main
D/abcde: async2: start:main
D/abcde: async2 end: main
D/abcde: async1 end: main
D/abcde: Under fetchTwoDocs()
幾個關注點:
Under fetchTwoDocs()
在fetchTwoDocs
執行完畢後才輸出coroutineScope
裡面的程式碼在主執行緒執行async
的程式碼執行在main
執行緒中,因為coroutineScope
建立的scope
會繼承外部的GlobalScope.launch
的CoroutineContext
。
上面的程式碼即使不呼叫deferredOne.await()
、deferredTwo.await()
,也是一樣的執行和輸出結果。
suspendCoroutine
/**
* Obtains the current continuation instance inside suspend functions and suspends
* the currently running coroutine.
*
* In this function both [Continuation.resume] and [Continuation.resumeWithException] can be used either synchronously in
* the same stack-frame where the suspension function is run or asynchronously later in the same thread or
* from a different thread of execution. Subsequent invocation of any resume function will produce an [IllegalStateException].
*/
@SinceKotlin("1.3")
@InlineOnly
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
val safe = SafeContinuation(c.intercepted())
block(safe)
safe.getOrThrow()
}
}
suspendCoroutine
是一個主動掛起協程的行為,它會給你一個Continuation
,讓你決定什麼時候去恢復協程的執行。