1. 程式人生 > >Netty入門(2) - 核心概念

Netty入門(2) - 核心概念

Netty Crash Course

一個Netty程式一般開始於Bootstrap類,通過設定程式後,使用Handlers來處理特定的event和設定Netty中的事件,從而處理多個協議資料,比如實現ChannelInBoundHandler類;

多個Handler通過ChannelInitializer的ChannelPipeline組合在一起,再配合EventLoop(整個生命週期內繫結對應一條執行緒)和EventLoopGroup來處理各種業務邏輯。

Channels, Events and Input/Ouput

Netty中所有操作都是非同步執行的,例如連線一個主機、發訊息。可以註冊監聽器來通知,如Futures(註冊一個監聽,當操作成功或者失敗時會通知)和ChannelFutures(封裝的是一個操作的相關資訊,操作被執行會理科返回ChannelFuture)。

Bootstrap

兩種引導,用於客戶端或者DatagramChannel的Bootstrap(一個EventGroup)和用於服務端的ServerBootstrap(兩個EventGroup)。

EventLoopGroup可以包含很多個EventLoop,每個channel繫結一個EventLoop不會被改變,很多channel共享一個EventLoop。這就意味著如果一個Channel保持EventLoop繁忙,會禁止其他channel繫結到相同的EventLoop,我們可以理解EventLoop為一個事件迴圈執行緒,而EventLoop是一個事件迴圈集合。

Channel Handler and Data Flow

Handlers自身依賴ChannelPipeline來決定它們的執行順序。

ChannelHandler定義了一個父介面,ChannelInboundHandler(入站,讀)和ChannelOutboundHandler(出站,寫)是其子介面,ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter是橋接器類。

入站是從ChannelPipeline的頭部開始傳遞到最後,而出站是從尾部開始。

Netty發訊息有兩種方法:1、直接寫入通道導致訊息從ChannelPipeline的尾部開始;2、寫入ChannelHandlerContext物件導致處理訊息從ChannelPipeline的下一個handler開始

編碼器、解碼器和業務邏輯

每個Handler負責轉發時間到ChannelPipeline的下一個handler,但在*Adapter(和子類)中是自動完成的:

ChannelHandlerAdapter,ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter,

ByteToMessageDecoder,MessageToByteEncoder,ProtobufEncoder,ProtobufDecoder

所有的解碼器都繼承或者實現ChannelInboundHandlerAdapter或者ChannelInboundHandler,channelRead放大呼叫decode方法,並通過ChannelHandlerContext.fireChannelRead(Object msg)傳遞給下一個handler。編碼器類似。

也許最常見的應用程式處理接收到訊息後進行解碼,然後供相應的業務處理模組使用。那麼有用程式只需要拓展SimpleChannelInboundHandler<T>,SimpleChannelOutboundHandler<T>即可。主要方法是channelRead0(ChannelHadnlerContext ctx, T msg); 消費msg時需要注意,Netty中梳理IO一般有很多執行緒,不要阻塞IO執行緒,因為阻塞會降低程式的效能,解決方案是新增ChannelHandler到ChannelPipeline時指定一個EventExecutorGroup,EventExecutorGroup會獲得一個EventExecutor,EventExecutor將會執行ChannelHandler的所有方法:

double coefficient = 0.8;  //係數
int numberOfCores = Runtime.getRuntime().availableProcessors();
int poolSize = (int)(numberOfCores / (1 - coefficient));
static final EventExecutorGroup group = new DefaultEventExecutorGroup(poolSize);


ChannelPipeline pipeline = ch.pipeline(); 
pipeline.addLast("decoder", new MyProtocolDecoder()); 
pipeline.addLast("encoder", new MyProtocolEncoder()); 
// Tell the pipeline to run MyBusinessLogicHandler's event handler methods 
// in a different thread than an I/O thread so that the I/O thread is not blocked by 
// a time-consuming task. 
// If your business logic is fully asynchronous or finished very quickly, you don't 
// need to specify a group. 
pipeline.addLast(group, "handler", new MyBusinessLogicHandler()); 

 如需要定時執行簡單的任務,可以使用EventLoop:

ch.eventLoop().schedule(...);
或者
ScheduleFuture f = ch.eventLoop().scheduleAtFixedRate(...);
f.cancel(true);

 

Netty使用reference-counting來安全釋放Buf和其他資源:

ByteBuf

位元組資料容器,單獨維護讀寫兩個索引,最大容量限制是Integer.MAX_VALUE,三種不同型別的ByteBuf:

1、Heap Buffer(堆緩衝區)

資料儲存在JVM的堆空間中,通過陣列實現,可以使用ByteBuf的array()來獲取資料。訪問非堆緩衝區的ByteBuf的陣列會導致UNsupportedOperationException,可以使用hasArray()來檢查

2、Direct Buffer(直接緩衝區)

堆之外直接分配記憶體,效能很好,分配和釋放比較複雜,使用記憶體池來解決。不支援直接陣列訪問資料:

ByteBuf buf = Unpooled.directBuffer(16);
if (!buf.hasArray()) {
  int len = buf.readableBytes();
  byte[] arr = new byte[len];
  buf.getBytes(arr);          
}

3、Composite Buffer(複合緩衝區)

提供一個檢視組合多個buff,hasArray()總返回false。

CompositeByteBuf cbf = Unpooled.compositeBuffer();
ByteBuf heap = Unpooled.buffer(8);
ByteBuf direct = Unpooled.directBuffer(16);
cbf.addComponents(heap, direct);

cbf.removeComponent(0);

Iterator<ByteBuf> iter = cbf.iterator();
while(iter.hasNext()) {
  System.out.println(iter.next().toString());      
}

if (!cbf.hasArray()) {
  int len = cbf.readableBytes();
  byte[] arr = new byte[len];
  cbf.getBytes(0, arr);    
}

相關方法:

isReadable()是否還可以繼續讀

writeableBytes()剩餘可寫空間

clear()重置讀寫的index為0

discardReadBytes()清空已讀資料,會移動位元組位置,設計到記憶體複製,注意效能問題

readerIndex(int idx)設定讀索引位置

writerIndex(int idx)設定寫索引位置

duplicate()、slice()、slice(int index, int length)、order(ByteOrder endianness)這些會建立緩衝區檢視但有獨立的讀寫索引

copy()或者copy(int index, int length)建立全新的副本

ByteBufProcessor

搜尋操作,如bytesBefore(byte value)

ByteBufHolder

幫助更快捷的訪問ByteBuf中的資料,當緩衝區沒用了之後,尅只用這個輔助類釋放資源。

ByteBufAllocator

負責分配ByteBuf例項,可以從Channel或者ChannelHandlerContext的alloc()方法獲得該物件,有兩種實現,一為將分配和回收成本以及記憶體使用降到最低,另一種是每次使用都建立新的ByteBuf例項。

Netty預設使用PooledByteBufAllocator,可以通過ChannelConfig或者引導設定不同的實現來改變。

Unpooled

建立緩衝區的工具類。

ByteBufUtil

工具類,hexDump(ByteBuf buff)