
DPDK VM front-end/back-end connection establishment (DPDK side)

Tags: dpdk

Overview

For a KVM VM running in OVS+DPDK mode, OVS creates a vhostuser-type port. The vhostuser port establishes the front-end/back-end communication channel with QEMU over a unix socket. A vhostuser port can act either as the server or as the client end of that socket; in client mode the vhostuser port actively connects to QEMU, so when OVS restarts, the connection to the VM is re-established automatically. This article walks through how the unix socket connection between the DPDK vhostuser port and QEMU is set up on the DPDK side.
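Before diving into the DPDK internals, here is a minimal sketch of how an application (OVS-DPDK, in essence) drives this API to bring up a client-mode vhostuser port. The socket path and error handling are illustrative only; the two calls are the rte_vhost_driver_register/rte_vhost_driver_start entry points analyzed below.

#include <rte_vhost.h>

/* Hypothetical socket path; OVS derives it from the port's
 * vhost-server-path option. */
#define VHU_PATH "/tmp/vhost-user-1.sock"

static int
setup_vhost_client_port(void)
{
	/* Register the unix socket in client mode: DPDK will connect()
	 * to the socket QEMU listens on and reconnect automatically
	 * after an OVS restart. */
	if (rte_vhost_driver_register(VHU_PATH, RTE_VHOST_USER_CLIENT) < 0)
		return -1;

	/* Start the driver: spawns the vhost-events poll thread (if not
	 * already running) and initiates the connection. */
	if (rte_vhost_driver_start(VHU_PATH) < 0)
		return -1;

	return 0;
}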

rte_vhost_driver_register

When OVS adds a vhostuser port, it enters this register flow. The register flow first builds the socket state from the unix socket file path passed down, and then stores it in the library's local bookkeeping (the vhost_user.vsockets array).
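For orientation, these are the vsocket fields that the register path below touches. This is only an abridged sketch; the full definition lives in lib/librte_vhost/socket.c and differs across DPDK versions.

struct vhost_user_socket {
	struct vhost_user_connection_list conn_list; /* established connections */
	pthread_mutex_t conn_mutex;
	char *path;                 /* unix socket file path */
	int socket_fd;
	struct sockaddr_un un;      /* address used by connect()/bind() */
	bool is_server;             /* server vs. client mode */
	bool reconnect;             /* client mode: retry failed connects */
	bool dequeue_zero_copy;
	bool use_builtin_virtio_net;
	uint64_t supported_features;
	uint64_t features;
	uint64_t protocol_features;
	struct vhost_device_ops const *notify_ops; /* new/destroy device callbacks */
};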

int
rte_vhost_driver_register(const char *path, uint64_t flags)
{
	int ret = -1;
	struct vhost_user_socket *vsocket;

	if (!path)
		return -1;

	pthread_mutex_lock(&vhost_user.mutex);

	if (vhost_user.vsocket_cnt == MAX_VHOST_SOCKET) {
		RTE_LOG(ERR, VHOST_CONFIG,
			"error: the number of vhost sockets reaches maximum\n");
		goto out;
	}

	vsocket = malloc(sizeof(struct vhost_user_socket));
	if (!vsocket)
		goto out;
	memset(vsocket, 0, sizeof(struct vhost_user_socket));
	//copy the unix socket path into vsocket
	vsocket->path = strdup(path);
	if (vsocket->path == NULL) {
		RTE_LOG(ERR, VHOST_CONFIG,
			"error: failed to copy socket path string\n");
		vhost_user_socket_mem_free(vsocket);
		goto out;
	}
	TAILQ_INIT(&vsocket->conn_list);
	ret = pthread_mutex_init(&vsocket->conn_mutex, NULL);
	if (ret) {
		RTE_LOG(ERR, VHOST_CONFIG,
			"error: failed to init connection mutex\n");
		goto out_free;
	}
	vsocket->dequeue_zero_copy = flags & RTE_VHOST_USER_DEQUEUE_ZERO_COPY;

	/*
	 * Set the supported features correctly for the builtin vhost-user
	 * net driver.
	 *
	 * Applications know nothing about features the builtin virtio net
	 * driver (virtio_net.c) supports, thus it's not possible for them
	 * to invoke rte_vhost_driver_set_features(). To workaround it, here
	 * we set it unconditionally. If the application want to implement
	 * another vhost-user driver (say SCSI), it should call the
	 * rte_vhost_driver_set_features(), which will overwrite following
	 * two values.
	 */
	vsocket->use_builtin_virtio_net = true;
	vsocket->supported_features = VIRTIO_NET_SUPPORTED_FEATURES;
	vsocket->features           = VIRTIO_NET_SUPPORTED_FEATURES;
	vsocket->protocol_features  = VHOST_USER_PROTOCOL_FEATURES;

	/*
	 * Dequeue zero copy can't assure descriptors returned in order.
	 * Also, it requires that the guest memory is populated, which is
	 * not compatible with postcopy.
	 */
	if (vsocket->dequeue_zero_copy) {
		vsocket->supported_features &= ~(1ULL << VIRTIO_F_IN_ORDER);
		vsocket->features &= ~(1ULL << VIRTIO_F_IN_ORDER);

		RTE_LOG(INFO, VHOST_CONFIG,
			"Dequeue zero copy requested, disabling postcopy support\n");
		vsocket->protocol_features &=
			~(1ULL << VHOST_USER_PROTOCOL_F_PAGEFAULT);
	}

	if (!(flags & RTE_VHOST_USER_IOMMU_SUPPORT)) {
		vsocket->supported_features &= ~(1ULL << VIRTIO_F_IOMMU_PLATFORM);
		vsocket->features &= ~(1ULL << VIRTIO_F_IOMMU_PLATFORM);
	}

	if (!(flags & RTE_VHOST_USER_POSTCOPY_SUPPORT)) {
		vsocket->protocol_features &=
			~(1ULL << VHOST_USER_PROTOCOL_F_PAGEFAULT);
	} else {
#ifndef RTE_LIBRTE_VHOST_POSTCOPY
		RTE_LOG(ERR, VHOST_CONFIG,
			"Postcopy requested but not compiled\n");
		ret = -1;
		goto out_mutex;
#endif
	}

	if ((flags & RTE_VHOST_USER_CLIENT) != 0) {
		vsocket->reconnect = !(flags & RTE_VHOST_USER_NO_RECONNECT);
		if (vsocket->reconnect && reconn_tid == 0) {
			if (vhost_user_reconnect_init() != 0)
				goto out_mutex;
		}
	} else {
		vsocket->is_server = true;
	}
	//create the unix socket
	ret = create_unix_socket(vsocket);
	if (ret < 0) {
		goto out_mutex;
	}

	//store vsocket in the unix socket array
	vhost_user.vsockets[vhost_user.vsocket_cnt++] = vsocket;

	pthread_mutex_unlock(&vhost_user.mutex);
	return ret;

out_mutex:
	if (pthread_mutex_destroy(&vsocket->conn_mutex)) {
		RTE_LOG(ERR, VHOST_CONFIG,
			"error: failed to destroy connection mutex\n");
	}
out_free:
	vhost_user_socket_mem_free(vsocket);
out:
	pthread_mutex_unlock(&vhost_user.mutex);

	return ret;
}

rte_vhost_driver_start

When the vhostuser port is started it enters the start flow. The start flow first looks up, by the unix socket path, the socket state created during the register stage, and then creates the vhost-events poll thread. That thread's handler, fdset_event_dispatch, polls the socket fds registered in vhost_user.fdset for read and write events.

int
rte_vhost_driver_start(const char *path)
{
	struct vhost_user_socket *vsocket;
	static pthread_t fdset_tid;

	pthread_mutex_lock(&vhost_user.mutex);
	//look up the vsocket that matches this vhu socket path
	vsocket = find_vhost_user_socket(path);
	pthread_mutex_unlock(&vhost_user.mutex);

	if (!vsocket)
		return -1;

	if (fdset_tid == 0) {
		/**
		 * create a pipe which will be waited by poll and notified to
		 * rebuild the wait list of poll.
		 */
		if (fdset_pipe_init(&vhost_user.fdset) < 0) {
			RTE_LOG(ERR, VHOST_CONFIG,
				"failed to create pipe for vhost fdset\n");
			return -1;
		}

		//create the vhost-events thread, which handles read/write on the unix sockets
		//its handler fdset_event_dispatch polls all registered socket fds (pfdset->rwfds)
		//when an event fires, it runs the fd's read/write callback (vhost_user_read_cb for reads; currently only a read callback appears to be registered)
		int ret = rte_ctrl_thread_create(&fdset_tid,
			"vhost-events", NULL, fdset_event_dispatch,
			&vhost_user.fdset);
		if (ret != 0) {
			RTE_LOG(ERR, VHOST_CONFIG,
				"failed to create fdset handling thread");

			fdset_pipe_uninit(&vhost_user.fdset);
			return -1;
		}
	}

	//start vhost in server or client mode
	if (vsocket->is_server)
		return vhost_user_start_server(vsocket);
	else
		return vhost_user_start_client(vsocket);
}
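The vhost_user.fdset polled by that thread is a small table of fds and their callbacks, plus a self-pipe used to wake poll() up whenever the table changes (for example when fdset_add registers a new connection). An abridged sketch of the structures and of the wake-up helper, following lib/librte_vhost/fd_man.h/.c; the exact layout varies across DPDK versions.

typedef void (*fd_cb)(int fd, void *dat, int *remove);

struct fdentry {
	int fd;		/* -1 marks a free slot */
	fd_cb rcb;	/* callback when fd is readable */
	fd_cb wcb;	/* callback when fd is writable */
	void *dat;	/* callback context (the vhost_user_connection) */
	int busy;	/* set while a callback is running on this entry */
};

struct fdset {
	struct pollfd rwfds[MAX_FDS];	/* array handed to poll() */
	struct fdentry fd[MAX_FDS];	/* parallel callback table */
	pthread_mutex_t fd_mutex;
	int num;			/* number of fds currently tracked */
	union pipefds {
		struct {
			int pipefd[2];
		};
		struct {
			int readfd;	/* registered in the fdset itself */
			int writefd;	/* poked after the table changes */
		};
	} u;
};

/* Kick the poll loop so it rebuilds its wait list without waiting for
 * the 1 second poll timeout. */
static void
fdset_pipe_notify(struct fdset *fdset)
{
	int r = write(fdset->u.writefd, "1", 1);

	RTE_SET_USED(r);
}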

vhost_user_start_client

When the vhostuser port is in client mode, it enters the start_client flow. vhost_user_start_client first establishes the unix socket connection with the connect system call. Once the connect succeeds, vhost_user_add_connection registers the vhostuser unix socket fd into vhost_user.fdset (polled by the vhost-events thread) and assigns vhost_user_read_cb as the fd's read event handler; from then on fdset_event_dispatch polls the unix socket for read and write events.

static int
vhost_user_start_client(struct vhost_user_socket *vsocket)
{
	int ret;
	int fd = vsocket->socket_fd;
	const char *path = vsocket->path;
	struct vhost_user_reconnect *reconn;

	//connect the unix socket
	ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&vsocket->un,
					  sizeof(vsocket->un));
	if (ret == 0) {
		//add the fd to vhost_user.fdset with vhost_user_read_cb as its read handler,
		//so the vhost-events thread will poll it
		vhost_user_add_connection(fd, vsocket);
		return 0;
	}

	RTE_LOG(WARNING, VHOST_CONFIG,
		"failed to connect to %s: %s\n",
		path, strerror(errno));

	if (ret == -2 || !vsocket->reconnect) {
		close(fd);
		return -1;
	}

	RTE_LOG(INFO, VHOST_CONFIG, "%s: reconnecting...\n", path);
	reconn = malloc(sizeof(*reconn));
	if (reconn == NULL) {
		RTE_LOG(ERR, VHOST_CONFIG,
			"failed to allocate memory for reconnect\n");
		close(fd);
		return -1;
	}
	reconn->un = vsocket->un;
	reconn->fd = fd;
	reconn->vsocket = vsocket;
	pthread_mutex_lock(&reconn_list.mutex);
	TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next);
	pthread_mutex_unlock(&reconn_list.mutex);

	return 0;
}
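The -1/-2 return values checked above come from vhost_user_connect_nonblock: -1 means the connect itself failed (worth retrying via the reconnect list), while -2 means the socket connected but could not be switched back to blocking mode, so the fd is simply closed. A sketch of the helper, following the DPDK socket.c of this era; details may differ by version.

static int
vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
{
	int ret, flags;

	ret = connect(fd, un, sz);
	if (ret < 0 && errno != EISCONN)
		return -1;	/* connect failed: caller may queue a reconnect */

	/* connected: clear the O_NONBLOCK set when the socket was created */
	flags = fcntl(fd, F_GETFL, 0);
	if (flags < 0) {
		RTE_LOG(ERR, VHOST_CONFIG,
			"can't get flags for connfd %d\n", fd);
		return -2;
	}
	if ((flags & O_NONBLOCK) && fcntl(fd, F_SETFL, flags & ~O_NONBLOCK)) {
		RTE_LOG(ERR, VHOST_CONFIG,
			"can't disable nonblocking on fd %d\n", fd);
		return -2;	/* unrecoverable: caller closes the fd */
	}
	return 0;
}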

static void
vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
{
	...
	conn->connfd = fd;
	conn->vsocket = vsocket;
	conn->vid = vid;
	//record the socket fd together with its read handler
	ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb,
			NULL, conn);
	if (ret < 0) {
		RTE_LOG(ERR, VHOST_CONFIG,
			"failed to add fd %d into vhost server fdset\n",
			fd);

		if (vsocket->notify_ops->destroy_connection)
			vsocket->notify_ops->destroy_connection(conn->vid);

		goto err;
	}

	...
}

fdset_event_dispatch

When the QEMU side writes to the unix socket, the poll thread catches the read event and invokes the read handler vhost_user_read_cb, which eventually enters vhost_user_msg_handler and dispatches on the vhost-user message type.
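The message types it ends up dispatching on are the standard vhost-user protocol requests; an abridged excerpt of the VhostUserRequest enum for reference (see lib/librte_vhost/vhost_user.h for the full list).

typedef enum VhostUserRequest {
	VHOST_USER_NONE = 0,
	VHOST_USER_GET_FEATURES = 1,
	VHOST_USER_SET_FEATURES = 2,
	VHOST_USER_SET_OWNER = 3,
	VHOST_USER_SET_MEM_TABLE = 5,
	VHOST_USER_SET_VRING_NUM = 8,
	VHOST_USER_SET_VRING_ADDR = 9,
	VHOST_USER_SET_VRING_BASE = 10,
	VHOST_USER_GET_VRING_BASE = 11,
	VHOST_USER_SET_VRING_KICK = 12,
	VHOST_USER_SET_VRING_CALL = 13,
	VHOST_USER_GET_PROTOCOL_FEATURES = 15,
	VHOST_USER_SET_PROTOCOL_FEATURES = 16,
	/* ... */
} VhostUserRequest;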

void *
fdset_event_dispatch(void *arg)
{
	int i;
	struct pollfd *pfd;
	struct fdentry *pfdentry;
	fd_cb rcb, wcb;
	void *dat;
	int fd, numfds;
	int remove1, remove2;
	int need_shrink;
	struct fdset *pfdset = arg;
	int val;

	if (pfdset == NULL)
		return NULL;

	while (1) {

		/*
		 * When poll is blocked, other threads might unregister
		 * listenfds from and register new listenfds into fdset.
		 * When poll returns, the entries for listenfds in the fdset
		 * might have been updated. It is ok if there is unwanted call
		 * for new listenfds.
		 */
		pthread_mutex_lock(&pfdset->fd_mutex);
		numfds = pfdset->num;
		pthread_mutex_unlock(&pfdset->fd_mutex);

		val = poll(pfdset->rwfds, numfds, 1000 /* millisecs */);
		if (val < 0)
			continue;

		need_shrink = 0;
		for (i = 0; i < numfds; i++) {
			pthread_mutex_lock(&pfdset->fd_mutex);

			pfdentry = &pfdset->fd[i];
			fd = pfdentry->fd;
			pfd = &pfdset->rwfds[i];
 
			if (fd < 0) {
				need_shrink = 1;
				pthread_mutex_unlock(&pfdset->fd_mutex);
				continue;
			}

			if (!pfd->revents) {
				pthread_mutex_unlock(&pfdset->fd_mutex);
				continue;
			}

			remove1 = remove2 = 0;

			rcb = pfdentry->rcb;
			wcb = pfdentry->wcb;
			dat = pfdentry->dat;
			pfdentry->busy = 1;

			pthread_mutex_unlock(&pfdset->fd_mutex);

			if (rcb && pfd->revents & (POLLIN | FDPOLLERR))
				rcb(fd, dat, &remove1);
			if (wcb && pfd->revents & (POLLOUT | FDPOLLERR))
				wcb(fd, dat, &remove2);
			pfdentry->busy = 0;
			/*
			 * fdset_del needs to check busy flag.
			 * We don't allow fdset_del to be called in callback
			 * directly.
			 */
			/*
			 * When we are to clean up the fd from fdset,
			 * because the fd is closed in the cb,
			 * the old fd val could be reused by when creates new
			 * listen fd in another thread, we couldn't call
			 * fdset_del.
			 */
			if (remove1 || remove2) {
				pfdentry->fd = -1;
				need_shrink = 1;
			}
		}

		if (need_shrink)
			fdset_shrink(pfdset);
	}

	return NULL;
}
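For a vhostuser connection fd, the rcb above resolves to vhost_user_read_cb, which feeds the message into vhost_user_msg_handler and, if the peer has gone away, tears the connection down and (in client mode) re-queues it for reconnect. A simplified sketch following the DPDK source:

static void
vhost_user_read_cb(int connfd, void *dat, int *remove)
{
	struct vhost_user_connection *conn = dat;
	struct vhost_user_socket *vsocket = conn->vsocket;
	int ret;

	/* parse and handle one vhost-user message from QEMU */
	ret = vhost_user_msg_handler(conn->vid, connfd);
	if (ret < 0) {
		/* connection broken: drop it from the fdset and destroy the device */
		close(connfd);
		*remove = 1;
		vhost_destroy_device(conn->vid);

		if (vsocket->notify_ops->destroy_connection)
			vsocket->notify_ops->destroy_connection(conn->vid);

		pthread_mutex_lock(&vsocket->conn_mutex);
		TAILQ_REMOVE(&vsocket->conn_list, conn, next);
		pthread_mutex_unlock(&vsocket->conn_mutex);

		free(conn);

		/* client mode: create a fresh socket and try to reconnect */
		if (vsocket->reconnect) {
			create_unix_socket(vsocket);
			vhost_user_start_client(vsocket);
		}
	}
}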

vhost_user_client_reconnect

At vhostuser initialization a vhost_reconn thread is started, whose job is to retry connections whose connect failed. When the connect call in vhost_user_start_client fails, a reconnect entry is created and appended to reconn_list:

static int
vhost_user_start_client(struct vhost_user_socket *vsocket)
{
	...
	RTE_LOG(INFO, VHOST_CONFIG, "%s: reconnecting...\n", path);
	reconn = malloc(sizeof(*reconn));
	if (reconn == NULL) {
		RTE_LOG(ERR, VHOST_CONFIG,
			"failed to allocate memory for reconnect\n");
		close(fd);
		return -1;
	}
	reconn->un = vsocket->un;
	reconn->fd = fd;
	reconn->vsocket = vsocket;
	pthread_mutex_lock(&reconn_list.mutex);
	TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next);
	pthread_mutex_unlock(&reconn_list.mutex);

	return 0;
}
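The vhost_reconn thread referred to above is created once, from rte_vhost_driver_register, the first time a client-mode socket with reconnect enabled is registered. A sketch of vhost_user_reconnect_init, following the DPDK source:

static int
vhost_user_reconnect_init(void)
{
	int ret;

	ret = pthread_mutex_init(&reconn_list.mutex, NULL);
	if (ret < 0) {
		RTE_LOG(ERR, VHOST_CONFIG, "failed to initialize mutex");
		return ret;
	}
	TAILQ_INIT(&reconn_list.head);

	/* the reconnect thread scans reconn_list once per second */
	ret = rte_ctrl_thread_create(&reconn_tid, "vhost_reconn", NULL,
			vhost_user_client_reconnect, NULL);
	if (ret != 0) {
		RTE_LOG(ERR, VHOST_CONFIG, "failed to create reconnect thread");
		if (pthread_mutex_destroy(&reconn_list.mutex)) {
			RTE_LOG(ERR, VHOST_CONFIG,
				"failed to destroy reconnect mutex");
		}
	}

	return ret;
}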

The vhost_reconn thread keeps scanning reconn_list; whenever an entry needs reconnecting it retries the nonblocking connect and, once it succeeds, calls vhost_user_add_connection to register the new connection:

static void *
vhost_user_client_reconnect(void *arg __rte_unused)
{
	int ret;
	struct vhost_user_reconnect *reconn, *next;

	while (1) {
		pthread_mutex_lock(&reconn_list.mutex);

		/*
		 * An equal implementation of TAILQ_FOREACH_SAFE,
		 * which does not exist on all platforms.
		 */
		for (reconn = TAILQ_FIRST(&reconn_list.head);
		     reconn != NULL; reconn = next) {
			next = TAILQ_NEXT(reconn, next);

			ret = vhost_user_connect_nonblock(reconn->fd,
						(struct sockaddr *)&reconn->un,
						sizeof(reconn->un));
			if (ret == -2) {
				close(reconn->fd);
				RTE_LOG(ERR, VHOST_CONFIG,
					"reconnection for fd %d failed\n",
					reconn->fd);
				goto remove_fd;
			}
			if (ret == -1)
				continue;

			RTE_LOG(INFO, VHOST_CONFIG,
				"%s: connected\n", reconn->vsocket->path);
			vhost_user_add_connection(reconn->fd, reconn->vsocket);
remove_fd:
			TAILQ_REMOVE(&reconn_list.head, reconn, next);
			free(reconn);
		}

		pthread_mutex_unlock(&reconn_list.mutex);
		sleep(1);
	}

	return NULL;
}