1. 程式人生 > spp proxy 原始碼分析

spp proxy 原始碼分析

spp proxy demo

#include "defaultproxy.h"

using namespace spp::proxy;

int main(int argc, char* argv[])
{
    CServerBase* proxy = new CDefaultProxy;
    proxy->run(argc, argv);
    delete proxy;
    return 0;
}

原始碼分析

spp_rpc/src/proxy/defaultproxy.h

namespace spp
{
    namespace proxy
    {
        class
CDefaultProxy : public CServerBase, public CFrame { public: // 建構函式+解構函式 // 實際執行函式 void realrun(int argc, char* argv[]); // 服務容器型別 // 獲取介面地址 unsigned GetListenIp(const char *intf); int loop(); // 初始化配置
int initconf(bool reload = false); //一些回撥函式 static int ator_recvdata(unsigned flow, void* arg1, void* arg2); //必要 static int ctor_recvdata(unsigned flow, void* arg1, void* arg2); //必要 static int ator_recvdata_v2(unsigned flow, void* arg1, void
* arg2); //必要 // ...非必要回調函式 // 接受者 與controller通訊 CTCommu* ator_; // 連線者,共享記憶體佇列,與worker通訊 map<int, CTCommu*> ctor_; //ip限制類型 unsigned char iplimit_; //ip集合 set<unsigned> iptable_; spp_handle_process_t local_handle; }; } }

spp_rpc/src/proxy/defaultproxy.cpp

/**
 * Main loop of the proxy.
 *
 * Initializes logging and configuration, then repeats until a quit is
 * requested: poll the acceptor, poll every connector, periodically report
 * to the ctrl process, and call loop(). On quit the connectors are polled
 * for 20 more rounds before the loop ends and spp_handle_fini (when
 * provided) is invoked.
 *
 * @param argc  not used in the visible part of this function
 * @param argv  not used in the visible part of this function
 */
void CDefaultProxy::realrun(int argc, char* argv[])
{
    // Initialize configuration
    SingleTon<CTLog, CreateByProto>::SetProto(&flog_);
    initconf(false);
    ix_->fstat_inter_time_ = 5; // collect framework statistics every 5 seconds

    // NOTE(review): fstattime, statbuff/pstatbuff, bufflen, ctor_stat and
    // ctor_stat_len are not referenced in the visible code; presumably they
    // belong to the statistics output that is omitted from this excerpt.
    time_t fstattime = 0;
    time_t montime = 0;
    static char statbuff[PROXY_STAT_BUF_SIZE];
    char* pstatbuff = (char*)statbuff;
    int bufflen = 0;
    char ctor_stat[1 << 16] = {0};
    int ctor_stat_len = 0;
    int processed = 0;

    while (true)
    {

        // Poll the acceptor; the flag is true only when the previous round
        // of connector polling processed nothing (original note: "need sleep ?").
        ator_->poll(processed == 0);
        processed = 0;

        // Poll every connector
        map<int, CTCommu*>::iterator it;
        for (it = ctor_.begin(); it != ctor_.end(); ++it)
        {
            spp_global::set_cur_group_id(it->first);
            processed += it->second->poll(true); // block
        }

        if ( unlikely(get_time_s() - montime > ix_->moni_inter_time_) )
        {

            // Report information to the ctrl process
            CLI_SEND_INFO(&moncli_)->timestamp_ = get_time_s();
            moncli_.run();

            // Remember when we last reported. Without this montime stays 0
            // and the report would be sent on every loop iteration.
            montime = get_time_s();

            // Output monitoring statistics (omitted in this excerpt)
        }

        // Check the quit signal
        if (unlikely(CServerBase::quit()))
        {

            // Keep polling the connectors for 20 rounds of 20ms each
            // (20ms * 20) before leaving the main loop.
            int count = 20;
            while(count--)
            {
                for (it = ctor_.begin(); it != ctor_.end(); ++it)
                {
                    it->second->poll(true); // block
                }
                usleep(20000);
            }
            break;
        }

        loop();
    }

    // Give the plugin a chance to clean up, if it provides a fini hook.
    if (sppdll.spp_handle_fini != NULL)
        sppdll.spp_handle_fini(NULL, this);
}

當 proxy 接收到使用者請求時會觸發以下回撥:解析出完整的請求包後,選取路由並通知對應的 worker 進行處理

// Acceptor receive callback (abridged excerpt: the buffer allocation, the
// error handling and the tail of the function, including its return
// statement, are omitted by the article).
//
// flow  flow identifier, passed through to the plugin hooks and to sendto()
// arg1  cast to blob_type*: the received data (data / len / extdata are used)
// arg2  cast to CDefaultProxy*: the proxy instance
//
// For each complete packet reported by spp_handle_input, a route number is
// chosen, the packet plus a TConnExtInfo is copied into `data`, and the
// result is sent through the connector registered for that route.
int CDefaultProxy::ator_recvdata_v2(unsigned flow, void* arg1, void* arg2)
{
    blob_type* blob = (blob_type*)arg1;
    CDefaultProxy* proxy = (CDefaultProxy*)arg2;
    proxy->flog_.LOG_P_FILE(LOG_DEBUG, "flow:%u,blob len:%d\n", flow, blob->len);
    // NOTE(review): total_len, processed_len and handle_exception are not used
    // in the visible code; presumably used by the omitted parts.
    int total_len = blob->len;
    int processed_len = 0;
    int proto_len = -1;
    int ret = 0;
    int handle_exception = 0;

    // Loop while bytes remain and spp_handle_input reports a positive packet
    // length; each iteration consumes proto_len bytes from the blob.
    while (blob->len > 0 && (proto_len = sppdll.spp_handle_input(flow, arg1, arg2)) > 0)
    {
        // Error handling
        // ...

        // Choose the route number
        unsigned route_no;

        if (!sppdll.spp_handle_route)
        {
            // If spp_handle_route is not implemented, always pick the first one
            route_no = 1;
        }
        else
        {
            // Mask with 0x7FFF to strip the GROUPID, for compatibility
            route_no = sppdll.spp_handle_route(flow, arg1, arg2) & 0x7FFF;
        }

        blob_type sendblob;

        // Allocate space (omitted in this excerpt; `data` is declared there)

        if (data != NULL)
        {
            // Build sendblob: the packet followed by the connection ext info
            memcpy(data, blob->data, proto_len);
            memcpy(data + proto_len, blob->extdata, sizeof(TConnExtInfo));
            sendblob.data = data;
            sendblob.len = proto_len + sizeof(TConnExtInfo);
        }
        // NOTE(review): when data is NULL, sendblob stays uninitialized but is
        // still passed to sendto() below in the visible code — confirm the
        // omitted error handling covers this case.

        // Pick the worker that matches route_no and write the data into the
        // shared memory associated with that worker
        map<int, CTCommu*>::iterator iter;
        if ((iter = proxy->ctor_.find(route_no)) != proxy->ctor_.end())
        {
            MONITOR(MONITOR_PROXY_TO_WORKER);
            ret = iter->second->sendto(flow, &sendblob, NULL);
        }

        // Error handling (omitted in this excerpt)

        // Advance past the packet that was just handled
        blob->data += proto_len;
        blob->len -= proto_len;
    }

    // ... (rest of the function, including the return value, is omitted)
}

當proxy接收到ctl的心跳詢問,會主動回包維持心跳

int CDefaultProxy::ctor_recvdata(unsigned flow, void* arg1, void* arg2)
{
    blob_type* blob = (blob_type*)arg1;
    CDefaultProxy* proxy = (CDefaultProxy*)arg2;

    // 維持心跳
    int ret = proxy->ator_->sendto(flow, arg1, NULL);

    return 0;
}