xref: /linux/tools/testing/selftests/sched_ext/dequeue.c (revision 59a62ea4583e0f740bb3576ec210b23f39754327)
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Copyright (c) 2025 NVIDIA Corporation.
4  */
5 #define _GNU_SOURCE
6 #include <stdio.h>
7 #include <unistd.h>
8 #include <signal.h>
9 #include <time.h>
10 #include <bpf/bpf.h>
11 #include <scx/common.h>
12 #include <sys/wait.h>
13 #include <sched.h>
14 #include <pthread.h>
15 #include "scx_test.h"
16 #include "dequeue.bpf.skel.h"
17 
18 #define NUM_WORKERS 8
19 #define AFFINITY_HAMMER_MS 500
20 
21 /*
22  * Worker function that creates enqueue/dequeue events via CPU work and
23  * sleep.
24  */
worker_fn(int id)25 static void worker_fn(int id)
26 {
27 	int i;
28 	volatile int sum = 0;
29 
30 	for (i = 0; i < 1000; i++) {
31 		volatile int j;
32 
33 		/* Do some work to trigger scheduling events */
34 		for (j = 0; j < 10000; j++)
35 			sum += j;
36 		asm volatile("" : : "r"(sum));
37 
38 		/* Sleep to trigger dequeue */
39 		usleep(1000 + (id * 100));
40 	}
41 
42 	exit(0);
43 }
44 
45 /*
46  * This thread changes workers' affinity from outside so that some changes
47  * hit tasks while they are still in the scheduler's queue and trigger
48  * property-change dequeues.
49  */
affinity_hammer_fn(void * arg)50 static void *affinity_hammer_fn(void *arg)
51 {
52 	pid_t *pids = arg;
53 	cpu_set_t cpuset;
54 	int i = 0, n = NUM_WORKERS;
55 	struct timespec start, now;
56 
57 	clock_gettime(CLOCK_MONOTONIC, &start);
58 	while (1) {
59 		int w = i % n;
60 		int cpu = (i / n) % 4;
61 
62 		CPU_ZERO(&cpuset);
63 		CPU_SET(cpu, &cpuset);
64 		sched_setaffinity(pids[w], sizeof(cpuset), &cpuset);
65 		i++;
66 
67 		/* Check elapsed time every 256 iterations to limit gettime cost */
68 		if ((i & 255) == 0) {
69 			long long elapsed_ms;
70 
71 			clock_gettime(CLOCK_MONOTONIC, &now);
72 			elapsed_ms = (now.tv_sec - start.tv_sec) * 1000LL +
73 				     (now.tv_nsec - start.tv_nsec) / 1000000;
74 			if (elapsed_ms >= AFFINITY_HAMMER_MS)
75 				break;
76 		}
77 	}
78 	return NULL;
79 }
80 
/*
 * Run one dispatch scenario end to end and validate the lifecycle
 * counters exported by the BPF program.
 *
 * @skel: loaded dequeue BPF skeleton; counters are read from its .bss
 * @scenario: scenario selector (0-6) written to the BPF program
 * @scenario_name: human-readable label used in log output
 *
 * Returns SCX_TEST_PASS on success.  The SCX_FAIL_IF/SCX_EQ/SCX_GT
 * macros return a failure status from within this function when their
 * condition is violated.
 */
static enum scx_test_status run_scenario(struct dequeue *skel, u32 scenario,
					 const char *scenario_name)
{
	struct bpf_link *link;
	pid_t pids[NUM_WORKERS];
	pthread_t hammer;

	int i, status;
	/* Snapshots taken before attach so deltas isolate this scenario */
	u64 enq_start, deq_start,
	    dispatch_deq_start, change_deq_start, bpf_queue_full_start;
	u64 enq_delta, deq_delta,
	    dispatch_deq_delta, change_deq_delta, bpf_queue_full_delta;

	/* Set the test scenario */
	skel->bss->test_scenario = scenario;

	/* Record starting counts */
	enq_start = skel->bss->enqueue_cnt;
	deq_start = skel->bss->dequeue_cnt;
	dispatch_deq_start = skel->bss->dispatch_dequeue_cnt;
	change_deq_start = skel->bss->change_dequeue_cnt;
	bpf_queue_full_start = skel->bss->bpf_queue_full;

	link = bpf_map__attach_struct_ops(skel->maps.dequeue_ops);
	SCX_FAIL_IF(!link, "Failed to attach struct_ops for scenario %s", scenario_name);

	/* Fork worker processes to generate enqueue/dequeue events */
	for (i = 0; i < NUM_WORKERS; i++) {
		pids[i] = fork();
		/*
		 * NOTE(review): a failure here returns without reaping the
		 * already-forked children or destroying the link — presumably
		 * acceptable for a selftest process that exits soon after;
		 * confirm SCX_FAIL_IF semantics.
		 */
		SCX_FAIL_IF(pids[i] < 0, "Failed to fork worker %d", i);

		if (pids[i] == 0) {
			worker_fn(i);
			/* Should not reach here */
			exit(1);
		}
	}

	/*
	 * Run an "affinity hammer" so that some property changes hit tasks
	 * while they are still in BPF custody (e.g., in user DSQ or BPF
	 * queue), triggering SCX_DEQ_SCHED_CHANGE dequeues.
	 */
	SCX_FAIL_IF(pthread_create(&hammer, NULL, affinity_hammer_fn, pids) != 0,
		    "Failed to create affinity hammer thread");
	pthread_join(hammer, NULL);

	/* Wait for all workers to complete */
	for (i = 0; i < NUM_WORKERS; i++) {
		SCX_FAIL_IF(waitpid(pids[i], &status, 0) != pids[i],
			    "Failed to wait for worker %d", i);
		/* status == 0 implies WIFEXITED with exit code 0 */
		SCX_FAIL_IF(status != 0, "Worker %d exited with status %d", i, status);
	}

	bpf_link__destroy(link);

	/* The scheduler must have exited via clean unregistration */
	SCX_EQ(skel->data->uei.kind, EXIT_KIND(SCX_EXIT_UNREG));

	/* Calculate deltas */
	enq_delta = skel->bss->enqueue_cnt - enq_start;
	deq_delta = skel->bss->dequeue_cnt - deq_start;
	dispatch_deq_delta = skel->bss->dispatch_dequeue_cnt - dispatch_deq_start;
	change_deq_delta = skel->bss->change_dequeue_cnt - change_deq_start;
	bpf_queue_full_delta = skel->bss->bpf_queue_full - bpf_queue_full_start;

	printf("%s:\n", scenario_name);
	printf("  enqueues: %lu\n", (unsigned long)enq_delta);
	printf("  dequeues: %lu (dispatch: %lu, property_change: %lu)\n",
	       (unsigned long)deq_delta,
	       (unsigned long)dispatch_deq_delta,
	       (unsigned long)change_deq_delta);
	printf("  BPF queue full: %lu\n", (unsigned long)bpf_queue_full_delta);

	/*
	 * Validate enqueue/dequeue lifecycle tracking.
	 *
	 * For scenarios 0, 1, 3, 4 (local and global DSQs from
	 * ops.select_cpu() and ops.enqueue()), both enqueues and dequeues
	 * should be 0 because tasks bypass the BPF scheduler entirely:
	 * tasks never enter BPF scheduler's custody.
	 *
	 * For scenarios 2, 5, 6 (user DSQ or BPF internal queue) we expect
	 * both enqueues and dequeues.
	 *
	 * The BPF code does strict state machine validation with
	 * scx_bpf_error() to ensure the workflow semantics are correct.
	 *
	 * If we reach this point without errors, the semantics are
	 * validated correctly.
	 */
	if (scenario == 0 || scenario == 1 ||
	    scenario == 3 || scenario == 4) {
		/* Tasks bypass BPF scheduler completely */
		SCX_EQ(enq_delta, 0);
		SCX_EQ(deq_delta, 0);
		SCX_EQ(dispatch_deq_delta, 0);
		SCX_EQ(change_deq_delta, 0);
	} else {
		/*
		 * User DSQ from ops.enqueue() or ops.select_cpu(): tasks
		 * enter BPF scheduler's custody.
		 *
		 * Also validate 1:1 enqueue/dequeue pairing.
		 */
		SCX_GT(enq_delta, 0);
		SCX_GT(deq_delta, 0);
		SCX_EQ(enq_delta, deq_delta);
	}

	return SCX_TEST_PASS;
}
192 
setup(void ** ctx)193 static enum scx_test_status setup(void **ctx)
194 {
195 	struct dequeue *skel;
196 
197 	skel = dequeue__open();
198 	SCX_FAIL_IF(!skel, "Failed to open skel");
199 	SCX_ENUM_INIT(skel);
200 	SCX_FAIL_IF(dequeue__load(skel), "Failed to load skel");
201 
202 	*ctx = skel;
203 
204 	return SCX_TEST_PASS;
205 }
206 
run(void * ctx)207 static enum scx_test_status run(void *ctx)
208 {
209 	struct dequeue *skel = ctx;
210 	enum scx_test_status status;
211 
212 	status = run_scenario(skel, 0, "Scenario 0: Local DSQ from ops.select_cpu()");
213 	if (status != SCX_TEST_PASS)
214 		return status;
215 
216 	status = run_scenario(skel, 1, "Scenario 1: Global DSQ from ops.select_cpu()");
217 	if (status != SCX_TEST_PASS)
218 		return status;
219 
220 	status = run_scenario(skel, 2, "Scenario 2: User DSQ from ops.select_cpu()");
221 	if (status != SCX_TEST_PASS)
222 		return status;
223 
224 	status = run_scenario(skel, 3, "Scenario 3: Local DSQ from ops.enqueue()");
225 	if (status != SCX_TEST_PASS)
226 		return status;
227 
228 	status = run_scenario(skel, 4, "Scenario 4: Global DSQ from ops.enqueue()");
229 	if (status != SCX_TEST_PASS)
230 		return status;
231 
232 	status = run_scenario(skel, 5, "Scenario 5: User DSQ from ops.enqueue()");
233 	if (status != SCX_TEST_PASS)
234 		return status;
235 
236 	status = run_scenario(skel, 6, "Scenario 6: BPF queue from ops.enqueue()");
237 	if (status != SCX_TEST_PASS)
238 		return status;
239 
240 	printf("\n=== Summary ===\n");
241 	printf("Total enqueues: %lu\n", (unsigned long)skel->bss->enqueue_cnt);
242 	printf("Total dequeues: %lu\n", (unsigned long)skel->bss->dequeue_cnt);
243 	printf("  Dispatch dequeues: %lu (no flag, normal workflow)\n",
244 	       (unsigned long)skel->bss->dispatch_dequeue_cnt);
245 	printf("  Property change dequeues: %lu (SCX_DEQ_SCHED_CHANGE flag)\n",
246 	       (unsigned long)skel->bss->change_dequeue_cnt);
247 	printf("  BPF queue full: %lu\n",
248 	       (unsigned long)skel->bss->bpf_queue_full);
249 	printf("\nAll scenarios passed - no state machine violations detected\n");
250 	printf("-> Validated: Local DSQ dispatch bypasses BPF scheduler\n");
251 	printf("-> Validated: Global DSQ dispatch bypasses BPF scheduler\n");
252 	printf("-> Validated: User DSQ dispatch triggers ops.dequeue() callbacks\n");
253 	printf("-> Validated: Dispatch dequeues have no flags (normal workflow)\n");
254 	printf("-> Validated: Property change dequeues have SCX_DEQ_SCHED_CHANGE flag\n");
255 	printf("-> Validated: No duplicate enqueues or invalid state transitions\n");
256 
257 	return SCX_TEST_PASS;
258 }
259 
/* Tear down the skeleton created in setup(). */
static void cleanup(void *ctx)
{
	dequeue__destroy(ctx);
}
266 
/* Test descriptor: registers the dequeue selftest with the scx runner. */
struct scx_test dequeue_test = {
	.name = "dequeue",
	.description = "Verify ops.dequeue() semantics",
	.setup = setup,
	.run = run,
	.cleanup = cleanup,
};

REGISTER_SCX_TEST(&dequeue_test)
276