xref: /linux/tools/testing/selftests/pipe/pipe_bench.c (revision d29bd8efe16239608b60173a7e8d842bcbfcd9e9)
1*d29bd8efSBreno Leitao // SPDX-License-Identifier: GPL-2.0
2*d29bd8efSBreno Leitao /*
3*d29bd8efSBreno Leitao  * pipe_bench - exercise concurrent pipe operation
4*d29bd8efSBreno Leitao  *
5*d29bd8efSBreno Leitao  * N writer threads hammer a single pipe with multi-page writes; M reader
6*d29bd8efSBreno Leitao  * threads drain it. Each writer records its own write() latency histogram.
7*d29bd8efSBreno Leitao  * Multi-page writes (msgsize >= PAGE_SIZE) force the loop in
8*d29bd8efSBreno Leitao  * anon_pipe_write() to call alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT) under
9*d29bd8efSBreno Leitao  * pipe->mutex, which is the critical section the patch shrinks.
10*d29bd8efSBreno Leitao  *
11*d29bd8efSBreno Leitao  * By default the benchmark sweeps writers in {1, 2, 5} x readers in
12*d29bd8efSBreno Leitao  * {1, 5, 10} and prints one block per configuration so two runs (e.g.
13*d29bd8efSBreno Leitao  * baseline vs patched) can be diffed directly. Pass -w and -r to run a
14*d29bd8efSBreno Leitao  * single configuration instead. Pass --memory-pressure to spawn stress-ng
15*d29bd8efSBreno Leitao  * alongside the sweep so the per-page alloc_page() path under pipe->mutex
16*d29bd8efSBreno Leitao  * has to dip into reclaim.
17*d29bd8efSBreno Leitao  *
18*d29bd8efSBreno Leitao  * Copyright (c) 2026 Meta Platforms, Inc. and affiliates
19*d29bd8efSBreno Leitao  * Copyright (c) 2026 Breno Leitao <leitao@debian.org>
20*d29bd8efSBreno Leitao  */
21*d29bd8efSBreno Leitao 
22*d29bd8efSBreno Leitao #define _GNU_SOURCE
23*d29bd8efSBreno Leitao #include <errno.h>
24*d29bd8efSBreno Leitao #include <fcntl.h>
25*d29bd8efSBreno Leitao #include <getopt.h>
26*d29bd8efSBreno Leitao #include <poll.h>
27*d29bd8efSBreno Leitao #include <pthread.h>
28*d29bd8efSBreno Leitao #include <signal.h>
29*d29bd8efSBreno Leitao #include <stdatomic.h>
30*d29bd8efSBreno Leitao #include <stdint.h>
31*d29bd8efSBreno Leitao #include <stdio.h>
32*d29bd8efSBreno Leitao #include <stdlib.h>
33*d29bd8efSBreno Leitao #include <string.h>
34*d29bd8efSBreno Leitao #include <sys/wait.h>
35*d29bd8efSBreno Leitao #include <time.h>
36*d29bd8efSBreno Leitao #include <unistd.h>
37*d29bd8efSBreno Leitao 
38*d29bd8efSBreno Leitao #define ARRAY_SIZE(a)	(sizeof(a) / sizeof((a)[0]))
39*d29bd8efSBreno Leitao #define HIST_BUCKETS	32
40*d29bd8efSBreno Leitao 
41*d29bd8efSBreno Leitao static size_t g_msgsize = 16 * 4096;
42*d29bd8efSBreno Leitao static int g_duration = 3;
43*d29bd8efSBreno Leitao static int g_pipe_size = 1024 * 1024;
44*d29bd8efSBreno Leitao static int g_memory_pressure;
45*d29bd8efSBreno Leitao 
46*d29bd8efSBreno Leitao static atomic_int g_stop;
47*d29bd8efSBreno Leitao static int g_pipe[2];
48*d29bd8efSBreno Leitao 
49*d29bd8efSBreno Leitao struct wstats {
50*d29bd8efSBreno Leitao 	uint64_t writes;
51*d29bd8efSBreno Leitao 	uint64_t bytes;
52*d29bd8efSBreno Leitao 	uint64_t lat_sum_ns;
53*d29bd8efSBreno Leitao 	uint64_t lat_max_ns;
54*d29bd8efSBreno Leitao 	uint64_t lat_hist[HIST_BUCKETS];
55*d29bd8efSBreno Leitao 	char *buf;
56*d29bd8efSBreno Leitao };
57*d29bd8efSBreno Leitao 
58*d29bd8efSBreno Leitao struct rstats {
59*d29bd8efSBreno Leitao 	char *buf;
60*d29bd8efSBreno Leitao };
61*d29bd8efSBreno Leitao 
62*d29bd8efSBreno Leitao struct hist_totals {
63*d29bd8efSBreno Leitao 	uint64_t writes;
64*d29bd8efSBreno Leitao 	uint64_t bytes;
65*d29bd8efSBreno Leitao 	uint64_t lat_sum;
66*d29bd8efSBreno Leitao 	uint64_t lat_max;
67*d29bd8efSBreno Leitao };
68*d29bd8efSBreno Leitao 
69*d29bd8efSBreno Leitao static inline uint64_t now_ns(void)
70*d29bd8efSBreno Leitao {
71*d29bd8efSBreno Leitao 	struct timespec ts;
72*d29bd8efSBreno Leitao 
73*d29bd8efSBreno Leitao 	clock_gettime(CLOCK_MONOTONIC, &ts);
74*d29bd8efSBreno Leitao 	return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec;
75*d29bd8efSBreno Leitao }
76*d29bd8efSBreno Leitao 
77*d29bd8efSBreno Leitao static inline int log2_bucket(uint64_t v)
78*d29bd8efSBreno Leitao {
79*d29bd8efSBreno Leitao 	int b = 0;
80*d29bd8efSBreno Leitao 
81*d29bd8efSBreno Leitao 	if (!v)
82*d29bd8efSBreno Leitao 		return 0;
83*d29bd8efSBreno Leitao 	while (v >>= 1)
84*d29bd8efSBreno Leitao 		b++;
85*d29bd8efSBreno Leitao 	return b < HIST_BUCKETS ? b : HIST_BUCKETS - 1;
86*d29bd8efSBreno Leitao }
87*d29bd8efSBreno Leitao 
88*d29bd8efSBreno Leitao static void *writer(void *arg)
89*d29bd8efSBreno Leitao {
90*d29bd8efSBreno Leitao 	struct wstats *s = arg;
91*d29bd8efSBreno Leitao 
92*d29bd8efSBreno Leitao 	while (!atomic_load_explicit(&g_stop, memory_order_relaxed)) {
93*d29bd8efSBreno Leitao 		uint64_t t0 = now_ns();
94*d29bd8efSBreno Leitao 		ssize_t n = write(g_pipe[1], s->buf, g_msgsize);
95*d29bd8efSBreno Leitao 		uint64_t dt = now_ns() - t0;
96*d29bd8efSBreno Leitao 
97*d29bd8efSBreno Leitao 		if (n > 0) {
98*d29bd8efSBreno Leitao 			s->writes++;
99*d29bd8efSBreno Leitao 			s->bytes += (uint64_t)n;
100*d29bd8efSBreno Leitao 			s->lat_sum_ns += dt;
101*d29bd8efSBreno Leitao 			if (dt > s->lat_max_ns)
102*d29bd8efSBreno Leitao 				s->lat_max_ns = dt;
103*d29bd8efSBreno Leitao 			s->lat_hist[log2_bucket(dt)]++;
104*d29bd8efSBreno Leitao 		} else if (n < 0 && (errno == EPIPE || errno == EBADF)) {
105*d29bd8efSBreno Leitao 			break;
106*d29bd8efSBreno Leitao 		}
107*d29bd8efSBreno Leitao 	}
108*d29bd8efSBreno Leitao 	return NULL;
109*d29bd8efSBreno Leitao }
110*d29bd8efSBreno Leitao 
111*d29bd8efSBreno Leitao static void *reader(void *arg)
112*d29bd8efSBreno Leitao {
113*d29bd8efSBreno Leitao 	struct rstats *s = arg;
114*d29bd8efSBreno Leitao 
115*d29bd8efSBreno Leitao 	/*
116*d29bd8efSBreno Leitao 	 * Drain until EOF (write end closed by main). g_stop is not checked
117*d29bd8efSBreno Leitao 	 * here on purpose: writers may be blocked in write() with the pipe
118*d29bd8efSBreno Leitao 	 * full when g_stop is set, so the reader must keep draining until
119*d29bd8efSBreno Leitao 	 * main closes the write end.
120*d29bd8efSBreno Leitao 	 */
121*d29bd8efSBreno Leitao 	for (;;) {
122*d29bd8efSBreno Leitao 		ssize_t n = read(g_pipe[0], s->buf, g_msgsize);
123*d29bd8efSBreno Leitao 
124*d29bd8efSBreno Leitao 		if (n <= 0)
125*d29bd8efSBreno Leitao 			break;
126*d29bd8efSBreno Leitao 	}
127*d29bd8efSBreno Leitao 	return NULL;
128*d29bd8efSBreno Leitao }
129*d29bd8efSBreno Leitao 
130*d29bd8efSBreno Leitao /* Sum per-writer stats and per-bucket counts into the caller's aggregates. */
131*d29bd8efSBreno Leitao static void aggregate_wstats(struct wstats *all, int nw,
132*d29bd8efSBreno Leitao 			     uint64_t agg[HIST_BUCKETS],
133*d29bd8efSBreno Leitao 			     struct hist_totals *t)
134*d29bd8efSBreno Leitao {
135*d29bd8efSBreno Leitao 	memset(t, 0, sizeof(*t));
136*d29bd8efSBreno Leitao 	for (int i = 0; i < nw; i++) {
137*d29bd8efSBreno Leitao 		t->writes += all[i].writes;
138*d29bd8efSBreno Leitao 		t->bytes += all[i].bytes;
139*d29bd8efSBreno Leitao 		t->lat_sum += all[i].lat_sum_ns;
140*d29bd8efSBreno Leitao 		if (all[i].lat_max_ns > t->lat_max)
141*d29bd8efSBreno Leitao 			t->lat_max = all[i].lat_max_ns;
142*d29bd8efSBreno Leitao 		for (int b = 0; b < HIST_BUCKETS; b++)
143*d29bd8efSBreno Leitao 			agg[b] += all[i].lat_hist[b];
144*d29bd8efSBreno Leitao 	}
145*d29bd8efSBreno Leitao }
146*d29bd8efSBreno Leitao 
147*d29bd8efSBreno Leitao /*
148*d29bd8efSBreno Leitao  * Walk @agg in order, returning the inclusive upper bound (in ns) of the
149*d29bd8efSBreno Leitao  * log2 bucket where the running sum first reaches @target.
150*d29bd8efSBreno Leitao  *
151*d29bd8efSBreno Leitao  * A percentile is undefined with zero samples, and with very low sample
152*d29bd8efSBreno Leitao  * counts integer truncation could make @target zero -- then "cum >= 0"
153*d29bd8efSBreno Leitao  * would latch on the first (possibly empty) bucket. Callers must pass
154*d29bd8efSBreno Leitao  * @target >= 1.
155*d29bd8efSBreno Leitao  */
156*d29bd8efSBreno Leitao static uint64_t bucket_at(const uint64_t agg[HIST_BUCKETS], uint64_t target)
157*d29bd8efSBreno Leitao {
158*d29bd8efSBreno Leitao 	uint64_t cum = 0;
159*d29bd8efSBreno Leitao 
160*d29bd8efSBreno Leitao 	for (int b = 0; b < HIST_BUCKETS; b++) {
161*d29bd8efSBreno Leitao 		/* HIST_BUCKETS <= 63, so (b + 1) is always a safe shift. */
162*d29bd8efSBreno Leitao 		uint64_t upper = (1ULL << (b + 1)) - 1;
163*d29bd8efSBreno Leitao 
164*d29bd8efSBreno Leitao 		cum += agg[b];
165*d29bd8efSBreno Leitao 		if (cum >= target)
166*d29bd8efSBreno Leitao 			return upper;
167*d29bd8efSBreno Leitao 	}
168*d29bd8efSBreno Leitao 	return 0;
169*d29bd8efSBreno Leitao }
170*d29bd8efSBreno Leitao 
171*d29bd8efSBreno Leitao static void compute_p50_p99(const uint64_t agg[HIST_BUCKETS], uint64_t writes,
172*d29bd8efSBreno Leitao 			    uint64_t *p50, uint64_t *p99)
173*d29bd8efSBreno Leitao {
174*d29bd8efSBreno Leitao 	uint64_t p50_target, p99_target;
175*d29bd8efSBreno Leitao 
176*d29bd8efSBreno Leitao 	*p50 = *p99 = 0;
177*d29bd8efSBreno Leitao 	if (!writes)
178*d29bd8efSBreno Leitao 		return;
179*d29bd8efSBreno Leitao 
180*d29bd8efSBreno Leitao 	p50_target = writes * 50 / 100;
181*d29bd8efSBreno Leitao 	p99_target = writes * 99 / 100;
182*d29bd8efSBreno Leitao 	if (!p50_target)
183*d29bd8efSBreno Leitao 		p50_target = 1;
184*d29bd8efSBreno Leitao 	if (!p99_target)
185*d29bd8efSBreno Leitao 		p99_target = 1;
186*d29bd8efSBreno Leitao 
187*d29bd8efSBreno Leitao 	*p50 = bucket_at(agg, p50_target);
188*d29bd8efSBreno Leitao 	*p99 = bucket_at(agg, p99_target);
189*d29bd8efSBreno Leitao }
190*d29bd8efSBreno Leitao 
191*d29bd8efSBreno Leitao static void print_summary(int nw, int nr, const struct hist_totals *t,
192*d29bd8efSBreno Leitao 			  uint64_t p50, uint64_t p99)
193*d29bd8efSBreno Leitao {
194*d29bd8efSBreno Leitao 	double sec = g_duration;
195*d29bd8efSBreno Leitao 	uint64_t avg_ns = t->writes ? t->lat_sum / t->writes : 0;
196*d29bd8efSBreno Leitao 
197*d29bd8efSBreno Leitao 	printf("config: writers=%d readers=%d msgsize=%zu duration=%d pipe_size=%d memory_pressure=%s\n",
198*d29bd8efSBreno Leitao 	       nw, nr, g_msgsize, g_duration, g_pipe_size,
199*d29bd8efSBreno Leitao 	       g_memory_pressure ? "yes" : "no");
200*d29bd8efSBreno Leitao 	printf("writes: total=%llu rate=%.0f/s\n",
201*d29bd8efSBreno Leitao 	       (unsigned long long)t->writes, (double)t->writes / sec);
202*d29bd8efSBreno Leitao 	printf("throughput_MBps: %.2f\n",
203*d29bd8efSBreno Leitao 	       ((double)t->bytes / sec) / (1024.0 * 1024.0));
204*d29bd8efSBreno Leitao 	printf("lat_avg_ns: %llu\n", (unsigned long long)avg_ns);
205*d29bd8efSBreno Leitao 	printf("lat_p50_ns_upper: %llu\n", (unsigned long long)p50);
206*d29bd8efSBreno Leitao 	printf("lat_p99_ns_upper: %llu\n", (unsigned long long)p99);
207*d29bd8efSBreno Leitao 	printf("lat_max_ns: %llu\n", (unsigned long long)t->lat_max);
208*d29bd8efSBreno Leitao }
209*d29bd8efSBreno Leitao 
210*d29bd8efSBreno Leitao static void summarize(struct wstats *all, int nw, int nr)
211*d29bd8efSBreno Leitao {
212*d29bd8efSBreno Leitao 	uint64_t agg[HIST_BUCKETS] = {0};
213*d29bd8efSBreno Leitao 	struct hist_totals t;
214*d29bd8efSBreno Leitao 	uint64_t p50, p99;
215*d29bd8efSBreno Leitao 
216*d29bd8efSBreno Leitao 	aggregate_wstats(all, nw, agg, &t);
217*d29bd8efSBreno Leitao 	compute_p50_p99(agg, t.writes, &p50, &p99);
218*d29bd8efSBreno Leitao 	print_summary(nw, nr, &t, p50, p99);
219*d29bd8efSBreno Leitao }
220*d29bd8efSBreno Leitao 
221*d29bd8efSBreno Leitao /*
222*d29bd8efSBreno Leitao  * Child branch of fork(): restore SIGPIPE to default (parent ignores it),
223*d29bd8efSBreno Leitao  * exec stress-ng, and on failure write the reason into @hs_wr before
224*d29bd8efSBreno Leitao  * exiting. The parent observes EOF on hs_wr (closed via O_CLOEXEC) when
225*d29bd8efSBreno Leitao  * exec succeeds.
226*d29bd8efSBreno Leitao  */
227*d29bd8efSBreno Leitao static void stress_ng_child(int hs_wr) __attribute__((noreturn));
228*d29bd8efSBreno Leitao static void stress_ng_child(int hs_wr)
229*d29bd8efSBreno Leitao {
230*d29bd8efSBreno Leitao 	char errbuf[256];
231*d29bd8efSBreno Leitao 
232*d29bd8efSBreno Leitao 	signal(SIGPIPE, SIG_DFL);
233*d29bd8efSBreno Leitao 	execlp("stress-ng", "stress-ng",
234*d29bd8efSBreno Leitao 	       "--vm", "4", "--vm-bytes", "80%",
235*d29bd8efSBreno Leitao 	       "--vm-method", "all",
236*d29bd8efSBreno Leitao 	       (char *)NULL);
237*d29bd8efSBreno Leitao 	snprintf(errbuf, sizeof(errbuf),
238*d29bd8efSBreno Leitao 		 "exec stress-ng failed: %s\n", strerror(errno));
239*d29bd8efSBreno Leitao 	(void)!write(hs_wr, errbuf, strlen(errbuf));
240*d29bd8efSBreno Leitao 	_exit(127);
241*d29bd8efSBreno Leitao }
242*d29bd8efSBreno Leitao 
243*d29bd8efSBreno Leitao /*
244*d29bd8efSBreno Leitao  * Read from the O_CLOEXEC handshake pipe. Anything readable means the
245*d29bd8efSBreno Leitao  * child wrote an error before exec; EOF (n == 0) means the write-end
246*d29bd8efSBreno Leitao  * closed because exec succeeded. Returns 0 on exec success, -1 if the
247*d29bd8efSBreno Leitao  * child failed and was reaped.
248*d29bd8efSBreno Leitao  */
249*d29bd8efSBreno Leitao static int stress_ng_wait_handshake(int hs_rd, pid_t pid)
250*d29bd8efSBreno Leitao {
251*d29bd8efSBreno Leitao 	struct pollfd pfd = { .fd = hs_rd, .events = POLLIN };
252*d29bd8efSBreno Leitao 	char errbuf[256];
253*d29bd8efSBreno Leitao 	int status;
254*d29bd8efSBreno Leitao 	int ret;
255*d29bd8efSBreno Leitao 
256*d29bd8efSBreno Leitao 	ret = poll(&pfd, 1, 500);
257*d29bd8efSBreno Leitao 	if (ret <= 0)
258*d29bd8efSBreno Leitao 		return 0;
259*d29bd8efSBreno Leitao 
260*d29bd8efSBreno Leitao 	ssize_t n = read(hs_rd, errbuf, sizeof(errbuf) - 1);
261*d29bd8efSBreno Leitao 
262*d29bd8efSBreno Leitao 	if (n > 0) {
263*d29bd8efSBreno Leitao 		errbuf[n] = '\0';
264*d29bd8efSBreno Leitao 		fputs(errbuf, stderr);
265*d29bd8efSBreno Leitao 		waitpid(pid, &status, 0);
266*d29bd8efSBreno Leitao 		return -1;
267*d29bd8efSBreno Leitao 	}
268*d29bd8efSBreno Leitao 	return 0;
269*d29bd8efSBreno Leitao }
270*d29bd8efSBreno Leitao 
271*d29bd8efSBreno Leitao static pid_t spawn_stress_ng(void)
272*d29bd8efSBreno Leitao {
273*d29bd8efSBreno Leitao 	int hs[2];
274*d29bd8efSBreno Leitao 	pid_t pid;
275*d29bd8efSBreno Leitao 
276*d29bd8efSBreno Leitao 	/*
277*d29bd8efSBreno Leitao 	 * Handshake pipe: child writes one byte and _exit()s on exec
278*d29bd8efSBreno Leitao 	 * failure. On exec success the O_CLOEXEC flag closes the write
279*d29bd8efSBreno Leitao 	 * end, which the parent observes as EOF. This makes the "is
280*d29bd8efSBreno Leitao 	 * stress-ng on $PATH?" check fail fast rather than silently.
281*d29bd8efSBreno Leitao 	 */
282*d29bd8efSBreno Leitao 	if (pipe2(hs, O_CLOEXEC) < 0) {
283*d29bd8efSBreno Leitao 		perror("pipe2");
284*d29bd8efSBreno Leitao 		return -1;
285*d29bd8efSBreno Leitao 	}
286*d29bd8efSBreno Leitao 
287*d29bd8efSBreno Leitao 	pid = fork();
288*d29bd8efSBreno Leitao 	if (pid < 0) {
289*d29bd8efSBreno Leitao 		perror("fork");
290*d29bd8efSBreno Leitao 		close(hs[0]);
291*d29bd8efSBreno Leitao 		close(hs[1]);
292*d29bd8efSBreno Leitao 		return -1;
293*d29bd8efSBreno Leitao 	}
294*d29bd8efSBreno Leitao 	if (pid == 0) {
295*d29bd8efSBreno Leitao 		close(hs[0]);
296*d29bd8efSBreno Leitao 		stress_ng_child(hs[1]);
297*d29bd8efSBreno Leitao 	}
298*d29bd8efSBreno Leitao 
299*d29bd8efSBreno Leitao 	close(hs[1]);
300*d29bd8efSBreno Leitao 	if (stress_ng_wait_handshake(hs[0], pid) < 0) {
301*d29bd8efSBreno Leitao 		close(hs[0]);
302*d29bd8efSBreno Leitao 		return -1;
303*d29bd8efSBreno Leitao 	}
304*d29bd8efSBreno Leitao 	close(hs[0]);
305*d29bd8efSBreno Leitao 
306*d29bd8efSBreno Leitao 	/* Give stress-ng a moment to map its VM regions before measuring. */
307*d29bd8efSBreno Leitao 	sleep(1);
308*d29bd8efSBreno Leitao 	return pid;
309*d29bd8efSBreno Leitao }
310*d29bd8efSBreno Leitao 
311*d29bd8efSBreno Leitao static void kill_stress_ng(pid_t pid)
312*d29bd8efSBreno Leitao {
313*d29bd8efSBreno Leitao 	int status;
314*d29bd8efSBreno Leitao 
315*d29bd8efSBreno Leitao 	if (pid <= 0)
316*d29bd8efSBreno Leitao 		return;
317*d29bd8efSBreno Leitao 	kill(pid, SIGTERM);
318*d29bd8efSBreno Leitao 	for (int i = 0; i < 20; i++) {
319*d29bd8efSBreno Leitao 		if (waitpid(pid, &status, WNOHANG) > 0)
320*d29bd8efSBreno Leitao 			return;
321*d29bd8efSBreno Leitao 		usleep(100 * 1000);
322*d29bd8efSBreno Leitao 	}
323*d29bd8efSBreno Leitao 	kill(pid, SIGKILL);
324*d29bd8efSBreno Leitao 	waitpid(pid, &status, 0);
325*d29bd8efSBreno Leitao }
326*d29bd8efSBreno Leitao 
327*d29bd8efSBreno Leitao /*
328*d29bd8efSBreno Leitao  * Allocate per-thread page-aligned buffers in main so a failed
329*d29bd8efSBreno Leitao  * aligned_alloc() aborts the run before any thread starts. Workers used
330*d29bd8efSBreno Leitao  * to allocate their own buffer and return NULL on failure, which left
331*d29bd8efSBreno Leitao  * peers blocked in write()/read() with nobody to unblock them.
332*d29bd8efSBreno Leitao  */
333*d29bd8efSBreno Leitao static int alloc_thread_bufs(struct wstats *ws, int nw,
334*d29bd8efSBreno Leitao 			     struct rstats *rs, int nr)
335*d29bd8efSBreno Leitao {
336*d29bd8efSBreno Leitao 	for (int i = 0; i < nw; i++) {
337*d29bd8efSBreno Leitao 		ws[i].buf = aligned_alloc(4096, g_msgsize);
338*d29bd8efSBreno Leitao 		if (!ws[i].buf) {
339*d29bd8efSBreno Leitao 			fprintf(stderr, "writer %d: aligned_alloc(%zu) failed\n",
340*d29bd8efSBreno Leitao 				i, g_msgsize);
341*d29bd8efSBreno Leitao 			return -1;
342*d29bd8efSBreno Leitao 		}
343*d29bd8efSBreno Leitao 		memset(ws[i].buf, 0xAA, g_msgsize);
344*d29bd8efSBreno Leitao 	}
345*d29bd8efSBreno Leitao 	for (int i = 0; i < nr; i++) {
346*d29bd8efSBreno Leitao 		rs[i].buf = aligned_alloc(4096, g_msgsize);
347*d29bd8efSBreno Leitao 		if (!rs[i].buf) {
348*d29bd8efSBreno Leitao 			fprintf(stderr, "reader %d: aligned_alloc(%zu) failed\n",
349*d29bd8efSBreno Leitao 				i, g_msgsize);
350*d29bd8efSBreno Leitao 			return -1;
351*d29bd8efSBreno Leitao 		}
352*d29bd8efSBreno Leitao 	}
353*d29bd8efSBreno Leitao 	return 0;
354*d29bd8efSBreno Leitao }
355*d29bd8efSBreno Leitao 
356*d29bd8efSBreno Leitao static void free_thread_bufs(struct wstats *ws, int nw,
357*d29bd8efSBreno Leitao 			     struct rstats *rs, int nr)
358*d29bd8efSBreno Leitao {
359*d29bd8efSBreno Leitao 	if (ws)
360*d29bd8efSBreno Leitao 		for (int i = 0; i < nw; i++)
361*d29bd8efSBreno Leitao 			free(ws[i].buf);
362*d29bd8efSBreno Leitao 	if (rs)
363*d29bd8efSBreno Leitao 		for (int i = 0; i < nr; i++)
364*d29bd8efSBreno Leitao 			free(rs[i].buf);
365*d29bd8efSBreno Leitao }
366*d29bd8efSBreno Leitao 
367*d29bd8efSBreno Leitao static int start_readers(pthread_t *rt, struct rstats *rs, int nr,
368*d29bd8efSBreno Leitao 			 int *created)
369*d29bd8efSBreno Leitao {
370*d29bd8efSBreno Leitao 	for (int i = 0; i < nr; i++) {
371*d29bd8efSBreno Leitao 		int err = pthread_create(&rt[i], NULL, reader, &rs[i]);
372*d29bd8efSBreno Leitao 
373*d29bd8efSBreno Leitao 		if (err) {
374*d29bd8efSBreno Leitao 			fprintf(stderr, "pthread_create reader %d: %s\n",
375*d29bd8efSBreno Leitao 				i, strerror(err));
376*d29bd8efSBreno Leitao 			return -1;
377*d29bd8efSBreno Leitao 		}
378*d29bd8efSBreno Leitao 		(*created)++;
379*d29bd8efSBreno Leitao 	}
380*d29bd8efSBreno Leitao 	return 0;
381*d29bd8efSBreno Leitao }
382*d29bd8efSBreno Leitao 
383*d29bd8efSBreno Leitao static int start_writers(pthread_t *wt, struct wstats *ws, int nw,
384*d29bd8efSBreno Leitao 			 int *created)
385*d29bd8efSBreno Leitao {
386*d29bd8efSBreno Leitao 	for (int i = 0; i < nw; i++) {
387*d29bd8efSBreno Leitao 		int err = pthread_create(&wt[i], NULL, writer, &ws[i]);
388*d29bd8efSBreno Leitao 
389*d29bd8efSBreno Leitao 		if (err) {
390*d29bd8efSBreno Leitao 			fprintf(stderr, "pthread_create writer %d: %s\n",
391*d29bd8efSBreno Leitao 				i, strerror(err));
392*d29bd8efSBreno Leitao 			return -1;
393*d29bd8efSBreno Leitao 		}
394*d29bd8efSBreno Leitao 		(*created)++;
395*d29bd8efSBreno Leitao 	}
396*d29bd8efSBreno Leitao 	return 0;
397*d29bd8efSBreno Leitao }
398*d29bd8efSBreno Leitao 
399*d29bd8efSBreno Leitao static int open_bench_pipe(void)
400*d29bd8efSBreno Leitao {
401*d29bd8efSBreno Leitao 	if (pipe(g_pipe) < 0) {
402*d29bd8efSBreno Leitao 		perror("pipe");
403*d29bd8efSBreno Leitao 		return -1;
404*d29bd8efSBreno Leitao 	}
405*d29bd8efSBreno Leitao 	if (fcntl(g_pipe[1], F_SETPIPE_SZ, g_pipe_size) < 0)
406*d29bd8efSBreno Leitao 		perror("F_SETPIPE_SZ (continuing)");
407*d29bd8efSBreno Leitao 	return 0;
408*d29bd8efSBreno Leitao }
409*d29bd8efSBreno Leitao 
410*d29bd8efSBreno Leitao /*
411*d29bd8efSBreno Leitao  * Normal termination: g_stop tells writers to leave the loop after the
412*d29bd8efSBreno Leitao  * current write() returns. Closing the shared write-end fd means once
413*d29bd8efSBreno Leitao  * the in-flight writes drain, readers see EOF and exit. Writers are not
414*d29bd8efSBreno Leitao  * unblocked by EPIPE here -- g_pipe[0] stays open so readers can keep
415*d29bd8efSBreno Leitao  * draining.
416*d29bd8efSBreno Leitao  *
417*d29bd8efSBreno Leitao  * Error path: some threads may have been created and others skipped, so
418*d29bd8efSBreno Leitao  * writers could be blocked in write() with no reader making progress.
419*d29bd8efSBreno Leitao  * Close both ends -- closing the read end is what delivers EPIPE to a
420*d29bd8efSBreno Leitao  * blocked writer.
421*d29bd8efSBreno Leitao  */
422*d29bd8efSBreno Leitao static void stop_and_join(pthread_t *wt, int nw_created,
423*d29bd8efSBreno Leitao 			  pthread_t *rt, int nr_created, int rc)
424*d29bd8efSBreno Leitao {
425*d29bd8efSBreno Leitao 	atomic_store(&g_stop, 1);
426*d29bd8efSBreno Leitao 	close(g_pipe[1]);
427*d29bd8efSBreno Leitao 	if (rc < 0)
428*d29bd8efSBreno Leitao 		close(g_pipe[0]);
429*d29bd8efSBreno Leitao 	for (int i = 0; i < nw_created; i++)
430*d29bd8efSBreno Leitao 		pthread_join(wt[i], NULL);
431*d29bd8efSBreno Leitao 	for (int i = 0; i < nr_created; i++)
432*d29bd8efSBreno Leitao 		pthread_join(rt[i], NULL);
433*d29bd8efSBreno Leitao 	if (rc == 0)
434*d29bd8efSBreno Leitao 		close(g_pipe[0]);
435*d29bd8efSBreno Leitao }
436*d29bd8efSBreno Leitao 
437*d29bd8efSBreno Leitao static int run_one(int nw, int nr)
438*d29bd8efSBreno Leitao {
439*d29bd8efSBreno Leitao 	pthread_t *wt = NULL, *rt = NULL;
440*d29bd8efSBreno Leitao 	struct wstats *ws = NULL;
441*d29bd8efSBreno Leitao 	struct rstats *rs = NULL;
442*d29bd8efSBreno Leitao 	int nw_created = 0, nr_created = 0;
443*d29bd8efSBreno Leitao 	int rc = 0;
444*d29bd8efSBreno Leitao 
445*d29bd8efSBreno Leitao 	atomic_store(&g_stop, 0);
446*d29bd8efSBreno Leitao 
447*d29bd8efSBreno Leitao 	if (open_bench_pipe() < 0)
448*d29bd8efSBreno Leitao 		return -1;
449*d29bd8efSBreno Leitao 
450*d29bd8efSBreno Leitao 	wt = calloc((size_t)nw, sizeof(*wt));
451*d29bd8efSBreno Leitao 	rt = calloc((size_t)nr, sizeof(*rt));
452*d29bd8efSBreno Leitao 	ws = calloc((size_t)nw, sizeof(*ws));
453*d29bd8efSBreno Leitao 	rs = calloc((size_t)nr, sizeof(*rs));
454*d29bd8efSBreno Leitao 	if (!wt || !rt || !ws || !rs) {
455*d29bd8efSBreno Leitao 		fprintf(stderr, "alloc failed\n");
456*d29bd8efSBreno Leitao 		rc = -1;
457*d29bd8efSBreno Leitao 		goto teardown;
458*d29bd8efSBreno Leitao 	}
459*d29bd8efSBreno Leitao 
460*d29bd8efSBreno Leitao 	if (alloc_thread_bufs(ws, nw, rs, nr) < 0) {
461*d29bd8efSBreno Leitao 		rc = -1;
462*d29bd8efSBreno Leitao 		goto teardown;
463*d29bd8efSBreno Leitao 	}
464*d29bd8efSBreno Leitao 
465*d29bd8efSBreno Leitao 	if (start_readers(rt, rs, nr, &nr_created) < 0 ||
466*d29bd8efSBreno Leitao 	    start_writers(wt, ws, nw, &nw_created) < 0) {
467*d29bd8efSBreno Leitao 		rc = -1;
468*d29bd8efSBreno Leitao 		goto teardown;
469*d29bd8efSBreno Leitao 	}
470*d29bd8efSBreno Leitao 
471*d29bd8efSBreno Leitao 	sleep((unsigned int)g_duration);
472*d29bd8efSBreno Leitao 
473*d29bd8efSBreno Leitao teardown:
474*d29bd8efSBreno Leitao 	stop_and_join(wt, nw_created, rt, nr_created, rc);
475*d29bd8efSBreno Leitao 
476*d29bd8efSBreno Leitao 	if (rc == 0) {
477*d29bd8efSBreno Leitao 		summarize(ws, nw, nr);
478*d29bd8efSBreno Leitao 		fflush(stdout);
479*d29bd8efSBreno Leitao 	}
480*d29bd8efSBreno Leitao 
481*d29bd8efSBreno Leitao 	free_thread_bufs(ws, nw, rs, nr);
482*d29bd8efSBreno Leitao 	free(wt);
483*d29bd8efSBreno Leitao 	free(rt);
484*d29bd8efSBreno Leitao 	free(ws);
485*d29bd8efSBreno Leitao 	free(rs);
486*d29bd8efSBreno Leitao 	return rc;
487*d29bd8efSBreno Leitao }
488*d29bd8efSBreno Leitao 
489*d29bd8efSBreno Leitao static void usage(const char *prog)
490*d29bd8efSBreno Leitao {
491*d29bd8efSBreno Leitao 	fprintf(stderr,
492*d29bd8efSBreno Leitao 		"usage: %s [-w writers] [-r readers] [-s msgsize] [-d secs] [-p pipe_size] [--memory-pressure]\n"
493*d29bd8efSBreno Leitao 		"  default: sweep writers={1,2,5} x readers={1,5,10}\n"
494*d29bd8efSBreno Leitao 		"  --memory-pressure: spawn stress-ng (--vm 4 --vm-bytes 80%% --vm-method all) for the run\n",
495*d29bd8efSBreno Leitao 		prog);
496*d29bd8efSBreno Leitao }
497*d29bd8efSBreno Leitao 
498*d29bd8efSBreno Leitao static int parse_args(int argc, char **argv,
499*d29bd8efSBreno Leitao 		      int *writers_override, int *readers_override)
500*d29bd8efSBreno Leitao {
501*d29bd8efSBreno Leitao 	static const struct option long_opts[] = {
502*d29bd8efSBreno Leitao 		{"memory-pressure", no_argument, NULL, 'M'},
503*d29bd8efSBreno Leitao 		{0, 0, 0, 0},
504*d29bd8efSBreno Leitao 	};
505*d29bd8efSBreno Leitao 	int opt;
506*d29bd8efSBreno Leitao 
507*d29bd8efSBreno Leitao 	while ((opt = getopt_long(argc, argv, "w:r:s:d:p:",
508*d29bd8efSBreno Leitao 				  long_opts, NULL)) != -1) {
509*d29bd8efSBreno Leitao 		switch (opt) {
510*d29bd8efSBreno Leitao 		case 'w':
511*d29bd8efSBreno Leitao 			*writers_override = atoi(optarg);
512*d29bd8efSBreno Leitao 			break;
513*d29bd8efSBreno Leitao 		case 'r':
514*d29bd8efSBreno Leitao 			*readers_override = atoi(optarg);
515*d29bd8efSBreno Leitao 			break;
516*d29bd8efSBreno Leitao 		case 's':
517*d29bd8efSBreno Leitao 			g_msgsize = (size_t)atol(optarg);
518*d29bd8efSBreno Leitao 			break;
519*d29bd8efSBreno Leitao 		case 'd':
520*d29bd8efSBreno Leitao 			g_duration = atoi(optarg);
521*d29bd8efSBreno Leitao 			break;
522*d29bd8efSBreno Leitao 		case 'p':
523*d29bd8efSBreno Leitao 			g_pipe_size = atoi(optarg);
524*d29bd8efSBreno Leitao 			break;
525*d29bd8efSBreno Leitao 		case 'M':
526*d29bd8efSBreno Leitao 			g_memory_pressure = 1;
527*d29bd8efSBreno Leitao 			break;
528*d29bd8efSBreno Leitao 		default:
529*d29bd8efSBreno Leitao 			usage(argv[0]);
530*d29bd8efSBreno Leitao 			return -1;
531*d29bd8efSBreno Leitao 		}
532*d29bd8efSBreno Leitao 	}
533*d29bd8efSBreno Leitao 	return 0;
534*d29bd8efSBreno Leitao }
535*d29bd8efSBreno Leitao 
536*d29bd8efSBreno Leitao /*
537*d29bd8efSBreno Leitao  * aligned_alloc(4096, size) requires size to be a multiple of the
538*d29bd8efSBreno Leitao  * alignment (C11); glibc returns NULL otherwise, which would make
539*d29bd8efSBreno Leitao  * writer/reader threads silently exit and the run report zero writes.
540*d29bd8efSBreno Leitao  * Validate up front instead.
541*d29bd8efSBreno Leitao  */
542*d29bd8efSBreno Leitao static int validate_args(void)
543*d29bd8efSBreno Leitao {
544*d29bd8efSBreno Leitao 	if (g_msgsize == 0 || g_msgsize % 4096 != 0) {
545*d29bd8efSBreno Leitao 		fprintf(stderr,
546*d29bd8efSBreno Leitao 			"msgsize must be a positive multiple of 4096 (got %zu)\n",
547*d29bd8efSBreno Leitao 			g_msgsize);
548*d29bd8efSBreno Leitao 		return -1;
549*d29bd8efSBreno Leitao 	}
550*d29bd8efSBreno Leitao 	if (g_duration <= 0) {
551*d29bd8efSBreno Leitao 		fprintf(stderr, "duration must be > 0 seconds (got %d)\n",
552*d29bd8efSBreno Leitao 			g_duration);
553*d29bd8efSBreno Leitao 		return -1;
554*d29bd8efSBreno Leitao 	}
555*d29bd8efSBreno Leitao 	if (g_pipe_size <= 0) {
556*d29bd8efSBreno Leitao 		fprintf(stderr, "pipe_size must be > 0 bytes (got %d)\n",
557*d29bd8efSBreno Leitao 			g_pipe_size);
558*d29bd8efSBreno Leitao 		return -1;
559*d29bd8efSBreno Leitao 	}
560*d29bd8efSBreno Leitao 	return 0;
561*d29bd8efSBreno Leitao }
562*d29bd8efSBreno Leitao 
563*d29bd8efSBreno Leitao static int run_sweep(void)
564*d29bd8efSBreno Leitao {
565*d29bd8efSBreno Leitao 	static const int writers_sweep[] = {1, 2, 5};
566*d29bd8efSBreno Leitao 	static const int readers_sweep[] = {1, 5, 10};
567*d29bd8efSBreno Leitao 
568*d29bd8efSBreno Leitao 	for (size_t i = 0; i < ARRAY_SIZE(writers_sweep); i++) {
569*d29bd8efSBreno Leitao 		for (size_t j = 0; j < ARRAY_SIZE(readers_sweep); j++) {
570*d29bd8efSBreno Leitao 			printf("---\n");
571*d29bd8efSBreno Leitao 			if (run_one(writers_sweep[i], readers_sweep[j]) < 0)
572*d29bd8efSBreno Leitao 				return -1;
573*d29bd8efSBreno Leitao 		}
574*d29bd8efSBreno Leitao 	}
575*d29bd8efSBreno Leitao 	return 0;
576*d29bd8efSBreno Leitao }
577*d29bd8efSBreno Leitao 
578*d29bd8efSBreno Leitao int main(int argc, char **argv)
579*d29bd8efSBreno Leitao {
580*d29bd8efSBreno Leitao 	int writers_override = 0, readers_override = 0;
581*d29bd8efSBreno Leitao 	pid_t stress_pid = -1;
582*d29bd8efSBreno Leitao 	int rc = 0;
583*d29bd8efSBreno Leitao 
584*d29bd8efSBreno Leitao 	if (parse_args(argc, argv, &writers_override, &readers_override) < 0)
585*d29bd8efSBreno Leitao 		return 1;
586*d29bd8efSBreno Leitao 	if (validate_args() < 0)
587*d29bd8efSBreno Leitao 		return 1;
588*d29bd8efSBreno Leitao 
589*d29bd8efSBreno Leitao 	signal(SIGPIPE, SIG_IGN);
590*d29bd8efSBreno Leitao 	setvbuf(stdout, NULL, _IOLBF, 0);
591*d29bd8efSBreno Leitao 	setvbuf(stderr, NULL, _IOLBF, 0);
592*d29bd8efSBreno Leitao 
593*d29bd8efSBreno Leitao 	fprintf(stderr, "pid=%d\n", getpid());
594*d29bd8efSBreno Leitao 	fflush(stderr);
595*d29bd8efSBreno Leitao 
596*d29bd8efSBreno Leitao 	if (g_memory_pressure) {
597*d29bd8efSBreno Leitao 		stress_pid = spawn_stress_ng();
598*d29bd8efSBreno Leitao 		if (stress_pid < 0) {
599*d29bd8efSBreno Leitao 			fprintf(stderr,
600*d29bd8efSBreno Leitao 				"memory_pressure requested but stress-ng could not be spawned\n");
601*d29bd8efSBreno Leitao 			return 1;
602*d29bd8efSBreno Leitao 		}
603*d29bd8efSBreno Leitao 	}
604*d29bd8efSBreno Leitao 
605*d29bd8efSBreno Leitao 	if (writers_override > 0 || readers_override > 0) {
606*d29bd8efSBreno Leitao 		int nw = writers_override > 0 ? writers_override : 1;
607*d29bd8efSBreno Leitao 		int nr = readers_override > 0 ? readers_override : 1;
608*d29bd8efSBreno Leitao 
609*d29bd8efSBreno Leitao 		rc = run_one(nw, nr) < 0 ? 1 : 0;
610*d29bd8efSBreno Leitao 	} else {
611*d29bd8efSBreno Leitao 		rc = run_sweep() < 0 ? 1 : 0;
612*d29bd8efSBreno Leitao 	}
613*d29bd8efSBreno Leitao 
614*d29bd8efSBreno Leitao 	kill_stress_ng(stress_pid);
615*d29bd8efSBreno Leitao 	return rc;
616*d29bd8efSBreno Leitao }
617