1. 程式人生 > >使用應用程式測試網絡卡收發UDP資料包效能

使用應用程式測試網絡卡收發UDP資料包效能

通常使用iperf進行網絡卡效能測試,但是有些情況下需要自己編寫應用程式來進行UDP資料包的收發,根據時間間隔統計出網絡卡的收發包資訊。

UDP傳送端程式碼如下:

#define _GNU_SOURCE // sendmmsg

#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <errno.h>

#include "common.h"


/* Per-thread sender configuration; main() fills one per target and passes it to thread_loop(). */
struct state {
	/* Destination handed to net_connect_udp(). */
	struct net_addr *target_addr;
	/* Number of messages submitted by each sendmmsg() call. */
	int packets_in_buf;
	/* Payload bytes; every message in the batch points at this same buffer. */
	const char *payload;
	/* Length of payload in bytes. */
	int payload_sz;
	/* Local source port handed to net_connect_udp(). */
	int src_port;
};

/*
 * Sender worker. Opens a UDP socket connected to state->target_addr and then
 * submits the same batch of state->packets_in_buf datagrams with sendmmsg()
 * in an endless loop. This function never returns.
 *
 * userdata: pointer to this thread's struct state.
 */
void thread_loop(void *userdata) {
	struct state *state = userdata;

	struct mmsghdr *messages = calloc(state->packets_in_buf, sizeof(*messages));
	struct iovec *iovecs = calloc(state->packets_in_buf, sizeof(*iovecs));
	if (messages == NULL || iovecs == NULL) {
		PFATAL("calloc()");
	}

	int fd = net_connect_udp(state->target_addr, state->src_port);

	/* Every message carries one iovec that points at the shared payload. */
	int i;
	for (i = 0; i < state->packets_in_buf; i++) {
		struct iovec *iovec = &iovecs[i];
		struct mmsghdr *msg = &messages[i];

		msg->msg_hdr.msg_iov = iovec;
		msg->msg_hdr.msg_iovlen = 1;

		/* The payload is only read by sendmmsg(); the cast drops const
		 * because iov_base is a plain void pointer. */
		iovec->iov_base = (void*)state->payload;
		iovec->iov_len = state->payload_sz;
	}

	while (1) {
		int r = sendmmsg(fd, messages, state->packets_in_buf, 0);
		if (r <= 0) {
			/* Transient conditions: simply retry the batch. */
			if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
				continue;
			}

			/* Treated as non-fatal so the sender keeps running even
			 * when nothing is listening on the target port. */
			if (errno == ECONNREFUSED) {
				continue;
			}
			PFATAL("sendmmsg()");
		}

		/* Clear the per-message output fields of the r messages that
		 * were sent before the batch is reused. */
		for (i = 0; i < r; i++) {
			struct mmsghdr *msg = &messages[i];
			msg->msg_hdr.msg_flags = 0;
			msg->msg_len = 0;
		}
	}
}

/*
 * udpsender entry point.
 *
 * Each command-line argument is a target "ip:port"; one sender thread is
 * spawned per target. The main thread then sleeps forever in one-second
 * steps while the workers send.
 */
int main(int argc, const char *argv[])
{
	int packets_in_buf = 1024;
	const char *payload = (const char[32]){0};
	int payload_sz = 32;

	if (argc < 2) {
		FATAL("Usage: %s [target ip:port] [target ...]",
		      argc > 0 ? argv[0] : "udpsender");
	}

	int thread_num = argc - 1;
	struct net_addr *target_addrs = calloc(thread_num, sizeof(*target_addrs));
	if (target_addrs == NULL) {
		PFATAL("calloc()");
	}

	/* Parse every target up front so a bad address aborts before any
	 * thread is started. */
	int t;
	for (t = 0; t < thread_num; t++) {
		const char *target_addr_str = argv[t+1];
		parse_addr(&target_addrs[t], target_addr_str);

		fprintf(stderr, "[*] Sending to %s, send buffer %i packets\n",
			addr_to_str(&target_addrs[t]), packets_in_buf);
	}

	struct state *array_of_states = calloc(thread_num, sizeof(*array_of_states));
	if (array_of_states == NULL) {
		PFATAL("calloc()");
	}

	for (t = 0; t < thread_num; t++) {
		struct state *state = &array_of_states[t];
		state->target_addr = &target_addrs[t];
		state->packets_in_buf = packets_in_buf;
		state->payload = payload;
		state->payload_sz = payload_sz;
		state->src_port = 11404;
		thread_spawn(thread_loop, state);
	}

	while (1) {
		struct timeval timeout =
			NSEC_TIMEVAL(MSEC_NSEC(1000UL));
		/* select() with no descriptors is used as a sleep. The loop
		 * retries until select() reports a timeout and the remaining
		 * time in `timeout` has reached zero; this relies on select()
		 * updating `timeout`, as Linux does. */
		while (1) {
			int r = select(0, NULL, NULL, NULL, &timeout);
			if (r != 0) {
				continue;
			}
			if (TIMEVAL_NSEC(&timeout) == 0) {
				break;
			}
		}
		// pass
	}
	return 0;
}

UDP接收端程式碼如下:

#define _GNU_SOURCE // for recvmmsg

#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include "common.h"


#define MTU_SIZE (2048-64*2)
#define MAX_MSG 512

/*
 * Per-thread receiver state: the socket, the counters read by main(), and
 * the buffers handed to recvmmsg().
 */
struct state {
	/* Socket this thread receives from. */
	int fd;
	/* Running total of received bytes (a counter, not a rate); updated
	 * atomically by the worker and read by main(). */
	volatile uint64_t bps;
	/* Running total of received packets; updated like bps. */
	volatile uint64_t pps;
	/* Message headers for one recvmmsg() batch. */
	struct mmsghdr messages[MAX_MSG];
	/* One MTU_SIZE-byte receive buffer per message. */
	char buffers[MAX_MSG][MTU_SIZE];
	/* One iovec per message, pointing into buffers[]. */
	struct iovec iovecs[MAX_MSG];
	/* NOTE(review): 64-byte alignment is presumably meant to keep each
	 * thread's state on its own cache lines — confirm. */
} __attribute__ ((aligned (64)));

/*
 * Prepare *s for recvmmsg(): message i gets exactly one iovec, and that
 * iovec covers the whole of s->buffers[i] (MTU_SIZE bytes).
 *
 * Returns s so the call can be chained.
 */
struct state *state_init(struct state *s) {
	for (int slot = 0; slot < MAX_MSG; slot++) {
		struct iovec *vec = &s->iovecs[slot];
		struct msghdr *hdr = &s->messages[slot].msg_hdr;

		vec->iov_base = s->buffers[slot];
		vec->iov_len = MTU_SIZE;

		hdr->msg_iov = vec;
		hdr->msg_iovlen = 1;
	}
	return s;
}

/*
 * Receiver worker: pulls batches of up to MAX_MSG datagrams from the
 * thread's socket and adds the packet and byte totals of each batch to the
 * shared counters. This function never returns.
 *
 * userdata: pointer to this thread's struct state.
 */
static void thread_loop(void *userdata)
{
	struct state *st = userdata;

	for (;;) {
		/* Blocks until at least one datagram is available. */
		int received = recvmmsg(st->fd, st->messages, MAX_MSG,
					MSG_WAITFORONE, NULL);
		if (received <= 0) {
			int transient = (errno == EAGAIN ||
					 errno == EWOULDBLOCK ||
					 errno == EINTR);
			if (!transient) {
				PFATAL("recvmmsg()");
			}
			continue;
		}

		/* Sum the payload sizes and reset the per-message output
		 * fields so the headers can be reused for the next batch. */
		int total = 0;
		for (int k = 0; k < received; k++) {
			struct mmsghdr *m = &st->messages[k];
			int sz = m->msg_len;
			m->msg_hdr.msg_flags = 0;
			m->msg_len = 0;
			total += sz;
		}

		/* Relaxed ordering: the counters are statistics only. */
		__atomic_fetch_add(&st->pps, received, __ATOMIC_RELAXED);
		__atomic_fetch_add(&st->bps, total, __ATOMIC_RELAXED);
	}
}

/*
 * udpreceiver entry point.
 *
 * Usage: prog [listen ip:port] [thread count] [reuseport]
 *
 * With reuseport == 0 all threads share one bound socket; otherwise every
 * thread binds its own socket to the same address. Once per second the main
 * thread prints the packet and byte totals accumulated since the last report.
 */
int main(int argc, const char *argv[])
{
	const char *listen_addr_str = "0.0.0.0:4321";
	int recv_buf_size = 4*1024;
	int thread_num = 1;
	int reuseport = 0;

	/* A later argument implies all earlier ones, hence the deliberate
	 * fallthrough from case to case. */
	switch (argc) {
	case 4:
		reuseport = atoi(argv[3]);
		/* fallthrough */
	case 3:
		thread_num = atoi(argv[2]);
		/* fallthrough */
	case 2:
		listen_addr_str = argv[1];
		/* fallthrough */
	case 1:
		break;
	default:
		FATAL("Usage: %s [listen ip:port] [fork cnt] [reuseport]", argv[0]);
	}

	/* atoi() yields 0 for non-numeric input; zero or negative counts
	 * would start no receiver at all. */
	if (thread_num < 1) {
		FATAL("Invalid thread count %d", thread_num);
	}

	struct net_addr listen_addr;
	parse_addr(&listen_addr, listen_addr_str);

	int main_fd = -1;
	if (reuseport == 0) {
		fprintf(stderr, "[*] Starting udpreceiver on %s, recv buffer %iKiB\n",
			addr_to_str(&listen_addr), recv_buf_size / 1024);

		main_fd = net_bind_udp(&listen_addr, 0);
		net_set_buffer_size(main_fd, recv_buf_size, 0);
	}

	struct state *array_of_states = calloc(thread_num, sizeof(*array_of_states));
	if (array_of_states == NULL) {
		PFATAL("calloc()");
	}

	int t;
	for (t = 0; t < thread_num; t++) {
		struct state *state = &array_of_states[t];
		state_init(state);
		if (reuseport == 0) {
			state->fd = main_fd;
		} else {
			fprintf(stderr, "[*] Starting udpreceiver on %s, recv buffer %iKiB\n",
				addr_to_str(&listen_addr), recv_buf_size / 1024);

			int fd = net_bind_udp(&listen_addr, 1);
			net_set_buffer_size(fd, recv_buf_size, 0);
			state->fd = fd;
		}
		thread_spawn(thread_loop, state);
	}

	uint64_t last_pps = 0;
	uint64_t last_bps = 0;

	while (1) {
		struct timeval timeout =
			NSEC_TIMEVAL(MSEC_NSEC(1000UL));
		/* select() with no descriptors is used as a one-second sleep.
		 * The loop retries until select() reports a timeout and the
		 * remaining time has reached zero; this relies on select()
		 * updating `timeout`, as Linux does. */
		while (1) {
			int r = select(0, NULL, NULL, NULL, &timeout);
			if (r != 0) {
				continue;
			}
			if (TIMEVAL_NSEC(&timeout) == 0) {
				break;
			}
		}

		/* Sum the monotonically growing per-thread counters. */
		uint64_t now_pps = 0, now_bps = 0;
		for (t = 0; t < thread_num; t++) {
			struct state *state = &array_of_states[t];
			now_pps += __atomic_load_n(&state->pps, __ATOMIC_RELAXED);
			now_bps += __atomic_load_n(&state->bps, __ATOMIC_RELAXED);
		}

		/* The difference to the previous totals is this interval's traffic. */
		double delta_pps = now_pps - last_pps;
		double delta_bps = now_bps - last_bps;
		last_pps = now_pps;
		last_bps = now_bps;

		printf("%7.3fM pps %7.3fMiB / %7.3fMb\n",
		       delta_pps / 1000.0 / 1000.0,
		       delta_bps / 1024.0 / 1024.0,
		       delta_bps * 8.0 / 1000.0 / 1000.0 );
	}

	return 0;
}

 所包含的標頭檔案如下:

common.h

/* printf-style output to stderr. */
#define ERRORF(x...) fprintf(stderr, x)

/*
 * Print a formatted "PROGRAM ABORT" message followed by the source location,
 * then exit with EXIT_FAILURE. The first argument must be a string literal
 * because it is concatenated with the message prefix.
 */
#define FATAL(x...)                                                            \
	do {                                                                   \
		ERRORF("[-] PROGRAM ABORT : " x);                              \
		ERRORF("\n\tLocation : %s(), %s:%u\n\n", __FUNCTION__,         \
		       __FILE__, __LINE__);                                    \
		exit(EXIT_FAILURE);                                            \
	} while (0)

/*
 * Like FATAL, but for failed system calls: additionally reports the current
 * errno through perror() before exiting. The first argument must be a string
 * literal.
 */
#define PFATAL(x...)                                                           \
	do {                                                                   \
		ERRORF("[-] SYSTEM ERROR : " x);                               \
		ERRORF("\n\tLocation : %s(), %s:%u\n", __FUNCTION__, __FILE__, \
		       __LINE__);                                              \
		perror("      OS message ");                                   \
		ERRORF("\n");                                                  \
		exit(EXIT_FAILURE);                                            \
	} while (0)

/* Nanoseconds represented by the struct timespec that ts points to. */
#define TIMESPEC_NSEC(ts) ((ts)->tv_sec * 1000000000ULL + (ts)->tv_nsec)
/* Nanoseconds represented by the struct timeval that ts points to. */
#define TIMEVAL_NSEC(ts)                                                       \
	((ts)->tv_sec * 1000000000ULL + (ts)->tv_usec * 1000ULL)
/* Compound literal struct timespec holding ns nanoseconds. */
#define NSEC_TIMESPEC(ns)                                                      \
	(struct timespec) { (ns) / 1000000000ULL, (ns) % 1000000000ULL }
/* Compound literal struct timeval holding ns nanoseconds, truncated to
 * whole microseconds. */
#define NSEC_TIMEVAL(ns)                                                       \
	(struct timeval)                                                       \
	{                                                                      \
		(ns) / 1000000000ULL, ((ns) % 1000000000ULL) / 1000ULL         \
	}
/* Milliseconds to nanoseconds. */
#define MSEC_NSEC(ms) ((ms)*1000000ULL)


/* net.c */
/* A parsed IPv4 or IPv6 socket address, filled in by net_gethostbyname(). */
struct net_addr
{
	/* IP version: 4 or 6 (0 after zero-initialisation). */
	int ipver;
	/* Address storage used when ipver == 4. */
	struct sockaddr_in sin4;
	/* Address storage used when ipver == 6. */
	struct sockaddr_in6 sin6;
	/* Points at sin4 or sin6 inside this same struct, so a copied
	 * net_addr still points into the object it was copied from. */
	struct sockaddr *sockaddr;
	/* Length in bytes passed to bind()/connect() along with sockaddr. */
	int sockaddr_len;
};

/* Parse "host:port" (split at the last ':') into *netaddr; aborts the
 * program on a missing port, an out-of-range port or an unparsable host. */
void parse_addr(struct net_addr *netaddr, const char *addr);
/* Format addr as "ip:port". Returns a pointer to a static buffer that is
 * overwritten by the next call. */
const char *addr_to_str(struct net_addr *addr);
/* Create a UDP socket bound to addr; SO_REUSEPORT is set when reuseport is
 * non-zero. Returns the socket descriptor; aborts the program on failure. */
int net_bind_udp(struct net_addr *addr, int reuseport);
/* Operates on the SO_SNDBUF (send != 0) or SO_RCVBUF (send == 0) size of
 * socket cd; max is the size at which no further enlargement is attempted. */
void net_set_buffer_size(int cd, int max, int send);
/* Fill *shost from a numeric IPv4 or IPv6 address string and a port.
 * Despite the name, no name resolution is performed. */
void net_gethostbyname(struct net_addr *shost, const char *host, int port);
/* Create a UDP socket connected to addr, optionally bound to local port
 * src_port first. Returns the socket descriptor. */
int net_connect_udp(struct net_addr *addr, int src_port);

/* Opaque thread handle; defined in net.c. */
struct thread;
/* Start a thread that runs callback(userdata). Returns a heap-allocated handle. */
struct thread *thread_spawn(void (*callback)(void *), void *userdata);
net.c
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "common.h"

/*
 * Return s wrapped in double quotes, for use in diagnostics.
 *
 * The result lives in a static buffer that the next call overwrites, so the
 * function is not reentrant. At most QUOTE_BUF_SIZE - QUOTE_OVERHEAD bytes of
 * s are copied; longer input is silently cut short.
 */
const char *str_quote(const char *s)
{
	enum { QUOTE_BUF_SIZE = 1024, QUOTE_OVERHEAD = 4 };
	static char quoted[QUOTE_BUF_SIZE];

	int n = snprintf(quoted, sizeof quoted, "\"%.*s\"",
			 QUOTE_BUF_SIZE - QUOTE_OVERHEAD, s);
	if (n >= QUOTE_BUF_SIZE) {
		/* Defensive: force termination if the output was truncated. */
		quoted[QUOTE_BUF_SIZE - 1] = '\0';
	}
	return quoted;
}


/*
 * Try to enlarge a socket buffer until it holds at least max bytes.
 *
 * cd:   socket descriptor.
 * max:  desired minimum buffer size in bytes.
 * send: non-zero selects SO_SNDBUF, zero selects SO_RCVBUF.
 *
 * The current size is read and, while it is below max, a doubled size is
 * requested, for at most 10 rounds. Enlargement is best effort: a rejected
 * setsockopt() ends the attempt silently, whereas a failing getsockopt()
 * aborts the program.
 */
void net_set_buffer_size(int cd, int max, int send)
{
	int flag = send ? SO_SNDBUF : SO_RCVBUF;

	for (int i = 0; i < 10; i++) {
		int current;
		socklen_t optlen = sizeof(current);
		if (getsockopt(cd, SOL_SOCKET, flag, &current, &optlen) < 0) {
			PFATAL("getsockopt(SOL_SOCKET)");
		}
		if (current >= max) {
			break;
		}

		int wanted = current * 2;
		if (setsockopt(cd, SOL_SOCKET, flag, &wanted, sizeof(wanted)) < 0) {
			// don't log error, just break
			break;
		}
	}
}

/*
 * Parse an "ip:port" string into *netaddr.
 *
 * The string is split at its LAST ':' so that IPv6 literals, which contain
 * colons themselves, can be used as the host part. The port must be a
 * complete decimal number in 0..65535. The host part is handed to
 * net_gethostbyname(), which accepts numeric addresses only.
 *
 * Any error aborts the program through FATAL.
 */
void parse_addr(struct net_addr *netaddr, const char *addr) {
	const char *colon = strrchr(addr, ':');
	if (colon == NULL) {
		FATAL("You forgot to specify port");
	}

	/* strtol() rather than atoi(): an empty or non-numeric port must be
	 * rejected instead of silently becoming port 0. */
	const char *port_str = colon + 1;
	char *end = NULL;
	errno = 0;
	long port = strtol(port_str, &end, 10);
	if (end == port_str || *end != '\0' || errno == ERANGE ||
	    port < 0 || port > 65535) {
		FATAL("Invalid port number %s", str_quote(port_str));
	}

	/* Copy the host part, clamped to the buffer size; an over-long host
	 * is then rejected by net_gethostbyname(). */
	char host[255];
	size_t host_len = (size_t)(colon - addr);
	if (host_len > sizeof(host) - 1) {
		host_len = sizeof(host) - 1;
	}
	memcpy(host, addr, host_len);
	host[host_len] = '\0';

	net_gethostbyname(netaddr, host, (int)port);
}

/*
 * Fill *shost from a numeric address string and a port.
 *
 * host is tried as an IPv4 literal first and as an IPv6 literal second.
 * Despite the function's name no name resolution takes place: anything
 * that inet_pton() does not accept aborts the program through FATAL.
 *
 * On return shost->sockaddr points at the matching member inside *shost
 * and shost->sockaddr_len holds the size of that member.
 */
void net_gethostbyname(struct net_addr *shost, const char *host, int port)
{
	memset(shost, 0, sizeof(*shost));

	/* Try ipv4 address first */
	struct in_addr v4;
	if (inet_pton(AF_INET, host, &v4) == 1) {
		shost->ipver = 4;
		shost->sockaddr = (struct sockaddr *)&shost->sin4;
		shost->sockaddr_len = sizeof(shost->sin4);
		shost->sin4.sin_family = AF_INET;
		shost->sin4.sin_port = htons(port);
		shost->sin4.sin_addr = v4;
		return;
	}

	/* Then ipv6 */
	struct in6_addr v6;
	if (inet_pton(AF_INET6, host, &v6) == 1) {
		shost->ipver = 6;
		shost->sockaddr = (struct sockaddr *)&shost->sin6;
		/* Must be the size of sockaddr_in6; the smaller sockaddr_in
		 * size makes bind()/connect() reject the address. */
		shost->sockaddr_len = sizeof(shost->sin6);
		shost->sin6.sin6_family = AF_INET6;
		shost->sin6.sin6_port = htons(port);
		shost->sin6.sin6_addr = v6;
		return;
	}

	FATAL("inet_pton(%s)", str_quote(host));
}

/*
 * Format addr as "ip:port" for log output.
 *
 * Returns a pointer to a static buffer that the next call overwrites, so
 * the function is not reentrant. An address whose ipver is neither 4 nor 6
 * is rendered as "?:0"; if the address cannot be converted to text the
 * host part is rendered as "?".
 */
const char *addr_to_str(struct net_addr *addr) {
	static char buf[255];
	char dst[INET6_ADDRSTRLEN + 1];
	const char *text = NULL;
	int port = 0;

	switch (addr->ipver) {
	case 4:
		text = inet_ntop(AF_INET, &addr->sin4.sin_addr, dst, INET6_ADDRSTRLEN);
		port = ntohs(addr->sin4.sin_port);
		break;
	case 6:
		/* ipver holds the IP version (4 or 6), not an address length. */
		text = inet_ntop(AF_INET6, &addr->sin6.sin6_addr, dst, INET6_ADDRSTRLEN);
		port = ntohs(addr->sin6.sin6_port);
		break;
	default:
		break;
	}

	if (text == NULL) {
		text = "?";
	}

	snprintf(buf, sizeof(buf), "%s:%i", text, port);
	return buf;
}

/*
 * Create a UDP socket and bind it to shost.
 *
 * shost:     local address to bind to; its ipver selects the socket family
 *            (IPv6 for ipver == 6, IPv4 otherwise).
 * reuseport: when non-zero SO_REUSEPORT is set before binding, so that
 *            several sockets can be bound to the same address.
 *
 * SO_REUSEADDR is always set. Returns the socket descriptor; any failure
 * aborts the program through PFATAL.
 */
int net_bind_udp(struct net_addr *shost, int reuseport)
{
	/* The socket family has to match the family of the address passed
	 * to bind(). */
	int domain = (shost->ipver == 6) ? PF_INET6 : PF_INET;

	int sd = socket(domain, SOCK_DGRAM, IPPROTO_UDP);
	if (sd < 0) {
		PFATAL("socket()");
	}

	int one = 1;
	int r = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&one,
			   sizeof(one));
	if (r < 0) {
		PFATAL("setsockopt(SO_REUSEADDR)");
	}

	if (reuseport) {
		one = 1;
		r = setsockopt(sd, SOL_SOCKET, SO_REUSEPORT, (char*)&one, sizeof(one));
		if (r < 0) {
			PFATAL("setsockopt(SO_REUSEPORT)");
		}
	}

	if (bind(sd, shost->sockaddr, shost->sockaddr_len) < 0) {
		PFATAL("bind()");
	}

	return sd;
}

/* Bookkeeping for a thread started by thread_spawn(). */
struct thread {
	/* Identifier filled in by pthread_create(). */
	pthread_t thread_id;
	/* Function the new thread runs. */
	void (*callback)(void *userdata);
	/* Argument passed to callback. */
	void *userdata;
};

/*
 * pthread entry trampoline: blocks every signal in the new thread and then
 * runs the user callback stored in the struct thread passed as userdata.
 * Always returns NULL.
 */
static void *_thread_start(void *userdata)
{
	struct thread *thread = userdata;

	/* Direct all signals to main thread. */
	sigset_t set;
	sigfillset(&set);
	int r = pthread_sigmask(SIG_SETMASK, &set, NULL);
	if (r != 0) {
		/* pthread functions return the error number instead of
		 * setting errno, which PFATAL reports through perror(). */
		errno = r;
		PFATAL("pthread_sigmask()");
	}

	thread->callback(thread->userdata);
	return NULL;
}

/*
 * Start a new thread that runs callback(userdata).
 *
 * Returns a heap-allocated handle describing the thread; nothing in this
 * file frees it. Allocation or thread-creation failure aborts the program
 * through PFATAL.
 */
struct thread *thread_spawn(void (*callback)(void *), void *userdata)
{
	struct thread *thread = calloc(1, sizeof(*thread));
	if (thread == NULL) {
		PFATAL("calloc()");
	}
	thread->callback = callback;
	thread->userdata = userdata;

	int r = pthread_create(&thread->thread_id, NULL, _thread_start, thread);
	if (r != 0) {
		/* pthread_create() returns the error number instead of
		 * setting errno, which PFATAL reports through perror(). */
		errno = r;
		PFATAL("pthread_create()");
	}
	return thread;
}

/*
 * Create a UDP socket connected to shost.
 *
 * shost:    remote address; its ipver selects the socket family (IPv6 for
 *           ipver == 6, IPv4 otherwise).
 * src_port: when in the range 2..65535 the socket is first bound to this
 *           local port on the wildcard address; any other value leaves the
 *           choice of the local port to the kernel.
 *
 * SO_REUSEADDR is always set. Returns the socket descriptor; any failure
 * aborts the program through PFATAL.
 */
int net_connect_udp(struct net_addr *shost, int src_port)
{
	/* The socket family has to match the family of the addresses passed
	 * to bind() and connect(). */
	int is_v6 = (shost->ipver == 6);

	int sd = socket(is_v6 ? PF_INET6 : PF_INET, SOCK_DGRAM, IPPROTO_UDP);
	if (sd < 0) {
		PFATAL("socket()");
	}

	int one = 1;
	int r = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&one,
			   sizeof(one));
	if (r < 0) {
		PFATAL("setsockopt(SO_REUSEADDR)");
	}

	if (src_port > 1 && src_port < 65536) {
		struct net_addr src;
		memset(&src, 0, sizeof(struct net_addr));
		/* Wildcard address of the same family as the target;
		 * parse_addr() splits at the last ':', so "::" followed by
		 * ":port" parses as host "::". */
		char buf[32];
		snprintf(buf, sizeof(buf), "%s:%d", is_v6 ? "::" : "0.0.0.0",
			 src_port);
		parse_addr(&src, buf);
		if (bind(sd, src.sockaddr, src.sockaddr_len) < 0) {
			PFATAL("bind()");
		}
	}

	if (-1 == connect(sd, shost->sockaddr, shost->sockaddr_len)) {
		/* EINPROGRESS is tolerated; it can only be reported if the
		 * socket is non-blocking, which this function does not
		 * request. */
		if (EINPROGRESS != errno) {
			PFATAL("connect()");
		}
	}

	return sd;
}

編譯指令碼如下:

build.sh

#!/bin/sh
# Build the receiver (udpreceiver1) and the sender (udpsender), each linked
# with net.c. Stop at the first failing command so that a broken build is
# not hidden by a later successful one.
set -e

clang -O3 -Wall -Wextra -Wno-unused-parameter \
    -ggdb -g -pthread \
    -o udpreceiver1 udpreceiver1.c \
    net.c

clang -O3 -Wall -Wextra -Wno-unused-parameter \
    -ggdb -g -pthread \
    -o udpsender udpsender.c \
    net.c




命令如下:

傳送端:./udpsender 192.168.100.100:5000 […],可指定多個目標位址,每個目標位址會啟動一個傳送執行緒

綁核使用 

taskset -c 1,2 ./udpsender 192.168.100.100:5000 192.168.100.100:5000


接收端:./udpreceiver1 192.168.100.100:5000 [執行緒數] [reuseport](編譯指令碼產生的執行檔名稱為 udpreceiver1)

綁核使用 

taskset -c 1 ./udpreceiver1 192.168.100.100:5000


程式碼中可修改的地方如下:

1、  傳送資料包大小,預設為32

2、  一次傳送資料包的個數,預設為1024

3、  接收資料包的快取區,預設為4K

4、  一次接收的最大資料包個數,預設為512

5、  UDP資料包MTU大小,預設為2048-128

實際執行情況如下: