// SPDX-License-Identifier: GPL-2.0
/*
 * pipe_bench - exercise concurrent pipe operation
 *
 * N writer threads hammer a single pipe with multi-page writes; M reader
 * threads drain it. Each writer records its own write() latency histogram.
 * Multi-page writes (msgsize >= PAGE_SIZE) force the loop in
 * anon_pipe_write() to call alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT) under
 * pipe->mutex, which is the critical section the patch shrinks.
 *
 * By default the benchmark sweeps writers in {1, 2, 5} x readers in
 * {1, 5, 10} and prints one block per configuration so two runs (e.g.
 * baseline vs patched) can be diffed directly. Pass -w and/or -r to run a
 * single configuration instead (an omitted count defaults to 1). Pass
 * --memory-pressure to spawn stress-ng alongside the sweep so the per-page
 * alloc_page() path under pipe->mutex has to dip into reclaim.
 *
 * Copyright (c) 2026 Meta Platforms, Inc. and affiliates
 * Copyright (c) 2026 Breno Leitao <leitao@debian.org>
 */

#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <getopt.h>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))
#define HIST_BUCKETS 32

/* bucket_at() computes 1ULL << (b + 1) for b < HIST_BUCKETS. */
_Static_assert(HIST_BUCKETS <= 63, "HIST_BUCKETS too large for 64-bit shift");

static size_t g_msgsize = 16 * 4096;	/* bytes per write()/read() call */
static int g_duration = 3;		/* seconds each configuration runs */
static int g_pipe_size = 1024 * 1024;	/* requested via F_SETPIPE_SZ */
static int g_memory_pressure;		/* --memory-pressure given */

static atomic_int g_stop;		/* set to ask writers to stop */
static int g_pipe[2];			/* [0] = read end, [1] = write end */

/* Per-writer results; only its own thread writes it until joined. */
struct wstats {
	uint64_t writes;
	uint64_t bytes;
	uint64_t lat_sum_ns;
	uint64_t lat_max_ns;
	uint64_t lat_hist[HIST_BUCKETS];	/* log2(ns) latency buckets */
	char *buf;				/* source buffer, g_msgsize bytes */
};

/* Per-reader state: just the destination buffer. */
struct rstats {
	char *buf;
};

/* Totals across all writers of one configuration. */
struct hist_totals {
	uint64_t writes;
	uint64_t bytes;
	uint64_t lat_sum;
	uint64_t lat_max;
};

/* Current CLOCK_MONOTONIC time in nanoseconds. */
static inline uint64_t now_ns(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec;
}

/*
 * Histogram bucket for @v: floor(log2(v)), with 0 and 1 both mapping to
 * bucket 0 and anything >= 2^(HIST_BUCKETS - 1) clamped to the last bucket.
 */
static inline int log2_bucket(uint64_t v)
{
	int b = 0;

	if (!v)
		return 0;
	while (v >>= 1)
		b++;
	return b < HIST_BUCKETS ? b : HIST_BUCKETS - 1;
}

/*
 * Writer thread: issue g_msgsize-byte write()s until g_stop is set,
 * timing each one. Only successful writes are counted. EPIPE (read end
 * closed on the error path) or EBADF (write end closed by stop_and_join())
 * ends the loop; other errors such as EINTR are simply retried.
 */
static void *writer(void *arg)
{
	struct wstats *s = arg;

	while (!atomic_load_explicit(&g_stop, memory_order_relaxed)) {
		uint64_t t0 = now_ns();
		ssize_t n = write(g_pipe[1], s->buf, g_msgsize);
		uint64_t dt = now_ns() - t0;

		if (n > 0) {
			s->writes++;
			s->bytes += (uint64_t)n;
			s->lat_sum_ns += dt;
			if (dt > s->lat_max_ns)
				s->lat_max_ns = dt;
			s->lat_hist[log2_bucket(dt)]++;
		} else if (n < 0 && (errno == EPIPE || errno == EBADF)) {
			break;
		}
	}
	return NULL;
}

static void *reader(void *arg)
{
	struct rstats *s = arg;

	/*
	 * Drain until EOF (write end closed by main). g_stop is not checked
	 * here on purpose: writers may be blocked in write() with the pipe
	 * full when g_stop is set, so the reader must keep draining until
	 * main closes the write end.
	 */
	for (;;) {
		ssize_t n = read(g_pipe[0], s->buf, g_msgsize);

		if (n > 0)
			continue;
		/*
		 * An interrupted read() is not EOF. Bailing out here would
		 * leave writers blocked on a full pipe with fewer readers.
		 */
		if (n < 0 && errno == EINTR)
			continue;
		break;	/* EOF, or read end closed by main (error path) */
	}
	return NULL;
}

/*
 * Reduce per-writer stats @all[0..@nw) into @t (counts and latency sum
 * added, max taken) and @agg (per-bucket counts summed). Both outputs are
 * zeroed first, so callers need not pre-initialize either of them.
 */
static void aggregate_wstats(struct wstats *all, int nw,
			     uint64_t agg[HIST_BUCKETS],
			     struct hist_totals *t)
{
	memset(t, 0, sizeof(*t));
	memset(agg, 0, HIST_BUCKETS * sizeof(agg[0]));
	for (int i = 0; i < nw; i++) {
		t->writes += all[i].writes;
		t->bytes += all[i].bytes;
		t->lat_sum += all[i].lat_sum_ns;
		if (all[i].lat_max_ns > t->lat_max)
			t->lat_max = all[i].lat_max_ns;
		for (int b = 0; b < HIST_BUCKETS; b++)
			agg[b] += all[i].lat_hist[b];
	}
}

/*
 * Walk @agg in order, returning the inclusive upper bound (in ns) of the
 * log2 bucket where the running sum first reaches @target, i.e. the bucket
 * holding the @target-th smallest sample.
 *
 * Callers must pass 1 <= @target <= total samples in @agg: @target == 0
 * would latch on the first (possibly empty) bucket, and a @target beyond
 * the total is never reached, in which case 0 is returned.
 */
static uint64_t bucket_at(const uint64_t agg[HIST_BUCKETS], uint64_t target)
{
	uint64_t cum = 0;

	for (int b = 0; b < HIST_BUCKETS; b++) {
		/* Safe shift: HIST_BUCKETS <= 63 is checked at compile time. */
		uint64_t upper = (1ULL << (b + 1)) - 1;

		cum += agg[b];
		if (cum >= target)
			return upper;
	}
	return 0;
}

/*
 * Store in *@p50 / *@p99 the upper bound of the bucket holding the 50th /
 * 99th percentile sample, using the nearest-rank definition: the P-th
 * percentile is the ceil(P/100 * @writes)-th smallest sample. Rounding the
 * rank down instead under-reports. With 150 samples a floor rank of 148
 * covers only 98.7% of them, not 99%. With no samples both outputs are 0.
 */
static void compute_p50_p99(const uint64_t agg[HIST_BUCKETS], uint64_t writes,
			    uint64_t *p50, uint64_t *p99)
{
	*p50 = *p99 = 0;
	if (!writes)
		return;

	/* ceil(writes * pct / 100) lies in [1, writes] whenever writes >= 1. */
	*p50 = bucket_at(agg, (writes * 50 + 99) / 100);
	*p99 = bucket_at(agg, (writes * 99 + 99) / 100);
}

191*d29bd8efSBreno Leitao static void print_summary(int nw, int nr, const struct hist_totals *t, 192*d29bd8efSBreno Leitao uint64_t p50, uint64_t p99) 193*d29bd8efSBreno Leitao { 194*d29bd8efSBreno Leitao double sec = g_duration; 195*d29bd8efSBreno Leitao uint64_t avg_ns = t->writes ? t->lat_sum / t->writes : 0; 196*d29bd8efSBreno Leitao 197*d29bd8efSBreno Leitao printf("config: writers=%d readers=%d msgsize=%zu duration=%d pipe_size=%d memory_pressure=%s\n", 198*d29bd8efSBreno Leitao nw, nr, g_msgsize, g_duration, g_pipe_size, 199*d29bd8efSBreno Leitao g_memory_pressure ? "yes" : "no"); 200*d29bd8efSBreno Leitao printf("writes: total=%llu rate=%.0f/s\n", 201*d29bd8efSBreno Leitao (unsigned long long)t->writes, (double)t->writes / sec); 202*d29bd8efSBreno Leitao printf("throughput_MBps: %.2f\n", 203*d29bd8efSBreno Leitao ((double)t->bytes / sec) / (1024.0 * 1024.0)); 204*d29bd8efSBreno Leitao printf("lat_avg_ns: %llu\n", (unsigned long long)avg_ns); 205*d29bd8efSBreno Leitao printf("lat_p50_ns_upper: %llu\n", (unsigned long long)p50); 206*d29bd8efSBreno Leitao printf("lat_p99_ns_upper: %llu\n", (unsigned long long)p99); 207*d29bd8efSBreno Leitao printf("lat_max_ns: %llu\n", (unsigned long long)t->lat_max); 208*d29bd8efSBreno Leitao } 209*d29bd8efSBreno Leitao 210*d29bd8efSBreno Leitao static void summarize(struct wstats *all, int nw, int nr) 211*d29bd8efSBreno Leitao { 212*d29bd8efSBreno Leitao uint64_t agg[HIST_BUCKETS] = {0}; 213*d29bd8efSBreno Leitao struct hist_totals t; 214*d29bd8efSBreno Leitao uint64_t p50, p99; 215*d29bd8efSBreno Leitao 216*d29bd8efSBreno Leitao aggregate_wstats(all, nw, agg, &t); 217*d29bd8efSBreno Leitao compute_p50_p99(agg, t.writes, &p50, &p99); 218*d29bd8efSBreno Leitao print_summary(nw, nr, &t, p50, p99); 219*d29bd8efSBreno Leitao } 220*d29bd8efSBreno Leitao 221*d29bd8efSBreno Leitao /* 222*d29bd8efSBreno Leitao * Child branch of fork(): restore SIGPIPE to default (parent ignores it), 223*d29bd8efSBreno Leitao * exec stress-ng, and on 
failure write the reason into @hs_wr before 224*d29bd8efSBreno Leitao * exiting. The parent observes EOF on hs_wr (closed via O_CLOEXEC) when 225*d29bd8efSBreno Leitao * exec succeeds. 226*d29bd8efSBreno Leitao */ 227*d29bd8efSBreno Leitao static void stress_ng_child(int hs_wr) __attribute__((noreturn)); 228*d29bd8efSBreno Leitao static void stress_ng_child(int hs_wr) 229*d29bd8efSBreno Leitao { 230*d29bd8efSBreno Leitao char errbuf[256]; 231*d29bd8efSBreno Leitao 232*d29bd8efSBreno Leitao signal(SIGPIPE, SIG_DFL); 233*d29bd8efSBreno Leitao execlp("stress-ng", "stress-ng", 234*d29bd8efSBreno Leitao "--vm", "4", "--vm-bytes", "80%", 235*d29bd8efSBreno Leitao "--vm-method", "all", 236*d29bd8efSBreno Leitao (char *)NULL); 237*d29bd8efSBreno Leitao snprintf(errbuf, sizeof(errbuf), 238*d29bd8efSBreno Leitao "exec stress-ng failed: %s\n", strerror(errno)); 239*d29bd8efSBreno Leitao (void)!write(hs_wr, errbuf, strlen(errbuf)); 240*d29bd8efSBreno Leitao _exit(127); 241*d29bd8efSBreno Leitao } 242*d29bd8efSBreno Leitao 243*d29bd8efSBreno Leitao /* 244*d29bd8efSBreno Leitao * Read from the O_CLOEXEC handshake pipe. Anything readable means the 245*d29bd8efSBreno Leitao * child wrote an error before exec; EOF (n == 0) means the write-end 246*d29bd8efSBreno Leitao * closed because exec succeeded. Returns 0 on exec success, -1 if the 247*d29bd8efSBreno Leitao * child failed and was reaped. 
248*d29bd8efSBreno Leitao */ 249*d29bd8efSBreno Leitao static int stress_ng_wait_handshake(int hs_rd, pid_t pid) 250*d29bd8efSBreno Leitao { 251*d29bd8efSBreno Leitao struct pollfd pfd = { .fd = hs_rd, .events = POLLIN }; 252*d29bd8efSBreno Leitao char errbuf[256]; 253*d29bd8efSBreno Leitao int status; 254*d29bd8efSBreno Leitao int ret; 255*d29bd8efSBreno Leitao 256*d29bd8efSBreno Leitao ret = poll(&pfd, 1, 500); 257*d29bd8efSBreno Leitao if (ret <= 0) 258*d29bd8efSBreno Leitao return 0; 259*d29bd8efSBreno Leitao 260*d29bd8efSBreno Leitao ssize_t n = read(hs_rd, errbuf, sizeof(errbuf) - 1); 261*d29bd8efSBreno Leitao 262*d29bd8efSBreno Leitao if (n > 0) { 263*d29bd8efSBreno Leitao errbuf[n] = '\0'; 264*d29bd8efSBreno Leitao fputs(errbuf, stderr); 265*d29bd8efSBreno Leitao waitpid(pid, &status, 0); 266*d29bd8efSBreno Leitao return -1; 267*d29bd8efSBreno Leitao } 268*d29bd8efSBreno Leitao return 0; 269*d29bd8efSBreno Leitao } 270*d29bd8efSBreno Leitao 271*d29bd8efSBreno Leitao static pid_t spawn_stress_ng(void) 272*d29bd8efSBreno Leitao { 273*d29bd8efSBreno Leitao int hs[2]; 274*d29bd8efSBreno Leitao pid_t pid; 275*d29bd8efSBreno Leitao 276*d29bd8efSBreno Leitao /* 277*d29bd8efSBreno Leitao * Handshake pipe: child writes one byte and _exit()s on exec 278*d29bd8efSBreno Leitao * failure. On exec success the O_CLOEXEC flag closes the write 279*d29bd8efSBreno Leitao * end, which the parent observes as EOF. This makes the "is 280*d29bd8efSBreno Leitao * stress-ng on $PATH?" check fail fast rather than silently. 
281*d29bd8efSBreno Leitao */ 282*d29bd8efSBreno Leitao if (pipe2(hs, O_CLOEXEC) < 0) { 283*d29bd8efSBreno Leitao perror("pipe2"); 284*d29bd8efSBreno Leitao return -1; 285*d29bd8efSBreno Leitao } 286*d29bd8efSBreno Leitao 287*d29bd8efSBreno Leitao pid = fork(); 288*d29bd8efSBreno Leitao if (pid < 0) { 289*d29bd8efSBreno Leitao perror("fork"); 290*d29bd8efSBreno Leitao close(hs[0]); 291*d29bd8efSBreno Leitao close(hs[1]); 292*d29bd8efSBreno Leitao return -1; 293*d29bd8efSBreno Leitao } 294*d29bd8efSBreno Leitao if (pid == 0) { 295*d29bd8efSBreno Leitao close(hs[0]); 296*d29bd8efSBreno Leitao stress_ng_child(hs[1]); 297*d29bd8efSBreno Leitao } 298*d29bd8efSBreno Leitao 299*d29bd8efSBreno Leitao close(hs[1]); 300*d29bd8efSBreno Leitao if (stress_ng_wait_handshake(hs[0], pid) < 0) { 301*d29bd8efSBreno Leitao close(hs[0]); 302*d29bd8efSBreno Leitao return -1; 303*d29bd8efSBreno Leitao } 304*d29bd8efSBreno Leitao close(hs[0]); 305*d29bd8efSBreno Leitao 306*d29bd8efSBreno Leitao /* Give stress-ng a moment to map its VM regions before measuring. 
*/ 307*d29bd8efSBreno Leitao sleep(1); 308*d29bd8efSBreno Leitao return pid; 309*d29bd8efSBreno Leitao } 310*d29bd8efSBreno Leitao 311*d29bd8efSBreno Leitao static void kill_stress_ng(pid_t pid) 312*d29bd8efSBreno Leitao { 313*d29bd8efSBreno Leitao int status; 314*d29bd8efSBreno Leitao 315*d29bd8efSBreno Leitao if (pid <= 0) 316*d29bd8efSBreno Leitao return; 317*d29bd8efSBreno Leitao kill(pid, SIGTERM); 318*d29bd8efSBreno Leitao for (int i = 0; i < 20; i++) { 319*d29bd8efSBreno Leitao if (waitpid(pid, &status, WNOHANG) > 0) 320*d29bd8efSBreno Leitao return; 321*d29bd8efSBreno Leitao usleep(100 * 1000); 322*d29bd8efSBreno Leitao } 323*d29bd8efSBreno Leitao kill(pid, SIGKILL); 324*d29bd8efSBreno Leitao waitpid(pid, &status, 0); 325*d29bd8efSBreno Leitao } 326*d29bd8efSBreno Leitao 327*d29bd8efSBreno Leitao /* 328*d29bd8efSBreno Leitao * Allocate per-thread page-aligned buffers in main so a failed 329*d29bd8efSBreno Leitao * aligned_alloc() aborts the run before any thread starts. Workers used 330*d29bd8efSBreno Leitao * to allocate their own buffer and return NULL on failure, which left 331*d29bd8efSBreno Leitao * peers blocked in write()/read() with nobody to unblock them. 
332*d29bd8efSBreno Leitao */ 333*d29bd8efSBreno Leitao static int alloc_thread_bufs(struct wstats *ws, int nw, 334*d29bd8efSBreno Leitao struct rstats *rs, int nr) 335*d29bd8efSBreno Leitao { 336*d29bd8efSBreno Leitao for (int i = 0; i < nw; i++) { 337*d29bd8efSBreno Leitao ws[i].buf = aligned_alloc(4096, g_msgsize); 338*d29bd8efSBreno Leitao if (!ws[i].buf) { 339*d29bd8efSBreno Leitao fprintf(stderr, "writer %d: aligned_alloc(%zu) failed\n", 340*d29bd8efSBreno Leitao i, g_msgsize); 341*d29bd8efSBreno Leitao return -1; 342*d29bd8efSBreno Leitao } 343*d29bd8efSBreno Leitao memset(ws[i].buf, 0xAA, g_msgsize); 344*d29bd8efSBreno Leitao } 345*d29bd8efSBreno Leitao for (int i = 0; i < nr; i++) { 346*d29bd8efSBreno Leitao rs[i].buf = aligned_alloc(4096, g_msgsize); 347*d29bd8efSBreno Leitao if (!rs[i].buf) { 348*d29bd8efSBreno Leitao fprintf(stderr, "reader %d: aligned_alloc(%zu) failed\n", 349*d29bd8efSBreno Leitao i, g_msgsize); 350*d29bd8efSBreno Leitao return -1; 351*d29bd8efSBreno Leitao } 352*d29bd8efSBreno Leitao } 353*d29bd8efSBreno Leitao return 0; 354*d29bd8efSBreno Leitao } 355*d29bd8efSBreno Leitao 356*d29bd8efSBreno Leitao static void free_thread_bufs(struct wstats *ws, int nw, 357*d29bd8efSBreno Leitao struct rstats *rs, int nr) 358*d29bd8efSBreno Leitao { 359*d29bd8efSBreno Leitao if (ws) 360*d29bd8efSBreno Leitao for (int i = 0; i < nw; i++) 361*d29bd8efSBreno Leitao free(ws[i].buf); 362*d29bd8efSBreno Leitao if (rs) 363*d29bd8efSBreno Leitao for (int i = 0; i < nr; i++) 364*d29bd8efSBreno Leitao free(rs[i].buf); 365*d29bd8efSBreno Leitao } 366*d29bd8efSBreno Leitao 367*d29bd8efSBreno Leitao static int start_readers(pthread_t *rt, struct rstats *rs, int nr, 368*d29bd8efSBreno Leitao int *created) 369*d29bd8efSBreno Leitao { 370*d29bd8efSBreno Leitao for (int i = 0; i < nr; i++) { 371*d29bd8efSBreno Leitao int err = pthread_create(&rt[i], NULL, reader, &rs[i]); 372*d29bd8efSBreno Leitao 373*d29bd8efSBreno Leitao if (err) { 374*d29bd8efSBreno Leitao 
fprintf(stderr, "pthread_create reader %d: %s\n", 375*d29bd8efSBreno Leitao i, strerror(err)); 376*d29bd8efSBreno Leitao return -1; 377*d29bd8efSBreno Leitao } 378*d29bd8efSBreno Leitao (*created)++; 379*d29bd8efSBreno Leitao } 380*d29bd8efSBreno Leitao return 0; 381*d29bd8efSBreno Leitao } 382*d29bd8efSBreno Leitao 383*d29bd8efSBreno Leitao static int start_writers(pthread_t *wt, struct wstats *ws, int nw, 384*d29bd8efSBreno Leitao int *created) 385*d29bd8efSBreno Leitao { 386*d29bd8efSBreno Leitao for (int i = 0; i < nw; i++) { 387*d29bd8efSBreno Leitao int err = pthread_create(&wt[i], NULL, writer, &ws[i]); 388*d29bd8efSBreno Leitao 389*d29bd8efSBreno Leitao if (err) { 390*d29bd8efSBreno Leitao fprintf(stderr, "pthread_create writer %d: %s\n", 391*d29bd8efSBreno Leitao i, strerror(err)); 392*d29bd8efSBreno Leitao return -1; 393*d29bd8efSBreno Leitao } 394*d29bd8efSBreno Leitao (*created)++; 395*d29bd8efSBreno Leitao } 396*d29bd8efSBreno Leitao return 0; 397*d29bd8efSBreno Leitao } 398*d29bd8efSBreno Leitao 399*d29bd8efSBreno Leitao static int open_bench_pipe(void) 400*d29bd8efSBreno Leitao { 401*d29bd8efSBreno Leitao if (pipe(g_pipe) < 0) { 402*d29bd8efSBreno Leitao perror("pipe"); 403*d29bd8efSBreno Leitao return -1; 404*d29bd8efSBreno Leitao } 405*d29bd8efSBreno Leitao if (fcntl(g_pipe[1], F_SETPIPE_SZ, g_pipe_size) < 0) 406*d29bd8efSBreno Leitao perror("F_SETPIPE_SZ (continuing)"); 407*d29bd8efSBreno Leitao return 0; 408*d29bd8efSBreno Leitao } 409*d29bd8efSBreno Leitao 410*d29bd8efSBreno Leitao /* 411*d29bd8efSBreno Leitao * Normal termination: g_stop tells writers to leave the loop after the 412*d29bd8efSBreno Leitao * current write() returns. Closing the shared write-end fd means once 413*d29bd8efSBreno Leitao * the in-flight writes drain, readers see EOF and exit. Writers are not 414*d29bd8efSBreno Leitao * unblocked by EPIPE here -- g_pipe[0] stays open so readers can keep 415*d29bd8efSBreno Leitao * draining. 
416*d29bd8efSBreno Leitao * 417*d29bd8efSBreno Leitao * Error path: some threads may have been created and others skipped, so 418*d29bd8efSBreno Leitao * writers could be blocked in write() with no reader making progress. 419*d29bd8efSBreno Leitao * Close both ends -- closing the read end is what delivers EPIPE to a 420*d29bd8efSBreno Leitao * blocked writer. 421*d29bd8efSBreno Leitao */ 422*d29bd8efSBreno Leitao static void stop_and_join(pthread_t *wt, int nw_created, 423*d29bd8efSBreno Leitao pthread_t *rt, int nr_created, int rc) 424*d29bd8efSBreno Leitao { 425*d29bd8efSBreno Leitao atomic_store(&g_stop, 1); 426*d29bd8efSBreno Leitao close(g_pipe[1]); 427*d29bd8efSBreno Leitao if (rc < 0) 428*d29bd8efSBreno Leitao close(g_pipe[0]); 429*d29bd8efSBreno Leitao for (int i = 0; i < nw_created; i++) 430*d29bd8efSBreno Leitao pthread_join(wt[i], NULL); 431*d29bd8efSBreno Leitao for (int i = 0; i < nr_created; i++) 432*d29bd8efSBreno Leitao pthread_join(rt[i], NULL); 433*d29bd8efSBreno Leitao if (rc == 0) 434*d29bd8efSBreno Leitao close(g_pipe[0]); 435*d29bd8efSBreno Leitao } 436*d29bd8efSBreno Leitao 437*d29bd8efSBreno Leitao static int run_one(int nw, int nr) 438*d29bd8efSBreno Leitao { 439*d29bd8efSBreno Leitao pthread_t *wt = NULL, *rt = NULL; 440*d29bd8efSBreno Leitao struct wstats *ws = NULL; 441*d29bd8efSBreno Leitao struct rstats *rs = NULL; 442*d29bd8efSBreno Leitao int nw_created = 0, nr_created = 0; 443*d29bd8efSBreno Leitao int rc = 0; 444*d29bd8efSBreno Leitao 445*d29bd8efSBreno Leitao atomic_store(&g_stop, 0); 446*d29bd8efSBreno Leitao 447*d29bd8efSBreno Leitao if (open_bench_pipe() < 0) 448*d29bd8efSBreno Leitao return -1; 449*d29bd8efSBreno Leitao 450*d29bd8efSBreno Leitao wt = calloc((size_t)nw, sizeof(*wt)); 451*d29bd8efSBreno Leitao rt = calloc((size_t)nr, sizeof(*rt)); 452*d29bd8efSBreno Leitao ws = calloc((size_t)nw, sizeof(*ws)); 453*d29bd8efSBreno Leitao rs = calloc((size_t)nr, sizeof(*rs)); 454*d29bd8efSBreno Leitao if (!wt || !rt || !ws || !rs) 
{ 455*d29bd8efSBreno Leitao fprintf(stderr, "alloc failed\n"); 456*d29bd8efSBreno Leitao rc = -1; 457*d29bd8efSBreno Leitao goto teardown; 458*d29bd8efSBreno Leitao } 459*d29bd8efSBreno Leitao 460*d29bd8efSBreno Leitao if (alloc_thread_bufs(ws, nw, rs, nr) < 0) { 461*d29bd8efSBreno Leitao rc = -1; 462*d29bd8efSBreno Leitao goto teardown; 463*d29bd8efSBreno Leitao } 464*d29bd8efSBreno Leitao 465*d29bd8efSBreno Leitao if (start_readers(rt, rs, nr, &nr_created) < 0 || 466*d29bd8efSBreno Leitao start_writers(wt, ws, nw, &nw_created) < 0) { 467*d29bd8efSBreno Leitao rc = -1; 468*d29bd8efSBreno Leitao goto teardown; 469*d29bd8efSBreno Leitao } 470*d29bd8efSBreno Leitao 471*d29bd8efSBreno Leitao sleep((unsigned int)g_duration); 472*d29bd8efSBreno Leitao 473*d29bd8efSBreno Leitao teardown: 474*d29bd8efSBreno Leitao stop_and_join(wt, nw_created, rt, nr_created, rc); 475*d29bd8efSBreno Leitao 476*d29bd8efSBreno Leitao if (rc == 0) { 477*d29bd8efSBreno Leitao summarize(ws, nw, nr); 478*d29bd8efSBreno Leitao fflush(stdout); 479*d29bd8efSBreno Leitao } 480*d29bd8efSBreno Leitao 481*d29bd8efSBreno Leitao free_thread_bufs(ws, nw, rs, nr); 482*d29bd8efSBreno Leitao free(wt); 483*d29bd8efSBreno Leitao free(rt); 484*d29bd8efSBreno Leitao free(ws); 485*d29bd8efSBreno Leitao free(rs); 486*d29bd8efSBreno Leitao return rc; 487*d29bd8efSBreno Leitao } 488*d29bd8efSBreno Leitao 489*d29bd8efSBreno Leitao static void usage(const char *prog) 490*d29bd8efSBreno Leitao { 491*d29bd8efSBreno Leitao fprintf(stderr, 492*d29bd8efSBreno Leitao "usage: %s [-w writers] [-r readers] [-s msgsize] [-d secs] [-p pipe_size] [--memory-pressure]\n" 493*d29bd8efSBreno Leitao " default: sweep writers={1,2,5} x readers={1,5,10}\n" 494*d29bd8efSBreno Leitao " --memory-pressure: spawn stress-ng (--vm 4 --vm-bytes 80%% --vm-method all) for the run\n", 495*d29bd8efSBreno Leitao prog); 496*d29bd8efSBreno Leitao } 497*d29bd8efSBreno Leitao 498*d29bd8efSBreno Leitao static int parse_args(int argc, char **argv, 
499*d29bd8efSBreno Leitao int *writers_override, int *readers_override) 500*d29bd8efSBreno Leitao { 501*d29bd8efSBreno Leitao static const struct option long_opts[] = { 502*d29bd8efSBreno Leitao {"memory-pressure", no_argument, NULL, 'M'}, 503*d29bd8efSBreno Leitao {0, 0, 0, 0}, 504*d29bd8efSBreno Leitao }; 505*d29bd8efSBreno Leitao int opt; 506*d29bd8efSBreno Leitao 507*d29bd8efSBreno Leitao while ((opt = getopt_long(argc, argv, "w:r:s:d:p:", 508*d29bd8efSBreno Leitao long_opts, NULL)) != -1) { 509*d29bd8efSBreno Leitao switch (opt) { 510*d29bd8efSBreno Leitao case 'w': 511*d29bd8efSBreno Leitao *writers_override = atoi(optarg); 512*d29bd8efSBreno Leitao break; 513*d29bd8efSBreno Leitao case 'r': 514*d29bd8efSBreno Leitao *readers_override = atoi(optarg); 515*d29bd8efSBreno Leitao break; 516*d29bd8efSBreno Leitao case 's': 517*d29bd8efSBreno Leitao g_msgsize = (size_t)atol(optarg); 518*d29bd8efSBreno Leitao break; 519*d29bd8efSBreno Leitao case 'd': 520*d29bd8efSBreno Leitao g_duration = atoi(optarg); 521*d29bd8efSBreno Leitao break; 522*d29bd8efSBreno Leitao case 'p': 523*d29bd8efSBreno Leitao g_pipe_size = atoi(optarg); 524*d29bd8efSBreno Leitao break; 525*d29bd8efSBreno Leitao case 'M': 526*d29bd8efSBreno Leitao g_memory_pressure = 1; 527*d29bd8efSBreno Leitao break; 528*d29bd8efSBreno Leitao default: 529*d29bd8efSBreno Leitao usage(argv[0]); 530*d29bd8efSBreno Leitao return -1; 531*d29bd8efSBreno Leitao } 532*d29bd8efSBreno Leitao } 533*d29bd8efSBreno Leitao return 0; 534*d29bd8efSBreno Leitao } 535*d29bd8efSBreno Leitao 536*d29bd8efSBreno Leitao /* 537*d29bd8efSBreno Leitao * aligned_alloc(4096, size) requires size to be a multiple of the 538*d29bd8efSBreno Leitao * alignment (C11); glibc returns NULL otherwise, which would make 539*d29bd8efSBreno Leitao * writer/reader threads silently exit and the run report zero writes. 540*d29bd8efSBreno Leitao * Validate up front instead. 
541*d29bd8efSBreno Leitao */ 542*d29bd8efSBreno Leitao static int validate_args(void) 543*d29bd8efSBreno Leitao { 544*d29bd8efSBreno Leitao if (g_msgsize == 0 || g_msgsize % 4096 != 0) { 545*d29bd8efSBreno Leitao fprintf(stderr, 546*d29bd8efSBreno Leitao "msgsize must be a positive multiple of 4096 (got %zu)\n", 547*d29bd8efSBreno Leitao g_msgsize); 548*d29bd8efSBreno Leitao return -1; 549*d29bd8efSBreno Leitao } 550*d29bd8efSBreno Leitao if (g_duration <= 0) { 551*d29bd8efSBreno Leitao fprintf(stderr, "duration must be > 0 seconds (got %d)\n", 552*d29bd8efSBreno Leitao g_duration); 553*d29bd8efSBreno Leitao return -1; 554*d29bd8efSBreno Leitao } 555*d29bd8efSBreno Leitao if (g_pipe_size <= 0) { 556*d29bd8efSBreno Leitao fprintf(stderr, "pipe_size must be > 0 bytes (got %d)\n", 557*d29bd8efSBreno Leitao g_pipe_size); 558*d29bd8efSBreno Leitao return -1; 559*d29bd8efSBreno Leitao } 560*d29bd8efSBreno Leitao return 0; 561*d29bd8efSBreno Leitao } 562*d29bd8efSBreno Leitao 563*d29bd8efSBreno Leitao static int run_sweep(void) 564*d29bd8efSBreno Leitao { 565*d29bd8efSBreno Leitao static const int writers_sweep[] = {1, 2, 5}; 566*d29bd8efSBreno Leitao static const int readers_sweep[] = {1, 5, 10}; 567*d29bd8efSBreno Leitao 568*d29bd8efSBreno Leitao for (size_t i = 0; i < ARRAY_SIZE(writers_sweep); i++) { 569*d29bd8efSBreno Leitao for (size_t j = 0; j < ARRAY_SIZE(readers_sweep); j++) { 570*d29bd8efSBreno Leitao printf("---\n"); 571*d29bd8efSBreno Leitao if (run_one(writers_sweep[i], readers_sweep[j]) < 0) 572*d29bd8efSBreno Leitao return -1; 573*d29bd8efSBreno Leitao } 574*d29bd8efSBreno Leitao } 575*d29bd8efSBreno Leitao return 0; 576*d29bd8efSBreno Leitao } 577*d29bd8efSBreno Leitao 578*d29bd8efSBreno Leitao int main(int argc, char **argv) 579*d29bd8efSBreno Leitao { 580*d29bd8efSBreno Leitao int writers_override = 0, readers_override = 0; 581*d29bd8efSBreno Leitao pid_t stress_pid = -1; 582*d29bd8efSBreno Leitao int rc = 0; 583*d29bd8efSBreno Leitao 584*d29bd8efSBreno 
Leitao if (parse_args(argc, argv, &writers_override, &readers_override) < 0) 585*d29bd8efSBreno Leitao return 1; 586*d29bd8efSBreno Leitao if (validate_args() < 0) 587*d29bd8efSBreno Leitao return 1; 588*d29bd8efSBreno Leitao 589*d29bd8efSBreno Leitao signal(SIGPIPE, SIG_IGN); 590*d29bd8efSBreno Leitao setvbuf(stdout, NULL, _IOLBF, 0); 591*d29bd8efSBreno Leitao setvbuf(stderr, NULL, _IOLBF, 0); 592*d29bd8efSBreno Leitao 593*d29bd8efSBreno Leitao fprintf(stderr, "pid=%d\n", getpid()); 594*d29bd8efSBreno Leitao fflush(stderr); 595*d29bd8efSBreno Leitao 596*d29bd8efSBreno Leitao if (g_memory_pressure) { 597*d29bd8efSBreno Leitao stress_pid = spawn_stress_ng(); 598*d29bd8efSBreno Leitao if (stress_pid < 0) { 599*d29bd8efSBreno Leitao fprintf(stderr, 600*d29bd8efSBreno Leitao "memory_pressure requested but stress-ng could not be spawned\n"); 601*d29bd8efSBreno Leitao return 1; 602*d29bd8efSBreno Leitao } 603*d29bd8efSBreno Leitao } 604*d29bd8efSBreno Leitao 605*d29bd8efSBreno Leitao if (writers_override > 0 || readers_override > 0) { 606*d29bd8efSBreno Leitao int nw = writers_override > 0 ? writers_override : 1; 607*d29bd8efSBreno Leitao int nr = readers_override > 0 ? readers_override : 1; 608*d29bd8efSBreno Leitao 609*d29bd8efSBreno Leitao rc = run_one(nw, nr) < 0 ? 1 : 0; 610*d29bd8efSBreno Leitao } else { 611*d29bd8efSBreno Leitao rc = run_sweep() < 0 ? 1 : 0; 612*d29bd8efSBreno Leitao } 613*d29bd8efSBreno Leitao 614*d29bd8efSBreno Leitao kill_stress_ng(stress_pid); 615*d29bd8efSBreno Leitao return rc; 616*d29bd8efSBreno Leitao } 617