1. 程式人生 > >一份C++執行緒池的程式碼,非常實用

一份C++執行緒池的程式碼,非常實用

#ifndef _ThreadPool_H_
#define _ThreadPool_H_
#pragma warning(disable: 4530)
#pragma warning(disable: 4786)
#include <cassert>
#include <vector>
#include <queue>
#include <windows.h>

using namespace std;

class ThreadJob  //工作基類
{
public:
 //供執行緒池呼叫的虛擬函式
 virtual void DoJob(void *pPara) = 0;
};

class ThreadPool
{
public:
 //dwNum 執行緒池規模
 ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0)
 {
  InitializeCriticalSection(&_csThreadVector);
  InitializeCriticalSection(&_csWorkQueue);
  _EventComplete = CreateEvent(0, false, false, NULL);
  _EventEnd = CreateEvent(0, true, false, NULL);
  _SemaphoreCall = CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
  _SemaphoreDel =  CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
  assert(_SemaphoreCall != INVALID_HANDLE_VALUE);
  assert(_EventComplete != INVALID_HANDLE_VALUE);
  assert(_EventEnd != INVALID_HANDLE_VALUE);
  assert(_SemaphoreDel != INVALID_HANDLE_VALUE);
  AdjustSize(dwNum <= 0 ? 4 : dwNum);
 }


 ~ThreadPool()
 {
  DeleteCriticalSection(&_csWorkQueue);
  CloseHandle(_EventEnd);
  CloseHandle(_EventComplete);
  CloseHandle(_SemaphoreCall);
  CloseHandle(_SemaphoreDel);

  vector<ThreadItem*>::iterator iter;
  for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++)
  {
   if(*iter)
    delete *iter;
  }


  DeleteCriticalSection(&_csThreadVector);
 }
 //調整執行緒池規模
 int AdjustSize(int iNum)
 {
  if(iNum > 0)
  {
   ThreadItem *pNew;
   EnterCriticalSection(&_csThreadVector);
   for(int _i=0; _i<iNum; _i++)
   {
    _ThreadVector.push_back(pNew = new ThreadItem(this));
    assert(pNew);
    pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
    // set priority
    SetThreadPriority(pNew->_Handle, THREAD_PRIORITY_BELOW_NORMAL);
    assert(pNew->_Handle);
   }
   LeaveCriticalSection(&_csThreadVector);
  }
  else
  {
   iNum= -1;
   ReleaseSemaphore(_SemaphoreDel,  iNum > _lThreadNum ? _lThreadNum : iNum, NULL);
  }
  return (int)_lThreadNum;
 }
 //呼叫執行緒池
 void Call(void (*pFunc)(void  *), void *pPara = NULL)
 {
  assert(pFunc);
  EnterCriticalSection(&_csWorkQueue);
  _JobQueue.push(new JobItem(pFunc, pPara));
  LeaveCriticalSection(&_csWorkQueue);
  ReleaseSemaphore(_SemaphoreCall, 1, NULL);
 }
 //呼叫執行緒池
 inline void Call(ThreadJob * p, void *pPara = NULL)
 {
  Call(CallProc, new CallProcPara(p, pPara));
 }
 //結束執行緒池, 並同步等待
 bool EndAndWait(DWORD dwWaitTime = INFINITE)
 {
  SetEvent(_EventEnd);
  return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;
 }
 //結束執行緒池
 inline void End()
 {
  SetEvent(_EventEnd);
 }
 inline DWORD Size()
 {
  return (DWORD)_lThreadNum;
 }
 inline DWORD GetRunningSize()
 {
  return (DWORD)_lRunningNum;
 }
 bool IsRunning()
 {
  return _lRunningNum > 0;
 }
protected:
 //工作執行緒
 static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)
 {
  ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter);
  assert(pThread);
  ThreadPool *pThreadPoolObj = pThread->_pThis;
  assert(pThreadPoolObj);
  InterlockedIncrement(&pThreadPoolObj->_lThreadNum);
  HANDLE hWaitHandle[3];
  hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;
  hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;
  hWaitHandle[2] = pThreadPoolObj->_EventEnd;
  JobItem *pJob;
  bool fHasJob;

  for(;;)
  {
   DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);
   //響應刪除執行緒訊號
   if(wr == WAIT_OBJECT_0 + 1) 
    break;

   //從佇列裡取得使用者作業
   EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);
   if(fHasJob = !pThreadPoolObj->_JobQueue.empty())
   {
    pJob = pThreadPoolObj->_JobQueue.front();
    pThreadPoolObj->_JobQueue.pop();
    assert(pJob);
   }
   LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);
   //受到結束執行緒訊號 確定是否結束執行緒(結束執行緒訊號 && 是否還有工作)
   if(wr == WAIT_OBJECT_0 + 2 && !fHasJob) 
    break;
   if(fHasJob && pJob)
   {
    InterlockedIncrement(&pThreadPoolObj->_lRunningNum);
    pThread->_dwLastBeginTime = GetTickCount();
    pThread->_dwCount++;
    pThread->_fIsRunning = true;
    pJob->_pFunc(pJob->_pPara); //執行使用者作業
    delete pJob;
    pThread->_fIsRunning = false;
    InterlockedDecrement(&pThreadPoolObj->_lRunningNum);
   }
  }
  //刪除自身結構
  EnterCriticalSection(&pThreadPoolObj->_csThreadVector);
  pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread));
  LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);
  delete pThread;
  InterlockedDecrement(&pThreadPoolObj->_lThreadNum);
  if(!pThreadPoolObj->_lThreadNum)  //所有執行緒結束
   SetEvent(pThreadPoolObj->_EventComplete);
  return 0;
 }
 //呼叫使用者物件虛擬函式
 static void CallProc(void *pPara)
 {
  CallProcPara *cp = static_cast<CallProcPara *>(pPara);
  assert(cp);
  if(cp)
  {
   cp->_pObj->DoJob(cp->_pPara);
   delete cp;
  }
 }
 //使用者物件結構
 struct CallProcPara 
 {
  ThreadJob* _pObj;//使用者物件
  void *_pPara;//使用者引數
  CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };
 };
 //使用者函式結構
 struct JobItem
 {
  void (*_pFunc)(void  *);//函式
  void *_pPara; //引數
  JobItem(void (*pFunc)(void  *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };
 };
 //執行緒池中的執行緒結構
 struct ThreadItem
 {
  HANDLE _Handle; //執行緒控制代碼
  ThreadPool *_pThis;  //執行緒池的指標
  DWORD _dwLastBeginTime; //最後一次執行開始時間
  DWORD _dwCount; //執行次數
  bool _fIsRunning;
  ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { };
  ~ThreadItem()
  {
   if(_Handle)
   {
    CloseHandle(_Handle);
    _Handle = NULL;
   }
  }
 };

 std::queue<JobItem *> _JobQueue;  //工作佇列
 std::vector<ThreadItem *>  _ThreadVector; //執行緒資料
 CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作佇列臨界, 執行緒資料臨界
 HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//結束通知, 完成事件, 工作訊號, 刪除執行緒訊號
 long _lThreadNum, _lRunningNum; //執行緒數, 執行的執行緒數
};
#endif //_ThreadPool_H_

#include <iostream>
#include "head.h"
using namespace std;

class CMyThreadJob
   :public ThreadJob
{
public:
 void DoJob(void *pPara)
 {
        cout<<"我的執行緒工作物件,列印資料: "<< *((int*)pPara)<<endl;
 }
};

void threadfunc(void *p)
{
  cout<<"我的執行緒函式,列印資料: "<< *((int*)p)<<endl;
}

int main()
{

 //ThreadPool tp;
 //for(int i=0; i<100; i++)
 // tp.Call(threadfunc);

 ThreadPool tp(20);//20為初始執行緒池規模

 int* pData = new int(1011);
 tp.Call(threadfunc, pData);

 int* pData2 = new int(8888);
 CMyThreadJob myjob;
 tp.Call(&myjob, pData2);

    Sleep(10000);
 return 0;
}

基本上是拿來就用了,對執行緒池的邏輯比較熟的,認為這個執行緒池寫得很清晰,我拿來用在一個多執行緒下載的模組中。很實用的東東。 呼叫方法 void threadfunc(void *p) {      YourClass* yourObject = (YourClass*)    p;

//...
}
ThreadPool tp;
for(i=0; i<100; i++)
  tp.Call(threadfunc);

ThreadPool tp(20);//20為初始執行緒池規模

tp.Call(threadfunc, lpPara);
    

使用時注意幾點: 1. ThreadJob  沒什麼用,直接寫執行緒函式吧。 2. 執行緒函式(threadfunc)的入口引數void* 可以轉成自定義的型別物件,這個物件可以記錄下執行緒執行中的資料,並設定執行緒當前狀態,以此與執行緒進行互動。 3. 執行緒池有一個EndAndWait函式,用於讓執行緒池中所有計算正常結束。有時執行緒池中的一個執行緒可能要執行很長時間,怎麼辦?可以通過執行緒函式threadfunc的入口引數物件來處理,比如: class YourClass {   int cmd; // cmd = 1是上執行緒停止計算,正常退出。 }; threadfunc(void* p) {   YourClass* yourObject = (YourClass*)p;   while (true) {     // do some calculation     if (yourClass->cmd == 1)       break;   } } 在主執行緒中設定yourClass->cmd = 1,該執行緒就會自然結束。 很簡潔通用的執行緒池實現。

相關推薦

轉載C++執行程式碼非常實用

基本上是拿來就用了,對WIN32 API不熟,但對執行緒池的邏輯還是比較熟的,認為這個執行緒池寫得很清晰,我拿來用在一個多執行緒下載的模組中。很實用的東東。 呼叫方法 void threadfunc(void *p) {      YourClass* your

C++執行程式碼非常實用

#ifndef _ThreadPool_H_ #define _ThreadPool_H_ #pragma warning(disable: 4530) #pragma warning(disable: 4786) #include <cassert> #in

【小家Java】次Java執行誤用(newFixedThreadPool)引發的線上血案和總結

相關閱讀 【小家java】java5新特性(簡述十大新特性) 重要一躍 【小家java】java6新特性(簡述十大新特性) 雞肋升級 【小家java】java7新特性(簡述八大新特性) 不溫不火 【小家java】java8新特性(簡述十大新特性) 飽受讚譽 【小家java】java9

執行的建立與執行ThreadPoolExecutorExecutors

                    執行緒的建立與執行緒池及執行緒池工具類 1.執行緒的建立方式 1.1繼承Thread類重寫run方法 public class Test { p

乾貨!執行+CountDownLatch實現 多執行併發計算、彙總

目錄結構 抽象類:求和器 單執行緒 求和器 VS 多執行緒 求和器 1)執行緒池 多個執行緒 一起併發執行,效能很生猛 2)CountDownLatch 主執行緒 使用 latch.await() 阻塞住,直到所有 子任務 都執行完畢了

Java程式設計師拿著阿里offer去頭條面試卻被執行絆倒難受!

之前有程式設計師網友在牛客網發表了自己在頭條的面試經驗和過程,小編拿過來和大夥分享下。 一面考演算法:兩個基礎題目,思路不難,考基本功,一個是連結串列相加,思路就是反轉 然後求和,另一個是多個有序陣列 歸併。 二面考應用和知識面:內容涉及 mysql 引擎,索引(mysql 這塊一

Java併發(二十):執行實現原理 Java併發(十八):阻塞佇列BlockingQueue Java併發(十八):阻塞佇列BlockingQueue Java併發程式設計:執行的使用

一、總覽 執行緒池類ThreadPoolExecutor的相關類需要先了解:  (圖片來自:https://javadoop.com/post/java-thread-pool#%E6%80%BB%E8%A7%88) Executor:位於最頂層,只有一個 execute(Runnab

Java併發(二十):執行實現原理

Java併發(二十一):執行緒池實現原理 一、總覽 執行緒池類ThreadPoolExecutor的相關類需要先了解:  (圖片來自:https://javadoop.com/post/java-thread-pool#%E6%80%BB%E8%A7%88) E

常用執行程式碼

執行緒池的介紹 1 常用的 池化技術 C3P0 DBCP 2 執行緒池的衍生 頻繁的建立執行緒物件和多執行緒之間進行上下文切換,是非常耗時間和資源的所以JDK1.5中提出了執行緒池技術 3 使用執行緒池 Exector 4 執行緒池的建立 1 建立一個固

C++執行實現

最近讀了muduo的原始碼,看了一下其中執行緒池的是實現。其中互斥量、條件變數都是庫裡面自己封裝的,正好現在C++標準庫裡面有對應的類,所以就改造了一下,補充了部分註釋。同時總結了一下條件變數和鎖的使用。程式碼如下: ThreadPool.h #pragma once #incl

簡單Linux C執行

#include "threadpool.h"struct threadpool* threadpool_init(int thread_num, int queue_max_num){struct threadpool *pool = NULL;do { pool = malloc

白話跨平臺C++執行實現

執行緒池在一個C++專案中是必不可少的。去看任何一個C++開發框架,絕大部分都會實現一個執行緒池。而如今C++11已經成熟,藉助C++標準庫中的執行緒庫std::thread,以及標準庫提供的多執行緒同步神器std::condition_variable(這個已

Linux C++ 執行

標頭檔案定義: 話不多說,先上程式碼: 注意:本文程式碼來源於https://github.com/progschj/ThreadPool.git 不需要註釋,想要乾淨程式碼的可以直接上GitHub去找資源 標頭檔案 執行緒池類程式碼 /* * Date : 2018/7/24 * Aut

OKHttp 3.10原始碼解析():執行和任務佇列

OKhttp是Android端最火熱的網路請求框架之一,它以高效的優點贏得了廣大開發者的喜愛,下面是OKhttp的主要特點: 1.支援HTTPS/HTTP2/WebSocket 2.內部維護執行緒池佇列,提高併發訪問的效率 3.內部維護連線池,支援多路複用,減少連線建立開銷 4.

java執行技術經典易懂

1.為什麼要使用執行緒池 在java中,如果每個請求到達就建立一個新執行緒,開銷是相當大的。在實際使用中,伺服器在建立和銷燬執行緒上花費的時間和消耗的系統資源都相當大,甚至可能要比在處理實際的使用者請求的時間和資源要多的多。除了建立和銷燬執行緒的開銷之外,活動的執行緒也需要

動態修改執行併發執行數

import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import java.util.Map; import static java.lang.Thread.sleep; p

用Python BeautifulSoup寫的執行圖片抓取的指令碼

最近一個做設計的妹子需要從一個素材的網站(https://www.goodfon.su)下載各種圖片原圖作為設計的素材,但是苦於境外網站,而且只能單張下載,而且單張圖片需要兩個路徑才能到達原圖的下載地址。 幾年沒寫過Python的我決定從操就業幫她寫一份批量下載的指令碼。由於是國外網站

Java基礎複習之多執行(併發記憶體模型)

前言:畢業來北京轉眼一年半了,但是沒有太多的成績。沒有太多的記錄和沉澱。現在開始複習一下基礎的知識。涉及到的有多執行緒,集合,JVM,NIO,資料庫,作業系統。後面還是想走實時處理那一塊,可能會有那方面的研究。 多執行緒:為啥用?因為想去執行不同的任務,或利用現狀多核的處

C# 執行中取消執行的三種方式

三種方式都使用CancellationToken,只是使用方式不同,有類似於採用全域性標誌位的方式 第一種 檢測IsCancellationRequested方式 static void AsyncOperation1(CancellationToken t

執行executorexecutorServiceThreadPoolExecutor

Executor是一個介面,他是Executor框架的基礎,它將任務的提交與任務的執行分離。  Executor介面中之定義了一個方法execute(Runnable command),該方法接收一個Runable例項,它用來執行一個任務,任務即一個實現了Runnable介面