[Netty Source Code Analysis] How Data Is Sent
阿新 • Published: 2019-02-16
future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
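This calls the writeAndFlush method of AbstractChannel:
@Override
public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);
}
which delegates to the writeAndFlush method of DefaultChannelPipeline, starting at the tail of the pipeline:
@Override
public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}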
This calls the writeAndFlush method of AbstractChannelHandlerContext:
@Override
public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    // ...
    write(msg, true, promise);
    // ...
}
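Note that writing data is really a two-step process: first the data is written into a buffer, and then flush takes the data out of that buffer and sends it to the server. The write(msg, true, promise) call finds the next outbound context and invokes its invokeWriteAndFlush method:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}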
First, the write method is called to put the data into the buffer:
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
This calls the write method of HeadContext:
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}
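The hop from tail.writeAndFlush to HeadContext.write can be pictured with a small model. The following is a minimal sketch in plain Java, not Netty's code: the Ctx class, its outbound flag, and the writePath method are hypothetical names used only for illustration. It assumes, as in Netty, that outbound events travel from the tail toward the head and skip inbound-only handlers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of outbound propagation; these are not Netty's classes. */
final class OutboundPipelineSketch {

    /** Hypothetical stand-in for a handler context in a doubly linked chain. */
    static final class Ctx {
        final String name;
        final boolean outbound; // true if this node handles outbound events
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    /** Links head <-> middle nodes <-> tail and returns the tail. */
    static Ctx link(Ctx head, List<Ctx> middle, Ctx tail) {
        Ctx cur = head;
        for (Ctx c : middle) {
            cur.next = c;
            c.prev = cur;
            cur = c;
        }
        cur.next = tail;
        tail.prev = cur;
        return tail;
    }

    /** Walks from the given node toward the head, recording each outbound node it passes. */
    static List<String> writePath(Ctx from) {
        List<String> visited = new ArrayList<>();
        for (Ctx ctx = from.prev; ctx != null; ctx = ctx.prev) {
            if (ctx.outbound) {
                visited.add(ctx.name);
            }
        }
        return visited;
    }

    /** Builds head <-> decoder <-> encoder <-> tail and returns the path a write takes. */
    static List<String> demo() {
        Ctx head = new Ctx("head", true);        // last stop for a write
        Ctx decoder = new Ctx("decoder", false); // inbound-only, skipped by writes
        Ctx encoder = new Ctx("encoder", true);
        Ctx tail = new Ctx("tail", false);       // where the write starts
        link(head, Arrays.asList(decoder, encoder), tail);
        return writePath(tail);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [encoder, head]
    }
}
```

In this model the write started at the tail passes through the encoder and ends at the head, which is the point where the real HeadContext hands the message to unsafe.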
The write method of AbstractUnsafe is then called. At this point the data can be considered written into the buffer; the buffer itself is analyzed later.
@Override
public final void write(Object msg, ChannelPromise promise) {
    // ...
    outboundBuffer.addMessage(msg, size, promise);
    // ...
}
Next comes the flush step, which writes the data to the server:
private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler) handler()).flush(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
The flush method of HeadContext is called:
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}
The flush method of AbstractUnsafe is called. Here we can see the buffer that the data was written into earlier (outboundBuffer):
@Override
public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    outboundBuffer.addFlush();
    flush0();
}
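The two-step behavior (addMessage during write, then addFlush followed by the actual write during flush) can be illustrated with a simplified model. This is only a sketch under stated assumptions: OutboundBufferSketch, its two queues, and the wire list are hypothetical and do not reproduce how Netty's ChannelOutboundBuffer is implemented internally. The method names addMessage, addFlush, and doWrite are borrowed from the code above purely for orientation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a write buffer with a separate flush step; not Netty's ChannelOutboundBuffer. */
final class OutboundBufferSketch {
    private final Deque<String> unflushed = new ArrayDeque<>(); // written, not yet marked for flushing
    private final Deque<String> flushed = new ArrayDeque<>();   // marked by addFlush(), ready to send
    private final List<String> wire = new ArrayList<>();        // stands in for the socket

    /** Step 1 (write): only queue the message. */
    void addMessage(String msg) {
        unflushed.addLast(msg);
    }

    /** Step 2a (flush): mark everything queued so far as ready to send. */
    void addFlush() {
        while (!unflushed.isEmpty()) {
            flushed.addLast(unflushed.pollFirst());
        }
    }

    /** Step 2b (flush): move the marked messages onto the "socket". */
    void doWrite() {
        while (!flushed.isEmpty()) {
            wire.add(flushed.pollFirst());
        }
    }

    void flush() {
        addFlush();
        doWrite();
    }

    List<String> wire() {
        return new ArrayList<>(wire);
    }

    int pending() {
        return unflushed.size() + flushed.size();
    }

    public static void main(String[] args) {
        OutboundBufferSketch buf = new OutboundBufferSketch();
        buf.addMessage("a");
        buf.addMessage("b");
        System.out.println(buf.wire()); // prints [] because nothing is sent before flush
        buf.flush();
        buf.addMessage("c");
        System.out.println(buf.wire() + " pending=" + buf.pending()); // prints [a, b] pending=1
    }
}
```

The point of the model is that a message written after a flush stays queued until the next flush, which is why a write without a flush sends nothing.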
This calls the flush0 method of AbstractNioUnsafe, which delegates to the superclass implementation:
@Override
protected void flush0() {
    // ...
    super.flush0();
    // ...
}
The flush0 method of AbstractUnsafe is then called:
protected void flush0() {
    // ...
    doWrite(outboundBuffer);
    // ...
}
This calls the doWrite method of NioSocketChannel, where we can see the write operations of the underlying NIO SocketChannel being invoked.
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    for (;;) {
        int size = in.size();
        if (size == 0) {
            // All written so clear OP_WRITE
            clearOpWrite();
            break;
        }
        long writtenBytes = 0;
        boolean done = false;
        boolean setOpWrite = false;
        // Ensure the pending writes are made of ByteBufs only.
        ByteBuffer[] nioBuffers = in.nioBuffers();
        int nioBufferCnt = in.nioBufferCount();
        long expectedWrittenBytes = in.nioBufferSize();
        SocketChannel ch = javaChannel();
        // Always use nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                super.doWrite(in);
                return;
            case 1:
                // Only one ByteBuf so use non-gathering write
                ByteBuffer nioBuffer = nioBuffers[0];
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final int localWrittenBytes = ch.write(nioBuffer);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
            default:
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
        }
        // Release the fully written buffers, and update the indexes of the partially written buffer.
        in.removeBytes(writtenBytes);
        if (!done) {
            // Did not write all buffers completely.
            incompleteWrite(setOpWrite);
            break;
        }
    }
}
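The inner loops in doWrite retry a non-blocking write a bounded number of times and stop early when the channel accepts zero bytes. The sketch below reproduces that pattern with JDK types only. It is an illustration under assumptions, not Netty's implementation: ThrottledChannel, writeWithSpin, and demo are hypothetical names, and the throttled channel merely imitates a socket whose send buffer fills up.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

/** Toy model of a bounded write loop; not Netty's NioSocketChannel. */
final class SpinWriteSketch {

    /** Hypothetical channel: accepts at most `chunk` bytes per call and `capacity` bytes in total. */
    static final class ThrottledChannel implements WritableByteChannel {
        private final int chunk;
        private int capacity;
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();

        ThrottledChannel(int chunk, int capacity) {
            this.chunk = chunk;
            this.capacity = capacity;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(Math.min(chunk, capacity), src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get());
            }
            capacity -= n;
            return n; // 0 models a full socket send buffer
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }
    }

    /** Tries up to spinCount writes; returns true only if the buffer was fully written. */
    static boolean writeWithSpin(WritableByteChannel ch, ByteBuffer buf, int spinCount) throws IOException {
        for (int i = spinCount - 1; i >= 0; i--) {
            int written = ch.write(buf);
            if (written == 0) {
                return false; // channel not writable now; doWrite sets setOpWrite in this case
            }
            if (!buf.hasRemaining()) {
                return true; // everything written
            }
        }
        return false; // spin budget used up with data still pending
    }

    /** Writes the 10-byte message "HelloNetty" and reports the outcome. */
    static String demo(int chunk, int capacity, int spinCount) {
        ByteBuffer buf = ByteBuffer.wrap("HelloNetty".getBytes(StandardCharsets.US_ASCII));
        try {
            boolean done = writeWithSpin(new ThrottledChannel(chunk, capacity), buf, spinCount);
            return "done=" + done + " remaining=" + buf.remaining();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 100, 16)); // prints done=true remaining=0
        System.out.println(demo(4, 100, 2));  // prints done=false remaining=2
        System.out.println(demo(4, 4, 16));   // prints done=false remaining=6
    }
}
```

The second and third cases correspond to the two ways the loop in doWrite can end without finishing: the spin count runs out, or a write returns 0. In both, doWrite calls incompleteWrite so that the remaining data is written later.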