1 // SPDX-License-Identifier: CDDL-1.0 2 /* 3 * CDDL HEADER START 4 * 5 * This file and its contents are supplied under the terms of the 6 * Common Development and Distribution License ("CDDL"), version 1.0. 7 * You may only use this file in accordance with the terms of version 8 * 1.0 of the CDDL. 9 * 10 * A full copy of the text of the CDDL should have accompanied this 11 * source. A copy of the CDDL is also available via the Internet at 12 * http://www.illumos.org/license/CDDL. 13 * 14 * CDDL HEADER END 15 */ 16 /* 17 * Copyright (c) 2014, 2018 by Delphix. All rights reserved. 18 */ 19 20 #include <sys/bqueue.h> 21 #include <sys/zfs_context.h> 22 23 static inline bqueue_node_t * 24 obj2node(bqueue_t *q, void *data) 25 { 26 return ((bqueue_node_t *)((char *)data + q->bq_node_offset)); 27 } 28 29 /* 30 * Initialize a blocking queue The maximum capacity of the queue is set to 31 * size. Types that are stored in a bqueue must contain a bqueue_node_t, and 32 * node_offset must be its offset from the start of the struct. fill_fraction 33 * is a performance tuning value; when the queue is full, any threads 34 * attempting to enqueue records will block. They will block until they're 35 * signaled, which will occur when the queue is at least 1/fill_fraction 36 * empty. Similar behavior occurs on dequeue; if the queue is empty, threads 37 * block. They will be signalled when the queue has 1/fill_fraction full. 38 * As a result, you must call bqueue_enqueue_flush() when you enqueue your 39 * final record on a thread, in case the dequeuing threads are currently 40 * blocked and that enqueue does not cause them to be woken. Alternatively, 41 * this behavior can be disabled (causing signaling to happen immediately) by 42 * setting fill_fraction to any value larger than size. Return 0 on success, 43 * or -1 on failure. 44 * 45 * Note: The caller must ensure that for a given bqueue_t, there's only a 46 * single call to bqueue_enqueue() running at a time (e.g. by calling only 47 * from a single thread, or with locking around the call). Similarly, the 48 * caller must ensure that there's only a single call to bqueue_dequeue() 49 * running at a time. However, the one call to bqueue_enqueue() may be 50 * invoked concurrently with the one call to bqueue_dequeue(). 51 */ 52 int 53 bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset) 54 { 55 if (fill_fraction == 0) { 56 return (-1); 57 } 58 list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t), 59 node_offset + offsetof(bqueue_node_t, bqn_node)); 60 list_create(&q->bq_dequeuing_list, node_offset + sizeof (bqueue_node_t), 61 node_offset + offsetof(bqueue_node_t, bqn_node)); 62 list_create(&q->bq_enqueuing_list, node_offset + sizeof (bqueue_node_t), 63 node_offset + offsetof(bqueue_node_t, bqn_node)); 64 cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL); 65 cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL); 66 mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL); 67 q->bq_node_offset = node_offset; 68 q->bq_size = 0; 69 q->bq_dequeuing_size = 0; 70 q->bq_enqueuing_size = 0; 71 q->bq_maxsize = size; 72 q->bq_fill_fraction = fill_fraction; 73 return (0); 74 } 75 76 /* 77 * Destroy a blocking queue. This function asserts that there are no 78 * elements in the queue, and no one is blocked on the condition 79 * variables. 80 */ 81 void 82 bqueue_destroy(bqueue_t *q) 83 { 84 mutex_enter(&q->bq_lock); 85 ASSERT0(q->bq_size); 86 ASSERT0(q->bq_dequeuing_size); 87 ASSERT0(q->bq_enqueuing_size); 88 cv_destroy(&q->bq_add_cv); 89 cv_destroy(&q->bq_pop_cv); 90 list_destroy(&q->bq_list); 91 list_destroy(&q->bq_dequeuing_list); 92 list_destroy(&q->bq_enqueuing_list); 93 mutex_exit(&q->bq_lock); 94 mutex_destroy(&q->bq_lock); 95 } 96 97 static void 98 bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush) 99 { 100 ASSERT3U(item_size, >, 0); 101 ASSERT3U(item_size, <=, q->bq_maxsize); 102 103 obj2node(q, data)->bqn_size = item_size; 104 q->bq_enqueuing_size += item_size; 105 list_insert_tail(&q->bq_enqueuing_list, data); 106 107 if (flush || 108 q->bq_enqueuing_size >= q->bq_maxsize / q->bq_fill_fraction) { 109 /* Append the enquing list to the shared list. */ 110 mutex_enter(&q->bq_lock); 111 while (q->bq_size > q->bq_maxsize) { 112 cv_wait_sig(&q->bq_add_cv, &q->bq_lock); 113 } 114 q->bq_size += q->bq_enqueuing_size; 115 list_move_tail(&q->bq_list, &q->bq_enqueuing_list); 116 q->bq_enqueuing_size = 0; 117 cv_broadcast(&q->bq_pop_cv); 118 mutex_exit(&q->bq_lock); 119 } 120 } 121 122 /* 123 * Add data to q, consuming size units of capacity. If there is insufficient 124 * capacity to consume size units, block until capacity exists. Asserts size is 125 * > 0. 126 */ 127 void 128 bqueue_enqueue(bqueue_t *q, void *data, size_t item_size) 129 { 130 bqueue_enqueue_impl(q, data, item_size, B_FALSE); 131 } 132 133 /* 134 * Enqueue an entry, and then flush the queue. This forces the popping threads 135 * to wake up, even if we're below the fill fraction. We have this in a single 136 * function, rather than having a separate call, because it prevents race 137 * conditions between the enqueuing thread and the dequeuing thread, where the 138 * enqueueing thread will wake up the dequeuing thread, that thread will 139 * destroy the condvar before the enqueuing thread is done. 140 */ 141 void 142 bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size) 143 { 144 bqueue_enqueue_impl(q, data, item_size, B_TRUE); 145 } 146 147 /* 148 * Take the first element off of q. If there are no elements on the queue, wait 149 * until one is put there. Return the removed element. 150 */ 151 void * 152 bqueue_dequeue(bqueue_t *q) 153 { 154 void *ret = list_remove_head(&q->bq_dequeuing_list); 155 if (ret == NULL) { 156 /* 157 * Dequeuing list is empty. Wait for there to be something on 158 * the shared list, then move the entire shared list to the 159 * dequeuing list. 160 */ 161 mutex_enter(&q->bq_lock); 162 while (q->bq_size == 0) { 163 cv_wait_sig(&q->bq_pop_cv, &q->bq_lock); 164 } 165 ASSERT0(q->bq_dequeuing_size); 166 ASSERT(list_is_empty(&q->bq_dequeuing_list)); 167 list_move_tail(&q->bq_dequeuing_list, &q->bq_list); 168 q->bq_dequeuing_size = q->bq_size; 169 q->bq_size = 0; 170 cv_broadcast(&q->bq_add_cv); 171 mutex_exit(&q->bq_lock); 172 ret = list_remove_head(&q->bq_dequeuing_list); 173 } 174 q->bq_dequeuing_size -= obj2node(q, ret)->bqn_size; 175 return (ret); 176 } 177