從零開始學習比特幣(五)--P2P網路建立的流程之套接字的讀取和傳送
寫在前面:
本篇文章接續
從零開始學習比特幣開發(四)–網路初始化,載入區塊鏈和錢包,匯入區塊啟動節點
從零開始學習區塊鏈技術(三)-接入比特幣網路的關鍵步驟解析、建立比特幣錢包,以及重要rpc指令
從零開始學習區塊鏈技術(二)–如何接入比特幣網路以及其原理分析
如果這篇文章看不明白,請務必先閱讀之前的文章。
從本節開始我們將開始講解比特幣系統中P2P網路是如何建立的。還記得在上一篇比特幣系統啟動的的第12步的講解中,我們提到有幾個執行緒相關的處理非常重要嗎?這一節正是基於此做了詳細的講解。
P2P 網路的建立是在上一篇文章系統啟動的第 12 步,最後時刻呼叫 CConnman::Start
本部分內容在 net.cpp
、net_processing.cpp
等檔案中。
下面開始講解各個執行緒的具體處理。
1、ThreadSocketHandler
這個執行緒主要用來處理套接字的讀取和傳送,呼叫系統提供的相關相關底層函式進行處理,把收到的訊息轉化成 CNetMessage
型別的物件,並儲存到節點的 vRecvMsg
集合中,把待發送的訊息從 vSendMsg
集合中取出來進行傳送。
執行緒定義在 net.cpp
檔案的 1155 行。下面我們開始進行具體的解讀。
這個方法的主體是一個 while 迴圈,只要 interruptNet 變數為空,就一直迴圈。
1. 如果布林變數 fNetworkActive 為假,則斷開所有已經連線的接點。
遍歷所有的節點列表,把沒有斷開的節點設定為斷開連線。
if (!fNetworkActive) { // Disconnect any connected nodes for (CNode* pnode : vNodes) { if (!pnode->fDisconnect) { LogPrint(BCLog::NET, "Network not active, dropping peer=%d\n", pnode->GetId()); pnode->fDisconnect = true; } } }
2. 接下來,斷開所有未使用的節點。
遍歷所有的節點列表,如果節點已經斷開連線,則進行下面的處理:
-
首先,從 vNodes 集合中刪除當前節點。
-
然後,呼叫
CloseSocketDisconnect
方法,關閉當前節點的套接字並進行清理。
方法內部設定fDisconnect
變數為真;如果當前套接字有效,則呼叫CloseSocket
方法,關閉套接字。CloseSocket
方法針對 win32 系統呼叫closesocket
方法,別的系統呼叫close
來關閉套接字,然後設定套接字為INVALID_SOCKET
。
具體方法如下所示:```void CNode::CloseSocketDisconnect() { fDisconnect = true; LOCK(cs_hSocket); if (hSocket != INVALID_SOCKET) { LogPrint(BCLog::NET, "disconnecting peer=%d\n", id); CloseSocket(hSocket); } } ```
-
再然後,呼叫
Release
方法,把節點的引用數量nRefCount
減一。 -
最後,把當前節點放入 vNodesDisconnected 集合中。
3. 刪除所有已斷開連線的節點。
遍歷所有已經斷開連線的節點的集合 vNodesDisconnected
,如果當前節點的引用數量 nRefCount
小於等於0,進行下面的處理:
-
呼叫巨集
TRY_LOCK
獲取cs_inventory
鎖。如果可以獲取,則獲取cs_vSend
鎖。如果可以獲取,則設定變數fDelete
為真。 -
如果變數
fDelete
為真,則從vNodesDisconnected
集合中移除當前的節點,然後呼叫DeleteNode
方法,刪除節點。下面講解下
DeleteNode
方法。方法內部首先設定變數fUpdateConnectionTime
為真,然後呼叫訊息處理介面的NetEventsInterface
的FinalizeNode
方法,最終處理節點。如果變數fUpdateConnectionTime
在FinalizeNode
方法中被設定為真,則呼叫地址管理器物件的Connected
方法,進行處理。最後,刪除當前節點。程式碼如下:
`void CConnman::DeleteNode(CNode* pnode) { assert(pnode); bool fUpdateConnectionTime = false; m_msgproc->FinalizeNode(pnode->GetId(), fUpdateConnectionTime); if(fUpdateConnectionTime) { addrman.Connected(pnode->addr); } delete pnode; }`
FinalizeNode
方法是一個純虛擬函式,具體由net_processing.cpp
檔案中的PeerLogicValidation
物件的FinalizeNode
方法實現。下面,我們來看這個函式的處理。-
設定引數
fUpdateConnectionTime
預設為真。 -
呼叫
State
方法,獲取節點狀態物件的指標。方法內部根據節點 ID,從
mapNodeState
集合中找到並返回對應的節點狀態物件。如果不存在,則返回空指標。 -
如果我們已經在這個對等節點上同步了區塊頭部,即節點狀態物件的
fSyncStarted
屬性為真,則設定變數nSyncStarted
減一。 -
如果對等節點的不良積分即節點狀態物件的
nMisbehavior
屬性等於0,且對等節點已經建立了完整的連線即節點狀態物件的fCurrentlyConnected
屬性為真,則設定變數fUpdateConnectionTime
為真。 -
遍歷狀態物件的
vBlocksInFlight
集合,並刪除所有的條目。 -
呼叫
EraseOrphansFor
方法,刪除與這個對等節點相關聯的孤兒交易。方法內部遍歷
mapOrphanTransactions
集合,如果當前孤兒交易的來自於指定的對等節點,則呼叫EraseOrphanTx
方法進行刪除。後者根據交易的雜湊ID,查詢
mapOrphanTransactions
集合對應的孤兒交易。如果找不到,則返回。如果找到則遍歷交易的所有輸入,如果當前輸入指向的父輸出不在mapOrphanTransactionsByPrev
集合中,則處理下一個;否則,在返回的集合中移除指定的孤兒交易,如果指向的父輸出現在為空,則從mapOrphanTransactionsByPrev
集合中刪除指向的父輸出。從mapOrphanTransactions
刪除指定的孤兒交易。`int static EraseOrphanTx(uint256 hash) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans) { std::map<uint256, COrphanTx>::iterator it = mapOrphanTransactions.find(hash); if (it == mapOrphanTransactions.end()) return 0; for (const CTxIn& txin : it->second.tx->vin) { std::set<std::map<uint256, COrphanTx>::iterator, IteratorComparator> auto itPrev = mapOrphanTransactionsByPrev.find(txin.prevout); if (itPrev == mapOrphanTransactionsByPrev.end()) continue; itPrev->second.erase(it); if (itPrev->second.empty()) mapOrphanTransactionsByPrev.erase(itPrev); } mapOrphanTransactions.erase(it); return 1; }`
-
把優先下載區塊的對等節點變數
nPreferredDownload
減去狀態物件的對應的fPreferredDownload
屬性。這個屬性是一個布林值,表示節點是否為優先下載區塊。這裡利用了 C++ 布林值自動轉換為整數值,真值轉換為1,假值轉換為0. -
處理正在下載區塊的節點數量
nPeersWithValidatedDownloads
變數。如果節點狀態物件的nBlocksInFlightValidHeaders
屬性不等於0,則正在下載區塊的節點數量減去1,否則減去0。 -
接下來處理
g_outbound_peers_with_protect_from_disconnect
變數,這個變數代表了出站的對等節點數量。程式碼比較簡單,不解釋。 -
從節點狀態集合
mapNodeState
中刪除指定的節點ID。
-
4. 如果節點數量不等於 nPrevNodeCount
,則把 nPrevNodeCount
設定為當前的節點數量。
{
LOCK(cs_vNodes);
vNodesSize = vNodes.size();
}
if(vNodesSize != nPrevNodeCount) {
nPrevNodeCount = vNodesSize;
if(clientInterface)
clientInterface->NotifyNumConnectionsChanged(nPrevNodeCount);
}
5. 接下來檢查哪個套接字有資料要接收。
生成相關的時間變數和 3 個 fd_set 集合來處理接收資料、傳送資料及資料錯誤,然後將集合初始化為空集。
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 50000; // frequency to poll pnode->vSend
fd_set fdsetRecv;
fd_set fdsetSend;
fd_set fdsetError;
FD_ZERO(&fdsetRecv);
FD_ZERO(&fdsetSend);
FD_ZERO(&fdsetError);
遍歷當前處於監聽狀態的套接字 vhListenSocket
集合,呼叫 FD_SET
方法,把當前套接字儲存在 fdsetRecv
集合中,然後呼叫標準庫的 max
方法儲存最大的套接字,設定變數 have_fds
為真。
for (const ListenSocket& hListenSocket : vhListenSocket) {
FD_SET(hListenSocket.socket, &fdsetRecv);
hSocketMax = std::max(hSocketMax, hListenSocket.socket);
have_fds = true;
}
遍歷所有的節點,設定相關的變數。
for (CNode* pnode : vNodes)
{
bool select_recv = !pnode->fPauseRecv;
bool select_send;
{
LOCK(pnode->cs_vSend);
select_send = !pnode->vSendMsg.empty();
}
LOCK(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET)
continue;
FD_SET(pnode->hSocket, &fdsetError);
hSocketMax = std::max(hSocketMax, pnode->hSocket);
have_fds = true;
if (select_send) {
FD_SET(pnode->hSocket, &fdsetSend);
continue;
}
if (select_recv) {
FD_SET(pnode->hSocket, &fdsetRecv);
}
}
呼叫 select
方法進行連線。
int nSelect = select(have_fds ? hSocketMax + 1 : 0,&fdsetRecv, &fdsetSend, &fdsetError, &timeout);
接下來,檢查 select
呼叫是否出錯。
if (nSelect == SOCKET_ERROR)
{
if (have_fds)
{
int nErr = WSAGetLastError();
LogPrintf("socket select error %s\n", NetworkErrorString(nErr));
for (unsigned int i = 0; i <= hSocketMax; i++)
FD_SET(i, &fdsetRecv);
}
FD_ZERO(&fdsetSend);
FD_ZERO(&fdsetError);
if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
return;
}
6. 接下來,接收新的連線。
遍歷所有監聽的套接字,如果當前套接字有效,並且套接字在在接收資料的集合中,即新的連線請求進來,則呼叫 AcceptConnection
方法進行處理。
for (const ListenSocket& hListenSocket : vhListenSocket)
{
if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv))
{
AcceptConnection(hListenSocket);
}
}
AcceptConnection
方法處理遠端對等節點的連線邏輯,具體如下:
-
呼叫
accept
方法,接受客戶端連線。SOCKET hSocket = accept(hListenSocket.socket, (struct sockaddr*)&sockaddr, &len);
-
計算最大的入站連線數
int nMaxInbound = nMaxConnections - (nMaxOutbound + nMaxFeeler);
-
如果連線成功,那麼呼叫地址物件的
SetSockAddr
方法來儲存儲存對等節點的地址。if (hSocket != INVALID_SOCKET) { if (!addr.SetSockAddr((const struct sockaddr*)&sockaddr)) { LogPrintf("Warning: Unknown socket family\n"); } }
-
遍歷對等節點列表,如果當前對等節點屬於入站型別,則變數 nInbound 加 1。
{ LOCK(cs_vNodes); for (const CNode* pnode : vNodes) { if (pnode->fInbound) nInbound++; } }
-
如果連線不成功,則直接退出。
if (hSocket == INVALID_SOCKET) { int nErr = WSAGetLastError(); if (nErr != WSAEWOULDBLOCK) LogPrintf("socket error accept failed: %s\n", NetworkErrorString(nErr)); return; }
-
如果網路是不活躍的,則關閉套接字並返回。
if (!fNetworkActive) { LogPrintf("connection from %s dropped: not accepting new connections\n", addr.ToString()); CloseSocket(hSocket); return; }
-
如果是不可連線的,則關閉套接字並返回。
if (!IsSelectableSocket(hSocket)) { LogPrintf("connection from %s dropped: non-selectable socket\n", addr.ToString()); CloseSocket(hSocket); return; }
IsSelectableSocket
方法是一個內聯方法,如果 Win32 則直接返回真,否則如果select
函式返回值小於FD_SETSIZE
則返回真,否則返回假。 -
如果入站數量已經達到最大的入站數量,則呼叫
AttemptToEvictConnection
方法,找到要退出的連線。如果找不到則關閉套接字並返回。if (nInbound >= nMaxInbound) { if (!AttemptToEvictConnection()) { LogPrint(BCLog::NET, "failed to find an eviction candidate - connection dropped (full)\n"); CloseSocket(hSocket); return; } }
-
生成節點物件,並進行相關設定,然後加入節點集合中
vNodes
。NodeId id = GetNewNodeId(); uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); CAddress addr_bind = GetBindAddress(hSocket); CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, addr_bind, "", true); pnode->AddRef(); pnode->fWhitelisted = whitelisted; m_msgproc->InitializeNode(pnode); LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToString()); { LOCK(cs_vNodes); vNodes.push_back(pnode); }
7. 處理節點的引用數量
std::vector<CNode*> vNodesCopy;
{
LOCK(cs_vNodes);
vNodesCopy = vNodes;
for (CNode* pnode : vNodesCopy)
pnode->AddRef();
}
AddRef
方法會把 nRefCount
加1。
8. 遍歷所有的節點進行收發資訊處理。
-
判斷當前節點是否在讀寫、錯誤集合中。
bool recvSet = false; bool sendSet = false; bool errorSet = false; { LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) continue; recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv); sendSet = FD_ISSET(pnode->hSocket, &fdsetSend); errorSet = FD_ISSET(pnode->hSocket, &fdsetError); }
-
如果當前節點在讀取集合或錯誤集合中,則進行下面的處理:
呼叫
recv
方法,讀取資料。char pchBuf[0x10000]; int nBytes = 0; { LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) continue; nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); }
如果讀取的數量大於0,則:呼叫
ReceiveMsgBytes
方法,從緩衝區中讀取給定數量的資料,並生成CNetMessage
物件。如果出錯,則呼叫CloseSocketDisconnect
方法,關閉套接字並斷開連線。如果讀取的數量等於0,即遠端節點已經關閉,則:呼叫
CloseSocketDisconnect
方法,關閉套接字並斷開連線。如果讀取的數量小於0,即讀取過程中出錯,則:呼叫
CloseSocketDisconnect
方法,關閉套接字並斷開連線。具體程式碼如下:
if (recvSet || errorSet) { // typical socket buffer is 8K-64K char pchBuf[0x10000]; int nBytes = 0; { LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) continue; nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); } if (nBytes > 0) { bool notify = false; if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); RecordBytesRecv(nBytes); if (notify) { size_t nSizeAdded = 0; auto it(pnode->vRecvMsg.begin()); for (; it != pnode->vRecvMsg.end(); ++it) { if (!it->complete()) break; nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; } { LOCK(pnode->cs_vProcessMsg); pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); pnode->nProcessQueueSize += nSizeAdded; pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; } WakeMessageHandler(); } } else if (nBytes == 0) { // socket closed gracefully if (!pnode->fDisconnect) { LogPrint(BCLog::NET, "socket closed\n"); } pnode->CloseSocketDisconnect(); } else if (nBytes < 0) { // error int nErr = WSAGetLastError(); if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) { if (!pnode->fDisconnect) LogPrintf("socket recv error %s\n", NetworkErrorString(nErr)); pnode->CloseSocketDisconnect(); } } }
-
如果當前節點在傳送集合中,則進行如下處理。
if (sendSet) { LOCK(pnode->cs_vSend); size_t nBytes = SocketSendData(pnode); if (nBytes) { RecordBytesSent(nBytes); } }
SocketSendData
方法主要邏輯是遍歷節點的傳送訊息集合vSendMsg
,然後呼叫send
方法傳送每一個訊息,並針對傳送正確與否進行處理;同時從傳送訊息集合vSendMsg
對應的訊息。size_t CConnman::SocketSendData(CNode *pnode) const { auto it = pnode->vSendMsg.begin(); size_t nSentSize = 0; while (it != pnode->vSendMsg.end()) { const auto &data = *it; assert(data.size() > pnode->nSendOffset); int nBytes = 0; { LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) break; nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); } if (nBytes > 0) { pnode->nLastSend = GetSystemTimeInSeconds(); pnode->nSendBytes += nBytes; pnode->nSendOffset += nBytes; nSentSize += nBytes; if (pnode->nSendOffset == data.size()) { pnode->nSendOffset = 0; pnode->nSendSize -= data.size(); pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize; it++; } else { // could not send full message; stop sending more break; } } else { if (nBytes < 0) { // error int nErr = WSAGetLastError(); if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) { LogPrintf("socket send error %s\n", NetworkErrorString(nErr)); pnode->CloseSocketDisconnect(); } } // couldn't send anything at all break; } } if (it == pnode->vSendMsg.end()) { assert(pnode->nSendOffset == 0); assert(pnode->nSendSize == 0); } pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it); return nSentSize; }
9. 接下來對節點的活躍性進行檢查。
int64_t nTime = GetSystemTimeInSeconds();
if (nTime - pnode->nTimeConnected > 60)
{
if (pnode->nLastRecv == 0 || pnode->nLastSend == 0)
{
LogPrint(BCLog::NET, "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->GetId());
pnode->fDisconnect = true;
}
else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL)
{
LogPrintf("socket sending timeout: %is\n", nTime - pnode->nLastSend);
pnode->fDisconnect = true;
}
else if (nTime - pnode->nLastRecv > (pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL : 90*60))
{
LogPrintf("socket receive timeout: %is\n", nTime - pnode->nLastRecv);
pnode->fDisconnect = true;
}
else if (pnode->nPingNonceSent && pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 < GetTimeMicros())
{
LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart));
pnode->fDisconnect = true;
}
else if (!pnode->fSuccessfullyConnected)
{
LogPrint(BCLog::NET, "version handshake timeout from %d\n", pnode->GetId());
pnode->fDisconnect = true;
}
}
}
10. 遍歷所有節點,減少引用數。
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodesCopy)
pnode->Release();
}
Release
方法把 nRefCount
引數減1。
我是區小白,Ulord全球社群聯盟(優得社群)核心區塊鏈技術開發者,深入研究比特幣,以太坊,EOS Dash,Rsk,Java, Nodejs,PHP,Python,C++ 我希望能聚集更多區塊鏈開發者,一起學習共同進步。
為了更高效的交流探討區塊鏈開發過程中遇到的問題,我建立了一個技術交流群。
歡迎將以上問題的答案發在群中討論,或者在帖子下面留言。
比特幣區塊鏈技術交流QQ群:253968045
QQ號:77078193 或者 705706498
微信:joepeak
往期文章:
從零開始學習比特幣開發(三)接入比特幣網路的關鍵步驟解析、建立比特幣錢包,以及重要rpc指令
從零開始學習比特幣開發(四)–網路初始化,載入區塊鏈和錢包,匯入區塊啟動節點
原文轉載自:
從零開始學習比特幣開發(二)–如何接入比特幣網路以及原理分析