// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "funnel-workqueue.h"

#include <linux/atomic.h>
#include <linux/cache.h>
#include <linux/completion.h>
#include <linux/err.h>
#include <linux/kthread.h>
#include <linux/percpu.h>

#include "funnel-queue.h"
#include "logger.h"
#include "memory-alloc.h"
#include "numeric.h"
#include "permassert.h"
#include "string-utils.h"

#include "completion.h"
#include "status-codes.h"

static DEFINE_PER_CPU(unsigned int, service_queue_rotor);

/**
 * DOC: Work queue definition.
 *
 * There are two types of work queues: simple, with one worker thread, and round-robin, which uses
 * a group of the former to do the work, and assigns work to them in round-robin fashion (roughly).
 * Externally, both are represented via the same common sub-structure, though there's actually not
 * a great deal of overlap between the two types internally.
 */
struct vdo_work_queue {
	/* Name of just the work queue (e.g., "cpuQ12") */
	char *name;
	bool round_robin_mode;
	struct vdo_thread *owner;
	/* Life cycle functions, etc */
	const struct vdo_work_queue_type *type;
};

struct simple_work_queue {
	struct vdo_work_queue common;
	struct funnel_queue *priority_lists[VDO_WORK_Q_MAX_PRIORITY + 1];
	void *private;

	/*
	 * The fields above are unchanged after setup but often read, and are good candidates for
	 * caching -- and if the max priority is 2, they just fit in one x86-64 cache line if
	 * aligned. The fields below are often modified as we sleep and wake, so we want a
	 * separate cache line for performance.
	 */
	/* Any (0 or 1) worker threads waiting for new work to do */
	wait_queue_head_t waiting_worker_threads ____cacheline_aligned;
	/* Hack to reduce wakeup calls if the worker thread is running */
	atomic_t idle;

	/* These are infrequently used so in terms of performance we don't care where they land. */
	struct task_struct *thread;
	/* Notify creator once worker has initialized */
	struct completion *started;
};

struct round_robin_work_queue {
	struct vdo_work_queue common;
	struct simple_work_queue **service_queues;
	unsigned int num_service_queues;
};

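/*
 * Downcast helpers: recover the specific queue type from the embedded common sub-structure.
 * Each tolerates a NULL queue and returns NULL in that case.
 */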
static inline struct simple_work_queue *as_simple_work_queue(struct vdo_work_queue *queue)
{
	return ((queue == NULL) ?
		NULL : container_of(queue, struct simple_work_queue, common));
}

static inline struct round_robin_work_queue *as_round_robin_work_queue(struct vdo_work_queue *queue)
{
	return ((queue == NULL) ?
		 NULL :
		 container_of(queue, struct round_robin_work_queue, common));
}

/* Processing normal completions. */

/*
 * Dequeue and return the next waiting completion, if any.
 *
 * We scan the funnel queues from highest priority to lowest, once; there is therefore a race
 * condition where a high-priority completion can be enqueued followed by a lower-priority one, and
 * we'll grab the latter (but we'll catch the high-priority item on the next call). If strict
 * enforcement of priorities becomes necessary, this function will need fixing.
 */
static struct vdo_completion *poll_for_completion(struct simple_work_queue *queue)
{
	int i;

	for (i = queue->common.type->max_priority; i >= 0; i--) {
		struct funnel_queue_entry *link = vdo_funnel_queue_poll(queue->priority_lists[i]);

		if (link != NULL)
			return container_of(link, struct vdo_completion, work_queue_entry_link);
	}

	return NULL;
}

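/*
 * Add a completion to the funnel queue for its (possibly defaulted) priority, then wake the
 * worker thread if it appears to be idle.
 */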
static void enqueue_work_queue_completion(struct simple_work_queue *queue,
					  struct vdo_completion *completion)
{
	VDO_ASSERT_LOG_ONLY(completion->my_queue == NULL,
			    "completion %px (fn %px) to enqueue (%px) is not already queued (%px)",
			    completion, completion->callback, queue, completion->my_queue);
	if (completion->priority == VDO_WORK_Q_DEFAULT_PRIORITY)
		completion->priority = queue->common.type->default_priority;

	if (VDO_ASSERT(completion->priority <= queue->common.type->max_priority,
		       "priority is in range for queue") != VDO_SUCCESS)
		completion->priority = 0;

	completion->my_queue = &queue->common;

	/* Funnel queue handles the synchronization for the put. */
	vdo_funnel_queue_put(queue->priority_lists[completion->priority],
			     &completion->work_queue_entry_link);

	/*
	 * Due to how funnel queue synchronization is handled (just atomic operations), the
	 * simplest safe implementation here would be to wake up any waiting threads after
	 * enqueueing each item. Even if the funnel queue is not empty at the time of adding an
	 * item to the queue, the consumer thread may not see this since it is not guaranteed to
	 * have the same view of the queue as a producer thread.
	 *
	 * However, the above is wasteful, so instead we attempt to minimize the number of thread
	 * wakeups. Using an idle flag, and careful ordering using memory barriers, we should be
	 * able to determine when the worker thread might be asleep or going to sleep. We use
	 * cmpxchg to try to take ownership (vs other producer threads) of the responsibility for
	 * waking the worker thread, so multiple wakeups aren't tried at once.
	 *
	 * This was tuned for some x86 boxes that were handy; it's untested whether doing the read
	 * first is any better or worse for other platforms, even other x86 configurations.
	 */
	smp_mb();
	if ((atomic_read(&queue->idle) != 1) || (atomic_cmpxchg(&queue->idle, 1, 0) != 1))
		return;

	/* There's a maximum of one thread in this list. */
	wake_up(&queue->waiting_worker_threads);
}

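/* The optional start and finish hooks run on the worker thread itself, around its service loop. */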
static void run_start_hook(struct simple_work_queue *queue)
{
	if (queue->common.type->start != NULL)
		queue->common.type->start(queue->private);
}

static void run_finish_hook(struct simple_work_queue *queue)
{
	if (queue->common.type->finish != NULL)
		queue->common.type->finish(queue->private);
}

/*
 * Wait for the next completion to process, or until kthread_should_stop() indicates that it's
 * time for us to shut down.
 *
 * If kthread_should_stop() says it's time to stop but we still have pending completions, return
 * a completion rather than NULL.
 */
static struct vdo_completion *wait_for_next_completion(struct simple_work_queue *queue)
{
	struct vdo_completion *completion;
	DEFINE_WAIT(wait);

	while (true) {
		prepare_to_wait(&queue->waiting_worker_threads, &wait,
				TASK_INTERRUPTIBLE);
		/*
		 * Don't set the idle flag until a wakeup will not be lost.
		 *
		 * Force synchronization between setting the idle flag and checking the funnel
		 * queue; the producer side will do them in the reverse order. (There's still a
		 * race condition we've chosen to allow, because we've got a timeout below that
		 * unwedges us if we hit it, but this may narrow the window a little.)
		 */
		atomic_set(&queue->idle, 1);
		smp_mb(); /* store-load barrier between "idle" and funnel queue */

		completion = poll_for_completion(queue);
		if (completion != NULL)
			break;

		/*
		 * We need to check for thread-stop after setting TASK_INTERRUPTIBLE state up
		 * above. Otherwise, schedule() will put the thread to sleep and might miss a
		 * wakeup from kthread_stop() call in vdo_finish_work_queue().
		 */
		if (kthread_should_stop())
			break;

		schedule();

		/*
		 * Most of the time when we wake, it should be because there's work to do. If it
		 * was a spurious wakeup, continue looping.
		 */
		completion = poll_for_completion(queue);
		if (completion != NULL)
			break;
	}

	finish_wait(&queue->waiting_worker_threads, &wait);
	atomic_set(&queue->idle, 0);

	return completion;
}

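/* Detach the completion from this queue (with a sanity check) and run it. */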
static void process_completion(struct simple_work_queue *queue,
			       struct vdo_completion *completion)
{
	if (VDO_ASSERT(completion->my_queue == &queue->common,
		       "completion %px from queue %px marked as being in this queue (%px)",
		       completion, queue, completion->my_queue) == VDO_SUCCESS)
		completion->my_queue = NULL;

	vdo_run_completion(completion);
}

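/*
 * The main loop for a worker thread: run the start hook, then process completions (sleeping
 * when there are none) until kthread_should_stop(), and finally run the finish hook.
 */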
static void service_work_queue(struct simple_work_queue *queue)
{
	run_start_hook(queue);

	while (true) {
		struct vdo_completion *completion = poll_for_completion(queue);

		if (completion == NULL)
			completion = wait_for_next_completion(queue);

		if (completion == NULL) {
			/* No completions but kthread_should_stop() was triggered. */
			break;
		}

		process_completion(queue, completion);

		/*
		 * Be friendly to a CPU that has other work to do, if the kernel has told us to.
		 * This speeds up some performance tests; that "other work" might include other VDO
		 * threads.
		 */
		cond_resched();
	}

	run_finish_hook(queue);
}

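/* Thread function: report that startup is complete, then service the queue until stopped. */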
static int work_queue_runner(void *ptr)
{
	struct simple_work_queue *queue = ptr;

	complete(queue->started);
	service_work_queue(queue);
	return 0;
}

/* Creation & teardown */

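/* Free a simple queue's funnel queues, its name, and the queue structure itself. */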
static void free_simple_work_queue(struct simple_work_queue *queue)
{
	unsigned int i;

	for (i = 0; i <= VDO_WORK_Q_MAX_PRIORITY; i++)
		vdo_free_funnel_queue(queue->priority_lists[i]);
	vdo_free(queue->common.name);
	vdo_free(queue);
}

static void free_round_robin_work_queue(struct round_robin_work_queue *queue)
{
	struct simple_work_queue **queue_table = queue->service_queues;
	unsigned int count = queue->num_service_queues;
	unsigned int i;

	queue->service_queues = NULL;

	for (i = 0; i < count; i++)
		free_simple_work_queue(queue_table[i]);
	vdo_free(queue_table);
	vdo_free(queue->common.name);
	vdo_free(queue);
}

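/* Shut down the queue's worker thread(s) if necessary, then free all associated storage. */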
void vdo_free_work_queue(struct vdo_work_queue *queue)
{
	if (queue == NULL)
		return;

	vdo_finish_work_queue(queue);

	if (queue->round_robin_mode)
		free_round_robin_work_queue(as_round_robin_work_queue(queue));
	else
		free_simple_work_queue(as_simple_work_queue(queue));
}

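/*
 * Create a simple (single-thread) work queue: allocate the structure and its funnel queues,
 * start the worker thread, and wait for the thread to signal that it is running.
 */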
static int make_simple_work_queue(const char *thread_name_prefix, const char *name,
				  struct vdo_thread *owner, void *private,
				  const struct vdo_work_queue_type *type,
				  struct simple_work_queue **queue_ptr)
{
	DECLARE_COMPLETION_ONSTACK(started);
	struct simple_work_queue *queue;
	int i;
	struct task_struct *thread = NULL;
	int result;

	VDO_ASSERT_LOG_ONLY((type->max_priority <= VDO_WORK_Q_MAX_PRIORITY),
			    "queue priority count %u within limit %u", type->max_priority,
			    VDO_WORK_Q_MAX_PRIORITY);

	result = vdo_allocate(1, struct simple_work_queue, "simple work queue", &queue);
	if (result != VDO_SUCCESS)
		return result;

	queue->private = private;
	queue->started = &started;
	queue->common.type = type;
	queue->common.owner = owner;
	init_waitqueue_head(&queue->waiting_worker_threads);

	result = vdo_duplicate_string(name, "queue name", &queue->common.name);
	if (result != VDO_SUCCESS) {
		vdo_free(queue);
		return -ENOMEM;
	}

	for (i = 0; i <= type->max_priority; i++) {
		result = vdo_make_funnel_queue(&queue->priority_lists[i]);
		if (result != VDO_SUCCESS) {
			free_simple_work_queue(queue);
			return result;
		}
	}

	thread = kthread_run(work_queue_runner, queue, "%s:%s", thread_name_prefix,
			     queue->common.name);
	if (IS_ERR(thread)) {
		free_simple_work_queue(queue);
		return (int) PTR_ERR(thread);
	}

	queue->thread = thread;

	/*
	 * If we don't wait to ensure the thread is running VDO code, a quick kthread_stop (due to
	 * errors elsewhere) could cause it to never get as far as running VDO, skipping the
	 * cleanup code.
	 *
	 * Eventually we should just make that path safe too, and then we won't need this
	 * synchronization.
	 */
	wait_for_completion(&started);

	*queue_ptr = queue;
	return VDO_SUCCESS;
}

/**
 * vdo_make_work_queue() - Create a work queue; if multiple threads are requested, completions will
 *                         be distributed to them in round-robin fashion.
 * @thread_name_prefix: A prefix for the thread names to identify them as a vdo thread.
 * @name: A base name to identify this queue.
 * @owner: The vdo_thread structure to manage this queue.
 * @type: The type of queue to create.
 * @thread_count: The number of actual threads handling this queue.
 * @thread_privates: An array of private contexts, one for each thread; may be NULL.
 * @queue_ptr: A pointer to return the new work queue.
 *
 * Each queue is associated with a struct vdo_thread which has a single vdo thread id. Regardless
 * of the actual number of queues and threads allocated here, code outside of the queue
 * implementation will treat this as a single zone.
 */
int vdo_make_work_queue(const char *thread_name_prefix, const char *name,
			struct vdo_thread *owner, const struct vdo_work_queue_type *type,
			unsigned int thread_count, void *thread_privates[],
			struct vdo_work_queue **queue_ptr)
{
	struct round_robin_work_queue *queue;
	int result;
	char thread_name[TASK_COMM_LEN];
	unsigned int i;

	if (thread_count == 1) {
		struct simple_work_queue *simple_queue;
		void *context = ((thread_privates != NULL) ? thread_privates[0] : NULL);

		result = make_simple_work_queue(thread_name_prefix, name, owner, context,
						type, &simple_queue);
		if (result == VDO_SUCCESS)
			*queue_ptr = &simple_queue->common;
		return result;
	}

	result = vdo_allocate(1, struct round_robin_work_queue, "round-robin work queue",
			      &queue);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(thread_count, struct simple_work_queue *,
			      "subordinate work queues", &queue->service_queues);
	if (result != VDO_SUCCESS) {
		vdo_free(queue);
		return result;
	}

	queue->num_service_queues = thread_count;
	queue->common.round_robin_mode = true;
	queue->common.owner = owner;

	result = vdo_duplicate_string(name, "queue name", &queue->common.name);
	if (result != VDO_SUCCESS) {
		vdo_free(queue->service_queues);
		vdo_free(queue);
		return -ENOMEM;
	}

	*queue_ptr = &queue->common;

	for (i = 0; i < thread_count; i++) {
		void *context = ((thread_privates != NULL) ? thread_privates[i] : NULL);

		snprintf(thread_name, sizeof(thread_name), "%s%u", name, i);
		result = make_simple_work_queue(thread_name_prefix, thread_name, owner,
						context, type, &queue->service_queues[i]);
		if (result != VDO_SUCCESS) {
			queue->num_service_queues = i;
			/* Destroy previously created subordinates. */
			vdo_free_work_queue(vdo_forget(*queue_ptr));
			return result;
		}
	}

	return VDO_SUCCESS;
}

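/* Stop the worker thread, if it was ever started. */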
static void finish_simple_work_queue(struct simple_work_queue *queue)
{
	if (queue->thread == NULL)
		return;

	/* Tells the worker thread to shut down and waits for it to exit. */
	kthread_stop(queue->thread);
	queue->thread = NULL;
}

static void finish_round_robin_work_queue(struct round_robin_work_queue *queue)
{
	struct simple_work_queue **queue_table = queue->service_queues;
	unsigned int count = queue->num_service_queues;
	unsigned int i;

	for (i = 0; i < count; i++)
		finish_simple_work_queue(queue_table[i]);
}

/* No enqueueing of completions should be done once this function is called. */
void vdo_finish_work_queue(struct vdo_work_queue *queue)
{
	if (queue == NULL)
		return;

	if (queue->round_robin_mode)
		finish_round_robin_work_queue(as_round_robin_work_queue(queue));
	else
		finish_simple_work_queue(as_simple_work_queue(queue));
}

/* Debugging dumps */

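/* Log a one-line summary of a single worker thread's state. */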
static void dump_simple_work_queue(struct simple_work_queue *queue)
{
	const char *thread_status = "no threads";
	char task_state_report = '-';

	if (queue->thread != NULL) {
		task_state_report = task_state_to_char(queue->thread);
		thread_status = atomic_read(&queue->idle) ? "idle" : "running";
	}

	vdo_log_info("workQ %px (%s) %s (%c)", &queue->common, queue->common.name,
		     thread_status, task_state_report);

	/* ->waiting_worker_threads wait queue status? anyone waiting? */
}

/* Dump the state of each of a queue's worker threads to the kernel log. */
void vdo_dump_work_queue(struct vdo_work_queue *queue)
{
	if (queue->round_robin_mode) {
		struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
		unsigned int i;

		for (i = 0; i < round_robin->num_service_queues; i++)
			dump_simple_work_queue(round_robin->service_queues[i]);
	} else {
		dump_simple_work_queue(as_simple_work_queue(queue));
	}
}

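/*
 * Copy a short printable name for the given function pointer into the buffer, truncating at the
 * first space; a NULL pointer is rendered as "-".
 */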
static void get_function_name(void *pointer, char *buffer, size_t buffer_length)
{
	if (pointer == NULL) {
		/*
		 * Format "%ps" logs a null pointer as "(null)" with a bunch of leading spaces. We
		 * sometimes use this when logging lots of data; don't be so verbose.
		 */
		strscpy(buffer, "-", buffer_length);
	} else {
		/*
		 * Use a pragma to defeat gcc's format checking, which doesn't understand that
		 * "%ps" actually does support a precision spec in Linux kernel code.
		 */
		char *space;

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wformat"
		snprintf(buffer, buffer_length, "%.*ps", buffer_length - 1, pointer);
#pragma GCC diagnostic pop

		space = strchr(buffer, ' ');
		if (space != NULL)
			*space = '\0';
	}
}

/*
 * Write to the buffer some info about the completion, for logging. Since the common use case is
 * dumping info about a lot of completions to syslog all at once, the format favors brevity over
 * readability.
 */
void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer,
				   size_t length)
{
	size_t current_length =
		scnprintf(buffer, length, "%.*s/", TASK_COMM_LEN,
			  (completion->my_queue == NULL ? "-" : completion->my_queue->name));

	if (current_length < length - 1) {
		get_function_name((void *) completion->callback, buffer + current_length,
				  length - current_length);
	}
}

/* Completion submission */
/*
 * If the completion has a timeout that has already passed, the timeout handler function may be
 * invoked by this function.
 */
void vdo_enqueue_work_queue(struct vdo_work_queue *queue,
			    struct vdo_completion *completion)
{
	/*
	 * Convert the provided generic vdo_work_queue to the simple_work_queue to actually queue
	 * on.
	 */
	struct simple_work_queue *simple_queue = NULL;

	if (!queue->round_robin_mode) {
		simple_queue = as_simple_work_queue(queue);
	} else {
		struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);

		/*
		 * It shouldn't be a big deal if the same rotor gets used for multiple work queues.
		 * Any patterns that might develop are likely to be disrupted by random ordering of
		 * multiple completions and migration between cores, unless the load is so light as
		 * to be regular in ordering of tasks and the threads are confined to individual
		 * cores; with a load that light we won't care.
		 */
		unsigned int rotor = this_cpu_inc_return(service_queue_rotor);
		unsigned int index = rotor % round_robin->num_service_queues;

		simple_queue = round_robin->service_queues[index];
	}

	enqueue_work_queue_completion(simple_queue, completion);
}

/* Misc */

/*
 * Return the simple work queue being serviced by the current thread, or NULL if the current
 * thread is not a work queue worker thread.
 */
static struct simple_work_queue *get_current_thread_work_queue(void)
{
	/*
	 * In interrupt context, if a vdo thread is what got interrupted, the calls below will find
	 * the queue for the thread which was interrupted. However, the interrupted thread may have
	 * been processing a completion, in which case starting to process another would violate
	 * our concurrency assumptions.
	 */
	if (in_interrupt())
		return NULL;

	if (kthread_func(current) != work_queue_runner)
		/* Not a VDO work queue thread. */
		return NULL;

	return kthread_data(current);
}

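/* Return the work queue being served by the current thread, if any. */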
struct vdo_work_queue *vdo_get_current_work_queue(void)
{
	struct simple_work_queue *queue = get_current_thread_work_queue();

	return (queue == NULL) ? NULL : &queue->common;
}

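/* Return the vdo_thread which owns this queue. */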
struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue)
{
	return queue->owner;
}

/**
 * vdo_get_work_queue_private_data() - Returns the private data for the current thread's work
 *                                     queue, or NULL if none or if the current thread is not a
 *                                     work queue thread.
 */
void *vdo_get_work_queue_private_data(void)
{
	struct simple_work_queue *queue = get_current_thread_work_queue();

	return (queue != NULL) ? queue->private : NULL;
}

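/* Check whether a queue was created with the given work queue type. */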
bool vdo_work_queue_type_is(struct vdo_work_queue *queue,
			    const struct vdo_work_queue_type *type)
{
	return (queue->type == type);
}