1. 程式人生 > 其它 >netty原始碼之ChannelPipeline

netty原始碼之ChannelPipeline

技術標籤:nettynettychannelpipelinepipeline

netty原始碼之ChannelPipeline

Netty中ChannelPipeline和ChannelHandler機制類似於Servlet和Filter過濾器,實際上都是職責鏈模式的一種變形,主要是為了方便事件的攔截和使用者業務邏輯的定製。

Netty的Channel過濾器實現原理與Servlet的Filter機制一致,它將Channel的資料管道抽象為ChannelPipeline,訊息在ChannelPipeline中流動和傳遞。ChannelPipeline持有IO事件攔截器ChannelHandler的連結串列,由ChannelHandler對IO事件進行攔截和處理,可以方便地通過新增和刪除ChannelHandler來實現不同的業務邏輯定製,不需要對已有的ChannelHandler進行修改,能夠實現對修改封閉和對擴充套件的支援(開閉原則)。

ChannelPipeline的結構

ChannelPipeline是ChannelHandler的容器,它負責ChannelHandler的管理和事件攔截與排程。

ChannelPipeline底層使用了一個雙向連結串列來儲存ChannelHandler,但並不是直接儲存的ChannelHandler,而是ChannelHandlerContext,在ChannelHandlerContext可以直接獲取到與之對應的ChannelHandler、ChannelPipeline、Channel。

在這裡插入圖片描述

在使用Netty時,我們並不需要手動建立ChannelPipeline,因為使用ServerBootstrap或者Bootstrap啟動服務端或者客戶端時,Netty會為每個Channel連線建立一個獨立的ChannelPipeline。對於使用者而言,只需要將自定義的攔截器加入到ChannelPipeline中即可。

ChannelPipeline支援執行態動態的新增或者刪除ChannelHandler,在某些場景下這個特性非常實用。例如當業務高峰期需要對系統做擁塞保護時,就可以根據當前的系統時間進行判斷,如果處於業務高峰期,則動態地將系統擁塞保護ChannelHandler新增到當前的ChannelPipeline中,當高峰期過去之後,就可以動態刪除擁塞保護ChannelHandler了。

ChannelPipeline是執行緒安全的,這意味著N個業務執行緒可以併發地操作ChannelPipeline而不存在多執行緒併發問題。但是,ChannelHandler卻不是執行緒安全的,這意味著儘管ChannelPipeline是執行緒安全的,但是使用者仍然需要自己保證ChannelHandler的執行緒安全。

怎麼保證ChannelHandler的執行緒安全?

  1. 每個Channel儘量保證都擁有自己的Handler,而不是Handler在多個Channel之間共享,也就是下面的程式碼不推薦使用,這麼寫那麼這個ServerHandler必須用註解@ChannelHandler.Sharable明確表明這是一個共享的handler,而且是執行緒安全的。
ServerBootstrap b = new ServerBootstrap();
ServerHandler serverHandler = new ServerHandler();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(serverHandler);
            }
        });

而是推薦這麼使用:ch.pipeline().addLast(new ServerHandler());

  1. 儘量不使用共享資源,如果使用了共享資源,在操作時需要對共享資源進行同步,如加鎖。

ChannelPipeline的事件處理

Netty中的事件分為inbound(入站)事件和outbound(出站)事件。

inbound事件通常由IO執行緒觸發,例如TCP鏈路建立事件、鏈路關閉事件、讀事件、異常通知事件等。觸發inbound事件的方法如下:

  • ChannelHandlerContext.fireChannelRegistered(): Channel註冊事件。
  • ChannelHandlerContext.fireChannelActive():TCP鏈路建立成功,Channel啟用事件。
  • ChannelHandlerContext.fireChannelRead(Object):讀事件。
  • ChannelHandlerContext.fireChannelReadComplete():讀操作完成通知事件。
  • ChannelHandlerContext.fireExceptionCaught(Throwable):異常通知事件。
  • ChannelHandlerContext.fireUserEventTriggered(Object):使用者自定義事件。
  • ChannelHandlerContext.fireChannelWritabilityChanged():Channel的可寫狀態變化通知事件。
  • ChannelHandlerContext.fireChannelInactive():TCP連線關閉,鏈路不可用通知事件。

outbound事件通常是由使用者主動發起的網路IO操作,例如使用者發起的連線操作、繫結操作、訊息傳送等操作。觸發outbound事件的方法如下:

  • ChannelHandlerContext.bind(Socket.Address, ChannelPromise):繫結本地地址事件。
  • ChannelHandlerContext.connect(SocketAddress,SocketAddress, ChannelPromise):連線服務端事件。
  • ChannelHandlerContext.write(Object, ChannelPromise):傳送事件。
  • ChannelHandlerContext.flush():重新整理事件。
  • ChannelHandlerContext.read():讀事件。
  • ChannelHandlerContext.disconnect(ChannelPromise):斷開連線事件。
  • ChannelHandlerContext.close(ChannelPromise):關閉當前Channel事件。

DefaultChannelPipeline

ChannelPipeline的實現其實只有一個DefaultChannelPipeline。

DefaultChannelPipeline對ChannelHandler的管理

ChannelPipeline是ChannelHandler的管理容器,必然要負責ChannelHandler的查詢、新增、替換和刪除,所以它裡面的方法很多都是和管理ChannelHandler有關的,比如addXXX、removeXXX、replaceXXX等。

ChannelPipeline內部維護了一個ChannelHandler的連結串列和迭代器,所以從本質上來說,上述的管理操作基本上就是對連結串列的操作。但是ChannelHandler本身並不是連結串列上的元素,所以在具體實現上是把ChannelHandler包裝進ChannelHandlerContext的例項DefaultChannelHandlerContext,然後把ChannelHandlerContext作為元素來組成連結串列。

下面是DefaultChannelPipeline.addLast()方法的原始碼:

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler); // 檢查handler是否是共享的,如果是,必須加@Sharable註解,否則會丟擲異常

        // filterName()方法會檢查handler是否已經存在
        // 建立DefaultChannelHandlerContext
        newCtx = newContext(group, filterName(name, handler), handler);

        // 新增至連結串列尾部
        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    // 回撥handler.handlerAdd()方法
    callHandlerAdded0(newCtx);
    return this;
}

由於ChannelPipeline支援執行期動態修改,因此存在兩種潛在的多執行緒併發訪問場景。

  1. IO執行緒和使用者業務執行緒的併發訪問;
  2. 使用者多個執行緒之間的併發訪問。

為了保證ChannelPipeline的執行緒安全性,需要通過執行緒安全容器或者鎖來保證併發訪問的安全,此處Netty直接使用了synchronized關鍵字,保證同步塊內的所有操作的原子性。在加入連結串列之前,Netty還會檢查該Handler是否已經被新增過及其名字是否有重複,如果該Handler不是共享的,而且被新增過丟擲異常,如果名字重複,也會丟擲異常。

完成連結串列操作後,後面的部分基本上做的就是一件事,執行方法callHandlerAdded0,只是根據條件不同進行同步或者非同步執行,callHandlerAdded0的主要作用是在handler新增被加入連結串列之後做一些額外工作,Netty本身對這個方法的實現,在ChannelHandlerAdapter類中,是個空操作,這裡如果我們需要做一些業務邏輯, 可以通過重寫該方法進行實現。

callHandlerAddedInEventLoop和callHandlerAdded0的區別:

  • callHandlerAdded0直接使用當前執行緒來回調handler.handlerAdd()方法。
  • callHandlerAddedInEventLoop是使用context中所攜帶的執行緒池來回調handler.handlerAdd()方法。

也就說如果addLast()方法是由內部EventLocp中的執行緒呼叫,那麼就會直接回調handler.handlerAdd()方法,如果是外部使用者執行緒呼叫,那麼就會放入到當前EventLocp的任務佇列中,等待其中的執行緒進行回撥。

inbound事件

當發生某個I/O事件的時候,例如鏈路建立、鏈路關閉、讀取操作完成等,都會產生一個事件,事件在pipeline中得到傳播和處理,它是事件處理的總入口。由於網路IO相關的事件有限,因此Netty對這些事件進行了統一抽象,Netty自身和使用者的ChannelHandler會對感興趣的事件進行攔截和處理。

pipeline中以fireXXX命名的方法都是從IO執行緒流向使用者業務Handler的inbound事件,它們的實現因功能而異,但是處理步驟類似,總結如下。

  1. 呼叫HeadHandler對應的fireXXX方法;
  2. 執行事件相關的邏輯操作。

inbound事件對應的方法如下:

ChannelPipeline fireChannelRegistered();

ChannelPipeline fireChannelUnregistered();

ChannelPipeline fireChannelActive();

ChannelPipeline fireChannelInactive();

ChannelPipeline fireExceptionCaught(Throwable cause);

ChannelPipeline fireUserEventTriggered(Object event);

ChannelPipeline fireChannelRead(Object msg);

ChannelPipeline fireChannelReadComplete();

ChannelPipeline fireChannelWritabilityChanged();

來看一下fireChannelActive()方法的原始碼:

public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

可見pipeline的fireChannelActive對應的方法都是委託雙向連結中的head.invokeChannelActive()進行處理,實際上會觸發對應handler.channelActive(),其他方法也類似。

outbound事件

由使用者執行緒或者程式碼發起的IO操作被稱為outbound事件,事實上inbound和outbound是Netty自身根據事件在pipeline中的流向抽象出來的術語,在其他NIO框架中並沒有這個概念。outbound事件包括髮起繫結、發起連線、發起讀寫、重新整理資料、發起關閉、發起斷連等等。

Pipeline本身並不直接進行IO操作,最終都是由Unsafe和Channel來實現真正的IO操作的。Pipeline負責將IO事件通過TailHandler進行排程和傳播,最終呼叫Unsafe的IO方法進行I/O操作。

outbound事件對應的方法如下:

ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);

ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);

ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

ChannelFuture disconnect(ChannelPromise promise);

ChannelFuture close(ChannelPromise promise);

ChannelFuture deregister(ChannelPromise promise);

ChannelOutboundInvoker read();

ChannelFuture write(Object msg);

ChannelFuture write(Object msg, ChannelPromise promise);

ChannelOutboundInvoker flush();

ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

ChannelFuture writeAndFlush(Object msg);

來看一下write()方法的原始碼:

@Override
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}

可見pipeline的出站對應的方法都是委託雙向連結串列中的tail進行處理。