1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License"). You may not use this file except in compliance
7 * with the License.
8 *
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or http://www.opensolaris.org/os/licensing.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
13 *
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 *
20 * CDDL HEADER END
21 */
22 /*
23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
27 #pragma ident "%Z%%M% %I% %E% SMI"
28
29 /*
30 * A synchronized FIFO queue for inter-thread producer-consumer semantics.
31 * This queue will handle multiple writers and readers simultaneously.
32 *
33 * The following operations are provided:
34 * slp_new_queue: create a new queue
35 * slp_enqueue: place a message at the end of the queue
36 * slp_enqueue_at_head: place a message the the start of the queue
37 * slp_dequeue: remove and return the next message on the queue
38 * (waits indefinately)
39 * slp_dequeue_timed: remove and return the next message on the queue
40 * (waits only for a specified time)
41 * slp_flush_queue: flushes and frees all messages on a queue
42 * slp_destroy_queue: frees an empty queue.
43 */
44
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <thread.h>
48 #include <synch.h>
49 #include <syslog.h>
50 #include <slp.h>
51 #include <slp-internal.h>
52
53 /* Private implementation details */
54 struct queue_entry {
55 void *msg;
56 struct queue_entry *next;
57 };
58 typedef struct queue_entry slp_queue_entry_t;
59
60 struct queue {
61 slp_queue_entry_t *head;
62 slp_queue_entry_t *tail;
63 mutex_t *lock;
64 cond_t *wait;
65 int count;
66 };
67
68 /*
69 * Creates, initializes, and returns a new queue.
70 * If an initialization error occured, returns NULL and sets err to
71 * the appropriate SLP error code.
72 * queues can operate in one of two modes: timed-wait, and infinite
73 * wait. The timeout parameter specifies which of these modes should
74 * be enabled for the new queue.
75 */
slp_new_queue(SLPError * err)76 slp_queue_t *slp_new_queue(SLPError *err) {
77 mutex_t *lock;
78 cond_t *wait;
79 struct queue *q;
80
81 *err = SLP_OK;
82
83 /* initialize new mutex and semaphore */
84 if ((lock = calloc(1, sizeof (*lock))) == NULL) {
85 *err = SLP_MEMORY_ALLOC_FAILED;
86 slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
87 return (NULL);
88 }
89
90 /* intialize condition vars */
91 if (!(wait = calloc(1, sizeof (*wait)))) {
92 *err = SLP_MEMORY_ALLOC_FAILED;
93 slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
94 return (NULL);
95 }
96 (void) cond_init(wait, NULL, NULL);
97
98 /* create the queue */
99 if ((q = malloc(sizeof (*q))) == NULL) {
100 *err = SLP_MEMORY_ALLOC_FAILED;
101 slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
102 return (NULL);
103 }
104
105 q->head = NULL;
106 q->lock = lock;
107 q->wait = wait;
108 q->count = 0;
109
110 return (q);
111 }
112
113 /*
114 * Adds msg to the tail of queue q.
115 * Returns an SLP error code: SLP_OK for no error, or SLP_MEMORY_ALLOC_FAILED
116 * if it couldn't allocate memory.
117 */
slp_enqueue(slp_queue_t * qa,void * msg)118 SLPError slp_enqueue(slp_queue_t *qa, void *msg) {
119 slp_queue_entry_t *qe;
120 struct queue *q = qa;
121
122 if ((qe = malloc(sizeof (*qe))) == NULL) {
123 slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory");
124 return (SLP_MEMORY_ALLOC_FAILED);
125 }
126
127 (void) mutex_lock(q->lock);
128 qe->msg = msg;
129 qe->next = NULL;
130 if (q->head != NULL) { /* queue is not emptry */
131 q->tail->next = qe;
132 q->tail = qe;
133 } else { /* queue is empty */
134 q->head = q->tail = qe;
135 }
136 q->count++;
137 (void) mutex_unlock(q->lock);
138 (void) cond_signal(q->wait);
139
140 return (SLP_OK);
141 }
142
143 /*
144 * Inserts a message at the head of the queue. This is useful for inserting
145 * things like cancel messages.
146 */
slp_enqueue_at_head(slp_queue_t * qa,void * msg)147 SLPError slp_enqueue_at_head(slp_queue_t *qa, void *msg) {
148 slp_queue_entry_t *qe;
149 struct queue *q = qa;
150
151 if ((qe = malloc(sizeof (*qe))) == NULL) {
152 slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory");
153 return (SLP_MEMORY_ALLOC_FAILED);
154 }
155
156 (void) mutex_lock(q->lock);
157 qe->msg = msg;
158 qe->next = q->head;
159 q->head = qe;
160
161 q->count++;
162 (void) mutex_unlock(q->lock);
163 (void) cond_signal(q->wait);
164
165 return (SLP_OK);
166 }
167
168 /*
169 * The core functionality for dequeue.
170 */
dequeue_nolock(struct queue * q)171 static void *dequeue_nolock(struct queue *q) {
172 void *msg;
173 slp_queue_entry_t *qe = q->head;
174
175 if (!qe)
176 return (NULL); /* shouldn't get here */
177 msg = qe->msg;
178 if (!qe->next) /* last one in queue */
179 q->head = q->tail = NULL;
180 else
181 q->head = qe->next;
182 free(qe);
183 q->count--;
184 return (msg);
185 }
186
187 /*
188 * Returns the first message waiting or arriving in the queue, or if no
189 * message is available after waiting the amount of time specified in
190 * 'to', returns NULL, and sets 'etimed' to true. If an error occured,
191 * returns NULL and sets 'etimed' to false.
192 */
slp_dequeue_timed(slp_queue_t * qa,timestruc_t * to,SLPBoolean * etimed)193 void *slp_dequeue_timed(slp_queue_t *qa, timestruc_t *to, SLPBoolean *etimed) {
194 int err;
195 void *ans;
196 struct queue *q = qa;
197
198 if (etimed)
199 *etimed = SLP_FALSE;
200
201 (void) mutex_lock(q->lock);
202 if (q->count > 0) {
203 /* something's in the q, so no need to wait */
204 goto msg_available;
205 }
206
207 /* else wait */
208 while (q->count == 0) {
209 if (to) {
210 err = cond_timedwait(q->wait, q->lock, to);
211 } else {
212 err = cond_wait(q->wait, q->lock);
213 }
214 if (err == ETIME) {
215 (void) mutex_unlock(q->lock);
216 *etimed = SLP_TRUE;
217 return (NULL);
218 }
219 }
220
221 msg_available:
222 ans = dequeue_nolock(q);
223 (void) mutex_unlock(q->lock);
224 return (ans);
225 }
226
227 /*
228 * Removes the first message from the queue and returns it.
229 * Returns NULL only on internal error.
230 */
slp_dequeue(slp_queue_t * qa)231 void *slp_dequeue(slp_queue_t *qa) {
232 return (slp_dequeue_timed(qa, NULL, NULL));
233 }
234
235 /*
236 * Flushes the queue, using the caller-specified free function to
237 * free each message in the queue.
238 */
slp_flush_queue(slp_queue_t * qa,void (* free_f)(void *))239 void slp_flush_queue(slp_queue_t *qa, void (*free_f)(void *)) {
240 slp_queue_entry_t *p, *pn;
241 struct queue *q = qa;
242
243 for (p = q->head; p; p = pn) {
244 pn = p->next;
245 free_f(p);
246 }
247 }
248
249 /*
250 * Frees a queue.
251 * The queue must be empty before it can be destroyed; slp_flush_queue
252 * can be used to empty a queue.
253 */
slp_destroy_queue(slp_queue_t * qa)254 void slp_destroy_queue(slp_queue_t *qa) {
255 struct queue *q = qa;
256
257 (void) mutex_destroy(q->lock);
258 (void) cond_destroy(q->wait);
259 free(q->lock);
260 free(q->wait);
261 free(q);
262 }
263