1. 程式人生 > 其它 >asio boost 非同步錯誤處理_C++20 await & asio 相關的學習

asio boost 非同步錯誤處理_C++20 await & asio 相關的學習

技術標籤:asio boost 非同步錯誤處理

背景:

最新版的asio(1-13-0)的api添加了對co_await的支援 (之前的版本也支援一些,只不過名稱空間還沒有正式放入asio)。

我打算學習下asio,也打算拋棄自己的網路庫了,也期望其他的庫或者driver能夠統一的使用asio+co_await,這樣我們應用程式開發者就能快速的引入庫(少一些心智負擔,不用擔心執行緒/同步/非同步/生命週期管理等各種模型差異性導致的顧慮)。[當然我也並不是說asio就是最好的選擇,但是大家都基於某個統一的庫去做開發,還是有很多好處的]。

目的:

我的目的是採用 asio+await開發一個同步非阻塞RPC(嗯,就是類似golang grpc),來替換調我的非同步RPC,進而通過它再重新開發Orleans-CPP簡易版本。

通過我個人理解,要實現同步RPC(當然也包括其他基於協程的應用)我們需要先實現幾個基礎功能):

  1. Channel - 用於實現協程之間的pipeline
  2. Mutex - 基於協程的鎖
  3. WaitGroup - 基於協程的等待物件
  4. CondVariables
  5. ……(帶補充)

因為同步RPC庫(支援多個協程使用同一個RPC物件去呼叫服務)的實現(個人理解,我本打看看grpc怎麼做的,但時間還不夠,所以~~~)大概如下:

開啟一個writer協程,使用channel收集其他協程(呼叫RPC函式時)產生的訊息(訊息裡包括一個request msg、一個用於接收此次請求的response的channel),開啟一個receive協程,來接收伺服器的網路訊息,並且從中解析各個respnse,並且根據其seqid (通過鎖)拿到其對應的呼叫者的channel,然後將response放入channel,以此喚醒呼叫者。

簡要實現:

下面是我寫的簡易版channel和協程鎖的實現(我這幾天才開始學習asio,所以難免有一些問題或者錯誤處理省略了,還請指教)

//
// echo_server.cpp
// ~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2019 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <cstdio>
#include <queue>
#include <mutex>
#include <iostream>

#include <asio/co_spawn.hpp>
#include <asio/detached.hpp>
#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/signal_set.hpp>
#include <asio/write.hpp>
#include <asio/awaitable.hpp>
#include <asio/redirect_error.hpp>

using asio::ip::tcp;
using asio::awaitable;
using asio::co_spawn;
using asio::detached;
using asio::use_awaitable;
using asio::redirect_error;
namespace this_coro = asio::this_coro;

template<typename T>
class Channel : public asio::noncopyable
{
public:
    using Ptr = std::shared_ptr<Channel<T>>;
    using Container = std::queue<T>;

    static Ptr Make(asio::io_context& context)
    {
        struct make_shared_enabler : public Channel<T>
        {
            make_shared_enabler(asio::io_context& context)
                :
                Channel<T>(context)
            {}
        };
        return std::make_shared<make_shared_enabler>(context);
    }

    asio::io_context& context()
    {
        return mContext;
    }

    auto& operator << (T value)
    {
        {
            std::lock_guard<std::mutex> lck(mValuesGuard);
            mValues.push(std::move(value));
        }
        mTimer.cancel_one();
        return *this;
    }

    asio::awaitable<T> takeOne()
    {
        while (true)
        {
            {
                std::lock_guard<std::mutex> lck(mValuesGuard);
                if (!mValues.empty())
                {
                    auto result = mValues.front();
                    mValues.pop();
                    co_return result;
                }
            }

            asio::error_code ec;
            co_await mTimer.async_wait(redirect_error(use_awaitable, ec));
        }
    }

    asio::awaitable<Container>  takeAll()
    {
        while (true)
        {
            {
                std::lock_guard<std::mutex> lck(mValuesGuard);
                if (!mValues.empty())
                {
                    co_return std::move(mValues);
                }
            }

            asio::error_code ec;
            co_await mTimer.async_wait(redirect_error(use_awaitable, ec));
        }
    }

private:
    Channel(asio::io_context& context)
        :
        mContext(context),
        mTimer(context)
    {
        mTimer.expires_at(std::chrono::steady_clock::time_point::max());
    }

private:
    asio::io_context&   mContext;
    asio::steady_timer  mTimer;

    Container           mValues;
    std::mutex          mValuesGuard;
};

class CoroutineMutex : public asio::noncopyable
{
public:
    using Ptr = std::shared_ptr<CoroutineMutex>;

    static Ptr Make(asio::io_context& context)
    {
        struct make_shared_enabler : public CoroutineMutex
        {
            make_shared_enabler(asio::io_context& context)
                :
                CoroutineMutex(context)
            {}
        };
        return std::make_shared<make_shared_enabler>(context);
    }

    asio::io_context& context()
    {
        return mContext;
    }

    awaitable<void> lock()
    {
        asio::error_code ec;
        while (mLocked.exchange(true))
        {
            co_await mTimer.async_wait(redirect_error(use_awaitable, ec));
        }
    }

    void unlock()
    {
        auto o = mLocked.exchange(false);
        assert(o);
        mTimer.cancel_one();
    }

private:
    CoroutineMutex(asio::io_context& context)
        :
        mLocked(false),
        mContext(context),
        mTimer(context)
    {
    }
    virtual ~CoroutineMutex() = default;

private:
    std::atomic_bool    mLocked;
    asio::io_context&   mContext;
    asio::steady_timer  mTimer;
};

static awaitable<void> producer(Channel<int>::Ptr channel)
{
    asio::steady_timer timer(channel->context());

    int i = 0;

    while (true)
    {
        timer.expires_from_now(std::chrono::seconds(1));
        asio::error_code ec;
        co_await timer.async_wait(redirect_error(use_awaitable, ec));

        (*channel) << i++;
    }
}

static awaitable<void> consumer(Channel<int>::Ptr channel)
{
    while (true)
    {
        {
            auto i = co_await channel->takeOne();
            std::cout << i << std::endl;
        }
        {
            auto v = co_await channel->takeAll();
            for (; !v.empty();)
            {
                std::cout << v.front() << std::endl;
                v.pop();
            }
        }
    }
}

std::atomic<int> global = 0;

static awaitable<void> testMutex(CoroutineMutex::Ptr mutex, int index, Channel<int>::Ptr channel)
{
    asio::steady_timer timer(mutex->context());

    for (int i = 0; i < 10; i++)
    {
        timer.expires_from_now(std::chrono::milliseconds(100+std::rand()%100));
        asio::error_code ec;
        co_await timer.async_wait(redirect_error(use_awaitable, ec));

        co_await mutex->lock();
        global++;
        std::cout << "in " << index << " take global is:" << global << std::endl;
        (*channel) << global;
        mutex->unlock();
    }
}

static awaitable<void> consumerGlobalChange(Channel<int>::Ptr channel)
{
    while (true)
    {
        auto currentGlobalValue = co_await channel->takeOne();
        std::cout << "current currentGlobalValue is :" << currentGlobalValue << std::endl;
    }
}

int main()
{
    try
    {
        asio::io_context io_context(1);

        asio::signal_set signals(io_context, SIGINT, SIGTERM);
        signals.async_wait([&](auto, auto) { io_context.stop(); });

        {
            // 一個生產者協程和兩個消費者協程
            auto channel = Channel<int>::Make(io_context);
            co_spawn(io_context, [=]() {
                return producer(channel);
            }, detached);
            co_spawn(io_context, [=]() {
                return consumer(channel);
            }, detached);
            co_spawn(io_context, [=]() {
                return consumer(channel);
            }, detached);
        }
        
        {
            // 構造10個協程通過互斥進行修改全域性變數
            // 並在其中將全域性變數當前的值放入channel
            // 開啟一個協程通過channel去觀察全域性變數的變更
            auto mutex = CoroutineMutex::Make(io_context);
            auto consumerChannel = Channel<int>::Make(io_context);

            for (int i = 0; i < 10; i++)
            {
                co_spawn(io_context, [=]() {
                    return testMutex(mutex, i, consumerChannel);
                }, detached);
            }
            co_spawn(io_context, [=]() {
                return consumerGlobalChange(consumerChannel);
            }, detached);
        }
        
        io_context.run();
    }
    catch (std::exception & e)
    {
        std::printf("Exception: %sn", e.what());
    }
}

效能測試:

構造一百萬個協程,並且使用一百萬個channel將它們兩兩串連起來,然後做類似擊鼓傳花的操作。從第一個協程開始輸入,直到最後一個協程收到,程式碼如下:

const static auto MaxNum = 1000000;

static awaitable<void> foo(Channel<int>::Ptr in, Channel<int>::Ptr out)
{
    auto value = co_await in->takeOne();
    // 將收到的值+1傳遞給下一位夥伴
    (*out << (value + 1));
    if (value == MaxNum)
    {
        std::cout << "end" << std::endl;
        out->context().stop();
    }
}

static awaitable<void> start(Channel<int>::Ptr out)
{
    (*out << 1);
    co_return;
}

auto in = Channel<int>::Make(io_context);
auto out = Channel<int>::Make(io_context);

// 開啟一個協程做初始輸入動作
co_spawn(io_context, [=]() {
    return start(in);
}, detached);

for (int i = 0; i < MaxNum; i++)
{
    co_spawn(io_context, [=]() {
        return foo(in, out);
    }, detached);

    in = out;
    out = Channel<int>::Make(io_context);
}

在 的垃圾I5 Win10機器上,花費8s跑完整個程式,(程式只有一個io_context,所以是單執行緒)佔滿單個CPU核心,記憶體佔用最高峰為1G。

注:asio在開啟協程後,需要在下一次排程時才執行協程,awaituv則是立馬執行協程,所以asio這種做法在我的測試中相比awaituv所花費的時間(我猜測)可能是數量級的差異,當然這也沒太大意義,在真實場景下,協程並不是執行一次就馬上結束,它通常會存活一段時間的。

(剛才寫了一個Golang的版本,發現也是立即執行,只需要1s時間就跑完測試程式。可是一旦我將初始輸入動作放在開啟擊鼓傳花的協程動作之後(也即是說哪怕它們本身立即執行,但它們無法立即完成,所以必須等待/存活),時間耗費多少呢?抱歉,我沒跑出結果就強制關閉了程式(進度也尚不可知),此時記憶體耗費在2.9G。

當然,這個測試程式並不能完全說明channel本身的效能,或許更多的是測試協程排程的效能?

疑問/注意

  1. 要開發一個支援co_await的庫(類似最新版asio或者 awaituv)是非常困難的,我期望能夠有相當背景的公司開發一個通用的封裝,來支援其他開發者將非同步呼叫轉換為同步呼叫(就類似awaituv裡做過的一些事情/以及封裝,但它有bug或者功能還很簡陋·····)。 cppcoro是一個選擇,可惜它又幹了一些io/network相關事情(而且還沒幹好~~),而且~~~作者我不瞭解啊,我覺得背書不能啊)
  2. 如果一個函式是協程函式,但你呼叫時忘記co_await了,那就是開啟一個新協程,估計這容易成為一個坑?哈哈哈
  3. 我嘗試通過RAII做一個協程鎖的guard,但是建構函式裡不能co_await mutex->lock(); 相當尷尬,因為operator new、建構函式 等編譯器自動生成的函式都不是awaitable的啊~~~但如果它們是awiatable,那完蛋了,所有對他們的呼叫你基本上都得(不能忘記)加co_await。
  4. co_await 可以理解為一個操作符,或者把它當成一個函式?它接收的型別是awaitable,那我就不容易做鏈式呼叫啊:co_await channel >> a >> b >> c; (假設我們有一個全域性函式 awaitable<Channel<T>> operator >> (Channel<T>, T);),直觀上看,我們應該讓 Channel<T>本身是一個awaitable物件 ,但我還沒搞定它,算了,先趕緊釋出文章請 @vczh 幫忙點贊再說吧。
  5. Asymmetric Transfer cppcoro作者的這個blog裡的三篇文章推薦大家看看。
  6. 現在VS除錯協程程式時,在協程函式裡無法檢視函式引數(函式棧變數則可以檢視)·····希望以後能得到支援。