1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 1996-2002 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include <pthread.h> 30 #include <malloc.h> 31 #include <memory.h> 32 #include "dataq.h" 33 #include <assert.h> 34 35 #ifndef NDEBUG 36 static int 37 dataq_check(dataq_t *ptr) /* call while holding lock! */ 38 { 39 assert(ptr->num_data == ll_check(&ptr->data)); 40 assert(ptr->num_waiters == ll_check(&ptr->waiters)); 41 return (1); 42 } 43 #endif 44 45 int 46 dataq_init(dataq_t *ptr) 47 { 48 ptr->num_data = 0; 49 ptr->num_waiters = 0; 50 ll_init(&ptr->data); 51 ll_init(&ptr->waiters); 52 pthread_mutex_init(&ptr->lock, NULL); 53 assert((pthread_mutex_lock(&ptr->lock) == 0) && 54 (dataq_check(ptr) == 1) && 55 (pthread_mutex_unlock(&ptr->lock) == 0)); 56 return (0); 57 } 58 59 int 60 dataq_enqueue(dataq_t *dataq, void *in) 61 { 62 dataq_data_t *ptr = (dataq_data_t *)malloc(sizeof (*ptr)); 63 dataq_waiter_t *sleeper; 64 65 if (ptr == NULL) 66 return (-1); 67 ptr->data = in; 68 pthread_mutex_lock(&dataq->lock); 69 assert(dataq_check(dataq)); 70 ll_enqueue(&dataq->data, &ptr->list); 71 dataq->num_data++; 72 if (dataq->num_waiters) { 73 /*LINTED*/ 74 sleeper = (dataq_waiter_t *)ll_peek(&dataq->waiters); 75 sleeper->wakeup = 1; 76 pthread_cond_signal(&sleeper->cv); 77 } 78 assert(dataq_check(dataq)); 79 pthread_mutex_unlock(&dataq->lock); 80 return (0); 81 } 82 83 int 84 dataq_dequeue(dataq_t *dataq, void **outptr, int try) 85 { 86 dataq_data_t *dptr; 87 dataq_waiter_t *sleeper; 88 89 pthread_mutex_lock(&dataq->lock); 90 if ((dataq->num_waiters > 0) || 91 ((dptr = (dataq_data_t *)ll_dequeue(&dataq->data)) == NULL)) { 92 dataq_waiter_t wait; 93 if (try) { 94 pthread_mutex_unlock(&dataq->lock); 95 return (1); 96 } 97 wait.wakeup = 0; 98 pthread_cond_init(&wait.cv, NULL); 99 dataq->num_waiters++; 100 ll_enqueue(&dataq->waiters, &wait.list); 101 while (wait.wakeup == 0) 102 pthread_cond_wait(&wait.cv, &dataq->lock); 103 ll_dequeue(&dataq->waiters); 104 dataq->num_waiters--; 105 pthread_cond_destroy(&wait.cv); 106 dptr = (dataq_data_t *)ll_dequeue(&dataq->data); 107 } 108 dataq->num_data--; 109 if (dataq->num_data && dataq->num_waiters) { 110 /*LINTED*/ 111 sleeper = (dataq_waiter_t *)ll_peek(&dataq->waiters); 112 sleeper->wakeup = 1; 113 pthread_cond_signal(&sleeper->cv); 114 } 115 pthread_mutex_unlock(&dataq->lock); 116 *outptr = dptr->data; 117 free(dptr); 118 return (0); 119 } 120 121 static void 122 dataq_data_destroy(void * p) 123 { 124 dataq_data_t *d = (dataq_data_t *)p; 125 free(d->data); 126 free(d); 127 } 128 129 static void 130 dataq_waiters_destroy(void * p) 131 { 132 dataq_waiter_t *d = (dataq_waiter_t *)p; 133 pthread_cond_destroy(&d->cv); 134 free(d); 135 } 136 137 int 138 dataq_destroy(dataq_t *dataq) 139 { 140 pthread_mutex_destroy(&dataq->lock); 141 ll_mapf(&dataq->data, dataq_data_destroy); 142 ll_mapf(&dataq->waiters, dataq_waiters_destroy); 143 return (0); 144 } 145