1 /* 2 * CDDL HEADER START 3 * 4 * This file and its contents are supplied under the terms of the 5 * Common Development and Distribution License ("CDDL"), version 1.0. 6 * You may only use this file in accordance with the terms of version 7 * 1.0 of the CDDL. 8 * 9 * A full copy of the text of the CDDL should have accompanied this 10 * source. A copy of the CDDL is also available via the Internet at 11 * http://www.illumos.org/license/CDDL. 12 * 13 * CDDL HEADER END 14 */ 15 /* 16 * Copyright (c) 2014, 2018 by Delphix. All rights reserved. 17 */ 18 19 #include <sys/bqueue.h> 20 #include <sys/zfs_context.h> 21 22 static inline bqueue_node_t * 23 obj2node(bqueue_t *q, void *data) 24 { 25 return ((bqueue_node_t *)((char *)data + q->bq_node_offset)); 26 } 27 28 /* 29 * Initialize a blocking queue The maximum capacity of the queue is set to 30 * size. Types that are stored in a bqueue must contain a bqueue_node_t, 31 * and node_offset must be its offset from the start of the struct. 32 * fill_fraction is a performance tuning value; when the queue is full, any 33 * threads attempting to enqueue records will block. They will block until 34 * they're signaled, which will occur when the queue is at least 1/fill_fraction 35 * empty. Similar behavior occurs on dequeue; if the queue is empty, threads 36 * block. They will be signalled when the queue has 1/fill_fraction full, or 37 * when bqueue_flush is called. As a result, you must call bqueue_flush when 38 * you enqueue your final record on a thread, in case the dequeueing threads are 39 * currently blocked and that enqueue does not cause them to be awoken. 40 * Alternatively, this behavior can be disabled (causing signaling to happen 41 * immediately) by setting fill_fraction to any value larger than size. 42 * Return 0 on success, or -1 on failure. 43 */ 44 int 45 bqueue_init(bqueue_t *q, uint64_t fill_fraction, uint64_t size, 46 size_t node_offset) 47 { 48 if (fill_fraction == 0) { 49 return (-1); 50 } 51 list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t), 52 node_offset + offsetof(bqueue_node_t, bqn_node)); 53 cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL); 54 cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL); 55 mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL); 56 q->bq_node_offset = node_offset; 57 q->bq_size = 0; 58 q->bq_maxsize = size; 59 q->bq_fill_fraction = fill_fraction; 60 return (0); 61 } 62 63 /* 64 * Destroy a blocking queue. This function asserts that there are no 65 * elements in the queue, and no one is blocked on the condition 66 * variables. 67 */ 68 void 69 bqueue_destroy(bqueue_t *q) 70 { 71 mutex_enter(&q->bq_lock); 72 ASSERT0(q->bq_size); 73 cv_destroy(&q->bq_add_cv); 74 cv_destroy(&q->bq_pop_cv); 75 list_destroy(&q->bq_list); 76 mutex_exit(&q->bq_lock); 77 mutex_destroy(&q->bq_lock); 78 } 79 80 static void 81 bqueue_enqueue_impl(bqueue_t *q, void *data, uint64_t item_size, 82 boolean_t flush) 83 { 84 ASSERT3U(item_size, >, 0); 85 ASSERT3U(item_size, <=, q->bq_maxsize); 86 mutex_enter(&q->bq_lock); 87 obj2node(q, data)->bqn_size = item_size; 88 while (q->bq_size + item_size > q->bq_maxsize) { 89 cv_wait_sig(&q->bq_add_cv, &q->bq_lock); 90 } 91 q->bq_size += item_size; 92 list_insert_tail(&q->bq_list, data); 93 if (q->bq_size >= q->bq_maxsize / q->bq_fill_fraction) 94 cv_signal(&q->bq_pop_cv); 95 if (flush) 96 cv_broadcast(&q->bq_pop_cv); 97 mutex_exit(&q->bq_lock); 98 } 99 100 /* 101 * Add data to q, consuming size units of capacity. If there is insufficient 102 * capacity to consume size units, block until capacity exists. Asserts size is 103 * > 0. 104 */ 105 void 106 bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size) 107 { 108 bqueue_enqueue_impl(q, data, item_size, B_FALSE); 109 } 110 111 /* 112 * Enqueue an entry, and then flush the queue. This forces the popping threads 113 * to wake up, even if we're below the fill fraction. We have this in a single 114 * function, rather than having a separate call, because it prevents race 115 * conditions between the enqueuing thread and the dequeueing thread, where the 116 * enqueueing thread will wake up the dequeueing thread, that thread will 117 * destroy the condvar before the enqueuing thread is done. 118 */ 119 void 120 bqueue_enqueue_flush(bqueue_t *q, void *data, uint64_t item_size) 121 { 122 bqueue_enqueue_impl(q, data, item_size, B_TRUE); 123 } 124 125 /* 126 * Take the first element off of q. If there are no elements on the queue, wait 127 * until one is put there. Return the removed element. 128 */ 129 void * 130 bqueue_dequeue(bqueue_t *q) 131 { 132 void *ret = NULL; 133 uint64_t item_size; 134 mutex_enter(&q->bq_lock); 135 while (q->bq_size == 0) { 136 cv_wait_sig(&q->bq_pop_cv, &q->bq_lock); 137 } 138 ret = list_remove_head(&q->bq_list); 139 ASSERT3P(ret, !=, NULL); 140 item_size = obj2node(q, ret)->bqn_size; 141 q->bq_size -= item_size; 142 if (q->bq_size <= q->bq_maxsize - (q->bq_maxsize / q->bq_fill_fraction)) 143 cv_signal(&q->bq_add_cv); 144 mutex_exit(&q->bq_lock); 145 return (ret); 146 } 147 148 /* 149 * Returns true if the space used is 0. 150 */ 151 boolean_t 152 bqueue_empty(bqueue_t *q) 153 { 154 return (q->bq_size == 0); 155 } 156