Netty編碼流程及WriteAndFlush()的實現
編碼器的執行時機
首先, 我們想通過服務端,往客戶端傳送資料, 通常我們會呼叫ctx.writeAndFlush(資料)
的方式, 入參位置的資料可能是基本資料型別,也可能物件
其次,編碼器同樣屬於handler,只不過他是特化的專門用於編碼作用的handler, 在我們的訊息真正寫入jdk底層的ByteBuffer時前,資料需要經過編碼處理, 不是說不進行編碼就傳送不出去,而是不經過編碼,客戶端可能接受到的是亂碼
然後,我們知道,ctx.writeAndFlush(資料)
它其實是出站處理器特有的行為,因此註定了它需要在pipeline中進行傳遞,從哪裡進行傳遞呢? 從tail節點開始,一直傳播到header之前的我們自己新增的自定義的解碼器
WriteAndFlush()
的邏輯
我們跟進原始碼WriteAndFlush()
相對於Write()
,它的flush欄位是true
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { //todo 因為flush 為 true next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); }
於是就會這樣
- 逐個呼叫handler的
write()
- 逐個呼叫handler的
flush()
知道這一點很重要,這意味這我們知道了,事件傳播分成兩波進行, 一波write,一波flush, 這兩波事件傳播的大體流程我寫在這裡, 在下面
write
- 將ByteBuf 轉換成DirctBuffer
- 將訊息(DirctBuffer)封裝進entry 插入寫佇列
- 設定寫狀態
flush
- 重新整理標誌,設定寫狀態
- 變數buffer佇列,過濾Buffer
- 呼叫jdk底層的api,把ByteBuf寫入jdk原生的
ByteBuffer
自定義一個簡單的編碼器
/** * @Author: Changwu * @Date: 2019/7/21 20:49 */ public class MyPersonEncoder extends MessageToByteEncoder<PersonProtocol> { // todo write動作會傳播到 MyPersonEncoder的write方法, 但是我們沒有重寫, 於是就執行 父類 MessageToByteEncoder的write, 我們進去看 @Override protected void encode(ChannelHandlerContext ctx, PersonProtocol msg, ByteBuf out) throws Exception { System.out.println("MyPersonEncoder...."); // 訊息頭 長度 out.writeInt(msg.getLength()); // 訊息體 out.writeBytes(msg.getContent()); } }
選擇繼承MessageToByteEncoder<T>
從訊息到位元組的編碼器
繼續跟進
ok,現在來到了我們自定義的 解碼器MyPersonEncoder
,
但是,並沒看到正在傳播的writeAndFlush()
,沒關係, 我們自己的解碼器繼承了MessageToByteEncoder
,這個父類中實現了writeAndFlush()
,原始碼如下:解析寫在原始碼後面
// todo 看他的write方法
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) {// todo 1 判斷當前是否可以處理這個物件
@SuppressWarnings("unchecked")
I cast = (I) msg;
// todo 2 記憶體分配
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// todo 3 呼叫本類的encode(), 這個方法就是我們自己實現的方法
encode(ctx, cast, buf);
} finally {
// todo 4 釋放
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
// todo 5. 往前傳遞
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
// todo 釋放
buf.release();
}
}
- 將我們傳送的訊息msg,封裝進了 ByteBuf 中
- 編碼: 執行
encode()
方法,這是個抽象方法,由我們自定義的編碼器實現- 我們的實現很簡單,分別往Buf裡面寫入下面兩次資料
- int型別的訊息的長度
- 訊息體
- 我們的實現很簡單,分別往Buf裡面寫入下面兩次資料
- 將msg釋放
- 繼續向前傳遞
write()
事件 - 最終,釋放第一步建立的ByteBuf
小結
到這裡為止,編碼器的執行流程已經完成了,我們可以看到,和解碼器的架構邏輯相似,類似於模板設計模式,對我們來說,只不過是做了個填空題
其實到上面的最後一步 釋放第一步建立的ByteBuf
之前 ,訊息已經被寫到jdk底層的 ByteBuffer 中了,怎麼做的呢? 別忘了它的上一步, 繼續向前傳遞write()
事件,再往前其實就是HeaderContext
了,和HeaderContext
直接關聯的就是unsafe類, 這並不奇怪,我們都知道,netty中無論是客戶端還是服務端channel底層的資料讀寫,都依賴unsafe
下面開始分析,
WriteAndFlush()
底層的兩波任務細節
第一波事件傳遞 write()
我們跟進HenderContext的write()
,而HenderContext的中依賴的是unsafe.wirte()
所以直接去 AbstractChannel
的Unsafe 原始碼如下:
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) { // todo 快取 寫進來的 buffer
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
// todo buffer Dirct化 , (我們檢視 AbstractNioByteBuf的實現)
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
// todo 插入寫佇列 將 msg 插入到 outboundBuffer
// todo outboundBuffer 這個物件是 ChannelOutBoundBuf型別的,它的作用就是起到一個容器的作用
// todo 下面看, 是如何將 msg 新增進 ChannelOutBoundBuf中的
outboundBuffer.addMessage(msg, size, promise);
}
引數位置的msg,就是經過我們自定義解碼器的父類進行包裝了的ByteBuf
型別訊息
這個方法主要做了三件事
- 第一:
filterOutboundMessage(msg);
將ByteBuf轉換成DirctByteBuf
當我們進入檢視他的實現時,idea會提示,它的子類重寫了這個方法, 是誰重寫的呢? 是AbstractNioByteChannel
這個類其實是屬於客戶端陣營的類,和服務端的AbstractNioMessageChannel
相提並論
原始碼如下:
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
- 第二件事: 將轉換後的
DirectBuffer
插入到寫佇列中
什麼是寫佇列 ? 作用是啥?
它其實就是一個netty自定義的容器,使用的單向連結串列的結構,為什麼要有這個容器呢? 回想一下,服務端需要向客戶端傳送訊息,訊息進而被封裝進ByteBuf
,但是呢, 往客戶端寫的方法有兩個
- write()
- writeAndFlush()
這個方法的區別是有的,前者只是進行了寫,(寫到了ByteBuf) 卻沒有將內容重新整理到ByteBuffer
,沒有重新整理到快取中,就沒辦法進一步把它寫入jdk原生的ByteBuffer
中, 而writeAndFlush()
就比較方便,先把msg寫入ByteBuf
,然後直接刷進socket,一套帶走,打完收工
但是如果客戶端偏偏就是不使用writeAndFlush()
,而使用前者,那麼盛放訊息的ByteBuf
被傳遞到handler的最開始的位置,怎麼辦? unsafe也無法把它寫給客戶端, 難道丟棄不成?
於是寫佇列就解決了這個問題,它以連結串列當做資料結構,新傳播過來的ByteBuf
就會被他封裝成一個一個的節點(entry)進行維護,為了區分這個連結串列中,哪個節點是被使用過的,哪個節點是沒有使用過的,他就用三個標記指標進行標記,如下:
- flushedEntry 被重新整理過的entry
- tailEntry 尾節點
- unflushedEntry 未被刷的entry
下面我們看一下,它如何將一個新的節點,新增到寫佇列
addMessage(Object msg, int size, ChannelPromise promise)
新增寫佇列
public void addMessage(Object msg, int size, ChannelPromise promise) {
// todo 將上面的三者封裝成實體
// todo 呼叫工廠方法, 建立 Entry , 在 當前的ChannelOutboundBuffer 中每一個單位都是一個 Entry, 用它進一步包裝 msg
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
// todo 調整三個指標, 去上面檢視這三個指標的定義
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
// todo 跟進這個方法
incrementPendingOutboundBytes(entry.pendingSize, false);
}
看他的原始碼,其實就是簡單的針對連結串列進行插入的操作,尾插入法, 一直往最後的位置插入,連結串列的頭被標記成unflushedEntry
這兩個節點之間entry,表示是可以被flush的節點
在每次新增新的 節點後都呼叫incrementPendingOutboundBytes(entry.pendingSize, false)
方法, 這個方法的作用是設定寫狀態, 設定怎樣的狀態呢? 我們看它的原始碼, 可以看到,它會記錄下累計的ByteBuf
的容量,一旦超出了閾值,就會傳播channel不可寫的事件
- 這也是
write()
的第三件事
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
// todo TOTAL_PENDING_SIZE_UPDATER 當前快取中 存在的代寫的 位元組
// todo 累加
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
// todo 判斷 新的將被寫的 buffer的容量不能超過 getWriteBufferHighWaterMark() 預設是 64*1024 64位元組
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
// todo 超過64 位元組,進入這個方法
setUnwritable(invokeLater);
}
}
小結:
到目前為止,第一波write()
事件已經完成了,我們可以看到了,這個事件的功能就是使用ChannelOutBoundBuf
將write事件傳播過去的單個ByteBuf
維護起來,等待 flush事件的傳播
第二波事件傳遞 flush()
我們重新回到,AbstractChannel
中,看他的第二波flush事件的傳播狀態, 原始碼如下:它也是主要做了下面的三件事
- 新增重新整理標誌,設定寫狀態
- 遍歷buffer佇列,過濾可以flush的buffer
- 呼叫jdk底層的api,進行自旋寫
// todo 最終傳遞到 這裡
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
// todo 新增重新整理標誌, 設定寫狀態
outboundBuffer.addFlush();
// todo 遍歷buffer佇列, 過濾byteBuf
flush0();
}
新增重新整理標誌,設定寫狀態
什麼是新增重新整理標誌呢? 其實就是更改連結串列中的指標位置,三個指標之間的可以完美的把entry
劃分出曾經flush過的和未flush節點
ok,繼續
下面看一下如何設定狀態,addflush() 原始碼如下:
* todo 給 ChannelOutboundBuffer 新增快取, 這意味著, 原來新增進 ChannelOutboundBuffer 中的所有 Entry, 全部會被標記為 flushed 過
*/
public void addFlush() {
// todo 預設讓 entry 指向了 unflushedEntry ==> 其實連結串列中的最左邊的 未被使用過的 entry
// todo
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
// todo 跟進這個方法
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
目標是移動指標,改變每一個節點的狀態, 哪一個指標呢? 是 flushedEntry
, 它指向讀被flush的節點,也就是說,它左邊的,都被處理過了
下面的程式碼,是選出一開始位置, 因為, 如果flushedEntry == null,說明沒有任何一個曾經被flush過的節點,於是就將開始的位置定位到最左邊開始,
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
緊接著一個do-while迴圈,從最後一個被flushedEntry
的地方,到尾部,挨個遍歷每一個節點, 因為這些節點要被flush進快取,我們需要把write時累加的他們的容量減掉, 原始碼如下
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}
// todo 每次 減去 -size
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
// todo 預設 getWriteBufferLowWaterMark() -32kb
// todo newWriteBufferSize<32 就把不可寫狀態改為可寫狀態
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}
同樣是使用原子類做到的這件事, 此外,經過減少的容量,如果小於了32kb就會傳播 channel可寫的事件
遍歷buffer佇列, 過濾byteBuf
這是flush的重頭戲,它實現了將資料寫入socket的操作
我們跟進它的原始碼,doWrite(ChannelOutboundBuffer in)
這是本類AbstractChannel
的抽象方法, 寫如的邏輯方法,被設計成抽象的,具體往那個channel寫,和具體的實現有關, 當前我們想往客戶端寫, 它的實現是AbstractNioByteChannel
,我們進入它的實現,原始碼如下
boolean setOpWrite = false;
// todo 整體是無限迴圈, 過濾ByteBuf
for (;;) {
// todo 獲取第一個 flushedEntity, 這個entity中 有我們需要的 byteBuf
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
if (msg instanceof ByteBuf) {
// todo 第三部分,jdk底層, 進行自旋的寫
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
// todo 當前的 ByteBuf 中,沒有可寫的, 直接remove掉
in.remove();
continue;
}
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
// todo 獲取自旋鎖, netty使用它進行
writeSpinCount = config().getWriteSpinCount();
}
// todo 這個for迴圈是在自旋嘗試往 jdk底層的 ByteBuf寫入資料
for (int i = writeSpinCount - 1; i >= 0; i --) {
// todo 把 對應的 buf , 寫到socket中
// todo localFlushedAmount就是 本次 往jdk底層的 ByteBuffer 中寫入了多少位元組
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
// todo 累加一共寫了多少位元組
flushedAmount += localFlushedAmount;
// todo 如果buf中的資料全部寫完了, 設定完成的狀態, 退出迴圈
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
// todo 自旋結束,寫完了 done = true
if (done) {
// todo 跟進去
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
....
這一段程式碼也是非常長, 它的主要邏輯如下:
通過一個無限迴圈,保證可以拿到所有的節點上的ByteBuf
,通過這個函式獲取節點,Object msg = in.current();
我們進一步看它的實現,如下,它只會取出我們標記的節點
public Object current() {
Entry entry = flushedEntry;
if (entry == null) {
return null;
}
return entry.msg;
}
下一步, 使用jdk的自旋鎖,迴圈16次,嘗試往jdk底層的ByteBuffer中寫資料, 呼叫函式doWriteBytes(buf);
他是本類的抽象方法, 具體的實現是,客戶端chanel的封裝類NioSocketChannel
實現的原始碼如下:
// todo
@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
// todo 將位元組資料, 寫入到 java 原生的 channel中
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
這個readBytes()
依然是抽象方法,因為前面我們曾經把從ByteBuf
轉化成了Dirct型別的, 所以它的實現類是PooledDirctByteBuf
繼續跟進如下: 終於見到了親切的一幕
// todo
@Override
public int readBytes(GatheringByteChannel out, int length) throws IOException {
checkReadableBytes(length);
//todo 關鍵的就是 getBytes() 跟進去
int readBytes = getBytes(readerIndex, out, length, true);
readerIndex += readBytes;
return readBytes;
}
跟進getBytes(){
index = idx(index);
// todo 將netty 的 ByteBuf 塞進 jdk的 ByteBuffer tmpBuf;
tmpBuf.clear().position(index).limit(index + length);
// todo 呼叫jdk的write()方法
return out.write(tmpBuf);
}
此外,被使用過的節點會被remove()掉, 原始碼如下, 也是針對連結串列的操作
private void removeEntry(Entry e) {
if (-- flushed == 0) { // todo 如果是最後一個節點, 把所有的指標全部設為 null
// processed everything
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else { //todo 如果 不是最後一個節點, 把當前節點,移動到最後的 節點
flushedEntry = e.next;
}
}
小結
到這裡, 第二波任務的傳播就完成了
write
- 將buffer 轉換成DirctBuffer
- 將訊息entry 插入寫佇列
- 設定寫狀態
flush
- 重新整理標誌,設定寫狀態
- 變數buffer佇列,過濾Buffer
- 呼叫jdk底層的api,把ByteBuf寫入jdk原生的
ByteBuffer