關於windows完成埠(IOCP)的一些理解
本人很多年前接觸完成埠以來,期間學習和練習了很多次,本以為自己真正地理解了其原理,最近在看網狐的伺服器端原始碼時又再一次拾起完成埠的知識,結果發現以前理解的其實很多偏差,有些理解的甚至都是錯誤的。網路上關於windows完成埠的介紹舉不勝舉,但大多數都是介紹怎麼做,而不是為告訴讀者為什麼這麼做。看了很多遍小豬的講解:http://blog.csdn.net/piggyxp/article/details/6922277,終於有些頓悟。為自己也為別人,在這裡做個備忘。
這篇文章將從為什麼這麼做的角度來解釋完成埠的一些重難點。
使用完成埠一般按以下步驟(這裡以網路伺服器接受客戶端連線並與客戶端進行網路通訊為例):
//步驟1:建立完成埠
//步驟2:建立偵聽socket並將偵聽socket繫結到完成埠上
//步驟3:設定偵聽
步驟1程式碼:
m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
步驟2程式碼:
//建立偵聽socket m_pListenContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); // 將Listen Socket繫結至完成埠中 if( NULL== CreateIoCompletionPort( (HANDLE)m_pListenContext->m_Socket, m_hIOCompletionPort,(DWORD)m_pListenContext, 0)) { return false; } return true;
注意,必須使用WSASocket函式,並設定標識位WSA_FLAG_OVERLAPPED。
步驟3程式碼:
// 伺服器地址資訊,用於繫結Socket struct sockaddr_in ServerAddress; // 填充地址資訊 ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress)); ServerAddress.sin_family = AF_INET; // 這裡可以繫結任何可用的IP地址,或者繫結一個指定的IP地址 ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY); //ServerAddress.sin_addr.s_addr = inet_addr(CStringA(m_strIP).GetString()); ServerAddress.sin_port = htons(m_nPort); // 繫結地址和埠 if (SOCKET_ERROR == bind(m_pListenContext->m_Socket, (struct sockaddr *) &ServerAddress, sizeof(ServerAddress))) return false; // 開始進行監聽 if (SOCKET_ERROR == listen(m_pListenContext->m_Socket,SOMAXCONN)) return false; return true;
以上步驟都是完成埠約定俗成的套路,現在接下來的問題是如何接受客戶端連線?
難點一: 使用AcceptEx代替accept時,完成埠模型讓作業系統替我們接受新連線
不管是使用select還是epoll這裡模型無非都是檢測到偵聽socket可讀,然後在呼叫accept函式接受連線,這樣存在一個問題,就是偵聽socket只有一個,所以呼叫accept函式接受連線的邏輯也只能有一個(一般不會在多執行緒裡面對同一個socket進行同一種操作)。但是如果是這樣的話,如果同一時間有大量的連線來了,可能就要逐個接受連線了,相當於一群人排隊進入一個門裡面,那有沒有更好的方法呢?有,windows提供了一個AcceptEx函式,在建立完偵聽函式之後,呼叫這個函式,那麼將來在完成埠的工作執行緒裡面如果有接受新連線動作,則無需呼叫accept或者AcceptEx,作業系統自動幫你接受新連線,等在工作執行緒裡面得到通知的時候,連線已經建立,而且新的客戶端socket也已經建立好。注意:這是完成埠的另外一個優勢,如果使用accept,不僅需要使用accept接受新連線,同時需要在連線現場建立一個socket,而使用AcceptEx,這兩個步驟都不需要了。AcceptEx函式簽名如下:
BOOL AcceptEx(
_In_ SOCKET sListenSocket,
_In_ SOCKET sAcceptSocket,
_In_ PVOID lpOutputBuffer,
_In_ DWORD dwReceiveDataLength,
_In_ DWORD dwLocalAddressLength,
_In_ DWORD dwRemoteAddressLength,
_Out_ LPDWORD lpdwBytesReceived,
_In_ LPOVERLAPPED lpOverlapped
);
注意看第二個引數sAcceptSocket,這個socket我們在初始化的時候需要準備好,將來新連線成功以後,可以直接使用這個socket表示客戶端連線。但是你可能又會問,我初始化階段需要準備多少個這樣的socket呢?畢竟不可能多個連線使用同一個sAcceptSocket。的確如此,所以一般初始化的時候準備一批客戶端socket,等工作執行緒有新連線成功後,表明開始準備的某個客戶端socket已經被使用了,這個時候我們可以繼續補充一個。相當於,我們預先準備五個容器,在使用過程中每次使用一個,我們就立刻補充一個。當然,這個AcceptEx這個函式不僅準備了接受連線操作,同時也準備了連線的兩端的地址緩衝區和對端發來的第一組資料緩衝區,將來有新連線成功以後,作業系統通知我們的時候,作業系統不僅幫我門接收好了連線,還將連線兩端的地址和對端發過來的第一組資料填到我們指定的緩衝區了。當然msdn上說使用這個函式最好不要直接使用,而是通過相應API獲取該函式的指標,再呼叫之(https://msdn.microsoft.com/en-us/library/windows/desktop/ms737524(v=vs.85).aspx):Note The function pointer for the AcceptEx function must be obtained at run time by making a call to the WSAIoctl function with the SIO_GET_EXTENSION_FUNCTION_POINTER opcode specified. The input buffer passed to the WSAIoctl function must contain WSAID_ACCEPTEX, a globally unique identifier (GUID) whose value identifies the AcceptEx extension function. On success, the output returned by the WSAIoctl function contains a pointer to the AcceptEx function. The WSAID_ACCEPTEX GUID is defined in the Mswsock.h header file.
程式碼應該寫成這樣:
// 使用AcceptEx函式,因為這個是屬於WinSock2規範之外的微軟另外提供的擴充套件函式
// 所以需要額外獲取一下函式的指標,
// 獲取AcceptEx函式指標
DWORD dwBytes = 0;
if(SOCKET_ERROR == WSAIoctl(
m_pListenContext->m_Socket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&m_lpfnAcceptEx,
sizeof(m_lpfnAcceptEx),
&dwBytes,
NULL,
NULL))
{
this->_ShowMessage(_T("WSAIoctl 未能獲取AcceptEx函式指標。錯誤程式碼: %d\n"), WSAGetLastError());
return false;
}
當然,WSAIoctl函式第一個引數只要填寫任意一個有效的socket就可以了。
難點二:完成埠模型讓作業系統替我們進行資料收發
NO1. 寫過網路通訊程式的人都知道,尤其是伺服器端程式,我們不能直接呼叫send和recv這類函式進行資料收發,因為當tcp視窗太小時,資料發不出去,send會阻塞執行緒,同理,如果當前網路緩衝區沒有資料,呼叫recv也會阻塞執行緒。這是入門級的做法。
NO2. 既然上述做法不好,那我就換成主動檢測資料是否可以收發,當資料可以收發的時候,再呼叫send或者recv函式進行收發。這就是常用的IO複用函式的用途,如select函式、linux下的poll函式。這是中級做法。
NO3. 使用IO複用技術主動檢測資料是否可讀可寫,也存在問題。如果檢測到了資料可讀或可寫,那這種檢測就是值得的;但是反之檢測不到呢?那也是白白地浪費時間的。如果有一種方法,我不需要主動去檢測,我只需要預先做一個部署,當有資料可讀或者可寫時,作業系統能通知我就好了,而不是每次都是我自己去主動檢測。有,這就是linux下的epoll模型和windows下的WSAAsyncSelect和完成埠模型。這是高階做法。
NO4. 但是無論是epoll模型還是WSAAsyncSelect模型,雖然作業系統會告訴我們什麼時候資料可讀或者可寫,但是當資料可讀或者可寫時,還是需要我們自己去呼叫send或者recv函式做實際的收發資料工作。那有沒有一種模型,不僅能通知我們資料可讀和可寫,甚至當資料可讀或者可寫時,連資料的收發工作也幫我們做好了?有,這就是windows的完成埠模型。
這就是標題所說的完成埠將IO操作從手動變為自動,完成埠將資料的可讀與可寫檢測操作和收發資料操作這兩項工作改為作業系統代勞,等系統完成之後會通知我們的,而我們只需要在這之前做一些相應的部署(初始化工作)就可以了。 那麼需要做那些初始化工作呢?這裡我們以收發網路資料為例。
對於收資料,我們只需要準備好,存放資料的緩衝區就可以了:
// 初始化變數
DWORD dwFlags = 0;
DWORD dwBytes = 0;
WSABUF *p_wbuf = &pIoContext->m_wsaBuf;
OVERLAPPED *p_ol = &pIoContext->m_Overlapped;
pIoContext->ResetBuffer();
pIoContext->m_OpType = RECV_POSTED;
// 初始化完成後,,投遞WSARecv請求
int nBytesRecv = WSARecv( pIoContext->m_sockAccept, p_wbuf, 1, &dwBytes, &dwFlags, p_ol, NULL );
// 如果返回值錯誤,並且錯誤的程式碼並非是Pending的話,那就說明這個重疊請求失敗了
if ((SOCKET_ERROR == nBytesRecv) && (WSA_IO_PENDING != WSAGetLastError()))
{
this->_ShowMessage(_T("投遞第一個WSARecv失敗!"));
return false;
}
WSARecv函式會立刻返回,不會阻塞,如果返回時資料已經收成功了,那我們準備的緩衝區m_wsaBuf中存放的就是我們收到的資料;否則WASRecv會返回-1(SOCKET_ERROR),此時錯誤碼如果是WSA_IO_PENDING表示收資料暫且還沒完成,這樣你需要等待後續通知。所以從某種意義上來說WSARecv函式並不是收取資料,而更像是安排讓作業系統收資料的設定。
同理,對於發資料,我們也只要準備好需要傳送的資料即可:
// 初始化變數
DWORD dwFlags = 0;
DWORD dwBytes = 0;
WSABUF *p_wbuf = &pIoContext->m_wsaBuf;
OVERLAPPED *p_ol = &pIoContext->m_Overlapped;
pIoContext->ResetBuffer();
pIoContext->m_OpType = SEND_POSTED;
// 初始化完成後,,投遞WSARecv請求
int nBytesSend = WSASend pIoContext->m_sockAccept, p_wbuf, 1, &dwBytes, &dwFlags, p_ol, NULL );
// 如果返回值錯誤,並且錯誤的程式碼並非是Pending的話,那就說明這個重疊請求失敗了
if ((SOCKET_ERROR == nBytesSend) && (WSA_IO_PENDING != WSAGetLastError()))
{
this->_ShowMessage(_T("傳送資料失敗!"));
return false;
}
發資料的程式碼基本上和收資料一模一樣。
上面介紹了一些不成體系的程式碼片段,那麼我們應該怎麼把上面介紹的程式碼組織成一個整體呢?完成埠模型,需要初始化步驟中還需要建立一些工作執行緒,這些工作執行緒就是用來處理各種作業系統的通知的,比如有新客戶端連線成功了、資料收好了、資料傳送好了等等。建立工作執行緒以及準備新連線到來時需要的一些容器的程式碼(上文介紹過了,如一些acceptSocket、兩端地址緩衝區、第一份收到的資料緩衝區):
建立工作執行緒:
DWORD nThreadID;
for (int i = 0; i < m_nThreads; i++)
{
THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER;
pThreadParams->nThreadNo = i+1;
m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread, (void *)pThreadParams, 0, &nThreadID);
}
呼叫AcceptEx為將來接受新連線準備:// 為AcceptEx 準備引數,然後投遞AcceptEx I/O請求
for( int i=0;i<MAX_POST_ACCEPT;i++ )
{
// 新建一個IO_CONTEXT
PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext();
if( false==this->_PostAccept( pAcceptIoContext ) )
{
m_pListenContext->RemoveContext(pAcceptIoContext);
return false;
}
}
//////////////////////////////////////////////////////////////////
// 投遞Accept請求
bool CIOCPModel::_PostAccept( PER_IO_CONTEXT* pAcceptIoContext )
{
ASSERT( INVALID_SOCKET!=m_pListenContext->m_Socket );
// 準備引數
DWORD dwBytes = 0;
pAcceptIoContext->m_OpType = ACCEPT_POSTED;
WSABUF *p_wbuf = &pAcceptIoContext->m_wsaBuf;
OVERLAPPED *p_ol = &pAcceptIoContext->m_Overlapped;
// 為以後新連入的客戶端先準備好Socket( 這個是與傳統accept最大的區別 )
pAcceptIoContext->m_sockAccept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if( INVALID_SOCKET==pAcceptIoContext->m_sockAccept )
{
_ShowMessage(_T("建立用於Accept的Socket失敗!錯誤程式碼: %d"), WSAGetLastError());
return false;
}
// 投遞AcceptEx
if(FALSE == m_lpfnAcceptEx( m_pListenContext->m_Socket, pAcceptIoContext->m_sockAccept, p_wbuf->buf, p_wbuf->len - ((sizeof(SOCKADDR_IN)+16)*2),
sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, &dwBytes, p_ol))
{
if(WSA_IO_PENDING != WSAGetLastError())
{
_ShowMessage(_T("投遞 AcceptEx 請求失敗,錯誤程式碼: %d"), WSAGetLastError());
return false;
}
}
return true;
}
這裡我開始準備了MAX_POST_ACCEPT=10個socket。而工作執行緒的執行緒函式應該看起來是這個樣子:
DWORD ThreadFunction()
{
//使用GetQueuedCompletionStatus函式檢測事件型別
if (事件型別 == 有新客戶端連成功)
{
//做一些操作1,比如顯示一個新連線資訊
}
else if (事件型別 == 收到了一份資料)
{
//做一些操作2,比如解析資料
}
else if (事件型別 == 資料傳送成功了)
{
//做一些操作3,比如顯示一條資料傳送成功資訊
}
}
在沒有事件發生時,函式GetQueuedCompletionStatus()會讓工作執行緒掛起,不然不會佔用cpu時間片。但是不知道你有沒有發現執行緒函式存在以下問題:
1 . GetQueuedCompletionStatus函式如何確定事件型別?如何判斷哪些事件是客戶端連線成功事件,哪些事件是收發資料成功事件呢?
2. 當一個完成埠上繫結多個socket時,這些socket有的是偵聽socket,有的是客戶端socket,如何判斷到底是哪個socket呢?
奧妙就在某個socket與完成埠控制代碼繫結時的第三個引數CompletionKey,這其實就是一個指標。
HANDLE WINAPI CreateIoCompletionPort(
_In_ HANDLE FileHandle,
_In_opt_ HANDLE ExistingCompletionPort,
_In_ ULONG_PTR CompletionKey,
_In_ DWORD NumberOfConcurrentThreads
);
GetQueuedCompletionStatus函式簽名如下:BOOL WINAPI GetQueuedCompletionStatus(
_In_ HANDLE CompletionPort,
_Out_ LPDWORD lpNumberOfBytes,
_Out_ PULONG_PTR lpCompletionKey,
_Out_ LPOVERLAPPED *lpOverlapped,
_In_ DWORD dwMilliseconds
);
看到沒有,GetQueuedCompletionStatus正好也有一個引數叫CompletionPort,而且還是一個輸出引數。沒錯!這兩個其實就是同一個指標。這樣如果我在繫結socket到完成埠控制代碼時使用一塊記憶體的指標作為CompletionKey的值,該記憶體含有該socket的資訊,這樣我在工作執行緒中收到事件通知時就能取出這個CompletionKey來得到這個socket控制代碼了,這樣我就知道到底是哪個socket上的事件了。偽碼如下:
struct SOME_STRUCT
{
SOCKET s;
//可以再定義一些其它資訊一起攜帶
};
//對於偵聽socket
SOME_STRUCT someStruct1;
someStruct1.s = ListenSocket;
CreateIoCompletionPort( ListenSocket, m_hIOCompletionPort,(DWORD)&someStruct1, 0);
//對於普通客戶端連線socket
SOME_STRUCT someStruct2;
someStruct2.s = acceptSocket;
CreateIoCompletionPort( acceptSocket, m_hIOCompletionPort,(DWORD)&someStruct2, 0);
其實這個SOME_STRUCT因為是每一個socket有一份,所以它有個名字叫“Per Socket Data”。
執行緒函式裡面就應該寫成這個樣子:
DWORD ThreadFunction()
{
OVERLAPPED *pOverlapped = NULL;
PER_SOCKET_CONTEXT *pSocketContext = NULL;
DWORD dwBytesTransfered = 0;
BOOL bReturn = GetQueuedCompletionStatus(m_hIOCompletionPort, &dwBytesTransfered, (PULONG_PTR)&pSocketContext, &pOverlapped, INFINITE);
if (((SOME_STRUCT*)pSocketContext)->s == 偵聽socket控制代碼)
{
//新連線接收成功,做一些操作
}
//普通客戶端socket收發資料
else
{
if (事件型別 == 收到了一份資料)
{
//做一些操作2,比如解析資料
}
else if (事件型別 == 資料傳送成功了)
{
//做一些操作3,比如顯示一條資料傳送成功資訊
}
}
}
現在另外一個問題就是,如何判斷是資料傳送成功還是收到了資料?前面已經說過,對於每一次的收發資料,都需要呼叫WSASend或WSARecv函式進行準備,而這兩個函式需要一個OVERLAPPED結構體,反正傳得是這個結構體的指標,我們可以根據指標物件的伸縮特性,在這個OVERLAPPED結構體後面再增加一些欄位來標識我們是收資料動作還是發資料動作。而這個擴充套件的OVERLAPPED結構體,因為是針對每一次IO操作的,所以叫“Per IO Data”。因此這個資料結構的第一個欄位必須是一個OVERLAPPED結構體:
typedef struct _PER_IO_CONTEXT
{
OVERLAPPED m_Overlapped; // 每一個重疊網路操作的重疊結構(針對每一個Socket的每一個操作,都要有一個)
SOCKET m_sockAccept; // 這個網路操作所使用的Socket
WSABUF m_wsaBuf; // WSA型別的緩衝區,用於給重疊操作傳引數的
char m_szBuffer[MAX_BUFFER_LEN]; // 這個是WSABUF裡具體存字元的緩衝區
OPERATION_TYPE m_OpType; // 標識網路操作的型別(對應上面的列舉)
};
我們首先將SOME_STRUCT改名成它應該叫的名字,即_PER_SOCKET_CONTEXT:
typedef struct _PER_SOCKET_CONTEXT
{
SOCKET m_Socket; // 每一個客戶端連線的Socket
SOCKADDR_IN m_ClientAddr; // 客戶端的地址
CArray<_PER_IO_CONTEXT*> m_arrayIoContext; // 客戶端網路操作的上下文資料,
};
我們再次觀察GetQueuedCompletionStatus的函式簽名會發現,其第三個引數正好就是一個OVERLAPPED結構指標,至此我們在工作執行緒裡面不僅可以知道是哪個socket的事件,同時能通過OVERLAPPED*後面的欄位知道是收資料還是發資料:
DWORD ThreadFunction()
{
OVERLAPPED *pOverlapped = NULL;
PER_SOCKET_CONTEXT *pSocketContext = NULL;
DWORD dwBytesTransfered = 0;
BOOL bReturn = GetQueuedCompletionStatus(m_hIOCompletionPort, &dwBytesTransfered, (PULONG_PTR)&pSocketContext, &pOverlapped, INFINITE);
if (((SOME_STRUCT*)pSocketContext)->s == 偵聽socket控制代碼)
{
//新連線接收成功,做一些操作
}
//普通客戶端socket收發資料
else
{
//通過pOverlapped結構得到pIOContext
PER_IO_CONTEXT* pIOContext = (PER_IO_CONTEXT*)pOverlapped;
if (pIOContext->Type == 收)
{
//做一些操作2,比如解析資料
}
else if (pIOContext->Type == 發)
{
//做一些操作3,比如顯示一條資料傳送成功資訊
}
}
}
小結構體指標轉換成大結構體指標操作:PER_IO_CONTEXT* pIOContext = (PER_IO_CONTEXT*)pOverlapped;微軟直接幫我們定義了一個巨集CONTAINING_RECORD來操作:
//
// Calculate the address of the base of the structure given its type, and an
// address of a field within the structure.
//
#define CONTAINING_RECORD(address, type, field) ((type *)( \
(PCHAR)(address) - \
(ULONG_PTR)(&((type *)0)->field)))
所以上述程式碼也可以寫成:
PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped);
不知道你是否記得前面中說過每消耗一個預先準備客戶端的socket,就要補上一個。這個程式碼現在看來就應該放在連線成功事件裡面了:
DWORD ThreadFunction()
{
OVERLAPPED *pOverlapped = NULL;
PER_SOCKET_CONTEXT *pSocketContext = NULL;
DWORD dwBytesTransfered = 0;
BOOL bReturn = GetQueuedCompletionStatus(m_hIOCompletionPort, &dwBytesTransfered, (PULONG_PTR)&pSocketContext, &pOverlapped, INFINITE);
if (((SOME_STRUCT*)pSocketContext)->s == 偵聽socket控制代碼)
{
//連線成功後可以做以下事情:
//1. 獲取對端和本端的ip地址和埠號,即AcceptEx的第三個引數lpOutputBuffer中拿(這一步,不是必須)
//2. 如果對端連線成功後會發資料過來,則可以從初始化時呼叫AcceptEx準備的緩衝區裡面拿到,即AcceptEx的第三個引數lpOutputBuffer中拿(這一步不是必須)
//3. 再次呼叫AcceptEx補充一個sAcceptSocket(這一步是必須的)
}
//普通客戶端socket收發資料
else
{
//通過pOverlapped結構得到pIOContext
PER_IO_CONTEXT* pIOContext = (PER_IO_CONTEXT*)pOverlapped;
if (pIOContext->Type == 收)
{
//做一些操作2,比如解析資料
}
else if (pIOContext->Type == 發)
{
//做一些操作3,比如顯示一條資料傳送成功資訊
}
}
}
上面連線成功後的偽碼,第1步和第2步不是必須的,而第3步是必須的,如果不及時補充的話,等連線數多於準備的socket,可能就會發生故障了。
因為兩端的地址資訊和對端發過來的第一組資料都在同一個緩衝區裡面,再次看下AcceptEx函式簽名吧:
BOOL AcceptEx(
_In_ SOCKET sListenSocket,
_In_ SOCKET sAcceptSocket,
_In_ PVOID lpOutputBuffer,
_In_ DWORD dwReceiveDataLength,
_In_ DWORD dwLocalAddressLength,
_In_ DWORD dwRemoteAddressLength,
_Out_ LPDWORD lpdwBytesReceived,
_In_ LPOVERLAPPED lpOverlapped
);
雖然可以根據dwReceiveDataLength、dwLocalAddressLength、dwRemoteAddressLength、lpdwBytesReceived這幾個引數計算出來,但是微軟提供了一個函式來幫我們做這個解析動作: 同理,這個函式最好也要通過WSAIoctl函式來動態獲取:
// 獲取GetAcceptExSockAddrs函式指標,也是同理
if(SOCKET_ERROR == WSAIoctl(
m_pListenContext->m_Socket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptExSockAddrs,
sizeof(GuidGetAcceptExSockAddrs),
&m_lpfnGetAcceptExSockAddrs,
sizeof(m_lpfnGetAcceptExSockAddrs),
&dwBytes,
NULL,
NULL))
{
this->_ShowMessage(_T("WSAIoctl 未能獲取GuidGetAcceptExSockAddrs函式指標。錯誤程式碼: %d\n"), WSAGetLastError());
this->_DeInitialize();
return false;
}
然後使用返回的函式指標來使用函式GetAcceptExSockaddrs。解析地址資訊和第一組資料的程式碼如下:
SOCKADDR_IN* ClientAddr = NULL; SOCKADDR_IN* LocalAddr = NULL; int remoteLen = sizeof(SOCKADDR_IN), localLen = sizeof(SOCKADDR_IN); /////////////////////////////////////////////////////////////////////////// // 1. 首先取得連入客戶端的地址資訊 // 這個 m_lpfnGetAcceptExSockAddrs 不得了啊~~~~~~ // 不但可以取得客戶端和本地端的地址資訊,還能順便取出客戶端發來的第一組資料,老強大了... this->m_lpfnGetAcceptExSockAddrs(pIoContext->m_wsaBuf.buf, pIoContext->m_wsaBuf.len - ((sizeof(SOCKADDR_IN)+16)*2), sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, (LPSOCKADDR*)&LocalAddr, &localLen, (LPSOCKADDR*)&ClientAddr, &remoteLen); this->_ShowMessage( _T("客戶端 %s:%d 連入."), inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port) ); this->_ShowMessage( _T("客戶額 %s:%d 資訊:%s."),inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port),pIoContext->m_wsaBuf.buf );
以上介紹的是接收新連線成功後的處理,那收資料和發資料的準備工作在哪裡做呢?(收取第一組資料可以在呼叫AcceptEx的地方做)。這個就仁者見仁,智者見智了。比如可以在新連線接收成功之後,立即準備給對端發資料;或者在收到對端資料的時候準備給對端發資料;在傳送資料完成後準備收對端資料。偽碼如下:
DWORD ThreadFunction()
{
OVERLAPPED *pOverlapped = NULL;
PER_SOCKET_CONTEXT *pSocketContext = NULL;
DWORD dwBytesTransfered = 0;
BOOL bReturn = GetQueuedCompletionStatus(m_hIOCompletionPort, &dwBytesTransfered, (PULONG_PTR)&pSocketContext, &pOverlapped, INFINITE);
if (((SOME_STRUCT*)pSocketContext)->s == 偵聽socket控制代碼)
{
//連線成功後可以做以下事情:
//1. 獲取對端和本端的ip地址和埠號,即AcceptEx的第三個引數lpOutputBuffer中拿(這一步,不是必須)
//2. 如果對端連線成功後會發資料過來,則可以從初始化時呼叫AcceptEx準備的緩衝區裡面拿到,即AcceptEx的第三個引數lpOutputBuffer中拿(這一步不是必須)
//3. 再次呼叫AcceptEx補充一個sAcceptSocket(這一步是必須的)
//4. 呼叫WSASend準備傳送資料工作或呼叫WSARecv準備接收資料工作(這一步,不是必須)
}
//普通客戶端socket收發資料
else
{
//通過pOverlapped結構得到pIOContext
PER_IO_CONTEXT* pIOContext = (PER_IO_CONTEXT*)pOverlapped;
if (pIOContext->Type == 收)
{
//解析收到的資料(這一步,不是必須)
//呼叫WSASend準備傳送資料工作(比如應答客戶端)(這一步,不是必須)
//繼續呼叫WSARecv準備收取資料工作(這一步,不是必須)
}
else if (pIOContext->Type == 發)
{
//呼叫WSARecv準備收取資料工作(這一步,不是必須)
}
}
}
現在還剩下最後一個問題,就是工作執行緒如何退出。當然你可以在每次判斷標識位前先判斷一個退出標識。但是如果工作執行緒正好被GetQueuedCompletionStatus掛載那裡呢?如何喚醒,微軟提供了另外一個函式:PostQueuedCompletionStatus,看下這個函式的簽名:
BOOL WINAPI PostQueuedCompletionStatus(
_In_ HANDLE CompletionPort,
_In_ DWORD dwNumberOfBytesTransferred,
_In_ ULONG_PTR dwCompletionKey,
_In_opt_ LPOVERLAPPED lpOverlapped
);
這個函式可以喚醒被GetQueuedCompletionStatus函式掛起的工作執行緒,當然其第三個引數也是一個CompletionKey(dwCompletionKey)。你可以使用這個dwCompletionKey做標識幹一些其它的事情,當然設定一個退出碼也可以。例如:
PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (DWORD)EXIT_CODE, NULL);
這樣工作執行緒裡面就可以使用EXIT_CODE來作為退出標誌:
DWORD ThreadFunction()
{
OVERLAPPED *pOverlapped = NULL;
PER_SOCKET_CONTEXT *pSocketContext = NULL;
DWORD dwBytesTransfered = 0;
BOOL bReturn = GetQueuedCompletionStatus(m_hIOCompletionPort, &dwBytesTransfered, (PULONG_PTR)&pSocketContext, &pOverlapped, INFINITE);
// 如果收到的是退出標誌,則直接退出
if ( EXIT_CODE==(DWORD)pSocketContext )
{
return 0;
}
if (((SOME_STRUCT*)pSocketContext)->s == 偵聽socket控制代碼)
{
//連線成功後可以做以下事情:
//1. 獲取對端和本端的ip地址和埠號,即AcceptEx的第三個引數lpOutputBuffer中拿(這一步,不是必須)
//2. 如果對端連線成功後會發資料過來,則可以從初始化時呼叫AcceptEx準備的緩衝區裡面拿到,即AcceptEx的第三個引數lpOutputBuffer中拿(這一步不是必須)
//3. 再次呼叫AcceptEx補充一個sAcceptSocket(這一步是必須的)
//4. 呼叫WSASend準備傳送資料工作或呼叫WSARecv準備接收資料工作(這一步,不是必須)
}
//普通客戶端socket收發資料
else
{
//通過pOverlapped結構得到pIOContext
PER_IO_CONTEXT* pIOContext = (PER_IO_CONTEXT*)pOverlapped;
if (pIOContext->Type == 收)
{
//解析收到的資料(這一步,不是必須)
//呼叫WSASend準備傳送資料工作(比如應答客戶端)(這一步,不是必須)
//繼續呼叫WSARecv準備收取資料工作(這一步,不是必須)
}
else if (pIOContext->Type == 發)
{
//呼叫WSARecv準備收取資料工作(這一步,不是必須)
}
}
return 0;
}
至此,關於完成埠的東西就全部介紹完了。我們總結一下,掌握完成埠的關鍵在於理解以下幾點:
1. 完成埠綁定了某個socket後,不僅其事件的讀寫檢測由作業系統完成,而且就算是接受新連線、收發資料的動作也是由作業系統代勞了,作業系統完成後會通知你。等你收到通知時,一切都完成好了。你可以直接取出對應的資料使用。
2. 要想第1點介紹的事情由作業系統代勞,你必須預先準備很多資料結構,比如兩端的地址結構體、收發緩衝區、和用來表示新連線的socket等等,這些準備工作可能在程式初始化階段,也可能在工作執行緒某個事件處理的地方。
3. 初始化準備好的各種緩衝區如何在工作執行緒裡面引用到的關鍵就在於繫結完成埠時CompletionKey和準備收發緩衝區時OVERLAPPED結構體的使用, CompletionKey對應PER Socket Data, OVERLAPPED對應Per IO Data,即CompletionKey是單Socket資料,OVERLAPPED是單IO資料。
下面給出上文中使用到的對完成埠模型封裝的類的全部程式碼:
IOCPModel.h
/*
==========================================================================
Purpose:
* 這個類CIOCPModel是本程式碼的核心類,用於說明WinSock伺服器端程式設計模型中的
完成埠(IOCP)的使用方法,並使用MFC對話方塊程式來呼叫這個類實現了基本的
伺服器網路通訊的功能。
* 其中的PER_IO_DATA結構體是封裝了用於每一個重疊操作的引數
PER_HANDLE_DATA 是封裝了用於每一個Socket的引數,也就是用於每一個完成埠的引數
* 詳細的文件說明請參考 http://blog.csdn.net/PiggyXP
Notes:
* 具體講明瞭伺服器端建立完成埠、建立工作者執行緒、投遞Recv請求、投遞Accept請求的方法,
所有的客戶端連入的Socket都需要繫結到IOCP上,所有從客戶端發來的資料,都會實時顯示到
主介面中去。
Author:
* PiggyXP【小豬】
Date:
* 2009/10/04
==========================================================================
*/
#pragma once
// winsock 2 的標頭檔案和庫
#include <winsock2.h>
#include <MSWSock.h>
#pragma comment(lib,"ws2_32.lib")
// 緩衝區長度 (1024*8)
// 之所以為什麼設定8K,也是一個江湖上的經驗值
// 如果確實客戶端發來的每組資料都比較少,那麼就設定得小一些,省記憶體
#define MAX_BUFFER_LEN 8192
// 預設埠
#define DEFAULT_PORT 12345
// 預設IP地址
#define DEFAULT_IP _T("127.0.0.1")
//////////////////////////////////////////////////////////////////
// 在完成埠上投遞的I/O操作的型別
typedef enum _OPERATION_TYPE
{
ACCEPT_POSTED, // 標誌投遞的Accept操作
SEND_POSTED, // 標誌投遞的是傳送操作
RECV_POSTED, // 標誌投遞的是接收操作
NULL_POSTED // 用於初始化,無意義
}OPERATION_TYPE;
//====================================================================================
//
// 單IO資料結構體定義(用於每一個重疊操作的引數)
//
//====================================================================================
typedef struct _PER_IO_CONTEXT
{
OVERLAPPED m_Overlapped; // 每一個重疊網路操作的重疊結構(針對每一個Socket的每一個操作,都要有一個)
SOCKET m_sockAccept; // 這個網路操作所使用的Socket
WSABUF m_wsaBuf; // WSA型別的緩衝區,用於給重疊操作傳引數的
char m_szBuffer[MAX_BUFFER_LEN]; // 這個是WSABUF裡具體存字元的緩衝區
OPERATION_TYPE m_OpType; // 標識網路操作的型別(對應上面的列舉)
// 初始化
_PER_IO_CONTEXT()
{
ZeroMemory(&m_Overlapped, sizeof(m_Overlapped));
ZeroMemory( m_szBuffer,MAX_BUFFER_LEN );
m_sockAccept = INVALID_SOCKET;
m_wsaBuf.buf = m_szBuffer;
m_wsaBuf.len = MAX_BUFFER_LEN;
m_OpType = NULL_POSTED;
}
// 釋放掉Socket
~_PER_IO_CONTEXT()
{
if( m_sockAccept!=INVALID_SOCKET )
{
closesocket(m_sockAccept);
m_sockAccept = INVALID_SOCKET;
}
}
// 重置緩衝區內容
void ResetBuffer()
{
ZeroMemory( m_szBuffer,MAX_BUFFER_LEN );
}
} PER_IO_CONTEXT, *PPER_IO_CONTEXT;
//====================================================================================
//
// 單控制代碼資料結構體定義(用於每一個完成埠,也就是每一個Socket的引數)
//
//====================================================================================
typedef struct _PER_SOCKET_CONTEXT
{
SOCKET m_Socket; // 每一個客戶端連線的Socket
SOCKADDR_IN m_ClientAddr; // 客戶端的地址
CArray<_PER_IO_CONTEXT*> m_arrayIoContext; // 客戶端網路操作的上下文資料,
// 也就是說對於每一個客戶端Socket,是可以在上面同時投遞多個IO請求的
// 初始化
_PER_SOCKET_CONTEXT()
{
m_Socket = INVALID_SOCKET;
memset(&m_ClientAddr, 0, sizeof(m_ClientAddr));
}
// 釋放資源
~_PER_SOCKET_CONTEXT()
{
if( m_Socket!=INVALID_SOCKET )
{
closesocket( m_Socket );
m_Socket = INVALID_SOCKET;
}
// 釋放掉所有的IO上下文資料
for( int i=0;i<m_arrayIoContext.GetCount();i++ )
{
delete m_arrayIoContext.GetAt(i);
}
m_arrayIoContext.RemoveAll();
}
// 獲取一個新的IoContext
_PER_IO_CONTEXT* GetNewIoContext()
{
_PER_IO_CONTEXT* p = new _PER_IO_CONTEXT;
m_arrayIoContext.Add( p );
return p;
}
// 從陣列中移除一個指定的IoContext
void RemoveContext( _PER_IO_CONTEXT* pContext )
{
ASSERT( pContext!=NULL );
for( int i=0;i<m_arrayIoContext.GetCount();i++ )
{
if( pContext==m_arrayIoContext.GetAt(i) )
{
delete pContext;
pContext = NULL;
m_arrayIoContext.RemoveAt(i);
break;
}
}
}
} PER_SOCKET_CONTEXT, *PPER_SOCKET_CONTEXT;
//====================================================================================
//
// CIOCPModel類定義
//
//====================================================================================
// 工作者執行緒的執行緒引數
class CIOCPModel;
typedef struct _tagThreadParams_WORKER
{
CIOCPModel* pIOCPModel; // 類指標,用於呼叫類中的函式
int nThreadNo; // 執行緒編號
} THREADPARAMS_WORKER,*PTHREADPARAM_WORKER;
// CIOCPModel類
class CIOCPModel
{
public:
CIOCPModel(void);
~CIOCPModel(void);
public:
// 啟動伺服器
bool Start();
// 停止伺服器
void Stop();
// 載入Socket庫
bool LoadSocketLib();
// 解除安裝Socket庫,徹底完事
void UnloadSocketLib() { WSACleanup(); }
// 獲得本機的IP地址
CString GetLocalIP();
// 設定監聽埠
void SetPort( const int& nPort ) { m_nPort=nPort; }
// 設定主介面的指標,用於呼叫顯示資訊到介面中
void SetMainDlg( CDialog* p ) { m_pMain=p; }
protected:
// 初始化IOCP
bool _InitializeIOCP();
// 初始化Socket
bool _InitializeListenSocket();
// 最後釋放資源
void _DeInitialize();
// 投遞Accept請求
bool _PostAccept( PER_IO_CONTEXT* pAcceptIoContext );
// 投遞接收資料請求
bool _PostRecv( PER_IO_CONTEXT* pIoContext );
// 在有客戶端連入的時候,進行處理
bool _DoAccpet( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext );
// 在有接收的資料到達的時候,進行處理
bool _DoRecv( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext );
// 將客戶端的相關資訊儲存到陣列中
void _AddToContextList( PER_SOCKET_CONTEXT *pSocketContext );
// 將客戶端的資訊從陣列中移除
void _RemoveContext( PER_SOCKET_CONTEXT *pSocketContext );
// 清空客戶端資訊
void _ClearContextList();
// 將控制代碼繫結到完成埠中
bool _AssociateWithIOCP( PER_SOCKET_CONTEXT *pContext);
// 處理完成埠上的錯誤
bool HandleError( PER_SOCKET_CONTEXT *pContext,const DWORD& dwErr );
// 執行緒函式,為IOCP請求服務的工作者執行緒
static DWORD WINAPI _WorkerThread(LPVOID lpParam);
// 獲得本機的處理器數量
int _GetNoOfProcessors();
// 判斷客戶端Socket是否已經斷開
bool _IsSocketAlive(SOCKET s);
// 在主介面中顯示資訊
void _ShowMessage( const CString szFormat,...) const;
private:
HANDLE m_hShutdownEvent; // 用來通知執行緒系統退出的事件,為了能夠更好的退出執行緒
HANDLE m_hIOCompletionPort; // 完成埠的控制代碼
HANDLE* m_phWorkerThreads; // 工作者執行緒的控制代碼指標
int m_nThreads; // 生成的執行緒數量
CString m_strIP; // 伺服器端的IP地址
int m_nPort; // 伺服器端的監聽埠
CDialog* m_pMain; // 主介面的介面指標,用於在主介面中顯示訊息
CRITICAL_SECTION m_csContextList; // 用於Worker執行緒同步的互斥量
CArray<PER_SOCKET_CONTEXT*> m_arrayClientContext; // 客戶端Socket的Context資訊
PER_SOCKET_CONTEXT* m_pListenContext; // 用於監聽的Socket的Context資訊
LPFN_ACCEPTEX m_lpfnAcceptEx; // AcceptEx 和 GetAcceptExSockaddrs 的函式指標,用於呼叫這兩個擴充套件函式
LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockAddrs;
};
IOCPModel.cpp
#include "StdAfx.h"
#include "IOCPModel.h"
#include "MainDlg.h"
// 每一個處理器上產生多少個執行緒(為了最大限度的提升伺服器效能,詳見配套文件)
#define WORKER_THREADS_PER_PROCESSOR 2
// 同時投遞的Accept請求的數量(這個要根據實際的情況靈活設定)
#define MAX_POST_ACCEPT 10
// 傳遞給Worker執行緒的退出訊號
#define EXIT_CODE NULL
// 釋放指標和控制代碼資源的巨集
// 釋放指標巨集
#define RELEASE(x) {if(x != NULL ){delete x;x=NULL;}}
// 釋放控制代碼巨集
#define RELEASE_HANDLE(x) {if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}}
// 釋放Socket巨集
#define RELEASE_SOCKET(x) {if(x !=INVALID_SOCKET) { closesocket(x);x=INVALID_SOCKET;}}
CIOCPModel::CIOCPModel(void):
m_nThreads(0),
m_hShutdownEvent(NULL),
m_hIOCompletionPort(NULL),
m_phWorkerThreads(NULL),
m_strIP(DEFAULT_IP),
m_nPort(DEFAULT_PORT),
m_pMain(NULL),
m_lpfnAcceptEx( NULL ),
m_pListenContext( NULL )
{
}
CIOCPModel::~CIOCPModel(void)
{
// 確保資源徹底釋放
this->Stop();
}
///////////////////////////////////////////////////////////////////
// 工作者執行緒: 為IOCP請求服務的工作者執行緒
// 也就是每當完成埠上出現了完成資料包,就將之取出來進行處理的執行緒
///////////////////////////////////////////////////////////////////
DWORD WINAPI CIOCPModel::_WorkerThread(LPVOID lpParam)
{
THREADPARAMS_WORKER* pParam = (THREADPARAMS_WORKER*)lpParam;
CIOCPModel* pIOCPModel = (CIOCPModel*)pParam->pIOCPModel;
int nThreadNo = (int)pParam->nThreadNo;
pIOCPModel->_ShowMessage(_T("工作者執行緒啟動,ID: %d."),nThreadNo);
OVERLAPPED *pOverlapped = NULL;
PER_SOCKET_CONTEXT *pSocketContext = NULL;
DWORD dwBytesTransfered = 0;
// 迴圈處理請求,知道接收到Shutdown資訊為止
while (WAIT_OBJECT_0 != WaitForSingleObject(pIOCPModel->m_hShutdownEvent, 0))
{
BOOL bReturn = GetQueuedCompletionStatus(
pIOCPModel->m_hIOCompletionPort,
&dwBytesTransfered,
(PULONG_PTR)&pSocketContext,
&pOverlapped,
INFINITE);
// 如果收到的是退出標誌,則直接退出
if ( EXIT_CODE==(DWORD)pSocketContext )
{
break;
}
// 判斷是否出現了錯誤
if( !bReturn )
{
DWORD dwErr = GetLastError();
// 顯示一下提示資訊
if( !pIOCPModel->HandleError( pSocketContext,dwErr ) )
{
break;
}
continue;
}
else
{
// 讀取傳入的引數
PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped);
// 判斷是否有客戶端斷開了
if((0 == dwBytesTransfered) && ( RECV_POSTED==pIoContext->m_OpType || SEND_POSTED==pIoContext->m_OpType))
{
pIOCPModel->_ShowMessage( _T("客戶端 %s:%d 斷開連線."),inet_ntoa(pSocketContext->m_ClientAddr.sin_addr), ntohs(pSocketContext->m_ClientAddr.sin_port) );
// 釋放掉對應的資源
pIOCPModel->_RemoveContext( pSocketContext );
continue;
}
else
{
switch( pIoContext->m_OpType )
{
// Accept
case ACCEPT_POSTED:
{
// 為了增加程式碼可讀性,這裡用專門的_DoAccept函式進行處理連入請求
pIOCPModel->_DoAccpet( pSocketContext, pIoContext );
}
break;
// RECV
case RECV_POSTED:
{
// 為了增加程式碼可讀性,這裡用專門的_DoRecv函式進行處理接收請求
pIOCPModel->_DoRecv( pSocketContext,pIoContext );
}
break;
// SEND
// 這裡略過不寫了,要不程式碼太多了,不容易理解,Send操作相對來講簡單一些
case SEND_POSTED:
{
}
break;
default:
// 不應該執行到這裡
TRACE(_T("_WorkThread中的 pIoContext->m_OpType 引數異常.\n"));
break;
} //switch
}//if
}//if
}//while
TRACE(_T("工作者執行緒 %d 號退出.\n"),nThreadNo);
// 釋放執行緒引數
RELEASE(lpParam);
return 0;
}
//====================================================================================
//
// 系統初始化和終止
//
//====================================================================================
////////////////////////////////////////////////////////////////////
// 初始化WinSock 2.2
bool CIOCPModel::LoadSocketLib()
{
WSADATA wsaData;
int nResult;
nResult = WSAStartup(MAKEWORD(2,2), &wsaData);
// 錯誤(一般都不可能出現)
if (NO_ERROR != nResult)
{
this->_ShowMessage(_T("初始化WinSock 2.2失敗!\n"));
return false;
}
return true;
}
//////////////////////////////////////////////////////////////////
// 啟動伺服器
bool CIOCPModel::Start()
{
// 初始化執行緒互斥量
InitializeCriticalSection(&m_csContextList);
// 建立系統退出的事件通知
m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
// 初始化IOCP
if (false == _InitializeIOCP())
{
this->_ShowMessage(_T("初始化IOCP失敗!\n"));
return false;
}
else
{
this->_ShowMessage(_T("\nIOCP初始化完畢\n."));
}
// 初始化Socket
if( false==_InitializeListenSocket() )
{
this->_ShowMessage(_T("Listen Socket初始化失敗!\n"));
this->_DeInitialize();
return false;
}
else
{
this->_ShowMessage(_T("Listen Socket初始化完畢."));
}
this->_ShowMessage(_T("系統準備就緒,等候連線....\n"));
return true;
}
////////////////////////////////////////////////////////////////////
// 開始傳送系統退出訊息,退出完成埠和執行緒資源
void CIOCPModel::Stop()
{
if( m_pListenContext!=NULL && m_pListenContext->m_Socket!=INVALID_SOCKET )
{
// 啟用關閉訊息通知
SetEvent(m_hShutdownEvent);
for (int i = 0; i < m_nThreads; i++)
{
// 通知所有的完成埠操作退出
PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (DWORD)EXIT_CODE, NULL);
}
// 等待所有的客戶端資源退出
WaitForMultipleObjects(m_nThreads, m_phWorkerThreads, TRUE, INFINITE);
// 清除客戶端列表資訊
this->_ClearContextList();
// 釋放其他資源
this->_DeInitialize();
this->_ShowMessage(_T("停止監聽\n"));
}
}
////////////////////////////////
// 初始化完成埠
bool CIOCPModel::_InitializeIOCP()
{
// 建立第一個完成埠
m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
if ( NULL == m_hIOCompletionPort)
{
this->_ShowMessage(_T("建立完成埠失敗!錯誤程式碼: %d!\n"), WSAGetLastError());
return false;
}
// 根據本機中的處理器數量,建立對應的執行緒數
m_nThreads = WORKER_THREADS_PER_PROCESSOR * _GetNoOfProcessors();
// 為工作者執行緒初始化控制代碼
m_phWorkerThreads = new HANDLE[m_nThreads];
// 根據計算出來的數量建立工作者執行緒
DWORD nThreadID;
for (int i = 0; i < m_nThreads; i++)
{
THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER;
pThreadParams->pIOCPModel = this;
pThreadParams->nThreadNo = i+1;
m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread, (void *)pThreadParams, 0, &nThreadID);
}
TRACE(" 建立 _WorkerThread %d 個.\n", m_nThreads );
return true;
}
/////////////////////////////////////////////////////////////////
// 初始化Socket
bool CIOCPModel::_InitializeListenSocket()
{
// AcceptEx 和 GetAcceptExSockaddrs 的GUID,用於匯出函式指標
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
// 伺服器地址資訊,用於繫結Socket
struct sockaddr_in ServerAddress;
// 生成用於監聽的Socket的資訊
m_pListenContext = new PER_SOCKET_CONTEXT;
// 需要使用重疊IO,必須得使用WSASocket來建立Socket,才可以支援重疊IO操作
m_pListenContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if (INVALID_SOCKET == m_pListenContext->m_Socket)
{
this->_ShowMessage(_T("初始化Socket失敗,錯誤程式碼: %d.\n"), WSAGetLastError());
return false;
}
else
{
TRACE(_T("WSASocket() 完成.\n"));
}
// 將Listen Socket繫結至完成埠中
if( NULL== CreateIoCompletionPort( (HANDLE)m_pListenContext->m_Socket, m_hIOCompletionPort,(DWORD)m_pListenContext, 0))
{
this->_ShowMessage(_T("繫結 Listen Socket至完成埠失敗!錯誤程式碼: %d/n"), WSAGetLastError());
RELEASE_SOCKET( m_pListenContext->m_Socket );
return false;
}
else
{
TRACE(_T("Listen Socket繫結完成埠 完成.\n"));
}
// 填充地址資訊
ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress));
ServerAddress.sin_family = AF_INET;
// 這裡可以繫結任何可用的IP地址,或者繫結一個指定的IP地址
ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);
//ServerAddress.sin_addr.s_addr = inet_addr(CStringA(m_strIP).GetString());
ServerAddress.sin_port = htons(m_nPort);
// 繫結地址和埠
if (SOCKET_ERROR == bind(m_pListenContext->m_Socket, (struct sockaddr *) &ServerAddress, sizeof(ServerAddress)))
{
this->_ShowMessage(_T("bind()函式執行錯誤.\n"));
return false;
}
else
{
TRACE(_T("bind() 完成.\n"));
}
// 開始進行監聽
if (SOCKET_ERROR == listen(m_pListenContext->m_Socket,SOMAXCONN))
{
this->_ShowMessage(_T("Listen()函式執行出現錯誤.\n"));
return false;
}
else
{
TRACE(_T("Listen() 完成.\n"));
}
// 使用AcceptEx函式,因為這個是屬於WinSock2規範之外的微軟另外提供的擴充套件函式
// 所以需要額外獲取一下函式的指標,
// 獲取AcceptEx函式指標
DWORD dwBytes = 0;
if(SOCKET_ERROR == WSAIoctl(
m_pListenContext->m_Socket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&m_lpfnAcceptEx,
sizeof(m_lpfnAcceptEx),
&dwBytes,
NULL,
NULL))
{
this->_ShowMessage(_T("WSAIoctl 未能獲取AcceptEx函式指標。錯誤程式碼: %d\n"), WSAGetLastError());
this->_DeInitialize();
return false;
}
// 獲取GetAcceptExSockAddrs函式指標,也是同理
if(SOCKET_ERROR == WSAIoctl(
m_pListenContext->m_Socket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptExSockAddrs,
sizeof(GuidGetAcceptExSockAddrs),
&m_lpfnGetAcceptExSockAddrs,
sizeof(m_lpfnGetAcceptExSockAddrs),
&dwBytes,
NULL,
NULL))
{
this->_ShowMessage(_T("WSAIoctl 未能獲取GuidGetAcceptExSockAddrs函式指標。錯誤程式碼: %d\n"), WSAGetLastError());
this->_DeInitialize();
return false;
}
// 為AcceptEx 準備引數,然後投遞AcceptEx I/O請求
for( int i=0;i<MAX_POST_ACCEPT;i++ )
{
// 新建一個IO_CONTEXT
PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext();
if( false==this->_PostAccept( pAcceptIoContext ) )
{
m_pListenContext->RemoveContext(pAcceptIoContext);
return false;
}
}
this->_ShowMessage( _T("投遞 %d 個AcceptEx請求完畢"),MAX_POST_ACCEPT );
return true;
}
////////////////////////////////////////////////////////////
// 最後釋放掉所有資源
void CIOCPModel::_DeInitialize()
{
// 刪除客戶端列表的互斥量
DeleteCriticalSection(&m_csContextList);
// 關閉系統退出事件控制代碼
RELEASE_HANDLE(m_hShutdownEvent);
// 釋放工作者執行緒控制代碼指標
for( int i=0;i<m_nThreads;i++ )
{
RELEASE_HANDLE(m_phWorkerThreads[i]);
}
RELEASE(m_phWorkerThreads);
// 關閉IOCP控制代碼
RELEASE_HANDLE(m_hIOCompletionPort);
// 關閉監聽Socket
RELEASE(m_pListenContext);
this->_ShowMessage(_T("釋放資源完畢.\n"));
}
//====================================================================================
//
// 投遞完成埠請求
//
//====================================================================================
//////////////////////////////////////////////////////////////////
// 投遞Accept請求
bool CIOCPModel::_PostAccept( PER_IO_CONTEXT* pAcceptIoContext )
{
ASSERT( INVALID_SOCKET!=m_pListenContext->m_Socket );
// 準備引數
DWORD dwBytes = 0;
pAcceptIoContext->m_OpType = ACCEPT_POSTED;
WSABUF *p_wbuf = &pAcceptIoContext->m_wsaBuf;
OVERLAPPED *p_ol = &pAcceptIoContext->m_Overlapped;
// 為以後新連入的客戶端先準備好Socket( 這個是與傳統accept最大的區別 )
pAcceptIoContext->m_sockAccept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if( INVALID_SOCKET==pAcceptIoContext->m_sockAccept )
{
_ShowMessage(_T("建立用於Accept的Socket失敗!錯誤程式碼: %d"), WSAGetLastError());
return false;
}
// 投遞AcceptEx
if(FALSE == m_lpfnAcceptEx( m_pListenContext->m_Socket, pAcceptIoContext->m_sockAccept, p_wbuf->buf, p_wbuf->len - ((sizeof(SOCKADDR_IN)+16)*2),
sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, &dwBytes, p_ol))
{
if(WSA_IO_PENDING != WSAGetLastError())
{
_ShowMessage(_T("投遞 AcceptEx 請求失敗,錯誤程式碼: %d"), WSAGetLastError());
return false;
}
}
return true;
}
////////////////////////////////////////////////////////////
// 在有客戶端連入的時候,進行處理
// 流程有點複雜,你要是看不懂的話,就看配套的文件吧....
// 如果能理解這裡的話,完成埠的機制你就消化了一大半了
// 總之你要知道,傳入的是ListenSocket的Context,我們需要複製一份出來給新連入的Socket用
// 原來的Context還是要在上面繼續投遞下一個Accept請求
//
bool CIOCPModel::_DoAccpet( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext )
{
SOCKADDR_IN* ClientAddr = NULL;
SOCKADDR_IN* LocalAddr = NULL;
int remoteLen = sizeof(SOCKADDR_IN), localLen = sizeof(SOCKADDR_IN);
///////////////////////////////////////////////////////////////////////////
// 1. 首先取得連入客戶端的地址資訊
// 這個 m_lpfnGetAcceptExSockAddrs 不得了啊~~~~~~
// 不但可以取得客戶端和本地端的地址資訊,還能順便取出客戶端發來的第一組資料,老強大了...
this->m_lpfnGetAcceptExSockAddrs(pIoContext->m_wsaBuf.buf, pIoContext->m_wsaBuf.len - ((sizeof(SOCKADDR_IN)+16)*2),
sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, (LPSOCKADDR*)&LocalAddr, &localLen, (LPSOCKADDR*)&ClientAddr, &remoteLen);
this->_ShowMessage( _T("客戶端 %s:%d 連入."), inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port) );
this->_ShowMessage( _T("客戶額 %s:%d 資訊:%s."),inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port),pIoContext->m_wsaBuf.buf );
//////////////////////////////////////////////////////////////////////////////////////////////////////
// 2. 這裡需要注意,這裡傳入的這個是ListenSocket上的Context,這個Context我們還需要用於監聽下一個連線
// 所以我還得要將ListenSocket上的Context複製出來一份為新連入的Socket新建一個SocketContext
PER_SOCKET_CONTEXT* pNewSocketContext = new PER_SOCKET_CONTEXT;
pNewSocketContext->m_Socket = pIoContext->m_sockAccept;
memcpy(&(pNewSocketContext->m_ClientAddr), ClientAddr, sizeof(SOCKADDR_IN));
// 引數設定完畢,將這個Socket和完成埠繫結(這也是一個關鍵步驟)
if( false==this->_AssociateWithIOCP( pNewSocketContext ) )
{
RELEASE( pNewSocketContext );
return false;
}
///////////////////////////////////////////////////////////////////////////////////////////////////
// 3. 繼續,建立其下的IoContext,用於在這個Socket上投遞第一個Recv資料請求
PER_IO_CONTEXT* pNewIoContext = pNewSocketContext->GetNewIoContext();
pNewIoContext->m_OpType = RECV_POSTED;
pNewIoContext->m_sockAccept = pNewSocketContext->m_Socket;
// 如果Buffer需要保留,就自己拷貝一份出來
//memcpy( pNewIoContext->m_szBuffer,pIoContext->m_szBuffer,MAX_BUFFER_LEN );
// 繫結完畢之後,就可以開始在這個Socket上投遞完成請求了
if( false==this->_PostRecv( pNewIoContext) )
{
pNewSocketContext->RemoveContext( pNewIoContext );
return false;
}
/////////////////////////////////////////////////////////////////////////////////////////////////
// 4. 如果投遞成功,那麼就把這個有效的客戶端資訊,加入到ContextList中去(需要統一管理,方便釋放資源)
this->_AddToContextList( pNewSocketContext );
////////////////////////////////////////////////////////////////////////////////////////////////
// 5. 使用完畢之後,把Listen Socket的那個IoContext重置,然後準備投遞新的AcceptEx
pIoContext->ResetBuffer();
return this->_PostAccept( pIoContext );
}
////////////////////////////////////////////////////////////////////
// 投遞接收資料請求
bool CIOCPModel::_PostRecv( PER_IO_CONTEXT* pIoContext )
{
// 初始化變數
DWORD dwFlags = 0;
DWORD dwBytes = 0;
WSABUF *p_wbuf = &pIoContext->m_wsaBuf;
OVERLAPPED *p_ol = &pIoContext->m_Overlapped;
pIoContext->ResetBuffer();
pIoContext->m_OpType = RECV_POSTED;
// 初始化完成後,,投遞WSARecv請求
int nBytesRecv = WSARecv( pIoContext->m_sockAccept, p_wbuf, 1, &dwBytes, &dwFlags, p_ol, NULL );
// 如果返回值錯誤,並且錯誤的程式碼並非是Pending的話,那就說明這個重疊請求失敗了
if ((SOCKET_ERROR == nBytesRecv) && (WSA_IO_PENDING != WSAGetLastError()))
{
this->_ShowMessage(_T("投遞第一個WSARecv失敗!"));
return false;
}
return true;
}
/////////////////////////////////////////////////////////////////
// 在有接收的資料到達的時候,進行處理
bool CIOCPModel::_DoRecv( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext )
{
// 先把上一次的資料顯示出現,然後就重置狀態,發出下一個Recv請求
SOCKADDR_IN* ClientAddr = &pSocketContext->m_ClientAddr;
this->_ShowMessage( _T("收到 %s:%d 資訊:%s"),inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port),pIoContext->m_wsaBuf.buf );
// 然後開始投遞下一個WSARecv請求
return _PostRecv( pIoContext );
}
/////////////////////////////////////////////////////
// 將控制代碼(Socket)繫結到完成埠中
bool CIOCPModel::_AssociateWithIOCP( PER_SOCKET_CONTEXT *pContext )
{
// 將用於和客戶端通訊的SOCKET繫結到完成埠中
HANDLE hTemp = CreateIoCompletionPort((HANDLE)pContext->m_Socket, m_hIOCompletionPort, (DWORD)pContext, 0);
if (NULL == hTemp)
{
this->_ShowMessage(_T("執行CreateIoCompletionPort()出現錯誤.錯誤程式碼:%d"),GetLastError());
return false;
}
return true;
}
//====================================================================================
//
// ContextList 相關操作
//
//====================================================================================
//////////////////////////////////////////////////////////////
// 將客戶端的相關資訊儲存到陣列中
void CIOCPModel::_AddToContextList( PER_SOCKET_CONTEXT *pHandleData )
{
EnterCriticalSection(&m_csContextList);
m_arrayClientContext.Add(pHandleData);
LeaveCriticalSection(&m_csContextList);
}
////////////////////////////////////////////////////////////////
// 移除某個特定的Context
void CIOCPModel::_RemoveContext( PER_SOCKET_CONTEXT *pSocketContext )
{
EnterCriticalSection(&m_csContextList);
for( int i=0;i<m_arrayClientContext.GetCount();i++ )
{
if( pSocketContext==m_arrayClientContext.GetAt(i) )
{
RELEASE( pSocketContext );
m_arrayClientContext.RemoveAt(i);
break;
}
}
LeaveCriticalSection(&m_csContextList);
}
////////////////////////////////////////////////////////////////
// 清空客戶端資訊
void CIOCPModel::_ClearContextList()
{
EnterCriticalSection(&m_csContextList);
for( int i=0;i<m_arrayClientContext.GetCount();i++ )
{
delete m_arrayClientContext.GetAt(i);
}
m_arrayClientContext.RemoveAll();
LeaveCriticalSection(&m_csContextList);
}
//====================================================================================
//
// 其他輔助函式定義
//
//====================================================================================
////////////////////////////////////////////////////////////////////
// 獲得本機的IP地址
CString CIOCPModel::GetLocalIP()
{
// 獲得本機主機名
char hostname[MAX_PATH] = {0};
gethostname(hostname,MAX_PATH);
struct hostent FAR* lpHostEnt = gethostbyname(hostname);
if(lpHostEnt == NULL)
{
return DEFAULT_IP;
}
// 取得IP地址列表中的第一個為返回的IP(因為一臺主機可能會繫結多個IP)
LPSTR lpAddr = lpHostEnt->h_addr_list[0];
// 將IP地址轉化成字串形式
struct in_addr inAddr;
memmove(&inAddr,lpAddr,4);
m_strIP = CString( inet_ntoa(inAddr) );
return m_strIP;
}
///////////////////////////////////////////////////////////////////
// 獲得本機中處理器的數量
int CIOCPModel::_GetNoOfProcessors()
{
SYSTEM_INFO si;
GetSystemInfo(&si);
return si.dwNumberOfProcessors;
}
/////////////////////////////////////////////////////////////////////
// 在主介面中顯示提示資訊
void CIOCPModel::_ShowMessage(const CString szFormat,...) const
{
// 根據傳入的引數格式化字串
CString strMessage;
va_list arglist;
// 處理變長引數
va_start(arglist, szFormat);
strMessage.FormatV(szFormat,arglist);
va_end(arglist);
// 在主介面中顯示
CMainDlg* pMain = (CMainDlg*)m_pMain;
if( m_pMain!=NULL )
{
pMain->AddInformation(strMessage);
TRACE( strMessage+_T("\n") );
}
}
/////////////////////////////////////////////////////////////////////
// 判斷客戶端Socket是否已經斷開,否則在一個無效的Socket上投遞WSARecv操作會出現異常
// 使用的方法是嘗試向這個socket傳送資料,判斷這個socket呼叫的返回值
// 因為如果客戶端網路異常斷開(例如客戶端崩潰或者拔掉網線等)的時候,伺服器端是無法收到客戶端斷開的通知的
bool CIOCPModel::_IsSocketAlive(SOCKET s)
{
int nByteSent=send(s,"",0,0);
if (-1 == nByteSent) return false;
return true;
}
///////////////////////////////////////////////////////////////////
// 顯示並處理完成埠上的錯誤
bool CIOCPModel::HandleError( PER_SOCKET_CONTEXT *pContext,const DWORD& dwErr )
{
// 如果是超時了,就再繼續等吧
if(WAIT_TIMEOUT == dwErr)
{
// 確認客戶端是否還活著...
if( !_IsSocketAlive( pContext->m_Socket) )
{
this->_ShowMessage( _T("檢測到客戶端異常退出!") );
this->_RemoveContext( pContext );
return true;
}
else
{
this->_ShowMessage( _T("網路操作超時!重試中...") );
return true;
}
}
// 可能是客戶端異常退出了
else if( ERROR_NETNAME_DELETED==dwErr )
{
this->_ShowMessage( _T("檢測到客戶端異常退出!") );
this->_RemoveContext( pContext );
return true;
}
else
{
this->_ShowMessage( _T("完成埠操作出現錯誤,執行緒退出。錯誤程式碼:%d"),dwErr );
return false;
}
}
參考連結:http://blog.csdn.net/piggyxp/article/details/6922277
2017.07.06