(譯)Netty In Action第六章—channelhandler 和 channelpipeline
請尊重勞動成果,未經本人允許,拒絕轉載,謝謝!
這一章涵蓋以下內容:
- ChannelHandler 和 ChannelPipeline的APIs介紹
- 資源洩漏檢測
- 異常處理
在前一章節你已經學習了ByteBuf——Netty的資料容器。隨著在這一章研究Netty的資料流和處理元件,我們將以你所學的知識為基礎,同時開始探索框架中的重要元素。
你已經知道一個Channel-Pipeline中可以宣告多個ChannelHandlers,以便於組織處理邏輯。我們將檢查各種包含這些classes和一個重要的關聯類——ChannelHandlerContext的用例。
理解這些元件之間的關係對於利用Netty建立模組化的、可重用的實現是至關重要的。
6.1 ChannelHandler家族
為了準備ChannelHandler的詳細學習,我們將花時間瞭解這部分涉及的Netty元件模型的一些基礎知識。
6.11 Channel生命週期
Channel介面定義了一個簡單但作用巨大的狀態模型,這個模型和ChannelInboundHandler API緊密關聯。Channel的4種狀態如表6.1所示。
表6.1 Channel生命週期狀態
狀態 | 描述 |
---|---|
ChannelUnregistered | Channel已建立,但沒有註冊到EventLoop. |
ChannelRegistered | Channel已註冊到EventLoop |
ChannelActive | Channel是活躍的(連上遠端)。它可以接收和傳送資料。 |
ChannelInactive | Channel沒有連上遠端 |
Channel的正常生命週期如圖6.1所示。隨著這些狀態變化的出現,響應事件隨之生成。這些事件被轉發到ChannelPipeline中的ChannelHandlers來執行。
圖6.1 Channel狀態模型
6.1.2 ChannelHandler生命週期
ChannelHandler生命週期的方法定義在ChannelHandler介面中,如表6.2所示,它們在ChannelHandler加入Channel-Pipeline或者從Channel-Pipeline移除之後被呼叫。每個方法傳入一個ChannelHandlerContext引數。
表6.2 ChannelHandler生命週期方法
型別 | 描述 |
---|---|
handlerAdded | 當ChannelHandler加入ChannelPipeline時被呼叫 |
handlerRemoved | 當ChannelHandler從ChannelPipeline移除時被呼叫 |
exceptionCaught | ChannelPipeline在處理過程中發生錯誤時被呼叫 |
Netty定義了以下兩個重要的ChannelHandler的子介面:
- ChannelInboundHandler——處理各種入站資料和狀態變化
- ChannelOutboundHandler——處理出站資料且允許所有操作打斷
下一節我們將詳細討論這些介面。
6.1.3 ChannelInboundHandler介面
表6.3列出了ChannelInboundHandler介面的生命週期方法。這些方法在資料接收或者關聯Channel的狀態改變時被呼叫。正如我們之前所提到的,這些方法緊密對映到Channel生命週期。
表6.3 ChannelInboundHandler方法
型別 | 描述 |
---|---|
channelRegistered | Channel註冊到EventLoop且能處理I/O時呼叫 |
channelUnregistered | Channel從EventLoop登出且不能處理任何I/O時呼叫 |
channelActive | Channel活躍時呼叫;Channel已連線/繫結且準備好了。 |
channelInactive | Channel不再是活躍狀態且不再連線遠端時呼叫 |
channelReadComplete | Channel的讀操作已經完成時呼叫 |
channelRead | 如果資料從Channel讀取就呼叫 |
channelWritabilityChanged | Channel的可寫性狀態改變時呼叫。使用者可以確保寫操作不會完成太快(以此避免OutOfMemoryError)或者當Channel變成可重寫時可以恢復寫操作。Channel類的isWritable()方法可以用來檢查channel的可寫性。可寫性的閾值可以通過Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()來設定。 |
當ChannelInboundHandler實現重寫channelRead()方法時,釋放與連線池ByteBuf例項相關的記憶體是非常有必要的。Netty為此提供了一個工具方法,ReferenceCountUtil.release(),如下所示。
碼單6.1 釋放訊息資源
@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead0(ChannelHandlerContext ctx,Object msg) {
// No need to do anything special
}
}
因為SimpleChannelInboundHandler自動釋放資源,你不必要為後面的呼叫儲存任何訊息的引用,也就是說這些引用將變成無效。
章節6.1.6提供了關於引用處理的更詳細的討論。
6.1.4 ChannelOutboundHandler介面
出站的操作和和資料用ChannelOutboundHandler來處理。它的方法通過Channel, ChannelPipeline, 和 ChannelHandlerContext呼叫。
ChannelOutboundHandler的一項強大的功能是按需延遲操作或者事件,此功能允許高階的方法來請求處理。舉個例子,如果寫到遠端被暫停,你可以延遲沖洗操作,然後恢復他們。
表6.4展示了所有ChannelOutboundHandler自身定義的方法(遺漏那些從ChannelHandler繼承的方法)
表6.4 ChannelOutboundHandler 方法
型別 | 描述 |
---|---|
bind(ChannelHandlerContext,SocketAddress,ChannelPromise) | 在繫結Channel到本地地址時呼叫 |
connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise) | 在請求連線Channel到遠端時呼叫 |
disconnect(ChannelHandlerContext,ChannelPromise) | 在請求斷開Channel和遠端的連線時呼叫 |
close(ChannelHandlerContext,ChannelPromise) | 在請求關閉Channel時呼叫 |
deregister(ChannelHandlerContext,ChannelPromise) | 請求從EventLoop登出Channel時呼叫 |
read(ChannelHandlerContext) | 請求從Channel讀取更多資料時呼叫 |
flush(ChannelHandlerContext) | 請求通過Channel重新整理排隊的資料到遠端時呼叫 |
write(ChannelHandlerContext,Object,ChannelPromise) | 請求通過Channel寫資料到遠端時呼叫 |
CHANNELPROMISE VS.CHANNELFUTURE
ChannelOutboundHandler的大部分方法帶了一個ChannelPromise引數,當操作完成時用它進行通知。ChannelPromise是ChannelFuture的一個子介面,它定義了一些可寫方法,比如setSuccess()或setFailure(),所以使ChannelFuture不可變。
接下來我們將著眼於這些簡化寫ChannelHandlers任務的類。
6.1.5 ChannelHandler介面卡
你可以使用類ChannelInboundHandlerAdapter和
ChannelOutboundHandlerAdapter作為你自定義的ChannelHandlers的出發點。這些介面卡分別提供了 ChannelInboundHandler和ChannelOutboundHandlerxed的基本實現。他們通過繼承抽象類ChannelHandlerAdapter來獲取他們公共子介面的方法。最終的類層次結構如圖6.2所示。
圖6.2 ChannelHandlerAdapter類層次結構
ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中提供的方法體在關聯的ChannelHandlerContext上呼叫相同的方法,從而將事件轉發給管道中的下一個ChannelHandler。
為了將這些適配類使用在你自己的攔截器中,簡單繼承他們並且重寫你想去自定義的方法。
6.1.6 資源管理
無論何時你通過呼叫ChannelInboundHandler.channelRead()或ChannelOutboundHandler.write()操作資料,你需要確保沒有資源洩露。正如你可能記得的前面章節所述,Netty使用引用計數來處理連線池ByteBufs。因此在你使用完ByteBuf後調整引用計數是很重要的。
為了幫助你診斷潛在的問題,Netty提供了類Resource-LeakDetector(資源-洩露檢測器),它將取樣你應用的緩衝區分配的1%來檢查記憶體洩露。涉及的開銷非常小。
如果檢測到洩露,將產生類似如下日誌資訊:
LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable
advanced leak reporting to find out where the leak occurred. To enable
advanced leak reporting, specify the JVM option
'-Dio.netty.leakDetectionLevel=ADVANCED' or call
ResourceLeakDetector.setLevel().
Netty 目前定義了4種洩露檢查級別,如表6.5所示。
表6.5 洩露-檢查 級別
級別 | 描述 |
---|---|
DISABLED | 不啟用洩露檢查,僅在大量的測試後使用此級別 |
SIMPLE | 使用預設的1%取樣率,報告任何發現的洩露。這是預設的級別且對大部分場景都比較適用。 |
ADVANCED | 報告發現的洩露和資訊被訪問的地方。適用預設的取樣率。 |
PARANOID | 類似ADVANCED,除了每一次訪問被取樣外。此級別對效能有很大的影響,一般僅僅在除錯模式中使用。 |
洩露檢查級別通過將以下Java系統屬性設定成表格中的一個值來定義:
java -Dio.netty.leakDetectionLevel=ADVANCED
如果你設定了JVM條件後重新啟動你的應用,你會發現最近你應用被訪問的洩露緩衝區的位置。以下是通過單元測試生成的一份嚴重的洩露報告:
Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK:
ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 1
#1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString(
AdvancedLeakAwareByteBuf.java:697)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(
XmlFrameDecoderTest.java:157)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(
XmlFrameDecoderTest.java:133)
...
當你實現Channel-InboundHandler.channelRead()和
ChannelOutboundHandler.write()方法時,怎麼使用這個診斷工具來防止洩露呢?讓我們檢查例項中你的channelRead()操作消費入站訊息的地方;也就是說,不呼叫ChannelHandlerContext.fireChannelRead()方法來傳遞給下一個ChannelInboundHandler。下面這個程式碼清單展示瞭如何釋放訊息。
碼單 6.3 消費並釋放入站訊息
@Sharable
public class DiscardInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
}
消費入站訊息簡便方式
因為消費入站資料並釋放時一個公共的任務,Netty提供了一個特別的Channel-
InboundHandler實現——SimpleChannelInboundHandler。一旦訊息通過channelRead0()被消費,這個實現將自動釋放訊息。
在出站一端,如果你處理write()操作並丟棄一個訊息,你必須釋放它。以下程式碼清單展示了一個丟棄所有寫入資料的實現。
碼單 6.4 丟棄並釋放出站資料
@Sharable
public class DiscardOutboundHandler
extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx,
Object msg, ChannelPromise promise) {
ReferenceCountUtil.release(msg);
promise.setSuccess();
}
}
我們不但要釋放資源而且要通知ChannelPromise。否則可能會出現這種情況,ChannelFutureListener沒有接到訊息已經被處理的通知。
總而言之,如果訊息被消費或者被丟棄且沒有傳遞給ChannelPipeline的下一個ChannelOutbound-
Handler,使用者有必要呼叫ReferenceCountUtil.release()。如果訊息到達實際的傳輸層,當它被寫入或者Channel關閉時,它將自動釋放。
6.2 ChannelPipeline介面
如果你把ChannelPipeline當做攔截流通Channel的入站和出站事件的一個ChannelHandler例項鏈,那麼這些ChannelHandlers的相互作用如何能構造核心的應用資料和事件處理邏輯是顯而易見的。
每一個新建的Channel會分配一個新的ChannelPipeline。這個關聯是永久的;Channel既不能連線到另一個ChannelPipeline,也不能斷開當前連線的ChannelPipeline。這是Netty元件生命週期的修復操作,不需要開發者進行操作。
根據其來源,一個事件會被ChannelInbound-
Handler或者ChannelOutboundHandler處理。隨後它將通過呼叫ChannelHandlerContext實現,轉發給同一父型別的下一個攔截器。
ChannelHandlerContext
ChannelHandlerContext促使ChannelHandler和它的Channel-
Pipeline以及其他的攔截器相互作用。一個攔截器可以通知ChannelPipeline的下一個ChannelHandler,且動態修改它屬於的ChannelPipeline。
ChannelHandlerContext有豐富的API來處理事件和執行I/O操作。章節6.3會提供更多關於ChannelHandlerContext的資訊。
圖6.3 ChannelPipeline 和 ChannelHandlers
圖6.3用入站和出站ChannelHandlers圖解了一個典型的ChannelPipeline佈局,並且圖解了我們之前的論述——ChannelPipeline根本上是一系列的ChannelHandlers。ChannelPipeline同樣提供通過ChannelPipeline自身傳播事件的方法。如果入站事件被觸發,它會貫穿整個ChannelPipeline進行傳播。在圖6.3中,一個出站I/O事件將在ChannelPipeline右端開始,然後行進到左端。
ChannelPipeline 相對性
從事件通過Channel-Pipeline傳輸的這一觀點上,你可能會認為其開端依賴於事件是入站還是出站。但是Netty一直證明ChannelPipeline的入站入口(圖6.3左端)作為開始,而出站入口(圖6.3右端)作為結束。
當你使用ChannelPipeline.add*()方法完成增加你的ChannelPipeline的入站和出站混合攔截器,每個ChannelHandler的順序是從開始到結束的位置,正如我們剛剛給他們定義的一樣。因此,如果你給圖6.3中的攔截器從左至右編號,第一個入站事件可見的Channel-Handler將是1;第一個出站事件可見的Channel-Handler將是5。
當pipline傳播一個事件時,它決定pipline的下一個ChannelHandler的型別是否匹配移動方向。如果不匹配,ChannelPipeline跳過此ChannelHandler並傳遞給下一個,直到它發現一個能匹配期望方向的ChannelHandler為止。(當然,一個攔截器可能實現ChannelInboundHandler 和ChannelOutboundHandler兩個介面。)
6.2.1 修改ChannelPipeline
ChannelHandler可以通過新增、移除、替換其他的ChannelHandlers,實時修改ChannelPipeline的佈局。(它也可以從ChannelPipeline移除自身。)這是Channel-Handler最重要的功能,所以我們將仔細觀察它是怎麼做的。相關的方法如表6.6所列:
表6.6 ChannelHandler修改ChannelPipeline方法
方法名 | 描述 |
---|---|
addFirst/addBefore/addAfter/addLast | 增加一個ChannelHandler到ChannelPipeline |
remove | 移除從ChannelPipeline移除一個ChannelHandler |
replace | 在ChannelPipeline中用一個ChannelHandler替換另一個ChannelHandler |
以下程式碼清單展示了這些方法的用途:
碼單 6.5 修改ChannelPipeline
ChannelPipeline pipeline = ..;
FirstHandler firstHandler = new FirstHandler();
pipeline.addLast("handler1", firstHandler);
pipeline.addFirst("handler2", new SecondHandler());
pipeline.addLast("handler3", new ThirdHandler());
...
pipeline.remove("handler3");
pipeline.remove(firstHandler);
pipeline.replace("handler2", "handler4", new FourthHandler());
你隨後會發現這項用來輕鬆重組ChannelHandlers的能力,適用於極其複雜邏輯的實現。
ChannelHandler執行和阻塞
正常情況下,ChannelPipeline的每一個ChannelHandler處理通過它的EventLoop(I/O執行緒)傳遞給它的事件。千萬不要去阻塞這個執行緒,否則它會對全部的I/O處理有負面影響。
有時候結合使用阻塞APIs的遺留程式碼可能是必須的。對於這個例項,ChannelPipeline擁有add()方法來接收一個Event-ExecutorGroup。如果一個事件被傳遞到一個自定義的EventExecutorGroup,它將被包含此EventExecutorGroup的EventExecutors中的一個處理,而且從它自身Channel的EventLoop被移除。對於這個使用場景,Netty提供了一個稱為DefaultEventExecutorGroup的實現。
除了這些操作,也有通過型別或者名稱訪問ChannelHandlers的其他操作。它們如表6.7所列:
表 6.7 訪問ChannelHandlers的ChannelPipeline操作
方法名 | 描述 |
---|---|
get | 通過型別或名稱返回一個ChannelHandler |
context | 返回繫結到ChannelHandler的ChannelHandlerContext |
names | 返回此ChannelPipeline中所有ChannelHandlers的名稱 |
6.2.2 觸發事件
ChannelPipeline API暴露其他的方法來呼叫入站和出站操作。表6.8列出了通知Channel-InboundHandlers發生在ChannelPipeline的事件的入站操作。
表6.8 ChannelPipeline入站操作
方法名 | 描述 |
---|---|
fireChannelRegistered | 在ChannelPipeline的下一個ChannelInboundHandler呼叫channelRegistered(ChannelHandlerContext) |
fireChannelUnregistered | 在ChannelPipeline的下一個ChannelInboundHandler呼叫channelUnRegistered(ChannelHandlerContext) |
fireChannelActive | 在ChannelPipeline的下一個ChannelInboundHandler呼叫channelInactive(ChannelHandlerContext) |
fireExceptionCaught | 在ChannelPipeline的下一個ChannelHandler呼叫exceptionCaught(ChannelHandlerContext,Throwable) |
fireUserEventTriggered | 在ChannelPipeline的下一個ChannelInboundHandler呼叫userEventTriggered(ChannelHandler-Context, Object) |
fireChannelRead | 在ChannelPipeline的下一個ChannelInboundHandler呼叫channelRead(ChannelHandlerContext,Object msg) |
fireChannelReadComplete | 在ChannelPipeline的下一個ChannelStateHandler呼叫channelReadComplete(ChannelHandler-Context) |
在出站方面,處理事件將導致底層socket執行某些操作。表6.9列出了ChannelPipeline API的出站操作。
表6.9 ChannelPipeline出站操作
方法名 | 描述 |
---|---|
bind | 繫結Channel到本地地址。此方法會在ChannelPipeline的下一個ChannelOutboundHandler呼叫bind(Channel-HandlerContext, SocketAddress, ChannelPromise) |
connect | 連線Channel到遠端地址。此方法會在ChannelPipeline的下一個ChannelOutboundHandler呼叫connect(ChannelHandlerContext, SocketAddress,ChannelPromise) |
disconnect | 斷開Channel。此方法會在ChannelPipeline的下一個ChannelOutboundHandler呼叫disconnect(Channel-HandlerContext,ChannelPromise) |
close | 關閉Channel。此方法會在ChannelPipeline的下一個ChannelOutboundHandler呼叫close(ChannelHandlerContext,ChannelPromise) |
deregister | 從之前分配的EventExecutor(EventLoop)登出Channel。此方法會在ChannelPipeline的下一個ChannelOutboundHandler呼叫deregister(ChannelHandler-Context, ChannelPromise)。 |
flush | 重新整理Channel的所有掛起寫入。此方法會在ChannelPipeline的下一個ChannelOutboundHandler呼叫flush(Channel-HandlerContext)。 |
write | 寫訊息到Channel。此方法會在ChannelPipeline的下一個ChannelOutboundHandler呼叫write(Channel-HandlerContext, Object msg, ChannelPromise)。注意:此方法不會寫訊息到底層socket,僅僅對它排隊。如果要將訊息寫到socket,呼叫flush或者writeAndFlush()。 |
writeAndFlush | 對於呼叫write()然後呼叫flush()來說,這是一個簡便的方法。 |
read | 請求從Channel讀取更多的資料。此方法會在ChannelPipeline的下一個ChannelOutboundHandler呼叫read(ChannelHandlerContext)。 |
總而言之:
- ChannelPipeline持有關聯Channel的ChannelHandlers。
- ChannelPipeline可以通過按需增加和移除ChannelHandlers動態修改。
- ChannelPipeline有一套豐富的API來呼叫操作應對入站和出站事件。
6.3 ChannelHandlerContext介面
ChannelHandlerContext代表ChannelHandler和ChannelPipeline之間的關聯,且無論何時ChannelHandler被加入到Channel-Pipeline就建立。ChannelHandlerContext的主要功能是管理同一個ChannelPipeline中它的關聯ChannelHandler和其他ChannelHandler之間的相互作用。
ChannelHandlerContext有很多方法,一些方法也在它自身Channel和ChannelPipeline中出現,但是有重大的不同之處。如果你在Channel和ChannelPipline例項上呼叫這些方法,他們一直在管道上傳播。同樣的方法在ChannelHandlerContext上呼叫,將在當前關聯的ChannelHandler開始並僅僅傳播到管道的下一個有能力處理此事件的ChannelHandler。
表6.10總結了ChannelHandlerContext API
表 6.10 ChannelHandlerContext API
方法名 | 描述 |
---|---|
bind | 繫結給予的SocketAddress並返回ChannelFuture |
channel | 返回繫結到此例項的Channel |
close | 關閉Channel並返回ChannelFuture |
connect | 連線到給予的SocketAddress並返回ChannelFuture |
deregister | 從先前分配的EventExecutor登出並返回ChannelFuture |
disconnect | 斷開遠端連線並返回ChannelFuture |
executor | 返回分發事件的EventExecutor |
fireChannelActive | 觸發下一個ChannelInboundHandler呼叫channelActive()(已連線) |
fireChannelInactive | 觸發下一個ChannelInboundHandler呼叫channelInactive()(已連線) |
fireChannelRead | 觸發下一個ChannelInboundHandler呼叫channelRead()(已連線) |
fireChannelReadComplete | 觸發下一個ChannelInboundHandler的Channel可寫性變化的事件 |
handler | 返回繫結該例項的ChannelHandler |
isRemoved | 如果關聯ChannelHandler從ChannelPipeline移除則返回true |
name | 返回該例項唯一的名稱 |
pipline | 返回關聯ChannelPipeline |
read | 從Channel讀取資料到第一個入站快取區;如果成功的話,觸發channelRead事件,然後通知channelReadComplete攔截器 |
write | 利用該例項通過管道寫訊息 |
當使用ChannelHandlerContext API,請記住以下幾點:
- 關聯ChannelHandler的ChannelHandlerContext從不會改變,所以快取它的引用是安全的。
- ChannelHandlerContext方法,正如我們在這章節開始陳述的一樣,相比在其他類上使用相同名稱的方法,包含更短的事件流。我們應儘可能利用這一點來提供最大效能。
6.3.1 使用ChannelHandlerContext
在這一章節我們將討論ChannelHandlerContext的用法,ChannelHandlerContext、Channel和ChannelPipeline上可用方法的行為。圖6.4展示了他們之間的關係。
圖6.4 Channel, ChannelPipeline,ChannelHandler和ChannelHandlerContext之間的關係
以下程式碼清單展示了你從ChannelHandlerContext獲取Channel引用。在Channel上呼叫write()會導致一個流通整個管道的寫事件。
碼單6.6 從ChannelHandlerContext訪問Channel
ChannelHandlerContext ctx = ..;
Channel channel = ctx.channel();
channel.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));
以下程式碼清單展示了一個相似的例子,但是這次寫到ChannelPipeline。同樣,從ChannelHandlerContext檢索此引用。
碼單6.7 從ChannelHandlerContext訪問ChannelPipeline
ChannelHandlerContext ctx = ..;
ChannelPipeline pipeline = ctx.pipeline();
pipeline.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));
正如你在圖6.5所見,程式碼清單6.6和6.7中的流程是相同的。特別需要注意的是,儘管write()在Channel或ChannelPipeline操作一直通過管道傳播事件上被呼叫,ChannelHandlerContext呼叫從一個攔截器到下一個ChannelHandler級別的攔截器的運動。
圖6.5 事件通過Channel或ChannelPipeline傳播
為什麼你想在ChannelPipeline中特定點傳播事件呢?
- 為了減少傳遞事件通過對它不感興趣的ChannelHandlers
- 為了防止通過對事件感興趣的攔截器處理此事件
為了呼叫處理以特殊的ChannelHandler開始,你必須參考在此ChannelHandler之前的ChannelHandler關聯的ChannelHandlerContext。此ChannelHandlerContext將呼叫跟隨與之相關的那個ChannelHandler。
以下程式碼清單和圖6.6闡明這種用法。
Listing 6.8 呼叫 ChannelHandlerContext write()
ChannelHandlerContext ctx = ..;
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
正如圖6.6所示,訊息通過始於下一個ChannelHandler的ChannelPipeline流通,繞過所有在前的ChannelHandler。
我們剛剛描述的用例是一個普遍用例,對於在特殊ChannelHandler例項上呼叫操作,它是特別有用的。
圖 6.6 事件流操作通過ChannelHandlerContext觸發
6.3.2 ChannelHandler 和ChannelHandlerContext的高階用法
正如你在程式碼清單6.6所見,你可以通過呼叫ChannelHandlerContext的pipeline()方法獲取封閉的ChannelPipeline。這使得管道的ChannelHandler的執行時操作成為可能,並可以利用它來實現複雜的設計。舉個例子,你可以增加ChannelHandler到管道來支援動態協議變更。
通過快取ChannelHandlerContext的引用以供後續使用可以支援其他高階用法,這可能替換外面任何ChannelHandler方法,且甚至可能發源於一個不同的執行緒。以下程式碼清單展示了使用這種模式觸發一個事件。
碼單 6.9 快取ChannelHandlerContext
public class WriteHandler extends ChannelHandlerAdapter {
private ChannelHandlerContext ctx;
//Stores reference to ChannelHandlerContext for later use
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
//Sends message using previously stored ChannelHandlerContext
public void send(String msg) {
ctx.writeAndFlush(msg);
}
}
因為一個ChannelHandler可以屬於多個ChannelPipeline,所以它可以繫結到多個ChannelHandlerContext例項。一個ChannelHandler用於此用法必須以@Sharable註解;否則,嘗試把它增加到多個ChannelPipeline將觸發異常。顯而易見,為了安全使用多併發通道(也就是說,連線),比如一個ChannelHandler必須是執行緒安全的。
以下程式碼清單佔了此模式的一種正確實現。
碼單 6.10 共享ChannelHandler
@Sharable
public class SharableHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("Channel read message: " + msg);
ctx.fireChannelRead(msg);
}
}
前述的ChannelHandler實現滿足包含多管道在內的所有需求;也就是使用@Sharable註解且不持有任何狀態。反之,以下程式碼清單中的程式碼會導致問題。
碼單 6.11 @Sharable的無效用法
@Sharable
public class UnsharableHandler extends ChannelInboundHandlerAdapter {
private int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
count++;
System.out.println("channelRead(...) called the "
+ count + " time");
ctx.fireChannelRead(msg();
}
}
這段程式碼的問題在於它持有狀態;也就是說記錄方法呼叫次數的例項變數count。當併發通道訪問它時,增加這個類例項到ChannelPipeline將很有可能產生錯誤。(當然,這個簡單案例可以通過同步channelRead()方法來糾正。)
總而言之,當且僅當你確認你的ChannelHandler是執行緒安全時才能使用@Sharable。
為什麼共享CHANNELHANDLER?
一個普遍的原因是,在多個ChannelPipelines中建立單個的ChannelHandler來收集多個渠道的統計資料。
關於ChannelHandlerContext及其與其他框架元件的關係的討論到此結束。下面我們將研究異常處理。
6.4 異常處理
異常處理是任何大型應用的重要部分,且有各種實現方法。相應地,Netty為處理入站或者出站處理期間丟擲的異常提供了幾種選擇。這一章節將幫助你理解如何設計最適合你需求的方法。
6.4.1 處理入站異常
如果一個異常在處理入站事件期間被丟擲,那麼它將從ChannelInboundHandler中觸發它的位置開始流經ChannelPipeline。為了處理這樣的一個異常,你需要在你ChannelInboundHandler實現中重寫以下方法。
public void exceptionCaught(
ChannelHandlerContext ctx, Throwable cause) throws Exception
以下大媽清單展示了一個關閉Channel並列印異常堆疊記錄的簡單例子。
碼單 6.12 基本的入站異常處理
public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
因為異常會繼續沿著入站方向流動(恰如所有入站事件),實現上述邏輯的ChannelInboundHandler經常位於ChannelPipeline的最後。這確保了所有入站異常總會被處理,不管他們發生在ChannelPipeline的任何位置。
你應該如何對異常作出反應,這很可能與你的應用相關。你可能想去關閉Channel(和連線)或者你可能嘗試去恢復它。如果你沒有實現任何處理入站異常(或者沒有消費異常),Netty將記錄異常沒有被處理的事實。
來總結一下,
- ChannelHandler.exceptionCaught()的預設實現轉發當前異常到管道的下一個攔截器。
- 如果一個異常到達管道的末端,它會被記錄為未處理的。
- 重寫exceptionCaught()來定義特定的處理。然後,你決定是否將異常傳播到該點以外。
6.4.2 處理出站異常
在出站操作中處理正常完成和異常的選項基於以下通知機制:
- 每一個出站操作返回一個ChannelFuture。當操作完成時,使用ChannelFuture註冊的ChannelFutureListeners會接到成功或者錯誤的通知。
- ChannelPromise例項幾乎在所有ChannelOutboundHandler方法中傳遞。作為ChannelFuture的子類,ChannelPromise也可以分配用於非同步通知的監聽器。但ChannelPromise還有提供實時通知的可寫方法:
ChannelPromise setSuccess();
ChannelPromise setFailure(Throwable cause);
增加ChannelFutureListener意味著在ChannelFuture例項上呼叫addListener(ChannelFutureListener),有兩種實現方案。最普遍使用的一種方案是在入站操作(比如write())返回的ChannelFuture上呼叫addListener()。
以下程式碼清單使用這種方案來增加一個列印堆疊記錄然後關閉Channel的ChannelFutureListener。
碼單 6.13 給ChannelFuture增加ChannelFutureListener
ChannelFuture future = channel.write(someMessage);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
}
}
});
第二種方案是給作為ChannelOutboundHandler方法引數傳遞的ChannelPromise增加ChannelFutureListener。如下所示的程式碼和前面的程式碼清單有同樣的效果。
碼單 6.14 給ChannelPromise增加ChannelFutureListener
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
}
}
});
}
}
ChannelPromise可寫方法
通過在ChannelPromise上呼叫setSuccess()和setFailure(),一旦ChannelHandler方法返回呼叫者,你就可以知道操作狀態。
為什麼選擇第一種方案而不是第二種?對於詳細的異常處理,你可能會發現,當調用出站操作時,增加ChannelFutureListener更加合適,正如程式碼清單6.13所示。對於稍不專業的處理異常方案,你可能發現自定義的如程式碼清單6.14所示的ChannelOutboundHandler實現會更加簡單。
如果你的ChannelOutboundHandler自身丟擲異常會發生什麼?在這種情況,Netty自身將通知任何用相應的ChannelPromise註冊的監聽器。
6.5 總結
這一章我們仔細研究了Netty資料處理元件——ChannelHandler。我們討論ChannelHandlers如何一起宣告,他們如何以ChannelInboundHandlers和ChannelOutboundHandlers來與Channelpipeline相互作用。
下一章將聚焦在Netty的編解碼器抽象上,相比於直接使用底層的ChannelHandler實現,它能使編寫協議加密和解密更加簡單。