Boost庫之asio io_service以及run、run_one、poll、poll_one區別
一、io_service的作用
io_servie 實現了一個任務隊列,這裏的任務就是void(void)的函數。Io_servie最常用的兩個接口是post和run,post向任務隊列中投遞任務,run是執行隊列中的任務,直到全部執行完畢,並且run可以被N個線程調用。Io_service是完全線程安全的隊列。
二、Io_servie的接口
提供的接口有run、run_one、poll、poll_one、stop、reset、dispatch、post,最常用的是run、post、stop
三、Io_servie 實現代碼的基本類結構:
Io_servie是接口類,為實現跨平臺,采用了策略模式,所有接口均有impl_type實現。根據平臺不同impl_type分為
win_iocp_io_service Win版本的實現,這裏主要分析Linux版本。
task_io_service 非win平臺下的實現,其代碼結構為:
detail/task_io_service_fwd.hpp 簡單聲明task_io_service名稱
detail/task_io_service.hpp 聲明task_io_service的方法和屬性
detail/impl/task_io_service.ipp 具體實現文件
隊列中的任務類型為opertioan,原型其實是typedef task_io_service_operation operation,其實現文件在detail/task_io_service_operation.hpp中,當隊列中的任務被執行時,就是task_io_service_operation:: complete被調用的時候。
四、Dispatch和post的區別
Post一定是PostQueuedCompletionStatus並且在GetQueuedCompletionStatus 之後執行。
Dispatch會首先檢查當前thread是不是io_service.run/runonce/poll/poll_once線程,如果是,則直接運行。
五、Io_servie::run方法的實現
Run方法執行隊列中的所有任務,直到任務執行完畢。
run方法首先構造一個idle_thread_info,和first_idle_thread_類型相同,即通過first_idle_thread_將所有線程串聯起來,它這個串聯不是立即串聯的,當該線程無任務可做是加入到first_idle_thread_的首部,有任務執行時,從first_idle_thread_中斷開。這很正常,因為first_idle_thread_維護的是當前空閑線程。
加鎖,循環執行do_one方法,直到do_one返回false
do_one每次執行一個任務。首先檢查隊列是否為空,若空將此線程追加到first_idle_thread_的首部,然後阻塞在條件變量上,直到被喚醒。
當被喚醒或是首次執行,若stopped_為true(即此時stop方法被調用了),返回0
隊列非空,pop出一個任務,檢查隊列無任務那麽簡單的解鎖,若仍有,調用wake_one_thread_and_unlock嘗試喚醒其他空閑線程執行。然後執行該任務,返回1.
實際上在執行隊列任務時有一個特別的判斷if (o ==&task_operation_),那麽將會執行task_->run,task_變量類型為reactor,在linux平臺實現為epoll_reactor,實現代碼文件為detail/impl/epoll_reactor.ipp,run方法實際上執行的是epoll_wait,run阻塞在epoll_wait上等待事件到來,並且處理完事件後將需要回調的函數push到io_servie的任務隊列中,雖然epoll_wait是阻塞的,但是它提供了interrupt函數,該interrupt是如何實現的呢,它向epoll_wait添加一個文件描述符,該文件描述符中有8個字節可讀,這個文件描述符是專用於中斷epoll_wait的,他被封裝到select_interrupter中,select_interrupter實際上實現是eventfd_select_interrupter,在構造的時候通過pipe系統調用創建兩個文件描述符,然後預先通過write_fd寫8個字節,這8個字節一直保留。在添加到epoll_wait中采用EPOLLET水平觸發,這樣,只要select_interrupter的讀文件描述符添加到epoll_wait中,立即中斷epoll_wait。
Run方法的原則是:
有任務立即執行任務,盡量使所有的線程一起執行任務
若沒有任務,阻塞在epoll_wait上等待io事件
若有新任務到來,並且沒有空閑線程,那麽先中斷epoll_wait,先執行任務
若隊列中有任務,並且也需要epoll_wait監聽事件,那麽非阻塞調用epoll_wait(timeout字段設置為0),待任務執行完畢在阻塞在epoll_wait上。
幾乎對線程的使用上達到了極致。
從這個函數中可以知道,在使用ASIO時,io_servie應該盡量多,這樣可以使其epoll_wait占用的時間片最多,這樣可以最大限度的響應IO事件,降低響應時延。但是每個io_servie::run占用一個線程,所以io_servie最佳應該和CPU的核數相同。
六、Io_servie::stop的實現
加鎖,調用stop_all_threads
設置stopped_變量為true,遍歷所有的空閑線程,依次喚醒
task_interrupted_設置為true,調用task_的interrupt方法。
七、reset和stop
文檔中reset的解釋是重置io_service以便下一次調用。
當 run,run_one,poll,poll_one是被stop掉導致退出,或者由於完成了所有任務(正常退出)導致退出時,在調用下一次 run,run_one,poll,poll_one之前,必須調用此函數。reset不能在run,run_one,poll,poll_one正在運行時調用。如果是消息處理handler(用戶代碼)拋出異常,則可以在處理之後直接繼續調用 io.run,run_one,poll,poll_one。
八、run,run_one,poll,poll_one的區別
run其實就是一直循環執行do_one,並且是以阻塞方式進行的(參數為true),而run_one同樣是以阻塞方式進行的,但只執行一次do_one;poll和run幾乎完全相同,只是它是以非阻塞方式執行do_one(參數為false),poll_one是以非阻塞方式執行一次do_one。
run,run_one,及poll,poll_one的實現代碼,如下:
[cpp] view plain copy
- // Run the event loop until stopped or no more work.
- size_t run(boost::system::error_code& ec)
- {
- if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
- {
- stop();
- ec = boost::system::error_code();
- return 0;
- }
- call_stack<win_iocp_io_service>::context ctx(this);
- size_t n = 0;
- while (do_one(true, ec))
- if (n != (std::numeric_limits<size_t>::max)())
- ++n;
- return n;
- }
- // Run until stopped or one operation is performed.
- size_t run_one(boost::system::error_code& ec)
- {
- if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
- {
- stop();
- ec = boost::system::error_code();
- return 0;
- }
- call_stack<win_iocp_io_service>::context ctx(this);
- return do_one(true, ec);
- }
- // Poll for operations without blocking.
- size_t poll(boost::system::error_code& ec)
- {
- if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
- {
- stop();
- ec = boost::system::error_code();
- return 0;
- }
- call_stack<win_iocp_io_service>::context ctx(this);
- size_t n = 0;
- while (do_one(false, ec))
- if (n != (std::numeric_limits<size_t>::max)())
- ++n;
- return n;
- }
- // Poll for one operation without blocking.
- size_t poll_one(boost::system::error_code& ec)
- {
- if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
- {
- stop();
- ec = boost::system::error_code();
- return 0;
- }
- call_stack<win_iocp_io_service>::context ctx(this);
- return do_one(false, ec);
- }
- do_one的函數原型
- size_t do_one(bool block, boost::system::error_code& ec)
- {
- …
- BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred, &completion_key, &overlapped, block ? timeout : 0);
- …
- }
Boost庫之asio io_service以及run、run_one、poll、poll_one區別