1. 程式人生 > >netty原始碼深入研究(從客戶端入手)第二篇(詳解讀訊息的管道處理流程)

netty原始碼深入研究(從客戶端入手)第二篇(詳解讀訊息的管道處理流程)

上一篇講到netty和伺服器建立連線的所有過程,接著上一篇的結尾,看程式碼

private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

       
        final Channel channel = connectPromise.channel();
      標記:  channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (localAddress == null) {
                    channel.connect(remoteAddress, connectPromise);
                } else {
                    channel.connect(remoteAddress, localAddress, connectPromise);
                }
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        });
    }

看標記處,此處處直接加入佇列一個任務執行連線,好找到我們的管道(channel即是NioSocketChannel),eventLoop()返回的就是NioEventLoopGroup,進入,在它的父類找到這個方法
 @Override
    public void execute(Runnable command) {
        next().execute(command);
    }

上一篇已經講過這個類next()方法獲取的是NioEventLoop,那麼這個類就是負責整個讀寫事件分發的最重要的一個類了,跟進

 @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
//判斷當前處理任務的執行緒是否啟動
        boolean inEventLoop = inEventLoop();
//沒有啟動的話開啟執行緒,啟動的話直接加入任務
        if (inEventLoop) {
            addTask(task);
        } else {
 標記:           startThread();
            addTask(task);
//加入執行緒池關閉了,移除任務,拋拒絕任務的異常
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
//喚醒執行緒繼續處理任務
 if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
看標記處現在最需要關心的就是啟動執行緒了,
private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
//執行緒是否被打斷
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
//更新最後一次啟動的時間
 updateLastExecutionTime(); try {
//執行當前的run方法,進入這個方法
                    SingleThreadEventExecutor.this.run();
                    success = true;

最後在NioEventLoop中發現最終處理任務的方法,如下程式碼
 protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
  標記:                      processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
 標記:                  processSelectedKeys();
 } finally { // Ensure we always run tasks.
 final long ioTime = System.nanoTime() - ioStartTime; 
runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } 
catch (Throwable t) {
 handleLoopException(t); } 
// Always handle shutdown even if the loop processing threw an exception. try { 
if (isShuttingDown()) { 
closeAll(); 
if (confirmShutdown()) { return; } } }
 catch (Throwable t) 
{ handleLoopException(t); } } }

看標記處,其他的方法都是一些無關緊要的判斷,真正開始讀寫訊息了
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
      //selectedKeys集合是第一篇當中連線之前,註冊的一些key值,nio通訊基礎,不熟悉的小夥伴請自行查詢
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
 //死迴圈,不斷監測通道資訊,是否可讀,可寫
 for (;;) { 
final SelectionKey k = i.next(); 
final Object a = k.attachment();
 i.remove();
//a肯定屬於AbstractNioChannel
 if (a instanceof AbstractNioChannel) 
{ 標記:
 processSelectedKey(k, (AbstractNioChannel) a); 
} 
else { 
@SuppressWarnings("unchecked")
 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); 
} 
if (!i.hasNext()) { break; } 
if (needsToSelectAgain) { 
selectAgain(); selectedKeys = selector.selectedKeys(); 
// Create the iterator again to avoid ConcurrentModificationException
 if (selectedKeys.isEmpty()) { break; } 
else {
 i = selectedKeys.iterator(); 
} 
} 
} 
}

不容易啊,最終終於找到最核心的方法
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
               
                return;
            }
          
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
           //已經連線上,nio收到這個通知
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
             
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

           
//已經可以寫的時候收到這個通知
 if ((readyOps & SelectionKey.OP_WRITE) != 0) { 
ch.unsafe().forceFlush(); 
} 
//最後可以讀的時候收到這個通知
 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { 
unsafe.read(); 
} 
} catch (CancelledKeyException ignored) { 
unsafe.close(unsafe.voidPromise()); 
} 
}

終於找到入口點,unsafe又是什麼鬼,曹曹曹.....,我想說整這麼多類真的好嗎,真正這相當於一箇中介啊,搞了半天真正執行方法的是unsafe,什麼鬼啊

 public interface NioUnsafe extends Unsafe {
        /**
         * Return underlying {@link SelectableChannel}
         */
        SelectableChannel ch();

        /**
         * Finish connect
         */
        void finishConnect();

        /**
         * Read from underlying {@link SelectableChannel}
         */
        void read();

        void forceFlush();
    }

一個介面,好,通道里面持有這個介面,再次進入NioSocketChannel

 protected AbstractNioUnsafe newUnsafe() {
        return new NioSocketChannelUnsafe();
    }

最後居然是NioSocketChannelUnsafe類,隱藏夠深的,我只能說寫的真好

真正的讀終於開始了
 public final void read() {
//獲取配置檔案 config = new NioSocketChannelConfig(this, socket.socket());
            final ChannelConfig config = config();
//得到當前的管道
 final ChannelPipeline pipeline = pipeline();
 
//allocator 是AdaptiveRecvByteBufAllocator
//ByteBuffer的封裝類最終靠他解析位元組並返回給客戶端的
 final ByteBufAllocator allocator = config.getAllocator();
//allocHandle 最終是通過FixedRecvByteBufAllocator獲取的
       標記1:     final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
/迴圈開始讀
 try { 
do 
{ 
標記2: byteBuf = allocHandle.allocate(allocator);
/向byteBuf裡填充讀取資料
標記3: allocHandle.lastBytesRead(doReadBytes(byteBuf)); 
if (allocHandle.lastBytesRead() <= 0) 
{ // nothing was read. release the buffer. 
byteBuf.release(); 
byteBuf = null; 
close = allocHandle.lastBytesRead() < 0; 
break; 
} 
allocHandle.incMessagesRead(1);
 readPending = false;
標記:最重要的方法開始分發管道了 pipeline.fireChannelRead(byteBuf);
 byteBuf = null; } 
while (allocHandle.continueReading()); 
allocHandle.readComplete(); 
pipeline.fireChannelReadComplete();
 if (close) { closeOnRead(pipeline); 
} 
} catch (Throwable t) {
 handleReadException(pipeline, byteBuf, t, close, allocHandle);
 } finally { 
if (!readPending && !config.isAutoRead())
 { removeReadOp(); 
} 
}
 } 
}

看標記1怎麼獲取到handler的 ,最終是獲得HandleImpl這個類
public Handle newHandle() {
        return new HandleImpl(bufferSize);
    }

那麼標記2的allocHandle.allocate(allocator);預設申請直接記憶體2048個位元組的ByteBuf ,不瞭解java的nio的請自行百度ByteBuf的用法。

看標記3,我們進入doReadBytes(byteBuf)的方法
  protected int doReadBytes(ByteBuf byteBuf)
 throws Exception {
//媽的剛轉到NioSocketChannel又轉回NioSocketChannelUnsafe
   final RecvByteBufAllocator.Handle allocHandle =unsafe().recvBufAllocHandle();
//byteBuf還有多少空間可寫,即position離limit的距離,寫入HandleImpl的attemptedBytesRead屬性中,做記錄
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
//真正的讀取
 return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); 
}
好了現在該返回上一個方法了,即allocHandle.lastBytesRead(doReadBytes(byteBuf));方法,進入該方法
public final void lastBytesRead(int bytes) {
//最後一次讀取的位元組的個數
            lastBytesRead = bytes;
            if (bytes > 0) {
//總共讀取的位元組數
                totalBytesRead += bytes;
            }
        }

這個方法的主要作用是記錄從流中讀了多少資料,那麼現在思路清晰多了,HandleImpl類就是用來做記錄用的一個輔助類

//如果讀取的的位元組為0的話,那麼釋放byteBuf
 if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        break;
                    }

            
//將讀的次數加1
 allocHandle.incMessagesRead(1); 
readPending = false;
//呼叫通道將讀取的資料傳入
 pipeline.fireChannelRead(byteBuf); 
byteBuf = null;
//如果仍然可讀繼續讀
 } 
while (allocHandle.continueReading()); 
allocHandle.readComplete();
 pipeline.fireChannelReadComplete(); 
if (close) 
{ 
closeOnRead(pipeline); 
}


接下來找到我們的DefaultChannelPipeline類 
  public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }

繼續,AbstractChannelHandlerContext又是什麼鬼呢,還記得上一篇講的嗎,這是個管道封裝類,管道即我們新增的
ch.pipeline().addLast("encoder", new Encoder());
ch.pipeline().addLast("decoder", new Decoder());
ch.pipeline().addLast("handler",new ClientHandler(NettyCore.this));

這些類通通是繼承ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter的類,先看一下怎麼新增的

 private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

以佇列的方式插入到tail的前面和 head的後面,那麼這兩個又是幹啥的呢

 protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

初始化時建立,將tail放在head的後面,這是addFirst方法

private void addFirst0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext nextCtx = head.next;
        newCtx.prev = head;
        newCtx.next = nextCtx;
        head.next = newCtx;
        nextCtx.prev = newCtx;
    }

將newCtx插入到佇列head的後面,繼續

//第一個引數是head頭,第二引數是byteBuf
 static void invokeChannelRead
(final AbstractChannelHandlerContext next, Object msg)
 {
//判斷管道是否可用
 final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); 
EventExecutor executor = next.executor();
 if (executor.inEventLoop()) 
{ next.invokeChannelRead(m); 
} else { 
executor.execute(new Runnable() { 
@Override 
public void run() { 
next.invokeChannelRead(m); } 
}); 
} 
}

饒了這麼一大圈終於到了傳訊息了

private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
     
 標記1:   ((ChannelInboundHandler) handler()).channelRead(this, msg);
 } catch (Throwable t) 
{ notifyHandlerException(t); } }
 else { 標記2: fireChannelRead(msg); } }

進入標記2的方法先從佇列頭開始讀

 public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

findContextInbound又是什麼鬼呢?

private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

重點終於來了

//讀管道我們必須繼承ChannelInboundHandler

 private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }
//寫管道我必須繼承ChannelOutboundHandler
 private static boolean isOutbound(ChannelHandler handler) 
{ return handler instanceof ChannelOutboundHandler; 
}

現在思路已經很清晰了,從head開始呼叫invokeChannelRead-》fireChannelRead-》invokeChannelRead-》呼叫佇列下一個fireChannelRead,以此形

成迴圈,知道呼叫佇列最後一個tail的時候invokeHandler()為true走標記1,此時結束一次讀取。現在整個讀取就形成了一條鏈。

總結:tcp緩衝區一有資料就會發送讀訊號開始讀取,每次讀到資料就開始鏈式分發,交給管道處理,管道處理得到我們想要的資料,直到讀夠資料為止,返回資料給客戶端。