1*a2cdcdd2SPaul Dagnelie /*
2*a2cdcdd2SPaul Dagnelie * CDDL HEADER START
3*a2cdcdd2SPaul Dagnelie *
4*a2cdcdd2SPaul Dagnelie * This file and its contents are supplied under the terms of the
5*a2cdcdd2SPaul Dagnelie * Common Development and Distribution License ("CDDL"), version 1.0.
6*a2cdcdd2SPaul Dagnelie * You may only use this file in accordance with the terms of version
7*a2cdcdd2SPaul Dagnelie * 1.0 of the CDDL.
8*a2cdcdd2SPaul Dagnelie *
9*a2cdcdd2SPaul Dagnelie * A full copy of the text of the CDDL should have accompanied this
10*a2cdcdd2SPaul Dagnelie * source. A copy of the CDDL is also available via the Internet at
11*a2cdcdd2SPaul Dagnelie * http://www.illumos.org/license/CDDL.
12*a2cdcdd2SPaul Dagnelie *
13*a2cdcdd2SPaul Dagnelie * CDDL HEADER END
14*a2cdcdd2SPaul Dagnelie */
15*a2cdcdd2SPaul Dagnelie /*
16*a2cdcdd2SPaul Dagnelie * Copyright (c) 2014 by Delphix. All rights reserved.
17*a2cdcdd2SPaul Dagnelie */
18*a2cdcdd2SPaul Dagnelie
19*a2cdcdd2SPaul Dagnelie #include <sys/bqueue.h>
20*a2cdcdd2SPaul Dagnelie #include <sys/zfs_context.h>
21*a2cdcdd2SPaul Dagnelie
22*a2cdcdd2SPaul Dagnelie static inline bqueue_node_t *
obj2node(bqueue_t * q,void * data)23*a2cdcdd2SPaul Dagnelie obj2node(bqueue_t *q, void *data)
24*a2cdcdd2SPaul Dagnelie {
25*a2cdcdd2SPaul Dagnelie return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
26*a2cdcdd2SPaul Dagnelie }
27*a2cdcdd2SPaul Dagnelie
28*a2cdcdd2SPaul Dagnelie /*
29*a2cdcdd2SPaul Dagnelie * Initialize a blocking queue The maximum capacity of the queue is set to
30*a2cdcdd2SPaul Dagnelie * size. Types that want to be stored in a bqueue must contain a bqueue_node_t,
31*a2cdcdd2SPaul Dagnelie * and offset should give its offset from the start of the struct. Return 0 on
32*a2cdcdd2SPaul Dagnelie * success, or -1 on failure.
33*a2cdcdd2SPaul Dagnelie */
34*a2cdcdd2SPaul Dagnelie int
bqueue_init(bqueue_t * q,uint64_t size,size_t node_offset)35*a2cdcdd2SPaul Dagnelie bqueue_init(bqueue_t *q, uint64_t size, size_t node_offset)
36*a2cdcdd2SPaul Dagnelie {
37*a2cdcdd2SPaul Dagnelie list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
38*a2cdcdd2SPaul Dagnelie node_offset + offsetof(bqueue_node_t, bqn_node));
39*a2cdcdd2SPaul Dagnelie cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
40*a2cdcdd2SPaul Dagnelie cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
41*a2cdcdd2SPaul Dagnelie mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
42*a2cdcdd2SPaul Dagnelie q->bq_node_offset = node_offset;
43*a2cdcdd2SPaul Dagnelie q->bq_size = 0;
44*a2cdcdd2SPaul Dagnelie q->bq_maxsize = size;
45*a2cdcdd2SPaul Dagnelie return (0);
46*a2cdcdd2SPaul Dagnelie }
47*a2cdcdd2SPaul Dagnelie
48*a2cdcdd2SPaul Dagnelie /*
49*a2cdcdd2SPaul Dagnelie * Destroy a blocking queue. This function asserts that there are no
50*a2cdcdd2SPaul Dagnelie * elements in the queue, and no one is blocked on the condition
51*a2cdcdd2SPaul Dagnelie * variables.
52*a2cdcdd2SPaul Dagnelie */
53*a2cdcdd2SPaul Dagnelie void
bqueue_destroy(bqueue_t * q)54*a2cdcdd2SPaul Dagnelie bqueue_destroy(bqueue_t *q)
55*a2cdcdd2SPaul Dagnelie {
56*a2cdcdd2SPaul Dagnelie ASSERT0(q->bq_size);
57*a2cdcdd2SPaul Dagnelie cv_destroy(&q->bq_add_cv);
58*a2cdcdd2SPaul Dagnelie cv_destroy(&q->bq_pop_cv);
59*a2cdcdd2SPaul Dagnelie mutex_destroy(&q->bq_lock);
60*a2cdcdd2SPaul Dagnelie list_destroy(&q->bq_list);
61*a2cdcdd2SPaul Dagnelie }
62*a2cdcdd2SPaul Dagnelie
63*a2cdcdd2SPaul Dagnelie /*
64*a2cdcdd2SPaul Dagnelie * Add data to q, consuming size units of capacity. If there is insufficient
65*a2cdcdd2SPaul Dagnelie * capacity to consume size units, block until capacity exists. Asserts size is
66*a2cdcdd2SPaul Dagnelie * > 0.
67*a2cdcdd2SPaul Dagnelie */
68*a2cdcdd2SPaul Dagnelie void
bqueue_enqueue(bqueue_t * q,void * data,uint64_t item_size)69*a2cdcdd2SPaul Dagnelie bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size)
70*a2cdcdd2SPaul Dagnelie {
71*a2cdcdd2SPaul Dagnelie ASSERT3U(item_size, >, 0);
72*a2cdcdd2SPaul Dagnelie ASSERT3U(item_size, <, q->bq_maxsize);
73*a2cdcdd2SPaul Dagnelie mutex_enter(&q->bq_lock);
74*a2cdcdd2SPaul Dagnelie obj2node(q, data)->bqn_size = item_size;
75*a2cdcdd2SPaul Dagnelie while (q->bq_size + item_size > q->bq_maxsize) {
76*a2cdcdd2SPaul Dagnelie cv_wait(&q->bq_add_cv, &q->bq_lock);
77*a2cdcdd2SPaul Dagnelie }
78*a2cdcdd2SPaul Dagnelie q->bq_size += item_size;
79*a2cdcdd2SPaul Dagnelie list_insert_tail(&q->bq_list, data);
80*a2cdcdd2SPaul Dagnelie cv_signal(&q->bq_pop_cv);
81*a2cdcdd2SPaul Dagnelie mutex_exit(&q->bq_lock);
82*a2cdcdd2SPaul Dagnelie }
83*a2cdcdd2SPaul Dagnelie /*
84*a2cdcdd2SPaul Dagnelie * Take the first element off of q. If there are no elements on the queue, wait
85*a2cdcdd2SPaul Dagnelie * until one is put there. Return the removed element.
86*a2cdcdd2SPaul Dagnelie */
87*a2cdcdd2SPaul Dagnelie void *
bqueue_dequeue(bqueue_t * q)88*a2cdcdd2SPaul Dagnelie bqueue_dequeue(bqueue_t *q)
89*a2cdcdd2SPaul Dagnelie {
90*a2cdcdd2SPaul Dagnelie void *ret;
91*a2cdcdd2SPaul Dagnelie uint64_t item_size;
92*a2cdcdd2SPaul Dagnelie mutex_enter(&q->bq_lock);
93*a2cdcdd2SPaul Dagnelie while (q->bq_size == 0) {
94*a2cdcdd2SPaul Dagnelie cv_wait(&q->bq_pop_cv, &q->bq_lock);
95*a2cdcdd2SPaul Dagnelie }
96*a2cdcdd2SPaul Dagnelie ret = list_remove_head(&q->bq_list);
97*a2cdcdd2SPaul Dagnelie item_size = obj2node(q, ret)->bqn_size;
98*a2cdcdd2SPaul Dagnelie q->bq_size -= item_size;
99*a2cdcdd2SPaul Dagnelie mutex_exit(&q->bq_lock);
100*a2cdcdd2SPaul Dagnelie cv_signal(&q->bq_add_cv);
101*a2cdcdd2SPaul Dagnelie return (ret);
102*a2cdcdd2SPaul Dagnelie }
103*a2cdcdd2SPaul Dagnelie
104*a2cdcdd2SPaul Dagnelie /*
105*a2cdcdd2SPaul Dagnelie * Returns true if the space used is 0.
106*a2cdcdd2SPaul Dagnelie */
107*a2cdcdd2SPaul Dagnelie boolean_t
bqueue_empty(bqueue_t * q)108*a2cdcdd2SPaul Dagnelie bqueue_empty(bqueue_t *q)
109*a2cdcdd2SPaul Dagnelie {
110*a2cdcdd2SPaul Dagnelie return (q->bq_size == 0);
111*a2cdcdd2SPaul Dagnelie }
112