1. 程式人生 > >高效的半同步/半非同步模式的實現

高效的半同步/半非同步模式的實現

先介紹一下半同步/半非同步模式:

首先半同步/半非同步模式中的同步和非同步和前面的IO模型中的同步和非同步是完全不用的概念。在IO模型中,同步和非同步區分的是核心嚮應用程式通知的是何種IO事件(是就緒事件還是完成事件),以及該由誰來完成IO讀寫(是應用程式還是核心)。在併發模式中,同步指的是程式完全按照程式碼序列的順序執行,非同步指的是程式的執行需要由系統事件來驅動。常見的系統事件包括中斷 訊號等。

比如8-8a描述了同步的讀操作 ,圖8-8b則描述了非同步的讀操作。


按照同步方式執行的執行緒稱為同步執行緒,按照非同步方式執行的執行緒成為非同步執行緒。顯然非同步執行緒的執行效率高,實時性強,這是很多嵌入式程式採用的模型。但編寫非同步方式執行的程式相對複雜,難於除錯和擴充套件,且不適合大量的併發。而同步執行緒則相反,它雖然效率比較低,實時性較差,但邏輯簡單。因此,對於像伺服器這種既要求較好的實時性,又要求同時處理多個客戶請求的應用程式,我們就應該同時使用同步執行緒和非同步執行緒來實現。即使用半同步/半非同步模式來實現!

半同步/半非同步模式中,同步執行緒用於處理客戶邏輯,非同步執行緒用於處理IO事件。非同步執行緒監聽到客戶請求後,就將其封裝成請求物件並插入請求佇列中。請求佇列將通知某個工作在同步模式的工作執行緒來讀取並處理該物件。具體選擇哪個工作執行緒來為新的客戶請求服務,則取決於請求佇列的設計。

下面圖8-9總結了半同步/半非同步的工作流程


在伺服器程式中,如果結合考慮兩種事件處理模式和幾種IO模型,則半同步/半非同步模式就存在多種變體。其中有一種變體成為半同步/半反應對模式,如下圖8-10所示



圖8-10中,主執行緒插入請求佇列中的任務就是就緒的連線socket.這說明該圖所示的半同步/半反應堆模式採用的事件處理模式是Reactor模式,它要求工作執行緒自己從socket上讀取客戶請求和往socket寫入伺服器應答。實際上,半同步/半反應對模式也可以使用模擬的Proactor事件處理模式,即由主執行緒來完成資料的讀寫。在這種情況下,主執行緒一般會將應用程式資料 任務型別等資訊封裝為一個任務物件,如何將其插入請求佇列。工作執行緒從請求佇列中取得任務物件之後,即可直接處理,而無須執行讀寫操作了。



可見 圖8-11中,每個執行緒都維持著自己的事件迴圈,它們各自獨立監聽不同的事件,因此在這種搞笑的半同步/半非同步模式中,每個執行緒都工作在非同步模式。

下面吧大概的程式碼展示一下

#include "SocketServer.h"


SocketServer::SocketServer(void) :
  m_nport(5001)
 ,m_epollfd(-1)
 ,m_bindsocket(-1)
 ,m_bstop(false)
{
}

SocketServer::~SocketServer(void)
{

}

void SocketServer::S_WorkService(void* arg)
{
	int epollfd = *(int*)arg;
	SOCKETServer::Instance()->WorkService(epollfd);
}

void SocketServer::WorkService(int epollfd)
{
	epoll_event events[ MAX_EVENT_NUMBER ];	
	 while( !m_bstop )
    {
        int event_num = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
		if ( event_num < 0 && (errno != EINTR))
        {
			DC_ERROR("epoll_wait error ,errmsg = %s",strerror(errno));
            break;
        }
		
		for ( int i = 0; i < event_num; i++ )
        {
            int sock = events[i].data.fd;
			
			struct sockaddr_in client_address;
			socklen_t client_addrlength = sizeof( client_address );
			getpeername(sock, (struct sockaddr *)&client_address, &client_addrlength); 
			char remoteAddress[INET_ADDRSTRLEN ] = {0};
			inet_ntop( AF_INET, &client_address.sin_addr, remoteAddress, INET_ADDRSTRLEN );
			int remotePort = ntohs( client_address.sin_port );
			
            if ( events[i].events & EPOLLIN )
            {
				char* pRecvBuff = new  char[SOCKET_BUF_SIZE];
				int nRemainDataSize = 0;
				 while( true )
				 {
					int nBytesThisTime = recv( sock, pRecvBuff + nRemainDataSize, SOCKET_BUF_SIZE -1-nRemainDataSize, 0 );
					if( nBytesThisTime < 0 )
					{
						if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
						{
							 break;
						}
						DC_ERROR("socket errormsg = %s , %s:%d close",strerror(errno),remoteAddress ,remotePort);
						del_socket_epoll(epollfd,sock);
						break;
					}
					else if (nBytesThisTime == 0)
					{
						DC_ERROR("socket errormsg = %s , %s:%d close",strerror(errno),remoteAddress ,remotePort);
						nRemainDataSize = 0;
						del_socket_epoll(epollfd,sock);
						break;
					}
					nRemainDataSize += nBytesThisTime;
				 }
				 //如果讀取失敗的話就直接返回
				 if(nRemainDataSize == 0)
				 {
					 continue;
				 }
				
				DC_INFO("recv %s:%d data size = %d" ,remoteAddress ,remotePort,nRemainDataSize);
				delete(pRecvBuff);pRecvBuff = NULL;
				
				if(0 != reset_socket_epoll(epollfd, sock))
				{
					del_socket_epoll(epollfd,sock);
				}
            }
            else if( events[i].events & ( EPOLLRDHUP | EPOLLHUP | EPOLLERR ) )
            {
               DC_ERROR("socket errormsg = %s , %s:%d close",strerror(errno),remoteAddress ,remotePort);
			   del_socket_epoll(epollfd,sock);
            }
			else
			{
				DC_ERROR("socket errormsg = %s , %s:%d close",strerror(errno),remoteAddress ,remotePort);
			}
        }
		
	}
	
}
int SocketServer::StartServer(int port ,int threadNum )
{
	m_nport = port;
	m_nThreadNum = threadNum;
	
	m_bindsocket = socket( PF_INET, SOCK_STREAM, 0 );
	if(m_bindsocket < 0)
	{
		DC_ERROR("socket error ,errmsg = %s",strerror(errno));
		return SERVER_ERROR;
	}
	
	/*地址可複用 time_wait*/	
	if(0 != make_socket_reuseable(m_bindsocket))
	{
		return SERVER_ERROR;
	}
	
	/*設定超時時間*/
	if(0 != make_socket_timeout(m_bindsocket,SOCKET_TIMEOUT))
	{
		return SERVER_ERROR;
	}
	
	/*設定緩衝區大小*/
	if(0 != make_socket_buffsize(m_bindsocket,SOCKET_BUF_SIZE))
	{
		return SERVER_ERROR;
	}
	
	/*設定非阻塞*/
	if(0 != make_socket_nonblock(m_bindsocket))
	{
		return SERVER_ERROR;
	}
	
	/*繫結地址和埠*/
	struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, "0.0.0.0", &address.sin_addr );
    address.sin_port = htons( m_nport );
	if(0 != bind( m_bindsocket, ( struct sockaddr* )&address, sizeof(address) ))
	{
		DC_ERROR("bind %d error ,errmsg = %s",m_nport,strerror(errno));
		return SERVER_ERROR;
	}
	
	/*監聽埠*/
	if(0 != listen(m_bindsocket, 128))
	{
		DC_ERROR("bind %d error ,errmsg = %s",m_nport,strerror(errno));
		return SERVER_ERROR;
	}
	
	/*建立工作執行緒以及對應的epoll控制代碼*/
	for(int i = 0 ;i < m_nThreadNum ;i++)
	{
		int epollfd = epoll_create( 5 );
		swartz_thread_detached_create((void*)S_WorkService, (void*)&epollfd, 0, 0);	
		//休眠50ms ,保證工作執行緒裡面的epollfd是正確的
		usleep(50*1000);
		m_EpollVec.push_back(epollfd);
	}
	
	/*epoll 監聽*/
    epoll_event events[ MAX_EVENT_NUMBER ];
    m_epollfd = epoll_create( 5 );
	if(m_epollfd == -1)
	{
		DC_ERROR("epoll_create  error ,errmsg = %s",strerror(errno));
		return SERVER_ERROR;
	}
	if(0 != add_socket_epoll(m_epollfd, m_bindsocket,false))
	{
		return SERVER_ERROR;
	}
	
    while( 1 )
    {
		static int ClusterNum = 0;
        int event_num = epoll_wait( m_epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( event_num < 0 && (errno != EINTR))
        {
			DC_ERROR("epoll_wait error ,errmsg = %s",strerror(errno));
            break;
        }
    
        for ( int i = 0; i < event_num; i++ )
        {
            int sockfd = events[i].data.fd;
            if ( sockfd == m_bindsocket )
            {
				struct sockaddr_in client_address;
				socklen_t client_addrlength = sizeof( client_address );
				int clientfd = accept( m_bindsocket, ( struct sockaddr* )&client_address, &client_addrlength );
				if(clientfd < 0)
				{
					DC_ERROR("accept error ,errmsg = %s",strerror(errno));
					continue ;
				}
				
				char remoteAddress[INET_ADDRSTRLEN ] = {0};
				inet_ntop( AF_INET, &client_address.sin_addr, remoteAddress, INET_ADDRSTRLEN );
				int remotePort = ntohs( client_address.sin_port );
				DC_INFO("%s:%d connect" , remoteAddress, remotePort );
				
				/*設定非阻塞*/
				if(0 != make_socket_nonblock(clientfd))
				{
					continue;
				}
	
				if(0 != add_socket_epoll(m_EpollVec[ClusterNum%m_nThreadNum], clientfd,true))
				{
					continue;
				}
				ClusterNum++;
            }
            else
            {
                DC_INFO("other thing happened ,event = %d ",events[i].events);
            } 
        }
    }
	
	m_bstop = true;
	StopServer();
	
	return SERVER_OK;
}



int SocketServer::make_socket_nonblock(int sock)
{
	int flags;
	if ((flags = fcntl(sock, F_GETFL, NULL)) < 0) 
	{
		DC_ERROR("fcntl(%d, F_GETFL) ,ermsg = %s", sock,strerror(errno));
		return SERVER_ERROR;
	}
	if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
	{
		DC_ERROR("fcntl(%d, F_SETFL) O_NONBLOCK ,ermsg = %s", sock,strerror(errno));
		return SERVER_ERROR;
	}
	return SERVER_OK;
	
}

int SocketServer::make_socket_reuseable(int sock)
{
	int reuse = 1;
    if(0 != setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof( reuse )))
	{
		DC_ERROR("setsockopt SO_REUSEADDR error ,errmsg = %s",strerror(errno));
		return SERVER_ERROR;
	}
	struct linger tcpLinger;
	tcpLinger.l_onoff  = 1;
	tcpLinger.l_linger = 0;

	if (0 != setsockopt(sock, SOL_SOCKET, SO_LINGER, &tcpLinger, sizeof(tcpLinger)))
	{
		DC_ERROR("setsockopt SO_LINGER error ,errmsg = %s",strerror(errno));
		return SERVER_ERROR;
	}
	return SERVER_OK;	
}

int SocketServer::make_socket_timeout(int sock,int time)
{
	/*查詢和設定傳送超時時間*/
    struct timeval send_timeout;  
	send_timeout.tv_sec = time;  
    send_timeout.tv_usec = 0; 
	int len = sizeof( timeval );	
    if(0 != setsockopt( sock, SOL_SOCKET, SO_SNDTIMEO, &send_timeout, sizeof(send_timeout) ))
	{
		DC_ERROR("setsockopt SO_SNDTIMEO error ,errmsg = %s",strerror(errno));
		return SERVER_ERROR;
	}
	getsockopt( sock, SOL_SOCKET, SO_SNDTIMEO, &send_timeout, ( socklen_t* )&len);
    DC_INFO( "the send timeout after settting is %ds", send_timeout.tv_sec/1000 );
	
	/*查詢和設定接收超時時間*/
	struct timeval recv_timeout;  
	recv_timeout.tv_sec = time;  
    recv_timeout.tv_usec = 0;  
    if(0 != setsockopt( sock, SOL_SOCKET, SO_RCVTIMEO, &recv_timeout, sizeof( recv_timeout) ))
	{
		DC_ERROR("setsockopt SO_RCVTIMEO error ,errmsg = %s",strerror(errno));
		return SERVER_ERROR;
	}
	getsockopt( sock, SOL_SOCKET, SO_RCVTIMEO, &recv_timeout, ( socklen_t* )&len);
    DC_INFO( "the recv timeout after setting is %ds", recv_timeout.tv_sec/1000 );
	return SERVER_OK;
}
int SocketServer::make_socket_buffsize(int sock,int size)
{
	/*查詢和設定接收緩衝區*/
	int recvbuf = 0;
	int len = sizeof( recvbuf );
	getsockopt( sock, SOL_SOCKET, SO_RCVBUF, &recvbuf, ( socklen_t* )&len);
    DC_INFO( "the receive buffer size before settting is %d", recvbuf );
	
	recvbuf = size;
    if(0 != setsockopt( sock, SOL_SOCKET, SO_RCVBUF, &recvbuf, sizeof( recvbuf) ))
	{
		DC_ERROR("setsockopt SO_RCVBUF error ,errmsg = %s",strerror(errno));
		return SERVER_ERROR;
	}
	getsockopt( sock, SOL_SOCKET, SO_RCVBUF, &recvbuf, ( socklen_t* )&len);
    DC_INFO( "the receive buffer size after settting is %d", recvbuf );
	
	/*查詢和設定傳送緩衝區*/
	int sendbuf = 0;
	getsockopt( sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, ( socklen_t* )&len);
    DC_INFO( "the tcp send buffer size before setting is %d", sendbuf );
	
	sendbuf = size;
    if(0 != setsockopt( sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, sizeof( sendbuf) ))
	{
		DC_ERROR("setsockopt SO_SNDBUF error ,errmsg = %s",strerror(errno));
		return SERVER_ERROR;
	}
	getsockopt( sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, ( socklen_t* )&len);
    DC_INFO( "the tcp send buffer size after setting is %d", sendbuf );
	return SERVER_OK;
}

int SocketServer::add_socket_epoll(int epollfd ,int socket,bool oneshot)
{
	epoll_event event;
    event.data.fd = socket;
    event.events = EPOLLIN | EPOLLET ;
	if( oneshot )
    {
        event.events |= EPOLLONESHOT;
    }
    if(0 != epoll_ctl( epollfd, EPOLL_CTL_ADD, socket, &event ))
	{
		DC_ERROR("epoll_ctl  error ,errmsg = %s",strerror(errno));
		return SERVER_ERROR;
	}
	return SERVER_OK;
}

int SocketServer::reset_socket_epoll(int epollfd ,int socket)
{
	epoll_event event;
    event.data.fd = socket;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    if(0 != epoll_ctl( epollfd, EPOLL_CTL_MOD, socket, &event ))
	{
		DC_ERROR("epoll_ctl  error ,errmsg = %s",strerror(errno));
		return SERVER_ERROR;
	}
	return SERVER_OK;
}

int SocketServer::del_socket_epoll(int epollfd ,int socket)
{
    if(0 != epoll_ctl( epollfd, EPOLL_CTL_DEL, socket,0))
	{
		DC_ERROR("epoll_ctl  error ,errmsg = %s",strerror(errno));
		return SERVER_ERROR;
	}
	close(socket);
	return SERVER_OK;
}

void SocketServer::StopServer()
{
	/*關閉所有的檔案描述符*/
	for (int sockfd = 3; sockfd < getdtablesize(); sockfd++)
	{
		close(sockfd);
	}
}
#ifndef _SOCKER_SERVER_H
#define _SOCKER_SERVER_H

#include "common.h"

class SocketServer
{
public:
	SocketServer(void);
	~SocketServer(void);
	
public:
	int StartServer(int port = 5001,int threadNum = 10);
	void StopServer();	
	
	static void S_WorkService(void* arg);
	void WorkService(int epollfd);
private:
	int make_socket_nonblock(int sock);	
	int make_socket_reuseable(int sock);
	int make_socket_timeout(int sock,int time);	
	int make_socket_buffsize(int sock,int size);	
	int add_socket_epoll(int epollfd ,int socket,bool oneshot);
	int reset_socket_epoll(int epollfd ,int socket);
	int del_socket_epoll(int epollfd ,int socket);

private:
	bool             m_bstop;                 //是否關閉
	int            	 m_nport;				  //埠號
	int            	 m_epollfd;               //主執行緒監聽epoll 控制代碼
	int           	 m_bindsocket;            //監聽的socket
	int           	 m_nThreadNum;            //接收資料工作執行緒的數量
	std::vector<int> m_EpollVec ;             //工作執行緒的epoll控制代碼集
};

typedef singleton<SocketServer> SOCKETServer;

#endif


相關推薦

高效同步/非同步模式實現

先介紹一下半同步/半非同步模式:首先半同步/半非同步模式中的同步和非同步和前面的IO模型中的同步和非同步是完全不用的概念。在IO模型中,同步和非同步區分的是核心嚮應用程式通知的是何種IO事件(是就緒事件還是完成事件),以及該由誰來完成IO讀寫(是應用程式還是核心)。在併發模式

同步-非同步模式談伺服器的設計

半同步-半非同步模式,最早應該是由ACE的作者提出,原文在這裡.簡而言之,所謂的半同步半非同步模式分為三個組成模組:同步處理模組,佇列模組,非同步處理模組.三個模組之間的互動關係如圖:(注:上圖出自這裡)幾個模組的之間的互動為:非同步模組接收可能會非同步到來的各種事件(I/O,訊號等),然後將它們放入

c++11 實現同步非同步執行緒池

感受: 隨著深入學習,現代c++給我帶來越來越多的驚喜… c++真的變強大了。 半同步半非同步執行緒池: 其實很好理解,分為三層 同步層:通過IO複用或者其他多執行緒多程序等不斷的將待處理事件新增到佇列中,這個過程是同步進行的

c++11實現一個同步非同步執行緒池

在處理大量併發任務的時候,如果按照傳統的方式,一個請求一個執行緒來處理請求任務,大量的執行緒建立和銷燬將消耗過多的系統資源,還增加了執行緒上下文切換的開銷,而通過執行緒池技術就可以很好的解決這些問題,執行緒池技術通過在系統中預先建立一定數量的執行緒,當任務請求到

多執行緒設計模式——Half-sync/Half-async(同步/非同步)模式

這些都是根據我最近看的《Java實戰指南多執行緒程式設計(設計模式篇)》所得整理。 模式名稱 Half-sync/Half-async(半同步/半非同步)模式 模式解決的問題 同步和非同步各有各的優勢,有沒有一個方法,能夠既保持了同步程式設計的簡

同步復制的實現

mysql1、在主服務器上的配置1)安裝mariadb-server[[email protected]/* */ ~]# yum -y install mariadb-server2)編輯/etc/my.cnf[[email protected]/* */ ~]# vim /etc/

Mysql同步復制模式說明 - 運維小結

ber 註冊 處理 從庫 mysql load 導致 版本 mas MySQL主從復制包括異步模式、半同步模式、GTID模式以及多源復制模式,默認是異步模式 (如之前詳細介紹的mysql主從復制)。所謂異步模式指的是MySQL 主服務器上I/O thread 線程將二進

同步非同步 多程序/多執行緒 區別

在《Linux高效能伺服器程式設計》中,看到的最厲害的方法基本上就是這個半同步半非同步的方法了。 簡直炸天。用到了多程序、多執行緒、epoll I/O複用。好像真的很高效能哦。 那麼其中多程序和多執行緒的的區別是啥? 多程序: 為了避免在父、子

基於Wininet非同步模式實現的HttpClient

      專案在使用Wininet API時一直採用的同步模式,通過開一個執行緒+等待超時的“假"非同步方式來實現非阻塞呼叫。但是最近突然發現,同步呼叫InternetOpenUrl在處於阻塞狀態時,在某些情況下,通過InternetCloseHandle無法強制Inte

memcached 原始碼分析——同步非同步模式

memcached 是目前應用非常廣泛的快取伺服器,採用的是半同步、半非同步模式。 半同步、半非同步 半同步/半非同步模型的基礎設施:主執行緒建立多個子執行緒(這些子執行緒也稱為worker執行緒),每一個執行緒都維持自己的事件迴圈,即每個執行緒都有自己

基於MySQL實現數據庫的同步主從復制

mysql、半同步、主從架構 首先我們來了解一下數據庫常遇到的問題: 第一就是性能上的問題1、向上拓展(硬件方面) scale up 個體本身 容易達到極限 2、向外拓展 scale out 第二就是可用性的問題1、數據庫服務中斷 2、誤操作數據損壞 3、硬件故障 4、數據庫升級測試

Mysql實現數據庫主從復制、主主復制、同步復制

mysql 數據庫復制 主主復制 主從復制 半同步復制 --------------Mysql實現數據庫主從復制架構----------------一、環境準備:centos系統服務器2臺、一臺用戶做Mysql主服務器,一臺用於做Mysql從服務器,配置好yum源、防火墻關閉、各節點時鐘服

實現MySQL同步架構

MySQL半同步架構默認情況下,MySQL的復制功能是異步的,異步復制可以提 供最佳的性能,主庫把binlog日誌發送給從庫即結束,並不驗 證從庫是否接收完畢。這意味著當主服務器或從服務器端發生 故障時,有可能從服務器沒有接收到主服務器發送過來的 binlog日誌,這就會造成主服務器和從服務器的數據不一致,

Mysql 同步複製和非同步複製

mysql 半同步複製和非同步複製 -- 在主庫中安裝半同步外掛,開啟半同步複製功能 install plugin rpl_semi_sync_master soname 'semisync_master.so'; set global rpl_semi_sync_master_enab

Mysql同步複製模式說明 - 運維小結

  MySQL主從複製包括非同步模式、半同步模式、GTID模式以及多源複製模式,預設是非同步模式 (如之前詳細介紹的mysql主從複製)。所謂非同步模式指的是MySQL 主伺服器上I/O thread 執行緒將二進位制日誌寫入binlog檔案之後就返回客戶端結果,不會考慮二進位制日誌是否完整傳輸到

Mysql同步複製模式

MySQL主從複製包括非同步模式、半同步模式、GTID模式以及多源複製模式,預設是非同步模式 (如之前詳細介紹的mysql主從複製)。所謂非同步模式指的是MySQL 主伺服器上I/O thread 執行緒將二進位制日誌寫入binlog檔案之後就返回客戶端結果,不會考慮二進位制日誌是否完整傳輸

MySQL複製(非同步方式、同步方式、GTID)總結

這是之前做的筆記,整體有些凌亂,後續有時間再整理一下格式!!!! 非同步複製:在主節點寫入日誌即返回成功,預設情況下MySQL5.5/5.6/5.7和mariaDB10.0/10.1的複製功能是非

MHA實現Mysql同步高可用

一、MHA介紹 1、MHA兩部分組成 MHA 由兩部分組成: MHA Manager(管理節點)和 MHA Node(資料節點)。 MHA Manager可以單獨部署在一臺獨立的機器上管理多個 master-slave 叢集,也可以部署在一臺 slave 節點上。 2、

mysql總結3(複製:M/S(非同步同步)、M/M,複製過濾器)

對於mysql複製一般來講只有一個節點即能讀又能寫,其餘節點只能讀 複製的功用: 資料分佈 負載均衡 備份 高可用和故障切換 mysql的升級測試 主從複製: 最基本條件:開啟二進位制日誌 原理: 1、slave將

centos7.2 MYSQL雙主+同步+keepalived實現高可用負載均衡

這兩天瞭解了一下mysql的叢集方案,發現有很多解決方案,有複雜的也有簡單的,有興趣的參考下面網址:http://www.cnblogs.com/Kellana/p/6738739.html 這裡,我使用中小企業最常用也是較簡單的方案,用keepalived提供一個vip(