netty原始碼之ChannelPipeline
技術標籤:nettynettychannelpipelinepipeline
netty原始碼之ChannelPipeline
Netty中ChannelPipeline和ChannelHandler機制類似於Servlet和Filter過濾器,實際上都是職責鏈模式的一種變形,主要是為了方便事件的攔截和使用者業務邏輯的定製。
Netty的Channel過濾器實現原理與Servlet的Filter機制一致,它將Channel的資料管道抽象為ChannelPipeline,訊息在ChannelPipeline中流動和傳遞。ChannelPipeline持有IO事件攔截器ChannelHandler的連結串列,由ChannelHandler對IO事件進行攔截和處理,可以方便地通過新增和刪除ChannelHandler來實現不同的業務邏輯定製,不需要對已有的ChannelHandler進行修改,能夠實現對修改封閉和對擴充套件的支援(開閉原則)。
ChannelPipeline的結構
ChannelPipeline是ChannelHandler的容器,它負責ChannelHandler的管理和事件攔截與排程。
ChannelPipeline底層使用了一個雙向連結串列來儲存ChannelHandler,但並不是直接儲存的ChannelHandler,而是ChannelHandlerContext,在ChannelHandlerContext可以直接獲取到與之對應的ChannelHandler、ChannelPipeline、Channel。
在使用Netty時,我們並不需要手動建立ChannelPipeline,因為使用ServerBootstrap或者Bootstrap啟動服務端或者客戶端時,Netty會為每個Channel連線建立一個獨立的ChannelPipeline。對於使用者而言,只需要將自定義的攔截器加入到ChannelPipeline中即可。
ChannelPipeline支援執行態動態的新增或者刪除ChannelHandler,在某些場景下這個特性非常實用。例如當業務高峰期需要對系統做擁塞保護時,就可以根據當前的系統時間進行判斷,如果處於業務高峰期,則動態地將系統擁塞保護ChannelHandler新增到當前的ChannelPipeline中,當高峰期過去之後,就可以動態刪除擁塞保護ChannelHandler了。
ChannelPipeline是執行緒安全的,這意味著N個業務執行緒可以併發地操作ChannelPipeline而不存在多執行緒併發問題。但是,ChannelHandler卻不是執行緒安全的,這意味著儘管ChannelPipeline是執行緒安全的,但是使用者仍然需要自己保證ChannelHandler的執行緒安全。
怎麼保證ChannelHandler的執行緒安全?
- 每個Channel儘量保證都擁有自己的Handler,而不是Handler在多個Channel之間共享,也就是下面的程式碼不推薦使用,這麼寫那麼這個ServerHandler必須用註解
@ChannelHandler.Sharable
明確表明這是一個共享的handler,而且是執行緒安全的。
ServerBootstrap b = new ServerBootstrap();
ServerHandler serverHandler = new ServerHandler();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(serverHandler);
}
});
而是推薦這麼使用:ch.pipeline().addLast(new ServerHandler())
;
- 儘量不使用共享資源,如果使用了共享資源,在操作時需要對共享資源進行同步,如加鎖。
ChannelPipeline的事件處理
Netty中的事件分為inbound(入站)事件和outbound(出站)事件。
inbound事件通常由IO執行緒觸發,例如TCP鏈路建立事件、鏈路關閉事件、讀事件、異常通知事件等。觸發inbound事件的方法如下:
- ChannelHandlerContext.fireChannelRegistered(): Channel註冊事件。
- ChannelHandlerContext.fireChannelActive():TCP鏈路建立成功,Channel啟用事件。
- ChannelHandlerContext.fireChannelRead(Object):讀事件。
- ChannelHandlerContext.fireChannelReadComplete():讀操作完成通知事件。
- ChannelHandlerContext.fireExceptionCaught(Throwable):異常通知事件。
- ChannelHandlerContext.fireUserEventTriggered(Object):使用者自定義事件。
- ChannelHandlerContext.fireChannelWritabilityChanged():Channel的可寫狀態變化通知事件。
- ChannelHandlerContext.fireChannelInactive():TCP連線關閉,鏈路不可用通知事件。
outbound事件通常是由使用者主動發起的網路IO操作,例如使用者發起的連線操作、繫結操作、訊息傳送等操作。觸發outbound事件的方法如下:
- ChannelHandlerContext.bind(Socket.Address, ChannelPromise):繫結本地地址事件。
- ChannelHandlerContext.connect(SocketAddress,SocketAddress, ChannelPromise):連線服務端事件。
- ChannelHandlerContext.write(Object, ChannelPromise):傳送事件。
- ChannelHandlerContext.flush():重新整理事件。
- ChannelHandlerContext.read():讀事件。
- ChannelHandlerContext.disconnect(ChannelPromise):斷開連線事件。
- ChannelHandlerContext.close(ChannelPromise):關閉當前Channel事件。
DefaultChannelPipeline
ChannelPipeline的實現其實只有一個DefaultChannelPipeline。
DefaultChannelPipeline對ChannelHandler的管理
ChannelPipeline是ChannelHandler的管理容器,必然要負責ChannelHandler的查詢、新增、替換和刪除,所以它裡面的方法很多都是和管理ChannelHandler有關的,比如addXXX、removeXXX、replaceXXX等。
ChannelPipeline內部維護了一個ChannelHandler的連結串列和迭代器,所以從本質上來說,上述的管理操作基本上就是對連結串列的操作。但是ChannelHandler本身並不是連結串列上的元素,所以在具體實現上是把ChannelHandler包裝進ChannelHandlerContext的例項DefaultChannelHandlerContext,然後把ChannelHandlerContext作為元素來組成連結串列。
下面是DefaultChannelPipeline.addLast()方法的原始碼:
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler); // 檢查handler是否是共享的,如果是,必須加@Sharable註解,否則會丟擲異常
// filterName()方法會檢查handler是否已經存在
// 建立DefaultChannelHandlerContext
newCtx = newContext(group, filterName(name, handler), handler);
// 新增至連結串列尾部
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 回撥handler.handlerAdd()方法
callHandlerAdded0(newCtx);
return this;
}
由於ChannelPipeline支援執行期動態修改,因此存在兩種潛在的多執行緒併發訪問場景。
- IO執行緒和使用者業務執行緒的併發訪問;
- 使用者多個執行緒之間的併發訪問。
為了保證ChannelPipeline的執行緒安全性,需要通過執行緒安全容器或者鎖來保證併發訪問的安全,此處Netty直接使用了synchronized關鍵字,保證同步塊內的所有操作的原子性。在加入連結串列之前,Netty還會檢查該Handler是否已經被新增過及其名字是否有重複,如果該Handler不是共享的,而且被新增過丟擲異常,如果名字重複,也會丟擲異常。
完成連結串列操作後,後面的部分基本上做的就是一件事,執行方法callHandlerAdded0,只是根據條件不同進行同步或者非同步執行,callHandlerAdded0的主要作用是在handler新增被加入連結串列之後做一些額外工作,Netty本身對這個方法的實現,在ChannelHandlerAdapter類中,是個空操作,這裡如果我們需要做一些業務邏輯, 可以通過重寫該方法進行實現。
callHandlerAddedInEventLoop和callHandlerAdded0的區別:
- callHandlerAdded0直接使用當前執行緒來回調handler.handlerAdd()方法。
- callHandlerAddedInEventLoop是使用context中所攜帶的執行緒池來回調handler.handlerAdd()方法。
也就說如果addLast()方法是由內部EventLocp中的執行緒呼叫,那麼就會直接回調handler.handlerAdd()方法,如果是外部使用者執行緒呼叫,那麼就會放入到當前EventLocp的任務佇列中,等待其中的執行緒進行回撥。
inbound事件
當發生某個I/O事件的時候,例如鏈路建立、鏈路關閉、讀取操作完成等,都會產生一個事件,事件在pipeline中得到傳播和處理,它是事件處理的總入口。由於網路IO相關的事件有限,因此Netty對這些事件進行了統一抽象,Netty自身和使用者的ChannelHandler會對感興趣的事件進行攔截和處理。
pipeline中以fireXXX命名的方法都是從IO執行緒流向使用者業務Handler的inbound事件,它們的實現因功能而異,但是處理步驟類似,總結如下。
- 呼叫HeadHandler對應的fireXXX方法;
- 執行事件相關的邏輯操作。
inbound事件對應的方法如下:
ChannelPipeline fireChannelRegistered();
ChannelPipeline fireChannelUnregistered();
ChannelPipeline fireChannelActive();
ChannelPipeline fireChannelInactive();
ChannelPipeline fireExceptionCaught(Throwable cause);
ChannelPipeline fireUserEventTriggered(Object event);
ChannelPipeline fireChannelRead(Object msg);
ChannelPipeline fireChannelReadComplete();
ChannelPipeline fireChannelWritabilityChanged();
來看一下fireChannelActive()方法的原始碼:
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
可見pipeline的fireChannelActive對應的方法都是委託雙向連結中的head.invokeChannelActive()進行處理,實際上會觸發對應handler.channelActive(),其他方法也類似。
outbound事件
由使用者執行緒或者程式碼發起的IO操作被稱為outbound事件,事實上inbound和outbound是Netty自身根據事件在pipeline中的流向抽象出來的術語,在其他NIO框架中並沒有這個概念。outbound事件包括髮起繫結、發起連線、發起讀寫、重新整理資料、發起關閉、發起斷連等等。
Pipeline本身並不直接進行IO操作,最終都是由Unsafe和Channel來實現真正的IO操作的。Pipeline負責將IO事件通過TailHandler進行排程和傳播,最終呼叫Unsafe的IO方法進行I/O操作。
outbound事件對應的方法如下:
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
ChannelFuture disconnect(ChannelPromise promise);
ChannelFuture close(ChannelPromise promise);
ChannelFuture deregister(ChannelPromise promise);
ChannelOutboundInvoker read();
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
來看一下write()方法的原始碼:
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
可見pipeline的出站對應的方法都是委託雙向連結串列中的tail進行處理。