/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License (the "License"). * You may not use this file except in compliance with the License. * * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE * or http://www.opensolaris.org/os/licensing. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at usr/src/OPENSOLARIS.LICENSE. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END */ /* * Copyright 2006 Sun Microsystems, Inc. All rights reserved. * Use is subject to license terms. */ #pragma ident "%Z%%M% %I% %E% SMI" #include "synonyms.h" #include "thr_uberdata.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "sigev_thread.h" /* * There is but one spawner for all aio operations. */ thread_communication_data_t *sigev_aio_tcd = NULL; /* * Set non-zero via _RT_DEBUG to enable debugging printf's. */ static int _rt_debug = 0; void init_sigev_thread(void) { char *ldebug; if ((ldebug = getenv("_RT_DEBUG")) != NULL) _rt_debug = atoi(ldebug); } /* * Routine to print debug messages: * If _rt_debug is set, printf the debug message to stderr * with an appropriate prefix. */ /*PRINTFLIKE1*/ static void dprintf(const char *format, ...) { if (_rt_debug) { va_list alist; va_start(alist, format); flockfile(stderr); (void) fputs("DEBUG: ", stderr); (void) vfprintf(stderr, format, alist); funlockfile(stderr); va_end(alist); } } /* * The notify_thread() function can be used as the start function of a new * thread but it is normally called from notifier(), below, in the context * of a thread pool worker thread. It is used as the start function of a * new thread only when individual pthread attributes differ from those * that are common to all workers. This only occurs in the AIO case. */ static void * notify_thread(void *arg) { sigev_thread_data_t *stdp = arg; void (*function)(union sigval) = stdp->std_func; union sigval argument = stdp->std_arg; lfree(stdp, sizeof (*stdp)); function(argument); return (NULL); } /* * Thread pool interface to call the user-supplied notification function. */ static void notifier(void *arg) { (void) notify_thread(arg); } /* * This routine adds a new work request, described by function * and argument, to the list of outstanding jobs. * It returns 0 indicating success. A value != 0 indicates an error. */ static int sigev_add_work(thread_communication_data_t *tcdp, void (*function)(union sigval), union sigval argument) { tpool_t *tpool = tcdp->tcd_poolp; sigev_thread_data_t *stdp; if (tpool == NULL) return (EINVAL); if ((stdp = lmalloc(sizeof (*stdp))) == NULL) return (errno); stdp->std_func = function; stdp->std_arg = argument; if (tpool_dispatch(tpool, notifier, stdp) != 0) { lfree(stdp, sizeof (*stdp)); return (errno); } return (0); } static void sigev_destroy_pool(thread_communication_data_t *tcdp) { if (tcdp->tcd_poolp != NULL) tpool_abandon(tcdp->tcd_poolp); tcdp->tcd_poolp = NULL; if (tcdp->tcd_subsystem == MQ) { /* * synchronize with del_sigev_mq() */ sig_mutex_lock(&tcdp->tcd_lock); tcdp->tcd_server_id = 0; if (tcdp->tcd_msg_closing) { (void) cond_broadcast(&tcdp->tcd_cv); sig_mutex_unlock(&tcdp->tcd_lock); return; /* del_sigev_mq() will free the tcd */ } sig_mutex_unlock(&tcdp->tcd_lock); } /* * now delete everything */ free_sigev_handler(tcdp); } /* * timer_spawner(), mqueue_spawner(), and aio_spawner() are the main * functions for the daemon threads that get the event(s) for the * respective SIGEV_THREAD subsystems. There is one timer spawner for * each timer_create(), one mqueue spawner for every mq_open(), and * exactly one aio spawner for all aio requests. These spawners add * work requests to be done by a pool of daemon worker threads. In case * the event requires creation of a worker thread with different pthread * attributes than those from the pool of workers, a new daemon thread * with these attributes is spawned apart from the pool of workers. * If the spawner fails to add work or fails to create an additional * thread because of lacking resources, it puts the event back into * the kernel queue and re-tries some time later. */ void * timer_spawner(void *arg) { thread_communication_data_t *tcdp = (thread_communication_data_t *)arg; port_event_t port_event; /* destroy the pool if we are cancelled */ pthread_cleanup_push(sigev_destroy_pool, tcdp); for (;;) { if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) { dprintf("port_get on port %d failed with %d <%s>\n", tcdp->tcd_port, errno, strerror(errno)); break; } switch (port_event.portev_source) { case PORT_SOURCE_TIMER: break; case PORT_SOURCE_ALERT: if (port_event.portev_events != SIGEV_THREAD_TERM) errno = EPROTO; goto out; default: dprintf("port_get on port %d returned %u " "(not PORT_SOURCE_TIMER)\n", tcdp->tcd_port, port_event.portev_source); errno = EPROTO; goto out; } tcdp->tcd_overruns = port_event.portev_events - 1; if (sigev_add_work(tcdp, tcdp->tcd_notif.sigev_notify_function, tcdp->tcd_notif.sigev_value) != 0) break; /* wait until job is done before looking for another */ tpool_wait(tcdp->tcd_poolp); } out: pthread_cleanup_pop(1); return (NULL); } void * mqueue_spawner(void *arg) { thread_communication_data_t *tcdp = (thread_communication_data_t *)arg; int ret = 0; int ntype; void (*function)(union sigval); union sigval argument; /* destroy the pool if we are cancelled */ pthread_cleanup_push(sigev_destroy_pool, tcdp); while (ret == 0) { sig_mutex_lock(&tcdp->tcd_lock); pthread_cleanup_push(sig_mutex_unlock, &tcdp->tcd_lock); while ((ntype = tcdp->tcd_msg_enabled) == 0) (void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock); pthread_cleanup_pop(1); while (sem_wait(tcdp->tcd_msg_avail) == -1) continue; sig_mutex_lock(&tcdp->tcd_lock); tcdp->tcd_msg_enabled = 0; sig_mutex_unlock(&tcdp->tcd_lock); /* ASSERT(ntype == SIGEV_THREAD || ntype == SIGEV_PORT); */ if (ntype == SIGEV_THREAD) { function = tcdp->tcd_notif.sigev_notify_function; argument.sival_ptr = tcdp->tcd_msg_userval; ret = sigev_add_work(tcdp, function, argument); } else { /* ntype == SIGEV_PORT */ ret = _port_dispatch(tcdp->tcd_port, 0, PORT_SOURCE_MQ, 0, (uintptr_t)tcdp->tcd_msg_object, tcdp->tcd_msg_userval); } } sig_mutex_unlock(&tcdp->tcd_lock); pthread_cleanup_pop(1); return (NULL); } void * aio_spawner(void *arg) { thread_communication_data_t *tcdp = (thread_communication_data_t *)arg; int error = 0; void (*function)(union sigval); union sigval argument; port_event_t port_event; struct sigevent *sigevp; timespec_t delta; pthread_attr_t *attrp; /* destroy the pool if we are cancelled */ pthread_cleanup_push(sigev_destroy_pool, tcdp); while (error == 0) { if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) { error = errno; dprintf("port_get on port %d failed with %d <%s>\n", tcdp->tcd_port, error, strerror(error)); break; } switch (port_event.portev_source) { case PORT_SOURCE_AIO: break; case PORT_SOURCE_ALERT: if (port_event.portev_events != SIGEV_THREAD_TERM) errno = EPROTO; goto out; default: dprintf("port_get on port %d returned %u " "(not PORT_SOURCE_AIO)\n", tcdp->tcd_port, port_event.portev_source); errno = EPROTO; goto out; } argument.sival_ptr = port_event.portev_user; switch (port_event.portev_events) { case AIOLIO: #if !defined(_LP64) case AIOLIO64: #endif sigevp = (struct sigevent *)port_event.portev_object; function = sigevp->sigev_notify_function; attrp = sigevp->sigev_notify_attributes; break; case AIOAREAD: case AIOAWRITE: case AIOFSYNC: { aiocb_t *aiocbp = (aiocb_t *)port_event.portev_object; function = aiocbp->aio_sigevent.sigev_notify_function; attrp = aiocbp->aio_sigevent.sigev_notify_attributes; break; } #if !defined(_LP64) case AIOAREAD64: case AIOAWRITE64: case AIOFSYNC64: { aiocb64_t *aiocbp = (aiocb64_t *)port_event.portev_object; function = aiocbp->aio_sigevent.sigev_notify_function; attrp = aiocbp->aio_sigevent.sigev_notify_attributes; break; } #endif default: function = NULL; attrp = NULL; break; } if (function == NULL) error = EINVAL; else if (_pthread_attr_equal(attrp, tcdp->tcd_attrp)) error = sigev_add_work(tcdp, function, argument); else { /* * The attributes don't match. * Spawn a thread with the non-matching attributes. */ pthread_attr_t local_attr; sigev_thread_data_t *stdp; if ((stdp = lmalloc(sizeof (*stdp))) == NULL) error = ENOMEM; else error = _pthread_attr_clone(&local_attr, attrp); if (error == 0) { (void) pthread_attr_setdetachstate( &local_attr, PTHREAD_CREATE_DETACHED); (void) _pthread_attr_setdaemonstate_np( &local_attr, PTHREAD_CREATE_DAEMON_NP); stdp->std_func = function; stdp->std_arg = argument; error = pthread_create(NULL, &local_attr, notify_thread, stdp); (void) pthread_attr_destroy(&local_attr); } if (error && stdp != NULL) lfree(stdp, sizeof (*stdp)); } if (error) { dprintf("Cannot add work, error=%d <%s>.\n", error, strerror(error)); if (error == EAGAIN || error == ENOMEM) { /* (Temporary) no resources are available. */ if (_port_dispatch(tcdp->tcd_port, 0, PORT_SOURCE_AIO, port_event.portev_events, port_event.portev_object, port_event.portev_user) != 0) break; error = 0; delta.tv_sec = 0; delta.tv_nsec = NANOSEC / 20; /* 50 msec */ (void) nanosleep(&delta, NULL); } } } out: pthread_cleanup_pop(1); return (NULL); } /* * Allocate a thread_communication_data_t block. */ static thread_communication_data_t * alloc_sigev_handler(subsystem_t caller) { thread_communication_data_t *tcdp; if ((tcdp = lmalloc(sizeof (*tcdp))) != NULL) { tcdp->tcd_subsystem = caller; tcdp->tcd_port = -1; (void) mutex_init(&tcdp->tcd_lock, USYNC_THREAD, NULL); (void) cond_init(&tcdp->tcd_cv, USYNC_THREAD, NULL); } return (tcdp); } /* * Free a thread_communication_data_t block. */ void free_sigev_handler(thread_communication_data_t *tcdp) { if (tcdp->tcd_attrp) { (void) pthread_attr_destroy(tcdp->tcd_attrp); tcdp->tcd_attrp = NULL; } (void) memset(&tcdp->tcd_notif, 0, sizeof (tcdp->tcd_notif)); switch (tcdp->tcd_subsystem) { case TIMER: case AIO: if (tcdp->tcd_port >= 0) (void) close(tcdp->tcd_port); break; case MQ: tcdp->tcd_msg_avail = NULL; tcdp->tcd_msg_object = NULL; tcdp->tcd_msg_userval = NULL; tcdp->tcd_msg_enabled = 0; break; } lfree(tcdp, sizeof (*tcdp)); } /* * Initialize data structure and create the port. */ thread_communication_data_t * setup_sigev_handler(const struct sigevent *sigevp, subsystem_t caller) { thread_communication_data_t *tcdp; int error; if (sigevp == NULL) { errno = EINVAL; return (NULL); } if ((tcdp = alloc_sigev_handler(caller)) == NULL) { errno = ENOMEM; return (NULL); } if (sigevp->sigev_notify_attributes == NULL) tcdp->tcd_attrp = NULL; /* default attributes */ else { /* * We cannot just copy the sigevp->sigev_notify_attributes * pointer. We need to initialize a new pthread_attr_t * structure with the values from the user-supplied * pthread_attr_t. */ tcdp->tcd_attrp = &tcdp->tcd_user_attr; error = _pthread_attr_clone(tcdp->tcd_attrp, sigevp->sigev_notify_attributes); if (error) { tcdp->tcd_attrp = NULL; free_sigev_handler(tcdp); errno = error; return (NULL); } } tcdp->tcd_notif = *sigevp; tcdp->tcd_notif.sigev_notify_attributes = tcdp->tcd_attrp; if (caller == TIMER || caller == AIO) { if ((tcdp->tcd_port = port_create()) < 0 || fcntl(tcdp->tcd_port, FD_CLOEXEC) == -1) { free_sigev_handler(tcdp); errno = EBADF; return (NULL); } } return (tcdp); } /* * Create a thread pool and launch the spawner. */ int launch_spawner(thread_communication_data_t *tcdp) { int ret; int maxworkers; void *(*spawner)(void *); sigset_t set; sigset_t oset; switch (tcdp->tcd_subsystem) { case TIMER: spawner = timer_spawner; maxworkers = 1; break; case MQ: spawner = mqueue_spawner; maxworkers = 1; break; case AIO: spawner = aio_spawner; maxworkers = 100; break; default: return (-1); } tcdp->tcd_poolp = tpool_create(1, maxworkers, 20, tcdp->tcd_notif.sigev_notify_attributes); if (tcdp->tcd_poolp == NULL) return (-1); /* create the spawner with all signals blocked */ (void) sigfillset(&set); (void) thr_sigsetmask(SIG_SETMASK, &set, &oset); ret = thr_create(NULL, 0, spawner, tcdp, THR_DETACHED | THR_DAEMON, &tcdp->tcd_server_id); (void) thr_sigsetmask(SIG_SETMASK, &oset, NULL); if (ret != 0) { tpool_destroy(tcdp->tcd_poolp); tcdp->tcd_poolp = NULL; return (-1); } return (0); } /* * Delete the data associated with the sigev_thread timer, if timer is * associated with such a notification option. * Destroy the timer_spawner thread. */ int del_sigev_timer(timer_t timer) { int rc = 0; thread_communication_data_t *tcdp; if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL) { sig_mutex_lock(&tcdp->tcd_lock); if (tcdp->tcd_port >= 0) { if ((rc = port_alert(tcdp->tcd_port, PORT_ALERT_SET, SIGEV_THREAD_TERM, NULL)) == 0) { dprintf("del_sigev_timer(%d) OK.\n", timer); } } timer_tcd[timer] = NULL; sig_mutex_unlock(&tcdp->tcd_lock); } return (rc); } int sigev_timer_getoverrun(timer_t timer) { thread_communication_data_t *tcdp; if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL) return (tcdp->tcd_overruns); return (0); } static void del_sigev_mq_cleanup(thread_communication_data_t *tcdp) { sig_mutex_unlock(&tcdp->tcd_lock); free_sigev_handler(tcdp); } /* * Delete the data associated with the sigev_thread message queue, * if the message queue is associated with such a notification option. * Destroy the mqueue_spawner thread. */ void del_sigev_mq(thread_communication_data_t *tcdp) { pthread_t server_id; int rc; sig_mutex_lock(&tcdp->tcd_lock); server_id = tcdp->tcd_server_id; tcdp->tcd_msg_closing = 1; if ((rc = pthread_cancel(server_id)) != 0) { /* "can't happen" */ sig_mutex_unlock(&tcdp->tcd_lock); dprintf("Fail to cancel %u with error %d <%s>.\n", server_id, rc, strerror(rc)); return; } /* * wait for sigev_destroy_pool() to finish */ pthread_cleanup_push(del_sigev_mq_cleanup, tcdp); while (tcdp->tcd_server_id == server_id) (void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock); pthread_cleanup_pop(1); } /* * POSIX aio: * If the notification type is SIGEV_THREAD, set up * the port number for notifications. Create the * thread pool and launch the spawner if necessary. * If the notification type is not SIGEV_THREAD, do nothing. */ int _aio_sigev_thread_init(struct sigevent *sigevp) { static mutex_t sigev_aio_lock = DEFAULTMUTEX; static cond_t sigev_aio_cv = DEFAULTCV; static int sigev_aio_busy = 0; thread_communication_data_t *tcdp; int port; int rc = 0; if (sigevp == NULL || sigevp->sigev_notify != SIGEV_THREAD || sigevp->sigev_notify_function == NULL) return (0); lmutex_lock(&sigev_aio_lock); while (sigev_aio_busy) (void) _cond_wait(&sigev_aio_cv, &sigev_aio_lock); if ((tcdp = sigev_aio_tcd) != NULL) port = tcdp->tcd_port; else { sigev_aio_busy = 1; lmutex_unlock(&sigev_aio_lock); tcdp = setup_sigev_handler(sigevp, AIO); if (tcdp == NULL) { port = -1; rc = -1; } else if (launch_spawner(tcdp) != 0) { free_sigev_handler(tcdp); tcdp = NULL; port = -1; rc = -1; } else { port = tcdp->tcd_port; } lmutex_lock(&sigev_aio_lock); sigev_aio_tcd = tcdp; sigev_aio_busy = 0; (void) cond_broadcast(&sigev_aio_cv); } lmutex_unlock(&sigev_aio_lock); sigevp->sigev_signo = port; return (rc); } int _aio_sigev_thread(aiocb_t *aiocbp) { if (aiocbp == NULL) return (0); return (_aio_sigev_thread_init(&aiocbp->aio_sigevent)); } #if !defined(_LP64) int _aio_sigev_thread64(aiocb64_t *aiocbp) { if (aiocbp == NULL) return (0); return (_aio_sigev_thread_init(&aiocbp->aio_sigevent)); } #endif /* * Cleanup POSIX aio after fork1() in the child process. */ void postfork1_child_sigev_aio(void) { thread_communication_data_t *tcdp; if ((tcdp = sigev_aio_tcd) != NULL) { sigev_aio_tcd = NULL; tcd_teardown(tcdp); } } /* * Utility function for the various postfork1_child_sigev_*() functions. * Clean up the tcdp data structure and close the port. */ void tcd_teardown(thread_communication_data_t *tcdp) { if (tcdp->tcd_poolp != NULL) tpool_abandon(tcdp->tcd_poolp); tcdp->tcd_poolp = NULL; tcdp->tcd_server_id = 0; free_sigev_handler(tcdp); }