Netty原始碼解析 -- ChannelOutboundBuffer實現與Flush過程
阿新 • • 發佈:2020-11-09
前面文章說了,ChannelHandlerContext#write只是將資料快取到ChannelOutboundBuffer,等到ChannelHandlerContext#flush時,再將ChannelOutboundBuffer快取的資料寫到Channel中。
本文分享Netty中ChannelOutboundBuffer的實現以及Flush過程。
**原始碼分析基於Netty 4.1**
每個Channel的AbstractUnsafe#outboundBuffer 都維護了一個ChannelOutboundBuffer。
ChannelOutboundBuffer,出站資料緩衝區,負責快取ChannelHandlerContext#write的資料。通過連結串列管理資料,連結串列節點為內部類Entry。
關鍵欄位如下
```
Entry tailEntry; // 連結串列最後一個節點,新增的節點新增其後。
Entry unflushedEntry; // 連結串列中第一個未重新整理的節點
Entry flushedEntry; // 連結串列中第一個已重新整理但資料未寫入的節點
int flushed; // 已重新整理但資料未寫入的節點數
```
ChannelHandlerContext#flush操作前,需要先重新整理一遍待處理的節點(主要是統計本次ChannelHandlerContext#flush操作可以寫入多少個節點資料),從unflushedEntry開始。重新整理完成後使用flushedEntry標誌第一個待寫入的節點,flushed為待寫入節點數。
前面分享Netty讀寫過程的文章說過,AbstractUnsafe#write處理寫操作時,會呼叫ChannelOutboundBuffer#addMessage將資料快取起來
```
public void addMessage(Object msg, int size, ChannelPromise promise) {
// #1
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
incrementPendingOutboundBytes(entry.pendingSize, false);
}
```
`#1` 構建一個Entry,注意,這裡使用了物件池RECYCLER,後面有文章詳細解析。
主要是更新tailEntry和unflushedEntry
`#2` 如果當前快取數量超過閥值WriteBufferWaterMark#high,更新unwritable標誌為true,並觸發`pipeline.fireChannelWritabilityChanged()`方法。
由於ChannelOutboundBuffer連結串列沒有大小限制,不斷累積資料可能導致 OOM,
為了避免這個問題,我們可以在unwritable標誌為true時,不再繼續快取資料。
Netty只會更新unwritable標誌,並不阻止資料快取,我們可以根據需要實現該功能。示例如下
```
if (ctx.channel().isActive() && ctx.channel().isWritable()) {
ctx.writeAndFlush(responseMessage);
} else {
...
}
```
addFlush方法負責重新整理節點(ChannelHandlerContext#flush操作前呼叫該方法統計可寫入節點資料數)
```
public void addFlush() {
// #1
Entry entry = unflushedEntry;
if (entry != null) {
// #2
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
// #3
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
// #4
} while (entry != null);
// All flushed so reset unflushedEntry
// #5
unflushedEntry = null;
}
}
```
`#1` 從unflushedEntry節點開始處理
`#2` 賦值flushedEntry為unflushedEntry。
ChannelHandlerContext#flush寫入完成後會置空flushedEntry
`#3` 增加flushed
設定節點的ChannelPromise不可取消
`#4` 從unflushedEntry開始,遍歷後面節點
`#5` 置空unflushedEntry,表示當前所有節點都已重新整理。
nioBuffers方法負責將當前快取的ByteBuf轉發為(jvm)ByteBuffer
```
public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
assert maxCount > 0;
assert maxBytes > 0;
long nioBufferSize = 0;
int nioBufferCount = 0;
// #1
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
Entry entry = flushedEntry;
while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
if (!entry.cancelled) {
ByteBuf buf = (ByteBuf) entry.msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes > 0) {
// #2
if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
break;
}
nioBufferSize += readableBytes;
// #3
int count = entry.count;
if (count == -1) {
//noinspection ConstantValueVariableUse
entry.count = count = buf.nioBufferCount();
}
int neededSpace = min(maxCount, nioBufferCount + count);
if (neededSpace > nioBuffers.length) {
nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
NIO_BUFFERS.set(threadLocalMap, nioBuffers);
}
// #4
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
// derived buffer
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
nioBuffers[nioBufferCount++] = nioBuf;
} else {
...
}
if (nioBufferCount == maxCount) {
break;
}
}
}
entry = entry.next;
}
this.nioBufferCount = nioBufferCount;
this.nioBufferSize = nioBufferSize;
return nioBuffers;
}
```
`#1` 從執行緒快取中獲取nioBuffers變數,這樣可以避免反覆構造ByteBuffer陣列的效能損耗
`#2` maxBytes,即本次操作最大的位元組數。
`maxBytes - readableBytes < nioBufferSize`,表示如果本次操作後將超出maxBytes,退出
`#3`
buf.nioBufferCount(),獲取ByteBuffer數量,CompositeByteBuf可能有多個ByteBuffer組成。
neededSpace,即nioBuffers陣列中ByteBuffer數量,nioBuffers長度不夠時需要擴容。
`#4`
`buf.internalNioBuffer(readerIndex, readableBytes)`,使用readerIndex, readableBytes構造一個ByteBuffer。
這裡涉及ByteBuf相關知識,後面有文章詳細解析。
ChannelHandlerContext#flush完成後,需要移除對應的快取節點。
```
public void removeBytes(long writtenBytes) {
for (;;) {
// #1
Object msg = current();
if (!(msg instanceof ByteBuf)) {
assert writtenBytes == 0;
break;
}
final ByteBuf buf = (ByteBuf) msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
// #2
if (readableBytes <= writtenBytes) {
if (writtenBytes != 0) {
progress(readableBytes);
writtenBytes -= readableBytes;
}
remove();
} else { // readableBytes > writtenBytes
// #3
if (writtenBytes != 0) {
buf.readerIndex(readerIndex + (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
clearNioBuffers();
}
```
`#1`
current方法返回flushedEntry節點快取資料。
結果null時,退出迴圈
`#2` 當前節點的資料已經全部寫入,
progress方法喚醒資料節點上ChannelProgressivePromise的監聽者
writtenBytes減去對應位元組數
remove()方法移除節點,釋放ByteBuf,flushedEntry標誌後移。
`#3` 當前節點的資料部分寫入,它應該是本次ChannelHandlerContext#flush操作的最後一個節點
更新ByteBuf的readerIndex,下次從這裡開始讀取資料。
退出
移除資料節點
```
public boolean remove() {
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
// #1
removeEntry(e);
if (!e.cancelled) {
// only release message, notify and decrement if it was not canceled before.
// #2
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
decrementPendingOutboundBytes(size, false, true);
}
// recycle the entry
// #3
e.recycle();
return true;
}
```
`#1`
flushed減1
當flushed為0時,flushedEntry賦值為null,否則flushedEntry指向後一個節點。
`#2` 釋放ByteBuf
`#3` 當前節點返回物件池中,以便複用。
下面來看一下ChannelHandlerContext#flush操作過程。
ChannelHandlerContext#flush -> HeadContext#flush -> AbstractUnsafe#flush
```
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
// #1
outboundBuffer.addFlush();
// #2
flush0();
}
```
`#1` 重新整理outboundBuffer中資料節點
`#2` 寫入操作
flush -> NioSocketChannel#doWrite
```
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
// #1
if (in.isEmpty()) {
clearOpWrite();
return;
}
// #2
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
// #3
writeSpinCount -= doWrite0(in);
break;
case 1: {
// #4
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
// #5
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
// #6
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
// #7
...
}
}
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
```
`#1` 通過ChannelOutboundBuffer#flushed判斷是否沒有資料可以寫,沒有資料則清除關注事件OP_WRITE,直接返回。
`#2` 獲取ChannelOutboundBuffer中ByteBuf維護的(jvm)ByteBuffer,並統計nioBufferSize,nioBufferCount。
`#3` 這時沒有ByteBuffer,但是可能有其他型別的資料(如FileRegion型別),呼叫doWrite0繼續處理,這裡不再深入
`#4` 只有一個ByteBuffer,呼叫SocketChannel#write將資料寫入Channel。
`#5` 如果寫入資料數量小於等於0,說明資料沒有被寫出去(可能是因為套接字的緩衝區滿了等原因),那麼就需要關注該Channel上的OP_WRITE事件,方便下次EventLoop將Channel輪詢出來的時候,能繼續寫資料。
`#6` 移除ChannelOutboundBuffer快取資料節點。
`#7` 有多個ByteBuffer,呼叫`SocketChannel#write(ByteBuffer[] srcs, int offset, int length)`,批量寫入,與上一種情況處理類似
回顧之前文章《事件迴圈機制實現原理》中對NioEventLoop#processSelectedKey方法的解析
```
...
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
```
這裡會呼叫forceFlush方法,再次寫入資料。
**FlushConsolidationHandler**
ChannelHandlerContext#flush是很昂貴的操作,可能觸發系統呼叫,但資料又不能快取太久,使用FlushConsolidationHandler可以儘量達到寫入延遲與吞吐量之間的權衡。
FlushConsolidationHandler中維護了explicitFlushAfterFlushes變數,
在ChannelOutboundHandler#channelRead中呼叫flush,如果呼叫次數小於explicitFlushAfterFlushes, 會攔截flush操作不執行。
在channelReadComplete後呼叫flush,則不會攔截flush操作。
本文涉及ByteBuf元件,它是Netty中的記憶體緩衝區,後面有文章解析。
如果您覺得本文不錯,歡迎關注我的微信公眾號,系列文章持續更新中。您的關注是我堅持的動力!
![](https://upload-images.jianshu.io/upload_images/3804367-a3375313d1db6207.png?imageMogr2/auto-orient/strip%7CimageView2/2/