1. 程式人生 > >dpdk驅動收包

dpdk驅動收包

dpdk驅動根據inter網絡卡驅動進行修改簡化而來,減少記憶體拷貝,替換inter網絡卡的中斷模式取資料,採用輪詢模式。

記憶體初始化

首先來看記憶體的初始化:int ret = rte_eal_init(argc, argv);

int
rte_eal_init(int argc, char **argv)
{
	int i, fctret, ret;
	pthread_t thread_id;
	static rte_atomic32_t run_once = RTE_ATOMIC32_INIT(0);
	struct shared_driver *solib = NULL;
	const char *logid;
	char cpuset[RTE_CPU_AFFINITY_STR_LEN];

	if (!rte_atomic32_test_and_set(&run_once))
		return -1;

	logid = strrchr(argv[0], '/');
	logid = strdup(logid ? logid + 1: argv[0]);

	thread_id = pthread_self();

	if (rte_eal_log_early_init() < 0)
		rte_panic("Cannot init early logs\n");

	if (rte_eal_cpu_init() < 0)
		rte_panic("Cannot detect lcores\n");

	fctret = eal_parse_args(argc, argv);
	if (fctret < 0)
		exit(1);

	/* set log level as early as possible */
	rte_set_log_level(internal_config.log_level);

	if (internal_config.no_hugetlbfs == 0 &&
			internal_config.process_type != RTE_PROC_SECONDARY &&
			internal_config.xen_dom0_support == 0 &&
			eal_hugepage_info_init() < 0)//獲取huge的資訊,和選取一個有效的huge
		rte_panic("Cannot get hugepage information\n");

	if (internal_config.memory == 0 && internal_config.force_sockets == 0) {
		if (internal_config.no_hugetlbfs)
			internal_config.memory = MEMSIZE_IF_NO_HUGE_PAGE;
		else
			internal_config.memory = eal_get_hugepage_mem_size();
	}

	if (internal_config.vmware_tsc_map == 1) {
#ifdef RTE_LIBRTE_EAL_VMWARE_TSC_MAP_SUPPORT
		rte_cycles_vmware_tsc_map = 1;
		RTE_LOG (DEBUG, EAL, "Using VMWARE TSC MAP, "
				"you must have monitor_control.pseudo_perfctr = TRUE\n");
#else
		RTE_LOG (WARNING, EAL, "Ignoring --vmware-tsc-map because "
				"RTE_LIBRTE_EAL_VMWARE_TSC_MAP_SUPPORT is not set\n");
#endif
	}

	rte_srand(rte_rdtsc());

	rte_config_init();//分配記憶體mmap對映

	if (rte_eal_pci_init() < 0)
		rte_panic("Cannot init PCI\n");//初始化pci匯流排

#ifdef RTE_LIBRTE_IVSHMEM
	if (rte_eal_ivshmem_init() < 0)
		rte_panic("Cannot init IVSHMEM\n");
#endif

	if (rte_eal_memory_init() < 0)//根據大頁大小分配對映空間
		rte_panic("Cannot init memory\n");

	/* the directories are locked during eal_hugepage_info_init */
	eal_hugedirs_unlock();

	if (rte_eal_memzone_init() < 0)//管理記憶體
		rte_panic("Cannot init memzone\n");

	if (rte_eal_tailqs_init() < 0)
		rte_panic("Cannot init tail queues for objects\n");

#ifdef RTE_LIBRTE_IVSHMEM
	if (rte_eal_ivshmem_obj_init() < 0)
		rte_panic("Cannot init IVSHMEM objects\n");
#endif

	if (rte_eal_log_init(logid, internal_config.syslog_facility) < 0)
		rte_panic("Cannot init logs\n");

	if (rte_eal_alarm_init() < 0)
		rte_panic("Cannot init interrupt-handling thread\n");

	if (rte_eal_intr_init() < 0)//IO的多路複用(輪詢)
		rte_panic("Cannot init interrupt-handling thread\n");

	if (rte_eal_timer_init() < 0)
		rte_panic("Cannot init HPET or TSC timers\n");

	eal_check_mem_on_local_socket();

	rte_eal_mcfg_complete();

	TAILQ_FOREACH(solib, &solib_list, next) {
		RTE_LOG(INFO, EAL, "open shared lib %s\n", solib->name);
		solib->lib_handle = dlopen(solib->name, RTLD_NOW);
		if (solib->lib_handle == NULL)
			RTE_LOG(WARNING, EAL, "%s\n", dlerror());
	}

	eal_thread_init_master(rte_config.master_lcore);

	ret = eal_thread_dump_affinity(cpuset, RTE_CPU_AFFINITY_STR_LEN);

	RTE_LOG(DEBUG, EAL, "Master lcore %u is ready (tid=%x;cpuset=[%s%s])\n",
		rte_config.master_lcore, (int)thread_id, cpuset,
		ret == 0 ? "" : "...");

	if (rte_eal_dev_init() < 0)//這個地方應該是在初始化裝置資訊。
		rte_panic("Cannot init pmd devices\n");

	RTE_LCORE_FOREACH_SLAVE(i) {

		/*
		 * create communication pipes between master thread
		 * and children
		 */
		if (pipe(lcore_config[i].pipe_master2slave) < 0)
			rte_panic("Cannot create pipe\n");
		if (pipe(lcore_config[i].pipe_slave2master) < 0)
			rte_panic("Cannot create pipe\n");

		lcore_config[i].state = WAIT;

		/* create a thread for each lcore */
		ret = pthread_create(&lcore_config[i].thread_id, NULL,
				     eal_thread_loop, NULL);
		if (ret != 0)
			rte_panic("Cannot create thread\n");
	}

	/*
	 * Launch a dummy function on all slave lcores, so that master lcore
	 * knows they are all ready when this function returns.
	 */
	rte_eal_mp_remote_launch(sync_func, NULL, SKIP_MASTER);
	rte_eal_mp_wait_lcore();

	/* Probe & Initialize PCI devices */
	if (rte_eal_pci_probe())
		rte_panic("Cannot probe PCI\n");

	return fctret;
}

其中空間分配最主要還是rte_eal_memory_init()這個函式,根據選取的大頁配置資訊進行空間的初始化以及對映。

int
rte_eal_memory_init(void)
{
	RTE_LOG(INFO, EAL, "Setting up memory...\n");
	const int retval = rte_eal_process_type() == RTE_PROC_PRIMARY ?
			rte_eal_hugepage_init() ://主執行緒呼叫
			rte_eal_hugepage_attach();//從執行緒呼叫
	if (retval < 0)
		return -1;

	if (internal_config.no_shconf == 0 && rte_eal_memdevice_init() < 0)
		return -1;

	return 0;
}
static int
rte_eal_hugepage_init(void)
{
    。。。。。。。。
    tmp_hp = malloc(nr_hugepages * sizeof(struct hugepage_file));
	if (tmp_hp == NULL)
		goto fail;

	memset(tmp_hp, 0, nr_hugepages * sizeof(struct hugepage_file));

	hp_offset = 0; /* where we start the current page size entries */

	/* map all hugepages and sort them */
	for (i = 0; i < (int)internal_config.num_hugepage_sizes; i ++){
		struct hugepage_info *hpi;

		/*
		 * we don't yet mark hugepages as used at this stage, so
		 * we just map all hugepages available to the system
		 * all hugepages are still located on socket 0
		 */
		hpi = &internal_config.hugepage_info[i];

		if (hpi->num_pages[0] == 0)
			continue;

		/* map all hugepages available 對映第一次的連續空間*/
		if (map_all_hugepages(&tmp_hp[hp_offset], hpi, 1) < 0){
			RTE_LOG(DEBUG, EAL, "Failed to mmap %u MB hugepages\n",
					(unsigned)(hpi->hugepage_sz / 0x100000));
			goto fail;
		}

		/* find physical addresses and sockets for each hugepage */
		//查詢對應的實體地址
		if (find_physaddrs(&tmp_hp[hp_offset], hpi) < 0){
			RTE_LOG(DEBUG, EAL, "Failed to find phys addr for %u MB pages\n",
					(unsigned)(hpi->hugepage_sz / 0x100000));
			goto fail;
		}
        //查詢對應是socke,找到對映的大頁記憶體被放在哪個NUMA node上
		if (find_numasocket(&tmp_hp[hp_offset], hpi) < 0){
			RTE_LOG(DEBUG, EAL, "Failed to find NUMA socket for %u MB pages\n",
					(unsigned)(hpi->hugepage_sz / 0x100000));
			goto fail;
		}

		if (sort_by_physaddr(&tmp_hp[hp_offset], hpi) < 0)//排序
			goto fail;
//重新對映虛擬空間平且釋放掉第一次對映的空間
#ifdef RTE_EAL_SINGLE_FILE_SEGMENTS
		/* remap all hugepages into single file segments */
		new_pages_count[i] = remap_all_hugepages(&tmp_hp[hp_offset], hpi);
		if (new_pages_count[i] < 0){
			RTE_LOG(DEBUG, EAL, "Failed to remap %u MB pages\n",
					(unsigned)(hpi->hugepage_sz / 0x100000));
			goto fail;
		}

		/* we have processed a num of hugepages of this size, so inc offset */
		hp_offset += new_pages_count[i];
#else
		/* remap all hugepages */
		if (map_all_hugepages(&tmp_hp[hp_offset], hpi, 0) < 0){
			RTE_LOG(DEBUG, EAL, "Failed to remap %u MB pages\n",
					(unsigned)(hpi->hugepage_sz / 0x100000));
			goto fail;
		}

		/* unmap original mappings */
		if (unmap_all_hugepages_orig(&tmp_hp[hp_offset], hpi) < 0)
			goto fail;

		/* we have processed a num of hugepages of this size, so inc offset */
		hp_offset += hpi->num_pages[0];
#endif
	}
    。。。。。。。。。
    /* create shared memory 建立共享記憶體管理映射出來的虛擬空間,建立一塊共享記憶體儲存虛擬/實體地址資訊,以便後續primary程序或者secondary程序使用*/
	hugepage = create_shared_memory(eal_hugepage_info_path(),
			nr_hugefiles * sizeof(struct hugepage_file));

	if (hugepage == NULL) {
		RTE_LOG(ERR, EAL, "Failed to create shared memory!\n");
		goto fail;
	}
	memset(hugepage, 0, nr_hugefiles * sizeof(struct hugepage_file));

	/*取消一些多餘的對映
	 * unmap pages that we won't need (looks at used_hp).
	 * also, sets final_va to NULL on pages that were unmapped.
	 */
	if (unmap_unneeded_hugepages(tmp_hp, used_hp,
			internal_config.num_hugepage_sizes) < 0) {
		RTE_LOG(ERR, EAL, "Unmapping and locking hugepages failed!\n");
		goto fail;
	}

	/*將虛擬/實體地址資訊拷貝到共享記憶體
	 * copy stuff from malloc'd hugepage* to the actual shared memory.
	 * this procedure only copies those hugepages that have final_va
	 * not NULL. has overflow protection.
	 */
	if (copy_hugepages_to_shared_mem(hugepage, nr_hugefiles,
			tmp_hp, nr_hugefiles) < 0) {
		RTE_LOG(ERR, EAL, "Copying tables to shared memory failed!\n");
		goto fail;
	}

	/* free the temporary hugepage table */
	free(tmp_hp);
	tmp_hp = NULL;

	/* find earliest free memseg - this is needed because in case of IVSHMEM,
	 * segments might have already been initialized */
	for (j = 0; j < RTE_MAX_MEMSEG; j++)
		if (mcfg->memseg[j].addr == NULL) {
			/* move to previous segment and exit loop */
			j--;
			break;
		}
//整理不連續的記憶體片段,用陣列來儲存
	for (i = 0; i < nr_hugefiles; i++) {
		new_memseg = 0;

		/* if this is a new section, create a new memseg */
		if (i == 0)
			new_memseg = 1;
		else if (hugepage[i].socket_id != hugepage[i-1].socket_id)
			new_memseg = 1;
		else if (hugepage[i].size != hugepage[i-1].size)
			new_memseg = 1;

#ifdef RTE_ARCH_PPC_64
		/* On PPC64 architecture, the mmap always start from higher
		 * virtual address to lower address. Here, both the physical
		 * address and virtual address are in descending order */
		else if ((hugepage[i-1].physaddr - hugepage[i].physaddr) !=
		    hugepage[i].size)
			new_memseg = 1;
		else if (((unsigned long)hugepage[i-1].final_va -
		    (unsigned long)hugepage[i].final_va) != hugepage[i].size)
			new_memseg = 1;
#else
		else if ((hugepage[i].physaddr - hugepage[i-1].physaddr) !=
		    hugepage[i].size)
			new_memseg = 1;
		else if (((unsigned long)hugepage[i].final_va -
		    (unsigned long)hugepage[i-1].final_va) != hugepage[i].size)
			new_memseg = 1;
#endif

		if (new_memseg) {
			j += 1;
			if (j == RTE_MAX_MEMSEG)
				break;

			mcfg->memseg[j].phys_addr = hugepage[i].physaddr;
			mcfg->memseg[j].addr = hugepage[i].final_va;
#ifdef RTE_EAL_SINGLE_FILE_SEGMENTS
			mcfg->memseg[j].len = hugepage[i].size * hugepage[i].repeated;
#else
			mcfg->memseg[j].len = hugepage[i].size;
#endif
			mcfg->memseg[j].socket_id = hugepage[i].socket_id;
			mcfg->memseg[j].hugepage_sz = hugepage[i].size;
		}
		/* continuation of previous memseg */
		else {
#ifdef RTE_ARCH_PPC_64
		/* Use the phy and virt address of the last page as segment
		 * address for IBM Power architecture */
			mcfg->memseg[j].phys_addr = hugepage[i].physaddr;
			mcfg->memseg[j].addr = hugepage[i].final_va;
#endif
			mcfg->memseg[j].len += mcfg->memseg[j].hugepage_sz;
		}
		hugepage[i].memseg_id = j;
	}

	if (i < nr_hugefiles) {
		RTE_LOG(ERR, EAL, "Can only reserve %d pages "
			"from %d requested\n"
			"Current %s=%d is not enough\n"
			"Please either increase it or request less amount "
			"of memory.\n",
			i, nr_hugefiles, RTE_STR(CONFIG_RTE_MAX_MEMSEG),
			RTE_MAX_MEMSEG);
		return (-ENOMEM);
	}

	return 0;
}

空間分配主要還是這個函式,將分配出來的空間進行對映儘量使其虛擬空間連續,其中需要對映兩次,第一次映射出來的虛擬空間作為實體地址的關聯,第二次mmap會在phy addr連續的基礎上,儘量也保證virt addr也是連續的,同時,本次mmap,會盡量保證virt addr在使用者傳進來的baseaddr基礎上增長。然後釋放掉第一次mmap的空間。

這些空間將分配到不同的socekt上,通過find_numasocket函式可以找到對映的大頁記憶體被放在哪個NUMA node上。後面還會進行重新分配。這樣我們的資料就可以直接到使用者態了。

這些映射出來的空間都有一個對應的hugepage file結構體儲存對應的virt addr/phy addr等資訊,通過共享記憶體,將這些結構體進行儲存和共享,後面primary程序或者secondary程序就可以很方便的使用這些地址。

由於頁的數量很多(1024個2M),所以不可能全部的空間都是連續的所以使用全域性的陣列將這些空間連續起來。

參考:https://www.cnblogs.com/yhp-smarthome/p/6995292.html

https://www.cnblogs.com/jiayy/p/3429725.html

記憶體分配

這個時候有人就會問道:根據大頁記憶體分配記憶體到底幹了些什麼事?

問的好,我當初也在糾結大頁記憶體到底幹了些什麼,怎麼儲存資料的?

上面說了,分配記憶體之後就把這些記憶體分配到不同的socket上了嘛,接下來在記憶體管理的時候在進行其他操作。

我們先看rte_eal_memzone_init(void)函式:

先看malloc_elm結構體吧
struct malloc_elem {
    struct malloc_heap *heap;
    struct malloc_elem *volatile prev;      /* points to prev elem in memseg */
    LIST_ENTRY(malloc_elem) free_list;      /* list of free elements in heap */
    const struct rte_memseg *ms;
    volatile enum elem_state state;
    uint32_t pad;
    size_t size;
#ifdef RTE_LIBRTE_MALLOC_DEBUG
    uint64_t header_cookie;         /* Cookie marking start of data */
                                    /* trailer cookie at start + size */
#endif
} __rte_cache_aligned;


/*記憶體管理模組
 * Init the memzone subsystem
 */
int
rte_eal_memzone_init(void)
{
	struct rte_mem_config *mcfg;
	const struct rte_memseg *memseg;
	unsigned i = 0;

	/* get pointer to global configuration */
	mcfg = rte_eal_get_configuration()->mem_config;

	/* mirror the runtime memsegs from config */
	free_memseg = mcfg->free_memseg;

	/* secondary processes don't need to initialise anything */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY)
		return 0;

	memseg = rte_eal_get_physmem_layout();
	if (memseg == NULL) {
		RTE_LOG(ERR, EAL, "%s(): Cannot get physical layout\n", __func__);
		return -1;
	}

	rte_rwlock_write_lock(&mcfg->mlock);

	/* fill in uninitialized free_memsegs */
	for (i = 0; i < RTE_MAX_MEMSEG; i++) {
		if (memseg[i].addr == NULL)
			break;
		if (free_memseg[i].addr != NULL)
			continue;
		memcpy(&free_memseg[i], &memseg[i], sizeof(struct rte_memseg));
	}

	/* make all zones cache-aligned 快取對齊*/
	for (i = 0; i < RTE_MAX_MEMSEG; i++) {
		if (free_memseg[i].addr == NULL)
			break;
		if (memseg_sanitize(&free_memseg[i]) < 0) {
			RTE_LOG(ERR, EAL, "%s(): Sanity check failed\n", __func__);
			rte_rwlock_write_unlock(&mcfg->mlock);
			return -1;
		}
	}

	/* delete all zones */
	mcfg->memzone_idx = 0;
	memset(mcfg->memzone, 0, sizeof(mcfg->memzone));

	rte_rwlock_write_unlock(&mcfg->mlock);

	return 0;
}

這個函式主要就是把記憶體放到空閒連結串列中,等需要的時候,能夠分配出去。

接下來再需要記憶體的時候就進行分配:

記憶體存放的地方當然事記憶體池了,將所有的記憶體進行管理,雖然這些記憶體已經分配到具體的socket上去了。在建立記憶體池時,會建立一個ring來儲存分配的物件,同時,為了減少多核之間對同一個ring的訪問,每一個核都維護著一個cache,優先從cache中取。

記憶體分配有一系列的介面:大多定義在rte_malloc.c檔案中。我們重點挑兩個來看一下。

rte_malloc_socket()這個是一個基礎函式,可以在這個函式的基礎上進行封裝,主要引數是型別,大小,對齊,以及從哪個socket上分。一般來說,分配記憶體從當前執行緒執行的socket上分配,可以避免記憶體跨socket訪問,提高效能。

ret = malloc_heap_alloc(&mcfg->malloc_heaps[socket], type,
                size, 0, align == 0 ? 1 : align, 0);
if (ret != NULL || socket_arg != SOCKET_ID_ANY)
    return ret;

先在指定的socket上分配,如果不能成功,再去嘗試其他的socket。我們接著看函式malloc_heap_alloc():

void *
malloc_heap_alloc(struct malloc_heap *heap,
        const char *type __attribute__((unused)), size_t size, unsigned flags,
        size_t align, size_t bound)
{
    struct malloc_elem *elem;

    size = RTE_CACHE_LINE_ROUNDUP(size);
    align = RTE_CACHE_LINE_ROUNDUP(align);

    rte_spinlock_lock(&heap->lock);

    elem = find_suitable_element(heap, size, flags, align, bound);
    if (elem != NULL) {
        elem = malloc_elem_alloc(elem, size, align, bound);
        /* increase heap's count of allocated elements */
        heap->alloc_count++;
    }
    rte_spinlock_unlock(&heap->lock);

    return elem == NULL ? NULL : (void *)(&elem[1]);

先去空閒連結串列中找是否有滿足需求的記憶體塊,如果找到,就進行分配,否則返回失敗。進一步的,在函式malloc_elem_alloc()分配的的時候,如果存在的記憶體大於需要的記憶體時,會對記憶體進行切割,然後把用不完的重新掛在空閒連結串列上。就不細緻的程式碼分析了。

rte_memzone_reserve_aligned()這個函式的返回值型別是struct rte_memzone型的,這是和上一個分配介面的不同之處,同時注意分配時的flag的不同。分配出來的memzone可以直接使用名字索引到。這個函式最終也是會呼叫到malloc_heap_alloc(),就不多說了,可以看看程式碼。

分配佇列

看示例程式碼

/* Allocate and set up 4 RX queue per Ethernet port. */
	for (q = 0; q < rx_rings; q++) {
		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
		if (retval < 0)
			return retval;
	}

這個地方我需要分配4給佇列給網口。

ret = (*dev->dev_ops->rx_queue_setup)(dev, rx_queue_id, nb_rx_desc,
					      socket_id, rx_conf, mp);

設定佇列以及初始化。由於這個地方是封裝了的函式指標,所以適配dpdk上所有的相容的驅動。這個地方以ixgbe驅動為例:

int
ixgbe_dev_rx_queue_setup(struct rte_eth_dev *dev,
			 uint16_t queue_idx,
			 uint16_t nb_desc,
			 unsigned int socket_id,
			 const struct rte_eth_rxconf *rx_conf,
			 struct rte_mempool *mp)
{
	const struct rte_memzone *rz;
	struct ixgbe_rx_queue *rxq;
	struct ixgbe_hw     *hw;
	uint16_t len;

	PMD_INIT_FUNC_TRACE();
	hw = IXGBE_DEV_PRIVATE_TO_HW(dev->data->dev_private);

	/*
	 * Validate number of receive descriptors.
	 * It must not exceed hardware maximum, and must be multiple
	 * of IXGBE_ALIGN.
	 */
	if (((nb_desc * sizeof(union ixgbe_adv_rx_desc)) % IXGBE_ALIGN) != 0 ||
	    (nb_desc > IXGBE_MAX_RING_DESC) ||
	    (nb_desc < IXGBE_MIN_RING_DESC)) {
		return (-EINVAL);
	}

	/* Free memory prior to re-allocation if needed... */
	if (dev->data->rx_queues[queue_idx] != NULL) {
		ixgbe_rx_queue_release(dev->data->rx_queues[queue_idx]);
		dev->data->rx_queues[queue_idx] = NULL;
	}

	/* First allocate the rx queue data structure */
	rxq = rte_zmalloc_socket("ethdev RX queue", sizeof(struct ixgbe_rx_queue),
				 RTE_CACHE_LINE_SIZE, socket_id);
	if (rxq == NULL)
		return (-ENOMEM);
	rxq->mb_pool = mp;
	rxq->nb_rx_desc = nb_desc;
	rxq->rx_free_thresh = rx_conf->rx_free_thresh;
	rxq->queue_id = queue_idx;
	rxq->reg_idx = (uint16_t)((RTE_ETH_DEV_SRIOV(dev).active == 0) ?
		queue_idx : RTE_ETH_DEV_SRIOV(dev).def_pool_q_idx + queue_idx);
	rxq->port_id = dev->data->port_id;
	rxq->crc_len = (uint8_t) ((dev->data->dev_conf.rxmode.hw_strip_crc) ?
							0 : ETHER_CRC_LEN);
	rxq->drop_en = rx_conf->rx_drop_en;
	rxq->rx_deferred_start = rx_conf->rx_deferred_start;

	/*
	 * Allocate RX ring hardware descriptors. A memzone large enough to
	 * handle the maximum ring size is allocated in order to allow for
	 * resizing in later calls to the queue setup function.
	 */
	rz = ring_dma_zone_reserve(dev, "rx_ring", queue_idx,
				   RX_RING_SZ, socket_id);
	if (rz == NULL) {
		ixgbe_rx_queue_release(rxq);
		return (-ENOMEM);
	}

	/*
	 * Zero init all the descriptors in the ring.
	 */
	memset (rz->addr, 0, RX_RING_SZ);

	/*
	 * Modified to setup VFRDT for Virtual Function
	 */
	if (hw->mac.type == ixgbe_mac_82599_vf ||
	    hw->mac.type == ixgbe_mac_X540_vf ||
	    hw->mac.type == ixgbe_mac_X550_vf ||
	    hw->mac.type == ixgbe_mac_X550EM_x_vf) {
		rxq->rdt_reg_addr =
			IXGBE_PCI_REG_ADDR(hw, IXGBE_VFRDT(queue_idx));
		rxq->rdh_reg_addr =
			IXGBE_PCI_REG_ADDR(hw, IXGBE_VFRDH(queue_idx));
	}
	else {
		rxq->rdt_reg_addr =
			IXGBE_PCI_REG_ADDR(hw, IXGBE_RDT(rxq->reg_idx));
		rxq->rdh_reg_addr =
			IXGBE_PCI_REG_ADDR(hw, IXGBE_RDH(rxq->reg_idx));
	}
#ifndef RTE_LIBRTE_XEN_DOM0
	rxq->rx_ring_phys_addr = (uint64_t) rz->phys_addr;
#else
	rxq->rx_ring_phys_addr = rte_mem_phy2mch(rz->memseg_id, rz->phys_addr);
#endif
	rxq->rx_ring = (union ixgbe_adv_rx_desc *) rz->addr;

	/*
	 * Certain constraints must be met in order to use the bulk buffer
	 * allocation Rx burst function. If any of Rx queues doesn't meet them
	 * the feature should be disabled for the whole port.
	 */
	if (check_rx_burst_bulk_alloc_preconditions(rxq)) {
		PMD_INIT_LOG(DEBUG, "queue[%d] doesn't meet Rx Bulk Alloc "
				    "preconditions - canceling the feature for "
				    "the whole port[%d]",
			     rxq->queue_id, rxq->port_id);
		hw->rx_bulk_alloc_allowed = false;
	}

	/*
	 * Allocate software ring. Allow for space at the end of the
	 * S/W ring to make sure look-ahead logic in bulk alloc Rx burst
	 * function does not access an invalid memory region.
	 */
	len = nb_desc;
	if (hw->rx_bulk_alloc_allowed)
		len += RTE_PMD_IXGBE_RX_MAX_BURST;

	rxq->sw_ring = rte_zmalloc_socket("rxq->sw_ring",
					  sizeof(struct ixgbe_rx_entry) * len,
					  RTE_CACHE_LINE_SIZE, socket_id);
	if (rxq->sw_ring == NULL) {
		ixgbe_rx_queue_release(rxq);
		return (-ENOMEM);
	}
	PMD_INIT_LOG(DEBUG, "sw_ring=%p hw_ring=%p dma_addr=0x%"PRIx64,
		     rxq->sw_ring, rxq->rx_ring, rxq->rx_ring_phys_addr);

	if (!rte_is_power_of_2(nb_desc)) {
		PMD_INIT_LOG(DEBUG, "queue[%d] doesn't meet Vector Rx "
				    "preconditions - canceling the feature for "
				    "the whole port[%d]",
			     rxq->queue_id, rxq->port_id);
		hw->rx_vec_allowed = false;
	} else
		ixgbe_rxq_vec_setup(rxq);

	dev->data->rx_queues[queue_idx] = rxq;

	ixgbe_reset_rx_queue(hw, rxq);

	return 0;
}

接下來的都是重點咯:
<1>.分配佇列結構體,並填充結構

rxq = rte_zmalloc_socket("ethdev RX queue", sizeof(struct ixgbe_rx_queue),
                 RTE_CACHE_LINE_SIZE, socket_id);

填充結構體的所屬記憶體池,描述符個數,佇列號,佇列所屬介面號等成員。
<2>.分配描述符佇列的空間,按照最大的描述符個數進行分配

rz = rte_eth_dma_zone_reserve(dev, "rx_ring", queue_idx,
                      RX_RING_SZ, IXGBE_ALIGN, socket_id);

接著獲取描述符佇列的頭和尾暫存器的地址,在收發包後,軟體要對這個暫存器進行處理。

rxq->rdt_reg_addr =
            IXGBE_PCI_REG_ADDR(hw, IXGBE_RDT(rxq->reg_idx));
        rxq->rdh_reg_addr =
            IXGBE_PCI_REG_ADDR(hw, IXGBE_RDH(rxq->reg_idx));

設定佇列的接收描述符ring的實體地址和虛擬地址。

rxq->rx_ring_phys_addr = rte_mem_phy2mch(rz->memseg_id, rz->phys_addr);
    rxq->rx_ring = (union ixgbe_adv_rx_desc *) rz->addr;

<3>分配sw_ring,這個ring中儲存的物件是struct ixgbe_rx_entry,其實裡面就是資料包mbuf的指標。

rxq->sw_ring = rte_zmalloc_socket("rxq->sw_ring",
                      sizeof(struct ixgbe_rx_entry) * len,
                      RTE_CACHE_LINE_SIZE, socket_id);

以上三步做完以後,新分配的佇列結構體重要的部分就已經填充完了,下面需要重置一下其他成員

ixgbe_reset_rx_queue()

先把分配的描述符佇列清空,其實清空在分配的時候就已經做了,沒必要重複做

for (i = 0; i < len; i++) {
        rxq->rx_ring[i] = zeroed_desc;
    }

然後初始化佇列中一下其他成員

rxq->rx_nb_avail = 0;
rxq->rx_next_avail = 0;
rxq->rx_free_trigger = (uint16_t)(rxq->rx_free_thresh - 1);
rxq->rx_tail = 0;
rxq->nb_rx_hold = 0;
rxq->pkt_first_seg = NULL;
rxq->pkt_last_seg = NULL;

這樣,接收佇列就初始化完了。

 

收資料包

參考一下:https://www.cnblogs.com/yhp-smarthome/p/6705638.html

 

dpdk在初始化的時候就根據大頁的配置資訊初始化好了記憶體,這些記憶體片又被分配到不同的socket上去了,最後通過共享記憶體和記憶體池來進行管理。
在建立記憶體池的時候就將這記憶體片的資訊結構體放入到記憶體池的一個ring佇列中去了。
在多佇列的模式下,將記憶體池中的記憶體資訊進行多佇列的分配,將這些記憶體的資訊再次進行填充,如果當前執行緒下的socket上分配的記憶體足夠的化那麼當前佇列直接使用當前socket上的記憶體,如果不夠則取其他的socket上的記憶體使用,這樣也減少了socket之間的呼叫。
每個佇列中都有一個DMA暫存器,這些暫存器的工作就是將資料包拷貝到這些記憶體中去。這些地址通過各種轉換之後就可以直接讓DMA暫存器來使用,當DMA暫存器把這些資料放到這些記憶體之後CPU就可以直接取對應地址的資料了。