1. 程式人生 > >Kotlin協程通訊機制: Channel

Kotlin協程通訊機制: Channel

Coroutines Channels

Java中的多執行緒通訊, 總會涉及到共享狀態(shared mutable state)的讀寫, 有同步, 死鎖等問題要處理.

協程中的Channel用於協程間的通訊, 它的宗旨是:

Do not communicate by sharing memory; instead, share memory by communicating.

Channel basics

channels用於協程間的通訊, 允許我們在不同的協程間傳遞資料(a stream of values).

生產者-消費者模式

傳送資料到channel的協程被稱為producer, 從channel接受資料的協程被稱為consumer

.

生產: send, produce.
消費: receive, consume.

當需要的時候, 多個協程可以向同一個channel傳送資料, 一個channel的資料也可以被多個協程接收.

當多個協程從同一個channel接收資料的時候, 每個元素僅被其中一個consumer消費一次. 處理元素會自動將其從channel裡刪除.

Channel的特點

Channel在概念上有點類似於BlockingQueue, 元素從一端被加入, 從另一端被消費. 關鍵的區別在於, 讀寫的方法不是blocking的, 而是suspending的.
在為空或為滿時. channel可以suspend它的send

receive操作.

Channel的關閉和迭代

Channel可以被關閉, 說明沒有更多的元素了.
取消producer協程也會關閉channel.

在receiver端有一種方便的方式來接收: 用for迭代.

看這個例子:

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x)
        channel.close() // we're done sending
    }
// here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}

執行後會輸出:

1
2
3
4
5
Done!

Process finished with exit code 0

如果註釋掉channel.close()就會變成:

1
2
3
4
5

Done沒有被輸出, 程式也沒有退出, 這是因為接受者協程還在一直等待.

不同的Channel型別

庫中定義了多個channel型別, 它們的主要區別在於:

  • 內部可以儲存的元素數量;
  • send是否可以被掛起.

所有channel型別的receive方法都是同樣的行為: 如果channel不為空, 接收一個元素, 否則掛起.

Channel的不同型別:

  • Rendezvous channel: 0尺寸buffer, sendreceive要meet on time, 否則掛起. (預設型別).
  • Unlimited channel: 無限元素, send不被掛起.
  • Buffered channel: 指定大小, 滿了之後send掛起.
  • Conflated channel: 新元素會覆蓋舊元素, receiver只會得到最新元素, send永不掛起.

建立channel:

val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)

預設是Rendezvous channel.

練習: 分析程式碼輸出

看這段程式碼:

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        channel.send("A1")
        channel.send("A2")
        log("A done")
    }
    launch {
        channel.send("B1")
        log("B done")
    }
    launch {
        repeat(3) {
            val x = channel.receive()
            log(x)
        }
    }
}

fun log(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}

這段程式碼建立了一個channel, 傳遞String型別的元素.
兩個producder協程, 分別向channel傳送不同的字串, 傳送完畢後列印各自的"done".
一個receiver協程, 接收channel中的3個元素並列印.

程式的執行輸出結果會是怎樣呢?

記得在Configurations中加上VM options: -Dkotlinx.coroutines.debug. 可以看到協程資訊.

答案揭曉:

[main @coroutine#4] A1
[main @coroutine#4] B1
[main @coroutine#2] A done
[main @coroutine#3] B done
[main @coroutine#4] A2

答對了嗎?

為什麼會是這樣呢? 原因主要有兩點:

  • 這裡建立的channel是預設的Rendezvous型別, 沒有buffer, send和receive必須要meet, 否則掛起.
  • 兩個producer和receiver協程都執行在同一個執行緒上, ready to be resumed也只是加入了一個等待佇列, resume要按順序來.

這個例子在Introduction to Coroutines and Channels中有一個視訊解說.

另外, 官方文件中還有一個ping-pang的例子, 為了說明Channels are fair.

參考

  • 官方文件: Channels
  • Introduction to Coroutines and Channels
  • Github: Coroutines Guide
  • Kotlin: Diving in to Coroutines and Channels