1 // SPDX-License-Identifier: GPL-2.0
2 /*
3 * Copyright (c) 2025 NVIDIA Corporation.
4 */
5 #define _GNU_SOURCE
6 #include <stdio.h>
7 #include <unistd.h>
8 #include <signal.h>
9 #include <time.h>
10 #include <bpf/bpf.h>
11 #include <scx/common.h>
12 #include <sys/wait.h>
13 #include <sched.h>
14 #include <pthread.h>
15 #include "scx_test.h"
16 #include "dequeue.bpf.skel.h"
17
18 #define NUM_WORKERS 8
19 #define AFFINITY_HAMMER_MS 500
20
21 /*
22 * Worker function that creates enqueue/dequeue events via CPU work and
23 * sleep.
24 */
worker_fn(int id)25 static void worker_fn(int id)
26 {
27 int i;
28 volatile int sum = 0;
29
30 for (i = 0; i < 1000; i++) {
31 volatile int j;
32
33 /* Do some work to trigger scheduling events */
34 for (j = 0; j < 10000; j++)
35 sum += j;
36 asm volatile("" : : "r"(sum));
37
38 /* Sleep to trigger dequeue */
39 usleep(1000 + (id * 100));
40 }
41
42 exit(0);
43 }
44
45 /*
46 * This thread changes workers' affinity from outside so that some changes
47 * hit tasks while they are still in the scheduler's queue and trigger
48 * property-change dequeues.
49 */
affinity_hammer_fn(void * arg)50 static void *affinity_hammer_fn(void *arg)
51 {
52 pid_t *pids = arg;
53 cpu_set_t cpuset;
54 int i = 0, n = NUM_WORKERS;
55 struct timespec start, now;
56
57 clock_gettime(CLOCK_MONOTONIC, &start);
58 while (1) {
59 int w = i % n;
60 int cpu = (i / n) % 4;
61
62 CPU_ZERO(&cpuset);
63 CPU_SET(cpu, &cpuset);
64 sched_setaffinity(pids[w], sizeof(cpuset), &cpuset);
65 i++;
66
67 /* Check elapsed time every 256 iterations to limit gettime cost */
68 if ((i & 255) == 0) {
69 long long elapsed_ms;
70
71 clock_gettime(CLOCK_MONOTONIC, &now);
72 elapsed_ms = (now.tv_sec - start.tv_sec) * 1000LL +
73 (now.tv_nsec - start.tv_nsec) / 1000000;
74 if (elapsed_ms >= AFFINITY_HAMMER_MS)
75 break;
76 }
77 }
78 return NULL;
79 }
80
/*
 * Execute one dispatch-scenario of the BPF scheduler and validate the
 * enqueue/dequeue counter deltas it produces.
 *
 * @skel:          loaded BPF skeleton whose counters are sampled
 * @scenario:      value written to the BPF program's test_scenario knob
 * @scenario_name: human-readable label for log output
 *
 * Returns SCX_TEST_PASS, or fails the test via SCX_FAIL_IF/SCX_EQ/SCX_GT
 * (these macros return early on failure).
 */
static enum scx_test_status run_scenario(struct dequeue *skel, u32 scenario,
					 const char *scenario_name)
{
	struct bpf_link *link;
	pid_t pids[NUM_WORKERS];
	pthread_t hammer;

	int i, status;
	u64 enq_start, deq_start,
	    dispatch_deq_start, change_deq_start, bpf_queue_full_start;
	u64 enq_delta, deq_delta,
	    dispatch_deq_delta, change_deq_delta, bpf_queue_full_delta;

	/* Set the test scenario */
	skel->bss->test_scenario = scenario;

	/*
	 * Record starting counts before attaching, so the deltas below
	 * cover exactly this scenario's activity (counters are cumulative
	 * across scenarios).
	 */
	enq_start = skel->bss->enqueue_cnt;
	deq_start = skel->bss->dequeue_cnt;
	dispatch_deq_start = skel->bss->dispatch_dequeue_cnt;
	change_deq_start = skel->bss->change_dequeue_cnt;
	bpf_queue_full_start = skel->bss->bpf_queue_full;

	link = bpf_map__attach_struct_ops(skel->maps.dequeue_ops);
	SCX_FAIL_IF(!link, "Failed to attach struct_ops for scenario %s", scenario_name);

	/* Fork worker processes to generate enqueue/dequeue events */
	for (i = 0; i < NUM_WORKERS; i++) {
		pids[i] = fork();
		SCX_FAIL_IF(pids[i] < 0, "Failed to fork worker %d", i);

		if (pids[i] == 0) {
			worker_fn(i);
			/* Should not reach here: worker_fn exits the child */
			exit(1);
		}
	}

	/*
	 * Run an "affinity hammer" so that some property changes hit tasks
	 * while they are still in BPF custody (e.g., in user DSQ or BPF
	 * queue), triggering SCX_DEQ_SCHED_CHANGE dequeues.
	 */
	SCX_FAIL_IF(pthread_create(&hammer, NULL, affinity_hammer_fn, pids) != 0,
		    "Failed to create affinity hammer thread");
	pthread_join(hammer, NULL);

	/* Wait for all workers to complete; workers exit(0) on success */
	for (i = 0; i < NUM_WORKERS; i++) {
		SCX_FAIL_IF(waitpid(pids[i], &status, 0) != pids[i],
			    "Failed to wait for worker %d", i);
		SCX_FAIL_IF(status != 0, "Worker %d exited with status %d", i, status);
	}

	/* Detach before sampling final counters so activity has quiesced */
	bpf_link__destroy(link);

	/* Scheduler must have exited via normal unregister, not an error */
	SCX_EQ(skel->data->uei.kind, EXIT_KIND(SCX_EXIT_UNREG));

	/* Calculate deltas */
	enq_delta = skel->bss->enqueue_cnt - enq_start;
	deq_delta = skel->bss->dequeue_cnt - deq_start;
	dispatch_deq_delta = skel->bss->dispatch_dequeue_cnt - dispatch_deq_start;
	change_deq_delta = skel->bss->change_dequeue_cnt - change_deq_start;
	bpf_queue_full_delta = skel->bss->bpf_queue_full - bpf_queue_full_start;

	printf("%s:\n", scenario_name);
	printf("  enqueues: %lu\n", (unsigned long)enq_delta);
	printf("  dequeues: %lu (dispatch: %lu, property_change: %lu)\n",
	       (unsigned long)deq_delta,
	       (unsigned long)dispatch_deq_delta,
	       (unsigned long)change_deq_delta);
	printf("  BPF queue full: %lu\n", (unsigned long)bpf_queue_full_delta);

	/*
	 * Validate enqueue/dequeue lifecycle tracking.
	 *
	 * For scenarios 0, 1, 3, 4 (local and global DSQs from
	 * ops.select_cpu() and ops.enqueue()), both enqueues and dequeues
	 * should be 0 because tasks bypass the BPF scheduler entirely:
	 * tasks never enter BPF scheduler's custody.
	 *
	 * For scenarios 2, 5, 6 (user DSQ or BPF internal queue) we expect
	 * both enqueues and dequeues.
	 *
	 * The BPF code does strict state machine validation with
	 * scx_bpf_error() to ensure the workflow semantics are correct.
	 *
	 * If we reach this point without errors, the semantics are
	 * validated correctly.
	 */
	if (scenario == 0 || scenario == 1 ||
	    scenario == 3 || scenario == 4) {
		/* Tasks bypass BPF scheduler completely */
		SCX_EQ(enq_delta, 0);
		SCX_EQ(deq_delta, 0);
		SCX_EQ(dispatch_deq_delta, 0);
		SCX_EQ(change_deq_delta, 0);
	} else {
		/*
		 * User DSQ from ops.enqueue() or ops.select_cpu(): tasks
		 * enter BPF scheduler's custody.
		 *
		 * Also validate 1:1 enqueue/dequeue pairing.
		 */
		SCX_GT(enq_delta, 0);
		SCX_GT(deq_delta, 0);
		SCX_EQ(enq_delta, deq_delta);
	}

	return SCX_TEST_PASS;
}
192
setup(void ** ctx)193 static enum scx_test_status setup(void **ctx)
194 {
195 struct dequeue *skel;
196
197 skel = dequeue__open();
198 SCX_FAIL_IF(!skel, "Failed to open skel");
199 SCX_ENUM_INIT(skel);
200 SCX_FAIL_IF(dequeue__load(skel), "Failed to load skel");
201
202 *ctx = skel;
203
204 return SCX_TEST_PASS;
205 }
206
run(void * ctx)207 static enum scx_test_status run(void *ctx)
208 {
209 struct dequeue *skel = ctx;
210 enum scx_test_status status;
211
212 status = run_scenario(skel, 0, "Scenario 0: Local DSQ from ops.select_cpu()");
213 if (status != SCX_TEST_PASS)
214 return status;
215
216 status = run_scenario(skel, 1, "Scenario 1: Global DSQ from ops.select_cpu()");
217 if (status != SCX_TEST_PASS)
218 return status;
219
220 status = run_scenario(skel, 2, "Scenario 2: User DSQ from ops.select_cpu()");
221 if (status != SCX_TEST_PASS)
222 return status;
223
224 status = run_scenario(skel, 3, "Scenario 3: Local DSQ from ops.enqueue()");
225 if (status != SCX_TEST_PASS)
226 return status;
227
228 status = run_scenario(skel, 4, "Scenario 4: Global DSQ from ops.enqueue()");
229 if (status != SCX_TEST_PASS)
230 return status;
231
232 status = run_scenario(skel, 5, "Scenario 5: User DSQ from ops.enqueue()");
233 if (status != SCX_TEST_PASS)
234 return status;
235
236 status = run_scenario(skel, 6, "Scenario 6: BPF queue from ops.enqueue()");
237 if (status != SCX_TEST_PASS)
238 return status;
239
240 printf("\n=== Summary ===\n");
241 printf("Total enqueues: %lu\n", (unsigned long)skel->bss->enqueue_cnt);
242 printf("Total dequeues: %lu\n", (unsigned long)skel->bss->dequeue_cnt);
243 printf(" Dispatch dequeues: %lu (no flag, normal workflow)\n",
244 (unsigned long)skel->bss->dispatch_dequeue_cnt);
245 printf(" Property change dequeues: %lu (SCX_DEQ_SCHED_CHANGE flag)\n",
246 (unsigned long)skel->bss->change_dequeue_cnt);
247 printf(" BPF queue full: %lu\n",
248 (unsigned long)skel->bss->bpf_queue_full);
249 printf("\nAll scenarios passed - no state machine violations detected\n");
250 printf("-> Validated: Local DSQ dispatch bypasses BPF scheduler\n");
251 printf("-> Validated: Global DSQ dispatch bypasses BPF scheduler\n");
252 printf("-> Validated: User DSQ dispatch triggers ops.dequeue() callbacks\n");
253 printf("-> Validated: Dispatch dequeues have no flags (normal workflow)\n");
254 printf("-> Validated: Property change dequeues have SCX_DEQ_SCHED_CHANGE flag\n");
255 printf("-> Validated: No duplicate enqueues or invalid state transitions\n");
256
257 return SCX_TEST_PASS;
258 }
259
/* Release the skeleton created in setup(). */
static void cleanup(void *ctx)
{
	/* dequeue__destroy() handles a NULL skeleton gracefully */
	dequeue__destroy(ctx);
}
266
/* Test descriptor picked up by the scx selftest runner. */
struct scx_test dequeue_test = {
	.name = "dequeue",
	.description = "Verify ops.dequeue() semantics",
	.setup = setup,
	.run = run,
	.cleanup = cleanup,
};

REGISTER_SCX_TEST(&dequeue_test)
276