1. 程式人生 > >【Netty原始碼分析】傳送資料過程

【Netty原始碼分析】傳送資料過程

future.channel().writeAndFlush("Hello Netty Server ,I am a common client");  
呼叫AbstractChannel的writeAndFlush函式
@Override
public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);
}
@Override
public final ChannelFuture writeAndFlush(Object msg) {
        return tail.writeAndFlush(msg);
}

呼叫AbstractChannelHandlerContext的writeAndFlush函式

@Override
public ChannelFuture writeAndFlush(Object msg) {
        return writeAndFlush(msg, newPromise());
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
	........
	
	write(msg, true, promise);
	
	.......

}
需要注意的一點是,寫資料的過程其實是分為兩步的,第一步是將要寫的資料寫到buffer中,第二步是flush其實就是從buffer中讀取資料然後傳送給服務端。
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }

首先是呼叫write函式,將資料寫到buffer中。

private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

呼叫HeadContext的write函式

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
}
AbstractUnsafe中呼叫write函式,這一步就可以認為將資料寫到buffer中了,接下來buffer的東西我們會分析。
@Override
public final void write(Object msg, ChannelPromise promise) {

	.......
	
    outboundBuffer.addMessage(msg, size, promise);
	
	......
}
接下來是flush過程,將資料寫到服務端
private void invokeFlush0() {
        try {
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }
HeadContext中呼叫flush過程
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
     unsafe.flush();
}

AbstractUnsafe中呼叫flush過程,在這裡我們可以看到之前寫入資料的buffer(outboundBuffer)

@Override
public final void flush() {
   assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
         return;
     }

    outboundBuffer.addFlush();
    flush0();
}
呼叫AbstractNioUnsafe的flush0函式
@Override
protected void flush0() {
		
	........
	 
    doWrite(outboundBuffer);
	
	.......
           
}
AbstractUnsafe中呼叫flush0函式
protected void flush0() {
		
	........
	 
    doWrite(outboundBuffer);
	
	.......
           
}

呼叫NioSocketChannel中的doWrite函式,在doWrite函式中會看到呼叫NIO中的socketChannel中的寫資料操作。

 @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            int size = in.size();
            if (size == 0) {
                // All written so clear OP_WRITE
                clearOpWrite();
                break;
            }
            long writtenBytes = 0;
            boolean done = false;
            boolean setOpWrite = false;

            // Ensure the pending writes are made of ByteBufs only.
            ByteBuffer[] nioBuffers = in.nioBuffers();
            int nioBufferCnt = in.nioBufferCount();
            long expectedWrittenBytes = in.nioBufferSize();
            SocketChannel ch = javaChannel();

            // Always us nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
                case 0:
                    // We have something else beside ByteBuffers to write so fallback to normal writes.
                    super.doWrite(in);
                    return;
                case 1:
                    // Only one ByteBuf so use non-gathering write
                    ByteBuffer nioBuffer = nioBuffers[0];
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final int localWrittenBytes = ch.write(nioBuffer);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
                default:
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
            }

            // Release the fully written buffers, and update the indexes of the partially written buffer.
            in.removeBytes(writtenBytes);

            if (!done) {
                // Did not write all buffers completely.
                incompleteWrite(setOpWrite);
                break;
            }
        }
    }