spp worker 原始碼分析
阿新 • • 發佈:2018-12-25
spp worker Demo
spp_rpc/src/worker/main.cpp
main函式執行入口:
#include "defaultworker.h"
#include "comm_def.h"
using namespace spp;
using namespace spp::worker;
CServerBase* g_worker = NULL;
// ...
int main(int argc, char* argv[])
{
// 生成worker
g_worker = new CDefaultWorker;
if (g_worker)
{
// worker 執行並且阻塞
g_worker->run(argc, argv);
// worker 結束執行釋放記憶體
delete g_worker;
}
return 0;
}
spp_rpc/src/worker/defaultworker.h
namespace spp
{
namespace worker
{
class CDefaultWorker : public CServerBase, public CFrame
{
public:
// 建構函式+解構函式
// 獲取是否啟用微執行緒
bool get_mt_flag();
// 微執行緒切換函式
void handle_switch(bool block);
// 重寫CServerBase的realrun函式
void realrun(int argc, char* argv[]);
// 定義服務型別
// 註冊spp框架訊號處理函式
void assign_signal(int signo);
// 框架迴圈呼叫的邏輯,用於config reload
int loop();
//初始化配置
int initconf(bool reload = false);
static void shm_delay_stat(int64_t time_delay)
{
if(time_delay <= 1)
{
MONITOR(MONITOR_WORKER_RECV_DELAY_1);
}
else if(time_delay <= 10)
{
MONITOR(MONITOR_WORKER_RECV_DELAY_10);
}
else if(time_delay <= 50)
{
MONITOR(MONITOR_WORKER_RECV_DELAY_50);
}
else if(time_delay <= 100)
{
MONITOR(MONITOR_WORKER_RECV_DELAY_100);
}
else
{
MONITOR(MONITOR_WORKER_RECV_DELAY_XXX);
}
}
//一些回撥函式
static int ator_recvdata(unsigned flow, void* arg1, void* arg2); //必要
static int ator_recvdata_v2(unsigned flow, void* arg1, void* arg2); //必要
static int ator_senddata(unsigned flow, void* arg1, void* arg2); //非必要
static int ator_overload(unsigned flow, void* arg1, void* arg2); //非必要
static int ator_senderror(unsigned flow, void* arg1, void* arg2); //必要
//接受者
CTCommu* ator_;
inline int get_TOS(){return TOS_;}
private:
unsigned msg_timeout_;
int TOS_;
int notify_fd_; // socket commu need notify mircro thread
};
}
}
spp_rpc/src/comm/serverbase.h
namespace spp {
namespace comm {
// 伺服器程式基礎類,包含執行環境初始化、日誌、統計、監控物件
class CServerBase {
public:
// 建構函式+解構函式
// 可重寫的虛擬函式
virtual void run(int argc, char *argv[]);
virtual void startup(bool bg_run = true);
virtual void realrun(int argc, char *argv[]) {}
// 業務名和服務型別的描述,估計會和業務監控日誌以及部署相關聯
// 業務日誌
CTLog log_;
// 統計
CTStat stat_;
// 監控
CTProcMonCli moncli_;
protected:
// 內部監控時間間隔
public:
///////////////////////////////////////////////////////////////////////
// 服務reload退出以及相關訊號處理
static bool reload();
static bool quit();
static void sigusr1_handle(int signo);
static void sigusr2_handle(int signo);
};
}
}
看看比較重要的realrun虛擬函式的實現
spp_rpc/src/worker/defaultworker.cpp
void CDefaultWorker::realrun(int argc, char* argv[])
{
// 初始化配置
SingleTon<CTLog, CreateByProto>::SetProto(&flog_);
initconf(false);
time_t nowtime = time(NULL), montime = 0;
int64_t now_ms = 0;
while (true)
{
///< start: micro thread handle loop entry add 20130715
if (sppdll.spp_handle_loop)
{
sppdll.spp_handle_loop(this);
}
///< end: micro thread handle loop entry 20130715
// == 0 時,表示沒取到請求,進入較長時間非同步等待
bool isBlock = (ator_->poll(false) == 0);
static CSyncFrame* sync_frame = CSyncFrame::Instance();
sync_frame->HandleSwitch(isBlock);
// Check and reconnect net proxy, default 10 ms
now_ms = get_time_ms();
// 檢查quit訊號
if (unlikely(CServerBase::quit()) || unlikely(CServerBase::reload()))
{
now_ms = get_time_ms();
// 保證剩下的請求都處理完
if (unlikely(CServerBase::quit()))
{
flog_.LOG_P_PID(LOG_FATAL, "recv quit signal at %u\n", now_ms);
ator_->poll(true);
}
else
{
flog_.LOG_P_PID(LOG_FATAL, "recv reload signal at %u\n", now_ms);
}
int timeout = 0;
//微執行緒
while (CSyncFrame::Instance()->GetThreadNum() > 1 && timeout < 1000)
{
CSyncFrame::Instance()->sleep(10000);
timeout += 10;
}
now_ms = get_time_ms();
flog_.LOG_P_PID(LOG_FATAL, "exit at %u\n", now_ms);
break;
}
//監控資訊上報
nowtime = time(NULL);
if ( unlikely(nowtime - montime > ix_->moni_inter_time_) )
{
CLI_SEND_INFO(&moncli_)->timestamp_ = nowtime;
moncli_.run();
montime = nowtime;
flog_.LOG_P_PID(LOG_DEBUG, "moncli run!\n");
}
loop();
}
g_check_point = CoreCheckPoint_HandleFini; // 設定待呼叫外掛的CheckPoint
if (sppdll.spp_handle_fini != NULL)
// 呼叫spp_handle_fini函式釋放資源
sppdll.spp_handle_fini(NULL, this);
g_check_point = CoreCheckPoint_SppFrame; // 恢復CheckPoint,重置為CoreCheckPoint_SppFrame
CStatMgrInstance::destroy();
}
ator_ 型別為CTCommu*,負責接受訊號。
//通訊類抽象介面
class CTCommu
{
public:
CTCommu() {
memset(func_list_, 0, sizeof(cb_func) *(CB_TIMEOUT + 1));
memset(func_args_, 0, sizeof(void*) *(CB_TIMEOUT + 1));
}
virtual ~CTCommu() {}
//初始化
//config:配置檔名或者配置引數記憶體指標
virtual int init(const void* config) = 0;
//輪詢,收發資料
//block: true表示使用阻塞模式,否則非阻塞模式
virtual int poll(bool block = false) = 0;
//傳送資料提交
//flow: 資料包唯一標示
//arg1: 通用引數指標1, 一般指向資料blob
//arg2: 通用引數指標2,保留
virtual int sendto(unsigned flow, void* arg1, void* arg2) = 0;
//控制介面
//flow: 資料包唯一標示
//type: 控制命令
//arg1: 通用引數指標1,具體元件有具體的含義
//arg2: 通用引數指標2,具體元件有具體的含義
virtual int ctrl(unsigned flow, ctrl_type type, void* arg1, void* arg2) = 0;
//註冊回撥
//type: 回撥函式型別
//func: 回撥函式
//args: 使用者自定義引數指標, 作為回撥函式的第2個通用引數傳遞
virtual int reg_cb(cb_type type, cb_func func, void* args = NULL) {
if (type <= CB_TIMEOUT) {
func_list_[type] = func;
func_args_[type] = args;
return 0;
} else {
return -1;
}
}
//清空所有共享記憶體佇列,僅供proxy啟動時使用
virtual int clear() = 0;
protected:
cb_func func_list_[CB_TIMEOUT + 1];
void* func_args_[CB_TIMEOUT + 1];
//釋放資源
virtual void fini() = 0;
};
ator_->poll(false) 按照註解說明,使用非阻塞模式接受資料。
那麼ator 具體實現是initconf中
spp_rpc/src/worker/defaultworker.cpp
類比golang的gpm模型
// 使用CTShmCommu實現
ator_ = new CTShmCommu;
ret = ator_->init(&shm);
// 註冊了一些資料操作的回撥
...
// 呼叫了spp_handle_init函式
handle_init_ret = sppdll.spp_handle_init((void*)module_etc.c_str(), this);
spp_rpc/src/comm/tbase/tshmcommu.cpp
有空再分析如何實現poll。
[poll 實現](https://github.com/Tencent/MSEC/blob/master/document/msec/cpp_dev_manual.md)
spp_rpc/src/comm/serverbase.cpp
run->startup+realrun
void CServerBase::run(int argc, char* argv[])
{
// 啟動引數解析
startup(true);
realrun(argc, argv);
}
// 訊號處理
void CServerBase::startup(bool bg_run)
{
//預設需要root許可權才能setrlimit
struct rlimit rlim;
if (0 == getrlimit(RLIMIT_NOFILE, &rlim))
{
rlim.rlim_cur = rlim.rlim_max;
setrlimit(RLIMIT_NOFILE, &rlim);
if (rlim.rlim_cur < 100000) // fix for limits over 100000
{
rlim.rlim_cur = 100000;
rlim.rlim_max = 100000;
setrlimit(RLIMIT_NOFILE, &rlim);
}
}
mallopt(M_MMAP_THRESHOLD, 1024*1024); // 1MB,防止頻繁mmap
mallopt(M_TRIM_THRESHOLD, 8*1024*1024); // 8MB,防止頻繁brk
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
signal(SIGTTOU, SIG_IGN);
signal(SIGTTIN, SIG_IGN);
signal(SIGCHLD, SIG_IGN);
if (bg_run)
{
signal(SIGINT, SIG_IGN);
signal(SIGTERM, SIG_IGN);
daemon(1, 1);
}
CServerBase::flag_ = 0;
//signal(SIGSEGV, CServerBase::sigsegv_handle);
signal(SIGUSR1, CServerBase::sigusr1_handle);
signal(SIGUSR2, CServerBase::sigusr2_handle);
}
啟動流程大體走一遍了,那麼接受到資料會進行怎樣的回撥呢?
spp_rpc/src/worker/defaultworker.h
// 回撥入口
static int ator_recvdata(unsigned flow, void* arg1, void* arg2); //必要
static int ator_recvdata_v2(unsigned flow, void* arg1, void* arg2); //必要
static int ator_senderror(unsigned flow, void* arg1, void* arg2); //必要
spp_rpc/src/worker/defaultworker.cpp
int CDefaultWorker::ator_recvdata_v2(unsigned flow, void* arg1, void* arg2)
{
blob_type* blob = (blob_type*)arg1;
CDefaultWorker* worker = (CDefaultWorker*)arg2;
if (likely(blob->len > 0))
{
TConnExtInfo* ptr = NULL;
MONITOR(MONITOR_WORKER_FROM_PROXY);
blob->len -= sizeof(TConnExtInfo);
blob->extdata = blob->data + blob->len;
ptr = (TConnExtInfo*)blob->extdata;
int64_t recv_ms = int64_t(ptr->recvtime_) * 1000 + ptr->tv_usec / 1000;
int64_t now = get_time_ms();
int64_t time_delay = now - recv_ms;
worker->fstat_.op(WIDX_MSG_SHM_TIME, time_delay);
add_memlog(blob->data, blob->len);
// 超時處理
worker->flog_.LOG_P_FILE(LOG_DEBUG, "ator recvdone, flow:%u, blob len:%d\n", flow, blob->len);
worker->fstat_.op(WIDX_SHM_RX_BYTES, blob->len); // 累加接收位元組數
g_check_point = CoreCheckPoint_HandleProcess; // 設定待呼叫外掛的CheckPoint
// 呼叫spp_handle_process函式
int ret = sppdll.spp_handle_process(flow, arg1, arg2);
g_check_point = CoreCheckPoint_SppFrame; // 恢復CheckPoint,重置為CoreCheckPoint_SppFrame
if (likely(!ret))
{
MONITOR(MONITOR_WORKER_PROC_SUSS);
return 0;
}
else
{
MONITOR(MONITOR_WORKER_PROC_FAIL);
CTCommu* commu = (CTCommu*)blob->owner;
blob_type rspblob;
rspblob.len = 0;
rspblob.data = NULL;
commu->sendto(flow, &rspblob, NULL);
}
}
return -1;
}
具體幾個spp函式的實現:
spp_rpc/src/module/example/simple/echo_example.cpp
/**
* Tencent is pleased to support the open source community by making MSEC available.
*
* Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the GNU General Public License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* https://opensource.org/licenses/GPL-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
//必須包含spp的標頭檔案
#include "sppincl.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
//格式化時間輸出
char *format_time( time_t tm);
//初始化方法(可選實現)
//arg1: 配置檔案
//arg2: 伺服器容器物件
//返回0成功,非0失敗
extern "C" int spp_handle_init(void* arg1, void* arg2)
{
//外掛自身的配置檔案
const char* etc = (const char*)arg1;
//伺服器容器物件
CServerBase* base = (CServerBase*)arg2;
base->log_.LOG_P_PID(LOG_DEBUG, "spp_handle_init, config:%s, servertype:%d\n", etc, base->servertype());
return 0;
}
//資料接收(必須實現)
//flow: 請求包標誌
//arg1: 資料塊物件
//arg2: 伺服器容器物件
//返回值:> 0 表示資料已經接收完整且該值表示資料包的長度
// == 0 表示資料包還未接收完整
// < 0 負數表示出錯,將會斷開連線
extern "C" int spp_handle_input(unsigned flow, void* arg1, void* arg2)
{
//資料塊物件,結構請參考tcommu.h
blob_type* blob = (blob_type*)arg1;
//extinfo有擴充套件資訊
TConnExtInfo* extinfo = (TConnExtInfo*)blob->extdata;
//伺服器容器物件
CServerBase* base = (CServerBase*)arg2;
base->log_.LOG_P(LOG_DEBUG, "spp_handle_input[recv time:%s] flow:%d, buffer len:%d, client ip:%s\n",
format_time(extinfo->recvtime_),
flow,
blob->len,
inet_ntoa(*(struct in_addr*)&extinfo->remoteip_));
return blob->len;
}
//路由選擇(可選實現)
//flow: 請求包標誌
//arg1: 資料塊物件
//arg2: 伺服器容器物件
//返回值表示worker的組號
extern "C" int spp_handle_route(unsigned flow, void* arg1, void* arg2)
{
//伺服器容器物件
CServerBase* base = (CServerBase*)arg2;
base->log_.LOG_P_FILE(LOG_DEBUG, "spp_handle_route, flow:%d\n", flow);
return 1;
}
//資料處理(必須實現)
//flow: 請求包標誌
//arg1: 資料塊物件
//arg2: 伺服器容器物件
//返回0表示成功,非0失敗(將會主動斷開連線)
extern "C" int spp_handle_process(unsigned flow, void* arg1, void* arg2)
{
//資料塊物件,結構請參考tcommu.h
blob_type* blob = (blob_type*)arg1;
//資料來源的通訊元件物件
CTCommu* commu = (CTCommu*)blob->owner;
//extinfo有擴充套件資訊
TConnExtInfo* extinfo = (TConnExtInfo*)blob->extdata;
//伺服器容器物件
CServerBase* base = (CServerBase*)arg2;
base->log_.LOG_P_PID(LOG_DEBUG, "spp_handle_process[recv time:%s] flow:%d, buffer len:%d, client ip:%s\n",
format_time(extinfo->recvtime_),
flow,
blob->len,
inet_ntoa(*(struct in_addr*)&extinfo->remoteip_));
//echo logic
int ret = commu->sendto(flow, arg1, arg2 );
if (ret != 0)
{
base->log_.LOG_P_PID(LOG_ERROR, "send response error, ret:%d\n", ret);
return ret;
}
//if all responses were sent, send a NULL blob to release flow
//blob_type release_cmd;
//release_cmd.data = NULL;
//release_cmd.len = 0;
//commu->sendto(flow, &release_cmd, arg2);
return 0;
}
//析構資源(可選實現)
//arg1: 保留引數
//arg2: 伺服器容器物件
extern "C" void spp_handle_fini(void* arg1, void* arg2)
{
//伺服器容器物件
CServerBase* base = (CServerBase*)arg2;
base->log_.LOG_P_PID(LOG_DEBUG, "spp_handle_fini\n");
}
char *format_time( time_t tm)
{
static char str_tm[1024];
struct tm tmm;
memset(&tmm, 0, sizeof(tmm) );
localtime_r((time_t *)&tm, &tmm);
snprintf(str_tm, sizeof(str_tm), "[%04d-%02d-%02d %02d:%02d:%02d]",
tmm.tm_year + 1900, tmm.tm_mon + 1, tmm.tm_mday,
tmm.tm_hour, tmm.tm_min, tmm.tm_sec);
return str_tm;
}