1. 程式人生 > >實現一個生產者消費者佇列

實現一個生產者消費者佇列

題目:實現一個佇列,佇列的應用場景為: 

一個生產者執行緒將int型別的數入列,一個消費者執行緒將int型別的數出列 。

先做一個執行緒同步方法的概述:

有以下幾種方式保持執行緒同步:

臨界區: 臨界區(Critical Section)是一段獨佔對某些共享資源訪問的程式碼,在任意時刻只允許一個執行緒對共享資源進行訪問。如果有多個執行緒試圖同時訪問臨界區,那麼在有一個執行緒進入後其他所有試圖訪問此臨界區的執行緒將被掛起,並一直持續到進入臨界區的執行緒離開。臨界區在被釋放後,其他執行緒可以繼續搶佔,並以此達到用原子方式操作共享資源的目的。 管理事件核心物件: 事件核心物件也可以通過通知操作的方式來保持執行緒的同步。 訊號量核心物件:
訊號量(Semaphore)核心物件對執行緒的同步方式與前面幾種方法不同,它允許多個執行緒在同一時刻訪問同一資源,但是需要限制在同一時刻訪問此資源的最大執行緒數目。在用CreateSemaphore()建立訊號量時即要同時指出允許的最大資源計數和當前可用資源計數。一般是將當前可用資源計數設定為最大資源計數,每增加一個執行緒對共享資源的訪問,當前可用資源計數就會減1,只要當前可用資源計數是大於0的,就可以發出訊號量訊號。但是當前可用計數減小到0時則說明當前佔用資源的執行緒數已經達到了所允許的最大數目,不能在允許其他執行緒的進入,此時的訊號量訊號將無法發出。執行緒在處理完共享資源後,應在離開的同時通過ReleaseSemaphore()函式將當前可用資源計數加1。在任何時候當前可用資源計數決不可能大於最大資源計數。
WaitForSingleObject();//等待訊號量有效
CreatSemaphore();//申請訊號量
OpenSemaphore();//開啟訊號量
ReleaseSemaphore();//釋放訊號量


互斥核心物件:
互斥(Mutex)是一種用途非常廣泛的核心物件。能夠保證多個執行緒對同一共享資源的互斥訪問。同臨界區有些類似,只有擁有互斥物件的執行緒才具有訪問資源的許可權,由於互斥物件只有一個,因此就決定了任何情況下此共享資源都不會同時被多個執行緒所訪問。當前佔據資源的執行緒在任務處理完後應將擁有的互斥物件交出,以便其他執行緒在獲得後得以訪問資源。與其他幾種核心物件不同,互斥物件在作業系統中擁有特殊程式碼,並由作業系統來管理,作業系統甚至還允許其進行一些其他核心物件所不能進行的非常規操作。

下面用的是訊號量方法,程式碼如下:

#include<iostream>
#include<process.h>
#include<queue>
#include<Windows.h>
using namespace std;
HANDLE g_hSemaphore=NULL;
const int g_Max=50;
queue<int> g_queuePV;

//生產者執行緒
unsigned int __stdcall ProducerThread(void* pParam)
{
int n=0;
while(++n<=g_Max)
{
	g_queuePV.push(n);
	cout<<"Produce"<<n<<endl;
	ReleaseSemaphore(g_hSemaphore,1,NULL);
	Sleep(300);
}
return 0;
}
//消費者執行緒
unsigned int __stdcall CustomerThread(void* pParam)
{
int n=g_Max;
while(n--)
{
WaitForSingleObject(g_hSemaphore,10000);
queue<int>::size_type iVal=g_queuePV.front();
g_queuePV.pop();
cout<<"custom"<<iVal<<endl;
Sleep(500);//此處消費者消費執行緒後,讓它休息一下,否則生產者生產的再多,也會被一下子消費完
}
cout<<"working end."<<endl;
return 0;
}
void PVOperationGo()
{
	g_hSemaphore=CreateSemaphore(NULL,0,g_Max,NULL);//訊號量來維護執行緒同步
	if(NULL==g_hSemaphore)
	return ;
	cout<<"working start..."<<endl;
	HANDLE aryhPV[2];
	aryhPV[0]=(HANDLE)_beginthreadex(NULL,0,ProducerThread,NULL,0,NULL);
	aryhPV[1]=(HANDLE)_beginthreadex(NULL,0,CustomerThread,NULL,0,NULL);
	WaitForMultipleObjects(2,aryhPV,TRUE,INFINITE);
	CloseHandle(g_hSemaphore);
	
}
void main()
{
	PVOperationGo();
	getchar();
}