1. 程式人生 > >Kotlin協程快速進階

Kotlin協程快速進階

大家元旦快樂,去年寫了篇Kotlin協程快速入門,簡單介紹了下協程的一些基本概念,今天來介紹下一些其他重要的知識點。

Channel

在協程裡面開啟另一個協程是很方便的,但如果想在它們之間傳遞訊息,或者說協程間通訊該怎麼做呢?Channel(通道)就可以用作在協程之間簡單的傳送接收資料:

fun main() = runBlocking {
        val channel = Channel<String>()
        launch {
                channel.send("apple")
        }
        println("I like ${channel.receive()}
"
) } 複製程式碼

這種做法是很像消費者與生產者模式。生產者一方生成併發送一定量的資料放到緩衝區中,與此同時,消費者也在緩衝區消耗這些資料。這一點通過它所繼承的介面定義也能很好地體現:

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
  public companion object Factory {
       
        public const val UNLIMITED = Int.MAX_VALUE

        public const val
RENDEZVOUS = 0 public const val CONFLATED = -1 } } 複製程式碼

通道緩衝區 通道是一個介面,根據緩衝區容量不同,有四種不同的具體實現。

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> RendezvousChannel()
        UNLIMITED -> LinkedListChannel()
        CONFLATED -> ConflatedChannel()
        else
-> ArrayChannel(capacity) } 複製程式碼

Channel的緩衝區預設是0個,當有資訊send進去後,協程就會被掛起,只有被調receive後才會繼續執行。如果容量大於0,當達到容量最大值時也同樣會被掛起:

 fun main() = runBlocking {
        val channel = Channel<Int>(2)
        launch {
            for (x in 1..5) {
                channel.send(x * x)
                println("send $x")
            }
        }
        delay(200L)
        repeat(2) { println("receive ${channel.receive()}") }
複製程式碼

結果如下,在傳送兩個資料後,只有收到一個數據後才會繼續傳送:

2019-01-01 19:01:11.176 30809-30809/com.renny.kotlin I/System.out: send 1
2019-01-01 19:01:11.176 30809-30809/com.renny.kotlin I/System.out: send 2
2019-01-01 19:01:11.377 30809-30809/com.renny.kotlin I/System.out: receive 1
2019-01-01 19:01:11.377 30809-30809/com.renny.kotlin I/System.out: receive 4
2019-01-01 19:01:11.377 30809-30809/com.renny.kotlin I/System.out: send 3
2019-01-01 19:01:11.377 30809-30809/com.renny.kotlin I/System.out: send 4
複製程式碼

以上就是Channel的基本用法了,看到這,熟悉Java併發程式設計的同學很容易聯想到阻塞佇列,它們的作用是很相似的。Channel實現的阻塞佇列並不是真正的阻塞,而是協程被掛起,並且它是可以被關閉的。

Channel詳解

上面說道Channel繼承了SendChannelReceiveChannel,它本身沒有實現邏輯,所以我們來看下這兩個介面的一些重要方法:

public fun offer(element: E): Boolean
複製程式碼

這也是傳送訊息的方法,不過和send不同,它有返回值,在Channel緩衝區容量滿了的時候不會掛起而是直接返回false。

public fun close(cause: Throwable? = null): Boolean
複製程式碼

關閉通道,關閉通道後再呼叫send或者offer會丟擲異常。在傳送方可以用isClosedForSend來判斷通道是否關閉。對應的, 還有isClosedForReceive,但它會在所有之前傳送的元素收到之後才返回 "true"。

 public fun poll(): E?
複製程式碼

offer對應,從緩衝區取不到訊息會返回空,而不是像receive一樣掛起協程。

public fun cancel(): Unit
複製程式碼

會取消接受訊息並移除緩衝區的所有元素,因此isClosedForReceive也會立即返回"true"。

 public operator fun iterator(): ChannelIterator<E>
複製程式碼

通過返回一個迭代器來接受緩衝區的訊息,其實直接用for迴圈也是可以的(Channel並不是一個集合,可能是對協程的特殊支援吧):

fun main() = runBlocking {
        val channel = Channel<Int>()
        launch {
            for (x in 1..5) channel.send(x * x)
            channel.close()
        }
        for (y in channel) println(y)
        println("Done!")
    }
複製程式碼

Channel進階

事件的合併

再回到最初,緩衝區容量定義,大於等於0的值都很好理解,但Channel.CONFLATED = -1是什麼鬼? 我們來改造下上面的demo:

 fun main() = runBlocking {
        val channel = Channel<Int>(Channel.CONFLATED)
        launch {
            for (x in 1..5) {
                channel.send(x * x)
                println("send $x")
            }
        }
        delay(200L)
        repeat(2) { println("receive ${channel.receive()}") }
    }
複製程式碼

輸出如下:

2019-01-01 20:10:29.922 1314-1314/com.renny.kotlin I/System.out: send 1
2019-01-01 20:10:29.922 1314-1314/com.renny.kotlin I/System.out: send 2
2019-01-01 20:10:29.927 1314-1314/com.renny.kotlin I/System.out: send 3
2019-01-01 20:10:29.927 1314-1314/com.renny.kotlin I/System.out: send 4
2019-01-01 20:10:29.928 1314-1314/com.renny.kotlin I/System.out: send 5
2019-01-01 20:10:30.117 1314-1314/com.renny.kotlin I/System.out: receive 25
複製程式碼

send方法並沒有被掛起,但我們只收到了一個訊息。事實上,定義為Channel.CONFLATED時,緩衝區的的容量也是1,但當容量已經有訊息,但又有新訊息來的的時候,它會用新訊息來替代當前的訊息。所以根據這個特性,接收方總是能接收到最新的訊息。具體有啥用嘛?比如點選一次按鈕觸發一次動畫,在動畫播放期間的點選事件都將被合併成一次,當動畫結束後,又會開始最新點選的動畫,之間的點選都被略掉了。

擴充套件

上面傳送和接受程式碼寫的多少有些繁瑣,官方還提供了擴充套件方法produceconsumeEach,我們來該寫下例子,不需要手動再開啟發送訊息一方的協程了:

fun main() = runBlocking {
        val squares = produce {
            for (x in 1..5) send(x * x)
        }
        squares.consumeEach { println(it) }
        println("Done!")
    }
複製程式碼

async/await

async 非同步, await 等待 ,這兩個方法是協程為了更好解決非同步任務而推出的,熟悉JS、C#等語言的人對這兩個方法肯定很熟悉,用法也是差不多的。

 fun main() = runBlocking{
        var time = measureTimeMillis {
            val one = doSomethingUsefulOne()
            val two = doSomethingUsefulTwo()
            println("The answer is ${one + two}")
        }
        println("Sync completed in $time ms")

         time = measureTimeMillis {
            val one = async { doSomethingUsefulOne() }
            val two = async { doSomethingUsefulTwo() }
            println("The answer is ${one.await() + two.await()}")
        }
        println("Async completed in $time ms")
    }

    suspend fun doSomethingUsefulOne(): Int {
        delay(1000L)
        return 13
    }

    suspend fun doSomethingUsefulTwo(): Int {
        delay(1000L)
        return 29
    }
複製程式碼

結果是一樣的,但耗時卻差了一半。我們就像呼叫同步任務一樣啟用非同步,不得不說比java原生實現優雅多了

2019-01-01 20:52:15.482 3520-3520/com.renny.kotlin I/System.out: The answer is 42
2019-01-01 20:52:15.483 3520-3520/com.renny.kotlin I/System.out: Sync completed in 2006 ms
2019-01-01 20:52:16.489 3520-3520/com.renny.kotlin I/System.out: The answer is 42
2019-01-01 20:52:16.489 3520-3520/com.renny.kotlin I/System.out: Async completed in 1006 ms
複製程式碼

async是一個擴充套件方法,在裡面啟動了一個子協程,看下定義:

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
複製程式碼

返回的也不再是Job物件,而是Deferred,這兩者大概像就是runnable和callable的關係吧,無返回值和有返回值,其他都差不多。而await會掛起當前的協程,直到子協程程式碼結束並拿到返回結果,和join也類似