1f841f6adSraf /* 2f841f6adSraf * CDDL HEADER START 3f841f6adSraf * 4f841f6adSraf * The contents of this file are subject to the terms of the 5f841f6adSraf * Common Development and Distribution License (the "License"). 6f841f6adSraf * You may not use this file except in compliance with the License. 7f841f6adSraf * 8f841f6adSraf * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9f841f6adSraf * or http://www.opensolaris.org/os/licensing. 10f841f6adSraf * See the License for the specific language governing permissions 11f841f6adSraf * and limitations under the License. 12f841f6adSraf * 13f841f6adSraf * When distributing Covered Code, include this CDDL HEADER in each 14f841f6adSraf * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15f841f6adSraf * If applicable, add the following below this CDDL HEADER, with the 16f841f6adSraf * fields enclosed by brackets "[]" replaced with your own identifying 17f841f6adSraf * information: Portions Copyright [yyyy] [name of copyright owner] 18f841f6adSraf * 19f841f6adSraf * CDDL HEADER END 20f841f6adSraf */ 21f841f6adSraf 22f841f6adSraf /* 23a574db85Sraf * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24f841f6adSraf * Use is subject to license terms. 25f841f6adSraf */ 26f841f6adSraf 27f841f6adSraf #pragma ident "%Z%%M% %I% %E% SMI" 28f841f6adSraf 297257d1b4Sraf #include "lint.h" 30f841f6adSraf #include "mtlib.h" 31f841f6adSraf #define _KMEMUSER 32f841f6adSraf #include <sys/param.h> /* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */ 33f841f6adSraf #undef _KMEMUSER 34f841f6adSraf #include <mqueue.h> 35f841f6adSraf #include <sys/types.h> 36f841f6adSraf #include <sys/file.h> 37f841f6adSraf #include <sys/mman.h> 38f841f6adSraf #include <errno.h> 39f841f6adSraf #include <stdarg.h> 40f841f6adSraf #include <limits.h> 41f841f6adSraf #include <pthread.h> 42f841f6adSraf #include <assert.h> 43f841f6adSraf #include <string.h> 44f841f6adSraf #include <unistd.h> 45f841f6adSraf #include <stdlib.h> 46f841f6adSraf #include <sys/stat.h> 47f841f6adSraf #include <inttypes.h> 48f841f6adSraf #include "sigev_thread.h" 49f841f6adSraf #include "pos4obj.h" 50f841f6adSraf 51f841f6adSraf /* 52f841f6adSraf * Default values per message queue 53f841f6adSraf */ 54f841f6adSraf #define MQ_MAXMSG 128 55f841f6adSraf #define MQ_MAXSIZE 1024 56f841f6adSraf 57f841f6adSraf #define MQ_MAGIC 0x4d534751 /* "MSGQ" */ 58f841f6adSraf 59f841f6adSraf /* 60f841f6adSraf * Message header which is part of messages in link list 61f841f6adSraf */ 62f841f6adSraf typedef struct { 63f841f6adSraf uint64_t msg_next; /* offset of next message in the link */ 64f841f6adSraf uint64_t msg_len; /* length of the message */ 65f841f6adSraf } msghdr_t; 66f841f6adSraf 67f841f6adSraf /* 68f841f6adSraf * message queue description 69f841f6adSraf */ 70f841f6adSraf struct mq_dn { 71f841f6adSraf size_t mqdn_flags; /* open description flags */ 72f841f6adSraf }; 73f841f6adSraf 74f841f6adSraf /* 75f841f6adSraf * message queue descriptor structure 76f841f6adSraf */ 77f841f6adSraf typedef struct mq_des { 78f841f6adSraf struct mq_des *mqd_next; /* list of all open mq descriptors, */ 79f841f6adSraf struct mq_des *mqd_prev; /* needed for fork-safety */ 80f841f6adSraf int mqd_magic; /* magic # to identify mq_des */ 81f841f6adSraf int mqd_flags; /* operation flag per open */ 82f841f6adSraf struct mq_header *mqd_mq; /* address pointer of message Q */ 83f841f6adSraf struct mq_dn *mqd_mqdn; /* open description */ 84f841f6adSraf thread_communication_data_t *mqd_tcd; /* SIGEV_THREAD notification */ 85*34537ab1Sraf int mqd_ownerdead; /* mq_exclusive is inconsistent */ 86f841f6adSraf } mqdes_t; 87f841f6adSraf 88f841f6adSraf /* 89f841f6adSraf * message queue common header, part of the mmap()ed file. 90f841f6adSraf * Since message queues may be shared between 32- and 64-bit processes, 91f841f6adSraf * care must be taken to make sure that the elements of this structure 92f841f6adSraf * are identical for both _LP64 and _ILP32 cases. 93f841f6adSraf */ 94f841f6adSraf typedef struct mq_header { 95f841f6adSraf /* first field must be mq_totsize, DO NOT insert before this */ 96f841f6adSraf int64_t mq_totsize; /* total size of the Queue */ 97f841f6adSraf int64_t mq_maxsz; /* max size of each message */ 98f841f6adSraf uint32_t mq_maxmsg; /* max messages in the queue */ 99f841f6adSraf uint32_t mq_maxprio; /* maximum mqueue priority */ 100f841f6adSraf uint32_t mq_curmaxprio; /* current maximum MQ priority */ 101f841f6adSraf uint32_t mq_mask; /* priority bitmask */ 102f841f6adSraf uint64_t mq_freep; /* free message's head pointer */ 103f841f6adSraf uint64_t mq_headpp; /* pointer to head pointers */ 104f841f6adSraf uint64_t mq_tailpp; /* pointer to tail pointers */ 105f841f6adSraf signotify_id_t mq_sigid; /* notification id (3 int's) */ 106f841f6adSraf uint32_t mq_ntype; /* notification type (SIGEV_*) */ 107f841f6adSraf uint64_t mq_des; /* pointer to msg Q descriptor */ 108f841f6adSraf mutex_t mq_exclusive; /* acquire for exclusive access */ 109f841f6adSraf sem_t mq_rblocked; /* number of processes rblocked */ 110f841f6adSraf sem_t mq_notfull; /* mq_send()'s block on this */ 111f841f6adSraf sem_t mq_notempty; /* mq_receive()'s block on this */ 112f841f6adSraf sem_t mq_spawner; /* spawner thread blocks on this */ 113f841f6adSraf } mqhdr_t; 114f841f6adSraf 115f841f6adSraf /* 116f841f6adSraf * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit". 117f841f6adSraf * If this assumption is somehow invalidated, mq_open() needs to be changed 118f841f6adSraf * back to the old version which kept a count and enforced a limit. 119f841f6adSraf * We make sure that this is pointed out to those changing <sys/param.h> 120f841f6adSraf * by checking _MQ_OPEN_MAX at compile time. 121f841f6adSraf */ 122f841f6adSraf #if _MQ_OPEN_MAX != -1 123f841f6adSraf #error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing." 124f841f6adSraf #endif 125f841f6adSraf 126f841f6adSraf #define MQ_ALIGNSIZE 8 /* 64-bit alignment */ 127f841f6adSraf 128f841f6adSraf #ifdef DEBUG 129f841f6adSraf #define MQ_ASSERT(x) assert(x); 130f841f6adSraf 131f841f6adSraf #define MQ_ASSERT_PTR(_m, _p) \ 132f841f6adSraf assert((_p) != NULL && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \ 133f841f6adSraf !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \ 134f841f6adSraf _m->mq_totsize)); 135f841f6adSraf 136f841f6adSraf #define MQ_ASSERT_SEMVAL_LEQ(sem, val) { \ 137f841f6adSraf int _val; \ 138f841f6adSraf (void) sem_getvalue((sem), &_val); \ 139f841f6adSraf assert((_val) <= val); } 140f841f6adSraf #else 141f841f6adSraf #define MQ_ASSERT(x) 142f841f6adSraf #define MQ_ASSERT_PTR(_m, _p) 143f841f6adSraf #define MQ_ASSERT_SEMVAL_LEQ(sem, val) 144f841f6adSraf #endif 145f841f6adSraf 146f841f6adSraf #define MQ_PTR(m, n) ((msghdr_t *)((uintptr_t)m + (uintptr_t)n)) 147f841f6adSraf #define HEAD_PTR(m, n) ((uint64_t *)((uintptr_t)m + \ 148f841f6adSraf (uintptr_t)m->mq_headpp + n * sizeof (uint64_t))) 149f841f6adSraf #define TAIL_PTR(m, n) ((uint64_t *)((uintptr_t)m + \ 150f841f6adSraf (uintptr_t)m->mq_tailpp + n * sizeof (uint64_t))) 151f841f6adSraf 152f841f6adSraf #define MQ_RESERVED ((mqdes_t *)-1) 153f841f6adSraf 154f841f6adSraf #define ABS_TIME 0 155f841f6adSraf #define REL_TIME 1 156f841f6adSraf 157f841f6adSraf static mutex_t mq_list_lock = DEFAULTMUTEX; 158f841f6adSraf static mqdes_t *mq_list = NULL; 159f841f6adSraf 160f841f6adSraf extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id); 161f841f6adSraf 162f841f6adSraf static int 163f841f6adSraf mq_is_valid(mqdes_t *mqdp) 164f841f6adSraf { 165f841f6adSraf /* 166f841f6adSraf * Any use of a message queue after it was closed is 167f841f6adSraf * undefined. But the standard strongly favours EBADF 168f841f6adSraf * returns. Before we dereference which could be fatal, 169f841f6adSraf * we first do some pointer sanity checks. 170f841f6adSraf */ 171f841f6adSraf if (mqdp != NULL && mqdp != MQ_RESERVED && 172f841f6adSraf ((uintptr_t)mqdp & 0x7) == 0) { 173f841f6adSraf return (mqdp->mqd_magic == MQ_MAGIC); 174f841f6adSraf } 175f841f6adSraf 176f841f6adSraf return (0); 177f841f6adSraf } 178f841f6adSraf 179f841f6adSraf static void 180f841f6adSraf mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg) 181f841f6adSraf { 182f841f6adSraf int i; 183f841f6adSraf uint64_t temp; 184f841f6adSraf uint64_t currentp; 185f841f6adSraf uint64_t nextp; 186f841f6adSraf 187f841f6adSraf /* 188f841f6adSraf * We only need to initialize the non-zero fields. The use of 189f841f6adSraf * ftruncate() on the message queue file assures that the 190*34537ab1Sraf * pages will be zero-filled. 191f841f6adSraf */ 192*34537ab1Sraf (void) mutex_init(&mqhp->mq_exclusive, 193*34537ab1Sraf USYNC_PROCESS | LOCK_ROBUST, NULL); 194f841f6adSraf (void) sem_init(&mqhp->mq_rblocked, 1, 0); 195f841f6adSraf (void) sem_init(&mqhp->mq_notempty, 1, 0); 196f841f6adSraf (void) sem_init(&mqhp->mq_spawner, 1, 0); 197f841f6adSraf (void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg); 198f841f6adSraf 199f841f6adSraf mqhp->mq_maxsz = msgsize; 200f841f6adSraf mqhp->mq_maxmsg = maxmsg; 201f841f6adSraf 202f841f6adSraf /* 203f841f6adSraf * As of this writing (1997), there are 32 message queue priorities. 204f841f6adSraf * If this is to change, then the size of the mq_mask will 205f841f6adSraf * also have to change. If DEBUG is defined, assert that 206f841f6adSraf * _MQ_PRIO_MAX hasn't changed. 207f841f6adSraf */ 208f841f6adSraf mqhp->mq_maxprio = _MQ_PRIO_MAX; 209f841f6adSraf #if defined(DEBUG) 210f841f6adSraf /* LINTED always true */ 211f841f6adSraf MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX); 212f841f6adSraf #endif 213f841f6adSraf 214f841f6adSraf /* 215f841f6adSraf * Since the message queue can be mapped into different 216f841f6adSraf * virtual address ranges by different processes, we don't 217f841f6adSraf * keep track of pointers, only offsets into the shared region. 218f841f6adSraf */ 219f841f6adSraf mqhp->mq_headpp = sizeof (mqhdr_t); 220f841f6adSraf mqhp->mq_tailpp = mqhp->mq_headpp + 221f841f6adSraf mqhp->mq_maxprio * sizeof (uint64_t); 222f841f6adSraf mqhp->mq_freep = mqhp->mq_tailpp + 223f841f6adSraf mqhp->mq_maxprio * sizeof (uint64_t); 224f841f6adSraf 225f841f6adSraf currentp = mqhp->mq_freep; 226f841f6adSraf MQ_PTR(mqhp, currentp)->msg_next = 0; 227f841f6adSraf 228f841f6adSraf temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1); 229f841f6adSraf for (i = 1; i < mqhp->mq_maxmsg; i++) { 230f841f6adSraf nextp = currentp + sizeof (msghdr_t) + temp; 231f841f6adSraf MQ_PTR(mqhp, currentp)->msg_next = nextp; 232f841f6adSraf MQ_PTR(mqhp, nextp)->msg_next = 0; 233f841f6adSraf currentp = nextp; 234f841f6adSraf } 235f841f6adSraf } 236f841f6adSraf 237f841f6adSraf static size_t 238f841f6adSraf mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio) 239f841f6adSraf { 240f841f6adSraf uint64_t currentp; 241f841f6adSraf msghdr_t *curbuf; 242f841f6adSraf uint64_t *headpp; 243f841f6adSraf uint64_t *tailpp; 244f841f6adSraf 245f841f6adSraf MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive)); 246f841f6adSraf 247f841f6adSraf /* 248f841f6adSraf * Get the head and tail pointers for the queue of maximum 249f841f6adSraf * priority. We shouldn't be here unless there is a message for 250f841f6adSraf * us, so it's fair to assert that both the head and tail 251f841f6adSraf * pointers are non-NULL. 252f841f6adSraf */ 253f841f6adSraf headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio); 254f841f6adSraf tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio); 255f841f6adSraf 256f841f6adSraf if (msg_prio != NULL) 257f841f6adSraf *msg_prio = mqhp->mq_curmaxprio; 258f841f6adSraf 259f841f6adSraf currentp = *headpp; 260f841f6adSraf MQ_ASSERT_PTR(mqhp, currentp); 261f841f6adSraf curbuf = MQ_PTR(mqhp, currentp); 262f841f6adSraf 263f841f6adSraf if ((*headpp = curbuf->msg_next) == NULL) { 264f841f6adSraf /* 265f841f6adSraf * We just nuked the last message in this priority's queue. 266f841f6adSraf * Twiddle this priority's bit, and then find the next bit 267f841f6adSraf * tipped. 268f841f6adSraf */ 269f841f6adSraf uint_t prio = mqhp->mq_curmaxprio; 270f841f6adSraf 271f841f6adSraf mqhp->mq_mask &= ~(1u << prio); 272f841f6adSraf 273f841f6adSraf for (; prio != 0; prio--) 274f841f6adSraf if (mqhp->mq_mask & (1u << prio)) 275f841f6adSraf break; 276f841f6adSraf mqhp->mq_curmaxprio = prio; 277f841f6adSraf 278f841f6adSraf *tailpp = NULL; 279f841f6adSraf } 280f841f6adSraf 281f841f6adSraf /* 282f841f6adSraf * Copy the message, and put the buffer back on the free list. 283f841f6adSraf */ 284f841f6adSraf (void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len); 285f841f6adSraf curbuf->msg_next = mqhp->mq_freep; 286f841f6adSraf mqhp->mq_freep = currentp; 287f841f6adSraf 288f841f6adSraf return (curbuf->msg_len); 289f841f6adSraf } 290f841f6adSraf 291f841f6adSraf 292f841f6adSraf static void 293f841f6adSraf mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio) 294f841f6adSraf { 295f841f6adSraf uint64_t currentp; 296f841f6adSraf msghdr_t *curbuf; 297f841f6adSraf uint64_t *headpp; 298f841f6adSraf uint64_t *tailpp; 299f841f6adSraf 300f841f6adSraf MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive)); 301f841f6adSraf 302f841f6adSraf /* 303f841f6adSraf * Grab a free message block, and link it in. We shouldn't 304f841f6adSraf * be here unless there is room in the queue for us; it's 305f841f6adSraf * fair to assert that the free pointer is non-NULL. 306f841f6adSraf */ 307f841f6adSraf currentp = mqhp->mq_freep; 308f841f6adSraf MQ_ASSERT_PTR(mqhp, currentp); 309f841f6adSraf curbuf = MQ_PTR(mqhp, currentp); 310f841f6adSraf 311f841f6adSraf /* 312f841f6adSraf * Remove a message from the free list, and copy in the new contents. 313f841f6adSraf */ 314f841f6adSraf mqhp->mq_freep = curbuf->msg_next; 315f841f6adSraf curbuf->msg_next = NULL; 316f841f6adSraf (void) memcpy((char *)&curbuf[1], msgp, len); 317f841f6adSraf curbuf->msg_len = len; 318f841f6adSraf 319f841f6adSraf headpp = HEAD_PTR(mqhp, prio); 320f841f6adSraf tailpp = TAIL_PTR(mqhp, prio); 321f841f6adSraf 322f841f6adSraf if (*tailpp == 0) { 323f841f6adSraf /* 324f841f6adSraf * This is the first message on this queue. Set the 325f841f6adSraf * head and tail pointers, and tip the appropriate bit 326f841f6adSraf * in the priority mask. 327f841f6adSraf */ 328f841f6adSraf *headpp = currentp; 329f841f6adSraf *tailpp = currentp; 330f841f6adSraf mqhp->mq_mask |= (1u << prio); 331f841f6adSraf if (prio > mqhp->mq_curmaxprio) 332f841f6adSraf mqhp->mq_curmaxprio = prio; 333f841f6adSraf } else { 334f841f6adSraf MQ_ASSERT_PTR(mqhp, *tailpp); 335f841f6adSraf MQ_PTR(mqhp, *tailpp)->msg_next = currentp; 336f841f6adSraf *tailpp = currentp; 337f841f6adSraf } 338f841f6adSraf } 339f841f6adSraf 340*34537ab1Sraf /* 341*34537ab1Sraf * Send a notification and also delete the registration. 342*34537ab1Sraf */ 343*34537ab1Sraf static void 344*34537ab1Sraf do_notify(mqhdr_t *mqhp) 345*34537ab1Sraf { 346*34537ab1Sraf (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid); 347*34537ab1Sraf if (mqhp->mq_ntype == SIGEV_THREAD || 348*34537ab1Sraf mqhp->mq_ntype == SIGEV_PORT) 349*34537ab1Sraf (void) sem_post(&mqhp->mq_spawner); 350*34537ab1Sraf mqhp->mq_ntype = 0; 351*34537ab1Sraf mqhp->mq_des = 0; 352*34537ab1Sraf } 353*34537ab1Sraf 354*34537ab1Sraf /* 355*34537ab1Sraf * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE. 356*34537ab1Sraf * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that 357*34537ab1Sraf * they fail with errno == EBADMSG. Trigger any registered notification. 358*34537ab1Sraf */ 359*34537ab1Sraf static void 360*34537ab1Sraf owner_dead(mqdes_t *mqdp, int error) 361*34537ab1Sraf { 362*34537ab1Sraf mqhdr_t *mqhp = mqdp->mqd_mq; 363*34537ab1Sraf 364*34537ab1Sraf mqdp->mqd_ownerdead = 1; 365*34537ab1Sraf (void) sem_post(&mqhp->mq_notfull); 366*34537ab1Sraf (void) sem_post(&mqhp->mq_notempty); 367*34537ab1Sraf if (error == EOWNERDEAD) { 368*34537ab1Sraf if (mqhp->mq_sigid.sn_pid != 0) 369*34537ab1Sraf do_notify(mqhp); 370*34537ab1Sraf (void) mutex_unlock(&mqhp->mq_exclusive); 371*34537ab1Sraf } 372*34537ab1Sraf errno = EBADMSG; 373*34537ab1Sraf } 374*34537ab1Sraf 375f841f6adSraf mqd_t 3767257d1b4Sraf mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...) 377f841f6adSraf { 378f841f6adSraf va_list ap; 379*34537ab1Sraf mode_t mode = 0; 380*34537ab1Sraf struct mq_attr *attr = NULL; 381f841f6adSraf int fd; 382f841f6adSraf int err; 383f841f6adSraf int cr_flag = 0; 384f841f6adSraf int locked = 0; 385f841f6adSraf uint64_t total_size; 386f841f6adSraf size_t msgsize; 387f841f6adSraf ssize_t maxmsg; 388f841f6adSraf uint64_t temp; 389f841f6adSraf void *ptr; 390f841f6adSraf mqdes_t *mqdp; 391f841f6adSraf mqhdr_t *mqhp; 392f841f6adSraf struct mq_dn *mqdnp; 393f841f6adSraf 394f841f6adSraf if (__pos4obj_check(path) == -1) 395f841f6adSraf return ((mqd_t)-1); 396f841f6adSraf 397f841f6adSraf /* acquire MSGQ lock to have atomic operation */ 398f841f6adSraf if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) 399f841f6adSraf goto out; 400f841f6adSraf locked = 1; 401f841f6adSraf 402f841f6adSraf va_start(ap, oflag); 403f841f6adSraf /* filter oflag to have READ/WRITE/CREATE modes only */ 404f841f6adSraf oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK); 405f841f6adSraf if ((oflag & O_CREAT) != 0) { 406f841f6adSraf mode = va_arg(ap, mode_t); 407f841f6adSraf attr = va_arg(ap, struct mq_attr *); 408f841f6adSraf } 409f841f6adSraf va_end(ap); 410f841f6adSraf 411f841f6adSraf if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag, 412f841f6adSraf mode, &cr_flag)) < 0) 413f841f6adSraf goto out; 414f841f6adSraf 415f841f6adSraf /* closing permission file */ 416f841f6adSraf (void) __close_nc(fd); 417f841f6adSraf 418f841f6adSraf /* Try to open/create data file */ 419f841f6adSraf if (cr_flag) { 420f841f6adSraf cr_flag = PFILE_CREATE; 421f841f6adSraf if (attr == NULL) { 422f841f6adSraf maxmsg = MQ_MAXMSG; 423f841f6adSraf msgsize = MQ_MAXSIZE; 424f841f6adSraf } else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) { 425f841f6adSraf errno = EINVAL; 426f841f6adSraf goto out; 427f841f6adSraf } else if (attr->mq_maxmsg > _SEM_VALUE_MAX) { 428f841f6adSraf errno = ENOSPC; 429f841f6adSraf goto out; 430f841f6adSraf } else { 431f841f6adSraf maxmsg = attr->mq_maxmsg; 432f841f6adSraf msgsize = attr->mq_msgsize; 433f841f6adSraf } 434f841f6adSraf 435f841f6adSraf /* adjust for message size at word boundary */ 436f841f6adSraf temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1); 437f841f6adSraf 438f841f6adSraf total_size = sizeof (mqhdr_t) + 439f841f6adSraf maxmsg * (temp + sizeof (msghdr_t)) + 440f841f6adSraf 2 * _MQ_PRIO_MAX * sizeof (uint64_t); 441f841f6adSraf 442f841f6adSraf if (total_size > SSIZE_MAX) { 443f841f6adSraf errno = ENOSPC; 444f841f6adSraf goto out; 445f841f6adSraf } 446f841f6adSraf 447f841f6adSraf /* 448f841f6adSraf * data file is opened with read/write to those 449f841f6adSraf * who have read or write permission 450f841f6adSraf */ 451f841f6adSraf mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1; 452f841f6adSraf if ((fd = __pos4obj_open(path, MQ_DATA_TYPE, 453f841f6adSraf (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0) 454f841f6adSraf goto out; 455f841f6adSraf 456f841f6adSraf cr_flag |= DFILE_CREATE | DFILE_OPEN; 457f841f6adSraf 458f841f6adSraf /* force permissions to avoid umask effect */ 459f841f6adSraf if (fchmod(fd, mode) < 0) 460f841f6adSraf goto out; 461f841f6adSraf 462f841f6adSraf if (ftruncate64(fd, (off64_t)total_size) < 0) 463f841f6adSraf goto out; 464f841f6adSraf } else { 465f841f6adSraf if ((fd = __pos4obj_open(path, MQ_DATA_TYPE, 466f841f6adSraf O_RDWR, 0666, &err)) < 0) 467f841f6adSraf goto out; 468f841f6adSraf cr_flag = DFILE_OPEN; 469f841f6adSraf 470f841f6adSraf /* Message queue has not been initialized yet */ 471f841f6adSraf if (read(fd, &total_size, sizeof (total_size)) != 472f841f6adSraf sizeof (total_size) || total_size == 0) { 473f841f6adSraf errno = ENOENT; 474f841f6adSraf goto out; 475f841f6adSraf } 476f841f6adSraf 477f841f6adSraf /* Message queue too big for this process to handle */ 478f841f6adSraf if (total_size > SSIZE_MAX) { 479f841f6adSraf errno = EFBIG; 480f841f6adSraf goto out; 481f841f6adSraf } 482f841f6adSraf } 483f841f6adSraf 484f841f6adSraf if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) { 485f841f6adSraf errno = ENOMEM; 486f841f6adSraf goto out; 487f841f6adSraf } 488f841f6adSraf cr_flag |= ALLOC_MEM; 489f841f6adSraf 490f841f6adSraf if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE, 491f841f6adSraf MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED) 492f841f6adSraf goto out; 493f841f6adSraf mqhp = ptr; 494f841f6adSraf cr_flag |= DFILE_MMAP; 495f841f6adSraf 496f841f6adSraf /* closing data file */ 497f841f6adSraf (void) __close_nc(fd); 498f841f6adSraf cr_flag &= ~DFILE_OPEN; 499f841f6adSraf 500f841f6adSraf /* 501f841f6adSraf * create, unlink, size, mmap, and close description file 502f841f6adSraf * all for a flag word in anonymous shared memory 503f841f6adSraf */ 504f841f6adSraf if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT, 505f841f6adSraf 0666, &err)) < 0) 506f841f6adSraf goto out; 507f841f6adSraf cr_flag |= DFILE_OPEN; 508f841f6adSraf (void) __pos4obj_unlink(path, MQ_DSCN_TYPE); 509f841f6adSraf if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0) 510f841f6adSraf goto out; 511f841f6adSraf 512f841f6adSraf if ((ptr = mmap64(NULL, sizeof (struct mq_dn), 513f841f6adSraf PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED) 514f841f6adSraf goto out; 515f841f6adSraf mqdnp = ptr; 516f841f6adSraf cr_flag |= MQDNP_MMAP; 517f841f6adSraf 518f841f6adSraf (void) __close_nc(fd); 519f841f6adSraf cr_flag &= ~DFILE_OPEN; 520f841f6adSraf 521f841f6adSraf /* 522f841f6adSraf * we follow the same strategy as filesystem open() routine, 523f841f6adSraf * where fcntl.h flags are changed to flags defined in file.h. 524f841f6adSraf */ 525f841f6adSraf mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE); 526f841f6adSraf mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK); 527f841f6adSraf 528f841f6adSraf /* new message queue requires initialization */ 529f841f6adSraf if ((cr_flag & DFILE_CREATE) != 0) { 530f841f6adSraf /* message queue header has to be initialized */ 531f841f6adSraf mq_init(mqhp, msgsize, maxmsg); 532f841f6adSraf mqhp->mq_totsize = total_size; 533f841f6adSraf } 534f841f6adSraf mqdp->mqd_mq = mqhp; 535f841f6adSraf mqdp->mqd_mqdn = mqdnp; 536f841f6adSraf mqdp->mqd_magic = MQ_MAGIC; 537f841f6adSraf mqdp->mqd_tcd = NULL; 538*34537ab1Sraf mqdp->mqd_ownerdead = 0; 539f841f6adSraf if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) { 540f841f6adSraf lmutex_lock(&mq_list_lock); 541f841f6adSraf mqdp->mqd_next = mq_list; 542f841f6adSraf mqdp->mqd_prev = NULL; 543f841f6adSraf if (mq_list) 544f841f6adSraf mq_list->mqd_prev = mqdp; 545f841f6adSraf mq_list = mqdp; 546f841f6adSraf lmutex_unlock(&mq_list_lock); 547f841f6adSraf return ((mqd_t)mqdp); 548f841f6adSraf } 549f841f6adSraf 550f841f6adSraf locked = 0; /* fall into the error case */ 551f841f6adSraf out: 552f841f6adSraf err = errno; 553f841f6adSraf if ((cr_flag & DFILE_OPEN) != 0) 554f841f6adSraf (void) __close_nc(fd); 555f841f6adSraf if ((cr_flag & DFILE_CREATE) != 0) 556f841f6adSraf (void) __pos4obj_unlink(path, MQ_DATA_TYPE); 557f841f6adSraf if ((cr_flag & PFILE_CREATE) != 0) 558f841f6adSraf (void) __pos4obj_unlink(path, MQ_PERM_TYPE); 559f841f6adSraf if ((cr_flag & ALLOC_MEM) != 0) 560f841f6adSraf free((void *)mqdp); 561f841f6adSraf if ((cr_flag & DFILE_MMAP) != 0) 562f841f6adSraf (void) munmap((caddr_t)mqhp, (size_t)total_size); 563f841f6adSraf if ((cr_flag & MQDNP_MMAP) != 0) 564f841f6adSraf (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn)); 565f841f6adSraf if (locked) 566f841f6adSraf (void) __pos4obj_unlock(path, MQ_LOCK_TYPE); 567f841f6adSraf errno = err; 568f841f6adSraf return ((mqd_t)-1); 569f841f6adSraf } 570f841f6adSraf 571f841f6adSraf static void 572f841f6adSraf mq_close_cleanup(mqdes_t *mqdp) 573f841f6adSraf { 574f841f6adSraf mqhdr_t *mqhp = mqdp->mqd_mq; 575f841f6adSraf struct mq_dn *mqdnp = mqdp->mqd_mqdn; 576f841f6adSraf 577f841f6adSraf /* invalidate the descriptor before freeing it */ 578f841f6adSraf mqdp->mqd_magic = 0; 579*34537ab1Sraf if (!mqdp->mqd_ownerdead) 580f841f6adSraf (void) mutex_unlock(&mqhp->mq_exclusive); 581f841f6adSraf 582f841f6adSraf lmutex_lock(&mq_list_lock); 583f841f6adSraf if (mqdp->mqd_next) 584f841f6adSraf mqdp->mqd_next->mqd_prev = mqdp->mqd_prev; 585f841f6adSraf if (mqdp->mqd_prev) 586f841f6adSraf mqdp->mqd_prev->mqd_next = mqdp->mqd_next; 587f841f6adSraf if (mq_list == mqdp) 588f841f6adSraf mq_list = mqdp->mqd_next; 589f841f6adSraf lmutex_unlock(&mq_list_lock); 590f841f6adSraf 591f841f6adSraf free(mqdp); 592f841f6adSraf (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn)); 593f841f6adSraf (void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize); 594f841f6adSraf } 595f841f6adSraf 596f841f6adSraf int 5977257d1b4Sraf mq_close(mqd_t mqdes) 598f841f6adSraf { 599f841f6adSraf mqdes_t *mqdp = (mqdes_t *)mqdes; 600f841f6adSraf mqhdr_t *mqhp; 601f841f6adSraf thread_communication_data_t *tcdp; 602*34537ab1Sraf int error; 603f841f6adSraf 604f841f6adSraf if (!mq_is_valid(mqdp)) { 605f841f6adSraf errno = EBADF; 606f841f6adSraf return (-1); 607f841f6adSraf } 608f841f6adSraf 609f841f6adSraf mqhp = mqdp->mqd_mq; 610*34537ab1Sraf if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) { 611*34537ab1Sraf mqdp->mqd_ownerdead = 1; 612*34537ab1Sraf if (error == EOWNERDEAD) 613*34537ab1Sraf (void) mutex_unlock(&mqhp->mq_exclusive); 614*34537ab1Sraf /* carry on regardless, without holding mq_exclusive */ 615*34537ab1Sraf } 616f841f6adSraf 617f841f6adSraf if (mqhp->mq_des == (uintptr_t)mqdp && 618f841f6adSraf mqhp->mq_sigid.sn_pid == getpid()) { 619f841f6adSraf /* notification is set for this descriptor, remove it */ 620f841f6adSraf (void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid); 621f841f6adSraf mqhp->mq_ntype = 0; 622f841f6adSraf mqhp->mq_des = 0; 623f841f6adSraf } 624f841f6adSraf 625f841f6adSraf pthread_cleanup_push(mq_close_cleanup, mqdp); 626f841f6adSraf if ((tcdp = mqdp->mqd_tcd) != NULL) { 627f841f6adSraf mqdp->mqd_tcd = NULL; 628f841f6adSraf del_sigev_mq(tcdp); /* possible cancellation point */ 629f841f6adSraf } 630f841f6adSraf pthread_cleanup_pop(1); /* finish in the cleanup handler */ 631f841f6adSraf 632f841f6adSraf return (0); 633f841f6adSraf } 634f841f6adSraf 635f841f6adSraf int 6367257d1b4Sraf mq_unlink(const char *path) 637f841f6adSraf { 638f841f6adSraf int err; 639f841f6adSraf 640f841f6adSraf if (__pos4obj_check(path) < 0) 641f841f6adSraf return (-1); 642f841f6adSraf 643f841f6adSraf if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) { 644f841f6adSraf return (-1); 645f841f6adSraf } 646f841f6adSraf 647f841f6adSraf err = __pos4obj_unlink(path, MQ_PERM_TYPE); 648f841f6adSraf 649f841f6adSraf if (err == 0 || (err == -1 && errno == EEXIST)) { 650f841f6adSraf errno = 0; 651f841f6adSraf err = __pos4obj_unlink(path, MQ_DATA_TYPE); 652f841f6adSraf } 653f841f6adSraf 654f841f6adSraf if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0) 655f841f6adSraf return (-1); 656f841f6adSraf 657f841f6adSraf return (err); 658f841f6adSraf 659f841f6adSraf } 660f841f6adSraf 661f841f6adSraf static int 662f841f6adSraf __mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 663f841f6adSraf uint_t msg_prio, const timespec_t *timeout, int abs_rel) 664f841f6adSraf { 665f841f6adSraf mqdes_t *mqdp = (mqdes_t *)mqdes; 666f841f6adSraf mqhdr_t *mqhp; 667f841f6adSraf int err; 668f841f6adSraf int notify = 0; 669f841f6adSraf 670f841f6adSraf /* 671f841f6adSraf * sem_*wait() does cancellation, if called. 672f841f6adSraf * pthread_testcancel() ensures that cancellation takes place if 673f841f6adSraf * there is a cancellation pending when mq_*send() is called. 674f841f6adSraf */ 675f841f6adSraf pthread_testcancel(); 676f841f6adSraf 677f841f6adSraf if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) { 678f841f6adSraf errno = EBADF; 679f841f6adSraf return (-1); 680f841f6adSraf } 681f841f6adSraf 682f841f6adSraf mqhp = mqdp->mqd_mq; 683f841f6adSraf 684f841f6adSraf if (msg_prio >= mqhp->mq_maxprio) { 685f841f6adSraf errno = EINVAL; 686f841f6adSraf return (-1); 687f841f6adSraf } 688f841f6adSraf if (msg_len > mqhp->mq_maxsz) { 689f841f6adSraf errno = EMSGSIZE; 690f841f6adSraf return (-1); 691f841f6adSraf } 692f841f6adSraf 693f841f6adSraf if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) 694f841f6adSraf err = sem_trywait(&mqhp->mq_notfull); 695f841f6adSraf else { 696f841f6adSraf /* 697f841f6adSraf * We might get cancelled here... 698f841f6adSraf */ 699f841f6adSraf if (timeout == NULL) 700f841f6adSraf err = sem_wait(&mqhp->mq_notfull); 701f841f6adSraf else if (abs_rel == ABS_TIME) 702f841f6adSraf err = sem_timedwait(&mqhp->mq_notfull, timeout); 703f841f6adSraf else 704f841f6adSraf err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout); 705f841f6adSraf } 706f841f6adSraf if (err == -1) { 707f841f6adSraf /* 708f841f6adSraf * errno has been set to EAGAIN / EINTR / ETIMEDOUT 709f841f6adSraf * by sem_*wait(), so we can just return. 710f841f6adSraf */ 711f841f6adSraf return (-1); 712f841f6adSraf } 713f841f6adSraf 714f841f6adSraf /* 715f841f6adSraf * By the time we're here, we know that we've got the capacity 716f841f6adSraf * to add to the queue...now acquire the exclusive lock. 717f841f6adSraf */ 718*34537ab1Sraf if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) { 719*34537ab1Sraf owner_dead(mqdp, err); 720*34537ab1Sraf return (-1); 721*34537ab1Sraf } 722f841f6adSraf 723f841f6adSraf /* 724f841f6adSraf * Now determine if we want to kick the notification. POSIX 725f841f6adSraf * requires that if a process has registered for notification, 726f841f6adSraf * we must kick it when the queue makes an empty to non-empty 727f841f6adSraf * transition, and there are no blocked receivers. Note that 728f841f6adSraf * this mechanism does _not_ guarantee that the kicked process 729f841f6adSraf * will be able to receive a message without blocking; 730f841f6adSraf * another receiver could intervene in the meantime. Thus, 731f841f6adSraf * the notification mechanism is inherently racy; all we can 732f841f6adSraf * do is hope to minimize the window as much as possible. 733f841f6adSraf * In general, we want to avoid kicking the notification when 734f841f6adSraf * there are clearly receivers blocked. We'll determine if 735f841f6adSraf * we want to kick the notification before the mq_putmsg(), 736f841f6adSraf * but the actual signotify() won't be done until the message 737f841f6adSraf * is on the queue. 738f841f6adSraf */ 739f841f6adSraf if (mqhp->mq_sigid.sn_pid != 0) { 740f841f6adSraf int nmessages, nblocked; 741f841f6adSraf 742f841f6adSraf (void) sem_getvalue(&mqhp->mq_notempty, &nmessages); 743f841f6adSraf (void) sem_getvalue(&mqhp->mq_rblocked, &nblocked); 744f841f6adSraf 745f841f6adSraf if (nmessages == 0 && nblocked == 0) 746f841f6adSraf notify = 1; 747f841f6adSraf } 748f841f6adSraf 749f841f6adSraf mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio); 750f841f6adSraf (void) sem_post(&mqhp->mq_notempty); 751f841f6adSraf 752f841f6adSraf if (notify) { 753f841f6adSraf /* notify and also delete the registration */ 754*34537ab1Sraf do_notify(mqhp); 755f841f6adSraf } 756f841f6adSraf 757f841f6adSraf MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg)); 758f841f6adSraf (void) mutex_unlock(&mqhp->mq_exclusive); 759f841f6adSraf 760f841f6adSraf return (0); 761f841f6adSraf } 762f841f6adSraf 763f841f6adSraf int 7647257d1b4Sraf mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio) 765f841f6adSraf { 766f841f6adSraf return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, 767f841f6adSraf NULL, ABS_TIME)); 768f841f6adSraf } 769f841f6adSraf 770f841f6adSraf int 7717257d1b4Sraf mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 772f841f6adSraf uint_t msg_prio, const timespec_t *abs_timeout) 773f841f6adSraf { 774f841f6adSraf return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, 775f841f6adSraf abs_timeout, ABS_TIME)); 776f841f6adSraf } 777f841f6adSraf 778f841f6adSraf int 7797257d1b4Sraf mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 780f841f6adSraf uint_t msg_prio, const timespec_t *rel_timeout) 781f841f6adSraf { 782f841f6adSraf return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, 783f841f6adSraf rel_timeout, REL_TIME)); 784f841f6adSraf } 785f841f6adSraf 786f841f6adSraf static void 787f841f6adSraf decrement_rblocked(mqhdr_t *mqhp) 788f841f6adSraf { 789a574db85Sraf int cancel_state; 790f841f6adSraf 791a574db85Sraf (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state); 792f841f6adSraf while (sem_wait(&mqhp->mq_rblocked) == -1) 793f841f6adSraf continue; 794a574db85Sraf (void) pthread_setcancelstate(cancel_state, NULL); 795f841f6adSraf } 796f841f6adSraf 797f841f6adSraf static ssize_t 798f841f6adSraf __mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, 799f841f6adSraf uint_t *msg_prio, const timespec_t *timeout, int abs_rel) 800f841f6adSraf { 801f841f6adSraf mqdes_t *mqdp = (mqdes_t *)mqdes; 802f841f6adSraf mqhdr_t *mqhp; 803f841f6adSraf ssize_t msg_size; 804f841f6adSraf int err; 805f841f6adSraf 806f841f6adSraf /* 807f841f6adSraf * sem_*wait() does cancellation, if called. 808f841f6adSraf * pthread_testcancel() ensures that cancellation takes place if 809f841f6adSraf * there is a cancellation pending when mq_*receive() is called. 810f841f6adSraf */ 811f841f6adSraf pthread_testcancel(); 812f841f6adSraf 813f841f6adSraf if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) { 814f841f6adSraf errno = EBADF; 815f841f6adSraf return (ssize_t)(-1); 816f841f6adSraf } 817f841f6adSraf 818f841f6adSraf mqhp = mqdp->mqd_mq; 819f841f6adSraf 820f841f6adSraf if (msg_len < mqhp->mq_maxsz) { 821f841f6adSraf errno = EMSGSIZE; 822f841f6adSraf return (ssize_t)(-1); 823f841f6adSraf } 824f841f6adSraf 825f841f6adSraf /* 826f841f6adSraf * The semaphoring scheme for mq_[timed]receive is a little hairy 827f841f6adSraf * thanks to POSIX.1b's arcane notification mechanism. First, 828f841f6adSraf * we try to take the common case and do a sem_trywait(). 829f841f6adSraf * If that doesn't work, and O_NONBLOCK hasn't been set, 830f841f6adSraf * then note that we're going to sleep by incrementing the rblocked 831f841f6adSraf * semaphore. We decrement that semaphore after waking up. 832f841f6adSraf */ 833f841f6adSraf if (sem_trywait(&mqhp->mq_notempty) == -1) { 834f841f6adSraf if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) { 835f841f6adSraf /* 836f841f6adSraf * errno has been set to EAGAIN or EINTR by 837f841f6adSraf * sem_trywait(), so we can just return. 838f841f6adSraf */ 839f841f6adSraf return (-1); 840f841f6adSraf } 841f841f6adSraf /* 842f841f6adSraf * If we're here, then we're probably going to block... 843f841f6adSraf * increment the rblocked semaphore. If we get 844f841f6adSraf * cancelled, decrement_rblocked() will decrement it. 845f841f6adSraf */ 846f841f6adSraf (void) sem_post(&mqhp->mq_rblocked); 847f841f6adSraf 848f841f6adSraf pthread_cleanup_push(decrement_rblocked, mqhp); 849f841f6adSraf if (timeout == NULL) 850f841f6adSraf err = sem_wait(&mqhp->mq_notempty); 851f841f6adSraf else if (abs_rel == ABS_TIME) 852f841f6adSraf err = sem_timedwait(&mqhp->mq_notempty, timeout); 853f841f6adSraf else 854f841f6adSraf err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout); 855f841f6adSraf pthread_cleanup_pop(1); 856f841f6adSraf 857f841f6adSraf if (err == -1) { 858f841f6adSraf /* 859f841f6adSraf * We took a signal or timeout while waiting 860f841f6adSraf * on mq_notempty... 861f841f6adSraf */ 862f841f6adSraf return (-1); 863f841f6adSraf } 864f841f6adSraf } 865f841f6adSraf 866*34537ab1Sraf if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) { 867*34537ab1Sraf owner_dead(mqdp, err); 868*34537ab1Sraf return (-1); 869*34537ab1Sraf } 870f841f6adSraf msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio); 871f841f6adSraf (void) sem_post(&mqhp->mq_notfull); 872f841f6adSraf MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg)); 873f841f6adSraf (void) mutex_unlock(&mqhp->mq_exclusive); 874f841f6adSraf 875f841f6adSraf return (msg_size); 876f841f6adSraf } 877f841f6adSraf 878f841f6adSraf ssize_t 8797257d1b4Sraf mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio) 880f841f6adSraf { 881f841f6adSraf return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, 882f841f6adSraf NULL, ABS_TIME)); 883f841f6adSraf } 884f841f6adSraf 885f841f6adSraf ssize_t 8867257d1b4Sraf mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, 887f841f6adSraf uint_t *msg_prio, const timespec_t *abs_timeout) 888f841f6adSraf { 889f841f6adSraf return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, 890f841f6adSraf abs_timeout, ABS_TIME)); 891f841f6adSraf } 892f841f6adSraf 893f841f6adSraf ssize_t 8947257d1b4Sraf mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len, 895f841f6adSraf uint_t *msg_prio, const timespec_t *rel_timeout) 896f841f6adSraf { 897f841f6adSraf return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, 898f841f6adSraf rel_timeout, REL_TIME)); 899f841f6adSraf } 900f841f6adSraf 901f841f6adSraf /* 9027257d1b4Sraf * Only used below, in mq_notify(). 903f841f6adSraf * We already have a spawner thread. 904f841f6adSraf * Verify that the attributes match; cancel it if necessary. 905f841f6adSraf */ 906f841f6adSraf static int 907f841f6adSraf cancel_if_necessary(thread_communication_data_t *tcdp, 908f841f6adSraf const struct sigevent *sigevp) 909f841f6adSraf { 9107257d1b4Sraf int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp, 911f841f6adSraf sigevp->sigev_notify_attributes); 912f841f6adSraf 913f841f6adSraf if (do_cancel) { 914f841f6adSraf /* 915f841f6adSraf * Attributes don't match, cancel the spawner thread. 916f841f6adSraf */ 917f841f6adSraf (void) pthread_cancel(tcdp->tcd_server_id); 918f841f6adSraf } else { 919f841f6adSraf /* 920f841f6adSraf * Reuse the existing spawner thread with possibly 921f841f6adSraf * changed notification function and value. 922f841f6adSraf */ 923f841f6adSraf tcdp->tcd_notif.sigev_notify = SIGEV_THREAD; 924f841f6adSraf tcdp->tcd_notif.sigev_signo = 0; 925f841f6adSraf tcdp->tcd_notif.sigev_value = sigevp->sigev_value; 926f841f6adSraf tcdp->tcd_notif.sigev_notify_function = 927f841f6adSraf sigevp->sigev_notify_function; 928f841f6adSraf } 929f841f6adSraf 930f841f6adSraf return (do_cancel); 931f841f6adSraf } 932f841f6adSraf 933f841f6adSraf int 9347257d1b4Sraf mq_notify(mqd_t mqdes, const struct sigevent *sigevp) 935f841f6adSraf { 936f841f6adSraf mqdes_t *mqdp = (mqdes_t *)mqdes; 937f841f6adSraf mqhdr_t *mqhp; 938f841f6adSraf thread_communication_data_t *tcdp; 939f841f6adSraf siginfo_t mq_siginfo; 940f841f6adSraf struct sigevent sigevent; 941f841f6adSraf struct stat64 statb; 942f841f6adSraf port_notify_t *pn; 943f841f6adSraf void *userval; 944f841f6adSraf int rval = -1; 945f841f6adSraf int ntype; 946f841f6adSraf int port; 947*34537ab1Sraf int error; 948f841f6adSraf 949f841f6adSraf if (!mq_is_valid(mqdp)) { 950f841f6adSraf errno = EBADF; 951f841f6adSraf return (-1); 952f841f6adSraf } 953f841f6adSraf 954f841f6adSraf mqhp = mqdp->mqd_mq; 955f841f6adSraf 956*34537ab1Sraf if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) { 957*34537ab1Sraf mqdp->mqd_ownerdead = 1; 958*34537ab1Sraf sigevp = NULL; 959*34537ab1Sraf if (error == EOWNERDEAD) 960*34537ab1Sraf (void) mutex_unlock(&mqhp->mq_exclusive); 961*34537ab1Sraf /* carry on regardless, without holding mq_exclusive */ 962*34537ab1Sraf } 963f841f6adSraf 964f841f6adSraf if (sigevp == NULL) { /* remove notification */ 965f841f6adSraf if (mqhp->mq_des == (uintptr_t)mqdp && 966f841f6adSraf mqhp->mq_sigid.sn_pid == getpid()) { 967f841f6adSraf /* notification is set for this descriptor, remove it */ 968f841f6adSraf (void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid); 969f841f6adSraf if ((tcdp = mqdp->mqd_tcd) != NULL) { 970f841f6adSraf sig_mutex_lock(&tcdp->tcd_lock); 971f841f6adSraf if (tcdp->tcd_msg_enabled) { 972f841f6adSraf /* cancel the spawner thread */ 973f841f6adSraf tcdp = mqdp->mqd_tcd; 974f841f6adSraf mqdp->mqd_tcd = NULL; 975f841f6adSraf (void) pthread_cancel( 976f841f6adSraf tcdp->tcd_server_id); 977f841f6adSraf } 978f841f6adSraf sig_mutex_unlock(&tcdp->tcd_lock); 979f841f6adSraf } 980f841f6adSraf mqhp->mq_ntype = 0; 981f841f6adSraf mqhp->mq_des = 0; 982f841f6adSraf } else { 983f841f6adSraf /* notification is not set for this descriptor */ 984f841f6adSraf errno = EBUSY; 985f841f6adSraf goto bad; 986f841f6adSraf } 987f841f6adSraf } else { /* register notification with this process */ 988f841f6adSraf switch (ntype = sigevp->sigev_notify) { 989f841f6adSraf case SIGEV_THREAD: 990f841f6adSraf userval = sigevp->sigev_value.sival_ptr; 991f841f6adSraf port = -1; 992f841f6adSraf break; 993f841f6adSraf case SIGEV_PORT: 994f841f6adSraf pn = sigevp->sigev_value.sival_ptr; 995f841f6adSraf userval = pn->portnfy_user; 996f841f6adSraf port = pn->portnfy_port; 997f841f6adSraf if (fstat64(port, &statb) != 0 || 998f841f6adSraf !S_ISPORT(statb.st_mode)) { 999f841f6adSraf errno = EBADF; 1000f841f6adSraf goto bad; 1001f841f6adSraf } 1002f841f6adSraf (void) memset(&sigevent, 0, sizeof (sigevent)); 1003f841f6adSraf sigevent.sigev_notify = SIGEV_PORT; 1004f841f6adSraf sigevp = &sigevent; 1005f841f6adSraf break; 1006f841f6adSraf } 1007f841f6adSraf switch (ntype) { 1008f841f6adSraf case SIGEV_NONE: 1009f841f6adSraf mq_siginfo.si_signo = 0; 1010f841f6adSraf mq_siginfo.si_code = SI_MESGQ; 1011f841f6adSraf break; 1012f841f6adSraf case SIGEV_SIGNAL: 1013f841f6adSraf mq_siginfo.si_signo = sigevp->sigev_signo; 1014f841f6adSraf mq_siginfo.si_value = sigevp->sigev_value; 1015f841f6adSraf mq_siginfo.si_code = SI_MESGQ; 1016f841f6adSraf break; 1017f841f6adSraf case SIGEV_THREAD: 1018f841f6adSraf if ((tcdp = mqdp->mqd_tcd) != NULL && 1019f841f6adSraf cancel_if_necessary(tcdp, sigevp)) 1020f841f6adSraf mqdp->mqd_tcd = NULL; 1021f841f6adSraf /* FALLTHROUGH */ 1022f841f6adSraf case SIGEV_PORT: 1023f841f6adSraf if ((tcdp = mqdp->mqd_tcd) == NULL) { 1024f841f6adSraf /* we must create a spawner thread */ 1025f841f6adSraf tcdp = setup_sigev_handler(sigevp, MQ); 1026f841f6adSraf if (tcdp == NULL) { 1027f841f6adSraf errno = EBADF; 1028f841f6adSraf goto bad; 1029f841f6adSraf } 1030f841f6adSraf tcdp->tcd_msg_enabled = 0; 1031f841f6adSraf tcdp->tcd_msg_closing = 0; 1032f841f6adSraf tcdp->tcd_msg_avail = &mqhp->mq_spawner; 1033f841f6adSraf if (launch_spawner(tcdp) != 0) { 1034f841f6adSraf free_sigev_handler(tcdp); 1035f841f6adSraf goto bad; 1036f841f6adSraf } 1037f841f6adSraf mqdp->mqd_tcd = tcdp; 1038f841f6adSraf } 1039f841f6adSraf mq_siginfo.si_signo = 0; 1040f841f6adSraf mq_siginfo.si_code = SI_MESGQ; 1041f841f6adSraf break; 1042f841f6adSraf default: 1043f841f6adSraf errno = EINVAL; 1044f841f6adSraf goto bad; 1045f841f6adSraf } 1046f841f6adSraf 1047f841f6adSraf /* register notification */ 1048f841f6adSraf if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0) 1049f841f6adSraf goto bad; 1050f841f6adSraf mqhp->mq_ntype = ntype; 1051f841f6adSraf mqhp->mq_des = (uintptr_t)mqdp; 1052f841f6adSraf switch (ntype) { 1053f841f6adSraf case SIGEV_THREAD: 1054f841f6adSraf case SIGEV_PORT: 1055f841f6adSraf tcdp->tcd_port = port; 1056f841f6adSraf tcdp->tcd_msg_object = mqdp; 1057f841f6adSraf tcdp->tcd_msg_userval = userval; 1058f841f6adSraf sig_mutex_lock(&tcdp->tcd_lock); 1059f841f6adSraf tcdp->tcd_msg_enabled = ntype; 1060f841f6adSraf sig_mutex_unlock(&tcdp->tcd_lock); 1061f841f6adSraf (void) cond_broadcast(&tcdp->tcd_cv); 1062f841f6adSraf break; 1063f841f6adSraf } 1064f841f6adSraf } 1065f841f6adSraf 1066f841f6adSraf rval = 0; /* success */ 1067f841f6adSraf bad: 1068*34537ab1Sraf if (error == 0) { 1069f841f6adSraf (void) mutex_unlock(&mqhp->mq_exclusive); 1070*34537ab1Sraf } else { 1071*34537ab1Sraf errno = EBADMSG; 1072*34537ab1Sraf rval = -1; 1073*34537ab1Sraf } 1074f841f6adSraf return (rval); 1075f841f6adSraf } 1076f841f6adSraf 1077f841f6adSraf int 10787257d1b4Sraf mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat) 1079f841f6adSraf { 1080f841f6adSraf mqdes_t *mqdp = (mqdes_t *)mqdes; 1081f841f6adSraf mqhdr_t *mqhp; 1082f841f6adSraf uint_t flag = 0; 1083f841f6adSraf 1084f841f6adSraf if (!mq_is_valid(mqdp)) { 1085f841f6adSraf errno = EBADF; 1086f841f6adSraf return (-1); 1087f841f6adSraf } 1088f841f6adSraf 1089f841f6adSraf /* store current attributes */ 1090f841f6adSraf if (omqstat != NULL) { 1091f841f6adSraf int count; 1092f841f6adSraf 1093f841f6adSraf mqhp = mqdp->mqd_mq; 1094f841f6adSraf omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags; 1095f841f6adSraf omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg; 1096f841f6adSraf omqstat->mq_msgsize = (long)mqhp->mq_maxsz; 1097f841f6adSraf (void) sem_getvalue(&mqhp->mq_notempty, &count); 1098f841f6adSraf omqstat->mq_curmsgs = count; 1099f841f6adSraf } 1100f841f6adSraf 1101f841f6adSraf /* set description attributes */ 1102f841f6adSraf if ((mqstat->mq_flags & O_NONBLOCK) != 0) 1103f841f6adSraf flag = FNONBLOCK; 1104f841f6adSraf mqdp->mqd_mqdn->mqdn_flags = flag; 1105f841f6adSraf 1106f841f6adSraf return (0); 1107f841f6adSraf } 1108f841f6adSraf 1109f841f6adSraf int 11107257d1b4Sraf mq_getattr(mqd_t mqdes, struct mq_attr *mqstat) 1111f841f6adSraf { 1112f841f6adSraf mqdes_t *mqdp = (mqdes_t *)mqdes; 1113f841f6adSraf mqhdr_t *mqhp; 1114f841f6adSraf int count; 1115f841f6adSraf 1116f841f6adSraf if (!mq_is_valid(mqdp)) { 1117f841f6adSraf errno = EBADF; 1118f841f6adSraf return (-1); 1119f841f6adSraf } 1120f841f6adSraf 1121f841f6adSraf mqhp = mqdp->mqd_mq; 1122f841f6adSraf 1123f841f6adSraf mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags; 1124f841f6adSraf mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg; 1125f841f6adSraf mqstat->mq_msgsize = (long)mqhp->mq_maxsz; 1126f841f6adSraf (void) sem_getvalue(&mqhp->mq_notempty, &count); 1127f841f6adSraf mqstat->mq_curmsgs = count; 1128f841f6adSraf return (0); 1129f841f6adSraf } 1130f841f6adSraf 1131f841f6adSraf /* 1132f841f6adSraf * Cleanup after fork1() in the child process. 1133f841f6adSraf */ 1134f841f6adSraf void 1135f841f6adSraf postfork1_child_sigev_mq(void) 1136f841f6adSraf { 1137f841f6adSraf thread_communication_data_t *tcdp; 1138f841f6adSraf mqdes_t *mqdp; 1139f841f6adSraf 1140f841f6adSraf for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) { 1141f841f6adSraf if ((tcdp = mqdp->mqd_tcd) != NULL) { 1142f841f6adSraf mqdp->mqd_tcd = NULL; 1143f841f6adSraf tcd_teardown(tcdp); 1144f841f6adSraf } 1145f841f6adSraf } 1146f841f6adSraf } 1147