1. 程式人生 > >7.netty記憶體管理-ByteBuf

7.netty記憶體管理-ByteBuf

  • ByteBuf
    • ByteBuf是什麼
    • ByteBuf重要API
      • read、write、set、skipBytes
      • mark和reset
      • duplicate、slice、copy
      • retain、release
      • ByteBuf擴容
    • ByteBuf種類
  • ByteBufAllocate
    • UnPooledByteBufAllocate
      • newHeapBuffer
      • newDirectBuffer

ByteBuf

ByteBuf是什麼

為了平衡資料傳輸時CPU與各種IO裝置速度的差異性,計算機設計者引入了緩衝區這一重要抽象。jdkNIO庫提供了java.nio.Buffer介面,並且提供了7種預設實現,常見的實現類為ByteBuffer。不過netty並沒有直接使用nio的ByteBuffer,這主要是由於jdk的Buffer有以下幾個缺點:

  1. 當呼叫allocate方法分配記憶體時,Buffer的長度就固定了,不能動態擴充套件和收縮,當寫入資料大於緩衝區的capacity時會發生陣列越界錯誤
  2. Buffer只有一個位置標誌位屬性position,讀寫切換時,必須先呼叫flip或rewind方法。不僅如此,因為flip的切換
  3. Buffer只提供了存取、翻轉、釋放、標誌、比較、批量移動等緩衝區的基本操作,想使用高階的功能(比如池化),就得自己手動進行封裝及維護,使用非常不方便。
    也因此,netty實現了自己的緩衝區——ByteBuf,連名字都如此相似。那麼ByteBuf是如何規避ByteBuffer的缺點的?
    第一點顯然是很好解決的,由於ByteBuf底層也是陣列,那麼它就可以像ArrayList一樣,在寫入操作時進行容量檢查,當容量不足時進行擴容。
    第二點,ByteBuf通過2個索引readerIndex,writerIndex將陣列分為3部分,如下圖所示
+-------------------+------------------+------------------+
| discardable bytes |  readable bytes  |  writable bytes  |
|                   |     (CONTENT)    |                  |
+-------------------+------------------+------------------+
|                   |                  |                  |
0      <=      readerIndex   <=   writerIndex    <=    capacity

初始化時,readerIndex和writerIndex都是0,隨著資料的寫入writerIndex會增加,此時readable byte部分增加,writable bytes減少。當讀取時,discardable bytes增加,readable bytes減少。由於讀操作只修改readerIndex,寫操作只修改writerIndex,讓ByteBuf的使用更加容易理解,避免了由於遺漏flip導致的功能異常。
此外,當呼叫discardReadBytes方法時,可以把discardable bytes這部分的記憶體釋放。總體想法是通過將readerIndex移動到0,writerIndex移動到writerIndex-readerIndex下標,具體移動下標的方式依據ByteBuf實現類有所不同。這個方法可以顯著提高緩衝區的空間複用率,避免無限度的擴容,但會發生位元組陣列的記憶體複製,屬於以時間換空間的做法。

ByteBuf重要API

read、write、set、skipBytes

前3個系列的方法及最後一個skipBytes都屬於改變指標的方法。舉例來說,readByte會移動readerIndex1個下標位,而int是4個byte的大小,所以readInt會移動readerIndex4個下標位,相應的,writeByte會移動writerIndex1個下標位,writeInt會移動writerIndex4個下標位。set系列方法比較特殊,它的引數為index和value,意即將value寫入指定的index位置,但這個操作不會改變readerIndex和writerIndex。skipBytes比較簡單粗暴,直接將readerIndex移動指定長度。

mark和reset

markReaderIndex和markWriterIndex可以將對應的指標做一個標記,當需要重新操作這部分資料時,再使用resetReaderIndex或resetWriterIndex,將對應指標復位到mark的位置。

duplicate、slice、copy

這3種方法都可以複製一份位元組陣列,不同之處在於duplicate和slice兩個方法返回的新ByteBuf和原有的老ByteBuf之間的內容會互相影響,而copy則不會。duplicate和slice的區別在於前者複製整個ByteBuf的位元組陣列,而後者預設僅複製可讀部分,但可以通過slice(index, length)分割指定的區間。

retain、release

這是ByteBuf介面繼承自ReferenceCounted介面的方法,用於引用計數,以便在不使用物件時及時釋放。實現思路是當需要使用一個物件時,計數加1;不再使用時,計數減1。考慮到多執行緒場景,一般也多采用AtomicInteger實現。netty卻另闢蹊徑,選擇了volatile + AtomicIntegerFieldUpdater這樣一種更節省記憶體的方式。

ByteBuf擴容

在ByteBuf寫入資料時會檢查可寫入的容量,若容量不足會進行擴容。

final void ensureWritable0(int minWritableBytes) {
    if (minWritableBytes <= writableBytes()) {
        return;
    }
    int minNewCapacity = writerIndex + minWritableBytes;
    int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity);
    int fastCapacity = writerIndex + maxFastWritableBytes();
    if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) {
        newCapacity = fastCapacity;
    }
    capacity(newCapacity);
}

忽略一些檢驗性質的程式碼後,可以看到擴容時先嚐試將現有寫索引加上需要寫入的容量大小作為最小新容量,並呼叫ByteBufAllocate的calculateNewCapacity方法進行計算。跟入這個方法:

public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
    final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
    if (minNewCapacity == threshold) {
        return threshold;
    }
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }
    return Math.min(newCapacity, maxCapacity);
}

可以看到這個方法的目的則是計算比可寫容量稍大的2的冪次方。minNewCapacity由上一個方法傳入,而maxCapacity則為Integer.MAX_VALUE。具體步驟是首先判斷新容量minNewCapacity是否超過了計算限制CALCULATE_THRESHOLD,預設為4M,如果沒有超過4MB,那麼從64B開始不斷以2的冪次方形式擴容,直到newCapacity超過minNewCapacity。而若一開始新容量就超過了4M,則調整新容量到4M的倍數+1。比如newCapacity為6M,因為6/4 = 1,所以調整為(1+1)*4M=8M。

在計算完容量之後會呼叫capacity方法。這是一個抽象方法,這裡以UnpooledHeapByteBuf為例。

public ByteBuf capacity(int newCapacity) {
    checkNewCapacity(newCapacity);
    byte[] oldArray = array;
    int oldCapacity = oldArray.length;
    if (newCapacity == oldCapacity) {
        return this;
    }
    int bytesToCopy;
    if (newCapacity > oldCapacity) {
        bytesToCopy = oldCapacity;
    } else {
        trimIndicesToCapacity(newCapacity);
        bytesToCopy = newCapacity;
    }
    byte[] newArray = allocateArray(newCapacity);
    System.arraycopy(oldArray, 0, newArray, 0, bytesToCopy);
    setArray(newArray);
    freeArray(oldArray);
    return this;
}

首先檢查newCapacity是否大於0且小於最大容量。之後準備好老陣列要複製的長度。trimIndicesToCapacity(newCapacity)是縮容時呼叫的,它將readerIndex和newCapacity的較小值設定為新的readerIndex,將newCapacity設定為新的writerIndex。
之後便分配一個新陣列,並開始複製舊陣列的元素。複製成功後,將新陣列儲存為成員變數,將老陣列釋放掉。

ByteBuf種類

出於效能和空間的多方考慮,netty從3個維度定義了各種不同的ByteBuf實現類,主要是池化、堆內堆外、可否使用Unsafe類這3個維度,從而演化出8種不同的ByteBuf,它們分別是PooledUnsafeHeapBytebuf、PooledHeapByteBuf、PooledUnsafeDirectByteBuf、PooledDirectBytebuf、UnpooledUnsafeHeapByteBuf、UnpooledHeapByteBuf、UnpooledUnsafeDirectByteBuf、UnpooledDirectByteBuf。
ByteBuf介面之下有一個抽象類AbstractByteBuf,實現了介面定義的read、write、set相關的方法,但在實現時只做了檢查,而具體邏輯則定義一系列以_開頭的proteced方法,留待子類實現。

ByteBufAllocate

不同於一般形式的建立物件,ByteBuf需要通過記憶體分配器ByteBufAllocate分配,對應於不同的ByteBuf也會有不同的BtteBufferAllocate。netty將之抽象為ByteBufAllocate介面。我們看一下有哪些方法:

  1. buffer()、buffer(initialCapacity)、buffer(initialCapacity、maxCapacity),分配ByteBuf的方法,具體分配的Buffer是堆內還是堆外則由實現類決定。2個過載方法分別以給定初始容量、最大容量的方式分配記憶體
  2. ioBuffer()、ioBuffer(initialCapacity)、ioBuffer(initialCapacity、maxCapacity)更傾向於分配堆外記憶體的方法,因為堆外記憶體更適合用於IO操作。過載方法同上
  3. heapBuffer()、heapBuffer(initialCapacity)、heapBuffer(initialCapacity、maxCapacity)分配堆內記憶體的方法。
  4. directBuffer()、directBuffer(initialCapacity)、directBuffer(initialCapacity、maxCapacity)分配堆外記憶體的方法。
  5. compositeBuffer()。可以將多個ByteBuf合併為一個ByteBuf,多個ByteBuf可以部分是堆內記憶體,部分是堆外記憶體。
    ByteBufAllocate介面定義了heap和direct這一個維度,其他維度則交由子類來定義。

UnPooledByteBufAllocate

ByteBufAllocate有一個直接實現類AbstractByteBufAllocate,它實現了大部分方法,只留下2個抽象方法newHeapBuffer和newDirectBuffer交由子類實現。AbstractByteBufAllocate有2個子類PooledByteBufAllocate和UnpooledByteBufAllocate,在這裡定義了pooled池化維度的分配方式。
看看UnpooledByteBufAllocate如何實現2個抽象方法:

newHeapBuffer

protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    return PlatformDependent.hasUnsafe() ?
            new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
            new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

可以看到實現類根據PlatformDependent.hasUnsafe()方法自動判定是否使用unsafe維度,這個方法通過在靜態程式碼塊中嘗試初始化sun.misc.Unsafe來判斷Unsafe類是否在當前平臺可用,在juc中,這個類使用頗多,作為與高併發打交道的netty,出現這個類不令人意外。UnpooledUnsafeHeapByteBuf與UnpooledHeapByteBuf並不是平級關係,事實上前者繼承了後者,在構造方法上也直接呼叫UnpooledHeapByteBuf的構造方法。構造方法比較簡單,初始化byte陣列、初始容量、最大容量,將讀寫指標的設定為0,並將子類傳入的this指標儲存到alloc變數中。
兩種Bytebuf的區別在於unsafe會嘗試通過反射的方式建立byte陣列,並將陣列的地址儲存起來,之後再獲取資料時也會呼叫Unsafe的getByte方法,通過陣列在記憶體中的地址+偏移量的形式直接獲取,而普通的SafeByteBuf則是儲存byte陣列,通過陣列索引即array[index]訪問。

// UnsafeHeapByteBuf初始化陣列
protected byte[] allocateArray(int initialCapacity) {
    return PlatformDependent.allocateUninitializedArray(initialCapacity);
}
// HeapByteBuf初始化陣列
protected byte[] allocateArray(int initialCapacity) {
    return new byte[initialCapacity];
}
// UnsafeHeapByteBuf通過UnsafeByteBufUtil獲取位元組
static byte getByte(byte[] data, int index) {
    return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}
// HeapByteBuf獲取位元組
static byte getByte(byte[] memory, int index) {
    return memory[index];
}

newDirectBuffer

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    return PlatformDependent.hasUnsafe() ?
            new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}

DirectByteBuf構造方法大致與heap的類似,只是儲存資料的容器由位元組陣列變為了jdk的ByteBuffer。相應的,分配與釋放記憶體的方法也變成呼叫jdk的ByteBuffer方法。而UnsafeByteBuf更是直接用long型別記錄記憶體地址。

// DirectByteBuf獲取位元組
protected byte _getByte(int index) {
    return buffer.get(index);
}
// UnsafeDirectByteBuf獲取位元組
protected byte _getByte(int index) {
    return UnsafeByteBufUtil.getByte(addr(index));
}
// 獲取記憶體地址
final long addr(int index) {
    return memoryAddress + index;
}
// UnsafeByteBufUtil獲取位元組
static byte getByte(long address) {
    return UNSAFE.getByte(address);
}

由於PooledByteBufAllocate內容較為龐大,放入下一節講述。
未完待續·