xref: /illumos-gate/usr/src/lib/libc/port/rt/mqueue.c (revision 6faf52448e142b151fa3deade474be359e7c8698)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #include "lint.h"
28 #include "mtlib.h"
29 #define	_KMEMUSER
30 #include <sys/param.h>		/* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
31 #undef	_KMEMUSER
32 #include <mqueue.h>
33 #include <sys/types.h>
34 #include <sys/file.h>
35 #include <sys/mman.h>
36 #include <errno.h>
37 #include <stdarg.h>
38 #include <limits.h>
39 #include <pthread.h>
40 #include <assert.h>
41 #include <string.h>
42 #include <unistd.h>
43 #include <stdlib.h>
44 #include <sys/stat.h>
45 #include <inttypes.h>
46 #include "sigev_thread.h"
47 #include "pos4obj.h"
48 
49 /*
50  * Default values per message queue
51  */
52 #define	MQ_MAXMSG	128
53 #define	MQ_MAXSIZE	1024
54 
55 #define	MQ_MAGIC	0x4d534751		/* "MSGQ" */
56 
57 /*
58  * Message header which is part of messages in link list
59  */
60 typedef struct {
61 	uint64_t	msg_next;	/* offset of next message in the link */
62 	uint64_t	msg_len;	/* length of the message */
63 } msghdr_t;
64 
65 /*
66  * message queue description
67  */
68 struct mq_dn {
69 	size_t		mqdn_flags;	/* open description flags */
70 };
71 
72 /*
73  * message queue descriptor structure
74  */
75 typedef struct mq_des {
76 	struct mq_des	*mqd_next;	/* list of all open mq descriptors, */
77 	struct mq_des	*mqd_prev;	/* needed for fork-safety */
78 	int		mqd_magic;	/* magic # to identify mq_des */
79 	int		mqd_flags;	/* operation flag per open */
80 	struct mq_header *mqd_mq;	/* address pointer of message Q */
81 	struct mq_dn	*mqd_mqdn;	/* open	description */
82 	thread_communication_data_t *mqd_tcd;	/* SIGEV_THREAD notification */
83 	int		mqd_ownerdead;	/* mq_exclusive is inconsistent */
84 } mqdes_t;
85 
86 /*
87  * message queue common header, part of the mmap()ed file.
88  * Since message queues may be shared between 32- and 64-bit processes,
89  * care must be taken to make sure that the elements of this structure
90  * are identical for both _LP64 and _ILP32 cases.
91  */
92 typedef struct mq_header {
93 	/* first field must be mq_totsize, DO NOT insert before this	*/
94 	int64_t		mq_totsize;	/* total size of the Queue */
95 	int64_t		mq_maxsz;	/* max size of each message */
96 	uint32_t	mq_maxmsg;	/* max messages in the queue */
97 	uint32_t	mq_maxprio;	/* maximum mqueue priority */
98 	uint32_t	mq_curmaxprio;	/* current maximum MQ priority */
99 	uint32_t	mq_mask;	/* priority bitmask */
100 	uint64_t	mq_freep;	/* free message's head pointer */
101 	uint64_t	mq_headpp;	/* pointer to head pointers */
102 	uint64_t	mq_tailpp;	/* pointer to tail pointers */
103 	signotify_id_t	mq_sigid;	/* notification id (3 int's) */
104 	uint32_t	mq_ntype;	/* notification type (SIGEV_*) */
105 	uint64_t	mq_des;		/* pointer to msg Q descriptor */
106 	mutex_t		mq_exclusive;	/* acquire for exclusive access */
107 	sem_t		mq_rblocked;	/* number of processes rblocked */
108 	sem_t		mq_notfull;	/* mq_send()'s block on this */
109 	sem_t		mq_notempty;	/* mq_receive()'s block on this */
110 	sem_t		mq_spawner;	/* spawner thread blocks on this */
111 } mqhdr_t;
112 
113 /*
114  * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
115  * If this assumption is somehow invalidated, mq_open() needs to be changed
116  * back to the old version which kept a count and enforced a limit.
117  * We make sure that this is pointed out to those changing <sys/param.h>
118  * by checking _MQ_OPEN_MAX at compile time.
119  */
120 #if _MQ_OPEN_MAX != -1
121 #error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
122 #endif
123 
124 #define	MQ_ALIGNSIZE	8	/* 64-bit alignment */
125 
126 #ifdef DEBUG
127 #define	MQ_ASSERT(x)	assert(x);
128 
129 #define	MQ_ASSERT_PTR(_m, _p) \
130 	assert((_p) != 0 && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \
131 	    !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \
132 	    _m->mq_totsize));
133 
134 #define	MQ_ASSERT_SEMVAL_LEQ(sem, val) { \
135 	int _val; \
136 	(void) sem_getvalue((sem), &_val); \
137 	assert((_val) <= val); }
138 #else
139 #define	MQ_ASSERT(x)
140 #define	MQ_ASSERT_PTR(_m, _p)
141 #define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
142 #endif
143 
144 #define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)m + (uintptr_t)n))
145 #define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
146 			(uintptr_t)m->mq_headpp + n * sizeof (uint64_t)))
147 #define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
148 			(uintptr_t)m->mq_tailpp + n * sizeof (uint64_t)))
149 
150 #define	MQ_RESERVED	((mqdes_t *)-1)
151 
152 #define	ABS_TIME	0
153 #define	REL_TIME	1
154 
155 static mutex_t mq_list_lock = DEFAULTMUTEX;
156 static mqdes_t *mq_list = NULL;
157 
158 extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id);
159 
160 static int
161 mq_is_valid(mqdes_t *mqdp)
162 {
163 	/*
164 	 * Any use of a message queue after it was closed is
165 	 * undefined.  But the standard strongly favours EBADF
166 	 * returns.  Before we dereference which could be fatal,
167 	 * we first do some pointer sanity checks.
168 	 */
169 	if (mqdp != NULL && mqdp != MQ_RESERVED &&
170 	    ((uintptr_t)mqdp & 0x7) == 0) {
171 		return (mqdp->mqd_magic == MQ_MAGIC);
172 	}
173 
174 	return (0);
175 }
176 
177 static void
178 mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
179 {
180 	int		i;
181 	uint64_t	temp;
182 	uint64_t	currentp;
183 	uint64_t	nextp;
184 
185 	/*
186 	 * We only need to initialize the non-zero fields.  The use of
187 	 * ftruncate() on the message queue file assures that the
188 	 * pages will be zero-filled.
189 	 */
190 	(void) mutex_init(&mqhp->mq_exclusive,
191 	    USYNC_PROCESS | LOCK_ROBUST, NULL);
192 	(void) sem_init(&mqhp->mq_rblocked, 1, 0);
193 	(void) sem_init(&mqhp->mq_notempty, 1, 0);
194 	(void) sem_init(&mqhp->mq_spawner, 1, 0);
195 	(void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);
196 
197 	mqhp->mq_maxsz = msgsize;
198 	mqhp->mq_maxmsg = maxmsg;
199 
200 	/*
201 	 * As of this writing (1997), there are 32 message queue priorities.
202 	 * If this is to change, then the size of the mq_mask will
203 	 * also have to change.  If DEBUG is defined, assert that
204 	 * _MQ_PRIO_MAX hasn't changed.
205 	 */
206 	mqhp->mq_maxprio = _MQ_PRIO_MAX;
207 #if defined(DEBUG)
208 	/* LINTED always true */
209 	MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
210 #endif
211 
212 	/*
213 	 * Since the message queue can be mapped into different
214 	 * virtual address ranges by different processes, we don't
215 	 * keep track of pointers, only offsets into the shared region.
216 	 */
217 	mqhp->mq_headpp = sizeof (mqhdr_t);
218 	mqhp->mq_tailpp = mqhp->mq_headpp +
219 	    mqhp->mq_maxprio * sizeof (uint64_t);
220 	mqhp->mq_freep = mqhp->mq_tailpp +
221 	    mqhp->mq_maxprio * sizeof (uint64_t);
222 
223 	currentp = mqhp->mq_freep;
224 	MQ_PTR(mqhp, currentp)->msg_next = 0;
225 
226 	temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
227 	for (i = 1; i < mqhp->mq_maxmsg; i++) {
228 		nextp = currentp + sizeof (msghdr_t) + temp;
229 		MQ_PTR(mqhp, currentp)->msg_next = nextp;
230 		MQ_PTR(mqhp, nextp)->msg_next = 0;
231 		currentp = nextp;
232 	}
233 }
234 
235 static size_t
236 mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
237 {
238 	uint64_t currentp;
239 	msghdr_t *curbuf;
240 	uint64_t *headpp;
241 	uint64_t *tailpp;
242 
243 	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
244 
245 	/*
246 	 * Get the head and tail pointers for the queue of maximum
247 	 * priority.  We shouldn't be here unless there is a message for
248 	 * us, so it's fair to assert that both the head and tail
249 	 * pointers are non-NULL.
250 	 */
251 	headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
252 	tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);
253 
254 	if (msg_prio != NULL)
255 		*msg_prio = mqhp->mq_curmaxprio;
256 
257 	currentp = *headpp;
258 	MQ_ASSERT_PTR(mqhp, currentp);
259 	curbuf = MQ_PTR(mqhp, currentp);
260 
261 	if ((*headpp = curbuf->msg_next) == 0) {
262 		/*
263 		 * We just nuked the last message in this priority's queue.
264 		 * Twiddle this priority's bit, and then find the next bit
265 		 * tipped.
266 		 */
267 		uint_t prio = mqhp->mq_curmaxprio;
268 
269 		mqhp->mq_mask &= ~(1u << prio);
270 
271 		for (; prio != 0; prio--)
272 			if (mqhp->mq_mask & (1u << prio))
273 				break;
274 		mqhp->mq_curmaxprio = prio;
275 
276 		*tailpp = 0;
277 	}
278 
279 	/*
280 	 * Copy the message, and put the buffer back on the free list.
281 	 */
282 	(void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
283 	curbuf->msg_next = mqhp->mq_freep;
284 	mqhp->mq_freep = currentp;
285 
286 	return (curbuf->msg_len);
287 }
288 
289 
290 static void
291 mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
292 {
293 	uint64_t currentp;
294 	msghdr_t *curbuf;
295 	uint64_t *headpp;
296 	uint64_t *tailpp;
297 
298 	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
299 
300 	/*
301 	 * Grab a free message block, and link it in.  We shouldn't
302 	 * be here unless there is room in the queue for us;  it's
303 	 * fair to assert that the free pointer is non-NULL.
304 	 */
305 	currentp = mqhp->mq_freep;
306 	MQ_ASSERT_PTR(mqhp, currentp);
307 	curbuf = MQ_PTR(mqhp, currentp);
308 
309 	/*
310 	 * Remove a message from the free list, and copy in the new contents.
311 	 */
312 	mqhp->mq_freep = curbuf->msg_next;
313 	curbuf->msg_next = 0;
314 	(void) memcpy((char *)&curbuf[1], msgp, len);
315 	curbuf->msg_len = len;
316 
317 	headpp = HEAD_PTR(mqhp, prio);
318 	tailpp = TAIL_PTR(mqhp, prio);
319 
320 	if (*tailpp == 0) {
321 		/*
322 		 * This is the first message on this queue.  Set the
323 		 * head and tail pointers, and tip the appropriate bit
324 		 * in the priority mask.
325 		 */
326 		*headpp = currentp;
327 		*tailpp = currentp;
328 		mqhp->mq_mask |= (1u << prio);
329 		if (prio > mqhp->mq_curmaxprio)
330 			mqhp->mq_curmaxprio = prio;
331 	} else {
332 		MQ_ASSERT_PTR(mqhp, *tailpp);
333 		MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
334 		*tailpp = currentp;
335 	}
336 }
337 
338 /*
339  * Send a notification and also delete the registration.
340  */
341 static void
342 do_notify(mqhdr_t *mqhp)
343 {
344 	(void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
345 	if (mqhp->mq_ntype == SIGEV_THREAD ||
346 	    mqhp->mq_ntype == SIGEV_PORT)
347 		(void) sem_post(&mqhp->mq_spawner);
348 	mqhp->mq_ntype = 0;
349 	mqhp->mq_des = 0;
350 }
351 
352 /*
353  * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE.
354  * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
355  * they fail with errno == EBADMSG.  Trigger any registered notification.
356  */
357 static void
358 owner_dead(mqdes_t *mqdp, int error)
359 {
360 	mqhdr_t *mqhp = mqdp->mqd_mq;
361 
362 	mqdp->mqd_ownerdead = 1;
363 	(void) sem_post(&mqhp->mq_notfull);
364 	(void) sem_post(&mqhp->mq_notempty);
365 	if (error == EOWNERDEAD) {
366 		if (mqhp->mq_sigid.sn_pid != 0)
367 			do_notify(mqhp);
368 		(void) mutex_unlock(&mqhp->mq_exclusive);
369 	}
370 	errno = EBADMSG;
371 }
372 
373 mqd_t
374 mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
375 {
376 	va_list		ap;
377 	mode_t		mode = 0;
378 	struct mq_attr	*attr = NULL;
379 	int		fd;
380 	int		err;
381 	int		cr_flag = 0;
382 	int		locked = 0;
383 	uint64_t	total_size;
384 	size_t		msgsize;
385 	ssize_t		maxmsg;
386 	uint64_t	temp;
387 	void		*ptr;
388 	mqdes_t		*mqdp;
389 	mqhdr_t		*mqhp;
390 	struct mq_dn	*mqdnp;
391 
392 	if (__pos4obj_check(path) == -1)
393 		return ((mqd_t)-1);
394 
395 	/* acquire MSGQ lock to have atomic operation */
396 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
397 		goto out;
398 	locked = 1;
399 
400 	va_start(ap, oflag);
401 	/* filter oflag to have READ/WRITE/CREATE modes only */
402 	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
403 	if ((oflag & O_CREAT) != 0) {
404 		mode = va_arg(ap, mode_t);
405 		attr = va_arg(ap, struct mq_attr *);
406 	}
407 	va_end(ap);
408 
409 	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
410 	    mode, &cr_flag)) < 0)
411 		goto out;
412 
413 	/* closing permission file */
414 	(void) __close_nc(fd);
415 
416 	/* Try to open/create data file */
417 	if (cr_flag) {
418 		cr_flag = PFILE_CREATE;
419 		if (attr == NULL) {
420 			maxmsg = MQ_MAXMSG;
421 			msgsize = MQ_MAXSIZE;
422 		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
423 			errno = EINVAL;
424 			goto out;
425 		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
426 			errno = ENOSPC;
427 			goto out;
428 		} else {
429 			maxmsg = attr->mq_maxmsg;
430 			msgsize = attr->mq_msgsize;
431 		}
432 
433 		/* adjust for message size at word boundary */
434 		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
435 
436 		total_size = sizeof (mqhdr_t) +
437 		    maxmsg * (temp + sizeof (msghdr_t)) +
438 		    2 * _MQ_PRIO_MAX * sizeof (uint64_t);
439 
440 		if (total_size > SSIZE_MAX) {
441 			errno = ENOSPC;
442 			goto out;
443 		}
444 
445 		/*
446 		 * data file is opened with read/write to those
447 		 * who have read or write permission
448 		 */
449 		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
450 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
451 		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
452 			goto out;
453 
454 		cr_flag |= DFILE_CREATE | DFILE_OPEN;
455 
456 		/* force permissions to avoid umask effect */
457 		if (fchmod(fd, mode) < 0)
458 			goto out;
459 
460 		if (ftruncate64(fd, (off64_t)total_size) < 0)
461 			goto out;
462 	} else {
463 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
464 		    O_RDWR, 0666, &err)) < 0)
465 			goto out;
466 		cr_flag = DFILE_OPEN;
467 
468 		/* Message queue has not been initialized yet */
469 		if (read(fd, &total_size, sizeof (total_size)) !=
470 		    sizeof (total_size) || total_size == 0) {
471 			errno = ENOENT;
472 			goto out;
473 		}
474 
475 		/* Message queue too big for this process to handle */
476 		if (total_size > SSIZE_MAX) {
477 			errno = EFBIG;
478 			goto out;
479 		}
480 	}
481 
482 	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
483 		errno = ENOMEM;
484 		goto out;
485 	}
486 	cr_flag |= ALLOC_MEM;
487 
488 	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
489 	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
490 		goto out;
491 	mqhp = ptr;
492 	cr_flag |= DFILE_MMAP;
493 
494 	/* closing data file */
495 	(void) __close_nc(fd);
496 	cr_flag &= ~DFILE_OPEN;
497 
498 	/*
499 	 * create, unlink, size, mmap, and close description file
500 	 * all for a flag word in anonymous shared memory
501 	 */
502 	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
503 	    0666, &err)) < 0)
504 		goto out;
505 	cr_flag |= DFILE_OPEN;
506 	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
507 	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
508 		goto out;
509 
510 	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
511 	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
512 		goto out;
513 	mqdnp = ptr;
514 	cr_flag |= MQDNP_MMAP;
515 
516 	(void) __close_nc(fd);
517 	cr_flag &= ~DFILE_OPEN;
518 
519 	/*
520 	 * we follow the same strategy as filesystem open() routine,
521 	 * where fcntl.h flags are changed to flags defined in file.h.
522 	 */
523 	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
524 	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);
525 
526 	/* new message queue requires initialization */
527 	if ((cr_flag & DFILE_CREATE) != 0) {
528 		/* message queue header has to be initialized */
529 		mq_init(mqhp, msgsize, maxmsg);
530 		mqhp->mq_totsize = total_size;
531 	}
532 	mqdp->mqd_mq = mqhp;
533 	mqdp->mqd_mqdn = mqdnp;
534 	mqdp->mqd_magic = MQ_MAGIC;
535 	mqdp->mqd_tcd = NULL;
536 	mqdp->mqd_ownerdead = 0;
537 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
538 		lmutex_lock(&mq_list_lock);
539 		mqdp->mqd_next = mq_list;
540 		mqdp->mqd_prev = NULL;
541 		if (mq_list)
542 			mq_list->mqd_prev = mqdp;
543 		mq_list = mqdp;
544 		lmutex_unlock(&mq_list_lock);
545 		return ((mqd_t)mqdp);
546 	}
547 
548 	locked = 0;	/* fall into the error case */
549 out:
550 	err = errno;
551 	if ((cr_flag & DFILE_OPEN) != 0)
552 		(void) __close_nc(fd);
553 	if ((cr_flag & DFILE_CREATE) != 0)
554 		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
555 	if ((cr_flag & PFILE_CREATE) != 0)
556 		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
557 	if ((cr_flag & ALLOC_MEM) != 0)
558 		free((void *)mqdp);
559 	if ((cr_flag & DFILE_MMAP) != 0)
560 		(void) munmap((caddr_t)mqhp, (size_t)total_size);
561 	if ((cr_flag & MQDNP_MMAP) != 0)
562 		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
563 	if (locked)
564 		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
565 	errno = err;
566 	return ((mqd_t)-1);
567 }
568 
569 static void
570 mq_close_cleanup(mqdes_t *mqdp)
571 {
572 	mqhdr_t *mqhp = mqdp->mqd_mq;
573 	struct mq_dn *mqdnp = mqdp->mqd_mqdn;
574 
575 	/* invalidate the descriptor before freeing it */
576 	mqdp->mqd_magic = 0;
577 	if (!mqdp->mqd_ownerdead)
578 		(void) mutex_unlock(&mqhp->mq_exclusive);
579 
580 	lmutex_lock(&mq_list_lock);
581 	if (mqdp->mqd_next)
582 		mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
583 	if (mqdp->mqd_prev)
584 		mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
585 	if (mq_list == mqdp)
586 		mq_list = mqdp->mqd_next;
587 	lmutex_unlock(&mq_list_lock);
588 
589 	free(mqdp);
590 	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
591 	(void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
592 }
593 
594 int
595 mq_close(mqd_t mqdes)
596 {
597 	mqdes_t *mqdp = (mqdes_t *)mqdes;
598 	mqhdr_t *mqhp;
599 	thread_communication_data_t *tcdp;
600 	int error;
601 
602 	if (!mq_is_valid(mqdp)) {
603 		errno = EBADF;
604 		return (-1);
605 	}
606 
607 	mqhp = mqdp->mqd_mq;
608 	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
609 		mqdp->mqd_ownerdead = 1;
610 		if (error == EOWNERDEAD)
611 			(void) mutex_unlock(&mqhp->mq_exclusive);
612 		/* carry on regardless, without holding mq_exclusive */
613 	}
614 
615 	if (mqhp->mq_des == (uintptr_t)mqdp &&
616 	    mqhp->mq_sigid.sn_pid == getpid()) {
617 		/* notification is set for this descriptor, remove it */
618 		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
619 		mqhp->mq_ntype = 0;
620 		mqhp->mq_des = 0;
621 	}
622 
623 	pthread_cleanup_push(mq_close_cleanup, mqdp);
624 	if ((tcdp = mqdp->mqd_tcd) != NULL) {
625 		mqdp->mqd_tcd = NULL;
626 		del_sigev_mq(tcdp);	/* possible cancellation point */
627 	}
628 	pthread_cleanup_pop(1);		/* finish in the cleanup handler */
629 
630 	return (0);
631 }
632 
633 int
634 mq_unlink(const char *path)
635 {
636 	int err;
637 
638 	if (__pos4obj_check(path) < 0)
639 		return (-1);
640 
641 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
642 		return (-1);
643 	}
644 
645 	err = __pos4obj_unlink(path, MQ_PERM_TYPE);
646 
647 	if (err == 0 || (err == -1 && errno == EEXIST)) {
648 		errno = 0;
649 		err = __pos4obj_unlink(path, MQ_DATA_TYPE);
650 	}
651 
652 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
653 		return (-1);
654 
655 	return (err);
656 
657 }
658 
659 static int
660 __mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
661     uint_t msg_prio, const timespec_t *timeout, int abs_rel)
662 {
663 	mqdes_t *mqdp = (mqdes_t *)mqdes;
664 	mqhdr_t *mqhp;
665 	int err;
666 	int notify = 0;
667 
668 	/*
669 	 * sem_*wait() does cancellation, if called.
670 	 * pthread_testcancel() ensures that cancellation takes place if
671 	 * there is a cancellation pending when mq_*send() is called.
672 	 */
673 	pthread_testcancel();
674 
675 	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
676 		errno = EBADF;
677 		return (-1);
678 	}
679 
680 	mqhp = mqdp->mqd_mq;
681 
682 	if (msg_prio >= mqhp->mq_maxprio) {
683 		errno = EINVAL;
684 		return (-1);
685 	}
686 	if (msg_len > mqhp->mq_maxsz) {
687 		errno = EMSGSIZE;
688 		return (-1);
689 	}
690 
691 	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
692 		err = sem_trywait(&mqhp->mq_notfull);
693 	else {
694 		/*
695 		 * We might get cancelled here...
696 		 */
697 		if (timeout == NULL)
698 			err = sem_wait(&mqhp->mq_notfull);
699 		else if (abs_rel == ABS_TIME)
700 			err = sem_timedwait(&mqhp->mq_notfull, timeout);
701 		else
702 			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
703 	}
704 	if (err == -1) {
705 		/*
706 		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
707 		 * by sem_*wait(), so we can just return.
708 		 */
709 		return (-1);
710 	}
711 
712 	/*
713 	 * By the time we're here, we know that we've got the capacity
714 	 * to add to the queue...now acquire the exclusive lock.
715 	 */
716 	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
717 		owner_dead(mqdp, err);
718 		return (-1);
719 	}
720 
721 	/*
722 	 * Now determine if we want to kick the notification.  POSIX
723 	 * requires that if a process has registered for notification,
724 	 * we must kick it when the queue makes an empty to non-empty
725 	 * transition, and there are no blocked receivers.  Note that
726 	 * this mechanism does _not_ guarantee that the kicked process
727 	 * will be able to receive a message without blocking;
728 	 * another receiver could intervene in the meantime.  Thus,
729 	 * the notification mechanism is inherently racy; all we can
730 	 * do is hope to minimize the window as much as possible.
731 	 * In general, we want to avoid kicking the notification when
732 	 * there are clearly receivers blocked.  We'll determine if
733 	 * we want to kick the notification before the mq_putmsg(),
734 	 * but the actual signotify() won't be done until the message
735 	 * is on the queue.
736 	 */
737 	if (mqhp->mq_sigid.sn_pid != 0) {
738 		int nmessages, nblocked;
739 
740 		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
741 		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);
742 
743 		if (nmessages == 0 && nblocked == 0)
744 			notify = 1;
745 	}
746 
747 	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
748 	(void) sem_post(&mqhp->mq_notempty);
749 
750 	if (notify) {
751 		/* notify and also delete the registration */
752 		do_notify(mqhp);
753 	}
754 
755 	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
756 	(void) mutex_unlock(&mqhp->mq_exclusive);
757 
758 	return (0);
759 }
760 
761 int
762 mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
763 {
764 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
765 	    NULL, ABS_TIME));
766 }
767 
768 int
769 mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
770     uint_t msg_prio, const timespec_t *abs_timeout)
771 {
772 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
773 	    abs_timeout, ABS_TIME));
774 }
775 
776 int
777 mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
778     uint_t msg_prio, const timespec_t *rel_timeout)
779 {
780 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
781 	    rel_timeout, REL_TIME));
782 }
783 
784 static void
785 decrement_rblocked(mqhdr_t *mqhp)
786 {
787 	int cancel_state;
788 
789 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
790 	while (sem_wait(&mqhp->mq_rblocked) == -1)
791 		continue;
792 	(void) pthread_setcancelstate(cancel_state, NULL);
793 }
794 
795 static ssize_t
796 __mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
797     uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
798 {
799 	mqdes_t *mqdp = (mqdes_t *)mqdes;
800 	mqhdr_t *mqhp;
801 	ssize_t	msg_size;
802 	int err;
803 
804 	/*
805 	 * sem_*wait() does cancellation, if called.
806 	 * pthread_testcancel() ensures that cancellation takes place if
807 	 * there is a cancellation pending when mq_*receive() is called.
808 	 */
809 	pthread_testcancel();
810 
811 	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
812 		errno = EBADF;
813 		return (ssize_t)(-1);
814 	}
815 
816 	mqhp = mqdp->mqd_mq;
817 
818 	if (msg_len < mqhp->mq_maxsz) {
819 		errno = EMSGSIZE;
820 		return (ssize_t)(-1);
821 	}
822 
823 	/*
824 	 * The semaphoring scheme for mq_[timed]receive is a little hairy
825 	 * thanks to POSIX.1b's arcane notification mechanism.  First,
826 	 * we try to take the common case and do a sem_trywait().
827 	 * If that doesn't work, and O_NONBLOCK hasn't been set,
828 	 * then note that we're going to sleep by incrementing the rblocked
829 	 * semaphore.  We decrement that semaphore after waking up.
830 	 */
831 	if (sem_trywait(&mqhp->mq_notempty) == -1) {
832 		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
833 			/*
834 			 * errno has been set to EAGAIN or EINTR by
835 			 * sem_trywait(), so we can just return.
836 			 */
837 			return (-1);
838 		}
839 		/*
840 		 * If we're here, then we're probably going to block...
841 		 * increment the rblocked semaphore.  If we get
842 		 * cancelled, decrement_rblocked() will decrement it.
843 		 */
844 		(void) sem_post(&mqhp->mq_rblocked);
845 
846 		pthread_cleanup_push(decrement_rblocked, mqhp);
847 		if (timeout == NULL)
848 			err = sem_wait(&mqhp->mq_notempty);
849 		else if (abs_rel == ABS_TIME)
850 			err = sem_timedwait(&mqhp->mq_notempty, timeout);
851 		else
852 			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
853 		pthread_cleanup_pop(1);
854 
855 		if (err == -1) {
856 			/*
857 			 * We took a signal or timeout while waiting
858 			 * on mq_notempty...
859 			 */
860 			return (-1);
861 		}
862 	}
863 
864 	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
865 		owner_dead(mqdp, err);
866 		return (-1);
867 	}
868 	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
869 	(void) sem_post(&mqhp->mq_notfull);
870 	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
871 	(void) mutex_unlock(&mqhp->mq_exclusive);
872 
873 	return (msg_size);
874 }
875 
876 ssize_t
877 mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
878 {
879 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
880 	    NULL, ABS_TIME));
881 }
882 
883 ssize_t
884 mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
885     uint_t *msg_prio, const timespec_t *abs_timeout)
886 {
887 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
888 	    abs_timeout, ABS_TIME));
889 }
890 
891 ssize_t
892 mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
893     uint_t *msg_prio, const timespec_t *rel_timeout)
894 {
895 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
896 	    rel_timeout, REL_TIME));
897 }
898 
899 /*
900  * Only used below, in mq_notify().
901  * We already have a spawner thread.
902  * Verify that the attributes match; cancel it if necessary.
903  */
904 static int
905 cancel_if_necessary(thread_communication_data_t *tcdp,
906     const struct sigevent *sigevp)
907 {
908 	int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp,
909 	    sigevp->sigev_notify_attributes);
910 
911 	if (do_cancel) {
912 		/*
913 		 * Attributes don't match, cancel the spawner thread.
914 		 */
915 		(void) pthread_cancel(tcdp->tcd_server_id);
916 	} else {
917 		/*
918 		 * Reuse the existing spawner thread with possibly
919 		 * changed notification function and value.
920 		 */
921 		tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
922 		tcdp->tcd_notif.sigev_signo = 0;
923 		tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
924 		tcdp->tcd_notif.sigev_notify_function =
925 		    sigevp->sigev_notify_function;
926 	}
927 
928 	return (do_cancel);
929 }
930 
931 int
932 mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
933 {
934 	mqdes_t *mqdp = (mqdes_t *)mqdes;
935 	mqhdr_t *mqhp;
936 	thread_communication_data_t *tcdp;
937 	siginfo_t mq_siginfo;
938 	struct sigevent sigevent;
939 	struct stat64 statb;
940 	port_notify_t *pn;
941 	void *userval;
942 	int rval = -1;
943 	int ntype;
944 	int port;
945 	int error;
946 
947 	if (!mq_is_valid(mqdp)) {
948 		errno = EBADF;
949 		return (-1);
950 	}
951 
952 	mqhp = mqdp->mqd_mq;
953 
954 	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
955 		mqdp->mqd_ownerdead = 1;
956 		sigevp = NULL;
957 		if (error == EOWNERDEAD)
958 			(void) mutex_unlock(&mqhp->mq_exclusive);
959 		/* carry on regardless, without holding mq_exclusive */
960 	}
961 
962 	if (sigevp == NULL) {		/* remove notification */
963 		if (mqhp->mq_des == (uintptr_t)mqdp &&
964 		    mqhp->mq_sigid.sn_pid == getpid()) {
965 			/* notification is set for this descriptor, remove it */
966 			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
967 			if ((tcdp = mqdp->mqd_tcd) != NULL) {
968 				sig_mutex_lock(&tcdp->tcd_lock);
969 				if (tcdp->tcd_msg_enabled) {
970 					/* cancel the spawner thread */
971 					tcdp = mqdp->mqd_tcd;
972 					mqdp->mqd_tcd = NULL;
973 					(void) pthread_cancel(
974 					    tcdp->tcd_server_id);
975 				}
976 				sig_mutex_unlock(&tcdp->tcd_lock);
977 			}
978 			mqhp->mq_ntype = 0;
979 			mqhp->mq_des = 0;
980 		} else {
981 			/* notification is not set for this descriptor */
982 			errno = EBUSY;
983 			goto bad;
984 		}
985 	} else {		/* register notification with this process */
986 		switch (ntype = sigevp->sigev_notify) {
987 		case SIGEV_THREAD:
988 			userval = sigevp->sigev_value.sival_ptr;
989 			port = -1;
990 			break;
991 		case SIGEV_PORT:
992 			pn = sigevp->sigev_value.sival_ptr;
993 			userval = pn->portnfy_user;
994 			port = pn->portnfy_port;
995 			if (fstat64(port, &statb) != 0 ||
996 			    !S_ISPORT(statb.st_mode)) {
997 				errno = EBADF;
998 				goto bad;
999 			}
1000 			(void) memset(&sigevent, 0, sizeof (sigevent));
1001 			sigevent.sigev_notify = SIGEV_PORT;
1002 			sigevp = &sigevent;
1003 			break;
1004 		}
1005 		switch (ntype) {
1006 		case SIGEV_NONE:
1007 			mq_siginfo.si_signo = 0;
1008 			mq_siginfo.si_code = SI_MESGQ;
1009 			break;
1010 		case SIGEV_SIGNAL:
1011 			mq_siginfo.si_signo = sigevp->sigev_signo;
1012 			mq_siginfo.si_value = sigevp->sigev_value;
1013 			mq_siginfo.si_code = SI_MESGQ;
1014 			break;
1015 		case SIGEV_THREAD:
1016 			if ((tcdp = mqdp->mqd_tcd) != NULL &&
1017 			    cancel_if_necessary(tcdp, sigevp))
1018 				mqdp->mqd_tcd = NULL;
1019 			/* FALLTHROUGH */
1020 		case SIGEV_PORT:
1021 			if ((tcdp = mqdp->mqd_tcd) == NULL) {
1022 				/* we must create a spawner thread */
1023 				tcdp = setup_sigev_handler(sigevp, MQ);
1024 				if (tcdp == NULL) {
1025 					errno = EBADF;
1026 					goto bad;
1027 				}
1028 				tcdp->tcd_msg_enabled = 0;
1029 				tcdp->tcd_msg_closing = 0;
1030 				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
1031 				if (launch_spawner(tcdp) != 0) {
1032 					free_sigev_handler(tcdp);
1033 					goto bad;
1034 				}
1035 				mqdp->mqd_tcd = tcdp;
1036 			}
1037 			mq_siginfo.si_signo = 0;
1038 			mq_siginfo.si_code = SI_MESGQ;
1039 			break;
1040 		default:
1041 			errno = EINVAL;
1042 			goto bad;
1043 		}
1044 
1045 		/* register notification */
1046 		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
1047 			goto bad;
1048 		mqhp->mq_ntype = ntype;
1049 		mqhp->mq_des = (uintptr_t)mqdp;
1050 		switch (ntype) {
1051 		case SIGEV_THREAD:
1052 		case SIGEV_PORT:
1053 			tcdp->tcd_port = port;
1054 			tcdp->tcd_msg_object = mqdp;
1055 			tcdp->tcd_msg_userval = userval;
1056 			sig_mutex_lock(&tcdp->tcd_lock);
1057 			tcdp->tcd_msg_enabled = ntype;
1058 			sig_mutex_unlock(&tcdp->tcd_lock);
1059 			(void) cond_broadcast(&tcdp->tcd_cv);
1060 			break;
1061 		}
1062 	}
1063 
1064 	rval = 0;	/* success */
1065 bad:
1066 	if (error == 0) {
1067 		(void) mutex_unlock(&mqhp->mq_exclusive);
1068 	} else {
1069 		errno = EBADMSG;
1070 		rval = -1;
1071 	}
1072 	return (rval);
1073 }
1074 
1075 int
1076 mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
1077 {
1078 	mqdes_t *mqdp = (mqdes_t *)mqdes;
1079 	mqhdr_t *mqhp;
1080 	uint_t	flag = 0;
1081 
1082 	if (!mq_is_valid(mqdp)) {
1083 		errno = EBADF;
1084 		return (-1);
1085 	}
1086 
1087 	/* store current attributes */
1088 	if (omqstat != NULL) {
1089 		int	count;
1090 
1091 		mqhp = mqdp->mqd_mq;
1092 		omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1093 		omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1094 		omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1095 		(void) sem_getvalue(&mqhp->mq_notempty, &count);
1096 		omqstat->mq_curmsgs = count;
1097 	}
1098 
1099 	/* set description attributes */
1100 	if ((mqstat->mq_flags & O_NONBLOCK) != 0)
1101 		flag = FNONBLOCK;
1102 	mqdp->mqd_mqdn->mqdn_flags = flag;
1103 
1104 	return (0);
1105 }
1106 
1107 int
1108 mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
1109 {
1110 	mqdes_t *mqdp = (mqdes_t *)mqdes;
1111 	mqhdr_t *mqhp;
1112 	int count;
1113 
1114 	if (!mq_is_valid(mqdp)) {
1115 		errno = EBADF;
1116 		return (-1);
1117 	}
1118 
1119 	mqhp = mqdp->mqd_mq;
1120 
1121 	mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1122 	mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1123 	mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1124 	(void) sem_getvalue(&mqhp->mq_notempty, &count);
1125 	mqstat->mq_curmsgs = count;
1126 	return (0);
1127 }
1128 
1129 /*
1130  * Cleanup after fork1() in the child process.
1131  */
1132 void
1133 postfork1_child_sigev_mq(void)
1134 {
1135 	thread_communication_data_t *tcdp;
1136 	mqdes_t *mqdp;
1137 
1138 	for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
1139 		if ((tcdp = mqdp->mqd_tcd) != NULL) {
1140 			mqdp->mqd_tcd = NULL;
1141 			tcd_teardown(tcdp);
1142 		}
1143 	}
1144 }
1145