Netty原始碼分析第4章(pipeline)---->第4節: 傳播inbound事件
Netty原始碼分析第四章: pipeline
第四節: 傳播inbound事件
有關於inbound事件, 在概述中做過簡單的介紹, 就是以自己為基準, 流向自己的事件, 比如最常見的channelRead事件, 就是對方發來資料流的所觸發的事件, 己方要對這些資料進行處理, 這一小節, 以啟用channelRead為例講解有關inbound事件的處理流程
在業務程式碼中, 我們自己的handler往往會通過重寫channelRead方法來處理對方發來的資料, 那麼對方發來的資料是如何走到channelRead方法中了呢, 也是我們這一小節要剖析的內容
在業務程式碼中, 傳遞channelRead事件方式是通過fireChannelRead方法進行傳播的
這裡給大家看兩種寫法:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//寫法1:
ctx.fireChannelRead(msg);
//寫法2
ctx.pipeline().fireChannelRead(msg);
}
這裡重寫了channelRead方法, 並且方法體內繼續通過fireChannelRead方法進行傳播channelRead事件, 那麼這兩種寫法有什麼異同?
我們先以寫法2為例, 將這種寫法進行剖析
這裡首先獲取當前context的pipeline物件, 然後通過pipeline物件呼叫自身的fireChannelRead方法進行傳播, 因為預設建立的DefaultChannelpipeline
我們跟到DefaultChannelpipeline的fireChannelRead方法中:
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
這裡首先呼叫的是AbstractChannelHandlerContext類的靜態方法invokeChannelRead, 引數傳入head節點和事件的訊息
我們跟進invokeChannelRead方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
這裡的Object m m通常就是我們傳入的msg, 而next, 目前是head節點, 然後再判斷是否為當前eventLoop執行緒, 如果不是則將方法包裝成task交給eventLoop執行緒處理
我們跟到invokeChannelRead方法中:
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
首先通過invokeHandler()判斷當前handler是否已新增, 如果新增, 則執行當前handler的chanelRead方法, 其實這裡我們基本上就明白了, 通過fireChannelRead方法傳遞事件的過程中, 其實就是找到相關handler執行其channelRead方法, 由於我們在這裡的handler就是head節點, 所以我們跟到HeadContext的channelRead方法中:
HeadContext的channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//向下傳遞channelRead事件
ctx.fireChannelRead(msg);
}
在這裡我們看到, 這裡通過fireChannelRead方法繼續往下傳遞channelRead事件, 而這種呼叫方式, 就是我們剛才分析使用者程式碼的第一種呼叫方式:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//寫法1:
ctx.fireChannelRead(msg);
//寫法2
ctx.pipeline().fireChannelRead(msg);
}
這裡直接通過context物件呼叫fireChannelRead方法, 那麼和使用pipeline呼叫有什麼區別的, 我會回到HeadConetx的channelRead方法, 我們來剖析ctx.fireChannelRead(msg)這句, 大家就會對這個問題有答案了, 跟到ctx的fireChannelRead方法中, 這裡會走到AbstractChannelHandlerContext類中的fireChannelRead方法中
跟到AbstractChannelHandlerContext類中的fireChannelRead方法:
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
這裡我們看到, invokeChannelRead方法中傳入了一個findContextInbound()引數, 而這findContextInbound方法其實就是找到當前Context的下一個節點
跟到findContextInbound方法:
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
這裡的邏輯也比較簡單, 是通過一個doWhile迴圈, 找到當前handlerContext的下一個節點, 這裡要注意迴圈的終止條件, while (!ctx.inbound)表示下一個context標誌的事件不是inbound的事件, 則迴圈繼續往下找, 言外之意就是要找到下一個標註inbound事件的節點
有關事件的標註, 之前的小節已經剖析過了, 如果是使用者定義的handler, 是通過handler繼承的介面而定的, 如果tail或者head, 那麼是在初始化的時候就已經定義好, 這裡不再贅述
回到AbstractChannelHandlerContext類的fireChannelRead方法中:
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
找到下一個節點後, 繼續呼叫invokeChannelRead方法, 傳入下一個和訊息物件:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
//第一次執行next其實就是head
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
這裡的邏輯我們又不陌生了, 因為我們傳入的是當前context的下一個節點, 所以這裡會呼叫下一個節點invokeChannelRead方法, 因我們剛才剖析的是head節點, 所以下一個節點有可能是使用者新增的handler的包裝類HandlerConext的物件
這裡我們跟進invokeChannelRead方法中去:
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
//發生異常的時候在這裡捕獲異常
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
又是我們熟悉的邏輯, 呼叫了自身handler的channelRead方法, 如果是使用者自定義的handler, 則會走到使用者定義的channelRead()方法中去, 所以這裡就解釋了為什麼通過傳遞channelRead事件, 最終會走到使用者重寫的channelRead方法中去
同樣, 也解釋了該小節最初提到過的兩種寫法的區別:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//寫法1:
ctx.fireChannelRead(msg);
//寫法2
ctx.pipeline().fireChannelRead(msg);
}
寫法1是通過當前節點往下傳播事件
寫法2是通過頭節點往下傳遞事件
所以, 在handler中如果如果要在channelRead方法中傳遞channelRead事件, 一定要採用寫法2的方式向下傳遞, 或者交給其父類處理, 如果採用1的寫法則每次事件傳輸到這裡都會繼續從head節點傳輸, 從而陷入死迴圈或者發生異常
這裡有一點需要注意, 如果使用者程式碼中channelRead方法, 如果沒有顯示的呼叫ctx.fireChannelRead(msg)那麼事件則不會再往下傳播, 則事件會在這裡終止, 所以如果我們寫業務程式碼的時候要考慮有關資源釋放的相關操作
如果ctx.fireChannelRead(msg)則事件會繼續往下傳播, 如果每一個handler都向下傳播事件, 當然, 根據我們之前的分析channelRead事件只會在標識為inbound事件的HandlerConetext中傳播, 傳播到最後, 則最終會呼叫到tail節點的channelRead方法
我們跟到tailConext的channelRead方法中:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
我們跟進到onUnhandledInboundMessage方法中:
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
//釋放資源
ReferenceCountUtil.release(msg);
}
}
這裡做了釋放資源的相關的操作
至此, channelRead事件傳輸相關羅輯剖析完整, 其實對於inbound事件的傳輸流程都會遵循這一邏輯, 小夥伴們可以自行剖析其他inbound事件的傳輸流程