綜合運用: C++11 多線程下生產者消費者模型詳解(轉)
阿新 • • 發佈:2017-06-12
並發 rep 生產 我會 交流 模型 操作 const ref
生產者消費者問題是多線程並發中一個非常經典的問題,相信學過操作系統課程的同學都清楚這個問題的根源。本文將就四種情況分析並介紹生產者和消費者問題,它們分別是:單生產者-單消費者模型,單生產者-多消費者模型,多生產者-單消費者模型,多生產者-多消費者模型,我會給出四種情況下的 C++11 並發解決方案,如果文中出現了錯誤或者你對代碼有異議,歡迎交流 ;-)。
單生產者-單消費者模型
顧名思義,單生產者-單消費者模型中只有一個生產者和一個消費者,生產者不停地往產品庫中放入產品,消費者則從產品庫中取走產品,產品庫容積有限制,只能容納一定數目的產品,如果生產者生產產品的速度過快,則需要等待消費者取走產品之後,產品庫不為空才能繼續往產品庫中放置新的產品,相反,如果消費者取走產品的速度過快,則可能面臨產品庫中沒有產品可使用的情況,此時需要等待生產者放入一個產品後,消費者才能繼續工作。C++11實現單生產者單消費者模型的代碼如下:
1 #include <unistd.h> 2 3 #include <cstdlib> 4 #include <condition_variable> 5 #include <iostream> 6 #include <mutex> 7 #include <thread> 8 9 static const int kItemRepositorySize = 10; // Item buffer size. 10 static const int kItemsToProduce = 1000; // How many items we plan to produce.11 12 struct ItemRepository { 13 int item_buffer[kItemRepositorySize]; // 產品緩沖區, 配合 read_position 和 write_position 模型環形隊列. 14 size_t read_position; // 消費者讀取產品位置. 15 size_t write_position; // 生產者寫入產品位置. 16 std::mutex mtx; // 互斥量,保護產品緩沖區 17 std::condition_variable repo_not_full; // 條件變量, 指示產品緩沖區不為滿.18 std::condition_variable repo_not_empty; // 條件變量, 指示產品緩沖區不為空. 19 } gItemRepository; // 產品庫全局變量, 生產者和消費者操作該變量. 20 21 typedef struct ItemRepository ItemRepository; 22 23 24 void ProduceItem(ItemRepository *ir, int item) 25 { 26 std::unique_lock<std::mutex> lock(ir->mtx); 27 while(((ir->write_position + 1) % kItemRepositorySize) 28 == ir->read_position) { // item buffer is full, just wait here. 29 std::cout << "Producer is waiting for an empty slot...\n"; 30 (ir->repo_not_full).wait(lock); // 生產者等待"產品庫緩沖區不為滿"這一條件發生. 31 } 32 33 (ir->item_buffer)[ir->write_position] = item; // 寫入產品. 34 (ir->write_position)++; // 寫入位置後移. 35 36 if (ir->write_position == kItemRepositorySize) // 寫入位置若是在隊列最後則重新設置為初始位置. 37 ir->write_position = 0; 38 39 (ir->repo_not_empty).notify_all(); // 通知消費者產品庫不為空. 40 lock.unlock(); // 解鎖. 41 } 42 43 int ConsumeItem(ItemRepository *ir) 44 { 45 int data; 46 std::unique_lock<std::mutex> lock(ir->mtx); 47 // item buffer is empty, just wait here. 48 while(ir->write_position == ir->read_position) { 49 std::cout << "Consumer is waiting for items...\n"; 50 (ir->repo_not_empty).wait(lock); // 消費者等待"產品庫緩沖區不為空"這一條件發生. 51 } 52 53 data = (ir->item_buffer)[ir->read_position]; // 讀取某一產品 54 (ir->read_position)++; // 讀取位置後移 55 56 if (ir->read_position >= kItemRepositorySize) // 讀取位置若移到最後,則重新置位. 57 ir->read_position = 0; 58 59 (ir->repo_not_full).notify_all(); // 通知消費者產品庫不為滿. 60 lock.unlock(); // 解鎖. 61 62 return data; // 返回產品. 63 } 64 65 66 void ProducerTask() // 生產者任務 67 { 68 for (int i = 1; i <= kItemsToProduce; ++i) { 69 // sleep(1); 70 std::cout << "Produce the " << i << "^th item..." << std::endl; 71 ProduceItem(&gItemRepository, i); // 循環生產 kItemsToProduce 個產品. 72 } 73 } 74 75 void ConsumerTask() // 消費者任務 76 { 77 static int cnt = 0; 78 while(1) { 79 sleep(1); 80 int item = ConsumeItem(&gItemRepository); // 消費一個產品. 81 std::cout << "Consume the " << item << "^th item" << std::endl; 82 if (++cnt == kItemsToProduce) break; // 如果產品消費個數為 kItemsToProduce, 則退出. 83 } 84 } 85 86 void InitItemRepository(ItemRepository *ir) 87 { 88 ir->write_position = 0; // 初始化產品寫入位置. 89 ir->read_position = 0; // 初始化產品讀取位置. 90 } 91 92 int main() 93 { 94 InitItemRepository(&gItemRepository); 95 std::thread producer(ProducerTask); // 創建生產者線程. 96 std::thread consumer(ConsumerTask); // 創建消費之線程. 97 producer.join(); 98 consumer.join(); 99 }
單生產者-多消費者模型
與單生產者和單消費者模型不同的是,單生產者-多消費者模型中可以允許多個消費者同時從產品庫中取走產品。所以除了保護產品庫在多個讀寫線程下互斥之外,還需要維護消費者取走產品的計數器,代碼如下:
1 #include <unistd.h> 2 3 #include <cstdlib> 4 #include <condition_variable> 5 #include <iostream> 6 #include <mutex> 7 #include <thread> 8 9 static const int kItemRepositorySize = 4; // Item buffer size. 10 static const int kItemsToProduce = 10; // How many items we plan to produce. 11 12 struct ItemRepository { 13 int item_buffer[kItemRepositorySize]; 14 size_t read_position; 15 size_t write_position; 16 size_t item_counter; 17 std::mutex mtx; 18 std::mutex item_counter_mtx; 19 std::condition_variable repo_not_full; 20 std::condition_variable repo_not_empty; 21 } gItemRepository; 22 23 typedef struct ItemRepository ItemRepository; 24 25 26 void ProduceItem(ItemRepository *ir, int item) 27 { 28 std::unique_lock<std::mutex> lock(ir->mtx); 29 while(((ir->write_position + 1) % kItemRepositorySize) 30 == ir->read_position) { // item buffer is full, just wait here. 31 std::cout << "Producer is waiting for an empty slot...\n"; 32 (ir->repo_not_full).wait(lock); 33 } 34 35 (ir->item_buffer)[ir->write_position] = item; 36 (ir->write_position)++; 37 38 if (ir->write_position == kItemRepositorySize) 39 ir->write_position = 0; 40 41 (ir->repo_not_empty).notify_all(); 42 lock.unlock(); 43 } 44 45 int ConsumeItem(ItemRepository *ir) 46 { 47 int data; 48 std::unique_lock<std::mutex> lock(ir->mtx); 49 // item buffer is empty, just wait here. 50 while(ir->write_position == ir->read_position) { 51 std::cout << "Consumer is waiting for items...\n"; 52 (ir->repo_not_empty).wait(lock); 53 } 54 55 data = (ir->item_buffer)[ir->read_position]; 56 (ir->read_position)++; 57 58 if (ir->read_position >= kItemRepositorySize) 59 ir->read_position = 0; 60 61 (ir->repo_not_full).notify_all(); 62 lock.unlock(); 63 64 return data; 65 } 66 67 68 void ProducerTask() 69 { 70 for (int i = 1; i <= kItemsToProduce; ++i) { 71 // sleep(1); 72 std::cout << "Producer thread " << std::this_thread::get_id() 73 << " producing the " << i << "^th item..." << std::endl; 74 ProduceItem(&gItemRepository, i); 75 } 76 std::cout << "Producer thread " << std::this_thread::get_id() 77 << " is exiting..." << std::endl; 78 } 79 80 void ConsumerTask() 81 { 82 bool ready_to_exit = false; 83 while(1) { 84 sleep(1); 85 std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx); 86 if (gItemRepository.item_counter < kItemsToProduce) { 87 int item = ConsumeItem(&gItemRepository); 88 ++(gItemRepository.item_counter); 89 std::cout << "Consumer thread " << std::this_thread::get_id() 90 << " is consuming the " << item << "^th item" << std::endl; 91 } else ready_to_exit = true; 92 lock.unlock(); 93 if (ready_to_exit == true) break; 94 } 95 std::cout << "Consumer thread " << std::this_thread::get_id() 96 << " is exiting..." << std::endl; 97 } 98 99 void InitItemRepository(ItemRepository *ir) 100 { 101 ir->write_position = 0; 102 ir->read_position = 0; 103 ir->item_counter = 0; 104 } 105 106 int main() 107 { 108 InitItemRepository(&gItemRepository); 109 std::thread producer(ProducerTask); 110 std::thread consumer1(ConsumerTask); 111 std::thread consumer2(ConsumerTask); 112 std::thread consumer3(ConsumerTask); 113 std::thread consumer4(ConsumerTask); 114 115 producer.join(); 116 consumer1.join(); 117 consumer2.join(); 118 consumer3.join(); 119 consumer4.join(); 120 }
多生產者-單消費者模型
與單生產者和單消費者模型不同的是,多生產者-單消費者模型中可以允許多個生產者同時向產品庫中放入產品。所以除了保護產品庫在多個讀寫線程下互斥之外,還需要維護生產者放入產品的計數器,代碼如下:
1 #include <unistd.h> 2 3 #include <cstdlib> 4 #include <condition_variable> 5 #include <iostream> 6 #include <mutex> 7 #include <thread> 8 9 static const int kItemRepositorySize = 4; // Item buffer size. 10 static const int kItemsToProduce = 10; // How many items we plan to produce. 11 12 struct ItemRepository { 13 int item_buffer[kItemRepositorySize]; 14 size_t read_position; 15 size_t write_position; 16 size_t item_counter; 17 std::mutex mtx; 18 std::mutex item_counter_mtx; 19 std::condition_variable repo_not_full; 20 std::condition_variable repo_not_empty; 21 } gItemRepository; 22 23 typedef struct ItemRepository ItemRepository; 24 25 26 void ProduceItem(ItemRepository *ir, int item) 27 { 28 std::unique_lock<std::mutex> lock(ir->mtx); 29 while(((ir->write_position + 1) % kItemRepositorySize) 30 == ir->read_position) { // item buffer is full, just wait here. 31 std::cout << "Producer is waiting for an empty slot...\n"; 32 (ir->repo_not_full).wait(lock); 33 } 34 35 (ir->item_buffer)[ir->write_position] = item; 36 (ir->write_position)++; 37 38 if (ir->write_position == kItemRepositorySize) 39 ir->write_position = 0; 40 41 (ir->repo_not_empty).notify_all(); 42 lock.unlock(); 43 } 44 45 int ConsumeItem(ItemRepository *ir) 46 { 47 int data; 48 std::unique_lock<std::mutex> lock(ir->mtx); 49 // item buffer is empty, just wait here. 50 while(ir->write_position == ir->read_position) { 51 std::cout << "Consumer is waiting for items...\n"; 52 (ir->repo_not_empty).wait(lock); 53 } 54 55 data = (ir->item_buffer)[ir->read_position]; 56 (ir->read_position)++; 57 58 if (ir->read_position >= kItemRepositorySize) 59 ir->read_position = 0; 60 61 (ir->repo_not_full).notify_all(); 62 lock.unlock(); 63 64 return data; 65 } 66 67 void ProducerTask() 68 { 69 bool ready_to_exit = false; 70 while(1) { 71 sleep(1); 72 std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx); 73 if (gItemRepository.item_counter < kItemsToProduce) { 74 ++(gItemRepository.item_counter); 75 ProduceItem(&gItemRepository, gItemRepository.item_counter); 76 std::cout << "Producer thread " << std::this_thread::get_id() 77 << " is producing the " << gItemRepository.item_counter 78 << "^th item" << std::endl; 79 } else ready_to_exit = true; 80 lock.unlock(); 81 if (ready_to_exit == true) break; 82 } 83 std::cout << "Producer thread " << std::this_thread::get_id() 84 << " is exiting..." << std::endl; 85 } 86 87 void ConsumerTask() 88 { 89 static int item_consumed = 0; 90 while(1) { 91 sleep(1); 92 ++item_consumed; 93 if (item_consumed <= kItemsToProduce) { 94 int item = ConsumeItem(&gItemRepository); 95 std::cout << "Consumer thread " << std::this_thread::get_id() 96 << " is consuming the " << item << "^th item" << std::endl; 97 } else break; 98 } 99 std::cout << "Consumer thread " << std::this_thread::get_id() 100 << " is exiting..." << std::endl; 101 } 102 103 void InitItemRepository(ItemRepository *ir) 104 { 105 ir->write_position = 0; 106 ir->read_position = 0; 107 ir->item_counter = 0; 108 } 109 110 int main() 111 { 112 InitItemRepository(&gItemRepository); 113 std::thread producer1(ProducerTask); 114 std::thread producer2(ProducerTask); 115 std::thread producer3(ProducerTask); 116 std::thread producer4(ProducerTask); 117 std::thread consumer(ConsumerTask); 118 119 producer1.join(); 120 producer2.join(); 121 producer3.join(); 122 producer4.join(); 123 consumer.join(); 124 }
多生產者-多消費者模型
該模型可以說是前面兩種模型的綜合,程序需要維護兩個計數器,分別是生產者已生產產品的數目和消費者已取走產品的數目。另外也需要保護產品庫在多個生產者和多個消費者互斥地訪問。
代碼如下:
1 #include <unistd.h> 2 3 #include <cstdlib> 4 #include <condition_variable> 5 #include <iostream> 6 #include <mutex> 7 #include <thread> 8 9 static const int kItemRepositorySize = 4; // Item buffer size. 10 static const int kItemsToProduce = 10; // How many items we plan to produce. 11 12 struct ItemRepository { 13 int item_buffer[kItemRepositorySize]; 14 size_t read_position; 15 size_t write_position; 16 size_t produced_item_counter; 17 size_t consumed_item_counter; 18 std::mutex mtx; 19 std::mutex produced_item_counter_mtx; 20 std::mutex consumed_item_counter_mtx; 21 std::condition_variable repo_not_full; 22 std::condition_variable repo_not_empty; 23 } gItemRepository; 24 25 typedef struct ItemRepository ItemRepository; 26 27 28 void ProduceItem(ItemRepository *ir, int item) 29 { 30 std::unique_lock<std::mutex> lock(ir->mtx); 31 while(((ir->write_position + 1) % kItemRepositorySize) 32 == ir->read_position) { // item buffer is full, just wait here. 33 std::cout << "Producer is waiting for an empty slot...\n"; 34 (ir->repo_not_full).wait(lock); 35 } 36 37 (ir->item_buffer)[ir->write_position] = item; 38 (ir->write_position)++; 39 40 if (ir->write_position == kItemRepositorySize) 41 ir->write_position = 0; 42 43 (ir->repo_not_empty).notify_all(); 44 lock.unlock(); 45 } 46 47 int ConsumeItem(ItemRepository *ir) 48 { 49 int data; 50 std::unique_lock<std::mutex> lock(ir->mtx); 51 // item buffer is empty, just wait here. 52 while(ir->write_position == ir->read_position) { 53 std::cout << "Consumer is waiting for items...\n"; 54 (ir->repo_not_empty).wait(lock); 55 } 56 57 data = (ir->item_buffer)[ir->read_position]; 58 (ir->read_position)++; 59 60 if (ir->read_position >= kItemRepositorySize) 61 ir->read_position = 0; 62 63 (ir->repo_not_full).notify_all(); 64 lock.unlock(); 65 66 return data; 67 } 68 69 void ProducerTask() 70 { 71 bool ready_to_exit = false; 72 while(1) { 73 sleep(1); 74 std::unique_lock<std::mutex> lock(gItemRepository.produced_item_counter_mtx); 75 if (gItemRepository.produced_item_counter < kItemsToProduce) { 76 ++(gItemRepository.produced_item_counter); 77 ProduceItem(&gItemRepository, gItemRepository.produced_item_counter); 78 std::cout << "Producer thread " << std::this_thread::get_id() 79 << " is producing the " << gItemRepository.produced_item_counter 80 << "^th item" << std::endl; 81 } else ready_to_exit = true; 82 lock.unlock(); 83 if (ready_to_exit == true) break; 84 } 85 std::cout << "Producer thread " << std::this_thread::get_id() 86 << " is exiting..." << std::endl; 87 } 88 89 void ConsumerTask() 90 { 91 bool ready_to_exit = false; 92 while(1) { 93 sleep(1); 94 std::unique_lock<std::mutex> lock(gItemRepository.consumed_item_counter_mtx); 95 if (gItemRepository.consumed_item_counter < kItemsToProduce) { 96 int item = ConsumeItem(&gItemRepository); 97 ++(gItemRepository.consumed_item_counter); 98 std::cout << "Consumer thread " << std::this_thread::get_id() 99 << " is consuming the " << item << "^th item" << std::endl; 100 } else ready_to_exit = true; 101 lock.unlock(); 102 if (ready_to_exit == true) break; 103 } 104 std::cout << "Consumer thread " << std::this_thread::get_id() 105 << " is exiting..." << std::endl; 106 } 107 108 void InitItemRepository(ItemRepository *ir) 109 { 110 ir->write_position = 0; 111 ir->read_position = 0; 112 ir->produced_item_counter = 0; 113 ir->consumed_item_counter = 0; 114 } 115 116 int main() 117 { 118 InitItemRepository(&gItemRepository); 119 std::thread producer1(ProducerTask); 120 std::thread producer2(ProducerTask); 121 std::thread producer3(ProducerTask); 122 std::thread producer4(ProducerTask); 123 124 std::thread consumer1(ConsumerTask); 125 std::thread consumer2(ConsumerTask); 126 std::thread consumer3(ConsumerTask); 127 std::thread consumer4(ConsumerTask); 128 129 producer1.join(); 130 producer2.join(); 131 producer3.join(); 132 producer4.join(); 133 134 consumer1.join(); 135 consumer2.join(); 136 consumer3.join(); 137 consumer4.join(); 138 }
轉自:http://www.cnblogs.com/haippy/p/3252092.html
綜合運用: C++11 多線程下生產者消費者模型詳解(轉)