1. 程式人生 > 其它 >一種有界佇列(Bounded Buffer)的實現

一種有界佇列(Bounded Buffer)的實現

一、概述

在有 CPUGPU 參與的一種運算中,比如深度學習推理,CPU 需要預處理資料,然後交給 GPU 處理,最後 CPU 對 GPU 的運算結果進行後處理
在整個過程中都是 FIFO,即資料 ABC 按順序輸入,也需要按 A'B'C' 順序輸出。
如果採用同步阻塞的方式,在 CPU 預處理時 GPU 處於空閒狀態,GPU 運算時 CPU 後處理處於空閒狀態並且也不能進行後續資料的預處理。這樣影響整體的吞吐。
期望是 GPU 運算時,CPU 可以同時進行資料預處理和後處理。這是典型的單生產者單消費者模式。

在兩個執行緒之間傳遞資料時,為確保執行緒安全,可以在一個執行緒每次 malloc

new 申請記憶體,在另一個執行緒 freedelete。為了避免頻繁的記憶體分配和釋放,需要使用到記憶體池。

本文描述採用有界佇列實現記憶體池,適用場景和限制:

  1. 需要把記憶體使用控制在一定範圍內;
  2. 整個過程不允許丟棄資料;
  3. 生產和消費之間執行緒安全;
  4. 不會(也不允許)同時生產,不會(也不允許)同時消費。如果確實要多執行緒生產或多執行緒消費,呼叫程式碼自行確保執行緒安全。

二、實現

// File: bounded_buffer.h
#pragma once
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

/*
 * @Description: BoundedBuffer。Produce 和 Consume 方法不是執行緒安全的。使用不同執行緒或確保執行緒安全地呼叫 Produce 和 Consume 方法。
 */
class BoundedBuffer
{
public:
  BoundedBuffer(const std::string& name, size_t buffers_capacity_, size_t buffer_size_max);
  ~BoundedBuffer();
  BoundedBuffer(const BoundedBuffer& rhs)            = delete;
  BoundedBuffer& operator=(const BoundedBuffer& rhs) = delete;

public:
  /**
   * @description: 生產。非執行緒安全,兩個及以上執行緒呼叫 Produce 可能會導致髒寫。
   * @param {function<void(void*)>} func
   * @return {void}
   */
  void Produce(std::function<void(void*)> func);

  /**
   * @description: 消費。非執行緒安全,兩個及以上執行緒呼叫 Consume 可能會導致讀取到同一份資料。
   * @param {function<void(void*)>} func
   * @return {void}
   */
  void Consume(std::function<void(void*)> func);

private:
  const std::string _name;

  // 記憶體池
  void** _buffers;
  // 記憶體池容量
  size_t _buffers_capacity;
  // 記憶體塊最大長度
  size_t _buffer_size_max;
  // 保護記憶體池
  std::mutex _buffers_mtx;
  // 記憶體池是否有可用的 slot (非滿則可以寫資料)
  std::condition_variable _buffers_not_full_cond;
  // 記憶體池是否非空 (非空則可以讀資料)
  std::condition_variable _buffers_not_empty_cond;
  // 記憶體池將會讀取的位置
  size_t _buffers_read_position;
  // 記憶體池當前可寫入的位置
  size_t _buffers_write_position;
};
// File: bounded_buffer.cpp
#include "bounded_buffer.h"
#include <assert.h>

BoundedBuffer::BoundedBuffer(const std::string& name, size_t buffers_capacity, size_t buffer_size_max)
  : _name(name), _buffers_capacity(buffers_capacity), _buffer_size_max(buffer_size_max), _buffers_read_position(0), _buffers_write_position(0)
{
  assert(buffers_capacity > 1);
  assert(buffer_size_max > 0);
  _buffers = static_cast<void**>(std::malloc(sizeof(void*) * buffers_capacity));
  std::memset(_buffers, 0, sizeof(void*) * buffers_capacity);
}

BoundedBuffer::~BoundedBuffer()
{
  for (auto i = 0; i < _buffers_capacity; i++)
  {
    if (_buffers[i])
    {
      std::free(_buffers[i]);
     _buffers[i] = nullptr;
    }
  }
  std::free(_buffers);
  _buffers = nullptr;
}

void BoundedBuffer::Produce(std::function<void(void*)> func)
{
  std::unique_lock<std::mutex> buffers_lock(_buffers_mtx);
  // 等待可寫 slot。要確保本次寫入後,下次有寫入位置,所以 +1。
  _buffers_not_full_cond.wait(buffers_lock, [&] { return ((_buffers_write_position + 1) % _buffers_capacity) != _buffers_read_position; });
  // 有可寫 slot 馬上釋放。因為 func 可能是耗時操作,防止過久阻塞 Consume 造成有可讀 slot 而無法讀。
  buffers_lock.unlock();
  if (!_buffers[_buffers_write_position])
  {
    _buffers[_buffers_write_position] = std::malloc(_buffer_size_max);
  }
  auto buffer = _buffers[_buffers_write_position];
  func(buffer);
  // 更改寫 slot
  _buffers_write_position = (_buffers_write_position + 1) % _buffers_capacity;
  _buffers_not_empty_cond.notify_one();
}

void BoundedBuffer::Consume(std::function<void(void*)> func)
{
  std::unique_lock<std::mutex> buffers_lock(_buffers_mtx);
  // 等待讀 slot
  _buffers_not_empty_cond.wait(buffers_lock, [&] { return _buffers_write_position != _buffers_read_position; });
  // 有可讀 slot 馬上釋放。因為 func 可能是耗時操作,防止過久阻塞 Produce 造成有可寫 slot 而無法寫。
  buffers_lock.unlock();
  auto buffer = _buffers[_buffers_read_position];
  func(buffer);
  // 更改讀 slot
  _buffers_read_position = (_buffers_read_position + 1) % _buffers_capacity;
  _buffers_not_full_cond.notify_one();
}

三、測試

// File: test_bounded_queue.cpp
#include "bounded_buffer.h"
#include <iostream>
#include <thread>

int main(int argc, const char** argv)
{
  // Buffer 中每塊資料最大長度 sizeof(size_t)。實際應用中,更大長度的記憶體才有意義。
  std::unique_ptr<BoundedBuffer> boundedBuffer = std::make_unique<BoundedBuffer>("Test", 4, sizeof(size_t));

  std::thread producer_thread(
    [&]
    {
      for (size_t i = 0; i < 1000; i++)
      {
        boundedBuffer->Produce(
        [=](void* buffer)
        {
          // 假設生產耗時 20ms 左右。
          std::this_thread::sleep_for(std::chrono::milliseconds(20));
          *(size_t*)buffer = i;
          // std::cout << "Produce: " << i << std::endl;
        });
      }
   });

  std::thread consumer_thread(
    [&]
    {
      for (size_t i = 0; i < 1000; i++)
      {
        boundedBuffer->Consume(
        [=](void* buffer)
        {
          auto value = *(size_t*)buffer;
          // 假設消費耗時 20ms 左右。
          std::this_thread::sleep_for(std::chrono::milliseconds(20));
          // std::cout << "Consume: " << value << std::endl;
        });
      }
   });

  producer_thread.join();
  consumer_thread.join();

  return 0;
}

執行:

$ time ./test_bounded_queue
./test_bounded_queue  0.05s user 0.05s system 0% cpu 24.314 total

理所應當地,粗略測試耗時 24s 左右比序列 40s 左右快——這不是重點,重點是達到了記憶體複用的目的。

四、說明

1、的確是需要 mutex 和 condition_variable 嗎?

是的。比如在生產時,發現“無法獲取到”可寫的 slot,又不允許丟棄資料,為了不讓生產者執行緒輪詢則只能等待。

2、為什麼 Produce 和 Consume 裡 wait 返回後馬上解鎖?

比如生產時,生產的過程可能耗時。確保“能獲取到”生產 slot 後立即解鎖,以便消費者執行緒呼叫 Consume 時如果阻塞在 std::unique_lock<std::mutex> buffers_lock(_buffers_mtx); 能夠取得鎖,從而得以消費在本次生產之前已經生產好的 slot ——如果佇列完全沒有可讀資料當然就“轉為”阻塞在 _buffers_not_empty_cond.wait(buffers_lock, [&] { return _buffers_write_position != _buffers_read_position; });。如果是阻塞在 wait,則會在本次生產好後通過 _buffers_not_empty_cond.notify_one(); 喚醒消費者執行緒。