Netty ByteBuf原理及其原始碼分析
類圖
緩衝區介紹
當我們進行資料傳輸的時候,往往需要緩衝區。java NIO 中自帶的提供的就是java.nio.Buffer
但是由於java自帶的過於複雜,而且自身也有一定的缺陷(定長,一個標識位position等)。Netty便提供的自己的緩衝ByteBuf
Nio ByteBuffer 和 Netty ByteBuf 對比
1.指標問題
public class Test2 { public static void main(String[] args) { String content = "hello,world"; ByteBuffer byteBuffer = ByteBuffer.allocate(256); byteBuffer.put(content.getBytes()); byteBuffer.flip(); byte[] bufferValue = new byte[byteBuffer.remaining()]; byteBuffer.get(bufferValue); System.out.println(new String(bufferValue)); } }
示例中就是一種比較常見的NIO操作,比較關鍵的程式碼 byteBuffer.flip();它會把limit設定為position的位置。否則讀取到的將會是錯誤的內容。
ByteBuf通過2個索引來維護緩衝區的讀寫操作。讀操作通過readerIndex,寫操作通過writeIndex。
他們的初始值都為0,資料的寫入將導致writeIndex增加,資料的讀取將會導致readerIndex增加。但是它不會操作writeIndex。讀取之後在0和readIndex範圍稱之為discard。呼叫discardReadBytes方法。可以釋放這部分空間。readIndex和writeIndex之間的資料為可讀資料。writeIndex和limit之間的資料為可寫的空間。由於讀寫由不同的指標來維護,這樣就可以避免NIO中顯示的呼叫flip()來切換不同的操作了。
2.定長問題
操作NIO的時候,當我們對緩衝區put的時候,如果緩衝區空間不夠,將會丟擲異常。為了避免這個問題。Netty在write資料的時候,首先會對資料的長度和可寫空間做個校驗。如果不足,就會建立一個新的ByteBuf,並把之前的複製到新建的這個ByteBuf。最後釋放老的ByteBuf。
下來,我們一起來追蹤下原始碼
buffer.writeInt(1);
首先進入writeInt方法
@Override public ByteBuf writeInt(int value) { ensureWritable(4); _setInt(writerIndex, value); writerIndex += 4; return this; }
其中非常關鍵的一行 ensureWritable(4);netty就是通過這個方法達到擴容。我們繼續往下追蹤
public ByteBuf ensureWritable(int minWritableBytes) { if (minWritableBytes < 0) { throw new IllegalArgumentException(String.format( "minWritableBytes: %d (expected: >= 0)", minWritableBytes)); } if (minWritableBytes <= writableBytes()) { return this; } if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); } // Normalize the current capacity to the power of 2. int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes); // Adjust to the new capacity. capacity(newCapacity); return this; }
程式碼中首先判斷寫的長度是否小於0,緊接著判斷。當前快取物件是否有足夠的空間存放當前需要寫入的最大長度。否則就計算下次需要生產的空間的大小。也就是
程式碼中的 caculateNewCapacity()方法。
接著 我們可繼續看看它的計算空間演算法
private int calculateNewCapacity(int minNewCapacity) { final int maxCapacity = this.maxCapacity; final int threshold = 1048576 * 4; // 4 MiB page if (minNewCapacity == threshold) { return threshold; } // If over threshold, do not double but just increase by threshold. if (minNewCapacity > threshold) { int newCapacity = minNewCapacity / threshold * threshold; if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { newCapacity += threshold; } return newCapacity; } // Not over threshold. Double up to 4 MiB, starting from 64. int newCapacity = 64; while (newCapacity < minNewCapacity) { newCapacity <<= 1; } return Math.min(newCapacity, maxCapacity); }
首先判斷當前傳入的大小是否小於64,否則就返回64,如果大於64且小於threadshould 就每次增大2倍。否則就每次新增4m或者當新需要的空間大於最大空間減去4m時,就直接賦值最大的空間
有了新需要的容器大小,就可以準備擴容了。
public ByteBuf capacity(int newCapacity) { ensureAccessible(); if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); } int oldCapacity = array.length; if (newCapacity > oldCapacity) { byte[] newArray = new byte[newCapacity]; System.arraycopy(array, 0, newArray, 0, array.length); setArray(newArray); } else if (newCapacity < oldCapacity) { byte[] newArray = new byte[newCapacity]; int readerIndex = readerIndex(); if (readerIndex < newCapacity) { int writerIndex = writerIndex(); if (writerIndex > newCapacity) { writerIndex(writerIndex = newCapacity); } System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex); } else { setIndex(newCapacity, newCapacity); } setArray(newArray); } return this; }
首先對引用和引數的校驗。然後建立新的
byte[] newArray = new byte[newCapacity]容器。接著賦值,更新索引。最後返回新的容器
到這裡,快取就成功的擴容了。