1. 程式人生 > 其它 >支援外掛的訊息中介軟體【msg broker with plugin】

支援外掛的訊息中介軟體【msg broker with plugin】

支援外掛的訊息中介軟體

msg broker with plugin

Msg Broker概念:

msg broker是實現application 之間互通訊的元件。通常為實現application之間的解耦,訊息都是通過msg broker完成轉發。application只需知道其他applicatipn的邏輯名稱,而不需要知道對方的具體位置。Broker中維護一個查詢表,記錄著哪個application註冊在此邏輯名稱之下,所以訊息總是會被正確的投遞到目的地。

msg broker不限於1-1的轉發,也支援1-N的模式。其主要功能有:

  1. 實現多個application的互通訊,而隱藏彼此的位置
  2. 實現訊息個格式的轉換,如json to bin
  3. 安全控制,msg broker可以再轉發訊息前進行一定程度的安全驗證
  4. 增大系統的可伸縮性,由於application通訊的目標變成了邏輯結點,而該邏輯結點可以對應多個物理結點,理論上可以動態的增加物理結點,來擴充套件該邏輯結點的吞吐量。
  5. msg broker可以用來整合服務,並且可以暴樓服務的部分介面

 msg broker 具有的缺點是:

  1. 增加了複雜性,多了一層轉發
  2. 可維護性降低,需要理清msg broker和各個application和服務的關係。
  3. 降低效能,主要是實時效能下降了,訊息需要多轉發一邊,單次請求的延時大大增加了。

當前流行的Broker的特點和缺點:

Msg Broker的結構:

流行的Broker中介軟體介紹:

  1. RabbitMQ

專案地址:http://www.rabbitmq.com/

RabbitMQ是由Erlang開發的以高效、健壯以及高度伸縮性的訊息伺服器。其所包含的概念有Producer、Consumer、Exchange、Queue。RabbitMQ基於QMQP協議,支援的語言也非常豐富,文件也非常清晰。使用RabbitMQ可以實現訂閱釋出模型、RPC模型、路由模型等,參見RabbitMQ的例子:http://www.rabbitmq.com/getstarted.html

但是它有如下侷限性:

  • RabbitMQ 沒有針對連線做控制,它是為高效而生,它對外來的請求是信任的,不存在安全驗證,如任何一個client都可以建立訊息佇列,所以RabbitMQ一定是放在內網的。
  • 使用RabbitMQ ,我們是通過Client遠端操作RabbitMQ,不能定製RabbitMQ的功能。
  1. ZeroMQ

專案地址:http://www.zeromq.org/

ZeroMQ是一個Socket封裝庫,號稱是最快的訊息核心。ZeroMQ可以支援TCP、UDP、IPC等多種通訊協議。ZeroMQ可以實現的通訊模型就更多了,幾乎涵蓋了訊息通訊的所有模式,參見官網介紹http://www.zeromq.org/intro:read-the-manual

其侷限性為:

  • ZeroMQ雖然封裝了訊息傳輸的複雜性,但是它也隱藏了連線的建立、斷開等過程。ZeroMQ傳輸訊息更像是udp資料報,使用者不能知道對方何時連線建立、何時連線斷開。

我們需要一個不一樣的Broker

應用場景介紹

在網路遊戲中,cliet和伺服器是通過tcp長連線的。相對於HTTP+WebServer的不同在於:

  • client連線到伺服器,需要進行身份驗證,通常是client第一個訊息包含身份驗證資料如使用者名稱密碼等,而驗證通過後該連線為可信任連線。
  • client 任意時間都可以向伺服器傳送請求,而不需要伺服器立即返回,同樣,伺服器是在任意時間(當然會有實時性等約束)都可以像client推送訊息。
  • client斷開連線時,伺服器必須捕獲該事件,以便完成一些資料清理操作。
  • client對應的一般是個叢集,但是client無從得知細節,因為它只連線最外層的一個,給他取個名字“MsgBroker”。
  • Msg Broker 不許有一定的安全控制,如心跳、網路包頻率限制等,防範某些可能的攻擊。
  • Msg Broker需要高度可定製。不同的遊戲主要是邏輯不同,而MsgBroker大多大同小異。當然MsgBroker總是會根據需求稍作修改。
  • Msg Broker 主要瓶頸是IO操作,因為它涉及大量的網路連線、斷開、心跳、廣播訊息等。而它具有的領域邏輯則非常非常少。所以Msg Broker的邏輯可以使用動態指令碼實現,其實時性、效率都能滿足要求。

需要的broker具有的功能:

  • 能夠捕獲client連線事件
  • 能夠捕獲client斷開事件
  • 具有網路心跳功能
  • 方便的訊息傳送介面
  • broker可以以client的角色連線到其他Server,因為從其他邏輯角度看,Broker可能是其他服務的使用者。
  • Broker 提供訊息收發框架,邏輯層通過外掛實現。
  • 實現外掛的方式有
    • 動態連結庫,可以將邏輯層封裝到so連結庫中
    • python指令碼,邏輯層可以有python指令碼實現,Broker封裝了載入python、呼叫python,封裝訊息傳送介面到Pyhton
    • Lua指令碼,邏輯層也可以又Lua指令碼實現,Broker封裝了載入lua、呼叫lua、封裝訊息介面給lua。

Msg Broker 結構圖

Msg Broker  的安裝使用:

安裝依賴庫:

由於msg broker支援Python和lua作為外掛,那麼必須確保linux下安裝了相應的標頭檔案。示例中的外掛均只實現了echo功能。

  • 確保Linux系統安裝了Python,推薦python2.6
  • 確保安裝了Python-devel,如果是centos,直接yum即可。
  • 確保安裝了Lua-5.1.4, 其他版本沒有測試過
  • 下載Msg Broker最新原始碼,目前處於0.1版本

svn co https://ffown.googlecode.com/svn/trunk/

  • 編譯原始碼:
    • cd trunk/example/plugin_msg_broker/
    • make
  • 編譯動態連線庫外掛
    • cd plugin/plugin_echo_dll/
    • sh gen_dll.sh

執行示例外掛:

  • 執行動態連結庫
    • ./msg_broker_server 127.0.0.1 10241 plugin/plugin_echo_dll/libechoso
    • 另開終端,telent 127.0.01 10241, 收入5 回車,再輸入5個字元,通訊協議是body長度加回車加body,如圖:
  • 執行Python 指令碼示例程式
    • ./msg_broker_server 127.0.0.1 10241 plugin/plugin_echo_py/echo.py
    • 同樣使用telnet 測試echo功能
  • 執行Lua指令碼示例程式
    • ./msg_broker_server 127.0.0.1 10241 plugin/plugin_echo_lua/lua.py
    • 同樣使用telnet 測試echo功能

外掛層設計分析:

外掛介面:

#ifndef _PLUGIN_H_
#define _PLUGIN_H_

#include "channel.h"
#include "message.h"

class plugin_i
{
public:
    virtual ~plugin_i(){}
    virtual int start() = 0;
    virtual int stop() = 0;

    virtual int handle_broken(channel_ptr_t channel_) = 0;
    virtual int handle_msg(const message_t& msg_, channel_ptr_t channel_) = 0;
};

typedef plugin_i* plugin_ptr_t;
typedef int (*handle_channel_msg_func_t)(const message_t& msg_, channel_ptr_t);
typedef int (*handle_channel_broken_func_t)(channel_ptr_t);

#define HANDLE_CHANNEL_MSG       "handle_channel_msg"
#define HANDLE_CHANNEL_BROKEN "handle_channel_broken"
#endif

各個介面作用如下:

  • start 實現外掛載入,環境初始化
  • stop實現優雅的退出
  • handle msg 為訊息到來通知
  • handle_broken 為對方連線關閉

Channel 設計

channel 用來表示一個連線,可以理解成socket的抽象,也可直接理解成遠端client。

#ifndef _CHANNEL_H_
#define _CHANNEL_H_

#include "socket_i.h"

class channel_t
{
public:
    channel_t(socket_ptr_t sock_);
    ~channel_t();
    void  set_data(void* p);
    void* get_data() const;

    template<typename T>
    T* get_data() const { return (T*)this->get_data(); }

    void async_send(const string& buff_);
    void close();

private:
    socket_ptr_t    m_socket;
    void*               m_data;
};

typedef channel_t* channel_ptr_t;
#endif

各個介面作用如下:

  • 構造,channel必須繫結一個socket
  • set_data get_data用來操作channel私有資料,如我們可以在channel私有資料中存放該channel對應的uid,這樣每個channel之需驗證一次,以後自然知道到來的訊息屬於哪個channel。
  • async_send 非同步傳送訊息
  • close 關閉連線

動態連結庫外掛:

流程如下:
  • 載入動態庫
  • 獲取動態庫的介面,記錄函式指標地址
  • 若有msg到來,呼叫動態連結庫的handle_msg
  • 若連線關閉,呼叫動態連結庫的handl_broken

int plugin_dll_t::start()
{
    m_dll_handler = ::dlopen(m_dll_name.c_str(), RTLD_NOW|RTLD_GLOBAL);

    if (NULL == m_dll_handler)
    {
        logerror((PLUGIN_IMPL, "plugin_dll_t::start dlopen failed:<%s>", dlerror()));
        return -1;
    }

    m_msg_cb     = (handle_channel_msg_func_t)::dlsym(m_dll_handler, HANDLE_CHANNEL_MSG);
    m_broken_cb = (handle_channel_broken_func_t)::dlsym(m_dll_handler, HANDLE_CHANNEL_BROKEN);

    if (NULL == m_msg_cb)
    {
        logerror((PLUGIN_IMPL, "plugin_dll_t::start dlopen failed:<%s> not exist", HANDLE_CHANNEL_MSG));
        return -1;
    }
    if (NULL == m_broken_cb)
    {
        logerror((PLUGIN_IMPL, "plugin_dll_t::start dlopen failed:<%s> not exist", HANDLE_CHANNEL_BROKEN));
        return -1;
    }

    return 0;
}

int plugin_dll_t::stop()
{
    ::dlclose(m_dll_handler);
    return 0;
}

int plugin_dll_t::handle_broken(channel_ptr_t channel_)
{
    return m_broken_cb(channel_);
}

int plugin_dll_t::handle_msg(const message_t& msg_, channel_ptr_t channel_)
{
    return m_msg_cb(msg_, channel_);
}

Python 外掛

其工作流程如下:
  • 初始化Python解釋權,將封裝的傳送訊息介面註冊到Python虛擬機器中
  • 設定PythonPath
  • 載入python檔案
  • 若msg到來,呼叫python全域性函式handle_msg
  • 若channel斷開,呼叫Python 全域性handle_broken 函式

#include "plugin_impl/plugin_python.h"
#include "plugin_impl/pyext.h"
#include "log_module.h"


plugin_python_t::plugin_python_t(const string& name_):
    m_py_mod(NULL)
{
    string pythonpath = "./";
    int pos = name_.find_last_of('/');
    if (-1 == pos)
    {
        m_py_name = name_;
    }
    else
    {
        m_py_name = name_.substr(pos+1);
        pythonpath = name_.substr(0, pos+1);
    }
    pos = m_py_name.find_first_of('.');
    m_py_name = m_py_name.substr(0, pos);

    Py_InitializeEx(0);
    Py_SetPythonHome((char*)pythonpath.c_str());
    initpyext(this);
    PyRun_SimpleString("import channel;import sys;sys.path.append('./plugin/plugin_echo_py/')");
}

plugin_python_t::~plugin_python_t()
{
    Py_Finalize();
}

int plugin_python_t::start()
{
    if(load_py_mod())
    {
        return -1;
    } 
    return 0;
}

int plugin_python_t::stop()
{
    return 0;
}

int plugin_python_t::load_py_mod()
{
    PyObject *pName, *pModule;
    pName = PyString_FromString(m_py_name.c_str());
    pModule = PyImport_Import(pName);
    if (!pModule )  
    {
        Py_DECREF(pName);
        logerror((PLUGIN_IMPL, "can't find %s.pyn", m_py_name.c_str()));
        if (PyErr_Occurred())
    {
        PyErr_Print();
        PyErr_Clear();
        return -1;
    }  
        return -1;
    }
    m_py_mod = PyModule_GetDict(pModule);
    Py_DECREF(pName);
    Py_DECREF(pModule);
    return 0;
}

int plugin_python_t::handle_broken(channel_ptr_t channel_)
{    
    m_channel_mgr.erase(long(channel_));
    delete channel_;
    return call_py_handle_broken(long(channel_));
}
int plugin_python_t::handle_msg(const message_t& msg_, channel_ptr_t channel_)
{
    m_channel_mgr.insert(make_pair((long)channel_, channel_));
    return call_py_handle_msg((long)channel_, msg_.get_body().c_str());
}


int plugin_python_t::call_py_handle_msg(long val, const char* msg)
{
    PyObject *pDict       = m_py_mod;
    const char* func_name = "handle_msg";
    PyObject *pFunc, *arglist, *pRetVal;

    pFunc = PyDict_GetItemString(pDict, func_name);
    if (!pFunc || !PyCallable_Check(pFunc))  
    {  
        logerror((PLUGIN_IMPL, "can't find function [%s]n", func_name));
        return -1;
    }
    arglist = Py_BuildValue("ls", val, msg);
    pRetVal = PyObject_CallObject(pFunc, arglist);
    Py_DECREF(arglist);
    if (pRetVal)
    {
        Py_DECREF(pRetVal);
    }
    if (PyErr_Occurred())
    {
        PyErr_Print();
        PyErr_Clear();
        return -1;
    }
    return 0;
}

int plugin_python_t::call_py_handle_broken(long val)
{
    PyObject *pDict       = m_py_mod;
    const char* func_name = "handle_broken";
    PyObject *pFunc, *arglist, *pRetVal;

    pFunc = PyDict_GetItemString(pDict, func_name);
    if (!pFunc || !PyCallable_Check(pFunc))  
    {  
        logerror((PLUGIN_IMPL, "can't find function [%s]n", func_name));
        return -1;
    }
    arglist = Py_BuildValue("l", val);
    pRetVal = PyObject_CallObject(pFunc, arglist);
    Py_DECREF(arglist);
    if (pRetVal)
    {
        Py_DECREF(pRetVal);
    }
    if (PyErr_Occurred())
    {
        PyErr_Print();
        PyErr_Clear();
        return -1;
    }
    return 0;
}

channel_ptr_t plugin_python_t::get_channel(long p)
{
    map<long, channel_ptr_t>::iterator it = m_channel_mgr.find(p);
    if (it != m_channel_mgr.end())
    {
        return it->second;
    }
    return NULL;
}

Lua 外掛:

工作流程如下:
  • 初始化lua虛擬機器
  • 註冊傳送訊息介面給lua
  • 載入Lua指令碼
  • 有msg到來,呼叫lua的hanle_msg介面
  • 有channel斷開,呼叫lua的handle_broken介面

static plugin_lua_t* g_plugin_lua_obj = NULL;
static int channel_send_msg(lua_State* ls_)
{
    long ptr = (long)luaL_checknumber(ls_, 1);
    size_t len = 0;
    const char* msg = luaL_checklstring(ls_, 2, &len);
    channel_ptr_t c = g_plugin_lua_obj->get_channel(ptr);
    if (c)
    {
        c->async_send(msg);
    }
    return 0;
}

plugin_lua_t::plugin_lua_t(const string& name_):
    m_ls(NULL)
{
    g_plugin_lua_obj = this;
    string luapath = "./";
    int pos = name_.find_last_of('/');
    if (-1 == pos)
    {
        m_lua_name = name_;
    }
    else
    {
        m_lua_name = name_.substr(pos+1);
        luapath = name_.substr(0, pos+1);
    }
    pos = m_lua_name.find_first_of('.');
    m_lua_name = m_lua_name.substr(0, pos);
    
    m_ls = lua_open();
    lua_checkstack(m_ls, 20);

    lua_pushcfunction(m_ls, channel_send_msg);
    lua_setglobal(m_ls, "_tmp_func_");
    luaL_dostring(m_ls, "channel = {} channel.send = _tmp_func_ _tmp_func_ = nil");

    string lua_str = "package.path = package.path .. "" + luapath + "?.lua"";
    luaL_openlibs(m_ls);

    if (luaL_dostring(m_ls, lua_str.c_str()))
    {
        lua_pop(m_ls, 1);
    }
    m_lua_name = name_;
}

plugin_lua_t::~plugin_lua_t()
{
}

int plugin_lua_t::start()
{
    if (load_lua_mod())
    {
        logerror((PLUGIN_IMPL, "can't find %s.luan", m_lua_name.c_str()));
        return -1;
    }
    return 0;
}

int plugin_lua_t::stop()
{
    return 0;
}

int plugin_lua_t::handle_broken(channel_ptr_t channel_)
{
    m_channel_mgr.erase(long(channel_));
    delete channel_;
    return call_lua_handle_broken(long(channel_));
}

int plugin_lua_t::handle_msg(const message_t& msg_, channel_ptr_t channel_)
{
    m_channel_mgr.insert(make_pair((long)channel_, channel_));
    return call_lua_handle_msg((long)channel_, msg_.get_body());
}

int plugin_lua_t::load_lua_mod()
{
    if (luaL_dofile(m_ls, m_lua_name.c_str()))
    {
        lua_pop(m_ls, 1);
        return -1;
    }
    return 0;
}

int plugin_lua_t::call_lua_handle_msg(long val, const string& msg)
{
    lua_checkstack(m_ls, 20);
    lua_getglobal(m_ls, "handle_msg");
    lua_pushnumber(m_ls, val);
    lua_pushlstring(m_ls, msg.c_str(), msg.size());
    if (lua_pcall(m_ls, 2, 0, 0) != 0)
    {
        lua_pop(m_ls, 1);
        return -1;
    }
    return 0;
}

int plugin_lua_t::call_lua_handle_broken(long val)
{
    lua_checkstack(m_ls, 20);
    lua_getglobal(m_ls, "handle_broken");
    lua_pushnumber(m_ls, val);
    if (lua_pcall(m_ls, 1, 0, 0) != 0)
    {
        lua_pop(m_ls, 1);
        return -1;
    }
    return 0;
}

channel_ptr_t plugin_lua_t::get_channel(long p)
{
    map<long, channel_ptr_t>::iterator it = m_channel_mgr.find(p);
    if (it != m_channel_mgr.end())
    {
        return it->second;
    }
    return NULL;
}

msg_broker 待完善的地方:

  1. 心跳層還未加入
  2. 外掛層報錯不夠友好
  3. Python 中封裝的channel使用long型,呼叫send介面時需要從long轉化到channel,需要優化一下,直接封裝一個channel物件到Python
  4. Lua中channel的封裝暫時也是使用long來表示,具有和上面一樣的效能損耗問題