一種基於Qt的可伸縮的全非同步C/S架構伺服器實現(五) 單層無中心叢集
五、單層無中心叢集
對40萬用戶規模以內的伺服器,使用星形的無中心連線是較為簡便的實現方式。分佈在各個物理伺服器上的服務程序共同工作,每個程序承擔若干連線。為了實現這個功能,需要解決幾個關鍵問題。
5.1、跨伺服器傳輸通道
設計在高速區域網中的連線可直接採用TCP,並用第二章介紹的網路傳輸工具、第三章介紹的流水線執行緒池共同搭建。引用上述兩個工具的程式碼在cluster子資料夾的 zp_clusterterm.h中定義:
ZPNetwork::zp_net_Engine * m_pClusterNet; ZPTaskEngine::zp_pipeline * m_pClusterEng;
伺服器在專用的叢集網路中監聽,需要引數如下:
1、監聽的地址、埠
2、本節點唯一名稱
3、對伺服器叢集內其他節點發布的連線埠、地址
4、對公網客戶端釋出的連線埠、地址。
比如,伺服器高速區域網網段可能是 10.129.XX.XX,而有些伺服器可能以虛擬機器(192.168.11.XX)+NAT(10.129.XX.XX)的方式在內網的子網中對映,因此,需要告訴別的伺服器節點,如何連線到自己。同時,對公網客戶端來說,每個伺服器的連線地址又不同了。很有可能也是通過NAT的方式,把數十個內網IP對映到一個外網出口的連續埠上去。這個策略的配置頁面如下:
叢集的連線策略是,新的伺服器程序選取任意一個現有節點,連線後,通過叢集內廣播系統自動接收其它各個節點的地址,並繼續發起連線,直到與現有節點兩兩相通為止。
為了支援這個策略,叢集傳輸需要定義一些指令。
5.1.1 叢集指令
叢集指令在 cluster 資料夾的cross_svr_message.h 定義:
指令由頭部、載荷兩部分組成。#ifndef CROSS_SVR_MESSAGES_H #define CROSS_SVR_MESSAGES_H #include <qglobal.h> namespace ZP_Cluster{ #pragma pack (push,1) typedef struct tag_cross_svr_message{ struct tag_header{ quint16 Mark; //Always be 0x1234 quint8 messagetype; quint32 data_length; } hearder; union uni_payload{ quint8 data[1]; struct tag_CSM_heartBeating{ quint32 nClients; } heartBeating; struct tag_CSM_BasicInfo{ quint8 name [64]; quint8 Address_LAN[64]; quint16 port_LAN; quint8 Address_Pub[64]; quint16 port_Pub; } basicInfo; struct tag_CSM_Broadcast{ quint8 name [64]; quint8 Address_LAN[64]; quint16 port_LAN; quint8 Address_Pub[64]; quint16 port_Pub; } broadcastMsg[1]; } payload; } CROSS_SVR_MSG; #pragma pack(pop) } #endif // CROSS_SVR_MESSAGES_H
頭部header說明:
Mark是一個固定的起始,用於驗證流解譯的正確性。如果流解譯不正確,第二塊指令的起始將不是這個值。
messagetype 是一個用來標定指令型別的位元組,決定了載荷聯合體該採用哪個策略解譯
data_length是長度,這裡代表載荷的長度
載荷 payload說明:
有三種指令結構體, 心跳結構體用來維持各個伺服器之間的心跳,基本資訊(basicInfo)用於在連線建立後,向對方告知本節點的資訊。廣播結構體是用於在本機的伺服器列表發生變更時,向所有現有節點廣播新的列表。
對傳輸的使用者資料,直接儲存在data中。
5.1.2 連線流程
第一步,準備加入叢集的伺服器選取叢集中任一個節點作為物件,發起P2P連線。
第二步,雙方互換資訊(basicInfo)
第三步,雙方將對方的資訊新增到本地的伺服器節點表中。伺服器節點表是一群zp_ClusterNode類的例項,該類由ZPTaskEngine::zp_plTaskBase派生。這個基類在第三章有介紹。伺服器節點物件的例項負責具體的指令解譯。該列表如下(在cluster子資料夾的 zp_clusterterm.h中定義):
//important hashes. server name to socket, socket to server name
QMutex m_hash_mutex;
QMap<QString , zp_ClusterNode *> m_hash_Name2node;
QMap<QObject *,zp_ClusterNode *> m_hash_sock2node;
節點的指標存放在對映中,一個是名稱到物件的對映,一個是套接字到物件的對映
第四步,由於節點表發生變化,因此,會觸發對現有節點的廣播(broadCasting)
第五步,各個節點收到廣播後,會比較廣播中的節點資訊和自己目前的節點資訊,併發起向新增節點的連線。
最終,當一對一連線完成,系統重新處於穩定狀態。解譯這段資訊的程式碼片段在中cluster資料夾zp_clusternode.cpp的deal_current_message_block方法中實現:
switch(m_currentHeader.messagetype)
{
\\...
case 0x01://basicInfo, when connection established, this message should be used
if (bytesLeft==0)
{
QString strName ((const char *)pMsg->payload.basicInfo.name);
if (strName != m_pTerm->name())
{
this->m_strTermName = strName;
m_nPortLAN = pMsg->payload.basicInfo.port_LAN;
m_addrLAN = QHostAddress((const char *)pMsg->payload.basicInfo.Address_LAN);
m_nPortPub = pMsg->payload.basicInfo.port_Pub;
m_addrPub = QHostAddress((const char *)pMsg->payload.basicInfo.Address_Pub);
if (false==m_pTerm->regisitNewServer(this))
{
this->m_strTermName.clear();
emit evt_Message(this,tr("Info: New Svr already regisited. Ignored.")+strName);
emit evt_close_client(this->sock());
}
else
{
emit evt_NewSvrConnected(this->termName());
m_pTerm->BroadcastServers();
}
}
else
{
emit evt_Message(this,tr("Can not connect to it-self, Loopback connections is forbidden."));
emit evt_close_client(this->sock());
}
}
break;
case 0x02: //Server - broadcast messages
if (bytesLeft==0)
{
int nSvrs = pMsg->hearder.data_length / sizeof(CROSS_SVR_MSG::uni_payload::tag_CSM_Broadcast);
for (int i=0;i<nSvrs;i++)
{
QString strName ((const char *)pMsg->payload.broadcastMsg[i].name);
if (strName != m_pTerm->name() && m_pTerm->SvrNodeFromName(strName)==NULL)
{
QHostAddress addrToConnectTo((const char *)pMsg->payload.broadcastMsg[i].Address_LAN);
quint16 PortToConnectTo = pMsg->payload.broadcastMsg[i].port_LAN;
if (strName > m_pTerm->name())
emit evt_connect_to(addrToConnectTo,PortToConnectTo,false);
else
emit evt_Message(this,tr("Name %1 <= %2, omitted.").arg(strName).arg(m_pTerm->name()));
}
}
}
break;
...
5.2 流式解析
TCP 是面向連線的流式傳輸。對使用者傳送的一個大資料包,雖然保證收發的完整性,旦接收方每次接收的資料片段長度是有限的,也是不定的。一種簡單的思路是按照指令結構體的長度,直接快取完整的資料包,而後集中處理。這樣有一個問題,在資料包很大時,記憶體開銷過高。因此,本應用設計的思路是邊接收、邊處理。具體步驟:
1、檢查收到的頭部是否合法
2、儲存當前指令的頭部
3、一旦得到一段載荷資料,就回調一次處理過程,處理過程根據需求等待更多資料,或者處理完後清空快取。這對一次傳輸100MB資料的應用是很關鍵的。流式處理需要完成的步驟關鍵程式碼如下:
5.2.1 資料接收
在 zp_ClusterTerm的接收槽裡,直接把資料片段壓入zp_ClusterNode物件的佇列中,並壓入流水線。
//some data arrival
void zp_ClusterTerm::on_evt_Data_recieved(QObject * clientHandle,QByteArray datablock )
{
//Push Clients to nodes if it is not exist
zp_ClusterNode * pClientNode = ...;
int nblocks = pClientNode->push_new_data(datablock);
if (nblocks<=1)
m_pClusterEng->pushTask(pClientNode);
//...
}
oushTask方法把Block壓入zp_ClusterNode的處理佇列m_list_Rawdata裡,這部分的狀態變數如下:
class zp_ClusterNode : public ZPTaskEngine::zp_plTaskBase
{
//.....
//Data Process
//The raw data queue and its mutex
QList<QByteArray> m_list_RawData;
QMutex m_mutex_rawData;
//The current Read Offset, from m_list_RawData's beginning
int m_currentReadOffset;
//Current Message Offset, according to m_currentHeader
int m_currentMessageSize;
//Current un-procssed message block.for large blocks,
//this array will be re-setted as soon as some part of data has been
//dealed, eg, send a 200MB block, the 200MB data will be splitted into pieces
QByteArray m_currentBlock;
CROSS_SVR_MSG::tag_header m_currentHeader;
//...
};
變數 m_currentReadOffset 指的是佇列的首部元素已經處理的偏移。比如首部的Block有2341位元組,處理了1099位元組,本指令已經結束,則此值為1099
變數 m_currentMessageSize 指的是當前接收的資訊的大小。比如100MB 的資訊,接受了23MB,這個值就是23MB
變數 m_currentBlock 是當前的快取。這個快取會不斷的遞交處理,負責處理的程式可以根據情況適時清空它。對短指令,不清也是可以的。
變數 m_currentHeader 是當前的資訊頭部,這個值記錄了當前結構體的首部資訊。
5.2.2 資料處理
線上程池中,會呼叫 zp_ClusterNode::run 虛擬方法。這個方法的關鍵程式碼如下(實際程式碼因為有執行緒同步,要複雜一些):
int zp_ClusterNode::run()
{
//nMessageBlockSize 是靜態變數,表示最多處理幾個塊就釋放CPU給其他節點
int nMessage = m_nMessageBlockSize;
int nCurrSz = -1;
while (--nMessage>=0 && nCurrSz!=0 )
{
QByteArray block;
block = *m_list_RawData.begin();
m_currentReadOffset = filter_message(block,m_currentReadOffset);
if (m_currentReadOffset >= block.size())
{
m_list_RawData.pop_front();
m_currentReadOffset = 0;
}
nCurrSz = m_list_RawData.size();
}
if (nCurrSz==0)
return 0;
return -1;
}
其中,filter_message 是對資訊進行初步處理,輸入當前佇列的首部、處理偏移,返回新的處理偏移
這個方法的關鍵程式碼如下:
//!deal one message, affect m_currentRedOffset,m_currentMessageSize,m_currentHeader
//!return bytes Used.
int zp_ClusterNode::filter_message(QByteArray block, int offset)
{
const int blocklen = block.length();
while (blocklen>offset)
{
const char * dataptr = block.constData();
//先確保資訊的頭標誌被接收
while (m_currentMessageSize<2 && blocklen>offset )
{
m_currentBlock.push_back(dataptr[offset++]);
m_currentMessageSize++;
}
if (m_currentMessageSize < 2) //First 2 byte not complete
continue;
if (m_currentMessageSize==2)
{
const char * headerptr = m_currentBlock.constData();
memcpy((void *)&m_currentHeader,headerptr,2);
}
const char * ptrCurrData = m_currentBlock.constData();
//判斷頭2個位元組是不是1234
if (m_currentHeader.Mark == 0x1234)
//Valid Message
{
//試圖接收完整的頭部資訊
if (m_currentMessageSize< sizeof(CROSS_SVR_MSG::tag_header) && blocklen>offset)
{
int nCpy = sizeof(CROSS_SVR_MSG::tag_header) - m_currentMessageSize;
if (nCpy > blocklen - offset)
nCpy = blocklen - offset;
QByteArray arrCpy(dataptr+offset,nCpy);
m_currentBlock.push_back(arrCpy);
offset += nCpy;
m_currentMessageSize += nCpy;
}
//如果頭部還沒收完則返回
if (m_currentMessageSize < sizeof(CROSS_SVR_MSG::tag_header)) //Header not completed.
continue;
//除了頭部以外,還有資料可用,並且頭部剛剛接收完
else if (m_currentMessageSize == sizeof(CROSS_SVR_MSG::tag_header))//Header just completed.
{
//儲存頭部
const char * headerptr = m_currentBlock.constData();
memcpy((void *)&m_currentHeader,headerptr,sizeof(CROSS_SVR_MSG::tag_header));
//繼續處理後續的載荷
if (block.length()>offset)
{
//確定還有多少位元組沒有接收
qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header)
-m_currentMessageSize ;
//繼續接收載荷
if (bitLeft>0 && blocklen>offset)
{
int nCpy = bitLeft;
if (nCpy > blocklen - offset)
nCpy = blocklen - offset;
QByteArray arrCpy(dataptr+offset,nCpy);
m_currentBlock.push_back(arrCpy);
offset += nCpy;
m_currentMessageSize += nCpy;
bitLeft -= nCpy;
}
//處理一次資料
deal_current_message_block();
if (bitLeft>0)
continue;
//This Message is Over. Start a new one.
m_currentMessageSize = 0;
m_currentBlock = QByteArray();
continue;
}
}
//除了頭部以外,還有資料可用
else
{
if (block.length()>offset)
{
//確定還有多少位元組沒有接收
qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header)
-m_currentMessageSize ;
//繼續接收載荷
if (bitLeft>0 && blocklen>offset)
{
int nCpy = bitLeft;
if (nCpy > blocklen - offset)
nCpy = blocklen - offset;
QByteArray arrCpy(dataptr+offset,nCpy);
m_currentBlock.push_back(arrCpy);
offset += nCpy;
m_currentMessageSize += nCpy;
bitLeft -= nCpy;
}
//deal block, may be processed as soon as possible;
deal_current_message_block();
if (bitLeft>0)
continue;
//This Message is Over. Start a new one.
m_currentMessageSize = 0;
m_currentBlock = QByteArray();
continue;
}
} // end if there is more bytes to append
} //end deal trans message
else
//...
} // end while block len > offset
return offset;
}
在處理當前塊資料的方法 deal_current_message_block裡,即可逐一判斷訊息型別,加以處理了。
5.3 叢集模組外部介面
叢集模組只負責在伺服器之間建立連線,並提供一套傳輸使用者資料的通路。在叢集建立連線後,使用者直接通過
void zp_ClusterTerm::SendDataToRemoteServer(QString svrName,QByteArray SourceArray)
{
int nMsgLen = sizeof(CROSS_SVR_MSG::tag_header) + SourceArray.size();
QByteArray array(nMsgLen,0);
CROSS_SVR_MSG * pMsg =(CROSS_SVR_MSG *) array.data();
pMsg->hearder.Mark = 0x1234;
pMsg->hearder.data_length = SourceArray.size();
pMsg->hearder.messagetype = 0x03;
memcpy (pMsg->payload.data,SourceArray.constData(),SourceArray.size());
m_hash_mutex.lock();
if (m_hash_Name2node.contains(svrName))
netEng()->SendDataToClient(m_hash_Name2node[svrName]->sock(),array);
m_hash_mutex.unlock();
}
向伺服器 svrName傳送 SourceArray, 並響應
void evt_RemoteData_recieved(QString /*svrHandle*/,QByteArray /*svrHandle*/ );
訊號來接收資料。使用者不用關心傳輸協議的封裝和解析。
但是,以下問題是不涉及的。
1、傳輸的資料的具體意義解釋
2、全域性客戶端的UUID雜湊和同步
3、客戶端資料是否被真正接收。
這些部分留給應用相關部分來具體實現。