1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include "lint.h" 30 #include "mtlib.h" 31 #define _KMEMUSER 32 #include <sys/param.h> /* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */ 33 #undef _KMEMUSER 34 #include <mqueue.h> 35 #include <sys/types.h> 36 #include <sys/file.h> 37 #include <sys/mman.h> 38 #include <errno.h> 39 #include <stdarg.h> 40 #include <limits.h> 41 #include <pthread.h> 42 #include <assert.h> 43 #include <string.h> 44 #include <unistd.h> 45 #include <stdlib.h> 46 #include <sys/stat.h> 47 #include <inttypes.h> 48 #include "sigev_thread.h" 49 #include "pos4obj.h" 50 51 /* 52 * Default values per message queue 53 */ 54 #define MQ_MAXMSG 128 55 #define MQ_MAXSIZE 1024 56 57 #define MQ_MAGIC 0x4d534751 /* "MSGQ" */ 58 59 /* 60 * Message header which is part of messages in link list 61 */ 62 typedef struct { 63 uint64_t msg_next; /* offset of next message in the link */ 64 uint64_t msg_len; /* length of the message */ 65 } msghdr_t; 66 67 /* 68 * message queue description 69 */ 70 struct mq_dn { 71 size_t mqdn_flags; /* open description flags */ 72 }; 73 74 /* 75 * message queue descriptor structure 76 */ 77 typedef struct mq_des { 78 struct mq_des *mqd_next; /* list of all open mq descriptors, */ 79 struct mq_des *mqd_prev; /* needed for fork-safety */ 80 int mqd_magic; /* magic # to identify mq_des */ 81 int mqd_flags; /* operation flag per open */ 82 struct mq_header *mqd_mq; /* address pointer of message Q */ 83 struct mq_dn *mqd_mqdn; /* open description */ 84 thread_communication_data_t *mqd_tcd; /* SIGEV_THREAD notification */ 85 int mqd_ownerdead; /* mq_exclusive is inconsistent */ 86 } mqdes_t; 87 88 /* 89 * message queue common header, part of the mmap()ed file. 90 * Since message queues may be shared between 32- and 64-bit processes, 91 * care must be taken to make sure that the elements of this structure 92 * are identical for both _LP64 and _ILP32 cases. 93 */ 94 typedef struct mq_header { 95 /* first field must be mq_totsize, DO NOT insert before this */ 96 int64_t mq_totsize; /* total size of the Queue */ 97 int64_t mq_maxsz; /* max size of each message */ 98 uint32_t mq_maxmsg; /* max messages in the queue */ 99 uint32_t mq_maxprio; /* maximum mqueue priority */ 100 uint32_t mq_curmaxprio; /* current maximum MQ priority */ 101 uint32_t mq_mask; /* priority bitmask */ 102 uint64_t mq_freep; /* free message's head pointer */ 103 uint64_t mq_headpp; /* pointer to head pointers */ 104 uint64_t mq_tailpp; /* pointer to tail pointers */ 105 signotify_id_t mq_sigid; /* notification id (3 int's) */ 106 uint32_t mq_ntype; /* notification type (SIGEV_*) */ 107 uint64_t mq_des; /* pointer to msg Q descriptor */ 108 mutex_t mq_exclusive; /* acquire for exclusive access */ 109 sem_t mq_rblocked; /* number of processes rblocked */ 110 sem_t mq_notfull; /* mq_send()'s block on this */ 111 sem_t mq_notempty; /* mq_receive()'s block on this */ 112 sem_t mq_spawner; /* spawner thread blocks on this */ 113 } mqhdr_t; 114 115 /* 116 * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit". 117 * If this assumption is somehow invalidated, mq_open() needs to be changed 118 * back to the old version which kept a count and enforced a limit. 119 * We make sure that this is pointed out to those changing <sys/param.h> 120 * by checking _MQ_OPEN_MAX at compile time. 121 */ 122 #if _MQ_OPEN_MAX != -1 123 #error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing." 124 #endif 125 126 #define MQ_ALIGNSIZE 8 /* 64-bit alignment */ 127 128 #ifdef DEBUG 129 #define MQ_ASSERT(x) assert(x); 130 131 #define MQ_ASSERT_PTR(_m, _p) \ 132 assert((_p) != NULL && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \ 133 !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \ 134 _m->mq_totsize)); 135 136 #define MQ_ASSERT_SEMVAL_LEQ(sem, val) { \ 137 int _val; \ 138 (void) sem_getvalue((sem), &_val); \ 139 assert((_val) <= val); } 140 #else 141 #define MQ_ASSERT(x) 142 #define MQ_ASSERT_PTR(_m, _p) 143 #define MQ_ASSERT_SEMVAL_LEQ(sem, val) 144 #endif 145 146 #define MQ_PTR(m, n) ((msghdr_t *)((uintptr_t)m + (uintptr_t)n)) 147 #define HEAD_PTR(m, n) ((uint64_t *)((uintptr_t)m + \ 148 (uintptr_t)m->mq_headpp + n * sizeof (uint64_t))) 149 #define TAIL_PTR(m, n) ((uint64_t *)((uintptr_t)m + \ 150 (uintptr_t)m->mq_tailpp + n * sizeof (uint64_t))) 151 152 #define MQ_RESERVED ((mqdes_t *)-1) 153 154 #define ABS_TIME 0 155 #define REL_TIME 1 156 157 static mutex_t mq_list_lock = DEFAULTMUTEX; 158 static mqdes_t *mq_list = NULL; 159 160 extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id); 161 162 static int 163 mq_is_valid(mqdes_t *mqdp) 164 { 165 /* 166 * Any use of a message queue after it was closed is 167 * undefined. But the standard strongly favours EBADF 168 * returns. Before we dereference which could be fatal, 169 * we first do some pointer sanity checks. 170 */ 171 if (mqdp != NULL && mqdp != MQ_RESERVED && 172 ((uintptr_t)mqdp & 0x7) == 0) { 173 return (mqdp->mqd_magic == MQ_MAGIC); 174 } 175 176 return (0); 177 } 178 179 static void 180 mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg) 181 { 182 int i; 183 uint64_t temp; 184 uint64_t currentp; 185 uint64_t nextp; 186 187 /* 188 * We only need to initialize the non-zero fields. The use of 189 * ftruncate() on the message queue file assures that the 190 * pages will be zero-filled. 191 */ 192 (void) mutex_init(&mqhp->mq_exclusive, 193 USYNC_PROCESS | LOCK_ROBUST, NULL); 194 (void) sem_init(&mqhp->mq_rblocked, 1, 0); 195 (void) sem_init(&mqhp->mq_notempty, 1, 0); 196 (void) sem_init(&mqhp->mq_spawner, 1, 0); 197 (void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg); 198 199 mqhp->mq_maxsz = msgsize; 200 mqhp->mq_maxmsg = maxmsg; 201 202 /* 203 * As of this writing (1997), there are 32 message queue priorities. 204 * If this is to change, then the size of the mq_mask will 205 * also have to change. If DEBUG is defined, assert that 206 * _MQ_PRIO_MAX hasn't changed. 207 */ 208 mqhp->mq_maxprio = _MQ_PRIO_MAX; 209 #if defined(DEBUG) 210 /* LINTED always true */ 211 MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX); 212 #endif 213 214 /* 215 * Since the message queue can be mapped into different 216 * virtual address ranges by different processes, we don't 217 * keep track of pointers, only offsets into the shared region. 218 */ 219 mqhp->mq_headpp = sizeof (mqhdr_t); 220 mqhp->mq_tailpp = mqhp->mq_headpp + 221 mqhp->mq_maxprio * sizeof (uint64_t); 222 mqhp->mq_freep = mqhp->mq_tailpp + 223 mqhp->mq_maxprio * sizeof (uint64_t); 224 225 currentp = mqhp->mq_freep; 226 MQ_PTR(mqhp, currentp)->msg_next = 0; 227 228 temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1); 229 for (i = 1; i < mqhp->mq_maxmsg; i++) { 230 nextp = currentp + sizeof (msghdr_t) + temp; 231 MQ_PTR(mqhp, currentp)->msg_next = nextp; 232 MQ_PTR(mqhp, nextp)->msg_next = 0; 233 currentp = nextp; 234 } 235 } 236 237 static size_t 238 mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio) 239 { 240 uint64_t currentp; 241 msghdr_t *curbuf; 242 uint64_t *headpp; 243 uint64_t *tailpp; 244 245 MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive)); 246 247 /* 248 * Get the head and tail pointers for the queue of maximum 249 * priority. We shouldn't be here unless there is a message for 250 * us, so it's fair to assert that both the head and tail 251 * pointers are non-NULL. 252 */ 253 headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio); 254 tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio); 255 256 if (msg_prio != NULL) 257 *msg_prio = mqhp->mq_curmaxprio; 258 259 currentp = *headpp; 260 MQ_ASSERT_PTR(mqhp, currentp); 261 curbuf = MQ_PTR(mqhp, currentp); 262 263 if ((*headpp = curbuf->msg_next) == NULL) { 264 /* 265 * We just nuked the last message in this priority's queue. 266 * Twiddle this priority's bit, and then find the next bit 267 * tipped. 268 */ 269 uint_t prio = mqhp->mq_curmaxprio; 270 271 mqhp->mq_mask &= ~(1u << prio); 272 273 for (; prio != 0; prio--) 274 if (mqhp->mq_mask & (1u << prio)) 275 break; 276 mqhp->mq_curmaxprio = prio; 277 278 *tailpp = NULL; 279 } 280 281 /* 282 * Copy the message, and put the buffer back on the free list. 283 */ 284 (void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len); 285 curbuf->msg_next = mqhp->mq_freep; 286 mqhp->mq_freep = currentp; 287 288 return (curbuf->msg_len); 289 } 290 291 292 static void 293 mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio) 294 { 295 uint64_t currentp; 296 msghdr_t *curbuf; 297 uint64_t *headpp; 298 uint64_t *tailpp; 299 300 MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive)); 301 302 /* 303 * Grab a free message block, and link it in. We shouldn't 304 * be here unless there is room in the queue for us; it's 305 * fair to assert that the free pointer is non-NULL. 306 */ 307 currentp = mqhp->mq_freep; 308 MQ_ASSERT_PTR(mqhp, currentp); 309 curbuf = MQ_PTR(mqhp, currentp); 310 311 /* 312 * Remove a message from the free list, and copy in the new contents. 313 */ 314 mqhp->mq_freep = curbuf->msg_next; 315 curbuf->msg_next = NULL; 316 (void) memcpy((char *)&curbuf[1], msgp, len); 317 curbuf->msg_len = len; 318 319 headpp = HEAD_PTR(mqhp, prio); 320 tailpp = TAIL_PTR(mqhp, prio); 321 322 if (*tailpp == 0) { 323 /* 324 * This is the first message on this queue. Set the 325 * head and tail pointers, and tip the appropriate bit 326 * in the priority mask. 327 */ 328 *headpp = currentp; 329 *tailpp = currentp; 330 mqhp->mq_mask |= (1u << prio); 331 if (prio > mqhp->mq_curmaxprio) 332 mqhp->mq_curmaxprio = prio; 333 } else { 334 MQ_ASSERT_PTR(mqhp, *tailpp); 335 MQ_PTR(mqhp, *tailpp)->msg_next = currentp; 336 *tailpp = currentp; 337 } 338 } 339 340 /* 341 * Send a notification and also delete the registration. 342 */ 343 static void 344 do_notify(mqhdr_t *mqhp) 345 { 346 (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid); 347 if (mqhp->mq_ntype == SIGEV_THREAD || 348 mqhp->mq_ntype == SIGEV_PORT) 349 (void) sem_post(&mqhp->mq_spawner); 350 mqhp->mq_ntype = 0; 351 mqhp->mq_des = 0; 352 } 353 354 /* 355 * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE. 356 * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that 357 * they fail with errno == EBADMSG. Trigger any registered notification. 358 */ 359 static void 360 owner_dead(mqdes_t *mqdp, int error) 361 { 362 mqhdr_t *mqhp = mqdp->mqd_mq; 363 364 mqdp->mqd_ownerdead = 1; 365 (void) sem_post(&mqhp->mq_notfull); 366 (void) sem_post(&mqhp->mq_notempty); 367 if (error == EOWNERDEAD) { 368 if (mqhp->mq_sigid.sn_pid != 0) 369 do_notify(mqhp); 370 (void) mutex_unlock(&mqhp->mq_exclusive); 371 } 372 errno = EBADMSG; 373 } 374 375 mqd_t 376 mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...) 377 { 378 va_list ap; 379 mode_t mode = 0; 380 struct mq_attr *attr = NULL; 381 int fd; 382 int err; 383 int cr_flag = 0; 384 int locked = 0; 385 uint64_t total_size; 386 size_t msgsize; 387 ssize_t maxmsg; 388 uint64_t temp; 389 void *ptr; 390 mqdes_t *mqdp; 391 mqhdr_t *mqhp; 392 struct mq_dn *mqdnp; 393 394 if (__pos4obj_check(path) == -1) 395 return ((mqd_t)-1); 396 397 /* acquire MSGQ lock to have atomic operation */ 398 if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) 399 goto out; 400 locked = 1; 401 402 va_start(ap, oflag); 403 /* filter oflag to have READ/WRITE/CREATE modes only */ 404 oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK); 405 if ((oflag & O_CREAT) != 0) { 406 mode = va_arg(ap, mode_t); 407 attr = va_arg(ap, struct mq_attr *); 408 } 409 va_end(ap); 410 411 if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag, 412 mode, &cr_flag)) < 0) 413 goto out; 414 415 /* closing permission file */ 416 (void) __close_nc(fd); 417 418 /* Try to open/create data file */ 419 if (cr_flag) { 420 cr_flag = PFILE_CREATE; 421 if (attr == NULL) { 422 maxmsg = MQ_MAXMSG; 423 msgsize = MQ_MAXSIZE; 424 } else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) { 425 errno = EINVAL; 426 goto out; 427 } else if (attr->mq_maxmsg > _SEM_VALUE_MAX) { 428 errno = ENOSPC; 429 goto out; 430 } else { 431 maxmsg = attr->mq_maxmsg; 432 msgsize = attr->mq_msgsize; 433 } 434 435 /* adjust for message size at word boundary */ 436 temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1); 437 438 total_size = sizeof (mqhdr_t) + 439 maxmsg * (temp + sizeof (msghdr_t)) + 440 2 * _MQ_PRIO_MAX * sizeof (uint64_t); 441 442 if (total_size > SSIZE_MAX) { 443 errno = ENOSPC; 444 goto out; 445 } 446 447 /* 448 * data file is opened with read/write to those 449 * who have read or write permission 450 */ 451 mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1; 452 if ((fd = __pos4obj_open(path, MQ_DATA_TYPE, 453 (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0) 454 goto out; 455 456 cr_flag |= DFILE_CREATE | DFILE_OPEN; 457 458 /* force permissions to avoid umask effect */ 459 if (fchmod(fd, mode) < 0) 460 goto out; 461 462 if (ftruncate64(fd, (off64_t)total_size) < 0) 463 goto out; 464 } else { 465 if ((fd = __pos4obj_open(path, MQ_DATA_TYPE, 466 O_RDWR, 0666, &err)) < 0) 467 goto out; 468 cr_flag = DFILE_OPEN; 469 470 /* Message queue has not been initialized yet */ 471 if (read(fd, &total_size, sizeof (total_size)) != 472 sizeof (total_size) || total_size == 0) { 473 errno = ENOENT; 474 goto out; 475 } 476 477 /* Message queue too big for this process to handle */ 478 if (total_size > SSIZE_MAX) { 479 errno = EFBIG; 480 goto out; 481 } 482 } 483 484 if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) { 485 errno = ENOMEM; 486 goto out; 487 } 488 cr_flag |= ALLOC_MEM; 489 490 if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE, 491 MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED) 492 goto out; 493 mqhp = ptr; 494 cr_flag |= DFILE_MMAP; 495 496 /* closing data file */ 497 (void) __close_nc(fd); 498 cr_flag &= ~DFILE_OPEN; 499 500 /* 501 * create, unlink, size, mmap, and close description file 502 * all for a flag word in anonymous shared memory 503 */ 504 if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT, 505 0666, &err)) < 0) 506 goto out; 507 cr_flag |= DFILE_OPEN; 508 (void) __pos4obj_unlink(path, MQ_DSCN_TYPE); 509 if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0) 510 goto out; 511 512 if ((ptr = mmap64(NULL, sizeof (struct mq_dn), 513 PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED) 514 goto out; 515 mqdnp = ptr; 516 cr_flag |= MQDNP_MMAP; 517 518 (void) __close_nc(fd); 519 cr_flag &= ~DFILE_OPEN; 520 521 /* 522 * we follow the same strategy as filesystem open() routine, 523 * where fcntl.h flags are changed to flags defined in file.h. 524 */ 525 mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE); 526 mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK); 527 528 /* new message queue requires initialization */ 529 if ((cr_flag & DFILE_CREATE) != 0) { 530 /* message queue header has to be initialized */ 531 mq_init(mqhp, msgsize, maxmsg); 532 mqhp->mq_totsize = total_size; 533 } 534 mqdp->mqd_mq = mqhp; 535 mqdp->mqd_mqdn = mqdnp; 536 mqdp->mqd_magic = MQ_MAGIC; 537 mqdp->mqd_tcd = NULL; 538 mqdp->mqd_ownerdead = 0; 539 if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) { 540 lmutex_lock(&mq_list_lock); 541 mqdp->mqd_next = mq_list; 542 mqdp->mqd_prev = NULL; 543 if (mq_list) 544 mq_list->mqd_prev = mqdp; 545 mq_list = mqdp; 546 lmutex_unlock(&mq_list_lock); 547 return ((mqd_t)mqdp); 548 } 549 550 locked = 0; /* fall into the error case */ 551 out: 552 err = errno; 553 if ((cr_flag & DFILE_OPEN) != 0) 554 (void) __close_nc(fd); 555 if ((cr_flag & DFILE_CREATE) != 0) 556 (void) __pos4obj_unlink(path, MQ_DATA_TYPE); 557 if ((cr_flag & PFILE_CREATE) != 0) 558 (void) __pos4obj_unlink(path, MQ_PERM_TYPE); 559 if ((cr_flag & ALLOC_MEM) != 0) 560 free((void *)mqdp); 561 if ((cr_flag & DFILE_MMAP) != 0) 562 (void) munmap((caddr_t)mqhp, (size_t)total_size); 563 if ((cr_flag & MQDNP_MMAP) != 0) 564 (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn)); 565 if (locked) 566 (void) __pos4obj_unlock(path, MQ_LOCK_TYPE); 567 errno = err; 568 return ((mqd_t)-1); 569 } 570 571 static void 572 mq_close_cleanup(mqdes_t *mqdp) 573 { 574 mqhdr_t *mqhp = mqdp->mqd_mq; 575 struct mq_dn *mqdnp = mqdp->mqd_mqdn; 576 577 /* invalidate the descriptor before freeing it */ 578 mqdp->mqd_magic = 0; 579 if (!mqdp->mqd_ownerdead) 580 (void) mutex_unlock(&mqhp->mq_exclusive); 581 582 lmutex_lock(&mq_list_lock); 583 if (mqdp->mqd_next) 584 mqdp->mqd_next->mqd_prev = mqdp->mqd_prev; 585 if (mqdp->mqd_prev) 586 mqdp->mqd_prev->mqd_next = mqdp->mqd_next; 587 if (mq_list == mqdp) 588 mq_list = mqdp->mqd_next; 589 lmutex_unlock(&mq_list_lock); 590 591 free(mqdp); 592 (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn)); 593 (void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize); 594 } 595 596 int 597 mq_close(mqd_t mqdes) 598 { 599 mqdes_t *mqdp = (mqdes_t *)mqdes; 600 mqhdr_t *mqhp; 601 thread_communication_data_t *tcdp; 602 int error; 603 604 if (!mq_is_valid(mqdp)) { 605 errno = EBADF; 606 return (-1); 607 } 608 609 mqhp = mqdp->mqd_mq; 610 if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) { 611 mqdp->mqd_ownerdead = 1; 612 if (error == EOWNERDEAD) 613 (void) mutex_unlock(&mqhp->mq_exclusive); 614 /* carry on regardless, without holding mq_exclusive */ 615 } 616 617 if (mqhp->mq_des == (uintptr_t)mqdp && 618 mqhp->mq_sigid.sn_pid == getpid()) { 619 /* notification is set for this descriptor, remove it */ 620 (void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid); 621 mqhp->mq_ntype = 0; 622 mqhp->mq_des = 0; 623 } 624 625 pthread_cleanup_push(mq_close_cleanup, mqdp); 626 if ((tcdp = mqdp->mqd_tcd) != NULL) { 627 mqdp->mqd_tcd = NULL; 628 del_sigev_mq(tcdp); /* possible cancellation point */ 629 } 630 pthread_cleanup_pop(1); /* finish in the cleanup handler */ 631 632 return (0); 633 } 634 635 int 636 mq_unlink(const char *path) 637 { 638 int err; 639 640 if (__pos4obj_check(path) < 0) 641 return (-1); 642 643 if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) { 644 return (-1); 645 } 646 647 err = __pos4obj_unlink(path, MQ_PERM_TYPE); 648 649 if (err == 0 || (err == -1 && errno == EEXIST)) { 650 errno = 0; 651 err = __pos4obj_unlink(path, MQ_DATA_TYPE); 652 } 653 654 if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0) 655 return (-1); 656 657 return (err); 658 659 } 660 661 static int 662 __mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 663 uint_t msg_prio, const timespec_t *timeout, int abs_rel) 664 { 665 mqdes_t *mqdp = (mqdes_t *)mqdes; 666 mqhdr_t *mqhp; 667 int err; 668 int notify = 0; 669 670 /* 671 * sem_*wait() does cancellation, if called. 672 * pthread_testcancel() ensures that cancellation takes place if 673 * there is a cancellation pending when mq_*send() is called. 674 */ 675 pthread_testcancel(); 676 677 if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) { 678 errno = EBADF; 679 return (-1); 680 } 681 682 mqhp = mqdp->mqd_mq; 683 684 if (msg_prio >= mqhp->mq_maxprio) { 685 errno = EINVAL; 686 return (-1); 687 } 688 if (msg_len > mqhp->mq_maxsz) { 689 errno = EMSGSIZE; 690 return (-1); 691 } 692 693 if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) 694 err = sem_trywait(&mqhp->mq_notfull); 695 else { 696 /* 697 * We might get cancelled here... 698 */ 699 if (timeout == NULL) 700 err = sem_wait(&mqhp->mq_notfull); 701 else if (abs_rel == ABS_TIME) 702 err = sem_timedwait(&mqhp->mq_notfull, timeout); 703 else 704 err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout); 705 } 706 if (err == -1) { 707 /* 708 * errno has been set to EAGAIN / EINTR / ETIMEDOUT 709 * by sem_*wait(), so we can just return. 710 */ 711 return (-1); 712 } 713 714 /* 715 * By the time we're here, we know that we've got the capacity 716 * to add to the queue...now acquire the exclusive lock. 717 */ 718 if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) { 719 owner_dead(mqdp, err); 720 return (-1); 721 } 722 723 /* 724 * Now determine if we want to kick the notification. POSIX 725 * requires that if a process has registered for notification, 726 * we must kick it when the queue makes an empty to non-empty 727 * transition, and there are no blocked receivers. Note that 728 * this mechanism does _not_ guarantee that the kicked process 729 * will be able to receive a message without blocking; 730 * another receiver could intervene in the meantime. Thus, 731 * the notification mechanism is inherently racy; all we can 732 * do is hope to minimize the window as much as possible. 733 * In general, we want to avoid kicking the notification when 734 * there are clearly receivers blocked. We'll determine if 735 * we want to kick the notification before the mq_putmsg(), 736 * but the actual signotify() won't be done until the message 737 * is on the queue. 738 */ 739 if (mqhp->mq_sigid.sn_pid != 0) { 740 int nmessages, nblocked; 741 742 (void) sem_getvalue(&mqhp->mq_notempty, &nmessages); 743 (void) sem_getvalue(&mqhp->mq_rblocked, &nblocked); 744 745 if (nmessages == 0 && nblocked == 0) 746 notify = 1; 747 } 748 749 mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio); 750 (void) sem_post(&mqhp->mq_notempty); 751 752 if (notify) { 753 /* notify and also delete the registration */ 754 do_notify(mqhp); 755 } 756 757 MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg)); 758 (void) mutex_unlock(&mqhp->mq_exclusive); 759 760 return (0); 761 } 762 763 int 764 mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio) 765 { 766 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, 767 NULL, ABS_TIME)); 768 } 769 770 int 771 mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 772 uint_t msg_prio, const timespec_t *abs_timeout) 773 { 774 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, 775 abs_timeout, ABS_TIME)); 776 } 777 778 int 779 mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 780 uint_t msg_prio, const timespec_t *rel_timeout) 781 { 782 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, 783 rel_timeout, REL_TIME)); 784 } 785 786 static void 787 decrement_rblocked(mqhdr_t *mqhp) 788 { 789 int cancel_state; 790 791 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state); 792 while (sem_wait(&mqhp->mq_rblocked) == -1) 793 continue; 794 (void) pthread_setcancelstate(cancel_state, NULL); 795 } 796 797 static ssize_t 798 __mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, 799 uint_t *msg_prio, const timespec_t *timeout, int abs_rel) 800 { 801 mqdes_t *mqdp = (mqdes_t *)mqdes; 802 mqhdr_t *mqhp; 803 ssize_t msg_size; 804 int err; 805 806 /* 807 * sem_*wait() does cancellation, if called. 808 * pthread_testcancel() ensures that cancellation takes place if 809 * there is a cancellation pending when mq_*receive() is called. 810 */ 811 pthread_testcancel(); 812 813 if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) { 814 errno = EBADF; 815 return (ssize_t)(-1); 816 } 817 818 mqhp = mqdp->mqd_mq; 819 820 if (msg_len < mqhp->mq_maxsz) { 821 errno = EMSGSIZE; 822 return (ssize_t)(-1); 823 } 824 825 /* 826 * The semaphoring scheme for mq_[timed]receive is a little hairy 827 * thanks to POSIX.1b's arcane notification mechanism. First, 828 * we try to take the common case and do a sem_trywait(). 829 * If that doesn't work, and O_NONBLOCK hasn't been set, 830 * then note that we're going to sleep by incrementing the rblocked 831 * semaphore. We decrement that semaphore after waking up. 832 */ 833 if (sem_trywait(&mqhp->mq_notempty) == -1) { 834 if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) { 835 /* 836 * errno has been set to EAGAIN or EINTR by 837 * sem_trywait(), so we can just return. 838 */ 839 return (-1); 840 } 841 /* 842 * If we're here, then we're probably going to block... 843 * increment the rblocked semaphore. If we get 844 * cancelled, decrement_rblocked() will decrement it. 845 */ 846 (void) sem_post(&mqhp->mq_rblocked); 847 848 pthread_cleanup_push(decrement_rblocked, mqhp); 849 if (timeout == NULL) 850 err = sem_wait(&mqhp->mq_notempty); 851 else if (abs_rel == ABS_TIME) 852 err = sem_timedwait(&mqhp->mq_notempty, timeout); 853 else 854 err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout); 855 pthread_cleanup_pop(1); 856 857 if (err == -1) { 858 /* 859 * We took a signal or timeout while waiting 860 * on mq_notempty... 861 */ 862 return (-1); 863 } 864 } 865 866 if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) { 867 owner_dead(mqdp, err); 868 return (-1); 869 } 870 msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio); 871 (void) sem_post(&mqhp->mq_notfull); 872 MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg)); 873 (void) mutex_unlock(&mqhp->mq_exclusive); 874 875 return (msg_size); 876 } 877 878 ssize_t 879 mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio) 880 { 881 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, 882 NULL, ABS_TIME)); 883 } 884 885 ssize_t 886 mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, 887 uint_t *msg_prio, const timespec_t *abs_timeout) 888 { 889 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, 890 abs_timeout, ABS_TIME)); 891 } 892 893 ssize_t 894 mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len, 895 uint_t *msg_prio, const timespec_t *rel_timeout) 896 { 897 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, 898 rel_timeout, REL_TIME)); 899 } 900 901 /* 902 * Only used below, in mq_notify(). 903 * We already have a spawner thread. 904 * Verify that the attributes match; cancel it if necessary. 905 */ 906 static int 907 cancel_if_necessary(thread_communication_data_t *tcdp, 908 const struct sigevent *sigevp) 909 { 910 int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp, 911 sigevp->sigev_notify_attributes); 912 913 if (do_cancel) { 914 /* 915 * Attributes don't match, cancel the spawner thread. 916 */ 917 (void) pthread_cancel(tcdp->tcd_server_id); 918 } else { 919 /* 920 * Reuse the existing spawner thread with possibly 921 * changed notification function and value. 922 */ 923 tcdp->tcd_notif.sigev_notify = SIGEV_THREAD; 924 tcdp->tcd_notif.sigev_signo = 0; 925 tcdp->tcd_notif.sigev_value = sigevp->sigev_value; 926 tcdp->tcd_notif.sigev_notify_function = 927 sigevp->sigev_notify_function; 928 } 929 930 return (do_cancel); 931 } 932 933 int 934 mq_notify(mqd_t mqdes, const struct sigevent *sigevp) 935 { 936 mqdes_t *mqdp = (mqdes_t *)mqdes; 937 mqhdr_t *mqhp; 938 thread_communication_data_t *tcdp; 939 siginfo_t mq_siginfo; 940 struct sigevent sigevent; 941 struct stat64 statb; 942 port_notify_t *pn; 943 void *userval; 944 int rval = -1; 945 int ntype; 946 int port; 947 int error; 948 949 if (!mq_is_valid(mqdp)) { 950 errno = EBADF; 951 return (-1); 952 } 953 954 mqhp = mqdp->mqd_mq; 955 956 if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) { 957 mqdp->mqd_ownerdead = 1; 958 sigevp = NULL; 959 if (error == EOWNERDEAD) 960 (void) mutex_unlock(&mqhp->mq_exclusive); 961 /* carry on regardless, without holding mq_exclusive */ 962 } 963 964 if (sigevp == NULL) { /* remove notification */ 965 if (mqhp->mq_des == (uintptr_t)mqdp && 966 mqhp->mq_sigid.sn_pid == getpid()) { 967 /* notification is set for this descriptor, remove it */ 968 (void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid); 969 if ((tcdp = mqdp->mqd_tcd) != NULL) { 970 sig_mutex_lock(&tcdp->tcd_lock); 971 if (tcdp->tcd_msg_enabled) { 972 /* cancel the spawner thread */ 973 tcdp = mqdp->mqd_tcd; 974 mqdp->mqd_tcd = NULL; 975 (void) pthread_cancel( 976 tcdp->tcd_server_id); 977 } 978 sig_mutex_unlock(&tcdp->tcd_lock); 979 } 980 mqhp->mq_ntype = 0; 981 mqhp->mq_des = 0; 982 } else { 983 /* notification is not set for this descriptor */ 984 errno = EBUSY; 985 goto bad; 986 } 987 } else { /* register notification with this process */ 988 switch (ntype = sigevp->sigev_notify) { 989 case SIGEV_THREAD: 990 userval = sigevp->sigev_value.sival_ptr; 991 port = -1; 992 break; 993 case SIGEV_PORT: 994 pn = sigevp->sigev_value.sival_ptr; 995 userval = pn->portnfy_user; 996 port = pn->portnfy_port; 997 if (fstat64(port, &statb) != 0 || 998 !S_ISPORT(statb.st_mode)) { 999 errno = EBADF; 1000 goto bad; 1001 } 1002 (void) memset(&sigevent, 0, sizeof (sigevent)); 1003 sigevent.sigev_notify = SIGEV_PORT; 1004 sigevp = &sigevent; 1005 break; 1006 } 1007 switch (ntype) { 1008 case SIGEV_NONE: 1009 mq_siginfo.si_signo = 0; 1010 mq_siginfo.si_code = SI_MESGQ; 1011 break; 1012 case SIGEV_SIGNAL: 1013 mq_siginfo.si_signo = sigevp->sigev_signo; 1014 mq_siginfo.si_value = sigevp->sigev_value; 1015 mq_siginfo.si_code = SI_MESGQ; 1016 break; 1017 case SIGEV_THREAD: 1018 if ((tcdp = mqdp->mqd_tcd) != NULL && 1019 cancel_if_necessary(tcdp, sigevp)) 1020 mqdp->mqd_tcd = NULL; 1021 /* FALLTHROUGH */ 1022 case SIGEV_PORT: 1023 if ((tcdp = mqdp->mqd_tcd) == NULL) { 1024 /* we must create a spawner thread */ 1025 tcdp = setup_sigev_handler(sigevp, MQ); 1026 if (tcdp == NULL) { 1027 errno = EBADF; 1028 goto bad; 1029 } 1030 tcdp->tcd_msg_enabled = 0; 1031 tcdp->tcd_msg_closing = 0; 1032 tcdp->tcd_msg_avail = &mqhp->mq_spawner; 1033 if (launch_spawner(tcdp) != 0) { 1034 free_sigev_handler(tcdp); 1035 goto bad; 1036 } 1037 mqdp->mqd_tcd = tcdp; 1038 } 1039 mq_siginfo.si_signo = 0; 1040 mq_siginfo.si_code = SI_MESGQ; 1041 break; 1042 default: 1043 errno = EINVAL; 1044 goto bad; 1045 } 1046 1047 /* register notification */ 1048 if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0) 1049 goto bad; 1050 mqhp->mq_ntype = ntype; 1051 mqhp->mq_des = (uintptr_t)mqdp; 1052 switch (ntype) { 1053 case SIGEV_THREAD: 1054 case SIGEV_PORT: 1055 tcdp->tcd_port = port; 1056 tcdp->tcd_msg_object = mqdp; 1057 tcdp->tcd_msg_userval = userval; 1058 sig_mutex_lock(&tcdp->tcd_lock); 1059 tcdp->tcd_msg_enabled = ntype; 1060 sig_mutex_unlock(&tcdp->tcd_lock); 1061 (void) cond_broadcast(&tcdp->tcd_cv); 1062 break; 1063 } 1064 } 1065 1066 rval = 0; /* success */ 1067 bad: 1068 if (error == 0) { 1069 (void) mutex_unlock(&mqhp->mq_exclusive); 1070 } else { 1071 errno = EBADMSG; 1072 rval = -1; 1073 } 1074 return (rval); 1075 } 1076 1077 int 1078 mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat) 1079 { 1080 mqdes_t *mqdp = (mqdes_t *)mqdes; 1081 mqhdr_t *mqhp; 1082 uint_t flag = 0; 1083 1084 if (!mq_is_valid(mqdp)) { 1085 errno = EBADF; 1086 return (-1); 1087 } 1088 1089 /* store current attributes */ 1090 if (omqstat != NULL) { 1091 int count; 1092 1093 mqhp = mqdp->mqd_mq; 1094 omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags; 1095 omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg; 1096 omqstat->mq_msgsize = (long)mqhp->mq_maxsz; 1097 (void) sem_getvalue(&mqhp->mq_notempty, &count); 1098 omqstat->mq_curmsgs = count; 1099 } 1100 1101 /* set description attributes */ 1102 if ((mqstat->mq_flags & O_NONBLOCK) != 0) 1103 flag = FNONBLOCK; 1104 mqdp->mqd_mqdn->mqdn_flags = flag; 1105 1106 return (0); 1107 } 1108 1109 int 1110 mq_getattr(mqd_t mqdes, struct mq_attr *mqstat) 1111 { 1112 mqdes_t *mqdp = (mqdes_t *)mqdes; 1113 mqhdr_t *mqhp; 1114 int count; 1115 1116 if (!mq_is_valid(mqdp)) { 1117 errno = EBADF; 1118 return (-1); 1119 } 1120 1121 mqhp = mqdp->mqd_mq; 1122 1123 mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags; 1124 mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg; 1125 mqstat->mq_msgsize = (long)mqhp->mq_maxsz; 1126 (void) sem_getvalue(&mqhp->mq_notempty, &count); 1127 mqstat->mq_curmsgs = count; 1128 return (0); 1129 } 1130 1131 /* 1132 * Cleanup after fork1() in the child process. 1133 */ 1134 void 1135 postfork1_child_sigev_mq(void) 1136 { 1137 thread_communication_data_t *tcdp; 1138 mqdes_t *mqdp; 1139 1140 for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) { 1141 if ((tcdp = mqdp->mqd_tcd) != NULL) { 1142 mqdp->mqd_tcd = NULL; 1143 tcd_teardown(tcdp); 1144 } 1145 } 1146 } 1147