1. 程式人生 > >P2P打洞伺服器與客戶端

P2P打洞伺服器與客戶端

本文從以下四個部分開始分析講解,客戶端與服務端的原始碼在文章末尾連結

一、P2P打洞的原理

二、P2P伺服器的實現

三、P2P客戶端的實現

四、資料包格式

一、P2P打洞原理

1、打洞解決了什麼問題?

我們平常使用的一般都為私有ip,但是私有ip之間是不能直接通訊的,如果要進行通訊只能通過公網上的伺服器進行資料的轉發,難道我們每次傳送資料都要經過公網上的伺服器轉發嗎?也不是不可以,但是伺服器的承受能力就會大大增加。此時就需要我們的打洞技術的出現了,打洞的出現解決了私有ip之間直接通訊的問題(還是需要經過一次公網伺服器)
例如:QQ中的聊天就廣泛的使用到了打洞技術,不然伺服器的承受能力會大大增加,而且會影響聊天的效率。

2、打洞的實現過程與原理

私有ip的資料都要經過路由器的轉發,路由器上有一張NAPT表(IP埠對映表),NAPT表記錄的是【私有IP:埠】與【公有IP:埠】的對映關係(就是一一對應關係),本文講到的路由均是以NAPT為工作模式,這並不影響對打洞。實際中的資料實際傳送給的都是路由器的【公有IP:埠】,然後經過路由器進過查詢路由表後再轉發給【私有的IP:埠】的。

舉個示例:
使用者A
電腦IP:192.168.1.101
桌面上有個客戶端程式採用的網路埠:10000
路由器的公有IP:120.78.201.201(實際中常常為多級路由,這裡以最簡單的一層路由舉例)
NAPT路由器的NAPT表的其中一條記錄為:【120.78.201.201:20202】-【192.168.1.101:10000】

使用者B
電腦IP:192.168.2.202
桌面上有個客戶端程式採用的網路埠:22222
路由器的公有IP:120.78.202.202
NAPT路由器的NAPT表的其中一條記錄為:【120.78.202.202:20000】-【192.168.2.202:22222】

打洞伺服器P2Pserver
IP:120.78.202.100
port:20000

此時使用者A的電腦發給了伺服器一條資料,伺服器收到使用者A的IP與埠是多少呢?當然為120.78.201.201:20202,資料包經過路由的時候進行了重新的封包。如果伺服器此時發一條資料給使用者A,發往的IP與埠是什麼呢?當然為120.78.201.201:20202,此時路由器收到這個資料包後,進行查詢NAPT表中120.78.201.201:20202對應的IP與埠資訊,發現是192.168.1.101:10000,然後路由器就轉發給IP為192.168.1.101:10000的電腦,然後電腦上的應用程式就收到這條資訊了。

既然如此,我們私有IP雖然不能直接通訊,但是我們能夠發給公有IP!如果使用者B需要給使用者A發一條資訊時,使用者B直接將資料發往目的IP、埠為120.78.201.201:20202的地方不就行了?
這裡有兩個問題:
第一,使用者B怎麼知道使用者A在路由上對映的IP與埠;
第二,使用者B直接將資料包發往120.78.201.201:20202,路由器是會將使用者B的資料包丟棄的,因為路由器裡面沒有關於使用者B120.78.202.202的路由資訊(路由器裡面還有個路由表,用於路由),無法進行路由,所以將會進行丟棄。

如何解決第一個問題?
通過打洞伺服器,將使用者A對映的IP、埠資訊告訴使用者B即可。
如何解決第二個問題?
如果打洞伺服器首先告訴使用者A先發一條資訊給使用者B(使用者A得知使用者B的地址資訊也是通過打洞伺服器),注意此時使用者B是收不到的,使用者B的路由同樣會進行丟棄,但是這並不要緊,因為使用者A發了這條資訊後,使用者A的路由就會記錄關於使用者B的路由資訊(該資訊記錄的是將使用者B的IP資訊路由到使用者A電腦),然後此時使用者B再發給使用者A一條資訊,就不會進行丟棄了,因為使用者A的路由裡面有使用者B的路由資訊。

通過解決上面的兩個問題後,我們再通過圖形示意圖來具體瞭解下打洞的過程
這裡寫圖片描述
整個過程就是我標的序號1->2->3->4->5->6->7->8>9
過程一:1->2
此過程為使用者B向伺服器請求向用戶A打洞
過程二:3->4
此過程為伺服器相應使用者B的打洞請求,告訴使用者A使用者B想與你打洞(資料包中包含使用者B的地址資訊)。
過程三:5->6
使用者A主動發一條資訊給使用者B,目的是為了使得路由器A中能夠有一條關於路由B的IP的路由資訊(注意不是使用者B,使用者B是私有IP),就如圖所示,這條資訊會被丟棄的,因為路由B的路由表中沒有路由A的IP的資訊。
過程四:7->8->9
使用者B再發一條資訊給使用者A,因為此時路由A的路由表中有關於路由B的IP的路由資訊,此時路由A就能路由給使用者A了,至此,使用者A就能直接收到使用者B發的資訊了。注意,此時使用者A發給使用者B不需要打洞,因為路由B中已經有關於路由A的IP的路由資訊了。

還是思路重要,所以有點囉嗦了…本來以為自己對P2P瞭解的算不錯了,但是在書寫的時候發現在細枝末節還是半知半解,因此特此去查閱了一些資料。本文難免有一些不足,希望大家指出不足。

二、P2P伺服器的實現

服務端的實現:
服務端採用的是基於UDP的IOCP網路模型實現資料的收發。實現了對客戶端資料的分發處理,心跳檢測是否線上,資料包完整性的CRC校驗等

原始碼過長,就貼下服務端處理資料的工作者類CWorker,原始碼自行下載
工作者類-Worker.h

#ifndef __WORKER_H__
#define __WORKER_H__

#include "IOCPServer.h"

#include <map>
using namespace std;
typedef list<stUserListNode *> UserList;
typedef map<stUserListNode*,int> UserMap;

class CWorker
{
public:
    CWorker(int nPort = SERVER_PORT, HWND hWnd = NULL);
    ~CWorker();

    void ShowServerInfo();              // 顯示伺服器的相關配置資訊
    bool OpenHeartThread();             // 開啟心跳檢測執行緒
    void OnUsers();                     // 顯示所有線上使用者

    static CWorker* m_worker;

protected:
    static void CALLBACK NotifyProc(LPVOID lParam, OVERLAPPEDPLUS* pContent, DWORD dwSize, UINT nCode);

    void OnCMDLogIn(Msg msg, SOCKADDR_IN addr);             // 登入資訊
    void OnCMDLogOut(Msg msg, SOCKADDR_IN addr);            // 下線資訊
    void OnCMDP2Ptrans(Msg msg, SOCKADDR_IN addr);          // 打洞請求資訊
    void OnCMDGetAllUser(Msg msg, SOCKADDR_IN addr);        // 獲取線上使用者資訊
    bool SendPacketACK(MsgCmd msgCmd, SOCKADDR_IN addr);    // 傳送確認包

    void OnCMDHeartACK(Msg msg, SOCKADDR_IN addr);
    static DWORD CALLBACK HeartbeatProc(LPVOID lparam);     // 心跳包即時檢測客戶狀態

    bool GetUserByName(stUserListNode & userNode, char* username);

    HANDLE          m_hHeartThread;     // 心跳包檢測執行緒控制代碼
    bool            m_bExitTherad;      // 心跳包檢測執行緒退出標識

    CIOCPServer*    m_IocpServer;       // UDP IOCP伺服器
    UserList        m_userList;         // 儲存使用者登入資訊的連結串列
    UserMap         m_userMap;          // 使用者心跳包檢測,記錄心跳
    static CRITICAL_SECTION m_cs;
};

#endif

工作者類-Worker.cpp

#include "Worker.h"
#include "Lock.h"

#define MAXHEART 10 // 最大心跳次數
CWorker* CWorker::m_worker;
CRITICAL_SECTION CWorker::m_cs;

/************************************************************************/
/* 心跳檢測,每一秒進行一次探測                                            */
/************************************************************************/
DWORD CALLBACK CWorker::HeartbeatProc(LPVOID lparam)
{
    int addrlen = sizeof(SOCKADDR_IN);
    SOCKADDR_IN clientAddr = { 0 };
    int msglen = sizeof(Msg);
    Msg heartmsg = { 0 };
    heartmsg.head.sCmd = CMDHEARTMSG;
    heartmsg.head.nContentLen = 0;
    heartmsg.head.CRC32 = ::GetCRC32(heartmsg.content.szMsg, heartmsg.head.nContentLen);

    while (!(m_worker->m_bExitTherad))
    {
        if (m_worker->m_userList.size() == 0)
        {
            Sleep(100);
            continue;
        }

        Sleep(2000);
        CLock cs(m_cs, "HeartbeatProc");
        for (UserList::iterator it = m_worker->m_userList.begin(); 
            it != m_worker->m_userList.end(); ++it)
        {
            if (m_worker->m_userMap[*it] == MAXHEART)//達到最大心跳次數,刪除使用者資訊
            {
                printf("user [%s] logout -> has max heartbeat\n",(*it)->userName);
                m_worker->m_userMap.erase(*it);

                stUserListNode* tmp = *it;
                it = m_worker->m_userList.erase(it);
                delete tmp;

                if (it == m_worker->m_userList.end())
                    break;
            }
            m_worker->m_userMap[*it]++;//心跳加一

            ZeroMemory(&clientAddr, addrlen);
            clientAddr.sin_family = AF_INET;
            clientAddr.sin_port = ntohs((*it)->port);
            clientAddr.sin_addr.s_addr = htonl((*it)->ip);
            m_worker->m_IocpServer->PostSend(&clientAddr, (LPBYTE)&heartmsg, msglen);
        }
    }

    return 0;
}


void CALLBACK CWorker::NotifyProc(LPVOID lParam, OVERLAPPEDPLUS* pContent, DWORD dwSize, UINT nCode)
{
    Msg msg = { 0 };
    memcpy_s(&msg, sizeof(Msg), pContent->buf, sizeof(Msg));
    SOCKADDR_IN addr = pContent->remoteAddr;
    switch (msg.head.sCmd)
    {
    case CMDLOGIN:
        m_worker->SendPacketACK(CMDLOGINACK, addr);
        m_worker->OnCMDLogIn(msg, addr);
        break;
    case CMDLOGOUT:
        m_worker->SendPacketACK(CMDLOGOUTACK, addr);
        m_worker->OnCMDLogOut(msg, addr);
        break;
    case CMDP2PTRANS:
        m_worker->SendPacketACK(CMDP2PTRANSACK, addr);
        m_worker->OnCMDP2Ptrans(msg, addr);
        break;
    case CMDP2PGETALLUSER:
        m_worker->SendPacketACK(CMDP2PGETALLUSERACK, addr);
        m_worker->OnCMDGetAllUser(msg, addr);
        break;
    case CMDHEARTMSGACK:
        m_worker->OnCMDHeartACK(msg, addr);
        break;
    default:
        break;
    }
}

void CWorker::OnCMDHeartACK(Msg msg, SOCKADDR_IN addr)
{
    unsigned int nIp = ntohl(addr.sin_addr.S_un.S_addr);
    unsigned short nPort = ntohs(addr.sin_port);

    CLock cs(m_cs, "OnCMDHeartACK");
    for (UserList::iterator it = m_userList.begin(); it != m_userList.end(); ++ it)
    {
        if ((*it)->ip == nIp && (*it)->port == nPort && 
            strcmp(msg.content.heartmessage.userName,(*it)->userName) == 0)
        {
            m_userMap[*it] = 0;
            return;
        }
    }
}

void CWorker::OnCMDLogIn(Msg msg, SOCKADDR_IN addr)
{
    /*
    對於使用者登入的密碼檢驗沒有
    採用 MD5碼 進行校驗
    */
    stUserListNode* userInfo = new stUserListNode;
    strcpy_s(userInfo->userName, 10, msg.content.loginmember.userName);
    userInfo->ip = ntohl(addr.sin_addr.S_un.S_addr);
    userInfo->port = ntohs(addr.sin_port);

    // 防止執行緒同步導致多條相同使用者資訊被存放到連結串列中
    // 出現的情況是:由於網路阻塞,一瞬間同時收到同一個客戶端的10條登入請求,此時10個執行緒同時工作(同時對連結串列操作),此時必須避免執行緒同步的問題!
    CLock* cs = new CLock(m_cs, "OnCMDLogIn");
    // 檢測該使用者是否已經存在
    bool bExist = false;
    for (UserList::iterator it = m_userList.begin(); it != m_userList.end(); ++it)
    {
        if (strcmp(userInfo->userName,(*it)->userName) == 0 &&
            userInfo->ip == (*it)->ip && userInfo->port == (*it)->port)
        {
            bExist = true;
            break;
        }
    }

    // 將新連線的使用者,加入連結串列中
    if (!bExist)
    {
        printf("------user login : %s <-> [%s:%d]\n", userInfo->userName, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
        m_userList.push_back(userInfo);
        m_userMap[userInfo] = 0;
    }
    delete cs;// 這裡不用在末尾釋放已達到提升速度

    // 傳送已經登入的客戶資訊給客戶端
    // 1、傳送 登入客戶的個數,客戶端根據個數進行接收
    int usercount = m_userList.size();
    Msg usercntmsg = { 0 };
    usercntmsg.head.sCmd = CMDP2PGETUSERCNT;
    usercntmsg.head.nContentLen = sizeof(int);
    usercntmsg.head.CRC32 = ::GetCRC32((char*)&usercount, sizeof(int));
    usercntmsg.content.usercount = usercount;
    m_IocpServer->PostSend((SOCKADDR_IN*)&addr, (LPBYTE)&usercntmsg, sizeof(Msg));
    // 2、開始逐條傳送 客戶資訊
    for (UserList::iterator it = m_userList.begin(); it != m_userList.end(); ++it)
    {
        m_IocpServer->PostSend((SOCKADDR_IN*)&addr, (LPBYTE)(*it), sizeof(stUserListNode));
    }

    printf("send user list information to : %s <-> [%s:%d]\n", userInfo->userName, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
}

void CWorker::OnCMDLogOut(Msg msg, SOCKADDR_IN addr)
{
    CLock cs(m_cs, "OnCMDLogOut");
    for (UserList::iterator it = m_userList.begin(); it != m_userList.end(); ++it)
    {
        if (strcmp((*it)->userName,msg.content.logoutmember.userName) == 0)
        {
            printf("------user logout : %s <-> [%s:%d]\n", msg.content.logoutmember.userName, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
            m_userMap.erase(*it);           
            m_userList.remove(*it); // 移除連結串列節點
            return;
        }
    }
}

void CWorker::OnCMDP2Ptrans(Msg msg, SOCKADDR_IN addr)
{
    printf("[%s:%d] wants to p2p %s\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), msg.content.requesttransmsg.toName);
    stUserListNode userNode;
    // 獲取使用者資訊--使用者可能已經下線
    if (!GetUserByName(userNode,msg.content.requesttransmsg.toName))
    {
        // 使用者下線處理
        return;
    }

    // 1、獲取被打洞端的資訊
    SOCKADDR_IN remote = { 0 };
    remote.sin_family = AF_INET;
    remote.sin_port = htons(userNode.port);
    remote.sin_addr.s_addr = htonl(userNode.ip);
    printf("tell %s[%s:%d] to send p2ptrans message to: ", msg.content.requesttransmsg.toName,inet_ntoa(remote.sin_addr), ntohs(remote.sin_port));
    printf("%s[%s:%d]\n", msg.content.requesttransmsg.requestName, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
    // 2、打洞資料包封裝
    Msg transmsg = { 0 };
    transmsg.content.servtransmsg.nIP = ntohl(addr.sin_addr.S_un.S_addr);
    transmsg.content.servtransmsg.nPort = ntohs(addr.sin_port);
    strcpy_s(transmsg.content.servtransmsg.userName, USERNAMELEN, msg.content.requesttransmsg.requestName);
    transmsg.head.sCmd = CMDREQUESTP2P;
    transmsg.head.nContentLen = sizeof(stP2PTranslate);
    transmsg.head.CRC32 = ::GetCRC32(transmsg.content.szMsg, transmsg.head.nContentLen);
    // 3、傳送打洞資料包給 被打洞方,要求 被打洞方 主動更新路由器表資訊
    m_IocpServer->PostSend(&remote, (LPBYTE)&transmsg, sizeof(Msg));
}

void CWorker::OnCMDGetAllUser(Msg msg, SOCKADDR_IN addr)
{
    // 1、傳送客戶個數
    msg.content.usercount = m_userList.size();
    msg.head.sCmd = CMDP2PGETUSERCNT;
    msg.head.nContentLen = sizeof(int);
    msg.head.CRC32 = ::GetCRC32(msg.content.szMsg, msg.head.nContentLen);
    m_IocpServer->PostSend(&addr, (LPBYTE)&msg, sizeof(Msg));
    // 2、迴圈傳送客戶資訊
    for (UserList::iterator it = m_userList.begin(); it != m_userList.end(); ++it)
    {
        m_IocpServer->PostSend(&addr, (LPBYTE)(*it), sizeof(stUserListNode));
    }
    printf("send user list information to : [%s:%d]\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
}


bool CWorker::SendPacketACK(MsgCmd msgCmd, SOCKADDR_IN addr)
{
    Msg msgACK = { 0 };
    msgACK.head.sCmd = msgCmd;
    msgACK.head.nContentLen = 0;
    msgACK.head.CRC32 = ::GetCRC32(msgACK.content.szMsg, 0);

    m_IocpServer->PostSend(&addr, (LPBYTE)&msgACK, sizeof(Msg));
    return true;
}


bool CWorker::GetUserByName(stUserListNode & userNode, char* username)
{
    CLock cs(m_cs, "GetUserByName");
    for (UserList::iterator it = m_userList.begin(); it != m_userList.end(); ++it)
    {
        if (strcmp((*it)->userName,username) == 0)
        {
            userNode = **it;
            return true;
        }
    }
    return false;
}


CWorker::CWorker(int nPort, HWND hWnd)
    :m_IocpServer(NULL)
    ,m_bExitTherad(false)
{
    InitializeCriticalSection(&m_cs);

    m_IocpServer = new CIOCPServer;
    if (!m_IocpServer->Initialize(NotifyProc, hWnd, nPort))
        printf("服務端啟動失敗:%d", GetLastError());
    printf("P2P伺服器啟動成功...\n");

    ShowServerInfo();
}

CWorker::~CWorker()
{
    try
    {
        if (m_IocpServer)
        {
            DeleteCriticalSection(&m_cs);
            m_bExitTherad = true;
            delete m_IocpServer;
            Sleep(100);// 等待資源釋放完畢
        }
    }
    catch (...) {}
}

void CWorker::ShowServerInfo()
{
    printf("伺服器開啟的執行緒數 : %d\n", m_IocpServer->m_nCurrentThreads);
    printf("伺服器埠號 : %d\n", m_IocpServer->m_nPort);
    printf("正在工作的執行緒數 : %d\n", m_IocpServer->m_nBusyThreads);
    printf("記憶體中的元素個數 : %d\n", m_IocpServer->m_listContexts.size());
    printf("伺服器傳送即時速度 : %d KB/s\n", m_IocpServer->m_nSendKbps);
    printf("伺服器接收即時速度 : %d KB/s\n", m_IocpServer->m_nRecvKbps);
}

bool CWorker::OpenHeartThread()
{
    m_hHeartThread = CreateThread(NULL, NULL, HeartbeatProc, m_worker, NULL, NULL);
    if (m_hHeartThread == INVALID_HANDLE_VALUE)
    {
        printf("CreateThread failed!\n");
        return false;
    }
    return true;
}

void CWorker::OnUsers()
{
    CLock cs(m_cs, "OnUsers");

    printf("\nHave %d users\n", m_userMap.size());
    if (m_userMap.size() == 0)
        return;

    for (UserMap::iterator it = m_userMap.begin(); it != m_userMap.end(); it++)
    {
        in_addr tmp;
        tmp.S_un.S_addr = htonl(it->first->ip);
        printf("Username:%s\tIP:%s\tport:%d\theart:%d\n", it->first->userName, inet_ntoa(tmp), it->first->port,it->second);
    }
}

三、P2P客戶端的實現

客戶端的實現:
客戶端採用普通UDP套接字加多執行緒實現。實現了超時重發,使用者登入密碼的MD5加密,資料包完整性的CRC校驗等

客戶端的工作者類CWorker
Worker.h

#ifndef __WORKER_H__
#define __WORKER_H__


#include "Socket.h"
#include "../public/MsgProtocal.h"
#include <list>

#define MAX_RESEND 10               // 最大重發次數
#define MIN_RESEND 5                // 最小重發次數
#define MAX_RECV 10                 // 最大接收次數
#define MAX_CONNECT 10              // 連線伺服器的次數


typedef std::list<stUserListNode *> UserList;

class CWorker
{
public:
    CWorker();
    ~CWorker();

    static CWorker* m_worker;
    static DWORD WINAPI RecvThreadProc(LPVOID lparam);  // 接收訊息的執行緒

    bool IsIP(char* ip);

    bool InitNetEnvironment(int nPort=0);
    bool OnLogin();

    void SuspendRecvThread() const
    {
        SuspendThread(m_hThread);
    }

    void ResumeRecvThread() const
    {
        ResumeThread(m_hThread);
    }

    /*
    **  使用者命令響應
    */
    bool OnGetU();

    /*
    ** 傳送資訊給線上使用者
    */
    bool OnSend(char* sendname, char* message);

    /*
    ** 下線
    */
    bool OnExit();

protected:
    MsgCmd      m_cmdACK;           // 確認訊息的型別
    bool        m_bReplyACK;        // 是否判斷回覆的確認訊息

    UserList    m_UserList;
    Socket*     m_sock;
    unsigned int m_serverPort;

    HANDLE      m_hThread;          // 執行緒控制代碼
    bool        m_bExit;            // 執行緒退出標識

    char        m_ServerIP[20];
    char        m_UserName[USERNAMELEN];
    char        m_UserPwd[USERPASSWORDLEN];

    bool CheckPackage(Msg msg);

    // 丟包重發機制的收發
    bool ConnectToServer(char* serverip, char* szName, char* szPwd, int max_time = MAX_CONNECT);
    bool SendtoServer(Msg & msg, unsigned short nCmd, char* dstip, unsigned int dstport, int max_time = MAX_RESEND);

    // 接收指定檔案頭的資料包
    bool OnCmdRecvFrom(Msg & msg, unsigned short nCmd, char* dstip, unsigned int dstport, int max_time = MAX_RECV);
    bool OnGetUsersRecvFrom(void* recvbuf, int recvlen, char* dstip, unsigned int dstport, int max_time = MAX_RECV);
};


#endif

Worker.cpp

#include "Worker.h"
#include "../public/CRC32.h"
#include "../public/MD5.h"
#include <string>
#include <regex>
using namespace std;

CWorker* CWorker::m_worker;

DWORD WINAPI CWorker::RecvThreadProc(LPVOID lparam)
{
    CWorker* pThis = (CWorker*)lparam;
    Msg recvMsg = { 0 };
    int msglen = sizeof(Msg);
    char szAddress[32] = { 0 };
    unsigned int nPort = 0;

    while (!(pThis->m_bExit))
    {
        // 此處可以改進為使用 接收的訊息佇列
        // 其中一個執行緒作為接收,放入訊息隊裡中,其中一個執行緒不停的在訊息佇列中獲取訊息,並進行處理!!!
        ZeroMemory(&recvMsg, msglen);
        ZeroMemory(szAddress, sizeof(szAddress));
        int iread = pThis->m_sock->ReceiveFrom(&recvMsg, msglen, szAddress, nPort);
        if (iread <= 0 || iread != msglen)      // recv error
        {
            //printf("recvfrom failed : %d\n", GetLastError());
            continue;
        }
        // CRC差錯檢測
        if (recvMsg.head.CRC32 != ::GetCRC32(recvMsg.content.szMsg,recvMsg.head.nContentLen))
            continue;

        /******************************************************/
        /*          根據資料包訊息頭進行分發處理                 */
        /*******************************************************/
        switch (recvMsg.head.sCmd)
        {
            case CMDP2PMESSAGEACK:// P2P訊息確認包的處理,來自客戶端
            {
                pThis->m_bReplyACK = true;
                printf("Recv message ack from %s:%ld\n", szAddress, nPort);
                break;
            }

            case CMDP2PMESSAGE:// 接收到P2P訊息的處理,來自客戶端
            {
                printf("Recv Message from %s[%s:%d] -> %s\n", recvMsg.content.p2pmesssage.username,
                    szAddress, nPort, recvMsg.content.p2pmesssage.message);

                ZeroMemory(&recvMsg, msglen);
                recvMsg.head.sCmd = CMDP2PMESSAGEACK;
                recvMsg.head.nContentLen = 0;
                recvMsg.head.CRC32 = ::GetCRC32(recvMsg.content.szMsg, recvMsg.head.nContentLen);
                pThis->m_sock->SendTo(&recvMsg, msglen, nPort, szAddress);
                printf("Send a Message ACK to %s[%s:%d]\n", recvMsg.content.p2pmesssage.username, szAddress, nPort);
                break;
            }

            case CMDP2PGETALLUSERACK:// getu 獲取所有使用者資訊回覆包,來自服務端
            {
                pThis->m_bReplyACK = true;
                printf("Recv getu ack from server\n");

                ZeroMemory(&recvMsg, sizeof(Msg));
                if (!pThis->OnCmdRecvFrom(recvMsg, CMDP2PGETUSERCNT,pThis->m_ServerIP, pThis->m_serverPort))
                {
                    printf("Get user count from server timeout...\n");
                    continue;
                }
                int usercount = recvMsg.content.usercount;
                printf("Have %d users logined server\n", usercount);

                // 3、開始逐條接收(可能會有丟包出現,可以記錄是哪個包丟失,然後進行重新申請)
                int nCntRecved = 0;
                int nodelen = sizeof(stUserListNode);
                pThis->m_UserList.clear();
                for (int i = 0; i < usercount; i++)
                {
                    stUserListNode *node = new stUserListNode;
                    if (!pThis->OnGetUsersRecvFrom(node, nodelen, pThis->m_ServerIP, pThis->m_serverPort))
                    {
                        printf("Get user info from server timeout...\n");
                        continue;
                    }

                    pThis->m_UserList.push_back(node);

                    in_addr tmp;
                    tmp.S_un.S_addr = htonl(node->ip);
                    printf("Username:%s\nUserIP:%s\nUserPort:%d\n\n", node->userName, inet_ntoa(tmp), node->port);
                    nCntRecved++;
                }

                printf("Has received %d users form server\n", nCntRecved);  
                break;
            }

            case CMDP2PTRANSACK:// 主動請求打洞確認包,來自服務端
            {
                pThis->m_bReplyACK = true;
                printf("Recv p2ptransACK from server\n");
                break;
            }

            case CMDREQUESTP2P:// 對方的打洞請求,來自服務端的轉發
            {
                printf("Recv P2P request from %s\n",recvMsg.content.servtransmsg.userName);

                printf("Send P2P request ACK to %s\n", recvMsg.content.servtransmsg.userName);
                unsigned int toIP = recvMsg.content.servtransmsg.nIP;
                unsigned short toPort = recvMsg.content.servtransmsg.nPort;
                ZeroMemory(&recvMsg, msglen);
                recvMsg.head.sCmd = CMDREQUESTP2PACK;
                recvMsg.head.nContentLen = 0;
                recvMsg.head.CRC32 = ::GetCRC32(recvMsg.content.szMsg, recvMsg.head.nContentLen);

                pThis->m_sock->SendTo2(&recvMsg, msglen, toPort, toIP);
                break;
            }
            case CMDREQUESTP2PACK:// 對方發來的打洞訊息,忽略,來自客戶端
            {
                // do nothing
                printf("Recv P2P burrow ACK from %s:%d\n", szAddress, nPort);

                break;
            }
            case CMDLOGOUTACK:// 確認下線訊息
            {
                pThis->m_bReplyACK = true;
                break;
            }

            case CMDHEARTMSG:// 心跳包訊息
            {
                //printf("recv one heart packet \n");
                ZeroMemory(&recvMsg, msglen);
                strcpy_s(recvMsg.content.heartmessage.userName, pThis->m_UserName);
                recvMsg.head.sCmd = CMDHEARTMSGACK;
                recvMsg.head.nContentLen = strlen(pThis->m_UserName);
                recvMsg.head.CRC32 = ::GetCRC32(recvMsg.content.szMsg, recvMsg.head.nContentLen);
                pThis->m_sock->SendTo(&recvMsg, msglen, pThis->m_serverPort, pThis->m_ServerIP);
                //printf("reply heart ack to server\n");
                break;
            }
        }
    }

    printf("Worker Thread exit...\n");
    return 0;
}

bool CWorker::InitNetEnvironment(int nPort/*=0*/)
{
    if (!m_sock->Create(nPort, SOCK_DGRAM))
        return false;
    return true;
}

bool CWorker::IsIP(char* ip)
{
    regex pattern("\\b([1-9]\\d?|1\\d\\d|2[0-4]\\d|25[0-4])\\.([0-9]\\d?|1\\d\\d|2[0-4]\\d|25[0-4])\\.([0-9]\\d?|1\\d\\d|2[0-4]\\d|25[0-4])\\.([0-9]\\d?|1\\d\\d|2[0-4]\\d|25[0-4])\\b");
    return regex_match(ip, pattern);
}

bool CWorker::CheckPackage(Msg msg)
{
    if (msg.head.CRC32 == ::GetCRC32(msg.content.szMsg, msg.head.nContentLen))
        return true;
    return false;
}

bool CWorker::OnLogin()
{
    printf("\n");
    do
    {
        ZeroMemory(m_ServerIP, sizeof(m_ServerIP));
        printf("Please input server ip:");
        scanf_s("%s", m_ServerIP, sizeof(m_ServerIP));
    } while (!IsIP(m_ServerIP));

    printf("Please input your name:");
    ZeroMemory(m_UserName, USERNAMELEN);
    scanf_s("%s", m_UserName, sizeof(m_UserName));

    printf("Please input your password:");
    ZeroMemory(m_UserPwd, USERPASSWORDLEN);
    scanf_s("%s", m_UserPwd, sizeof(m_UserPwd));

    return ConnectToServer(m_ServerIP, m_UserName, m_UserPwd);
}

bool CWorker::ConnectToServer(char* serverip, char* szName, char* szPwd, int max_time)
{
    // 1、對密碼進行MD5加密處理
    string check = szName + string("#") + ::md5(szPwd);
    MsgContent msgcontent = { 0 };
    strcpy(msgcontent.loginmember.userName, szName);
    strcpy(msgcontent.loginmember.check, check.c_str());

    // 2、封裝資料包,進行CRC32校驗
    Msg msg = { 0 };
    msg.head.sCmd = CMDLOGIN;
    msg.head.nContentLen = strlen(msgcontent.szMsg);
    msg.head.CRC32 = GetCRC32(msgcontent.szMsg, msg.head.nContentLen);
    memcpy(&msg.content.loginmember, &msgcontent, sizeof(stLoginMessage));

    // 3、傳送登入資料包 登入伺服器
    printf("begin to connect server...\n");
    if(!SendtoServer(msg,CMDLOGINACK,m_ServerIP,m_serverPort))
    {
        printf("connect server timeout...\n");
        return false;
    }

    // 4、登入伺服器成功,開始接收線上端使用者數量
    ZeroMemory(&msg, sizeof(Msg));
    int usercount = 0;
    int iread = 0;

    if (!OnCmdRecvFrom(msg, CMDP2PGETUSERCNT, m_ServerIP, m_serverPort))
    {
        printf("Get user count from server timeout...\n");
        return false;
    }
    usercount = msg.content.usercount;
    printf("Have %d users logined server\n", usercount);

    // 5、接收線上使用者資訊
    int nCntRecved = 0;
    int nodelen = sizeof(stUserListNode);
    m_UserList.clear();
    for (int i = 0; i < usercount; i++)
    {
        stUserListNode *node = new stUserListNode;
        if (!OnGetUsersRecvFrom(node, nodelen, m_ServerIP, m_serverPort))
        {
            printf("Get user info from server timeout...\n");
            continue;
        }

        m_UserList.push_back(node);

        in_addr tmp;
        tmp.S_un.S_addr = htonl(node->ip);
        printf("Username:%s\nUserIP:%s\nUserPort:%d\n\n", node->userName, inet_ntoa(tmp), node->port);
        nCntRecved++;
    }
    printf("Has received %d users form server\n", nCntRecved);

    m_hThread = CreateThread(NULL, NULL, RecvThreadProc, m_worker, NULL, NULL);
    if (m_hThread == INVALID_HANDLE_VALUE)
    {
        printf("CreateThread Failed!\n");
        return false;
    }

    return true;
}

bool CWorker::OnGetU()
{
    int msglen = sizeof(Msg);
    Msg msg = { 0 };
    msg.head.sCmd = CMDP2PGETALLUSER;   // 傳送 CMDGETALLUSER 命令,獲取使用者
    msg.head.nContentLen = 0;
    msg.head.CRC32 = ::GetCRC32(msg.content.szMsg, 0);

    for (int i = 0; i < MAX_RESEND; i++)
    {
        if (msglen != m_sock->SendTo(&msg, msglen, m_serverPort, m_ServerIP))
        {
            printf("sendto failed...\n");
            continue;
        }

        for (int j = 0; j < 15; j++)
        {
            // 是否收到服務端的回覆
            if (m_bReplyACK)
                return true;
            Sleep(100);
        }
    }

    printf("get users timeout...\n");
    return false;
}

bool CWorker::OnSend(char* sendname, char* message)
{
    unsigned int UserIP = 0;
    unsigned short UserPort = 0;
    bool bFindUser = false;
    for (UserList::iterator it = m_UserList.begin(); it != m_UserList.end(); ++it)
    {
        if (strcmp((*it)->userName,sendname) == 0)
        {
            UserIP = (*it)->ip;
            UserPort = (*it)->port;
            bFindUser = true;
        }
    }

    if (!bFindUser)
    {
        printf("Can't find username!\nSend failed!\n");
        return false;
    }

    Msg p2pmsg = { 0 };
    strcpy_s(p2pmsg.content.p2pmesssage.username, USERNAMELEN, m_UserName);
    strcpy_s(p2pmsg.content.p2pmesssage.message, MAXMSGLEN, message);
    p2pmsg.head.sCmd = CMDP2PMESSAGE;
    p2pmsg.head.nContentLen = USERNAMELEN + MAXMSGLEN;
    p2pmsg.head.CRC32 = ::GetCRC32(p2pmsg.content.szMsg, p2pmsg.head.nContentLen);

    int msglen = sizeof(Msg);

    /* 1、
    ** 傳送訊息給使用者,每1.5秒進行一次重傳,重傳次數為5次
    */
    m_bReplyACK = false;
    for (int i = 0; i < MIN_RESEND; i++)
    {
        if (msglen != m_sock->SendTo2(&p2pmsg, msglen, UserPort, UserIP))
        {
            printf("sendto() failed:%d\n",GetLastError());
            continue;
        }

        for (int j = 0; j < 15; j++)// 1.5 秒沒有收到確認包,進行重傳
        {
            if (m_bReplyACK)
            {
                printf("Send OK!\n");
                return true;
            }
            Sleep(100);
        }
    }

    /*  2、
    **  沒有收到目標主機的迴應時,認為與目標主機之間沒有進行穿透
    **  傳送穿透資訊給伺服器,請求打洞穿透
    */
    printf("Prepare p2p to %s by P2Pserver...\n",sendname);
    Msg transmsg = { 0 };
    strcpy_s(transmsg.content.requesttransmsg.requestName, USERNAMELEN, m_UserName);
    strcpy_s(transmsg.content.requesttransmsg.toName, USERNAMELEN, sendname);
    transmsg.head.sCmd = CMDP2PTRANS;
    transmsg.head.nContentLen = sizeof(stP2PClientRequestTrans);
    transmsg.head.CRC32 = ::GetCRC32(transmsg.content.szMsg, transmsg.head.nContentLen);

    m_bReplyACK = false;
    for (int i = 0; i < MIN_RESEND; i++)
    {
        if(msglen != m_sock->SendTo(&transmsg, msglen, m_serverPort, m_ServerIP))
        {
            printf("sendto() failed:%d\n", GetLastError());
            continue;
        }

        for (int j = 0; j < 15; j++)
        {
            if (m_bReplyACK)
                break; 

            Sleep(100);
        }

        if (m_bReplyACK)
            break;
    }

    /* 3、
    ** 再次 傳送訊息給使用者,每1.5秒進行一次重傳,重傳次數為5次
    */
    m_bReplyACK = false;
    for (int i = 0; i < MIN_RESEND; i++)
    {
        if (msglen != m_sock->SendTo2(&p2pmsg, msglen, UserPort, UserIP))
        {
            printf("sendto() failed:%d\n", GetLastError());
            continue;
        }

        for (int j = 0; j < 15; j++)// 1.5 秒沒有收到確認包,進行重傳
        {
            if (m_bReplyACK)
            {
                printf("Send OK!\n");
                return true;
            }
            Sleep(100);
        }
    }

    printf("Send failed!\n");
    return false;
}

bool CWorker::OnExit()
{
    Msg exitmsg = { 0 };
    strcpy_s(exitmsg.content.logoutmember.userName, USERNAMELEN, m_UserName);
    exitmsg.head.sCmd = CMDLOGOUT;
    exitmsg.head.nContentLen = strlen(m_UserName);
    exitmsg.head.CRC32 = ::GetCRC32(exitmsg.content.szMsg, exitmsg.head.nContentLen);
    int msglen = sizeof(Msg);

    m_bReplyACK = false;
    for (int i = 0; i < MAX_RESEND; i++)
    {
        if (msglen != m_sock->SendTo(&exitmsg,msglen,m_serverPort,m_ServerIP))
            continue;
        for (int j = 0; j < 15; j++)
        {
            if (m_bReplyACK)
                return true;
            Sleep(100);
        }
    }
    return false;
}

bool CWorker::OnCmdRecvFrom(Msg & msg,