比特幣原始碼分析-網路(二)
比特幣原始碼分析-網路(二)
https://www.jianshu.com/p/4b42d8698f35
眾所周知,比特幣網路是採用的P2P網路體系,所以,沒有明顯的客戶端與服務端的區別或者是概念,每一個節點既是自身的客戶端,又是其它節點的服務端。
在sync.h
中,定義了 CSemaphore
,它包裝了系統底層的訊號量機制,對wait(), try_wait(),post()
實現了封裝,程式碼如下:
class CSemaphore { private: boost::condition_variable condition; boost::mutex mutex; int value; public: CSemaphore(int init) : value(init) {} void wait() {} bool try_wait() {} void post() {} };
用於控制網路連線時的最大數量,每一個網路節點的最大連線數受限於訊號量所允許的最大值。
下面我們按照一個網路連線從傳送到接收到請求返回的這麼個思路,來梳理程式碼邏輯。
004.png
CNode
CNode定義在bitcoin.cpp
中,是比較重要的也是較為複雜的一個類,節點的所有資訊都包含在內:
class CNode { SOCKET sock; //用來連線的socket控制代碼 CDataStream vSend; //傳送訊息 CDataStream vRecv; //接收訊息 uint32_t nHeaderStart; //頭資訊開始 uint32_t nMessageStart; int nVersion; //版本資訊 std::string strSubVer; int nStartingHeight; //起始高度 std::vector<CAddress> *vAddr; //ip地址(網路上節點的連線資訊) int ban; int64_t doneAfter; CAddress you; };
在上述定義中,最主要的是 std::vector<CAddress> *vAddr;
它包含了連線的所有節點,如果有節點連線進來,就加入到這個vector中;如果某個節點斷開連線,就從這個vector中刪除。
在net.h
中,對CNode進行了詳細的定義(所有關於節點的資訊,都進行了詳細羅列),由於篇幅較長,只羅列其中的一些關鍵結構:
/** Information about a peer */ class CNode { friend class CConnman; public: SOCKET hSocket; //連線的socket控制代碼 size_t nSendSize; //所有vSendMsg條目的總大小。 size_t nSendOffset; //已經發送的第一個vSendMsg內的偏移量。 std::deque<std::vector<uint8_t>> vSendMsg;//傳送訊息的陣列 ... const CAddress addr;//節點地址資訊 std::atomic<int> nVersion;//版本資訊 CBloomFilter *pfilter;//海量過濾器 const NodeId id;//節點ID protected: mapMsgCmdSize mapSendBytesPerMsgCmd; mapMsgCmdSize mapRecvBytesPerMsgCmd; public: uint256 hashContinue; std::atomic<int> nStartingHeight; private: std::list<CNetMessage> vRecvMsg;//接收訊息的陣列 public: //用來解析接收到的訊息資料 bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool &complete); //用來設定接收版本 void SetRecvVersion(int nVersionIn) { nRecvVersion = nVersionIn; } int GetRecvVersion() { return nRecvVersion; } void SetSendVersion(int nVersionIn); int GetSendVersion() const; //用來發送地址 void PushAddress(const CAddress &_addr, FastRandomContext &insecure_rand) { // Known checking here is only to save space from duplicates. // SendMessages will filter it again for knowns that were added // after addresses were pushed. if (_addr.IsValid() && !addrKnown.contains(_addr.GetKey())) { if (vAddrToSend.size() >= MAX_ADDR_TO_SEND) { vAddrToSend[insecure_rand.randrange(vAddrToSend.size())] = _addr; } else { vAddrToSend.push_back(_addr); } } } //用來發送inventory訊息 void PushInventory(const CInv &inv) { LOCK(cs_inventory); if (inv.type == MSG_TX) { if (!filterInventoryKnown.contains(inv.hash)) { setInventoryTxToSend.insert(inv.hash); } } else if (inv.type == MSG_BLOCK) { vInventoryBlockToSend.push_back(inv.hash); } } void PushBlockHash(const uint256 &hash) { LOCK(cs_inventory); vBlockHashesToAnnounce.push_back(hash); } };
傳送訊息
CDataStream這個類主要是包裝了一個帶有雙向緩衝區的介面, 它過載了 >> 和 <<,使用上述序列化讀取和寫入未格式化的資料模板,以線性時間填充資料;
class CDataStream {
protected:
typedef CSerializeData vector_type;
vector_type vch;
unsigned int nReadPos;
int nType;
int nVersion;
public:
template <typename T> CDataStream &operator<<(const T &obj) {
// Serialize to this stream
::Serialize(*this, obj);
return (*this);
}
template <typename T> CDataStream &operator>>(T &obj) {
// Unserialize from this stream
::Unserialize(*this, obj);
return (*this);
}
void read(char *pch, size_t nSize) {
if (nSize == 0) {
return;
}
// Read from the beginning of the buffer
unsigned int nReadPosNext = nReadPos + nSize;
if (nReadPosNext >= vch.size()) {
if (nReadPosNext > vch.size()) {
throw std::ios_base::failure(
"CDataStream::read(): end of data");
}
memcpy(pch, &vch[nReadPos], nSize);
nReadPos = 0;
vch.clear();
return;
}
memcpy(pch, &vch[nReadPos], nSize);
nReadPos = nReadPosNext;
}
void write(const char *pch, size_t nSize) {
// Write to the end of the buffer
vch.insert(vch.end(), pch, pch + nSize);
}
所以,當我們需要傳送訊息時,首先會把資料放到CDataStream
的資料流中,構造好完整的訊息,但此時的訊息格式是網路無法識別的,下一步,將構造好的訊息放入到CSerializeData
(類似一個訊息佇列)進行序列化,序列化之後,我們就可以把訊息放到SocketSendData
中傳送出去。
CSerializeData 的格式如下:
// Byte-vector that clears its contents before deletion.
typedef std::vector<char, zero_after_free_allocator<char>> CSerializeData;
SocketSendData 的定義如下:
size_t CConnman::SocketSendData(CNode *pnode) const {
AssertLockHeld(pnode->cs_vSend);
size_t nSentSize = 0;
size_t nMsgCount = 0;
for (const auto &data : pnode->vSendMsg) {
assert(data.size() > pnode->nSendOffset);
int nBytes = 0;
...
}
pnode->vSendMsg.erase(pnode->vSendMsg.begin(),
pnode->vSendMsg.begin() + nMsgCount);
if (pnode->vSendMsg.empty()) {
assert(pnode->nSendOffset == 0);
assert(pnode->nSendSize == 0);
}
return nSentSize;
}
接收訊息
接收訊息的工作,主要是由 ThreadSocketHandler
來完成的,
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) {
pnode->CloseSocketDisconnect();
}
隨後,通過 ReceiveMsgBytes 把從其它節點接收到的資料解析為單個數據,然後放回到訊息佇列,最後由ThreadMessageHandler來進行最後的處理。
ReceiveMsgBytes解析資料的主要流程如下,呼叫的是CNetMessage下的readHeader和readData方法,隨後,使用complete()進行一次判定,看解析是否完成:
// Absorb network data.
int handled;
if (!msg.in_data) {
handled = msg.readHeader(pch, nBytes);
} else {
handled = msg.readData(pch, nBytes);
}
readHeader
readHeader 主要用來解析訊息頭,由上一篇文章我們能夠知道,一個訊息頭,至少24位元組,如果小於24位元組直接退出,如果滿足這個條件,先把接收到的資料的開始部分複製到訊息頭資料流中(hdrbuf),再反格式化成訊息頭(hdr)。訊息資料最大為MAX_SIZE(0x02000000),如果大於這個值,證明出錯,直接退出。
int CNetMessage::readHeader(const char *pch, unsigned int nBytes) {
// copy data to temporary parsing buffer
unsigned int nRemaining = 24 - nHdrPos;
unsigned int nCopy = std::min(nRemaining, nBytes);
memcpy(&hdrbuf[nHdrPos], pch, nCopy);
nHdrPos += nCopy;
// if header incomplete, exit
if (nHdrPos < 24) {
return nCopy;
}
// deserialize to CMessageHeader
try {
hdrbuf >> hdr;
} catch (const std::exception &) {
return -1;
}
// reject messages larger than MAX_SIZE
if (hdr.nMessageSize > MAX_SIZE) {
return -1;
}
// switch state to reading message data
in_data = true;
return nCopy;
}
readData
readData 主要用來解析訊息體,訊息的資料部分複製到訊息資料流中(vRecv)來處理,如果 vRecv 的空間不夠,會進行擴容,但最多分配256 KB,不能超過總訊息大小。
int CNetMessage::readData(const char *pch, unsigned int nBytes) {
unsigned int nRemaining = hdr.nMessageSize - nDataPos;
unsigned int nCopy = std::min(nRemaining, nBytes);
if (vRecv.size() < nDataPos + nCopy) {
// Allocate up to 256 KiB ahead, but never more than the total message
// size.
vRecv.resize(std::min(hdr.nMessageSize, nDataPos + nCopy + 256 * 1024));
}
hasher.Write((const uint8_t *)pch, nCopy);
memcpy(&vRecv[nDataPos], pch, nCopy);
nDataPos += nCopy;
return nCopy;
}
緩衝區
在 net.h 檔案中,我們能夠看到如下定義:
//接收訊息緩衝區
static const size_t DEFAULT_MAXRECEIVEBUFFER = 5 * 1000;
//傳送訊息緩衝區
static const size_t DEFAULT_MAXSENDBUFFER = 1 * 1000;
我們將接收或者傳送的資料放入到緩衝區,我們可以通過如下函式,分別對他們呼叫,加速我們的處理過程:
unsigned int CConnman::GetReceiveFloodSize() const {
return nReceiveFloodSize;
}
unsigned int CConnman::GetSendBufferSize() const {
return nSendBufferMaxSize;
}
祝您生活愉快!