// 程式人生 > Thrift結構分析及增加取客戶端IP功能實現

// Thrift結構分析及增加取客戶端IP功能實現 (Thrift structure analysis and adding client-IP retrieval)

#ifndef MOOON_NET_THRIFT_HELPER_H

#define MOOON_NET_THRIFT_HELPER_H

#include <mooon/net/config.h>

#include <mooon/sys/log.h>

#include <mooon/utils/scoped_ptr.h>

#include <arpa/inet.h>

#include <boost/scoped_ptr.hpp>

#include <thrift/concurrency/PosixThreadFactory.h>

#include <thrift/concurrency/ThreadManager.h>

#include <thrift/protocol/TBinaryProtocol.h>

#include <thrift/server/TNonblockingServer.h>

#include <thrift/transport/TSocketPool.h>

#include <thrift/transport/TTransportException.h>

NET_NAMESPACE_BEGIN

// 用來判斷thrift是否已經連線,包括兩種情況:

// 1.從未連線過,也就是還未開啟過連線

// 2.連線被對端關閉了

inline bool thrift_not_connected(

        apache::thrift::transport::TTransportException::TTransportExceptionType type)

{

    return (apache::thrift::transport::TTransportException::NOT_OPEN == type)

        || (apache::thrift::transport::TTransportException::END_OF_FILE == type);

}

inline bool thrift_not_connected(

        apache::thrift::transport::TTransportException& ex)

{

    apache::thrift::transport::TTransportException::TTransportExceptionType type = ex.getType();

    return thrift_not_connected(type);

}

// thrift客戶端輔助類

//

// 使用示例:

// mooon::net::CThriftClientHelper<ExampleServiceClient> client(rpc_server_ip, rpc_server_port);

// try

// {

//     client.connect();

//     client->foo();

// }

// catch (apache::thrift::transport::TTransportException& transport_ex)

// {

//     MYLOG_ERROR("thrift exception: %s\n", transport_ex.what());

// }

// catch (apache::thrift::transport::TApplicationException& app_ex)

// {

//     MYLOG_ERROR("thrift exception: %s\n", app_ex.what());

// }

// catch (apache::thrift::TException& tx)

// {

//     MYLOG_ERROR("thrift exception: %s\n", tx.what());

// }

// Transport除預設的TFramedTransport (TBufferTransports.h),還可選擇:

// TBufferedTransport (TBufferTransports.h)

// THttpTransport

// TZlibTransport

// TFDTransport (TSimpleFileTransport)

//

// Protocol除預設的apache::thrift::protocol::TBinaryProtocol,還可選擇:

// TCompactProtocol

// TJSONProtocol

// TDebugProtocol

template <class ThriftClient,

          class Protocol=apache::thrift::protocol::TBinaryProtocol,

          class Transport=apache::thrift::transport::TFramedTransport>

class CThriftClientHelper

{

public:

    // host thrift服務端的IP地址

    // port thrift服務端的埠號

    // connect_timeout_milliseconds 連線thrift服務端的超時毫秒數

    // receive_timeout_milliseconds 接收thrift服務端發過來的資料的超時毫秒數

    // send_timeout_milliseconds 向thrift服務端傳送資料時的超時毫秒數

    CThriftClientHelper(const std::string &host, uint16_t port,

                        int connect_timeout_milliseconds=2000,

                        int receive_timeout_milliseconds=2000,

                        int send_timeout_milliseconds=2000);

    ~CThriftClientHelper();

    // 連線thrift服務端

    //

    // 出錯時,可丟擲以下幾個thrift異常:

    // apache::thrift::transport::TTransportException

    // apache::thrift::TApplicationException

    // apache::thrift::TException

    void connect();

    // 斷開與thrift服務端的連線

    //

    // 出錯時,可丟擲以下幾個thrift異常:

    // apache::thrift::transport::TTransportException

    // apache::thrift::TApplicationException

    // apache::thrift::TException

    void close();

    ThriftClient* get() { return _client.get(); }

    ThriftClient* get() const { return _client.get(); }

    ThriftClient* operator ->() { return get(); }

    ThriftClient* operator ->() const { return get(); }

    const std::string& get_host() const { return _host; }

    uint16_t get_port() const { return _port; }

private:

    std::string _host;

    uint16_t _port;

    boost::shared_ptr<apache::thrift::transport::TSocketPool> _sock_pool;

    boost::shared_ptr<apache::thrift::transport::TTransport> _socket;

    boost::shared_ptr<apache::thrift::transport::TFramedTransport> _transport;

    boost::shared_ptr<apache::thrift::protocol::TProtocol> _protocol;

    boost::shared_ptr<ThriftClient> _client;

};

////////////////////////////////////////////////////////////////////////////////

// Thrift server helper class

//

// Usage example:

// mooon::net::CThriftServerHelper<CExampleHandler, ExampleServiceProcessor> _thrift_server;

// try

// {

//     _thrift_server.serve(listen_port);

// }

// catch (apache::thrift::TException& tx)

// {

//     MYLOG_ERROR("thrift exception: %s\n", tx.what());

// }

// Besides the default TBinaryProtocolFactory, ProtocolFactory may also be:

// TCompactProtocolFactory

// TJSONProtocolFactory

// TDebugProtocolFactory

//

// Besides the default TNonblockingServer, Server may also be:

// TSimpleServer

// TThreadedServer

// TThreadPoolServer

// NOTE(review): serve() constructs Server from (processor, protocol factory, port, thread manager)
// and contains a call to setNumIOThreads(); confirm the alternatives listed above support both.

template <class ThriftHandler,

          class ServiceProcessor,

          class ProtocolFactory=apache::thrift::protocol::TBinaryProtocolFactory,

          class Server=apache::thrift::server::TNonblockingServer>

class CThriftServerHelper

{

public:

    // Starts the rpc service; note that this call is synchronous and blocking, so it should be made last

    // port: listening port of the thrift server

    // num_worker_threads: number of threads the thrift server starts (passed to the thread manager)

    //

    // On failure, the following thrift exceptions may be thrown:

    // apache::thrift::transport::TTransportException

    // apache::thrift::TApplicationException

    // apache::thrift::TException

    // The num_io_threads parameter only takes effect when Server is TNonblockingServer

    void serve(uint16_t port, uint8_t num_worker_threads=1, uint8_t num_io_threads=1);

    void serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads=1);

    // Calls stop() on the server object created by serve()

    void stop();

private:

    // All members below are (re)created by serve(const std::string&, ...)

    boost::shared_ptr<ThriftHandler> _handler;

    boost::shared_ptr<apache::thrift::TProcessor> _processor;

    boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> _protocol_factory;

    boost::shared_ptr<apache::thrift::server::ThreadManager> _thread_manager;

    boost::shared_ptr<apache::thrift::concurrency::PosixThreadFactory> _thread_factory;

    boost::shared_ptr<apache::thrift::server::TServer> _server;

};

////////////////////////////////////////////////////////////////////////////////

// Log-writing callback invoked by thrift; set_thrift_log_write_function() registers it.

// log: the message text, forwarded unchanged to MYLOG_INFO through a "%s" format

inline void write_log_function(const char* log)

{

    MYLOG_INFO("%s", log);

}

// 將thrift輸出寫入到日誌檔案中

inline void set_thrift_log_write_function()

{

    if (log != NULL)

    {

        apache::thrift::GlobalOutput.setOutputFunction(write_log_function);

    }

}

////////////////////////////////////////////////////////////////////////////////

template <class ThriftClient, class Protocol, class Transport>

CThriftClientHelper<ThriftClient, Protocol, Transport>::CThriftClientHelper(

        const std::string &host, uint16_t port,

        int connect_timeout_milliseconds, int receive_timeout_milliseconds, int send_timeout_milliseconds)

        : _host(host)

        , _port(port)

{

    set_thrift_log_write_function();

    _sock_pool.reset(new apache::thrift::transport::TSocketPool());

    _sock_pool->addServer(host, (int)port);

    _sock_pool->setConnTimeout(connect_timeout_milliseconds);

    _sock_pool->setRecvTimeout(receive_timeout_milliseconds);

    _sock_pool->setSendTimeout(send_timeout_milliseconds);

    _socket = _sock_pool;

    // Transport預設為apache::thrift::transport::TFramedTransport

    _transport.reset(new Transport(_socket));

    // Protocol預設為apache::thrift::protocol::TBinaryProtocol

    _protocol.reset(new Protocol(_transport));

    _client.reset(new ThriftClient(_protocol));

}

template <class ThriftClient, class Protocol, class Transport>

CThriftClientHelper<ThriftClient, Protocol, Transport>::~CThriftClientHelper()

{

    close();

}

template <class ThriftClient, class Protocol, class Transport>

void CThriftClientHelper<ThriftClient, Protocol, Transport>::connect()

{

    if (!_transport->isOpen())

    {

        _transport->open();

    }

}

template <class ThriftClient, class Protocol, class Transport>

void CThriftClientHelper<ThriftClient, Protocol, Transport>::close()

{

    if (_transport->isOpen())

    {

        _transport->close();

    }

}

////////////////////////////////////////////////////////////////////////////////

template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory, class Server>

void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory, Server>::serve(uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)

{

    serve("0.0.0.0", port, num_worker_threads, num_io_threads);

}

template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory, class Server>

void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory, Server>::serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)

{

    set_thrift_log_write_function();

    _handler.reset(new ThriftHandler);

    _processor.reset(new ServiceProcessor(_handler));

    // ProtocolFactory預設為apache::thrift::protocol::TBinaryProtocolFactory

    _protocol_factory.reset(new ProtocolFactory());

    _thread_manager = apache::thrift::server::ThreadManager::newSimpleThreadManager(num_worker_threads);

    _thread_factory.reset(new apache::thrift::concurrency::PosixThreadFactory());

    _thread_manager->threadFactory(_thread_factory);

    _thread_manager->start();

    // Server預設為apache::thrift::server::TNonblockingServer

    Server* server = new Server(_processor, _protocol_factory, port, _thread_manager);

    if (sizeof(Server) == sizeof(apache::thrift::server::TNonblockingServer))

        server->setNumIOThreads(num_io_threads);

    _server.reset(server);

    _server->run(); // 這裡也可直接呼叫serve(),但推薦run()

}

template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory, class Server>

void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory, Server>::stop()

{

    _server->stop();

}

NET_NAMESPACE_END

#endif // MOOON_NET_THRIFT_HELPER_H