1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22 /*
23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
27 #pragma ident "%Z%%M% %I% %E% SMI"
28
29 #include "lint.h"
30 #include "mtlib.h"
31 #define _KMEMUSER
32 #include <sys/param.h> /* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
33 #undef _KMEMUSER
34 #include <mqueue.h>
35 #include <sys/types.h>
36 #include <sys/file.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #include <stdarg.h>
40 #include <limits.h>
41 #include <pthread.h>
42 #include <assert.h>
43 #include <string.h>
44 #include <unistd.h>
45 #include <stdlib.h>
46 #include <sys/stat.h>
47 #include <inttypes.h>
48 #include "sigev_thread.h"
49 #include "pos4obj.h"
50
51 /*
52 * Default values per message queue
53 */
#define	MQ_MAXMSG	128	/* default mq_maxmsg when attr == NULL */
#define	MQ_MAXSIZE	1024	/* default mq_msgsize when attr == NULL */

#define	MQ_MAGIC	0x4d534751		/* "MSGQ" */

/*
 * Message header which is part of messages in link list.
 * Links are offsets into the shared region, never raw pointers,
 * because each process may map the queue at a different address.
 */
typedef struct {
	uint64_t	msg_next;	/* offset of next message in the link */
	uint64_t	msg_len;	/* length of the message */
} msghdr_t;
66
67 /*
68 * message queue description
69 */
struct mq_dn {
	size_t		mqdn_flags;	/* open description flags (FNONBLOCK) */
};

/*
 * message queue descriptor structure.
 * This lives in the process's private heap (one per mq_open()),
 * unlike mqhdr_t which lives in the shared mapping.
 */
typedef struct mq_des {
	struct mq_des	*mqd_next;	/* list of all open mq descriptors, */
	struct mq_des	*mqd_prev;	/* needed for fork-safety */
	int		mqd_magic;	/* magic # to identify mq_des */
	int		mqd_flags;	/* operation flag per open */
	struct mq_header *mqd_mq;	/* address pointer of message Q */
	struct mq_dn	*mqd_mqdn;	/* open description */
	thread_communication_data_t *mqd_tcd;	/* SIGEV_THREAD notification */
	int		mqd_ownerdead;	/* mq_exclusive is inconsistent */
} mqdes_t;
87
88 /*
89 * message queue common header, part of the mmap()ed file.
90 * Since message queues may be shared between 32- and 64-bit processes,
91 * care must be taken to make sure that the elements of this structure
92 * are identical for both _LP64 and _ILP32 cases.
93 */
/*
 * Message queue common header, part of the mmap()ed file.
 * Since message queues may be shared between 32- and 64-bit processes,
 * care must be taken to make sure that the elements of this structure
 * are identical for both _LP64 and _ILP32 cases.  That is why the
 * "pointer" fields below are fixed-width offsets/handles, not pointers.
 */
typedef struct mq_header {
	/* first field must be mq_totsize, DO NOT insert before this */
	int64_t		mq_totsize;	/* total size of the Queue */
	int64_t		mq_maxsz;	/* max size of each message */
	uint32_t	mq_maxmsg;	/* max messages in the queue */
	uint32_t	mq_maxprio;	/* maximum mqueue priority */
	uint32_t	mq_curmaxprio;	/* current maximum MQ priority */
	uint32_t	mq_mask;	/* priority bitmask */
	uint64_t	mq_freep;	/* free message's head pointer */
	uint64_t	mq_headpp;	/* pointer to head pointers */
	uint64_t	mq_tailpp;	/* pointer to tail pointers */
	signotify_id_t	mq_sigid;	/* notification id (3 int's) */
	uint32_t	mq_ntype;	/* notification type (SIGEV_*) */
	uint64_t	mq_des;		/* pointer to msg Q descriptor */
	mutex_t		mq_exclusive;	/* acquire for exclusive access */
	sem_t		mq_rblocked;	/* number of processes rblocked */
	sem_t		mq_notfull;	/* mq_send()'s block on this */
	sem_t		mq_notempty;	/* mq_receive()'s block on this */
	sem_t		mq_spawner;	/* spawner thread blocks on this */
} mqhdr_t;
114
115 /*
116 * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
117 * If this assumption is somehow invalidated, mq_open() needs to be changed
118 * back to the old version which kept a count and enforced a limit.
119 * We make sure that this is pointed out to those changing <sys/param.h>
120 * by checking _MQ_OPEN_MAX at compile time.
121 */
#if _MQ_OPEN_MAX != -1
#error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
#endif

#define	MQ_ALIGNSIZE	8	/* 64-bit alignment */

#ifdef DEBUG
#define	MQ_ASSERT(x)	assert(x);

/* assert that offset _p is aligned and lies inside region _m (DEBUG only) */
#define	MQ_ASSERT_PTR(_m, _p) \
	assert((_p) != NULL && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \
	    !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \
	    _m->mq_totsize));

/* assert that semaphore (sem)'s current value does not exceed val */
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val) { \
	int _val; \
	(void) sem_getvalue((sem), &_val); \
	assert((_val) <= val); }
#else
#define	MQ_ASSERT(x)
#define	MQ_ASSERT_PTR(_m, _p)
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
#endif

/* translate a region-relative offset n into a pointer within mapping m */
#define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)m + (uintptr_t)n))
#define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
	    (uintptr_t)m->mq_headpp + n * sizeof (uint64_t)))
#define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
	    (uintptr_t)m->mq_tailpp + n * sizeof (uint64_t)))

/* sentinel descriptor value; never a valid mqdes_t pointer */
#define	MQ_RESERVED	((mqdes_t *)-1)

#define	ABS_TIME	0
#define	REL_TIME	1

/* list of this process's open descriptors, for postfork1_child_sigev_mq() */
static mutex_t mq_list_lock = DEFAULTMUTEX;
static mqdes_t *mq_list = NULL;
159
160 extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id);
161
162 static int
mq_is_valid(mqdes_t * mqdp)163 mq_is_valid(mqdes_t *mqdp)
164 {
165 /*
166 * Any use of a message queue after it was closed is
167 * undefined. But the standard strongly favours EBADF
168 * returns. Before we dereference which could be fatal,
169 * we first do some pointer sanity checks.
170 */
171 if (mqdp != NULL && mqdp != MQ_RESERVED &&
172 ((uintptr_t)mqdp & 0x7) == 0) {
173 return (mqdp->mqd_magic == MQ_MAGIC);
174 }
175
176 return (0);
177 }
178
/*
 * Initialize a freshly created (zero-filled) message queue header.
 * Sets up the process-shared robust mutex and counting semaphores,
 * lays out the per-priority head/tail offset arrays just past the
 * header, and carves the remainder of the region into a free list
 * of maxmsg aligned message buffers.
 */
static void
mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
{
	int		i;
	uint64_t	temp;
	uint64_t	currentp;
	uint64_t	nextp;

	/*
	 * We only need to initialize the non-zero fields.  The use of
	 * ftruncate() on the message queue file assures that the
	 * pages will be zero-filled.
	 */
	(void) mutex_init(&mqhp->mq_exclusive,
	    USYNC_PROCESS | LOCK_ROBUST, NULL);
	(void) sem_init(&mqhp->mq_rblocked, 1, 0);
	(void) sem_init(&mqhp->mq_notempty, 1, 0);
	(void) sem_init(&mqhp->mq_spawner, 1, 0);
	/* mq_notfull starts at capacity: every slot is free */
	(void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);

	mqhp->mq_maxsz = msgsize;
	mqhp->mq_maxmsg = maxmsg;

	/*
	 * As of this writing (1997), there are 32 message queue priorities.
	 * If this is to change, then the size of the mq_mask will
	 * also have to change.  If DEBUG is defined, assert that
	 * _MQ_PRIO_MAX hasn't changed.
	 */
	mqhp->mq_maxprio = _MQ_PRIO_MAX;
#if defined(DEBUG)
	/* LINTED always true */
	MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
#endif

	/*
	 * Since the message queue can be mapped into different
	 * virtual address ranges by different processes, we don't
	 * keep track of pointers, only offsets into the shared region.
	 */
	mqhp->mq_headpp = sizeof (mqhdr_t);
	mqhp->mq_tailpp = mqhp->mq_headpp +
	    mqhp->mq_maxprio * sizeof (uint64_t);
	mqhp->mq_freep = mqhp->mq_tailpp +
	    mqhp->mq_maxprio * sizeof (uint64_t);

	currentp = mqhp->mq_freep;
	MQ_PTR(mqhp, currentp)->msg_next = 0;

	/* round each message body up to 64-bit alignment */
	temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
	/* thread the remaining maxmsg-1 buffers onto the free list */
	for (i = 1; i < mqhp->mq_maxmsg; i++) {
		nextp = currentp + sizeof (msghdr_t) + temp;
		MQ_PTR(mqhp, currentp)->msg_next = nextp;
		MQ_PTR(mqhp, nextp)->msg_next = 0;
		currentp = nextp;
	}
}
236
/*
 * Dequeue the highest-priority message into msgp, optionally reporting
 * its priority through msg_prio.  Returns the message length.
 * Caller must hold mq_exclusive and must have already consumed a
 * mq_notempty token, so the queue is guaranteed non-empty here.
 * NOTE(review): msgp must be at least mq_maxsz bytes -- callers
 * enforce this (see __mq_timedreceive()).
 */
static size_t
mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
{
	uint64_t currentp;
	msghdr_t *curbuf;
	uint64_t *headpp;
	uint64_t *tailpp;

	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));

	/*
	 * Get the head and tail pointers for the queue of maximum
	 * priority.  We shouldn't be here unless there is a message for
	 * us, so it's fair to assert that both the head and tail
	 * pointers are non-NULL.
	 */
	headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
	tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);

	if (msg_prio != NULL)
		*msg_prio = mqhp->mq_curmaxprio;

	currentp = *headpp;
	MQ_ASSERT_PTR(mqhp, currentp);
	curbuf = MQ_PTR(mqhp, currentp);

	if ((*headpp = curbuf->msg_next) == NULL) {
		/*
		 * We just nuked the last message in this priority's queue.
		 * Twiddle this priority's bit, and then find the next bit
		 * tipped.
		 */
		uint_t prio = mqhp->mq_curmaxprio;

		mqhp->mq_mask &= ~(1u << prio);

		/* scan downward for the next non-empty priority (0 if none) */
		for (; prio != 0; prio--)
			if (mqhp->mq_mask & (1u << prio))
				break;
		mqhp->mq_curmaxprio = prio;

		*tailpp = NULL;
	}

	/*
	 * Copy the message, and put the buffer back on the free list.
	 * Reading msg_len afterwards is safe: mq_exclusive is still
	 * held, so no one can recycle the buffer underneath us.
	 */
	(void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
	curbuf->msg_next = mqhp->mq_freep;
	mqhp->mq_freep = currentp;

	return (curbuf->msg_len);
}
290
291
/*
 * Enqueue a message of the given length at priority prio.
 * Caller must hold mq_exclusive and must have already consumed a
 * mq_notfull token, so a free buffer is guaranteed to exist.
 */
static void
mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
{
	uint64_t currentp;
	msghdr_t *curbuf;
	uint64_t *headpp;
	uint64_t *tailpp;

	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));

	/*
	 * Grab a free message block, and link it in.  We shouldn't
	 * be here unless there is room in the queue for us; it's
	 * fair to assert that the free pointer is non-NULL.
	 */
	currentp = mqhp->mq_freep;
	MQ_ASSERT_PTR(mqhp, currentp);
	curbuf = MQ_PTR(mqhp, currentp);

	/*
	 * Remove a message from the free list, and copy in the new contents.
	 */
	mqhp->mq_freep = curbuf->msg_next;
	curbuf->msg_next = NULL;
	(void) memcpy((char *)&curbuf[1], msgp, len);
	curbuf->msg_len = len;

	headpp = HEAD_PTR(mqhp, prio);
	tailpp = TAIL_PTR(mqhp, prio);

	if (*tailpp == 0) {
		/*
		 * This is the first message on this queue.  Set the
		 * head and tail pointers, and tip the appropriate bit
		 * in the priority mask.
		 */
		*headpp = currentp;
		*tailpp = currentp;
		mqhp->mq_mask |= (1u << prio);
		if (prio > mqhp->mq_curmaxprio)
			mqhp->mq_curmaxprio = prio;
	} else {
		/* append to the tail of this priority's FIFO */
		MQ_ASSERT_PTR(mqhp, *tailpp);
		MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
		*tailpp = currentp;
	}
}
339
340 /*
341 * Send a notification and also delete the registration.
342 */
343 static void
do_notify(mqhdr_t * mqhp)344 do_notify(mqhdr_t *mqhp)
345 {
346 (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
347 if (mqhp->mq_ntype == SIGEV_THREAD ||
348 mqhp->mq_ntype == SIGEV_PORT)
349 (void) sem_post(&mqhp->mq_spawner);
350 mqhp->mq_ntype = 0;
351 mqhp->mq_des = 0;
352 }
353
354 /*
355 * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE.
356 * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
357 * they fail with errno == EBADMSG. Trigger any registered notification.
358 */
359 static void
owner_dead(mqdes_t * mqdp,int error)360 owner_dead(mqdes_t *mqdp, int error)
361 {
362 mqhdr_t *mqhp = mqdp->mqd_mq;
363
364 mqdp->mqd_ownerdead = 1;
365 (void) sem_post(&mqhp->mq_notfull);
366 (void) sem_post(&mqhp->mq_notempty);
367 if (error == EOWNERDEAD) {
368 if (mqhp->mq_sigid.sn_pid != 0)
369 do_notify(mqhp);
370 (void) mutex_unlock(&mqhp->mq_exclusive);
371 }
372 errno = EBADMSG;
373 }
374
/*
 * mq_open() -- open or create a named POSIX message queue.
 *
 * The queue is backed by three __pos4obj files: a permission file
 * (MQ_PERM_TYPE), the mmap()ed data file holding mqhdr_t plus the
 * messages (MQ_DATA_TYPE), and an unlinked per-open description file
 * (MQ_DSCN_TYPE) that carries just the O_NONBLOCK flag word.
 * cr_flag accumulates what has been created/opened/mapped so far so
 * the "out:" path can unwind exactly that much on failure.
 * Returns a mqd_t (really a mqdes_t *) or (mqd_t)-1 with errno set.
 */
mqd_t
mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
{
	va_list		ap;
	mode_t		mode = 0;
	struct mq_attr	*attr = NULL;
	int		fd;
	int		err;
	int		cr_flag = 0;
	int		locked = 0;
	uint64_t	total_size;
	size_t		msgsize;
	ssize_t		maxmsg;
	uint64_t	temp;
	void		*ptr;
	mqdes_t		*mqdp;
	mqhdr_t		*mqhp;
	struct mq_dn	*mqdnp;

	if (__pos4obj_check(path) == -1)
		return ((mqd_t)-1);

	/* acquire MSGQ lock to have atomic operation */
	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
		goto out;
	locked = 1;

	va_start(ap, oflag);
	/* filter oflag to have READ/WRITE/CREATE modes only */
	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
	if ((oflag & O_CREAT) != 0) {
		mode = va_arg(ap, mode_t);
		attr = va_arg(ap, struct mq_attr *);
	}
	va_end(ap);

	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
	    mode, &cr_flag)) < 0)
		goto out;

	/* closing permission file */
	(void) __close_nc(fd);

	/* Try to open/create data file */
	if (cr_flag) {
		cr_flag = PFILE_CREATE;
		if (attr == NULL) {
			maxmsg = MQ_MAXMSG;
			msgsize = MQ_MAXSIZE;
		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
			errno = EINVAL;
			goto out;
		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
			/* mq_notfull counts free slots; it can't exceed this */
			errno = ENOSPC;
			goto out;
		} else {
			maxmsg = attr->mq_maxmsg;
			msgsize = attr->mq_msgsize;
		}

		/* adjust for message size at word boundary */
		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);

		/* header + maxmsg buffers + head and tail offset arrays */
		total_size = sizeof (mqhdr_t) +
		    maxmsg * (temp + sizeof (msghdr_t)) +
		    2 * _MQ_PRIO_MAX * sizeof (uint64_t);

		if (total_size > SSIZE_MAX) {
			errno = ENOSPC;
			goto out;
		}

		/*
		 * data file is opened with read/write to those
		 * who have read or write permission
		 */
		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
			goto out;

		cr_flag |= DFILE_CREATE | DFILE_OPEN;

		/* force permissions to avoid umask effect */
		if (fchmod(fd, mode) < 0)
			goto out;

		/* size the file; also guarantees zero-filled pages */
		if (ftruncate64(fd, (off64_t)total_size) < 0)
			goto out;
	} else {
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    O_RDWR, 0666, &err)) < 0)
			goto out;
		cr_flag = DFILE_OPEN;

		/*
		 * Message queue has not been initialized yet:
		 * mq_totsize is the first field of mqhdr_t and is
		 * written last by the creator (see below).
		 */
		if (read(fd, &total_size, sizeof (total_size)) !=
		    sizeof (total_size) || total_size == 0) {
			errno = ENOENT;
			goto out;
		}

		/* Message queue too big for this process to handle */
		if (total_size > SSIZE_MAX) {
			errno = EFBIG;
			goto out;
		}
	}

	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
		errno = ENOMEM;
		goto out;
	}
	cr_flag |= ALLOC_MEM;

	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqhp = ptr;
	cr_flag |= DFILE_MMAP;

	/* closing data file */
	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * create, unlink, size, mmap, and close description file
	 * all for a flag word in anonymous shared memory
	 */
	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
	    0666, &err)) < 0)
		goto out;
	cr_flag |= DFILE_OPEN;
	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
		goto out;

	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqdnp = ptr;
	cr_flag |= MQDNP_MMAP;

	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * we follow the same strategy as filesystem open() routine,
	 * where fcntl.h flags are changed to flags defined in file.h.
	 */
	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);

	/* new message queue requires initialization */
	if ((cr_flag & DFILE_CREATE) != 0) {
		/* message queue header has to be initialized */
		mq_init(mqhp, msgsize, maxmsg);
		/* writing mq_totsize last marks the queue initialized */
		mqhp->mq_totsize = total_size;
	}
	mqdp->mqd_mq = mqhp;
	mqdp->mqd_mqdn = mqdnp;
	mqdp->mqd_magic = MQ_MAGIC;
	mqdp->mqd_tcd = NULL;
	mqdp->mqd_ownerdead = 0;
	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
		/* link onto the per-process descriptor list (fork-safety) */
		lmutex_lock(&mq_list_lock);
		mqdp->mqd_next = mq_list;
		mqdp->mqd_prev = NULL;
		if (mq_list)
			mq_list->mqd_prev = mqdp;
		mq_list = mqdp;
		lmutex_unlock(&mq_list_lock);
		return ((mqd_t)mqdp);
	}

	locked = 0;	/* fall into the error case */
out:
	/* unwind exactly what cr_flag says was done, preserving errno */
	err = errno;
	if ((cr_flag & DFILE_OPEN) != 0)
		(void) __close_nc(fd);
	if ((cr_flag & DFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
	if ((cr_flag & PFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
	if ((cr_flag & ALLOC_MEM) != 0)
		free((void *)mqdp);
	if ((cr_flag & DFILE_MMAP) != 0)
		(void) munmap((caddr_t)mqhp, (size_t)total_size);
	if ((cr_flag & MQDNP_MMAP) != 0)
		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	if (locked)
		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
	errno = err;
	return ((mqd_t)-1);
}
570
/*
 * Final stage of mq_close(), also run as a cancellation cleanup
 * handler.  Invalidates and frees the descriptor, unlinks it from the
 * process-wide list, and unmaps both shared regions.  On entry the
 * caller holds mq_exclusive unless the lock owner died (mqd_ownerdead).
 * NOTE: mqhp->mq_totsize must be read before the final munmap uses it;
 * order of the statements below is significant.
 */
static void
mq_close_cleanup(mqdes_t *mqdp)
{
	mqhdr_t *mqhp = mqdp->mqd_mq;
	struct mq_dn *mqdnp = mqdp->mqd_mqdn;

	/* invalidate the descriptor before freeing it */
	mqdp->mqd_magic = 0;
	if (!mqdp->mqd_ownerdead)
		(void) mutex_unlock(&mqhp->mq_exclusive);

	/* unlink from the fork-safety list */
	lmutex_lock(&mq_list_lock);
	if (mqdp->mqd_next)
		mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
	if (mqdp->mqd_prev)
		mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
	if (mq_list == mqdp)
		mq_list = mqdp->mqd_next;
	lmutex_unlock(&mq_list_lock);

	free(mqdp);
	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	(void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
}
595
/*
 * mq_close() -- close a message queue descriptor.
 * Cancels any notification registered through this descriptor, tears
 * down a SIGEV_THREAD spawner if one exists (a cancellation point,
 * hence the pthread_cleanup_push protection), and finishes in
 * mq_close_cleanup().  Returns 0, or -1 with errno == EBADF.
 */
int
mq_close(mqd_t mqdes)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	int error;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;
	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/* robust mutex owner died; proceed unlocked */
		mqdp->mqd_ownerdead = 1;
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	if (mqhp->mq_des == (uintptr_t)mqdp &&
	    mqhp->mq_sigid.sn_pid == getpid()) {
		/* notification is set for this descriptor, remove it */
		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
		mqhp->mq_ntype = 0;
		mqhp->mq_des = 0;
	}

	/* del_sigev_mq() may be cancelled; cleanup must still run */
	pthread_cleanup_push(mq_close_cleanup, mqdp);
	if ((tcdp = mqdp->mqd_tcd) != NULL) {
		mqdp->mqd_tcd = NULL;
		del_sigev_mq(tcdp);	/* possible cancellation point */
	}
	pthread_cleanup_pop(1);		/* finish in the cleanup handler */

	return (0);
}
634
635 int
mq_unlink(const char * path)636 mq_unlink(const char *path)
637 {
638 int err;
639
640 if (__pos4obj_check(path) < 0)
641 return (-1);
642
643 if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
644 return (-1);
645 }
646
647 err = __pos4obj_unlink(path, MQ_PERM_TYPE);
648
649 if (err == 0 || (err == -1 && errno == EEXIST)) {
650 errno = 0;
651 err = __pos4obj_unlink(path, MQ_DATA_TYPE);
652 }
653
654 if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
655 return (-1);
656
657 return (err);
658
659 }
660
/*
 * Common implementation of mq_send(), mq_timedsend() and
 * mq_reltimedsend_np().  timeout == NULL means block forever;
 * otherwise abs_rel selects absolute (ABS_TIME) or relative
 * (REL_TIME) interpretation.  Returns 0, or -1 with errno set
 * (EBADF, EINVAL, EMSGSIZE, EAGAIN, EINTR, ETIMEDOUT, EBADMSG).
 */
static int
__mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
    uint_t msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int err;
	int notify = 0;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*send() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_prio >= mqhp->mq_maxprio) {
		errno = EINVAL;
		return (-1);
	}
	if (msg_len > mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (-1);
	}

	/* reserve a free slot; how we wait depends on O_NONBLOCK/timeout */
	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
		err = sem_trywait(&mqhp->mq_notfull);
	else {
		/*
		 * We might get cancelled here...
		 */
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notfull);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notfull, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
	}
	if (err == -1) {
		/*
		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
		 * by sem_*wait(), so we can just return.
		 */
		return (-1);
	}

	/*
	 * By the time we're here, we know that we've got the capacity
	 * to add to the queue...now acquire the exclusive lock.
	 */
	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		owner_dead(mqdp, err);
		return (-1);
	}

	/*
	 * Now determine if we want to kick the notification.  POSIX
	 * requires that if a process has registered for notification,
	 * we must kick it when the queue makes an empty to non-empty
	 * transition, and there are no blocked receivers.  Note that
	 * this mechanism does _not_ guarantee that the kicked process
	 * will be able to receive a message without blocking;
	 * another receiver could intervene in the meantime.  Thus,
	 * the notification mechanism is inherently racy; all we can
	 * do is hope to minimize the window as much as possible.
	 * In general, we want to avoid kicking the notification when
	 * there are clearly receivers blocked.  We'll determine if
	 * we want to kick the notification before the mq_putmsg(),
	 * but the actual signotify() won't be done until the message
	 * is on the queue.
	 */
	if (mqhp->mq_sigid.sn_pid != 0) {
		int nmessages, nblocked;

		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);

		if (nmessages == 0 && nblocked == 0)
			notify = 1;
	}

	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
	(void) sem_post(&mqhp->mq_notempty);

	if (notify) {
		/* notify and also delete the registration */
		do_notify(mqhp);
	}

	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (0);
}
762
763 int
mq_send(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio)764 mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
765 {
766 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
767 NULL, ABS_TIME));
768 }
769
770 int
mq_timedsend(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio,const timespec_t * abs_timeout)771 mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
772 uint_t msg_prio, const timespec_t *abs_timeout)
773 {
774 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
775 abs_timeout, ABS_TIME));
776 }
777
778 int
mq_reltimedsend_np(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio,const timespec_t * rel_timeout)779 mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
780 uint_t msg_prio, const timespec_t *rel_timeout)
781 {
782 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
783 rel_timeout, REL_TIME));
784 }
785
786 static void
decrement_rblocked(mqhdr_t * mqhp)787 decrement_rblocked(mqhdr_t *mqhp)
788 {
789 int cancel_state;
790
791 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
792 while (sem_wait(&mqhp->mq_rblocked) == -1)
793 continue;
794 (void) pthread_setcancelstate(cancel_state, NULL);
795 }
796
/*
 * Common implementation of mq_receive(), mq_timedreceive() and
 * mq_reltimedreceive_np().  timeout == NULL means block forever;
 * otherwise abs_rel selects absolute or relative interpretation.
 * Returns the received message length, or -1 with errno set
 * (EBADF, EMSGSIZE, EAGAIN, EINTR, ETIMEDOUT, EBADMSG).
 */
static ssize_t
__mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
    uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	ssize_t	msg_size;
	int err;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*receive() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
		errno = EBADF;
		return (ssize_t)(-1);
	}

	mqhp = mqdp->mqd_mq;

	/* the buffer must be able to hold a maximum-size message */
	if (msg_len < mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (ssize_t)(-1);
	}

	/*
	 * The semaphoring scheme for mq_[timed]receive is a little hairy
	 * thanks to POSIX.1b's arcane notification mechanism.  First,
	 * we try to take the common case and do a sem_trywait().
	 * If that doesn't work, and O_NONBLOCK hasn't been set,
	 * then note that we're going to sleep by incrementing the rblocked
	 * semaphore.  We decrement that semaphore after waking up.
	 */
	if (sem_trywait(&mqhp->mq_notempty) == -1) {
		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
			/*
			 * errno has been set to EAGAIN or EINTR by
			 * sem_trywait(), so we can just return.
			 */
			return (-1);
		}
		/*
		 * If we're here, then we're probably going to block...
		 * increment the rblocked semaphore.  If we get
		 * cancelled, decrement_rblocked() will decrement it.
		 */
		(void) sem_post(&mqhp->mq_rblocked);

		pthread_cleanup_push(decrement_rblocked, mqhp);
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notempty);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notempty, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
		pthread_cleanup_pop(1);

		if (err == -1) {
			/*
			 * We took a signal or timeout while waiting
			 * on mq_notempty...
			 */
			return (-1);
		}
	}

	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/* robust mutex owner died; fail with EBADMSG */
		owner_dead(mqdp, err);
		return (-1);
	}
	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
	(void) sem_post(&mqhp->mq_notfull);
	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (msg_size);
}
877
878 ssize_t
mq_receive(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio)879 mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
880 {
881 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
882 NULL, ABS_TIME));
883 }
884
885 ssize_t
mq_timedreceive(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio,const timespec_t * abs_timeout)886 mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
887 uint_t *msg_prio, const timespec_t *abs_timeout)
888 {
889 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
890 abs_timeout, ABS_TIME));
891 }
892
893 ssize_t
mq_reltimedreceive_np(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio,const timespec_t * rel_timeout)894 mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
895 uint_t *msg_prio, const timespec_t *rel_timeout)
896 {
897 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
898 rel_timeout, REL_TIME));
899 }
900
901 /*
902 * Only used below, in mq_notify().
903 * We already have a spawner thread.
904 * Verify that the attributes match; cancel it if necessary.
905 */
906 static int
cancel_if_necessary(thread_communication_data_t * tcdp,const struct sigevent * sigevp)907 cancel_if_necessary(thread_communication_data_t *tcdp,
908 const struct sigevent *sigevp)
909 {
910 int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp,
911 sigevp->sigev_notify_attributes);
912
913 if (do_cancel) {
914 /*
915 * Attributes don't match, cancel the spawner thread.
916 */
917 (void) pthread_cancel(tcdp->tcd_server_id);
918 } else {
919 /*
920 * Reuse the existing spawner thread with possibly
921 * changed notification function and value.
922 */
923 tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
924 tcdp->tcd_notif.sigev_signo = 0;
925 tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
926 tcdp->tcd_notif.sigev_notify_function =
927 sigevp->sigev_notify_function;
928 }
929
930 return (do_cancel);
931 }
932
/*
 * mq_notify() -- register (sigevp != NULL) or remove (sigevp == NULL)
 * asynchronous notification for queue transitions to non-empty.
 * Supports SIGEV_NONE, SIGEV_SIGNAL, SIGEV_THREAD and SIGEV_PORT.
 * For the thread/port flavors a per-descriptor spawner thread is
 * created (or reused) to service mq_spawner posts.
 * Returns 0, or -1 with errno set (EBADF, EBUSY, EINVAL, EBADMSG).
 */
int
mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	siginfo_t mq_siginfo;
	struct sigevent sigevent;
	struct stat64 statb;
	port_notify_t *pn;
	void *userval;
	int rval = -1;
	int ntype;
	int port;
	int error;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/*
		 * Robust mutex owner died: downgrade the request to a
		 * removal and eventually fail with EBADMSG (see "bad:").
		 */
		mqdp->mqd_ownerdead = 1;
		sigevp = NULL;
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	if (sigevp == NULL) {		/* remove notification */
		if (mqhp->mq_des == (uintptr_t)mqdp &&
		    mqhp->mq_sigid.sn_pid == getpid()) {
			/* notification is set for this descriptor, remove it */
			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
			if ((tcdp = mqdp->mqd_tcd) != NULL) {
				sig_mutex_lock(&tcdp->tcd_lock);
				if (tcdp->tcd_msg_enabled) {
					/* cancel the spawner thread */
					tcdp = mqdp->mqd_tcd;
					mqdp->mqd_tcd = NULL;
					(void) pthread_cancel(
					    tcdp->tcd_server_id);
				}
				sig_mutex_unlock(&tcdp->tcd_lock);
			}
			mqhp->mq_ntype = 0;
			mqhp->mq_des = 0;
		} else {
			/* notification is not set for this descriptor */
			errno = EBUSY;
			goto bad;
		}
	} else {		/* register notification with this process */
		/* first pass: extract/validate flavor-specific parameters */
		switch (ntype = sigevp->sigev_notify) {
		case SIGEV_THREAD:
			userval = sigevp->sigev_value.sival_ptr;
			port = -1;
			break;
		case SIGEV_PORT:
			pn = sigevp->sigev_value.sival_ptr;
			userval = pn->portnfy_user;
			port = pn->portnfy_port;
			if (fstat64(port, &statb) != 0 ||
			    !S_ISPORT(statb.st_mode)) {
				errno = EBADF;
				goto bad;
			}
			(void) memset(&sigevent, 0, sizeof (sigevent));
			sigevent.sigev_notify = SIGEV_PORT;
			sigevp = &sigevent;
			break;
		}
		/* second pass: build the siginfo / spawner machinery */
		switch (ntype) {
		case SIGEV_NONE:
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_SIGNAL:
			mq_siginfo.si_signo = sigevp->sigev_signo;
			mq_siginfo.si_value = sigevp->sigev_value;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_THREAD:
			if ((tcdp = mqdp->mqd_tcd) != NULL &&
			    cancel_if_necessary(tcdp, sigevp))
				mqdp->mqd_tcd = NULL;
			/* FALLTHROUGH */
		case SIGEV_PORT:
			if ((tcdp = mqdp->mqd_tcd) == NULL) {
				/* we must create a spawner thread */
				tcdp = setup_sigev_handler(sigevp, MQ);
				if (tcdp == NULL) {
					errno = EBADF;
					goto bad;
				}
				tcdp->tcd_msg_enabled = 0;
				tcdp->tcd_msg_closing = 0;
				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
				if (launch_spawner(tcdp) != 0) {
					free_sigev_handler(tcdp);
					goto bad;
				}
				mqdp->mqd_tcd = tcdp;
			}
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		default:
			errno = EINVAL;
			goto bad;
		}

		/* register notification */
		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
			goto bad;
		mqhp->mq_ntype = ntype;
		mqhp->mq_des = (uintptr_t)mqdp;
		switch (ntype) {
		case SIGEV_THREAD:
		case SIGEV_PORT:
			/* arm the spawner thread for this registration */
			tcdp->tcd_port = port;
			tcdp->tcd_msg_object = mqdp;
			tcdp->tcd_msg_userval = userval;
			sig_mutex_lock(&tcdp->tcd_lock);
			tcdp->tcd_msg_enabled = ntype;
			sig_mutex_unlock(&tcdp->tcd_lock);
			(void) cond_broadcast(&tcdp->tcd_cv);
			break;
		}
	}

	rval = 0;	/* success */
bad:
	if (error == 0) {
		(void) mutex_unlock(&mqhp->mq_exclusive);
	} else {
		/* the robust lock was unrecoverable; report EBADMSG */
		errno = EBADMSG;
		rval = -1;
	}
	return (rval);
}
1076
1077 int
mq_setattr(mqd_t mqdes,const struct mq_attr * mqstat,struct mq_attr * omqstat)1078 mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
1079 {
1080 mqdes_t *mqdp = (mqdes_t *)mqdes;
1081 mqhdr_t *mqhp;
1082 uint_t flag = 0;
1083
1084 if (!mq_is_valid(mqdp)) {
1085 errno = EBADF;
1086 return (-1);
1087 }
1088
1089 /* store current attributes */
1090 if (omqstat != NULL) {
1091 int count;
1092
1093 mqhp = mqdp->mqd_mq;
1094 omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1095 omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1096 omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1097 (void) sem_getvalue(&mqhp->mq_notempty, &count);
1098 omqstat->mq_curmsgs = count;
1099 }
1100
1101 /* set description attributes */
1102 if ((mqstat->mq_flags & O_NONBLOCK) != 0)
1103 flag = FNONBLOCK;
1104 mqdp->mqd_mqdn->mqdn_flags = flag;
1105
1106 return (0);
1107 }
1108
1109 int
mq_getattr(mqd_t mqdes,struct mq_attr * mqstat)1110 mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
1111 {
1112 mqdes_t *mqdp = (mqdes_t *)mqdes;
1113 mqhdr_t *mqhp;
1114 int count;
1115
1116 if (!mq_is_valid(mqdp)) {
1117 errno = EBADF;
1118 return (-1);
1119 }
1120
1121 mqhp = mqdp->mqd_mq;
1122
1123 mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1124 mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1125 mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1126 (void) sem_getvalue(&mqhp->mq_notempty, &count);
1127 mqstat->mq_curmsgs = count;
1128 return (0);
1129 }
1130
1131 /*
1132 * Cleanup after fork1() in the child process.
1133 */
1134 void
postfork1_child_sigev_mq(void)1135 postfork1_child_sigev_mq(void)
1136 {
1137 thread_communication_data_t *tcdp;
1138 mqdes_t *mqdp;
1139
1140 for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
1141 if ((tcdp = mqdp->mqd_tcd) != NULL) {
1142 mqdp->mqd_tcd = NULL;
1143 tcd_teardown(tcdp);
1144 }
1145 }
1146 }
1147