1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /*
22 * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
24 */
25
26 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
27 /* All Rights Reserved */
28
29
30 /*
31 * Inter-Process Communication Message Facility.
32 *
33 * See os/ipc.c for a description of common IPC functionality.
34 *
35 * Resource controls
36 * -----------------
37 *
38 * Control: zone.max-msg-ids (rc_zone_msgmni)
39 * Description: Maximum number of message queue ids allowed a zone.
40 *
41 * When msgget() is used to allocate a message queue, one id is
42 * allocated. If the id allocation doesn't succeed, msgget() fails
43 * and errno is set to ENOSPC. Upon successful msgctl(, IPC_RMID)
44 * the id is deallocated.
45 *
46 * Control: project.max-msg-ids (rc_project_msgmni)
47 * Description: Maximum number of message queue ids allowed a project.
48 *
49 * When msgget() is used to allocate a message queue, one id is
50 * allocated. If the id allocation doesn't succeed, msgget() fails
51 * and errno is set to ENOSPC. Upon successful msgctl(, IPC_RMID)
52 * the id is deallocated.
53 *
54 * Control: process.max-msg-qbytes (rc_process_msgmnb)
55 * Description: Maximum number of bytes of messages on a message queue.
56 *
57 * When msgget() successfully allocates a message queue, the minimum
58 * enforced value of this limit is used to initialize msg_qbytes.
59 *
60 * Control: process.max-msg-messages (rc_process_msgtql)
61 * Description: Maximum number of messages on a message queue.
62 *
63 * When msgget() successfully allocates a message queue, the minimum
64 * enforced value of this limit is used to initialize a per-queue
65 * limit on the number of messages.
66 */
67
68 #include <sys/types.h>
69 #include <sys/t_lock.h>
70 #include <sys/param.h>
71 #include <sys/cred.h>
72 #include <sys/user.h>
73 #include <sys/proc.h>
74 #include <sys/time.h>
75 #include <sys/ipc.h>
76 #include <sys/ipc_impl.h>
77 #include <sys/msg.h>
78 #include <sys/msg_impl.h>
79 #include <sys/list.h>
80 #include <sys/systm.h>
81 #include <sys/sysmacros.h>
82 #include <sys/cpuvar.h>
83 #include <sys/kmem.h>
84 #include <sys/ddi.h>
85 #include <sys/errno.h>
86 #include <sys/cmn_err.h>
87 #include <sys/debug.h>
88 #include <sys/project.h>
89 #include <sys/modctl.h>
90 #include <sys/syscall.h>
91 #include <sys/policy.h>
92 #include <sys/zone.h>
93
94 #include <c2/audit.h>
95
96 /*
97 * The following tunables are obsolete. Though for compatibility we
98 * still read and interpret msginfo_msgmnb, msginfo_msgmni, and
99 * msginfo_msgtql (see os/project.c and os/rctl_proc.c), the preferred
100 * mechanism for administrating the IPC Message facility is through the
101 * resource controls described at the top of this file.
102 */
103 size_t msginfo_msgmax = 2048; /* (obsolete) */
104 size_t msginfo_msgmnb = 4096; /* (obsolete) */
105 int msginfo_msgmni = 50; /* (obsolete) */
106 int msginfo_msgtql = 40; /* (obsolete) */
107 int msginfo_msgssz = 8; /* (obsolete) */
108 int msginfo_msgmap = 0; /* (obsolete) */
109 ushort_t msginfo_msgseg = 1024; /* (obsolete) */
110
/*
 * Handles for the resource controls described at the top of this file;
 * registered elsewhere (see os/project.c and os/rctl_proc.c per the
 * comment above).
 */
extern rctl_hndl_t rc_zone_msgmni;
extern rctl_hndl_t rc_project_msgmni;
extern rctl_hndl_t rc_process_msgmnb;
extern rctl_hndl_t rc_process_msgtql;
/* IPC service state for all message queues; created in _init(). */
static ipc_service_t *msq_svc;
/* Key for the zone-shutdown callback; see msg_remove_zone(). */
static zone_key_t msg_zone_key;

static void msg_dtor(kipc_perm_t *);
static void msg_rmid(kipc_perm_t *);
static void msg_remove_zone(zoneid_t, void *);
121
/*
 * Module linkage information for the kernel.
 */
static ssize_t msgsys(int opcode, uintptr_t a0, uintptr_t a1, uintptr_t a2,
	uintptr_t a4, uintptr_t a5);

/* System call table entry for the native-ABI msgsys(2) entry point. */
static struct sysent ipcmsg_sysent = {
	6,	/* argument count */
#ifdef _LP64
	SE_ARGC | SE_NOUNLOAD | SE_64RVAL,
#else
	SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
#endif
	(int (*)())msgsys
};

#ifdef _SYSCALL32_IMPL
static ssize32_t msgsys32(int opcode, uint32_t a0, uint32_t a1, uint32_t a2,
	uint32_t a4, uint32_t a5);

/* System call table entry for 32-bit callers on a 64-bit kernel. */
static struct sysent ipcmsg_sysent32 = {
	6,	/* argument count */
	SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
	(int (*)())msgsys32
};
#endif	/* _SYSCALL32_IMPL */

static struct modlsys modlsys = {
	&mod_syscallops, "System V message facility", &ipcmsg_sysent
};

#ifdef _SYSCALL32_IMPL
static struct modlsys modlsys32 = {
	&mod_syscallops32, "32-bit System V message facility", &ipcmsg_sysent32
};
#endif
158
159 /*
160 * Big Theory statement for message queue correctness
161 *
 162  * The msgrcv and msgsnd functions no longer use cv_broadcast to wake up
 163  * receivers who are waiting for an event. Using the cv_broadcast method
 164  * resulted in negative scaling when the number of waiting receivers is large
 165  * (the thundering herd problem). Instead, the receivers waiting to receive a
166 * message are now linked in a queue-like fashion and awaken one at a time in
167 * a controlled manner.
168 *
169 * Receivers can block on two different classes of waiting list:
170 * 1) "sendwait" list, which is the more complex list of the two. The
171 * receiver will be awakened by a sender posting a new message. There
172 * are two types of "sendwait" list used:
173 * a) msg_wait_snd: handles all receivers who are looking for
174 * a message type >= 0, but was unable to locate a match.
175 *
176 * slot 0: reserved for receivers that have designated they
177 * will take any message type.
178 * rest: consist of receivers requesting a specific type
179 * but the type was not present. The entries are
180 * hashed into a bucket in an attempt to keep
181 * any list search relatively short.
182 * b) msg_wait_snd_ngt: handles all receivers that have designated
183 * a negative message type. Unlike msg_wait_snd, the hash bucket
184 * serves a range of negative message types (-1 to -5, -6 to -10
185 * and so forth), where the last bucket is reserved for all the
186 * negative message types that hash outside of MSG_MAX_QNUM - 1.
187 * This is done this way to simplify the operation of locating a
188 * negative message type.
189 *
190 * 2) "copyout" list, where the receiver is awakened by another
191 * receiver after a message is copied out. This is a linked list
192 * of waiters that are awakened one at a time. Although the solution is
193 * not optimal, the complexity that would be added in for waking
194 * up the right entry far exceeds any potential pay back (too many
195 * correctness and corner case issues).
196 *
197 * The lists are doubly linked. In the case of the "sendwait"
198 * list, this allows the thread to remove itself from the list without having
199 * to traverse the list. In the case of the "copyout" list it simply allows
200 * us to use common functions with the "sendwait" list.
201 *
202 * To make sure receivers are not hung out to dry, we must guarantee:
203 * 1. If any queued message matches any receiver, then at least one
204 * matching receiver must be processing the request.
 205  *	2. Blocking on the copyout queue is only temporary while messages
 206  *	   are being copied out. The process is guaranteed to wake up
 207  *	   when it gets to the front of the queue (copyout is a FIFO).
208 *
209 * Rules for blocking and waking up:
210 * 1. A receiver entering msgrcv must examine all messages for a match
211 * before blocking on a sendwait queue.
 212  *	2. If the receiver blocks because the message it chose is already
 213  *	   being copied out, then when it wakes up it needs to start
 214  *	   checking the messages from the beginning.
 215  *	3) Whenever a process returns from msgrcv for any reason, if it
216 * had attempted to copy a message or blocked waiting for a copy
217 * to complete it needs to wakeup the next receiver blocked on
218 * a copy out.
219 * 4) When a message is sent, the sender selects a process waiting
220 * for that type of message. This selection process rotates between
221 * receivers types of 0, negative and positive to prevent starvation of
222 * any one particular receiver type.
223 * 5) The following are the scenarios for processes that are awakened
224 * by a msgsnd:
225 * a) The process finds the message and is able to copy
226 * it out. Once complete, the process returns.
227 * b) The message that was sent that triggered the wakeup is no
228 * longer available (another process found the message first).
229 * We issue a wakeup on copy queue and then go back to
230 * sleep waiting for another matching message to be sent.
231 * c) The message that was supposed to be processed was
232 * already serviced by another process. However a different
233 * message is present which we can service. The message
234 * is copied and the process returns.
235 * d) The message is found, but some sort of error occurs that
236 * prevents the message from being copied. The receiver
237 * wakes up the next sender that can service this message
238 * type and returns an error to the caller.
239 * e) The message is found, but it is marked as being copied
240 * out. The receiver then goes to sleep on the copyout
241 * queue where it will be awakened again sometime in the future.
242 *
243 *
244 * 6) Whenever a message is found that matches the message type designated,
245 * but is being copied out we have to block on the copyout queue.
246 * After process copying finishes the copy out, it must wakeup (either
247 * directly or indirectly) all receivers who blocked on its copyout,
248 * so they are guaranteed a chance to examine the remaining messages.
249 * This is implemented via a chain of wakeups: Y wakes X, who wakes Z,
250 * and so on. The chain cannot be broken. This leads to the following
251 * cases:
 252  *	a) When a receiver is finished copying the message (or encountered
 253  *	   an error), the first entry on the copyout queue is woken
 254  *	   up.
255 * b) When the receiver is woken up, it attempts to locate
256 * a message type match.
257 * c) If a message type is found and
258 * -- MSG_RCVCOPY flag is not set, the message is
259 * marked for copying out. Regardless of the copyout
260 * success the next entry on the copyout queue is
261 * awakened and the operation is completed.
262 * -- MSG_RCVCOPY is set, we simply go back to sleep again
263 * on the copyout queue.
264 * d) If the message type is not found then we wakeup the next
265 * process on the copyout queue.
 266  *	7) If a msgsnd is unable to complete for any of the following reasons
267 * a) the msgq has no space for the message
268 * b) the maximum number of messages allowed has been reached
269 * then one of two things happen:
270 * 1) If the passed in msg_flag has IPC_NOWAIT set, then
271 * an error is returned.
272 * 2) The IPC_NOWAIT bit is not set in msg_flag, then the
273 * the thread is placed to sleep until the request can be
274 * serviced.
275 * 8) When waking a thread waiting to send a message, a check is done to
276 * verify that the operation being asked for by the thread will complete.
277 * This decision making process is done in a loop where the oldest request
278 * is checked first. The search will continue until there is no more
279 * room on the msgq or we have checked all the waiters.
280 */
281
static uint_t msg_type_hash(long);
static int msgq_check_err(kmsqid_t *qp, int cvres);
static int msg_rcvq_sleep(list_t *, msgq_wakeup_t *, kmutex_t **,
	kmsqid_t *);
static int msg_copyout(kmsqid_t *, long, kmutex_t **, size_t *, size_t,
	struct msg *, struct ipcmsgbuf *, int);
static void msg_rcvq_wakeup_all(list_t *);
static void msg_wakeup_senders(kmsqid_t *);
static void msg_wakeup_rdr(kmsqid_t *, msg_select_t **, long);
static msgq_wakeup_t *msg_fnd_any_snd(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_any_rdr(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_neg_snd(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_spc_snd(kmsqid_t *, int, long);
static struct msg *msgrcv_lookup(kmsqid_t *, long);

/*
 * Circular ring of receiver-selection policies used when a message is
 * sent: rotates any-type -> specific-type -> negative-type receivers to
 * prevent starvation of any one receiver class (Big Theory rule 4).
 */
msg_select_t msg_fnd_sndr[] = {
	{ msg_fnd_any_snd, &msg_fnd_sndr[1] },
	{ msg_fnd_spc_snd, &msg_fnd_sndr[2] },
	{ msg_fnd_neg_snd, &msg_fnd_sndr[0] }
};

/* Single-entry ring used to wake the next waiter on the copyout list. */
msg_select_t msg_fnd_rdr[1] = {
	{ msg_fnd_any_rdr, &msg_fnd_rdr[0] },
};
306
static struct modlinkage modlinkage = {
	MODREV_1,
	&modlsys,
#ifdef _SYSCALL32_IMPL
	&modlsys32,
#endif
	NULL
};

/*
 * Sentinel meaning "no smallest message type known yet": the maximum
 * size_t value, so any real message type compares lower.
 */
#define	MSG_SMALL_INIT (size_t)-1
317 int
_init(void)318 _init(void)
319 {
320 int result;
321
322 msq_svc = ipcs_create("msqids", rc_project_msgmni, rc_zone_msgmni,
323 sizeof (kmsqid_t), msg_dtor, msg_rmid, AT_IPC_MSG,
324 offsetof(ipc_rqty_t, ipcq_msgmni));
325 zone_key_create(&msg_zone_key, NULL, msg_remove_zone, NULL);
326
327 if ((result = mod_install(&modlinkage)) == 0)
328 return (0);
329
330 (void) zone_key_delete(msg_zone_key);
331 ipcs_destroy(msq_svc);
332
333 return (result);
334 }
335
336 int
_fini(void)337 _fini(void)
338 {
339 return (EBUSY);
340 }
341
342 int
_info(struct modinfo * modinfop)343 _info(struct modinfo *modinfop)
344 {
345 return (mod_info(&modlinkage, modinfop));
346 }
347
/*
 * IPC framework destructor for a message queue.  Runs when the queue's
 * last reference goes away; by then no messages may remain and no thread
 * may still be linked on any of the queue's wait lists, which the
 * ASSERTs verify before each list is destroyed.
 */
static void
msg_dtor(kipc_perm_t *perm)
{
	kmsqid_t *qp = (kmsqid_t *)perm;
	int ii;

	/* Every bucket of both "sendwait" hash arrays must be empty. */
	for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
		ASSERT(list_is_empty(&qp->msg_wait_snd[ii]));
		ASSERT(list_is_empty(&qp->msg_wait_snd_ngt[ii]));
		list_destroy(&qp->msg_wait_snd[ii]);
		list_destroy(&qp->msg_wait_snd_ngt[ii]);
	}
	ASSERT(list_is_empty(&qp->msg_cpy_block));
	ASSERT(list_is_empty(&qp->msg_wait_rcv));
	list_destroy(&qp->msg_cpy_block);
	ASSERT(qp->msg_snd_cnt == 0);
	ASSERT(qp->msg_cbytes == 0);
	list_destroy(&qp->msg_list);
	list_destroy(&qp->msg_wait_rcv);
}
368
369
/*
 * msg_hold - take an additional reference on a message.  The expansion is
 * fully parenthesized so it behaves as a single expression at any use
 * site (e.g. inside a larger expression or an un-braced if).
 */
#define	msg_hold(mp)	((mp)->msg_copycnt++)
371
372 /*
373 * msg_rele - decrement the reference count on the message. When count
374 * reaches zero, free message header and contents.
375 */
376 static void
msg_rele(struct msg * mp)377 msg_rele(struct msg *mp)
378 {
379 ASSERT(mp->msg_copycnt > 0);
380 if (mp->msg_copycnt-- == 1) {
381 if (mp->msg_addr)
382 kmem_free(mp->msg_addr, mp->msg_size);
383 kmem_free(mp, sizeof (struct msg));
384 }
385 }
386
/*
 * msgunlink - Unlink msg from queue, decrement byte count and wake up anyone
 * waiting for free bytes on queue.
 *
 * Called with queue locked.
 */
static void
msgunlink(kmsqid_t *qp, struct msg *mp)
{
	list_remove(&qp->msg_list, mp);
	qp->msg_qnum--;
	qp->msg_cbytes -= mp->msg_size;
	/* Drop the queue's reference; frees the message if it was the last. */
	msg_rele(mp);

	/* Wake up waiting writers */
	msg_wakeup_senders(qp);
}
404
405 static void
msg_rmid(kipc_perm_t * perm)406 msg_rmid(kipc_perm_t *perm)
407 {
408 kmsqid_t *qp = (kmsqid_t *)perm;
409 struct msg *mp;
410 int ii;
411
412
413 while ((mp = list_head(&qp->msg_list)) != NULL)
414 msgunlink(qp, mp);
415 ASSERT(qp->msg_cbytes == 0);
416
417 /*
418 * Wake up everyone who is in a wait state of some sort
419 * for this message queue.
420 */
421 for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
422 msg_rcvq_wakeup_all(&qp->msg_wait_snd[ii]);
423 msg_rcvq_wakeup_all(&qp->msg_wait_snd_ngt[ii]);
424 }
425 msg_rcvq_wakeup_all(&qp->msg_cpy_block);
426 msg_rcvq_wakeup_all(&qp->msg_wait_rcv);
427 }
428
/*
 * msgctl system call.
 *
 * gets q lock (via ipc_lookup), releases before return.
 * may call users of msg_lock
 */
static int
msgctl(int msgid, int cmd, void *arg)
{
	STRUCT_DECL(msqid_ds, ds);	/* SVR4 queue work area */
	kmsqid_t *qp;			/* ptr to associated q */
	int error;
	struct cred *cr;
	model_t mdl = get_udatamodel();
	struct msqid_ds64 ds64;
	kmutex_t *lock;
	proc_t *pp = curproc;

	STRUCT_INIT(ds, mdl);
	cr = CRED();

	/*
	 * Perform pre- or non-lookup actions (e.g. copyins, RMID).
	 */
	switch (cmd) {
	case IPC_SET:
		if (copyin(arg, STRUCT_BUF(ds), STRUCT_SIZE(ds)))
			return (set_errno(EFAULT));
		break;

	case IPC_SET64:
		if (copyin(arg, &ds64, sizeof (struct msqid_ds64)))
			return (set_errno(EFAULT));
		break;

	case IPC_RMID:
		/* ipc_rmid() performs its own permission checking. */
		if (error = ipc_rmid(msq_svc, msgid, cr))
			return (set_errno(error));
		return (0);
	}

	/*
	 * get msqid_ds for this msgid
	 */
	if ((lock = ipc_lookup(msq_svc, msgid, (kipc_perm_t **)&qp)) == NULL)
		return (set_errno(EINVAL));

	switch (cmd) {
	case IPC_SET:
		/* Raising msg_qbytes requires ipc-config privilege. */
		if (STRUCT_FGET(ds, msg_qbytes) > qp->msg_qbytes &&
		    secpolicy_ipc_config(cr) != 0) {
			mutex_exit(lock);
			return (set_errno(EPERM));
		}
		if (error = ipcperm_set(msq_svc, cr, &qp->msg_perm,
		    &STRUCT_BUF(ds)->msg_perm, mdl)) {
			mutex_exit(lock);
			return (set_errno(error));
		}
		qp->msg_qbytes = STRUCT_FGET(ds, msg_qbytes);
		qp->msg_ctime = gethrestime_sec();
		break;

	case IPC_STAT:
		if (error = ipcperm_access(&qp->msg_perm, MSG_R, cr)) {
			mutex_exit(lock);
			return (set_errno(error));
		}

		/*
		 * Transiently fold the waiter indications into ipc_mode
		 * so they appear in the copied-out permissions, then
		 * clear them again.
		 */
		if (qp->msg_rcv_cnt)
			qp->msg_perm.ipc_mode |= MSG_RWAIT;
		if (qp->msg_snd_cnt)
			qp->msg_perm.ipc_mode |= MSG_WWAIT;
		ipcperm_stat(&STRUCT_BUF(ds)->msg_perm, &qp->msg_perm, mdl);
		qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
		STRUCT_FSETP(ds, msg_first, NULL);	/* kernel addr */
		STRUCT_FSETP(ds, msg_last, NULL);
		STRUCT_FSET(ds, msg_cbytes, qp->msg_cbytes);
		STRUCT_FSET(ds, msg_qnum, qp->msg_qnum);
		STRUCT_FSET(ds, msg_qbytes, qp->msg_qbytes);
		STRUCT_FSET(ds, msg_lspid, qp->msg_lspid);
		STRUCT_FSET(ds, msg_lrpid, qp->msg_lrpid);
		STRUCT_FSET(ds, msg_stime, qp->msg_stime);
		STRUCT_FSET(ds, msg_rtime, qp->msg_rtime);
		STRUCT_FSET(ds, msg_ctime, qp->msg_ctime);
		break;

	case IPC_SET64:
		/*
		 * Without ipc-config privilege, raising msg_qbytes is
		 * additionally subject to the process.max-msg-qbytes
		 * resource control (tested under p_lock).
		 */
		mutex_enter(&pp->p_lock);
		if ((ds64.msgx_qbytes > qp->msg_qbytes) &&
		    secpolicy_ipc_config(cr) != 0 &&
		    rctl_test(rc_process_msgmnb, pp->p_rctls, pp,
		    ds64.msgx_qbytes, RCA_SAFE) & RCT_DENY) {
			mutex_exit(&pp->p_lock);
			mutex_exit(lock);
			return (set_errno(EPERM));
		}
		mutex_exit(&pp->p_lock);
		if (error = ipcperm_set64(msq_svc, cr, &qp->msg_perm,
		    &ds64.msgx_perm)) {
			mutex_exit(lock);
			return (set_errno(error));
		}
		qp->msg_qbytes = ds64.msgx_qbytes;
		qp->msg_ctime = gethrestime_sec();
		break;

	case IPC_STAT64:
		/*
		 * NOTE(review): unlike IPC_STAT, there is no MSG_R access
		 * check here -- presumably intentional for the *64
		 * observability interface, but confirm against os/ipc.c
		 * and the shm/sem analogues.
		 */
		if (qp->msg_rcv_cnt)
			qp->msg_perm.ipc_mode |= MSG_RWAIT;
		if (qp->msg_snd_cnt)
			qp->msg_perm.ipc_mode |= MSG_WWAIT;
		ipcperm_stat64(&ds64.msgx_perm, &qp->msg_perm);
		qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
		ds64.msgx_cbytes = qp->msg_cbytes;
		ds64.msgx_qnum = qp->msg_qnum;
		ds64.msgx_qbytes = qp->msg_qbytes;
		ds64.msgx_lspid = qp->msg_lspid;
		ds64.msgx_lrpid = qp->msg_lrpid;
		ds64.msgx_stime = qp->msg_stime;
		ds64.msgx_rtime = qp->msg_rtime;
		ds64.msgx_ctime = qp->msg_ctime;
		break;

	default:
		mutex_exit(lock);
		return (set_errno(EINVAL));
	}

	mutex_exit(lock);

	/*
	 * Do copyout last (after releasing mutex).
	 */
	switch (cmd) {
	case IPC_STAT:
		if (copyout(STRUCT_BUF(ds), arg, STRUCT_SIZE(ds)))
			return (set_errno(EFAULT));
		break;

	case IPC_STAT64:
		if (copyout(&ds64, arg, sizeof (struct msqid_ds64)))
			return (set_errno(EFAULT));
		break;
	}

	return (0);
}
577
/*
 * Remove all message queues associated with a given zone. Called by
 * zone_shutdown when the zone is halted.  The arg parameter (the
 * zone_key_create() callback argument) is unused.
 */
/*ARGSUSED1*/
static void
msg_remove_zone(zoneid_t zoneid, void *arg)
{
	ipc_remove_zone(msq_svc, zoneid);
}
588
/*
 * msgget system call.
 *
 * Look up (or, via the IPC framework, allocate) the message queue for
 * key and return its id.  A freshly allocated queue is fully initialized
 * here before being committed, with its byte and message limits taken
 * from the process's enforced resource-control values.
 */
static int
msgget(key_t key, int msgflg)
{
	kmsqid_t *qp;
	kmutex_t *lock;
	int id, error;
	int ii;
	proc_t *pp = curproc;

top:
	if (error = ipc_get(msq_svc, key, msgflg, (kipc_perm_t **)&qp, &lock))
		return (set_errno(error));

	if (IPC_FREE(&qp->msg_perm)) {
		/*
		 * ipc_get() handed us a new, uninitialized queue.
		 * NOTE(review): ipc_get() appears to return with
		 * pp->p_lock held in this case -- hence the mutex_exit()
		 * below; confirm against os/ipc.c.
		 */
		mutex_exit(lock);
		mutex_exit(&pp->p_lock);

		list_create(&qp->msg_list, sizeof (struct msg),
		    offsetof(struct msg, msg_node));
		qp->msg_qnum = 0;
		qp->msg_lspid = qp->msg_lrpid = 0;
		qp->msg_stime = qp->msg_rtime = 0;
		qp->msg_ctime = gethrestime_sec();
		qp->msg_ngt_cnt = 0;
		qp->msg_neg_copy = 0;
		/* One wait list per hash bucket, for both sendwait arrays. */
		for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
			list_create(&qp->msg_wait_snd[ii],
			    sizeof (msgq_wakeup_t),
			    offsetof(msgq_wakeup_t, msgw_list));
			list_create(&qp->msg_wait_snd_ngt[ii],
			    sizeof (msgq_wakeup_t),
			    offsetof(msgq_wakeup_t, msgw_list));
		}
		/*
		 * The proper initialization of msg_lowest_type is to the
		 * highest possible value. By doing this we guarantee that
		 * when the first send happens, the lowest type will be set
		 * properly.
		 */
		qp->msg_lowest_type = MSG_SMALL_INIT;
		list_create(&qp->msg_cpy_block,
		    sizeof (msgq_wakeup_t),
		    offsetof(msgq_wakeup_t, msgw_list));
		list_create(&qp->msg_wait_rcv,
		    sizeof (msgq_wakeup_t),
		    offsetof(msgq_wakeup_t, msgw_list));
		qp->msg_fnd_sndr = &msg_fnd_sndr[0];
		qp->msg_fnd_rdr = &msg_fnd_rdr[0];
		qp->msg_rcv_cnt = 0;
		qp->msg_snd_cnt = 0;
		qp->msg_snd_smallest = MSG_SMALL_INIT;

		if (error = ipc_commit_begin(msq_svc, key, msgflg,
		    (kipc_perm_t *)qp)) {
			/* EAGAIN means "retry the whole get". */
			if (error == EAGAIN)
				goto top;
			return (set_errno(error));
		}
		/* Seed the queue limits from the enforced rctl values. */
		qp->msg_qbytes = rctl_enforced_value(rc_process_msgmnb,
		    pp->p_rctls, pp);
		qp->msg_qmax = rctl_enforced_value(rc_process_msgtql,
		    pp->p_rctls, pp);
		lock = ipc_commit_end(msq_svc, &qp->msg_perm);
	}

	if (AU_AUDITING())
		audit_ipcget(AT_IPC_MSG, (void *)qp);

	id = qp->msg_perm.ipc_id;
	mutex_exit(lock);
	return (id);
}
664
/*
 * msgrcv system call: receive a message of type msgtyp from queue msqid,
 * copying at most msgsz bytes to msgp.  Returns the number of bytes
 * transferred, or -1 with errno set.  See the Big Theory statement above
 * for the blocking/wakeup protocol this function participates in.
 */
static ssize_t
msgrcv(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, long msgtyp, int msgflg)
{
	struct msg *smp;	/* ptr to best msg on q */
	kmsqid_t *qp;		/* ptr to associated q */
	kmutex_t *lock;
	size_t xtsz;		/* transfer byte count */
	int error = 0;
	int cvres;
	uint_t msg_hash;
	msgq_wakeup_t msg_entry;

	CPU_STATS_ADDQ(CPU, sys, msg, 1);	/* bump msg send/rcv count */

	msg_hash = msg_type_hash(msgtyp);
	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
		return ((ssize_t)set_errno(EINVAL));
	}
	ipc_hold(msq_svc, (kipc_perm_t *)qp);

	if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
		goto msgrcv_out;
	}

	/*
	 * Various information (including the condvar_t) required for the
	 * process to sleep is provided by its stack.
	 */
	msg_entry.msgw_thrd = curthread;
	msg_entry.msgw_snd_wake = 0;
	msg_entry.msgw_type = msgtyp;
findmsg:
	smp = msgrcv_lookup(qp, msgtyp);

	if (smp) {
		/*
		 * We found a possible message to copy out.
		 */
		if ((smp->msg_flags & MSG_RCVCOPY) == 0) {
			long t = msg_entry.msgw_snd_wake;
			long copy_type = smp->msg_type;

			/*
			 * It is available, attempt to copy it.
			 */
			error = msg_copyout(qp, msgtyp, &lock, &xtsz, msgsz,
			    smp, msgp, msgflg);

			/*
			 * It is possible to consume a different message
			 * type than what we were originally awakened for
			 * (negative types). If this happens a check must be
			 * done to determine if another receiver is available
			 * for the waking message type.  Failure to do this
			 * can result in a message on the queue that could be
			 * serviced by a sleeping receiver.
			 */
			if (!error && t && (copy_type != t))
				msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, t);

			/*
			 * Don't forget to wakeup a sleeper that blocked
			 * because we were copying things out.
			 */
			msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
			goto msgrcv_out;
		}
		/*
		 * The selected message is being copied out, so block. We do
		 * not need to wake the next person up on the msg_cpy_block
		 * list due to the fact someone is copying out and they will
		 * get things moving again once the copy is completed.
		 */
		cvres = msg_rcvq_sleep(&qp->msg_cpy_block,
		    &msg_entry, &lock, qp);
		error = msgq_check_err(qp, cvres);
		if (error) {
			goto msgrcv_out;
		}
		goto findmsg;
	}
	/*
	 * There isn't a message to copy out that matches the designated
	 * criteria.
	 */
	if (msgflg & IPC_NOWAIT) {
		error = ENOMSG;
		goto msgrcv_out;
	}
	msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);

	/*
	 * Wait for new message. We keep the negative and positive types
	 * separate for performance reasons.
	 */
	msg_entry.msgw_snd_wake = 0;
	if (msgtyp >= 0) {
		cvres = msg_rcvq_sleep(&qp->msg_wait_snd[msg_hash],
		    &msg_entry, &lock, qp);
	} else {
		qp->msg_ngt_cnt++;
		cvres = msg_rcvq_sleep(&qp->msg_wait_snd_ngt[msg_hash],
		    &msg_entry, &lock, qp);
		qp->msg_ngt_cnt--;
	}

	if (!(error = msgq_check_err(qp, cvres))) {
		goto findmsg;
	}

msgrcv_out:
	if (error) {
		/*
		 * On the error path we must still propagate our wakeup
		 * obligations (Big Theory rules 3 and 5): unblock the next
		 * copyout waiter, and pass on any send-side wakeup we
		 * consumed but could not service.
		 */
		msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
		if (msg_entry.msgw_snd_wake) {
			msg_wakeup_rdr(qp, &qp->msg_fnd_sndr,
			    msg_entry.msgw_snd_wake);
		}
		ipc_rele(msq_svc, (kipc_perm_t *)qp);
		return ((ssize_t)set_errno(error));
	}
	ipc_rele(msq_svc, (kipc_perm_t *)qp);
	return ((ssize_t)xtsz);
}
788
789 static int
msgq_check_err(kmsqid_t * qp,int cvres)790 msgq_check_err(kmsqid_t *qp, int cvres)
791 {
792 if (IPC_FREE(&qp->msg_perm)) {
793 return (EIDRM);
794 }
795
796 if (cvres == 0) {
797 return (EINTR);
798 }
799
800 return (0);
801 }
802
/*
 * Copy message smp out to the user buffer msgp.
 *
 * The queue lock is dropped around the copyout (and reacquired through
 * *lock before returning); in the meantime the message is marked
 * MSG_RCVCOPY and held so no other receiver consumes or frees it.  On
 * success the message is unlinked from the queue and *xtsz_ret holds the
 * number of bytes transferred.  Returns 0, or E2BIG, EIDRM or EFAULT.
 */
static int
msg_copyout(kmsqid_t *qp, long msgtyp, kmutex_t **lock, size_t *xtsz_ret,
    size_t msgsz, struct msg *smp, struct ipcmsgbuf *msgp, int msgflg)
{
	size_t xtsz;
	STRUCT_HANDLE(ipcmsgbuf, umsgp);
	model_t mdl = get_udatamodel();
	int copyerror = 0;

	STRUCT_SET_HANDLE(umsgp, mdl, msgp);
	/* Undersized buffer: truncate with MSG_NOERROR, else fail E2BIG. */
	if (msgsz < smp->msg_size) {
		if ((msgflg & MSG_NOERROR) == 0) {
			return (E2BIG);
		} else {
			xtsz = msgsz;
		}
	} else {
		xtsz = smp->msg_size;
	}
	*xtsz_ret = xtsz;

	/*
	 * To prevent a DOS attack we mark the message as being
	 * copied out and release mutex. When the copy is completed
	 * we need to acquire the mutex and make the appropriate updates.
	 */
	ASSERT((smp->msg_flags & MSG_RCVCOPY) == 0);
	smp->msg_flags |= MSG_RCVCOPY;
	msg_hold(smp);
	if (msgtyp < 0) {
		/* Only one negative-type copyout may be in flight at once. */
		ASSERT(qp->msg_neg_copy == 0);
		qp->msg_neg_copy = 1;
	}
	mutex_exit(*lock);

	if (mdl == DATAMODEL_NATIVE) {
		copyerror = copyout(&smp->msg_type, msgp,
		    sizeof (smp->msg_type));
	} else {
		/*
		 * 32-bit callers need an imploded msg type.
		 */
		int32_t msg_type32 = smp->msg_type;

		copyerror = copyout(&msg_type32, msgp,
		    sizeof (msg_type32));
	}

	if (copyerror == 0 && xtsz) {
		copyerror = copyout(smp->msg_addr,
		    STRUCT_FADDR(umsgp, mtext), xtsz);
	}

	/*
	 * Reclaim the mutex and make sure the message queue still exists.
	 */

	*lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
	if (msgtyp < 0) {
		qp->msg_neg_copy = 0;
	}
	ASSERT(smp->msg_flags & MSG_RCVCOPY);
	smp->msg_flags &= ~MSG_RCVCOPY;
	msg_rele(smp);
	if (IPC_FREE(&qp->msg_perm)) {
		/* Queue was removed while we were copying. */
		return (EIDRM);
	}
	if (copyerror) {
		return (EFAULT);
	}
	/* Success: record receiver pid and time, unlink the message. */
	qp->msg_lrpid = ttoproc(curthread)->p_pid;
	qp->msg_rtime = gethrestime_sec();
	msgunlink(qp, smp);
	return (0);
}
878
/*
 * Locate a message on qp matching msgtyp, per msgrcv(2) semantics:
 *	msgtyp == 0:	the first (oldest) message on the queue.
 *	msgtyp > 0:	the first message of exactly that type.
 *	msgtyp < 0:	the message with the lowest type <= -msgtyp.
 *
 * Returns NULL when no candidate exists.  While a negative-type copyout
 * is already in flight (msg_neg_copy), returns a static placeholder
 * marked MSG_RCVCOPY so the caller blocks on the copyout queue; callers
 * only inspect msg_flags on the returned message before blocking.
 *
 * Called with the queue locked.
 */
static struct msg *
msgrcv_lookup(kmsqid_t *qp, long msgtyp)
{
	struct msg *smp = NULL;
	long qp_low;
	struct msg *mp;		/* ptr to msg on q */
	long low_msgtype;
	static struct msg neg_copy_smp;

	mp = list_head(&qp->msg_list);
	if (msgtyp == 0) {
		smp = mp;
	} else {
		qp_low = qp->msg_lowest_type;
		if (msgtyp > 0) {
			/*
			 * If our lowest possible message type is larger than
			 * the message type desired, then we know there is
			 * no entry present.
			 */
			if (qp_low > msgtyp) {
				return (NULL);
			}

			for (; mp; mp = list_next(&qp->msg_list, mp)) {
				if (msgtyp == mp->msg_type) {
					smp = mp;
					break;
				}
			}
		} else {
			/*
			 * We have kept track of the lowest possible message
			 * type on the send queue. This allows us to terminate
			 * the search early if we find a message type of that
			 * type. Note, the lowest type may not be the actual
			 * lowest value in the system, it is only guaranteed
			 * that there isn't a value lower than that.
			 */
			low_msgtype = -msgtyp;
			if (low_msgtype < qp_low) {
				return (NULL);
			}
			if (qp->msg_neg_copy) {
				neg_copy_smp.msg_flags = MSG_RCVCOPY;
				return (&neg_copy_smp);
			}
			for (; mp; mp = list_next(&qp->msg_list, mp)) {
				if (mp->msg_type <= low_msgtype &&
				    !(smp && smp->msg_type <= mp->msg_type)) {
					smp = mp;
					low_msgtype = mp->msg_type;
					if (low_msgtype == qp_low) {
						/* Can't do better; stop. */
						break;
					}
				}
			}
			if (smp) {
				/*
				 * Update the lowest message type.
				 */
				qp->msg_lowest_type = smp->msg_type;
			}
		}
	}
	return (smp);
}
946
947 /*
948 * msgids system call.
949 */
950 static int
msgids(int * buf,uint_t nids,uint_t * pnids)951 msgids(int *buf, uint_t nids, uint_t *pnids)
952 {
953 int error;
954
955 if (error = ipc_ids(msq_svc, buf, nids, pnids))
956 return (set_errno(error));
957
958 return (0);
959 }
960
/*
 * Round a message size up to the natural alignment of the caller's data
 * model; used to lay out msgsnap(2) output records.
 */
#define	RND(x)		roundup((x), sizeof (size_t))
#define	RND32(x)	roundup((x), sizeof (size32_t))
963
/*
 * msgsnap system call: snapshot all messages on queue msqid that match
 * msgtyp (same matching rules as msgrcv) into the user buffer.
 *
 * The buffer receives a msgsnap_head followed by, for each message, a
 * msgsnap_mhead and the message body rounded to the data model's
 * alignment.  If the buffer is too small for all matching messages, only
 * the header is copied out, with msgsnap_size reporting the required
 * size and msgsnap_nmsg set to zero.
 */
static int
msgsnap(int msqid, caddr_t buf, size_t bufsz, long msgtyp)
{
	struct msg *mp;	/* ptr to msg on q */
	kmsqid_t *qp;	/* ptr to associated q */
	kmutex_t *lock;
	size_t size;
	size_t nmsg;
	struct msg **snaplist;
	int error, i;
	model_t mdl = get_udatamodel();
	STRUCT_DECL(msgsnap_head, head);
	STRUCT_DECL(msgsnap_mhead, mhead);

	STRUCT_INIT(head, mdl);
	STRUCT_INIT(mhead, mdl);

	if (bufsz < STRUCT_SIZE(head))
		return (set_errno(EINVAL));

	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL)
		return (set_errno(EINVAL));

	if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
		mutex_exit(lock);
		return (set_errno(error));
	}
	ipc_hold(msq_svc, (kipc_perm_t *)qp);

	/*
	 * First compute the required buffer size and
	 * the number of messages on the queue.
	 */
	size = nmsg = 0;
	for (mp = list_head(&qp->msg_list); mp;
	    mp = list_next(&qp->msg_list, mp)) {
		if (msgtyp == 0 ||
		    (msgtyp > 0 && msgtyp == mp->msg_type) ||
		    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
			nmsg++;
			if (mdl == DATAMODEL_NATIVE)
				size += RND(mp->msg_size);
			else
				size += RND32(mp->msg_size);
		}
	}

	size += STRUCT_SIZE(head) + nmsg * STRUCT_SIZE(mhead);
	if (size > bufsz)
		nmsg = 0;	/* too small: report required size only */

	if (nmsg > 0) {
		/*
		 * Mark the messages as being copied.  The holds keep them
		 * alive after the queue lock is dropped below.
		 */
		snaplist = (struct msg **)kmem_alloc(nmsg *
		    sizeof (struct msg *), KM_SLEEP);
		i = 0;
		for (mp = list_head(&qp->msg_list); mp;
		    mp = list_next(&qp->msg_list, mp)) {
			if (msgtyp == 0 ||
			    (msgtyp > 0 && msgtyp == mp->msg_type) ||
			    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
				msg_hold(mp);
				snaplist[i] = mp;
				i++;
			}
		}
	}
	mutex_exit(lock);

	/*
	 * Copy out the buffer header.
	 */
	STRUCT_FSET(head, msgsnap_size, size);
	STRUCT_FSET(head, msgsnap_nmsg, nmsg);
	if (copyout(STRUCT_BUF(head), buf, STRUCT_SIZE(head)))
		error = EFAULT;

	buf += STRUCT_SIZE(head);

	/*
	 * Now copy out the messages one by one.  Even after an error we
	 * keep iterating so every held message is released again.
	 */
	for (i = 0; i < nmsg; i++) {
		mp = snaplist[i];
		if (error == 0) {
			STRUCT_FSET(mhead, msgsnap_mlen, mp->msg_size);
			STRUCT_FSET(mhead, msgsnap_mtype, mp->msg_type);
			if (copyout(STRUCT_BUF(mhead), buf, STRUCT_SIZE(mhead)))
				error = EFAULT;
			buf += STRUCT_SIZE(mhead);

			if (error == 0 &&
			    mp->msg_size != 0 &&
			    copyout(mp->msg_addr, buf, mp->msg_size))
				error = EFAULT;
			if (mdl == DATAMODEL_NATIVE)
				buf += RND(mp->msg_size);
			else
				buf += RND32(mp->msg_size);
		}
		lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
		msg_rele(mp);
		/* Check for msg q deleted or reallocated */
		if (IPC_FREE(&qp->msg_perm))
			error = EIDRM;
		mutex_exit(lock);
	}

	(void) ipc_lock(msq_svc, qp->msg_perm.ipc_id);
	ipc_rele(msq_svc, (kipc_perm_t *)qp);

	if (nmsg > 0)
		kmem_free(snaplist, nmsg * sizeof (struct msg *));

	if (error)
		return (set_errno(error));
	return (0);
}
1087
/*
 * Messages of up to this many bytes have their kernel buffers allocated
 * and filled before the queue lock is taken (see msgsnd() below).
 */
#define	MSG_PREALLOC_LIMIT 8192

/*
 * msgsnd system call.
 *
 * Place a message of msgsz bytes (plus leading type word) from the user
 * buffer msgp onto queue msqid, blocking if the queue is full unless
 * IPC_NOWAIT is set in msgflg.
 */
static int
msgsnd(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, int msgflg)
{
	kmsqid_t	*qp;
	kmutex_t	*lock = NULL;
	struct msg	*mp = NULL;
	long		type;
	int		error = 0, wait_wakeup = 0;
	msgq_wakeup_t	msg_entry;
	model_t		mdl = get_udatamodel();
	STRUCT_HANDLE(ipcmsgbuf, umsgp);

	CPU_STATS_ADDQ(CPU, sys, msg, 1);	/* bump msg send/rcv count */
	STRUCT_SET_HANDLE(umsgp, mdl, msgp);

	/* Fetch the message type word; its width depends on the caller's
	 * data model. */
	if (mdl == DATAMODEL_NATIVE) {
		if (copyin(msgp, &type, sizeof (type)))
			return (set_errno(EFAULT));
	} else {
		int32_t	type32;
		if (copyin(msgp, &type32, sizeof (type32)))
			return (set_errno(EFAULT));
		type = type32;
	}

	if (type < 1)
		return (set_errno(EINVAL));

	/*
	 * We want the value here large enough that most of the
	 * the message operations will use the "lockless" path,
	 * but small enough that a user can not reserve large
	 * chunks of kernel memory unless they have a valid
	 * reason to.
	 */
	if (msgsz <= MSG_PREALLOC_LIMIT) {
		/*
		 * We are small enough that we can afford to do the
		 * allocation now. This saves dropping the lock
		 * and then reacquiring the lock.
		 */
		mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
		mp->msg_copycnt = 1;
		mp->msg_size = msgsz;
		if (msgsz) {
			mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
			if (copyin(STRUCT_FADDR(umsgp, mtext),
			    mp->msg_addr, msgsz) == -1) {
				error = EFAULT;
				goto msgsnd_out;
			}
		}
	}

	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
		error = EINVAL;
		goto msgsnd_out;
	}

	/* Hold the queue; it may be removed while we sleep below. */
	ipc_hold(msq_svc, (kipc_perm_t *)qp);

	if (msgsz > qp->msg_qbytes) {
		error = EINVAL;
		goto msgsnd_out;
	}

	if (error = ipcperm_access(&qp->msg_perm, MSG_W, CRED()))
		goto msgsnd_out;

top:
	/*
	 * Allocate space on q, message header, & buffer space.
	 */
	ASSERT(qp->msg_qnum <= qp->msg_qmax);
	while ((msgsz > qp->msg_qbytes - qp->msg_cbytes) ||
	    (qp->msg_qnum == qp->msg_qmax)) {
		int cvres;

		if (msgflg & IPC_NOWAIT) {
			error = EAGAIN;
			goto msgsnd_out;
		}

		/*
		 * No room: queue ourselves on the blocked-senders list
		 * and sleep until a receiver frees space (or the wait is
		 * interrupted / the queue goes away).
		 */
		wait_wakeup = 0;
		qp->msg_snd_cnt++;
		msg_entry.msgw_snd_size = msgsz;
		msg_entry.msgw_thrd = curthread;
		msg_entry.msgw_type = type;
		cv_init(&msg_entry.msgw_wake_cv, NULL, 0, NULL);
		list_insert_tail(&qp->msg_wait_rcv, &msg_entry);
		if (qp->msg_snd_smallest > msgsz)
			qp->msg_snd_smallest = msgsz;
		cvres = cv_wait_sig(&msg_entry.msgw_wake_cv, lock);
		/* cv_wait_sig() released the lock; reacquire by ipc id. */
		lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, lock);
		qp->msg_snd_cnt--;
		/* If no one removed us (unexpected wakeup), do it now. */
		if (list_link_active(&msg_entry.msgw_list))
			list_remove(&qp->msg_wait_rcv, &msg_entry);
		if (error = msgq_check_err(qp, cvres)) {
			goto msgsnd_out;
		}
		wait_wakeup = 1;
	}

	if (mp == NULL) {
		/*
		 * Large message (> MSG_PREALLOC_LIMIT): allocate and copy
		 * in with the queue unlocked, then revalidate the queue
		 * and redo the space check from "top".
		 */
		int failure;

		mutex_exit(lock);
		ASSERT(msgsz > 0);	/* only sizes > limit reach here */
		mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
		mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
		mp->msg_size = msgsz;
		mp->msg_copycnt = 1;

		failure = (copyin(STRUCT_FADDR(umsgp, mtext),
		    mp->msg_addr, msgsz) == -1);
		lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
		if (IPC_FREE(&qp->msg_perm)) {
			/* Queue was removed while we were unlocked. */
			error = EIDRM;
			goto msgsnd_out;
		}
		if (failure) {
			error = EFAULT;
			goto msgsnd_out;
		}
		goto top;
	}

	/*
	 * Everything is available, put msg on q.
	 */
	qp->msg_qnum++;
	qp->msg_cbytes += msgsz;
	qp->msg_lspid = curproc->p_pid;
	qp->msg_stime = gethrestime_sec();
	mp->msg_type = type;
	if (qp->msg_lowest_type > type)
		qp->msg_lowest_type = type;
	list_insert_tail(&qp->msg_list, mp);
	/*
	 * Get the proper receiver going.
	 */
	msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, type);

msgsnd_out:
	/*
	 * We were woken up from the send wait list, but an
	 * error occurred on placing the message onto the
	 * msg queue. Given that, we need to do the wakeup
	 * dance again so another blocked sender gets the space
	 * that was offered to us.
	 */

	if (wait_wakeup && error) {
		msg_wakeup_senders(qp);
	}
	if (lock)
		ipc_rele(msq_svc, (kipc_perm_t *)qp);	/* drops lock */

	if (error) {
		if (mp)
			msg_rele(mp);
		return (set_errno(error));
	}

	return (0);
}
1258
1259 static void
msg_wakeup_rdr(kmsqid_t * qp,msg_select_t ** flist,long type)1260 msg_wakeup_rdr(kmsqid_t *qp, msg_select_t **flist, long type)
1261 {
1262 msg_select_t *walker = *flist;
1263 msgq_wakeup_t *wakeup;
1264 uint_t msg_hash;
1265
1266 msg_hash = msg_type_hash(type);
1267
1268 do {
1269 wakeup = walker->selection(qp, msg_hash, type);
1270 walker = walker->next_selection;
1271 } while (!wakeup && walker != *flist);
1272
1273 *flist = (*flist)->next_selection;
1274 if (wakeup) {
1275 if (type) {
1276 wakeup->msgw_snd_wake = type;
1277 }
1278 cv_signal(&wakeup->msgw_wake_cv);
1279 }
1280 }
1281
1282 static uint_t
msg_type_hash(long msg_type)1283 msg_type_hash(long msg_type)
1284 {
1285 if (msg_type < 0) {
1286 long hash = -msg_type / MSG_NEG_INTERVAL;
1287 /*
1288 * Negative message types are hashed over an
1289 * interval. Any message type that hashes
1290 * beyond MSG_MAX_QNUM is automatically placed
1291 * in the last bucket.
1292 */
1293 if (hash > MSG_MAX_QNUM)
1294 hash = MSG_MAX_QNUM;
1295 return (hash);
1296 }
1297
1298 /*
1299 * 0 or positive message type. The first bucket is reserved for
1300 * message receivers of type 0, the other buckets we hash into.
1301 */
1302 if (msg_type)
1303 return (1 + (msg_type % MSG_MAX_QNUM));
1304 return (0);
1305 }
1306
1307 /*
1308 * Routines to see if we have a receiver of type 0 either blocked waiting
1309 * for a message. Simply return the first guy on the list.
1310 */
1311
1312 static msgq_wakeup_t *
1313 /* ARGSUSED */
msg_fnd_any_snd(kmsqid_t * qp,int msg_hash,long type)1314 msg_fnd_any_snd(kmsqid_t *qp, int msg_hash, long type)
1315 {
1316 msgq_wakeup_t *walker;
1317
1318 walker = list_head(&qp->msg_wait_snd[0]);
1319
1320 if (walker)
1321 list_remove(&qp->msg_wait_snd[0], walker);
1322 return (walker);
1323 }
1324
1325 static msgq_wakeup_t *
1326 /* ARGSUSED */
msg_fnd_any_rdr(kmsqid_t * qp,int msg_hash,long type)1327 msg_fnd_any_rdr(kmsqid_t *qp, int msg_hash, long type)
1328 {
1329 msgq_wakeup_t *walker;
1330
1331 walker = list_head(&qp->msg_cpy_block);
1332 if (walker)
1333 list_remove(&qp->msg_cpy_block, walker);
1334 return (walker);
1335 }
1336
1337 static msgq_wakeup_t *
msg_fnd_spc_snd(kmsqid_t * qp,int msg_hash,long type)1338 msg_fnd_spc_snd(kmsqid_t *qp, int msg_hash, long type)
1339 {
1340 msgq_wakeup_t *walker;
1341
1342 walker = list_head(&qp->msg_wait_snd[msg_hash]);
1343
1344 while (walker && walker->msgw_type != type)
1345 walker = list_next(&qp->msg_wait_snd[msg_hash], walker);
1346 if (walker)
1347 list_remove(&qp->msg_wait_snd[msg_hash], walker);
1348 return (walker);
1349 }
1350
/*
 * Find a receiver blocked on a negative (range) message type that can
 * accept a message of the given positive type.  On success the matching
 * wakeup entry is removed from its bucket and returned; otherwise NULL.
 */
/* ARGSUSED */
static msgq_wakeup_t *
msg_fnd_neg_snd(kmsqid_t *qp, int msg_hash, long type)
{
	msgq_wakeup_t	*qptr;
	int		count;
	int		check_index;
	int		neg_index;
	int		nbuckets;

	/* Fast exit if no receivers are blocked on negative types. */
	if (!qp->msg_ngt_cnt) {
		return (NULL);
	}
	neg_index = msg_type_hash(-type);

	/*
	 * Check for a match among the negative type queues.  Any buckets
	 * at neg_index or larger can match the type.  Use the last send
	 * time to randomize the starting bucket to prevent starvation.
	 * Search all buckets from neg_index to MSG_MAX_QNUM, starting
	 * from the random starting point, and wrapping around after
	 * MSG_MAX_QNUM.
	 */

	nbuckets = MSG_MAX_QNUM - neg_index + 1;
	check_index = neg_index + (qp->msg_stime % nbuckets);

	for (count = nbuckets; count > 0; count--) {
		qptr = list_head(&qp->msg_wait_snd_ngt[check_index]);
		while (qptr) {
			/*
			 * The lowest hash bucket may actually contain
			 * message types that are not valid for this
			 * request. This can happen due to the fact that
			 * the message buckets actually contain a consecutive
			 * range of types.
			 */
			if (-qptr->msgw_type >= type) {
				list_remove(&qp->msg_wait_snd_ngt[check_index],
				    qptr);
				return (qptr);
			}
			qptr = list_next(&qp->msg_wait_snd_ngt[check_index],
			    qptr);
		}
		/* Wrap back to neg_index after the last bucket. */
		if (++check_index > MSG_MAX_QNUM) {
			check_index = neg_index;
		}
	}
	return (NULL);
}
1402
/*
 * Block a receiver on the given wait list until it is signalled.
 *
 * The caller holds *lock (the queue lock).  cv_wait_sig() releases it
 * while sleeping; it is reacquired by ipc id on wakeup and *lock is
 * updated, so the caller must revalidate the queue state on return.
 * Returns the cv_wait_sig() result so the caller can distinguish a
 * signal-interrupted wakeup.
 */
static int
msg_rcvq_sleep(list_t *queue, msgq_wakeup_t *entry, kmutex_t **lock,
    kmsqid_t *qp)
{
	int		cvres;

	cv_init(&entry->msgw_wake_cv, NULL, 0, NULL);

	list_insert_tail(queue, entry);

	qp->msg_rcv_cnt++;
	cvres = cv_wait_sig(&entry->msgw_wake_cv, *lock);
	/* The lock was dropped during the wait; relock by ipc id. */
	*lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, *lock);
	qp->msg_rcv_cnt--;

	if (list_link_active(&entry->msgw_list)) {
		/*
		 * We woke up unexpectedly, remove ourself.
		 */
		list_remove(queue, entry);
	}

	return (cvres);
}
1427
1428 static void
msg_rcvq_wakeup_all(list_t * q_ptr)1429 msg_rcvq_wakeup_all(list_t *q_ptr)
1430 {
1431 msgq_wakeup_t *q_walk;
1432
1433 while (q_walk = list_head(q_ptr)) {
1434 list_remove(q_ptr, q_walk);
1435 cv_signal(&q_walk->msgw_wake_cv);
1436 }
1437 }
1438
1439 /*
1440 * msgsys - System entry point for msgctl, msgget, msgrcv, and msgsnd
1441 * system calls.
1442 */
1443 static ssize_t
msgsys(int opcode,uintptr_t a1,uintptr_t a2,uintptr_t a3,uintptr_t a4,uintptr_t a5)1444 msgsys(int opcode, uintptr_t a1, uintptr_t a2, uintptr_t a3,
1445 uintptr_t a4, uintptr_t a5)
1446 {
1447 ssize_t error;
1448
1449 switch (opcode) {
1450 case MSGGET:
1451 error = msgget((key_t)a1, (int)a2);
1452 break;
1453 case MSGCTL:
1454 error = msgctl((int)a1, (int)a2, (void *)a3);
1455 break;
1456 case MSGRCV:
1457 error = msgrcv((int)a1, (struct ipcmsgbuf *)a2,
1458 (size_t)a3, (long)a4, (int)a5);
1459 break;
1460 case MSGSND:
1461 error = msgsnd((int)a1, (struct ipcmsgbuf *)a2,
1462 (size_t)a3, (int)a4);
1463 break;
1464 case MSGIDS:
1465 error = msgids((int *)a1, (uint_t)a2, (uint_t *)a3);
1466 break;
1467 case MSGSNAP:
1468 error = msgsnap((int)a1, (caddr_t)a2, (size_t)a3, (long)a4);
1469 break;
1470 default:
1471 error = set_errno(EINVAL);
1472 break;
1473 }
1474
1475 return (error);
1476 }
1477
/*
 * Determine if a writer who is waiting can process its message. If so
 * wake it up.
 *
 * Called with the queue lock held (see the msgsnd_out path in msgsnd()).
 * Walks the blocked-senders list, signalling every sender whose pending
 * message fits in the projected free space, and refreshes
 * msg_snd_smallest so future space checks can short-circuit.
 */
static void
msg_wakeup_senders(kmsqid_t *qp)

{
	struct msgq_wakeup *ptr, *optr;
	size_t avail, smallest;
	int msgs_out;

	/*
	 * Is there a writer waiting, and if so, can it be serviced? If
	 * not return back to the caller.
	 */
	if (IPC_FREE(&qp->msg_perm) || qp->msg_qnum >= qp->msg_qmax)
		return;

	avail = qp->msg_qbytes - qp->msg_cbytes;
	if (avail < qp->msg_snd_smallest)
		return;

	ptr = list_head(&qp->msg_wait_rcv);
	if (ptr == NULL) {
		/* No senders queued; reset the minimum-size hint. */
		qp->msg_snd_smallest = MSG_SMALL_INIT;
		return;
	}
	optr = ptr;

	/*
	 * smallest: minimum message size of all queued writers
	 *
	 * avail: amount of space left on the msgq
	 * if all the writers we have woken up are successful.
	 *
	 * msgs_out: is the number of messages on the message queue if
	 * all the writers we have woken up are successful.
	 */

	smallest = MSG_SMALL_INIT;
	msgs_out = qp->msg_qnum;
	/*
	 * optr is the entry being examined; ptr is advanced first so
	 * list_remove(optr) does not invalidate the iterator.
	 */
	while (ptr) {
		ptr = list_next(&qp->msg_wait_rcv, ptr);
		if (optr->msgw_snd_size <= avail) {
			list_remove(&qp->msg_wait_rcv, optr);
			avail -= optr->msgw_snd_size;
			cv_signal(&optr->msgw_wake_cv);
			msgs_out++;
			/* Stop once the queue would be full again. */
			if (msgs_out == qp->msg_qmax ||
			    avail < qp->msg_snd_smallest)
				break;
		} else {
			if (smallest > optr->msgw_snd_size)
				smallest = optr->msgw_snd_size;
		}
		optr = ptr;
	}

	/*
	 * Reset the smallest message size if the entire list has been visited
	 */
	if (ptr == NULL && smallest != MSG_SMALL_INIT)
		qp->msg_snd_smallest = smallest;
}
1543
#ifdef _SYSCALL32_IMPL
/*
 * msgsys32 - System entry point for msgctl, msgget, msgrcv, and msgsnd
 * system calls for 32-bit callers on LP64 kernel.
 *
 * The (long)(int32_t) and (size_t)(int32_t) casts sign-extend the
 * 32-bit arguments (e.g. a negative msgtyp) before widening them.
 */
static ssize32_t
msgsys32(int opcode, uint32_t a1, uint32_t a2, uint32_t a3,
    uint32_t a4, uint32_t a5)
{
	ssize_t error;

	switch (opcode) {
	case MSGGET:
		error = msgget((key_t)a1, (int)a2);
		break;
	case MSGCTL:
		error = msgctl((int)a1, (int)a2, (void *)(uintptr_t)a3);
		break;
	case MSGRCV:
		error = msgrcv((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
		    (size_t)a3, (long)(int32_t)a4, (int)a5);
		break;
	case MSGSND:
		error = msgsnd((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
		    (size_t)(int32_t)a3, (int)a4);
		break;
	case MSGIDS:
		error = msgids((int *)(uintptr_t)a1, (uint_t)a2,
		    (uint_t *)(uintptr_t)a3);
		break;
	case MSGSNAP:
		error = msgsnap((int)a1, (caddr_t)(uintptr_t)a2, (size_t)a3,
		    (long)(int32_t)a4);
		break;
	default:
		error = set_errno(EINVAL);
		break;
	}

	return (error);
}
#endif	/* _SYSCALL32_IMPL */
1586