Lines Matching full:queue
15 #include "funnel-queue.h"
28 * DOC: Work queue definition.
36 /* Name of just the work queue (e.g., "cpuQ12") */
73 static inline struct simple_work_queue *as_simple_work_queue(struct vdo_work_queue *queue)
75 return ((queue == NULL) ?
76 NULL : container_of(queue, struct simple_work_queue, common));
79 static inline struct round_robin_work_queue *as_round_robin_work_queue(struct vdo_work_queue *queue)
81 return ((queue == NULL) ?
83 container_of(queue, struct round_robin_work_queue, common));
96 static struct vdo_completion *poll_for_completion(struct simple_work_queue *queue)
100 for (i = queue->common.type->max_priority; i >= 0; i--) {
101 struct funnel_queue_entry *link = vdo_funnel_queue_poll(queue->priority_lists[i]);
110 static void enqueue_work_queue_completion(struct simple_work_queue *queue,
115 completion, completion->callback, queue, completion->my_queue);
117 completion->priority = queue->common.type->default_priority;
119 if (VDO_ASSERT(completion->priority <= queue->common.type->max_priority,
120 "priority is in range for queue") != VDO_SUCCESS)
123 completion->my_queue = &queue->common;
125 /* Funnel queue handles the synchronization for the put. */
126 vdo_funnel_queue_put(queue->priority_lists[completion->priority],
130 * Due to how funnel queue synchronization is handled (just atomic operations), the
132 * enqueueing each item. Even if the funnel queue is not empty at the time of adding an
133 * item to the queue, the consumer thread may not see this since it is not guaranteed to
134 * have the same view of the queue as a producer thread.
146 if ((atomic_read(&queue->idle) != 1) || (atomic_cmpxchg(&queue->idle, 1, 0) != 1))
150 wake_up(&queue->waiting_worker_threads);
153 static void run_start_hook(struct simple_work_queue *queue)
155 if (queue->common.type->start != NULL)
156 queue->common.type->start(queue->private);
159 static void run_finish_hook(struct simple_work_queue *queue)
161 if (queue->common.type->finish != NULL)
162 queue->common.type->finish(queue->private);
174 static struct vdo_completion *wait_for_next_completion(struct simple_work_queue *queue)
180 prepare_to_wait(&queue->waiting_worker_threads, &wait,
186 * queue; the producer side will do them in the reverse order. (There's still a
190 atomic_set(&queue->idle, 1);
191 smp_mb(); /* store-load barrier between "idle" and funnel queue */
193 completion = poll_for_completion(queue);
211 completion = poll_for_completion(queue);
216 finish_wait(&queue->waiting_worker_threads, &wait);
217 atomic_set(&queue->idle, 0);
222 static void process_completion(struct simple_work_queue *queue,
225 if (VDO_ASSERT(completion->my_queue == &queue->common,
226 "completion %px from queue %px marked as being in this queue (%px)",
227 completion, queue, completion->my_queue) == VDO_SUCCESS)
233 static void service_work_queue(struct simple_work_queue *queue)
235 run_start_hook(queue);
238 struct vdo_completion *completion = poll_for_completion(queue);
241 completion = wait_for_next_completion(queue);
248 process_completion(queue, completion);
258 run_finish_hook(queue);
263 struct simple_work_queue *queue = ptr;
265 complete(queue->started);
266 service_work_queue(queue);
272 static void free_simple_work_queue(struct simple_work_queue *queue)
277 vdo_free_funnel_queue(queue->priority_lists[i]);
278 vdo_free(queue->common.name);
279 vdo_free(queue);
282 static void free_round_robin_work_queue(struct round_robin_work_queue *queue)
284 struct simple_work_queue **queue_table = queue->service_queues;
285 unsigned int count = queue->num_service_queues;
288 queue->service_queues = NULL;
293 vdo_free(queue->common.name);
294 vdo_free(queue);
297 void vdo_free_work_queue(struct vdo_work_queue *queue)
299 if (queue == NULL)
302 vdo_finish_work_queue(queue);
304 if (queue->round_robin_mode)
305 free_round_robin_work_queue(as_round_robin_work_queue(queue));
307 free_simple_work_queue(as_simple_work_queue(queue));
316 struct simple_work_queue *queue;
322 "queue priority count %u within limit %u", type->max_priority,
325 result = vdo_allocate(1, "simple work queue", &queue);
329 queue->private = private;
330 queue->started = &started;
331 queue->common.type = type;
332 queue->common.owner = owner;
333 init_waitqueue_head(&queue->waiting_worker_threads);
335 result = vdo_duplicate_string(name, "queue name", &queue->common.name);
337 vdo_free(queue);
342 result = vdo_make_funnel_queue(&queue->priority_lists[i]);
344 free_simple_work_queue(queue);
349 thread = kthread_run(work_queue_runner, queue, "%s:%s", thread_name_prefix,
350 queue->common.name);
352 free_simple_work_queue(queue);
356 queue->thread = thread;
368 *queue_ptr = queue;
373 * vdo_make_work_queue() - Create a work queue; if multiple threads are requested, completions will
376 * @name: A base name to identify this queue.
377 * @owner: The vdo_thread structure to manage this queue.
378 * @type: The type of queue to create.
379 * @thread_count: The number of actual threads handling this queue.
381 * @queue_ptr: A pointer to return the new work queue.
383 * Each queue is associated with a struct vdo_thread which has a single vdo thread id. Regardless
384 * of the actual number of queues and threads allocated here, code outside of the queue
392 struct round_robin_work_queue *queue;
408 result = vdo_allocate(1, "round-robin work queue", &queue);
412 result = vdo_allocate(thread_count, "subordinate work queues", &queue->service_queues);
414 vdo_free(queue);
418 queue->num_service_queues = thread_count;
419 queue->common.round_robin_mode = true;
420 queue->common.owner = owner;
422 result = vdo_duplicate_string(name, "queue name", &queue->common.name);
424 vdo_free(queue->service_queues);
425 vdo_free(queue);
429 *queue_ptr = &queue->common;
436 context, type, &queue->service_queues[i]);
438 queue->num_service_queues = i;
448 static void finish_simple_work_queue(struct simple_work_queue *queue)
450 if (queue->thread == NULL)
454 kthread_stop(queue->thread);
455 queue->thread = NULL;
458 static void finish_round_robin_work_queue(struct round_robin_work_queue *queue)
460 struct simple_work_queue **queue_table = queue->service_queues;
461 unsigned int count = queue->num_service_queues;
469 void vdo_finish_work_queue(struct vdo_work_queue *queue)
471 if (queue == NULL)
474 if (queue->round_robin_mode)
475 finish_round_robin_work_queue(as_round_robin_work_queue(queue));
477 finish_simple_work_queue(as_simple_work_queue(queue));
482 static void dump_simple_work_queue(struct simple_work_queue *queue)
487 if (queue->thread != NULL) {
488 task_state_report = task_state_to_char(queue->thread);
489 thread_status = atomic_read(&queue->idle) ? "idle" : "running";
492 vdo_log_info("workQ %px (%s) %s (%c)", &queue->common, queue->common.name,
495 /* ->waiting_worker_threads wait queue status? anyone waiting? */
503 void vdo_dump_work_queue(struct vdo_work_queue *queue)
505 if (queue->round_robin_mode) {
506 struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
512 dump_simple_work_queue(as_simple_work_queue(queue));
560 void vdo_enqueue_work_queue(struct vdo_work_queue *queue,
564 * Convert the provided generic vdo_work_queue to the simple_work_queue to actually queue
569 if (!queue->round_robin_mode) {
570 simple_queue = as_simple_work_queue(queue);
572 struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
593 * Return the work queue pointer recorded at initialization time in the work-queue stack handle
600 * the queue for the thread which was interrupted. However, the interrupted thread may have
608 /* Not a VDO work queue thread. */
616 struct simple_work_queue *queue = get_current_thread_work_queue();
618 return (queue == NULL) ? NULL : &queue->common;
621 struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue)
623 return queue->owner;
628 * queue, or NULL if none or if the current thread is not a
629 * work queue thread.
633 struct simple_work_queue *queue = get_current_thread_work_queue();
635 return (queue != NULL) ? queue->private : NULL;
638 bool vdo_work_queue_type_is(struct vdo_work_queue *queue,
641 return (queue->type == type);