1. 程式人生 > 實用技巧 >Netty原始碼解析 -- PoolChunk實現原理

Netty原始碼解析 -- PoolChunk實現原理

本文主要分享Netty中PoolChunk如何管理記憶體。
原始碼分析基於Netty 4.1.52

記憶體管理演算法

首先說明PoolChunk記憶體組織方式。
PoolChunk的記憶體大小預設是16M,Netty將它劃分為2048個page,每個page為8K。
PoolChunk上可以分配Normal記憶體塊。
Normal記憶體塊大小必須是page的倍數。

PoolChunk通過runsAvail欄位管理記憶體塊。
PoolChunk#runsAvail是PriorityQueue陣列,其中PriorityQueue存放的是handle。
handle可以理解為一個控制代碼,維護一個記憶體塊的資訊,由以下部分組成

  • o: runOffset ,在chunk中page偏移索引,從0開始,15bit
  • s: size,當前位置可分配的page數量,15bit
  • u: isUsed,是否使用?, 1bit
  • e: isSubpage,是否在subpage中, 1bit
  • b: bitmapIdx,記憶體塊在subpage中的索引,不在subpage則為0, 32bit

前面《記憶體對齊類SizeClasses》文章說過,SizeClasses將sizeClasses表格中isMultipageSize為1的行取出可以組成一個新表格,這裡稱為Page表格

runsAvail陣列預設長度為40,每個位置index上放的handle代表了存在一個可用記憶體塊,並且可分配pageSize大於等於(pageIdx=index)上的pageSize,小於(pageIdex=index+1)的pageSize。
如runsAvail[11]上的handle的size可分配pageSize可能為16 ~ 19,
假如runsAvail[11]上handle的size為18,如果該handle分配了7個page,剩下的11個page,這時要將handle移動runsAvail[8](當然,handle的資訊要調整)。
這時如果要找分配6個page,就可以從runsAvail[5]開始查詢runsAvail陣列,如果前面runsAvail[5]~runsAvail[7]都沒有handle,就找到了runsAvail[8]。
分配6個page之後,剩下的5個page,handle移動runsAvail[4]。

先看一下PoolChunk的建構函式

PoolChunk(PoolArena<T> arena, T memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx, int offset) {
    // #1
    unpooled = false;
    this.arena = arena;
    this.memory = memory;
    this.pageSize = pageSize;
    this.pageShifts = pageShifts;
    this.chunkSize = chunkSize;
    this.offset = offset;
    freeBytes = chunkSize;

    runsAvail = newRunsAvailqueueArray(maxPageIdx);
    runsAvailMap = new IntObjectHashMap<Long>();
    subpages = new PoolSubpage[chunkSize >> pageShifts];

    // #2
    int pages = chunkSize >> pageShifts;
    long initHandle = (long) pages << SIZE_SHIFT;
    insertAvailRun(0, pages, initHandle);

    cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
}

#1
unpooled: 是否使用記憶體池
arena:該PoolChunk所屬的PoolArena
memory:底層的記憶體塊,對於堆記憶體,它是一個byte陣列,對於直接記憶體,它是(jvm)ByteBuffer,但無論是哪種形式,其記憶體大小預設都是16M。
pageSize:page大小,預設為8K。
chunkSize:整個PoolChunk的記憶體大小,預設為16777216,即16M。
offset:底層記憶體對齊偏移量,預設為0。
runsAvail:初始化runsAvail
runsAvailMap:記錄了每個記憶體塊開始位置和結束位置的runOffset和handle對映。

#2 insertAvailRun方法在runsAvail陣列最後位置插入一個handle,該handle代表page偏移位置為0的地方可以分配16M的記憶體塊

記憶體分配

PoolChunk#allocate

boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
    final long handle;
    // #1
    if (sizeIdx <= arena.smallMaxSizeIdx) {
        // small
        handle = allocateSubpage(sizeIdx);
        if (handle < 0) {
            return false;
        }
        assert isSubpage(handle);
    } else {
        // #2
        int runSize = arena.sizeIdx2size(sizeIdx);
        handle = allocateRun(runSize);
        if (handle < 0) {
            return false;
        }
    }

    // #3
    ByteBuffer nioBuffer = cachedNioBuffers != null? cachedNioBuffers.pollLast() : null;
    initBuf(buf, nioBuffer, handle, reqCapacity, cache);
    return true;
}

#1 處理Small記憶體塊申請,呼叫allocateSubpage方法處理,後續文章解析。
#2 處理Normal記憶體塊申請
sizeIdx2size方法根據記憶體塊索引查詢對應記憶體塊size。sizeIdx2size是PoolArena父類SizeClasses提供的方法,可參考系列文章《記憶體對齊類SizeClasses》。
allocateRun方法負責分配Normal記憶體塊,返回handle儲存了分配的記憶體塊大小和偏移量。

#3 使用handle和底層記憶體類(ByteBuffer)初始化ByteBuf了。

private long allocateRun(int runSize) {
    // #1
    int pages = runSize >> pageShifts;
    // #2
    int pageIdx = arena.pages2pageIdx(pages);

    synchronized (runsAvail) {
        //find first queue which has at least one big enough run
        // #3
        int queueIdx = runFirstBestFit(pageIdx);
        if (queueIdx == -1) {
            return -1;
        }

        //get run with min offset in this queue
        PriorityQueue<Long> queue = runsAvail[queueIdx];
        long handle = queue.poll();

        assert !isUsed(handle);
        // #4
        removeAvailRun(queue, handle);
        // #5
        if (handle != -1) {
            handle = splitLargeRun(handle, pages);
        }
        // #6
        freeBytes -= runSize(pageShifts, handle);
        return handle;
    }
}

#1 計算所需的page數量
#2 計算對應的pageIdx
注意,pages2pageIdx方法會將申請記憶體大小對齊為上述Page表格中的一個size。例如申請172032位元組(21個page)的記憶體塊,pages2pageIdx方法計算結果為13,實際分配196608(24個page)的記憶體塊。
#3 從pageIdx開始遍歷runsAvail,找到第一個handle。
該handle上可以分配所需記憶體塊。
#4 從runsAvail,runsAvailMap移除該handle資訊
#5#3步驟找到的handle上劃分出所要的記憶體塊。
#6 減少可用記憶體位元組數

private long splitLargeRun(long handle, int needPages) {
    assert needPages > 0;

    // #1
    int totalPages = runPages(handle);
    assert needPages <= totalPages;

    int remPages = totalPages - needPages;

    // #2 
    if (remPages > 0) {
        int runOffset = runOffset(handle);

        // keep track of trailing unused pages for later use
        int availOffset = runOffset + needPages;
        long availRun = toRunHandle(availOffset, remPages, 0);
        insertAvailRun(availOffset, remPages, availRun);

        // not avail
        return toRunHandle(runOffset, needPages, 1);
    }

    //mark it as used
    handle |= 1L << IS_USED_SHIFT;
    return handle;
}

#1 totalPages,從handle中獲取當前位置可用page數。
remPages,分配後剩餘page數。
#2 剩餘page數大於0
availOffset,計算剩餘page開始偏移量
生成一個新的handle,availRun
insertAvailRun將availRun插入到runsAvail,runsAvailMap中

記憶體釋放

void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
    ...

    // #1
    int pages = runPages(handle);

    synchronized (runsAvail) {
        // collapse continuous runs, successfully collapsed runs
        // will be removed from runsAvail and runsAvailMap
        // #2
        long finalRun = collapseRuns(handle);

        // #3
        finalRun &= ~(1L << IS_USED_SHIFT);
        //if it is a subpage, set it to run
        finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
        insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
        freeBytes += pages << pageShifts;
    }

    if (nioBuffer != null && cachedNioBuffers != null &&
        cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
        cachedNioBuffers.offer(nioBuffer);
    }
}

#1 計算釋放的page數
#2 如果可以,將前後的可用記憶體塊進行合併
#3 插入新的handle

collapseRuns

private long collapseRuns(long handle) {
    return collapseNext(collapsePast(handle));
}

collapsePast方法合併前面的可用記憶體塊
collapseNext方法合併後面的可用記憶體塊

private long collapseNext(long handle) {
    for (;;) {
        // #1
        int runOffset = runOffset(handle);
        int runPages = runPages(handle);

        Long nextRun = getAvailRunByOffset(runOffset + runPages);
        if (nextRun == null) {
            return handle;
        }

        int nextOffset = runOffset(nextRun);
        int nextPages = runPages(nextRun);

        //is continuous
        // #2
        if (nextRun != handle && runOffset + runPages == nextOffset) {
            //remove next run
            removeAvailRun(nextRun);
            handle = toRunHandle(runOffset, runPages + nextPages, 0);
        } else {
            return handle;
        }
    }
}

#1 getAvailRunByOffset方法從runsAvailMap中找到下一個記憶體塊的handle。
#2 如果是連續的記憶體塊,則移除下一個記憶體塊handle,並將其page合併生成一個新的handle。

下面來看一個例子

大家可以結合例子中runsAvail和記憶體使用情況的變化,理解上面的程式碼。
實際上,2個Page的記憶體塊是通過Subpage分配,回收時會放回執行緒快取中而不是直接釋放存塊,但為了展示PoolChunk中記憶體管理過程,圖中不考慮這些場景。

PoolChunk在Netty 4.1.52版本修改了演算法,引入了jemalloc 4的演算法 -- https://github.com/netty/netty/commit/0d701d7c3c51263a1eef56d5a549ef2075b9aa9e#diff-6850686cf7ebc7b9ddb873389ded45ebf40e6c1ccf411c44b744e7d3ca2ff774
Netty 4.1.52之前的版本,PoolChunk引入的是jemalloc 3的演算法,使用二叉樹管理記憶體塊。有興趣的同學可以參考我後續的文章《PoolChunk實現(jemalloc 3的演算法)》

如果您覺得本文不錯,歡迎關注我的微信公眾號,系列文章持續更新中。您的關注是我堅持的動力!