Kotlin 協程官網翻譯
你的第一個協程
fun main(args: Array<String>) {
launch { //在後臺啟動新的協程並繼續
delay(1000L) //非阻塞延遲1秒(預設時間單位為ms)
println("World!") //延遲後列印
}
println("Hello,") //主執行緒繼續,而協程延遲
Thread.sleep(2000L)//阻塞主執行緒2秒以保持JVM活動
}
輸出結果
Hello,
World!
從本質上講,協同程式是輕量級的執行緒。它們是與釋出 協同程式構建器一起啟動的。您可以實現相同的結果替換 launch { … }
如果以替換launch為開頭thread,則編譯器會產生以下錯誤:
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
這是因為delay是一個特殊的掛起函式,它不會阻塞一個執行緒,但會掛起 協同程式,它只能從協程中使用。
橋接阻塞和非阻塞世界
第一個示例在同一程式碼中混合非阻塞 delay(…)和阻塞 Thread.sleep(…)。很容易迷失哪一個阻塞而另一個阻塞。讓我們明確說明使用runBlocking coroutine builder進行阻塞:
fun main(args: Array<String>) { launch { // launch new coroutine in background and continue delay(1000L) println("World!") } println("Hello,") // main thread continues here immediately runBlocking { // but this expression blocks the main thread delay(2000L) // ... while we delay for 2 seconds to keep JVM alive } }
結果是相同的,但此程式碼僅使用非阻塞延遲。主執行緒,呼叫runBlocking,塊,直到協程內runBlocking完成。
這個例子也可以用更慣用的方式重寫,runBlocking用來包裝main函式的執行:
等待工作
在另一個協程正在工作時延遲一段時間並不是一個好方法。讓我們明確等待(以非阻塞方式),直到我們啟動的後臺作業完成:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { // launch new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
println("Hello,")
job.join() // wait until child coroutine completes
}
提取函式重構
讓我們將程式碼塊提取launch { … }到一個單獨的函式中。當您對此程式碼執行“提取功能”重構時,您將獲得帶有suspend修飾符的新功能。這是你的第一個暫停功能。掛起函式可以在協程內部使用,就像常規函式一樣,但它們的附加功能是它們可以反過來使用其他掛起函式(如delay本示例中所示)來暫停協程的執行。
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { doWorld() }
println("Hello,")
job.join()
}
// this is your first suspending function
suspend fun doWorld() {
delay(1000L)
println("World!")
}
協同程式足夠輕量級
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = List(100_000) {
// launch a lot of coroutines and list their jobs
launch {
delay(1000L)
print(".")
}
}
jobs.forEach { it.join() } // wait for all jobs to complete
}
它啟動了100K協同程式,一秒鐘之後,每個協同程式都打印出一個點。現在,嘗試使用執行緒。會發生什麼?(很可能你的程式碼會產生某種記憶體不足的錯誤)
協同程式就像守護程式執行緒
下面的程式碼啟動一個長時間執行的協同程式,每秒列印“我正在睡覺”兩次,然後在一段延遲後從main函式返回:
fun main(args: Array<String>) = runBlocking<Unit> {
launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // just quit after delay
}
您可以執行並看到它列印三行並終止:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
活動協同程式不會使程序保持活動狀態。它們就像守護程式執行緒。
取消和超時
在小應用程式中,從“main”方法返回可能聽起來像是一個好主意,以便隱式終止所有協同程式。在較大的長期執行的應用程式中,您需要更精細的控制。在推出函式返回一個作業,可用於取消執行協程:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion
println("main: Now I can quit.")
}
輸出如下
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
主呼叫後job.cancel,我們看不到其他協同程式的任何輸出,因為它已被取消。還有一個Job擴充套件函式cancelAndJoin ,它結合了取消和連線呼叫。
取消是合作的
協同取消是合作的。協程程式碼必須合作才能取消。所有掛起函式kotlinx.coroutines都是可取消的。他們檢查coroutine的取消並在取消時丟擲CancellationException。但是,如果協程正在計算中並且未檢查取消,則無法取消它,如下例所示:
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
執行它以檢視它繼續列印“我正在睡覺”,即使在取消之後,直到作業在五次迭代後自行完成。
輸出結果
I'm sleep 0
I'm sleep 1
I'm sleep 2
main I;m tried of waiting
I'm sleep 3
I'm sleep 4
main Now I can quit
使計算程式碼可取消
有兩種方法可以使計算程式碼可以取消。第一個是定期呼叫檢查取消的掛起功能。有一個收益率的功能是實現這一目的的好選擇。另一個是明確檢查取消狀態。讓我們嘗試後一種方法。
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (isActive) { // cancellable computation loop
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
如您所見,現在此迴圈已取消。isActive是通過CoroutineScope物件在協同程式程式碼中可用的屬性。
最後關閉資源
可取消的掛起函式會在取消時丟擲CancellationException,這可以通過所有常規方式處理。例如,當取消協程時,try {…} finally {…}表示式和Kotlin use函式通常會執行其終結操作:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
println("I'm running finally")
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
無論加入和cancelAndJoin等待所有完成動作來完成的,所以上面的例子產生下面的輸出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
執行不可取消的塊
finally在前一個示例的塊中嘗試使用掛起函式將導致CancellationException,因為執行此程式碼的協程將 被取消。通常,這不是問題,因為所有表現良好的關閉操作(關閉檔案,取消作業或關閉任何型別的通訊通道)通常都是非阻塞的,並且不涉及任何掛起功能。但是,在極少數情況下,當您需要掛起已取消的協同程式時,可以withContext(NonCancellable) {…}使用withContext函式和NonCancellable上下文包裝相應的程式碼, 如下例所示:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("I'm running finally")
delay(1000L)
println("And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
超時退出
在實踐中取消協程執行的最明顯的原因是因為它的執行時間超過了一些超時。雖然您可以手動跟蹤對相應作業的引用並啟動單獨的協同程式以在延遲後取消跟蹤的協程,但是有一個準備好使用withTimeout函式執行此操作。請看以下示例:
fun main(args: Array<String>) = runBlocking<Unit> {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS
該TimeoutCancellationException由丟擲withTimeout是的子類CancellationException。我們之前沒有看到它的堆疊跟蹤列印在控制檯上。這是因為在取消的協程中CancellationException被認為是協程完成的正常原因。但是,在這個例子中我們withTimeout在main函式內部使用了。
因為取消只是一個例外,所有資源都將以通常的方式關閉。您可以在超時包裹程式碼try {…} catch (e: TimeoutCancellationException) {…}塊,如果你需要專門做一些額外的行動在任何型別的超時或使用withTimeoutOrNull功能類似於withTimeout,但返回null的超時,而不是丟擲一個異常:
fun main(args: Array<String>) = runBlocking<Unit> {
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // will get cancelled before it produces this result
}
println("Result is $result")
}
執行此程式碼時不再有異常:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null
撰寫暫停功能
預設順序
假設我們在其他地方定義了兩個掛起函式,它們可以像某種遠端服務呼叫或計算一樣有用。我們只是假裝它們很有用,但實際上每個只是為了這個例子的目的而延遲一秒:
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // pretend we are doing something useful here
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // pretend we are doing something useful here, too
return 29
}
如果需要按順序呼叫它們,我們該怎麼做- 首先doSomethingUsefulOne 然後 doSomethingUsefulTwo計算結果的總和?實際上,如果我們使用第一個函式的結果來決定是否需要呼叫第二個函式或決定如何呼叫它,我們就會這樣做。
我們只使用正常的順序呼叫,因為協程中的程式碼與常規程式碼中的程式碼一樣,預設是順序的。以下示例通過測量執行兩個掛起函式所需的總時間來演示它:
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Completed in $time ms")
}
它產生這樣的東西:
The answer is 42
Completed in 2017 ms
併發使用非同步
如果在呼叫doSomethingUsefulOne和之間沒有依賴關係,doSomethingUsefulTwo並且我們希望通過同時執行兩者來更快地得到答案,該怎麼辦?這是非同步來幫助的地方。
從概念上講,非同步就像啟動一樣。它啟動一個單獨的協程,這是一個輕量級的執行緒,與所有其他協同程式同時工作。不同之處在於launch返回一個Job並且不攜帶任何結果值,同時async返回Deferred - 一個輕量級的非阻塞未來,表示稍後提供結果的承諾。您可以使用.await()延遲值來獲取其最終結果,但Deferred也是a Job,因此您可以根據需要取消它。
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
它產生這樣的東西:
The answer is 42
Completed in 1017 ms
這是兩倍的速度,因為我們同時執行了兩個協同程式。注意,與協同程式的併發性始終是顯式的。
懶惰地開始非同步
使用值為CoroutineStart.LAZY的可選引數進行非同步時有一個惰性選項。它僅在某些等待需要其結果或呼叫啟動函式時才啟動協同程式 。執行以下示例,該示例僅與此前一個示例不同:start
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
它產生這樣的東西:
The answer is 42
Completed in 2017 ms
所以,我們回到順序執行,因為我們首先啟動並等待one,然後啟動並等待two。它不是懶惰的預期用例。lazy在計算值涉及暫停函式的情況下,它被設計為標準函式的替代。
非同步風格的功能
我們可以定義使用非同步協同生成器呼叫doSomethingUsefulOne和doSomethingUsefulTwo 非同步呼叫的非同步樣式函式。使用“Async”字尾命名此類函式是一種很好的方式,以突出顯示它們只啟動非同步計算並且需要使用結果延遲值來獲取結果的事實。
// somethingUsefulOneAsync的結果型別是Deferred <Int>
fun somethingUsefulOneAsync() = async {
doSomethingUsefulOne()
}
// somethingUsefulTwoAsync的結果型別是Deferred <Int>
fun somethingUsefulTwoAsync() = async {
doSomethingUsefulTwo()
}
注意,這些xxxAsync功能不是 暫停功能。它們可以在任何地方使用。但是,它們的使用總是意味著它們的動作與呼叫程式碼的非同步(這裡意味著併發)。
以下示例顯示了它們在協同程式之外的用法:
// note, that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
val time = measureTimeMillis {
// we can initiate async actions outside of a coroutine
val one = somethingUsefulOneAsync()
val two = somethingUsefulTwoAsync()
// but waiting for a result must involve either suspending or blocking.
// here we use `runBlocking { ... }` to block the main thread while waiting for the result
runBlocking {
println("The answer is ${one.await() + two.await()}")
}
}
println("Completed in $time ms")
}
協同上下文和排程員
協同程式總是在某些上下文中執行,該上下文由 在Kotlin標準庫中定義的CoroutineContext型別的值表示 。
協程上下文是一組各種元素。主要元素是我們之前見過的協同工作及其排程程式,本節將對其進行介紹。
排程員和執行緒
協程上下文包括一個協程排程程式(請參閱CoroutineDispatcher),它確定相應的協程用於執行的執行緒。協程排程程式可以將協程執行限制在特定執行緒,將其分派給執行緒池,或讓它無限制地執行。
所有協同構建器(如launch和async)都接受一個可選的 CoroutineContext 引數,該引數可用於顯式指定新協程和其他上下文元素的排程程式。
請嘗試以下示例:
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
它產生以下輸出(可能以不同的順序):
'Unconfined': I'm working in thread main
'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
'newSTC': I'm working in thread MyOwnThread
'coroutineContext': I'm working in thread main
我們在前面部分中使用的預設排程程式由DefaultDispatcher表示,它等於當前實現中的CommonPool。所以,launch { … }是一樣的launch(DefaultDispatcher) { … },它是一樣的launch(CommonPool) { … }。
父coroutineContext和 Unconfined上下文之間的區別 將在稍後顯示。
注意,newSingleThreadContext建立一個新執行緒,這是一個非常昂貴的資源。在實際應用程式中,它必須在不再需要時釋放,使用close 函式,或者儲存在頂級變數中並在整個應用程式中重用。
無限制與受限制的排程員
該開敞協程排程員開始協程在呼叫執行緒,但直到第一個懸掛點。暫停後,它將線上程中恢復,該執行緒完全由呼叫的掛起函式確定。當協同程式不消耗CPU時間也不更新任何侷限於特定執行緒的共享資料(如UI)時,無限制排程程式是合適的。
另一方面, coroutineContext 屬性(在任何協同程式中可用)是對此特定協同程式的上下文的引用。這樣,可以繼承父上下文。特別是runBlocking協同程式的預設排程程式僅限於呼叫程式執行緒,因此繼承它具有通過可預測的FIFO排程將執行限制在此執行緒的效果。
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
delay(500)
println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
delay(1000)
println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
所以,這繼承了協程coroutineContext的runBlocking {…}繼續在執行main執行緒,而不受限制一個曾在預設執行執行緒重新恢復延遲 功能使用。
除錯協程和執行緒
協同程式可以暫停在一個執行緒,並恢復與另一個執行緒開敞排程員或預設多執行緒排程。即使使用單執行緒排程程式,也可能很難弄清楚協程正在做什麼,何時何地。使用執行緒除錯應用程式的常用方法是在每個日誌語句的日誌檔案中列印執行緒名稱。日誌框架普遍支援此功能。使用協同程式時,單獨的執行緒名稱不會給出很多上下文,因此 kotlinx.coroutines包括除錯工具以使其更容易。
使用-Dkotlinx.coroutines.debugJVM選項執行以下程式碼:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) = runBlocking<Unit> {
val a = async(coroutineContext) {
log("I'm computing a piece of the answer")
6
}
val b = async(coroutineContext) {
log("I'm computing another piece of the answer")
7
}
log("The answer is ${a.await() * b.await()}")
}
有三個協同程式。主協程(#1) - runBlocking一個和兩個協程計算延遲值a(#2)和b(#3)。它們都在上下文中執行,runBlocking並且僅限於主執行緒。此程式碼的輸出是:
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
該log函式在方括號中列印執行緒的名稱,您可以看到它是main 執行緒,但是當前正在執行的協程的識別符號被附加到它。開啟除錯模式時,會將此識別符號連續分配給所有已建立的協同程式。
您可以在newCoroutineContext函式的文件中閱讀有關除錯工具的更多資訊。
線上程之間跳轉
使用 -Dkotlinx.coroutines.debug JVM選項執行以下程式碼:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) {
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("Started in ctx1")
withContext(ctx2) {
log("Working in ctx2")
}
log("Back to ctx1")
}
}
}
}
它演示了幾種新技術。一個是使用帶有明確指定上下文的runBlocking,另一個是使用withContext函式來更改協程的上下文,同時仍然保持在下面的輸出中可以看到的相同協程:
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
請注意,此示例還使用useKotlin標準庫中的函式來釋放在不再需要時使用newSingleThreadContext建立的執行緒。
工作在上下文中
協程的工作是其背景的一部分。協程可以使用coroutineContext[Job]表示式從其自己的上下文中檢索它:
fun main(args: Array<String>) = runBlocking<Unit> {
println("My job is ${coroutineContext[Job]}")
}
在除錯模式下執行時會產生類似的東西:
My job is "coroutine#1":BlockingCoroutine{Active}@6d311334
因此,isActive在CoroutineScope僅僅是一個方便快捷 coroutineContext[Job]?.isActive == true。
子協程
當 coroutineContext 協程的用於啟動另一個協程,該工作新協程成為孩子的家長協程的工作。當父協程被取消時,它的所有子節點也會被遞迴取消。
fun main(args: Array<String>) = runBlocking<Unit> {
// launch a coroutine to process some kind of incoming request
val request = launch {
// it spawns two other jobs, one with its separate context
val job1 = launch {
println("job1: I have my own context and execute independently!")
delay(1000)
println("job1: I am not affected by cancellation of the request")
}
// and the other inherits the parent context
val job2 = launch(coroutineContext) {
delay(100)
println("job2: I am a child of the request coroutine")
delay(1000)
println("job2: I will not execute this line if my parent request is cancelled")
}
// request completes when both its sub-jobs complete:
job1.join()
job2.join()
}
delay(500)
request.cancel() // cancel processing of the request
delay(1000) // delay a second to see what happens
println("main: Who has survived request cancellation?")
}
此程式碼的輸出是:
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
結合上下文
可以使用+運算子組合協程上下文。右側的上下文替換了左側上下文的相關條目。例如,可以繼承父協程的Job,同時替換其排程程式:
fun main(args: Array<String>) = runBlocking<Unit> {
// start a coroutine to process some kind of incoming request
val request = launch(coroutineContext) { // use the context of `runBlocking`
// spawns CPU-intensive child job in CommonPool !!!
val job = launch(coroutineContext + CommonPool) {
println("job: I am a child of the request coroutine, but with a different dispatcher")
delay(1000)
println("job: I will not execute this line if my parent request is cancelled")
}
job.join() // request completes when its sub-job completes
}
delay(500)
request.cancel() // cancel processing of the request
delay(1000) // delay a second to see what happens
println("main: Who has survived request cancellation?")
}
此程式碼的預期結果是:
job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
父母的責任
父協同程式總是等待所有孩子的完成。Parent不必顯式跟蹤它啟動的所有子節點,也不必使用Job.join在結束時等待它們:
fun main(args: Array<String>) = runBlocking<Unit> {
// launch a coroutine to process some kind of incoming request
val request = launch {
repeat(3) { i -> // launch a few children jobs
launch(coroutineContext) {
delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
println("Coroutine $i is done")
}
}
println("request: I'm done and I don't explicitly join my children that are still active")
}
request.join() // wait for completion of the request, including all its children
println("Now processing of the request is complete")
}
結果將是:
request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete
命名協同程式以進行除錯
當協同程式經常記錄時,自動分配的ID很好,您只需要關聯來自同一協程的日誌記錄。但是,當協程與特定請求的處理或執行某些特定後臺任務相關聯時,最好將其明確命名以用於除錯目的。 CoroutineName上下文元素與執行緒名稱具有相同的功能。當除錯模式開啟時,它將顯示在執行此協程的執行緒名稱中。
以下示例演示了此概念:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
log("Started main coroutine")
// run two background value computations
val v1 = async(CoroutineName("v1coroutine")) {
delay(500)
log("Computing v1")
252
}
val v2 = async(CoroutineName("v2coroutine")) {
delay(1000)
log("Computing v2")
6
}
log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
它使用-Dkotlinx.coroutines.debugJVM選項生成的輸出類似於:
[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42
通過明確的工作取消
讓我們將關於上下文,child物件和工作的知識放在一起。假設我們的應用程式有一個具有生命週期的物件,但該物件不是協程。例如,我們正在編寫一個Android應用程式,並在Android活動的上下文中啟動各種協同程式,以執行非同步操作以獲取和更新資料,執行動畫等。所有這些協同程式必須在活動被銷燬時取消,以避免記憶體洩漏。
我們可以通過建立與我們活動的生命週期相關聯的Job例項來管理協同程式的生命週期。使用Job()工廠函式建立作業例項,如以下示例所示。為方便起見,launch(coroutineContext + job)我們可以編寫launch(coroutineContext, parent = job)以明確表示正在使用父作業的事實,而不是使用表示式。
現在,Job.cancel的單個呼叫取消了我們啟動的所有孩子。此外,Job.join等待所有這些完成,所以我們也可以在這個示例中使用cancelAndJoin:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = Job() // create a job object to manage our lifecycle
// now launch ten coroutines for a demo, each working for a different time
val coroutines = List(10) { i ->
// they are all children of our job object
launch(coroutineContext, parent = job) { // we use the context of main runBlocking thread, but with our parent job
delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
println("Coroutine $i is done")
}
}
println("Launched ${coroutines.size} coroutines")
delay(500L) // delay for half a second
println("Cancelling the job!")
job.cancelAndJoin() // cancel all our coroutines and wait for all of them to complete
}
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Cancelling the job!
正如你所看到的,只有前三個協同程式列印了一條訊息,而其他協同程式被一次呼叫取消了job.cancelAndJoin()。因此,我們在假設的Android應用程式中需要做的就是在建立活動時建立父作業物件,將其用於子協同程式,並在銷燬活動時取消它。我們不能join在Android生命週期的情況下使用它們,因為它是同步的,但是這種連線能力在構建後端服務以確保有限的資源使用時非常有用。
通道
延遲值提供了在協同程式之間傳輸單個值的便捷方法。管道提供了一種傳輸值流的方法。
通道的基礎知識
一個通道是在概念上非常相似BlockingQueue。一個關鍵的區別是,它不是阻塞put操作,而是暫停傳送,而不是阻塞take操作,它有一個暫停接收。
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
}
此程式碼的輸出是:
1
4
9
16
25
Done!
關閉和迭代通道
與佇列不同,可以關閉通道以指示不再有元素到來。在接收器端,使用常規for迴圈來接收來自通道的元素是方便的。
從概念上講,關閉就像向通道傳送特殊的關閉令牌。一旦收到此關閉令牌,迭代就會停止,因此可以保證收到關閉前所有先前傳送的元素:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
}
建立渠道生產者
協程生成一系列元素的模式很常見。這是生產者 - 消費者模式的一部分,通常在併發程式碼中找到。您可以將這樣的生成器抽象為一個以通道作為引數的函式,但這與必須從函式返回結果的常識相反。
有一個名為produce的便利協程構建器,它可以很容易地在生產者端執行,並且擴充套件函式consumeEach,它取代了for消費者端的迴圈:
fun produceSquares() = produce<Int> {
for (x in 1..5) send(x * x)
}
fun main(args: Array<String>) = runBlocking<Unit> {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
管道
管道是一個協程正在生成的模式,可能是無限的值流:
fun produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}
而另一個協程或協同程式正在消耗該流,進行一些處理,併產生一些其他結果。在下面的例子中,數字只是平方:
fun square(numbers: ReceiveChannel<Int>) = produce<Int> {
for (x in numbers) send(x * x)
}
主程式碼啟動並連線整個管道:
fun main(args: Array<String>) = runBlocking<Unit> {
val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
for (i in 1..5) println(squares.receive()) // print first five
println("Done!") // we are done
squares.cancel() // need to cancel these coroutines in a larger app
numbers.cancel()
}
我們不必在這個示例應用程式中取消這些協同程式,因為 協同程式就像守護程式執行緒,但是在更大的應用程式中,如果我們不再需要它,我們將需要停止我們的管道。或者,我們可以執行管道協同程式作為 主協程的子代,如以下示例所示。
帶管道的素數
讓我們通過一個使用協程管道生成素數的例子將管道帶到極端。我們從無限的數字序列開始。這次我們引入一個顯式context引數並將其傳遞給generate構建器,以便呼叫者可以控制我們的協程執行的位置:
fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
以下管道階段過濾傳入的數字流,刪除可由給定素數整除的所有數字:
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
for (x in numbers) if (x % prime != 0) send(x)
}
現在我們通過從2開始一個數字流來構建我們的管道,從當前通道獲取素數,併為找到的每個素數啟動新的管道階段:
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
以下示例列印前十個素數,在主執行緒的上下文中執行整個管道。由於所有協同程式都是在其coroutineContext中作為主runBlocking協程的 子程序啟動的,因此我們不必保留我們已經啟動的所有協同程式的明確列表。我們使用cancelChildren 擴充套件函式取消所有子協同程式。
fun main(args: Array<String>) = runBlocking<Unit> {
var cur = numbersFrom(coroutineContext, 2)
for (i in 1..10) {
val prime = cur.receive()
println(prime)
cur = filter(coroutineContext, cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
此程式碼的輸出是:
2
3
5
7
11
13
17
19
23
29
請注意,您可以使用buildIterator 標準庫中的coroutine builder 來構建相同的管道 。更換produce用buildIterator,send用yield,receive用next, ReceiveChannel用Iterator,並擺脫上下文。你也不需要runBlocking。但是,如上所示使用通道的管道的好處是,如果在CommonPool上下文中執行它,它實際上可以使用多個CPU核心。
無論如何,這是找到素數的極不切實際的方法。在實踐中,管道確實涉及一些其他掛起呼叫(如對遠端服務的非同步呼叫),並且這些管道不能使用buildSeqeunce/ 構建buildIterator,因為它們不允許任意掛起,這與produce完全非同步完全不同 。
扇出
多個協同程式可以從同一個通道接收,在它們之間分配工作。讓我們從生成器協程開始,它定期生成整數(每秒十個數字):
fun produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
然後我們可以有幾個處理器協同程式。在這個例子中,他們只打印他們的id和收到的號碼:
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
現在讓我們啟動五個處理器,讓它們工作幾乎一秒鐘。走著瞧吧:
fun main(args: Array<String>) = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
}
輸出將類似於以下輸出,儘管接收每個特定整數的處理器ID可能不同:
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
注意,取消生成器協同程式會關閉其通道,從而最終終止處理器協同程式正在執行的通道上的迭代。
另外,請注意我們如何使用for迴圈顯式迭代通道以在launchProcessor程式碼中執行扇出。與consumeEach此不同,這種for迴圈模式可以非常安全地從多個協同程式中使用。如果其中一個處理器協同程式失敗,則其他處理程式協同程式仍將處理該通道,而通過其寫入的處理器consumeEach 總是在正常或異常終止時消耗(取消)底層通道。
扇入
多個協同程式可以傳送到同一個通道。例如,讓我們有一個字串通道和一個掛起函式,它以指定的延遲重複傳送指定的字串到此通道:
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
現在,讓我們看看如果我們啟動幾個協同程式傳送字串會發生什麼(在這個例子中,我們在主執行緒的上下文中將它們作為主協程的子節點啟動):
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<String>()
launch(coroutineContext) { sendString(channel, "foo", 200L) }
launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
輸出是:
foo
foo
BAR!
foo
foo
BAR!
緩衝頻道
到目前為止顯示的通道沒有緩衝區。當傳送方和接收方彼此相遇(也稱為集合點)時,無緩衝的通道傳輸元素。如果首先呼叫send,那麼它將被掛起,直到呼叫receive,如果先呼叫receive,它將被掛起,直到呼叫send。
兩個通道()工廠函式和產生助洗劑採取可選的capacity引數來指定緩衝區大小。緩衝區允許傳送方在掛起之前傳送多個元素,類似於BlockingQueue具有指定容量的緩衝區已滿時阻塞。
看一下以下程式碼的行為:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>(4) // create buffered channel
val sender = launch(coroutineContext) { // launch sender coroutine
repeat(10) {
println("Sending $it") // print before sending each element
channel.send(it) // will suspend when buffer is full
}
}
// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine
}
它使用容量為4的緩衝通道列印“傳送” 五次:
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
前四個元素被新增到緩衝區,傳送方在嘗試傳送第五個元素時暫停。
Ticker通道
Ticker通道是一個特殊的會合通道,Unit每次從此通道上次消耗後產生給定的延遲通道。雖然它可能看起來沒有用,但它是一個有用的構建塊,可以建立複雜的基於時間的生產 管道和操作員,這些管道和操作員可以進行視窗化和其他時間依賴的處理。可以在select中使用Ticker通道執行“on tick”操作。
要建立此類渠道,請使用工廠方法程式碼。要指示不需要其他元素,請使用ReceiveChannel.cancel方法。
fun main(args: Array<String>) = runBlocking<Unit> {
val tickerChannel = ticker(delay = 100, initialDelay = 0) // create ticker channel
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// Emulate large consumption delays
println("Consumer pauses for 150ms")
delay(150)
// Next element is available immediately
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account and next element arrives faster
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // indicate that no more elements are needed
}
它列印以下行:
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
請注意,自動收報機知道可能的消費者暫停,並且預設情況下,如果發生暫停,則調整下一個生成的元素延遲,嘗試維持生成元素的固定速率。
可選地,mode可以指定等於[TickerMode.FIXED_DELAY]的引數以維持元素之間的固定延遲。
渠道公平
對於從多個協同程式呼叫它們的順序,向通道傳送和接收操作是公平的。它們以先進先出順序提供,例如,要呼叫的第一個協程receive 獲取元素。在以下示例中,兩個協程“ping”和“pong”正在從共享的“table”通道接收“ball”物件。
data class Ball(var hits: Int)
fun main(args: Array<String>) = runBlocking<Unit> {
val table = Channel<Ball>() // a shared table
launch(coroutineContext) { player("ping", table) }
launch(coroutineContext) { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
“ping”協程首先啟動,因此它是第一個接收球的人。即使“ping”coroutine在將球送回桌面後立即再次接球,球也會被“pong”協程接收,因為它已經在等待它了:
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
請注意,由於正在使用的執行程式的性質,有時通道可能會產生看起來不公平的執行。有關詳細資訊,請參閱此問
共享可變狀態和併發
可以使用多執行緒排程程式(如預設的CommonPool)同時執行協同程式。它提出了所有常見的併發問題。主要問題是同步訪問共享可變狀態。在協同程式領域,這個問題的一些解決方案類似於多執行緒世界中的解決方案,但其他解決方案是獨一無二的。
問題
讓我們推出一千個協同程式,它們都做了一千次相同的動作(總計一百萬次執行)。我們還將測量完成時間以進行進一步比較:
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
val n = 1000 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
val jobs = List(n) {
launch(context) {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}
我們從一個非常簡單的操作開始,該操作使用多執行緒CommonPool上下文來增加共享的可變變數。
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
最後列印什麼?它不太可能列印“Counter = 1000000”,因為一千個協程counter從多個執行緒同時增加而沒有任何同步。
注意:如果您的舊系統具有2個或更少的CPU,那麼您將始終看到1000000,因為 CommonPool在這種情況下僅在一個執行緒中執行。要重現此問題,您需要進行以下更改:
val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(mtContext) { // use it instead of CommonPool in this sample and below
counter++
}
println("Counter = $counter")
}
最後列印什麼?它不太可能列印“Counter = 1000000”,因為一千個協程counter從多個執行緒同時增加而沒有任何同步。
注意:如果您的舊系統具有2個或更少的CPU,那麼您將始終看到1000000,因為 CommonPool在這種情況下僅在一個執行緒中執行。要重現此問題,您需要進行以下更改:
val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(mtContext) { // use it instead of CommonPool in this sample and below
counter++
}
println("Counter = $counter")
}
Volatiles 沒有任何幫助
有一個常見的誤解是,使變數volatile解決了併發問題。讓我們試一試:
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
這段程式碼執行速度較慢,但我們仍然沒有得到“Counter = 1000000”,因為volatile變數保證可線性化(這是“原子”的技術術語)讀取和寫入相應的變數,但不提供原子性較大的行動(在我們的案例中增加)
執行緒安全的資料結構
適用於執行緒和協同程式的通用解決方案是使用執行緒安全(也稱為同步,可線性化或原子)資料結構,該資料結構為需要在共享狀態上執行的相應操作提供所有必需的同步。在簡單計數器的情況下,我們可以使用AtomicInteger具有原子incrementAndGet操作的類:
var counter = AtomicInteger()
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter.incrementAndGet()
}
println("Counter = ${counter.get()}")
}
這是針對此特定問題的最快解決方案。它適用於普通計數器,集合,佇列和其他標準資料結構以及它們的基本操作。但是,它不容易擴充套件到複雜狀態或沒有現成的執行緒安全實現的複雜操作。
執行緒限制細粒度
執行緒限制是解決共享可變狀態問題的一種方法,其中對特定共享狀態的所有訪問都限於單個執行緒。它通常用於UI應用程式,其中所有UI狀態都侷限於單個事件派發/應用程式執行緒。使用
單執行緒上下文很容易應用協同程式:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) { // run each coroutine in CommonPool
withContext(counterContext) { // but confine each increment to the single-threaded context
counter++
}
}
println("Counter = $counter")
}
此程式碼的工作速度非常慢,因為它可以進行細粒度的執行緒限制。每個增量CommonPool使用withContext塊從多執行緒上下文切換到單執行緒上下文。
執行緒限制粗粒度
實際上,執行緒限制是在大塊中執行的,例如,大塊狀態更新業務邏輯僅限於單個執行緒。下面的示例就是這樣,在單執行緒上下文中執行每個協程開始。
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(counterContext) { // run each coroutine in the single-threaded context
counter++
}
println("Counter = $counter")
}
這現在可以更快地執行併產生正確的結果。
相互排斥
該問題的相互排除解決方案 是使用永遠不會同時執行的關鍵部分來保護共享狀態的所有修改。在一個阻塞的世界中,你通常會使用synchronized或ReentrantLock為此而使用。Coroutine的替代品叫做Mutex。它具有鎖定和解鎖功能,可以分隔關鍵部分。關鍵的區別在於它Mutex.lock()是一個暫停功能。它不會阻塞執行緒。
還有withLock擴充套件功能,方便代表 mutex.lock(); try { … } finally { mutex.unlock() }模式:
val mutex = Mutex()
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
}
此示例中的鎖定是細粒度的,因此它付出了代價。但是,對於某些必須定期修改某些共享狀態的情況,它是一個不錯的選擇,但是沒有自然執行緒可以限制此狀態。
Actors
的演員是由一個協程,即被限制和封裝到該協程的狀態下,並與其他協同程式進行通訊的通道的組合的實體。一個簡單的actor可以寫成一個函式,但是一個具有複雜狀態的actor更適合一個類。
有一個actor協程構建器,它可以方便地將actor的郵箱通道組合到其作用域中,以便從傳送通道接收訊息並將其組合到生成的作業物件中,這樣對actor的單個引用就可以作為其控制代碼攜帶。
使用actor的第一步是定義一個actor要處理的訊息類。Kotlin的密封課程非常適合這個目的。我們CounterMsg使用IncCounter訊息定義密封類以增加計數器和GetCounter訊息以獲取其值。後者需要傳送回覆。甲CompletableDeferred通訊原碼,即表示將在將來已知的(傳送)一個單一的值,在這裡用於該目的。
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
然後我們定義一個使用actor coroutine builder 啟動actor的函式:
// This function launches a new counter actor
fun counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
主要程式碼很簡單:
fun main(args: Array<String>) = runBlocking<Unit> {
val counter = counterActor() // create the actor
massiveRun(CommonPool) {
counter.send(IncCounter)
}
// send a message to get a counter value from an actor
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
}
執行者本身執行的上下文無關緊要(正確性)。一個actor是一個協程,一個協同程式按順序執行,因此將狀態限制到特定協程可以解決共享可變狀態的問題。實際上,演員可以修改自己的私有狀態,但只能通過訊息相互影響(避免任何鎖定)。
Actor比在負載下鎖定更有效,因為在這種情況下它總是有工作要做,而且根本不需要切換到不同的上下文。
注意,actor協程構建器是產品協同程式構建器的雙重構件。一個actor與它接收訊息的頻道相關聯,而一個製作者與它傳送元素的頻道相關聯。
選擇表示式
選擇表示式可以同時等待多個掛起函式,並選擇 第一個可用的掛起函式。
從頻道中選擇
讓我們有兩個字串生成器:fizz和buzz。該fizz生產“菲斯”串每300毫秒:
fun fizz(context: CoroutineContext) = produce<String>(context) {
while (true) { // sends "Fizz" every 300 ms
delay(300)
send("Fizz")
}
}
而buzz產品“Buzz!” 字串每500毫秒:
fun buzz(context: CoroutineContext) = produce<String>(context) {
while (true) { // sends "Buzz!" every 500 ms
delay(500)
send("Buzz!")
}
}
使用接收暫停功能,我們可以接收任一從一個通道或其他。但select表示式允許我們同時使用其 onReceive子句從兩者接收:
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
select<Unit> { // <Unit> means that this select expression does not produce any result
fizz.onReceive { value -> // this is the first select clause
println("fizz -> '$value'")
}
buzz.onReceive { value -> // this is the second select clause
println("buzz -> '$value'")
}
}
}
讓我們一起執行七次:
fun main(args: Array<String>) = runBlocking<Unit> {
val fizz = fizz(coroutineContext)
val buzz = buzz(coroutineContext)
repeat(7) {
selectFizzBuzz(fizz, buzz)
}
coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
}
這段程式碼的結果是:
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
選擇關閉
所述的onReceive條款select當通道被關閉引起相應失敗 select丟擲異常。我們可以使用onReceiveOrNull子句在關閉通道時執行特定操作。以下示例還顯示該select表示式返回其所選子句的結果:
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
select<String> {
a.onReceiveOrNull { value ->
if (value == null)
"Channel 'a' is closed"
else
"a -> '$value'"
}
b.onReceiveOrNull { value ->
if (value == null)
"Channel 'b' is closed"
else
"b -> '$value'"
}
}
讓我們使用它a產生“Hello”字串四次的頻道b和產生“世界”四次的頻道:
fun main(args: Array<String>) = runBlocking<Unit> {
// we are using the context of the main thread in this example for predictability ...
val a = produce<String>(coroutineContext) {
repeat(4) { send("Hello $it") }
}
val b = produce<String>(coroutineContext) {
repeat(4) { send("World $it") }
}
repeat(8) { // print first eight results
println(selectAorB(a, b))
}
coroutineContext.cancelChildren()
}
這段程式碼的結果非常有趣,所以我們將在模式細節中分析它:
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
有幾個觀察要做出來。
首先,select是偏向於第一條。當可以同時選擇多個子句時,其中的第一個子句將被選中。在這裡,兩個通道都在不斷地產生字串,因此a作為select中的第一個子句的channel獲勝。但是,因為我們使用的是無緩衝通道,所以a它的傳送呼叫會不時地暫停,並且也有機會b傳送。
第二個觀察結果是,當通道已經關閉時,會立即選擇onReceiveOrNull。
選擇傳送
選擇表示式具有onSend子句,可以與選擇的偏見性結合使用。
讓我們編寫一個整數生成器的示例,side當主要通道上的消費者無法跟上它時,它會將其值傳送到通道:
fun produceNumbers(context: CoroutineContext, side: SendChannel<Int>) = produce<Int>(context) {
for (num in 1..10) { // produce 10 numbers from 1 to 10
delay(100) // every 100 ms
select<Unit> {
onSend(num) {} // Send to the primary channel
side.onSend(num) {} // or to the side channel
}
}
}
消費者將會非常緩慢,需要250毫秒才能處理每個號碼:
fun main(args: Array<String>) = runBlocking<Unit> {
val side = Channel<Int>() // allocate side channel
launch(coroutineContext) { // this is a very fast consumer for the side channel
side.consumeEach { println("Side channel has $it") }
}
produceNumbers(coroutineContext, side).consumeEach {
println("Consuming $it")
delay(250) // let us digest the consumed number properly, do not hurry
}
println("Done consuming")
coroutineContext.cancelChildren()
}
那麼讓我們看看會發生什麼:
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
選擇延期值
可以使用onAwait子句選擇延遲值。讓我們從一個非同步函式開始,該函式在隨機延遲後返回一個延遲字串值:
fun asyncString(time: Int) = async {
delay(time.toLong())
"Waited for $time ms"
}
讓我們隨機延遲開始十幾個。
fun asyncStringsList(): List<Deferred<String>> {
val random = Random(3)
return List(12) { asyncString(random.nextInt(1000)) }
}
現在,主函式等待第一個函式完成並計算仍處於活動狀態的延遲值的數量。注意,我們在這裡使用的select表示式是Kotlin DSL,因此我們可以使用任意程式碼為它提供子句。在這種情況下,我們遍歷一個延遲值列表,onAwait為每個延遲值提供子句。
fun main(args: Array<String>) = runBlocking<Unit> {
val list = asyncStringsList()
val result = select<String> {
list.withIndex().forEach { (index, deferred) ->
deferred.onAwait { answer ->
"Deferred $index produced answer '$answer'"
}
}
}
println(result)
val countActive = list.count { it.isActive }
println("$countActive coroutines are still active")
}
輸出是:
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
切換延遲值的通道
讓我們編寫一個使用延遲字串值通道的通道生成器函式,等待每個接收的延遲值,但只有在下一個延遲值結束或通道關閉之前。這個例子將onReceiveOrNull和onAwait子句放在一起 select:
fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
var current = input.receive() // start with first received deferred value
while (isAct