asio boost 非同步錯誤處理_C++20 await & asio 相關的學習
技術標籤:asio boost 非同步錯誤處理
背景:
最新版的asio(1-13-0)的api添加了對co_await的支援 (之前的版本也支援一些,只不過名稱空間還沒有正式放入asio)。
我打算學習下asio,也打算拋棄自己的網路庫了,也期望其他的庫或者driver能夠統一的使用asio+co_await,這樣我們應用程式開發者就能快速的引入庫(少一些心智負擔,不用擔心執行緒/同步/非同步/生命週期管理等各種模型差異性導致的顧慮)。[當然我也並不是說asio就是最好的選擇,但是大家都基於某個統一的庫去做開發,還是有很多好處的]。
目的:
我的目的是採用 asio+await開發一個同步非阻塞RPC(嗯,就是類似golang grpc),來替換調我的非同步RPC,進而通過它再重新開發Orleans-CPP簡易版本。
通過我個人理解,要實現同步RPC(當然也包括其他基於協程的應用)我們需要先實現幾個基礎功能):
- Channel - 用於實現協程之間的pipeline
- Mutex - 基於協程的鎖
- WaitGroup - 基於協程的等待物件
- CondVariables
- ……(帶補充)
因為同步RPC庫(支援多個協程使用同一個RPC物件去呼叫服務)的實現(個人理解,我本打看看grpc怎麼做的,但時間還不夠,所以~~~)大概如下:
開啟一個writer協程,使用channel收集其他協程(呼叫RPC函式時)產生的訊息(訊息裡包括一個request msg、一個用於接收此次請求的response的channel),開啟一個receive協程,來接收伺服器的網路訊息,並且從中解析各個respnse,並且根據其seqid (通過鎖)拿到其對應的呼叫者的channel,然後將response放入channel,以此喚醒呼叫者。
簡要實現:
下面是我寫的簡易版channel和協程鎖的實現(我這幾天才開始學習asio,所以難免有一些問題或者錯誤處理省略了,還請指教)
// // echo_server.cpp // ~~~~~~~~~~~~~~~ // // Copyright (c) 2003-2019 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include <cstdio> #include <queue> #include <mutex> #include <iostream> #include <asio/co_spawn.hpp> #include <asio/detached.hpp> #include <asio/io_context.hpp> #include <asio/ip/tcp.hpp> #include <asio/signal_set.hpp> #include <asio/write.hpp> #include <asio/awaitable.hpp> #include <asio/redirect_error.hpp> using asio::ip::tcp; using asio::awaitable; using asio::co_spawn; using asio::detached; using asio::use_awaitable; using asio::redirect_error; namespace this_coro = asio::this_coro; template<typename T> class Channel : public asio::noncopyable { public: using Ptr = std::shared_ptr<Channel<T>>; using Container = std::queue<T>; static Ptr Make(asio::io_context& context) { struct make_shared_enabler : public Channel<T> { make_shared_enabler(asio::io_context& context) : Channel<T>(context) {} }; return std::make_shared<make_shared_enabler>(context); } asio::io_context& context() { return mContext; } auto& operator << (T value) { { std::lock_guard<std::mutex> lck(mValuesGuard); mValues.push(std::move(value)); } mTimer.cancel_one(); return *this; } asio::awaitable<T> takeOne() { while (true) { { std::lock_guard<std::mutex> lck(mValuesGuard); if (!mValues.empty()) { auto result = mValues.front(); mValues.pop(); co_return result; } } asio::error_code ec; co_await mTimer.async_wait(redirect_error(use_awaitable, ec)); } } asio::awaitable<Container> takeAll() { while (true) { { std::lock_guard<std::mutex> lck(mValuesGuard); if (!mValues.empty()) { co_return std::move(mValues); } } asio::error_code ec; co_await mTimer.async_wait(redirect_error(use_awaitable, ec)); } } private: Channel(asio::io_context& context) : mContext(context), mTimer(context) { mTimer.expires_at(std::chrono::steady_clock::time_point::max()); } private: asio::io_context& mContext; asio::steady_timer mTimer; Container mValues; std::mutex mValuesGuard; }; class CoroutineMutex : public asio::noncopyable { public: using Ptr = std::shared_ptr<CoroutineMutex>; static Ptr Make(asio::io_context& context) { struct make_shared_enabler : public CoroutineMutex { make_shared_enabler(asio::io_context& context) : CoroutineMutex(context) {} }; return std::make_shared<make_shared_enabler>(context); } asio::io_context& context() { return mContext; } awaitable<void> lock() { asio::error_code ec; while (mLocked.exchange(true)) { co_await mTimer.async_wait(redirect_error(use_awaitable, ec)); } } void unlock() { auto o = mLocked.exchange(false); assert(o); mTimer.cancel_one(); } private: CoroutineMutex(asio::io_context& context) : mLocked(false), mContext(context), mTimer(context) { } virtual ~CoroutineMutex() = default; private: std::atomic_bool mLocked; asio::io_context& mContext; asio::steady_timer mTimer; }; static awaitable<void> producer(Channel<int>::Ptr channel) { asio::steady_timer timer(channel->context()); int i = 0; while (true) { timer.expires_from_now(std::chrono::seconds(1)); asio::error_code ec; co_await timer.async_wait(redirect_error(use_awaitable, ec)); (*channel) << i++; } } static awaitable<void> consumer(Channel<int>::Ptr channel) { while (true) { { auto i = co_await channel->takeOne(); std::cout << i << std::endl; } { auto v = co_await channel->takeAll(); for (; !v.empty();) { std::cout << v.front() << std::endl; v.pop(); } } } } std::atomic<int> global = 0; static awaitable<void> testMutex(CoroutineMutex::Ptr mutex, int index, Channel<int>::Ptr channel) { asio::steady_timer timer(mutex->context()); for (int i = 0; i < 10; i++) { timer.expires_from_now(std::chrono::milliseconds(100+std::rand()%100)); asio::error_code ec; co_await timer.async_wait(redirect_error(use_awaitable, ec)); co_await mutex->lock(); global++; std::cout << "in " << index << " take global is:" << global << std::endl; (*channel) << global; mutex->unlock(); } } static awaitable<void> consumerGlobalChange(Channel<int>::Ptr channel) { while (true) { auto currentGlobalValue = co_await channel->takeOne(); std::cout << "current currentGlobalValue is :" << currentGlobalValue << std::endl; } } int main() { try { asio::io_context io_context(1); asio::signal_set signals(io_context, SIGINT, SIGTERM); signals.async_wait([&](auto, auto) { io_context.stop(); }); { // 一個生產者協程和兩個消費者協程 auto channel = Channel<int>::Make(io_context); co_spawn(io_context, [=]() { return producer(channel); }, detached); co_spawn(io_context, [=]() { return consumer(channel); }, detached); co_spawn(io_context, [=]() { return consumer(channel); }, detached); } { // 構造10個協程通過互斥進行修改全域性變數 // 並在其中將全域性變數當前的值放入channel // 開啟一個協程通過channel去觀察全域性變數的變更 auto mutex = CoroutineMutex::Make(io_context); auto consumerChannel = Channel<int>::Make(io_context); for (int i = 0; i < 10; i++) { co_spawn(io_context, [=]() { return testMutex(mutex, i, consumerChannel); }, detached); } co_spawn(io_context, [=]() { return consumerGlobalChange(consumerChannel); }, detached); } io_context.run(); } catch (std::exception & e) { std::printf("Exception: %sn", e.what()); } }
效能測試:
構造一百萬個協程,並且使用一百萬個channel將它們兩兩串連起來,然後做類似擊鼓傳花的操作。從第一個協程開始輸入,直到最後一個協程收到,程式碼如下:
const static auto MaxNum = 1000000;
static awaitable<void> foo(Channel<int>::Ptr in, Channel<int>::Ptr out)
{
auto value = co_await in->takeOne();
// 將收到的值+1傳遞給下一位夥伴
(*out << (value + 1));
if (value == MaxNum)
{
std::cout << "end" << std::endl;
out->context().stop();
}
}
static awaitable<void> start(Channel<int>::Ptr out)
{
(*out << 1);
co_return;
}
auto in = Channel<int>::Make(io_context);
auto out = Channel<int>::Make(io_context);
// 開啟一個協程做初始輸入動作
co_spawn(io_context, [=]() {
return start(in);
}, detached);
for (int i = 0; i < MaxNum; i++)
{
co_spawn(io_context, [=]() {
return foo(in, out);
}, detached);
in = out;
out = Channel<int>::Make(io_context);
}
在 的垃圾I5 Win10機器上,花費8s跑完整個程式,(程式只有一個io_context,所以是單執行緒)佔滿單個CPU核心,記憶體佔用最高峰為1G。
注:asio在開啟協程後,需要在下一次排程時才執行協程,awaituv則是立馬執行協程,所以asio這種做法在我的測試中相比awaituv所花費的時間(我猜測)可能是數量級的差異,當然這也沒太大意義,在真實場景下,協程並不是執行一次就馬上結束,它通常會存活一段時間的。
(剛才寫了一個Golang的版本,發現也是立即執行,只需要1s時間就跑完測試程式。可是一旦我將初始輸入動作放在開啟擊鼓傳花的協程動作之後(也即是說哪怕它們本身立即執行,但它們無法立即完成,所以必須等待/存活),時間耗費多少呢?抱歉,我沒跑出結果就強制關閉了程式(進度也尚不可知),此時記憶體耗費在2.9G。
當然,這個測試程式並不能完全說明channel本身的效能,或許更多的是測試協程排程的效能?
疑問/注意:
- 要開發一個支援co_await的庫(類似最新版asio或者 awaituv)是非常困難的,我期望能夠有相當背景的公司開發一個通用的封裝,來支援其他開發者將非同步呼叫轉換為同步呼叫(就類似awaituv裡做過的一些事情/以及封裝,但它有bug或者功能還很簡陋·····)。 cppcoro是一個選擇,可惜它又幹了一些io/network相關事情(而且還沒幹好~~),而且~~~作者我不瞭解啊,我覺得背書不能啊)
- 如果一個函式是協程函式,但你呼叫時忘記co_await了,那就是開啟一個新協程,估計這容易成為一個坑?哈哈哈
- 我嘗試通過RAII做一個協程鎖的guard,但是建構函式裡不能co_await mutex->lock(); 相當尷尬,因為operator new、建構函式 等編譯器自動生成的函式都不是awaitable的啊~~~但如果它們是awiatable,那完蛋了,所有對他們的呼叫你基本上都得(不能忘記)加co_await。
- co_await 可以理解為一個操作符,或者把它當成一個函式?它接收的型別是awaitable,那我就不容易做鏈式呼叫啊:co_await channel >> a >> b >> c; (假設我們有一個全域性函式 awaitable<Channel<T>> operator >> (Channel<T>, T);),直觀上看,我們應該讓 Channel<T>本身是一個awaitable物件 ,但我還沒搞定它,算了,先趕緊釋出文章請 @vczh 幫忙點贊再說吧。
- Asymmetric Transfer cppcoro作者的這個blog裡的三篇文章推薦大家看看。
- 現在VS除錯協程程式時,在協程函式裡無法檢視函式引數(函式棧變數則可以檢視)·····希望以後能得到支援。