/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident "%Z%%M% %I% %E% SMI"

/*
 * A synchronized FIFO queue for inter-thread producer-consumer semantics.
 * This queue will handle multiple writers and readers simultaneously.
 *
 * The following operations are provided:
 *	slp_new_queue:		create a new queue
 *	slp_enqueue:		place a message at the end of the queue
 *	slp_enqueue_at_head:	place a message at the start of the queue
 *	slp_dequeue:		remove and return the next message on the queue
 *				(waits indefinitely)
 *	slp_dequeue_timed:	remove and return the next message on the queue
 *				(waits only for a specified time)
 *	slp_flush_queue:	flushes and frees all messages on a queue
 *	slp_destroy_queue:	frees an empty queue.
 */

/*
 * NOTE(review): the header names of the #include directives below are
 * missing in this copy of the file. The code needs the declarations of
 * malloc/calloc/free, mutex_t/cond_t and their operations, LOG_CRIT, ETIME
 * and the SLP types (SLPError, SLPBoolean, slp_queue_t, slp_err) -- restore
 * the header names before building.
 */
#include
#include
#include
#include
#include
#include
#include

/* Private implementation details */

/* One node of the singly linked list that backs a queue. */
struct queue_entry {
	void *msg;			/* opaque message handed in by the producer */
	struct queue_entry *next;	/* following entry; NULL for the last one */
};
typedef struct queue_entry slp_queue_entry_t;

/* The queue itself; callers only ever see it as an slp_queue_t pointer. */
struct queue {
	slp_queue_entry_t *head;	/* next entry to dequeue; NULL when empty */
	slp_queue_entry_t *tail;	/* last entry; slp_enqueue appends here */
	mutex_t *lock;			/* held while the list or count is changed */
	cond_t *wait;			/* signalled after every enqueue */
	int count;			/* number of entries currently queued */
};

/*
 * Creates, initializes, and returns a new queue.
* If an initialization error occured, returns NULL and sets err to * the appropriate SLP error code. * queues can operate in one of two modes: timed-wait, and infinite * wait. The timeout parameter specifies which of these modes should * be enabled for the new queue. */ slp_queue_t *slp_new_queue(SLPError *err) { mutex_t *lock; cond_t *wait; struct queue *q; *err = SLP_OK; /* initialize new mutex and semaphore */ if ((lock = calloc(1, sizeof (*lock))) == NULL) { *err = SLP_MEMORY_ALLOC_FAILED; slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory"); return (NULL); } /* intialize condition vars */ if (!(wait = calloc(1, sizeof (*wait)))) { *err = SLP_MEMORY_ALLOC_FAILED; slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory"); return (NULL); } (void) cond_init(wait, NULL, NULL); /* create the queue */ if ((q = malloc(sizeof (*q))) == NULL) { *err = SLP_MEMORY_ALLOC_FAILED; slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory"); return (NULL); } q->head = NULL; q->lock = lock; q->wait = wait; q->count = 0; return (q); } /* * Adds msg to the tail of queue q. * Returns an SLP error code: SLP_OK for no error, or SLP_MEMORY_ALLOC_FAILED * if it couldn't allocate memory. */ SLPError slp_enqueue(slp_queue_t *qa, void *msg) { slp_queue_entry_t *qe; struct queue *q = qa; if ((qe = malloc(sizeof (*qe))) == NULL) { slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory"); return (SLP_MEMORY_ALLOC_FAILED); } (void) mutex_lock(q->lock); qe->msg = msg; qe->next = NULL; if (q->head != NULL) { /* queue is not emptry */ q->tail->next = qe; q->tail = qe; } else { /* queue is empty */ q->head = q->tail = qe; } q->count++; (void) mutex_unlock(q->lock); (void) cond_signal(q->wait); return (SLP_OK); } /* * Inserts a message at the head of the queue. This is useful for inserting * things like cancel messages. 
*/ SLPError slp_enqueue_at_head(slp_queue_t *qa, void *msg) { slp_queue_entry_t *qe; struct queue *q = qa; if ((qe = malloc(sizeof (*qe))) == NULL) { slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory"); return (SLP_MEMORY_ALLOC_FAILED); } (void) mutex_lock(q->lock); qe->msg = msg; qe->next = q->head; q->head = qe; q->count++; (void) mutex_unlock(q->lock); (void) cond_signal(q->wait); return (SLP_OK); } /* * The core functionality for dequeue. */ static void *dequeue_nolock(struct queue *q) { void *msg; slp_queue_entry_t *qe = q->head; if (!qe) return (NULL); /* shouldn't get here */ msg = qe->msg; if (!qe->next) /* last one in queue */ q->head = q->tail = NULL; else q->head = qe->next; free(qe); q->count--; return (msg); } /* * Returns the first message waiting or arriving in the queue, or if no * message is available after waiting the amount of time specified in * 'to', returns NULL, and sets 'etimed' to true. If an error occured, * returns NULL and sets 'etimed' to false. */ void *slp_dequeue_timed(slp_queue_t *qa, timestruc_t *to, SLPBoolean *etimed) { int err; void *ans; struct queue *q = qa; if (etimed) *etimed = SLP_FALSE; (void) mutex_lock(q->lock); if (q->count > 0) { /* something's in the q, so no need to wait */ goto msg_available; } /* else wait */ while (q->count == 0) { if (to) { err = cond_timedwait(q->wait, q->lock, to); } else { err = cond_wait(q->wait, q->lock); } if (err == ETIME) { (void) mutex_unlock(q->lock); *etimed = SLP_TRUE; return (NULL); } } msg_available: ans = dequeue_nolock(q); (void) mutex_unlock(q->lock); return (ans); } /* * Removes the first message from the queue and returns it. * Returns NULL only on internal error. */ void *slp_dequeue(slp_queue_t *qa) { return (slp_dequeue_timed(qa, NULL, NULL)); } /* * Flushes the queue, using the caller-specified free function to * free each message in the queue. 
*/ void slp_flush_queue(slp_queue_t *qa, void (*free_f)(void *)) { slp_queue_entry_t *p, *pn; struct queue *q = qa; for (p = q->head; p; p = pn) { pn = p->next; free_f(p); } } /* * Frees a queue. * The queue must be empty before it can be destroyed; slp_flush_queue * can be used to empty a queue. */ void slp_destroy_queue(slp_queue_t *qa) { struct queue *q = qa; (void) mutex_destroy(q->lock); (void) cond_destroy(q->wait); free(q->lock); free(q->wait); free(q); }