【Netty 專欄】深入淺出 Netty write
把資料返回客戶端,需要經歷三個步驟:
1、申請一塊快取buf,寫入資料。
2、將buf儲存到ChannelOutboundBuffer中。
3、將ChannelOutboundBuffer中的buff輸出到socketChannel中。
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ReferenceCountUtil.release(msg); ByteBuf buf1 = ctx.alloc().buffer(4); buf1.writeInt(1); ByteBuf buf2 = ctx.alloc().buffer(4); buf2.writeInt(2); ByteBuf buf3 = ctx.alloc().buffer(4); buf3.writeInt(3); ctx.write(buf1); ctx.write(buf2); ctx.write(buf3); ctx.flush(); }
為什麼需要把buf儲存到ChannelOutboundBuffer?
ctx.write()實現:
//AbstractChannelHandlerContext.java public ChannelFuture write(Object msg) { return write(msg, newPromise()); } private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeWrite(msg, promise); if (flush) { next.invokeFlush(); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, msg, promise); } else { task = WriteTask.newInstance(next, msg, promise); } safeExecute(executor, task, promise, msg); } }
預設情況下,findContextOutbound()會找到pipeline的head節點,觸發write方法。
//HeadContext.java public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); } //AbstractUnsafe public final void write(Object msg, ChannelPromise promise) { ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION); ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } outboundBuffer.addMessage(msg, size, promise); }
outboundBuffer 隨著Unsafe一起例項化,最終將msg通過outboundBuffer封裝起來。
ChannelOutboundBuffer內部維護了一個Entry連結串列,並使用Entry封裝msg。
1、unflushedEntry:指向連結串列頭部
2、tailEntry:指向連結串列尾部
3、totalPendingSize:儲存msg的位元組數
4、unwritable:不可寫標識
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(size, false);
}
通過Entry.newInstance返回Entry例項,Netty對Entry採用了快取策略,使用完的Entry例項需要清空並回收,難道是因為Entry例項化比較耗時?
新的entry預設插入連結串列尾部,並讓tailEntry指向它。
img
Paste_Image.png
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
方法incrementPendingOutboundBytes主要採用CAS更新totalPendingSize欄位,並判斷當前totalPendingSize是否超過閾值writeBufferHighWaterMark,預設是65536。如果totalPendingSize >= 65536,則採用CAS更新unwritable為1,並觸發ChannelWritabilityChanged事件。
到此為止,全部的buf資料已經儲存在outboundBuffer中。
ctx.flush()實現:
public ChannelHandlerContext flush() {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeFlush();
} else {
Runnable task = next.invokeFlushTask;
if (task == null) {
next.invokeFlushTask = task = new Runnable() {
@Override
public void run() {
next.invokeFlush();
}
};
}
safeExecute(executor, task, channel().voidPromise(), null);
}
return this;
}
預設情況下,findContextOutbound()會找到pipeline的head節點,觸發flush方法。
//HeadContext.java
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
//AbstractUnsafe
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
方法addFlush主要對write過程新增的msg進行flush標識,其實我不清楚,這個標識過程有什麼意義。
直接看flush0方法:
protected final void flush0() {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
if (isFlushPending()) {
return;
}
super.flush0();
}
private boolean isFlushPending() {
SelectionKey selectionKey = selectionKey();
return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
1、如果當前selectionKey 是寫事件,說明有執行緒執行flush過程,則直接返回。
2、否則直接執行flush操作。
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, false);
} else {
outboundBuffer.failFlushed(t, true);
}
} finally {
inFlush0 = false;
}
}
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
1、如果當前socketChannel已經關閉,或斷開連線,則執行失敗操作。
2、否則執行doWrite把資料寫入到socketChannel。
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
int size = in.size();
if (size == 0) {
// All written so clear OP_WRITE
clearOpWrite();
break;
}
long writtenBytes = 0;
boolean done = false;
boolean setOpWrite = false;
// Ensure the pending writes are made of ByteBufs only.
ByteBuffer[] nioBuffers = in.nioBuffers();
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
SocketChannel ch = javaChannel();
// Always us nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
super.doWrite(in);
return;
case 1:
// Only one ByteBuf so use non-gathering write
ByteBuffer nioBuffer = nioBuffers[0];
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final int localWrittenBytes = ch.write(nioBuffer);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
break;
default:
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
break;
}
// Release the fully written buffers, and update the indexes of the partially written buffer.
in.removeBytes(writtenBytes);
if (!done) {
// Did not write all buffers completely.
incompleteWrite(setOpWrite);
break;
}
}
}
1、size方法返回outboundBuffer有多少Entry例項。
2、in.nioBuffers()負責把Entry中儲存的ByteBuf型別的msg,重新返回Nio的ByteBuffer例項,並返回ByteBuffer陣列nioBuffers,其實msg和ByteBuffer例項指向的是同一塊記憶體,因為在UnpooledDirectByteBuf實現類中,已經維護了ByteBuffer的例項。
3、socketChannel.write()方法把nioBuffers的資料寫到socket中,這是Nio中的實現。
到此為止,nioBuffers的資料都flush到socket,客戶端可以準備接收了。