Linux C/C++多執行緒學習:生產者消費者問題
阿新 • • 發佈:2019-02-05
#include <iostream> #include <mutex> #include <condition_variable> #include <unistd.h> #include <thread> using namespace std; const int kProduceItems = 10; const int kRepositorySize = 4; template<class T> class Repository { public: T items_buff[kRepositorySize]; mutex mtx; // 生產者消費者互斥量 mutex produce_mutex; // 生產計數互斥量 mutex consume_mutex; // 消費計數互斥量 size_t produce_item_count; size_t consume_item_count; size_t produce_position; // 下一個生產的位置 size_t consume_position; // 下一個消費的位置 condition_variable repo_not_full; // 倉庫不滿條件變數 condition_variable repo_not_empty; // 倉庫不空條件變數 Repository() { produce_item_count = 0; consume_item_count = 0; produce_position = 0; consume_position = 0; }; void Init() { fill_n(items_buff, sizeof(items_buff)/sizeof(items_buff[0]), 0); produce_item_count = 0; consume_item_count = 0; produce_position = 0; consume_position = 0; } }; template<class T> class Factory { private: Repository<T> repo; void ProduceItem(T item) { unique_lock<mutex> lock(repo.mtx); // +1 後判斷,因為在初始時,兩者位於同一位置(因此倉庫中最大存在 kRepositorySize-1 個產品) while ((repo.produce_position+1) % kRepositorySize == repo.consume_position) { cout << "Repository is full, waiting..." << endl; (repo.repo_not_full).wait(lock); // 阻塞時釋放鎖,被喚醒時獲得鎖 } repo.items_buff[repo.produce_position++] = item; if (repo.produce_position == kRepositorySize) repo.produce_position = 0; (repo.repo_not_empty).notify_all(); // 喚醒所有因空阻塞的程序 lock.unlock(); } T ConsumeItem() { unique_lock<mutex> lock(repo.mtx); while (repo.consume_position == repo.produce_position) { cout << "Repository is empty, waiting ..." << endl; (repo.repo_not_empty).wait(lock); } T data = repo.items_buff[repo.consume_position++]; if (repo.consume_position == kRepositorySize) repo.consume_position = 0; (repo.repo_not_full).notify_all(); lock.unlock(); return data; } public: void Reset() { repo.Init(); } void ProduceTask() { bool ready_to_exit = false; while (true) { sleep(1); // 如果不sleep ,執行太快,一個程序會完成所有生產 unique_lock<mutex> lock(repo.produce_mutex); if (repo.produce_item_count < kProduceItems) { ++(repo.produce_item_count); T item = repo.produce_item_count; cout << "producer id: "<< this_thread::get_id() << " is producing " << item << "^th item..." << endl; ProduceItem(item); } else { ready_to_exit = true; } lock.unlock(); // sleep(1); if (ready_to_exit) break; } printf("Producer thread %lld is exiting...\n", std::this_thread::get_id()); } void ConsumeTask() { bool ready_to_exit =false; while (true) { sleep(1); // 如果不sleep ,執行太快,一個程序會消費所有產品 unique_lock<mutex> lock(repo.consume_mutex); if (repo.consume_item_count < kProduceItems) { T item = ConsumeItem(); cout << "consumer id: " << this_thread::get_id() << " is consuming " << item << "^th item" << endl; ++(repo.consume_item_count); } else { ready_to_exit = true; } lock.unlock(); // sleep(1); if (ready_to_exit) break; } printf("Consumer thread %lld is exiting...\n", std::this_thread::get_id()); } }; int main() { cout << "Main thread id :" << this_thread::get_id() << endl; Factory<int> myfactory; thread producer1(&Factory<int>::ProduceTask, &myfactory); thread producer2(&Factory<int>::ProduceTask, &myfactory); thread producer3(&Factory<int>::ProduceTask, &myfactory); thread consumer1(&Factory<int>::ConsumeTask, &myfactory); thread consumer2(&Factory<int>::ConsumeTask, &myfactory); thread consumer3(&Factory<int>::ConsumeTask, &myfactory); producer1.join(); producer2.join(); producer3.join(); consumer1.join(); consumer2.join(); consumer3.join(); return 0; }