1. 程式人生 > >從零學習遊戲伺服器開發(四)LogServer原始碼探究

從零學習遊戲伺服器開發(四)LogServer原始碼探究

這是從零學習開源專案的第四篇,上一篇是《從零學習開源專案系列(三) CSBattleMgr服務原始碼研究》,這篇文章我們一起來學習LogServer,中文意思可能是“日誌伺服器”。那麼這個日誌伺服器到底做了哪些工作呢?

我們在Visual Studio中將LogServer設定為啟動項,然後按F5將LogServer啟動起來,啟動成功後顯示如下圖:


從上圖中,我們可以到大致做了三件事:

1. 建立一個偵聽埠(埠號1234)
2. 連線mysql資料庫
3. 初始化日誌處理程式

我們來驗證一下這三件事的細節。我們再Visual Studio中將程式中斷(【除錯】選單-【全部中斷】,快捷鍵Ctrl + Alt + Break)。然後線上程視窗檢視這個程式所有的執行緒,如下圖所示:


所有用紅色旗幟標記的執行緒都是使用者執行緒,我們可以檢視這些執行緒的呼叫堆疊。我們從最上面的主執行緒開始:


切換到main函式,我們可以看出這裡是一個迴圈:

int main()
{
	auto res = CLogHandler::GetInstance().Init();
	if (res)
    {
		while(true)
        {
			INetSessionMgr::GetInstance()->Update();
			Sleep(1);
		}
	}

	return 0;
}

這裡一個是初始化動作,一個迴圈中Update動作,它們具體做了些什麼,我們先不管,我們先看其他執行緒做了什麼,再回過頭來看這裡的程式碼。

我們接著看下一個執行緒的內容:


從呼叫堆疊來看,這是一個使用boost::thread啟動的執行緒,這個執行緒函式程式碼如下:

void Active::Run() {
    if (m_BeginInThreadCallback){
	m_BeginInThreadCallback();
    }
    while (true){
	Consume();
    }
}

我們先看下這個執行緒函式做了什麼,主要是m_BeginInThreadCallback和Consume()函式,看Consume()函式:

void Active::Consume(){
	boost::mutex::scoped_lock lock(m_IOMutex);
	while(m_Queue.empty()){
		m_ConditionVar.wait(lock);
	}

	m_SwapQueue.swap(m_Queue);
	lock.unlock();
	while(!m_SwapQueue.empty()){
		Buffer* pBuffer = m_SwapQueue.front();
		m_SwapQueue.pop();

		m_Callback(pBuffer);
		--m_PendingWorkNum;
		if (pBuffer){
			m_pBufferPool.ReleaseObejct(pBuffer);
		}
	}
}

這段程式碼很好理解,先使用條件變數掛起當前執行緒,條件變數觸發後,如果消費者和生產者共有佇列m_Queue中有資料,將公用的佇列m_Queue臨時倒換到本地的一個區域性佇列m_SwapQueue中,然後挨個處理佇列m_SwapQueue中的資料。

這個執行緒在哪裡建立的呢?通過搜尋執行緒函式,我們找到如下程式碼:

void Active::Start(){
	bool ifHvTimer = !m_ThreadTimer.IsEmpty();
	if (ifHvTimer){
		m_Thread = boost::thread(&Active::RunWithUpdate, this);
	}
	else{
		m_Thread = boost::thread(&Active::Run, this);
	}
	m_ThreadID = get_native_thread_id(m_Thread);

	char sThreadName[30];
	sprintf(sThreadName, "%s-%d", "Actor-Run", GetActorID());
	_SetThreadName(m_ThreadID, sThreadName);
}

在上面這個函式中新增斷點,重啟下程式,很快會觸發斷點,我們看下斷點觸發時的呼叫堆疊:


通過呼叫堆疊,我們發現這個執行緒在一個全域性變數的建構函式中初始化的,這個全域性變數在DllMain()函式中初始化:


而這個dll是ELogging專案生成的:

也就是說,這是一個與日誌處理相關的執行緒。生產者產生日誌記錄,然後由這個執行緒作為消費者,來處理日誌。

我們接著看下一個執行緒的內容:


void CConnectCtrl::OnExecute()
{
	while(!m_bTerminate)
	{
		_ProcRequests();
		_ProcEvents();
		//CCPSockMgr::Instance()->CheckDelayRelease();
		Sleep(1);
	}
}

這也是一個迴圈,先看下_ProcRequests()函式:

void CConnectCtrl::_ProcRequests()
{
	while(m_dwSockCount < (UINT32)MAX_CONNECTION)
	{
		SConnReq* pstConnReq = (SConnReq*)m_oReqQueue.PopFront();
		if(NULL == pstConnReq)
		{
			break;
		}

		SOCKET hSock = socket(PF_INET, SOCK_STREAM, IPPROTO_IP);
		if(INVALID_SOCKET == hSock)
		{
			CRITICAL(_SDT("CConnectCtrl::_ProcRequests, socket failed, errno %d"), WSAGetLastError());
			CEventMgr::Instance()->PushConnErrEvt(WSAGetLastError(), pstConnReq->dwConnectorID);
			m_oFreeQueue.PushBack(pstConnReq);
			break;
		}
	
		//// 2009-04-02 cwy modify for general use
		if (pstConnReq->bNeedBind)
		{
			if ( false == BindAddress(hSock, pstConnReq->pszBindIP, pstConnReq->wBindPort) )
			{
				_OnSockError(hSock, pstConnReq);
				break;
			}
		}
        if (g_bNodelay)
        {
            const CHAR szOpt = 1;
            if (0 != ::setsockopt(hSock, IPPROTO_TCP, TCP_NODELAY, (char *)&szOpt, sizeof(char)))
            {
                WARN(_SDT("setsockopt for new socket on UpdateConetext failed, errno=%d"), ::WSAGetLastError());
            }
        }

		WSAEVENT hEvent = WSACreateEvent();
		if(WSA_INVALID_EVENT == hEvent)
		{
			_OnSockError(hSock, pstConnReq);
			break;
		}

		if(SOCKET_ERROR == WSAEventSelect(hSock, hEvent, FD_CONNECT))
		{
			_OnSockError(hSock, pstConnReq);
			WSACloseEvent(hEvent);
			break;
		}

		sockaddr_in stAddr = {0};
		stAddr.sin_family		= AF_INET;
		stAddr.sin_addr.s_addr	= pstConnReq->dwIP;
		stAddr.sin_port			= htons(pstConnReq->wPort);
		
		if( SOCKET_ERROR == connect(hSock, (sockaddr*)&stAddr, sizeof(stAddr)) )
		{
			if(WSAEWOULDBLOCK != WSAGetLastError())
			{
				_OnSockError(hSock, pstConnReq);
				WSACloseEvent(hEvent);
				break;
			}
		}

		m_pProcReqArray[m_dwSockCount]	= pstConnReq;
		m_pSockArray[m_dwSockCount]		= hSock;
		m_pEventsArray[m_dwSockCount]	= hEvent;
		++m_dwSockCount;
	}
}

這段函式的邏輯也是比較容易懂,先從一個佇列中取出資料,然後處理,只不過這些資料都是與連線相關的資訊。

再看下while迴圈中第二個函式_ProcEvents:

void CConnectCtrl::_ProcEvents()
{
	if(0 == m_dwSockCount)
	{
		return;
	}

	WSANETWORKEVENTS	stNetworkEvents;
	WSAEVENT*			pEvents;
	UINT32				dwCount;
	UINT32				dwIndex;
	UINT32				dwStart = 0;

	do
	{
		pEvents = &m_pEventsArray[dwStart];
		if(dwStart + WSA_MAXIMUM_WAIT_EVENTS > m_dwSockCount)
		{
			dwCount	= m_dwSockCount - dwStart;
		}
		else
		{
			dwCount	= WSA_MAXIMUM_WAIT_EVENTS;
		}
		
		dwIndex = WSAWaitForMultipleEvents(dwCount, pEvents, false, 0, false);
		if(WSA_WAIT_FAILED == dwIndex || WSA_WAIT_TIMEOUT == dwIndex)
		{
			dwStart += dwCount;
			continue;
		}

		dwIndex -= WSA_WAIT_EVENT_0;
		dwIndex += dwStart;
		++dwStart;

		SDASSERT(m_pProcReqArray[dwIndex] != NULL && m_pSockArray[dwIndex] != INVALID_SOCKET && m_pEventsArray[dwIndex] != WSA_INVALID_EVENT);

		if(SOCKET_ERROR == WSAEnumNetworkEvents(m_pSockArray[dwIndex], m_pEventsArray[dwIndex], &stNetworkEvents))
		{
			if(WSAEWOULDBLOCK != WSAGetLastError())
			{
				CEventMgr::Instance()->PushConnErrEvt(WSAGetLastError(), m_pProcReqArray[dwIndex]->dwConnectorID);
				_CloseEvent(dwIndex);
			}
			continue;
		}

		if(stNetworkEvents.lNetworkEvents & FD_CONNECT)
		{
			if(stNetworkEvents.iErrorCode[FD_CONNECT_BIT] != 0)
			{
				CEventMgr::Instance()->PushConnErrEvt(stNetworkEvents.iErrorCode[FD_CONNECT_BIT], m_pProcReqArray[dwIndex]->dwConnectorID);
				_CloseEvent(dwIndex);
				continue;
			}

			//
			// 連線成功
			//
			SConnReq* pstReq = m_pProcReqArray[dwIndex];

			CConnData * pConnData = CConnDataMgr::Instance()->Alloc(pstReq->dwRecvBufSize, pstReq->dwSendBufSize);
			if (pConnData == NULL)
			{
				CRITICAL(_SDT("CConnectCtrl::_ProcEvents, create ConnData failed"));
				CEventMgr::Instance()->PushConnErrEvt(0, pstReq->dwConnectorID);
				_CloseEvent(dwIndex);
				continue;
			}
			CCPSock *poSock = &pConnData->sock;
			CUCConnection * poConnection = &pConnData->connection;

			poSock->SetSock(m_pSockArray[dwIndex]);

			m_oFreeQueue.PushBack(m_pProcReqArray[dwIndex]);
			WSACloseEvent(m_pEventsArray[dwIndex]);
			m_pProcReqArray[dwIndex]	= NULL;
			m_pSockArray[dwIndex]		= INVALID_SOCKET;
			m_pEventsArray[dwIndex]		= WSA_INVALID_EVENT;

			sockaddr_in stAddr = {0};
			INT32 nAddrLen = sizeof(stAddr);
			getsockname(poSock->GetSock(), (sockaddr*)&stAddr, &nAddrLen);

			poConnection->SetAccept(false);
			poConnection->SetParentID(pstReq->dwConnectorID);
			poConnection->SetSession(pstReq->poSession);
			poConnection->SetLocalIP(stAddr.sin_addr.s_addr);
			poConnection->SetLocalPort(SDNtohs(stAddr.sin_port));
			poConnection->SetRemoteIP(pstReq->dwIP);
			poConnection->SetRemotePort(pstReq->wPort);
			//poConnection->SetCpSock(poSock);

			//poSock->SetConnection(poConnection);
			poSock->SetPacketParser(pstReq->poPacketParser);
			poSock->SetConnect(TRUE);

			//CEventMgr::Instance()->PushEstablishEvt(pConnData, false, pstReq->dwConnectorID);

			if(false == poSock->AssociateWithIocp())
			{
				poSock->Close();
			}
			else
			{
				if(false == poSock->PostRecv())
				{
					poSock->Close();
				}
			}
		}
	}while(dwStart < m_dwSockCount);

	_CompressEvent();
}

這個函式,對上一個函式中發起的連線結果做出判斷並處理。如果連線成功,則向完成埠上投遞一個recv事件。這個迴圈的程式碼,我建議讀者好好研究一下,非常好的重連例項,同時也組合了完成埠的模型,還有一些重要的網路程式設計細節(如nodelay選項等)。

那麼這個執行緒在哪裡啟動的呢?通過搜尋OnExecute函式名我們發現真正的執行緒函式:

unsigned CConnectCtrl::ThreadFunc(LPVOID pParam)
{
	CConnectCtrl* poCtrl = (CConnectCtrl*)pParam;
	poCtrl->OnExecute();

	return 0;
}

進而搜尋到:

bool CConnectCtrl::Init()
{
	INT32 nMaxRequest = MAX_CONNECTION * 2;

	m_pAllReqArray = new SConnReq[nMaxRequest];
	if(NULL == m_pAllReqArray)
	{
		return false;
	}

	if(false == m_oFreeQueue.Init(nMaxRequest+1))
	{
		return false;
	}

	if(false == m_oReqQueue.Init(nMaxRequest+1))
	{
		return false;
	}

	INT32 i;
	for(i = 0; i < nMaxRequest; i++)
	{
		m_oFreeQueue.PushBack(&m_pAllReqArray[i]);
	}

	m_pProcReqArray = new SConnReq*[MAX_CONNECTION];
	if(NULL == m_pProcReqArray)
	{
		CRITICAL(_SDT("CConnectCtrl::Init, new SConnReq*[%d] failed"), MAX_CONNECTION);
		return false;
	}

	m_pEventsArray = new WSAEVENT[MAX_CONNECTION];
	if(NULL == m_pEventsArray)
	{
		CRITICAL(_SDT("CConnectCtrl::Init, new WSAEVENT[%d] failed"), MAX_CONNECTION);
		return false;
	}

	m_pSockArray = new SOCKET[MAX_CONNECTION];
	if(NULL == m_pSockArray)
	{
		CRITICAL(_SDT("CConnectCtrl::Init, new SOCKET[%d] failed"), MAX_CONNECTION);
		return false;
	}

	for(i = 0; i < MAX_CONNECTION; i++)
	{
		m_pProcReqArray[i]	= NULL;
		m_pEventsArray[i]	= WSA_INVALID_EVENT;
		m_pSockArray[i]		= INVALID_SOCKET;
	}
	m_dwSockCount = 0;

	m_bTerminate = false;

	UINT dwThreadID = 0;
	m_hThread = (HANDLE)_beginthreadex(	NULL,					// Security
										0,						// Stack size - use default
										ThreadFunc,     		// Thread fn entry point
										(void*)this,			// Param for thread
										0,						// Init flag
										&dwThreadID);			// Thread address

	if(NULL == m_hThread)
	{
		CRITICAL(_SDT("CConnectCtrl::Init, _beginthreadex failed"));
		return false;
	}

	return true;
}

我們在CConnectCtrl::Init()處加個斷點,然後重啟一下程式,看下呼叫堆疊:


在CUCODENETWin::_InitComponent()中我們看到整個網路通訊框架的初始化,初始化CConnDataMgr、CEventMgr、CConnectCtrl和CIocpCtrl。

bool CUCODENetWin::_InitComponent()
{
	if (false == CConnDataMgr::Instance()->Init())
	{
		CRITICAL(_SDT("CUCODENetWin::_InitComponent, Init CConnDataMgr failed" ));
		return false;
	}
	if(false == CEventMgr::Instance()->Init(MAX_NET_EVENT))
	{
		CRITICAL(_SDT("CUCODENetWin::_InitComponent, Init CEventMgr %d failed"), MAX_NET_EVENT);
		return false;
	}


	if(false == CConnectCtrl::Instance()->Init())
	{
		CRITICAL(_SDT("CUCODENetWin::_InitComponent, Init CConnectCtrl failed"));
		return false;
	}

	if(false == CIocpCtrl::Instance()->Init())
	{
		CRITICAL(_SDT("CUCODENetWin::_InitComponent, Init CIocpCtrl failed"));
		return false;
	}

	return true;
}

而所有的這些初始化,都是在所謂的CLogNetSessionMgr中初始化的:


我們最終追溯到最上層的程式碼中:


到這裡,終於找到家了。

最後一批介紹的四個執行緒是完成埠執行緒,如下圖所示:


精華部分全在其執行緒函式中:

void CIocpCtrl::OnExecute()
{
	SPerHandleData* pstPerHandleData;
	SPerIoData*		pstPerIoData;
	CCPSock*		poSock;
	CCpListener*	poListener;
	BOOL			bRet;
	DWORD			dwByteTrabsferred;
	
	while(true)
	{
		pstPerHandleData	= NULL;
		pstPerIoData		= NULL;
		dwByteTrabsferred	= 0;

		bRet = GetQueuedCompletionStatus(
			m_hCompletionPort,
			&dwByteTrabsferred,
			(PULONG_PTR)&pstPerHandleData,
			(LPOVERLAPPED*)&pstPerIoData,
			INFINITE);

		// 檢查是否是執行緒退出
		if(NULL == pstPerHandleData)
		{
			return;
		}

		//當有客戶端請求建立連線時
		if(pstPerHandleData->bListen)
		{
			// for listen event
			poListener = (CCpListener*)pstPerHandleData->ptr;
			if(NULL != poListener &&  NULL != pstPerIoData)
			{
				poListener->OnAccept(bRet, pstPerIoData);
				//printf("Accpet Count:%d \n", InterlockedIncrement((LONG*)&m_acceptCount) ); 

			}
			else 
			{
				SDASSERT(false);
			}
		}
		else 
		{
			//for non-listen event 
			poSock = (CCPSock*)pstPerHandleData->ptr;
			if ( NULL == poSock )
			{
				continue;
			}
			if( FALSE == bRet || NULL == pstPerIoData )
			{				
                if (::WSAGetLastError()!=ERROR_IO_PENDING)
                {
                    INFO(_SDT("[%s:%d]CCPSock connID=%d error %d, close it"), 
                        MSG_MARK, poSock->GetConnectionID(), ::WSAGetLastError());
                    poSock->OnClose();
                }				
			}
			else
			{			
				switch(pstPerIoData->nOp)
				{
				case IOCP_RECV:
					{
						poSock->DecPostRecv();
						if (dwByteTrabsferred > 0)
						{
							poSock->OnRecv(dwByteTrabsferred);
						}
						else
						{
							INFO(_SDT("[%s:%d]CCPSock connID=%d error %d, close it, socket :%d "), 
								MSG_MARK, poSock->GetConnectionID(), ::WSAGetLastError(), poSock->GetSock());
							poSock->OnClose();
						}
					}
					break;
				case IOCP_SEND:
					{
						poSock->DecPostSend();
						if (dwByteTrabsferred > 0)
						{
							poSock->OnSend(dwByteTrabsferred);
						}
						else
						{
							INFO(_SDT("[%s:%d]CCPSock connID=%d error %d, close it"), 
								MSG_MARK, poSock->GetConnectionID(), ::WSAGetLastError());
							poSock->OnClose();
						}
					}
					break;
				case IOCP_CLOSE:
					{
						poSock->OnClose(false);
					}
					break;
			 
				default:
					;
				}
			}
		}
	}
}

我始終覺得,完成埠模型即使不從事Windows開發的linux伺服器開發人員應該也要掌握一下。尤其是linux伺服器開發人員需要給客戶端人員設計網路通訊層的企業。

我們看下,這四個執行緒在哪裡啟動的?

同樣的方法,我們通過搜尋,先找到:

unsigned CIocpCtrl::ThreadFunc(LPVOID pParam)
{
	CIocpCtrl* poCtrl = (CIocpCtrl*)pParam;
	poCtrl->m_threadBufPool.CreateThreadBuffer();
	poCtrl->OnExecute();
	poCtrl->m_threadBufPool.ReleaseThreadBuffer();
	return 0;
}

進而進一步找到:

bool CIocpCtrl::Init()
{
	//建立IO完成埠控制代碼
	m_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
	if (m_hCompletionPort == NULL)
	{
		CRITICAL(_SDT("CIocpCtrl::Init, CreateIoCompletionPort failed, Error %d \n"), ::WSAGetLastError()); 
		return false;
	}

	//獲取當前伺服器的CPU核數
	SYSTEM_INFO stSysInfo;
	GetSystemInfo(&stSysInfo);
	m_nNumberOfWorkers = stSysInfo.dwNumberOfProcessors * THREAD_PER_CPU;

    if (g_nThreadNum > 0)
    {
        m_nNumberOfWorkers = g_nThreadNum;
    }
	m_WorkerArray = new HANDLE[m_nNumberOfWorkers];
	for (INT32 i = 0; i < m_nNumberOfWorkers; i++) 
	{
		m_WorkerArray[i] = INVALID_HANDLE_VALUE;
	}

	//建立m_nNumberOfWorkers個執行緒
	UINT dwThreadID = 0;
	for (INT32 j = 0; j < m_nNumberOfWorkers; j++) 
	{
		m_WorkerArray[j] = (HANDLE)_beginthreadex(	NULL,					// Security
													0,						// Stack size - use default
													ThreadFunc,     		// Thread fn entry point
													(void*)this,			// Param for thread
													0,						// Init flag
													&dwThreadID);			// Thread address

		if (NULL == m_WorkerArray[j]) 
		{
			m_nNumberOfWorkers = j;
			this->Uninit();
			CRITICAL(_SDT("CIocpCtrl::Init, Create Worker thread failed, Close Handler\n")); 
			return false;
		}
		
	}
	return true;
}

然後同樣的方法在CIocpCtrl::Init()處加個斷點,重新跑下程式,得到如下呼叫堆疊:


我們上文中已經介紹過了,這裡就不再重複說明:


通過分析,我們知道LogServer大致的技術框架,業務細節和技術細節,我們在後面的文章中會接著介紹。我們當前的目的是快速把所有的服務的技術框架給熟悉一遍。

您可以繼續閱讀下一篇文章《從零學習開源專案系列(五)SSBattleMgr原始碼探究》。

歡迎關注公眾號『easyserverdev』。如果有任何技術或者職業方面的問題需要我提供幫助,可通過這個公眾號與我取得聯絡,此公眾號不僅分享高效能伺服器開發經驗和故事,同時也免費為廣大技術朋友提供技術答疑和職業解惑,您有任何問題都可以在微信公眾號直接留言,我會盡快回復您。