Netty5原始碼分析(六) -- CodeC編解碼分析
Netty5的CodeC編解碼對以往的版本進行了簡化,沒有單獨的Encoder / Decoder介面,都繼承了ChannelHandlerApdater類,來實現ChannelHandler介面。
對Decoder來說,主要有兩個頂層的抽象類,一個是從位元組流到訊息的ByteToMessageDecoder,一個是中間訊息到業務訊息的MessageToMessageDecoder。
ByteToMessageDecoder放置在MessageToMessageDecoder前面,處理inbound事件。
拿讀IO資料來舉例,最初的資料來源與底層的SocketChannel的讀方法,比如UnpooledDirectByteBuf,先分配一個直接記憶體緩衝區DirectByteBuffer,然後把資料複製到ByteBuf,返回ByteBuf給Netty上層的類。
對UnpooledDirectByteBuf來說,它底層封裝了Java的ByteBuffer,使用ByteBuf來對ByteBuffer進行操作,併發ByteBuf拋給頂層
// NioSocketChannel.doReadBytes protected int doReadBytes(ByteBuf byteBuf) throws Exception { return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes()); } // AbstractByteBuf.writeBytes public int writeBytes(ScatteringByteChannel in, int length) throws IOException { ensureWritable(length); int writtenBytes = setBytes(writerIndex, in, length); if (writtenBytes > 0) { writerIndex += writtenBytes; } return writtenBytes; } //UnpooledDirectByteBuf.setBytes public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { ensureAccessible(); ByteBuffer tmpBuf = internalNioBuffer(); tmpBuf.clear().position(index).limit(index + length); try { return in.read(tmpNioBuf); } catch (ClosedChannelException e) { return -1; } }
Netty框架拿到ByteBuf之後就拿到了讀到的位元組資料,就可以呼叫ByteToMessageDecoder來把位元組流轉化成業務訊息。把位元組流轉化成業務訊息也就是如何分幀的問題。具體有幾類:
1. 固定長度來分幀
2.根據制定的分隔符
3. 採用訊息頭+訊息體的方式,訊息頭制定訊息長度
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { RecyclableArrayList out = RecyclableArrayList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { cumulation = data; } else { if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) { expandCumulation(ctx, data.readableBytes()); } cumulation.writeBytes(data); data.release(); } callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { cumulation.release(); cumulation = null; } int size = out.size(); decodeWasNull = size == 0; for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } out.recycle(); } } else { ctx.fireChannelRead(msg); } } protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { int outSize = out.size(); int oldInputLength = in.readableBytes(); decode(ctx, in, out); // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break; } else { continue; } } if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } }
把位元組流轉化成訊息後,這個訊息有時候是中間訊息,還需要把中間訊息轉化成具體的業務訊息,這時候呼叫MessageToMessageDecoder介面。
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
RecyclableArrayList out = RecyclableArrayList.newInstance();
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
decode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
} else {
out.add(msg);
}
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
int size = out.size();
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.get(i));
}
out.recycle();
}
}
// HttpContentDecoder的decode方法,把HttpResponse訊息轉化成具體的業務訊息
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().code() == 100) {
if (!(msg instanceof LastHttpContent)) {
continueResponse = true;
}
// 100-continue response must be passed through.
out.add(ReferenceCountUtil.retain(msg));
return;
}
if (continueResponse) {
if (msg instanceof LastHttpContent) {
continueResponse = false;
}
// 100-continue response must be passed through.
out.add(ReferenceCountUtil.retain(msg));
return;
}
if (msg instanceof HttpMessage) {
assert message == null;
message = (HttpMessage) msg;
decodeStarted = false;
cleanup();
}
最後當ChannelPipeline到達後端的業務ChannelHandler時,拿到的訊息是已經解碼後的業務訊息。
編碼的過程也是一樣,提供兩個頂層介面,
1. MessageToMessageEncoder負責把業務訊息轉化成中間訊息
2. MessageToByteEncoder負責把中間訊息/業務訊息轉化成位元組流
編碼過程是一個反向的過程,這裡就不重複展示程式碼了。MessgeToMessageEncoder/Decoder不是必須的元件,可以根據實際情況使用。
ByteToMessageDecoder / MessageToByteEncoder 是必須的元件