1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22 /*
23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
27 #include "lint.h"
28 #include "mtlib.h"
29 #define _KMEMUSER
30 #include <sys/param.h> /* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
31 #undef _KMEMUSER
32 #include <mqueue.h>
33 #include <sys/types.h>
34 #include <sys/file.h>
35 #include <sys/mman.h>
36 #include <errno.h>
37 #include <stdarg.h>
38 #include <limits.h>
39 #include <pthread.h>
40 #include <assert.h>
41 #include <string.h>
42 #include <unistd.h>
43 #include <stdlib.h>
44 #include <sys/stat.h>
45 #include <inttypes.h>
46 #include "sigev_thread.h"
47 #include "pos4obj.h"
48
49 /*
50 * Default values per message queue
51 */
52 #define MQ_MAXMSG 128
53 #define MQ_MAXSIZE 1024
54
55 #define MQ_MAGIC 0x4d534751 /* "MSGQ" */
56
57 /*
58 * Message header which is part of messages in link list
59 */
60 typedef struct {
61 uint64_t msg_next; /* offset of next message in the link */
62 uint64_t msg_len; /* length of the message */
63 } msghdr_t;
64
65 /*
66 * message queue description
67 */
68 struct mq_dn {
69 size_t mqdn_flags; /* open description flags */
70 };
71
72 /*
73 * message queue descriptor structure
74 */
75 typedef struct mq_des {
76 struct mq_des *mqd_next; /* list of all open mq descriptors, */
77 struct mq_des *mqd_prev; /* needed for fork-safety */
78 int mqd_magic; /* magic # to identify mq_des */
79 int mqd_flags; /* operation flag per open */
80 struct mq_header *mqd_mq; /* address pointer of message Q */
81 struct mq_dn *mqd_mqdn; /* open description */
82 thread_communication_data_t *mqd_tcd; /* SIGEV_THREAD notification */
83 int mqd_ownerdead; /* mq_exclusive is inconsistent */
84 } mqdes_t;
85
86 /*
87 * message queue common header, part of the mmap()ed file.
88 * Since message queues may be shared between 32- and 64-bit processes,
89 * care must be taken to make sure that the elements of this structure
90 * are identical for both _LP64 and _ILP32 cases.
91 */
92 typedef struct mq_header {
93 /* first field must be mq_totsize, DO NOT insert before this */
94 int64_t mq_totsize; /* total size of the Queue */
95 int64_t mq_maxsz; /* max size of each message */
96 uint32_t mq_maxmsg; /* max messages in the queue */
97 uint32_t mq_maxprio; /* maximum mqueue priority */
98 uint32_t mq_curmaxprio; /* current maximum MQ priority */
99 uint32_t mq_mask; /* priority bitmask */
100 uint64_t mq_freep; /* free message's head pointer */
101 uint64_t mq_headpp; /* pointer to head pointers */
102 uint64_t mq_tailpp; /* pointer to tail pointers */
103 signotify_id_t mq_sigid; /* notification id (3 int's) */
104 uint32_t mq_ntype; /* notification type (SIGEV_*) */
105 uint64_t mq_des; /* pointer to msg Q descriptor */
106 mutex_t mq_exclusive; /* acquire for exclusive access */
107 sem_t mq_rblocked; /* number of processes rblocked */
108 sem_t mq_notfull; /* mq_send()'s block on this */
109 sem_t mq_notempty; /* mq_receive()'s block on this */
110 sem_t mq_spawner; /* spawner thread blocks on this */
111 } mqhdr_t;
112
113 /*
114 * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
115 * If this assumption is somehow invalidated, mq_open() needs to be changed
116 * back to the old version which kept a count and enforced a limit.
117 * We make sure that this is pointed out to those changing <sys/param.h>
118 * by checking _MQ_OPEN_MAX at compile time.
119 */
#if _MQ_OPEN_MAX != -1
#error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
#endif

#define	MQ_ALIGNSIZE	8	/* 64-bit alignment */

#ifdef DEBUG
#define	MQ_ASSERT(x)	assert(x);

/* offset must be non-zero, 8-byte aligned, and inside the mapping */
#define	MQ_ASSERT_PTR(_m, _p) \
	assert((_p) != 0 && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \
	    !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \
	    _m->mq_totsize));

#define	MQ_ASSERT_SEMVAL_LEQ(sem, val) { \
	int _val; \
	(void) sem_getvalue((sem), &_val); \
	assert((_val) <= val); }
#else
#define	MQ_ASSERT(x)
#define	MQ_ASSERT_PTR(_m, _p)
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
#endif

/* convert a stored offset into a pointer within this process's mapping */
#define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)m + (uintptr_t)n))
/* per-priority head/tail offset arrays live right after the header */
#define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
			(uintptr_t)m->mq_headpp + n * sizeof (uint64_t)))
#define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
			(uintptr_t)m->mq_tailpp + n * sizeof (uint64_t)))

/* sentinel that is never a valid mqdes_t pointer */
#define	MQ_RESERVED	((mqdes_t *)-1)

/* timeout interpretation for the common send/receive workers */
#define	ABS_TIME	0
#define	REL_TIME	1

/* list of all open descriptors, walked after fork1() in the child */
static mutex_t mq_list_lock = DEFAULTMUTEX;
static mqdes_t *mq_list = NULL;

extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id);
159
160 static int
mq_is_valid(mqdes_t * mqdp)161 mq_is_valid(mqdes_t *mqdp)
162 {
163 /*
164 * Any use of a message queue after it was closed is
165 * undefined. But the standard strongly favours EBADF
166 * returns. Before we dereference which could be fatal,
167 * we first do some pointer sanity checks.
168 */
169 if (mqdp != NULL && mqdp != MQ_RESERVED &&
170 ((uintptr_t)mqdp & 0x7) == 0) {
171 return (mqdp->mqd_magic == MQ_MAGIC);
172 }
173
174 return (0);
175 }
176
177 static void
mq_init(mqhdr_t * mqhp,size_t msgsize,ssize_t maxmsg)178 mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
179 {
180 int i;
181 uint64_t temp;
182 uint64_t currentp;
183 uint64_t nextp;
184
185 /*
186 * We only need to initialize the non-zero fields. The use of
187 * ftruncate() on the message queue file assures that the
188 * pages will be zero-filled.
189 */
190 (void) mutex_init(&mqhp->mq_exclusive,
191 USYNC_PROCESS | LOCK_ROBUST, NULL);
192 (void) sem_init(&mqhp->mq_rblocked, 1, 0);
193 (void) sem_init(&mqhp->mq_notempty, 1, 0);
194 (void) sem_init(&mqhp->mq_spawner, 1, 0);
195 (void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);
196
197 mqhp->mq_maxsz = msgsize;
198 mqhp->mq_maxmsg = maxmsg;
199
200 /*
201 * As of this writing (1997), there are 32 message queue priorities.
202 * If this is to change, then the size of the mq_mask will
203 * also have to change. If DEBUG is defined, assert that
204 * _MQ_PRIO_MAX hasn't changed.
205 */
206 mqhp->mq_maxprio = _MQ_PRIO_MAX;
207 #if defined(DEBUG)
208 /* LINTED always true */
209 MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
210 #endif
211
212 /*
213 * Since the message queue can be mapped into different
214 * virtual address ranges by different processes, we don't
215 * keep track of pointers, only offsets into the shared region.
216 */
217 mqhp->mq_headpp = sizeof (mqhdr_t);
218 mqhp->mq_tailpp = mqhp->mq_headpp +
219 mqhp->mq_maxprio * sizeof (uint64_t);
220 mqhp->mq_freep = mqhp->mq_tailpp +
221 mqhp->mq_maxprio * sizeof (uint64_t);
222
223 currentp = mqhp->mq_freep;
224 MQ_PTR(mqhp, currentp)->msg_next = 0;
225
226 temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
227 for (i = 1; i < mqhp->mq_maxmsg; i++) {
228 nextp = currentp + sizeof (msghdr_t) + temp;
229 MQ_PTR(mqhp, currentp)->msg_next = nextp;
230 MQ_PTR(mqhp, nextp)->msg_next = 0;
231 currentp = nextp;
232 }
233 }
234
235 static size_t
mq_getmsg(mqhdr_t * mqhp,char * msgp,uint_t * msg_prio)236 mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
237 {
238 uint64_t currentp;
239 msghdr_t *curbuf;
240 uint64_t *headpp;
241 uint64_t *tailpp;
242
243 MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
244
245 /*
246 * Get the head and tail pointers for the queue of maximum
247 * priority. We shouldn't be here unless there is a message for
248 * us, so it's fair to assert that both the head and tail
249 * pointers are non-NULL.
250 */
251 headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
252 tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);
253
254 if (msg_prio != NULL)
255 *msg_prio = mqhp->mq_curmaxprio;
256
257 currentp = *headpp;
258 MQ_ASSERT_PTR(mqhp, currentp);
259 curbuf = MQ_PTR(mqhp, currentp);
260
261 if ((*headpp = curbuf->msg_next) == 0) {
262 /*
263 * We just nuked the last message in this priority's queue.
264 * Twiddle this priority's bit, and then find the next bit
265 * tipped.
266 */
267 uint_t prio = mqhp->mq_curmaxprio;
268
269 mqhp->mq_mask &= ~(1u << prio);
270
271 for (; prio != 0; prio--)
272 if (mqhp->mq_mask & (1u << prio))
273 break;
274 mqhp->mq_curmaxprio = prio;
275
276 *tailpp = 0;
277 }
278
279 /*
280 * Copy the message, and put the buffer back on the free list.
281 */
282 (void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
283 curbuf->msg_next = mqhp->mq_freep;
284 mqhp->mq_freep = currentp;
285
286 return (curbuf->msg_len);
287 }
288
289
290 static void
mq_putmsg(mqhdr_t * mqhp,const char * msgp,ssize_t len,uint_t prio)291 mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
292 {
293 uint64_t currentp;
294 msghdr_t *curbuf;
295 uint64_t *headpp;
296 uint64_t *tailpp;
297
298 MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
299
300 /*
301 * Grab a free message block, and link it in. We shouldn't
302 * be here unless there is room in the queue for us; it's
303 * fair to assert that the free pointer is non-NULL.
304 */
305 currentp = mqhp->mq_freep;
306 MQ_ASSERT_PTR(mqhp, currentp);
307 curbuf = MQ_PTR(mqhp, currentp);
308
309 /*
310 * Remove a message from the free list, and copy in the new contents.
311 */
312 mqhp->mq_freep = curbuf->msg_next;
313 curbuf->msg_next = 0;
314 (void) memcpy((char *)&curbuf[1], msgp, len);
315 curbuf->msg_len = len;
316
317 headpp = HEAD_PTR(mqhp, prio);
318 tailpp = TAIL_PTR(mqhp, prio);
319
320 if (*tailpp == 0) {
321 /*
322 * This is the first message on this queue. Set the
323 * head and tail pointers, and tip the appropriate bit
324 * in the priority mask.
325 */
326 *headpp = currentp;
327 *tailpp = currentp;
328 mqhp->mq_mask |= (1u << prio);
329 if (prio > mqhp->mq_curmaxprio)
330 mqhp->mq_curmaxprio = prio;
331 } else {
332 MQ_ASSERT_PTR(mqhp, *tailpp);
333 MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
334 *tailpp = currentp;
335 }
336 }
337
338 /*
339 * Send a notification and also delete the registration.
340 */
341 static void
do_notify(mqhdr_t * mqhp)342 do_notify(mqhdr_t *mqhp)
343 {
344 (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
345 if (mqhp->mq_ntype == SIGEV_THREAD ||
346 mqhp->mq_ntype == SIGEV_PORT)
347 (void) sem_post(&mqhp->mq_spawner);
348 mqhp->mq_ntype = 0;
349 mqhp->mq_des = 0;
350 }
351
352 /*
353 * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE.
354 * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
355 * they fail with errno == EBADMSG. Trigger any registered notification.
356 */
357 static void
owner_dead(mqdes_t * mqdp,int error)358 owner_dead(mqdes_t *mqdp, int error)
359 {
360 mqhdr_t *mqhp = mqdp->mqd_mq;
361
362 mqdp->mqd_ownerdead = 1;
363 (void) sem_post(&mqhp->mq_notfull);
364 (void) sem_post(&mqhp->mq_notempty);
365 if (error == EOWNERDEAD) {
366 if (mqhp->mq_sigid.sn_pid != 0)
367 do_notify(mqhp);
368 (void) mutex_unlock(&mqhp->mq_exclusive);
369 }
370 errno = EBADMSG;
371 }
372
/*
 * mq_open(): open or create a POSIX named message queue.
 *
 * A queue is backed by files managed through the pos4obj layer:
 * a permission file, a data file containing the shared mqhdr_t plus
 * all message buffers (mmap()ed MAP_SHARED), and a small description
 * file (unlinked immediately after creation) holding the per-open
 * O_NONBLOCK flag word.  A per-name lock file makes the whole
 * open/create sequence atomic.  cr_flag accumulates what has been
 * created/opened/mapped so the single "out:" error path can undo
 * exactly that much.
 *
 * Returns the new descriptor cast to mqd_t, or (mqd_t)-1 with errno
 * set on failure.
 */
mqd_t
mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
{
	va_list		ap;
	mode_t		mode = 0;
	struct mq_attr	*attr = NULL;
	int		fd;
	int		err;
	int		cr_flag = 0;
	int		locked = 0;
	uint64_t	total_size;
	size_t		msgsize;
	ssize_t		maxmsg;
	uint64_t	temp;
	void		*ptr;
	mqdes_t		*mqdp;
	mqhdr_t		*mqhp;
	struct mq_dn	*mqdnp;

	if (__pos4obj_check(path) == -1)
		return ((mqd_t)-1);

	/* acquire MSGQ lock to have atomic operation */
	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
		goto out;
	locked = 1;

	va_start(ap, oflag);
	/* filter oflag to have READ/WRITE/CREATE modes only */
	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
	if ((oflag & O_CREAT) != 0) {
		mode = va_arg(ap, mode_t);
		attr = va_arg(ap, struct mq_attr *);
	}
	va_end(ap);

	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
	    mode, &cr_flag)) < 0)
		goto out;

	/* closing permission file */
	(void) __close_nc(fd);

	/* Try to open/create data file */
	if (cr_flag) {
		cr_flag = PFILE_CREATE;
		if (attr == NULL) {
			maxmsg = MQ_MAXMSG;
			msgsize = MQ_MAXSIZE;
		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
			errno = EINVAL;
			goto out;
		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
			/* mq_notfull counts messages; must fit a semaphore */
			errno = ENOSPC;
			goto out;
		} else {
			maxmsg = attr->mq_maxmsg;
			msgsize = attr->mq_msgsize;
		}

		/* adjust for message size at word boundary */
		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);

		/* header + per-message (header+payload) + head/tail arrays */
		total_size = sizeof (mqhdr_t) +
		    maxmsg * (temp + sizeof (msghdr_t)) +
		    2 * _MQ_PRIO_MAX * sizeof (uint64_t);

		if (total_size > SSIZE_MAX) {
			errno = ENOSPC;
			goto out;
		}

		/*
		 * data file is opened with read/write to those
		 * who have read or write permission
		 */
		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
			goto out;

		cr_flag |= DFILE_CREATE | DFILE_OPEN;

		/* force permissions to avoid umask effect */
		if (fchmod(fd, mode) < 0)
			goto out;

		/* zero-fills the file; mq_init() relies on this */
		if (ftruncate64(fd, (off64_t)total_size) < 0)
			goto out;
	} else {
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    O_RDWR, 0666, &err)) < 0)
			goto out;
		cr_flag = DFILE_OPEN;

		/*
		 * mq_totsize is the first field of mqhdr_t and is set
		 * last by the creator; zero means the queue has not
		 * been initialized yet.
		 */
		if (read(fd, &total_size, sizeof (total_size)) !=
		    sizeof (total_size) || total_size == 0) {
			errno = ENOENT;
			goto out;
		}

		/* Message queue too big for this process to handle */
		if (total_size > SSIZE_MAX) {
			errno = EFBIG;
			goto out;
		}
	}

	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
		errno = ENOMEM;
		goto out;
	}
	cr_flag |= ALLOC_MEM;

	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqhp = ptr;
	cr_flag |= DFILE_MMAP;

	/* closing data file */
	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * create, unlink, size, mmap, and close description file
	 * all for a flag word in anonymous shared memory
	 */
	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
	    0666, &err)) < 0)
		goto out;
	cr_flag |= DFILE_OPEN;
	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
		goto out;

	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqdnp = ptr;
	cr_flag |= MQDNP_MMAP;

	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * we follow the same strategy as filesystem open() routine,
	 * where fcntl.h flags are changed to flags defined in file.h.
	 */
	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);

	/* new message queue requires initialization */
	if ((cr_flag & DFILE_CREATE) != 0) {
		/* message queue header has to be initialized */
		mq_init(mqhp, msgsize, maxmsg);
		/* setting mq_totsize last marks the queue as initialized */
		mqhp->mq_totsize = total_size;
	}
	mqdp->mqd_mq = mqhp;
	mqdp->mqd_mqdn = mqdnp;
	mqdp->mqd_magic = MQ_MAGIC;
	mqdp->mqd_tcd = NULL;
	mqdp->mqd_ownerdead = 0;
	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
		/* link into the global list used for fork-safety */
		lmutex_lock(&mq_list_lock);
		mqdp->mqd_next = mq_list;
		mqdp->mqd_prev = NULL;
		if (mq_list)
			mq_list->mqd_prev = mqdp;
		mq_list = mqdp;
		lmutex_unlock(&mq_list_lock);
		return ((mqd_t)mqdp);
	}

	locked = 0;	/* fall into the error case */
out:
	err = errno;
	if ((cr_flag & DFILE_OPEN) != 0)
		(void) __close_nc(fd);
	if ((cr_flag & DFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
	if ((cr_flag & PFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
	if ((cr_flag & ALLOC_MEM) != 0)
		free((void *)mqdp);
	if ((cr_flag & DFILE_MMAP) != 0)
		(void) munmap((caddr_t)mqhp, (size_t)total_size);
	if ((cr_flag & MQDNP_MMAP) != 0)
		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	if (locked)
		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
	errno = err;
	return ((mqd_t)-1);
}
568
569 static void
mq_close_cleanup(mqdes_t * mqdp)570 mq_close_cleanup(mqdes_t *mqdp)
571 {
572 mqhdr_t *mqhp = mqdp->mqd_mq;
573 struct mq_dn *mqdnp = mqdp->mqd_mqdn;
574
575 /* invalidate the descriptor before freeing it */
576 mqdp->mqd_magic = 0;
577 if (!mqdp->mqd_ownerdead)
578 (void) mutex_unlock(&mqhp->mq_exclusive);
579
580 lmutex_lock(&mq_list_lock);
581 if (mqdp->mqd_next)
582 mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
583 if (mqdp->mqd_prev)
584 mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
585 if (mq_list == mqdp)
586 mq_list = mqdp->mqd_next;
587 lmutex_unlock(&mq_list_lock);
588
589 free(mqdp);
590 (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
591 (void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
592 }
593
/*
 * mq_close(): close a message queue descriptor.
 *
 * Removes any notification registered through this descriptor,
 * cancels the SIGEV_THREAD/SIGEV_PORT spawner if one exists (a
 * possible cancellation point), and performs the actual teardown in
 * mq_close_cleanup() via a cleanup handler so it runs even if we are
 * cancelled.  Returns 0, or -1 with errno == EBADF for an invalid
 * descriptor.
 */
int
mq_close(mqd_t mqdes)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	int error;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;
	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/* robust-mutex owner died; the queue may be inconsistent */
		mqdp->mqd_ownerdead = 1;
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	if (mqhp->mq_des == (uintptr_t)mqdp &&
	    mqhp->mq_sigid.sn_pid == getpid()) {
		/* notification is set for this descriptor, remove it */
		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
		mqhp->mq_ntype = 0;
		mqhp->mq_des = 0;
	}

	pthread_cleanup_push(mq_close_cleanup, mqdp);
	if ((tcdp = mqdp->mqd_tcd) != NULL) {
		mqdp->mqd_tcd = NULL;
		del_sigev_mq(tcdp);	/* possible cancellation point */
	}
	pthread_cleanup_pop(1);	/* finish in the cleanup handler */

	return (0);
}
632
633 int
mq_unlink(const char * path)634 mq_unlink(const char *path)
635 {
636 int err;
637
638 if (__pos4obj_check(path) < 0)
639 return (-1);
640
641 if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
642 return (-1);
643 }
644
645 err = __pos4obj_unlink(path, MQ_PERM_TYPE);
646
647 if (err == 0 || (err == -1 && errno == EEXIST)) {
648 errno = 0;
649 err = __pos4obj_unlink(path, MQ_DATA_TYPE);
650 }
651
652 if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
653 return (-1);
654
655 return (err);
656
657 }
658
/*
 * Common worker for mq_send(), mq_timedsend() and
 * mq_reltimedsend_np().  "timeout" may be NULL (block indefinitely);
 * otherwise abs_rel selects whether it is an absolute (ABS_TIME) or
 * relative (REL_TIME) timespec.  Returns 0 on success, -1 with errno
 * set on failure.
 */
static int
__mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
	uint_t msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int err;
	int notify = 0;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*send() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_prio >= mqhp->mq_maxprio) {
		errno = EINVAL;
		return (-1);
	}
	if (msg_len > mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (-1);
	}

	/* wait for (or, non-blocking, try for) room in the queue */
	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
		err = sem_trywait(&mqhp->mq_notfull);
	else {
		/*
		 * We might get cancelled here...
		 */
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notfull);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notfull, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
	}
	if (err == -1) {
		/*
		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
		 * by sem_*wait(), so we can just return.
		 */
		return (-1);
	}

	/*
	 * By the time we're here, we know that we've got the capacity
	 * to add to the queue...now acquire the exclusive lock.
	 */
	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		owner_dead(mqdp, err);
		return (-1);
	}

	/*
	 * Now determine if we want to kick the notification.  POSIX
	 * requires that if a process has registered for notification,
	 * we must kick it when the queue makes an empty to non-empty
	 * transition, and there are no blocked receivers.  Note that
	 * this mechanism does _not_ guarantee that the kicked process
	 * will be able to receive a message without blocking;
	 * another receiver could intervene in the meantime.  Thus,
	 * the notification mechanism is inherently racy; all we can
	 * do is hope to minimize the window as much as possible.
	 * In general, we want to avoid kicking the notification when
	 * there are clearly receivers blocked.  We'll determine if
	 * we want to kick the notification before the mq_putmsg(),
	 * but the actual signotify() won't be done until the message
	 * is on the queue.
	 */
	if (mqhp->mq_sigid.sn_pid != 0) {
		int nmessages, nblocked;

		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);

		if (nmessages == 0 && nblocked == 0)
			notify = 1;
	}

	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
	(void) sem_post(&mqhp->mq_notempty);

	if (notify) {
		/* notify and also delete the registration */
		do_notify(mqhp);
	}

	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (0);
}
760
/* mq_send(): blocking send; see __mq_timedsend() for the real work */
int
mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
{
	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
	    NULL, ABS_TIME));
}
767
/* mq_timedsend(): send with an absolute timeout */
int
mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
	uint_t msg_prio, const timespec_t *abs_timeout)
{
	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
	    abs_timeout, ABS_TIME));
}
775
/* mq_reltimedsend_np(): send with a relative timeout (non-portable) */
int
mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
	uint_t msg_prio, const timespec_t *rel_timeout)
{
	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
	    rel_timeout, REL_TIME));
}
783
784 static void
decrement_rblocked(mqhdr_t * mqhp)785 decrement_rblocked(mqhdr_t *mqhp)
786 {
787 int cancel_state;
788
789 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
790 while (sem_wait(&mqhp->mq_rblocked) == -1)
791 continue;
792 (void) pthread_setcancelstate(cancel_state, NULL);
793 }
794
/*
 * Common worker for mq_receive(), mq_timedreceive() and
 * mq_reltimedreceive_np().  "timeout" may be NULL (block
 * indefinitely); otherwise abs_rel selects absolute (ABS_TIME) or
 * relative (REL_TIME) interpretation.  Returns the received message
 * length, or -1 with errno set on failure.
 */
static ssize_t
__mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	ssize_t	msg_size;
	int err;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*receive() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
		errno = EBADF;
		return (ssize_t)(-1);
	}

	mqhp = mqdp->mqd_mq;

	/* POSIX requires the buffer to hold the largest possible message */
	if (msg_len < mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (ssize_t)(-1);
	}

	/*
	 * The semaphoring scheme for mq_[timed]receive is a little hairy
	 * thanks to POSIX.1b's arcane notification mechanism.  First,
	 * we try to take the common case and do a sem_trywait().
	 * If that doesn't work, and O_NONBLOCK hasn't been set,
	 * then note that we're going to sleep by incrementing the rblocked
	 * semaphore.  We decrement that semaphore after waking up.
	 */
	if (sem_trywait(&mqhp->mq_notempty) == -1) {
		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
			/*
			 * errno has been set to EAGAIN or EINTR by
			 * sem_trywait(), so we can just return.
			 */
			return (-1);
		}
		/*
		 * If we're here, then we're probably going to block...
		 * increment the rblocked semaphore.  If we get
		 * cancelled, decrement_rblocked() will decrement it.
		 */
		(void) sem_post(&mqhp->mq_rblocked);

		pthread_cleanup_push(decrement_rblocked, mqhp);
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notempty);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notempty, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
		pthread_cleanup_pop(1);

		if (err == -1) {
			/*
			 * We took a signal or timeout while waiting
			 * on mq_notempty...
			 */
			return (-1);
		}
	}

	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		owner_dead(mqdp, err);
		return (-1);
	}
	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
	(void) sem_post(&mqhp->mq_notfull);
	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (msg_size);
}
875
/* mq_receive(): blocking receive; see __mq_timedreceive() */
ssize_t
mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
{
	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
	    NULL, ABS_TIME));
}
882
/* mq_timedreceive(): receive with an absolute timeout */
ssize_t
mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *abs_timeout)
{
	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
	    abs_timeout, ABS_TIME));
}
890
/* mq_reltimedreceive_np(): receive with a relative timeout (non-portable) */
ssize_t
mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *rel_timeout)
{
	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
	    rel_timeout, REL_TIME));
}
898
899 /*
900 * Only used below, in mq_notify().
901 * We already have a spawner thread.
902 * Verify that the attributes match; cancel it if necessary.
903 */
904 static int
cancel_if_necessary(thread_communication_data_t * tcdp,const struct sigevent * sigevp)905 cancel_if_necessary(thread_communication_data_t *tcdp,
906 const struct sigevent *sigevp)
907 {
908 int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp,
909 sigevp->sigev_notify_attributes);
910
911 if (do_cancel) {
912 /*
913 * Attributes don't match, cancel the spawner thread.
914 */
915 (void) pthread_cancel(tcdp->tcd_server_id);
916 } else {
917 /*
918 * Reuse the existing spawner thread with possibly
919 * changed notification function and value.
920 */
921 tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
922 tcdp->tcd_notif.sigev_signo = 0;
923 tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
924 tcdp->tcd_notif.sigev_notify_function =
925 sigevp->sigev_notify_function;
926 }
927
928 return (do_cancel);
929 }
930
/*
 * mq_notify(): register or remove asynchronous notification.
 *
 * sigevp == NULL removes the registration (which must belong to this
 * descriptor and process, else EBUSY).  Otherwise registers
 * SIGEV_NONE/SIGEV_SIGNAL directly, or sets up/reuses a spawner
 * thread for SIGEV_THREAD/SIGEV_PORT before registering with the
 * kernel via __signotify().  Returns 0, or -1 with errno set.
 */
int
mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	siginfo_t mq_siginfo;
	struct sigevent sigevent;
	struct stat64 statb;
	port_notify_t *pn;
	void *userval;
	int rval = -1;
	int ntype;
	int port;
	int error;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/* robust-mutex owner died; force the removal path */
		mqdp->mqd_ownerdead = 1;
		sigevp = NULL;
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	if (sigevp == NULL) {		/* remove notification */
		if (mqhp->mq_des == (uintptr_t)mqdp &&
		    mqhp->mq_sigid.sn_pid == getpid()) {
			/* notification is set for this descriptor, remove it */
			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
			if ((tcdp = mqdp->mqd_tcd) != NULL) {
				sig_mutex_lock(&tcdp->tcd_lock);
				if (tcdp->tcd_msg_enabled) {
					/* cancel the spawner thread */
					tcdp = mqdp->mqd_tcd;
					mqdp->mqd_tcd = NULL;
					(void) pthread_cancel(
					    tcdp->tcd_server_id);
				}
				sig_mutex_unlock(&tcdp->tcd_lock);
			}
			mqhp->mq_ntype = 0;
			mqhp->mq_des = 0;
		} else {
			/* notification is not set for this descriptor */
			errno = EBUSY;
			goto bad;
		}
	} else {		/* register notification with this process */
		switch (ntype = sigevp->sigev_notify) {
		case SIGEV_THREAD:
			userval = sigevp->sigev_value.sival_ptr;
			port = -1;
			break;
		case SIGEV_PORT:
			/* sival_ptr carries a port_notify_t for event ports */
			pn = sigevp->sigev_value.sival_ptr;
			userval = pn->portnfy_user;
			port = pn->portnfy_port;
			if (fstat64(port, &statb) != 0 ||
			    !S_ISPORT(statb.st_mode)) {
				errno = EBADF;
				goto bad;
			}
			(void) memset(&sigevent, 0, sizeof (sigevent));
			sigevent.sigev_notify = SIGEV_PORT;
			sigevp = &sigevent;
			break;
		}
		switch (ntype) {
		case SIGEV_NONE:
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_SIGNAL:
			mq_siginfo.si_signo = sigevp->sigev_signo;
			mq_siginfo.si_value = sigevp->sigev_value;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_THREAD:
			/* drop a stale spawner whose attributes changed */
			if ((tcdp = mqdp->mqd_tcd) != NULL &&
			    cancel_if_necessary(tcdp, sigevp))
				mqdp->mqd_tcd = NULL;
			/* FALLTHROUGH */
		case SIGEV_PORT:
			if ((tcdp = mqdp->mqd_tcd) == NULL) {
				/* we must create a spawner thread */
				tcdp = setup_sigev_handler(sigevp, MQ);
				if (tcdp == NULL) {
					errno = EBADF;
					goto bad;
				}
				tcdp->tcd_msg_enabled = 0;
				tcdp->tcd_msg_closing = 0;
				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
				if (launch_spawner(tcdp) != 0) {
					free_sigev_handler(tcdp);
					goto bad;
				}
				mqdp->mqd_tcd = tcdp;
			}
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		default:
			errno = EINVAL;
			goto bad;
		}

		/* register notification */
		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
			goto bad;
		mqhp->mq_ntype = ntype;
		mqhp->mq_des = (uintptr_t)mqdp;
		switch (ntype) {
		case SIGEV_THREAD:
		case SIGEV_PORT:
			/* hand the registration details to the spawner */
			tcdp->tcd_port = port;
			tcdp->tcd_msg_object = mqdp;
			tcdp->tcd_msg_userval = userval;
			sig_mutex_lock(&tcdp->tcd_lock);
			tcdp->tcd_msg_enabled = ntype;
			sig_mutex_unlock(&tcdp->tcd_lock);
			(void) cond_broadcast(&tcdp->tcd_cv);
			break;
		}
	}

	rval = 0;	/* success */
bad:
	if (error == 0) {
		(void) mutex_unlock(&mqhp->mq_exclusive);
	} else {
		/* lock was dead on entry: report the queue as inconsistent */
		errno = EBADMSG;
		rval = -1;
	}
	return (rval);
}
1074
1075 int
mq_setattr(mqd_t mqdes,const struct mq_attr * mqstat,struct mq_attr * omqstat)1076 mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
1077 {
1078 mqdes_t *mqdp = (mqdes_t *)mqdes;
1079 mqhdr_t *mqhp;
1080 uint_t flag = 0;
1081
1082 if (!mq_is_valid(mqdp)) {
1083 errno = EBADF;
1084 return (-1);
1085 }
1086
1087 /* store current attributes */
1088 if (omqstat != NULL) {
1089 int count;
1090
1091 mqhp = mqdp->mqd_mq;
1092 omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1093 omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1094 omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1095 (void) sem_getvalue(&mqhp->mq_notempty, &count);
1096 omqstat->mq_curmsgs = count;
1097 }
1098
1099 /* set description attributes */
1100 if ((mqstat->mq_flags & O_NONBLOCK) != 0)
1101 flag = FNONBLOCK;
1102 mqdp->mqd_mqdn->mqdn_flags = flag;
1103
1104 return (0);
1105 }
1106
1107 int
mq_getattr(mqd_t mqdes,struct mq_attr * mqstat)1108 mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
1109 {
1110 mqdes_t *mqdp = (mqdes_t *)mqdes;
1111 mqhdr_t *mqhp;
1112 int count;
1113
1114 if (!mq_is_valid(mqdp)) {
1115 errno = EBADF;
1116 return (-1);
1117 }
1118
1119 mqhp = mqdp->mqd_mq;
1120
1121 mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1122 mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1123 mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1124 (void) sem_getvalue(&mqhp->mq_notempty, &count);
1125 mqstat->mq_curmsgs = count;
1126 return (0);
1127 }
1128
1129 /*
1130 * Cleanup after fork1() in the child process.
1131 */
1132 void
postfork1_child_sigev_mq(void)1133 postfork1_child_sigev_mq(void)
1134 {
1135 thread_communication_data_t *tcdp;
1136 mqdes_t *mqdp;
1137
1138 for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
1139 if ((tcdp = mqdp->mqd_tcd) != NULL) {
1140 mqdp->mqd_tcd = NULL;
1141 tcd_teardown(tcdp);
1142 }
1143 }
1144 }
1145