epoll+執行緒池實現高併發
版權宣告:本文為博主原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處連結和本宣告。
本文連結:https://blog.csdn.net/qq_38506897/article/details/82863066
epoll模型是linux實現高併發的一種方法,基於事件驅動模型,相比於select/poll 模型具有更高的效率,本人對epoll模型做了一個簡易的封裝,更多的功能還在完善中,在這裡僅做學習參考用。
在epoll程式設計中,有三個非常重要的函式:
1. int epoll_create(int size) :建立epoll 控制代碼, 入參是表示監聽的數目是多大。
2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) : 事件註冊函式, @param1:epoll控制代碼,epoli_create()的返回值, @param2: 表示註冊的行為, 有ADD事件 、MOD事件、DEL事件, @param3: 註冊的fd,在網路程式設計中,一般為sockfd,@param4:表示事件型別,
3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout) 等待事件的產生,類似於select() 呼叫。
線上程池上,之前的博文寫過一篇關於執行緒池的實現,拿來即用即可。造輪子的過程雖然很枯燥也很痛苦,但當你做一個專案用到你造的輪子時還是很有成就感的(菜鳥的成就感~)。
封裝的比較簡單,後續會完善。
#ifndef _CEVENT_H_
#define _CEVENT_H_
#include <sys/socket.h>
#include <sys/types.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <sys/epoll.h>
#define MAX_SIZE 1024
enum EventType
{
EIN = EPOLLIN, // 讀事件
EOUT = EPOLLOUT, // 寫事件
ECLOSE = EPOLLRDHUP, // 對端關閉連線或者寫半部
EPRI = EPOLLPRI, // 緊急資料到達
EERR = EPOLLERR, // 錯誤事件
EET = EPOLLET, // 邊緣觸發
EDEFULT = EIN | ECLOSE | EERR | EET
};
class CEvent
{
public:
CEvent();
~CEvent();
int Register_event(int fd, EventType type = EDEFULT);
int unRegister_event(int fd);
void* EventHandle(void* arg);
void SetNoblocking(int v_sockfd);
private:
int epfd;
bool is_running;
pthread_t m_tid;
struct events[EPOLL_SIZE];
CThreadPoolProxy *pool;
};
CEvent::CEvent()
{
epfd = epoll_create(MAX_SIZE);
if(epfd == -1)
{
printf("epoll_create failed.");
return -1;
}
pthread_t tid = 0;
pthread_create(&tid, NULL, EventHandle, (void*)this == 0);
m_tid = tid;
//執行緒池初始化
pool = CThreadPoolProxy::instance();
}
CEvent::~CEvent()
{
if(pthread_cancel(m_tid) == 0)
{
pthread_join(m_tid, (void **)NULL);
}
}
void CEvent::SetNoblocking(int v_sockfd)
{
int opts = fcntl(v_sockfd,F_GETFL);
if(opts < 0)
{
printf("fcntl(sockfd, F_GETFL) failed.");
opts = opts|O_NONBLOCK;
}
fcntl(v_sockfd, F_SETFL, opts);
}
int CEvent::Register_event(int fd, EventType type = EDEFULT)
{
SetNoblocking(fd);
struct epoll_event ev;
ev.data.fd = fd
ev.events = type;
if(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == -1)
{
printf("epoll_ctl: EPOLL_CTL_ADD failed, fd[%d].",&fd);
return -1;
}
return 0;
}
int CEvent::unRegister_event(int fd)
{
if(epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL) == -1)
{
printf("epoll_ctl: EPOLL_CTL_DEL failed, fd[%d].",&fd);
return -1;
}
return 0;
}
void* CEvent::EventHandle(void* arg)
{
CEvent &event = *(CEvent*)arg;
while(is_running)
{
int ret = epoll_wait(event.epfd, event.events, MAX_SIZE, -1);
if(ret < 0)
{
printf("epoll_wait failed, epfd[%d]",&event.epfd);
}
for(int i=0; i<ret; i++)
{
int connfd = event.events[i].data.fd;
if(event.events[i].events & EPOLLIN)
{
CTask* ta=new CMyTask; // 具體的方法自己實現。
ta->SetConnFd(connfd);
pool->AddTask(ta);
}
}
}
}
#endif
執行緒池程式碼:
#ifndef __THREAD_H
#define __THREAD_H
/*********************
** Filename: Thread.h
** Dsricbe: 執行緒池標頭檔案
** Date: 2018.7.18
** @author: Mr.xl
***/
#include <deque>
#include <string>
#include <pthread.h>
using namespace std;
/**
* 執行任務的類,設定任務資料並執行
*/
class CTask {
protected:
string m_strTaskName; //任務的名稱
int connfd; //接收的地址
public:
CTask() = default;
CTask(string &taskName): m_strTaskName(taskName), connfd(NULL) {}
virtual int Run() = 0;
void SetConnFd(int data); //設定接收的套接字連線號。
int GetConnFd();
virtual ~CTask() {}
};
/**
* 執行緒池類的實現
*/
class CThreadPool
{
private:
static deque<CTask*> m_deqTaskList; /** 任務佇列 */
static bool shutdown; /** 執行緒退出標誌 */
int m_iThreadNum; /** 執行緒池中啟動的執行緒數 */
pthread_t *pthread_id;
static pthread_mutex_t m_pthreadMutex; /** 執行緒同步鎖 */
static pthread_cond_t m_pthreadCond; /** 執行緒同步的條件變數 */
protected:
static void* ThreadFunc(void * threadData); /** 新執行緒的執行緒回撥函式 */
static int MoveToIdle(pthread_t tid); /** 執行緒執行結束後,把自己放入到空閒執行緒中 */
static int MoveToBusy(pthread_t tid); /** 移入到忙碌執行緒中去 */
int Create(); /** 建立執行緒池中的執行緒 */
public:
CThreadPool(int threadNum = 10);
~CThreadPool();
int AddTask(CTask *task); /** 把任務新增到任務佇列中 */
int StopAll(); /** 使執行緒池中的執行緒退出 */
int getTaskSize(); /** 獲取當前任務佇列中的任務數 */
};
// 代理類,只暴露給別人用的
class CThreadPoolProxy: public CThreadPool
{
public:
static CThreadPool* instance()
{
if(NULL == m_pInstance)
{
m_pInstance = new CThreadPoolProxy;
return m_pInstance;
}
return m_pInstance;
}
int AddTask(CTask *task)
{
return m_pthreadpool->AddTask(task);
}
private:
CThreadPoolProxy()
{
m_pthreadpool = new CThreadPool(5)
}
~CThreadPoolProxy()
{
delete m_pthreadpool;
}
private:
static CThreadPoolProxy* m_pInstance;
CThreadPool* m_pthreadpool;
};
#endif
/******************
** Fliename: Thread.cpp
** Dscribe: 執行緒池實現檔案
** Date: 2018.7.18
** @author: Mr.xl
***/
#include "Thread.h"
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <deque>
void CTask::SetConnFd(int data)
{
connfd = data;
}
int CTask::GetConnFd()
{
return connfd;
}
/**
* 初始化資料
*/
deque<CTask*> CThreadPool::m_deqTaskList; //任務列表
bool CThreadPool::shutdown = false;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
/**
* 執行緒池管理類建構函式
*/
CThreadPool::CThreadPool(int threadNum)
{
this->m_iThreadNum = threadNum;
cout << "I will create " << threadNum << " threads" << endl;
Create(); //*建立物件時便建立執行緒。
}
CThreadPool::~CThreadPool()
{
pthread_mutex_destroy(&m_pthreadMutex);
pthread_cond_destroy(&m_pthreadCond);
}
/**
* 執行緒回撥函式
*/
void* CThreadPool::ThreadFunc(void* threadData)
{
pthread_t tid = pthread_self();
while (1)
{
//* 執行緒開啟時先上鎖 */
pthread_mutex_lock(&m_pthreadMutex);
while (m_deqTaskList.size() == 0 && !shutdown)
{
//* 沒有任務時,執行緒等待狀態(條件變數)*/
pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);
}
if (shutdown)
{
pthread_mutex_unlock(&m_pthreadMutex);
printf("thread %lu will exit\n", pthread_self());
pthread_exit(NULL);
}
printf("tid %lu run\n", tid);
/**
* 取任務佇列並處理之
*/
//deque<CTask*>::iterator iter = m_deqTaskList.front();
CTask* task = m_deqTaskList.front();
m_deqTaskList.pop_front();
//* 取完任務後釋放鎖*/
pthread_mutex_unlock(&m_pthreadMutex);
task->Run(); /** 執行任務 */
}
return (void*)0;
}
/**
* 往任務佇列裡邊新增任務併發出線程同步訊號
*/
int CThreadPool::AddTask(CTask *task)
{
pthread_mutex_lock(&m_pthreadMutex);
this->m_deqTaskList.push_back(task);
pthread_mutex_unlock(&m_pthreadMutex);
// * 新增任務 條件變數發訊號,非阻塞 */
pthread_cond_signal(&m_pthreadCond);
return 0;
}
/**
* 建立執行緒
*/
int CThreadPool::Create()
{
pthread_id = (pthread_t*)malloc(sizeof(pthread_t) * m_iThreadNum);
for(int i = 0; i < m_iThreadNum; i++)
{
pthread_create(&pthread_id[i], NULL, ThreadFunc, NULL);
}
return 0;
}
/**
* 停止所有執行緒
*/
int CThreadPool::StopAll()
{
/** 避免重複呼叫 */
if (shutdown)
{
return -1;
}
printf("Now I will end all threads!!\n");
/** 喚醒所有等待執行緒,執行緒池要銷燬了 */
shutdown = true;
pthread_cond_broadcast(&m_pthreadCond);
/** 阻塞等待執行緒退出,否則就成殭屍了 */
for (int i = 0; i < m_iThreadNum; i++)
{
pthread_join(pthread_id[i], NULL);
}
free(pthread_id);
pthread_id = NULL;
/** 銷燬條件變數和互斥體 */
pthread_mutex_destroy(&m_pthreadMutex);
pthread_cond_destroy(&m_pthreadCond);
return 0;
}
/**
* 獲取當前佇列中任務數
*/
int CThreadPool::getTaskSize()
{
return m_deqTaskList.size();
}
————————————————
版權宣告:本文為CSDN博主「可樂小浣熊」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處連結及本宣告。
原文連結:https://blog.csdn.net/qq_38506897/art