xref: /freebsd/sys/contrib/openzfs/module/zfs/bqueue.c (revision eda14cbc264d6969b02f2b1994cef11148e914f1)
1*eda14cbcSMatt Macy /*
2*eda14cbcSMatt Macy  * CDDL HEADER START
3*eda14cbcSMatt Macy  *
4*eda14cbcSMatt Macy  * This file and its contents are supplied under the terms of the
5*eda14cbcSMatt Macy  * Common Development and Distribution License ("CDDL"), version 1.0.
6*eda14cbcSMatt Macy  * You may only use this file in accordance with the terms of version
7*eda14cbcSMatt Macy  * 1.0 of the CDDL.
8*eda14cbcSMatt Macy  *
9*eda14cbcSMatt Macy  * A full copy of the text of the CDDL should have accompanied this
10*eda14cbcSMatt Macy  * source.  A copy of the CDDL is also available via the Internet at
11*eda14cbcSMatt Macy  * http://www.illumos.org/license/CDDL.
12*eda14cbcSMatt Macy  *
13*eda14cbcSMatt Macy  * CDDL HEADER END
14*eda14cbcSMatt Macy  */
15*eda14cbcSMatt Macy /*
16*eda14cbcSMatt Macy  * Copyright (c) 2014, 2018 by Delphix. All rights reserved.
17*eda14cbcSMatt Macy  */
18*eda14cbcSMatt Macy 
19*eda14cbcSMatt Macy #include	<sys/bqueue.h>
20*eda14cbcSMatt Macy #include	<sys/zfs_context.h>
21*eda14cbcSMatt Macy 
22*eda14cbcSMatt Macy static inline bqueue_node_t *
23*eda14cbcSMatt Macy obj2node(bqueue_t *q, void *data)
24*eda14cbcSMatt Macy {
25*eda14cbcSMatt Macy 	return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
26*eda14cbcSMatt Macy }
27*eda14cbcSMatt Macy 
28*eda14cbcSMatt Macy /*
29*eda14cbcSMatt Macy  * Initialize a blocking queue  The maximum capacity of the queue is set to
30*eda14cbcSMatt Macy  * size.  Types that are stored in a bqueue must contain a bqueue_node_t,
31*eda14cbcSMatt Macy  * and node_offset must be its offset from the start of the struct.
32*eda14cbcSMatt Macy  * fill_fraction is a performance tuning value; when the queue is full, any
33*eda14cbcSMatt Macy  * threads attempting to enqueue records will block.  They will block until
34*eda14cbcSMatt Macy  * they're signaled, which will occur when the queue is at least 1/fill_fraction
35*eda14cbcSMatt Macy  * empty.  Similar behavior occurs on dequeue; if the queue is empty, threads
36*eda14cbcSMatt Macy  * block.  They will be signalled when the queue has 1/fill_fraction full, or
37*eda14cbcSMatt Macy  * when bqueue_flush is called.  As a result, you must call bqueue_flush when
38*eda14cbcSMatt Macy  * you enqueue your final record on a thread, in case the dequeueing threads are
39*eda14cbcSMatt Macy  * currently blocked and that enqueue does not cause them to be awoken.
40*eda14cbcSMatt Macy  * Alternatively, this behavior can be disabled (causing signaling to happen
41*eda14cbcSMatt Macy  * immediately) by setting fill_fraction to any value larger than size.
42*eda14cbcSMatt Macy  * Return 0 on success, or -1 on failure.
43*eda14cbcSMatt Macy  */
44*eda14cbcSMatt Macy int
45*eda14cbcSMatt Macy bqueue_init(bqueue_t *q, uint64_t fill_fraction, uint64_t size,
46*eda14cbcSMatt Macy     size_t node_offset)
47*eda14cbcSMatt Macy {
48*eda14cbcSMatt Macy 	if (fill_fraction == 0) {
49*eda14cbcSMatt Macy 		return (-1);
50*eda14cbcSMatt Macy 	}
51*eda14cbcSMatt Macy 	list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
52*eda14cbcSMatt Macy 	    node_offset + offsetof(bqueue_node_t, bqn_node));
53*eda14cbcSMatt Macy 	cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
54*eda14cbcSMatt Macy 	cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
55*eda14cbcSMatt Macy 	mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
56*eda14cbcSMatt Macy 	q->bq_node_offset = node_offset;
57*eda14cbcSMatt Macy 	q->bq_size = 0;
58*eda14cbcSMatt Macy 	q->bq_maxsize = size;
59*eda14cbcSMatt Macy 	q->bq_fill_fraction = fill_fraction;
60*eda14cbcSMatt Macy 	return (0);
61*eda14cbcSMatt Macy }
62*eda14cbcSMatt Macy 
63*eda14cbcSMatt Macy /*
64*eda14cbcSMatt Macy  * Destroy a blocking queue.  This function asserts that there are no
65*eda14cbcSMatt Macy  * elements in the queue, and no one is blocked on the condition
66*eda14cbcSMatt Macy  * variables.
67*eda14cbcSMatt Macy  */
68*eda14cbcSMatt Macy void
69*eda14cbcSMatt Macy bqueue_destroy(bqueue_t *q)
70*eda14cbcSMatt Macy {
71*eda14cbcSMatt Macy 	mutex_enter(&q->bq_lock);
72*eda14cbcSMatt Macy 	ASSERT0(q->bq_size);
73*eda14cbcSMatt Macy 	cv_destroy(&q->bq_add_cv);
74*eda14cbcSMatt Macy 	cv_destroy(&q->bq_pop_cv);
75*eda14cbcSMatt Macy 	list_destroy(&q->bq_list);
76*eda14cbcSMatt Macy 	mutex_exit(&q->bq_lock);
77*eda14cbcSMatt Macy 	mutex_destroy(&q->bq_lock);
78*eda14cbcSMatt Macy }
79*eda14cbcSMatt Macy 
80*eda14cbcSMatt Macy static void
81*eda14cbcSMatt Macy bqueue_enqueue_impl(bqueue_t *q, void *data, uint64_t item_size,
82*eda14cbcSMatt Macy     boolean_t flush)
83*eda14cbcSMatt Macy {
84*eda14cbcSMatt Macy 	ASSERT3U(item_size, >, 0);
85*eda14cbcSMatt Macy 	ASSERT3U(item_size, <=, q->bq_maxsize);
86*eda14cbcSMatt Macy 	mutex_enter(&q->bq_lock);
87*eda14cbcSMatt Macy 	obj2node(q, data)->bqn_size = item_size;
88*eda14cbcSMatt Macy 	while (q->bq_size + item_size > q->bq_maxsize) {
89*eda14cbcSMatt Macy 		cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
90*eda14cbcSMatt Macy 	}
91*eda14cbcSMatt Macy 	q->bq_size += item_size;
92*eda14cbcSMatt Macy 	list_insert_tail(&q->bq_list, data);
93*eda14cbcSMatt Macy 	if (q->bq_size >= q->bq_maxsize / q->bq_fill_fraction)
94*eda14cbcSMatt Macy 		cv_signal(&q->bq_pop_cv);
95*eda14cbcSMatt Macy 	if (flush)
96*eda14cbcSMatt Macy 		cv_broadcast(&q->bq_pop_cv);
97*eda14cbcSMatt Macy 	mutex_exit(&q->bq_lock);
98*eda14cbcSMatt Macy }
99*eda14cbcSMatt Macy 
100*eda14cbcSMatt Macy /*
101*eda14cbcSMatt Macy  * Add data to q, consuming size units of capacity.  If there is insufficient
102*eda14cbcSMatt Macy  * capacity to consume size units, block until capacity exists.  Asserts size is
103*eda14cbcSMatt Macy  * > 0.
104*eda14cbcSMatt Macy  */
105*eda14cbcSMatt Macy void
106*eda14cbcSMatt Macy bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size)
107*eda14cbcSMatt Macy {
108*eda14cbcSMatt Macy 	bqueue_enqueue_impl(q, data, item_size, B_FALSE);
109*eda14cbcSMatt Macy }
110*eda14cbcSMatt Macy 
111*eda14cbcSMatt Macy /*
112*eda14cbcSMatt Macy  * Enqueue an entry, and then flush the queue.  This forces the popping threads
113*eda14cbcSMatt Macy  * to wake up, even if we're below the fill fraction.  We have this in a single
114*eda14cbcSMatt Macy  * function, rather than having a separate call, because it prevents race
115*eda14cbcSMatt Macy  * conditions between the enqueuing thread and the dequeueing thread, where the
116*eda14cbcSMatt Macy  * enqueueing thread will wake up the dequeueing thread, that thread will
117*eda14cbcSMatt Macy  * destroy the condvar before the enqueuing thread is done.
118*eda14cbcSMatt Macy  */
119*eda14cbcSMatt Macy void
120*eda14cbcSMatt Macy bqueue_enqueue_flush(bqueue_t *q, void *data, uint64_t item_size)
121*eda14cbcSMatt Macy {
122*eda14cbcSMatt Macy 	bqueue_enqueue_impl(q, data, item_size, B_TRUE);
123*eda14cbcSMatt Macy }
124*eda14cbcSMatt Macy 
125*eda14cbcSMatt Macy /*
126*eda14cbcSMatt Macy  * Take the first element off of q.  If there are no elements on the queue, wait
127*eda14cbcSMatt Macy  * until one is put there.  Return the removed element.
128*eda14cbcSMatt Macy  */
129*eda14cbcSMatt Macy void *
130*eda14cbcSMatt Macy bqueue_dequeue(bqueue_t *q)
131*eda14cbcSMatt Macy {
132*eda14cbcSMatt Macy 	void *ret = NULL;
133*eda14cbcSMatt Macy 	uint64_t item_size;
134*eda14cbcSMatt Macy 	mutex_enter(&q->bq_lock);
135*eda14cbcSMatt Macy 	while (q->bq_size == 0) {
136*eda14cbcSMatt Macy 		cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
137*eda14cbcSMatt Macy 	}
138*eda14cbcSMatt Macy 	ret = list_remove_head(&q->bq_list);
139*eda14cbcSMatt Macy 	ASSERT3P(ret, !=, NULL);
140*eda14cbcSMatt Macy 	item_size = obj2node(q, ret)->bqn_size;
141*eda14cbcSMatt Macy 	q->bq_size -= item_size;
142*eda14cbcSMatt Macy 	if (q->bq_size <= q->bq_maxsize - (q->bq_maxsize / q->bq_fill_fraction))
143*eda14cbcSMatt Macy 		cv_signal(&q->bq_add_cv);
144*eda14cbcSMatt Macy 	mutex_exit(&q->bq_lock);
145*eda14cbcSMatt Macy 	return (ret);
146*eda14cbcSMatt Macy }
147*eda14cbcSMatt Macy 
148*eda14cbcSMatt Macy /*
149*eda14cbcSMatt Macy  * Returns true if the space used is 0.
150*eda14cbcSMatt Macy  */
151*eda14cbcSMatt Macy boolean_t
152*eda14cbcSMatt Macy bqueue_empty(bqueue_t *q)
153*eda14cbcSMatt Macy {
154*eda14cbcSMatt Macy 	return (q->bq_size == 0);
155*eda14cbcSMatt Macy }
156