1. 程式人生 > 其它 >Brpc學習:簡單回顯伺服器/客戶端

Brpc學習:簡單回顯伺服器/客戶端

sudo apt-get install git g++ make libssl-dev
sudo apt-get install realpath libgflags-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev
sudo apt-get install libsnappy-dev

sudo apt-get install gperf
sudo apt-get install libgoogle-perftools-dev

git clone https://github.com/brpc/brpc.git
cd ./brpc/
sh config_brpc.sh --headers=/usr/include --libs=/usr/lib --nodebugsymbols
make

cd example/echo_c++
make
./echo_server & ./echo_client

sudo apt-get install libgtest-dev
cd /usr/src/gtest
sudo cmake .
sudo make
sudo mv libgtest* /usr/lib/
cd ./brpc/
sh config_brpc.sh --headers=/usr/include --libs=/usr/lib
cd test/
make
sh run_tests.sh

/********** Makefile **********/

//config.mk
# Generated by config_brpc.sh, don't modify manually
HDRS=/usr/include/
LIBS=/usr/lib/x86_64-linux-gnu
PROTOC=/usr/bin/protoc
PROTOBUF_HDR=/usr/include/
CC=gcc
CXX=g++
GCC_VERSION=50400
STATIC_LINKINGS= -lgflags -lprotobuf -lleveldb -lsnappy
DYNAMIC_LINKINGS=-lpthread -lrt -lssl -lcrypto -ldl -lz
ifeq ($(NEED_LIBPROTOC), 1)
    STATIC_LINKINGS+=-lprotoc
endif
ifeq ($(NEED_GPERFTOOLS), 1)
    LIBS+=/usr/lib
    DYNAMIC_LINKINGS+=-ltcmalloc_and_profiler
endif
CPPFLAGS+=-DBRPC_WITH_GLOG=0 -DGFLAGS_NS=google 
ifeq ($(NEED_GTEST), 1)
    LIBS+=/usr/lib
    STATIC_LINKINGS+=-lgtest
    STATIC_LINKINGS+=-lgtest_main
endif

//Makefile
BRPC_PATH = ../../
include $(BRPC_PATH)/config.mk
# Notes on the flags:
# 1. Added -fno-omit-frame-pointer: perf/tcmalloc-profiler use frame pointers by default
# 2. Added -D__const__= : Avoid over-optimizations of TLS variables by GCC>=4.8
CXXFLAGS+=$(CPPFLAGS) -std=c++0x -DNDEBUG -O2 -D__const__= -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer
HDRS+=$(BRPC_PATH)/output/include
LIBS+=$(BRPC_PATH)/output/lib
HDRPATHS = $(addprefix -I, $(HDRS))
LIBPATHS = $(addprefix -L, $(LIBS))
COMMA=,
SOPATHS=$(addprefix -Wl$(COMMA)-rpath=, $(LIBS))

STATIC_LINKINGS += -lbrpc

CLIENT_SOURCES = client.cpp
SERVER_SOURCES = server.cpp
PROTOS = $(wildcard *.proto)

PROTO_OBJS = $(PROTOS:.proto=.pb.o)
PROTO_GENS = $(PROTOS:.proto=.pb.h) $(PROTOS:.proto=.pb.cc)
CLIENT_OBJS = $(addsuffix .o, $(basename $(CLIENT_SOURCES))) 
SERVER_OBJS = $(addsuffix .o, $(basename $(SERVER_SOURCES))) 

.PHONY:all
all: echo_client echo_server

.PHONY:clean
clean:
	@echo "Cleaning"
	@rm -rf echo_client echo_server $(PROTO_GENS) $(PROTO_OBJS) $(CLIENT_OBJS) $(SERVER_OBJS)

echo_client:$(PROTO_OBJS) $(CLIENT_OBJS)
	@echo "Linking $@"
ifneq ("$(LINK_SO)", "")
	@$(CXX) $(LIBPATHS) $(SOPATHS) -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) -o $@
	#@$(CXX) $(LIBPATHS) $(SOPATHS) -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) -o $@
else
	@$(CXX) $(LIBPATHS) -Xlinker "-(" $^ -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS) -o $@
	#@$(CXX) $(LIBPATHS) -Xlinker "-(" $^ -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS) -o $@
endif

echo_server:$(PROTO_OBJS) $(SERVER_OBJS)
	@echo "Linking $@"
ifneq ("$(LINK_SO)", "")
	@$(CXX) $(LIBPATHS) $(SOPATHS) -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) -o $@
	#@$(CXX) $(LIBPATHS) $(SOPATHS) -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) -o $@
else
	@$(CXX) $(LIBPATHS) -Xlinker "-(" $^ -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS) -o $@
	#@$(CXX) $(LIBPATHS) -Xlinker "-(" $^ -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS) -o $@
endif

%.pb.cc %.pb.h:%.proto
	@echo "Generating $@"
	@$(PROTOC) --cpp_out=. --proto_path=. $(PROTOC_EXTRA_ARGS) $<

%.o:%.cpp
	@echo "Compiling $@"
	@$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@

%.o:%.cc
	@echo "Compiling $@"
	@$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@

/********** server **********/

填寫proto檔案 https://www.cnblogs.com/yinheyi/p/6080244.html

請求、回覆、服務的介面均定義在proto檔案中。

# 告訴protoc要生成C++ Service基類,如果是java或python,則應分別修改為java_generic_services和py_generic_services
option cc_generic_services = true;
 
message EchoRequest {
      required string message = 1;
};
message EchoResponse {
      required string message = 1;
};
 
service EchoService {
      rpc Echo(EchoRequest) returns (EchoResponse);
};br

protobuf的更多用法請閱讀protobuf官方文件。
實現生成的Service介面

protoc執行後會生成echo.pb.cc和echo.pb.h檔案,你得include echo.pb.h,實現其中的EchoService基類:

#include "echo.pb.h"
...
class MyEchoService : public EchoService  {
public:
    void Echo(::google::protobuf::RpcController* cntl_base,
              const ::example::EchoRequest* request,
              ::example::EchoResponse* response,
              ::google::protobuf::Closure* done) {
        // 這個物件確保在return時自動呼叫done->Run()
        brpc::ClosureGuard done_guard(done);
         
        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
 
        // 填寫response
        response->set_message(request->message());
    }
};

Service在插入brpc.Server後才可能提供服務。

當客戶端發來請求時,Echo()會被呼叫。引數的含義分別是:

controller

在brpc中可以靜態轉為brpc::Controller(前提是程式碼執行brpc.Server中),包含了所有request和response之外的引數集合,具體介面查閱controller.h

request

請求,只讀的,來自client端的資料包。

response

回覆。需要使用者填充,如果存在required欄位沒有被設定,該次呼叫會失敗。

done

done由框架建立,遞給服務回撥,包含了呼叫服務回撥後的後續動作,包括檢查response正確性,序列化,打包,傳送等邏輯。

不管成功失敗,done->Run()必須在請求處理完成後被使用者呼叫一次。

為什麼框架不自己呼叫done->Run()?這是為了允許使用者把done儲存下來,在服務回撥之後的某事件發生時再呼叫,即實現非同步Service。

強烈建議使用ClosureGuard確保done->Run()被呼叫,即在服務回撥開頭的那句:

brpc::ClosureGuard done_guard(done);

不管在中間還是末尾脫離服務回撥,都會使done_guard析構,其中會呼叫done->Run()。這個機制稱為RAII(Resource Acquisition Is Initialization也稱為“資源獲取就是初始化”)。沒有這個的話你得在每次return前都加上done->Run(),極易忘記。

在非同步Service中,退出服務回撥時請求未處理完成,done->Run()不應被呼叫,done應被儲存下來供以後呼叫,乍看起來,這裡並不需要用ClosureGuard。但在實踐中,非同步Service照樣會因各種原因跳出回撥,如果不使用ClosureGuard,一些分支很可能會在return前忘記done->Run(),所以我們也建議在非同步service中使用done_guard,與同步Service不同的是,為了避免正常脫離函式時done->Run()也被呼叫,你可以呼叫done_guard.release()來釋放其中的done。

一般來說,同步Service和非同步Service分別按如下程式碼處理done:

class MyFooService: public FooService  {
public:
    // 同步服務
    void SyncFoo(::google::protobuf::RpcController* cntl_base,
                 const ::example::EchoRequest* request,
                 ::example::EchoResponse* response,
                 ::google::protobuf::Closure* done) {
         brpc::ClosureGuard done_guard(done);
         ...
    }
 
    // 非同步服務
    void AsyncFoo(::google::protobuf::RpcController* cntl_base,
                  const ::example::EchoRequest* request,
                  ::example::EchoResponse* response,
                  ::google::protobuf::Closure* done) {
         brpc::ClosureGuard done_guard(done);
         ...
         done_guard.release();
    }
};

ClosureGuard的介面如下:

// RAII: Call Run() of the closure on destruction.
class ClosureGuard {
public:
    ClosureGuard();
    // Constructed with a closure which will be Run() inside dtor.
    explicit ClosureGuard(google::protobuf::Closure* done);
    
    // Call Run() of internal closure if it's not NULL.
    ~ClosureGuard();
 
    // Call Run() of internal closure if it's not NULL and set it to `done'.
    void reset(google::protobuf::Closure* done);
 
    // Set internal closure to NULL and return the one before set.
    google::protobuf::Closure* release();
};

標記當前呼叫為失敗

呼叫Controller.SetFailed()可以把當前呼叫設定為失敗,當傳送過程出現錯誤時,框架也會呼叫這個函式。使用者一般是在服務的CallMethod裡呼叫這個函式,比如某個處理環節出錯,SetFailed()後確認done->Run()被呼叫了就可以跳出函數了(若使用了ClosureGuard,跳出函式時會自動呼叫done,不用手動)。Server端的done的邏輯主要是傳送response回client,當其發現使用者呼叫了SetFailed()後,會把錯誤資訊送回client。client收到後,它的Controller::Failed()會為true(成功時為false),Controller::ErrorCode()和Controller::ErrorText()則分別是錯誤碼和錯誤資訊。

使用者可以為http訪問設定status-code,在server端一般是呼叫controller.http_response().set_status_code(),標準的status-code定義在http_status_code.h中。如果SetFailed了但沒有設定status-code,框架會代為選擇和錯誤碼最接近的status-code,實在沒有相關的則填brpc::HTTP_STATUS_INTERNAL_SERVER_ERROR(500錯誤)
獲取Client的地址

controller->remote_side()可獲得傳送該請求的client地址和埠,型別是butil::EndPoint。如果client是nginx,remote_side()是nginx的地址。要獲取真實client的地址,可以在nginx裡設定proxy_header ClientIp $remote_addr;, 在rpc中通過controller->http_request().GetHeader("ClientIp")獲得對應的值。

列印方式:

LOG(INFO) << "remote_side=" << cntl->remote_side(); 
printf("remote_side=%sn", butil::endpoint2str(cntl->remote_side()).c_str());

獲取Server的地址

controller->local_side()獲得server端的地址,型別是butil::EndPoint。

列印方式:

LOG(INFO) << "local_side=" << cntl->local_side(); 
printf("local_side=%sn", butil::endpoint2str(cntl->local_side()).c_str());

非同步Service

即done->Run()在Service回撥之外被呼叫。

有些server以等待後端服務返回結果為主,且處理時間特別長,為了及時地釋放出執行緒資源,更好的辦法是把done註冊到被等待事件的回撥中,等到事件發生後再呼叫done->Run()。

非同步service的最後一行一般是done_guard.release()以確保正常退出CallMethod時不會呼叫done->Run()。例子請看example/session_data_and_thread_local。

Service和Channel都可以使用done來表達後續的操作,但它們是完全不同的,請勿混淆:

    Service的done由框架建立,使用者處理請求後呼叫done把response發回給client。
    Channel的done由使用者建立,待RPC結束後被框架呼叫以執行使用者的後續程式碼。

在一個會訪問下游服務的非同步服務中會同時接觸兩者,容易搞混,請注意區分。
加入Service

預設構造後的Server不包含任何服務,也不會對外提供服務,僅僅是一個物件。

通過如下方法插入你的Service例項。

int AddService(google::protobuf::Service* service, ServiceOwnership ownership);

若ownership引數為SERVER_OWNS_SERVICE,Server在析構時會一併刪除Service,否則應設為SERVER_DOESNT_OWN_SERVICE。

插入MyEchoService程式碼如下:

brpc::Server server;
MyEchoService my_echo_service;
if (server.AddService(&my_echo_service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
    LOG(FATAL) << "Fail to add my_echo_service";
    return -1;
}

Server啟動後你無法再修改其中的Service。
啟動

呼叫以下Server的一下介面啟動服務。

int Start(const char* ip_and_port_str, const ServerOptions* opt);
int Start(EndPoint ip_and_port, const ServerOptions* opt);
int Start(int port, const ServerOptions* opt);
int Start(const char *ip_str, PortRange port_range, const ServerOptions *opt);  // r32009後增加

"localhost:9000", "cq01-cos-dev00.cq01:8000", “127.0.0.1:7000"都是合法的ip_and_port_str。

options為NULL時所有引數取預設值,如果你要使用非預設值,這麼做就行了:

brpc::ServerOptions options;  // 包含了預設值
options.xxx = yyy;
...
server.Start(..., &options);

監聽多個埠

一個server只能監聽一個埠(不考慮ServerOptions.internal_port),需要監聽N個埠就起N個Server。
停止

server.Stop(closewait_ms); // closewait_ms實際無效,出於歷史原因未刪
server.Join();

Stop()不會阻塞,Join()會。分成兩個函式的原因在於當多個Server需要退出時,可以先全部Stop再一起Join,如果一個個Stop/Join,可能得花費Server個數倍的等待時間。

不管closewait_ms是什麼值,server在退出時會等待所有正在被處理的請求完成,同時對新請求立刻回覆ELOGOFF錯誤以防止新請求加入。這麼做的原因在於只要server退出時仍有處理執行緒執行,就有訪問到已釋放記憶體的風險。如果你的server“退不掉”,很有可能是由於某個檢索執行緒沒結束或忘記呼叫done了。

當client看到ELOGOFF時,會跳過對應的server,並在其他server上重試對應的請求。所以在一般情況下brpc總是“優雅退出”的,重啟或上線時幾乎不會或只會丟失很少量的流量。

RunUntilAskedToQuit()函式可以在大部分情況下簡化server的運轉和停止程式碼。在server.Start後,只需如下程式碼即會讓server執行直到按到Ctrl-C。

// Wait until Ctrl-C is pressed, then Stop() and Join() the server.
server.RunUntilAskedToQuit();
 
// server已經停止了,這裡可以寫釋放資源的程式碼。

Join()完成後可以修改其中的Service,並重新Start。
被HTTP client訪問

使用Protobuf的服務通常可以通過HTTP+json訪問,存於http body的json串可與對應protobuf訊息相互轉化。以echo server為例,你可以用curl訪問這個服務。

# -H 'Content-Type: application/json' is optional
$ curl -d '{"message":"hello"}' http://brpc.baidu.com:8765/EchoService/Echo
{"message":"hello"}

注意:也可以指定Content-Type: application/proto用http+protobuf二進位制串訪問服務,序列化效能更好。
json<=>pb

json欄位通過匹配的名字和結構與pb欄位一一對應。json中一定要包含pb的required欄位,否則轉化會失敗,對應請求會被拒絕。json中可以包含pb中沒有定義的欄位,但它們會被丟棄而不會存入pb的unknown欄位。轉化規則詳見json <=> protobuf。

開啟選項-pb_enum_as_number後,pb中的enum會轉化為它的數值而不是名字,比如在enum MyEnum { Foo = 1; Bar = 2; };中不開啟此選項時MyEnum型別的欄位會轉化為"Foo"或"Bar",開啟後為1或2。此選項同時影響client發出的請求和server返回的回覆。由於轉化為名字相比數值有更好的前後相容性,此選項只應用於相容無法處理enum為名字的老程式碼。
相容早期版本client

早期的brpc允許一個pb service被http協議訪問時不設定pb請求,即使裡面有required欄位。一般來說這種service會自行解析http請求和設定http回覆,並不會訪問pb請求。但這也是非常危險的行為,畢竟這是pb service,但pb請求卻是未定義的。

這種服務在升級到新版本rpc時會遇到障礙,因為brpc已不允許這種行為。為了幫助這種服務升級,brpc允許經過一些設定後不把http body自動轉化為pb request(從而可自行處理),方法如下:

brpc::ServiceOptions svc_opt;
svc_opt.ownership = ...;
svc_opt.restful_mappings = ...;
svc_opt.allow_http_body_to_pb = false; //關閉http body至pb request的自動轉化
server.AddService(service, svc_opt);

如此設定後service收到http請求後不會嘗試把body轉化為pb請求,所以pb請求總是未定義狀態,使用者得在cntl->request_protocol() == brpc::PROTOCOL_HTTP認定請求是http時自行解析http body。

相應地,當cntl->response_attachment()不為空且pb回覆不為空時,框架不再報錯,而是直接把cntl->response_attachment()作為回覆的body。這個功能和設定allow_http_body_to_pb與否無關。如果放開自由度導致過多的使用者犯錯,可能會有進一步的調整。
協議支援

server端會自動嘗試其支援的協議,無需使用者指定。cntl->protocol()可獲得當前協議。server能從一個listen埠建立不同協議的連線,不需要為不同的協議使用不同的listen埠,一個連線上也可以傳輸多種協議的資料包, 但一般不會這麼做(也不建議),支援的協議有:

    百度標準協議,顯示為"baidu_std",預設啟用。

    流式RPC協議,顯示為"streaming_rpc", 預設啟用。

    http 1.0/1.1,顯示為”http“,預設啟用。

    RTMP協議,顯示為"rtmp", 預設啟用。

    hulu-pbrpc的協議,顯示為"hulu_pbrpc",預設啟動。

    sofa-pbrpc的協議,顯示為”sofa_pbrpc“, 預設啟用。

    百盟的協議,顯示為”nova_pbrpc“, 預設不啟用,開啟方式:

    #include <brpc/policy/nova_pbrpc_protocol.h>
    ...
    ServerOptions options;
    ...
    options.nshead_service = new brpc::policy::NovaServiceAdaptor;

    public_pbrpc協議,顯示為"public_pbrpc",預設不啟用,開啟方式:

    #include <brpc/policy/public_pbrpc_protocol.h>
    ...
    ServerOptions options;
    ...
    options.nshead_service = new brpc::policy::PublicPbrpcServiceAdaptor;

    nshead+mcpack協議,顯示為"nshead_mcpack",預設不啟用,開啟方式:

    #include <brpc/policy/nshead_mcpack_protocol.h>
    ...
    ServerOptions options;
    ...
    options.nshead_service = new brpc::policy::NsheadMcpackAdaptor;

    顧名思義,這個協議的資料包由nshead+mcpack構成,mcpack中不包含特殊欄位。不同於使用者基於NsheadService的實現,這個協議使用了mcpack2pb,使得一份程式碼可以同時處理mcpack和pb兩種格式。由於沒有傳遞ErrorText的欄位,當發生錯誤時server只能關閉連線。

    和UB相關的協議請閱讀實現NsheadService。

如果你有更多的協議需求,可以聯絡我們。
設定
版本

Server.set_version(...)可以為server設定一個名稱+版本,可通過/version內建服務訪問到。雖然叫做"version“,但設定的值請包含服務名,而不僅僅是一個數字版本。
關閉閒置連線

如果一個連線在ServerOptions.idle_timeout_sec對應的時間內沒有讀取或寫出資料,則被視為”閒置”而被server主動關閉。預設值為-1,代表不開啟。

開啟-log_idle_connection_close後關閉前會列印一條日誌。
Name 	Value 	Description 	Defined At
log_idle_connection_close 	false 	Print log when an idle connection is closed 	src/brpc/socket.cpp
pid_file

如果設定了此欄位,Server啟動時會建立一個同名檔案,內容為程序號。預設為空。
在每條日誌後列印hostname

此功能只對butil/logging.h中的日誌巨集有效。

開啟-log_hostname後每條日誌後都會帶本機名稱,如果所有的日誌需要彙總到一起進行分析,這個功能可以幫助你瞭解某條日誌來自哪臺機器。
列印FATAL日誌後退出程式

此功能只對butil/logging.h中的日誌巨集有效,glog預設在FATAL日誌時crash。

開啟-crash_on_fatal_log後如果程式使用LOG(FATAL)列印了異常日誌或違反了CHECK巨集中的斷言,那麼程式會在列印日誌後abort,這一般也會產生coredump檔案,預設不開啟。這個開關可在對程式的壓力測試中開啟,以確認程式沒有進入過嚴重錯誤的分支。

    一般的慣例是,ERROR表示可容忍的錯誤,FATAL代表不可逆轉的錯誤。

最低日誌級別

此功能由butil/logging.h和glog各自實現,為同名選項。

只有不低於-minloglevel指定的日誌級別的日誌才會被列印。這個選項可以動態修改。設定值和日誌級別的對應關係:0=INFO 1=NOTICE 2=WARNING 3=ERROR 4=FATAL,預設為0。

未列印日誌的開銷只是一次if判斷,也不會評估引數(比如某個引數呼叫了函式,日誌不打,這個函式就不會被呼叫)。如果日誌最終列印到自定義LogSink,那麼還要經過LogSink的過濾。
歸還空閒記憶體至系統

選項-free_memory_to_system_interval表示每過這麼多秒就嘗試向系統歸還空閒記憶體,<= 0表示不開啟,預設值為0,若開啟建議設為10及以上的值。此功能支援tcmalloc,之前程式中對MallocExtension::instance()->ReleaseFreeMemory()的定期呼叫可改成設定此選項。
打印發送給client的錯誤

server的框架部分一般不針對個別client列印錯誤日誌,因為當大量client出現錯誤時,可能導致server高頻列印日誌而嚴重影響效能。但有時為了除錯問題,或就是需要讓server列印錯誤,開啟引數-log_error_text即可。
限制最大訊息

為了保護server和client,當server收到的request或client收到的response過大時,server或client會拒收並關閉連線。此最大尺寸由-max_body_size控制,單位為位元組。

超過最大訊息時會列印如下錯誤日誌:

FATAL: 05-10 14:40:05: * 0 src/brpc/input_messenger.cpp:89] A message from 127.0.0.1:35217(protocol=baidu_std) is bigger than 67108864 bytes, the connection will be closed. Set max_body_size to allow bigger messages

protobuf中有類似的限制,出錯時會列印如下日誌:

FATAL: 05-10 13:35:02: * 0 google/protobuf/io/coded_stream.cc:156] A protocol message was rejected because it was too big (more than 67108864 bytes). To increase the limit (or to disable these warnings), see CodedInputStream::SetTotalBytesLimit() in google/protobuf/io/coded_stream.h.

brpc移除了protobuf中的限制,全交由此選項控制,只要-max_body_size足夠大,使用者就不會看到錯誤日誌。此功能對protobuf的版本沒有要求。
壓縮

set_response_compress_type()設定response的壓縮方式,預設不壓縮。

注意附件不會被壓縮。HTTP body的壓縮方法見這裡。

支援的壓縮方法有:

    brpc::CompressTypeSnappy : snanpy壓縮,壓縮和解壓顯著快於其他壓縮方法,但壓縮率最低。
    brpc::CompressTypeGzip : gzip壓縮,顯著慢於snappy,但壓縮率高
    brpc::CompressTypeZlib : zlib壓縮,比gzip快10%~20%,壓縮率略好於gzip,但速度仍明顯慢於snappy。

更具體的效能對比見Client-壓縮.
附件

baidu_std和hulu_pbrpc協議支援傳遞附件,這段資料由使用者自定義,不經過protobuf的序列化。站在server的角度,設定在Controller.response_attachment()的附件會被client端收到,Controller.request_attachment()則包含了client端送來的附件。

附件不會被框架壓縮。

在http協議中,附件對應message body,比如要返回的資料就設定在response_attachment()中。
驗證client身份

如果server端要開啟驗證功能,需要實現Authenticator中的介面:

class Authenticator {
public:
    // Implement this method to verify credential information `auth_str' from
    // `client_addr'. You can fill credential context (result) into `*out_ctx'
    // and later fetch this pointer from `Controller'.
    // Returns 0 on success, error code otherwise
    virtual int VerifyCredential(const std::string& auth_str,
                                 const base::EndPoint& client_addr,
                                 AuthContext* out_ctx) const = 0;
    }; 

class AuthContext {
public:
    const std::string& user() const;
    const std::string& group() const;
    const std::string& roles() const;
    const std::string& starter() const;
    bool is_service() const;
};

server的驗證是基於連線的。當server收到連線上的第一個請求時,會嘗試解析出其中的身份資訊部分(如baidu_std裡的auth欄位、HTTP協議裡的Authorization頭),然後附帶client地址資訊一起呼叫VerifyCredential。若返回0,表示驗證成功,使用者可以把驗證後的資訊填入AuthContext,後續可通過controller->auth_context()獲取,使用者不需要關心其分配和釋放。否則表示驗證失敗,連線會被直接關閉,client訪問失敗。

後續請求預設通過驗證麼,沒有認證開銷。

把實現的Authenticator例項賦值到ServerOptions.auth,即開啟驗證功能,需要保證該例項在整個server執行週期內都有效,不能被析構。
worker執行緒數

設定ServerOptions.num_threads即可,預設是cpu core的個數(包含超執行緒的)。

注意: ServerOptions.num_threads僅僅是個提示。

你不能認為Server就用了這麼多執行緒,因為程序內的所有Server和Channel會共享執行緒資源,執行緒總數是所有ServerOptions.num_threads和-bthread_concurrency中的最大值。比如一個程式內有兩個Server,num_threads分別為24和36,bthread_concurrency為16。那麼worker執行緒數為max(24, 36, 16) = 36。這不同於其他RPC實現中往往是加起來。

Channel沒有相應的選項,但可以通過選項-bthread_concurrency調整。

另外,brpc不區分IO執行緒和處理執行緒。brpc知道如何編排IO和處理程式碼,以獲得更高的併發度和執行緒利用率。
限制最大併發

“併發”可能有兩種含義,一種是連線數,一種是同時在處理的請求數。這裡提到的是後者。

在傳統的同步server中,最大併發不會超過工作執行緒數,設定工作執行緒數量一般也限制了併發。但brpc的請求運行於bthread中,M個bthread會對映至N個worker中(一般M大於N),所以同步server的併發度可能超過worker數量。另一方面,雖然非同步server的併發不受執行緒數控制,但有時也需要根據其他因素控制併發量。

brpc支援設定server級和method級的最大併發,當server或method同時處理的請求數超過併發度限制時,它會立刻給client回覆brpc::ELIMIT錯誤,而不會呼叫服務回撥。看到ELIMIT錯誤的client應重試另一個server。這個選項可以防止server出現過度排隊,或用於限制server佔用的資源。

預設不開啟。
為什麼超過最大併發要立刻給client返回錯誤而不是排隊?

當前server達到最大併發並不意味著叢集中的其他server也達到最大併發了,立刻讓client獲知錯誤,並去嘗試另一臺server在全域性角度是更好的策略。
為什麼不限制QPS?

QPS是一個秒級的指標,無法很好地控制瞬間的流量爆發。而最大併發和當前可用的重要資源緊密相關:"工作執行緒",“槽位”等,能更好地抑制排隊。

另外當server的延時較為穩定時,限制併發的效果和限制QPS是等價的。但前者實現起來容易多了:只需加減一個代表併發度的計數器。這也是大部分流控都限制併發而不是QPS的原因,比如TCP中的“視窗"即是一種併發度。
計算最大併發數

最大併發度 = 極限QPS * 平均延時 (little's law)

極限QPS和平均延時指的是server在沒有嚴重積壓請求的前提下(請求的延時仍能接受時)所能達到的最大QPS和當時的平均延時。一般的服務上線都會有效能壓測,把測得的QPS和延時相乘一般就是該服務的最大併發度。
限制server級別併發度

設定ServerOptions.max_concurrency,預設值0代表不限制。訪問內建服務不受此選項限制。

Server.ResetMaxConcurrency()可在server啟動後動態修改server級別的max_concurrency。
限制method級別併發度

server.MaxConcurrencyOf("...") = ...可設定method級別的max_concurrency。可能的設定方法有:

server.MaxConcurrencyOf("example.EchoService.Echo") = 10;
server.MaxConcurrencyOf("example.EchoService", "Echo") = 10;
server.MaxConcurrencyOf(&service, "Echo") = 10;

此設定一般發生在AddService後,server啟動前。當設定失敗時(比如對應的method不存在),server會啟動失敗同時提示使用者修正MaxConcurrencyOf設定錯誤。

當method級別和server級別的max_concurrency都被設定時,先檢查server級別的,再檢查method級別的。

注意:沒有service級別的max_concurrency。
pthread模式

使用者程式碼(客戶端的done,伺服器端的CallMethod)預設在棧為1MB的bthread中執行。但有些使用者程式碼無法在bthread中執行,比如:

    JNI會檢查stack layout而無法在bthread中執行。
    程式碼中廣泛地使用pthread local傳遞session級別全域性資料,在RPC前後均使用了相同的pthread local的資料,且資料有前後依賴性。比如在RPC前往pthread-local儲存了一個值,RPC後又讀出來希望和之前儲存的相等,就會有問題。而像tcmalloc雖然也使用了pthread/LWP local,但每次使用之間沒有直接的依賴,是安全的。

對於這些情況,brpc提供了pthread模式,開啟**-usercode_in_pthread**後,使用者程式碼均會在pthread中執行,原先阻塞bthread的函式轉而阻塞pthread。

開啟pthread模式後在效能上的注意點:

    同步RPC都會阻塞worker pthread,server端一般需要設定更多的工作執行緒(ServerOptions.num_threads),排程效率會略微降低。
    執行使用者程式碼的仍然是bthread,只是很特殊,會直接使用pthread worker的棧。這些特殊bthread的排程方式和其他bthread是一致的,這方面效能差異很小。
    bthread支援一個獨特的功能:把當前使用的pthread worker 讓給另一個新建立的bthread執行,以消除一次上下文切換。brpc client利用了這點,從而使一次RPC過程中3次上下文切換變為了2次。在高QPS系統中,消除上下文切換可以明顯改善效能和延時分佈。但pthread模式不具備這個能力,在高QPS系統中效能會有一定下降。
    pthread模式中執行緒資源是硬限,一旦執行緒被打滿,請求就會迅速擁塞而造成大量超時。一個常見的例子是:下游服務大量超時後,上游服務可能由於執行緒大都在等待下游也被打滿從而影響效能。開啟pthread模式後請考慮設定ServerOptions.max_concurrency以控制server的最大併發。而在bthread模式中bthread個數是軟限,對此類問題的反應會更加平滑。

pthread模式可以讓一些老程式碼快速嘗試brpc,但我們仍然建議逐漸地把程式碼改造為使用bthread local或最好不用TLS,從而最終能關閉這個開關。
安全模式

如果你的服務流量來自外部(包括經過nginx等轉發),你需要注意一些安全因素:
對外隱藏內建服務

內建服務很有用,但包含了大量內部資訊,不應對外暴露。有多種方式可以對外隱藏內建服務:

    設定內部埠。把ServerOptions.internal_port設為一個僅允許內網訪問的埠。你可通過internal_port訪問到內建服務,但通過對外埠(Server.Start時傳入的那個)訪問內建服務時將看到如下錯誤:

    [a27eda84bcdeef529a76f22872b78305] Not allowed to access builtin services, try ServerOptions.internal_port=... instead if you're inside internal network

    http proxy指定轉發路徑。nginx等可配置URL的對映關係,比如下面的配置把訪問/MyAPI的外部流量對映到target-server的/ServiceName/MethodName。當外部流量嘗試訪問內建服務,比如說/status時,將直接被nginx拒絕。

  location /MyAPI {
      ...
      proxy_pass http://<target-server>/ServiceName/MethodName$query_string   # $query_string是nginx變數,更多變數請查詢http://nginx.org/en/docs/http/ngx_http_core_module.html
      ...
  }

請勿在對外服務上開啟-enable_dir_service和-enable_threads_service兩個選項,它們雖然很方便,但會嚴重洩露伺服器上的其他資訊。檢查對外的rpc服務是否打開了這兩個開關:

curl -s -m 1 <HOSTNAME>:<PORT>/flags/enable_dir_service,enable_threads_service | awk '{if($3=="false"){++falsecnt}else if($3=="Value"){isrpc=1}}END{if(isrpc!=1||falsecnt==2){print "SAFE"}else{print "NOT SAFE"}}'

轉義外部可控的URL

可呼叫brpc::WebEscape()對url進行轉義,防止惡意URI注入攻擊。
不返回內部server地址

可以考慮對server地址做簽名。比如在設定ServerOptions.internal_port後,server返回的錯誤資訊中的IP資訊是其MD5簽名,而不是明文。
定製/health頁面

/health頁面預設返回"OK",若需定製/health頁面的內容:先繼承HealthReporter,在其中實現生成頁面的邏輯(就像實現其他http service那樣),然後把例項賦給ServerOptions.health_reporter,這個例項不被server擁有,必須保證在server執行期間有效。使用者在定製邏輯中可以根據業務的執行狀態返回更多樣的狀態資訊。
執行緒私有變數

百度內的檢索程式大量地使用了thread-local storage (縮寫TLS),有些是為了快取頻繁訪問的物件以避免反覆建立,有些則是為了在全域性函式間隱式地傳遞狀態。你應當儘量避免後者,這樣的函式難以測試,不設定thread-local變數甚至無法執行。brpc中有三套機制解決和thread-local相關的問題。
session-local

session-local data與一次server端RPC繫結: 從進入service回撥開始,到呼叫server端的done結束,不管該service是同步還是非同步處理。 session-local data會盡量被重用,在server停止前不會被刪除。

設定ServerOptions.session_local_data_factory後訪問Controller.session_local_data()即可獲得session-local資料。若沒有設定,Controller.session_local_data()總是返回NULL。

若ServerOptions.reserved_session_local_data大於0,Server會在提供服務前就建立這麼多個數據。

示例用法

struct MySessionLocalData {
    MySessionLocalData() : x(123) {}
    int x;
};
 
class EchoServiceImpl : public example::EchoService {
public:
    ...
    void Echo(google::protobuf::RpcController* cntl_base,
              const example::EchoRequest* request,
              example::EchoResponse* response,
              google::protobuf::Closure* done) {
        ...
        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
 
        // Get the session-local data which is created by ServerOptions.session_local_data_factory
        // and reused between different RPC.
        MySessionLocalData* sd = static_cast<MySessionLocalData*>(cntl->session_local_data());
        if (sd == NULL) {
            cntl->SetFailed("Require ServerOptions.session_local_data_factory to be set with a correctly implemented instance");
            return;
        }
        ...

struct ServerOptions {
    ...
    // The factory to create/destroy data attached to each RPC session.
    // If this field is NULL, Controller::session_local_data() is always NULL.
    // NOT owned by Server and must be valid when Server is running.
    // Default: NULL
    const DataFactory* session_local_data_factory;
 
    // Prepare so many session-local data before server starts, so that calls
    // to Controller::session_local_data() get data directly rather than
    // calling session_local_data_factory->Create() at first time. Useful when
    // Create() is slow, otherwise the RPC session may be blocked by the
    // creation of data and not served within timeout.
    // Default: 0
    size_t reserved_session_local_data;
};

session_local_data_factory的型別為DataFactory,你需要實現其中的CreateData和DestroyData。

注意:CreateData和DestroyData會被多個執行緒同時呼叫,必須執行緒安全。

class MySessionLocalDataFactory : public brpc::DataFactory {
public:
    void* CreateData() const {
        return new MySessionLocalData;
    }  
    void DestroyData(void* d) const {
        delete static_cast<MySessionLocalData*>(d);
    }  
};
 
int main(int argc, char* argv[]) {
    ...
    MySessionLocalDataFactory session_local_data_factory;
 
    brpc::Server server;
    brpc::ServerOptions options;
    ...
    options.session_local_data_factory = &session_local_data_factory;
    ...

server-thread-local

server-thread-local與一次service回撥繫結,從進service回撥開始,到出service回撥結束。所有的server-thread-local data會被儘量重用,在server停止前不會被刪除。在實現上server-thread-local是一個特殊的bthread-local。

設定ServerOptions.thread_local_data_factory後訪問Controller.thread_local_data()即可獲得thread-local資料。若沒有設定,Controller.thread_local_data()總是返回NULL。

若ServerOptions.reserved_thread_local_data大於0,Server會在啟動前就建立這麼多個數據。

與session-local的區別

session-local data得從server端的Controller獲得, server-thread-local可以在任意函式中獲得,只要這個函式直接或間接地執行在server執行緒中。

當service是同步時,session-local和server-thread-local基本沒有差別,除了前者需要Controller建立。當service是非同步時,且你需要在done->Run()中訪問到資料,這時只能用session-local,因為server-thread-local在service回撥外已經失效。

示例用法

struct MyThreadLocalData {
    MyThreadLocalData() : y(0) {}
    int y;
};
 
class EchoServiceImpl : public example::EchoService {
public:
    ...
    void Echo(google::protobuf::RpcController* cntl_base,
              const example::EchoRequest* request,
              example::EchoResponse* response,
              google::protobuf::Closure* done) {
        ...
        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
         
        // Get the thread-local data which is created by ServerOptions.thread_local_data_factory
        // and reused between different threads.
        // "tls" is short for "thread local storage".
        MyThreadLocalData* tls = static_cast<MyThreadLocalData*>(brpc::thread_local_data());
        if (tls == NULL) {
            cntl->SetFailed("Require ServerOptions.thread_local_data_factory "
                            "to be set with a correctly implemented instance");
            return;
        }
        ...

struct ServerOptions {
    ...    
    // The factory to create/destroy data attached to each searching thread
    // in server.
    // If this field is NULL, brpc::thread_local_data() is always NULL.
    // NOT owned by Server and must be valid when Server is running.
    // Default: NULL
    const DataFactory* thread_local_data_factory;
 
    // Prepare so many thread-local data before server starts, so that calls
    // to brpc::thread_local_data() get data directly rather than calling
    // thread_local_data_factory->Create() at first time. Useful when Create()
    // is slow, otherwise the RPC session may be blocked by the creation
    // of data and not served within timeout.
    // Default: 0
    size_t reserved_thread_local_data;
};

thread_local_data_factory的型別為DataFactory,你需要實現其中的CreateData和DestroyData。

注意:CreateData和DestroyData會被多個執行緒同時呼叫,必須執行緒安全。

class MyThreadLocalDataFactory : public brpc::DataFactory {
public:
    void* CreateData() const {
        return new MyThreadLocalData;
    }  
    void DestroyData(void* d) const {
        delete static_cast<MyThreadLocalData*>(d);
    }  
};
 
int main(int argc, char* argv[]) {
    ...
    MyThreadLocalDataFactory thread_local_data_factory;
 
    brpc::Server server;
    brpc::ServerOptions options;
    ...
    options.thread_local_data_factory  = &thread_local_data_factory;
    ...

bthread-local

Session-local和server-thread-local對大部分server已經夠用。不過在一些情況下,我們需要更通用的thread-local方案。在這種情況下,你可以使用bthread_key_create, bthread_key_destroy, bthread_getspecific, bthread_setspecific等函式,它們的用法類似pthread中的函式。

這些函式同時支援bthread和pthread,當它們在bthread中被呼叫時,獲得的是bthread私有變數; 當它們在pthread中被呼叫時,獲得的是pthread私有變數。但注意,這裡的“pthread私有變數”不是通過pthread_key_create建立的,使用pthread_key_create建立的pthread-local是無法被bthread_getspecific訪問到的,這是兩個獨立的體系。由gcc的__thread,c++11的thread_local等宣告的私有變數也無法被bthread_getspecific訪問到。

由於brpc會為每個請求建立一個bthread,server中的bthread-local行為特殊:一個server建立的bthread在退出時並不刪除bthread-local,而是還回server的一個pool中,以被其他bthread複用。這可以避免bthread-local隨著bthread的建立和退出而不停地構造和析構。這對於使用者是透明的。

主要介面

// Create a key value identifying a slot in a thread-specific data area.
// Each thread maintains a distinct thread-specific data area.
// `destructor', if non-NULL, is called with the value associated to that key
// when the key is destroyed. `destructor' is not called if the value
// associated is NULL when the key is destroyed.
// Returns 0 on success, error code otherwise.
extern int bthread_key_create(bthread_key_t* key, void (*destructor)(void* data)) __THROW;
 
// Delete a key previously returned by bthread_key_create().
// It is the responsibility of the application to free the data related to
// the deleted key in any running thread. No destructor is invoked by
// this function. Any destructor that may have been associated with key
// will no longer be called upon thread exit.
// Returns 0 on success, error code otherwise.
extern int bthread_key_delete(bthread_key_t key) __THROW;
 
// Store `data' in the thread-specific slot identified by `key'.
// bthread_setspecific() is callable from within destructor. If the application
// does so, destructors will be repeatedly called for at most
// PTHREAD_DESTRUCTOR_ITERATIONS times to clear the slots.
// NOTE: If the thread is not created by brpc server and lifetime is
// very short(doing a little thing and exit), avoid using bthread-local. The
// reason is that bthread-local always allocate keytable on first call to
// bthread_setspecific, the overhead is negligible in long-lived threads,
// but noticeable in shortly-lived threads. Threads in brpc server
// are special since they reuse keytables from a bthread_keytable_pool_t
// in the server.
// Returns 0 on success, error code otherwise.
// If the key is invalid or deleted, return EINVAL.
extern int bthread_setspecific(bthread_key_t key, void* data) __THROW;
 
// Return current value of the thread-specific slot identified by `key'.
// If bthread_setspecific() had not been called in the thread, return NULL.
// If the key is invalid or deleted, return NULL.
extern void* bthread_getspecific(bthread_key_t key) __THROW;

使用方法

用bthread_key_create建立一個bthread_key_t,它代表一種bthread私有變數。

用bthread_[get|set]specific查詢和設定bthread私有變數。一個執行緒中第一次訪問某個私有變數返回NULL。

在所有執行緒都不使用和某個bthread_key_t相關的私有變數後再刪除它。如果刪除了一個仍在被使用的bthread_key_t,相關的私有變數就洩露了。

static void my_data_destructor(void* data) {
    ...
}
 
bthread_key_t tls_key;
 
if (bthread_key_create(&tls_key, my_data_destructor) != 0) {
    LOG(ERROR) << "Fail to create tls_key";
    return -1;
}

// in some thread ...
MyThreadLocalData* tls = static_cast<MyThreadLocalData*>(bthread_getspecific(tls_key));
if (tls == NULL) {  // First call to bthread_getspecific (and before any bthread_setspecific) returns NULL
    tls = new MyThreadLocalData;   // Create thread-local data on demand.
    CHECK_EQ(0, bthread_setspecific(tls_key, tls));  // set the data so that next time bthread_getspecific in the thread returns the data.
}

示例程式碼

static void my_thread_local_data_deleter(void* d) {
    delete static_cast<MyThreadLocalData*>(d);
}  
 
class EchoServiceImpl : public example::EchoService {
public:
    EchoServiceImpl() {
        CHECK_EQ(0, bthread_key_create(&_tls2_key, my_thread_local_data_deleter));
    }
    ~EchoServiceImpl() {
        CHECK_EQ(0, bthread_key_delete(_tls2_key));
    };
    ...
private:
    bthread_key_t _tls2_key;
}
 
class EchoServiceImpl : public example::EchoService {
public:
    ...
    void Echo(google::protobuf::RpcController* cntl_base,
              const example::EchoRequest* request,
              example::EchoResponse* response,
              google::protobuf::Closure* done) {
        ...
        // You can create bthread-local data for your own.
        // The interfaces are similar with pthread equivalence:
        //   pthread_key_create  -> bthread_key_create
        //   pthread_key_delete  -> bthread_key_delete
        //   pthread_getspecific -> bthread_getspecific
        //   pthread_setspecific -> bthread_setspecific
        MyThreadLocalData* tls2 = static_cast<MyThreadLocalData*>(bthread_getspecific(_tls2_key));
        if (tls2 == NULL) {
            tls2 = new MyThreadLocalData;
            CHECK_EQ(0, bthread_setspecific(_tls2_key, tls2));
        }
        ...

FAQ
Q: Fail to write into fd=1865 [email protected]:54742@8230: Got EOF是什麼意思

A: 一般是client端使用了連線池或短連線模式,在RPC超時後會關閉連線,server寫回response時發現連線已經關了就報這個錯。Got EOF就是指之前已經收到了EOF(對端正常關閉了連線)。client端使用單連線模式server端一般不會報這個。
Q: Remote side of fd=9 [email protected]:8000 was closed是什麼意思

這不是錯誤,是常見的warning,表示對端關掉連線了(EOF)。這個日誌有時對排查問題有幫助。

預設關閉,把引數-log_connection_close設定為true就打開了(支援動態修改)。
Q: 為什麼server端執行緒數設了沒用

brpc同一個程序中所有的server共用執行緒,如果建立了多個server,最終的工作執行緒數多半是最大的那個ServerOptions.num_threads。
Q: 為什麼client端的延時遠大於server端的延時

可能是server端的工作執行緒不夠用了,出現了排隊現象。排查方法請檢視高效率排查服務卡頓。
Q: 程式切換到brpc之後出現了像堆疊寫壞的coredump

brpc的Server是執行在bthread之上,預設棧大小為1MB,而pthread預設棧大小為10MB,所以在pthread上正常執行的程式,在bthread上可能遇到棧不足。

注意:不是說程式core了就意味著”棧不夠大“,只是因為這個試起來最容易,所以優先排除掉可能性。事實上百度內如此多的應用也很少碰到棧不夠大的情況。

解決方案:新增以下gflags以調整棧大小,比如--stack_size_normal=10000000 --tc_stack_normal=1。第一個flag把棧大小修改為10MB,第二個flag表示每個工作執行緒快取的棧的個數(避免每次都從全域性拿).
Q: Fail to open /proc/self/io

有些核心沒這個檔案,不影響服務正確性,但如下幾個bvar會無法更新:

process_io_read_bytes_second
process_io_write_bytes_second
process_io_read_second
process_io_write_second

Q: json串"[1,2,3]"沒法直接轉為protobuf message

這不是標準的json。最外層必須是花括號{}包圍的json object。
附:Server端基本流程

/********** server **********/

/********** client **********/

事實速查

    Channel.Init()是執行緒不安全的。
    Channel.CallMethod()是執行緒安全的,一個Channel可以被所有執行緒同時使用。
    Channel可以分配在棧上。
    Channel在傳送非同步請求後可以析構。
    沒有brpc::Client這個類。

Channel

Client指發起請求的一端,在brpc中沒有對應的實體,取而代之的是brpc::Channel,它代表和一臺或一組伺服器的互動通道,Client和Channel在角色上的差別在實踐中並不重要,你可以把Channel視作Client。

Channel可以被所有執行緒共用,你不需要為每個執行緒建立獨立的Channel,也不需要用鎖互斥。不過Channel的建立和Init並不是執行緒安全的,請確保在Init成功後再被多執行緒訪問,在沒有執行緒訪問後再析構。

一些RPC實現中有ClientManager的概念,包含了Client端的配置資訊和資源管理。brpc不需要這些,以往在ClientManager中配置的執行緒數、長短連線等等要麼被加入了brpc::ChannelOptions,要麼可以通過gflags全域性配置,這麼做的好處:

    方便。你不需要在建立Channel時傳入ClientManager,也不需要儲存ClientManager。否則不少程式碼需要一層層地傳遞ClientManager,很麻煩。gflags使一些全域性行為的配置更加簡單。
    共用資源。比如server和channel可以共用後臺執行緒。(bthread的工作執行緒)
    生命週期。析構ClientManager的過程很容易出錯,現在由框架負責則不會有問題。

就像大部分類那樣,Channel必須在Init之後才能使用,options為NULL時所有引數取預設值,如果你要使用非預設值,這麼做就行了:

brpc::ChannelOptions options;  // 包含了預設值
options.xxx = yyy;
...
channel.Init(..., &options);

注意Channel不會修改options,Init結束後不會再訪問options。所以options一般就像上面程式碼中那樣放棧上。Channel.options()可以獲得channel在使用的所有選項。

Init函式分為連線一臺伺服器和連線服務叢集。
連線一臺伺服器

// options為NULL時取預設值
int Init(EndPoint server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr, int port, const ChannelOptions* options);

這類Init連線的伺服器往往有固定的ip地址,不需要名字服務和負載均衡,建立起來相對輕量。但是請勿頻繁建立使用域名的Channel。這需要查詢dns,可能最多耗時10秒(查詢DNS的預設超時)。重用它們。

合法的“server_addr_and_port”:

    127.0.0.1:80
    www.foo.com:8765
    localhost:9000

不合法的"server_addr_and_port":

    127.0.0.1:90000 # 埠過大
    10.39.2.300:8000 # 非法的ip

連線服務叢集

int Init(const char* naming_service_url,
         const char* load_balancer_name,
         const ChannelOptions* options);

這類Channel需要定期從naming_service_url指定的名字服務中獲得伺服器列表,並通過load_balancer_name指定的負載均衡演算法選擇出一臺機器傳送請求。

你不應該在每次請求前動態地建立此類(連線服務叢集的)Channel。因為建立和析構此類Channel牽涉到較多的資源,比如在建立時得訪問一次名字服務,否則便不知道有哪些伺服器可選。由於Channel可被多個執行緒共用,一般也沒有必要動態建立。

當load_balancer_name為NULL或空時,此Init等同於連線單臺server的Init,naming_service_url應該是"ip:port"或"域名:port"。你可以通過這個Init函式統一Channel的初始化方式。比如你可以把naming_service_url和load_balancer_name放在配置檔案中,要連線單臺server時把load_balancer_name置空,要連線服務叢集時則設定一個有效的演算法名稱。
名字服務

名字服務把一個名字對映為可修改的機器列表,在client端的位置如下:

img

有了名字服務後client記錄的是一個名字,而不是每一臺下游機器。而當下遊機器變化時,就只需要修改名字服務中的列表,而不需要逐臺修改每個上游。這個過程也常被稱為“解耦上下游”。當然在具體實現上,上游會記錄每一臺下游機器,並定期向名字服務請求或被推送最新的列表,以避免在RPC請求時才去訪問名字服務。使用名字服務一般不會對訪問效能造成影響,對名字服務的壓力也很小。

naming_service_url的一般形式是"protocol://service_name"
bns://<bns-name>

BNS是百度內常用的名字服務,比如bns://rdev.matrix.all,其中"bns"是protocol,"rdev.matrix.all"是service-name。相關一個gflag是-ns_access_interval: img

如果BNS中顯示不為空,但Channel卻說找不到伺服器,那麼有可能BNS列表中的機器狀態位(status)為非0,含義為機器不可用,所以不會被加入到server候選集中.狀態位可通過命令列檢視:

get_instance_by_service [bns_node_name] -s
file://<path>

伺服器列表放在path所在的檔案裡,比如"file://conf/local_machine_list"中的“conf/local_machine_list”對應一個檔案,其中每行應是一臺伺服器的地址。當檔案更新時, brpc會重新載入。
list://<addr1>,<addr2>...

伺服器列表直接跟在list://之後,以逗號分隔,比如"list://db-bce-81-3-186.db01:7000,m1-bce-44-67-72.m1:7000,cp01-rd-cos-006.cp01:7000" 中有三個地址。
http://<url>

連線一個域名下所有的機器, 例如http://www.baidu.com:80 ,注意連線單點的Init(兩個引數)雖然也可傳入域名,但只會連線域名下的一臺機器。
名字服務過濾器

當名字服務獲得機器列表後,可以自定義一個過濾器進行篩選,最後把結果傳遞給負載均衡:

img

過濾器的介面如下:

// naming_service_filter.h
class NamingServiceFilter {
public:
    // Return true to take this `server' as a candidate to issue RPC
    // Return false to filter it out
    virtual bool Accept(const ServerNode& server) const = 0;
};
 
// naming_service.h
struct ServerNode {
    butil::EndPoint addr;
    std::string tag;
};

常見的業務策略如根據server的tag進行過濾。

自定義的過濾器配置在ChannelOptions中,預設為NULL(不過濾)。

class MyNamingServiceFilter : public brpc::NamingServiceFilter {
public:
    bool Accept(const brpc::ServerNode& server) const {
        return server.tag == "main";
    }
};
 
int main() {
    ...
    MyNamingServiceFilter my_filter;
    ...
    brpc::ChannelOptions options;
    options.ns_filter = &my_filter;
    ...
}

負載均衡

當下遊機器超過一臺時,我們需要分割流量,此過程一般稱為負載均衡,在client端的位置如下圖所示:

img

理想的演算法是每個請求都得到及時的處理,且任意機器crash對全域性影響較小。但由於client端無法及時獲得server端的延遲或擁塞,而且負載均衡演算法不能耗費太多的cpu,一般來說使用者得根據具體的場景選擇合適的演算法,目前rpc提供的演算法有(通過load_balancer_name指定):
rr

即round robin,總是選擇列表中的下一臺伺服器,結尾的下一臺是開頭,無需其他設定。比如有3臺機器a,b,c,那麼brpc會依次向a, b, c, a, b, c, ...傳送請求。注意這個演算法的前提是伺服器的配置,網路條件,負載都是類似的。
random

隨機從列表中選擇一臺伺服器,無需其他設定。和round robin類似,這個演算法的前提也是伺服器都是類似的。
la

locality-aware,優先選擇延時低的下游,直到其延時高於其他機器,無需其他設定。實現原理請檢視Locality-aware load balancing。
c_murmurhash or c_md5

一致性雜湊,與簡單hash的不同之處在於增加或刪除機器時不會使分桶結果劇烈變化,特別適合cache類服務。

發起RPC前需要設定Controller.set_request_code(),否則RPC會失敗。request_code一般是請求中主鍵部分的32位雜湊值,不需要和負載均衡使用的雜湊演算法一致。比如用c_murmurhash演算法也可以用md5計算雜湊值。

src/brpc/policy/hasher.h中包含了常用的hash函式。如果用std::string key代表請求的主鍵,controller.set_request_code(brpc::policy::MurmurHash32(key.data(), key.size()))就正確地設定了request_code。

注意甄別請求中的“主鍵”部分和“屬性”部分,不要為了偷懶或通用,就把請求的所有內容一股腦兒計算出雜湊值,屬性的變化會使請求的目的地發生劇烈的變化。另外也要注意padding問題,比如struct Foo { int32_t a; int64_t b; }在64位機器上a和b之間有4個位元組的空隙,內容未定義,如果像hash(&foo, sizeof(foo))這樣計算雜湊值,結果就是未定義的,得把內容緊密排列或序列化後再算。

實現原理請檢視Consistent Hashing。
健康檢查

連線斷開的server會被暫時隔離而不會被負載均衡演算法選中,brpc會定期連線被隔離的server,以檢查他們是否恢復正常,間隔由引數-health_check_interval控制:
Name 	Value 	Description 	Defined At
health_check_interval (R) 	3 	seconds between consecutive health-checkings 	src/brpc/socket_map.cpp

一旦server被連線上,它會恢復為可用狀態。如果在隔離過程中,server從名字服務中刪除了,brpc也會停止連線嘗試。
發起訪問

一般來說,我們不直接呼叫Channel.CallMethod,而是通過protobuf生成的樁XXX_Stub,過程更像是“呼叫函式”。stub內沒什麼成員變數,建議在棧上建立和使用,而不必new,當然你也可以把stub存下來複用。Channel::CallMethod和stub訪問都是執行緒安全的,可以被所有執行緒同時訪問。比如:

XXX_Stub stub(&channel);
stub.some_method(controller, request, response, done);

甚至

XXX_Stub(&channel).some_method(controller, request, response, done);

一個例外是http client。訪問http服務和protobuf沒什麼關係,直接呼叫CallMethod即可,除了Controller和done均為NULL,詳見訪問HTTP服務。
同步訪問

指的是:CallMethod會阻塞到收到server端返回response或發生錯誤(包括超時)。

同步訪問中的response/controller不會在CallMethod後被框架使用,它們都可以分配在棧上。注意,如果request/response欄位特別多位元組數特別大的話,還是更適合分配在堆上。

MyRequest request;
MyResponse response;
brpc::Controller cntl;
XXX_Stub stub(&channel);
 
request.set_foo(...);
cntl.set_timeout_ms(...);
stub.some_method(&cntl, &request, &response, NULL);
if (cntl->Failed()) {
    // RPC失敗了. response裡的值是未定義的,勿用。
} else {
    // RPC成功了,response裡有我們想要的回覆資料。
}

非同步訪問

指的是:給CallMethod傳遞一個額外的回撥物件done,CallMethod在發出request後就結束了,而不是在RPC結束後。當server端返回response或發生錯誤(包括超時)時,done->Run()會被呼叫。對RPC的後續處理應該寫在done->Run()裡,而不是CallMethod後。

由於CallMethod結束不意味著RPC結束,response/controller仍可能被框架及done->Run()使用,它們一般得建立在堆上,並在done->Run()中刪除。如果提前刪除了它們,那當done->Run()被呼叫時,將訪問到無效記憶體。

你可以獨立地建立這些物件,並使用NewCallback生成done,也可以把Response和Controller作為done的成員變數,一起new出來,一般使用前一種方法。

發起非同步請求後Request和Channel也可以立刻析構。這兩樣和response/controller是不同的。注意:這是說Channel的析構可以立刻發生在CallMethod之後,並不是說析構可以和CallMethod同時發生,刪除正被另一個執行緒使用的Channel是未定義行為(很可能crash)。
使用NewCallback

static void OnRPCDone(MyResponse* response, brpc::Controller* cntl) {
    // unique_ptr會幫助我們在return時自動刪掉response/cntl,防止忘記。gcc 3.4下的unique_ptr是模擬版本。
    std::unique_ptr<MyResponse> response_guard(response);
    std::unique_ptr<brpc::Controller> cntl_guard(cntl);
    if (cntl->Failed()) {
        // RPC失敗了. response裡的值是未定義的,勿用。
    } else {
        // RPC成功了,response裡有我們想要的資料。開始RPC的後續處理。    
    }
    // NewCallback產生的Closure會在Run結束後刪除自己,不用我們做。
}
 
MyResponse* response = new MyResponse;
brpc::Controller* cntl = new brpc::Controller;
MyService_Stub stub(&channel);
 
MyRequest request;  // 你不用new request,即使在非同步訪問中.
request.set_foo(...);
cntl->set_timeout_ms(...);
stub.some_method(cntl, &request, response, google::protobuf::NewCallback(OnRPCDone, response, cntl));

由於protobuf 3把NewCallback設定為私有,r32035後brpc把NewCallback獨立於src/brpc/callback.h(並增加了一些過載)。如果你的程式出現NewCallback相關的編譯錯誤,把google::protobuf::NewCallback替換為brpc::NewCallback就行了。
繼承google::protobuf::Closure

使用NewCallback的缺點是要分配三次記憶體:response, controller, done。如果profiler證明這兒的記憶體分配有瓶頸,可以考慮自己繼承Closure,把response/controller作為成員變數,這樣可以把三次new合併為一次。但缺點就是程式碼不夠美觀,如果記憶體分配不是瓶頸,別用這種方法。

class OnRPCDone: public google::protobuf::Closure {
public:
    void Run() {
        // unique_ptr會幫助我們在return時自動delete this,防止忘記。gcc 3.4下的unique_ptr是模擬版本。
        std::unique_ptr<OnRPCDone> self_guard(this);
          
        if (cntl->Failed()) {
            // RPC失敗了. response裡的值是未定義的,勿用。
        } else {
            // RPC成功了,response裡有我們想要的資料。開始RPC的後續處理。
        }
    }
 
    MyResponse response;
    brpc::Controller cntl;
}
 
OnRPCDone* done = new OnRPCDone;
MyService_Stub stub(&channel);
 
MyRequest request;  // 你不用new request,即使在非同步訪問中.
request.set_foo(...);
done->cntl.set_timeout_ms(...);
stub.some_method(&done->cntl, &request, &done->response, done);

如果非同步訪問中的回撥函式特別複雜會有什麼影響嗎?

沒有特別的影響,回撥會執行在獨立的bthread中,不會阻塞其他的邏輯。你可以在回撥中做各種阻塞操作。
rpc傳送處的程式碼和回撥函式是在同一個執行緒裡執行嗎?

一定不在同一個執行緒裡執行,即使該次rpc呼叫剛進去就失敗了,回撥也會在另一個bthread中執行。這可以在加鎖進行rpc(不推薦)的程式碼中避免死鎖。
等待RPC完成

注意:當你需要發起多個併發操作時,可能ParallelChannel更方便。

如下程式碼發起兩個非同步RPC後等待它們完成。

const brpc::CallId cid1 = controller1->call_id();
const brpc::CallId cid2 = controller2->call_id();
...
stub.method1(controller1, request1, response1, done1);
stub.method2(controller2, request2, response2, done2);
...
brpc::Join(cid1);
brpc::Join(cid2);

在發起RPC前呼叫Controller.call_id()獲得一個id,發起RPC呼叫後Join那個id。

Join()的行為是等到RPC結束且done->Run()執行後,一些Join的性質如下:

    如果對應的RPC已經結束,Join將立刻返回。
    多個執行緒可以Join同一個id,它們都會醒來。
    同步RPC也可以在另一個執行緒中被Join,但一般不會這麼做。

Join()在之前的版本叫做JoinResponse(),如果你在編譯時被提示deprecated之類的,修改為Join()。

在RPC呼叫後Join(controller->call_id())是錯誤的行為,一定要先把call_id儲存下來。因為RPC呼叫後controller可能被隨時開始執行的done刪除。下面程式碼的Join方式是錯誤的。

static void on_rpc_done(Controller* controller, MyResponse* response) {
    ... Handle response ...
    delete controller;
    delete response;
}
 
Controller* controller1 = new Controller;
Controller* controller2 = new Controller;
MyResponse* response1 = new MyResponse;
MyResponse* response2 = new MyResponse;
...
stub.method1(controller1, &request1, response1, google::protobuf::NewCallback(on_rpc_done, controller1, response1));
stub.method2(controller2, &request2, response2, google::protobuf::NewCallback(on_rpc_done, controller2, response2));
...
brpc::Join(controller1->call_id());   // 錯誤,controller1可能被on_rpc_done刪除了
brpc::Join(controller2->call_id());   // 錯誤,controller2可能被on_rpc_done刪除了

半同步

Join可用來實現“半同步”訪問:即等待多個非同步訪問完成。由於呼叫處的程式碼會等到所有RPC都結束後再醒來,所以controller和response都可以放棧上。

brpc::Controller cntl1;
brpc::Controller cntl2;
MyResponse response1;
MyResponse response2;
...
stub1.method1(&cntl1, &request1, &response1, brpc::DoNothing());
stub2.method2(&cntl2, &request2, &response2, brpc::DoNothing());
...
brpc::Join(cntl1.call_id());
brpc::Join(cntl2.call_id());

brpc::DoNothing()可獲得一個什麼都不幹的done,專門用於半同步訪問。它的生命週期由框架管理,使用者不用關心。

注意在上面的程式碼中,我們在RPC結束後又訪問了controller.call_id(),這是沒有問題的,因為DoNothing中並不會像上節中的on_rpc_done中那樣刪除Controller。
取消RPC

brpc::StartCancel(call_id)可取消對應的RPC,call_id必須在發起RPC前通過Controller.call_id()獲得,其他時刻都可能有race condition。

注意:是brpc::StartCancel(call_id),不是controller->StartCancel(),後者被禁用,沒有效果。後者是protobuf預設提供的介面,但是在controller物件的生命週期上有嚴重的競爭問題。

顧名思義,StartCancel呼叫完成後RPC並未立刻結束,你不應該碰觸Controller的任何欄位或刪除任何資源,它們自然會在RPC結束時被done中對應邏輯處理。如果你一定要在原地等到RPC結束(一般不需要),則可通過Join(call_id)。

關於StartCancel的一些事實:

    call_id在發起RPC前就可以被取消,RPC會直接結束(done仍會被呼叫)。
    call_id可以在另一個執行緒中被取消。
    取消一個已經取消的call_id不會有任何效果。推論:同一個call_id可以被多個執行緒同時取消,但最多一次有效果。
    這裡的取消是純client端的功能,server端未必會取消對應的操作,server cancelation是另一個功能。

獲取Server的地址和埠

remote_side()方法可知道request被送向了哪個server,返回值型別是butil::EndPoint,包含一個ip4地址和埠。在RPC結束前呼叫這個方法都是沒有意義的。

列印方式:

LOG(INFO) << "remote_side=" << cntl->remote_side();
printf("remote_side=%sn", butil::endpoint2str(cntl->remote_side()).c_str());

獲取Client的地址和埠

r31384後通過local_side()方法可在RPC結束後獲得發起RPC的地址和埠。

列印方式:

LOG(INFO) << "local_side=" << cntl->local_side(); 
printf("local_side=%sn", butil::endpoint2str(cntl->local_side()).c_str());

應該重用brpc::Controller嗎?

不用刻意地重用,但Controller是個大雜燴,可能會包含一些快取,Reset()可以避免反覆地建立這些快取。

在大部分場景下,構造Controller(snippet1)和重置Controller(snippet2)的效能差異不大。

// snippet1
for (int i = 0; i < n; ++i) {
    brpc::Controller controller;
    ...
    stub.CallSomething(..., &controller);
}
 
// snippet2
brpc::Controller controller;
for (int i = 0; i < n; ++i) {
    controller.Reset();
    ...
    stub.CallSomething(..., &controller);
}

但如果snippet1中的Controller是new出來的,那麼snippet1就會多出“記憶體分配”的開銷,在一些情況下可能會慢一些。
設定

Client端的設定主要由三部分組成:

    brpc::ChannelOptions: 定義在src/brpc/channel.h中,用於初始化Channel,一旦初始化成功無法修改。
    brpc::Controller: 定義在src/brpc/controller.h中,用於在某次RPC中覆蓋ChannelOptions中的選項,可根據上下文每次均不同。
    全域性gflags:常用於調節一些底層程式碼的行為,一般不用修改。請自行閱讀服務/flags頁面中的說明。

Controller包含了request中沒有的資料和選項。server端和client端的Controller結構體是一樣的,但使用的欄位可能是不同的,你需要仔細閱讀Controller中的註釋,明確哪些欄位可以在server端使用,哪些可以在client端使用。

一個Controller對應一次RPC。一個Controller可以在Reset()後被另一個RPC複用,但一個Controller不能被多個RPC同時使用(不論是否在同一個執行緒發起)。

Controller的特點:

    一個Controller只能有一個使用者,沒有特殊說明的話,Controller中的方法預設執行緒不安全。
    因為不能被共享,所以一般不會用共享指標管理Controller,如果你用共享指標了,很可能意味著出錯了。
    Controller創建於開始RPC前,析構於RPC結束後,常見幾種模式:
        同步RPC前Controller放棧上,出作用域後自行析構。注意非同步RPC的Controller絕對不能放棧上,否則其析構時非同步呼叫很可能還在進行中,從而引發未定義行為。
        非同步RPC前new Controller,done中刪除。

執行緒數

和大部分的RPC框架不同,brpc中並沒有獨立的Client執行緒池。所有Channel和Server通過bthread共享相同的執行緒池. 如果你的程式同樣使用了brpc的server, 僅僅需要設定Server的執行緒數。 或者可以通過gflags設定-bthread_concurrency來設定全域性的執行緒數.
超時

ChannelOptions.timeout_ms是對應Channel上所有RPC的總超時,Controller.set_timeout_ms()可修改某次RPC的值。單位毫秒,預設值1秒,最大值2^31(約24天),-1表示一直等到回覆或錯誤。

ChannelOptions.connect_timeout_ms是對應Channel上所有RPC的連線超時,單位毫秒,預設值1秒。-1表示等到連線建立或出錯,此值被限制為不能超過timeout_ms。注意此超時獨立於TCP的連線超時,一般來說前者小於後者,反之則可能在connect_timeout_ms未達到前由於TCP連線超時而出錯。

注意1:brpc中的超時是deadline,超過就意味著RPC結束,超時後沒有重試。其他實現可能既有單次訪問的超時,也有代表deadline的超時。遷移到brpc時請仔細區分。

注意2:RPC超時的錯誤碼為ERPCTIMEDOUT (1008),ETIMEDOUT的意思是連線超時,且可重試。
重試

ChannelOptions.max_retry是該Channel上所有RPC的預設最大重試次數,Controller.set_max_retry()可修改某次RPC的值,預設值3,0表示不重試。

r32111後Controller.retried_count()返回重試次數。

r34717後Controller.has_backup_request()獲知是否傳送過backup_request。

重試時框架會盡量避開之前嘗試過的server。

重試的觸發條件有(條件之間是AND關係):
連接出錯

如果server一直沒有返回,但連線沒有問題,這種情況下不會重試。如果你需要在一定時間後傳送另一個請求,使用backup request。

工作機制如下:如果response沒有在backup_request_ms內返回,則傳送另外一個請求,哪個先回來就取哪個。新請求會被儘量送到不同的server。注意如果backup_request_ms大於超時,則backup request總不會被髮送。backup request會消耗一次重試次數。backup request不意味著server端cancel。

ChannelOptions.backup_request_ms影響該Channel上所有RPC,單位毫秒,預設值-1(表示不開啟),Controller.set_backup_request_ms()可修改某次RPC的值。
沒到超時

超時後RPC會盡快結束。
有剩餘重試次數

Controller.set_max_retry(0)或ChannelOptions.max_retry=0關閉重試。
錯誤值得重試

一些錯誤重試是沒有意義的,就不會重試,比如請求有錯時(EREQUEST)不會重試,因為server總不會接受,沒有意義。

使用者可以通過繼承brpc::RetryPolicy自定義重試條件。比如brpc預設不重試HTTP相關的錯誤,而你的程式中希望在碰到HTTP_STATUS_FORBIDDEN (403)時重試,可以這麼做:

#include <brpc/retry_policy.h>
 
class MyRetryPolicy : public brpc::RetryPolicy {
public:
    bool DoRetry(const brpc::Controller* cntl) const {
        if (cntl->ErrorCode() == brpc::EHTTP && // HTTP錯誤
            cntl->http_response().status_code() == brpc::HTTP_STATUS_FORBIDDEN) {
            return true;
        }
        // 把其他情況丟給框架。
        return brpc::DefaultRetryPolicy()->DoRetry(cntl);
    }
};
...
 
// 給ChannelOptions.retry_policy賦值就行了。
// 注意:retry_policy必須在Channel使用期間保持有效,Channel也不會刪除retry_policy,所以大部分情況下RetryPolicy都應以單例模式建立。
brpc::ChannelOptions options;
static MyRetryPolicy g_my_retry_policy;
options.retry_policy = &g_my_retry_policy;
...

一些提示:

    通過cntl->response()可獲得對應RPC的response。
    對ERPCTIMEDOUT代表的RPC超時總是不重試,即使你繼承的RetryPolicy中允許。

重試應當保守

由於成本的限制,大部分線上server的冗餘度是有限的,主要是滿足多機房互備的需求。而激進的重試邏輯很容易導致眾多client對server叢集造成2-3倍的壓力,最終使叢集雪崩:由於server來不及處理導致佇列越積越長,使所有的請求得經過很長的排隊才被處理而最終超時,相當於服務停擺。預設的重試是比較安全的: 只要連線不斷RPC就不會重試,一般不會產生大量的重試請求。使用者可以通過RetryPolicy定製重試策略,但也可能使重試變成一場“風暴”。當你定製RetryPolicy時,你需要仔細考慮client和server的協作關係,並設計對應的異常測試,以確保行為符合預期。
協議

Channel的預設協議是baidu_std,可通過設定ChannelOptions.protocol換為其他協議,這個欄位既接受enum也接受字串。

目前支援的有:

    PROTOCOL_BAIDU_STD 或 “baidu_std",即百度標準協議,預設為單連線。
    PROTOCOL_HULU_PBRPC 或 "hulu_pbrpc",hulu的協議,預設為單連線。
    PROTOCOL_NOVA_PBRPC 或 ”nova_pbrpc“,網盟的協議,預設為連線池。
    PROTOCOL_HTTP 或 ”http", http 1.0或1.1協議,預設為連線池(Keep-Alive)。具體方法見訪問HTTP服務。
    PROTOCOL_SOFA_PBRPC 或 "sofa_pbrpc",sofa-pbrpc的協議,預設為單連線。
    PROTOCOL_PUBLIC_PBRPC 或 "public_pbrpc",public_pbrpc的協議,預設為連線池。
    PROTOCOL_UBRPC_COMPACK 或 "ubrpc_compack",public/ubrpc的協議,使用compack打包,預設為連線池。具體方法見ubrpc (by protobuf)。相關的還有PROTOCOL_UBRPC_MCPACK2或ubrpc_mcpack2,使用mcpack2打包。
    PROTOCOL_NSHEAD_CLIENT 或 "nshead_client",這是傳送baidu-rpc-ub中所有UBXXXRequest需要的協議,預設為連線池。具體方法見訪問UB。
    PROTOCOL_NSHEAD 或 "nshead",這是傳送NsheadMessage需要的協議,預設為連線池。具體方法見nshead+blob 。
    PROTOCOL_MEMCACHE 或 "memcache",memcached的二進位制協議,預設為單連線。具體方法見訪問memcached。
    PROTOCOL_REDIS 或 "redis",redis 1.2後的協議(也是hiredis支援的協議),預設為單連線。具體方法見訪問Redis。
    PROTOCOL_NSHEAD_MCPACK 或 "nshead_mcpack", 顧名思義,格式為nshead + mcpack,使用mcpack2pb適配,預設為連線池。
    PROTOCOL_ESP 或 "esp",訪問使用esp協議的服務,預設為連線池。

連線方式

brpc支援以下連線方式:

    短連線:每次RPC前建立連線,結束後關閉連線。由於每次呼叫得有建立連線的開銷,這種方式一般用於偶爾發起的操作,而不是持續發起請求的場景。沒有協議預設使用這種連線方式,http 1.0對連線的處理效果類似短連結。
    連線池:每次RPC前取用空閒連線,結束後歸還,一個連線上最多隻有一個請求,一個client對一臺server可能有多條連線。http 1.1和各類使用nshead的協議都是這個方式。
    單連線:程序內所有client與一臺server最多隻有一個連線,一個連線上可能同時有多個請求,回覆返回順序和請求順序不需要一致,這是baidu_std,hulu_pbrpc,sofa_pbrpc協議的預設選項。

	短連線 	連線池 	單連線
長連線 	否 	是 	是
server端連線數(單client) 	qps*latency (原理見little's law) 	qps*latency 	1
極限qps 	差,且受限於單機埠數 	中等 	高
latency 	1.5RTT(connect) + 1RTT + 處理時間 	1RTT + 處理時間 	1RTT + 處理時間
cpu佔用 	高, 每次都要tcp connect 	中等, 每個請求都要一次sys write 	低, 合併寫出在大流量時減少cpu佔用

框架會為協議選擇預設的連線方式,使用者一般不用修改。若需要,把ChannelOptions.connection_type設為:

    CONNECTION_TYPE_SINGLE 或 "single" 為單連線

    CONNECTION_TYPE_POOLED 或 "pooled" 為連線池, 與單個遠端的最大連線數由-max_connection_pool_size控制:
    Name 	Value 	Description 	Defined At
    max_connection_pool_size (R) 	100 	maximum pooled connection count to a single endpoint 	src/brpc/socket.cpp

    CONNECTION_TYPE_SHORT 或 "short" 為短連線

    設定為“”(空字串)則讓框架選擇協議對應的預設連線方式。

brpc支援Streaming RPC,這是一種應用層的連線,用於傳遞流式資料。
關閉連線池中的閒置連線

當連線池中的某個連線在-idle_timeout_second時間內沒有讀寫,則被視作“閒置”,會被自動關閉。預設值為10秒。此功能只對連線池(pooled)有效。開啟-log_idle_connection_close在關閉前會列印一條日誌。
Name 	Value 	Description 	Defined At
idle_timeout_second 	10 	Pooled connections without data transmission for so many seconds will be closed. No effect for non-positive values 	src/brpc/socket_map.cpp
log_idle_connection_close 	false 	Print log when an idle connection is closed 	src/brpc/socket.cpp
延遲關閉連線

多個channel可能通過引用計數引用同一個連線,當引用某個連線的最後一個channel析構時,該連線將被關閉。但在一些場景中,channel在使用前才被建立,用完立刻析構,這時其中一些連線就會被無謂地關閉再被開啟,效果類似短連線。

一個解決辦法是使用者把所有或常用的channel快取下來,這樣自然能避免channel頻繁產生和析構,但目前brpc沒有提供這樣一個utility,使用者自己(正確)實現有一些工作量。

另一個解決辦法是設定全域性選項-defer_close_second
Name 	Value 	Description 	Defined At
defer_close_second 	0 	Defer close of connections for so many seconds even if the connection is not used by anyone. Close immediately for non-positive values 	src/brpc/socket_map.cpp

設定後引用計數清0時連線並不會立刻被關閉,而是會等待這麼多秒再關閉,如果在這段時間內又有channel引用了這個連線,它會恢復正常被使用的狀態。不管channel建立析構有多頻率,這個選項使得關閉連線的頻率有上限。這個選項的副作用是一些fd不會被及時關閉,如果延時被誤設為一個大數值,程式佔據的fd個數可能會很大。
連線的緩衝區大小

-socket_recv_buffer_size設定所有連線的接收緩衝區大小,預設-1(不修改)

-socket_send_buffer_size設定所有連線的傳送緩衝區大小,預設-1(不修改)
Name 	Value 	Description 	Defined At
socket_recv_buffer_size 	-1 	Set the recv buffer size of socket if this value is positive 	src/brpc/socket.cpp
socket_send_buffer_size 	-1 	Set send buffer size of sockets if this value is positive 	src/brpc/socket.cpp
log_id

通過set_log_id()可設定64位整型log_id。這個id會和請求一起被送到伺服器端,一般會被打在日誌裡,從而把一次檢索經過的所有服務串聯起來。字串格式的需要轉化為64位整形才能設入log_id。
附件

baidu_std和hulu_pbrpc協議支援附件,這段資料由使用者自定義,不經過protobuf的序列化。站在client的角度,設定在Controller::request_attachment()的附件會被server端收到,response_attachment()則包含了server端送回的附件。附件不受壓縮選項影響。

在http協議中,附件對應message body,比如要POST的資料就設定在request_attachment()中。
認證

client端的認證一般分為2種:

    基於請求的認證:每次請求都會帶上認證資訊。這種方式比較靈活,認證資訊中可以含有本次請求中的欄位,但是缺點是每次請求都會需要認證,效能上有所損失
    基於連線的認證:當TCP連線建立後,client傳送認證包,認證成功後,後續該連線上的請求不再需要認證。相比前者,這種方式靈活度不高(一般ren認證包裡只能攜帶本機一些靜態資訊),但效能較好,一般用於單連線/連線池場景

針對第一種認證場景,在實現上非常簡單,將認證的格式定義加到請求結構體中,每次當做正常RPC傳送出去即可;針對第二種場景,brpc提供了一種機制,只要使用者繼承實現:

class Authenticator {
public:
    virtual ~Authenticator() {}

    // Implement this method to generate credential information
    // into `auth_str' which will be sent to `VerifyCredential'
    // at server side. This method will be called on client side.
    // Returns 0 on success, error code otherwise
    virtual int GenerateCredential(std::string* auth_str) const = 0;
};

那麼當用戶併發呼叫RPC介面用單連線往同一個server發請求時,框架會自動保證:建立TCP連線後,連線上的第一個請求中會帶有上述GenerateCredential產生的認證包,其餘剩下的併發請求不會帶有認證資訊,依次排在第一個請求之後。整個傳送過程依舊是併發的,並不會等第一個請求先返回。若server端認證成功,那麼所有請求都能成功返回;若認證失敗,一般server端則會關閉連線,這些請求則會收到相應錯誤。

目前自帶協議中支援客戶端認證的有:baidu_std(預設協議), HTTP, hulu_pbrpc, ESP。對於自定義協議,一般可以在組裝請求階段,呼叫Authenticator介面生成認證串,來支援客戶端認證。
重置

呼叫Reset方法可讓Controller回到剛建立時的狀態。

別在RPC結束前重置Controller,行為是未定義的。
壓縮

set_request_compress_type()設定request的壓縮方式,預設不壓縮。

注意:附件不會被壓縮。

HTTP body的壓縮方法見client壓縮request body。

支援的壓縮方法有:

    brpc::CompressTypeSnappy : snanpy壓縮,壓縮和解壓顯著快於其他壓縮方法,但壓縮率最低。
    brpc::CompressTypeGzip : gzip壓縮,顯著慢於snappy,但壓縮率高
    brpc::CompressTypeZlib : zlib壓縮,比gzip快10%~20%,壓縮率略好於gzip,但速度仍明顯慢於snappy。

下表是多種壓縮演算法應對重複率很高的資料時的效能,僅供參考。
Compress method 	Compress size(B) 	Compress time(us) 	Decompress time(us) 	Compress throughput(MB/s) 	Decompress throughput(MB/s) 	Compress ratio
Snappy 	128 	0.753114 	0.890815 	162.0875 	137.0322 	37.50%
Gzip 	10.85185 	1.849199 	11.2488 	66.01252 	47.66% 	
Zlib 	10.71955 	1.66522 	11.38763 	73.30581 	38.28% 	
Snappy 	1024 	1.404812 	1.374915 	695.1555 	710.2713 	8.79%
Gzip 	16.97748 	3.950946 	57.52106 	247.1718 	6.64% 	
Zlib 	15.98913 	3.06195 	61.07665 	318.9348 	5.47% 	
Snappy 	16384 	8.822967 	9.865008 	1770.946 	1583.881 	4.96%
Gzip 	160.8642 	43.85911 	97.13162 	356.2544 	0.78% 	
Zlib 	147.6828 	29.06039 	105.8011 	537.6734 	0.71% 	
Snappy 	32768 	16.16362 	19.43596 	1933.354 	1607.844 	4.82%
Gzip 	229.7803 	82.71903 	135.9995 	377.7849 	0.54% 	
Zlib 	240.7464 	54.44099 	129.8046 	574.0161 	0.50% 	

下表是多種壓縮演算法應對重複率很低的資料時的效能,僅供參考。
Compress method 	Compress size(B) 	Compress time(us) 	Decompress time(us) 	Compress throughput(MB/s) 	Decompress throughput(MB/s) 	Compress ratio
Snappy 	128 	0.866002 	0.718052 	140.9584 	170.0021 	105.47%
Gzip 	15.89855 	4.936242 	7.678077 	24.7294 	116.41% 	
Zlib 	15.88757 	4.793953 	7.683384 	25.46339 	107.03% 	
Snappy 	1024 	2.087972 	1.06572 	467.7087 	916.3403 	100.78%
Gzip 	32.54279 	12.27744 	30.00857 	79.5412 	79.79% 	
Zlib 	31.51397 	11.2374 	30.98824 	86.90288 	78.61% 	
Snappy 	16384 	12.598 	6.306592 	1240.276 	2477.566 	100.06%
Gzip 	537.1803 	129.7558 	29.08707 	120.4185 	75.32% 	
Zlib 	519.5705 	115.1463 	30.07291 	135.697 	75.24% 	
Snappy 	32768 	22.68531 	12.39793 	1377.543 	2520.582 	100.03%
Gzip 	1403.974 	258.9239 	22.25825 	120.6919 	75.25% 	
Zlib 	1370.201 	230.3683 	22.80687 	135.6524 	75.21% 	
FAQ
Q: brpc能用unix domain socket嗎

不能。同機TCP socket並不走網路,相比unix domain socket效能只會略微下降。一些不能用TCP socket的特殊場景可能會需要,以後可能會擴充套件支援。
Q: Fail to connect to xx.xx.xx.xx:xxxx, Connection refused

一般是對端server沒開啟埠(很可能掛了)。
Q: 經常遇到至另一個機房的Connection timedout

img

這個就是連線超時了,調大連線和RPC超時:

struct ChannelOptions {
    ...
    // Issue error when a connection is not established after so many
    // milliseconds. -1 means wait indefinitely.
    // Default: 200 (milliseconds)
    // Maximum: 0x7fffffff (roughly 30 days)
    int32_t connect_timeout_ms;
    
    // Max duration of RPC over this Channel. -1 means wait indefinitely.
    // Overridable by Controller.set_timeout_ms().
    // Default: 500 (milliseconds)
    // Maximum: 0x7fffffff (roughly 30 days)
    int32_t timeout_ms;
    ...
};

注意: 連線超時不是RPC超時,RPC超時列印的日誌是"Reached timeout=..."。
Q: 為什麼同步方式是好的,非同步就crash了

重點檢查Controller,Response和done的生命週期。在非同步訪問中,RPC呼叫結束並不意味著RPC整個過程結束,而是在進入done->Run()時才會結束。所以這些物件不應在呼叫RPC後就釋放,而是要在done->Run()裡釋放。你一般不能把這些物件分配在棧上,而應該分配在堆上。詳見非同步訪問。
Q: 怎麼確保請求只被處理一次

這不是RPC層面的事情。當response返回且成功時,我們確認這個過程一定成功了。當response返回且失敗時,我們確認這個過程一定失敗了。但當response沒有返回時,它可能失敗,也可能成功。如果我們選擇重試,那一個成功的過程也可能會被再執行一次。一般來說帶副作用的RPC服務都應當考慮冪等問題,否則重試可能會導致多次疊加副作用而產生意向不到的結果。只有讀的檢索服務大都沒有副作用而天然冪等,無需特殊處理。而帶寫的儲存服務則要在設計時就加入版本號或序列號之類的機制以拒絕已經發生的過程,保證冪等。
Q: Invalid address=`bns://group.user-persona.dumi.nj03'

FATAL 04-07 20:00:03 7778 src/brpc/channel.cpp:123] Invalid address=`bns://group.user-persona.dumi.nj03'. You should use Init(naming_service_name, load_balancer_name, options) to access multiple servers.

訪問名字服務要使用三個引數的Init,其中第二個引數是load_balancer_name,而這裡用的是兩個引數的Init,框架認為是訪問單點,就會報這個錯。
Q: 兩端都用protobuf,為什麼不能互相訪問

協議 !=protobuf。protobuf負責一個包的序列化,協議中的一個訊息可能會包含多個protobuf包,以及額外的長度、校驗碼、magic number等等。打包格式相同不意味著協議可以互通。在brpc中寫一份程式碼就能服務多協議的能力是通過把不同協議的資料轉化為統一的程式設計介面完成的,而不是在protobuf層面。
Q: 為什麼C++ client/server 能夠互相通訊, 和其他語言的client/server 通訊會報序列化失敗的錯誤

檢查一下C++ 版本是否開啟了壓縮 (Controller::set_compress_type), 目前其他語言的rpc框架還沒有實現壓縮,互相返回會出現問題。
附:Client端基本流程

img

主要步驟:

    建立一個bthread_id作為本次RPC的correlation_id。
    根據Channel的建立方式,從程序級的SocketMap中或從LoadBalancer中選擇一臺下游server作為本次RPC傳送的目的地。
    根據連線方式(單連線、連線池、短連線),選擇一個Socket。
    如果開啟驗證且當前Socket沒有被驗證過時,第一個請求進入驗證分支,其餘請求會阻塞直到第一個包含認證資訊的請求寫入Socket。server端只對第一個請求進行驗證。
    根據Channel的協議,選擇對應的序列化函式把request序列化至IOBuf。
    如果配置了超時,設定定時器。從這個點開始要避免使用Controller物件,因為在設定定時器後隨時可能觸發超時->呼叫到使用者的超時回撥->使用者在回撥中析構Controller。
    傳送準備階段結束,若上述任何步驟出錯,會呼叫Channel::HandleSendFailed。
    將之前序列化好的IOBuf寫出到Socket上,同時傳入回撥Channel::HandleSocketFailed,當連線斷開、寫失敗等錯誤發生時會呼叫此回撥。
    如果是同步傳送,Join correlation_id;否則至此CallMethod結束。
    網路上發訊息+收訊息。
    收到response後,提取出其中的correlation_id,在O(1)時間內找到對應的Controller。這個過程中不需要查詢全域性雜湊表,有良好的多核擴充套件性。
    根據協議格式反序列化response。
    呼叫Controller::OnRPCReturned,可能會根據錯誤碼判斷是否需要重試,或讓RPC結束。如果是非同步傳送,呼叫使用者回撥。最後摧毀correlation_id喚醒Join著的執行緒。

/********** client **********/

搭建好Brpc的環境後,建立echo.proto並輸入內容:

option cc_generic_services = true;

message EchoRequest {
      required string message = 1;
};

message EchoResponse {
      required string message = 1;
};

service EchoService {
      rpc Echo(EchoRequest) returns (EchoResponse);
};

proto檔案的語法規則可參考:https://www.cnblogs.com/yinheyi/p/6080244.html

命令 protoc echo.proto --cpp_out=. 在當前路徑生成用於C++的標頭檔案和原始檔echo.pb.h和echo.pb.cc。

伺服器程式碼:

#include "../../output/include/butil/logging.h"
#include "../../output/include/brpc/server.h"
#include "echo.pb.h"

class MyEchoService : public EchoService
{
public:
    MyEchoService() {};
    virtual ~MyEchoService() {};
    virtual void Echo(::google::protobuf::RpcController* controller,
                      const ::EchoRequest* request,
                      ::EchoResponse* response,
                      ::google::protobuf::Closure* done)
    {
        brpc::ClosureGuard done_guard(done);
        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
        response->set_message(request->message());

        LOG(INFO) << "remote_side=" << cntl->remote_side();
        printf("remote_side=%sn", 
               butil::endpoint2str(cntl->remote_side()).c_str());
    }
};

int main(int argc, char* argv[])
{
    brpc::Server server;
    MyEchoService myechoservice;

    if(server.AddService(&myechoservice, brpc::SERVER_OWNS_SERVICE) != 0)
    {
        LOG(FATAL) << "Fail to add myechoservice";
        return -1;
    }

    if(server.Start("localhost:9000", NULL) != 0)
    {
        LOG(ERROR) << "Fail to start EchoServer";
        return -1;
    }

    printf("Server starting...n");
    server.RunUntilAskedToQuit();

    return 0;
}

客戶端程式碼:

#include "../../output/include/butil/logging.h"
#include "../../output/include/brpc/channel.h"
#include "echo.pb.h"

int main(int argc, char* argv[])
{
    brpc::Channel channel;

    if(channel.Init("localhost:9000", NULL) != 0)
    {
        LOG(ERROR) << "Fail to initialize channel";
        return -1;
    }

    EchoService_Stub stub(&channel);
    EchoRequest request;
    EchoResponse response;
    brpc::Controller cntl;

    request.set_message(argv[1]);
    stub.Echo(&cntl, &request, &response, NULL);
    if(!cntl.Failed())
    {
        LOG(INFO) << "Received from " << cntl.remote_side()
            << " to " << cntl.local_side();
        printf("Response : %sn", response.message().c_str());
    }
    else
        LOG(WARNING) << cntl.ErrorText();

    return 0;
}

g++ -L/usr/lib/x86_64-linux-gnu -L../../output/lib -Xlinker "-(" echo.pb.o server.o -Wl,-Bstatic -lgflags -lprotobuf -lleveldb -lsnappy -lbrpc -Wl,-Bdynamic -Xlinker "-)" -lpthread -lrt -lssl -lcrypto -ldl -lz -o echo_server

效果圖:

我的程式碼:https://github.com/gongluck/CodeBase/tree/master/notes/brpc-notes/brpc/example/echo_c%2B%2B

當然,這個只是最簡單的例子,更復雜的應用還是要檢視:https://github.com/brpc/brpc.git