1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 /* 28 * A synchronized FIFO queue for inter-thread producer-consumer semantics. 29 * This queue will handle multiple writers and readers simultaneously. 30 * 31 * The following operations are provided: 32 * slp_new_queue: create a new queue 33 * slp_enqueue: place a message at the end of the queue 34 * slp_enqueue_at_head: place a message the the start of the queue 35 * slp_dequeue: remove and return the next message on the queue 36 * (waits indefinately) 37 * slp_dequeue_timed: remove and return the next message on the queue 38 * (waits only for a specified time) 39 * slp_flush_queue: flushes and frees all messages on a queue 40 * slp_destroy_queue: frees an empty queue. 41 */ 42 43 #include <stdio.h> 44 #include <stdlib.h> 45 #include <thread.h> 46 #include <synch.h> 47 #include <syslog.h> 48 #include <slp.h> 49 #include <slp-internal.h> 50 51 /* Private implementation details */ 52 struct queue_entry { 53 void *msg; 54 struct queue_entry *next; 55 }; 56 typedef struct queue_entry slp_queue_entry_t; 57 58 struct queue { 59 slp_queue_entry_t *head; 60 slp_queue_entry_t *tail; 61 mutex_t *lock; 62 cond_t *wait; 63 int count; 64 }; 65 66 /* 67 * Creates, initializes, and returns a new queue. 68 * If an initialization error occured, returns NULL and sets err to 69 * the appropriate SLP error code. 70 * queues can operate in one of two modes: timed-wait, and infinite 71 * wait. The timeout parameter specifies which of these modes should 72 * be enabled for the new queue. 73 */ 74 slp_queue_t *slp_new_queue(SLPError *err) { 75 mutex_t *lock; 76 cond_t *wait; 77 struct queue *q; 78 79 *err = SLP_OK; 80 81 /* initialize new mutex and semaphore */ 82 if ((lock = calloc(1, sizeof (*lock))) == NULL) { 83 *err = SLP_MEMORY_ALLOC_FAILED; 84 slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory"); 85 return (NULL); 86 } 87 88 /* intialize condition vars */ 89 if (!(wait = calloc(1, sizeof (*wait)))) { 90 *err = SLP_MEMORY_ALLOC_FAILED; 91 slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory"); 92 return (NULL); 93 } 94 (void) cond_init(wait, USYNC_THREAD, NULL); 95 96 /* create the queue */ 97 if ((q = malloc(sizeof (*q))) == NULL) { 98 *err = SLP_MEMORY_ALLOC_FAILED; 99 slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory"); 100 return (NULL); 101 } 102 103 q->head = NULL; 104 q->lock = lock; 105 q->wait = wait; 106 q->count = 0; 107 108 return (q); 109 } 110 111 /* 112 * Adds msg to the tail of queue q. 113 * Returns an SLP error code: SLP_OK for no error, or SLP_MEMORY_ALLOC_FAILED 114 * if it couldn't allocate memory. 115 */ 116 SLPError slp_enqueue(slp_queue_t *qa, void *msg) { 117 slp_queue_entry_t *qe; 118 struct queue *q = qa; 119 120 if ((qe = malloc(sizeof (*qe))) == NULL) { 121 slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory"); 122 return (SLP_MEMORY_ALLOC_FAILED); 123 } 124 125 (void) mutex_lock(q->lock); 126 qe->msg = msg; 127 qe->next = NULL; 128 if (q->head != NULL) { /* queue is not emptry */ 129 q->tail->next = qe; 130 q->tail = qe; 131 } else { /* queue is empty */ 132 q->head = q->tail = qe; 133 } 134 q->count++; 135 (void) mutex_unlock(q->lock); 136 (void) cond_signal(q->wait); 137 138 return (SLP_OK); 139 } 140 141 /* 142 * Inserts a message at the head of the queue. This is useful for inserting 143 * things like cancel messages. 144 */ 145 SLPError slp_enqueue_at_head(slp_queue_t *qa, void *msg) { 146 slp_queue_entry_t *qe; 147 struct queue *q = qa; 148 149 if ((qe = malloc(sizeof (*qe))) == NULL) { 150 slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory"); 151 return (SLP_MEMORY_ALLOC_FAILED); 152 } 153 154 (void) mutex_lock(q->lock); 155 qe->msg = msg; 156 qe->next = q->head; 157 q->head = qe; 158 159 q->count++; 160 (void) mutex_unlock(q->lock); 161 (void) cond_signal(q->wait); 162 163 return (SLP_OK); 164 } 165 166 /* 167 * The core functionality for dequeue. 168 */ 169 static void *dequeue_nolock(struct queue *q) { 170 void *msg; 171 slp_queue_entry_t *qe = q->head; 172 173 if (!qe) 174 return (NULL); /* shouldn't get here */ 175 msg = qe->msg; 176 if (!qe->next) /* last one in queue */ 177 q->head = q->tail = NULL; 178 else 179 q->head = qe->next; 180 free(qe); 181 q->count--; 182 return (msg); 183 } 184 185 /* 186 * Returns the first message waiting or arriving in the queue, or if no 187 * message is available after waiting the amount of time specified in 188 * 'to', returns NULL, and sets 'etimed' to true. If an error occured, 189 * returns NULL and sets 'etimed' to false. 190 */ 191 void *slp_dequeue_timed(slp_queue_t *qa, timestruc_t *to, SLPBoolean *etimed) { 192 int err; 193 void *ans; 194 struct queue *q = qa; 195 196 if (etimed) 197 *etimed = SLP_FALSE; 198 199 (void) mutex_lock(q->lock); 200 if (q->count > 0) { 201 /* something's in the q, so no need to wait */ 202 goto msg_available; 203 } 204 205 /* else wait */ 206 while (q->count == 0) { 207 if (to) { 208 err = cond_timedwait(q->wait, q->lock, to); 209 } else { 210 err = cond_wait(q->wait, q->lock); 211 } 212 if (err == ETIME) { 213 (void) mutex_unlock(q->lock); 214 *etimed = SLP_TRUE; 215 return (NULL); 216 } 217 } 218 219 msg_available: 220 ans = dequeue_nolock(q); 221 (void) mutex_unlock(q->lock); 222 return (ans); 223 } 224 225 /* 226 * Removes the first message from the queue and returns it. 227 * Returns NULL only on internal error. 228 */ 229 void *slp_dequeue(slp_queue_t *qa) { 230 return (slp_dequeue_timed(qa, NULL, NULL)); 231 } 232 233 /* 234 * Flushes the queue, using the caller-specified free function to 235 * free each message in the queue. 236 */ 237 void slp_flush_queue(slp_queue_t *qa, void (*free_f)(void *)) { 238 slp_queue_entry_t *p, *pn; 239 struct queue *q = qa; 240 241 for (p = q->head; p; p = pn) { 242 pn = p->next; 243 free_f(p); 244 } 245 } 246 247 /* 248 * Frees a queue. 249 * The queue must be empty before it can be destroyed; slp_flush_queue 250 * can be used to empty a queue. 251 */ 252 void slp_destroy_queue(slp_queue_t *qa) { 253 struct queue *q = qa; 254 255 (void) mutex_destroy(q->lock); 256 (void) cond_destroy(q->wait); 257 free(q->lock); 258 free(q->wait); 259 free(q); 260 } 261