Protobuf協議應用乾貨
Protobuf應用廣泛,尤其作為網路通訊協議最為普遍。本文將詳細描述幾個讓人眼前一亮的protobuf協議設計,對準備應用或已經應用protobuf的開發者會有所啟發,甚至可以直接拿過去用。 這裡描述的協議設計被用於生產環境的即時通訊、埋點資料採集、訊息推送、redis和mysql資料代理。
Bwar從2013年開始應用protobuf,2014年設計了用於mysql資料代理的protobuf協議,2015年設計了用於即時通訊的protobuf協議。高效能C++ IoC網路框架Nebula https://github.com/Bwar/Nebula把這幾個protobuf協議設計應用到了極致。
1. TCP通訊協議設計
本協議設計於2015年,用於一個生產環境的IM和埋點資料採集及實時分析,2016年又延伸發展了基於protobuf3的版本並用於開源網路框架Nebula。基於protobuf2和protobuf3的有較少差別,這裡分開講解兩個版本的協議設計。
1.1. protobuf2.5版Msg
2015年尚無protobuf3的release版本,protobuf2版本的fixed32型別是固定佔用4個位元組的,非常適合用於網路通訊協議設計。Bwar設計用於IM系統的協議包括兩個protobuf message:MsgHead和MsgBody,協議定義如下:
syntax = "proto2"; /** * @brief 訊息頭 */ message MsgHead { required fixed32 cmd = 1 ; ///< 命令字(壓縮加密算法佔高位1位元組) required fixed32 msgbody_len = 2; ///< 訊息體長度(單個訊息體長度不能超過65535即8KB) required fixed32 seq = 3; ///< 序列號 } /** * @brief 訊息體 * @note 訊息體主體是body,所有業務邏輯內容均放在body裡。session_id和session用於接入層路由, * 兩者只需要填充一個即可,首選session_id,當session_id用整型無法表達時才使用session。 */ message MsgBody { required bytes body = 1; ///< 訊息體主體 optional uint32 session_id = 2; ///< 會話ID(單聊訊息為接收者uid,個人資訊修改為uid,群聊訊息為groupid,群管理為groupid) optional string session = 3; ///< 會話ID(當session_id用整型無法表達時使用) optional bytes additional = 4; ///< 接入層附加的資料(客戶端無須理會) }
解析收到的位元組流時先解固定長度(15位元組)的MsgHead(protobuf3.0之後的版本必須在cmd、msgbody_len、seq均不為0的情況下才是15位元組),再通過MsgHead裡的msgbody_len判斷訊息體是否接收完畢,若接收完畢則呼叫MsgBody.Parse()解析。MsgBody裡的設計在下一節詳細說明。
MsgHead在實際的專案應用中對應下面的訊息頭並可以相互轉換:
#pragma pack(1) /** * @brief 與客戶端通訊訊息頭 */ struct tagClientMsgHead { unsigned char version; ///< 協議版本號(1位元組) unsigned char encript; ///< 壓縮加密演算法(1位元組) unsigned short cmd; ///< 命令字/功能號(2位元組) unsigned short checksum; ///< 校驗碼(2位元組) unsigned int body_len; ///< 訊息體長度(4位元組) unsigned int seq; ///< 序列號(4位元組) }; #pragma pack()
轉換程式碼如下:
E_CODEC_STATUS ClientMsgCodec::Encode(const MsgHead& oMsgHead, const MsgBody& oMsgBody, loss::CBuffer* pBuff)
{
tagClientMsgHead stClientMsgHead;
stClientMsgHead.version = 1; // version暫時無用
stClientMsgHead.encript = (unsigned char)(oMsgHead.cmd() >> 24);
stClientMsgHead.cmd = htons((unsigned short)(gc_uiCmdBit & oMsgHead.cmd()));
stClientMsgHead.body_len = htonl((unsigned int)oMsgHead.msgbody_len());
stClientMsgHead.seq = htonl(oMsgHead.seq());
stClientMsgHead.checksum = htons((unsigned short)stClientMsgHead.checksum);
...
}
E_CODEC_STATUS ClientMsgCodec::Decode(loss::CBuffer* pBuff, MsgHead& oMsgHead, MsgBody& oMsgBody)
{
LOG4_TRACE("%s() pBuff->ReadableBytes() = %u", __FUNCTION__, pBuff->ReadableBytes());
size_t uiHeadSize = sizeof(tagClientMsgHead);
if (pBuff->ReadableBytes() >= uiHeadSize)
{
tagClientMsgHead stClientMsgHead;
int iReadIdx = pBuff->GetReadIndex();
pBuff->Read(&stClientMsgHead, uiHeadSize);
stClientMsgHead.cmd = ntohs(stClientMsgHead.cmd);
stClientMsgHead.body_len = ntohl(stClientMsgHead.body_len);
stClientMsgHead.seq = ntohl(stClientMsgHead.seq);
stClientMsgHead.checksum = ntohs(stClientMsgHead.checksum);
LOG4_TRACE("cmd %u, seq %u, len %u, pBuff->ReadableBytes() %u",
stClientMsgHead.cmd, stClientMsgHead.seq, stClientMsgHead.body_len,
pBuff->ReadableBytes());
oMsgHead.set_cmd(((unsigned int)stClientMsgHead.encript << 24) | stClientMsgHead.cmd);
oMsgHead.set_msgbody_len(stClientMsgHead.body_len);
oMsgHead.set_seq(stClientMsgHead.seq);
...
}
}
<br/>
1.2. protobuf3版Msg
protobuf3版的MsgHead和MsgBody從IM業務應用實踐中發展而來,同時滿足了埋點資料採集、實時計算、訊息推送等業務需要,更為通用。正因其通用性和高擴充套件性,採用proactor模型的IoC網路框架Nebula才會選用這個協議,通過這個協議,框架層將網路通訊工作從業務應用中完全獨立出來,基於Nebula框架的應用開發者甚至可以不懂網路程式設計也能開發出高併發的分散式服務。
MsgHead和MsgBody的protobuf定義如下:
syntax = "proto3";
// import "google/protobuf/any.proto";
/**
* @brief 訊息頭
* @note MsgHead為固定15位元組的頭部,當MsgHead不等於15位元組時,訊息傳送將出錯。
* 在proto2版本,MsgHead為15位元組總是成立,cmd、seq、len都是required;
* 但proto3版本,MsgHead為15位元組則必須要求cmd、seq、len均不等於0,否則無法正確進行收發編解碼。
*/
message MsgHead
{
fixed32 cmd = 1; ///< 命令字(壓縮加密算法佔高位1位元組)
fixed32 seq = 2; ///< 序列號
sfixed32 len = 3; ///< 訊息體長度
}
/**
* @brief 訊息體
* @note 訊息體主體是data,所有業務邏輯內容均放在data裡。req_target是請求目標,用於
* 服務端接入路由,請求包必須填充。rsp_result是響應結果,響應包必須填充。
*/
message MsgBody
{
oneof msg_type
{
Request req_target = 1; ///< 請求目標(請求包必須填充)
Response rsp_result = 2; ///< 響應結果(響應包必須填充)
}
bytes data = 3; ///< 訊息體主體
bytes add_on = 4; ///< 服務端接入層附加在請求包的資料(客戶端無須理會)
string trace_id = 5; ///< for log trace
message Request
{
uint32 route_id = 1; ///< 路由ID
string route = 2; ///< 路由ID(當route_id用整型無法表達時使用)
}
message Response
{
int32 code = 1; ///< 錯誤碼
bytes msg = 2; ///< 錯誤資訊
}
}
MsgBody的data欄位儲存訊息主體,任何自定義資料均可以二進位制資料流方式寫入到data。
msg_type用於標識該訊息是請求還是響應(所有網路資料流都可歸為請求或響應),如果是請求,則可以選擇性填充Request裡的route_id或route,如果填充了,則框架層無須解析應用層協議(也無法解析)就能自動根據路由ID轉發,而無須應用層解開data裡的內容再根據自定義邏輯轉發。如果是響應,則定義了統一的錯誤標準,也為業務無關的錯誤處理提供方便。
add_on是附在長連線上的業務資料,框架並不會解析但會在每次轉發訊息時帶上,可以為應用提供極其方便且強大的功能。比如,IM使用者登入時客戶端只發送使用者ID和密碼到服務端,服務端在登入校驗通過後,將該使用者的暱稱、頭像等資訊通過框架提供的方法SetClientData()將資料附在服務端接入層該使用者對應的長連線Channel上,之後所有從該連線過來的請求都會由框架層自動填充add_on欄位,服務端的其他邏輯伺服器只從data中得到自定義業務邏輯(比如聊天訊息)資料,卻可以從add_on中得到這個傳送使用者的資訊。add_on的設計簡化了應用開發邏輯,並降低了客戶端與服務端傳輸的資料量。
trace_id用於分散式日誌跟蹤。分散式服務的錯誤定位是相當麻煩的,Nebula分散式服務解決方案提供了日誌跟蹤功能,協議裡的trace_id欄位的設計使得Nebula框架可以在完全不增加應用開發者額外工作的情況下(正常呼叫LOG4_INFO寫日誌而無須額外工作)實現所有標記著同一trace_id的日誌傳送到指定一臺日誌伺服器,定義錯誤時跟單體服務那樣登入一臺伺服器檢視日誌即可。比如,IM使用者傳送一條訊息失敗,在使用者傳送的訊息到達服務端接入層時就被打上了trace_id標記,這個id會一直傳遞到邏輯層、儲存層等,哪個環節發生了錯誤都可以從訊息的傳送、轉發、處理路徑上查到。
MsgHead和MsgBody的編解碼實現見Nebula框架的https://github.com/Bwar/Nebula/blob/master/src/codec/CodecProto.cpp。
2. Http通訊協議設計
上面的講解的是protobuf應用於TCP資料流通訊,接下來將描述protobuf在http通訊上的應用。
在Web服務中通常會用Nginx做接入層的反向代理,經過Nginx轉發到後續業務邏輯層的tomcat、apache或nginx上,接入層和業務邏輯層至少做了兩次http協議解析,http協議是文字協議,傳輸資料量大解析速度慢。Nebula框架不是一個web伺服器,但支援http協議,在只需提供http介面的應用場景(比如完全前後端分離的後端)基於Nebula的單程序http服務端併發量就可以是tomcat的數十倍。這一定程度上得益於Nebula框架在http通訊上protobuf的應用。Nebula框架解析http文字協議並轉化為HttpMsg在服務內部處理,應用開發者填充HttpMsg,接入層將響應的HttpMsg轉換成http文字協議發回給請求方,不管服務端內部經過多少次中轉,始終只有一次http協議的decode和一次http協議的encode。
syntax = "proto3";
message HttpMsg
{
int32 type = 1; ///< http_parser_type 請求或響應
int32 http_major = 2; ///< http大版本號
int32 http_minor = 3; ///< http小版本號
int32 content_length = 4; ///< 內容長度
int32 method = 5; ///< 請求方法
int32 status_code = 6; ///< 響應狀態碼
int32 encoding = 7; ///< 傳輸編碼(只在encode時使用,當 Transfer-Encoding: chunked 時,用於標識chunk序號,0表示第一個chunk,依次遞增)
string url = 8; ///< 地址
map<string, string> headers = 9; ///< http頭域
bytes body = 10; ///< 訊息體(當 Transfer-Encoding: chunked 時,只儲存一個chunk)
map<string, string> params = 11; ///< GET方法引數,POST方法表單提交的引數
Upgrade upgrade = 12; ///< 升級協議
float keep_alive = 13; ///< keep alive time
string path = 14; ///< Http Decode時從url中解析出來,不需要人為填充(encode時不需要填)
bool is_decoding = 15; ///< 是否正在解碼(true 正在解碼, false 未解碼或已完成解碼)
message Upgrade
{
bool is_upgrade = 1;
string protocol = 2;
}
}
HttpMsg的編解碼實現見Nebula框架的https://github.com/Bwar/Nebula/blob/master/src/codec/CodecHttp.cpp。
3. 資料庫代理服務協議設計
如果上面描述的protobuf在網路通訊上應用算不錯的話,那以下將protobuf用於資料代理上的協議設計則絕對是讓人眼前一亮。
有的公司規定web服務不得直接訪問MySQL資料庫,甚至不允許在web邏輯層拼接SQL語句。如果有這種出於安全性考慮而做的限制,在web邏輯層後面再增加一層業務邏輯層成本未免太高了,那麼解決辦法應該是增加一層業務邏輯無關的代理服務層。這個代理服務層不是簡單的轉發SQL語句這麼簡單,因為web邏輯層可能不允許拼接SQL,由此引出我們這個用於資料庫代理的protobuf協議設計。這個協議是將SQL邏輯融入整個協議之中,資料庫代理層接收並解析這個協議後生成SQL語句或用binding方式到資料庫去執行。資料庫代理層只有協議解析和轉化邏輯,無其他任何業務邏輯,業務邏輯還在web邏輯層,區別只在於從拼接SQL變成了填充protobuf協議。
syntax = "proto2";
package dbagent;
/**
* @brief DB Agent訊息
*/
message DbAgentMsg
{
enum E_TYPE
{
UNDEFINE = 0; ///< 未定義
REQ_CONNECT = 1; ///< 連線DB請求
RSP_CONNECT = 2; ///< 連線DB響應
REQ_QUERY = 3; ///< 執行SQL請求
RSP_QUERY = 4; ///< 執行SQL響應
REQ_DISCONNECT = 5; ///< 關閉連線請求
RSP_DISCONNECT = 6; ///< 關閉連線響應
RSP_RECORD = 7; ///< 結果集記錄
RSP_COMMON = 8; ///< 通用響應(當請求不能被Server所認知時會做出這個迴應)
REQ_GET_CONNECT = 9; ///< 獲取連線請求
RSP_GET_CONNECT = 10; ///< 獲取連線響應
}
required E_TYPE type = 1; ///< 訊息/操作 型別
optional RequestConnect req_connect = 2; ///< 連線請求
optional ResponseConnect rsp_connect = 3; ///< 連線響應
optional RequestDisconnect req_disconnect = 4; ///< 關閉請求
optional ResponseDisconnect rsp_disconnect = 5; ///< 關閉響應
optional RequestQuery req_query = 6; ///< 執行SQL請求
optional ResponseQuery rsp_query = 7; ///< 執行SQL響應
optional ResponseRecord rsp_record = 8; ///< SELECT結果集記錄
optional ResponseCommon rsp_common = 9; ///< 通用響應
optional RequestGetConnection req_get_conn = 10; ///< 獲取連線請求
optional ResponseGetConnection rsp_get_conn = 11; ///< 獲取連線響應
}
/**
* @brief 連線請求
*/
message RequestConnect
{
required string host = 1; ///< DB所在伺服器IP
required int32 port = 2; ///< DB埠
required string user = 3; ///< DB使用者名稱
required string password = 4; ///< DB使用者密碼
required string dbname = 5; ///< DB庫名
required string charset = 6; ///< DB字符集
}
/**
* @brief 連線響應
*/
message ResponseConnect
{
required int32 connect_id = 1; ///< 連線ID (連線失敗時,connect_id為0)
optional int32 err_no = 2; ///< 錯誤碼 0 表示連線成功
optional string err_msg = 3; ///< 錯誤資訊
}
/**
* @brief 關閉連線請求
*/
message RequestDisconnect
{
required int32 connect_id = 1; ///< 連線ID (連線失敗時,connect_id為0)
}
/**
* @brief 關閉連線響應
*/
message ResponseDisconnect
{
optional int32 err_no = 2; ///< 錯誤碼 0 表示連線成功
optional string err_msg = 3; ///< 錯誤資訊
}
/**
* @brief 執行SQL請求
*/
message RequestQuery
{
required E_QUERY_TYPE query_type = 1; ///< 查詢型別
required string table_name = 2; ///< 表名
repeated Field fields = 3; ///< 列型別
repeated ConditionGroup conditions= 4; ///< where條件組(由group_relation指定,若不指定則預設為AND關係)
repeated string groupby_col = 5; ///< group by欄位
repeated OrderBy orderby_col = 6; ///< order by欄位
optional uint32 limit = 7; ///< 指定返回的行數的最大值 (limit 200)
optional uint32 limit_from = 8; ///< 指定返回的第一行的偏移量 (limit 100, 200)
optional ConditionGroup.E_RELATION group_relation = 9; ///< where條件組的關係,條件組之間有且只有一種關係(and或者or)
optional int32 connect_id = 10; ///< 連線ID,有效連線ID(長連線,當connect後多次執行query可以使用connect_id)
optional string bid = 11; ///< 業務ID,在CmdDbAgent.json配置檔案中配置(短連線,每次執行query時連線,執行完後關閉連線)
optional string password = 12; ///< 業務密碼
enum E_QUERY_TYPE ///< 查詢型別
{
SELECT = 0; ///< select查詢
INSERT = 1; ///< insert插入
INSERT_IGNORE = 2; ///< insert ignore插入,若存在則放棄
UPDATE = 3; ///< update更新
REPLACE = 4; ///< replace覆蓋插入
DELETE = 5; ///< delete刪除
}
enum E_COL_TYPE ///< 列型別
{
STRING = 0; ///< char, varchar, text, datetime, timestamp等
INT = 1; ///< int
BIGINT = 2; ///< bigint
FLOAT = 3; ///< float
DOUBLE = 4; ///< double
}
message Field ///< 欄位
{
required string col_name = 1; ///< 列名
required E_COL_TYPE col_type = 2; ///< 列型別
required bytes col_value = 3; ///< 列值
optional string col_as = 4; ///< as列名
}
message Condition ///< where條件
{
required E_RELATION relation = 1; ///< 關係(=, !=, >, <, >=, <= 等)
required E_COL_TYPE col_type = 2; ///< 列型別
required string col_name = 3; ///< 列名
repeated bytes col_values = 4; ///< 列值(當且僅當relation為IN時值的個數大於1有效)
optional string col_name_right= 5; ///< 關係右邊列名(用於where col1=col2這種情況)
enum E_RELATION
{
EQ = 0; ///< 等於=
NE = 1; ///< 不等於!=
GT = 2; ///< 大於>
LT = 3; ///< 小於<
GE = 4; ///< 大於等於>=
LE = 5; ///< 小於等於<=
LIKE = 6; ///< like
IN = 7; ///< in (1, 2, 3, 4, 5)
}
}
message ConditionGroup ///< where條件組合
{
required E_RELATION relation = 1; ///< 條件之間的關係,一個ConditionGroup裡的所有Condition之間有且只有一種關係(and或者or)
repeated Condition condition = 2; ///< 條件
enum E_RELATION
{
AND = 0; ///< and且
OR = 1; ///< or或
}
}
message OrderBy
{
required E_RELATION relation = 1; ///< 降序或升序
required string col_name = 2; ///< 列名
enum E_RELATION
{
ASC = 0;
DESC = 1;
}
}
}
/**
* @brief 執行SQL響應
*/
message ResponseQuery
{
required uint32 seq = 1; ///< 資料包序列號(SELECT結果集會分包返回,只有一個包的情況或已到達最後一個包則seq=0xFFFFFFFF)
required int32 err_no = 2; ///< 錯誤碼,0 表示執行成功
optional string err_msg = 3; ///< 錯誤資訊
optional uint64 insert_id = 4; ///< mysql_insert_id()獲取的值(視執行的SQL語句而定,不一定存在)
repeated bytes dict = 5; ///< 結果集字典(視執行的SQL語句而定,不一定存在)
}
/**
* @brief SELECT語句返回結果集的一條記錄
*/
message ResponseRecord
{
required uint32 seq = 1; ///< 資料包序列號(SELECT結果集會分包返回,已到達最後一個包則seq=0xFFFFFFFF)
repeated bytes field = 2; ///< 資料集記錄的欄位
}
/**
* @brief 常規響應
*/
message ResponseCommon
{
optional int32 err_no = 1; ///< 錯誤碼 0 表示連線成功
optional string err_msg = 2; ///< 錯誤資訊
}
/**
* @brief 獲取連線請求
*/
message RequestGetConnection
{
required string bid = 1; ///< 業務ID,在dbproxy配置檔案中配置
required string password = 2; ///< 業務密碼
}
/**
* @brief 獲取連線響應
*/
message ResponseGetConnection
{
required int32 connect_id = 1; ///< 連線ID,有效連線ID,否則執行失敗
optional int32 err_no = 2; ///< 錯誤碼 0 表示連線成功
optional string err_msg = 3; ///< 錯誤資訊
}
基於這個資料庫操作協議開發的資料庫代理層完全解決了web邏輯層不允許直接訪問資料庫也不允許拼接SQL語句的問題,而且幾乎沒有增加開發代價。另外,基於這個協議的資料庫代理天然防止SQL注入(在代理層校驗field_name,並且mysql_escape_string(filed_value)),雖然防SQL注入應是應用層的責任,但多了資料代理這層保障也是好事。
這個協議只支援簡單SQL,不支援聯合查詢、子查詢,也不支援儲存過程,如果需要支援的話協議會更復雜。在Bwar所負責過的業務裡,基本都禁止資料庫聯合查詢之類,只把資料庫當儲存用,不把邏輯寫到SQL語句裡,所以這個協議滿足大部分業務需要。
這一節只說明資料庫代理協議,下一節將從資料庫代理協議延伸並提供協議程式碼講解。
4. Redis和MySQL資料代理協議設計
大部分後臺應用只有MySQL是不夠的,往往還需要快取,經常會用Redis來做資料快取。用快取意味著資料至少需要同時寫到Redis和MySQL,又或者在未命中快取時從MySQL中讀取到的資料需要回寫到Redis,這些通常都是由業務邏輯層來做的。也有例外,Nebula提供的分散式解決方案是由資料代理層來做的,業務邏輯層只需向資料代理層傳送一個protobuf協議資料,資料代理層就會完成Redis和MySQL雙寫或快取未命中時的自動回寫(暫且不探討資料一致性問題)。資料代理層來做這些工作是為了減少業務邏輯層的複雜度,提高開發效率。既然是為了提高開發效率,就得讓業務邏輯層低於原來同時操作Redis和MySQL的開發量。Nebula提供的NebulaMydis就是這樣一個讓原來同時操作Redis和MySQL的開發量(假設是2)降到1.2左右。
這個同時操作Redis和MySQL的資料代理協議如下:
syntax = "proto3";
package neb;
message Mydis
{
uint32 section_factor = 1;
RedisOperate redis_operate = 2;
DbOperate db_operate = 3;
message RedisOperate
{
bytes key_name = 1;
string redis_cmd_read = 2;
string redis_cmd_write = 3;
OPERATE_TYPE op_type = 4;
repeated Field fields = 5;
int32 key_ttl = 6;
int32 redis_structure = 7; ///< redis資料型別
int32 data_purpose = 8; ///< 資料用途
bytes hash_key = 9; ///< 可選hash key,當has_hash_key()時用hash_key來計算hash值,否則用key_name來計算hash值
enum OPERATE_TYPE
{
T_READ = 0;
T_WRITE = 1;
}
}
message DbOperate
{
E_QUERY_TYPE query_type = 1; ///< 查詢型別
string table_name = 2; ///< 表名
repeated Field fields = 3; ///< 列型別
repeated ConditionGroup conditions = 4; ///< where條件組(由group_relation指定,若不指定則預設為AND關係)
repeated string groupby_col = 5; ///< group by欄位
repeated OrderBy orderby_col = 6; ///< order by欄位
ConditionGroup.E_RELATION group_relation = 7; ///< where條件組的關係,條件組之間有且只有一種關係(and或者or)
uint32 limit = 8; ///< 指定返回的行數的最大值 (limit 200)
uint32 limit_from = 9; ///< 指定返回的第一行的偏移量 (limit 100, 200)
uint32 mod_factor = 10; ///< 分表取模因子,當這個欄位沒有時使用section_factor
enum E_QUERY_TYPE ///< 查詢型別
{
SELECT = 0; ///< select查詢
INSERT = 1; ///< insert插入
INSERT_IGNORE = 2; ///< insert ignore插入,若存在則放棄
UPDATE = 3; ///< update更新
REPLACE = 4; ///< replace覆蓋插入
DELETE = 5; ///< delete刪除
}
message Condition ///< where條件
{
E_RELATION relation = 1; ///< 關係(=, !=, >, <, >=, <= 等)
E_COL_TYPE col_type = 2; ///< 列型別
string col_name = 3; ///< 列名
repeated bytes col_values = 4; ///< 列值(當且僅當relation為IN時值的個數大於1有效)
string col_name_right = 5; ///< 關係右邊列名(用於where col1=col2這種情況)
enum E_RELATION
{
EQ = 0; ///< 等於=
NE = 1; ///< 不等於!=
GT = 2; ///< 大於>
LT = 3; ///< 小於<
GE = 4; ///< 大於等於>=
LE = 5; ///< 小於等於<=
LIKE = 6; ///< like
IN = 7; ///< in (1, 2, 3, 4, 5)
}
}
message ConditionGroup ///< where條件組合
{
E_RELATION relation = 1; ///< 條件之間的關係,一個ConditionGroup裡的所有Condition之間有且只有一種關係(and或者or)
repeated Condition condition = 2; ///< 條件
enum E_RELATION
{
AND = 0; ///< and且
OR = 1; ///< or或
}
}
message OrderBy
{
E_RELATION relation = 1; ///< 降序或升序
string col_name = 2; ///< 列名
enum E_RELATION
{
ASC = 0;
DESC = 1;
}
}
}
}
enum E_COL_TYPE ///< 列型別
{
STRING = 0; ///< char, varchar, text, datetime, timestamp等
INT = 1; ///< int
BIGINT = 2; ///< bigint
FLOAT = 3; ///< float
DOUBLE = 4; ///< double
}
message Record
{
repeated Field field_info = 1; ///< value data
}
message Field ///< 欄位
{
string col_name = 1; ///< 列名
E_COL_TYPE col_type = 2; ///< 列型別
bytes col_value = 3; ///< 列值
string col_as = 4; ///< as列名
}
/**
* @brief 查詢結果
* @note 適用於Redis返回和MySQL返回,當totalcount與curcount相等時表明資料已接收完畢,
* 否則表示資料尚未接收完,剩餘的資料會在後續資料包繼續返回。
*/
message Result
{
int32 err_no = 1;
bytes err_msg = 2;
int32 total_count = 3;
int32 current_count = 4;
repeated Record record_data = 5;
int32 from = 6; ///< 資料來源 E_RESULT_FROM
DataLocate locate = 7; ///< 僅在DataProxy使用
enum E_RESULT_FROM
{
FROM_DB = 0;
FROM_REDIS = 1;
}
message DataLocate
{
uint32 section_from = 1;
uint32 section_to = 2; ///< 資料所在分段,section_from < MemOperate.section_factor <= section_to
uint32 hash = 3; ///< 用於做分佈的hash值(取模運算時,為取模後的結果)
uint32 divisor = 4; ///< 取模運算的除數(一致性hash時不需要)
}
}
這個協議分了Redis和MySQL兩部分資料,看似業務邏輯層把一份資料填充了兩份並沒有降低多少開發量,實際上這兩部分資料有許多是可共用的,只要提供一個填充類就可以大幅降低協議填充開發量。為簡化協議填充,Nebula提供了幾個類:同時填充Redis和MySQL資料、只填充Redis、只填充MySQL。
從Mydis協議的MySQL部分如何生成SQL語句請參考NebulaDbAgent,核心程式碼標頭檔案如下:
namespace dbagent
{
const int gc_iMaxBeatTimeInterval = 30;
const int gc_iMaxColValueSize = 65535;
struct tagConnection
{
CMysqlDbi* pDbi;
time_t ullBeatTime;
int iQueryPermit;
int iTimeout;
tagConnection() : pDbi(NULL), ullBeatTime(0), iQueryPermit(0), iTimeout(0)
{
}
~tagConnection()
{
if (pDbi != NULL)
{
delete pDbi;
pDbi = NULL;
}
}
};
class CmdExecSql : public neb::Cmd, public neb::DynamicCreator<CmdExecSql, int32>
{
public:
CmdExecSql(int32 iCmd);
virtual ~CmdExecSql();
virtual bool Init();
virtual bool AnyMessage(
std::shared_ptr<neb::SocketChannel> pChannel,
const MsgHead& oMsgHead,
const MsgBody& oMsgBody);
protected:
bool GetDbConnection(const neb::Mydis& oQuery, CMysqlDbi** ppMasterDbi, CMysqlDbi** ppSlaveDbi);
bool FetchOrEstablishConnection(neb::Mydis::DbOperate::E_QUERY_TYPE eQueryType,
const std::string& strMasterIdentify, const std::string& strSlaveIdentify,
const neb::CJsonObject& oInstanceConf, CMysqlDbi** ppMasterDbi, CMysqlDbi** ppSlaveDbi);
std::string GetFullTableName(const std::string& strTableName, uint32 uiFactor);
int ConnectDb(const neb::CJsonObject& oInstanceConf, CMysqlDbi* pDbi, bool bIsMaster = true);
int Query(const neb::Mydis& oQuery, CMysqlDbi* pDbi);
void CheckConnection(); //檢查連線是否已超時
void Response(int iErrno, const std::string& strErrMsg);
bool Response(const neb::Result& oRsp);
bool CreateSql(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateSelect(const neb::Mydis& oQuery, std::string& strSql);
bool CreateInsert(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateUpdate(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateDelete(const neb::Mydis& oQuery, std::string& strSql);
bool CreateCondition(const neb::Mydis::DbOperate::Condition& oCondition, CMysqlDbi* pDbi, std::string& strCondition);
bool CreateConditionGroup(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strCondition);
bool CreateGroupBy(const neb::Mydis& oQuery, std::string& strGroupBy);
bool CreateOrderBy(const neb::Mydis& oQuery, std::string& strOrderBy);
bool CreateLimit(const neb::Mydis& oQuery, std::string& strLimit);
bool CheckColName(const std::string& strColName);
private:
std::shared_ptr<neb::SocketChannel> m_pChannel;
MsgHead m_oInMsgHead;
MsgBody m_oInMsgBody;
int m_iConnectionTimeout; //空閒連線超時(單位秒)
char* m_szColValue; //欄位值
neb::CJsonObject m_oDbConf;
uint32 m_uiSectionFrom;
uint32 m_uiSectionTo;
uint32 m_uiHash;
uint32 m_uiDivisor;
std::map<std::string, std::set<uint32> > m_mapFactorSection; //分段因子區間配置,key為因子型別
std::map<std::string, neb::CJsonObject*> m_mapDbInstanceInfo; //資料庫配置資訊key為("%u:%u:%u", uiDataType, uiFactor, uiFactorSection)
std::map<std::string, tagConnection*> m_mapDbiPool; //資料庫連線池,key為identify(如:192.168.18.22:3306)
};
} // namespace dbagent
整個mydis資料協議是如何解析如何使用,如何做Redis和MySQL的資料雙寫、快取資料回寫等不在本文討論範圍,如有興趣可以閱讀NebulaMydis原始碼,也可以聯絡Bwar。
5. 結語
Protobuf用得合適用得好可以解決許多問題,可以提高開發效率,也可以提高執行效率,以上就是Bwar多年應用protobuf的小結,沒有任何藏私,文中列出的協議都可以在開源專案Nebula的這個路徑https://github.com/Bwar/Nebula/tree/master/proto找到。
開發Nebula框架目的是致力於提供一種基於C++快速構建高效能的分散式服務。如果覺得本文對你有用,別忘了到Nebula的Github或碼雲