1. 程式人生 > 其它 >Netty 框架學習 —— ByteBuf

Netty 框架學習 —— ByteBuf


概述

網路資料的基本單位總是位元組,Java NIO 提供了 ByteBuffer 作為它的位元組容器,但這個類的使用過於複雜。Netty 的 ByteBuf 具有卓越的功能性和靈活性,可以作為 ByteBuffer 的替代品

Netty 的資料處理 API 通過兩個元件暴露 —— abstract class ByteBuf 和 interface ByteBufHolder,下面是 ByteBuf API 的優點:

  • 可以被使用者自定義的緩衝區型別擴充套件
  • 通過內建的複合緩衝區型別實現透明的零拷貝
  • 容量可以按需增長
  • 在讀和寫這兩種模式之間切換不需要呼叫 ByteBuffer 的 flip() 方法
  • 在讀和寫使用了不同的索引
  • 支援方法的鏈式呼叫
  • 支援引用計數
  • 支援池化

ByteBuf

1. 工作原理

ByteBuf 維護了兩個不同的索引:一個用於讀取,一個用於寫入,當你從 ByteBuf 讀取時,readIndex 會遞增已經被讀取的位元組數。同樣的,當你寫入 ByteBuf 時,它的 writeIndex 也會遞增。readIndex 和 writeIndex 的起始位置都為 0

如果 readIndex 和 writeIndex 的值相等,也即此時已經到了可讀取資料的末尾,就如同達到陣列末尾一樣,試圖讀取超出該點的資料將觸發一個 IndexOutOfBoundsException

名稱以 read 或 write 開頭的 ByteBuf 方法,將會推進其對應的索引,而名稱以 set 或 get 開頭的操作則不會

2. ByteBuf 的使用模式

2.1 堆緩衝區

最常用的 ByteBuf 模式是將資料儲存在 JVM 的堆空間中,這種模式被稱為支撐陣列(backing array)它能在沒有使用池化的情況下提供快速的分配和釋放,適合於有遺留的資料需要處理的情況

ByteBuf heapBuf = ...;
// 檢查 ByteBuf 是否有一個支撐陣列
if(heapBuf.hasArray()) {
    // 獲取對該陣列的引用
    byte[] array = heapBuf.array();
    // 計算第一個位元組的偏移量
    int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
    // 獲得可讀位元組數
    int length = heapBuf.readableBytes();
    // 使用陣列、偏移量和長度作為引數呼叫你的方法
    handleArray(array, offset, length);
}
2.2 直接緩衝區

直接緩衝區使用本地記憶體儲存資料,更適合用於網路傳輸,但相對於堆緩衝區,其分配和釋放都較為昂貴。另外,如果你正在處理遺留程式碼,處理直接緩衝區內容時,你必須將其內容進行一次複製

ByteBuf directBuf = ...;
// 不是支撐陣列就是直接緩衝區
if(!directBuf.hasArray()) {
    // 獲取可讀位元組數
    int length = directBuf.readableBytes();
    // 分配一個新的陣列來儲存具有該長度的位元組陣列
    byte[] array = new byte[length];
    // 將位元組複製到該陣列
    directBuf.getBytes(directBuf.readerIndex(), array);
    // 使用陣列、偏移量和長度作為引數呼叫你的方法
    handleArray(array, 0, length);
}
2.3 複合緩衝區

複合緩衝區為多個 ByteBuf 提供了一個聚合檢視,可以根據需要新增或刪除 ByteBuf 例項。Netty 通過一個 ByteBuf 子類 —— CompositeByteBuf 實現這個模式,它提供了一個將多個緩衝區表示為單個合併緩衝區的虛擬表示

CompositeByteBuf 中的 ByteBuf 例項可能同時包含直接記憶體和非直接記憶體分配,如果其中只有一個例項,那麼對 CompositeByteBuf 上的 hasArray() 方法的呼叫將返回該陣列上的 hasArray() 方法的值,否則返回 false

CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = ...;
ByteBuf bodyBuf = ...;
// 將 ByteBuf 例項追加到 CompositeByteBuf
messageBuf.addComponents(headerBuf, bodyBuf);
...
// 刪除第位於索引位置為 0 的 ByteBuf
messageBuf.removeComponent(0);
// 迴圈遍歷所有的 ByteBuf 例項
for(ByteBuf buf : messageBuf) {
    System.out.println(buf.toString());
}

位元組級操作

1. 隨機訪問索引

如同普通的 Java 位元組陣列一樣,ByteBuf 的索引是從零開始的:第一個位元組的索引是 0,最後一個位元組的索引總是 capacity() - 1

ByteBuf buffer = ...;
for(int i = 0; i < buffer.capacity(); i++) {
    byte b = buffer.getByte(i);
    System.out.println((char) b)
}

這種需要一個索引值引數的方法訪問資料不會改變 readerIndex 也不會改變 writerIndex。如果需要改變,也可以通過呼叫 readerIndex(index) 或者 writerIndex(index) 來手動移動這兩者

2. 順序訪問索引

雖然 ByteBuf 同時具有讀索引和寫索引,但是 JDK 的 ByteBuf 卻只有一個索引,這也就是為什麼必須呼叫 flip() 方法來在讀模式和寫模式之間進行切換的原因

3. 可丟棄位元組

可丟棄位元組的分段包含了已經被讀過的位元組,通過呼叫 discardReadBytes() 方法,可以丟棄它們並回收空間。這個分段的初始大小為 0,儲存在 readerIndex 中,會隨著 read 操作的執行而增加

可能你會想到頻繁呼叫 discardReadBytes() 方法以確保可寫分段的最大化,但這極有可能會導致記憶體複製,因為可讀欄位必須被移動到緩衝區的開始位置

4. 可讀位元組

ByteBuf 的可讀位元組分段儲存了實際資料,新分配的、包裝的或者複製的緩衝區的預設的 readerIndex 值為 0。任何名稱以 read 或者 skip 開頭的操作都將檢索或者跳過位於當前 readerIndex 的資料,並且將它增加已讀位元組數

如果嘗試在緩衝區的可讀位元組數已經耗盡時從中讀取資料,那麼將會引發一個 IndexOutOfBoundsException

ByteBuf buffer = ...;
while(buffer.isReadable()) {
    System.out.println(buffer.readByte());
}

5. 可寫位元組

可寫位元組分段是指一個擁有未定義內容、寫入就緒的記憶體區域。新分配的緩衝區的 writerIndex 的預設值為 0.任何名稱以 write 開頭的操作都將從當前的 writerIndex 處開始寫資料,並將它增加已經寫入的位元組數。如果寫操作的目標是 ByteBuf,並且沒有指定源索引的值,則緩衝區的 readerIndex 也同樣會被增加相同的大小

writeBytes(ByteBuf dest)

如果嘗試往目標寫入超過目標容量的資料,將會引發一個 IndexOutOfBoundException

ByteBuf buffer = ...;
while(buffer.writableBytes() >= 4) {
    buffer.writeInt(random.nextInt());
}

6. 索引管理

JDK 的 InputStream 定義了 mark(int readlimit) 和 reset() 方法,這些方法分別被用來將流中的當前位置標記為指定的值,以及將流重置到該位置

同樣,可以通過 markReaderIndex()、markWriterIndex()、resetWriterIndex() 和 resetReaderIndex() 來標記和重置 ByteBuf 的 readerIndex 和 writerIndex

也可以通過 readerIndex(int) 或者 writerIndex(int) 來將索引移動到指定位置。任何試圖將索引設定到無效位置都將導致 IndexOutOfBoundsException

可以通過呼叫 clear() 方法來將 readerIndex 和 writerIndex 都設定為 0,這樣並不會清除記憶體中的內容。呼叫 clear() 比呼叫 discardReadBytes() 輕量得多,因為它只是重置索引

7. 查詢操作

在 ByteBuf 中有多種可以用來確定指定值的索引的方法,最簡單的是 indexOf() 方法。較為複雜的查詢可以通過那些需要一個 ByteBufProcessor 作為引數的方法達成,這個介面只定義了一個方法

boolean process(byte value);

它將檢查輸入值是否是正在查詢的值,ByteBufProcessor 針對一些常見的值定義了許多便利方法

ByteBuf buffer = ...;
// 查找回車符 \r
int index = buffer.forEachByte(ByteBufProcessor.FIND_CR);

8. 派生緩衝區

派生緩衝區為 ByteBuf 提供了以專門的方式來呈現其內容的檢視,這些檢視通過以下方法被建立

  • duplicate()
  • slice()
  • slice(int, int)
  • Unpooled.unmodifiableBuffer(...)
  • order(ByteOrder)
  • readSlice(int)

這些方法都將返回一個新的 ByteBuf 例項,其內部儲存和 JDK 的 ByteBuffer 共享,這也意味著,如果你修改了它的內容,也即同時修改了其對應的源例項。如果需要一個現有緩衝區的真實副本,請使用 copy() 或 copy(int, int) 方法

// 對 ByteBuf 進行切片
Charset utf8 = Charset.forName(StandardCharsets.UTF_8);
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", urf8);
// 建立該 ByteBuf 從索引 0 到 15 結束的一個新切片
ByteBuf sliced = buf.slice(0, 15);
// 更新索引 0 處的位元組
buf.setByte(0, (byte) 'J');
// 成功,因為資料是共享的
assert buf.getByte(0) == sliced.getByte(0);
// 對 ByteBuf 進行切片
Charset utf8 = Charset.forName(StandardCharsets.UTF_8);
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", urf8);
// 建立該 ByteBuf 從索引 0 到 15 結束的一個新副本
ByteBuf sliced = buf.copy(0, 15);
// 更新索引 0 處的位元組
buf.setByte(0, (byte) 'J');
// 成功,因為資料不是共享的
assert buf.getByte(0) != sliced.getByte(0);

9. 讀/寫操作

有兩種類別的讀/寫操作:

  • get() / set() 操作,從給定的索引開始,並且索引不會改變
  • read() / write() 操作,從給定的索引開始,並且會根據已經訪問過的位元組數對索引進行調整
方法 描述
setBoolean (int , boolean) 設定給定索引處的 Boolean 值
getBoolean(int) 返回給定索引處的 Boolean 值
setByte(int index, int value) 設定給定索引處的位元組值
getByte(int) 返回給定索引處的位元組
getUnsignedByte(int ) 將給定索引處的無符號位元組值作為 short 返回
setMedium(int index , int value) 設定給定索引處的 24 位的中等 int值
getMedium(int) 返回給定索引處的 24 位的中等 int 值
getUnsignedMedium (int) 返回給定索引處的無符號的 24 位的中等 int 值
setint(int index , int value) 設定給定索引處的 int 值
getint (int) 返回給定索引處的 int 值
getUnsignedint(int) 將給定索引處的無符號 int 值作為 long 返回
setLong(int index, long value) 設定給定索引處的 long 值
getLong(int) 返回給定索引處的 long 值
setShort(int index, int value) 設定給定索引處的 short 值
getShort(int) 返回給定索引處的 short 值
getUnsignedShort(int) 將給定索引處的無符號 short 值作為 int 返回
getBytes (int, …) 將該緩衝區中從給定索引開始的資料傳送到指定的目的地

read/write 操作的 API 和 set/get 大同小異,只不過會增加索引值

ByteBuf 還提供了其他有用的操作

方法 描述
isReadable () 如果至少有一個位元組可供讀取,則返回 true
isWritable () 如果至少有一個位元組可被寫入,則返回 true
readableBytes() 返回可被讀取的位元組數
writableBytes() 返回可被寫入的位元組數
capacity() 返回 ByteBuf 可容納的位元組數 。在此之後,它會嘗試再次擴充套件直到達到maxCapacity ()
maxCapacity() 返問 ByteBuf 可以容納的最大位元組數
hasArray() 如果 ByteBuf 由一個位元組陣列支撐,則返回 true
array () 如果 ByteBuf 由一個位元組陣列支撐則返問該陣列;否則,它將丟擲 一個 UnsupportedOperat工onException 異常

ByteBuf 分配

1. 按需分配

為了降低分配和釋放記憶體的開銷,Netty 通過 interface ByteBufAllocator 實現了 ByteBuf 的池化,用於分配 ByteBuf 例項

下面是 ByteBufAllocator 的一些 API

方法 描述
buffer()buffer(int initialCapacity);buffer(int initialCapacity, int maxCapacity); 返回一個基於堆或者直接記憶體儲存的 ByteBuf
heapBuffer ()heapBuffer(int initialCapacity)heapBuffer(int initialCapacity, int maxCapacity) 返回一個基於堆記憶體儲存的 ByteBuf
directBuffer()directBuffer(int initialCapacity)directBuffer(int initialCapacity , int maxCapacity) 返回一個基於直接記憶體儲存的 ByteBuf
compositeBuffer()compositeBuffer(int maxNumComponents) compositeDirectBuffer()compositeDirectBuffer (int maxNumComponents); compositeHeapBuffer()compositeHeapBuffer(int maxNumComponents); 返回一個可以通過新增最大到指定數目的基於堆的或者直接記憶體儲存的緩衝區來擴充套件的 CompositeByteBuf
ioBuffer() 返回一個用於套接字的 I/O 操作的 ByteBuf。預設地, 當所執行的環境具有 sun.misc.Unsafe支援時,返回基於直接記憶體儲存的 ByteBuf,否則返回基於堆記憶體儲存的 ByteBuf;當指定使用 PreferHeapByteBufAllocator 時,則只會返回基於堆記憶體儲存的 ByteBuf

可以通過 Channel 或者繫結到 ChannelHandler 的 ChannelHandlerContext 獲取一個 ByteBufAllocator 的引用

Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
...
ChannelHandlerContext ctx = ...;
ByteBufAllocator allocator = ctx.alloc();

Netty 提供了兩種 ByteBufAllocator 的實現:PooledByteBufAllocator 和 UnpooledByteBufAllocator ,前者池化了 ByteBuf 例項以提供效能,最大限度減少記憶體碎片。後者不池化 ByteBuf 例項,每次呼叫都會返回一個新的例項

2. Unpooled 緩衝區

如果你未能獲取 ByteBufAllocator 例項,Netty 也提供了名為 Unpooled 的工具類,它提供了靜態的輔助方法來建立未池化的 ByteBuf 例項

方法 描述
buffer()buffer(int 工nitialCapacity)buffer(int initialCapacity, int maxCapacity) 返回一個未池化的基於堆記憶體儲存的ByteBuf
directBuffer()directBuffer(int initialCapacity)directBuffer(int initialCapacity, int maxCapacity) 返回一個未池化的基於直接記憶體儲存ByteBuf
wrappedBuffer() 返回一個包裝了給定資料的ByteBuf
copiedBuffer() 返回一個複製了給定資料的 ByteBuf

3. ByteBufUtil 類

ByteBufUtil 提供了用於操作 ByteBuf 的靜態的輔助方法。因為這個 API 是通用的,並且和池化無關,所以這些方法已然在分配類的外部實現

這些靜態方法中最有價值的可能就是 hexdump() 方法,它以十六進位制的表示形式列印 ByteBuf 的內容。 這在各種情況下都很有用,例如,出於除錯 的目的記錄 ByteBuf 的內容。十六進位制的表示通常會提供一個比位元組值的直接表示形式更加有用的日誌條目,此外,十六進位制的版本還可以很容易地轉換回實際的位元組表示

另一個有用的方法是 boolean equals(ByteBuf , ByteBuf),它被用來判斷兩個 ByteBuf 例項的相等性。 如果你實現自己的 ByteBuf 子類,你可能會發現 ByteBufUtil 的其他有用方法