
Thrift C++ Code Analysis

Generated C++ files; the ones nearer the top of the list are newer:
Calculator.cpp
Calculator.h
Calculator_server.skeleton.cpp
shared_constants.cpp
shared_constants.h
SharedService.cpp
SharedService.h
SharedService_server.skeleton.cpp
shared_types.cpp
shared_types.h
tutorial_constants.cpp
tutorial_constants.h
tutorial_types.cpp
tutorial_types.h

tutorial_types.h (from tutorial.thrift)
namespace tutorial {...}
The enum is generated first:
struct Operation {
  enum type {
    ADD = 1,
    SUBTRACT = 2,
    MULTIPLY = 3,
    DIVIDE = 4
  };
};

A global variable is generated, along with the typedef:
extern const std::map<int, const char*> _Operation_VALUES_TO_NAMES;
typedef int32_t MyInteger;

Each struct and exception defined in the .thrift file becomes a class; forward declarations are generated first:
class Work;
class InvalidOperation;

For every struct a _StructName__isset struct is generated; each member is a bool recording whether the corresponding field has been set:
typedef struct _Work__isset {
  _Work__isset() : num1(true), num2(false), op(false), comment(false) {}
  bool num1 :1;
  bool num2 :1;
  bool op :1;
  bool comment :1;
} _Work__isset;

Inside the class, in order: copy constructor, assignment operator, constructor, destructor,
data members, the _ClassName__isset member, a setter for each member named __set_<member>,
an overloaded == operator (the integer fields and the string must compare equal), an overloaded != operator,
a declaration of the overloaded < operator,
and declarations of the read and write member functions and of the virtual printTo function.

A swap function declaration: void swap(Work &a, Work &b);
and a stream-output function:
inline std::ostream& operator<<(std::ostream& out, const Work& obj)
{
  obj.printTo(out);
  return out;
}

class Work {
 public:

  Work(const Work&);
  Work& operator=(const Work&);
  Work() : num1(0), num2(0), op((Operation::type)0), comment() {
  }

  virtual ~Work() throw();
  int32_t num1;
  int32_t num2;
  Operation::type op;
  std::string comment;

  _Work__isset __isset; // used when reading the struct

  void __set_num1(const int32_t val);

  void __set_num2(const int32_t val);

  void __set_op(const Operation::type val);

  void __set_comment(const std::string& val);

  bool operator == (const Work & rhs) const
  {
    if (!(num1 == rhs.num1))
      return false;
    if (!(num2 == rhs.num2))
      return false;
    if (!(op == rhs.op))
      return false;
    if (__isset.comment != rhs.__isset.comment)
      return false;
    else if (__isset.comment && !(comment == rhs.comment))
      return false;
    return true;
  }
  bool operator != (const Work &rhs) const {
    return !(*this == rhs);
  }

  bool operator < (const Work & ) const;

  uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
  uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;

  virtual void printTo(std::ostream& out) const;
};

InvalidOperation is similar.
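As a rough sketch (assuming the tutorial.thrift fields whatOp and why), the generated declaration looks something like this:
class InvalidOperation : public ::apache::thrift::TException {
 public:

  InvalidOperation(const InvalidOperation&);
  InvalidOperation& operator=(const InvalidOperation&);
  InvalidOperation() : whatOp(0), why() {
  }

  virtual ~InvalidOperation() throw();
  int32_t whatOp;
  std::string why;

  _InvalidOperation__isset __isset;

  void __set_whatOp(const int32_t val);
  void __set_why(const std::string& val);

  bool operator == (const InvalidOperation & rhs) const;
  bool operator != (const InvalidOperation &rhs) const;
  bool operator < (const InvalidOperation & ) const;

  uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
  uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;

  virtual void printTo(std::ostream& out) const;
  mutable std::string thriftTExceptionMessageHolder_;
  const char* what() const throw();
};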

tutorial_types.cpp
namespace tutorial { ... }
Generated from the enum:
int _kOperationValues[] = {
  Operation::ADD,
  Operation::SUBTRACT,
  Operation::MULTIPLY,
  Operation::DIVIDE
};
const char* _kOperationNames[] = {
  "ADD",
  "SUBTRACT",
  "MULTIPLY",
  "DIVIDE"
};

Definition of the global variable; the std::map range constructor used here is:
map (InputIterator first, InputIterator last,
       const key_compare& comp = key_compare(),
       const allocator_type& = allocator_type());
const std::map<int, const char*> _Operation_VALUES_TO_NAMES(
    ::apache::thrift::TEnumIterator(4, _kOperationValues, _kOperationNames),
    ::apache::thrift::TEnumIterator(-1, NULL, NULL));
namespace apache {
namespace thrift {
 
class TEnumIterator
    : public std::iterator<std::forward_iterator_tag, std::pair<int, const char*> > {
public:
  TEnumIterator(int n, int* enums, const char** names)
    : ii_(0), n_(n), enums_(enums), names_(names) {}
 
  int operator++() { return ++ii_; }

  bool operator!=(const TEnumIterator& end) {
    THRIFT_UNUSED_VARIABLE(end);
    assert(end.n_ == -1);
    return (ii_ != n_);
  }
 
  std::pair<int, const char*> operator*() const { return std::make_pair(enums_[ii_], names_[ii_]); }
 
private:
  int ii_;             // current index into the arrays
  const int n_;        // total array length
  int* enums_;         // keys
  const char** names_; // values
};

Work::~Work() throw() {
} // throw() means the destructor must not throw exceptions

void Work::__set_num1(const int32_t val) {
  this->num1 = val;
}

void Work::__set_num2(const int32_t val) {
  this->num2 = val;
}

void Work::__set_op(const Operation::type val) {
  this->op = val;
}

void Work::__set_comment(const std::string& val) {
  this->comment = val;
  __isset.comment = true;
}

enum TType {
  T_STOP       = 0,
  T_VOID       = 1,
  T_BOOL       = 2,
  T_BYTE       = 3,
  T_I08        = 3,
  T_I16        = 6,
  T_I32        = 8,
  T_U64        = 9,
  T_I64        = 10,
  T_DOUBLE     = 4,
  T_STRING     = 11,
  T_UTF7       = 11,
  T_STRUCT     = 12,
  T_MAP        = 13,
  T_SET        = 14,
  T_LIST       = 15,
  T_UTF8       = 16,
  T_UTF16      = 17
}
These are the types supported by the C++ implementation.

struct TInputRecursionTracker {
  TProtocol &prot_;
  TInputRecursionTracker(TProtocol &prot) : prot_(prot) {
    prot_.incrementInputRecursionDepth();
  }
  ~TInputRecursionTracker() {
    prot_.decrementInputRecursionDepth();
  }
};

TBinaryProtocol extends TProtocol
uint32_t Work::read(::apache::thrift::protocol::TProtocol* iprot) {

  apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
  uint32_t xfer = 0;
  std::string fname;
  ::apache::thrift::protocol::TType ftype;
  int16_t fid;

  xfer += iprot->readStructBegin(fname);
  TBinaryProtocol.tcc
    template <class Transport_, class ByteOrder_>
    uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::readStructBegin(std::string& name) {
      name = ""; 沒有實際操作
      return 0;
    }

  using ::apache::thrift::protocol::TProtocolException;

class TProtocolException : public apache::thrift::TException {
public:
  /**
   * Error codes for the various types of exceptions.
   */
  enum TProtocolExceptionType {
    UNKNOWN = 0,
    INVALID_DATA = 1,
    NEGATIVE_SIZE = 2,
    SIZE_LIMIT = 3,
    BAD_VERSION = 4,
    NOT_IMPLEMENTED = 5,
    DEPTH_LIMIT = 6
  };

  TProtocolException() : apache::thrift::TException(), type_(UNKNOWN) {}

  TProtocolException(TProtocolExceptionType type) : apache::thrift::TException(), type_(type) {}

  TProtocolException(const std::string& message)
    : apache::thrift::TException(message), type_(UNKNOWN) {}

  TProtocolException(TProtocolExceptionType type, const std::string& message)
    : apache::thrift::TException(message), type_(type) {}

  virtual ~TProtocolException() throw() {}

  /**
   * Returns an error code that provides information about the type of error
   * that has occurred.
   *
   * @return Error code
   */
  TProtocolExceptionType getType() { return type_; }

  virtual const char* what() const throw() {
    if (message_.empty()) {
      switch (type_) {
      case UNKNOWN:
        return "TProtocolException: Unknown protocol exception";
      case INVALID_DATA:
        return "TProtocolException: Invalid data";
      case NEGATIVE_SIZE:
        return "TProtocolException: Negative size";
      case SIZE_LIMIT:
        return "TProtocolException: Exceeded size limit";
      case BAD_VERSION:
        return "TProtocolException: Invalid version";
      case NOT_IMPLEMENTED:
        return "TProtocolException: Not implemented";
      default:
        return "TProtocolException: (Invalid exception type)";
      }
    } else {
      return message_.c_str();
    }
  }

protected:
  /**
   * Error code
   */
  TProtocolExceptionType type_;
};
}
}
} // apache::thrift::protocol

  while (true)
  {
    xfer += iprot->readFieldBegin(fname, ftype, fid);
    // 1 byte for the field type, 2 bytes for the field id; if the type is T_STOP, fieldId = 0
    if (ftype == ::apache::thrift::protocol::T_STOP) {
      break;
    }
    switch (fid)
    { // read the 4 fields of Work
      case 1:
        if (ftype == ::apache::thrift::protocol::T_I32) {
          xfer += iprot->readI32(this->num1);
          this->__isset.num1 = true;
        } else {
          xfer += iprot->skip(ftype);
        }
        break;
      case 2:
        if (ftype == ::apache::thrift::protocol::T_I32) {
          xfer += iprot->readI32(this->num2);
          this->__isset.num2 = true;
        } else {
          xfer += iprot->skip(ftype);
        }
        break;
      case 3:
        if (ftype == ::apache::thrift::protocol::T_I32) {
          int32_t ecast0;
          xfer += iprot->readI32(ecast0);
          this->op = (Operation::type)ecast0;
          this->__isset.op = true;
        } else {
          xfer += iprot->skip(ftype);
        }
        break;
      case 4:
        if (ftype == ::apache::thrift::protocol::T_STRING) {
          xfer += iprot->readString(this->comment); // 4-byte length, then the body
          this->__isset.comment = true;
        } else {
          xfer += iprot->skip(ftype);
        }
        break;
      default:
        xfer += iprot->skip(ftype);
        break;
    }
    xfer += iprot->readFieldEnd(); // no-op
  }

  xfer += iprot->readStructEnd(); // no-op

  return xfer;
}
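For reference, the readFieldBegin used above is roughly the following sketch (based on TBinaryProtocol.tcc); it reads the 1-byte type, then the 2-byte field id only when the type is not T_STOP:
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::readFieldBegin(std::string& name,
                                                                  TType& fieldType,
                                                                  int16_t& fieldId) {
  (void)name; // the binary protocol does not transmit field names
  uint32_t result = 0;
  int8_t type;
  result += readByte(type);   // 1 byte: field type
  fieldType = (TType)type;
  if (fieldType == T_STOP) {
    fieldId = 0;              // the stop marker carries no field id
    return result;
  }
  result += readI16(fieldId); // 2 bytes: field id
  return result;
}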

struct TOutputRecursionTracker {
  TProtocol &prot_;
  TOutputRecursionTracker(TProtocol &prot) : prot_(prot) {
    prot_.incrementOutputRecursionDepth();
  }
  ~TOutputRecursionTracker() {
    prot_.decrementOutputRecursionDepth();
  }
};

uint32_t Work::write(::apache::thrift::protocol::TProtocol* oprot) const {
  uint32_t xfer = 0; // write out the Work object
  apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
  xfer += oprot->writeStructBegin("Work");

  xfer += oprot->writeFieldBegin("num1", ::apache::thrift::protocol::T_I32, 1);
  xfer += oprot->writeI32(this->num1);
  xfer += oprot->writeFieldEnd();

  xfer += oprot->writeFieldBegin("num2", ::apache::thrift::protocol::T_I32, 2);
  xfer += oprot->writeI32(this->num2);
  xfer += oprot->writeFieldEnd();

  xfer += oprot->writeFieldBegin("op", ::apache::thrift::protocol::T_I32, 3);
  xfer += oprot->writeI32((int32_t)this->op);
  xfer += oprot->writeFieldEnd();

  if (this->__isset.comment) {
    xfer += oprot->writeFieldBegin("comment", ::apache::thrift::protocol::T_STRING, 4);
    xfer += oprot->writeString(this->comment);
    xfer += oprot->writeFieldEnd();
  }
  xfer += oprot->writeFieldStop();
  xfer += oprot->writeStructEnd();
  return xfer;
}

void swap(Work &a, Work &b) {
  using ::std::swap; // the standard library swap
  swap(a.num1, b.num1);
  swap(a.num2, b.num2);
  swap(a.op, b.op);
  swap(a.comment, b.comment);
  swap(a.__isset, b.__isset);
}

Work::Work(const Work& other1) {
  num1 = other1.num1; // copy constructor
  num2 = other1.num2;
  op = other1.op;
  comment = other1.comment;
  __isset = other1.__isset;
}
Work& Work::operator=(const Work& other2) {
  num1 = other2.num1; // assignment
  num2 = other2.num2;
  op = other2.op;
  comment = other2.comment;
  __isset = other2.__isset;
  return *this;
}
void Work::printTo(std::ostream& out) const {
  using ::apache::thrift::to_string; // TToString.h: boost::lexical_cast<std::string>(t)
  out << "Work(";
  out << "num1=" << to_string(num1);
  out << ", " << "num2=" << to_string(num2);
  out << ", " << "op=" << to_string(op);
  out << ", " << "comment="; (__isset.comment ? (out << to_string(comment)) : (out << "<null>"));
  out << ")";
}    

Implementations of InvalidOperation's methods follow, in order: destructor, setters, read, write, swap, copy constructor, assignment and printTo.
Because the thrift file declares InvalidOperation with the exception keyword, InvalidOperation inherits from TException.
TException is defined in Thrift.h and has a virtual function virtual const char* what() const throw(), so the following are also generated:
mutable std::string thriftTExceptionMessageHolder_;
const char* what() const throw();
Its implementation:
const char* InvalidOperation::what() const throw() {
  try {
    std::stringstream ss;
    ss << "TException - service has thrown: " << *this;
    this->thriftTExceptionMessageHolder_ = ss.str();
    return this->thriftTExceptionMessageHolder_.c_str();
  } catch (const std::exception&) {
    return "TException - service has thrown: InvalidOperation";
  }
}

_InvalidOperation__isset is used during read.

tutorial_constants.h
#include "tutorial_types.h"

namespace tutorial {

class tutorialConstants {
 public:
  tutorialConstants();

  int32_t INT32CONSTANT;
  std::map<std::string, std::string>  MAPCONSTANT;
};

extern const tutorialConstants g_tutorial_constants;

} // namespace

tutorial_constants.cpp
Defines g_tutorial_constants and its default constructor:

tutorialConstants::tutorialConstants() {
  INT32CONSTANT = 9853;

  MAPCONSTANT.insert(std::make_pair("hello", "world"));
  MAPCONSTANT.insert(std::make_pair("goodnight", "moon"));

}

shared_types.h / shared_types.cpp
Define SharedStruct:
copy constructor, assignment operator, constructor, destructor, the fields key and value, _SharedStruct__isset __isset,
setters, the ==, != and < operators, the read and write functions, and the virtual printTo function.
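A sketch of the generated declaration, assuming shared.thrift declares key as an i32 and value as a string:
class SharedStruct {
 public:

  SharedStruct(const SharedStruct&);
  SharedStruct& operator=(const SharedStruct&);
  SharedStruct() : key(0), value() {
  }

  virtual ~SharedStruct() throw();
  int32_t key;
  std::string value;

  _SharedStruct__isset __isset;

  void __set_key(const int32_t val);
  void __set_value(const std::string& val);

  bool operator == (const SharedStruct & rhs) const;
  bool operator != (const SharedStruct &rhs) const;
  bool operator < (const SharedStruct & ) const;

  uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
  uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;

  virtual void printTo(std::ostream& out) const;
};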

shared_constants.cpp
shared_constants.h
namespace shared {

class sharedConstants {
 public:
  sharedConstants(); // empty implementation

};
extern const sharedConstants g_shared_constants; // defined in the .cpp file
} // namespace


SharedService_server.skeleton.cpp
#include "SharedService.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using boost::shared_ptr;

using namespace  ::shared;
SharedServiceHandler virtually inherits from SharedServiceIf; virtual inheritance mainly avoids ambiguity under multiple inheritance.
class SharedServiceHandler : virtual public SharedServiceIf {
 public:
  SharedServiceHandler() {
    // Your initialization goes here
  }

  void getStruct(SharedStruct& _return, const int32_t key) {
    // Your implementation goes here
    printf("getStruct\n");
  }

};

int main(int argc, char **argv) {
  int port = 9090;
  // create the handler, which is effectively creating a SharedServiceIf
  shared_ptr<SharedServiceHandler> handler(new SharedServiceHandler());
  // create the processor from the handler
  shared_ptr<TProcessor> processor(new SharedServiceProcessor(handler));

  // create the server socket and wrap it as the server transport
  shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
  // create the transport factory
  shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
  // create the protocol factory
  shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
  // create the server from the processor, server transport, transport factory and protocol factory
  TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
  // call the server's serve method
  server.serve();
  return 0;
}
 
SharedService.h
The abstract class SharedServiceIf provides the method
virtual void getStruct(SharedStruct& _return, const int32_t key) = 0;

The abstract class SharedServiceIfFactory provides
virtual SharedServiceIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) = 0;
virtual void releaseHandler(SharedServiceIf* /* handler */) = 0;
SharedServiceIf is also referred to as the Handler.
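A sketch of these two generated declarations:
class SharedServiceIf {
 public:
  virtual ~SharedServiceIf() {}
  virtual void getStruct(SharedStruct& _return, const int32_t key) = 0;
};

class SharedServiceIfFactory {
 public:
  typedef SharedServiceIf Handler;

  virtual ~SharedServiceIfFactory() {}

  virtual SharedServiceIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) = 0;
  virtual void releaseHandler(SharedServiceIf* /* handler */) = 0;
};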

SharedServiceIfSingletonFactory is the provided implementation of SharedServiceIfFactory:
class SharedServiceIfSingletonFactory : virtual public SharedServiceIfFactory {
 public:
  SharedServiceIfSingletonFactory(const boost::shared_ptr<SharedServiceIf>& iface) : iface_(iface) {}
  virtual ~SharedServiceIfSingletonFactory() {}

  virtual SharedServiceIf* getHandler(const ::apache::thrift::TConnectionInfo&) {
    return iface_.get();
  }
  virtual void releaseHandler(SharedServiceIf* /* handler */) {}

 protected:
  boost::shared_ptr<SharedServiceIf> iface_;
};

SharedServiceNull is one implementation of SharedServiceIf:
class SharedServiceNull : virtual public SharedServiceIf {
 public:
  virtual ~SharedServiceNull() {}
  void getStruct(SharedStruct& /* _return */, const int32_t /* key */) {
    return;
  }
};

The SharedService_getStruct_args class:
copy constructor, assignment operator, constructor, destructor, the field key, _SharedService_getStruct_args__isset,
setters, the ==, != and < operators, and read/write functions (unlike the classes in the types.h files, no virtual printTo); a sketch follows.
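A rough sketch of the generated declaration, assuming the service signature getStruct(1: i32 key):
typedef struct _SharedService_getStruct_args__isset {
  _SharedService_getStruct_args__isset() : key(false) {}
  bool key :1;
} _SharedService_getStruct_args__isset;

class SharedService_getStruct_args {
 public:

  SharedService_getStruct_args(const SharedService_getStruct_args&);
  SharedService_getStruct_args& operator=(const SharedService_getStruct_args&);
  SharedService_getStruct_args() : key(0) {
  }

  virtual ~SharedService_getStruct_args() throw();
  int32_t key;

  _SharedService_getStruct_args__isset __isset;

  void __set_key(const int32_t val);

  bool operator == (const SharedService_getStruct_args & rhs) const;
  bool operator != (const SharedService_getStruct_args &rhs) const;
  bool operator < (const SharedService_getStruct_args & ) const;

  uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
  uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
};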
The SharedService_getStruct_pargs class: destructor, the field key (a pointer) and a write function:
class SharedService_getStruct_pargs {
 public:
  virtual ~SharedService_getStruct_pargs() throw();
  const int32_t* key;
  uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
};

The SharedService_getStruct_result class
The SharedService_getStruct_presult class
Because the return value is a SharedStruct, an extra _SharedService_getStruct_presult__isset is generated; struct results are treated specially.
Destructor, the success field, its isset flag, and a read function:
uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
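A sketch of what the two generated classes look like (comparison operators and setters omitted here):
class SharedService_getStruct_result {
 public:
  SharedService_getStruct_result() {}
  virtual ~SharedService_getStruct_result() throw();
  SharedStruct success;

  _SharedService_getStruct_result__isset __isset;

  uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
  uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
};

class SharedService_getStruct_presult {
 public:
  virtual ~SharedService_getStruct_presult() throw();
  SharedStruct* success; // points at the caller's _return and is filled in by read

  _SharedService_getStruct_presult__isset __isset;

  uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
};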

SharedServiceClient is another implementation of SharedServiceIf:
class SharedServiceClient : virtual public SharedServiceIf {
 public: // two constructors: one takes a single TProtocol, the other takes separate input and output TProtocols
 private: // two setter functions
 public:
 boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol() {
    return piprot_;
  }
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() {
    return poprot_;
  }
  void getStruct(SharedStruct& _return, const int32_t key);
  void send_getStruct(const int32_t key);
  void recv_getStruct(SharedStruct& _return);
 protected: // four fields: two shared_ptr wrappers and the two raw pointers they wrap
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;
  ::apache::thrift::protocol::TProtocol* iprot_;
  ::apache::thrift::protocol::TProtocol* oprot_;
};

SharedServiceProcessor inherits from TDispatchProcessor (not virtual inheritance):
class SharedServiceProcessor : public ::apache::thrift::TDispatchProcessor {
 protected: // the protected member iface_ points to the SharedServiceIf
  boost::shared_ptr<SharedServiceIf> iface_;
  // the protected method dispatchCall takes two protocols, the function name fname, a 32-bit seqid and a void* callContext
  virtual bool dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext);
 private:
  // ProcessFunction: a pointer to a SharedServiceProcessor member function taking an i32, two protocols and a void*, returning void
  typedef  void (SharedServiceProcessor::*ProcessFunction)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*, void*);
  // ProcessMap: key is a string (the method name), value is a ProcessFunction
  typedef std::map<std::string, ProcessFunction> ProcessMap;
  // the processMap_ member
  ProcessMap processMap_;
  // the method that does the real work; its signature matches ProcessFunction
  void process_getStruct(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
 public: // public constructor and destructor
  SharedServiceProcessor(boost::shared_ptr<SharedServiceIf> iface) :
    iface_(iface) {
    processMap_["getStruct"] = &SharedServiceProcessor::process_getStruct;
  }

  virtual ~SharedServiceProcessor() {}
};


SharedServiceProcessorFactory inherits from TProcessorFactory:
class SharedServiceProcessorFactory : public ::apache::thrift::TProcessorFactory {
 public: // constructed from a SharedServiceIfFactory
  SharedServiceProcessorFactory(const ::boost::shared_ptr< SharedServiceIfFactory >& handlerFactory) :
      handlerFactory_(handlerFactory) {}

  // getProcessor turns a TConnectionInfo into a TProcessor (concretely a SharedServiceProcessor, which is a TDispatchProcessor)
  ::boost::shared_ptr< ::apache::thrift::TProcessor > getProcessor(const ::apache::thrift::TConnectionInfo& connInfo);

 protected: // the processor factory needs a SharedServiceIfFactory member, just as the processor needs a SharedServiceIf member
  ::boost::shared_ptr< SharedServiceIfFactory > handlerFactory_;
};

SharedServiceMultiface is the third implementation of SharedServiceIf:
class SharedServiceMultiface : virtual public SharedServiceIf {
 public:
  SharedServiceMultiface(std::vector<boost::shared_ptr<SharedServiceIf> >& ifaces) : ifaces_(ifaces) {
  }
  virtual ~SharedServiceMultiface() {}
 protected: // holds multiple SharedServiceIf instances
  std::vector<boost::shared_ptr<SharedServiceIf> > ifaces_;
  SharedServiceMultiface() {} // plus an extra add method
  void add(boost::shared_ptr<SharedServiceIf> iface) {
    ifaces_.push_back(iface);
  }
 public: // call getStruct on every SharedServiceIf in ifaces_; the last call fills _return
  void getStruct(SharedStruct& _return, const int32_t key) {
    size_t sz = ifaces_.size();
    size_t i = 0;
    for (; i < (sz - 1); ++i) {
      ifaces_[i]->getStruct(_return, key);
    }
    ifaces_[i]->getStruct(_return, key);
    return;
  }

};

The fourth implementation of SharedServiceIf is the thread-safe client used to share one connection among multiple threads:
// The 'concurrent' client is a thread safe client that correctly handles
// out of order responses.  It is slower than the regular client, so should
// only be used when you need to share a connection among multiple threads
class SharedServiceConcurrentClient : virtual public SharedServiceIf {
 public: // constructors that set up the TProtocol(s)
 private: // two setter functions
 public:
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol()
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol()
  void getStruct(SharedStruct& _return, const int32_t key);
  int32_t send_getStruct(const int32_t key);
  void recv_getStruct(SharedStruct& _return, const int32_t seqid);
 protected:
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;
  ::apache::thrift::protocol::TProtocol* iprot_;
  ::apache::thrift::protocol::TProtocol* oprot_;
  ::apache::thrift::async::TConcurrentClientSyncInfo sync_; // the one extra member compared with SharedServiceClient
};


SharedService.cpp
The SharedService_getStruct_args destructor, read (reads into this->key, by reference) and write; the field name key (not actually transmitted), type i32, field id 1.
The SharedService_getStruct_pargs destructor and write; writes field name key, type i32.
The SharedService_getStruct_result destructor, read (this->success.read, i.e. SharedStruct's read, which fills this->success)
and write; writes field name success, its type and field id (here the id is 0; thrift field ids normally start at 1), and calls SharedStruct's write.
The SharedService_getStruct_presult destructor and read, the same as the read above.
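As an illustration, a sketch of the pargs write (the args/result versions follow the same pattern):
uint32_t SharedService_getStruct_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const {
  uint32_t xfer = 0;
  apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
  xfer += oprot->writeStructBegin("SharedService_getStruct_pargs");

  xfer += oprot->writeFieldBegin("key", ::apache::thrift::protocol::T_I32, 1);
  xfer += oprot->writeI32((*(this->key))); // key is a pointer to the caller's argument
  xfer += oprot->writeFieldEnd();

  xfer += oprot->writeFieldStop();
  xfer += oprot->writeStructEnd();
  return xfer;
}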

SharedServiceClient::getStruct(SharedStruct& _return, const int32_t key)
    send_getStruct(key);
    recv_getStruct(_return);
    
typedef enum {
  T_CALL      = 1,
  T_REPLY     = 2,
  T_EXCEPTION = 3,
  T_ONEWAY    = 4
} ThriftMessageType;    
SharedServiceClient::send_getStruct(const int32_t key)
{
  int32_t cseqid = 0;
  oprot_->writeMessageBegin("getStruct", ::apache::thrift::protocol::T_CALL, cseqid);
    // 32 bits: VERSION_1 | T_CALL (VERSION_1 = 0x80010000)
    // string: "getStruct"
    // 32 bits: seqid
  SharedService_getStruct_pargs args;
  args.key = &key;
  args.write(oprot_); // calls SharedService_getStruct_pargs::write shown above

  oprot_->writeMessageEnd(); // no-op
  oprot_->getTransport()->writeEnd();
  oprot_->getTransport()->flush();
}

template <class Transport_, class ByteOrder_ = TNetworkBigEndian>
class TBinaryProtocolT : public TVirtualProtocol<TBinaryProtocolT<Transport_, ByteOrder_> > {
struct TNetworkBigEndian
{
  static uint16_t toWire16(uint16_t x)   {return htons(x);}
  static uint32_t toWire32(uint32_t x)   {return htonl(x);}
  static uint64_t toWire64(uint64_t x)   {return THRIFT_htonll(x);}
  static uint16_t fromWire16(uint16_t x) {return ntohs(x);}
  static uint32_t fromWire32(uint32_t x) {return ntohl(x);}
  static uint64_t fromWire64(uint64_t x) {return THRIFT_ntohll(x);}
}
TBinaryProtocolT::writeI32
    this->trans_->write((uint8_t*)&net, 4);
    TTransport::write---->write_virt(buf, len)---->TVirtualTransport::write_virt
        ---->static_cast<Transport_*>(this)->write(buf, len)
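A sketch of TBinaryProtocolT::writeI32 showing where that trans_->write call comes from:
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeI32(const int32_t i32) {
  int32_t net = (int32_t)ByteOrder_::toWire32(i32); // htonl for the default TNetworkBigEndian
  this->trans_->write((uint8_t*)&net, 4);
  return 4;
}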
    
    Inheritance hierarchy:
    TBufferedTransport---->TVirtualTransport<TBufferedTransport, TBufferBase>---->TBufferBase
    ---->TVirtualTransport<TBufferBase, TTransportDefaults>---->TTransportDefaults---->TTransport

    TBufferedTransport has no write method of its own; the most derived write it ends up calling is TBufferBase::write.
    That writes into the buffer first and, once the buffer is full, calls writeSlow; TBufferedTransport implements writeSlow,
    which calls the underlying transport_'s write method.

    TVirtualTransport<class Transport_, class Super_ = TTransportDefaults>::read_virt calls the read method
    of Transport_.
    TSocket---->TSocket : public TVirtualTransport<TSocket>---->TTransportDefaults---->TTransport
    
recv_getStruct reads back the returned result.
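A sketch of the generated receive path (error handling condensed):
void SharedServiceClient::recv_getStruct(SharedStruct& _return) {
  int32_t rseqid = 0;
  std::string fname;
  ::apache::thrift::protocol::TMessageType mtype;

  iprot_->readMessageBegin(fname, mtype, rseqid);
  if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
    ::apache::thrift::TApplicationException x;
    x.read(iprot_);
    iprot_->readMessageEnd();
    iprot_->getTransport()->readEnd();
    throw x;
  }
  // (checks that mtype == T_REPLY and fname == "getStruct" are omitted in this sketch)

  SharedService_getStruct_presult result;
  result.success = &_return; // presult's success points directly at the caller's _return
  result.read(iprot_);
  iprot_->readMessageEnd();
  iprot_->getTransport()->readEnd();

  if (result.__isset.success) {
    // _return has already been filled in by result.read
    return;
  }
  throw ::apache::thrift::TApplicationException(
      ::apache::thrift::TApplicationException::MISSING_RESULT,
      "getStruct failed: unknown result");
}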

bool SharedServiceProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot,
::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext)
looks up the handler registered for fname in processMap_ and invokes it; if it is not found, a TApplicationException is written back to the client.
(this->*(pfn->second))(seqid, iprot, oprot, callContext);
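A sketch of the generated dispatchCall:
bool SharedServiceProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot,
                                          ::apache::thrift::protocol::TProtocol* oprot,
                                          const std::string& fname,
                                          int32_t seqid,
                                          void* callContext) {
  ProcessMap::iterator pfn;
  pfn = processMap_.find(fname);
  if (pfn == processMap_.end()) {
    // unknown method: skip the arguments and send a TApplicationException back
    iprot->skip(::apache::thrift::protocol::T_STRUCT);
    iprot->readMessageEnd();
    iprot->getTransport()->readEnd();
    ::apache::thrift::TApplicationException x(
        ::apache::thrift::TApplicationException::UNKNOWN_METHOD,
        "Invalid method name: '" + fname + "'");
    oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);
    x.write(oprot);
    oprot->writeMessageEnd();
    oprot->getTransport()->writeEnd();
    oprot->getTransport()->flush();
    return true;
  }
  // found: call the matching process_XXX member function
  (this->*(pfn->second))(seqid, iprot, oprot, callContext);
  return true;
}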

void SharedServiceProcessor::process_getStruct(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot,
::apache::thrift::protocol::TProtocol* oprot, void* callContext)
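A simplified sketch of process_getStruct (the eventHandler_ hooks present in the real generated code are omitted):
void SharedServiceProcessor::process_getStruct(int32_t seqid,
                                               ::apache::thrift::protocol::TProtocol* iprot,
                                               ::apache::thrift::protocol::TProtocol* oprot,
                                               void* callContext) {
  (void)callContext;

  SharedService_getStruct_args args;
  args.read(iprot);                                // deserialize the arguments
  iprot->readMessageEnd();
  iprot->getTransport()->readEnd();

  SharedService_getStruct_result result;
  try {
    iface_->getStruct(result.success, args.key);   // call the user handler
    result.__isset.success = true;
  } catch (const std::exception& e) {
    ::apache::thrift::TApplicationException x(e.what());
    oprot->writeMessageBegin("getStruct", ::apache::thrift::protocol::T_EXCEPTION, seqid);
    x.write(oprot);
    oprot->writeMessageEnd();
    oprot->getTransport()->writeEnd();
    oprot->getTransport()->flush();
    return;
  }

  oprot->writeMessageBegin("getStruct", ::apache::thrift::protocol::T_REPLY, seqid);
  result.write(oprot);                             // serialize the reply
  oprot->writeMessageEnd();
  oprot->getTransport()->writeEnd();
  oprot->getTransport()->flush();
}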

SharedServiceProcessor---->TDispatchProcessor---->TProcessor(boost::shared_ptr<TProcessorEventHandler> eventHandler_;)

TProcessorEventHandler

Calculator_server.skeleton.cpp
class CalculatorHandler : virtual public CalculatorIf
with trivial implementations of the ping, add, calculate and zip methods,
plus a main function

Calculator.h
Calculator.cpp


CppClient.cpp
boost::shared_ptr<TTransport> socket(new TSocket("localhost", 9090));
boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
CalculatorClient client(protocol);

transport->open()---->transport_->open()----> opens the socket
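Roughly what the tutorial client then does with these objects (a sketch; the real CppClient.cpp exercises more Calculator methods):
try {
  transport->open();                  // opens the underlying TSocket

  client.ping();                      // CalculatorClient::ping -> send_ping + recv_ping

  int32_t sum = client.add(1, 1);
  cout << "1 + 1 = " << sum << endl;

  Work work;
  work.op = Operation::DIVIDE;
  work.num1 = 1;
  work.num2 = 0;
  try {
    client.calculate(1, work);        // the server throws InvalidOperation for divide-by-zero
  } catch (InvalidOperation& io) {
    cout << "InvalidOperation: " << io.why << endl;
  }

  transport->close();
} catch (TException& tx) {
  cout << "ERROR: " << tx.what() << endl;
}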


CppServer.cpp
class CalculatorHandler : public CalculatorIf
protected:
  map<int32_t, SharedStruct> log;
implements the ping, add, calculate, getStruct and zip methods
calculate puts an entry into log; the entry is allocated on the stack, so the map copies the object when it is inserted (a sketch follows)
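A sketch of that calculate implementation (close to the tutorial's CppServer.cpp, slightly condensed):
int32_t calculate(const int32_t logid, const Work& work) {
  int32_t val;

  switch (work.op) {
  case Operation::ADD:
    val = work.num1 + work.num2;
    break;
  case Operation::SUBTRACT:
    val = work.num1 - work.num2;
    break;
  case Operation::MULTIPLY:
    val = work.num1 * work.num2;
    break;
  case Operation::DIVIDE:
    if (work.num2 == 0) {
      InvalidOperation io;
      io.whatOp = work.op;
      io.why = "Cannot divide by 0";
      throw io;
    }
    val = work.num1 / work.num2;
    break;
  default:
    {
      InvalidOperation io;
      io.whatOp = work.op;
      io.why = "Invalid Operation";
      throw io;
    }
  }

  SharedStruct ss;            // allocated on the stack
  ss.key = logid;
  ss.value = to_string(val);  // assuming apache::thrift::to_string is in scope
  log[logid] = ss;            // operator[] copies the SharedStruct into the map

  return val;
}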

class CalculatorCloneFactory : virtual public CalculatorIfFactory {
  virtual CalculatorIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo)
  {
    boost::shared_ptr<TSocket> sock = boost::dynamic_pointer_cast<TSocket>(connInfo.transport);
    // obtain the TSocket from connInfo.transport
    cout << "Incoming connection\n";
    cout << "\tSocketInfo: "  << sock->getSocketInfo() << "\n";
    cout << "\tPeerHost: "    << sock->getPeerHost() << "\n";
    cout << "\tPeerAddress: " << sock->getPeerAddress() << "\n";
    cout << "\tPeerPort: "    << sock->getPeerPort() << "\n";
    return new CalculatorHandler; // create a CalculatorHandler object
  }
  virtual void releaseHandler( ::shared::SharedServiceIf* handler) {
    delete handler; // delete the handler that was passed in
  }
};

boost::make_shared creates an object and returns a shared_ptr to it.
TThreadedServer server(
    boost::make_shared<CalculatorProcessorFactory>(boost::make_shared<CalculatorCloneFactory>()),
        CalculatorCloneFactory inherits from CalculatorIfFactory, and CalculatorProcessorFactory has the field
        ::boost::shared_ptr< CalculatorIfFactory > handlerFactory_;
        so the processor factory is initialized with the CalculatorCloneFactory object
    boost::make_shared<TServerSocket>(9090), //port
        TServerSocket---->TServerTransport
            socket_func_t = ::std::tr1::function<void(THRIFT_SOCKET fd)>, THRIFT_SOCKET = int
            const static int DEFAULT_BACKLOG = 1024;
    boost::make_shared<TBufferedTransportFactory>(),
        TBufferedTransportFactory---->TTransportFactory
            getTransport(trans) ----> new TBufferedTransport(trans)
    boost::make_shared<TBinaryProtocolFactory>());
        typedef TBinaryProtocolFactoryT<TTransport> TBinaryProtocolFactory;
            TBinaryProtocolFactoryT---->TProtocolFactory
                Transport_ = TTransport, ByteOrder_ = TNetworkBigEndian
                getProtocol(boost::shared_ptr<TTransport> trans)
                    new TBinaryProtocolT<Transport_, ByteOrder_>(
                        specific_trans,
                        string_limit_,
                        container_limit_,
                        strict_read_,
                        strict_write_);
    boost::shared_ptr is a smart pointer.
    There is also one parameter with a default value:
    const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
      = boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
          new apache::thrift::concurrency::PlatformThreadFactory)
        i.e. threadFactory defaults to a pointer to a new PlatformThreadFactory object

TThreadedServer---->TServerFramework---->TServer---->Runnable
boost::shared_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory_;
apache::thrift::concurrency::Monitor clientsMonitor_;
Fields inherited from TServer:
boost::shared_ptr<TProcessorFactory> processorFactory_;
boost::shared_ptr<TServerTransport> serverTransport_;
boost::shared_ptr<TTransportFactory> inputTransportFactory_;
boost::shared_ptr<TTransportFactory> outputTransportFactory_;
boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
boost::shared_ptr<TServerEventHandler> eventHandler_;

TServerFramework(processorFactory, serverTransport, transportFactory, protocolFactory)
    TServer(processorFactory, serverTransport, transportFactory, protocolFactory), // assigns the fields listed above
    clients_(0),
    hwm_(0), // high-water mark of the concurrent client count
    limit_(INT64_MAX)
threadFactory_(threadFactory)

server.serve();    
void TThreadedServer::serve() {
  TServerFramework::serve();

  // Drain all clients - no more will arrive
  try {
    Synchronized s(clientsMonitor_); // a Synchronized is built from the monitor; it holds a Guard that locks on construction and unlocks on destruction
    while (getConcurrentClientCount() > 0) {
    // getConcurrentClientCount itself does Synchronized sync(mon_); mon_ is private to TServerFramework
      clientsMonitor_.wait(); // wait for clientsMonitor_ to be notified
    }
    }
  } catch (TException& tx) {
    string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
    GlobalOutput(errStr.c_str());
  }
}        

void TServerFramework::serve() {
  shared_ptr<TTransport> client;
  shared_ptr<TTransport> inputTransport;
  shared_ptr<TTransport> outputTransport;
  shared_ptr<TProtocol> inputProtocol;
  shared_ptr<TProtocol> outputProtocol;

  // Start the server listening
  serverTransport_->listen(); // listen on the port

  // Run the preServe event to indicate server is now listening
  // and that it is safe to connect.
  if (eventHandler_) { // not set by default
    eventHandler_->preServe();
  }

  // Fetch client from server
  for (;;) {
    try {
      // Dereference any resources from any previous client creation
      // such that a blocking accept does not hold them indefinitely.
      outputProtocol.reset();
      inputProtocol.reset();
      outputTransport.reset();
      inputTransport.reset();
      client.reset(); // clear the pointers

      // If we have reached the limit on the number of concurrent
      // clients allowed, wait for one or more clients to drain before
      // accepting another.
      {
        Synchronized sync(mon_);
        while (clients_ >= limit_) {
          mon_.wait(); // wait for mon_ to be notified
        }
      }
        // TServerTransport's accept ends up calling TServerSocket's acceptImpl
      client = serverTransport_->accept();
            TSocket::TSocket(THRIFT_SOCKET socket, boost::shared_ptr<THRIFT_SOCKET> interruptListener)
                listen() creates two socket pairs:
                    interruptSockWriter_ = sv[1];
                    interruptSockReader_ = sv[0];

                    childInterruptSockWriter_ = sv[1];
                    pChildInterruptSockReader_
                        = boost::shared_ptr<THRIFT_SOCKET>(new THRIFT_SOCKET(sv[0]), destroyer_of_fine_sockets);
                They expose a way to interrupt connections from the outside, e.g. TServerFramework::stop():
                    serverTransport_->interrupt(); serverTransport_->interruptChildren();
                    TServerSocket::interrupt() TServerSocket::interruptChildren()
                    TServerSocket::notify(THRIFT_SOCKET notifySocket)
                        writes a single 0 byte into the notification pipe; when the matching reader sees it,
                        it throws, and the caller catches the exception (for a client connection) or shuts the server down

            acceptImpl creates shared_ptr<TSocket> client = createSocket(clientSocket); clientSocket is an fd;
            it sets some parameters on the fd and returns the client

      build the transports (TBufferedTransport) from the client,
      and the protocols (TBinaryProtocolT) from the transports
      inputTransport = inputTransportFactory_->getTransport(client);
      outputTransport = outputTransportFactory_->getTransport(client);
      inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
      outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);

      newlyConnectedClient(shared_ptr<TConnectedClient>(
          new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
                               inputProtocol,
                               outputProtocol,
                               eventHandler_,
                               client),
          bind(&TServerFramework::disposeConnectedClient, this, _1)));
          
         The shared_ptr<TConnectedClient> is built from two arguments: the newly created object and a deleter.
         The deleter bind(&TServerFramework::disposeConnectedClient, this, _1) means that
         this->disposeConnectedClient(TConnectedClient* pClient)
         is called to destroy the TConnectedClient object:
            --clients_; if new client connections are allowed again, mon_ is notified
            onClientDisconnected(pClient) calls TThreadedServer::onClientDisconnected, which calls
            clientsMonitor_.notify() (TThreadedServer's monitor)

         The resulting pointer is then passed to the newlyConnectedClient method.
        
         new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
                               inputProtocol,
                               outputProtocol,
                               eventHandler_,
                               client)
            getProcessor is a member function of TServer, which TServerFramework inherits:
                boost::shared_ptr<TProcessor> getProcessor(boost::shared_ptr<TProtocol> inputProtocol,
                                             boost::shared_ptr<TProtocol> outputProtocol,
                                             boost::shared_ptr<TTransport> transport) {
                TConnectionInfo connInfo; // three fields
                connInfo.input = inputProtocol;
                connInfo.output = outputProtocol;
                connInfo.transport = transport; // can be cast to TSocket (the client created earlier) to obtain connection info
                return processorFactory_->getProcessor(connInfo);
              }
            processorFactory_ is the generated CalculatorProcessorFactory:
                ::boost::shared_ptr< ::apache::thrift::TProcessor > CalculatorProcessorFactory::getProcessor
                        (const ::apache::thrift::TConnectionInfo& connInfo) {
                  
                  handlerFactory_ is the CalculatorCloneFactory; a ReleaseHandler object named cleanup is created.
                  ReleaseHandler overloads operator():
                  void operator()(typename HandlerFactory_::Handler* handler) {
                    if (handler) {
                      handlerFactory_->releaseHandler(handler);
                    }
                  }
                  
                    virtual void releaseHandler( ::shared::SharedServiceIf* handler) {
                        delete handler;
                      }
                  CalculatorCloneFactory inherits from CalculatorIfFactory, and CalculatorIfFactory defines
                  typedef CalculatorIf Handler, so Handler is CalculatorIf, which in practice is the
                  CalculatorHandler implemented in CppServer.
                  
                  ::apache::thrift::ReleaseHandler< CalculatorIfFactory > cleanup(handlerFactory_);
                  
                  CalculatorCloneFactory's getHandler news a CalculatorHandler, and the deletion callback is supplied at the same time.
                  cleanup provides a way to delete the Handler object; it can be invoked as cleanup(handler), and since it is
                  not a function pointer, it has to overload the call operator to make that possible.
                  ::boost::shared_ptr< CalculatorIf > handler(handlerFactory_->getHandler(connInfo), cleanup);
                  
                  The CalculatorHandler just created is used to build a CalculatorProcessor object, which stores it in its iface_ member:
                  ::boost::shared_ptr< ::apache::thrift::TProcessor > processor(new CalculatorProcessor(handler));
                  return processor;
                }
                
                CalculatorProcessor---->SharedServiceProcessor---->TDispatchProcessor---->TProcessor (which has the eventHandler_ field)
                the processor reads the arguments, calls the CalculatorHandler's method, and builds the reply for the client
        
        Once the TConnectedClient has been created, the following is called:
        void TServerFramework::newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient) {
          onClientConnected(pClient);
            TThreadedServer::onClientConnected
                threadFactory_->newThread(pClient)->start();
                    pClient acts as a Runnable; a thread is created to execute its run method:
                    
                    void TConnectedClient::run() {
                      if (eventHandler_) {
                        opaqueContext_ = eventHandler_->createContext(inputProtocol_, outputProtocol_);
                      }

                      for (bool done = false; !done;) {
                        if (eventHandler_) {
                          eventHandler_->processContext(opaqueContext_, client_);
                        }

                        try {
                          if (!processor_->process(inputProtocol_, outputProtocol_, opaqueContext_)) {
                          // calls the processor's process method, which dispatches to CalculatorProcessor's process_XXX methods
                            break;
                          }
                        } catch (const TTransportException& ttx) {
                          switch (ttx.getType()) {
                          case TTransportException::TIMED_OUT:
                            // Receive timeout - continue processing.
                            continue;

                          case TTransportException::END_OF_FILE:
                          case TTransportException::INTERRUPTED:
                            // Client disconnected or was interrupted.  No logging needed.  Done.
                            done = true;
                            break;

                          default: {
                            // All other transport exceptions are logged.
                            // State of connection is unknown.  Done.
                            string errStr = string("TConnectedClient died: ") + ttx.what();
                            GlobalOutput(errStr.c_str());
                            done = true;
                            break;
                          }
                          }
                        } catch (const TException& tex) {
                          string errStr = string("TConnectedClient processing exception: ") + tex.what();
                          GlobalOutput(errStr.c_str());
                          // Continue processing
                        }
                      }

                      cleanup();
                        inputProtocol_->getTransport()->close(); // TProtocol::ptrans_
                        outputProtocol_->getTransport()->close();
                        client_->close(); // all three calls end up closing the same client socket
                    }
          // Count a concurrent client added. (updating the count must be synchronized)
          Synchronized sync(mon_);
          ++clients_;
          hwm_ = std::max(hwm_, clients_);
        }
    } catch (TTransportException& ttx) {
      releaseOneDescriptor("inputTransport", inputTransport);
      releaseOneDescriptor("outputTransport", outputTransport);
      releaseOneDescriptor("client", client);
      if (ttx.getType() == TTransportException::TIMED_OUT) {
        // Accept timeout - continue processing.
        continue;
      } else if (ttx.getType() == TTransportException::END_OF_FILE
                 || ttx.getType() == TTransportException::INTERRUPTED) {
        // Server was interrupted.  This only happens when stopping.
        break;
      } else {
        // All other transport exceptions are logged.
        // State of connection is unknown.  Done.
        string errStr = string("TServerTransport died: ") + ttx.what();
        GlobalOutput(errStr.c_str());
        break;
      }
    }
  }

  releaseOneDescriptor("serverTransport", serverTransport_);
  serverTransport_->close() actually calls TServerSocket::close(),
  which closes serverSocket_, interruptSockWriter_/interruptSockReader_ and childInterruptSockWriter_;
  pChildInterruptSockReader_.reset() uses its own deleter, destroyer_of_fine_sockets
}

class Synchronized {
public: // two constructors taking a Monitor; the member g is initialized with monitor.mutex()
  Synchronized(const Monitor* monitor) : g(monitor->mutex()) {}
  Synchronized(const Monitor& monitor) : g(monitor.mutex()) {}

private:
  Guard g; // member
};

class Guard : boost::noncopyable {
public:
  Guard(const Mutex& value, int64_t timeout = 0) : mutex_(&value) {
    if (timeout == 0) {
      value.lock();
    } else if (timeout < 0) {
      if (!value.trylock()) {
        mutex_ = NULL;
      }
    } else {
      if (!value.timedlock(timeout)) {
        mutex_ = NULL;
      }
    }
  } // locks in the constructor
  ~Guard() {
    if (mutex_) {
      mutex_->unlock();
    }
  } // unlocks in the destructor

  operator bool() const { return (mutex_ != NULL); }

private:
  const Mutex* mutex_;
};

class Monitor : boost::noncopyable {
public:
  /** Creates a new mutex, and takes ownership of it. */
  Monitor();

  /** Uses the provided mutex without taking ownership. */
  explicit Monitor(Mutex* mutex);

  /** Uses the mutex inside the provided Monitor without taking ownership. */
  explicit Monitor(Monitor* monitor);

  /** Deallocates the mutex only if we own it. */
  virtual ~Monitor();

  Mutex& mutex() const;

  virtual void lock() const;

  virtual void unlock() const;

  /**
   * Waits a maximum of the specified timeout in milliseconds for the condition
   * to occur, or waits forever if timeout_ms == 0.
   *
   * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
   */
  int waitForTimeRelative(int64_t timeout_ms) const;

  /**
   * Waits until the absolute time specified using struct THRIFT_TIMESPEC.
   * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
   */
  int waitForTime(const THRIFT_TIMESPEC* abstime) const;

  /**
   * Waits until the absolute time specified using struct timeval.
   * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
   */
  int waitForTime(const struct timeval* abstime) const;

  /**
   * Waits forever until the condition occurs.
   * Returns 0 if condition occurs, or an error code otherwise.
   */
  int waitForever() const;

  /**
   * Exception-throwing version of waitForTimeRelative(), called simply
   * wait(int64) for historical reasons.  Timeout is in milliseconds.
   *
   * If the condition occurs,  this function returns cleanly; on timeout or
   * error an exception is thrown.
   */
  void wait(int64_t timeout_ms = 0LL) const;

  /** Wakes up one thread waiting on this monitor. */
  virtual void notify() const;

  /** Wakes up all waiting threads on this monitor. */
  virtual void notifyAll() const;

private:
  class Impl;

  Impl* impl_;
};

class Monitor::Impl {

public:
  Impl() : ownedMutex_(new Mutex()), mutex_(NULL), condInitialized_(false) {
    init(ownedMutex_.get());
  }

  Impl(Mutex* mutex) : mutex_(NULL), condInitialized_(false) { init(mutex); }
    // construct this monitor's Impl from another monitor's mutex
  Impl(Monitor* monitor) : mutex_(NULL), condInitialized_(false) { init(&(monitor->mutex())); }

  ~Impl() { cleanup(); }

  Mutex& mutex() { return *mutex_; }
  void lock() { mutex().lock(); }     // lock
  void unlock() { mutex().unlock(); } // unlock

  /**
   * Exception-throwing version of waitForTimeRelative(), called simply
   * wait(int64) for historical reasons.  Timeout is in milliseconds.
   *
   * If the condition occurs,  this function returns cleanly; on timeout or
   * error an exception is thrown.
   */
  void wait(int64_t timeout_ms) const {
    int result = waitForTimeRelative(timeout_ms);
    if (result == THRIFT_ETIMEDOUT) {
      // pthread_cond_timedwait has been observed to return early on
      // various platforms, so comment out this assert.
      // assert(Util::currentTime() >= (now + timeout));
      throw TimedOutException();
    } else if (result != 0) {
      throw TException("pthread_cond_wait() or pthread_cond_timedwait() failed");
    }
  }

  /**
   * Waits until the specified timeout in milliseconds for the condition to
   * occur, or waits forever if timeout_ms == 0.
   *
   * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
   */ // waits for a relative time interval
  int waitForTimeRelative(int64_t timeout_ms) const {
    if (timeout_ms == 0LL) {
      return waitForever();
    }

    struct THRIFT_TIMESPEC abstime;
    Util::toTimespec(abstime, Util::currentTime() + timeout_ms);
    return waitForTime(&abstime);
  }

  /**
   * Waits until the absolute time specified using struct THRIFT_TIMESPEC.
   * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
   */ // waits until an absolute time
  int waitForTime(const THRIFT_TIMESPEC* abstime) const {
    assert(mutex_);
    pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
    assert(mutexImpl);

    // XXX Need to assert that caller owns mutex; waits on mutexImpl
    return pthread_cond_timedwait(&pthread_cond_, mutexImpl, abstime);
  }

  int waitForTime(const struct timeval* abstime) const {
    struct THRIFT_TIMESPEC temp;
    temp.tv_sec = abstime->tv_sec;
    temp.tv_nsec = abstime->tv_usec * 1000;
    return waitForTime(&temp);
  }
  /**
   * Waits forever until the condition occurs.
   * Returns 0 if condition occurs, or an error code otherwise.
   */
  int waitForever() const {
    assert(mutex_);
    pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
    assert(mutexImpl); // waits forever until the condition occurs
    return pthread_cond_wait(&pthread_cond_, mutexImpl);
  }

  void notify() {
    // XXX Need to assert that caller owns mutex
    int iret = pthread_cond_signal(&pthread_cond_); // signal, waking one waiting thread
    THRIFT_UNUSED_VARIABLE(iret);
    assert(iret == 0);
  }

  void notifyAll() {
    // XXX Need to assert that caller owns mutex
    int iret = pthread_cond_broadcast(&pthread_cond_); // broadcast, waking all waiting threads
    THRIFT_UNUSED_VARIABLE(iret);
    assert(iret == 0);
  }

private: // assign the given Mutex to mutex_ and initialize the pthread_cond_t
  void init(Mutex* mutex) {
    mutex_ = mutex;

    if (pthread_cond_init(&pthread_cond_, NULL) == 0) {
      condInitialized_ = true;
    }

    if (!condInitialized_) {
      cleanup();
      throw SystemResourceException();
    }
  }

  void cleanup() {
    if (condInitialized_) {
      condInitialized_ = false;
      int iret = pthread_cond_destroy(&pthread_cond_);
      THRIFT_UNUSED_VARIABLE(iret);
      assert(iret == 0);
    }
  }

  scoped_ptr<Mutex> ownedMutex_;
  Mutex* mutex_;

  mutable pthread_cond_t pthread_cond_;
  mutable bool condInitialized_;
};

class Mutex {
public:
  typedef void (*Initializer)(void*);

  Mutex(Initializer init = DEFAULT_INITIALIZER);
  virtual ~Mutex() {}
  virtual void lock() const;
  virtual bool trylock() const;
  virtual bool timedlock(int64_t milliseconds) const;
  virtual void unlock() const;

  void* getUnderlyingImpl() const;

  static void DEFAULT_INITIALIZER(void*);
  static void ADAPTIVE_INITIALIZER(void*);
  static void RECURSIVE_INITIALIZER(void*);
// all of these delegate to the impl class
private:
  class impl;
  boost::shared_ptr<impl> impl_;
};

class Mutex::impl {
public: // Initializer is a function pointer
  impl(Initializer init) : initialized_(false) {
#ifndef THRIFT_NO_CONTENTION_PROFILING
    profileTime_ = 0;
#endif
    init(&pthread_mutex_); // used to initialize the pthread_mutex_t
    initialized_ = true;
  }

  ~impl() {
    if (initialized_) {
      initialized_ = false;
      int ret = pthread_mutex_destroy(&pthread_mutex_);
      THRIFT_UNUSED_VARIABLE(ret);
      assert(ret == 0);
    }
  }

  void lock() const {
    PROFILE_MUTEX_START_LOCK();
    pthread_mutex_lock(&pthread_mutex_); // lock
    PROFILE_MUTEX_LOCKED();
  }
  // try to acquire the lock
  bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); }

  // lock with a timeout
  bool timedlock(int64_t milliseconds) const {
#if defined(_POSIX_TIMEOUTS) && _POSIX_TIMEOUTS >= 200112L
    PROFILE_MUTEX_START_LOCK();

    struct THRIFT_TIMESPEC ts;
    Util::toTimespec(ts, milliseconds + Util::currentTime());
    int ret = pthread_mutex_timedlock(&pthread_mutex_, &ts);
    if (ret == 0) {
      PROFILE_MUTEX_LOCKED();
      return true;
    }

    PROFILE_MUTEX_NOT_LOCKED();
    return false;
#else
    /* Otherwise follow solution used by Mono for Android */
    struct THRIFT_TIMESPEC sleepytime, now, to;

    /* This is just to avoid a completely busy wait */
    sleepytime.tv_sec = 0;
    sleepytime.tv_nsec = 10000000L; /* 10ms */

    Util::toTimespec(to, milliseconds + Util::currentTime());

    while ((trylock()) == false) {
      Util::toTimespec(now, Util::currentTime());
      if (now.tv_sec >= to.tv_sec && now.tv_nsec >= to.tv_nsec) {
        return false;
      }
      nanosleep(&sleepytime, NULL);
    }

    return true;
#endif
  }
// unlock
  void unlock() const {
    PROFILE_MUTEX_START_UNLOCK();
    pthread_mutex_unlock(&pthread_mutex_);
    PROFILE_MUTEX_UNLOCKED();
  }
// get the underlying implementation
  void* getUnderlyingImpl() const { return (void*)&pthread_mutex_; }

private:
  mutable pthread_mutex_t pthread_mutex_;
  mutable bool initialized_;
#ifndef THRIFT_NO_CONTENTION_PROFILING
  mutable int64_t profileTime_;
#endif
};

        
        
class Runnable { // holds a weak pointer to its Thread, set from outside; thread() fetches it and thread(value) sets it
public:
  virtual ~Runnable(){};
  virtual void run() = 0;
  /**
   * Gets the thread object that is hosting this runnable object  - can return
   * an empty boost::shared pointer if no references remain on that thread object
   */ // the runnable only weakly references the thread, so this may return an empty pointer
  virtual boost::shared_ptr<Thread> thread() { return thread_.lock(); }
  /**
   * Sets the thread that is executing this object.  This is only meant for
   * use by concrete implementations of Thread.
   */
  virtual void thread(boost::shared_ptr<Thread> value) { thread_ = value; }
private:
  boost::weak_ptr<Thread> thread_;
};

class Thread {

public:
#if USE_BOOST_THREAD
  typedef boost::thread::id id_t;

  static inline bool is_current(id_t t) { return t == boost::this_thread::get_id(); }
  static inline id_t get_current() { return boost::this_thread::get_id(); }
#elif USE_STD_THREAD
  typedef std::thread::id id_t;

  static inline bool is_current(id_t t) { return t == std::this_thread::get_id(); }
  static inline id_t get_current() { return std::this_thread::get_id(); }
#else
  typedef pthread_t id_t; // the thread identifier type

  // shared helper: checks whether the given thread id is the current thread; the current thread's id
  // is not stored as a member, it is obtained via pthread_self
  static inline bool is_current(id_t t) { return pthread_equal(pthread_self(), t); }
  // get the current thread id
  static inline id_t get_current() { return pthread_self(); }
#endif

  virtual ~Thread(){};

  /**
   * Starts the thread. Does platform specific thread creation and
   * configuration then invokes the run method of the Runnable object bound
   * to this thread.
   */
  virtual void start() = 0; // start the thread

  /**
   * Join this thread. Current thread blocks until this target thread
   * completes.
   */
  virtual void join() = 0; // block the caller until this thread completes

  /**
   * Gets the thread's platform-specific ID
   */
  virtual id_t getId() = 0; // get the platform-specific thread id

  /**
   * Gets the runnable object this thread is hosting
   */ // returns the runnable
  virtual boost::shared_ptr<Runnable> runnable() const { return _runnable; }

protected: // sets the runnable; the runnable references the thread and the thread references the runnable
  virtual void runnable(boost::shared_ptr<Runnable> value) { _runnable = value; }

private:
  boost::shared_ptr<Runnable> _runnable;
};
        
All three thread factories inherit from ThreadFactory:
  virtual ~ThreadFactory() {}
  virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;
    each of them implements the newThread function
  /** Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread
   */
  static const Thread::id_t unknown_thread_id;
  virtual Thread::id_t getCurrentThreadId() const = 0;

#if USE_BOOST_THREAD
  typedef BoostThreadFactory PlatformThreadFactory;
 
#elif USE_STD_THREAD
  typedef StdThreadFactory PlatformThreadFactory;
#else
  typedef PosixThreadFactory PlatformThreadFactory;
 
#endif


class PosixThreadFactory :