1 // SPDX-License-Identifier: GPL-2.0 2 3 /* 4 * Test module for stress and performance analysis of workqueue. 5 * 6 * Benchmarks queue_work() throughput on an unbound workqueue to measure 7 * pool->lock contention under different affinity scope configurations 8 * (e.g., cache vs cache_shard). 9 * 10 * The affinity scope is changed between runs via the workqueue's sysfs 11 * affinity_scope attribute (WQ_SYSFS). 12 * 13 * Copyright (c) 2026 Meta Platforms, Inc. and affiliates 14 * Copyright (c) 2026 Breno Leitao <leitao@debian.org> 15 * 16 */ 17 #include <linux/init.h> 18 #include <linux/kernel.h> 19 #include <linux/module.h> 20 #include <linux/workqueue.h> 21 #include <linux/kthread.h> 22 #include <linux/moduleparam.h> 23 #include <linux/completion.h> 24 #include <linux/atomic.h> 25 #include <linux/slab.h> 26 #include <linux/ktime.h> 27 #include <linux/cpumask.h> 28 #include <linux/sched.h> 29 #include <linux/sort.h> 30 #include <linux/fs.h> 31 32 #define WQ_NAME "bench_wq" 33 #define SCOPE_PATH "/sys/bus/workqueue/devices/" WQ_NAME "/affinity_scope" 34 35 static int nr_threads; 36 module_param(nr_threads, int, 0444); 37 MODULE_PARM_DESC(nr_threads, 38 "Number of threads to spawn (default: 0 = num_online_cpus())"); 39 40 static int wq_items = 50000; 41 module_param(wq_items, int, 0444); 42 MODULE_PARM_DESC(wq_items, 43 "Number of work items each thread queues (default: 50000)"); 44 45 static struct workqueue_struct *bench_wq; 46 static atomic_t threads_done; 47 static DECLARE_COMPLETION(start_comp); 48 static DECLARE_COMPLETION(all_done_comp); 49 50 struct thread_ctx { 51 struct completion work_done; 52 struct work_struct work; 53 u64 *latencies; 54 int cpu; 55 int items; 56 }; 57 58 static void bench_work_fn(struct work_struct *work) 59 { 60 struct thread_ctx *ctx = container_of(work, struct thread_ctx, work); 61 62 complete(&ctx->work_done); 63 } 64 65 static int bench_kthread_fn(void *data) 66 { 67 struct thread_ctx *ctx = data; 68 ktime_t t_start, t_end; 69 int i; 70 71 /* Wait for all threads to be ready */ 72 wait_for_completion(&start_comp); 73 74 if (kthread_should_stop()) 75 return 0; 76 77 for (i = 0; i < ctx->items; i++) { 78 reinit_completion(&ctx->work_done); 79 INIT_WORK(&ctx->work, bench_work_fn); 80 81 t_start = ktime_get(); 82 queue_work(bench_wq, &ctx->work); 83 t_end = ktime_get(); 84 85 ctx->latencies[i] = ktime_to_ns(ktime_sub(t_end, t_start)); 86 wait_for_completion(&ctx->work_done); 87 } 88 89 if (atomic_dec_and_test(&threads_done)) 90 complete(&all_done_comp); 91 92 /* 93 * Wait for kthread_stop() so the module text isn't freed 94 * while we're still executing. 95 */ 96 while (!kthread_should_stop()) 97 schedule(); 98 99 return 0; 100 } 101 102 static int cmp_u64(const void *a, const void *b) 103 { 104 u64 va = *(const u64 *)a; 105 u64 vb = *(const u64 *)b; 106 107 if (va < vb) 108 return -1; 109 if (va > vb) 110 return 1; 111 return 0; 112 } 113 114 static int __init set_affn_scope(const char *scope) 115 { 116 struct file *f; 117 loff_t pos = 0; 118 ssize_t ret; 119 120 f = filp_open(SCOPE_PATH, O_WRONLY, 0); 121 if (IS_ERR(f)) { 122 pr_err("test_workqueue: open %s failed: %ld\n", 123 SCOPE_PATH, PTR_ERR(f)); 124 return PTR_ERR(f); 125 } 126 127 ret = kernel_write(f, scope, strlen(scope), &pos); 128 filp_close(f, NULL); 129 130 if (ret < 0) { 131 pr_err("test_workqueue: write '%s' failed: %zd\n", scope, ret); 132 return ret; 133 } 134 135 return 0; 136 } 137 138 static int __init run_bench(int n_threads, const char *scope, const char *label) 139 { 140 struct task_struct **tasks; 141 unsigned long total_items; 142 struct thread_ctx *ctxs; 143 u64 *all_latencies; 144 ktime_t start, end; 145 int cpu, i, j, ret; 146 s64 elapsed_us; 147 148 ret = set_affn_scope(scope); 149 if (ret) 150 return ret; 151 152 ctxs = kcalloc(n_threads, sizeof(*ctxs), GFP_KERNEL); 153 if (!ctxs) 154 return -ENOMEM; 155 156 tasks = kcalloc(n_threads, sizeof(*tasks), GFP_KERNEL); 157 if (!tasks) { 158 kfree(ctxs); 159 return -ENOMEM; 160 } 161 162 total_items = (unsigned long)n_threads * wq_items; 163 all_latencies = kvmalloc_array(total_items, sizeof(u64), GFP_KERNEL); 164 if (!all_latencies) { 165 kfree(tasks); 166 kfree(ctxs); 167 return -ENOMEM; 168 } 169 170 /* Allocate per-thread latency arrays */ 171 for (i = 0; i < n_threads; i++) { 172 ctxs[i].latencies = kvmalloc_array(wq_items, sizeof(u64), 173 GFP_KERNEL); 174 if (!ctxs[i].latencies) { 175 while (--i >= 0) 176 kvfree(ctxs[i].latencies); 177 kvfree(all_latencies); 178 kfree(tasks); 179 kfree(ctxs); 180 return -ENOMEM; 181 } 182 } 183 184 atomic_set(&threads_done, n_threads); 185 reinit_completion(&all_done_comp); 186 reinit_completion(&start_comp); 187 188 /* Create kthreads, each bound to a different online CPU */ 189 i = 0; 190 for_each_online_cpu(cpu) { 191 if (i >= n_threads) 192 break; 193 194 ctxs[i].cpu = cpu; 195 ctxs[i].items = wq_items; 196 init_completion(&ctxs[i].work_done); 197 198 tasks[i] = kthread_create(bench_kthread_fn, &ctxs[i], 199 "wq_bench/%d", cpu); 200 if (IS_ERR(tasks[i])) { 201 ret = PTR_ERR(tasks[i]); 202 pr_err("test_workqueue: failed to create kthread %d: %d\n", 203 i, ret); 204 /* Unblock threads waiting on start_comp before stopping them */ 205 complete_all(&start_comp); 206 while (--i >= 0) 207 kthread_stop(tasks[i]); 208 goto out_free; 209 } 210 211 kthread_bind(tasks[i], cpu); 212 wake_up_process(tasks[i]); 213 i++; 214 } 215 216 /* Start timing and release all threads */ 217 start = ktime_get(); 218 complete_all(&start_comp); 219 220 /* Wait for all threads to finish the benchmark */ 221 wait_for_completion(&all_done_comp); 222 223 /* Drain any remaining work */ 224 flush_workqueue(bench_wq); 225 226 /* Ensure all kthreads have fully exited before module memory is freed */ 227 for (i = 0; i < n_threads; i++) 228 kthread_stop(tasks[i]); 229 230 end = ktime_get(); 231 elapsed_us = ktime_us_delta(end, start); 232 233 /* Merge all per-thread latencies and sort for percentile calculation */ 234 j = 0; 235 for (i = 0; i < n_threads; i++) { 236 memcpy(&all_latencies[j], ctxs[i].latencies, 237 wq_items * sizeof(u64)); 238 j += wq_items; 239 } 240 241 sort(all_latencies, total_items, sizeof(u64), cmp_u64, NULL); 242 243 pr_info("test_workqueue: %-16s %llu items/sec\tp50=%llu\tp90=%llu\tp95=%llu ns\n", 244 label, 245 elapsed_us ? div_u64(total_items * 1000000ULL, elapsed_us) : 0, 246 all_latencies[total_items * 50 / 100], 247 all_latencies[total_items * 90 / 100], 248 all_latencies[total_items * 95 / 100]); 249 250 ret = 0; 251 out_free: 252 for (i = 0; i < n_threads; i++) 253 kvfree(ctxs[i].latencies); 254 kvfree(all_latencies); 255 kfree(tasks); 256 kfree(ctxs); 257 258 return ret; 259 } 260 261 static const char * const bench_scopes[] = { 262 "cpu", "smt", "cache_shard", "cache", "numa", "system", 263 }; 264 265 static int __init test_workqueue_init(void) 266 { 267 int n_threads = min(nr_threads ?: num_online_cpus(), num_online_cpus()); 268 int i; 269 270 if (wq_items <= 0) { 271 pr_err("test_workqueue: wq_items must be > 0\n"); 272 return -EINVAL; 273 } 274 275 bench_wq = alloc_workqueue(WQ_NAME, WQ_UNBOUND | WQ_SYSFS, 0); 276 if (!bench_wq) 277 return -ENOMEM; 278 279 pr_info("test_workqueue: running %d threads, %d items/thread\n", 280 n_threads, wq_items); 281 282 for (i = 0; i < ARRAY_SIZE(bench_scopes); i++) 283 run_bench(n_threads, bench_scopes[i], bench_scopes[i]); 284 285 destroy_workqueue(bench_wq); 286 287 /* Return -EAGAIN so the module doesn't stay loaded after the benchmark */ 288 return -EAGAIN; 289 } 290 291 module_init(test_workqueue_init); 292 MODULE_AUTHOR("Breno Leitao <leitao@debian.org>"); 293 MODULE_DESCRIPTION("Stress/performance benchmark for workqueue subsystem"); 294 MODULE_LICENSE("GPL"); 295