Lines Matching +full:wait +full:- +full:queue

1 // SPDX-License-Identifier: GPL-2.0-only
6 #include "funnel-workqueue.h"
15 #include "funnel-queue.h"
17 #include "memory-alloc.h"
20 #include "string-utils.h"
23 #include "status-codes.h"
28 * DOC: Work queue definition.
30 * There are two types of work queues: simple, with one worker thread, and round-robin, which uses
31 * a group of the former to do the work, and assigns work to them in round-robin fashion (roughly).
32 * Externally, both are represented via the same common sub-structure, though there's actually not
36 /* Name of just the work queue (e.g., "cpuQ12") */
51 * caching -- and if the max priority is 2, just fit in one x86-64 cache line if aligned.
73 static inline struct simple_work_queue *as_simple_work_queue(struct vdo_work_queue *queue) in as_simple_work_queue() argument
75 return ((queue == NULL) ? in as_simple_work_queue()
76 NULL : container_of(queue, struct simple_work_queue, common)); in as_simple_work_queue()
79 static inline struct round_robin_work_queue *as_round_robin_work_queue(struct vdo_work_queue *queue) in as_round_robin_work_queue() argument
81 return ((queue == NULL) ? in as_round_robin_work_queue()
83 container_of(queue, struct round_robin_work_queue, common)); in as_round_robin_work_queue()
92 * condition where a high-priority completion can be enqueued followed by a lower-priority one, and
93 * we'll grab the latter (but we'll catch the high-priority item on the next call). If strict
96 static struct vdo_completion *poll_for_completion(struct simple_work_queue *queue) in poll_for_completion() argument
100 for (i = queue->common.type->max_priority; i >= 0; i--) { in poll_for_completion()
101 struct funnel_queue_entry *link = vdo_funnel_queue_poll(queue->priority_lists[i]); in poll_for_completion()
110 static void enqueue_work_queue_completion(struct simple_work_queue *queue, in enqueue_work_queue_completion() argument
113 VDO_ASSERT_LOG_ONLY(completion->my_queue == NULL, in enqueue_work_queue_completion()
115 completion, completion->callback, queue, completion->my_queue); in enqueue_work_queue_completion()
116 if (completion->priority == VDO_WORK_Q_DEFAULT_PRIORITY) in enqueue_work_queue_completion()
117 completion->priority = queue->common.type->default_priority; in enqueue_work_queue_completion()
119 if (VDO_ASSERT(completion->priority <= queue->common.type->max_priority, in enqueue_work_queue_completion()
120 "priority is in range for queue") != VDO_SUCCESS) in enqueue_work_queue_completion()
121 completion->priority = 0; in enqueue_work_queue_completion()
123 completion->my_queue = &queue->common; in enqueue_work_queue_completion()
125 /* Funnel queue handles the synchronization for the put. */ in enqueue_work_queue_completion()
126 vdo_funnel_queue_put(queue->priority_lists[completion->priority], in enqueue_work_queue_completion()
127 &completion->work_queue_entry_link); in enqueue_work_queue_completion()
130 * Due to how funnel queue synchronization is handled (just atomic operations), the in enqueue_work_queue_completion()
131 * simplest safe implementation here would be to wake-up any waiting threads after in enqueue_work_queue_completion()
132 * enqueueing each item. Even if the funnel queue is not empty at the time of adding an in enqueue_work_queue_completion()
133 * item to the queue, the consumer thread may not see this since it is not guaranteed to in enqueue_work_queue_completion()
134 * have the same view of the queue as a producer thread. in enqueue_work_queue_completion()
146 if ((atomic_read(&queue->idle) != 1) || (atomic_cmpxchg(&queue->idle, 1, 0) != 1)) in enqueue_work_queue_completion()
150 wake_up(&queue->waiting_worker_threads); in enqueue_work_queue_completion()
153 static void run_start_hook(struct simple_work_queue *queue) in run_start_hook() argument
155 if (queue->common.type->start != NULL) in run_start_hook()
156 queue->common.type->start(queue->private); in run_start_hook()
159 static void run_finish_hook(struct simple_work_queue *queue) in run_finish_hook() argument
161 if (queue->common.type->finish != NULL) in run_finish_hook()
162 queue->common.type->finish(queue->private); in run_finish_hook()
166 * Wait for the next completion to process, or until kthread_should_stop indicates that it's time
174 static struct vdo_completion *wait_for_next_completion(struct simple_work_queue *queue) in wait_for_next_completion() argument
177 DEFINE_WAIT(wait); in wait_for_next_completion()
180 prepare_to_wait(&queue->waiting_worker_threads, &wait, in wait_for_next_completion()
186 * queue; the producer side will do them in the reverse order. (There's still a in wait_for_next_completion()
190 atomic_set(&queue->idle, 1); in wait_for_next_completion()
191 smp_mb(); /* store-load barrier between "idle" and funnel queue */ in wait_for_next_completion()
193 completion = poll_for_completion(queue); in wait_for_next_completion()
198 * We need to check for thread-stop after setting TASK_INTERRUPTIBLE state up in wait_for_next_completion()
211 completion = poll_for_completion(queue); in wait_for_next_completion()
216 finish_wait(&queue->waiting_worker_threads, &wait); in wait_for_next_completion()
217 atomic_set(&queue->idle, 0); in wait_for_next_completion()
222 static void process_completion(struct simple_work_queue *queue, in process_completion() argument
225 if (VDO_ASSERT(completion->my_queue == &queue->common, in process_completion()
226 "completion %px from queue %px marked as being in this queue (%px)", in process_completion()
227 completion, queue, completion->my_queue) == VDO_SUCCESS) in process_completion()
228 completion->my_queue = NULL; in process_completion()
233 static void service_work_queue(struct simple_work_queue *queue) in service_work_queue() argument
235 run_start_hook(queue); in service_work_queue()
238 struct vdo_completion *completion = poll_for_completion(queue); in service_work_queue()
241 completion = wait_for_next_completion(queue); in service_work_queue()
248 process_completion(queue, completion); in service_work_queue()
259 run_finish_hook(queue); in service_work_queue()
264 struct simple_work_queue *queue = ptr; in work_queue_runner() local
266 complete(queue->started); in work_queue_runner()
267 service_work_queue(queue); in work_queue_runner()
273 static void free_simple_work_queue(struct simple_work_queue *queue) in free_simple_work_queue() argument
278 vdo_free_funnel_queue(queue->priority_lists[i]); in free_simple_work_queue()
279 vdo_free(queue->common.name); in free_simple_work_queue()
280 vdo_free(queue); in free_simple_work_queue()
283 static void free_round_robin_work_queue(struct round_robin_work_queue *queue) in free_round_robin_work_queue() argument
285 struct simple_work_queue **queue_table = queue->service_queues; in free_round_robin_work_queue()
286 unsigned int count = queue->num_service_queues; in free_round_robin_work_queue()
289 queue->service_queues = NULL; in free_round_robin_work_queue()
294 vdo_free(queue->common.name); in free_round_robin_work_queue()
295 vdo_free(queue); in free_round_robin_work_queue()
298 void vdo_free_work_queue(struct vdo_work_queue *queue) in vdo_free_work_queue() argument
300 if (queue == NULL) in vdo_free_work_queue()
303 vdo_finish_work_queue(queue); in vdo_free_work_queue()
305 if (queue->round_robin_mode) in vdo_free_work_queue()
306 free_round_robin_work_queue(as_round_robin_work_queue(queue)); in vdo_free_work_queue()
308 free_simple_work_queue(as_simple_work_queue(queue)); in vdo_free_work_queue()
317 struct simple_work_queue *queue; in make_simple_work_queue() local
322 VDO_ASSERT_LOG_ONLY((type->max_priority <= VDO_WORK_Q_MAX_PRIORITY), in make_simple_work_queue()
323 "queue priority count %u within limit %u", type->max_priority, in make_simple_work_queue()
326 result = vdo_allocate(1, struct simple_work_queue, "simple work queue", &queue); in make_simple_work_queue()
330 queue->private = private; in make_simple_work_queue()
331 queue->started = &started; in make_simple_work_queue()
332 queue->common.type = type; in make_simple_work_queue()
333 queue->common.owner = owner; in make_simple_work_queue()
334 init_waitqueue_head(&queue->waiting_worker_threads); in make_simple_work_queue()
336 result = vdo_duplicate_string(name, "queue name", &queue->common.name); in make_simple_work_queue()
338 vdo_free(queue); in make_simple_work_queue()
339 return -ENOMEM; in make_simple_work_queue()
342 for (i = 0; i <= type->max_priority; i++) { in make_simple_work_queue()
343 result = vdo_make_funnel_queue(&queue->priority_lists[i]); in make_simple_work_queue()
345 free_simple_work_queue(queue); in make_simple_work_queue()
350 thread = kthread_run(work_queue_runner, queue, "%s:%s", thread_name_prefix, in make_simple_work_queue()
351 queue->common.name); in make_simple_work_queue()
353 free_simple_work_queue(queue); in make_simple_work_queue()
357 queue->thread = thread; in make_simple_work_queue()
360 * If we don't wait to ensure the thread is running VDO code, a quick kthread_stop (due to in make_simple_work_queue()
369 *queue_ptr = queue; in make_simple_work_queue()
374 * vdo_make_work_queue() - Create a work queue; if multiple threads are requested, completions will
375 * be distributed to them in round-robin fashion.
377 * Each queue is associated with a struct vdo_thread which has a single vdo thread id. Regardless
378 * of the actual number of queues and threads allocated here, code outside of the queue
386 struct round_robin_work_queue *queue; in vdo_make_work_queue() local
398 *queue_ptr = &simple_queue->common; in vdo_make_work_queue()
402 result = vdo_allocate(1, struct round_robin_work_queue, "round-robin work queue", in vdo_make_work_queue()
403 &queue); in vdo_make_work_queue()
408 "subordinate work queues", &queue->service_queues); in vdo_make_work_queue()
410 vdo_free(queue); in vdo_make_work_queue()
414 queue->num_service_queues = thread_count; in vdo_make_work_queue()
415 queue->common.round_robin_mode = true; in vdo_make_work_queue()
416 queue->common.owner = owner; in vdo_make_work_queue()
418 result = vdo_duplicate_string(name, "queue name", &queue->common.name); in vdo_make_work_queue()
420 vdo_free(queue->service_queues); in vdo_make_work_queue()
421 vdo_free(queue); in vdo_make_work_queue()
422 return -ENOMEM; in vdo_make_work_queue()
425 *queue_ptr = &queue->common; in vdo_make_work_queue()
432 context, type, &queue->service_queues[i]); in vdo_make_work_queue()
434 queue->num_service_queues = i; in vdo_make_work_queue()
444 static void finish_simple_work_queue(struct simple_work_queue *queue) in finish_simple_work_queue() argument
446 if (queue->thread == NULL) in finish_simple_work_queue()
450 kthread_stop(queue->thread); in finish_simple_work_queue()
451 queue->thread = NULL; in finish_simple_work_queue()
454 static void finish_round_robin_work_queue(struct round_robin_work_queue *queue) in finish_round_robin_work_queue() argument
456 struct simple_work_queue **queue_table = queue->service_queues; in finish_round_robin_work_queue()
457 unsigned int count = queue->num_service_queues; in finish_round_robin_work_queue()
465 void vdo_finish_work_queue(struct vdo_work_queue *queue) in vdo_finish_work_queue() argument
467 if (queue == NULL) in vdo_finish_work_queue()
470 if (queue->round_robin_mode) in vdo_finish_work_queue()
471 finish_round_robin_work_queue(as_round_robin_work_queue(queue)); in vdo_finish_work_queue()
473 finish_simple_work_queue(as_simple_work_queue(queue)); in vdo_finish_work_queue()
478 static void dump_simple_work_queue(struct simple_work_queue *queue) in dump_simple_work_queue() argument
481 char task_state_report = '-'; in dump_simple_work_queue()
483 if (queue->thread != NULL) { in dump_simple_work_queue()
484 task_state_report = task_state_to_char(queue->thread); in dump_simple_work_queue()
485 thread_status = atomic_read(&queue->idle) ? "idle" : "running"; in dump_simple_work_queue()
488 vdo_log_info("workQ %px (%s) %s (%c)", &queue->common, queue->common.name, in dump_simple_work_queue()
491 /* ->waiting_worker_threads wait queue status? anyone waiting? */ in dump_simple_work_queue()
499 void vdo_dump_work_queue(struct vdo_work_queue *queue) in vdo_dump_work_queue() argument
501 if (queue->round_robin_mode) { in vdo_dump_work_queue()
502 struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue); in vdo_dump_work_queue()
505 for (i = 0; i < round_robin->num_service_queues; i++) in vdo_dump_work_queue()
506 dump_simple_work_queue(round_robin->service_queues[i]); in vdo_dump_work_queue()
508 dump_simple_work_queue(as_simple_work_queue(queue)); in vdo_dump_work_queue()
519 strscpy(buffer, "-", buffer_length); in get_function_name()
528 #pragma GCC diagnostic ignored "-Wformat" in get_function_name()
529 snprintf(buffer, buffer_length, "%.*ps", buffer_length - 1, pointer); in get_function_name()
543 (completion->my_queue == NULL ? "-" : completion->my_queue->name)); in vdo_dump_completion_to_buffer()
545 if (current_length < length - 1) { in vdo_dump_completion_to_buffer()
546 get_function_name((void *) completion->callback, buffer + current_length, in vdo_dump_completion_to_buffer()
547 length - current_length); in vdo_dump_completion_to_buffer()
556 void vdo_enqueue_work_queue(struct vdo_work_queue *queue, in vdo_enqueue_work_queue() argument
560 * Convert the provided generic vdo_work_queue to the simple_work_queue to actually queue in vdo_enqueue_work_queue()
565 if (!queue->round_robin_mode) { in vdo_enqueue_work_queue()
566 simple_queue = as_simple_work_queue(queue); in vdo_enqueue_work_queue()
568 struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue); in vdo_enqueue_work_queue()
578 unsigned int index = rotor % round_robin->num_service_queues; in vdo_enqueue_work_queue()
580 simple_queue = round_robin->service_queues[index]; in vdo_enqueue_work_queue()
589 * Return the work queue pointer recorded at initialization time in the work-queue stack handle
596 * the queue for the thread which was interrupted. However, the interrupted thread may have in get_current_thread_work_queue()
604 /* Not a VDO work queue thread. */ in get_current_thread_work_queue()
612 struct simple_work_queue *queue = get_current_thread_work_queue(); in vdo_get_current_work_queue() local
614 return (queue == NULL) ? NULL : &queue->common; in vdo_get_current_work_queue()
617 struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue) in vdo_get_work_queue_owner() argument
619 return queue->owner; in vdo_get_work_queue_owner()
623 * vdo_get_work_queue_private_data() - Returns the private data for the current thread's work
624 * queue, or NULL if none or if the current thread is not a
625 * work queue thread.
629 struct simple_work_queue *queue = get_current_thread_work_queue(); in vdo_get_work_queue_private_data() local
631 return (queue != NULL) ? queue->private : NULL; in vdo_get_work_queue_private_data()
634 bool vdo_work_queue_type_is(struct vdo_work_queue *queue, in vdo_work_queue_type_is() argument
637 return (queue->type == type); in vdo_work_queue_type_is()