netty原始碼深入研究(從客戶端入手)第二篇(詳解讀訊息的管道處理流程)
上一篇講到netty和伺服器建立連線的所有過程,接著上一篇的結尾,看程式碼
private static void doConnect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) { final Channel channel = connectPromise.channel(); 標記: channel.eventLoop().execute(new Runnable() { @Override public void run() { if (localAddress == null) { channel.connect(remoteAddress, connectPromise); } else { channel.connect(remoteAddress, localAddress, connectPromise); } connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } }); }
看標記處,此處處直接加入佇列一個任務執行連線,好找到我們的管道(channel即是NioSocketChannel),eventLoop()返回的就是NioEventLoopGroup,進入,在它的父類找到這個方法
@Override
public void execute(Runnable command) {
next().execute(command);
}
上一篇已經講過這個類next()方法獲取的是NioEventLoop,那麼這個類就是負責整個讀寫事件分發的最重要的一個類了,跟進
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } //判斷當前處理任務的執行緒是否啟動 boolean inEventLoop = inEventLoop(); //沒有啟動的話開啟執行緒,啟動的話直接加入任務 if (inEventLoop) { addTask(task); } else { 標記: startThread(); addTask(task); //加入執行緒池關閉了,移除任務,拋拒絕任務的異常 if (isShutdown() && removeTask(task)) { reject(); } } //喚醒執行緒繼續處理任務 if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } } 看標記處現在最需要關心的就是啟動執行緒了, private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); //執行緒是否被打斷 if (interrupted) { thread.interrupt(); } boolean success = false; //更新最後一次啟動的時間 updateLastExecutionTime(); try { //執行當前的run方法,進入這個方法 SingleThreadEventExecutor.this.run(); success = true;
最後在NioEventLoop中發現最終處理任務的方法,如下程式碼
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: // fallthrough } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { 標記: processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { 標記: processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
看標記處,其他的方法都是一些無關緊要的判斷,真正開始讀寫訊息了
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
//selectedKeys集合是第一篇當中連線之前,註冊的一些key值,nio通訊基礎,不熟悉的小夥伴請自行查詢
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
//死迴圈,不斷監測通道資訊,是否可讀,可寫
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
//a肯定屬於AbstractNioChannel
if (a instanceof AbstractNioChannel)
{ 標記:
processSelectedKey(k, (AbstractNioChannel) a);
}
else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task);
}
if (!i.hasNext()) { break; }
if (needsToSelectAgain) {
selectAgain(); selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) { break; }
else {
i = selectedKeys.iterator();
}
}
}
}
不容易啊,最終終於找到最核心的方法
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
//已經連線上,nio收到這個通知
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//已經可以寫的時候收到這個通知
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//最後可以讀的時候收到這個通知
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
終於找到入口點,unsafe又是什麼鬼,曹曹曹.....,我想說整這麼多類真的好嗎,真正這相當於一箇中介啊,搞了半天真正執行方法的是unsafe,什麼鬼啊
public interface NioUnsafe extends Unsafe {
/**
* Return underlying {@link SelectableChannel}
*/
SelectableChannel ch();
/**
* Finish connect
*/
void finishConnect();
/**
* Read from underlying {@link SelectableChannel}
*/
void read();
void forceFlush();
}
一個介面,好,通道里面持有這個介面,再次進入NioSocketChannel
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
最後居然是NioSocketChannelUnsafe類,隱藏夠深的,我只能說寫的真好
真正的讀終於開始了 public final void read() {
//獲取配置檔案 config = new NioSocketChannelConfig(this, socket.socket());
final ChannelConfig config = config();
//得到當前的管道
final ChannelPipeline pipeline = pipeline();
//allocator 是AdaptiveRecvByteBufAllocator
//ByteBuffer的封裝類最終靠他解析位元組並返回給客戶端的
final ByteBufAllocator allocator = config.getAllocator();
//allocHandle 最終是通過FixedRecvByteBufAllocator獲取的
標記1: final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
/迴圈開始讀
try {
do
{
標記2: byteBuf = allocHandle.allocate(allocator);
/向byteBuf裡填充讀取資料
標記3: allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0)
{ // nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
標記:最重要的方法開始分發管道了 pipeline.fireChannelRead(byteBuf);
byteBuf = null; }
while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) { closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead())
{ removeReadOp();
}
}
}
}
看標記1怎麼獲取到handler的 ,最終是獲得HandleImpl這個類
public Handle newHandle() {
return new HandleImpl(bufferSize);
}
那麼標記2的allocHandle.allocate(allocator);預設申請直接記憶體2048個位元組的ByteBuf ,不瞭解java的nio的請自行百度ByteBuf的用法。
看標記3,我們進入doReadBytes(byteBuf)的方法 protected int doReadBytes(ByteBuf byteBuf)
throws Exception {
//媽的剛轉到NioSocketChannel又轉回NioSocketChannelUnsafe
final RecvByteBufAllocator.Handle allocHandle =unsafe().recvBufAllocHandle();
//byteBuf還有多少空間可寫,即position離limit的距離,寫入HandleImpl的attemptedBytesRead屬性中,做記錄
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
//真正的讀取
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
好了現在該返回上一個方法了,即allocHandle.lastBytesRead(doReadBytes(byteBuf));方法,進入該方法
public final void lastBytesRead(int bytes) {
//最後一次讀取的位元組的個數
lastBytesRead = bytes;
if (bytes > 0) {
//總共讀取的位元組數
totalBytesRead += bytes;
}
}
這個方法的主要作用是記錄從流中讀了多少資料,那麼現在思路清晰多了,HandleImpl類就是用來做記錄用的一個輔助類
//如果讀取的的位元組為0的話,那麼釋放byteBuf
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
//將讀的次數加1
allocHandle.incMessagesRead(1);
readPending = false;
//呼叫通道將讀取的資料傳入
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
//如果仍然可讀繼續讀
}
while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close)
{
closeOnRead(pipeline);
}
接下來找到我們的DefaultChannelPipeline類
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
繼續,AbstractChannelHandlerContext又是什麼鬼呢,還記得上一篇講的嗎,這是個管道封裝類,管道即我們新增的
ch.pipeline().addLast("encoder", new Encoder());
ch.pipeline().addLast("decoder", new Decoder());
ch.pipeline().addLast("handler",new ClientHandler(NettyCore.this));
這些類通通是繼承ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter的類,先看一下怎麼新增的
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
以佇列的方式插入到tail的前面和 head的後面,那麼這兩個又是幹啥的呢
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
初始化時建立,將tail放在head的後面,這是addFirst方法
private void addFirst0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext nextCtx = head.next;
newCtx.prev = head;
newCtx.next = nextCtx;
head.next = newCtx;
nextCtx.prev = newCtx;
}
將newCtx插入到佇列head的後面,繼續
//第一個引數是head頭,第二引數是byteBuf
static void invokeChannelRead
(final AbstractChannelHandlerContext next, Object msg)
{
//判斷管道是否可用
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop())
{ next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m); }
});
}
}
饒了這麼一大圈終於到了傳訊息了
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
標記1: ((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t)
{ notifyHandlerException(t); } }
else { 標記2: fireChannelRead(msg); } }
進入標記2的方法先從佇列頭開始讀
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
findContextInbound又是什麼鬼呢?
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
重點終於來了
//讀管道我們必須繼承ChannelInboundHandler
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
//寫管道我必須繼承ChannelOutboundHandler
private static boolean isOutbound(ChannelHandler handler)
{ return handler instanceof ChannelOutboundHandler;
}
現在思路已經很清晰了,從head開始呼叫invokeChannelRead-》fireChannelRead-》invokeChannelRead-》呼叫佇列下一個fireChannelRead,以此形
成迴圈,知道呼叫佇列最後一個tail的時候invokeHandler()為true走標記1,此時結束一次讀取。現在整個讀取就形成了一條鏈。
總結:tcp緩衝區一有資料就會發送讀訊號開始讀取,每次讀到資料就開始鏈式分發,交給管道處理,管道處理得到我們想要的資料,直到讀夠資料為止,返回資料給客戶端。