netty原始碼解解析(4.0)-24 ByteBuf基於記憶體池的記憶體管理
io.netty.buffer.PooledByteBuf<T>使用記憶體池中的一塊記憶體作為自己的資料記憶體,這個塊記憶體是PoolChunk<T>的一部分。PooledByteBuf<T>是一個抽象型別,它有4個派生類:
- PooledHeapByteBuf, PooledUnsafeHeapByteBuf 使用堆記憶體的PooledByteBuffer<byte[]>。
- PooledDirectByteBuf, PooledUnsafeDirectByteBuf 使用直接記憶體的PooledByteBuf<ByteBuffer>。
初始化
PooledByteBuf的初始化過程分為兩個步驟:建立例項;初始化記憶體。這兩個步驟的程式碼如下:
protected PooledByteBuf(Recycler.Handle recyclerHandle, int maxCapacity) { super(maxCapacity); this.recyclerHandle = recyclerHandle; } void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { init0(chunk, handle, offset, length, maxLength, cache); } private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { assert handle >= 0; assert chunk != null; this.chunk = chunk; memory = chunk.memory; allocator = chunk.arena.parent; this.cache = cache; this.handle = handle; this.offset = offset; this.length = length; this.maxLength = maxLength; tmpNioBuf = null; }
建立例項時呼叫的構造方法只是為maxCapacity和recyclerHandler屬性賦值,構造方法是protected,不打算暴露到外面。派生類都提供了newInstance方法建立例項,以PooledHeapByteBuf為例,它的newInstance方法實現如下:
1 private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() { 2 @Override 3 protected PooledHeapByteBuf newObject(Handle handle) { 4 return new PooledHeapByteBuf(handle, 0); 5 } 6 }; 7 8 static PooledHeapByteBuf newInstance(int maxCapacity) { 9 PooledHeapByteBuf buf = RECYCLER.get(); 10 buf.reuse(maxCapacity); 11 return buf; 12 }
這裡的newInstance使用RECYCLER建立例項物件。Recycler<T>是一個輕量級的,支援迴圈使用的物件池。當物件池中沒有可用物件時,會在第4行使用構造方法建立一個新的物件。
init呼叫init0初始化資料記憶體,init0方法為幾個記憶體相關的關鍵屬性賦值:
- chunk: PoolChunk<T>物件,這個PooledByteBuf使用的記憶體就是它的一部分。
- memory: 記憶體物件。更準確地說,PooledByteBuf使用的記憶體是它的一部分。
- allocator: 建立這個PooledByteBuf的PooledByteBufAllocator物件。
- cache: 執行緒專用的記憶體快取。分配記憶體時會優先從這個快取中尋找合適的記憶體塊。
- handle: 記憶體在chunk中node的控制代碼。chunk使用handle可以計算出它對應記憶體的起始位置offset。
- offset: 分配記憶體的起始位置。
- length: 分配記憶體的長度,也是這個PooledByteBuf的capacity。
- maxLength: 這塊記憶體node的最大長度。當呼叫capacity(int newCapacity)方法增加capacity時,只要newCapacity不大於這個值,就不用從新分配記憶體。
記憶體初始化完成之後,這個PooledByteBuf可使用的記憶體範圍是memory記憶體中[offset, offset+length)。idx方法可以把ByteBuf的索引轉換成memory的索引:
1 protected final int idx(int index) { 2 return offset + index; 3 }
重新分配記憶體
和前面講過的ByteBuf實現一樣,PooledByteBuf也需要使用capacity(int newCapacity)改變記憶體大小,也會涉及到把資料從舊記憶體中複製到新記憶體的問題。也就是說,要解決的問題是一樣的,只是具體實現的差異。
1 @Override 2 public final ByteBuf capacity(int newCapacity) { 3 checkNewCapacity(newCapacity); 4 5 // If the request capacity does not require reallocation, just update the length of the memory. 6 if (chunk.unpooled) { 7 if (newCapacity == length) { 8 return this; 9 } 10 } else { 11 if (newCapacity > length) { 12 if (newCapacity <= maxLength) { 13 length = newCapacity; 14 return this; 15 } 16 } else if (newCapacity < length) { 17 if (newCapacity > maxLength >>> 1) { 18 if (maxLength <= 512) { 19 if (newCapacity > maxLength - 16) { 20 length = newCapacity; 21 setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity)); 22 return this; 23 } 24 } else { // > 512 (i.e. >= 1024) 25 length = newCapacity; 26 setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity)); 27 return this; 28 } 29 } 30 } else { 31 return this; 32 } 33 } 34 35 // Reallocation required. 36 chunk.arena.reallocate(this, newCapacity, true); 37 return this; 38 }
這個方法處理兩大類情況: 不重新分配記憶體;重新分配記憶體並複製ByteBuf中的資料和狀態。
不重新分配記憶體:
8行: chunk不需要回收到記憶體池中,且newCapacity沒有變化。
11-32行: chunk需要回收到記憶體池中。
13-14行:記憶體增大,且newcapacity不大於maxLength。把容量修改成newCapacity即可。
20-22行: 記憶體減小, newCapacity 大於maxLength的一半,maxLength<=512, newCapacity >maxLength - 16。 把容量修改成newCapacity, 調整readerIndex, writerIndex。
25-27行: 記憶體減小,newCapacity大於maxLength的一半, maxLength > 512。把容量修改成newCapacity, 調整readerIndex, writerIndex。
31行: 記憶體不變,不做任何操作。
需要重新分配記憶體:
36行: 任何不滿足以上情況的都要重新分配記憶體。這裡使用Arena的reallocate方法重新分配記憶體,並把舊記憶體釋放調,程式碼如下:
1 //io.netty.buffer.PoolArena#reallocate, 2 void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) { 3 if (newCapacity < 0 || newCapacity > buf.maxCapacity()) { 4 throw new IllegalArgumentException("newCapacity: " + newCapacity); 5 } 6 7 int oldCapacity = buf.length; 8 if (oldCapacity == newCapacity) { 9 return; 10 } 11 12 PoolChunk<T> oldChunk = buf.chunk; 13 long oldHandle = buf.handle; 14 T oldMemory = buf.memory; 15 int oldOffset = buf.offset; 16 int oldMaxLength = buf.maxLength; 17 int readerIndex = buf.readerIndex(); 18 int writerIndex = buf.writerIndex(); 19 20 allocate(parent.threadCache(), buf, newCapacity); 21 if (newCapacity > oldCapacity) { 22 memoryCopy( 23 oldMemory, oldOffset, 24 buf.memory, buf.offset, oldCapacity); 25 } else if (newCapacity < oldCapacity) { 26 if (readerIndex < newCapacity) { 27 if (writerIndex > newCapacity) { 28 writerIndex = newCapacity; 29 } 30 memoryCopy( 31 oldMemory, oldOffset + readerIndex, 32 buf.memory, buf.offset + readerIndex, writerIndex - readerIndex); 33 } else { 34 readerIndex = writerIndex = newCapacity; 35 } 36 } 37 38 buf.setIndex(readerIndex, writerIndex); 39 40 if (freeOldMemory) { 41 free(oldChunk, oldHandle, oldMaxLength, buf.cache); 42 } 43 }
7-9行: 記憶體大小沒變化,返回。
12-18行: 記錄下舊記憶體的資訊,readerIndex, writerIndex。
20行: 為PooledByteBuf分配新記憶體。
21-38行: 把舊記憶體中資料複製到新記憶體,並把readerIndex,writerIndex設定在正確。
41行: 釋放就記憶體。
釋放記憶體
記憶體釋放程式碼在deallocate中:
1 @Override 2 protected final void deallocate() { 3 if (handle >= 0) { 4 final long handle = this.handle; 5 this.handle = -1; 6 memory = null; 7 tmpNioBuf = null; 8 chunk.arena.free(chunk, handle, maxLength, cache); 9 chunk = null; 10 recycle(); 11 } 12 }
關鍵是第8行程式碼,使用PoolArena的free方法釋放記憶體。然後是recycle把當前PooledByteBuf物件放到RECYCLER中迴圈使用。
PooledByteBufAllocator建立記憶體管理模組
在前面分析PooledByteBuf記憶體初始化,重新分配及釋放時,看到了記憶體管理的三個核心模組: PoolArena(chunk.arena), PoolChunk(chunk), PoolThreadCache(cache)。PooledByteBuf的記憶體管理能力都是使用這三模組實現的,它本身沒有實現記憶體管理演算法。當需要為PooledByteBuf分配一塊記憶體時,先從一個執行緒專用的PoolThreadCache中得到一個PoolArena, 使用PoolArena的allocate找到一個滿足要求記憶體塊PoolChunk, 從這個記憶體塊中分配一塊連續的記憶體handle,計算出這塊記憶體起始位置的偏移量offset, 最後呼叫PooledByteBuf的init方法初始化記憶體完成記憶體分配。 釋放記憶體呼叫PoolArena的free方法。在記憶體分配時,會優先從PoolThreadCache中尋找合適的記憶體塊。在記憶體釋放時會把記憶體塊暫時放在PoolThreadCache中,等使用頻率過低時才會還給PoolChunk。這三個模組中PoolArena, PoolThreadCache由PooledByteBufAllocator建立,PoolChunk由PoolArean維護。
PooledByteBufAllocator維護了相關的幾個屬性:
PoolArena<byte[]>[] heapArenas
PoolArena<ByteBuffer>[] directArenas
PoolThreadLocalCache threadCache
headArenas和directArenas分別維護了多個PoolArena, 他們分別用來分配堆記憶體和直接記憶體。 如果使用得當,可以讓每個執行緒持有一個專用的PoolArena, 避免執行緒間資料同步的開銷。PoolThreadLocalCache會為每個執行緒建立一個專用的PoolThreadCache例項,這個例項分別持有一個heapArena和directArena。
接下來的的幾個章節會詳細分析PoolArena和PoolThreadCache的實現程式碼。
&n