面向連接的Socket Server的簡單實現(簡明易懂)
http://www.cnblogs.com/worldtraveler/p/4685977.html
一、基本原理
有時候我們需要實現一個公共的模塊,需要對多個其他的模塊提供服務,最常用的方式就是實現一個Socket Server,接受客戶的請求,並返回給客戶結果。
這經常涉及到如果管理多個連接及如何多線程的提供服務的問題,常用的方式就是連接池和線程池,基本流程如下:
首先服務器端有一個監聽線程,不斷監聽來自客戶端的連接。
當一個客戶端連接到監聽線程後,便建立了一個新的連接。
監聽線程將新建立的連接放入連接池進行管理,然後繼續監聽新來的連接。
線程池中有多個服務線程,每個線程都監聽一個任務隊列,一個建立的連接對應一個服務任務,當服務線程發現有新的任務的時候,便用此連接向客戶端提供服務。
一個Socket Server所能夠提供的連接數可配置,如果超過配置的個數則拒絕新的連接。
當服務線程完成服務的時候,客戶端關閉連接,服務線程關閉連接,空閑並等待處理新的任務。
連接池的監控線程清除其中關閉的連接對象,從而可以建立新的連接。
二、對Socket的封裝
Socket的調用主要包含以下的步驟:
調用比較復雜,我們首先區分兩類Socket,一類是Listening Socket,一類是Connected Socket.
Listening Socket由MySocketServer負責,一旦accept,則生成一個Connected Socket,又MySocket負責。
MySocket主要實現的方法如下:
int MySocket::write(const char * buf, int length) |
int MySocket::read(char * buf, int length) return index; |
int MySocket::status() FD_ZERO(&checkset); timeout.tv_sec = 10; status = select((int)m_socket + 1, &checkset, 0, 0, &timeout); |
int MySocket::close() |
MySocketServer的主要方法實現如下:
int MySocketServer::init(int port) struct sockaddr_in serverAddr; if(bind(m_socket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) == -1) if(listen(m_socket, SOMAXCONN) == -1) struct linger lin; setsockopt(m_socket, SOL_SOCKET, SO_LINGER, (const char *)&lin, sizeof(lin)); |
MySocket * MySocketServer::accept() |
MySocket * MySocketServer::accept(int timeout) fd_set checkset; int status = (int)select((int)(m_socket + 1), &checkset, NULL, NULL, &timeout); if(FD_ISSET(m_socket, &checkset)) |
三、線程池的實現
一個線程池一般有一個任務隊列,啟動的各個線程從任務隊列中競爭任務,得到的線程則進行處理:list<MyTask *> m_taskQueue;
任務隊列由鎖保護,使得線程安全:pthread_mutex_t m_queueMutex
任務隊列需要條件變量來支持生產者消費者模式:pthread_cond_t m_cond
如果任務列表為空,則線程等待,等待中的線程個數為:m_numWaitThreads
需要一個列表來維護線程池中的線程:vector<MyThread *> m_threads
每個線程需要一個線程運行函數:
void * __thread_new_proc(void *p) |
每個線程由MyThread類負責,主要函數如下:
int MyThread::start() pthread_attr_t attr; int ret = pthread_create(&m_thread, &attr, thread_func, args); if(ret != 0) } |
int MyThread::stop() int ret = pthread_kill(m_thread, SIGINT); if(ret != 0) |
int MyThread::join() { int ret = pthread_join(m_thread, NULL); if(ret != 0) return –1; } |
void MyThread::run() { while (false == m_bStop) { MyTask *pTask = m_threadPool->getNextTask(); if (NULL != pTask) { pTask->process(); } } } |
線程池由MyThreadPool負責,主要函數如下:
int MyThreadPool::init() pthread_condattr_t cond_attr; if (ret_val != 0) pthread_mutexattr_t attr; if (ret_val != 0) for (int i = 0; i< m_poolSize; ++i) return 0; |
int MyThreadPool::start() ret = pthread_cond_broadcast(&m_cond); if(ret != 0) |
void MyThreadPool::addTask(MyTask *ptask) pthread_mutex_lock(&m_queueMutex); m_taskQueue.push_back(ptask); if (m_waitingThreadCount > 0) pthread_mutex_unlock(&m_queueMutex); |
MyTask * MyThreadPool::getNextTask() pthread_mutex_lock(&m_queueMutex); while (m_taskQueue.begin() == m_taskQueue.end()) pthread_cond_wait(&n_cond, &m_queueMutex); --m_waitingThreadCount; pTask = m_taskQueue.front(); m_taskQueue.pop_front(); pthread_mutex_unlock(&m_queueMutex); return pTask; |
其中每一個任務的執行由MyTask負責,其主要方法如下:
void MyTask::process() { //用read從客戶端讀取指令 //對指令進行處理 //用write向客戶端寫入結果 } |
四、連接池的實現
每個連接池保存一個鏈表保存已經建立的連接:list<MyConnection *> * m_connections
當然這個鏈表也需要鎖來進行多線程保護:pthread_mutex_t m_connectionMutex;
此處一個MyConnection也是一個MyTask,由一個線程來負責。
線程池也作為連接池的成員變量:MyThreadPool * m_threadPool
連接池由類MyConnectionPool負責,其主要函數如下:
void MyConnectionPool::addConnection(MyConnection * pConn) pthread_mutex_lock(&m_connectionMutex); m_connections->push_back(pConn); pthread_mutex_unlock(&m_connectionMutex); m_threadPool->addTask(pConn); |
MyConnectionPool也要啟動一個背後的線程,來管理這些連接,移除結束的連接和錯誤的連接。
void MyConnectionPool::managePool() pthread_mutex_lock(&m_connectionMutex); for (list<MyConnection *>::iterator itr = m_connections->begin(); itr!=m_connections->end(); ) //處理錯誤的連接 pthread_mutex_unlock(&m_connectionMutex); } |
五、監聽線程的實現
監聽線程需要有一個MySocketServer來監聽客戶端的連接,每當形成一個新的連接,查看是否超過設置的最大連接數,如果超過則關閉連接,如果未超過設置的最大連接數,則形成一個新的MyConnection,將其加入連接池和線程池。
MySocketServer *pServer = new MySocketServer(port); MyConnectionPool *pPool = new MyConnectionPool(); while (!stopFlag) { MySocket * sock = pServer->acceptConnection(5); if(sock != null) { if(m_connections.size > maxConnectionSize) { sock.close(); } MyTask *pTask = new MyConnection(); pPool->addConnection(pTask); } } |
面向連接的Socket Server的簡單實現(簡明易懂)