1*61145dc2SMartin Matuska // SPDX-License-Identifier: CDDL-1.0
2eda14cbcSMatt Macy /*
3eda14cbcSMatt Macy * CDDL HEADER START
4eda14cbcSMatt Macy *
5eda14cbcSMatt Macy * This file and its contents are supplied under the terms of the
6eda14cbcSMatt Macy * Common Development and Distribution License ("CDDL"), version 1.0.
7eda14cbcSMatt Macy * You may only use this file in accordance with the terms of version
8eda14cbcSMatt Macy * 1.0 of the CDDL.
9eda14cbcSMatt Macy *
10eda14cbcSMatt Macy * A full copy of the text of the CDDL should have accompanied this
11eda14cbcSMatt Macy * source. A copy of the CDDL is also available via the Internet at
12eda14cbcSMatt Macy * http://www.illumos.org/license/CDDL.
13eda14cbcSMatt Macy *
14eda14cbcSMatt Macy * CDDL HEADER END
15eda14cbcSMatt Macy */
16eda14cbcSMatt Macy /*
17eda14cbcSMatt Macy * Copyright (c) 2014, 2018 by Delphix. All rights reserved.
18eda14cbcSMatt Macy */
19eda14cbcSMatt Macy
20eda14cbcSMatt Macy #include <sys/bqueue.h>
21eda14cbcSMatt Macy #include <sys/zfs_context.h>
22eda14cbcSMatt Macy
23eda14cbcSMatt Macy static inline bqueue_node_t *
obj2node(bqueue_t * q,void * data)24eda14cbcSMatt Macy obj2node(bqueue_t *q, void *data)
25eda14cbcSMatt Macy {
26eda14cbcSMatt Macy return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
27eda14cbcSMatt Macy }
28eda14cbcSMatt Macy
29eda14cbcSMatt Macy /*
 * Initialize a blocking queue. The maximum capacity of the queue is set to
3115f0b8c3SMartin Matuska * size. Types that are stored in a bqueue must contain a bqueue_node_t, and
3215f0b8c3SMartin Matuska * node_offset must be its offset from the start of the struct. fill_fraction
3315f0b8c3SMartin Matuska * is a performance tuning value; when the queue is full, any threads
3415f0b8c3SMartin Matuska * attempting to enqueue records will block. They will block until they're
3515f0b8c3SMartin Matuska * signaled, which will occur when the queue is at least 1/fill_fraction
36eda14cbcSMatt Macy * empty. Similar behavior occurs on dequeue; if the queue is empty, threads
 * block. They will be signaled when the queue is at least 1/fill_fraction full.
3815f0b8c3SMartin Matuska * As a result, you must call bqueue_enqueue_flush() when you enqueue your
3915f0b8c3SMartin Matuska * final record on a thread, in case the dequeuing threads are currently
4015f0b8c3SMartin Matuska * blocked and that enqueue does not cause them to be woken. Alternatively,
4115f0b8c3SMartin Matuska * this behavior can be disabled (causing signaling to happen immediately) by
4215f0b8c3SMartin Matuska * setting fill_fraction to any value larger than size. Return 0 on success,
4315f0b8c3SMartin Matuska * or -1 on failure.
4415f0b8c3SMartin Matuska *
4515f0b8c3SMartin Matuska * Note: The caller must ensure that for a given bqueue_t, there's only a
4615f0b8c3SMartin Matuska * single call to bqueue_enqueue() running at a time (e.g. by calling only
4715f0b8c3SMartin Matuska * from a single thread, or with locking around the call). Similarly, the
4815f0b8c3SMartin Matuska * caller must ensure that there's only a single call to bqueue_dequeue()
4915f0b8c3SMartin Matuska * running at a time. However, the one call to bqueue_enqueue() may be
5015f0b8c3SMartin Matuska * invoked concurrently with the one call to bqueue_dequeue().
51eda14cbcSMatt Macy */
52eda14cbcSMatt Macy int
bqueue_init(bqueue_t * q,uint_t fill_fraction,size_t size,size_t node_offset)53c7046f76SMartin Matuska bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
54eda14cbcSMatt Macy {
55eda14cbcSMatt Macy if (fill_fraction == 0) {
56eda14cbcSMatt Macy return (-1);
57eda14cbcSMatt Macy }
58eda14cbcSMatt Macy list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
59eda14cbcSMatt Macy node_offset + offsetof(bqueue_node_t, bqn_node));
6015f0b8c3SMartin Matuska list_create(&q->bq_dequeuing_list, node_offset + sizeof (bqueue_node_t),
6115f0b8c3SMartin Matuska node_offset + offsetof(bqueue_node_t, bqn_node));
6215f0b8c3SMartin Matuska list_create(&q->bq_enqueuing_list, node_offset + sizeof (bqueue_node_t),
6315f0b8c3SMartin Matuska node_offset + offsetof(bqueue_node_t, bqn_node));
64eda14cbcSMatt Macy cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
65eda14cbcSMatt Macy cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
66eda14cbcSMatt Macy mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
67eda14cbcSMatt Macy q->bq_node_offset = node_offset;
68eda14cbcSMatt Macy q->bq_size = 0;
6915f0b8c3SMartin Matuska q->bq_dequeuing_size = 0;
7015f0b8c3SMartin Matuska q->bq_enqueuing_size = 0;
71eda14cbcSMatt Macy q->bq_maxsize = size;
72eda14cbcSMatt Macy q->bq_fill_fraction = fill_fraction;
73eda14cbcSMatt Macy return (0);
74eda14cbcSMatt Macy }
75eda14cbcSMatt Macy
76eda14cbcSMatt Macy /*
77eda14cbcSMatt Macy * Destroy a blocking queue. This function asserts that there are no
78eda14cbcSMatt Macy * elements in the queue, and no one is blocked on the condition
79eda14cbcSMatt Macy * variables.
80eda14cbcSMatt Macy */
81eda14cbcSMatt Macy void
bqueue_destroy(bqueue_t * q)82eda14cbcSMatt Macy bqueue_destroy(bqueue_t *q)
83eda14cbcSMatt Macy {
84eda14cbcSMatt Macy mutex_enter(&q->bq_lock);
85eda14cbcSMatt Macy ASSERT0(q->bq_size);
8615f0b8c3SMartin Matuska ASSERT0(q->bq_dequeuing_size);
8715f0b8c3SMartin Matuska ASSERT0(q->bq_enqueuing_size);
88eda14cbcSMatt Macy cv_destroy(&q->bq_add_cv);
89eda14cbcSMatt Macy cv_destroy(&q->bq_pop_cv);
90eda14cbcSMatt Macy list_destroy(&q->bq_list);
9115f0b8c3SMartin Matuska list_destroy(&q->bq_dequeuing_list);
9215f0b8c3SMartin Matuska list_destroy(&q->bq_enqueuing_list);
93eda14cbcSMatt Macy mutex_exit(&q->bq_lock);
94eda14cbcSMatt Macy mutex_destroy(&q->bq_lock);
95eda14cbcSMatt Macy }
96eda14cbcSMatt Macy
97eda14cbcSMatt Macy static void
bqueue_enqueue_impl(bqueue_t * q,void * data,size_t item_size,boolean_t flush)98c7046f76SMartin Matuska bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush)
99eda14cbcSMatt Macy {
100eda14cbcSMatt Macy ASSERT3U(item_size, >, 0);
101eda14cbcSMatt Macy ASSERT3U(item_size, <=, q->bq_maxsize);
10215f0b8c3SMartin Matuska
103eda14cbcSMatt Macy obj2node(q, data)->bqn_size = item_size;
10415f0b8c3SMartin Matuska q->bq_enqueuing_size += item_size;
10515f0b8c3SMartin Matuska list_insert_tail(&q->bq_enqueuing_list, data);
10615f0b8c3SMartin Matuska
10715f0b8c3SMartin Matuska if (flush ||
10815f0b8c3SMartin Matuska q->bq_enqueuing_size >= q->bq_maxsize / q->bq_fill_fraction) {
10915f0b8c3SMartin Matuska /* Append the enquing list to the shared list. */
11015f0b8c3SMartin Matuska mutex_enter(&q->bq_lock);
11115f0b8c3SMartin Matuska while (q->bq_size > q->bq_maxsize) {
112eda14cbcSMatt Macy cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
113eda14cbcSMatt Macy }
11415f0b8c3SMartin Matuska q->bq_size += q->bq_enqueuing_size;
11515f0b8c3SMartin Matuska list_move_tail(&q->bq_list, &q->bq_enqueuing_list);
11615f0b8c3SMartin Matuska q->bq_enqueuing_size = 0;
117eda14cbcSMatt Macy cv_broadcast(&q->bq_pop_cv);
118eda14cbcSMatt Macy mutex_exit(&q->bq_lock);
119eda14cbcSMatt Macy }
12015f0b8c3SMartin Matuska }
121eda14cbcSMatt Macy
122eda14cbcSMatt Macy /*
 * Add data to q, consuming item_size units of capacity. If there is
 * insufficient capacity to consume item_size units, block until capacity
 * exists. Asserts item_size is > 0.
126eda14cbcSMatt Macy */
127eda14cbcSMatt Macy void
bqueue_enqueue(bqueue_t * q,void * data,size_t item_size)128c7046f76SMartin Matuska bqueue_enqueue(bqueue_t *q, void *data, size_t item_size)
129eda14cbcSMatt Macy {
130eda14cbcSMatt Macy bqueue_enqueue_impl(q, data, item_size, B_FALSE);
131eda14cbcSMatt Macy }
132eda14cbcSMatt Macy
133eda14cbcSMatt Macy /*
134eda14cbcSMatt Macy * Enqueue an entry, and then flush the queue. This forces the popping threads
135eda14cbcSMatt Macy * to wake up, even if we're below the fill fraction. We have this in a single
136eda14cbcSMatt Macy * function, rather than having a separate call, because it prevents race
 * conditions between the enqueuing thread and the dequeuing thread, where the
 * enqueuing thread wakes up the dequeuing thread, and that thread destroys
 * the condvar before the enqueuing thread is done.
140eda14cbcSMatt Macy */
141eda14cbcSMatt Macy void
bqueue_enqueue_flush(bqueue_t * q,void * data,size_t item_size)142c7046f76SMartin Matuska bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size)
143eda14cbcSMatt Macy {
144eda14cbcSMatt Macy bqueue_enqueue_impl(q, data, item_size, B_TRUE);
145eda14cbcSMatt Macy }
146eda14cbcSMatt Macy
147eda14cbcSMatt Macy /*
148eda14cbcSMatt Macy * Take the first element off of q. If there are no elements on the queue, wait
149eda14cbcSMatt Macy * until one is put there. Return the removed element.
150eda14cbcSMatt Macy */
151eda14cbcSMatt Macy void *
bqueue_dequeue(bqueue_t * q)152eda14cbcSMatt Macy bqueue_dequeue(bqueue_t *q)
153eda14cbcSMatt Macy {
15415f0b8c3SMartin Matuska void *ret = list_remove_head(&q->bq_dequeuing_list);
15515f0b8c3SMartin Matuska if (ret == NULL) {
15615f0b8c3SMartin Matuska /*
15715f0b8c3SMartin Matuska * Dequeuing list is empty. Wait for there to be something on
15815f0b8c3SMartin Matuska * the shared list, then move the entire shared list to the
15915f0b8c3SMartin Matuska * dequeuing list.
16015f0b8c3SMartin Matuska */
161eda14cbcSMatt Macy mutex_enter(&q->bq_lock);
162eda14cbcSMatt Macy while (q->bq_size == 0) {
163eda14cbcSMatt Macy cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
164eda14cbcSMatt Macy }
16515f0b8c3SMartin Matuska ASSERT0(q->bq_dequeuing_size);
16615f0b8c3SMartin Matuska ASSERT(list_is_empty(&q->bq_dequeuing_list));
16715f0b8c3SMartin Matuska list_move_tail(&q->bq_dequeuing_list, &q->bq_list);
16815f0b8c3SMartin Matuska q->bq_dequeuing_size = q->bq_size;
16915f0b8c3SMartin Matuska q->bq_size = 0;
17015f0b8c3SMartin Matuska cv_broadcast(&q->bq_add_cv);
171eda14cbcSMatt Macy mutex_exit(&q->bq_lock);
17215f0b8c3SMartin Matuska ret = list_remove_head(&q->bq_dequeuing_list);
173eda14cbcSMatt Macy }
17415f0b8c3SMartin Matuska q->bq_dequeuing_size -= obj2node(q, ret)->bqn_size;
17515f0b8c3SMartin Matuska return (ret);
176eda14cbcSMatt Macy }
177