1. 程式人生 > >從零開始學習比特幣(五)--P2P網路建立的流程之套接字的讀取和傳送

從零開始學習比特幣(五)--P2P網路建立的流程之套接字的讀取和傳送

寫在前面:
本篇文章接續

從零開始學習比特幣開發(四)–網路初始化,載入區塊鏈和錢包,匯入區塊啟動節點

從零開始學習區塊鏈技術(三)-接入比特幣網路的關鍵步驟解析、建立比特幣錢包,以及重要rpc指令

從零開始學習區塊鏈技術(二)–如何接入比特幣網路以及其原理分析

從零開始學習區塊鏈技術(一)–從原始碼編譯比特幣

如果這篇文章看不明白,請務必先閱讀之前的文章。


從本節開始我們將開始講解比特幣系統中P2P網路是如何建立的。還記得在上一篇比特幣系統啟動的的第12步的講解中,我們提到有幾個執行緒相關的處理非常重要嗎?這一節正是基於此做了詳細的講解。

P2P 網路的建立是在上一篇文章系統啟動的第 12 步,最後時刻呼叫 CConnman::Start

方法開始的。

本部分內容在 net.cppnet_processing.cpp 等檔案中。

下面開始講解各個執行緒的具體處理。

1、ThreadSocketHandler

這個執行緒主要用來處理套接字的讀取和傳送,呼叫系統提供的相關相關底層函式進行處理,把收到的訊息轉化成 CNetMessage 型別的物件,並儲存到節點的 vRecvMsg 集合中,把待發送的訊息從 vSendMsg 集合中取出來進行傳送。

執行緒定義在 net.cpp 檔案的 1155 行。下面我們開始進行具體的解讀。

這個方法的主體是一個 while 迴圈,只要 interruptNet 變數為空,就一直迴圈。

1. 如果布林變數 fNetworkActive 為假,則斷開所有已經連線的接點。

遍歷所有的節點列表,把沒有斷開的節點設定為斷開連線。

    if (!fNetworkActive) {
        // Disconnect any connected nodes
        for (CNode* pnode : vNodes) {
            if (!pnode->fDisconnect) {
                LogPrint(BCLog::NET, "Network not active, dropping peer=%d\n", pnode->GetId());
                pnode->fDisconnect = true;
            }
        }
    }

2. 接下來,斷開所有未使用的節點。

遍歷所有的節點列表,如果節點已經斷開連線,則進行下面的處理:

  • 首先,從 vNodes 集合中刪除當前節點。

  • 然後,呼叫 CloseSocketDisconnect 方法,關閉當前節點的套接字並進行清理。
    方法內部設定 fDisconnect 變數為真;如果當前套接字有效,則呼叫 CloseSocket 方法,關閉套接字。CloseSocket 方法針對 win32 系統呼叫 closesocket 方法,別的系統呼叫 close 來關閉套接字,然後設定套接字為 INVALID_SOCKET
    具體方法如下所示:

    ​```void CNode::CloseSocketDisconnect()
      {
          fDisconnect = true;
          LOCK(cs_hSocket);
          if (hSocket != INVALID_SOCKET)
          {
              LogPrint(BCLog::NET, "disconnecting peer=%d\n", id);
              CloseSocket(hSocket);
          }
      }
    ​```
    
  • 再然後,呼叫 Release 方法,把節點的引用數量 nRefCount 減一。

  • 最後,把當前節點放入 vNodesDisconnected 集合中。

3. 刪除所有已斷開連線的節點。

遍歷所有已經斷開連線的節點的集合 vNodesDisconnected,如果當前節點的引用數量 nRefCount 小於等於0,進行下面的處理:

  • 呼叫巨集 TRY_LOCK 獲取 cs_inventory 鎖。如果可以獲取,則獲取 cs_vSend 鎖。如果可以獲取,則設定變數 fDelete 為真。

  • 如果變數 fDelete 為真,則從 vNodesDisconnected 集合中移除當前的節點,然後呼叫 DeleteNode 方法,刪除節點。

    下面講解下 DeleteNode 方法。方法內部首先設定變數 fUpdateConnectionTime 為真,然後呼叫訊息處理介面的 NetEventsInterfaceFinalizeNode 方法,最終處理節點。如果變數 fUpdateConnectionTimeFinalizeNode 方法中被設定為真,則呼叫地址管理器物件的 Connected 方法,進行處理。最後,刪除當前節點。

    程式碼如下:

    `void CConnman::DeleteNode(CNode* pnode)
    {
        assert(pnode);
        bool fUpdateConnectionTime = false;
        m_msgproc->FinalizeNode(pnode->GetId(), fUpdateConnectionTime);
        if(fUpdateConnectionTime) {
            addrman.Connected(pnode->addr);
        }
        delete pnode;
    }`
    

    FinalizeNode 方法是一個純虛擬函式,具體由 net_processing.cpp 檔案中的 PeerLogicValidation 物件的 FinalizeNode 方法實現。下面,我們來看這個函式的處理。

    • 設定引數 fUpdateConnectionTime 預設為真。

    • 呼叫 State 方法,獲取節點狀態物件的指標。

      方法內部根據節點 ID,從 mapNodeState 集合中找到並返回對應的節點狀態物件。如果不存在,則返回空指標。

    • 如果我們已經在這個對等節點上同步了區塊頭部,即節點狀態物件的 fSyncStarted 屬性為真,則設定變數 nSyncStarted 減一。

    • 如果對等節點的不良積分即節點狀態物件的 nMisbehavior 屬性等於0,且對等節點已經建立了完整的連線即節點狀態物件的 fCurrentlyConnected 屬性為真,則設定變數 fUpdateConnectionTime 為真。

    • 遍歷狀態物件的 vBlocksInFlight 集合,並刪除所有的條目。

    • 呼叫 EraseOrphansFor 方法,刪除與這個對等節點相關聯的孤兒交易。

      方法內部遍歷 mapOrphanTransactions 集合,如果當前孤兒交易的來自於指定的對等節點,則呼叫 EraseOrphanTx 方法進行刪除。

      後者根據交易的雜湊ID,查詢 mapOrphanTransactions 集合對應的孤兒交易。如果找不到,則返回。如果找到則遍歷交易的所有輸入,如果當前輸入指向的父輸出不在 mapOrphanTransactionsByPrev 集合中,則處理下一個;否則,在返回的集合中移除指定的孤兒交易,如果指向的父輸出現在為空,則從 mapOrphanTransactionsByPrev 集合中刪除指向的父輸出。從 mapOrphanTransactions 刪除指定的孤兒交易。

      `int static EraseOrphanTx(uint256 hash) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans)
      {
          std::map<uint256, COrphanTx>::iterator it = mapOrphanTransactions.find(hash);
          if (it == mapOrphanTransactions.end())
              return 0;
          for (const CTxIn& txin : it->second.tx->vin)
          {
              std::set<std::map<uint256, COrphanTx>::iterator, IteratorComparator>
      
              auto itPrev = mapOrphanTransactionsByPrev.find(txin.prevout);
              if (itPrev == mapOrphanTransactionsByPrev.end())
                  continue;
              itPrev->second.erase(it);
              if (itPrev->second.empty())
                  mapOrphanTransactionsByPrev.erase(itPrev);
          }
          mapOrphanTransactions.erase(it);
          return 1;
      }`
      
    • 把優先下載區塊的對等節點變數nPreferredDownload 減去狀態物件的對應的 fPreferredDownload 屬性。這個屬性是一個布林值,表示節點是否為優先下載區塊。這裡利用了 C++ 布林值自動轉換為整數值,真值轉換為1,假值轉換為0.

    • 處理正在下載區塊的節點數量 nPeersWithValidatedDownloads 變數。如果節點狀態物件的 nBlocksInFlightValidHeaders 屬性不等於0,則正在下載區塊的節點數量減去1,否則減去0。

    • 接下來處理 g_outbound_peers_with_protect_from_disconnect 變數,這個變數代表了出站的對等節點數量。程式碼比較簡單,不解釋。

    • 從節點狀態集合 mapNodeState 中刪除指定的節點ID。

4. 如果節點數量不等於 nPrevNodeCount,則把 nPrevNodeCount 設定為當前的節點數量。

	{
        LOCK(cs_vNodes);
        vNodesSize = vNodes.size();
    }
    if(vNodesSize != nPrevNodeCount) {
        nPrevNodeCount = vNodesSize;
        if(clientInterface)
            clientInterface->NotifyNumConnectionsChanged(nPrevNodeCount);
    }

5. 接下來檢查哪個套接字有資料要接收。

生成相關的時間變數和 3 個 fd_set 集合來處理接收資料、傳送資料及資料錯誤,然後將集合初始化為空集。

struct timeval timeout;
timeout.tv_sec  = 0;
timeout.tv_usec = 50000; // frequency to poll pnode->vSend

fd_set fdsetRecv;
fd_set fdsetSend;
fd_set fdsetError;
FD_ZERO(&fdsetRecv);
FD_ZERO(&fdsetSend);
FD_ZERO(&fdsetError);

遍歷當前處於監聽狀態的套接字 vhListenSocket 集合,呼叫 FD_SET 方法,把當前套接字儲存在 fdsetRecv 集合中,然後呼叫標準庫的 max 方法儲存最大的套接字,設定變數 have_fds 為真。

for (const ListenSocket& hListenSocket : vhListenSocket) {
    FD_SET(hListenSocket.socket, &fdsetRecv);
    hSocketMax = std::max(hSocketMax, hListenSocket.socket);
    have_fds = true;
}

遍歷所有的節點,設定相關的變數。

for (CNode* pnode : vNodes)
{
    bool select_recv = !pnode->fPauseRecv;
    bool select_send;
    {
        LOCK(pnode->cs_vSend);
        select_send = !pnode->vSendMsg.empty();
    }

    LOCK(pnode->cs_hSocket);
    if (pnode->hSocket == INVALID_SOCKET)
        continue;

    FD_SET(pnode->hSocket, &fdsetError);
    hSocketMax = std::max(hSocketMax, pnode->hSocket);
    have_fds = true;

    if (select_send) {
        FD_SET(pnode->hSocket, &fdsetSend);
        continue;
    }
    if (select_recv) {
        FD_SET(pnode->hSocket, &fdsetRecv);
    }
}

呼叫 select 方法進行連線。

int nSelect = select(have_fds ? hSocketMax + 1 : 0,&fdsetRecv, &fdsetSend, &fdsetError, &timeout);

接下來,檢查 select 呼叫是否出錯。

if (nSelect == SOCKET_ERROR)
{
    if (have_fds)
    {
        int nErr = WSAGetLastError();
        LogPrintf("socket select error %s\n", NetworkErrorString(nErr));
        for (unsigned int i = 0; i <= hSocketMax; i++)
            FD_SET(i, &fdsetRecv);
    }
    FD_ZERO(&fdsetSend);
    FD_ZERO(&fdsetError);
    if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
        return;
}

6. 接下來,接收新的連線。

遍歷所有監聽的套接字,如果當前套接字有效,並且套接字在在接收資料的集合中,即新的連線請求進來,則呼叫 AcceptConnection 方法進行處理。

for (const ListenSocket& hListenSocket : vhListenSocket)
{
    if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv))
    {
        AcceptConnection(hListenSocket);
    }
}

AcceptConnection 方法處理遠端對等節點的連線邏輯,具體如下:

  • 呼叫 accept 方法,接受客戶端連線。

    SOCKET hSocket = accept(hListenSocket.socket, (struct sockaddr*)&sockaddr, &len);

  • 計算最大的入站連線數

    int nMaxInbound = nMaxConnections - (nMaxOutbound + nMaxFeeler);

  • 如果連線成功,那麼呼叫地址物件的 SetSockAddr 方法來儲存儲存對等節點的地址。

    if (hSocket != INVALID_SOCKET) {
        if (!addr.SetSockAddr((const struct sockaddr*)&sockaddr)) {
            LogPrintf("Warning: Unknown socket family\n");
        }
    }
    
  • 遍歷對等節點列表,如果當前對等節點屬於入站型別,則變數 nInbound 加 1。

    {
        LOCK(cs_vNodes);
        for (const CNode* pnode : vNodes) {
            if (pnode->fInbound) nInbound++;
        }
    }
    
  • 如果連線不成功,則直接退出。

    if (hSocket == INVALID_SOCKET)
    {
        int nErr = WSAGetLastError();
        if (nErr != WSAEWOULDBLOCK)
            LogPrintf("socket error accept failed: %s\n", NetworkErrorString(nErr));
        return;
    }
    
  • 如果網路是不活躍的,則關閉套接字並返回。

    if (!fNetworkActive) {
        LogPrintf("connection from %s dropped: not accepting new connections\n", addr.ToString());
        CloseSocket(hSocket);
        return;
    }
    
  • 如果是不可連線的,則關閉套接字並返回。

    if (!IsSelectableSocket(hSocket))
    {
        LogPrintf("connection from %s dropped: non-selectable socket\n", addr.ToString());
        CloseSocket(hSocket);
        return;
    }
    

    IsSelectableSocket 方法是一個內聯方法,如果 Win32 則直接返回真,否則如果 select 函式返回值小於 FD_SETSIZE 則返回真,否則返回假。

  • 如果入站數量已經達到最大的入站數量,則呼叫 AttemptToEvictConnection 方法,找到要退出的連線。如果找不到則關閉套接字並返回。

    if (nInbound >= nMaxInbound)
    {
        if (!AttemptToEvictConnection()) {
            LogPrint(BCLog::NET, "failed to find an eviction candidate - connection dropped (full)\n");
            CloseSocket(hSocket);
            return;
        }
    }
    
  • 生成節點物件,並進行相關設定,然後加入節點集合中 vNodes

    NodeId id = GetNewNodeId();
            uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
            CAddress addr_bind = GetBindAddress(hSocket);
            CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, addr_bind, "", true);
            pnode->AddRef();
            pnode->fWhitelisted = whitelisted;
            m_msgproc->InitializeNode(pnode);
            LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToString());
            {
                LOCK(cs_vNodes);
                vNodes.push_back(pnode);
            }
    

7. 處理節點的引用數量

std::vector<CNode*> vNodesCopy;
{
    LOCK(cs_vNodes);
    vNodesCopy = vNodes;
    for (CNode* pnode : vNodesCopy)
        pnode->AddRef();
}

AddRef 方法會把 nRefCount 加1。

8. 遍歷所有的節點進行收發資訊處理。

  • 判斷當前節點是否在讀寫、錯誤集合中。

    bool recvSet = false;
    bool sendSet = false;
    bool errorSet = false;
    {
        LOCK(pnode->cs_hSocket);
        if (pnode->hSocket == INVALID_SOCKET)
            continue;
        recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv);
        sendSet = FD_ISSET(pnode->hSocket, &fdsetSend);
        errorSet = FD_ISSET(pnode->hSocket, &fdsetError);
    }
    
  • 如果當前節點在讀取集合或錯誤集合中,則進行下面的處理:

    呼叫 recv 方法,讀取資料。

    char pchBuf[0x10000];
    int nBytes = 0;
    {
        LOCK(pnode->cs_hSocket);
        if (pnode->hSocket == INVALID_SOCKET)
            continue;
        nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
    }
    

    如果讀取的數量大於0,則:呼叫 ReceiveMsgBytes 方法,從緩衝區中讀取給定數量的資料,並生成 CNetMessage 物件。如果出錯,則呼叫 CloseSocketDisconnect 方法,關閉套接字並斷開連線。

    如果讀取的數量等於0,即遠端節點已經關閉,則:呼叫 CloseSocketDisconnect 方法,關閉套接字並斷開連線。

    如果讀取的數量小於0,即讀取過程中出錯,則:呼叫 CloseSocketDisconnect 方法,關閉套接字並斷開連線。

    具體程式碼如下:

    if (recvSet || errorSet)
    {
        // typical socket buffer is 8K-64K
        char pchBuf[0x10000];
        int nBytes = 0;
        {
            LOCK(pnode->cs_hSocket);
            if (pnode->hSocket == INVALID_SOCKET)
                continue;
            nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
        }
        if (nBytes > 0)
        {
            bool notify = false;
            if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
                pnode->CloseSocketDisconnect();
            RecordBytesRecv(nBytes);
            if (notify) {
                size_t nSizeAdded = 0;
                auto it(pnode->vRecvMsg.begin());
                for (; it != pnode->vRecvMsg.end(); ++it) {
                    if (!it->complete())
                        break;
                    nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
                }
                {
                    LOCK(pnode->cs_vProcessMsg);
                    pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
                    pnode->nProcessQueueSize += nSizeAdded;
                    pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
                }
                WakeMessageHandler();
            }
        }
        else if (nBytes == 0)
        {
            // socket closed gracefully
            if (!pnode->fDisconnect) {
                LogPrint(BCLog::NET, "socket closed\n");
            }
            pnode->CloseSocketDisconnect();
        }
        else if (nBytes < 0)
        {
            // error
            int nErr = WSAGetLastError();
            if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
            {
                if (!pnode->fDisconnect)
                    LogPrintf("socket recv error %s\n", NetworkErrorString(nErr));
                pnode->CloseSocketDisconnect();
            }
        }
    }
    
  • 如果當前節點在傳送集合中,則進行如下處理。

    if (sendSet)
    {
        LOCK(pnode->cs_vSend);
        size_t nBytes = SocketSendData(pnode);
        if (nBytes) {
            RecordBytesSent(nBytes);
        }
    }
    

    SocketSendData 方法主要邏輯是遍歷節點的傳送訊息集合 vSendMsg,然後呼叫 send 方法傳送每一個訊息,並針對傳送正確與否進行處理;同時從傳送訊息集合 vSendMsg 對應的訊息。

    size_t CConnman::SocketSendData(CNode *pnode) const
              {
                auto it = pnode->vSendMsg.begin();
                size_t nSentSize = 0;
    
                while (it != pnode->vSendMsg.end()) {
                    const auto &data = *it;
                    assert(data.size() > pnode->nSendOffset);
                    int nBytes = 0;
                    {
                        LOCK(pnode->cs_hSocket);
                        if (pnode->hSocket == INVALID_SOCKET)
                            break;
                        nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
                    }
                    if (nBytes > 0) {
                        pnode->nLastSend = GetSystemTimeInSeconds();
                        pnode->nSendBytes += nBytes;
                        pnode->nSendOffset += nBytes;
                        nSentSize += nBytes;
                        if (pnode->nSendOffset == data.size()) {
                            pnode->nSendOffset = 0;
                            pnode->nSendSize -= data.size();
                            pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
                            it++;
                        } else {
                            // could not send full message; stop sending more
                            break;
                        }
                    } else {
                        if (nBytes < 0) {
                            // error
                            int nErr = WSAGetLastError();
                            if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
                            {
                                LogPrintf("socket send error %s\n", NetworkErrorString(nErr));
                                pnode->CloseSocketDisconnect();
                            }
                        }
                        // couldn't send anything at all
                        break;
                    }
                }
    
                if (it == pnode->vSendMsg.end()) {
                    assert(pnode->nSendOffset == 0);
                    assert(pnode->nSendSize == 0);
                }
                pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it);
                return nSentSize;
            }
    

9. 接下來對節點的活躍性進行檢查。

int64_t nTime = GetSystemTimeInSeconds();
if (nTime - pnode->nTimeConnected > 60)
{
    if (pnode->nLastRecv == 0 || pnode->nLastSend == 0)
    {
        LogPrint(BCLog::NET, "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->GetId());
        pnode->fDisconnect = true;
    }
    else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL)
    {
        LogPrintf("socket sending timeout: %is\n", nTime - pnode->nLastSend);
        pnode->fDisconnect = true;
    }
    else if (nTime - pnode->nLastRecv > (pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL : 90*60))
    {
        LogPrintf("socket receive timeout: %is\n", nTime - pnode->nLastRecv);
        pnode->fDisconnect = true;
    }
    else if (pnode->nPingNonceSent && pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 < GetTimeMicros())
    {
        LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart));
        pnode->fDisconnect = true;
    }
    else if (!pnode->fSuccessfullyConnected)
    {
        LogPrint(BCLog::NET, "version handshake timeout from %d\n", pnode->GetId());
        pnode->fDisconnect = true;
    }
} 
}

10. 遍歷所有節點,減少引用數。

{
    LOCK(cs_vNodes);
    for (CNode* pnode : vNodesCopy)
        pnode->Release();
}

Release 方法把 nRefCount 引數減1。


我是區小白,Ulord全球社群聯盟(優得社群)核心區塊鏈技術開發者,深入研究比特幣,以太坊,EOS Dash,Rsk,Java, Nodejs,PHP,Python,C++ 我希望能聚集更多區塊鏈開發者,一起學習共同進步。
為了更高效的交流探討區塊鏈開發過程中遇到的問題,我建立了一個技術交流群。
歡迎將以上問題的答案發在群中討論,或者在帖子下面留言。

比特幣區塊鏈技術交流QQ群:253968045
QQ號:77078193 或者 705706498
微信:joepeak

往期文章:

從零開始學習比特幣開發(一)–從原始碼編譯比特幣

從零開始學習比特幣開發(二)如何接入比特幣網路以及步驟分析

從零開始學習比特幣開發(三)接入比特幣網路的關鍵步驟解析、建立比特幣錢包,以及重要rpc指令

從零開始學習比特幣開發(四)–網路初始化,載入區塊鏈和錢包,匯入區塊啟動節點

原文轉載自:

優得社群–從0開始學習比特幣專題

從零開始學習比特幣開發(一)–從原始碼編譯比特幣

從零開始學習比特幣開發(二)–如何接入比特幣網路以及原理分析

從零開始學習比特幣開發(三)–接入比特幣網路的關鍵步驟解析、建立比特幣錢包,以及重要rpc指令

從零開始學習比特幣開發(四)–網路初始化,載入區塊鏈和錢包,匯入區塊啟動節點