Kotlin協程通訊機制: Channel
Coroutines Channels
Java中的多執行緒通訊, 總會涉及到共享狀態(shared mutable state)的讀寫, 有同步, 死鎖等問題要處理.
協程中的Channel用於協程間的通訊, 它的宗旨是:
Do not communicate by sharing memory; instead, share memory by communicating.
Channel basics
channels用於協程間的通訊, 允許我們在不同的協程間傳遞資料(a stream of values).
生產者-消費者模式
傳送資料到channel的協程被稱為producer
, 從channel接受資料的協程被稱為consumer
生產: send, produce.
消費: receive, consume.
當需要的時候, 多個協程可以向同一個channel傳送資料, 一個channel的資料也可以被多個協程接收.
當多個協程從同一個channel接收資料的時候, 每個元素僅被其中一個consumer消費一次. 處理元素會自動將其從channel裡刪除.
Channel的特點
Channel
在概念上有點類似於BlockingQueue
, 元素從一端被加入, 從另一端被消費. 關鍵的區別在於, 讀寫的方法不是blocking的, 而是suspending的.
在為空或為滿時. channel可以suspend它的send
receive
操作.
Channel的關閉和迭代
Channel可以被關閉, 說明沒有更多的元素了.
取消producer協程也會關閉channel.
在receiver端有一種方便的方式來接收: 用for
迭代.
看這個例子:
fun main() = runBlocking<Unit> { val channel = Channel<Int>() launch { for (x in 1..5) channel.send(x) channel.close() // we're done sending } // here we print received values using `for` loop (until the channel is closed) for (y in channel) println(y) println("Done!") }
執行後會輸出:
1
2
3
4
5
Done!
Process finished with exit code 0
如果註釋掉channel.close()
就會變成:
1
2
3
4
5
Done沒有被輸出, 程式也沒有退出, 這是因為接受者協程還在一直等待.
不同的Channel型別
庫中定義了多個channel型別, 它們的主要區別在於:
- 內部可以儲存的元素數量;
send
是否可以被掛起.
所有channel型別的receive
方法都是同樣的行為: 如果channel不為空, 接收一個元素, 否則掛起.
Channel的不同型別:
- Rendezvous channel: 0尺寸buffer,
send
和receive
要meet on time, 否則掛起. (預設型別). - Unlimited channel: 無限元素,
send
不被掛起. - Buffered channel: 指定大小, 滿了之後
send
掛起. - Conflated channel: 新元素會覆蓋舊元素, receiver只會得到最新元素,
send
永不掛起.
建立channel:
val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)
預設是Rendezvous channel.
練習: 分析程式碼輸出
看這段程式碼:
fun main() = runBlocking<Unit> {
val channel = Channel<String>()
launch {
channel.send("A1")
channel.send("A2")
log("A done")
}
launch {
channel.send("B1")
log("B done")
}
launch {
repeat(3) {
val x = channel.receive()
log(x)
}
}
}
fun log(message: Any?) {
println("[${Thread.currentThread().name}] $message")
}
這段程式碼建立了一個channel, 傳遞String型別的元素.
兩個producder協程, 分別向channel傳送不同的字串, 傳送完畢後列印各自的"done".
一個receiver協程, 接收channel中的3個元素並列印.
程式的執行輸出結果會是怎樣呢?
記得在Configurations中加上VM options: -Dkotlinx.coroutines.debug
. 可以看到協程資訊.
答案揭曉:
[main @coroutine#4] A1
[main @coroutine#4] B1
[main @coroutine#2] A done
[main @coroutine#3] B done
[main @coroutine#4] A2
答對了嗎?
為什麼會是這樣呢? 原因主要有兩點:
- 這裡建立的channel是預設的Rendezvous型別, 沒有buffer, send和receive必須要meet, 否則掛起.
- 兩個producer和receiver協程都執行在同一個執行緒上, ready to be resumed也只是加入了一個等待佇列, resume要按順序來.
這個例子在Introduction to Coroutines and Channels中有一個視訊解說.
另外, 官方文件中還有一個ping-pang的例子, 為了說明Channels are fair.
參考
- 官方文件: Channels
- Introduction to Coroutines and Channels
- Github: Coroutines Guide
- Kotlin: Diving in to Coroutines and Channels