Boost.ASIO原始碼:pthread包裝類——posix_event小結
posix_event介紹
該類用在scheduler中,用於喚醒阻塞的執行緒,下面程式碼中的conditionally_enabled_event實際上就是posix_event的包裝類:
class scheduler
: public execution_context_service_base<scheduler>,
public thread_context
{
// ...
private:
// The event type used by this scheduler.
typedef conditionally_enabled_event event;
// Event to wake up blocked threads.
event wakeup_event_;
// ...
}
至於為什麼要用conditionally_enabled_event來包一層呢,主要是為了多加一個enabled這個flag。
以前也總結過,io_service能夠根據傳入的concurrency_hint決定是否採用多執行緒並行,若是不採用並行,則理所當然的,加鎖就成了沒必要的操作,為了對這裡面涉及到的鎖進行統一化處理,他們決定用一個enabled標誌位來判斷是否真的加鎖。
而對於event,自然也是同理。如以下的conditionally:
void clear(conditionally_enabled_mutex::scoped_lock& lock)
{
if (lock.mutex_.enabled_) // 這裡進行enabled控制
event_.clear(lock); // 呼叫posix_event的介面
}
posix_event原始碼
posix_event結構也很簡單,實際上就是pthread的包裝。posix_event就只有兩個成員變數:
class posix_event
: private noncopyable
{
private:
::pthread_cond_t cond_; // 條件變數
std::size_t state_;
};
cond_很好理解,就是用來控制阻塞喚醒的條件變數。而state_是一個無符號整數,它的設計很巧妙,state_有2個作用:一是用來在wait中作為條件控制判斷的flag;二是用來判斷是否還有處於阻塞在該條件變數上的執行緒。下面詳細說明。
以signal_all函式為例子:
template <typename Lock>
void signal_all(Lock& lock)
{
BOOST_ASIO_ASSERT(lock.locked());
(void)lock;
state_ |= 1;
::pthread_cond_broadcast(&cond_);
}
BOOST_ASIO_ASSERT是巨集函式,是一個斷言判斷,如果傳入值為true則沒事,傳入值為false就丟擲異常。在這裡僅是為了檢驗是否已加鎖。
第5行這種寫法在Boost中出現了多次,沒有實際作用,僅是為了欺騙編譯器,不然編譯器會報警告,說沒有使用該變數。
第6行就是關鍵的state_的控制操作,在這裡可以看出state_的最低位就是我們的flag標誌位。結合wait函式來看邏輯可能會更清晰:
// Wait for the event to become signalled.
template <typename Lock>
void wait(Lock& lock)
{
BOOST_ASIO_ASSERT(lock.locked());
while ((state_ & 1) == 0)
{
state_ += 2;
::pthread_cond_wait(&cond_, &lock.mutex().mutex_); // Ignore EINVAL.
state_ -= 2;
}
}
第5行的迴圈判斷就用到了state的最低位,若該位為0,則說明還沒有到達喚醒的條件,否則說明該執行緒已正常加鎖喚醒。
至於為什麼要用迴圈判斷,這算是pthread_cond_wait的標準用法,因為處於等待狀態的執行緒可能有多個,一次可能有多個執行緒被喚醒,而這裡要保證進入臨界區的執行緒只有一個。
再回到上面的wait函式中,可以看到在呼叫pthread_cond_wait進入阻塞狀態之前,先將state_自增2。這裡可以看出 state_ / 2 實際上就是當前處於等待狀態的執行緒數了,此時無論最低位是否為0,也就是無論state是否為偶數都不影響。由此達到用一個無符號整數變數記錄2個操作資訊的目的。
在最後,還是附上posix_event的完整程式碼,我這裡刪除了許多條件編譯相關的程式碼,該原始碼是基於Linux的。
class posix_event
: private noncopyable
{
public:
// Constructor.
BOOST_ASIO_DECL posix_event();
// Destructor.
~posix_event()
{
::pthread_cond_destroy(&cond_);
}
// Signal the event. (Retained for backward compatibility.)
template <typename Lock>
void signal(Lock& lock)
{
this->signal_all(lock);
}
// Signal all waiters.
template <typename Lock>
void signal_all(Lock& lock)
{
BOOST_ASIO_ASSERT(lock.locked());
(void)lock;
state_ |= 1;
::pthread_cond_broadcast(&cond_); // Ignore EINVAL.
}
// Unlock the mutex and signal one waiter.
template <typename Lock>
void unlock_and_signal_one(Lock& lock)
{
BOOST_ASIO_ASSERT(lock.locked());
state_ |= 1;
bool have_waiters = (state_ > 1);
lock.unlock();
if (have_waiters)
::pthread_cond_signal(&cond_); // Ignore EINVAL.
}
// If there's a waiter, unlock the mutex and signal it.
template <typename Lock>
bool maybe_unlock_and_signal_one(Lock& lock)
{
BOOST_ASIO_ASSERT(lock.locked());
state_ |= 1;
if (state_ > 1)
{
lock.unlock();
::pthread_cond_signal(&cond_); // Ignore EINVAL.
return true;
}
return false;
}
// Reset the event.
template <typename Lock>
void clear(Lock& lock)
{
BOOST_ASIO_ASSERT(lock.locked());
(void)lock;
state_ &= ~std::size_t(1);
}
// Wait for the event to become signalled.
template <typename Lock>
void wait(Lock& lock)
{
BOOST_ASIO_ASSERT(lock.locked());
while ((state_ & 1) == 0)
{
state_ += 2;
::pthread_cond_wait(&cond_, &lock.mutex().mutex_); // Ignore EINVAL.
state_ -= 2;
}
}
// Timed wait for the event to become signalled.
template <typename Lock>
bool wait_for_usec(Lock& lock, long usec)
{
BOOST_ASIO_ASSERT(lock.locked());
if ((state_ & 1) == 0)
{
state_ += 2;
timespec ts;
if (::clock_gettime(CLOCK_MONOTONIC, &ts) == 0)
{
ts.tv_sec += usec / 1000000;
ts.tv_nsec = (usec % 1000000) * 1000;
ts.tv_sec += ts.tv_nsec / 1000000000;
ts.tv_nsec = ts.tv_nsec % 1000000000;
::pthread_cond_timedwait(&cond_,
&lock.mutex().mutex_, &ts); // Ignore EINVAL.
}
state_ -= 2;
}
return (state_ & 1) != 0;
}
private:
::pthread_cond_t cond_;
std::size_t state_;
};