Windows socket之IO完成埠(IOCP)模型開發
Windows socket之IO完成埠(IOCP)模型開發
IO完成埠是一種核心物件。利用完成埠,套接字應用程式能夠管理數百上千個套接字。應用程式建立完成埠物件後,通過指定一定數量的服務執行緒,為已經完成的重疊IO操作提供服務。該模型可以達到最後的系統性能。
完成埠是一種真正意義上的非同步模型。在重疊IO模型中,當Windows socket應用程式在呼叫WSARecv函式後立即返回,執行緒繼續執行。另一執行緒在在完成埠等待操作結果,當系統接收資料完成後,會向完成埠傳送通知,然後應用程式對資料進行處理。
為了將Windows打造成一個出色的伺服器環境,
伺服器有兩種執行緒模型:序列和併發模型。
序列模型:單個執行緒等待客戶端請求。當請求到來時,該執行緒被喚醒來處理請求。但是當多個客戶端同時向伺服器發出請求時,這些請求必須依次被請求。
併發模型:單個執行緒等待請求到來。當請求到來時,會建立新執行緒來處理。但是隨著更多的請求到來必須建立更多的執行緒。這會導致系統核心進行上下文切換花費更多的時間。執行緒無法即時響應客戶請求。伴隨著不斷有客戶端請求、退出,系統會不斷新建和銷燬執行緒,這同樣會增加系統開銷。
而IO完成埠卻可以很好的解決以上問題。它的目標就是實現高效伺服器程式。
與重疊IO相比較
重疊IO與IO完成埠模型都是非同步模型。都可以改善程式效能。但是它們也有以下區別:
1:在重疊IO使用事件通知時,WSAWaitForMultipleEvents只能等待WSA_MAXIMUM_WAIT_EVENTS(64)個事件。這限制了伺服器提供服務的客戶端的數量。
2:事件物件、套接字和WSAOVERLAPPED結構必須一一對應關係,如果出現一點疏漏將會導致嚴重的後果。
完成埠模型實現包括以下步驟:
1:建立完成埠
2:將套接字與完成埠關聯。
3:呼叫輸入輸出函式,發起重疊IO操作。
4:在服務執行緒中,等待完成埠重疊IO操作結果。
雖然在《談談Windows核心程式設計系列》十非同步IO之IO完成埠博文中,已經詳細介紹了IO完成埠的方方面面.但是處於完整性的考慮,此處介紹與Windows socket IO完成埠開發有關的內容再介紹一遍。
一:建立IO完成埠 IO完成埠也是一個核心物件。呼叫以下函式建立IO完成埠核心物件。
1 HANDLE CreateIoCompletionPort(
2
3 HANDLE hFile,
4
5 HANDLE hExistingCompletionPort,
6
7 ULONG_PTR CompletionKey,
8
9 DWORD dwNumberOfConcurrentThreads);
這個函式會完成兩個任務:
一是建立一個IO完成埠物件。
二是將一個裝置與一個IO完成埠關聯起來。
hFile就是裝置控制代碼。在本文中就是套接字。
hExistingCompletionPort是與裝置關聯的IO完成埠控制代碼。為NULL時,系統會建立新的完成埠。
dwCompletionKey是一個對我們有意義的值,但是作業系統並不關心我們傳入的值。一般用它來區分各個裝置。
dwNumberOfConcurrentThreads告訴IO完成埠在同一時間最多能有多少程序處於可執行狀態。如果傳入0,那麼將使用預設值(併發的執行緒數量等於cpu數量)。
每次呼叫CreateIoCompletionPort時,函式會判斷hExistingCompletionKey是否為NULL,如果為NULL,會建立新的完成埠核心物件。併為此完成埠建立裝置列表然後將裝置加入到此完成埠裝置列表中(先入先出)。
如果CreateIoCompletionPort呼叫成功則返回完成埠的控制代碼。否則返回NULL。
一般情況下,分兩次呼叫這個函式,每次實現一個功能。
首先建立新的完成埠(不關聯裝置):此時hFile應為INVALID_HANDLE_VALUE。
ExistingCompletionPort為NULL。
hIOPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
第二步:將套接字與IO完成埠關聯CreateIoCompletionPort(sListenSocket,hIOPort,完成鍵,0);
呼叫此函式即告訴系統:當IO操作完成時,想完成埠傳送一個IO操作完成通知。這些通知按照FIFO 方式在完成佇列中等待服務執行緒讀取。
在利用IO完成埠開發套接字應用程式時,通常宣告一個結構體儲存與套接字相關的資訊。該結構通常作為完成鍵傳遞給CreateIoCompletionPort用以區分與套接字相關的資訊。我們可以給完成鍵傳入任何對我們有用的資訊,一般情況下都是傳入一個結構的地址。如可以定義以下結構,:
typedef struct _completionKey
{
SOCKET s;
SOCKADDR_IN clientAddr;
}COMPLETIONKEY,*PCOMPLETIONKEY;
作為完成鍵傳遞給CreateIoCompletionPort程式碼如下:
PCOMPLETIONPKEY pCompletionKey=new COMPLETIONKEY;
SOCKADDR_IN addr;
int len;
sAccept=accept(sListen,(SOCKADDR*)&addr,&len);
pCompletionKey->s=sAccept;
pCompletionKey->clientAddr=addr;
HANDLE h=CreateIoCompletionPort((HANDLE)sAccept,
hIOPort,
(DWORD)pCompletionKey,
0);
3:發起重疊IO操作
將套接字與IO完成埠關聯後,應用程式可以呼叫以下函式,發起重疊IO操作:
WSASend和WSASendTo:傳送資料。
WSARecv和WSARecvFrom:接收資料。
在應用程式中通常宣告一個和IO操作相關的結構體,它是WSAOVERLAPPED結構的擴充套件。用以儲存每一次IO操作的相關資訊。該結構定義如下:
typdef struct _io_operation_data
{
WSAOVERLAPPED overlapped;
WSABUF dataBuf;
CHAR buffer[BUFFER_SIZE];
}IO_OPERATION_DATA;
除了上面這一種方法:將WSOVERLAPPED結構作為IO_OPERATION_DATAA的第一個成員外,還可以將IO_OPERATION_DATA結構繼承自WSAOVERLAPPED結構。效果是一樣的。
下列程式碼演示了呼叫WSARecv發起非同步接收資料的過程,程式清單如下:
IO_OPERATION_DATA *pIoData=new IO_OPERATION_DATA;
pIoData->dataBuf=pIoData->buffer;
pIoData->dataBuf.len=BUFFER_SIZE;
ZeroMemory(&pIoData->overlapped,sizeof(WSAOVERLAPPED));
if(WSARecv(sAccept,&(pIo->dataBuf),1,&recvBytes,&flags,&(pIoData->overlapped),NULL)==SOCKET_ERROR)
{
if(WSAGetLastError()!=ERROR_IO_PENDING)
{
return ;
}
}
4:等待重疊IO操作結果:
服務執行緒啟動後,呼叫GetQueuedCompletionStatus函式等待重疊IO操作的完成結果。當重疊IO操作完成時,IO操作完成通知被髮送到完成埠上,此時函式返回。
GetQueuedCompletionStatus函式從完成埠完成佇列中取出一個完成項。完成佇列為空則等待。該函式宣告如下:
1 BOOL GetQueuedCompletionStatus(
2
3 HANDLE hCompletionPort,
4
5 PDWORD pdwNumberOfBytesTransferred,
6
7 ULONG_PTR pCompletionKey,
8
9 OVERLAPPED** ppOverlapped,
10
11 DWORD dwMilliSeconds);
hCompletionPort表示執行緒希望對哪個完成埠進行監視,GetQueuedCompletionStatus的任務就是將呼叫執行緒切換到睡眠狀態,也就是阻塞在此函式上,直到指定的IO完成端口出現一項或者超時。
pdwNumberOfBytesTransferred返回在非同步IO完成時傳輸的位元組數。
pCompletionKey返回完成鍵。
ppOverlapped返回非同步IO開始時傳入的OVERLAPPED結構地址。
dwMillisecond指定等待時間。
函式執行成功則返回true,否則返回false。
如果在完成埠上成功等待一個完成項的到來,則函式返回TRUE。此時lpNumberOfBytesTransferred,lpCompletionKey和lpOverlapped引數返回相關資訊。一般從lpCompletionKey和lpOverlapped獲得與本次IO相關的資訊。
如果在完成埠等待失敗,則返回false,此時lpOverlapped不為NULL。如果等待超時,則返回false,錯誤程式碼為WAIT_TIMEOUT。
綜上,在使用完成埠開發Windows socket應用程式時,一般需要定義兩種資料結構:完成鍵和擴充套件的WSAOVERLAPPED結構。完成鍵儲存與套接字有關的資訊。在GetQueuedCompletionStatus返回時可以通過該引數獲取套接字的相關資訊。這用於區分不同裝置。
擴充套件的WSAOVERLAPPED結構,儲存每次發起IO操作時IO操作相關的資訊。當GetQueuedCompletionStatus返回時通過該引數獲取套接字的IO操作相關資訊。
下面展示GetQueuedCompletionStatus函式的用法:
PCOMPLETIONKEY pCompletionKey;
DWORD dwNumberOfBytesTransferrd;
LPOVERLAPPED pOverlapped;
bool ret=GetQueuedCompletionStatus(hIOPort,&dwNumberOfBytesTransferred,(LPDWORD)pCompletionKey,&pOverlapped,100);
if(ret)
{
//等待成功。
}
else
{
int err=WSAGetLastError();
if(NULL!=pOverlapped)
{
//失敗的IO操作。
}
else if(ret==WAIT_TIMEOUT)
{
//超時。
}
}
5:取消非同步操作。
當關閉套接字時,如果此時系統還有未完成的非同步操作,應用程式可以呼叫CancelIo函式取消等待執行的非同步操作。函式宣告如下:
bool CancelIo(HANDLE hFile);
如果函式呼叫成功,返回TRUE,所有在此套接字上等待的非同步操作都被成功的取消。
投遞完成通知
當伺服器退出,應用程式可以呼叫PostQueuedCompletionStatus函式向伺服器傳送一個特殊的完成通知。伺服器收到通知後即退出。
該函式宣告如下:
1 BOOL PostQueuedCompletionStatus(
2
3 HANDLE hCompletionPort,
4
5 DWORD dwNumBytes,
6
7 ULONG_PTR CompletionKey,
8
9 OVERLAPPED*pOverlapped);
這個函式用來將已完成的IO通知追加到IO完成埠的佇列中。
該函式的四個引數與GetQueuedCompletionStatus的函式相同。
hCompletionPort表示我們要將已完成的IO項新增到哪個完成埠的佇列中。
當應用程式退出時,可以指定該函式的後三個中的一個或多個為某個特殊值。
下面的程式碼段演示利用PostQueuedCompletionStatus函式傳送退出通知的過程:
PCOMPLETIONKEY pCompletionKey;
LPOVERLAPPED pOverlapped;
PostQueuedCompletionStatus(hIOPort,0,0,NULL);
bool ret=GetQueuedCompletionStatus(hIOPort,
&dwNumberOfBytesTransferred,
(LPDWORD)pCompletionKey,
&pOverlapped,
100);
if(NULL==pOverlapped&&NULL==pCompletionKey)
{
//伺服器退出。
}
利用完成埠開發應用程式可以按一下步驟進行:
1:呼叫CreateIoCompletionPort建立完成埠。
2:建立服務執行緒。
3:接受客戶端請求;
4:宣告完成鍵結構,它包含客戶端套接字資訊。
5:呼叫CreateIoCompletionPort將套接字與完成埠關聯起來。並傳入完成鍵。
6:宣告IO操作結構,它包含每次重疊IO時的操作資訊。如WSAOVERLAPPED結構,WSADATA結構等。
7:在服務執行緒中,呼叫GetQueuedCompletionStatus函式等待IO操作結果
完成埠模型開發伺服器程式:
為了簡單起見,重點突出如何利用完成埠模型開發Windows socket程式框架,伺服器會將接收到的客戶端的資料原封不動的返回給客戶端。
IO完成埠是非同步IO的一種。它是在處理與IO有關的操作時起作用。因此一個完整的伺服器程式,除了採用IOCP管理IO操作外,還必須使用前面介紹的幾種機制對套接字進行監聽。可以使用阻塞模式或者是WSAEventSelect模型管理。具體可以參考另一片博文《使用IOCP開發駕照考試系統》。
伺服器分為兩個執行緒。主執行緒和伺服器執行緒。主執行緒用於接收客戶端的連線請求(可以採用阻塞模式,或WSAEventSelect模式),並初始化重疊IO操作。伺服器執行緒用於為客戶端提供服務。
一:定義IO操作資料結構。在該結構中包含了OVERLAPPED,WSADATA和緩衝區,除此之外還包括IO操作型別。它指明發起IO操作的型別。根據GetQueuedCompletionStatus函式返回的重疊結構指標,可以獲取當前完成的IO操作型別,從而知道完成的是什麼IO操作。
typedef struct _io_operation_data
{
OVERLAPPED overlapped;
WSABUF dataBuf;
char buffer[BUFFER_SIZE];
char IoType;//IO操作型別:如READ或WRITE。
BYTE len;//實際傳輸的資料長度。
}IO_OPERATION_DATA;
二:定義CClient類。
CClient類用以管理伺服器接受的套接字,並執行與客戶端通訊的任務。CClient的建構函式引數為伺服器接受的套接字和客戶端地址。在解構函式中將這個套接字關閉。CClient的Recv用以接收資料,Send函式用以傳送資料。在該類中定義了一個IO_OPERATION_DATA型別m_io變數,儲存客戶端IO操作的相關資訊。可以為接收和傳送操作定義不同的IO_OPERATION_DATA變數。
#pragma once
typedef struct _io_operation_data
{
OVERLAPPED overlapped;
WSABUF dataBuf;
char buffer[BUFFER_SIZE];
char IoType;//IO操作型別:如READ或WRITE。
BYTE len;//實際傳輸的資料長度。
}IO_OPERATION_DATA;
由於GetQueuedCompletionStatus函式會等待成功接收和傳送非同步IO,因此為了區分接收和傳送,在WSAOVERLAPPED擴充套件結構,加一個IoType成員。這樣在GetQueuedCompletionStatus函式返回時,就可以根據此成員知道是接收或傳送非同步IO完成。
由於overlapped為該結構的第一個成員,因此overlapped的地址與IO_OPERATION_DATA結構地址相同。根據這一原理,在呼叫WSARecv時第六個引數可以傳入IO_OPERATION_DATA的地址。這樣在GetQueuedCompletionStatus函式返回時就可以根據返回的OVERLAPPED的地址得到IO_OPERATION_DATA的地址。另外一種實現方式是讓IO_OPERATION_DATA從OVERLAPPED結構上繼承而來。
CClient類宣告如下:
class CClient
{
public:
CClient(void);
~CClient(void);
public:
bool Recv();
bool Send();
public:
SOCKET m_s;
SOCKADDR_IN m_addr;
IO_OPERATION_DATA m_IoRecv;
IO_OPERATION_DATA m_IoSend;
}
Recv函式實現發起非同步操作接收資料功能。在該函式中首先設定當前IO操作型別為READ,然後呼叫WSARecv函式接收資料,最後判斷函式返回值是否為ERROR_IO_PENDING。如是,說明接收資料未能立即完成,即成功發起一個非同步接收資料操作:
Recv函式定義如下:
bool CClient::AsyRecv()
{
WSABUF wsabuf;
ZeroMemory(&m_IoRecv,sizeof(IO_OPERATION_DATA));
m_IoRecv.IOType=IOReadHead;
wsabuf.buf=(char*)&m_IoRecv.hdr;
wsabuf.len=sizeof(HDR);
DWORD flag=0;
int ret=WSARecv(m_s,&wsabuf,1,NULL,&flag,&m_IoRecv.overlapped,NULL);
if(ret==SOCKET_ERROR)
{
int err=WSAGetLastError();
if(err!=WSA_IO_PENDING)
{
return false;
}
}
return true;
}
Send函式定義:
bool CClient::AsySend()
{
ZeroMemory(&m_IoSend,sizeof(IO_OPERATION_DATA));
WSABUF wsabuf[2];
m_IoSend.IOType=IOWriteName;
m_IoSend.hdr.Len=m_StuName.GetLength();
m_IoSend.hdr.PacketType=STUNAME;
//傳送包頭。
wsabuf[0].buf=(char*)&m_IoSend.hdr;
wsabuf[0].len=sizeof(HDR);
//送包體。
wsabuf[1].buf=m_StuName.GetBuffer();
wsabuf[1].len=m_StuName.GetLength();
DWORD flag=0;
int ret=WSASend(m_s,wsabuf,2,NULL,flag,&m_IoSend.overlapped,NULL);
if(ret==SOCKET_ERROR)
{
int err=WSAGetLastError();
if(err!=WSA_IO_PENDING)
{
return false;
}
}
return true;
}
子執行緒用於迴圈呼叫GetQueuedCompletionStatus函式。等待接收或傳送非同步IO通知。看程式碼:
DWORD WINAPI CIOCPDriverLisenceExamServerView::ServiceThread( PVOID ppram )
{
CIOCPDriverLisenceExamServerView*pServer=(CIOCPDriverLisenceExamServerView*)ppram;
LPOVERLAPPED lpoverlapped;
CClient *pClient;
DWORD transferred;
while(pServer->m_IsRunning)
{
bool ret=GetQueuedCompletionStatus(pServer->m_hIOCP,&transferred,(LPDWORD)&pClient,&lpoverlapped,WSA_INFINITE);
if(ret&&lpoverlapped&&pClient)//成功的非同步IO完成。根據從lpoverlapped中得到的型別,進行操作。
{
IO_OPERATION_DATA*pIO=(IO_OPERATION_DATA*)lpoverlapped;
switch(pIO->IOType)
{
case IOReadHead:
{
pClient->AsyRecvHeaderCompleted();
}
break;
case IOReadBody:
{
pClient->AsyRecvBodyCompleted();
}
break;
case IOWritePaper:
{
//試卷傳送完畢。不執行動作。
pServer->UpdateClientState(pClient,CClient::LOGIN);
}
break;
case IOWriteName:
{
pClient->AsySendPaper();
}
break;
case IOWriteUnLogin:
{
g_clientManager.deleteClient(pClient);
}
break;
default:
break;
}
}
}
return 0;
}
監聽套接字使用WSAEventSelect模型管理。在子執行緒內迴圈呼叫WSAWaitForMultipleEvents函式。
看程式碼:
DWORD WINAPI CIOCPDriverLisenceExamServerView::AcceptClientThread( PVOID ppram )
{
CIOCPDriverLisenceExamServerView*pServer=(CIOCPDriverLisenceExamServerView*)ppram;
SOCKADDR_IN addr;
int len=sizeof(addr);
while(pServer->m_IsRunning)
{
int ret=WSAWaitForMultipleEvents(1,&pServer->m_hEvent,false,WSA_INFINITE,false);
if(ret==WSA_WAIT_TIMEOUT)
{
continue;
}
else
{
WSANETWORKEVENTS events;
int r=WSAEnumNetworkEvents(pServer->m_sListenSocket,pServer->m_hEvent,&events);//重置事件物件。
if(r==SOCKET_ERROR)
{
break;
}
if(events.lNetworkEvents&FD_ACCEPT)
{
if(events.iErrorCode[FD_ACCEPT_BIT]==0)//發生FD_ACCEPT網路事件。接受客戶端請求。
{
SOCKET sAccept=WSAAccept(pServer->m_sListenSocket,(SOCKADDR*)&addr,&len,0,NULL);
CClient*pClient=new CClient(sAccept,pServer);
if(CreateIoCompletionPort((HANDLE)sAccept,pServer->m_hIOCP,(ULONG_PTR)pClient,0)==NULL)
return -1;
g_clientManager.addClient(pClient);
//呼叫接收資料非同步IO。
if(!pClient->AsyRecvHeader())
g_clientManager.deleteClient(pClient);//接收資料失敗後,將此客戶端從連結串列刪除。
}
}
}
}
return 0;
}
如有紕漏,請不吝賜教。轉載請註明出處 !!謝謝
2012.1.21于山西大同