1. 程式人生 > >C++建立執行緒池

C++建立執行緒池

池:

執行緒池是一種多執行緒處理形式,處理過程中將任務新增到佇列,然後在建立執行緒後自動啟動這些任務。執行緒池執行緒都是後臺執行緒。每個執行緒都使用預設的堆疊大小,以預設的優先順序執行,並處於多執行緒單元中。如果某個執行緒在託管程式碼中空閒(如正在等待某個事件),則執行緒池將插入另一個輔助執行緒來使所有處理器保持繁忙。如果所有執行緒池執行緒都始終保持繁忙,但佇列中包含掛起的工作,則執行緒池將在一段時間後建立另一個輔助執行緒但執行緒的數目永遠不會超過最大值。超過最大值的執行緒可以排隊,但他們要等到其他執行緒完成後才啟動。

一  定義自己的執行緒

#pragma once
#ifndef MYTHREAD
#define MYTHREAD
#include <iostream>
#include <windows.h>
//執行緒要做的事情
typedef unsigned int(*jobFunction)(int num,WPARAM wParam, LPARAM lParam);
//執行緒完成後回撥
typedef void(*jobCallback)(unsigned int pResult);
class MyThread
{
public:
	MyThread();
	~MyThread();
	bool isWorking();//是否在工作
	void doJob(jobFunction jobPrac, int num,WPARAM wParam, LPARAM lParam, jobCallback cb);   //指派任務
private:
	bool bIsWorking;
	jobFunction m_jobFunc;
	jobCallback m_jobCallback;
	int num;
	WPARAM wParam;
	LPARAM lParam;
	HANDLE m_pHread;
	void jobDone();// 執行緒掛起
	static DWORD WINAPI threadProc(LPARAM lParam);   //真正執行緒
};
#endif // MYTHREAD
#include "Mythread.h"
MyThread::MyThread()
{
	 bIsWorking = false;
	 m_jobFunc = 0;
	 m_jobCallback = 0;
	 num = 0;
	 wParam = 0;
	 lParam = 0;
	//m_pHread = INVALID_HANDLE_VALUE;
	 //建立執行緒
	m_pHread = CreateThread(0,0,(LPTHREAD_START_ROUTINE)threadProc,this,0,0); 
	printf("new thread %08x\n",m_pHread);
}
MyThread::~MyThread()
{
	//if (m_pHread != INVALID_HANDLE_VALUE)
	//{
	//	TerminateThread(m_pHread,0); //強制自殺
	//}
		if (m_pHread != INVALID_HANDLE_VALUE)
		{
			m_jobCallback = (jobCallback)INVALID_HANDLE_VALUE;
			m_jobFunc = (jobFunction)INVALID_HANDLE_VALUE;
			ResumeThread(m_pHread);
			WaitForSingleObject(m_pHread,INFINITE);
			CloseHandle(m_pHread);
		}
}
bool MyThread::isWorking()
{
	return this->bIsWorking;
}
void MyThread::jobDone()
{
	this->bIsWorking = false;
	SuspendThread(m_pHread);  //休眠
	//printf("thread %08x job done\n",m_pHread);
}
/*
執行一個具體的任務
*/
void MyThread::doJob(jobFunction jobPrac,int num, WPARAM wParam, LPARAM lParam, jobCallback cb)
{
	this->num = num;
	this->m_jobCallback = cb;
	this->m_jobFunc = jobPrac;
	this->wParam = wParam;
	this->lParam = lParam;
	ResumeThread(m_pHread);  //ResumeThread()使用該函式能夠啟用執行緒的執行,
							 //使CPU分配資源讓執行緒恢復執行。
	printf("thread %08x start work..num %d wParam %d lParam  %d\n",m_pHread, num,wParam, lParam);
}
/*
真正的執行緒函式
*/
DWORD WINAPI MyThread::threadProc(LPARAM lParam)
{
	MyThread *pthis = (MyThread *)lParam;
	while (true)
	{  //因為執行緒在建構函式中申請的,但是申請的時候沒有具體的任務
		if (pthis->m_jobCallback == INVALID_HANDLE_VALUE || pthis->m_jobFunc == INVALID_HANDLE_VALUE)
		{
			printf("thread %08x see byebye",pthis->m_pHread);
			break;
		}
		if (pthis->m_jobCallback == 0 || pthis->m_jobFunc == 0)
		{
			pthis->jobDone();
			//return;   不能直接return
			//continue;
		}
		pthis->bIsWorking = true;
		unsigned int result = pthis->m_jobFunc(pthis->num,pthis->wParam,pthis->lParam);
		printf("thread %08x job result %d\n",pthis->m_pHread,result);
		pthis->m_jobCallback(result);
		pthis->jobDone();
	}
	return 0;
}

二 建立執行緒池 

#pragma once
#include <vector>
#include "Mythread.h"
using namespace std;
#ifndef MYPOOL
#define MYPOOL

/*
池的概念:預先從作業系統申請大片資源,然後使用後不還給系統,保持複用
優點:避免頻繁從應用層切換到核心<作業系統申請資源需要嵌入核心當中去>
*/

/*
執行緒池管理類,需要作為一個管理者,管理N個執行緒
*/
class MyPool
{
public:
	MyPool(int nSize);
	~MyPool();
	bool pushJob(jobFunction jobPrac, int num,WPARAM wParam, LPARAM lParam, jobCallback cb);
	int getPoolSize();
private:
	vector<MyThread *>m_threadVector;
};

#endif // !MYPOOL
#include "Mypool.h"

//在構造在初始化幾個執行緒
MyPool::MyPool(int nSize)
{
	m_threadVector.clear();
	for (int i = 0; i < nSize; ++i)
	{
		MyThread *tmp = new MyThread();
		m_threadVector.push_back(tmp);
	}
}
MyPool::~MyPool()
{
	vector<MyThread *>::iterator iter = m_threadVector.begin();
	for (; iter != m_threadVector.end(); iter++)
	{
		MyThread *tmp = *iter;
		delete tmp;
	}
}

bool MyPool::pushJob(jobFunction jobPrac, int num,WPARAM wParam, LPARAM lParam, jobCallback cb)
{
	vector<MyThread *>::iterator iter = m_threadVector.begin();
	for (;iter != m_threadVector.end();iter++)
	{
		MyThread *tmp = *iter;
		if (!tmp->isWorking())
		{
			tmp->doJob(jobPrac, num,wParam, lParam, cb);
			return true;
		}
	}
	MyThread *tmp = new MyThread();
	m_threadVector.push_back(tmp);
	tmp->doJob(jobPrac,num, wParam, lParam, cb);
	return true;
}

int MyPool::getPoolSize()
{
	return m_threadVector.size();
}

 三 啟動執行緒池

#include <mutex>
#include "Mypool.h"

//typedef unsigned int(*jobFunction)(WPARAM wParam, LPARAM lParam);
//typedef void(*jobCallback)(unsigned int pResult);
mutex m;
unsigned int job(WPARAM wParam, LPARAM lParam)
{
	m.lock();
	printf("job doing :%d + %d = ?\n", wParam, lParam);
	//Sleep(500);
	return lParam + wParam;
	m.unlock();
}
unsigned int job1(int num,WPARAM wParam, LPARAM lParam)
{
	printf("job doing :1+2+3+...+%d= ?\n", num);
	return num *(num -1)/2;
}
void cb(unsigned int pResult)
{
	printf("job result:%d\n",pResult);
}
int main()
{
	MyPool *pool = new MyPool(2);
	while (true)
	{
		char cmd = getchar();
		if (cmd == 'q' || cmd == 'Q')
		{
			break;
		}
		printf("thread pool size :%d\n",pool->getPoolSize());
		pool->pushJob(job1,100,0,0,cb);
	}
	printf("process exit\n");
	return 0;
}