Netty-記憶體池原始碼二 (PooledByteBufAllocator)
Netty-記憶體池原始碼二 (PooledByteBufAllocator)
記憶體池核心類如下:
- PooledByteBufAllocator 本期介紹
- PooledUnsafeDirectByteBuf
- PooledUnsafeDirectByteBuf
- PoolThreadCache
- MemoryRegionCache
- PoolArena
- SizeClasses
- PoolChunk
- LongPriorityQueue
- LongLongHashMap
- PoolSubpage
一、詳情概要
從上一期文章 我們可以知道 記憶體分配的入口是 【PooledByteBufAllocator #newDirectBuffer()
PooledByteBufAllocator
】 該類,也就是記憶體分配的 起始類, 其內部定義了很多 預設變數值 用於接下來的記憶體分配整個流程。
【PooledByteBufAllocator
】的結構圖如下:
從上圖 可看到有三個陌生的類:
1.【PoolArena
】(DirectArena, HeapArena都繼承於該類) : 可以把他想象成一塊大記憶體(直接記憶體/ 堆記憶體)
2.【PoolThreadCache
】: 執行緒本地記憶體快取池
3.【InternalThreadLocalMap
】: 目前把他當成 ThreadLocalMap 就行
在講解原始碼前,請 思考一個問題 :
從圖中可看到 不管是 DirectArena 還是 HeapArena,他們都有多個,且各自組成了陣列 分別是directArenas 和 heapArenas。 前面說了PoolArena , 我們目前可以看成是一塊大記憶體, 當業務來申請記憶體時,需要從PoolArena 這塊大記憶體中 擷取一塊來使用就行。 但是 為什麼要有多個PoolArena呢 ?
解答:
首先我們假設就只有一個PoolArena, 眾所周知 現在的 CPU 一般都是多核的, 此時有多個業務(多執行緒) 同時從Netty中 申請記憶體(從大記憶體中偏移出多個小記憶體)來使用,而本質上是多核CPU來操作這同一塊大記憶體 進行讀寫。但是 由於 作業系統的讀寫記憶體屏障
二、 原始碼分析
1.關鍵成員變數
// 預設 HeapArena的個數 cpu*2
private static final int DEFAULT_NUM_HEAP_ARENA;
// 預設 DirectArena的個數 cpu*2
private static final int DEFAULT_NUM_DIRECT_ARENA;
// 預設一頁大小 PageSize 8192 => 8KB
private static final int DEFAULT_PAGE_SIZE;
// 用來計算預設Chunk的大小,在jemalloc3 中同時還表示Chunk內部完全二叉樹的最大深度。
private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
// 表示預設poolThreadCache中 一個smallMemoryRegionCache中的佇列長度 256
private static final int DEFAULT_SMALL_CACHE_SIZE;
// 表示預設poolThreadCache中 一個normalMemoryRegionCache中的佇列長度 64
private static final int DEFAULT_NORMAL_CACHE_SIZE;
// 表示最大快取規格 32KB
static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
// 表示 預設 某個執行緒從快取中獲取記憶體的最大次數限制
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
// 表示 是否允許所有的執行緒使用快取 預設是true
private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
// 預設記憶體快取對齊填充為0
private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
// heapArena陣列
private final PoolArena<byte[]>[] heapArenas;
// directArena陣列
private final PoolArena<ByteBuffer>[] directArenas;
// PoolThreadLocalCache 執行緒本地快取
private final PoolThreadLocalCache threadCache;
// chunk的大小
private final int chunkSize;
2.構造方法
/**
*
* @param preferDirect 是否申請直接記憶體 預設一般都是true
* @param nHeapArena heapArena的個數 假設 cpu*2
* @param nDirectArena directArena的個數 假設 cpu*2
* @param pageSize 頁的大小 預設8KB (8192)
* @param maxOrder chunk中完全平衡二叉樹的深度 11
* @param smallCacheSize smallMemoryRegionCache的佇列長度 256
* @param normalCacheSize normalMemoryRegionCache的佇列長度 64
* @param useCacheForAllThreads 是否所有的執行緒都使用PoolThreadCache true
* @param directMemoryCacheAlignment 對齊填充 0
*/
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
// directByDefault = true
super(preferDirect);
// 目前理解成 threadLocal 每個執行緒中有自己的 PoolThreadLocalCache快取
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
// 賦值 smallCacheSize=256 normalCacheSize=64
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
// 計算chunkSize 8KB<<11 = 16mb (16,777,216)
chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
// pageSize=8KB(8192)
//pageShifts表示 1 左移多少位是 8192 = 13
int pageShifts = validateAndCalculatePageShifts(pageSize, directMemoryCacheAlignment);
// 生成 heapArenas 陣列
if (nHeapArena > 0) {
heapArenas = newArenaArray(nHeapArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
pageSize, pageShifts, chunkSize,
directMemoryCacheAlignment);
heapArenas[i] = arena;
metrics.add(arena);
}
heapArenaMetrics = Collections.unmodifiableList(metrics);
} else {
heapArenas = null;
heapArenaMetrics = Collections.emptyList();
}
// netty預設情況下都會使用 直接記憶體,因此我們在整個Netty中關心直接記憶體相關就可以了,而且直接記憶體與堆記憶體邏輯並無太多差異。
// 生成 directArena陣列
if (nDirectArena > 0) {
// 假設平臺CPU 個數是8, 這裡會建立 cpu(8)*2 = 16個長度的 directArenas陣列。
directArenas = newArenaArray(nDirectArena);
// 這是個記憶體池圖表
//如果想要監測 記憶體池詳情 可使用該物件(關於記憶體分配邏輯不用關心Metric相關物件)
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
// for迴圈最終建立了 16個 directArena 物件,並且 將這些DirectArena物件放入到陣列內
for (int i = 0; i < directArenas.length; i ++) {
// 引數1:allocator 物件
// 引數2:pageSize 8k
// 引數3:pageShift 13 1<<13 可推出pageSize的值
// 引數4:chunkSize: 16mb
// 引數5:directMemoryCacheAlignment 對齊填充 0
// 生成 DirectArena物件
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);
directArenas[i] = arena;
metrics.add(arena);
}
directArenaMetrics = Collections.unmodifiableList(metrics);
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
metric = new PooledByteBufAllocatorMetric(this);
}
3.申請記憶體入口
/**
* 申請分配直接記憶體 入口
* @param initialCapacity 業務需求記憶體大小
* @param maxCapacity 記憶體最大限制
* @return ByteBuf netty中記憶體物件
*/
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
// 若當前執行緒沒有PoolThreadCache 則建立一份 PoolThreadCache (裡面包含 Small、Normal MemroyRegionCache陣列【該陣列中每個元素包含一個佇列】)用於快取記憶體物件
// 若當前執行緒有 則直接獲取
PoolThreadCache cache = threadCache.get();
// 拿到當前執行緒cache中繫結的directArena
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
// 這個條件正常邏輯 都會成立
if (directArena != null) {
//這是咱們的核心入口 ******
// 引數1: cache 當前執行緒相關的PoolThreadCache物件
// 引數2:initialCapactiy 業務層需要的記憶體容量
// 引數3:maxCapacity 最大記憶體大小
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
// 一般不會走到這裡,不用看
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
三、總結
【PooledByteBufAllocator
】 類主要是
-
設定些預設變數值。 pageSize , chunkSize等
-
併為每個申請記憶體的執行緒 建立一份 【
PoolThreadCache
】快取。 -
根據當前平臺的CPU數量,設定 【
PoolArena
】 ,而最終申請記憶體的工作還是交給了 【PoolArena
】。