1 /* 2 * CDDL HEADER START 3 * 4 * This file and its contents are supplied under the terms of the 5 * Common Development and Distribution License ("CDDL"), version 1.0. 6 * You may only use this file in accordance with the terms of version 7 * 1.0 of the CDDL. 8 * 9 * A full copy of the text of the CDDL should have accompanied this 10 * source. A copy of the CDDL is also available via the Internet at 11 * http://www.illumos.org/license/CDDL. 12 * 13 * CDDL HEADER END 14 */ 15 /* 16 * Copyright (c) 2014, 2018 by Delphix. All rights reserved. 17 */ 18 19 #include <sys/bqueue.h> 20 #include <sys/zfs_context.h> 21 22 static inline bqueue_node_t * 23 obj2node(bqueue_t *q, void *data) 24 { 25 return ((bqueue_node_t *)((char *)data + q->bq_node_offset)); 26 } 27 28 /* 29 * Initialize a blocking queue The maximum capacity of the queue is set to 30 * size. Types that are stored in a bqueue must contain a bqueue_node_t, and 31 * node_offset must be its offset from the start of the struct. fill_fraction 32 * is a performance tuning value; when the queue is full, any threads 33 * attempting to enqueue records will block. They will block until they're 34 * signaled, which will occur when the queue is at least 1/fill_fraction 35 * empty. Similar behavior occurs on dequeue; if the queue is empty, threads 36 * block. They will be signalled when the queue has 1/fill_fraction full. 37 * As a result, you must call bqueue_enqueue_flush() when you enqueue your 38 * final record on a thread, in case the dequeuing threads are currently 39 * blocked and that enqueue does not cause them to be woken. Alternatively, 40 * this behavior can be disabled (causing signaling to happen immediately) by 41 * setting fill_fraction to any value larger than size. Return 0 on success, 42 * or -1 on failure. 43 * 44 * Note: The caller must ensure that for a given bqueue_t, there's only a 45 * single call to bqueue_enqueue() running at a time (e.g. by calling only 46 * from a single thread, or with locking around the call). Similarly, the 47 * caller must ensure that there's only a single call to bqueue_dequeue() 48 * running at a time. However, the one call to bqueue_enqueue() may be 49 * invoked concurrently with the one call to bqueue_dequeue(). 50 */ 51 int 52 bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset) 53 { 54 if (fill_fraction == 0) { 55 return (-1); 56 } 57 list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t), 58 node_offset + offsetof(bqueue_node_t, bqn_node)); 59 list_create(&q->bq_dequeuing_list, node_offset + sizeof (bqueue_node_t), 60 node_offset + offsetof(bqueue_node_t, bqn_node)); 61 list_create(&q->bq_enqueuing_list, node_offset + sizeof (bqueue_node_t), 62 node_offset + offsetof(bqueue_node_t, bqn_node)); 63 cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL); 64 cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL); 65 mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL); 66 q->bq_node_offset = node_offset; 67 q->bq_size = 0; 68 q->bq_dequeuing_size = 0; 69 q->bq_enqueuing_size = 0; 70 q->bq_maxsize = size; 71 q->bq_fill_fraction = fill_fraction; 72 return (0); 73 } 74 75 /* 76 * Destroy a blocking queue. This function asserts that there are no 77 * elements in the queue, and no one is blocked on the condition 78 * variables. 79 */ 80 void 81 bqueue_destroy(bqueue_t *q) 82 { 83 mutex_enter(&q->bq_lock); 84 ASSERT0(q->bq_size); 85 ASSERT0(q->bq_dequeuing_size); 86 ASSERT0(q->bq_enqueuing_size); 87 cv_destroy(&q->bq_add_cv); 88 cv_destroy(&q->bq_pop_cv); 89 list_destroy(&q->bq_list); 90 list_destroy(&q->bq_dequeuing_list); 91 list_destroy(&q->bq_enqueuing_list); 92 mutex_exit(&q->bq_lock); 93 mutex_destroy(&q->bq_lock); 94 } 95 96 static void 97 bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush) 98 { 99 ASSERT3U(item_size, >, 0); 100 ASSERT3U(item_size, <=, q->bq_maxsize); 101 102 obj2node(q, data)->bqn_size = item_size; 103 q->bq_enqueuing_size += item_size; 104 list_insert_tail(&q->bq_enqueuing_list, data); 105 106 if (flush || 107 q->bq_enqueuing_size >= q->bq_maxsize / q->bq_fill_fraction) { 108 /* Append the enquing list to the shared list. */ 109 mutex_enter(&q->bq_lock); 110 while (q->bq_size > q->bq_maxsize) { 111 cv_wait_sig(&q->bq_add_cv, &q->bq_lock); 112 } 113 q->bq_size += q->bq_enqueuing_size; 114 list_move_tail(&q->bq_list, &q->bq_enqueuing_list); 115 q->bq_enqueuing_size = 0; 116 cv_broadcast(&q->bq_pop_cv); 117 mutex_exit(&q->bq_lock); 118 } 119 } 120 121 /* 122 * Add data to q, consuming size units of capacity. If there is insufficient 123 * capacity to consume size units, block until capacity exists. Asserts size is 124 * > 0. 125 */ 126 void 127 bqueue_enqueue(bqueue_t *q, void *data, size_t item_size) 128 { 129 bqueue_enqueue_impl(q, data, item_size, B_FALSE); 130 } 131 132 /* 133 * Enqueue an entry, and then flush the queue. This forces the popping threads 134 * to wake up, even if we're below the fill fraction. We have this in a single 135 * function, rather than having a separate call, because it prevents race 136 * conditions between the enqueuing thread and the dequeuing thread, where the 137 * enqueueing thread will wake up the dequeuing thread, that thread will 138 * destroy the condvar before the enqueuing thread is done. 139 */ 140 void 141 bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size) 142 { 143 bqueue_enqueue_impl(q, data, item_size, B_TRUE); 144 } 145 146 /* 147 * Take the first element off of q. If there are no elements on the queue, wait 148 * until one is put there. Return the removed element. 149 */ 150 void * 151 bqueue_dequeue(bqueue_t *q) 152 { 153 void *ret = list_remove_head(&q->bq_dequeuing_list); 154 if (ret == NULL) { 155 /* 156 * Dequeuing list is empty. Wait for there to be something on 157 * the shared list, then move the entire shared list to the 158 * dequeuing list. 159 */ 160 mutex_enter(&q->bq_lock); 161 while (q->bq_size == 0) { 162 cv_wait_sig(&q->bq_pop_cv, &q->bq_lock); 163 } 164 ASSERT0(q->bq_dequeuing_size); 165 ASSERT(list_is_empty(&q->bq_dequeuing_list)); 166 list_move_tail(&q->bq_dequeuing_list, &q->bq_list); 167 q->bq_dequeuing_size = q->bq_size; 168 q->bq_size = 0; 169 cv_broadcast(&q->bq_add_cv); 170 mutex_exit(&q->bq_lock); 171 ret = list_remove_head(&q->bq_dequeuing_list); 172 } 173 q->bq_dequeuing_size -= obj2node(q, ret)->bqn_size; 174 return (ret); 175 } 176