[C++]生產消費模型
阿新 • • 發佈:2021-01-06
生產者消費者模式就是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。這個阻塞佇列就是用來給生產者和消費者解耦的。
github原始碼路徑:https://github.com/dangwei-90/Design-Mode
// 此檔案包含 "main" 函式。程式執行將在此處開始並結束。 // // 參考大話設計模式 - 生產消費模型 #include <iostream> #include <vector> #include <cstdlib> #include <condition_variable> #include <iostream> #include <mutex> #include <thread> #ifndef SAFE_DELETE #define SAFE_DELETE(p) { if(p){delete(p); (p)=NULL;} } #endif static const int g_item_queue_size_rel = 3; // 佇列實際大小 static const int g_item_queue_size = g_item_queue_size_rel + 1; // 用於判斷的佇列滿的邏輯上限 static const int g_item_to_produce = 10; // 待生產數量 std::mutex g_mutex; // 用於多執行緒同步輸出,防止日誌混亂 struct ItemQueue { int item_buffer[g_item_queue_size]; // 生產消費緩衝區,生產的資料存到此處,消費者從此處拿資料。 size_t read_position; // 標記位,消費者從陣列中獲取資料的位置 size_t write_position; // 標記位,生產者生產的資料放到陣列中 std::mutex queue_mtx; // 操作緩衝區的鎖,防止緩衝區資料混亂 std::condition_variable queue_not_full; // 條件變數,表示快取區是否已滿 std::condition_variable queue_not_empty; // 條件變數,表示緩衝區是否為空 } g_item_queue; // 生成產品的實現 void ProductItem(ItemQueue* iq, int item) { std::unique_lock<std::mutex> queue_lock(iq->queue_mtx); // 判斷 queue 是否已滿,如果已滿,則 while 等待 while (((iq->write_position + 1) % g_item_queue_size) == iq->read_position) { { std::lock_guard<std::mutex> lock(g_mutex); std::cout << "緩衝區滿,等待..." << std::endl; } (iq->queue_not_full).wait(queue_lock); } // queue 不為空,繼續寫入產品 (iq->item_buffer)[iq->write_position] = item; // 寫入標記位加1 (iq->write_position)++; if (iq->write_position == g_item_queue_size) { // 已滿,設定寫入位置為初始位置 iq->write_position = 0; } (iq->queue_not_empty).notify_all(); // 通知消費者,佇列不為空 queue_lock.unlock(); } // 消費產品的實現 int ConsumeItem(ItemQueue* iq) { int data = -1; std::unique_lock<std::mutex> queue_lock(iq->queue_mtx); // 判斷 queue 是否為空,空的時候,等待 while (iq->read_position == iq->write_position) { { std::lock_guard<std::mutex> lock(g_mutex); std::cout << "緩衝區空,等待..." << std::endl; } (iq->queue_not_empty).wait(queue_lock); } data = (iq->item_buffer)[iq->read_position]; // 讀取產品 (iq->read_position)++; if (iq->read_position == g_item_queue_size) { iq->read_position = 0; } (iq->queue_not_full).notify_all(); queue_lock.unlock(); return data; } // 生產者任務 void ProducerTask() { for (int n = 1; n <= g_item_to_produce; n++) { ProductItem(&g_item_queue, n); { std::lock_guard<std::mutex> lock(g_mutex); std::cout << "生成產品:" << n << " " << std::endl; } } } void ConsumerTask() { static int consume_count = 0; while (1) { std::this_thread::sleep_for(std::chrono::seconds(1)); int item = ConsumeItem(&g_item_queue); { std::lock_guard<std::mutex> lock(g_mutex); std::cout << "消費產品:" << item << " " << std::endl; } if (++consume_count == g_item_to_produce) { // 滿足消費者數量後,退出 break; } } } void InitItemQueue(ItemQueue* iq) { iq->write_position = 0; iq->write_position = 0; } int main() { InitItemQueue(&g_item_queue); std::thread producer_thread(ProducerTask); std::thread consumer_thread(ConsumerTask); producer_thread.join(); consumer_thread.join(); return 0; }