1. 程式人生 > 其它 >Kotlin協程的使用

Kotlin協程的使用

前言

本篇是在Android官網對Kotlin協程的學習記錄。記錄Kotlin CoroutinesAndroid上的特點、應用等

協程概述

一、協程是什麼?

協程是一種併發的設計模式,可以使用它來簡化非同步執行的程式碼,它可以幫助管理一些耗時的任務,以防耗時任務阻塞主執行緒。協程可以用同步的方式寫出非同步程式碼,代替了傳統的回撥方式,讓程式碼更具有可讀性。

二、協程的特點?

  • 輕量(Lightweight):其實這裡的輕量是相對執行緒阻塞而言的,協程支援掛起,掛起的時候並不會阻塞當前執行緒,也就是"非阻塞式掛起",在協程掛起的時候執行緒可以做其它的事情,而執行緒的阻塞期間是無法做其他事情的。所以協程的"非阻塞式掛起"可以節省系統的資源。

  • 記憶體洩漏更少(Fewer memory leaks):使用者關閉頁面的時候,後臺執行緒可能仍然有還在執行的任務,如果使用傳統的執行緒進行後臺請求,可能沒有很好的辦法讓執行緒及時地停止執行,使用協程的話,可以通過Job::cancel讓協程及時地停止執行,並且可以通過協程作用域CoroutineScope對協程進行統一管理,例如對通過CoroutineScope啟動的協程統一進行cancel,這種就稱作結構化併發,它讓我們的程式有更少的協程洩漏,協程洩漏可以看做是一種記憶體洩露。

  • 內建取消支援(Built-in cancellation support):Cancellation會自動在執行中的整個協程層次結構內傳播。

  • Jetpack和第三方框架支援:一些比如RoomViewModelJetpack元件,第三方框架Retrofit等有對Kt協程提供支援。

關於協程作用域:協程必須執行在CoroutineScope裡(協程作用域),一個 CoroutineScope 管理一個或多個相關的協程。例如viewmodel-ktx包下面有 viewModelScopeviewModelScope管理通過它啟動的協程,如果viewModel被銷燬,那麼viewModelScope會自動被取消,通過viewModelScope啟動的正在執行的協程也會被取消。

掛起與恢復

協程有suspendresume兩項概念:

  • suspend(掛起):暫停執行當前協程,並儲存所有區域性變數。
  • resume(恢復):用於讓已掛起的協程從掛起處繼續執行。

協程中有一個suspend關鍵字,它和剛剛提到的suspend概念要區分一下,剛剛提到的suspend(掛起)是一個概念,而suspend關鍵字可以修飾一個函式,但是僅這個關鍵字沒有讓協程掛起的作用,一般suspend關鍵字是提醒呼叫者該函式需要直接或間接地在協程下面執行,起到一個標記與提醒的作用。

suspend關鍵字的標記與提醒有什麼作用?在以前,開發者很難判斷一個方法是否是耗時的,如果錯誤地在主執行緒呼叫一個耗時方法,那麼會造成主執行緒卡頓,有了suspend關鍵字,耗時函式的建立者可以將耗時方法使用suspend關鍵字修飾,並且在方法內部將耗時程式碼使用withContext{Dispatchers.IO}等方式放到IO執行緒等執行,開發者只需要直接或間接地在協程下面呼叫它即可,這樣就可以避免耗時任務在主執行緒中執行從而造成主執行緒卡頓了。

下面通過官方的一個例子,對協程的suspendresume兩個概念進行說明:

suspend fun fetchDocs() {                             // Dispatchers.Main
    val result = get("https://developer.android.com") // Dispatchers.IO for `get`
    show(result)                                      // Dispatchers.Main
}

suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }

我們假設在協程中呼叫fetchDocs方法,該協程提供了一個主執行緒環境(如啟動協程時通過Dispatchers.Main指定),另外,get方法執行耗時任務,它使用掛起函式withContext{Dispatchers.IO}將耗時任務放到了IO執行緒中執行。

fetchDocs方法裡,當執行到get方法開始進行網路請求的時候,它會掛起(suspend)所在的協程,當網路請求完成時,get會恢復(resume)已掛起的協程,而不是使用回撥通知主執行緒

Kotlin使用棧幀(stack frame)管理正在執行的函式以及它的區域性變數,當掛起一個協程的時候,系統會複製並儲存當前的棧幀以供稍後使用。協程恢復時,會將棧幀從其儲存位置複製回來,然後函式再次開始執行

排程器

Kotlin協程必須執行在dispatcher裡面,協程可以將自己suspenddispatcher負責resume它們。

有下面三種Dispatcher

  • Dispatchers.Main:在主執行緒執行協程。
  • Dispatchers.IO:該dispatcher適合執行磁碟或網路I/O,並且經過優化。
  • Dispatchers.Default:該dispatcher適合執行佔用大量 CPU 資源的工作(對列表排序和解析JSON),並且經過優化。

啟動協程

有以下兩種方式啟動協程:

  • launch:啟動新協程,launch的返回值為Job,協程的執行結果不會返回給呼叫方。
  • async:啟動新協程,async的返回值為DeferredDeferred繼承至Job,可通過呼叫Deferred::await獲取協程的執行結果,其中await是掛起函式。

在一個常規函式啟動協程,通常使用的是launch,因為常規函式無法呼叫Deferred::await,在一個協程或者掛起函式內部開啟協程可以使用async

launchasync的區別:

  1. launch啟動的協程沒有返回結果;async啟動的協程有返回結果。
  2. launch啟動的協程有異常會立即丟擲;async啟動的協程的異常不會立即丟擲,會等到呼叫Deferred::await的時候才將異常丟擲。
  3. async適合於一些併發任務的執行,例如有這樣的業務:做兩個網路請求,等兩個請求都完成後,一起顯示請求結果。使用async可以這樣實現
interface IUser {
    @GET("/users/{nickname}")
    suspend fun getUser(@Path("nickname") nickname: String): User

    @GET("/users/{nickname}")
    fun getUserRx(@Path("nickname") nickname: String): Observable<User>
}
val iUser = ServiceCreator.create(IUser::class.java)
GlobalScope.launch(Dispatchers.Main) {
    val one = async {
        Log.d(TAG, "one: ${threadName()}")
        iUser.getUser("giagor")
    }
    val two = async {
        Log.d(TAG, "two: ${threadName()}")
        iUser.getUser("google")
    }
    Log.d(TAG, "giagor:${one.await()} , google:${two.await()} ")
}

協程概念

CoroutineScope

CoroutineScope會跟蹤它使用launchasync建立的所有協程,可以呼叫scope.cancel()取消該作用域下所有正在執行的協程。在ktx中,為我們提供了一些已經定義好的CoroutineScope,如ViewModelviewModelScopeLifecyclelifecycleScope,具體可以檢視Android KTX | Android Developers

viewModelScope會在ViewModel的onCleared()方法中被取消

可以自己創捷CoroutineScope,如下:

class MainActivity : AppCompatActivity() {
    val scope = CoroutineScope(Job() + Dispatchers.Main)
    
    override fun onCreate(savedInstanceState: Bundle?) {
    	...
        scope.launch {
            Log.d(TAG, "onCreate: ${threadName()}") // main
            fetchDoc1()
        }
        
        scope.launch { 
            ...
        }
    }
    
    suspend fun fetchDoc1() = withContext(Dispatchers.IO) {...}
    
    override fun onDestroy() {
        scope.cancel()
        super.onDestroy()
    }
}

建立scope的時候,將JobDispatcher聯合起來,作為一個CoroutineContext,作為CoroutineScope的構造引數。當scope.cancel的時候,通過scope開啟的所有協程都會被自動取消,並且之後無法使用scope來開啟協程(不會報錯但是協程開啟無效)。

也可以通過傳入CoroutineScopeJob來取消協程:

    val job = Job()
    val scope = CoroutineScope(job + Dispatchers.Main)

    scope.launch {...}
	...
	job.cancel()

使用Job取消了協程,之後也是無法通過scope來開啟協程的。

其實檢視原始碼,可以發現CoroutineScope.cancel方法內部就是通過Job進行cancel的:

public fun CoroutineScope.cancel(cause: CancellationException? = null) {
    val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
    job.cancel(cause)
}

關於協程的取消後面還會再進行介紹。

Job

當我們使用launch或者async建立一個協程的時候,都會獲取到一個Job例項,這個Job例項唯一地標識這個協程,並且管理這個協程地生命週期。Job有點類似Java中的Thread類。

JavaThread類的部分方法:

它可以對所建立的執行緒進行管理。

Job類還有部分擴充套件函式如下:

CoroutineContext

CoroutineContext使用下面的幾種元素定義了協程的行為:

  • Job:控制協程的生命週期。
  • CoroutineDispatcher:將工作分派到適當的執行緒。
  • CoroutineName:協程的名稱,可用於除錯。
  • CoroutineExceptionHandler:處理未捕獲的異常。

對於在作用域內建立的新協程,系統會為新協程分配一個新的 Job 例項,而從包含協程的作用域繼承其他 CoroutineContext 元素。可以通過向 launchasync 函式傳遞新的 CoroutineContext 替換繼承的元素。請注意,將 Job 傳遞給 launchasync 不會產生任何效果,因為系統始終會向新協程分配 Job 的新例項。

例如:

val scope = CoroutineScope(Job() + Dispatchers.Main + CoroutineName("Top Scope"))

scope.launch(Dispatchers.IO) {
    Log.d(TAG, "onCreate: ${coroutineContext[CoroutineName]}")
}
D/abcde: onCreate: CoroutineName(Top Scope)

新建立的協程從外部的scope繼承了CoroutineName等元素,但注意,CoroutineDispatcher元素被重寫了,在新建立的協程裡,CoroutineDispatcher元素被指定為Dispatchers.IO

避免使用GlobalScope

官方文件中,對於不提倡使用GlobalScope,給出了三個原因:

  • (一)Promotes hard-coding values. If you hardcode GlobalScope, you might be hard-coding Dispatchers as well.
  • (二)Makes testing very hard as your code is executed in an uncontrolled scope, you won't be able to control its execution.
  • (三)You can't have a common CoroutineContext to execute for all coroutines built into the scope itself.

關於第二點和第三點的解釋如下:我們自己建立的CoroutineScope可以進行結構化併發的操作,例如我們可以呼叫CoroutineScope.cancel去取消該作用域下所有正在執行的協程,cancel方法如下:

public fun CoroutineScope.cancel(cause: CancellationException? = null) {
    val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
    job.cancel(cause)
}

它內部先獲取CoroutineContextJob,然後第哦啊有Jobcancel方法,實現協程的取消。我們手動建立的CoroutineScopeCoroutineContext中都是有Job的,例如:

val scope = CoroutineScope(Job() + Dispatchers.Main + CoroutineName("Top Scope"))

它的構造方法為:

public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
    ContextScope(if (context[Job] != null) context else context + Job())

構造方法中,若傳入的CoroutineContext沒有Job,則會建立一個Job新增到CoroutineContext中。但是GlobalScope是全域性(單例)的,它的CoroutineContext是一個EmptyCoroutineContext,裡面沒有Job成員

public object GlobalScope : CoroutineScope {
    /**
     * Returns [EmptyCoroutineContext].
     */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

我們在呼叫GlobalScope.launch時,可以指定本次啟動的協程的CoroutineContext。當我們在呼叫GlobalScope.cancel()的時候,會報下面的錯誤:

java.lang.IllegalStateException: Scope cannot be cancelled because it does not have a job: kotlinx.coroutines.GlobalScope@11b671b

可以看出,報錯的原因就是因為GlobalScope沒有Job

協程的取消

官方文件的原話:

Cancellation in coroutines is cooperative, which means that when a coroutine's Job is cancelled, the coroutine isn't cancelled until it suspends or checks for cancellation. If you do blocking operations in a coroutine, make sure that the coroutine is cancellable.

可以得出:

  1. 協程的取消是協作式
  2. 外部對當前正在執行的協程的取消,協程不會立即取消,當下面兩種情況之一發生時,協程才會取消
    • 該協程的配合檢查,協同進行取消,這和停止一個執行緒的執行類似(需要執行緒的配合檢查)。
    • 當協程suspend的時候,協程也會被取消。

主動檢查

舉個例子:

val scope = CoroutineScope(Job() + Dispatchers.IO + CoroutineName("Top Scope"))

bn1.setOnClickListener {
	scope.launch {
        Thread.sleep(2000)
        Log.d(TAG, "onCreate: $isActive")
        Log.d(TAG, "onCreate: ${threadName()},${coroutineContext[CoroutineName]?.name}")
	}
}

bn2.setOnClickListener {
	scope.cancel()
}

假如我們只點擊bn1開啟協程,但是不點選bn2去取消協程,那麼輸出為

D/abcde: onCreate: true
D/abcde: onCreate: DefaultDispatcher-worker-1,Top Scope

假設我們點選bn1開啟協程後,立即點選bn2取消協程(此時協程仍然在Thread.sleep期間),那麼輸出為

D/abcde: onCreate: false
D/abcde: onCreate: DefaultDispatcher-worker-2,Top Scope

可以看到,協程的isActive的值變為false,但是協程仍然會執行(雖然之後無法通過scope再去啟動新的協程)。

上面的例子中,已經呼叫了scope.cancel,但是當前協程仍然還在執行,說明協程的真正取消需要協程內部的配合,其中一個方法就是呼叫ensureActive()函式,ensureActive的作用大致上相當於:

if (!isActive) {
    throw CancellationException()
}

我們修改下上面的例子:

val scope = CoroutineScope(Job() + Dispatchers.IO + CoroutineName("Top Scope"))

bn1.setOnClickListener {
	scope.launch {
        Thread.sleep(2000)
        Log.d(TAG, "onCreate: $isActive")
        // 檢查協程是否取消
        ensureActive()
        Log.d(TAG, "onCreate: ${threadName()},${coroutineContext[CoroutineName]?.name}")
	}
}

bn2.setOnClickListener {
	scope.cancel()
}

我們點選bn1開啟協程後,立即點選bn2取消協程(此時協程仍然在Thread.sleep期間),那麼輸出為

D/abcde: onCreate: false

可以看到,當前協程內部的ensureActive()函式配合外部的cancel操作,成功地將協程取消了。

當然,也可以通過其它的方式在協程內部進行協作式地取消操作。

協程掛起

外部對協程cancel之後,執行的協程被suspend的時候,協程也會被取消。

對上面的例子改造一下:

val scope = CoroutineScope(Job() + Dispatchers.IO + CoroutineName("Top Scope"))

bn1.setOnClickListener {
    scope.launch {
        Thread.sleep(2000)
        Log.d(TAG, "onCreate: $isActive")
        withContext(Dispatchers.Main) {
            Log.d(TAG, 
                  "onCreate: ${threadName()},${coroutineContext[CoroutineName]?.name}")
        }
    }    
}

bn2.setOnClickListener {
	scope.cancel()
}

假如我們只點擊bn1開啟協程,但是不點選bn2去取消協程,那麼輸出為

D/abcde: onCreate: true
D/abcde: onCreate: main,Top Scope

假設我們點選bn1開啟協程後,立即點選bn2取消協程(此時協程仍然在Thread.sleep期間),那麼輸出為

D/abcde: onCreate: false

可以看出,withContextsuspend當前協程的時候,協程被取消了。

kotlinx.coroutines中的所有suspend函式都是可取消的(cancellable),例如withContext and delay(上面的例子中,不使用withContext,使用delay函式也是可以實現協程的取消的)。如果協程中呼叫了這些掛起函式,就不需要做任何其它的額外工作。

異常的處理

對於協程中的異常,可以使用try...catch...進行捕獲,也可以使用CoroutineExceptionHandler

CoroutineExceptionHandlerCoroutineContext中的一種

協程中使用try...catch...捕獲異常:

class LoginViewModel(
    private val loginRepository: LoginRepository
) : ViewModel() {

    fun login(username: String, token: String) {
        viewModelScope.launch {
            try {
                loginRepository.login(username, token)
                // Notify view user logged in successfully
            } catch (error: Throwable) {
                // Notify view login attempt failed
            }
        }
    }
}

其它掛起函式

coroutineScope

掛起函式coroutineScope:建立一個CoroutineScope,並且在這個scope裡面呼叫特定的suspend block,建立的CoroutineScope繼承外部scopeCoroutineContextCoroutineContext中的Job會被重寫)。

這個函式為parallel decomposition而設計,當這個scope的任何子協程fail,這個scope裡面其它的子協程也會fail,這個scopefail了(感覺有點結構化併發的感覺)。

當使用coroutineScope的時候,外部的協程會被掛起,直到coroutineScope裡面的程式碼和scope裡面的協程執行結束的時候,掛起函式coroutineScope的外部協程就會恢復執行。

一個例子:

    GlobalScope.launch(Dispatchers.Main) {
        fetchTwoDocs()
        Log.d(TAG, "Under fetchTwoDocs()")
    }

    suspend fun fetchTwoDocs() {
        coroutineScope {
            Log.d(TAG, "fetchTwoDocs: ${threadName()}")
            val deferredOne = async {
                Log.d(TAG, "async1 start: ${threadName()}")
                fetchDoc1()
                Log.d(TAG, "async1 end: ${threadName()}")
            }
            val deferredTwo = async {
                Log.d(TAG, "async2: start:${threadName()}")
                fetchDoc2()
                Log.d(TAG, "async2 end: ${threadName()}")
            }
            deferredOne.await()
            deferredTwo.await()
        }
    }

    suspend fun fetchDoc1() = withContext(Dispatchers.IO) {
        Thread.sleep(2000L)
    }

    suspend fun fetchDoc2() = withContext(Dispatchers.IO) {
        Thread.sleep(1000L)
    }
D/abcde: fetchTwoDocs: main
D/abcde: async1 start: main
D/abcde: async2: start:main
D/abcde: async2 end: main
D/abcde: async1 end: main
D/abcde: Under fetchTwoDocs()

幾個關注點:

  1. Under fetchTwoDocs()fetchTwoDocs執行完畢後才輸出
  2. coroutineScope裡面的程式碼在主執行緒執行
  3. async的程式碼執行在main執行緒中,因為coroutineScope建立的scope會繼承外部的GlobalScope.launchCoroutineContext

上面的程式碼即使不呼叫deferredOne.await()deferredTwo.await(),也是一樣的執行和輸出結果。

suspendCoroutine

/**
 * Obtains the current continuation instance inside suspend functions and suspends
 * the currently running coroutine.
 *
 * In this function both [Continuation.resume] and [Continuation.resumeWithException] can be used either synchronously in
 * the same stack-frame where the suspension function is run or asynchronously later in the same thread or
 * from a different thread of execution. Subsequent invocation of any resume function will produce an [IllegalStateException].
 */
@SinceKotlin("1.3")
@InlineOnly
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
    contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
    return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        safe.getOrThrow()
    }
}

suspendCoroutine是一個主動掛起協程的行為,它會給你一個Continuation,讓你決定什麼時候去恢復協程的執行。

參考

  1. Kotlin coroutines on Android | Android Developers