1 /* 2 * CDDL HEADER START 3 * 4 * This file and its contents are supplied under the terms of the 5 * Common Development and Distribution License ("CDDL"), version 1.0. 6 * You may only use this file in accordance with the terms of version 7 * 1.0 of the CDDL. 8 * 9 * A full copy of the text of the CDDL should have accompanied this 10 * source. A copy of the CDDL is also available via the Internet at 11 * http://www.illumos.org/license/CDDL. 12 * 13 * CDDL HEADER END 14 */ 15 /* 16 * Copyright (c) 2014 by Delphix. All rights reserved. 17 */ 18 19 #include <sys/bqueue.h> 20 #include <sys/zfs_context.h> 21 22 static inline bqueue_node_t * 23 obj2node(bqueue_t *q, void *data) 24 { 25 return ((bqueue_node_t *)((char *)data + q->bq_node_offset)); 26 } 27 28 /* 29 * Initialize a blocking queue The maximum capacity of the queue is set to 30 * size. Types that want to be stored in a bqueue must contain a bqueue_node_t, 31 * and offset should give its offset from the start of the struct. Return 0 on 32 * success, or -1 on failure. 33 */ 34 int 35 bqueue_init(bqueue_t *q, uint64_t size, size_t node_offset) 36 { 37 list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t), 38 node_offset + offsetof(bqueue_node_t, bqn_node)); 39 cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL); 40 cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL); 41 mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL); 42 q->bq_node_offset = node_offset; 43 q->bq_size = 0; 44 q->bq_maxsize = size; 45 return (0); 46 } 47 48 /* 49 * Destroy a blocking queue. This function asserts that there are no 50 * elements in the queue, and no one is blocked on the condition 51 * variables. 52 */ 53 void 54 bqueue_destroy(bqueue_t *q) 55 { 56 ASSERT0(q->bq_size); 57 cv_destroy(&q->bq_add_cv); 58 cv_destroy(&q->bq_pop_cv); 59 mutex_destroy(&q->bq_lock); 60 list_destroy(&q->bq_list); 61 } 62 63 /* 64 * Add data to q, consuming size units of capacity. If there is insufficient 65 * capacity to consume size units, block until capacity exists. Asserts size is 66 * > 0. 67 */ 68 void 69 bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size) 70 { 71 ASSERT3U(item_size, >, 0); 72 ASSERT3U(item_size, <, q->bq_maxsize); 73 mutex_enter(&q->bq_lock); 74 obj2node(q, data)->bqn_size = item_size; 75 while (q->bq_size + item_size > q->bq_maxsize) { 76 cv_wait(&q->bq_add_cv, &q->bq_lock); 77 } 78 q->bq_size += item_size; 79 list_insert_tail(&q->bq_list, data); 80 cv_signal(&q->bq_pop_cv); 81 mutex_exit(&q->bq_lock); 82 } 83 /* 84 * Take the first element off of q. If there are no elements on the queue, wait 85 * until one is put there. Return the removed element. 86 */ 87 void * 88 bqueue_dequeue(bqueue_t *q) 89 { 90 void *ret; 91 uint64_t item_size; 92 mutex_enter(&q->bq_lock); 93 while (q->bq_size == 0) { 94 cv_wait(&q->bq_pop_cv, &q->bq_lock); 95 } 96 ret = list_remove_head(&q->bq_list); 97 item_size = obj2node(q, ret)->bqn_size; 98 q->bq_size -= item_size; 99 mutex_exit(&q->bq_lock); 100 cv_signal(&q->bq_add_cv); 101 return (ret); 102 } 103 104 /* 105 * Returns true if the space used is 0. 106 */ 107 boolean_t 108 bqueue_empty(bqueue_t *q) 109 { 110 return (q->bq_size == 0); 111 } 112