Source Code Analysis of ChannelPipeline in Netty
In Netty, ChannelPipeline is the chain of responsibility that handles requests. Its default implementation is DefaultChannelPipeline, whose fields and constructor are as follows:

    private final Channel channel;
    private final ChannelFuture succeededFuture;
    private final VoidChannelPromise voidPromise;
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
        this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
        this.voidPromise = new VoidChannelPromise(channel, true);
        this.tail = new DefaultChannelPipeline.TailContext(this);
        this.head = new DefaultChannelPipeline.HeadContext(this);
        this.head.next = this.tail;
        this.tail.prev = this.head;
    }
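A ChannelPipeline and a Channel correspond one to one: each Channel is bound to exactly one ChannelPipeline.
succeededFuture and voidPromise are used for asynchronous operations.
AbstractChannelHandlerContext is the context object that wraps a ChannelHandler. Each context corresponds to one ChannelHandler (when the handler carries the Sharable annotation, several AbstractChannelHandlerContext instances may refer to the same ChannelHandler). What the ChannelPipeline chain links and walks is the AbstractChannelHandlerContext objects; each context then hands the event to its ChannelHandler, which does the real processing.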
The fields and constructor of AbstractChannelHandlerContext are as follows:

    private final String name;
    private final DefaultChannelPipeline pipeline;
    final EventExecutor executor;
    private final boolean inbound;
    private final boolean outbound;
    private final boolean ordered;
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) {
        this.name = (String)ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        this.ordered = executor == null || executor instanceof OrderedEventExecutor;
    }
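name is the name of the AbstractChannelHandlerContext, and pipeline is the ChannelPipeline described above. executor runs the handler's callbacks; when none is given, the channel's event loop is used, which is the NioEventLoop covered in an earlier post (Source Code Analysis of NioEventLoopGroup Creation in Netty).
inbound and outbound mark the two directions of event handling, corresponding to I/O in Netty. Inbound (input) events are propagated by the ChannelPipeline from head toward tail, visiting only the contexts whose handler is a ChannelInboundHandler. Outbound (output) operations are propagated from tail toward head, visiting only the contexts whose handler is a ChannelOutboundHandler.
ordered records whether the context's executor runs tasks in submission order: it is true when no explicit executor was supplied or when the executor is an OrderedEventExecutor.
The next and prev fields show that ChannelPipeline maintains a doubly linked list of AbstractChannelHandlerContext nodes.
Its head and tail nodes are initialized to a HeadContext and a TailContext respectively.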
The HeadContext constructor:

    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, false, true);
            this.unsafe = pipeline.channel().unsafe();
            this.setAddComplete();
        }
    }
setAddComplete is implemented in AbstractChannelHandlerContext:

    final void setAddComplete() {
        int oldState;
        do {
            oldState = this.handlerState;
            // 3 is REMOVE_COMPLETE and 2 is ADD_COMPLETE (constants inlined by the decompiler)
        } while(oldState != 3 && !HANDLER_STATE_UPDATER.compareAndSet(this, oldState, 2));

    }
handlerState is the state of the ChannelHandler that belongs to this AbstractChannelHandlerContext. The possible values are:

    private static final int ADD_PENDING = 1;
    private static final int ADD_COMPLETE = 2;
    private static final int REMOVE_COMPLETE = 3;
    private static final int INIT = 0;
    private volatile int handlerState = 0;

handlerState starts in the INIT state.
HANDLER_STATE_UPDATER is an atomic field updater:

    private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

So setAddComplete uses a CAS loop to set handlerState to ADD_COMPLETE, unless the state is already REMOVE_COMPLETE, in which case it is left unchanged.
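The state transitions can be tried out in isolation. The following is a minimal JDK-only sketch, not Netty's code: the class StateSketch and its methods are hypothetical stand-ins that model only the handlerState field and the same AtomicIntegerFieldUpdater pattern.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical stand-in for AbstractChannelHandlerContext; only the state field is modeled.
class StateSketch {
    static final int INIT = 0;
    static final int ADD_PENDING = 1;
    static final int ADD_COMPLETE = 2;
    static final int REMOVE_COMPLETE = 3;

    private static final AtomicIntegerFieldUpdater<StateSketch> UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(StateSketch.class, "handlerState");

    // The updater requires a non-static volatile int field.
    private volatile int handlerState = INIT;

    // INIT -> ADD_PENDING; returns false if the state was not INIT.
    boolean setAddPending() {
        return UPDATER.compareAndSet(this, INIT, ADD_PENDING);
    }

    // Any state except REMOVE_COMPLETE -> ADD_COMPLETE, retrying if another thread wins the race.
    void setAddComplete() {
        for (;;) {
            int old = handlerState;
            if (old == REMOVE_COMPLETE || UPDATER.compareAndSet(this, old, ADD_COMPLETE)) {
                return;
            }
        }
    }

    void setRemoved() {
        handlerState = REMOVE_COMPLETE;
    }

    int state() {
        return handlerState;
    }

    public static void main(String[] args) {
        StateSketch ctx = new StateSketch();
        ctx.setAddPending();
        ctx.setAddComplete();
        System.out.println(ctx.state());   // 2 (ADD_COMPLETE)
        ctx.setRemoved();
        ctx.setAddComplete();              // ignored: a removed context stays removed
        System.out.println(ctx.state());   // 3 (REMOVE_COMPLETE)
    }
}
```

The field updater avoids allocating an AtomicInteger per context, which matters when every handler in every pipeline carries one of these states.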
The TailContext constructor:

    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, (EventExecutor)null, DefaultChannelPipeline.TAIL_NAME, true, false);
            this.setAddComplete();
        }
    }

Like HeadContext, it sets handlerState to ADD_COMPLETE.
The ChannelPipeline diagram from the official documentation makes this easier to understand:

                                                   I/O Request
                                                 via Channel or
                                              ChannelHandlerContext
                                                        |
    +---------------------------------------------------+---------------+
    |                           ChannelPipeline         |               |
    |                                                  \|/              |
    |    +---------------------+            +-----------+----------+    |
    |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
    |    +----------+----------+            +-----------+----------+    |
    |              /|\                                  |               |
    |               |                                  \|/              |
    |    +----------+----------+            +-----------+----------+    |
    |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
    |    +----------+----------+            +-----------+----------+    |
    |              /|\                                  .               |
    |               .                                   .               |
    | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
    |        [ method call]                       [method call]         |
    |               .                                   .               |
    |               .                                  \|/              |
    |    +----------+----------+            +-----------+----------+    |
    |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
    |    +----------+----------+            +-----------+----------+    |
    |              /|\                                  |               |
    |               |                                  \|/              |
    |    +----------+----------+            +-----------+----------+    |
    |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
    |    +----------+----------+            +-----------+----------+    |
    |              /|\                                  |               |
    +---------------+-----------------------------------+---------------+
                    |                                  \|/
    +---------------+-----------------------------------+---------------+
    |               |                                   |               |
    |       [ Socket.read() ]                    [ Socket.write() ]     |
    |                                                                   |
    |  Netty Internal I/O Threads (Transport Implementation)            |
    +-------------------------------------------------------------------+
The main methods are analyzed below.
addFirst has the following overloads:

    public final ChannelPipeline addFirst(ChannelHandler handler) {
        return this.addFirst((String)null, (ChannelHandler)handler);
    }

    public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
        return this.addFirst((EventExecutorGroup)null, name, handler);
    }

    public final ChannelPipeline addFirst(ChannelHandler... handlers) {
        return this.addFirst((EventExecutorGroup)null, (ChannelHandler[])handlers);
    }

    public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        } else if (handlers.length != 0 && handlers[0] != null) {
            int size;
            for(size = 1; size < handlers.length && handlers[size] != null; ++size) {
                ;
            }

            for(int i = size - 1; i >= 0; --i) {
                ChannelHandler h = handlers[i];
                this.addFirst(executor, (String)null, h);
            }

            return this;
        } else {
            return this;
        }
    }

    public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized(this) {
            checkMultiplicity(handler);
            name = this.filterName(name, handler);
            newCtx = this.newContext(group, name, handler);
            this.addFirst0(newCtx);
            if (!this.registered) {
                newCtx.setAddPending();
                this.callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    public void run() {
                        DefaultChannelPipeline.this.callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }

        this.callHandlerAdded0(newCtx);
        return this;
    }
The first four overloads all end up calling the fifth, addFirst(EventExecutorGroup, String, ChannelHandler), so only that one needs a closer look.
It first calls checkMultiplicity, which rejects a ChannelHandlerAdapter that is not sharable but has already been added:

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter)handler;
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");
            }

            h.added = true;
        }

    }
The isSharable method:

    public boolean isSharable() {
        Class<?> clazz = this.getClass();
        Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
        Boolean sharable = (Boolean)cache.get(clazz);
        if (sharable == null) {
            sharable = clazz.isAnnotationPresent(Sharable.class);
            cache.put(clazz, sharable);
        }

        return sharable;
    }
It first fetches handlerSharableCache from the current thread's InternalThreadLocalMap (InternalThreadLocalMap is the storage behind FastThreadLocal, which Netty uses as a faster replacement for the JDK ThreadLocal; see Source Code Analysis of FastThreadLocal in Netty).
The handlerSharableCache method of InternalThreadLocalMap:

    public Map<Class<?>, Boolean> handlerSharableCache() {
        Map<Class<?>, Boolean> cache = this.handlerSharableCache;
        if (cache == null) {
            this.handlerSharableCache = (Map)(cache = new WeakHashMap(4));
        }

        return (Map)cache;
    }
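If the current thread's InternalThreadLocalMap has no handlerSharableCache yet, a WeakHashMap (a map with weakly referenced keys) with an initial capacity of 4 is created.
The cache is then looked up by clazz. On a miss, the class is checked for the Sharable annotation. A ChannelHandlerAdapter annotated with Sharable can be used as a single instance shared across different Channels, provided it is thread-safe.
The class and whether it carries the Sharable annotation are then stored in the cache.
The added field of ChannelHandlerAdapter marks whether the handler has already been added to a pipeline.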
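To make the check concrete, here is a small JDK-only sketch of the same idea. Everything in it is hypothetical: the annotation Shared stands in for Netty's Sharable, Handler stands in for ChannelHandlerAdapter, and a plain ThreadLocal replaces InternalThreadLocalMap.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.WeakHashMap;

class SharableSketch {
    // Hypothetical marker annotation standing in for Netty's Sharable.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Shared {}

    // Hypothetical base class standing in for ChannelHandlerAdapter.
    static class Handler {
        boolean added;
    }

    @Shared
    static class SharedHandler extends Handler {}

    static class PlainHandler extends Handler {}

    // One cache per thread, so no locking is needed; weak keys let classes be unloaded.
    private static final ThreadLocal<Map<Class<?>, Boolean>> CACHE =
            ThreadLocal.withInitial(() -> new WeakHashMap<>(4));

    static boolean isSharable(Handler h) {
        Class<?> clazz = h.getClass();
        Map<Class<?>, Boolean> cache = CACHE.get();
        Boolean sharable = cache.get(clazz);
        if (sharable == null) {
            sharable = clazz.isAnnotationPresent(Shared.class);
            cache.put(clazz, sharable);
        }
        return sharable;
    }

    // Throws if a non-shared handler instance is added a second time.
    static void checkMultiplicity(Handler h) {
        if (!isSharable(h) && h.added) {
            throw new IllegalStateException(
                    h.getClass().getName() + " is not shared, so it can't be added multiple times.");
        }
        h.added = true;
    }

    public static void main(String[] args) {
        Handler shared = new SharedHandler();
        checkMultiplicity(shared);
        checkMultiplicity(shared);       // fine: the class is annotated as shared

        Handler plain = new PlainHandler();
        checkMultiplicity(plain);
        try {
            checkMultiplicity(plain);    // second add of the same instance is rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The annotation lookup is reflective, which is why the result is cached per class rather than recomputed on every add.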
Back to addFirst:
once checkMultiplicity succeeds, filterName produces a name for the AbstractChannelHandlerContext about to be created,
and newContext then creates the context object:

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, this.childExecutor(group), name, handler);
    }
What is actually created here is a DefaultChannelHandlerContext:

    final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
        private final ChannelHandler handler;

        DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
            super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
            if (handler == null) {
                throw new NullPointerException("handler");
            } else {
                this.handler = handler;
            }
        }

        public ChannelHandler handler() {
            return this.handler;
        }

        private static boolean isInbound(ChannelHandler handler) {
            return handler instanceof ChannelInboundHandler;
        }

        private static boolean isOutbound(ChannelHandler handler) {
            return handler instanceof ChannelOutboundHandler;
        }
    }

DefaultChannelHandlerContext does little more than bundle an AbstractChannelHandlerContext with its ChannelHandler; the inbound and outbound flags are derived from the handler's type.
After the DefaultChannelHandlerContext has been created, addFirst0 is called:

    private void addFirst0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext nextCtx = this.head.next;
        newCtx.prev = this.head;
        newCtx.next = nextCtx;
        this.head.next = newCtx;
        nextCtx.prev = newCtx;
    }

This is a plain doubly linked list insertion that places newCtx directly after head.
Next, the registered field is checked:

    private boolean registered;

It is false when the pipeline is initialized.
If registered is false, setAddPending of AbstractChannelHandlerContext is called first:

    final void setAddPending() {
        // 0 is INIT and 1 is ADD_PENDING (constants inlined by the decompiler)
        boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, 0, 1);

        assert updated;

    }

As with setAddComplete above, a CAS operation is used, this time to move handlerState from INIT to ADD_PENDING.
Then callHandlerCallbackLater is called:

    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !this.registered;

        DefaultChannelPipeline.PendingHandlerCallback task = added ? new DefaultChannelPipeline.PendingHandlerAddedTask(ctx) : new DefaultChannelPipeline.PendingHandlerRemovedTask(ctx);
        DefaultChannelPipeline.PendingHandlerCallback pending = this.pendingHandlerCallbackHead;
        if (pending == null) {
            this.pendingHandlerCallbackHead = (DefaultChannelPipeline.PendingHandlerCallback)task;
        } else {
            while(pending.next != null) {
                pending = pending.next;
            }

            pending.next = (DefaultChannelPipeline.PendingHandlerCallback)task;
        }

    }

It first asserts that the pipeline is still unregistered, then uses added to decide which kind of PendingHandlerCallback to create, and finally appends the task to the end of the pending list.
PendingHandlerCallback handles the two kinds of ChannelHandler callbacks and is defined as follows:

    private abstract static class PendingHandlerCallback implements Runnable {
        final AbstractChannelHandlerContext ctx;
        DefaultChannelPipeline.PendingHandlerCallback next;

        PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        abstract void execute();
    }
PendingHandlerAddedTask is defined as follows:

    private final class PendingHandlerAddedTask extends DefaultChannelPipeline.PendingHandlerCallback {
        PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
            super(ctx);
        }

        public void run() {
            DefaultChannelPipeline.this.callHandlerAdded0(this.ctx);
        }

        void execute() {
            EventExecutor executor = this.ctx.executor();
            if (executor.inEventLoop()) {
                DefaultChannelPipeline.this.callHandlerAdded0(this.ctx);
            } else {
                try {
                    executor.execute(this);
                } catch (RejectedExecutionException var3) {
                    if (DefaultChannelPipeline.logger.isWarnEnabled()) {
                        DefaultChannelPipeline.logger.warn("Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.", new Object[]{executor, this.ctx.name(), var3});
                    }

                    DefaultChannelPipeline.remove0(this.ctx);
                    this.ctx.setRemoved();
                }
            }

        }
    }
Setting aside exception handling, both execute and run come down to calling callHandlerAdded0: execute calls it directly when it is already on the context's executor thread, and otherwise submits the task to the executor, whose run method then makes the call:

    private void callHandlerAdded0(AbstractChannelHandlerContext ctx) {
        try {
            ctx.setAddComplete();
            ctx.handler().handlerAdded(ctx);
        } catch (Throwable var10) {
            boolean removed = false;

            try {
                remove0(ctx);

                try {
                    ctx.handler().handlerRemoved(ctx);
                } finally {
                    ctx.setRemoved();
                }

                removed = true;
            } catch (Throwable var9) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), var9);
                }
            }

            if (removed) {
                this.fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed.", var10));
            } else {
                this.fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove.", var10));
            }
        }

    }

Setting aside exception handling again, the core is two lines: setAddComplete sets handlerState to ADD_COMPLETE, and then the ChannelHandler's handlerAdded method is called back. handlerAdded is a familiar method, since it is commonly overridden when writing business logic with Netty.
PendingHandlerRemovedTask is defined as follows:

    private final class PendingHandlerRemovedTask extends DefaultChannelPipeline.PendingHandlerCallback {
        PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
            super(ctx);
        }

        public void run() {
            DefaultChannelPipeline.this.callHandlerRemoved0(this.ctx);
        }

        void execute() {
            EventExecutor executor = this.ctx.executor();
            if (executor.inEventLoop()) {
                DefaultChannelPipeline.this.callHandlerRemoved0(this.ctx);
            } else {
                try {
                    executor.execute(this);
                } catch (RejectedExecutionException var3) {
                    if (DefaultChannelPipeline.logger.isWarnEnabled()) {
                        DefaultChannelPipeline.logger.warn("Can't invoke handlerRemoved() as the EventExecutor {} rejected it, removing handler {}.", new Object[]{executor, this.ctx.name(), var3});
                    }

                    this.ctx.setRemoved();
                }
            }

        }
    }
As with PendingHandlerAddedTask, the essential step is the call to callHandlerRemoved0, made directly on the executor thread or submitted to it:

    private void callHandlerRemoved0(AbstractChannelHandlerContext ctx) {
        try {
            try {
                ctx.handler().handlerRemoved(ctx);
            } finally {
                ctx.setRemoved();
            }
        } catch (Throwable var6) {
            this.fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", var6));
        }

    }

It calls back the ChannelHandler's handlerRemoved method and then, in a finally block, uses setRemoved to set handlerState to REMOVE_COMPLETE.
Back to callHandlerCallbackLater. The pendingHandlerCallbackHead field is declared as:

    private DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;

Together with the definition of PendingHandlerCallback, this shows that pendingHandlerCallbackHead is the head of a singly linked list of PendingHandlerCallback objects kept by DefaultChannelPipeline, used to defer the handlerAdded and handlerRemoved callbacks of ChannelHandlers. The add methods always call callHandlerCallbackLater with added set to true, so adding a ChannelHandler only ever appends a handlerAdded callback to this list.
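The deferral mechanism can be illustrated with a simplified JDK-only model. This is a sketch under assumptions, not Netty's implementation: PendingCallbackSketch, its add and register methods, and the log list are all hypothetical, and threading is reduced to a single lock.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model: callbacks queued before registration run once on registration.
class PendingCallbackSketch {
    // Node of a singly linked list, in the spirit of PendingHandlerCallback.
    private static final class Pending {
        final Runnable task;
        Pending next;

        Pending(Runnable task) {
            this.task = task;
        }
    }

    private Pending pendingHead;
    private boolean registered;
    final List<String> log = new ArrayList<>();

    // Runs the callback now if registered, otherwise appends it to the tail of the list.
    synchronized void add(String name) {
        Runnable callback = () -> log.add("handlerAdded:" + name);
        if (registered) {
            callback.run();
            return;
        }
        Pending node = new Pending(callback);
        if (pendingHead == null) {
            pendingHead = node;
        } else {
            Pending p = pendingHead;
            while (p.next != null) {
                p = p.next;
            }
            p.next = node;
        }
    }

    // Detaches the list under the lock, then runs the callbacks in insertion order.
    void register() {
        Pending head;
        synchronized (this) {
            registered = true;
            head = pendingHead;
            pendingHead = null;
        }
        for (Pending p = head; p != null; p = p.next) {
            p.task.run();
        }
    }

    public static void main(String[] args) {
        PendingCallbackSketch pipeline = new PendingCallbackSketch();
        pipeline.add("decoder");
        pipeline.add("encoder");
        System.out.println(pipeline.log);   // [] because nothing has run yet
        pipeline.register();
        pipeline.add("business");
        System.out.println(pipeline.log);   // [handlerAdded:decoder, handlerAdded:encoder, handlerAdded:business]
    }
}
```

Appending at the tail keeps the callbacks in the order the handlers were added, at the cost of a linear walk per append; the list is expected to be short because it only exists until the channel is registered.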
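Back to addFirst: if registered is true, the method obtains the context's EventExecutor and checks whether the current thread is that executor's event loop thread. If it is not, handlerState is set to ADD_PENDING and a task is submitted to the executor so that callHandlerAdded0 runs asynchronously on the event loop thread. If the caller is already on the event loop thread, callHandlerAdded0 is called directly, after leaving the synchronized block.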
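The "run inline if already on the loop thread, otherwise submit" pattern recurs throughout the pipeline code. The sketch below reproduces it with a JDK single-thread executor; EventLoopSketch and its methods are hypothetical and much simpler than Netty's EventExecutor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded "event loop" built on the JDK; not Netty's EventExecutor.
class EventLoopSketch {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-event-loop");
        loopThread = t;   // remember the worker thread so inEventLoop() can compare against it
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Runs the task on the calling thread if it already is the loop thread, else hands it over.
    void runOnLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    // Waits for already submitted tasks to finish.
    void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        EventLoopSketch loop = new EventLoopSketch();
        // Called from main: not the loop thread, so the task is submitted.
        loop.runOnLoop(() -> {
            System.out.println("outer on " + Thread.currentThread().getName());
            // Called from the loop thread: runs inline, before "outer done" is printed.
            loop.runOnLoop(() -> System.out.println("inner ran inline"));
            System.out.println("outer done");
        });
        loop.shutdown();
    }
}
```

Funnelling all handler callbacks onto one thread is what lets handler code run without its own locking.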
That concludes addFirst. addLast likewise has several overloads:

    public final ChannelPipeline addLast(ChannelHandler handler) {
        return this.addLast((String)null, (ChannelHandler)handler);
    }

    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return this.addLast((EventExecutorGroup)null, name, handler);
    }

    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return this.addLast((EventExecutorGroup)null, (ChannelHandler[])handlers);
    }

    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        } else {
            ChannelHandler[] var3 = handlers;
            int var4 = handlers.length;

            for(int var5 = 0; var5 < var4; ++var5) {
                ChannelHandler h = var3[var5];
                if (h == null) {
                    break;
                }

                this.addLast(executor, (String)null, h);
            }

            return this;
        }
    }

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized(this) {
            checkMultiplicity(handler);
            newCtx = this.newContext(group, this.filterName(name, handler), handler);
            this.addLast0(newCtx);
            if (!this.registered) {
                newCtx.setAddPending();
                this.callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    public void run() {
                        DefaultChannelPipeline.this.callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }

        this.callHandlerAdded0(newCtx);
        return this;
    }

Again, the earlier overloads all delegate to the last one.
Compared with addFirst, only addLast0 differs:

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = this.tail.prev;
        newCtx.prev = prev;
        newCtx.next = this.tail;
        prev.next = newCtx;
        this.tail.prev = newCtx;
    }

This is again a basic doubly linked list operation, except that this time the AbstractChannelHandlerContext is inserted directly before tail.
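Both insertions can be exercised together in a stripped-down model. The sketch below is hypothetical (LinkedPipelineSketch and Ctx are illustration names) and keeps only the list bookkeeping, with head and tail acting as sentinel nodes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified pipeline: only the linked-list bookkeeping is modeled.
class LinkedPipelineSketch {
    static final class Ctx {
        final String name;
        Ctx prev;
        Ctx next;

        Ctx(String name) {
            this.name = name;
        }
    }

    // Sentinel nodes: real entries always sit strictly between head and tail.
    private final Ctx head = new Ctx("head");
    private final Ctx tail = new Ctx("tail");

    LinkedPipelineSketch() {
        head.next = tail;
        tail.prev = head;
    }

    // Insert right after head.
    void addFirst(String name) {
        Ctx newCtx = new Ctx(name);
        Ctx nextCtx = head.next;
        newCtx.prev = head;
        newCtx.next = nextCtx;
        head.next = newCtx;
        nextCtx.prev = newCtx;
    }

    // Insert right before tail.
    void addLast(String name) {
        Ctx newCtx = new Ctx(name);
        Ctx prevCtx = tail.prev;
        newCtx.prev = prevCtx;
        newCtx.next = tail;
        prevCtx.next = newCtx;
        tail.prev = newCtx;
    }

    // Names from head to tail, sentinels excluded.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Ctx c = head.next; c != tail; c = c.next) {
            out.add(c.name);
        }
        return out;
    }

    public static void main(String[] args) {
        LinkedPipelineSketch p = new LinkedPipelineSketch();
        p.addLast("b");
        p.addLast("c");
        p.addFirst("a");
        System.out.println(p.names());   // [a, b, c]
    }
}
```

Because the sentinels are always present, neither insertion needs a null check for an empty list.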
There are two more methods, addBefore and addAfter, which work the same way and are not repeated here.
Next, let's look at how ChannelPipeline passes events along.
The invokeHandlerAddedIfNeeded method:

    final void invokeHandlerAddedIfNeeded() {
        assert this.channel.eventLoop().inEventLoop();

        if (this.firstRegistration) {
            this.firstRegistration = false;
            this.callHandlerAddedForAllHandlers();
        }

    }

The assertion checks that the caller is on the channel's event loop thread (ChannelPipeline delivers events on the event loop thread, so callers on other threads have to hand their work over to it).
The firstRegistration field is true when DefaultChannelPipeline is initialized:

    private boolean firstRegistration = true;
It is set to false here to mark that the first call has happened, so callHandlerAddedForAllHandlers is never called again afterwards:

    private void callHandlerAddedForAllHandlers() {
        DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized(this) {
            assert !this.registered;

            this.registered = true;
            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            this.pendingHandlerCallbackHead = null;
        }

        for(DefaultChannelPipeline.PendingHandlerCallback task = pendingHandlerCallbackHead; task != null; task = task.next) {
            task.execute();
        }

    }

As noted, registered starts out false, so the assertion holds. The method then sets it to true, takes the callback list pendingHandlerCallbackHead, and resets the field to null.
It then walks the singly linked list and executes the pending handlerAdded and handlerRemoved callbacks.
The fireChannelRegistered method: once the Channel has been registered with the Selector, the channel's Unsafe calls back into this method on the event loop thread:

    public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(this.head);
        return this;
    }
The real work is done by the static method invokeChannelRegistered of AbstractChannelHandlerContext. The head argument passed here is the HeadContext created when DefaultChannelPipeline was initialized:

    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }

    }
So the head object's invokeChannelRegistered method is what runs, either directly when already on the executor thread or as a task submitted to the executor:

    private void invokeChannelRegistered() {
        if (this.invokeHandler()) {
            try {
                ((ChannelInboundHandler)this.handler()).channelRegistered(this);
            } catch (Throwable var2) {
                this.notifyHandlerException(var2);
            }
        } else {
            this.fireChannelRegistered();
        }

    }
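invokeHandler checks the current handlerState:

    private boolean invokeHandler() {
        int handlerState = this.handlerState;
        // 2 is ADD_COMPLETE and 1 is ADD_PENDING (constants inlined by the decompiler)
        return handlerState == 2 || !this.ordered && handlerState == 1;
    }

It returns true if handlerState is ADD_COMPLETE, or if the context's executor is not ordered and the state is ADD_PENDING; otherwise it returns false, and the event is simply forwarded to the next context without invoking this handler.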
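The condition is easier to read with named constants. The following hypothetical restatement (InvokeHandlerSketch is not a Netty class) prints the result for every state and both values of ordered.

```java
// Hypothetical restatement of the invokeHandler check with named constants.
class InvokeHandlerSketch {
    static final int INIT = 0;
    static final int ADD_PENDING = 1;
    static final int ADD_COMPLETE = 2;
    static final int REMOVE_COMPLETE = 3;

    // ordered: whether the context's executor is known to run tasks in submission order.
    static boolean invokeHandler(int handlerState, boolean ordered) {
        return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
    }

    public static void main(String[] args) {
        String[] names = {"INIT", "ADD_PENDING", "ADD_COMPLETE", "REMOVE_COMPLETE"};
        for (int state = INIT; state <= REMOVE_COMPLETE; state++) {
            System.out.println(names[state]
                    + " ordered=" + invokeHandler(state, true)
                    + " unordered=" + invokeHandler(state, false));
        }
    }
}
```

A plausible reading of the ADD_PENDING case, stated here as an assumption rather than something the source shown above spells out: with an executor that does not preserve submission order, the queued handlerAdded task might run after an event task, so the handler is invoked anyway instead of silently skipping events.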
When the check passes, the channelRegistered method of the ChannelInboundHandler is called. Since the current context is head, this is the implementation in HeadContext:

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }
It first calls invokeHandlerAddedIfNeeded to run the pending handlerAdded and handlerRemoved callbacks of the ChannelHandlers,
and then calls fireChannelRegistered on ctx:

    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(this.findContextInbound());
        return this;
    }
The findContextInbound method finds the next inbound context:

    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;

        do {
            ctx = ctx.next;
        } while(!ctx.inbound);

        return ctx;
    }
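It walks forward from the current node. Using the inbound flag described earlier, it finds the next AbstractChannelHandlerContext whose handler is a ChannelInboundHandler. The static invokeChannelRegistered method is then called on that context and the steps above repeat. As long as each ChannelInboundHandler either leaves channelRegistered at its default or calls ctx.fireChannelRegistered() in its override, the event keeps propagating until the channelRegistered method of every inbound handler has run.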
The default channelRegistered method in ChannelInboundHandlerAdapter:

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

It is even simpler than the HeadContext implementation: it just calls fireChannelRegistered to pass the event on.
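The propagation rule can be seen in a miniature model. In this hypothetical sketch (InboundSketch, Ctx, and InboundHandler are illustration names, not Netty types), a context with a null handler plays the role of an outbound-only context that inbound events skip.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of inbound propagation; names echo the post, not Netty's real classes.
class InboundSketch {
    interface InboundHandler {
        void channelRegistered(Ctx ctx);
    }

    static final class Ctx {
        final String name;
        final InboundHandler handler;   // null marks an outbound-only context
        Ctx next;

        Ctx(String name, InboundHandler handler) {
            this.name = name;
            this.handler = handler;
        }

        // Walk forward to the next inbound context and invoke its handler.
        // The last context in this sketch is inbound and never fires, so the walk terminates.
        void fireChannelRegistered() {
            Ctx ctx = this;
            do {
                ctx = ctx.next;
            } while (ctx.handler == null);
            ctx.handler.channelRegistered(ctx);
        }
    }

    static List<String> run() {
        List<String> visited = new ArrayList<>();
        // Adapter-like behavior: record the visit, then pass the event on.
        InboundHandler passOn = ctx -> {
            visited.add(ctx.name);
            ctx.fireChannelRegistered();
        };
        // A handler that does not call fire stops the propagation.
        InboundHandler swallow = ctx -> visited.add(ctx.name);

        Ctx head = new Ctx("head", passOn);
        Ctx in1 = new Ctx("in1", passOn);
        Ctx out1 = new Ctx("out1", null);
        Ctx in2 = new Ctx("in2", passOn);
        Ctx tail = new Ctx("tail", swallow);
        head.next = in1;
        in1.next = out1;
        out1.next = in2;
        in2.next = tail;

        head.handler.channelRegistered(head);
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(run());   // [head, in1, in2, tail]
    }
}
```

Replacing passOn with swallow on in1 would stop the event there, which is the behavior to expect from an override that forgets to call the fire method.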
The fireChannelRead method: when the Selector's polling finds a read event ready, the channel's Unsafe calls back into this method on the event loop thread:

    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);
        return this;
    }
Again it starts from head and calls the static method invokeChannelRead of AbstractChannelHandlerContext:

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }

    }
The logic is the same as before: the invokeChannelRead method of the AbstractChannelHandlerContext object is run, directly on the executor thread or as a submitted task:

    private void invokeChannelRead(Object msg) {
        if (this.invokeHandler()) {
            try {
                ((ChannelInboundHandler)this.handler()).channelRead(this, msg);
            } catch (Throwable var3) {
                this.notifyHandlerException(var3);
            }
        } else {
            this.fireChannelRead(msg);
        }

    }
As above, for head this calls the channelRead method of HeadContext:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }
HeadContext does no processing of its own here and simply calls fireChannelRead on the ChannelHandlerContext:

    public ChannelHandlerContext fireChannelRead(Object msg) {
        invokeChannelRead(this.findContextInbound(), msg);
        return this;
    }

As with registration, the next ChannelInboundHandler is selected and the steps above repeat.
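Now for writeAndFlush, which differs from the methods above: it is not triggered by the event loop reacting to I/O readiness but by user code, which calls it indirectly through the channel. It is implemented in AbstractChannel:

    public ChannelFuture writeAndFlush(Object msg) {
        return this.pipeline.writeAndFlush(msg);
    }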
This directly calls the writeAndFlush method of DefaultChannelPipeline:

    public final ChannelFuture writeAndFlush(Object msg) {
        return this.tail.writeAndFlush(msg);
    }
Here is another difference: the call goes to tail, that is, to TailContext's writeAndFlush, which is implemented in AbstractChannelHandlerContext:

    public ChannelFuture writeAndFlush(Object msg) {
        return this.writeAndFlush(msg, this.newPromise());
    }
newPromise creates a ChannelPromise, which is used to track the asynchronous operation. The call is forwarded to an overload of writeAndFlush:

    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        } else if (this.isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            return promise;
        } else {
            this.write(msg, true, promise);
            return promise;
        }
    }
This in turn calls the write method:

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = this.findContextOutbound();
        Object m = this.pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            Object task;
            if (flush) {
                task = AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next, m, promise);
            } else {
                task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise);
            }

            safeExecute(executor, (Runnable)task, promise, m);
        }

    }

The logic is again similar, except that findContextOutbound is called first to find the next outbound AbstractChannelHandlerContext (one whose handler is a ChannelOutboundHandler), and the search runs from the tail toward the head. This matches the diagram shown earlier. When the caller is not on the executor thread, the write is wrapped in a WriteTask or WriteAndFlushTask and submitted to the executor.
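The post does not list findContextOutbound, so the sketch below assumes it mirrors findContextInbound, walking prev and testing the outbound flag. OutboundSketch and Ctx are hypothetical illustration types; the sketch only shows the order in which a write started at tail would visit contexts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of outbound propagation from tail toward head.
class OutboundSketch {
    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }

        // Assumed shape: like findContextInbound, but walking prev and testing the outbound flag.
        Ctx findContextOutbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }
    }

    // Builds head <- out1 <- in1 <- out2 <- tail and returns the order a write visits.
    static List<String> writeOrder() {
        Ctx head = new Ctx("head", true);    // head is outbound: it performs the real write
        Ctx out1 = new Ctx("out1", true);
        Ctx in1 = new Ctx("in1", false);
        Ctx out2 = new Ctx("out2", true);
        Ctx tail = new Ctx("tail", false);   // tail is inbound only
        out1.prev = head;
        in1.prev = out1;
        out2.prev = in1;
        tail.prev = out2;

        List<String> visited = new ArrayList<>();
        Ctx ctx = tail;
        while (ctx != head) {
            ctx = ctx.findContextOutbound();
            visited.add(ctx.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(writeOrder());   // [out2, out1, head]
    }
}
```

The handler added last is therefore the first to see an outbound message, which is worth keeping in mind when ordering encoders with addLast.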
Once the outbound context has been found, invokeWriteAndFlush or invokeWrite is called on it.
The invokeWriteAndFlush method:

    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (this.invokeHandler()) {
            this.invokeWrite0(msg, promise);
            this.invokeFlush0();
        } else {
            this.writeAndFlush(msg, promise);
        }

    }

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler)this.handler()).write(this, msg, promise);
        } catch (Throwable var4) {
            notifyOutboundHandlerException(var4, promise);
        }

    }

    private void invokeFlush0() {
        try {
            ((ChannelOutboundHandler)this.handler()).flush(this);
        } catch (Throwable var2) {
            this.notifyHandlerException(var2);
        }

    }

invokeWriteAndFlush calls back the write and flush methods of the ChannelOutboundHandler.
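Eventually the write and flush methods of HeadContext are reached:

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        this.unsafe.write(msg, promise);
    }

    public void flush(ChannelHandlerContext ctx) throws Exception {
        this.unsafe.flush();
    }

Both delegate to unsafe: write appends the message to the unsafe's outbound buffer, and flush asks the unsafe to write the buffered messages to the underlying JDK SocketChannel. If the socket cannot accept everything at once, the remainder is written later, when the Selector reports the channel as writable.
This concludes the analysis of ChannelPipeline.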