基於Netty高效能RPC框架Nifty協議、傳輸層、編解碼
ThriftCodecManager與物件讀取
在編寫服務端程式碼的時候,我們建立了ThriftCodecManager這個物件,該物件是用來管理編解碼器ThriftCodec<?>的,在初始化的時候會建立各種型別的編解碼器放到快取中, 以供服務處理器ThrfitServiceProcessor使用。接下來我們就深入分析這個編解碼器管理器。
通常我們使用的就是預設變成ThriftCodec陣列構造方法來建立該物件,內部持有guava提供的快取
private final LoadingCache<ThriftType, ThriftCodec<?>> typeCodecs;
public ThriftCodecManager(ThriftCodec<?>... codecs) {
this(new CompilerThriftCodecFactory(ThriftCodecManager.class.getClassLoader()), ImmutableSet.copyOf(codecs));
}
public ThriftCodecManager(ThriftCodecFactory factory, Set<ThriftCodec<?>> codecs){
this(factory, new ThriftCatalog(), codecs);
}
複製程式碼
主要分三步來新增編解碼器到快取typeCodecs中,
構造typeCodecs, 根據ThriftType來動態構建編解碼器新增到快取中;使用的是guava cache, 這個東西公司內部基礎架構部門也用的不少,作為本地快取提供了挺多的策略,非常推薦花半小時學習一下;新增支援基本型別的編解碼器,比如StringThriftCodec, IntThriftCodec;將我們自己定義的編解碼器也加入到快取中;
接下來按照順序依次介紹
1.1. 讀取集合型別引數
typeCodecs=CacheBuilder.newBuilder().build(new CacheLoader<ThriftType, ThriftCodec<?>>() {
public ThriftCodec<?> load(ThriftType type) throws Exception {
switch (type.getProtocolType()) {
case STRUCT: {
return factory.generateThriftTypeCodec(ThriftCodecManager.this, type.getStructMetadata());
}
case MAP: {
ThriftCodec<?> keyCodec=typeCodecs.get(type.getKeyType());
ThriftCodec<?> valueCodec=typeCodecs.get(type.getValueType());
return new MapThriftCodec<>(type, keyCodec, valueCodec);
}
case SET: {
ThriftCodec<?> elementCodec=typeCodecs.get(type.getValueType());
return new SetThriftCodec<>(type, elementCodec);
}
case LIST: {
ThriftCodec<?> elementCodec=typeCodecs.get(type.getValueType());
return new ListThriftCodec<>(type, elementCodec);
}
}
}
});
複製程式碼
比如ThriftType=MAP,分別從快取獲取key和value的編解碼器,從而來構建map的編解碼器MapThriftCodec
在這之前需要知道編解碼器其實就是提供了read和write方法,從協議中讀取和寫出資料。
public interface ThriftCodec{
/**
* The Thrift type this codec supports. The Thrift type contains the Java generic Type of the
* codec.
*/
public ThriftType getType();
/**
* Reads a value from supplied Thrift protocol reader.
*
* @param protocol the protocol to read from
* @return the value; not null
* @throws Exception if any problems occurred when reading or coercing the value
*/
public T read(TProtocol protocol)
throws Exception;
/**
* Writes a value to the supplied Thrift protocol writer.
*
* @param value the value to write; not null
* @param protocol the protocol to write to
* @throws Exception if any problems occurred when writing or coercing the value
*/
public void write(T value, TProtocol protocol)
throws Exception;
}
複製程式碼
基本型別的編解碼器在之前的部落格中說過,這裡就看下MapThriftCodec和基本型別的編解碼器有什麼不同。
public Map<K, V> read(TProtocol protocol) throws Exception {
return new TProtocolReader(protocol).readMap(keyCodec, valueCodec);
}
複製程式碼
繼續往下看: TProtocolReader:
public <K, V> Map<K, V> readMap(ThriftCodec keyCodec, ThriftCodec valueCodec) throws Exception {
TMap tMap=protocol.readMapBegin();
Map<K, V> map=new HashMap<>();
for (int i=0; i < tMap.size; i++) {
K key=keyCodec.read(protocol);
V value=valueCodec.read(protocol);
map.put(key, value);
}
protocol.readMapEnd();
return map;
}
複製程式碼
是不是很容易理解,拿到key和value的編解碼器後,不停的往下讀即可。
比較疑問的地方就是這裡沒有涉及到迴圈遍歷i,實際上這是關於map的協議來決定的,protocal.readMapBegin在TBinaryProtocal中的實現是,依次讀取一個位元組,一個位元組,4個位元組分別表示key的型別,value的型別和 元素數量。得到了map中元素數量tMap.size後,由於每次讀取key和value的時候buffer中的指標都會移動,讀完了value後,能保證下次讀到的就是下一個元素的key。最後將結果放到構建的hashMap中即可。
1.2. 讀取結構體
方法的形參裡面除了基本型別,集合型別外,經常還能遇到結構體型別,對於java來說就是物件,這個時候對應的處理如下。
case STRUCT: {
return factory.generateThriftTypeCodec(ThriftCodecManager.this, type.getStructMetadata());
}
複製程式碼
做點小改動,在這裡使用一個新的ThriftCodecManager構造方法,傳入編解碼器工廠類
ReflectionThriftCodecFactory
ThriftCodecManager manager=new ThriftCodecManager(new ReflectionThriftCodecFactory());
複製程式碼
其實還有個工廠類
CompilerThriftCodecFactory,看名字是個自建編譯器的工廠類,看過內部實現程式碼多而且不大好理解,所以這裡使用反射方式的編解碼器工廠類。
ReflectionThriftCodecFactory繼承自ThriftCodecFactory,提供了獲取編解碼器的方法。
public interface ThriftCodecFactory{
ThriftCodec<?> generateThriftTypeCodec(ThriftCodecManager codecManager, ThriftStructMetadata metadata);
}
複製程式碼
ReflectionThriftCodecFactory的實現如下:
public class ReflectionThriftCodecFactory implements ThriftCodecFactory {
@Override
public ThriftCodec<?> generateThriftTypeCodec(ThriftCodecManager codecManager, ThriftStructMetadata metadata) {
switch (metadata.getMetadataType()) {
case STRUCT:
return new ReflectionThriftStructCodec<>(codecManager, metadata);
case UNION:
return new ReflectionThriftUnionCodec<>(codecManager, metadata);
default:
throw new IllegalStateException(format("encountered type %s", metadata.getMetadataType()));
}
}
}
複製程式碼
那麼問題就是, ThriftStructMetadata如何得到以及幹嘛用的, 看名字就知道這是儲存結構體元資料的. 由type.getStructMetadata()得到, 那麼問題就是如何構造的ThriftType . 其實在最開始構造ThriftServiceProcessor的時候就構造好了,
建立過程比較複雜,簡單來說就是:構建ThriftServiceProcessor的時候,會將構建ThriftServiceMetadata;而構建ThriftServiceMetadata的時候會構建ThriftMethodMetadata;構建 的時候會構建List; 每個ThriftFieldMetadata都代表一個方法形參,內部持有一個ThriftType, 到這裡就知道了ThriftType是屬於ThriftFieldMetadata,在構建ThriftMethodMetadata的時候會得到ThriftType,如何得到?其實就是根據方法形參得到ThriftType的,從目錄類ThriftCatalog獲取到,其內部存有一個Type和ThriftType的對映,來簡單看下。
ThriftCatalog:
private final ConcurrentMap<Type, ThriftType> typeCache=new ConcurrentHashMap<>();
public ThriftType getThriftType(Type javaType) throws IllegalArgumentException{
ThriftType thriftType=typeCache.get(javaType);
if (thriftType==null) {
thriftType=getThriftTypeUncached(javaType);
typeCache.putIfAbsent(javaType, thriftType);
}
return thriftType;
}
複製程式碼
基於此,在構建ThriftMethodProcessor的時候,再跟進ThriftCodecManager就能得到Map<Short, ThriftCodec<?>>
ImmutableMap.Builder<Short, ThriftCodec<?>> builder=ImmutableMap.builder();
for (ThriftFieldMetadata fieldMetadata : methodMetadata.getParameters()) {
builder.put(fieldMetadata.getId(), codecManager.getCodec(fieldMetadata.getThriftType()));
}
parameterCodecs=builder.build();
複製程式碼
所以在處理接收到的資料的時候,可以根據fieldid來獲取對應的編解碼器了。
知道ThriftType如何獲得後,再回過頭來看
ReflectionThriftStructCodec如何解析結構體的。
protected final ThriftStructMetadata metadata;
protected final SortedMap<Short, ThriftCodec<?>> fields;
@Override
public T read(TProtocol protocol) throws Exception {
TProtocolReader reader=new TProtocolReader(protocol);
Map<Short, Object> data=new HashMap<>(metadata.getFields().size());
while (reader.nextField()) {
short fieldId=reader.getFieldId();
ThriftCodec<?> codec=fields.get(fieldId);
Object value=reader.readField(codec);
data.put(fieldId, value);
}
// build the struct
return constructStruct(data);
}
複製程式碼
程式碼簡化了很多,但是主體邏輯留了下來。從這部分程式碼可以看到,這和服務端讀取資料的邏輯是一樣的,具體的可以參考服務端讀取資料。最後得到的data中key為 struct(即java物件)的fieldId,value則為屬性的值。比如傳入的物件class Dog{int age=5; String name="tom"}那麼兩個data中的資料為{1=5,2="tom"},最後在constructStruct構建物件。
編解碼器
在前面其實介紹過編解碼器ThrfitCodec,為了符合標題,這裡再囉嗦一遍。
Thrift提供的編解碼器頂層介面為ThriftCodec,提供了read和write方法
public interface ThriftCodec{
/**
* The Thrift type this codec supports. The Thrift type contains the Java generic Type of the
* codec.
*/
public ThriftType getType();
/**
* Reads a value from supplied Thrift protocol reader.
*
* @param protocol the protocol to read from
* @return the value; not null
* @throws Exception if any problems occurred when reading or coercing the value
*/
public T read(TProtocol protocol) throws Exception;
/**
* Writes a value to the supplied Thrift protocol writer.
*
* @param value the value to write; not null
* @param protocol the protocol to write to
* @throws Exception if any problems occurred when writing or coercing the value
*/
public void write(T value, TProtocol protocol) throws Exception;
}
複製程式碼
同時Thrift也為我們提供了常用的編解碼器,足以應付我們業務的使用。比如常見的基本型別的編解碼器,String型別編解碼器 StringThriftCodec:
public class StringThriftCodec implements ThriftCodec {
@Override
public ThriftType getType() {
return ThriftType.STRING;
}
@Override
public String read(TProtocol protocol) throws Exception {
return protocol.readString();
}
@Override
public void write(String value, TProtocol protocol) throws Exception{
protocol.writeString(value);
}
}
複製程式碼
IntegerThriftCodec:
public class IntegerThriftCodec implements ThriftCodec {
@Override
public ThriftType getType() {
return ThriftType.I32;
}
@Override
public Integer read(TProtocol protocol) throws Exception {
return protocol.readI32();
}
@Override
public void write(Integer value, TProtocol protocol) throws Exception {
Preconditions.checkNotNull(protocol, "protocol is null");
protocol.writeI32(value);
}
}
複製程式碼
關於結構體編解碼器
ReflectionThriftStructCodec前面一大篇幅都是介紹這個。
不管是哪種編解碼器都是非常依賴協議的,只是編解碼器做了一層抽象遮蔽了細節,方便我們使用。
協議與傳輸
協議TProtocal和傳輸元件TTransport是緊密相連的,協議內部是持有TTransport的,而TTransport可以理解為傳輸層,是直接與輸出資料容器buffer打交道的;比如使用最多的就是TNiftyTransport,內部會持有ChannelBuffer,包含了從netty資料流中獲取的ChannelBuffer和之後寫到客戶端的空的ChannelBuffer。
我們先簡單介紹協議定義了哪些介面,然後找個介面來看如何進行資料傳輸的。
/**
* Protocol interface definition.
*
*/
public abstract class TProtocol {
protected TTransport trans_;
protected TProtocol(TTransport trans) {
trans_=trans;
}
private boolean serverSide;
private String serviceName;
// getter, setter
/**
* Reading methods.
*/
public abstract TMessage readMessageBegin() throws TException;
public abstract void readMessageEnd() throws TException;
public abstract TStruct readStructBegin() throws TException;
public abstract void readStructEnd() throws TException;
public abstract TField readFieldBegin() throws TException;
public abstract void readFieldEnd() throws TException;
public abstract TMap readMapBegin() throws TException;
public abstract void readMapEnd() throws TException;
public abstract TList readListBegin() throws TException;
public abstract void readListEnd() throws TException;
public abstract TSet readSetBegin() throws TException;
public abstract void readSetEnd() throws TException;
public abstract boolean readBool() throws TException;
public abstract byte readByte() throws TException;
public abstract short readI16() throws TException;
public abstract int readI32() throws TException;
public abstract long readI64() throws TException;
public abstract double readDouble() throws TException;
public abstract String readString() throws TException;
public abstract ByteBuffer readBinary() throws TException;
/**
* Writing methods.
*/
// ...
複製程式碼
裡面主要是資料的讀和寫方法,寫和讀方法是對應的就不同了。讀的方法基本都會配合TProtocolReader來使用,寫的方法基本都會配合TProtocolWriter來使用。
開始和結束訊息的讀取,開始讀取方法引數的時候會在初始和結尾進行呼叫, 可以獲得方法的名字和請求的序號。開始和結束結構體的讀取,在正式讀取方法引數值的時候和讀取完畢後進行呼叫,在TBinaryProtocal中可以認為是空實現,readStructEnd通常在readMessageEnd之前。開始和結束引數的讀取,每次讀取一個引數都會呼叫,readFieldBegin返回TField表示引數名稱,型別和序號,基於此獲取編解碼器來讀取引數值,最後再呼叫readFieldEnd。開始和結束集合的讀取;
挑個readI32,readString來看在TBinaryProtocal中的使用。
private byte[] i32rd=new byte[4];
public int readI32() throws TException {
byte[] buf=i32rd;
int off=0;
if (trans_.getBytesRemainingInBuffer() >=4) {
buf=trans_.getBuffer();
off=trans_.getBufferPosition();
trans_.consumeBuffer(4);
} else {
readAll(i32rd, 0, 4);
}
return
((buf[off] & 0xff) << 24) |
((buf[off + 1] & 0xff) << 16) |
((buf[off + 2] & 0xff) << 8) |
((buf[off + 3] & 0xff));
}
複製程式碼
這裡的trans_在前面說過,就是TNiftyTransport,由nifty包提供的。
trans_.getBytesRemainingInBuffer()表示的是內部持有的channelBuffer剩餘位元組數,即bufferEnd - bufferPosition; 如果有四個位元組就讀取4個位元組,否則讀取所有;buf=trans_.getBuffer()是獲取transport內部的的buffer陣列;off=trans_.getBufferPosition();是獲取transport內部buffer當前讀取到的位置,即bufferPosition;trans_.consumeBuffer(4);則是transport內部buffer消費4個位元組,即bufferPosition +=4;關於返回值,注意到16進位制的0xff就是二進位制的11111111,最終結果就是將四個位元組拼接在一起構成一個int值
關於readByte,readShort,readLong都是類似的。
再來看readString:
public String readString() throws TException {
int size=readI32();
checkStringReadLength(size);
if (trans_.getBytesRemainingInBuffer() >=size) {
String s=new String(trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8");
trans_.consumeBuffer(size);
return s;
}
return readStringBody(size);
}
複製程式碼
首先讀取四個位元組構成的size,表示需要讀取多少byte從而來構造string。在獲取buffer和position,從而從buffer的position位置讀取size個位元組,構造出string;最後需要移動position再返回string結果。