1. 程式人生 > >spp worker 原始碼分析

spp worker 原始碼分析

spp worker Demo

spp_rpc/src/worker/main.cpp
main函式執行入口:

#include "defaultworker.h"
#include "comm_def.h"

using namespace spp;
using namespace spp::worker;

CServerBase* g_worker = NULL;

// ...

int main(int argc, char* argv[])
{
    // 生成worker
    g_worker = new CDefaultWorker;
    if (g_worker)
    {
        // worker 執行並且阻塞
g_worker->run(argc, argv); // worker 結束執行釋放記憶體 delete g_worker; } return 0; }

spp_rpc/src/worker/defaultworker.h

namespace spp
{
    namespace worker
    {
        class CDefaultWorker : public CServerBase, public CFrame
        {
        public:
            // 建構函式+解構函式
// 獲取是否啟用微執行緒 bool get_mt_flag(); // 微執行緒切換函式 void handle_switch(bool block); // 重寫CServerBase的realrun函式 void realrun(int argc, char* argv[]); // 定義服務型別 // 註冊spp框架訊號處理函式 void assign_signal(int signo); // 框架迴圈呼叫的邏輯,用於config reload
int loop(); //初始化配置 int initconf(bool reload = false); static void shm_delay_stat(int64_t time_delay) { if(time_delay <= 1) { MONITOR(MONITOR_WORKER_RECV_DELAY_1); } else if(time_delay <= 10) { MONITOR(MONITOR_WORKER_RECV_DELAY_10); } else if(time_delay <= 50) { MONITOR(MONITOR_WORKER_RECV_DELAY_50); } else if(time_delay <= 100) { MONITOR(MONITOR_WORKER_RECV_DELAY_100); } else { MONITOR(MONITOR_WORKER_RECV_DELAY_XXX); } } //一些回撥函式 static int ator_recvdata(unsigned flow, void* arg1, void* arg2); //必要 static int ator_recvdata_v2(unsigned flow, void* arg1, void* arg2); //必要 static int ator_senddata(unsigned flow, void* arg1, void* arg2); //非必要 static int ator_overload(unsigned flow, void* arg1, void* arg2); //非必要 static int ator_senderror(unsigned flow, void* arg1, void* arg2); //必要 //接受者 CTCommu* ator_; inline int get_TOS(){return TOS_;} private: unsigned msg_timeout_; int TOS_; int notify_fd_; // socket commu need notify mircro thread }; } }

spp_rpc/src/comm/serverbase.h

namespace spp {
    namespace comm {

        // 伺服器程式基礎類,包含執行環境初始化、日誌、統計、監控物件
        class CServerBase {
        public:
            // 建構函式+解構函式

            // 可重寫的虛擬函式
            virtual void run(int argc, char *argv[]);
            virtual void startup(bool bg_run = true);
            virtual void realrun(int argc, char *argv[]) {}

            // 業務名和服務型別的描述,估計會和業務監控日誌以及部署相關聯

            // 業務日誌
            CTLog log_;
            // 統計
            CTStat stat_;
            // 監控
            CTProcMonCli moncli_;

        protected:
            // 內部監控時間間隔

        public:
            ///////////////////////////////////////////////////////////////////////

            // 服務reload退出以及相關訊號處理
            static bool reload();
            static bool quit();
            static void sigusr1_handle(int signo);
            static void sigusr2_handle(int signo);
        };
    }
}

看看比較重要的realrun虛擬函式的實現
spp_rpc/src/worker/defaultworker.cpp

void CDefaultWorker::realrun(int argc, char* argv[])
{
    // 初始化配置
    SingleTon<CTLog, CreateByProto>::SetProto(&flog_);

    initconf(false);

    time_t  nowtime = time(NULL), montime = 0;
    int64_t now_ms = 0;

    while (true)
    {
        ///< start: micro thread handle loop entry add 20130715
        if (sppdll.spp_handle_loop)
        {
            sppdll.spp_handle_loop(this);   
        }            
        ///< end: micro thread handle loop entry 20130715

        // == 0 時,表示沒取到請求,進入較長時間非同步等待
        bool isBlock = (ator_->poll(false) == 0);

        static CSyncFrame* sync_frame = CSyncFrame::Instance();
        sync_frame->HandleSwitch(isBlock);

        // Check and reconnect net proxy, default 10 ms 
        now_ms = get_time_ms();

        // 檢查quit訊號
        if (unlikely(CServerBase::quit()) || unlikely(CServerBase::reload()))
        {
            now_ms = get_time_ms();

            // 保證剩下的請求都處理完
            if (unlikely(CServerBase::quit()))
            {        
                flog_.LOG_P_PID(LOG_FATAL, "recv quit signal at %u\n", now_ms);
                ator_->poll(true);
            }
            else
            {           
                flog_.LOG_P_PID(LOG_FATAL, "recv reload signal at %u\n", now_ms);
            }

            int timeout = 0;
            //微執行緒
            while (CSyncFrame::Instance()->GetThreadNum() > 1 && timeout < 1000)
            {
                CSyncFrame::Instance()->sleep(10000);
                timeout += 10;
            }

            now_ms = get_time_ms();
            flog_.LOG_P_PID(LOG_FATAL, "exit at %u\n", now_ms);
            break;
        }

        //監控資訊上報
        nowtime = time(NULL);

        if ( unlikely(nowtime - montime > ix_->moni_inter_time_) )
        {
            CLI_SEND_INFO(&moncli_)->timestamp_ = nowtime;
            moncli_.run();
            montime = nowtime;
            flog_.LOG_P_PID(LOG_DEBUG, "moncli run!\n");
        }

        loop();
    }

    g_check_point = CoreCheckPoint_HandleFini;           // 設定待呼叫外掛的CheckPoint
    if (sppdll.spp_handle_fini != NULL)
        // 呼叫spp_handle_fini函式釋放資源
        sppdll.spp_handle_fini(NULL, this);
    g_check_point = CoreCheckPoint_SppFrame;                // 恢復CheckPoint,重置為CoreCheckPoint_SppFrame

    CStatMgrInstance::destroy();
}

ator_ 型別為CTCommu*,負責接受訊號。

        //通訊類抽象介面
        class CTCommu
        {
        public:
            CTCommu() {
                memset(func_list_, 0, sizeof(cb_func) *(CB_TIMEOUT + 1));
                memset(func_args_, 0, sizeof(void*) *(CB_TIMEOUT + 1));
            }
            virtual ~CTCommu() {}

            //初始化
            //config:配置檔名或者配置引數記憶體指標
            virtual int init(const void* config) = 0;

            //輪詢,收發資料
            //block: true表示使用阻塞模式,否則非阻塞模式
            virtual int poll(bool block = false) = 0;

            //傳送資料提交
            //flow: 資料包唯一標示
            //arg1: 通用引數指標1, 一般指向資料blob
            //arg2: 通用引數指標2,保留
            virtual int sendto(unsigned flow, void* arg1, void* arg2) = 0;

            //控制介面
            //flow: 資料包唯一標示
            //type: 控制命令
            //arg1: 通用引數指標1,具體元件有具體的含義
            //arg2: 通用引數指標2,具體元件有具體的含義
            virtual int ctrl(unsigned flow, ctrl_type type, void* arg1, void* arg2) = 0;

            //註冊回撥
            //type: 回撥函式型別
            //func: 回撥函式
            //args: 使用者自定義引數指標, 作為回撥函式的第2個通用引數傳遞
            virtual int reg_cb(cb_type type, cb_func func, void* args = NULL) {
                if (type <= CB_TIMEOUT) {
                    func_list_[type] = func;
                    func_args_[type] = args;
                    return 0;
                } else {
                    return -1;
                }
            }

            //清空所有共享記憶體佇列,僅供proxy啟動時使用
            virtual int clear() = 0;

        protected:
            cb_func func_list_[CB_TIMEOUT + 1];
            void* func_args_[CB_TIMEOUT + 1];

            //釋放資源
            virtual void fini() = 0;
        };

ator_->poll(false) 按照註解說明,使用非阻塞模式接受資料。
那麼ator 具體實現是initconf中
spp_rpc/src/worker/defaultworker.cpp
類比golang的gpm模型

// 使用CTShmCommu實現
ator_ = new CTShmCommu;
ret = ator_->init(&shm);

// 註冊了一些資料操作的回撥
...

// 呼叫了spp_handle_init函式
handle_init_ret = sppdll.spp_handle_init((void*)module_etc.c_str(), this);

spp_rpc/src/comm/tbase/tshmcommu.cpp

有空再分析如何實現poll。
[poll 實現](https://github.com/Tencent/MSEC/blob/master/document/msec/cpp_dev_manual.md)

spp_rpc/src/comm/serverbase.cpp
run->startup+realrun

void CServerBase::run(int argc, char* argv[])
{
    // 啟動引數解析

    startup(true);

    realrun(argc, argv);
}

// 訊號處理
void CServerBase::startup(bool bg_run)
{
    //預設需要root許可權才能setrlimit
    struct rlimit rlim;
    if (0 == getrlimit(RLIMIT_NOFILE, &rlim))
    {
        rlim.rlim_cur = rlim.rlim_max;
        setrlimit(RLIMIT_NOFILE, &rlim);
        if (rlim.rlim_cur < 100000)     // fix for limits over 100000
        {
            rlim.rlim_cur = 100000;
            rlim.rlim_max = 100000;
            setrlimit(RLIMIT_NOFILE, &rlim);
        }
    }

    mallopt(M_MMAP_THRESHOLD, 1024*1024);   // 1MB,防止頻繁mmap
    mallopt(M_TRIM_THRESHOLD, 8*1024*1024); // 8MB,防止頻繁brk

    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    signal(SIGTTOU, SIG_IGN);
    signal(SIGTTIN, SIG_IGN);
    signal(SIGCHLD, SIG_IGN);

    if (bg_run)
    {
        signal(SIGINT,  SIG_IGN);
        signal(SIGTERM, SIG_IGN);
        daemon(1, 1);
    }

    CServerBase::flag_ = 0;
    //signal(SIGSEGV, CServerBase::sigsegv_handle);
    signal(SIGUSR1, CServerBase::sigusr1_handle);
    signal(SIGUSR2, CServerBase::sigusr2_handle);
}

啟動流程大體走一遍了,那麼接受到資料會進行怎樣的回撥呢?
spp_rpc/src/worker/defaultworker.h

            // 回撥入口
            static int ator_recvdata(unsigned flow, void* arg1, void* arg2);    //必要
            static int ator_recvdata_v2(unsigned flow, void* arg1, void* arg2); //必要
            static int ator_senderror(unsigned flow, void* arg1, void* arg2);   //必要

spp_rpc/src/worker/defaultworker.cpp

int CDefaultWorker::ator_recvdata_v2(unsigned flow, void* arg1, void* arg2)
{
    blob_type* blob = (blob_type*)arg1;
    CDefaultWorker* worker = (CDefaultWorker*)arg2;

    if (likely(blob->len > 0))
    {
        TConnExtInfo* ptr = NULL;

        MONITOR(MONITOR_WORKER_FROM_PROXY);
        blob->len -= sizeof(TConnExtInfo);
        blob->extdata = blob->data + blob->len;
        ptr = (TConnExtInfo*)blob->extdata;

        int64_t recv_ms = int64_t(ptr->recvtime_) * 1000 + ptr->tv_usec / 1000;
        int64_t now = get_time_ms();

        int64_t time_delay = now - recv_ms;

        worker->fstat_.op(WIDX_MSG_SHM_TIME, time_delay);
        add_memlog(blob->data, blob->len);

        // 超時處理

        worker->flog_.LOG_P_FILE(LOG_DEBUG, "ator recvdone, flow:%u, blob len:%d\n", flow, blob->len);
        worker->fstat_.op(WIDX_SHM_RX_BYTES, blob->len); // 累加接收位元組數

        g_check_point = CoreCheckPoint_HandleProcess;           // 設定待呼叫外掛的CheckPoint

        // 呼叫spp_handle_process函式
        int ret = sppdll.spp_handle_process(flow, arg1, arg2);

        g_check_point = CoreCheckPoint_SppFrame;                // 恢復CheckPoint,重置為CoreCheckPoint_SppFrame

        if (likely(!ret))
        {
            MONITOR(MONITOR_WORKER_PROC_SUSS);
            return 0;
        }
        else
        {
            MONITOR(MONITOR_WORKER_PROC_FAIL);
            CTCommu* commu = (CTCommu*)blob->owner;
            blob_type rspblob;

            rspblob.len = 0;
            rspblob.data = NULL;

            commu->sendto(flow, &rspblob, NULL);
        }
    }

    return -1;
}

具體幾個spp函式的實現:
spp_rpc/src/module/example/simple/echo_example.cpp


/**
 * Tencent is pleased to support the open source community by making MSEC available.
 *
 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
 *
 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 
 * you may not use this file except in compliance with the License. You may 
 * obtain a copy of the License at
 *
 *     https://opensource.org/licenses/GPL-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the 
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */


//必須包含spp的標頭檔案
#include "sppincl.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

//格式化時間輸出
char *format_time( time_t tm);

//初始化方法(可選實現)
//arg1: 配置檔案
//arg2: 伺服器容器物件
//返回0成功,非0失敗
extern "C" int spp_handle_init(void* arg1, void* arg2)
{
    //外掛自身的配置檔案
    const char* etc = (const char*)arg1;
    //伺服器容器物件
    CServerBase* base = (CServerBase*)arg2;

    base->log_.LOG_P_PID(LOG_DEBUG, "spp_handle_init, config:%s, servertype:%d\n", etc, base->servertype());

    return 0;
}

//資料接收(必須實現)
//flow: 請求包標誌
//arg1: 資料塊物件
//arg2: 伺服器容器物件
//返回值:> 0 表示資料已經接收完整且該值表示資料包的長度
// == 0 表示資料包還未接收完整
// < 0 負數表示出錯,將會斷開連線
extern "C" int spp_handle_input(unsigned flow, void* arg1, void* arg2)
{
    //資料塊物件,結構請參考tcommu.h
    blob_type* blob = (blob_type*)arg1;
    //extinfo有擴充套件資訊
    TConnExtInfo* extinfo = (TConnExtInfo*)blob->extdata;
    //伺服器容器物件
    CServerBase* base = (CServerBase*)arg2;

    base->log_.LOG_P(LOG_DEBUG, "spp_handle_input[recv time:%s] flow:%d, buffer len:%d, client ip:%s\n",
                     format_time(extinfo->recvtime_),
                     flow,
                     blob->len,
                     inet_ntoa(*(struct in_addr*)&extinfo->remoteip_));

    return blob->len;
}

//路由選擇(可選實現)
//flow: 請求包標誌
//arg1: 資料塊物件
//arg2: 伺服器容器物件
//返回值表示worker的組號
extern "C" int spp_handle_route(unsigned flow, void* arg1, void* arg2)
{
    //伺服器容器物件
    CServerBase* base = (CServerBase*)arg2;
    base->log_.LOG_P_FILE(LOG_DEBUG, "spp_handle_route, flow:%d\n", flow);
    return 1;
}

//資料處理(必須實現)
//flow: 請求包標誌
//arg1: 資料塊物件
//arg2: 伺服器容器物件
//返回0表示成功,非0失敗(將會主動斷開連線)
extern "C" int spp_handle_process(unsigned flow, void* arg1, void* arg2)
{
    //資料塊物件,結構請參考tcommu.h
    blob_type* blob = (blob_type*)arg1;
    //資料來源的通訊元件物件
    CTCommu* commu = (CTCommu*)blob->owner;
    //extinfo有擴充套件資訊
    TConnExtInfo* extinfo = (TConnExtInfo*)blob->extdata;
    //伺服器容器物件
    CServerBase* base = (CServerBase*)arg2;

    base->log_.LOG_P_PID(LOG_DEBUG, "spp_handle_process[recv time:%s] flow:%d, buffer len:%d, client ip:%s\n",
                         format_time(extinfo->recvtime_),
                         flow,
                         blob->len,
                         inet_ntoa(*(struct in_addr*)&extinfo->remoteip_));

    //echo logic
    int ret = commu->sendto(flow, arg1, arg2 );
    if (ret != 0)
    {
        base->log_.LOG_P_PID(LOG_ERROR, "send response error, ret:%d\n", ret);
        return ret;
    }

    //if all responses were sent, send a NULL blob to release flow
    //blob_type release_cmd;
    //release_cmd.data = NULL;
    //release_cmd.len = 0;
    //commu->sendto(flow, &release_cmd, arg2);

    return 0;
}

//析構資源(可選實現)
//arg1: 保留引數
//arg2: 伺服器容器物件
extern "C" void spp_handle_fini(void* arg1, void* arg2)
{
    //伺服器容器物件
    CServerBase* base = (CServerBase*)arg2;

    base->log_.LOG_P_PID(LOG_DEBUG, "spp_handle_fini\n");
}

char *format_time( time_t tm)
{
    static char str_tm[1024];
    struct tm tmm;
    memset(&tmm, 0, sizeof(tmm) );
    localtime_r((time_t *)&tm, &tmm);

    snprintf(str_tm, sizeof(str_tm), "[%04d-%02d-%02d %02d:%02d:%02d]",
             tmm.tm_year + 1900, tmm.tm_mon + 1, tmm.tm_mday,
             tmm.tm_hour, tmm.tm_min, tmm.tm_sec);

    return str_tm;
}