xref: /titanic_51/usr/src/uts/common/os/msg.c (revision f13ac6397da869bb7b7bc8a9f0b2e8fff530c346)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
27 /*	  All Rights Reserved  	*/
28 
29 
30 #pragma ident	"%Z%%M%	%I%	%E% SMI"
31 
32 /*
33  * Inter-Process Communication Message Facility.
34  *
35  * See os/ipc.c for a description of common IPC functionality.
36  *
37  * Resource controls
38  * -----------------
39  *
40  * Control:      zone.max-msg-ids (rc_zone_msgmni)
41  * Description:  Maximum number of message queue ids allowed a zone.
42  *
43  *   When msgget() is used to allocate a message queue, one id is
44  *   allocated.  If the id allocation doesn't succeed, msgget() fails
45  *   and errno is set to ENOSPC.  Upon successful msgctl(, IPC_RMID)
46  *   the id is deallocated.
47  *
48  * Control:      project.max-msg-ids (rc_project_msgmni)
49  * Description:  Maximum number of message queue ids allowed a project.
50  *
51  *   When msgget() is used to allocate a message queue, one id is
52  *   allocated.  If the id allocation doesn't succeed, msgget() fails
53  *   and errno is set to ENOSPC.  Upon successful msgctl(, IPC_RMID)
54  *   the id is deallocated.
55  *
56  * Control:      process.max-msg-qbytes (rc_process_msgmnb)
57  * Description:  Maximum number of bytes of messages on a message queue.
58  *
59  *   When msgget() successfully allocates a message queue, the minimum
60  *   enforced value of this limit is used to initialize msg_qbytes.
61  *
62  * Control:      process.max-msg-messages (rc_process_msgtql)
63  * Description:  Maximum number of messages on a message queue.
64  *
65  *   When msgget() successfully allocates a message queue, the minimum
66  *   enforced value of this limit is used to initialize a per-queue
67  *   limit on the number of messages.
68  */
69 
70 #include <sys/types.h>
71 #include <sys/t_lock.h>
72 #include <sys/param.h>
73 #include <sys/cred.h>
74 #include <sys/user.h>
75 #include <sys/proc.h>
76 #include <sys/time.h>
77 #include <sys/ipc.h>
78 #include <sys/ipc_impl.h>
79 #include <sys/msg.h>
80 #include <sys/msg_impl.h>
81 #include <sys/list.h>
82 #include <sys/systm.h>
83 #include <sys/sysmacros.h>
84 #include <sys/cpuvar.h>
85 #include <sys/kmem.h>
86 #include <sys/ddi.h>
87 #include <sys/errno.h>
88 #include <sys/cmn_err.h>
89 #include <sys/debug.h>
90 #include <sys/project.h>
91 #include <sys/modctl.h>
92 #include <sys/syscall.h>
93 #include <sys/policy.h>
94 #include <sys/zone.h>
95 
96 #include <c2/audit.h>
97 
/*
 * The following tunables are obsolete.  Though for compatibility we
 * still read and interpret msginfo_msgmnb, msginfo_msgmni, and
 * msginfo_msgtql (see os/project.c and os/rctl_proc.c), the preferred
 * mechanism for administrating the IPC Message facility is through the
 * resource controls described at the top of this file.
 */
size_t	msginfo_msgmax = 2048;	/* (obsolete) */
size_t	msginfo_msgmnb = 4096;	/* (obsolete) */
int	msginfo_msgmni = 50;	/* (obsolete) */
int	msginfo_msgtql = 40;	/* (obsolete) */
int	msginfo_msgssz = 8;	/* (obsolete) */
int	msginfo_msgmap = 0;	/* (obsolete) */
ushort_t msginfo_msgseg = 1024;	/* (obsolete) */

/*
 * Resource control handles, defined elsewhere; they correspond to the
 * controls listed in the block comment at the top of this file.
 */
extern rctl_hndl_t rc_zone_msgmni;
extern rctl_hndl_t rc_project_msgmni;
extern rctl_hndl_t rc_process_msgmnb;
extern rctl_hndl_t rc_process_msgtql;
/* IPC service handle for message queues; created by ipcs_create() in _init. */
static ipc_service_t *msq_svc;
/* Zone key registered in _init; its callback is msg_remove_zone(). */
static zone_key_t msg_zone_key;

/* Callbacks handed to ipcs_create() / zone_key_create() in _init. */
static void msg_dtor(kipc_perm_t *);
static void msg_rmid(kipc_perm_t *);
static void msg_remove_zone(zoneid_t, void *);
123 
/*
 * Module linkage information for the kernel.
 */
static ssize_t msgsys(int opcode, uintptr_t a0, uintptr_t a1, uintptr_t a2,
	uintptr_t a4, uintptr_t a5);

/*
 * System call table entry for the native data model.  The leading 6 is
 * presumably the argument count (msgsys takes an opcode plus five
 * arguments) -- confirm against struct sysent.  The return-value flag
 * differs between _LP64 and non-_LP64 builds.
 */
static struct sysent ipcmsg_sysent = {
	6,
#ifdef	_LP64
	SE_ARGC | SE_NOUNLOAD | SE_64RVAL,
#else
	SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
#endif
	(int (*)())msgsys
};

#ifdef	_SYSCALL32_IMPL
static ssize32_t msgsys32(int opcode, uint32_t a0, uint32_t a1, uint32_t a2,
	uint32_t a4, uint32_t a5);

/* Entry used for 32-bit callers when _SYSCALL32_IMPL is defined. */
static struct sysent ipcmsg_sysent32 = {
	6,
	SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
	(int (*)())msgsys32
};
#endif	/* _SYSCALL32_IMPL */

/* Syscall module description wrapping the native sysent entry. */
static struct modlsys modlsys = {
	&mod_syscallops, "System V message facility", &ipcmsg_sysent
};

#ifdef _SYSCALL32_IMPL
/* Syscall module description wrapping the 32-bit sysent entry. */
static struct modlsys modlsys32 = {
	&mod_syscallops32, "32-bit System V message facility", &ipcmsg_sysent32
};
#endif
160 
161 /*
162  *      Big Theory statement for message queue correctness
163  *
 * The msgrcv and msgsnd functions no longer use cv_broadcast to wake up
 * receivers who are waiting for an event.  Using the cv_broadcast method
 * resulted in negative scaling when the number of waiting receivers is large
 * (the thundering herd problem).  Instead, the receivers waiting to receive a
 * message are now linked in a queue-like fashion and awakened one at a time in
 * a controlled manner.
170  *
171  * Receivers can block on two different classes of waiting list:
172  *    1) "sendwait" list, which is the more complex list of the two.  The
173  *	  receiver will be awakened by a sender posting a new message.  There
174  *	  are two types of "sendwait" list used:
175  *		a) msg_wait_snd: handles all receivers who are looking for
176  *		   a message type >= 0, but was unable to locate a match.
177  *
178  *		   slot 0: reserved for receivers that have designated they
179  *			   will take any message type.
180  *		   rest:   consist of receivers requesting a specific type
181  *			   but the type was not present.  The entries are
182  *			   hashed into a bucket in an attempt to keep
183  *			   any list search relatively short.
184  * 		b) msg_wait_snd_ngt: handles all receivers that have designated
185  *		   a negative message type. Unlike msg_wait_snd, the hash bucket
186  *		   serves a range of negative message types (-1 to -5, -6 to -10
187  *		   and so forth), where the last bucket is reserved for all the
188  *		   negative message types that hash outside of MSG_MAX_QNUM - 1.
189  *		   This is done this way to simplify the operation of locating a
190  *		   negative message type.
191  *
192  *    2) "copyout" list, where the receiver is awakened by another
193  *	 receiver after a message is copied out.  This is a linked list
194  *	 of waiters that are awakened one at a time.  Although the solution is
195  *	 not optimal, the complexity that would be added in for waking
196  *	 up the right entry far exceeds any potential pay back (too many
197  *	 correctness and corner case issues).
198  *
199  * The lists are doubly linked.  In the case of the "sendwait"
200  * list, this allows the thread to remove itself from the list without having
201  * to traverse the list.  In the case of the "copyout" list it simply allows
202  * us to use common functions with the "sendwait" list.
203  *
204  * To make sure receivers are not hung out to dry, we must guarantee:
205  *    1. If any queued message matches any receiver, then at least one
206  *       matching receiver must be processing the request.
207  *    2. Blocking on the copyout queue is only temporary while messages
 *	 are being copied out.  The process is guaranteed to wake up
209  *	 when it gets to front of the queue (copyout is a FIFO).
210  *
211  * Rules for blocking and waking up:
212  *   1. A receiver entering msgrcv must examine all messages for a match
213  *      before blocking on a sendwait queue.
214  *   2. If the receiver blocks because the message it chose is already
 *	being copied out, then when it wakes up it needs to start
 *	checking the messages from the beginning.
217  *   3) When ever a process returns from msgrcv for any reason, if it
218  *	had attempted to copy a message or blocked waiting for a copy
219  *	to complete it needs to wakeup the next receiver blocked on
220  *	a copy out.
221  *   4) When a message is sent, the sender selects a process waiting
222  *	for that type of message.  This selection process rotates between
223  *	receivers types of 0, negative and positive to prevent starvation of
224  *	any one particular receiver type.
225  *   5) The following are the scenarios for processes that are awakened
226  *	by a msgsnd:
227  *		a) The process finds the message and is able to copy
228  *		   it out.  Once complete, the process returns.
229  *		b) The message that was sent that triggered the wakeup is no
230  *		   longer available (another process found the message first).
231  *		   We issue a wakeup on copy queue and then go back to
232  *		   sleep waiting for another matching message to be sent.
233  *		c) The message that was supposed to be processed was
234  *		   already serviced by another process.  However a different
235  *		   message is present which we can service.  The message
236  *		   is copied and the process returns.
237  *		d) The message is found, but some sort of error occurs that
238  *		   prevents the message from being copied.  The receiver
239  *		   wakes up the next sender that can service this message
240  *		   type and returns an error to the caller.
241  *		e) The message is found, but it is marked as being copied
242  *		   out.  The receiver then goes to sleep on the copyout
243  *		   queue where it will be awakened again sometime in the future.
244  *
245  *
246  *   6) Whenever a message is found that matches the message type designated,
247  * 	but is being copied out we have to block on the copyout queue.
248  *	After process copying finishes the copy out, it  must wakeup (either
249  *	directly or indirectly) all receivers who blocked on its copyout,
250  *	so they are guaranteed a chance to examine the remaining messages.
251  *	This is implemented via a chain of wakeups: Y wakes X, who wakes Z,
252  *	and so on.  The chain cannot be broken.  This leads to the following
253  *	cases:
 *		a) When a receiver has finished copying the message (or
 *		   encountered an error), the first entry on the copyout
 *		   queue is woken up.
257  *		b) When the receiver is woken up, it attempts to locate
258  *		   a message type match.
259  *		c) If a message type is found and
260  *			-- MSG_RCVCOPY flag is not set, the message is
261  *			   marked for copying out.  Regardless of the copyout
262  *			   success the next entry on the copyout queue is
263  *			   awakened and the operation is completed.
264  *			-- MSG_RCVCOPY is set, we simply go back to sleep again
265  *			   on the copyout queue.
266  *		d) If the message type is not found then we wakeup the next
267  *		   process on the copyout queue.
268  */
269 
/* Forward declarations for helpers used throughout the rest of this file. */
static uint_t msg_type_hash(long);
static int msgq_check_err(kmsqid_t *qp, int cvres);
static int msg_rcvq_sleep(list_t *, msgq_wakeup_t *, kmutex_t **,
    kmsqid_t *);
static int msg_copyout(kmsqid_t *, long, kmutex_t **, size_t *, size_t,
    struct msg *, struct ipcmsgbuf *, int);
static void msg_rcvq_wakeup_all(list_t *);
static void msg_wakeup_rdr(kmsqid_t *, msg_select_t **, long);
static msgq_wakeup_t *msg_fnd_any_snd(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_any_rdr(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_neg_snd(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_spc_snd(kmsqid_t *, int, long);
static struct msg *msgrcv_lookup(kmsqid_t *, long);

/*
 * Selector ring for receivers blocked waiting on a sender.  Each entry
 * pairs a finder function with a pointer to the next entry, and the last
 * entry points back to the first, so the table forms a cycle:
 * any -> specific -> negative -> any.  msgget() starts every new queue
 * at entry 0.
 */
msg_select_t msg_fnd_sndr[] = {
	{ msg_fnd_any_snd, &msg_fnd_sndr[1] },
	{ msg_fnd_spc_snd, &msg_fnd_sndr[2] },
	{ msg_fnd_neg_snd, &msg_fnd_sndr[0] }
};

/*
 * Selector ring for receivers blocked on the copyout list: a single
 * entry whose "next" pointer refers to itself.
 */
msg_select_t msg_fnd_rdr[1] = {
	{ msg_fnd_any_rdr, &msg_fnd_rdr[0] },
};

/* Linkage handed to mod_install()/mod_info(); NULL terminates the list. */
static struct modlinkage modlinkage = {
	MODREV_1,
	&modlsys,
#ifdef _SYSCALL32_IMPL
	&modlsys32,
#endif
	NULL
};
302 
303 
304 int
305 _init(void)
306 {
307 	int result;
308 
309 	msq_svc = ipcs_create("msqids", rc_project_msgmni, rc_zone_msgmni,
310 	    sizeof (kmsqid_t), msg_dtor, msg_rmid, AT_IPC_MSG,
311 	    offsetof(ipc_rqty_t, ipcq_msgmni));
312 	zone_key_create(&msg_zone_key, NULL, msg_remove_zone, NULL);
313 
314 	if ((result = mod_install(&modlinkage)) == 0)
315 		return (0);
316 
317 	(void) zone_key_delete(msg_zone_key);
318 	ipcs_destroy(msq_svc);
319 
320 	return (result);
321 }
322 
/*
 * Module unload entry point.  Unconditionally reports EBUSY; no teardown
 * is attempted here.
 */
int
_fini(void)
{
	return (EBUSY);
}
328 
329 int
330 _info(struct modinfo *modinfop)
331 {
332 	return (mod_info(&modlinkage, modinfop));
333 }
334 
335 static void
336 msg_dtor(kipc_perm_t *perm)
337 {
338 	kmsqid_t *qp = (kmsqid_t *)perm;
339 	int		ii;
340 
341 	for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
342 		ASSERT(list_is_empty(&qp->msg_wait_snd[ii]));
343 		ASSERT(list_is_empty(&qp->msg_wait_snd_ngt[ii]));
344 		list_destroy(&qp->msg_wait_snd[ii]);
345 		list_destroy(&qp->msg_wait_snd_ngt[ii]);
346 	}
347 	ASSERT(list_is_empty(&qp->msg_cpy_block));
348 	list_destroy(&qp->msg_cpy_block);
349 	ASSERT(qp->msg_snd_cnt == 0);
350 	ASSERT(qp->msg_cbytes == 0);
351 	list_destroy(&qp->msg_list);
352 }
353 
354 
/*
 * msg_hold - take a reference on a message by bumping msg_copycnt.
 * Paired with msg_rele(), which drops the reference.
 */
#define	msg_hold(mp)	(mp)->msg_copycnt++
356 
357 /*
358  * msg_rele - decrement the reference count on the message.  When count
359  * reaches zero, free message header and contents.
360  */
361 static void
362 msg_rele(struct msg *mp)
363 {
364 	ASSERT(mp->msg_copycnt > 0);
365 	if (mp->msg_copycnt-- == 1) {
366 		if (mp->msg_addr)
367 			kmem_free(mp->msg_addr, mp->msg_size);
368 		kmem_free(mp, sizeof (struct msg));
369 	}
370 }
371 
372 /*
373  * msgunlink - Unlink msg from queue, decrement byte count and wake up anyone
374  * waiting for free bytes on queue.
375  *
376  * Called with queue locked.
377  */
378 static void
379 msgunlink(kmsqid_t *qp, struct msg *mp)
380 {
381 	list_remove(&qp->msg_list, mp);
382 	qp->msg_qnum--;
383 	qp->msg_cbytes -= mp->msg_size;
384 	msg_rele(mp);
385 
386 	/* Wake up waiting writers */
387 	if (qp->msg_snd_cnt)
388 		cv_broadcast(&qp->msg_snd_cv);
389 }
390 
391 static void
392 msg_rmid(kipc_perm_t *perm)
393 {
394 	kmsqid_t *qp = (kmsqid_t *)perm;
395 	struct msg *mp;
396 	int		ii;
397 
398 
399 	while ((mp = list_head(&qp->msg_list)) != NULL)
400 		msgunlink(qp, mp);
401 	ASSERT(qp->msg_cbytes == 0);
402 
403 	/*
404 	 * Wake up everyone who is in a wait state of some sort
405 	 * for this message queue.
406 	 */
407 	for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
408 		msg_rcvq_wakeup_all(&qp->msg_wait_snd[ii]);
409 		msg_rcvq_wakeup_all(&qp->msg_wait_snd_ngt[ii]);
410 	}
411 	msg_rcvq_wakeup_all(&qp->msg_cpy_block);
412 	if (qp->msg_snd_cnt)
413 		cv_broadcast(&qp->msg_snd_cv);
414 }
415 
416 /*
417  * msgctl system call.
418  *
419  * gets q lock (via ipc_lookup), releases before return.
420  * may call users of msg_lock
421  */
422 static int
423 msgctl(int msgid, int cmd, void *arg)
424 {
425 	STRUCT_DECL(msqid_ds, ds);		/* SVR4 queue work area */
426 	kmsqid_t		*qp;		/* ptr to associated q */
427 	int			error;
428 	struct	cred		*cr;
429 	model_t	mdl = get_udatamodel();
430 	struct msqid_ds64	ds64;
431 	kmutex_t		*lock;
432 	proc_t			*pp = curproc;
433 
434 	STRUCT_INIT(ds, mdl);
435 	cr = CRED();
436 
437 	/*
438 	 * Perform pre- or non-lookup actions (e.g. copyins, RMID).
439 	 */
440 	switch (cmd) {
441 	case IPC_SET:
442 		if (copyin(arg, STRUCT_BUF(ds), STRUCT_SIZE(ds)))
443 			return (set_errno(EFAULT));
444 		break;
445 
446 	case IPC_SET64:
447 		if (copyin(arg, &ds64, sizeof (struct msqid_ds64)))
448 			return (set_errno(EFAULT));
449 		break;
450 
451 	case IPC_RMID:
452 		if (error = ipc_rmid(msq_svc, msgid, cr))
453 			return (set_errno(error));
454 		return (0);
455 	}
456 
457 	/*
458 	 * get msqid_ds for this msgid
459 	 */
460 	if ((lock = ipc_lookup(msq_svc, msgid, (kipc_perm_t **)&qp)) == NULL)
461 		return (set_errno(EINVAL));
462 
463 	switch (cmd) {
464 	case IPC_SET:
465 		if (STRUCT_FGET(ds, msg_qbytes) > qp->msg_qbytes &&
466 		    secpolicy_ipc_config(cr) != 0) {
467 			mutex_exit(lock);
468 			return (set_errno(EPERM));
469 		}
470 		if (error = ipcperm_set(msq_svc, cr, &qp->msg_perm,
471 		    &STRUCT_BUF(ds)->msg_perm, mdl)) {
472 			mutex_exit(lock);
473 			return (set_errno(error));
474 		}
475 		qp->msg_qbytes = STRUCT_FGET(ds, msg_qbytes);
476 		qp->msg_ctime = gethrestime_sec();
477 		break;
478 
479 	case IPC_STAT:
480 		if (error = ipcperm_access(&qp->msg_perm, MSG_R, cr)) {
481 			mutex_exit(lock);
482 			return (set_errno(error));
483 		}
484 
485 		if (qp->msg_rcv_cnt)
486 			qp->msg_perm.ipc_mode |= MSG_RWAIT;
487 		if (qp->msg_snd_cnt)
488 			qp->msg_perm.ipc_mode |= MSG_WWAIT;
489 		ipcperm_stat(&STRUCT_BUF(ds)->msg_perm, &qp->msg_perm, mdl);
490 		qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
491 		STRUCT_FSETP(ds, msg_first, NULL); 	/* kernel addr */
492 		STRUCT_FSETP(ds, msg_last, NULL);
493 		STRUCT_FSET(ds, msg_cbytes, qp->msg_cbytes);
494 		STRUCT_FSET(ds, msg_qnum, qp->msg_qnum);
495 		STRUCT_FSET(ds, msg_qbytes, qp->msg_qbytes);
496 		STRUCT_FSET(ds, msg_lspid, qp->msg_lspid);
497 		STRUCT_FSET(ds, msg_lrpid, qp->msg_lrpid);
498 		STRUCT_FSET(ds, msg_stime, qp->msg_stime);
499 		STRUCT_FSET(ds, msg_rtime, qp->msg_rtime);
500 		STRUCT_FSET(ds, msg_ctime, qp->msg_ctime);
501 		break;
502 
503 	case IPC_SET64:
504 		mutex_enter(&pp->p_lock);
505 		if ((ds64.msgx_qbytes > qp->msg_qbytes) &&
506 		    secpolicy_ipc_config(cr) != 0 &&
507 		    rctl_test(rc_process_msgmnb, pp->p_rctls, pp,
508 		    ds64.msgx_qbytes, RCA_SAFE) & RCT_DENY) {
509 			mutex_exit(&pp->p_lock);
510 			mutex_exit(lock);
511 			return (set_errno(EPERM));
512 		}
513 		mutex_exit(&pp->p_lock);
514 		if (error = ipcperm_set64(msq_svc, cr, &qp->msg_perm,
515 		    &ds64.msgx_perm)) {
516 			mutex_exit(lock);
517 			return (set_errno(error));
518 		}
519 		qp->msg_qbytes = ds64.msgx_qbytes;
520 		qp->msg_ctime = gethrestime_sec();
521 		break;
522 
523 	case IPC_STAT64:
524 		if (qp->msg_rcv_cnt)
525 			qp->msg_perm.ipc_mode |= MSG_RWAIT;
526 		if (qp->msg_snd_cnt)
527 			qp->msg_perm.ipc_mode |= MSG_WWAIT;
528 		ipcperm_stat64(&ds64.msgx_perm, &qp->msg_perm);
529 		qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
530 		ds64.msgx_cbytes = qp->msg_cbytes;
531 		ds64.msgx_qnum = qp->msg_qnum;
532 		ds64.msgx_qbytes = qp->msg_qbytes;
533 		ds64.msgx_lspid = qp->msg_lspid;
534 		ds64.msgx_lrpid = qp->msg_lrpid;
535 		ds64.msgx_stime = qp->msg_stime;
536 		ds64.msgx_rtime = qp->msg_rtime;
537 		ds64.msgx_ctime = qp->msg_ctime;
538 		break;
539 
540 	default:
541 		mutex_exit(lock);
542 		return (set_errno(EINVAL));
543 	}
544 
545 	mutex_exit(lock);
546 
547 	/*
548 	 * Do copyout last (after releasing mutex).
549 	 */
550 	switch (cmd) {
551 	case IPC_STAT:
552 		if (copyout(STRUCT_BUF(ds), arg, STRUCT_SIZE(ds)))
553 			return (set_errno(EFAULT));
554 		break;
555 
556 	case IPC_STAT64:
557 		if (copyout(&ds64, arg, sizeof (struct msqid_ds64)))
558 			return (set_errno(EFAULT));
559 		break;
560 	}
561 
562 	return (0);
563 }
564 
/*
 * Remove all message queues associated with a given zone.  Called by
 * zone_shutdown when the zone is halted.
 *
 * Registered as a callback via zone_key_create() in _init.  The work is
 * delegated to ipc_remove_zone() on the message queue service; the
 * second argument is not used.
 */
/*ARGSUSED1*/
static void
msg_remove_zone(zoneid_t zoneid, void *arg)
{
	ipc_remove_zone(msq_svc, zoneid);
}
575 
576 /*
577  * msgget system call.
578  */
579 static int
580 msgget(key_t key, int msgflg)
581 {
582 	kmsqid_t	*qp;
583 	kmutex_t	*lock;
584 	int		id, error;
585 	int		ii;
586 	proc_t		*pp = curproc;
587 
588 top:
589 	if (error = ipc_get(msq_svc, key, msgflg, (kipc_perm_t **)&qp, &lock))
590 		return (set_errno(error));
591 
592 	if (IPC_FREE(&qp->msg_perm)) {
593 		mutex_exit(lock);
594 		mutex_exit(&pp->p_lock);
595 
596 		list_create(&qp->msg_list, sizeof (struct msg),
597 		    offsetof(struct msg, msg_node));
598 		qp->msg_qnum = 0;
599 		qp->msg_lspid = qp->msg_lrpid = 0;
600 		qp->msg_stime = qp->msg_rtime = 0;
601 		qp->msg_ctime = gethrestime_sec();
602 		qp->msg_ngt_cnt = 0;
603 		qp->msg_neg_copy = 0;
604 		for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
605 			list_create(&qp->msg_wait_snd[ii],
606 			    sizeof (msgq_wakeup_t),
607 			    offsetof(msgq_wakeup_t, msgw_list));
608 			list_create(&qp->msg_wait_snd_ngt[ii],
609 			    sizeof (msgq_wakeup_t),
610 			    offsetof(msgq_wakeup_t, msgw_list));
611 		}
612 		/*
613 		 * The proper initialization of msg_lowest_type is to the
614 		 * highest possible value.  By doing this we guarantee that
615 		 * when the first send happens, the lowest type will be set
616 		 * properly.
617 		 */
618 		qp->msg_lowest_type = LONG_MAX;
619 		list_create(&qp->msg_cpy_block,
620 		    sizeof (msgq_wakeup_t),
621 		    offsetof(msgq_wakeup_t, msgw_list));
622 		qp->msg_fnd_sndr = &msg_fnd_sndr[0];
623 		qp->msg_fnd_rdr = &msg_fnd_rdr[0];
624 		qp->msg_rcv_cnt = 0;
625 		qp->msg_snd_cnt = 0;
626 
627 		if (error = ipc_commit_begin(msq_svc, key, msgflg,
628 		    (kipc_perm_t *)qp)) {
629 			if (error == EAGAIN)
630 				goto top;
631 			return (set_errno(error));
632 		}
633 		qp->msg_qbytes = rctl_enforced_value(rc_process_msgmnb,
634 		    pp->p_rctls, pp);
635 		qp->msg_qmax = rctl_enforced_value(rc_process_msgtql,
636 		    pp->p_rctls, pp);
637 		lock = ipc_commit_end(msq_svc, &qp->msg_perm);
638 	}
639 	if (audit_active)
640 		audit_ipcget(AT_IPC_MSG, (void *)qp);
641 	id = qp->msg_perm.ipc_id;
642 	mutex_exit(lock);
643 	return (id);
644 }
645 
646 static ssize_t
647 msgrcv(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, long msgtyp, int msgflg)
648 {
649 	struct msg	*smp;	/* ptr to best msg on q */
650 	kmsqid_t	*qp;	/* ptr to associated q */
651 	kmutex_t	*lock;
652 	size_t		xtsz;	/* transfer byte count */
653 	int		error = 0;
654 	int		cvres;
655 	uint_t		msg_hash;
656 	msgq_wakeup_t	msg_entry;
657 
658 	CPU_STATS_ADDQ(CPU, sys, msg, 1);	/* bump msg send/rcv count */
659 
660 	msg_hash = msg_type_hash(msgtyp);
661 	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
662 		return ((ssize_t)set_errno(EINVAL));
663 	}
664 	ipc_hold(msq_svc, (kipc_perm_t *)qp);
665 
666 	if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
667 		goto msgrcv_out;
668 	}
669 
670 	/*
671 	 * Various information (including the condvar_t) required for the
672 	 * process to sleep is provided by it's stack.
673 	 */
674 	msg_entry.msgw_thrd = curthread;
675 	msg_entry.msgw_snd_wake = 0;
676 	msg_entry.msgw_type = msgtyp;
677 findmsg:
678 	smp = msgrcv_lookup(qp, msgtyp);
679 
680 	if (smp) {
681 		/*
682 		 * We found a possible message to copy out.
683 		 */
684 		if ((smp->msg_flags & MSG_RCVCOPY) == 0) {
685 			long t = msg_entry.msgw_snd_wake;
686 			long copy_type = smp->msg_type;
687 
688 			/*
689 			 * It is available, attempt to copy it.
690 			 */
691 			error = msg_copyout(qp, msgtyp, &lock, &xtsz, msgsz,
692 			    smp, msgp, msgflg);
693 
694 			/*
695 			 * It is possible to consume a different message
696 			 * type then what originally awakened for (negative
697 			 * types).  If this happens a check must be done to
698 			 * to determine if another receiver is available
699 			 * for the waking message type,  Failure to do this
700 			 * can result in a message on the queue that can be
701 			 * serviced by a sleeping receiver.
702 			 */
703 			if (!error && t && (copy_type != t))
704 				msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, t);
705 
706 			/*
707 			 * Don't forget to wakeup a sleeper that blocked because
708 			 * we were copying things out.
709 			 */
710 			msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
711 			goto msgrcv_out;
712 		}
713 		/*
714 		 * The selected message is being copied out, so block.  We do
715 		 * not need to wake the next person up on the msg_cpy_block list
716 		 * due to the fact some one is copying out and they will get
717 		 * things moving again once the copy is completed.
718 		 */
719 		cvres = msg_rcvq_sleep(&qp->msg_cpy_block,
720 		    &msg_entry, &lock, qp);
721 		error = msgq_check_err(qp, cvres);
722 		if (error) {
723 			goto msgrcv_out;
724 		}
725 		goto findmsg;
726 	}
727 	/*
728 	 * There isn't a message to copy out that matches the designated
729 	 * criteria.
730 	 */
731 	if (msgflg & IPC_NOWAIT) {
732 		error = ENOMSG;
733 		goto msgrcv_out;
734 	}
735 	msg_wakeup_rdr(qp,  &qp->msg_fnd_rdr, 0);
736 
737 	/*
738 	 * Wait for new message.  We keep the negative and positive types
739 	 * separate for performance reasons.
740 	 */
741 	msg_entry.msgw_snd_wake = 0;
742 	if (msgtyp >= 0) {
743 		cvres = msg_rcvq_sleep(&qp->msg_wait_snd[msg_hash],
744 		    &msg_entry, &lock, qp);
745 	} else {
746 		qp->msg_ngt_cnt++;
747 		cvres = msg_rcvq_sleep(&qp->msg_wait_snd_ngt[msg_hash],
748 		    &msg_entry, &lock, qp);
749 		qp->msg_ngt_cnt--;
750 	}
751 
752 	if (!(error = msgq_check_err(qp, cvres))) {
753 		goto findmsg;
754 	}
755 
756 msgrcv_out:
757 	if (error) {
758 		msg_wakeup_rdr(qp,  &qp->msg_fnd_rdr, 0);
759 		if (msg_entry.msgw_snd_wake) {
760 			msg_wakeup_rdr(qp, &qp->msg_fnd_sndr,
761 			    msg_entry.msgw_snd_wake);
762 		}
763 		ipc_rele(msq_svc, (kipc_perm_t *)qp);
764 		return ((ssize_t)set_errno(error));
765 	}
766 	ipc_rele(msq_svc, (kipc_perm_t *)qp);
767 	return ((ssize_t)xtsz);
768 }
769 
770 static int
771 msgq_check_err(kmsqid_t *qp, int cvres)
772 {
773 	if (IPC_FREE(&qp->msg_perm)) {
774 		return (EIDRM);
775 	}
776 
777 	if (cvres == 0) {
778 		return (EINTR);
779 	}
780 
781 	return (0);
782 }
783 
784 static int
785 msg_copyout(kmsqid_t *qp, long msgtyp, kmutex_t **lock, size_t *xtsz_ret,
786     size_t msgsz, struct msg *smp, struct ipcmsgbuf *msgp, int msgflg)
787 {
788 	size_t		xtsz;
789 	STRUCT_HANDLE(ipcmsgbuf, umsgp);
790 	model_t		mdl = get_udatamodel();
791 	int		copyerror = 0;
792 
793 	STRUCT_SET_HANDLE(umsgp, mdl, msgp);
794 	if (msgsz < smp->msg_size) {
795 		if ((msgflg & MSG_NOERROR) == 0) {
796 			return (E2BIG);
797 		} else {
798 			xtsz = msgsz;
799 		}
800 	} else {
801 		xtsz = smp->msg_size;
802 	}
803 	*xtsz_ret = xtsz;
804 
805 	/*
806 	 * To prevent a DOS attack we mark the message as being
807 	 * copied out and release mutex.  When the copy is completed
808 	 * we need to acquire the mutex and make the appropriate updates.
809 	 */
810 	ASSERT((smp->msg_flags & MSG_RCVCOPY) == 0);
811 	smp->msg_flags |= MSG_RCVCOPY;
812 	msg_hold(smp);
813 	if (msgtyp < 0) {
814 		ASSERT(qp->msg_neg_copy == 0);
815 		qp->msg_neg_copy = 1;
816 	}
817 	mutex_exit(*lock);
818 
819 	if (mdl == DATAMODEL_NATIVE) {
820 		copyerror = copyout(&smp->msg_type, msgp,
821 		    sizeof (smp->msg_type));
822 	} else {
823 		/*
824 		 * 32-bit callers need an imploded msg type.
825 		 */
826 		int32_t	msg_type32 = smp->msg_type;
827 
828 		copyerror = copyout(&msg_type32, msgp,
829 		    sizeof (msg_type32));
830 	}
831 
832 	if (copyerror == 0 && xtsz) {
833 		copyerror = copyout(smp->msg_addr,
834 		    STRUCT_FADDR(umsgp, mtext), xtsz);
835 	}
836 
837 	/*
838 	 * Reclaim the mutex and make sure the message queue still exists.
839 	 */
840 
841 	*lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
842 	if (msgtyp < 0) {
843 		qp->msg_neg_copy = 0;
844 	}
845 	ASSERT(smp->msg_flags & MSG_RCVCOPY);
846 	smp->msg_flags &= ~MSG_RCVCOPY;
847 	msg_rele(smp);
848 	if (IPC_FREE(&qp->msg_perm)) {
849 		return (EIDRM);
850 	}
851 	if (copyerror) {
852 		return (EFAULT);
853 	}
854 	qp->msg_lrpid = ttoproc(curthread)->p_pid;
855 	qp->msg_rtime = gethrestime_sec();
856 	msgunlink(qp, smp);
857 	return (0);
858 }
859 
860 static struct msg *
861 msgrcv_lookup(kmsqid_t *qp, long msgtyp)
862 {
863 	struct msg 		*smp = NULL;
864 	long			qp_low;
865 	struct msg		*mp;	/* ptr to msg on q */
866 	long			low_msgtype;
867 	static struct msg	neg_copy_smp;
868 
869 	mp = list_head(&qp->msg_list);
870 	if (msgtyp == 0) {
871 		smp = mp;
872 	} else {
873 		qp_low = qp->msg_lowest_type;
874 		if (msgtyp > 0) {
875 			/*
876 			 * If our lowest possible message type is larger than
877 			 * the message type desired, then we know there is
878 			 * no entry present.
879 			 */
880 			if (qp_low > msgtyp) {
881 				return (NULL);
882 			}
883 
884 			for (; mp; mp = list_next(&qp->msg_list, mp)) {
885 				if (msgtyp == mp->msg_type) {
886 					smp = mp;
887 					break;
888 				}
889 			}
890 		} else {
891 			/*
892 			 * We have kept track of the lowest possible message
893 			 * type on the send queue.  This allows us to terminate
894 			 * the search early if we find a message type of that
895 			 * type.  Note, the lowest type may not be the actual
896 			 * lowest value in the system, it is only guaranteed
897 			 * that there isn't a value lower than that.
898 			 */
899 			low_msgtype = -msgtyp;
900 			if (low_msgtype < qp_low) {
901 				return (NULL);
902 			}
903 			if (qp->msg_neg_copy) {
904 				neg_copy_smp.msg_flags = MSG_RCVCOPY;
905 				return (&neg_copy_smp);
906 			}
907 			for (; mp; mp = list_next(&qp->msg_list, mp)) {
908 				if (mp->msg_type <= low_msgtype &&
909 				    !(smp && smp->msg_type <= mp->msg_type)) {
910 					smp = mp;
911 					low_msgtype = mp->msg_type;
912 					if (low_msgtype == qp_low) {
913 						break;
914 					}
915 				}
916 			}
917 			if (smp) {
918 				/*
919 				 * Update the lowest message type.
920 				 */
921 				qp->msg_lowest_type = smp->msg_type;
922 			}
923 		}
924 	}
925 	return (smp);
926 }
927 
928 /*
929  * msgids system call.
930  */
931 static int
932 msgids(int *buf, uint_t nids, uint_t *pnids)
933 {
934 	int error;
935 
936 	if (error = ipc_ids(msq_svc, buf, nids, pnids))
937 		return (set_errno(error));
938 
939 	return (0);
940 }
941 
/*
 * Round a byte count up to a multiple of the size of size_t (RND) or of
 * size32_t (RND32).  msgsnap() uses these to size and step over each
 * message body for native and 32-bit callers respectively.
 */
#define	RND(x)		roundup((x), sizeof (size_t))
#define	RND32(x)	roundup((x), sizeof (size32_t))
944 
945 /*
946  * msgsnap system call.
947  */
948 static int
949 msgsnap(int msqid, caddr_t buf, size_t bufsz, long msgtyp)
950 {
951 	struct msg	*mp;	/* ptr to msg on q */
952 	kmsqid_t	*qp;	/* ptr to associated q */
953 	kmutex_t	*lock;
954 	size_t		size;
955 	size_t		nmsg;
956 	struct msg	**snaplist;
957 	int		error, i;
958 	model_t		mdl = get_udatamodel();
959 	STRUCT_DECL(msgsnap_head, head);
960 	STRUCT_DECL(msgsnap_mhead, mhead);
961 
962 	STRUCT_INIT(head, mdl);
963 	STRUCT_INIT(mhead, mdl);
964 
965 	if (bufsz < STRUCT_SIZE(head))
966 		return (set_errno(EINVAL));
967 
968 	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL)
969 		return (set_errno(EINVAL));
970 
971 	if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
972 		mutex_exit(lock);
973 		return (set_errno(error));
974 	}
975 	ipc_hold(msq_svc, (kipc_perm_t *)qp);
976 
977 	/*
978 	 * First compute the required buffer size and
979 	 * the number of messages on the queue.
980 	 */
981 	size = nmsg = 0;
982 	for (mp = list_head(&qp->msg_list); mp;
983 	    mp = list_next(&qp->msg_list, mp)) {
984 		if (msgtyp == 0 ||
985 		    (msgtyp > 0 && msgtyp == mp->msg_type) ||
986 		    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
987 			nmsg++;
988 			if (mdl == DATAMODEL_NATIVE)
989 				size += RND(mp->msg_size);
990 			else
991 				size += RND32(mp->msg_size);
992 		}
993 	}
994 
995 	size += STRUCT_SIZE(head) + nmsg * STRUCT_SIZE(mhead);
996 	if (size > bufsz)
997 		nmsg = 0;
998 
999 	if (nmsg > 0) {
1000 		/*
1001 		 * Mark the messages as being copied.
1002 		 */
1003 		snaplist = (struct msg **)kmem_alloc(nmsg *
1004 		    sizeof (struct msg *), KM_SLEEP);
1005 		i = 0;
1006 		for (mp = list_head(&qp->msg_list); mp;
1007 		    mp = list_next(&qp->msg_list, mp)) {
1008 			if (msgtyp == 0 ||
1009 			    (msgtyp > 0 && msgtyp == mp->msg_type) ||
1010 			    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
1011 				msg_hold(mp);
1012 				snaplist[i] = mp;
1013 				i++;
1014 			}
1015 		}
1016 	}
1017 	mutex_exit(lock);
1018 
1019 	/*
1020 	 * Copy out the buffer header.
1021 	 */
1022 	STRUCT_FSET(head, msgsnap_size, size);
1023 	STRUCT_FSET(head, msgsnap_nmsg, nmsg);
1024 	if (copyout(STRUCT_BUF(head), buf, STRUCT_SIZE(head)))
1025 		error = EFAULT;
1026 
1027 	buf += STRUCT_SIZE(head);
1028 
1029 	/*
1030 	 * Now copy out the messages one by one.
1031 	 */
1032 	for (i = 0; i < nmsg; i++) {
1033 		mp = snaplist[i];
1034 		if (error == 0) {
1035 			STRUCT_FSET(mhead, msgsnap_mlen, mp->msg_size);
1036 			STRUCT_FSET(mhead, msgsnap_mtype, mp->msg_type);
1037 			if (copyout(STRUCT_BUF(mhead), buf, STRUCT_SIZE(mhead)))
1038 				error = EFAULT;
1039 			buf += STRUCT_SIZE(mhead);
1040 
1041 			if (error == 0 &&
1042 			    mp->msg_size != 0 &&
1043 			    copyout(mp->msg_addr, buf, mp->msg_size))
1044 				error = EFAULT;
1045 			if (mdl == DATAMODEL_NATIVE)
1046 				buf += RND(mp->msg_size);
1047 			else
1048 				buf += RND32(mp->msg_size);
1049 		}
1050 		lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
1051 		msg_rele(mp);
1052 		/* Check for msg q deleted or reallocated */
1053 		if (IPC_FREE(&qp->msg_perm))
1054 			error = EIDRM;
1055 		mutex_exit(lock);
1056 	}
1057 
1058 	(void) ipc_lock(msq_svc, qp->msg_perm.ipc_id);
1059 	ipc_rele(msq_svc, (kipc_perm_t *)qp);
1060 
1061 	if (nmsg > 0)
1062 		kmem_free(snaplist, nmsg * sizeof (struct msg *));
1063 
1064 	if (error)
1065 		return (set_errno(error));
1066 	return (0);
1067 }
1068 
1069 #define	MSG_PREALLOC_LIMIT 8192
1070 
1071 /*
1072  * msgsnd system call.
1073  */
1074 static int
1075 msgsnd(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, int msgflg)
1076 {
1077 	kmsqid_t	*qp;
1078 	kmutex_t	*lock = NULL;
1079 	struct msg	*mp = NULL;
1080 	long		type;
1081 	int		error = 0;
1082 	model_t		mdl = get_udatamodel();
1083 	STRUCT_HANDLE(ipcmsgbuf, umsgp);
1084 
1085 	CPU_STATS_ADDQ(CPU, sys, msg, 1);	/* bump msg send/rcv count */
1086 	STRUCT_SET_HANDLE(umsgp, mdl, msgp);
1087 
1088 	if (mdl == DATAMODEL_NATIVE) {
1089 		if (copyin(msgp, &type, sizeof (type)))
1090 			return (set_errno(EFAULT));
1091 	} else {
1092 		int32_t	type32;
1093 		if (copyin(msgp, &type32, sizeof (type32)))
1094 			return (set_errno(EFAULT));
1095 		type = type32;
1096 	}
1097 
1098 	if (type < 1)
1099 		return (set_errno(EINVAL));
1100 
1101 	/*
1102 	 * We want the value here large enough that most of the
1103 	 * the message operations will use the "lockless" path,
1104 	 * but small enough that a user can not reserve large
1105 	 * chunks of kernel memory unless they have a valid
1106 	 * reason to.
1107 	 */
1108 	if (msgsz <= MSG_PREALLOC_LIMIT) {
1109 		/*
1110 		 * We are small enough that we can afford to do the
1111 		 * allocation now.  This saves dropping the lock
1112 		 * and then reacquiring the lock.
1113 		 */
1114 		mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
1115 		mp->msg_copycnt = 1;
1116 		mp->msg_size = msgsz;
1117 		if (msgsz) {
1118 			mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
1119 			if (copyin(STRUCT_FADDR(umsgp, mtext),
1120 			    mp->msg_addr, msgsz) == -1) {
1121 				error = EFAULT;
1122 				goto msgsnd_out;
1123 			}
1124 		}
1125 	}
1126 
1127 	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
1128 		error = EINVAL;
1129 		goto msgsnd_out;
1130 	}
1131 
1132 	ipc_hold(msq_svc, (kipc_perm_t *)qp);
1133 
1134 	if (msgsz > qp->msg_qbytes) {
1135 		error = EINVAL;
1136 		goto msgsnd_out;
1137 	}
1138 
1139 	if (error = ipcperm_access(&qp->msg_perm, MSG_W, CRED()))
1140 		goto msgsnd_out;
1141 
1142 top:
1143 	/*
1144 	 * Allocate space on q, message header, & buffer space.
1145 	 */
1146 	ASSERT(qp->msg_qnum <= qp->msg_qmax);
1147 	while ((msgsz > qp->msg_qbytes - qp->msg_cbytes) ||
1148 	    (qp->msg_qnum == qp->msg_qmax)) {
1149 		int cvres;
1150 
1151 		if (msgflg & IPC_NOWAIT) {
1152 			error = EAGAIN;
1153 			goto msgsnd_out;
1154 		}
1155 
1156 		qp->msg_snd_cnt++;
1157 		cvres = cv_wait_sig(&qp->msg_snd_cv, lock);
1158 		lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, lock);
1159 		qp->msg_snd_cnt--;
1160 
1161 		if (error = msgq_check_err(qp, cvres)) {
1162 			goto msgsnd_out;
1163 		}
1164 	}
1165 
1166 	if (mp == NULL) {
1167 		int failure;
1168 
1169 		mutex_exit(lock);
1170 		ASSERT(msgsz > 0);
1171 		mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
1172 		mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
1173 		mp->msg_size = msgsz;
1174 		mp->msg_copycnt = 1;
1175 
1176 		failure = (copyin(STRUCT_FADDR(umsgp, mtext),
1177 		    mp->msg_addr, msgsz) == -1);
1178 		lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
1179 		if (IPC_FREE(&qp->msg_perm)) {
1180 			error = EIDRM;
1181 			goto msgsnd_out;
1182 		}
1183 		if (failure) {
1184 			error = EFAULT;
1185 			goto msgsnd_out;
1186 		}
1187 		goto top;
1188 	}
1189 
1190 	/*
1191 	 * Everything is available, put msg on q.
1192 	 */
1193 	qp->msg_qnum++;
1194 	qp->msg_cbytes += msgsz;
1195 	qp->msg_lspid = curproc->p_pid;
1196 	qp->msg_stime = gethrestime_sec();
1197 	mp->msg_type = type;
1198 	if (qp->msg_lowest_type > type)
1199 		qp->msg_lowest_type = type;
1200 	list_insert_tail(&qp->msg_list, mp);
1201 	/*
1202 	 * Get the proper receiver going.
1203 	 */
1204 	msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, type);
1205 
1206 msgsnd_out:
1207 	if (lock)
1208 		ipc_rele(msq_svc, (kipc_perm_t *)qp);	/* drops lock */
1209 
1210 	if (error) {
1211 		if (mp)
1212 			msg_rele(mp);
1213 		return (set_errno(error));
1214 	}
1215 
1216 	return (0);
1217 }
1218 
1219 static void
1220 msg_wakeup_rdr(kmsqid_t *qp, msg_select_t **flist, long type)
1221 {
1222 	msg_select_t	*walker = *flist;
1223 	msgq_wakeup_t	*wakeup;
1224 	uint_t		msg_hash;
1225 
1226 	msg_hash = msg_type_hash(type);
1227 
1228 	do {
1229 		wakeup = walker->selection(qp, msg_hash, type);
1230 		walker = walker->next_selection;
1231 	} while (!wakeup && walker != *flist);
1232 
1233 	*flist = (*flist)->next_selection;
1234 	if (wakeup) {
1235 		if (type) {
1236 			wakeup->msgw_snd_wake = type;
1237 		}
1238 		cv_signal(&wakeup->msgw_wake_cv);
1239 	}
1240 }
1241 
1242 static uint_t
1243 msg_type_hash(long msg_type)
1244 {
1245 	if (msg_type < 0) {
1246 		long	hash = -msg_type / MSG_NEG_INTERVAL;
1247 		/*
1248 		 * Negative message types are hashed over an
1249 		 * interval.  Any message type that hashes
1250 		 * beyond MSG_MAX_QNUM is automatically placed
1251 		 * in the last bucket.
1252 		 */
1253 		if (hash > MSG_MAX_QNUM)
1254 			hash = MSG_MAX_QNUM;
1255 		return (hash);
1256 	}
1257 
1258 	/*
1259 	 * 0 or positive message type.  The first bucket is reserved for
1260 	 * message receivers of type 0, the other buckets we hash into.
1261 	 */
1262 	if (msg_type)
1263 		return (1 + (msg_type % MSG_MAX_QNUM));
1264 	return (0);
1265 }
1266 
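/*
 * Routines that pick a blocked receiver without looking at the message
 * type: msg_fnd_any_snd() takes from the msg_wait_snd[0] list and
 * msg_fnd_any_rdr() takes from the msg_cpy_block list.  Each simply
 * removes and returns the first entry on its list (NULL if it is empty).
 */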
1271 
1272 static msgq_wakeup_t *
1273 /* ARGSUSED */
1274 msg_fnd_any_snd(kmsqid_t *qp, int msg_hash, long type)
1275 {
1276 	msgq_wakeup_t	*walker;
1277 
1278 	walker = list_head(&qp->msg_wait_snd[0]);
1279 
1280 	if (walker)
1281 		list_remove(&qp->msg_wait_snd[0], walker);
1282 	return (walker);
1283 }
1284 
1285 static msgq_wakeup_t *
1286 /* ARGSUSED */
1287 msg_fnd_any_rdr(kmsqid_t *qp, int msg_hash, long type)
1288 {
1289 	msgq_wakeup_t	*walker;
1290 
1291 	walker = list_head(&qp->msg_cpy_block);
1292 	if (walker)
1293 		list_remove(&qp->msg_cpy_block, walker);
1294 	return (walker);
1295 }
1296 
1297 static msgq_wakeup_t *
1298 msg_fnd_spc_snd(kmsqid_t *qp, int msg_hash, long type)
1299 {
1300 	msgq_wakeup_t	*walker;
1301 
1302 	walker = list_head(&qp->msg_wait_snd[msg_hash]);
1303 
1304 	while (walker && walker->msgw_type != type)
1305 		walker = list_next(&qp->msg_wait_snd[msg_hash], walker);
1306 	if (walker)
1307 		list_remove(&qp->msg_wait_snd[msg_hash], walker);
1308 	return (walker);
1309 }
1310 
1311 /* ARGSUSED */
1312 static msgq_wakeup_t *
1313 msg_fnd_neg_snd(kmsqid_t *qp, int msg_hash, long type)
1314 {
1315 	msgq_wakeup_t	*qptr;
1316 	int		count;
1317 	int		check_index;
1318 	int		neg_index;
1319 	int		nbuckets;
1320 
1321 	if (!qp->msg_ngt_cnt) {
1322 		return (NULL);
1323 	}
1324 	neg_index = msg_type_hash(-type);
1325 
1326 	/*
1327 	 * Check for a match among the negative type queues.  Any buckets
1328 	 * at neg_index or larger can match the type.  Use the last send
1329 	 * time to randomize the starting bucket to prevent starvation.
1330 	 * Search all buckets from neg_index to MSG_MAX_QNUM, starting
1331 	 * from the random starting point, and wrapping around after
1332 	 * MSG_MAX_QNUM.
1333 	 */
1334 
1335 	nbuckets = MSG_MAX_QNUM - neg_index + 1;
1336 	check_index = neg_index + (qp->msg_stime % nbuckets);
1337 
1338 	for (count = nbuckets; count > 0; count--) {
1339 		qptr = list_head(&qp->msg_wait_snd_ngt[check_index]);
1340 		while (qptr) {
1341 			/*
1342 			 * The lowest hash bucket may actually contain
1343 			 * message types that are not valid for this
1344 			 * request.  This can happen due to the fact that
1345 			 * the message buckets actually contain a consecutive
1346 			 * range of types.
1347 			 */
1348 			if (-qptr->msgw_type >= type) {
1349 				list_remove(&qp->msg_wait_snd_ngt[check_index],
1350 				    qptr);
1351 				return (qptr);
1352 			}
1353 			qptr = list_next(&qp->msg_wait_snd_ngt[check_index],
1354 			    qptr);
1355 		}
1356 		if (++check_index > MSG_MAX_QNUM) {
1357 			check_index = neg_index;
1358 		}
1359 	}
1360 	return (NULL);
1361 }
1362 
1363 static int
1364 msg_rcvq_sleep(list_t *queue, msgq_wakeup_t *entry, kmutex_t **lock,
1365     kmsqid_t *qp)
1366 {
1367 	int		cvres;
1368 
1369 	cv_init(&entry->msgw_wake_cv, NULL, 0, NULL);
1370 
1371 	list_insert_tail(queue, entry);
1372 
1373 	qp->msg_rcv_cnt++;
1374 	cvres = cv_wait_sig(&entry->msgw_wake_cv, *lock);
1375 	*lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, *lock);
1376 	qp->msg_rcv_cnt--;
1377 
1378 	if (list_link_active(&entry->msgw_list)) {
1379 		/*
1380 		 * We woke up unexpectedly, remove ourself.
1381 		 */
1382 		list_remove(queue, entry);
1383 	}
1384 
1385 	return (cvres);
1386 }
1387 
1388 static void
1389 msg_rcvq_wakeup_all(list_t *q_ptr)
1390 {
1391 	msgq_wakeup_t	*q_walk;
1392 
1393 	while (q_walk = list_head(q_ptr)) {
1394 		list_remove(q_ptr, q_walk);
1395 		cv_signal(&q_walk->msgw_wake_cv);
1396 	}
1397 }
1398 
1399 /*
1400  * msgsys - System entry point for msgctl, msgget, msgrcv, and msgsnd
1401  * system calls.
1402  */
1403 static ssize_t
1404 msgsys(int opcode, uintptr_t a1, uintptr_t a2, uintptr_t a3,
1405 	uintptr_t a4, uintptr_t a5)
1406 {
1407 	ssize_t error;
1408 
1409 	switch (opcode) {
1410 	case MSGGET:
1411 		error = msgget((key_t)a1, (int)a2);
1412 		break;
1413 	case MSGCTL:
1414 		error = msgctl((int)a1, (int)a2, (void *)a3);
1415 		break;
1416 	case MSGRCV:
1417 		error = msgrcv((int)a1, (struct ipcmsgbuf *)a2,
1418 		    (size_t)a3, (long)a4, (int)a5);
1419 		break;
1420 	case MSGSND:
1421 		error = msgsnd((int)a1, (struct ipcmsgbuf *)a2,
1422 		    (size_t)a3, (int)a4);
1423 		break;
1424 	case MSGIDS:
1425 		error = msgids((int *)a1, (uint_t)a2, (uint_t *)a3);
1426 		break;
1427 	case MSGSNAP:
1428 		error = msgsnap((int)a1, (caddr_t)a2, (size_t)a3, (long)a4);
1429 		break;
1430 	default:
1431 		error = set_errno(EINVAL);
1432 		break;
1433 	}
1434 
1435 	return (error);
1436 }
1437 
#ifdef	_SYSCALL32_IMPL
/*
 * msgsys32 - System entry point for the msgget, msgctl, msgrcv, msgsnd,
 * msgids and msgsnap system calls for 32-bit callers on LP64 kernel.
 *
 * Works like msgsys() but takes 32-bit arguments: pointer arguments are
 * widened through uintptr_t, and the arguments cast via int32_t (the
 * msgrcv/msgsnap type selector and the msgsnd size) are sign-extended.
 * An unrecognized opcode fails with EINVAL.
 */
static ssize32_t
msgsys32(int opcode, uint32_t a1, uint32_t a2, uint32_t a3,
	uint32_t a4, uint32_t a5)
{
	ssize_t rv;

	if (opcode == MSGGET) {
		rv = msgget((key_t)a1, (int)a2);
	} else if (opcode == MSGCTL) {
		rv = msgctl((int)a1, (int)a2, (void *)(uintptr_t)a3);
	} else if (opcode == MSGRCV) {
		rv = msgrcv((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
		    (size_t)a3, (long)(int32_t)a4, (int)a5);
	} else if (opcode == MSGSND) {
		rv = msgsnd((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
		    (size_t)(int32_t)a3, (int)a4);
	} else if (opcode == MSGIDS) {
		rv = msgids((int *)(uintptr_t)a1, (uint_t)a2,
		    (uint_t *)(uintptr_t)a3);
	} else if (opcode == MSGSNAP) {
		rv = msgsnap((int)a1, (caddr_t)(uintptr_t)a2, (size_t)a3,
		    (long)(int32_t)a4);
	} else {
		rv = set_errno(EINVAL);
	}

	return (rv);
}
#endif	/* _SYSCALL32_IMPL */
1480