xref: /titanic_51/usr/src/lib/libc/port/rt/mqueue.c (revision 34537ab1867f3ee4fb96c044d343e33685a463aa)
1f841f6adSraf /*
2f841f6adSraf  * CDDL HEADER START
3f841f6adSraf  *
4f841f6adSraf  * The contents of this file are subject to the terms of the
5f841f6adSraf  * Common Development and Distribution License (the "License").
6f841f6adSraf  * You may not use this file except in compliance with the License.
7f841f6adSraf  *
8f841f6adSraf  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9f841f6adSraf  * or http://www.opensolaris.org/os/licensing.
10f841f6adSraf  * See the License for the specific language governing permissions
11f841f6adSraf  * and limitations under the License.
12f841f6adSraf  *
13f841f6adSraf  * When distributing Covered Code, include this CDDL HEADER in each
14f841f6adSraf  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15f841f6adSraf  * If applicable, add the following below this CDDL HEADER, with the
16f841f6adSraf  * fields enclosed by brackets "[]" replaced with your own identifying
17f841f6adSraf  * information: Portions Copyright [yyyy] [name of copyright owner]
18f841f6adSraf  *
19f841f6adSraf  * CDDL HEADER END
20f841f6adSraf  */
21f841f6adSraf 
22f841f6adSraf /*
23a574db85Sraf  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24f841f6adSraf  * Use is subject to license terms.
25f841f6adSraf  */
26f841f6adSraf 
27f841f6adSraf #pragma ident	"%Z%%M%	%I%	%E% SMI"
28f841f6adSraf 
297257d1b4Sraf #include "lint.h"
30f841f6adSraf #include "mtlib.h"
31f841f6adSraf #define	_KMEMUSER
32f841f6adSraf #include <sys/param.h>		/* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
33f841f6adSraf #undef	_KMEMUSER
34f841f6adSraf #include <mqueue.h>
35f841f6adSraf #include <sys/types.h>
36f841f6adSraf #include <sys/file.h>
37f841f6adSraf #include <sys/mman.h>
38f841f6adSraf #include <errno.h>
39f841f6adSraf #include <stdarg.h>
40f841f6adSraf #include <limits.h>
41f841f6adSraf #include <pthread.h>
42f841f6adSraf #include <assert.h>
43f841f6adSraf #include <string.h>
44f841f6adSraf #include <unistd.h>
45f841f6adSraf #include <stdlib.h>
46f841f6adSraf #include <sys/stat.h>
47f841f6adSraf #include <inttypes.h>
48f841f6adSraf #include "sigev_thread.h"
49f841f6adSraf #include "pos4obj.h"
50f841f6adSraf 
51f841f6adSraf /*
52f841f6adSraf  * Default values per message queue
53f841f6adSraf  */
54f841f6adSraf #define	MQ_MAXMSG	128
55f841f6adSraf #define	MQ_MAXSIZE	1024
56f841f6adSraf 
57f841f6adSraf #define	MQ_MAGIC	0x4d534751		/* "MSGQ" */
58f841f6adSraf 
/*
 * Header that precedes every message buffer in the mapped queue.
 * The message payload is stored immediately after this header.
 */
typedef struct {
	uint64_t	msg_next;	/* offset of the next buffer, 0 if none */
	uint64_t	msg_len;	/* number of payload bytes stored */
} msghdr_t;
66f841f6adSraf 
/*
 * Open description of a message queue.  mq_open() maps one of these
 * from a separate, already unlinked file for each descriptor.
 */
struct mq_dn {
	size_t		mqdn_flags;	/* flags of the open description */
};
73f841f6adSraf 
74f841f6adSraf /*
75f841f6adSraf  * message queue descriptor structure
76f841f6adSraf  */
77f841f6adSraf typedef struct mq_des {
78f841f6adSraf 	struct mq_des	*mqd_next;	/* list of all open mq descriptors, */
79f841f6adSraf 	struct mq_des	*mqd_prev;	/* needed for fork-safety */
80f841f6adSraf 	int		mqd_magic;	/* magic # to identify mq_des */
81f841f6adSraf 	int		mqd_flags;	/* operation flag per open */
82f841f6adSraf 	struct mq_header *mqd_mq;	/* address pointer of message Q */
83f841f6adSraf 	struct mq_dn	*mqd_mqdn;	/* open	description */
84f841f6adSraf 	thread_communication_data_t *mqd_tcd;	/* SIGEV_THREAD notification */
85*34537ab1Sraf 	int		mqd_ownerdead;	/* mq_exclusive is inconsistent */
86f841f6adSraf } mqdes_t;
87f841f6adSraf 
88f841f6adSraf /*
89f841f6adSraf  * message queue common header, part of the mmap()ed file.
90f841f6adSraf  * Since message queues may be shared between 32- and 64-bit processes,
91f841f6adSraf  * care must be taken to make sure that the elements of this structure
92f841f6adSraf  * are identical for both _LP64 and _ILP32 cases.
93f841f6adSraf  */
94f841f6adSraf typedef struct mq_header {
95f841f6adSraf 	/* first field must be mq_totsize, DO NOT insert before this	*/
96f841f6adSraf 	int64_t		mq_totsize;	/* total size of the Queue */
97f841f6adSraf 	int64_t		mq_maxsz;	/* max size of each message */
98f841f6adSraf 	uint32_t	mq_maxmsg;	/* max messages in the queue */
99f841f6adSraf 	uint32_t	mq_maxprio;	/* maximum mqueue priority */
100f841f6adSraf 	uint32_t	mq_curmaxprio;	/* current maximum MQ priority */
101f841f6adSraf 	uint32_t	mq_mask;	/* priority bitmask */
102f841f6adSraf 	uint64_t	mq_freep;	/* free message's head pointer */
103f841f6adSraf 	uint64_t	mq_headpp;	/* pointer to head pointers */
104f841f6adSraf 	uint64_t	mq_tailpp;	/* pointer to tail pointers */
105f841f6adSraf 	signotify_id_t	mq_sigid;	/* notification id (3 int's) */
106f841f6adSraf 	uint32_t	mq_ntype;	/* notification type (SIGEV_*) */
107f841f6adSraf 	uint64_t	mq_des;		/* pointer to msg Q descriptor */
108f841f6adSraf 	mutex_t		mq_exclusive;	/* acquire for exclusive access */
109f841f6adSraf 	sem_t		mq_rblocked;	/* number of processes rblocked */
110f841f6adSraf 	sem_t		mq_notfull;	/* mq_send()'s block on this */
111f841f6adSraf 	sem_t		mq_notempty;	/* mq_receive()'s block on this */
112f841f6adSraf 	sem_t		mq_spawner;	/* spawner thread blocks on this */
113f841f6adSraf } mqhdr_t;
114f841f6adSraf 
115f841f6adSraf /*
116f841f6adSraf  * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
117f841f6adSraf  * If this assumption is somehow invalidated, mq_open() needs to be changed
118f841f6adSraf  * back to the old version which kept a count and enforced a limit.
119f841f6adSraf  * We make sure that this is pointed out to those changing <sys/param.h>
120f841f6adSraf  * by checking _MQ_OPEN_MAX at compile time.
121f841f6adSraf  */
122f841f6adSraf #if _MQ_OPEN_MAX != -1
123f841f6adSraf #error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
124f841f6adSraf #endif
125f841f6adSraf 
#define	MQ_ALIGNSIZE	8	/* 64-bit alignment */

/*
 * Debug-only assertions; they expand to nothing unless DEBUG is defined.
 * The expansions keep their historical trailing ';' and bare-brace forms
 * so that every existing use continues to compile unchanged.
 */
#ifdef DEBUG
#define	MQ_ASSERT(x)	assert(x);

/* _p is an offset into queue _m: non-zero, aligned and inside the queue */
#define	MQ_ASSERT_PTR(_m, _p) \
	assert((_p) != 0 && !((uintptr_t)(_p) & (MQ_ALIGNSIZE - 1)) && \
	    !((uintptr_t)(_m) + (uintptr_t)(_p) >= (uintptr_t)(_m) + \
	    (_m)->mq_totsize));

#define	MQ_ASSERT_SEMVAL_LEQ(sem, val) { \
	int _val; \
	(void) sem_getvalue((sem), &_val); \
	assert((_val) <= (val)); }
#else
#define	MQ_ASSERT(x)
#define	MQ_ASSERT_PTR(_m, _p)
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
#endif

/*
 * Offset-to-address conversions.  m is the mapped queue header, n is an
 * offset (MQ_PTR) or a priority index (HEAD_PTR/TAIL_PTR).  Every macro
 * argument is parenthesized so that expressions such as "prio + 1" are
 * scaled as a whole instead of being split by operator precedence.
 */
#define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)(m) + (uintptr_t)(n)))
#define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
			(uintptr_t)(m)->mq_headpp + (n) * sizeof (uint64_t)))
#define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
			(uintptr_t)(m)->mq_tailpp + (n) * sizeof (uint64_t)))

/* descriptor value that mq_is_valid() always rejects */
#define	MQ_RESERVED	((mqdes_t *)-1)

/* how the send/receive helpers interpret their timeout argument */
#define	ABS_TIME	0
#define	REL_TIME	1
157f841f6adSraf static mutex_t mq_list_lock = DEFAULTMUTEX;
158f841f6adSraf static mqdes_t *mq_list = NULL;
159f841f6adSraf 
160f841f6adSraf extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id);
161f841f6adSraf 
162f841f6adSraf static int
163f841f6adSraf mq_is_valid(mqdes_t *mqdp)
164f841f6adSraf {
165f841f6adSraf 	/*
166f841f6adSraf 	 * Any use of a message queue after it was closed is
167f841f6adSraf 	 * undefined.  But the standard strongly favours EBADF
168f841f6adSraf 	 * returns.  Before we dereference which could be fatal,
169f841f6adSraf 	 * we first do some pointer sanity checks.
170f841f6adSraf 	 */
171f841f6adSraf 	if (mqdp != NULL && mqdp != MQ_RESERVED &&
172f841f6adSraf 	    ((uintptr_t)mqdp & 0x7) == 0) {
173f841f6adSraf 		return (mqdp->mqd_magic == MQ_MAGIC);
174f841f6adSraf 	}
175f841f6adSraf 
176f841f6adSraf 	return (0);
177f841f6adSraf }
178f841f6adSraf 
179f841f6adSraf static void
180f841f6adSraf mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
181f841f6adSraf {
182f841f6adSraf 	int		i;
183f841f6adSraf 	uint64_t	temp;
184f841f6adSraf 	uint64_t	currentp;
185f841f6adSraf 	uint64_t	nextp;
186f841f6adSraf 
187f841f6adSraf 	/*
188f841f6adSraf 	 * We only need to initialize the non-zero fields.  The use of
189f841f6adSraf 	 * ftruncate() on the message queue file assures that the
190*34537ab1Sraf 	 * pages will be zero-filled.
191f841f6adSraf 	 */
192*34537ab1Sraf 	(void) mutex_init(&mqhp->mq_exclusive,
193*34537ab1Sraf 	    USYNC_PROCESS | LOCK_ROBUST, NULL);
194f841f6adSraf 	(void) sem_init(&mqhp->mq_rblocked, 1, 0);
195f841f6adSraf 	(void) sem_init(&mqhp->mq_notempty, 1, 0);
196f841f6adSraf 	(void) sem_init(&mqhp->mq_spawner, 1, 0);
197f841f6adSraf 	(void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);
198f841f6adSraf 
199f841f6adSraf 	mqhp->mq_maxsz = msgsize;
200f841f6adSraf 	mqhp->mq_maxmsg = maxmsg;
201f841f6adSraf 
202f841f6adSraf 	/*
203f841f6adSraf 	 * As of this writing (1997), there are 32 message queue priorities.
204f841f6adSraf 	 * If this is to change, then the size of the mq_mask will
205f841f6adSraf 	 * also have to change.  If DEBUG is defined, assert that
206f841f6adSraf 	 * _MQ_PRIO_MAX hasn't changed.
207f841f6adSraf 	 */
208f841f6adSraf 	mqhp->mq_maxprio = _MQ_PRIO_MAX;
209f841f6adSraf #if defined(DEBUG)
210f841f6adSraf 	/* LINTED always true */
211f841f6adSraf 	MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
212f841f6adSraf #endif
213f841f6adSraf 
214f841f6adSraf 	/*
215f841f6adSraf 	 * Since the message queue can be mapped into different
216f841f6adSraf 	 * virtual address ranges by different processes, we don't
217f841f6adSraf 	 * keep track of pointers, only offsets into the shared region.
218f841f6adSraf 	 */
219f841f6adSraf 	mqhp->mq_headpp = sizeof (mqhdr_t);
220f841f6adSraf 	mqhp->mq_tailpp = mqhp->mq_headpp +
221f841f6adSraf 	    mqhp->mq_maxprio * sizeof (uint64_t);
222f841f6adSraf 	mqhp->mq_freep = mqhp->mq_tailpp +
223f841f6adSraf 	    mqhp->mq_maxprio * sizeof (uint64_t);
224f841f6adSraf 
225f841f6adSraf 	currentp = mqhp->mq_freep;
226f841f6adSraf 	MQ_PTR(mqhp, currentp)->msg_next = 0;
227f841f6adSraf 
228f841f6adSraf 	temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
229f841f6adSraf 	for (i = 1; i < mqhp->mq_maxmsg; i++) {
230f841f6adSraf 		nextp = currentp + sizeof (msghdr_t) + temp;
231f841f6adSraf 		MQ_PTR(mqhp, currentp)->msg_next = nextp;
232f841f6adSraf 		MQ_PTR(mqhp, nextp)->msg_next = 0;
233f841f6adSraf 		currentp = nextp;
234f841f6adSraf 	}
235f841f6adSraf }
236f841f6adSraf 
237f841f6adSraf static size_t
238f841f6adSraf mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
239f841f6adSraf {
240f841f6adSraf 	uint64_t currentp;
241f841f6adSraf 	msghdr_t *curbuf;
242f841f6adSraf 	uint64_t *headpp;
243f841f6adSraf 	uint64_t *tailpp;
244f841f6adSraf 
245f841f6adSraf 	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
246f841f6adSraf 
247f841f6adSraf 	/*
248f841f6adSraf 	 * Get the head and tail pointers for the queue of maximum
249f841f6adSraf 	 * priority.  We shouldn't be here unless there is a message for
250f841f6adSraf 	 * us, so it's fair to assert that both the head and tail
251f841f6adSraf 	 * pointers are non-NULL.
252f841f6adSraf 	 */
253f841f6adSraf 	headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
254f841f6adSraf 	tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);
255f841f6adSraf 
256f841f6adSraf 	if (msg_prio != NULL)
257f841f6adSraf 		*msg_prio = mqhp->mq_curmaxprio;
258f841f6adSraf 
259f841f6adSraf 	currentp = *headpp;
260f841f6adSraf 	MQ_ASSERT_PTR(mqhp, currentp);
261f841f6adSraf 	curbuf = MQ_PTR(mqhp, currentp);
262f841f6adSraf 
263f841f6adSraf 	if ((*headpp = curbuf->msg_next) == NULL) {
264f841f6adSraf 		/*
265f841f6adSraf 		 * We just nuked the last message in this priority's queue.
266f841f6adSraf 		 * Twiddle this priority's bit, and then find the next bit
267f841f6adSraf 		 * tipped.
268f841f6adSraf 		 */
269f841f6adSraf 		uint_t prio = mqhp->mq_curmaxprio;
270f841f6adSraf 
271f841f6adSraf 		mqhp->mq_mask &= ~(1u << prio);
272f841f6adSraf 
273f841f6adSraf 		for (; prio != 0; prio--)
274f841f6adSraf 			if (mqhp->mq_mask & (1u << prio))
275f841f6adSraf 				break;
276f841f6adSraf 		mqhp->mq_curmaxprio = prio;
277f841f6adSraf 
278f841f6adSraf 		*tailpp = NULL;
279f841f6adSraf 	}
280f841f6adSraf 
281f841f6adSraf 	/*
282f841f6adSraf 	 * Copy the message, and put the buffer back on the free list.
283f841f6adSraf 	 */
284f841f6adSraf 	(void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
285f841f6adSraf 	curbuf->msg_next = mqhp->mq_freep;
286f841f6adSraf 	mqhp->mq_freep = currentp;
287f841f6adSraf 
288f841f6adSraf 	return (curbuf->msg_len);
289f841f6adSraf }
290f841f6adSraf 
291f841f6adSraf 
292f841f6adSraf static void
293f841f6adSraf mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
294f841f6adSraf {
295f841f6adSraf 	uint64_t currentp;
296f841f6adSraf 	msghdr_t *curbuf;
297f841f6adSraf 	uint64_t *headpp;
298f841f6adSraf 	uint64_t *tailpp;
299f841f6adSraf 
300f841f6adSraf 	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
301f841f6adSraf 
302f841f6adSraf 	/*
303f841f6adSraf 	 * Grab a free message block, and link it in.  We shouldn't
304f841f6adSraf 	 * be here unless there is room in the queue for us;  it's
305f841f6adSraf 	 * fair to assert that the free pointer is non-NULL.
306f841f6adSraf 	 */
307f841f6adSraf 	currentp = mqhp->mq_freep;
308f841f6adSraf 	MQ_ASSERT_PTR(mqhp, currentp);
309f841f6adSraf 	curbuf = MQ_PTR(mqhp, currentp);
310f841f6adSraf 
311f841f6adSraf 	/*
312f841f6adSraf 	 * Remove a message from the free list, and copy in the new contents.
313f841f6adSraf 	 */
314f841f6adSraf 	mqhp->mq_freep = curbuf->msg_next;
315f841f6adSraf 	curbuf->msg_next = NULL;
316f841f6adSraf 	(void) memcpy((char *)&curbuf[1], msgp, len);
317f841f6adSraf 	curbuf->msg_len = len;
318f841f6adSraf 
319f841f6adSraf 	headpp = HEAD_PTR(mqhp, prio);
320f841f6adSraf 	tailpp = TAIL_PTR(mqhp, prio);
321f841f6adSraf 
322f841f6adSraf 	if (*tailpp == 0) {
323f841f6adSraf 		/*
324f841f6adSraf 		 * This is the first message on this queue.  Set the
325f841f6adSraf 		 * head and tail pointers, and tip the appropriate bit
326f841f6adSraf 		 * in the priority mask.
327f841f6adSraf 		 */
328f841f6adSraf 		*headpp = currentp;
329f841f6adSraf 		*tailpp = currentp;
330f841f6adSraf 		mqhp->mq_mask |= (1u << prio);
331f841f6adSraf 		if (prio > mqhp->mq_curmaxprio)
332f841f6adSraf 			mqhp->mq_curmaxprio = prio;
333f841f6adSraf 	} else {
334f841f6adSraf 		MQ_ASSERT_PTR(mqhp, *tailpp);
335f841f6adSraf 		MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
336f841f6adSraf 		*tailpp = currentp;
337f841f6adSraf 	}
338f841f6adSraf }
339f841f6adSraf 
340*34537ab1Sraf /*
341*34537ab1Sraf  * Send a notification and also delete the registration.
342*34537ab1Sraf  */
343*34537ab1Sraf static void
344*34537ab1Sraf do_notify(mqhdr_t *mqhp)
345*34537ab1Sraf {
346*34537ab1Sraf 	(void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
347*34537ab1Sraf 	if (mqhp->mq_ntype == SIGEV_THREAD ||
348*34537ab1Sraf 	    mqhp->mq_ntype == SIGEV_PORT)
349*34537ab1Sraf 		(void) sem_post(&mqhp->mq_spawner);
350*34537ab1Sraf 	mqhp->mq_ntype = 0;
351*34537ab1Sraf 	mqhp->mq_des = 0;
352*34537ab1Sraf }
353*34537ab1Sraf 
354*34537ab1Sraf /*
355*34537ab1Sraf  * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE.
356*34537ab1Sraf  * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
357*34537ab1Sraf  * they fail with errno == EBADMSG.  Trigger any registered notification.
358*34537ab1Sraf  */
359*34537ab1Sraf static void
360*34537ab1Sraf owner_dead(mqdes_t *mqdp, int error)
361*34537ab1Sraf {
362*34537ab1Sraf 	mqhdr_t *mqhp = mqdp->mqd_mq;
363*34537ab1Sraf 
364*34537ab1Sraf 	mqdp->mqd_ownerdead = 1;
365*34537ab1Sraf 	(void) sem_post(&mqhp->mq_notfull);
366*34537ab1Sraf 	(void) sem_post(&mqhp->mq_notempty);
367*34537ab1Sraf 	if (error == EOWNERDEAD) {
368*34537ab1Sraf 		if (mqhp->mq_sigid.sn_pid != 0)
369*34537ab1Sraf 			do_notify(mqhp);
370*34537ab1Sraf 		(void) mutex_unlock(&mqhp->mq_exclusive);
371*34537ab1Sraf 	}
372*34537ab1Sraf 	errno = EBADMSG;
373*34537ab1Sraf }
374*34537ab1Sraf 
375f841f6adSraf mqd_t
3767257d1b4Sraf mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
377f841f6adSraf {
378f841f6adSraf 	va_list		ap;
379*34537ab1Sraf 	mode_t		mode = 0;
380*34537ab1Sraf 	struct mq_attr	*attr = NULL;
381f841f6adSraf 	int		fd;
382f841f6adSraf 	int		err;
383f841f6adSraf 	int		cr_flag = 0;
384f841f6adSraf 	int		locked = 0;
385f841f6adSraf 	uint64_t	total_size;
386f841f6adSraf 	size_t		msgsize;
387f841f6adSraf 	ssize_t		maxmsg;
388f841f6adSraf 	uint64_t	temp;
389f841f6adSraf 	void		*ptr;
390f841f6adSraf 	mqdes_t		*mqdp;
391f841f6adSraf 	mqhdr_t		*mqhp;
392f841f6adSraf 	struct mq_dn	*mqdnp;
393f841f6adSraf 
394f841f6adSraf 	if (__pos4obj_check(path) == -1)
395f841f6adSraf 		return ((mqd_t)-1);
396f841f6adSraf 
397f841f6adSraf 	/* acquire MSGQ lock to have atomic operation */
398f841f6adSraf 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
399f841f6adSraf 		goto out;
400f841f6adSraf 	locked = 1;
401f841f6adSraf 
402f841f6adSraf 	va_start(ap, oflag);
403f841f6adSraf 	/* filter oflag to have READ/WRITE/CREATE modes only */
404f841f6adSraf 	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
405f841f6adSraf 	if ((oflag & O_CREAT) != 0) {
406f841f6adSraf 		mode = va_arg(ap, mode_t);
407f841f6adSraf 		attr = va_arg(ap, struct mq_attr *);
408f841f6adSraf 	}
409f841f6adSraf 	va_end(ap);
410f841f6adSraf 
411f841f6adSraf 	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
412f841f6adSraf 	    mode, &cr_flag)) < 0)
413f841f6adSraf 		goto out;
414f841f6adSraf 
415f841f6adSraf 	/* closing permission file */
416f841f6adSraf 	(void) __close_nc(fd);
417f841f6adSraf 
418f841f6adSraf 	/* Try to open/create data file */
419f841f6adSraf 	if (cr_flag) {
420f841f6adSraf 		cr_flag = PFILE_CREATE;
421f841f6adSraf 		if (attr == NULL) {
422f841f6adSraf 			maxmsg = MQ_MAXMSG;
423f841f6adSraf 			msgsize = MQ_MAXSIZE;
424f841f6adSraf 		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
425f841f6adSraf 			errno = EINVAL;
426f841f6adSraf 			goto out;
427f841f6adSraf 		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
428f841f6adSraf 			errno = ENOSPC;
429f841f6adSraf 			goto out;
430f841f6adSraf 		} else {
431f841f6adSraf 			maxmsg = attr->mq_maxmsg;
432f841f6adSraf 			msgsize = attr->mq_msgsize;
433f841f6adSraf 		}
434f841f6adSraf 
435f841f6adSraf 		/* adjust for message size at word boundary */
436f841f6adSraf 		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
437f841f6adSraf 
438f841f6adSraf 		total_size = sizeof (mqhdr_t) +
439f841f6adSraf 		    maxmsg * (temp + sizeof (msghdr_t)) +
440f841f6adSraf 		    2 * _MQ_PRIO_MAX * sizeof (uint64_t);
441f841f6adSraf 
442f841f6adSraf 		if (total_size > SSIZE_MAX) {
443f841f6adSraf 			errno = ENOSPC;
444f841f6adSraf 			goto out;
445f841f6adSraf 		}
446f841f6adSraf 
447f841f6adSraf 		/*
448f841f6adSraf 		 * data file is opened with read/write to those
449f841f6adSraf 		 * who have read or write permission
450f841f6adSraf 		 */
451f841f6adSraf 		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
452f841f6adSraf 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
453f841f6adSraf 		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
454f841f6adSraf 			goto out;
455f841f6adSraf 
456f841f6adSraf 		cr_flag |= DFILE_CREATE | DFILE_OPEN;
457f841f6adSraf 
458f841f6adSraf 		/* force permissions to avoid umask effect */
459f841f6adSraf 		if (fchmod(fd, mode) < 0)
460f841f6adSraf 			goto out;
461f841f6adSraf 
462f841f6adSraf 		if (ftruncate64(fd, (off64_t)total_size) < 0)
463f841f6adSraf 			goto out;
464f841f6adSraf 	} else {
465f841f6adSraf 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
466f841f6adSraf 		    O_RDWR, 0666, &err)) < 0)
467f841f6adSraf 			goto out;
468f841f6adSraf 		cr_flag = DFILE_OPEN;
469f841f6adSraf 
470f841f6adSraf 		/* Message queue has not been initialized yet */
471f841f6adSraf 		if (read(fd, &total_size, sizeof (total_size)) !=
472f841f6adSraf 		    sizeof (total_size) || total_size == 0) {
473f841f6adSraf 			errno = ENOENT;
474f841f6adSraf 			goto out;
475f841f6adSraf 		}
476f841f6adSraf 
477f841f6adSraf 		/* Message queue too big for this process to handle */
478f841f6adSraf 		if (total_size > SSIZE_MAX) {
479f841f6adSraf 			errno = EFBIG;
480f841f6adSraf 			goto out;
481f841f6adSraf 		}
482f841f6adSraf 	}
483f841f6adSraf 
484f841f6adSraf 	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
485f841f6adSraf 		errno = ENOMEM;
486f841f6adSraf 		goto out;
487f841f6adSraf 	}
488f841f6adSraf 	cr_flag |= ALLOC_MEM;
489f841f6adSraf 
490f841f6adSraf 	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
491f841f6adSraf 	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
492f841f6adSraf 		goto out;
493f841f6adSraf 	mqhp = ptr;
494f841f6adSraf 	cr_flag |= DFILE_MMAP;
495f841f6adSraf 
496f841f6adSraf 	/* closing data file */
497f841f6adSraf 	(void) __close_nc(fd);
498f841f6adSraf 	cr_flag &= ~DFILE_OPEN;
499f841f6adSraf 
500f841f6adSraf 	/*
501f841f6adSraf 	 * create, unlink, size, mmap, and close description file
502f841f6adSraf 	 * all for a flag word in anonymous shared memory
503f841f6adSraf 	 */
504f841f6adSraf 	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
505f841f6adSraf 	    0666, &err)) < 0)
506f841f6adSraf 		goto out;
507f841f6adSraf 	cr_flag |= DFILE_OPEN;
508f841f6adSraf 	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
509f841f6adSraf 	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
510f841f6adSraf 		goto out;
511f841f6adSraf 
512f841f6adSraf 	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
513f841f6adSraf 	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
514f841f6adSraf 		goto out;
515f841f6adSraf 	mqdnp = ptr;
516f841f6adSraf 	cr_flag |= MQDNP_MMAP;
517f841f6adSraf 
518f841f6adSraf 	(void) __close_nc(fd);
519f841f6adSraf 	cr_flag &= ~DFILE_OPEN;
520f841f6adSraf 
521f841f6adSraf 	/*
522f841f6adSraf 	 * we follow the same strategy as filesystem open() routine,
523f841f6adSraf 	 * where fcntl.h flags are changed to flags defined in file.h.
524f841f6adSraf 	 */
525f841f6adSraf 	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
526f841f6adSraf 	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);
527f841f6adSraf 
528f841f6adSraf 	/* new message queue requires initialization */
529f841f6adSraf 	if ((cr_flag & DFILE_CREATE) != 0) {
530f841f6adSraf 		/* message queue header has to be initialized */
531f841f6adSraf 		mq_init(mqhp, msgsize, maxmsg);
532f841f6adSraf 		mqhp->mq_totsize = total_size;
533f841f6adSraf 	}
534f841f6adSraf 	mqdp->mqd_mq = mqhp;
535f841f6adSraf 	mqdp->mqd_mqdn = mqdnp;
536f841f6adSraf 	mqdp->mqd_magic = MQ_MAGIC;
537f841f6adSraf 	mqdp->mqd_tcd = NULL;
538*34537ab1Sraf 	mqdp->mqd_ownerdead = 0;
539f841f6adSraf 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
540f841f6adSraf 		lmutex_lock(&mq_list_lock);
541f841f6adSraf 		mqdp->mqd_next = mq_list;
542f841f6adSraf 		mqdp->mqd_prev = NULL;
543f841f6adSraf 		if (mq_list)
544f841f6adSraf 			mq_list->mqd_prev = mqdp;
545f841f6adSraf 		mq_list = mqdp;
546f841f6adSraf 		lmutex_unlock(&mq_list_lock);
547f841f6adSraf 		return ((mqd_t)mqdp);
548f841f6adSraf 	}
549f841f6adSraf 
550f841f6adSraf 	locked = 0;	/* fall into the error case */
551f841f6adSraf out:
552f841f6adSraf 	err = errno;
553f841f6adSraf 	if ((cr_flag & DFILE_OPEN) != 0)
554f841f6adSraf 		(void) __close_nc(fd);
555f841f6adSraf 	if ((cr_flag & DFILE_CREATE) != 0)
556f841f6adSraf 		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
557f841f6adSraf 	if ((cr_flag & PFILE_CREATE) != 0)
558f841f6adSraf 		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
559f841f6adSraf 	if ((cr_flag & ALLOC_MEM) != 0)
560f841f6adSraf 		free((void *)mqdp);
561f841f6adSraf 	if ((cr_flag & DFILE_MMAP) != 0)
562f841f6adSraf 		(void) munmap((caddr_t)mqhp, (size_t)total_size);
563f841f6adSraf 	if ((cr_flag & MQDNP_MMAP) != 0)
564f841f6adSraf 		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
565f841f6adSraf 	if (locked)
566f841f6adSraf 		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
567f841f6adSraf 	errno = err;
568f841f6adSraf 	return ((mqd_t)-1);
569f841f6adSraf }
570f841f6adSraf 
571f841f6adSraf static void
572f841f6adSraf mq_close_cleanup(mqdes_t *mqdp)
573f841f6adSraf {
574f841f6adSraf 	mqhdr_t *mqhp = mqdp->mqd_mq;
575f841f6adSraf 	struct mq_dn *mqdnp = mqdp->mqd_mqdn;
576f841f6adSraf 
577f841f6adSraf 	/* invalidate the descriptor before freeing it */
578f841f6adSraf 	mqdp->mqd_magic = 0;
579*34537ab1Sraf 	if (!mqdp->mqd_ownerdead)
580f841f6adSraf 		(void) mutex_unlock(&mqhp->mq_exclusive);
581f841f6adSraf 
582f841f6adSraf 	lmutex_lock(&mq_list_lock);
583f841f6adSraf 	if (mqdp->mqd_next)
584f841f6adSraf 		mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
585f841f6adSraf 	if (mqdp->mqd_prev)
586f841f6adSraf 		mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
587f841f6adSraf 	if (mq_list == mqdp)
588f841f6adSraf 		mq_list = mqdp->mqd_next;
589f841f6adSraf 	lmutex_unlock(&mq_list_lock);
590f841f6adSraf 
591f841f6adSraf 	free(mqdp);
592f841f6adSraf 	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
593f841f6adSraf 	(void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
594f841f6adSraf }
595f841f6adSraf 
596f841f6adSraf int
5977257d1b4Sraf mq_close(mqd_t mqdes)
598f841f6adSraf {
599f841f6adSraf 	mqdes_t *mqdp = (mqdes_t *)mqdes;
600f841f6adSraf 	mqhdr_t *mqhp;
601f841f6adSraf 	thread_communication_data_t *tcdp;
602*34537ab1Sraf 	int error;
603f841f6adSraf 
604f841f6adSraf 	if (!mq_is_valid(mqdp)) {
605f841f6adSraf 		errno = EBADF;
606f841f6adSraf 		return (-1);
607f841f6adSraf 	}
608f841f6adSraf 
609f841f6adSraf 	mqhp = mqdp->mqd_mq;
610*34537ab1Sraf 	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
611*34537ab1Sraf 		mqdp->mqd_ownerdead = 1;
612*34537ab1Sraf 		if (error == EOWNERDEAD)
613*34537ab1Sraf 			(void) mutex_unlock(&mqhp->mq_exclusive);
614*34537ab1Sraf 		/* carry on regardless, without holding mq_exclusive */
615*34537ab1Sraf 	}
616f841f6adSraf 
617f841f6adSraf 	if (mqhp->mq_des == (uintptr_t)mqdp &&
618f841f6adSraf 	    mqhp->mq_sigid.sn_pid == getpid()) {
619f841f6adSraf 		/* notification is set for this descriptor, remove it */
620f841f6adSraf 		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
621f841f6adSraf 		mqhp->mq_ntype = 0;
622f841f6adSraf 		mqhp->mq_des = 0;
623f841f6adSraf 	}
624f841f6adSraf 
625f841f6adSraf 	pthread_cleanup_push(mq_close_cleanup, mqdp);
626f841f6adSraf 	if ((tcdp = mqdp->mqd_tcd) != NULL) {
627f841f6adSraf 		mqdp->mqd_tcd = NULL;
628f841f6adSraf 		del_sigev_mq(tcdp);	/* possible cancellation point */
629f841f6adSraf 	}
630f841f6adSraf 	pthread_cleanup_pop(1);		/* finish in the cleanup handler */
631f841f6adSraf 
632f841f6adSraf 	return (0);
633f841f6adSraf }
634f841f6adSraf 
635f841f6adSraf int
6367257d1b4Sraf mq_unlink(const char *path)
637f841f6adSraf {
638f841f6adSraf 	int err;
639f841f6adSraf 
640f841f6adSraf 	if (__pos4obj_check(path) < 0)
641f841f6adSraf 		return (-1);
642f841f6adSraf 
643f841f6adSraf 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
644f841f6adSraf 		return (-1);
645f841f6adSraf 	}
646f841f6adSraf 
647f841f6adSraf 	err = __pos4obj_unlink(path, MQ_PERM_TYPE);
648f841f6adSraf 
649f841f6adSraf 	if (err == 0 || (err == -1 && errno == EEXIST)) {
650f841f6adSraf 		errno = 0;
651f841f6adSraf 		err = __pos4obj_unlink(path, MQ_DATA_TYPE);
652f841f6adSraf 	}
653f841f6adSraf 
654f841f6adSraf 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
655f841f6adSraf 		return (-1);
656f841f6adSraf 
657f841f6adSraf 	return (err);
658f841f6adSraf 
659f841f6adSraf }
660f841f6adSraf 
661f841f6adSraf static int
662f841f6adSraf __mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
663f841f6adSraf 	uint_t msg_prio, const timespec_t *timeout, int abs_rel)
664f841f6adSraf {
665f841f6adSraf 	mqdes_t *mqdp = (mqdes_t *)mqdes;
666f841f6adSraf 	mqhdr_t *mqhp;
667f841f6adSraf 	int err;
668f841f6adSraf 	int notify = 0;
669f841f6adSraf 
670f841f6adSraf 	/*
671f841f6adSraf 	 * sem_*wait() does cancellation, if called.
672f841f6adSraf 	 * pthread_testcancel() ensures that cancellation takes place if
673f841f6adSraf 	 * there is a cancellation pending when mq_*send() is called.
674f841f6adSraf 	 */
675f841f6adSraf 	pthread_testcancel();
676f841f6adSraf 
677f841f6adSraf 	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
678f841f6adSraf 		errno = EBADF;
679f841f6adSraf 		return (-1);
680f841f6adSraf 	}
681f841f6adSraf 
682f841f6adSraf 	mqhp = mqdp->mqd_mq;
683f841f6adSraf 
684f841f6adSraf 	if (msg_prio >= mqhp->mq_maxprio) {
685f841f6adSraf 		errno = EINVAL;
686f841f6adSraf 		return (-1);
687f841f6adSraf 	}
688f841f6adSraf 	if (msg_len > mqhp->mq_maxsz) {
689f841f6adSraf 		errno = EMSGSIZE;
690f841f6adSraf 		return (-1);
691f841f6adSraf 	}
692f841f6adSraf 
693f841f6adSraf 	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
694f841f6adSraf 		err = sem_trywait(&mqhp->mq_notfull);
695f841f6adSraf 	else {
696f841f6adSraf 		/*
697f841f6adSraf 		 * We might get cancelled here...
698f841f6adSraf 		 */
699f841f6adSraf 		if (timeout == NULL)
700f841f6adSraf 			err = sem_wait(&mqhp->mq_notfull);
701f841f6adSraf 		else if (abs_rel == ABS_TIME)
702f841f6adSraf 			err = sem_timedwait(&mqhp->mq_notfull, timeout);
703f841f6adSraf 		else
704f841f6adSraf 			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
705f841f6adSraf 	}
706f841f6adSraf 	if (err == -1) {
707f841f6adSraf 		/*
708f841f6adSraf 		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
709f841f6adSraf 		 * by sem_*wait(), so we can just return.
710f841f6adSraf 		 */
711f841f6adSraf 		return (-1);
712f841f6adSraf 	}
713f841f6adSraf 
714f841f6adSraf 	/*
715f841f6adSraf 	 * By the time we're here, we know that we've got the capacity
716f841f6adSraf 	 * to add to the queue...now acquire the exclusive lock.
717f841f6adSraf 	 */
718*34537ab1Sraf 	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
719*34537ab1Sraf 		owner_dead(mqdp, err);
720*34537ab1Sraf 		return (-1);
721*34537ab1Sraf 	}
722f841f6adSraf 
723f841f6adSraf 	/*
724f841f6adSraf 	 * Now determine if we want to kick the notification.  POSIX
725f841f6adSraf 	 * requires that if a process has registered for notification,
726f841f6adSraf 	 * we must kick it when the queue makes an empty to non-empty
727f841f6adSraf 	 * transition, and there are no blocked receivers.  Note that
728f841f6adSraf 	 * this mechanism does _not_ guarantee that the kicked process
729f841f6adSraf 	 * will be able to receive a message without blocking;
730f841f6adSraf 	 * another receiver could intervene in the meantime.  Thus,
731f841f6adSraf 	 * the notification mechanism is inherently racy; all we can
732f841f6adSraf 	 * do is hope to minimize the window as much as possible.
733f841f6adSraf 	 * In general, we want to avoid kicking the notification when
734f841f6adSraf 	 * there are clearly receivers blocked.  We'll determine if
735f841f6adSraf 	 * we want to kick the notification before the mq_putmsg(),
736f841f6adSraf 	 * but the actual signotify() won't be done until the message
737f841f6adSraf 	 * is on the queue.
738f841f6adSraf 	 */
739f841f6adSraf 	if (mqhp->mq_sigid.sn_pid != 0) {
740f841f6adSraf 		int nmessages, nblocked;
741f841f6adSraf 
742f841f6adSraf 		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
743f841f6adSraf 		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);
744f841f6adSraf 
745f841f6adSraf 		if (nmessages == 0 && nblocked == 0)
746f841f6adSraf 			notify = 1;
747f841f6adSraf 	}
748f841f6adSraf 
749f841f6adSraf 	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
750f841f6adSraf 	(void) sem_post(&mqhp->mq_notempty);
751f841f6adSraf 
752f841f6adSraf 	if (notify) {
753f841f6adSraf 		/* notify and also delete the registration */
754*34537ab1Sraf 		do_notify(mqhp);
755f841f6adSraf 	}
756f841f6adSraf 
757f841f6adSraf 	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
758f841f6adSraf 	(void) mutex_unlock(&mqhp->mq_exclusive);
759f841f6adSraf 
760f841f6adSraf 	return (0);
761f841f6adSraf }
762f841f6adSraf 
763f841f6adSraf int
7647257d1b4Sraf mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
765f841f6adSraf {
766f841f6adSraf 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
767f841f6adSraf 	    NULL, ABS_TIME));
768f841f6adSraf }
769f841f6adSraf 
770f841f6adSraf int
7717257d1b4Sraf mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
772f841f6adSraf 	uint_t msg_prio, const timespec_t *abs_timeout)
773f841f6adSraf {
774f841f6adSraf 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
775f841f6adSraf 	    abs_timeout, ABS_TIME));
776f841f6adSraf }
777f841f6adSraf 
778f841f6adSraf int
7797257d1b4Sraf mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
780f841f6adSraf 	uint_t msg_prio, const timespec_t *rel_timeout)
781f841f6adSraf {
782f841f6adSraf 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
783f841f6adSraf 	    rel_timeout, REL_TIME));
784f841f6adSraf }
785f841f6adSraf 
786f841f6adSraf static void
787f841f6adSraf decrement_rblocked(mqhdr_t *mqhp)
788f841f6adSraf {
789a574db85Sraf 	int cancel_state;
790f841f6adSraf 
791a574db85Sraf 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
792f841f6adSraf 	while (sem_wait(&mqhp->mq_rblocked) == -1)
793f841f6adSraf 		continue;
794a574db85Sraf 	(void) pthread_setcancelstate(cancel_state, NULL);
795f841f6adSraf }
796f841f6adSraf 
797f841f6adSraf static ssize_t
798f841f6adSraf __mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
799f841f6adSraf 	uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
800f841f6adSraf {
801f841f6adSraf 	mqdes_t *mqdp = (mqdes_t *)mqdes;
802f841f6adSraf 	mqhdr_t *mqhp;
803f841f6adSraf 	ssize_t	msg_size;
804f841f6adSraf 	int err;
805f841f6adSraf 
806f841f6adSraf 	/*
807f841f6adSraf 	 * sem_*wait() does cancellation, if called.
808f841f6adSraf 	 * pthread_testcancel() ensures that cancellation takes place if
809f841f6adSraf 	 * there is a cancellation pending when mq_*receive() is called.
810f841f6adSraf 	 */
811f841f6adSraf 	pthread_testcancel();
812f841f6adSraf 
813f841f6adSraf 	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
814f841f6adSraf 		errno = EBADF;
815f841f6adSraf 		return (ssize_t)(-1);
816f841f6adSraf 	}
817f841f6adSraf 
818f841f6adSraf 	mqhp = mqdp->mqd_mq;
819f841f6adSraf 
820f841f6adSraf 	if (msg_len < mqhp->mq_maxsz) {
821f841f6adSraf 		errno = EMSGSIZE;
822f841f6adSraf 		return (ssize_t)(-1);
823f841f6adSraf 	}
824f841f6adSraf 
825f841f6adSraf 	/*
826f841f6adSraf 	 * The semaphoring scheme for mq_[timed]receive is a little hairy
827f841f6adSraf 	 * thanks to POSIX.1b's arcane notification mechanism.  First,
828f841f6adSraf 	 * we try to take the common case and do a sem_trywait().
829f841f6adSraf 	 * If that doesn't work, and O_NONBLOCK hasn't been set,
830f841f6adSraf 	 * then note that we're going to sleep by incrementing the rblocked
831f841f6adSraf 	 * semaphore.  We decrement that semaphore after waking up.
832f841f6adSraf 	 */
833f841f6adSraf 	if (sem_trywait(&mqhp->mq_notempty) == -1) {
834f841f6adSraf 		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
835f841f6adSraf 			/*
836f841f6adSraf 			 * errno has been set to EAGAIN or EINTR by
837f841f6adSraf 			 * sem_trywait(), so we can just return.
838f841f6adSraf 			 */
839f841f6adSraf 			return (-1);
840f841f6adSraf 		}
841f841f6adSraf 		/*
842f841f6adSraf 		 * If we're here, then we're probably going to block...
843f841f6adSraf 		 * increment the rblocked semaphore.  If we get
844f841f6adSraf 		 * cancelled, decrement_rblocked() will decrement it.
845f841f6adSraf 		 */
846f841f6adSraf 		(void) sem_post(&mqhp->mq_rblocked);
847f841f6adSraf 
848f841f6adSraf 		pthread_cleanup_push(decrement_rblocked, mqhp);
849f841f6adSraf 		if (timeout == NULL)
850f841f6adSraf 			err = sem_wait(&mqhp->mq_notempty);
851f841f6adSraf 		else if (abs_rel == ABS_TIME)
852f841f6adSraf 			err = sem_timedwait(&mqhp->mq_notempty, timeout);
853f841f6adSraf 		else
854f841f6adSraf 			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
855f841f6adSraf 		pthread_cleanup_pop(1);
856f841f6adSraf 
857f841f6adSraf 		if (err == -1) {
858f841f6adSraf 			/*
859f841f6adSraf 			 * We took a signal or timeout while waiting
860f841f6adSraf 			 * on mq_notempty...
861f841f6adSraf 			 */
862f841f6adSraf 			return (-1);
863f841f6adSraf 		}
864f841f6adSraf 	}
865f841f6adSraf 
866*34537ab1Sraf 	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
867*34537ab1Sraf 		owner_dead(mqdp, err);
868*34537ab1Sraf 		return (-1);
869*34537ab1Sraf 	}
870f841f6adSraf 	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
871f841f6adSraf 	(void) sem_post(&mqhp->mq_notfull);
872f841f6adSraf 	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
873f841f6adSraf 	(void) mutex_unlock(&mqhp->mq_exclusive);
874f841f6adSraf 
875f841f6adSraf 	return (msg_size);
876f841f6adSraf }
877f841f6adSraf 
878f841f6adSraf ssize_t
8797257d1b4Sraf mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
880f841f6adSraf {
881f841f6adSraf 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
882f841f6adSraf 	    NULL, ABS_TIME));
883f841f6adSraf }
884f841f6adSraf 
885f841f6adSraf ssize_t
8867257d1b4Sraf mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
887f841f6adSraf 	uint_t *msg_prio, const timespec_t *abs_timeout)
888f841f6adSraf {
889f841f6adSraf 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
890f841f6adSraf 	    abs_timeout, ABS_TIME));
891f841f6adSraf }
892f841f6adSraf 
893f841f6adSraf ssize_t
8947257d1b4Sraf mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
895f841f6adSraf 	uint_t *msg_prio, const timespec_t *rel_timeout)
896f841f6adSraf {
897f841f6adSraf 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
898f841f6adSraf 	    rel_timeout, REL_TIME));
899f841f6adSraf }
900f841f6adSraf 
901f841f6adSraf /*
9027257d1b4Sraf  * Only used below, in mq_notify().
903f841f6adSraf  * We already have a spawner thread.
904f841f6adSraf  * Verify that the attributes match; cancel it if necessary.
905f841f6adSraf  */
906f841f6adSraf static int
907f841f6adSraf cancel_if_necessary(thread_communication_data_t *tcdp,
908f841f6adSraf 	const struct sigevent *sigevp)
909f841f6adSraf {
9107257d1b4Sraf 	int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp,
911f841f6adSraf 	    sigevp->sigev_notify_attributes);
912f841f6adSraf 
913f841f6adSraf 	if (do_cancel) {
914f841f6adSraf 		/*
915f841f6adSraf 		 * Attributes don't match, cancel the spawner thread.
916f841f6adSraf 		 */
917f841f6adSraf 		(void) pthread_cancel(tcdp->tcd_server_id);
918f841f6adSraf 	} else {
919f841f6adSraf 		/*
920f841f6adSraf 		 * Reuse the existing spawner thread with possibly
921f841f6adSraf 		 * changed notification function and value.
922f841f6adSraf 		 */
923f841f6adSraf 		tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
924f841f6adSraf 		tcdp->tcd_notif.sigev_signo = 0;
925f841f6adSraf 		tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
926f841f6adSraf 		tcdp->tcd_notif.sigev_notify_function =
927f841f6adSraf 		    sigevp->sigev_notify_function;
928f841f6adSraf 	}
929f841f6adSraf 
930f841f6adSraf 	return (do_cancel);
931f841f6adSraf }
932f841f6adSraf 
/*
 * mq_notify(): register or remove message-arrival notification for the
 * queue referred to by mqdes.
 *
 * sigevp == NULL	remove the notification held by this descriptor
 * sigevp != NULL	register a notification of type sigevp->sigev_notify
 *			(SIGEV_NONE, SIGEV_SIGNAL, SIGEV_THREAD, SIGEV_PORT)
 *
 * Returns 0 on success, or -1 with errno set:
 *	EBADF	mqdes is not valid, the SIGEV_PORT descriptor is not an
 *		event port, or setup_sigev_handler() failed
 *	EBUSY	removal requested but no notification is registered for
 *		this descriptor in this process
 *	EINVAL	unrecognized sigev_notify value
 *	EBADMSG	mq_exclusive could not be acquired normally
 * Failures of __signotify() or launch_spawner() also return -1; errno is
 * not set here in those cases.
 */
933f841f6adSraf int
9347257d1b4Sraf mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
935f841f6adSraf {
936f841f6adSraf 	mqdes_t *mqdp = (mqdes_t *)mqdes;
937f841f6adSraf 	mqhdr_t *mqhp;
938f841f6adSraf 	thread_communication_data_t *tcdp;
939f841f6adSraf 	siginfo_t mq_siginfo;
940f841f6adSraf 	struct sigevent sigevent;
941f841f6adSraf 	struct stat64 statb;
942f841f6adSraf 	port_notify_t *pn;
943f841f6adSraf 	void *userval;
944f841f6adSraf 	int rval = -1;
945f841f6adSraf 	int ntype;
946f841f6adSraf 	int port;
947*34537ab1Sraf 	int error;
948f841f6adSraf 
949f841f6adSraf 	if (!mq_is_valid(mqdp)) {
950f841f6adSraf 		errno = EBADF;
951f841f6adSraf 		return (-1);
952f841f6adSraf 	}
953f841f6adSraf 
954f841f6adSraf 	mqhp = mqdp->mqd_mq;
955f841f6adSraf 
	/*
	 * If the lock cannot be taken normally, flag the descriptor as
	 * owner-dead and force the removal path by discarding sigevp.
	 * For EOWNERDEAD the mutex is released again right away
	 * (NOTE(review): presumably because a robust mutex is held by the
	 * caller when EOWNERDEAD is returned -- confirm).
	 */
956*34537ab1Sraf 	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
957*34537ab1Sraf 		mqdp->mqd_ownerdead = 1;
958*34537ab1Sraf 		sigevp = NULL;
959*34537ab1Sraf 		if (error == EOWNERDEAD)
960*34537ab1Sraf 			(void) mutex_unlock(&mqhp->mq_exclusive);
961*34537ab1Sraf 		/* carry on regardless, without holding mq_exclusive */
962*34537ab1Sraf 	}
963f841f6adSraf 
	/*
	 * Removal is honored only when the registration recorded in the
	 * queue header names this descriptor and this process.
	 */
963f841f6adSraf 	if (sigevp == NULL) {		/* remove notification */
964f841f6adSraf 		if (mqhp->mq_des == (uintptr_t)mqdp &&
965f841f6adSraf 		    mqhp->mq_sigid.sn_pid == getpid()) {
967f841f6adSraf 			/* notification is set for this descriptor, remove it */
968f841f6adSraf 			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
969f841f6adSraf 			if ((tcdp = mqdp->mqd_tcd) != NULL) {
970f841f6adSraf 				sig_mutex_lock(&tcdp->tcd_lock);
971f841f6adSraf 				if (tcdp->tcd_msg_enabled) {
972f841f6adSraf 					/* cancel the spawner thread */
973f841f6adSraf 					tcdp = mqdp->mqd_tcd;
974f841f6adSraf 					mqdp->mqd_tcd = NULL;
975f841f6adSraf 					(void) pthread_cancel(
976f841f6adSraf 					    tcdp->tcd_server_id);
977f841f6adSraf 				}
978f841f6adSraf 				sig_mutex_unlock(&tcdp->tcd_lock);
979f841f6adSraf 			}
980f841f6adSraf 			mqhp->mq_ntype = 0;
981f841f6adSraf 			mqhp->mq_des = 0;
982f841f6adSraf 		} else {
983f841f6adSraf 			/* notification is not set for this descriptor */
984f841f6adSraf 			errno = EBUSY;
985f841f6adSraf 			goto bad;
986f841f6adSraf 		}
987f841f6adSraf 	} else {		/* register notification with this process */
		/*
		 * Capture the user value and event port up front.  For
		 * SIGEV_PORT the port is validated with fstat64() and the
		 * caller's sigevent is replaced by a zeroed local copy that
		 * carries only sigev_notify.
		 */
988f841f6adSraf 		switch (ntype = sigevp->sigev_notify) {
989f841f6adSraf 		case SIGEV_THREAD:
990f841f6adSraf 			userval = sigevp->sigev_value.sival_ptr;
991f841f6adSraf 			port = -1;
992f841f6adSraf 			break;
993f841f6adSraf 		case SIGEV_PORT:
994f841f6adSraf 			pn = sigevp->sigev_value.sival_ptr;
995f841f6adSraf 			userval = pn->portnfy_user;
996f841f6adSraf 			port = pn->portnfy_port;
997f841f6adSraf 			if (fstat64(port, &statb) != 0 ||
998f841f6adSraf 			    !S_ISPORT(statb.st_mode)) {
999f841f6adSraf 				errno = EBADF;
1000f841f6adSraf 				goto bad;
1001f841f6adSraf 			}
1002f841f6adSraf 			(void) memset(&sigevent, 0, sizeof (sigevent));
1003f841f6adSraf 			sigevent.sigev_notify = SIGEV_PORT;
1004f841f6adSraf 			sigevp = &sigevent;
1005f841f6adSraf 			break;
1006f841f6adSraf 		}
		/*
		 * Build the siginfo handed to __signotify() and, for
		 * thread/port delivery, make sure a spawner thread exists
		 * (reusing the current one when cancel_if_necessary()
		 * reports that it is still suitable).
		 */
1007f841f6adSraf 		switch (ntype) {
1008f841f6adSraf 		case SIGEV_NONE:
1009f841f6adSraf 			mq_siginfo.si_signo = 0;
1010f841f6adSraf 			mq_siginfo.si_code = SI_MESGQ;
1011f841f6adSraf 			break;
1012f841f6adSraf 		case SIGEV_SIGNAL:
1013f841f6adSraf 			mq_siginfo.si_signo = sigevp->sigev_signo;
1014f841f6adSraf 			mq_siginfo.si_value = sigevp->sigev_value;
1015f841f6adSraf 			mq_siginfo.si_code = SI_MESGQ;
1016f841f6adSraf 			break;
1017f841f6adSraf 		case SIGEV_THREAD:
1018f841f6adSraf 			if ((tcdp = mqdp->mqd_tcd) != NULL &&
1019f841f6adSraf 			    cancel_if_necessary(tcdp, sigevp))
1020f841f6adSraf 				mqdp->mqd_tcd = NULL;
1021f841f6adSraf 			/* FALLTHROUGH */
1022f841f6adSraf 		case SIGEV_PORT:
1023f841f6adSraf 			if ((tcdp = mqdp->mqd_tcd) == NULL) {
1024f841f6adSraf 				/* we must create a spawner thread */
1025f841f6adSraf 				tcdp = setup_sigev_handler(sigevp, MQ);
1026f841f6adSraf 				if (tcdp == NULL) {
1027f841f6adSraf 					errno = EBADF;
1028f841f6adSraf 					goto bad;
1029f841f6adSraf 				}
1030f841f6adSraf 				tcdp->tcd_msg_enabled = 0;
1031f841f6adSraf 				tcdp->tcd_msg_closing = 0;
1032f841f6adSraf 				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
1033f841f6adSraf 				if (launch_spawner(tcdp) != 0) {
1034f841f6adSraf 					free_sigev_handler(tcdp);
1035f841f6adSraf 					goto bad;
1036f841f6adSraf 				}
1037f841f6adSraf 				mqdp->mqd_tcd = tcdp;
1038f841f6adSraf 			}
1039f841f6adSraf 			mq_siginfo.si_signo = 0;
1040f841f6adSraf 			mq_siginfo.si_code = SI_MESGQ;
1041f841f6adSraf 			break;
1042f841f6adSraf 		default:
1043f841f6adSraf 			errno = EINVAL;
1044f841f6adSraf 			goto bad;
1045f841f6adSraf 		}
1046f841f6adSraf 
1047f841f6adSraf 		/* register notification */
1048f841f6adSraf 		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
1049f841f6adSraf 			goto bad;
		/* record in the queue header who owns the registration */
1050f841f6adSraf 		mqhp->mq_ntype = ntype;
1051f841f6adSraf 		mqhp->mq_des = (uintptr_t)mqdp;
		/*
		 * For thread/port delivery, hand the spawner its parameters,
		 * enable it under tcd_lock, then broadcast on tcd_cv.
		 */
1052f841f6adSraf 		switch (ntype) {
1053f841f6adSraf 		case SIGEV_THREAD:
1054f841f6adSraf 		case SIGEV_PORT:
1055f841f6adSraf 			tcdp->tcd_port = port;
1056f841f6adSraf 			tcdp->tcd_msg_object = mqdp;
1057f841f6adSraf 			tcdp->tcd_msg_userval = userval;
1058f841f6adSraf 			sig_mutex_lock(&tcdp->tcd_lock);
1059f841f6adSraf 			tcdp->tcd_msg_enabled = ntype;
1060f841f6adSraf 			sig_mutex_unlock(&tcdp->tcd_lock);
1061f841f6adSraf 			(void) cond_broadcast(&tcdp->tcd_cv);
1062f841f6adSraf 			break;
1063f841f6adSraf 		}
1064f841f6adSraf 	}
1065f841f6adSraf 
1066f841f6adSraf 	rval = 0;	/* success */
1067f841f6adSraf bad:
	/*
	 * error == 0 means mq_exclusive was acquired above and is still
	 * held.  Otherwise the call reports EBADMSG regardless of what the
	 * removal path did.
	 */
1068*34537ab1Sraf 	if (error == 0) {
1069f841f6adSraf 		(void) mutex_unlock(&mqhp->mq_exclusive);
1070*34537ab1Sraf 	} else {
1071*34537ab1Sraf 		errno = EBADMSG;
1072*34537ab1Sraf 		rval = -1;
1073*34537ab1Sraf 	}
1074f841f6adSraf 	return (rval);
1075f841f6adSraf }
1076f841f6adSraf 
1077f841f6adSraf int
10787257d1b4Sraf mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
1079f841f6adSraf {
1080f841f6adSraf 	mqdes_t *mqdp = (mqdes_t *)mqdes;
1081f841f6adSraf 	mqhdr_t *mqhp;
1082f841f6adSraf 	uint_t	flag = 0;
1083f841f6adSraf 
1084f841f6adSraf 	if (!mq_is_valid(mqdp)) {
1085f841f6adSraf 		errno = EBADF;
1086f841f6adSraf 		return (-1);
1087f841f6adSraf 	}
1088f841f6adSraf 
1089f841f6adSraf 	/* store current attributes */
1090f841f6adSraf 	if (omqstat != NULL) {
1091f841f6adSraf 		int	count;
1092f841f6adSraf 
1093f841f6adSraf 		mqhp = mqdp->mqd_mq;
1094f841f6adSraf 		omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1095f841f6adSraf 		omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1096f841f6adSraf 		omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1097f841f6adSraf 		(void) sem_getvalue(&mqhp->mq_notempty, &count);
1098f841f6adSraf 		omqstat->mq_curmsgs = count;
1099f841f6adSraf 	}
1100f841f6adSraf 
1101f841f6adSraf 	/* set description attributes */
1102f841f6adSraf 	if ((mqstat->mq_flags & O_NONBLOCK) != 0)
1103f841f6adSraf 		flag = FNONBLOCK;
1104f841f6adSraf 	mqdp->mqd_mqdn->mqdn_flags = flag;
1105f841f6adSraf 
1106f841f6adSraf 	return (0);
1107f841f6adSraf }
1108f841f6adSraf 
1109f841f6adSraf int
11107257d1b4Sraf mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
1111f841f6adSraf {
1112f841f6adSraf 	mqdes_t *mqdp = (mqdes_t *)mqdes;
1113f841f6adSraf 	mqhdr_t *mqhp;
1114f841f6adSraf 	int count;
1115f841f6adSraf 
1116f841f6adSraf 	if (!mq_is_valid(mqdp)) {
1117f841f6adSraf 		errno = EBADF;
1118f841f6adSraf 		return (-1);
1119f841f6adSraf 	}
1120f841f6adSraf 
1121f841f6adSraf 	mqhp = mqdp->mqd_mq;
1122f841f6adSraf 
1123f841f6adSraf 	mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1124f841f6adSraf 	mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1125f841f6adSraf 	mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1126f841f6adSraf 	(void) sem_getvalue(&mqhp->mq_notempty, &count);
1127f841f6adSraf 	mqstat->mq_curmsgs = count;
1128f841f6adSraf 	return (0);
1129f841f6adSraf }
1130f841f6adSraf 
1131f841f6adSraf /*
1132f841f6adSraf  * Cleanup after fork1() in the child process.
1133f841f6adSraf  */
1134f841f6adSraf void
1135f841f6adSraf postfork1_child_sigev_mq(void)
1136f841f6adSraf {
1137f841f6adSraf 	thread_communication_data_t *tcdp;
1138f841f6adSraf 	mqdes_t *mqdp;
1139f841f6adSraf 
1140f841f6adSraf 	for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
1141f841f6adSraf 		if ((tcdp = mqdp->mqd_tcd) != NULL) {
1142f841f6adSraf 			mqdp->mqd_tcd = NULL;
1143f841f6adSraf 			tcd_teardown(tcdp);
1144f841f6adSraf 		}
1145f841f6adSraf 	}
1146f841f6adSraf }
1147