xref: /illumos-gate/usr/src/lib/libc/port/rt/mqueue.c (revision 75eba5b6d79ed4d2ce3daf7b2806306b6b69a938)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include "lint.h"
30 #include "mtlib.h"
31 #define	_KMEMUSER
32 #include <sys/param.h>		/* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
33 #undef	_KMEMUSER
34 #include <mqueue.h>
35 #include <sys/types.h>
36 #include <sys/file.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #include <stdarg.h>
40 #include <limits.h>
41 #include <pthread.h>
42 #include <assert.h>
43 #include <string.h>
44 #include <unistd.h>
45 #include <stdlib.h>
46 #include <sys/stat.h>
47 #include <inttypes.h>
48 #include "sigev_thread.h"
49 #include "pos4obj.h"
50 
51 /*
52  * Default values per message queue
53  */
54 #define	MQ_MAXMSG	128
55 #define	MQ_MAXSIZE	1024
56 
57 #define	MQ_MAGIC	0x4d534751		/* "MSGQ" */
58 
59 /*
60  * Message header which is part of messages in link list
61  */
62 typedef struct {
63 	uint64_t 	msg_next;	/* offset of next message in the link */
64 	uint64_t	msg_len;	/* length of the message */
65 } msghdr_t;
66 
67 /*
68  * message queue description
69  */
70 struct mq_dn {
71 	size_t		mqdn_flags;	/* open description flags */
72 };
73 
74 /*
75  * message queue descriptor structure
76  */
77 typedef struct mq_des {
78 	struct mq_des	*mqd_next;	/* list of all open mq descriptors, */
79 	struct mq_des	*mqd_prev;	/* needed for fork-safety */
80 	int		mqd_magic;	/* magic # to identify mq_des */
81 	int		mqd_flags;	/* operation flag per open */
82 	struct mq_header *mqd_mq;	/* address pointer of message Q */
83 	struct mq_dn	*mqd_mqdn;	/* open	description */
84 	thread_communication_data_t *mqd_tcd;	/* SIGEV_THREAD notification */
85 	int		mqd_ownerdead;	/* mq_exclusive is inconsistent */
86 } mqdes_t;
87 
88 /*
89  * message queue common header, part of the mmap()ed file.
90  * Since message queues may be shared between 32- and 64-bit processes,
91  * care must be taken to make sure that the elements of this structure
92  * are identical for both _LP64 and _ILP32 cases.
93  */
94 typedef struct mq_header {
95 	/* first field must be mq_totsize, DO NOT insert before this	*/
96 	int64_t		mq_totsize;	/* total size of the Queue */
97 	int64_t		mq_maxsz;	/* max size of each message */
98 	uint32_t	mq_maxmsg;	/* max messages in the queue */
99 	uint32_t	mq_maxprio;	/* maximum mqueue priority */
100 	uint32_t	mq_curmaxprio;	/* current maximum MQ priority */
101 	uint32_t	mq_mask;	/* priority bitmask */
102 	uint64_t	mq_freep;	/* free message's head pointer */
103 	uint64_t	mq_headpp;	/* pointer to head pointers */
104 	uint64_t	mq_tailpp;	/* pointer to tail pointers */
105 	signotify_id_t	mq_sigid;	/* notification id (3 int's) */
106 	uint32_t	mq_ntype;	/* notification type (SIGEV_*) */
107 	uint64_t	mq_des;		/* pointer to msg Q descriptor */
108 	mutex_t		mq_exclusive;	/* acquire for exclusive access */
109 	sem_t		mq_rblocked;	/* number of processes rblocked */
110 	sem_t		mq_notfull;	/* mq_send()'s block on this */
111 	sem_t		mq_notempty;	/* mq_receive()'s block on this */
112 	sem_t		mq_spawner;	/* spawner thread blocks on this */
113 } mqhdr_t;
114 
115 /*
116  * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
117  * If this assumption is somehow invalidated, mq_open() needs to be changed
118  * back to the old version which kept a count and enforced a limit.
119  * We make sure that this is pointed out to those changing <sys/param.h>
120  * by checking _MQ_OPEN_MAX at compile time.
121  */
122 #if _MQ_OPEN_MAX != -1
123 #error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
124 #endif
125 
126 #define	MQ_ALIGNSIZE	8	/* 64-bit alignment */
127 
128 #ifdef DEBUG
129 #define	MQ_ASSERT(x)	assert(x);
130 
131 #define	MQ_ASSERT_PTR(_m, _p) \
132 	assert((_p) != NULL && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \
133 	    !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \
134 	    _m->mq_totsize));
135 
136 #define	MQ_ASSERT_SEMVAL_LEQ(sem, val) { \
137 	int _val; \
138 	(void) sem_getvalue((sem), &_val); \
139 	assert((_val) <= val); }
140 #else
141 #define	MQ_ASSERT(x)
142 #define	MQ_ASSERT_PTR(_m, _p)
143 #define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
144 #endif
145 
146 #define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)m + (uintptr_t)n))
147 #define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
148 			(uintptr_t)m->mq_headpp + n * sizeof (uint64_t)))
149 #define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
150 			(uintptr_t)m->mq_tailpp + n * sizeof (uint64_t)))
151 
152 #define	MQ_RESERVED	((mqdes_t *)-1)
153 
154 #define	ABS_TIME	0
155 #define	REL_TIME	1
156 
157 static mutex_t mq_list_lock = DEFAULTMUTEX;
158 static mqdes_t *mq_list = NULL;
159 
160 extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id);
161 
162 static int
163 mq_is_valid(mqdes_t *mqdp)
164 {
165 	/*
166 	 * Any use of a message queue after it was closed is
167 	 * undefined.  But the standard strongly favours EBADF
168 	 * returns.  Before we dereference which could be fatal,
169 	 * we first do some pointer sanity checks.
170 	 */
171 	if (mqdp != NULL && mqdp != MQ_RESERVED &&
172 	    ((uintptr_t)mqdp & 0x7) == 0) {
173 		return (mqdp->mqd_magic == MQ_MAGIC);
174 	}
175 
176 	return (0);
177 }
178 
179 static void
180 mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
181 {
182 	int		i;
183 	uint64_t	temp;
184 	uint64_t	currentp;
185 	uint64_t	nextp;
186 
187 	/*
188 	 * We only need to initialize the non-zero fields.  The use of
189 	 * ftruncate() on the message queue file assures that the
190 	 * pages will be zero-filled.
191 	 */
192 	(void) mutex_init(&mqhp->mq_exclusive,
193 	    USYNC_PROCESS | LOCK_ROBUST, NULL);
194 	(void) sem_init(&mqhp->mq_rblocked, 1, 0);
195 	(void) sem_init(&mqhp->mq_notempty, 1, 0);
196 	(void) sem_init(&mqhp->mq_spawner, 1, 0);
197 	(void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);
198 
199 	mqhp->mq_maxsz = msgsize;
200 	mqhp->mq_maxmsg = maxmsg;
201 
202 	/*
203 	 * As of this writing (1997), there are 32 message queue priorities.
204 	 * If this is to change, then the size of the mq_mask will
205 	 * also have to change.  If DEBUG is defined, assert that
206 	 * _MQ_PRIO_MAX hasn't changed.
207 	 */
208 	mqhp->mq_maxprio = _MQ_PRIO_MAX;
209 #if defined(DEBUG)
210 	/* LINTED always true */
211 	MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
212 #endif
213 
214 	/*
215 	 * Since the message queue can be mapped into different
216 	 * virtual address ranges by different processes, we don't
217 	 * keep track of pointers, only offsets into the shared region.
218 	 */
219 	mqhp->mq_headpp = sizeof (mqhdr_t);
220 	mqhp->mq_tailpp = mqhp->mq_headpp +
221 	    mqhp->mq_maxprio * sizeof (uint64_t);
222 	mqhp->mq_freep = mqhp->mq_tailpp +
223 	    mqhp->mq_maxprio * sizeof (uint64_t);
224 
225 	currentp = mqhp->mq_freep;
226 	MQ_PTR(mqhp, currentp)->msg_next = 0;
227 
228 	temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
229 	for (i = 1; i < mqhp->mq_maxmsg; i++) {
230 		nextp = currentp + sizeof (msghdr_t) + temp;
231 		MQ_PTR(mqhp, currentp)->msg_next = nextp;
232 		MQ_PTR(mqhp, nextp)->msg_next = 0;
233 		currentp = nextp;
234 	}
235 }
236 
237 static size_t
238 mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
239 {
240 	uint64_t currentp;
241 	msghdr_t *curbuf;
242 	uint64_t *headpp;
243 	uint64_t *tailpp;
244 
245 	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
246 
247 	/*
248 	 * Get the head and tail pointers for the queue of maximum
249 	 * priority.  We shouldn't be here unless there is a message for
250 	 * us, so it's fair to assert that both the head and tail
251 	 * pointers are non-NULL.
252 	 */
253 	headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
254 	tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);
255 
256 	if (msg_prio != NULL)
257 		*msg_prio = mqhp->mq_curmaxprio;
258 
259 	currentp = *headpp;
260 	MQ_ASSERT_PTR(mqhp, currentp);
261 	curbuf = MQ_PTR(mqhp, currentp);
262 
263 	if ((*headpp = curbuf->msg_next) == NULL) {
264 		/*
265 		 * We just nuked the last message in this priority's queue.
266 		 * Twiddle this priority's bit, and then find the next bit
267 		 * tipped.
268 		 */
269 		uint_t prio = mqhp->mq_curmaxprio;
270 
271 		mqhp->mq_mask &= ~(1u << prio);
272 
273 		for (; prio != 0; prio--)
274 			if (mqhp->mq_mask & (1u << prio))
275 				break;
276 		mqhp->mq_curmaxprio = prio;
277 
278 		*tailpp = NULL;
279 	}
280 
281 	/*
282 	 * Copy the message, and put the buffer back on the free list.
283 	 */
284 	(void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
285 	curbuf->msg_next = mqhp->mq_freep;
286 	mqhp->mq_freep = currentp;
287 
288 	return (curbuf->msg_len);
289 }
290 
291 
292 static void
293 mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
294 {
295 	uint64_t currentp;
296 	msghdr_t *curbuf;
297 	uint64_t *headpp;
298 	uint64_t *tailpp;
299 
300 	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
301 
302 	/*
303 	 * Grab a free message block, and link it in.  We shouldn't
304 	 * be here unless there is room in the queue for us;  it's
305 	 * fair to assert that the free pointer is non-NULL.
306 	 */
307 	currentp = mqhp->mq_freep;
308 	MQ_ASSERT_PTR(mqhp, currentp);
309 	curbuf = MQ_PTR(mqhp, currentp);
310 
311 	/*
312 	 * Remove a message from the free list, and copy in the new contents.
313 	 */
314 	mqhp->mq_freep = curbuf->msg_next;
315 	curbuf->msg_next = NULL;
316 	(void) memcpy((char *)&curbuf[1], msgp, len);
317 	curbuf->msg_len = len;
318 
319 	headpp = HEAD_PTR(mqhp, prio);
320 	tailpp = TAIL_PTR(mqhp, prio);
321 
322 	if (*tailpp == 0) {
323 		/*
324 		 * This is the first message on this queue.  Set the
325 		 * head and tail pointers, and tip the appropriate bit
326 		 * in the priority mask.
327 		 */
328 		*headpp = currentp;
329 		*tailpp = currentp;
330 		mqhp->mq_mask |= (1u << prio);
331 		if (prio > mqhp->mq_curmaxprio)
332 			mqhp->mq_curmaxprio = prio;
333 	} else {
334 		MQ_ASSERT_PTR(mqhp, *tailpp);
335 		MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
336 		*tailpp = currentp;
337 	}
338 }
339 
340 /*
341  * Send a notification and also delete the registration.
342  */
343 static void
344 do_notify(mqhdr_t *mqhp)
345 {
346 	(void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
347 	if (mqhp->mq_ntype == SIGEV_THREAD ||
348 	    mqhp->mq_ntype == SIGEV_PORT)
349 		(void) sem_post(&mqhp->mq_spawner);
350 	mqhp->mq_ntype = 0;
351 	mqhp->mq_des = 0;
352 }
353 
354 /*
355  * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE.
356  * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
357  * they fail with errno == EBADMSG.  Trigger any registered notification.
358  */
359 static void
360 owner_dead(mqdes_t *mqdp, int error)
361 {
362 	mqhdr_t *mqhp = mqdp->mqd_mq;
363 
364 	mqdp->mqd_ownerdead = 1;
365 	(void) sem_post(&mqhp->mq_notfull);
366 	(void) sem_post(&mqhp->mq_notempty);
367 	if (error == EOWNERDEAD) {
368 		if (mqhp->mq_sigid.sn_pid != 0)
369 			do_notify(mqhp);
370 		(void) mutex_unlock(&mqhp->mq_exclusive);
371 	}
372 	errno = EBADMSG;
373 }
374 
375 mqd_t
376 mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
377 {
378 	va_list		ap;
379 	mode_t		mode = 0;
380 	struct mq_attr	*attr = NULL;
381 	int		fd;
382 	int		err;
383 	int		cr_flag = 0;
384 	int		locked = 0;
385 	uint64_t	total_size;
386 	size_t		msgsize;
387 	ssize_t		maxmsg;
388 	uint64_t	temp;
389 	void		*ptr;
390 	mqdes_t		*mqdp;
391 	mqhdr_t		*mqhp;
392 	struct mq_dn	*mqdnp;
393 
394 	if (__pos4obj_check(path) == -1)
395 		return ((mqd_t)-1);
396 
397 	/* acquire MSGQ lock to have atomic operation */
398 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
399 		goto out;
400 	locked = 1;
401 
402 	va_start(ap, oflag);
403 	/* filter oflag to have READ/WRITE/CREATE modes only */
404 	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
405 	if ((oflag & O_CREAT) != 0) {
406 		mode = va_arg(ap, mode_t);
407 		attr = va_arg(ap, struct mq_attr *);
408 	}
409 	va_end(ap);
410 
411 	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
412 	    mode, &cr_flag)) < 0)
413 		goto out;
414 
415 	/* closing permission file */
416 	(void) __close_nc(fd);
417 
418 	/* Try to open/create data file */
419 	if (cr_flag) {
420 		cr_flag = PFILE_CREATE;
421 		if (attr == NULL) {
422 			maxmsg = MQ_MAXMSG;
423 			msgsize = MQ_MAXSIZE;
424 		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
425 			errno = EINVAL;
426 			goto out;
427 		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
428 			errno = ENOSPC;
429 			goto out;
430 		} else {
431 			maxmsg = attr->mq_maxmsg;
432 			msgsize = attr->mq_msgsize;
433 		}
434 
435 		/* adjust for message size at word boundary */
436 		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
437 
438 		total_size = sizeof (mqhdr_t) +
439 		    maxmsg * (temp + sizeof (msghdr_t)) +
440 		    2 * _MQ_PRIO_MAX * sizeof (uint64_t);
441 
442 		if (total_size > SSIZE_MAX) {
443 			errno = ENOSPC;
444 			goto out;
445 		}
446 
447 		/*
448 		 * data file is opened with read/write to those
449 		 * who have read or write permission
450 		 */
451 		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
452 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
453 		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
454 			goto out;
455 
456 		cr_flag |= DFILE_CREATE | DFILE_OPEN;
457 
458 		/* force permissions to avoid umask effect */
459 		if (fchmod(fd, mode) < 0)
460 			goto out;
461 
462 		if (ftruncate64(fd, (off64_t)total_size) < 0)
463 			goto out;
464 	} else {
465 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
466 		    O_RDWR, 0666, &err)) < 0)
467 			goto out;
468 		cr_flag = DFILE_OPEN;
469 
470 		/* Message queue has not been initialized yet */
471 		if (read(fd, &total_size, sizeof (total_size)) !=
472 		    sizeof (total_size) || total_size == 0) {
473 			errno = ENOENT;
474 			goto out;
475 		}
476 
477 		/* Message queue too big for this process to handle */
478 		if (total_size > SSIZE_MAX) {
479 			errno = EFBIG;
480 			goto out;
481 		}
482 	}
483 
484 	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
485 		errno = ENOMEM;
486 		goto out;
487 	}
488 	cr_flag |= ALLOC_MEM;
489 
490 	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
491 	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
492 		goto out;
493 	mqhp = ptr;
494 	cr_flag |= DFILE_MMAP;
495 
496 	/* closing data file */
497 	(void) __close_nc(fd);
498 	cr_flag &= ~DFILE_OPEN;
499 
500 	/*
501 	 * create, unlink, size, mmap, and close description file
502 	 * all for a flag word in anonymous shared memory
503 	 */
504 	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
505 	    0666, &err)) < 0)
506 		goto out;
507 	cr_flag |= DFILE_OPEN;
508 	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
509 	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
510 		goto out;
511 
512 	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
513 	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
514 		goto out;
515 	mqdnp = ptr;
516 	cr_flag |= MQDNP_MMAP;
517 
518 	(void) __close_nc(fd);
519 	cr_flag &= ~DFILE_OPEN;
520 
521 	/*
522 	 * we follow the same strategy as filesystem open() routine,
523 	 * where fcntl.h flags are changed to flags defined in file.h.
524 	 */
525 	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
526 	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);
527 
528 	/* new message queue requires initialization */
529 	if ((cr_flag & DFILE_CREATE) != 0) {
530 		/* message queue header has to be initialized */
531 		mq_init(mqhp, msgsize, maxmsg);
532 		mqhp->mq_totsize = total_size;
533 	}
534 	mqdp->mqd_mq = mqhp;
535 	mqdp->mqd_mqdn = mqdnp;
536 	mqdp->mqd_magic = MQ_MAGIC;
537 	mqdp->mqd_tcd = NULL;
538 	mqdp->mqd_ownerdead = 0;
539 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
540 		lmutex_lock(&mq_list_lock);
541 		mqdp->mqd_next = mq_list;
542 		mqdp->mqd_prev = NULL;
543 		if (mq_list)
544 			mq_list->mqd_prev = mqdp;
545 		mq_list = mqdp;
546 		lmutex_unlock(&mq_list_lock);
547 		return ((mqd_t)mqdp);
548 	}
549 
550 	locked = 0;	/* fall into the error case */
551 out:
552 	err = errno;
553 	if ((cr_flag & DFILE_OPEN) != 0)
554 		(void) __close_nc(fd);
555 	if ((cr_flag & DFILE_CREATE) != 0)
556 		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
557 	if ((cr_flag & PFILE_CREATE) != 0)
558 		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
559 	if ((cr_flag & ALLOC_MEM) != 0)
560 		free((void *)mqdp);
561 	if ((cr_flag & DFILE_MMAP) != 0)
562 		(void) munmap((caddr_t)mqhp, (size_t)total_size);
563 	if ((cr_flag & MQDNP_MMAP) != 0)
564 		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
565 	if (locked)
566 		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
567 	errno = err;
568 	return ((mqd_t)-1);
569 }
570 
571 static void
572 mq_close_cleanup(mqdes_t *mqdp)
573 {
574 	mqhdr_t *mqhp = mqdp->mqd_mq;
575 	struct mq_dn *mqdnp = mqdp->mqd_mqdn;
576 
577 	/* invalidate the descriptor before freeing it */
578 	mqdp->mqd_magic = 0;
579 	if (!mqdp->mqd_ownerdead)
580 		(void) mutex_unlock(&mqhp->mq_exclusive);
581 
582 	lmutex_lock(&mq_list_lock);
583 	if (mqdp->mqd_next)
584 		mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
585 	if (mqdp->mqd_prev)
586 		mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
587 	if (mq_list == mqdp)
588 		mq_list = mqdp->mqd_next;
589 	lmutex_unlock(&mq_list_lock);
590 
591 	free(mqdp);
592 	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
593 	(void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
594 }
595 
596 int
597 mq_close(mqd_t mqdes)
598 {
599 	mqdes_t *mqdp = (mqdes_t *)mqdes;
600 	mqhdr_t *mqhp;
601 	thread_communication_data_t *tcdp;
602 	int error;
603 
604 	if (!mq_is_valid(mqdp)) {
605 		errno = EBADF;
606 		return (-1);
607 	}
608 
609 	mqhp = mqdp->mqd_mq;
610 	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
611 		mqdp->mqd_ownerdead = 1;
612 		if (error == EOWNERDEAD)
613 			(void) mutex_unlock(&mqhp->mq_exclusive);
614 		/* carry on regardless, without holding mq_exclusive */
615 	}
616 
617 	if (mqhp->mq_des == (uintptr_t)mqdp &&
618 	    mqhp->mq_sigid.sn_pid == getpid()) {
619 		/* notification is set for this descriptor, remove it */
620 		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
621 		mqhp->mq_ntype = 0;
622 		mqhp->mq_des = 0;
623 	}
624 
625 	pthread_cleanup_push(mq_close_cleanup, mqdp);
626 	if ((tcdp = mqdp->mqd_tcd) != NULL) {
627 		mqdp->mqd_tcd = NULL;
628 		del_sigev_mq(tcdp);	/* possible cancellation point */
629 	}
630 	pthread_cleanup_pop(1);		/* finish in the cleanup handler */
631 
632 	return (0);
633 }
634 
635 int
636 mq_unlink(const char *path)
637 {
638 	int err;
639 
640 	if (__pos4obj_check(path) < 0)
641 		return (-1);
642 
643 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
644 		return (-1);
645 	}
646 
647 	err = __pos4obj_unlink(path, MQ_PERM_TYPE);
648 
649 	if (err == 0 || (err == -1 && errno == EEXIST)) {
650 		errno = 0;
651 		err = __pos4obj_unlink(path, MQ_DATA_TYPE);
652 	}
653 
654 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
655 		return (-1);
656 
657 	return (err);
658 
659 }
660 
661 static int
662 __mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
663 	uint_t msg_prio, const timespec_t *timeout, int abs_rel)
664 {
665 	mqdes_t *mqdp = (mqdes_t *)mqdes;
666 	mqhdr_t *mqhp;
667 	int err;
668 	int notify = 0;
669 
670 	/*
671 	 * sem_*wait() does cancellation, if called.
672 	 * pthread_testcancel() ensures that cancellation takes place if
673 	 * there is a cancellation pending when mq_*send() is called.
674 	 */
675 	pthread_testcancel();
676 
677 	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
678 		errno = EBADF;
679 		return (-1);
680 	}
681 
682 	mqhp = mqdp->mqd_mq;
683 
684 	if (msg_prio >= mqhp->mq_maxprio) {
685 		errno = EINVAL;
686 		return (-1);
687 	}
688 	if (msg_len > mqhp->mq_maxsz) {
689 		errno = EMSGSIZE;
690 		return (-1);
691 	}
692 
693 	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
694 		err = sem_trywait(&mqhp->mq_notfull);
695 	else {
696 		/*
697 		 * We might get cancelled here...
698 		 */
699 		if (timeout == NULL)
700 			err = sem_wait(&mqhp->mq_notfull);
701 		else if (abs_rel == ABS_TIME)
702 			err = sem_timedwait(&mqhp->mq_notfull, timeout);
703 		else
704 			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
705 	}
706 	if (err == -1) {
707 		/*
708 		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
709 		 * by sem_*wait(), so we can just return.
710 		 */
711 		return (-1);
712 	}
713 
714 	/*
715 	 * By the time we're here, we know that we've got the capacity
716 	 * to add to the queue...now acquire the exclusive lock.
717 	 */
718 	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
719 		owner_dead(mqdp, err);
720 		return (-1);
721 	}
722 
723 	/*
724 	 * Now determine if we want to kick the notification.  POSIX
725 	 * requires that if a process has registered for notification,
726 	 * we must kick it when the queue makes an empty to non-empty
727 	 * transition, and there are no blocked receivers.  Note that
728 	 * this mechanism does _not_ guarantee that the kicked process
729 	 * will be able to receive a message without blocking;
730 	 * another receiver could intervene in the meantime.  Thus,
731 	 * the notification mechanism is inherently racy; all we can
732 	 * do is hope to minimize the window as much as possible.
733 	 * In general, we want to avoid kicking the notification when
734 	 * there are clearly receivers blocked.  We'll determine if
735 	 * we want to kick the notification before the mq_putmsg(),
736 	 * but the actual signotify() won't be done until the message
737 	 * is on the queue.
738 	 */
739 	if (mqhp->mq_sigid.sn_pid != 0) {
740 		int nmessages, nblocked;
741 
742 		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
743 		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);
744 
745 		if (nmessages == 0 && nblocked == 0)
746 			notify = 1;
747 	}
748 
749 	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
750 	(void) sem_post(&mqhp->mq_notempty);
751 
752 	if (notify) {
753 		/* notify and also delete the registration */
754 		do_notify(mqhp);
755 	}
756 
757 	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
758 	(void) mutex_unlock(&mqhp->mq_exclusive);
759 
760 	return (0);
761 }
762 
763 int
764 mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
765 {
766 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
767 	    NULL, ABS_TIME));
768 }
769 
770 int
771 mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
772 	uint_t msg_prio, const timespec_t *abs_timeout)
773 {
774 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
775 	    abs_timeout, ABS_TIME));
776 }
777 
778 int
779 mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
780 	uint_t msg_prio, const timespec_t *rel_timeout)
781 {
782 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
783 	    rel_timeout, REL_TIME));
784 }
785 
786 static void
787 decrement_rblocked(mqhdr_t *mqhp)
788 {
789 	int cancel_state;
790 
791 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
792 	while (sem_wait(&mqhp->mq_rblocked) == -1)
793 		continue;
794 	(void) pthread_setcancelstate(cancel_state, NULL);
795 }
796 
797 static ssize_t
798 __mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
799 	uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
800 {
801 	mqdes_t *mqdp = (mqdes_t *)mqdes;
802 	mqhdr_t *mqhp;
803 	ssize_t	msg_size;
804 	int err;
805 
806 	/*
807 	 * sem_*wait() does cancellation, if called.
808 	 * pthread_testcancel() ensures that cancellation takes place if
809 	 * there is a cancellation pending when mq_*receive() is called.
810 	 */
811 	pthread_testcancel();
812 
813 	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
814 		errno = EBADF;
815 		return (ssize_t)(-1);
816 	}
817 
818 	mqhp = mqdp->mqd_mq;
819 
820 	if (msg_len < mqhp->mq_maxsz) {
821 		errno = EMSGSIZE;
822 		return (ssize_t)(-1);
823 	}
824 
825 	/*
826 	 * The semaphoring scheme for mq_[timed]receive is a little hairy
827 	 * thanks to POSIX.1b's arcane notification mechanism.  First,
828 	 * we try to take the common case and do a sem_trywait().
829 	 * If that doesn't work, and O_NONBLOCK hasn't been set,
830 	 * then note that we're going to sleep by incrementing the rblocked
831 	 * semaphore.  We decrement that semaphore after waking up.
832 	 */
833 	if (sem_trywait(&mqhp->mq_notempty) == -1) {
834 		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
835 			/*
836 			 * errno has been set to EAGAIN or EINTR by
837 			 * sem_trywait(), so we can just return.
838 			 */
839 			return (-1);
840 		}
841 		/*
842 		 * If we're here, then we're probably going to block...
843 		 * increment the rblocked semaphore.  If we get
844 		 * cancelled, decrement_rblocked() will decrement it.
845 		 */
846 		(void) sem_post(&mqhp->mq_rblocked);
847 
848 		pthread_cleanup_push(decrement_rblocked, mqhp);
849 		if (timeout == NULL)
850 			err = sem_wait(&mqhp->mq_notempty);
851 		else if (abs_rel == ABS_TIME)
852 			err = sem_timedwait(&mqhp->mq_notempty, timeout);
853 		else
854 			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
855 		pthread_cleanup_pop(1);
856 
857 		if (err == -1) {
858 			/*
859 			 * We took a signal or timeout while waiting
860 			 * on mq_notempty...
861 			 */
862 			return (-1);
863 		}
864 	}
865 
866 	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
867 		owner_dead(mqdp, err);
868 		return (-1);
869 	}
870 	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
871 	(void) sem_post(&mqhp->mq_notfull);
872 	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
873 	(void) mutex_unlock(&mqhp->mq_exclusive);
874 
875 	return (msg_size);
876 }
877 
878 ssize_t
879 mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
880 {
881 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
882 	    NULL, ABS_TIME));
883 }
884 
885 ssize_t
886 mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
887 	uint_t *msg_prio, const timespec_t *abs_timeout)
888 {
889 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
890 	    abs_timeout, ABS_TIME));
891 }
892 
893 ssize_t
894 mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
895 	uint_t *msg_prio, const timespec_t *rel_timeout)
896 {
897 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
898 	    rel_timeout, REL_TIME));
899 }
900 
901 /*
902  * Only used below, in mq_notify().
903  * We already have a spawner thread.
904  * Verify that the attributes match; cancel it if necessary.
905  */
906 static int
907 cancel_if_necessary(thread_communication_data_t *tcdp,
908 	const struct sigevent *sigevp)
909 {
910 	int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp,
911 	    sigevp->sigev_notify_attributes);
912 
913 	if (do_cancel) {
914 		/*
915 		 * Attributes don't match, cancel the spawner thread.
916 		 */
917 		(void) pthread_cancel(tcdp->tcd_server_id);
918 	} else {
919 		/*
920 		 * Reuse the existing spawner thread with possibly
921 		 * changed notification function and value.
922 		 */
923 		tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
924 		tcdp->tcd_notif.sigev_signo = 0;
925 		tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
926 		tcdp->tcd_notif.sigev_notify_function =
927 		    sigevp->sigev_notify_function;
928 	}
929 
930 	return (do_cancel);
931 }
932 
933 int
934 mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
935 {
936 	mqdes_t *mqdp = (mqdes_t *)mqdes;
937 	mqhdr_t *mqhp;
938 	thread_communication_data_t *tcdp;
939 	siginfo_t mq_siginfo;
940 	struct sigevent sigevent;
941 	struct stat64 statb;
942 	port_notify_t *pn;
943 	void *userval;
944 	int rval = -1;
945 	int ntype;
946 	int port;
947 	int error;
948 
949 	if (!mq_is_valid(mqdp)) {
950 		errno = EBADF;
951 		return (-1);
952 	}
953 
954 	mqhp = mqdp->mqd_mq;
955 
956 	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
957 		mqdp->mqd_ownerdead = 1;
958 		sigevp = NULL;
959 		if (error == EOWNERDEAD)
960 			(void) mutex_unlock(&mqhp->mq_exclusive);
961 		/* carry on regardless, without holding mq_exclusive */
962 	}
963 
964 	if (sigevp == NULL) {		/* remove notification */
965 		if (mqhp->mq_des == (uintptr_t)mqdp &&
966 		    mqhp->mq_sigid.sn_pid == getpid()) {
967 			/* notification is set for this descriptor, remove it */
968 			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
969 			if ((tcdp = mqdp->mqd_tcd) != NULL) {
970 				sig_mutex_lock(&tcdp->tcd_lock);
971 				if (tcdp->tcd_msg_enabled) {
972 					/* cancel the spawner thread */
973 					tcdp = mqdp->mqd_tcd;
974 					mqdp->mqd_tcd = NULL;
975 					(void) pthread_cancel(
976 					    tcdp->tcd_server_id);
977 				}
978 				sig_mutex_unlock(&tcdp->tcd_lock);
979 			}
980 			mqhp->mq_ntype = 0;
981 			mqhp->mq_des = 0;
982 		} else {
983 			/* notification is not set for this descriptor */
984 			errno = EBUSY;
985 			goto bad;
986 		}
987 	} else {		/* register notification with this process */
988 		switch (ntype = sigevp->sigev_notify) {
989 		case SIGEV_THREAD:
990 			userval = sigevp->sigev_value.sival_ptr;
991 			port = -1;
992 			break;
993 		case SIGEV_PORT:
994 			pn = sigevp->sigev_value.sival_ptr;
995 			userval = pn->portnfy_user;
996 			port = pn->portnfy_port;
997 			if (fstat64(port, &statb) != 0 ||
998 			    !S_ISPORT(statb.st_mode)) {
999 				errno = EBADF;
1000 				goto bad;
1001 			}
1002 			(void) memset(&sigevent, 0, sizeof (sigevent));
1003 			sigevent.sigev_notify = SIGEV_PORT;
1004 			sigevp = &sigevent;
1005 			break;
1006 		}
1007 		switch (ntype) {
1008 		case SIGEV_NONE:
1009 			mq_siginfo.si_signo = 0;
1010 			mq_siginfo.si_code = SI_MESGQ;
1011 			break;
1012 		case SIGEV_SIGNAL:
1013 			mq_siginfo.si_signo = sigevp->sigev_signo;
1014 			mq_siginfo.si_value = sigevp->sigev_value;
1015 			mq_siginfo.si_code = SI_MESGQ;
1016 			break;
1017 		case SIGEV_THREAD:
1018 			if ((tcdp = mqdp->mqd_tcd) != NULL &&
1019 			    cancel_if_necessary(tcdp, sigevp))
1020 				mqdp->mqd_tcd = NULL;
1021 			/* FALLTHROUGH */
1022 		case SIGEV_PORT:
1023 			if ((tcdp = mqdp->mqd_tcd) == NULL) {
1024 				/* we must create a spawner thread */
1025 				tcdp = setup_sigev_handler(sigevp, MQ);
1026 				if (tcdp == NULL) {
1027 					errno = EBADF;
1028 					goto bad;
1029 				}
1030 				tcdp->tcd_msg_enabled = 0;
1031 				tcdp->tcd_msg_closing = 0;
1032 				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
1033 				if (launch_spawner(tcdp) != 0) {
1034 					free_sigev_handler(tcdp);
1035 					goto bad;
1036 				}
1037 				mqdp->mqd_tcd = tcdp;
1038 			}
1039 			mq_siginfo.si_signo = 0;
1040 			mq_siginfo.si_code = SI_MESGQ;
1041 			break;
1042 		default:
1043 			errno = EINVAL;
1044 			goto bad;
1045 		}
1046 
1047 		/* register notification */
1048 		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
1049 			goto bad;
1050 		mqhp->mq_ntype = ntype;
1051 		mqhp->mq_des = (uintptr_t)mqdp;
1052 		switch (ntype) {
1053 		case SIGEV_THREAD:
1054 		case SIGEV_PORT:
1055 			tcdp->tcd_port = port;
1056 			tcdp->tcd_msg_object = mqdp;
1057 			tcdp->tcd_msg_userval = userval;
1058 			sig_mutex_lock(&tcdp->tcd_lock);
1059 			tcdp->tcd_msg_enabled = ntype;
1060 			sig_mutex_unlock(&tcdp->tcd_lock);
1061 			(void) cond_broadcast(&tcdp->tcd_cv);
1062 			break;
1063 		}
1064 	}
1065 
1066 	rval = 0;	/* success */
1067 bad:
1068 	if (error == 0) {
1069 		(void) mutex_unlock(&mqhp->mq_exclusive);
1070 	} else {
1071 		errno = EBADMSG;
1072 		rval = -1;
1073 	}
1074 	return (rval);
1075 }
1076 
1077 int
1078 mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
1079 {
1080 	mqdes_t *mqdp = (mqdes_t *)mqdes;
1081 	mqhdr_t *mqhp;
1082 	uint_t	flag = 0;
1083 
1084 	if (!mq_is_valid(mqdp)) {
1085 		errno = EBADF;
1086 		return (-1);
1087 	}
1088 
1089 	/* store current attributes */
1090 	if (omqstat != NULL) {
1091 		int	count;
1092 
1093 		mqhp = mqdp->mqd_mq;
1094 		omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1095 		omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1096 		omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1097 		(void) sem_getvalue(&mqhp->mq_notempty, &count);
1098 		omqstat->mq_curmsgs = count;
1099 	}
1100 
1101 	/* set description attributes */
1102 	if ((mqstat->mq_flags & O_NONBLOCK) != 0)
1103 		flag = FNONBLOCK;
1104 	mqdp->mqd_mqdn->mqdn_flags = flag;
1105 
1106 	return (0);
1107 }
1108 
1109 int
1110 mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
1111 {
1112 	mqdes_t *mqdp = (mqdes_t *)mqdes;
1113 	mqhdr_t *mqhp;
1114 	int count;
1115 
1116 	if (!mq_is_valid(mqdp)) {
1117 		errno = EBADF;
1118 		return (-1);
1119 	}
1120 
1121 	mqhp = mqdp->mqd_mq;
1122 
1123 	mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1124 	mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1125 	mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1126 	(void) sem_getvalue(&mqhp->mq_notempty, &count);
1127 	mqstat->mq_curmsgs = count;
1128 	return (0);
1129 }
1130 
1131 /*
1132  * Cleanup after fork1() in the child process.
1133  */
1134 void
1135 postfork1_child_sigev_mq(void)
1136 {
1137 	thread_communication_data_t *tcdp;
1138 	mqdes_t *mqdp;
1139 
1140 	for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
1141 		if ((tcdp = mqdp->mqd_tcd) != NULL) {
1142 			mqdp->mqd_tcd = NULL;
1143 			tcd_teardown(tcdp);
1144 		}
1145 	}
1146 }
1147