/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 * Copyright 2025 MNX Cloud, Inc.
 */

#include "lint.h"
#include "mtlib.h"
/* _KMEMUSER exposes the kernel-private limits we need from <sys/param.h> */
#define	_KMEMUSER
#include <sys/param.h>		/* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
#undef	_KMEMUSER
#include <mqueue.h>
#include <sys/types.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <errno.h>
#include <stdarg.h>
#include <limits.h>
#include <pthread.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <inttypes.h>
#include "sigev_thread.h"
#include "pos4obj.h"

/*
 * Default values per message queue
 */
#define	MQ_MAXMSG	128
#define	MQ_MAXSIZE	1024

#define	MQ_MAGIC	0x4d534751		/* "MSGQ" */

/*
 * Message header which is part of messages in link list
 */
typedef struct {
	uint64_t	msg_next;	/* offset of next message in the link */
	uint64_t	msg_len;	/* length of the message */
} msghdr_t;

/*
 * message queue description
 */
struct mq_dn {
	size_t	mqdn_flags;	/* open description flags */
};

/*
 * message queue descriptor structure
 */
typedef struct mq_des {
	struct mq_des	*mqd_next;	/* list of all open mq descriptors, */
	struct mq_des	*mqd_prev;	/* needed for fork-safety */
	int		mqd_magic;	/* magic # to identify mq_des */
	int		mqd_flags;	/* operation flag per open */
	struct mq_header *mqd_mq;	/* address pointer of message Q */
	struct mq_dn	*mqd_mqdn;	/* open description */
	thread_communication_data_t *mqd_tcd;	/* SIGEV_THREAD notification */
	int		mqd_ownerdead;	/* mq_exclusive is inconsistent */
} mqdes_t;

/*
 * message queue common header, part of the mmap()ed file.
 * Since message queues may be shared between 32- and 64-bit processes,
 * care must be taken to make sure that the elements of this structure
 * are identical for both _LP64 and _ILP32 cases.
 */
typedef struct mq_header {
	/* first field must be mq_totsize, DO NOT insert before this */
	int64_t		mq_totsize;	/* total size of the Queue */
	int64_t		mq_maxsz;	/* max size of each message */
	uint32_t	mq_maxmsg;	/* max messages in the queue */
	uint32_t	mq_maxprio;	/* maximum mqueue priority */
	uint32_t	mq_curmaxprio;	/* current maximum MQ priority */
	uint32_t	mq_mask;	/* priority bitmask */
	uint64_t	mq_freep;	/* free message's head pointer */
	uint64_t	mq_headpp;	/* pointer to head pointers */
	uint64_t	mq_tailpp;	/* pointer to tail pointers */
	signotify_id_t	mq_sigid;	/* notification id (3 int's) */
	uint32_t	mq_ntype;	/* notification type (SIGEV_*) */
	uint64_t	mq_des;		/* pointer to msg Q descriptor */
	mutex_t		mq_exclusive;	/* acquire for exclusive access */
	sem_t		mq_rblocked;	/* number of processes rblocked */
	sem_t		mq_notfull;	/* mq_send()'s block on this */
	sem_t		mq_notempty;	/* mq_receive()'s block on this */
	sem_t		mq_spawner;	/* spawner thread blocks on this */
} mqhdr_t;

/*
 * The code assumes that _MQ_OPEN_MAX
== -1 or "no fixed implementation limit". 116 * If this assumption is somehow invalidated, mq_open() needs to be changed 117 * back to the old version which kept a count and enforced a limit. 118 * We make sure that this is pointed out to those changing <sys/param.h> 119 * by checking _MQ_OPEN_MAX at compile time. 120 */ 121 #if _MQ_OPEN_MAX != -1 122 #error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing." 123 #endif 124 125 #define MQ_ALIGNSIZE 8 /* 64-bit alignment */ 126 127 #ifdef DEBUG 128 #define MQ_ASSERT(x) assert(x); 129 130 #define MQ_ASSERT_PTR(_m, _p) \ 131 assert((_p) != 0 && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \ 132 !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \ 133 _m->mq_totsize)); 134 135 #define MQ_ASSERT_SEMVAL_LEQ(sem, val) { \ 136 int _val; \ 137 (void) sem_getvalue((sem), &_val); \ 138 assert((_val) <= val); } 139 #else 140 #define MQ_ASSERT(x) 141 #define MQ_ASSERT_PTR(_m, _p) 142 #define MQ_ASSERT_SEMVAL_LEQ(sem, val) 143 #endif 144 145 #define MQ_PTR(m, n) ((msghdr_t *)((uintptr_t)m + (uintptr_t)n)) 146 #define HEAD_PTR(m, n) ((uint64_t *)((uintptr_t)m + \ 147 (uintptr_t)m->mq_headpp + n * sizeof (uint64_t))) 148 #define TAIL_PTR(m, n) ((uint64_t *)((uintptr_t)m + \ 149 (uintptr_t)m->mq_tailpp + n * sizeof (uint64_t))) 150 151 #define MQ_RESERVED ((mqdes_t *)-1) 152 153 #define ABS_TIME 0 154 #define REL_TIME 1 155 156 static mutex_t mq_list_lock = DEFAULTMUTEX; 157 static mqdes_t *mq_list = NULL; 158 159 extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id); 160 161 static int 162 mq_is_valid(mqdes_t *mqdp) 163 { 164 /* 165 * Any use of a message queue after it was closed is 166 * undefined. But the standard strongly favours EBADF 167 * returns. Before we dereference which could be fatal, 168 * we first do some pointer sanity checks. 
169 */ 170 if (mqdp != NULL && mqdp != MQ_RESERVED && 171 ((uintptr_t)mqdp & 0x7) == 0) { 172 return (mqdp->mqd_magic == MQ_MAGIC); 173 } 174 175 return (0); 176 } 177 178 static void 179 mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg) 180 { 181 int i; 182 uint64_t temp; 183 uint64_t currentp; 184 uint64_t nextp; 185 186 /* 187 * We only need to initialize the non-zero fields. The use of 188 * ftruncate() on the message queue file assures that the 189 * pages will be zero-filled. 190 */ 191 (void) mutex_init(&mqhp->mq_exclusive, 192 USYNC_PROCESS | LOCK_ROBUST, NULL); 193 (void) sem_init(&mqhp->mq_rblocked, 1, 0); 194 (void) sem_init(&mqhp->mq_notempty, 1, 0); 195 (void) sem_init(&mqhp->mq_spawner, 1, 0); 196 (void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg); 197 198 mqhp->mq_maxsz = msgsize; 199 mqhp->mq_maxmsg = maxmsg; 200 201 /* 202 * As of this writing (1997), there are 32 message queue priorities. 203 * If this is to change, then the size of the mq_mask will 204 * also have to change. If DEBUG is defined, assert that 205 * _MQ_PRIO_MAX hasn't changed. 206 */ 207 mqhp->mq_maxprio = _MQ_PRIO_MAX; 208 #if defined(DEBUG) 209 /* LINTED always true */ 210 MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX); 211 #endif 212 213 /* 214 * Since the message queue can be mapped into different 215 * virtual address ranges by different processes, we don't 216 * keep track of pointers, only offsets into the shared region. 
217 */ 218 mqhp->mq_headpp = sizeof (mqhdr_t); 219 mqhp->mq_tailpp = mqhp->mq_headpp + 220 mqhp->mq_maxprio * sizeof (uint64_t); 221 mqhp->mq_freep = mqhp->mq_tailpp + 222 mqhp->mq_maxprio * sizeof (uint64_t); 223 224 currentp = mqhp->mq_freep; 225 MQ_PTR(mqhp, currentp)->msg_next = 0; 226 227 temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1); 228 for (i = 1; i < mqhp->mq_maxmsg; i++) { 229 nextp = currentp + sizeof (msghdr_t) + temp; 230 MQ_PTR(mqhp, currentp)->msg_next = nextp; 231 MQ_PTR(mqhp, nextp)->msg_next = 0; 232 currentp = nextp; 233 } 234 } 235 236 static size_t 237 mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio) 238 { 239 uint64_t currentp; 240 msghdr_t *curbuf; 241 uint64_t *headpp; 242 uint64_t *tailpp; 243 244 MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive)); 245 246 /* 247 * Get the head and tail pointers for the queue of maximum 248 * priority. We shouldn't be here unless there is a message for 249 * us, so it's fair to assert that both the head and tail 250 * pointers are non-NULL. 251 */ 252 headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio); 253 tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio); 254 255 if (msg_prio != NULL) 256 *msg_prio = mqhp->mq_curmaxprio; 257 258 currentp = *headpp; 259 MQ_ASSERT_PTR(mqhp, currentp); 260 curbuf = MQ_PTR(mqhp, currentp); 261 262 if ((*headpp = curbuf->msg_next) == 0) { 263 /* 264 * We just nuked the last message in this priority's queue. 265 * Twiddle this priority's bit, and then find the next bit 266 * tipped. 267 */ 268 uint_t prio = mqhp->mq_curmaxprio; 269 270 mqhp->mq_mask &= ~(1u << prio); 271 272 for (; prio != 0; prio--) 273 if (mqhp->mq_mask & (1u << prio)) 274 break; 275 mqhp->mq_curmaxprio = prio; 276 277 *tailpp = 0; 278 } 279 280 /* 281 * Copy the message, and put the buffer back on the free list. 
282 */ 283 (void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len); 284 curbuf->msg_next = mqhp->mq_freep; 285 mqhp->mq_freep = currentp; 286 287 return (curbuf->msg_len); 288 } 289 290 291 static void 292 mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio) 293 { 294 uint64_t currentp; 295 msghdr_t *curbuf; 296 uint64_t *headpp; 297 uint64_t *tailpp; 298 299 MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive)); 300 301 /* 302 * Grab a free message block, and link it in. We shouldn't 303 * be here unless there is room in the queue for us; it's 304 * fair to assert that the free pointer is non-NULL. 305 */ 306 currentp = mqhp->mq_freep; 307 MQ_ASSERT_PTR(mqhp, currentp); 308 curbuf = MQ_PTR(mqhp, currentp); 309 310 /* 311 * Remove a message from the free list, and copy in the new contents. 312 */ 313 mqhp->mq_freep = curbuf->msg_next; 314 curbuf->msg_next = 0; 315 (void) memcpy((char *)&curbuf[1], msgp, len); 316 curbuf->msg_len = len; 317 318 headpp = HEAD_PTR(mqhp, prio); 319 tailpp = TAIL_PTR(mqhp, prio); 320 321 if (*tailpp == 0) { 322 /* 323 * This is the first message on this queue. Set the 324 * head and tail pointers, and tip the appropriate bit 325 * in the priority mask. 326 */ 327 *headpp = currentp; 328 *tailpp = currentp; 329 mqhp->mq_mask |= (1u << prio); 330 if (prio > mqhp->mq_curmaxprio) 331 mqhp->mq_curmaxprio = prio; 332 } else { 333 MQ_ASSERT_PTR(mqhp, *tailpp); 334 MQ_PTR(mqhp, *tailpp)->msg_next = currentp; 335 *tailpp = currentp; 336 } 337 } 338 339 /* 340 * Send a notification and also delete the registration. 341 */ 342 static void 343 do_notify(mqhdr_t *mqhp) 344 { 345 (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid); 346 if (mqhp->mq_ntype == SIGEV_THREAD || 347 mqhp->mq_ntype == SIGEV_PORT) 348 (void) sem_post(&mqhp->mq_spawner); 349 mqhp->mq_ntype = 0; 350 mqhp->mq_des = 0; 351 } 352 353 /* 354 * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE. 
 * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
 * they fail with errno == EBADMSG.  Trigger any registered notification.
 */
static void
owner_dead(mqdes_t *mqdp, int error)
{
	mqhdr_t *mqhp = mqdp->mqd_mq;

	mqdp->mqd_ownerdead = 1;
	(void) sem_post(&mqhp->mq_notfull);
	(void) sem_post(&mqhp->mq_notempty);
	if (error == EOWNERDEAD) {
		if (mqhp->mq_sigid.sn_pid != 0)
			do_notify(mqhp);
		/* EOWNERDEAD means we still hold the lock; drop it */
		(void) mutex_unlock(&mqhp->mq_exclusive);
	}
	errno = EBADMSG;
}

/*
 * Open (and possibly create) a POSIX message queue.
 * The queue lives in three pos4obj-managed files: a permission file,
 * an mmap()ed data file holding the mqhdr_t and message buffers, and
 * an mmap()ed-then-unlinked description file holding the per-open
 * O_NONBLOCK flag.  The cr_flag bits (PFILE_CREATE, DFILE_CREATE,
 * DFILE_OPEN, DFILE_MMAP, MQDNP_MMAP) record exactly what has been
 * created/opened/mapped so that the "out:" error path can undo it.
 */
mqd_t
mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
{
	va_list ap;
	mode_t mode = 0;
	struct mq_attr *attr = NULL;
	int fd;
	int err;
	int cr_flag = 0;
	int locked = 0;
	uint64_t total_size;
	size_t msgsize;
	ssize_t maxmsg;
	uint64_t temp;
	void *ptr;
	mqdes_t *mqdp = NULL;
	mqhdr_t *mqhp;
	struct mq_dn *mqdnp;

	if (__pos4obj_check(path) == -1)
		return ((mqd_t)-1);

	/* acquire MSGQ lock to have atomic operation */
	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
		goto out;
	locked = 1;

	va_start(ap, oflag);
	/* filter oflag to have READ/WRITE/CREATE modes only */
	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
	if ((oflag & O_CREAT) != 0) {
		mode = va_arg(ap, mode_t);
		attr = va_arg(ap, struct mq_attr *);
	}
	va_end(ap);

	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
	    mode, &cr_flag)) < 0)
		goto out;

	/* closing permission file */
	(void) __close_nc(fd);

	/* Try to open/create data file */
	if (cr_flag) {
		cr_flag = PFILE_CREATE;
		if (attr == NULL) {
			maxmsg = MQ_MAXMSG;
			msgsize = MQ_MAXSIZE;
		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
			errno = EINVAL;
			goto out;
		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
			/* mq_notfull is a counting semaphore; respect its max */
			errno = ENOSPC;
			goto out;
		} else {
			maxmsg = attr->mq_maxmsg;
			msgsize = attr->mq_msgsize;
		}

		/* adjust for message size at word boundary */
		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);

		total_size = sizeof (mqhdr_t) +
		    maxmsg * (temp + sizeof (msghdr_t)) +
		    2 * _MQ_PRIO_MAX * sizeof (uint64_t);

		if (total_size > SSIZE_MAX) {
			errno = ENOSPC;
			goto out;
		}

		/*
		 * data file is opened with read/write to those
		 * who have read or write permission
		 */
		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
			goto out;

		cr_flag |= DFILE_CREATE | DFILE_OPEN;

		/* force permissions to avoid umask effect */
		if (fchmod(fd, mode) < 0)
			goto out;

		if (ftruncate64(fd, (off64_t)total_size) < 0)
			goto out;
	} else {
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    O_RDWR, 0666, &err)) < 0)
			goto out;
		cr_flag = DFILE_OPEN;

		/*
		 * Message queue has not been initialized yet.
		 * (mq_totsize is the first field of mqhdr_t and is only
		 * written after mq_init() completes, so a zero read here
		 * means the creator has not finished setting it up.)
		 */
		if (read(fd, &total_size, sizeof (total_size)) !=
		    sizeof (total_size) || total_size == 0) {
			errno = ENOENT;
			goto out;
		}

		/* Message queue too big for this process to handle */
		if (total_size > SSIZE_MAX) {
			errno = EFBIG;
			goto out;
		}
	}

	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
		errno = ENOMEM;
		goto out;
	}

	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqhp = ptr;
	cr_flag |= DFILE_MMAP;

	/* closing data file */
	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * create, unlink, size, mmap, and close description file
	 * all for a flag word in anonymous shared memory
	 */
	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
	    0666, &err)) < 0)
		goto out;
	cr_flag |= DFILE_OPEN;
	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
		goto out;

	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqdnp = ptr;
	cr_flag |= MQDNP_MMAP;

	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * we follow the same strategy as filesystem open() routine,
	 * where fcntl.h flags are changed to flags defined in file.h.
	 */
	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);

	/* new message queue requires initialization */
	if ((cr_flag & DFILE_CREATE) != 0) {
		/* message queue header has to be initialized */
		mq_init(mqhp, msgsize, maxmsg);
		/* publishing mq_totsize marks the queue as initialized */
		mqhp->mq_totsize = total_size;
	}
	mqdp->mqd_mq = mqhp;
	mqdp->mqd_mqdn = mqdnp;
	mqdp->mqd_magic = MQ_MAGIC;
	mqdp->mqd_tcd = NULL;
	mqdp->mqd_ownerdead = 0;
	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
		/* link into the global descriptor list for fork-safety */
		lmutex_lock(&mq_list_lock);
		mqdp->mqd_next = mq_list;
		mqdp->mqd_prev = NULL;
		if (mq_list)
			mq_list->mqd_prev = mqdp;
		mq_list = mqdp;
		lmutex_unlock(&mq_list_lock);
		return ((mqd_t)mqdp);
	}

	locked = 0;	/* fall into the error case */
out:
	err = errno;
	if ((cr_flag & DFILE_OPEN) != 0)
		(void) __close_nc(fd);
	if ((cr_flag & DFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
	if ((cr_flag & PFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
	free(mqdp);
	if ((cr_flag & DFILE_MMAP) != 0)
		(void) munmap((caddr_t)mqhp, (size_t)total_size);
	if ((cr_flag & MQDNP_MMAP) != 0)
		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	if (locked)
		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
	errno = err;
	return ((mqd_t)-1);
}

/*
 * Final teardown of a descriptor, run as a cancellation cleanup handler
 * from mq_close() so it executes even if del_sigev_mq() is cancelled.
 */
static void
mq_close_cleanup(mqdes_t *mqdp)
{
	mqhdr_t *mqhp = mqdp->mqd_mq;
	struct mq_dn *mqdnp = mqdp->mqd_mqdn;

	/* invalidate the descriptor before freeing it */
	mqdp->mqd_magic = 0;
	if (!mqdp->mqd_ownerdead)
		(void) mutex_unlock(&mqhp->mq_exclusive);

	lmutex_lock(&mq_list_lock);
	if (mqdp->mqd_next)
		mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
	if (mqdp->mqd_prev)
		mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
	if (mq_list == mqdp)
		mq_list = mqdp->mqd_next;
	lmutex_unlock(&mq_list_lock);

	free(mqdp);
	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	(void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
}

/*
 * Close a message queue descriptor.  Cancels any notification that was
 * registered through this descriptor and tears down a SIGEV_THREAD
 * spawner if one exists.  Returns 0, or -1 with errno == EBADF for an
 * invalid descriptor.
 */
int
mq_close(mqd_t mqdes)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	int error;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;
	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/* robust mutex owner died; queue state may be inconsistent */
		mqdp->mqd_ownerdead = 1;
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	if (mqhp->mq_des == (uintptr_t)mqdp &&
	    mqhp->mq_sigid.sn_pid == getpid()) {
		/* notification is set for this descriptor, remove it */
		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
		mqhp->mq_ntype = 0;
		mqhp->mq_des = 0;
	}

	pthread_cleanup_push(mq_close_cleanup, mqdp);
	if ((tcdp = mqdp->mqd_tcd) != NULL) {
		mqdp->mqd_tcd = NULL;
		del_sigev_mq(tcdp);	/* possible cancellation point */
	}
	pthread_cleanup_pop(1);	/* finish in the cleanup handler */

	return (0);
}

/*
 * Remove a message queue name.  Unlinks the permission file and, if
 * that succeeds (or the name lingers with EEXIST), the data file too.
 */
int
mq_unlink(const char *path)
{
	int err;

	if (__pos4obj_check(path) < 0)
		return (-1);

	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
		return (-1);
	}

	err = __pos4obj_unlink(path, MQ_PERM_TYPE);

	if (err == 0 || (err == -1 && errno == EEXIST)) {
		errno = 0;
		err = __pos4obj_unlink(path, MQ_DATA_TYPE);
	}

	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
		return (-1);

	return (err);

}

/*
 * Common implementation for mq_send(), mq_timedsend() and
 * mq_reltimedsend_np().  timeout == NULL means block indefinitely;
 * abs_rel selects absolute vs relative timeout interpretation.
 */
static int
__mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
    uint_t msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int err;
	int notify = 0;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*send() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_prio >= mqhp->mq_maxprio) {
		errno = EINVAL;
		return (-1);
	}
	if (msg_len > mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (-1);
	}

	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
		err = sem_trywait(&mqhp->mq_notfull);
	else {
		/*
		 * We might get cancelled here...
		 */
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notfull);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notfull, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
	}
	if (err == -1) {
		/*
		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
		 * by sem_*wait(), so we can just return.
		 */
		return (-1);
	}

	/*
	 * By the time we're here, we know that we've got the capacity
	 * to add to the queue...now acquire the exclusive lock.
	 */
	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		owner_dead(mqdp, err);
		return (-1);
	}

	/*
	 * Now determine if we want to kick the notification.  POSIX
	 * requires that if a process has registered for notification,
	 * we must kick it when the queue makes an empty to non-empty
	 * transition, and there are no blocked receivers.  Note that
	 * this mechanism does _not_ guarantee that the kicked process
	 * will be able to receive a message without blocking;
	 * another receiver could intervene in the meantime.  Thus,
	 * the notification mechanism is inherently racy; all we can
	 * do is hope to minimize the window as much as possible.
	 * In general, we want to avoid kicking the notification when
	 * there are clearly receivers blocked.  We'll determine if
	 * we want to kick the notification before the mq_putmsg(),
	 * but the actual signotify() won't be done until the message
	 * is on the queue.
	 */
	if (mqhp->mq_sigid.sn_pid != 0) {
		int nmessages, nblocked;

		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);

		if (nmessages == 0 && nblocked == 0)
			notify = 1;
	}

	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
	(void) sem_post(&mqhp->mq_notempty);

	if (notify) {
		/* notify and also delete the registration */
		do_notify(mqhp);
	}

	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (0);
}

/* blocking send, no timeout */
int
mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
{
	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
	    NULL, ABS_TIME));
}

/* send with an absolute (CLOCK_REALTIME) timeout */
int
mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
    uint_t msg_prio, const timespec_t *abs_timeout)
{
	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
	    abs_timeout, ABS_TIME));
}

/* send with a relative timeout (non-standard extension) */
int
mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
    uint_t msg_prio, const timespec_t *rel_timeout)
{
	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
	    rel_timeout, REL_TIME));
}

/*
 * Undo the sem_post(&mq_rblocked) done before blocking in
 * __mq_timedreceive(); also run as its cancellation cleanup handler.
 * Cancellation is disabled so the sem_wait() itself cannot be a
 * cancellation point while we are cleaning up.
 */
static void
decrement_rblocked(mqhdr_t *mqhp)
{
	int cancel_state;

	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
	while (sem_wait(&mqhp->mq_rblocked) == -1)
		continue;
	(void) pthread_setcancelstate(cancel_state, NULL);
}

/*
 * Common implementation for mq_receive(), mq_timedreceive() and
 * mq_reltimedreceive_np().  timeout == NULL means block indefinitely;
 * abs_rel selects absolute vs relative timeout interpretation.
 * Returns the received message's length, or -1 with errno set.
 */
static ssize_t
__mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
    uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	ssize_t	msg_size;
	int err;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*receive() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
		errno = EBADF;
		return (ssize_t)(-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_len < mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (ssize_t)(-1);
	}

	/*
	 * The semaphoring scheme for mq_[timed]receive is a little hairy
	 * thanks to POSIX.1b's arcane notification mechanism.  First,
	 * we try to take the common case and do a sem_trywait().
	 * If that doesn't work, and O_NONBLOCK hasn't been set,
	 * then note that we're going to sleep by incrementing the rblocked
	 * semaphore.  We decrement that semaphore after waking up.
	 */
	if (sem_trywait(&mqhp->mq_notempty) == -1) {
		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
			/*
			 * errno has been set to EAGAIN or EINTR by
			 * sem_trywait(), so we can just return.
			 */
			return (-1);
		}
		/*
		 * If we're here, then we're probably going to block...
		 * increment the rblocked semaphore.  If we get
		 * cancelled, decrement_rblocked() will decrement it.
		 */
		(void) sem_post(&mqhp->mq_rblocked);

		pthread_cleanup_push(decrement_rblocked, mqhp);
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notempty);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notempty, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
		pthread_cleanup_pop(1);

		if (err == -1) {
			/*
			 * We took a signal or timeout while waiting
			 * on mq_notempty...
			 */
			return (-1);
		}
	}

	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		owner_dead(mqdp, err);
		return (-1);
	}
	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
	(void) sem_post(&mqhp->mq_notfull);
	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (msg_size);
}

/* blocking receive, no timeout */
ssize_t
mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
{
	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
	    NULL, ABS_TIME));
}

/* receive with an absolute (CLOCK_REALTIME) timeout */
ssize_t
mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
    uint_t *msg_prio, const timespec_t *abs_timeout)
{
	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
	    abs_timeout, ABS_TIME));
}

/* receive with a relative timeout (non-standard extension) */
ssize_t
mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
    uint_t *msg_prio, const timespec_t *rel_timeout)
{
	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
	    rel_timeout, REL_TIME));
}

/*
 * Only used below, in mq_notify().
 * We already have a spawner thread.
 * Verify that the attributes match; cancel it if necessary.
 * Returns nonzero if the spawner was cancelled.
 */
static int
cancel_if_necessary(thread_communication_data_t *tcdp,
    const struct sigevent *sigevp)
{
	int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp,
	    sigevp->sigev_notify_attributes);

	if (do_cancel) {
		/*
		 * Attributes don't match, cancel the spawner thread.
		 */
		(void) pthread_cancel(tcdp->tcd_server_id);
	} else {
		/*
		 * Reuse the existing spawner thread with possibly
		 * changed notification function and value.
		 */
		tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
		tcdp->tcd_notif.sigev_signo = 0;
		tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
		tcdp->tcd_notif.sigev_notify_function =
		    sigevp->sigev_notify_function;
	}

	return (do_cancel);
}

/*
 * Register (sigevp != NULL) or remove (sigevp == NULL) asynchronous
 * notification for the queue.  Supports SIGEV_NONE, SIGEV_SIGNAL,
 * SIGEV_THREAD and SIGEV_PORT.  If the robust mq_exclusive mutex is
 * found dead, the registration is torn down and -1/EBADMSG returned.
 */
int
mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	siginfo_t mq_siginfo;
	struct sigevent sigevent;
	struct stat64 statb;
	port_notify_t *pn;
	void *userval;
	int rval = -1;
	int ntype;
	int port;
	int error;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/* owner died: force the removal path below */
		mqdp->mqd_ownerdead = 1;
		sigevp = NULL;
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	if (sigevp == NULL) {		/* remove notification */
		if (mqhp->mq_des == (uintptr_t)mqdp &&
		    mqhp->mq_sigid.sn_pid == getpid()) {
			/* notification is set for this descriptor, remove it */
			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
			if ((tcdp = mqdp->mqd_tcd) != NULL) {
				sig_mutex_lock(&tcdp->tcd_lock);
				if (tcdp->tcd_msg_enabled) {
					/* cancel the spawner thread */
					tcdp = mqdp->mqd_tcd;
					mqdp->mqd_tcd = NULL;
					(void) pthread_cancel(
					    tcdp->tcd_server_id);
				}
				sig_mutex_unlock(&tcdp->tcd_lock);
			}
			mqhp->mq_ntype = 0;
			mqhp->mq_des = 0;
		} else {
			/* notification is not set for this descriptor */
			errno = EBUSY;
			goto bad;
		}
	} else {		/* register notification with this process */
		switch (ntype = sigevp->sigev_notify) {
		case SIGEV_THREAD:
			userval = sigevp->sigev_value.sival_ptr;
			port = -1;
			break;
		case SIGEV_PORT:
			/* sival_ptr carries the port_notify_t for ports */
			pn = sigevp->sigev_value.sival_ptr;
			userval = pn->portnfy_user;
			port = pn->portnfy_port;
			if (fstat64(port, &statb) != 0 ||
			    !S_ISPORT(statb.st_mode)) {
				errno = EBADF;
				goto bad;
			}
			(void) memset(&sigevent, 0, sizeof (sigevent));
			sigevent.sigev_notify = SIGEV_PORT;
			sigevp = &sigevent;
			break;
		}
		switch (ntype) {
		case SIGEV_NONE:
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_SIGNAL:
			mq_siginfo.si_signo = sigevp->sigev_signo;
			mq_siginfo.si_value = sigevp->sigev_value;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_THREAD:
			if ((tcdp = mqdp->mqd_tcd) != NULL &&
			    cancel_if_necessary(tcdp, sigevp))
				mqdp->mqd_tcd = NULL;
			/* FALLTHROUGH */
		case SIGEV_PORT:
			if ((tcdp = mqdp->mqd_tcd) == NULL) {
				/* we must create a spawner thread */
				tcdp = setup_sigev_handler(sigevp, MQ);
				if (tcdp == NULL) {
					errno = EBADF;
					goto bad;
				}
				tcdp->tcd_msg_enabled = 0;
				tcdp->tcd_msg_closing = 0;
				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
				if (launch_spawner(tcdp) != 0) {
					free_sigev_handler(tcdp);
					goto bad;
				}
				mqdp->mqd_tcd = tcdp;
			}
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		default:
			errno = EINVAL;
			goto bad;
		}

		/* register notification */
		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
			goto bad;
		mqhp->mq_ntype = ntype;
		mqhp->mq_des = (uintptr_t)mqdp;
		switch (ntype) {
		case SIGEV_THREAD:
		case SIGEV_PORT:
			/* hand the registration details to the spawner */
			tcdp->tcd_port = port;
			tcdp->tcd_msg_object = mqdp;
			tcdp->tcd_msg_userval = userval;
			sig_mutex_lock(&tcdp->tcd_lock);
			tcdp->tcd_msg_enabled = ntype;
			sig_mutex_unlock(&tcdp->tcd_lock);
			(void) cond_broadcast(&tcdp->tcd_cv);
			break;
		}
	}

	rval = 0;	/* success */
bad:
	if (error == 0) {
		(void) mutex_unlock(&mqhp->mq_exclusive);
	} else {
		/* lock was dead on entry: report the queue as inconsistent */
		errno = EBADMSG;
		rval = -1;
	}
	return (rval);
}

/*
 * Set the per-open O_NONBLOCK flag; optionally return the previous
 * attributes.  Only mq_flags is settable; the size attributes are
 * fixed at creation time.
 */
int
mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	uint_t flag = 0;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	/* store current attributes */
	if (omqstat != NULL) {
		int count;

		mqhp = mqdp->mqd_mq;
		omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
		omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
		omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
		/* mq_notempty's value is the current message count */
		(void) sem_getvalue(&mqhp->mq_notempty, &count);
		omqstat->mq_curmsgs = count;
	}

	/* set description attributes */
	if ((mqstat->mq_flags & O_NONBLOCK) != 0)
		flag = FNONBLOCK;
	mqdp->mqd_mqdn->mqdn_flags = flag;

	return (0);
}

/*
 * Return the queue's attributes and current message count.
 */
int
mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int count;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
	mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
	mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
	/* mq_notempty's value is the current message count */
	(void) sem_getvalue(&mqhp->mq_notempty, &count);
	mqstat->mq_curmsgs = count;
	return (0);
}

/*
 * Cleanup after fork1() in the child process.
 */
void
postfork1_child_sigev_mq(void)
{
	thread_communication_data_t *tcdp;
	mqdes_t *mqdp;

	for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
		if ((tcdp = mqdp->mqd_tcd) != NULL) {
			mqdp->mqd_tcd = NULL;
			tcd_teardown(tcdp);
		}
	}
}