Netty原始碼分析之ChannelPipeline(一)—ChannelPipeline的構造與初始化
Netty中ChannelPipeline實際上類似與一條資料管道,負責傳遞Channel中讀取的訊息,它本質上是基於責任鏈模式的設計與實現,無論是IO事件的攔截器,還是使用者自定義的ChannelHandler業務邏輯都做為一個個節點被新增到任務鏈上。
一、ChannelPipeline的設計與構成
ChannelPipeline中做為Netty中的資料管道,作用就是通過控制與聯通不同的ChannelHandler,傳遞Channel中的訊息。每一個Channel,都對應一個ChannelPipeline作為ChannelHandler的容器,而ChannelHandlerContext則把ChannelHandler的封裝成每個節點,以雙向連結串列方式在容器中存在;我們可以通過下圖簡單看下它們之間的關係。
1、ChannelHandler
使用過Netty的朋友們都清楚,ChannelHandler就是做為攔截器和業務處理邏輯的存在,它會處理Channel中讀寫的訊息;
首先看下ChannelHandler介面的定義
public interface ChannelHandler { //當ChannelHandler新增到Pipeline中時呼叫 void handlerAdded(ChannelHandlerContext ctx) throws Exception; //當ChannelHandler在Pipeline中被移除時呼叫 void handlerRemoved(ChannelHandlerContext ctx) throws Exception; //在執行過程中 發生異常時呼叫 @Deprecated void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; @Inherited @Documented @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @interface Sharable { // no value } }
ChannelHandler在處理或攔截IO操作時,分為出站和入站兩個方向,對應channelde讀寫兩個操作,所以Netty中又從ChannelHandler中派生出入站ChannelInboundHandler和出站ChannelOutboundHandler兩個介面
ChannelInboundHandler處理入站資料以及各種狀態的變化,下面列出了ChannelInboundHandler中資料被接收或者Channel狀態發生變化時被呼叫的方法,這些方法和Channel的生命週期密切相關
public interface ChannelInboundHandler extends ChannelHandler { //當Channel註冊對應的EventLoop並且能夠處理I/O操作是被呼叫 void channelRegistered(ChannelHandlerContext ctx) throws Exception; //當Channel從它對應的EventLoop上登出,並且無法處理I/O操作是被呼叫 void channelUnregistered(ChannelHandlerContext ctx) throws Exception; //當Channel已經連線時被呼叫 void channelActive(ChannelHandlerContext ctx) throws Exception; //當Channel為非活動狀態,也就是斷開時被呼叫 void channelInactive(ChannelHandlerContext ctx) throws Exception; //當從Channel讀取資料時被呼叫 void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception; //當Channel的上一個讀資料完成後被呼叫 void channelReadComplete(ChannelHandlerContext ctx) throws Exception; //當呼叫fireUserEventTriggered方法時被呼叫 void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception; //當Channel的可寫狀態發生改變時被呼叫。使用者可以確保寫操作不會完成太快 void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception; @Override @SuppressWarnings("deprecation") //入站操作發生異常時呼叫 void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; }
ChannelOutboundHandler處理出站操作和資料
public interface ChannelOutboundHandler extends ChannelHandler { //當請求將Channel繫結到本地地址時被呼叫 void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception; //當請求將Channel連線到遠端節點時被呼叫 void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception; //當請求將Channel從遠端節點斷開時被呼叫 void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; //當請求關閉Channel時被呼叫 void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; //當請求從對應的EventLoop中登出時被呼叫 void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; //當請求從Channel讀取資料時被呼叫 void read(ChannelHandlerContext ctx) throws Exception; //當請求通過Channel將資料寫到遠端節點時被呼叫 void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception; //當請求通過Channel將入佇列資料沖刷到遠端節點時被呼叫 void flush(ChannelHandlerContext ctx) throws Exception; }
2、ChannelHandlerContext
ChannelHandlerContext可以說是ChannelPipeline的核心,它代表了ChannelHandler和ChannelPipeline之間的關聯,我們首先要知道一個ChannelPipeline內部會維護一個雙向連結串列,每當一個ChannelHandler被新增到ChannelPipeline中時,它都會被包裝成為一個ChannelHandlerContext,組成連結串列的各個節點。
我們看下ChannelHandlerContext介面中定義的API介面
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker { /** * Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}. */ //每個ChannelHandlerContext都會對一個Channel Channel channel(); /** * Returns the {@link EventExecutor} which is used to execute an arbitrary task. */ //返回用於執行的EventExecutor任務 EventExecutor executor(); /** * The unique name of the {@link ChannelHandlerContext}.The name was used when then {@link ChannelHandler} * was added to the {@link ChannelPipeline}. This name can also be used to access the registered * {@link ChannelHandler} from the {@link ChannelPipeline}. */ //返回定義的name名稱 String name(); /** * The {@link ChannelHandler} that is bound this {@link ChannelHandlerContext}. */ ChannelHandler handler(); /** * Return {@code true} if the {@link ChannelHandler} which belongs to this context was removed * from the {@link ChannelPipeline}. Note that this method is only meant to be called from with in the * {@link EventLoop}. */ //如果繫結到ChannelPipeline的ChannelHandler被刪除,返回true boolean isRemoved(); //觸發下一個ChannelInboundHandler中fireChannelRegistered方法 @Override ChannelHandlerContext fireChannelRegistered(); //觸發下一個ChannelInboundHandler中fireChannelUnregistered方法 @Override ChannelHandlerContext fireChannelUnregistered(); //觸發下一個ChannelInboundHandler中fireChannelActive方法 @Override ChannelHandlerContext fireChannelActive(); //觸發下一個ChannelInboundHandler中fireChannelInactive方法 @Override ChannelHandlerContext fireChannelInactive(); //觸發下一個ChannelInboundHandler中fireExceptionCaught方法 @Override ChannelHandlerContext fireExceptionCaught(Throwable cause); //觸發下一個ChannelInboundHandler中fireUserEventTriggered方法 @Override ChannelHandlerContext fireUserEventTriggered(Object evt); //觸發下一個ChannelInboundHandler中fireChannelRead方法 @Override ChannelHandlerContext fireChannelRead(Object msg); //觸發下一個ChannelInboundHandler中fireChannelReadComplete方法 @Override ChannelHandlerContext fireChannelReadComplete(); //觸發下一個ChannelInboundHandler中fireChannelWritabilityChanged方法 @Override ChannelHandlerContext fireChannelWritabilityChanged(); //觸發下一個ChannelInboundHandler中channelRead方法,如果是最後一個ChannelInboundHandler,則讀取完成後觸發channelReadComplete @Override ChannelHandlerContext read(); //觸發下一個ChannelOutboundHandler中flush方法 @Override ChannelHandlerContext flush(); /** * Return the assigned {@link ChannelPipeline} */ ChannelPipeline pipeline(); /** * Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s. */ //返回繫結該channel 的 ByteBufAllocator ByteBufAllocator alloc(); /** * @deprecated Use {@link Channel#attr(AttributeKey)} */ @Deprecated @Override //返回Attribute <T> Attribute<T> attr(AttributeKey<T> key); /** * @deprecated Use {@link Channel#hasAttr(AttributeKey)} */ @Deprecated @Override //是否包含指定的AttributeKey <T> boolean hasAttr(AttributeKey<T> key); }
二、ChannelPipeline的初始化
在AbstractChannel的建構函式中我們可以看到對ChannelPipeline的初始化
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline();//初始化ChannelPipeline }
看下newChannelPipeline()內部的實現
protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }
在這裡建立了一個DefaultChannelPipeline 物件,並傳入Channel物件。DefaultChannelPipeline 實現了ChannelPipeline的介面
進入DefaultChannelPipeline類內部,看下其具體構造
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this);//定義一個頭部節點 head = new HeadContext(this);//定義一個尾部節點 //連線頭尾節點,構成雙向連結串列 head.next = tail; tail.prev = head; }
在這裡我們可以看到DefaultChannelPipeline內部通過宣告頭尾兩個Context節點物件,構建了一個雙向連結串列結構我們;其實這裡的TailContext與HeadContext都是ChannelHandlerContext介面的具體實現;
三、總結
通過上面的內容,我們可以看出ChannelPipeline就是一個用於攔截Channel入站和出站事件的ChannelHandler例項鏈,而ChannelHandlerContext就是這個例項鏈上的節點,每一個新建立的Channel都會被分配一個新的ChannelPipeline。這篇文章我們對ChannelPipeline的構造和設計進行了大概的總結,其中如有不足與不正確的地方還望指出與海涵。後面我會對ChannelPipeline中ChannelHandler的新增、刪除等具體操作與事件如何在管道中流通傳遞進行具體的分析。
關注微信公眾號,檢視更多技術文章。
&n