【譯】理解Rust中的Futures(二)
原文標題:Understanding Futures in Rust -- Part 2
原文連結:https://www.viget.com/articles/understanding-futures-is-rust-part-2/
公眾號: Rust 碎碎念
翻譯 by: Praying
背景
如果你還沒有看前面的內容,可以在這裡[1]檢視(譯註:已有譯文,可在公眾號檢視)。
在第一部分,我們介紹了 Future trait,瞭解了 future 是如何被建立和執行的,並且開始知道它們如何能被連結到一起。
上次內容的程式碼可以在這個 playground 連結[2]檢視,並且本文中所有示例程式碼將會以這段程式碼為基礎。
注意:所有的程式碼示例都有對應的 playground 連結,其中一些用於解釋說明但無法編譯的程式碼會有相應的標記。
目標
如果你熟悉 JavaScript 中的 promise 並且閱讀了最新的部落格,你可能會對先前文章中提到的組合子(then
、catch
和finally
)感到困惑。
你將會在本文章找到與它們對等的東西,並且在最後,下面這段程式碼將能夠編譯。你將會理解使得 future 能夠運作的型別,trait 和底層概念。
//Thisdoesnotcompile,yet
fnmain(){
letmy_future=future::ready(1)
.map(|x|x+3)
.map(Ok)
.map_err(|e:()|format! ("Error:{:?}",e))
.and_then(|x|future::ready(Ok(x-3)))
.then(|res|{
future::ready(matchres{
Ok(val)=>Ok(val+3),
err=>err,
})
});
letval=block_on(my_future);
assert_eq!(val,Ok(4));
}
工具函式
首先,我們需要一些工具函式,future::ready
和block_on
。這些函式能夠讓我們很容易地建立和執行 future 直到它們完成,這些函式雖然有用,但是在生產環境的程式碼中並不常見。
在開始之前,我們先把我們的Future
trait 和Context
modtask{
usecrate::NOTIFY;
pubstructContext<'a>{
waker:&'aWaker,
}
impl<'a>Context<'a>{
pubfnfrom_waker(waker:&'aWaker)->Self{
Context{waker}
}
pubfnwaker(&self)->&'aWaker{
&self.waker
}
}
pubstructWaker;
implWaker{
pubfnwake(&self){
NOTIFY.with(|f|*f.borrow_mut()=true)
}
}
}
usecrate::task::*;
modfuture{
usecrate::task::*;
pubenumPoll<T>{
Ready(T),
Pending,
}
pubtraitFuture{
typeOutput;
fnpoll(&mutself,cx:&Context)->Poll<Self::Output>;
}
}
usecrate::future::*;
Playground 連結[3]
這裡唯一需要注意的就是,只有將模組,型別和函式公開,才能在程式碼中使用它們。這可以通過pub
關鍵字來完成。
工具函式實現
Future::Ready
future::ready
建立了一個 future,該 future 帶有傳入值並且是立即就緒(ready)的。當你有一個已經不是 future 的值的時候,這個函式可以用於開啟一個 future 鏈,就像前一個示例那樣。
modfuture{
//...
pubstructReady<T>(Option<T>);
impl<T>FutureforReady<T>{
typeOutput=T;
fnpoll(&mutself,_:&Context)->Poll<Self::Output>{
Poll::Ready(self.0.take().unwrap())
}
}
pubfnready<T>(val:T)->Ready<T>{
Ready(Some(val))
}
}
fnmain(){
letmy_future=future::ready(1);
println!("Output:{}",run(my_future));
}
Playground 連結[4]
我們建立了一個型別為Ready<T>
的泛型結構體,該結構體包裝了一個Option
。這裡我們使用Option
列舉以保證 poll 函式只被呼叫一次。在 executor 的實現中,在返回一個Poll::Ready
之後呼叫 poll 將會報錯。
BLOCK_ON
為了我們的目標,我們把我們的 run 函式重新命名為block_on
。在future-preview
這個 crate 中,該函式使用內部的LocalPool
來執行一個 future 直到完成,同時會阻塞當前執行緒。我們的函式也做了相似的事情。
fnblock_on<F>(mutf:F)->F::Output
where
F:Future,
{
NOTIFY.with(|n|loop{
if*n.borrow(){
*n.borrow_mut()=false;
letctx=Context::from_waker(&Waker);
ifletPoll::Ready(val)=f.poll(&ctx){
returnval;
}
}
})
}
fnmain(){
letmy_future=future::ready(1);
println!("Output:{}",block_on(my_future));
}
Playground 連結[5]
組合子(Combinators)
首先,讓我們從一些能夠讓你直接作用於另一個 Future 的Output
值的一些組合子開始。在本文中,我們使用非正式的但是比較流行的組合子定義,即能夠允許你對某種型別執行操作,並與其他型別結合起來的函式。例如,一個巢狀的 future 可以由一個組合子函式函式建立,它可以有一個複雜的型別Future< Output = Future < Output = i32>>
。這可以被稱為一個 future,該 future 的輸出(Output)是另一個 future,新的 future 的輸出是 i32 型別。這樣的組合子中,最簡單的一個就是map
。
Map
如果你熟悉Result
或者Option
型別的map
函式,那麼對它應該不陌生。map 組合子持有一個函式並將其應用到 future 的Output
值上,返回一個新的 future,這個新 future 把函式的結果(Result)作為它的Output
。Future 中的 map 組合子甚至比Result
或者Option
中更簡單,因為不需要考慮 failure 的情況。map 就是簡單的Future->Future
。
下面是函式簽名:
//doesnotcompile
fnmap<U,F>(self:Sized,f:F)->Map<Self,F>
where
F:FnOnce(Self::Output)->U,
Self:Sized,
map
是一個泛型函式,它接收一個閉包,返回一個實現了 Future 的Map
結構體。不是每當我們在值上進行連結都需要實現Future
trait,正如我們在最後一部分做的那樣,我們可以使用這些函式來為我們完成這些工作。
讓我們來分析一下:
Map<Self, F>
聲明瞭 map 函式的(返回)型別,包括當前的 future,以及傳入函式的 future。where
是一個能夠讓我們新增型別約束的關鍵字。對於F
型別引數,我們可以在內部定義約束map<U, F: FnOnce(Self::Output) -> U
,但是使用 where 語句可讀性會更好。FnOnce(Self::Output) -> U
是一個函式的型別定義,該函式接收當前型別的Output
並返回任意型別U
。FnOnce
是函式 trait 中的一個,其他還包括FnMut
和Fn
。FnOnce
是用起來最簡單的,因為編譯器可以保證這個函式只被呼叫一次。它使用環境中用到的值並獲取其所有權。Fn
和FnMut
分別以不可變和可變的方式借用環境中值的引用。所有的閉包都實現了FnOnce
trait,並且其中一些沒有移動值的閉包還實現了FnMut
和Fn
trait。這是 Rust 做的最酷的事情之一,允許對閉包和第一類函式引數進行真正有表達力的使用。Rust book 中的相關內容[6]值得一讀。Self: Sized
是一個約束,允許map
只能被Sized
的 trait 實現者呼叫。你不必考慮這個問題,但是確實有些型別不是Sized
。例如,[i32]
是一個不確定大小的陣列。因為我們不知道它多長。如果我們想要為它實現我們的Future
trait,那麼我們就不能對它呼叫map
。
大多數組合子都遵循這個模式,因此接下來的文章我們就不需要分析的這麼仔細了。
下面是一個map
的完整實現,它的Map
型別以及它對Future
的實現
modfuture{
traitFuture{
//...
fnmap<U,F>(self,f:F)->Map<Self,F>
where
F:FnOnce(Self::Output)->U,
Self:Sized,
{
Map{
future:self,
f:Some(f),
}
}
}
//...
pubstructMap<Fut,F>{
future:Fut,
f:Option<F>,
}
impl<Fut,F,T>FutureforMap<Fut,F>
where
Fut:Future,
F:FnOnce(Fut::Output)->T,
{
typeOutput=T;
fnpoll(&mutself,cx:&Context)->Poll<T>{
matchself.future.poll(cx){
Poll::Ready(val)=>{
letf=self.f.take().unwrap();
Poll::Ready(f(val))
}
Poll::Pending=>Poll::Pending,
}
}
}
}
fnmain(){
letmy_future=future::ready(1).map(|val|val+1);
println!("Output:{}",block_on(my_future));
}
Playground 連結[7]
從高層次來講,當我們呼叫一個 future 上的map
時,我們構造了一個Map
型別,該型別持有當前 future 的引用以及我們傳入的閉包。Map
物件自身也是一個 Future。當它被輪詢時,它依次輪詢底層的 future。當底層的 future 就緒後,它獲取那個 future 的Output
的值並且把它傳入閉包,對Poll::Ready
中的閉包返回的值進行包裝(wrapping)並且把新值向上傳遞。
如果你閱讀了最新的部落格,你對在這裡看到的東西應該感到很熟悉,但是在我們繼續之前,我會快速地講解作為一個複習。
pub struct Map<Fut, F>
是一個關於 future——Fut
和函式F
的泛型。f: Option<F>
是一個包裝了閉包了Option
型別。這裡是個小技巧,以保證閉包只被呼叫一次。當你獲取一個Option
的值,它會用None
替換內部的值並且返回裡面包含的值。如果在返回一個Poll::Ready
之後被輪詢,這個函式會 panic。在實際中,future 的 executor 不會允許這種情況發生。type Output = T;
定義了 map future 的輸出和我們的閉包的返回值是將會是相同的。Poll::Read(f(val))
返回帶有閉包返回結果的就緒(ready)狀態。Poll::Pending => Poll::Pending
如果底層的 future 返回 pending,繼續傳遞。future::ready(1).map(|val| val + 1);
這對就緒(ready)future 的輸出進行了 map,並對其加 1。它返回了一個 map future,其中帶有對原先的 future 的一個引用。map future 在執行期間輪詢原先的 future 是否就緒(ready)。這和我們的AddOneFuture
做的是相同的事情。
這真的很酷,主要有以下幾個原因。首先,你不必對每一個你想要進行的計算都實現一個新的 future,它們可以被包裝(wrap)進組合子。事實上,除非你正在實現你自己的非同步操作,否則你可能從來都不需要自己去實現Future
trait。
Then
現在我們有了map
,我們可以把任何我們想要的計算連結起來,對麼?答案是對的,但是對此還有一個相當大的警告。
想象一下,當你有一些函式,這些函式返回你想要連結起來的 future。對於這個例子,我們可以想象,它們是下面的 api 呼叫,這些呼叫返回包裝(wrap)在 future 中的結果,get_user
和get_files_for_user
。
//doesnotcompile
fnmain(){
letfiles_future=get_user(1).map(|user|get_files_for_user(user));
println!("UserFiles:{}",block_on(files_future));
}
這段程式碼無法編譯,但是你可以想象你在這裡構建的型別,看起來應該像這樣:Future<Output = Future<Output= FileList>>
。這在使用Result
和Option
型別的時候也是一個常見問題。使用map
函式經常會導致巢狀的輸出和對這些巢狀的繁瑣處理。在這種情況下,你不得不去跟蹤到底嵌套了多少層並且對每一個巢狀的 future 都呼叫block_on
。
幸運地是,Result
,Option
有一個被稱為and_then
的解決方案。Option
的and_then
通過對T
應用一個函式來對映(map)Some(T) -> Some(U)
,並且返回閉包所返回的Option
。對於 future,它是通過一個稱為then
的函式來實現的,該函式看起來很像對映(map),但是這個閉包應該它自己的 future。在一些語言中,這被稱為flatmap
。這裡值得注意的是,傳遞給then
的閉包返回的值必須是實現了Future
,否則你將會得到一個編譯器錯誤。
這裡是我們的對於then
,Then
結構體和它的對Future
trait 的實現。其中的大部分內容和我們在 map 中做的很像。
modfuture{
traitFuture{
//...
fnthen<Fut,F>(self,f:F)->Then<Self,F>
where
F:FnOnce(Self::Output)->Fut,
Fut:Future,
Self:Sized,
{
Then{
future:self,
f:Some(f),
}
}
}
//...
pubstructThen<Fut,F>{
future:Fut,
f:Option<F>,
}
impl<Fut,NextFut,F>FutureforThen<Fut,F>
where
Fut:Future,
NextFut:Future,
F:FnOnce(Fut::Output)->NextFut,
{
typeOutput=NextFut::Output;
fnpoll(&mutself,cx:&Context)->Poll<Self::Output>{
matchself.future.poll(cx){
Poll::Ready(val)=>{
letf=self.f.take().unwrap();
f(val).poll(cx)
}
Poll::Pending=>Poll::Pending,
}
}
}
}
fnmain(){
letmy_future=future::ready(1)
.map(|val|val+1)
.then(|val|future::ready(val+1));
println!("Output:{}",block_on(my_future));
}
Playground 連結[8]
這裡面沒見過的程式碼可能是f(val).poll(cx)
。它呼叫了帶有先前 future 的閉包並且直接返回給你poll
的值。
聰明的你可能會意識到,我們的Then::poll
函式可能會 panic。如果第一個 future 返回就緒(ready)但是第二個 future 返回Poll::Pending
,接著let f = self.f.take().unwrap();
這行程式碼就會在下次被輪詢(poll)的時候 panic 並退出程式。在future-preview
中,這種情況會通過一個稱為Chain[9]的型別來處理。Chain 通過 unsafe 程式碼塊來實現,並且使用了新型別——Pin
。這些內容超出了本文的範圍。目前來講,我們可以假定任何通過then
閉包返回的 future 都絕不會返回Poll::Pending
。總體來講,這不是個安全的假設。
Result 組合子
在 futures-rs 庫的 0.1 版本中,Future
trait 和Result
型別緊密關聯。Future
trait 的定義如下:
//doesnotcompile
traitFuture{
typeItem;
typeError;
fnpoll(self)->Poll<Self::Item,Self::Error>;
}
Poll
型別裡定義了成功狀態、失敗狀態和未就緒狀態。這意味著像map
這種函式只有當 Poll 是就緒並且不是錯誤的情況下才能執行。儘管這會產生一些困擾,但是它在連結組合子並且根據成功或失敗狀態做決定的時候,會產生一些非常好的人體工程學(ergonomics )。
這與std::future
的實現方式有所不同。現在 future 要麼是就緒或者是未就緒,對於成功或失敗語義是不可知的。它們可以包含任何值,包括一個Result
。為了得到便利的組合子,比如像map_err
能夠讓你只改變一個巢狀的 Result 中的錯誤型別,或者想and_then
這樣,允許你只改變巢狀 Result 中的值型別,我們需要實現一個新的 trait。下面是TryFuture
的定義:
modfuture{
//...
pubtraitTryFuture{
typeOk;
typeError;
fntry_poll(self,cx:&mutContext)->Poll<Result<Self::Ok,Self::Error>>;
}
impl<F,T,E>TryFutureforF
where
F:Future<Output=Result<T,E>>,
{
typeOk=T;
typeError=E;
fntry_poll(&mutself,cx:&Context)->Poll<F::Output>{
self.poll(cx)
}
}
}
Playground 連結[10]
TryFuture
是一個 trait,我們可以為任意的型別<F, T, E>
實現這個 trait,其中F
實現了Future
trait,它的Output
型別是Result<T,E>
。它只有一個實現者。那個實現者定義了一個try_poll
函式,該函式與Future
trait 上的poll
有相同的簽名,它只是簡單地呼叫了poll
方法。
這意味著任何一個擁有 Result 的Output
型別的 future 也能夠訪問它的成功/錯誤(success/error)狀態。這也使得我們能夠定義一些非常方便的組合子來處理這些內部 Result 型別,而不必在一個map
或and_then
組合子內顯示地匹配Ok
和Err
型別。下面是一些能夠闡述這個概念的實現。
AndThen
讓我們回顧之前想象到的 API 函式。假定它們現在處於會發生網路分割槽和伺服器中斷的現實世界中,不會總是能返回一個值。這些 API 方法實際上會返回一個嵌有 result 的 future 以表明它已經完成,並且是要麼是成功完成,要麼是帶有錯誤的完成。我們需要去處理這些結果,下面是我們可能是根據現有工具處理它的方式。
//doesnotcompile
fnmain(){
letfiles_future=get_user(1).then(|result|{
matchresult{
Ok(user)=>get_files_for_user(user),
Err(err)=>future::ready(Err(err)),
}
});
matchblock_on(files_future){
Ok(files)=>println!("UserFiles:{}",files),
Err(err)=>println!("Therewasanerror:{}",err),:w
};
}
情況還不算太壞,但是假定你想要連結更多的 future,事情很快就會變得一團糟。幸運的是,我們可以定義一個組合子——and_then
,該組合子將會把型別Future<Output = Result<T, E>>
對映到Future<Output = Result<U, E>>
,其中我們把T
變為了U
。
下面是我們定義它的方式:
modfuture{
pubtraitTryFuture{
//...
fnand_then<Fut,F>(self,f:F)->AndThen<Self,F>
where
F:FnOnce(Self::Ok)->Fut,
Fut:Future,
Self:Sized,
{
AndThen{
future:self,
f:Some(f),
}
}
}
//...
pubstructAndThen<Fut,F>{
future:Fut,
f:Option<F>,
}
impl<Fut,NextFut,F>FutureforAndThen<Fut,F>
where
Fut:TryFuture,
NextFut:TryFuture<Error=Fut::Error>,
F:FnOnce(Fut::Ok)->NextFut,
{
typeOutput=Result<NextFut::Ok,Fut::Error>;
fnpoll(&mutself,cx:&Context)->Poll<Self::Output>{
matchself.future.try_poll(cx){
Poll::Ready(Ok(val))=>{
letf=self.f.take().unwrap();
f(val).try_poll(cx)
}
Poll::Ready(Err(err))=>Poll::Ready(Err(err)),
Poll::Pending=>Poll::Pending,
}
}
}
}
fnmain(){
letmy_future=future::ready(1)
.map(|val|val+1)
.then(|val|future::ready(val+1))
.map(Ok::<i32,()>)
.and_then(|val|future::ready(Ok(val+1)));
println!("Output:{:?}",block_on(my_future));
}
Playground 連結[11]
你對此應該較為熟悉。事實上,這和then
組合子的實現基本一致。只有一些關鍵的區別需要注意:
函式定義在 TryFuture trait 中
type Output = Result<NextFut::Ok, Fut::Error>;
表明 AndThen future 的輸出擁有新的 future 的值型別,以及在它之前的 future 的錯誤型別。換句話說,如果先前的 future 的輸出包含一個錯誤型別,那麼這個閉包將不會被執行。我們呼叫的是
try_poll
而不是poll
。
值得注意的是,當你像這樣來連結組合子的時候,它們的型別前面可能會變得很長且在編譯錯誤資訊中難以閱讀。and_then
函式要求 future 呼叫時的錯誤型別和由閉包返回的型別必須是相同的。
MapErr
回到我們的想象的 api 呼叫。假定呼叫的 api 都返回帶有同一類錯誤的 future,但是你需要在呼叫之間進行額外的步驟。假定你必須解析第一個 api 結果然後把它傳遞給第二個。
//無法編譯
fnmain(){
letfiles_future=get_user(1)
.and_then(|user_string|parse::<User>())
.and_then(|user|get_files_for_user(user));
matchblock_on(files_future){
Ok(files)=>println!("UserFiles:{}",files),
Err(err)=>println!("Therewasanerror:{}",err),:w
};
}
這看起來很好,但是無法編譯,並且會有個晦澀的錯誤資訊說它期望得到像ApiError
的東西但是卻找到了一個ParseError
。你可以在解析返回的Result
上使用過map_err
組合子,但是對於 future 應該如何處理呢?如果我們為 TryFuture 實現一個map_err
,那麼我們可以重寫成下面這樣:
//無法編譯
fnmain(){
letfiles_future=get_user(1)
.map_err(|e|format!("ApiError:{}",e))
.and_then(|user_string|parse::<User>())
.map_err(|e|format!("ParseError:{}",e))
.and_then(|user|get_files_for_user(user))
.map_err(|e|format!("ApiError:{}",e));
matchblock_on(files_future){
Ok(files)=>println!("UserFiles:{}",files),
Err(err)=>println!("Therewasanerror:{}",err),:w
};
}
如果這讓你看著比較混亂,請繼續關注本系列的第三部分,我將談談如何處理這個問題和你可能會在使用 future 時遇到的其他問題。
下面是我們實現map_err
的方式
modfuture{
pubtraitTryFuture{
//...
fnmap_err<E,F>(self,f:F)->MapErr<Self,F>
where
F:FnOnce(Self::Error)->E,
Self:Sized,
{
MapErr{
future:self,
f:Some(f),
}
}
}
//...
pubstructMapErr<Fut,F>{
future:Fut,
f:Option<F>,
}
impl<Fut,F,E>FutureforMapErr<Fut,F>
where
Fut:TryFuture,
F:FnOnce(Fut::Error)->E,
{
typeOutput=Result<Fut::Ok,E>;
fnpoll(&mutself,cx:&Context)->Poll<Self::Output>{
matchself.future.try_poll(cx){
Poll::Ready(result)=>{
letf=self.f.take().unwrap();
Poll::Ready(result.map_err(f))
}
Poll::Pending=>Poll::Pending,
}
}
}
}
fnmain(){
letmy_future=future::ready(1)
.map(|val|val+1)
.then(|val|future::ready(val+1))
.map(Ok)
.and_then(|val|future::ready(Ok(val+1)))
.map_err(|_:()|5);
println!("Output:{:?}",block_on(my_future));
}
Playground 連結[12]
唯一比較陌生的地方是Poll::Ready(result.map_err(f))
。在這段程式碼裡,我們傳遞我們的閉包到Result
型別的map_err
函式裡。
包裝 (Wrap Up)
現在,文章開頭的程式碼可以運行了!比較酷的是這些全都是我們自己實現的。還有很多其他用途的組合子,但是它們幾乎都是相同的方式構建的。讀者可以自己練習一下,試試實現一個map_ok
組合子,行為類似於TryFuture
上的map_err
但是適用於成功的結果。
Playground 連結[13]
概要重述(Recap)
Rust 中的 Future 之所以如此強大,是因為有一套可以用於連結計算和非同步呼叫的組合子。
我們也學習了 Rust 強大的函式指標 trait,
FnOnce
,FnMut
和Fn
。我們已經瞭解瞭如何使用嵌入在 future 中的 Result 型別。
接下來
在第三部分中,我們將會介紹使錯誤處理沒有那麼痛苦的方式,當你有很多分支時,如何處理返回的 future,以及我們將深入到 async/await 這個令人激動的世界。
參考資料
這裡: https://www.viget.com/articles/understanding-futures-in-rust-part-1/
[2]這個playground連結: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=c354bc3ffaf4cbb5502e839f96459023
[3]Playground 連結: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=6cebb88919bd65411178ce8019a3aa06
[4]Playground 連結: https://www.viget.com/articles/understanding-futures-is-rust-part-2/
[5]Playground 連結: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=9e7fca1f3c6f2f5f91b25622db71635f
[6]Rust book中的相關內容: https://doc.rust-lang.org/book/ch13-01-closures.html
[7]Playground 連結: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=9c427527c64b4dd5238c508de1d4151a
[8]Playground 連結: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=d86c1223ed4318dcbfa3539ca9a021f2
[9]Chain: https://docs.rs/futures-preview/0.3.0-alpha.17/futures/stream/trait.StreamExt.html#method.chain
[10]Playground 連結: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=78daa6a5e60df17d8334199c43fe1e36
[11]Playground 連結: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=71fe0962974657f6b9be25510a652b3d
[12]Playground 連結: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=f9a6cc9cddaac1a43a85bc24db436964
[13]Playground 連結: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=92a88fffb74ad350a4db1970b646c41f