1. 程式人生 > >C++非同步呼叫利器future/promise實現原理

C++非同步呼叫利器future/promise實現原理

前言

在非同步程式設計中,各種回撥將讓人眼花繚亂,程式碼分散,維護起來十分困難。boost和C++11 的 future/promise 提供了一個很好的解決方案,使得程式碼更加漂亮、易維護。

在工作中,我也用過幾次future/promise,但是還是十分生疏,所以決定學習下它的原理,用起來才更加順暢。

查了很多資料,發現很多語言都有這個機制,但是關於C++的promise的資料卻很少,只有一些使用的教程,而沒有找到原理方面的。

“原始碼之前,了無祕密。”

所以還是決定從原始碼入手學習!

本文針對的原始碼是借鑑boost實現的版本,由於copyright的原因,就不貼出完整的原始碼了,需要學習的朋友可以參見boost的實現。

關於future/promise的簡介,參見我之前寫的一篇博文:

一、基於Future/Promise的非同步同步化程式設計依賴的元件

  1. bind和callback,類似與boost庫的bind和function;
  2. shared_ptr、scoped_ptr、tuple、exception,參考boost庫中的shared_ptr、scoped_ptr、tuple、exception實現;
  3. Future和Promise,借鑑boost庫的future設計思想;
  4. when_all,通過Future、Promise、tuple來實現,針對非同步並行的同步化。

二、Function和Bind的用法

參見我之前寫的一篇博文:

下面提到的bind類似於boost的bind,而Callback類似於Function,一般跟bind搭配使用。

三、shared_ptr、scoped_ptr、tuple

shared_ptr:引用計數。

scoped_ptr:不可轉移所有權。

Tuple:很多的時候我們需要為函式返回多個值,我們可以使用class或struct來封裝要返回的多個值,然後返回封裝struct或class,但是使用這種方法的弊端就是增加的程式的程式碼量,最好是能通過一種匿名的struct或class來解決這個問題。Boost::tuple就為我們提供了一種類似於匿名struct的方法為我們解決函式的多個返回值的問題。既增強了程式碼的可讀性又不增加程式碼量。其實std::pair就是boost::tuple的2個引數的特例,對boost::tuple你可以繫結更多的引數,或者你可以迭代實現無限多引數的情況。

四、EnableSharedFromThis

使用boost庫時,經常會看到如下的類:

class A:public enable_share_from_this<A>

在什麼情況下要使類A繼承enable_share_from_this?

使用場合 :當類A被share_ptr管理,且在類A的成員函式裡需要把當前類物件作為引數傳給其他函式時,就需要傳遞一個指向自身的share_ptr。

我們就使類A繼承enable_share_from_this,然後通過其成員函式 share_from_this()返回當指向自身的share_ptr。

以上有2個疑惑:

1.把當前類物件作為引數傳給其他函式時,為什麼要傳遞share_ptr呢?直接傳遞this指標不可以嗎?

一個裸指標傳遞給呼叫者,誰也不知道呼叫者會幹什麼?假如呼叫者delete了該物件,而share_tr此時還指向該物件。

2.這樣傳遞share_ptr可以嗎?share_ptr< this >

這樣會造成2個非共享的share_ptr指向一個物件,最後造成2次析構該物件。

boost官方文件中一個非常典型的例子:

部分程式碼:

 1 class tcp_connection
 2   : public boost::enable_shared_from_this<tcp_connection>
 3 {
 4 public:
 5   typedef boost::shared_ptr<tcp_connection> pointer;
 6 
 7   static pointer create(boost::asio::io_service& io_service)
 8   {
 9     return pointer(new tcp_connection(io_service));
10   }
11 
12   tcp::socket& socket()
13   {
14     return socket_;
15   }
16 
17   void start()
18   {
19     message_ = make_daytime_string();
20 
21     boost::asio::async_write(socket_, boost::asio::buffer(message_),
22         boost::bind(&tcp_connection::handle_write, shared_from_this(),
23           boost::asio::placeholders::error,
24           boost::asio::placeholders::bytes_transferred));
25   }
26 
27 private:
28   tcp_connection(boost::asio::io_service& io_service)
29     : socket_(io_service)
30   {
31   }
32 
33   void handle_write(const boost::system::error_code& /*error*/,
34       size_t /*bytes_transferred*/)
35   {
36   }
37 
38   tcp::socket socket_;
39   std::string message_;
40 };

類tcp_connection繼承enable_share_from_this,在22行裡,它的成員函式start(),通過share_from_this返回指向自身的share_ptr。

五、Future和Promise

5.1 簡介

Promise物件可儲存T型別的值,該值可被future物件讀取(可能在另一個執行緒中),這是promise提供的同步的一種手段。

在構造promise時,promise物件可以與共享狀態關聯起來,這個共享狀態可以儲存一個T型別或者一個由std::exception派生出的類的值,並可以通過get_future來獲取與promise物件關聯的物件,呼叫該函式之後,兩個物件共享相同的共享狀態(shared state)

Promise物件是非同步provider,它可以在某一時刻設定共享狀態的值。

Future物件可以返回共享狀態的值,或者在必要的情況下阻塞呼叫者並等待共享狀態標識變為ready,然後才能獲取共享狀態的值。

5.2 關係圖總覽

這裡寫圖片描述

5.3從使用入手學習

下面是使用future/promise的一個例子(目前僅討論序列呼叫的情況):

//服務對外介面,序列呼叫
taf::Int32 
AServantImp::queryResultSerial(const std::string& sIn, std::string &sOut, taf::JceCurrentPtr current)
{
    //設定非同步回包
    current->setResponse(false);

    // 向服務B傳送非同步請求,返回值的型別是
    // promise::Future<std::string>,
    // 意思就是服務B未來會返回一個string型別的資料
    promise::Future<std::string> f = sendBReq(_pPrxB, sIn, current);

    // f呼叫其成員函式then,給未來要到達的string型別的
    // 返回結果設定一個處理函式
    // 在handleBRspAndSendCReq中獲取返回結果,
    // 並return sendCReq(),即f2,然後f2通過鏈式法則呼叫then
    f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current))
    .then(promise::bind(&handleCRspAndReturnClient, current));

    return 0;
}

promise::Future<std::string> 
sendBReq(BServantPrx prx, const std::string& sIn, taf::JceCurrentPtr current)
{
    // 定義一個promise::Promise<std::string>型別的變數promise,
    // 其目的是承諾會在promise裡面存放一個string型別的資料,
    // 然後把這個變數傳到BServantCallback物件中,
    // 然後發起非同步呼叫
    // 最後返回promise.getFuture(),
    // 意思是promise承諾的string型別資料
    // 可以通過promise::Future<std::string>型別的
    // promise.getFuture()來獲得

    promise::Promise<std::string> promise;

    Test::BServantPrxCallbackPtr cb = new BServantCallback(current, promise);

    prx->async_queryResult(cb, sIn);

    return promise.getFuture();     //返回一個future給f
}

//////////////////////////////////////////////////////
promise::Future<std::string> 
sendCReq(CServantPrx prx, const std::string& sIn, taf::JceCurrentPtr current)
{
    //這個跟sendBReq的意思類似
    //……
}

//////////////////////////////////////////////////////
promise::Future<std::string> 
handleBRspAndSendCReq(CServantPrx prx, JceCurrentPtr current, const promise::Future<std::string>& future)
{
    std::string sResult("");
    std::string sException("");
    try
    {
        //此行程式碼被執行的時候,promie承諾給future的資料已經達到
        //達到的資料分兩種情況,一是正常的資料,即請求服務B的結果資料返回過來了,
        //那麼呼叫future.get()會得到這個資料
        //二是異常的資料,即請求服務B的結果資料沒有返回,比如非同步呼叫超時了
        //那麼呼叫future.get()會丟擲異常,所以需要try-catch一下

        sResult = future.get();
        return sendCReq(prx, sResult, current);
    }
    catch (exception& e)
    {
        TLOGDEBUG("Exception:" << e.what() << endl);
        sException = e.what();
    } 

    promise::Promise<std::string> promise;
    promise.setValue(sException); 
    return promise.getFuture();
}

//////////////////////////////////////////////////////
int 
handleCRspAndReturnClient(JceCurrentPtr current, const promise::Future<std::string>& future)
{
    int ret = 0;
    std::string sResult("");

    try
    {
        //與handleBRspAndSendCReq處理類似
        sResult = future.get();
    }
    catch (exception& e)
    {
        ret = -1;
        sResult = e.what();
        TLOGDEBUG("Exception:" << e.what() << endl);
    }

    //回包
    AServant::async_response_queryResultSerial(current, ret, sResult);
    return 0;
}

我們一步步看看發生了什麼吧~

5.4定義並初始化一個Future型別的變數

promise::Future<std::string> f = sendBReq(_pPrxB, sIn, current); 

sendBReq通過promise.getFuture()返回了一個Future,用它來初始化f。

兩個問題:
1.promise.getFuture()是怎麼來的?
2.f是如何初始化的?

1.promise.getFuture()是怎麼來的?

promise::Promise<std::string> promise;
Test::BServantPrxCallbackPtr cb = new BServantCallback(current, promise);
prx->async_queryResult(cb, sIn);
return promise.getFuture();     //返回一個future給f

promise內部有一個數據成員:

SharedPtr<detail::FutureObjectInterface<T> > m_future; 

該成員的預設構造:

Promise()
        : m_future(SharedPtr<detail::FutureObject<T> >(new detail::FutureObject<T>()))
    {}

它使用了FutureObject來作為FutureObjectInterface的具體實現。

Promise的getFuture()方法用m_future構造了一個臨時物件Future< T >(該建構函式為private,因此需要Future將Promise設為友元)並返回,因此promise.getFuture()臨時物件中的m_future和promise中的m_future指向同一個FutureObject物件。

Future<T> getFuture() const
{
    return Future<T> (m_future);
}

2.f是如何初始化的?

Future< T >繼承自FutureBase< T >,繼承的資料成員:

typedef SharedPtr<detail::FutureObjectInterface<T> > FuturePtr;
FuturePtr m_future;

我們的目的就是用promise.getFuture()(一個匿名的臨時物件)的m_future來初始化f的m_future,使promise、promise.getFuture()和f的m_future均指向同一個物件,之後promise.getFuture()臨時物件析構,只剩下promise和f的m_future指向同一個物件,有了這個共享,我們就可以在promise中進行賦值(在BServantCallback中呼叫setValue進行賦值),而在f中進行讀取(通過f.get())!

Future< T >的3個public建構函式:

Future() {}

explicit Future(typename detail::FutureTraits<T>::rvalue_source_type t)
: detail::FutureBase<T>( SharedPtr<detail::PromptFutureObject<T> >
(new detail::PromptFutureObject<T>(t)) )
    {}

Future(const ExceptionPtr& e)
        : detail::FutureBase<T>(e)
{}

對於第二個,由於T為string,detail::FutureTraits< T >::rvalue_source_type實際上就是 const std::string&。

從上面看,並沒有匹配的建構函式可用,且其父類也沒有拷貝建構函式,因此編譯器會進行成員逐個拷貝,最終將sendBReq中的m_future成員拷貝過來,該成員由shared_ptr進行管理,因此promise和f的m_future指向同一個物件的目的達到。

5.5繫結回撥函式進行處理,處理完畢之後鏈式呼叫

當promise承諾的值設定好之後,需要回調函式進行處理。因此我們需要通過then來繫結回撥函式。另外,為支援鏈式呼叫,then應該返回一個future,這個future一般是回撥函式的返回值,在回撥函式中通過promise.getFuture()來獲取。

Future< T >從FutureBase< T >繼承下來的成員函式:

get();
isDone();
hasValue();
hasException();
operator unspecified_bool_type() const; // Returns true if this future has been initialized.

以上函式都是轉呼叫m_future的相關函式,因此整個future的細節封裝在了FutureObject和PromptFutureObject當中,這裡先不深究。

Future< T >另外自己實現了一個函式then:

/**
* Register a callback which will be called once the future is satisfied. If an
* exception is thrown the callback will not be registered and then will not be called.
* 
* \throws std::bad_alloc if memory is unavailable.
*/
template <typename R>
Future<typename detail::resolved_type<R>::type> 
then(const Callback<R(const Future&)>& callback) const
{
    typedef typename detail::resolved_type<R>::type value_type;

    if (!this->m_future)
    {
        throwException(FutureUninitializedException(__FILE__, __LINE__));
    }

    Promise<value_type> promise;

    this->m_future->registerCallback(
        bind(&detail::SequentialCallback<R, T>::template run<R>,
            owned(new detail::SequentialCallback<R, T>(callback, promise))));

    return promise.getFuture();
}

該函式接受一個Callback物件作為引數,Callback封裝了一個函式,其返回值為R,引數為Future。

比如以下呼叫:

f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current));

其中handleBRspAndSendCReq的簽名如下:

promise::Future<std::string> handleBRspAndSendCReq(CServantPrx prx, JceCurrentPtr current, const promise::Future<std::string>& future);

bind綁定了函式handleBRspAndSendCReq的前兩個引數prx和current,剩下第三個引數future。

5.5.1 then的返回值型別

看完引數,我們來看看then的返回值型別:

Future<typename detail::resolved_type<R>::type> 

在上面例子中,handleBRspAndSendCReq的返回型別為promise::Future< std::string >,它被用來具現化then的模板引數R。

為什麼這裡的返回值型別不直接使用R,而要通過resolved_type來決議呢?

我們先看一下resolved_type的定義:

    template <typename T>
    struct resolved_type 
    {
        typedef T type;
    };

    template <typename T>
    struct resolved_type<Future<T> > 
    {
        typedef T type;
};

resolved_type< T >的type為T;
resolved_type< Future< T > >的type也為T。
無論是普通的T,還是Future< T >,通過resolved_type決議出的type成員都為T。

我們看以下另一個then的呼叫:

f.then(promise::bind(&handleCRspAndReturnClient, current));

其中handleCRspAndReturnClient的簽名如下:

int handleCRspAndReturnClient(JceCurrentPtr current, const promise::Future<std::string>& future)

此時將用int來具現化模板引數R。

為了可進行鏈式呼叫,我們應該保證then返回的是一個Future,因此這時不能直接用R,而需要用Future< R >。如果R本身就是Future< T >,我們則可以通過resolved_type將T萃取出來。

5.5.2 then的函式體

首先明確下then的使命:
1.註冊一個回撥函式,來處理就緒的future;
2.將回調函式的返回值帶回來,返回一個future給使用者做鏈式呼叫。

函式體內的操作:
1.首先保證m_future已經初始化。
2.定義了一個Promise變數:

Promise<value_type> promise;

3.呼叫this->m_future->registerCallback()來註冊我們傳進來的callback函式,這裡使用了bind、SequentialCallback進行了包裝:

this->m_future->registerCallback(
    bind(&detail::SequentialCallback<R, T>::template run<R>,
             owned(new detail::SequentialCallback<R, T>(callback, promise))));

4.返回promise的future:

return promise.getFuture();

具體的細節這裡暫不探討,目前只需要明白,這裡的promise承諾返回一個Future< value_type >而value_type是跟函式callback(比如handleBRspAndSendCReq)的返回值息息相關的(比如handleBRspAndSendCReq返回一個Future< std::string >,其value_type為string,handleCRspAndReturnClient返回一個int,其value_type為int)。

Promise和SequentialCallback的作用就是把callback的返回值給帶回來,最終返回給使用者來做鏈式呼叫。

2016/9/15更新:

帶回callback的返回值

現在簡單說下在then中,如何將callback的返回值帶回來並生成一個future返回:

// then()中的程式碼片段
Promise<value_type> promise;

this->m_future->registerCallback(bind(
&detail::SequentialCallback<R, T>::template run<R>,
owned(new detail::SequentialCallback<R, T>(callback, promise))));

return promise.getFuture();

這裡多引入了一層promise,該promise**承諾帶回函式callback()的返回值**(如果該返回值是一個future,則還是帶回一個future,如果不是future,比如int,則帶回一個future< int >)。

可以看到,bind繫結的是SequentialCallback的成員函式run(template表示這是一個模板)。
通過第二個引數的SequentialCallback物件進行呼叫,該物件封裝了callback和一個promise。

當上一層的future就緒時,會呼叫回撥函式,此時呼叫的是SequentialCallback 的run函式,而不是真正的callback。在run函式中,再呼叫真正的callback進行處理,並將callback的返回值設定到SequentialCallback物件的promise當中,而then返回的正是這個promise關聯的future。

因此,通過一層間接的future/promise,then成功地返回了callback的返回值的一個future。

Run有多個過載版本:

// For callback which returns void.
template <typename U>
typename enable_if<is_void<U> >::type run(const FuturePtr& future)
{
    try 
    {
        m_callback(future);
        m_promise.set();
    } 
    catch (...) 
    {
        m_promise.setException(currentException());
    }
}

當callback返回值為void時,無需返回值,所以promise呼叫空的set。

// For callback which returns non-void non-future type
template <typename U>
typename enable_if_c<!is_void<U>::value && !is_future_type<U>::value>::type
run(const FuturePtr& future)
{
    try 
    {
        m_promise.setValue(m_callback(future));
    } 
    catch (...) 
    {
        m_promise.setException(currentException());
    }
}

當callback返回值為非void且非future時,呼叫m_callback並將返回值設定到promise中,於是該值可以通過promise.getFuture().get()來獲取。

// For callback which returns future type.
template <typename U>
typename enable_if<is_future_type<U> >::type run(const FuturePtr& future)
{
    try 
    {
        m_callback(future).then(
            bind(&ForwardValue<value_type>::template run<value_type>,
                owned(new ForwardValue<value_type>(m_promise))));
    } 
    catch (...) 
    {
        m_promise.setException(currentException());
    }
}

當callback返回值為future時,則對該future使用then繫結到ForwardValue的run函式當中(類似的手法),run函式中再通過get()方法把future內的值取出來,並設定到then中定義的promise當中,於是then的返回值便以一個future的形式存放著callback的返回值。

至於如何對型別進行判斷,當然是利器traits技法,這裡不再展開。

5.5.3 then函式總結

我們重新看一下下面這個例子,理清這兩句程式碼包含的內容:

promise::Future<std::string> f = sendBReq(_pPrxB, sIn, current);
f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current))
.then(promise::bind(&handleCRspAndReturnClient, current));

首先,我們在sendBReq中定義了一個Promise,並把它繫結到BServantCallback中。

當非同步呼叫回包時,將回調BServantCallback,在裡面呼叫Promise的setValue進行賦值。

賦值完畢之後,將呼叫通過then繫結的回撥handleBRspAndSendCReq來處理。

由於我們通過promise.getFuture()使得f和promise的m_future**指向了同一個物件**,所以我們在回撥handleBRspAndSendCReq中可以通過f.get()來讀取該值。

f.get()只是完成了handleBRsp部分,在SendCReq的時候,類似於sendBReq,我又定義了一個Promise,我們需要把與之關聯的future(通過promise.getFuture()獲取)作為handleBRspAndSendCReq的返回值,並通過then中的Promise和SequentialCallback將這個future返回給使用者,從而使用者可以繼續呼叫then來指定handle。

總結:then幫我們把handle回撥函式註冊到future當中,當future可讀時,將呼叫該handle進行處理,then還為我們把handle的返回值帶回來,以供鏈式呼叫。

5.6 future和promise的關係梳理

兩者都有一個m_future成員,其型別為

SharedPtr<detail::FutureObjectInterface<T> >

這裡寫圖片描述

由於為Future< T >需要針對void進行特化,為避免過多重複的程式碼,把與特化無關的部分抽離出來形成FutureBase作為基類。

從上圖可以看出,Future和Promise均持有m_future,兩者正是通過該物件進行共享、關聯的(通過promise.getFuture()實現)。其中Promise對外提供了對m_future的set(寫)介面,而Future對外提供了m_future的get(讀)介面。

5.7 FutureObjectInterface的實現

下圖展示了FutureObjectInterface的具體實現:

這裡寫圖片描述

可以看到,FutureObjectInterface有FutureObject和PromptFutureObject兩種實現。

Promise的FutureObjectInterface 是一個FutureObject。

Future的FutureObjectInterface有兩種情況:直接用一個值來構造Future時(比如呼叫makeFuture來獲取一個future)用的是PromptFutureObject,而其他情況(比如通過Promise獲得的future)用的是FutureObject。

那麼,兩者有何區別呢?

對於第一個應用場景,future不是通過promise來獲取的,而是直接用一個立即數構造:

explicit Future(typename detail::FutureTraits<T>::rvalue_source_type t)
: detail::FutureBase<T>(SharedPtr<detail::PromptFutureObject<T> >
(new detail::PromptFutureObject<T>(t)))
    {}

比如下面這個應用場景:

Future< int > hsF = makeFuture<int>(-1);    //使用立即數構造
if (openSwitch)
{
    // sendBReq中進行非同步呼叫,並通過promise.getFuture()返回一個future
    hsF = sendBReq();
}
// …

在handle中,我們可以通過以下判斷來決定是否處理:

int result = hsF.get();
if (result != -1)
{
    // handle
}

立即數構造這種用法,由於其值已經設定了,不需要等待promise填值進去,因此該future內部的PromptFutureObject是隻讀的,也就不需要加鎖。如果還是使用FutureObject這個版本,將會在加鎖解鎖上做無用功。因此PromptFutureObject是針對這種場景進行優化的。

而當Future 與Promise共享同一個m_future時,由於Future和Promise可能在不同執行緒中,因此可能同時讀寫,這裡存在race condition,因此需要加鎖。FutureObject正是一個加鎖的版本。

關於FutureObject,有幾個需要注意的點:
1.在一開始介紹的時候我們說過,Future可以獲取共享狀態的值(通過get()方法),在必要的情況下阻塞呼叫者並等待共享狀態標識變為ready,然後才能獲取共享狀態的值。
這裡寫圖片描述
這裡寫圖片描述

2.setValue只能被呼叫一次,即共享狀態的值只能設定一次,如果試圖設定第二次,將丟擲異常。
這裡寫圖片描述

3.在registerCallback時,根據m_is_done來判斷是否已經setValue,如果m_is_done為true,則直接呼叫callback(this->sharedFromThis())來處理,否則將callback加入m_pending_callbacks列表中,等待setValue之後呼叫。在setValue中,除了設定值之外,還會呼叫doPendingCallbacks()函式,在該函式中逐個呼叫m_pending_callbacks列表中的callback。

最後,關於when_all的實現(並行非同步呼叫),後續有時間再補充~