Protobuf協議精品應用
??Bwar從2013年開始應用protobuf,2014年設計了用於mysql數據代理的protobuf協議,2015年設計了用於即時通訊的protobuf協議。高性能C++ IoC網絡框架Nebula https://github.com/Bwar/Nebula把這幾個protobuf協議設計應用到了極致。
1. TCP通訊協議設計
??本協議設計於2015年,用於一個生產環境的IM和埋點數據采集及實時分析,2016年又延伸發展了基於protobuf3的版本並用於開源網絡框架Nebula。基於protobuf2和protobuf3的有較少差別,這裏分開講解兩個版本的協議設計。
1.1. protobuf2.5版Msg
??2015年尚無protobuf3的release版本,protobuf2版本的fixed32類型是固定占用4個字節的,非常適合用於網絡通訊協議設計。Bwar設計用於IM系統的協議包括兩個protobuf message:MsgHead和MsgBody,協議定義如下:
```C++
syntax = "proto2";
/**
- @brief 消息頭
*/
message MsgHead
{
required fixed32 cmd = 1 ; ///< 命令字(壓縮加密算法占高位1字節)
required fixed32 msgbody_len = 2; ///< 消息體長度(單個消息體長度不能超過65535即8KB)
required fixed32 seq = 3; ///< 序列號
}
/**
- @brief 消息體
- @note 消息體主體是body,所有業務邏輯內容均放在body裏。session_id和session用於接入層路由,
- 兩者只需要填充一個即可,首選session_id,當session_id用整型無法表達時才使用session。
*/
message MsgBody
{
required bytes body = 1; ///< 消息體主體
optional uint32 session_id = 2; ///< 會話ID(單聊消息為接收者uid,個人信息修改為uid,群聊消息為groupid,群管理為groupid)
optional string session = 3; ///< 會話ID(當session_id用整型無法表達時使用)
optional bytes additional = 4; ///< 接入層附加的數據(客戶端無須理會)
}
??解析收到的字節流時先解固定長度(15字節)的MsgHead(protobuf3.0之後的版本必須在cmd、msgbody_len、seq均不為0的情況下才是15字節),再通過MsgHead裏的msgbody_len判斷消息體是否接收完畢,若接收完畢則調用MsgBody.Parse()解析。MsgBody裏的設計在下一節詳細說明。
??MsgHead在實際的項目應用中對應下面的消息頭並可以相互轉換:
```C++
#pragma pack(1)
/**
- @brief 與客戶端通信消息頭
*/
struct tagClientMsgHead
{
unsigned char version; ///< 協議版本號(1字節)
unsigned char encript; ///< 壓縮加密算法(1字節)
unsigned short cmd; ///< 命令字/功能號(2字節)
unsigned short checksum; ///< 校驗碼(2字節)
unsigned int body_len; ///< 消息體長度(4字節)
unsigned int seq; ///< 序列號(4字節)
};
#pragma pack()
??轉換代碼如下:
```C++
E_CODEC_STATUS ClientMsgCodec::Encode(const MsgHead& oMsgHead, const MsgBody& oMsgBody, loss::CBuffer* pBuff)
{
tagClientMsgHead stClientMsgHead;
stClientMsgHead.version = 1; // version暫時無用
stClientMsgHead.encript = (unsigned char)(oMsgHead.cmd() >> 24);
stClientMsgHead.cmd = htons((unsigned short)(gc_uiCmdBit & oMsgHead.cmd()));
stClientMsgHead.body_len = htonl((unsigned int)oMsgHead.msgbody_len());
stClientMsgHead.seq = htonl(oMsgHead.seq());
stClientMsgHead.checksum = htons((unsigned short)stClientMsgHead.checksum);
...
}
E_CODEC_STATUS ClientMsgCodec::Decode(loss::CBuffer* pBuff, MsgHead& oMsgHead, MsgBody& oMsgBody)
{
LOG4_TRACE("%s() pBuff->ReadableBytes() = %u", __FUNCTION__, pBuff->ReadableBytes());
size_t uiHeadSize = sizeof(tagClientMsgHead);
if (pBuff->ReadableBytes() >= uiHeadSize)
{
tagClientMsgHead stClientMsgHead;
int iReadIdx = pBuff->GetReadIndex();
pBuff->Read(&stClientMsgHead, uiHeadSize);
stClientMsgHead.cmd = ntohs(stClientMsgHead.cmd);
stClientMsgHead.body_len = ntohl(stClientMsgHead.body_len);
stClientMsgHead.seq = ntohl(stClientMsgHead.seq);
stClientMsgHead.checksum = ntohs(stClientMsgHead.checksum);
LOG4_TRACE("cmd %u, seq %u, len %u, pBuff->ReadableBytes() %u",
stClientMsgHead.cmd, stClientMsgHead.seq, stClientMsgHead.body_len,
pBuff->ReadableBytes());
oMsgHead.set_cmd(((unsigned int)stClientMsgHead.encript << 24) | stClientMsgHead.cmd);
oMsgHead.set_msgbody_len(stClientMsgHead.body_len);
oMsgHead.set_seq(stClientMsgHead.seq);
...
}
}
<br/>
1.2. protobuf3版Msg
??protobuf3版的MsgHead和MsgBody從IM業務應用實踐中發展而來,同時滿足了埋點數據采集、實時計算、消息推送等業務需要,更為通用。正因其通用性和高擴展性,采用proactor模型的IoC網絡框架Nebula才會選用這個協議,通過這個協議,框架層將網絡通信工作從業務應用中完全獨立出來,基於Nebula框架的應用開發者甚至可以不懂網絡編程也能開發出高並發的分布式服務。
??MsgHead和MsgBody的protobuf定義如下:
```C++
syntax = "proto3";
// import "google/protobuf/any.proto";
/**
- @brief 消息頭
- @note MsgHead為固定15字節的頭部,當MsgHead不等於15字節時,消息發送將出錯。
- 在proto2版本,MsgHead為15字節總是成立,cmd、seq、len都是required;
- 但proto3版本,MsgHead為15字節則必須要求cmd、seq、len均不等於0,否則無法正確進行收發編解碼。
*/
message MsgHead
{
fixed32 cmd = 1; ///< 命令字(壓縮加密算法占高位1字節)
fixed32 seq = 2; ///< 序列號
sfixed32 len = 3; ///< 消息體長度
}
/**
- @brief 消息體
- @note 消息體主體是data,所有業務邏輯內容均放在data裏。req_target是請求目標,用於
-
服務端接入路由,請求包必須填充。rsp_result是響應結果,響應包必須填充。
*/
message MsgBody
{
oneof msg_type
{
Request req_target = 1; ///< 請求目標(請求包必須填充)
Response rsp_result = 2; ///< 響應結果(響應包必須填充)
}
bytes data = 3; ///< 消息體主體
bytes add_on = 4; ///< 服務端接入層附加在請求包的數據(客戶端無須理會)
string trace_id = 5; ///< for log tracemessage Request
{
uint32 route_id = 1; ///< 路由ID
string route = 2; ///< 路由ID(當route_id用整型無法表達時使用)
}message Response
{
int32 code = 1; ///< 錯誤碼
bytes msg = 2; ///< 錯誤信息
}
}
??MsgBody的data字段存儲消息主體,任何自定義數據均可以二進制數據流方式寫入到data。
??msg_type用於標識該消息是請求還是響應(所有網絡數據流都可歸為請求或響應),如果是請求,則可以選擇性填充Request裏的route_id或route,如果填充了,則框架層無須解析應用層協議(也無法解析)就能自動根據路由ID轉發,而無須應用層解開data裏的內容再根據自定義邏輯轉發。如果是響應,則定義了統一的錯誤標準,也為業務無關的錯誤處理提供方便。
??add_on是附在長連接上的業務數據,框架並不會解析但會在每次轉發消息時帶上,可以為應用提供極其方便且強大的功能。比如,IM用戶登錄時客戶端只發送用戶ID和密碼到服務端,服務端在登錄校驗通過後,將該用戶的昵稱、頭像等信息通過框架提供的方法SetClientData()將數據附在服務端接入層該用戶對應的長連接Channel上,之後所有從該連接過來的請求都會由框架層自動填充add_on字段,服務端的其他邏輯服務器只從data中得到自定義業務邏輯(比如聊天消息)數據,卻可以從add_on中得到這個發送用戶的信息。add_on的設計簡化了應用開發邏輯,並降低了客戶端與服務端傳輸的數據量。
??trace_id用於分布式日誌跟蹤。分布式服務的錯誤定位是相當麻煩的,Nebula分布式服務解決方案提供了日誌跟蹤功能,協議裏的trace_id字段的設計使得Nebula框架可以在完全不增加應用開發者額外工作的情況下(正常調用LOG4_INFO寫日誌而無須額外工作)實現所有標記著同一trace_id的日誌發送到指定一臺日誌服務器,定義錯誤時跟單體服務那樣登錄一臺服務器查看日誌即可。比如,IM用戶發送一條消息失敗,在用戶發送的消息到達服務端接入層時就被打上了trace_id標記,這個id會一直傳遞到邏輯層、存儲層等,哪個環節發生了錯誤都可以從消息的發送、轉發、處理路徑上查到。
??MsgHead和MsgBody的編解碼實現見Nebula框架的https://github.com/Bwar/Nebula/blob/master/src/codec/CodecProto.cpp。
2. Http通訊協議設計
??上面的講解的是protobuf應用於TCP數據流通信,接下來將描述protobuf在http通信上的應用。
??在Web服務中通常會用Nginx做接入層的反向代理,經過Nginx轉發到後續業務邏輯層的tomcat、apache或nginx上,接入層和業務邏輯層至少做了兩次http協議解析,http協議是文本協議,傳輸數據量大解析速度慢。Nebula框架不是一個web服務器,但支持http協議,在只需提供http接口的應用場景(比如完全前後端分離的後端)基於Nebula的單進程http服務端並發量就可以是tomcat的數十倍。這一定程度上得益於Nebula框架在http通信上protobuf的應用。Nebula框架解析http文本協議並轉化為HttpMsg在服務內部處理,應用開發者填充HttpMsg,接入層將響應的HttpMsg轉換成http文本協議發回給請求方,不管服務端內部經過多少次中轉,始終只有一次http協議的decode和一次http協議的encode。
```C++
syntax = "proto3";
message HttpMsg
{
int32 type = 1; ///< http_parser_type 請求或響應
int32 http_major = 2; ///< http大版本號
int32 http_minor = 3; ///< http小版本號
int32 content_length = 4; ///< 內容長度
int32 method = 5; ///< 請求方法
int32 status_code = 6; ///< 響應狀態碼
int32 encoding = 7; ///< 傳輸編碼(只在encode時使用,當 Transfer-Encoding: chunked 時,用於標識chunk序號,0表示第一個chunk,依次遞增)
string url = 8; ///< 地址
map<string, string> headers = 9; ///< http頭域
bytes body = 10; ///< 消息體(當 Transfer-Encoding: chunked 時,只存儲一個chunk)
map<string, string> params = 11; ///< GET方法參數,POST方法表單提交的參數
Upgrade upgrade = 12; ///< 升級協議
float keep_alive = 13; ///< keep alive time
string path = 14; ///< Http Decode時從url中解析出來,不需要人為填充(encode時不需要填)
bool is_decoding = 15; ///< 是否正在解碼(true 正在解碼, false 未解碼或已完成解碼)
message Upgrade
{
bool is_upgrade = 1;
string protocol = 2;
}
}
??HttpMsg的編解碼實現見Nebula框架的[https://github.com/Bwar/Nebula/blob/master/src/codec/CodecHttp.cpp](https://github.com/Bwar/Nebula/blob/master/src/codec/CodecHttp.cpp)。
### 3. 數據庫代理服務協議設計
??如果上面描述的protobuf在網絡通信上應用算不錯的話,那以下將protobuf用於數據代理上的協議設計則絕對是讓人眼前一亮。
??有的公司規定web服務不得直接訪問MySQL數據庫,甚至不允許在web邏輯層拼接SQL語句。如果有這種出於安全性考慮而做的限制,在web邏輯層後面再增加一層業務邏輯層成本未免太高了,那麽解決辦法應該是增加一層業務邏輯無關的代理服務層。這個代理服務層不是簡單的轉發SQL語句這麽簡單,因為web邏輯層可能不允許拼接SQL,由此引出我們這個用於數據庫代理的protobuf協議設計。這個協議是將SQL邏輯融入整個協議之中,數據庫代理層接收並解析這個協議後生成SQL語句或用binding方式到數據庫去執行。數據庫代理層只有協議解析和轉化邏輯,無其他任何業務邏輯,業務邏輯還在web邏輯層,區別只在於從拼接SQL變成了填充protobuf協議。
```C++
syntax = "proto2";
package dbagent;
/**
* @brief DB Agent消息
*/
message DbAgentMsg
{
enum E_TYPE
{
UNDEFINE = 0; ///< 未定義
REQ_CONNECT = 1; ///< 連接DB請求
RSP_CONNECT = 2; ///< 連接DB響應
REQ_QUERY = 3; ///< 執行SQL請求
RSP_QUERY = 4; ///< 執行SQL響應
REQ_DISCONNECT = 5; ///< 關閉連接請求
RSP_DISCONNECT = 6; ///< 關閉連接響應
RSP_RECORD = 7; ///< 結果集記錄
RSP_COMMON = 8; ///< 通用響應(當請求不能被Server所認知時會做出這個回應)
REQ_GET_CONNECT = 9; ///< 獲取連接請求
RSP_GET_CONNECT = 10; ///< 獲取連接響應
}
required E_TYPE type = 1; ///< 消息/操作 類型
optional RequestConnect req_connect = 2; ///< 連接請求
optional ResponseConnect rsp_connect = 3; ///< 連接響應
optional RequestDisconnect req_disconnect = 4; ///< 關閉請求
optional ResponseDisconnect rsp_disconnect = 5; ///< 關閉響應
optional RequestQuery req_query = 6; ///< 執行SQL請求
optional ResponseQuery rsp_query = 7; ///< 執行SQL響應
optional ResponseRecord rsp_record = 8; ///< SELECT結果集記錄
optional ResponseCommon rsp_common = 9; ///< 通用響應
optional RequestGetConnection req_get_conn = 10; ///< 獲取連接請求
optional ResponseGetConnection rsp_get_conn = 11; ///< 獲取連接響應
}
/**
* @brief 連接請求
*/
message RequestConnect
{
required string host = 1; ///< DB所在服務器IP
required int32 port = 2; ///< DB端口
required string user = 3; ///< DB用戶名
required string password = 4; ///< DB用戶密碼
required string dbname = 5; ///< DB庫名
required string charset = 6; ///< DB字符集
}
/**
* @brief 連接響應
*/
message ResponseConnect
{
required int32 connect_id = 1; ///< 連接ID (連接失敗時,connect_id為0)
optional int32 err_no = 2; ///< 錯誤碼 0 表示連接成功
optional string err_msg = 3; ///< 錯誤信息
}
/**
* @brief 關閉連接請求
*/
message RequestDisconnect
{
required int32 connect_id = 1; ///< 連接ID (連接失敗時,connect_id為0)
}
/**
* @brief 關閉連接響應
*/
message ResponseDisconnect
{
optional int32 err_no = 2; ///< 錯誤碼 0 表示連接成功
optional string err_msg = 3; ///< 錯誤信息
}
/**
* @brief 執行SQL請求
*/
message RequestQuery
{
required E_QUERY_TYPE query_type = 1; ///< 查詢類型
required string table_name = 2; ///< 表名
repeated Field fields = 3; ///< 列類型
repeated ConditionGroup conditions= 4; ///< where條件組(由group_relation指定,若不指定則默認為AND關系)
repeated string groupby_col = 5; ///< group by字段
repeated OrderBy orderby_col = 6; ///< order by字段
optional uint32 limit = 7; ///< 指定返回的行數的最大值 (limit 200)
optional uint32 limit_from = 8; ///< 指定返回的第一行的偏移量 (limit 100, 200)
optional ConditionGroup.E_RELATION group_relation = 9; ///< where條件組的關系,條件組之間有且只有一種關系(and或者or)
optional int32 connect_id = 10; ///< 連接ID,有效連接ID(長連接,當connect後多次執行query可以使用connect_id)
optional string bid = 11; ///< 業務ID,在CmdDbAgent.json配置文件中配置(短連接,每次執行query時連接,執行完後關閉連接)
optional string password = 12; ///< 業務密碼
enum E_QUERY_TYPE ///< 查詢類型
{
SELECT = 0; ///< select查詢
INSERT = 1; ///< insert插入
INSERT_IGNORE = 2; ///< insert ignore插入,若存在則放棄
UPDATE = 3; ///< update更新
REPLACE = 4; ///< replace覆蓋插入
DELETE = 5; ///< delete刪除
}
enum E_COL_TYPE ///< 列類型
{
STRING = 0; ///< char, varchar, text, datetime, timestamp等
INT = 1; ///< int
BIGINT = 2; ///< bigint
FLOAT = 3; ///< float
DOUBLE = 4; ///< double
}
message Field ///< 字段
{
required string col_name = 1; ///< 列名
required E_COL_TYPE col_type = 2; ///< 列類型
required bytes col_value = 3; ///< 列值
optional string col_as = 4; ///< as列名
}
message Condition ///< where條件
{
required E_RELATION relation = 1; ///< 關系(=, !=, >, <, >=, <= 等)
required E_COL_TYPE col_type = 2; ///< 列類型
required string col_name = 3; ///< 列名
repeated bytes col_values = 4; ///< 列值(當且僅當relation為IN時值的個數大於1有效)
optional string col_name_right= 5; ///< 關系右邊列名(用於where col1=col2這種情況)
enum E_RELATION
{
EQ = 0; ///< 等於=
NE = 1; ///< 不等於!=
GT = 2; ///< 大於>
LT = 3; ///< 小於<
GE = 4; ///< 大於等於>=
LE = 5; ///< 小於等於<=
LIKE = 6; ///< like
IN = 7; ///< in (1, 2, 3, 4, 5)
}
}
message ConditionGroup ///< where條件組合
{
required E_RELATION relation = 1; ///< 條件之間的關系,一個ConditionGroup裏的所有Condition之間有且只有一種關系(and或者or)
repeated Condition condition = 2; ///< 條件
enum E_RELATION
{
AND = 0; ///< and且
OR = 1; ///< or或
}
}
message OrderBy
{
required E_RELATION relation = 1; ///< 降序或升序
required string col_name = 2; ///< 列名
enum E_RELATION
{
ASC = 0;
DESC = 1;
}
}
}
/**
* @brief 執行SQL響應
*/
message ResponseQuery
{
required uint32 seq = 1; ///< 數據包序列號(SELECT結果集會分包返回,只有一個包的情況或已到達最後一個包則seq=0xFFFFFFFF)
required int32 err_no = 2; ///< 錯誤碼,0 表示執行成功
optional string err_msg = 3; ///< 錯誤信息
optional uint64 insert_id = 4; ///< mysql_insert_id()獲取的值(視執行的SQL語句而定,不一定存在)
repeated bytes dict = 5; ///< 結果集字典(視執行的SQL語句而定,不一定存在)
}
/**
* @brief SELECT語句返回結果集的一條記錄
*/
message ResponseRecord
{
required uint32 seq = 1; ///< 數據包序列號(SELECT結果集會分包返回,已到達最後一個包則seq=0xFFFFFFFF)
repeated bytes field = 2; ///< 數據集記錄的字段
}
/**
* @brief 常規響應
*/
message ResponseCommon
{
optional int32 err_no = 1; ///< 錯誤碼 0 表示連接成功
optional string err_msg = 2; ///< 錯誤信息
}
/**
* @brief 獲取連接請求
*/
message RequestGetConnection
{
required string bid = 1; ///< 業務ID,在dbproxy配置文件中配置
required string password = 2; ///< 業務密碼
}
/**
* @brief 獲取連接響應
*/
message ResponseGetConnection
{
required int32 connect_id = 1; ///< 連接ID,有效連接ID,否則執行失敗
optional int32 err_no = 2; ///< 錯誤碼 0 表示連接成功
optional string err_msg = 3; ///< 錯誤信息
}
??基於這個數據庫操作協議開發的數據庫代理層完全解決了web邏輯層不允許直接訪問數據庫也不允許拼接SQL語句的問題,而且幾乎沒有增加開發代價。另外,基於這個協議的數據庫代理天然防止SQL註入(在代理層校驗field_name,並且mysql_escape_string(filed_value)),雖然防SQL註入應是應用層的責任,但多了數據代理這層保障也是好事。
??這個協議只支持簡單SQL,不支持聯合查詢、子查詢,也不支持存儲過程,如果需要支持的話協議會更復雜。在Bwar所負責過的業務裏,基本都禁止數據庫聯合查詢之類,只把數據庫當存儲用,不把邏輯寫到SQL語句裏,所以這個協議滿足大部分業務需要。
??這一節只說明數據庫代理協議,下一節將從數據庫代理協議延伸並提供協議代碼講解。
4. Redis和MySQL數據代理協議設計
??大部分後臺應用只有MySQL是不夠的,往往還需要緩存,經常會用Redis來做數據緩存。用緩存意味著數據至少需要同時寫到Redis和MySQL,又或者在未命中緩存時從MySQL中讀取到的數據需要回寫到Redis,這些通常都是由業務邏輯層來做的。也有例外,Nebula提供的分布式解決方案是由數據代理層來做的,業務邏輯層只需向數據代理層發送一個protobuf協議數據,數據代理層就會完成Redis和MySQL雙寫或緩存未命中時的自動回寫(暫且不探討數據一致性問題)。數據代理層來做這些工作是為了減少業務邏輯層的復雜度,提高開發效率。既然是為了提高開發效率,就得讓業務邏輯層低於原來同時操作Redis和MySQL的開發量。Nebula提供的NebulaMydis就是這樣一個讓原來同時操作Redis和MySQL的開發量(假設是2)降到1.2左右。
??這個同時操作Redis和MySQL的數據代理協議如下:
```C++
syntax = "proto3";
package neb;
message Mydis
{
uint32 section_factor = 1;
RedisOperate redis_operate = 2;
DbOperate db_operate = 3;
message RedisOperate
{
bytes key_name = 1;
string redis_cmd_read = 2;
string redis_cmd_write = 3;
OPERATE_TYPE op_type = 4;
repeated Field fields = 5;
int32 key_ttl = 6;
int32 redis_structure = 7; ///< redis數據類型
int32 data_purpose = 8; ///< 數據用途
bytes hash_key = 9; ///< 可選hash key,當has_hash_key()時用hash_key來計算hash值,否則用key_name來計算hash值
enum OPERATE_TYPE
{
T_READ = 0;
T_WRITE = 1;
}
}
message DbOperate
{
E_QUERY_TYPE query_type = 1; ///< 查詢類型
string table_name = 2; ///< 表名
repeated Field fields = 3; ///< 列類型
repeated ConditionGroup conditions = 4; ///< where條件組(由group_relation指定,若不指定則默認為AND關系)
repeated string groupby_col = 5; ///< group by字段
repeated OrderBy orderby_col = 6; ///< order by字段
ConditionGroup.E_RELATION group_relation = 7; ///< where條件組的關系,條件組之間有且只有一種關系(and或者or)
uint32 limit = 8; ///< 指定返回的行數的最大值 (limit 200)
uint32 limit_from = 9; ///< 指定返回的第一行的偏移量 (limit 100, 200)
uint32 mod_factor = 10; ///< 分表取模因子,當這個字段沒有時使用section_factor
enum E_QUERY_TYPE ///< 查詢類型
{
SELECT = 0; ///< select查詢
INSERT = 1; ///< insert插入
INSERT_IGNORE = 2; ///< insert ignore插入,若存在則放棄
UPDATE = 3; ///< update更新
REPLACE = 4; ///< replace覆蓋插入
DELETE = 5; ///< delete刪除
}
message Condition ///< where條件
{
E_RELATION relation = 1; ///< 關系(=, !=, >, <, >=, <= 等)
E_COL_TYPE col_type = 2; ///< 列類型
string col_name = 3; ///< 列名
repeated bytes col_values = 4; ///< 列值(當且僅當relation為IN時值的個數大於1有效)
string col_name_right = 5; ///< 關系右邊列名(用於where col1=col2這種情況)
enum E_RELATION
{
EQ = 0; ///< 等於=
NE = 1; ///< 不等於!=
GT = 2; ///< 大於>
LT = 3; ///< 小於<
GE = 4; ///< 大於等於>=
LE = 5; ///< 小於等於<=
LIKE = 6; ///< like
IN = 7; ///< in (1, 2, 3, 4, 5)
}
}
message ConditionGroup ///< where條件組合
{
E_RELATION relation = 1; ///< 條件之間的關系,一個ConditionGroup裏的所有Condition之間有且只有一種關系(and或者or)
repeated Condition condition = 2; ///< 條件
enum E_RELATION
{
AND = 0; ///< and且
OR = 1; ///< or或
}
}
message OrderBy
{
E_RELATION relation = 1; ///< 降序或升序
string col_name = 2; ///< 列名
enum E_RELATION
{
ASC = 0;
DESC = 1;
}
}
}
}
enum E_COL_TYPE ///< 列類型
{
STRING = 0; ///< char, varchar, text, datetime, timestamp等
INT = 1; ///< int
BIGINT = 2; ///< bigint
FLOAT = 3; ///< float
DOUBLE = 4; ///< double
}
message Record
{
repeated Field field_info = 1; ///< value data
}
message Field ///< 字段
{
string col_name = 1; ///< 列名
E_COL_TYPE col_type = 2; ///< 列類型
bytes col_value = 3; ///< 列值
string col_as = 4; ///< as列名
}
/**
- @brief 查詢結果
- @note 適用於Redis返回和MySQL返回,當totalcount與curcount相等時表明數據已接收完畢,
- 否則表示數據尚未接收完,剩余的數據會在後續數據包繼續返回。
*/
message Result
{
int32 err_no = 1;
bytes err_msg = 2;
int32 total_count = 3;
int32 current_count = 4;
repeated Record record_data = 5;
int32 from = 6; ///< 數據來源 E_RESULT_FROM
DataLocate locate = 7; ///< 僅在DataProxy使用
enum E_RESULT_FROM
{
FROM_DB = 0;
FROM_REDIS = 1;
}
message DataLocate
{
uint32 section_from = 1;
uint32 section_to = 2; ///< 數據所在分段,section_from < MemOperate.section_factor <= section_to
uint32 hash = 3; ///< 用於做分布的hash值(取模運算時,為取模後的結果)
uint32 divisor = 4; ///< 取模運算的除數(一致性hash時不需要)
}
}
??這個協議分了Redis和MySQL兩部分數據,看似業務邏輯層把一份數據填充了兩份並沒有降低多少開發量,實際上這兩部分數據有許多是可共用的,只要提供一個填充類就可以大幅降低協議填充開發量。為簡化協議填充,Nebula提供了幾個類:同時填充Redis和MySQL數據、只填充Redis、只填充MySQL。
??從Mydis協議的MySQL部分如何生成SQL語句請參考NebulaDbAgent,核心代碼頭文件如下:
```C++
namespace dbagent
{
const int gc_iMaxBeatTimeInterval = 30;
const int gc_iMaxColValueSize = 65535;
struct tagConnection
{
CMysqlDbi* pDbi;
time_t ullBeatTime;
int iQueryPermit;
int iTimeout;
tagConnection() : pDbi(NULL), ullBeatTime(0), iQueryPermit(0), iTimeout(0)
{
}
~tagConnection()
{
if (pDbi != NULL)
{
delete pDbi;
pDbi = NULL;
}
}
};
class CmdExecSql : public neb::Cmd, public neb::DynamicCreator<CmdExecSql, int32>
{
public:
CmdExecSql(int32 iCmd);
virtual ~CmdExecSql();
virtual bool Init();
virtual bool AnyMessage(
std::shared_ptr<neb::SocketChannel> pChannel,
const MsgHead& oMsgHead,
const MsgBody& oMsgBody);
protected:
bool GetDbConnection(const neb::Mydis& oQuery, CMysqlDbi ppMasterDbi, CMysqlDbi ppSlaveDbi);
bool FetchOrEstablishConnection(neb::Mydis::DbOperate::E_QUERY_TYPE eQueryType,
const std::string& strMasterIdentify, const std::string& strSlaveIdentify,
const neb::CJsonObject& oInstanceConf, CMysqlDbi ppMasterDbi, CMysqlDbi ppSlaveDbi);
std::string GetFullTableName(const std::string& strTableName, uint32 uiFactor);
int ConnectDb(const neb::CJsonObject& oInstanceConf, CMysqlDbi* pDbi, bool bIsMaster = true);
int Query(const neb::Mydis& oQuery, CMysqlDbi* pDbi);
void CheckConnection(); //檢查連接是否已超時
void Response(int iErrno, const std::string& strErrMsg);
bool Response(const neb::Result& oRsp);
bool CreateSql(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateSelect(const neb::Mydis& oQuery, std::string& strSql);
bool CreateInsert(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateUpdate(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateDelete(const neb::Mydis& oQuery, std::string& strSql);
bool CreateCondition(const neb::Mydis::DbOperate::Condition& oCondition, CMysqlDbi* pDbi, std::string& strCondition);
bool CreateConditionGroup(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strCondition);
bool CreateGroupBy(const neb::Mydis& oQuery, std::string& strGroupBy);
bool CreateOrderBy(const neb::Mydis& oQuery, std::string& strOrderBy);
bool CreateLimit(const neb::Mydis& oQuery, std::string& strLimit);
bool CheckColName(const std::string& strColName);
private:
std::shared_ptr<neb::SocketChannel> m_pChannel;
MsgHead m_oInMsgHead;
MsgBody m_oInMsgBody;
int m_iConnectionTimeout; //空閑連接超時(單位秒)
char m_szColValue; //字段值
neb::CJsonObject m_oDbConf;
uint32 m_uiSectionFrom;
uint32 m_uiSectionTo;
uint32 m_uiHash;
uint32 m_uiDivisor;
std::map<std::string, std::set<uint32> > m_mapFactorSection; //分段因子區間配置,key為因子類型
std::map<std::string, neb::CJsonObject> m_mapDbInstanceInfo; //數據庫配置信息key為("%u:%u:%u", uiDataType, uiFactor, uiFactorSection)
std::map<std::string, tagConnection*> m_mapDbiPool; //數據庫連接池,key為identify(如:192.168.18.22:3306)
};
} // namespace dbagent
??整個mydis數據協議是如何解析如何使用,如何做Redis和MySQL的數據雙寫、緩存數據回寫等不在本文討論範圍,如有興趣可以閱讀[NebulaMydis](https://github.com/Bwar/NebulaMydis)源碼,也可以聯系Bwar。
### 5. 結語
??Protobuf用得合適用得好可以解決許多問題,可以提高開發效率,也可以提高運行效率,以上就是Bwar多年應用protobuf的小結,沒有任何藏私,文中列出的協議都可以在開源項目[Nebula](https://github.com/Bwar/Nebula)的這個路徑[https://github.com/Bwar/Nebula/tree/master/proto](https://github.com/Bwar/Nebula/tree/master/proto)找到。
??開發Nebula框架目的是致力於提供一種基於C\+\+快速構建高性能的分布式服務。如果覺得本文對你有用,別忘了到Nebula的[__Github__](https://github.com/Bwar/Nebula)或[__碼雲__](https://gitee.com/Bwar/Nebula)給個star,謝謝。
Protobuf協議精品應用