1. 程式人生 > 其它 >asio核心概念和功能

asio核心概念和功能

原因

大多數程式以某種方式與外界互動,無論是通過檔案、網路、序列電纜還是控制檯。 有時,就像網路一樣,單個 I/O 操作可能需要很長時間才能完成。 這對應用程式開發提出了特殊的挑戰。

Boost.Asio 提供了管理這些長時間執行的操作的工具,而無需程式使用基於執行緒和顯式加鎖的併發模型。

Boost.Asio 庫適用於使用 C++ 進行系統程式設計的程式設計師,這些程式設計師通常需要訪問作業系統功能,例如網路。 特別是,Boost.Asio 解決了以下目標:

  • 可移植性。該庫支援一系列常用的作業系統,併為這些作業系統提供一致性的行為。
  • 伸縮性。該庫應用來開發能擴充套件到數千個併發連線的網路應用程式。每個作業系統的庫實現應該使用最能實現這種可伸縮性的機制。
  • 高效。該庫應支援分散-聚集 I/O 等技術,並允許程式最大限度地減少資料複製。
  • 模型概念來自已有的API,如BSD套接字。BSD套接字API被廣泛實現和理解,並在很多文獻都有涉及。其他程式語言通常使用相似的網路介面API。在合理的情況下,Boost.Asio應該利用現有的做法。
  • 易用。該庫應該採用提供工具包而不是框架的方法為新使用者提供較低的入門門檻。也就是說,它應該儘量減少預先在時間上的投資,只學習一些基本的規則和指導方針。在此之後,庫使用者應該只需要瞭解正在使用的特定函式。
  • 進一步抽象的基礎。該庫應該允許能夠開發提供更高級別抽象的庫。例如,常用協議HTTP的實現。

儘管 Boost.Asio 一開始主要關注網路,但它的非同步 I/O 概念已經擴充套件到包括其他作業系統資源,例如串列埠、檔案描述符等。

核心概念和功能

Asio的基本架構

Asio可用於對IO物件(如套接字)執行同步和非同步操作。在使用 Boost.Asio 之前,瞭解一下 Boost.Asio 的各個部分、您的程式以及它們如何協同工作的概念圖可能會很有用。

作為入門級的示例,讓我們考慮在套接字上執行連線操作時會發生什麼。 我們將從檢查同步操作開始。

你的程式中應該至少有一個IO執行上下文(I/O Execution Context),例如一個boost::asio::io_context物件,boost::asio::thread_pool物件或者boost::asio::system_context物件。I/O可執行上下文代表了你的程式與作業系統的I/O服務的連結。

boost::asio::io_context io_context;

為了執行I/O操作,你的程式需要一個I/O物件,例如TCP套接字:

boost::asio::ip::tcp::socket socket(io_context);

當執行一個同步的連線操作時,會發生下面的事件時序:

  1. 你的程度通過呼叫I/O物件來啟動連線操作。
socket.connect(server_endpoint);
  1. IO物件將請求轉發到IO execution context。
  2. IO execution context呼叫作業系統來執行連線操作。
  3. 作業系統返回執行結果給IO execution context。
  4. IO execution context將任何執行的錯誤結果轉為boost::system::error_code型別。error_code可以與特定值進行比較,或作為布林值進行測試(為false表示沒有發生錯誤)。結果轉發會IO物件。
  5. 如果操作失敗,IO物件丟擲一個boost::system::system_error型別的異常。如果初始化操作的程式碼被寫為:
boost::system::error_code ec;
socket.connect(server_endpoint,ec);

​ 那麼error_code型別變數ec將會被設定為執行結果,不會丟擲異常。

當使用非同步操作的時候,發生的事件時序有所不同。

  1. 你的程式通過呼叫IO物件啟動連線操作:
socket.async_connect(server_endpoint,completion_handler);

​ completion_handler是一個函式並且函式簽名如下:

void completion_handler(const boost::system::error_code ec);

​ 所需的確切簽名取決於正在執行的非同步操作。參考文件指出了每個操作的適當形式。

  1. IO物件轉發請求到IO execution context。
  2. IO execution context向作業系統傳送訊號,表示它應該啟動一個非同步連線。不會阻塞在連線操作處。 (在同步情況下,此等待將完全包含在連線操作的持續時間內。)
  1. 作業系統通過將結果放入佇列,準備被IO execution context取走來指示連線操作已經完成。
  2. 當使用io_context作為IO execution context,你的程式必須呼叫io_context::run()(或一個類似io_context的成員函式)來獲取結果。呼叫io_context::run(),只要還有未完成的非同步操作就會阻塞,所以只要你啟動了你的第一個非同步操作你就應該立即呼叫。
  3. 在呼叫io_context::run()時,IO execution context取出執行結果,將其轉為error_code,然後傳遞給completion handler。

Proactor設計模式:無執行緒的併發

Asio為同步操作和非同步操作提供的並行支援。非同步操作是基於Proactor設計模式。與只有同步和Reactor方式相比,這種模式的優缺點如下所述:

Proactor與Asio

我們先了解一下Proactor在Asio中是如何實現的,但不設計具體的細節。

  • Asynchronous Operation(非同步操作):定義一個非同步執行的操作,例如非同步讀或寫一個套接字。
  • Asynchronous Operation Processor(非同步操作處理器):執行非同步操作,並且當操作完成時在完成事件佇列上對事件進行排隊。
  • Completion Event Queue(完成事件佇列):快取完成事件直到他們被非同步事件分發器取走。
  • Completion Handler(完成處理器):處理非同步操作的結果。這些是函式物件,通常是使用boost::bind建立的。
  • Asynchronous Event Demultiplexer(非同步事件分配器):阻塞等待完成事件佇列上有事件出現,並將完成的時間返回給呼叫者。
  • Proactor:呼叫非同步事件分配器來取出事件,然後排程與事件關聯的完成處理器(即呼叫函式物件)。抽象為io_context類。
  • Initiator(啟動器):啟動非同步操作的應用程式程式碼。Initiator通過高階介面(如basic_stream_socket)與非同步操作處理器互動,該介面反過來委託給像reactive_socket_service的服務。

使用Reactor實現

在許多平臺,Asio使用Reactor實現Proactor設計模式,例如select,epoll或者kqueue。這種實現方法對應的Proactor設計模式如下:

  • Asynchronous Operation Processor:Reactor實現使用select,epoll或kqueue。當Reactor指示執行操作的資源準備好時,處理器執行非同步操作並將相關的完成處理器入隊到完成事件佇列中。
  • Completion Event Queue:一個完成處理器的連結串列。
  • Asynchronous Event Demultiplexer:這個是通過等待一個事件或條件變數直到完成事件佇列中有一個完成處理器可用實現的。

使用Windows Overlapped I/O實現

在Windows NT,2000和XP。Asio利用Overlapped I/O的優點提供了高效的Proactor設計模式的實現。這種實現方式對應的Proactor設計模式如下:

  • Asynchronous Operation Processor:這是通過作業系統實現的。操作是通過呼叫諸如 AcceptEx 之類的重疊函式來啟動的。
  • Completion Event Queue:這是由作業系統實現的,並與I/O完成埠相關聯。每個io_context例項都有一個I/O完成埠。
  • Asynchronous Event Demultiplexer:由 Asio 呼叫以使事件及其關聯的完成處理程式出列。

優點

  • 可移植性。許多作業系統提供本機非同步 I/O API(例如Windows上的重疊 I/O )作為開發高效能網路應用程式的首選選項。該庫可以根據本機非同步 I/O 來實現。但是,如果本機支援不可用,也可以使用代表 Reactor 模式的同步事件多路分解器(例如POSIX select() 來實現該庫。
  • 將執行緒和併發解耦。長時間操作是由代表應用程式的實現來非同步執行的,因此,應用程式不需要產生許多執行緒來增加併發性。
  • 高效和可伸縮性。由於增加了 CPU 之間的上下文切換、同步和資料移動,諸如thread-per-connect(僅同步方法需要)之類的實現策略可能會降低系統性能。 通過非同步操作,可以通過最小化作業系統執行緒的數量並且只啟用有事件要處理的邏輯控制執行緒來避免上下文切換的成本。
  • 簡化應用程式同步。可以編寫非同步操作完成處理程式,就好像它們存在於單執行緒環境中一樣,因此可以在開發應用程式邏輯時幾乎不關心同步問題。
  • 函式組合。函式組合是指實現以提供更高階操作的函式,例如以特定格式傳送訊息。 每個函式都是對較低級別的讀取或寫入操作的多次呼叫來實現的。例如,考慮這樣一種協議,其中每個訊息由固定長度的報頭和可變長度的正文組成,其中正文的長度在報頭中指定。假設的讀取訊息操作可以使用兩個較低級別的讀取來實現,第一個讀取用於接收訊息頭,一旦長度已知,第二個讀取用於接收訊息體。要在非同步模型中組合函式,可以將非同步操作連結在一起。也就是說,一個操作的完成處理程式可以啟動下一個操作。啟動鏈中的第一個呼叫可以被封裝,這樣呼叫者就不需要知道高階操作是作為非同步操作鏈實現的。以這種方式組合新操作的能力簡化了在網路庫之上的更高抽象級別的開發,例如支援特定協議的函式。

缺點

  • 程式複雜度。由於操作啟動和完成之間的時間和空間分離,使用非同步機制開發應用程式更加困難。 由於反向控制流,應用程式也可能更難除錯。

  • 記憶體利用率。在讀或寫操作期間必須提交緩衝區空間,這可能會無限期地持續下去,並且每個併發操作都需要一個單獨的緩衝區。 另一方面,Ractor模式在套接字準備好讀取或寫入之前不需要緩衝區空間。

執行緒和Asio

執行緒安全

一般來說,併發使用不同的物件是安全的,但是併發使用單個物件是不安全的。但是,諸如io_context之類的型別提供了強力的保證,即併發使用單個物件是安全的。

執行緒池

多個執行緒可以呼叫 io_context::run() 來設定一個執行緒池,可以從中呼叫完成處理器。這種方法也可以與post()一起使用,作為跨執行緒池執行任意計算任務的方法。

請注意,所有加入io_context的執行緒都被認為是等價的,io_context可以以任意的方式在它們之間分發工作。

內部執行緒

這個庫對於特定平臺的實現可以使用一個或多個內部執行緒來模擬非同步性。這些執行緒對於庫的使用者必須儘可能不可見。特別的,這些執行緒:

  • 一定不能直接呼叫使用者程式碼
  • 必須遮蔽所有訊號

此方法由以下保證補充:

  • 非同步完成處理器只會被當前正在呼叫 io_context::run() 的執行緒呼叫。

因此,庫使用者有責任建立和管理通知將傳送到的所有執行緒。

採用這種方法的原因包括:

  • 通過僅從單個執行緒呼叫io_context::run(),使用者程式碼可以避免與同步相關的開發複雜度。例如,庫使用者可以實現單執行緒的可擴充套件伺服器(從使用者角度看)。
  • 庫使用者可能需要線上程啟動後不久和任何其他應用程式程式碼執行之前線上程中執行初始化。例如,Microsoft 的 COM 使用者必須先呼叫 CoInitializeEx,然後才能從該執行緒呼叫任何其他 COM 操作。
  • 庫介面與執行緒建立和管理的介面解耦,並允許線上程不可用的平臺上實現。

Strands:使用執行緒而不使用顯式加鎖

Strands被定義為事件處理程式的嚴格呼叫。使用Strands允許在多執行緒程式中執行程式碼而無需顯式加鎖。

Strands可以是顯式的,也可以是隱式的。如下面的例子:

  • 只在一個執行緒呼叫io_context::run()意味著所有的事件處理器在一個隱式strand中,因為io_context保證處理器只能從run()中呼叫。
  • 如果存在與連線相關聯的單個非同步操作鏈(例如,在像 HTTP 這樣的半雙工協議實現中),則不可能同時執行處理器。 這是一個隱含的鏈。
  • 顯式的strand是strand<>或者io_context::strand的例項。所有的事件處理器函式物件需要使用boost::asio::bind_executor繫結到strand上,或者使用strand物件進行釋出/排程。

在非同步操作組合的情況下,像async_read()或者async_read_until(),如果一個完成處理器經過了一個strand,那麼所有的中間處理器也應該經過同樣的strand。這是確保能夠執行緒安全地訪問呼叫者和組合操作之間共享物件所必要的(在async_read()的情況下,它是套接字,呼叫者可以close()取消操作)。

為了實現這一點,所有的非同步操作通過使用get_associated_executor函式獲取處理器相關的執行器。例如:

boost::asio::associated_executor_t<Handler> a = boost::asio::get_associated_executor(h);

相關的執行器必須滿足Executor要求。非同步操作使用它來提交中間和最終處理器以供執行。

可以通過指定巢狀型別executor_type和成員函式get_executor為特定處理器型別自定義執行器:

class my_handler {
public:
  //Executor 型別要求的自定義實現。
  typedef my_executor executor_type;
  //返回一個自定義執行器的實現
  executor_type get_executor() const noexcept {
    return my_executor();
  }
  void operator()() {...}
};

在更復雜的情況下,associated_executor模板可能會直接部分特化:

//處理器
struct my_handler {
  void operator()() {...}  
};

namespace boost {namespace asio {
    //特化associator_executor模板
    template<class Executor>
    struct associated_executor<my_handler,Executor> {
      //Executor 型別要求的自定義實現。
      typedef my_executor type;
      //返回一個自定義執行器的實現
      static type get(const my_handler&,const Executor&=Executor()) noexcept {
          return my_executor();
      }  
    };
}}

boost::asio::bind_executor()函式用來將特定的executor物件(像strand)繫結到一個完成處理器上。這個繫結會自動關聯一個執行器。例如,為了將strand繫結到一個完成處理器上,我們可以簡單地寫為:

my_socket.async_read_some(my_buffer, 
    boost::asio::bind_executor(my_strand,[](error_code ec,size_t length) {
    	//....
}));

Buffers

從根本上說,I/O涉及在記憶體的連續區域(稱為緩衝區)之間傳輸資料。

這些緩衝區可以簡單地表示為由一個指標和一個位元組大小組成的元組。但是,為了開發高效的網路應用程式,Asio包括對分散-聚集操作的支援。這些操作涉及一個或多個緩衝區。

  • 分散讀將資料讀取到多個緩衝區
  • 聚集寫,傳輸多個緩衝區

因為,我們需要一個表示緩衝區集合的抽象。Asio使用的方法就是定義一個(實際上是兩個)來表示單個緩衝區。這些可以儲存在一個容器中,而該容器可以傳遞給分散-聚集操作。

除了將緩衝區指定為指標和位元組大小之外,Boost.Asio 還區分了可修改記憶體(稱為可變)和不可修改記憶體(後者是從 const 限定變數的儲存中建立的)。 因此,這兩種型別可以定義如下:

typedef std::pair<void*,std::size_t> mutable_buffer;
typedef std::pair<const void*,std::size_t> const_buffer;

mutable_buffer可以轉為const_buffer,但是不能反過來轉換。

但是,Asio並沒有使用上面的定義,而是定義了兩個類mutable_bufferconst_buffer。其目的是提供連續記憶體的不透明表示,其中:

  • 型別的轉換行為與std::pair定義方式的表現一樣。也就是說mutable_bufer可以轉為const_buffer,但是不能反過來。
  • 有防止緩衝區溢位的保護。給定一個緩衝區例項,使用者只能建立表示相同記憶體範圍或其子範圍的另一個緩衝區。為了提供進一步的安全性,該庫還包括用於從陣列中自動確定緩衝區大小的機制,POD元素的boost::arraystd::vector,或來自std::stirng
  • 使用data()顯式訪問底層的記憶體。通常來說,應用程式不需要這樣做,但是庫的實現需要傳遞原始記憶體給底層作業系統函式。

最後,多個buffer可以通過將buffer物件放入容器中傳給分散-聚集操作(像read()write())。定義了MutableBufferSequenceConstBufferSequence概念,以便可以使用像std::vector,std::list,std::array,boost::array的容器。

與iostreams整合的streambuf

boost::asio::basic_streambuf派生自std::basic_streambuf以將輸入序列和輸出序列與某種字元陣列型別的一個或多個物件相關聯,這些物件的元素儲存任意值。這些字元陣列物件在 streambuf 物件內部,但提供了對陣列元素的直接訪問,以允許它們與 I/O 操作一起使用,例如套接字的傳送或接收操作:

  • streambuf的輸入序列可以通過data()成員函式訪問。該函式的返回型別滿足ConstBufferSequence的要求。
  • streambuf的輸出序列可以通過prepare()成員函式訪問。函式的返回型別滿足MutableBufferSequence要求。
  • 通過呼叫commit()成員函式,資料從輸出序列的前面傳輸到輸入序列的後面。
  • 通過呼叫consume() 成員函式從輸入序列的前面刪除資料。

streambuf 建構函式接受一個size_t引數,指定輸入序列和輸出序列的大小之和的最大值。 如果成功,任何將內部資料增長超過此限制的操作都將丟擲 std::length_error 異常。

按位元組順序遍歷緩衝區序列

buffers_iterator<> 類模板允許遍歷緩衝區序列(即滿足 MutableBufferSequenceConstBufferSequence要求的型別),就好像它們是連續的位元組序列一樣。還提供了稱為 buffers_begin()buffers_end() 的輔助函式,其中會自動推匯出 buffers_iterator<> 模板引數。

舉個例子,從套接字中讀取一行放入std::string,可以寫為:

boost::asio::streambuf sb;
...
std::size_t n = boost::asio::read_until(sock,sb,'\n');
boost::asio::streambuf::const_buffers_type bufs = sb.data();
std::string line(
	boost::asio::buffers_begin(bufs),
    boost::asio::buffers_begin(bufs)+n
);

Buffer debugging

一些標準庫的實現,比如微軟Visual c++ 8.0及更高版本附帶的庫,提供了一個稱為迭代器除錯的特性。這意味著在執行時檢查迭代器的有效性。如果程式嘗試使用已失效的迭代器,則會觸發斷言。 例如:

std::vector<int> v(1);
std::vector<int>::iterator i = v.begin();
v.clear(); //使迭代器無效
*i=0; //斷言

Asio利用了這一特性,加入到了buffer的debugging。考慮下面的程式碼:

void dont_do_this() {
    std::string msg = "Hello,world!";
    boost::asio::async_write(sock,boost::asio::buffer(msg),my_handler);
}

當您呼叫非同步讀取或寫入時,您需要確保操作的緩衝區在呼叫完成處理器之前有效。在上面的例子中,緩衝區是 std::string 變數 msg。這個變數在堆疊上,所以它在非同步操作完成之前就超出了範圍。如果你很幸運,那麼應用程式會崩潰,但更有可能出現隨機故障。

啟用緩衝區除錯時,Asio 將迭代器儲存到string中,直到非同步操作完成,然後解引用它以檢查其有效性。在上面的示例中,您將在 Asio 嘗試呼叫完成處理器之前觀察到斷言失敗。

當定義_GLIBCXX_DEBUG時,此功能會自動適用於 Microsoft Visual Studio 8.0 或更高版本以及 GCC。此檢查會產生效能成本,因此緩衝區除錯僅在除錯版本中啟用。對於其他編譯器,它可以通過定義 BOOST_ASIO_ENABLE_BUFFER_DEBUGGING 來啟用。 它也可以通過定義 BOOST_ASIO_DISABLE_BUFFER_DEBUGGING 來顯式禁用。

Streams,Short Read and Short Writes

Asio的許多I/O物件是面向流。這就意味著:

  • 沒有訊息邊界。資料時作為連續的位元組序列傳輸的。
  • 讀或寫操作可能傳輸的位元組比要求的更少。這被稱為短讀(short read)或短寫(short write)。

提供面向流的 I/O 模型的物件具有以下一種或多種型別要求:

  • SyncReadStream:其中使用名為read_some() 的成員函式執行同步讀取操作。
  • AsyncReadStream,其中使用名為 async_read_some() 的成員函式執行非同步讀取操作。
  • SyncWriteStream,其中使用名為write_some()的成員函式執行同步寫入操作。
  • AsyncWriteStream,其中使用名為 async_write_some() 的成員函式執行非同步寫入操作。

面向流的IO物件的例子包括ip::tcp::socket,ssl::stream<>,posix::stream_descriptor,windows::stream_handle等等。

程式通常希望傳輸確切數量的位元組。 當發生短讀或短寫時,程式必須重新開始操作,並繼續這樣做,直到傳輸了所需的位元組數。 Asio 提供了自動執行此操作的通用函式:read()async_read()write()async_write()

為什麼EOF是錯誤

  • 流的結尾會導致read、async_read、read_untilasync_read_until 函式違反它們的約定。 例如。 由於 EOF,N 個位元組的讀取可能會提前完成。
  • EOF 錯誤可用於區分流的結束和成功讀取了0位元組大小的資料。

Reactor風格的操作

有時,程式必須與想要自己執行 I/O 操作的第三方庫整合。為促進這一點,Asio 的同步和非同步操作可用於等待套接字準備好讀取、準備寫入或具有掛起的錯誤條件。

舉個例子,執行非阻塞讀:

ip::tcp::socket socket(my_io_context);
...
socket.non_blocking(true);
...
socket.async_wait(ip::tcp::socket::wait_read,read_handler);
...
void read_handler(boost::system::error_code ec) {
    if(!ec) {
        std::vector<charA> buf(socket.available());
        socket.read_some(buffer(buf));
    }
}

所有平臺上的套接字和 POSIX 面向流的描述符類都支援這些操作。

基於行的操作

許多常用的 Internet 協議都是基於行的,這意味著它們具有由字元序列“\r\n”分隔的協議元素。例如HTTP,SMTP,FTP。為了更容易地實現基於行的協議以及其他使用分隔符的協議,Asio 提供了包括read_until() async_read_until()的函式。

下面例子說明了async_read_until()在HTTP伺服器中的使用,用來接收來自客戶端的HTTP請求的第一行:

class http_connection {
  ...
  void start() {		boost::asio::async_read_until(socket_,data_,"\r\n",boost::bind(&http_connection::handle_request_line,this,_1));
  }
  void handle_request_line(boost::system::error_code ec) {
      if(!ec) {
          std::string method, uri, version;
          char sp1,sp2,cr,lf;
          std::istream is(&data_);
          is.unsetf(std::ios_base::skipws);
          is >> method >>sp1 >> uri >> sp2 >>version >> cr >> lf;
          ...
      }
  }
  ...
  boost::asio::ip::tcp::socket socket_;
  boost::asio::streambuf data_;
};

streambuf 資料成員用作儲存在搜尋分隔符之前從套接字讀取的資料的地方。重要的是要記住,分隔符之後可能還有其他資料。 這個多餘的資料應該留在流緩衝中,以便後續呼叫read_until()async_read_until() 可以檢查它。

分隔符可以指定為單個charstd::stringboost::regexread_until()async_read_until() 函式還包括接受稱為匹配條件的使用者定義函式物件的過載。 例如,要將資料讀入流緩衝直到遇到空格:

typedef boost::asio::buffers_iterator<boost::asio::streambuf::const_buffers_type> iterator;
std::pair<iterator,bool> match_whitespace(iterator begin,iterator end) {
    iterator i = begin;
    while(i!=end) {
        if(std::isspace(*i++))
            return std::make_pair(i,true);
     return std::make_pair(i,false);
    }
}
...
boost::asio::streambuf b;
boost::asio::read_until(s,b,match_whitespace);

將資料讀入流緩衝區直到找到匹配的字元為止。

class match_char
{
public:
    explicit match_char(char c) : c_(c) {}
    
    template <typename Iterator>
    std::pair<Iterator,bool>operator()(Iterator begin,iterator end) const 
    {
        Iterator i = begin;
        while(i != end) 
            if(c_ == *i++)
                return std::make_pair(i,true);
        return std::make_pair(i,false);
    }
private:
    char c_;
};
namespace boost 
{
    namespace asio
    {
        template<> struct is_match_condition<match_char> : public boost::true_type {}
    }
}
....
boost::asio::streambuf b;
boost::asio::read_until(s,b,match_char('a'));

對於函式和具有巢狀 result_type 型別定義的函式物件, is_match_condition<> 型別特徵自動計算為真。對於其他型別,特徵必須明確特化,如上所示。

自定義記憶體分配器

許多同步操作需要申請一個物件來儲存與操作相關的狀態。例如,Win32 實現需要將 OVERLAPPED 派生物件傳遞給 Win32 API 函式。

並且,程式通常包含易於識別的同步操作鏈。半雙工協議實現(例如 HTTP 伺服器)每個客戶端都有一個操作鏈(接收後傳送)。一個全雙工協議實現有兩條並行執行的鏈。程式應該能夠利用這些知識為鏈中的所有非同步操作重用記憶體。

給定一個使用者定義的Handler物件h的拷貝,如果實現需要分配與該Handler關聯的記憶體,它將使用 get_related_allocator 函式獲取分配器。例如:

boost::asio::associated_allocator_t<Handler> a = boost::asio::get_associated_allocator(h);

關聯的分配器必須滿足標準的分配器的標準。

預設情況下,處理器使用標準分配器(用::oprator new()::operator delete()實現)。可以通過指定巢狀型別 allocator_type 和成員函式 get_allocator() 為特定處理其型別定製分配器:

class my_handler
{
public:
  // 分配器型別要求的自定義實現.
  typedef my_allocator allocator_type;

  // 返回一個自定義的分配器實現
  allocator_type get_allocator() const noexcept
  {
    return my_allocator();
  }

  void operator()() { ... }
};

在大多數複雜長江下,會直接對associated_allocator模板進行特化:

namespace boost 
{ 
    namespace asio 
    {

      template <typename Allocator>
      struct associated_allocator<my_handler, Allocator>
      {
        // Custom implementation of Allocator type requirements.
        typedef my_allocator type;

        // Return a custom allocator implementation.
        static type get(const my_handler&,
            const Allocator& a = Allocator()) noexcept
        {
          return my_allocator();
        }
  };

	} 
} // namespace boost::asio

該實現保證釋放將在呼叫關聯的處理器之前發生,這意味著記憶體已準備好重新用於處理器啟動的任何新非同步操作。

可以從呼叫庫函式的任何使用者建立的執行緒呼叫自定義記憶體分配函式。 該實現保證,對於包含在庫中的非同步操作,該實現不會對該處理器的記憶體分配函式進行併發呼叫。 如果需要從不同執行緒呼叫分配函式,在實現時將插入適當的記憶體屏障以確保正確的記憶體可見性。

Handler追蹤

為了幫助除錯非同步程式,Asio提供了對Handler追蹤的支援。通過定義BOOST_ASIO_ENABLE_HANDLER_TRACKING開啟,Asio將除錯輸出寫入標準錯誤流。輸出記錄非同步操作及其和Handler之間的關係。

此功能在除錯時很有用,您需要知道非同步操作是如何連結在一起的,或者掛起的非同步操作是什麼。下面是執行HTTP伺服器示例時的輸出,處理單個請求,然後通過Ctrl+C關閉:

@asio|1589424178.741850|0*1|[email protected]_wait
@asio|1589424178.742593|0*2|[email protected]_accept
@asio|1589424178.742619|.2|non_blocking_accept,ec=asio.system:11
@asio|1589424178.742625|0|[email protected]
@asio|1589424195.830382|.2|non_blocking_accept,ec=system:0
@asio|1589424195.830413|>2|ec=system:0
@asio|1589424195.830473|2*3|[email protected]_receive
@asio|1589424195.830496|.3|non_blocking_recv,ec=system:0,bytes_transferred=151
@asio|1589424195.830503|2*4|[email protected]_accept
@asio|1589424195.830507|.4|non_blocking_accept,ec=asio.system:11
@asio|1589424195.830510|<2|
@asio|1589424195.830529|>3|ec=system:0,bytes_transferred=151
@asio|1589424195.831143|3^5|in 'async_write' (./../../../boost/asio/impl/write.hpp:330)
@asio|1589424195.831143|3*5|[email protected]_send
@asio|1589424195.831186|.5|non_blocking_send,ec=system:0,bytes_transferred=1090
@asio|1589424195.831194|<3|
@asio|1589424195.831218|>5|ec=system:0,bytes_transferred=1090
@asio|1589424195.831263|5|[email protected]
@asio|1589424195.831298|<5|
@asio|1589424199.793770|>1|ec=system:0,signal_number=2
@asio|1589424199.793781|1|[email protected]
@asio|1589424199.793809|<1|
@asio|1589424199.793840|>4|ec=asio.system:125
@asio|1589424199.793854|<4|
@asio|1589424199.793883|0|[email protected]

每一行的格式如下:

<tag> | <timestamp> | <action> | <description> 

<tag>總是@asio,用於從程式輸出中識別和提取Handler追蹤訊息。

<timestamp>是距離1970.1.1的秒和毫秒。

<action>採取下面的形式之一:

  • >n:程式進入了編號n的處理器。<description>顯示Handler的引數。
  • <n:程式離開了編號為n的處理器。
  • !n:由於異常,程式離開的編號為n的處理器。
  • ~n:編號為n的處理程式沒有被呼叫就被銷燬了。當io_context被銷燬時,任何未完成的非同步操作通常都是這種情況。
  • n^m:編號為n的處理程式將要建立一個新的非同步操作,其完成處理器編號為m。<description>包含源位置資訊,以幫助確定非同步操作在程式中的何處啟動。
  • n*m:編號為n的處理程式建立了一個新的非同步操作,其完成處理程式編號為m。<description>顯示了啟動了哪些非同步操作。
  • n:編號為n的處理器執行了一些其他操作。<description>顯示了呼叫了什麼函式。目前只有close()cancel()操作會被記錄,因為這些操作可能會影響掛起的非同步操作的狀態。
  • .n:該實現執行了一個系統呼叫,作為非同步操作的一部分,完成處理器編號為n。 <description> 顯示呼叫了什麼函式及其結果。 這些跟蹤事件僅在使用基於Reactor的實現時才會發出。

<description> 顯示同步或非同步操作,格式為 <object-type>@<pointer>.<operation>。 對於處理程式條目,它顯示了一個逗號分隔的引數列表及其值。

如上所示,每個處理程式都分配了一個數字識別符號。 如果處理器跟蹤輸出顯示處理程式編號為 0,則表示該操作是在任何處理器之外執行的。

新增區域性資訊

程式可以通過在原始碼中使用巨集 BOOST_ASIO_HANDLER_LOCATION 來增加處理器跟蹤輸出的位置資訊。 例如:

#define HANDLER_LOCATION \
  BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__))

// ...

void do_read()
{
  HANDLER_LOCATION;

  auto self(shared_from_this());
  socket_.async_read_some(boost::asio::buffer(data_, max_length),
      [this, self](boost::system::error_code ec, std::size_t length)
      {
        HANDLER_LOCATION;

        if (!ec)
        {
          do_write(length);
        }
      });
}

使用附加位置資訊可用時,處理程式跟蹤輸出可能包括源位置的呼叫堆疊:

@asio|1589423304.861944|>7|ec=system:0,bytes_transferred=5
@asio|1589423304.861952|7^8|in 'async_write' (./../../../boost/asio/impl/write.hpp:330)
@asio|1589423304.861952|7^8|called from 'do_write' (handler_tracking/async_tcp_echo_server.cpp:62)
@asio|1589423304.861952|7^8|called from 'operator()' (handler_tracking/async_tcp_echo_server.cpp:51)
@asio|1589423304.861952|7*8|[email protected]_send
@asio|1589423304.861975|.8|non_blocking_send,ec=system:0,bytes_transferred=5
@asio|1589423304.861980|<7|

此外,如果 std::source_location std::experimental::source_location 可用,則 use_awaitable_t 標記(當預設構造或用作預設完成標記時)還將導致處理器跟蹤為每個新建立的非同步操作輸出源位置 。 use_awaitable_t 物件也可以用區域性資訊顯式構造。

視覺化展示

可以使用包含的 handlerviz.pl 工具對處理器跟蹤輸出進行後處理,以建立處理程式的視覺化表示(需要 GraphViz 工具dot)。

自定義追蹤

可以通過將 BOOST_ASIO_CUSTOM_HANDLER_TRACKING 巨集定義為標頭檔案的名稱(用“”或 <> 括起來)來自定義處理器跟蹤。 此標頭檔案必須實現以下前處理器巨集:

Macro Description
BOOST_ASIO_INHERIT_TRACKED_HANDLER 為實現非同步操作的類指定基類。 使用時,巨集緊跟在類名之後,因此它必須具有以下形式:public my_class。
BOOST_ASIO_ALSO_INHERIT_TRACKED_HANDLER 為實現非同步操作的類指定基類。 使用時,巨集跟隨其他基類,因此它必須具有形式,public my_class。
BOOST_ASIO_HANDLER_TRACKING_INIT(args) 用於初始化跟蹤機制的表示式。
BOOST_ASIO_HANDLER_LOCATION(args) 用於定義原始碼位置的變數宣告。 args 是一個帶括號的函式引數列表,包含檔名、行號和函式名。
BOOST_ASIO_HANDLER_CREATION(args) 在建立非同步操作時呼叫的N表示式。Args是一個帶圓括號的函式引數列表,包含擁有的執行上下文、被跟蹤的處理程式、物件型別的名稱、物件的指標、物件的本機控制代碼和操作名稱。
BOOST_ASIO_HANDLER_COMPLETION(args) 在非同步操作完成時呼叫的表示式。 args 是包含跟蹤處理程式的帶括號的函式引數列表。
BOOST_ASIO_HANDLER_INVOCATION_BEGIN(args) 在呼叫完成處理程式之前立即呼叫的表示式。 args 是一個帶括號的函式引數列表,包含完成處理程式的引數。
BOOST_ASIO_HANDLER_INVOCATION_END 在呼叫完成處理程式後立即呼叫的表示式。
BOOST_ASIO_HANDLER_OPERATION(args) 在呼叫某些同步物件操作(例如 close() 或 cancel())時呼叫的表示式。 args 是一個帶括號的函式引數列表,包含擁有的執行上下文、物件型別的名稱、指向物件的指標、物件的本機控制代碼和操作名稱。
BOOST_ASIO_HANDLER_REACTOR_REGISTRATION(args) 當物件註冊到反應器時呼叫的表示式。 args 是一個帶括號的函式引數列表,包含擁有的執行上下文、物件的本機控制代碼和唯一的註冊鍵。
BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION(args) 當物件從反應器中登出時呼叫的表示式。 args 是一個帶括號的函式引數列表,包含擁有的執行上下文、物件的本機控制代碼和唯一的註冊鍵。
BOOST_ASIO_HANDLER_REACTOR_READ_EVENT 用於識別反應器讀取就緒事件的位掩碼常量。
BOOST_ASIO_HANDLER_REACTOR_WRITE_EVENT 用於標識反應器寫入準備事件的位掩碼常量。
BOOST_ASIO_HANDLER_REACTOR_ERROR_EVENT 用於識別反應器錯誤準備事件的位掩碼常量。
BOOST_ASIO_HANDLER_REACTOR_EVENTS(args) 當註冊到反應器的物件準備就緒時呼叫的表示式。 args 是一個帶括號的函式引數列表,包含擁有的執行上下文、唯一的註冊鍵和就緒事件的位掩碼。
BOOST_ASIO_HANDLER_REACTOR_OPERATION(args) 當實現作為基於反應器的非同步操作的一部分執行系統呼叫時呼叫的表示式。 args 是一個帶括號的函式引數列表,包含被跟蹤的處理程式、操作名稱、操作產生的錯誤程式碼和(可選)傳輸的位元組數。

併發提示

io_context構造器允許程式指定一個併發提示。這是對io_context實現中應用於執行完成處理器的活動執行緒數的建議。

當後臺使用 Windows I/O 完成埠時,此值將傳遞給 CreateIoCompletionPort

當使用基於Reactor的後端時,實現會識別以下特殊的併發提示值:

Value Description
1 該實現假設 io_context 將從單個執行緒執行,並基於此假設應用多項優化。例如,當一個處理程式從另一個處理程式中釋出時,新的處理程式被新增到一個快速執行緒本地佇列(結果是新的處理程式被阻止,直到當前正在執行的處理程式完成)。
BOOST_ASIO_CONCURRENCY_HINT_UNSAFE 這個特殊的併發提示禁用了排程程式和反應器 I/O 中的鎖定。
BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO 這個特殊的併發提示禁用反應器 I/O 中的鎖定。
BOOST_ASIO_CONCURRENCY_HINT_SAFE 預設值。io_context提供了完整的執行緒安全性,並且任何執行緒都可以使用不同的I/O物件。

通過定義BOOST_ASIO_CONCURRENCY_HINT_DEFAULT巨集,可以在編譯時覆蓋預設構造的 io_context 物件使用的併發提示。 例如,在編譯期命令列指定

-DBOOST_ASIO_CONCURRENCY_HINT_DEFAULT=1

意味著對程式中所有預設構造的 io_context 物件使用併發提示 1。類似地,可以通過定義 BOOST_ASIO_CONCURRENCY_HINT_1 來覆蓋由 1 構造的 io_context 物件使用的併發提示。 例如,傳遞

-DBOOST_ASIO_CONCURRENCY_HINT_1=BOOST_ASIO_CONCURRENCY_HINT_UNSAFE

給編譯期會禁用所有物件的執行緒安全。

無堆疊協程

coroutine類提供無堆疊協程的支援。無堆疊協程使程式能夠以最小的開銷以同步方式實現非同步邏輯,如下例所示:

struct session : boost::asio::coroutine
{
  boost::shared_ptr<tcp::socket> socket_;
  boost::shared_ptr<std::vector<char> > buffer_;

  session(boost::shared_ptr<tcp::socket> socket)
    : socket_(socket),
      buffer_(new std::vector<char>(1024))
  {
  }

  void operator()(boost::system::error_code ec = boost::system::error_code(), std::size_t n = 0)
  {
    if (!ec) reenter (this)
    {
      for (;;)
      {
        yield socket_->async_read_some(boost::asio::buffer(*buffer_), *this);
        yield boost::asio::async_write(*socket_, boost::asio::buffer(*buffer_, n), *this);
      }
    }
  }
};

coroutine類與偽關鍵字reenter,yield,fork同時使用。它們是前處理器巨集,並使用類似於 Duff's Device 的技術根據 switch 語句實現。 coroutine類的文件提供了這些偽關鍵字的完整描述。

堆疊式協程

spawn() 函式是用於執行堆疊協程的高階包裝器。 它基於 Boost.Coroutine 庫。 spawn() 函式使程式能夠以同步方式實現非同步邏輯,如下例所示:

boost::asio::spawn(my_strand, do_echo);

// ...

void do_echo(boost::asio::yield_context yield)
{
  try
  {
    char data[128];
    for (;;)
    {
      std::size_t length =
        my_socket.async_read_some(
          boost::asio::buffer(data), yield);

      boost::asio::async_write(my_socket,
          boost::asio::buffer(data, length), yield);
    }
  }
  catch (std::exception& e)
  {
    // ...
  }
}

spawn() 的第一個引數可能是一個strand、io_context 或完成處理器。 此引數確定允許協程執行的上下文。 例如,伺服器的每個客戶端物件可能由多個協程組成; 它們都應該在同一strand上執行,這樣就不需要顯式同步。

第二個引數是一個帶有簽名的函式物件,說明指定的程式碼作為協程的一部分執行:

void coroutine(boost::asio::yield_context yield);

引數 yield 可以傳遞給非同步操作來代替完成處理器,如下所示:

std::size_t length = my_socket.async_read_some(boost::asio::buffer(data),yield);

這將啟動非同步操作並暫停協程。 非同步操作完成後,協程將自動恢復。

其中非同步操作的處理程式簽名具有以下形式,啟動函式返回 result_type。:

void handler(boost::system::error_code ec, result_type result);

在上面的 async_read_some 示例中,這是 size_t。 如果非同步操作失敗,則將error_code 轉換為system_error 異常並丟擲。

處理器簽名如下形式,啟動函式返回void:

void handler(boost::system::error_code ec);

如上所述,錯誤作為system_error異常傳遞迴協程。

要從操作中收集 error_code,而不是讓它丟擲異常,請將輸出變數與 yield_context 關聯,如下所示:

boost::system::error_code ec;
std::size_t length =
  my_socket.async_read_some(
    boost::asio::buffer(data), yield[ec]);

注意:如果 spawn() 與 Handler 型別的自定義完成處理程式一起使用,則函式物件簽名實際上是:

void coroutine(boost::asio::basic_yield_context<Handler> yield);

協程TS支援

通過 awaitable 類模板、use_awaitable 完成標記和co_spawn()函式提供對 Coroutines TS 的支援。 這些工具允許程式以同步方式實現非同步邏輯,結合 co_await 關鍵字,如以下示例所示:

boost::asio::co_spawn(executor, echo(std::move(socket)), boost::asio::detached);

// ...

boost::asio::awaitable<void> echo(tcp::socket socket)
{
  try
  {
    char data[1024];
    for (;;)
    {
      std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), boost::asio::use_awaitable);
      co_await async_write(socket, boost::asio::buffer(data, n), boost::asio::use_awaitable);
    }
  }
  catch (std::exception& e)
  {
    std::printf("echo Exception: %s\n", e.what());
  }
}

co_spawn() 的第一個引數是一個執行器,它確定允許協程執行的上下文。 例如,伺服器的每個客戶端物件可能由多個協程組成; 它們都應該在同一個strand上執行,這樣就不需要顯式同步。

第二個引數是一個awaitable<R>,它是協程入口點函式的返回結果,在上面的例子中是呼叫echo的結果。 (或者,此引數可以是返回 awaitable<R> 的函式物件。)模板引數 R 是協程生成的返回值的型別。 在上面的例子中,協程返回 void。

第三個引數是一個完成標記,co_spawn() 使用它來生成一個帶有簽名 void(std::exception_ptr, R) 的完成處理程式。 一旦完成,這個完成處理程式就會被協程的結果呼叫。 在上面的示例中,我們傳遞了一個完成標記型別 boost::asio::detached,它用於顯式忽略非同步操作的結果。

在這個例子中,協程的主體是在 echo 函式中實現的。 當 use_awaitable 完成令牌傳遞給非同步操作時,此非同步操作的啟動函式返回一個可與co_await關鍵字一起使用的可等待物件:

std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), boost::asio::use_awaitable);

其中非同步操作的處理程式函式簽名具有以下形式:

void handler(boost::system::error_code ec, result_type result);

co_await表示式的結果型別是 result_type。在上面的 async_read_some 示例中,這是 size_t。 如果非同步操作失敗,則將error_code 轉換為system_error 異常並丟擲。

處理程式函式簽名為如下形式的:

void handler(boost::system::error_code ec);

co_await 表示式產生一個 void 結果。 如上所述,錯誤作為 system_error 異常傳遞迴協程。