1 // SPDX-License-Identifier: CDDL-1.0
2 /*
3 * CDDL HEADER START
4 *
5 * This file and its contents are supplied under the terms of the
6 * Common Development and Distribution License ("CDDL"), version 1.0.
7 * You may only use this file in accordance with the terms of version
8 * 1.0 of the CDDL.
9 *
10 * A full copy of the text of the CDDL should have accompanied this
11 * source. A copy of the CDDL is also available via the Internet at
12 * http://www.illumos.org/license/CDDL.
13 *
14 * CDDL HEADER END
15 */
16 /*
17 * Copyright (c) 2014, 2018 by Delphix. All rights reserved.
18 */
19
20 #include <sys/bqueue.h>
21 #include <sys/zfs_context.h>
22
23 static inline bqueue_node_t *
obj2node(bqueue_t * q,void * data)24 obj2node(bqueue_t *q, void *data)
25 {
26 return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
27 }
28
29 /*
30 * Initialize a blocking queue The maximum capacity of the queue is set to
31 * size. Types that are stored in a bqueue must contain a bqueue_node_t, and
32 * node_offset must be its offset from the start of the struct. fill_fraction
33 * is a performance tuning value; when the queue is full, any threads
34 * attempting to enqueue records will block. They will block until they're
35 * signaled, which will occur when the queue is at least 1/fill_fraction
36 * empty. Similar behavior occurs on dequeue; if the queue is empty, threads
37 * block. They will be signalled when the queue has 1/fill_fraction full.
38 * As a result, you must call bqueue_enqueue_flush() when you enqueue your
39 * final record on a thread, in case the dequeuing threads are currently
40 * blocked and that enqueue does not cause them to be woken. Alternatively,
41 * this behavior can be disabled (causing signaling to happen immediately) by
42 * setting fill_fraction to any value larger than size. Return 0 on success,
43 * or -1 on failure.
44 *
45 * Note: The caller must ensure that for a given bqueue_t, there's only a
46 * single call to bqueue_enqueue() running at a time (e.g. by calling only
47 * from a single thread, or with locking around the call). Similarly, the
48 * caller must ensure that there's only a single call to bqueue_dequeue()
49 * running at a time. However, the one call to bqueue_enqueue() may be
50 * invoked concurrently with the one call to bqueue_dequeue().
51 */
52 int
bqueue_init(bqueue_t * q,uint_t fill_fraction,size_t size,size_t node_offset)53 bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
54 {
55 if (fill_fraction == 0) {
56 return (-1);
57 }
58 list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
59 node_offset + offsetof(bqueue_node_t, bqn_node));
60 list_create(&q->bq_dequeuing_list, node_offset + sizeof (bqueue_node_t),
61 node_offset + offsetof(bqueue_node_t, bqn_node));
62 list_create(&q->bq_enqueuing_list, node_offset + sizeof (bqueue_node_t),
63 node_offset + offsetof(bqueue_node_t, bqn_node));
64 cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
65 cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
66 mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
67 q->bq_node_offset = node_offset;
68 q->bq_size = 0;
69 q->bq_dequeuing_size = 0;
70 q->bq_enqueuing_size = 0;
71 q->bq_maxsize = size;
72 q->bq_fill_fraction = fill_fraction;
73 return (0);
74 }
75
76 /*
77 * Destroy a blocking queue. This function asserts that there are no
78 * elements in the queue, and no one is blocked on the condition
79 * variables.
80 */
81 void
bqueue_destroy(bqueue_t * q)82 bqueue_destroy(bqueue_t *q)
83 {
84 mutex_enter(&q->bq_lock);
85 ASSERT0(q->bq_size);
86 ASSERT0(q->bq_dequeuing_size);
87 ASSERT0(q->bq_enqueuing_size);
88 cv_destroy(&q->bq_add_cv);
89 cv_destroy(&q->bq_pop_cv);
90 list_destroy(&q->bq_list);
91 list_destroy(&q->bq_dequeuing_list);
92 list_destroy(&q->bq_enqueuing_list);
93 mutex_exit(&q->bq_lock);
94 mutex_destroy(&q->bq_lock);
95 }
96
/*
 * Stage data on the producer-private enqueuing list; once the staged
 * bytes reach 1/fill_fraction of capacity (or flush is set), take the
 * lock and publish the whole batch to the shared list, first blocking
 * while the shared list is over capacity. The private list lets the
 * producer run lock-free between batch publishes.
 */
static void
bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush)
{
	ASSERT3U(item_size, >, 0);
	ASSERT3U(item_size, <=, q->bq_maxsize);

	/* Record the item's size so dequeue can debit it later. */
	obj2node(q, data)->bqn_size = item_size;
	q->bq_enqueuing_size += item_size;
	list_insert_tail(&q->bq_enqueuing_list, data);

	if (flush ||
	    q->bq_enqueuing_size >= q->bq_maxsize / q->bq_fill_fraction) {
		/* Append the enqueuing list to the shared list. */
		mutex_enter(&q->bq_lock);
		/* Wait for the consumer to drain back under capacity. */
		while (q->bq_size > q->bq_maxsize) {
			cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
		}
		q->bq_size += q->bq_enqueuing_size;
		list_move_tail(&q->bq_list, &q->bq_enqueuing_list);
		q->bq_enqueuing_size = 0;
		/* Wake any consumer blocked waiting for data. */
		cv_broadcast(&q->bq_pop_cv);
		mutex_exit(&q->bq_lock);
	}
}
121
122 /*
123 * Add data to q, consuming size units of capacity. If there is insufficient
124 * capacity to consume size units, block until capacity exists. Asserts size is
125 * > 0.
126 */
127 void
bqueue_enqueue(bqueue_t * q,void * data,size_t item_size)128 bqueue_enqueue(bqueue_t *q, void *data, size_t item_size)
129 {
130 bqueue_enqueue_impl(q, data, item_size, B_FALSE);
131 }
132
133 /*
134 * Enqueue an entry, and then flush the queue. This forces the popping threads
135 * to wake up, even if we're below the fill fraction. We have this in a single
136 * function, rather than having a separate call, because it prevents race
137 * conditions between the enqueuing thread and the dequeuing thread, where the
138 * enqueueing thread will wake up the dequeuing thread, that thread will
139 * destroy the condvar before the enqueuing thread is done.
140 */
141 void
bqueue_enqueue_flush(bqueue_t * q,void * data,size_t item_size)142 bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size)
143 {
144 bqueue_enqueue_impl(q, data, item_size, B_TRUE);
145 }
146
/*
 * Remove and return the first element of q, blocking until one is
 * available. The fast path pops from the consumer-private dequeuing
 * list without taking the lock; only when that list runs dry does the
 * consumer lock the queue and refill it from the shared list in bulk.
 */
void *
bqueue_dequeue(bqueue_t *q)
{
	/* Lock-free fast path: consume from the private list. */
	void *ret = list_remove_head(&q->bq_dequeuing_list);
	if (ret == NULL) {
		/*
		 * Dequeuing list is empty. Wait for there to be something on
		 * the shared list, then move the entire shared list to the
		 * dequeuing list.
		 */
		mutex_enter(&q->bq_lock);
		while (q->bq_size == 0) {
			cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
		}
		ASSERT0(q->bq_dequeuing_size);
		ASSERT(list_is_empty(&q->bq_dequeuing_list));
		list_move_tail(&q->bq_dequeuing_list, &q->bq_list);
		/* Transfer the byte accounting along with the elements. */
		q->bq_dequeuing_size = q->bq_size;
		q->bq_size = 0;
		/* Wake any producer blocked on a full queue. */
		cv_broadcast(&q->bq_add_cv);
		mutex_exit(&q->bq_lock);
		ret = list_remove_head(&q->bq_dequeuing_list);
	}
	/* Debit the size recorded at enqueue time; no lock needed. */
	q->bq_dequeuing_size -= obj2node(q, ret)->bqn_size;
	return (ret);
}
177