1. 程式人生 > >boost asio使用注意事項

boost asio使用注意事項

背景知識

         高效網路程式設計一般都要依賴於IO複用,IO複用是指同時傳送並監聽處理很多socket或者檔案讀寫的事件。IO複用的高效方式目前常用的有兩種:Reactor和Proactor。這兩種方式在作業系統級都是非同步和非阻塞的,也就是說使用者提交了一個請求後都可以直接返回。但是Reactor在使用者層級看來是同步的,就是在提交了一系列的操作給作業系統後,需要阻塞監聽等待事件的發生,如果有事件發生則手動呼叫相關的函式進行處理,其具體在作業系統級利用的是作業系統的事件通知介面。而Proactor在使用者看來是非同步的,他是在呼叫的時候同時註冊一個回撥函式,如果請求在作業系統級有結果了,其註冊的回撥函式就會自動呼叫。這個在作業系統級使用的aio非同步呼叫介面。

         最顯著的不同時,以TCP距離,Reactor會在核心收到TCP資料的時候通知上層應用程式,後面從核心中取出資料並呼叫處理函式處理用使用者完成(什麼時候,怎麼處理)。Proactor會在TCP收到資料後由核心將資料拷貝到使用者指定的空間,然後立即呼叫註冊的回撥函式進行處理。

         看起來Proactor會明顯比Reactor簡單和快速,但是由於工程原因,這個也是不一定的。

介紹

將整個非同步平臺抽象成boost::asio::io_service,想要使用asio都要先建立這個物件。非同步平臺上可以使用很多元件,比如boost::asio::ip::tcp::socket,這些元件又有各自的方法。但是過程是統一的:(asio可以執行同步和非同步兩種呼叫)

對於同步的呼叫。呼叫socket.connect(server_endpoint),或者其他隊遠端互動的方法。請求會首先發送給io_service,io_service會呼叫作業系統的具體方法,然後返回結果到io_service,io_service會通知到上層使用者元件。錯誤用異常通知(可以阻止),正確用返回值。

對於非同步呼叫。在元件呼叫io_service執行命令的同時要提供一個回撥函式,可以同時釋出多個非同步請求,所有的返回結果都會放在io_service的佇列裡儲存。程序呼叫io_service::run()會逐個的拿出儲存在佇列裡的請求呼叫提前傳入的回撥函式進行處理。

I/O物件是用來完成實際功能的元件,有多種物件 :

boost::asio::ip::tcp::socket

boost::asio::ip::tcp::resolver

boost::asio::ip::tcp::acceptor

boost::asio::local::stream_protocol::socket本地連線

boost::asio::posix::stream_descriptor 面向流的檔案描述符,比如stdout,stdin

boost::asio::deadline_timer 定時器

boost::asio::signal_set 訊號處理

這些物件大部分需要io_service來初始化。還有一個用於控制io_service生命週期的work類,和用來儲存資料的buffer類。

io_service

run() vs poll()

run()和poll()都迴圈執行I/O物件的事件,區別在於如果事件沒有被觸發(ready),run()會等待,但是poll()會立即返回。也就是說poll()只會執行已經觸發的I/O事件。

比如I/O物件socket1,socket2, socket3都綁定了socket.async_read_some()事件,而此時socket1、socket3有資料過來。則呼叫poll()會執行socket1、socket3相應的handler,然後返回;而呼叫run()也會執行socket1和socket3的相應的handler,但會繼續等待socket2的讀事件。

stop()

呼叫 io_service.stop() 會中止 run loop,一般在多執行緒中使用。

post() vs dispatch()

post()和dispatch()都是要求io_service執行一個handler,但是dispatch()要求立即執行,而post()總是先把該handler加入事件佇列。

什麼時候需要使用post()?當不希望立即呼叫一個handler,而是非同步呼叫該handler,則應該呼叫post()把該handler交由io_service放到事件佇列裡去執行。比如,Boost.Asio自帶的聊天室示例,其中實現了一個支援非同步IO的聊天室客戶端,是個很好的例子。

chat_client.cpp 的write()函式之所以要使用post(),是為了避免臨界區同步問題。write()呼叫和do_write()裡async_write()的執行分別屬於兩個執行緒,前者會往write_msgs_裡寫資料,而後者會從write_msgs_裡讀資料,如果不使用post(),而直接呼叫do_write(),顯然需要使用鎖來同步write_msgs_。但是使用post()相當於由io_service來排程write_msgs_的讀寫,這就在一個執行緒內完成,無需額外的鎖機制。

work

work類用於通知io_service是否可以結束,只要物件work(io_service)存在,io_service就不會結束。所以work類用起來更像是一個標識,比如:

boost::asio::io_serviceio_service;

boost::asio::io_service::work*work = new boost::asio::io_service::work( io_service );

// deletework; // 如果不註釋掉這一句,則run loop不會退出;一般用shared_ptr維護work物件,使用work.reset()來結束其生命週期。

io_service.run()

buffer類

buffer類分mutable_buffer和const_buffer兩個類,buffer類特別簡單,僅有兩個成員變數:指向資料的指標 和 相應的資料長度。buffer類本身並不申請記憶體,只是提供了一個對現有記憶體的封裝。

需要注意的是,所有async_write()、async_read()之類函式接受的buffer型別是MutableBufferSequence / ConstBufferSequence,這意味著它們既可以接受boost::asio::buffer,也可以接受std::vector<boost::asio::buffer> 這樣的型別。

緩衝區管理

緩衝區的生命期是使用asio最需要重視的兩件事之一,緩衝區之所以需要重視的原因在於Asio非同步呼叫Reference裡的這段描述:

Althoughthe buffers object may be copied as necessary, ownership of the underlyingmemory blocks is retained by the caller, which must guarantee that they remainvalid until the handler is called.

這意味著緩衝區從發起非同步呼叫到handler被執行,這段時間內需要交由io_service控制,這個限制常常導致asio的某些程式碼變得可能比Reactor相應程式碼還要麻煩一些。

還是舉上面聊天室的那個例子。chat_client.cpp的do_write()函式收到使用者輸入資料後,之所以把該資料儲存到std::deque<std::string> write_msgs_ 佇列,而不是存到類似chardata[]的數組裡,然後去呼叫async_write(..data..)傳送資料,是為了避免這種情況:輸入資料速度過快,當上一次async_write()呼叫的handler還沒有來得及處理,又收到一份新的資料,如果直接儲存到data,會導致覆蓋上一次async_write()的緩衝區。async_write()要求這個緩衝區從呼叫async_write()開始,直到handler處理這個時間段是不變的。

同樣的,在do_write()函式裡呼叫async_write()函式之前,先判斷write_msgs_佇列是否為空,也是為了保證async_write()總是從write_msgs_佇列頭取得有效的資料,而在handle_write()裡當資料傳送完畢後,再pop_front()彈出已經發送的資料包。以此避免出現前一個async_write()的handler還沒執行完畢,就把佇列頭彈出去,導致對應的緩衝區失效問題。

這裡主要還是因為async_write()和async_read()的區別,前者是主動發起的,後者可以由io_service控制,所以後者不用擔心這種緩衝區被覆蓋問題。因為在同一個執行緒裡,哪怕需要讀取的事件觸發得再快,也需要由io_service逐一處理。

在這個聊天室的例子裡,如果不考慮把資料按使用者輸入順序傳送出去的話,可以使用更簡單的辦法來處理do_write()函式,例如:

:::c++

voiddo_write(chat_message msg)

{

    chat_message* pmsg = new chat_message(msg);// implement copy ctor for chat_message firstly

    boost::asio::async_write(socket_,

           boost::asio::buffer(pmsg->data(), pmsg->length()),

            boost::bind(&chat_client::handle_write,this,

                   boost::asio::placeholders::error, pmsg));

}

voidhandle_write(const boost::system::error_code& error, chat_message* pmsg)

{

    if (!error) {

    }else{

        do_close();

    }

    delete pmsg;

}   

這裡相當於給每個非同步呼叫分配一塊屬於自己的記憶體,非同步呼叫完成即自動釋放掉,有些類似於閉包了。如果不希望頻繁new/delete記憶體,也可以考慮使用boost::circular_buffer一次性分配記憶體後逐項使用。

I/O物件

socket

Boost.Asio最常用的物件應該就是socket了,常用的函式一般有這幾個:

讀寫TCP socket的時候,一般使用read(),async_read(), write(), async_write(),為了避免所謂的short readsand writes,一般不使用receive(), async_receive(), send(), async_send()。

讀寫有連線的UDP socket的時候,一般使用receive(),async_receive(), send(), async_send()。

讀寫無連線的UDP socket的時候,一般使用receive_from(),async_receive_from(), send_to(), async_send_to()。

而自由函式boost::asio::async_write()和類成員函式socket.async_write_some()的有什麼區別呢(boost::asio::async_read()和socket.async_read_some()類似):

boost::asio::async_write()非同步寫,立即返回。但它可以保證寫完整個緩衝區的內容,否則將報錯。boost::asio::async_write() 是通過呼叫n次socket.async_write_some()來實現的,所以程式碼必須確保在boost::asio::async_write()執行的時候,沒有其他的寫操作在同一socket上執行。在呼叫boost::asio::async_write()的時候,如果指定buffer的length沒有寫完或出錯,是不會回撥相應的handler的,它將一直在run loop中執行;直到buffer裡所有的資料都寫完或出錯(此時handler裡返回的長度肯定會小於buffer length),才會呼叫handler繼續處理;而socket.async_write_some()不會有這樣的問題,它只會嘗試寫一次,寫完的長度會在handler的引數裡返回。

所以,這裡強調使用asio時第二件需要重視的事情,就是handler的返回值(一般可能宣告為boost::asio::placeholders::error)。因為asio裡所有的任務都由io_service非同步執行,只有執行成功或者失敗之後才會回撥handler,所以返回值是你瞭解當前非同步操作狀況的唯一辦法,記住不要忽略任何handler的返回值處理。

訊號處理

Boost.Asio的訊號處理非常簡單,宣告一個訊號集合,然後把相應的非同步handler綁上就可以了。如果你希望在一個訊號集裡處理所有的訊號,那麼你可以根據handler的第二個引數,來獲取當前觸發的是那個訊號。比如:

boost::asio::signal_set signals(io_service,SIGINT, SIGTERM);

signals.add(SIGUSR1); // 也可以直接用add函式新增訊號

signals.async_wait(boost::bind(handler, _1,_2));

void handler(

  constboost::system::error_code& error,

  intsignal_number // 通過這個引數獲取當前觸發的訊號值

);

定時器

Boost.Asio的定時器用起來根訊號集一樣簡單,但由於它太過簡單,也有不方便的地方。比如,在一個UDP伺服器裡,一般收到的每個UDP包中都會包含一個sequence number,用於標識該UDP,以應對包處理超時情況。假設每個UDP包處理時間只有100ms,如果超時則直接給客戶端返回超時標記。這種最簡單的定時器常用的一些Reactor框架都有很完美的解決方案,一般是建一個定時器連結串列來實現,但是Asio中的定時器沒法單獨完成這個工作。

boost::asio::deadline_timer只有兩種狀態:超時和未超時。所以,只能很土的對每個UDP包建立一個定時器,然後藉助std::map和boost::shared_ptr儲存sequence number到定時器的對映,根據定時器handler的返回值判斷該定時器是超時,還是被主動cancel。

strand

在多執行緒中,多個I/O物件的handler要訪問同一塊臨界區,此時可以使用strand來保證這些handler之間的同步。

示例:

我們向定時器註冊 func1 和 func2,它們可能會同時訪問全域性的物件(比如 std::cout )。這時我們希望對 func1 和 func2 的呼叫是同步的,即執行其中一個的時候,另一個要等待。

這時就可以用到boost::asio::strand 類,它可以把幾個cmd包裝成同步執行的。例如,我們向定時器註冊 func1 和 func2 時,可以改為:

boost::asio::strand  the_strand;

t1.async_wait(the_strand.wrap(func1));      //包裝為同步執行的

t2.async_wait(the_strand.wrap(func2));

這樣就保證了在任何時刻,func1 和 func2 都不會同時在執行。

還有就是如果你希望把一個io_service物件繫結到多個執行緒。此時需要boost::asio::strand來確保handler不會被同時執行,因為非同步操作,比如async_write、async_receive_from之類會影響到臨界區buffer。

具體可參考asio examples裡的示例:HTTPServer 2和HTTP Server 3的connection.hpp設計。