/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#pragma weak mq_open = _mq_open
#pragma weak mq_close = _mq_close
#pragma weak mq_unlink = _mq_unlink
#pragma weak mq_send = _mq_send
#pragma weak mq_timedsend = _mq_timedsend
#pragma weak mq_reltimedsend_np = _mq_reltimedsend_np
#pragma weak mq_receive = _mq_receive
#pragma weak mq_timedreceive = _mq_timedreceive
#pragma weak mq_reltimedreceive_np = _mq_reltimedreceive_np
#pragma weak mq_notify = _mq_notify
#pragma weak mq_setattr = _mq_setattr
#pragma weak mq_getattr = _mq_getattr

#include "synonyms.h"
#include "mtlib.h"
#define	_KMEMUSER
#include <sys/param.h>		/* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
#undef	_KMEMUSER
#include <mqueue.h>
#include <sys/types.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <errno.h>
#include <stdarg.h>
#include <limits.h>
#include <pthread.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <inttypes.h>
#include "sigev_thread.h"
#include "pos4obj.h"

/*
 * Default values per message queue
 */
#define	MQ_MAXMSG	128
#define	MQ_MAXSIZE	1024

#define	MQ_MAGIC	0x4d534751		/* "MSGQ" */

/*
 * Message header which is part of each message in the linked list
 */
typedef struct {
	uint64_t	msg_next;	/* offset of next message in the link */
	uint64_t	msg_len;	/* length of the message */
} msghdr_t;

/*
 * message queue description
 */
struct mq_dn {
	size_t		mqdn_flags;	/* open description flags */
};

/*
 * message queue descriptor structure
 */
typedef struct mq_des {
	struct mq_des	*mqd_next;	/* list of all open mq descriptors, */
	struct mq_des	*mqd_prev;	/* needed for fork-safety */
	int		mqd_magic;	/* magic # to identify mq_des */
	int		mqd_flags;	/* operation flag per open */
	struct mq_header *mqd_mq;	/* address pointer of message Q */
	struct mq_dn	*mqd_mqdn;	/* open description */
	thread_communication_data_t *mqd_tcd;	/* SIGEV_THREAD notification */
} mqdes_t;

/*
 * message queue common header, part of the mmap()ed file.
 * Since message queues may be shared between 32- and 64-bit processes,
 * care must be taken to make sure that the elements of this structure
 * are identical for both _LP64 and _ILP32 cases.
 */
typedef struct mq_header {
	/* first field must be mq_totsize, DO NOT insert before this */
	int64_t		mq_totsize;	/* total size of the Queue */
	int64_t		mq_maxsz;	/* max size of each message */
	uint32_t	mq_maxmsg;	/* max messages in the queue */
	uint32_t	mq_maxprio;	/* maximum mqueue priority */
	uint32_t	mq_curmaxprio;	/* current maximum MQ priority */
	uint32_t	mq_mask;	/* priority bitmask */
	uint64_t	mq_freep;	/* free message's head pointer */
	uint64_t	mq_headpp;	/* pointer to head pointers */
	uint64_t	mq_tailpp;	/* pointer to tail pointers */
	signotify_id_t	mq_sigid;	/* notification id (3 int's) */
	uint32_t	mq_ntype;	/* notification type (SIGEV_*) */
	uint64_t	mq_des;		/* pointer to msg Q descriptor */
	mutex_t		mq_exclusive;	/* acquire for exclusive access */
	sem_t		mq_rblocked;	/* number of processes rblocked */
	sem_t		mq_notfull;	/* mq_send()'s block on this */
	sem_t		mq_notempty;	/* mq_receive()'s block on this */
	sem_t		mq_spawner;	/* spawner thread blocks on this */
} mqhdr_t;
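
/*
 * Illustrative layout of the mmap()ed file that backs a queue (a sketch
 * only; nothing below depends on this comment).  All cross-references in
 * the file are stored as byte offsets from the start of the mapping, so
 * that processes mapping it at different addresses agree on them.  The
 * offsets shown are the ones computed in mq_init() and _mq_open() below.
 *
 *	+------------------------------+  offset 0
 *	| mqhdr_t (the header above)   |
 *	+------------------------------+  mq_headpp == sizeof (mqhdr_t)
 *	| head offsets, one uint64_t   |
 *	| per priority (_MQ_PRIO_MAX)  |
 *	+------------------------------+  mq_tailpp
 *	| tail offsets, one uint64_t   |
 *	| per priority (_MQ_PRIO_MAX)  |
 *	+------------------------------+  mq_freep (initially)
 *	| mq_maxmsg message blocks,    |
 *	| each sizeof (msghdr_t) plus  |
 *	| mq_maxsz rounded up to 8,    |
 *	| linked through msg_next      |
 *	+------------------------------+  mq_totsize
 */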

/*
 * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
 * If this assumption is somehow invalidated, mq_open() needs to be changed
 * back to the old version which kept a count and enforced a limit.
 * We make sure that this is pointed out to those changing <sys/param.h>
 * by checking _MQ_OPEN_MAX at compile time.
 */
#if _MQ_OPEN_MAX != -1
#error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
#endif

#define	MQ_ALIGNSIZE	8	/* 64-bit alignment */

#ifdef DEBUG
#define	MQ_ASSERT(x)	assert(x);

#define	MQ_ASSERT_PTR(_m, _p) \
	assert((_p) != NULL && !((uintptr_t)(_p) & (MQ_ALIGNSIZE - 1)) && \
	    !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \
	    _m->mq_totsize));

#define	MQ_ASSERT_SEMVAL_LEQ(sem, val) { \
	int _val; \
	(void) sem_getvalue((sem), &_val); \
	assert((_val) <= val); }
#else
#define	MQ_ASSERT(x)
#define	MQ_ASSERT_PTR(_m, _p)
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
#endif

#define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)m + (uintptr_t)n))
#define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
			(uintptr_t)m->mq_headpp + n * sizeof (uint64_t)))
#define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
			(uintptr_t)m->mq_tailpp + n * sizeof (uint64_t)))

#define	MQ_RESERVED	((mqdes_t *)-1)

#define	ABS_TIME	0
#define	REL_TIME	1

static mutex_t mq_list_lock = DEFAULTMUTEX;
static mqdes_t *mq_list = NULL;

extern int __signotify(int cmd, siginfo_t *siginfo, signotify_id_t *sn_id);

static int
mq_is_valid(mqdes_t *mqdp)
{
	/*
	 * Any use of a message queue after it was closed is
	 * undefined.  But the standard strongly favours EBADF
	 * returns.  Before we dereference the pointer, which could
	 * be fatal, we first do some sanity checks on it.
	 */
	if (mqdp != NULL && mqdp != MQ_RESERVED &&
	    ((uintptr_t)mqdp & 0x7) == 0) {
		return (mqdp->mqd_magic == MQ_MAGIC);
	}

	return (0);
}

static void
mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
{
	int		i;
	uint64_t	temp;
	uint64_t	currentp;
	uint64_t	nextp;

	/*
	 * We only need to initialize the non-zero fields.  The use of
	 * ftruncate() on the message queue file assures that the
	 * pages will be zero-filled on demand (zfod).
	 */
	(void) mutex_init(&mqhp->mq_exclusive, USYNC_PROCESS, NULL);
	(void) sem_init(&mqhp->mq_rblocked, 1, 0);
	(void) sem_init(&mqhp->mq_notempty, 1, 0);
	(void) sem_init(&mqhp->mq_spawner, 1, 0);
	(void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);

	mqhp->mq_maxsz = msgsize;
	mqhp->mq_maxmsg = maxmsg;

	/*
	 * As of this writing (1997), there are 32 message queue priorities.
	 * If this is to change, then the size of the mq_mask will
	 * also have to change.  If DEBUG is defined, assert that
	 * _MQ_PRIO_MAX hasn't changed.
	 */
	mqhp->mq_maxprio = _MQ_PRIO_MAX;
#if defined(DEBUG)
	/* LINTED always true */
	MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
#endif

	/*
	 * Since the message queue can be mapped into different
	 * virtual address ranges by different processes, we don't
	 * keep track of pointers, only offsets into the shared region.
	 */
	mqhp->mq_headpp = sizeof (mqhdr_t);
	mqhp->mq_tailpp = mqhp->mq_headpp +
	    mqhp->mq_maxprio * sizeof (uint64_t);
	mqhp->mq_freep = mqhp->mq_tailpp +
	    mqhp->mq_maxprio * sizeof (uint64_t);

	currentp = mqhp->mq_freep;
	MQ_PTR(mqhp, currentp)->msg_next = 0;

	temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
	for (i = 1; i < mqhp->mq_maxmsg; i++) {
		nextp = currentp + sizeof (msghdr_t) + temp;
		MQ_PTR(mqhp, currentp)->msg_next = nextp;
		MQ_PTR(mqhp, nextp)->msg_next = 0;
		currentp = nextp;
	}
}
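
/*
 * Worked example of the sizing arithmetic used in mq_init() above and
 * _mq_open() below, for the default attributes (MQ_MAXMSG = 128,
 * MQ_MAXSIZE = 1024); illustrative only:
 *
 *	aligned message size	= (1024 + 7) & ~7		= 1024
 *	per-message block	= sizeof (msghdr_t) + 1024	= 1040
 *	total_size		= sizeof (mqhdr_t)
 *				+ 128 * 1040			(messages)
 *				+ 2 * _MQ_PRIO_MAX * 8		(head/tail)
 *				= sizeof (mqhdr_t) + 133632 bytes
 *
 * sizeof (msghdr_t) is 16 in both ILP32 and LP64; sizeof (mqhdr_t)
 * depends on the sizes of mutex_t and sem_t.
 */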

static size_t
mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
{
	uint64_t currentp;
	msghdr_t *curbuf;
	uint64_t *headpp;
	uint64_t *tailpp;

	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));

	/*
	 * Get the head and tail pointers for the queue of maximum
	 * priority.  We shouldn't be here unless there is a message for
	 * us, so it's fair to assert that both the head and tail
	 * pointers are non-NULL.
	 */
	headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
	tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);

	if (msg_prio != NULL)
		*msg_prio = mqhp->mq_curmaxprio;

	currentp = *headpp;
	MQ_ASSERT_PTR(mqhp, currentp);
	curbuf = MQ_PTR(mqhp, currentp);

	if ((*headpp = curbuf->msg_next) == NULL) {
		/*
		 * We just nuked the last message in this priority's queue.
		 * Twiddle this priority's bit, and then find the next bit
		 * tipped.
		 */
		uint_t prio = mqhp->mq_curmaxprio;

		mqhp->mq_mask &= ~(1u << prio);

		for (; prio != 0; prio--)
			if (mqhp->mq_mask & (1u << prio))
				break;
		mqhp->mq_curmaxprio = prio;

		*tailpp = NULL;
	}

	/*
	 * Copy the message, and put the buffer back on the free list.
	 */
	(void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
	curbuf->msg_next = mqhp->mq_freep;
	mqhp->mq_freep = currentp;

	return (curbuf->msg_len);
}


static void
mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
{
	uint64_t currentp;
	msghdr_t *curbuf;
	uint64_t *headpp;
	uint64_t *tailpp;

	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));

	/*
	 * Grab a free message block, and link it in.  We shouldn't
	 * be here unless there is room in the queue for us; it's
	 * fair to assert that the free pointer is non-NULL.
	 */
	currentp = mqhp->mq_freep;
	MQ_ASSERT_PTR(mqhp, currentp);
	curbuf = MQ_PTR(mqhp, currentp);

	/*
	 * Remove a message from the free list, and copy in the new contents.
	 */
	mqhp->mq_freep = curbuf->msg_next;
	curbuf->msg_next = NULL;
	(void) memcpy((char *)&curbuf[1], msgp, len);
	curbuf->msg_len = len;

	headpp = HEAD_PTR(mqhp, prio);
	tailpp = TAIL_PTR(mqhp, prio);

	if (*tailpp == 0) {
		/*
		 * This is the first message on this queue.  Set the
		 * head and tail pointers, and tip the appropriate bit
		 * in the priority mask.
		 */
		*headpp = currentp;
		*tailpp = currentp;
		mqhp->mq_mask |= (1u << prio);
		if (prio > mqhp->mq_curmaxprio)
			mqhp->mq_curmaxprio = prio;
	} else {
		MQ_ASSERT_PTR(mqhp, *tailpp);
		MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
		*tailpp = currentp;
	}
}
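
/*
 * Example of the priority bookkeeping done by mq_getmsg() and mq_putmsg()
 * above (illustrative only): with messages queued at priorities 4 and 9,
 * mq_mask is (1u << 4) | (1u << 9) and mq_curmaxprio is 9.  When the last
 * priority-9 message is received, mq_getmsg() clears bit 9 and its
 * downward scan stops at bit 4, so mq_curmaxprio becomes 4 and the next
 * receive drains priority 4.
 */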

mqd_t
_mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
{
	va_list		ap;
	mode_t		mode;
	struct mq_attr	*attr;
	int		fd;
	int		err;
	int		cr_flag = 0;
	int		locked = 0;
	uint64_t	total_size;
	size_t		msgsize;
	ssize_t		maxmsg;
	uint64_t	temp;
	void		*ptr;
	mqdes_t		*mqdp;
	mqhdr_t		*mqhp;
	struct mq_dn	*mqdnp;

	if (__pos4obj_check(path) == -1)
		return ((mqd_t)-1);

	/* acquire MSGQ lock to have atomic operation */
	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
		goto out;
	locked = 1;

	va_start(ap, oflag);
	/* filter oflag to have READ/WRITE/CREATE modes only */
	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
	if ((oflag & O_CREAT) != 0) {
		mode = va_arg(ap, mode_t);
		attr = va_arg(ap, struct mq_attr *);
	}
	va_end(ap);

	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
	    mode, &cr_flag)) < 0)
		goto out;

	/* closing permission file */
	(void) __close_nc(fd);

	/* Try to open/create data file */
	if (cr_flag) {
		cr_flag = PFILE_CREATE;
		if (attr == NULL) {
			maxmsg = MQ_MAXMSG;
			msgsize = MQ_MAXSIZE;
		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
			errno = EINVAL;
			goto out;
		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
			errno = ENOSPC;
			goto out;
		} else {
			maxmsg = attr->mq_maxmsg;
			msgsize = attr->mq_msgsize;
		}

		/* adjust for message size at word boundary */
		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);

		total_size = sizeof (mqhdr_t) +
		    maxmsg * (temp + sizeof (msghdr_t)) +
		    2 * _MQ_PRIO_MAX * sizeof (uint64_t);

		if (total_size > SSIZE_MAX) {
			errno = ENOSPC;
			goto out;
		}

		/*
		 * data file is opened with read/write to those
		 * who have read or write permission
		 */
		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
			goto out;

		cr_flag |= DFILE_CREATE | DFILE_OPEN;

		/* force permissions to avoid umask effect */
		if (fchmod(fd, mode) < 0)
			goto out;

		if (ftruncate64(fd, (off64_t)total_size) < 0)
			goto out;
	} else {
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    O_RDWR, 0666, &err)) < 0)
			goto out;
		cr_flag = DFILE_OPEN;

		/* Message queue has not been initialized yet */
		if (read(fd, &total_size, sizeof (total_size)) !=
		    sizeof (total_size) || total_size == 0) {
			errno = ENOENT;
			goto out;
		}

		/* Message queue too big for this process to handle */
		if (total_size > SSIZE_MAX) {
			errno = EFBIG;
			goto out;
		}
	}

	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
		errno = ENOMEM;
		goto out;
	}
	cr_flag |= ALLOC_MEM;

	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqhp = ptr;
	cr_flag |= DFILE_MMAP;

	/* closing data file */
	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * create, unlink, size, mmap, and close description file
	 * all for a flag word in anonymous shared memory
	 */
	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
	    0666, &err)) < 0)
		goto out;
	cr_flag |= DFILE_OPEN;
	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
		goto out;

	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqdnp = ptr;
	cr_flag |= MQDNP_MMAP;

	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * we follow the same strategy as the filesystem open() routine,
	 * where fcntl.h flags are changed to the flags defined in file.h.
	 */
	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);

	/* new message queue requires initialization */
	if ((cr_flag & DFILE_CREATE) != 0) {
		/* message queue header has to be initialized */
		mq_init(mqhp, msgsize, maxmsg);
		mqhp->mq_totsize = total_size;
	}
	mqdp->mqd_mq = mqhp;
	mqdp->mqd_mqdn = mqdnp;
	mqdp->mqd_magic = MQ_MAGIC;
	mqdp->mqd_tcd = NULL;
	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
		lmutex_lock(&mq_list_lock);
		mqdp->mqd_next = mq_list;
		mqdp->mqd_prev = NULL;
		if (mq_list)
			mq_list->mqd_prev = mqdp;
		mq_list = mqdp;
		lmutex_unlock(&mq_list_lock);
		return ((mqd_t)mqdp);
	}

	locked = 0;	/* fall into the error case */
out:
	err = errno;
	if ((cr_flag & DFILE_OPEN) != 0)
		(void) __close_nc(fd);
	if ((cr_flag & DFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
	if ((cr_flag & PFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
	if ((cr_flag & ALLOC_MEM) != 0)
		free((void *)mqdp);
	if ((cr_flag & DFILE_MMAP) != 0)
		(void) munmap((caddr_t)mqhp, (size_t)total_size);
	if ((cr_flag & MQDNP_MMAP) != 0)
		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	if (locked)
		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
	errno = err;
	return ((mqd_t)-1);
}
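
/*
 * Typical caller-side use of the open path above (a sketch, not part of
 * this file; the queue name and attribute values are only examples):
 *
 *	struct mq_attr attr;
 *	mqd_t mqd;
 *
 *	attr.mq_maxmsg = 16;		must be > 0 and <= _SEM_VALUE_MAX
 *	attr.mq_msgsize = 512;		must be > 0
 *	mqd = mq_open("/example", O_CREAT | O_RDWR, 0600, &attr);
 *	if (mqd == (mqd_t)-1)
 *		perror("mq_open");
 */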

static void
mq_close_cleanup(mqdes_t *mqdp)
{
	mqhdr_t *mqhp = mqdp->mqd_mq;
	struct mq_dn *mqdnp = mqdp->mqd_mqdn;

	/* invalidate the descriptor before freeing it */
	mqdp->mqd_magic = 0;
	(void) mutex_unlock(&mqhp->mq_exclusive);

	lmutex_lock(&mq_list_lock);
	if (mqdp->mqd_next)
		mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
	if (mqdp->mqd_prev)
		mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
	if (mq_list == mqdp)
		mq_list = mqdp->mqd_next;
	lmutex_unlock(&mq_list_lock);

	free(mqdp);
	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	(void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
}

int
_mq_close(mqd_t mqdes)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;
	(void) mutex_lock(&mqhp->mq_exclusive);

	if (mqhp->mq_des == (uintptr_t)mqdp &&
	    mqhp->mq_sigid.sn_pid == getpid()) {
		/* notification is set for this descriptor, remove it */
		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
		mqhp->mq_ntype = 0;
		mqhp->mq_des = 0;
	}

	pthread_cleanup_push(mq_close_cleanup, mqdp);
	if ((tcdp = mqdp->mqd_tcd) != NULL) {
		mqdp->mqd_tcd = NULL;
		del_sigev_mq(tcdp);	/* possible cancellation point */
	}
	pthread_cleanup_pop(1);		/* finish in the cleanup handler */

	return (0);
}

int
_mq_unlink(const char *path)
{
	int err;

	if (__pos4obj_check(path) < 0)
		return (-1);

	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
		return (-1);
	}

	err = __pos4obj_unlink(path, MQ_PERM_TYPE);

	if (err == 0 || (err == -1 && errno == EEXIST)) {
		errno = 0;
		err = __pos4obj_unlink(path, MQ_DATA_TYPE);
	}

	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
		return (-1);

	return (err);
}

static int
__mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
	uint_t msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int err;
	int notify = 0;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*send() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_prio >= mqhp->mq_maxprio) {
		errno = EINVAL;
		return (-1);
	}
	if (msg_len > mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (-1);
	}

	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
		err = sem_trywait(&mqhp->mq_notfull);
	else {
		/*
		 * We might get cancelled here...
		 */
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notfull);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notfull, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
	}
	if (err == -1) {
		/*
		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
		 * by sem_*wait(), so we can just return.
		 */
		return (-1);
	}

	/*
	 * By the time we're here, we know that we've got the capacity
	 * to add to the queue...now acquire the exclusive lock.
	 */
	(void) mutex_lock(&mqhp->mq_exclusive);

	/*
	 * Now determine if we want to kick the notification.  POSIX
	 * requires that if a process has registered for notification,
	 * we must kick it when the queue makes an empty to non-empty
	 * transition, and there are no blocked receivers.  Note that
	 * this mechanism does _not_ guarantee that the kicked process
	 * will be able to receive a message without blocking;
	 * another receiver could intervene in the meantime.  Thus,
	 * the notification mechanism is inherently racy; all we can
	 * do is hope to minimize the window as much as possible.
	 * In general, we want to avoid kicking the notification when
	 * there are clearly receivers blocked.  We'll determine if
	 * we want to kick the notification before the mq_putmsg(),
	 * but the actual signotify() won't be done until the message
	 * is on the queue.
	 */
	if (mqhp->mq_sigid.sn_pid != 0) {
		int nmessages, nblocked;

		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);

		if (nmessages == 0 && nblocked == 0)
			notify = 1;
	}

	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
	(void) sem_post(&mqhp->mq_notempty);

	if (notify) {
		/* notify and also delete the registration */
		(void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
		if (mqhp->mq_ntype == SIGEV_THREAD ||
		    mqhp->mq_ntype == SIGEV_PORT)
			(void) sem_post(&mqhp->mq_spawner);
		mqhp->mq_ntype = 0;
		mqhp->mq_des = 0;
	}

	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (0);
}

int
_mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
{
	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
	    NULL, ABS_TIME));
}

int
_mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
	uint_t msg_prio, const timespec_t *abs_timeout)
{
	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
	    abs_timeout, ABS_TIME));
}

int
_mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
	uint_t msg_prio, const timespec_t *rel_timeout)
{
	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
	    rel_timeout, REL_TIME));
}
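
/*
 * Caller-side sketch for the send paths above (not part of this file).
 * The timeout passed to mq_timedsend() is an absolute CLOCK_REALTIME
 * time, while mq_reltimedsend_np() takes an interval relative to now;
 * on timeout both fail with ETIMEDOUT, as set by sem_*wait() above.
 *
 *	timespec_t ts;
 *
 *	(void) clock_gettime(CLOCK_REALTIME, &ts);
 *	ts.tv_sec += 5;
 *	if (mq_timedsend(mqd, buf, len, prio, &ts) == -1 &&
 *	    errno == ETIMEDOUT) {
 *		the queue stayed full for five seconds
 *	}
 */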

static void
decrement_rblocked(mqhdr_t *mqhp)
{
	int cancel_state;

	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
	while (sem_wait(&mqhp->mq_rblocked) == -1)
		continue;
	(void) pthread_setcancelstate(cancel_state, NULL);
}

static ssize_t
__mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	ssize_t	msg_size;
	int err;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*receive() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
		errno = EBADF;
		return (ssize_t)(-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_len < mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (ssize_t)(-1);
	}

	/*
	 * The semaphoring scheme for mq_[timed]receive is a little hairy
	 * thanks to POSIX.1b's arcane notification mechanism.  First,
	 * we try to take the common case and do a sem_trywait().
	 * If that doesn't work, and O_NONBLOCK hasn't been set,
	 * then note that we're going to sleep by incrementing the rblocked
	 * semaphore.  We decrement that semaphore after waking up.
	 */
	if (sem_trywait(&mqhp->mq_notempty) == -1) {
		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
			/*
			 * errno has been set to EAGAIN or EINTR by
			 * sem_trywait(), so we can just return.
			 */
			return (-1);
		}
		/*
		 * If we're here, then we're probably going to block...
		 * increment the rblocked semaphore.  If we get
		 * cancelled, decrement_rblocked() will decrement it.
		 */
		(void) sem_post(&mqhp->mq_rblocked);

		pthread_cleanup_push(decrement_rblocked, mqhp);
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notempty);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notempty, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
		pthread_cleanup_pop(1);

		if (err == -1) {
			/*
			 * We took a signal or timeout while waiting
			 * on mq_notempty...
			 */
			return (-1);
		}
	}

	(void) mutex_lock(&mqhp->mq_exclusive);
	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
	(void) sem_post(&mqhp->mq_notfull);
	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (msg_size);
}

ssize_t
_mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
{
	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
	    NULL, ABS_TIME));
}

ssize_t
_mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *abs_timeout)
{
	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
	    abs_timeout, ABS_TIME));
}

ssize_t
_mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *rel_timeout)
{
	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
	    rel_timeout, REL_TIME));
}
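
/*
 * Caller-side sketch for the receive paths above (not part of this
 * file): the receive buffer must be at least mq_msgsize bytes, or
 * __mq_timedreceive() fails with EMSGSIZE.
 *
 *	struct mq_attr attr;
 *	char *buf;
 *	uint_t prio;
 *	ssize_t n;
 *
 *	(void) mq_getattr(mqd, &attr);
 *	buf = malloc(attr.mq_msgsize);
 *	n = mq_receive(mqd, buf, attr.mq_msgsize, &prio);
 */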

/*
 * Only used below, in _mq_notify().
 * We already have a spawner thread.
 * Verify that the attributes match; cancel it if necessary.
 */
static int
cancel_if_necessary(thread_communication_data_t *tcdp,
	const struct sigevent *sigevp)
{
	int do_cancel = !_pthread_attr_equal(tcdp->tcd_attrp,
	    sigevp->sigev_notify_attributes);

	if (do_cancel) {
		/*
		 * Attributes don't match, cancel the spawner thread.
		 */
		(void) pthread_cancel(tcdp->tcd_server_id);
	} else {
		/*
		 * Reuse the existing spawner thread with possibly
		 * changed notification function and value.
		 */
		tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
		tcdp->tcd_notif.sigev_signo = 0;
		tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
		tcdp->tcd_notif.sigev_notify_function =
		    sigevp->sigev_notify_function;
	}

	return (do_cancel);
}

int
_mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	siginfo_t mq_siginfo;
	struct sigevent sigevent;
	struct stat64 statb;
	port_notify_t *pn;
	void *userval;
	int rval = -1;
	int ntype;
	int port;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	(void) mutex_lock(&mqhp->mq_exclusive);

	if (sigevp == NULL) {		/* remove notification */
		if (mqhp->mq_des == (uintptr_t)mqdp &&
		    mqhp->mq_sigid.sn_pid == getpid()) {
			/* notification is set for this descriptor, remove it */
			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
			if ((tcdp = mqdp->mqd_tcd) != NULL) {
				sig_mutex_lock(&tcdp->tcd_lock);
				if (tcdp->tcd_msg_enabled) {
					/* cancel the spawner thread */
					tcdp = mqdp->mqd_tcd;
					mqdp->mqd_tcd = NULL;
					(void) pthread_cancel(
					    tcdp->tcd_server_id);
				}
				sig_mutex_unlock(&tcdp->tcd_lock);
			}
			mqhp->mq_ntype = 0;
			mqhp->mq_des = 0;
		} else {
			/* notification is not set for this descriptor */
			errno = EBUSY;
			goto bad;
		}
	} else {		/* register notification with this process */
		switch (ntype = sigevp->sigev_notify) {
		case SIGEV_THREAD:
			userval = sigevp->sigev_value.sival_ptr;
			port = -1;
			break;
		case SIGEV_PORT:
			pn = sigevp->sigev_value.sival_ptr;
			userval = pn->portnfy_user;
			port = pn->portnfy_port;
			if (fstat64(port, &statb) != 0 ||
			    !S_ISPORT(statb.st_mode)) {
				errno = EBADF;
				goto bad;
			}
			(void) memset(&sigevent, 0, sizeof (sigevent));
			sigevent.sigev_notify = SIGEV_PORT;
			sigevp = &sigevent;
			break;
		}
		switch (ntype) {
		case SIGEV_NONE:
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_SIGNAL:
			mq_siginfo.si_signo = sigevp->sigev_signo;
			mq_siginfo.si_value = sigevp->sigev_value;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_THREAD:
			if ((tcdp = mqdp->mqd_tcd) != NULL &&
			    cancel_if_necessary(tcdp, sigevp))
				mqdp->mqd_tcd = NULL;
			/* FALLTHROUGH */
		case SIGEV_PORT:
			if ((tcdp = mqdp->mqd_tcd) == NULL) {
				/* we must create a spawner thread */
				tcdp = setup_sigev_handler(sigevp, MQ);
				if (tcdp == NULL) {
					errno = EBADF;
					goto bad;
				}
				tcdp->tcd_msg_enabled = 0;
				tcdp->tcd_msg_closing = 0;
				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
				if (launch_spawner(tcdp) != 0) {
					free_sigev_handler(tcdp);
					goto bad;
				}
				mqdp->mqd_tcd = tcdp;
			}
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		default:
			errno = EINVAL;
			goto bad;
		}

		/* register notification */
		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
			goto bad;
		mqhp->mq_ntype = ntype;
		mqhp->mq_des = (uintptr_t)mqdp;
		switch (ntype) {
		case SIGEV_THREAD:
		case SIGEV_PORT:
			tcdp->tcd_port = port;
			tcdp->tcd_msg_object = mqdp;
			tcdp->tcd_msg_userval = userval;
			sig_mutex_lock(&tcdp->tcd_lock);
			tcdp->tcd_msg_enabled = ntype;
			sig_mutex_unlock(&tcdp->tcd_lock);
			(void) cond_broadcast(&tcdp->tcd_cv);
			break;
		}
	}

	rval = 0;	/* success */
bad:
	(void) mutex_unlock(&mqhp->mq_exclusive);
	return (rval);
}
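
/*
 * Sketch of registering for SIGEV_THREAD notification via the code
 * above (not part of this file; the function and variable names are
 * only examples).  A registration is deleted once it fires, so the
 * notify function typically re-registers before draining the queue.
 *
 *	static void
 *	notify_func(union sigval sv)
 *	{
 *		mqd_t mqd = *(mqd_t *)sv.sival_ptr;
 *		re-register with mq_notify(), then drain with mq_receive()
 *	}
 *
 *	struct sigevent se;
 *
 *	se.sigev_notify = SIGEV_THREAD;
 *	se.sigev_value.sival_ptr = &mqd;
 *	se.sigev_notify_function = notify_func;
 *	se.sigev_notify_attributes = NULL;
 *	if (mq_notify(mqd, &se) == -1)
 *		perror("mq_notify");
 */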

int
_mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	uint_t flag = 0;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	/* store current attributes */
	if (omqstat != NULL) {
		int count;

		mqhp = mqdp->mqd_mq;
		omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
		omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
		omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
		(void) sem_getvalue(&mqhp->mq_notempty, &count);
		omqstat->mq_curmsgs = count;
	}

	/* set description attributes */
	if ((mqstat->mq_flags & O_NONBLOCK) != 0)
		flag = FNONBLOCK;
	mqdp->mqd_mqdn->mqdn_flags = flag;

	return (0);
}

int
_mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int count;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
	mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
	mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
	(void) sem_getvalue(&mqhp->mq_notempty, &count);
	mqstat->mq_curmsgs = count;
	return (0);
}

/*
 * Cleanup after fork1() in the child process.
 */
void
postfork1_child_sigev_mq(void)
{
	thread_communication_data_t *tcdp;
	mqdes_t *mqdp;

	for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
		if ((tcdp = mqdp->mqd_tcd) != NULL) {
			mqdp->mqd_tcd = NULL;
			tcd_teardown(tcdp);
		}
	}
}
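
/*
 * Caller-side sketch for _mq_setattr()/_mq_getattr() above (not part of
 * this file): only O_NONBLOCK in mq_flags is honored here; mq_maxmsg and
 * mq_msgsize are fixed when the queue is created.
 *
 *	struct mq_attr newattr, oldattr;
 *
 *	newattr.mq_flags = O_NONBLOCK;
 *	if (mq_setattr(mqd, &newattr, &oldattr) == 0) {
 *		subsequent sends/receives fail with EAGAIN instead of
 *		blocking; oldattr holds the previous settings
 *	}
 */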