回撥“地獄”和反應模式
瞭解更多有關基於反應流的方法以及如何避免回撥地獄的資訊。
更好地理解基於反應流的方法的有用性的方法之一是它如何簡化非阻塞 IO 呼叫。
本篇文章將簡要介紹進行同步遠端呼叫所涉及的程式碼型別。然後,我們將演示非阻塞 IO 中的分層如何高效使用資源(尤其是執行緒),引入了稱為回撥地獄帶來的複雜性以及基於反應流方法如何簡化程式設計模型。
1. 目標服務
客戶端呼叫表示城市詳細資訊的目標服務有兩個埠。當使用型別為——/cityids 的 URI 呼叫時,返回城市 id 列表,並且示例結果如下所示:
[ 1, 2, 3, 4, 5, 6, 7 ] 複製程式碼
一個埠返回給定其 ID 的城市的詳細資訊,例如,當使用 ID 為1——“/cities/1” 呼叫時:
{
"country": "USA",
"id": 1,
"name": "Portland",
"pop": 1600000
}
複製程式碼
客戶端的責任是獲取城市 ID 的列表,然後對於每個城市,根據 ID 獲取城市的詳細資訊並將其組合到城市列表中。
2. 同步呼叫
我正在使用 Spring Framework 的 RestTemplate 進行遠端呼叫。獲取 cityId 列表的 Kotlin 函式如下所示:
private fun getCityIds(): List<String> { val cityIdsEntity: ResponseEntity<List<String>> = restTemplate .exchange("http://localhost:$localServerPort/cityids", HttpMethod.GET, null, object : ParameterizedTypeReference<List<String>>() {}) return cityIdsEntity.body!! } 複製程式碼
獲取城市詳情:
private fun getCityForId(id: String): City {
return restTemplate.getForObject("http://localhost:$localServerPort/cities/$id", City::class.java)!!
}
複製程式碼
鑑於這兩個函式,它們很容易組合,以便於輕鬆返回城市列表 :
val cityIds: List<String> = getCityIds()
val cities: List<City> = cityIds
.stream()
.map<City> { cityId -> getCityForId(cityId) }
.collect(Collectors.toList())
cities.forEach { city -> LOGGER.info(city.toString()) }
複製程式碼
程式碼很容易理解;但是,涉及八個阻塞呼叫:
- 獲取 7 個城市 ID 的列表,然後獲取每個城市的詳細資訊
- 獲取 7 個城市的詳細資訊
每一個呼叫都將在不同的執行緒上。
3. 非阻塞 IO 回撥
我將使用 AsyncHttpClient 庫來進行非阻塞 IO 呼叫。
進行遠端呼叫時,AyncHttpClient
返回 ListenableFuture 型別。
val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient
.prepareGet("http://localhost:$localServerPort/cityids")
.execute()
複製程式碼
可以將回調附加到 ListenableFuture
以在可用時對響應進行操作。
responseListenableFuture.addListener(Runnable {
val response: Response = responseListenableFuture.get()
val responseBody: String = response.responseBody
val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody,
object : TypeReference<List<Long>>() {})
....
}
複製程式碼
鑑於 cityIds 的列表,我想獲得城市的詳細資訊,因此從響應中,我需要進行更多的遠端呼叫併為每個呼叫附加回調以獲取城市的詳細資訊:
val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient
.prepareGet("http://localhost:$localServerPort/cityids")
.execute()
responseListenableFuture.addListener(Runnable {
val response: Response = responseListenableFuture.get()
val responseBody: String = response.responseBody
val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody,
object : TypeReference<List<Long>>() {})
cityIds.stream().map { cityId ->
val cityListenableFuture = asyncHttpClient
.prepareGet("http://localhost:$localServerPort/cities/$cityId")
.execute()
cityListenableFuture.addListener(Runnable {
val cityDescResp = cityListenableFuture.get()
val cityDesc = cityDescResp.responseBody
val city = objectMapper.readValue(cityDesc, City::class.java)
LOGGER.info("Got city: $city")
}, executor)
}.collect(Collectors.toList())
}, executor)
複製程式碼
這是一段粗糙的程式碼;回撥中又包含一組回撥,很難推理和理解 - 因此它被稱為“回撥地獄”。
4. 在 Java CompletableFuture 中使用非阻塞 IO
通過將 Java 的 CompletableFuture 作為返回型別而不是 ListenableFuture 返回,可以稍微改進此程式碼。CompletableFuture
提供允許修改和返回型別的運算子。
例如,考慮獲取城市 ID 列表的功能:
private fun getCityIds(): CompletableFuture<List<Long>> {
return asyncHttpClient
.prepareGet("http://localhost:$localServerPort/cityids")
.execute()
.toCompletableFuture()
.thenApply { response ->
val s = response.responseBody
val l: List<Long> = objectMapper.readValue(s, object : TypeReference<List<Long>>() {})
l
}
}
複製程式碼
在這裡,我使用 thenApply
運算子將 CompletableFuture<Response>
轉換為 CompletableFuture<List<Long>>
。
同樣的,獲取城市詳情:
private fun getCityDetail(cityId: Long): CompletableFuture<City> {
return asyncHttpClient.prepareGet("http://localhost:$localServerPort/cities/$cityId")
.execute()
.toCompletableFuture()
.thenApply { response ->
val s = response.responseBody
LOGGER.info("Got {}", s)
val city = objectMaper.readValue(s, City::class.java)
city
}
}
複製程式碼
這是基於回撥的方法的改進。但是,在這個特定情況下,CompletableFuture
缺乏有用的運算子,例如,所有城市細節都需要放在一起:
val cityIdsFuture: CompletableFuture<List<Long>> = getCityIds()
val citiesCompletableFuture: CompletableFuture<List<City>> =
cityIdsFuture
.thenCompose { l ->
val citiesCompletable: List<CompletableFuture<City>> =
l.stream()
.map { cityId ->
getCityDetail(cityId)
}.collect(toList())
val citiesCompletableFutureOfList: CompletableFuture<List<City>> = CompletableFuture.allOf(*citiesCompletable.toTypedArray())
.thenApply { _: Void? ->
citiesCompletable
.stream()
.map { it.join() }
.collect(toList())
}
citiesCompletableFutureOfList
}
複製程式碼
使用了一個名為 CompletableFuture.allOf 的運算子,它返回一個“Void”型別,並且必須強制返回所需型別的 CompletableFuture<List<City>>
。
5. 使用 Reactor 專案
Project Reactor 是 Reactive Streams 規範的實現。它有兩種特殊型別可以返回 0/1 項的流和 0/n 項的流 - 前者是 Mono,後者是 Flux。
Project Reactor 提供了一組非常豐富的運算子,允許以各種方式轉換資料流。首先考慮返回城市 ID 列表的函式:
private fun getCityIds(): Flux<Long> {
return webClient.get()
.uri("/cityids")
.exchange()
.flatMapMany { response ->
LOGGER.info("Received cities..")
response.bodyToFlux<Long>()
}
}
複製程式碼
我正在使用 Spring 優秀的 WebClient 庫進行遠端呼叫並獲得 Project Reactor Mono <ClientResponse>
型別的響應,可以使用 flatMapMany
運算子將其修改為 Flux<Long>
型別。
根據城市 ID,沿著同樣的路線獲取城市的詳情:
private fun getCityDetail(cityId: Long?): Mono<City> {
return webClient.get()
.uri("/cities/{id}", cityId!!)
.exchange()
.flatMap { response ->
val city: Mono<City> = response.bodyToMono()
LOGGER.info("Received city..")
city
}
}
複製程式碼
在這裡,Project Reactor Mono<ClientResponse>
型別正在使用 flatMap
運算子轉換為 Mono<City>
型別。
以及從中獲取 cityIds,這是 City 的程式碼:
val cityIdsFlux: Flux<Long> = getCityIds()
val citiesFlux: Flux<City> = cityIdsFlux
.flatMap { this.getCityDetail(it) }
return citiesFlux
複製程式碼
這非常具有表現力 - 對比基於回撥的方法的混亂和基於 Reactive Streams 的方法的簡單性。
6. 結束語
在我看來,這是使用基於反應流的方法的最大原因之一,特別是 Project Reactor,用於涉及跨越非同步邊界的場景,例如在此例項中進行遠端呼叫。它清理了回撥和回撥的混亂,提供了一種使用豐富的運算子進行修改/轉換型別的自然方法。
本文使用的所有示例的工作版本的儲存庫都可以在 GitHub 上找到。
原文:dzone.com/articles/ca…
作者:Biju Kunjummen
譯者:Emma
作者:鍋外的大佬
連結:https://juejin.im/post/5d1579aee51d45772a49ad77
來源:掘金
著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處