1. 程式人生 > 其它 >Netty-記憶體池原始碼二 (PooledByteBufAllocator)

Netty-記憶體池原始碼二 (PooledByteBufAllocator)

Netty-記憶體池原始碼二 (PooledByteBufAllocator)

記憶體池核心類如下:

  • PooledByteBufAllocator 本期介紹
  • PooledUnsafeDirectByteBuf
  • PooledUnsafeDirectByteBuf
  • PoolThreadCache
  • MemoryRegionCache
  • PoolArena
  • SizeClasses
  • PoolChunk
  • LongPriorityQueue
  • LongLongHashMap
  • PoolSubpage

一、詳情概要

從上一期文章 我們可以知道 記憶體分配的入口是 【PooledByteBufAllocator #newDirectBuffer()

】。【PooledByteBufAllocator】 該類,也就是記憶體分配的 起始類, 其內部定義了很多 預設變數值 用於接下來的記憶體分配整個流程。

PooledByteBufAllocator】的結構圖如下:

從上圖 可看到有三個陌生的類:

​ 1.【PoolArena】(DirectArena, HeapArena都繼承於該類) : 可以把他想象成一塊大記憶體(直接記憶體/ 堆記憶體)

​ 2.【PoolThreadCache】: 執行緒本地記憶體快取池

​ 3.【InternalThreadLocalMap】: 目前把他當成 ThreadLocalMap 就行

在講解原始碼前,請 思考一個問題 :

從圖中可看到 不管是 DirectArena 還是 HeapArena,他們都有多個,且各自組成了陣列 分別是directArenas 和 heapArenas。 前面說了PoolArena , 我們目前可以看成是一塊大記憶體, 當業務來申請記憶體時,需要從PoolArena 這塊大記憶體中 擷取一塊來使用就行。 但是 為什麼要有多個PoolArena呢

解答:

首先我們假設就只有一個PoolArena, 眾所周知 現在的 CPU 一般都是多核的, 此時有多個業務(多執行緒) 同時從Netty中 申請記憶體(從大記憶體中偏移出多個小記憶體)來使用,而本質上是多核CPU來操作這同一塊大記憶體 進行讀寫。但是 由於 作業系統的讀寫記憶體屏障

存在, 會導致多個執行緒的讀寫並不能做到真正的並行。 因此Netty用了多個PoolArena 來減輕這種不能並行的行為,從而提升效率

二、 原始碼分析

1.關鍵成員變數

   	
	// 預設 HeapArena的個數   cpu*2
    private static final int DEFAULT_NUM_HEAP_ARENA;

	// 預設 DirectArena的個數 cpu*2
    private static final int DEFAULT_NUM_DIRECT_ARENA;

 	// 預設一頁大小 PageSize  8192 => 8KB
    private static final int DEFAULT_PAGE_SIZE;
	
	// 用來計算預設Chunk的大小,在jemalloc3 中同時還表示Chunk內部完全二叉樹的最大深度。
    private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk

    // 表示預設poolThreadCache中 一個smallMemoryRegionCache中的佇列長度  256
    private static final int DEFAULT_SMALL_CACHE_SIZE;

	// 表示預設poolThreadCache中 一個normalMemoryRegionCache中的佇列長度 64
    private static final int DEFAULT_NORMAL_CACHE_SIZE;

	//  表示最大快取規格 32KB
    static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;

	// 表示 預設 某個執行緒從快取中獲取記憶體的最大次數限制
    private static final int DEFAULT_CACHE_TRIM_INTERVAL;

	// 表示 是否允許所有的執行緒使用快取  預設是true
    private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
	
	// 預設記憶體快取對齊填充為0
    private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
     
	// heapArena陣列
    private final PoolArena<byte[]>[] heapArenas;

	// directArena陣列
    private final PoolArena<ByteBuffer>[] directArenas;

	// PoolThreadLocalCache 執行緒本地快取
    private final PoolThreadLocalCache threadCache;
	
	// chunk的大小
    private final int chunkSize;

2.構造方法

    /**
     *
     * @param preferDirect          是否申請直接記憶體                   預設一般都是true
     * @param nHeapArena            heapArena的個數                  假設 cpu*2
     * @param nDirectArena          directArena的個數                假設 cpu*2 
     * @param pageSize              頁的大小                          預設8KB (8192)
     * @param maxOrder             chunk中完全平衡二叉樹的深度           11
     * @param smallCacheSize     smallMemoryRegionCache的佇列長度      256
     * @param normalCacheSize    normalMemoryRegionCache的佇列長度     64
     * @param useCacheForAllThreads 是否所有的執行緒都使用PoolThreadCache  true
     * @param directMemoryCacheAlignment 對齊填充                      0
     */
    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
        
        // directByDefault = true
        super(preferDirect);

        // 目前理解成 threadLocal 每個執行緒中有自己的 PoolThreadLocalCache快取
        threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
        
        // 賦值 smallCacheSize=256  normalCacheSize=64
        this.smallCacheSize = smallCacheSize;
        this.normalCacheSize = normalCacheSize;
		
        
        // 計算chunkSize   8KB<<11 = 16mb (16,777,216)
        chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
		

        // pageSize=8KB(8192)  
        //pageShifts表示 1 左移多少位是 8192  = 13
        int pageShifts = validateAndCalculatePageShifts(pageSize, directMemoryCacheAlignment);
        
        
        
        // 生成 heapArenas 陣列
        if (nHeapArena > 0) {
            heapArenas = newArenaArray(nHeapArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
            for (int i = 0; i < heapArenas.length; i ++) {
                PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                        pageSize, pageShifts, chunkSize,
                        directMemoryCacheAlignment);
                heapArenas[i] = arena;
                metrics.add(arena);
            }
            heapArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            heapArenas = null;
            heapArenaMetrics = Collections.emptyList();
        }
		
        // netty預設情況下都會使用 直接記憶體,因此我們在整個Netty中關心直接記憶體相關就可以了,而且直接記憶體與堆記憶體邏輯並無太多差異。
        
        // 生成 directArena陣列  
        if (nDirectArena > 0) {

            // 假設平臺CPU 個數是8, 這裡會建立 cpu(8)*2 = 16個長度的 directArenas陣列。
            directArenas = newArenaArray(nDirectArena);

            // 這是個記憶體池圖表 
            //如果想要監測 記憶體池詳情 可使用該物件(關於記憶體分配邏輯不用關心Metric相關物件)
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);

            // for迴圈最終建立了 16個 directArena 物件,並且 將這些DirectArena物件放入到陣列內
            for (int i = 0; i < directArenas.length; i ++) {
                
                // 引數1:allocator 物件
                // 引數2:pageSize 8k
                // 引數3:pageShift 13  1<<13 可推出pageSize的值
                // 引數4:chunkSize: 16mb
                // 引數5:directMemoryCacheAlignment 對齊填充 0
                // 生成 DirectArena物件
                PoolArena.DirectArena arena = new PoolArena.DirectArena(
                        this, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);
                directArenas[i] = arena;
                metrics.add(arena);
            }
            directArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            directArenas = null;
            directArenaMetrics = Collections.emptyList();
        }
        
        metric = new PooledByteBufAllocatorMetric(this);
    }

3.申請記憶體入口

    /**
     * 申請分配直接記憶體 入口
     * @param initialCapacity  業務需求記憶體大小
     * @param maxCapacity      記憶體最大限制
     * @return  ByteBuf netty中記憶體物件
     */
    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {

        // 若當前執行緒沒有PoolThreadCache 則建立一份 PoolThreadCache (裡面包含 Small、Normal MemroyRegionCache陣列【該陣列中每個元素包含一個佇列】)用於快取記憶體物件
      	// 若當前執行緒有 則直接獲取
        PoolThreadCache cache = threadCache.get();

        // 拿到當前執行緒cache中繫結的directArena
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;

        // 這個條件正常邏輯 都會成立
        if (directArena != null) {
            
            //這是咱們的核心入口 ******  
            // 引數1: cache 當前執行緒相關的PoolThreadCache物件
            // 引數2:initialCapactiy 業務層需要的記憶體容量
            // 引數3:maxCapacity 最大記憶體大小
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            // 一般不會走到這裡,不用看
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

三、總結

PooledByteBufAllocator】 類主要是

  1. 設定些預設變數值。 pageSize , chunkSize

  2. 併為每個申請記憶體的執行緒 建立一份 【PoolThreadCache】快取。

  3. 根據當前平臺的CPU數量,設定 【PoolArena 】 ,而最終申請記憶體的工作還是交給了 【PoolArena】。

萬般皆下品,唯有讀書高!