thrift 原始碼剖析1 :TProcessor
阿新 • • 發佈:2018-12-20
TProcessor
這層主要負責應用層也就是需要我們平常自己實現的一層,它裡面封裝了Handler類。一般thrift 生成的程式碼中我們只需要負責寫Handler類的邏輯即可,Handler中的邏輯就是我們自己定義的服務邏輯。
分析 demo
Service Serv {
string put(1:i32 value) ,
}
Handler
可以看到Handler 類繼承了ServIf 類,thrift 是通過c++中Base類指標可以指向Dervied類物件,從而實現通過 Processor來操作我們自己實現的Handler類。
Processor
thrift 內部檔案程式碼:
class TProcessorEventHandler {
public:
virtual ~TProcessorEventHandler() {}
/**
* Called before calling other callback methods.
* Expected to return some sort of context object.
* The return value is passed to all other callbacks
* for that function invocation.
*/
virtual void* getContext(const char* fn_name, void* serverContext) {
(void)fn_name;
(void)serverContext;
return NULL;
}
/**
* Expected to free resources associated with a context.
*/
virtual void freeContext(void* ctx, const char* fn_name) {
(void)ctx;
(void)fn_name;
}
/**
* Called before reading arguments.
*/
virtual void preRead(void* ctx, const char* fn_name) {
(void)ctx;
(void)fn_name;
}
/**
* Called between reading arguments and calling the handler.
*/
virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) {
(void)ctx;
(void)fn_name;
(void)bytes;
}
/**
* Called between calling the handler and writing the response.
*/
virtual void preWrite(void* ctx, const char* fn_name) {
(void)ctx;
(void)fn_name;
}
/**
* Called after writing the response.
*/
virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) {
(void)ctx;
(void)fn_name;
(void)bytes;
}
/**
* Called when an async function call completes successfully.
*/
virtual void asyncComplete(void* ctx, const char* fn_name) {
(void)ctx;
(void)fn_name;
}
/**
* Called if the handler throws an undeclared exception.
*/
virtual void handlerError(void* ctx, const char* fn_name) {
(void)ctx;
(void)fn_name;
}
protected:
TProcessorEventHandler() {}
};
class TProcessor {
public:
virtual ~TProcessor() {}
virtual bool process(boost::shared_ptr<protocol::TProtocol> in,
boost::shared_ptr<protocol::TProtocol> out,
void* connectionContext) = 0;
bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> io, void* connectionContext) {
return process(io, io, connectionContext);
}
boost::shared_ptr<TProcessorEventHandler> getEventHandler() { return eventHandler_; }
void setEventHandler(boost::shared_ptr<TProcessorEventHandler> eventHandler) {
eventHandler_ = eventHandler;
}
protected:
TProcessor() {}
boost::shared_ptr<TProcessorEventHandler> eventHandler_;
};
class TDispatchProcessor : public TProcessor {
public:
virtual bool process(boost::shared_ptr<protocol::TProtocol> in,
boost::shared_ptr<protocol::TProtocol> out,
void* connectionContext) {
std::string fname;
protocol::TMessageType mtype;
int32_t seqid;
in->readMessageBegin(fname, mtype, seqid);
if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) {
GlobalOutput.printf("received invalid message type %d from client", mtype);
return false;
}
return dispatchCall(in.get(), out.get(), fname, seqid, connectionContext);
}
protected:
virtual bool dispatchCall(apache::thrift::protocol::TProtocol* in,
apache::thrift::protocol::TProtocol* out,
const std::string& fname,
int32_t seqid,
void* callContext) = 0;
};
業務生成程式碼:
class ServProcessor : public ::apache::thrift::TDispatchProcessor {
protected:
boost::shared_ptr<ServIf> iface_;
virtual bool dispatchCall(::apache::thrift::protocol::TProtocol* iprot,
::apache::thrift::protocol::TProtocol* oprot,
const std::string& fname, int32_t seqid, void* callContext);
private:
typedef void (ServProcessor::*ProcessFunction)(int32_t,
::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*, void*);
typedef std::map<std::string, ProcessFunction> ProcessMap;
ProcessMap processMap_;
void process_put(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot,
::apache::thrift::protocol::TProtocol* oprot, void* callContext);
public:
ServProcessor(boost::shared_ptr<ServIf> iface) :
iface_(iface) {
processMap_["put"] = &ServProcessor::process_put; //這個put是我們Service中的函式名
}
virtual ~ServProcessor() {}
};
bool ServProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot,
::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid,
void* callContext) {
ProcessMap::iterator pfn;
pfn = processMap_.find(fname);
if (pfn == processMap_.end()) {
iprot->skip(::apache::thrift::protocol::T_STRUCT);
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
::apache::thrift::TApplicationException x(::apache::thrift::
TApplicationException::UNKNOWN_METHOD,
"Invalid method name: '"+fname+"'");
oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);
x.write(oprot);
oprot->writeMessageEnd();
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
return true;
}
(this->*(pfn->second))(seqid, iprot, oprot, callContext); //Processor::process_put
return true;
}
void ServProcessor::process_put(int32_t seqid, ::apache::thrift::protocol::TProtocol*
iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext)
{
void* ctx = NULL;
if (this->eventHandler_.get() != NULL) {
ctx = this->eventHandler_->getContext("Serv.put", callContext);
}
::apache::thrift::TProcessorContextFreer freer(this->eventHandler_.get(), ctx, "Serv.put");
if (this->eventHandler_.get() != NULL) {
this->eventHandler_->preRead(ctx, "Serv.put");
}
Serv_put_args args;
args.read(iprot);
iprot->readMessageEnd();
uint32_t bytes = iprot->getTransport()->readEnd();
if (this->eventHandler_.get() != NULL) {
this->eventHandler_->postRead(ctx, "Serv.put", bytes);
}
Serv_put_result result;
try {
iface_->put(result.success, args.value); //這裡呼叫了我們自己設定的邏輯函式
result.__isset.success = true;
} catch (const std::exception& e) {
if (this->eventHandler_.get() != NULL) {
this->eventHandler_->handlerError(ctx, "Serv.put");
}
::apache::thrift::TApplicationException x(e.what());
oprot->writeMessageBegin("put", ::apache::thrift::protocol::T_EXCEPTION, seqid);
x.write(oprot);
oprot->writeMessageEnd();
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
return;
}
if (this->eventHandler_.get() != NULL) {
this->eventHandler_->preWrite(ctx, "Serv.put");
}
oprot->writeMessageBegin("put", ::apache::thrift::protocol::T_REPLY, seqid);
result.write(oprot); //返回結果
oprot->writeMessageEnd();
bytes = oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
if (this->eventHandler_.get() != NULL) {
this->eventHandler_->postWrite(ctx, "Serv.put", bytes);
}
}
呼叫邏輯流
TDispatchProcessor::processor -> ServProcessor::dispatchCall -> ServProcessor::process_put
--> Handler::put
從上面的原始碼可以看出ServProcessor::ProcessMap 主要用來查詢方法的,當client發起相應RPC時,RPC會把相應函式的名字傳過去,Server收到函式後就在ProcessMap查詢是否有相應函式名的函式,有使用相應函式指標去呼叫函式,隨後會呼叫相應的業務函式去進行業務處理然後把結果返回回去。