1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #include "lint.h" 28 #include "mtlib.h" 29 #define _KMEMUSER 30 #include <sys/param.h> /* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */ 31 #undef _KMEMUSER 32 #include <mqueue.h> 33 #include <sys/types.h> 34 #include <sys/file.h> 35 #include <sys/mman.h> 36 #include <errno.h> 37 #include <stdarg.h> 38 #include <limits.h> 39 #include <pthread.h> 40 #include <assert.h> 41 #include <string.h> 42 #include <unistd.h> 43 #include <stdlib.h> 44 #include <sys/stat.h> 45 #include <inttypes.h> 46 #include "sigev_thread.h" 47 #include "pos4obj.h" 48 49 /* 50 * Default values per message queue 51 */ 52 #define MQ_MAXMSG 128 53 #define MQ_MAXSIZE 1024 54 55 #define MQ_MAGIC 0x4d534751 /* "MSGQ" */ 56 57 /* 58 * Message header which is part of messages in link list 59 */ 60 typedef struct { 61 uint64_t msg_next; /* offset of next message in the link */ 62 uint64_t msg_len; /* length of the message */ 63 } msghdr_t; 64 65 /* 66 * message queue description 67 */ 68 struct mq_dn { 69 size_t mqdn_flags; /* open description flags */ 70 }; 71 72 /* 73 * message queue descriptor structure 74 */ 75 typedef struct mq_des { 76 struct mq_des *mqd_next; /* list of all open mq descriptors, */ 77 struct mq_des *mqd_prev; /* needed for fork-safety */ 78 int mqd_magic; /* magic # to identify mq_des */ 79 int mqd_flags; /* operation flag per open */ 80 struct mq_header *mqd_mq; /* address pointer of message Q */ 81 struct mq_dn *mqd_mqdn; /* open description */ 82 thread_communication_data_t *mqd_tcd; /* SIGEV_THREAD notification */ 83 int mqd_ownerdead; /* mq_exclusive is inconsistent */ 84 } mqdes_t; 85 86 /* 87 * message queue common header, part of the mmap()ed file. 88 * Since message queues may be shared between 32- and 64-bit processes, 89 * care must be taken to make sure that the elements of this structure 90 * are identical for both _LP64 and _ILP32 cases. 91 */ 92 typedef struct mq_header { 93 /* first field must be mq_totsize, DO NOT insert before this */ 94 int64_t mq_totsize; /* total size of the Queue */ 95 int64_t mq_maxsz; /* max size of each message */ 96 uint32_t mq_maxmsg; /* max messages in the queue */ 97 uint32_t mq_maxprio; /* maximum mqueue priority */ 98 uint32_t mq_curmaxprio; /* current maximum MQ priority */ 99 uint32_t mq_mask; /* priority bitmask */ 100 uint64_t mq_freep; /* free message's head pointer */ 101 uint64_t mq_headpp; /* pointer to head pointers */ 102 uint64_t mq_tailpp; /* pointer to tail pointers */ 103 signotify_id_t mq_sigid; /* notification id (3 int's) */ 104 uint32_t mq_ntype; /* notification type (SIGEV_*) */ 105 uint64_t mq_des; /* pointer to msg Q descriptor */ 106 mutex_t mq_exclusive; /* acquire for exclusive access */ 107 sem_t mq_rblocked; /* number of processes rblocked */ 108 sem_t mq_notfull; /* mq_send()'s block on this */ 109 sem_t mq_notempty; /* mq_receive()'s block on this */ 110 sem_t mq_spawner; /* spawner thread blocks on this */ 111 } mqhdr_t; 112 113 /* 114 * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit". 115 * If this assumption is somehow invalidated, mq_open() needs to be changed 116 * back to the old version which kept a count and enforced a limit. 117 * We make sure that this is pointed out to those changing <sys/param.h> 118 * by checking _MQ_OPEN_MAX at compile time. 119 */ 120 #if _MQ_OPEN_MAX != -1 121 #error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing." 122 #endif 123 124 #define MQ_ALIGNSIZE 8 /* 64-bit alignment */ 125 126 #ifdef DEBUG 127 #define MQ_ASSERT(x) assert(x); 128 129 #define MQ_ASSERT_PTR(_m, _p) \ 130 assert((_p) != NULL && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \ 131 !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \ 132 _m->mq_totsize)); 133 134 #define MQ_ASSERT_SEMVAL_LEQ(sem, val) { \ 135 int _val; \ 136 (void) sem_getvalue((sem), &_val); \ 137 assert((_val) <= val); } 138 #else 139 #define MQ_ASSERT(x) 140 #define MQ_ASSERT_PTR(_m, _p) 141 #define MQ_ASSERT_SEMVAL_LEQ(sem, val) 142 #endif 143 144 #define MQ_PTR(m, n) ((msghdr_t *)((uintptr_t)m + (uintptr_t)n)) 145 #define HEAD_PTR(m, n) ((uint64_t *)((uintptr_t)m + \ 146 (uintptr_t)m->mq_headpp + n * sizeof (uint64_t))) 147 #define TAIL_PTR(m, n) ((uint64_t *)((uintptr_t)m + \ 148 (uintptr_t)m->mq_tailpp + n * sizeof (uint64_t))) 149 150 #define MQ_RESERVED ((mqdes_t *)-1) 151 152 #define ABS_TIME 0 153 #define REL_TIME 1 154 155 static mutex_t mq_list_lock = DEFAULTMUTEX; 156 static mqdes_t *mq_list = NULL; 157 158 extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id); 159 160 static int 161 mq_is_valid(mqdes_t *mqdp) 162 { 163 /* 164 * Any use of a message queue after it was closed is 165 * undefined. But the standard strongly favours EBADF 166 * returns. Before we dereference which could be fatal, 167 * we first do some pointer sanity checks. 168 */ 169 if (mqdp != NULL && mqdp != MQ_RESERVED && 170 ((uintptr_t)mqdp & 0x7) == 0) { 171 return (mqdp->mqd_magic == MQ_MAGIC); 172 } 173 174 return (0); 175 } 176 177 static void 178 mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg) 179 { 180 int i; 181 uint64_t temp; 182 uint64_t currentp; 183 uint64_t nextp; 184 185 /* 186 * We only need to initialize the non-zero fields. The use of 187 * ftruncate() on the message queue file assures that the 188 * pages will be zero-filled. 189 */ 190 (void) mutex_init(&mqhp->mq_exclusive, 191 USYNC_PROCESS | LOCK_ROBUST, NULL); 192 (void) sem_init(&mqhp->mq_rblocked, 1, 0); 193 (void) sem_init(&mqhp->mq_notempty, 1, 0); 194 (void) sem_init(&mqhp->mq_spawner, 1, 0); 195 (void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg); 196 197 mqhp->mq_maxsz = msgsize; 198 mqhp->mq_maxmsg = maxmsg; 199 200 /* 201 * As of this writing (1997), there are 32 message queue priorities. 202 * If this is to change, then the size of the mq_mask will 203 * also have to change. If DEBUG is defined, assert that 204 * _MQ_PRIO_MAX hasn't changed. 205 */ 206 mqhp->mq_maxprio = _MQ_PRIO_MAX; 207 #if defined(DEBUG) 208 /* LINTED always true */ 209 MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX); 210 #endif 211 212 /* 213 * Since the message queue can be mapped into different 214 * virtual address ranges by different processes, we don't 215 * keep track of pointers, only offsets into the shared region. 216 */ 217 mqhp->mq_headpp = sizeof (mqhdr_t); 218 mqhp->mq_tailpp = mqhp->mq_headpp + 219 mqhp->mq_maxprio * sizeof (uint64_t); 220 mqhp->mq_freep = mqhp->mq_tailpp + 221 mqhp->mq_maxprio * sizeof (uint64_t); 222 223 currentp = mqhp->mq_freep; 224 MQ_PTR(mqhp, currentp)->msg_next = 0; 225 226 temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1); 227 for (i = 1; i < mqhp->mq_maxmsg; i++) { 228 nextp = currentp + sizeof (msghdr_t) + temp; 229 MQ_PTR(mqhp, currentp)->msg_next = nextp; 230 MQ_PTR(mqhp, nextp)->msg_next = 0; 231 currentp = nextp; 232 } 233 } 234 235 static size_t 236 mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio) 237 { 238 uint64_t currentp; 239 msghdr_t *curbuf; 240 uint64_t *headpp; 241 uint64_t *tailpp; 242 243 MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive)); 244 245 /* 246 * Get the head and tail pointers for the queue of maximum 247 * priority. We shouldn't be here unless there is a message for 248 * us, so it's fair to assert that both the head and tail 249 * pointers are non-NULL. 250 */ 251 headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio); 252 tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio); 253 254 if (msg_prio != NULL) 255 *msg_prio = mqhp->mq_curmaxprio; 256 257 currentp = *headpp; 258 MQ_ASSERT_PTR(mqhp, currentp); 259 curbuf = MQ_PTR(mqhp, currentp); 260 261 if ((*headpp = curbuf->msg_next) == 0) { 262 /* 263 * We just nuked the last message in this priority's queue. 264 * Twiddle this priority's bit, and then find the next bit 265 * tipped. 266 */ 267 uint_t prio = mqhp->mq_curmaxprio; 268 269 mqhp->mq_mask &= ~(1u << prio); 270 271 for (; prio != 0; prio--) 272 if (mqhp->mq_mask & (1u << prio)) 273 break; 274 mqhp->mq_curmaxprio = prio; 275 276 *tailpp = 0; 277 } 278 279 /* 280 * Copy the message, and put the buffer back on the free list. 281 */ 282 (void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len); 283 curbuf->msg_next = mqhp->mq_freep; 284 mqhp->mq_freep = currentp; 285 286 return (curbuf->msg_len); 287 } 288 289 290 static void 291 mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio) 292 { 293 uint64_t currentp; 294 msghdr_t *curbuf; 295 uint64_t *headpp; 296 uint64_t *tailpp; 297 298 MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive)); 299 300 /* 301 * Grab a free message block, and link it in. We shouldn't 302 * be here unless there is room in the queue for us; it's 303 * fair to assert that the free pointer is non-NULL. 304 */ 305 currentp = mqhp->mq_freep; 306 MQ_ASSERT_PTR(mqhp, currentp); 307 curbuf = MQ_PTR(mqhp, currentp); 308 309 /* 310 * Remove a message from the free list, and copy in the new contents. 311 */ 312 mqhp->mq_freep = curbuf->msg_next; 313 curbuf->msg_next = 0; 314 (void) memcpy((char *)&curbuf[1], msgp, len); 315 curbuf->msg_len = len; 316 317 headpp = HEAD_PTR(mqhp, prio); 318 tailpp = TAIL_PTR(mqhp, prio); 319 320 if (*tailpp == 0) { 321 /* 322 * This is the first message on this queue. Set the 323 * head and tail pointers, and tip the appropriate bit 324 * in the priority mask. 325 */ 326 *headpp = currentp; 327 *tailpp = currentp; 328 mqhp->mq_mask |= (1u << prio); 329 if (prio > mqhp->mq_curmaxprio) 330 mqhp->mq_curmaxprio = prio; 331 } else { 332 MQ_ASSERT_PTR(mqhp, *tailpp); 333 MQ_PTR(mqhp, *tailpp)->msg_next = currentp; 334 *tailpp = currentp; 335 } 336 } 337 338 /* 339 * Send a notification and also delete the registration. 340 */ 341 static void 342 do_notify(mqhdr_t *mqhp) 343 { 344 (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid); 345 if (mqhp->mq_ntype == SIGEV_THREAD || 346 mqhp->mq_ntype == SIGEV_PORT) 347 (void) sem_post(&mqhp->mq_spawner); 348 mqhp->mq_ntype = 0; 349 mqhp->mq_des = 0; 350 } 351 352 /* 353 * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE. 354 * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that 355 * they fail with errno == EBADMSG. Trigger any registered notification. 356 */ 357 static void 358 owner_dead(mqdes_t *mqdp, int error) 359 { 360 mqhdr_t *mqhp = mqdp->mqd_mq; 361 362 mqdp->mqd_ownerdead = 1; 363 (void) sem_post(&mqhp->mq_notfull); 364 (void) sem_post(&mqhp->mq_notempty); 365 if (error == EOWNERDEAD) { 366 if (mqhp->mq_sigid.sn_pid != 0) 367 do_notify(mqhp); 368 (void) mutex_unlock(&mqhp->mq_exclusive); 369 } 370 errno = EBADMSG; 371 } 372 373 mqd_t 374 mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...) 375 { 376 va_list ap; 377 mode_t mode = 0; 378 struct mq_attr *attr = NULL; 379 int fd; 380 int err; 381 int cr_flag = 0; 382 int locked = 0; 383 uint64_t total_size; 384 size_t msgsize; 385 ssize_t maxmsg; 386 uint64_t temp; 387 void *ptr; 388 mqdes_t *mqdp; 389 mqhdr_t *mqhp; 390 struct mq_dn *mqdnp; 391 392 if (__pos4obj_check(path) == -1) 393 return ((mqd_t)-1); 394 395 /* acquire MSGQ lock to have atomic operation */ 396 if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) 397 goto out; 398 locked = 1; 399 400 va_start(ap, oflag); 401 /* filter oflag to have READ/WRITE/CREATE modes only */ 402 oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK); 403 if ((oflag & O_CREAT) != 0) { 404 mode = va_arg(ap, mode_t); 405 attr = va_arg(ap, struct mq_attr *); 406 } 407 va_end(ap); 408 409 if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag, 410 mode, &cr_flag)) < 0) 411 goto out; 412 413 /* closing permission file */ 414 (void) __close_nc(fd); 415 416 /* Try to open/create data file */ 417 if (cr_flag) { 418 cr_flag = PFILE_CREATE; 419 if (attr == NULL) { 420 maxmsg = MQ_MAXMSG; 421 msgsize = MQ_MAXSIZE; 422 } else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) { 423 errno = EINVAL; 424 goto out; 425 } else if (attr->mq_maxmsg > _SEM_VALUE_MAX) { 426 errno = ENOSPC; 427 goto out; 428 } else { 429 maxmsg = attr->mq_maxmsg; 430 msgsize = attr->mq_msgsize; 431 } 432 433 /* adjust for message size at word boundary */ 434 temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1); 435 436 total_size = sizeof (mqhdr_t) + 437 maxmsg * (temp + sizeof (msghdr_t)) + 438 2 * _MQ_PRIO_MAX * sizeof (uint64_t); 439 440 if (total_size > SSIZE_MAX) { 441 errno = ENOSPC; 442 goto out; 443 } 444 445 /* 446 * data file is opened with read/write to those 447 * who have read or write permission 448 */ 449 mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1; 450 if ((fd = __pos4obj_open(path, MQ_DATA_TYPE, 451 (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0) 452 goto out; 453 454 cr_flag |= DFILE_CREATE | DFILE_OPEN; 455 456 /* force permissions to avoid umask effect */ 457 if (fchmod(fd, mode) < 0) 458 goto out; 459 460 if (ftruncate64(fd, (off64_t)total_size) < 0) 461 goto out; 462 } else { 463 if ((fd = __pos4obj_open(path, MQ_DATA_TYPE, 464 O_RDWR, 0666, &err)) < 0) 465 goto out; 466 cr_flag = DFILE_OPEN; 467 468 /* Message queue has not been initialized yet */ 469 if (read(fd, &total_size, sizeof (total_size)) != 470 sizeof (total_size) || total_size == 0) { 471 errno = ENOENT; 472 goto out; 473 } 474 475 /* Message queue too big for this process to handle */ 476 if (total_size > SSIZE_MAX) { 477 errno = EFBIG; 478 goto out; 479 } 480 } 481 482 if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) { 483 errno = ENOMEM; 484 goto out; 485 } 486 cr_flag |= ALLOC_MEM; 487 488 if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE, 489 MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED) 490 goto out; 491 mqhp = ptr; 492 cr_flag |= DFILE_MMAP; 493 494 /* closing data file */ 495 (void) __close_nc(fd); 496 cr_flag &= ~DFILE_OPEN; 497 498 /* 499 * create, unlink, size, mmap, and close description file 500 * all for a flag word in anonymous shared memory 501 */ 502 if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT, 503 0666, &err)) < 0) 504 goto out; 505 cr_flag |= DFILE_OPEN; 506 (void) __pos4obj_unlink(path, MQ_DSCN_TYPE); 507 if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0) 508 goto out; 509 510 if ((ptr = mmap64(NULL, sizeof (struct mq_dn), 511 PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED) 512 goto out; 513 mqdnp = ptr; 514 cr_flag |= MQDNP_MMAP; 515 516 (void) __close_nc(fd); 517 cr_flag &= ~DFILE_OPEN; 518 519 /* 520 * we follow the same strategy as filesystem open() routine, 521 * where fcntl.h flags are changed to flags defined in file.h. 522 */ 523 mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE); 524 mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK); 525 526 /* new message queue requires initialization */ 527 if ((cr_flag & DFILE_CREATE) != 0) { 528 /* message queue header has to be initialized */ 529 mq_init(mqhp, msgsize, maxmsg); 530 mqhp->mq_totsize = total_size; 531 } 532 mqdp->mqd_mq = mqhp; 533 mqdp->mqd_mqdn = mqdnp; 534 mqdp->mqd_magic = MQ_MAGIC; 535 mqdp->mqd_tcd = NULL; 536 mqdp->mqd_ownerdead = 0; 537 if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) { 538 lmutex_lock(&mq_list_lock); 539 mqdp->mqd_next = mq_list; 540 mqdp->mqd_prev = NULL; 541 if (mq_list) 542 mq_list->mqd_prev = mqdp; 543 mq_list = mqdp; 544 lmutex_unlock(&mq_list_lock); 545 return ((mqd_t)mqdp); 546 } 547 548 locked = 0; /* fall into the error case */ 549 out: 550 err = errno; 551 if ((cr_flag & DFILE_OPEN) != 0) 552 (void) __close_nc(fd); 553 if ((cr_flag & DFILE_CREATE) != 0) 554 (void) __pos4obj_unlink(path, MQ_DATA_TYPE); 555 if ((cr_flag & PFILE_CREATE) != 0) 556 (void) __pos4obj_unlink(path, MQ_PERM_TYPE); 557 if ((cr_flag & ALLOC_MEM) != 0) 558 free((void *)mqdp); 559 if ((cr_flag & DFILE_MMAP) != 0) 560 (void) munmap((caddr_t)mqhp, (size_t)total_size); 561 if ((cr_flag & MQDNP_MMAP) != 0) 562 (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn)); 563 if (locked) 564 (void) __pos4obj_unlock(path, MQ_LOCK_TYPE); 565 errno = err; 566 return ((mqd_t)-1); 567 } 568 569 static void 570 mq_close_cleanup(mqdes_t *mqdp) 571 { 572 mqhdr_t *mqhp = mqdp->mqd_mq; 573 struct mq_dn *mqdnp = mqdp->mqd_mqdn; 574 575 /* invalidate the descriptor before freeing it */ 576 mqdp->mqd_magic = 0; 577 if (!mqdp->mqd_ownerdead) 578 (void) mutex_unlock(&mqhp->mq_exclusive); 579 580 lmutex_lock(&mq_list_lock); 581 if (mqdp->mqd_next) 582 mqdp->mqd_next->mqd_prev = mqdp->mqd_prev; 583 if (mqdp->mqd_prev) 584 mqdp->mqd_prev->mqd_next = mqdp->mqd_next; 585 if (mq_list == mqdp) 586 mq_list = mqdp->mqd_next; 587 lmutex_unlock(&mq_list_lock); 588 589 free(mqdp); 590 (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn)); 591 (void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize); 592 } 593 594 int 595 mq_close(mqd_t mqdes) 596 { 597 mqdes_t *mqdp = (mqdes_t *)mqdes; 598 mqhdr_t *mqhp; 599 thread_communication_data_t *tcdp; 600 int error; 601 602 if (!mq_is_valid(mqdp)) { 603 errno = EBADF; 604 return (-1); 605 } 606 607 mqhp = mqdp->mqd_mq; 608 if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) { 609 mqdp->mqd_ownerdead = 1; 610 if (error == EOWNERDEAD) 611 (void) mutex_unlock(&mqhp->mq_exclusive); 612 /* carry on regardless, without holding mq_exclusive */ 613 } 614 615 if (mqhp->mq_des == (uintptr_t)mqdp && 616 mqhp->mq_sigid.sn_pid == getpid()) { 617 /* notification is set for this descriptor, remove it */ 618 (void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid); 619 mqhp->mq_ntype = 0; 620 mqhp->mq_des = 0; 621 } 622 623 pthread_cleanup_push(mq_close_cleanup, mqdp); 624 if ((tcdp = mqdp->mqd_tcd) != NULL) { 625 mqdp->mqd_tcd = NULL; 626 del_sigev_mq(tcdp); /* possible cancellation point */ 627 } 628 pthread_cleanup_pop(1); /* finish in the cleanup handler */ 629 630 return (0); 631 } 632 633 int 634 mq_unlink(const char *path) 635 { 636 int err; 637 638 if (__pos4obj_check(path) < 0) 639 return (-1); 640 641 if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) { 642 return (-1); 643 } 644 645 err = __pos4obj_unlink(path, MQ_PERM_TYPE); 646 647 if (err == 0 || (err == -1 && errno == EEXIST)) { 648 errno = 0; 649 err = __pos4obj_unlink(path, MQ_DATA_TYPE); 650 } 651 652 if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0) 653 return (-1); 654 655 return (err); 656 657 } 658 659 static int 660 __mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 661 uint_t msg_prio, const timespec_t *timeout, int abs_rel) 662 { 663 mqdes_t *mqdp = (mqdes_t *)mqdes; 664 mqhdr_t *mqhp; 665 int err; 666 int notify = 0; 667 668 /* 669 * sem_*wait() does cancellation, if called. 670 * pthread_testcancel() ensures that cancellation takes place if 671 * there is a cancellation pending when mq_*send() is called. 672 */ 673 pthread_testcancel(); 674 675 if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) { 676 errno = EBADF; 677 return (-1); 678 } 679 680 mqhp = mqdp->mqd_mq; 681 682 if (msg_prio >= mqhp->mq_maxprio) { 683 errno = EINVAL; 684 return (-1); 685 } 686 if (msg_len > mqhp->mq_maxsz) { 687 errno = EMSGSIZE; 688 return (-1); 689 } 690 691 if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) 692 err = sem_trywait(&mqhp->mq_notfull); 693 else { 694 /* 695 * We might get cancelled here... 696 */ 697 if (timeout == NULL) 698 err = sem_wait(&mqhp->mq_notfull); 699 else if (abs_rel == ABS_TIME) 700 err = sem_timedwait(&mqhp->mq_notfull, timeout); 701 else 702 err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout); 703 } 704 if (err == -1) { 705 /* 706 * errno has been set to EAGAIN / EINTR / ETIMEDOUT 707 * by sem_*wait(), so we can just return. 708 */ 709 return (-1); 710 } 711 712 /* 713 * By the time we're here, we know that we've got the capacity 714 * to add to the queue...now acquire the exclusive lock. 715 */ 716 if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) { 717 owner_dead(mqdp, err); 718 return (-1); 719 } 720 721 /* 722 * Now determine if we want to kick the notification. POSIX 723 * requires that if a process has registered for notification, 724 * we must kick it when the queue makes an empty to non-empty 725 * transition, and there are no blocked receivers. Note that 726 * this mechanism does _not_ guarantee that the kicked process 727 * will be able to receive a message without blocking; 728 * another receiver could intervene in the meantime. Thus, 729 * the notification mechanism is inherently racy; all we can 730 * do is hope to minimize the window as much as possible. 731 * In general, we want to avoid kicking the notification when 732 * there are clearly receivers blocked. We'll determine if 733 * we want to kick the notification before the mq_putmsg(), 734 * but the actual signotify() won't be done until the message 735 * is on the queue. 736 */ 737 if (mqhp->mq_sigid.sn_pid != 0) { 738 int nmessages, nblocked; 739 740 (void) sem_getvalue(&mqhp->mq_notempty, &nmessages); 741 (void) sem_getvalue(&mqhp->mq_rblocked, &nblocked); 742 743 if (nmessages == 0 && nblocked == 0) 744 notify = 1; 745 } 746 747 mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio); 748 (void) sem_post(&mqhp->mq_notempty); 749 750 if (notify) { 751 /* notify and also delete the registration */ 752 do_notify(mqhp); 753 } 754 755 MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg)); 756 (void) mutex_unlock(&mqhp->mq_exclusive); 757 758 return (0); 759 } 760 761 int 762 mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio) 763 { 764 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, 765 NULL, ABS_TIME)); 766 } 767 768 int 769 mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 770 uint_t msg_prio, const timespec_t *abs_timeout) 771 { 772 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, 773 abs_timeout, ABS_TIME)); 774 } 775 776 int 777 mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 778 uint_t msg_prio, const timespec_t *rel_timeout) 779 { 780 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, 781 rel_timeout, REL_TIME)); 782 } 783 784 static void 785 decrement_rblocked(mqhdr_t *mqhp) 786 { 787 int cancel_state; 788 789 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state); 790 while (sem_wait(&mqhp->mq_rblocked) == -1) 791 continue; 792 (void) pthread_setcancelstate(cancel_state, NULL); 793 } 794 795 static ssize_t 796 __mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, 797 uint_t *msg_prio, const timespec_t *timeout, int abs_rel) 798 { 799 mqdes_t *mqdp = (mqdes_t *)mqdes; 800 mqhdr_t *mqhp; 801 ssize_t msg_size; 802 int err; 803 804 /* 805 * sem_*wait() does cancellation, if called. 806 * pthread_testcancel() ensures that cancellation takes place if 807 * there is a cancellation pending when mq_*receive() is called. 808 */ 809 pthread_testcancel(); 810 811 if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) { 812 errno = EBADF; 813 return (ssize_t)(-1); 814 } 815 816 mqhp = mqdp->mqd_mq; 817 818 if (msg_len < mqhp->mq_maxsz) { 819 errno = EMSGSIZE; 820 return (ssize_t)(-1); 821 } 822 823 /* 824 * The semaphoring scheme for mq_[timed]receive is a little hairy 825 * thanks to POSIX.1b's arcane notification mechanism. First, 826 * we try to take the common case and do a sem_trywait(). 827 * If that doesn't work, and O_NONBLOCK hasn't been set, 828 * then note that we're going to sleep by incrementing the rblocked 829 * semaphore. We decrement that semaphore after waking up. 830 */ 831 if (sem_trywait(&mqhp->mq_notempty) == -1) { 832 if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) { 833 /* 834 * errno has been set to EAGAIN or EINTR by 835 * sem_trywait(), so we can just return. 836 */ 837 return (-1); 838 } 839 /* 840 * If we're here, then we're probably going to block... 841 * increment the rblocked semaphore. If we get 842 * cancelled, decrement_rblocked() will decrement it. 843 */ 844 (void) sem_post(&mqhp->mq_rblocked); 845 846 pthread_cleanup_push(decrement_rblocked, mqhp); 847 if (timeout == NULL) 848 err = sem_wait(&mqhp->mq_notempty); 849 else if (abs_rel == ABS_TIME) 850 err = sem_timedwait(&mqhp->mq_notempty, timeout); 851 else 852 err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout); 853 pthread_cleanup_pop(1); 854 855 if (err == -1) { 856 /* 857 * We took a signal or timeout while waiting 858 * on mq_notempty... 859 */ 860 return (-1); 861 } 862 } 863 864 if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) { 865 owner_dead(mqdp, err); 866 return (-1); 867 } 868 msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio); 869 (void) sem_post(&mqhp->mq_notfull); 870 MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg)); 871 (void) mutex_unlock(&mqhp->mq_exclusive); 872 873 return (msg_size); 874 } 875 876 ssize_t 877 mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio) 878 { 879 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, 880 NULL, ABS_TIME)); 881 } 882 883 ssize_t 884 mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, 885 uint_t *msg_prio, const timespec_t *abs_timeout) 886 { 887 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, 888 abs_timeout, ABS_TIME)); 889 } 890 891 ssize_t 892 mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len, 893 uint_t *msg_prio, const timespec_t *rel_timeout) 894 { 895 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, 896 rel_timeout, REL_TIME)); 897 } 898 899 /* 900 * Only used below, in mq_notify(). 901 * We already have a spawner thread. 902 * Verify that the attributes match; cancel it if necessary. 903 */ 904 static int 905 cancel_if_necessary(thread_communication_data_t *tcdp, 906 const struct sigevent *sigevp) 907 { 908 int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp, 909 sigevp->sigev_notify_attributes); 910 911 if (do_cancel) { 912 /* 913 * Attributes don't match, cancel the spawner thread. 914 */ 915 (void) pthread_cancel(tcdp->tcd_server_id); 916 } else { 917 /* 918 * Reuse the existing spawner thread with possibly 919 * changed notification function and value. 920 */ 921 tcdp->tcd_notif.sigev_notify = SIGEV_THREAD; 922 tcdp->tcd_notif.sigev_signo = 0; 923 tcdp->tcd_notif.sigev_value = sigevp->sigev_value; 924 tcdp->tcd_notif.sigev_notify_function = 925 sigevp->sigev_notify_function; 926 } 927 928 return (do_cancel); 929 } 930 931 int 932 mq_notify(mqd_t mqdes, const struct sigevent *sigevp) 933 { 934 mqdes_t *mqdp = (mqdes_t *)mqdes; 935 mqhdr_t *mqhp; 936 thread_communication_data_t *tcdp; 937 siginfo_t mq_siginfo; 938 struct sigevent sigevent; 939 struct stat64 statb; 940 port_notify_t *pn; 941 void *userval; 942 int rval = -1; 943 int ntype; 944 int port; 945 int error; 946 947 if (!mq_is_valid(mqdp)) { 948 errno = EBADF; 949 return (-1); 950 } 951 952 mqhp = mqdp->mqd_mq; 953 954 if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) { 955 mqdp->mqd_ownerdead = 1; 956 sigevp = NULL; 957 if (error == EOWNERDEAD) 958 (void) mutex_unlock(&mqhp->mq_exclusive); 959 /* carry on regardless, without holding mq_exclusive */ 960 } 961 962 if (sigevp == NULL) { /* remove notification */ 963 if (mqhp->mq_des == (uintptr_t)mqdp && 964 mqhp->mq_sigid.sn_pid == getpid()) { 965 /* notification is set for this descriptor, remove it */ 966 (void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid); 967 if ((tcdp = mqdp->mqd_tcd) != NULL) { 968 sig_mutex_lock(&tcdp->tcd_lock); 969 if (tcdp->tcd_msg_enabled) { 970 /* cancel the spawner thread */ 971 tcdp = mqdp->mqd_tcd; 972 mqdp->mqd_tcd = NULL; 973 (void) pthread_cancel( 974 tcdp->tcd_server_id); 975 } 976 sig_mutex_unlock(&tcdp->tcd_lock); 977 } 978 mqhp->mq_ntype = 0; 979 mqhp->mq_des = 0; 980 } else { 981 /* notification is not set for this descriptor */ 982 errno = EBUSY; 983 goto bad; 984 } 985 } else { /* register notification with this process */ 986 switch (ntype = sigevp->sigev_notify) { 987 case SIGEV_THREAD: 988 userval = sigevp->sigev_value.sival_ptr; 989 port = -1; 990 break; 991 case SIGEV_PORT: 992 pn = sigevp->sigev_value.sival_ptr; 993 userval = pn->portnfy_user; 994 port = pn->portnfy_port; 995 if (fstat64(port, &statb) != 0 || 996 !S_ISPORT(statb.st_mode)) { 997 errno = EBADF; 998 goto bad; 999 } 1000 (void) memset(&sigevent, 0, sizeof (sigevent)); 1001 sigevent.sigev_notify = SIGEV_PORT; 1002 sigevp = &sigevent; 1003 break; 1004 } 1005 switch (ntype) { 1006 case SIGEV_NONE: 1007 mq_siginfo.si_signo = 0; 1008 mq_siginfo.si_code = SI_MESGQ; 1009 break; 1010 case SIGEV_SIGNAL: 1011 mq_siginfo.si_signo = sigevp->sigev_signo; 1012 mq_siginfo.si_value = sigevp->sigev_value; 1013 mq_siginfo.si_code = SI_MESGQ; 1014 break; 1015 case SIGEV_THREAD: 1016 if ((tcdp = mqdp->mqd_tcd) != NULL && 1017 cancel_if_necessary(tcdp, sigevp)) 1018 mqdp->mqd_tcd = NULL; 1019 /* FALLTHROUGH */ 1020 case SIGEV_PORT: 1021 if ((tcdp = mqdp->mqd_tcd) == NULL) { 1022 /* we must create a spawner thread */ 1023 tcdp = setup_sigev_handler(sigevp, MQ); 1024 if (tcdp == NULL) { 1025 errno = EBADF; 1026 goto bad; 1027 } 1028 tcdp->tcd_msg_enabled = 0; 1029 tcdp->tcd_msg_closing = 0; 1030 tcdp->tcd_msg_avail = &mqhp->mq_spawner; 1031 if (launch_spawner(tcdp) != 0) { 1032 free_sigev_handler(tcdp); 1033 goto bad; 1034 } 1035 mqdp->mqd_tcd = tcdp; 1036 } 1037 mq_siginfo.si_signo = 0; 1038 mq_siginfo.si_code = SI_MESGQ; 1039 break; 1040 default: 1041 errno = EINVAL; 1042 goto bad; 1043 } 1044 1045 /* register notification */ 1046 if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0) 1047 goto bad; 1048 mqhp->mq_ntype = ntype; 1049 mqhp->mq_des = (uintptr_t)mqdp; 1050 switch (ntype) { 1051 case SIGEV_THREAD: 1052 case SIGEV_PORT: 1053 tcdp->tcd_port = port; 1054 tcdp->tcd_msg_object = mqdp; 1055 tcdp->tcd_msg_userval = userval; 1056 sig_mutex_lock(&tcdp->tcd_lock); 1057 tcdp->tcd_msg_enabled = ntype; 1058 sig_mutex_unlock(&tcdp->tcd_lock); 1059 (void) cond_broadcast(&tcdp->tcd_cv); 1060 break; 1061 } 1062 } 1063 1064 rval = 0; /* success */ 1065 bad: 1066 if (error == 0) { 1067 (void) mutex_unlock(&mqhp->mq_exclusive); 1068 } else { 1069 errno = EBADMSG; 1070 rval = -1; 1071 } 1072 return (rval); 1073 } 1074 1075 int 1076 mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat) 1077 { 1078 mqdes_t *mqdp = (mqdes_t *)mqdes; 1079 mqhdr_t *mqhp; 1080 uint_t flag = 0; 1081 1082 if (!mq_is_valid(mqdp)) { 1083 errno = EBADF; 1084 return (-1); 1085 } 1086 1087 /* store current attributes */ 1088 if (omqstat != NULL) { 1089 int count; 1090 1091 mqhp = mqdp->mqd_mq; 1092 omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags; 1093 omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg; 1094 omqstat->mq_msgsize = (long)mqhp->mq_maxsz; 1095 (void) sem_getvalue(&mqhp->mq_notempty, &count); 1096 omqstat->mq_curmsgs = count; 1097 } 1098 1099 /* set description attributes */ 1100 if ((mqstat->mq_flags & O_NONBLOCK) != 0) 1101 flag = FNONBLOCK; 1102 mqdp->mqd_mqdn->mqdn_flags = flag; 1103 1104 return (0); 1105 } 1106 1107 int 1108 mq_getattr(mqd_t mqdes, struct mq_attr *mqstat) 1109 { 1110 mqdes_t *mqdp = (mqdes_t *)mqdes; 1111 mqhdr_t *mqhp; 1112 int count; 1113 1114 if (!mq_is_valid(mqdp)) { 1115 errno = EBADF; 1116 return (-1); 1117 } 1118 1119 mqhp = mqdp->mqd_mq; 1120 1121 mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags; 1122 mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg; 1123 mqstat->mq_msgsize = (long)mqhp->mq_maxsz; 1124 (void) sem_getvalue(&mqhp->mq_notempty, &count); 1125 mqstat->mq_curmsgs = count; 1126 return (0); 1127 } 1128 1129 /* 1130 * Cleanup after fork1() in the child process. 1131 */ 1132 void 1133 postfork1_child_sigev_mq(void) 1134 { 1135 thread_communication_data_t *tcdp; 1136 mqdes_t *mqdp; 1137 1138 for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) { 1139 if ((tcdp = mqdp->mqd_tcd) != NULL) { 1140 mqdp->mqd_tcd = NULL; 1141 tcd_teardown(tcdp); 1142 } 1143 } 1144 } 1145