1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 /* 30 * A synchronized FIFO queue for inter-thread producer-consumer semantics. 31 * This queue will handle multiple writers and readers simultaneously. 32 * 33 * The following operations are provided: 34 * slp_new_queue: create a new queue 35 * slp_enqueue: place a message at the end of the queue 36 * slp_enqueue_at_head: place a message the the start of the queue 37 * slp_dequeue: remove and return the next message on the queue 38 * (waits indefinately) 39 * slp_dequeue_timed: remove and return the next message on the queue 40 * (waits only for a specified time) 41 * slp_flush_queue: flushes and frees all messages on a queue 42 * slp_destroy_queue: frees an empty queue. 43 */ 44 45 #include <stdio.h> 46 #include <stdlib.h> 47 #include <thread.h> 48 #include <synch.h> 49 #include <syslog.h> 50 #include <slp.h> 51 #include <slp-internal.h> 52 53 /* Private implementation details */ 54 struct queue_entry { 55 void *msg; 56 struct queue_entry *next; 57 }; 58 typedef struct queue_entry slp_queue_entry_t; 59 60 struct queue { 61 slp_queue_entry_t *head; 62 slp_queue_entry_t *tail; 63 mutex_t *lock; 64 cond_t *wait; 65 int count; 66 }; 67 68 /* 69 * Creates, initializes, and returns a new queue. 70 * If an initialization error occured, returns NULL and sets err to 71 * the appropriate SLP error code. 72 * queues can operate in one of two modes: timed-wait, and infinite 73 * wait. The timeout parameter specifies which of these modes should 74 * be enabled for the new queue. 75 */ 76 slp_queue_t *slp_new_queue(SLPError *err) { 77 mutex_t *lock; 78 cond_t *wait; 79 struct queue *q; 80 81 *err = SLP_OK; 82 83 /* initialize new mutex and semaphore */ 84 if ((lock = calloc(1, sizeof (*lock))) == NULL) { 85 *err = SLP_MEMORY_ALLOC_FAILED; 86 slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory"); 87 return (NULL); 88 } 89 90 /* intialize condition vars */ 91 if (!(wait = calloc(1, sizeof (*wait)))) { 92 *err = SLP_MEMORY_ALLOC_FAILED; 93 slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory"); 94 return (NULL); 95 } 96 (void) cond_init(wait, NULL, NULL); 97 98 /* create the queue */ 99 if ((q = malloc(sizeof (*q))) == NULL) { 100 *err = SLP_MEMORY_ALLOC_FAILED; 101 slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory"); 102 return (NULL); 103 } 104 105 q->head = NULL; 106 q->lock = lock; 107 q->wait = wait; 108 q->count = 0; 109 110 return (q); 111 } 112 113 /* 114 * Adds msg to the tail of queue q. 115 * Returns an SLP error code: SLP_OK for no error, or SLP_MEMORY_ALLOC_FAILED 116 * if it couldn't allocate memory. 117 */ 118 SLPError slp_enqueue(slp_queue_t *qa, void *msg) { 119 slp_queue_entry_t *qe; 120 struct queue *q = qa; 121 122 if ((qe = malloc(sizeof (*qe))) == NULL) { 123 slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory"); 124 return (SLP_MEMORY_ALLOC_FAILED); 125 } 126 127 (void) mutex_lock(q->lock); 128 qe->msg = msg; 129 qe->next = NULL; 130 if (q->head != NULL) { /* queue is not emptry */ 131 q->tail->next = qe; 132 q->tail = qe; 133 } else { /* queue is empty */ 134 q->head = q->tail = qe; 135 } 136 q->count++; 137 (void) mutex_unlock(q->lock); 138 (void) cond_signal(q->wait); 139 140 return (SLP_OK); 141 } 142 143 /* 144 * Inserts a message at the head of the queue. This is useful for inserting 145 * things like cancel messages. 146 */ 147 SLPError slp_enqueue_at_head(slp_queue_t *qa, void *msg) { 148 slp_queue_entry_t *qe; 149 struct queue *q = qa; 150 151 if ((qe = malloc(sizeof (*qe))) == NULL) { 152 slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory"); 153 return (SLP_MEMORY_ALLOC_FAILED); 154 } 155 156 (void) mutex_lock(q->lock); 157 qe->msg = msg; 158 qe->next = q->head; 159 q->head = qe; 160 161 q->count++; 162 (void) mutex_unlock(q->lock); 163 (void) cond_signal(q->wait); 164 165 return (SLP_OK); 166 } 167 168 /* 169 * The core functionality for dequeue. 170 */ 171 static void *dequeue_nolock(struct queue *q) { 172 void *msg; 173 slp_queue_entry_t *qe = q->head; 174 175 if (!qe) 176 return (NULL); /* shouldn't get here */ 177 msg = qe->msg; 178 if (!qe->next) /* last one in queue */ 179 q->head = q->tail = NULL; 180 else 181 q->head = qe->next; 182 free(qe); 183 q->count--; 184 return (msg); 185 } 186 187 /* 188 * Returns the first message waiting or arriving in the queue, or if no 189 * message is available after waiting the amount of time specified in 190 * 'to', returns NULL, and sets 'etimed' to true. If an error occured, 191 * returns NULL and sets 'etimed' to false. 192 */ 193 void *slp_dequeue_timed(slp_queue_t *qa, timestruc_t *to, SLPBoolean *etimed) { 194 int err; 195 void *ans; 196 struct queue *q = qa; 197 198 if (etimed) 199 *etimed = SLP_FALSE; 200 201 (void) mutex_lock(q->lock); 202 if (q->count > 0) { 203 /* something's in the q, so no need to wait */ 204 goto msg_available; 205 } 206 207 /* else wait */ 208 while (q->count == 0) { 209 if (to) { 210 err = cond_timedwait(q->wait, q->lock, to); 211 } else { 212 err = cond_wait(q->wait, q->lock); 213 } 214 if (err == ETIME) { 215 (void) mutex_unlock(q->lock); 216 *etimed = SLP_TRUE; 217 return (NULL); 218 } 219 } 220 221 msg_available: 222 ans = dequeue_nolock(q); 223 (void) mutex_unlock(q->lock); 224 return (ans); 225 } 226 227 /* 228 * Removes the first message from the queue and returns it. 229 * Returns NULL only on internal error. 230 */ 231 void *slp_dequeue(slp_queue_t *qa) { 232 return (slp_dequeue_timed(qa, NULL, NULL)); 233 } 234 235 /* 236 * Flushes the queue, using the caller-specified free function to 237 * free each message in the queue. 238 */ 239 void slp_flush_queue(slp_queue_t *qa, void (*free_f)(void *)) { 240 slp_queue_entry_t *p, *pn; 241 struct queue *q = qa; 242 243 for (p = q->head; p; p = pn) { 244 pn = p->next; 245 free_f(p); 246 } 247 } 248 249 /* 250 * Frees a queue. 251 * The queue must be empty before it can be destroyed; slp_flush_queue 252 * can be used to empty a queue. 253 */ 254 void slp_destroy_queue(slp_queue_t *qa) { 255 struct queue *q = qa; 256 257 (void) mutex_destroy(q->lock); 258 (void) cond_destroy(q->wait); 259 free(q->lock); 260 free(q->wait); 261 free(q); 262 } 263