P2P打洞伺服器與客戶端
本文從以下四個部分開始分析講解,客戶端與服務端的原始碼在文章末尾連結
一、P2P打洞的原理
二、P2P伺服器的實現
三、P2P客戶端的實現
四、資料包格式
一、P2P打洞原理
1、打洞解決了什麼問題?
我們平常使用的一般都為私有ip,但是私有ip之間是不能直接通訊的,如果要進行通訊只能通過公網上的伺服器進行資料的轉發,難道我們每次傳送資料都要經過公網上的伺服器轉發嗎?也不是不可以,但是伺服器的承受能力就會大大增加。此時就需要我們的打洞技術的出現了,打洞的出現解決了私有ip之間直接通訊的問題(還是需要經過一次公網伺服器)
例如:QQ中的聊天就廣泛的使用到了打洞技術,不然伺服器的承受能力會大大增加,而且會影響聊天的效率。
2、打洞的實現過程與原理
私有ip的資料都要經過路由器的轉發,路由器上有一張NAPT表(IP埠對映表),NAPT表記錄的是【私有IP:埠】與【公有IP:埠】的對映關係(就是一一對應關係),本文講到的路由均是以NAPT為工作模式,這並不影響對打洞。實際中的資料實際傳送給的都是路由器的【公有IP:埠】,然後經過路由器進過查詢路由表後再轉發給【私有的IP:埠】的。
舉個示例:
使用者A
電腦IP:192.168.1.101
桌面上有個客戶端程式採用的網路埠:10000
路由器的公有IP:120.78.201.201(實際中常常為多級路由,這裡以最簡單的一層路由舉例)
NAPT路由器的NAPT表的其中一條記錄為:【120.78.201.201:20202】-【192.168.1.101:10000】
使用者B
電腦IP:192.168.2.202
桌面上有個客戶端程式採用的網路埠:22222
路由器的公有IP:120.78.202.202
NAPT路由器的NAPT表的其中一條記錄為:【120.78.202.202:20000】-【192.168.2.202:22222】
打洞伺服器P2Pserver
IP:120.78.202.100
port:20000
此時使用者A的電腦發給了伺服器一條資料,伺服器收到使用者A的IP與埠是多少呢?當然為120.78.201.201:20202,資料包經過路由的時候進行了重新的封包。如果伺服器此時發一條資料給使用者A,發往的IP與埠是什麼呢?當然為120.78.201.201:20202,此時路由器收到這個資料包後,進行查詢NAPT表中120.78.201.201:20202對應的IP與埠資訊,發現是192.168.1.101:10000,然後路由器就轉發給IP為192.168.1.101:10000的電腦,然後電腦上的應用程式就收到這條資訊了。
既然如此,我們私有IP雖然不能直接通訊,但是我們能夠發給公有IP!如果使用者B需要給使用者A發一條資訊時,使用者B直接將資料發往目的IP、埠為120.78.201.201:20202的地方不就行了?
這裡有兩個問題:
第一,使用者B怎麼知道使用者A在路由上對映的IP與埠;
第二,使用者B直接將資料包發往120.78.201.201:20202,路由器是會將使用者B的資料包丟棄的,因為路由器裡面沒有關於使用者B120.78.202.202的路由資訊(路由器裡面還有個路由表,用於路由),無法進行路由,所以將會進行丟棄。
如何解決第一個問題?
通過打洞伺服器,將使用者A對映的IP、埠資訊告訴使用者B即可。
如何解決第二個問題?
如果打洞伺服器首先告訴使用者A先發一條資訊給使用者B(使用者A得知使用者B的地址資訊也是通過打洞伺服器),注意此時使用者B是收不到的,使用者B的路由同樣會進行丟棄,但是這並不要緊,因為使用者A發了這條資訊後,使用者A的路由就會記錄關於使用者B的路由資訊(該資訊記錄的是將使用者B的IP資訊路由到使用者A電腦),然後此時使用者B再發給使用者A一條資訊,就不會進行丟棄了,因為使用者A的路由裡面有使用者B的路由資訊。
通過解決上面的兩個問題後,我們再通過圖形示意圖來具體瞭解下打洞的過程
整個過程就是我標的序號1->2->3->4->5->6->7->8>9
過程一:1->2
此過程為使用者B向伺服器請求向用戶A打洞
過程二:3->4
此過程為伺服器相應使用者B的打洞請求,告訴使用者A使用者B想與你打洞(資料包中包含使用者B的地址資訊)。
過程三:5->6
使用者A主動發一條資訊給使用者B,目的是為了使得路由器A中能夠有一條關於路由B的IP的路由資訊(注意不是使用者B,使用者B是私有IP),就如圖所示,這條資訊會被丟棄的,因為路由B的路由表中沒有路由A的IP的資訊。
過程四:7->8->9
使用者B再發一條資訊給使用者A,因為此時路由A的路由表中有關於路由B的IP的路由資訊,此時路由A就能路由給使用者A了,至此,使用者A就能直接收到使用者B發的資訊了。注意,此時使用者A發給使用者B不需要打洞,因為路由B中已經有關於路由A的IP的路由資訊了。
還是思路重要,所以有點囉嗦了…本來以為自己對P2P瞭解的算不錯了,但是在書寫的時候發現在細枝末節還是半知半解,因此特此去查閱了一些資料。本文難免有一些不足,希望大家指出不足。
二、P2P伺服器的實現
服務端的實現:
服務端採用的是基於UDP的IOCP網路模型實現資料的收發。實現了對客戶端資料的分發處理,心跳檢測是否線上,資料包完整性的CRC校驗等
原始碼過長,就貼下服務端處理資料的工作者類CWorker,原始碼自行下載
工作者類-Worker.h
#ifndef __WORKER_H__
#define __WORKER_H__
#include "IOCPServer.h"
#include <map>
using namespace std;
typedef list<stUserListNode *> UserList;
typedef map<stUserListNode*,int> UserMap;
class CWorker
{
public:
CWorker(int nPort = SERVER_PORT, HWND hWnd = NULL);
~CWorker();
void ShowServerInfo(); // 顯示伺服器的相關配置資訊
bool OpenHeartThread(); // 開啟心跳檢測執行緒
void OnUsers(); // 顯示所有線上使用者
static CWorker* m_worker;
protected:
static void CALLBACK NotifyProc(LPVOID lParam, OVERLAPPEDPLUS* pContent, DWORD dwSize, UINT nCode);
void OnCMDLogIn(Msg msg, SOCKADDR_IN addr); // 登入資訊
void OnCMDLogOut(Msg msg, SOCKADDR_IN addr); // 下線資訊
void OnCMDP2Ptrans(Msg msg, SOCKADDR_IN addr); // 打洞請求資訊
void OnCMDGetAllUser(Msg msg, SOCKADDR_IN addr); // 獲取線上使用者資訊
bool SendPacketACK(MsgCmd msgCmd, SOCKADDR_IN addr); // 傳送確認包
void OnCMDHeartACK(Msg msg, SOCKADDR_IN addr);
static DWORD CALLBACK HeartbeatProc(LPVOID lparam); // 心跳包即時檢測客戶狀態
bool GetUserByName(stUserListNode & userNode, char* username);
HANDLE m_hHeartThread; // 心跳包檢測執行緒控制代碼
bool m_bExitTherad; // 心跳包檢測執行緒退出標識
CIOCPServer* m_IocpServer; // UDP IOCP伺服器
UserList m_userList; // 儲存使用者登入資訊的連結串列
UserMap m_userMap; // 使用者心跳包檢測,記錄心跳
static CRITICAL_SECTION m_cs;
};
#endif
工作者類-Worker.cpp
#include "Worker.h"
#include "Lock.h"
#define MAXHEART 10 // 最大心跳次數
CWorker* CWorker::m_worker;
CRITICAL_SECTION CWorker::m_cs;
/************************************************************************/
/* 心跳檢測,每一秒進行一次探測 */
/************************************************************************/
DWORD CALLBACK CWorker::HeartbeatProc(LPVOID lparam)
{
int addrlen = sizeof(SOCKADDR_IN);
SOCKADDR_IN clientAddr = { 0 };
int msglen = sizeof(Msg);
Msg heartmsg = { 0 };
heartmsg.head.sCmd = CMDHEARTMSG;
heartmsg.head.nContentLen = 0;
heartmsg.head.CRC32 = ::GetCRC32(heartmsg.content.szMsg, heartmsg.head.nContentLen);
while (!(m_worker->m_bExitTherad))
{
if (m_worker->m_userList.size() == 0)
{
Sleep(100);
continue;
}
Sleep(2000);
CLock cs(m_cs, "HeartbeatProc");
for (UserList::iterator it = m_worker->m_userList.begin();
it != m_worker->m_userList.end(); ++it)
{
if (m_worker->m_userMap[*it] == MAXHEART)//達到最大心跳次數,刪除使用者資訊
{
printf("user [%s] logout -> has max heartbeat\n",(*it)->userName);
m_worker->m_userMap.erase(*it);
stUserListNode* tmp = *it;
it = m_worker->m_userList.erase(it);
delete tmp;
if (it == m_worker->m_userList.end())
break;
}
m_worker->m_userMap[*it]++;//心跳加一
ZeroMemory(&clientAddr, addrlen);
clientAddr.sin_family = AF_INET;
clientAddr.sin_port = ntohs((*it)->port);
clientAddr.sin_addr.s_addr = htonl((*it)->ip);
m_worker->m_IocpServer->PostSend(&clientAddr, (LPBYTE)&heartmsg, msglen);
}
}
return 0;
}
void CALLBACK CWorker::NotifyProc(LPVOID lParam, OVERLAPPEDPLUS* pContent, DWORD dwSize, UINT nCode)
{
Msg msg = { 0 };
memcpy_s(&msg, sizeof(Msg), pContent->buf, sizeof(Msg));
SOCKADDR_IN addr = pContent->remoteAddr;
switch (msg.head.sCmd)
{
case CMDLOGIN:
m_worker->SendPacketACK(CMDLOGINACK, addr);
m_worker->OnCMDLogIn(msg, addr);
break;
case CMDLOGOUT:
m_worker->SendPacketACK(CMDLOGOUTACK, addr);
m_worker->OnCMDLogOut(msg, addr);
break;
case CMDP2PTRANS:
m_worker->SendPacketACK(CMDP2PTRANSACK, addr);
m_worker->OnCMDP2Ptrans(msg, addr);
break;
case CMDP2PGETALLUSER:
m_worker->SendPacketACK(CMDP2PGETALLUSERACK, addr);
m_worker->OnCMDGetAllUser(msg, addr);
break;
case CMDHEARTMSGACK:
m_worker->OnCMDHeartACK(msg, addr);
break;
default:
break;
}
}
void CWorker::OnCMDHeartACK(Msg msg, SOCKADDR_IN addr)
{
unsigned int nIp = ntohl(addr.sin_addr.S_un.S_addr);
unsigned short nPort = ntohs(addr.sin_port);
CLock cs(m_cs, "OnCMDHeartACK");
for (UserList::iterator it = m_userList.begin(); it != m_userList.end(); ++ it)
{
if ((*it)->ip == nIp && (*it)->port == nPort &&
strcmp(msg.content.heartmessage.userName,(*it)->userName) == 0)
{
m_userMap[*it] = 0;
return;
}
}
}
void CWorker::OnCMDLogIn(Msg msg, SOCKADDR_IN addr)
{
/*
對於使用者登入的密碼檢驗沒有
採用 MD5碼 進行校驗
*/
stUserListNode* userInfo = new stUserListNode;
strcpy_s(userInfo->userName, 10, msg.content.loginmember.userName);
userInfo->ip = ntohl(addr.sin_addr.S_un.S_addr);
userInfo->port = ntohs(addr.sin_port);
// 防止執行緒同步導致多條相同使用者資訊被存放到連結串列中
// 出現的情況是:由於網路阻塞,一瞬間同時收到同一個客戶端的10條登入請求,此時10個執行緒同時工作(同時對連結串列操作),此時必須避免執行緒同步的問題!
CLock* cs = new CLock(m_cs, "OnCMDLogIn");
// 檢測該使用者是否已經存在
bool bExist = false;
for (UserList::iterator it = m_userList.begin(); it != m_userList.end(); ++it)
{
if (strcmp(userInfo->userName,(*it)->userName) == 0 &&
userInfo->ip == (*it)->ip && userInfo->port == (*it)->port)
{
bExist = true;
break;
}
}
// 將新連線的使用者,加入連結串列中
if (!bExist)
{
printf("------user login : %s <-> [%s:%d]\n", userInfo->userName, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
m_userList.push_back(userInfo);
m_userMap[userInfo] = 0;
}
delete cs;// 這裡不用在末尾釋放已達到提升速度
// 傳送已經登入的客戶資訊給客戶端
// 1、傳送 登入客戶的個數,客戶端根據個數進行接收
int usercount = m_userList.size();
Msg usercntmsg = { 0 };
usercntmsg.head.sCmd = CMDP2PGETUSERCNT;
usercntmsg.head.nContentLen = sizeof(int);
usercntmsg.head.CRC32 = ::GetCRC32((char*)&usercount, sizeof(int));
usercntmsg.content.usercount = usercount;
m_IocpServer->PostSend((SOCKADDR_IN*)&addr, (LPBYTE)&usercntmsg, sizeof(Msg));
// 2、開始逐條傳送 客戶資訊
for (UserList::iterator it = m_userList.begin(); it != m_userList.end(); ++it)
{
m_IocpServer->PostSend((SOCKADDR_IN*)&addr, (LPBYTE)(*it), sizeof(stUserListNode));
}
printf("send user list information to : %s <-> [%s:%d]\n", userInfo->userName, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
}
void CWorker::OnCMDLogOut(Msg msg, SOCKADDR_IN addr)
{
CLock cs(m_cs, "OnCMDLogOut");
for (UserList::iterator it = m_userList.begin(); it != m_userList.end(); ++it)
{
if (strcmp((*it)->userName,msg.content.logoutmember.userName) == 0)
{
printf("------user logout : %s <-> [%s:%d]\n", msg.content.logoutmember.userName, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
m_userMap.erase(*it);
m_userList.remove(*it); // 移除連結串列節點
return;
}
}
}
void CWorker::OnCMDP2Ptrans(Msg msg, SOCKADDR_IN addr)
{
printf("[%s:%d] wants to p2p %s\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), msg.content.requesttransmsg.toName);
stUserListNode userNode;
// 獲取使用者資訊--使用者可能已經下線
if (!GetUserByName(userNode,msg.content.requesttransmsg.toName))
{
// 使用者下線處理
return;
}
// 1、獲取被打洞端的資訊
SOCKADDR_IN remote = { 0 };
remote.sin_family = AF_INET;
remote.sin_port = htons(userNode.port);
remote.sin_addr.s_addr = htonl(userNode.ip);
printf("tell %s[%s:%d] to send p2ptrans message to: ", msg.content.requesttransmsg.toName,inet_ntoa(remote.sin_addr), ntohs(remote.sin_port));
printf("%s[%s:%d]\n", msg.content.requesttransmsg.requestName, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
// 2、打洞資料包封裝
Msg transmsg = { 0 };
transmsg.content.servtransmsg.nIP = ntohl(addr.sin_addr.S_un.S_addr);
transmsg.content.servtransmsg.nPort = ntohs(addr.sin_port);
strcpy_s(transmsg.content.servtransmsg.userName, USERNAMELEN, msg.content.requesttransmsg.requestName);
transmsg.head.sCmd = CMDREQUESTP2P;
transmsg.head.nContentLen = sizeof(stP2PTranslate);
transmsg.head.CRC32 = ::GetCRC32(transmsg.content.szMsg, transmsg.head.nContentLen);
// 3、傳送打洞資料包給 被打洞方,要求 被打洞方 主動更新路由器表資訊
m_IocpServer->PostSend(&remote, (LPBYTE)&transmsg, sizeof(Msg));
}
void CWorker::OnCMDGetAllUser(Msg msg, SOCKADDR_IN addr)
{
// 1、傳送客戶個數
msg.content.usercount = m_userList.size();
msg.head.sCmd = CMDP2PGETUSERCNT;
msg.head.nContentLen = sizeof(int);
msg.head.CRC32 = ::GetCRC32(msg.content.szMsg, msg.head.nContentLen);
m_IocpServer->PostSend(&addr, (LPBYTE)&msg, sizeof(Msg));
// 2、迴圈傳送客戶資訊
for (UserList::iterator it = m_userList.begin(); it != m_userList.end(); ++it)
{
m_IocpServer->PostSend(&addr, (LPBYTE)(*it), sizeof(stUserListNode));
}
printf("send user list information to : [%s:%d]\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
}
bool CWorker::SendPacketACK(MsgCmd msgCmd, SOCKADDR_IN addr)
{
Msg msgACK = { 0 };
msgACK.head.sCmd = msgCmd;
msgACK.head.nContentLen = 0;
msgACK.head.CRC32 = ::GetCRC32(msgACK.content.szMsg, 0);
m_IocpServer->PostSend(&addr, (LPBYTE)&msgACK, sizeof(Msg));
return true;
}
bool CWorker::GetUserByName(stUserListNode & userNode, char* username)
{
CLock cs(m_cs, "GetUserByName");
for (UserList::iterator it = m_userList.begin(); it != m_userList.end(); ++it)
{
if (strcmp((*it)->userName,username) == 0)
{
userNode = **it;
return true;
}
}
return false;
}
CWorker::CWorker(int nPort, HWND hWnd)
:m_IocpServer(NULL)
,m_bExitTherad(false)
{
InitializeCriticalSection(&m_cs);
m_IocpServer = new CIOCPServer;
if (!m_IocpServer->Initialize(NotifyProc, hWnd, nPort))
printf("服務端啟動失敗:%d", GetLastError());
printf("P2P伺服器啟動成功...\n");
ShowServerInfo();
}
CWorker::~CWorker()
{
try
{
if (m_IocpServer)
{
DeleteCriticalSection(&m_cs);
m_bExitTherad = true;
delete m_IocpServer;
Sleep(100);// 等待資源釋放完畢
}
}
catch (...) {}
}
void CWorker::ShowServerInfo()
{
printf("伺服器開啟的執行緒數 : %d\n", m_IocpServer->m_nCurrentThreads);
printf("伺服器埠號 : %d\n", m_IocpServer->m_nPort);
printf("正在工作的執行緒數 : %d\n", m_IocpServer->m_nBusyThreads);
printf("記憶體中的元素個數 : %d\n", m_IocpServer->m_listContexts.size());
printf("伺服器傳送即時速度 : %d KB/s\n", m_IocpServer->m_nSendKbps);
printf("伺服器接收即時速度 : %d KB/s\n", m_IocpServer->m_nRecvKbps);
}
bool CWorker::OpenHeartThread()
{
m_hHeartThread = CreateThread(NULL, NULL, HeartbeatProc, m_worker, NULL, NULL);
if (m_hHeartThread == INVALID_HANDLE_VALUE)
{
printf("CreateThread failed!\n");
return false;
}
return true;
}
void CWorker::OnUsers()
{
CLock cs(m_cs, "OnUsers");
printf("\nHave %d users\n", m_userMap.size());
if (m_userMap.size() == 0)
return;
for (UserMap::iterator it = m_userMap.begin(); it != m_userMap.end(); it++)
{
in_addr tmp;
tmp.S_un.S_addr = htonl(it->first->ip);
printf("Username:%s\tIP:%s\tport:%d\theart:%d\n", it->first->userName, inet_ntoa(tmp), it->first->port,it->second);
}
}
三、P2P客戶端的實現
客戶端的實現:
客戶端採用普通UDP套接字加多執行緒實現。實現了超時重發,使用者登入密碼的MD5加密,資料包完整性的CRC校驗等
客戶端的工作者類CWorker
Worker.h
#ifndef __WORKER_H__
#define __WORKER_H__
#include "Socket.h"
#include "../public/MsgProtocal.h"
#include <list>
#define MAX_RESEND 10 // 最大重發次數
#define MIN_RESEND 5 // 最小重發次數
#define MAX_RECV 10 // 最大接收次數
#define MAX_CONNECT 10 // 連線伺服器的次數
typedef std::list<stUserListNode *> UserList;
class CWorker
{
public:
CWorker();
~CWorker();
static CWorker* m_worker;
static DWORD WINAPI RecvThreadProc(LPVOID lparam); // 接收訊息的執行緒
bool IsIP(char* ip);
bool InitNetEnvironment(int nPort=0);
bool OnLogin();
void SuspendRecvThread() const
{
SuspendThread(m_hThread);
}
void ResumeRecvThread() const
{
ResumeThread(m_hThread);
}
/*
** 使用者命令響應
*/
bool OnGetU();
/*
** 傳送資訊給線上使用者
*/
bool OnSend(char* sendname, char* message);
/*
** 下線
*/
bool OnExit();
protected:
MsgCmd m_cmdACK; // 確認訊息的型別
bool m_bReplyACK; // 是否判斷回覆的確認訊息
UserList m_UserList;
Socket* m_sock;
unsigned int m_serverPort;
HANDLE m_hThread; // 執行緒控制代碼
bool m_bExit; // 執行緒退出標識
char m_ServerIP[20];
char m_UserName[USERNAMELEN];
char m_UserPwd[USERPASSWORDLEN];
bool CheckPackage(Msg msg);
// 丟包重發機制的收發
bool ConnectToServer(char* serverip, char* szName, char* szPwd, int max_time = MAX_CONNECT);
bool SendtoServer(Msg & msg, unsigned short nCmd, char* dstip, unsigned int dstport, int max_time = MAX_RESEND);
// 接收指定檔案頭的資料包
bool OnCmdRecvFrom(Msg & msg, unsigned short nCmd, char* dstip, unsigned int dstport, int max_time = MAX_RECV);
bool OnGetUsersRecvFrom(void* recvbuf, int recvlen, char* dstip, unsigned int dstport, int max_time = MAX_RECV);
};
#endif
Worker.cpp
#include "Worker.h"
#include "../public/CRC32.h"
#include "../public/MD5.h"
#include <string>
#include <regex>
using namespace std;
CWorker* CWorker::m_worker;
DWORD WINAPI CWorker::RecvThreadProc(LPVOID lparam)
{
CWorker* pThis = (CWorker*)lparam;
Msg recvMsg = { 0 };
int msglen = sizeof(Msg);
char szAddress[32] = { 0 };
unsigned int nPort = 0;
while (!(pThis->m_bExit))
{
// 此處可以改進為使用 接收的訊息佇列
// 其中一個執行緒作為接收,放入訊息隊裡中,其中一個執行緒不停的在訊息佇列中獲取訊息,並進行處理!!!
ZeroMemory(&recvMsg, msglen);
ZeroMemory(szAddress, sizeof(szAddress));
int iread = pThis->m_sock->ReceiveFrom(&recvMsg, msglen, szAddress, nPort);
if (iread <= 0 || iread != msglen) // recv error
{
//printf("recvfrom failed : %d\n", GetLastError());
continue;
}
// CRC差錯檢測
if (recvMsg.head.CRC32 != ::GetCRC32(recvMsg.content.szMsg,recvMsg.head.nContentLen))
continue;
/******************************************************/
/* 根據資料包訊息頭進行分發處理 */
/*******************************************************/
switch (recvMsg.head.sCmd)
{
case CMDP2PMESSAGEACK:// P2P訊息確認包的處理,來自客戶端
{
pThis->m_bReplyACK = true;
printf("Recv message ack from %s:%ld\n", szAddress, nPort);
break;
}
case CMDP2PMESSAGE:// 接收到P2P訊息的處理,來自客戶端
{
printf("Recv Message from %s[%s:%d] -> %s\n", recvMsg.content.p2pmesssage.username,
szAddress, nPort, recvMsg.content.p2pmesssage.message);
ZeroMemory(&recvMsg, msglen);
recvMsg.head.sCmd = CMDP2PMESSAGEACK;
recvMsg.head.nContentLen = 0;
recvMsg.head.CRC32 = ::GetCRC32(recvMsg.content.szMsg, recvMsg.head.nContentLen);
pThis->m_sock->SendTo(&recvMsg, msglen, nPort, szAddress);
printf("Send a Message ACK to %s[%s:%d]\n", recvMsg.content.p2pmesssage.username, szAddress, nPort);
break;
}
case CMDP2PGETALLUSERACK:// getu 獲取所有使用者資訊回覆包,來自服務端
{
pThis->m_bReplyACK = true;
printf("Recv getu ack from server\n");
ZeroMemory(&recvMsg, sizeof(Msg));
if (!pThis->OnCmdRecvFrom(recvMsg, CMDP2PGETUSERCNT,pThis->m_ServerIP, pThis->m_serverPort))
{
printf("Get user count from server timeout...\n");
continue;
}
int usercount = recvMsg.content.usercount;
printf("Have %d users logined server\n", usercount);
// 3、開始逐條接收(可能會有丟包出現,可以記錄是哪個包丟失,然後進行重新申請)
int nCntRecved = 0;
int nodelen = sizeof(stUserListNode);
pThis->m_UserList.clear();
for (int i = 0; i < usercount; i++)
{
stUserListNode *node = new stUserListNode;
if (!pThis->OnGetUsersRecvFrom(node, nodelen, pThis->m_ServerIP, pThis->m_serverPort))
{
printf("Get user info from server timeout...\n");
continue;
}
pThis->m_UserList.push_back(node);
in_addr tmp;
tmp.S_un.S_addr = htonl(node->ip);
printf("Username:%s\nUserIP:%s\nUserPort:%d\n\n", node->userName, inet_ntoa(tmp), node->port);
nCntRecved++;
}
printf("Has received %d users form server\n", nCntRecved);
break;
}
case CMDP2PTRANSACK:// 主動請求打洞確認包,來自服務端
{
pThis->m_bReplyACK = true;
printf("Recv p2ptransACK from server\n");
break;
}
case CMDREQUESTP2P:// 對方的打洞請求,來自服務端的轉發
{
printf("Recv P2P request from %s\n",recvMsg.content.servtransmsg.userName);
printf("Send P2P request ACK to %s\n", recvMsg.content.servtransmsg.userName);
unsigned int toIP = recvMsg.content.servtransmsg.nIP;
unsigned short toPort = recvMsg.content.servtransmsg.nPort;
ZeroMemory(&recvMsg, msglen);
recvMsg.head.sCmd = CMDREQUESTP2PACK;
recvMsg.head.nContentLen = 0;
recvMsg.head.CRC32 = ::GetCRC32(recvMsg.content.szMsg, recvMsg.head.nContentLen);
pThis->m_sock->SendTo2(&recvMsg, msglen, toPort, toIP);
break;
}
case CMDREQUESTP2PACK:// 對方發來的打洞訊息,忽略,來自客戶端
{
// do nothing
printf("Recv P2P burrow ACK from %s:%d\n", szAddress, nPort);
break;
}
case CMDLOGOUTACK:// 確認下線訊息
{
pThis->m_bReplyACK = true;
break;
}
case CMDHEARTMSG:// 心跳包訊息
{
//printf("recv one heart packet \n");
ZeroMemory(&recvMsg, msglen);
strcpy_s(recvMsg.content.heartmessage.userName, pThis->m_UserName);
recvMsg.head.sCmd = CMDHEARTMSGACK;
recvMsg.head.nContentLen = strlen(pThis->m_UserName);
recvMsg.head.CRC32 = ::GetCRC32(recvMsg.content.szMsg, recvMsg.head.nContentLen);
pThis->m_sock->SendTo(&recvMsg, msglen, pThis->m_serverPort, pThis->m_ServerIP);
//printf("reply heart ack to server\n");
break;
}
}
}
printf("Worker Thread exit...\n");
return 0;
}
bool CWorker::InitNetEnvironment(int nPort/*=0*/)
{
if (!m_sock->Create(nPort, SOCK_DGRAM))
return false;
return true;
}
bool CWorker::IsIP(char* ip)
{
regex pattern("\\b([1-9]\\d?|1\\d\\d|2[0-4]\\d|25[0-4])\\.([0-9]\\d?|1\\d\\d|2[0-4]\\d|25[0-4])\\.([0-9]\\d?|1\\d\\d|2[0-4]\\d|25[0-4])\\.([0-9]\\d?|1\\d\\d|2[0-4]\\d|25[0-4])\\b");
return regex_match(ip, pattern);
}
bool CWorker::CheckPackage(Msg msg)
{
if (msg.head.CRC32 == ::GetCRC32(msg.content.szMsg, msg.head.nContentLen))
return true;
return false;
}
bool CWorker::OnLogin()
{
printf("\n");
do
{
ZeroMemory(m_ServerIP, sizeof(m_ServerIP));
printf("Please input server ip:");
scanf_s("%s", m_ServerIP, sizeof(m_ServerIP));
} while (!IsIP(m_ServerIP));
printf("Please input your name:");
ZeroMemory(m_UserName, USERNAMELEN);
scanf_s("%s", m_UserName, sizeof(m_UserName));
printf("Please input your password:");
ZeroMemory(m_UserPwd, USERPASSWORDLEN);
scanf_s("%s", m_UserPwd, sizeof(m_UserPwd));
return ConnectToServer(m_ServerIP, m_UserName, m_UserPwd);
}
bool CWorker::ConnectToServer(char* serverip, char* szName, char* szPwd, int max_time)
{
// 1、對密碼進行MD5加密處理
string check = szName + string("#") + ::md5(szPwd);
MsgContent msgcontent = { 0 };
strcpy(msgcontent.loginmember.userName, szName);
strcpy(msgcontent.loginmember.check, check.c_str());
// 2、封裝資料包,進行CRC32校驗
Msg msg = { 0 };
msg.head.sCmd = CMDLOGIN;
msg.head.nContentLen = strlen(msgcontent.szMsg);
msg.head.CRC32 = GetCRC32(msgcontent.szMsg, msg.head.nContentLen);
memcpy(&msg.content.loginmember, &msgcontent, sizeof(stLoginMessage));
// 3、傳送登入資料包 登入伺服器
printf("begin to connect server...\n");
if(!SendtoServer(msg,CMDLOGINACK,m_ServerIP,m_serverPort))
{
printf("connect server timeout...\n");
return false;
}
// 4、登入伺服器成功,開始接收線上端使用者數量
ZeroMemory(&msg, sizeof(Msg));
int usercount = 0;
int iread = 0;
if (!OnCmdRecvFrom(msg, CMDP2PGETUSERCNT, m_ServerIP, m_serverPort))
{
printf("Get user count from server timeout...\n");
return false;
}
usercount = msg.content.usercount;
printf("Have %d users logined server\n", usercount);
// 5、接收線上使用者資訊
int nCntRecved = 0;
int nodelen = sizeof(stUserListNode);
m_UserList.clear();
for (int i = 0; i < usercount; i++)
{
stUserListNode *node = new stUserListNode;
if (!OnGetUsersRecvFrom(node, nodelen, m_ServerIP, m_serverPort))
{
printf("Get user info from server timeout...\n");
continue;
}
m_UserList.push_back(node);
in_addr tmp;
tmp.S_un.S_addr = htonl(node->ip);
printf("Username:%s\nUserIP:%s\nUserPort:%d\n\n", node->userName, inet_ntoa(tmp), node->port);
nCntRecved++;
}
printf("Has received %d users form server\n", nCntRecved);
m_hThread = CreateThread(NULL, NULL, RecvThreadProc, m_worker, NULL, NULL);
if (m_hThread == INVALID_HANDLE_VALUE)
{
printf("CreateThread Failed!\n");
return false;
}
return true;
}
bool CWorker::OnGetU()
{
int msglen = sizeof(Msg);
Msg msg = { 0 };
msg.head.sCmd = CMDP2PGETALLUSER; // 傳送 CMDGETALLUSER 命令,獲取使用者
msg.head.nContentLen = 0;
msg.head.CRC32 = ::GetCRC32(msg.content.szMsg, 0);
for (int i = 0; i < MAX_RESEND; i++)
{
if (msglen != m_sock->SendTo(&msg, msglen, m_serverPort, m_ServerIP))
{
printf("sendto failed...\n");
continue;
}
for (int j = 0; j < 15; j++)
{
// 是否收到服務端的回覆
if (m_bReplyACK)
return true;
Sleep(100);
}
}
printf("get users timeout...\n");
return false;
}
bool CWorker::OnSend(char* sendname, char* message)
{
unsigned int UserIP = 0;
unsigned short UserPort = 0;
bool bFindUser = false;
for (UserList::iterator it = m_UserList.begin(); it != m_UserList.end(); ++it)
{
if (strcmp((*it)->userName,sendname) == 0)
{
UserIP = (*it)->ip;
UserPort = (*it)->port;
bFindUser = true;
}
}
if (!bFindUser)
{
printf("Can't find username!\nSend failed!\n");
return false;
}
Msg p2pmsg = { 0 };
strcpy_s(p2pmsg.content.p2pmesssage.username, USERNAMELEN, m_UserName);
strcpy_s(p2pmsg.content.p2pmesssage.message, MAXMSGLEN, message);
p2pmsg.head.sCmd = CMDP2PMESSAGE;
p2pmsg.head.nContentLen = USERNAMELEN + MAXMSGLEN;
p2pmsg.head.CRC32 = ::GetCRC32(p2pmsg.content.szMsg, p2pmsg.head.nContentLen);
int msglen = sizeof(Msg);
/* 1、
** 傳送訊息給使用者,每1.5秒進行一次重傳,重傳次數為5次
*/
m_bReplyACK = false;
for (int i = 0; i < MIN_RESEND; i++)
{
if (msglen != m_sock->SendTo2(&p2pmsg, msglen, UserPort, UserIP))
{
printf("sendto() failed:%d\n",GetLastError());
continue;
}
for (int j = 0; j < 15; j++)// 1.5 秒沒有收到確認包,進行重傳
{
if (m_bReplyACK)
{
printf("Send OK!\n");
return true;
}
Sleep(100);
}
}
/* 2、
** 沒有收到目標主機的迴應時,認為與目標主機之間沒有進行穿透
** 傳送穿透資訊給伺服器,請求打洞穿透
*/
printf("Prepare p2p to %s by P2Pserver...\n",sendname);
Msg transmsg = { 0 };
strcpy_s(transmsg.content.requesttransmsg.requestName, USERNAMELEN, m_UserName);
strcpy_s(transmsg.content.requesttransmsg.toName, USERNAMELEN, sendname);
transmsg.head.sCmd = CMDP2PTRANS;
transmsg.head.nContentLen = sizeof(stP2PClientRequestTrans);
transmsg.head.CRC32 = ::GetCRC32(transmsg.content.szMsg, transmsg.head.nContentLen);
m_bReplyACK = false;
for (int i = 0; i < MIN_RESEND; i++)
{
if(msglen != m_sock->SendTo(&transmsg, msglen, m_serverPort, m_ServerIP))
{
printf("sendto() failed:%d\n", GetLastError());
continue;
}
for (int j = 0; j < 15; j++)
{
if (m_bReplyACK)
break;
Sleep(100);
}
if (m_bReplyACK)
break;
}
/* 3、
** 再次 傳送訊息給使用者,每1.5秒進行一次重傳,重傳次數為5次
*/
m_bReplyACK = false;
for (int i = 0; i < MIN_RESEND; i++)
{
if (msglen != m_sock->SendTo2(&p2pmsg, msglen, UserPort, UserIP))
{
printf("sendto() failed:%d\n", GetLastError());
continue;
}
for (int j = 0; j < 15; j++)// 1.5 秒沒有收到確認包,進行重傳
{
if (m_bReplyACK)
{
printf("Send OK!\n");
return true;
}
Sleep(100);
}
}
printf("Send failed!\n");
return false;
}
bool CWorker::OnExit()
{
Msg exitmsg = { 0 };
strcpy_s(exitmsg.content.logoutmember.userName, USERNAMELEN, m_UserName);
exitmsg.head.sCmd = CMDLOGOUT;
exitmsg.head.nContentLen = strlen(m_UserName);
exitmsg.head.CRC32 = ::GetCRC32(exitmsg.content.szMsg, exitmsg.head.nContentLen);
int msglen = sizeof(Msg);
m_bReplyACK = false;
for (int i = 0; i < MAX_RESEND; i++)
{
if (msglen != m_sock->SendTo(&exitmsg,msglen,m_serverPort,m_ServerIP))
continue;
for (int j = 0; j < 15; j++)
{
if (m_bReplyACK)
return true;
Sleep(100);
}
}
return false;
}
bool CWorker::OnCmdRecvFrom(Msg & msg,