1 // SPDX-License-Identifier: GPL-2.0 2 3 #include <error.h> 4 #include <sys/types.h> 5 #include <sys/socket.h> 6 #include <netinet/in.h> 7 #include <sys/sendfile.h> 8 #include <arpa/inet.h> 9 #include <fcntl.h> 10 #include <argp.h> 11 #include "bench.h" 12 #include "bench_sockmap_prog.skel.h" 13 #include "bpf_util.h" 14 15 #define FILE_SIZE (128 * 1024) 16 #define DATA_REPEAT_SIZE 10 17 18 static const char snd_data[DATA_REPEAT_SIZE] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; 19 20 /* c1 <-> [p1, p2] <-> c2 21 * RX bench(BPF_SK_SKB_STREAM_VERDICT): 22 * ARG_FW_RX_PASS: 23 * send(p2) -> recv(c2) -> bpf skb passthrough -> recv(c2) 24 * ARG_FW_RX_VERDICT_EGRESS: 25 * send(c1) -> verdict skb to tx queuec of p2 -> recv(c2) 26 * ARG_FW_RX_VERDICT_INGRESS: 27 * send(c1) -> verdict skb to rx queuec of c2 -> recv(c2) 28 * 29 * TX bench(BPF_SK_MSG_VERDIC): 30 * ARG_FW_TX_PASS: 31 * send(p2) -> bpf msg passthrough -> send(p2) -> recv(c2) 32 * ARG_FW_TX_VERDICT_INGRESS: 33 * send(p2) -> verdict msg to rx queue of c2 -> recv(c2) 34 * ARG_FW_TX_VERDICT_EGRESS: 35 * send(p1) -> verdict msg to tx queue of p2 -> recv(c2) 36 */ 37 enum SOCKMAP_ARG_FLAG { 38 ARG_FW_RX_NORMAL = 11000, 39 ARG_FW_RX_PASS, 40 ARG_FW_RX_VERDICT_EGRESS, 41 ARG_FW_RX_VERDICT_INGRESS, 42 ARG_FW_TX_NORMAL, 43 ARG_FW_TX_PASS, 44 ARG_FW_TX_VERDICT_INGRESS, 45 ARG_FW_TX_VERDICT_EGRESS, 46 ARG_CTL_RX_STRP, 47 ARG_CONSUMER_DELAY_TIME, 48 ARG_PRODUCER_DURATION, 49 }; 50 51 #define TXMODE_NORMAL() \ 52 ((ctx.mode) == ARG_FW_TX_NORMAL) 53 54 #define TXMODE_BPF_INGRESS() \ 55 ((ctx.mode) == ARG_FW_TX_VERDICT_INGRESS) 56 57 #define TXMODE_BPF_EGRESS() \ 58 ((ctx.mode) == ARG_FW_TX_VERDICT_EGRESS) 59 60 #define TXMODE_BPF_PASS() \ 61 ((ctx.mode) == ARG_FW_TX_PASS) 62 63 #define TXMODE_BPF() ( \ 64 TXMODE_BPF_PASS() || \ 65 TXMODE_BPF_INGRESS() || \ 66 TXMODE_BPF_EGRESS()) 67 68 #define TXMODE() ( \ 69 TXMODE_NORMAL() || \ 70 TXMODE_BPF()) 71 72 #define RXMODE_NORMAL() \ 73 ((ctx.mode) == ARG_FW_RX_NORMAL) 74 75 #define RXMODE_BPF_PASS() \ 76 ((ctx.mode) == ARG_FW_RX_PASS) 77 78 #define RXMODE_BPF_VERDICT_EGRESS() \ 79 ((ctx.mode) == ARG_FW_RX_VERDICT_EGRESS) 80 81 #define RXMODE_BPF_VERDICT_INGRESS() \ 82 ((ctx.mode) == ARG_FW_RX_VERDICT_INGRESS) 83 84 #define RXMODE_BPF_VERDICT() ( \ 85 RXMODE_BPF_VERDICT_INGRESS() || \ 86 RXMODE_BPF_VERDICT_EGRESS()) 87 88 #define RXMODE_BPF() ( \ 89 RXMODE_BPF_PASS() || \ 90 RXMODE_BPF_VERDICT()) 91 92 #define RXMODE() ( \ 93 RXMODE_NORMAL() || \ 94 RXMODE_BPF()) 95 96 static struct socmap_ctx { 97 struct bench_sockmap_prog *skel; 98 enum SOCKMAP_ARG_FLAG mode; 99 #define c1 fds[0] 100 #define p1 fds[1] 101 #define c2 fds[2] 102 #define p2 fds[3] 103 #define sfd fds[4] 104 int fds[5]; 105 long send_calls; 106 long read_calls; 107 long prod_send; 108 long user_read; 109 int file_size; 110 int delay_consumer; 111 int prod_run_time; 112 int strp_size; 113 } ctx = { 114 .prod_send = 0, 115 .user_read = 0, 116 .file_size = FILE_SIZE, 117 .mode = ARG_FW_RX_VERDICT_EGRESS, 118 .fds = {0}, 119 .delay_consumer = 0, 120 .prod_run_time = 0, 121 .strp_size = 0, 122 }; 123 124 static void bench_sockmap_prog_destroy(void) 125 { 126 int i; 127 128 for (i = 0; i < ARRAY_SIZE(ctx.fds); i++) { 129 if (ctx.fds[i] > 0) 130 close(ctx.fds[i]); 131 } 132 133 bench_sockmap_prog__destroy(ctx.skel); 134 } 135 136 static void init_addr(struct sockaddr_storage *ss, 137 socklen_t *len) 138 { 139 struct sockaddr_in *addr4 = memset(ss, 0, sizeof(*ss)); 140 141 addr4->sin_family = AF_INET; 142 addr4->sin_port = 0; 143 addr4->sin_addr.s_addr = htonl(INADDR_LOOPBACK); 144 *len = sizeof(*addr4); 145 } 146 147 static bool set_non_block(int fd, bool blocking) 148 { 149 int flags = fcntl(fd, F_GETFL, 0); 150 151 if (flags == -1) 152 return false; 153 flags = blocking ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK); 154 return (fcntl(fd, F_SETFL, flags) == 0); 155 } 156 157 static int create_pair(int *c, int *p, int type) 158 { 159 struct sockaddr_storage addr; 160 int err, cfd, pfd; 161 socklen_t addr_len = sizeof(struct sockaddr_storage); 162 163 err = getsockname(ctx.sfd, (struct sockaddr *)&addr, &addr_len); 164 if (err) { 165 fprintf(stderr, "getsockname error %d\n", errno); 166 return err; 167 } 168 cfd = socket(AF_INET, type, 0); 169 if (cfd < 0) { 170 fprintf(stderr, "socket error %d\n", errno); 171 return err; 172 } 173 174 err = connect(cfd, (struct sockaddr *)&addr, addr_len); 175 if (err && errno != EINPROGRESS) { 176 fprintf(stderr, "connect error %d\n", errno); 177 return err; 178 } 179 180 pfd = accept(ctx.sfd, NULL, NULL); 181 if (pfd < 0) { 182 fprintf(stderr, "accept error %d\n", errno); 183 return err; 184 } 185 *c = cfd; 186 *p = pfd; 187 return 0; 188 } 189 190 static int create_sockets(void) 191 { 192 struct sockaddr_storage addr; 193 int err, one = 1; 194 socklen_t addr_len; 195 196 init_addr(&addr, &addr_len); 197 ctx.sfd = socket(AF_INET, SOCK_STREAM, 0); 198 if (ctx.sfd < 0) { 199 fprintf(stderr, "socket error:%d\n", errno); 200 return ctx.sfd; 201 } 202 err = setsockopt(ctx.sfd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)); 203 if (err) { 204 fprintf(stderr, "setsockopt error:%d\n", errno); 205 return err; 206 } 207 208 err = bind(ctx.sfd, (struct sockaddr *)&addr, addr_len); 209 if (err) { 210 fprintf(stderr, "bind error:%d\n", errno); 211 return err; 212 } 213 214 err = listen(ctx.sfd, SOMAXCONN); 215 if (err) { 216 fprintf(stderr, "listen error:%d\n", errno); 217 return err; 218 } 219 220 err = create_pair(&ctx.c1, &ctx.p1, SOCK_STREAM); 221 if (err) { 222 fprintf(stderr, "create_pair 1 error\n"); 223 return err; 224 } 225 226 err = create_pair(&ctx.c2, &ctx.p2, SOCK_STREAM); 227 if (err) { 228 fprintf(stderr, "create_pair 2 error\n"); 229 return err; 230 } 231 printf("create socket fd c1:%d p1:%d c2:%d p2:%d\n", 232 ctx.c1, ctx.p1, ctx.c2, ctx.p2); 233 return 0; 234 } 235 236 static void validate(void) 237 { 238 if (env.consumer_cnt != 2 || env.producer_cnt != 1 || 239 !env.affinity) 240 goto err; 241 return; 242 err: 243 fprintf(stderr, "argument '-c 2 -p 1 -a' is necessary"); 244 exit(1); 245 } 246 247 static int setup_rx_sockmap(void) 248 { 249 int verdict, pass, parser, map; 250 int zero = 0, one = 1; 251 int err; 252 253 parser = bpf_program__fd(ctx.skel->progs.prog_skb_parser); 254 verdict = bpf_program__fd(ctx.skel->progs.prog_skb_verdict); 255 pass = bpf_program__fd(ctx.skel->progs.prog_skb_pass); 256 map = bpf_map__fd(ctx.skel->maps.sock_map_rx); 257 258 if (ctx.strp_size != 0) { 259 ctx.skel->bss->pkt_size = ctx.strp_size; 260 err = bpf_prog_attach(parser, map, BPF_SK_SKB_STREAM_PARSER, 0); 261 if (err) 262 return err; 263 } 264 265 if (RXMODE_BPF_VERDICT()) 266 err = bpf_prog_attach(verdict, map, BPF_SK_SKB_STREAM_VERDICT, 0); 267 else if (RXMODE_BPF_PASS()) 268 err = bpf_prog_attach(pass, map, BPF_SK_SKB_STREAM_VERDICT, 0); 269 if (err) 270 return err; 271 272 if (RXMODE_BPF_PASS()) 273 return bpf_map_update_elem(map, &zero, &ctx.c2, BPF_NOEXIST); 274 275 err = bpf_map_update_elem(map, &zero, &ctx.p1, BPF_NOEXIST); 276 if (err < 0) 277 return err; 278 279 if (RXMODE_BPF_VERDICT_INGRESS()) { 280 ctx.skel->bss->verdict_dir = BPF_F_INGRESS; 281 err = bpf_map_update_elem(map, &one, &ctx.c2, BPF_NOEXIST); 282 } else { 283 err = bpf_map_update_elem(map, &one, &ctx.p2, BPF_NOEXIST); 284 } 285 if (err < 0) 286 return err; 287 288 return 0; 289 } 290 291 static int setup_tx_sockmap(void) 292 { 293 int zero = 0, one = 1; 294 int prog, map; 295 int err; 296 297 map = bpf_map__fd(ctx.skel->maps.sock_map_tx); 298 prog = TXMODE_BPF_PASS() ? 299 bpf_program__fd(ctx.skel->progs.prog_skmsg_pass) : 300 bpf_program__fd(ctx.skel->progs.prog_skmsg_verdict); 301 302 err = bpf_prog_attach(prog, map, BPF_SK_MSG_VERDICT, 0); 303 if (err) 304 return err; 305 306 if (TXMODE_BPF_EGRESS()) { 307 err = bpf_map_update_elem(map, &zero, &ctx.p1, BPF_NOEXIST); 308 err |= bpf_map_update_elem(map, &one, &ctx.p2, BPF_NOEXIST); 309 } else { 310 ctx.skel->bss->verdict_dir = BPF_F_INGRESS; 311 err = bpf_map_update_elem(map, &zero, &ctx.p2, BPF_NOEXIST); 312 err |= bpf_map_update_elem(map, &one, &ctx.c2, BPF_NOEXIST); 313 } 314 315 if (err < 0) 316 return err; 317 318 return 0; 319 } 320 321 static void setup(void) 322 { 323 int err; 324 325 ctx.skel = bench_sockmap_prog__open_and_load(); 326 if (!ctx.skel) { 327 fprintf(stderr, "error loading skel\n"); 328 exit(1); 329 } 330 331 if (create_sockets()) { 332 fprintf(stderr, "create_net_mode error\n"); 333 goto err; 334 } 335 336 if (RXMODE_BPF()) { 337 err = setup_rx_sockmap(); 338 if (err) { 339 fprintf(stderr, "setup_rx_sockmap error:%d\n", err); 340 goto err; 341 } 342 } else if (TXMODE_BPF()) { 343 err = setup_tx_sockmap(); 344 if (err) { 345 fprintf(stderr, "setup_tx_sockmap error:%d\n", err); 346 goto err; 347 } 348 } else { 349 fprintf(stderr, "unknown sockmap bench mode: %d\n", ctx.mode); 350 goto err; 351 } 352 353 return; 354 355 err: 356 bench_sockmap_prog_destroy(); 357 exit(1); 358 } 359 360 static void measure(struct bench_res *res) 361 { 362 res->drops = atomic_swap(&ctx.prod_send, 0); 363 res->hits = atomic_swap(&ctx.skel->bss->process_byte, 0); 364 res->false_hits = atomic_swap(&ctx.user_read, 0); 365 res->important_hits = atomic_swap(&ctx.send_calls, 0); 366 res->important_hits |= atomic_swap(&ctx.read_calls, 0) << 32; 367 } 368 369 static void verify_data(int *check_pos, char *buf, int rcv) 370 { 371 for (int i = 0 ; i < rcv; i++) { 372 if (buf[i] != snd_data[(*check_pos) % DATA_REPEAT_SIZE]) { 373 fprintf(stderr, "verify data fail"); 374 exit(1); 375 } 376 (*check_pos)++; 377 if (*check_pos >= FILE_SIZE) 378 *check_pos = 0; 379 } 380 } 381 382 static void *consumer(void *input) 383 { 384 int rcv, sent; 385 int check_pos = 0; 386 int tid = (long)input; 387 int recv_buf_size = FILE_SIZE; 388 char *buf = malloc(recv_buf_size); 389 int delay_read = ctx.delay_consumer; 390 391 if (!buf) { 392 fprintf(stderr, "fail to init read buffer"); 393 return NULL; 394 } 395 396 while (true) { 397 if (tid == 1) { 398 /* consumer 1 is unused for tx test and stream verdict test */ 399 if (RXMODE_BPF() || TXMODE()) 400 return NULL; 401 /* it's only for RX_NORMAL which service as reserve-proxy mode */ 402 rcv = read(ctx.p1, buf, recv_buf_size); 403 if (rcv < 0) { 404 fprintf(stderr, "fail to read p1"); 405 return NULL; 406 } 407 408 sent = send(ctx.p2, buf, recv_buf_size, 0); 409 if (sent < 0) { 410 fprintf(stderr, "fail to send p2"); 411 return NULL; 412 } 413 } else { 414 if (delay_read != 0) { 415 if (delay_read < 0) 416 return NULL; 417 sleep(delay_read); 418 delay_read = 0; 419 } 420 /* read real endpoint by consumer 0 */ 421 atomic_inc(&ctx.read_calls); 422 rcv = read(ctx.c2, buf, recv_buf_size); 423 if (rcv < 0 && errno != EAGAIN) { 424 fprintf(stderr, "%s fail to read c2 %d\n", __func__, errno); 425 return NULL; 426 } 427 verify_data(&check_pos, buf, rcv); 428 atomic_add(&ctx.user_read, rcv); 429 } 430 } 431 432 return NULL; 433 } 434 435 static void *producer(void *input) 436 { 437 int off = 0, fp, need_sent, sent; 438 int file_size = ctx.file_size; 439 struct timespec ts1, ts2; 440 int target; 441 FILE *file; 442 443 file = tmpfile(); 444 if (!file) { 445 fprintf(stderr, "create file for sendfile"); 446 return NULL; 447 } 448 449 /* we need simple verify */ 450 for (int i = 0; i < file_size; i++) { 451 if (fwrite(&snd_data[off], sizeof(char), 1, file) != 1) { 452 fprintf(stderr, "init tmpfile error"); 453 return NULL; 454 } 455 if (++off >= sizeof(snd_data)) 456 off = 0; 457 } 458 fflush(file); 459 fseek(file, 0, SEEK_SET); 460 461 fp = fileno(file); 462 need_sent = file_size; 463 clock_gettime(CLOCK_MONOTONIC, &ts1); 464 465 if (RXMODE_BPF_VERDICT()) 466 target = ctx.c1; 467 else if (TXMODE_BPF_EGRESS()) 468 target = ctx.p1; 469 else 470 target = ctx.p2; 471 set_non_block(target, true); 472 while (true) { 473 if (ctx.prod_run_time) { 474 clock_gettime(CLOCK_MONOTONIC, &ts2); 475 if (ts2.tv_sec - ts1.tv_sec > ctx.prod_run_time) 476 return NULL; 477 } 478 479 errno = 0; 480 atomic_inc(&ctx.send_calls); 481 sent = sendfile(target, fp, NULL, need_sent); 482 if (sent < 0) { 483 if (errno != EAGAIN && errno != ENOMEM && errno != ENOBUFS) { 484 fprintf(stderr, "sendfile return %d, errorno %d:%s\n", 485 sent, errno, strerror(errno)); 486 return NULL; 487 } 488 continue; 489 } else if (sent < need_sent) { 490 need_sent -= sent; 491 atomic_add(&ctx.prod_send, sent); 492 continue; 493 } 494 atomic_add(&ctx.prod_send, need_sent); 495 need_sent = file_size; 496 lseek(fp, 0, SEEK_SET); 497 } 498 499 return NULL; 500 } 501 502 static void report_progress(int iter, struct bench_res *res, long delta_ns) 503 { 504 double speed_mbs, prod_mbs, bpf_mbs, send_hz, read_hz; 505 506 prod_mbs = res->drops / 1000000.0 / (delta_ns / 1000000000.0); 507 speed_mbs = res->false_hits / 1000000.0 / (delta_ns / 1000000000.0); 508 bpf_mbs = res->hits / 1000000.0 / (delta_ns / 1000000000.0); 509 send_hz = (res->important_hits & 0xFFFFFFFF) / (delta_ns / 1000000000.0); 510 read_hz = (res->important_hits >> 32) / (delta_ns / 1000000000.0); 511 512 printf("Iter %3d (%7.3lfus): ", 513 iter, (delta_ns - 1000000000) / 1000.0); 514 printf("Send Speed %8.3lf MB/s (%8.3lf calls/s), BPF Speed %8.3lf MB/s, " 515 "Rcv Speed %8.3lf MB/s (%8.3lf calls/s)\n", 516 prod_mbs, send_hz, bpf_mbs, speed_mbs, read_hz); 517 } 518 519 static void report_final(struct bench_res res[], int res_cnt) 520 { 521 double verdict_mbs_mean = 0.0; 522 long verdict_total = 0; 523 int i; 524 525 for (i = 0; i < res_cnt; i++) { 526 verdict_mbs_mean += res[i].hits / 1000000.0 / (0.0 + res_cnt); 527 verdict_total += res[i].hits / 1000000.0; 528 } 529 530 printf("Summary: total trans %8.3lu MB \u00B1 %5.3lf MB/s\n", 531 verdict_total, verdict_mbs_mean); 532 } 533 534 static const struct argp_option opts[] = { 535 { "rx-normal", ARG_FW_RX_NORMAL, NULL, 0, 536 "simple reserve-proxy mode, no bfp enabled"}, 537 { "rx-pass", ARG_FW_RX_PASS, NULL, 0, 538 "run bpf prog but no redir applied"}, 539 { "rx-strp", ARG_CTL_RX_STRP, "Byte", 0, 540 "enable strparser and set the encapsulation size"}, 541 { "rx-verdict-egress", ARG_FW_RX_VERDICT_EGRESS, NULL, 0, 542 "forward data with bpf(stream verdict)"}, 543 { "rx-verdict-ingress", ARG_FW_RX_VERDICT_INGRESS, NULL, 0, 544 "forward data with bpf(stream verdict)"}, 545 { "tx-normal", ARG_FW_TX_NORMAL, NULL, 0, 546 "simple c-s mode, no bfp enabled"}, 547 { "tx-pass", ARG_FW_TX_PASS, NULL, 0, 548 "run bpf prog but no redir applied"}, 549 { "tx-verdict-ingress", ARG_FW_TX_VERDICT_INGRESS, NULL, 0, 550 "forward msg to ingress queue of another socket"}, 551 { "tx-verdict-egress", ARG_FW_TX_VERDICT_EGRESS, NULL, 0, 552 "forward msg to egress queue of another socket"}, 553 { "delay-consumer", ARG_CONSUMER_DELAY_TIME, "SEC", 0, 554 "delay consumer start"}, 555 { "producer-duration", ARG_PRODUCER_DURATION, "SEC", 0, 556 "producer duration"}, 557 {}, 558 }; 559 560 static error_t parse_arg(int key, char *arg, struct argp_state *state) 561 { 562 switch (key) { 563 case ARG_FW_RX_NORMAL...ARG_FW_TX_VERDICT_EGRESS: 564 ctx.mode = key; 565 break; 566 case ARG_CONSUMER_DELAY_TIME: 567 ctx.delay_consumer = strtol(arg, NULL, 10); 568 break; 569 case ARG_PRODUCER_DURATION: 570 ctx.prod_run_time = strtol(arg, NULL, 10); 571 break; 572 case ARG_CTL_RX_STRP: 573 ctx.strp_size = strtol(arg, NULL, 10); 574 break; 575 default: 576 return ARGP_ERR_UNKNOWN; 577 } 578 579 return 0; 580 } 581 582 /* exported into benchmark runner */ 583 const struct argp bench_sockmap_argp = { 584 .options = opts, 585 .parser = parse_arg, 586 }; 587 588 /* Benchmark performance of creating bpf local storage */ 589 const struct bench bench_sockmap = { 590 .name = "sockmap", 591 .argp = &bench_sockmap_argp, 592 .validate = validate, 593 .setup = setup, 594 .producer_thread = producer, 595 .consumer_thread = consumer, 596 .measure = measure, 597 .report_progress = report_progress, 598 .report_final = report_final, 599 }; 600