// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "funnel-workqueue.h"

#include <linux/atomic.h>
#include <linux/cache.h>
#include <linux/completion.h>
#include <linux/err.h>
#include <linux/kthread.h>
#include <linux/percpu.h>

#include "funnel-queue.h"
#include "logger.h"
#include "memory-alloc.h"
#include "numeric.h"
#include "permassert.h"
#include "string-utils.h"

#include "completion.h"
#include "status-codes.h"

static DEFINE_PER_CPU(unsigned int, service_queue_rotor);

/**
 * DOC: Work queue definition.
 *
 * There are two types of work queues: simple, with one worker thread, and round-robin, which uses
 * a group of the former to do the work, and assigns work to them in round-robin fashion (roughly).
 * Externally, both are represented via the same common sub-structure, though there's actually not
 * a great deal of overlap between the two types internally.
 */
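/*
 * A rough lifecycle sketch using only the entry points defined in this file; the prefix "vdo0",
 * the name "cpuQ", and the owner/type/completion values are hypothetical stand-ins, and
 * vdo_free_work_queue() will finish the queue itself if the caller has not already done so:
 *
 *	struct vdo_work_queue *queue;
 *
 *	if (vdo_make_work_queue("vdo0", "cpuQ", owner, type, thread_count, NULL,
 *				&queue) != VDO_SUCCESS)
 *		return;
 *	vdo_enqueue_work_queue(queue, completion);
 *	...
 *	vdo_finish_work_queue(queue);
 *	vdo_free_work_queue(queue);
 */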
struct vdo_work_queue {
	/* Name of just the work queue (e.g., "cpuQ12") */
	char *name;
	bool round_robin_mode;
	struct vdo_thread *owner;
	/* Life cycle functions, etc */
	const struct vdo_work_queue_type *type;
};

struct simple_work_queue {
	struct vdo_work_queue common;
	struct funnel_queue *priority_lists[VDO_WORK_Q_MAX_PRIORITY + 1];
	void *private;

	/*
	 * The fields above are unchanged after setup but often read, and are good candidates for
	 * caching -- and, if the max priority is 2, they just fit in one x86-64 cache line when
	 * aligned. The fields below are often modified as we sleep and wake, so we want a
	 * separate cache line for performance.
	 */

	/* Any (0 or 1) worker threads waiting for new work to do */
	wait_queue_head_t waiting_worker_threads ____cacheline_aligned;
	/* Hack to reduce wakeup calls if the worker thread is running */
	atomic_t idle;

	/* These are infrequently used so in terms of performance we don't care where they land. */
	struct task_struct *thread;
	/* Notify creator once worker has initialized */
	struct completion *started;
};

struct round_robin_work_queue {
	struct vdo_work_queue common;
	struct simple_work_queue **service_queues;
	unsigned int num_service_queues;
};

static inline struct simple_work_queue *as_simple_work_queue(struct vdo_work_queue *queue)
{
	return ((queue == NULL) ?
		NULL : container_of(queue, struct simple_work_queue, common));
}

static inline struct round_robin_work_queue *as_round_robin_work_queue(struct vdo_work_queue *queue)
{
	return ((queue == NULL) ?
		NULL :
		container_of(queue, struct round_robin_work_queue, common));
}

/* Processing normal completions. */

/*
 * Dequeue and return the next waiting completion, if any.
 *
 * We scan the funnel queues from highest priority to lowest, once; there is therefore a race
 * condition where a high-priority completion can be enqueued followed by a lower-priority one, and
 * we'll grab the latter (but we'll catch the high-priority item on the next call). If strict
 * enforcement of priorities becomes necessary, this function will need fixing.
 */
static struct vdo_completion *poll_for_completion(struct simple_work_queue *queue)
{
	int i;

	for (i = queue->common.type->max_priority; i >= 0; i--) {
		struct funnel_queue_entry *link = vdo_funnel_queue_poll(queue->priority_lists[i]);

		if (link != NULL)
			return container_of(link, struct vdo_completion, work_queue_entry_link);
	}

	return NULL;
}

static void enqueue_work_queue_completion(struct simple_work_queue *queue,
					  struct vdo_completion *completion)
{
	VDO_ASSERT_LOG_ONLY(completion->my_queue == NULL,
			    "completion %px (fn %px) to enqueue (%px) is not already queued (%px)",
			    completion, completion->callback, queue, completion->my_queue);
	if (completion->priority == VDO_WORK_Q_DEFAULT_PRIORITY)
		completion->priority = queue->common.type->default_priority;

	if (VDO_ASSERT(completion->priority <= queue->common.type->max_priority,
		       "priority is in range for queue") != VDO_SUCCESS)
		completion->priority = 0;

	completion->my_queue = &queue->common;

	/* Funnel queue handles the synchronization for the put. */
	vdo_funnel_queue_put(queue->priority_lists[completion->priority],
			     &completion->work_queue_entry_link);

	/*
	 * Due to how funnel queue synchronization is handled (just atomic operations), the
	 * simplest safe implementation here would be to wake up any waiting threads after
	 * enqueueing each item. Even if the funnel queue is not empty at the time of adding an
	 * item to the queue, the consumer thread may not see this since it is not guaranteed to
	 * have the same view of the queue as a producer thread.
	 *
	 * However, the above is wasteful so instead we attempt to minimize the number of thread
	 * wakeups. Using an idle flag, and careful ordering using memory barriers, we should be
	 * able to determine when the worker thread might be asleep or going to sleep. We use
	 * cmpxchg to try to take ownership (vs other producer threads) of the responsibility for
	 * waking the worker thread, so multiple wakeups aren't tried at once.
	 *
	 * This was tuned for some x86 boxes that were handy; it's untested whether doing the read
	 * first is any better or worse for other platforms, even other x86 configurations.
	 */
	smp_mb();
	if ((atomic_read(&queue->idle) != 1) || (atomic_cmpxchg(&queue->idle, 1, 0) != 1))
		return;

	/* There's a maximum of one thread in this list. */
	wake_up(&queue->waiting_worker_threads);
}

static void run_start_hook(struct simple_work_queue *queue)
{
	if (queue->common.type->start != NULL)
		queue->common.type->start(queue->private);
}

static void run_finish_hook(struct simple_work_queue *queue)
{
	if (queue->common.type->finish != NULL)
		queue->common.type->finish(queue->private);
}
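/*
 * The two hooks above, together with the default and maximum priorities used in
 * enqueue_work_queue_completion() and poll_for_completion(), are the only fields of the queue
 * type this file relies on. The authoritative definition lives in funnel-workqueue.h; the
 * initializer below is only an illustration of how those fields are consumed, with hypothetical
 * hook names and priority values:
 *
 *	static const struct vdo_work_queue_type example_queue_type = {
 *		.start = example_start_hook,
 *		.finish = example_finish_hook,
 *		.max_priority = 2,
 *		.default_priority = 1,
 *	};
 */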

/*
 * Wait for the next completion to process, or until kthread_should_stop indicates that it's time
 * for us to shut down.
 *
 * If kthread_should_stop says it's time to stop but we have pending completions, return a
 * completion.
 *
 * Also update statistics relating to scheduler interactions.
 */
static struct vdo_completion *wait_for_next_completion(struct simple_work_queue *queue)
{
	struct vdo_completion *completion;
	DEFINE_WAIT(wait);

	while (true) {
		prepare_to_wait(&queue->waiting_worker_threads, &wait,
				TASK_INTERRUPTIBLE);
		/*
		 * Don't set the idle flag until a wakeup will not be lost.
		 *
		 * Force synchronization between setting the idle flag and checking the funnel
		 * queue; the producer side will do them in the reverse order. (There's still a
		 * race condition we've chosen to allow, because we've got a timeout below that
		 * unwedges us if we hit it, but this may narrow the window a little.)
		 */
		atomic_set(&queue->idle, 1);
		smp_mb(); /* store-load barrier between "idle" and funnel queue */

		completion = poll_for_completion(queue);
		if (completion != NULL)
			break;

		/*
		 * We need to check for thread-stop after setting TASK_INTERRUPTIBLE state up
		 * above. Otherwise, schedule() will put the thread to sleep and might miss a
		 * wakeup from the kthread_stop() call in vdo_finish_work_queue().
		 */
		if (kthread_should_stop())
			break;

		schedule();

		/*
		 * Most of the time when we wake, it should be because there's work to do. If it
		 * was a spurious wakeup, continue looping.
		 */
		completion = poll_for_completion(queue);
		if (completion != NULL)
			break;
	}

	finish_wait(&queue->waiting_worker_threads, &wait);
	atomic_set(&queue->idle, 0);

	return completion;
}
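/*
 * Summary of the idle handshake between enqueue_work_queue_completion() (the producer side) and
 * wait_for_next_completion() (the consumer side); this restates the existing comments rather
 * than adding a new protocol:
 *
 *	producer: vdo_funnel_queue_put(entry); smp_mb(); if idle reads as 1 and the cmpxchg from
 *		  1 to 0 wins, wake_up(&queue->waiting_worker_threads).
 *	consumer: atomic_set(&queue->idle, 1); smp_mb(); poll_for_completion(); if nothing is
 *		  found, check kthread_should_stop() and then schedule().
 *
 * Since each side has a full barrier between its store and its load, the producer missing
 * idle == 1 and the consumer missing the new entry cannot both happen; at least one side sees
 * the other, so a completion is never stranded while the worker sleeps.
 */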

static void process_completion(struct simple_work_queue *queue,
			       struct vdo_completion *completion)
{
	if (VDO_ASSERT(completion->my_queue == &queue->common,
		       "completion %px from queue %px marked as being in this queue (%px)",
		       completion, queue, completion->my_queue) == VDO_SUCCESS)
		completion->my_queue = NULL;

	vdo_run_completion(completion);
}

static void service_work_queue(struct simple_work_queue *queue)
{
	run_start_hook(queue);

	while (true) {
		struct vdo_completion *completion = poll_for_completion(queue);

		if (completion == NULL)
			completion = wait_for_next_completion(queue);

		if (completion == NULL) {
			/* No completions but kthread_should_stop() was triggered. */
			break;
		}

		process_completion(queue, completion);

		/*
		 * Be friendly to a CPU that has other work to do, if the kernel has told us to.
		 * This speeds up some performance tests; that "other work" might include other VDO
		 * threads.
		 */
		if (need_resched())
			cond_resched();
	}

	run_finish_hook(queue);
}

static int work_queue_runner(void *ptr)
{
	struct simple_work_queue *queue = ptr;

	complete(queue->started);
	service_work_queue(queue);
	return 0;
}
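/*
 * Note: the queue pointer passed to kthread_run() in make_simple_work_queue() below becomes the
 * thread's kthread data, so get_current_thread_work_queue() at the bottom of this file can
 * recover it with kthread_data() after confirming, via kthread_func(), that the current thread
 * is running work_queue_runner().
 */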

/* Creation & teardown */

static void free_simple_work_queue(struct simple_work_queue *queue)
{
	unsigned int i;

	for (i = 0; i <= VDO_WORK_Q_MAX_PRIORITY; i++)
		vdo_free_funnel_queue(queue->priority_lists[i]);
	vdo_free(queue->common.name);
	vdo_free(queue);
}

static void free_round_robin_work_queue(struct round_robin_work_queue *queue)
{
	struct simple_work_queue **queue_table = queue->service_queues;
	unsigned int count = queue->num_service_queues;
	unsigned int i;

	queue->service_queues = NULL;

	for (i = 0; i < count; i++)
		free_simple_work_queue(queue_table[i]);
	vdo_free(queue_table);
	vdo_free(queue->common.name);
	vdo_free(queue);
}

void vdo_free_work_queue(struct vdo_work_queue *queue)
{
	if (queue == NULL)
		return;

	vdo_finish_work_queue(queue);

	if (queue->round_robin_mode)
		free_round_robin_work_queue(as_round_robin_work_queue(queue));
	else
		free_simple_work_queue(as_simple_work_queue(queue));
}

static int make_simple_work_queue(const char *thread_name_prefix, const char *name,
				  struct vdo_thread *owner, void *private,
				  const struct vdo_work_queue_type *type,
				  struct simple_work_queue **queue_ptr)
{
	DECLARE_COMPLETION_ONSTACK(started);
	struct simple_work_queue *queue;
	int i;
	struct task_struct *thread = NULL;
	int result;

	VDO_ASSERT_LOG_ONLY((type->max_priority <= VDO_WORK_Q_MAX_PRIORITY),
			    "queue priority count %u within limit %u", type->max_priority,
			    VDO_WORK_Q_MAX_PRIORITY);

	result = vdo_allocate(1, struct simple_work_queue, "simple work queue", &queue);
	if (result != VDO_SUCCESS)
		return result;

	queue->private = private;
	queue->started = &started;
	queue->common.type = type;
	queue->common.owner = owner;
	init_waitqueue_head(&queue->waiting_worker_threads);

	result = vdo_duplicate_string(name, "queue name", &queue->common.name);
	if (result != VDO_SUCCESS) {
		vdo_free(queue);
		return -ENOMEM;
	}

	for (i = 0; i <= type->max_priority; i++) {
		result = vdo_make_funnel_queue(&queue->priority_lists[i]);
		if (result != VDO_SUCCESS) {
			free_simple_work_queue(queue);
			return result;
		}
	}

	thread = kthread_run(work_queue_runner, queue, "%s:%s", thread_name_prefix,
			     queue->common.name);
	if (IS_ERR(thread)) {
		free_simple_work_queue(queue);
		return (int) PTR_ERR(thread);
	}

	queue->thread = thread;

	/*
	 * If we don't wait to ensure the thread is running VDO code, a quick kthread_stop (due to
	 * errors elsewhere) could cause it to never get as far as running VDO, skipping the
	 * cleanup code.
	 *
	 * Eventually we should just make that path safe too, and then we won't need this
	 * synchronization.
	 */
	wait_for_completion(&started);

	*queue_ptr = queue;
	return VDO_SUCCESS;
}

/**
 * vdo_make_work_queue() - Create a work queue; if multiple threads are requested, completions will
 *			   be distributed to them in round-robin fashion.
 *
 * Each queue is associated with a struct vdo_thread which has a single vdo thread id. Regardless
 * of the actual number of queues and threads allocated here, code outside of the queue
 * implementation will treat this as a single zone.
 */
int vdo_make_work_queue(const char *thread_name_prefix, const char *name,
			struct vdo_thread *owner, const struct vdo_work_queue_type *type,
			unsigned int thread_count, void *thread_privates[],
			struct vdo_work_queue **queue_ptr)
{
	struct round_robin_work_queue *queue;
	int result;
	char thread_name[TASK_COMM_LEN];
	unsigned int i;

	if (thread_count == 1) {
		struct simple_work_queue *simple_queue;
		void *context = ((thread_privates != NULL) ? thread_privates[0] : NULL);

		result = make_simple_work_queue(thread_name_prefix, name, owner, context,
						type, &simple_queue);
		if (result == VDO_SUCCESS)
			*queue_ptr = &simple_queue->common;
		return result;
	}

	result = vdo_allocate(1, struct round_robin_work_queue, "round-robin work queue",
			      &queue);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(thread_count, struct simple_work_queue *,
			      "subordinate work queues", &queue->service_queues);
	if (result != VDO_SUCCESS) {
		vdo_free(queue);
		return result;
	}

	queue->num_service_queues = thread_count;
	queue->common.round_robin_mode = true;
	queue->common.owner = owner;

	result = vdo_duplicate_string(name, "queue name", &queue->common.name);
	if (result != VDO_SUCCESS) {
		vdo_free(queue->service_queues);
		vdo_free(queue);
		return -ENOMEM;
	}

	*queue_ptr = &queue->common;

	for (i = 0; i < thread_count; i++) {
		void *context = ((thread_privates != NULL) ? thread_privates[i] : NULL);

		snprintf(thread_name, sizeof(thread_name), "%s%u", name, i);
		result = make_simple_work_queue(thread_name_prefix, thread_name, owner,
						context, type, &queue->service_queues[i]);
		if (result != VDO_SUCCESS) {
			queue->num_service_queues = i;
			/* Destroy previously created subordinates. */
			vdo_free_work_queue(vdo_forget(*queue_ptr));
			return result;
		}
	}

	return VDO_SUCCESS;
}

static void finish_simple_work_queue(struct simple_work_queue *queue)
{
	if (queue->thread == NULL)
		return;

	/* Tells the worker thread to shut down and waits for it to exit. */
	kthread_stop(queue->thread);
	queue->thread = NULL;
}

static void finish_round_robin_work_queue(struct round_robin_work_queue *queue)
{
	struct simple_work_queue **queue_table = queue->service_queues;
	unsigned int count = queue->num_service_queues;
	unsigned int i;

	for (i = 0; i < count; i++)
		finish_simple_work_queue(queue_table[i]);
}

/* No enqueueing of completions should be done once this function is called. */
void vdo_finish_work_queue(struct vdo_work_queue *queue)
{
	if (queue == NULL)
		return;

	if (queue->round_robin_mode)
		finish_round_robin_work_queue(as_round_robin_work_queue(queue));
	else
		finish_simple_work_queue(as_simple_work_queue(queue));
}

/* Debugging dumps */

static void dump_simple_work_queue(struct simple_work_queue *queue)
{
	const char *thread_status = "no threads";
	char task_state_report = '-';

	if (queue->thread != NULL) {
		task_state_report = task_state_to_char(queue->thread);
		thread_status = atomic_read(&queue->idle) ? "idle" : "running";
	}

	vdo_log_info("workQ %px (%s) %s (%c)", &queue->common, queue->common.name,
		     thread_status, task_state_report);

	/* ->waiting_worker_threads wait queue status? anyone waiting? */
}

/*
 * Write to the buffer some info about the completion, for logging. Since the common use case is
 * dumping info about a lot of completions to syslog all at once, the format favors brevity over
 * readability.
 */
void vdo_dump_work_queue(struct vdo_work_queue *queue)
{
	if (queue->round_robin_mode) {
		struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
		unsigned int i;

		for (i = 0; i < round_robin->num_service_queues; i++)
			dump_simple_work_queue(round_robin->service_queues[i]);
	} else {
		dump_simple_work_queue(as_simple_work_queue(queue));
	}
}

static void get_function_name(void *pointer, char *buffer, size_t buffer_length)
{
	if (pointer == NULL) {
		/*
		 * Format "%ps" logs a null pointer as "(null)" with a bunch of leading spaces. We
		 * sometimes use this when logging lots of data; don't be so verbose.
		 */
		strscpy(buffer, "-", buffer_length);
	} else {
		/*
		 * Use a pragma to defeat gcc's format checking, which doesn't understand that
		 * "%ps" actually does support a precision spec in Linux kernel code.
		 */
		char *space;

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wformat"
		snprintf(buffer, buffer_length, "%.*ps", buffer_length - 1, pointer);
#pragma GCC diagnostic pop

		space = strchr(buffer, ' ');
		if (space != NULL)
			*space = '\0';
	}
}

void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer,
				   size_t length)
{
	size_t current_length =
		scnprintf(buffer, length, "%.*s/", TASK_COMM_LEN,
			  (completion->my_queue == NULL ? "-" : completion->my_queue->name));

	if (current_length < length - 1) {
		get_function_name((void *) completion->callback, buffer + current_length,
				  length - current_length);
	}
}

/* Completion submission */
/*
 * If the completion has a timeout that has already passed, the timeout handler function may be
 * invoked by this function.
 */
void vdo_enqueue_work_queue(struct vdo_work_queue *queue,
			    struct vdo_completion *completion)
{
	/*
	 * Convert the provided generic vdo_work_queue to the simple_work_queue to actually queue
	 * on.
	 */
	struct simple_work_queue *simple_queue = NULL;

	if (!queue->round_robin_mode) {
		simple_queue = as_simple_work_queue(queue);
	} else {
		struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);

		/*
		 * It shouldn't be a big deal if the same rotor gets used for multiple work queues.
		 * Any patterns that might develop are likely to be disrupted by random ordering of
		 * multiple completions and migration between cores, unless the load is so light as
		 * to be regular in ordering of tasks and the threads are confined to individual
		 * cores; with a load that light we won't care.
		 */
		unsigned int rotor = this_cpu_inc_return(service_queue_rotor);
		unsigned int index = rotor % round_robin->num_service_queues;

		simple_queue = round_robin->service_queues[index];
	}

	enqueue_work_queue_completion(simple_queue, completion);
}

/* Misc */

/*
 * Return the simple work queue the current thread is servicing, if any. The queue pointer is the
 * kthread data recorded when the worker thread was created, so it is only meaningful when the
 * current thread's function is work_queue_runner.
 */
static struct simple_work_queue *get_current_thread_work_queue(void)
{
	/*
	 * In interrupt context, if a vdo thread is what got interrupted, the calls below will find
	 * the queue for the thread which was interrupted. However, the interrupted thread may have
	 * been processing a completion, in which case starting to process another would violate
	 * our concurrency assumptions.
	 */
	if (in_interrupt())
		return NULL;

	if (kthread_func(current) != work_queue_runner)
		/* Not a VDO work queue thread. */
		return NULL;

	return kthread_data(current);
}

struct vdo_work_queue *vdo_get_current_work_queue(void)
{
	struct simple_work_queue *queue = get_current_thread_work_queue();

	return (queue == NULL) ? NULL : &queue->common;
}

struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue)
{
	return queue->owner;
}

/**
 * vdo_get_work_queue_private_data() - Returns the private data for the current thread's work
 *				       queue, or NULL if none or if the current thread is not a
 *				       work queue thread.
 */
void *vdo_get_work_queue_private_data(void)
{
	struct simple_work_queue *queue = get_current_thread_work_queue();

	return (queue != NULL) ? queue->private : NULL;
}

bool vdo_work_queue_type_is(struct vdo_work_queue *queue,
			    const struct vdo_work_queue_type *type)
{
	return (queue->type == type);
}
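/*
 * Illustrative sketch of how code running on a worker thread might identify its queue; the type
 * pointer, private-data consumer, and helper named here are hypothetical stand-ins:
 *
 *	struct vdo_work_queue *queue = vdo_get_current_work_queue();
 *
 *	if ((queue != NULL) && vdo_work_queue_type_is(queue, &example_queue_type))
 *		do_example_work(vdo_get_work_queue_private_data());
 */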