ByteBuf(秒懂)- 圖解Netty系列
Netty ByteBuf(圖解 )之一
瘋狂創客圈 Java 分散式聊天室【 億級流量】實戰系列之15 【 】
原始碼工程
寫在前面
大家好,我是作者尼恩。
今天是百萬級流量 Netty 聊天器 打造的系列文章的第15篇,這是一個基礎篇。
由於關於ByteBuf的內容比較多,分兩篇文章:
第一篇:圖解 ByteBuf的分配、釋放和如何避免記憶體洩露
本篇為第一篇。
Netty ByteBuf 優勢
Netty 提供了ByteBuf,來替代Java NIO的 ByteBuffer 緩,來操縱記憶體緩衝區。
與Java NIO的 ByteBuffer 相比,ByteBuf的優勢如下:
-
Pooling (池化,這點減少了記憶體複製和GC,提升效率)
-
可以自定義緩衝型別
-
通過一個內建的複合緩衝型別實現零拷貝
-
擴充套件性好,比如 StringBuffer
-
不需要呼叫 flip()來切換讀/寫模式
-
讀取和寫入索引分開
-
方法鏈
-
引用計數
手動獲取與釋放ByteBuf
Netty環境下,業務處理的程式碼,基本上都在Handler處理器中的各個入站和出站方法中。
一般情況下,採用如下方法獲取一個Java 堆中的緩衝區:
ByteBuf heapBuffer = ctx.alloc().heapBuffer();
使用完成後,通過如下的方法,釋放緩衝區:
ReferenceCountUtil.release(heapBuffer );
上面的程式碼很簡單,通過release方法減去 heapBuffer 的使用計數,Netty 會自動回收 heapBuffer 。
緩衝區記憶體的回收、二次分配等管理工作,是 Netty 自動完成的。
自動獲取和釋放 ByteBuf
方式一:TailHandler 自動釋放
Netty預設會在ChannelPipline的最後新增的那個 TailHandler 幫你完成 ByteBuf的release。
先看看,自動建立的ByteBuf例項是如何登場的?
Netty自動建立 ByteBuf例項
Netty 的 Reactor 執行緒會在 AbstractNioByteChannel.NioByteUnsafe.read() 處呼叫 ByteBufAllocator建立ByteBuf例項,將TCP緩衝區的資料讀取到 Bytebuf 例項中,並呼叫 pipeline.fireChannelRead(byteBuf) 進入pipeline 入站處理流水線。
預設情況下,TailHandler自動釋放掉ByteBuf例項
Netty的ChannelPipleline的流水線的末端是TailHandler,預設情況下如果每個入站處理器Handler都把訊息往下傳,TailHandler會釋放掉ReferenceCounted型別的訊息。
說明:
上圖中,TailHandler 寫成了TailContext,這個是沒有錯的。
對於流水線的頭部和尾部Hander來說, Context和Hander ,是同一個類。
HeadContext 與HeadHandler ,也是同一個類。
關於Context與Handler 的關係,請看 瘋狂創客圈 的系列文章。
如果沒有到達末端呢?
一種沒有到達入站處理流水線pipeline末端的情況,如下圖所示:
這種場景下,也有一種自動釋放的解決辦法,它就是:
可以繼承 SimpleChannelInboundHandler,實現業務Handler。 SimpleChannelInboundHandler 會完成ByteBuf 的自動釋放,釋放的處理工作,在其入站處理方法 channelRead 中。
方式二:SimpleChannelInboundHandler 自動釋放
如果業務Handler需要將 ChannelPipleline的流水線的預設處理流程截斷,不進行後邊的inbound入站處理操作,這時候末端 TailHandler自動釋放緩衝區的工作,自然就失效了。
這種場景下,業務Handler 有兩種選擇:
-
手動釋放 ByteBuf 例項
-
繼承 SimpleChannelInboundHandler,利用它的自動釋放功能。
本小節,我們聚焦的是第二種選擇:看看 SimpleChannelInboundHandler是如何自動釋放的。
利用這種方法,業務處理Handler 必須繼承 SimpleChannelInboundHandler基類。並且,業務處理的程式碼,必須 移動到 重寫的 channelRead0(ctx, msg)方法中。
如果好奇,想看看 SimpleChannelInboundHandler 是如何釋放ByteBuf 的,那就一起來看看Netty原始碼。
擷取的程式碼如下所示:
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter
{
//...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
原始碼中,執行完重寫的channelRead0()後,在 finally 語句塊中,ByteBuf 的生命被結束掉了。
上面兩種,都是入站處理(inbound)過程中的自動釋放。
**出站處理(outbound)**流程,又是如何自動釋放呢?
方式三:HeadHandler 自動釋放
出站處理流程中,申請分配到的 ByteBuf,通過 HeadHandler 完成自動釋放。
出站處理用到的 Bytebuf 緩衝區,一般是要傳送的訊息,通常由應用所申請。在出站流程開始的時候,通過呼叫 ctx.writeAndFlush(msg),Bytebuf 緩衝區開始進入出站處理的 pipeline 流水線 。在每一個出站Handler中的處理完成後,最後訊息會來到出站的最後一棒 HeadHandler,再經過一輪複雜的呼叫,在flush完成後終將被release掉。
強調一下,HeadContext (HeadHandler)是出站處理流程的最後一棒。
出站處理的全過程,請檢視瘋狂創客圈的專門文章。
如何避免記憶體洩露
基本上,在 Netty的開發中,通過 ChannelHandlerContext 或 Channel 獲取的緩衝區ByteBuf 預設都是Pooled,所以需要再合適的時機對其進行釋放,避免造成記憶體洩漏。
自動釋放的注意事項
我們已經知道了三種自動釋放方法:
-
通過 TailHandler 自動釋放入站 ByteBuf
-
繼承 SimpleChannelInboundHandler 的完成 入站ByteBuf 自動釋放
-
通過HeadHandler自動釋放出站 ByteBuf
自動釋放,注意事項如下:
-
入站處理流程中,如果對原訊息不做處理,預設會呼叫 ctx.fireChannelRead(msg) 把原訊息往下傳,由流水線最後一棒 TailHandler 完成自動釋放。
-
如果截斷了入站處理流水線,則可以繼承 SimpleChannelInboundHandler ,完成入站ByteBuf 自動釋放。
-
出站處理過程中,申請分配到的 ByteBuf,通過 HeadHandler 完成自動釋放。
出站處理用到的 Bytebuf 緩衝區,一般是要傳送的訊息,通常由應用所申請。在出站流程開始的時候,通過呼叫 ctx.writeAndFlush(msg),Bytebuf 緩衝區開始進入出站處理的 pipeline 流水線 。在每一個出站Handler中的處理完成後,最後訊息會來到出站的最後一棒 HeadHandler,再經過一輪複雜的呼叫,在flush完成後終將被release掉。
手動釋放的注意事項
手動釋放是自動釋放的重要補充和輔助。
手動釋放操作,大致有如下注意事項:
-
入站處理中,如果將原訊息轉化為新的訊息並呼叫 ctx.fireChannelRead(newMsg)往下傳,那必須把原訊息release掉;
-
入站處理中,如果已經不再呼叫 ctx.fireChannelRead(msg) 傳遞任何訊息,也沒有繼承SimpleChannelInboundHandler 完成自動釋放,那更要把原訊息release掉;
-
多層的異常處理機制,有些異常處理的地方不一定準確知道ByteBuf之前釋放了沒有,可以在釋放前加上引用計數大於0的判斷避免異常; 有時候不清楚ByteBuf被引用了多少次,但又必須在此進行徹底的釋放,可以迴圈呼叫reelase()直到返回true。
特別需要強調的,是上邊的第一種情況。
如果在入站處理的 handlers 傳遞過程中,傳遞了新的ByteBuf 值,老ByteBuf 值需要自己手動釋放。老的ByteBuf 值,就是從pipeline流水線入口傳遞過來的 ByteBuf 例項。
總之,只要是在傳遞過程中,沒有傳遞下去的ByteBuf就需要手動釋放,避免不必要的記憶體洩露。
緩衝區 Allocator 分配器
Netty通過 ByteBufAllocator分配緩衝區。
Netty提供了ByteBufAllocator的兩種實現:PoolByteBufAllocator和UnpooledByteBufAllocator。前者將ByteBuf例項放入池中,提高了效能,將記憶體碎片減少到最小。這個實現採用了一種記憶體分配的高效策略,稱為 jemalloc。它已經被好幾種現代作業系統所採用。後者則沒有把ByteBuf放入池中,每次被呼叫時,返回一個新的ByteBuf例項。
分配器 Allocator的型別
PooledByteBufAllocator:可以重複利用之前分配的記憶體空間。
為了減少記憶體的分配回收以及產生的記憶體碎片,Netty提供了PooledByteBufAllocator 用來分配可回收的ByteBuf,可以把PooledByteBufAllocator 看做一個池子,需要的時候從裡面獲取ByteBuf,用完了放回去,以此提高效能。
UnpooledByteBufAllocator:不可重複利用,由JVM GC負責回收。
顧名思義Unpooled就是不會放到池子裡,所以根據該分配器分配的ByteBuf,不需要放回池子,由JVM自己GC回收。
這兩個類,都是AbstractByteBufAllocator的子類,AbstractByteBufAllocator實現了一個介面,叫做ByteBufAllocator。
可以做一個對比試驗:
使用UnpooledByteBufAllocator的方式建立ByteBuf的時候,單臺24核CPU的伺服器,16G記憶體,剛啟動時候,10000個長連線,每秒所有的連線發一條訊息,短時間內,可以看到記憶體佔到10G多點,但隨著系統的執行,記憶體不斷增長,直到整個系統記憶體溢位掛掉。
把UnpooledByteBufAllocator換成PooledByteBufAllocator,通過試驗,記憶體使用量機器能維持在一個連線佔用1M左右,記憶體在10G左右,經常長期的執行測試,發現都能維持在這個數量,系統記憶體不會崩潰。
預設的分配器
預設的分配器 ByteBufAllocator.DEFAULT ,可以通過 Java 系統引數(SystemProperty )選項 io.netty.allocator.type 去配置,使用字串值:“unpooled”,“pooled”。
關於這一段,Netty的原始碼擷取如下:
String allocType = SystemPropertyUtil.get("io.netty.allocator.type", "unpooled").toLowerCase(Locale.US).trim();
Object alloc;
if("unpooled".equals(allocType)) {
alloc = UnpooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else if("pooled".equals(allocType)) {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else {
alloc = UnpooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: unpooled (unknown: {})", allocType);
}
不同的Netty版本,原始碼不一樣。
上面的程式碼,是4.0版本的原始碼,預設為UnpooledByteBufAllocator。
而4.1 版本,預設為 PooledByteBufAllocator。因此,4.1版本的程式碼,是和上面的程式碼稍微有些不同的。
設定通道Channel的分配器
在4.x版本中,UnpooledByteBufAllocator是預設的allocator,儘管其存在某些限制。
現在PooledByteBufAllocator已經廣泛使用一段時間,並且我們有了增強的緩衝區洩漏追蹤機制,所以是時候讓PooledByteBufAllocator成為默認了。
ServerBootstrap b = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(port)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(...);
}
});
使用Netty帶來的又一個好處就是記憶體管理。只需一行簡單的配置,就能獲得到記憶體池帶來的好處。在底層,Netty實現了一個Java版的Jemalloc記憶體管理庫,為我們做完了所有“髒活累活”!
緩衝區記憶體的型別
說完了分配器的型別,再來說下緩衝區的型別。
依據記憶體的管理方不同,分為堆快取和直接快取。也就是Heap ByteBuf 和 Direct ByteBuf。另外,為了方便緩衝區進行組合,提供了一種組合快取區。
三種緩衝區的介紹如下:
使用模式 | 描述 | 優點 | 劣勢 |
---|---|---|---|
堆緩衝區 | 資料存儲存在JVM的堆空間中,又稱為支撐陣列,通過 hasArray 來判斷是不是在堆緩衝區中 | 沒使用池化情況下能提供快速的分配和釋放 | 傳送之前都會拷貝到直接緩衝區 |
直接緩衝區 | 儲存在實體記憶體中 | 能獲取超過jvm堆限制大小的空間; 寫入channel比堆緩衝區更快 |
釋放和分配空間昂貴(使用系統的方法) ; 操作時需要複製一次到堆上 |
複合緩衝 | 單個緩衝區合併多個緩衝區表示 | 操作多個更方便 | - |
上面三種緩衝區的型別,無論哪一種,都可以通過池化、非池化的方式,去獲取。
Unpooled 非池化緩衝區的使用方法
Unpooled也是用來建立緩衝區的工具類,Unpooled 的使用也很容易。
看下面程式碼:
//建立複合緩衝區
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
//建立堆緩衝區
ByteBuf heapBuf = Unpooled.buffer(8);
//建立直接緩衝區
ByteBuf directBuf = Unpooled.directBuffer(16);
Unpooled 提供了很多方法,詳細方法大致如下:
方法名稱 | 描述 |
---|---|
buffer() buffer(int initialCapacity) buffer(int initialCapacity, int maxCapacity) |
返回 heap ByteBuf |
directBuffer() directBuffer(int initialCapacity) directBuffer(int initialCapacity, intmaxCapacity) |
返回 direct ByteBuf |
compositeBuffer() | 返回 CompositeByteBuf |
copiedBuffer() | 返回 copied ByteBuf |
Unpooled類的應用場景
Unpooled類讓ByteBuf也同樣適用於不需要其他的Netty元件的、無網路操作的專案,這些專案可以從這個高效能的、可擴充套件的buffer API中獲益。
寫在最後
至此為止,終於完成ByteBuf的分配、釋放和如何避免記憶體洩露介紹。
接下來是:
瘋狂創客圈 Java 死磕系列
- Java (Netty) 聊天程式【 億級流量】實戰 開源專案實戰
- Netty 原始碼、原理、JAVA NIO 原理
- Java 面試題 一網打盡