spp proxy 原始碼分析
阿新 • • 發佈:2018-12-25
spp proxy demo
#include "defaultproxy.h"
using namespace spp::proxy;
int main(int argc, char* argv[])
{
CServerBase* proxy = new CDefaultProxy;
proxy->run(argc, argv);
delete proxy;
return 0;
}
原始碼分析
spp_rpc/src/proxy/defaultproxy.h
namespace spp
{
namespace proxy
{
// Default proxy (excerpt of the class declaration). Derives from CServerBase
// and CFrame, and holds one acceptor-side communicator (ator_), a map of
// connector-side communicators (ctor_) keyed by int, and IP-restriction data.
class CDefaultProxy : public CServerBase, public CFrame
{
public:
// Constructor + destructor (not shown in this excerpt)
// Function that actually runs the proxy
void realrun(int argc, char* argv[]);
// Service container type (not shown in this excerpt)
// Get the address of an interface
unsigned GetListenIp(const char *intf);
int loop();
// Initialize configuration
int initconf(bool reload = false);
// Callback functions
static int ator_recvdata(unsigned flow, void* arg1, void* arg2); //required
static int ctor_recvdata(unsigned flow, void* arg1, void* arg2); //required
static int ator_recvdata_v2(unsigned flow, void* arg1, void * arg2); //required
// ...optional callback functions
// Acceptor; communicates with the controller
CTCommu* ator_;
// Connectors: shared-memory queues used to communicate with the workers
map<int, CTCommu*> ctor_;
// IP restriction type
unsigned char iplimit_;
// Set of IPs
set<unsigned> iptable_;
spp_handle_process_t local_handle;
};
}
}
spp_rpc/src/proxy/defaultproxy.cpp
/**
 * Main loop of the proxy.
 *
 * Sets up the log prototype and configuration, then loops forever:
 * polls the acceptor, polls every connector, periodically runs the monitor
 * client, and calls loop() once per iteration. When CServerBase::quit()
 * reports true, the connectors are polled for 20 more rounds (20 ms apart)
 * and the loop ends. Finally the optional spp_handle_fini hook is invoked.
 *
 * @param argc  not referenced in the visible body
 * @param argv  not referenced in the visible body
 */
void CDefaultProxy::realrun(int argc, char* argv[])
{
    // Initialize configuration
    SingleTon<CTLog, CreateByProto>::SetProto(&flog_);
    initconf(false);

    ix_->fstat_inter_time_ = 5; // collect framework statistics every 5 seconds

    // NOTE(review): fstattime and the statistics buffers below are not
    // referenced by the visible code; presumably they belong to the
    // statistics output that this excerpt elides - confirm before removing.
    time_t fstattime = 0;
    time_t montime = 0;
    static char statbuff[PROXY_STAT_BUF_SIZE];
    char* pstatbuff = (char*)statbuff;
    int bufflen = 0;
    char ctor_stat[1 << 16] = {0};
    int ctor_stat_len = 0;

    int processed = 0;

    while (true)
    {
        // Poll the acceptor. The flag is true only when the connector polls
        // of the previous round returned 0 in total.
        ator_->poll(processed == 0); // need sleep ?
        processed = 0;

        // Poll the connectors
        map<int, CTCommu*>::iterator it;
        for (it = ctor_.begin(); it != ctor_.end(); ++it)
        {
            spp_global::set_cur_group_id(it->first);
            processed += it->second->poll(true); // block
        }

        if (unlikely(get_time_s() - montime > ix_->moni_inter_time_))
        {
            // Report information to the ctrl process
            CLI_SEND_INFO(&moncli_)->timestamp_ = get_time_s();
            moncli_.run();

            // Remember when we last reported. Without this, montime stays 0,
            // the condition above never becomes false again, and the report
            // would be sent on every loop iteration.
            montime = get_time_s();

            // Output monitoring statistics (elided in this excerpt)
        }

        // Check the quit signal
        if (unlikely(CServerBase::quit()))
        {
            // Keep polling the connectors for 20 rounds of 20 ms each
            // (20ms * 20) before leaving the main loop.
            int count = 20;
            while (count--)
            {
                for (it = ctor_.begin(); it != ctor_.end(); ++it)
                {
                    it->second->poll(true); // block
                }
                usleep(20000);
            }
            break;
        }

        loop();
    }

    // Call the optional finalization hook if one is set.
    if (sppdll.spp_handle_fini != NULL)
        sppdll.spp_handle_fini(NULL, this);
}
當proxy接收到使用者請求時觸發的回撥函式:解析請求後,將資料轉交給對應的worker進行處理
// Acceptor-side receive callback (excerpt; parts are elided with "// ...").
//
// arg1 is treated as a blob_type* and arg2 as the CDefaultProxy instance.
// While the blob still has bytes and spp_handle_input() returns a positive
// length, one message of that length is copied (followed by a TConnExtInfo
// taken from blob->extdata) into a send blob, a route number is chosen, and
// the send blob is handed to the connector registered under that route
// number. The blob's data pointer and length are then advanced past the
// consumed message.
//
// NOTE(review): no return statement is visible; it is presumably part of the
// elided tail - confirm. total_len, processed_len and handle_exception are
// not referenced in the visible code either.
int CDefaultProxy::ator_recvdata_v2(unsigned flow, void* arg1, void* arg2)
{
blob_type* blob = (blob_type*)arg1;
CDefaultProxy* proxy = (CDefaultProxy*)arg2;
proxy->flog_.LOG_P_FILE(LOG_DEBUG, "flow:%u,blob len:%d\n", flow, blob->len);
int total_len = blob->len;
int processed_len = 0;
int proto_len = -1;
int ret = 0;
int handle_exception = 0;
// proto_len is the value returned by spp_handle_input for the current data;
// the loop stops as soon as it is not positive or the blob is empty.
while (blob->len > 0 && (proto_len = sppdll.spp_handle_input(flow, arg1, arg2)) > 0)
{
// Exception handling
// ...
// Pick the route number
unsigned route_no;
if (!sppdll.spp_handle_route)
{
// If spp_handle_route is not implemented, always pick the first one
route_no = 1;
}
else
{
// Mask with 0x7FFF to strip the GROUPID and stay compatible
route_no = sppdll.spp_handle_route(flow, arg1, arg2) & 0x7FFF;
}
blob_type sendblob;
// Allocate space (the declaration/allocation of data is not shown in this excerpt)
// NOTE(review): when data is NULL, sendblob is left unset in the visible
// code but is still passed to sendto() below - confirm the elided code
// handles that case.
if (data != NULL)
{
// Fill in sendblob: the message bytes followed by the connection ext info
memcpy(data, blob->data, proto_len);
memcpy(data + proto_len, blob->extdata, sizeof(TConnExtInfo));
sendblob.data = data;
sendblob.len = proto_len + sizeof(TConnExtInfo);
}
// Select the worker matching route_no and write the data into the shared
// memory associated with that worker
map<int, CTCommu*>::iterator iter;
if ((iter = proxy->ctor_.find(route_no)) != proxy->ctor_.end())
{
MONITOR(MONITOR_PROXY_TO_WORKER);
ret = iter->second->sendto(flow, &sendblob, NULL);
}
// Exception handling
// Advance past the message that was just consumed
blob->data += proto_len;
blob->len -= proto_len;
}
// ...
}
當proxy接收到ctl的心跳詢問,會主動回包維持心跳
int CDefaultProxy::ctor_recvdata(unsigned flow, void* arg1, void* arg2)
{
blob_type* blob = (blob_type*)arg1;
CDefaultProxy* proxy = (CDefaultProxy*)arg2;
// 維持心跳
int ret = proxy->ator_->sendto(flow, arg1, NULL);
return 0;
}