Java Netty 學習(九)
上一篇文章學習了Channel,它遮蔽了許多底層的java.net.Socket
的操作,
那麼,當有了資料流之後,就到了如何處理它的時候,那麼本篇文章先看ChannelPipeline和ChannelHandler。
概念
Netty裡面的ChannelPipeline就類似於一根管道,而ChannelHandler類似於裡面的攔截器,當一個攔截器攔截完後,可以向後傳遞,或者跳過。
ChannelPipeline提供了提供了ChannelHandler鏈上的容器,並定義了用於該鏈的傳播入站和出站的流API。ChannelPipeline持有I/O事件攔截器ChannelHandler的連結串列,由ChannelHandler對I/O事件進行攔截和處理,可以方便地通過新增和刪除ChannelHandler來實現不同的業務邏輯定製
很簡單的理解就是編碼器和解碼器都是ChannelHandler,當資料在網路上傳輸時候,通訊雙方會定義協議和格式,此時到了Channel端,則需要進行解碼,從而再進行業務處理,而處理完,再進行編碼傳送出去。當然這是例子,實際中可以定一個多個ChannelHandler,這些ChannelHandler將以連結串列的形式存在,再進行事件傳遞。
當建立一個新的Channel
時,都會分配了一個新的ChannelPipeline
,該關聯是永久的,該通道既不能附加另一個ChannelPipeline
也不能分離當前的ChannelPipeline
。
下面分別介紹ChannelPipline
ChannelPipline
ChannelPipline可以理解為管家,對ChannelHandler進行攔截和排程。
當一個訊息被ChannelPipeline的Handler鏈攔截和處理過程是怎樣的呢?
在上文中的HelloClientHandler
的channelActive
打一個端點,分析其執行流程
先來分析下:
- 啟動
Server.java
,隨後啟動Client.java
- 當Client嘗試連線到Server時,初始化了Channel資訊,可看:Netty的Channel
- 隨後,由於HelloClientHandler也是一個Handler,它的呼叫必定經過ChannelPipeline。
- 隨後,底層的SocketChannel read()方法讀取ByteBuf,觸發ChannelRead事件
- 由IO執行緒
EventLoopGroup
分配執行緒作為Selector,等待到了事件變化,並將事件按照類別處理,如下:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //獲取channel的unsafe內部類
if (!k.isValid()) { //key可用
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop(); //從Channel中獲取對應的EventLoop
} catch (Throwable ignored) {
// 如果丟擲異常則直接返回,因為原因可能是還沒有EventLoop
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps(); // 獲取Selector的keys
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 可讀並且連線的事件
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect(); //先呼叫finishConnect,否則jdk會丟擲NotYetConnectedException
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) { //可讀並且寫事件
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read(); //讀事件
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
- 此時由於NIO的Keys變化,執行了
unsafe
的read()
方法,而在read
方法裡面,將呼叫pipeline
的fireChannelRead
方法,將事件流傳遞,如read
中下面程式碼:
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
- 此時訊息已經傳遞到ChannelPipeline,通過
fireChannel*
方法將訊息向後傳遞向後傳遞,利用ChannelHandlerContext
來依次傳遞給channelHandler1,channelHandler2,channelHandler3…
整個read事件就如上。
那麼write事件呢?可以理解為和read相反,訊息從tailHandler開始,途經channelHandlerN……channelHandler1, 最終被新增到訊息傳送緩衝區中等待重新整理和傳送,在此過程中也可以中斷訊息的傳遞,例如當編碼失敗時,就需要中斷流程,構造異常的Future返回等。
事件
事件主要分為inbound和outbound,即入站和出站事件
如fireChannelActive
,fireChannelRead
,fireChannelReadComplete
等則為入站事件,而
write
,flush
,disconnect
則為出戰事件。
看ChannelInbound和ChannelOutbound結構圖:
即一般實現ChannelHandler,繼承相應的裝飾器類Adapter即可,然後重寫需要的方法。
構建ChannelPipeline
ChannelPipline介面提供了ChannelHandler鏈的容器,並定義了用於在該鏈上傳播入站和出戰的事件流API。當Channel被建立時, 它會自動的分配到它專屬的ChannelPipline中。
而且並不用程式設計師去建立一個ChannelPipline,只需要往這個容器中丟東西就好了,記得在BootStrap啟動時:
pipeline = ch.pipeline();
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast(new HelloClientHandler());
pipeline.addLast("encoder", new MyProtocolEncoder());
ChannelPipeline的機制和和Map很相似,以簡直對的方式講ChannelHandler管理起來,增刪改查,但是此時會有問題,類似與Map有ConcurrentHashMap一類併發容器,理論上,ChannelPipeline會有IO執行緒和使用者執行緒之間的併發情況,以及使用者之間的併發情況,那麼ChannelPipeline併發下怎麼解決呢?
在ChannelPipeline中有四個方法:
- addFirst
- addBefore
- addAfter
- addLast
通過查DefaultChannelPipeline
的原始碼不難發現,這四個方法都使用Synchronized(this)
來加鎖,將當前整個ChannelPipeline給鎖起來,這樣依賴就很好的避免了更改Pipeline內部連結串列結構時候出現的併發問題。
每當新增時候,ChannelPipeline都會呼叫checkDuplicateName(name);
進行同名校驗,從表頭迴圈到表尾進行校驗:
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
while (context != tail) {
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}
DefaultChannelPipeline
是ChannelPipline的預設實現,能夠滿足大多數ChannelPipline的需求,其父類實現了ChannelInboundInvoker
和 ChannelOutboundInvoker
用於能夠分別給不同型別的事件傳送通知。
ChannelPipeline中的耗時操作
ChannelPipline
中的每一個ChannelHandler
都是通過它的EventLoop
(IO執行緒)來處理它的事件的。所以不要阻塞這個執行緒,因為會對整體 IO產生負面影響。
但有時可能需要與那些使用阻塞API的遺留程式碼進行互動,對於這種情況,ChannelPipeline有一些接受一個EventExecutorGroup
的addFirst
,addLast
,addBefore
,addAfter
方法,如果一個事件被傳遞給一個自定義的EventExecutorGroup
,它將被包含在這個EventExecutorGroup
中EventExecutor
所處理 (類似新開一個執行緒),從而被該Channel
本身的EventLoop
中移除,對於這種用例,Netty
提供一個交DefaultEventExecutorGroup
預設實現
當然,在上述四個add*
方法也是有Synchronized
修飾的
ChannelHandlerContext介面
ChannelHandlerContext
代表ChannelHandler
和ChannelPipline
之間的關聯,每當有ChannelHandler
新增到ChannelPipline
中時,
都會建立ChannelHandlerContext
,它的主要功能是管理它所關聯的ChannelHandler
和它同一個ChannelPipline
中其他ChannelHandler
之間的互動。ChannelHandlerContext
有很多方法,其中一些方法也存在於Channel
和ChannelPipline
本身上,但有一點重要不同,如果呼叫Channel
或者ChannelPipline
上的這些方法,它們將沿著整個ChannelPipline
進行傳播。而呼叫位於ChannelHandlerContext
上相同方法,
則講從當前所關聯的ChannelHandler
開始,並且只會傳播給位於該ChannelPipline
中下一個能夠處理的ChannelHandler
。ChannelHandlerContext
和ChannelHandler
之間的關聯是永遠不變的,所以快取你對他的引用是安全的- 相對於其他類的同名方法,
ChannelHandlerContext
的方法將產生更短的事件流,應該儘可能利用這個特性來獲得最大的效能
雖然被呼叫的CHannel
或者ChannelPipline
上的write方法一直傳播事件通過整個ChannelPipline
,但是在ChannelHandler
的級別上,
事件從一個ChannelHandler
到下一個ChannelHandler
的移動是由ChannelHandlerContext
上呼叫完成的。
一個ChannelHandler
可以從屬於多個ChannelPipline
,所以它也可以繫結到多個ChannelHandlerContext
例項,對於這種用法,
對應的ChannelHandler
必須要使用@Shareable
註解標註,否則試圖將它新增多個ChannelPIpline
將會觸發異常。顯而易見,為了安全的
備用與多個併發的Channel
,這樣的ChannelHandler
必須是執行緒安全的。
參考資料:
- Netty In Action
- Netty 權威指南
- Netty 原始碼 4.1.12 Final