/*
 * CDDL HEADER START
 *
 * This file and its contents are supplied under the terms of the
 * Common Development and Distribution License ("CDDL"), version 1.0.
 * You may only use this file in accordance with the terms of version
 * 1.0 of the CDDL.
 *
 * A full copy of the text of the CDDL should have accompanied this
 * source. A copy of the CDDL is also available via the Internet at
 * http://www.illumos.org/license/CDDL.
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2014 by Delphix. All rights reserved.
 */

#include	<sys/bqueue.h>
#include	<sys/zfs_context.h>

/* Map a queued object to the bqueue_node_t embedded in it. */
static inline bqueue_node_t *
obj2node(bqueue_t *q, void *data)
{
	return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
}

/*
 * Initialize a blocking queue. The maximum capacity of the queue is set to
 * size. Types that want to be stored in a bqueue must contain a bqueue_node_t,
 * and node_offset should give its offset from the start of the struct. Return
 * 0 on success, or -1 on failure.
 */
int
bqueue_init(bqueue_t *q, uint64_t size, size_t node_offset)
{
	list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
	    node_offset + offsetof(bqueue_node_t, bqn_node));
	cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
	mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
	q->bq_node_offset = node_offset;
	q->bq_size = 0;
	q->bq_maxsize = size;
	return (0);
}

/*
 * Destroy a blocking queue. This function asserts that there are no
 * elements in the queue, and no one is blocked on the condition
 * variables.
 */
void
bqueue_destroy(bqueue_t *q)
{
	ASSERT0(q->bq_size);
	cv_destroy(&q->bq_add_cv);
	cv_destroy(&q->bq_pop_cv);
	mutex_destroy(&q->bq_lock);
	list_destroy(&q->bq_list);
}

/*
 * Add data to q, consuming item_size units of capacity. If there is
 * insufficient capacity to consume item_size units, block until capacity
 * exists. Asserts that item_size is > 0 and less than the queue's maximum
 * capacity.
 */
void
bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size)
{
	ASSERT3U(item_size, >, 0);
	ASSERT3U(item_size, <, q->bq_maxsize);
	mutex_enter(&q->bq_lock);
	obj2node(q, data)->bqn_size = item_size;
	while (q->bq_size + item_size > q->bq_maxsize) {
		cv_wait(&q->bq_add_cv, &q->bq_lock);
	}
	q->bq_size += item_size;
	list_insert_tail(&q->bq_list, data);
	cv_signal(&q->bq_pop_cv);
	mutex_exit(&q->bq_lock);
}

/*
 * Take the first element off of q. If there are no elements on the queue, wait
 * until one is put there. Return the removed element.
 */
void *
bqueue_dequeue(bqueue_t *q)
{
	void *ret;
	uint64_t item_size;
	mutex_enter(&q->bq_lock);
	while (q->bq_size == 0) {
		cv_wait(&q->bq_pop_cv, &q->bq_lock);
	}
	ret = list_remove_head(&q->bq_list);
	item_size = obj2node(q, ret)->bqn_size;
	q->bq_size -= item_size;
	mutex_exit(&q->bq_lock);
	cv_signal(&q->bq_add_cv);
	return (ret);
}

/*
 * Returns true if the space used is 0.
 */
boolean_t
bqueue_empty(bqueue_t *q)
{
	return (q->bq_size == 0);
}