1. 程式人生 > >比較epoll與iocp的吞吐能力比較

比較epoll與iocp的吞吐能力比較

環境:機器xp系統 CPUi5-3470 + 4G記憶體, 在此基礎上安裝的虛擬機器vmware10(ubuntu)
epoll: 虛擬機器+ubuntu14,網絡卡顯示為1000M,單執行緒
client: 單執行緒

iocp:xp,網絡卡顯示為100M,多執行緒(沒有測試其執行緒池模式)
相同的網路協議dataLen(int)+data(N位元組)。
client: 多執行緒

接收到的資料不處理,保確定每次傳送的資料頭長度是否被覆蓋,程式實現的思路基本差不多,只測試兩個模式的吞吐能力。
測試結果:
iocp: 
server                                    client
接近400MB/s,  CPU:18%        CPU:80%
epoll:   
1000MB/s,      CPU:2%           CPU:2%(800KB/次)
20MB/s,          CPU:1%           CPU:1%(20KB/次)

多個Client時,epoll接收資料成倍增加。

也許作業系統、網絡卡速率不同,差異很多,感覺做伺服器,Linux有天生的優勢.
當每次發的資料包比較小時,Server端的接收速率上不去,大概在20MB/s多點.

iocp server:
// IOCP_TCPIP_Socket_Server.cpp   
  
#include    
#include    
#include    
#include    
  
using namespace std;  
  
#pragma comment(lib, "Ws2_32.lib")      // Socket程式設計需用的動態連結庫   
#pragma comment(lib, "Kernel32.lib")    // IOCP需要用到的動態連結庫   
  
/** 
 * 結構體名稱:PER_IO_DATA 
 * 結構體功能:重疊I/O需要用到的結構體,臨時記錄IO資料 
 **/  
 
#define BUFFSIZE 1024*4
#define SENDSIZE  1024*1

typedef struct{
 double nDataTotal;
 float fTotalLast;
 int  nActCount;
 int  nLastLen;
 int  TotalSize;
 int  ReadSize;
 DWORD tBegin;
 DWORD tEnd;
 DWORD tDiff;
}RecvInfo;

typedef struct  
{  
    OVERLAPPED overlapped;  
    WSABUF databuff;  
    char buffer[ BUFFSIZE ];
 char *pHeader;
    int TotalSize;
 int ReadSize;
    int operationType;  
}PER_IO_OPERATEION_DATA, *LPPER_IO_OPERATION_DATA, *LPPER_IO_DATA, PER_IO_DATA;  
  
/** 
 * 結構體名稱:PER_HANDLE_DATA 
 * 結構體儲存:記錄單個套接字的資料,包括了套接字的變數及套接字的對應的客戶端的地址。 
 * 結構體作用:當伺服器連線上客戶端時,資訊儲存到該結構體中,知道客戶端的地址以便於回訪。 
 **/  
typedef struct  
{  
    SOCKET socket;  
    SOCKADDR_STORAGE ClientAddr;  
}PER_HANDLE_DATA, *LPPER_HANDLE_DATA; 

char *g_pRecvData;
// 定義全域性變數   
const int DefaultPort = 20000;         
vector < PER_HANDLE_DATA* > clientGroup;      // 記錄客戶端的向量組   
  
HANDLE hMutex = CreateMutex(NULL, FALSE, NULL);  
DWORD WINAPI ServerWorkThread(LPVOID CompletionPortID);  
DWORD WINAPI ServerSendThread(LPVOID IpParam);  
void  WINAPI IOWorkDummy(DWORD ExeCode, DWORD IOSize, LPOVERLAPPED pOVL);

RecvInfo g_info;
// 開始主函式   
int main()  
{  
// 載入socket動態連結庫   
    WORD wVersionRequested = MAKEWORD(2, 2); // 請求2.2版本的WinSock庫   
    WSADATA wsaData;    // 接收Windows Socket的結構資訊   
    DWORD err = WSAStartup(wVersionRequested, &wsaData);  
  
    if (0 != err){  // 檢查套接字型檔是否申請成功   
        cerr << "Request Windows Socket Library Error!\n";  
        system("pause");  
        return -1;  
    }  
    if(LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2){// 檢查是否申請了所需版本的套接字型檔   
        WSACleanup();  
        cerr << "Request Windows Socket Version 2.2 Error!\n";  
        system("pause");  
        return -1;  
    }  
  
// 建立IOCP的核心物件   
    /** 
     * 需要用到的函式的原型: 
     * HANDLE WINAPI CreateIoCompletionPort( 
     *    __in   HANDLE FileHandle,     // 已經開啟的檔案控制代碼或者空控制代碼,一般是客戶端的控制代碼 
     *    __in   HANDLE ExistingCompletionPort, // 已經存在的IOCP控制代碼 
     *    __in   ULONG_PTR CompletionKey,   // 完成鍵,包含了指定I/O完成包的指定檔案 
     *    __in   DWORD NumberOfConcurrentThreads // 真正併發同時執行最大執行緒數,一般推介是CPU核心數*2 
     * ); 
     **/  
    HANDLE completionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0);  
    if (NULL == completionPort){    // 建立IO核心物件失敗   
        cerr << "CreateIoCompletionPort failed. Error:" << GetLastError() << endl;  
        system("pause");  
        return -1;  
    }  
  
// 建立IOCP執行緒--執行緒裡面建立執行緒池   
  
    // 確定處理器的核心數量   
    SYSTEM_INFO mySysInfo;  
    GetSystemInfo(&mySysInfo);  
  
    // 基於處理器的核心數量建立執行緒   
    //for(DWORD i = 0; i < (mySysInfo.dwNumberOfProcessors * 2); ++i)
 for(DWORD i = 0; i < 4; ++i)
 {  
        // 建立伺服器工作器執行緒,並將完成埠傳遞到該執行緒   
        HANDLE ThreadHandle = CreateThread(NULL, 0, ServerWorkThread, completionPort, 0, NULL);  
        if(NULL == ThreadHandle){  
            cerr << "Create Thread Handle failed. Error:" << GetLastError() << endl;  
        system("pause");  
            return -1;  
        }  
        CloseHandle(ThreadHandle);  
   }  
  
// 建立流式套接字   
    SOCKET srvSocket = socket(AF_INET, SOCK_STREAM, 0);  
  
// 繫結SOCKET到本機   
    SOCKADDR_IN srvAddr;  
    srvAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);  
    srvAddr.sin_family = AF_INET;  
    srvAddr.sin_port = htons(DefaultPort);  
    int bindResult = bind(srvSocket, (SOCKADDR*)&srvAddr, sizeof(SOCKADDR));  
    if(SOCKET_ERROR == bindResult){  
        cerr << "Bind failed. Error:" << GetLastError() << endl;  
        system("pause");  
        return -1;  
    }  
  
// 將SOCKET設定為監聽模式   
    int listenResult = listen(srvSocket, 100);  
    if(SOCKET_ERROR == listenResult){  
        cerr << "Listen failed. Error: " << GetLastError() << endl;  
        system("pause");  
        return -1;  
    }  
      
// 開始處理IO資料   
    cout << "本伺服器已準備就緒,正在等待客戶端的接入...\n";  
   memset(&g_info, 0, sizeof(RecvInfo));

 int rcv_size = 1024*1024*20;    /* 接收緩衝區大小為8K */ 
 int optlen = sizeof(rcv_size); 
 err = setsockopt(srvSocket,SOL_SOCKET,SO_RCVBUF, (char *)&rcv_size, optlen); 
 if(err<0){ 
  printf("設定接收緩衝區大小錯誤\n"); 
 }else{
  printf("設定接收緩衝區大小OK\n"); 
 }

 g_pRecvData = new char[1024*1024*500];
 if (g_pRecvData==NULL){
  printf("new g_pRecvData false\n");
 }
    // 建立用於傳送資料的執行緒   
    HANDLE sendThread = CreateThread(NULL, 0, ServerSendThread, 0, 0, NULL);  
 /*if (0 == BindIoCompletionCallback((HANDLE)srvSocket, IOWorkDummy, 0))
 {
  printf("BindIoCompletionCallback is failed. Error code = %d", GetLastError());
 }*/

 int nPackIndex = 0;
    while(true){  
        PER_HANDLE_DATA * PerHandleData = NULL;  
        SOCKADDR_IN saRemote;  
        int RemoteLen;  
        SOCKET acceptSocket;  
  
        // 接收連線,並分配完成端,這兒可以用AcceptEx()   
        RemoteLen = sizeof(saRemote);  
        acceptSocket = accept(srvSocket, (SOCKADDR*)&saRemote, &RemoteLen);  
        if(SOCKET_ERROR == acceptSocket){   // 接收客戶端失敗   
            cerr << "Accept Socket Error: " << GetLastError() << endl;  
            system("pause");  
            return -1;  
        }  
          
        // 建立用來和套接字關聯的單控制代碼資料資訊結構   
        PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA));  // 在堆中為這個PerHandleData申請指定大小的記憶體   
        PerHandleData -> socket = acceptSocket;  
        memcpy (&PerHandleData -> ClientAddr, &saRemote, RemoteLen);  
        clientGroup.push_back(PerHandleData);       // 將單個客戶端資料指標放到客戶端組中   
  
        // 將接受套接字和完成埠關聯   
        CreateIoCompletionPort((HANDLE)(PerHandleData -> socket), completionPort, (DWORD)PerHandleData, 0);  
  
          
        // 開始在接受套接字上處理I/O使用重疊I/O機制   
        // 在新建的套接字上投遞一個或多個非同步   
        // WSARecv或WSASend請求,這些I/O請求完成後,工作者執行緒會為I/O請求提供服務       
        // 單I/O操作資料(I/O重疊)   
        LPPER_IO_OPERATION_DATA PerIoData = NULL;  
        PerIoData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATEION_DATA));  
        ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED));  
        PerIoData->databuff.len = BUFFSIZE;  
        PerIoData->databuff.buf = PerIoData->buffer;
  PerIoData->pHeader = g_pRecvData + nPackIndex*1024*100;
        PerIoData->operationType = 0;    // read   
  PerIoData->TotalSize = 0;
  PerIoData->ReadSize = 0;
  nPackIndex++;
  
        DWORD RecvBytes;  
        DWORD Flags = 0;  
        WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);  
    }  
  
    system("pause"); 
 free(g_pRecvData);
    return 0;  
}  
  
// 開始服務工作執行緒函式   
DWORD WINAPI ServerWorkThread(LPVOID IpParam)  
{  
    HANDLE CompletionPort = (HANDLE)IpParam;  
    DWORD BytesTransferred;  
    LPOVERLAPPED IpOverlapped;  
    LPPER_HANDLE_DATA PerHandleData = NULL;  
    LPPER_IO_DATA PerIoData = NULL;  
    DWORD RecvBytes;  
    DWORD Flags = 0;  
    BOOL bRet = false;  
 
 g_info.tBegin = GetTickCount();
    while(true){  
        bRet = GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR)&PerHandleData, (LPOVERLAPPED*)&IpOverlapped, INFINITE);  
        if(bRet == 0){  
            cerr << "GetQueuedCompletionStatus Error: " << GetLastError() << endl;  
            return -1;  
        }  
        PerIoData = (LPPER_IO_DATA)CONTAINING_RECORD(IpOverlapped, PER_IO_DATA, overlapped);  
          
        // 檢查在套接字上是否有錯誤發生   
        if(0 == BytesTransferred){  
            closesocket(PerHandleData->socket);  
            GlobalFree(PerHandleData);  
            GlobalFree(PerIoData);  
            continue;  
        }  
          
        // 開始資料處理,接收來自客戶端的資料   
  //cout << "A Client says: " << PerIoData->databuff.len << endl;  
        if (PerIoData->TotalSize == 0){
   // header1
   PerIoData->TotalSize = *((int *)PerIoData->databuff.buf);
   if (PerIoData->TotalSize>1024*2000 || PerIoData->TotalSize<0){
    //fwrite( g_pBuf, 1, g_lastLen, g_hFile );
    //sprintf(g_pBuf, "Error TotalSize2 = %d\n", PerIoData->TotalSize);
    //fwrite( g_pBuf, 1, strlen(g_pBuf), g_hFile );
    //fflush( g_hFile );
    printf("Error TotalSize2 = %d\n", PerIoData->TotalSize);
    //break;
   }
   
   PerIoData->ReadSize = BytesTransferred - 4;
  }else{
   PerIoData->ReadSize += BytesTransferred;
  }
  g_info.nDataTotal += BytesTransferred;
  
  //PerIoData->buff[len] = '\0';   
  if (PerIoData->TotalSize <= PerIoData->ReadSize && PerIoData->TotalSize>100){
   g_info.TotalSize=PerIoData->TotalSize;
   g_info.ReadSize=PerIoData->ReadSize;
   PerIoData->TotalSize = PerIoData->ReadSize = 0;
   //printf("full .... ");
  }
  
  if (g_info.nActCount++>50000){
   //if (((int)nDataTotal) % (BUFFSIZE*200) == 0){
   g_info.nActCount = 0;
   g_info.tEnd = GetTickCount();
   g_info.tDiff = g_info.tEnd - g_info.tBegin;

   float fTotal = (float)g_info.nDataTotal/(1024*1024);
   float nDiffTime = (float)g_info.tDiff/1000;
   g_info.tBegin = g_info.tEnd;
   printf("tm=%5.3f, v=%5.3f, Total=%5.2f, UT=%d, UR=%d \n", 
     nDiffTime, (fTotal-g_info.fTotalLast)/nDiffTime, fTotal, g_info.TotalSize, g_info.ReadSize); 
   g_info.fTotalLast = fTotal;
  }
  
        // 為下一個重疊呼叫建立單I/O操作資料   
        ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED)); // 清空記憶體   
  if (PerIoData->TotalSize==0){
   PerIoData->databuff.len = BUFFSIZE;  
   PerIoData->databuff.buf = PerIoData->buffer;  
   PerIoData->operationType = 0;    // read   
  }else{
   memcpy(PerIoData->pHeader, PerIoData->databuff.buf, PerIoData->ReadSize);
   PerIoData->databuff.len = PerIoData->TotalSize-PerIoData->ReadSize;  
   PerIoData->databuff.buf = PerIoData->pHeader+PerIoData->ReadSize;
   PerIoData->operationType = 0;    // read   
  }

        
        WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);  
    }  
  
    return 0;  
}  
  
void WINAPI IOWorkDummy(DWORD ExeCode, DWORD BytesTransferred, LPOVERLAPPED pOVL)
{
 LPOVERLAPPED IpOverlapped;  
 LPPER_HANDLE_DATA PerHandleData = NULL;  
 LPPER_IO_DATA PerIoData = NULL;  
 DWORD RecvBytes;  
 DWORD Flags = 0;  
 BOOL bRet = false; 

 g_info.tBegin = GetTickCount();
 while(true){  
  PerIoData = (LPPER_IO_DATA)(pOVL); 

  // 檢查在套接字上是否有錯誤發生   
  if(0 == BytesTransferred){  
   closesocket(PerHandleData->socket);  
   GlobalFree(PerHandleData);  
   GlobalFree(PerIoData);  
   continue;  
  } 

  // 開始資料處理,接收來自客戶端的資料   
  //cout << "A Client says: " << PerIoData->databuff.len << endl;  
  if (PerIoData->TotalSize == 0){
   // header1
   PerIoData->TotalSize = *((int *)PerIoData->databuff.buf);
   if (PerIoData->TotalSize>1024*2000 || PerIoData->TotalSize<0){
    //fwrite( g_pBuf, 1, g_lastLen, g_hFile );
    //sprintf(g_pBuf, "Error TotalSize2 = %d\n", PerIoData->TotalSize);
    //fwrite( g_pBuf, 1, strlen(g_pBuf), g_hFile );
    //fflush( g_hFile );
    printf("Error TotalSize2 = %d\n", PerIoData->TotalSize);
    //break;
   }

   PerIoData->ReadSize = BytesTransferred - 4;
  }else{
   PerIoData->ReadSize += BytesTransferred;
  }
  g_info.nDataTotal += BytesTransferred;

  //PerIoData->buff[len] = '\0';   
  if (PerIoData->TotalSize <= PerIoData->ReadSize && PerIoData->TotalSize>100){
   g_info.TotalSize=PerIoData->TotalSize;
   g_info.ReadSize=PerIoData->ReadSize;
   PerIoData->TotalSize = PerIoData->ReadSize = 0;
   //printf("full .... ");
  }

  if (g_info.nActCount++>50000){
   //if (((int)nDataTotal) % (BUFFSIZE*200) == 0){
   g_info.nActCount = 0;
   g_info.tEnd = GetTickCount();
   g_info.tDiff = g_info.tEnd - g_info.tBegin;

   float fTotal = (float)g_info.nDataTotal/(1024*1024);
   float nDiffTime = (float)g_info.tDiff/1000;
   g_info.tBegin = g_info.tEnd;
   printf("tm=%5.3f, v=%5.3f, Total=%5.2f, UT=%d, UR=%d \n", 
    nDiffTime, (fTotal-g_info.fTotalLast)/nDiffTime, fTotal, g_info.TotalSize, g_info.ReadSize); 
   g_info.fTotalLast = fTotal;
  }

  // 為下一個重疊呼叫建立單I/O操作資料   
  ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED)); // 清空記憶體   
  if (PerIoData->TotalSize==0){
   PerIoData->databuff.len = BUFFSIZE;  
   PerIoData->databuff.buf = PerIoData->buffer;  
   PerIoData->operationType = 0;    // read   
  }else{
   memcpy(PerIoData->pHeader, PerIoData->databuff.buf, PerIoData->ReadSize);
   PerIoData->databuff.len = PerIoData->TotalSize-PerIoData->ReadSize;  
   PerIoData->databuff.buf = PerIoData->pHeader+PerIoData->ReadSize;
   PerIoData->operationType = 0;    // read   
  }


  WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);  
 }  
} // IOWorkerDummy()
  
// 傳送資訊的執行緒執行函式   
DWORD WINAPI ServerSendThread(LPVOID IpParam)  
{  
    while(1){  
        char talk[200];  
        gets(talk);  
        int len;  
        for (len = 0; talk[len] != '\0'; ++len){  
            // 找出這個字元組的長度   
        }  
        talk[len] = '\n';  
        talk[++len] = '\0';  
        printf("I Say:");  
        cout << talk;  
        WaitForSingleObject(hMutex,INFINITE);  
        for(int i = 0; i < clientGroup.size(); ++i){  
            send(clientGroup[i]->socket, talk, 200, 0);  // 傳送資訊   
        }  
        ReleaseMutex(hMutex);   
    }  
    return 0;  
}