1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include "lint.h" 30 #include "mtlib.h" 31 #define _KMEMUSER 32 #include <sys/param.h> /* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */ 33 #undef _KMEMUSER 34 #include <mqueue.h> 35 #include <sys/types.h> 36 #include <sys/file.h> 37 #include <sys/mman.h> 38 #include <errno.h> 39 #include <stdarg.h> 40 #include <limits.h> 41 #include <pthread.h> 42 #include <assert.h> 43 #include <string.h> 44 #include <unistd.h> 45 #include <stdlib.h> 46 #include <sys/stat.h> 47 #include <inttypes.h> 48 #include "sigev_thread.h" 49 #include "pos4obj.h" 50 51 /* 52 * Default values per message queue 53 */ 54 #define MQ_MAXMSG 128 55 #define MQ_MAXSIZE 1024 56 57 #define MQ_MAGIC 0x4d534751 /* "MSGQ" */ 58 59 /* 60 * Message header which is part of messages in link list 61 */ 62 typedef struct { 63 uint64_t msg_next; /* offset of next message in the link */ 64 uint64_t msg_len; /* length of the message */ 65 } msghdr_t; 66 67 /* 68 * message queue description 69 */ 70 struct mq_dn { 71 size_t mqdn_flags; /* open description flags */ 72 }; 73 74 /* 75 * message queue descriptor structure 76 */ 77 typedef struct mq_des { 78 struct mq_des *mqd_next; /* list of all open mq descriptors, */ 79 struct mq_des *mqd_prev; /* needed for fork-safety */ 80 int mqd_magic; /* magic # to identify mq_des */ 81 int mqd_flags; /* operation flag per open */ 82 struct mq_header *mqd_mq; /* address pointer of message Q */ 83 struct mq_dn *mqd_mqdn; /* open description */ 84 thread_communication_data_t *mqd_tcd; /* SIGEV_THREAD notification */ 85 } mqdes_t; 86 87 /* 88 * message queue common header, part of the mmap()ed file. 89 * Since message queues may be shared between 32- and 64-bit processes, 90 * care must be taken to make sure that the elements of this structure 91 * are identical for both _LP64 and _ILP32 cases. 92 */ 93 typedef struct mq_header { 94 /* first field must be mq_totsize, DO NOT insert before this */ 95 int64_t mq_totsize; /* total size of the Queue */ 96 int64_t mq_maxsz; /* max size of each message */ 97 uint32_t mq_maxmsg; /* max messages in the queue */ 98 uint32_t mq_maxprio; /* maximum mqueue priority */ 99 uint32_t mq_curmaxprio; /* current maximum MQ priority */ 100 uint32_t mq_mask; /* priority bitmask */ 101 uint64_t mq_freep; /* free message's head pointer */ 102 uint64_t mq_headpp; /* pointer to head pointers */ 103 uint64_t mq_tailpp; /* pointer to tail pointers */ 104 signotify_id_t mq_sigid; /* notification id (3 int's) */ 105 uint32_t mq_ntype; /* notification type (SIGEV_*) */ 106 uint64_t mq_des; /* pointer to msg Q descriptor */ 107 mutex_t mq_exclusive; /* acquire for exclusive access */ 108 sem_t mq_rblocked; /* number of processes rblocked */ 109 sem_t mq_notfull; /* mq_send()'s block on this */ 110 sem_t mq_notempty; /* mq_receive()'s block on this */ 111 sem_t mq_spawner; /* spawner thread blocks on this */ 112 } mqhdr_t; 113 114 /* 115 * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit". 116 * If this assumption is somehow invalidated, mq_open() needs to be changed 117 * back to the old version which kept a count and enforced a limit. 118 * We make sure that this is pointed out to those changing <sys/param.h> 119 * by checking _MQ_OPEN_MAX at compile time. 120 */ 121 #if _MQ_OPEN_MAX != -1 122 #error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing." 123 #endif 124 125 #define MQ_ALIGNSIZE 8 /* 64-bit alignment */ 126 127 #ifdef DEBUG 128 #define MQ_ASSERT(x) assert(x); 129 130 #define MQ_ASSERT_PTR(_m, _p) \ 131 assert((_p) != NULL && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \ 132 !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \ 133 _m->mq_totsize)); 134 135 #define MQ_ASSERT_SEMVAL_LEQ(sem, val) { \ 136 int _val; \ 137 (void) sem_getvalue((sem), &_val); \ 138 assert((_val) <= val); } 139 #else 140 #define MQ_ASSERT(x) 141 #define MQ_ASSERT_PTR(_m, _p) 142 #define MQ_ASSERT_SEMVAL_LEQ(sem, val) 143 #endif 144 145 #define MQ_PTR(m, n) ((msghdr_t *)((uintptr_t)m + (uintptr_t)n)) 146 #define HEAD_PTR(m, n) ((uint64_t *)((uintptr_t)m + \ 147 (uintptr_t)m->mq_headpp + n * sizeof (uint64_t))) 148 #define TAIL_PTR(m, n) ((uint64_t *)((uintptr_t)m + \ 149 (uintptr_t)m->mq_tailpp + n * sizeof (uint64_t))) 150 151 #define MQ_RESERVED ((mqdes_t *)-1) 152 153 #define ABS_TIME 0 154 #define REL_TIME 1 155 156 static mutex_t mq_list_lock = DEFAULTMUTEX; 157 static mqdes_t *mq_list = NULL; 158 159 extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id); 160 161 static int 162 mq_is_valid(mqdes_t *mqdp) 163 { 164 /* 165 * Any use of a message queue after it was closed is 166 * undefined. But the standard strongly favours EBADF 167 * returns. Before we dereference which could be fatal, 168 * we first do some pointer sanity checks. 169 */ 170 if (mqdp != NULL && mqdp != MQ_RESERVED && 171 ((uintptr_t)mqdp & 0x7) == 0) { 172 return (mqdp->mqd_magic == MQ_MAGIC); 173 } 174 175 return (0); 176 } 177 178 static void 179 mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg) 180 { 181 int i; 182 uint64_t temp; 183 uint64_t currentp; 184 uint64_t nextp; 185 186 /* 187 * We only need to initialize the non-zero fields. The use of 188 * ftruncate() on the message queue file assures that the 189 * pages will be zfod. 190 */ 191 (void) mutex_init(&mqhp->mq_exclusive, USYNC_PROCESS, NULL); 192 (void) sem_init(&mqhp->mq_rblocked, 1, 0); 193 (void) sem_init(&mqhp->mq_notempty, 1, 0); 194 (void) sem_init(&mqhp->mq_spawner, 1, 0); 195 (void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg); 196 197 mqhp->mq_maxsz = msgsize; 198 mqhp->mq_maxmsg = maxmsg; 199 200 /* 201 * As of this writing (1997), there are 32 message queue priorities. 202 * If this is to change, then the size of the mq_mask will 203 * also have to change. If DEBUG is defined, assert that 204 * _MQ_PRIO_MAX hasn't changed. 205 */ 206 mqhp->mq_maxprio = _MQ_PRIO_MAX; 207 #if defined(DEBUG) 208 /* LINTED always true */ 209 MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX); 210 #endif 211 212 /* 213 * Since the message queue can be mapped into different 214 * virtual address ranges by different processes, we don't 215 * keep track of pointers, only offsets into the shared region. 216 */ 217 mqhp->mq_headpp = sizeof (mqhdr_t); 218 mqhp->mq_tailpp = mqhp->mq_headpp + 219 mqhp->mq_maxprio * sizeof (uint64_t); 220 mqhp->mq_freep = mqhp->mq_tailpp + 221 mqhp->mq_maxprio * sizeof (uint64_t); 222 223 currentp = mqhp->mq_freep; 224 MQ_PTR(mqhp, currentp)->msg_next = 0; 225 226 temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1); 227 for (i = 1; i < mqhp->mq_maxmsg; i++) { 228 nextp = currentp + sizeof (msghdr_t) + temp; 229 MQ_PTR(mqhp, currentp)->msg_next = nextp; 230 MQ_PTR(mqhp, nextp)->msg_next = 0; 231 currentp = nextp; 232 } 233 } 234 235 static size_t 236 mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio) 237 { 238 uint64_t currentp; 239 msghdr_t *curbuf; 240 uint64_t *headpp; 241 uint64_t *tailpp; 242 243 MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive)); 244 245 /* 246 * Get the head and tail pointers for the queue of maximum 247 * priority. We shouldn't be here unless there is a message for 248 * us, so it's fair to assert that both the head and tail 249 * pointers are non-NULL. 250 */ 251 headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio); 252 tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio); 253 254 if (msg_prio != NULL) 255 *msg_prio = mqhp->mq_curmaxprio; 256 257 currentp = *headpp; 258 MQ_ASSERT_PTR(mqhp, currentp); 259 curbuf = MQ_PTR(mqhp, currentp); 260 261 if ((*headpp = curbuf->msg_next) == NULL) { 262 /* 263 * We just nuked the last message in this priority's queue. 264 * Twiddle this priority's bit, and then find the next bit 265 * tipped. 266 */ 267 uint_t prio = mqhp->mq_curmaxprio; 268 269 mqhp->mq_mask &= ~(1u << prio); 270 271 for (; prio != 0; prio--) 272 if (mqhp->mq_mask & (1u << prio)) 273 break; 274 mqhp->mq_curmaxprio = prio; 275 276 *tailpp = NULL; 277 } 278 279 /* 280 * Copy the message, and put the buffer back on the free list. 281 */ 282 (void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len); 283 curbuf->msg_next = mqhp->mq_freep; 284 mqhp->mq_freep = currentp; 285 286 return (curbuf->msg_len); 287 } 288 289 290 static void 291 mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio) 292 { 293 uint64_t currentp; 294 msghdr_t *curbuf; 295 uint64_t *headpp; 296 uint64_t *tailpp; 297 298 MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive)); 299 300 /* 301 * Grab a free message block, and link it in. We shouldn't 302 * be here unless there is room in the queue for us; it's 303 * fair to assert that the free pointer is non-NULL. 304 */ 305 currentp = mqhp->mq_freep; 306 MQ_ASSERT_PTR(mqhp, currentp); 307 curbuf = MQ_PTR(mqhp, currentp); 308 309 /* 310 * Remove a message from the free list, and copy in the new contents. 311 */ 312 mqhp->mq_freep = curbuf->msg_next; 313 curbuf->msg_next = NULL; 314 (void) memcpy((char *)&curbuf[1], msgp, len); 315 curbuf->msg_len = len; 316 317 headpp = HEAD_PTR(mqhp, prio); 318 tailpp = TAIL_PTR(mqhp, prio); 319 320 if (*tailpp == 0) { 321 /* 322 * This is the first message on this queue. Set the 323 * head and tail pointers, and tip the appropriate bit 324 * in the priority mask. 325 */ 326 *headpp = currentp; 327 *tailpp = currentp; 328 mqhp->mq_mask |= (1u << prio); 329 if (prio > mqhp->mq_curmaxprio) 330 mqhp->mq_curmaxprio = prio; 331 } else { 332 MQ_ASSERT_PTR(mqhp, *tailpp); 333 MQ_PTR(mqhp, *tailpp)->msg_next = currentp; 334 *tailpp = currentp; 335 } 336 } 337 338 mqd_t 339 mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...) 340 { 341 va_list ap; 342 mode_t mode; 343 struct mq_attr *attr; 344 int fd; 345 int err; 346 int cr_flag = 0; 347 int locked = 0; 348 uint64_t total_size; 349 size_t msgsize; 350 ssize_t maxmsg; 351 uint64_t temp; 352 void *ptr; 353 mqdes_t *mqdp; 354 mqhdr_t *mqhp; 355 struct mq_dn *mqdnp; 356 357 if (__pos4obj_check(path) == -1) 358 return ((mqd_t)-1); 359 360 /* acquire MSGQ lock to have atomic operation */ 361 if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) 362 goto out; 363 locked = 1; 364 365 va_start(ap, oflag); 366 /* filter oflag to have READ/WRITE/CREATE modes only */ 367 oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK); 368 if ((oflag & O_CREAT) != 0) { 369 mode = va_arg(ap, mode_t); 370 attr = va_arg(ap, struct mq_attr *); 371 } 372 va_end(ap); 373 374 if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag, 375 mode, &cr_flag)) < 0) 376 goto out; 377 378 /* closing permission file */ 379 (void) __close_nc(fd); 380 381 /* Try to open/create data file */ 382 if (cr_flag) { 383 cr_flag = PFILE_CREATE; 384 if (attr == NULL) { 385 maxmsg = MQ_MAXMSG; 386 msgsize = MQ_MAXSIZE; 387 } else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) { 388 errno = EINVAL; 389 goto out; 390 } else if (attr->mq_maxmsg > _SEM_VALUE_MAX) { 391 errno = ENOSPC; 392 goto out; 393 } else { 394 maxmsg = attr->mq_maxmsg; 395 msgsize = attr->mq_msgsize; 396 } 397 398 /* adjust for message size at word boundary */ 399 temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1); 400 401 total_size = sizeof (mqhdr_t) + 402 maxmsg * (temp + sizeof (msghdr_t)) + 403 2 * _MQ_PRIO_MAX * sizeof (uint64_t); 404 405 if (total_size > SSIZE_MAX) { 406 errno = ENOSPC; 407 goto out; 408 } 409 410 /* 411 * data file is opened with read/write to those 412 * who have read or write permission 413 */ 414 mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1; 415 if ((fd = __pos4obj_open(path, MQ_DATA_TYPE, 416 (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0) 417 goto out; 418 419 cr_flag |= DFILE_CREATE | DFILE_OPEN; 420 421 /* force permissions to avoid umask effect */ 422 if (fchmod(fd, mode) < 0) 423 goto out; 424 425 if (ftruncate64(fd, (off64_t)total_size) < 0) 426 goto out; 427 } else { 428 if ((fd = __pos4obj_open(path, MQ_DATA_TYPE, 429 O_RDWR, 0666, &err)) < 0) 430 goto out; 431 cr_flag = DFILE_OPEN; 432 433 /* Message queue has not been initialized yet */ 434 if (read(fd, &total_size, sizeof (total_size)) != 435 sizeof (total_size) || total_size == 0) { 436 errno = ENOENT; 437 goto out; 438 } 439 440 /* Message queue too big for this process to handle */ 441 if (total_size > SSIZE_MAX) { 442 errno = EFBIG; 443 goto out; 444 } 445 } 446 447 if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) { 448 errno = ENOMEM; 449 goto out; 450 } 451 cr_flag |= ALLOC_MEM; 452 453 if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE, 454 MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED) 455 goto out; 456 mqhp = ptr; 457 cr_flag |= DFILE_MMAP; 458 459 /* closing data file */ 460 (void) __close_nc(fd); 461 cr_flag &= ~DFILE_OPEN; 462 463 /* 464 * create, unlink, size, mmap, and close description file 465 * all for a flag word in anonymous shared memory 466 */ 467 if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT, 468 0666, &err)) < 0) 469 goto out; 470 cr_flag |= DFILE_OPEN; 471 (void) __pos4obj_unlink(path, MQ_DSCN_TYPE); 472 if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0) 473 goto out; 474 475 if ((ptr = mmap64(NULL, sizeof (struct mq_dn), 476 PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED) 477 goto out; 478 mqdnp = ptr; 479 cr_flag |= MQDNP_MMAP; 480 481 (void) __close_nc(fd); 482 cr_flag &= ~DFILE_OPEN; 483 484 /* 485 * we follow the same strategy as filesystem open() routine, 486 * where fcntl.h flags are changed to flags defined in file.h. 487 */ 488 mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE); 489 mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK); 490 491 /* new message queue requires initialization */ 492 if ((cr_flag & DFILE_CREATE) != 0) { 493 /* message queue header has to be initialized */ 494 mq_init(mqhp, msgsize, maxmsg); 495 mqhp->mq_totsize = total_size; 496 } 497 mqdp->mqd_mq = mqhp; 498 mqdp->mqd_mqdn = mqdnp; 499 mqdp->mqd_magic = MQ_MAGIC; 500 mqdp->mqd_tcd = NULL; 501 if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) { 502 lmutex_lock(&mq_list_lock); 503 mqdp->mqd_next = mq_list; 504 mqdp->mqd_prev = NULL; 505 if (mq_list) 506 mq_list->mqd_prev = mqdp; 507 mq_list = mqdp; 508 lmutex_unlock(&mq_list_lock); 509 return ((mqd_t)mqdp); 510 } 511 512 locked = 0; /* fall into the error case */ 513 out: 514 err = errno; 515 if ((cr_flag & DFILE_OPEN) != 0) 516 (void) __close_nc(fd); 517 if ((cr_flag & DFILE_CREATE) != 0) 518 (void) __pos4obj_unlink(path, MQ_DATA_TYPE); 519 if ((cr_flag & PFILE_CREATE) != 0) 520 (void) __pos4obj_unlink(path, MQ_PERM_TYPE); 521 if ((cr_flag & ALLOC_MEM) != 0) 522 free((void *)mqdp); 523 if ((cr_flag & DFILE_MMAP) != 0) 524 (void) munmap((caddr_t)mqhp, (size_t)total_size); 525 if ((cr_flag & MQDNP_MMAP) != 0) 526 (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn)); 527 if (locked) 528 (void) __pos4obj_unlock(path, MQ_LOCK_TYPE); 529 errno = err; 530 return ((mqd_t)-1); 531 } 532 533 static void 534 mq_close_cleanup(mqdes_t *mqdp) 535 { 536 mqhdr_t *mqhp = mqdp->mqd_mq; 537 struct mq_dn *mqdnp = mqdp->mqd_mqdn; 538 539 /* invalidate the descriptor before freeing it */ 540 mqdp->mqd_magic = 0; 541 (void) mutex_unlock(&mqhp->mq_exclusive); 542 543 lmutex_lock(&mq_list_lock); 544 if (mqdp->mqd_next) 545 mqdp->mqd_next->mqd_prev = mqdp->mqd_prev; 546 if (mqdp->mqd_prev) 547 mqdp->mqd_prev->mqd_next = mqdp->mqd_next; 548 if (mq_list == mqdp) 549 mq_list = mqdp->mqd_next; 550 lmutex_unlock(&mq_list_lock); 551 552 free(mqdp); 553 (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn)); 554 (void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize); 555 } 556 557 int 558 mq_close(mqd_t mqdes) 559 { 560 mqdes_t *mqdp = (mqdes_t *)mqdes; 561 mqhdr_t *mqhp; 562 thread_communication_data_t *tcdp; 563 564 if (!mq_is_valid(mqdp)) { 565 errno = EBADF; 566 return (-1); 567 } 568 569 mqhp = mqdp->mqd_mq; 570 (void) mutex_lock(&mqhp->mq_exclusive); 571 572 if (mqhp->mq_des == (uintptr_t)mqdp && 573 mqhp->mq_sigid.sn_pid == getpid()) { 574 /* notification is set for this descriptor, remove it */ 575 (void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid); 576 mqhp->mq_ntype = 0; 577 mqhp->mq_des = 0; 578 } 579 580 pthread_cleanup_push(mq_close_cleanup, mqdp); 581 if ((tcdp = mqdp->mqd_tcd) != NULL) { 582 mqdp->mqd_tcd = NULL; 583 del_sigev_mq(tcdp); /* possible cancellation point */ 584 } 585 pthread_cleanup_pop(1); /* finish in the cleanup handler */ 586 587 return (0); 588 } 589 590 int 591 mq_unlink(const char *path) 592 { 593 int err; 594 595 if (__pos4obj_check(path) < 0) 596 return (-1); 597 598 if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) { 599 return (-1); 600 } 601 602 err = __pos4obj_unlink(path, MQ_PERM_TYPE); 603 604 if (err == 0 || (err == -1 && errno == EEXIST)) { 605 errno = 0; 606 err = __pos4obj_unlink(path, MQ_DATA_TYPE); 607 } 608 609 if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0) 610 return (-1); 611 612 return (err); 613 614 } 615 616 static int 617 __mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 618 uint_t msg_prio, const timespec_t *timeout, int abs_rel) 619 { 620 mqdes_t *mqdp = (mqdes_t *)mqdes; 621 mqhdr_t *mqhp; 622 int err; 623 int notify = 0; 624 625 /* 626 * sem_*wait() does cancellation, if called. 627 * pthread_testcancel() ensures that cancellation takes place if 628 * there is a cancellation pending when mq_*send() is called. 629 */ 630 pthread_testcancel(); 631 632 if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) { 633 errno = EBADF; 634 return (-1); 635 } 636 637 mqhp = mqdp->mqd_mq; 638 639 if (msg_prio >= mqhp->mq_maxprio) { 640 errno = EINVAL; 641 return (-1); 642 } 643 if (msg_len > mqhp->mq_maxsz) { 644 errno = EMSGSIZE; 645 return (-1); 646 } 647 648 if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) 649 err = sem_trywait(&mqhp->mq_notfull); 650 else { 651 /* 652 * We might get cancelled here... 653 */ 654 if (timeout == NULL) 655 err = sem_wait(&mqhp->mq_notfull); 656 else if (abs_rel == ABS_TIME) 657 err = sem_timedwait(&mqhp->mq_notfull, timeout); 658 else 659 err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout); 660 } 661 if (err == -1) { 662 /* 663 * errno has been set to EAGAIN / EINTR / ETIMEDOUT 664 * by sem_*wait(), so we can just return. 665 */ 666 return (-1); 667 } 668 669 /* 670 * By the time we're here, we know that we've got the capacity 671 * to add to the queue...now acquire the exclusive lock. 672 */ 673 (void) mutex_lock(&mqhp->mq_exclusive); 674 675 /* 676 * Now determine if we want to kick the notification. POSIX 677 * requires that if a process has registered for notification, 678 * we must kick it when the queue makes an empty to non-empty 679 * transition, and there are no blocked receivers. Note that 680 * this mechanism does _not_ guarantee that the kicked process 681 * will be able to receive a message without blocking; 682 * another receiver could intervene in the meantime. Thus, 683 * the notification mechanism is inherently racy; all we can 684 * do is hope to minimize the window as much as possible. 685 * In general, we want to avoid kicking the notification when 686 * there are clearly receivers blocked. We'll determine if 687 * we want to kick the notification before the mq_putmsg(), 688 * but the actual signotify() won't be done until the message 689 * is on the queue. 690 */ 691 if (mqhp->mq_sigid.sn_pid != 0) { 692 int nmessages, nblocked; 693 694 (void) sem_getvalue(&mqhp->mq_notempty, &nmessages); 695 (void) sem_getvalue(&mqhp->mq_rblocked, &nblocked); 696 697 if (nmessages == 0 && nblocked == 0) 698 notify = 1; 699 } 700 701 mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio); 702 (void) sem_post(&mqhp->mq_notempty); 703 704 if (notify) { 705 /* notify and also delete the registration */ 706 (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid); 707 if (mqhp->mq_ntype == SIGEV_THREAD || 708 mqhp->mq_ntype == SIGEV_PORT) 709 (void) sem_post(&mqhp->mq_spawner); 710 mqhp->mq_ntype = 0; 711 mqhp->mq_des = 0; 712 } 713 714 MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg)); 715 (void) mutex_unlock(&mqhp->mq_exclusive); 716 717 return (0); 718 } 719 720 int 721 mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio) 722 { 723 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, 724 NULL, ABS_TIME)); 725 } 726 727 int 728 mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 729 uint_t msg_prio, const timespec_t *abs_timeout) 730 { 731 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, 732 abs_timeout, ABS_TIME)); 733 } 734 735 int 736 mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 737 uint_t msg_prio, const timespec_t *rel_timeout) 738 { 739 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, 740 rel_timeout, REL_TIME)); 741 } 742 743 static void 744 decrement_rblocked(mqhdr_t *mqhp) 745 { 746 int cancel_state; 747 748 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state); 749 while (sem_wait(&mqhp->mq_rblocked) == -1) 750 continue; 751 (void) pthread_setcancelstate(cancel_state, NULL); 752 } 753 754 static ssize_t 755 __mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, 756 uint_t *msg_prio, const timespec_t *timeout, int abs_rel) 757 { 758 mqdes_t *mqdp = (mqdes_t *)mqdes; 759 mqhdr_t *mqhp; 760 ssize_t msg_size; 761 int err; 762 763 /* 764 * sem_*wait() does cancellation, if called. 765 * pthread_testcancel() ensures that cancellation takes place if 766 * there is a cancellation pending when mq_*receive() is called. 767 */ 768 pthread_testcancel(); 769 770 if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) { 771 errno = EBADF; 772 return (ssize_t)(-1); 773 } 774 775 mqhp = mqdp->mqd_mq; 776 777 if (msg_len < mqhp->mq_maxsz) { 778 errno = EMSGSIZE; 779 return (ssize_t)(-1); 780 } 781 782 /* 783 * The semaphoring scheme for mq_[timed]receive is a little hairy 784 * thanks to POSIX.1b's arcane notification mechanism. First, 785 * we try to take the common case and do a sem_trywait(). 786 * If that doesn't work, and O_NONBLOCK hasn't been set, 787 * then note that we're going to sleep by incrementing the rblocked 788 * semaphore. We decrement that semaphore after waking up. 789 */ 790 if (sem_trywait(&mqhp->mq_notempty) == -1) { 791 if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) { 792 /* 793 * errno has been set to EAGAIN or EINTR by 794 * sem_trywait(), so we can just return. 795 */ 796 return (-1); 797 } 798 /* 799 * If we're here, then we're probably going to block... 800 * increment the rblocked semaphore. If we get 801 * cancelled, decrement_rblocked() will decrement it. 802 */ 803 (void) sem_post(&mqhp->mq_rblocked); 804 805 pthread_cleanup_push(decrement_rblocked, mqhp); 806 if (timeout == NULL) 807 err = sem_wait(&mqhp->mq_notempty); 808 else if (abs_rel == ABS_TIME) 809 err = sem_timedwait(&mqhp->mq_notempty, timeout); 810 else 811 err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout); 812 pthread_cleanup_pop(1); 813 814 if (err == -1) { 815 /* 816 * We took a signal or timeout while waiting 817 * on mq_notempty... 818 */ 819 return (-1); 820 } 821 } 822 823 (void) mutex_lock(&mqhp->mq_exclusive); 824 msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio); 825 (void) sem_post(&mqhp->mq_notfull); 826 MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg)); 827 (void) mutex_unlock(&mqhp->mq_exclusive); 828 829 return (msg_size); 830 } 831 832 ssize_t 833 mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio) 834 { 835 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, 836 NULL, ABS_TIME)); 837 } 838 839 ssize_t 840 mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, 841 uint_t *msg_prio, const timespec_t *abs_timeout) 842 { 843 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, 844 abs_timeout, ABS_TIME)); 845 } 846 847 ssize_t 848 mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len, 849 uint_t *msg_prio, const timespec_t *rel_timeout) 850 { 851 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, 852 rel_timeout, REL_TIME)); 853 } 854 855 /* 856 * Only used below, in mq_notify(). 857 * We already have a spawner thread. 858 * Verify that the attributes match; cancel it if necessary. 859 */ 860 static int 861 cancel_if_necessary(thread_communication_data_t *tcdp, 862 const struct sigevent *sigevp) 863 { 864 int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp, 865 sigevp->sigev_notify_attributes); 866 867 if (do_cancel) { 868 /* 869 * Attributes don't match, cancel the spawner thread. 870 */ 871 (void) pthread_cancel(tcdp->tcd_server_id); 872 } else { 873 /* 874 * Reuse the existing spawner thread with possibly 875 * changed notification function and value. 876 */ 877 tcdp->tcd_notif.sigev_notify = SIGEV_THREAD; 878 tcdp->tcd_notif.sigev_signo = 0; 879 tcdp->tcd_notif.sigev_value = sigevp->sigev_value; 880 tcdp->tcd_notif.sigev_notify_function = 881 sigevp->sigev_notify_function; 882 } 883 884 return (do_cancel); 885 } 886 887 int 888 mq_notify(mqd_t mqdes, const struct sigevent *sigevp) 889 { 890 mqdes_t *mqdp = (mqdes_t *)mqdes; 891 mqhdr_t *mqhp; 892 thread_communication_data_t *tcdp; 893 siginfo_t mq_siginfo; 894 struct sigevent sigevent; 895 struct stat64 statb; 896 port_notify_t *pn; 897 void *userval; 898 int rval = -1; 899 int ntype; 900 int port; 901 902 if (!mq_is_valid(mqdp)) { 903 errno = EBADF; 904 return (-1); 905 } 906 907 mqhp = mqdp->mqd_mq; 908 909 (void) mutex_lock(&mqhp->mq_exclusive); 910 911 if (sigevp == NULL) { /* remove notification */ 912 if (mqhp->mq_des == (uintptr_t)mqdp && 913 mqhp->mq_sigid.sn_pid == getpid()) { 914 /* notification is set for this descriptor, remove it */ 915 (void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid); 916 if ((tcdp = mqdp->mqd_tcd) != NULL) { 917 sig_mutex_lock(&tcdp->tcd_lock); 918 if (tcdp->tcd_msg_enabled) { 919 /* cancel the spawner thread */ 920 tcdp = mqdp->mqd_tcd; 921 mqdp->mqd_tcd = NULL; 922 (void) pthread_cancel( 923 tcdp->tcd_server_id); 924 } 925 sig_mutex_unlock(&tcdp->tcd_lock); 926 } 927 mqhp->mq_ntype = 0; 928 mqhp->mq_des = 0; 929 } else { 930 /* notification is not set for this descriptor */ 931 errno = EBUSY; 932 goto bad; 933 } 934 } else { /* register notification with this process */ 935 switch (ntype = sigevp->sigev_notify) { 936 case SIGEV_THREAD: 937 userval = sigevp->sigev_value.sival_ptr; 938 port = -1; 939 break; 940 case SIGEV_PORT: 941 pn = sigevp->sigev_value.sival_ptr; 942 userval = pn->portnfy_user; 943 port = pn->portnfy_port; 944 if (fstat64(port, &statb) != 0 || 945 !S_ISPORT(statb.st_mode)) { 946 errno = EBADF; 947 goto bad; 948 } 949 (void) memset(&sigevent, 0, sizeof (sigevent)); 950 sigevent.sigev_notify = SIGEV_PORT; 951 sigevp = &sigevent; 952 break; 953 } 954 switch (ntype) { 955 case SIGEV_NONE: 956 mq_siginfo.si_signo = 0; 957 mq_siginfo.si_code = SI_MESGQ; 958 break; 959 case SIGEV_SIGNAL: 960 mq_siginfo.si_signo = sigevp->sigev_signo; 961 mq_siginfo.si_value = sigevp->sigev_value; 962 mq_siginfo.si_code = SI_MESGQ; 963 break; 964 case SIGEV_THREAD: 965 if ((tcdp = mqdp->mqd_tcd) != NULL && 966 cancel_if_necessary(tcdp, sigevp)) 967 mqdp->mqd_tcd = NULL; 968 /* FALLTHROUGH */ 969 case SIGEV_PORT: 970 if ((tcdp = mqdp->mqd_tcd) == NULL) { 971 /* we must create a spawner thread */ 972 tcdp = setup_sigev_handler(sigevp, MQ); 973 if (tcdp == NULL) { 974 errno = EBADF; 975 goto bad; 976 } 977 tcdp->tcd_msg_enabled = 0; 978 tcdp->tcd_msg_closing = 0; 979 tcdp->tcd_msg_avail = &mqhp->mq_spawner; 980 if (launch_spawner(tcdp) != 0) { 981 free_sigev_handler(tcdp); 982 goto bad; 983 } 984 mqdp->mqd_tcd = tcdp; 985 } 986 mq_siginfo.si_signo = 0; 987 mq_siginfo.si_code = SI_MESGQ; 988 break; 989 default: 990 errno = EINVAL; 991 goto bad; 992 } 993 994 /* register notification */ 995 if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0) 996 goto bad; 997 mqhp->mq_ntype = ntype; 998 mqhp->mq_des = (uintptr_t)mqdp; 999 switch (ntype) { 1000 case SIGEV_THREAD: 1001 case SIGEV_PORT: 1002 tcdp->tcd_port = port; 1003 tcdp->tcd_msg_object = mqdp; 1004 tcdp->tcd_msg_userval = userval; 1005 sig_mutex_lock(&tcdp->tcd_lock); 1006 tcdp->tcd_msg_enabled = ntype; 1007 sig_mutex_unlock(&tcdp->tcd_lock); 1008 (void) cond_broadcast(&tcdp->tcd_cv); 1009 break; 1010 } 1011 } 1012 1013 rval = 0; /* success */ 1014 bad: 1015 (void) mutex_unlock(&mqhp->mq_exclusive); 1016 return (rval); 1017 } 1018 1019 int 1020 mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat) 1021 { 1022 mqdes_t *mqdp = (mqdes_t *)mqdes; 1023 mqhdr_t *mqhp; 1024 uint_t flag = 0; 1025 1026 if (!mq_is_valid(mqdp)) { 1027 errno = EBADF; 1028 return (-1); 1029 } 1030 1031 /* store current attributes */ 1032 if (omqstat != NULL) { 1033 int count; 1034 1035 mqhp = mqdp->mqd_mq; 1036 omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags; 1037 omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg; 1038 omqstat->mq_msgsize = (long)mqhp->mq_maxsz; 1039 (void) sem_getvalue(&mqhp->mq_notempty, &count); 1040 omqstat->mq_curmsgs = count; 1041 } 1042 1043 /* set description attributes */ 1044 if ((mqstat->mq_flags & O_NONBLOCK) != 0) 1045 flag = FNONBLOCK; 1046 mqdp->mqd_mqdn->mqdn_flags = flag; 1047 1048 return (0); 1049 } 1050 1051 int 1052 mq_getattr(mqd_t mqdes, struct mq_attr *mqstat) 1053 { 1054 mqdes_t *mqdp = (mqdes_t *)mqdes; 1055 mqhdr_t *mqhp; 1056 int count; 1057 1058 if (!mq_is_valid(mqdp)) { 1059 errno = EBADF; 1060 return (-1); 1061 } 1062 1063 mqhp = mqdp->mqd_mq; 1064 1065 mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags; 1066 mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg; 1067 mqstat->mq_msgsize = (long)mqhp->mq_maxsz; 1068 (void) sem_getvalue(&mqhp->mq_notempty, &count); 1069 mqstat->mq_curmsgs = count; 1070 return (0); 1071 } 1072 1073 /* 1074 * Cleanup after fork1() in the child process. 1075 */ 1076 void 1077 postfork1_child_sigev_mq(void) 1078 { 1079 thread_communication_data_t *tcdp; 1080 mqdes_t *mqdp; 1081 1082 for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) { 1083 if ((tcdp = mqdp->mqd_tcd) != NULL) { 1084 mqdp->mqd_tcd = NULL; 1085 tcd_teardown(tcdp); 1086 } 1087 } 1088 } 1089