xref: /illumos-gate/usr/src/lib/libslp/clib/slp_queue.c (revision 66582b606a8194f7f3ba5b3a3a6dca5b0d346361)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * A synchronized FIFO queue for inter-thread producer-consumer semantics.
29  * This queue will handle multiple writers and readers simultaneously.
30  *
31  * The following operations are provided:
32  * slp_new_queue:	create a new queue
33  * slp_enqueue:		place a message at the end of the queue
34  * slp_enqueue_at_head:	place a message at the start of the queue
35  * slp_dequeue:		remove and return the next message on the queue
36  *				(waits indefinitely)
37  * slp_dequeue_timed:	remove and return the next message on the queue
38  *				(waits only for a specified time)
39  * slp_flush_queue:	flushes and frees all messages on a queue
40  * slp_destroy_queue:	frees an empty queue.
41  */
42 
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <thread.h>
#include <synch.h>
#include <syslog.h>
#include <slp.h>
#include <slp-internal.h>
50 
51 /* Private implementation details */
/*
 * One node of the singly linked list that backs a queue: the caller's
 * opaque message pointer plus the link to the entry queued after it.
 */
typedef struct queue_entry {
	void *msg;			/* caller-supplied message */
	struct queue_entry *next;	/* following entry, NULL if none */
} slp_queue_entry_t;
57 
58 struct queue {
59 	slp_queue_entry_t *head;
60 	slp_queue_entry_t *tail;
61 	mutex_t *lock;
62 	cond_t *wait;
63 	int count;
64 };
65 
66 /*
67  * Creates, initializes, and returns a new queue.
68  * If an initialization error occured, returns NULL and sets err to
69  * the appropriate SLP error code.
70  * queues can operate in one of two modes: timed-wait, and infinite
71  * wait. The timeout parameter specifies which of these modes should
72  * be enabled for the new queue.
73  */
74 slp_queue_t *slp_new_queue(SLPError *err) {
75 	mutex_t *lock;
76 	cond_t *wait;
77 	struct queue *q;
78 
79 	*err = SLP_OK;
80 
81 	/* initialize new mutex and semaphore */
82 	if ((lock = calloc(1, sizeof (*lock))) == NULL) {
83 		*err = SLP_MEMORY_ALLOC_FAILED;
84 		slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
85 		return (NULL);
86 	}
87 
88 	/* intialize condition vars */
89 	if (!(wait = calloc(1, sizeof (*wait)))) {
90 		*err = SLP_MEMORY_ALLOC_FAILED;
91 		slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
92 		return (NULL);
93 	}
94 	(void) cond_init(wait, USYNC_THREAD, NULL);
95 
96 	/* create the queue */
97 	if ((q = malloc(sizeof (*q))) == NULL) {
98 		*err = SLP_MEMORY_ALLOC_FAILED;
99 		slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
100 		return (NULL);
101 	}
102 
103 	q->head = NULL;
104 	q->lock = lock;
105 	q->wait = wait;
106 	q->count = 0;
107 
108 	return (q);
109 }
110 
111 /*
112  * Adds msg to the tail of queue q.
113  * Returns an SLP error code: SLP_OK for no error, or SLP_MEMORY_ALLOC_FAILED
114  * if it couldn't allocate memory.
115  */
116 SLPError slp_enqueue(slp_queue_t *qa, void *msg) {
117 	slp_queue_entry_t *qe;
118 	struct queue *q = qa;
119 
120 	if ((qe = malloc(sizeof (*qe))) == NULL) {
121 		slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory");
122 		return (SLP_MEMORY_ALLOC_FAILED);
123 	}
124 
125 	(void) mutex_lock(q->lock);
126 	qe->msg = msg;
127 	qe->next = NULL;
128 	if (q->head != NULL) {	/* queue is not emptry */
129 		q->tail->next = qe;
130 		q->tail = qe;
131 	} else {		/* queue is empty */
132 		q->head = q->tail = qe;
133 	}
134 	q->count++;
135 	(void) mutex_unlock(q->lock);
136 	(void) cond_signal(q->wait);
137 
138 	return (SLP_OK);
139 }
140 
141 /*
142  * Inserts a message at the head of the queue. This is useful for inserting
143  * things like cancel messages.
144  */
145 SLPError slp_enqueue_at_head(slp_queue_t *qa, void *msg) {
146 	slp_queue_entry_t *qe;
147 	struct queue *q = qa;
148 
149 	if ((qe = malloc(sizeof (*qe))) == NULL) {
150 		slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory");
151 		return (SLP_MEMORY_ALLOC_FAILED);
152 	}
153 
154 	(void) mutex_lock(q->lock);
155 	qe->msg = msg;
156 	qe->next = q->head;
157 	q->head = qe;
158 
159 	q->count++;
160 	(void) mutex_unlock(q->lock);
161 	(void) cond_signal(q->wait);
162 
163 	return (SLP_OK);
164 }
165 
166 /*
167  * The core functionality for dequeue.
168  */
169 static void *dequeue_nolock(struct queue *q) {
170 	void *msg;
171 	slp_queue_entry_t *qe = q->head;
172 
173 	if (!qe)
174 		return (NULL);	/* shouldn't get here */
175 	msg = qe->msg;
176 	if (!qe->next)		/* last one in queue */
177 		q->head = q->tail = NULL;
178 	else
179 		q->head = qe->next;
180 	free(qe);
181 	q->count--;
182 	return (msg);
183 }
184 
185 /*
186  * Returns the first message waiting or arriving in the queue, or if no
187  * message is available after waiting the amount of time specified in
188  * 'to', returns NULL, and sets 'etimed' to true. If an error occured,
189  * returns NULL and sets 'etimed' to false.
190  */
191 void *slp_dequeue_timed(slp_queue_t *qa, timestruc_t *to, SLPBoolean *etimed) {
192 	int err;
193 	void *ans;
194 	struct queue *q = qa;
195 
196 	if (etimed)
197 		*etimed = SLP_FALSE;
198 
199 	(void) mutex_lock(q->lock);
200 	if (q->count > 0) {
201 		/* something's in the q, so no need to wait */
202 		goto msg_available;
203 	}
204 
205 	/* else wait */
206 	while (q->count == 0) {
207 		if (to) {
208 			err = cond_timedwait(q->wait, q->lock, to);
209 		} else {
210 			err = cond_wait(q->wait, q->lock);
211 		}
212 		if (err == ETIME) {
213 			(void) mutex_unlock(q->lock);
214 			*etimed = SLP_TRUE;
215 			return (NULL);
216 		}
217 	}
218 
219 msg_available:
220 	ans = dequeue_nolock(q);
221 	(void) mutex_unlock(q->lock);
222 	return (ans);
223 }
224 
225 /*
226  * Removes the first message from the queue and returns it.
227  * Returns NULL only on internal error.
228  */
229 void *slp_dequeue(slp_queue_t *qa) {
230 	return (slp_dequeue_timed(qa, NULL, NULL));
231 }
232 
233 /*
234  * Flushes the queue, using the caller-specified free function to
235  * free each message in the queue.
236  */
237 void slp_flush_queue(slp_queue_t *qa, void (*free_f)(void *)) {
238 	slp_queue_entry_t *p, *pn;
239 	struct queue *q = qa;
240 
241 	for (p = q->head; p; p = pn) {
242 		pn = p->next;
243 		free_f(p);
244 	}
245 }
246 
247 /*
248  * Frees a queue.
249  * The queue must be empty before it can be destroyed; slp_flush_queue
250  * can be used to empty a queue.
251  */
252 void slp_destroy_queue(slp_queue_t *qa) {
253 	struct queue *q = qa;
254 
255 	(void) mutex_destroy(q->lock);
256 	(void) cond_destroy(q->wait);
257 	free(q->lock);
258 	free(q->wait);
259 	free(q);
260 }
261