/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include "lint.h"
#include "thr_uberdata.h"
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <thread.h>
#include <pthread.h>
#include <synch.h>
#include <port.h>
#include <signal.h>
#include <stdio.h>
#include <errno.h>
#include <stdarg.h>
#include <string.h>
#include <sys/aiocb.h>
#include <time.h>
#include <signal.h>
#include <fcntl.h>
#include "sigev_thread.h"

/*
 * There is but one spawner for all aio operations.
 */
thread_communication_data_t *sigev_aio_tcd = NULL;

/*
 * Set non-zero via _RT_DEBUG to enable debugging printf's.
 */
static int _rt_debug = 0;

void
init_sigev_thread(void)
{
	char *ldebug;

	if ((ldebug = getenv("_RT_DEBUG")) != NULL)
		_rt_debug = atoi(ldebug);
}

/*
 * Routine to print debug messages:
 * If _rt_debug is set, printf the debug message to stderr
 * with an appropriate prefix.
 */
/*PRINTFLIKE1*/
static void
dprintf(const char *format, ...)
{
	if (_rt_debug) {
		va_list alist;

		va_start(alist, format);
		flockfile(stderr);
		pthread_cleanup_push(funlockfile, stderr);
		(void) fputs("DEBUG: ", stderr);
		(void) vfprintf(stderr, format, alist);
		pthread_cleanup_pop(1);		/* funlockfile(stderr) */
		va_end(alist);
	}
}

/*
 * The notify_thread() function can be used as the start function of a new
 * thread but it is normally called from notifier(), below, in the context
 * of a thread pool worker thread.  It is used as the start function of a
 * new thread only when individual pthread attributes differ from those
 * that are common to all workers.  This only occurs in the AIO case.
 */
static void *
notify_thread(void *arg)
{
	sigev_thread_data_t *stdp = arg;
	void (*function)(union sigval) = stdp->std_func;
	union sigval argument = stdp->std_arg;

	lfree(stdp, sizeof (*stdp));
	function(argument);
	return (NULL);
}

/*
 * Thread pool interface to call the user-supplied notification function.
 */
static void
notifier(void *arg)
{
	(void) notify_thread(arg);
}

/*
 * This routine adds a new work request, described by function
 * and argument, to the list of outstanding jobs.
 * It returns 0 indicating success.  A value != 0 indicates an error.
 */
static int
sigev_add_work(thread_communication_data_t *tcdp,
	void (*function)(union sigval), union sigval argument)
{
	tpool_t *tpool = tcdp->tcd_poolp;
	sigev_thread_data_t *stdp;

	if (tpool == NULL)
		return (EINVAL);
	if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
		return (errno);
	stdp->std_func = function;
	stdp->std_arg = argument;
	if (tpool_dispatch(tpool, notifier, stdp) != 0) {
		lfree(stdp, sizeof (*stdp));
		return (errno);
	}
	return (0);
}

static void
sigev_destroy_pool(thread_communication_data_t *tcdp)
{
	if (tcdp->tcd_poolp != NULL)
		tpool_abandon(tcdp->tcd_poolp);
	tcdp->tcd_poolp = NULL;

	if (tcdp->tcd_subsystem == MQ) {
		/*
		 * synchronize with del_sigev_mq()
		 */
		sig_mutex_lock(&tcdp->tcd_lock);
		tcdp->tcd_server_id = 0;
		if (tcdp->tcd_msg_closing) {
			(void) cond_broadcast(&tcdp->tcd_cv);
			sig_mutex_unlock(&tcdp->tcd_lock);
			return;		/* del_sigev_mq() will free the tcd */
		}
		sig_mutex_unlock(&tcdp->tcd_lock);
	}

	/*
	 * now delete everything
	 */
	free_sigev_handler(tcdp);
}

/*
 * timer_spawner(), mqueue_spawner(), and aio_spawner() are the main
 * functions for the daemon threads that get the event(s) for the
 * respective SIGEV_THREAD subsystems.  There is one timer spawner for
 * each timer_create(), one mqueue spawner for every mq_open(), and
 * exactly one aio spawner for all aio requests.  These spawners add
 * work requests to be done by a pool of daemon worker threads.  In case
 * the event requires creation of a worker thread with different pthread
 * attributes than those from the pool of workers, a new daemon thread
 * with these attributes is spawned apart from the pool of workers.
 * If the spawner fails to add work or fails to create an additional
 * thread because of lacking resources, it puts the event back into
 * the kernel queue and re-tries some time later.
 */

void *
timer_spawner(void *arg)
{
	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
	port_event_t port_event;

	/* destroy the pool if we are cancelled */
	pthread_cleanup_push(sigev_destroy_pool, tcdp);

	for (;;) {
		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
			dprintf("port_get on port %d failed with %d <%s>\n",
			    tcdp->tcd_port, errno, strerror(errno));
			break;
		}
		switch (port_event.portev_source) {
		case PORT_SOURCE_TIMER:
			break;
		case PORT_SOURCE_ALERT:
			if (port_event.portev_events != SIGEV_THREAD_TERM)
				errno = EPROTO;
			goto out;
		default:
			dprintf("port_get on port %d returned %u "
			    "(not PORT_SOURCE_TIMER)\n",
			    tcdp->tcd_port, port_event.portev_source);
			errno = EPROTO;
			goto out;
		}

		tcdp->tcd_overruns = port_event.portev_events - 1;
		if (sigev_add_work(tcdp,
		    tcdp->tcd_notif.sigev_notify_function,
		    tcdp->tcd_notif.sigev_value) != 0)
			break;
		/* wait until job is done before looking for another */
		tpool_wait(tcdp->tcd_poolp);
	}
out:
	pthread_cleanup_pop(1);
	return (NULL);
}

void *
mqueue_spawner(void *arg)
{
	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
	int ret = 0;
	int ntype;
	void (*function)(union sigval);
	union sigval argument;

	/* destroy the pool if we are cancelled */
	pthread_cleanup_push(sigev_destroy_pool, tcdp);

	while (ret == 0) {
		sig_mutex_lock(&tcdp->tcd_lock);
		pthread_cleanup_push(sig_mutex_unlock, &tcdp->tcd_lock);
		while ((ntype = tcdp->tcd_msg_enabled) == 0)
			(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
		pthread_cleanup_pop(1);

		while (sem_wait(tcdp->tcd_msg_avail) == -1)
			continue;

		sig_mutex_lock(&tcdp->tcd_lock);
		tcdp->tcd_msg_enabled = 0;
		sig_mutex_unlock(&tcdp->tcd_lock);

		/* ASSERT(ntype == SIGEV_THREAD || ntype == SIGEV_PORT); */
		if (ntype == SIGEV_THREAD) {
			function = tcdp->tcd_notif.sigev_notify_function;
			argument.sival_ptr = tcdp->tcd_msg_userval;
			ret = sigev_add_work(tcdp, function, argument);
		} else {	/* ntype == SIGEV_PORT */
			ret = _port_dispatch(tcdp->tcd_port, 0, PORT_SOURCE_MQ,
			    0, (uintptr_t)tcdp->tcd_msg_object,
			    tcdp->tcd_msg_userval);
		}
	}
	sig_mutex_unlock(&tcdp->tcd_lock);

	pthread_cleanup_pop(1);
	return (NULL);
}

void *
aio_spawner(void *arg)
{
	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
	int error = 0;
	void (*function)(union sigval);
	union sigval argument;
	port_event_t port_event;
	struct sigevent *sigevp;
	timespec_t delta;
	pthread_attr_t *attrp;

	/* destroy the pool if we are cancelled */
	pthread_cleanup_push(sigev_destroy_pool, tcdp);

	while (error == 0) {
		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
			error = errno;
			dprintf("port_get on port %d failed with %d <%s>\n",
			    tcdp->tcd_port, error, strerror(error));
			break;
		}
		switch (port_event.portev_source) {
		case PORT_SOURCE_AIO:
			break;
		case PORT_SOURCE_ALERT:
			if (port_event.portev_events != SIGEV_THREAD_TERM)
				errno = EPROTO;
			goto out;
		default:
			dprintf("port_get on port %d returned %u "
			    "(not PORT_SOURCE_AIO)\n",
			    tcdp->tcd_port, port_event.portev_source);
			errno = EPROTO;
			goto out;
		}
		argument.sival_ptr = port_event.portev_user;
		switch (port_event.portev_events) {
		case AIOLIO:
#if !defined(_LP64)
		case AIOLIO64:
#endif
			sigevp = (struct sigevent *)port_event.portev_object;
			function = sigevp->sigev_notify_function;
			attrp = sigevp->sigev_notify_attributes;
			break;
		case AIOAREAD:
		case AIOAWRITE:
		case AIOFSYNC:
			{
			aiocb_t *aiocbp =
			    (aiocb_t *)port_event.portev_object;
			function = aiocbp->aio_sigevent.sigev_notify_function;
			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
			break;
			}
#if !defined(_LP64)
		case AIOAREAD64:
		case AIOAWRITE64:
		case AIOFSYNC64:
			{
			aiocb64_t *aiocbp =
			    (aiocb64_t *)port_event.portev_object;
			function = aiocbp->aio_sigevent.sigev_notify_function;
			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
			break;
			}
#endif
		default:
			function = NULL;
			attrp = NULL;
			break;
		}

		if (function == NULL)
			error = EINVAL;
		else if (pthread_attr_equal(attrp, tcdp->tcd_attrp))
			error = sigev_add_work(tcdp, function, argument);
		else {
			/*
			 * The attributes don't match.
			 * Spawn a thread with the non-matching attributes.
			 */
			pthread_attr_t local_attr;
			sigev_thread_data_t *stdp;

			if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
				error = ENOMEM;
			else
				error = pthread_attr_clone(&local_attr, attrp);

			if (error == 0) {
				(void) pthread_attr_setdetachstate(
				    &local_attr, PTHREAD_CREATE_DETACHED);
				(void) pthread_attr_setdaemonstate_np(
				    &local_attr, PTHREAD_CREATE_DAEMON_NP);
				stdp->std_func = function;
				stdp->std_arg = argument;
				error = pthread_create(NULL, &local_attr,
				    notify_thread, stdp);
				(void) pthread_attr_destroy(&local_attr);
			}
			if (error && stdp != NULL)
				lfree(stdp, sizeof (*stdp));
		}

		if (error) {
			dprintf("Cannot add work, error=%d <%s>.\n",
			    error, strerror(error));
			if (error == EAGAIN || error == ENOMEM) {
				/* (Temporary) no resources are available. */
				if (_port_dispatch(tcdp->tcd_port, 0,
				    PORT_SOURCE_AIO, port_event.portev_events,
				    port_event.portev_object,
				    port_event.portev_user) != 0)
					break;
				error = 0;
				delta.tv_sec = 0;
				delta.tv_nsec = NANOSEC / 20;	/* 50 msec */
				(void) nanosleep(&delta, NULL);
			}
		}
	}
out:
	pthread_cleanup_pop(1);
	return (NULL);
}

/*
 * Allocate a thread_communication_data_t block.
 */
static thread_communication_data_t *
alloc_sigev_handler(subsystem_t caller)
{
	thread_communication_data_t *tcdp;

	if ((tcdp = lmalloc(sizeof (*tcdp))) != NULL) {
		tcdp->tcd_subsystem = caller;
		tcdp->tcd_port = -1;
		(void) mutex_init(&tcdp->tcd_lock, USYNC_THREAD, NULL);
		(void) cond_init(&tcdp->tcd_cv, USYNC_THREAD, NULL);
	}
	return (tcdp);
}

/*
 * Free a thread_communication_data_t block.
 */
void
free_sigev_handler(thread_communication_data_t *tcdp)
{
	if (tcdp->tcd_attrp) {
		(void) pthread_attr_destroy(tcdp->tcd_attrp);
		tcdp->tcd_attrp = NULL;
	}
	(void) memset(&tcdp->tcd_notif, 0, sizeof (tcdp->tcd_notif));

	switch (tcdp->tcd_subsystem) {
	case TIMER:
	case AIO:
		if (tcdp->tcd_port >= 0)
			(void) close(tcdp->tcd_port);
		break;
	case MQ:
		tcdp->tcd_msg_avail = NULL;
		tcdp->tcd_msg_object = NULL;
		tcdp->tcd_msg_userval = NULL;
		tcdp->tcd_msg_enabled = 0;
		break;
	}

	lfree(tcdp, sizeof (*tcdp));
}

/*
 * Initialize data structure and create the port.
 */
thread_communication_data_t *
setup_sigev_handler(const struct sigevent *sigevp, subsystem_t caller)
{
	thread_communication_data_t *tcdp;
	int error;

	if (sigevp == NULL) {
		errno = EINVAL;
		return (NULL);
	}

	if ((tcdp = alloc_sigev_handler(caller)) == NULL) {
		errno = ENOMEM;
		return (NULL);
	}

	if (sigevp->sigev_notify_attributes == NULL)
		tcdp->tcd_attrp = NULL;		/* default attributes */
	else {
		/*
		 * We cannot just copy the sigevp->sigev_notify_attributes
		 * pointer.  We need to initialize a new pthread_attr_t
		 * structure with the values from the user-supplied
		 * pthread_attr_t.
		 */
		tcdp->tcd_attrp = &tcdp->tcd_user_attr;
		error = pthread_attr_clone(tcdp->tcd_attrp,
		    sigevp->sigev_notify_attributes);
		if (error) {
			tcdp->tcd_attrp = NULL;
			free_sigev_handler(tcdp);
			errno = error;
			return (NULL);
		}
	}
	tcdp->tcd_notif = *sigevp;
	tcdp->tcd_notif.sigev_notify_attributes = tcdp->tcd_attrp;

	if (caller == TIMER || caller == AIO) {
		if ((tcdp->tcd_port = port_create()) < 0 ||
		    fcntl(tcdp->tcd_port, FD_CLOEXEC) == -1) {
			free_sigev_handler(tcdp);
			errno = EBADF;
			return (NULL);
		}
	}
	return (tcdp);
}

/*
 * Create a thread pool and launch the spawner.
 */
int
launch_spawner(thread_communication_data_t *tcdp)
{
	int ret;
	int maxworkers;
	void *(*spawner)(void *);
	sigset_t set;
	sigset_t oset;

	switch (tcdp->tcd_subsystem) {
	case TIMER:
		spawner = timer_spawner;
		maxworkers = 1;
		break;
	case MQ:
		spawner = mqueue_spawner;
		maxworkers = 1;
		break;
	case AIO:
		spawner = aio_spawner;
		maxworkers = 100;
		break;
	default:
		return (-1);
	}
	tcdp->tcd_poolp = tpool_create(1, maxworkers, 20,
	    tcdp->tcd_notif.sigev_notify_attributes);
	if (tcdp->tcd_poolp == NULL)
		return (-1);
	/* create the spawner with all signals blocked */
	(void) sigfillset(&set);
	(void) thr_sigsetmask(SIG_SETMASK, &set, &oset);
	ret = thr_create(NULL, 0, spawner, tcdp,
	    THR_DETACHED | THR_DAEMON, &tcdp->tcd_server_id);
	(void) thr_sigsetmask(SIG_SETMASK, &oset, NULL);
	if (ret != 0) {
		tpool_destroy(tcdp->tcd_poolp);
		tcdp->tcd_poolp = NULL;
		return (-1);
	}
	return (0);
}

/*
 * Delete the data associated with the sigev_thread timer, if timer is
 * associated with such a notification option.
 * Destroy the timer_spawner thread.
 */
int
del_sigev_timer(timer_t timer)
{
	int rc = 0;
	thread_communication_data_t *tcdp;

	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL) {
		sig_mutex_lock(&tcdp->tcd_lock);
		if (tcdp->tcd_port >= 0) {
			if ((rc = port_alert(tcdp->tcd_port,
			    PORT_ALERT_SET, SIGEV_THREAD_TERM, NULL)) == 0) {
				dprintf("del_sigev_timer(%d) OK.\n", timer);
			}
		}
		timer_tcd[timer] = NULL;
		sig_mutex_unlock(&tcdp->tcd_lock);
	}
	return (rc);
}

int
sigev_timer_getoverrun(timer_t timer)
{
	thread_communication_data_t *tcdp;

	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL)
		return (tcdp->tcd_overruns);
	return (0);
}

static void
del_sigev_mq_cleanup(thread_communication_data_t *tcdp)
{
	sig_mutex_unlock(&tcdp->tcd_lock);
	free_sigev_handler(tcdp);
}

/*
 * Delete the data associated with the sigev_thread message queue,
 * if the message queue is associated with such a notification option.
 * Destroy the mqueue_spawner thread.
 */
void
del_sigev_mq(thread_communication_data_t *tcdp)
{
	pthread_t server_id;
	int rc;

	sig_mutex_lock(&tcdp->tcd_lock);

	server_id = tcdp->tcd_server_id;
	tcdp->tcd_msg_closing = 1;
	if ((rc = pthread_cancel(server_id)) != 0) {	/* "can't happen" */
		sig_mutex_unlock(&tcdp->tcd_lock);
		dprintf("Fail to cancel %u with error %d <%s>.\n",
		    server_id, rc, strerror(rc));
		return;
	}

	/*
	 * wait for sigev_destroy_pool() to finish
	 */
	pthread_cleanup_push(del_sigev_mq_cleanup, tcdp);
	while (tcdp->tcd_server_id == server_id)
		(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
	pthread_cleanup_pop(1);
}

/*
 * POSIX aio:
 * If the notification type is SIGEV_THREAD, set up
 * the port number for notifications.  Create the
 * thread pool and launch the spawner if necessary.
 * If the notification type is not SIGEV_THREAD, do nothing.
 */
int
_aio_sigev_thread_init(struct sigevent *sigevp)
{
	static mutex_t sigev_aio_lock = DEFAULTMUTEX;
	static cond_t sigev_aio_cv = DEFAULTCV;
	static int sigev_aio_busy = 0;

	thread_communication_data_t *tcdp;
	int port;
	int cancel_state;
	int rc = 0;

	if (sigevp == NULL ||
	    sigevp->sigev_notify != SIGEV_THREAD ||
	    sigevp->sigev_notify_function == NULL)
		return (0);

	lmutex_lock(&sigev_aio_lock);
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
	while (sigev_aio_busy)
		(void) cond_wait(&sigev_aio_cv, &sigev_aio_lock);
	(void) pthread_setcancelstate(cancel_state, NULL);
	if ((tcdp = sigev_aio_tcd) != NULL)
		port = tcdp->tcd_port;
	else {
		sigev_aio_busy = 1;
		lmutex_unlock(&sigev_aio_lock);

		tcdp = setup_sigev_handler(sigevp, AIO);
		if (tcdp == NULL) {
			port = -1;
			rc = -1;
		} else if (launch_spawner(tcdp) != 0) {
			free_sigev_handler(tcdp);
			tcdp = NULL;
			port = -1;
			rc = -1;
		} else {
			port = tcdp->tcd_port;
		}

		lmutex_lock(&sigev_aio_lock);
		sigev_aio_tcd = tcdp;
		sigev_aio_busy = 0;
		(void) cond_broadcast(&sigev_aio_cv);
	}
	lmutex_unlock(&sigev_aio_lock);
	sigevp->sigev_signo = port;
	return (rc);
}

int
_aio_sigev_thread(aiocb_t *aiocbp)
{
	if (aiocbp == NULL)
		return (0);
	return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
}

#if !defined(_LP64)
int
_aio_sigev_thread64(aiocb64_t *aiocbp)
{
	if (aiocbp == NULL)
		return (0);
	return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
}
#endif

/*
 * Cleanup POSIX aio after fork1() in the child process.
 */
void
postfork1_child_sigev_aio(void)
{
	thread_communication_data_t *tcdp;

	if ((tcdp = sigev_aio_tcd) != NULL) {
		sigev_aio_tcd = NULL;
		tcd_teardown(tcdp);
	}
}

/*
 * Utility function for the various postfork1_child_sigev_*() functions.
 * Clean up the tcdp data structure and close the port.
 */
void
tcd_teardown(thread_communication_data_t *tcdp)
{
	if (tcdp->tcd_poolp != NULL)
		tpool_abandon(tcdp->tcd_poolp);
	tcdp->tcd_poolp = NULL;
	tcdp->tcd_server_id = 0;
	free_sigev_handler(tcdp);
}