1. 程式人生 > 其它 >Netty 原始碼解讀(二)-ChannelPipeline、ChannelHandler、ChannelHandlerContext

Netty 原始碼解讀(二)-ChannelPipeline、ChannelHandler、ChannelHandlerContext

1.ChannelPipeline、ChannelHandler、ChannelHandlerContext 的關係

1. 每建立一個Socket 就會分配一個全新的ChannelPipeline (簡稱pipeline)

2. 每一個 ChannelPipeline 內部包含多個 ChannelHandlerContext (簡稱Context)

3. 他們一起組成了一個雙向連結串列,這些Context 用於封裝我們呼叫addLast 時新增的Channelhandler(以下簡稱Handler)

也就是說ChannelSocket 和 ChannelPipeline 是一對一的關係,而pipeline內部的多個Context 行成了連結串列,Context 只是對Handler 的封裝。

當一個請求進來時會進入socket 對應的pipeline,並經過pipeline 所有的handler。 可以理解為過濾器模式。

2. 設計

1. ChannelPipeline 設計

該介面繼承了ChannelInboundInvoker、 ChannelOutboundInvoker、 Iterable 介面。 標識可以呼叫資料出站的方法和入站的方法, 同時也能遍歷內部的連結串列。

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

處理過程如下:

(1)入站事件由入站處理程式自下而上的方向處理。入站處理程式通常處理由地步的IO執行緒生成入站資料,入站資料通常從SocketChannel#read(ByteBuffer) 獲取

(2) 通常一個pipeline 有多個handler。例如一個典型的伺服器在每個通常的管道中都會有一下處理程式:

協議解碼器-將二進位制資料轉換為Java 物件;

協議編碼器-將java 物件轉換為二進位制資料

業務邏輯處理程式-執行實際業務邏輯

(3) 業務程式不能將執行緒阻塞,會影響IO的速度,進而影響整個Netty 程式的效能。如果業務程式很快,可以放在IO執行緒中,反之就需要非同步執行。 或者在新增handler的時候新增一個執行緒池。

2. ChannelHandler 作用以及設計

(1) 原始碼

public interface ChannelHandler {

    // 當把channelHandler 新增到pipeline 時被呼叫
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    // 當從pipeline 移除時呼叫
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    // 處理髮生異常時呼叫
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}

(2) 作用: 作用是處理IO事件或攔截IO事件,並將其轉發給下一個handler。 handler 處理事件是分入站和出站的(入站是說讀取資料到程式處理的過程,出站是說寫出資料到呼叫核心write方法寫出去資料的過程)。兩個方向的操作都是不同的,因此,netty 定義了兩個子介面繼承ChannelHandler。

入站:ChannelInboundHandler

出站: ChannelOutboundHandler

入站出站都可以處理的handler:ChannelDuplexHandler

public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {

3. ChannelHandlerContext 作用

ChannelHandlerContext 同時繼承了 ChannelInboundInvoker, ChannelOutboundInvoker。ChannelHandlerContext 也 定義了一些自己的方法。這些方法能夠獲取Context 上下文環境的物件,比如channel、executor、handler、pipeline, 記憶體分配器,關聯的handler 是否被刪除等資訊。Context 就是包裝了handler相關的一切,以方便Contex 可以在pipeline 方便的操作handler。

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

如下自身的方法:

3. 建立過程

1. 任何一個ChannelSocket 建立的同時都會建立一個pipeline

io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel) 原始碼如下:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }

io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline:

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

可以看到連結串列有兩個偽節點(頭和尾)。

1》頭節點:

io.netty.channel.DefaultChannelPipeline.TailContext (入站處理handler)

    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, true, false);
            setAddComplete();
        }
。。。
}

2》偽節點:

HeadContext 是一個入站和出站都兼顧的handler

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
。。。
}

  建立是在客戶端建立連線時:

2. 當用戶或系統內部呼叫pipeline的addXX 方法新增handler 時,都會建立一個包裝這handler 的Context

io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler...)

    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

解釋:

1. io.netty.channel.DefaultChannelPipeline#checkMultiplicity 檢查該例項是否是共享的,如果不是並且已經被別的pipeline 使用了,則丟擲異常

2. 呼叫io.netty.channel.DefaultChannelPipeline#newContext 建立一個DefaultChannelHandlerContext

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

3. 呼叫io.netty.channel.DefaultChannelPipeline#addLast0 新增到鏈條尾部

4. 做一些其他處理

4. 呼叫過程

讀的時候從head開始, 寫的時候從tail 開始。

呼叫過程可以用下圖示識:

1. 入站讀取資料追蹤

io.netty.channel.DefaultChannelPipeline#fireChannelRead程式碼如下:

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }

讀取資料時呼叫過程如下:

1》 繼續呼叫io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

2》io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)

    private void invokeChannelRead(Object msg) {
        if (this.invokeHandler()) {
            try {
                ((ChannelInboundHandler)this.handler()).channelRead(this, msg);
            } catch (Throwable var3) {
                this.invokeExceptionCaught(var3);
            }
        } else {
            this.fireChannelRead(msg);
        }

    }

  這時候會呼叫handler的channelRead 方法。也就是具體的handler 的方法。

3》io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead 方法如下: 相當於沒做任何邏輯處理,直接呼叫下一個處理器處理

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ctx.fireChannelRead(msg);
        }

4》io.netty.channel.AbstractChannelHandlerContext#fireChannelRead

    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

io.netty.channel.AbstractChannelHandlerContext#findContextInbound 如下:(可以看到入站是找inbound屬性為true的context,然後繼續進行呼叫)

    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

5》繼續呼叫io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)方法(同1》 一樣)

  也就是如果希望pipeline 中的context 繼續處理,需要在handler中繼續呼叫 ctx.fireXXX 方法,比如io.netty.handler.logging.LoggingHandler#channelRead

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (logger.isEnabled(internalLevel)) {
            logger.log(internalLevel, format(ctx, "READ", msg));
        }
        ctx.fireChannelRead(msg);
    }

2. 出站資料跟蹤

1》io.netty.channel.DefaultChannelPipeline#write(java.lang.Object, io.netty.channel.ChannelPromise)

    public final ChannelFuture write(Object msg, ChannelPromise promise) {
        return tail.write(msg, promise);
    }

2》io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)

    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        write(msg, false, promise);

        return promise;
    }

3》io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");
        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }

        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            final AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            if (!safeExecute(executor, task, promise, m)) {
                // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
                // and put it back in the Recycler for re-use later.
                //
                // See https://github.com/netty/netty/issues/8343.
                task.cancel();
            }
        }
    }

4》io.netty.channel.AbstractChannelHandlerContext#invokeWriteAndFlush

    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }

5》io.netty.channel.AbstractChannelHandlerContext#invokeWrite0 這裡呼叫handler的write 方法

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

6》handler 處理完成如果需要繼續處理呼叫ctx.write(msg, promise); 會重新呼叫到io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)

【當你用心寫完每一篇部落格之後,你會發現它比你用程式碼實現功能更有成就感!】