thrift C++程式碼分析
阿新 • • 發佈:2019-01-15
C++生成的程式碼,越上面的越新
Calculator.cpp
Calculator.h
Calculator_server.skeleton.cpp
shared_constants.cpp
shared_constants.h
SharedService.cpp
SharedService.h
SharedService_server.skeleton.cpp
shared_types.cpp
shared_types.h
tutorial_constants.cpp
tutorial_constants.h
tutorial_types.cpp
tutorial_types.h
tutorial_types.h(tutorial.thrift)
namespace tutorial {...}
先列舉生成
/// Scope wrapper for the generated enum, so values are spelled `Operation::ADD` etc.
struct Operation {
  /// Arithmetic operation codes; every value is pinned explicitly.
  enum type {
    ADD      = 1,
    SUBTRACT = 2,
    MULTIPLY = 3,
    DIVIDE   = 4
  };
};
生成全域性變數
extern const std::map<int, const char*> _Operation_VALUES_TO_NAMES;
typedef int32_t MyInteger;
定義的結構體和異常生成類,先生成
class Work;
class InvalidOperation;
為每個結構體生成 《_結構體名__isset》 結構體(isset 前是兩個底線,例如 _Work__isset),成員全都是bool位域,表示對應欄位是否被設定
/// Per-field "has been set" flags for Work; each flag is a one-bit bitfield.
typedef struct _Work__isset {
  bool num1 : 1;
  bool num2 : 1;
  bool op : 1;
  bool comment : 1;

  /// Only num1 starts out flagged; the remaining three flags start cleared.
  _Work__isset()
    : num1(true),
      num2(false),
      op(false),
      comment(false) {}
} _Work__isset;
類裡面依次是 拷貝建構函式,賦值運算子,建構函式,解構函式,
資料成員,型別為 _類名__isset 的 __isset 成員,以及各成員的set方法 __set_成員名
過載==運算子,整數和字串要求相等,過載!=運算子
過載<運算子宣告
成員函式read,write函式宣告,printTo虛擬函式宣告
swap函式宣告 void swap(Work &a, Work &b);
輸出物件函式
/// Stream insertion for Work: delegates the formatting to Work::printTo and
/// hands back the same stream so insertions can be chained.
inline std::ostream& operator<<(std::ostream& out, const Work& obj) {
  obj.printTo(out);
  return out;
}
/// Value class generated for the `Work` struct of tutorial.thrift.
class Work {
 public:
  Work(const Work&);
  Work& operator=(const Work&);
  /// Starts with both operands at 0, `op` at enum value 0 and an empty `comment`.
  Work() : num1(0), num2(0), op((Operation::type)0), comment() {
  }
  virtual ~Work() throw();

  int32_t num1;
  int32_t num2;
  Operation::type op;
  std::string comment;
  /// Per-field "has been set" flags; read() raises the flag of every field it decodes.
  _Work__isset __isset;

  void __set_num1(const int32_t val);
  void __set_num2(const int32_t val);
  void __set_op(const Operation::type val);
  void __set_comment(const std::string& val);

  /// Equal when num1, num2 and op match and `comment` is either unset on both
  /// sides, or set on both sides with identical text.
  bool operator == (const Work & rhs) const
  {
    if (!(num1 == rhs.num1))
      return false;
    if (!(num2 == rhs.num2))
      return false;
    if (!(op == rhs.op))
      return false;
    if (__isset.comment != rhs.__isset.comment)
      return false;
    else if (__isset.comment && !(comment == rhs.comment))
      return false;
    return true;
  }
  /// Negation of operator==.
  bool operator != (const Work &rhs) const {
    return !(*this == rhs);
  }
  /// Declared only; the definition is not part of this listing.
  bool operator < (const Work & ) const;

  /// Decodes the fields from `iprot`; returns the accumulated transfer count.
  uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
  /// Encodes the fields to `oprot`; returns the accumulated transfer count.
  uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
  /// Writes a textual form of the object to `out`.
  virtual void printTo(std::ostream& out) const;
};
InvalidOperation類似
tutorial_types.cpp
namespace tutorial { ... }
列舉生成的
// Enum values, index-aligned with _kOperationNames.
int _kOperationValues[] = {
  Operation::ADD, Operation::SUBTRACT, Operation::MULTIPLY, Operation::DIVIDE
};
// Printable names, index-aligned with _kOperationValues.
const char* _kOperationNames[] = { "ADD", "SUBTRACT", "MULTIPLY", "DIVIDE" };
全域性變數定義,map的建構函式
map (InputIterator first, InputIterator last,
const key_compare& comp = key_compare(),
const allocator_type& = allocator_type());
const std::map<int, const char*> _Operation_VALUES_TO_NAMES(
::apache::thrift::TEnumIterator(4, _kOperationValues, _kOperationNames),
::apache::thrift::TEnumIterator(-1, NULL, NULL));
namespace apache {
namespace thrift {
/// Iterator adaptor over two parallel arrays (enum values and their names);
/// dereferencing yields a (value, name) pair. It is used above to fill
/// _Operation_VALUES_TO_NAMES through std::map's iterator-range constructor.
class TEnumIterator
    : public std::iterator<std::forward_iterator_tag, std::pair<int, const char*> > {
public:
  /// @param n      number of elements in both arrays (the end sentinel passes -1)
  /// @param enums  keys: the enum values
  /// @param names  mapped values: the enum names, parallel to `enums`
  TEnumIterator(int n, int* enums, const char** names)
    : ii_(0), n_(n), enums_(enums), names_(names) {}

  /// Advances to the next element and returns the new index.
  int operator++() { return ++ii_; }

  /// True while this iterator has not reached its own element count. `end` is
  /// only sanity-checked to be the sentinel; its position is never compared.
  bool operator!=(const TEnumIterator& end) {
    THRIFT_UNUSED_VARIABLE(end);
    assert(end.n_ == -1);
    return (ii_ != n_);
  }

  std::pair<int, const char*> operator*() const { return std::make_pair(enums_[ii_], names_[ii_]); }

private:
  int ii_;              // current index into the arrays
  const int n_;         // total length of the arrays
  int* enums_;          // keys
  const char** names_;  // values
};
// The empty throw() specification states that this destructor must not throw.
Work::~Work() throw() {
}
void Work::__set_num1(const int32_t val) {
this->num1 = val;
}
void Work::__set_num2(const int32_t val) {
this->num2 = val;
}
// Stores the operation code; the isset flags are left untouched.
void Work::__set_op(const Operation::type val) {
  op = val;
}
void Work::__set_comment(const std::string& val) {
this->comment = val;
__isset.comment = true;
}
// Wire type tags used by the C++ library. Several names are aliases for the
// same tag (T_BYTE/T_I08, T_STRING/T_UTF7). The original listing lacked the
// terminating ';' after the closing brace.
enum TType {
  T_STOP = 0,
  T_VOID = 1,
  T_BOOL = 2,
  T_BYTE = 3,
  T_I08 = 3,
  T_I16 = 6,
  T_I32 = 8,
  T_U64 = 9,
  T_I64 = 10,
  T_DOUBLE = 4,
  T_STRING = 11,
  T_UTF7 = 11,
  T_STRUCT = 12,
  T_MAP = 13,
  T_SET = 14,
  T_LIST = 15,
  T_UTF8 = 16,
  T_UTF16 = 17
};
C++版本支援的型別
/// Scope helper: bumps the protocol's input recursion depth when constructed
/// and lowers it again when destroyed.
struct TInputRecursionTracker {
  TProtocol &prot_;

  TInputRecursionTracker(TProtocol &prot)
    : prot_(prot)
  {
    prot_.incrementInputRecursionDepth();
  }

  ~TInputRecursionTracker()
  {
    prot_.decrementInputRecursionDepth();
  }
};
TBinaryProtocol extends TProtocol
uint32_t Work::read(::apache::thrift::protocol::TProtocol* iprot) {
apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
uint32_t xfer = 0;
std::string fname;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
TBinaryProtocol.tcc
// Reads nothing: it only clears `name` and reports 0 as the transfer count.
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::readStructBegin(std::string& name) {
  name = "";
  return 0;
}
using ::apache::thrift::protocol::TProtocolException;
class TProtocolException : public apache::thrift::TException {
public:
/**
* Error codes for the various types of exceptions.
*/
enum TProtocolExceptionType {
UNKNOWN = 0,
INVALID_DATA = 1,
NEGATIVE_SIZE = 2,
SIZE_LIMIT = 3,
BAD_VERSION = 4,
NOT_IMPLEMENTED = 5,
DEPTH_LIMIT = 6
};
TProtocolException() : apache::thrift::TException(), type_(UNKNOWN) {}
TProtocolException(TProtocolExceptionType type) : apache::thrift::TException(), type_(type) {}
TProtocolException(const std::string& message)
: apache::thrift::TException(message), type_(UNKNOWN) {}
TProtocolException(TProtocolExceptionType type, const std::string& message)
: apache::thrift::TException(message), type_(type) {}
virtual ~TProtocolException() throw() {}
/**
* Returns an error code that provides information about the type of error
* that has occurred.
*
* @return Error code
*/
TProtocolExceptionType getType() { return type_; }
virtual const char* what() const throw() {
if (message_.empty()) {
switch (type_) {
case UNKNOWN:
return "TProtocolException: Unknown protocol exception";
case INVALID_DATA:
return "TProtocolException: Invalid data";
case NEGATIVE_SIZE:
return "TProtocolException: Negative size";
case SIZE_LIMIT:
return "TProtocolException: Exceeded size limit";
case BAD_VERSION:
return "TProtocolException: Invalid version";
case NOT_IMPLEMENTED:
return "TProtocolException: Not implemented";
default:
return "TProtocolException: (Invalid exception type)";
}
} else {
return message_.c_str();
}
}
protected:
/**
* Error code
*/
TProtocolExceptionType type_;
};
}
}
} // apache::thrift::protocol
while (true)
{
xfer += iprot->readFieldBegin(fname, ftype, fid);
1位元組type,2位元組序號,如果是T_STOP,fieldId = 0
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{讀work的4個欄位
case 1:
if (ftype == ::apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->num1);
this->__isset.num1 = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 2:
if (ftype == ::apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->num2);
this->__isset.num2 = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 3:
if (ftype == ::apache::thrift::protocol::T_I32) {
int32_t ecast0;
xfer += iprot->readI32(ecast0);
this->op = (Operation::type)ecast0;
this->__isset.op = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 4:
if (ftype == ::apache::thrift::protocol::T_STRING) {
xfer += iprot->readString(this->comment);4位元組長度,然後是body
this->__isset.comment = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();無操作
}
xfer += iprot->readStructEnd();無操作
return xfer;
}
/// Scope helper: bumps the protocol's output recursion depth when constructed
/// and lowers it again when destroyed.
struct TOutputRecursionTracker {
  TProtocol &prot_;

  TOutputRecursionTracker(TProtocol &prot)
    : prot_(prot)
  {
    prot_.incrementOutputRecursionDepth();
  }

  ~TOutputRecursionTracker()
  {
    prot_.decrementOutputRecursionDepth();
  }
};
// Serializes this Work object to `oprot` and returns the accumulated transfer
// count. num1, num2 and op are always written (field ids 1-3); comment
// (field id 4) is written only when its isset flag is raised.
uint32_t Work::write(::apache::thrift::protocol::TProtocol* oprot) const {
  uint32_t xfer = 0;
  apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
  xfer += oprot->writeStructBegin("Work");

  xfer += oprot->writeFieldBegin("num1", ::apache::thrift::protocol::T_I32, 1);
  xfer += oprot->writeI32(this->num1);
  xfer += oprot->writeFieldEnd();

  xfer += oprot->writeFieldBegin("num2", ::apache::thrift::protocol::T_I32, 2);
  xfer += oprot->writeI32(this->num2);
  xfer += oprot->writeFieldEnd();

  // The enum travels as a 32-bit integer.
  xfer += oprot->writeFieldBegin("op", ::apache::thrift::protocol::T_I32, 3);
  xfer += oprot->writeI32((int32_t)this->op);
  xfer += oprot->writeFieldEnd();

  if (this->__isset.comment) {
    xfer += oprot->writeFieldBegin("comment", ::apache::thrift::protocol::T_STRING, 4);
    xfer += oprot->writeString(this->comment);
    xfer += oprot->writeFieldEnd();
  }
  xfer += oprot->writeFieldStop();
  xfer += oprot->writeStructEnd();
  return xfer;
}
// Member-wise exchange of two Work objects, isset flags included.
void swap(Work &a, Work &b) {
  using ::std::swap;  // the standard-library swap does the per-member exchange
  swap(a.num1, b.num1);
  swap(a.num2, b.num2);
  swap(a.op, b.op);
  swap(a.comment, b.comment);
  swap(a.__isset, b.__isset);
}
// Copy constructor: copies every field and the isset flags. Members are
// initialized directly, in declaration order, rather than default-constructed
// and then assigned.
Work::Work(const Work& other1)
  : num1(other1.num1),
    num2(other1.num2),
    op(other1.op),
    comment(other1.comment),
    __isset(other1.__isset) {
}
// Copy assignment: copies every field and the isset flags, then returns *this.
Work& Work::operator=(const Work& other2) {
  num1 = other2.num1;
  num2 = other2.num2;
  op = other2.op;
  comment = other2.comment;
  __isset = other2.__isset;
  return *this;
}
// Writes "Work(num1=..., num2=..., op=..., comment=...)" to `out`; an unset
// comment is rendered as "<null>".
void Work::printTo(std::ostream& out) const {
  // Per the original notes, to_string lives in TToString.h and forwards to
  // boost::lexical_cast<std::string>(t) -- confirm against the Thrift version in use.
  using ::apache::thrift::to_string;
  out << "Work(";
  out << "num1=" << to_string(num1);
  out << ", " << "num2=" << to_string(num2);
  out << ", " << "op=" << to_string(op);
  out << ", " << "comment="; (__isset.comment ? (out << to_string(comment)) : (out << "<null>"));
  out << ")";
}
InvalidOperation方法的實現,依次是解構函式,set函式,read,write,swap,拷貝建構函式,賦值,printTo,
因為thrift檔案裡定義了InvalidOperation為exception關鍵字,所以InvalidOperation繼承自TException,
TException在thrift.h檔案裡定義,有一個虛擬函式virtual const char* what() const throw(),所以這裡也生成了
mutable std::string thriftTExceptionMessageHolder_;
const char* what() const throw();
實現
const char* InvalidOperation::what() const throw() {
try {
std::stringstream ss;
ss << "TException - service has thrown: " << *this;
this->thriftTExceptionMessageHolder_ = ss.str();
return this->thriftTExceptionMessageHolder_.c_str();
} catch (const std::exception&) {
return "TException - service has thrown: InvalidOperation";
}
}
_InvalidOperation__isset在read的時候會使用
tutorial_constants.h
#include "tutorial_types.h"
namespace tutorial {

/// Holder for the constants of tutorial.thrift; the constructor fills them in.
class tutorialConstants {
 public:
  tutorialConstants();

  int32_t INT32CONSTANT;
  std::map<std::string, std::string> MAPCONSTANT;
};

/// The single shared instance; only declared here.
extern const tutorialConstants g_tutorial_constants;

} // namespace
tutorial_constants.cpp
定義了g_tutorial_constants,預設建構函式
// Assigns the integer constant and inserts the two string pairs into the map.
tutorialConstants::tutorialConstants() {
  typedef std::map<std::string, std::string>::value_type Entry;

  INT32CONSTANT = 9853;

  MAPCONSTANT.insert(Entry("hello", "world"));
  MAPCONSTANT.insert(Entry("goodnight", "moon"));
}
shared_types.h/shared_types.cpp
定義SharedStruct
拷貝構造,賦值,構造,析構,欄位key,value,_SharedStruct__isset __isset
set方法,==, !=, <運算子,read,write函式,printTo虛擬函式
shared_constants.cpp
shared_constants.h
namespace shared {

/// Constants holder for the shared service.
class sharedConstants {
 public:
  // According to the original notes the constructor's implementation is empty.
  sharedConstants();
};

// Declaration only; according to the original notes the definition lives in the .cpp file.
extern const sharedConstants g_shared_constants;

} // namespace
SharedService_server.skeleton.cpp
#include "SharedService.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using boost::shared_ptr;
using namespace ::shared;
SharedServiceHandler虛繼承自SharedServiceIf,虛繼承主要解決多重繼承的二義性問題
/// Skeleton handler: the place where the service logic for SharedServiceIf is
/// meant to be filled in.
class SharedServiceHandler : virtual public SharedServiceIf {
 public:
  SharedServiceHandler()
  {
    // Your initialization goes here
  }

  /// Placeholder: prints the method name and leaves `_return` untouched.
  void getStruct(SharedStruct& _return, const int32_t key)
  {
    // Your implementation goes here
    printf("getStruct\n");
  }
};
int main(int argc, char **argv) {
int port = 9090;
建立handler,實際是建立了SharedServiceIf
shared_ptr<SharedServiceHandler> handler(new SharedServiceHandler());
用handler建立processor
shared_ptr<TProcessor> processor(new SharedServiceProcessor(handler));
建立socket,並用socket建立Transport
shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
建立transportFactory
shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
建立protocolFactory
shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
用processor,transport,transportFactory, protocolFactory建立server
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
呼叫server的serve方法
server.serve();
return 0;
}
SharedService.h
SharedServiceIf抽象類,提供
virtual void getStruct(SharedStruct& _return, const int32_t key) = 0;
方法
SharedServiceIfFactory抽象類,提供
virtual SharedServiceIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) = 0;
virtual void releaseHandler(SharedServiceIf* /* handler */) = 0;
SharedServiceIf也叫Handler
SharedServiceIfFactory的實現SharedServiceIfSingletonFactory
/// Handler factory that hands out one shared handler instance for every connection.
class SharedServiceIfSingletonFactory : virtual public SharedServiceIfFactory {
 public:
  SharedServiceIfSingletonFactory(const boost::shared_ptr<SharedServiceIf>& iface)
    : iface_(iface) {}

  virtual ~SharedServiceIfSingletonFactory() {}

  /// Returns the raw pointer of the shared handler; the connection info is ignored.
  virtual SharedServiceIf* getHandler(const ::apache::thrift::TConnectionInfo&)
  {
    return iface_.get();
  }

  /// Nothing to release: the handler stays owned by `iface_`.
  virtual void releaseHandler(SharedServiceIf* /* handler */) {}

 protected:
  boost::shared_ptr<SharedServiceIf> iface_;
};
SharedServiceIf的實現SharedServiceNull
/// Do-nothing implementation of SharedServiceIf.
class SharedServiceNull : virtual public SharedServiceIf {
 public:
  virtual ~SharedServiceNull() {}

  /// Ignores both arguments and leaves the result object untouched.
  void getStruct(SharedStruct&, const int32_t) {}
};
SharedService_getStruct_args類
拷貝構造,賦值,構造,析構,欄位key,_SharedService_getStruct_args__isset
set方法,==, !=, <運算子,read,write函式(和types.h裡的檔案比較沒有printTo虛擬函式)
SharedService_getStruct_pargs類,析構,欄位key,write函式
/// Write-only argument holder for getStruct: `key` is a pointer to the
/// argument value rather than a copy of it.
class SharedService_getStruct_pargs {
 public:
  virtual ~SharedService_getStruct_pargs() throw();

  const int32_t* key;

  uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
};
SharedService_getStruct_result類
SharedService_getStruct_presult類
因為返回值是SharedStruct,所以多生成了_SharedService_getStruct_presult__isset,結構體區別對待
析構,欄位success,欄位是否被設定,read函式
uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
SharedServiceClient是SharedServiceIf的另一個實現
/// Client-side implementation of SharedServiceIf (abridged in these notes).
class SharedServiceClient : virtual public SharedServiceIf {
 public:
  // (elided) two constructors: one accepts a single TProtocol, the other two TProtocols
 private:
  // (elided) two setter functions
 public:
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol() {
    return piprot_;
  }
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() {
    return poprot_;
  }
  void getStruct(SharedStruct& _return, const int32_t key);
  void send_getStruct(const int32_t key);
  void recv_getStruct(SharedStruct& _return);
 protected:
  // Four fields: two shared_ptr-wrapped protocols and two raw protocol pointers.
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;
  ::apache::thrift::protocol::TProtocol* iprot_;
  ::apache::thrift::protocol::TProtocol* oprot_;
};
SharedServiceProcessor繼承TDispatchProcessor,不是虛繼承
class SharedServiceProcessor : public ::apache::thrift::TDispatchProcessor {
protected: 受保護的成員iface_指向SharedServiceIf
boost::shared_ptr<SharedServiceIf> iface_;
protect方法dispatchCall,接收2個protocol,fname函式名,32位seqid,void* callContext
virtual bool dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext);
private:
定義SharedServiceProcessor類的函式指標ProcessFunction,接收i32,2個proto,void *, 返回void
typedef void (SharedServiceProcessor::*ProcessFunction)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*, void*);
定義ProcessMap,key是string,value是ProcessFunction
typedef std::map<std::string, ProcessFunction> ProcessMap;
processMap_成員
ProcessMap processMap_;
真正處理的方法,接收的型別和ProcessFunction一致
void process_getStruct(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
public:公有的建構函式,解構函式
SharedServiceProcessor(boost::shared_ptr<SharedServiceIf> iface) :
iface_(iface) {
processMap_["getStruct"] = &SharedServiceProcessor::process_getStruct;
}
virtual ~SharedServiceProcessor() {}
};
SharedServiceProcessorFactory繼承自TProcessorFactory
class SharedServiceProcessorFactory : public ::apache::thrift::TProcessorFactory {
public:用SharedServiceIfFactory來構造SharedServiceProcessorFactory
SharedServiceProcessorFactory(const ::boost::shared_ptr< SharedServiceIfFactory >& handlerFactory) :
handlerFactory_(handlerFactory) {}
由TConnectionInfo得到TProcessor,TDispatchProcessor,SharedServiceProcessor
::boost::shared_ptr< ::apache::thrift::TProcessor > getProcessor(const ::apache::thrift::TConnectionInfo& connInfo);
protected: processorFactory需要SharedServiceIfFactory成員,processor需要SharedServiceIf成員
::boost::shared_ptr< SharedServiceIfFactory > handlerFactory_;
};
SharedServiceMultiface是SharedServiceIf的第三個實現
class SharedServiceMultiface : virtual public SharedServiceIf {
public:
SharedServiceMultiface(std::vector<boost::shared_ptr<SharedServiceIf> >& ifaces) : ifaces_(ifaces) {
}
virtual ~SharedServiceMultiface() {}
protected: 含有多個SharedServiceIf
std::vector<boost::shared_ptr<SharedServiceIf> > ifaces_;
SharedServiceMultiface() {} 多了一個add方法
void add(boost::shared_ptr<SharedServiceIf> iface) {
ifaces_.push_back(iface);
}
public:從ifaces_陣列中得到一個SharedServiceIf,呼叫它的getStruct
void getStruct(SharedStruct& _return, const int32_t key) {
size_t sz = ifaces_.size();
size_t i = 0;
for (; i < (sz - 1); ++i) {
ifaces_[i]->getStruct(_return, key);
}
ifaces_[i]->getStruct(_return, key);
return;
}
};
SharedServiceIf的第四個實現,執行緒安全的client,多個執行緒共享一個連線
// The 'concurrent' client is a thread safe client that correctly handles
// out of order responses. It is slower than the regular client, so should
// only be used when you need to share a connection among multiple threads
/// Thread-safe client (abridged in these notes); see the comment block above
/// the class for when to prefer it over the regular client.
class SharedServiceConcurrentClient : virtual public SharedServiceIf {
 public:
  // (elided) constructors that set up the TProtocol(s)
 private:
  // (elided) two setter functions
 public:
  // Bodies elided in these notes; shown as declarations.
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol();
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol();
  void getStruct(SharedStruct& _return, const int32_t key);
  // Unlike the regular client, send returns an int32_t and recv takes a seqid.
  int32_t send_getStruct(const int32_t key);
  void recv_getStruct(SharedStruct& _return, const int32_t seqid);
 protected:
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;
  ::apache::thrift::protocol::TProtocol* iprot_;
  ::apache::thrift::protocol::TProtocol* oprot_;
  // The one extra member compared with SharedServiceClient.
  ::apache::thrift::async::TConcurrentClientSyncInfo sync_;
};
SharedService.cpp
SharedService_getStruct_args類的解構函式,read(讀到this->key,通過引用),write函式,讀寫欄位名key(實際沒有),int32,序號1
SharedService_getStruct_pargs類的解構函式,write函式,寫欄位名key,int32
SharedService_getStruct_result類的解構函式,read(this->success.read,呼叫SharedStruct的read,會填充this->success),
write函式,寫欄位名success,型別,序號,這裡的序號是0(thrift都是從序號1開始),呼叫SharedStruct的write
SharedService_getStruct_presult類的解構函式,read函式,同上面的read
SharedServiceClient::getStruct(SharedStruct& _return, const int32_t key)
send_getStruct(key);
recv_getStruct(_return);
// Kind of message carried in a message header.
typedef enum {
  T_CALL = 1, T_REPLY = 2, T_EXCEPTION = 3, T_ONEWAY = 4
} ThriftMessageType;
SharedServiceClient::send_getStruct(const int32_t key)
{
int32_t cseqid = 0;
oprot_->writeMessageBegin("getStruct", ::apache::thrift::protocol::T_CALL, cseqid);
32位,T_CALL | VERSION_1 = 0x80010000
String: getStruct
32位,seqid
SharedService_getStruct_pargs args;
args.key = &key;
args.write(oprot_); 呼叫上面的SharedService_getStruct_pargs的write
oprot_->writeMessageEnd();無操作
oprot_->getTransport()->writeEnd();
oprot_->getTransport()->flush();
}
template <class Transport_, class ByteOrder_ = TNetworkBigEndian>
class TBinaryProtocolT : public TVirtualProtocol<TBinaryProtocolT<Transport_, ByteOrder_> > {
// Byte-order policy that converts between host order and network order via
// the hton*/ntoh* family. The listing lacked the ';' that ends the struct.
struct TNetworkBigEndian
{
  static uint16_t toWire16(uint16_t x) {return htons(x);}
  static uint32_t toWire32(uint32_t x) {return htonl(x);}
  static uint64_t toWire64(uint64_t x) {return THRIFT_htonll(x);}
  static uint16_t fromWire16(uint16_t x) {return ntohs(x);}
  static uint32_t fromWire32(uint32_t x) {return ntohl(x);}
  static uint64_t fromWire64(uint64_t x) {return THRIFT_ntohll(x);}
};
TBinaryProtocolT::writeI32
this->trans_->write((uint8_t*)&net, 4);
TTransport::write---->write_virt(buf, len)---->TVirtualTransport::write_virt
---->static_cast<Transport_*>(this)->write(buf, len)
繼承體系
TBufferedTransport---->TVirtualTransport<TBufferedTransport, TBufferBase>---->TBufferBase
---->TVirtualTransport<TBufferBase, TTransportDefaults>---->TTransportDefaults---->TTransport
TBufferedTransport自己沒有定義write方法,實際呼叫的是繼承鏈上離它最近的實現,即TBufferBase的write
先往緩衝區寫,寫滿了,呼叫writeSlow,TBufferedTransport實現了writeSlow,呼叫transport_的write方法
TVirtualTransport<class Transport_, class Super_ = TTransportDefaults>的read_virt會呼叫Transport_的
read方法
TSocket---->TSocket : public TVirtualTransport<TSocket>---->TTransportDefaults---->TTransport
recv_getStruct得到返回結果
bool SharedServiceProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot,
::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext)
從processMap_中取到fname對應的處理函式,呼叫,如果沒找到,寫給客戶端TApplicationException
(this->*(pfn->second))(seqid, iprot, oprot, callContext);
void SharedServiceProcessor::process_getStruct(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot,
::apache::thrift::protocol::TProtocol* oprot, void* callContext)
SharedServiceProcessor---->TDispatchProcessor---->TProcessor(boost::shared_ptr<TProcessorEventHandler> eventHandler_;)
TProcessorEventHandler
Calculator_server.skeleton.cpp
class CalculatorHandler : virtual public CalculatorIf
ping,add,calculate,zip方法的簡單實現
main方法
Calculator.h
Calculator.cpp
CppClient.cpp
boost::shared_ptr<TTransport> socket(new TSocket("localhost", 9090));
boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
CalculatorClient client(protocol);
transport->open()---->transport_->open()---->開啟socket
CppServer.cpp
class CalculatorHandler : public CalculatorIf
protected:
map<int32_t, SharedStruct> log;
實現了ping,add,calculate,getStruct,zip方法
calculate會往log裡放entry,entry是在棧上分配的,往log裡放時map應該拷貝物件了
class CalculatorCloneFactory : virtual public CalculatorIfFactory {
// Logs the incoming connection and creates a new CalculatorHandler for it.
virtual CalculatorIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo)
{
  // Recover the TSocket from connInfo.transport to report the peer details.
  boost::shared_ptr<TSocket> sock = boost::dynamic_pointer_cast<TSocket>(connInfo.transport);
  cout << "Incoming connection\n";
  // dynamic_pointer_cast yields an empty pointer when the transport is not a
  // TSocket; skip the socket details then instead of dereferencing null.
  if (sock) {
    cout << "\tSocketInfo: " << sock->getSocketInfo() << "\n";
    cout << "\tPeerHost: " << sock->getPeerHost() << "\n";
    cout << "\tPeerAddress: " << sock->getPeerAddress() << "\n";
    cout << "\tPeerPort: " << sock->getPeerPort() << "\n";
  }
  return new CalculatorHandler;  // creates a CalculatorHandler object
}
// Destroys the handler that is passed in.
virtual void releaseHandler( ::shared::SharedServiceIf* handler) {
  delete handler;
}
boost::make_shared是建立一個物件,返回物件指標shared_ptr
TThreadedServer server(
boost::make_shared<CalculatorProcessorFactory>(boost::make_shared<CalculatorCloneFactory>()),
CalculatorCloneFactory繼承自CalculatorIfFactory,CalculatorProcessorFactory有
::boost::shared_ptr< CalculatorIfFactory > handlerFactory_;這個欄位
所以是用CalculatorCloneFactory物件初始化ProcessorFactory
boost::make_shared<TServerSocket>(9090), //port
TServerSocket---->TServerTransport
socket_func_t = ::std::tr1::function<void(THRIFT_SOCKET fd)>, THRIFT_SOCKET = int
const static int DEFAULT_BACKLOG = 1024;
boost::make_shared<TBufferedTransportFactory>(),
TBufferedTransportFactory---->TTransportFactory
getTransport(trans) ----> new TBufferedTransport(trans)
boost::make_shared<TBinaryProtocolFactory>());
typedef TBinaryProtocolFactoryT<TTransport> TBinaryProtocolFactory;
TBinaryProtocolFactoryT---->TProtocolFactory
Transport_ = TTransport, ByteOrder_ = TNetworkBigEndian
getProtocol(boost::shared_ptr<TTransport> trans)
new TBinaryProtocolT<Transport_, ByteOrder_>(
specific_trans,
string_limit_,
container_limit_,
strict_read_,
strict_write_);
boost::shared_ptr 是一個指標
還有一個預設的引數
const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
= boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
new apache::thrift::concurrency::PlatformThreadFactory)
用PlatformThreadFactory物件的指標給threadFactory賦值
TThreadedServer---->TServerFramework---->TServer---->Runnable
boost::shared_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory_;
apache::thrift::concurrency::Monitor clientsMonitor_;
繼承自TServer的欄位
boost::shared_ptr<TProcessorFactory> processorFactory_;
boost::shared_ptr<TServerTransport> serverTransport_;
boost::shared_ptr<TTransportFactory> inputTransportFactory_;
boost::shared_ptr<TTransportFactory> outputTransportFactory_;
boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
boost::shared_ptr<TServerEventHandler> eventHandler_;
TServerFramework(processorFactory, serverTransport, transportFactory, protocolFactory)
TServer(processorFactory, serverTransport, transportFactory, protocolFactory), 欄位賦值
clients_(0),
hwm_(0), 併發客戶端數的歷史最高值(high-water mark)
limit_(INT64_MAX)
threadFactory_(threadFactory)
server.serve();
void TThreadedServer::serve() {
TServerFramework::serve();
// Drain all clients - no more will arrive
try {
Synchronized s(clientsMonitor_); 用monitor物件,構造Synchronized,內含guard, 構造時加鎖,析構時解鎖
while (getConcurrentClientCount() > 0) {
getConcurrentClientCount會Synchronized sync(mon_),mon_是TServerFramework私有的
clientsMonitor_.wait(); 等待clientsMonitor_的notify
}
} catch (TException& tx) {
string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
GlobalOutput(errStr.c_str());
}
}
void TServerFramework::serve() {
shared_ptr<TTransport> client;
shared_ptr<TTransport> inputTransport;
shared_ptr<TTransport> outputTransport;
shared_ptr<TProtocol> inputProtocol;
shared_ptr<TProtocol> outputProtocol;
// Start the server listening
serverTransport_->listen(); 監聽埠
// Run the preServe event to indicate server is now listening
// and that it is safe to connect.
if (eventHandler_) { 預設沒有
eventHandler_->preServe();
}
// Fetch client from server
for (;;) {
try {
// Dereference any resources from any previous client creation
// such that a blocking accept does not hold them indefinitely.
outputProtocol.reset();
inputProtocol.reset();
outputTransport.reset();
inputTransport.reset();
client.reset();清空指標值
// If we have reached the limit on the number of concurrent
// clients allowed, wait for one or more clients to drain before
// accepting another.
{
Synchronized sync(mon_);
while (clients_ >= limit_) {
mon_.wait(); 等待mon_的notify
}
}
TServerTransport的accept,呼叫的是TServerSocket的acceptImpl
client = serverTransport_->accept();
TSocket::TSocket(THRIFT_SOCKET socket, boost::shared_ptr<THRIFT_SOCKET> interruptListener)
listen函式建立的2對socket
interruptSockWriter_ = sv[1];
interruptSockReader_ = sv[0];
childInterruptSockWriter_ = sv[1];
pChildInterruptSockReader_
= boost::shared_ptr<THRIFT_SOCKET>(new THRIFT_SOCKET(sv[0]), destroyer_of_fine_sockets);
用於對外提供介面中斷連線,TServerFramework::stop()
serverTransport_->interrupt();serverTransport_->interruptChildren();
TServerSocket::interrupt() TServerSocket::interruptChildren()
TServerSocket::notify(THRIFT_SOCKET notifySocket)
往通訊管道裡寫1位元組的0,對應的reader讀到了,就拋異常,外面捕獲(客戶端連線),或者關閉server
建立shared_ptr<TSocket> client = createSocket(clientSocket); clientSocket是一個fd
設定fd的一些引數,返回
根據client建立transport(TBufferedTransport),
根據transport建立protocol(TBinaryProtocolT)
inputTransport = inputTransportFactory_->getTransport(client);
outputTransport = outputTransportFactory_->getTransport(client);
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
newlyConnectedClient(shared_ptr<TConnectedClient>(
new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
inputProtocol,
outputProtocol,
eventHandler_,
client),
bind(&TServerFramework::disposeConnectedClient, this, _1)));
建立TConnectedClient指標,包含2個引數,建立的物件,和刪除物件的方法
意思是用函式TServerFramework::disposeConnectedClient(this)(TConnectedClient *)
this->disposeConnectedClient(TConnectedClient* pClient)
來刪除TConnectedClient物件
--clients_, 如果允許新來客戶端連線,呼叫mon_的notify
onClientDisconnected(pClient) 呼叫TThreadedServer::onClientDisconnected,呼叫
clientsMonitor_.notify()(TThreadedServer的鎖)
然後將建立的指標傳給newlyConnectedClient方法
new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
inputProtocol,
outputProtocol,
eventHandler_,
client)
getProcessor方法是TServer的成員方法,TServerFramework繼承了
// Bundles the two protocols and the transport into a TConnectionInfo and asks
// the processor factory for the processor serving that connection.
boost::shared_ptr<TProcessor> getProcessor(boost::shared_ptr<TProtocol> inputProtocol,
                                           boost::shared_ptr<TProtocol> outputProtocol,
                                           boost::shared_ptr<TTransport> transport) {
  TConnectionInfo connInfo;  // three fields: input, output, transport
  connInfo.input = inputProtocol;
  connInfo.output = outputProtocol;
  // Per the original notes this is the client created earlier; it can be cast
  // back to TSocket to obtain connection details.
  connInfo.transport = transport;
  return processorFactory_->getProcessor(connInfo);
}
processorFactory_是CalculatorProcessorFactory,生成的程式碼
::boost::shared_ptr< ::apache::thrift::TProcessor > CalculatorProcessorFactory::getProcessor
(const ::apache::thrift::TConnectionInfo& connInfo) {
handlerFactory_是CalculatorCloneFactory,建立ReleaseHandler物件cleanup
ReleaseHandler過載了()運算子,
void operator()(typename HandlerFactory_::Handler* handler) {
if (handler) {
handlerFactory_->releaseHandler(handler);
}
}
virtual void releaseHandler( ::shared::SharedServiceIf* handler) {
delete handler;
}
CalculatorCloneFactory繼承自CalculatorIfFactory,CalculatorIfFactory中定義了
typedef CalculatorIf Handler,所以Handler是CalculatorIf,實際是CalculatorHandler
CppServer實現的
::apache::thrift::ReleaseHandler< CalculatorIfFactory > cleanup(handlerFactory_);
CalculatorCloneFactory的getHandler,new了一個CalculatorHandler, 同時指定刪除物件時的回撥
它提供了一個刪除Handler物件的方法,它可以形如cleanup(handler)的形式呼叫,它不是函式指標,所以
必須過載運算子來實現;
::boost::shared_ptr< CalculatorIf > handler(handlerFactory_->getHandler(connInfo), cleanup);
用建立的CalculatorHandler物件,建立CalculatorProcessor物件, 賦給iface_成員
::boost::shared_ptr< ::apache::thrift::TProcessor > processor(new CalculatorProcessor(handler));
return processor;
}
CalculatorProcessor---->SharedServiceProcessor---->TDispatchProcessor---->TProcessor(), 有eventHandler_欄位
processor負責讀引數,呼叫CalculatorHandler的方法,構造返回值給客戶端
建立完TConnectedClient,呼叫
void TServerFramework::newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient) {
onClientConnected(pClient);
TThreadedServer::onClientConnected
threadFactory_->newThread(pClient)->start();
pClient作為一個runnable,建立一個執行緒執行其run方法
// Per-connection loop: keeps handing requests to the processor until the
// processor reports it is done or the transport fails, then closes everything.
void TConnectedClient::run() {
  if (eventHandler_) {
    opaqueContext_ = eventHandler_->createContext(inputProtocol_, outputProtocol_);
  }

  for (bool done = false; !done;) {
    if (eventHandler_) {
      eventHandler_->processContext(opaqueContext_, client_);
    }

    try {
      // Per the original notes, process() ends up in the matching
      // CalculatorProcessor::process_XXX function.
      if (!processor_->process(inputProtocol_, outputProtocol_, opaqueContext_)) {
        break;
      }
    } catch (const TTransportException& ttx) {
      switch (ttx.getType()) {
      case TTransportException::TIMED_OUT:
        // Receive timeout - continue processing.
        continue;

      case TTransportException::END_OF_FILE:
      case TTransportException::INTERRUPTED:
        // Client disconnected or was interrupted. No logging needed. Done.
        done = true;
        break;

      default: {
        // All other transport exceptions are logged.
        // State of connection is unknown. Done.
        string errStr = string("TConnectedClient died: ") + ttx.what();
        GlobalOutput(errStr.c_str());
        done = true;
        break;
      }
      }
    } catch (const TException& tex) {
      string errStr = string("TConnectedClient processing exception: ") + tex.what();
      GlobalOutput(errStr.c_str());
      // Continue processing
    }
  }

  cleanup();
  // Per the original notes, getTransport() returns the protocol's transport
  // (TProtocol::ptrans), and all three close() calls below end up closing
  // the same client.
  inputProtocol_->getTransport()->close();
  outputProtocol_->getTransport()->close();
  client_->close();
}
// Count a concurrent client added. 計數增加,需同步執行
Synchronized sync(mon_);
++clients_;
hwm_ = std::max(hwm_, clients_);
}
} catch (TTransportException& ttx) {
releaseOneDescriptor("inputTransport", inputTransport);
releaseOneDescriptor("outputTransport", outputTransport);
releaseOneDescriptor("client", client);
if (ttx.getType() == TTransportException::TIMED_OUT) {
// Accept timeout - continue processing.
continue;
} else if (ttx.getType() == TTransportException::END_OF_FILE
|| ttx.getType() == TTransportException::INTERRUPTED) {
// Server was interrupted. This only happens when stopping.
break;
} else {
// All other transport exceptions are logged.
// State of connection is unknown. Done.
string errStr = string("TServerTransport died: ") + ttx.what();
GlobalOutput(errStr.c_str());
break;
}
}
}
releaseOneDescriptor("serverTransport", serverTransport_);
serverTransport_->close()實際是呼叫TServerSocket::close()
關閉serverSocket_,interruptSockWriter_/interruptSockReader_, childInterruptSockWriter_
pChildInterruptSockReader_.reset() 他有自己的銷燬函式,destroyer_of_fine_sockets
}
class Synchronized {
public: 2個建構函式,傳入Monitor,用monitor.mutex()初始化化成員g
Synchronized(const Monitor* monitor) : g(monitor->mutex()) {}
Synchronized(const Monitor& monitor) : g(monitor.mutex()) {}
private:
Guard g; 成員
};
class Guard : boost::noncopyable {
public:
Guard(const Mutex& value, int64_t timeout = 0) : mutex_(&value) {
if (timeout == 0) {
value.lock();
} else if (timeout < 0) {
if (!value.trylock()) {
mutex_ = NULL;
}
} else {
if (!value.timedlock(timeout)) {
mutex_ = NULL;
}
}
} 鎖
~Guard() {
if (mutex_) {
mutex_->unlock();
}
} 解鎖
operator bool() const { return (mutex_ != NULL); }
private:
const Mutex* mutex_;
};
class Monitor : boost::noncopyable {
public:
/** Creates a new mutex, and takes ownership of it. */
Monitor();
/** Uses the provided mutex without taking ownership. */
explicit Monitor(Mutex* mutex);
/** Uses the mutex inside the provided Monitor without taking ownership. */
explicit Monitor(Monitor* monitor);
/** Deallocates the mutex only if we own it. */
virtual ~Monitor();
Mutex& mutex() const;
virtual void lock() const;
virtual void unlock() const;
/**
* Waits a maximum of the specified timeout in milliseconds for the condition
* to occur, or waits forever if timeout_ms == 0.
*
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
int waitForTimeRelative(int64_t timeout_ms) const;
/**
* Waits until the absolute time specified using struct THRIFT_TIMESPEC.
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
int waitForTime(const THRIFT_TIMESPEC* abstime) const;
/**
* Waits until the absolute time specified using struct timeval.
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
int waitForTime(const struct timeval* abstime) const;
/**
* Waits forever until the condition occurs.
* Returns 0 if condition occurs, or an error code otherwise.
*/
int waitForever() const;
/**
* Exception-throwing version of waitForTimeRelative(), called simply
* wait(int64) for historical reasons. Timeout is in milliseconds.
*
* If the condition occurs, this function returns cleanly; on timeout or
* error an exception is thrown.
*/
void wait(int64_t timeout_ms = 0LL) const;
/** Wakes up one thread waiting on this monitor. */
virtual void notify() const;
/** Wakes up all waiting threads on this monitor. */
virtual void notifyAll() const;
private:
class Impl;
Impl* impl_;
};
/**
 * pthread-based implementation of Monitor: a Mutex (owned or borrowed)
 * paired with a pthread condition variable.
 */
class Monitor::Impl {
public:
  /** Creates a new Mutex and keeps ownership of it. */
  Impl() : ownedMutex_(new Mutex()), mutex_(NULL), condInitialized_(false) {
    init(ownedMutex_.get());
  }

  /** Uses the provided mutex; ownedMutex_ stays empty. */
  Impl(Mutex* mutex) : mutex_(NULL), condInitialized_(false) { init(mutex); }

  /** Builds a monitor on top of another monitor's mutex. */
  Impl(Monitor* monitor) : mutex_(NULL), condInitialized_(false) { init(&(monitor->mutex())); }

  ~Impl() { cleanup(); }

  Mutex& mutex() { return *mutex_; }
  void lock() { mutex().lock(); }     // lock the underlying mutex
  void unlock() { mutex().unlock(); } // unlock the underlying mutex

  /**
   * Exception-throwing version of waitForTimeRelative(), called simply
   * wait(int64) for historical reasons.  Timeout is in milliseconds.
   *
   * If the condition occurs, this function returns cleanly; on timeout or
   * error an exception is thrown.
   */
  void wait(int64_t timeout_ms) const {
    int result = waitForTimeRelative(timeout_ms);
    if (result == THRIFT_ETIMEDOUT) {
      // pthread_cond_timedwait has been observed to return early on
      // various platforms, so comment out this assert.
      // assert(Util::currentTime() >= (now + timeout));
      throw TimedOutException();
    } else if (result != 0) {
      throw TException("pthread_cond_wait() or pthread_cond_timedwait() failed");
    }
  }

  /**
   * Waits for a relative interval: until the specified timeout in
   * milliseconds for the condition to occur, or forever if timeout_ms == 0.
   *
   * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
   */
  int waitForTimeRelative(int64_t timeout_ms) const {
    if (timeout_ms == 0LL) {
      return waitForever();
    }

    struct THRIFT_TIMESPEC abstime;
    Util::toTimespec(abstime, Util::currentTime() + timeout_ms);
    return waitForTime(&abstime);
  }

  /**
   * Waits until the absolute time specified using struct THRIFT_TIMESPEC.
   * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
   */
  int waitForTime(const THRIFT_TIMESPEC* abstime) const {
    assert(mutex_);
    pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
    assert(mutexImpl);

    // XXX Need to assert that caller owns mutex
    // The wait happens on the underlying pthread mutex (mutexImpl).
    return pthread_cond_timedwait(&pthread_cond_, mutexImpl, abstime);
  }

  /** Same as above, with the deadline given as a struct timeval. */
  int waitForTime(const struct timeval* abstime) const {
    struct THRIFT_TIMESPEC temp;
    temp.tv_sec = abstime->tv_sec;
    temp.tv_nsec = abstime->tv_usec * 1000; // microseconds -> nanoseconds
    return waitForTime(&temp);
  }

  /**
   * Waits forever until the condition occurs.
   * Returns 0 if condition occurs, or an error code otherwise.
   */
  int waitForever() const {
    assert(mutex_);
    pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
    assert(mutexImpl);
    return pthread_cond_wait(&pthread_cond_, mutexImpl);
  }

  /** Signals the condition, waking one waiting thread. */
  void notify() {
    // XXX Need to assert that caller owns mutex
    int iret = pthread_cond_signal(&pthread_cond_);
    THRIFT_UNUSED_VARIABLE(iret);
    assert(iret == 0);
  }

  /** Broadcasts the condition, waking all waiting threads. */
  void notifyAll() {
    // XXX Need to assert that caller owns mutex
    int iret = pthread_cond_broadcast(&pthread_cond_);
    THRIFT_UNUSED_VARIABLE(iret);
    assert(iret == 0);
  }

private:
  /**
   * Stores the mutex pointer (it is not checked here, so it may be NULL) and
   * initializes the condition variable; throws SystemResourceException when
   * pthread_cond_init fails.
   */
  void init(Mutex* mutex) {
    mutex_ = mutex;

    if (pthread_cond_init(&pthread_cond_, NULL) == 0) {
      condInitialized_ = true;
    }

    if (!condInitialized_) {
      cleanup();
      throw SystemResourceException();
    }
  }

  /** Destroys the condition variable if it was successfully initialized. */
  void cleanup() {
    if (condInitialized_) {
      condInitialized_ = false;
      int iret = pthread_cond_destroy(&pthread_cond_);
      THRIFT_UNUSED_VARIABLE(iret);
      assert(iret == 0);
    }
  }

  scoped_ptr<Mutex> ownedMutex_; // set only by the default constructor
  Mutex* mutex_;                 // the mutex actually used (owned or borrowed)

  mutable pthread_cond_t pthread_cond_;
  mutable bool condInitialized_;
};
/**
 * Mutex facade. The state lives in a shared impl object; Initializer is a
 * function pointer used to set up the underlying mutex in different flavours
 * (default / adaptive / recursive).
 */
class Mutex {
public:
  typedef void (*Initializer)(void*);

  Mutex(Initializer init = DEFAULT_INITIALIZER);
  virtual ~Mutex() {}
  virtual void lock() const;
  virtual bool trylock() const;
  virtual bool timedlock(int64_t milliseconds) const;
  virtual void unlock() const;

  void* getUnderlyingImpl() const;

  static void DEFAULT_INITIALIZER(void*);
  static void ADAPTIVE_INITIALIZER(void*);
  static void RECURSIVE_INITIALIZER(void*);
  // Article note: the member functions above all forward to the
  // implementation class (their definitions are not shown here).

private:
  class impl;
  boost::shared_ptr<impl> impl_;
};
/**
 * pthread-based implementation behind Mutex.
 */
class Mutex::impl {
public:
  /**
   * @param init function pointer that initializes the pthread_mutex_t it is
   *             handed (see Mutex::Initializer).
   */
  impl(Initializer init) : initialized_(false) {
#ifndef THRIFT_NO_CONTENTION_PROFILING
    profileTime_ = 0;
#endif
    init(&pthread_mutex_);
    initialized_ = true;
  }

  ~impl() {
    if (initialized_) {
      initialized_ = false;
      int ret = pthread_mutex_destroy(&pthread_mutex_);
      THRIFT_UNUSED_VARIABLE(ret);
      assert(ret == 0);
    }
  }

  /** Blocks until the mutex is acquired. */
  void lock() const {
    PROFILE_MUTEX_START_LOCK();
    pthread_mutex_lock(&pthread_mutex_);
    PROFILE_MUTEX_LOCKED();
  }

  /** Tries to acquire the mutex without blocking; true on success. */
  bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); }

  /**
   * Tries to acquire the mutex for at most the given number of milliseconds.
   * @return true if the mutex was acquired, false on timeout.
   */
  bool timedlock(int64_t milliseconds) const {
#if defined(_POSIX_TIMEOUTS) && _POSIX_TIMEOUTS >= 200112L
    PROFILE_MUTEX_START_LOCK();

    struct THRIFT_TIMESPEC ts;
    Util::toTimespec(ts, milliseconds + Util::currentTime());
    int ret = pthread_mutex_timedlock(&pthread_mutex_, &ts);
    if (ret == 0) {
      PROFILE_MUTEX_LOCKED();
      return true;
    }

    PROFILE_MUTEX_NOT_LOCKED();
    return false;
#else
    /* Otherwise follow solution used by Mono for Android */
    struct THRIFT_TIMESPEC sleepytime, now, to;

    /* This is just to avoid a completely busy wait */
    sleepytime.tv_sec = 0;
    sleepytime.tv_nsec = 10000000L; /* 10ms */

    Util::toTimespec(to, milliseconds + Util::currentTime());

    while (!trylock()) {
      Util::toTimespec(now, Util::currentTime());
      // Compare (sec, nsec) lexicographically. Testing both fields with >=
      // independently would miss an expired deadline whenever the seconds
      // have advanced but now.tv_nsec is still below to.tv_nsec.
      const bool expired = (now.tv_sec > to.tv_sec)
                           || (now.tv_sec == to.tv_sec && now.tv_nsec >= to.tv_nsec);
      if (expired) {
        return false;
      }
      nanosleep(&sleepytime, NULL);
    }

    return true;
#endif
  }

  /** Releases the mutex. */
  void unlock() const {
    PROFILE_MUTEX_START_UNLOCK();
    pthread_mutex_unlock(&pthread_mutex_);
    PROFILE_MUTEX_UNLOCKED();
  }

  /** Exposes the underlying pthread_mutex_t as an opaque pointer. */
  void* getUnderlyingImpl() const { return (void*)&pthread_mutex_; }

private:
  mutable pthread_mutex_t pthread_mutex_;
  mutable bool initialized_;
#ifndef THRIFT_NO_CONTENTION_PROFILING
  mutable int64_t profileTime_;
#endif
};
/**
 * Unit of work executed by a Thread.
 *
 * Holds a weak reference to the hosting Thread, which is supplied from
 * outside, and offers two operations on it: setting it, and locking it to
 * obtain a shared_ptr.
 */
class Runnable {
public:
  virtual ~Runnable() {}
  virtual void run() = 0;

  /**
   * Gets the thread object that is hosting this runnable object  - can return
   * an empty boost::shared pointer if no references remain on that thread object
   * (the thread is only weakly referenced here).
   */
  virtual boost::shared_ptr<Thread> thread() { return thread_.lock(); }

  /**
   * Sets the thread that is executing this object.  This is only meant for
   * use by concrete implementations of Thread.
   */
  virtual void thread(boost::shared_ptr<Thread> value) { thread_ = value; }

private:
  boost::weak_ptr<Thread> thread_;
};
class Thread {
public:
#if USE_BOOST_THREAD
typedef boost::thread::id id_t;
static inline bool is_current(id_t t) { return t == boost::this_thread::get_id(); }
static inline id_t get_current() { return boost::this_thread::get_id(); }
#elif USE_STD_THREAD
typedef std::thread::id id_t;
static inline bool is_current(id_t t) { return t == std::this_thread::get_id(); }
static inline id_t get_current() { return std::this_thread::get_id(); }
#else
typedef pthread_t id_t; 定義執行緒標識id_t
物件公用,判斷傳入的執行緒id是否是當前執行緒,當前執行緒的id沒有作為成員存下來,而是pthread_self獲取
static inline bool is_current(id_t t) { return pthread_equal(pthread_self(), t); }
獲取當前執行緒id
static inline id_t get_current() { return pthread_self(); }
#endif
virtual ~Thread(){};
/**
* Starts the thread. Does platform specific thread creation and
* configuration then invokes the run method of the Runnable object bound
* to this thread.
*/
virtual void start() = 0; 啟動執行緒
/**
* Join this thread. Current thread blocks until this target thread
* completes.
*/
virtual void join() = 0; 阻塞執行緒直到目標執行緒結束
/**
* Gets the thread's platform-specific ID
*/
virtual id_t getId() = 0; 獲取特定平臺的執行緒id
/**
* Gets the runnable object this thread is hosting
*/返回runnable
virtual boost::shared_ptr<Runnable> runnable() const { return _runnable; }
protected:設定runnable,runnable中有thread,thread中有runnable
virtual void runnable(boost::shared_ptr<Runnable> value) { _runnable = value; }
private:
boost::shared_ptr<Runnable> _runnable;
};
3個factory都繼承自ThreadFactory
virtual ~ThreadFactory() {}
virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;
都實現了newThread函式
/** Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread
*/
static const Thread::id_t unknown_thread_id;
virtual Thread::id_t getCurrentThreadId() const = 0;
#if USE_BOOST_THREAD
typedef BoostThreadFactory PlatformThreadFactory;
#elif USE_STD_THREAD
typedef StdThreadFactory PlatformThreadFactory;
#else
typedef PosixThreadFactory PlatformThreadFactory;
#endif
class PosixThreadFactory :
Calculator.cpp
Calculator.h
Calculator_server.skeleton.cpp
shared_constants.cpp
shared_constants.h
SharedService.cpp
SharedService.h
SharedService_server.skeleton.cpp
shared_types.cpp
shared_types.h
tutorial_constants.cpp
tutorial_constants.h
tutorial_types.cpp
tutorial_types.h
tutorial_types.h(tutorial.thrift)
namespace tutorial {...}
先列舉生成
/**
 * Wrapper struct for the Operation enumeration; the enumerators are reached
 * as Operation::ADD etc. and the enum type itself as Operation::type.
 */
struct Operation {
  enum type { ADD = 1, SUBTRACT = 2, MULTIPLY = 3, DIVIDE = 4 };
};
生成全域性變數
extern const std::map<int, const char*> _Operation_VALUES_TO_NAMES;
typedef int32_t MyInteger;
定義的結構體和異常生成類,先生成
class Work;
class InvalidOperation;
為每個結構體生成 《_結構體名_isset》 結構體,全都是bool,該欄位是否被設定
/**
 * One single-bit flag per Work field, recording whether that field is set.
 * num1 starts out set; the other three start out unset.
 */
typedef struct _Work__isset {
  bool num1 : 1;
  bool num2 : 1;
  bool op : 1;
  bool comment : 1;

  _Work__isset() : num1(true), num2(false), op(false), comment(false) {}
} _Work__isset;
類裡面依次是 拷貝建構函式,賦值運算子,建構函式,解構函式,
資料成員,_類名_isset成員,成員的set方法,__set_成員名
過載==運算子,整數和字串要求相等,過載!=運算子
過載<運算子宣告
成員函式read,write函式宣告,printTo虛擬函式宣告
swap函式宣告 void swap(Work &a, Work &b);
輸出物件函式
inline std::ostream& operator<<(std::ostream& out, const Work& obj)
{
obj.printTo(out);
return out;
}
/**
 * Generated class for the thrift struct Work: four data fields plus the
 * __isset flags, setters, comparison operators and (de)serialization hooks.
 */
class Work {
public:
  Work(const Work&);
  Work& operator=(const Work&);
  Work() : num1(0), num2(0), op((Operation::type)0), comment() {
  }

  virtual ~Work() throw();

  int32_t num1;
  int32_t num2;
  Operation::type op;
  std::string comment;

  // Per-field "is set" flags; updated while reading the struct.
  _Work__isset __isset;

  void __set_num1(const int32_t val);
  void __set_num2(const int32_t val);
  void __set_op(const Operation::type val);
  void __set_comment(const std::string& val);

  /**
   * num1, num2 and op must match. comment takes part only through its
   * isset flag: the flags must agree, and the strings are compared only
   * when the flag is set.
   */
  bool operator == (const Work & rhs) const
  {
    if (!(num1 == rhs.num1))
      return false;
    if (!(num2 == rhs.num2))
      return false;
    if (!(op == rhs.op))
      return false;
    if (__isset.comment != rhs.__isset.comment)
      return false;
    else if (__isset.comment && !(comment == rhs.comment))
      return false;
    return true;
  }
  bool operator != (const Work &rhs) const {
    return !(*this == rhs);
  }

  bool operator < (const Work & ) const;

  uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
  uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;

  virtual void printTo(std::ostream& out) const;
};
InvalidOperation類似
tutorial_types.cpp
namespace tutorial { ... }
列舉生成的
int _kOperationValues[] = {
Operation::ADD,
Operation::SUBTRACT,
Operation::MULTIPLY,
Operation::DIVIDE
};
const char* _kOperationNames[] = {
"ADD",
"SUBTRACT",
"MULTIPLY",
"DIVIDE"
};
全域性變數定義,map的建構函式
map (InputIterator first, InputIterator last,
const key_compare& comp = key_compare(),
const allocator_type& = allocator_type());
const std::map<int, const char*> _Operation_VALUES_TO_NAMES(
::apache::thrift::TEnumIterator(4, _kOperationValues, _kOperationNames),
::apache::thrift::TEnumIterator(-1, NULL, NULL));
namespace apache {
namespace thrift {
/**
 * Minimal iterator over two parallel arrays (enum values and their names),
 * yielding (value, name) pairs. It is meant to be paired with an "end"
 * iterator constructed with n == -1; comparison ignores the end iterator and
 * only checks whether the current index has reached n.
 */
class TEnumIterator
    : public std::iterator<std::forward_iterator_tag, std::pair<int, const char*> > {
public:
  /**
   * @param n     number of entries in both arrays (-1 for the end marker)
   * @param enums array of enum values (keys)
   * @param names array of names (values), index-aligned with enums
   */
  TEnumIterator(int n, int* enums, const char** names)
    : ii_(0), n_(n), enums_(enums), names_(names) {}

  /** Advances to the next entry and returns the new index. */
  int operator++() { return ++ii_; }

  /** True while this iterator has not yet walked past its n entries. */
  bool operator!=(const TEnumIterator& end) const {
    (void)end; // only referenced by the assert below
    assert(end.n_ == -1);
    return (ii_ != n_);
  }

  std::pair<int, const char*> operator*() const { return std::make_pair(enums_[ii_], names_[ii_]); }

private:
  int ii_;             // current array index
  const int n_;        // total array length
  int* enums_;         // keys
  const char** names_; // values
};
// The throw() specification declares that this destructor does not throw.
Work::~Work() throw() {
}
void Work::__set_num1(const int32_t val) {
this->num1 = val;
}
void Work::__set_num2(const int32_t val) {
this->num2 = val;
}
void Work::__set_op(const Operation::type val) {
this->op = val;
}
void Work::__set_comment(const std::string& val) {
this->comment = val;
__isset.comment = true;
}
/**
 * Wire type codes. Several names share a value (T_BYTE/T_I08 and
 * T_STRING/T_UTF7).
 */
enum TType {
  T_STOP = 0,
  T_VOID = 1,
  T_BOOL = 2,
  T_BYTE = 3,
  T_I08 = 3,
  T_I16 = 6,
  T_I32 = 8,
  T_U64 = 9,
  T_I64 = 10,
  T_DOUBLE = 4,
  T_STRING = 11,
  T_UTF7 = 11,
  T_STRUCT = 12,
  T_MAP = 13,
  T_SET = 14,
  T_LIST = 15,
  T_UTF8 = 16,
  T_UTF16 = 17
};
C++版本支援的型別
/**
 * Scope helper for input recursion depth: the constructor increments the
 * protocol's input recursion depth and the destructor decrements it again.
 */
struct TInputRecursionTracker {
  TInputRecursionTracker(TProtocol &prot) : prot_(prot) {
    prot_.incrementInputRecursionDepth();
  }

  ~TInputRecursionTracker() {
    prot_.decrementInputRecursionDepth();
  }

  TProtocol &prot_; // protocol whose depth counter is being tracked
};
TBinaryProtocol extends TProtocol
uint32_t Work::read(::apache::thrift::protocol::TProtocol* iprot) {
apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
uint32_t xfer = 0;
std::string fname;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
TBinaryProtocol.tcc
/**
 * Reads nothing from the transport: it only empties name and reports zero
 * bytes consumed.
 */
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::readStructBegin(std::string& name) {
  name.clear(); // no actual read takes place
  return 0;
}
using ::apache::thrift::protocol::TProtocolException;
class TProtocolException : public apache::thrift::TException {
public:
/**
* Error codes for the various types of exceptions.
*/
enum TProtocolExceptionType {
UNKNOWN = 0,
INVALID_DATA = 1,
NEGATIVE_SIZE = 2,
SIZE_LIMIT = 3,
BAD_VERSION = 4,
NOT_IMPLEMENTED = 5,
DEPTH_LIMIT = 6
};
TProtocolException() : apache::thrift::TException(), type_(UNKNOWN) {}
TProtocolException(TProtocolExceptionType type) : apache::thrift::TException(), type_(type) {}
TProtocolException(const std::string& message)
: apache::thrift::TException(message), type_(UNKNOWN) {}
TProtocolException(TProtocolExceptionType type, const std::string& message)
: apache::thrift::TException(message), type_(type) {}
virtual ~TProtocolException() throw() {}
/**
* Returns an error code that provides information about the type of error
* that has occurred.
*
* @return Error code
*/
TProtocolExceptionType getType() { return type_; }
virtual const char* what() const throw() {
if (message_.empty()) {
switch (type_) {
case UNKNOWN:
return "TProtocolException: Unknown protocol exception";
case INVALID_DATA:
return "TProtocolException: Invalid data";
case NEGATIVE_SIZE:
return "TProtocolException: Negative size";
case SIZE_LIMIT:
return "TProtocolException: Exceeded size limit";
case BAD_VERSION:
return "TProtocolException: Invalid version";
case NOT_IMPLEMENTED:
return "TProtocolException: Not implemented";
default:
return "TProtocolException: (Invalid exception type)";
}
} else {
return message_.c_str();
}
}
protected:
/**
* Error code
*/
TProtocolExceptionType type_;
};
}
}
} // apache::thrift::protocol
while (true)
{
xfer += iprot->readFieldBegin(fname, ftype, fid);
1位元組type,2位元組序號,如果是T_STOP,fieldId = 0
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{讀work的4個欄位
case 1:
if (ftype == ::apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->num1);
this->__isset.num1 = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 2:
if (ftype == ::apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->num2);
this->__isset.num2 = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 3:
if (ftype == ::apache::thrift::protocol::T_I32) {
int32_t ecast0;
xfer += iprot->readI32(ecast0);
this->op = (Operation::type)ecast0;
this->__isset.op = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 4:
if (ftype == ::apache::thrift::protocol::T_STRING) {
xfer += iprot->readString(this->comment);4位元組長度,然後是body
this->__isset.comment = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();無操作
}
xfer += iprot->readStructEnd();無操作
return xfer;
}
/**
 * Scope helper for output recursion depth: the constructor increments the
 * protocol's output recursion depth and the destructor decrements it again.
 */
struct TOutputRecursionTracker {
  TOutputRecursionTracker(TProtocol &prot) : prot_(prot) {
    prot_.incrementOutputRecursionDepth();
  }

  ~TOutputRecursionTracker() {
    prot_.decrementOutputRecursionDepth();
  }

  TProtocol &prot_; // protocol whose depth counter is being tracked
};
/**
 * Serializes this Work object to oprot.
 *
 * num1, num2 and op are always written (field ids 1-3); comment (field id 4)
 * is written only when its isset flag is raised.
 *
 * @return the accumulated return values of the protocol write calls
 */
uint32_t Work::write(::apache::thrift::protocol::TProtocol* oprot) const {
  uint32_t xfer = 0;
  apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
  xfer += oprot->writeStructBegin("Work");

  xfer += oprot->writeFieldBegin("num1", ::apache::thrift::protocol::T_I32, 1);
  xfer += oprot->writeI32(this->num1);
  xfer += oprot->writeFieldEnd();

  xfer += oprot->writeFieldBegin("num2", ::apache::thrift::protocol::T_I32, 2);
  xfer += oprot->writeI32(this->num2);
  xfer += oprot->writeFieldEnd();

  // The enum travels as a 32-bit integer.
  xfer += oprot->writeFieldBegin("op", ::apache::thrift::protocol::T_I32, 3);
  xfer += oprot->writeI32((int32_t)this->op);
  xfer += oprot->writeFieldEnd();

  if (this->__isset.comment) {
    xfer += oprot->writeFieldBegin("comment", ::apache::thrift::protocol::T_STRING, 4);
    xfer += oprot->writeString(this->comment);
    xfer += oprot->writeFieldEnd();
  }
  xfer += oprot->writeFieldStop();
  xfer += oprot->writeStructEnd();
  return xfer;
}
/** Exchanges every field of a and b, including the isset flags. */
void swap(Work &a, Work &b) {
  using ::std::swap; // the standard library swap does the member-wise exchange
  swap(a.num1, b.num1);
  swap(a.num2, b.num2);
  swap(a.op, b.op);
  swap(a.comment, b.comment);
  swap(a.__isset, b.__isset);
}
/**
 * Copy constructor: copies all four fields and the isset flags.
 * Members are copy-initialized directly (in declaration order) instead of
 * being default-constructed and then assigned.
 */
Work::Work(const Work& other1)
  : num1(other1.num1),
    num2(other1.num2),
    op(other1.op),
    comment(other1.comment),
    __isset(other1.__isset) {
}
/** Copy assignment: member-wise copy of all fields and the isset flags. */
Work& Work::operator=(const Work& other2) {
  num1 = other2.num1;
  num2 = other2.num2;
  op = other2.op;
  comment = other2.comment;
  __isset = other2.__isset;
  return *this;
}
/**
 * Writes a human-readable form "Work(num1=..., num2=..., op=..., comment=...)"
 * to out; an unset comment is printed as <null>.
 */
void Work::printTo(std::ostream& out) const {
  // Article note: to_string comes from TToString.h and is implemented with
  // boost::lexical_cast<std::string>(t).
  using ::apache::thrift::to_string;
  out << "Work(";
  out << "num1=" << to_string(num1);
  out << ", " << "num2=" << to_string(num2);
  out << ", " << "op=" << to_string(op);
  out << ", " << "comment=";
  if (__isset.comment) {
    out << to_string(comment);
  } else {
    out << "<null>";
  }
  out << ")";
}
InvalidOperation方法的實現,依次是解構函式,set函式,read,write,swap,拷貝建構函式,賦值,printTo,
因為thrift檔案裡用exception關鍵字定義了InvalidOperation,所以InvalidOperation繼承自TException,
TException在thrift.h檔案裡定義,有一個虛擬函式virtual const char* what() const throw(),所以這裡也生成了
mutable std::string thriftTExceptionMessageHolder_;
const char* what() const throw();
實現
const char* InvalidOperation::what() const throw() {
try {
std::stringstream ss;
ss << "TException - service has thrown: " << *this;
this->thriftTExceptionMessageHolder_ = ss.str();
return this->thriftTExceptionMessageHolder_.c_str();
} catch (const std::exception&) {
return "TException - service has thrown: InvalidOperation";
}
}
_InvalidOperation__isset在read的時候會使用
tutorial_constants.h
#include "tutorial_types.h"
namespace tutorial {
class tutorialConstants {
public:
tutorialConstants();
int32_t INT32CONSTANT;
std::map<std::string, std::string> MAPCONSTANT;
};
extern const tutorialConstants g_tutorial_constants;
} // namespace
tutorial_constants.cpp
定義了g_tutorial_constants,預設建構函式
tutorialConstants::tutorialConstants() {
INT32CONSTANT = 9853;
MAPCONSTANT.insert(std::make_pair("hello", "world"));
MAPCONSTANT.insert(std::make_pair("goodnight", "moon"));
}
shared_types.h/shared_types.cpp
定義SharedStruct
拷貝構造,賦值,構造,析構,欄位key,value,_SharedStruct__isset __isset
set方法,==, !=, <運算子,read,write函式,printTo虛擬函式
shared_constants.cpp
shared_constants.h
namespace shared {

/** Constants holder for the shared service; it declares no constants. */
class sharedConstants {
public:
  sharedConstants(); // article note: the implementation is empty
};

// Article note: the instance itself is defined in the .cpp file.
extern const sharedConstants g_shared_constants;

} // namespace
SharedService_server.skeleton.cpp
#include "SharedService.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using boost::shared_ptr;
using namespace ::shared;
SharedServiceHandler虛繼承自SharedServiceIf,虛繼承主要解決多重繼承的二義性問題
/**
 * Skeleton handler for SharedService: the place where the service logic is
 * meant to be filled in. As generated, getStruct only prints its own name.
 */
class SharedServiceHandler : virtual public SharedServiceIf {
public:
  SharedServiceHandler() {
    // Your initialization goes here
  }

  void getStruct(SharedStruct& _return, const int32_t key) {
    // Your implementation goes here
    (void)_return; // left untouched by the skeleton
    (void)key;
    printf("getStruct\n");
  }
};
int main(int argc, char **argv) {
int port = 9090;
建立handler,實際是建立了SharedServiceIf
shared_ptr<SharedServiceHandler> handler(new SharedServiceHandler());
用handler建立processor
shared_ptr<TProcessor> processor(new SharedServiceProcessor(handler));
建立socket,並用socket建立Transport
shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
建立transportFactory
shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
建立protocolFactory
shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
用processor,transport,transportFactory, protocolFactory建立server
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
呼叫server的serve方法
server.serve();
return 0;
}
SharedService.h
SharedServiceIf抽象類,提供
virtual void getStruct(SharedStruct& _return, const int32_t key) = 0;
方法
SharedServiceIfFactory抽象類,提供
virtual SharedServiceIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) = 0;
virtual void releaseHandler(SharedServiceIf* /* handler */) = 0;
SharedServiceIf也叫Handler
SharedServiceIfFactory的實現SharedServiceIfSingletonFactory
/**
 * Handler factory that always hands out the same handler: getHandler returns
 * the raw pointer of the shared handler given at construction, and
 * releaseHandler does nothing.
 */
class SharedServiceIfSingletonFactory : virtual public SharedServiceIfFactory {
public:
  SharedServiceIfSingletonFactory(const boost::shared_ptr<SharedServiceIf>& iface) : iface_(iface) {}
  virtual ~SharedServiceIfSingletonFactory() {}

  /** The connection info is ignored; every caller gets the one handler. */
  virtual SharedServiceIf* getHandler(const ::apache::thrift::TConnectionInfo&) {
    SharedServiceIf* const handler = iface_.get();
    return handler;
  }

  /** No-op: this factory never creates a handler of its own. */
  virtual void releaseHandler(SharedServiceIf* /* handler */) {}

protected:
  boost::shared_ptr<SharedServiceIf> iface_; // the single shared handler
};
SharedServiceIf的實現SharedServiceNull
class SharedServiceNull : virtual public SharedServiceIf {
public:
virtual ~SharedServiceNull() {}
void getStruct(SharedStruct& /* _return */, const int32_t /* key */) {
return;
}
};
SharedService_getStruct_args類
拷貝構造,賦值,構造,析構,欄位key,_SharedService_getStruct_args__isset
set方法,==, !=, <運算子,read,write函式(和types.h裡的類相比,沒有printTo虛擬函式)
SharedService_getStruct_pargs類,析構,欄位key,write函式
class SharedService_getStruct_pargs {
public:
virtual ~SharedService_getStruct_pargs() throw();
const int32_t* key;
uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
};
SharedService_getStruct_result類
SharedService_getStruct_presult類
因為返回值是SharedStruct,所以多生成了_SharedService_getStruct_presult__isset,結構體區別對待
析構,欄位success,欄位是否被設定,read函式
uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
SharedServiceClient是SharedServiceIf的另一個實現
class SharedServiceClient : virtual public SharedServiceIf {
public: 2個建構函式,可接受1個TProtocol或2個TProtocol
private: 2個set函式
public:
boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol() {
return piprot_;
}
boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() {
return poprot_;
}
void getStruct(SharedStruct& _return, const int32_t key);
void send_getStruct(const int32_t key);
void recv_getStruct(SharedStruct& _return);
protected: 4個欄位,2個封裝指標,2個真正的指標
boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;
boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;
::apache::thrift::protocol::TProtocol* iprot_;
::apache::thrift::protocol::TProtocol* oprot_;
};
SharedServiceProcessor繼承TDispatchProcessor,不是虛繼承
class SharedServiceProcessor : public ::apache::thrift::TDispatchProcessor {
protected: 受保護的成員iface_指向SharedServiceIf
boost::shared_ptr<SharedServiceIf> iface_;
protect方法dispatchCall,接收2個protocol,fname函式名,32位seqid,void* callContext
virtual bool dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext);
private:
定義SharedServiceProcessor類的函式指標ProcessFunction,接收i32,2個proto,void *, 返回void
typedef void (SharedServiceProcessor::*ProcessFunction)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*, void*);
定義ProcessMap,key是string,value是ProcessFunction
typedef std::map<std::string, ProcessFunction> ProcessMap;
processMap_成員
ProcessMap processMap_;
真正處理的方法,接收的型別和ProcessFunction一致
void process_getStruct(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
public:公有的建構函式,解構函式
SharedServiceProcessor(boost::shared_ptr<SharedServiceIf> iface) :
iface_(iface) {
processMap_["getStruct"] = &SharedServiceProcessor::process_getStruct;
}
virtual ~SharedServiceProcessor() {}
};
SharedServiceProcessorFactory繼承自TProcessorFactory
class SharedServiceProcessorFactory : public ::apache::thrift::TProcessorFactory {
public:用SharedServiceIfFactory來構造SharedServiceProcessorFactory
SharedServiceProcessorFactory(const ::boost::shared_ptr< SharedServiceIfFactory >& handlerFactory) :
handlerFactory_(handlerFactory) {}
由TConnectionInfo得到TProcessor,TDispatchProcessor,SharedServiceProcessor
::boost::shared_ptr< ::apache::thrift::TProcessor > getProcessor(const ::apache::thrift::TConnectionInfo& connInfo);
protected: processorFactory需要SharedServiceIfFactory成員,processor需要SharedServiceIf成員
::boost::shared_ptr< SharedServiceIfFactory > handlerFactory_;
};
SharedServiceMultiface是SharedServiceIf的第三個實現
class SharedServiceMultiface : virtual public SharedServiceIf {
public:
SharedServiceMultiface(std::vector<boost::shared_ptr<SharedServiceIf> >& ifaces) : ifaces_(ifaces) {
}
virtual ~SharedServiceMultiface() {}
protected: 含有多個SharedServiceIf
std::vector<boost::shared_ptr<SharedServiceIf> > ifaces_;
SharedServiceMultiface() {} 多了一個add方法
void add(boost::shared_ptr<SharedServiceIf> iface) {
ifaces_.push_back(iface);
}
public:從ifaces_陣列中得到一個SharedServiceIf,呼叫它的getStruct
void getStruct(SharedStruct& _return, const int32_t key) {
size_t sz = ifaces_.size();
size_t i = 0;
for (; i < (sz - 1); ++i) {
ifaces_[i]->getStruct(_return, key);
}
ifaces_[i]->getStruct(_return, key);
return;
}
};
SharedServiceIf的第四個實現,執行緒安全的client,多個執行緒共享一個連線
// The 'concurrent' client is a thread safe client that correctly handles
// out of order responses. It is slower than the regular client, so should
// only be used when you need to share a connection among multiple threads
class SharedServiceConcurrentClient : virtual public SharedServiceIf {
public: 建構函式,設定TProtocol
private: 2個set函式
public:
boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol()
boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol()
void getStruct(SharedStruct& _return, const int32_t key);
int32_t send_getStruct(const int32_t key);
void recv_getStruct(SharedStruct& _return, const int32_t seqid);
protected:
boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;
boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;
::apache::thrift::protocol::TProtocol* iprot_;
::apache::thrift::protocol::TProtocol* oprot_;
::apache::thrift::async::TConcurrentClientSyncInfo sync_; 多了1個這個
};
SharedService.cpp
SharedService_getStruct_args類的解構函式,read(讀到this->key,通過引用),write函式,讀寫欄位名key(實際沒有),int32,序號1
SharedService_getStruct_pargs類的解構函式,write函式,寫欄位名key,int32
SharedService_getStruct_result類的解構函式,read(this->success.read,呼叫SharedStruct的read,會填充this->success),
write函式,寫欄位名success,型別,序號,這裡的序號是0(thrift都是從序號1開始),呼叫SharedStruct的write
SharedService_getStruct_presult類的解構函式,read函式,同上面的read
SharedServiceClient::getStruct(SharedStruct& _return, const int32_t key)
send_getStruct(key);
recv_getStruct(_return);
/** Message kinds carried in a message header. */
typedef enum { T_CALL = 1, T_REPLY = 2, T_EXCEPTION = 3, T_ONEWAY = 4 } ThriftMessageType;
SharedServiceClient::send_getStruct(const int32_t key)
{
int32_t cseqid = 0;
oprot_->writeMessageBegin("getStruct", ::apache::thrift::protocol::T_CALL, cseqid);
32位,T_CALL | VERSION_1 = 0x80010000
String: getStruct
32位,seqid
SharedService_getStruct_pargs args;
args.key = &key;
args.write(oprot_); 呼叫上面的SharedService_getStruct_pargs的write
oprot_->writeMessageEnd();無操作
oprot_->getTransport()->writeEnd();
oprot_->getTransport()->flush();
}
template <class Transport_, class ByteOrder_ = TNetworkBigEndian>
class TBinaryProtocolT : public TVirtualProtocol<TBinaryProtocolT<Transport_, ByteOrder_> > {
/**
 * Byte-order policy for network (big-endian) wire order: toWire* convert from
 * host to wire order, fromWire* convert back.
 */
struct TNetworkBigEndian
{
  static uint16_t toWire16(uint16_t x) {return htons(x);}
  static uint32_t toWire32(uint32_t x) {return htonl(x);}
  static uint64_t toWire64(uint64_t x) {return THRIFT_htonll(x);}
  static uint16_t fromWire16(uint16_t x) {return ntohs(x);}
  static uint32_t fromWire32(uint32_t x) {return ntohl(x);}
  static uint64_t fromWire64(uint64_t x) {return THRIFT_ntohll(x);}
};
TBinaryProtocolT::writeI32
this->trans_->write((uint8_t*)&net, 4);
TTransport::write---->write_virt(buf, len)---->TVirtualTransport::write_virt
---->static_cast<Transport_*>(this)->write(buf, len)
繼承體系
TBufferedTransport---->TVirtualTransport<TBufferedTransport, TBufferBase>---->TBufferBase
---->TVirtualTransport<TBufferBase, TTransportDefaults>---->TTransportDefaults---->TTransport
TBufferedTransport自身沒有write方法,它呼叫的是繼承鏈上最近的write方法,即TBufferBase的write
先往緩衝區寫,寫滿了,呼叫writeSlow,TBufferedTransport實現了writeSlow,呼叫transport_的write方法
TVirtualTransport<class Transport_, class Super_ = TTransportDefaults>的read_virt會呼叫Transport_的
read方法
TSocket---->TSocket : public TVirtualTransport<TSocket>---->TTransportDefaults---->TTransport
recv_getStruct得到返回結果
bool SharedServiceProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot,
::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext)
從processMap_中取到fname對應的處理函式,呼叫,如果沒找到,寫給客戶端TApplicationException
(this->*(pfn->second))(seqid, iprot, oprot, callContext);
void SharedServiceProcessor::process_getStruct(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot,
::apache::thrift::protocol::TProtocol* oprot, void* callContext)
SharedServiceProcessor---->TDispatchProcessor---->TProcessor(boost::shared_ptr<TProcessorEventHandler> eventHandler_;)
TProcessorEventHandler
Calculator_server.skeleton.cpp
class CalculatorHandler : virtual public CalculatorIf
ping,add,calculate,zip方法的簡單實現
main方法
Calculator.h
Calculator.cpp
CppClient.cpp
boost::shared_ptr<TTransport> socket(new TSocket("localhost", 9090));
boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
CalculatorClient client(protocol);
transport->open()---->transport_->open()---->開啟socket
CppServer.cpp
class CalculatorHandler : public CalculatorIf
protected:
map<int32_t, SharedStruct> log;
實現了ping,add,calculate,getStruct,zip方法
calculate會往log裡放entry,entry是在棧上分配的,往log裡放時map應該拷貝物件了
class CalculatorCloneFactory : virtual public CalculatorIfFactory {
virtual CalculatorIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo)
{
boost::shared_ptr<TSocket> sock = boost::dynamic_pointer_cast<TSocket>(connInfo.transport);
從connInfo.transport得到TSocket
cout << "Incoming connection\n";
cout << "\tSocketInfo: " << sock->getSocketInfo() << "\n";
cout << "\tPeerHost: " << sock->getPeerHost() << "\n";
cout << "\tPeerAddress: " << sock->getPeerAddress() << "\n";
cout << "\tPeerPort: " << sock->getPeerPort() << "\n";
return new CalculatorHandler; 建立一個CalculatorHandler物件
}
virtual void releaseHandler( ::shared::SharedServiceIf* handler) {
delete handler;刪除傳進來的handler
}
boost::make_shared是建立一個物件,返回物件指標shared_ptr
TThreadedServer server(
boost::make_shared<CalculatorProcessorFactory>(boost::make_shared<CalculatorCloneFactory>()),
CalculatorCloneFactory繼承自CalculatorIfFactory,CalculatorProcessorFactory有
::boost::shared_ptr< CalculatorIfFactory > handlerFactory_;這個欄位
所以是用CalculatorCloneFactory物件初始化ProcessorFactory
boost::make_shared<TServerSocket>(9090), //port
TServerSocket---->TServerTransport
socket_func_t = ::std::tr1::function<void(THRIFT_SOCKET fd)>, THRIFT_SOCKET = int
const static int DEFAULT_BACKLOG = 1024;
boost::make_shared<TBufferedTransportFactory>(),
TBufferedTransportFactory---->TTransportFactory
getTransport(trans) ----> new TBufferedTransport(trans)
boost::make_shared<TBinaryProtocolFactory>());
typedef TBinaryProtocolFactoryT<TTransport> TBinaryProtocolFactory;
TBinaryProtocolFactoryT---->TProtocolFactory
Transport_ = TTransport, ByteOrder_ = TNetworkBigEndian
getProtocol(boost::shared_ptr<TTransport> trans)
new TBinaryProtocolT<Transport_, ByteOrder_>(
specific_trans,
string_limit_,
container_limit_,
strict_read_,
strict_write_);
boost::shared_ptr 是一個指標
還有一個預設的引數
const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
= boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
new apache::thrift::concurrency::PlatformThreadFactory)
用PlatformThreadFactory物件的指標給threadFactory賦值
TThreadedServer---->TServerFramework---->TServer---->Runnable
boost::shared_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory_;
apache::thrift::concurrency::Monitor clientsMonitor_;
繼承自TServer的欄位
boost::shared_ptr<TProcessorFactory> processorFactory_;
boost::shared_ptr<TServerTransport> serverTransport_;
boost::shared_ptr<TTransportFactory> inputTransportFactory_;
boost::shared_ptr<TTransportFactory> outputTransportFactory_;
boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
boost::shared_ptr<TServerEventHandler> eventHandler_;
TServerFramework(processorFactory, serverTransport, transportFactory, protocolFactory)
TServer(processorFactory, serverTransport, transportFactory, protocolFactory), 欄位賦值
clients_(0),
hwm_(0), 併發客戶端數的歷史最高值(high-water mark)
limit_(INT64_MAX)
threadFactory_(threadFactory)
server.serve();
void TThreadedServer::serve() {
TServerFramework::serve();
// Drain all clients - no more will arrive
try {
Synchronized s(clientsMonitor_); 用monitor物件,構造Synchronized,內含guard, 構造時加鎖,析構時解鎖
while (getConcurrentClientCount() > 0) {
getConcurrentClientCount會Synchronized sync(mon_),mon_是TServerFramework私有的
clientsMonitor_.wait(); 等待clientsMonitor_的notify
}
} catch (TException& tx) {
string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
GlobalOutput(errStr.c_str());
}
}
void TServerFramework::serve() {
shared_ptr<TTransport> client;
shared_ptr<TTransport> inputTransport;
shared_ptr<TTransport> outputTransport;
shared_ptr<TProtocol> inputProtocol;
shared_ptr<TProtocol> outputProtocol;
// Start the server listening
serverTransport_->listen(); 監聽埠
// Run the preServe event to indicate server is now listening
// and that it is safe to connect.
if (eventHandler_) { 預設沒有
eventHandler_->preServe();
}
// Fetch client from server
for (;;) {
try {
// Dereference any resources from any previous client creation
// such that a blocking accept does not hold them indefinitely.
outputProtocol.reset();
inputProtocol.reset();
outputTransport.reset();
inputTransport.reset();
client.reset();清空指標值
// If we have reached the limit on the number of concurrent
// clients allowed, wait for one or more clients to drain before
// accepting another.
{
Synchronized sync(mon_);
while (clients_ >= limit_) {
mon_.wait(); 等待mon_的notify
}
}
TServerTransport的accept,呼叫的是TServerSocket的acceptImpl
client = serverTransport_->accept();
TSocket::TSocket(THRIFT_SOCKET socket, boost::shared_ptr<THRIFT_SOCKET> interruptListener)
listen函式建立的2對socket
interruptSockWriter_ = sv[1];
interruptSockReader_ = sv[0];
childInterruptSockWriter_ = sv[1];
pChildInterruptSockReader_
= boost::shared_ptr<THRIFT_SOCKET>(new THRIFT_SOCKET(sv[0]), destroyer_of_fine_sockets);
用於對外提供介面中斷連線,TServerFramework::stop()
serverTransport_->interrupt();serverTransport_->interruptChildren();
TServerSocket::interrupt() TServerSocket::interruptChildren()
TServerSocket::notify(THRIFT_SOCKET notifySocket)
往通訊管道裡寫1位元組的0,對應的reader讀到了,就拋異常,外面捕獲(客戶端連線),或者關閉server
建立shared_ptr<TSocket> client = createSocket(clientSocket); clientSocket是一個fd
設定fd的一些引數,返回
根據client建立transport(TBufferedTransport),
根據transport建立protocol(TBinaryProtocolT)
inputTransport = inputTransportFactory_->getTransport(client);
outputTransport = outputTransportFactory_->getTransport(client);
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
newlyConnectedClient(shared_ptr<TConnectedClient>(
new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
inputProtocol,
outputProtocol,
eventHandler_,
client),
bind(&TServerFramework::disposeConnectedClient, this, _1)));
建立TConnectedClient指標,包含2個引數,建立的物件,和刪除物件的方法
意思是用函式TServerFramework::disposeConnectedClient(this)(TConnectedClient *)
this->disposeConnectedClient(TConnectedClient* pClient)
來刪除TConnectedClient物件
--clients_, 如果允許新來客戶端連線,呼叫mon_的notify
onClientDisconnected(pClient) 呼叫TThreadedServer::onClientDisconnected,呼叫
clientsMonitor_.notify()(TThreadedServer的鎖)
然後將建立的指標傳給newlyConnectedClient方法
new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
inputProtocol,
outputProtocol,
eventHandler_,
client)
getProcessor方法是TServer的成員方法,TServerFramework繼承了
boost::shared_ptr<TProcessor> getProcessor(boost::shared_ptr<TProtocol> inputProtocol,
boost::shared_ptr<TProtocol> outputProtocol,
boost::shared_ptr<TTransport> transport) {
TConnectionInfo connInfo; 3個欄位
connInfo.input = inputProtocol;
connInfo.output = outputProtocol;
connInfo.transport = transport; 可轉換為TSocket(就是之前建立的client),獲取連線資訊
return processorFactory_->getProcessor(connInfo);
}
processorFactory_是CalculatorProcessorFactory,生成的程式碼
::boost::shared_ptr< ::apache::thrift::TProcessor > CalculatorProcessorFactory::getProcessor
(const ::apache::thrift::TConnectionInfo& connInfo) {
handlerFactory_是CalculatorCloneFactory,建立ReleaseHandler物件cleanup
ReleaseHandler過載了()運算子,
void operator()(typename HandlerFactory_::Handler* handler) {
if (handler) {
handlerFactory_->releaseHandler(handler);
}
}
virtual void releaseHandler( ::shared::SharedServiceIf* handler) {
delete handler;
}
CalculatorCloneFactory繼承自CalculatorIfFactory,CalculatorIfFactory中定義了
typedef CalculatorIf Handler,所以Handler是CalculatorIf,實際是CalculatorHandler
CppServer實現的
::apache::thrift::ReleaseHandler< CalculatorIfFactory > cleanup(handlerFactory_);
CalculatorCloneFactory的getHandler,new了一個CalculatorHandler, 同時指定刪除物件時的回撥
cleanup(ReleaseHandler物件)提供了一個刪除Handler物件的方法,可以用形如cleanup(handler)的形式呼叫;它不是函式指標,所以
必須過載()運算子來實現;
::boost::shared_ptr< CalculatorIf > handler(handlerFactory_->getHandler(connInfo), cleanup);
用建立的CalculatorHandler物件,建立CalculatorProcessor物件, 賦給iface_成員
::boost::shared_ptr< ::apache::thrift::TProcessor > processor(new CalculatorProcessor(handler));
return processor;
}
CalculatorProcessor---->SharedServiceProcessor---->TDispatchProcessor---->TProcessor(), 有eventHandler_欄位
processor負責讀引數,呼叫CalculatorHandler的方法,構造返回值給客戶端
建立完TConnectedClient,呼叫
void TServerFramework::newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient) {
onClientConnected(pClient);
TThreadedServer::onClientConnected
threadFactory_->newThread(pClient)->start();
pClient作為一個runnable,建立一個執行緒執行其run方法
void TConnectedClient::run() {
if (eventHandler_) {
opaqueContext_ = eventHandler_->createContext(inputProtocol_, outputProtocol_);
}
for (bool done = false; !done;) {
if (eventHandler_) {
eventHandler_->processContext(opaqueContext_, client_);
}
try {
if (!processor_->process(inputProtocol_, outputProtocol_, opaqueContext_)) {
呼叫processor的process方法, 即CalculatorProcessor的process_XXX方法
break;
}
} catch (const TTransportException& ttx) {
switch (ttx.getType()) {
case TTransportException::TIMED_OUT:
// Receive timeout - continue processing.
continue;
case TTransportException::END_OF_FILE:
case TTransportException::INTERRUPTED:
// Client disconnected or was interrupted. No logging needed. Done.
done = true;
break;
default: {
// All other transport exceptions are logged.
// State of connection is unknown. Done.
string errStr = string("TConnectedClient died: ") + ttx.what();
GlobalOutput(errStr.c_str());
done = true;
break;
}
}
} catch (const TException& tex) {
string errStr = string("TConnectedClient processing exception: ") + tex.what();
GlobalOutput(errStr.c_str());
// Continue processing
}
}
cleanup();
inputProtocol_->getTransport()->close(); TProtocol::ptrans
outputProtocol_->getTransport()->close();
client_->close(); 這三個呼叫都是呼叫client.close
}
// Count a concurrent client added. 計數增加,需同步執行
Synchronized sync(mon_);
++clients_;
hwm_ = std::max(hwm_, clients_);
}
} catch (TTransportException& ttx) {
releaseOneDescriptor("inputTransport", inputTransport);
releaseOneDescriptor("outputTransport", outputTransport);
releaseOneDescriptor("client", client);
if (ttx.getType() == TTransportException::TIMED_OUT) {
// Accept timeout - continue processing.
continue;
} else if (ttx.getType() == TTransportException::END_OF_FILE
|| ttx.getType() == TTransportException::INTERRUPTED) {
// Server was interrupted. This only happens when stopping.
break;
} else {
// All other transport exceptions are logged.
// State of connection is unknown. Done.
string errStr = string("TServerTransport died: ") + ttx.what();
GlobalOutput(errStr.c_str());
break;
}
}
}
releaseOneDescriptor("serverTransport", serverTransport_);
serverTransport_->close()實際是呼叫TServerSocket::close()
關閉serverSocket_,interruptSockWriter_/interruptSockReader_, childInterruptSockWriter_
pChildInterruptSockReader_.reset() 它有自己的銷燬函式,destroyer_of_fine_sockets
}
class Synchronized {
public: 2個建構函式,傳入Monitor,用monitor.mutex()初始化成員g
Synchronized(const Monitor* monitor) : g(monitor->mutex()) {}
Synchronized(const Monitor& monitor) : g(monitor.mutex()) {}
private:
Guard g; 成員
};
class Guard : boost::noncopyable {
public:
Guard(const Mutex& value, int64_t timeout = 0) : mutex_(&value) {
if (timeout == 0) {
value.lock();
} else if (timeout < 0) {
if (!value.trylock()) {
mutex_ = NULL;
}
} else {
if (!value.timedlock(timeout)) {
mutex_ = NULL;
}
}
} 鎖
~Guard() {
if (mutex_) {
mutex_->unlock();
}
} 解鎖
operator bool() const { return (mutex_ != NULL); }
private:
const Mutex* mutex_;
};
class Monitor : boost::noncopyable {
public:
/** Creates a new mutex, and takes ownership of it. */
Monitor();
/** Uses the provided mutex without taking ownership. */
explicit Monitor(Mutex* mutex);
/** Uses the mutex inside the provided Monitor without taking ownership. */
explicit Monitor(Monitor* monitor);
/** Deallocates the mutex only if we own it. */
virtual ~Monitor();
Mutex& mutex() const;
virtual void lock() const;
virtual void unlock() const;
/**
* Waits a maximum of the specified timeout in milliseconds for the condition
* to occur, or waits forever if timeout_ms == 0.
*
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
int waitForTimeRelative(int64_t timeout_ms) const;
/**
* Waits until the absolute time specified using struct THRIFT_TIMESPEC.
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
int waitForTime(const THRIFT_TIMESPEC* abstime) const;
/**
* Waits until the absolute time specified using struct timeval.
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
int waitForTime(const struct timeval* abstime) const;
/**
* Waits forever until the condition occurs.
* Returns 0 if condition occurs, or an error code otherwise.
*/
int waitForever() const;
/**
* Exception-throwing version of waitForTimeRelative(), called simply
* wait(int64) for historical reasons. Timeout is in milliseconds.
*
* If the condition occurs, this function returns cleanly; on timeout or
* error an exception is thrown.
*/
void wait(int64_t timeout_ms = 0LL) const;
/** Wakes up one thread waiting on this monitor. */
virtual void notify() const;
/** Wakes up all waiting threads on this monitor. */
virtual void notifyAll() const;
private:
class Impl;
Impl* impl_;
};
class Monitor::Impl {
public:
Impl() : ownedMutex_(new Mutex()), mutex_(NULL), condInitialized_(false) {
init(ownedMutex_.get());
}
Impl(Mutex* mutex) : mutex_(NULL), condInitialized_(false) { init(mutex); }
由另一個monitor來構造一個monitor
Impl(Monitor* monitor) : mutex_(NULL), condInitialized_(false) { init(&(monitor->mutex())); }
~Impl() { cleanup(); }
Mutex& mutex() { return *mutex_; }
void lock() { mutex().lock(); } 加鎖
void unlock() { mutex().unlock(); } 解鎖
/**
* Exception-throwing version of waitForTimeRelative(), called simply
* wait(int64) for historical reasons. Timeout is in milliseconds.
*
* If the condition occurs, this function returns cleanly; on timeout or
* error an exception is thrown.
*/
void wait(int64_t timeout_ms) const {
int result = waitForTimeRelative(timeout_ms);
if (result == THRIFT_ETIMEDOUT) {
// pthread_cond_timedwait has been observed to return early on
// various platforms, so comment out this assert.
// assert(Util::currentTime() >= (now + timeout));
throw TimedOutException();
} else if (result != 0) {
throw TException("pthread_cond_wait() or pthread_cond_timedwait() failed");
}
}
/**
* Waits until the specified timeout in milliseconds for the condition to
* occur, or waits forever if timeout_ms == 0.
*
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/ 等待時間間隔
int waitForTimeRelative(int64_t timeout_ms) const {
if (timeout_ms == 0LL) {
return waitForever();
}
struct THRIFT_TIMESPEC abstime;
Util::toTimespec(abstime, Util::currentTime() + timeout_ms);
return waitForTime(&abstime);
}
/**
* Waits until the absolute time specified using struct THRIFT_TIMESPEC.
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/等待絕對時間
int waitForTime(const THRIFT_TIMESPEC* abstime) const {
assert(mutex_);
pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);
// XXX Need to assert that caller owns mutex 在條件變數pthread_cond_上等待(配合mutexImpl使用),最遲等到abstime
return pthread_cond_timedwait(&pthread_cond_, mutexImpl, abstime);
}
int waitForTime(const struct timeval* abstime) const {
struct THRIFT_TIMESPEC temp;
temp.tv_sec = abstime->tv_sec;
temp.tv_nsec = abstime->tv_usec * 1000;
return waitForTime(&temp);
}
/**
* Waits forever until the condition occurs.
* Returns 0 if condition occurs, or an error code otherwise.
*/
int waitForever() const {
assert(mutex_);
pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
assert(mutexImpl); 永遠等待直到條件發生
return pthread_cond_wait(&pthread_cond_, mutexImpl);
}
void notify() {
// XXX Need to assert that caller owns mutex
int iret = pthread_cond_signal(&pthread_cond_); 發出訊號,啟用一個等待執行緒
THRIFT_UNUSED_VARIABLE(iret);
assert(iret == 0);
}
void notifyAll() {
// XXX Need to assert that caller owns mutex
int iret = pthread_cond_broadcast(&pthread_cond_); 廣播訊號,啟用所有等待執行緒
THRIFT_UNUSED_VARIABLE(iret);
assert(iret == 0);
}
private:將建立的mutex物件賦給mutex_,有可能是null,初始化pthread_cond_t
void init(Mutex* mutex) {
mutex_ = mutex;
if (pthread_cond_init(&pthread_cond_, NULL) == 0) {
condInitialized_ = true;
}
if (!condInitialized_) {
cleanup();
throw SystemResourceException();
}
}
void cleanup() {
if (condInitialized_) {
condInitialized_ = false;
int iret = pthread_cond_destroy(&pthread_cond_);
THRIFT_UNUSED_VARIABLE(iret);
assert(iret == 0);
}
}
scoped_ptr<Mutex> ownedMutex_;
Mutex* mutex_;
mutable pthread_cond_t pthread_cond_;
mutable bool condInitialized_;
};
class Mutex {
public:
typedef void (*Initializer)(void*);
Mutex(Initializer init = DEFAULT_INITIALIZER);
virtual ~Mutex() {}
virtual void lock() const;
virtual bool trylock() const;
virtual bool timedlock(int64_t milliseconds) const;
virtual void unlock() const;
void* getUnderlyingImpl() const;
static void DEFAULT_INITIALIZER(void*);
static void ADAPTIVE_INITIALIZER(void*);
static void RECURSIVE_INITIALIZER(void*);
都是呼叫實現類的方法
private:
class impl;
boost::shared_ptr<impl> impl_;
};
class Mutex::impl {
public: Initializer是個函式指標
impl(Initializer init) : initialized_(false) {
#ifndef THRIFT_NO_CONTENTION_PROFILING
profileTime_ = 0;
#endif
init(&pthread_mutex_); 用來初始化pthread_mutex_t
initialized_ = true;
}
~impl() {
if (initialized_) {
initialized_ = false;
int ret = pthread_mutex_destroy(&pthread_mutex_);
THRIFT_UNUSED_VARIABLE(ret);
assert(ret == 0);
}
}
void lock() const {
PROFILE_MUTEX_START_LOCK();
pthread_mutex_lock(&pthread_mutex_); 加鎖
PROFILE_MUTEX_LOCKED();
}
嘗試加鎖
bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); }
在指定的時間(毫秒)內嘗試加鎖,成功返回true,超時返回false
bool timedlock(int64_t milliseconds) const {
#if defined(_POSIX_TIMEOUTS) && _POSIX_TIMEOUTS >= 200112L
PROFILE_MUTEX_START_LOCK();
struct THRIFT_TIMESPEC ts;
Util::toTimespec(ts, milliseconds + Util::currentTime());
int ret = pthread_mutex_timedlock(&pthread_mutex_, &ts);
if (ret == 0) {
PROFILE_MUTEX_LOCKED();
return true;
}
PROFILE_MUTEX_NOT_LOCKED();
return false;
#else
/* Otherwise follow solution used by Mono for Android */
struct THRIFT_TIMESPEC sleepytime, now, to;
/* This is just to avoid a completely busy wait */
sleepytime.tv_sec = 0;
sleepytime.tv_nsec = 10000000L; /* 10ms */
Util::toTimespec(to, milliseconds + Util::currentTime());
while ((trylock()) == false) {
Util::toTimespec(now, Util::currentTime());
if (now.tv_sec >= to.tv_sec && now.tv_nsec >= to.tv_nsec) {
return false;
}
nanosleep(&sleepytime, NULL);
}
return true;
#endif
}
解鎖
void unlock() const {
PROFILE_MUTEX_START_UNLOCK();
pthread_mutex_unlock(&pthread_mutex_);
PROFILE_MUTEX_UNLOCKED();
}
得到底層的實現
void* getUnderlyingImpl() const { return (void*)&pthread_mutex_; }
private:
mutable pthread_mutex_t pthread_mutex_;
mutable bool initialized_;
#ifndef THRIFT_NO_CONTENTION_PROFILING
mutable int64_t profileTime_;
#endif
};
class Runnable {內含Thread的弱引用(weak_ptr),由外部傳入,提供設定執行緒、獲取執行緒2個功能(thread_.lock()是把weak_ptr提升為shared_ptr,不是鎖住執行緒)
public:
virtual ~Runnable(){};
virtual void run() = 0;
/**
* Gets the thread object that is hosting this runnable object - can return
* an empty boost::shared pointer if no references remain on that thread object
*/runnable弱引用thread,可能返回NULL
virtual boost::shared_ptr<Thread> thread() { return thread_.lock(); }
/**
* Sets the thread that is executing this object. This is only meant for
* use by concrete implementations of Thread.
*/
virtual void thread(boost::shared_ptr<Thread> value) { thread_ = value; }
private:
boost::weak_ptr<Thread> thread_;
};
class Thread {
public:
#if USE_BOOST_THREAD
typedef boost::thread::id id_t;
static inline bool is_current(id_t t) { return t == boost::this_thread::get_id(); }
static inline id_t get_current() { return boost::this_thread::get_id(); }
#elif USE_STD_THREAD
typedef std::thread::id id_t;
static inline bool is_current(id_t t) { return t == std::this_thread::get_id(); }
static inline id_t get_current() { return std::this_thread::get_id(); }
#else
typedef pthread_t id_t; 定義執行緒標識id_t
物件公用,判斷傳入的執行緒id是否是當前執行緒,當前執行緒的id沒有作為成員存下來,而是pthread_self獲取
static inline bool is_current(id_t t) { return pthread_equal(pthread_self(), t); }
獲取當前執行緒id
static inline id_t get_current() { return pthread_self(); }
#endif
virtual ~Thread(){};
/**
* Starts the thread. Does platform specific thread creation and
* configuration then invokes the run method of the Runnable object bound
* to this thread.
*/
virtual void start() = 0; 啟動執行緒
/**
* Join this thread. Current thread blocks until this target thread
* completes.
*/
virtual void join() = 0; 阻塞當前執行緒直到目標執行緒結束
/**
* Gets the thread's platform-specific ID
*/
virtual id_t getId() = 0; 獲取特定平臺的執行緒id
/**
* Gets the runnable object this thread is hosting
*/返回runnable
virtual boost::shared_ptr<Runnable> runnable() const { return _runnable; }
protected:設定runnable,runnable中有thread,thread中有runnable
virtual void runnable(boost::shared_ptr<Runnable> value) { _runnable = value; }
private:
boost::shared_ptr<Runnable> _runnable;
};
3個factory都繼承自ThreadFactory
virtual ~ThreadFactory() {}
virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;
都實現了newThread函式
/** Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread
*/
static const Thread::id_t unknown_thread_id;
virtual Thread::id_t getCurrentThreadId() const = 0;
#if USE_BOOST_THREAD
typedef BoostThreadFactory PlatformThreadFactory;
#elif USE_STD_THREAD
typedef StdThreadFactory PlatformThreadFactory;
#else
typedef PosixThreadFactory PlatformThreadFactory;
#endif
class PosixThreadFactory :