1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License"). You may not use this file except in compliance
7 * with the License.
8 *
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or http://www.opensolaris.org/os/licensing.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
13 *
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 *
20 * CDDL HEADER END
21 */
22 /*
23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
/*
 * A synchronized FIFO queue for inter-thread producer-consumer semantics.
 * This queue will handle multiple writers and readers simultaneously.
 *
 * The following operations are provided:
 *	slp_new_queue:		create a new queue
 *	slp_enqueue:		place a message at the end of the queue
 *	slp_enqueue_at_head:	place a message at the start of the queue
 *	slp_dequeue:		remove and return the next message on the queue
 *				(waits indefinitely)
 *	slp_dequeue_timed:	remove and return the next message on the queue
 *				(waits only for a specified time)
 *	slp_flush_queue:	flushes and frees all messages on a queue
 *	slp_destroy_queue:	frees an empty queue.
 */
42
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <thread.h>
46 #include <synch.h>
47 #include <syslog.h>
48 #include <slp.h>
49 #include <slp-internal.h>
50
51 /* Private implementation details */
/* One node of the singly linked list that holds the queued messages. */
typedef struct queue_entry {
	void *msg;			/* message pointer supplied by the caller */
	struct queue_entry *next;	/* node behind this one in the list */
} slp_queue_entry_t;
57
58 struct queue {
59 slp_queue_entry_t *head;
60 slp_queue_entry_t *tail;
61 mutex_t *lock;
62 cond_t *wait;
63 int count;
64 };
65
66 /*
67 * Creates, initializes, and returns a new queue.
68 * If an initialization error occured, returns NULL and sets err to
69 * the appropriate SLP error code.
70 * queues can operate in one of two modes: timed-wait, and infinite
71 * wait. The timeout parameter specifies which of these modes should
72 * be enabled for the new queue.
73 */
slp_new_queue(SLPError * err)74 slp_queue_t *slp_new_queue(SLPError *err) {
75 mutex_t *lock;
76 cond_t *wait;
77 struct queue *q;
78
79 *err = SLP_OK;
80
81 /* initialize new mutex and semaphore */
82 if ((lock = calloc(1, sizeof (*lock))) == NULL) {
83 *err = SLP_MEMORY_ALLOC_FAILED;
84 slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
85 return (NULL);
86 }
87
88 /* intialize condition vars */
89 if (!(wait = calloc(1, sizeof (*wait)))) {
90 *err = SLP_MEMORY_ALLOC_FAILED;
91 slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
92 return (NULL);
93 }
94 (void) cond_init(wait, USYNC_THREAD, NULL);
95
96 /* create the queue */
97 if ((q = malloc(sizeof (*q))) == NULL) {
98 *err = SLP_MEMORY_ALLOC_FAILED;
99 slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
100 return (NULL);
101 }
102
103 q->head = NULL;
104 q->lock = lock;
105 q->wait = wait;
106 q->count = 0;
107
108 return (q);
109 }
110
111 /*
112 * Adds msg to the tail of queue q.
113 * Returns an SLP error code: SLP_OK for no error, or SLP_MEMORY_ALLOC_FAILED
114 * if it couldn't allocate memory.
115 */
slp_enqueue(slp_queue_t * qa,void * msg)116 SLPError slp_enqueue(slp_queue_t *qa, void *msg) {
117 slp_queue_entry_t *qe;
118 struct queue *q = qa;
119
120 if ((qe = malloc(sizeof (*qe))) == NULL) {
121 slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory");
122 return (SLP_MEMORY_ALLOC_FAILED);
123 }
124
125 (void) mutex_lock(q->lock);
126 qe->msg = msg;
127 qe->next = NULL;
128 if (q->head != NULL) { /* queue is not emptry */
129 q->tail->next = qe;
130 q->tail = qe;
131 } else { /* queue is empty */
132 q->head = q->tail = qe;
133 }
134 q->count++;
135 (void) mutex_unlock(q->lock);
136 (void) cond_signal(q->wait);
137
138 return (SLP_OK);
139 }
140
141 /*
142 * Inserts a message at the head of the queue. This is useful for inserting
143 * things like cancel messages.
144 */
slp_enqueue_at_head(slp_queue_t * qa,void * msg)145 SLPError slp_enqueue_at_head(slp_queue_t *qa, void *msg) {
146 slp_queue_entry_t *qe;
147 struct queue *q = qa;
148
149 if ((qe = malloc(sizeof (*qe))) == NULL) {
150 slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory");
151 return (SLP_MEMORY_ALLOC_FAILED);
152 }
153
154 (void) mutex_lock(q->lock);
155 qe->msg = msg;
156 qe->next = q->head;
157 q->head = qe;
158
159 q->count++;
160 (void) mutex_unlock(q->lock);
161 (void) cond_signal(q->wait);
162
163 return (SLP_OK);
164 }
165
166 /*
167 * The core functionality for dequeue.
168 */
dequeue_nolock(struct queue * q)169 static void *dequeue_nolock(struct queue *q) {
170 void *msg;
171 slp_queue_entry_t *qe = q->head;
172
173 if (!qe)
174 return (NULL); /* shouldn't get here */
175 msg = qe->msg;
176 if (!qe->next) /* last one in queue */
177 q->head = q->tail = NULL;
178 else
179 q->head = qe->next;
180 free(qe);
181 q->count--;
182 return (msg);
183 }
184
185 /*
186 * Returns the first message waiting or arriving in the queue, or if no
187 * message is available after waiting the amount of time specified in
188 * 'to', returns NULL, and sets 'etimed' to true. If an error occured,
189 * returns NULL and sets 'etimed' to false.
190 */
slp_dequeue_timed(slp_queue_t * qa,timestruc_t * to,SLPBoolean * etimed)191 void *slp_dequeue_timed(slp_queue_t *qa, timestruc_t *to, SLPBoolean *etimed) {
192 int err;
193 void *ans;
194 struct queue *q = qa;
195
196 if (etimed)
197 *etimed = SLP_FALSE;
198
199 (void) mutex_lock(q->lock);
200 if (q->count > 0) {
201 /* something's in the q, so no need to wait */
202 goto msg_available;
203 }
204
205 /* else wait */
206 while (q->count == 0) {
207 if (to) {
208 err = cond_timedwait(q->wait, q->lock, to);
209 } else {
210 err = cond_wait(q->wait, q->lock);
211 }
212 if (err == ETIME) {
213 (void) mutex_unlock(q->lock);
214 *etimed = SLP_TRUE;
215 return (NULL);
216 }
217 }
218
219 msg_available:
220 ans = dequeue_nolock(q);
221 (void) mutex_unlock(q->lock);
222 return (ans);
223 }
224
225 /*
226 * Removes the first message from the queue and returns it.
227 * Returns NULL only on internal error.
228 */
slp_dequeue(slp_queue_t * qa)229 void *slp_dequeue(slp_queue_t *qa) {
230 return (slp_dequeue_timed(qa, NULL, NULL));
231 }
232
233 /*
234 * Flushes the queue, using the caller-specified free function to
235 * free each message in the queue.
236 */
slp_flush_queue(slp_queue_t * qa,void (* free_f)(void *))237 void slp_flush_queue(slp_queue_t *qa, void (*free_f)(void *)) {
238 slp_queue_entry_t *p, *pn;
239 struct queue *q = qa;
240
241 for (p = q->head; p; p = pn) {
242 pn = p->next;
243 free_f(p);
244 }
245 }
246
247 /*
248 * Frees a queue.
249 * The queue must be empty before it can be destroyed; slp_flush_queue
250 * can be used to empty a queue.
251 */
slp_destroy_queue(slp_queue_t * qa)252 void slp_destroy_queue(slp_queue_t *qa) {
253 struct queue *q = qa;
254
255 (void) mutex_destroy(q->lock);
256 (void) cond_destroy(q->wait);
257 free(q->lock);
258 free(q->wait);
259 free(q);
260 }
261