xref: /illumos-gate/usr/src/lib/libc/port/rt/mqueue.c (revision 002c70ff32f5df6f93c15f88d351ce26443e6ee7)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #pragma weak mq_open = _mq_open
30 #pragma weak mq_close = _mq_close
31 #pragma weak mq_unlink = _mq_unlink
32 #pragma weak mq_send = _mq_send
33 #pragma weak mq_timedsend = _mq_timedsend
34 #pragma weak mq_reltimedsend_np = _mq_reltimedsend_np
35 #pragma weak mq_receive = _mq_receive
36 #pragma weak mq_timedreceive = _mq_timedreceive
37 #pragma weak mq_reltimedreceive_np = _mq_reltimedreceive_np
38 #pragma weak mq_notify = _mq_notify
39 #pragma weak mq_setattr = _mq_setattr
40 #pragma weak mq_getattr = _mq_getattr
41 
42 #include "synonyms.h"
43 #include "mtlib.h"
44 #define	_KMEMUSER
45 #include <sys/param.h>		/* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
46 #undef	_KMEMUSER
47 #include <mqueue.h>
48 #include <sys/types.h>
49 #include <sys/file.h>
50 #include <sys/mman.h>
51 #include <errno.h>
52 #include <stdarg.h>
53 #include <limits.h>
54 #include <pthread.h>
55 #include <assert.h>
56 #include <string.h>
57 #include <unistd.h>
58 #include <stdlib.h>
59 #include <sys/stat.h>
60 #include <inttypes.h>
61 #include "sigev_thread.h"
62 #include "pos4obj.h"
63 
64 /*
65  * Default values per message queue
66  */
67 #define	MQ_MAXMSG	128
68 #define	MQ_MAXSIZE	1024
69 
70 #define	MQ_MAGIC	0x4d534751		/* "MSGQ" */
71 
72 /*
73  * Message header which is part of messages in link list
74  */
typedef struct {
	/* offset of the following message on the same list; 0 ends the list */
	uint64_t	msg_next;
	/* number of payload bytes stored right behind this header */
	uint64_t	msg_len;
} msghdr_t;
79 
80 /*
81  * message queue description
82  */
struct mq_dn {
	/* flag word of the open description (holds FNONBLOCK) */
	size_t		mqdn_flags;
};
86 
87 /*
88  * message queue descriptor structure
89  */
90 typedef struct mq_des {
91 	struct mq_des	*mqd_next;	/* list of all open mq descriptors, */
92 	struct mq_des	*mqd_prev;	/* needed for fork-safety */
93 	int		mqd_magic;	/* magic # to identify mq_des */
94 	int		mqd_flags;	/* operation flag per open */
95 	struct mq_header *mqd_mq;	/* address pointer of message Q */
96 	struct mq_dn	*mqd_mqdn;	/* open	description */
97 	thread_communication_data_t *mqd_tcd;	/* SIGEV_THREAD notification */
98 } mqdes_t;
99 
100 /*
101  * message queue common header, part of the mmap()ed file.
102  * Since message queues may be shared between 32- and 64-bit processes,
103  * care must be taken to make sure that the elements of this structure
104  * are identical for both _LP64 and _ILP32 cases.
105  */
106 typedef struct mq_header {
107 	/* first field must be mq_totsize, DO NOT insert before this	*/
108 	int64_t		mq_totsize;	/* total size of the Queue */
109 	int64_t		mq_maxsz;	/* max size of each message */
110 	uint32_t	mq_maxmsg;	/* max messages in the queue */
111 	uint32_t	mq_maxprio;	/* maximum mqueue priority */
112 	uint32_t	mq_curmaxprio;	/* current maximum MQ priority */
113 	uint32_t	mq_mask;	/* priority bitmask */
114 	uint64_t	mq_freep;	/* free message's head pointer */
115 	uint64_t	mq_headpp;	/* pointer to head pointers */
116 	uint64_t	mq_tailpp;	/* pointer to tail pointers */
117 	signotify_id_t	mq_sigid;	/* notification id (3 int's) */
118 	uint32_t	mq_ntype;	/* notification type (SIGEV_*) */
119 	uint64_t	mq_des;		/* pointer to msg Q descriptor */
120 	mutex_t		mq_exclusive;	/* acquire for exclusive access */
121 	sem_t		mq_rblocked;	/* number of processes rblocked */
122 	sem_t		mq_notfull;	/* mq_send()'s block on this */
123 	sem_t		mq_notempty;	/* mq_receive()'s block on this */
124 	sem_t		mq_spawner;	/* spawner thread blocks on this */
125 } mqhdr_t;
126 
127 /*
128  * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
129  * If this assumption is somehow invalidated, mq_open() needs to be changed
130  * back to the old version which kept a count and enforced a limit.
131  * We make sure that this is pointed out to those changing <sys/param.h>
132  * by checking _MQ_OPEN_MAX at compile time.
133  */
134 #if _MQ_OPEN_MAX != -1
135 #error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
136 #endif
137 
#define	MQ_ALIGNSIZE	8	/* 64-bit alignment */

/*
 * Consistency checks that are only compiled in for DEBUG builds.
 * Each macro expands to a single expression or statement WITHOUT a
 * trailing semicolon, so "MQ_ASSERT(x);" is exactly one statement and
 * is safe inside an unbraced if/else.
 *
 * MQ_ASSERT(x)			plain assert().
 * MQ_ASSERT_PTR(_m, _p)	_p is a non-zero, MQ_ALIGNSIZE-aligned
 *				offset that lies inside queue _m
 *				(below _m->mq_totsize).
 * MQ_ASSERT_SEMVAL_LEQ(sem, val) the current value of sem is <= val.
 */
#ifdef DEBUG
#define	MQ_ASSERT(x)	assert(x)

#define	MQ_ASSERT_PTR(_m, _p) \
	assert((_p) != 0 && !((uintptr_t)(_p) & (MQ_ALIGNSIZE - 1)) && \
	    !((uintptr_t)(_m) + (uintptr_t)(_p) >= (uintptr_t)(_m) + \
	    (_m)->mq_totsize))

#define	MQ_ASSERT_SEMVAL_LEQ(sem, val) \
	do { \
		int _val; \
		(void) sem_getvalue((sem), &_val); \
		assert(_val <= (val)); \
	} while (0)
#else
#define	MQ_ASSERT(x)
#define	MQ_ASSERT_PTR(_m, _p)
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
#endif
157 
/*
 * Offset-to-address helpers.  The queue is stored as offsets from the
 * start of the mapping, so every access goes through these.
 *
 * MQ_PTR(m, n)		message header at byte offset n in queue m.
 * HEAD_PTR(m, n)	slot n of the head-offset table of queue m.
 * TAIL_PTR(m, n)	slot n of the tail-offset table of queue m.
 *
 * All arguments are fully parenthesized so that expressions such as
 * "prio + 1" or "&hdr" may be passed safely.
 */
#define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)(m) + (uintptr_t)(n)))
#define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
			(uintptr_t)(m)->mq_headpp + (n) * sizeof (uint64_t)))
#define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
			(uintptr_t)(m)->mq_tailpp + (n) * sizeof (uint64_t)))
163 
164 #define	MQ_RESERVED	((mqdes_t *)-1)
165 
166 #define	ABS_TIME	0
167 #define	REL_TIME	1
168 
169 static mutex_t mq_list_lock = DEFAULTMUTEX;
170 static mqdes_t *mq_list = NULL;
171 
172 extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id);
173 
174 static int
175 mq_is_valid(mqdes_t *mqdp)
176 {
177 	/*
178 	 * Any use of a message queue after it was closed is
179 	 * undefined.  But the standard strongly favours EBADF
180 	 * returns.  Before we dereference which could be fatal,
181 	 * we first do some pointer sanity checks.
182 	 */
183 	if (mqdp != NULL && mqdp != MQ_RESERVED &&
184 	    ((uintptr_t)mqdp & 0x7) == 0) {
185 		return (mqdp->mqd_magic == MQ_MAGIC);
186 	}
187 
188 	return (0);
189 }
190 
191 static void
192 mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
193 {
194 	int		i;
195 	uint64_t	temp;
196 	uint64_t	currentp;
197 	uint64_t	nextp;
198 
199 	/*
200 	 * We only need to initialize the non-zero fields.  The use of
201 	 * ftruncate() on the message queue file assures that the
202 	 * pages will be zfod.
203 	 */
204 	(void) mutex_init(&mqhp->mq_exclusive, USYNC_PROCESS, NULL);
205 	(void) sem_init(&mqhp->mq_rblocked, 1, 0);
206 	(void) sem_init(&mqhp->mq_notempty, 1, 0);
207 	(void) sem_init(&mqhp->mq_spawner, 1, 0);
208 	(void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);
209 
210 	mqhp->mq_maxsz = msgsize;
211 	mqhp->mq_maxmsg = maxmsg;
212 
213 	/*
214 	 * As of this writing (1997), there are 32 message queue priorities.
215 	 * If this is to change, then the size of the mq_mask will
216 	 * also have to change.  If DEBUG is defined, assert that
217 	 * _MQ_PRIO_MAX hasn't changed.
218 	 */
219 	mqhp->mq_maxprio = _MQ_PRIO_MAX;
220 #if defined(DEBUG)
221 	/* LINTED always true */
222 	MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
223 #endif
224 
225 	/*
226 	 * Since the message queue can be mapped into different
227 	 * virtual address ranges by different processes, we don't
228 	 * keep track of pointers, only offsets into the shared region.
229 	 */
230 	mqhp->mq_headpp = sizeof (mqhdr_t);
231 	mqhp->mq_tailpp = mqhp->mq_headpp +
232 		mqhp->mq_maxprio * sizeof (uint64_t);
233 	mqhp->mq_freep = mqhp->mq_tailpp +
234 		mqhp->mq_maxprio * sizeof (uint64_t);
235 
236 	currentp = mqhp->mq_freep;
237 	MQ_PTR(mqhp, currentp)->msg_next = 0;
238 
239 	temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
240 	for (i = 1; i < mqhp->mq_maxmsg; i++) {
241 		nextp = currentp + sizeof (msghdr_t) + temp;
242 		MQ_PTR(mqhp, currentp)->msg_next = nextp;
243 		MQ_PTR(mqhp, nextp)->msg_next = 0;
244 		currentp = nextp;
245 	}
246 }
247 
248 static size_t
249 mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
250 {
251 	uint64_t currentp;
252 	msghdr_t *curbuf;
253 	uint64_t *headpp;
254 	uint64_t *tailpp;
255 
256 	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
257 
258 	/*
259 	 * Get the head and tail pointers for the queue of maximum
260 	 * priority.  We shouldn't be here unless there is a message for
261 	 * us, so it's fair to assert that both the head and tail
262 	 * pointers are non-NULL.
263 	 */
264 	headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
265 	tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);
266 
267 	if (msg_prio != NULL)
268 		*msg_prio = mqhp->mq_curmaxprio;
269 
270 	currentp = *headpp;
271 	MQ_ASSERT_PTR(mqhp, currentp);
272 	curbuf = MQ_PTR(mqhp, currentp);
273 
274 	if ((*headpp = curbuf->msg_next) == NULL) {
275 		/*
276 		 * We just nuked the last message in this priority's queue.
277 		 * Twiddle this priority's bit, and then find the next bit
278 		 * tipped.
279 		 */
280 		uint_t prio = mqhp->mq_curmaxprio;
281 
282 		mqhp->mq_mask &= ~(1u << prio);
283 
284 		for (; prio != 0; prio--)
285 			if (mqhp->mq_mask & (1u << prio))
286 				break;
287 		mqhp->mq_curmaxprio = prio;
288 
289 		*tailpp = NULL;
290 	}
291 
292 	/*
293 	 * Copy the message, and put the buffer back on the free list.
294 	 */
295 	(void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
296 	curbuf->msg_next = mqhp->mq_freep;
297 	mqhp->mq_freep = currentp;
298 
299 	return (curbuf->msg_len);
300 }
301 
302 
303 static void
304 mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
305 {
306 	uint64_t currentp;
307 	msghdr_t *curbuf;
308 	uint64_t *headpp;
309 	uint64_t *tailpp;
310 
311 	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
312 
313 	/*
314 	 * Grab a free message block, and link it in.  We shouldn't
315 	 * be here unless there is room in the queue for us;  it's
316 	 * fair to assert that the free pointer is non-NULL.
317 	 */
318 	currentp = mqhp->mq_freep;
319 	MQ_ASSERT_PTR(mqhp, currentp);
320 	curbuf = MQ_PTR(mqhp, currentp);
321 
322 	/*
323 	 * Remove a message from the free list, and copy in the new contents.
324 	 */
325 	mqhp->mq_freep = curbuf->msg_next;
326 	curbuf->msg_next = NULL;
327 	(void) memcpy((char *)&curbuf[1], msgp, len);
328 	curbuf->msg_len = len;
329 
330 	headpp = HEAD_PTR(mqhp, prio);
331 	tailpp = TAIL_PTR(mqhp, prio);
332 
333 	if (*tailpp == 0) {
334 		/*
335 		 * This is the first message on this queue.  Set the
336 		 * head and tail pointers, and tip the appropriate bit
337 		 * in the priority mask.
338 		 */
339 		*headpp = currentp;
340 		*tailpp = currentp;
341 		mqhp->mq_mask |= (1u << prio);
342 		if (prio > mqhp->mq_curmaxprio)
343 			mqhp->mq_curmaxprio = prio;
344 	} else {
345 		MQ_ASSERT_PTR(mqhp, *tailpp);
346 		MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
347 		*tailpp = currentp;
348 	}
349 }
350 
351 mqd_t
352 _mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
353 {
354 	va_list		ap;
355 	mode_t		mode;
356 	struct mq_attr	*attr;
357 	int		fd;
358 	int		err;
359 	int		cr_flag = 0;
360 	int		locked = 0;
361 	uint64_t	total_size;
362 	size_t		msgsize;
363 	ssize_t		maxmsg;
364 	uint64_t	temp;
365 	void		*ptr;
366 	mqdes_t		*mqdp;
367 	mqhdr_t		*mqhp;
368 	struct mq_dn	*mqdnp;
369 
370 	if (__pos4obj_check(path) == -1)
371 		return ((mqd_t)-1);
372 
373 	/* acquire MSGQ lock to have atomic operation */
374 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
375 		goto out;
376 	locked = 1;
377 
378 	va_start(ap, oflag);
379 	/* filter oflag to have READ/WRITE/CREATE modes only */
380 	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
381 	if ((oflag & O_CREAT) != 0) {
382 		mode = va_arg(ap, mode_t);
383 		attr = va_arg(ap, struct mq_attr *);
384 	}
385 	va_end(ap);
386 
387 	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
388 	    mode, &cr_flag)) < 0)
389 		goto out;
390 
391 	/* closing permission file */
392 	(void) __close_nc(fd);
393 
394 	/* Try to open/create data file */
395 	if (cr_flag) {
396 		cr_flag = PFILE_CREATE;
397 		if (attr == NULL) {
398 			maxmsg = MQ_MAXMSG;
399 			msgsize = MQ_MAXSIZE;
400 		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
401 			errno = EINVAL;
402 			goto out;
403 		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
404 			errno = ENOSPC;
405 			goto out;
406 		} else {
407 			maxmsg = attr->mq_maxmsg;
408 			msgsize = attr->mq_msgsize;
409 		}
410 
411 		/* adjust for message size at word boundary */
412 		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
413 
414 		total_size = sizeof (mqhdr_t) +
415 			maxmsg * (temp + sizeof (msghdr_t)) +
416 			2 * _MQ_PRIO_MAX * sizeof (uint64_t);
417 
418 		if (total_size > SSIZE_MAX) {
419 			errno = ENOSPC;
420 			goto out;
421 		}
422 
423 		/*
424 		 * data file is opened with read/write to those
425 		 * who have read or write permission
426 		 */
427 		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
428 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
429 		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
430 			goto out;
431 
432 		cr_flag |= DFILE_CREATE | DFILE_OPEN;
433 
434 		/* force permissions to avoid umask effect */
435 		if (fchmod(fd, mode) < 0)
436 			goto out;
437 
438 		if (ftruncate64(fd, (off64_t)total_size) < 0)
439 			goto out;
440 	} else {
441 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
442 		    O_RDWR, 0666, &err)) < 0)
443 			goto out;
444 		cr_flag = DFILE_OPEN;
445 
446 		/* Message queue has not been initialized yet */
447 		if (read(fd, &total_size, sizeof (total_size)) !=
448 		    sizeof (total_size) || total_size == 0) {
449 			errno = ENOENT;
450 			goto out;
451 		}
452 
453 		/* Message queue too big for this process to handle */
454 		if (total_size > SSIZE_MAX) {
455 			errno = EFBIG;
456 			goto out;
457 		}
458 	}
459 
460 	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
461 		errno = ENOMEM;
462 		goto out;
463 	}
464 	cr_flag |= ALLOC_MEM;
465 
466 	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
467 	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
468 		goto out;
469 	mqhp = ptr;
470 	cr_flag |= DFILE_MMAP;
471 
472 	/* closing data file */
473 	(void) __close_nc(fd);
474 	cr_flag &= ~DFILE_OPEN;
475 
476 	/*
477 	 * create, unlink, size, mmap, and close description file
478 	 * all for a flag word in anonymous shared memory
479 	 */
480 	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
481 	    0666, &err)) < 0)
482 		goto out;
483 	cr_flag |= DFILE_OPEN;
484 	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
485 	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
486 		goto out;
487 
488 	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
489 	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
490 		goto out;
491 	mqdnp = ptr;
492 	cr_flag |= MQDNP_MMAP;
493 
494 	(void) __close_nc(fd);
495 	cr_flag &= ~DFILE_OPEN;
496 
497 	/*
498 	 * we follow the same strategy as filesystem open() routine,
499 	 * where fcntl.h flags are changed to flags defined in file.h.
500 	 */
501 	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
502 	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);
503 
504 	/* new message queue requires initialization */
505 	if ((cr_flag & DFILE_CREATE) != 0) {
506 		/* message queue header has to be initialized */
507 		mq_init(mqhp, msgsize, maxmsg);
508 		mqhp->mq_totsize = total_size;
509 	}
510 	mqdp->mqd_mq = mqhp;
511 	mqdp->mqd_mqdn = mqdnp;
512 	mqdp->mqd_magic = MQ_MAGIC;
513 	mqdp->mqd_tcd = NULL;
514 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
515 		lmutex_lock(&mq_list_lock);
516 		mqdp->mqd_next = mq_list;
517 		mqdp->mqd_prev = NULL;
518 		if (mq_list)
519 			mq_list->mqd_prev = mqdp;
520 		mq_list = mqdp;
521 		lmutex_unlock(&mq_list_lock);
522 		return ((mqd_t)mqdp);
523 	}
524 
525 	locked = 0;	/* fall into the error case */
526 out:
527 	err = errno;
528 	if ((cr_flag & DFILE_OPEN) != 0)
529 		(void) __close_nc(fd);
530 	if ((cr_flag & DFILE_CREATE) != 0)
531 		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
532 	if ((cr_flag & PFILE_CREATE) != 0)
533 		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
534 	if ((cr_flag & ALLOC_MEM) != 0)
535 		free((void *)mqdp);
536 	if ((cr_flag & DFILE_MMAP) != 0)
537 		(void) munmap((caddr_t)mqhp, (size_t)total_size);
538 	if ((cr_flag & MQDNP_MMAP) != 0)
539 		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
540 	if (locked)
541 		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
542 	errno = err;
543 	return ((mqd_t)-1);
544 }
545 
546 static void
547 mq_close_cleanup(mqdes_t *mqdp)
548 {
549 	mqhdr_t *mqhp = mqdp->mqd_mq;
550 	struct mq_dn *mqdnp = mqdp->mqd_mqdn;
551 
552 	/* invalidate the descriptor before freeing it */
553 	mqdp->mqd_magic = 0;
554 	(void) mutex_unlock(&mqhp->mq_exclusive);
555 
556 	lmutex_lock(&mq_list_lock);
557 	if (mqdp->mqd_next)
558 		mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
559 	if (mqdp->mqd_prev)
560 		mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
561 	if (mq_list == mqdp)
562 		mq_list = mqdp->mqd_next;
563 	lmutex_unlock(&mq_list_lock);
564 
565 	free(mqdp);
566 	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
567 	(void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
568 }
569 
570 int
571 _mq_close(mqd_t mqdes)
572 {
573 	mqdes_t *mqdp = (mqdes_t *)mqdes;
574 	mqhdr_t *mqhp;
575 	thread_communication_data_t *tcdp;
576 
577 	if (!mq_is_valid(mqdp)) {
578 		errno = EBADF;
579 		return (-1);
580 	}
581 
582 	mqhp = mqdp->mqd_mq;
583 	(void) mutex_lock(&mqhp->mq_exclusive);
584 
585 	if (mqhp->mq_des == (uintptr_t)mqdp &&
586 	    mqhp->mq_sigid.sn_pid == getpid()) {
587 		/* notification is set for this descriptor, remove it */
588 		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
589 		mqhp->mq_ntype = 0;
590 		mqhp->mq_des = 0;
591 	}
592 
593 	pthread_cleanup_push(mq_close_cleanup, mqdp);
594 	if ((tcdp = mqdp->mqd_tcd) != NULL) {
595 		mqdp->mqd_tcd = NULL;
596 		del_sigev_mq(tcdp);	/* possible cancellation point */
597 	}
598 	pthread_cleanup_pop(1);		/* finish in the cleanup handler */
599 
600 	return (0);
601 }
602 
603 int
604 _mq_unlink(const char *path)
605 {
606 	int err;
607 
608 	if (__pos4obj_check(path) < 0)
609 		return (-1);
610 
611 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
612 		return (-1);
613 	}
614 
615 	err = __pos4obj_unlink(path, MQ_PERM_TYPE);
616 
617 	if (err == 0 || (err == -1 && errno == EEXIST)) {
618 		errno = 0;
619 		err = __pos4obj_unlink(path, MQ_DATA_TYPE);
620 	}
621 
622 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
623 		return (-1);
624 
625 	return (err);
626 
627 }
628 
629 static int
630 __mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
631 	uint_t msg_prio, const timespec_t *timeout, int abs_rel)
632 {
633 	mqdes_t *mqdp = (mqdes_t *)mqdes;
634 	mqhdr_t *mqhp;
635 	int err;
636 	int notify = 0;
637 
638 	/*
639 	 * sem_*wait() does cancellation, if called.
640 	 * pthread_testcancel() ensures that cancellation takes place if
641 	 * there is a cancellation pending when mq_*send() is called.
642 	 */
643 	pthread_testcancel();
644 
645 	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
646 		errno = EBADF;
647 		return (-1);
648 	}
649 
650 	mqhp = mqdp->mqd_mq;
651 
652 	if (msg_prio >= mqhp->mq_maxprio) {
653 		errno = EINVAL;
654 		return (-1);
655 	}
656 	if (msg_len > mqhp->mq_maxsz) {
657 		errno = EMSGSIZE;
658 		return (-1);
659 	}
660 
661 	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
662 		err = sem_trywait(&mqhp->mq_notfull);
663 	else {
664 		/*
665 		 * We might get cancelled here...
666 		 */
667 		if (timeout == NULL)
668 			err = sem_wait(&mqhp->mq_notfull);
669 		else if (abs_rel == ABS_TIME)
670 			err = sem_timedwait(&mqhp->mq_notfull, timeout);
671 		else
672 			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
673 	}
674 	if (err == -1) {
675 		/*
676 		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
677 		 * by sem_*wait(), so we can just return.
678 		 */
679 		return (-1);
680 	}
681 
682 	/*
683 	 * By the time we're here, we know that we've got the capacity
684 	 * to add to the queue...now acquire the exclusive lock.
685 	 */
686 	(void) mutex_lock(&mqhp->mq_exclusive);
687 
688 	/*
689 	 * Now determine if we want to kick the notification.  POSIX
690 	 * requires that if a process has registered for notification,
691 	 * we must kick it when the queue makes an empty to non-empty
692 	 * transition, and there are no blocked receivers.  Note that
693 	 * this mechanism does _not_ guarantee that the kicked process
694 	 * will be able to receive a message without blocking;
695 	 * another receiver could intervene in the meantime.  Thus,
696 	 * the notification mechanism is inherently racy; all we can
697 	 * do is hope to minimize the window as much as possible.
698 	 * In general, we want to avoid kicking the notification when
699 	 * there are clearly receivers blocked.  We'll determine if
700 	 * we want to kick the notification before the mq_putmsg(),
701 	 * but the actual signotify() won't be done until the message
702 	 * is on the queue.
703 	 */
704 	if (mqhp->mq_sigid.sn_pid != 0) {
705 		int nmessages, nblocked;
706 
707 		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
708 		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);
709 
710 		if (nmessages == 0 && nblocked == 0)
711 			notify = 1;
712 	}
713 
714 	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
715 	(void) sem_post(&mqhp->mq_notempty);
716 
717 	if (notify) {
718 		/* notify and also delete the registration */
719 		(void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
720 		if (mqhp->mq_ntype == SIGEV_THREAD ||
721 		    mqhp->mq_ntype == SIGEV_PORT)
722 			(void) sem_post(&mqhp->mq_spawner);
723 		mqhp->mq_ntype = 0;
724 		mqhp->mq_des = 0;
725 	}
726 
727 	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
728 	(void) mutex_unlock(&mqhp->mq_exclusive);
729 
730 	return (0);
731 }
732 
733 int
734 _mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
735 {
736 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
737 		NULL, ABS_TIME));
738 }
739 
740 int
741 _mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
742 	uint_t msg_prio, const timespec_t *abs_timeout)
743 {
744 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
745 		abs_timeout, ABS_TIME));
746 }
747 
748 int
749 _mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
750 	uint_t msg_prio, const timespec_t *rel_timeout)
751 {
752 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
753 		rel_timeout, REL_TIME));
754 }
755 
756 static void
757 decrement_rblocked(mqhdr_t *mqhp)
758 {
759 	int canstate;
760 
761 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &canstate);
762 	while (sem_wait(&mqhp->mq_rblocked) == -1)
763 		continue;
764 	(void) pthread_setcancelstate(canstate, NULL);
765 }
766 
767 static ssize_t
768 __mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
769 	uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
770 {
771 	mqdes_t *mqdp = (mqdes_t *)mqdes;
772 	mqhdr_t *mqhp;
773 	ssize_t	msg_size;
774 	int err;
775 
776 	/*
777 	 * sem_*wait() does cancellation, if called.
778 	 * pthread_testcancel() ensures that cancellation takes place if
779 	 * there is a cancellation pending when mq_*receive() is called.
780 	 */
781 	pthread_testcancel();
782 
783 	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
784 		errno = EBADF;
785 		return (ssize_t)(-1);
786 	}
787 
788 	mqhp = mqdp->mqd_mq;
789 
790 	if (msg_len < mqhp->mq_maxsz) {
791 		errno = EMSGSIZE;
792 		return (ssize_t)(-1);
793 	}
794 
795 	/*
796 	 * The semaphoring scheme for mq_[timed]receive is a little hairy
797 	 * thanks to POSIX.1b's arcane notification mechanism.  First,
798 	 * we try to take the common case and do a sem_trywait().
799 	 * If that doesn't work, and O_NONBLOCK hasn't been set,
800 	 * then note that we're going to sleep by incrementing the rblocked
801 	 * semaphore.  We decrement that semaphore after waking up.
802 	 */
803 	if (sem_trywait(&mqhp->mq_notempty) == -1) {
804 		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
805 			/*
806 			 * errno has been set to EAGAIN or EINTR by
807 			 * sem_trywait(), so we can just return.
808 			 */
809 			return (-1);
810 		}
811 		/*
812 		 * If we're here, then we're probably going to block...
813 		 * increment the rblocked semaphore.  If we get
814 		 * cancelled, decrement_rblocked() will decrement it.
815 		 */
816 		(void) sem_post(&mqhp->mq_rblocked);
817 
818 		pthread_cleanup_push(decrement_rblocked, mqhp);
819 		if (timeout == NULL)
820 			err = sem_wait(&mqhp->mq_notempty);
821 		else if (abs_rel == ABS_TIME)
822 			err = sem_timedwait(&mqhp->mq_notempty, timeout);
823 		else
824 			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
825 		pthread_cleanup_pop(1);
826 
827 		if (err == -1) {
828 			/*
829 			 * We took a signal or timeout while waiting
830 			 * on mq_notempty...
831 			 */
832 			return (-1);
833 		}
834 	}
835 
836 	(void) mutex_lock(&mqhp->mq_exclusive);
837 	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
838 	(void) sem_post(&mqhp->mq_notfull);
839 	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
840 	(void) mutex_unlock(&mqhp->mq_exclusive);
841 
842 	return (msg_size);
843 }
844 
845 ssize_t
846 _mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
847 {
848 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
849 		NULL, ABS_TIME));
850 }
851 
852 ssize_t
853 _mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
854 	uint_t *msg_prio, const timespec_t *abs_timeout)
855 {
856 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
857 		abs_timeout, ABS_TIME));
858 }
859 
860 ssize_t
861 _mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
862 	uint_t *msg_prio, const timespec_t *rel_timeout)
863 {
864 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
865 		rel_timeout, REL_TIME));
866 }
867 
868 /*
869  * Only used below, in _mq_notify().
870  * We already have a spawner thread.
871  * Verify that the attributes match; cancel it if necessary.
872  */
873 static int
874 cancel_if_necessary(thread_communication_data_t *tcdp,
875 	const struct sigevent *sigevp)
876 {
877 	int do_cancel = !_pthread_attr_equal(tcdp->tcd_attrp,
878 			    sigevp->sigev_notify_attributes);
879 
880 	if (do_cancel) {
881 		/*
882 		 * Attributes don't match, cancel the spawner thread.
883 		 */
884 		(void) pthread_cancel(tcdp->tcd_server_id);
885 	} else {
886 		/*
887 		 * Reuse the existing spawner thread with possibly
888 		 * changed notification function and value.
889 		 */
890 		tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
891 		tcdp->tcd_notif.sigev_signo = 0;
892 		tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
893 		tcdp->tcd_notif.sigev_notify_function =
894 			sigevp->sigev_notify_function;
895 	}
896 
897 	return (do_cancel);
898 }
899 
900 int
901 _mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
902 {
903 	mqdes_t *mqdp = (mqdes_t *)mqdes;
904 	mqhdr_t *mqhp;
905 	thread_communication_data_t *tcdp;
906 	siginfo_t mq_siginfo;
907 	struct sigevent sigevent;
908 	struct stat64 statb;
909 	port_notify_t *pn;
910 	void *userval;
911 	int rval = -1;
912 	int ntype;
913 	int port;
914 
915 	if (!mq_is_valid(mqdp)) {
916 		errno = EBADF;
917 		return (-1);
918 	}
919 
920 	mqhp = mqdp->mqd_mq;
921 
922 	(void) mutex_lock(&mqhp->mq_exclusive);
923 
924 	if (sigevp == NULL) {		/* remove notification */
925 		if (mqhp->mq_des == (uintptr_t)mqdp &&
926 		    mqhp->mq_sigid.sn_pid == getpid()) {
927 			/* notification is set for this descriptor, remove it */
928 			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
929 			if ((tcdp = mqdp->mqd_tcd) != NULL) {
930 				sig_mutex_lock(&tcdp->tcd_lock);
931 				if (tcdp->tcd_msg_enabled) {
932 					/* cancel the spawner thread */
933 					tcdp = mqdp->mqd_tcd;
934 					mqdp->mqd_tcd = NULL;
935 					(void) pthread_cancel(
936 					    tcdp->tcd_server_id);
937 				}
938 				sig_mutex_unlock(&tcdp->tcd_lock);
939 			}
940 			mqhp->mq_ntype = 0;
941 			mqhp->mq_des = 0;
942 		} else {
943 			/* notification is not set for this descriptor */
944 			errno = EBUSY;
945 			goto bad;
946 		}
947 	} else {		/* register notification with this process */
948 		switch (ntype = sigevp->sigev_notify) {
949 		case SIGEV_THREAD:
950 			userval = sigevp->sigev_value.sival_ptr;
951 			port = -1;
952 			break;
953 		case SIGEV_PORT:
954 			pn = sigevp->sigev_value.sival_ptr;
955 			userval = pn->portnfy_user;
956 			port = pn->portnfy_port;
957 			if (fstat64(port, &statb) != 0 ||
958 			    !S_ISPORT(statb.st_mode)) {
959 				errno = EBADF;
960 				goto bad;
961 			}
962 			(void) memset(&sigevent, 0, sizeof (sigevent));
963 			sigevent.sigev_notify = SIGEV_PORT;
964 			sigevp = &sigevent;
965 			break;
966 		}
967 		switch (ntype) {
968 		case SIGEV_NONE:
969 			mq_siginfo.si_signo = 0;
970 			mq_siginfo.si_code = SI_MESGQ;
971 			break;
972 		case SIGEV_SIGNAL:
973 			mq_siginfo.si_signo = sigevp->sigev_signo;
974 			mq_siginfo.si_value = sigevp->sigev_value;
975 			mq_siginfo.si_code = SI_MESGQ;
976 			break;
977 		case SIGEV_THREAD:
978 			if ((tcdp = mqdp->mqd_tcd) != NULL &&
979 			    cancel_if_necessary(tcdp, sigevp))
980 				mqdp->mqd_tcd = NULL;
981 			/* FALLTHROUGH */
982 		case SIGEV_PORT:
983 			if ((tcdp = mqdp->mqd_tcd) == NULL) {
984 				/* we must create a spawner thread */
985 				tcdp = setup_sigev_handler(sigevp, MQ);
986 				if (tcdp == NULL) {
987 					errno = EBADF;
988 					goto bad;
989 				}
990 				tcdp->tcd_msg_enabled = 0;
991 				tcdp->tcd_msg_closing = 0;
992 				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
993 				if (launch_spawner(tcdp) != 0) {
994 					free_sigev_handler(tcdp);
995 					goto bad;
996 				}
997 				mqdp->mqd_tcd = tcdp;
998 			}
999 			mq_siginfo.si_signo = 0;
1000 			mq_siginfo.si_code = SI_MESGQ;
1001 			break;
1002 		default:
1003 			errno = EINVAL;
1004 			goto bad;
1005 		}
1006 
1007 		/* register notification */
1008 		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
1009 			goto bad;
1010 		mqhp->mq_ntype = ntype;
1011 		mqhp->mq_des = (uintptr_t)mqdp;
1012 		switch (ntype) {
1013 		case SIGEV_THREAD:
1014 		case SIGEV_PORT:
1015 			tcdp->tcd_port = port;
1016 			tcdp->tcd_msg_object = mqdp;
1017 			tcdp->tcd_msg_userval = userval;
1018 			sig_mutex_lock(&tcdp->tcd_lock);
1019 			tcdp->tcd_msg_enabled = ntype;
1020 			sig_mutex_unlock(&tcdp->tcd_lock);
1021 			(void) cond_broadcast(&tcdp->tcd_cv);
1022 			break;
1023 		}
1024 	}
1025 
1026 	rval = 0;	/* success */
1027 bad:
1028 	(void) mutex_unlock(&mqhp->mq_exclusive);
1029 	return (rval);
1030 }
1031 
1032 int
1033 _mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
1034 {
1035 	mqdes_t *mqdp = (mqdes_t *)mqdes;
1036 	mqhdr_t *mqhp;
1037 	uint_t	flag = 0;
1038 
1039 	if (!mq_is_valid(mqdp)) {
1040 		errno = EBADF;
1041 		return (-1);
1042 	}
1043 
1044 	/* store current attributes */
1045 	if (omqstat != NULL) {
1046 		int	count;
1047 
1048 		mqhp = mqdp->mqd_mq;
1049 		omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1050 		omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1051 		omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1052 		(void) sem_getvalue(&mqhp->mq_notempty, &count);
1053 		omqstat->mq_curmsgs = count;
1054 	}
1055 
1056 	/* set description attributes */
1057 	if ((mqstat->mq_flags & O_NONBLOCK) != 0)
1058 		flag = FNONBLOCK;
1059 	mqdp->mqd_mqdn->mqdn_flags = flag;
1060 
1061 	return (0);
1062 }
1063 
1064 int
1065 _mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
1066 {
1067 	mqdes_t *mqdp = (mqdes_t *)mqdes;
1068 	mqhdr_t *mqhp;
1069 	int count;
1070 
1071 	if (!mq_is_valid(mqdp)) {
1072 		errno = EBADF;
1073 		return (-1);
1074 	}
1075 
1076 	mqhp = mqdp->mqd_mq;
1077 
1078 	mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1079 	mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1080 	mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1081 	(void) sem_getvalue(&mqhp->mq_notempty, &count);
1082 	mqstat->mq_curmsgs = count;
1083 	return (0);
1084 }
1085 
1086 /*
1087  * Cleanup after fork1() in the child process.
1088  */
1089 void
1090 postfork1_child_sigev_mq(void)
1091 {
1092 	thread_communication_data_t *tcdp;
1093 	mqdes_t *mqdp;
1094 
1095 	for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
1096 		if ((tcdp = mqdp->mqd_tcd) != NULL) {
1097 			mqdp->mqd_tcd = NULL;
1098 			tcd_teardown(tcdp);
1099 		}
1100 	}
1101 }
1102