/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#pragma weak mq_open = _mq_open
#pragma weak mq_close = _mq_close
#pragma weak mq_unlink = _mq_unlink
#pragma weak mq_send = _mq_send
#pragma weak mq_timedsend = _mq_timedsend
#pragma weak mq_reltimedsend_np = _mq_reltimedsend_np
#pragma weak mq_receive = _mq_receive
#pragma weak mq_timedreceive = _mq_timedreceive
#pragma weak mq_reltimedreceive_np = _mq_reltimedreceive_np
#pragma weak mq_notify = _mq_notify
#pragma weak mq_setattr = _mq_setattr
#pragma weak mq_getattr = _mq_getattr

#include "synonyms.h"
#include "mtlib.h"
#define	_KMEMUSER
#include <sys/param.h>		/* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
#undef	_KMEMUSER
#include <mqueue.h>
#include <sys/types.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <errno.h>
#include <stdarg.h>
#include <limits.h>
#include <pthread.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <inttypes.h>
#include "sigev_thread.h"
#include "pos4obj.h"

/*
 * Default values per message queue
 */
#define	MQ_MAXMSG	128
#define	MQ_MAXSIZE	1024

#define	MQ_MAGIC	0x4d534751		/* "MSGQ" */

/*
 * Message header that precedes each message in the linked list
 */
typedef struct {
	uint64_t	msg_next;	/* offset of next message in the list */
	uint64_t	msg_len;	/* length of the message */
} msghdr_t;

/*
 * message queue description
 */
struct mq_dn {
	size_t		mqdn_flags;	/* open description flags */
};

/*
 * message queue descriptor structure
 */
typedef struct mq_des {
	struct mq_des	*mqd_next;	/* list of all open mq descriptors, */
	struct mq_des	*mqd_prev;	/* needed for fork-safety */
	int		mqd_magic;	/* magic # to identify mq_des */
	int		mqd_flags;	/* operation flag per open */
	struct mq_header *mqd_mq;	/* address pointer of message Q */
	struct mq_dn	*mqd_mqdn;	/* open description */
	thread_communication_data_t *mqd_tcd;	/* SIGEV_THREAD notification */
} mqdes_t;

/*
 * message queue common header, part of the mmap()ed file.
 * Since message queues may be shared between 32- and 64-bit processes,
 * care must be taken to make sure that the elements of this structure
 * are identical for both _LP64 and _ILP32 cases.
 */
typedef struct mq_header {
	/* first field must be mq_totsize, DO NOT insert before this	*/
	int64_t		mq_totsize;	/* total size of the Queue */
	int64_t		mq_maxsz;	/* max size of each message */
	uint32_t	mq_maxmsg;	/* max messages in the queue */
	uint32_t	mq_maxprio;	/* maximum mqueue priority */
	uint32_t	mq_curmaxprio;	/* current maximum MQ priority */
	uint32_t	mq_mask;	/* priority bitmask */
	uint64_t	mq_freep;	/* head of the free message list */
	uint64_t	mq_headpp;	/* pointer to head pointers */
	uint64_t	mq_tailpp;	/* pointer to tail pointers */
	signotify_id_t	mq_sigid;	/* notification id (3 int's) */
	uint32_t	mq_ntype;	/* notification type (SIGEV_*) */
	uint64_t	mq_des;		/* pointer to msg Q descriptor */
	mutex_t		mq_exclusive;	/* acquire for exclusive access */
	sem_t		mq_rblocked;	/* number of processes rblocked */
	sem_t		mq_notfull;	/* mq_send() callers block on this */
	sem_t		mq_notempty;	/* mq_receive() callers block on this */
	sem_t		mq_spawner;	/* spawner thread blocks on this */
} mqhdr_t;

/*
 * The code assumes that _MQ_OPEN_MAX == -1, that is, that there is no
 * fixed implementation limit.  If this assumption is somehow invalidated,
 * mq_open() needs to be changed back to the old version, which kept a
 * count and enforced a limit.
 * We make sure that this is pointed out to those changing <sys/param.h>
 * by checking _MQ_OPEN_MAX at compile time.
 */
#if _MQ_OPEN_MAX != -1
#error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
#endif

#define	MQ_ALIGNSIZE	8	/* 64-bit alignment */

#ifdef DEBUG
#define	MQ_ASSERT(x)	assert(x);

#define	MQ_ASSERT_PTR(_m, _p) \
	assert((_p) != NULL && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \
	    !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \
	    _m->mq_totsize));

#define	MQ_ASSERT_SEMVAL_LEQ(sem, val) { \
	int _val; \
	(void) sem_getvalue((sem), &_val); \
	assert((_val) <= val); }
#else
#define	MQ_ASSERT(x)
#define	MQ_ASSERT_PTR(_m, _p)
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
#endif

#define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)m + (uintptr_t)n))
#define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
			(uintptr_t)m->mq_headpp + n * sizeof (uint64_t)))
#define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
			(uintptr_t)m->mq_tailpp + n * sizeof (uint64_t)))

#define	MQ_RESERVED	((mqdes_t *)-1)

#define	ABS_TIME	0
#define	REL_TIME	1

static mutex_t mq_list_lock = DEFAULTMUTEX;
static mqdes_t *mq_list = NULL;

extern int __signotify(int cmd, siginfo_t *siginfo, signotify_id_t *sn_id);

static int
mq_is_valid(mqdes_t *mqdp)
{
	/*
	 * Any use of a message queue after it was closed is
	 * undefined.  But the standard strongly favours EBADF
	 * returns.  Before dereferencing the pointer, which could
	 * be fatal, we first do some sanity checks on it.
	 */
	if (mqdp != NULL && mqdp != MQ_RESERVED &&
	    ((uintptr_t)mqdp & 0x7) == 0) {
		return (mqdp->mqd_magic == MQ_MAGIC);
	}

	return (0);
}

static void
mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
{
	int		i;
	uint64_t	temp;
	uint64_t	currentp;
	uint64_t	nextp;

	/*
	 * We only need to initialize the non-zero fields.  The use of
	 * ftruncate() on the message queue file assures that the
	 * pages will be zero-filled on demand (zfod).
	 */
	(void) mutex_init(&mqhp->mq_exclusive, USYNC_PROCESS, NULL);
	(void) sem_init(&mqhp->mq_rblocked, 1, 0);
	(void) sem_init(&mqhp->mq_notempty, 1, 0);
	(void) sem_init(&mqhp->mq_spawner, 1, 0);
	(void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);

	mqhp->mq_maxsz = msgsize;
	mqhp->mq_maxmsg = maxmsg;

	/*
	 * As of this writing (1997), there are 32 message queue priorities.
	 * If this is to change, then the size of the mq_mask will
	 * also have to change.  If DEBUG is defined, assert that
	 * _MQ_PRIO_MAX hasn't changed.
	 */
	mqhp->mq_maxprio = _MQ_PRIO_MAX;
#if defined(DEBUG)
	/* LINTED always true */
	MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
#endif

	/*
	 * Since the message queue can be mapped into different
	 * virtual address ranges by different processes, we don't
	 * keep track of pointers, only offsets into the shared region.
	 */
	mqhp->mq_headpp = sizeof (mqhdr_t);
	mqhp->mq_tailpp = mqhp->mq_headpp +
	    mqhp->mq_maxprio * sizeof (uint64_t);
	mqhp->mq_freep = mqhp->mq_tailpp +
	    mqhp->mq_maxprio * sizeof (uint64_t);

	currentp = mqhp->mq_freep;
	MQ_PTR(mqhp, currentp)->msg_next = 0;

	temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
	for (i = 1; i < mqhp->mq_maxmsg; i++) {
		nextp = currentp + sizeof (msghdr_t) + temp;
		MQ_PTR(mqhp, currentp)->msg_next = nextp;
		MQ_PTR(mqhp, nextp)->msg_next = 0;
		currentp = nextp;
	}
}
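
/*
 * Layout sketch (illustrative), using the default attributes above
 * (MQ_MAXSIZE = 1024, MQ_MAXMSG = 128, _MQ_PRIO_MAX = 32 priorities):
 *
 *	offset 0			mqhdr_t (this header)
 *	mq_headpp = sizeof (mqhdr_t)	32 uint64_t head offsets
 *	mq_tailpp = mq_headpp + 256	32 uint64_t tail offsets
 *	mq_freep  = mq_tailpp + 256	128 free-list slots, each
 *					sizeof (msghdr_t) + 1024 bytes
 *
 * 1024 is already a multiple of MQ_ALIGNSIZE, so no rounding occurs
 * here; a message size such as 1001 would be rounded up to 1008.
 */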

static size_t
mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
{
	uint64_t currentp;
	msghdr_t *curbuf;
	uint64_t *headpp;
	uint64_t *tailpp;

	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));

	/*
	 * Get the head and tail pointers for the queue of maximum
	 * priority.  We shouldn't be here unless there is a message for
	 * us, so it's fair to assert that both the head and tail
	 * pointers are non-NULL.
	 */
	headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
	tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);

	if (msg_prio != NULL)
		*msg_prio = mqhp->mq_curmaxprio;

	currentp = *headpp;
	MQ_ASSERT_PTR(mqhp, currentp);
	curbuf = MQ_PTR(mqhp, currentp);

	if ((*headpp = curbuf->msg_next) == NULL) {
		/*
		 * We just nuked the last message in this priority's queue.
		 * Twiddle this priority's bit, and then find the next bit
		 * tipped.
		 */
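		/*
		 * For example (illustrative): with messages pending at
		 * priorities 5 and 2, mq_mask is 0x24.  Removing the last
		 * priority-5 message clears bit 5, and the scan below
		 * walks downward until it finds bit 2 still set, so
		 * mq_curmaxprio becomes 2.
		 */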
		uint_t prio = mqhp->mq_curmaxprio;

		mqhp->mq_mask &= ~(1u << prio);

		for (; prio != 0; prio--)
			if (mqhp->mq_mask & (1u << prio))
				break;
		mqhp->mq_curmaxprio = prio;

		*tailpp = NULL;
	}

	/*
	 * Copy the message, and put the buffer back on the free list.
	 */
	(void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
	curbuf->msg_next = mqhp->mq_freep;
	mqhp->mq_freep = currentp;

	return (curbuf->msg_len);
}


static void
mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
{
	uint64_t currentp;
	msghdr_t *curbuf;
	uint64_t *headpp;
	uint64_t *tailpp;

	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));

	/*
	 * Grab a free message block, and link it in.  We shouldn't
	 * be here unless there is room in the queue for us;  it's
	 * fair to assert that the free pointer is non-NULL.
	 */
	currentp = mqhp->mq_freep;
	MQ_ASSERT_PTR(mqhp, currentp);
	curbuf = MQ_PTR(mqhp, currentp);

	/*
	 * Remove a message from the free list, and copy in the new contents.
	 */
	mqhp->mq_freep = curbuf->msg_next;
	curbuf->msg_next = NULL;
	(void) memcpy((char *)&curbuf[1], msgp, len);
	curbuf->msg_len = len;

	headpp = HEAD_PTR(mqhp, prio);
	tailpp = TAIL_PTR(mqhp, prio);

	if (*tailpp == 0) {
		/*
		 * This is the first message on this queue.  Set the
		 * head and tail pointers, and tip the appropriate bit
		 * in the priority mask.
		 */
		*headpp = currentp;
		*tailpp = currentp;
		mqhp->mq_mask |= (1u << prio);
		if (prio > mqhp->mq_curmaxprio)
			mqhp->mq_curmaxprio = prio;
	} else {
		MQ_ASSERT_PTR(mqhp, *tailpp);
		MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
		*tailpp = currentp;
	}
}

mqd_t
_mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
{
	va_list		ap;
	mode_t		mode;
	struct mq_attr	*attr;
	int		fd;
	int		err;
	int		cr_flag = 0;
	int		locked = 0;
	uint64_t	total_size;
	size_t		msgsize;
	ssize_t		maxmsg;
	uint64_t	temp;
	void		*ptr;
	mqdes_t		*mqdp;
	mqhdr_t		*mqhp;
	struct mq_dn	*mqdnp;

	if (__pos4obj_check(path) == -1)
		return ((mqd_t)-1);

	/* acquire the MSGQ lock to make the whole operation atomic */
	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
		goto out;
	locked = 1;

	va_start(ap, oflag);
	/* filter oflag down to the read/write/create/excl/non-block flags */
	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
	if ((oflag & O_CREAT) != 0) {
		mode = va_arg(ap, mode_t);
		attr = va_arg(ap, struct mq_attr *);
	}
	va_end(ap);

	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
	    mode, &cr_flag)) < 0)
		goto out;

	/* close the permission file */
	(void) __close_nc(fd);

	/* Try to open/create data file */
	if (cr_flag) {
		cr_flag = PFILE_CREATE;
		if (attr == NULL) {
			maxmsg = MQ_MAXMSG;
			msgsize = MQ_MAXSIZE;
		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
			errno = EINVAL;
			goto out;
		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
			errno = ENOSPC;
			goto out;
		} else {
			maxmsg = attr->mq_maxmsg;
			msgsize = attr->mq_msgsize;
		}

		/* round the message size up to an 8-byte boundary */
		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);

		total_size = sizeof (mqhdr_t) +
		    maxmsg * (temp + sizeof (msghdr_t)) +
		    2 * _MQ_PRIO_MAX * sizeof (uint64_t);

		if (total_size > SSIZE_MAX) {
			errno = ENOSPC;
			goto out;
		}

		/*
		 * the data file is opened read/write by anyone who has
		 * either read or write permission on it
		 */
		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
			goto out;

		cr_flag |= DFILE_CREATE | DFILE_OPEN;

		/* force permissions to avoid umask effect */
		if (fchmod(fd, mode) < 0)
			goto out;

		if (ftruncate64(fd, (off64_t)total_size) < 0)
			goto out;
	} else {
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    O_RDWR, 0666, &err)) < 0)
			goto out;
		cr_flag = DFILE_OPEN;

		/* Message queue has not been initialized yet */
		if (read(fd, &total_size, sizeof (total_size)) !=
		    sizeof (total_size) || total_size == 0) {
			errno = ENOENT;
			goto out;
		}

		/* Message queue too big for this process to handle */
		if (total_size > SSIZE_MAX) {
			errno = EFBIG;
			goto out;
		}
	}

	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
		errno = ENOMEM;
		goto out;
	}
	cr_flag |= ALLOC_MEM;

	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqhp = ptr;
	cr_flag |= DFILE_MMAP;

	/* close the data file */
	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * create, unlink, size, mmap, and close the description file,
	 * all to obtain a flag word in anonymous shared memory
	 */
	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
	    0666, &err)) < 0)
		goto out;
	cr_flag |= DFILE_OPEN;
	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
		goto out;

	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqdnp = ptr;
	cr_flag |= MQDNP_MMAP;

	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * we follow the same strategy as the filesystem open() routine,
	 * converting the fcntl.h flags to the flags defined in file.h.
	 */
	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);

	/* new message queue requires initialization */
	if ((cr_flag & DFILE_CREATE) != 0) {
		/* message queue header has to be initialized */
		mq_init(mqhp, msgsize, maxmsg);
		mqhp->mq_totsize = total_size;
	}
	mqdp->mqd_mq = mqhp;
	mqdp->mqd_mqdn = mqdnp;
	mqdp->mqd_magic = MQ_MAGIC;
	mqdp->mqd_tcd = NULL;
	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
		lmutex_lock(&mq_list_lock);
		mqdp->mqd_next = mq_list;
		mqdp->mqd_prev = NULL;
		if (mq_list)
			mq_list->mqd_prev = mqdp;
		mq_list = mqdp;
		lmutex_unlock(&mq_list_lock);
		return ((mqd_t)mqdp);
	}

	locked = 0;	/* fall into the error case */
out:
	err = errno;
	if ((cr_flag & DFILE_OPEN) != 0)
		(void) __close_nc(fd);
	if ((cr_flag & DFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
	if ((cr_flag & PFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
	if ((cr_flag & ALLOC_MEM) != 0)
		free((void *)mqdp);
	if ((cr_flag & DFILE_MMAP) != 0)
		(void) munmap((caddr_t)mqhp, (size_t)total_size);
	if ((cr_flag & MQDNP_MMAP) != 0)
		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	if (locked)
		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
	errno = err;
	return ((mqd_t)-1);
}
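
/*
 * Caller-side usage sketch (illustrative only, not part of this
 * library); the queue name, mode, and attribute values are arbitrary:
 *
 *	struct mq_attr attr;
 *	mqd_t mqd;
 *
 *	attr.mq_flags = 0;
 *	attr.mq_maxmsg = 16;
 *	attr.mq_msgsize = 512;
 *	attr.mq_curmsgs = 0;
 *	mqd = mq_open("/myqueue", O_CREAT | O_RDWR, 0600, &attr);
 *	if (mqd == (mqd_t)-1)
 *		perror("mq_open");
 *	...
 *	(void) mq_close(mqd);
 *	(void) mq_unlink("/myqueue");
 *
 * Passing a NULL attr pointer selects the MQ_MAXMSG/MQ_MAXSIZE
 * defaults defined at the top of this file.
 */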

static void
mq_close_cleanup(mqdes_t *mqdp)
{
	mqhdr_t *mqhp = mqdp->mqd_mq;
	struct mq_dn *mqdnp = mqdp->mqd_mqdn;

	/* invalidate the descriptor before freeing it */
	mqdp->mqd_magic = 0;
	(void) mutex_unlock(&mqhp->mq_exclusive);

	lmutex_lock(&mq_list_lock);
	if (mqdp->mqd_next)
		mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
	if (mqdp->mqd_prev)
		mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
	if (mq_list == mqdp)
		mq_list = mqdp->mqd_next;
	lmutex_unlock(&mq_list_lock);

	free(mqdp);
	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	(void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
}

int
_mq_close(mqd_t mqdes)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;
	(void) mutex_lock(&mqhp->mq_exclusive);

	if (mqhp->mq_des == (uintptr_t)mqdp &&
	    mqhp->mq_sigid.sn_pid == getpid()) {
		/* notification is set for this descriptor, remove it */
		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
		mqhp->mq_ntype = 0;
		mqhp->mq_des = 0;
	}

	pthread_cleanup_push(mq_close_cleanup, mqdp);
	if ((tcdp = mqdp->mqd_tcd) != NULL) {
		mqdp->mqd_tcd = NULL;
		del_sigev_mq(tcdp);	/* possible cancellation point */
	}
	pthread_cleanup_pop(1);		/* finish in the cleanup handler */

	return (0);
}

int
_mq_unlink(const char *path)
{
	int err;

	if (__pos4obj_check(path) < 0)
		return (-1);

	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
		return (-1);
	}

	err = __pos4obj_unlink(path, MQ_PERM_TYPE);

	if (err == 0 || (err == -1 && errno == EEXIST)) {
		errno = 0;
		err = __pos4obj_unlink(path, MQ_DATA_TYPE);
	}

	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
		return (-1);

	return (err);

}

static int
__mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
	uint_t msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int err;
	int notify = 0;

	/*
	 * sem_*wait() acts as a cancellation point when it is called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*send() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_prio >= mqhp->mq_maxprio) {
		errno = EINVAL;
		return (-1);
	}
	if (msg_len > mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (-1);
	}

	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
		err = sem_trywait(&mqhp->mq_notfull);
	else {
		/*
		 * We might get cancelled here...
		 */
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notfull);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notfull, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
	}
	if (err == -1) {
		/*
		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
		 * by sem_*wait(), so we can just return.
		 */
		return (-1);
	}

	/*
	 * By the time we're here, we know that we've got the capacity
	 * to add to the queue...now acquire the exclusive lock.
	 */
	(void) mutex_lock(&mqhp->mq_exclusive);

	/*
	 * Now determine if we want to kick the notification.  POSIX
	 * requires that if a process has registered for notification,
	 * we must kick it when the queue makes an empty to non-empty
	 * transition, and there are no blocked receivers.  Note that
	 * this mechanism does _not_ guarantee that the kicked process
	 * will be able to receive a message without blocking;
	 * another receiver could intervene in the meantime.  Thus,
	 * the notification mechanism is inherently racy; all we can
	 * do is hope to minimize the window as much as possible.
	 * In general, we want to avoid kicking the notification when
	 * there are clearly receivers blocked.  We'll determine if
	 * we want to kick the notification before the mq_putmsg(),
	 * but the actual signotify() won't be done until the message
	 * is on the queue.
	 */
	if (mqhp->mq_sigid.sn_pid != 0) {
		int nmessages, nblocked;

		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);

		if (nmessages == 0 && nblocked == 0)
			notify = 1;
	}

	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
	(void) sem_post(&mqhp->mq_notempty);

	if (notify) {
		/* notify and also delete the registration */
		(void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
		if (mqhp->mq_ntype == SIGEV_THREAD ||
		    mqhp->mq_ntype == SIGEV_PORT)
			(void) sem_post(&mqhp->mq_spawner);
		mqhp->mq_ntype = 0;
		mqhp->mq_des = 0;
	}

	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (0);
}

int
_mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
{
	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
	    NULL, ABS_TIME));
}

int
_mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
	uint_t msg_prio, const timespec_t *abs_timeout)
{
	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
	    abs_timeout, ABS_TIME));
}

int
_mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
	uint_t msg_prio, const timespec_t *rel_timeout)
{
	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
	    rel_timeout, REL_TIME));
}
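
/*
 * Send-side usage sketch (illustrative only, not part of this
 * library); mqd is assumed to come from a prior mq_open() with
 * write access, and the deadline and priority are arbitrary:
 *
 *	char buf[] = "hello";
 *	timespec_t ts;
 *
 *	(void) clock_gettime(CLOCK_REALTIME, &ts);
 *	ts.tv_sec += 5;
 *	if (mq_timedsend(mqd, buf, sizeof (buf), 0, &ts) == -1)
 *		perror("mq_timedsend");
 *
 * mq_send() is the same operation without a deadline, and
 * mq_reltimedsend_np() interprets the timeout relative to now.
 */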

static void
decrement_rblocked(mqhdr_t *mqhp)
{
	int cancel_state;

	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
	while (sem_wait(&mqhp->mq_rblocked) == -1)
		continue;
	(void) pthread_setcancelstate(cancel_state, NULL);
}

static ssize_t
__mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	ssize_t	msg_size;
	int err;

	/*
	 * sem_*wait() acts as a cancellation point when it is called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*receive() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
		errno = EBADF;
		return (ssize_t)(-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_len < mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (ssize_t)(-1);
	}

	/*
	 * The semaphoring scheme for mq_[timed]receive is a little hairy
	 * thanks to POSIX.1b's arcane notification mechanism.  First,
	 * we try to take the common case and do a sem_trywait().
	 * If that doesn't work, and O_NONBLOCK hasn't been set,
	 * then note that we're going to sleep by incrementing the rblocked
	 * semaphore.  We decrement that semaphore after waking up.
	 */
	if (sem_trywait(&mqhp->mq_notempty) == -1) {
		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
			/*
			 * errno has been set to EAGAIN or EINTR by
			 * sem_trywait(), so we can just return.
			 */
			return (-1);
		}
		/*
		 * If we're here, then we're probably going to block...
		 * increment the rblocked semaphore.  decrement_rblocked()
		 * runs as a cleanup handler, so the count is dropped again
		 * whether we wake up normally or are cancelled.
		 */
		(void) sem_post(&mqhp->mq_rblocked);

		pthread_cleanup_push(decrement_rblocked, mqhp);
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notempty);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notempty, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
		pthread_cleanup_pop(1);

		if (err == -1) {
			/*
			 * We took a signal or timeout while waiting
			 * on mq_notempty...
			 */
			return (-1);
		}
	}

	(void) mutex_lock(&mqhp->mq_exclusive);
	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
	(void) sem_post(&mqhp->mq_notfull);
	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (msg_size);
}

ssize_t
_mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
{
	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
	    NULL, ABS_TIME));
}

ssize_t
_mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *abs_timeout)
{
	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
	    abs_timeout, ABS_TIME));
}

ssize_t
_mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *rel_timeout)
{
	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
	    rel_timeout, REL_TIME));
}
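
/*
 * Receive-side usage sketch (illustrative only, not part of this
 * library); mqd is assumed to come from a prior mq_open() with
 * read access:
 *
 *	struct mq_attr attr;
 *	uint_t prio;
 *	ssize_t n;
 *	char *buf;
 *
 *	(void) mq_getattr(mqd, &attr);
 *	if ((buf = malloc(attr.mq_msgsize)) == NULL)
 *		abort();
 *	if ((n = mq_receive(mqd, buf, attr.mq_msgsize, &prio)) == -1)
 *		perror("mq_receive");
 *
 * The buffer must be at least mq_msgsize bytes, otherwise the call
 * fails with EMSGSIZE (see the msg_len check above).
 */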

/*
 * Only used below, in _mq_notify().
 * We already have a spawner thread.
 * Verify that the attributes match; cancel it if necessary.
 */
static int
cancel_if_necessary(thread_communication_data_t *tcdp,
	const struct sigevent *sigevp)
{
	int do_cancel = !_pthread_attr_equal(tcdp->tcd_attrp,
	    sigevp->sigev_notify_attributes);

	if (do_cancel) {
		/*
		 * Attributes don't match, cancel the spawner thread.
		 */
		(void) pthread_cancel(tcdp->tcd_server_id);
	} else {
		/*
		 * Reuse the existing spawner thread with possibly
		 * changed notification function and value.
		 */
		tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
		tcdp->tcd_notif.sigev_signo = 0;
		tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
		tcdp->tcd_notif.sigev_notify_function =
		    sigevp->sigev_notify_function;
	}

	return (do_cancel);
}

int
_mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	siginfo_t mq_siginfo;
	struct sigevent sigevent;
	struct stat64 statb;
	port_notify_t *pn;
	void *userval;
	int rval = -1;
	int ntype;
	int port;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	(void) mutex_lock(&mqhp->mq_exclusive);

	if (sigevp == NULL) {		/* remove notification */
		if (mqhp->mq_des == (uintptr_t)mqdp &&
		    mqhp->mq_sigid.sn_pid == getpid()) {
			/* notification is set for this descriptor, remove it */
			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
			if ((tcdp = mqdp->mqd_tcd) != NULL) {
				sig_mutex_lock(&tcdp->tcd_lock);
				if (tcdp->tcd_msg_enabled) {
					/* cancel the spawner thread */
					tcdp = mqdp->mqd_tcd;
					mqdp->mqd_tcd = NULL;
					(void) pthread_cancel(
					    tcdp->tcd_server_id);
				}
				sig_mutex_unlock(&tcdp->tcd_lock);
			}
			mqhp->mq_ntype = 0;
			mqhp->mq_des = 0;
		} else {
			/* notification is not set for this descriptor */
			errno = EBUSY;
			goto bad;
		}
	} else {		/* register notification with this process */
		switch (ntype = sigevp->sigev_notify) {
		case SIGEV_THREAD:
			userval = sigevp->sigev_value.sival_ptr;
			port = -1;
			break;
		case SIGEV_PORT:
			pn = sigevp->sigev_value.sival_ptr;
			userval = pn->portnfy_user;
			port = pn->portnfy_port;
			if (fstat64(port, &statb) != 0 ||
			    !S_ISPORT(statb.st_mode)) {
				errno = EBADF;
				goto bad;
			}
			(void) memset(&sigevent, 0, sizeof (sigevent));
			sigevent.sigev_notify = SIGEV_PORT;
			sigevp = &sigevent;
			break;
		}
		switch (ntype) {
		case SIGEV_NONE:
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_SIGNAL:
			mq_siginfo.si_signo = sigevp->sigev_signo;
			mq_siginfo.si_value = sigevp->sigev_value;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_THREAD:
			if ((tcdp = mqdp->mqd_tcd) != NULL &&
			    cancel_if_necessary(tcdp, sigevp))
				mqdp->mqd_tcd = NULL;
			/* FALLTHROUGH */
		case SIGEV_PORT:
			if ((tcdp = mqdp->mqd_tcd) == NULL) {
				/* we must create a spawner thread */
				tcdp = setup_sigev_handler(sigevp, MQ);
				if (tcdp == NULL) {
					errno = EBADF;
					goto bad;
				}
				tcdp->tcd_msg_enabled = 0;
				tcdp->tcd_msg_closing = 0;
				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
				if (launch_spawner(tcdp) != 0) {
					free_sigev_handler(tcdp);
					goto bad;
				}
				mqdp->mqd_tcd = tcdp;
			}
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		default:
			errno = EINVAL;
			goto bad;
		}

		/* register notification */
		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
			goto bad;
		mqhp->mq_ntype = ntype;
		mqhp->mq_des = (uintptr_t)mqdp;
		switch (ntype) {
		case SIGEV_THREAD:
		case SIGEV_PORT:
			tcdp->tcd_port = port;
			tcdp->tcd_msg_object = mqdp;
			tcdp->tcd_msg_userval = userval;
			sig_mutex_lock(&tcdp->tcd_lock);
			tcdp->tcd_msg_enabled = ntype;
			sig_mutex_unlock(&tcdp->tcd_lock);
			(void) cond_broadcast(&tcdp->tcd_cv);
			break;
		}
	}

	rval = 0;	/* success */
bad:
	(void) mutex_unlock(&mqhp->mq_exclusive);
	return (rval);
}
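
/*
 * Notification usage sketch (illustrative only, not part of this
 * library); mqd is assumed to come from a prior mq_open() and
 * SIGUSR1 is an arbitrary choice:
 *
 *	struct sigevent ev;
 *
 *	(void) memset(&ev, 0, sizeof (ev));
 *	ev.sigev_notify = SIGEV_SIGNAL;
 *	ev.sigev_signo = SIGUSR1;
 *	if (mq_notify(mqd, &ev) == -1)
 *		perror("mq_notify");
 *
 * As described in __mq_timedsend(), the notification fires once,
 * when the queue goes from empty to non-empty with no blocked
 * receivers, and the registration is then dropped.  Passing a NULL
 * sigevent pointer removes an existing registration.
 */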

int
_mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	uint_t	flag = 0;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	/* store current attributes */
	if (omqstat != NULL) {
		int	count;

		mqhp = mqdp->mqd_mq;
		omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
		omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
		omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
		(void) sem_getvalue(&mqhp->mq_notempty, &count);
		omqstat->mq_curmsgs = count;
	}

	/* set description attributes */
	if ((mqstat->mq_flags & O_NONBLOCK) != 0)
		flag = FNONBLOCK;
	mqdp->mqd_mqdn->mqdn_flags = flag;

	return (0);
}

int
_mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int count;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
	mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
	mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
	(void) sem_getvalue(&mqhp->mq_notempty, &count);
	mqstat->mq_curmsgs = count;
	return (0);
}
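
/*
 * Attribute usage sketch (illustrative only, not part of this
 * library).  Only the O_NONBLOCK bit of mq_flags is honored by
 * mq_setattr(); mq_maxmsg and mq_msgsize are fixed when the queue
 * is created:
 *
 *	struct mq_attr newattr;
 *
 *	(void) memset(&newattr, 0, sizeof (newattr));
 *	newattr.mq_flags = O_NONBLOCK;
 *	if (mq_setattr(mqd, &newattr, NULL) == -1)
 *		perror("mq_setattr");
 *
 * After this call, mq_send() and mq_receive() on mqd fail with
 * EAGAIN instead of blocking when the queue is full or empty.
 */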

/*
 * Cleanup after fork1() in the child process.
 */
void
postfork1_child_sigev_mq(void)
{
	thread_communication_data_t *tcdp;
	mqdes_t *mqdp;

	for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
		if ((tcdp = mqdp->mqd_tcd) != NULL) {
			mqdp->mqd_tcd = NULL;
			tcd_teardown(tcdp);
		}
	}
}