Netty 原始碼解讀(二)-ChannelPipeline、ChannelHandler、ChannelHandlerContext
1.ChannelPipeline、ChannelHandler、ChannelHandlerContext 的關係
1. 每建立一個Socket 就會分配一個全新的ChannelPipeline (簡稱pipeline)
2. 每一個 ChannelPipeline 內部包含多個 ChannelHandlerContext (簡稱Context)
3. 他們一起組成了一個雙向連結串列,這些Context 用於封裝我們呼叫addLast 時新增的Channelhandler(以下簡稱Handler)
也就是說ChannelSocket 和 ChannelPipeline 是一對一的關係,而pipeline內部的多個Context 行成了連結串列,Context 只是對Handler 的封裝。
當一個請求進來時會進入socket 對應的pipeline,並經過pipeline 所有的handler。 可以理解為過濾器模式。
2. 設計
1. ChannelPipeline 設計
該介面繼承了ChannelInboundInvoker、 ChannelOutboundInvoker、 Iterable 介面。 標識可以呼叫資料出站的方法和入站的方法, 同時也能遍歷內部的連結串列。
public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
處理過程如下:
(1)入站事件由入站處理程式自下而上的方向處理。入站處理程式通常處理由地步的IO執行緒生成入站資料,入站資料通常從SocketChannel#read(ByteBuffer) 獲取
(2) 通常一個pipeline 有多個handler。例如一個典型的伺服器在每個通常的管道中都會有一下處理程式:
協議解碼器-將二進位制資料轉換為Java 物件;
協議編碼器-將java 物件轉換為二進位制資料
業務邏輯處理程式-執行實際業務邏輯
(3) 業務程式不能將執行緒阻塞,會影響IO的速度,進而影響整個Netty 程式的效能。如果業務程式很快,可以放在IO執行緒中,反之就需要非同步執行。 或者在新增handler的時候新增一個執行緒池。
2. ChannelHandler 作用以及設計
(1) 原始碼
public interface ChannelHandler { // 當把channelHandler 新增到pipeline 時被呼叫 void handlerAdded(ChannelHandlerContext ctx) throws Exception; // 當從pipeline 移除時呼叫 void handlerRemoved(ChannelHandlerContext ctx) throws Exception; // 處理髮生異常時呼叫 @Deprecated void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; @Inherited @Documented @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @interface Sharable { // no value } }
(2) 作用: 作用是處理IO事件或攔截IO事件,並將其轉發給下一個handler。 handler 處理事件是分入站和出站的(入站是說讀取資料到程式處理的過程,出站是說寫出資料到呼叫核心write方法寫出去資料的過程)。兩個方向的操作都是不同的,因此,netty 定義了兩個子介面繼承ChannelHandler。
入站:ChannelInboundHandler
出站: ChannelOutboundHandler
入站出站都可以處理的handler:ChannelDuplexHandler
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {
3. ChannelHandlerContext 作用
ChannelHandlerContext 同時繼承了 ChannelInboundInvoker, ChannelOutboundInvoker。ChannelHandlerContext 也 定義了一些自己的方法。這些方法能夠獲取Context 上下文環境的物件,比如channel、executor、handler、pipeline, 記憶體分配器,關聯的handler 是否被刪除等資訊。Context 就是包裝了handler相關的一切,以方便Contex 可以在pipeline 方便的操作handler。
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
如下自身的方法:
3. 建立過程
1. 任何一個ChannelSocket 建立的同時都會建立一個pipeline
io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel) 原始碼如下:
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }
io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline:
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
可以看到連結串列有兩個偽節點(頭和尾)。
1》頭節點:
io.netty.channel.DefaultChannelPipeline.TailContext (入站處理handler)
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); } 。。。 }
2》偽節點:
HeadContext 是一個入站和出站都兼顧的handler
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } 。。。 }
建立是在客戶端建立連線時:
2. 當用戶或系統內部呼叫pipeline的addXX 方法新增handler 時,都會建立一個包裝這handler 的Context
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler...)
@Override public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); } @Override public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } for (ChannelHandler h: handlers) { if (h == null) { break; } addLast(executor, null, h); } return this; } @Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; } private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; }
解釋:
1. io.netty.channel.DefaultChannelPipeline#checkMultiplicity 檢查該例項是否是共享的,如果不是並且已經被別的pipeline 使用了,則丟擲異常
2. 呼叫io.netty.channel.DefaultChannelPipeline#newContext 建立一個DefaultChannelHandlerContext
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); }
3. 呼叫io.netty.channel.DefaultChannelPipeline#addLast0 新增到鏈條尾部
4. 做一些其他處理
4. 呼叫過程
讀的時候從head開始, 寫的時候從tail 開始。
呼叫過程可以用下圖示識:
1. 入站讀取資料追蹤
io.netty.channel.DefaultChannelPipeline#fireChannelRead程式碼如下:
@Override public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }
讀取資料時呼叫過程如下:
1》 繼續呼叫io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
2》io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)
private void invokeChannelRead(Object msg) { if (this.invokeHandler()) { try { ((ChannelInboundHandler)this.handler()).channelRead(this, msg); } catch (Throwable var3) { this.invokeExceptionCaught(var3); } } else { this.fireChannelRead(msg); } }
這時候會呼叫handler的channelRead 方法。也就是具體的handler 的方法。
3》io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead 方法如下: 相當於沒做任何邏輯處理,直接呼叫下一個處理器處理
public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.fireChannelRead(msg); }
4》io.netty.channel.AbstractChannelHandlerContext#fireChannelRead
@Override public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
io.netty.channel.AbstractChannelHandlerContext#findContextInbound 如下:(可以看到入站是找inbound屬性為true的context,然後繼續進行呼叫)
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
5》繼續呼叫io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)方法(同1》 一樣)
也就是如果希望pipeline 中的context 繼續處理,需要在handler中繼續呼叫 ctx.fireXXX 方法,比如io.netty.handler.logging.LoggingHandler#channelRead
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "READ", msg)); } ctx.fireChannelRead(msg); }
2. 出站資料跟蹤
1》io.netty.channel.DefaultChannelPipeline#write(java.lang.Object, io.netty.channel.ChannelPromise)
public final ChannelFuture write(Object msg, ChannelPromise promise) { return tail.write(msg, promise); }
2》io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
public ChannelFuture write(final Object msg, final ChannelPromise promise) { write(msg, false, promise); return promise; }
3》io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)
private void write(Object msg, boolean flush, ChannelPromise promise) { ObjectUtil.checkNotNull(msg, "msg"); try { if (isNotValidPromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return; } } catch (RuntimeException e) { ReferenceCountUtil.release(msg); throw e; } final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { final AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } if (!safeExecute(executor, task, promise, m)) { // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // // See https://github.com/netty/netty/issues/8343. task.cancel(); } } }
4》io.netty.channel.AbstractChannelHandlerContext#invokeWriteAndFlush
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }
5》io.netty.channel.AbstractChannelHandlerContext#invokeWrite0 這裡呼叫handler的write 方法
private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
6》handler 處理完成如果需要繼續處理呼叫ctx.write(msg, promise); 會重新呼叫到io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
【當你用心寫完每一篇部落格之後,你會發現它比你用程式碼實現功能更有成就感!】