1. 程式人生 > 其它 >kotlin中的協程三:Async/Await

kotlin中的協程三:Async/Await

目錄

Async/Await 模式

計算中最大的問題之一是能夠從非同步函式返回值。如果你呼叫的函式建立了一個不同的執行緒來執行,你不能將值返回給外部函式。這是一個程式限制,因為系統不知道何時返回,並且很難線上程之間進行橋接。但是有一種方法可以通過 async/await 模式來解決這個問題。
想法很簡單:圍繞你需要的值構建一個包裝器,然後呼叫一個提供該值的函式並將其傳遞給包裝器。準備好後,可以請求它。一旦你請求了這個值,就掛起你請求的函式,直到資料出現。這種模式非常像Java裡的Futures。

Java裡的Future

建立一個Future,必須宣告一個具有 return 語句的函式;否則,會得到一個編譯時錯誤。此外,可以隨時使用 isDone() 檢查Future的狀態。如果 isDone() 返回 true,就可以使用值了。
獲取Future返回值的方式是get(),這是一個阻塞的方法,該方法可能會長時間阻塞主執行緒。

private static ExecutorService executor =
 Executors.newSingleThreadExecutor();
public static Future<Integer> parse(String input) {
 return executor.submit(() -> {
 Thread.sleep(1000)
 return Integer.parseInt(input);
 });
}
public static void main(String...args) {
 Future<Integer> parser = parse("310");
 while(!parser.isDone()) {
 // waiting to parse }
 int parsedValue = parser.get();
}
上面的例子建立了一個future,它開始在executor建立的執行緒中執行任務。一旦對 isDone() 的呼叫返回 true,就知道它可以使用了。最後,通過呼叫 get(),從 Future 接收值,該值現在快取在物件本身中,並且可以重用。 如果需要在不同的執行緒中處理和生成值時,Future 非常有用。想實現並行,可以建立多個執行緒供你的future使用,同時執行多個任務。它們還允許擁有清晰的控制流邏輯,因為生成的值可以在順序程式碼中使用,並且不必依賴回撥或鏈式函式呼叫。但是,另一方面,它們的值總是以阻塞的方式接收的,因此有一個要渲染的使用者介面時,等待的代價可能是昂貴的。

使用Async/Await

kotlin中的async await和java的Future模式很像。kotlin中的async返回了Deferred類比java中的Future。deferred值只是包裝了可以使用的潛在物件。一旦你準備好接收物件,你必須等待它,有效地請求資料,這些資料可能存在也可能不存在。如果資料已經提供並交付,呼叫將變成一個簡單的 get();否則,程式碼將不得不掛起並等待資料到達包裝器。 很簡單,就好像正在建立一個 BlockingQueue 例項,它只能存放一個例項。而且,在任何時候,都可以嘗試獲取值或在等待程式碼時掛起執行code。關鍵區別在於實際上並沒有阻塞執行緒,而是暫停了程式碼。
該模式被稱為 async/await 是有原因的——因為完整的實現需要兩個函式呼叫——async() 來準備和包裝值,以及 await() 來請求使用值。

public fun <T> CoroutineScope.async(
 context: CoroutineContext = EmptyCoroutineContext,
 start: CoroutineStart = CoroutineStart.DEFAULT,
 block: suspend CoroutineScope.() -> T
): Deferred<T> {
 val newContext = newCoroutineContext(context)
 val coroutine = if (start.isLazy)
 LazyDeferredCoroutine(newContext, block) else
 DeferredCoroutine<T>(newContext, active = true)
 coroutine.start(start, coroutine, block)
 return coroutine
}
當呼叫 async 時,可以傳遞一個 CoroutineContext 來將它繫結到某個 Job 或 Dispatcher。也可以使用 CoroutineStart 引數以不同的模式啟動它。但是,最重要的是,必須傳入一個 lambda 塊,該塊可以訪問呼叫該函式的 CoroutineScope,並且需要返回一個它將嘗試儲存在 Deferred 中的值。它通過建立一個新的 DeferredCoroutine 來做到這一點,它以lambda 開頭,並返回前面提到的協程。 這個函式呼叫基本上將值包裝在一個協程中,該協程實現了稍後將呼叫 await() 的 Deferred 介面。一旦呼叫 await(),協程將嘗試為你生成值或暫停直到它出現。
/**
* Awaits for completion of this value without blocking a thread
* and resumes when deferred computation is complete,
* returning the resulting value or throwing the
* corresponding exception if the deferred was cancelled.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled
* or completed while this suspending function is waiting,
* this function immediately resumes with
[CancellationException].
*
* This function can be used in [select] invocation
* with [onAwait] clause.
* Use [isCompleted] to check for completion of this
* deferred value without waiting.
*/
public suspend fun await(): T

函式本身極其簡單.可以掛起整個函式或協程,而不是實際阻塞執行緒,並在值準備好後去獲取。

fun main() {
 val userId = 992

 getUserByIdFromNetwork(userId) { user ->
 println(user)
 }
}
private fun getUserByIdFromNetwork(userId: Int, onUserReady:
(User) -> Unit) {
 Thread.sleep(3000)
 onUserReady(User(userId, "Filip", "Babic"))
}
data class User(val id: Int, val name: String, val lastName:
String)
這段程式碼嘗試從模擬的網路呼叫中獲取使用者,並將其打印出來。但是,問題在於它正在呼叫 Thread.sleep(),它會暫停執行三秒鐘。就像有一個阻塞的網路呼叫一樣,必須等待資料返回,然後才能使用它。另外它還使用回撥在資料準備好後將資料傳回給呼叫者。接下來,將對其進行重構
private fun getUserByIdFromNetwork(userId: Int) =
GlobalScope.async {
 Thread.sleep(3000)
 User(userId, "Filip", "Babic")
}
fun main() {
 val userId = 992
 val userData = getUserByIdFromNetwork(userId)
 println(userData.await())
}

<font size = 3 如>前所述,它建立一個協程並使用 Deferred 值對其進行包裝。通過該介面,您可以訪問該值,因為該介面公開了 await()。一旦你呼叫了 await(),掛起了函式呼叫,有效地避免了阻塞執行緒。然後等待 lambda 塊執行以使用內部儲存的值。當值準備好時,函式停止掛起,程式碼繼續正常執行

Deferring values

每個 async() 塊都返回一個 Deferred。它是驅動 Coroutines API 的核心機制,理解它的工作原理非常重要。當呼叫 async() 時,該函式返回一個 Deferred。它通過建立 DeferredCoroutine 或 LazyDeferredCoroutine 來實現。這樣的協程實現了 Continuation 介面,允許攔截執行流,並將值一直傳遞到呼叫點,就像suspendCoroutine()一樣。這類似於之前看到的Future模式的工作方式。
一旦協程建立完成,除非它的 CoroutineStart 是 CoroutineStart.LAZY,否則它會立即啟動。程式碼將開始在使用Dispatchers宣告的執行緒中執行。一旦程式碼完成執行併產生一個值,它將被儲存在內部。如果在任何時間點呼叫 await(),都將是一個掛起的呼叫,這將建立一個新的continuation和執行流,等待該值可供使用。如果它沒有準備好,該函式將不得不等待。如果已經提供了該值,就能立即獲得它。
您還可以檢查延遲值的狀態,因為它還實現了 Job 介面。可以檢查諸如 isActive 和 isCompleted 之類的標誌,以瞭解其當前的生命週期狀態。還有一些漂亮的函式,如果Job被取消,可以使用它們來接收值或異常。這些函式是 getCompleted(),它返回值 - 如果延遲值被取消,則返回異常 - 和 getCompletionExceptionOrNull(),如果Job未取消,則返回 CancellationException 或 null。使用這些函式,還可以檢查deferred values的完成狀態的詳細資訊。
所以解釋什麼是 Deferred 的一個好方法是,它是一個有結果的 Job。該Job可以並行執行,它可能會或可能不會產生結果值,並且可以取消並與其他Job連線。這提供了一個強大的 API,可以隨心所欲。可以利用deferred values的一種方法是將它們組合在一起以呼叫具有多個引數的函式。

Combining multiple deferred values

能夠建立在後臺構建但可以在一個函式呼叫中在主執行緒上訪問的deferred values是一件了不起的事情。但非同步的真正威力在於能夠將兩個或多個延遲值組合到一個函式呼叫中。讓我們看看如何做到這一點。

到目前為止,已經使用了一個模擬網路請求的示例,但現在是擴充套件該示例的時候了。示例中的 users.txt是一個包含 13,000 行文字的檔案。大多數行包含構建使用者所需的資訊——ID、姓名和姓氏。有些行是空的,有些沒有全部三項。這背後的想法是讀取整個檔案,解析和分割每一行,並從中建立使用者。之後,將使用使用者列表和從模擬網路呼叫中獲得的使用者來檢視該使用者是否儲存在檔案中。

private fun readUsersFromFile(filePath: String) =
            GlobalScope.async {
                println("Reading the file of users")
                delay(1000)
                File(filePath)
                        .readLines()
                        .asSequence()
                        .filter { it.isNotEmpty() }
                        .map {
                            val data = it.split(" ") // [id, name, lastName]
                            if (data.size == 3) data else emptyList()
                        }
                        .filter {
                            it.isNotEmpty()
                        }
                        .map {
                            val userId = it[0].toInt()
                            val name = it[1]
                            val lastName = it[2]
                            User(userId, name, lastName)
                        }
                        .toList()
            }

    private fun checkUserExists(user: User, users: List<User>):
            Boolean {
        return user in users
    }
	fun main() {
 val userId = 992
 GlobalScope.launch {
 val userDeferred = getUserByIdFromNetwork(userId)
 val usersFromFileDeferred = readUsersFromFile("users.txt")
 println("Finding user")
 val userStoredInFile = checkUserExists(
 userDeferred.await(), usersFromFileDeferred.await()
 )
 if (userStoredInFile) {
 println("Found user in file!")
 }
 }
 Thread.sleep(5000)
}
傳遞給函式的引數是之前準備的兩個deferred values的 await() 結果,因此,有一行程式碼掛起函式,等待這兩個值。這是使用多個deferred values的正確方法。實際上是在程式中建立了一個掛起點,它暫停了兩個函式。您可以做的另一件事是使 checkUserExists() 可掛起,然後從內部等待,但在將值進一步傳遞給其他函式之前等待值更方便。 大約三秒鐘後,應該會看到 Found user in file!。這是因為圍繞 Deferred 建立的協程不是惰性的,它們會立即啟動——`但只有在等待值時才會掛起程式碼`。呼叫 await() 後,將接收到的值傳遞給 checkUserExists(),然後接收到使用者存在於檔案中的輸出。 使用這種方法,可以組合任意數量的deferred values,進而實現智慧和簡單的並行性,這不是建立在資料流回撥的概念之上的。考慮到這一點,程式碼非常容易理解,因為它類似於順序、同步的程式碼,即使它在幕後可能是完全非同步的。這就是協程和 async/await 模式的真正威力。 ## 重構 上面的程式碼目的是解釋如何並行初始化多個值,並在它們準備好後將它們傳遞給函式。另一方面,問題是如果出現問題或非同步塊沒有正確構建,將阻塞一個執行緒並可能凍結整個系統。例如,如果非同步塊包含一個 while 迴圈,並且條件沒有中斷策略,則函式將永遠不會返回該值。此外,如果構建的函式執行繁重的操作並需要大量時間才能完成,應該能夠在執行過程中的任何時候取消它們。否則,可能會取消其父Job ,但不會取消Job本身。然後會得到仍在執行並耗盡資源的程式碼,即使它的 Job 已被取消。
private suspend fun getUserByIdFromNetwork(userId: Int) =
GlobalScope.async {
 println("Retrieving user from network")
 delay(3000)
 println("Still in the coroutine")
 User(userId, "Filip", "Babic") // we simulate the network call
}
fun main() {
 GlobalScope.launch {
 val deferredUser = getUserByIdFromNetwork(130)
 println(deferredUser.await())
 }
}
如果你呼叫這個函式,這個簡單的程式碼片段將在三秒後返回一個使用者。由於它是一個掛起函式,因此需要將其包裝在協程構建器中.但是如果你在它的程式碼開始執行之後從 launch() 中取消 Job 會發生什麼?即使父Job被取消,getUserByIdFromNetwork() 仍將執行三秒鐘並返回一個值。這會導致計算時間和資源的浪費,而這些時間和資源最好花在其他地方。或者至少在最初的版本中是這樣。