分布式環境中,模塊數據交互協議分析 (百度brpc)
阿新 • • 發佈:2018-08-18
sign short 自己的 source 變量 git character 消息 sock
1. 背景
之前聽到同事說,要為自己的模塊考慮寫個數據協議。今天有空想了一下。寫出來,方便後續使用。
開源代碼brpc
中可以支持多種協議,nshead、redis、mongo等20多種協議。
2. 什麽是數據交互協議?
這裏說的協議,不是tcp/ip這些網絡協議。
在分布式環境中,我們需要將模塊的數據通過網絡bit流傳給上、下遊模塊,就會涉及到數據完整性
、正確性
校驗。
為了能夠校驗數據,就需要定義數據交換協議。
3. 代碼brpc中的實現
每種協議類型,都需要實現自己的parser類,進行消息的驗證。
3.1 bprc 中nshead協議的校驗
nshead_t 結構體
static const unsigned int NSHEAD_MAGICNUM = 0xfb709394; //特殊數字 struct nshead_t { unsigned short id; unsigned short version; unsigned int log_id; char provider[16]; unsigned int magic_num; unsigned int reserved; unsigned int body_len; //實際傳輸的包體長度 };
校驗過程:magic_num是否正確,是否包體超長,是否包體收到數據不足等。
ParseResult ParseNsheadMessage(butil::IOBuf* source, Socket*, bool /*read_eof*/, const void* /*arg*/) { char header_buf[sizeof(nshead_t)]; const size_t n = source->copy_to(header_buf, sizeof(header_buf)); if (n < offsetof(nshead_t, magic_num) + 4) { return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); } const void* dummy = header_buf + offsetof(nshead_t, magic_num); const unsigned int magic_num = *(unsigned int*)dummy; if (magic_num != NSHEAD_MAGICNUM) { RPC_VLOG << "magic_num=" << magic_num << " doesn't match NSHEAD_MAGICNUM=" << NSHEAD_MAGICNUM; return MakeParseError(PARSE_ERROR_TRY_OTHERS); } if (n < sizeof(nshead_t)) { return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); } const nshead_t* nshead = (const nshead_t *)header_buf; uint32_t body_len = nshead->body_len; if (body_len > FLAGS_max_body_size) { return MakeParseError(PARSE_ERROR_TOO_BIG_DATA); } else if (source->length() < sizeof(header_buf) + body_len) { return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); } policy::MostCommonMessage* msg = policy::MostCommonMessage::Get(); source->cutn(&msg->meta, sizeof(header_buf)); source->cutn(&msg->payload, body_len); return MakeMessage(msg); }
3.2 bprc 中redis協議的校驗
先看看redis中的協議,比如下面主從復制時需要的select db的情形。表示有2行(*2),第一行len:6, vak:SELECT, 第二行len:2, val:10
*2\r\n
$6\r\n
SELECT\r\n
$2\r\n
10\r\n
校驗過程:字符串處理,switch ...case...
bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) { // Notice that all branches returning false must not change `buf'. const char* pfc = (const char*)buf.fetch1(); if (pfc == NULL) { return false; } const char fc = *pfc; // first character switch (fc) { case '-': // Error "-<message>\r\n" case '+': { // Simple String "+<string>\r\n" ......
4. 如果要自己實現一種協議
可以學習上面的兩種情況:
(1) nshead 使用特殊magic數組, bodylen,body
(2) redis 使用val_len, val
這也是
通用的套路
,len + value限定了一個變量。
當然可以加一些crc校驗和,等其他條件。
5. 參考:
brpc new_protocol.md
分布式環境中,模塊數據交互協議分析 (百度brpc)