xref: /illumos-gate/usr/src/lib/libc/port/rt/mqueue.c (revision e64b3450c162f9326a8d0092ff259671602f2c25)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  * Copyright 2025 MNX Cloud, Inc.
26  */
27 
28 #include "lint.h"
29 #include "mtlib.h"
30 #define	_KMEMUSER
31 #include <sys/param.h>		/* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
32 #undef	_KMEMUSER
33 #include <mqueue.h>
34 #include <sys/types.h>
35 #include <sys/file.h>
36 #include <sys/mman.h>
37 #include <errno.h>
38 #include <stdarg.h>
39 #include <limits.h>
40 #include <pthread.h>
41 #include <assert.h>
42 #include <string.h>
43 #include <unistd.h>
44 #include <stdlib.h>
45 #include <sys/stat.h>
46 #include <inttypes.h>
47 #include "sigev_thread.h"
48 #include "pos4obj.h"
49 
50 /*
51  * Default values per message queue
52  */
53 #define	MQ_MAXMSG	128
54 #define	MQ_MAXSIZE	1024
55 
56 #define	MQ_MAGIC	0x4d534751		/* "MSGQ" */
57 
58 /*
59  * Message header which is part of messages in link list
60  */
61 typedef struct {
62 	uint64_t	msg_next;	/* offset of next message in the link */
63 	uint64_t	msg_len;	/* length of the message */
64 } msghdr_t;
65 
66 /*
67  * message queue description
68  */
69 struct mq_dn {
70 	size_t		mqdn_flags;	/* open description flags */
71 };
72 
73 /*
74  * message queue descriptor structure
75  */
76 typedef struct mq_des {
77 	struct mq_des	*mqd_next;	/* list of all open mq descriptors, */
78 	struct mq_des	*mqd_prev;	/* needed for fork-safety */
79 	int		mqd_magic;	/* magic # to identify mq_des */
80 	int		mqd_flags;	/* operation flag per open */
81 	struct mq_header *mqd_mq;	/* address pointer of message Q */
82 	struct mq_dn	*mqd_mqdn;	/* open	description */
83 	thread_communication_data_t *mqd_tcd;	/* SIGEV_THREAD notification */
84 	int		mqd_ownerdead;	/* mq_exclusive is inconsistent */
85 } mqdes_t;
86 
87 /*
88  * message queue common header, part of the mmap()ed file.
89  * Since message queues may be shared between 32- and 64-bit processes,
90  * care must be taken to make sure that the elements of this structure
91  * are identical for both _LP64 and _ILP32 cases.
92  */
93 typedef struct mq_header {
94 	/* first field must be mq_totsize, DO NOT insert before this	*/
95 	int64_t		mq_totsize;	/* total size of the Queue */
96 	int64_t		mq_maxsz;	/* max size of each message */
97 	uint32_t	mq_maxmsg;	/* max messages in the queue */
98 	uint32_t	mq_maxprio;	/* maximum mqueue priority */
99 	uint32_t	mq_curmaxprio;	/* current maximum MQ priority */
100 	uint32_t	mq_mask;	/* priority bitmask */
101 	uint64_t	mq_freep;	/* free message's head pointer */
102 	uint64_t	mq_headpp;	/* pointer to head pointers */
103 	uint64_t	mq_tailpp;	/* pointer to tail pointers */
104 	signotify_id_t	mq_sigid;	/* notification id (3 int's) */
105 	uint32_t	mq_ntype;	/* notification type (SIGEV_*) */
106 	uint64_t	mq_des;		/* pointer to msg Q descriptor */
107 	mutex_t		mq_exclusive;	/* acquire for exclusive access */
108 	sem_t		mq_rblocked;	/* number of processes rblocked */
109 	sem_t		mq_notfull;	/* mq_send()'s block on this */
110 	sem_t		mq_notempty;	/* mq_receive()'s block on this */
111 	sem_t		mq_spawner;	/* spawner thread blocks on this */
112 } mqhdr_t;
113 
114 /*
115  * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
116  * If this assumption is somehow invalidated, mq_open() needs to be changed
117  * back to the old version which kept a count and enforced a limit.
118  * We make sure that this is pointed out to those changing <sys/param.h>
119  * by checking _MQ_OPEN_MAX at compile time.
120  */
121 #if _MQ_OPEN_MAX != -1
122 #error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
123 #endif
124 
125 #define	MQ_ALIGNSIZE	8	/* 64-bit alignment */
126 
127 #ifdef DEBUG
128 #define	MQ_ASSERT(x)	assert(x);
129 
130 #define	MQ_ASSERT_PTR(_m, _p) \
131 	assert((_p) != 0 && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \
132 	    !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \
133 	    _m->mq_totsize));
134 
135 #define	MQ_ASSERT_SEMVAL_LEQ(sem, val) { \
136 	int _val; \
137 	(void) sem_getvalue((sem), &_val); \
138 	assert((_val) <= val); }
139 #else
140 #define	MQ_ASSERT(x)
141 #define	MQ_ASSERT_PTR(_m, _p)
142 #define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
143 #endif
144 
145 #define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)m + (uintptr_t)n))
146 #define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
147 			(uintptr_t)m->mq_headpp + n * sizeof (uint64_t)))
148 #define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
149 			(uintptr_t)m->mq_tailpp + n * sizeof (uint64_t)))
150 
151 #define	MQ_RESERVED	((mqdes_t *)-1)
152 
153 #define	ABS_TIME	0
154 #define	REL_TIME	1
155 
156 static mutex_t mq_list_lock = DEFAULTMUTEX;
157 static mqdes_t *mq_list = NULL;
158 
159 extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id);
160 
161 static int
mq_is_valid(mqdes_t * mqdp)162 mq_is_valid(mqdes_t *mqdp)
163 {
164 	/*
165 	 * Any use of a message queue after it was closed is
166 	 * undefined.  But the standard strongly favours EBADF
167 	 * returns.  Before we dereference which could be fatal,
168 	 * we first do some pointer sanity checks.
169 	 */
170 	if (mqdp != NULL && mqdp != MQ_RESERVED &&
171 	    ((uintptr_t)mqdp & 0x7) == 0) {
172 		return (mqdp->mqd_magic == MQ_MAGIC);
173 	}
174 
175 	return (0);
176 }
177 
/*
 * One-time initialization of a freshly created message queue header.
 * The header lives in a mmap()ed file that ftruncate() has just
 * zero-filled, so only the non-zero fields need to be set here.
 * Called from mq_open() while holding the MQ_LOCK_TYPE file lock,
 * so no other process can observe a partially initialized queue.
 */
static void
mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
{
	int		i;
	uint64_t	temp;
	uint64_t	currentp;
	uint64_t	nextp;

	/*
	 * We only need to initialize the non-zero fields.  The use of
	 * ftruncate() on the message queue file assures that the
	 * pages will be zero-filled.
	 */
	(void) mutex_init(&mqhp->mq_exclusive,
	    USYNC_PROCESS | LOCK_ROBUST, NULL);
	(void) sem_init(&mqhp->mq_rblocked, 1, 0);
	(void) sem_init(&mqhp->mq_notempty, 1, 0);
	(void) sem_init(&mqhp->mq_spawner, 1, 0);
	/* mq_notfull starts at maxmsg: one token per free message slot */
	(void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);

	mqhp->mq_maxsz = msgsize;
	mqhp->mq_maxmsg = maxmsg;

	/*
	 * As of this writing (1997), there are 32 message queue priorities.
	 * If this is to change, then the size of the mq_mask will
	 * also have to change.  If DEBUG is defined, assert that
	 * _MQ_PRIO_MAX hasn't changed.
	 */
	mqhp->mq_maxprio = _MQ_PRIO_MAX;
#if defined(DEBUG)
	/* LINTED always true */
	MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
#endif

	/*
	 * Since the message queue can be mapped into different
	 * virtual address ranges by different processes, we don't
	 * keep track of pointers, only offsets into the shared region.
	 * Layout: header, then per-priority head pointers, then
	 * per-priority tail pointers, then the message buffers.
	 */
	mqhp->mq_headpp = sizeof (mqhdr_t);
	mqhp->mq_tailpp = mqhp->mq_headpp +
	    mqhp->mq_maxprio * sizeof (uint64_t);
	mqhp->mq_freep = mqhp->mq_tailpp +
	    mqhp->mq_maxprio * sizeof (uint64_t);

	currentp = mqhp->mq_freep;
	MQ_PTR(mqhp, currentp)->msg_next = 0;

	/*
	 * Chain all maxmsg message buffers onto the free list.  Each
	 * buffer is a msghdr_t followed by the message payload rounded
	 * up to MQ_ALIGNSIZE (8-byte) alignment.
	 */
	temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
	for (i = 1; i < mqhp->mq_maxmsg; i++) {
		nextp = currentp + sizeof (msghdr_t) + temp;
		MQ_PTR(mqhp, currentp)->msg_next = nextp;
		MQ_PTR(mqhp, nextp)->msg_next = 0;
		currentp = nextp;
	}
}
235 
/*
 * Dequeue the oldest message of the current highest priority, copy it
 * into msgp, and return its length.  If msg_prio is non-NULL, the
 * message's priority is stored through it.  The caller must hold
 * mq_exclusive and must have already decremented mq_notempty, so a
 * message is guaranteed to be present.
 */
static size_t
mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
{
	uint64_t currentp;
	msghdr_t *curbuf;
	uint64_t *headpp;
	uint64_t *tailpp;

	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));

	/*
	 * Get the head and tail pointers for the queue of maximum
	 * priority.  We shouldn't be here unless there is a message for
	 * us, so it's fair to assert that both the head and tail
	 * pointers are non-NULL.
	 */
	headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
	tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);

	if (msg_prio != NULL)
		*msg_prio = mqhp->mq_curmaxprio;

	currentp = *headpp;
	MQ_ASSERT_PTR(mqhp, currentp);
	curbuf = MQ_PTR(mqhp, currentp);

	if ((*headpp = curbuf->msg_next) == 0) {
		/*
		 * We just nuked the last message in this priority's queue.
		 * Twiddle this priority's bit, and then find the next bit
		 * tipped.
		 */
		uint_t prio = mqhp->mq_curmaxprio;

		mqhp->mq_mask &= ~(1u << prio);

		/* scan downward for the next non-empty priority, if any */
		for (; prio != 0; prio--)
			if (mqhp->mq_mask & (1u << prio))
				break;
		mqhp->mq_curmaxprio = prio;

		*tailpp = 0;
	}

	/*
	 * Copy the message, and put the buffer back on the free list.
	 * The payload begins immediately after the msghdr_t (curbuf[1]).
	 */
	(void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
	curbuf->msg_next = mqhp->mq_freep;
	mqhp->mq_freep = currentp;

	return (curbuf->msg_len);
}
289 
290 
/*
 * Enqueue a message of the given length at priority prio, copying it
 * from msgp into a buffer taken from the free list.  The caller must
 * hold mq_exclusive and must have already decremented mq_notfull, so
 * a free buffer is guaranteed to be available.
 */
static void
mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
{
	uint64_t currentp;
	msghdr_t *curbuf;
	uint64_t *headpp;
	uint64_t *tailpp;

	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));

	/*
	 * Grab a free message block, and link it in.  We shouldn't
	 * be here unless there is room in the queue for us;  it's
	 * fair to assert that the free pointer is non-NULL.
	 */
	currentp = mqhp->mq_freep;
	MQ_ASSERT_PTR(mqhp, currentp);
	curbuf = MQ_PTR(mqhp, currentp);

	/*
	 * Remove a message from the free list, and copy in the new contents.
	 * The payload lives immediately after the msghdr_t (curbuf[1]).
	 */
	mqhp->mq_freep = curbuf->msg_next;
	curbuf->msg_next = 0;
	(void) memcpy((char *)&curbuf[1], msgp, len);
	curbuf->msg_len = len;

	headpp = HEAD_PTR(mqhp, prio);
	tailpp = TAIL_PTR(mqhp, prio);

	if (*tailpp == 0) {
		/*
		 * This is the first message on this queue.  Set the
		 * head and tail pointers, and tip the appropriate bit
		 * in the priority mask.
		 */
		*headpp = currentp;
		*tailpp = currentp;
		mqhp->mq_mask |= (1u << prio);
		if (prio > mqhp->mq_curmaxprio)
			mqhp->mq_curmaxprio = prio;
	} else {
		/* append to the tail of this priority's list */
		MQ_ASSERT_PTR(mqhp, *tailpp);
		MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
		*tailpp = currentp;
	}
}
338 
339 /*
340  * Send a notification and also delete the registration.
341  */
342 static void
do_notify(mqhdr_t * mqhp)343 do_notify(mqhdr_t *mqhp)
344 {
345 	(void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
346 	if (mqhp->mq_ntype == SIGEV_THREAD ||
347 	    mqhp->mq_ntype == SIGEV_PORT)
348 		(void) sem_post(&mqhp->mq_spawner);
349 	mqhp->mq_ntype = 0;
350 	mqhp->mq_des = 0;
351 }
352 
353 /*
354  * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE.
355  * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
356  * they fail with errno == EBADMSG.  Trigger any registered notification.
357  */
/*
 * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE.
 * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
 * they fail with errno == EBADMSG.  Trigger any registered notification.
 */
static void
owner_dead(mqdes_t *mqdp, int error)
{
	mqhdr_t *mqhp = mqdp->mqd_mq;

	/* mark the descriptor so future lockers skip the dead mutex */
	mqdp->mqd_ownerdead = 1;
	(void) sem_post(&mqhp->mq_notfull);
	(void) sem_post(&mqhp->mq_notempty);
	if (error == EOWNERDEAD) {
		/*
		 * EOWNERDEAD means we actually hold the (inconsistent)
		 * robust mutex, so fire any pending notification and
		 * drop the lock; ENOTRECOVERABLE means we never got it.
		 */
		if (mqhp->mq_sigid.sn_pid != 0)
			do_notify(mqhp);
		(void) mutex_unlock(&mqhp->mq_exclusive);
	}
	errno = EBADMSG;
}
373 
/*
 * Open (and possibly create) a named POSIX message queue.
 *
 * The queue is backed by three __pos4obj files: a permission file, a
 * data file holding the shared mqhdr_t plus message buffers, and a
 * per-open description file that is immediately unlinked and provides
 * the anonymous shared mq_dn flag word.  The whole operation is made
 * atomic with respect to other mq_open()/mq_unlink() calls by holding
 * the MQ_LOCK_TYPE file lock.
 *
 * Returns a descriptor cast to mqd_t on success, (mqd_t)-1 with errno
 * set on failure.
 */
mqd_t
mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
{
	va_list		ap;
	mode_t		mode = 0;
	struct mq_attr	*attr = NULL;
	int		fd;
	int		err;
	int		cr_flag = 0;	/* tracks what needs undoing on error */
	int		locked = 0;
	uint64_t	total_size;
	size_t		msgsize;
	ssize_t		maxmsg;
	uint64_t	temp;
	void		*ptr;
	mqdes_t		*mqdp = NULL;
	mqhdr_t		*mqhp;
	struct mq_dn	*mqdnp;

	if (__pos4obj_check(path) == -1)
		return ((mqd_t)-1);

	/* acquire MSGQ lock to have atomic operation */
	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
		goto out;
	locked = 1;

	va_start(ap, oflag);
	/* filter oflag to have READ/WRITE/CREATE modes only */
	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
	if ((oflag & O_CREAT) != 0) {
		mode = va_arg(ap, mode_t);
		attr = va_arg(ap, struct mq_attr *);
	}
	va_end(ap);

	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
	    mode, &cr_flag)) < 0)
		goto out;

	/* closing permission file */
	(void) __close_nc(fd);

	/* Try to open/create data file */
	if (cr_flag) {
		cr_flag = PFILE_CREATE;
		if (attr == NULL) {
			/* no attributes supplied: use the defaults */
			maxmsg = MQ_MAXMSG;
			msgsize = MQ_MAXSIZE;
		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
			errno = EINVAL;
			goto out;
		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
			/* mq_notfull is a semaphore initialized to maxmsg */
			errno = ENOSPC;
			goto out;
		} else {
			maxmsg = attr->mq_maxmsg;
			msgsize = attr->mq_msgsize;
		}

		/* adjust for message size at word boundary */
		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);

		/* header + per-message buffers + head/tail pointer arrays */
		total_size = sizeof (mqhdr_t) +
		    maxmsg * (temp + sizeof (msghdr_t)) +
		    2 * _MQ_PRIO_MAX * sizeof (uint64_t);

		if (total_size > SSIZE_MAX) {
			errno = ENOSPC;
			goto out;
		}

		/*
		 * data file is opened with read/write to those
		 * who have read or write permission
		 */
		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
			goto out;

		cr_flag |= DFILE_CREATE | DFILE_OPEN;

		/* force permissions to avoid umask effect */
		if (fchmod(fd, mode) < 0)
			goto out;

		/* zero-fill the file; mq_init() relies on this */
		if (ftruncate64(fd, (off64_t)total_size) < 0)
			goto out;
	} else {
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    O_RDWR, 0666, &err)) < 0)
			goto out;
		cr_flag = DFILE_OPEN;

		/* Message queue has not been initialized yet */
		if (read(fd, &total_size, sizeof (total_size)) !=
		    sizeof (total_size) || total_size == 0) {
			errno = ENOENT;
			goto out;
		}

		/* Message queue too big for this process to handle */
		if (total_size > SSIZE_MAX) {
			errno = EFBIG;
			goto out;
		}
	}

	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
		errno = ENOMEM;
		goto out;
	}

	/* map the shared queue header and message buffers */
	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqhp = ptr;
	cr_flag |= DFILE_MMAP;

	/* closing data file */
	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * create, unlink, size, mmap, and close description file
	 * all for a flag word in anonymous shared memory
	 */
	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
	    0666, &err)) < 0)
		goto out;
	cr_flag |= DFILE_OPEN;
	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
		goto out;

	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqdnp = ptr;
	cr_flag |= MQDNP_MMAP;

	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * we follow the same strategy as filesystem open() routine,
	 * where fcntl.h flags are changed to flags defined in file.h.
	 */
	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);

	/* new message queue requires initialization */
	if ((cr_flag & DFILE_CREATE) != 0) {
		/* message queue header has to be initialized */
		mq_init(mqhp, msgsize, maxmsg);
		/*
		 * mq_totsize is written last: a non-zero value marks the
		 * queue as fully initialized for subsequent openers.
		 */
		mqhp->mq_totsize = total_size;
	}
	mqdp->mqd_mq = mqhp;
	mqdp->mqd_mqdn = mqdnp;
	mqdp->mqd_magic = MQ_MAGIC;
	mqdp->mqd_tcd = NULL;
	mqdp->mqd_ownerdead = 0;
	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
		/* link the descriptor onto the global list (fork-safety) */
		lmutex_lock(&mq_list_lock);
		mqdp->mqd_next = mq_list;
		mqdp->mqd_prev = NULL;
		if (mq_list)
			mq_list->mqd_prev = mqdp;
		mq_list = mqdp;
		lmutex_unlock(&mq_list_lock);
		return ((mqd_t)mqdp);
	}

	locked = 0;	/* fall into the error case */
out:
	/* unwind whatever cr_flag says we accomplished, preserving errno */
	err = errno;
	if ((cr_flag & DFILE_OPEN) != 0)
		(void) __close_nc(fd);
	if ((cr_flag & DFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
	if ((cr_flag & PFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
	free(mqdp);
	if ((cr_flag & DFILE_MMAP) != 0)
		(void) munmap((caddr_t)mqhp, (size_t)total_size);
	if ((cr_flag & MQDNP_MMAP) != 0)
		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	if (locked)
		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
	errno = err;
	return ((mqd_t)-1);
}
567 
/*
 * Final teardown of a message queue descriptor, run as the
 * pthread_cleanup handler from mq_close() so it executes even if
 * del_sigev_mq() is a cancellation point that fires.  Invalidates the
 * descriptor, releases mq_exclusive (unless the lock was already known
 * to be dead), unlinks the descriptor from the global list, and unmaps
 * both shared regions.
 */
static void
mq_close_cleanup(mqdes_t *mqdp)
{
	mqhdr_t *mqhp = mqdp->mqd_mq;
	struct mq_dn *mqdnp = mqdp->mqd_mqdn;

	/* invalidate the descriptor before freeing it */
	mqdp->mqd_magic = 0;
	if (!mqdp->mqd_ownerdead)
		(void) mutex_unlock(&mqhp->mq_exclusive);

	lmutex_lock(&mq_list_lock);
	if (mqdp->mqd_next)
		mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
	if (mqdp->mqd_prev)
		mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
	if (mq_list == mqdp)
		mq_list = mqdp->mqd_next;
	lmutex_unlock(&mq_list_lock);

	free(mqdp);
	/* mq_totsize is read from the mapping before it is torn down */
	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	(void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
}
592 
/*
 * Close a message queue descriptor.  Removes any notification
 * registered through this descriptor by this process, tears down a
 * SIGEV_THREAD spawner if one exists, and finishes in
 * mq_close_cleanup() (pushed as a cancellation cleanup handler because
 * del_sigev_mq() is a possible cancellation point).
 * Returns 0 on success, -1 with errno == EBADF on a bad descriptor.
 */
int
mq_close(mqd_t mqdes)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	int error;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;
	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/* robust mutex owner died; queue state is suspect */
		mqdp->mqd_ownerdead = 1;
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	if (mqhp->mq_des == (uintptr_t)mqdp &&
	    mqhp->mq_sigid.sn_pid == getpid()) {
		/* notification is set for this descriptor, remove it */
		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
		mqhp->mq_ntype = 0;
		mqhp->mq_des = 0;
	}

	pthread_cleanup_push(mq_close_cleanup, mqdp);
	if ((tcdp = mqdp->mqd_tcd) != NULL) {
		mqdp->mqd_tcd = NULL;
		del_sigev_mq(tcdp);	/* possible cancellation point */
	}
	pthread_cleanup_pop(1);		/* finish in the cleanup handler */

	return (0);
}
631 
632 int
mq_unlink(const char * path)633 mq_unlink(const char *path)
634 {
635 	int err;
636 
637 	if (__pos4obj_check(path) < 0)
638 		return (-1);
639 
640 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
641 		return (-1);
642 	}
643 
644 	err = __pos4obj_unlink(path, MQ_PERM_TYPE);
645 
646 	if (err == 0 || (err == -1 && errno == EEXIST)) {
647 		errno = 0;
648 		err = __pos4obj_unlink(path, MQ_DATA_TYPE);
649 	}
650 
651 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
652 		return (-1);
653 
654 	return (err);
655 
656 }
657 
/*
 * Common implementation for mq_send(), mq_timedsend() and
 * mq_reltimedsend_np().  timeout may be NULL (block indefinitely);
 * abs_rel selects ABS_TIME or REL_TIME interpretation of timeout.
 * Returns 0 on success; -1 with errno set to EBADF, EINVAL, EMSGSIZE,
 * EAGAIN, EINTR, ETIMEDOUT or EBADMSG (dead robust-mutex owner).
 */
static int
__mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
    uint_t msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int err;
	int notify = 0;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*send() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_prio >= mqhp->mq_maxprio) {
		errno = EINVAL;
		return (-1);
	}
	if (msg_len > mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (-1);
	}

	/* wait for space in the queue, honoring O_NONBLOCK and timeouts */
	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
		err = sem_trywait(&mqhp->mq_notfull);
	else {
		/*
		 * We might get cancelled here...
		 */
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notfull);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notfull, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
	}
	if (err == -1) {
		/*
		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
		 * by sem_*wait(), so we can just return.
		 */
		return (-1);
	}

	/*
	 * By the time we're here, we know that we've got the capacity
	 * to add to the queue...now acquire the exclusive lock.
	 */
	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		owner_dead(mqdp, err);
		return (-1);
	}

	/*
	 * Now determine if we want to kick the notification.  POSIX
	 * requires that if a process has registered for notification,
	 * we must kick it when the queue makes an empty to non-empty
	 * transition, and there are no blocked receivers.  Note that
	 * this mechanism does _not_ guarantee that the kicked process
	 * will be able to receive a message without blocking;
	 * another receiver could intervene in the meantime.  Thus,
	 * the notification mechanism is inherently racy; all we can
	 * do is hope to minimize the window as much as possible.
	 * In general, we want to avoid kicking the notification when
	 * there are clearly receivers blocked.  We'll determine if
	 * we want to kick the notification before the mq_putmsg(),
	 * but the actual signotify() won't be done until the message
	 * is on the queue.
	 */
	if (mqhp->mq_sigid.sn_pid != 0) {
		int nmessages, nblocked;

		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);

		if (nmessages == 0 && nblocked == 0)
			notify = 1;
	}

	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
	(void) sem_post(&mqhp->mq_notempty);

	if (notify) {
		/* notify and also delete the registration */
		do_notify(mqhp);
	}

	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (0);
}
759 
760 int
mq_send(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio)761 mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
762 {
763 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
764 	    NULL, ABS_TIME));
765 }
766 
767 int
mq_timedsend(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio,const timespec_t * abs_timeout)768 mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
769     uint_t msg_prio, const timespec_t *abs_timeout)
770 {
771 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
772 	    abs_timeout, ABS_TIME));
773 }
774 
775 int
mq_reltimedsend_np(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio,const timespec_t * rel_timeout)776 mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
777     uint_t msg_prio, const timespec_t *rel_timeout)
778 {
779 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
780 	    rel_timeout, REL_TIME));
781 }
782 
783 static void
decrement_rblocked(mqhdr_t * mqhp)784 decrement_rblocked(mqhdr_t *mqhp)
785 {
786 	int cancel_state;
787 
788 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
789 	while (sem_wait(&mqhp->mq_rblocked) == -1)
790 		continue;
791 	(void) pthread_setcancelstate(cancel_state, NULL);
792 }
793 
/*
 * Common implementation for mq_receive(), mq_timedreceive() and
 * mq_reltimedreceive_np().  timeout may be NULL (block indefinitely);
 * abs_rel selects ABS_TIME or REL_TIME interpretation of timeout.
 * On success the message is copied into msg_ptr (its priority into
 * *msg_prio if non-NULL) and its length is returned; on failure -1 is
 * returned with errno set to EBADF, EMSGSIZE, EAGAIN, EINTR,
 * ETIMEDOUT or EBADMSG (dead robust-mutex owner).
 */
static ssize_t
__mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
    uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	ssize_t	msg_size;
	int err;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*receive() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
		errno = EBADF;
		return (ssize_t)(-1);
	}

	mqhp = mqdp->mqd_mq;

	/* the receive buffer must be able to hold a maximum-size message */
	if (msg_len < mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (ssize_t)(-1);
	}

	/*
	 * The semaphoring scheme for mq_[timed]receive is a little hairy
	 * thanks to POSIX.1b's arcane notification mechanism.  First,
	 * we try to take the common case and do a sem_trywait().
	 * If that doesn't work, and O_NONBLOCK hasn't been set,
	 * then note that we're going to sleep by incrementing the rblocked
	 * semaphore.  We decrement that semaphore after waking up.
	 */
	if (sem_trywait(&mqhp->mq_notempty) == -1) {
		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
			/*
			 * errno has been set to EAGAIN or EINTR by
			 * sem_trywait(), so we can just return.
			 */
			return (-1);
		}
		/*
		 * If we're here, then we're probably going to block...
		 * increment the rblocked semaphore.  If we get
		 * cancelled, decrement_rblocked() will decrement it.
		 */
		(void) sem_post(&mqhp->mq_rblocked);

		pthread_cleanup_push(decrement_rblocked, mqhp);
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notempty);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notempty, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
		/* always run the handler: the rblocked count must drop */
		pthread_cleanup_pop(1);

		if (err == -1) {
			/*
			 * We took a signal or timeout while waiting
			 * on mq_notempty...
			 */
			return (-1);
		}
	}

	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		owner_dead(mqdp, err);
		return (-1);
	}
	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
	(void) sem_post(&mqhp->mq_notfull);
	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (msg_size);
}
874 
875 ssize_t
mq_receive(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio)876 mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
877 {
878 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
879 	    NULL, ABS_TIME));
880 }
881 
882 ssize_t
mq_timedreceive(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio,const timespec_t * abs_timeout)883 mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
884     uint_t *msg_prio, const timespec_t *abs_timeout)
885 {
886 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
887 	    abs_timeout, ABS_TIME));
888 }
889 
890 ssize_t
mq_reltimedreceive_np(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio,const timespec_t * rel_timeout)891 mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
892     uint_t *msg_prio, const timespec_t *rel_timeout)
893 {
894 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
895 	    rel_timeout, REL_TIME));
896 }
897 
898 /*
899  * Only used below, in mq_notify().
900  * We already have a spawner thread.
901  * Verify that the attributes match; cancel it if necessary.
902  */
903 static int
cancel_if_necessary(thread_communication_data_t * tcdp,const struct sigevent * sigevp)904 cancel_if_necessary(thread_communication_data_t *tcdp,
905     const struct sigevent *sigevp)
906 {
907 	int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp,
908 	    sigevp->sigev_notify_attributes);
909 
910 	if (do_cancel) {
911 		/*
912 		 * Attributes don't match, cancel the spawner thread.
913 		 */
914 		(void) pthread_cancel(tcdp->tcd_server_id);
915 	} else {
916 		/*
917 		 * Reuse the existing spawner thread with possibly
918 		 * changed notification function and value.
919 		 */
920 		tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
921 		tcdp->tcd_notif.sigev_signo = 0;
922 		tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
923 		tcdp->tcd_notif.sigev_notify_function =
924 		    sigevp->sigev_notify_function;
925 	}
926 
927 	return (do_cancel);
928 }
929 
/*
 * Register (sigevp != NULL) or remove (sigevp == NULL) asynchronous
 * notification for a message queue.  Supports SIGEV_NONE,
 * SIGEV_SIGNAL, SIGEV_THREAD and SIGEV_PORT; the latter two use a
 * per-descriptor spawner thread woken through mq_spawner.
 * Returns 0 on success, -1 with errno set (EBADF, EBUSY, EINVAL, or
 * EBADMSG when the robust mq_exclusive lock was found inconsistent).
 */
int
mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	siginfo_t mq_siginfo;
	struct sigevent sigevent;
	struct stat64 statb;
	port_notify_t *pn;
	void *userval;
	int rval = -1;
	int ntype;
	int port;
	int error;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/*
		 * Robust mutex owner died.  Force the removal path
		 * (sigevp = NULL) and report EBADMSG at the end.
		 */
		mqdp->mqd_ownerdead = 1;
		sigevp = NULL;
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	if (sigevp == NULL) {		/* remove notification */
		if (mqhp->mq_des == (uintptr_t)mqdp &&
		    mqhp->mq_sigid.sn_pid == getpid()) {
			/* notification is set for this descriptor, remove it */
			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
			if ((tcdp = mqdp->mqd_tcd) != NULL) {
				sig_mutex_lock(&tcdp->tcd_lock);
				if (tcdp->tcd_msg_enabled) {
					/* cancel the spawner thread */
					tcdp = mqdp->mqd_tcd;
					mqdp->mqd_tcd = NULL;
					(void) pthread_cancel(
					    tcdp->tcd_server_id);
				}
				sig_mutex_unlock(&tcdp->tcd_lock);
			}
			mqhp->mq_ntype = 0;
			mqhp->mq_des = 0;
		} else {
			/* notification is not set for this descriptor */
			errno = EBUSY;
			goto bad;
		}
	} else {		/* register notification with this process */
		/*
		 * First pass: extract the per-type parameters.  For
		 * SIGEV_PORT, validate the event port descriptor and
		 * substitute a clean local sigevent.
		 */
		switch (ntype = sigevp->sigev_notify) {
		case SIGEV_THREAD:
			userval = sigevp->sigev_value.sival_ptr;
			port = -1;
			break;
		case SIGEV_PORT:
			pn = sigevp->sigev_value.sival_ptr;
			userval = pn->portnfy_user;
			port = pn->portnfy_port;
			if (fstat64(port, &statb) != 0 ||
			    !S_ISPORT(statb.st_mode)) {
				errno = EBADF;
				goto bad;
			}
			(void) memset(&sigevent, 0, sizeof (sigevent));
			sigevent.sigev_notify = SIGEV_PORT;
			sigevp = &sigevent;
			break;
		}
		/*
		 * Second pass: build the siginfo to register with the
		 * kernel and, for SIGEV_THREAD/SIGEV_PORT, ensure a
		 * spawner thread exists.
		 */
		switch (ntype) {
		case SIGEV_NONE:
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_SIGNAL:
			mq_siginfo.si_signo = sigevp->sigev_signo;
			mq_siginfo.si_value = sigevp->sigev_value;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_THREAD:
			if ((tcdp = mqdp->mqd_tcd) != NULL &&
			    cancel_if_necessary(tcdp, sigevp))
				mqdp->mqd_tcd = NULL;
			/* FALLTHROUGH */
		case SIGEV_PORT:
			if ((tcdp = mqdp->mqd_tcd) == NULL) {
				/* we must create a spawner thread */
				tcdp = setup_sigev_handler(sigevp, MQ);
				if (tcdp == NULL) {
					errno = EBADF;
					goto bad;
				}
				tcdp->tcd_msg_enabled = 0;
				tcdp->tcd_msg_closing = 0;
				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
				if (launch_spawner(tcdp) != 0) {
					free_sigev_handler(tcdp);
					goto bad;
				}
				mqdp->mqd_tcd = tcdp;
			}
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		default:
			errno = EINVAL;
			goto bad;
		}

		/* register notification */
		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
			goto bad;
		mqhp->mq_ntype = ntype;
		mqhp->mq_des = (uintptr_t)mqdp;
		switch (ntype) {
		case SIGEV_THREAD:
		case SIGEV_PORT:
			/* arm the spawner thread and wake it up */
			tcdp->tcd_port = port;
			tcdp->tcd_msg_object = mqdp;
			tcdp->tcd_msg_userval = userval;
			sig_mutex_lock(&tcdp->tcd_lock);
			tcdp->tcd_msg_enabled = ntype;
			sig_mutex_unlock(&tcdp->tcd_lock);
			(void) cond_broadcast(&tcdp->tcd_cv);
			break;
		}
	}

	rval = 0;	/* success */
bad:
	if (error == 0) {
		(void) mutex_unlock(&mqhp->mq_exclusive);
	} else {
		/* lock acquisition failed earlier: report EBADMSG */
		errno = EBADMSG;
		rval = -1;
	}
	return (rval);
}
1073 
1074 int
mq_setattr(mqd_t mqdes,const struct mq_attr * mqstat,struct mq_attr * omqstat)1075 mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
1076 {
1077 	mqdes_t *mqdp = (mqdes_t *)mqdes;
1078 	mqhdr_t *mqhp;
1079 	uint_t	flag = 0;
1080 
1081 	if (!mq_is_valid(mqdp)) {
1082 		errno = EBADF;
1083 		return (-1);
1084 	}
1085 
1086 	/* store current attributes */
1087 	if (omqstat != NULL) {
1088 		int	count;
1089 
1090 		mqhp = mqdp->mqd_mq;
1091 		omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1092 		omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1093 		omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1094 		(void) sem_getvalue(&mqhp->mq_notempty, &count);
1095 		omqstat->mq_curmsgs = count;
1096 	}
1097 
1098 	/* set description attributes */
1099 	if ((mqstat->mq_flags & O_NONBLOCK) != 0)
1100 		flag = FNONBLOCK;
1101 	mqdp->mqd_mqdn->mqdn_flags = flag;
1102 
1103 	return (0);
1104 }
1105 
1106 int
mq_getattr(mqd_t mqdes,struct mq_attr * mqstat)1107 mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
1108 {
1109 	mqdes_t *mqdp = (mqdes_t *)mqdes;
1110 	mqhdr_t *mqhp;
1111 	int count;
1112 
1113 	if (!mq_is_valid(mqdp)) {
1114 		errno = EBADF;
1115 		return (-1);
1116 	}
1117 
1118 	mqhp = mqdp->mqd_mq;
1119 
1120 	mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1121 	mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1122 	mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1123 	(void) sem_getvalue(&mqhp->mq_notempty, &count);
1124 	mqstat->mq_curmsgs = count;
1125 	return (0);
1126 }
1127 
1128 /*
1129  * Cleanup after fork1() in the child process.
1130  */
1131 void
postfork1_child_sigev_mq(void)1132 postfork1_child_sigev_mq(void)
1133 {
1134 	thread_communication_data_t *tcdp;
1135 	mqdes_t *mqdp;
1136 
1137 	for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
1138 		if ((tcdp = mqdp->mqd_tcd) != NULL) {
1139 			mqdp->mqd_tcd = NULL;
1140 			tcd_teardown(tcdp);
1141 		}
1142 	}
1143 }
1144