Netty入門(2) - 核心概念
Netty Crash Course
一個Netty程式一般開始於Bootstrap類,通過設定程式後,使用Handlers來處理特定的event和設定Netty中的事件,從而處理多個協議資料,比如實現ChannelInBoundHandler類;
多個Handler通過ChannelInitializer的ChannelPipeline組合在一起,再配合EventLoop(整個生命週期內繫結對應一條執行緒)和EventLoopGroup來處理各種業務邏輯。
Channels, Events and Input/Ouput
Netty中所有操作都是非同步執行的,例如連線一個主機、發訊息。可以註冊監聽器來通知,如Futures(註冊一個監聽,當操作成功或者失敗時會通知)和ChannelFutures(封裝的是一個操作的相關資訊,操作被執行會理科返回ChannelFuture)。
Bootstrap
兩種引導,用於客戶端或者DatagramChannel的Bootstrap(一個EventGroup)和用於服務端的ServerBootstrap(兩個EventGroup)。
EventLoopGroup可以包含很多個EventLoop,每個channel繫結一個EventLoop不會被改變,很多channel共享一個EventLoop。這就意味著如果一個Channel保持EventLoop繁忙,會禁止其他channel繫結到相同的EventLoop,我們可以理解EventLoop為一個事件迴圈執行緒,而EventLoop是一個事件迴圈集合。
Channel Handler and Data Flow
Handlers自身依賴ChannelPipeline來決定它們的執行順序。
ChannelHandler定義了一個父介面,ChannelInboundHandler(入站,讀)和ChannelOutboundHandler(出站,寫)是其子介面,ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter是橋接器類。
入站是從ChannelPipeline的頭部開始傳遞到最後,而出站是從尾部開始。
Netty發訊息有兩種方法:1、直接寫入通道導致訊息從ChannelPipeline的尾部開始;2、寫入ChannelHandlerContext物件導致處理訊息從ChannelPipeline的下一個handler開始
編碼器、解碼器和業務邏輯
每個Handler負責轉發時間到ChannelPipeline的下一個handler,但在*Adapter(和子類)中是自動完成的:
ChannelHandlerAdapter,ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter,
ByteToMessageDecoder,MessageToByteEncoder,ProtobufEncoder,ProtobufDecoder
所有的解碼器都繼承或者實現ChannelInboundHandlerAdapter或者ChannelInboundHandler,channelRead放大呼叫decode方法,並通過ChannelHandlerContext.fireChannelRead(Object msg)傳遞給下一個handler。編碼器類似。
也許最常見的應用程式處理接收到訊息後進行解碼,然後供相應的業務處理模組使用。那麼有用程式只需要拓展SimpleChannelInboundHandler<T>,SimpleChannelOutboundHandler<T>即可。主要方法是channelRead0(ChannelHadnlerContext ctx, T msg); 消費msg時需要注意,Netty中梳理IO一般有很多執行緒,不要阻塞IO執行緒,因為阻塞會降低程式的效能,解決方案是新增ChannelHandler到ChannelPipeline時指定一個EventExecutorGroup,EventExecutorGroup會獲得一個EventExecutor,EventExecutor將會執行ChannelHandler的所有方法:
double coefficient = 0.8; //係數 int numberOfCores = Runtime.getRuntime().availableProcessors(); int poolSize = (int)(numberOfCores / (1 - coefficient)); static final EventExecutorGroup group = new DefaultEventExecutorGroup(poolSize); ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new MyProtocolDecoder()); pipeline.addLast("encoder", new MyProtocolEncoder()); // Tell the pipeline to run MyBusinessLogicHandler's event handler methods // in a different thread than an I/O thread so that the I/O thread is not blocked by // a time-consuming task. // If your business logic is fully asynchronous or finished very quickly, you don't // need to specify a group. pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
如需要定時執行簡單的任務,可以使用EventLoop:
ch.eventLoop().schedule(...); 或者 ScheduleFuture f = ch.eventLoop().scheduleAtFixedRate(...); f.cancel(true);
Netty使用reference-counting來安全釋放Buf和其他資源:
ByteBuf
位元組資料容器,單獨維護讀寫兩個索引,最大容量限制是Integer.MAX_VALUE,三種不同型別的ByteBuf:
1、Heap Buffer(堆緩衝區)
資料儲存在JVM的堆空間中,通過陣列實現,可以使用ByteBuf的array()來獲取資料。訪問非堆緩衝區的ByteBuf的陣列會導致UNsupportedOperationException,可以使用hasArray()來檢查
2、Direct Buffer(直接緩衝區)
堆之外直接分配記憶體,效能很好,分配和釋放比較複雜,使用記憶體池來解決。不支援直接陣列訪問資料:
ByteBuf buf = Unpooled.directBuffer(16); if (!buf.hasArray()) { int len = buf.readableBytes(); byte[] arr = new byte[len]; buf.getBytes(arr); }
3、Composite Buffer(複合緩衝區)
提供一個檢視組合多個buff,hasArray()總返回false。
CompositeByteBuf cbf = Unpooled.compositeBuffer(); ByteBuf heap = Unpooled.buffer(8); ByteBuf direct = Unpooled.directBuffer(16); cbf.addComponents(heap, direct); cbf.removeComponent(0); Iterator<ByteBuf> iter = cbf.iterator(); while(iter.hasNext()) { System.out.println(iter.next().toString()); } if (!cbf.hasArray()) { int len = cbf.readableBytes(); byte[] arr = new byte[len]; cbf.getBytes(0, arr); }
相關方法:
isReadable()是否還可以繼續讀
writeableBytes()剩餘可寫空間
clear()重置讀寫的index為0
discardReadBytes()清空已讀資料,會移動位元組位置,設計到記憶體複製,注意效能問題
readerIndex(int idx)設定讀索引位置
writerIndex(int idx)設定寫索引位置
duplicate()、slice()、slice(int index, int length)、order(ByteOrder endianness)這些會建立緩衝區檢視但有獨立的讀寫索引
copy()或者copy(int index, int length)建立全新的副本
ByteBufProcessor
搜尋操作,如bytesBefore(byte value)
ByteBufHolder
幫助更快捷的訪問ByteBuf中的資料,當緩衝區沒用了之後,尅只用這個輔助類釋放資源。
ByteBufAllocator
負責分配ByteBuf例項,可以從Channel或者ChannelHandlerContext的alloc()方法獲得該物件,有兩種實現,一為將分配和回收成本以及記憶體使用降到最低,另一種是每次使用都建立新的ByteBuf例項。
Netty預設使用PooledByteBufAllocator,可以通過ChannelConfig或者引導設定不同的實現來改變。
Unpooled
建立緩衝區的工具類。
ByteBufUtil
工具類,hexDump(ByteBuf buff)