一種有界佇列(Bounded Buffer)的實現
一、概述
在有 CPU
和 GPU
參與的一種運算中,比如深度學習推理,CPU 需要預處理資料,然後交給 GPU 處理,最後 CPU 對 GPU 的運算結果進行後處理。
在整個過程中都是 FIFO
,即資料 ABC 按順序輸入,也需要按 A'B'C' 順序輸出。
如果採用同步阻塞的方式,在 CPU 預處理時 GPU 處於空閒狀態,GPU 運算時 CPU 後處理處於空閒狀態並且也不能進行後續資料的預處理。這樣影響整體的吞吐。
期望是 GPU 運算時,CPU 可以同時進行資料預處理和後處理。這是典型的單生產者單消費者模式。
在兩個執行緒之間傳遞資料時,為確保執行緒安全,可以在一個執行緒每次 malloc
new
申請記憶體,在另一個執行緒 free
或 delete
。為了避免頻繁的記憶體分配和釋放,需要使用到記憶體池。
本文描述採用有界佇列實現記憶體池,適用場景和限制:
- 需要把記憶體使用控制在一定範圍內;
- 整個過程不允許丟棄資料;
- 生產和消費之間執行緒安全;
- 不會(也不允許)同時生產,不會(也不允許)同時消費。如果確實要多執行緒生產或多執行緒消費,呼叫程式碼自行確保執行緒安全。
二、實現
// File: bounded_buffer.h #pragma once #include <cstddef> #include <functional> #include <mutex> #include <string> #include <thread> /* * @Description: BoundedBuffer。Produce 和 Consume 方法不是執行緒安全的。使用不同執行緒或確保執行緒安全地呼叫 Produce 和 Consume 方法。 */ class BoundedBuffer { public: BoundedBuffer(const std::string& name, size_t buffers_capacity_, size_t buffer_size_max); ~BoundedBuffer(); BoundedBuffer(const BoundedBuffer& rhs) = delete; BoundedBuffer& operator=(const BoundedBuffer& rhs) = delete; public: /** * @description: 生產。非執行緒安全,兩個及以上執行緒呼叫 Produce 可能會導致髒寫。 * @param {function<void(void*)>} func * @return {void} */ void Produce(std::function<void(void*)> func); /** * @description: 消費。非執行緒安全,兩個及以上執行緒呼叫 Consume 可能會導致讀取到同一份資料。 * @param {function<void(void*)>} func * @return {void} */ void Consume(std::function<void(void*)> func); private: const std::string _name; // 記憶體池 void** _buffers; // 記憶體池容量 size_t _buffers_capacity; // 記憶體塊最大長度 size_t _buffer_size_max; // 保護記憶體池 std::mutex _buffers_mtx; // 記憶體池是否有可用的 slot (非滿則可以寫資料) std::condition_variable _buffers_not_full_cond; // 記憶體池是否非空 (非空則可以讀資料) std::condition_variable _buffers_not_empty_cond; // 記憶體池將會讀取的位置 size_t _buffers_read_position; // 記憶體池當前可寫入的位置 size_t _buffers_write_position; };
// File: bounded_buffer.cpp #include "bounded_buffer.h" #include <assert.h> BoundedBuffer::BoundedBuffer(const std::string& name, size_t buffers_capacity, size_t buffer_size_max) : _name(name), _buffers_capacity(buffers_capacity), _buffer_size_max(buffer_size_max), _buffers_read_position(0), _buffers_write_position(0) { assert(buffers_capacity > 1); assert(buffer_size_max > 0); _buffers = static_cast<void**>(std::malloc(sizeof(void*) * buffers_capacity)); std::memset(_buffers, 0, sizeof(void*) * buffers_capacity); } BoundedBuffer::~BoundedBuffer() { for (auto i = 0; i < _buffers_capacity; i++) { if (_buffers[i]) { std::free(_buffers[i]); _buffers[i] = nullptr; } } std::free(_buffers); _buffers = nullptr; } void BoundedBuffer::Produce(std::function<void(void*)> func) { std::unique_lock<std::mutex> buffers_lock(_buffers_mtx); // 等待可寫 slot。要確保本次寫入後,下次有寫入位置,所以 +1。 _buffers_not_full_cond.wait(buffers_lock, [&] { return ((_buffers_write_position + 1) % _buffers_capacity) != _buffers_read_position; }); // 有可寫 slot 馬上釋放。因為 func 可能是耗時操作,防止過久阻塞 Consume 造成有可讀 slot 而無法讀。 buffers_lock.unlock(); if (!_buffers[_buffers_write_position]) { _buffers[_buffers_write_position] = std::malloc(_buffer_size_max); } auto buffer = _buffers[_buffers_write_position]; func(buffer); // 更改寫 slot _buffers_write_position = (_buffers_write_position + 1) % _buffers_capacity; _buffers_not_empty_cond.notify_one(); } void BoundedBuffer::Consume(std::function<void(void*)> func) { std::unique_lock<std::mutex> buffers_lock(_buffers_mtx); // 等待讀 slot _buffers_not_empty_cond.wait(buffers_lock, [&] { return _buffers_write_position != _buffers_read_position; }); // 有可讀 slot 馬上釋放。因為 func 可能是耗時操作,防止過久阻塞 Produce 造成有可寫 slot 而無法寫。 buffers_lock.unlock(); auto buffer = _buffers[_buffers_read_position]; func(buffer); // 更改讀 slot _buffers_read_position = (_buffers_read_position + 1) % _buffers_capacity; _buffers_not_full_cond.notify_one(); }
三、測試
// File: test_bounded_queue.cpp
#include "bounded_buffer.h"
#include <iostream>
#include <thread>
int main(int argc, const char** argv)
{
// Buffer 中每塊資料最大長度 sizeof(size_t)。實際應用中,更大長度的記憶體才有意義。
std::unique_ptr<BoundedBuffer> boundedBuffer = std::make_unique<BoundedBuffer>("Test", 4, sizeof(size_t));
std::thread producer_thread(
[&]
{
for (size_t i = 0; i < 1000; i++)
{
boundedBuffer->Produce(
[=](void* buffer)
{
// 假設生產耗時 20ms 左右。
std::this_thread::sleep_for(std::chrono::milliseconds(20));
*(size_t*)buffer = i;
// std::cout << "Produce: " << i << std::endl;
});
}
});
std::thread consumer_thread(
[&]
{
for (size_t i = 0; i < 1000; i++)
{
boundedBuffer->Consume(
[=](void* buffer)
{
auto value = *(size_t*)buffer;
// 假設消費耗時 20ms 左右。
std::this_thread::sleep_for(std::chrono::milliseconds(20));
// std::cout << "Consume: " << value << std::endl;
});
}
});
producer_thread.join();
consumer_thread.join();
return 0;
}
執行:
$ time ./test_bounded_queue
./test_bounded_queue 0.05s user 0.05s system 0% cpu 24.314 total
理所應當地,粗略測試耗時 24s 左右比序列 40s 左右快——這不是重點,重點是達到了記憶體複用的目的。
四、說明
1、的確是需要 mutex 和 condition_variable 嗎?
是的。比如在生產時,發現“無法獲取到”可寫的 slot,又不允許丟棄資料,為了不讓生產者執行緒輪詢則只能等待。
2、為什麼 Produce 和 Consume 裡 wait 返回後馬上解鎖?
比如生產時,生產的過程可能耗時。確保“能獲取到”生產 slot 後立即解鎖,以便消費者執行緒呼叫 Consume 時如果阻塞在 std::unique_lock<std::mutex> buffers_lock(_buffers_mtx);
能夠取得鎖,從而得以消費在本次生產之前已經生產好的 slot ——如果佇列完全沒有可讀資料當然就“轉為”阻塞在 _buffers_not_empty_cond.wait(buffers_lock, [&] { return _buffers_write_position != _buffers_read_position; });
。如果是阻塞在 wait
,則會在本次生產好後通過 _buffers_not_empty_cond.notify_one();
喚醒消費者執行緒。