xref: /linux/lib/test_workqueue.c (revision 9e1e9d660255d7216067193d774f338d08d8528d)
1 // SPDX-License-Identifier: GPL-2.0
2 
3 /*
4  * Test module for stress and performance analysis of workqueue.
5  *
6  * Benchmarks queue_work() throughput on an unbound workqueue to measure
7  * pool->lock contention under different affinity scope configurations
8  * (e.g., cache vs cache_shard).
9  *
10  * The affinity scope is changed between runs via the workqueue's sysfs
11  * affinity_scope attribute (WQ_SYSFS).
12  *
13  * Copyright (c) 2026 Meta Platforms, Inc. and affiliates
14  * Copyright (c) 2026 Breno Leitao <leitao@debian.org>
15  *
16  */
17 #include <linux/init.h>
18 #include <linux/kernel.h>
19 #include <linux/module.h>
20 #include <linux/workqueue.h>
21 #include <linux/kthread.h>
22 #include <linux/moduleparam.h>
23 #include <linux/completion.h>
24 #include <linux/atomic.h>
25 #include <linux/slab.h>
26 #include <linux/ktime.h>
27 #include <linux/cpumask.h>
28 #include <linux/sched.h>
29 #include <linux/sort.h>
30 #include <linux/fs.h>
31 
/* Name passed to alloc_workqueue() for the benchmark workqueue. */
#define WQ_NAME "bench_wq"
/* Path opened and written by set_affn_scope() to change the affinity scope. */
#define SCOPE_PATH "/sys/bus/workqueue/devices/" WQ_NAME "/affinity_scope"

/* Requested thread count; read once in test_workqueue_init(). */
static int nr_threads;
module_param(nr_threads, int, 0444);
MODULE_PARM_DESC(nr_threads,
		 "Number of threads to spawn (default: 0 = num_online_cpus())");

/* Work items queued by each thread; must be > 0 or module init fails. */
static int wq_items = 50000;
module_param(wq_items, int, 0444);
MODULE_PARM_DESC(wq_items,
		 "Number of work items each thread queues (default: 50000)");

/* Workqueue under test; allocated and destroyed in test_workqueue_init(). */
static struct workqueue_struct *bench_wq;
/* Threads still running; the thread that drops it to zero completes all_done_comp. */
static atomic_t threads_done;
/* Completed by run_bench() to release every benchmark thread at once. */
static DECLARE_COMPLETION(start_comp);
/* Completed by the last benchmark thread to finish its items. */
static DECLARE_COMPLETION(all_done_comp);
49 
/* Per-thread benchmark state; one instance per kthread created by run_bench(). */
struct thread_ctx {
	struct completion work_done;	/* completed by bench_work_fn() */
	struct work_struct work;	/* the item queued on bench_wq */
	u64 *latencies;			/* queue_work() duration in ns, one slot per item */
	int cpu;			/* CPU the owning kthread is bound to */
	int items;			/* number of items this thread queues */
};
57 
58 static void bench_work_fn(struct work_struct *work)
59 {
60 	struct thread_ctx *ctx = container_of(work, struct thread_ctx, work);
61 
62 	complete(&ctx->work_done);
63 }
64 
65 static int bench_kthread_fn(void *data)
66 {
67 	struct thread_ctx *ctx = data;
68 	ktime_t t_start, t_end;
69 	int i;
70 
71 	/* Wait for all threads to be ready */
72 	wait_for_completion(&start_comp);
73 
74 	if (kthread_should_stop())
75 		return 0;
76 
77 	for (i = 0; i < ctx->items; i++) {
78 		reinit_completion(&ctx->work_done);
79 		INIT_WORK(&ctx->work, bench_work_fn);
80 
81 		t_start = ktime_get();
82 		queue_work(bench_wq, &ctx->work);
83 		t_end = ktime_get();
84 
85 		ctx->latencies[i] = ktime_to_ns(ktime_sub(t_end, t_start));
86 		wait_for_completion(&ctx->work_done);
87 	}
88 
89 	if (atomic_dec_and_test(&threads_done))
90 		complete(&all_done_comp);
91 
92 	/*
93 	 * Wait for kthread_stop() so the module text isn't freed
94 	 * while we're still executing.
95 	 */
96 	while (!kthread_should_stop())
97 		schedule();
98 
99 	return 0;
100 }
101 
/*
 * sort() comparator for u64 values, ascending.
 *
 * Returns -1, 0 or 1 when *a is respectively below, equal to or above *b.
 * The values are compared rather than subtracted, so the result is correct
 * over the full 64-bit range.
 */
static int cmp_u64(const void *a, const void *b)
{
	const u64 lhs = *(const u64 *)a;
	const u64 rhs = *(const u64 *)b;

	return (lhs > rhs) - (lhs < rhs);
}
113 
114 static int __init set_affn_scope(const char *scope)
115 {
116 	struct file *f;
117 	loff_t pos = 0;
118 	ssize_t ret;
119 
120 	f = filp_open(SCOPE_PATH, O_WRONLY, 0);
121 	if (IS_ERR(f)) {
122 		pr_err("test_workqueue: open %s failed: %ld\n",
123 		       SCOPE_PATH, PTR_ERR(f));
124 		return PTR_ERR(f);
125 	}
126 
127 	ret = kernel_write(f, scope, strlen(scope), &pos);
128 	filp_close(f, NULL);
129 
130 	if (ret < 0) {
131 		pr_err("test_workqueue: write '%s' failed: %zd\n", scope, ret);
132 		return ret;
133 	}
134 
135 	return 0;
136 }
137 
138 static int __init run_bench(int n_threads, const char *scope, const char *label)
139 {
140 	struct task_struct **tasks;
141 	unsigned long total_items;
142 	struct thread_ctx *ctxs;
143 	u64 *all_latencies;
144 	ktime_t start, end;
145 	int cpu, i, j, ret;
146 	s64 elapsed_us;
147 
148 	ret = set_affn_scope(scope);
149 	if (ret)
150 		return ret;
151 
152 	ctxs = kcalloc(n_threads, sizeof(*ctxs), GFP_KERNEL);
153 	if (!ctxs)
154 		return -ENOMEM;
155 
156 	tasks = kcalloc(n_threads, sizeof(*tasks), GFP_KERNEL);
157 	if (!tasks) {
158 		kfree(ctxs);
159 		return -ENOMEM;
160 	}
161 
162 	total_items = (unsigned long)n_threads * wq_items;
163 	all_latencies = kvmalloc_array(total_items, sizeof(u64), GFP_KERNEL);
164 	if (!all_latencies) {
165 		kfree(tasks);
166 		kfree(ctxs);
167 		return -ENOMEM;
168 	}
169 
170 	/* Allocate per-thread latency arrays */
171 	for (i = 0; i < n_threads; i++) {
172 		ctxs[i].latencies = kvmalloc_array(wq_items, sizeof(u64),
173 						   GFP_KERNEL);
174 		if (!ctxs[i].latencies) {
175 			while (--i >= 0)
176 				kvfree(ctxs[i].latencies);
177 			kvfree(all_latencies);
178 			kfree(tasks);
179 			kfree(ctxs);
180 			return -ENOMEM;
181 		}
182 	}
183 
184 	atomic_set(&threads_done, n_threads);
185 	reinit_completion(&all_done_comp);
186 	reinit_completion(&start_comp);
187 
188 	/* Create kthreads, each bound to a different online CPU */
189 	i = 0;
190 	for_each_online_cpu(cpu) {
191 		if (i >= n_threads)
192 			break;
193 
194 		ctxs[i].cpu = cpu;
195 		ctxs[i].items = wq_items;
196 		init_completion(&ctxs[i].work_done);
197 
198 		tasks[i] = kthread_create(bench_kthread_fn, &ctxs[i],
199 					  "wq_bench/%d", cpu);
200 		if (IS_ERR(tasks[i])) {
201 			ret = PTR_ERR(tasks[i]);
202 			pr_err("test_workqueue: failed to create kthread %d: %d\n",
203 			       i, ret);
204 			/* Unblock threads waiting on start_comp before stopping them */
205 			complete_all(&start_comp);
206 			while (--i >= 0)
207 				kthread_stop(tasks[i]);
208 			goto out_free;
209 		}
210 
211 		kthread_bind(tasks[i], cpu);
212 		wake_up_process(tasks[i]);
213 		i++;
214 	}
215 
216 	/* Start timing and release all threads */
217 	start = ktime_get();
218 	complete_all(&start_comp);
219 
220 	/* Wait for all threads to finish the benchmark */
221 	wait_for_completion(&all_done_comp);
222 
223 	/* Drain any remaining work */
224 	flush_workqueue(bench_wq);
225 
226 	/* Ensure all kthreads have fully exited before module memory is freed */
227 	for (i = 0; i < n_threads; i++)
228 		kthread_stop(tasks[i]);
229 
230 	end = ktime_get();
231 	elapsed_us = ktime_us_delta(end, start);
232 
233 	/* Merge all per-thread latencies and sort for percentile calculation */
234 	j = 0;
235 	for (i = 0; i < n_threads; i++) {
236 		memcpy(&all_latencies[j], ctxs[i].latencies,
237 		       wq_items * sizeof(u64));
238 		j += wq_items;
239 	}
240 
241 	sort(all_latencies, total_items, sizeof(u64), cmp_u64, NULL);
242 
243 	pr_info("test_workqueue:   %-16s %llu items/sec\tp50=%llu\tp90=%llu\tp95=%llu ns\n",
244 		label,
245 		elapsed_us ? div_u64(total_items * 1000000ULL, elapsed_us) : 0,
246 		all_latencies[total_items * 50 / 100],
247 		all_latencies[total_items * 90 / 100],
248 		all_latencies[total_items * 95 / 100]);
249 
250 	ret = 0;
251 out_free:
252 	for (i = 0; i < n_threads; i++)
253 		kvfree(ctxs[i].latencies);
254 	kvfree(all_latencies);
255 	kfree(tasks);
256 	kfree(ctxs);
257 
258 	return ret;
259 }
260 
/*
 * Affinity scopes to benchmark, in run order. Each string is passed to
 * run_bench() both as the value written to SCOPE_PATH and as the label of
 * the result line.
 */
static const char * const bench_scopes[] = {
	"cpu", "smt", "cache_shard", "cache", "numa", "system",
};
264 
265 static int __init test_workqueue_init(void)
266 {
267 	int n_threads = min(nr_threads ?: num_online_cpus(), num_online_cpus());
268 	int i;
269 
270 	if (wq_items <= 0) {
271 		pr_err("test_workqueue: wq_items must be > 0\n");
272 		return -EINVAL;
273 	}
274 
275 	bench_wq = alloc_workqueue(WQ_NAME, WQ_UNBOUND | WQ_SYSFS, 0);
276 	if (!bench_wq)
277 		return -ENOMEM;
278 
279 	pr_info("test_workqueue: running %d threads, %d items/thread\n",
280 		n_threads, wq_items);
281 
282 	for (i = 0; i < ARRAY_SIZE(bench_scopes); i++)
283 		run_bench(n_threads, bench_scopes[i], bench_scopes[i]);
284 
285 	destroy_workqueue(bench_wq);
286 
287 	/* Return -EAGAIN so the module doesn't stay loaded after the benchmark */
288 	return -EAGAIN;
289 }
290 
/*
 * The benchmark runs entirely from the init routine, which always returns
 * an error so the module does not stay loaded.
 */
module_init(test_workqueue_init);
MODULE_AUTHOR("Breno Leitao <leitao@debian.org>");
MODULE_DESCRIPTION("Stress/performance benchmark for workqueue subsystem");
MODULE_LICENSE("GPL");
295