netty原始碼解解析(4.0)-25 ByteBuf記憶體池:PoolArena-PoolChunk
PoolArena實現了用於高效分配和釋放記憶體,並儘可能減少記憶體碎片的記憶體池,這個記憶體管理實現使用PageRun/PoolSubpage演算法。分析程式碼之前,先熟悉一些重要的概念:
- page: 頁,一個頁是可分配的最小的記憶體塊單元,頁的大小:pageSize = 1 << n (n <= 12)。
- chunk: 塊,塊是多個頁的集合。chunkSize是塊中所有page的pageSize之和。
- Tiny: <512B的記憶體塊。
- Small: >=512B, <pageSize的記憶體塊。
- Normal: >=pageSize, <=chunkSize的記憶體塊。
- Huge: >chunkSize的記憶體塊。
PoolArena維護了一個PoolChunkList組成的雙向連結串列,每個PoolChunkList內部維護了一個PoolChunk雙向連結串列。分配記憶體時,PoolArena通過在PoolChunkList找到一個合適的PoolChunk,然後從PoolChunk中分配一塊記憶體。
關鍵屬性
pageSize: page的大小。必須滿足 pageSize = 1 << n (n>=12)。
maxOrder: 完全平衡二叉樹的高度。
chunkSize: chunk的大小。chunkSize = pageSize * (1 << maxOrder)。
memory: chunk的記憶體,大小必須>=chunkSize。
offset: chunk記憶體在memory中的起始位置。 memory大小必須>=offet+chunkSize。
page管理
chunk以完全平衡二叉樹的資料結構管理page, 這顆樹的節點以堆的方式儲存在陣列中, 如果這棵樹的高度maxOrder=4, 它的結構如下圖所示:
圖-1
節點名字格式是d-i, d是節點在樹中的深度,i是節點在陣列中的索引。
它有如下一些性質:
- 任意一個節點i, i的取值範圍是: [1, 1 << (maxOrder + 1) )。i == 1節點是根節點。
- 如果節點i在區間[1 << maxOrder,1 << (maxOrder +1) ), 那麼這些節點都是葉節點。
- 除葉節點以外的節點i, i << 1是它的左子節點,(i << 1) + 1 是它的右子節點。除根節點以外的節點i, i >> 1是它的父節點, i ^ 1是它的另外一個兄弟節點。
- 對於一個節點i, 它樹中的深度d = log2(i) (d是整數)。 d相同的節點位於樹中的同一層上,他們包含相同的頁節點數,有相同的最大可分配記憶體。
- 任意節點i, 深度為d, 如果把同一層的節點放在一個單獨的陣列中,那麼節點i在這個資料組中的偏移量doffset=i ^ (1 << d)。
- 任意節點i, 深度為d, 它包含的頁節點的數量是1 << (maxOrder - d), 記憶體大小是(1 << (maxOrder - d)) * pageSize。
- 已知深度d, [1 << d, 1 << (d + 1) )區間內的所有節點的深度都是d。
- 任意節點i, 深度d,在memory中的起始位置偏移量是offset + (1 ^ (1 << d) * (1 << (maxOrder - d)) * pageSize。
請記住這些性質。PoolChunk的程式碼很簡潔,可是如果不熟悉這些性質,這些簡潔的程式碼也會難以理解。
完全平衡二叉樹的初始化
1 PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) { 2 unpooled = false; 3 this.arena = arena; 4 this.memory = memory; 5 this.pageSize = pageSize; 6 this.pageShifts = pageShifts; 7 this.maxOrder = maxOrder; 8 this.chunkSize = chunkSize; 9 this.offset = offset; 10 unusable = (byte) (maxOrder + 1); 11 log2ChunkSize = log2(chunkSize); 12 subpageOverflowMask = ~(pageSize - 1); 13 freeBytes = chunkSize; 14 15 assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder; 16 maxSubpageAllocs = 1 << maxOrder; 17 18 // Generate the memory map. 19 memoryMap = new byte[maxSubpageAllocs << 1]; 20 depthMap = new byte[memoryMap.length]; 21 int memoryMapIndex = 1; 22 for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time 23 int depth = 1 << d; 24 for (int p = 0; p < depth; ++ p) { 25 // in each level traverse left to right and set value to the depth of subtree 26 memoryMap[memoryMapIndex] = (byte) d; 27 depthMap[memoryMapIndex] = (byte) d; 28 memoryMapIndex ++; 29 } 30 } 31 32 subpages = newSubpageArray(maxSubpageAllocs); 33 }
在構造方法中,19-30行初始化了兩棵完全一樣的完全平衡二叉樹(形如圖-1): memoryMap, depthMap。這兩個map都是以陣列的方式儲存二叉樹,陣列的長度都是maxSubpageAllocs << 1, 由於maxSubpageAllocs = 1 << maxOrder, 因此長度還可以表示為 1 << (maxOrder + 1)。 map陣列的0項保留,[1, 1 << maxOrder)區間中的每個項是二叉樹的一個節點,每個項的值是節點在樹中的深度。
depthMap用來記錄每個節點在樹中的深度,初始化之後,值不會發生變化。已知一個節點在陣列中的索引id, 可以使用這個id查詢節點在樹中的深度: depthMap[id]。
memoryMap用來記錄樹中節點被分配出去的情況,每個項的值會隨著節點分配情況變化而變化。已知一個節點在陣列中的索引id,memoryMap[id]的值會有三中情況:
- memoryMap[id] == depth[id]: 所有子節點都沒被分配出去。
- memoryMap[id] > depth[id]: 至少有一個子節點被分配出去了, 還有可以分配的子節點。
- memoryMap[id] == maxOrder + 1: 這個節點以及完全被分配出去了,沒有可分配的子節點了。
從二叉樹中分配一個記憶體大小合適的節點
1 long allocate(int normCapacity) { 2 if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize 3 return allocateRun(normCapacity); 4 } else { 5 return allocateSubpage(normCapacity); 6 } 7 }
這個方法是分配記憶體節點的入口方法,引數normCapacity必須滿足normCapacity = 1 << n。第2行判斷normCapacity和pageSize的大小關係,在前面的構造方法中,subpageOverflowMask = ~(pageSize - 1), 如果pageSize=2048, subpageOverflowMask的0-11位是0, 12-31位是1,它的二進位制值是: 1111111111111111111100000000000, (normCapacity & subpageOverflowMask) != 0表示,normCapacity的12-31位中至少有一位是1,此時它>=pageSize, 反之比pageSize小。
如果normCapacity >= pageSize, 呼叫allocateRun分配一個深度d < maxOrder的節點。
如果normaCapacity < pageSize, 呼叫allocateSubpage分配一個d == maxOrder的葉葉節點, 即一個page。
PoolChunk分配記憶體的最小單元是一個page,不能分配比一個page更小的記憶體了。
1 private long allocateRun(int normCapacity) { 2 int d = maxOrder - (log2(normCapacity) - pageShifts); 3 int id = allocateNode(d); 4 if (id < 0) { 5 return id; 6 } 7 freeBytes -= runLength(id); 8 return id; 9 }
第2行,計算normCapacity大小的記憶體在二叉樹的最大深度d, 只有深度<=d的節點才有可以分配到>=normCapacity的記憶體。normCapacity可以表示為normCapacity = 2k, log2(normCapacity)就是已知normCapacity求解k。pageShifts可表示為pageSize = 2pageShifts, pageShifts = log2(pageSize)。 normCapacity在二叉樹上的反向深度 rd = log2(mormCapacity) - pageShifts, 這個表示式比較難以理解,這樣會更加直觀一些:
pageCount = normCapacity >> log2(pageSize)
rd = log2(pageCount)
pageCount是normCapacity需要的page數量。 反向深度的含義是,d=0對應二叉樹的最大深度maxOrder, d=1對應maxOrder -1,依次類推。因此maxOrder - rd會得到最大深度d,d <= maxOrder。
第3行,如果能夠根據d找到一個合適的節點,就會把這個節點記錄為已經使用的狀態,然後返回這個節點的索引id, id的取值區間是[0, 1 << maxOrder)。
第7行,重新計算剩餘記憶體數。
rungLength方法用於計算節點id的記憶體長度:
private int runLength(int id) { // represents the size in #bytes supported by node 'id' in the tree return 1 << log2ChunkSize - depth(id); }
log2ChunkSize=log2(chunkSize)在構造方法中初始化。 有性質(6)可以得到節點id的長度 length = (1 << maxOrder - depth(id)) * pageSize,它和程式碼中表達式是等價的,推導過程如下:
已知:
log2ChunkSize = log2(chunkSize)
chunkSize = (1 << maxOrder) * pageSize
pageSize = 2k = 1 << k
=> chunkSize = (1 << maxOrder) * 2k
= 2maxOrder * 2k
= 2maxOrder + k
=> log2ChunkSize = log2(chunkSize)
= log2(2maxOrder + k)
= maxOrder + k
=> log2ChunkSize - depth(id) = maxOrder + k - depth(id)
=> 1 << log2ChunkSize - depth(id) = 1 << maxOrder + k - depth(id)
= (1 << maxOrder - depth(id)) * (1 << k)
= (1 << maxOrder - depth(id)) * pageSize
如果需要的記憶體>=pageSize, 就會呼叫allocateNode方法,這個方法的作用是從二叉樹中分配一個節點,返回值id是這個節點的索引。
1 private int allocateNode(int d) { 2 int id = 1; 3 int initial = - (1 << d); // has last d bits = 0 and rest all = 1 4 byte val = value(id); 5 if (val > d) { // unusable 6 return -1; 7 } 8 while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0 9 id <<= 1; 10 val = value(id); 11 if (val > d) { 12 id ^= 1; 13 val = value(id); 14 } 15 } 16 byte value = value(id); 17 assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d", 18 value, id & initial, d); 19 setValue(id, unusable); // mark as unusable 20 updateParentsAlloc(id); 21 return id; 22 }
allocateNode方法的功能是從memoryMap樹中深度[1, d]的節點中找出一個沒有被分配出去的節點,然後把這個節點記錄為已分配的狀態。尋找順序是自上而下,從左到到右。
第2行,從第一個節點開始,這個節點是二叉樹的根節點。
第3行,計算一個32位initial,它的[0, d)位都是0,[d, 31]位都是1。
第4-6行,檢查是否可以分配一個深度<=d節點, 如果不能分配記憶體失敗,返回-1。 val == maxOrder + 1時表示這個節點的記憶體已經被分配完了,val在[0, maxOrder]區間內時,表示可以分配一個深度在[val, maxOder]區間內的節點。所以在第5行檢查到val>d時表示不能分配到記憶體了。
8-15行,能夠執行到第8行,說明在這個chunk中,二叉樹中一定至少有一個節點滿足深度等於d, 且沒有任何子節點被分配出去的節點。迴圈,滿足 val < d或(id & initial) == 0會增加一個深度繼續尋找。也就是說如果滿足val == d 且 (id & initial) == 1時,表示找到了符合調條件的節點了。第9行,增加一個深度。 第10,11行檢查左節點。 12,13行檢查右節點。
19行, 把選中的節點id, 設定成unusable(maxOrder+1)狀態。
20行,更新所有父節點的值。
這個方法展示了已知memoryMap中索引為id的值val = memoryMap[id], 找到一個深度為d的空閒節點的演算法。前面已經講過val值的三種情況,其中第2中情況的時候,表示只有節點id下面只能找到深度>=val的空閒節點,索引d<val情況下,無法找到滿足深度等於d的空閒節點。影響memoryMapy[id]值的演算法在updateParentsAlloc中實現:
1 private void updateParentsAlloc(int id) { 2 while (id > 1) { 3 int parentId = id >>> 1; 4 byte val1 = value(id); 5 byte val2 = value(id ^ 1); 6 byte val = val1 < val2 ? val1 : val2; 7 setValue(parentId, val); 8 id = parentId; 9 } 10 }
3行,得到id的父節點。
4-6行,取memoryMap中,取節點id和它的兄弟節點的值中交小的一個,如果相等的話就隨意取一個。
7行,把上一步中的取值設定到父節點上。
8,2行,深度減1,重複這個過程直到根節點為止。
分配一個小於pageSize的子頁subpage
當需要分配的記憶體小於pageSize時,仍然會分配一個page,因為PoolChunk能分配的最小記憶體單元是一個page。這時候只需分配一個也節點就可以了。
1 private long allocateSubpage(int normCapacity) { 2 // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it. 3 // This is need as we may add it back and so alter the linked-list structure. 4 PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity); 5 synchronized (head) { 6 int d = maxOrder; // subpages are only be allocated from pages i.e., leaves 7 int id = allocateNode(d); 8 if (id < 0) { 9 return id; 10 } 11 12 final PoolSubpage<T>[] subpages = this.subpages; 13 final int pageSize = this.pageSize; 14 15 freeBytes -= pageSize; 16 17 int subpageIdx = subpageIdx(id); 18 PoolSubpage<T> subpage = subpages[subpageIdx]; 19 if (subpage == null) { 20 subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity); 21 subpages[subpageIdx] = subpage; 22 } else { 23 subpage.init(head, normCapacity); 24 } 25 return subpage.allocate(); 26 } 27 }
6-10行,分配一個深度d=maxOrder的葉節點。
17,18行,從subpages取出一個PoolSubpage快取。subpages在構造方法中初始化,subpages = new PoolSubpage[maxSubpageAllocs], maxSubpageAllocs = 1 << maxOrder。subpages的長度就是chunk中的page數量。
19-24行,如果快取中沒有,建立一個新的。如果有直接初始PoolSubpage。
25行,分配一個子頁。
關於PoolSubpage子頁面管理的功能,後面會詳細分析,這裡只涉及和PoolChunk相關的內容。
釋放記憶體
分配記憶體成功後會返回一個long型的handle,64位的handle被分為兩部分,[0, 32)位是二叉樹中的節點索引,可以使用memoryMapIdx(handle)方法取出。[32, 64)位是PoolSubpage中子頁面的索引,可以使用bitMapIdx(handler)方法取出。釋放一個handle時,可能需要同時釋放二叉樹中的節點和PoolSubpage中子頁面,free(int handle)方法實現了這個記憶體釋放過程:
1 void free(long handle) { 2 int memoryMapIdx = memoryMapIdx(handle); 3 int bitmapIdx = bitmapIdx(handle); 4 5 if (bitmapIdx != 0) { // free a subpage 6 PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)]; 7 assert subpage != null && subpage.doNotDestroy; 8 9 // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it. 10 // This is need as we may add it back and so alter the linked-list structure. 11 PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize); 12 synchronized (head) { 13 if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) { 14 return; 15 } 16 } 17 } 18 freeBytes += runLength(memoryMapIdx); 19 setValue(memoryMapIdx, depth(memoryMapIdx)); 20 updateParentsFree(memoryMapIdx); 21 }
2,3行,分別取出二叉樹的節點id和PoolSubpage中子頁的id。
5-17行,釋放PoolSubpage子頁。子頁記憶體被釋放之後,subpages陣列中仍然儲存著PoolSubpages物件。13行只有subpage中所有的子頁都釋放完了才會釋放subpage持有的page。
18-20行,釋放二叉樹中的節點。呼叫setValue把被釋放的節點memoryMap值設定成它原本的深度depth(memoryMapIdx)。 呼叫updateParentsFree, 修改memoryMap記錄,這個方法實現了updateParentsAlloc的逆過程。
updateParentsFree釋放二叉樹節點的關鍵,如果一個節點被釋放,它的父節點在memoryMap值可能會發生變化。這個方法的實現如下:
1 private void updateParentsFree(int id) { 2 int logChild = depth(id) + 1; 3 while (id > 1) { 4 int parentId = id >>> 1; 5 byte val1 = value(id); 6 byte val2 = value(id ^ 1); 7 logChild -= 1; // in first iteration equals log, subsequently reduce 1 from logChild as we traverse up 8 9 if (val1 == logChild && val2 == logChild) { 10 setValue(parentId, (byte) (logChild - 1)); 11 } else { 12 byte val = val1 < val2 ? val1 : val2; 13 setValue(parentId, val); 14 } 15 16 id = parentId; 17 } 18 }
第2行,計算節點id的子節點深度logChild。
第3行,確保id不是根節點。
第4行,得到父節點id。
第5,6行,得到節點id及其兄弟節點memoryMap值: val1, val2。
第7行,把logChild變成id的深度。
第9,10行, 如果id及其兄弟節點的指定都是depth(id),表示這兩個節都已經完全釋放,把父節點的指定還原成depth(parentId) == logChild -1 。
第12,13行,如果id及其兄弟節點至少有一個沒有完全釋放,把較小的值設定到父節點上。
第16行,深度上移,繼續上面的過程。
使用分配的記憶體初始化PooledByteBuf
使用allocate分配記憶體得到一個handle之後,需要呼叫PooledByteBuf的init方法使用handle對應的記憶體初始化。初始化的關鍵是計算出handle對應的記憶體在memory中的偏移量和長度。前面講的lenthRun可以計算出記憶體的長度,剩下的就是計算記憶體偏移量方法runOffset。PoolChunk的initBuf方法用來初始化一個PooledByteBuf物件:
1 void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) { 2 int memoryMapIdx = memoryMapIdx(handle); 3 int bitmapIdx = bitmapIdx(handle); 4 if (bitmapIdx == 0) { 5 byte val = value(memoryMapIdx); 6 assert val == unusable : String.valueOf(val); 7 buf.init(this, handle, runOffset(memoryMapIdx) + offset, reqCapacity, runLength(memoryMapIdx), 8 arena.parent.threadCache()); 9 } else { 10 initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity); 11 } 12 }
第2,3行,在分析free程式碼中解釋過。
第4-8行,表示這塊記憶體是二叉樹中的一個節點,直接使用init方法初始化。runOffset的演算法是 (memoryMapIdx ^ 1 << depth(memoryMapIdx)) * runLength(memoryMapIdx), 根據性質(5)可知,memoryMapIdx ^ depth(memoryMapIdx) 是節點memoryMepIdx在深度為depth(memoryMapIdx)層上的偏移量doffset, 即這一層前面還有doffset個節點,根據性質(4)可知每個節點的記憶體大小是runLength(memoryMapIdx),所以doffset * runLength(memoryMapIdx)是節點memoryMapIdx在chunk記憶體上的偏移量。最後還要再加上一個offset,它是chuk在memory上的偏移量。
第10行,表示這塊記憶體是一個subpage,使用initBufWithSubpage初始化。
1 void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) { 2 initBufWithSubpage(buf, handle, bitmapIdx(handle), reqCapacity); 3 } 4 5 private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) { 6 assert bitmapIdx != 0; 7 8 int memoryMapIdx = memoryMapIdx(handle); 9 10 PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)]; 11 assert subpage.doNotDestroy; 12 assert reqCapacity <= subpage.elemSize; 13 14 buf.init( 15 this, handle, 16 runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset, 17 reqCapacity, subpage.elemSize, arena.parent.threadCache()); 18 }
關鍵部分在第二個過載方法。的第14-17行。這個計算記憶體偏移量的演算法是runOffst(memoryMapIdx) + offset + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize,它可以拆分成兩部分:
memoryMapIdx表示的page在記憶體中的偏移量pageOffset = runOffset(memoryMapIdx) + offset
子頁面subpage在page中的偏移量: subpOffset = (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize
其中subpOffset是個陌生的東西,會在後面PoolSubpage相關章節詳細分析。
&n