Netty學習:ChannelPipeline
一個{@link ChannelHandler}的列表,它處理或攔截{@link Channel}的入站事件和出站操作。
建立管道
每個通道都有自己的管道,在建立新通道時自動建立管道。
事件如何在管道中流動
下圖描述了在{@link ChannelPipeline}中{@link ChannelHandler}s如何處理I/O事件。I/O事件由{@link ChannelInboundHandler}或{@link ChannelOutboundHandler}處理,並通過呼叫{@link ChannelHandlerContext}中定義的事件傳播方法(例如{@link ChannelHandlerContext#fireChannelRead(Object)}和{@link ChannelHandlerContext#write(Object)})轉發到最近的處理程式。
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
入站事件由入站處理程式按照自底向上的方向處理,如圖左側所示。入站處理程式通常處理由圖底部的I/O執行緒生成的入站資料。入站資料通常通過實際輸入操作從遠端對等點讀取,例如{@link SocketChannel#read(ByteBuffer)}。如果入站事件超出了頂級入站處理程式,則會悄無聲息地丟棄該事件,或者在需要您注意時記錄該事件。
出站事件由出站處理程式按照自頂向下的方向處理,如圖右側所示。出站處理程式通常生成或轉換出站通訊流,如寫請求。如果出站事件超出底部出站處理程式,則由與 {@link Channel}關聯的I/O執行緒處理。I/O執行緒通常執行實際的輸出操作,例如{@link SocketChannel#write(ByteBuffer)}。
例如,假設我們建立了以下管道:
* p.addLast("1", new InboundHandlerA());
* p.addLast("2", new InboundHandlerB());
* p.addLast("3", new OutboundHandlerA());
* p.addLast("4", new OutboundHandlerB());
* p.addLast("5", new InboundOutboundHandlerX());
在上面的示例中,名稱以{@code Inbound}開頭的類意味著它是入站處理程式。名稱以{@code Outbound}開頭的類意味著它是一個出站處理程式。
在給定的示例配置中,當事件進入入站時,處理程式的評估順序是1、2、3、4、5。當事件出站時,順序是5,4,3,2,1。在此原則之上,{@link ChannelPipeline}跳過了對某些處理程式的評估,以縮短堆疊深度:
- 3和4沒有實現{@link ChannelInboundHandler},因此入站事件的實際計算順序是:1、2和5。
- 1和2沒有實現{@link ChannelOutboundHandler},因此出站事件的實際計算順序是:5、4和3。
- 如果5同時實現{@link ChannelInboundHandler}和{@link ChannelOutboundHandler},則入站和出站事件的計算順序分別為125和543。
將事件轉發到下一個處理程式
正如您在圖中可能注意到的,處理程式必須呼叫{@link ChannelHandlerContext}中的事件傳播方法,以便將事件轉發給下一個處理程式。這些方法包括:
入站事件傳播方法:
- {@link ChannelHandlerContext#fireChannelRegistered()}
- {@link ChannelHandlerContext#fireChannelActive()}
- {@link ChannelHandlerContext#fireChannelRead(Object)}
- {@link ChannelHandlerContext#fireChannelReadComplete()}
- {@link ChannelHandlerContext#fireExceptionCaught(Throwable)}
- {@link ChannelHandlerContext#fireUserEventTriggered(Object)}
- {@link ChannelHandlerContext#fireChannelWritabilityChanged()}
- {@link ChannelHandlerContext#fireChannelInactive()}
- {@link ChannelHandlerContext#fireChannelUnregistered()}
出站事件傳播方法:
- {@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)}
- {@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)}
- {@link ChannelHandlerContext#write(Object, ChannelPromise)}
- {@link ChannelHandlerContext#flush()}
- {@link ChannelHandlerContext#read()}
- {@link ChannelHandlerContext#disconnect(ChannelPromise)}
- {@link ChannelHandlerContext#close(ChannelPromise)}
- {@link ChannelHandlerContext#deregister(ChannelPromise)}
下面的例子展示了事件傳播通常是如何完成的:
* public class MyInboundHandler extends {@link ChannelInboundHandlerAdapter} {
* {@code @Override}
* public void channelActive({@link ChannelHandlerContext} ctx) {
* System.out.println("Connected!");
* ctx.fireChannelActive();
* }
* }
*
* public class MyOutboundHandler extends {@link ChannelOutboundHandlerAdapter} {
* {@code @Override}
* public void close({@link ChannelHandlerContext} ctx, {@link ChannelPromise} promise) {
* System.out.println("Closing ..");
* ctx.close(promise);
* }
* }
建立一個管道
使用者應該在管道中有一個或多個{@link ChannelHandler}來接收I/O事件(例如讀取)和請求I/O操作(例如寫入和關閉)。例如,一個典型的伺服器在每個通道的管道中都有以下處理程式,但是根據協議和業務邏輯的複雜性和特徵,您的里程可能會有所不同:
- 協議解碼器——將二進位制資料(例如{@link ByteBuf})轉換為Java物件。
- 協議編碼器——將Java物件轉換為二進位制資料。
- 業務邏輯Handler——執行實際的業務邏輯(例如資料庫訪問)。
可以表示為,如下例所示:
* static final {@link EventExecutorGroup} group = new {@link DefaultEventExecutorGroup}(16);
* ...
*
* {@link ChannelPipeline} pipeline = ch.pipeline();
*
* pipeline.addLast("decoder", new MyProtocolDecoder());
* pipeline.addLast("encoder", new MyProtocolEncoder());
//告訴管道在與I/O執行緒不同的執行緒中執行MyBusinessLogicHandler的事件處理程式方法,這樣I/O執行緒就不會被耗時的任務阻塞。
//如果您的業務邏輯是完全非同步的,或者非常快地完成,則不需要指定組。
* pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
執行緒安全
可以在任何時候新增或刪除{@link ChannelHandler},因為{@link ChannelPipeline}是執行緒安全的。例如,您可以在即將交換敏感資訊時插入加密處理程式,並在交換後刪除它。
ChannelPipeline addFirst(String name, ChannelHandler handler);在管道的第一個位置插入一個{@link ChannelHandler}。
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);在管道的第一個位置插入一個{@link ChannelHandler}。{@link EventExecutorGroup}將用於執行{@link ChannelHandler}方法
ChannelPipeline addLast(String name, ChannelHandler handler);在管道的最後一個位置追加一個{@link ChannelHandler}。
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);在管道的最後一個位置追加一個{@link ChannelHandler}。{@link EventExecutorGroup}將用於執行{@link ChannelHandler}方法
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);在該管道的現有handler之前插入{@link ChannelHandler}。
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);在該管道的現有handler之前插入{@link ChannelHandler}。{@link EventExecutorGroup}將用於執行{@link ChannelHandler}方法
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);在該管道的現有處理程式之後插入{@link ChannelHandler}。
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);在該管道的現有處理程式之後插入{@link ChannelHandler}。{@link EventExecutorGroup}將用於執行{@link ChannelHandler}方法
ChannelPipeline addFirst(ChannelHandler... handlers);在管道的第一個位置插入{@link ChannelHandler}s。
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);在管道的第一個位置插入{@link ChannelHandler}s。{@link EventExecutorGroup}將用於執行{@link ChannelHandler}方法
ChannelPipeline addLast(ChannelHandler... handlers);在管道的最後一個位置插入{@link ChannelHandler}s。
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);在管道的最後一個位置插入{@link ChannelHandler}s。{@link EventExecutorGroup}將用於執行{@link ChannelHandler}方法
ChannelPipeline remove(ChannelHandler handler);從該管道中刪除指定的{@link ChannelHandler}。
ChannelHandler remove(String name);從管道中刪除具有指定名稱的{@link ChannelHandler}。
<T extends ChannelHandler> T remove(Class<T> handlerType);從管道中刪除指定型別的{@link ChannelHandler}。
ChannelHandler removeFirst();刪除管道中的第一個{@link ChannelHandler}。
ChannelHandler removeLast();刪除此管道中的最後一個{@link ChannelHandler}。
ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);用管道中的newHandler替換指定的{@link ChannelHandler}。
ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);用管道中的新處理程式替換指定名稱的{@link ChannelHandler}。
<T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
ChannelHandler newHandler);用管道中的新處理程式替換指定型別的{@link ChannelHandler}。
ChannelHandler first();返回此管道中的第一個{@link ChannelHandler}。
ChannelHandlerContext firstContext();返回管道中第一個{@link ChannelHandler}的上下文。
ChannelHandler last();返回此管道中的最後一個{@link ChannelHandler}。
ChannelHandlerContext lastContext();返回此管道中最後一個{@link ChannelHandler}的上下文。
ChannelHandler get(String name);返回管道中指定名稱的{@link ChannelHandler}。
<T extends ChannelHandler> T get(Class<T> handlerType);返回管道中指定型別的{@link ChannelHandler}。
ChannelHandlerContext context(ChannelHandler handler);返回此管道中指定的{@link ChannelHandler}的上下文物件。
ChannelHandlerContext context(String name);返回管道中指定名稱的{@link ChannelHandler}的上下文物件。
ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);返回管道中指定型別的{@link ChannelHandler}的上下文物件。
Channel channel();返回管道所連線的 {@link Channel}
List<String> names();返回handler的名稱LIST
Map<String, ChannelHandler> toMap();將此管道轉換為有序的{@link對映},其鍵為處理程式名稱,其值為處理程式。