1. 程式人生 > >netty記憶體--PoolChunk分析

netty記憶體--PoolChunk分析

PoolChunk使用了jemalloc分配演算法。對這個演算法不瞭解的,請另行百度哈,這裡我就不展開了。

首先說下幾個概念吧。page是chunk中記憶體分配的最小單元,chunk是由一系列的page組成的。當然,page也可以分割成一系列的subpage。一個chunk的大小chunksize=2{maxorder}*pageSize。

PoolChunk是由final修飾的,這代表不能去修改的哈。

先看下PoolChunk的成員變數咯

    final PoolArena<T> arena;//chunk所屬的arena
    final T memory;//擁有的記憶體塊
    final boolean unpooled;//是否非池化
    final int offset;//偏移量

    private final byte[] memoryMap;//各個page分配的二叉樹
    private final byte[] depthMap;//高度二叉樹
    private final PoolSubpage<T>[] subpages;//subpage的節點陣列
    /** Used to determine if the requested capacity is equal to or greater than pageSize. */
    private final int subpageOverflowMask;//判斷分配的請求是subpage
    private final int pageSize;//頁大小 預設為8K
    private final int pageShifts;//1左移多少位 == pageSize  預設為13 即 1 << 13 =8k
    private final int maxOrder;//最大的高度 預設 11
    private final int chunkSize;//塊大小 預設 16MB
    private final int log2ChunkSize;//log2(chunkSize) 預設24
    private final int maxSubpageAllocs;//可分配的最大節點數 預設 1<<11= 2048 
    /** Used to mark memory as unusable */
    private final byte unusable;//標記節點為不可用  maxOrder +1 預設12

    private int freeBytes;//可分配的位元組數

    PoolChunkList<T> parent;//poolChunkList使用,雙向連結串列
    PoolChunk<T> prev;
    PoolChunk<T> next;

 該類有兩個建構函式,一個用於普通初始化,另一個用於Huge分配請求。

我們這就只關注普通初始化,上程式碼

    PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {
        unpooled = false;
        this.arena = arena;
        this.memory = memory;
        this.pageSize = pageSize;
        this.pageShifts = pageShifts;
        this.maxOrder = maxOrder;
        this.chunkSize = chunkSize;
        this.offset = offset;

        unusable = (byte) (maxOrder + 1);//不可用標記
        log2ChunkSize = log2(chunkSize);
        subpageOverflowMask = ~(pageSize - 1);//判斷是否是subpage請求的標誌  
        freeBytes = chunkSize;//可分配的位元組數

        assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
        maxSubpageAllocs = 1 << maxOrder;//subpage最大分配個數


        //初始化資料
        // Generate the memory map.
        memoryMap = new byte[maxSubpageAllocs << 1];
        depthMap = new byte[memoryMap.length];
        int memoryMapIndex = 1;
        for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
            int depth = 1 << d;
            for (int p = 0; p < depth; ++ p) {
                // in each level traverse left to right and set value to the depth of subtree
                memoryMap[memoryMapIndex] = (byte) d;
                depthMap[memoryMapIndex] = (byte) d;
                memoryMapIndex ++;
            }
        }

        subpages = newSubpageArray(maxSubpageAllocs);
    }

 構造方法沒什麼好看的,無非就是給各個屬性賦值,在給memoryMap 和depthMap初始化各個值。

下面看下一個核心的方法long allocate(int normCapacity)

    //首先判斷要分配的空間大小是否 >= pageSize
    if ((normCapacity & subpageOverflowMask) != 0) { 
            return allocateRun(normCapacity);//走正常分配流程
        } else {
            return allocateSubpage(normCapacity);//分配subpage
        }

 這個方法疑惑的地方應該就是(normCapacity & subpageOverflowMask) != 0這個判斷了。因為netty追求效能,所以把位運算算是運用到了極致。之前再構造方法裡看了  subpageOverflowMask = ~(pageSize-1),按預設的來說 即subpageOverflowMask = ~(8192-1) = -8972;而-8192 & 小於 8972的數都是為0 的,缺少畫圖工具,圖就不劃了,可以自己敲程式碼驗證一下。

long allocateRun(int normCapacity)

    //該方法至少會分配一個pageSize的記憶體
     private long allocateRun(int normCapacity) {
        //首先計算出所需要的節點的高度
        int d = maxOrder - (log2(normCapacity) - pageShifts);
        //在該層找到能分配的節點
        int id = allocateNode(d);
        //沒找到
        if (id < 0) {
            return id;
        }
        //計算剩餘能分配的位元組數
        freeBytes -= runLength(id);
        return id;
    }

 private int allocateNode(int d)

     private int allocateNode(int d) {
        int id = 1;
        //和subpageOverflowMask作用相當,所有小於 d 的值 &initial 都等於0
        int initial = - (1 << d); 
        //memoryMap[id]
        byte val = value(id);
        //表示沒有滿足條件的節點
        if (val > d) { // unusable
            return -1;
        }
        //子節點可滿足田間
        while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
            //左移一位,進入子節點
            id <<= 1;
            val = value(id);
            if (val > d) {//左節點不滿足
                id ^= 1;//右節點
                val = value(id);
            }
        }
        byte value = value(id);
        assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
                value, id & initial, d);
        //把符合條件的節點標記位不可用  memoryMap[id] = unusable;
        setValue(id, unusable); // mark as unusable
        //更新父節點的分配資訊
        updateParentsAlloc(id);
        return id;
    }

 private void updateParentsAlloc(int id)

         while (id > 1) {    
            //取父節點
            int parentId = id >>> 1;
            byte val1 = value(id);//獲取該節點的值
            byte val2 = value(id ^ 1);//獲取該節點的兄弟節點的值
            byte val = val1 < val2 ? val1 : val2;//獲取較小的值
            setValue(parentId, val);//把較小的值付給父節點
            id = parentId;//遞迴更新
        }

 下面分析分配的空間 <pageSize的情況

  allocateSubpage(normCapacity)

        //找到subpage的頭節點
        PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
        //加鎖,因為在分配的過程中會修改連結串列的結構
        synchronized (head) {
            //subpage只能在最高的chunk裡分配
            int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
            //
            int id = allocateNode(d);
            //沒有節點可以分配
            if (id < 0) {
                return id;
            }

            final PoolSubpage<T>[] subpages = this.subpages;
            final int pageSize = this.pageSize;

            freeBytes -= pageSize;
            //獲取subpage的偏移索引
            int subpageIdx = subpageIdx(id);
            PoolSubpage<T> subpage = subpages[subpageIdx];
            if (subpage == null) {
                subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
                subpages[subpageIdx] = subpage;
            } else {
                subpage.init(head, normCapacity);
            }
            return subpage.allocate();
        }

 int subpageIdx(int memoryMapIdx)

 return memoryMapIdx ^ maxSubpageAllocs;

移除高位的值,保留地位的值,預設來說,= memoryMapIdx-maxSubpageAllocs

最後再來說說記憶體的釋放過程,

主要的方法為void free(long handle)

    void free(long handle) {
        //取低位 32位獲取節點
        int memoryMapIdx = memoryMapIdx(handle);
        //取高位32位回去subpage
        int bitmapIdx = bitmapIdx(handle);
        //釋放subpage
        if (bitmapIdx != 0) { // free a subpage
            PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
            assert subpage != null && subpage.doNotDestroy;

            // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
            // This is need as we may add it back and so alter the linked-list structure.
            PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
            synchronized (head) {
                if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
                    return;
                }
            }
        }
        freeBytes += runLength(memoryMapIdx);//更新空用位元組
        setValue(memoryMapIdx, depth(memoryMapIdx));//還原節點的高度值
        updateParentsFree(memoryMapIdx);//更新父節點
    }

至此,poolchunk就差不多講完了。