1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * pipe_bench - exercise concurrent pipe operation 4 * 5 * N writer threads hammer a single pipe with multi-page writes; M reader 6 * threads drain it. Each writer records its own write() latency histogram. 7 * Multi-page writes (msgsize >= PAGE_SIZE) force the loop in 8 * anon_pipe_write() to call alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT) under 9 * pipe->mutex, which is the critical section the patch shrinks. 10 * 11 * By default the benchmark sweeps writers in {1, 2, 5} x readers in 12 * {1, 5, 10} and prints one block per configuration so two runs (e.g. 13 * baseline vs patched) can be diffed directly. Pass -w and -r to run a 14 * single configuration instead. Pass --memory-pressure to spawn stress-ng 15 * alongside the sweep so the per-page alloc_page() path under pipe->mutex 16 * has to dip into reclaim. 17 * 18 * Copyright (c) 2026 Meta Platforms, Inc. and affiliates 19 * Copyright (c) 2026 Breno Leitao <leitao@debian.org> 20 */ 21 22 #define _GNU_SOURCE 23 #include <errno.h> 24 #include <fcntl.h> 25 #include <getopt.h> 26 #include <poll.h> 27 #include <pthread.h> 28 #include <signal.h> 29 #include <stdatomic.h> 30 #include <stdint.h> 31 #include <stdio.h> 32 #include <stdlib.h> 33 #include <string.h> 34 #include <sys/wait.h> 35 #include <time.h> 36 #include <unistd.h> 37 38 #define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0])) 39 #define HIST_BUCKETS 32 40 41 static size_t g_msgsize = 16 * 4096; 42 static int g_duration = 3; 43 static int g_pipe_size = 1024 * 1024; 44 static int g_memory_pressure; 45 46 static atomic_int g_stop; 47 static int g_pipe[2]; 48 49 struct wstats { 50 uint64_t writes; 51 uint64_t bytes; 52 uint64_t lat_sum_ns; 53 uint64_t lat_max_ns; 54 uint64_t lat_hist[HIST_BUCKETS]; 55 char *buf; 56 }; 57 58 struct rstats { 59 char *buf; 60 }; 61 62 struct hist_totals { 63 uint64_t writes; 64 uint64_t bytes; 65 uint64_t lat_sum; 66 uint64_t lat_max; 67 }; 68 69 static inline uint64_t now_ns(void) 70 { 71 struct timespec ts; 72 73 clock_gettime(CLOCK_MONOTONIC, &ts); 74 return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec; 75 } 76 77 static inline int log2_bucket(uint64_t v) 78 { 79 int b = 0; 80 81 if (!v) 82 return 0; 83 while (v >>= 1) 84 b++; 85 return b < HIST_BUCKETS ? b : HIST_BUCKETS - 1; 86 } 87 88 static void *writer(void *arg) 89 { 90 struct wstats *s = arg; 91 92 while (!atomic_load_explicit(&g_stop, memory_order_relaxed)) { 93 uint64_t t0 = now_ns(); 94 ssize_t n = write(g_pipe[1], s->buf, g_msgsize); 95 uint64_t dt = now_ns() - t0; 96 97 if (n > 0) { 98 s->writes++; 99 s->bytes += (uint64_t)n; 100 s->lat_sum_ns += dt; 101 if (dt > s->lat_max_ns) 102 s->lat_max_ns = dt; 103 s->lat_hist[log2_bucket(dt)]++; 104 } else if (n < 0 && (errno == EPIPE || errno == EBADF)) { 105 break; 106 } 107 } 108 return NULL; 109 } 110 111 static void *reader(void *arg) 112 { 113 struct rstats *s = arg; 114 115 /* 116 * Drain until EOF (write end closed by main). g_stop is not checked 117 * here on purpose: writers may be blocked in write() with the pipe 118 * full when g_stop is set, so the reader must keep draining until 119 * main closes the write end. 120 */ 121 for (;;) { 122 ssize_t n = read(g_pipe[0], s->buf, g_msgsize); 123 124 if (n <= 0) 125 break; 126 } 127 return NULL; 128 } 129 130 /* Sum per-writer stats and per-bucket counts into the caller's aggregates. */ 131 static void aggregate_wstats(struct wstats *all, int nw, 132 uint64_t agg[HIST_BUCKETS], 133 struct hist_totals *t) 134 { 135 memset(t, 0, sizeof(*t)); 136 for (int i = 0; i < nw; i++) { 137 t->writes += all[i].writes; 138 t->bytes += all[i].bytes; 139 t->lat_sum += all[i].lat_sum_ns; 140 if (all[i].lat_max_ns > t->lat_max) 141 t->lat_max = all[i].lat_max_ns; 142 for (int b = 0; b < HIST_BUCKETS; b++) 143 agg[b] += all[i].lat_hist[b]; 144 } 145 } 146 147 /* 148 * Walk @agg in order, returning the inclusive upper bound (in ns) of the 149 * log2 bucket where the running sum first reaches @target. 150 * 151 * A percentile is undefined with zero samples, and with very low sample 152 * counts integer truncation could make @target zero -- then "cum >= 0" 153 * would latch on the first (possibly empty) bucket. Callers must pass 154 * @target >= 1. 155 */ 156 static uint64_t bucket_at(const uint64_t agg[HIST_BUCKETS], uint64_t target) 157 { 158 uint64_t cum = 0; 159 160 for (int b = 0; b < HIST_BUCKETS; b++) { 161 /* HIST_BUCKETS <= 63, so (b + 1) is always a safe shift. */ 162 uint64_t upper = (1ULL << (b + 1)) - 1; 163 164 cum += agg[b]; 165 if (cum >= target) 166 return upper; 167 } 168 return 0; 169 } 170 171 static void compute_p50_p99(const uint64_t agg[HIST_BUCKETS], uint64_t writes, 172 uint64_t *p50, uint64_t *p99) 173 { 174 uint64_t p50_target, p99_target; 175 176 *p50 = *p99 = 0; 177 if (!writes) 178 return; 179 180 p50_target = writes * 50 / 100; 181 p99_target = writes * 99 / 100; 182 if (!p50_target) 183 p50_target = 1; 184 if (!p99_target) 185 p99_target = 1; 186 187 *p50 = bucket_at(agg, p50_target); 188 *p99 = bucket_at(agg, p99_target); 189 } 190 191 static void print_summary(int nw, int nr, const struct hist_totals *t, 192 uint64_t p50, uint64_t p99) 193 { 194 double sec = g_duration; 195 uint64_t avg_ns = t->writes ? t->lat_sum / t->writes : 0; 196 197 printf("config: writers=%d readers=%d msgsize=%zu duration=%d pipe_size=%d memory_pressure=%s\n", 198 nw, nr, g_msgsize, g_duration, g_pipe_size, 199 g_memory_pressure ? "yes" : "no"); 200 printf("writes: total=%llu rate=%.0f/s\n", 201 (unsigned long long)t->writes, (double)t->writes / sec); 202 printf("throughput_MBps: %.2f\n", 203 ((double)t->bytes / sec) / (1024.0 * 1024.0)); 204 printf("lat_avg_ns: %llu\n", (unsigned long long)avg_ns); 205 printf("lat_p50_ns_upper: %llu\n", (unsigned long long)p50); 206 printf("lat_p99_ns_upper: %llu\n", (unsigned long long)p99); 207 printf("lat_max_ns: %llu\n", (unsigned long long)t->lat_max); 208 } 209 210 static void summarize(struct wstats *all, int nw, int nr) 211 { 212 uint64_t agg[HIST_BUCKETS] = {0}; 213 struct hist_totals t; 214 uint64_t p50, p99; 215 216 aggregate_wstats(all, nw, agg, &t); 217 compute_p50_p99(agg, t.writes, &p50, &p99); 218 print_summary(nw, nr, &t, p50, p99); 219 } 220 221 /* 222 * Child branch of fork(): restore SIGPIPE to default (parent ignores it), 223 * exec stress-ng, and on failure write the reason into @hs_wr before 224 * exiting. The parent observes EOF on hs_wr (closed via O_CLOEXEC) when 225 * exec succeeds. 226 */ 227 static void stress_ng_child(int hs_wr) __attribute__((noreturn)); 228 static void stress_ng_child(int hs_wr) 229 { 230 char errbuf[256]; 231 232 signal(SIGPIPE, SIG_DFL); 233 execlp("stress-ng", "stress-ng", 234 "--vm", "4", "--vm-bytes", "80%", 235 "--vm-method", "all", 236 (char *)NULL); 237 snprintf(errbuf, sizeof(errbuf), 238 "exec stress-ng failed: %s\n", strerror(errno)); 239 (void)!write(hs_wr, errbuf, strlen(errbuf)); 240 _exit(127); 241 } 242 243 /* 244 * Read from the O_CLOEXEC handshake pipe. Anything readable means the 245 * child wrote an error before exec; EOF (n == 0) means the write-end 246 * closed because exec succeeded. Returns 0 on exec success, -1 if the 247 * child failed and was reaped. 248 */ 249 static int stress_ng_wait_handshake(int hs_rd, pid_t pid) 250 { 251 struct pollfd pfd = { .fd = hs_rd, .events = POLLIN }; 252 char errbuf[256]; 253 int status; 254 int ret; 255 256 ret = poll(&pfd, 1, 500); 257 if (ret <= 0) 258 return 0; 259 260 ssize_t n = read(hs_rd, errbuf, sizeof(errbuf) - 1); 261 262 if (n > 0) { 263 errbuf[n] = '\0'; 264 fputs(errbuf, stderr); 265 waitpid(pid, &status, 0); 266 return -1; 267 } 268 return 0; 269 } 270 271 static pid_t spawn_stress_ng(void) 272 { 273 int hs[2]; 274 pid_t pid; 275 276 /* 277 * Handshake pipe: child writes one byte and _exit()s on exec 278 * failure. On exec success the O_CLOEXEC flag closes the write 279 * end, which the parent observes as EOF. This makes the "is 280 * stress-ng on $PATH?" check fail fast rather than silently. 281 */ 282 if (pipe2(hs, O_CLOEXEC) < 0) { 283 perror("pipe2"); 284 return -1; 285 } 286 287 pid = fork(); 288 if (pid < 0) { 289 perror("fork"); 290 close(hs[0]); 291 close(hs[1]); 292 return -1; 293 } 294 if (pid == 0) { 295 close(hs[0]); 296 stress_ng_child(hs[1]); 297 } 298 299 close(hs[1]); 300 if (stress_ng_wait_handshake(hs[0], pid) < 0) { 301 close(hs[0]); 302 return -1; 303 } 304 close(hs[0]); 305 306 /* Give stress-ng a moment to map its VM regions before measuring. */ 307 sleep(1); 308 return pid; 309 } 310 311 static void kill_stress_ng(pid_t pid) 312 { 313 int status; 314 315 if (pid <= 0) 316 return; 317 kill(pid, SIGTERM); 318 for (int i = 0; i < 20; i++) { 319 if (waitpid(pid, &status, WNOHANG) > 0) 320 return; 321 usleep(100 * 1000); 322 } 323 kill(pid, SIGKILL); 324 waitpid(pid, &status, 0); 325 } 326 327 /* 328 * Allocate per-thread page-aligned buffers in main so a failed 329 * aligned_alloc() aborts the run before any thread starts. Workers used 330 * to allocate their own buffer and return NULL on failure, which left 331 * peers blocked in write()/read() with nobody to unblock them. 332 */ 333 static int alloc_thread_bufs(struct wstats *ws, int nw, 334 struct rstats *rs, int nr) 335 { 336 for (int i = 0; i < nw; i++) { 337 ws[i].buf = aligned_alloc(4096, g_msgsize); 338 if (!ws[i].buf) { 339 fprintf(stderr, "writer %d: aligned_alloc(%zu) failed\n", 340 i, g_msgsize); 341 return -1; 342 } 343 memset(ws[i].buf, 0xAA, g_msgsize); 344 } 345 for (int i = 0; i < nr; i++) { 346 rs[i].buf = aligned_alloc(4096, g_msgsize); 347 if (!rs[i].buf) { 348 fprintf(stderr, "reader %d: aligned_alloc(%zu) failed\n", 349 i, g_msgsize); 350 return -1; 351 } 352 } 353 return 0; 354 } 355 356 static void free_thread_bufs(struct wstats *ws, int nw, 357 struct rstats *rs, int nr) 358 { 359 if (ws) 360 for (int i = 0; i < nw; i++) 361 free(ws[i].buf); 362 if (rs) 363 for (int i = 0; i < nr; i++) 364 free(rs[i].buf); 365 } 366 367 static int start_readers(pthread_t *rt, struct rstats *rs, int nr, 368 int *created) 369 { 370 for (int i = 0; i < nr; i++) { 371 int err = pthread_create(&rt[i], NULL, reader, &rs[i]); 372 373 if (err) { 374 fprintf(stderr, "pthread_create reader %d: %s\n", 375 i, strerror(err)); 376 return -1; 377 } 378 (*created)++; 379 } 380 return 0; 381 } 382 383 static int start_writers(pthread_t *wt, struct wstats *ws, int nw, 384 int *created) 385 { 386 for (int i = 0; i < nw; i++) { 387 int err = pthread_create(&wt[i], NULL, writer, &ws[i]); 388 389 if (err) { 390 fprintf(stderr, "pthread_create writer %d: %s\n", 391 i, strerror(err)); 392 return -1; 393 } 394 (*created)++; 395 } 396 return 0; 397 } 398 399 static int open_bench_pipe(void) 400 { 401 if (pipe(g_pipe) < 0) { 402 perror("pipe"); 403 return -1; 404 } 405 if (fcntl(g_pipe[1], F_SETPIPE_SZ, g_pipe_size) < 0) 406 perror("F_SETPIPE_SZ (continuing)"); 407 return 0; 408 } 409 410 /* 411 * Normal termination: g_stop tells writers to leave the loop after the 412 * current write() returns. Closing the shared write-end fd means once 413 * the in-flight writes drain, readers see EOF and exit. Writers are not 414 * unblocked by EPIPE here -- g_pipe[0] stays open so readers can keep 415 * draining. 416 * 417 * Error path: some threads may have been created and others skipped, so 418 * writers could be blocked in write() with no reader making progress. 419 * Close both ends -- closing the read end is what delivers EPIPE to a 420 * blocked writer. 421 */ 422 static void stop_and_join(pthread_t *wt, int nw_created, 423 pthread_t *rt, int nr_created, int rc) 424 { 425 atomic_store(&g_stop, 1); 426 close(g_pipe[1]); 427 if (rc < 0) 428 close(g_pipe[0]); 429 for (int i = 0; i < nw_created; i++) 430 pthread_join(wt[i], NULL); 431 for (int i = 0; i < nr_created; i++) 432 pthread_join(rt[i], NULL); 433 if (rc == 0) 434 close(g_pipe[0]); 435 } 436 437 static int run_one(int nw, int nr) 438 { 439 pthread_t *wt = NULL, *rt = NULL; 440 struct wstats *ws = NULL; 441 struct rstats *rs = NULL; 442 int nw_created = 0, nr_created = 0; 443 int rc = 0; 444 445 atomic_store(&g_stop, 0); 446 447 if (open_bench_pipe() < 0) 448 return -1; 449 450 wt = calloc((size_t)nw, sizeof(*wt)); 451 rt = calloc((size_t)nr, sizeof(*rt)); 452 ws = calloc((size_t)nw, sizeof(*ws)); 453 rs = calloc((size_t)nr, sizeof(*rs)); 454 if (!wt || !rt || !ws || !rs) { 455 fprintf(stderr, "alloc failed\n"); 456 rc = -1; 457 goto teardown; 458 } 459 460 if (alloc_thread_bufs(ws, nw, rs, nr) < 0) { 461 rc = -1; 462 goto teardown; 463 } 464 465 if (start_readers(rt, rs, nr, &nr_created) < 0 || 466 start_writers(wt, ws, nw, &nw_created) < 0) { 467 rc = -1; 468 goto teardown; 469 } 470 471 sleep((unsigned int)g_duration); 472 473 teardown: 474 stop_and_join(wt, nw_created, rt, nr_created, rc); 475 476 if (rc == 0) { 477 summarize(ws, nw, nr); 478 fflush(stdout); 479 } 480 481 free_thread_bufs(ws, nw, rs, nr); 482 free(wt); 483 free(rt); 484 free(ws); 485 free(rs); 486 return rc; 487 } 488 489 static void usage(const char *prog) 490 { 491 fprintf(stderr, 492 "usage: %s [-w writers] [-r readers] [-s msgsize] [-d secs] [-p pipe_size] [--memory-pressure]\n" 493 " default: sweep writers={1,2,5} x readers={1,5,10}\n" 494 " --memory-pressure: spawn stress-ng (--vm 4 --vm-bytes 80%% --vm-method all) for the run\n", 495 prog); 496 } 497 498 static int parse_args(int argc, char **argv, 499 int *writers_override, int *readers_override) 500 { 501 static const struct option long_opts[] = { 502 {"memory-pressure", no_argument, NULL, 'M'}, 503 {0, 0, 0, 0}, 504 }; 505 int opt; 506 507 while ((opt = getopt_long(argc, argv, "w:r:s:d:p:", 508 long_opts, NULL)) != -1) { 509 switch (opt) { 510 case 'w': 511 *writers_override = atoi(optarg); 512 break; 513 case 'r': 514 *readers_override = atoi(optarg); 515 break; 516 case 's': 517 g_msgsize = (size_t)atol(optarg); 518 break; 519 case 'd': 520 g_duration = atoi(optarg); 521 break; 522 case 'p': 523 g_pipe_size = atoi(optarg); 524 break; 525 case 'M': 526 g_memory_pressure = 1; 527 break; 528 default: 529 usage(argv[0]); 530 return -1; 531 } 532 } 533 return 0; 534 } 535 536 /* 537 * aligned_alloc(4096, size) requires size to be a multiple of the 538 * alignment (C11); glibc returns NULL otherwise, which would make 539 * writer/reader threads silently exit and the run report zero writes. 540 * Validate up front instead. 541 */ 542 static int validate_args(void) 543 { 544 if (g_msgsize == 0 || g_msgsize % 4096 != 0) { 545 fprintf(stderr, 546 "msgsize must be a positive multiple of 4096 (got %zu)\n", 547 g_msgsize); 548 return -1; 549 } 550 if (g_duration <= 0) { 551 fprintf(stderr, "duration must be > 0 seconds (got %d)\n", 552 g_duration); 553 return -1; 554 } 555 if (g_pipe_size <= 0) { 556 fprintf(stderr, "pipe_size must be > 0 bytes (got %d)\n", 557 g_pipe_size); 558 return -1; 559 } 560 return 0; 561 } 562 563 static int run_sweep(void) 564 { 565 static const int writers_sweep[] = {1, 2, 5}; 566 static const int readers_sweep[] = {1, 5, 10}; 567 568 for (size_t i = 0; i < ARRAY_SIZE(writers_sweep); i++) { 569 for (size_t j = 0; j < ARRAY_SIZE(readers_sweep); j++) { 570 printf("---\n"); 571 if (run_one(writers_sweep[i], readers_sweep[j]) < 0) 572 return -1; 573 } 574 } 575 return 0; 576 } 577 578 int main(int argc, char **argv) 579 { 580 int writers_override = 0, readers_override = 0; 581 pid_t stress_pid = -1; 582 int rc = 0; 583 584 if (parse_args(argc, argv, &writers_override, &readers_override) < 0) 585 return 1; 586 if (validate_args() < 0) 587 return 1; 588 589 signal(SIGPIPE, SIG_IGN); 590 setvbuf(stdout, NULL, _IOLBF, 0); 591 setvbuf(stderr, NULL, _IOLBF, 0); 592 593 fprintf(stderr, "pid=%d\n", getpid()); 594 fflush(stderr); 595 596 if (g_memory_pressure) { 597 stress_pid = spawn_stress_ng(); 598 if (stress_pid < 0) { 599 fprintf(stderr, 600 "memory_pressure requested but stress-ng could not be spawned\n"); 601 return 1; 602 } 603 } 604 605 if (writers_override > 0 || readers_override > 0) { 606 int nw = writers_override > 0 ? writers_override : 1; 607 int nr = readers_override > 0 ? readers_override : 1; 608 609 rc = run_one(nw, nr) < 0 ? 1 : 0; 610 } else { 611 rc = run_sweep() < 0 ? 1 : 0; 612 } 613 614 kill_stress_ng(stress_pid); 615 return rc; 616 } 617