使用應用程式測試網絡卡收發UDP資料包效能
阿新 • • 發佈:2019-02-18
通常使用iperf進行網絡卡效能測試,但是有些情況下需要自己編寫應用程式來進行UDP資料包的收發,根據時間間隔統計出網絡卡的收發包資訊。
UDP傳送端程式碼如下:
#define _GNU_SOURCE // sendmmsg #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <sys/socket.h> #include <errno.h> #include "common.h" struct state { struct net_addr *target_addr; int packets_in_buf; const char *payload; int payload_sz; int src_port; }; void thread_loop(void *userdata) { struct state *state = userdata; struct mmsghdr *messages = calloc(state->packets_in_buf, sizeof(struct mmsghdr)); struct iovec *iovecs = calloc(state->packets_in_buf, sizeof(struct iovec)); int fd = net_connect_udp(state->target_addr, state->src_port); int i; for (i = 0; i < state->packets_in_buf; i++) { struct iovec *iovec = &iovecs[i]; struct mmsghdr *msg = &messages[i]; msg->msg_hdr.msg_iov = iovec; msg->msg_hdr.msg_iovlen = 1; iovec->iov_base = (void*)state->payload; iovec->iov_len = state->payload_sz; } while (1) { int r = sendmmsg(fd, messages, state->packets_in_buf, 0); if (r <= 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { continue; } if (errno == ECONNREFUSED) { continue; } PFATAL("sendmmsg()"); } int i, bytes = 0; for (i = 0; i < r; i++) { struct mmsghdr *msg = &messages[i]; /* char *buf = msg->msg_hdr.msg_iov->iov_base; */ int len = msg->msg_len; msg->msg_hdr.msg_flags = 0; msg->msg_len = 0; bytes += len; } } } int main(int argc, const char *argv[]) { int packets_in_buf = 1024; const char *payload = (const char[32]){0}; int payload_sz = 32; if (argc == 1) { FATAL("Usage: %s [target ip:port] [target ...]", argv[0]); } struct net_addr *target_addrs = calloc(argc-1, sizeof(struct net_addr)); int thread_num = argc - 1; int t; for (t = 0; t < thread_num; t++) { const char *target_addr_str = argv[t+1]; parse_addr(&target_addrs[t], target_addr_str); fprintf(stderr, "[*] Sending to %s, send buffer %i packets\n", addr_to_str(&target_addrs[t]), packets_in_buf); } struct state *array_of_states = calloc(thread_num, sizeof(struct state)); for (t = 0; t < thread_num; t++) { struct state *state = &array_of_states[t]; state->target_addr = &target_addrs[t]; state->packets_in_buf = packets_in_buf; state->payload = payload; state->payload_sz = payload_sz; state->src_port = 11404; thread_spawn(thread_loop, state); } while (1) { struct timeval timeout = NSEC_TIMEVAL(MSEC_NSEC(1000UL)); while (1) { int r = select(0, NULL, NULL, NULL, &timeout); if (r != 0) { continue; } if (TIMEVAL_NSEC(&timeout) == 0) { break; } } // pass } return 0; }
UDP接收端程式碼如下:
#define _GNU_SOURCE // for recvmmsg #include <errno.h> #include <netinet/in.h> #include <stdio.h> #include <stdint.h> #include <stdlib.h> #include <sys/select.h> #include <sys/socket.h> #include <sys/types.h> #include <unistd.h> #include "common.h" #define MTU_SIZE (2048-64*2) #define MAX_MSG 512 struct state { int fd; volatile uint64_t bps; volatile uint64_t pps; struct mmsghdr messages[MAX_MSG]; char buffers[MAX_MSG][MTU_SIZE]; struct iovec iovecs[MAX_MSG]; } __attribute__ ((aligned (64))); struct state *state_init(struct state *s) { int i; for (i = 0; i < MAX_MSG; i++) { char *buf = &s->buffers[i][0]; struct iovec *iovec = &s->iovecs[i]; struct mmsghdr *msg = &s->messages[i]; msg->msg_hdr.msg_iov = iovec; msg->msg_hdr.msg_iovlen = 1; iovec->iov_base = buf; iovec->iov_len = MTU_SIZE; } return s; } static void thread_loop(void *userdata) { struct state *state = userdata; while (1) { /* Blocking recv. */ int r = recvmmsg(state->fd, &state->messages[0], MAX_MSG, MSG_WAITFORONE, NULL); if (r <= 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { continue; } PFATAL("recvmmsg()"); } int i, bytes = 0; for (i = 0; i < r; i++) { struct mmsghdr *msg = &state->messages[i]; /* char *buf = msg->msg_hdr.msg_iov->iov_base; */ int len = msg->msg_len; msg->msg_hdr.msg_flags = 0; msg->msg_len = 0; bytes += len; } __atomic_fetch_add(&state->pps, r, 0); __atomic_fetch_add(&state->bps, bytes, 0); } } int main(int argc, const char *argv[]) { const char *listen_addr_str = "0.0.0.0:4321"; int recv_buf_size = 4*1024; int thread_num = 1; int reuseport = 0; switch (argc) { case 4: reuseport = atoi(argv[3]); case 3: thread_num = atoi(argv[2]); case 2: listen_addr_str = argv[1]; case 1: break; default: FATAL("Usage: %s [listen ip:port] [fork cnt] [reuseport]", argv[0]); } struct net_addr listen_addr; parse_addr(&listen_addr, listen_addr_str); int main_fd = -1; if (reuseport == 0) { fprintf(stderr, "[*] Starting udpreceiver on %s, recv buffer %iKiB\n", addr_to_str(&listen_addr), recv_buf_size / 1024); main_fd = net_bind_udp(&listen_addr, 0); net_set_buffer_size(main_fd, recv_buf_size, 0); } struct state *array_of_states = calloc(thread_num, sizeof(struct state)); int t; for (t = 0; t < thread_num; t++) { struct state *state = &array_of_states[t]; state_init(state); if (reuseport == 0) { state->fd = main_fd; } else { fprintf(stderr, "[*] Starting udpreceiver on %s, recv buffer %iKiB\n", addr_to_str(&listen_addr), recv_buf_size / 1024); int fd = net_bind_udp(&listen_addr, 1); net_set_buffer_size(fd, recv_buf_size, 0); state->fd = fd; } thread_spawn(thread_loop, state); } uint64_t last_pps = 0; uint64_t last_bps = 0; while (1) { struct timeval timeout = NSEC_TIMEVAL(MSEC_NSEC(1000UL)); while (1) { int r = select(0, NULL, NULL, NULL, &timeout); if (r != 0) { continue; } if (TIMEVAL_NSEC(&timeout) == 0) { break; } } uint64_t now_pps = 0, now_bps = 0; for (t = 0; t < thread_num; t++) { struct state *state = &array_of_states[t]; now_pps += __atomic_load_n(&state->pps, 0); now_bps += __atomic_load_n(&state->bps, 0); } double delta_pps = now_pps - last_pps; double delta_bps = now_bps - last_bps; last_pps = now_pps; last_bps = now_bps; printf("%7.3fM pps %7.3fMiB / %7.3fMb\n", delta_pps / 1000.0 / 1000.0, delta_bps / 1024.0 / 1024.0, delta_bps * 8.0 / 1000.0 / 1000.0 ); } return 0; }
所包含的標頭檔案如下:
common.h
net.c#define ERRORF(x...) fprintf(stderr, x) #define FATAL(x...) \ do { \ ERRORF("[-] PROGRAM ABORT : " x); \ ERRORF("\n\tLocation : %s(), %s:%u\n\n", __FUNCTION__, \ __FILE__, __LINE__); \ exit(EXIT_FAILURE); \ } while (0) #define PFATAL(x...) \ do { \ ERRORF("[-] SYSTEM ERROR : " x); \ ERRORF("\n\tLocation : %s(), %s:%u\n", __FUNCTION__, __FILE__, \ __LINE__); \ perror(" OS message "); \ ERRORF("\n"); \ exit(EXIT_FAILURE); \ } while (0) #define TIMESPEC_NSEC(ts) ((ts)->tv_sec * 1000000000ULL + (ts)->tv_nsec) #define TIMEVAL_NSEC(ts) \ ((ts)->tv_sec * 1000000000ULL + (ts)->tv_usec * 1000ULL) #define NSEC_TIMESPEC(ns) \ (struct timespec) { (ns) / 1000000000ULL, (ns) % 1000000000ULL } #define NSEC_TIMEVAL(ns) \ (struct timeval) \ { \ (ns) / 1000000000ULL, ((ns) % 1000000000ULL) / 1000ULL \ } #define MSEC_NSEC(ms) ((ms)*1000000ULL) /* net.c */ struct net_addr { int ipver; struct sockaddr_in sin4; struct sockaddr_in6 sin6; struct sockaddr *sockaddr; int sockaddr_len; }; void parse_addr(struct net_addr *netaddr, const char *addr); const char *addr_to_str(struct net_addr *addr); int net_bind_udp(struct net_addr *addr, int reuseport); void net_set_buffer_size(int cd, int max, int send); void net_gethostbyname(struct net_addr *shost, const char *host, int port); int net_connect_udp(struct net_addr *addr, int src_port); struct thread; struct thread *thread_spawn(void (*callback)(void *), void *userdata);
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "common.h"
const char *str_quote(const char *s)
{
static char buf[1024];
int r = snprintf(buf, sizeof(buf), "\"%.*s\"", (int)sizeof(buf) - 4, s);
if (r >= (int)sizeof(buf)) {
buf[sizeof(buf) - 1] = 0;
}
return buf;
}
void net_set_buffer_size(int cd, int max, int send)
{
int i, flag;
if (send) {
flag = SO_SNDBUF;
} else {
flag = SO_RCVBUF;
}
int size = 0;
for (i = 0; i < 10 && size; i++) {
int bef;
socklen_t size = sizeof(bef);
if (getsockopt(cd, SOL_SOCKET, flag, &bef, &size) < 0) {
PFATAL("getsockopt(SOL_SOCKET)");
break;
}
if (bef >= max) {
break;
}
size = bef * 2;
if (setsockopt(cd, SOL_SOCKET, flag, &size, sizeof(size)) < 0) {
// don't log error, just break
break;
}
}
}
void parse_addr(struct net_addr *netaddr, const char *addr) {
char *colon = strrchr(addr, ':');
if (colon == NULL) {
FATAL("You forgot to specify port");
}
int port = atoi(colon+1);
if (port < 0 || port > 65535) {
FATAL("Invalid port number %d", port);
}
char host[255];
int addr_len = colon-addr > 254 ? 254 : colon-addr;
strncpy(host, addr, addr_len);
host[addr_len] = '\0';
net_gethostbyname(netaddr, host, port);
}
void net_gethostbyname(struct net_addr *shost, const char *host, int port)
{
memset(shost, 0, sizeof(struct net_addr));
struct in_addr in_addr;
struct in6_addr in6_addr;
/* Try ipv4 address first */
if (inet_pton(AF_INET, host, &in_addr) == 1) {
goto got_ipv4;
}
/* Then ipv6 */
if (inet_pton(AF_INET6, host, &in6_addr) == 1) {
goto got_ipv6;
}
FATAL("inet_pton(%s)", str_quote(host));
return;
got_ipv4:
shost->ipver = 4;
shost->sockaddr = (struct sockaddr*)&shost->sin4;
shost->sockaddr_len = sizeof(shost->sin4);
shost->sin4.sin_family = AF_INET;
shost->sin4.sin_port = htons(port);
shost->sin4.sin_addr = in_addr;
return;
got_ipv6:
shost->ipver = 6;
shost->sockaddr = (struct sockaddr*)&shost->sin6;
shost->sockaddr_len = sizeof(shost->sin4);
shost->sin6.sin6_family = AF_INET6;
shost->sin6.sin6_port = htons(port);
shost->sin6.sin6_addr = in6_addr;
return;
}
const char *addr_to_str(struct net_addr *addr) {
char dst[INET6_ADDRSTRLEN + 1];
int port = 0;
switch (addr->ipver) {
case 4: {
inet_ntop(AF_INET, &addr->sin4.sin_addr, dst, INET6_ADDRSTRLEN);
port = ntohs(addr->sin4.sin_port);
} break;
case 16: {
inet_ntop(AF_INET6, &addr->sin6.sin6_addr, dst, INET6_ADDRSTRLEN);
port = ntohs(addr->sin6.sin6_port);
} break;
default:
dst[0] = '?';
dst[1] = 0x00;
}
static char buf[255];
snprintf(buf, sizeof(buf), "%s:%i", dst, port);
return buf;
}
int net_bind_udp(struct net_addr *shost, int reuseport)
{
int sd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (sd < 0) {
PFATAL("socket()");
}
int one = 1;
int r = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&one,
sizeof(one));
if (r < 0) {
PFATAL("setsockopt(SO_REUSEADDR)");
}
if (reuseport) {
one = 1;
r = setsockopt(sd, SOL_SOCKET, SO_REUSEPORT, (char*)&one, sizeof(one));
if (r < 0) {
PFATAL("setsockopt(SO_REUSEPORT)");
}
}
if (bind(sd, shost->sockaddr, shost->sockaddr_len) < 0) {
PFATAL("bind()");
}
return sd;
}
struct thread {
pthread_t thread_id;
void (*callback)(void *userdata);
void *userdata;
};
static void *_thread_start(void *userdata)
{
struct thread *thread = userdata;
/* Direct all signals to main thread. */
sigset_t set;
sigfillset(&set);
int r = pthread_sigmask(SIG_SETMASK, &set, NULL);
if (r != 0) {
PFATAL("pthread_sigmask()");
}
thread->callback(thread->userdata);
return NULL;
}
struct thread *thread_spawn(void (*callback)(void *), void *userdata)
{
struct thread *thread = calloc(1, sizeof(struct thread));
thread->callback = callback;
thread->userdata = userdata;
int r = pthread_create(&thread->thread_id, NULL, _thread_start, thread);
if (r != 0) {
PFATAL("pthread_create()");
}
return thread;
}
int net_connect_udp(struct net_addr *shost, int src_port)
{
int sd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (sd < 0) {
PFATAL("socket()");
}
int one = 1;
int r = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&one,
sizeof(one));
if (r < 0) {
PFATAL("setsockopt(SO_REUSEADDR)");
}
if (src_port > 1 && src_port < 65536) {
struct net_addr src;
memset(&src, 0, sizeof(struct net_addr));
char buf[32];
snprintf(buf, sizeof(buf), "0.0.0.0:%d", src_port);
parse_addr(&src, buf);
if (bind(sd, src.sockaddr, src.sockaddr_len) < 0) {
PFATAL("bind()");
}
}
if (-1 == connect(sd, shost->sockaddr, shost->sockaddr_len)) {
/* is non-blocking, so we don't get error at that point yet */
if (EINPROGRESS != errno) {
PFATAL("connect()");
return -1;
}
}
return sd;
}
編譯指令碼如下:
build.sh
#!/bin/sh
set +e
clang -O3 -Wall -Wextra -Wno-unused-parameter \
-ggdb -g -pthread \
-o udpreceiver1 udpreceiver1.c \
net.c
clang -O3 -Wall -Wextra -Wno-unused-parameter \
-ggdb -g -pthread \
-o udpsender udpsender.c \
net.c
命令如下:
傳送端./udpsender 192.168.100.100:5000 […] 可指定多個執行緒
綁核使用
taskset –c 1,2 ./udpsender 192.168.100.100:5000192.168.100.100:5000
接收端 ./udpreceiver 192.168.100.100:5000
綁核使用
taskset –c 1 ./udpreceiver 192.168.100.100:5000
程式碼中可修改的地方如下:
1、 傳送資料包大小,預設為32
2、 一次傳送資料包的個數,預設為1024
3、 接收資料包的快取區,預設為4K
4、 一次接收的最大資料包個數,預設為512
5、 UDP資料包MTU大小,預設為2048-128
實際執行情況如下: