1. 程式人生 > >thrift 原始碼剖析1 :TProcessor

thrift 原始碼剖析1 :TProcessor

TProcessor

  這層主要負責應用層也就是需要我們平常自己實現的一層,它裡面封裝了Handler類。一般thrift 生成的程式碼中我們只需要負責寫Handler類的邏輯即可,Handler中的邏輯就是我們自己定義的服務邏輯。

分析 demo
Service Serv {
    string  put(1:i32 value) ,
}
Handler

在這裡插入圖片描述 在這裡插入圖片描述   可以看到Handler 類繼承了ServIf 類,thrift 是通過c++中Base類指標可以指向Dervied類物件,從而實現通過 Processor來操作我們自己實現的Handler類。

Processor

thrift 內部檔案程式碼:
class TProcessorEventHandler {
public: virtual ~TProcessorEventHandler() {} /** * Called before calling other callback methods. * Expected to return some sort of context object. * The return value is passed to all other callbacks * for that function invocation. */ virtual void* getContext(const char* fn_name,
void* serverContext) { (void)fn_name; (void)serverContext; return NULL; } /** * Expected to free resources associated with a context. */ virtual void freeContext(void* ctx, const char* fn_name) { (void)ctx; (void)fn_name; } /** * Called before reading arguments. */
virtual void preRead(void* ctx, const char* fn_name) { (void)ctx; (void)fn_name; } /** * Called between reading arguments and calling the handler. */ virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) { (void)ctx; (void)fn_name; (void)bytes; } /** * Called between calling the handler and writing the response. */ virtual void preWrite(void* ctx, const char* fn_name) { (void)ctx; (void)fn_name; } /** * Called after writing the response. */ virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) { (void)ctx; (void)fn_name; (void)bytes; } /** * Called when an async function call completes successfully. */ virtual void asyncComplete(void* ctx, const char* fn_name) { (void)ctx; (void)fn_name; } /** * Called if the handler throws an undeclared exception. */ virtual void handlerError(void* ctx, const char* fn_name) { (void)ctx; (void)fn_name; } protected: TProcessorEventHandler() {} }; class TProcessor { public: virtual ~TProcessor() {} virtual bool process(boost::shared_ptr<protocol::TProtocol> in, boost::shared_ptr<protocol::TProtocol> out, void* connectionContext) = 0; bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> io, void* connectionContext) { return process(io, io, connectionContext); } boost::shared_ptr<TProcessorEventHandler> getEventHandler() { return eventHandler_; } void setEventHandler(boost::shared_ptr<TProcessorEventHandler> eventHandler) { eventHandler_ = eventHandler; } protected: TProcessor() {} boost::shared_ptr<TProcessorEventHandler> eventHandler_; }; class TDispatchProcessor : public TProcessor { public: virtual bool process(boost::shared_ptr<protocol::TProtocol> in, boost::shared_ptr<protocol::TProtocol> out, void* connectionContext) { std::string fname; protocol::TMessageType mtype; int32_t seqid; in->readMessageBegin(fname, mtype, seqid); if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) { GlobalOutput.printf("received invalid message type %d from client", mtype); return false; } return dispatchCall(in.get(), out.get(), fname, seqid, connectionContext); } protected: virtual bool dispatchCall(apache::thrift::protocol::TProtocol* in, apache::thrift::protocol::TProtocol* out, const std::string& fname, int32_t seqid, void* callContext) = 0; }; 業務生成程式碼: class ServProcessor : public ::apache::thrift::TDispatchProcessor { protected: boost::shared_ptr<ServIf> iface_; virtual bool dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext); private: typedef void (ServProcessor::*ProcessFunction)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*, void*); typedef std::map<std::string, ProcessFunction> ProcessMap; ProcessMap processMap_; void process_put(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); public: ServProcessor(boost::shared_ptr<ServIf> iface) : iface_(iface) { processMap_["put"] = &ServProcessor::process_put; //這個put是我們Service中的函式名 } virtual ~ServProcessor() {} }; bool ServProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext) { ProcessMap::iterator pfn; pfn = processMap_.find(fname); if (pfn == processMap_.end()) { iprot->skip(::apache::thrift::protocol::T_STRUCT); iprot->readMessageEnd(); iprot->getTransport()->readEnd(); ::apache::thrift::TApplicationException x(::apache::thrift:: TApplicationException::UNKNOWN_METHOD, "Invalid method name: '"+fname+"'"); oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid); x.write(oprot); oprot->writeMessageEnd(); oprot->getTransport()->writeEnd(); oprot->getTransport()->flush(); return true; } (this->*(pfn->second))(seqid, iprot, oprot, callContext); //Processor::process_put return true; } void ServProcessor::process_put(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext) { void* ctx = NULL; if (this->eventHandler_.get() != NULL) { ctx = this->eventHandler_->getContext("Serv.put", callContext); } ::apache::thrift::TProcessorContextFreer freer(this->eventHandler_.get(), ctx, "Serv.put"); if (this->eventHandler_.get() != NULL) { this->eventHandler_->preRead(ctx, "Serv.put"); } Serv_put_args args; args.read(iprot); iprot->readMessageEnd(); uint32_t bytes = iprot->getTransport()->readEnd(); if (this->eventHandler_.get() != NULL) { this->eventHandler_->postRead(ctx, "Serv.put", bytes); } Serv_put_result result; try { iface_->put(result.success, args.value); //這裡呼叫了我們自己設定的邏輯函式 result.__isset.success = true; } catch (const std::exception& e) { if (this->eventHandler_.get() != NULL) { this->eventHandler_->handlerError(ctx, "Serv.put"); } ::apache::thrift::TApplicationException x(e.what()); oprot->writeMessageBegin("put", ::apache::thrift::protocol::T_EXCEPTION, seqid); x.write(oprot); oprot->writeMessageEnd(); oprot->getTransport()->writeEnd(); oprot->getTransport()->flush(); return; } if (this->eventHandler_.get() != NULL) { this->eventHandler_->preWrite(ctx, "Serv.put"); } oprot->writeMessageBegin("put", ::apache::thrift::protocol::T_REPLY, seqid); result.write(oprot); //返回結果 oprot->writeMessageEnd(); bytes = oprot->getTransport()->writeEnd(); oprot->getTransport()->flush(); if (this->eventHandler_.get() != NULL) { this->eventHandler_->postWrite(ctx, "Serv.put", bytes); } }
呼叫邏輯流
TDispatchProcessor::processor -> ServProcessor::dispatchCall -> ServProcessor::process_put 
--> Handler::put 

  從上面的原始碼可以看出ServProcessor::ProcessMap 主要用來查詢方法的,當client發起相應RPC時,RPC會把相應函式的名字傳過去,Server收到函式後就在ProcessMap查詢是否有相應函式名的函式,有使用相應函式指標去呼叫函式,隨後會呼叫相應的業務函式去進行業務處理然後把結果返回回去。