百度開源高效能RPC框架 sofa-pbrpc
簡介
sofa-pbrpc是基於Google Protocol Buffers 實現的RPC網路通訊庫,在百度公司各部門得到廣泛使用,每天支撐上億次內部呼叫。sofa-pbrpc基於百度大搜索高併發高負載的業務場景不斷打磨,成為一套簡單易用的輕量級高效能RPC框架。2014年sofa-pbrpc正式對外開源受到廣大開發人員的關注,目前sofa-pbrpc已經在浪潮、金山、樂視等各大網際網路公司產品中使用。
目標
- 輕量
- 易用
- 高效能
特性
- 介面簡單,容易使用
- 實現高效,效能優異(高吞吐、低延遲、高併發連線數)
- 測試完善,執行穩定
- 支援同步和非同步呼叫,滿足不同型別需求
- 支援多級超時設定,靈活控制請求超時時間
- 支援精準的網路流量控制,對應用層透明
- 支援透明壓縮傳輸,節省頻寬
- 提供服務和方法級別的服務呼叫統計資訊,方便監控
- 支援自動建立連線和自動重連,使用者無需感知連線
- 遠端地址相同的Client Stub共享一個連線通道,節省資源
- 空閒連線自動關閉,及時釋放資源
- 支援Mock測試
- 支援多Server負載均衡與容錯
- 原生支援HTTP協議訪問
- 提供內建的Web監控頁面
- 提供Python客戶端庫
- 支援webservice,使用者快速定義web server處理邏輯
- 支援profiling,實時檢視程式的資源消耗,方便問題追查
快速使用
使用sofa-pbrpc只需要三步:
- 定義通訊協議
- 實現Server
- 實現Client
樣例程式碼參見“sample/echo”。
定義通訊協議
定義協議只需要編寫一個proto檔案即可。 範例:echo_service.proto
package sofa.pbrpc.test;
option cc_generic_services = true;
message EchoRequest {
required string message = 1;
}
message EchoResponse {
required string message = 1;
}
service EchoServer {
rpcEcho(EchoRequest)returns(EchoResponse);
}
使用protoc編譯'echo_service.proto',生成介面檔案'echo_service.pb.h'和'echo_service.pb.cc'。
注意:
- package會被對映到C++中的namespace,為了避免衝突建議使用package;
- 需要設定“cc_generic_services”,以通知protoc工具生成RPC框架程式碼;
- 這裡EchoRequest和EchoResponse的成員完全相同,在實際應用中可以設定不同的成員;
實現Server
標頭檔案
#include<sofa/pbrpc/pbrpc.h>// sofa-pbrpc標頭檔案
#include"echo_service.pb.h"// service介面定義標頭檔案
實現服務
class EchoServerImpl : public sofa::pbrpc::test::EchoServer
{
public:
EchoServerImpl() {}
virtual ~EchoServerImpl() {}
private:
virtualvoidEcho(google::protobuf::RpcController* controller,
constsofa::pbrpc::test::EchoRequest* request,
sofa::pbrpc::test::EchoResponse* response,
google::protobuf::Closure* done){
sofa::pbrpc::RpcController* cntl =
static_cast<sofa::pbrpc::RpcController*>(controller);
SLOG(NOTICE, "Echo(): request message from %s: %s",
cntl->RemoteAddress().c_str(), request->message().c_str());
response->set_message("echo message: " + request->message());
done->Run();
}
};
注意:
- 服務完成後必須呼叫done->Run(),通知RPC系統服務完成,觸發傳送Response;
- 在調了done->Run()之後,Echo的所有四個引數都不再能訪問; done-Run()可以分派到其他執行緒中執行,以實現了真正的非同步處理;
註冊和啟動服務
intmain(){
SOFA_PBRPC_SET_LOG_LEVEL(NOTICE);
sofa::pbrpc::RpcServerOptions options;
options.work_thread_num = 8;
sofa::pbrpc::RpcServer rpc_server(options);
if (!rpc_server.Start("0.0.0.0:12321")) {
SLOG(ERROR, "start server failed");
return EXIT_FAILURE;
}
sofa::pbrpc::test::EchoServer* echo_service = new EchoServerImpl();
if (!rpc_server.RegisterService(echo_service)) {
SLOG(ERROR, "register service failed");
return EXIT_FAILURE;
}
rpc_server.Run();
rpc_server.Stop();
return EXIT_SUCCESS;
}
實現Client
Client支援同步和非同步兩種呼叫方式:
- 同步呼叫時,呼叫執行緒會被阻塞,直到收到回覆或者超時;
- 非同步呼叫時,呼叫執行緒不會被阻塞,收到回覆或者超時會呼叫使用者提供的回撥函式;
標頭檔案
#include<sofa/pbrpc/pbrpc.h>// sofa-pbrpc標頭檔案
#include"echo_service.pb.h"// service介面定義標頭檔案
同步呼叫
intmain(){
SOFA_PBRPC_SET_LOG_LEVEL(NOTICE);
sofa::pbrpc::RpcClientOptions client_options;
client_options.work_thread_num = 8;
sofa::pbrpc::RpcClient rpc_client(client_options);
sofa::pbrpc::RpcChannel rpc_channel(&rpc_client, "127.0.0.1:12321");
sofa::pbrpc::test::EchoServer_Stub stub(&rpc_channel);
sofa::pbrpc::test::EchoRequest request;
request.set_message("Hello world!");
sofa::pbrpc::test::EchoResponse response;
sofa::pbrpc::RpcController controller;
controller.SetTimeout(3000);
stub.Echo(&controller, &request, &response, NULL);
if (controller.Failed()) {
SLOG(ERROR, "request failed: %s", controller.ErrorText().c_str());
}
return EXIT_SUCCESS;
}
非同步呼叫
voidEchoCallback(sofa::pbrpc::RpcController* cntl,
sofa::pbrpc::test::EchoRequest* request,
sofa::pbrpc::test::EchoResponse* response,
bool* callbacked){
SLOG(NOTICE, "RemoteAddress=%s", cntl->RemoteAddress().c_str());
SLOG(NOTICE, "IsRequestSent=%s", cntl->IsRequestSent() ? "true" : "false");
if (cntl->IsRequestSent())
{
SLOG(NOTICE, "LocalAddress=%s", cntl->LocalAddress().c_str());
SLOG(NOTICE, "SentBytes=%ld", cntl->SentBytes());
}
if (cntl->Failed()) {
SLOG(ERROR, "request failed: %s", cntl->ErrorText().c_str());
}
else {
SLOG(NOTICE, "request succeed: %s", response->message().c_str());
}
delete cntl;
delete request;
delete response;
*callbacked = true;
}
intmain(){
SOFA_PBRPC_SET_LOG_LEVEL(NOTICE);
sofa::pbrpc::RpcClientOptions client_options;
sofa::pbrpc::RpcClient rpc_client(client_options);
sofa::pbrpc::RpcChannel rpc_channel(&rpc_client, "127.0.0.1:12321");
sofa::pbrpc::test::EchoServer_Stub stub(&rpc_channel);
sofa::pbrpc::test::EchoRequest* request = new sofa::pbrpc::test::EchoRequest();
request->set_message("Hello from qinzuoyan01");
sofa::pbrpc::test::EchoResponse* response = new sofa::pbrpc::test::EchoResponse();
sofa::pbrpc::RpcController* cntl = new sofa::pbrpc::RpcController();
cntl->SetTimeout(3000);
bool callbacked = false;
google::protobuf::Closure* done = sofa::pbrpc::NewClosure(
&EchoCallback, cntl, request, response, &callbacked);
stub.Echo(cntl, request, response, done);
while (!callbacked) {
usleep(100000);
}
return EXIT_SUCCESS;
}
注意:
- 非同步呼叫傳入的controller、request、response引數,在回撥函式執行之前需一直保持有效;
- 回撥函式的執行會分配到專門的回撥執行緒中執行,可以通過設定RpcClientOptions的callback_thread_num來配置回撥執行緒數;
實現
系統結構
- RpcClientStream/RpcServerStream:代表client和server之間的連線,用於client和server的網路通訊。
- ThreadGroup:client和server內部執行緒池,用於io操作和執行回撥。
- TimeoutManager:採用訂閱者模型,對rpc請求進行超時管理。
- RpCListenser:接受來自client的連線請求,建立與client之間的連線。
- ServicePool:server端服務管理與路由。
整個RPC呼叫經過以下階段:
- Stub呼叫RPC函式發起RPC請求.
- RpcChannel呼叫CallMethod執行RPC呼叫。
- RpcClient選取RpcClientStream非同步傳送請求,並新增至超時佇列。
- server端RpcListener接收到client的請求,建立對應RpcServerStream。
- RpcServerStream接收資料,根據meta資訊在ServerPool中選取對應Service.Method執行。
- server通過RpcServerStream傳送執行結果,回覆過程與請求過程類似。
技術特點
協議棧方式的網路模型
在sofa-pbrpc中網路資料自上而下流劃分為RpcClientStream/RpcServerStream、RpcMessageStream、RpcByteStream三層。訊息流層主要負責網路通訊相關的操作,操作物件為序列化之後的二機制位元組流;訊息流層處理的物件是由header、meta和data組裝的訊息,負責訊息級別的控制與統計;協議層負責非同步傳送接受請求和響應資料。三層結構每一層是下一層的封裝和擴充套件,採用這樣協議棧方式的層次劃分更加有利於資料協議的擴充套件。
ZeroCopy方式管理緩衝區。
sofa-pbrpc將記憶體劃分為固定大小的buffer作為緩衝區,對buffer採用引用計數進行管理,減少不必要的記憶體拷貝。
支援HTTP協議
除了使用原生client訪問server外,sofa-pbrpc也支援使用http協議訪問server上的服務。同時,使用者可以通過使用server端的WebService工具類,快速實現server的對於http請求的處理邏輯。
支援json格式資料傳輸
sofa-pbrpc支援使用者使用http客戶端向server傳送json格式的資料請求,並返回json格式的響應。
提供豐富的工具類
sofa-pbrpc提供常用工具類給開發者,包括:
效能
測試環境
- cpu 16core
- memory 64G
- kernel 2.6.32_1-15-0-0