Kotlin協程實現原理
初看suspend關鍵字
下面的例子模擬一個網路請求:
class Temp {
suspend fun fetchData(argument: String): Boolean {
val result = netRequest(argument)
return result == 0
}
// 模擬網路請求
suspend fun netRequest(argument: String): Int {
delay(1000)
return argument.length
}
}
這兩個方法都使用了suspend
Java
程式碼:
public final class Temp { @Nullable public final Object fetchData(@NotNull String argument, @NotNull Continuation var2) { Object $continuation; label25: { if (var2 instanceof <undefinedtype>) { $continuation = (<undefinedtype>)var2; if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) { ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE; break label25; } } $continuation = new ContinuationImpl(var2) { // $FF: synthetic field Object result; int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { this.result = $result; this.label |= Integer.MIN_VALUE; return Temp.this.fetchData((String)null, this); } }; } Object $result = ((<undefinedtype>)$continuation).result; Object var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); Object var10000; switch(((<undefinedtype>)$continuation).label) { case 0: ResultKt.throwOnFailure($result); ((<undefinedtype>)$continuation).label = 1; var10000 = this.netRequest(argument, (Continuation)$continuation); if (var10000 == var6) { return var6; } break; case 1: ResultKt.throwOnFailure($result); var10000 = $result; break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } int result = ((Number)var10000).intValue(); return Boxing.boxBoolean(result == 0); } @Nullable public final Object netRequest(@NotNull String argument, @NotNull Continuation var2) { Object $continuation; label20: { if (var2 instanceof <undefinedtype>) { $continuation = (<undefinedtype>)var2; if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) { ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE; break label20; } } $continuation = new ContinuationImpl(var2) { // $FF: synthetic field Object result; int label; Object L$0; @Nullable public final Object invokeSuspend(@NotNull Object $result) { this.result = $result; this.label |= Integer.MIN_VALUE; return Temp.this.netRequest((String)null, this); } }; } Object $result = ((<undefinedtype>)$continuation).result; Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch(((<undefinedtype>)$continuation).label) { case 0: ResultKt.throwOnFailure($result); ((<undefinedtype>)$continuation).L$0 = argument; ((<undefinedtype>)$continuation).label = 1; if (DelayKt.delay(1000L, (Continuation)$continuation) == var5) { return var5; } break; case 1: argument = (String)((<undefinedtype>)$continuation).L$0; ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } return Boxing.boxInt(argument.length()); } }
幾行協程相關的程式碼,竟然對應了這麼多的Java
程式碼,可見kotlin
編譯器為我們做了很多事情。
上面程式碼的可讀性不高,例如有<undefinedtype>
這種未定義的型別,我使用jd-gui
對Temp.class
檔案再進行了一次反編譯,獲取到了更多資訊,我將上面的反編譯的一大串程式碼和jd-gui
反編譯獲取的資訊進行整合,並且對一些類和變數進行適當的重新命名,得出資訊更完整且可讀性更高的「Temp.class
反編譯後對應的Java
程式碼」,首先是fetchData
相關的:
public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) { Object $continuation; label25: { if (completion instanceof FetchDataStateMachine) { $continuation = (FetchDataStateMachine) completion; if (($continuation.label & Integer.MIN_VALUE) != 0) { $continuation.label -= Integer.MIN_VALUE; break label25; } } $continuation = new FetchDataStateMachine(completion); } Object $result = $continuation.result; Object resultTemp; switch ($continuation.label) { case 0: ResultKt.throwOnFailure($result); $continuation.label = 1; resultTemp = this.netRequest(argument, (Continuation) $continuation); if (resultTemp == COROUTINE_SUSPENDED) { return COROUTINE_SUSPENDED; } break; case 1: ResultKt.throwOnFailure($result); resultTemp = $result; break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } int result = ((Number) resultTemp).intValue(); return Boxing.boxBoolean(result == 0); } static final class FetchDataStateMachine extends ContinuationImpl { Object result; int label; FetchDataStateMachine(Continuation $completion) { super($completion); } @Nullable public final Object invokeSuspend(@NotNull Object $result) { this.result = $result; this.label |= Integer.MIN_VALUE; return Temp.this.fetchData(null, (Continuation<? super Boolean>) this); } }
netRequest
相關的程式碼,與fetchData
相關的程式碼,在結構和形式上類似:
public final Object netRequest(@NotNull String argument,@NotNull Continuation completion) {
Object $continuation;
label20:
{
if (completion instanceof NetRequestStateMachine) {
$continuation = (NetRequestStateMachine) completion;
if (($continuation.label & Integer.MIN_VALUE) != 0) {
$continuation.label -= Integer.MIN_VALUE;
break label20;
}
}
$continuation = new NetRequestStateMachine(completion);
}
Object $result = $continuation.result;
switch ($continuation.label) {
case 0:
ResultKt.throwOnFailure($result);
$continuation.functionParameter = argument;
$continuation.label = 1;
if (DelayKt.delay(1000L, (Continuation) $continuation) == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 1:
argument = (String) ($continuation.functionParameter);
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return Boxing.boxInt(argument.length());
}
static final class NetRequestStateMachine extends ContinuationImpl {
Object result;
int label;
Object functionParameter;
NetRequestStateMachine(Continuation $completion) {
super($completion);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return Temp.this.netRequest(null, (Continuation<? super Integer>) this);
}
}
可以發現,反編譯後的Java
程式碼中,fetchData
和netRequest
方法都多了一個Continuation completion
引數,這是Kotlin Compiler
幫我們做的,對於suspend
修飾的函式,編譯的時候Kotlin Compiler
會幫我們在該函式中傳入一個Continuation
引數,使用Continuation
引數代替了suspend
修飾符,這個引數有什麼含義呢?
初識續體
續體是理解協程工作原理的一個關鍵。
先看傳統的網路請求:
data class User(val id: Long, val name: String)
interface Callback {
fun success(user: User)
fun failure(t: Throwable)
}
class Model {
fun getUserInfo(callback: Callback) {
Thread.sleep(1000) // 模擬網路請求
callback.success(User(1, "giagor"))
}
}
class Business {
val model = Model()
fun getUserInfo() {
model.getUserInfo(object : Callback {
override fun success(user: User) {
showMsg(user.toString())
}
override fun failure(t: Throwable) {
showMsg(t.message ?: "")
}
})
}
fun showMsg(msg: String) {
// ...
}
}
在使用Model
進行網路請求的時候,使用Callback
接收網路請求的結果,我們這時候可以將Callback
看作一個續體,即網路請求的續體,用於接收網路請求的結果。
在協程中使用Continuation
介面表示一個續體,它代表一個掛起點之後的延續,即 掛起點之後的剩餘應執行的程式碼:
public interface Continuation<in T> {
// 與該續體對應的協程的上下文
public val context: CoroutineContext
// 恢復對應協程的執行,並且傳遞一個表示成功或失敗的result作為最後一個掛起點的返回值
public fun resumeWith(result: Result<T>)
}
在Kotlin 1.3
,也有可以方便地呼叫resumeWith
的擴充套件函式:
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
正如前面所說,對於suspend
修飾的函式,Kotlin Compiler
會幫我們在該函式中傳入一個Continuation
引數,使用Continuation
引數代替了suspend
修飾符,通過Continuation
引數,Kotlin Compiler
可以將我們的協程程式碼轉化為等價的回撥程式碼,也就是說,Kt
編譯器幫我們寫好了那些回撥的程式碼,至於怎麼幫我們寫的後面會分析,這種通過傳遞Continuation
來控制非同步呼叫流程被稱作CPS
變換(Continuation-Passing-Style Transformation
)。
狀態機
fetchData
函式編譯時會生成下面的一個靜態內部類(續體):
static final class FetchDataStateMachine extends ContinuationImpl {
Object result;
int label;
FetchDataStateMachine(Continuation $completion) {
super($completion);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return Temp.this.fetchData(null, (Continuation<? super Boolean>) this);
}
}
FetchDataStateMachine
的繼承關係如下:
FetchDataStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation
FetchDataStateMachine
接收一個名稱為$completion
的Continuation
引數,$completion
被儲存在父類BaseContinuationImpl
中:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {...}
通過$completion
可以將fetchData
函式的執行結果傳遞迴給呼叫fetchData
的函式,有了$completion
,才有能力去實現回撥。
狀態機FetchDataStateMachine
聲明瞭result
和label
兩個變數
result
:表示上一個Continuation
的結果,比如有函式A
和B
,函式內部分別聲明瞭ContinuationA
和ContinuationB
,A
呼叫B
並且將ContinuationA
傳入B
中儲存。在後續回撥的過程中,ContinuationA
可以從result
變數中拿到ContinuationB::invokeSuspend
的執行結果。label
:Kotlin Compiler
可以識別函式內部哪個地方會掛起,每一個掛起點(suspension point
)被表示為狀態機的一個狀態(state
),這些狀態通過switch case
語句表示出來。label
表示當前應該執行狀態機的哪一個狀態,具體來說就是要進入哪一個case
,通過label
變數就記錄下了狀態機當前的狀態。
再看下fetchData
的前半部分程式碼:
public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
Object $continuation;
label25:
{
if (completion instanceof FetchDataStateMachine) {
$continuation = (FetchDataStateMachine) completion;
if (($continuation.label & Integer.MIN_VALUE) != 0) {
$continuation.label -= Integer.MIN_VALUE;
break label25;
}
}
$continuation = new FetchDataStateMachine(completion);
}
...
}
它會判斷傳入的completion
是否為FetchDataStateMachine
型別,若是則對它的label
變數做些操作,若不是則直接建立一個FetchDataStateMachine
並且傳入completion
(completion
會被儲存下來)。
再看下fetchData
的後半部分程式碼:
public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
Object $continuation;
...
Object $result = $continuation.result;
Object resultTemp;
switch ($continuation.label) {
case 0:
ResultKt.throwOnFailure($result);
$continuation.label = 1;
resultTemp = this.netRequest(argument, (Continuation) $continuation);
if (resultTemp == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 1:
ResultKt.throwOnFailure($result);
resultTemp = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
int result = ((Number) resultTemp).intValue();
return Boxing.boxBoolean(result == 0);
}
fetchData
方法原先的程式碼語句會被劃分為switch
下的多個case
語句,在這裡就是
FetchDataStateMachine
中的label
變數就是控制當前要執行哪個case
分支。
可見,函式與續體構成了一個有限狀態機(FSM,即 Finite-State Machine),來控制協程程式碼的執行。
何為「非阻塞式掛起」?
在netRequest
方法中,呼叫了delay(1000)
掛起了當前的協程,簡單看下delay
方法反編譯後的程式碼:
public static final Object delay(long timeMillis, @NotNull Continuation $completion) {
if (timeMillis <= 0L) {
return Unit.INSTANCE;
} else {
// 實現類
CancellableContinuationImpl cancellableContinuationImpl = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
cancellableContinuationImpl.initCancellability();
// 向上轉型
CancellableContinuation cont = (CancellableContinuation)cancellableContinuationImpl;
if (timeMillis < Long.MAX_VALUE) {
// 延時操作
getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);
}
// 獲取執行結果
Object result = cancellableContinuationImpl.getResult();
if (result == COROUTINE_SUSPENDED) {
DebugProbesKt.probeCoroutineSuspended($completion);
}
// 返回結果
return result;
}
}
在該方法裡會執行延時操作,如果需要掛起,就會返回COROUTINE_SUSPENDED
值給呼叫者。
結合fetchData
、netRequest
和delay
反編譯的程式碼,我們可以得出下面的這個呼叫圖:
圖中紅色的線表示函式返回COROUTINE_SUSPENDED
,需要掛起。當delay
方法需要掛起的時候,它返回COROUTINE_SUSPENDED
,接著netRequest
方法返回COROUTINE_SUSPENDED
,接著fetchData
方法返回COROUTINE_SUSPENDED
,重複這個過程直到呼叫棧的最上層。
通過這種「結束方法呼叫」的方式,讓協程暫時不在這個執行緒上面執行,讓執行緒可以去處理其它的任務(包括執行其它的協程),這也就是為什麼協程的掛起不會阻塞當前的執行緒,這也是「非阻塞式掛起」的由來。
如何恢復?
既然協程掛起了,那就有相應的協程的恢復。先說結論:協程恢復的實質是對續體進行回撥。
暫時還沒有研究delay
函式的具體實現,但是delay
函式會在某個子執行緒執行等待操作,等延時時間到達之後,就會呼叫傳給delay
函式的$completion
的resumeWith
方法,也就是呼叫NetRequestStateMachine
的resumeWith
方法。NetRequestStateMachine
的繼承關係、父類如下:
NetRequestStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation
BaseContinuationImpl
目前是我們分析的一個重點,它主要做了下面的幾件事情:
- 儲存
completion
:它儲存了fetchData
方法的FetchDataStateMachine
例項,使得可以一級一級地向上回撥續體。 - 重寫
resumeWith
方法:BaseContinuationImpl
重寫了Continuation
介面的resumeWith
方法,該方法用於恢復協程,也是協程恢復的核心邏輯。
我們檢視BaseContinuationImpl
類的定義:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// 在每個恢復的continuation進行除錯探測,使得除錯庫可以精確跟蹤掛起的呼叫棧中哪些部分
// 已經恢復了。
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
protected open fun releaseIntercepted() {
// does nothing here, overridden in ContinuationImpl
}
...
}
重點是resumeWith
方法的實現,它在一個while(true)
迴圈下面執行回撥的邏輯。我們結合前面給出的fetchData
和netRequest
反編譯後的程式碼,看看delay
函式的延時時間到達時呼叫NetRequestStateMachine
的resumeWith
方法,後續的執行流程是怎樣的:
- 執行
NetRequestStateMachine
父類BaseContinuationImpl
的resumeWith
方法。 - 執行當前續體也就是
NetRequestStateMachine
的invokeSuspend
方法(NetRequestStateMachine
有實現該方法,忘記了的話可以回頭看看之前的反編譯程式碼)。 - 在
NetRequestStateMachine
的invokeSuspend
方法呼叫了netRequest
方法,並且將續體自身作為引數傳入。 - 在
netRequest
方法中,由於completion
的型別就是NetRequestStateMachine
,因此可以直接使用該續體,不用像之前第一次進入netRequest
方法那樣需要建立一個新的續體。此時續體的label
值為1
,於是進入netRequest
的case 1
語句分支。
實際上這個過程有對續體的
label
進行一些運算轉化的操作,但是最終label
的值都是1
,做的運算轉化操作不影響我們的分析,因此並不是重點
- 從續體中取出一開始傳入
netRequest
方法的引數,也就是argument
,返回argument.length
。為了方便後面闡述,這裡將該返回值argument.length
記為netRequest-Return
。 - 接著
netRequest
方法結束,NetRequestStateMachine::invokeSuspend
方法也執行結束,netRequest-Return
也作為invokeSuspend
方法的返回值,該返回值會傳遞到BaseContinuationImpl
的resumeWith
方法中,在resumeWith
方法中,將netRequest-Return
包裝為Result
儲存到outcome
變數中。 - 判斷
NetRequestStateMachine
持有的completion
是否為BaseContinuationImpl
型別,我們知道它持有的例項其實就是FetchDataStateMachine
,因此肯定是BaseContinuationImpl
,於是進行了變數的更新
// 把current更新為FetchDataStateMachine例項
current = completion
// 把param更新為outcome(包裝了netRequest-Return的Result)
param = outcome
通過這種方式,其實就可以實現回撥,我們繼續往後看。
- 繼續進行下一輪
while
迴圈,在with
塊中會執行FetchDataStateMachine::invokeSuspend
,在invokeSuspend
裡,將傳入的引數param
儲存到result
變數裡(其實這和傳統的回撥類似,傳統的回撥中也是要將下層的執行結果回撥給上層),接著呼叫了fetchData
方法。 - 在
fetchData
方法中,由於傳入的completion
已是FetchDataStateMachine
型別,因此無需再去建立新的續體。由於此時續體label
的值為1
,所以會進入case 1
語句,並且將netRequest
方法的執行結果儲存在resultTemp
變數中,最終fetchData
方法結束並返回結果result == 0
,為了方便闡述,將fetchData
方法的執行結果記為fetchData-Return
。 FetchDataStateMachine::invokeSuspend
方法也會結束並返回fetchData-Return
,然後在BaseContinuationImpl
的resumeWith
方法中將fetchData-Return
包裝為Result
。然後會判斷FetchDataStateMachine
持有的completion
是否為BaseContinuationImpl
型別。- 程式碼的後續走向,我們目前是不清楚的,我們得知道在協程中呼叫
fetchData
方法的時候會做些什麼,才能清楚後續的程式碼走向。
從上面的流程分析中,我們對協程的恢復有了一個基本的認識,下面給出流程圖進行總結:
再看看上面續體的呼叫過程,其實就是層層往上地呼叫續體的invokeSuspend
方法,從過程來看有點像遞迴呼叫,但是BaseContinuationImpl::resumeWith
的實現卻和遞迴不太一樣,它的實現是在while(true)
迴圈中,對續體呼叫一次invokeSuspend
方法,然後記錄它的返回結果,將這個返回結果作為下一個續體invokeSuspend
的方法引數。
簡單來講,就是在呼叫一個續體的invokeSuspend
方法,待這個方法執行結束後,再呼叫下一個續體的invokeSuspend
方法。這樣做的一個原因是避免呼叫棧過深,在BaseContinuationImpl::resumeWith
也有相關的註釋說明:
This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
啟動協程
我們在一個協程中去呼叫fetchData
方法:
class Temp2 {
fun execute() {
GlobalScope.launch(Dispatchers.Main) {
Temp().fetchData("argument")
}
}
}
通過launch
方法可以啟動一個協程,其原始碼如下:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
協程中的程式碼會被包裝為一個block
,預設情況下會建立一個StandaloneCoroutine
,然後呼叫它的start
方法並返回StandaloneCoroutine
。
StandaloneCoroutine
間接的實現了Job
介面和Continuation<T>
介面,如下:
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}
public abstract class AbstractCoroutine<in T>(
/**
* The context of the parent coroutine.
*/
@JvmField
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {...}
可以看出StandaloneCoroutine
身兼多職,實現了Job, Continuation<T>, CoroutineScope
介面。後面程式碼跟蹤可以得出一個結論,最頂層的續體實現是協程自身,也就是協程恢復的時候續體會一層層地往上回調,最頂層的續體就是協程coroutine
自身,即StandaloneCoroutine
(這裡以StandaloneCoroutine
為例)。
另外還要注意一點,launch
方法中傳入的 block
塊型別:
block: suspend CoroutineScope.() -> Unit
它等價於下面的這種函式型別:
// CoroutineScope:擴充套件函式轉化而來
// Continuation:suspend關鍵字轉化而來,Continuation引數由編譯器傳入
block : (CoroutineScope,Continuation) -> Unit
// 或者通過Function2的形式表示
block : Function2<CoroutineScope,Continuation,Unit>
接著跟蹤下啟動協程的呼叫過程。在launch
方法中,呼叫了AbstractCoroutine::start
方法:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
// 語法糖,實際是呼叫CoroutineStart.invoke方法
start(block, receiver, this)
}
CoroutineStart::invoke
方法:
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
從launch
方法可以知道CoroutineStart
的預設值是CoroutineStart.DEFAULT
,因此會呼叫到block
的startCoroutineCancellable
方法:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
我在AS
跟蹤createCoroutineUnintercepted
的程式碼呼叫時,發現會跳轉到IntrinsicsKt.class
檔案,這個檔案裡面找不到方法的原始碼,最後找到了IntrinsicsJvm.kt
檔案,找到createCoroutineUnintercepted
方法原始碼,如下:
# R:CoroutineScope
# T:Unit
@SinceKotlin("1.3")
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
// probeCoroutineCreated方法直接返回completion
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
這裡會判斷this
的型別是否為BaseContinuationImpl
,this
就是我們之前在launch
中傳入的lambda
塊,那麼這個lambda
程式碼塊是什麼型別的呢?想要知道這個答案,我們得對這一節剛開始給出的程式碼進行反編譯
kotlin
程式碼:
class Temp2 {
fun execute() {
GlobalScope.launch(Dispatchers.Main) {
Temp().fetchData("argument")
}
}
}
對反編譯後的java
程式碼進行適當的重新命名和調整,得出:
public final class Temp2 {
...
static final class LaunchLambda extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
int label;
LaunchLambda(Continuation $completion) {
super(2, $completion);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
switch (this.label) {
case 0:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
this.label = 1;
if ((new Temp()).fetchData("argument", (Continuation<? super Boolean>) this) == COROUTINE_SUSPENDED)
return COROUTINE_SUSPENDED;
(new Temp()).fetchData("argument", (Continuation<? super Boolean>) this);
return Unit.INSTANCE;
case 1:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
return Unit.INSTANCE;
}
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
@NotNull
public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<? super LaunchLambda> $completion) {
return (Continuation<Unit>) new LaunchLambda($completion);
}
@Nullable
public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<?> p2) {
return ((LaunchLambda) create(p1, p2)).invokeSuspend(Unit.INSTANCE);
}
}
}
可以看出在Temp2
裡面會自動生成一個靜態內部類LaunchLambda
,它對應著launch
方法中傳入的lambda
塊。LaunchLambda
的繼承關係(由上到下,子類到父類的順序):
LaunchLambda
-> SuspendLambda // 用suspend修飾的lambda塊都會繼承至這個類
-> ContinuationImpl
-> BaseContinuationImpl // 重寫了resumeWith函式
-> Continuation
OK,回到createCoroutineUnintercepted
方法中,現在可以回答剛剛提出的問題了,lambda
傳入的lambda
塊是不是BaseContinuationImpl
型別呢?根據上面的繼承關係得出,當然是!那麼它就會呼叫LaunchLambda
的create
方法,注意第二個引數傳入的是completion
(程式碼中寫的是probeCompletion
),它最終會被儲存在父類BaseContinuationImpl
的completion
變數中,這個completion
引數就是launch
方法中建立的StandaloneCoroutine
,即協程本身,它作為協程恢復時的最頂層續體。
通過呼叫create
方法獲取到一個LaunchLambda
例項,createCoroutineUnintercepted
方法執行結束並返回LaunchLambda
例項,接著程式碼執行又回到startCoroutineCancellable
中,回顧下該方法:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
這裡有兩部分呼叫,先是呼叫intercepted
方法,然後再呼叫resumeCancellableWith
方法。intercepted
方法與續體攔截機制有關,後面會介紹,這裡先忽略,這裡直接認為呼叫了LaunchLambda
例項的resumeCancellableWith
方法即可,該方法如下:
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
那麼會走到resumeWith
方法,前面提到過該方法在父類BaseContinuationImpl
實現,在該方法裡面會呼叫invokeSuspend
方法,invokeSuspend
方法在LaunchLambda
中實現了,如下:
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
switch (this.label) {
case 0:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
this.label = 1;
if ((new Temp()).fetchData("argument", (Continuation<? super Boolean>) this) == COROUTINE_SUSPENDED)
return COROUTINE_SUSPENDED;
(new Temp()).fetchData("argument", (Continuation<? super Boolean>) this);
return Unit.INSTANCE;
case 1:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
return Unit.INSTANCE;
}
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
一開始label
的值為0
,所以會進入case 0
語句分支,在該語句分支裡面,會設定label
的值為1
,然後建立一個Temp
物件並且呼叫它的fetchData
方法,並把LaunchLambda
自身作為引數傳入,也就是LaunchLambda
例項會被儲存在fetchData
方法建立的續體的completion
變數裡,方便協程恢復的時候進行回撥。
現在續體的持有圖:
到了這裡,從啟動一個協程到協程最終是如何掛起的,我們已經可以串聯起來了。在「如何恢復?」一節中,協程恢復的最後幾個步驟我們還沒有分析,這裡把它分析完,然後整個協程恢復的流程也可以串起來了。
協程恢復的後續流程:
- 當
FetchDataStateMachine::invokeSuspend
執行完後,會在BaseContinuationImpl
的resumeWith
方法中判斷FetchDataStateMachine
所持有的completion
(即LaunchLambda
)是否為BaseContinuationImpl
型別,由LaunchLambda
的繼承關係,容易得出答案為「是」,所以會進入下一輪while
迴圈,呼叫LaunchLambda
的invokeSuspend
方法。 - 由於
label = 1
所以會進入case 1
語句,裡面直接return Unit
。接著判斷LaunchLambda
持有的completion
(即StandaloneCoroutine
)是否為BaseContinuationImpl
型別,根據StandaloneCoroutine
的繼承關係容易得出答案為「不是」,所以會呼叫StandaloneCoroutine
的resumeWith
方法。 StandaloneCoroutine
的resumeWith
方法在父類AbstractCoroutine
中實現:
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
// 如果在等子協程完成,則返回
if (state === COMPLETING_WAITING_CHILDREN) return
// 應該是做一些後續處理
afterResume(state)
}
此時最頂層的續體(協程自身)也恢復了。
BaseContinuationImpl::resumeWith
方法執行結束,整個協程的恢復也完成了。
在之前流程圖的基礎上進行補充完善:
一、協程至上而下呼叫的流程圖(協程掛起)
其中藍色的文字和線條表示新增的,紅色的文字和線條表示掛起的過程。
二、協程至下而上恢復的流程圖(協程恢復)
其中藍色的文字和線條表示新增的,橙色的文字和線條表示方法呼叫的結束。
協程上下文
協程上下文CoroutineContext
定義了協程的行為,它記錄了當前協程所持有的資訊,是協程執行中一個重要的資料物件。CoroutineContext
是一個介面:
public interface CoroutineContext {...}
在續體中就有CoroutineContext
的相關資訊:
public interface Continuation<in T> {
// 與該續體對應的協程的上下文
public val context: CoroutineContext
// 恢復對應協程的執行,並且傳遞一個表示成功或失敗的result作為最後一個掛起點的返回值
public fun resumeWith(result: Result<T>)
}
下面幾種元素都是「協程上下文」的元素:
Job
:控制協程的生命週期。CoroutineDispatcher
:將工作分派到適當的執行緒。CoroutineName
:協程的名稱,可用於除錯。CoroutineExceptionHandler
:處理未捕獲的異常。
CoroutineContext
可以看做是CoroutineContext.Element
的一個集合,集合中的每個元素都可以使用CoroutineContext.Key
進行定位,且每個元素的Key
都是不同的。
CoroutineContext.Element
的定義:
public interface Element : CoroutineContext {...}
可以看到Element
本身也實現了CoroutineContext
介面,這很奇怪,看上去好像是Int
實現了List<Int>
介面一樣,為什麼元素本身也是集合了呢?其實這主要是為了方便API的設計,這樣的話,一個元素比如Job
也可以直接作為一個CoroutineContext
,而不需要建立一個只包含一個元素的List
,多個元素之間也可以通過「+」進行拼接,如:
scope.launch(CoroutineName("coroutine") + Dispatchers.Main) {...}
這裡的「+」其實是操作符過載,對應CoroutineContext
宣告的plus
方法:
public operator fun plus(context: CoroutineContext): CoroutineContext = ...
「協程上下文」儲存元素的方式比較巧妙,它內部並不是建立一個集合,集合的每個位置都存放一個元素。它藉助了一個CombinedContext
結構來實現資料的存取,CombinedContext
的定義及get
方法:
internal class CombinedContext(
private val left: CoroutineContext,
private val element: Element
) : CoroutineContext, Serializable {
override fun <E : Element> get(key: Key<E>): E? {
var cur = this
while (true) {
cur.element[key]?.let { return it }
val next = cur.left
if (next is CombinedContext) {
cur = next
} else {
return next[key]
}
}
}
...
}
從建構函式中可以看出它包含兩部分內容:left
和element
。也就是說一個CombinedContext
內部可能包含多個元素。
- left:可能是普通的上下文元素(
CoroutineContext.Element
),也可能又是一個CombinedContext
(又包含多個上下文元素)。 - element:一個協程上下文元素。
在CombinedContext
的get
方法中,有一個while(true)
迴圈,執行過程如下:
- 它會先判斷當前
element
元素與傳入的key
是否相符,是的話直接返回該元素,否則獲取到left
部分。 - 若
left
是CombinedContext
部分,則對left
變數重複步驟1。 - 若
left
不是CombinedContext
部分,則直接呼叫它的get
方法獲取元素(獲取不到則返回null
)。
另外,也可以看出element
先於left
被訪問,所以越靠右邊的上下文元素,其優先順序越高。
Key
用於標識協程上下文元素,看看它的定義:
public interface CoroutineContext {
...
public interface Key<E : Element>
public interface Element : CoroutineContext {
// 用於標識元素的Key
public val key: Key<*>
...
}
}
CoroutineContext.Element
有個抽象類實現,可以讓我們更方便地實現上下文元素:
public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element
以CoroutineName
為例,分析如何實現一個協程上下文元素:
public data class CoroutineName(
val name: String
/* CoroutineName.Key可以簡寫為CoroutineName */
) : AbstractCoroutineContextElement(CoroutineName) {
public companion object Key : CoroutineContext.Key<CoroutineName>
...
}
首先宣告一點,傳入父類AbstractCoroutineContextElement
的引數是CoroutineName.Key
,只是它可以簡寫為CoroutineName
。其實這也很好理解,在Kotlin
中,我們呼叫伴生物件方法的時候,是可以省去伴生物件的類名的,這裡也是同樣的道理。
CoroutineName
內部聲明瞭一個繼承至CoroutineContext.Key
的伴生物件Key
,並將其作為構造引數傳入父類AbstractCoroutineContextElement
中,以此作為該協程上下文元素的Key
。
上面是實現協程上下文元素的一種普遍做法,即在協程上下文元素裡面定義一個伴生物件,以伴生物件為Key
,標識該上下文元素。
最後再看一下CoroutineContext
的完整定義:
public interface CoroutineContext {
// 根據key獲取元素
public operator fun <E : Element> get(key: Key<E>): E?
// 翻譯為"摺疊",它與上下文元素的累加有關
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
// 協程上下文元素的累加
public operator fun plus(context: CoroutineContext): CoroutineContext = ...
// 當前CoroutineContext中,去掉key標識的元素後,剩下的上下文元素(以CoroutineContext形式返回)
public fun minusKey(key: Key<*>): CoroutineContext
public interface Key<E : Element>
public interface Element : CoroutineContext {
// 標識上下文元素的Key
public val key: Key<*>
// key相同則返回元素自身,否則返回null
public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null
// 執行傳入的operation函式
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)
public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
}
}
CoroutineContext
的plus
方法:
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
為了方便後面闡述,記呼叫形式為A + B,假設A是含有多個元素的協程上下文,B是單個上下文元素。該方法的大致執行流程如下:
- 若元素B是空的,則返回原來的上下文A。
- 在fold的lambda塊中,可以認為acc為A,element為B。
- 若A中減去element.key元素後(記為C),C為空上下文,則返回B(相當於元素B替換了上下文A)。
- 檢視C中是否有ContinuationInterceptor元素,沒有則將C和B拼接後返回。
- C中剔除ContinuationInterceptor,記為D,若D是空的,則將B和ContinuationInterceptor拼接然後返回。
- D不是空的,則將D和B和ContinuationInterceptor拼接然後返回。
簡單來說,這裡就是要將「傳入的協程上下文元素」與「原來的協程上下文元素」進行拼接,若傳入的元素與原來集合中的元素的key
有衝突,則用傳入的元素替換掉原來集合中key
衝突的元素。在上下文元素拼接的時候,若有ContinuationInterceptor
元素則要確保它在「協程上下文元素集合」的最右邊,這樣它的優先順序最高,從協程上下文獲取該元素的時候可以更快地獲取到(至於為什麼元素在右邊,元素的優先順序就高、獲取快,在前面介紹CombinedContext
中已經說明過了)。
plus
方法的執行流程很難用文字敘述清楚,如果想要知道它的實現流程,可以代入幾個例子試試。但是它具體的執行流程並不是要分析的重點,有個大概的印象即可。
續體攔截機制
這裡算是協程實現原理解析的最後一環了。我們在使用協程的時候,會使用到一些排程器如Dispatchers.Main
和Dispatchers.IO
等排程器來排程執行緒,在前面的分析中並沒有提到協程是如何進行執行緒排程的。
執行緒的排程與續體攔截器ContinuationInterceptor
有關,它也是一種「協程上下文元素」:
public interface ContinuationInterceptor : CoroutineContext.Element {
// 續體攔截器對應的Key
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
// 返回一個續體,該續體對原始的續體進行包裝(原始的續體作為方法引數傳入)。
// 如果該方法不想攔截傳入的續體,也可以直接返回原來的續體。
// 當原始續體完成時,如果該續體之前被攔截了,協程框架會呼叫releaseInterceptedContinuation
// 方法,傳入的引數就是「續體的包裝類」。
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
// 該函式只有在interceptContinuation成功攔截的情況下,才會被呼叫。
// 若原始續體成功被攔截,當原始續體完成且不再被使用時,該方法會被呼叫,傳入的引數是「續體的包裝類」。
public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
/* do nothing by default */
}
...
}
續體攔截器可以用於攔截一個續體,最常見的續體攔截器就是協程排程器CoroutineDispatcher
,可以通過單例類Dispatchers
獲取到相應的協程排程器。檢視CoroutineDispatcher
的實現:
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
@ExperimentalStdlibApi
public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
ContinuationInterceptor,
{ it as? CoroutineDispatcher })
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
@InternalCoroutinesApi
public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
(continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
}
...
}
- 攔截器:
CoroutineDispatcher
繼承至ContinuationInterceptor
,所以它也是一種續體攔截器。 - 上下文元素的標識:
CoroutineDispatcher
繼承至AbstractCoroutineContextElement
,並傳入ContinuationInterceptor.Key
構造引數,以此來標識自身。 - isDispatchNeeded:若需要使用
dispatch
方法進行排程則返回true
,否則返回false
。該方法預設返回true
。協程排程器可以重寫該方法,提供一個性能優化以避免不必要的dispatch
,例如主執行緒排程器Dispatchers.Main
會判斷當前協程是否已經在UI
執行緒中,如果是的話該方法就會返回false
,沒有必要再去執行dispatch
方法進行不必要的執行緒排程。 - dispatch:在給定的上下文和執行緒中,去執行
block
塊。
假設使用的協程排程器是主執行緒排程器Dispatchers.Main
:
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
檢視MainDispatcherLoader.dispatcher
:
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
val factories = if (FAST_SERVICE_LOADER_ENABLED) {
FastServiceLoader.loadMainDispatcherFactory()
} else {
// We are explicitly using the
// `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
// form of the ServiceLoader call to enable R8 optimization when compiled on Android.
ServiceLoader.load(
MainDispatcherFactory::class.java,
MainDispatcherFactory::class.java.classLoader
).iterator().asSequence().toList()
}
@Suppress("ConstantConditionIf")
factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
?: createMissingDispatcher()
} catch (e: Throwable) {
// Service loader can throw an exception as well
createMissingDispatcher(e)
}
}
呼叫了tryCreateDispatcher
:
public fun MainDispatcherFactory.tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
try {
createDispatcher(factories)
} catch (cause: Throwable) {
createMissingDispatcher(cause, hintOnError())
}
繼續跟蹤,發現createDispatcher
是MainDispatcherFactory
介面的一個方法,其中的一個實現在AndroidDispatcherFactory
中:
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true))
...
}
HandlerContext
其實就是排程器Dispatchers.Main
的最終實現:
# handler:主執行緒的Handler
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
...
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
...
}
-
isDispatchNeeded:通過
looper
判斷協程當前是否在主執行緒上,是的話返回false
,表示不需要再進行執行緒排程,否則返回true
表示需要進行執行緒排程。 -
dispatch:使用主執行緒的
handler
對傳入的block
塊進行post
操作。
對「續體攔截器」「協程排程器」有了一定的瞭解之後,我們再回過頭看一下協程排程器是如何發揮作用的。我們前面分析過Cancellable
檔案的startCoroutineCancellable
方法:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
在createCoroutineUnintercepted
方法中返回了LaunchLambda
例項,在之前的分析中,我們忽略了intercepted
方法,直接分析為LaunchLambda
會呼叫resumeCancellableWith
方法,若沒有為協程設定續體攔截器,那麼確實是LaunchLambda
會直接呼叫到resumeCancellableWith
方法。我們看看,如果為協程設定了續體攔截器,會發生什麼?
檢視LaunchLambda
呼叫的intercepted
方法,它在IntrinsicsJVM
檔案中:
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
LaunchLambda
是ContinuationImpl
型別,因此會呼叫到父類ContinuationImpl::intercepted
:
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
...
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
...
}
剛開始intercepted
為null
,所以會判斷協程上下文中是否有ContinuationInterceptor
元素,若沒有則會返回this
(即LaunchLambda
自身,並將intercepted
變數設定為LaunchLambda
),有的話則會呼叫interceptContinuation
方法,假設使用的續體攔截器是Dispatchers.Main
,那麼就是呼叫到CoroutineDispatcher
的interceptContinuation
方法,該方法會返回一個DispatchedContinuation
(並將DispatchedContinuation
設定到intercepted
變數中)。
檢視CoroutineDispatcher::interceptContinuation
:
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
DispatchedContinuation
類:
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {...}
在這裡的例子中,dispatcher
就是Dispatchers.Main
,continuation
就是LaunchLambda
。
再回到Cancellable
檔案的startCoroutineCancellable
方法:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
在有續體攔截器(Dispatchers.Main
)的情況下,intercepted
方法會返回DispatchedContinuation
,接著呼叫它的resumeCancellableWith
方法:
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
呼叫到另外一個resumeCancellableWith
方法,這個方法就是在DispatchedContinuation
中實現的了:
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) { // 需要執行緒排程
_state = state
resumeMode = MODE_CANCELLABLE
// 執行緒排程,將自身以Runnable塊形式傳入
dispatcher.dispatch(context, this)
} else { // 不需要執行緒排程
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
// 最終會呼叫continuation.resumeWith,即LaunchLambda.resumeWith
resumeUndispatchedWith(result)
}
}
}
}
可以看到,它呼叫了dispatcher.isDispatchNeeded
來判斷是否需要進行執行緒排程,以Dispatchers.Main
為例,就是判斷當前協程是否在主執行緒中執行,是的話則不需要排程,否則需要將協程排程到主執行緒中執行。
- 不需執行緒排程:最終會呼叫到
LaunchLambda.resumeWith
,它後續的執行流程之前已經分析過了。 - 需要執行緒排程:(以主執行緒的協程排程器為例)最終會將傳入的
Runnable
在主執行緒中執行。
Runnable
的run
方法在哪實現的呢?在DispatchedContinuation
的父類DispatchedTask
中有run
方法的實現:
public final override fun run() {
...
try {
// 獲取到的delegate其實就是DispatchedContinuation
val delegate = delegate as DispatchedContinuation<T>
// 獲取到的continuation其實就是LaunchLambda
val continuation = delegate.continuation
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
withCoroutineContext(context, delegate.countOrElement) {
val exception = getExceptionalResult(state)
/*
* Check whether continuation was originally resumed with an exception.
* If so, it dominates cancellation, otherwise the original exception
* will be silently lost.
*/
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
continuation.resumeWithException(exception)
} else {
// 正常情況下,會執行到這裡,呼叫LaunchLambda的resume方法
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
...
} finally {
...
}
}
在run
方法中,最終會呼叫到LaunchLambda
的resume
方法(內部又會呼叫到resumeWith
方法)。所以這裡做的執行緒排程,其實就是通過主執行緒的handler
,將程式碼post
到主執行緒中去執行,從而完成執行緒的排程工作。
另外,還有幾個未研究的地方與自己的猜想:
一、releaseIntercepted方法:在BaseContinuationImpl::resumeWith
中,每執行完一個續體的invokeSuspend
方法,就會呼叫該續體的releaseIntercepted
方法
protected override fun releaseIntercepted() {
val intercepted = intercepted
// intercepted不為null且不為自身(即之前成功攔截續體),就進入If塊
if (intercepted != null && intercepted !== this) {
// 呼叫續體攔截器的releaseInterceptedContinuation方法,並傳入續體包裝類
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(
intercepted)
}
// 將intercepted變數設定為CompletedContinuation
this.intercepted = CompletedContinuation // just in case
}
續體攔截器的releaseInterceptedContinuation
方法應該是做一些資源清理的工作。
二、像withContext
這樣的函式:
scope.launch(Dispatchers.Main) {
withContext(Dispatchers.IO) {}
}
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {...}
在block
塊執行完後,會將執行緒自動切回「啟動協程時的協程排程器所指定」的執行緒,那麼它是如何切回來的呢?個人猜測,在協程至上而下呼叫的時候,協程上下文會一層一層地向下傳遞,withContext
的block
塊執行的時候,協程上下文會被儲存在某個地方,等到block
塊執行結束的時候,會從之前儲存的協程上下文中取出協程排程器,將剩餘的程式碼(協程恢復)排程到相應的執行緒中去執行,從而實現了 block
塊執行完後,執行緒會自動切回「啟動協程時的協程排程器所指定」的執行緒。