1. 程式人生 > 其它 >Android中的Coroutine協程原理詳解

Android中的Coroutine協程原理詳解

前言

協程是一個併發方案。也是一種思想。

傳統意義上的協程是單執行緒的,面對io密集型任務他的記憶體消耗更少,進而效率高。但是面對計算密集型的任務不如多執行緒並行運算效率高。

不同的語言對於協程都有不同的實現,甚至同一種語言對於不同平臺的作業系統都有對應的實現。

我們kotlin語言的協程是 coroutines for jvm的實現方式。底層原理也是利用java 執行緒。

基礎知識

生態架構

相關依賴庫

dependencies {
   // Kotlin
   implementation "org.jetbrains.kotlin:kotlin-stdlib:1.4.32"

   // 協程核心庫
   implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.3"
   // 協程Android支援庫
   implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.3"
   // 協程Java8支援庫
   implementation "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.4.3"
 
   // lifecycle對於協程的擴充套件封裝
   implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0"
   implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0"
   implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0"
}
​

為什麼一些人總覺得協程晦澀難懂?

1.網路上沒有詳細的關於協程的概念定義,每種語言、每個系統對其實現都不一樣。可謂是眾說紛紜,什麼核心態使用者態巴拉巴拉,很容易給我們帶偏

2.kotlin的各種語法糖對我們造成的干擾。如:

  • 高階函式
  • 原始碼實現類找不到

所以紮實的kotlin語法基本功是學習協程的前提。

實在看不懂得地方就反編譯為java,以java最終翻譯為準。

協程是什麼?有什麼用?

kotlin中的協程乾的事就是把非同步回撥程式碼拍扁了,捋直了,讓非同步回撥程式碼同步化。除此之外,沒有任何特別之處。

建立一個協程,就是編譯器背後偷偷生成一系列程式碼,比如說狀態機

通過掛起恢復讓狀態機狀態流轉

實現把層層巢狀的回撥程式碼變成像同步程式碼那樣直觀、簡潔。

它不是什麼執行緒框架,也不是什麼高深的核心態,使用者態。它其實對於咱們安卓來說,就是一個關於回撥函式的語法糖。。。

本文將會圍繞掛起恢復徹底剖析協程的實現原理

Kotlin函式基礎知識複習

再Kotlin中函式是一等公民,有自己的型別

函式型別

fun foo(){}
//型別為 () -> Unit
fun foo(p: Int){}
//型別為 (Int) -> String

class Foo{
    fun bar(p0: String,p1: Long):Any{}
    
}
//那麼 bar 的型別為:Foo.(String,Long) -> Any
//Foo就是bar的 receiver。也可以寫成 (Foo,String,Long) ->Any

函式引用

fun foo(){} 
//引用是 ::foo
fun foo(p0: Int): String
//引用也是 ::foo

咋都一樣?沒辦法,就這樣規定的。使用的時候 只能靠編譯器推斷

val f: () -> Unit = ::foo //編譯器會推斷出是fun foo(){} 
val g: (Int) -> String = ::foo //推斷為fun foo(p0: Int): String

帶Receiver的寫法

class Foo{
    fun bar(p0: String,p1: Long):Any{}
}

val h: (Foo,String,Long) -> Any = Foo:bar

繫結receiver的函式引用:

val foo: Foo = Foo()
val m: (String,Long) -> Any = foo:bar

額外知識點

val x: (Foo,String,Long) -> Any = Foo:bar
val y: Function3<Foo,String,Long,Any> = x

Foo.(String,Long) -> Any = (Foo,String,Long) ->Any = Function3<Foo,String,Long,Any>

函式作為引數傳遞

fun yy(p: (Foo,String,Long)->Any){
	p(Foo(),"Hello",3L)//直接p()就能呼叫
    //p.invoke(Foo(),"Hello",3L) 也可以用invoke形式
}

Lambda

就是匿名函式,它跟普通函式比是沒有名字的,聽起來好像是廢話

//普通函式
fun func(){
   println("hello");
}
​
//去掉函式名 func,就成了匿名函式
fun(){
   println("hello");    
}
​
//可以賦值給一個變數
val func = fun(){
   println("hello");    
}
​
//匿名函式的型別
val func :()->Unit = fun(){
   println("hello");    
}
​
//Lambda表示式
val func={
   print("Hello");
}
​
//Lambda型別
val func :()->String = {
print("Hello");
"Hello" //如果是Lambda中,最後一行被當作返回值,能省掉return。普通函式則不行
}
​
//帶引數Lambda
val f1: (Int)->Unit = {p:Int ->
print(p);
}
//可進一步簡化為
val f1 = {p:Int ->
print(p);    
}
//當只有一個引數的時候,還可以寫成
val f1: (Int)->Unit = {
   print(it);
}
​

關於函式的個人經驗總結

函式跟匿名函式看起來沒啥區別,但是反編譯為java後還是能看出點差異

如果只是用普通的函式,那麼他跟普通java 函式沒啥區別。

比如 fun a() 就是對應java方法public void a(){}

但是如果通過函式引用(:: a)來用這個函式,那麼他並不是直接呼叫fun a()而是重新生成一個Function0

掛起函式

suspend 修飾。

掛起函式中能呼叫任何函式。

非掛起函式只能呼叫非掛起函式。

換句話說,suspend函式只能在suspend函式中呼叫。


簡單的掛起函式展示:

//com.example.studycoroutine.chapter.CoroutineRun.kt
suspend fun suspendFun(): Int {
    return 1;
}

掛起函式特殊在哪?

public static final Object suspendFun(Continuation completion) {
    return Boxing.boxInt(1);
}

這下理解suspend為啥只能在suspend裡面呼叫了吧?

想要讓道貌岸然的suspend函式幹活必須要先滿足它!!!就是給它裡面塞入一顆球。

然後他想呼叫其他的suspend函式,只需將球繼續塞到其它的suspend方法裡面。

普通函式裡沒這玩意啊,所以壓根沒法呼叫suspend函式。。。

讀到這裡,想必各位會有一些疑問:

  • question1.這不是雞生蛋生雞的問題麼?第一顆球是哪來的?

  • question2.為啥編譯後返回值也變了?

  • question3.suspendFun 如果在協程體內被呼叫,那麼他的球(completion)是誰?

標準庫給我們提供的最原始工具

public fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>) {
   createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
​
public fun <T> (suspend () -> T).createCoroutine(completion: Continuation<T>): Continuation<Unit> =
   SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

以一個最簡單的方式啟動一個協程。

Demo-K1

fun main() {
   val b = suspend {
       val a = hello2()
       a
  }
   b.createCoroutine(MyCompletionContinuation()).resume(Unit)
}
​
suspend fun hello2() = suspendCoroutine<Int> {
   thread{
       Thread.sleep(1000)
       it.resume(10086)
  }
}
​
class MyContinuation() : Continuation<Int> {
   override val context: CoroutineContext = CoroutineName("Co-01")
   override fun resumeWith(result: Result<Int>) {
       log("MyContinuation resumeWith 結果 = ${result.getOrNull()}")
  }
}

兩個建立協程函式區別

startCoroutine 沒有返回值 ,而createCoroutine返回一個Continuation,不難看出是SafeContinuation

好像看起來主要的區別就是startCoroutine直接呼叫resume(Unit),所以不用包裝成SafeContinuation,而createCoroutine則返回一個SafeContinuation,因為不知道將會在何時何處呼叫resume,必須保證resume只調用一次,所以包裝為safeContinuation

SafeContinuationd的作用是為了確保只有發生非同步呼叫時才掛起

分析createCoroutineUnintercepted

//kotlin.coroutines.intrinsics.CoroutinesIntrinsicsH.kt
@SinceKotlin("1.3")
public expect fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T>): Continuation<Unit>

先說結論

其實可以簡單的理解為kotlin層面的原語,就是返回一個協程體。

開始分析

引用程式碼Demo-K1首先b 是一個匿名函式,他肯定要被編譯為一個FunctionX,同時它還被suspend修飾 所以它肯定跟普通匿名函式編譯後不一樣。

編譯後的原始碼為

public static final void main() {
     Function1 var0 = (Function1)(new Function1((Continuation)null) {
        int label;
​
        @Nullable
        public final Object invokeSuspend(@NotNull Object $result) {
           Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
           Object var10000;
           switch(this.label) {
           case 0:
              ResultKt.throwOnFailure($result);
              this.label = 1;
              var10000 = TestSampleKt.hello2(this);
              if (var10000 == var3) {
                 return var3;
              }
              break;
           case 1:
              ResultKt.throwOnFailure($result);
              var10000 = $result;
              break;
           default:
              throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
          }
​
           int a = ((Number)var10000).intValue();
           return Boxing.boxInt(a);
        }
​
        @NotNull
        public final Continuation create(@NotNull Continuation completion) {
           Intrinsics.checkParameterIsNotNull(completion, "completion");
           Function1 var2 = new <anonymous constructor>(completion);
           return var2;
        }
​
        public final Object invoke(Object var1) {
           return((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
        }
    });
     boolean var1 = false;
     Continuation var7 = ContinuationKt.createCoroutine(var0, (Continuation)(newMyContinuation()));
     Unit var8 = Unit.INSTANCE;
     boolean var2 = false;
     Companion var3 = Result.Companion;
     boolean var5 = false;
     Object var6 = Result.constructor-impl(var8);
     var7.resumeWith(var6);
  }

我們可以看到先是 Function1 var0 = new Function1建立了一個物件,此時跟協程沒關係,這步只是編譯器層面的匿名函式語法優化

如果直接

fun main() {
   suspend {
       val a = hello2()
       a
  }.createCoroutine(MyContinuation()).resume(Unit)
}

也是一樣會建立Function1 var0 = new Function1

解答question1

繼續呼叫createCoroutine

再繼續createCoroutineUnintercepted ,找到在JVM平臺的實現

//kotlin.coroutines.intrinsics.IntrinsicsJVM.class
@SinceKotlin("1.3")
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
   completion: Continuation<T>
): Continuation<Unit> {
//probeCompletion還是我們傳入completion物件,在我們的Demo就是myCoroutine
   val probeCompletion = probeCoroutineCreated(completion)//probeCoroutineCreated方法點進去看了,好像是debug用的.我的理解是這樣的
   //This就是這個suspend lambda。在Demo中就是myCoroutineFun
   return if (this is BaseContinuationImpl)
       create(probeCompletion)
   else
//else分支在我們demo中不會走到
     //當 [createCoroutineUnintercepted] 遇到不繼承 BaseContinuationImpl 的掛起 lambda 時,將使用此函式。
       createCoroutineFromSuspendFunction(probeCompletion) {
          (this as Function1<Continuation<T>, Any?>).invoke(it)
      }
}

@NotNull
public final Continuation create(@NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function1 var2 = new <anonymous constructor>(completion);
return var2;
}

completion傳入,並建立一個新的Function1,作為Continuation返回,這就是創建出來的協程體物件,協程的工作核心就是它內部的狀態機,invokeSuspend函式

呼叫 create

@NotNull
public final Continuation create(@NotNull Continuation completion) {
	Intrinsics.checkNotNullParameter(completion, "completion");
	Function1 var2 = new <anonymous constructor>(completion);
	return var2;
}

completion傳入,並建立一個新的Function1,作為Continuation返回,這就是創建出來的協程體物件,協程的工作核心就是它內部的狀態機,invokeSuspend函式

補充---相關類繼承關係

解答question2&3

已知協程啟動會呼叫協程體的resume,該呼叫最終會來到BaseContinuationImpl::resumeWith

internal abstract class BaseContinuationImpl{
   fun resumeWith(result: Result<Any?>) {
          // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
       var current = this
       var param = result
       while (true) {
           with(current) {
               val completion = completion!! // fail fast when trying to resume continuation without completion
               val outcome: Result<Any?> =
                   try {
                       val outcome = invokeSuspend(param)//呼叫狀態機
                       if (outcome === COROUTINE_SUSPENDED) return
                       Result.success(outcome)
                  } catch (exception: Throwable) {
                       Result.failure(exception)
                  }
               releaseIntercepted() // this state machine instance is terminating
               if (completion is BaseContinuationImpl) {
                   // unrolling recursion via loop
                   current = completion
                   param = outcome
              } else {
                   //最終走到這裡,這個completion就是被塞的第一顆球。
                   completion.resumeWith(outcome)
                   return
              }
          }
      }
  }
}

狀態機程式碼擷取

public final Object invokeSuspend(@NotNull Object $result) {
   Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
   Object var10000;
   switch(this.label) {
   case 0://第一次進來 label = 0 
      ResultKt.throwOnFailure($result);
      // label改成1了,意味著下一次被恢復的時候會走case 1,這就是所謂的【狀態流轉】
      this.label = 1; 
      //全體目光向我看齊,我宣佈個事:this is 協程體物件。
      var10000 = TestSampleKt.hello2(this);
      if (var10000 == var3) {
         return var3;
      }
      break;
   case 1:
      ResultKt.throwOnFailure($result);
      var10000 = $result;
      break;
   default:
      throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
   }

   int a = ((Number)var10000).intValue();
   return Boxing.boxInt(a);
}

question3答案出來了,傳進去的是create建立的那個continuation

最後再來聊聊question2,從上面的程式碼已經很清楚的告訴我們為啥掛起函式反編譯後的返回值變為object了。

以hello2為例子,hello2能返回代表掛起的白板,也能返回result。如果返回白板,狀態機return,協程掛起。如果返回result,那麼hello2執行完畢,是一個沒有掛起的掛起函式,通常編譯器也會提醒 suspend 修飾詞無意義。所以這就是設計需要,沒有啥因為所以。

最後,除了直接返回結果的情況,掛起函式一定會以resume結尾,要麼返回result,要麼返回異常。代表這個掛起函式返回了。

呼叫resume意義在於重新回撥BaseContinuationImpl的resumeWith,進而喚醒狀態機,繼續執行協程體的程式碼。

換句話說,我們自定義的suspend函式,一定要利用suspendCoroutine 獲得續體,即狀態機物件,否則無法實現真正的掛起與resume。

suspendCoroutine

我們可以不用suspendCoroutine,用更直接的suspendCoroutineUninterceptedOrReturn也能實現,不過這種方式要手動返回白板。不過一定要小心,要在合理的情況下返回或者不返回,不然會產生很多意想不到的結果

suspend fun mySuspendOne() = suspendCoroutineUninterceptedOrReturn<String> { continuation ->
    thread {
        TimeUnit.SECONDS.sleep(1)
        continuation.resume("hello world")
    }
    //因為我們這個函式沒有返回正確結果,所以必須返回一個掛起標識,否則BaseContinuationImpl會認為完成了任務。 
    // 並且我們的執行緒又在執行沒有取消,這將很多意想不到的結果
    kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
}

而suspendCoroutine則沒有這個隱患

suspend fun mySafeSuspendOne() = suspendCoroutine<String> { continuation ->
    thread {
        TimeUnit.SECONDS.sleep(1)
        continuation.resume("hello world")
    }
    //suspendCoroutine函式很聰明的幫我們判斷返回結果如果不是想要的物件,自動返				
    kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
}

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
    suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
    	//封裝一個代理Continuation物件
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        //根據block返回結果判斷要不要返回COROUTINE_SUSPENDED
        safe.getOrThrow()
    }

SafeContinuation的奧祕

//呼叫單引數的這個構造方法
internal actual constructor(delegate: Continuation<T>) : this(delegate, UNDECIDED)
@Volatile
private var result: Any? = initialResult //UNDECIDED賦值給 result 
//java原子屬性更新器那一套東西
private companion object {
       @Suppress("UNCHECKED_CAST")
       @JvmStatic
       private val RESULT = AtomicReferenceFieldUpdater.newUpdater<SafeContinuation<*>, Any?>(
           SafeContinuation::class.java, Any::class.java as Class<Any?>, "result"
      )
  }
​
internal actual fun getOrThrow(): Any? {
   var result = this.result // atomic read
   if (result === UNDECIDED) { //如果UNDECIDED,那麼就把result設定為COROUTINE_SUSPENDED
       if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) returnCOROUTINE_SUSPENDED
       result = this.result // reread volatile var
  }
   return when {
       result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
       result is Result.Failure -> throw result.exception
       else -> result // either COROUTINE_SUSPENDED or data <-這裡返回白板
  }
}
​
public actual override fun resumeWith(result: Result<T>) {
       while (true) { // lock-free loop
           val cur = this.result // atomic read。不理解這裡的官方註釋為啥叫做原子讀。我覺得 Volatile只能保證可見性。
           when {
             //這裡如果是UNDECIDED 就把 結果附上去。
               cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
             //如果是掛起狀態,就通過resumeWith回撥狀態機
               cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)){
                   delegate.resumeWith(result)
                   return
              }
               else -> throw IllegalStateException("Already resumed")
          }
      }
  }

val safe = SafeContinuation(c.intercepted())
block(safe)
safe.getOrThrow()

先回顧一下什麼叫真正的掛起,就是getOrThrow返回了“白板”,那麼什麼時候getOrThrow能返回白板?答案就是result被初始化後值沒被修改過。那麼也就是說resumeWith沒有被執行過,即:block(safe)這句程式碼,block這個被傳進來的函式,執行過程中沒有呼叫safe的resumeWith。原理就是這麼簡單,cas程式碼保證關鍵邏輯的原子性與併發安全

繼續以Demo-K1為例子,這裡假設hello2執行在一條新的子執行緒,否則仍然是沒有掛起。

{
   thread{
       Thread.sleep(1000)
       it.resume(10086)
  }
}

總結

最後,可以說開啟一個協程,就是利用編譯器生成一個狀態機物件,幫我們把回撥程式碼拍扁,成為同步程式碼。