1. 程式人生 > >Boost.ASIO原始碼:pthread包裝類——posix_event小結

Boost.ASIO原始碼:pthread包裝類——posix_event小結

posix_event介紹

該類用在scheduler中,用於喚醒阻塞的執行緒,下面程式碼中的conditionally_enabled_event實際上就是posix_event的包裝類:

class scheduler
  : public execution_context_service_base<scheduler>,
    public thread_context
{
// ...

private// The event type used by this scheduler.
  typedef conditionally_enabled_event event;
// Event to wake up blocked threads. event wakeup_event_; // ... }

至於為什麼要用conditionally_enabled_event來包一層呢,主要是為了多加一個enabled這個flag。
以前也總結過,io_service能夠根據傳入的concurrency_hint決定是否採用多執行緒並行,若是不採用並行,則理所當然的,加鎖就成了沒必要的操作,為了對這裡面涉及到的鎖進行統一化處理,他們決定用一個enabled標誌位來判斷是否真的加鎖。
而對於event,自然也是同理。如以下的conditionally:

  void
clear(conditionally_enabled_mutex::scoped_lock& lock) { if (lock.mutex_.enabled_) // 這裡進行enabled控制 event_.clear(lock); // 呼叫posix_event的介面 }

posix_event原始碼

posix_event結構也很簡單,實際上就是pthread的包裝。posix_event就只有兩個成員變數:

class posix_event
  : private noncopyable
{
private:
  ::pthread_cond_t cond_;
// 條件變數 std::size_t state_; };

cond_很好理解,就是用來控制阻塞喚醒的條件變數。而state_是一個無符號整數,它的設計很巧妙,state_有2個作用:一是用來在wait中作為條件控制判斷的flag;二是用來判斷是否還有處於阻塞在該條件變數上的執行緒。下面詳細說明。
以signal_all函式為例子:

  template <typename Lock>
  void signal_all(Lock& lock)
  {
    BOOST_ASIO_ASSERT(lock.locked());
    (void)lock;
    state_ |= 1;
    ::pthread_cond_broadcast(&cond_);
  }

BOOST_ASIO_ASSERT是巨集函式,是一個斷言判斷,如果傳入值為true則沒事,傳入值為false就丟擲異常。在這裡僅是為了檢驗是否已加鎖。
第5行這種寫法在Boost中出現了多次,沒有實際作用,僅是為了欺騙編譯器,不然編譯器會報警告,說沒有使用該變數。
第6行就是關鍵的state_的控制操作,在這裡可以看出state_的最低位就是我們的flag標誌位。結合wait函式來看邏輯可能會更清晰:

  // Wait for the event to become signalled.
  template <typename Lock>
  void wait(Lock& lock)
  {
    BOOST_ASIO_ASSERT(lock.locked());
    while ((state_ & 1) == 0)
    {
      state_ += 2;
      ::pthread_cond_wait(&cond_, &lock.mutex().mutex_); // Ignore EINVAL.
      state_ -= 2;
    }
  }

第5行的迴圈判斷就用到了state的最低位,若該位為0,則說明還沒有到達喚醒的條件,否則說明該執行緒已正常加鎖喚醒。
至於為什麼要用迴圈判斷,這算是pthread_cond_wait的標準用法,因為處於等待狀態的執行緒可能有多個,一次可能有多個執行緒被喚醒,而這裡要保證進入臨界區的執行緒只有一個。
再回到上面的wait函式中,可以看到在呼叫pthread_cond_wait進入阻塞狀態之前,先將state_自增2。這裡可以看出 state_ / 2 實際上就是當前處於等待狀態的執行緒數了,此時無論最低位是否為0,也就是無論state是否為偶數都不影響。由此達到用一個無符號整數變數記錄2個操作資訊的目的。
在最後,還是附上posix_event的完整程式碼,我這裡刪除了許多條件編譯相關的程式碼,該原始碼是基於Linux的。

class posix_event
  : private noncopyable
{
public:
  // Constructor.
  BOOST_ASIO_DECL posix_event();

  // Destructor.
  ~posix_event()
  {
    ::pthread_cond_destroy(&cond_);
  }

  // Signal the event. (Retained for backward compatibility.)
  template <typename Lock>
  void signal(Lock& lock)
  {
    this->signal_all(lock);
  }

  // Signal all waiters.
  template <typename Lock>
  void signal_all(Lock& lock)
  {
    BOOST_ASIO_ASSERT(lock.locked());
    (void)lock;
    state_ |= 1;
    ::pthread_cond_broadcast(&cond_); // Ignore EINVAL.
  }

  // Unlock the mutex and signal one waiter.
  template <typename Lock>
  void unlock_and_signal_one(Lock& lock)
  {
    BOOST_ASIO_ASSERT(lock.locked());
    state_ |= 1;
    bool have_waiters = (state_ > 1);
    lock.unlock();
    if (have_waiters)
      ::pthread_cond_signal(&cond_); // Ignore EINVAL.
  }

  // If there's a waiter, unlock the mutex and signal it.
  template <typename Lock>
  bool maybe_unlock_and_signal_one(Lock& lock)
  {
    BOOST_ASIO_ASSERT(lock.locked());
    state_ |= 1;
    if (state_ > 1)
    {
      lock.unlock();
      ::pthread_cond_signal(&cond_); // Ignore EINVAL.
      return true;
    }
    return false;
  }

  // Reset the event.
  template <typename Lock>
  void clear(Lock& lock)
  {
    BOOST_ASIO_ASSERT(lock.locked());
    (void)lock;
    state_ &= ~std::size_t(1);
  }

  // Wait for the event to become signalled.
  template <typename Lock>
  void wait(Lock& lock)
  {
    BOOST_ASIO_ASSERT(lock.locked());
    while ((state_ & 1) == 0)
    {
      state_ += 2;
      ::pthread_cond_wait(&cond_, &lock.mutex().mutex_); // Ignore EINVAL.
      state_ -= 2;
    }
  }

  // Timed wait for the event to become signalled.
  template <typename Lock>
  bool wait_for_usec(Lock& lock, long usec)
  {
    BOOST_ASIO_ASSERT(lock.locked());
    if ((state_ & 1) == 0)
    {
      state_ += 2;
      timespec ts;

      if (::clock_gettime(CLOCK_MONOTONIC, &ts) == 0)
      {
        ts.tv_sec += usec / 1000000;
        ts.tv_nsec = (usec % 1000000) * 1000;
        ts.tv_sec += ts.tv_nsec / 1000000000;
        ts.tv_nsec = ts.tv_nsec % 1000000000;
        ::pthread_cond_timedwait(&cond_,
            &lock.mutex().mutex_, &ts); // Ignore EINVAL.
      }

      state_ -= 2;
    }
    return (state_ & 1) != 0;
  }

private:
  ::pthread_cond_t cond_;
  std::size_t state_;
};