1. 程式人生 > >讀Muduo原始碼筆記---7(Protobuf)

讀Muduo原始碼筆記---7(Protobuf)

1、概念

   Protocol Buffers 是一種輕便高效的結構化資料儲存格式,可以用於結構化資料序列化,或者說序列化。它很適合做資料儲存或 RPC 資料交換格式。可用於通訊協議、資料儲存等領域的語言無關、平臺無關、可擴充套件的序列化結構資料格式。
    將 程式資料轉化成能被儲存並傳輸的格式的過程被稱為“序列化”(Serialization),而它的逆過程則可被稱為“反序列化” (Deserialization)。

     簡單來說,序列化就是將物件例項的狀態轉換為可保持或傳輸的格式的過程。與序列化相對的是反序列化,它根據流重構物件。這兩個過程結合起來,可以輕 鬆地儲存和傳輸資料。

  序列化:將物件變成位元組流的形式傳出去。      反序列化:從位元組流恢復成原來的物件。

protobuf的反射功能

  

  1. Message:Person是自定義的pb型別,繼承自Message. MessageLite作為Message基類,更加輕量級一些。
    通過Message的兩個介面GetDescriptor/GetReflection,可以獲取該型別對應的Descriptor/Reflection。

  2. Descriptor:Descriptor是對message型別定義的描述,包括message的名字、所有欄位的描述、原始的proto檔案內容等,提供的介面:獲取所有欄位的個數:int field_count() const   

    獲取單個欄位描述型別FieldDescriptor的介面 。

  3. FieldDescriptor:描述message中的單個欄位,例如欄位名,欄位屬性(optional/required/repeated)等。

  4. Reflection:提供了動態讀寫pb欄位的介面,對pb物件的自動讀寫主要通過該類完成。對每種型別,Reflection都提供了一個單獨的介面用於讀寫欄位對應的值。

//讀操作
virtual int32  GetInt32 (const Message& message,const FieldDescriptor* field) const = 0;

virtual int64  GetInt64 (const Message& message,const FieldDescriptor* field) const = 0;

//對於列舉和巢狀的message

virtual const EnumValueDescriptor* GetEnum(const Message& message, const FieldDescriptor* field) const = 0;

virtual const Message& GetMessage(const Message& message,const FieldDescriptor* field,MessageFactory* factory = NULL) const = 0;

       反射使用

  • 通過型別名建立型別物件
// 先獲得型別的Descriptor .
    auto descriptor = google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName("T.Test");
    if (nullptr == descriptor)
    {
        return 0 ;
    }
    // 利用Descriptor拿到型別註冊的instance. 這個是不可修改的.
    auto prototype = google::protobuf::MessageFactory::generated_factory()->GetPrototype(descriptor);
    if ( nullptr == descriptor)
    {
        return 0 ;
    }
    // 構造一個可用的訊息.
    auto message = prototype->New();
    // 只有當我們預先編譯了test訊息並且正確連結才能這麼幹.
    auto test = dynamic_cast<T::Test*>(message);
    // 直接呼叫message的具體介面
    // 其實這些介面是語法糖介面.所以並沒有對應的反射機制來對應呼叫.
    // 反射機制實現了的Set/Get XXX系列介面,是屬於Reflection的介面,接收Message作為引數.
    test->set_id(1);
  • 通過物件和物件的屬性的名字獲取、修改對應的屬性
// 拿到一個物件,不在乎怎麼拿到,可以是通過反射拿到。
    // 這裡簡單直接的建立一個.
    T::Test p_test ;
    // 拿到物件的描述包.
    auto descriptor = p_test.GetDescriptor() ;
    // 拿到物件的反射配置.
    auto reflecter = p_test.GetReflection() ;
    // 拿到屬性的描述包.
    auto field = descriptor->FindFieldByName("id");
    // 設定屬性的值.
    reflecter->SetInt32(&p_test , field , 5 ) ;
    // 獲取屬性的值.
    std::cout<<reflecter->GetInt32(p_test , field)<< std::endl ;

2、實現

編寫.proto檔案

syntax = "proto3";
package pt;
option optimize_for = LITE_RUNTIME;

message req_login
{
    string username = 1;
    string password = 2;
}

message obj_user_info
{
    string nickname = 1;
    string icon        = 2;    //頭像
    int64  coin        = 3;    //金幣
    string location    = 4;    //所屬地
}

//遊戲資料統計
message obj_user_game_record
{
    string time = 1;
    int32 kill  = 2;        //擊殺數
    int32 dead  = 3;        //死亡數
    int32 assist= 4;        //助攻數
}

message rsp_login
{
    enum RET {
        SUCCESS         = 0;
        ACCOUNT_NULL    = 1;    //賬號不存在
        ACCOUNT_LOCK    = 2;    //賬號鎖定
        PASSWORD_ERROR  = 3;    //密碼錯誤
        ERROR           = 10;
    }
    int32 ret = 1;
    obj_user_info user_info = 2;
    repeated obj_user_game_record record = 3;
}

protobuf的message中有很多欄位,每個欄位的格式為: 修飾符 欄位型別 欄位名 = 域號;

     序列化後的Value是按原樣儲存到字串或者檔案中,Key按照一定的轉換條件儲存起來,序列化後的結果就是 KeyValueKeyValue…。Key的序列化格式是按照message中欄位後面的域號與欄位型別來轉換 .

通過:protoc -I=. --cpp_out=. game.proto   生成.h檔案和.cc檔案

測試程式:

#include <iostream>
#include <string>
#include "game.pb.h"

int main()
{
    pt::rsp_login rsp{};
    rsp.set_ret(pt::rsp_login_RET_SUCCESS);
    auto user_info = rsp.mutable_user_info();
    user_info->set_nickname("dsw");
    user_info->set_icon("345DS55GF34D774S");
    user_info->set_coin(2000);
    user_info->set_location("zh");

    for (int i = 0; i < 5; i++) {
        auto record = rsp.add_record();
        record->set_time("2017/4/13 12:22:11");
        record->set_kill(i * 4);
        record->set_dead(i * 2);
        record->set_assist(i * 5);
    }

    std::string buff{};
    rsp.SerializeToString(&buff);
    //------------------解析----------------------
    pt::rsp_login rsp2{};
    if (!rsp2.ParseFromString(buff)) {
        std::cout << "parse error\n";
    }
    
    auto temp_user_info = rsp2.user_info();
    std::cout << "nickname:" << temp_user_info.nickname() << std::endl;
    std::cout << "coin:" << temp_user_info.coin() << std::endl;
    for (int m = 0; m < rsp2.record_size(); m++) {
        auto temp_record = rsp2.record(m);
        std::cout << "time:" << temp_record.time() << " kill:" << temp_record.kill() << " dead:" << temp_record.dead() << " assist:" << temp_record.assist() << std::endl;
    }
}

編譯:g++ Writer.cc game.pb.cc -o s -std=c++11 -I /usr/local/include/ -L /usr/local/lib/ -lprotobuf -lpthread

執行結果:

3、muduo中的protobuf編解碼器

codec是一個間接層位於Tcpconnection與Server之間,將接到的資料解析出訊息物件,再呼叫Server對應的處理函式進行處理。

程式碼解析:

編解碼程式碼ProtobufCodec

//codec的基本功能之一是TCP分包
class ProtobufCodec : boost::noncopyable
{
public:
	//出錯處理
	enum ErrorCode
	{
		kNoError = 0,
		kInvalidLength,//長度超出範圍
		kCheckSumError,//check num不正確
		kInvalidNameLen,//
		kUnknownMessageType,//不能識別
		kParseError,//解析出錯
	};

	explicit ProtobufCodec(const ProtobufMessageCallback& messageCb)
		: messageCallback_(messageCb),errorCallback_(defaultErrorCallback)
    {
    }//預設處理方式
	ProtobufCodec(const ProtobufMessageCallback& messageCb, const ErrorCallback& errorCb)
		: messageCallback_(messageCb),errorCallback_(errorCb)
    {
    }//註冊出錯處理方式
};

//關鍵處理函式
//fillEmptyBuffer用訊息內容來填充Buffer

void ProtobufCodec::fillEmptyBuffer(Buffer* buf, const google::protobuf::Message& message)
{
	// buf->retrieveAll();
	assert(buf->readableBytes() == 0);

	const std::string& typeName = message.GetTypeName();//獲取訊息型別名
	int32_t nameLen = static_cast<int32_t>(typeName.size() + 1);//型別名長度
	buf->appendInt32(nameLen);//新增型別名長度到buffer
	buf->append(typeName.c_str(), nameLen);//新增訊息型別名

	// code copied from MessageLite::SerializeToArray() and MessageLite::SerializePartialToArray().
	GOOGLE_DCHECK(message.IsInitialized()) << InitializationErrorMessage("serialize", message);

	int byte_size = message.ByteSize();
	buf->ensureWritableBytes(byte_size);

	uint8_t* start = reinterpret_cast<uint8_t*>(buf->beginWrite());//獲取buffer可寫指標
	uint8_t* end = message.SerializeWithCachedSizesToArray(start);//將訊息從buffer的writerindex處開始寫
	if (end - start != byte_size)//判斷是否將訊息完整寫入
	{
		ByteSizeConsistencyError(byte_size, message.ByteSize(), static_cast<int>(end - start));
	}
	buf->hasWritten(byte_size);//更新寫指標

	int32_t checkSum = static_cast<int32_t>(
		::adler32(1,
		reinterpret_cast<const Bytef*>(buf->peek()),
		static_cast<int>(buf->readableBytes())));//計算效驗值
	buf->appendInt32(checkSum);//寫入效驗值
	assert(buf->readableBytes() == sizeof nameLen + nameLen + byte_size + sizeof checkSum);
	int32_t len = sockets::hostToNetwork32(static_cast<int32_t>(buf->readableBytes()));
	buf->prepend(&len, sizeof len);//在前面加上總長度
}

//onMessage
void ProtobufCodec::onMessage(const TcpConnectionPtr& conn,
	Buffer* buf,
	Timestamp receiveTime) //接收完整的一條訊息,交給messageCallback_回撥函式處理
{
	while (buf->readableBytes() >= kMinMessageLen + kHeaderLen)//判斷緩衝區中是否滿足最小的訊息長度條件
	{
		const int32_t len = buf->peekInt32();
		if (len > kMaxMessageLen || len < kMinMessageLen)//長度是否在規定範圍內
		{
			errorCallback_(conn, buf, receiveTime, kInvalidLength);//長度超出範圍,呼叫回撥進行處理
			break;
		}
		else if (buf->readableBytes() >= implicit_cast<size_t>(len + kHeaderLen))
		{
			ErrorCode errorCode = kNoError;
			MessagePtr message = parse(buf->peek() + kHeaderLen, len, &errorCode);//解析出MessagePtr
			if (errorCode == kNoError && message)
			{
				messageCallback_(conn, message, receiveTime);//呼叫訊息處理函式
				buf->retrieve(kHeaderLen + len);
			}
			else
			{
				errorCallback_(conn, buf, receiveTime, errorCode);//根據錯誤碼進行相應的處理
				break;
			}
		}
		else
		{
			break;
		}
	}
}

//parse
MessagePtr ProtobufCodec::parse(const char* buf, int len, ErrorCode* error)
{
	MessagePtr message;

	// check sum
	int32_t expectedCheckSum = asInt32(buf + len - kHeaderLen);
	int32_t checkSum = static_cast<int32_t>(
		::adler32(1,
		reinterpret_cast<const Bytef*>(buf),
		static_cast<int>(len - kHeaderLen)));//adler效驗演算法
	if (checkSum == expectedCheckSum)//滿足效驗
	{
		// get message type name
		int32_t nameLen = asInt32(buf);
		if (nameLen >= 2 && nameLen <= len - 2 * kHeaderLen)//保證namelen大於2,同時保證訊息的長度大於最小的值
		{
			std::string typeName(buf + kHeaderLen, buf + kHeaderLen + nameLen - 1);//取出訊息型別名
			// create message object
			message.reset(createMessage(typeName));//建立訊息物件
			if (message)
			{
				// parse from buffer
				const char* data = buf + kHeaderLen + nameLen;//從buffer中取出訊息內容
				int32_t dataLen = len - nameLen - 2 * kHeaderLen;
				if (message->ParseFromArray(data, dataLen))//對新建的訊息物件賦值
				{
					*error = kNoError;
				}
				else
				{
					*error = kParseError;
				}
			}
			else
			{
				*error = kUnknownMessageType;
			}
		}
		else
		{
			*error = kInvalidNameLen;
		}
	}
	else
	{
		*error = kCheckSumError;
	}

	return message;
}
//createMessage

google::protobuf::Message* ProtobufCodec::createMessage(const std::string& typeName)
{
	google::protobuf::Message* message = NULL;
	const google::protobuf::Descriptor* descriptor =
		google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(typeName);//通過型別名從DescriptorPool中得到Descriptor
	if (descriptor)
	{
		const google::protobuf::Message* prototype =
			google::protobuf::MessageFactory::generated_factory()->GetPrototype(descriptor);//通過Descriptor從MessageFactory中得到Message
		if (prototype)
		{
			message = prototype->New();//建立新的訊息物件
		}
	}
	return message;
}

4、muduo中的protobuf訊息分發器ProtobufDispatcher:按訊息型別對訊息進行分發

通過多型和模板實現,Callback是基類,定義了一個純虛擬函式onMessage;CallbackT是一個模板類,定義了一個回撥函式的指標,重寫了基類的虛擬函式,該類是一個模板類,根據不同的型別,實現對不同訊息的處理。ProtobufDispatcher是分發器實現的類,該類成員map是以Descriptor為key,以Callback*為值,提供了註冊任意訊息函式,以及訊息分發函式。

template <typename T>
class CallbackT : public Callback //任意訊息的對應的處理
{
 public:
  CallbackT(const ProtobufMessageTCallback& callback)
    : callback_(callback)
  {
  }

  virtual void onMessage(const muduo::net::TcpConnectionPtr& conn,
                         const MessagePtr& message,
                         muduo::Timestamp receiveTime) const  //重寫基類虛擬函式
  {
    boost::shared_ptr<T> concrete = muduo::down_pointer_cast<T>(message);
    assert(concrete != NULL);
    callback_(conn, concrete, receiveTime);//訊息處理的回撥函式
  }
 private:
  ProtobufMessageTCallback callback_;
};

class ProtobufDispatcher //訊息分發器
{
 public:

  explicit ProtobufDispatcher(const ProtobufMessageCallback& defaultCb)
    : defaultCallback_(defaultCb)
  {
  }

  void onProtobufMessage(const muduo::net::TcpConnectionPtr& conn,
                         const MessagePtr& message,
                         muduo::Timestamp receiveTime) const
  {
    CallbackMap::const_iterator it = callbacks_.find(message->GetDescriptor());//根據傳入訊息的Descriptor,找到對應的callbackT
    if (it != callbacks_.end())//找到
    {
      it->second->onMessage(conn, message, receiveTime);//呼叫相應訊息的onMessage
    }
    else//未找到,選擇預設處理方式
    {
      defaultCallback_(conn, message, receiveTime);
    }
  }

  template<typename T>
  void registerMessageCallback(const typename CallbackT<T>::ProtobufMessageTCallback& callback)    //註冊訊息
  {
    boost::shared_ptr<CallbackT<T> > pd(new CallbackT<T>(callback));
    callbacks_[T::descriptor()] = pd;
  }

 private:
  typedef std::map<const google::protobuf::Descriptor*, boost::shared_ptr<Callback> > CallbackMap;

  CallbackMap callbacks_;
  ProtobufMessageCallback defaultCallback_;
};
#endif

訊息分發器的使用

void onUnknownMessageType(const muduo::net::TcpConnectionPtr&,
                          const MessagePtr& message,
                          muduo::Timestamp)        //定義未註冊訊息的處理方式
{
  cout << "onUnknownMessageType: " << message->GetTypeName() << endl;
}

void onQuery(const muduo::net::TcpConnectionPtr&,
             const MessagePtr& message,
             muduo::Timestamp)     //Query訊息的處理
{
  cout << "onQuery: " << message->GetTypeName() << endl;
  boost::shared_ptr<muduo::Query> query = muduo::down_pointer_cast<muduo::Query>(message);
  assert(query != NULL);
}

void onAnswer(const muduo::net::TcpConnectionPtr&,
              const MessagePtr& message,
              muduo::Timestamp)     //Answer訊息的處理
{
  cout << "onAnswer: " << message->GetTypeName() << endl;
  boost::shared_ptr<muduo::Answer> answer = muduo::down_pointer_cast<muduo::Answer>(message);
  assert(answer != NULL);
}

int main()
{
  GOOGLE_PROTOBUF_VERIFY_VERSION;
  //定義分發器
  ProtobufDispatcherLite dispatcher(onUnknownMessageType);
  //註冊訊息到分發器
  dispatcher.registerMessageCallback(muduo::Query::descriptor(), onQuery); 
  dispatcher.registerMessageCallback(muduo::Answer::descriptor(), onAnswer);

  muduo::net::TcpConnectionPtr conn;
  muduo::Timestamp t;

  boost::shared_ptr<muduo::Query> query(new muduo::Query);
  boost::shared_ptr<muduo::Answer> answer(new muduo::Answer);
  boost::shared_ptr<muduo::Empty> empty(new muduo::Empty);
  dispatcher.onProtobufMessage(conn, query, t);
  dispatcher.onProtobufMessage(conn, answer, t);
  dispatcher.onProtobufMessage(conn, empty, t);

  google::protobuf::ShutdownProtobufLibrary();
}

boost::bind使用說明:function是函式物件的“容器”型別,bind繫結成員函式,返回函式物件。bind可以繫結函式指標、函式應用、成員函式指標、函式物件作為回撥,在繫結非成員函式或者類靜態成員函式時,函式引數最多可以達到9個,在繫結成員函式時,函式引數最多可以達到8個,另外一個用於指明例項物件或者this指標。