1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Copyright (c) 2025 NVIDIA Corporation. 4 */ 5 #define _GNU_SOURCE 6 #include <stdio.h> 7 #include <unistd.h> 8 #include <signal.h> 9 #include <time.h> 10 #include <bpf/bpf.h> 11 #include <scx/common.h> 12 #include <sys/wait.h> 13 #include <sched.h> 14 #include <pthread.h> 15 #include "scx_test.h" 16 #include "dequeue.bpf.skel.h" 17 18 #define NUM_WORKERS 8 19 #define AFFINITY_HAMMER_MS 500 20 21 /* 22 * Worker function that creates enqueue/dequeue events via CPU work and 23 * sleep. 24 */ 25 static void worker_fn(int id) 26 { 27 int i; 28 volatile int sum = 0; 29 30 for (i = 0; i < 1000; i++) { 31 volatile int j; 32 33 /* Do some work to trigger scheduling events */ 34 for (j = 0; j < 10000; j++) 35 sum += j; 36 asm volatile("" : : "r"(sum)); 37 38 /* Sleep to trigger dequeue */ 39 usleep(1000 + (id * 100)); 40 } 41 42 exit(0); 43 } 44 45 /* 46 * This thread changes workers' affinity from outside so that some changes 47 * hit tasks while they are still in the scheduler's queue and trigger 48 * property-change dequeues. 49 */ 50 static void *affinity_hammer_fn(void *arg) 51 { 52 pid_t *pids = arg; 53 cpu_set_t cpuset; 54 int i = 0, n = NUM_WORKERS; 55 struct timespec start, now; 56 57 clock_gettime(CLOCK_MONOTONIC, &start); 58 while (1) { 59 int w = i % n; 60 int cpu = (i / n) % 4; 61 62 CPU_ZERO(&cpuset); 63 CPU_SET(cpu, &cpuset); 64 sched_setaffinity(pids[w], sizeof(cpuset), &cpuset); 65 i++; 66 67 /* Check elapsed time every 256 iterations to limit gettime cost */ 68 if ((i & 255) == 0) { 69 long long elapsed_ms; 70 71 clock_gettime(CLOCK_MONOTONIC, &now); 72 elapsed_ms = (now.tv_sec - start.tv_sec) * 1000LL + 73 (now.tv_nsec - start.tv_nsec) / 1000000; 74 if (elapsed_ms >= AFFINITY_HAMMER_MS) 75 break; 76 } 77 } 78 return NULL; 79 } 80 81 static enum scx_test_status run_scenario(struct dequeue *skel, u32 scenario, 82 const char *scenario_name) 83 { 84 struct bpf_link *link; 85 pid_t pids[NUM_WORKERS]; 86 pthread_t hammer; 87 88 int i, status; 89 u64 enq_start, deq_start, 90 dispatch_deq_start, change_deq_start, bpf_queue_full_start; 91 u64 enq_delta, deq_delta, 92 dispatch_deq_delta, change_deq_delta, bpf_queue_full_delta; 93 94 /* Set the test scenario */ 95 skel->bss->test_scenario = scenario; 96 97 /* Record starting counts */ 98 enq_start = skel->bss->enqueue_cnt; 99 deq_start = skel->bss->dequeue_cnt; 100 dispatch_deq_start = skel->bss->dispatch_dequeue_cnt; 101 change_deq_start = skel->bss->change_dequeue_cnt; 102 bpf_queue_full_start = skel->bss->bpf_queue_full; 103 104 link = bpf_map__attach_struct_ops(skel->maps.dequeue_ops); 105 SCX_FAIL_IF(!link, "Failed to attach struct_ops for scenario %s", scenario_name); 106 107 /* Fork worker processes to generate enqueue/dequeue events */ 108 for (i = 0; i < NUM_WORKERS; i++) { 109 pids[i] = fork(); 110 SCX_FAIL_IF(pids[i] < 0, "Failed to fork worker %d", i); 111 112 if (pids[i] == 0) { 113 worker_fn(i); 114 /* Should not reach here */ 115 exit(1); 116 } 117 } 118 119 /* 120 * Run an "affinity hammer" so that some property changes hit tasks 121 * while they are still in BPF custody (e.g., in user DSQ or BPF 122 * queue), triggering SCX_DEQ_SCHED_CHANGE dequeues. 123 */ 124 SCX_FAIL_IF(pthread_create(&hammer, NULL, affinity_hammer_fn, pids) != 0, 125 "Failed to create affinity hammer thread"); 126 pthread_join(hammer, NULL); 127 128 /* Wait for all workers to complete */ 129 for (i = 0; i < NUM_WORKERS; i++) { 130 SCX_FAIL_IF(waitpid(pids[i], &status, 0) != pids[i], 131 "Failed to wait for worker %d", i); 132 SCX_FAIL_IF(status != 0, "Worker %d exited with status %d", i, status); 133 } 134 135 bpf_link__destroy(link); 136 137 SCX_EQ(skel->data->uei.kind, EXIT_KIND(SCX_EXIT_UNREG)); 138 139 /* Calculate deltas */ 140 enq_delta = skel->bss->enqueue_cnt - enq_start; 141 deq_delta = skel->bss->dequeue_cnt - deq_start; 142 dispatch_deq_delta = skel->bss->dispatch_dequeue_cnt - dispatch_deq_start; 143 change_deq_delta = skel->bss->change_dequeue_cnt - change_deq_start; 144 bpf_queue_full_delta = skel->bss->bpf_queue_full - bpf_queue_full_start; 145 146 printf("%s:\n", scenario_name); 147 printf(" enqueues: %lu\n", (unsigned long)enq_delta); 148 printf(" dequeues: %lu (dispatch: %lu, property_change: %lu)\n", 149 (unsigned long)deq_delta, 150 (unsigned long)dispatch_deq_delta, 151 (unsigned long)change_deq_delta); 152 printf(" BPF queue full: %lu\n", (unsigned long)bpf_queue_full_delta); 153 154 /* 155 * Validate enqueue/dequeue lifecycle tracking. 156 * 157 * For scenarios 0, 1, 3, 4 (local and global DSQs from 158 * ops.select_cpu() and ops.enqueue()), both enqueues and dequeues 159 * should be 0 because tasks bypass the BPF scheduler entirely: 160 * tasks never enter BPF scheduler's custody. 161 * 162 * For scenarios 2, 5, 6 (user DSQ or BPF internal queue) we expect 163 * both enqueues and dequeues. 164 * 165 * The BPF code does strict state machine validation with 166 * scx_bpf_error() to ensure the workflow semantics are correct. 167 * 168 * If we reach this point without errors, the semantics are 169 * validated correctly. 170 */ 171 if (scenario == 0 || scenario == 1 || 172 scenario == 3 || scenario == 4) { 173 /* Tasks bypass BPF scheduler completely */ 174 SCX_EQ(enq_delta, 0); 175 SCX_EQ(deq_delta, 0); 176 SCX_EQ(dispatch_deq_delta, 0); 177 SCX_EQ(change_deq_delta, 0); 178 } else { 179 /* 180 * User DSQ from ops.enqueue() or ops.select_cpu(): tasks 181 * enter BPF scheduler's custody. 182 * 183 * Also validate 1:1 enqueue/dequeue pairing. 184 */ 185 SCX_GT(enq_delta, 0); 186 SCX_GT(deq_delta, 0); 187 SCX_EQ(enq_delta, deq_delta); 188 } 189 190 return SCX_TEST_PASS; 191 } 192 193 static enum scx_test_status setup(void **ctx) 194 { 195 struct dequeue *skel; 196 197 skel = dequeue__open(); 198 SCX_FAIL_IF(!skel, "Failed to open skel"); 199 SCX_ENUM_INIT(skel); 200 SCX_FAIL_IF(dequeue__load(skel), "Failed to load skel"); 201 202 *ctx = skel; 203 204 return SCX_TEST_PASS; 205 } 206 207 static enum scx_test_status run(void *ctx) 208 { 209 struct dequeue *skel = ctx; 210 enum scx_test_status status; 211 212 status = run_scenario(skel, 0, "Scenario 0: Local DSQ from ops.select_cpu()"); 213 if (status != SCX_TEST_PASS) 214 return status; 215 216 status = run_scenario(skel, 1, "Scenario 1: Global DSQ from ops.select_cpu()"); 217 if (status != SCX_TEST_PASS) 218 return status; 219 220 status = run_scenario(skel, 2, "Scenario 2: User DSQ from ops.select_cpu()"); 221 if (status != SCX_TEST_PASS) 222 return status; 223 224 status = run_scenario(skel, 3, "Scenario 3: Local DSQ from ops.enqueue()"); 225 if (status != SCX_TEST_PASS) 226 return status; 227 228 status = run_scenario(skel, 4, "Scenario 4: Global DSQ from ops.enqueue()"); 229 if (status != SCX_TEST_PASS) 230 return status; 231 232 status = run_scenario(skel, 5, "Scenario 5: User DSQ from ops.enqueue()"); 233 if (status != SCX_TEST_PASS) 234 return status; 235 236 status = run_scenario(skel, 6, "Scenario 6: BPF queue from ops.enqueue()"); 237 if (status != SCX_TEST_PASS) 238 return status; 239 240 printf("\n=== Summary ===\n"); 241 printf("Total enqueues: %lu\n", (unsigned long)skel->bss->enqueue_cnt); 242 printf("Total dequeues: %lu\n", (unsigned long)skel->bss->dequeue_cnt); 243 printf(" Dispatch dequeues: %lu (no flag, normal workflow)\n", 244 (unsigned long)skel->bss->dispatch_dequeue_cnt); 245 printf(" Property change dequeues: %lu (SCX_DEQ_SCHED_CHANGE flag)\n", 246 (unsigned long)skel->bss->change_dequeue_cnt); 247 printf(" BPF queue full: %lu\n", 248 (unsigned long)skel->bss->bpf_queue_full); 249 printf("\nAll scenarios passed - no state machine violations detected\n"); 250 printf("-> Validated: Local DSQ dispatch bypasses BPF scheduler\n"); 251 printf("-> Validated: Global DSQ dispatch bypasses BPF scheduler\n"); 252 printf("-> Validated: User DSQ dispatch triggers ops.dequeue() callbacks\n"); 253 printf("-> Validated: Dispatch dequeues have no flags (normal workflow)\n"); 254 printf("-> Validated: Property change dequeues have SCX_DEQ_SCHED_CHANGE flag\n"); 255 printf("-> Validated: No duplicate enqueues or invalid state transitions\n"); 256 257 return SCX_TEST_PASS; 258 } 259 260 static void cleanup(void *ctx) 261 { 262 struct dequeue *skel = ctx; 263 264 dequeue__destroy(skel); 265 } 266 267 struct scx_test dequeue_test = { 268 .name = "dequeue", 269 .description = "Verify ops.dequeue() semantics", 270 .setup = setup, 271 .run = run, 272 .cleanup = cleanup, 273 }; 274 275 REGISTER_SCX_TEST(&dequeue_test) 276