1. 程式人生 > >Boost.ASIO原始碼:thread_info_base::allocate古怪的記憶體分配邏輯解析與猜想

Boost.ASIO原始碼:thread_info_base::allocate古怪的記憶體分配邏輯解析與猜想

自我感覺這個記憶體組織思路很有參考學習價值,故在此總結一下。

thread_info_base介紹

啥是thread_info

要引出thread_info_base就得從scheduler說起,scheduler實際上就是io_service的一個實現,而眾所周知(不知者百度),io_service中有一個公共佇列和若干個與執行緒數對應的私有佇列,當用戶post了資料進來時,隨機的某個空閒執行緒的私有佇列就會push進這個請求,然後該執行緒就會開始處理這個請求。這時候再回歸問題,thread_info儲存的就是某個執行緒下的私有佇列以及佇列中的未完成工作數:

struct
scheduler_thread_info : public thread_info_base { op_queue<scheduler_operation> private_op_queue; // 工作佇列 long private_outstanding_work; // 未完成的工作數。(這個outstanding我剛開始還理解的是“傑出的”的意思。。。實際是“未完成的”) };

這個scheduler_thread_info就是實際所用的實現類,接下來看看它的基類thread_info_base

thread_info_base概覽

直接上原始碼:

class
thread_info_base : private noncopyable { public: thread_info_base() : reusable_memory_(0) {} ~thread_info_base() { if (reusable_memory_) ::operator delete(reusable_memory_); } static void* allocate(thread_info_base* this_thread, std::size_t size) { // 一大坨程式碼 } static
void deallocate(thread_info_base* this_thread, void* pointer, std::size_t size) { // another 一大坨程式碼 } private: enum { chunk_size = 4 }; // 分配記憶體的最小單位為32位 void* reusable_memory_; // 一塊可重用記憶體的首地址 };

這個類實際上並沒有什麼資訊,除了繼承自noncopyable表明它是不可複製的,剩下的就全都與記憶體分配相關了。
需要稍稍注意的是 那個只有一個元素的內部列舉類,這種寫法實際上只是為了得到一個編譯器常量罷了,這裡並沒有任何語義上的列舉用法,寫成constexpr chunk_size = 4或者Integral_constant<int, 4> chunk_size應該也是一樣的。

thread_info的記憶體分配邏輯細節

allocate與deallocate

先上allocate和deallocate的原始碼,我特地加上了註釋方便理解:

static void* allocate(thread_info_base* this_thread, std::size_t size /* 需要分配的位元組數 */)
  { // 注意這裡chunk沒有明顯的語義,它就是個最小儲存單位,即4位元組
    std::size_t chunks = (size + chunk_size - 1) / chunk_size;  // 由位元組數得到需要分配的chunk數,注意向上取整

    if (this_thread && this_thread->reusable_memory_)//判斷傳進來的this_thread是不是空指標且它內部所指的那塊可重用記憶體空間是不是空的
    { // 如果有現有的可重用記憶體空間,就先判斷現有的記憶體空間夠不夠大,如果足夠塞下那就直接返回,不夠的話就重新申請記憶體
      void* const pointer = this_thread->reusable_memory_; // 把這塊我們需要操作的記憶體空間暫存下來
      this_thread->reusable_memory_ = 0;  // 先把它置空晾一邊

      unsigned char* const mem = static_cast<unsigned char*>(pointer);  // 轉成char陣列方便指標運算
      //這裡是個小優化,後面講。只要知道mem[0]記憶體的是這塊記憶體的chunk數目大小就行了
      if (static_cast<std::size_t>(mem[0]) >= chunks)  // 如果這塊記憶體空間的chunk足夠多,那就直接返回
      {
      	// 把chunk數目大小存在這塊記憶體空間的最後面
      	// (注意這裡不會記憶體越界,因為實際上總會比記錄的chunk多分配一個位元組)
      	// (還有人會問一個位元組就夠存嗎?再回到上面看一下,我們實際上只用了一個位元組來表示chunk數量,mem[0]取到的只有一個位元組,畢竟它只是char,不是int)
        mem[size] = mem[0];
        return pointer;
      }
		// 否則若這塊記憶體不夠我們想要分配的量,那就刪掉這塊記憶體重新分配
      ::operator delete(pointer);
    }
	//以下是重新分配記憶體邏輯
    void* const pointer = ::operator new(chunks * chunk_size + 1); // 注意此處多分配了一個位元組
    unsigned char* const mem = static_cast<unsigned char*>(pointer);
    // UCHAR_MAX代表'unsigned char'可以儲存的最大值(因為我們的chunk數相當於是用一個unsigned char來儲存的呀)
    mem[size] = (chunks <= UCHAR_MAX) ? static_cast<unsigned char>(chunks) : 0;  // 把chunk數存到尾端
    return pointer;
  }

  static void deallocate(thread_info_base* this_thread,
      void* pointer, std::size_t size)  // pointer所指的記憶體才是要釋放的記憶體
  {
    if (size <= chunk_size * UCHAR_MAX)  // 這塊記憶體的chunk數是不是超出了一個位元組所能表示的極限
    {
      if (this_thread && this_thread->reusable_memory_ == 0)
      {
        unsigned char* const mem = static_cast<unsigned char*>(pointer);
        mem[0] = mem[size];  // 把存在尾端的chunk數重新存到第一個位元組
        this_thread->reusable_memory_ = pointer;
        return;
      }
    }
    ::operator delete(pointer);
  }

細節邏輯在上面程式碼中的註釋已經講得很詳細了,這裡總結一下allocate和deallocate的邏輯思路。先宣告,allocate和deallocate都是最底層函式,最外界實際上是不會傳入thread_info_base引數的,這個引數來自另一個thread_info棧,等會再講。
deallocate實際上先判斷pointer所指的這塊要釋放的記憶體是不是太大了,如果太大了就直接刪掉,否則就重新組織一下並把它放到可重用記憶體塊中方便下次分配記憶體時使用;而allocate則是取出這塊可重用記憶體塊(若有的話),足夠大就直接返回,否則就重新申請塊記憶體空間再返回。其中涉及到的關於可重用記憶體塊的chunk數的維護這裡不細講了,不懂者多看兩次上面的註釋也就懂了。

自動傳入的thread_info_base

直接看呼叫處:

void* asio_handler_allocate(std::size_t size, ...)
{
  return detail::thread_info_base::allocate(
      detail::thread_context::thread_call_stack::top(), size);
}
void asio_handler_deallocate(void* pointer, std::size_t size, ...)
{
  detail::thread_info_base::deallocate(
      detail::thread_context::thread_call_stack::top(), pointer, size);
}

可以看到都是自動把一個棧的最上層元素給傳入。這個thread_call_stack實際上是call_stack<thread_context, thread_info_base>的別名:

class thread_context
{
public:
  // Per-thread call stack to track the state of each thread in the context.
  typedef call_stack<thread_context, thread_info_base> thread_call_stack;
};

為了方便後面理解,這裡先提前劇透:這個stack實際上就是個單鏈表,每個節點儲存有鍵值對,鍵就是thread_context,值就是thread_info_base。
再小小提點一下,諸如scheduler這種類也是繼承自thread_context的,實際上就是為了能在這個棧中通過scheduler找到它的scheduler_thread_info(這是個實現類)。
再看call_stack,這裡面大部分的邏輯都只是維護棧結構用的,關係不大,但是為了程式碼完整性我還是全部貼出來了,以防有人像我一樣看不到完整程式碼就不舒服斯基,重點只用關注最下面的top_靜態成員就行了:

// Helper class to determine whether or not the current thread is inside an
// invocation of io_context::run() for a specified io_context object.
template <typename Key, typename Value = unsigned char>
class call_stack
{
public:
  // Context class automatically pushes the key/value pair on to the stack.
  class context
    : private noncopyable
  {
  public:
    // Push the key on to the stack.
    explicit context(Key* k)
      : key_(k),
        next_(call_stack<Key, Value>::top_)
    {
      value_ = reinterpret_cast<unsigned char*>(this);
      call_stack<Key, Value>::top_ = this;
    }

    // Push the key/value pair on to the stack.
    context(Key* k, Value& v)
      : key_(k),
        value_(&v),
        next_(call_stack<Key, Value>::top_)
    {
      call_stack<Key, Value>::top_ = this;
    }

    // Pop the key/value pair from the stack.
    ~context()
    {
      call_stack<Key, Value>::top_ = next_;
    }

    // Find the next context with the same key.
    Value* next_by_key() const
    {
      context* elem = next_;
      while (elem)
      {
        if (elem->key_ == key_)
          return elem->value_;
        elem = elem->next_;
      }
      return 0;
    }

  private:
    friend class call_stack<Key, Value>;

    // The key associated with the context.
    Key* key_;

    // The value associated with the context.
    Value* value_;

    // The next element in the stack.
    context* next_;
  };

  friend class context;

  // Determine whether the specified owner is on the stack. Returns address of
  // key if present, 0 otherwise.
  static Value* contains(Key* k)
  {
    context* elem = top_;
    while (elem)
    {
      if (elem->key_ == k)
        return elem->value_;
      elem = elem->next_;
    }
    return 0;
  }

  // Obtain the value at the top of the stack.
  static Value* top()
  {
    context* elem = top_;
    return elem ? elem->value_ : 0;
  }

private:
  // The top of the stack of calls for the current thread.
  static tss_ptr<context> top_;
};

總結下:context有點類似pair,只不過構造context時會自動把這個context放到棧頂,棧頂由top_維護,是全域性的。tss_ptr就是個普通指標型別,跟普通指標的唯二區別就是繼承自noncopyable以及它的值是儲存在一個靜態變數裡的

為何要自動傳入棧頂的thread_info_base

先拉個免責宣告,以下內容沒有得到完整的原始碼證明,或者說原始碼沒完全看明白(因為找不到最外外外面是怎麼呼叫allocate的。。),一部分來源於第六感猜測,僅作參考,不要把以下理論當成標準。當然,若有人知道詳情歡迎打臉。

前面已經知道了asio_handler_allocate呼叫了最底層的thread_info_base_allocate(deallocate同理),再往外找可以看到:

namespace boost_asio_handler_alloc_helpers {
	template <typename Handler>
	inline void* allocate(std::size_t s, Handler& h)
	{
	  using boost::asio::asio_handler_allocate;
	  return asio_handler_allocate(s, boost::asio::detail::addressof(h));
	}
	
	// 。。。。。
}

可以看到這裡用了一個模板函式allocate來呼叫asio_handler_allocate,這裡傳入的第二個引數在下面並沒有得到處理,我也看不懂這個是用來幹嗎的。再往外找(這裡只列一個例子,還有很多類似的呼叫):

template <typename Handler, typename Arg1, typename Arg2,
    typename Arg3, typename Arg4, typename Arg5>
inline void* asio_handler_allocate(std::size_t size,
    binder5<Handler, Arg1, Arg2, Arg3, Arg4, Arg5>* this_handler)
{
  return boost_asio_handler_alloc_helpers::allocate(
      size, this_handler->handler_);
}

這裡能確定的是傳入的handler是一個偽函式,binder5只是給某個函式物件綁上5個固定引數,與std::bind同理,而這個偽函式物件傳進allocate函式之後其實啥都不幹(就是耍流氓)。

好了,以下來自我的推理:
首先,為什麼要給top分配大小是這個偽函式的大小(這個很可能是僅用一個位元組儲存chunk數的原因)。說明這個棧頂的thread_info_base是用來存這個偽函式物件的,再推導一下,很可能所有的thread_info_base中的可重用記憶體空間都是給這些偽函式物件用的,因為這樣才有重用的價值——因為偽函式物件的大小差別不會太大,這樣造成的內部碎片才不會太大。
接下來再看下另外一個用到thread_info的地方:

std::size_t scheduler::run(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;   // 注意這裡宣告的臨時變數,並初始化
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);  // 還記得scheduler是繼承自thread_context的麼

  mutex::scoped_lock lock(mutex_);

  std::size_t n = 0;
  for (; do_run_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}

這相當於io_service的run,在這裡面會初始化一個thread_info,此時它就在棧頂了,注意馬上就上鎖了所以能保證同時間後續的處理邏輯中,棧頂的那個thread_info一直保持不變(哪怕是由於多個run同時觸發,2個同時走完this_thread的建立,而實際拿到鎖的那個run的後續處理用的實際不是自己建立的那個thread_info也沒關係,反正都一樣,只要始終如一就行)。然後後續的處理邏輯,也就是do_run_one中,很可能會建立前面提到的那些handler(偽函式),它們的記憶體空間應該是來自這塊可重用記憶體,而執行緒可能會重複多次do_run_one,也就是說會用到多個handler,如此重複的分配記憶體回收記憶體用上可重用記憶體的邏輯應該就能帶來很大的效率提升。最後全部執行完後,函式結束,ctx作為臨時變數將會被自動回收,然後執行context的解構函式,這裡會自動將該context從棧中移除從而達到目的。
補充一下,do_run_one執行結束之前會呼叫lock.unlock(),也就是說在2次連續的do_run_one之間可能會發生CPU競爭,但哪怕是這裡切換了程序也不要緊,此時唯一用到了“棧頂”這一地方的就只有分配記憶體與回收記憶體的邏輯中了,其它的業務邏輯不會受它影響(至少在我看到的範圍內,看不到的地方只能上帝保佑了)。

再次宣告,最後一段可靠性尚存疑,僅作參考討論。