1. 程式人生 > >Java Netty 學習(九) - ChannelPipeline

Java Netty 學習(九) - ChannelPipeline

上一篇文章學習了Channel,它遮蔽了許多底層的java.net.Socket的操作,
那麼,當有了資料流之後,就到了如何處理它的時候,那麼本篇文章先看ChannelPipeline和ChannelHandler。

概念

Netty裡面的ChannelPipeline就類似於一根管道,而ChannelHandler類似於裡面的攔截器,當一個攔截器攔截完後,可以向後傳遞,或者跳過。
ChannelPipeline提供了提供了ChannelHandler鏈上的容器,並定義了用於該鏈的傳播入站和出站的流API。ChannelPipeline持有I/O事件攔截器ChannelHandler的連結串列,由ChannelHandler對I/O事件進行攔截和處理,可以方便地通過新增和刪除ChannelHandler來實現不同的業務邏輯定製

很簡單的理解就是編碼器和解碼器都是ChannelHandler,當資料在網路上傳輸時候,通訊雙方會定義協議和格式,此時到了Channel端,則需要進行解碼,從而再進行業務處理,而處理完,再進行編碼傳送出去。當然這是例子,實際中可以定一個多個ChannelHandler,這些ChannelHandler將以連結串列的形式存在,再進行事件傳遞。

當建立一個新的Channel時,都會分配了一個新的ChannelPipeline,該關聯是永久的,該通道既不能附加另一個ChannelPipeline也不能分離當前的ChannelPipeline
下面分別介紹ChannelPipline

ChannelPipline

ChannelPipline可以理解為管家,對ChannelHandler進行攔截和排程。
當一個訊息被ChannelPipeline的Handler鏈攔截和處理過程是怎樣的呢?
在上文中的HelloClientHandlerchannelActive打一個端點,分析其執行流程
先來分析下:

  1. 啟動Server.java,隨後啟動Client.java
  2. 當Client嘗試連線到Server時,初始化了Channel資訊,可看:Netty的Channel
  3. 隨後,由於HelloClientHandler也是一個Handler,它的呼叫必定經過ChannelPipeline。
  4. 隨後,底層的SocketChannel read()方法讀取ByteBuf,觸發ChannelRead事件
  5. 由IO執行緒EventLoopGroup 分配執行緒作為Selector,等待到了事件變化,並將事件按照類別處理,如下:
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();     //獲取channel的unsafe內部類
        if (!k.isValid()) {             //key可用
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();            //從Channel中獲取對應的EventLoop
            } catch (Throwable ignored) {
            // 如果丟擲異常則直接返回,因為原因可能是還沒有EventLoop
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();    // 獲取Selector的keys
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {  // 可讀並且連線的事件
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();  //先呼叫finishConnect,否則jdk會丟擲NotYetConnectedException
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {  //可讀並且寫事件
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();   //讀事件
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
  1. 此時由於NIO的Keys變化,執行了unsaferead()方法,而在read方法裡面,將呼叫pipelinefireChannelRead方法,將事件流傳遞,如read中下面程式碼:
				int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
  1. 此時訊息已經傳遞到ChannelPipeline,通過fireChannel*方法將訊息向後傳遞向後傳遞,利用ChannelHandlerContext來依次傳遞給channelHandler1,channelHandler2,channelHandler3…

整個read事件就如上。
那麼write事件呢?可以理解為和read相反,訊息從tailHandler開始,途經channelHandlerN……channelHandler1, 最終被新增到訊息傳送緩衝區中等待重新整理和傳送,在此過程中也可以中斷訊息的傳遞,例如當編碼失敗時,就需要中斷流程,構造異常的Future返回等。

事件

事件主要分為inbound和outbound,即入站和出站事件
fireChannelActivefireChannelReadfireChannelReadComplete等則為入站事件,而
writeflushdisconnect則為出戰事件。
看ChannelInbound和ChannelOutbound結構圖:
在這裡插入圖片描述

在這裡插入圖片描述

即一般實現ChannelHandler,繼承相應的裝飾器類Adapter即可,然後重寫需要的方法。

構建ChannelPipeline

ChannelPipline介面提供了ChannelHandler鏈的容器,並定義了用於在該鏈上傳播入站和出戰的事件流API。當Channel被建立時, 它會自動的分配到它專屬的ChannelPipline中。
而且並不用程式設計師去建立一個ChannelPipline,只需要往這個容器中丟東西就好了,記得在BootStrap啟動時:

pipeline = ch.pipeline();
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast(new HelloClientHandler());
pipeline.addLast("encoder", new MyProtocolEncoder());

ChannelPipeline的機制和和Map很相似,以簡直對的方式講ChannelHandler管理起來,增刪改查,但是此時會有問題,類似與Map有ConcurrentHashMap一類併發容器,理論上,ChannelPipeline會有IO執行緒和使用者執行緒之間的併發情況,以及使用者之間的併發情況,那麼ChannelPipeline併發下怎麼解決呢?
在ChannelPipeline中有四個方法:

  • addFirst
  • addBefore
  • addAfter
  • addLast
    通過查DefaultChannelPipeline的原始碼不難發現,這四個方法都使用Synchronized(this)來加鎖,將當前整個ChannelPipeline給鎖起來,這樣依賴就很好的避免了更改Pipeline內部連結串列結構時候出現的併發問題。
    每當新增時候,ChannelPipeline都會呼叫checkDuplicateName(name);進行同名校驗,從表頭迴圈到表尾進行校驗:
    private AbstractChannelHandlerContext context0(String name) {
        AbstractChannelHandlerContext context = head.next;
        while (context != tail) {
            if (context.name().equals(name)) {
                return context;
            }
            context = context.next;
        }
        return null;
    }

DefaultChannelPipeline 是ChannelPipline的預設實現,能夠滿足大多數ChannelPipline的需求,其父類實現了ChannelInboundInvokerChannelOutboundInvoker 用於能夠分別給不同型別的事件傳送通知。
在這裡插入圖片描述

ChannelPipeline中的耗時操作

ChannelPipline中的每一個ChannelHandler都是通過它的EventLoop(IO執行緒)來處理它的事件的。所以不要阻塞這個執行緒,因為會對整體 IO產生負面影響。
但有時可能需要與那些使用阻塞API的遺留程式碼進行互動,對於這種情況,ChannelPipeline有一些接受一個EventExecutorGroupaddFirstaddLastaddBeforeaddAfter 方法,如果一個事件被傳遞給一個自定義的EventExecutorGroup,它將被包含在這個EventExecutorGroupEventExecutor所處理 (類似新開一個執行緒),從而被該Channel本身的EventLoop中移除,對於這種用例,Netty提供一個交DefaultEventExecutorGroup預設實現
當然,在上述四個add*方法也是有Synchronized修飾的

ChannelHandlerContext介面

  • ChannelHandlerContext代表ChannelHandlerChannelPipline之間的關聯,每當有ChannelHandler新增到ChannelPipline中時,
    都會建立ChannelHandlerContext,它的主要功能是管理它所關聯的ChannelHandler和它同一個ChannelPipline中其他ChannelHandler
    之間的互動。
  • ChannelHandlerContext有很多方法,其中一些方法也存在於ChannelChannelPipline本身上,但有一點重要不同,如果呼叫Channel
    或者ChannelPipline上的這些方法,它們將沿著整個ChannelPipline進行傳播。而呼叫位於ChannelHandlerContext上相同方法,
    則講從當前所關聯的ChannelHandler開始,並且只會傳播給位於該ChannelPipline中下一個能夠處理的ChannelHandler
  • ChannelHandlerContextChannelHandler之間的關聯是永遠不變的,所以快取你對他的引用是安全的
  • 相對於其他類的同名方法,ChannelHandlerContext的方法將產生更短的事件流,應該儘可能利用這個特性來獲得最大的效能
    雖然被呼叫的CHannel或者ChannelPipline上的write方法一直傳播事件通過整個ChannelPipline,但是在ChannelHandler的級別上,
    事件從一個ChannelHandler到下一個ChannelHandler的移動是由ChannelHandlerContext上呼叫完成的。

一個ChannelHandler可以從屬於多個ChannelPipline,所以它也可以繫結到多個ChannelHandlerContext例項,對於這種用法,
對應的ChannelHandler必須要使用@Shareable註解標註,否則試圖將它新增多個ChannelPIpline將會觸發異常。顯而易見,為了安全的
備用與多個併發的Channel,這樣的ChannelHandler必須是執行緒安全的。

參考資料:

  1. Netty In Action
  2. Netty 權威指南
  3. Netty 原始碼 4.1.12 Final