DPDK VM front-end/back-end connection setup (DPDK side)
Tags: dpdk
Overview
For a KVM VM running in OVS+DPDK mode, a vhost-user port is created on OVS. The vhost-user port talks to QEMU over a unix socket to establish the front-end/back-end communication channel. A vhost-user port can operate in either server or client mode; in client mode, the vhost-user side actively connects to QEMU, so when OVS restarts the VM's connection is re-established automatically. This article describes how the unix socket connection between the DPDK-side vhost-user port and QEMU is set up.
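For orientation, this is roughly how an application such as OVS drives the DPDK API described below; the function name, socket path and no-op callbacks here are illustrative only:

#include <rte_vhost.h>

/* Illustrative callbacks; a real application brings its port up/down here. */
static int new_device(int vid)      { (void)vid; return 0; }
static void destroy_device(int vid) { (void)vid; }

static const struct vhost_device_ops ops = {
    .new_device     = new_device,
    .destroy_device = destroy_device,
};

int add_vhost_client_port(void)
{
    const char *path = "/var/run/openvswitch/vhost-user-1"; /* example path */

    /* client mode: DPDK connects to the unix socket QEMU listens on */
    if (rte_vhost_driver_register(path, RTE_VHOST_USER_CLIENT) < 0)
        return -1;
    if (rte_vhost_driver_callback_register(path, &ops) < 0)
        return -1;
    /* first call spawns the vhost-events thread, then connects */
    return rte_vhost_driver_start(path);
}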
rte_vhost_driver_register
When OVS adds a vhost-user port it enters this register flow, which first builds the socket state from the unix socket file path passed down and then saves it into the local vhost_user socket array;
int
rte_vhost_driver_register(const char *path, uint64_t flags)
{
    int ret = -1;
    struct vhost_user_socket *vsocket;

    if (!path)
        return -1;

    pthread_mutex_lock(&vhost_user.mutex);

    if (vhost_user.vsocket_cnt == MAX_VHOST_SOCKET) {
        RTE_LOG(ERR, VHOST_CONFIG,
            "error: the number of vhost sockets reaches maximum\n");
        goto out;
    }

    vsocket = malloc(sizeof(struct vhost_user_socket));
    if (!vsocket)
        goto out;
    memset(vsocket, 0, sizeof(struct vhost_user_socket));
    /* store the unix socket path in the vsocket */
    vsocket->path = strdup(path);
    if (vsocket->path == NULL) {
        RTE_LOG(ERR, VHOST_CONFIG,
            "error: failed to copy socket path string\n");
        vhost_user_socket_mem_free(vsocket);
        goto out;
    }
    TAILQ_INIT(&vsocket->conn_list);
    ret = pthread_mutex_init(&vsocket->conn_mutex, NULL);
    if (ret) {
        RTE_LOG(ERR, VHOST_CONFIG,
            "error: failed to init connection mutex\n");
        goto out_free;
    }
    vsocket->dequeue_zero_copy = flags & RTE_VHOST_USER_DEQUEUE_ZERO_COPY;

    /*
     * Set the supported features correctly for the builtin vhost-user
     * net driver.
     *
     * Applications know nothing about features the builtin virtio net
     * driver (virtio_net.c) supports, thus it's not possible for them
     * to invoke rte_vhost_driver_set_features(). To workaround it, here
     * we set it unconditionally. If the application want to implement
     * another vhost-user driver (say SCSI), it should call the
     * rte_vhost_driver_set_features(), which will overwrite following
     * two values.
     */
    vsocket->use_builtin_virtio_net = true;
    vsocket->supported_features = VIRTIO_NET_SUPPORTED_FEATURES;
    vsocket->features = VIRTIO_NET_SUPPORTED_FEATURES;
    vsocket->protocol_features = VHOST_USER_PROTOCOL_FEATURES;

    /*
     * Dequeue zero copy can't assure descriptors returned in order.
     * Also, it requires that the guest memory is populated, which is
     * not compatible with postcopy.
     */
    if (vsocket->dequeue_zero_copy) {
        vsocket->supported_features &= ~(1ULL << VIRTIO_F_IN_ORDER);
        vsocket->features &= ~(1ULL << VIRTIO_F_IN_ORDER);

        RTE_LOG(INFO, VHOST_CONFIG,
            "Dequeue zero copy requested, disabling postcopy support\n");
        vsocket->protocol_features &=
            ~(1ULL << VHOST_USER_PROTOCOL_F_PAGEFAULT);
    }

    if (!(flags & RTE_VHOST_USER_IOMMU_SUPPORT)) {
        vsocket->supported_features &= ~(1ULL << VIRTIO_F_IOMMU_PLATFORM);
        vsocket->features &= ~(1ULL << VIRTIO_F_IOMMU_PLATFORM);
    }

    if (!(flags & RTE_VHOST_USER_POSTCOPY_SUPPORT)) {
        vsocket->protocol_features &=
            ~(1ULL << VHOST_USER_PROTOCOL_F_PAGEFAULT);
    } else {
#ifndef RTE_LIBRTE_VHOST_POSTCOPY
        RTE_LOG(ERR, VHOST_CONFIG,
            "Postcopy requested but not compiled\n");
        ret = -1;
        goto out_mutex;
#endif
    }

    if ((flags & RTE_VHOST_USER_CLIENT) != 0) {
        vsocket->reconnect = !(flags & RTE_VHOST_USER_NO_RECONNECT);
        if (vsocket->reconnect && reconn_tid == 0) {
            if (vhost_user_reconnect_init() != 0)
                goto out_mutex;
        }
    } else {
        vsocket->is_server = true;
    }
    /* create the unix socket */
    ret = create_unix_socket(vsocket);
    if (ret < 0) {
        goto out_mutex;
    }
    /* save it into the vhost_user socket array */
    vhost_user.vsockets[vhost_user.vsocket_cnt++] = vsocket;

    pthread_mutex_unlock(&vhost_user.mutex);
    return ret;

out_mutex:
    if (pthread_mutex_destroy(&vsocket->conn_mutex)) {
        RTE_LOG(ERR, VHOST_CONFIG,
            "error: failed to destroy connection mutex\n");
    }
out_free:
    vhost_user_socket_mem_free(vsocket);
out:
    pthread_mutex_unlock(&vhost_user.mutex);
    return ret;
}
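As the comment in the function above notes, the builtin virtio-net feature set is installed unconditionally; an application that implements a different vhost-user backend (SCSI, for example) is expected to overwrite it after registering. A minimal sketch of that, with a purely illustrative feature mask:

#include <stdint.h>
#include <linux/virtio_config.h>   /* VIRTIO_F_VERSION_1 */
#include <rte_vhost.h>

/*
 * Illustrative only: a non-net backend would advertise its own feature
 * bits instead of the builtin VIRTIO_NET_SUPPORTED_FEATURES.
 */
int override_features(const char *path)
{
    uint64_t my_features = 1ULL << VIRTIO_F_VERSION_1;

    /* replaces the supported_features/features chosen during register */
    return rte_vhost_driver_set_features(path, my_features);
}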
rte_vhost_driver_start
When the vhost-user port is started it enters the start flow. The start flow first looks up, by unix socket path, the socket state created during the register phase, and then creates the vhost-events poll thread; that thread's handler, fdset_event_dispatch, listens for read and write events on the socket fds recorded in vhost_user.fdset;
int
rte_vhost_driver_start(const char *path)
{
    struct vhost_user_socket *vsocket;
    static pthread_t fdset_tid;

    pthread_mutex_lock(&vhost_user.mutex);
    /* look up the vsocket registered for this unix socket path */
    vsocket = find_vhost_user_socket(path);
    pthread_mutex_unlock(&vhost_user.mutex);

    if (!vsocket)
        return -1;

    if (fdset_tid == 0) {
        /**
         * create a pipe which will be waited by poll and notified to
         * rebuild the wait list of poll.
         */
        if (fdset_pipe_init(&vhost_user.fdset) < 0) {
            RTE_LOG(ERR, VHOST_CONFIG,
                "failed to create pipe for vhost fdset\n");
            return -1;
        }

        /*
         * Create the vhost-events thread that handles unix socket reads
         * and writes: its entry fdset_event_dispatch() polls all of the
         * registered socket fds (pfdset->rwfds) and, when an fd becomes
         * ready, runs the read callback (vhost_user_read_cb) or the
         * write callback registered for it (it seems only read callbacks
         * are registered in the current code).
         */
        int ret = rte_ctrl_thread_create(&fdset_tid,
            "vhost-events", NULL, fdset_event_dispatch,
            &vhost_user.fdset);
        if (ret != 0) {
            RTE_LOG(ERR, VHOST_CONFIG,
                "failed to create fdset handling thread");
            fdset_pipe_uninit(&vhost_user.fdset);
            return -1;
        }
    }

    /* start the vhost-user socket in server or client mode */
    if (vsocket->is_server)
        return vhost_user_start_server(vsocket);
    else
        return vhost_user_start_client(vsocket);
}
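The pipe set up by fdset_pipe_init is the usual self-pipe trick: its read end sits in the poll() set, and whoever registers or removes fds writes one byte to the write end so the blocked poll() wakes up and rebuilds its wait list. A standalone sketch of the idea (illustrative only, not the actual fd_man.c code):

#include <poll.h>
#include <unistd.h>

static int wakeup_pipe[2];   /* [0] = read end polled, [1] = write end */

static int fdset_pipe_setup(void)
{
    return pipe(wakeup_pipe);
}

/* Called by the thread that adds/removes fds: kick the poller awake. */
static void fdset_kick(void)
{
    char c = 0;
    (void)write(wakeup_pipe[1], &c, 1);
}

/* Simplified poll loop: slot 0 is always the wakeup pipe. */
static void poll_loop(struct pollfd *fds, int nfds)
{
    fds[0].fd = wakeup_pipe[0];
    fds[0].events = POLLIN;

    for (;;) {
        if (poll(fds, nfds, -1) <= 0)
            continue;
        if (fds[0].revents & POLLIN) {
            char c;
            (void)read(wakeup_pipe[0], &c, 1);
            /* the fd list changed: the caller rebuilds fds[] here */
        }
        /* dispatch events on the remaining fds here */
    }
}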
vhost_user_start_client
When the vhost-user port acts as the client it enters the start_client flow. vhost_user_start_client first establishes the unix socket connection via the connect system call; once connect succeeds, vhost_user_add_connection registers the vhost-user unix socket fd into vhost_user.fdset (polled by the vhost-events thread) and assigns vhost_user_read_cb as the fd's read-event handler, after which fdset_event_dispatch starts polling the unix socket for read and write events;
static int
vhost_user_start_client(struct vhost_user_socket *vsocket)
{
    int ret;
    int fd = vsocket->socket_fd;
    const char *path = vsocket->path;
    struct vhost_user_reconnect *reconn;

    /* perform the connect on the unix socket */
    ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&vsocket->un,
            sizeof(vsocket->un));
    if (ret == 0) {
        /*
         * Add the fd to vhost_user.fdset and register vhost_user_read_cb
         * as its read handler; the vhost-events thread will poll it.
         */
        vhost_user_add_connection(fd, vsocket);
        return 0;
    }

    RTE_LOG(WARNING, VHOST_CONFIG,
        "failed to connect to %s: %s\n",
        path, strerror(errno));

    if (ret == -2 || !vsocket->reconnect) {
        close(fd);
        return -1;
    }

    RTE_LOG(INFO, VHOST_CONFIG, "%s: reconnecting...\n", path);
    reconn = malloc(sizeof(*reconn));
    if (reconn == NULL) {
        RTE_LOG(ERR, VHOST_CONFIG,
            "failed to allocate memory for reconnect\n");
        close(fd);
        return -1;
    }
    reconn->un = vsocket->un;
    reconn->fd = fd;
    reconn->vsocket = vsocket;
    pthread_mutex_lock(&reconn_list.mutex);
    TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next);
    pthread_mutex_unlock(&reconn_list.mutex);
    return 0;
}
static void
vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
{
    ...
    conn->connfd = fd;
    conn->vsocket = vsocket;
    conn->vid = vid;
    /* record the socket fd together with its read handler */
    ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb,
            NULL, conn);
    if (ret < 0) {
        RTE_LOG(ERR, VHOST_CONFIG,
            "failed to add fd %d into vhost server fdset\n",
            fd);
        if (vsocket->notify_ops->destroy_connection)
            vsocket->notify_ops->destroy_connection(conn->vid);
        goto err;
    }
    ...
}
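vhost_user_connect_nonblock wraps a non-blocking connect() on the unix socket whose path QEMU listens on. The helper below is a simplified illustration of that mechanism only; it is not DPDK's actual function, whose 0/-1/-2 return convention is used above:

#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/* Connect to a unix socket without blocking the caller. Returns the fd or -1. */
static int connect_unix_nonblock(const char *path)
{
    struct sockaddr_un un;
    int fd = socket(AF_UNIX, SOCK_STREAM, 0);

    if (fd < 0)
        return -1;

    memset(&un, 0, sizeof(un));
    un.sun_family = AF_UNIX;
    strncpy(un.sun_path, path, sizeof(un.sun_path) - 1);

    /* keep the caller from blocking while the peer is absent */
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);

    if (connect(fd, (struct sockaddr *)&un, sizeof(un)) < 0 &&
        errno != EINPROGRESS) {
        /* hard failure: the caller may queue the fd for reconnect */
        close(fd);
        return -1;
    }
    return fd;
}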
fdset_event_dispatch
When the QEMU side writes to the unix socket, the poll thread picks up a read event and invokes the read handler vhost_user_read_cb, which eventually reaches vhost_user_msg_handler, where the message is processed according to its type;
void *
fdset_event_dispatch(void *arg)
{
    int i;
    struct pollfd *pfd;
    struct fdentry *pfdentry;
    fd_cb rcb, wcb;
    void *dat;
    int fd, numfds;
    int remove1, remove2;
    int need_shrink;
    struct fdset *pfdset = arg;
    int val;

    if (pfdset == NULL)
        return NULL;

    while (1) {

        /*
         * When poll is blocked, other threads might unregister
         * listenfds from and register new listenfds into fdset.
         * When poll returns, the entries for listenfds in the fdset
         * might have been updated. It is ok if there is unwanted call
         * for new listenfds.
         */
        pthread_mutex_lock(&pfdset->fd_mutex);
        numfds = pfdset->num;
        pthread_mutex_unlock(&pfdset->fd_mutex);

        val = poll(pfdset->rwfds, numfds, 1000 /* millisecs */);
        if (val < 0)
            continue;

        need_shrink = 0;
        for (i = 0; i < numfds; i++) {
            pthread_mutex_lock(&pfdset->fd_mutex);

            pfdentry = &pfdset->fd[i];
            fd = pfdentry->fd;
            pfd = &pfdset->rwfds[i];

            if (fd < 0) {
                need_shrink = 1;
                pthread_mutex_unlock(&pfdset->fd_mutex);
                continue;
            }

            if (!pfd->revents) {
                pthread_mutex_unlock(&pfdset->fd_mutex);
                continue;
            }

            remove1 = remove2 = 0;

            rcb = pfdentry->rcb;
            wcb = pfdentry->wcb;
            dat = pfdentry->dat;
            pfdentry->busy = 1;

            pthread_mutex_unlock(&pfdset->fd_mutex);

            if (rcb && pfd->revents & (POLLIN | FDPOLLERR))
                rcb(fd, dat, &remove1);
            if (wcb && pfd->revents & (POLLOUT | FDPOLLERR))
                wcb(fd, dat, &remove2);
            pfdentry->busy = 0;
            /*
             * fdset_del needs to check busy flag.
             * We don't allow fdset_del to be called in callback
             * directly.
             */
            /*
             * When we are to clean up the fd from fdset,
             * because the fd is closed in the cb,
             * the old fd val could be reused by when creates new
             * listen fd in another thread, we couldn't call
             * fdset_del.
             */
            if (remove1 || remove2) {
                pfdentry->fd = -1;
                need_shrink = 1;
            }
        }

        if (need_shrink)
            fdset_shrink(pfdset);
    }

    return NULL;
}
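For reference, the messages that vhost_user_msg_handler dispatches on follow the vhost-user protocol framing: a small fixed header (request type, flags, payload size) followed by a payload, with any file descriptors passed out-of-band via SCM_RIGHTS. The sketch below is a simplified illustration of that dispatch, not DPDK's actual VhostUserMsg definition:

#include <stdint.h>

/* Simplified view of the vhost-user message header (per the protocol spec). */
struct vhost_user_msg_hdr {
    uint32_t request;   /* message type */
    uint32_t flags;     /* version and reply flags */
    uint32_t size;      /* payload size that follows the header */
};

/* A few request types from the vhost-user protocol (values per the spec). */
enum {
    VHOST_USER_GET_FEATURES   = 1,
    VHOST_USER_SET_MEM_TABLE  = 5,
    VHOST_USER_SET_VRING_KICK = 12,
};

/* Skeleton of the per-type dispatch that vhost_user_msg_handler performs. */
static int handle_msg(const struct vhost_user_msg_hdr *hdr)
{
    switch (hdr->request) {
    case VHOST_USER_GET_FEATURES:
        /* reply with the features this backend supports */
        break;
    case VHOST_USER_SET_MEM_TABLE:
        /* map the guest memory regions passed by QEMU */
        break;
    case VHOST_USER_SET_VRING_KICK:
        /* install the kick eventfd for a virtqueue */
        break;
    default:
        break;
    }
    return 0;
}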
vhost_user_client_reconnect
During vhost-user initialization a vhost_reconn thread is started to retry connections that failed; when the connect in vhost_user_start_client fails, a reconnect entry is created and appended to reconn_list:
static int
vhost_user_start_client(struct vhost_user_socket *vsocket)
{
    ...
    RTE_LOG(INFO, VHOST_CONFIG, "%s: reconnecting...\n", path);
    reconn = malloc(sizeof(*reconn));
    if (reconn == NULL) {
        RTE_LOG(ERR, VHOST_CONFIG,
            "failed to allocate memory for reconnect\n");
        close(fd);
        return -1;
    }
    reconn->un = vsocket->un;
    reconn->fd = fd;
    reconn->vsocket = vsocket;
    pthread_mutex_lock(&reconn_list.mutex);
    TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next);
    pthread_mutex_unlock(&reconn_list.mutex);
    return 0;
}
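The fields filled in above live in a small reconnect descriptor kept on a mutex-protected tail queue. Reconstructed from the fields referenced here, it looks roughly like this (the exact definition in socket.c may differ):

#include <pthread.h>
#include <sys/queue.h>
#include <sys/un.h>

struct vhost_user_socket;   /* defined in socket.c, opaque here */

struct vhost_user_reconnect {
    struct sockaddr_un un;              /* address to connect() to again  */
    int fd;                             /* the not-yet-connected socket   */
    struct vhost_user_socket *vsocket;  /* owning vhost-user socket       */
    TAILQ_ENTRY(vhost_user_reconnect) next;
};

static struct vhost_user_reconnect_list {
    TAILQ_HEAD(, vhost_user_reconnect) head;
    pthread_mutex_t mutex;
} reconn_list;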
The vhost_reconn thread keeps scanning reconn_list; whenever it finds an entry that needs reconnecting, it retries the connect and, on success, calls vhost_user_add_connection to re-establish the connection;
static void *
vhost_user_client_reconnect(void *arg __rte_unused)
{
    int ret;
    struct vhost_user_reconnect *reconn, *next;

    while (1) {
        pthread_mutex_lock(&reconn_list.mutex);

        /*
         * An equal implementation of TAILQ_FOREACH_SAFE,
         * which does not exist on all platforms.
         */
        for (reconn = TAILQ_FIRST(&reconn_list.head);
             reconn != NULL; reconn = next) {
            next = TAILQ_NEXT(reconn, next);

            ret = vhost_user_connect_nonblock(reconn->fd,
                    (struct sockaddr *)&reconn->un,
                    sizeof(reconn->un));
            if (ret == -2) {
                close(reconn->fd);
                RTE_LOG(ERR, VHOST_CONFIG,
                    "reconnection for fd %d failed\n",
                    reconn->fd);
                goto remove_fd;
            }
            if (ret == -1)
                continue;

            RTE_LOG(INFO, VHOST_CONFIG,
                "%s: connected\n", reconn->vsocket->path);
            vhost_user_add_connection(reconn->fd, reconn->vsocket);
remove_fd:
            TAILQ_REMOVE(&reconn_list.head, reconn, next);
            free(reconn);
        }

        pthread_mutex_unlock(&reconn_list.mutex);
        sleep(1);
    }

    return NULL;
}
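For completeness, the reconnect machinery itself is set up once by vhost_user_reconnect_init(), which rte_vhost_driver_register() calls the first time a reconnectable client socket is registered. A simplified sketch of what that init has to do (details may differ from socket.c):

#include <pthread.h>
#include <rte_lcore.h>   /* rte_ctrl_thread_create() */

static pthread_t reconn_tid;

/* Sketch: initialize the reconnect list and spawn the control thread that
 * runs vhost_user_client_reconnect() (shown above). */
static int vhost_user_reconnect_init(void)
{
    if (pthread_mutex_init(&reconn_list.mutex, NULL) != 0)
        return -1;
    TAILQ_INIT(&reconn_list.head);

    return rte_ctrl_thread_create(&reconn_tid, "vhost_reconn", NULL,
            vhost_user_client_reconnect, NULL);
}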