// Thrift structure analysis, and an implementation of retrieving the client IP
#ifndef MOOON_NET_THRIFT_HELPER_H
#define MOOON_NET_THRIFT_HELPER_H
#include <mooon/net/config.h>
#include <mooon/sys/log.h>
#include <mooon/utils/scoped_ptr.h>
#include <arpa/inet.h>
#include <boost/scoped_ptr.hpp>
#include <thrift/concurrency/PosixThreadFactory.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TNonblockingServer.h>
#include <thrift/transport/TSocketPool.h>
#include <thrift/transport/TTransportException.h>
NET_NAMESPACE_BEGIN
// Tells whether a transport exception type means that thrift currently has
// no connection. Two situations are covered:
// 1. NOT_OPEN: never connected, i.e. the connection has not been opened yet
// 2. END_OF_FILE: the connection was closed by the peer
//
// Returns true for either of the two types above, false for any other type.
inline bool thrift_not_connected(
        apache::thrift::transport::TTransportException::TTransportExceptionType type)
{
    typedef apache::thrift::transport::TTransportException TransportError;

    switch (type)
    {
    case TransportError::NOT_OPEN:
    case TransportError::END_OF_FILE:
        return true;
    default:
        return false;
    }
}
inline bool thrift_not_connected(
apache::thrift::transport::TTransportException& ex)
{
apache::thrift::transport::TTransportException::TTransportExceptionType type = ex.getType();
return thrift_not_connected(type);
}
// Thrift client helper class
//
// Usage example:
// mooon::net::CThriftClientHelper<ExampleServiceClient> client(rpc_server_ip, rpc_server_port);
// try
// {
//     client.connect();
//     client->foo();
// }
// catch (apache::thrift::transport::TTransportException& transport_ex)
// {
//     MYLOG_ERROR("thrift exception: %s\n", transport_ex.what());
// }
// catch (apache::thrift::TApplicationException& app_ex)
// {
//     MYLOG_ERROR("thrift exception: %s\n", app_ex.what());
// }
// catch (apache::thrift::TException& tx)
// {
//     MYLOG_ERROR("thrift exception: %s\n", tx.what());
// }
//
// Transport: besides the default TFramedTransport (TBufferTransports.h),
// the following may be chosen:
//     TBufferedTransport (TBufferTransports.h)
//     THttpTransport
//     TZlibTransport
//     TFDTransport (TSimpleFileTransport)
//
// Protocol: besides the default apache::thrift::protocol::TBinaryProtocol,
// the following may be chosen:
//     TCompactProtocol
//     TJSONProtocol
//     TDebugProtocol
template <class ThriftClient,
          class Protocol=apache::thrift::protocol::TBinaryProtocol,
          class Transport=apache::thrift::transport::TFramedTransport>
class CThriftClientHelper
{
public:
    // host                          IP address of the thrift server
    // port                          port number of the thrift server
    // connect_timeout_milliseconds  timeout, in milliseconds, for connecting to the thrift server
    // receive_timeout_milliseconds  timeout, in milliseconds, for receiving data from the thrift server
    // send_timeout_milliseconds     timeout, in milliseconds, for sending data to the thrift server
    CThriftClientHelper(const std::string &host, uint16_t port,
                        int connect_timeout_milliseconds=2000,
                        int receive_timeout_milliseconds=2000,
                        int send_timeout_milliseconds=2000);

    // Closes the connection (calls close()).
    ~CThriftClientHelper();

    // Connects to the thrift server; does nothing if the transport is already open.
    //
    // On error, the following thrift exceptions may be thrown:
    //     apache::thrift::transport::TTransportException
    //     apache::thrift::TApplicationException
    //     apache::thrift::TException
    void connect();

    // Disconnects from the thrift server; does nothing if the transport is not open.
    //
    // On error, the following thrift exceptions may be thrown:
    //     apache::thrift::transport::TTransportException
    //     apache::thrift::TApplicationException
    //     apache::thrift::TException
    void close();

    // Access to the generated thrift client, so that RPC methods can be
    // invoked as `helper->method()`. The pointer remains owned by this helper.
    ThriftClient* get() { return _client.get(); }
    ThriftClient* get() const { return _client.get(); }
    ThriftClient* operator ->() { return get(); }
    ThriftClient* operator ->() const { return get(); }

    // Server address this helper was constructed with.
    const std::string& get_host() const { return _host; }
    uint16_t get_port() const { return _port; }

private:
    std::string _host;
    uint16_t _port;
    boost::shared_ptr<apache::thrift::transport::TSocketPool> _sock_pool;
    boost::shared_ptr<apache::thrift::transport::TTransport> _socket;
    // Held through the TTransport base class rather than TFramedTransport, so
    // that any `Transport` template argument can be stored here; only
    // isOpen()/open()/close() are invoked on it by this class.
    boost::shared_ptr<apache::thrift::transport::TTransport> _transport;
    boost::shared_ptr<apache::thrift::protocol::TProtocol> _protocol;
    boost::shared_ptr<ThriftClient> _client;
};
////////////////////////////////////////////////////////////////////////////////
// Thrift server helper class
//
// Usage example:
// mooon::net::CThriftServerHelper<CExampleHandler, ExampleServiceProcessor> _thrift_server;
// try
// {
//     _thrift_server.serve(listen_port);
// }
// catch (apache::thrift::TException& tx)
// {
//     MYLOG_ERROR("thrift exception: %s\n", tx.what());
// }
//
// ProtocolFactory: besides the default TBinaryProtocolFactory, the following
// may be chosen:
//     TCompactProtocolFactory
//     TJSONProtocolFactory
//     TDebugProtocolFactory
//
// Server: besides the default TNonblockingServer, the original author lists:
//     TSimpleServer
//     TThreadedServer
//     TThreadPoolServer
template <class ThriftHandler,
          class ServiceProcessor,
          class ProtocolFactory=apache::thrift::protocol::TBinaryProtocolFactory,
          class Server=apache::thrift::server::TNonblockingServer>
class CThriftServerHelper
{
public:
    // Starts the rpc service. Note that this call is synchronous and
    // blocking, so it should be the last thing the caller invokes.
    //
    // ip                  listen address; the definition visible in this file
    //                     does not forward it to the Server constructor
    // port                listen port of the thrift server
    // num_worker_threads  number of worker threads started by the thrift server
    // num_io_threads      only takes effect when Server is TNonblockingServer
    //
    // On error, the following thrift exceptions may be thrown:
    //     apache::thrift::transport::TTransportException
    //     apache::thrift::TApplicationException
    //     apache::thrift::TException
    void serve(uint16_t port, uint8_t num_worker_threads=1, uint8_t num_io_threads=1);
    void serve(const std::string& ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads=1);

    // Asks the server created by serve() to stop.
    void stop();

private:
    boost::shared_ptr<ThriftHandler> _handler;
    boost::shared_ptr<apache::thrift::TProcessor> _processor;
    boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> _protocol_factory;
    boost::shared_ptr<apache::thrift::server::ThreadManager> _thread_manager;
    boost::shared_ptr<apache::thrift::concurrency::PosixThreadFactory> _thread_factory;
    boost::shared_ptr<apache::thrift::server::TServer> _server;
};
////////////////////////////////////////////////////////////////////////////////
// Log-writing callback handed to thrift by set_thrift_log_write_function().
// Forwards the message text to the MYLOG_INFO logging macro unchanged.
//
// log  the message text supplied by thrift
inline void write_log_function(const char* log)
{
    MYLOG_INFO("%s", log);
}
// Redirects thrift's own output into the log file by installing
// write_log_function() as the output function of apache::thrift::GlobalOutput.
inline void set_thrift_log_write_function()
{
    // NOTE(review): no `log` is declared in this function or in the visible
    // part of this file; presumably a logger pointer declared elsewhere was
    // intended here -- confirm which symbol this should test.
    const bool logger_available = (log != NULL);
    if (!logger_available)
    {
        return;
    }

    apache::thrift::GlobalOutput.setOutputFunction(write_log_function);
}
////////////////////////////////////////////////////////////////////////////////
// Builds the whole client stack without connecting:
// socket pool -> transport -> protocol -> generated client.
// Call connect() afterwards to actually open the connection.
template <class ThriftClient, class Protocol, class Transport>
CThriftClientHelper<ThriftClient, Protocol, Transport>::CThriftClientHelper(
        const std::string &host, uint16_t port,
        int connect_timeout_milliseconds, int receive_timeout_milliseconds, int send_timeout_milliseconds)
    : _host(host), _port(port)
{
    using apache::thrift::transport::TSocketPool;

    // Route thrift's own output to the log before any thrift object is created.
    set_thrift_log_write_function();

    // A pool holding the single server given by the caller, with all three timeouts applied.
    boost::shared_ptr<TSocketPool> pool(new TSocketPool());
    pool->addServer(host, static_cast<int>(port));
    pool->setConnTimeout(connect_timeout_milliseconds);
    pool->setRecvTimeout(receive_timeout_milliseconds);
    pool->setSendTimeout(send_timeout_milliseconds);
    _sock_pool = pool;
    _socket = pool;

    // Transport defaults to apache::thrift::transport::TFramedTransport
    _transport.reset(new Transport(_socket));

    // Protocol defaults to apache::thrift::protocol::TBinaryProtocol
    _protocol.reset(new Protocol(_transport));

    _client.reset(new ThriftClient(_protocol));
}
// Closes the connection on destruction.
//
// close() forwards to the transport's close(), which may throw thrift
// exceptions. An exception must never escape a destructor (it would end in
// std::terminate during stack unwinding, and always so under C++11 and later),
// so closing here is best effort: any failure is deliberately swallowed.
// Callers that need to observe close errors should call close() explicitly
// before the helper goes out of scope.
template <class ThriftClient, class Protocol, class Transport>
CThriftClientHelper<ThriftClient, Protocol, Transport>::~CThriftClientHelper()
{
    try
    {
        close();
    }
    catch (...)
    {
        // Intentionally ignored: nothing useful can be done with a close
        // failure while the object is being destroyed.
    }
}
// Opens the transport unless it is already open.
// Exceptions thrown by the transport's open() propagate to the caller.
template <class ThriftClient, class Protocol, class Transport>
void CThriftClientHelper<ThriftClient, Protocol, Transport>::connect()
{
    if (_transport->isOpen())
    {
        return; // already connected, nothing to do
    }

    _transport->open();
}
// Closes the transport if it is currently open; otherwise does nothing.
// Exceptions thrown by the transport's close() propagate to the caller.
template <class ThriftClient, class Protocol, class Transport>
void CThriftClientHelper<ThriftClient, Protocol, Transport>::close()
{
    if (!_transport->isOpen())
    {
        return; // not connected, nothing to close
    }

    _transport->close();
}
////////////////////////////////////////////////////////////////////////////////
// Convenience overload: delegates to the overload taking an address,
// passing "0.0.0.0" as that address.
template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory, class Server>
void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory, Server>::serve(uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
{
    const std::string any_address("0.0.0.0");
    serve(any_address, port, num_worker_threads, num_io_threads);
}
// Creates the handler, processor, protocol factory and worker thread manager,
// then constructs the Server and runs it. This call blocks while the server
// is running.
//
// ip                  NOTE(review): not forwarded to the Server constructor
//                     below, so it currently has no effect in this function;
//                     confirm whether binding to a specific address is needed
// port                listen port handed to the Server constructor
// num_worker_threads  size of the simple thread manager's worker pool
// num_io_threads      applied through Server::setNumIOThreads(), see below
//
// Exceptions thrown by thrift while setting up or running propagate to the caller.
template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory, class Server>
void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory, Server>::serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
{
    // Make it explicit that `ip` is intentionally unused for now (see above).
    static_cast<void>(ip);

    set_thrift_log_write_function();

    _handler.reset(new ThriftHandler);
    _processor.reset(new ServiceProcessor(_handler));

    // ProtocolFactory defaults to apache::thrift::protocol::TBinaryProtocolFactory
    _protocol_factory.reset(new ProtocolFactory());

    _thread_manager = apache::thrift::server::ThreadManager::newSimpleThreadManager(num_worker_threads);
    _thread_factory.reset(new apache::thrift::concurrency::PosixThreadFactory());
    _thread_manager->threadFactory(_thread_factory);
    _thread_manager->start();

    // Server defaults to apache::thrift::server::TNonblockingServer.
    // The new server is owned by a smart pointer from the start, so it cannot
    // leak if anything below throws before it is stored in _server.
    boost::shared_ptr<Server> server(new Server(_processor, _protocol_factory, port, _thread_manager));

    // NOTE(review): this size comparison is only a heuristic for
    // "Server is TNonblockingServer"; the call below must still compile for
    // every Server type this template is instantiated with.
    if (sizeof(Server) == sizeof(apache::thrift::server::TNonblockingServer))
        server->setNumIOThreads(num_io_threads);

    _server = server;
    _server->run(); // serve() could be called directly here, but run() is recommended
}
// Asks the server created by serve() to stop.
//
// Does nothing when no server has been created yet (serve() not called, or it
// has not reached the point where the server is stored), instead of
// dereferencing an empty pointer.
template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory, class Server>
void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory, Server>::stop()
{
    if (_server.get() != NULL)
    {
        _server->stop();
    }
}
NET_NAMESPACE_END
#endif // MOOON_NET_THRIFT_HELPER_H