1. 程式人生 > >Netty ByteBuf原理及其原始碼分析

Netty ByteBuf原理及其原始碼分析

 類圖    


   緩衝區介紹

    當我們進行資料傳輸的時候,往往需要緩衝區。java NIO 中自帶的提供的就是java.nio.Buffer

    但是由於java自帶的過於複雜,而且自身也有一定的缺陷(定長,一個標識位position等)。Netty便提供的自己的緩衝ByteBuf

     Nio ByteBuffer 和  Netty ByteBuf 對比

    1.指標問題

public class Test2 {
    public static void main(String[] args) {
        String content = "hello,world";
        ByteBuffer byteBuffer = ByteBuffer.allocate(256);
        byteBuffer.put(content.getBytes());
        byteBuffer.flip();
        byte[] bufferValue = new byte[byteBuffer.remaining()];
        byteBuffer.get(bufferValue);
        System.out.println(new String(bufferValue));
    }
}

示例中就是一種比較常見的NIO操作比較關鍵的程式碼 byteBuffer.flip();它會把limit設定為position的位置。否則讀取到的將會是錯誤的內容。

     ByteBuf通過2個索引來維護緩衝區的讀寫操作讀操作通過readerIndex,寫操作通過writeIndex。

他們的初始值都為0,資料的寫入將導致writeIndex增加,資料的讀取將會導致readerIndex增加。但是它不會操作writeIndex。讀取之後在0和readIndex範圍稱之為discard。呼叫discardReadBytes方法。可以釋放這部分空間。readIndex和writeIndex之間的資料為可讀資料。writeIndex和limit之間的資料為可寫的空間。由於讀寫由不同的指標來維護,這樣就可以避免NIO中顯示的呼叫flip()來切換不同的操作了。

2.定長問題

    操作NIO的時候,當我們對緩衝區put的時候,如果緩衝區空間不夠,將會丟擲異常。為了避免這個問題。Netty在write資料的時候,首先會對資料的長度和可寫空間做個校驗。如果不足,就會建立一個新的ByteBuf,並把之前的複製到新建的這個ByteBuf。最後釋放老的ByteBuf。

下來,我們一起來追蹤下原始碼

buffer.writeInt(1);

首先進入writeInt方法

@Override
public ByteBuf writeInt(int value) {
    ensureWritable(4);
_setInt(writerIndex
, value); writerIndex += 4; return this; }

其中非常關鍵的一行 ensureWritable(4);netty就是通過這個方法達到擴容。我們繼續往下追蹤

public ByteBuf ensureWritable(int minWritableBytes) {
    if (minWritableBytes < 0) {
        throw new IllegalArgumentException(String.format(
                "minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}

    if (minWritableBytes <= writableBytes()) {
        return this;
}

    if (minWritableBytes > maxCapacity - writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}

    // Normalize the current capacity to the power of 2.
int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);
// Adjust to the new capacity.
capacity(newCapacity);
    return this;
}

程式碼中首先判斷寫的長度是否小於0,緊接著判斷。當前快取物件是否有足夠的空間存放當前需要寫入的最大長度。否則就計算下次需要生產的空間的大小。也就是

程式碼中的 caculateNewCapacity()方法。

接著 我們可繼續看看它的計算空間演算法

private int calculateNewCapacity(int minNewCapacity) {
    final int maxCapacity = this.maxCapacity;
    final int threshold = 1048576 * 4; // 4 MiB page
if (minNewCapacity == threshold) {
        return threshold;
}

    // If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
} else {
            newCapacity += threshold;
}
        return newCapacity;
}

    // Not over threshold. Double up to 4 MiB, starting from 64.
int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
}

    return Math.min(newCapacity, maxCapacity);
}

首先判斷當前傳入的大小是否小於64,否則就返回64,如果大於64且小於threadshould 就每次增大2倍。否則就每次新增4m或者當新需要的空間大於最大空間減去4m時,就直接賦值最大的空間

有了新需要的容器大小,就可以準備擴容了。

public ByteBuf capacity(int newCapacity) {
    ensureAccessible();
    if (newCapacity < 0 || newCapacity > maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
}

    int oldCapacity = array.length;
    if (newCapacity > oldCapacity) {
        byte[] newArray = new byte[newCapacity];
System.arraycopy(array, 0, newArray, 0, array.length);
setArray(newArray);
} else if (newCapacity < oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        int readerIndex = readerIndex();
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
}
            System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
            setIndex(newCapacity, newCapacity);
}
        setArray(newArray);
}
    return this;
}

首先對引用和引數的校驗。然後建立新的

byte[] newArray = new byte[newCapacity]
容器。接著賦值,更新索引。最後返回新的容器

到這裡,快取就成功的擴容了。