《Netty in action》 讀書筆記
聲明:這篇文章是記錄讀書過程中的知識點,並加以歸納總結,成文。文中圖片、代碼出自《Netty in action》。
1. 為什麽用Netty?
每個框架的流行,都一定有它出眾的地方。Netty就是為了網絡編程使用,它封裝了大量的通訊的底層內容,簡化開發者的工作量,讓開發者的精力全都放於業務上,而且它能高效的處理網絡通訊的東西。很多大的公司都使用Netty作為通訊框架,因此,使這個框架更加完善。
2. OIO(阻塞I/O)與NIO(非阻塞I/O)與AIO(異步I/O)
2.1 阻塞I/O
為什麽叫阻塞I/O呢?因為每個線程對應一個連接,連接的建立、數據讀取、寫入,都會進行阻塞,所以叫阻塞I/O。
性能瓶頸:一個線程對應一個連接,當連接數量巨大時,線程數量過多,每個線程都需要內存,內存會被迅速耗盡,以及線程間切換的開銷也十分巨大。另外,阻塞也是一個問題。accept()、read()、write()都會進行阻塞。
OIO線程模型:
處理邏輯:
2.2 非阻塞I/O
NIO三大組件:Buffer、Channel(數據來源或數據寫入的目的地)、Selector(多路復用器)
開啟Selector方式:Selector selector = Selector.open();
註冊的四種事件:SelectionKey.OP_READ(可讀)、SelectionKey.OP_WRITE(可寫)、SelectionKey.OP_CONNECT(建立起連接)、SelectionKey.OP_ACCEPT(接收連接)
為什麽叫非阻塞I/O呢?它不阻塞了嗎?不是的。是因為,使用Selector對多個並發的連接進行輪詢(也就是用更少的線程監視更多的連接),通過事件驅動來查明哪些可以進行I/O,不必等待到操作的結束。
NIO線程模型:
處理邏輯:
2.3 異步I/O
在NIO的基礎上,AIO可以讓效率有更多的提高。AIO是基於回調和事件驅動,當異步方法提交任務後可以立即返回,由線程池負責來執行任務,當任務完成後可以立即通知給任務的提交者。異步I/O的目的,主要為了控制線程數量,減少過多的線程帶來的內存消耗和CPU在線程調度上的開銷。
異步I/O的使用方式有兩種,一種是Future,另一種是回調函數。
2.3.1 Future
它代表異步計算的結果。
1 interface ArchiveSearcher { String search(String target); } 2 class App {3 ExecutorService executor = ... 4 ArchiveSearcher searcher = ... 5 void showSearch(final String target) 6 throws InterruptedException { 7 Future<String> future 8 = executor.submit(new Callable<String>() { 9 public String call() { 10 return searcher.search(target); 11 }}); 12 displayOtherThings(); // do other things while searching 13 try { 14 displayText(future.get()); // use future 這裏會進行阻塞,一直到計算的完成 15 } catch (ExecutionException ex) { cleanup(); return; } 16 } 17 }
在Future這裏,還要提到FutreTask這個類,它可以包裝Callable和Runnable的對象,此外,因為它繼承Runnable接口,所以它可以被提交給Executor來執行。
2.3.2 回調函數 CompletionHandler
CompletionHandler它是一個處理器,用於異步計算結果。有兩個方法,completed(V result, A attachment)(執行成功時調用)和failed(Throwable exc, A attachment)(執行失敗時調用)。在下列地方,都有使用CompletionHandler作為回調函數的方法:
AsynchronousFileChannel 類的 read(ByteBuffer dst, long position, A attachment, CompletionHandler<Integer,? super A> handler)
AsynchronousServerSocketChannel類的 accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler)
AsynchronousSocketChannel 類的 read(ByteBuffer dst, A attachment, CompletionHandler<Integer,? super A> handler)
3. ByteBuf
Java NIO中提供ByteBuffer作為byte的容器,在Netty中,使用ByteBuf來替代ByteBuffer。ByteBuf功能更強大,更加靈活。Netty對於數據的處理主要是通過兩個組件,抽象類ByteBuf和接口ByteBufHolder。
3.1 ByteBuf使用方式
HEAP BUFFERS 存儲JVM的堆數據
1 ByteBuf heapBuf = ...; 2 if (heapBuf.hasArray()) { 3 byte[] array = heapBuf.array(); 4 int offset = heapBuf.arrayOffset() + heapBuf.readerIndex(); 5 int length = heapBuf.readableBytes(); 6 handleArray(array, offset, length); 7 }
DIRECT BUFFERS
這裏的Direct該如何理解呢?它是ByteBuf的一種形式,在使用JNI的時候,它避免了將buffer的內容復制到中間媒介buffer上,可以直接使用。這就是DIrect的說明。
在JavaDoc中,關於ByteBuffer有明確的說明:direct buffers會處於正常的垃圾回收之外,因此,它非常適用於網絡傳輸方面。如果你的數據保留在heap-buffer上面,那麽socket傳輸前,要將buffer復制到direct buffer上。至於,它的不足之處,在於使用direct buffer的時候,必須先復制一份。代碼如下:
1 ByteBuf directBuf = ...; 2 if (!directBuf.hasArray()) { 3 int length = directBuf.readableBytes(); 4 byte[] array = new byte[length]; 5 directBuf.getBytes(directBuf.readerIndex(), array); 6 handleArray(array, 0, length); 7 }
COMPOSITE BUFFERS 就是前面兩種的結合
3.2 接口ByteBufHolder
在保存實際數值的時候,常需要保存一些附加的值。比如:Http返回的例子,除了返回的實際內容外,還有status code,cookies等等。
Netty用ByteBufholder來提供一些共通的方法使用,它有幾個常用的方法:content()、copy()、duplicate()
如果你要實現一個在ByteBuf中存儲實際負載的對象的話,那麽ByteBufHolder是個好的選擇。
3.3 ByteBuf allocation
這裏討論的是ByteBuf實例的管理。
3.3.1 On-demand: interface ByteBufAllocator
ByteBufAllocator 常見方法(部分記錄):buffer()、heapBuffer()、directBuffer()、compositeBuffer()、ioBuffer()
Netty用ByteBufAllocator實現了緩存池。我們可以通過Channel或是綁定在ChannelHandler上的ChannelHandlerContext,來獲得ByteBufAllocator的引用。示例如下:
1 Channel channel = ...; 2 ByteBufAllocator allocator = channel.alloc(); 3 .... 4 ChannelHandlerContext ctx = ...; 5 ByteBufAllocator allocator2 = ctx.alloc();
ByteBufAllocator有兩種實現:PooledByteBufAllocator 和 UnpooledByteBufAllocator 前者會進行緩存;後者,不會緩存ByteBuf的實例,每次調用都會返回新的。
3.3.2 Unpooled buffers
Netty有一個Unpooled的類,來提供靜態方法,用於創建未被緩存ByteBuf實例。
常用的方法:buffer()、directBuffer()、wrappedBuffer()、copiedBuffer()
3.4 Reference counting
Reference counting被用來優化內存的使用,通過釋放不再使用的資源。Netty中的Reference counting 和 ByteBufHolder 都實現了ReferenceCounted接口。ReferenceCounted的實現的實例,它們的活躍的實例的計數都是從1進行的,只要大於0,對象就一定不會被釋放。當數字減為0時,資源就會被釋放。下面給出兩個例子:
Reference counting:
1 Channel channel = ...; 2 ByteBufAllocator allocator = channel.alloc(); 3 .... 4 ByteBuf buffer = allocator.directBuffer(); 5 assert buffer.refCnt() == 1;
Release reference-counted object
1 ByteBuf buffer = ...; 2 boolean released = buffer.release(); // 活躍的引用,變成0,對象被釋放,方法返回true
4. ChannelHandler 和 ChannelPipline
4.1 The ChannelHandler family
4.1.1 Channel 生命周期
ChannelRegistered:Channel被註冊到EventLoop
ChannelActive: Channel活躍,可以發送、接收數據
ChannelInactive:Channel沒有連接到remote peer
ChannelUnregistered:Channel被創建了,但沒被註冊到EventLoop
4.1.2 ChannelHandler
先看看它的接口層次圖:
ChannelHandler 生命周期的方法:handlerAdded(ChannelHandler被加入到ChannelPipeline時調用)、handlerRemoved(被移除時)、exceptionCaught(ChannelPipeline的處理過程中發生exception時調用)
ChannelInboundHandler
處理inbound數據和Channel狀態的改變
常用方法:channelRegistered、channelActive、channelReadComplete、channelRead 等等
看一段代碼:
1 @Sharable //這裏的註解的意思是,這個handler可以被加到多個ChannelPipelines中 2 public class DiscardHandler extends ChannelInboundHandlerAdapter {
// 當繼承ChannelInboundHandler,覆蓋channelRead時,需要明確地釋放緩存的實例
3 @Override
4 public void channelRead(ChannelHandlerContext ctx, Object msg) {
5 ReferenceCountUtil.release(msg);
6 }
7 }
看另一個例子:
1 @Sharable 2 public class SimpleDiscardHandler // SimpleDiscardHandler 會自動釋放資源 3 extends SimpleChannelInboundHandler<Object> { 4 @Override 5 public void channelRead0(ChannelHandlerContext ctx, 6 Object msg) { 7 // 不需要任何明確地釋放資源的操作,這裏只需做業務處理就可以 8 } 9 }
註意:如果一個message被使用或是丟棄,並且沒有被傳給管道中下一個ChannelOutboundHandler時,就一定要調用ReferenceCountUtil.release() 釋放資源。如果消息到達實際傳輸層,那麽,write或是Channel關閉時,會自動釋放資源。
ChannelOutboundHandler:處理outbound數據,並允許所有操作的攔截
常見方法:bind、connect、disconnect、close 等
ChannelHandlerAdapter有一個isSharable()方法,判斷是否有註解@Sharable
4.1.3 Resource management
無論是ChannelInboundHandler.channelRead()還是ChannelOutboundHandler.write(),都需要確保沒有資源泄露。當你使用ByteBuf之後,需要調整引用的計數reference count。為了幫助診斷潛在的問題,Netty提供了一個ResourceLeakDetector,檢測到泄漏時,會打印log。
使用方法:java -Dio.netty.leakDetectionLevel=ADVANCED
參數有四種:DISABLED、SIMPLE(default)、ADVANCED、PARANOID
4.2 ChannelPipeline接口
每個新Channel創建時,都會被指定一個新的ChannelPipeline,這種對應關系是永久的,不會改變。
ChannelPipeline 和 ChannelHandlers 關系圖:
ChannelPipeline一些常用方法:
addLast、addFirst、remove、replace、get、context(獲得綁定在ChannelHandler上的ChannelHandlerContext)、names(獲得ChannelPipeline上的所有ChannelHandler名字)
4.3 ChannelHandlerContext接口
4.3.1 ChannelHandlerContext的使用
它的主要作用就是管理handlers之間的交互。
常見API:bind、channel、close、connect、deregister、fireChannelActive、fireChannelRead、handler、pipeline、read、write 等等
ChannelHandlerContext的使用,如下圖:
每個ChannelHandler都是和一個ChannelHandlerContext相互關聯,關聯關系一直不會改變,因此,緩存一個它的引用很安全。
下面看2個代碼片段:
1 ChannelHandlerContext ctx = ..; 2 Channel channel = ctx.channel(); // 從ChannelHandlerContext關聯到Channel 3 channel.write(Unpooled.copiedBuffer("Netty in Action", 4 CharsetUtil.UTF_8)); 5 。。。 9 ChannelHandlerContext ctx = ..; 10 ChannelPipeline pipeline = ctx.pipeline(); // 從ChannelHandlerContext 關聯到 ChannelPipeline
11 pipeline.write(Unpooled.copiedBuffer("Netty in Action", 12 CharsetUtil.UTF_8));
上面代碼中,雖然Channel 或是 ChannelPipeline 調用的write()方法,代理了pipline中事件的傳遞,實際上,ChannelHandler之間的傳遞是ChannelHandlerContext來做的。
如果你想從某個handler開始(不是從第一個開始),那麽就可以利用ChannelHandlerContext。
1 ChannelHandlerContext ctx = ..; // 獲取ChannelHandlerContext的引用 2 ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8)); //write() sends the buffer to the next ChannelHandler
4.3.2 ChannelHandler 和ChannelHandlerContext 的高級用法
6.3.2
To be continued!!!
《Netty in action》 讀書筆記