1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22 /*
23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 * Copyright 2025 MNX Cloud, Inc.
26 */
27
28 #include "lint.h"
29 #include "mtlib.h"
30 #define _KMEMUSER
31 #include <sys/param.h> /* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
32 #undef _KMEMUSER
33 #include <mqueue.h>
34 #include <sys/types.h>
35 #include <sys/file.h>
36 #include <sys/mman.h>
37 #include <errno.h>
38 #include <stdarg.h>
39 #include <limits.h>
40 #include <pthread.h>
41 #include <assert.h>
42 #include <string.h>
43 #include <unistd.h>
44 #include <stdlib.h>
45 #include <sys/stat.h>
46 #include <inttypes.h>
47 #include "sigev_thread.h"
48 #include "pos4obj.h"
49
50 /*
51 * Default values per message queue
52 */
#define	MQ_MAXMSG	128
#define	MQ_MAXSIZE	1024

#define	MQ_MAGIC	0x4d534751		/* "MSGQ" */

/*
 * Message header which is part of messages in link list.
 * All link fields are stored as offsets into the mapped region, not
 * pointers, since the region maps at different addresses per process.
 */
typedef struct {
	uint64_t	msg_next;	/* offset of next message in the link */
	uint64_t	msg_len;	/* length of the message */
} msghdr_t;

/*
 * message queue description (per-open state, shared across dup'd opens)
 */
struct mq_dn {
	size_t		mqdn_flags;	/* open description flags */
};

/*
 * message queue descriptor structure (process-private handle; the
 * mqd_t returned by mq_open() is really a pointer to one of these)
 */
typedef struct mq_des {
	struct mq_des	*mqd_next;	/* list of all open mq descriptors, */
	struct mq_des	*mqd_prev;	/* needed for fork-safety */
	int		mqd_magic;	/* magic # to identify mq_des */
	int		mqd_flags;	/* operation flag per open */
	struct mq_header *mqd_mq;	/* address pointer of message Q */
	struct mq_dn	*mqd_mqdn;	/* open description */
	thread_communication_data_t *mqd_tcd;	/* SIGEV_THREAD notification */
	int		mqd_ownerdead;	/* mq_exclusive is inconsistent */
} mqdes_t;

/*
 * message queue common header, part of the mmap()ed file.
 * Since message queues may be shared between 32- and 64-bit processes,
 * care must be taken to make sure that the elements of this structure
 * are identical for both _LP64 and _ILP32 cases.
 */
typedef struct mq_header {
	/* first field must be mq_totsize, DO NOT insert before this */
	int64_t		mq_totsize;	/* total size of the Queue */
	int64_t		mq_maxsz;	/* max size of each message */
	uint32_t	mq_maxmsg;	/* max messages in the queue */
	uint32_t	mq_maxprio;	/* maximum mqueue priority */
	uint32_t	mq_curmaxprio;	/* current maximum MQ priority */
	uint32_t	mq_mask;	/* priority bitmask */
	uint64_t	mq_freep;	/* free message's head pointer */
	uint64_t	mq_headpp;	/* pointer to head pointers */
	uint64_t	mq_tailpp;	/* pointer to tail pointers */
	signotify_id_t	mq_sigid;	/* notification id (3 int's) */
	uint32_t	mq_ntype;	/* notification type (SIGEV_*) */
	uint64_t	mq_des;		/* pointer to msg Q descriptor */
	mutex_t		mq_exclusive;	/* acquire for exclusive access */
	sem_t		mq_rblocked;	/* number of processes rblocked */
	sem_t		mq_notfull;	/* mq_send()'s block on this */
	sem_t		mq_notempty;	/* mq_receive()'s block on this */
	sem_t		mq_spawner;	/* spawner thread blocks on this */
} mqhdr_t;
113
114 /*
115 * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
116 * If this assumption is somehow invalidated, mq_open() needs to be changed
117 * back to the old version which kept a count and enforced a limit.
118 * We make sure that this is pointed out to those changing <sys/param.h>
119 * by checking _MQ_OPEN_MAX at compile time.
120 */
121 #if _MQ_OPEN_MAX != -1
122 #error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
123 #endif
124
125 #define MQ_ALIGNSIZE 8 /* 64-bit alignment */
126
127 #ifdef DEBUG
128 #define MQ_ASSERT(x) assert(x);
129
130 #define MQ_ASSERT_PTR(_m, _p) \
131 assert((_p) != 0 && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \
132 !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \
133 _m->mq_totsize));
134
135 #define MQ_ASSERT_SEMVAL_LEQ(sem, val) { \
136 int _val; \
137 (void) sem_getvalue((sem), &_val); \
138 assert((_val) <= val); }
139 #else
140 #define MQ_ASSERT(x)
141 #define MQ_ASSERT_PTR(_m, _p)
142 #define MQ_ASSERT_SEMVAL_LEQ(sem, val)
143 #endif
144
145 #define MQ_PTR(m, n) ((msghdr_t *)((uintptr_t)m + (uintptr_t)n))
146 #define HEAD_PTR(m, n) ((uint64_t *)((uintptr_t)m + \
147 (uintptr_t)m->mq_headpp + n * sizeof (uint64_t)))
148 #define TAIL_PTR(m, n) ((uint64_t *)((uintptr_t)m + \
149 (uintptr_t)m->mq_tailpp + n * sizeof (uint64_t)))
150
151 #define MQ_RESERVED ((mqdes_t *)-1)
152
153 #define ABS_TIME 0
154 #define REL_TIME 1
155
156 static mutex_t mq_list_lock = DEFAULTMUTEX;
157 static mqdes_t *mq_list = NULL;
158
159 extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id);
160
161 static int
mq_is_valid(mqdes_t * mqdp)162 mq_is_valid(mqdes_t *mqdp)
163 {
164 /*
165 * Any use of a message queue after it was closed is
166 * undefined. But the standard strongly favours EBADF
167 * returns. Before we dereference which could be fatal,
168 * we first do some pointer sanity checks.
169 */
170 if (mqdp != NULL && mqdp != MQ_RESERVED &&
171 ((uintptr_t)mqdp & 0x7) == 0) {
172 return (mqdp->mqd_magic == MQ_MAGIC);
173 }
174
175 return (0);
176 }
177
/*
 * Initialize a freshly created message queue header.
 * msgsize is the maximum size of one message; maxmsg the maximum
 * number of messages the queue may hold.  The caller has already
 * ftruncate()d the backing file, so all pages start zero-filled and
 * only the non-zero fields need to be set here.
 */
static void
mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
{
	int		i;
	uint64_t	temp;
	uint64_t	currentp;
	uint64_t	nextp;

	/*
	 * We only need to initialize the non-zero fields.  The use of
	 * ftruncate() on the message queue file assures that the
	 * pages will be zero-filled.
	 */
	(void) mutex_init(&mqhp->mq_exclusive,
	    USYNC_PROCESS | LOCK_ROBUST, NULL);
	(void) sem_init(&mqhp->mq_rblocked, 1, 0);
	(void) sem_init(&mqhp->mq_notempty, 1, 0);
	(void) sem_init(&mqhp->mq_spawner, 1, 0);
	/* queue starts empty: all maxmsg slots are free */
	(void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);

	mqhp->mq_maxsz = msgsize;
	mqhp->mq_maxmsg = maxmsg;

	/*
	 * As of this writing (1997), there are 32 message queue priorities.
	 * If this is to change, then the size of the mq_mask will
	 * also have to change.  If DEBUG is defined, assert that
	 * _MQ_PRIO_MAX hasn't changed.
	 */
	mqhp->mq_maxprio = _MQ_PRIO_MAX;
#if defined(DEBUG)
	/* LINTED always true */
	MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
#endif

	/*
	 * Since the message queue can be mapped into different
	 * virtual address ranges by different processes, we don't
	 * keep track of pointers, only offsets into the shared region.
	 * Layout: header, then head offsets, tail offsets, then buffers.
	 */
	mqhp->mq_headpp = sizeof (mqhdr_t);
	mqhp->mq_tailpp = mqhp->mq_headpp +
	    mqhp->mq_maxprio * sizeof (uint64_t);
	mqhp->mq_freep = mqhp->mq_tailpp +
	    mqhp->mq_maxprio * sizeof (uint64_t);

	/* thread all maxmsg message buffers onto the free list */
	currentp = mqhp->mq_freep;
	MQ_PTR(mqhp, currentp)->msg_next = 0;

	/* each slot is the message size rounded up to 64-bit alignment */
	temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
	for (i = 1; i < mqhp->mq_maxmsg; i++) {
		nextp = currentp + sizeof (msghdr_t) + temp;
		MQ_PTR(mqhp, currentp)->msg_next = nextp;
		MQ_PTR(mqhp, nextp)->msg_next = 0;
		currentp = nextp;
	}
}
235
/*
 * Dequeue the highest-priority message into msgp and return its length.
 * If msg_prio is non-NULL, store the message's priority through it.
 * Caller must hold mq_exclusive and must already have decremented
 * mq_notempty, so a message is guaranteed to be present.
 */
static size_t
mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
{
	uint64_t currentp;
	msghdr_t *curbuf;
	uint64_t *headpp;
	uint64_t *tailpp;

	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));

	/*
	 * Get the head and tail pointers for the queue of maximum
	 * priority.  We shouldn't be here unless there is a message for
	 * us, so it's fair to assert that both the head and tail
	 * pointers are non-NULL.
	 */
	headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
	tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);

	if (msg_prio != NULL)
		*msg_prio = mqhp->mq_curmaxprio;

	currentp = *headpp;
	MQ_ASSERT_PTR(mqhp, currentp);
	curbuf = MQ_PTR(mqhp, currentp);

	if ((*headpp = curbuf->msg_next) == 0) {
		/*
		 * We just nuked the last message in this priority's queue.
		 * Twiddle this priority's bit, and then find the next bit
		 * tipped.
		 */
		uint_t prio = mqhp->mq_curmaxprio;

		mqhp->mq_mask &= ~(1u << prio);

		/* scan downward for the next non-empty priority */
		for (; prio != 0; prio--)
			if (mqhp->mq_mask & (1u << prio))
				break;
		mqhp->mq_curmaxprio = prio;

		*tailpp = 0;
	}

	/*
	 * Copy the message, and put the buffer back on the free list.
	 * Reading curbuf->msg_len after relinking is safe because
	 * mq_exclusive is still held; nobody can reuse the buffer yet.
	 */
	(void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
	curbuf->msg_next = mqhp->mq_freep;
	mqhp->mq_freep = currentp;

	return (curbuf->msg_len);
}
289
290
/*
 * Enqueue a message of the given length at priority prio.
 * Caller must hold mq_exclusive and must already have decremented
 * mq_notfull, so a free buffer is guaranteed to exist.
 */
static void
mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
{
	uint64_t currentp;
	msghdr_t *curbuf;
	uint64_t *headpp;
	uint64_t *tailpp;

	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));

	/*
	 * Grab a free message block, and link it in.  We shouldn't
	 * be here unless there is room in the queue for us; it's
	 * fair to assert that the free pointer is non-NULL.
	 */
	currentp = mqhp->mq_freep;
	MQ_ASSERT_PTR(mqhp, currentp);
	curbuf = MQ_PTR(mqhp, currentp);

	/*
	 * Remove a message from the free list, and copy in the new contents.
	 */
	mqhp->mq_freep = curbuf->msg_next;
	curbuf->msg_next = 0;
	(void) memcpy((char *)&curbuf[1], msgp, len);
	curbuf->msg_len = len;

	headpp = HEAD_PTR(mqhp, prio);
	tailpp = TAIL_PTR(mqhp, prio);

	if (*tailpp == 0) {
		/*
		 * This is the first message on this queue.  Set the
		 * head and tail pointers, and tip the appropriate bit
		 * in the priority mask.
		 */
		*headpp = currentp;
		*tailpp = currentp;
		mqhp->mq_mask |= (1u << prio);
		if (prio > mqhp->mq_curmaxprio)
			mqhp->mq_curmaxprio = prio;
	} else {
		/* append at the tail of this priority's FIFO */
		MQ_ASSERT_PTR(mqhp, *tailpp);
		MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
		*tailpp = currentp;
	}
}
338
339 /*
340 * Send a notification and also delete the registration.
341 */
342 static void
do_notify(mqhdr_t * mqhp)343 do_notify(mqhdr_t *mqhp)
344 {
345 (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
346 if (mqhp->mq_ntype == SIGEV_THREAD ||
347 mqhp->mq_ntype == SIGEV_PORT)
348 (void) sem_post(&mqhp->mq_spawner);
349 mqhp->mq_ntype = 0;
350 mqhp->mq_des = 0;
351 }
352
353 /*
354 * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE.
355 * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
356 * they fail with errno == EBADMSG. Trigger any registered notification.
357 */
358 static void
owner_dead(mqdes_t * mqdp,int error)359 owner_dead(mqdes_t *mqdp, int error)
360 {
361 mqhdr_t *mqhp = mqdp->mqd_mq;
362
363 mqdp->mqd_ownerdead = 1;
364 (void) sem_post(&mqhp->mq_notfull);
365 (void) sem_post(&mqhp->mq_notempty);
366 if (error == EOWNERDEAD) {
367 if (mqhp->mq_sigid.sn_pid != 0)
368 do_notify(mqhp);
369 (void) mutex_unlock(&mqhp->mq_exclusive);
370 }
371 errno = EBADMSG;
372 }
373
/*
 * Open (or, with O_CREAT, create) the POSIX message queue named by path.
 * With O_CREAT the variadic arguments are (mode_t mode,
 * struct mq_attr *attr); attr == NULL selects the MQ_MAXMSG/MQ_MAXSIZE
 * defaults.  Returns a descriptor (really a mqdes_t pointer) cast to
 * mqd_t, or (mqd_t)-1 with errno set on failure.
 *
 * The queue is backed by three pos4obj files: a permission file, a
 * data file holding the shared mqhdr_t (mapped MAP_SHARED), and an
 * immediately-unlinked description file mapped for the per-open
 * flag word (struct mq_dn).  cr_flag records how far we got so the
 * "out:" error path can unwind exactly what was done.
 */
mqd_t
mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
{
	va_list		ap;
	mode_t		mode = 0;
	struct mq_attr	*attr = NULL;
	int		fd;
	int		err;
	int		cr_flag = 0;
	int		locked = 0;
	uint64_t	total_size;
	size_t		msgsize;
	ssize_t		maxmsg;
	uint64_t	temp;
	void		*ptr;
	mqdes_t		*mqdp = NULL;
	mqhdr_t		*mqhp;
	struct mq_dn	*mqdnp;

	if (__pos4obj_check(path) == -1)
		return ((mqd_t)-1);

	/* acquire MSGQ lock to have atomic operation */
	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
		goto out;
	locked = 1;

	va_start(ap, oflag);
	/* filter oflag to have READ/WRITE/CREATE modes only */
	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
	if ((oflag & O_CREAT) != 0) {
		mode = va_arg(ap, mode_t);
		attr = va_arg(ap, struct mq_attr *);
	}
	va_end(ap);

	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
	    mode, &cr_flag)) < 0)
		goto out;

	/* closing permission file */
	(void) __close_nc(fd);

	/* Try to open/create data file */
	if (cr_flag) {
		cr_flag = PFILE_CREATE;
		if (attr == NULL) {
			maxmsg = MQ_MAXMSG;
			msgsize = MQ_MAXSIZE;
		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
			errno = EINVAL;
			goto out;
		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
			/* mq_notfull is a semaphore initialized to maxmsg */
			errno = ENOSPC;
			goto out;
		} else {
			maxmsg = attr->mq_maxmsg;
			msgsize = attr->mq_msgsize;
		}

		/* adjust for message size at word boundary */
		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);

		/* header + head/tail offset arrays + per-slot buffers */
		total_size = sizeof (mqhdr_t) +
		    maxmsg * (temp + sizeof (msghdr_t)) +
		    2 * _MQ_PRIO_MAX * sizeof (uint64_t);

		if (total_size > SSIZE_MAX) {
			errno = ENOSPC;
			goto out;
		}

		/*
		 * data file is opened with read/write to those
		 * who have read or write permission
		 */
		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
			goto out;

		cr_flag |= DFILE_CREATE | DFILE_OPEN;

		/* force permissions to avoid umask effect */
		if (fchmod(fd, mode) < 0)
			goto out;

		if (ftruncate64(fd, (off64_t)total_size) < 0)
			goto out;
	} else {
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    O_RDWR, 0666, &err)) < 0)
			goto out;
		cr_flag = DFILE_OPEN;

		/* Message queue has not been initialized yet */
		if (read(fd, &total_size, sizeof (total_size)) !=
		    sizeof (total_size) || total_size == 0) {
			errno = ENOENT;
			goto out;
		}

		/* Message queue too big for this process to handle */
		if (total_size > SSIZE_MAX) {
			errno = EFBIG;
			goto out;
		}
	}

	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
		errno = ENOMEM;
		goto out;
	}

	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqhp = ptr;
	cr_flag |= DFILE_MMAP;

	/* closing data file */
	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * create, unlink, size, mmap, and close description file
	 * all for a flag word in anonymous shared memory
	 */
	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
	    0666, &err)) < 0)
		goto out;
	cr_flag |= DFILE_OPEN;
	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
		goto out;

	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqdnp = ptr;
	cr_flag |= MQDNP_MMAP;

	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * we follow the same strategy as filesystem open() routine,
	 * where fcntl.h flags are changed to flags defined in file.h.
	 */
	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);

	/* new message queue requires initialization */
	if ((cr_flag & DFILE_CREATE) != 0) {
		/* message queue header has to be initialized */
		mq_init(mqhp, msgsize, maxmsg);
		/* a non-zero mq_totsize marks the queue as initialized */
		mqhp->mq_totsize = total_size;
	}
	mqdp->mqd_mq = mqhp;
	mqdp->mqd_mqdn = mqdnp;
	mqdp->mqd_magic = MQ_MAGIC;
	mqdp->mqd_tcd = NULL;
	mqdp->mqd_ownerdead = 0;
	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
		/* link into the process-wide descriptor list (fork safety) */
		lmutex_lock(&mq_list_lock);
		mqdp->mqd_next = mq_list;
		mqdp->mqd_prev = NULL;
		if (mq_list)
			mq_list->mqd_prev = mqdp;
		mq_list = mqdp;
		lmutex_unlock(&mq_list_lock);
		return ((mqd_t)mqdp);
	}

	locked = 0;	/* fall into the error case */
out:
	/* unwind whatever cr_flag says we accomplished, preserving errno */
	err = errno;
	if ((cr_flag & DFILE_OPEN) != 0)
		(void) __close_nc(fd);
	if ((cr_flag & DFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
	if ((cr_flag & PFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
	free(mqdp);
	if ((cr_flag & DFILE_MMAP) != 0)
		(void) munmap((caddr_t)mqhp, (size_t)total_size);
	if ((cr_flag & MQDNP_MMAP) != 0)
		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	if (locked)
		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
	errno = err;
	return ((mqd_t)-1);
}
567
/*
 * Final teardown for mq_close(), run as a cancellation cleanup handler
 * so it executes even if del_sigev_mq() is a cancellation point.
 * Invalidates the descriptor, drops mq_exclusive (unless the lock is
 * known inconsistent), unlinks from the global list, then frees and
 * unmaps.  mq_totsize must be read before the munmap of mqhp.
 */
static void
mq_close_cleanup(mqdes_t *mqdp)
{
	mqhdr_t *mqhp = mqdp->mqd_mq;
	struct mq_dn *mqdnp = mqdp->mqd_mqdn;

	/* invalidate the descriptor before freeing it */
	mqdp->mqd_magic = 0;
	if (!mqdp->mqd_ownerdead)
		(void) mutex_unlock(&mqhp->mq_exclusive);

	lmutex_lock(&mq_list_lock);
	if (mqdp->mqd_next)
		mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
	if (mqdp->mqd_prev)
		mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
	if (mq_list == mqdp)
		mq_list = mqdp->mqd_next;
	lmutex_unlock(&mq_list_lock);

	free(mqdp);
	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	(void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
}
592
/*
 * Close a message queue descriptor.  Cancels any notification this
 * descriptor registered, tears down an associated spawner thread, and
 * releases all process-local resources via mq_close_cleanup().
 * Returns 0, or -1 with errno == EBADF for an invalid descriptor.
 */
int
mq_close(mqd_t mqdes)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	int error;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;
	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/* robust mutex: EOWNERDEAD or ENOTRECOVERABLE */
		mqdp->mqd_ownerdead = 1;
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	if (mqhp->mq_des == (uintptr_t)mqdp &&
	    mqhp->mq_sigid.sn_pid == getpid()) {
		/* notification is set for this descriptor, remove it */
		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
		mqhp->mq_ntype = 0;
		mqhp->mq_des = 0;
	}

	/* cleanup handler guarantees teardown even across cancellation */
	pthread_cleanup_push(mq_close_cleanup, mqdp);
	if ((tcdp = mqdp->mqd_tcd) != NULL) {
		mqdp->mqd_tcd = NULL;
		del_sigev_mq(tcdp);	/* possible cancellation point */
	}
	pthread_cleanup_pop(1);	/* finish in the cleanup handler */

	return (0);
}
631
632 int
mq_unlink(const char * path)633 mq_unlink(const char *path)
634 {
635 int err;
636
637 if (__pos4obj_check(path) < 0)
638 return (-1);
639
640 if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
641 return (-1);
642 }
643
644 err = __pos4obj_unlink(path, MQ_PERM_TYPE);
645
646 if (err == 0 || (err == -1 && errno == EEXIST)) {
647 errno = 0;
648 err = __pos4obj_unlink(path, MQ_DATA_TYPE);
649 }
650
651 if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
652 return (-1);
653
654 return (err);
655
656 }
657
/*
 * Common implementation for mq_send(), mq_timedsend() and
 * mq_reltimedsend_np().  timeout may be NULL (block forever); abs_rel
 * selects ABS_TIME or REL_TIME interpretation of a non-NULL timeout.
 * Returns 0 on success, -1 with errno set on failure.
 */
static int
__mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
    uint_t msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int err;
	int notify = 0;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*send() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_prio >= mqhp->mq_maxprio) {
		errno = EINVAL;
		return (-1);
	}
	if (msg_len > mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (-1);
	}

	/* reserve a free slot; how we wait depends on O_NONBLOCK/timeout */
	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
		err = sem_trywait(&mqhp->mq_notfull);
	else {
		/*
		 * We might get cancelled here...
		 */
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notfull);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notfull, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
	}
	if (err == -1) {
		/*
		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
		 * by sem_*wait(), so we can just return.
		 */
		return (-1);
	}

	/*
	 * By the time we're here, we know that we've got the capacity
	 * to add to the queue...now acquire the exclusive lock.
	 */
	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/* robust-mutex owner died; fail with EBADMSG */
		owner_dead(mqdp, err);
		return (-1);
	}

	/*
	 * Now determine if we want to kick the notification.  POSIX
	 * requires that if a process has registered for notification,
	 * we must kick it when the queue makes an empty to non-empty
	 * transition, and there are no blocked receivers.  Note that
	 * this mechanism does _not_ guarantee that the kicked process
	 * will be able to receive a message without blocking;
	 * another receiver could intervene in the meantime.  Thus,
	 * the notification mechanism is inherently racy; all we can
	 * do is hope to minimize the window as much as possible.
	 * In general, we want to avoid kicking the notification when
	 * there are clearly receivers blocked.  We'll determine if
	 * we want to kick the notification before the mq_putmsg(),
	 * but the actual signotify() won't be done until the message
	 * is on the queue.
	 */
	if (mqhp->mq_sigid.sn_pid != 0) {
		int nmessages, nblocked;

		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);

		if (nmessages == 0 && nblocked == 0)
			notify = 1;
	}

	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
	(void) sem_post(&mqhp->mq_notempty);

	if (notify) {
		/* notify and also delete the registration */
		do_notify(mqhp);
	}

	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (0);
}
759
760 int
mq_send(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio)761 mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
762 {
763 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
764 NULL, ABS_TIME));
765 }
766
767 int
mq_timedsend(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio,const timespec_t * abs_timeout)768 mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
769 uint_t msg_prio, const timespec_t *abs_timeout)
770 {
771 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
772 abs_timeout, ABS_TIME));
773 }
774
775 int
mq_reltimedsend_np(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio,const timespec_t * rel_timeout)776 mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
777 uint_t msg_prio, const timespec_t *rel_timeout)
778 {
779 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
780 rel_timeout, REL_TIME));
781 }
782
783 static void
decrement_rblocked(mqhdr_t * mqhp)784 decrement_rblocked(mqhdr_t *mqhp)
785 {
786 int cancel_state;
787
788 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
789 while (sem_wait(&mqhp->mq_rblocked) == -1)
790 continue;
791 (void) pthread_setcancelstate(cancel_state, NULL);
792 }
793
/*
 * Common implementation for mq_receive(), mq_timedreceive() and
 * mq_reltimedreceive_np().  timeout may be NULL (block forever);
 * abs_rel selects ABS_TIME or REL_TIME for a non-NULL timeout.
 * Returns the message length, or -1 with errno set on failure.
 */
static ssize_t
__mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
    uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	ssize_t	msg_size;
	int err;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*receive() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
		errno = EBADF;
		return (ssize_t)(-1);
	}

	mqhp = mqdp->mqd_mq;

	/* the caller's buffer must fit a maximum-size message */
	if (msg_len < mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (ssize_t)(-1);
	}

	/*
	 * The semaphoring scheme for mq_[timed]receive is a little hairy
	 * thanks to POSIX.1b's arcane notification mechanism.  First,
	 * we try to take the common case and do a sem_trywait().
	 * If that doesn't work, and O_NONBLOCK hasn't been set,
	 * then note that we're going to sleep by incrementing the rblocked
	 * semaphore.  We decrement that semaphore after waking up.
	 */
	if (sem_trywait(&mqhp->mq_notempty) == -1) {
		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
			/*
			 * errno has been set to EAGAIN or EINTR by
			 * sem_trywait(), so we can just return.
			 */
			return (-1);
		}
		/*
		 * If we're here, then we're probably going to block...
		 * increment the rblocked semaphore.  If we get
		 * cancelled, decrement_rblocked() will decrement it.
		 */
		(void) sem_post(&mqhp->mq_rblocked);

		pthread_cleanup_push(decrement_rblocked, mqhp);
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notempty);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notempty, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
		/* always run the handler: undoes the rblocked increment */
		pthread_cleanup_pop(1);

		if (err == -1) {
			/*
			 * We took a signal or timeout while waiting
			 * on mq_notempty...
			 */
			return (-1);
		}
	}

	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/* robust-mutex owner died; fail with EBADMSG */
		owner_dead(mqdp, err);
		return (-1);
	}
	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
	(void) sem_post(&mqhp->mq_notfull);
	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (msg_size);
}
874
875 ssize_t
mq_receive(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio)876 mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
877 {
878 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
879 NULL, ABS_TIME));
880 }
881
882 ssize_t
mq_timedreceive(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio,const timespec_t * abs_timeout)883 mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
884 uint_t *msg_prio, const timespec_t *abs_timeout)
885 {
886 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
887 abs_timeout, ABS_TIME));
888 }
889
890 ssize_t
mq_reltimedreceive_np(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio,const timespec_t * rel_timeout)891 mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
892 uint_t *msg_prio, const timespec_t *rel_timeout)
893 {
894 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
895 rel_timeout, REL_TIME));
896 }
897
898 /*
899 * Only used below, in mq_notify().
900 * We already have a spawner thread.
901 * Verify that the attributes match; cancel it if necessary.
902 */
903 static int
cancel_if_necessary(thread_communication_data_t * tcdp,const struct sigevent * sigevp)904 cancel_if_necessary(thread_communication_data_t *tcdp,
905 const struct sigevent *sigevp)
906 {
907 int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp,
908 sigevp->sigev_notify_attributes);
909
910 if (do_cancel) {
911 /*
912 * Attributes don't match, cancel the spawner thread.
913 */
914 (void) pthread_cancel(tcdp->tcd_server_id);
915 } else {
916 /*
917 * Reuse the existing spawner thread with possibly
918 * changed notification function and value.
919 */
920 tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
921 tcdp->tcd_notif.sigev_signo = 0;
922 tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
923 tcdp->tcd_notif.sigev_notify_function =
924 sigevp->sigev_notify_function;
925 }
926
927 return (do_cancel);
928 }
929
/*
 * Register (sigevp != NULL) or remove (sigevp == NULL) asynchronous
 * notification for the queue.  Supports SIGEV_NONE, SIGEV_SIGNAL,
 * SIGEV_THREAD and SIGEV_PORT; the latter two use a per-descriptor
 * spawner thread woken via mq_spawner.  Returns 0 on success, -1 with
 * errno set on failure.
 */
int
mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	siginfo_t mq_siginfo;
	struct sigevent sigevent;
	struct stat64 statb;
	port_notify_t *pn;
	void *userval;
	int rval = -1;
	int ntype;
	int port;
	int error;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/*
		 * Robust mutex drew EOWNERDEAD/ENOTRECOVERABLE.
		 * Force the "remove notification" path; the bad: label
		 * will report EBADMSG because error != 0.
		 */
		mqdp->mqd_ownerdead = 1;
		sigevp = NULL;
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	if (sigevp == NULL) {		/* remove notification */
		if (mqhp->mq_des == (uintptr_t)mqdp &&
		    mqhp->mq_sigid.sn_pid == getpid()) {
			/* notification is set for this descriptor, remove it */
			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
			if ((tcdp = mqdp->mqd_tcd) != NULL) {
				sig_mutex_lock(&tcdp->tcd_lock);
				if (tcdp->tcd_msg_enabled) {
					/* cancel the spawner thread */
					tcdp = mqdp->mqd_tcd;
					mqdp->mqd_tcd = NULL;
					(void) pthread_cancel(
					    tcdp->tcd_server_id);
				}
				sig_mutex_unlock(&tcdp->tcd_lock);
			}
			mqhp->mq_ntype = 0;
			mqhp->mq_des = 0;
		} else {
			/* notification is not set for this descriptor */
			errno = EBUSY;
			goto bad;
		}
	} else {		/* register notification with this process */
		/* first switch: gather type-specific parameters */
		switch (ntype = sigevp->sigev_notify) {
		case SIGEV_THREAD:
			userval = sigevp->sigev_value.sival_ptr;
			port = -1;
			break;
		case SIGEV_PORT:
			pn = sigevp->sigev_value.sival_ptr;
			userval = pn->portnfy_user;
			port = pn->portnfy_port;
			if (fstat64(port, &statb) != 0 ||
			    !S_ISPORT(statb.st_mode)) {
				errno = EBADF;
				goto bad;
			}
			/* replace sigevp with a minimal local sigevent */
			(void) memset(&sigevent, 0, sizeof (sigevent));
			sigevent.sigev_notify = SIGEV_PORT;
			sigevp = &sigevent;
			break;
		}
		/* second switch: build siginfo and spawner state */
		switch (ntype) {
		case SIGEV_NONE:
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_SIGNAL:
			mq_siginfo.si_signo = sigevp->sigev_signo;
			mq_siginfo.si_value = sigevp->sigev_value;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_THREAD:
			/* mismatched attributes force a new spawner */
			if ((tcdp = mqdp->mqd_tcd) != NULL &&
			    cancel_if_necessary(tcdp, sigevp))
				mqdp->mqd_tcd = NULL;
			/* FALLTHROUGH */
		case SIGEV_PORT:
			if ((tcdp = mqdp->mqd_tcd) == NULL) {
				/* we must create a spawner thread */
				tcdp = setup_sigev_handler(sigevp, MQ);
				if (tcdp == NULL) {
					errno = EBADF;
					goto bad;
				}
				tcdp->tcd_msg_enabled = 0;
				tcdp->tcd_msg_closing = 0;
				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
				if (launch_spawner(tcdp) != 0) {
					free_sigev_handler(tcdp);
					goto bad;
				}
				mqdp->mqd_tcd = tcdp;
			}
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		default:
			errno = EINVAL;
			goto bad;
		}

		/* register notification */
		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
			goto bad;
		mqhp->mq_ntype = ntype;
		mqhp->mq_des = (uintptr_t)mqdp;
		switch (ntype) {
		case SIGEV_THREAD:
		case SIGEV_PORT:
			/* hand the spawner its parameters and enable it */
			tcdp->tcd_port = port;
			tcdp->tcd_msg_object = mqdp;
			tcdp->tcd_msg_userval = userval;
			sig_mutex_lock(&tcdp->tcd_lock);
			tcdp->tcd_msg_enabled = ntype;
			sig_mutex_unlock(&tcdp->tcd_lock);
			(void) cond_broadcast(&tcdp->tcd_cv);
			break;
		}
	}

	rval = 0;	/* success */
bad:
	if (error == 0) {
		(void) mutex_unlock(&mqhp->mq_exclusive);
	} else {
		/* the lock was dead on entry; report the queue as broken */
		errno = EBADMSG;
		rval = -1;
	}
	return (rval);
}
1073
1074 int
mq_setattr(mqd_t mqdes,const struct mq_attr * mqstat,struct mq_attr * omqstat)1075 mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
1076 {
1077 mqdes_t *mqdp = (mqdes_t *)mqdes;
1078 mqhdr_t *mqhp;
1079 uint_t flag = 0;
1080
1081 if (!mq_is_valid(mqdp)) {
1082 errno = EBADF;
1083 return (-1);
1084 }
1085
1086 /* store current attributes */
1087 if (omqstat != NULL) {
1088 int count;
1089
1090 mqhp = mqdp->mqd_mq;
1091 omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1092 omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1093 omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1094 (void) sem_getvalue(&mqhp->mq_notempty, &count);
1095 omqstat->mq_curmsgs = count;
1096 }
1097
1098 /* set description attributes */
1099 if ((mqstat->mq_flags & O_NONBLOCK) != 0)
1100 flag = FNONBLOCK;
1101 mqdp->mqd_mqdn->mqdn_flags = flag;
1102
1103 return (0);
1104 }
1105
1106 int
mq_getattr(mqd_t mqdes,struct mq_attr * mqstat)1107 mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
1108 {
1109 mqdes_t *mqdp = (mqdes_t *)mqdes;
1110 mqhdr_t *mqhp;
1111 int count;
1112
1113 if (!mq_is_valid(mqdp)) {
1114 errno = EBADF;
1115 return (-1);
1116 }
1117
1118 mqhp = mqdp->mqd_mq;
1119
1120 mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1121 mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1122 mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1123 (void) sem_getvalue(&mqhp->mq_notempty, &count);
1124 mqstat->mq_curmsgs = count;
1125 return (0);
1126 }
1127
1128 /*
1129 * Cleanup after fork1() in the child process.
1130 */
1131 void
postfork1_child_sigev_mq(void)1132 postfork1_child_sigev_mq(void)
1133 {
1134 thread_communication_data_t *tcdp;
1135 mqdes_t *mqdp;
1136
1137 for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
1138 if ((tcdp = mqdp->mqd_tcd) != NULL) {
1139 mqdp->mqd_tcd = NULL;
1140 tcd_teardown(tcdp);
1141 }
1142 }
1143 }
1144