1. 程式人生 > 其它 >基於Netty高效能RPC框架Nifty協議、傳輸層、編解碼

基於Netty高效能RPC框架Nifty協議、傳輸層、編解碼

  ThriftCodecManager與物件讀取

  在編寫服務端程式碼的時候,我們建立了ThriftCodecManager這個物件,該物件是用來管理編解碼器ThriftCodec<?>的,在初始化的時候會建立各種型別的編解碼器放到快取中, 以供服務處理器ThrfitServiceProcessor使用。接下來我們就深入分析這個編解碼器管理器。

  通常我們使用的就是預設變成ThriftCodec陣列構造方法來建立該物件,內部持有guava提供的快取

  private final LoadingCache<ThriftType, ThriftCodec<?>> typeCodecs;

  public ThriftCodecManager(ThriftCodec<?>... codecs) {

  this(new CompilerThriftCodecFactory(ThriftCodecManager.class.getClassLoader()), ImmutableSet.copyOf(codecs));

  }

  public ThriftCodecManager(ThriftCodecFactory factory, Set<ThriftCodec<?>> codecs){

  this(factory, new ThriftCatalog(), codecs);

  }

  複製程式碼

  主要分三步來新增編解碼器到快取typeCodecs中,

  構造typeCodecs, 根據ThriftType來動態構建編解碼器新增到快取中;使用的是guava cache, 這個東西公司內部基礎架構部門也用的不少,作為本地快取提供了挺多的策略,非常推薦花半小時學習一下;新增支援基本型別的編解碼器,比如StringThriftCodec, IntThriftCodec;將我們自己定義的編解碼器也加入到快取中;

  接下來按照順序依次介紹

  1.1. 讀取集合型別引數

  typeCodecs=CacheBuilder.newBuilder().build(new CacheLoader<ThriftType, ThriftCodec<?>>() {

  public ThriftCodec<?> load(ThriftType type) throws Exception {

  switch (type.getProtocolType()) {

  case STRUCT: {

  return factory.generateThriftTypeCodec(ThriftCodecManager.this, type.getStructMetadata());

  }

  case MAP: {

  ThriftCodec<?> keyCodec=typeCodecs.get(type.getKeyType());

  ThriftCodec<?> valueCodec=typeCodecs.get(type.getValueType());

  return new MapThriftCodec<>(type, keyCodec, valueCodec);

  }

  case SET: {

  ThriftCodec<?> elementCodec=typeCodecs.get(type.getValueType());

  return new SetThriftCodec<>(type, elementCodec);

  }

  case LIST: {

  ThriftCodec<?> elementCodec=typeCodecs.get(type.getValueType());

  return new ListThriftCodec<>(type, elementCodec);

  }

  }

  }

  });

  複製程式碼

  比如ThriftType=MAP,分別從快取獲取key和value的編解碼器,從而來構建map的編解碼器MapThriftCodec

  在這之前需要知道編解碼器其實就是提供了read和write方法,從協議中讀取和寫出資料。

  public interface ThriftCodec{

  /**

  * The Thrift type this codec supports. The Thrift type contains the Java generic Type of the

  * codec.

  */

  public ThriftType getType();

  /**

  * Reads a value from supplied Thrift protocol reader.

  *

  * @param protocol the protocol to read from

  * @return the value; not null

  * @throws Exception if any problems occurred when reading or coercing the value

  */

  public T read(TProtocol protocol)

  throws Exception;

  /**

  * Writes a value to the supplied Thrift protocol writer.

  *

  * @param value the value to write; not null

  * @param protocol the protocol to write to

  * @throws Exception if any problems occurred when writing or coercing the value

  */

  public void write(T value, TProtocol protocol)

  throws Exception;

  }

  複製程式碼

  基本型別的編解碼器在之前的部落格中說過,這裡就看下MapThriftCodec和基本型別的編解碼器有什麼不同。

  public Map<K, V> read(TProtocol protocol) throws Exception {

  return new TProtocolReader(protocol).readMap(keyCodec, valueCodec);

  }

  複製程式碼

  繼續往下看: TProtocolReader:

  public <K, V> Map<K, V> readMap(ThriftCodec keyCodec, ThriftCodec valueCodec) throws Exception {

  TMap tMap=protocol.readMapBegin();

  Map<K, V> map=new HashMap<>();

  for (int i=0; i < tMap.size; i++) {

  K key=keyCodec.read(protocol);

  V value=valueCodec.read(protocol);

  map.put(key, value);

  }

  protocol.readMapEnd();

  return map;

  }

  複製程式碼

  是不是很容易理解,拿到key和value的編解碼器後,不停的往下讀即可。

  比較疑問的地方就是這裡沒有涉及到迴圈遍歷i,實際上這是關於map的協議來決定的,protocal.readMapBegin在TBinaryProtocal中的實現是,依次讀取一個位元組,一個位元組,4個位元組分別表示key的型別,value的型別和 元素數量。得到了map中元素數量tMap.size後,由於每次讀取key和value的時候buffer中的指標都會移動,讀完了value後,能保證下次讀到的就是下一個元素的key。最後將結果放到構建的hashMap中即可。

  1.2. 讀取結構體

  方法的形參裡面除了基本型別,集合型別外,經常還能遇到結構體型別,對於java來說就是物件,這個時候對應的處理如下。

  case STRUCT: {

  return factory.generateThriftTypeCodec(ThriftCodecManager.this, type.getStructMetadata());

  }

  複製程式碼

  做點小改動,在這裡使用一個新的ThriftCodecManager構造方法,傳入編解碼器工廠類

  ReflectionThriftCodecFactory

  ThriftCodecManager manager=new ThriftCodecManager(new ReflectionThriftCodecFactory());

  複製程式碼

  其實還有個工廠類

  CompilerThriftCodecFactory,看名字是個自建編譯器的工廠類,看過內部實現程式碼多而且不大好理解,所以這裡使用反射方式的編解碼器工廠類。

  ReflectionThriftCodecFactory繼承自ThriftCodecFactory,提供了獲取編解碼器的方法。

  public interface ThriftCodecFactory{

  ThriftCodec<?> generateThriftTypeCodec(ThriftCodecManager codecManager, ThriftStructMetadata metadata);

  }

  複製程式碼

  ReflectionThriftCodecFactory的實現如下:

  public class ReflectionThriftCodecFactory implements ThriftCodecFactory {

  @Override

  public ThriftCodec<?> generateThriftTypeCodec(ThriftCodecManager codecManager, ThriftStructMetadata metadata) {

  switch (metadata.getMetadataType()) {

  case STRUCT:

  return new ReflectionThriftStructCodec<>(codecManager, metadata);

  case UNION:

  return new ReflectionThriftUnionCodec<>(codecManager, metadata);

  default:

  throw new IllegalStateException(format("encountered type %s", metadata.getMetadataType()));

  }

  }

  }

  複製程式碼

  那麼問題就是, ThriftStructMetadata如何得到以及幹嘛用的, 看名字就知道這是儲存結構體元資料的. 由type.getStructMetadata()得到, 那麼問題就是如何構造的ThriftType . 其實在最開始構造ThriftServiceProcessor的時候就構造好了,

  建立過程比較複雜,簡單來說就是:構建ThriftServiceProcessor的時候,會將構建ThriftServiceMetadata;而構建ThriftServiceMetadata的時候會構建ThriftMethodMetadata;構建 的時候會構建List; 每個ThriftFieldMetadata都代表一個方法形參,內部持有一個ThriftType, 到這裡就知道了ThriftType是屬於ThriftFieldMetadata,在構建ThriftMethodMetadata的時候會得到ThriftType,如何得到?其實就是根據方法形參得到ThriftType的,從目錄類ThriftCatalog獲取到,其內部存有一個Type和ThriftType的對映,來簡單看下。

  ThriftCatalog:

  private final ConcurrentMap<Type, ThriftType> typeCache=new ConcurrentHashMap<>();

  public ThriftType getThriftType(Type javaType) throws IllegalArgumentException{

  ThriftType thriftType=typeCache.get(javaType);

  if (thriftType==null) {

  thriftType=getThriftTypeUncached(javaType);

  typeCache.putIfAbsent(javaType, thriftType);

  }

  return thriftType;

  }

  複製程式碼

  基於此,在構建ThriftMethodProcessor的時候,再跟進ThriftCodecManager就能得到Map<Short, ThriftCodec<?>>

  ImmutableMap.Builder<Short, ThriftCodec<?>> builder=ImmutableMap.builder();

  for (ThriftFieldMetadata fieldMetadata : methodMetadata.getParameters()) {

  builder.put(fieldMetadata.getId(), codecManager.getCodec(fieldMetadata.getThriftType()));

  }

  parameterCodecs=builder.build();

  複製程式碼

  所以在處理接收到的資料的時候,可以根據fieldid來獲取對應的編解碼器了。

  知道ThriftType如何獲得後,再回過頭來看

  ReflectionThriftStructCodec如何解析結構體的。

  protected final ThriftStructMetadata metadata;

  protected final SortedMap<Short, ThriftCodec<?>> fields;

  @Override

  public T read(TProtocol protocol) throws Exception {

  TProtocolReader reader=new TProtocolReader(protocol);

  Map<Short, Object> data=new HashMap<>(metadata.getFields().size());

  while (reader.nextField()) {

  short fieldId=reader.getFieldId();

  ThriftCodec<?> codec=fields.get(fieldId);

  Object value=reader.readField(codec);

  data.put(fieldId, value);

  }

  // build the struct

  return constructStruct(data);

  }

  複製程式碼

  程式碼簡化了很多,但是主體邏輯留了下來。從這部分程式碼可以看到,這和服務端讀取資料的邏輯是一樣的,具體的可以參考服務端讀取資料。最後得到的data中key為 struct(即java物件)的fieldId,value則為屬性的值。比如傳入的物件class Dog{int age=5; String name="tom"}那麼兩個data中的資料為{1=5,2="tom"},最後在constructStruct構建物件。

  編解碼器

  在前面其實介紹過編解碼器ThrfitCodec,為了符合標題,這裡再囉嗦一遍。

  Thrift提供的編解碼器頂層介面為ThriftCodec,提供了read和write方法

  public interface ThriftCodec{

  /**

  * The Thrift type this codec supports. The Thrift type contains the Java generic Type of the

  * codec.

  */

  public ThriftType getType();

  /**

  * Reads a value from supplied Thrift protocol reader.

  *

  * @param protocol the protocol to read from

  * @return the value; not null

  * @throws Exception if any problems occurred when reading or coercing the value

  */

  public T read(TProtocol protocol) throws Exception;

  /**

  * Writes a value to the supplied Thrift protocol writer.

  *

  * @param value the value to write; not null

  * @param protocol the protocol to write to

  * @throws Exception if any problems occurred when writing or coercing the value

  */

  public void write(T value, TProtocol protocol) throws Exception;

  }

  複製程式碼

  同時Thrift也為我們提供了常用的編解碼器,足以應付我們業務的使用。比如常見的基本型別的編解碼器,String型別編解碼器 StringThriftCodec:

  public class StringThriftCodec implements ThriftCodec {

  @Override

  public ThriftType getType() {

  return ThriftType.STRING;

  }

  @Override

  public String read(TProtocol protocol) throws Exception {

  return protocol.readString();

  }

  @Override

  public void write(String value, TProtocol protocol) throws Exception{

  protocol.writeString(value);

  }

  }

  複製程式碼

  IntegerThriftCodec:

  public class IntegerThriftCodec implements ThriftCodec {

  @Override

  public ThriftType getType() {

  return ThriftType.I32;

  }

  @Override

  public Integer read(TProtocol protocol) throws Exception {

  return protocol.readI32();

  }

  @Override

  public void write(Integer value, TProtocol protocol) throws Exception {

  Preconditions.checkNotNull(protocol, "protocol is null");

  protocol.writeI32(value);

  }

  }

  複製程式碼

  關於結構體編解碼器

  ReflectionThriftStructCodec前面一大篇幅都是介紹這個。

  不管是哪種編解碼器都是非常依賴協議的,只是編解碼器做了一層抽象遮蔽了細節,方便我們使用。

  協議與傳輸

  協議TProtocal和傳輸元件TTransport是緊密相連的,協議內部是持有TTransport的,而TTransport可以理解為傳輸層,是直接與輸出資料容器buffer打交道的;比如使用最多的就是TNiftyTransport,內部會持有ChannelBuffer,包含了從netty資料流中獲取的ChannelBuffer和之後寫到客戶端的空的ChannelBuffer。

  我們先簡單介紹協議定義了哪些介面,然後找個介面來看如何進行資料傳輸的。

  /**

  * Protocol interface definition.

  *

  */

  public abstract class TProtocol {

  protected TTransport trans_;

  protected TProtocol(TTransport trans) {

  trans_=trans;

  }

  private boolean serverSide;

  private String serviceName;

  // getter, setter

  /**

  * Reading methods.

  */

  public abstract TMessage readMessageBegin() throws TException;

  public abstract void readMessageEnd() throws TException;

  public abstract TStruct readStructBegin() throws TException;

  public abstract void readStructEnd() throws TException;

  public abstract TField readFieldBegin() throws TException;

  public abstract void readFieldEnd() throws TException;

  public abstract TMap readMapBegin() throws TException;

  public abstract void readMapEnd() throws TException;

  public abstract TList readListBegin() throws TException;

  public abstract void readListEnd() throws TException;

  public abstract TSet readSetBegin() throws TException;

  public abstract void readSetEnd() throws TException;

  public abstract boolean readBool() throws TException;

  public abstract byte readByte() throws TException;

  public abstract short readI16() throws TException;

  public abstract int readI32() throws TException;

  public abstract long readI64() throws TException;

  public abstract double readDouble() throws TException;

  public abstract String readString() throws TException;

  public abstract ByteBuffer readBinary() throws TException;

  /**

  * Writing methods.

  */

  // ...

  複製程式碼

  裡面主要是資料的讀和寫方法,寫和讀方法是對應的就不同了。讀的方法基本都會配合TProtocolReader來使用,寫的方法基本都會配合TProtocolWriter來使用。

  開始和結束訊息的讀取,開始讀取方法引數的時候會在初始和結尾進行呼叫, 可以獲得方法的名字和請求的序號。開始和結束結構體的讀取,在正式讀取方法引數值的時候和讀取完畢後進行呼叫,在TBinaryProtocal中可以認為是空實現,readStructEnd通常在readMessageEnd之前。開始和結束引數的讀取,每次讀取一個引數都會呼叫,readFieldBegin返回TField表示引數名稱,型別和序號,基於此獲取編解碼器來讀取引數值,最後再呼叫readFieldEnd。開始和結束集合的讀取;

  挑個readI32,readString來看在TBinaryProtocal中的使用。

  private byte[] i32rd=new byte[4];

  public int readI32() throws TException {

  byte[] buf=i32rd;

  int off=0;

  if (trans_.getBytesRemainingInBuffer() >=4) {

  buf=trans_.getBuffer();

  off=trans_.getBufferPosition();

  trans_.consumeBuffer(4);

  } else {

  readAll(i32rd, 0, 4);

  }

  return

  ((buf[off] & 0xff) << 24) |

  ((buf[off + 1] & 0xff) << 16) |

  ((buf[off + 2] & 0xff) << 8) |

  ((buf[off + 3] & 0xff));

  }

  複製程式碼

  這裡的trans_在前面說過,就是TNiftyTransport,由nifty包提供的。

  trans_.getBytesRemainingInBuffer()表示的是內部持有的channelBuffer剩餘位元組數,即bufferEnd - bufferPosition; 如果有四個位元組就讀取4個位元組,否則讀取所有;buf=trans_.getBuffer()是獲取transport內部的的buffer陣列;off=trans_.getBufferPosition();是獲取transport內部buffer當前讀取到的位置,即bufferPosition;trans_.consumeBuffer(4);則是transport內部buffer消費4個位元組,即bufferPosition +=4;關於返回值,注意到16進位制的0xff就是二進位制的11111111,最終結果就是將四個位元組拼接在一起構成一個int值

  關於readByte,readShort,readLong都是類似的。

  再來看readString:

  public String readString() throws TException {

  int size=readI32();

  checkStringReadLength(size);

  if (trans_.getBytesRemainingInBuffer() >=size) {

  String s=new String(trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8");

  trans_.consumeBuffer(size);

  return s;

  }

  return readStringBody(size);

  }

  複製程式碼

  首先讀取四個位元組構成的size,表示需要讀取多少byte從而來構造string。在獲取buffer和position,從而從buffer的position位置讀取size個位元組,構造出string;最後需要移動position再返回string結果。