1. 程式人生 > >netty原始碼解析(4.0)-26 ByteBuf記憶體池:PoolArena-PoolSubpage

netty原始碼解析(4.0)-26 ByteBuf記憶體池:PoolArena-PoolSubpage

  PoolChunk用來分配大於或等於一個page的記憶體,如果需要小於一個page的記憶體,需要先從PoolChunk中分配一個page,然後再把一個page切割成多個子頁-subpage,最後把記憶體以subpage為單位分配出去。PoolSubpage就是用來管理subpage的。

  一個page會被分割成若干個大小相同的subpage,subpage的的大小是elemSize。elemSize必須是16的整數倍,即必須滿足elemSize & 15 == 0。elemSize的取值範圍是(16, pageSize-16)。多個elemSize相等的PoolSubpage組成一個雙向連結串列,用於分配特定大小的subpage。

  Tiny和Small型別的記憶體,都是用subpage來分配。Tiny記憶體大小範圍是[16, 512),如果把大小不同的subpage按順序排列,除最後一個外,任意一個subpage的elemSize+16等於下一個subpage的elemSize,可以用於分配Tiny記憶體的subpage有512>>4=32種。Small記憶體大小範圍是[512, pageSize),subpage的elemSize=512 * 2n = 2(9+n), 可以用於分配Small記憶體的subpage有n種,n的最小值是0, 最大值由pageSize決定。

  已知:

    elemSize < pageSize

    pageSize可以表示為2k

    elemSize = 29+n

  => 29+n < 2k

  => 9+n < k

  => n < k - 9

  => n的取值範圍是(0, k - 9)

  上一章中分析過pageShifts, 它就是上面推導過程中使用的變數k。

 

PoolArena中的PoolSubpage陣列

  PoolArena維護了兩個PoolSubpage表,都是以PoolSubpage<T>[]陣列的形式儲存:

  • tinySubpagePools:用於分配Tiny記憶體,陣列長度是521 >> 4 = 32。
  • smallSubpagePools: 用於分配Small記憶體,陣列長度是pageShifts - 9。

  PoolArean在構造方法中初始化這兩個陣列:

 1         tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
 2         for (int i = 0; i < tinySubpagePools.length; i ++) {
 3             tinySubpagePools[i] = newSubpagePoolHead(pageSize);
 4         }
 5 
 6         numSmallSubpagePools = pageShifts - 9;
 7         smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
 8         for (int i = 0; i < smallSubpagePools.length; i ++) {
 9             smallSubpagePools[i] = newSubpagePoolHead(pageSize);
10         }

  程式碼中的numTinySubpagePools=512>>4和numSmallSubpagePools=pageShifts - 9,分別是兩個陣列的長度。這兩個陣列儲存的都是PoolSubpage雙向連結串列的頭節點,頭節點不能用來分配記憶體。

  findSubpagePoolHead方法可以根據elemSize找到對應的PoolSubpage連結串列的頭節點:

 1     PoolSubpage<T> findSubpagePoolHead(int elemSize) {
 2         int tableIdx;
 3         PoolSubpage<T>[] table;
 4         if (isTiny(elemSize)) { // < 512
 5             tableIdx = elemSize >>> 4;
 6             table = tinySubpagePools;
 7         } else {
 8             tableIdx = 0;
 9             elemSize >>>= 10;
10             while (elemSize != 0) {
11                 elemSize >>>= 1;
12                 tableIdx ++;
13             }
14             table = smallSubpagePools;
15         }
16 
17         return table[tableIdx];
18     }

  4-6行,如果是Tiny記憶體,計算elemSize在tinySubpagePools中的偏移量tableIdx。

  8-14行,如果是Normal記憶體,計算elemSize在smallSubpagePools中的偏移量tabIeIdx。計算tableIdx的演算法是把elemSize無符號右移10位之後,找非0的最高位,在找的過程中累加tableIdx,找到之後及得到了正確的偏移量。這個演算法還可以簡化成log2(elemSize) - 9。

  17行,取出一個PoolSubpage連結串列頭。

  

PoolSubpage初始化

  在PoolChunk的allocateSubpage方法中,呼叫findSubpagePoolHead得到一個head,然後使用分配到的二叉樹記憶體節點初始化一個PoolSubpage節點。一個能用來分配記憶體的PooSubpage節點可以呼叫構造方法或init方法進行初始化。

 1     PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
 2         this.chunk = chunk;
 3         this.memoryMapIdx = memoryMapIdx;
 4         this.runOffset = runOffset;
 5         this.pageSize = pageSize;
 6         bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
 7         init(head, elemSize);
 8     }

  這個構造方法只是做了一些簡單的屬性初始化工作。第6行初始bitmap,它用bit位來記錄每個subpage的使用情況,每個bit對應一個subpage,0表示subpage空閒,1表示subpage已經被分配出去。一個subpage的大小是elemSize,前面已經講過,最小的elemSize=16,  那麼一個page最多可分割成subpage的數量maxSubpageCount=pageSize/16=pageSize >> 4。bitmap是個long型的數字,每個long資料有64位,因此bitmap的最大長度只需要maxBitmapLength = maxSubpageCount / 64 = pageSize / 16 / 64 = pageSize >> 10就夠用了。

  init方法的作用是根據elemSize計算出有效的bitmap長度bitmapLength,然後把bitmapLength範圍內存的bit值都初始化為0。

 1     void init(PoolSubpage<T> head, int elemSize) {
 2         doNotDestroy = true;
 3         this.elemSize = elemSize;
 4         if (elemSize != 0) {
 5             maxNumElems = numAvail = pageSize / elemSize;
 6             nextAvail = 0;
 7             bitmapLength = maxNumElems >>> 6;
 8             if ((maxNumElems & 63) != 0) {
 9                 bitmapLength ++;
10             }
11 
12             for (int i = 0; i < bitmapLength; i ++) {
13                 bitmap[i] = 0;
14             }
15         }
16         addToPool(head);
17     }

  第5行,初始subpage的最大數量maxNumElems和可用數量numAvail。

  第6行,初始化下一個可用subpage的索引。

  第7行, 計算bitmap的有效數量,bitmapLength = maxNumElems >>> 6 = maxNumElems / 64。

  第8,9行,如果maxNumElems不是64的整數倍,bitmapLength需要額外加1。

  第12-14行,有效長度的bitmap值都設定成0。

  第16行, 把當前PoolSubpage節點新增到head後面。

  bitmap位的索引範圍是[0, maxNumElems)。

  

分配一個subpage

  PoolSubpage初始化完成之後,呼叫allocate可以分配一個subpage,返回的是一個long型的的handle,這個handle代表一塊記憶體。handle的低32位memoryMapIdx,是PoolChunk中二叉樹節點的索引;高32位bitmapIdx,是subpage在bitmap中對應位的索引。

  分配一個subpage有兩個步驟:

  1. 找到一個可用subpage的索引bitmapIdx
  2. 把這個bitmapIdx在bitmap中對應的bit為置為1

  findNextAvail方法負責到底一個可用的subpage並返回它的bitmapIdx。

 1     private int findNextAvail() {
 2         final long[] bitmap = this.bitmap;
 3         final int bitmapLength = this.bitmapLength;
 4         for (int i = 0; i < bitmapLength; i ++) {
 5             long bits = bitmap[i];
 6             if (~bits != 0) {
 7                 return findNextAvail0(i, bits);
 8             }
 9         }
10         return -1;
11     }
12 
13     private int findNextAvail0(int i, long bits) {
14         final int maxNumElems = this.maxNumElems;
15         final int baseVal = i << 6;
16 
17         for (int j = 0; j < 64; j ++) {
18             if ((bits & 1) == 0) {
19                 int val = baseVal | j;
20                 if (val < maxNumElems) {
21                     return val;
22                 } else {
23                     break;
24                 }
25             }
26             bits >>>= 1;
27         }
28         return -1;
29     }

  第1-11行,變數bitmap陣列,找到一個至少有一位是0的long資料。~bits != 0 說明bits中至少有一位是0。然後呼叫findNextAvail0找到bits中為0的最低位。

  第15行,計算bitmap陣列中的索引i對應的bit索引baseVal = i << 6 = i * 64。

  第17-27行,遍歷bits的每個bit位,遇到為0的bit後在19計算回bitmpaIdx = baseVal | j,j表示bit位在long資料中bit索引。如果滿足bitmapIdx < maxNumElems在21返回。

  第28行,如果沒找到可用的subpage, 返回-1。當maxNumElems不是64的整數倍時,bitmap陣列中最後一個bits在~bits != 0的情況下可能已經沒有subpage可用。

  

  allcate方法是分配supage的入口,它呼叫getNextAvail得到一個supage的bitmapIdx,  getNextAvail在nextAvail屬性為-1的時候,呼叫findNexAvail。然後把bitmapIdx對應的bit為置為1,最後返回handle。

 1     long allocate() {
 2         if (elemSize == 0) {
 3             return toHandle(0);
 4         }
 5 
 6         if (numAvail == 0 || !doNotDestroy) {
 7             return -1;
 8         }
 9 
10         final int bitmapIdx = getNextAvail();
11         int q = bitmapIdx >>> 6;
12         int r = bitmapIdx & 63;
13         assert (bitmap[q] >>> r & 1) == 0;
14         bitmap[q] |= 1L << r;
15 
16         if (-- numAvail == 0) {
17             removeFromPool();
18         }
19 
20         return toHandle(bitmapIdx);
21     }

  第10行,得到下一個可用的subpage在bitmap中的索引bitmapIdx。

  第11行,計算bitmapIdx在bitpmap陣列中索引,q = bitmapIdx >>> 6 = (int)(bitmapIdx/64)。

  第12行,計算bitmapIdx對應的bit在long資料中的位索引r,表示q對應的long資料的第r位就是。

  第14行,把bitmapIdx對應的bit為設定為1。

  第16,17行,把可用subpage數numAvail減1,如果numAvail==0表示當前PoolSubpage節點已經沒有可用的subpage了,呼叫removeFromPool把它從連結串列中刪除。

  第20行,把bitmapIdx轉換成表示記憶體的handle,演算法是:  handle = 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;

  

釋放一個subpage

  free方法實現了subpage釋放的功能,和allocate相比要簡單的多,它的主要工作是把bitmapIdx對應的bit為設定為0,還順便做了一下清理善後工作。

 1     boolean free(PoolSubpage<T> head, int bitmapIdx) {
 2         if (elemSize == 0) {
 3             return true;
 4         }
 5         int q = bitmapIdx >>> 6;
 6         int r = bitmapIdx & 63;
 7         assert (bitmap[q] >>> r & 1) != 0;
 8         bitmap[q] ^= 1L << r;
 9 
10         setNextAvail(bitmapIdx);
11 
12         if (numAvail ++ == 0) {
13             addToPool(head);
14             return true;
15         }
16 
17         if (numAvail != maxNumElems) {
18             return true;
19         } else {
20             // Subpage not in use (numAvail == maxNumElems)
21             if (prev == next) {
22                 // Do not remove if this subpage is the only one left in the pool.
23                 return true;
24             }
25 
26             // Remove this subpage from the pool if there are other subpages left in the pool.
27             doNotDestroy = false;
28             removeFromPool();
29             return false;
30         }
31     }

  第5,6行,和allocate中解釋過。

  第8行,把bitmapIdx對應的bit為置為0。

  第10行,把這個bitmapIdx賦值給nextAvail屬性,這樣在一次或多次呼叫free之後的第一次allocate呼叫就不會呼叫findNextAvail方法,可以提升allocate的效能。

  第12,13行,當前PoolSubpage節點中至少有一個可用的subpage,把當前節點新增到連結串列中。

  第27-29行,當前PooSubpage節點中所有分配出去的節點都全部還會來了,換言之,當前節點有回到bitmap初始化狀態,把當前節點從連結串列中刪除。

 &n