1*eda14cbcSMatt Macy /* 2*eda14cbcSMatt Macy * CDDL HEADER START 3*eda14cbcSMatt Macy * 4*eda14cbcSMatt Macy * This file and its contents are supplied under the terms of the 5*eda14cbcSMatt Macy * Common Development and Distribution License ("CDDL"), version 1.0. 6*eda14cbcSMatt Macy * You may only use this file in accordance with the terms of version 7*eda14cbcSMatt Macy * 1.0 of the CDDL. 8*eda14cbcSMatt Macy * 9*eda14cbcSMatt Macy * A full copy of the text of the CDDL should have accompanied this 10*eda14cbcSMatt Macy * source. A copy of the CDDL is also available via the Internet at 11*eda14cbcSMatt Macy * http://www.illumos.org/license/CDDL. 12*eda14cbcSMatt Macy * 13*eda14cbcSMatt Macy * CDDL HEADER END 14*eda14cbcSMatt Macy */ 15*eda14cbcSMatt Macy /* 16*eda14cbcSMatt Macy * Copyright (c) 2014, 2018 by Delphix. All rights reserved. 17*eda14cbcSMatt Macy */ 18*eda14cbcSMatt Macy 19*eda14cbcSMatt Macy #include <sys/bqueue.h> 20*eda14cbcSMatt Macy #include <sys/zfs_context.h> 21*eda14cbcSMatt Macy 22*eda14cbcSMatt Macy static inline bqueue_node_t * 23*eda14cbcSMatt Macy obj2node(bqueue_t *q, void *data) 24*eda14cbcSMatt Macy { 25*eda14cbcSMatt Macy return ((bqueue_node_t *)((char *)data + q->bq_node_offset)); 26*eda14cbcSMatt Macy } 27*eda14cbcSMatt Macy 28*eda14cbcSMatt Macy /* 29*eda14cbcSMatt Macy * Initialize a blocking queue The maximum capacity of the queue is set to 30*eda14cbcSMatt Macy * size. Types that are stored in a bqueue must contain a bqueue_node_t, 31*eda14cbcSMatt Macy * and node_offset must be its offset from the start of the struct. 32*eda14cbcSMatt Macy * fill_fraction is a performance tuning value; when the queue is full, any 33*eda14cbcSMatt Macy * threads attempting to enqueue records will block. They will block until 34*eda14cbcSMatt Macy * they're signaled, which will occur when the queue is at least 1/fill_fraction 35*eda14cbcSMatt Macy * empty. Similar behavior occurs on dequeue; if the queue is empty, threads 36*eda14cbcSMatt Macy * block. They will be signalled when the queue has 1/fill_fraction full, or 37*eda14cbcSMatt Macy * when bqueue_flush is called. As a result, you must call bqueue_flush when 38*eda14cbcSMatt Macy * you enqueue your final record on a thread, in case the dequeueing threads are 39*eda14cbcSMatt Macy * currently blocked and that enqueue does not cause them to be awoken. 40*eda14cbcSMatt Macy * Alternatively, this behavior can be disabled (causing signaling to happen 41*eda14cbcSMatt Macy * immediately) by setting fill_fraction to any value larger than size. 42*eda14cbcSMatt Macy * Return 0 on success, or -1 on failure. 43*eda14cbcSMatt Macy */ 44*eda14cbcSMatt Macy int 45*eda14cbcSMatt Macy bqueue_init(bqueue_t *q, uint64_t fill_fraction, uint64_t size, 46*eda14cbcSMatt Macy size_t node_offset) 47*eda14cbcSMatt Macy { 48*eda14cbcSMatt Macy if (fill_fraction == 0) { 49*eda14cbcSMatt Macy return (-1); 50*eda14cbcSMatt Macy } 51*eda14cbcSMatt Macy list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t), 52*eda14cbcSMatt Macy node_offset + offsetof(bqueue_node_t, bqn_node)); 53*eda14cbcSMatt Macy cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL); 54*eda14cbcSMatt Macy cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL); 55*eda14cbcSMatt Macy mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL); 56*eda14cbcSMatt Macy q->bq_node_offset = node_offset; 57*eda14cbcSMatt Macy q->bq_size = 0; 58*eda14cbcSMatt Macy q->bq_maxsize = size; 59*eda14cbcSMatt Macy q->bq_fill_fraction = fill_fraction; 60*eda14cbcSMatt Macy return (0); 61*eda14cbcSMatt Macy } 62*eda14cbcSMatt Macy 63*eda14cbcSMatt Macy /* 64*eda14cbcSMatt Macy * Destroy a blocking queue. This function asserts that there are no 65*eda14cbcSMatt Macy * elements in the queue, and no one is blocked on the condition 66*eda14cbcSMatt Macy * variables. 67*eda14cbcSMatt Macy */ 68*eda14cbcSMatt Macy void 69*eda14cbcSMatt Macy bqueue_destroy(bqueue_t *q) 70*eda14cbcSMatt Macy { 71*eda14cbcSMatt Macy mutex_enter(&q->bq_lock); 72*eda14cbcSMatt Macy ASSERT0(q->bq_size); 73*eda14cbcSMatt Macy cv_destroy(&q->bq_add_cv); 74*eda14cbcSMatt Macy cv_destroy(&q->bq_pop_cv); 75*eda14cbcSMatt Macy list_destroy(&q->bq_list); 76*eda14cbcSMatt Macy mutex_exit(&q->bq_lock); 77*eda14cbcSMatt Macy mutex_destroy(&q->bq_lock); 78*eda14cbcSMatt Macy } 79*eda14cbcSMatt Macy 80*eda14cbcSMatt Macy static void 81*eda14cbcSMatt Macy bqueue_enqueue_impl(bqueue_t *q, void *data, uint64_t item_size, 82*eda14cbcSMatt Macy boolean_t flush) 83*eda14cbcSMatt Macy { 84*eda14cbcSMatt Macy ASSERT3U(item_size, >, 0); 85*eda14cbcSMatt Macy ASSERT3U(item_size, <=, q->bq_maxsize); 86*eda14cbcSMatt Macy mutex_enter(&q->bq_lock); 87*eda14cbcSMatt Macy obj2node(q, data)->bqn_size = item_size; 88*eda14cbcSMatt Macy while (q->bq_size + item_size > q->bq_maxsize) { 89*eda14cbcSMatt Macy cv_wait_sig(&q->bq_add_cv, &q->bq_lock); 90*eda14cbcSMatt Macy } 91*eda14cbcSMatt Macy q->bq_size += item_size; 92*eda14cbcSMatt Macy list_insert_tail(&q->bq_list, data); 93*eda14cbcSMatt Macy if (q->bq_size >= q->bq_maxsize / q->bq_fill_fraction) 94*eda14cbcSMatt Macy cv_signal(&q->bq_pop_cv); 95*eda14cbcSMatt Macy if (flush) 96*eda14cbcSMatt Macy cv_broadcast(&q->bq_pop_cv); 97*eda14cbcSMatt Macy mutex_exit(&q->bq_lock); 98*eda14cbcSMatt Macy } 99*eda14cbcSMatt Macy 100*eda14cbcSMatt Macy /* 101*eda14cbcSMatt Macy * Add data to q, consuming size units of capacity. If there is insufficient 102*eda14cbcSMatt Macy * capacity to consume size units, block until capacity exists. Asserts size is 103*eda14cbcSMatt Macy * > 0. 104*eda14cbcSMatt Macy */ 105*eda14cbcSMatt Macy void 106*eda14cbcSMatt Macy bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size) 107*eda14cbcSMatt Macy { 108*eda14cbcSMatt Macy bqueue_enqueue_impl(q, data, item_size, B_FALSE); 109*eda14cbcSMatt Macy } 110*eda14cbcSMatt Macy 111*eda14cbcSMatt Macy /* 112*eda14cbcSMatt Macy * Enqueue an entry, and then flush the queue. This forces the popping threads 113*eda14cbcSMatt Macy * to wake up, even if we're below the fill fraction. We have this in a single 114*eda14cbcSMatt Macy * function, rather than having a separate call, because it prevents race 115*eda14cbcSMatt Macy * conditions between the enqueuing thread and the dequeueing thread, where the 116*eda14cbcSMatt Macy * enqueueing thread will wake up the dequeueing thread, that thread will 117*eda14cbcSMatt Macy * destroy the condvar before the enqueuing thread is done. 118*eda14cbcSMatt Macy */ 119*eda14cbcSMatt Macy void 120*eda14cbcSMatt Macy bqueue_enqueue_flush(bqueue_t *q, void *data, uint64_t item_size) 121*eda14cbcSMatt Macy { 122*eda14cbcSMatt Macy bqueue_enqueue_impl(q, data, item_size, B_TRUE); 123*eda14cbcSMatt Macy } 124*eda14cbcSMatt Macy 125*eda14cbcSMatt Macy /* 126*eda14cbcSMatt Macy * Take the first element off of q. If there are no elements on the queue, wait 127*eda14cbcSMatt Macy * until one is put there. Return the removed element. 128*eda14cbcSMatt Macy */ 129*eda14cbcSMatt Macy void * 130*eda14cbcSMatt Macy bqueue_dequeue(bqueue_t *q) 131*eda14cbcSMatt Macy { 132*eda14cbcSMatt Macy void *ret = NULL; 133*eda14cbcSMatt Macy uint64_t item_size; 134*eda14cbcSMatt Macy mutex_enter(&q->bq_lock); 135*eda14cbcSMatt Macy while (q->bq_size == 0) { 136*eda14cbcSMatt Macy cv_wait_sig(&q->bq_pop_cv, &q->bq_lock); 137*eda14cbcSMatt Macy } 138*eda14cbcSMatt Macy ret = list_remove_head(&q->bq_list); 139*eda14cbcSMatt Macy ASSERT3P(ret, !=, NULL); 140*eda14cbcSMatt Macy item_size = obj2node(q, ret)->bqn_size; 141*eda14cbcSMatt Macy q->bq_size -= item_size; 142*eda14cbcSMatt Macy if (q->bq_size <= q->bq_maxsize - (q->bq_maxsize / q->bq_fill_fraction)) 143*eda14cbcSMatt Macy cv_signal(&q->bq_add_cv); 144*eda14cbcSMatt Macy mutex_exit(&q->bq_lock); 145*eda14cbcSMatt Macy return (ret); 146*eda14cbcSMatt Macy } 147*eda14cbcSMatt Macy 148*eda14cbcSMatt Macy /* 149*eda14cbcSMatt Macy * Returns true if the space used is 0. 150*eda14cbcSMatt Macy */ 151*eda14cbcSMatt Macy boolean_t 152*eda14cbcSMatt Macy bqueue_empty(bqueue_t *q) 153*eda14cbcSMatt Macy { 154*eda14cbcSMatt Macy return (q->bq_size == 0); 155*eda14cbcSMatt Macy } 156