1. 程式人生 > 其它 >Kotlin協程實現原理

Kotlin協程實現原理

初看suspend關鍵字

下面的例子模擬一個網路請求:

class Temp {
    suspend fun fetchData(argument: String): Boolean {
        val result = netRequest(argument)
        return result == 0
    }

    // 模擬網路請求
    suspend fun netRequest(argument: String): Int {
        delay(1000)
        return argument.length
    }
}

這兩個方法都使用了suspend

關鍵字修飾,我們將這個檔案的位元組碼反編譯為等同效果的Java程式碼:

public final class Temp {
   @Nullable
   public final Object fetchData(@NotNull String argument, @NotNull Continuation var2) {
      Object $continuation;
      label25: {
         if (var2 instanceof <undefinedtype>) {
            $continuation = (<undefinedtype>)var2;
            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
               ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
               break label25;
            }
         }

         $continuation = new ContinuationImpl(var2) {
            // $FF: synthetic field
            Object result;
            int label;

            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return Temp.this.fetchData((String)null, this);
            }
         };
      }

      Object $result = ((<undefinedtype>)$continuation).result;
      Object var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      Object var10000;
      switch(((<undefinedtype>)$continuation).label) {
      case 0:
         ResultKt.throwOnFailure($result);
         ((<undefinedtype>)$continuation).label = 1;
         var10000 = this.netRequest(argument, (Continuation)$continuation);
         if (var10000 == var6) {
            return var6;
         }
         break;
      case 1:
         ResultKt.throwOnFailure($result);
         var10000 = $result;
         break;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }

      int result = ((Number)var10000).intValue();
      return Boxing.boxBoolean(result == 0);
   }

   @Nullable
   public final Object netRequest(@NotNull String argument, @NotNull Continuation var2) {
      Object $continuation;
      label20: {
         if (var2 instanceof <undefinedtype>) {
            $continuation = (<undefinedtype>)var2;
            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
               ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
               break label20;
            }
         }

         $continuation = new ContinuationImpl(var2) {
            // $FF: synthetic field
            Object result;
            int label;
            Object L$0;

            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return Temp.this.netRequest((String)null, this);
            }
         };
      }

      Object $result = ((<undefinedtype>)$continuation).result;
      Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch(((<undefinedtype>)$continuation).label) {
      case 0:
         ResultKt.throwOnFailure($result);
         ((<undefinedtype>)$continuation).L$0 = argument;
         ((<undefinedtype>)$continuation).label = 1;
         if (DelayKt.delay(1000L, (Continuation)$continuation) == var5) {
            return var5;
         }
         break;
      case 1:
         argument = (String)((<undefinedtype>)$continuation).L$0;
         ResultKt.throwOnFailure($result);
         break;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }

      return Boxing.boxInt(argument.length());
   }
}

幾行協程相關的程式碼,竟然對應了這麼多的Java程式碼,可見kotlin編譯器為我們做了很多事情

上面程式碼的可讀性不高,例如有<undefinedtype>這種未定義的型別,我使用jd-guiTemp.class檔案再進行了一次反編譯,獲取到了更多資訊,我將上面的反編譯的一大串程式碼和jd-gui反編譯獲取的資訊進行整合,並且對一些類和變數進行適當的重新命名,得出資訊更完整且可讀性更高的「Temp.class反編譯後對應的Java程式碼」,首先是fetchData相關的:

public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
    Object $continuation;
    label25:
    {
        if (completion instanceof FetchDataStateMachine) {
            $continuation = (FetchDataStateMachine) completion;
            if (($continuation.label & Integer.MIN_VALUE) != 0) {
                $continuation.label -= Integer.MIN_VALUE;
                break label25;
            }
        }
        $continuation = new FetchDataStateMachine(completion);
    }

    Object $result = $continuation.result;
    Object resultTemp;
    switch ($continuation.label) {
        case 0:
            ResultKt.throwOnFailure($result);
            $continuation.label = 1;
            resultTemp = this.netRequest(argument, (Continuation) $continuation);
            if (resultTemp == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED;
            }
            break;
        case 1:
            ResultKt.throwOnFailure($result);
            resultTemp = $result;
            break;
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }

    int result = ((Number) resultTemp).intValue();
    return Boxing.boxBoolean(result == 0);
}

static final class FetchDataStateMachine extends ContinuationImpl {
    Object result;
    int label;

    FetchDataStateMachine(Continuation $completion) {
        super($completion);
    }

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return Temp.this.fetchData(null, (Continuation<? super Boolean>) this);
    }
}

netRequest相關的程式碼,與fetchData相關的程式碼,在結構和形式上類似:

public final Object netRequest(@NotNull String argument,@NotNull Continuation completion) {
    Object $continuation;
    label20:
    {
        if (completion instanceof NetRequestStateMachine) {
            $continuation = (NetRequestStateMachine) completion;
            if (($continuation.label & Integer.MIN_VALUE) != 0) {
                $continuation.label -= Integer.MIN_VALUE;
                break label20;
            }
        }
        $continuation = new NetRequestStateMachine(completion);
    }

    Object $result = $continuation.result;
    switch ($continuation.label) {
        case 0:
            ResultKt.throwOnFailure($result);
            $continuation.functionParameter = argument;
            $continuation.label = 1;
            if (DelayKt.delay(1000L, (Continuation) $continuation) == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED;
            }
            break;
        case 1:
            argument = (String) ($continuation.functionParameter);
            ResultKt.throwOnFailure($result);
            break;
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }

    return Boxing.boxInt(argument.length());
}

static final class NetRequestStateMachine extends ContinuationImpl {
    Object result;
    int label;
    Object functionParameter;

    NetRequestStateMachine(Continuation $completion) {
        super($completion);
    }

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return Temp.this.netRequest(null, (Continuation<? super Integer>) this);
    }
}

可以發現,反編譯後的Java程式碼中,fetchDatanetRequest方法都多了一個Continuation completion引數,這是Kotlin Compiler幫我們做的,對於suspend修飾的函式,編譯的時候Kotlin Compiler會幫我們在該函式中傳入一個Continuation引數,使用Continuation引數代替了suspend修飾符,這個引數有什麼含義呢?

初識續體

續體是理解協程工作原理的一個關鍵。

先看傳統的網路請求:

data class User(val id: Long, val name: String)

interface Callback {
    fun success(user: User)
    fun failure(t: Throwable)
}

class Model {
    fun getUserInfo(callback: Callback) {
        Thread.sleep(1000) // 模擬網路請求
        callback.success(User(1, "giagor"))
    }
}

class Business {
    val model = Model()

    fun getUserInfo() {
        model.getUserInfo(object : Callback {
            override fun success(user: User) {
                showMsg(user.toString())
            }

            override fun failure(t: Throwable) {
                showMsg(t.message ?: "")
            }
        })
    }

    fun showMsg(msg: String) {
        // ...
    }
}

在使用Model進行網路請求的時候,使用Callback接收網路請求的結果,我們這時候可以將Callback看作一個續體,即網路請求的續體,用於接收網路請求的結果。

在協程中使用Continuation介面表示一個續體,它代表一個掛起點之後的延續,即 掛起點之後的剩餘應執行的程式碼

public interface Continuation<in T> {
    // 與該續體對應的協程的上下文
    public val context: CoroutineContext
    // 恢復對應協程的執行,並且傳遞一個表示成功或失敗的result作為最後一個掛起點的返回值
    public fun resumeWith(result: Result<T>)
}

Kotlin 1.3,也有可以方便地呼叫resumeWith的擴充套件函式:

public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))

正如前面所說,對於suspend修飾的函式,Kotlin Compiler會幫我們在該函式中傳入一個Continuation引數,使用Continuation引數代替了suspend修飾符,通過Continuation引數,Kotlin Compiler可以將我們的協程程式碼轉化為等價的回撥程式碼,也就是說,Kt編譯器幫我們寫好了那些回撥的程式碼,至於怎麼幫我們寫的後面會分析,這種通過傳遞Continuation來控制非同步呼叫流程被稱作CPS變換(Continuation-Passing-Style Transformation

狀態機

fetchData函式編譯時會生成下面的一個靜態內部類(續體):

static final class FetchDataStateMachine extends ContinuationImpl {
    Object result;
    int label;

    FetchDataStateMachine(Continuation $completion) {
        super($completion);
    }

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return Temp.this.fetchData(null, (Continuation<? super Boolean>) this);
    }
}

FetchDataStateMachine的繼承關係如下:

FetchDataStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation

FetchDataStateMachine接收一個名稱為$completionContinuation引數,$completion被儲存在父類BaseContinuationImpl中:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {...}

通過$completion可以將fetchData函式的執行結果傳遞迴給呼叫fetchData的函式,有了$completion,才有能力去實現回撥

狀態機FetchDataStateMachine聲明瞭resultlabel兩個變數

  • result表示上一個Continuation的結果,比如有函式AB,函式內部分別聲明瞭ContinuationAContinuationBA呼叫B並且將ContinuationA傳入B中儲存。在後續回撥的過程中,ContinuationA可以從result變數中拿到ContinuationB::invokeSuspend的執行結果
  • labelKotlin Compiler可以識別函式內部哪個地方會掛起,每一個掛起點(suspension point)被表示為狀態機的一個狀態(state),這些狀態通過switch case語句表示出來。label表示當前應該執行狀態機的哪一個狀態,具體來說就是要進入哪一個case,通過label變數就記錄下了狀態機當前的狀態

再看下fetchData的前半部分程式碼:

public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
    Object $continuation;
    label25:
    {
        if (completion instanceof FetchDataStateMachine) {
            $continuation = (FetchDataStateMachine) completion;
            if (($continuation.label & Integer.MIN_VALUE) != 0) {
                $continuation.label -= Integer.MIN_VALUE;
                break label25;
            }
        }
        $continuation = new FetchDataStateMachine(completion);
    }
    ...
}

它會判斷傳入的completion是否為FetchDataStateMachine型別,若是則對它的label變數做些操作,若不是則直接建立一個FetchDataStateMachine並且傳入completioncompletion會被儲存下來)。

再看下fetchData的後半部分程式碼:

public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
    Object $continuation;
    ...
    Object $result = $continuation.result;
    Object resultTemp;
    switch ($continuation.label) {
        case 0:
            ResultKt.throwOnFailure($result);
            $continuation.label = 1;
            resultTemp = this.netRequest(argument, (Continuation) $continuation);
            if (resultTemp == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED;
            }
            break;
        case 1:
            ResultKt.throwOnFailure($result);
            resultTemp = $result;
            break;
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }

    int result = ((Number) resultTemp).intValue();
    return Boxing.boxBoolean(result == 0);
}

fetchData方法原先的程式碼語句會被劃分為switch下的多個case語句,在這裡就是

FetchDataStateMachine中的label變數就是控制當前要執行哪個case分支。

可見,函式與續體構成了一個有限狀態機(FSM,即 Finite-State Machine),來控制協程程式碼的執行

何為「非阻塞式掛起」?

netRequest方法中,呼叫了delay(1000)掛起了當前的協程,簡單看下delay方法反編譯後的程式碼:

public static final Object delay(long timeMillis, @NotNull Continuation $completion) {
    if (timeMillis <= 0L) {
        return Unit.INSTANCE;
    } else {
        // 實現類
        CancellableContinuationImpl cancellableContinuationImpl = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
        cancellableContinuationImpl.initCancellability();
        // 向上轉型
        CancellableContinuation cont = (CancellableContinuation)cancellableContinuationImpl;
        if (timeMillis < Long.MAX_VALUE) {
            // 延時操作
            getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);
        }
		// 獲取執行結果
        Object result = cancellableContinuationImpl.getResult();
        if (result == COROUTINE_SUSPENDED) {
            DebugProbesKt.probeCoroutineSuspended($completion);
        }
		// 返回結果
        return result;
    }
}

在該方法裡會執行延時操作,如果需要掛起,就會返回COROUTINE_SUSPENDED值給呼叫者。

結合fetchDatanetRequestdelay反編譯的程式碼,我們可以得出下面的這個呼叫圖:

圖中紅色的線表示函式返回COROUTINE_SUSPENDED,需要掛起。當delay方法需要掛起的時候,它返回COROUTINE_SUSPENDED,接著netRequest方法返回COROUTINE_SUSPENDED,接著fetchData方法返回COROUTINE_SUSPENDED,重複這個過程直到呼叫棧的最上層。

通過這種「結束方法呼叫」的方式,讓協程暫時不在這個執行緒上面執行,讓執行緒可以去處理其它的任務(包括執行其它的協程),這也就是為什麼協程的掛起不會阻塞當前的執行緒,這也是「非阻塞式掛起」的由來

如何恢復?

既然協程掛起了,那就有相應的協程的恢復。先說結論:協程恢復的實質是對續體進行回撥

暫時還沒有研究delay函式的具體實現,但是delay函式會在某個子執行緒執行等待操作,等延時時間到達之後,就會呼叫傳給delay函式的$completionresumeWith方法,也就是呼叫NetRequestStateMachineresumeWith方法NetRequestStateMachine的繼承關係、父類如下:

NetRequestStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation

BaseContinuationImpl目前是我們分析的一個重點,它主要做了下面的幾件事情:

  1. 儲存completion:它儲存了fetchData方法的FetchDataStateMachine例項,使得可以一級一級地向上回撥續體。
  2. 重寫resumeWith方法:BaseContinuationImpl重寫了Continuation介面的resumeWith方法,該方法用於恢復協程,也是協程恢復的核心邏輯。

我們檢視BaseContinuationImpl類的定義:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // 在每個恢復的continuation進行除錯探測,使得除錯庫可以精確跟蹤掛起的呼叫棧中哪些部分
            // 已經恢復了。
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

    protected abstract fun invokeSuspend(result: Result<Any?>): Any?

    protected open fun releaseIntercepted() {
        // does nothing here, overridden in ContinuationImpl
    }    
    
    ...
}

重點是resumeWith方法的實現,它在一個while(true)迴圈下面執行回撥的邏輯。我們結合前面給出的fetchDatanetRequest反編譯後的程式碼,看看delay函式的延時時間到達時呼叫NetRequestStateMachineresumeWith方法,後續的執行流程是怎樣的:

  1. 執行NetRequestStateMachine父類BaseContinuationImplresumeWith方法。
  2. 執行當前續體也就是NetRequestStateMachineinvokeSuspend方法(NetRequestStateMachine有實現該方法,忘記了的話可以回頭看看之前的反編譯程式碼)。
  3. NetRequestStateMachineinvokeSuspend方法呼叫了netRequest方法,並且將續體自身作為引數傳入。
  4. netRequest方法中,由於completion的型別就是NetRequestStateMachine因此可以直接使用該續體,不用像之前第一次進入netRequest方法那樣需要建立一個新的續體。此時續體的label值為1,於是進入netRequestcase 1語句分支。

實際上這個過程有對續體的label進行一些運算轉化的操作,但是最終label的值都是1,做的運算轉化操作不影響我們的分析,因此並不是重點

  1. 從續體中取出一開始傳入netRequest方法的引數,也就是argument,返回argument.length。為了方便後面闡述,這裡將該返回值argument.length記為netRequest-Return
  2. 接著netRequest方法結束,NetRequestStateMachine::invokeSuspend方法也執行結束,netRequest-Return也作為invokeSuspend方法的返回值,該返回值會傳遞到BaseContinuationImplresumeWith方法中,在resumeWith方法中,將netRequest-Return包裝為Result儲存到outcome變數中。
  3. 判斷NetRequestStateMachine持有的completion是否為BaseContinuationImpl型別,我們知道它持有的例項其實就是FetchDataStateMachine,因此肯定是BaseContinuationImpl,於是進行了變數的更新
    // 把current更新為FetchDataStateMachine例項
	current = completion
	// 把param更新為outcome(包裝了netRequest-Return的Result)
    param = outcome

通過這種方式,其實就可以實現回撥,我們繼續往後看。

  1. 繼續進行下一輪while迴圈,在with塊中會執行FetchDataStateMachine::invokeSuspend,在invokeSuspend裡,將傳入的引數param儲存到result變數裡(其實這和傳統的回撥類似,傳統的回撥中也是要將下層的執行結果回撥給上層),接著呼叫了fetchData方法。
  2. fetchData方法中,由於傳入的completion已是FetchDataStateMachine型別,因此無需再去建立新的續體。由於此時續體label的值為1,所以會進入case 1語句,並且將netRequest方法的執行結果儲存在resultTemp變數中,最終fetchData方法結束並返回結果result == 0,為了方便闡述,將fetchData方法的執行結果記為fetchData-Return
  3. FetchDataStateMachine::invokeSuspend方法也會結束並返回fetchData-Return,然後在BaseContinuationImplresumeWith方法中將fetchData-Return包裝為Result。然後會判斷FetchDataStateMachine持有的completion是否為BaseContinuationImpl型別。
  4. 程式碼的後續走向,我們目前是不清楚的,我們得知道在協程中呼叫fetchData方法的時候會做些什麼,才能清楚後續的程式碼走向

從上面的流程分析中,我們對協程的恢復有了一個基本的認識,下面給出流程圖進行總結:

再看看上面續體的呼叫過程,其實就是層層往上地呼叫續體的invokeSuspend方法,從過程來看有點像遞迴呼叫,但是BaseContinuationImpl::resumeWith的實現卻和遞迴不太一樣,它的實現是在while(true)迴圈中,對續體呼叫一次invokeSuspend方法,然後記錄它的返回結果,將這個返回結果作為下一個續體invokeSuspend的方法引數。

簡單來講,就是在呼叫一個續體的invokeSuspend方法,待這個方法執行結束後,再呼叫下一個續體的invokeSuspend方法。這樣做的一個原因是避免呼叫棧過深,在BaseContinuationImpl::resumeWith也有相關的註釋說明:

This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume

啟動協程

我們在一個協程中去呼叫fetchData方法:

class Temp2 {
    fun execute() {
        GlobalScope.launch(Dispatchers.Main) {
            Temp().fetchData("argument")
        }
    }
}

通過launch方法可以啟動一個協程,其原始碼如下:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

協程中的程式碼會被包裝為一個block,預設情況下會建立一個StandaloneCoroutine,然後呼叫它的start方法並返回StandaloneCoroutine

StandaloneCoroutine間接的實現了Job介面和Continuation<T>介面,如下:

private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

public abstract class AbstractCoroutine<in T>(
    /**
     * The context of the parent coroutine.
     */
    @JvmField
    protected val parentContext: CoroutineContext,
    active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {...}

可以看出StandaloneCoroutine身兼多職,實現了Job, Continuation<T>, CoroutineScope介面。後面程式碼跟蹤可以得出一個結論,最頂層的續體實現是協程自身,也就是協程恢復的時候續體會一層層地往上回調,最頂層的續體就是協程coroutine自身,即StandaloneCoroutine(這裡以StandaloneCoroutine為例)

另外還要注意一點,launch方法中傳入的 block塊型別:

 block: suspend CoroutineScope.() -> Unit

它等價於下面的這種函式型別:

// CoroutineScope:擴充套件函式轉化而來
// Continuation:suspend關鍵字轉化而來,Continuation引數由編譯器傳入
block : (CoroutineScope,Continuation) -> Unit

// 或者通過Function2的形式表示
block : Function2<CoroutineScope,Continuation,Unit>

接著跟蹤下啟動協程的呼叫過程。在launch方法中,呼叫了AbstractCoroutine::start方法:

    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        initParentJob()
        // 語法糖,實際是呼叫CoroutineStart.invoke方法
        start(block, receiver, this)
    }

CoroutineStart::invoke方法:

    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit // will start lazily
        }

launch方法可以知道CoroutineStart的預設值是CoroutineStart.DEFAULT,因此會呼叫到blockstartCoroutineCancellable方法:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

我在AS跟蹤createCoroutineUnintercepted的程式碼呼叫時,發現會跳轉到IntrinsicsKt.class檔案,這個檔案裡面找不到方法的原始碼,最後找到了IntrinsicsJvm.kt檔案,找到createCoroutineUnintercepted方法原始碼,如下:

# R:CoroutineScope
# T:Unit
@SinceKotlin("1.3")
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    // probeCoroutineCreated方法直接返回completion
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

這裡會判斷this的型別是否為BaseContinuationImplthis就是我們之前在launch中傳入的lambda塊,那麼這個lambda程式碼塊是什麼型別的呢?想要知道這個答案,我們得對這一節剛開始給出的程式碼進行反編譯

kotlin程式碼:

class Temp2 {
    fun execute() {
        GlobalScope.launch(Dispatchers.Main) {
            Temp().fetchData("argument")
        }
    }
}

對反編譯後的java程式碼進行適當的重新命名和調整,得出:

public final class Temp2 {
    ...
    static final class LaunchLambda extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
        int label;

        LaunchLambda(Continuation $completion) {
            super(2, $completion);
        }

        @Nullable
        public final Object invokeSuspend(@NotNull Object $result) {
            switch (this.label) {
                case 0:
                    ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
                    this.label = 1;
                    if ((new Temp()).fetchData("argument", (Continuation<? super Boolean>) this) == COROUTINE_SUSPENDED)
                        return COROUTINE_SUSPENDED;
                    (new Temp()).fetchData("argument", (Continuation<? super Boolean>) this);
                    return Unit.INSTANCE;
                case 1:
                    ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
                    return Unit.INSTANCE;
            }
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }

        @NotNull
        public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<? super LaunchLambda> $completion) {
            return (Continuation<Unit>) new LaunchLambda($completion);
        }

        @Nullable
        public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<?> p2) {
            return ((LaunchLambda) create(p1, p2)).invokeSuspend(Unit.INSTANCE);
        }
    }
}

可以看出在Temp2裡面會自動生成一個靜態內部類LaunchLambda,它對應著launch方法中傳入的lambda塊。LaunchLambda的繼承關係(由上到下,子類到父類的順序):

LaunchLambda
-> SuspendLambda // 用suspend修飾的lambda塊都會繼承至這個類
-> ContinuationImpl
-> BaseContinuationImpl // 重寫了resumeWith函式
-> Continuation    

OK,回到createCoroutineUnintercepted方法中,現在可以回答剛剛提出的問題了,lambda傳入的lambda塊是不是BaseContinuationImpl型別呢?根據上面的繼承關係得出,當然是!那麼它就會呼叫LaunchLambdacreate方法,注意第二個引數傳入的是completion(程式碼中寫的是probeCompletion),它最終會被儲存在父類BaseContinuationImplcompletion變數中,這個completion引數就是launch方法中建立的StandaloneCoroutine,即協程本身,它作為協程恢復時的最頂層續體

通過呼叫create方法獲取到一個LaunchLambda例項,createCoroutineUnintercepted方法執行結束並返回LaunchLambda例項,接著程式碼執行又回到startCoroutineCancellable中,回顧下該方法:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

這裡有兩部分呼叫,先是呼叫intercepted方法,然後再呼叫resumeCancellableWith方法。intercepted方法與續體攔截機制有關,後面會介紹,這裡先忽略,這裡直接認為呼叫了LaunchLambda例項的resumeCancellableWith方法即可,該方法如下:

public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

那麼會走到resumeWith方法,前面提到過該方法在父類BaseContinuationImpl實現,在該方法裡面會呼叫invokeSuspend方法,invokeSuspend方法在LaunchLambda中實現了,如下:

@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
    switch (this.label) {
        case 0:
            ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
            this.label = 1;
            if ((new Temp()).fetchData("argument", (Continuation<? super Boolean>) this) == COROUTINE_SUSPENDED)
                return COROUTINE_SUSPENDED;
            (new Temp()).fetchData("argument", (Continuation<? super Boolean>) this);
            return Unit.INSTANCE;
        case 1:
            ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
            return Unit.INSTANCE;
    }
    throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}

一開始label的值為0,所以會進入case 0語句分支,在該語句分支裡面,會設定label的值為1,然後建立一個Temp物件並且呼叫它的fetchData方法,並把LaunchLambda自身作為引數傳入,也就是LaunchLambda例項會被儲存在fetchData方法建立的續體的completion變數裡,方便協程恢復的時候進行回撥。

現在續體的持有圖:

到了這裡,從啟動一個協程到協程最終是如何掛起的,我們已經可以串聯起來了。在「如何恢復?」一節中,協程恢復的最後幾個步驟我們還沒有分析,這裡把它分析完,然後整個協程恢復的流程也可以串起來了。

協程恢復的後續流程:

  1. FetchDataStateMachine::invokeSuspend執行完後,會在BaseContinuationImplresumeWith方法中判斷FetchDataStateMachine所持有的completion(即LaunchLambda)是否為BaseContinuationImpl型別,由LaunchLambda的繼承關係,容易得出答案為「是」,所以會進入下一輪while迴圈,呼叫LaunchLambdainvokeSuspend方法。
  2. 由於label = 1所以會進入case 1語句,裡面直接return Unit。接著判斷LaunchLambda持有的completion(即StandaloneCoroutine)是否為BaseContinuationImpl型別,根據StandaloneCoroutine的繼承關係容易得出答案為「不是」,所以會呼叫StandaloneCoroutineresumeWith方法。
  3. StandaloneCoroutineresumeWith方法在父類AbstractCoroutine中實現:
    public final override fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        // 如果在等子協程完成,則返回
        if (state === COMPLETING_WAITING_CHILDREN) return
        // 應該是做一些後續處理
        afterResume(state)
    }

此時最頂層的續體(協程自身)也恢復了。

  1. BaseContinuationImpl::resumeWith方法執行結束,整個協程的恢復也完成了。

在之前流程圖的基礎上進行補充完善:

一、協程至上而下呼叫的流程圖(協程掛起)

其中藍色的文字和線條表示新增的,紅色的文字和線條表示掛起的過程。

二、協程至下而上恢復的流程圖(協程恢復)

其中藍色的文字和線條表示新增的,橙色的文字和線條表示方法呼叫的結束。

協程上下文

協程上下文CoroutineContext定義了協程的行為,它記錄了當前協程所持有的資訊,是協程執行中一個重要的資料物件。CoroutineContext是一個介面:

public interface CoroutineContext {...}

在續體中就有CoroutineContext的相關資訊:

public interface Continuation<in T> {
    // 與該續體對應的協程的上下文
    public val context: CoroutineContext
    // 恢復對應協程的執行,並且傳遞一個表示成功或失敗的result作為最後一個掛起點的返回值
    public fun resumeWith(result: Result<T>)
}

下面幾種元素都是「協程上下文」的元素:

  • Job:控制協程的生命週期。
  • CoroutineDispatcher:將工作分派到適當的執行緒。
  • CoroutineName:協程的名稱,可用於除錯。
  • CoroutineExceptionHandler:處理未捕獲的異常。

CoroutineContext可以看做是CoroutineContext.Element的一個集合,集合中的每個元素都可以使用CoroutineContext.Key進行定位,且每個元素的Key都是不同的。

CoroutineContext.Element的定義:

    public interface Element : CoroutineContext {...}

可以看到Element本身也實現了CoroutineContext介面,這很奇怪,看上去好像是Int實現了List<Int>介面一樣,為什麼元素本身也是集合了呢?其實這主要是為了方便API的設計,這樣的話,一個元素比如Job也可以直接作為一個CoroutineContext,而不需要建立一個只包含一個元素的List,多個元素之間也可以通過「+」進行拼接,如:

scope.launch(CoroutineName("coroutine") + Dispatchers.Main) {...}

這裡的「+」其實是操作符過載,對應CoroutineContext宣告的plus方法:

    public operator fun plus(context: CoroutineContext): CoroutineContext = ...

「協程上下文」儲存元素的方式比較巧妙,它內部並不是建立一個集合,集合的每個位置都存放一個元素。它藉助了一個CombinedContext結構來實現資料的存取,CombinedContext的定義及get方法:

internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {
    override fun <E : Element> get(key: Key<E>): E? {
        var cur = this
        while (true) {
            cur.element[key]?.let { return it }
            val next = cur.left
            if (next is CombinedContext) {
                cur = next
            } else {
                return next[key]
            }
        }
    }
    ...
}

從建構函式中可以看出它包含兩部分內容:leftelement。也就是說一個CombinedContext內部可能包含多個元素。

  • left:可能是普通的上下文元素(CoroutineContext.Element),也可能又是一個CombinedContext(又包含多個上下文元素)。
  • element:一個協程上下文元素。

CombinedContextget方法中,有一個while(true)迴圈,執行過程如下:

  1. 它會先判斷當前element元素與傳入的key是否相符,是的話直接返回該元素,否則獲取到left部分。
  2. leftCombinedContext部分,則對left變數重複步驟1。
  3. left不是CombinedContext部分,則直接呼叫它的get方法獲取元素(獲取不到則返回null)。

另外,也可以看出element先於left被訪問,所以越靠右邊的上下文元素,其優先順序越高

Key用於標識協程上下文元素,看看它的定義:

public interface CoroutineContext {
	...
    public interface Key<E : Element>

    public interface Element : CoroutineContext {
        // 用於標識元素的Key
    	public val key: Key<*>
        ...
    }
}

CoroutineContext.Element有個抽象類實現,可以讓我們更方便地實現上下文元素:

public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element

CoroutineName為例,分析如何實現一個協程上下文元素:

public data class CoroutineName(
    val name: String
    /* CoroutineName.Key可以簡寫為CoroutineName */
) : AbstractCoroutineContextElement(CoroutineName) {
    
    public companion object Key : CoroutineContext.Key<CoroutineName>
	...
}

首先宣告一點,傳入父類AbstractCoroutineContextElement的引數是CoroutineName.Key,只是它可以簡寫為CoroutineName。其實這也很好理解,在Kotlin中,我們呼叫伴生物件方法的時候,是可以省去伴生物件的類名的,這裡也是同樣的道理。

CoroutineName內部聲明瞭一個繼承至CoroutineContext.Key的伴生物件Key,並將其作為構造引數傳入父類AbstractCoroutineContextElement中,以此作為該協程上下文元素的Key

上面是實現協程上下文元素的一種普遍做法,即在協程上下文元素裡面定義一個伴生物件,以伴生物件為Key,標識該上下文元素

最後再看一下CoroutineContext的完整定義:

public interface CoroutineContext {
    // 根據key獲取元素
    public operator fun <E : Element> get(key: Key<E>): E?

    // 翻譯為"摺疊",它與上下文元素的累加有關
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    // 協程上下文元素的累加
    public operator fun plus(context: CoroutineContext): CoroutineContext = ...
        
	// 當前CoroutineContext中,去掉key標識的元素後,剩下的上下文元素(以CoroutineContext形式返回)
    public fun minusKey(key: Key<*>): CoroutineContext

    public interface Key<E : Element>

    public interface Element : CoroutineContext {
        // 標識上下文元素的Key
        public val key: Key<*>

        // key相同則返回元素自身,否則返回null
        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        // 執行傳入的operation函式
        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
}

CoroutineContextplus方法:

public operator fun plus(context: CoroutineContext): CoroutineContext =
    if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
        context.fold(this) { acc, element ->
            val removed = acc.minusKey(element.key)
            if (removed === EmptyCoroutineContext) element else {
                // make sure interceptor is always last in the context (and thus is fast to get when present)
                val interceptor = removed[ContinuationInterceptor]
                if (interceptor == null) CombinedContext(removed, element) else {
                    val left = removed.minusKey(ContinuationInterceptor)
                    if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                        CombinedContext(CombinedContext(left, element), interceptor)
                }
            }
        }

為了方便後面闡述,記呼叫形式為A + B,假設A是含有多個元素的協程上下文,B是單個上下文元素。該方法的大致執行流程如下:

  1. 若元素B是空的,則返回原來的上下文A。
  2. 在fold的lambda塊中,可以認為acc為A,element為B。
  3. 若A中減去element.key元素後(記為C),C為空上下文,則返回B(相當於元素B替換了上下文A)。
  4. 檢視C中是否有ContinuationInterceptor元素,沒有則將C和B拼接後返回。
  5. C中剔除ContinuationInterceptor,記為D,若D是空的,則將B和ContinuationInterceptor拼接然後返回。
  6. D不是空的,則將D和B和ContinuationInterceptor拼接然後返回。

簡單來說,這裡就是要將「傳入的協程上下文元素」與「原來的協程上下文元素」進行拼接,若傳入的元素與原來集合中的元素的key有衝突,則用傳入的元素替換掉原來集合中key衝突的元素。在上下文元素拼接的時候,若有ContinuationInterceptor元素則要確保它在「協程上下文元素集合」的最右邊,這樣它的優先順序最高,從協程上下文獲取該元素的時候可以更快地獲取到(至於為什麼元素在右邊,元素的優先順序就高、獲取快,在前面介紹CombinedContext中已經說明過了)。

plus方法的執行流程很難用文字敘述清楚,如果想要知道它的實現流程,可以代入幾個例子試試。但是它具體的執行流程並不是要分析的重點,有個大概的印象即可。

續體攔截機制

這裡算是協程實現原理解析的最後一環了。我們在使用協程的時候,會使用到一些排程器如Dispatchers.MainDispatchers.IO等排程器來排程執行緒,在前面的分析中並沒有提到協程是如何進行執行緒排程的。

執行緒的排程與續體攔截器ContinuationInterceptor有關,它也是一種「協程上下文元素」:

public interface ContinuationInterceptor : CoroutineContext.Element {
    // 續體攔截器對應的Key
	companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    
    // 返回一個續體,該續體對原始的續體進行包裝(原始的續體作為方法引數傳入)。
    // 如果該方法不想攔截傳入的續體,也可以直接返回原來的續體。
    // 當原始續體完成時,如果該續體之前被攔截了,協程框架會呼叫releaseInterceptedContinuation
    // 方法,傳入的引數就是「續體的包裝類」。
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    
    // 該函式只有在interceptContinuation成功攔截的情況下,才會被呼叫。
    // 若原始續體成功被攔截,當原始續體完成且不再被使用時,該方法會被呼叫,傳入的引數是「續體的包裝類」。
    public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        /* do nothing by default */
    }
    ...
}

續體攔截器可以用於攔截一個續體,最常見的續體攔截器就是協程排程器CoroutineDispatcher,可以通過單例類Dispatchers獲取到相應的協程排程器。檢視CoroutineDispatcher的實現:

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    
    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
        ContinuationInterceptor,
        { it as? CoroutineDispatcher })    
    
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
        
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
        
    public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
        
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)   
        
    @InternalCoroutinesApi
    public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
    }
        
	...      
}
  • 攔截器:CoroutineDispatcher繼承至ContinuationInterceptor,所以它也是一種續體攔截器。
  • 上下文元素的標識:CoroutineDispatcher繼承至AbstractCoroutineContextElement,並傳入ContinuationInterceptor.Key構造引數,以此來標識自身。
  • isDispatchNeeded:若需要使用dispatch方法進行排程則返回true,否則返回false。該方法預設返回true。協程排程器可以重寫該方法,提供一個性能優化以避免不必要的dispatch,例如主執行緒排程器Dispatchers.Main會判斷當前協程是否已經在UI執行緒中,如果是的話該方法就會返回false,沒有必要再去執行dispatch方法進行不必要的執行緒排程。
  • dispatch:在給定的上下文和執行緒中,去執行block塊。

假設使用的協程排程器是主執行緒排程器Dispatchers.Main

    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

檢視MainDispatcherLoader.dispatcher

    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        return try {
            val factories = if (FAST_SERVICE_LOADER_ENABLED) {
                FastServiceLoader.loadMainDispatcherFactory()
            } else {
                // We are explicitly using the
                // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
                // form of the ServiceLoader call to enable R8 optimization when compiled on Android.
                ServiceLoader.load(
                        MainDispatcherFactory::class.java,
                        MainDispatcherFactory::class.java.classLoader
                ).iterator().asSequence().toList()
            }
            @Suppress("ConstantConditionIf")
            factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
                ?: createMissingDispatcher()
        } catch (e: Throwable) {
            // Service loader can throw an exception as well
            createMissingDispatcher(e)
        }
    }

呼叫了tryCreateDispatcher

public fun MainDispatcherFactory.tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
    try {
        createDispatcher(factories)
    } catch (cause: Throwable) {
        createMissingDispatcher(cause, hintOnError())
    }

繼續跟蹤,發現createDispatcherMainDispatcherFactory介面的一個方法,其中的一個實現在AndroidDispatcherFactory中:

internal class AndroidDispatcherFactory : MainDispatcherFactory {

    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true))
	...
}

HandlerContext其實就是排程器Dispatchers.Main的最終實現:

# handler:主執行緒的Handler
internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
	
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)
    ...
    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }
    
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }    
    ...
}
  • isDispatchNeeded:通過looper判斷協程當前是否在主執行緒上,是的話返回false,表示不需要再進行執行緒排程,否則返回true表示需要進行執行緒排程。

  • dispatch:使用主執行緒的handler對傳入的block塊進行post操作。

對「續體攔截器」「協程排程器」有了一定的瞭解之後,我們再回過頭看一下協程排程器是如何發揮作用的。我們前面分析過Cancellable檔案的startCoroutineCancellable方法:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

createCoroutineUnintercepted方法中返回了LaunchLambda例項,在之前的分析中,我們忽略了intercepted方法,直接分析為LaunchLambda會呼叫resumeCancellableWith方法,若沒有為協程設定續體攔截器,那麼確實是LaunchLambda會直接呼叫到resumeCancellableWith方法。我們看看,如果為協程設定了續體攔截器,會發生什麼?

檢視LaunchLambda呼叫的intercepted方法,它在IntrinsicsJVM檔案中:

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

LaunchLambdaContinuationImpl型別,因此會呼叫到父類ContinuationImpl::intercepted

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
	...
    @Transient
    private var intercepted: Continuation<Any?>? = null    
    
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    ...
}

剛開始interceptednull,所以會判斷協程上下文中是否有ContinuationInterceptor元素,若沒有則會返回this(即LaunchLambda自身,並將intercepted變數設定為LaunchLambda),有的話則會呼叫interceptContinuation方法,假設使用的續體攔截器是Dispatchers.Main,那麼就是呼叫到CoroutineDispatcherinterceptContinuation方法,該方法會返回一個DispatchedContinuation(並將DispatchedContinuation設定到intercepted變數中)。

檢視CoroutineDispatcher::interceptContinuation

    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

DispatchedContinuation類:

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {...}

在這裡的例子中,dispatcher就是Dispatchers.Maincontinuation就是LaunchLambda

再回到Cancellable檔案的startCoroutineCancellable方法:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

在有續體攔截器(Dispatchers.Main)的情況下,intercepted方法會返回DispatchedContinuation,接著呼叫它的resumeCancellableWith方法:

public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

呼叫到另外一個resumeCancellableWith方法,這個方法就是在DispatchedContinuation中實現的了:

    inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        if (dispatcher.isDispatchNeeded(context)) { // 需要執行緒排程
            _state = state
            resumeMode = MODE_CANCELLABLE
            // 執行緒排程,將自身以Runnable塊形式傳入
            dispatcher.dispatch(context, this)
        } else { // 不需要執行緒排程
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    // 最終會呼叫continuation.resumeWith,即LaunchLambda.resumeWith
                    resumeUndispatchedWith(result)
                }
            }
        }
    }

可以看到,它呼叫了dispatcher.isDispatchNeeded來判斷是否需要進行執行緒排程,以Dispatchers.Main為例,就是判斷當前協程是否在主執行緒中執行,是的話則不需要排程,否則需要將協程排程到主執行緒中執行。

  • 不需執行緒排程:最終會呼叫到LaunchLambda.resumeWith,它後續的執行流程之前已經分析過了。
  • 需要執行緒排程:(以主執行緒的協程排程器為例)最終會將傳入的Runnable在主執行緒中執行。

Runnablerun方法在哪實現的呢?在DispatchedContinuation的父類DispatchedTask中有run方法的實現:

    public final override fun run() {
        ...
        try {
            // 獲取到的delegate其實就是DispatchedContinuation
            val delegate = delegate as DispatchedContinuation<T>
            // 獲取到的continuation其實就是LaunchLambda
            val continuation = delegate.continuation
            val context = continuation.context
            val state = takeState() // NOTE: Must take state in any case, even if cancelled
            withCoroutineContext(context, delegate.countOrElement) {
                val exception = getExceptionalResult(state)
                /*
                 * Check whether continuation was originally resumed with an exception.
                 * If so, it dominates cancellation, otherwise the original exception
                 * will be silently lost.
                 */
                val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                if (job != null && !job.isActive) {
                    val cause = job.getCancellationException()
                    cancelCompletedResult(state, cause)
                    continuation.resumeWithStackTrace(cause)
                } else {
                    if (exception != null) {
                        continuation.resumeWithException(exception)
                    } else {
                        // 正常情況下,會執行到這裡,呼叫LaunchLambda的resume方法
                        continuation.resume(getSuccessfulResult(state))
                    }
                }
            }
        } catch (e: Throwable) {
            ...
        } finally {
            ...
        }
    }

run方法中,最終會呼叫到LaunchLambdaresume方法(內部又會呼叫到resumeWith方法)。所以這裡做的執行緒排程,其實就是通過主執行緒的handler,將程式碼post到主執行緒中去執行,從而完成執行緒的排程工作。

另外,還有幾個未研究的地方與自己的猜想:

一、releaseIntercepted方法:在BaseContinuationImpl::resumeWith中,每執行完一個續體的invokeSuspend方法,就會呼叫該續體的releaseIntercepted方法

    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        // intercepted不為null且不為自身(即之前成功攔截續體),就進入If塊
        if (intercepted != null && intercepted !== this) {
            // 呼叫續體攔截器的releaseInterceptedContinuation方法,並傳入續體包裝類
 		 	context[ContinuationInterceptor]!!.releaseInterceptedContinuation(
            	 intercepted)
        }
        // 將intercepted變數設定為CompletedContinuation
        this.intercepted = CompletedContinuation // just in case
    }

續體攔截器的releaseInterceptedContinuation方法應該是做一些資源清理的工作。

二、像withContext這樣的函式:

scope.launch(Dispatchers.Main) {
    withContext(Dispatchers.IO) {}
}
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {...}

block塊執行完後,會將執行緒自動切回「啟動協程時的協程排程器所指定」的執行緒,那麼它是如何切回來的呢?個人猜測,在協程至上而下呼叫的時候,協程上下文會一層一層地向下傳遞,withContextblock塊執行的時候,協程上下文會被儲存在某個地方,等到block塊執行結束的時候,會從之前儲存的協程上下文中取出協程排程器,將剩餘的程式碼(協程恢復)排程到相應的執行緒中去執行,從而實現了 block塊執行完後,執行緒會自動切回「啟動協程時的協程排程器所指定」的執行緒。

參考

  1. 協程咖啡廳 - 構造魔法 - 探索 Kotlin 協程實現原理 - M.D

  2. Suspend functions - Kotlin Vocabulary - YouTube