1 /*
2 * CDDL HEADER START
3 *
4 * This file and its contents are supplied under the terms of the
5 * Common Development and Distribution License ("CDDL"), version 1.0.
6 * You may only use this file in accordance with the terms of version
7 * 1.0 of the CDDL.
8 *
9 * A full copy of the text of the CDDL should have accompanied this
10 * source. A copy of the CDDL is also available via the Internet at
11 * http://www.illumos.org/license/CDDL.
12 *
13 * CDDL HEADER END
14 */
15 /*
16 * Copyright (c) 2014 by Delphix. All rights reserved.
17 */
18
19 #include <sys/bqueue.h>
20 #include <sys/zfs_context.h>
21
22 static inline bqueue_node_t *
obj2node(bqueue_t * q,void * data)23 obj2node(bqueue_t *q, void *data)
24 {
25 return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
26 }
27
28 /*
29 * Initialize a blocking queue The maximum capacity of the queue is set to
30 * size. Types that want to be stored in a bqueue must contain a bqueue_node_t,
31 * and offset should give its offset from the start of the struct. Return 0 on
32 * success, or -1 on failure.
33 */
34 int
bqueue_init(bqueue_t * q,uint64_t size,size_t node_offset)35 bqueue_init(bqueue_t *q, uint64_t size, size_t node_offset)
36 {
37 list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
38 node_offset + offsetof(bqueue_node_t, bqn_node));
39 cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
40 cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
41 mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
42 q->bq_node_offset = node_offset;
43 q->bq_size = 0;
44 q->bq_maxsize = size;
45 return (0);
46 }
47
48 /*
49 * Destroy a blocking queue. This function asserts that there are no
50 * elements in the queue, and no one is blocked on the condition
51 * variables.
52 */
53 void
bqueue_destroy(bqueue_t * q)54 bqueue_destroy(bqueue_t *q)
55 {
56 ASSERT0(q->bq_size);
57 cv_destroy(&q->bq_add_cv);
58 cv_destroy(&q->bq_pop_cv);
59 mutex_destroy(&q->bq_lock);
60 list_destroy(&q->bq_list);
61 }
62
63 /*
64 * Add data to q, consuming size units of capacity. If there is insufficient
65 * capacity to consume size units, block until capacity exists. Asserts size is
66 * > 0.
67 */
68 void
bqueue_enqueue(bqueue_t * q,void * data,uint64_t item_size)69 bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size)
70 {
71 ASSERT3U(item_size, >, 0);
72 ASSERT3U(item_size, <, q->bq_maxsize);
73 mutex_enter(&q->bq_lock);
74 obj2node(q, data)->bqn_size = item_size;
75 while (q->bq_size + item_size > q->bq_maxsize) {
76 cv_wait(&q->bq_add_cv, &q->bq_lock);
77 }
78 q->bq_size += item_size;
79 list_insert_tail(&q->bq_list, data);
80 cv_signal(&q->bq_pop_cv);
81 mutex_exit(&q->bq_lock);
82 }
83 /*
84 * Take the first element off of q. If there are no elements on the queue, wait
85 * until one is put there. Return the removed element.
86 */
87 void *
bqueue_dequeue(bqueue_t * q)88 bqueue_dequeue(bqueue_t *q)
89 {
90 void *ret;
91 uint64_t item_size;
92 mutex_enter(&q->bq_lock);
93 while (q->bq_size == 0) {
94 cv_wait(&q->bq_pop_cv, &q->bq_lock);
95 }
96 ret = list_remove_head(&q->bq_list);
97 item_size = obj2node(q, ret)->bqn_size;
98 q->bq_size -= item_size;
99 mutex_exit(&q->bq_lock);
100 cv_signal(&q->bq_add_cv);
101 return (ret);
102 }
103
104 /*
105 * Returns true if the space used is 0.
106 */
107 boolean_t
bqueue_empty(bqueue_t * q)108 bqueue_empty(bqueue_t *q)
109 {
110 return (q->bq_size == 0);
111 }
112