C++非同步呼叫利器future/promise實現原理
前言
在非同步程式設計中,各種回撥將讓人眼花繚亂,程式碼分散,維護起來十分困難。boost和C++11 的 future/promise 提供了一個很好的解決方案,使得程式碼更加漂亮、易維護。
在工作中,我也用過幾次future/promise,但是還是十分生疏,所以決定學習下它的原理,用起來才更加順暢。
查了很多資料,發現很多語言都有這個機制,但是關於C++的promise的資料卻很少,只有一些使用的教程,而沒有找到原理方面的。
“原始碼之前,了無祕密。”
所以還是決定從原始碼入手學習!
本文針對的原始碼是借鑑boost實現的版本,由於copyright的原因,就不貼出完整的原始碼了,需要學習的朋友可以參見boost的實現。
關於future/promise的簡介,參見我之前寫的一篇博文:
一、基於Future/Promise的非同步同步化程式設計依賴的元件
- bind和callback,類似與boost庫的bind和function;
- shared_ptr、scoped_ptr、tuple、exception,參考boost庫中的shared_ptr、scoped_ptr、tuple、exception實現;
- Future和Promise,借鑑boost庫的future設計思想;
- when_all,通過Future、Promise、tuple來實現,針對非同步並行的同步化。
二、Function和Bind的用法
參見我之前寫的一篇博文:
下面提到的bind類似於boost的bind,而Callback類似於Function,一般跟bind搭配使用。
三、shared_ptr、scoped_ptr、tuple
shared_ptr:引用計數。
scoped_ptr:不可轉移所有權。
Tuple:很多的時候我們需要為函式返回多個值,我們可以使用class或struct來封裝要返回的多個值,然後返回封裝struct或class,但是使用這種方法的弊端就是增加的程式的程式碼量,最好是能通過一種匿名的struct或class來解決這個問題。Boost::tuple就為我們提供了一種類似於匿名struct的方法為我們解決函式的多個返回值的問題。既增強了程式碼的可讀性又不增加程式碼量。其實std::pair就是boost::tuple的2個引數的特例,對boost::tuple你可以繫結更多的引數,或者你可以迭代實現無限多引數的情況。
四、EnableSharedFromThis
使用boost庫時,經常會看到如下的類:
class A:public enable_share_from_this<A>
在什麼情況下要使類A繼承enable_share_from_this?
使用場合 :當類A被share_ptr管理,且在類A的成員函式裡需要把當前類物件作為引數傳給其他函式時,就需要傳遞一個指向自身的share_ptr。
我們就使類A繼承enable_share_from_this,然後通過其成員函式 share_from_this()返回當指向自身的share_ptr。
以上有2個疑惑:
1.把當前類物件作為引數傳給其他函式時,為什麼要傳遞share_ptr呢?直接傳遞this指標不可以嗎?
一個裸指標傳遞給呼叫者,誰也不知道呼叫者會幹什麼?假如呼叫者delete了該物件,而share_tr此時還指向該物件。
2.這樣傳遞share_ptr可以嗎?share_ptr< this >
這樣會造成2個非共享的share_ptr指向一個物件,最後造成2次析構該物件。
boost官方文件中一個非常典型的例子:
部分程式碼:
1 class tcp_connection
2 : public boost::enable_shared_from_this<tcp_connection>
3 {
4 public:
5 typedef boost::shared_ptr<tcp_connection> pointer;
6
7 static pointer create(boost::asio::io_service& io_service)
8 {
9 return pointer(new tcp_connection(io_service));
10 }
11
12 tcp::socket& socket()
13 {
14 return socket_;
15 }
16
17 void start()
18 {
19 message_ = make_daytime_string();
20
21 boost::asio::async_write(socket_, boost::asio::buffer(message_),
22 boost::bind(&tcp_connection::handle_write, shared_from_this(),
23 boost::asio::placeholders::error,
24 boost::asio::placeholders::bytes_transferred));
25 }
26
27 private:
28 tcp_connection(boost::asio::io_service& io_service)
29 : socket_(io_service)
30 {
31 }
32
33 void handle_write(const boost::system::error_code& /*error*/,
34 size_t /*bytes_transferred*/)
35 {
36 }
37
38 tcp::socket socket_;
39 std::string message_;
40 };
類tcp_connection繼承enable_share_from_this,在22行裡,它的成員函式start(),通過share_from_this返回指向自身的share_ptr。
五、Future和Promise
5.1 簡介
Promise物件可儲存T型別的值,該值可被future物件讀取(可能在另一個執行緒中),這是promise提供的同步的一種手段。
在構造promise時,promise物件可以與共享狀態關聯起來,這個共享狀態可以儲存一個T型別或者一個由std::exception派生出的類的值,並可以通過get_future來獲取與promise物件關聯的物件,呼叫該函式之後,兩個物件共享相同的共享狀態(shared state)。
Promise物件是非同步provider,它可以在某一時刻設定共享狀態的值。
Future物件可以返回共享狀態的值,或者在必要的情況下阻塞呼叫者並等待共享狀態標識變為ready,然後才能獲取共享狀態的值。
5.2 關係圖總覽
5.3從使用入手學習
下面是使用future/promise的一個例子(目前僅討論序列呼叫的情況):
//服務對外介面,序列呼叫
taf::Int32
AServantImp::queryResultSerial(const std::string& sIn, std::string &sOut, taf::JceCurrentPtr current)
{
//設定非同步回包
current->setResponse(false);
// 向服務B傳送非同步請求,返回值的型別是
// promise::Future<std::string>,
// 意思就是服務B未來會返回一個string型別的資料
promise::Future<std::string> f = sendBReq(_pPrxB, sIn, current);
// f呼叫其成員函式then,給未來要到達的string型別的
// 返回結果設定一個處理函式
// 在handleBRspAndSendCReq中獲取返回結果,
// 並return sendCReq(),即f2,然後f2通過鏈式法則呼叫then
f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current))
.then(promise::bind(&handleCRspAndReturnClient, current));
return 0;
}
promise::Future<std::string>
sendBReq(BServantPrx prx, const std::string& sIn, taf::JceCurrentPtr current)
{
// 定義一個promise::Promise<std::string>型別的變數promise,
// 其目的是承諾會在promise裡面存放一個string型別的資料,
// 然後把這個變數傳到BServantCallback物件中,
// 然後發起非同步呼叫
// 最後返回promise.getFuture(),
// 意思是promise承諾的string型別資料
// 可以通過promise::Future<std::string>型別的
// promise.getFuture()來獲得
promise::Promise<std::string> promise;
Test::BServantPrxCallbackPtr cb = new BServantCallback(current, promise);
prx->async_queryResult(cb, sIn);
return promise.getFuture(); //返回一個future給f
}
//////////////////////////////////////////////////////
promise::Future<std::string>
sendCReq(CServantPrx prx, const std::string& sIn, taf::JceCurrentPtr current)
{
//這個跟sendBReq的意思類似
//……
}
//////////////////////////////////////////////////////
promise::Future<std::string>
handleBRspAndSendCReq(CServantPrx prx, JceCurrentPtr current, const promise::Future<std::string>& future)
{
std::string sResult("");
std::string sException("");
try
{
//此行程式碼被執行的時候,promie承諾給future的資料已經達到
//達到的資料分兩種情況,一是正常的資料,即請求服務B的結果資料返回過來了,
//那麼呼叫future.get()會得到這個資料
//二是異常的資料,即請求服務B的結果資料沒有返回,比如非同步呼叫超時了
//那麼呼叫future.get()會丟擲異常,所以需要try-catch一下
sResult = future.get();
return sendCReq(prx, sResult, current);
}
catch (exception& e)
{
TLOGDEBUG("Exception:" << e.what() << endl);
sException = e.what();
}
promise::Promise<std::string> promise;
promise.setValue(sException);
return promise.getFuture();
}
//////////////////////////////////////////////////////
int
handleCRspAndReturnClient(JceCurrentPtr current, const promise::Future<std::string>& future)
{
int ret = 0;
std::string sResult("");
try
{
//與handleBRspAndSendCReq處理類似
sResult = future.get();
}
catch (exception& e)
{
ret = -1;
sResult = e.what();
TLOGDEBUG("Exception:" << e.what() << endl);
}
//回包
AServant::async_response_queryResultSerial(current, ret, sResult);
return 0;
}
我們一步步看看發生了什麼吧~
5.4定義並初始化一個Future型別的變數
promise::Future<std::string> f = sendBReq(_pPrxB, sIn, current);
sendBReq通過promise.getFuture()返回了一個Future,用它來初始化f。
兩個問題:
1.promise.getFuture()是怎麼來的?
2.f是如何初始化的?
1.promise.getFuture()是怎麼來的?
promise::Promise<std::string> promise;
Test::BServantPrxCallbackPtr cb = new BServantCallback(current, promise);
prx->async_queryResult(cb, sIn);
return promise.getFuture(); //返回一個future給f
promise內部有一個數據成員:
SharedPtr<detail::FutureObjectInterface<T> > m_future;
該成員的預設構造:
Promise()
: m_future(SharedPtr<detail::FutureObject<T> >(new detail::FutureObject<T>()))
{}
它使用了FutureObject來作為FutureObjectInterface的具體實現。
Promise的getFuture()方法用m_future構造了一個臨時物件Future< T >(該建構函式為private,因此需要Future將Promise設為友元)並返回,因此promise.getFuture()臨時物件中的m_future和promise中的m_future指向同一個FutureObject物件。
Future<T> getFuture() const
{
return Future<T> (m_future);
}
2.f是如何初始化的?
Future< T >繼承自FutureBase< T >,繼承的資料成員:
typedef SharedPtr<detail::FutureObjectInterface<T> > FuturePtr;
FuturePtr m_future;
我們的目的就是用promise.getFuture()(一個匿名的臨時物件)的m_future來初始化f的m_future,使promise、promise.getFuture()和f的m_future均指向同一個物件,之後promise.getFuture()臨時物件析構,只剩下promise和f的m_future指向同一個物件,有了這個共享,我們就可以在promise中進行賦值(在BServantCallback中呼叫setValue進行賦值),而在f中進行讀取(通過f.get())!
Future< T >的3個public建構函式:
Future() {}
explicit Future(typename detail::FutureTraits<T>::rvalue_source_type t)
: detail::FutureBase<T>( SharedPtr<detail::PromptFutureObject<T> >
(new detail::PromptFutureObject<T>(t)) )
{}
Future(const ExceptionPtr& e)
: detail::FutureBase<T>(e)
{}
對於第二個,由於T為string,detail::FutureTraits< T >::rvalue_source_type實際上就是 const std::string&。
從上面看,並沒有匹配的建構函式可用,且其父類也沒有拷貝建構函式,因此編譯器會進行成員逐個拷貝,最終將sendBReq中的m_future成員拷貝過來,該成員由shared_ptr進行管理,因此promise和f的m_future指向同一個物件的目的達到。
5.5繫結回撥函式進行處理,處理完畢之後鏈式呼叫
當promise承諾的值設定好之後,需要回調函式進行處理。因此我們需要通過then來繫結回撥函式。另外,為支援鏈式呼叫,then應該返回一個future,這個future一般是回撥函式的返回值,在回撥函式中通過promise.getFuture()來獲取。
Future< T >從FutureBase< T >繼承下來的成員函式:
get();
isDone();
hasValue();
hasException();
operator unspecified_bool_type() const; // Returns true if this future has been initialized.
以上函式都是轉呼叫m_future的相關函式,因此整個future的細節封裝在了FutureObject和PromptFutureObject當中,這裡先不深究。
Future< T >另外自己實現了一個函式then:
/**
* Register a callback which will be called once the future is satisfied. If an
* exception is thrown the callback will not be registered and then will not be called.
*
* \throws std::bad_alloc if memory is unavailable.
*/
template <typename R>
Future<typename detail::resolved_type<R>::type>
then(const Callback<R(const Future&)>& callback) const
{
typedef typename detail::resolved_type<R>::type value_type;
if (!this->m_future)
{
throwException(FutureUninitializedException(__FILE__, __LINE__));
}
Promise<value_type> promise;
this->m_future->registerCallback(
bind(&detail::SequentialCallback<R, T>::template run<R>,
owned(new detail::SequentialCallback<R, T>(callback, promise))));
return promise.getFuture();
}
該函式接受一個Callback物件作為引數,Callback封裝了一個函式,其返回值為R,引數為Future。
比如以下呼叫:
f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current));
其中handleBRspAndSendCReq的簽名如下:
promise::Future<std::string> handleBRspAndSendCReq(CServantPrx prx, JceCurrentPtr current, const promise::Future<std::string>& future);
bind綁定了函式handleBRspAndSendCReq的前兩個引數prx和current,剩下第三個引數future。
5.5.1 then的返回值型別
看完引數,我們來看看then的返回值型別:
Future<typename detail::resolved_type<R>::type>
在上面例子中,handleBRspAndSendCReq的返回型別為promise::Future< std::string >,它被用來具現化then的模板引數R。
為什麼這裡的返回值型別不直接使用R,而要通過resolved_type來決議呢?
我們先看一下resolved_type的定義:
template <typename T>
struct resolved_type
{
typedef T type;
};
template <typename T>
struct resolved_type<Future<T> >
{
typedef T type;
};
resolved_type< T >的type為T;
resolved_type< Future< T > >的type也為T。
無論是普通的T,還是Future< T >,通過resolved_type決議出的type成員都為T。
我們看以下另一個then的呼叫:
f.then(promise::bind(&handleCRspAndReturnClient, current));
其中handleCRspAndReturnClient的簽名如下:
int handleCRspAndReturnClient(JceCurrentPtr current, const promise::Future<std::string>& future)
此時將用int來具現化模板引數R。
為了可進行鏈式呼叫,我們應該保證then返回的是一個Future,因此這時不能直接用R,而需要用Future< R >。如果R本身就是Future< T >,我們則可以通過resolved_type將T萃取出來。
5.5.2 then的函式體
首先明確下then的使命:
1.註冊一個回撥函式,來處理就緒的future;
2.將回調函式的返回值帶回來,返回一個future給使用者做鏈式呼叫。
函式體內的操作:
1.首先保證m_future已經初始化。
2.定義了一個Promise變數:
Promise<value_type> promise;
3.呼叫this->m_future->registerCallback()來註冊我們傳進來的callback函式,這裡使用了bind、SequentialCallback進行了包裝:
this->m_future->registerCallback(
bind(&detail::SequentialCallback<R, T>::template run<R>,
owned(new detail::SequentialCallback<R, T>(callback, promise))));
4.返回promise的future:
return promise.getFuture();
具體的細節這裡暫不探討,目前只需要明白,這裡的promise承諾返回一個Future< value_type >而value_type是跟函式callback(比如handleBRspAndSendCReq)的返回值息息相關的(比如handleBRspAndSendCReq返回一個Future< std::string >,其value_type為string,handleCRspAndReturnClient返回一個int,其value_type為int)。
Promise和SequentialCallback的作用就是把callback的返回值給帶回來,最終返回給使用者來做鏈式呼叫。
2016/9/15更新:
帶回callback的返回值
現在簡單說下在then中,如何將callback的返回值帶回來並生成一個future返回:
// then()中的程式碼片段
Promise<value_type> promise;
this->m_future->registerCallback(bind(
&detail::SequentialCallback<R, T>::template run<R>,
owned(new detail::SequentialCallback<R, T>(callback, promise))));
return promise.getFuture();
這裡多引入了一層promise,該promise**承諾帶回函式callback()的返回值**(如果該返回值是一個future,則還是帶回一個future,如果不是future,比如int,則帶回一個future< int >)。
可以看到,bind繫結的是SequentialCallback的成員函式run(template表示這是一個模板)。
通過第二個引數的SequentialCallback物件進行呼叫,該物件封裝了callback和一個promise。
當上一層的future就緒時,會呼叫回撥函式,此時呼叫的是SequentialCallback 的run函式,而不是真正的callback。在run函式中,再呼叫真正的callback進行處理,並將callback的返回值設定到SequentialCallback物件的promise當中,而then返回的正是這個promise關聯的future。
因此,通過一層間接的future/promise,then成功地返回了callback的返回值的一個future。
Run有多個過載版本:
// For callback which returns void.
template <typename U>
typename enable_if<is_void<U> >::type run(const FuturePtr& future)
{
try
{
m_callback(future);
m_promise.set();
}
catch (...)
{
m_promise.setException(currentException());
}
}
當callback返回值為void時,無需返回值,所以promise呼叫空的set。
// For callback which returns non-void non-future type
template <typename U>
typename enable_if_c<!is_void<U>::value && !is_future_type<U>::value>::type
run(const FuturePtr& future)
{
try
{
m_promise.setValue(m_callback(future));
}
catch (...)
{
m_promise.setException(currentException());
}
}
當callback返回值為非void且非future時,呼叫m_callback並將返回值設定到promise中,於是該值可以通過promise.getFuture().get()來獲取。
// For callback which returns future type.
template <typename U>
typename enable_if<is_future_type<U> >::type run(const FuturePtr& future)
{
try
{
m_callback(future).then(
bind(&ForwardValue<value_type>::template run<value_type>,
owned(new ForwardValue<value_type>(m_promise))));
}
catch (...)
{
m_promise.setException(currentException());
}
}
當callback返回值為future時,則對該future使用then繫結到ForwardValue的run函式當中(類似的手法),run函式中再通過get()方法把future內的值取出來,並設定到then中定義的promise當中,於是then的返回值便以一個future的形式存放著callback的返回值。
至於如何對型別進行判斷,當然是利器traits技法,這裡不再展開。
5.5.3 then函式總結
我們重新看一下下面這個例子,理清這兩句程式碼包含的內容:
promise::Future<std::string> f = sendBReq(_pPrxB, sIn, current);
f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current))
.then(promise::bind(&handleCRspAndReturnClient, current));
首先,我們在sendBReq中定義了一個Promise,並把它繫結到BServantCallback中。
當非同步呼叫回包時,將回調BServantCallback,在裡面呼叫Promise的setValue進行賦值。
賦值完畢之後,將呼叫通過then繫結的回撥handleBRspAndSendCReq來處理。
由於我們通過promise.getFuture()使得f和promise的m_future**指向了同一個物件**,所以我們在回撥handleBRspAndSendCReq中可以通過f.get()來讀取該值。
f.get()只是完成了handleBRsp部分,在SendCReq的時候,類似於sendBReq,我又定義了一個Promise,我們需要把與之關聯的future(通過promise.getFuture()獲取)作為handleBRspAndSendCReq的返回值,並通過then中的Promise和SequentialCallback將這個future返回給使用者,從而使用者可以繼續呼叫then來指定handle。
總結:then幫我們把handle回撥函式註冊到future當中,當future可讀時,將呼叫該handle進行處理,then還為我們把handle的返回值帶回來,以供鏈式呼叫。
5.6 future和promise的關係梳理
兩者都有一個m_future成員,其型別為
SharedPtr<detail::FutureObjectInterface<T> >
由於為Future< T >需要針對void進行特化,為避免過多重複的程式碼,把與特化無關的部分抽離出來形成FutureBase作為基類。
從上圖可以看出,Future和Promise均持有m_future,兩者正是通過該物件進行共享、關聯的(通過promise.getFuture()實現)。其中Promise對外提供了對m_future的set(寫)介面,而Future對外提供了m_future的get(讀)介面。
5.7 FutureObjectInterface的實現
下圖展示了FutureObjectInterface的具體實現:
可以看到,FutureObjectInterface有FutureObject和PromptFutureObject兩種實現。
Promise的FutureObjectInterface 是一個FutureObject。
Future的FutureObjectInterface有兩種情況:直接用一個值來構造Future時(比如呼叫makeFuture來獲取一個future)用的是PromptFutureObject,而其他情況(比如通過Promise獲得的future)用的是FutureObject。
那麼,兩者有何區別呢?
對於第一個應用場景,future不是通過promise來獲取的,而是直接用一個立即數構造:
explicit Future(typename detail::FutureTraits<T>::rvalue_source_type t)
: detail::FutureBase<T>(SharedPtr<detail::PromptFutureObject<T> >
(new detail::PromptFutureObject<T>(t)))
{}
比如下面這個應用場景:
Future< int > hsF = makeFuture<int>(-1); //使用立即數構造
if (openSwitch)
{
// sendBReq中進行非同步呼叫,並通過promise.getFuture()返回一個future
hsF = sendBReq();
}
// …
在handle中,我們可以通過以下判斷來決定是否處理:
int result = hsF.get();
if (result != -1)
{
// handle
}
立即數構造這種用法,由於其值已經設定了,不需要等待promise填值進去,因此該future內部的PromptFutureObject是隻讀的,也就不需要加鎖。如果還是使用FutureObject這個版本,將會在加鎖解鎖上做無用功。因此PromptFutureObject是針對這種場景進行優化的。
而當Future 與Promise共享同一個m_future時,由於Future和Promise可能在不同執行緒中,因此可能同時讀寫,這裡存在race condition,因此需要加鎖。FutureObject正是一個加鎖的版本。
關於FutureObject,有幾個需要注意的點:
1.在一開始介紹的時候我們說過,Future可以獲取共享狀態的值(通過get()方法),在必要的情況下阻塞呼叫者並等待共享狀態標識變為ready,然後才能獲取共享狀態的值。
2.setValue只能被呼叫一次,即共享狀態的值只能設定一次,如果試圖設定第二次,將丟擲異常。
3.在registerCallback時,根據m_is_done來判斷是否已經setValue,如果m_is_done為true,則直接呼叫callback(this->sharedFromThis())來處理,否則將callback加入m_pending_callbacks列表中,等待setValue之後呼叫。在setValue中,除了設定值之外,還會呼叫doPendingCallbacks()函式,在該函式中逐個呼叫m_pending_callbacks列表中的callback。
最後,關於when_all的實現(並行非同步呼叫),後續有時間再補充~