xref: /linux/tools/testing/selftests/bpf/benchs/bench_sockmap.c (revision 9fd2da71c301184d98fe37674ca8d017d1ce6600)
1 // SPDX-License-Identifier: GPL-2.0
2 
3 #include <error.h>
4 #include <sys/types.h>
5 #include <sys/socket.h>
6 #include <netinet/in.h>
7 #include <sys/sendfile.h>
8 #include <arpa/inet.h>
9 #include <fcntl.h>
10 #include <argp.h>
11 #include "bench.h"
12 #include "bench_sockmap_prog.skel.h"
13 
14 #define FILE_SIZE (128 * 1024)
15 #define DATA_REPEAT_SIZE 10
16 
17 static const char snd_data[DATA_REPEAT_SIZE] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
18 
19 /* c1 <-> [p1, p2] <-> c2
20  * RX bench(BPF_SK_SKB_STREAM_VERDICT):
21  *	ARG_FW_RX_PASS:
22  *		send(p2) -> recv(c2) -> bpf skb passthrough -> recv(c2)
23  *	ARG_FW_RX_VERDICT_EGRESS:
24  *		send(c1) -> verdict skb to tx queuec of p2 -> recv(c2)
25  *	ARG_FW_RX_VERDICT_INGRESS:
26  *		send(c1) -> verdict skb to rx queuec of c2 -> recv(c2)
27  *
28  * TX bench(BPF_SK_MSG_VERDIC):
29  *	ARG_FW_TX_PASS:
30  *		send(p2) -> bpf msg passthrough -> send(p2) -> recv(c2)
31  *	ARG_FW_TX_VERDICT_INGRESS:
32  *		send(p2) -> verdict msg to rx queue of c2 -> recv(c2)
33  *	ARG_FW_TX_VERDICT_EGRESS:
34  *		send(p1) -> verdict msg to tx queue of p2 -> recv(c2)
35  */
36 enum SOCKMAP_ARG_FLAG {
37 	ARG_FW_RX_NORMAL = 11000,
38 	ARG_FW_RX_PASS,
39 	ARG_FW_RX_VERDICT_EGRESS,
40 	ARG_FW_RX_VERDICT_INGRESS,
41 	ARG_FW_TX_NORMAL,
42 	ARG_FW_TX_PASS,
43 	ARG_FW_TX_VERDICT_INGRESS,
44 	ARG_FW_TX_VERDICT_EGRESS,
45 	ARG_CTL_RX_STRP,
46 	ARG_CONSUMER_DELAY_TIME,
47 	ARG_PRODUCER_DURATION,
48 };
49 
50 #define TXMODE_NORMAL()				\
51 	((ctx.mode) == ARG_FW_TX_NORMAL)
52 
53 #define TXMODE_BPF_INGRESS()			\
54 	((ctx.mode) == ARG_FW_TX_VERDICT_INGRESS)
55 
56 #define TXMODE_BPF_EGRESS()			\
57 	((ctx.mode) == ARG_FW_TX_VERDICT_EGRESS)
58 
59 #define TXMODE_BPF_PASS()			\
60 	((ctx.mode) == ARG_FW_TX_PASS)
61 
62 #define TXMODE_BPF() (				\
63 	TXMODE_BPF_PASS() ||			\
64 	TXMODE_BPF_INGRESS() ||			\
65 	TXMODE_BPF_EGRESS())
66 
67 #define TXMODE() (				\
68 	TXMODE_NORMAL() ||			\
69 	TXMODE_BPF())
70 
71 #define RXMODE_NORMAL()				\
72 	((ctx.mode) == ARG_FW_RX_NORMAL)
73 
74 #define RXMODE_BPF_PASS()			\
75 	((ctx.mode) == ARG_FW_RX_PASS)
76 
77 #define RXMODE_BPF_VERDICT_EGRESS()		\
78 	((ctx.mode) == ARG_FW_RX_VERDICT_EGRESS)
79 
80 #define RXMODE_BPF_VERDICT_INGRESS()		\
81 	((ctx.mode) == ARG_FW_RX_VERDICT_INGRESS)
82 
83 #define RXMODE_BPF_VERDICT() (			\
84 	RXMODE_BPF_VERDICT_INGRESS() ||		\
85 	RXMODE_BPF_VERDICT_EGRESS())
86 
87 #define RXMODE_BPF() (				\
88 	RXMODE_BPF_PASS() ||			\
89 	RXMODE_BPF_VERDICT())
90 
91 #define RXMODE() (				\
92 	RXMODE_NORMAL() ||			\
93 	RXMODE_BPF())
94 
95 static struct socmap_ctx {
96 	struct bench_sockmap_prog *skel;
97 	enum SOCKMAP_ARG_FLAG mode;
98 	#define c1	fds[0]
99 	#define p1	fds[1]
100 	#define c2	fds[2]
101 	#define p2	fds[3]
102 	#define sfd	fds[4]
103 	int		fds[5];
104 	long		send_calls;
105 	long		read_calls;
106 	long		prod_send;
107 	long		user_read;
108 	int		file_size;
109 	int		delay_consumer;
110 	int		prod_run_time;
111 	int		strp_size;
112 } ctx = {
113 	.prod_send	= 0,
114 	.user_read	= 0,
115 	.file_size	= FILE_SIZE,
116 	.mode		= ARG_FW_RX_VERDICT_EGRESS,
117 	.fds		= {0},
118 	.delay_consumer = 0,
119 	.prod_run_time	= 0,
120 	.strp_size	= 0,
121 };
122 
123 static void bench_sockmap_prog_destroy(void)
124 {
125 	int i;
126 
127 	for (i = 0; i < sizeof(ctx.fds); i++) {
128 		if (ctx.fds[0] > 0)
129 			close(ctx.fds[i]);
130 	}
131 
132 	bench_sockmap_prog__destroy(ctx.skel);
133 }
134 
135 static void init_addr(struct sockaddr_storage *ss,
136 		      socklen_t *len)
137 {
138 	struct sockaddr_in *addr4 = memset(ss, 0, sizeof(*ss));
139 
140 	addr4->sin_family = AF_INET;
141 	addr4->sin_port = 0;
142 	addr4->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
143 	*len = sizeof(*addr4);
144 }
145 
146 static bool set_non_block(int fd, bool blocking)
147 {
148 	int flags = fcntl(fd, F_GETFL, 0);
149 
150 	if (flags == -1)
151 		return false;
152 	flags = blocking ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK);
153 	return (fcntl(fd, F_SETFL, flags) == 0);
154 }
155 
156 static int create_pair(int *c, int *p, int type)
157 {
158 	struct sockaddr_storage addr;
159 	int err, cfd, pfd;
160 	socklen_t addr_len = sizeof(struct sockaddr_storage);
161 
162 	err = getsockname(ctx.sfd, (struct sockaddr *)&addr, &addr_len);
163 	if (err) {
164 		fprintf(stderr, "getsockname error %d\n", errno);
165 		return err;
166 	}
167 	cfd = socket(AF_INET, type, 0);
168 	if (cfd < 0) {
169 		fprintf(stderr, "socket error %d\n", errno);
170 		return err;
171 	}
172 
173 	err = connect(cfd, (struct sockaddr *)&addr, addr_len);
174 	if (err && errno != EINPROGRESS) {
175 		fprintf(stderr, "connect error %d\n", errno);
176 		return err;
177 	}
178 
179 	pfd = accept(ctx.sfd, NULL, NULL);
180 	if (pfd < 0) {
181 		fprintf(stderr, "accept error %d\n", errno);
182 		return err;
183 	}
184 	*c = cfd;
185 	*p = pfd;
186 	return 0;
187 }
188 
189 static int create_sockets(void)
190 {
191 	struct sockaddr_storage addr;
192 	int err, one = 1;
193 	socklen_t addr_len;
194 
195 	init_addr(&addr, &addr_len);
196 	ctx.sfd = socket(AF_INET, SOCK_STREAM, 0);
197 	if (ctx.sfd < 0) {
198 		fprintf(stderr, "socket error:%d\n", errno);
199 		return ctx.sfd;
200 	}
201 	err = setsockopt(ctx.sfd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one));
202 	if (err) {
203 		fprintf(stderr, "setsockopt error:%d\n", errno);
204 		return err;
205 	}
206 
207 	err = bind(ctx.sfd, (struct sockaddr *)&addr, addr_len);
208 	if (err) {
209 		fprintf(stderr, "bind error:%d\n", errno);
210 		return err;
211 	}
212 
213 	err = listen(ctx.sfd, SOMAXCONN);
214 	if (err) {
215 		fprintf(stderr, "listen error:%d\n", errno);
216 		return err;
217 	}
218 
219 	err = create_pair(&ctx.c1, &ctx.p1, SOCK_STREAM);
220 	if (err) {
221 		fprintf(stderr, "create_pair 1 error\n");
222 		return err;
223 	}
224 
225 	err = create_pair(&ctx.c2, &ctx.p2, SOCK_STREAM);
226 	if (err) {
227 		fprintf(stderr, "create_pair 2 error\n");
228 		return err;
229 	}
230 	printf("create socket fd c1:%d p1:%d c2:%d p2:%d\n",
231 	       ctx.c1, ctx.p1, ctx.c2, ctx.p2);
232 	return 0;
233 }
234 
235 static void validate(void)
236 {
237 	if (env.consumer_cnt != 2 || env.producer_cnt != 1 ||
238 	    !env.affinity)
239 		goto err;
240 	return;
241 err:
242 	fprintf(stderr, "argument '-c 2 -p 1 -a' is necessary");
243 	exit(1);
244 }
245 
246 static int setup_rx_sockmap(void)
247 {
248 	int verdict, pass, parser, map;
249 	int zero = 0, one = 1;
250 	int err;
251 
252 	parser = bpf_program__fd(ctx.skel->progs.prog_skb_parser);
253 	verdict = bpf_program__fd(ctx.skel->progs.prog_skb_verdict);
254 	pass = bpf_program__fd(ctx.skel->progs.prog_skb_pass);
255 	map = bpf_map__fd(ctx.skel->maps.sock_map_rx);
256 
257 	if (ctx.strp_size != 0) {
258 		ctx.skel->bss->pkt_size = ctx.strp_size;
259 		err = bpf_prog_attach(parser, map, BPF_SK_SKB_STREAM_PARSER, 0);
260 		if (err)
261 			return err;
262 	}
263 
264 	if (RXMODE_BPF_VERDICT())
265 		err = bpf_prog_attach(verdict, map, BPF_SK_SKB_STREAM_VERDICT, 0);
266 	else if (RXMODE_BPF_PASS())
267 		err = bpf_prog_attach(pass, map, BPF_SK_SKB_STREAM_VERDICT, 0);
268 	if (err)
269 		return err;
270 
271 	if (RXMODE_BPF_PASS())
272 		return bpf_map_update_elem(map, &zero, &ctx.c2, BPF_NOEXIST);
273 
274 	err = bpf_map_update_elem(map, &zero, &ctx.p1, BPF_NOEXIST);
275 	if (err < 0)
276 		return err;
277 
278 	if (RXMODE_BPF_VERDICT_INGRESS()) {
279 		ctx.skel->bss->verdict_dir = BPF_F_INGRESS;
280 		err = bpf_map_update_elem(map, &one, &ctx.c2, BPF_NOEXIST);
281 	} else {
282 		err = bpf_map_update_elem(map, &one, &ctx.p2, BPF_NOEXIST);
283 	}
284 	if (err < 0)
285 		return err;
286 
287 	return 0;
288 }
289 
290 static int setup_tx_sockmap(void)
291 {
292 	int zero = 0, one = 1;
293 	int prog, map;
294 	int err;
295 
296 	map = bpf_map__fd(ctx.skel->maps.sock_map_tx);
297 	prog = TXMODE_BPF_PASS() ?
298 		bpf_program__fd(ctx.skel->progs.prog_skmsg_pass) :
299 		bpf_program__fd(ctx.skel->progs.prog_skmsg_verdict);
300 
301 	err = bpf_prog_attach(prog, map, BPF_SK_MSG_VERDICT, 0);
302 	if (err)
303 		return err;
304 
305 	if (TXMODE_BPF_EGRESS()) {
306 		err = bpf_map_update_elem(map, &zero, &ctx.p1, BPF_NOEXIST);
307 		err |= bpf_map_update_elem(map, &one, &ctx.p2, BPF_NOEXIST);
308 	} else {
309 		ctx.skel->bss->verdict_dir = BPF_F_INGRESS;
310 		err = bpf_map_update_elem(map, &zero, &ctx.p2, BPF_NOEXIST);
311 		err |= bpf_map_update_elem(map, &one, &ctx.c2, BPF_NOEXIST);
312 	}
313 
314 	if (err < 0)
315 		return err;
316 
317 	return 0;
318 }
319 
320 static void setup(void)
321 {
322 	int err;
323 
324 	ctx.skel = bench_sockmap_prog__open_and_load();
325 	if (!ctx.skel) {
326 		fprintf(stderr, "error loading skel\n");
327 		exit(1);
328 	}
329 
330 	if (create_sockets()) {
331 		fprintf(stderr, "create_net_mode error\n");
332 		goto err;
333 	}
334 
335 	if (RXMODE_BPF()) {
336 		err = setup_rx_sockmap();
337 		if (err) {
338 			fprintf(stderr, "setup_rx_sockmap error:%d\n", err);
339 			goto err;
340 		}
341 	} else if (TXMODE_BPF()) {
342 		err = setup_tx_sockmap();
343 		if (err) {
344 			fprintf(stderr, "setup_tx_sockmap error:%d\n", err);
345 			goto err;
346 		}
347 	} else {
348 		fprintf(stderr, "unknown sockmap bench mode: %d\n", ctx.mode);
349 		goto err;
350 	}
351 
352 	return;
353 
354 err:
355 	bench_sockmap_prog_destroy();
356 	exit(1);
357 }
358 
359 static void measure(struct bench_res *res)
360 {
361 	res->drops = atomic_swap(&ctx.prod_send, 0);
362 	res->hits = atomic_swap(&ctx.skel->bss->process_byte, 0);
363 	res->false_hits = atomic_swap(&ctx.user_read, 0);
364 	res->important_hits = atomic_swap(&ctx.send_calls, 0);
365 	res->important_hits |= atomic_swap(&ctx.read_calls, 0) << 32;
366 }
367 
368 static void verify_data(int *check_pos, char *buf, int rcv)
369 {
370 	for (int i = 0 ; i < rcv; i++) {
371 		if (buf[i] != snd_data[(*check_pos) % DATA_REPEAT_SIZE]) {
372 			fprintf(stderr, "verify data fail");
373 			exit(1);
374 		}
375 		(*check_pos)++;
376 		if (*check_pos >= FILE_SIZE)
377 			*check_pos = 0;
378 	}
379 }
380 
381 static void *consumer(void *input)
382 {
383 	int rcv, sent;
384 	int check_pos = 0;
385 	int tid = (long)input;
386 	int recv_buf_size = FILE_SIZE;
387 	char *buf = malloc(recv_buf_size);
388 	int delay_read = ctx.delay_consumer;
389 
390 	if (!buf) {
391 		fprintf(stderr, "fail to init read buffer");
392 		return NULL;
393 	}
394 
395 	while (true) {
396 		if (tid == 1) {
397 			/* consumer 1 is unused for tx test and stream verdict test */
398 			if (RXMODE_BPF() || TXMODE())
399 				return NULL;
400 			/* it's only for RX_NORMAL which service as reserve-proxy mode */
401 			rcv = read(ctx.p1, buf, recv_buf_size);
402 			if (rcv < 0) {
403 				fprintf(stderr, "fail to read p1");
404 				return NULL;
405 			}
406 
407 			sent = send(ctx.p2, buf, recv_buf_size, 0);
408 			if (sent < 0) {
409 				fprintf(stderr, "fail to send p2");
410 				return NULL;
411 			}
412 		} else {
413 			if (delay_read != 0) {
414 				if (delay_read < 0)
415 					return NULL;
416 				sleep(delay_read);
417 				delay_read = 0;
418 			}
419 			/* read real endpoint by consumer 0 */
420 			atomic_inc(&ctx.read_calls);
421 			rcv = read(ctx.c2, buf, recv_buf_size);
422 			if (rcv < 0 && errno != EAGAIN) {
423 				fprintf(stderr, "%s fail to read c2 %d\n", __func__, errno);
424 				return NULL;
425 			}
426 			verify_data(&check_pos, buf, rcv);
427 			atomic_add(&ctx.user_read, rcv);
428 		}
429 	}
430 
431 	return NULL;
432 }
433 
434 static void *producer(void *input)
435 {
436 	int off = 0, fp, need_sent, sent;
437 	int file_size = ctx.file_size;
438 	struct timespec ts1, ts2;
439 	int target;
440 	FILE *file;
441 
442 	file = tmpfile();
443 	if (!file) {
444 		fprintf(stderr, "create file for sendfile");
445 		return NULL;
446 	}
447 
448 	/* we need simple verify */
449 	for (int i = 0; i < file_size; i++) {
450 		if (fwrite(&snd_data[off], sizeof(char), 1, file) != 1) {
451 			fprintf(stderr, "init tmpfile error");
452 			return NULL;
453 		}
454 		if (++off >= sizeof(snd_data))
455 			off = 0;
456 	}
457 	fflush(file);
458 	fseek(file, 0, SEEK_SET);
459 
460 	fp = fileno(file);
461 	need_sent = file_size;
462 	clock_gettime(CLOCK_MONOTONIC, &ts1);
463 
464 	if (RXMODE_BPF_VERDICT())
465 		target = ctx.c1;
466 	else if (TXMODE_BPF_EGRESS())
467 		target = ctx.p1;
468 	else
469 		target = ctx.p2;
470 	set_non_block(target, true);
471 	while (true) {
472 		if (ctx.prod_run_time) {
473 			clock_gettime(CLOCK_MONOTONIC, &ts2);
474 			if (ts2.tv_sec - ts1.tv_sec > ctx.prod_run_time)
475 				return NULL;
476 		}
477 
478 		errno = 0;
479 		atomic_inc(&ctx.send_calls);
480 		sent = sendfile(target, fp, NULL, need_sent);
481 		if (sent < 0) {
482 			if (errno != EAGAIN && errno != ENOMEM && errno != ENOBUFS) {
483 				fprintf(stderr, "sendfile return %d, errorno %d:%s\n",
484 					sent, errno, strerror(errno));
485 				return NULL;
486 			}
487 			continue;
488 		} else if (sent < need_sent) {
489 			need_sent -= sent;
490 			atomic_add(&ctx.prod_send, sent);
491 			continue;
492 		}
493 		atomic_add(&ctx.prod_send, need_sent);
494 		need_sent = file_size;
495 		lseek(fp, 0, SEEK_SET);
496 	}
497 
498 	return NULL;
499 }
500 
501 static void report_progress(int iter, struct bench_res *res, long delta_ns)
502 {
503 	double speed_mbs, prod_mbs, bpf_mbs, send_hz, read_hz;
504 
505 	prod_mbs = res->drops / 1000000.0 / (delta_ns / 1000000000.0);
506 	speed_mbs = res->false_hits / 1000000.0 / (delta_ns / 1000000000.0);
507 	bpf_mbs = res->hits / 1000000.0 / (delta_ns / 1000000000.0);
508 	send_hz = (res->important_hits & 0xFFFFFFFF) / (delta_ns / 1000000000.0);
509 	read_hz = (res->important_hits >> 32) / (delta_ns / 1000000000.0);
510 
511 	printf("Iter %3d (%7.3lfus): ",
512 	       iter, (delta_ns - 1000000000) / 1000.0);
513 	printf("Send Speed %8.3lf MB/s (%8.3lf calls/s), BPF Speed %8.3lf MB/s, "
514 	       "Rcv Speed %8.3lf MB/s (%8.3lf calls/s)\n",
515 	       prod_mbs, send_hz, bpf_mbs, speed_mbs, read_hz);
516 }
517 
518 static void report_final(struct bench_res res[], int res_cnt)
519 {
520 	double verdict_mbs_mean = 0.0;
521 	long verdict_total = 0;
522 	int i;
523 
524 	for (i = 0; i < res_cnt; i++) {
525 		verdict_mbs_mean += res[i].hits / 1000000.0 / (0.0 + res_cnt);
526 		verdict_total += res[i].hits / 1000000.0;
527 	}
528 
529 	printf("Summary: total trans %8.3lu MB \u00B1 %5.3lf MB/s\n",
530 	       verdict_total, verdict_mbs_mean);
531 }
532 
533 static const struct argp_option opts[] = {
534 	{ "rx-normal", ARG_FW_RX_NORMAL, NULL, 0,
535 		"simple reserve-proxy mode, no bfp enabled"},
536 	{ "rx-pass", ARG_FW_RX_PASS, NULL, 0,
537 		"run bpf prog but no redir applied"},
538 	{ "rx-strp", ARG_CTL_RX_STRP, "Byte", 0,
539 		"enable strparser and set the encapsulation size"},
540 	{ "rx-verdict-egress", ARG_FW_RX_VERDICT_EGRESS, NULL, 0,
541 		"forward data with bpf(stream verdict)"},
542 	{ "rx-verdict-ingress", ARG_FW_RX_VERDICT_INGRESS, NULL, 0,
543 		"forward data with bpf(stream verdict)"},
544 	{ "tx-normal", ARG_FW_TX_NORMAL, NULL, 0,
545 		"simple c-s mode, no bfp enabled"},
546 	{ "tx-pass", ARG_FW_TX_PASS, NULL, 0,
547 		"run bpf prog but no redir applied"},
548 	{ "tx-verdict-ingress", ARG_FW_TX_VERDICT_INGRESS, NULL, 0,
549 		"forward msg to ingress queue of another socket"},
550 	{ "tx-verdict-egress", ARG_FW_TX_VERDICT_EGRESS, NULL, 0,
551 		"forward msg to egress queue of another socket"},
552 	{ "delay-consumer", ARG_CONSUMER_DELAY_TIME, "SEC", 0,
553 		"delay consumer start"},
554 	{ "producer-duration", ARG_PRODUCER_DURATION, "SEC", 0,
555 		"producer duration"},
556 	{},
557 };
558 
559 static error_t parse_arg(int key, char *arg, struct argp_state *state)
560 {
561 	switch (key) {
562 	case ARG_FW_RX_NORMAL...ARG_FW_TX_VERDICT_EGRESS:
563 		ctx.mode = key;
564 		break;
565 	case ARG_CONSUMER_DELAY_TIME:
566 		ctx.delay_consumer = strtol(arg, NULL, 10);
567 		break;
568 	case ARG_PRODUCER_DURATION:
569 		ctx.prod_run_time = strtol(arg, NULL, 10);
570 		break;
571 	case ARG_CTL_RX_STRP:
572 		ctx.strp_size = strtol(arg, NULL, 10);
573 		break;
574 	default:
575 		return ARGP_ERR_UNKNOWN;
576 	}
577 
578 	return 0;
579 }
580 
581 /* exported into benchmark runner */
582 const struct argp bench_sockmap_argp = {
583 	.options	= opts,
584 	.parser		= parse_arg,
585 };
586 
587 /* Benchmark performance of creating bpf local storage  */
588 const struct bench bench_sockmap = {
589 	.name			= "sockmap",
590 	.argp			= &bench_sockmap_argp,
591 	.validate		= validate,
592 	.setup			= setup,
593 	.producer_thread	= producer,
594 	.consumer_thread	= consumer,
595 	.measure		= measure,
596 	.report_progress	= report_progress,
597 	.report_final		= report_final,
598 };
599