1. 程式人生 > >DPDK-記憶體管理分析一

DPDK-記憶體管理分析一

前言

《DPDK-大頁記憶體使用分析》中粗略分析了DPDK獲取hugepage配置和記憶體對映的流程,並提到儲存了相關資訊在全域性的memseg陣列中。到此為止,DPDK相關程序就可以像使用普通記憶體一樣使用這些hugepage。但是具體如何使用,還需要進一步的分析DPDK的記憶體管理結構。

另:落筆之前,並未仔細研讀過DPDK Programming Guide中關於記憶體管理的描述。曾粗粗瀏覽過,印象並不深刻,此文全憑個人對DPDK原始碼的理解及猜想寫成,箇中缺漏及謬誤望指正。

DPDK Version: 17.11.2

Date: 2018-06-18, Created by HRG

正文

首先,猜測DPDK的記憶體管理基礎模組就是memzone模組。

在eal.c rte_eal_init()中呼叫了memzone的初始化函式rte_eal_memzone_init() 。

	if (rte_eal_memzone_init() < 0) {
		rte_eal_init_alert("Cannot init memzone\n");
		rte_errno = ENODEV;
		return -1;
	}

rte_eal_memzone_init()前把memzone clear掉了,最後返回rte_eal_malloc_heap_init()的呼叫結果。

int
rte_eal_memzone_init(void)
{
	rte_rwlock_write_lock(&mcfg->mlock);
	/* delete all zones */
	mcfg->memzone_cnt = 0;
	memset(mcfg->memzone, 0, sizeof(mcfg->memzone));
	rte_rwlock_write_unlock(&mcfg->mlock);

	return rte_eal_malloc_heap_init();
}

我們首先著重看下這個函式最後呼叫的rte_eal_malloc_heap_init(),這裡針對已經整理排序好的所有memseg迴圈呼叫malloc_heap_add_memseg,將memseg新增到全域性的malloc_heaps陣列中去。對於NUMA系統來說,有多少個NUMA節點這個malloc_heaps陣列就有多少個成員,每個成員都管理著對應NUMA節點的memseg。

int
rte_eal_malloc_heap_init(void)
{
for (ms = &mcfg->memseg[0], ms_cnt = 0;
(ms_cnt < RTE_MAX_MEMSEG) && (ms->len > 0);
ms_cnt++, ms++) {
malloc_heap_add_memseg(&mcfg->malloc_heaps[ms->socket_id], ms);
}
	return 0;
};

我們來看看這個malloc_heaps結構體細節,其中free_head是一個有13個成員的陣列,此處使用了Linux下的系統庫sys/queue.h進行定義,heap中的free_head對應每一個queue的起點,元素的結構體是malloc_elem。即malloc_heaps下總共有RTE_HEAP_NUM_FREELISTS=13個成員是malloc_elem的佇列。

struct malloc_heap {
	rte_spinlock_t lock;
	LIST_HEAD(, malloc_elem) free_head[RTE_HEAP_NUM_FREELISTS];
	unsigned alloc_count;
	size_t total_size;
} __rte_cache_aligned;

對於heap來說,上面每次呼叫malloc_heap_add_memseg時,就會將每個memseg整理成malloc_elem的形式並放到合適的佇列中。每個佇列對應於一定範圍大小的memseg,比如下面這樣的佈局:

 * Example element size ranges for a heap with five free lists:
 *   heap->free_head[0] - (0   , 2^8]
 *   heap->free_head[1] - (2^8 , 2^10]
 *   heap->free_head[2] - (2^10 ,2^12]
 *   heap->free_head[3] - (2^12, 2^14]
 *   heap->free_head[4] - (2^14, MAX_SIZE]

接下來看malloc_heap_add_memseg()怎麼將memseg整理成malloc_elem。首先取memseg指向的大頁共享記憶體的起始地址和cache line對齊後的結束地址,計算對齊後的memseg長度。然後呼叫malloc_elem_init和malloc_elem_mkend進行初始化,在memseg頭部和尾部填寫相關的指標資訊和cookie,至此,這個memseg就可以用start_elem指標代替了,對應的這塊記憶體就歸屬於這個malloc_elem。最後呼叫malloc_elem_free_list_insert將這個elem插入到合適的heap下的13個佇列之一,具體哪個佇列就要看memseg的長度了。

static void
malloc_heap_add_memseg(struct malloc_heap *heap, struct rte_memseg *ms)
{
	/* allocate the memory block headers, one at end, one at start */
	struct malloc_elem *start_elem = (struct malloc_elem *)ms->addr;
	struct malloc_elem *end_elem = RTE_PTR_ADD(ms->addr,
			ms->len - MALLOC_ELEM_OVERHEAD);
	end_elem = RTE_PTR_ALIGN_FLOOR(end_elem, RTE_CACHE_LINE_SIZE);
	const size_t elem_size = (uintptr_t)end_elem - (uintptr_t)start_elem;

	malloc_elem_init(start_elem, heap, ms, elem_size);
	malloc_elem_mkend(end_elem, start_elem);
	malloc_elem_free_list_insert(start_elem);

	heap->total_size += elem_size;
}

我們看看malloc_elem結構體的內容就知道memseg頭部和尾部大概填寫的是什麼內容了。

struct malloc_elem {
	struct malloc_heap *heap;
	struct malloc_elem *volatile prev;      /* points to prev elem in memseg */
	LIST_ENTRY(malloc_elem) free_list;      /* list of free elements in heap */
	const struct rte_memseg *ms;
	volatile enum elem_state state;
	uint32_t pad;
	size_t size;
#ifdef RTE_MALLOC_DEBUG
	uint64_t header_cookie;         /* Cookie marking start of data */
	                                /* trailer cookie at start + size */
#endif
} __rte_cache_aligned;

通過以上分析,可以得到大概的DPDK記憶體管理結構了:

1、在NUMA系統中,每一個NUMA節點都有對應的heap,當然,非NUMA系統就只有一個heap了。(這個名字取得很好,程式在動態申請記憶體時一般是在程序記憶體結構中的heap中去取的,現在,DPDK自己去管理這些大頁記憶體以供後續程式動態申請,所以也叫做heap)。

2、一個heap對應13個佇列,每個佇列管理著一定大小範圍內的連續的記憶體塊。

3、猜想:程式動態申請記憶體時,將會根據申請的記憶體大小,到對應的heap下的記憶體佇列中尋找相應的空閒態記憶體塊返回給程式;程式free時,將記憶體塊整理好繼續放在佇列中。

為印證以上第3點猜想,繼續研究malloc_heap.c/.h檔案中的其他函式。

首先呼叫find_suitable_element()從heap中尋找合適的queue並返回合適的element。最後呼叫malloc_elem_alloc初始化相關的element。

void *
malloc_heap_alloc(struct malloc_heap *heap,
		const char *type __attribute__((unused)), size_t size, unsigned flags,
		size_t align, size_t bound)
{
	rte_spinlock_lock(&heap->lock);

	elem = find_suitable_element(heap, size, flags, align, bound);
	if (elem != NULL) {
		elem = malloc_elem_alloc(elem, size, align, bound);
		heap->alloc_count++;
	}
	rte_spinlock_unlock(&heap->lock);

	return elem == NULL ? NULL : (void *)(&elem[1]);
}

呼叫malloc_elem_free_list_index()確定要在heap的哪個queue中去取elem,這裡注意for迴圈,queue本身idx遞增就表明了queue所管理的記憶體塊長度範圍是遞增的。for迴圈一開始,先找個能容納使用者malloc需求的最小的queue,當這個queue記憶體用盡或者剩下的記憶體不足以滿足時,便往下一個更大的queue找去。

static struct malloc_elem *
find_suitable_element(struct malloc_heap *heap, size_t size,
		unsigned flags, size_t align, size_t bound)
{
	for (idx = malloc_elem_free_list_index(size);
			idx < RTE_HEAP_NUM_FREELISTS; idx++) {
		for (elem = LIST_FIRST(&heap->free_head[idx]);
				!!elem; elem = LIST_NEXT(elem, free_list)) {
			if (malloc_elem_can_hold(elem, size, align, bound)) {
				if (check_hugepage_sz(flags, elem->ms->hugepage_sz))
					return elem;
			}
		}
	}
}

找到能滿足size和align需求之後,還要怎樣操作elem?因為elem可能是一塊連續的很大的記憶體,遠遠超出使用者需求的size的,因此後面肯定還會繼續處理elem以便更高效地使用elem所代表的記憶體塊。繼續看malloc_elem_alloc這個函式針對這個疑問做了什麼處理。

首先,根據size和align在elem中取對齊後的new elem(這裡可能返回NULL,怎麼沒看到指標安全檢查?),然後計算頭部和尾部剩餘的長度,接著把old elem從heap的queue中remove掉。

	struct malloc_elem *new_elem = elem_start_pt(elem, size, align, bound);
	const size_t old_elem_size = (uintptr_t)new_elem - (uintptr_t)elem;
	const size_t trailer_size = elem->size - old_elem_size - size -
		MALLOC_ELEM_OVERHEAD;

	elem_free_list_remove(elem);

檢查尾部的長度是否能夠容得下一個elem(包括elem的頭部資訊和最小data長度),足夠大的話就將尾部空間分離出來,並插入到heap合適的queue中。

	if (trailer_size > MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) {
		/* split it, too much free space after elem */
		struct malloc_elem *new_free_elem =
				RTE_PTR_ADD(new_elem, size + MALLOC_ELEM_OVERHEAD);

		split_elem(elem, new_free_elem);
		malloc_elem_free_list_insert(new_free_elem);
	}

檢查頭部長度也是否能夠容納一個elem,如果太小的話就將old elem的狀態置為ELEM_BUSY,並更新new elem的資訊。

	if (old_elem_size < MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) {
		/* don't split it, pad the element instead */
		elem->state = ELEM_BUSY;
		elem->pad = old_elem_size;

		/* put a dummy header in padding, to point to real element header */
		if (elem->pad > 0) { /* pad will be at least 64-bytes, as everything
		                     * is cache-line aligned */
			new_elem->pad = elem->pad; //賦這個長度是為了後面找回原elem的起始地址嗎?
			new_elem->state = ELEM_PAD;
			new_elem->size = elem->size - elem->pad; //減去尾部後的old elem長度減去pad長度,剩下的就是新elem的長度了
			set_header(new_elem);
		}

		return new_elem;
	}

如果頭部的長度依然滿足一個elem的長度要求,那不能浪費記憶體,把頭部分離出來並插入到合適的heap的queue中去。至此,new elem已經準備好了,將狀態置為busy,返回。

	split_elem(elem, new_elem);
	new_elem->state = ELEM_BUSY;
	malloc_elem_free_list_insert(elem);

	return new_elem;