xref: /titanic_52/usr/src/lib/libc/port/rt/mqueue.c (revision 22f5594a529d50114d839d4ddecc2c499731a3d7)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include "lint.h"
30 #include "mtlib.h"
31 #define	_KMEMUSER
32 #include <sys/param.h>		/* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
33 #undef	_KMEMUSER
34 #include <mqueue.h>
35 #include <sys/types.h>
36 #include <sys/file.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #include <stdarg.h>
40 #include <limits.h>
41 #include <pthread.h>
42 #include <assert.h>
43 #include <string.h>
44 #include <unistd.h>
45 #include <stdlib.h>
46 #include <sys/stat.h>
47 #include <inttypes.h>
48 #include "sigev_thread.h"
49 #include "pos4obj.h"
50 
51 /*
52  * Default values per message queue
53  */
54 #define	MQ_MAXMSG	128
55 #define	MQ_MAXSIZE	1024
56 
57 #define	MQ_MAGIC	0x4d534751		/* "MSGQ" */
58 
59 /*
60  * Message header which is part of messages in link list
61  */
/*
 * Header that precedes every message buffer inside the mapped queue.
 * The message payload is stored immediately after this header
 * (mq_getmsg()/mq_putmsg() copy from/to &hdr[1]).
 */
typedef struct {
	/*
	 * Offset, relative to the start of the mapping (see MQ_PTR()),
	 * of the next header on the same list; 0 terminates the list.
	 */
	uint64_t 	msg_next;	/* offset of next message in the link */
	uint64_t	msg_len;	/* length of the message */
} msghdr_t;
66 
67 /*
68  * message queue description
69  */
/*
 * Open description: a single flag word that mq_open() places in a
 * MAP_SHARED mapping of an already-unlinked file.  mq_open() and
 * mq_setattr() store FNONBLOCK (or 0) here; the send/receive paths
 * test it to decide whether to block.
 */
struct mq_dn {
	size_t		mqdn_flags;	/* open description flags */
};
73 
74 /*
75  * message queue descriptor structure
76  */
/*
 * Per-open descriptor.  The mqd_t handed back by mq_open() is a pointer
 * to one of these, allocated with malloc() and linked onto mq_list.
 * mq_is_valid() relies on mqd_magic to recognize a live descriptor;
 * mq_close_cleanup() clears it before the structure is freed.
 */
typedef struct mq_des {
	struct mq_des	*mqd_next;	/* list of all open mq descriptors, */
	struct mq_des	*mqd_prev;	/* needed for fork-safety */
	int		mqd_magic;	/* magic # to identify mq_des */
	int		mqd_flags;	/* operation flag per open */
	struct mq_header *mqd_mq;	/* address pointer of message Q */
	struct mq_dn	*mqd_mqdn;	/* open	description */
	thread_communication_data_t *mqd_tcd;	/* SIGEV_THREAD notification */
} mqdes_t;
86 
87 /*
88  * message queue common header, part of the mmap()ed file.
89  * Since message queues may be shared between 32- and 64-bit processes,
90  * care must be taken to make sure that the elements of this structure
91  * are identical for both _LP64 and _ILP32 cases.
92  */
/*
 * Queue header located at offset 0 of the mapped data file.  All
 * "pointer" members are offsets from the start of the mapping, because
 * each process may map the file at a different address.
 *
 * mq_open() read()s the first 8 bytes of the data file to learn the
 * mapping size, which is why mq_totsize must stay the first member; a
 * value of 0 there means the queue has not been initialized yet.
 */
typedef struct mq_header {
	/* first field must be mq_totsize, DO NOT insert before this	*/
	int64_t		mq_totsize;	/* total size of the Queue */
	int64_t		mq_maxsz;	/* max size of each message */
	uint32_t	mq_maxmsg;	/* max messages in the queue */
	uint32_t	mq_maxprio;	/* maximum mqueue priority */
	uint32_t	mq_curmaxprio;	/* current maximum MQ priority */
	/* bit N is set while the list for priority N is non-empty */
	uint32_t	mq_mask;	/* priority bitmask */
	uint64_t	mq_freep;	/* free message's head pointer */
	/* offsets of two arrays of mq_maxprio uint64_t list heads/tails */
	uint64_t	mq_headpp;	/* pointer to head pointers */
	uint64_t	mq_tailpp;	/* pointer to tail pointers */
	signotify_id_t	mq_sigid;	/* notification id (3 int's) */
	uint32_t	mq_ntype;	/* notification type (SIGEV_*) */
	/* address of the registering mqdes_t, stored as an integer */
	uint64_t	mq_des;		/* pointer to msg Q descriptor */
	mutex_t		mq_exclusive;	/* acquire for exclusive access */
	sem_t		mq_rblocked;	/* number of processes rblocked */
	/* initialized to mq_maxmsg; counts free message slots */
	sem_t		mq_notfull;	/* mq_send()'s block on this */
	/* initialized to 0; counts queued messages */
	sem_t		mq_notempty;	/* mq_receive()'s block on this */
	sem_t		mq_spawner;	/* spawner thread blocks on this */
} mqhdr_t;
113 
114 /*
115  * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
116  * If this assumption is somehow invalidated, mq_open() needs to be changed
117  * back to the old version which kept a count and enforced a limit.
118  * We make sure that this is pointed out to those changing <sys/param.h>
119  * by checking _MQ_OPEN_MAX at compile time.
120  */
121 #if _MQ_OPEN_MAX != -1
122 #error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
123 #endif
124 
#define	MQ_ALIGNSIZE	8	/* 64-bit alignment */

/*
 * Debug-only consistency checks; they compile away entirely unless
 * DEBUG is defined.  Each macro expands to a single statement without a
 * trailing semicolon, so it must be used as "MQ_ASSERT(...);".
 *
 * MQ_ASSERT_PTR(_m, _p) checks that the offset _p is non-zero, is
 * MQ_ALIGNSIZE-aligned and lies below _m->mq_totsize.  _p is an integer
 * offset, so it is compared against 0 rather than NULL.
 *
 * MQ_ASSERT_SEMVAL_LEQ(sem, val) checks that the current value of the
 * semaphore does not exceed val.
 */
#ifdef DEBUG
#define	MQ_ASSERT(x)	assert(x)

#define	MQ_ASSERT_PTR(_m, _p) \
	assert((_p) != 0 && !((uintptr_t)(_p) & (MQ_ALIGNSIZE - 1)) && \
	    !((uintptr_t)(_m) + (uintptr_t)(_p) >= (uintptr_t)(_m) + \
	    (_m)->mq_totsize))

#define	MQ_ASSERT_SEMVAL_LEQ(sem, val) do { \
	int _val; \
	(void) sem_getvalue((sem), &_val); \
	assert((_val) <= (val)); \
} while (0)
#else
#define	MQ_ASSERT(x)
#define	MQ_ASSERT_PTR(_m, _p)
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
#endif
144 
/*
 * Offset-to-address helpers.  "m" is the mapped queue header and "n" is
 * either a byte offset into the mapping (MQ_PTR) or a priority index
 * into the head/tail offset arrays (HEAD_PTR/TAIL_PTR).  Arguments are
 * fully parenthesized so that arbitrary expressions may be passed.
 */
#define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)(m) + (uintptr_t)(n)))
#define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
			(uintptr_t)(m)->mq_headpp + (n) * sizeof (uint64_t)))
#define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
			(uintptr_t)(m)->mq_tailpp + (n) * sizeof (uint64_t)))

/* descriptor value that mq_is_valid() always rejects */
#define	MQ_RESERVED	((mqdes_t *)-1)

/* how the timeout argument of __mq_timedsend()/__mq_timedreceive() is read */
#define	ABS_TIME	0
#define	REL_TIME	1
155 
/*
 * Process-wide list of every descriptor returned by mq_open().
 * mq_open() and mq_close_cleanup() modify it with mq_list_lock held;
 * postfork1_child_sigev_mq() walks it in the child after fork1().
 */
static mutex_t mq_list_lock = DEFAULTMUTEX;
static mqdes_t *mq_list = NULL;

/* notification primitive; used here with SN_PROC, SN_CANCEL and SN_SEND */
extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id);
160 
161 static int
162 mq_is_valid(mqdes_t *mqdp)
163 {
164 	/*
165 	 * Any use of a message queue after it was closed is
166 	 * undefined.  But the standard strongly favours EBADF
167 	 * returns.  Before we dereference which could be fatal,
168 	 * we first do some pointer sanity checks.
169 	 */
170 	if (mqdp != NULL && mqdp != MQ_RESERVED &&
171 	    ((uintptr_t)mqdp & 0x7) == 0) {
172 		return (mqdp->mqd_magic == MQ_MAGIC);
173 	}
174 
175 	return (0);
176 }
177 
178 static void
179 mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
180 {
181 	int		i;
182 	uint64_t	temp;
183 	uint64_t	currentp;
184 	uint64_t	nextp;
185 
186 	/*
187 	 * We only need to initialize the non-zero fields.  The use of
188 	 * ftruncate() on the message queue file assures that the
189 	 * pages will be zfod.
190 	 */
191 	(void) mutex_init(&mqhp->mq_exclusive, USYNC_PROCESS, NULL);
192 	(void) sem_init(&mqhp->mq_rblocked, 1, 0);
193 	(void) sem_init(&mqhp->mq_notempty, 1, 0);
194 	(void) sem_init(&mqhp->mq_spawner, 1, 0);
195 	(void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);
196 
197 	mqhp->mq_maxsz = msgsize;
198 	mqhp->mq_maxmsg = maxmsg;
199 
200 	/*
201 	 * As of this writing (1997), there are 32 message queue priorities.
202 	 * If this is to change, then the size of the mq_mask will
203 	 * also have to change.  If DEBUG is defined, assert that
204 	 * _MQ_PRIO_MAX hasn't changed.
205 	 */
206 	mqhp->mq_maxprio = _MQ_PRIO_MAX;
207 #if defined(DEBUG)
208 	/* LINTED always true */
209 	MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
210 #endif
211 
212 	/*
213 	 * Since the message queue can be mapped into different
214 	 * virtual address ranges by different processes, we don't
215 	 * keep track of pointers, only offsets into the shared region.
216 	 */
217 	mqhp->mq_headpp = sizeof (mqhdr_t);
218 	mqhp->mq_tailpp = mqhp->mq_headpp +
219 	    mqhp->mq_maxprio * sizeof (uint64_t);
220 	mqhp->mq_freep = mqhp->mq_tailpp +
221 	    mqhp->mq_maxprio * sizeof (uint64_t);
222 
223 	currentp = mqhp->mq_freep;
224 	MQ_PTR(mqhp, currentp)->msg_next = 0;
225 
226 	temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
227 	for (i = 1; i < mqhp->mq_maxmsg; i++) {
228 		nextp = currentp + sizeof (msghdr_t) + temp;
229 		MQ_PTR(mqhp, currentp)->msg_next = nextp;
230 		MQ_PTR(mqhp, nextp)->msg_next = 0;
231 		currentp = nextp;
232 	}
233 }
234 
235 static size_t
236 mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
237 {
238 	uint64_t currentp;
239 	msghdr_t *curbuf;
240 	uint64_t *headpp;
241 	uint64_t *tailpp;
242 
243 	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
244 
245 	/*
246 	 * Get the head and tail pointers for the queue of maximum
247 	 * priority.  We shouldn't be here unless there is a message for
248 	 * us, so it's fair to assert that both the head and tail
249 	 * pointers are non-NULL.
250 	 */
251 	headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
252 	tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);
253 
254 	if (msg_prio != NULL)
255 		*msg_prio = mqhp->mq_curmaxprio;
256 
257 	currentp = *headpp;
258 	MQ_ASSERT_PTR(mqhp, currentp);
259 	curbuf = MQ_PTR(mqhp, currentp);
260 
261 	if ((*headpp = curbuf->msg_next) == NULL) {
262 		/*
263 		 * We just nuked the last message in this priority's queue.
264 		 * Twiddle this priority's bit, and then find the next bit
265 		 * tipped.
266 		 */
267 		uint_t prio = mqhp->mq_curmaxprio;
268 
269 		mqhp->mq_mask &= ~(1u << prio);
270 
271 		for (; prio != 0; prio--)
272 			if (mqhp->mq_mask & (1u << prio))
273 				break;
274 		mqhp->mq_curmaxprio = prio;
275 
276 		*tailpp = NULL;
277 	}
278 
279 	/*
280 	 * Copy the message, and put the buffer back on the free list.
281 	 */
282 	(void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
283 	curbuf->msg_next = mqhp->mq_freep;
284 	mqhp->mq_freep = currentp;
285 
286 	return (curbuf->msg_len);
287 }
288 
289 
290 static void
291 mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
292 {
293 	uint64_t currentp;
294 	msghdr_t *curbuf;
295 	uint64_t *headpp;
296 	uint64_t *tailpp;
297 
298 	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
299 
300 	/*
301 	 * Grab a free message block, and link it in.  We shouldn't
302 	 * be here unless there is room in the queue for us;  it's
303 	 * fair to assert that the free pointer is non-NULL.
304 	 */
305 	currentp = mqhp->mq_freep;
306 	MQ_ASSERT_PTR(mqhp, currentp);
307 	curbuf = MQ_PTR(mqhp, currentp);
308 
309 	/*
310 	 * Remove a message from the free list, and copy in the new contents.
311 	 */
312 	mqhp->mq_freep = curbuf->msg_next;
313 	curbuf->msg_next = NULL;
314 	(void) memcpy((char *)&curbuf[1], msgp, len);
315 	curbuf->msg_len = len;
316 
317 	headpp = HEAD_PTR(mqhp, prio);
318 	tailpp = TAIL_PTR(mqhp, prio);
319 
320 	if (*tailpp == 0) {
321 		/*
322 		 * This is the first message on this queue.  Set the
323 		 * head and tail pointers, and tip the appropriate bit
324 		 * in the priority mask.
325 		 */
326 		*headpp = currentp;
327 		*tailpp = currentp;
328 		mqhp->mq_mask |= (1u << prio);
329 		if (prio > mqhp->mq_curmaxprio)
330 			mqhp->mq_curmaxprio = prio;
331 	} else {
332 		MQ_ASSERT_PTR(mqhp, *tailpp);
333 		MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
334 		*tailpp = currentp;
335 	}
336 }
337 
338 mqd_t
339 mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
340 {
341 	va_list		ap;
342 	mode_t		mode;
343 	struct mq_attr	*attr;
344 	int		fd;
345 	int		err;
346 	int		cr_flag = 0;
347 	int		locked = 0;
348 	uint64_t	total_size;
349 	size_t		msgsize;
350 	ssize_t		maxmsg;
351 	uint64_t	temp;
352 	void		*ptr;
353 	mqdes_t		*mqdp;
354 	mqhdr_t		*mqhp;
355 	struct mq_dn	*mqdnp;
356 
357 	if (__pos4obj_check(path) == -1)
358 		return ((mqd_t)-1);
359 
360 	/* acquire MSGQ lock to have atomic operation */
361 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
362 		goto out;
363 	locked = 1;
364 
365 	va_start(ap, oflag);
366 	/* filter oflag to have READ/WRITE/CREATE modes only */
367 	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
368 	if ((oflag & O_CREAT) != 0) {
369 		mode = va_arg(ap, mode_t);
370 		attr = va_arg(ap, struct mq_attr *);
371 	}
372 	va_end(ap);
373 
374 	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
375 	    mode, &cr_flag)) < 0)
376 		goto out;
377 
378 	/* closing permission file */
379 	(void) __close_nc(fd);
380 
381 	/* Try to open/create data file */
382 	if (cr_flag) {
383 		cr_flag = PFILE_CREATE;
384 		if (attr == NULL) {
385 			maxmsg = MQ_MAXMSG;
386 			msgsize = MQ_MAXSIZE;
387 		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
388 			errno = EINVAL;
389 			goto out;
390 		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
391 			errno = ENOSPC;
392 			goto out;
393 		} else {
394 			maxmsg = attr->mq_maxmsg;
395 			msgsize = attr->mq_msgsize;
396 		}
397 
398 		/* adjust for message size at word boundary */
399 		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
400 
401 		total_size = sizeof (mqhdr_t) +
402 		    maxmsg * (temp + sizeof (msghdr_t)) +
403 		    2 * _MQ_PRIO_MAX * sizeof (uint64_t);
404 
405 		if (total_size > SSIZE_MAX) {
406 			errno = ENOSPC;
407 			goto out;
408 		}
409 
410 		/*
411 		 * data file is opened with read/write to those
412 		 * who have read or write permission
413 		 */
414 		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
415 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
416 		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
417 			goto out;
418 
419 		cr_flag |= DFILE_CREATE | DFILE_OPEN;
420 
421 		/* force permissions to avoid umask effect */
422 		if (fchmod(fd, mode) < 0)
423 			goto out;
424 
425 		if (ftruncate64(fd, (off64_t)total_size) < 0)
426 			goto out;
427 	} else {
428 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
429 		    O_RDWR, 0666, &err)) < 0)
430 			goto out;
431 		cr_flag = DFILE_OPEN;
432 
433 		/* Message queue has not been initialized yet */
434 		if (read(fd, &total_size, sizeof (total_size)) !=
435 		    sizeof (total_size) || total_size == 0) {
436 			errno = ENOENT;
437 			goto out;
438 		}
439 
440 		/* Message queue too big for this process to handle */
441 		if (total_size > SSIZE_MAX) {
442 			errno = EFBIG;
443 			goto out;
444 		}
445 	}
446 
447 	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
448 		errno = ENOMEM;
449 		goto out;
450 	}
451 	cr_flag |= ALLOC_MEM;
452 
453 	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
454 	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
455 		goto out;
456 	mqhp = ptr;
457 	cr_flag |= DFILE_MMAP;
458 
459 	/* closing data file */
460 	(void) __close_nc(fd);
461 	cr_flag &= ~DFILE_OPEN;
462 
463 	/*
464 	 * create, unlink, size, mmap, and close description file
465 	 * all for a flag word in anonymous shared memory
466 	 */
467 	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
468 	    0666, &err)) < 0)
469 		goto out;
470 	cr_flag |= DFILE_OPEN;
471 	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
472 	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
473 		goto out;
474 
475 	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
476 	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
477 		goto out;
478 	mqdnp = ptr;
479 	cr_flag |= MQDNP_MMAP;
480 
481 	(void) __close_nc(fd);
482 	cr_flag &= ~DFILE_OPEN;
483 
484 	/*
485 	 * we follow the same strategy as filesystem open() routine,
486 	 * where fcntl.h flags are changed to flags defined in file.h.
487 	 */
488 	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
489 	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);
490 
491 	/* new message queue requires initialization */
492 	if ((cr_flag & DFILE_CREATE) != 0) {
493 		/* message queue header has to be initialized */
494 		mq_init(mqhp, msgsize, maxmsg);
495 		mqhp->mq_totsize = total_size;
496 	}
497 	mqdp->mqd_mq = mqhp;
498 	mqdp->mqd_mqdn = mqdnp;
499 	mqdp->mqd_magic = MQ_MAGIC;
500 	mqdp->mqd_tcd = NULL;
501 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
502 		lmutex_lock(&mq_list_lock);
503 		mqdp->mqd_next = mq_list;
504 		mqdp->mqd_prev = NULL;
505 		if (mq_list)
506 			mq_list->mqd_prev = mqdp;
507 		mq_list = mqdp;
508 		lmutex_unlock(&mq_list_lock);
509 		return ((mqd_t)mqdp);
510 	}
511 
512 	locked = 0;	/* fall into the error case */
513 out:
514 	err = errno;
515 	if ((cr_flag & DFILE_OPEN) != 0)
516 		(void) __close_nc(fd);
517 	if ((cr_flag & DFILE_CREATE) != 0)
518 		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
519 	if ((cr_flag & PFILE_CREATE) != 0)
520 		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
521 	if ((cr_flag & ALLOC_MEM) != 0)
522 		free((void *)mqdp);
523 	if ((cr_flag & DFILE_MMAP) != 0)
524 		(void) munmap((caddr_t)mqhp, (size_t)total_size);
525 	if ((cr_flag & MQDNP_MMAP) != 0)
526 		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
527 	if (locked)
528 		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
529 	errno = err;
530 	return ((mqd_t)-1);
531 }
532 
533 static void
534 mq_close_cleanup(mqdes_t *mqdp)
535 {
536 	mqhdr_t *mqhp = mqdp->mqd_mq;
537 	struct mq_dn *mqdnp = mqdp->mqd_mqdn;
538 
539 	/* invalidate the descriptor before freeing it */
540 	mqdp->mqd_magic = 0;
541 	(void) mutex_unlock(&mqhp->mq_exclusive);
542 
543 	lmutex_lock(&mq_list_lock);
544 	if (mqdp->mqd_next)
545 		mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
546 	if (mqdp->mqd_prev)
547 		mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
548 	if (mq_list == mqdp)
549 		mq_list = mqdp->mqd_next;
550 	lmutex_unlock(&mq_list_lock);
551 
552 	free(mqdp);
553 	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
554 	(void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
555 }
556 
/*
 * Close the descriptor mqdes.
 *
 * Returns 0 on success, or -1 with errno set to EBADF when mqdes does
 * not pass mq_is_valid().
 *
 * mq_exclusive is acquired here and released inside
 * mq_close_cleanup(), which also frees the descriptor and unmaps the
 * queue.
 */
int
mq_close(mqd_t mqdes)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;
	(void) mutex_lock(&mqhp->mq_exclusive);

	/*
	 * A registration belongs to us only if it names this descriptor
	 * and was made by this process.
	 */
	if (mqhp->mq_des == (uintptr_t)mqdp &&
	    mqhp->mq_sigid.sn_pid == getpid()) {
		/* notification is set for this descriptor, remove it */
		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
		mqhp->mq_ntype = 0;
		mqhp->mq_des = 0;
	}

	/*
	 * The handler is pushed before del_sigev_mq() so that the
	 * cleanup still runs if this thread is cancelled there.
	 */
	pthread_cleanup_push(mq_close_cleanup, mqdp);
	if ((tcdp = mqdp->mqd_tcd) != NULL) {
		mqdp->mqd_tcd = NULL;
		del_sigev_mq(tcdp);	/* possible cancellation point */
	}
	pthread_cleanup_pop(1);		/* finish in the cleanup handler */

	return (0);
}
589 
590 int
591 mq_unlink(const char *path)
592 {
593 	int err;
594 
595 	if (__pos4obj_check(path) < 0)
596 		return (-1);
597 
598 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
599 		return (-1);
600 	}
601 
602 	err = __pos4obj_unlink(path, MQ_PERM_TYPE);
603 
604 	if (err == 0 || (err == -1 && errno == EEXIST)) {
605 		errno = 0;
606 		err = __pos4obj_unlink(path, MQ_DATA_TYPE);
607 	}
608 
609 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
610 		return (-1);
611 
612 	return (err);
613 
614 }
615 
/*
 * Common implementation of mq_send(), mq_timedsend() and
 * mq_reltimedsend_np().
 *
 *	mqdes	  queue descriptor; must have been opened for writing
 *	msg_ptr	  message payload
 *	msg_len	  payload length; at most the queue's mq_maxsz
 *	msg_prio  priority; must be below the queue's mq_maxprio
 *	timeout	  NULL to wait indefinitely, otherwise a time limit
 *	abs_rel	  ABS_TIME or REL_TIME: how timeout is interpreted
 *
 * Returns 0 on success, -1 with errno set on failure:
 *	EBADF	 invalid descriptor, or not opened for writing
 *	EINVAL	 msg_prio out of range
 *	EMSGSIZE msg_len larger than the queue allows
 * or whatever the sem_*wait() call on mq_notfull reported.
 */
static int
__mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
	uint_t msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int err;
	int notify = 0;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*send() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_prio >= mqhp->mq_maxprio) {
		errno = EINVAL;
		return (-1);
	}
	if (msg_len > mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (-1);
	}

	/* claim a free slot: mq_notfull counts the slots still available */
	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
		err = sem_trywait(&mqhp->mq_notfull);
	else {
		/*
		 * We might get cancelled here...
		 */
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notfull);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notfull, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
	}
	if (err == -1) {
		/*
		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
		 * by sem_*wait(), so we can just return.
		 */
		return (-1);
	}

	/*
	 * By the time we're here, we know that we've got the capacity
	 * to add to the queue...now acquire the exclusive lock.
	 */
	(void) mutex_lock(&mqhp->mq_exclusive);

	/*
	 * Now determine if we want to kick the notification.  POSIX
	 * requires that if a process has registered for notification,
	 * we must kick it when the queue makes an empty to non-empty
	 * transition, and there are no blocked receivers.  Note that
	 * this mechanism does _not_ guarantee that the kicked process
	 * will be able to receive a message without blocking;
	 * another receiver could intervene in the meantime.  Thus,
	 * the notification mechanism is inherently racy; all we can
	 * do is hope to minimize the window as much as possible.
	 * In general, we want to avoid kicking the notification when
	 * there are clearly receivers blocked.  We'll determine if
	 * we want to kick the notification before the mq_putmsg(),
	 * but the actual signotify() won't be done until the message
	 * is on the queue.
	 */
	if (mqhp->mq_sigid.sn_pid != 0) {
		int nmessages, nblocked;

		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);

		if (nmessages == 0 && nblocked == 0)
			notify = 1;
	}

	/* queue the message, then let one receiver through */
	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
	(void) sem_post(&mqhp->mq_notempty);

	if (notify) {
		/* notify and also delete the registration */
		(void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
		/* thread/port notifications are delivered via mq_spawner */
		if (mqhp->mq_ntype == SIGEV_THREAD ||
		    mqhp->mq_ntype == SIGEV_PORT)
			(void) sem_post(&mqhp->mq_spawner);
		mqhp->mq_ntype = 0;
		mqhp->mq_des = 0;
	}

	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (0);
}
719 
720 int
721 mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
722 {
723 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
724 	    NULL, ABS_TIME));
725 }
726 
727 int
728 mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
729 	uint_t msg_prio, const timespec_t *abs_timeout)
730 {
731 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
732 	    abs_timeout, ABS_TIME));
733 }
734 
735 int
736 mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
737 	uint_t msg_prio, const timespec_t *rel_timeout)
738 {
739 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
740 	    rel_timeout, REL_TIME));
741 }
742 
743 static void
744 decrement_rblocked(mqhdr_t *mqhp)
745 {
746 	int cancel_state;
747 
748 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
749 	while (sem_wait(&mqhp->mq_rblocked) == -1)
750 		continue;
751 	(void) pthread_setcancelstate(cancel_state, NULL);
752 }
753 
/*
 * Common implementation of mq_receive(), mq_timedreceive() and
 * mq_reltimedreceive_np().
 *
 *	mqdes	  queue descriptor; must have been opened for reading
 *	msg_ptr	  buffer that receives the message
 *	msg_len	  buffer size; must be at least the queue's mq_maxsz
 *	msg_prio  if non-NULL, receives the message priority
 *	timeout	  NULL to wait indefinitely, otherwise a time limit
 *	abs_rel	  ABS_TIME or REL_TIME: how timeout is interpreted
 *
 * Returns the length of the received message, or -1 with errno set:
 *	EBADF	 invalid descriptor, or not opened for reading
 *	EMSGSIZE msg_len smaller than the queue's maximum message size
 * or whatever the sem_*wait() call on mq_notempty reported.
 */
static ssize_t
__mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	ssize_t	msg_size;
	int err;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*receive() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
		errno = EBADF;
		return (ssize_t)(-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_len < mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (ssize_t)(-1);
	}

	/*
	 * The semaphoring scheme for mq_[timed]receive is a little hairy
	 * thanks to POSIX.1b's arcane notification mechanism.  First,
	 * we try to take the common case and do a sem_trywait().
	 * If that doesn't work, and O_NONBLOCK hasn't been set,
	 * then note that we're going to sleep by incrementing the rblocked
	 * semaphore.  We decrement that semaphore after waking up.
	 */
	if (sem_trywait(&mqhp->mq_notempty) == -1) {
		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
			/*
			 * errno has been set to EAGAIN or EINTR by
			 * sem_trywait(), so we can just return.
			 */
			return (-1);
		}
		/*
		 * If we're here, then we're probably going to block...
		 * increment the rblocked semaphore.  If we get
		 * cancelled, decrement_rblocked() will decrement it.
		 */
		(void) sem_post(&mqhp->mq_rblocked);

		pthread_cleanup_push(decrement_rblocked, mqhp);
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notempty);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notempty, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
		/* the argument 1 runs decrement_rblocked() on the normal path */
		pthread_cleanup_pop(1);

		if (err == -1) {
			/*
			 * We took a signal or timeout while waiting
			 * on mq_notempty...
			 */
			return (-1);
		}
	}

	/* a message is ours: dequeue it and release its slot to senders */
	(void) mutex_lock(&mqhp->mq_exclusive);
	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
	(void) sem_post(&mqhp->mq_notfull);
	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (msg_size);
}
831 
832 ssize_t
833 mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
834 {
835 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
836 	    NULL, ABS_TIME));
837 }
838 
839 ssize_t
840 mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
841 	uint_t *msg_prio, const timespec_t *abs_timeout)
842 {
843 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
844 	    abs_timeout, ABS_TIME));
845 }
846 
847 ssize_t
848 mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
849 	uint_t *msg_prio, const timespec_t *rel_timeout)
850 {
851 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
852 	    rel_timeout, REL_TIME));
853 }
854 
855 /*
856  * Only used below, in mq_notify().
857  * We already have a spawner thread.
858  * Verify that the attributes match; cancel it if necessary.
859  */
860 static int
861 cancel_if_necessary(thread_communication_data_t *tcdp,
862 	const struct sigevent *sigevp)
863 {
864 	int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp,
865 	    sigevp->sigev_notify_attributes);
866 
867 	if (do_cancel) {
868 		/*
869 		 * Attributes don't match, cancel the spawner thread.
870 		 */
871 		(void) pthread_cancel(tcdp->tcd_server_id);
872 	} else {
873 		/*
874 		 * Reuse the existing spawner thread with possibly
875 		 * changed notification function and value.
876 		 */
877 		tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
878 		tcdp->tcd_notif.sigev_signo = 0;
879 		tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
880 		tcdp->tcd_notif.sigev_notify_function =
881 		    sigevp->sigev_notify_function;
882 	}
883 
884 	return (do_cancel);
885 }
886 
/*
 * Register (sigevp != NULL) or remove (sigevp == NULL) the
 * notification request for the queue referenced by mqdes.
 *
 * Returns 0 on success, -1 with errno set on failure:
 *	EBADF	invalid descriptor, the SIGEV_PORT port is not an event
 *		port, or the spawner data could not be set up
 *	EBUSY	removal requested, but the current registration does
 *		not belong to this descriptor and process
 *	EINVAL	unsupported sigev_notify value
 * Failures of launch_spawner() or __signotify() return -1 without
 * errno being assigned here.
 *
 * The whole operation runs with mq_exclusive held.
 */
int
mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	siginfo_t mq_siginfo;
	struct sigevent sigevent;
	struct stat64 statb;
	port_notify_t *pn;
	void *userval;
	int rval = -1;
	int ntype;
	int port;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	(void) mutex_lock(&mqhp->mq_exclusive);

	if (sigevp == NULL) {		/* remove notification */
		if (mqhp->mq_des == (uintptr_t)mqdp &&
		    mqhp->mq_sigid.sn_pid == getpid()) {
			/* notification is set for this descriptor, remove it */
			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
			if ((tcdp = mqdp->mqd_tcd) != NULL) {
				sig_mutex_lock(&tcdp->tcd_lock);
				if (tcdp->tcd_msg_enabled) {
					/* cancel the spawner thread */
					tcdp = mqdp->mqd_tcd;
					mqdp->mqd_tcd = NULL;
					(void) pthread_cancel(
					    tcdp->tcd_server_id);
				}
				sig_mutex_unlock(&tcdp->tcd_lock);
			}
			mqhp->mq_ntype = 0;
			mqhp->mq_des = 0;
		} else {
			/* notification is not set for this descriptor */
			errno = EBUSY;
			goto bad;
		}
	} else {		/* register notification with this process */
		/*
		 * First pass: collect the user value and port for the
		 * thread/port flavors.  userval and port are only used
		 * later for those two types.
		 */
		switch (ntype = sigevp->sigev_notify) {
		case SIGEV_THREAD:
			userval = sigevp->sigev_value.sival_ptr;
			port = -1;
			break;
		case SIGEV_PORT:
			pn = sigevp->sigev_value.sival_ptr;
			userval = pn->portnfy_user;
			port = pn->portnfy_port;
			if (fstat64(port, &statb) != 0 ||
			    !S_ISPORT(statb.st_mode)) {
				errno = EBADF;
				goto bad;
			}
			/* from here on use a local, zeroed sigevent */
			(void) memset(&sigevent, 0, sizeof (sigevent));
			sigevent.sigev_notify = SIGEV_PORT;
			sigevp = &sigevent;
			break;
		}
		/* second pass: build the siginfo and, if needed, a spawner */
		switch (ntype) {
		case SIGEV_NONE:
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_SIGNAL:
			mq_siginfo.si_signo = sigevp->sigev_signo;
			mq_siginfo.si_value = sigevp->sigev_value;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_THREAD:
			/* drop an existing spawner whose attributes differ */
			if ((tcdp = mqdp->mqd_tcd) != NULL &&
			    cancel_if_necessary(tcdp, sigevp))
				mqdp->mqd_tcd = NULL;
			/* FALLTHROUGH */
		case SIGEV_PORT:
			if ((tcdp = mqdp->mqd_tcd) == NULL) {
				/* we must create a spawner thread */
				tcdp = setup_sigev_handler(sigevp, MQ);
				if (tcdp == NULL) {
					errno = EBADF;
					goto bad;
				}
				tcdp->tcd_msg_enabled = 0;
				tcdp->tcd_msg_closing = 0;
				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
				if (launch_spawner(tcdp) != 0) {
					free_sigev_handler(tcdp);
					goto bad;
				}
				mqdp->mqd_tcd = tcdp;
			}
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		default:
			errno = EINVAL;
			goto bad;
		}

		/* register notification */
		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
			goto bad;
		mqhp->mq_ntype = ntype;
		mqhp->mq_des = (uintptr_t)mqdp;
		switch (ntype) {
		case SIGEV_THREAD:
		case SIGEV_PORT:
			/* arm the spawner: set its state, then wake it */
			tcdp->tcd_port = port;
			tcdp->tcd_msg_object = mqdp;
			tcdp->tcd_msg_userval = userval;
			sig_mutex_lock(&tcdp->tcd_lock);
			tcdp->tcd_msg_enabled = ntype;
			sig_mutex_unlock(&tcdp->tcd_lock);
			(void) cond_broadcast(&tcdp->tcd_cv);
			break;
		}
	}

	rval = 0;	/* success */
bad:
	(void) mutex_unlock(&mqhp->mq_exclusive);
	return (rval);
}
1018 
1019 int
1020 mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
1021 {
1022 	mqdes_t *mqdp = (mqdes_t *)mqdes;
1023 	mqhdr_t *mqhp;
1024 	uint_t	flag = 0;
1025 
1026 	if (!mq_is_valid(mqdp)) {
1027 		errno = EBADF;
1028 		return (-1);
1029 	}
1030 
1031 	/* store current attributes */
1032 	if (omqstat != NULL) {
1033 		int	count;
1034 
1035 		mqhp = mqdp->mqd_mq;
1036 		omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1037 		omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1038 		omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1039 		(void) sem_getvalue(&mqhp->mq_notempty, &count);
1040 		omqstat->mq_curmsgs = count;
1041 	}
1042 
1043 	/* set description attributes */
1044 	if ((mqstat->mq_flags & O_NONBLOCK) != 0)
1045 		flag = FNONBLOCK;
1046 	mqdp->mqd_mqdn->mqdn_flags = flag;
1047 
1048 	return (0);
1049 }
1050 
1051 int
1052 mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
1053 {
1054 	mqdes_t *mqdp = (mqdes_t *)mqdes;
1055 	mqhdr_t *mqhp;
1056 	int count;
1057 
1058 	if (!mq_is_valid(mqdp)) {
1059 		errno = EBADF;
1060 		return (-1);
1061 	}
1062 
1063 	mqhp = mqdp->mqd_mq;
1064 
1065 	mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1066 	mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1067 	mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1068 	(void) sem_getvalue(&mqhp->mq_notempty, &count);
1069 	mqstat->mq_curmsgs = count;
1070 	return (0);
1071 }
1072 
1073 /*
1074  * Cleanup after fork1() in the child process.
1075  */
1076 void
1077 postfork1_child_sigev_mq(void)
1078 {
1079 	thread_communication_data_t *tcdp;
1080 	mqdes_t *mqdp;
1081 
1082 	for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
1083 		if ((tcdp = mqdp->mqd_tcd) != NULL) {
1084 			mqdp->mqd_tcd = NULL;
1085 			tcd_teardown(tcdp);
1086 		}
1087 	}
1088 }
1089