/*
 * CDDL HEADER START
 *
 * This file and its contents are supplied under the terms of the
 * Common Development and Distribution License ("CDDL"), version 1.0.
 * You may only use this file in accordance with the terms of version
 * 1.0 of the CDDL.
 *
 * A full copy of the text of the CDDL should have accompanied this
 * source. A copy of the CDDL is also available via the Internet at
 * http://www.illumos.org/license/CDDL.
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2014, 2018 by Delphix. All rights reserved.
 */

#include <sys/bqueue.h>
#include <sys/zfs_context.h>

static inline bqueue_node_t *
obj2node(bqueue_t *q, void *data)
{
	return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
}

/*
 * Initialize a blocking queue. The maximum capacity of the queue is set to
 * size. Types that are stored in a bqueue must contain a bqueue_node_t, and
 * node_offset must be its offset from the start of the struct.
 * fill_fraction is a performance tuning value; when the queue is full, any
 * threads attempting to enqueue records will block. They will block until
 * they're signaled, which will occur when the queue is at least
 * 1/fill_fraction empty. Similar behavior occurs on dequeue; if the queue is
 * empty, threads block. They will be signaled when the queue is at least
 * 1/fill_fraction full, or when bqueue_enqueue_flush() is called. As a
 * result, you must use bqueue_enqueue_flush() to enqueue your final record
 * on a thread, in case the dequeueing threads are currently blocked and that
 * enqueue does not cause them to be awoken. Alternatively, this behavior can
 * be disabled (causing signaling to happen immediately) by setting
 * fill_fraction to any value larger than size.
 * Return 0 on success, or -1 on failure.
 */
int
bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
{
	if (fill_fraction == 0) {
		return (-1);
	}
	list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
	    node_offset + offsetof(bqueue_node_t, bqn_node));
	cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
	mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
	q->bq_node_offset = node_offset;
	q->bq_size = 0;
	q->bq_maxsize = size;
	q->bq_fill_fraction = fill_fraction;
	return (0);
}
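
/*
 * Usage sketch (illustrative only, not part of the original file; the struct
 * and identifiers below are hypothetical). A caller embeds a bqueue_node_t
 * in its record type and passes that member's offset to bqueue_init():
 *
 *	typedef struct my_record {
 *		uint64_t	mr_payload;
 *		boolean_t	mr_eos;
 *		bqueue_node_t	mr_node;
 *	} my_record_t;
 *
 *	bqueue_t q;
 *	VERIFY0(bqueue_init(&q, 4, 1024 * 1024,
 *	    offsetof(my_record_t, mr_node)));
 *
 * With fill_fraction = 4, dequeueing threads are signaled once the queue is
 * at least 1/4 full, and enqueueing threads once it is at least 1/4 empty.
 */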
/*
 * Destroy a blocking queue. This function asserts that there are no
 * elements in the queue, and no one is blocked on the condition
 * variables.
 */
void
bqueue_destroy(bqueue_t *q)
{
	mutex_enter(&q->bq_lock);
	ASSERT0(q->bq_size);
	cv_destroy(&q->bq_add_cv);
	cv_destroy(&q->bq_pop_cv);
	list_destroy(&q->bq_list);
	mutex_exit(&q->bq_lock);
	mutex_destroy(&q->bq_lock);
}

static void
bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush)
{
	ASSERT3U(item_size, >, 0);
	ASSERT3U(item_size, <=, q->bq_maxsize);
	mutex_enter(&q->bq_lock);
	obj2node(q, data)->bqn_size = item_size;
	while (q->bq_size && q->bq_size + item_size > q->bq_maxsize) {
		/*
		 * Wake up a bqueue_dequeue() thread if it is already
		 * sleeping, in order to prevent a deadlock.
		 */
		cv_signal(&q->bq_pop_cv);
		cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
	}
	q->bq_size += item_size;
	list_insert_tail(&q->bq_list, data);
	if (flush)
		cv_broadcast(&q->bq_pop_cv);
	else if (q->bq_size >= q->bq_maxsize / q->bq_fill_fraction)
		cv_signal(&q->bq_pop_cv);
	mutex_exit(&q->bq_lock);
}

/*
 * Add data to q, consuming item_size units of capacity. If there is
 * insufficient capacity to consume item_size units, block until capacity
 * exists. Asserts item_size is > 0.
 */
void
bqueue_enqueue(bqueue_t *q, void *data, size_t item_size)
{
	bqueue_enqueue_impl(q, data, item_size, B_FALSE);
}

/*
 * Enqueue an entry, and then flush the queue. This forces the popping
 * threads to wake up, even if we're below the fill fraction. We have this in
 * a single function, rather than as a separate call, because it prevents a
 * race condition between the enqueuing thread and the dequeueing thread, in
 * which the enqueue wakes up the dequeueing thread and that thread destroys
 * the condvar before the enqueuing thread is done.
 */
void
bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size)
{
	bqueue_enqueue_impl(q, data, item_size, B_TRUE);
}

/*
 * Take the first element off of q. If there are no elements on the queue,
 * wait until one is put there. Return the removed element.
 */
void *
bqueue_dequeue(bqueue_t *q)
{
	void *ret = NULL;
	size_t item_size;
	mutex_enter(&q->bq_lock);
	while (q->bq_size == 0) {
		cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
	}
	ret = list_remove_head(&q->bq_list);
	ASSERT3P(ret, !=, NULL);
	item_size = obj2node(q, ret)->bqn_size;
	q->bq_size -= item_size;
	if (q->bq_size <= q->bq_maxsize -
	    (q->bq_maxsize / q->bq_fill_fraction))
		cv_signal(&q->bq_add_cv);
	mutex_exit(&q->bq_lock);
	return (ret);
}

/*
 * Returns true if the space used is 0.
 */
boolean_t
bqueue_empty(bqueue_t *q)
{
	return (q->bq_size == 0);
}
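
/*
 * Producer/consumer sketch (illustrative only, not part of the original
 * file; my_record_t and its mr_eos end-of-stream flag are hypothetical, as
 * in the sketch above). The producer marks its final record and enqueues it
 * with bqueue_enqueue_flush() so that a consumer blocked below the fill
 * fraction is still woken; the consumer dequeues until it sees that record.
 *
 *	static void
 *	producer(bqueue_t *q, my_record_t *recs, int n)
 *	{
 *		for (int i = 0; i < n - 1; i++)
 *			bqueue_enqueue(q, &recs[i], sizeof (my_record_t));
 *		recs[n - 1].mr_eos = B_TRUE;
 *		bqueue_enqueue_flush(q, &recs[n - 1], sizeof (my_record_t));
 *	}
 *
 *	static void
 *	consumer(bqueue_t *q)
 *	{
 *		my_record_t *rec;
 *		do {
 *			rec = bqueue_dequeue(q);
 *			process(rec);	[hypothetical per-record work]
 *		} while (!rec->mr_eos);
 *	}
 *
 * Only after the consumer has seen the final record is it safe to call
 * bqueue_destroy(), since destroy asserts that the queue is empty.
 */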