xref: /illumos-gate/usr/src/uts/common/os/msg.c (revision 6d02032db7b674f185405d42cc8bf10a46a9ab3a)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
27 /*	  All Rights Reserved  	*/
28 
29 
30 /*
31  * Inter-Process Communication Message Facility.
32  *
33  * See os/ipc.c for a description of common IPC functionality.
34  *
35  * Resource controls
36  * -----------------
37  *
38  * Control:      zone.max-msg-ids (rc_zone_msgmni)
39  * Description:  Maximum number of message queue ids allowed a zone.
40  *
41  *   When msgget() is used to allocate a message queue, one id is
42  *   allocated.  If the id allocation doesn't succeed, msgget() fails
43  *   and errno is set to ENOSPC.  Upon successful msgctl(, IPC_RMID)
44  *   the id is deallocated.
45  *
46  * Control:      project.max-msg-ids (rc_project_msgmni)
47  * Description:  Maximum number of message queue ids allowed a project.
48  *
49  *   When msgget() is used to allocate a message queue, one id is
50  *   allocated.  If the id allocation doesn't succeed, msgget() fails
51  *   and errno is set to ENOSPC.  Upon successful msgctl(, IPC_RMID)
52  *   the id is deallocated.
53  *
54  * Control:      process.max-msg-qbytes (rc_process_msgmnb)
55  * Description:  Maximum number of bytes of messages on a message queue.
56  *
57  *   When msgget() successfully allocates a message queue, the minimum
58  *   enforced value of this limit is used to initialize msg_qbytes.
59  *
60  * Control:      process.max-msg-messages (rc_process_msgtql)
61  * Description:  Maximum number of messages on a message queue.
62  *
63  *   When msgget() successfully allocates a message queue, the minimum
64  *   enforced value of this limit is used to initialize a per-queue
65  *   limit on the number of messages.
66  */
67 
68 #include <sys/types.h>
69 #include <sys/t_lock.h>
70 #include <sys/param.h>
71 #include <sys/cred.h>
72 #include <sys/user.h>
73 #include <sys/proc.h>
74 #include <sys/time.h>
75 #include <sys/ipc.h>
76 #include <sys/ipc_impl.h>
77 #include <sys/msg.h>
78 #include <sys/msg_impl.h>
79 #include <sys/list.h>
80 #include <sys/systm.h>
81 #include <sys/sysmacros.h>
82 #include <sys/cpuvar.h>
83 #include <sys/kmem.h>
84 #include <sys/ddi.h>
85 #include <sys/errno.h>
86 #include <sys/cmn_err.h>
87 #include <sys/debug.h>
88 #include <sys/project.h>
89 #include <sys/modctl.h>
90 #include <sys/syscall.h>
91 #include <sys/policy.h>
92 #include <sys/zone.h>
93 
94 #include <c2/audit.h>
95 
96 /*
97  * The following tunables are obsolete.  Though for compatibility we
98  * still read and interpret msginfo_msgmnb, msginfo_msgmni, and
99  * msginfo_msgtql (see os/project.c and os/rctl_proc.c), the preferred
100  * mechanism for administrating the IPC Message facility is through the
101  * resource controls described at the top of this file.
102  */
103 size_t	msginfo_msgmax = 2048;	/* (obsolete) */
104 size_t	msginfo_msgmnb = 4096;	/* (obsolete) */
105 int	msginfo_msgmni = 50;	/* (obsolete) */
106 int	msginfo_msgtql = 40;	/* (obsolete) */
107 int	msginfo_msgssz = 8;	/* (obsolete) */
108 int	msginfo_msgmap = 0;	/* (obsolete) */
109 ushort_t msginfo_msgseg = 1024;	/* (obsolete) */
110 
111 extern rctl_hndl_t rc_zone_msgmni;
112 extern rctl_hndl_t rc_project_msgmni;
113 extern rctl_hndl_t rc_process_msgmnb;
114 extern rctl_hndl_t rc_process_msgtql;
115 static ipc_service_t *msq_svc;
116 static zone_key_t msg_zone_key;
117 
118 static void msg_dtor(kipc_perm_t *);
119 static void msg_rmid(kipc_perm_t *);
120 static void msg_remove_zone(zoneid_t, void *);
121 
122 /*
123  * Module linkage information for the kernel.
124  */
125 static ssize_t msgsys(int opcode, uintptr_t a0, uintptr_t a1, uintptr_t a2,
126 	uintptr_t a4, uintptr_t a5);
127 
128 static struct sysent ipcmsg_sysent = {
129 	6,
130 #ifdef	_LP64
131 	SE_ARGC | SE_NOUNLOAD | SE_64RVAL,
132 #else
133 	SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
134 #endif
135 	(int (*)())msgsys
136 };
137 
138 #ifdef	_SYSCALL32_IMPL
139 static ssize32_t msgsys32(int opcode, uint32_t a0, uint32_t a1, uint32_t a2,
140 	uint32_t a4, uint32_t a5);
141 
142 static struct sysent ipcmsg_sysent32 = {
143 	6,
144 	SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
145 	(int (*)())msgsys32
146 };
147 #endif	/* _SYSCALL32_IMPL */
148 
149 static struct modlsys modlsys = {
150 	&mod_syscallops, "System V message facility", &ipcmsg_sysent
151 };
152 
153 #ifdef _SYSCALL32_IMPL
154 static struct modlsys modlsys32 = {
155 	&mod_syscallops32, "32-bit System V message facility", &ipcmsg_sysent32
156 };
157 #endif
158 
159 /*
160  *      Big Theory statement for message queue correctness
161  *
162  * The msgrcv and msgsnd functions no longer uses cv_broadcast to wake up
163  * receivers who are waiting for an event.  Using the cv_broadcast method
164  * resulted in negative scaling when the number of waiting receivers are large
165  * (the thundering herd problem).  Instead, the receivers waiting to receive a
166  * message are now linked in a queue-like fashion and awaken one at a time in
167  * a controlled manner.
168  *
169  * Receivers can block on two different classes of waiting list:
170  *    1) "sendwait" list, which is the more complex list of the two.  The
171  *	  receiver will be awakened by a sender posting a new message.  There
172  *	  are two types of "sendwait" list used:
173  *		a) msg_wait_snd: handles all receivers who are looking for
174  *		   a message type >= 0, but was unable to locate a match.
175  *
176  *		   slot 0: reserved for receivers that have designated they
177  *			   will take any message type.
178  *		   rest:   consist of receivers requesting a specific type
179  *			   but the type was not present.  The entries are
180  *			   hashed into a bucket in an attempt to keep
181  *			   any list search relatively short.
182  * 		b) msg_wait_snd_ngt: handles all receivers that have designated
183  *		   a negative message type. Unlike msg_wait_snd, the hash bucket
184  *		   serves a range of negative message types (-1 to -5, -6 to -10
185  *		   and so forth), where the last bucket is reserved for all the
186  *		   negative message types that hash outside of MSG_MAX_QNUM - 1.
187  *		   This is done this way to simplify the operation of locating a
188  *		   negative message type.
189  *
190  *    2) "copyout" list, where the receiver is awakened by another
191  *	 receiver after a message is copied out.  This is a linked list
192  *	 of waiters that are awakened one at a time.  Although the solution is
193  *	 not optimal, the complexity that would be added in for waking
194  *	 up the right entry far exceeds any potential pay back (too many
195  *	 correctness and corner case issues).
196  *
197  * The lists are doubly linked.  In the case of the "sendwait"
198  * list, this allows the thread to remove itself from the list without having
199  * to traverse the list.  In the case of the "copyout" list it simply allows
200  * us to use common functions with the "sendwait" list.
201  *
202  * To make sure receivers are not hung out to dry, we must guarantee:
203  *    1. If any queued message matches any receiver, then at least one
204  *       matching receiver must be processing the request.
205  *    2. Blocking on the copyout queue is only temporary while messages
206  *	 are being copied out.  The process is guaranted to wakeup
207  *	 when it gets to front of the queue (copyout is a FIFO).
208  *
209  * Rules for blocking and waking up:
210  *   1. A receiver entering msgrcv must examine all messages for a match
211  *      before blocking on a sendwait queue.
212  *   2. If the receiver blocks because the message it chose is already
213  *	being copied out, then when it wakes up needs to start start
214  *	checking the messages from the beginning.
215  *   3) When ever a process returns from msgrcv for any reason, if it
216  *	had attempted to copy a message or blocked waiting for a copy
217  *	to complete it needs to wakeup the next receiver blocked on
218  *	a copy out.
219  *   4) When a message is sent, the sender selects a process waiting
220  *	for that type of message.  This selection process rotates between
221  *	receivers types of 0, negative and positive to prevent starvation of
222  *	any one particular receiver type.
223  *   5) The following are the scenarios for processes that are awakened
224  *	by a msgsnd:
225  *		a) The process finds the message and is able to copy
226  *		   it out.  Once complete, the process returns.
227  *		b) The message that was sent that triggered the wakeup is no
228  *		   longer available (another process found the message first).
229  *		   We issue a wakeup on copy queue and then go back to
230  *		   sleep waiting for another matching message to be sent.
231  *		c) The message that was supposed to be processed was
232  *		   already serviced by another process.  However a different
233  *		   message is present which we can service.  The message
234  *		   is copied and the process returns.
235  *		d) The message is found, but some sort of error occurs that
236  *		   prevents the message from being copied.  The receiver
237  *		   wakes up the next sender that can service this message
238  *		   type and returns an error to the caller.
239  *		e) The message is found, but it is marked as being copied
240  *		   out.  The receiver then goes to sleep on the copyout
241  *		   queue where it will be awakened again sometime in the future.
242  *
243  *
244  *   6) Whenever a message is found that matches the message type designated,
245  * 	but is being copied out we have to block on the copyout queue.
246  *	After process copying finishes the copy out, it  must wakeup (either
247  *	directly or indirectly) all receivers who blocked on its copyout,
248  *	so they are guaranteed a chance to examine the remaining messages.
249  *	This is implemented via a chain of wakeups: Y wakes X, who wakes Z,
250  *	and so on.  The chain cannot be broken.  This leads to the following
251  *	cases:
252  *		a) A receiver is finished copying the message (or encountered)
253  *		   an error), the first entry on the copyout queue is woken
254  *		   up.
255  *		b) When the receiver is woken up, it attempts to locate
256  *		   a message type match.
257  *		c) If a message type is found and
258  *			-- MSG_RCVCOPY flag is not set, the message is
259  *			   marked for copying out.  Regardless of the copyout
260  *			   success the next entry on the copyout queue is
261  *			   awakened and the operation is completed.
262  *			-- MSG_RCVCOPY is set, we simply go back to sleep again
263  *			   on the copyout queue.
264  *		d) If the message type is not found then we wakeup the next
265  *		   process on the copyout queue.
266  *   7) If a msgsnd is unable to complete for of any of the following reasons
267  *	  a) the msgq has no space for the message
268  *	  b) the maximum number of messages allowed has been reached
269  *      then one of two things happen:
270  *	  1) If the passed in msg_flag has IPC_NOWAIT set, then
271  *	     an error is returned.
272  *	  2) The IPC_NOWAIT bit is not set in msg_flag, then the
273  *	     the thread is placed to sleep until the request can be
274  *	     serviced.
275  *   8) When waking a thread waiting to send a message, a check is done to
276  *      verify that the operation being asked for by the thread will complete.
277  *      This decision making process is done in a loop where the oldest request
278  *      is checked first. The search will continue until there is no more
279  *	room on the msgq or we have checked all the waiters.
280  */
281 
282 static uint_t msg_type_hash(long);
283 static int msgq_check_err(kmsqid_t *qp, int cvres);
284 static int msg_rcvq_sleep(list_t *, msgq_wakeup_t *, kmutex_t **,
285     kmsqid_t *);
286 static int msg_copyout(kmsqid_t *, long, kmutex_t **, size_t *, size_t,
287     struct msg *, struct ipcmsgbuf *, int);
288 static void msg_rcvq_wakeup_all(list_t *);
289 static void msg_wakeup_senders(kmsqid_t *);
290 static void msg_wakeup_rdr(kmsqid_t *, msg_select_t **, long);
291 static msgq_wakeup_t *msg_fnd_any_snd(kmsqid_t *, int, long);
292 static msgq_wakeup_t *msg_fnd_any_rdr(kmsqid_t *, int, long);
293 static msgq_wakeup_t *msg_fnd_neg_snd(kmsqid_t *, int, long);
294 static msgq_wakeup_t *msg_fnd_spc_snd(kmsqid_t *, int, long);
295 static struct msg *msgrcv_lookup(kmsqid_t *, long);
296 
297 msg_select_t msg_fnd_sndr[] = {
298 	{ msg_fnd_any_snd, &msg_fnd_sndr[1] },
299 	{ msg_fnd_spc_snd, &msg_fnd_sndr[2] },
300 	{ msg_fnd_neg_snd, &msg_fnd_sndr[0] }
301 };
302 
303 msg_select_t msg_fnd_rdr[1] = {
304 	{ msg_fnd_any_rdr, &msg_fnd_rdr[0] },
305 };
306 
307 static struct modlinkage modlinkage = {
308 	MODREV_1,
309 	&modlsys,
310 #ifdef _SYSCALL32_IMPL
311 	&modlsys32,
312 #endif
313 	NULL
314 };
315 
316 #define	MSG_SMALL_INIT (size_t)-1
317 int
318 _init(void)
319 {
320 	int result;
321 
322 	msq_svc = ipcs_create("msqids", rc_project_msgmni, rc_zone_msgmni,
323 	    sizeof (kmsqid_t), msg_dtor, msg_rmid, AT_IPC_MSG,
324 	    offsetof(ipc_rqty_t, ipcq_msgmni));
325 	zone_key_create(&msg_zone_key, NULL, msg_remove_zone, NULL);
326 
327 	if ((result = mod_install(&modlinkage)) == 0)
328 		return (0);
329 
330 	(void) zone_key_delete(msg_zone_key);
331 	ipcs_destroy(msq_svc);
332 
333 	return (result);
334 }
335 
336 int
337 _fini(void)
338 {
339 	return (EBUSY);
340 }
341 
342 int
343 _info(struct modinfo *modinfop)
344 {
345 	return (mod_info(&modlinkage, modinfop));
346 }
347 
348 static void
349 msg_dtor(kipc_perm_t *perm)
350 {
351 	kmsqid_t *qp = (kmsqid_t *)perm;
352 	int		ii;
353 
354 	for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
355 		ASSERT(list_is_empty(&qp->msg_wait_snd[ii]));
356 		ASSERT(list_is_empty(&qp->msg_wait_snd_ngt[ii]));
357 		list_destroy(&qp->msg_wait_snd[ii]);
358 		list_destroy(&qp->msg_wait_snd_ngt[ii]);
359 	}
360 	ASSERT(list_is_empty(&qp->msg_cpy_block));
361 	ASSERT(list_is_empty(&qp->msg_wait_rcv));
362 	list_destroy(&qp->msg_cpy_block);
363 	ASSERT(qp->msg_snd_cnt == 0);
364 	ASSERT(qp->msg_cbytes == 0);
365 	list_destroy(&qp->msg_list);
366 	list_destroy(&qp->msg_wait_rcv);
367 }
368 
369 
370 #define	msg_hold(mp)	(mp)->msg_copycnt++
371 
372 /*
373  * msg_rele - decrement the reference count on the message.  When count
374  * reaches zero, free message header and contents.
375  */
376 static void
377 msg_rele(struct msg *mp)
378 {
379 	ASSERT(mp->msg_copycnt > 0);
380 	if (mp->msg_copycnt-- == 1) {
381 		if (mp->msg_addr)
382 			kmem_free(mp->msg_addr, mp->msg_size);
383 		kmem_free(mp, sizeof (struct msg));
384 	}
385 }
386 
387 /*
388  * msgunlink - Unlink msg from queue, decrement byte count and wake up anyone
389  * waiting for free bytes on queue.
390  *
391  * Called with queue locked.
392  */
393 static void
394 msgunlink(kmsqid_t *qp, struct msg *mp)
395 {
396 	list_remove(&qp->msg_list, mp);
397 	qp->msg_qnum--;
398 	qp->msg_cbytes -= mp->msg_size;
399 	msg_rele(mp);
400 
401 	/* Wake up waiting writers */
402 	msg_wakeup_senders(qp);
403 }
404 
405 static void
406 msg_rmid(kipc_perm_t *perm)
407 {
408 	kmsqid_t *qp = (kmsqid_t *)perm;
409 	struct msg *mp;
410 	int		ii;
411 
412 
413 	while ((mp = list_head(&qp->msg_list)) != NULL)
414 		msgunlink(qp, mp);
415 	ASSERT(qp->msg_cbytes == 0);
416 
417 	/*
418 	 * Wake up everyone who is in a wait state of some sort
419 	 * for this message queue.
420 	 */
421 	for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
422 		msg_rcvq_wakeup_all(&qp->msg_wait_snd[ii]);
423 		msg_rcvq_wakeup_all(&qp->msg_wait_snd_ngt[ii]);
424 	}
425 	msg_rcvq_wakeup_all(&qp->msg_cpy_block);
426 	msg_rcvq_wakeup_all(&qp->msg_wait_rcv);
427 }
428 
429 /*
430  * msgctl system call.
431  *
432  * gets q lock (via ipc_lookup), releases before return.
433  * may call users of msg_lock
434  */
435 static int
436 msgctl(int msgid, int cmd, void *arg)
437 {
438 	STRUCT_DECL(msqid_ds, ds);		/* SVR4 queue work area */
439 	kmsqid_t		*qp;		/* ptr to associated q */
440 	int			error;
441 	struct	cred		*cr;
442 	model_t	mdl = get_udatamodel();
443 	struct msqid_ds64	ds64;
444 	kmutex_t		*lock;
445 	proc_t			*pp = curproc;
446 
447 	STRUCT_INIT(ds, mdl);
448 	cr = CRED();
449 
450 	/*
451 	 * Perform pre- or non-lookup actions (e.g. copyins, RMID).
452 	 */
453 	switch (cmd) {
454 	case IPC_SET:
455 		if (copyin(arg, STRUCT_BUF(ds), STRUCT_SIZE(ds)))
456 			return (set_errno(EFAULT));
457 		break;
458 
459 	case IPC_SET64:
460 		if (copyin(arg, &ds64, sizeof (struct msqid_ds64)))
461 			return (set_errno(EFAULT));
462 		break;
463 
464 	case IPC_RMID:
465 		if (error = ipc_rmid(msq_svc, msgid, cr))
466 			return (set_errno(error));
467 		return (0);
468 	}
469 
470 	/*
471 	 * get msqid_ds for this msgid
472 	 */
473 	if ((lock = ipc_lookup(msq_svc, msgid, (kipc_perm_t **)&qp)) == NULL)
474 		return (set_errno(EINVAL));
475 
476 	switch (cmd) {
477 	case IPC_SET:
478 		if (STRUCT_FGET(ds, msg_qbytes) > qp->msg_qbytes &&
479 		    secpolicy_ipc_config(cr) != 0) {
480 			mutex_exit(lock);
481 			return (set_errno(EPERM));
482 		}
483 		if (error = ipcperm_set(msq_svc, cr, &qp->msg_perm,
484 		    &STRUCT_BUF(ds)->msg_perm, mdl)) {
485 			mutex_exit(lock);
486 			return (set_errno(error));
487 		}
488 		qp->msg_qbytes = STRUCT_FGET(ds, msg_qbytes);
489 		qp->msg_ctime = gethrestime_sec();
490 		break;
491 
492 	case IPC_STAT:
493 		if (error = ipcperm_access(&qp->msg_perm, MSG_R, cr)) {
494 			mutex_exit(lock);
495 			return (set_errno(error));
496 		}
497 
498 		if (qp->msg_rcv_cnt)
499 			qp->msg_perm.ipc_mode |= MSG_RWAIT;
500 		if (qp->msg_snd_cnt)
501 			qp->msg_perm.ipc_mode |= MSG_WWAIT;
502 		ipcperm_stat(&STRUCT_BUF(ds)->msg_perm, &qp->msg_perm, mdl);
503 		qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
504 		STRUCT_FSETP(ds, msg_first, NULL); 	/* kernel addr */
505 		STRUCT_FSETP(ds, msg_last, NULL);
506 		STRUCT_FSET(ds, msg_cbytes, qp->msg_cbytes);
507 		STRUCT_FSET(ds, msg_qnum, qp->msg_qnum);
508 		STRUCT_FSET(ds, msg_qbytes, qp->msg_qbytes);
509 		STRUCT_FSET(ds, msg_lspid, qp->msg_lspid);
510 		STRUCT_FSET(ds, msg_lrpid, qp->msg_lrpid);
511 		STRUCT_FSET(ds, msg_stime, qp->msg_stime);
512 		STRUCT_FSET(ds, msg_rtime, qp->msg_rtime);
513 		STRUCT_FSET(ds, msg_ctime, qp->msg_ctime);
514 		break;
515 
516 	case IPC_SET64:
517 		mutex_enter(&pp->p_lock);
518 		if ((ds64.msgx_qbytes > qp->msg_qbytes) &&
519 		    secpolicy_ipc_config(cr) != 0 &&
520 		    rctl_test(rc_process_msgmnb, pp->p_rctls, pp,
521 		    ds64.msgx_qbytes, RCA_SAFE) & RCT_DENY) {
522 			mutex_exit(&pp->p_lock);
523 			mutex_exit(lock);
524 			return (set_errno(EPERM));
525 		}
526 		mutex_exit(&pp->p_lock);
527 		if (error = ipcperm_set64(msq_svc, cr, &qp->msg_perm,
528 		    &ds64.msgx_perm)) {
529 			mutex_exit(lock);
530 			return (set_errno(error));
531 		}
532 		qp->msg_qbytes = ds64.msgx_qbytes;
533 		qp->msg_ctime = gethrestime_sec();
534 		break;
535 
536 	case IPC_STAT64:
537 		if (qp->msg_rcv_cnt)
538 			qp->msg_perm.ipc_mode |= MSG_RWAIT;
539 		if (qp->msg_snd_cnt)
540 			qp->msg_perm.ipc_mode |= MSG_WWAIT;
541 		ipcperm_stat64(&ds64.msgx_perm, &qp->msg_perm);
542 		qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
543 		ds64.msgx_cbytes = qp->msg_cbytes;
544 		ds64.msgx_qnum = qp->msg_qnum;
545 		ds64.msgx_qbytes = qp->msg_qbytes;
546 		ds64.msgx_lspid = qp->msg_lspid;
547 		ds64.msgx_lrpid = qp->msg_lrpid;
548 		ds64.msgx_stime = qp->msg_stime;
549 		ds64.msgx_rtime = qp->msg_rtime;
550 		ds64.msgx_ctime = qp->msg_ctime;
551 		break;
552 
553 	default:
554 		mutex_exit(lock);
555 		return (set_errno(EINVAL));
556 	}
557 
558 	mutex_exit(lock);
559 
560 	/*
561 	 * Do copyout last (after releasing mutex).
562 	 */
563 	switch (cmd) {
564 	case IPC_STAT:
565 		if (copyout(STRUCT_BUF(ds), arg, STRUCT_SIZE(ds)))
566 			return (set_errno(EFAULT));
567 		break;
568 
569 	case IPC_STAT64:
570 		if (copyout(&ds64, arg, sizeof (struct msqid_ds64)))
571 			return (set_errno(EFAULT));
572 		break;
573 	}
574 
575 	return (0);
576 }
577 
578 /*
579  * Remove all message queues associated with a given zone.  Called by
580  * zone_shutdown when the zone is halted.
581  */
582 /*ARGSUSED1*/
583 static void
584 msg_remove_zone(zoneid_t zoneid, void *arg)
585 {
586 	ipc_remove_zone(msq_svc, zoneid);
587 }
588 
589 /*
590  * msgget system call.
591  */
592 static int
593 msgget(key_t key, int msgflg)
594 {
595 	kmsqid_t	*qp;
596 	kmutex_t	*lock;
597 	int		id, error;
598 	int		ii;
599 	proc_t		*pp = curproc;
600 
601 top:
602 	if (error = ipc_get(msq_svc, key, msgflg, (kipc_perm_t **)&qp, &lock))
603 		return (set_errno(error));
604 
605 	if (IPC_FREE(&qp->msg_perm)) {
606 		mutex_exit(lock);
607 		mutex_exit(&pp->p_lock);
608 
609 		list_create(&qp->msg_list, sizeof (struct msg),
610 		    offsetof(struct msg, msg_node));
611 		qp->msg_qnum = 0;
612 		qp->msg_lspid = qp->msg_lrpid = 0;
613 		qp->msg_stime = qp->msg_rtime = 0;
614 		qp->msg_ctime = gethrestime_sec();
615 		qp->msg_ngt_cnt = 0;
616 		qp->msg_neg_copy = 0;
617 		for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
618 			list_create(&qp->msg_wait_snd[ii],
619 			    sizeof (msgq_wakeup_t),
620 			    offsetof(msgq_wakeup_t, msgw_list));
621 			list_create(&qp->msg_wait_snd_ngt[ii],
622 			    sizeof (msgq_wakeup_t),
623 			    offsetof(msgq_wakeup_t, msgw_list));
624 		}
625 		/*
626 		 * The proper initialization of msg_lowest_type is to the
627 		 * highest possible value.  By doing this we guarantee that
628 		 * when the first send happens, the lowest type will be set
629 		 * properly.
630 		 */
631 		qp->msg_lowest_type = MSG_SMALL_INIT;
632 		list_create(&qp->msg_cpy_block,
633 		    sizeof (msgq_wakeup_t),
634 		    offsetof(msgq_wakeup_t, msgw_list));
635 		list_create(&qp->msg_wait_rcv,
636 		    sizeof (msgq_wakeup_t),
637 		    offsetof(msgq_wakeup_t, msgw_list));
638 		qp->msg_fnd_sndr = &msg_fnd_sndr[0];
639 		qp->msg_fnd_rdr = &msg_fnd_rdr[0];
640 		qp->msg_rcv_cnt = 0;
641 		qp->msg_snd_cnt = 0;
642 		qp->msg_snd_smallest = MSG_SMALL_INIT;
643 
644 		if (error = ipc_commit_begin(msq_svc, key, msgflg,
645 		    (kipc_perm_t *)qp)) {
646 			if (error == EAGAIN)
647 				goto top;
648 			return (set_errno(error));
649 		}
650 		qp->msg_qbytes = rctl_enforced_value(rc_process_msgmnb,
651 		    pp->p_rctls, pp);
652 		qp->msg_qmax = rctl_enforced_value(rc_process_msgtql,
653 		    pp->p_rctls, pp);
654 		lock = ipc_commit_end(msq_svc, &qp->msg_perm);
655 	}
656 
657 	if (AU_AUDITING())
658 		audit_ipcget(AT_IPC_MSG, (void *)qp);
659 
660 	id = qp->msg_perm.ipc_id;
661 	mutex_exit(lock);
662 	return (id);
663 }
664 
665 static ssize_t
666 msgrcv(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, long msgtyp, int msgflg)
667 {
668 	struct msg	*smp;	/* ptr to best msg on q */
669 	kmsqid_t	*qp;	/* ptr to associated q */
670 	kmutex_t	*lock;
671 	size_t		xtsz;	/* transfer byte count */
672 	int		error = 0;
673 	int		cvres;
674 	uint_t		msg_hash;
675 	msgq_wakeup_t	msg_entry;
676 
677 	CPU_STATS_ADDQ(CPU, sys, msg, 1);	/* bump msg send/rcv count */
678 
679 	msg_hash = msg_type_hash(msgtyp);
680 	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
681 		return ((ssize_t)set_errno(EINVAL));
682 	}
683 	ipc_hold(msq_svc, (kipc_perm_t *)qp);
684 
685 	if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
686 		goto msgrcv_out;
687 	}
688 
689 	/*
690 	 * Various information (including the condvar_t) required for the
691 	 * process to sleep is provided by it's stack.
692 	 */
693 	msg_entry.msgw_thrd = curthread;
694 	msg_entry.msgw_snd_wake = 0;
695 	msg_entry.msgw_type = msgtyp;
696 findmsg:
697 	smp = msgrcv_lookup(qp, msgtyp);
698 
699 	if (smp) {
700 		/*
701 		 * We found a possible message to copy out.
702 		 */
703 		if ((smp->msg_flags & MSG_RCVCOPY) == 0) {
704 			long t = msg_entry.msgw_snd_wake;
705 			long copy_type = smp->msg_type;
706 
707 			/*
708 			 * It is available, attempt to copy it.
709 			 */
710 			error = msg_copyout(qp, msgtyp, &lock, &xtsz, msgsz,
711 			    smp, msgp, msgflg);
712 
713 			/*
714 			 * It is possible to consume a different message
715 			 * type then what originally awakened for (negative
716 			 * types).  If this happens a check must be done to
717 			 * to determine if another receiver is available
718 			 * for the waking message type,  Failure to do this
719 			 * can result in a message on the queue that can be
720 			 * serviced by a sleeping receiver.
721 			 */
722 			if (!error && t && (copy_type != t))
723 				msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, t);
724 
725 			/*
726 			 * Don't forget to wakeup a sleeper that blocked because
727 			 * we were copying things out.
728 			 */
729 			msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
730 			goto msgrcv_out;
731 		}
732 		/*
733 		 * The selected message is being copied out, so block.  We do
734 		 * not need to wake the next person up on the msg_cpy_block list
735 		 * due to the fact some one is copying out and they will get
736 		 * things moving again once the copy is completed.
737 		 */
738 		cvres = msg_rcvq_sleep(&qp->msg_cpy_block,
739 		    &msg_entry, &lock, qp);
740 		error = msgq_check_err(qp, cvres);
741 		if (error) {
742 			goto msgrcv_out;
743 		}
744 		goto findmsg;
745 	}
746 	/*
747 	 * There isn't a message to copy out that matches the designated
748 	 * criteria.
749 	 */
750 	if (msgflg & IPC_NOWAIT) {
751 		error = ENOMSG;
752 		goto msgrcv_out;
753 	}
754 	msg_wakeup_rdr(qp,  &qp->msg_fnd_rdr, 0);
755 
756 	/*
757 	 * Wait for new message.  We keep the negative and positive types
758 	 * separate for performance reasons.
759 	 */
760 	msg_entry.msgw_snd_wake = 0;
761 	if (msgtyp >= 0) {
762 		cvres = msg_rcvq_sleep(&qp->msg_wait_snd[msg_hash],
763 		    &msg_entry, &lock, qp);
764 	} else {
765 		qp->msg_ngt_cnt++;
766 		cvres = msg_rcvq_sleep(&qp->msg_wait_snd_ngt[msg_hash],
767 		    &msg_entry, &lock, qp);
768 		qp->msg_ngt_cnt--;
769 	}
770 
771 	if (!(error = msgq_check_err(qp, cvres))) {
772 		goto findmsg;
773 	}
774 
775 msgrcv_out:
776 	if (error) {
777 		msg_wakeup_rdr(qp,  &qp->msg_fnd_rdr, 0);
778 		if (msg_entry.msgw_snd_wake) {
779 			msg_wakeup_rdr(qp, &qp->msg_fnd_sndr,
780 			    msg_entry.msgw_snd_wake);
781 		}
782 		ipc_rele(msq_svc, (kipc_perm_t *)qp);
783 		return ((ssize_t)set_errno(error));
784 	}
785 	ipc_rele(msq_svc, (kipc_perm_t *)qp);
786 	return ((ssize_t)xtsz);
787 }
788 
789 static int
790 msgq_check_err(kmsqid_t *qp, int cvres)
791 {
792 	if (IPC_FREE(&qp->msg_perm)) {
793 		return (EIDRM);
794 	}
795 
796 	if (cvres == 0) {
797 		return (EINTR);
798 	}
799 
800 	return (0);
801 }
802 
803 static int
804 msg_copyout(kmsqid_t *qp, long msgtyp, kmutex_t **lock, size_t *xtsz_ret,
805     size_t msgsz, struct msg *smp, struct ipcmsgbuf *msgp, int msgflg)
806 {
807 	size_t		xtsz;
808 	STRUCT_HANDLE(ipcmsgbuf, umsgp);
809 	model_t		mdl = get_udatamodel();
810 	int		copyerror = 0;
811 
812 	STRUCT_SET_HANDLE(umsgp, mdl, msgp);
813 	if (msgsz < smp->msg_size) {
814 		if ((msgflg & MSG_NOERROR) == 0) {
815 			return (E2BIG);
816 		} else {
817 			xtsz = msgsz;
818 		}
819 	} else {
820 		xtsz = smp->msg_size;
821 	}
822 	*xtsz_ret = xtsz;
823 
824 	/*
825 	 * To prevent a DOS attack we mark the message as being
826 	 * copied out and release mutex.  When the copy is completed
827 	 * we need to acquire the mutex and make the appropriate updates.
828 	 */
829 	ASSERT((smp->msg_flags & MSG_RCVCOPY) == 0);
830 	smp->msg_flags |= MSG_RCVCOPY;
831 	msg_hold(smp);
832 	if (msgtyp < 0) {
833 		ASSERT(qp->msg_neg_copy == 0);
834 		qp->msg_neg_copy = 1;
835 	}
836 	mutex_exit(*lock);
837 
838 	if (mdl == DATAMODEL_NATIVE) {
839 		copyerror = copyout(&smp->msg_type, msgp,
840 		    sizeof (smp->msg_type));
841 	} else {
842 		/*
843 		 * 32-bit callers need an imploded msg type.
844 		 */
845 		int32_t	msg_type32 = smp->msg_type;
846 
847 		copyerror = copyout(&msg_type32, msgp,
848 		    sizeof (msg_type32));
849 	}
850 
851 	if (copyerror == 0 && xtsz) {
852 		copyerror = copyout(smp->msg_addr,
853 		    STRUCT_FADDR(umsgp, mtext), xtsz);
854 	}
855 
856 	/*
857 	 * Reclaim the mutex and make sure the message queue still exists.
858 	 */
859 
860 	*lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
861 	if (msgtyp < 0) {
862 		qp->msg_neg_copy = 0;
863 	}
864 	ASSERT(smp->msg_flags & MSG_RCVCOPY);
865 	smp->msg_flags &= ~MSG_RCVCOPY;
866 	msg_rele(smp);
867 	if (IPC_FREE(&qp->msg_perm)) {
868 		return (EIDRM);
869 	}
870 	if (copyerror) {
871 		return (EFAULT);
872 	}
873 	qp->msg_lrpid = ttoproc(curthread)->p_pid;
874 	qp->msg_rtime = gethrestime_sec();
875 	msgunlink(qp, smp);
876 	return (0);
877 }
878 
879 static struct msg *
880 msgrcv_lookup(kmsqid_t *qp, long msgtyp)
881 {
882 	struct msg 		*smp = NULL;
883 	long			qp_low;
884 	struct msg		*mp;	/* ptr to msg on q */
885 	long			low_msgtype;
886 	static struct msg	neg_copy_smp;
887 
888 	mp = list_head(&qp->msg_list);
889 	if (msgtyp == 0) {
890 		smp = mp;
891 	} else {
892 		qp_low = qp->msg_lowest_type;
893 		if (msgtyp > 0) {
894 			/*
895 			 * If our lowest possible message type is larger than
896 			 * the message type desired, then we know there is
897 			 * no entry present.
898 			 */
899 			if (qp_low > msgtyp) {
900 				return (NULL);
901 			}
902 
903 			for (; mp; mp = list_next(&qp->msg_list, mp)) {
904 				if (msgtyp == mp->msg_type) {
905 					smp = mp;
906 					break;
907 				}
908 			}
909 		} else {
910 			/*
911 			 * We have kept track of the lowest possible message
912 			 * type on the send queue.  This allows us to terminate
913 			 * the search early if we find a message type of that
914 			 * type.  Note, the lowest type may not be the actual
915 			 * lowest value in the system, it is only guaranteed
916 			 * that there isn't a value lower than that.
917 			 */
918 			low_msgtype = -msgtyp;
919 			if (low_msgtype < qp_low) {
920 				return (NULL);
921 			}
922 			if (qp->msg_neg_copy) {
923 				neg_copy_smp.msg_flags = MSG_RCVCOPY;
924 				return (&neg_copy_smp);
925 			}
926 			for (; mp; mp = list_next(&qp->msg_list, mp)) {
927 				if (mp->msg_type <= low_msgtype &&
928 				    !(smp && smp->msg_type <= mp->msg_type)) {
929 					smp = mp;
930 					low_msgtype = mp->msg_type;
931 					if (low_msgtype == qp_low) {
932 						break;
933 					}
934 				}
935 			}
936 			if (smp) {
937 				/*
938 				 * Update the lowest message type.
939 				 */
940 				qp->msg_lowest_type = smp->msg_type;
941 			}
942 		}
943 	}
944 	return (smp);
945 }
946 
947 /*
948  * msgids system call.
949  */
950 static int
951 msgids(int *buf, uint_t nids, uint_t *pnids)
952 {
953 	int error;
954 
955 	if (error = ipc_ids(msq_svc, buf, nids, pnids))
956 		return (set_errno(error));
957 
958 	return (0);
959 }
960 
961 #define	RND(x)		roundup((x), sizeof (size_t))
962 #define	RND32(x)	roundup((x), sizeof (size32_t))
963 
964 /*
965  * msgsnap system call.
966  */
967 static int
968 msgsnap(int msqid, caddr_t buf, size_t bufsz, long msgtyp)
969 {
970 	struct msg	*mp;	/* ptr to msg on q */
971 	kmsqid_t	*qp;	/* ptr to associated q */
972 	kmutex_t	*lock;
973 	size_t		size;
974 	size_t		nmsg;
975 	struct msg	**snaplist;
976 	int		error, i;
977 	model_t		mdl = get_udatamodel();
978 	STRUCT_DECL(msgsnap_head, head);
979 	STRUCT_DECL(msgsnap_mhead, mhead);
980 
981 	STRUCT_INIT(head, mdl);
982 	STRUCT_INIT(mhead, mdl);
983 
984 	if (bufsz < STRUCT_SIZE(head))
985 		return (set_errno(EINVAL));
986 
987 	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL)
988 		return (set_errno(EINVAL));
989 
990 	if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
991 		mutex_exit(lock);
992 		return (set_errno(error));
993 	}
994 	ipc_hold(msq_svc, (kipc_perm_t *)qp);
995 
996 	/*
997 	 * First compute the required buffer size and
998 	 * the number of messages on the queue.
999 	 */
1000 	size = nmsg = 0;
1001 	for (mp = list_head(&qp->msg_list); mp;
1002 	    mp = list_next(&qp->msg_list, mp)) {
1003 		if (msgtyp == 0 ||
1004 		    (msgtyp > 0 && msgtyp == mp->msg_type) ||
1005 		    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
1006 			nmsg++;
1007 			if (mdl == DATAMODEL_NATIVE)
1008 				size += RND(mp->msg_size);
1009 			else
1010 				size += RND32(mp->msg_size);
1011 		}
1012 	}
1013 
1014 	size += STRUCT_SIZE(head) + nmsg * STRUCT_SIZE(mhead);
1015 	if (size > bufsz)
1016 		nmsg = 0;
1017 
1018 	if (nmsg > 0) {
1019 		/*
1020 		 * Mark the messages as being copied.
1021 		 */
1022 		snaplist = (struct msg **)kmem_alloc(nmsg *
1023 		    sizeof (struct msg *), KM_SLEEP);
1024 		i = 0;
1025 		for (mp = list_head(&qp->msg_list); mp;
1026 		    mp = list_next(&qp->msg_list, mp)) {
1027 			if (msgtyp == 0 ||
1028 			    (msgtyp > 0 && msgtyp == mp->msg_type) ||
1029 			    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
1030 				msg_hold(mp);
1031 				snaplist[i] = mp;
1032 				i++;
1033 			}
1034 		}
1035 	}
1036 	mutex_exit(lock);
1037 
1038 	/*
1039 	 * Copy out the buffer header.
1040 	 */
1041 	STRUCT_FSET(head, msgsnap_size, size);
1042 	STRUCT_FSET(head, msgsnap_nmsg, nmsg);
1043 	if (copyout(STRUCT_BUF(head), buf, STRUCT_SIZE(head)))
1044 		error = EFAULT;
1045 
1046 	buf += STRUCT_SIZE(head);
1047 
1048 	/*
1049 	 * Now copy out the messages one by one.
1050 	 */
1051 	for (i = 0; i < nmsg; i++) {
1052 		mp = snaplist[i];
1053 		if (error == 0) {
1054 			STRUCT_FSET(mhead, msgsnap_mlen, mp->msg_size);
1055 			STRUCT_FSET(mhead, msgsnap_mtype, mp->msg_type);
1056 			if (copyout(STRUCT_BUF(mhead), buf, STRUCT_SIZE(mhead)))
1057 				error = EFAULT;
1058 			buf += STRUCT_SIZE(mhead);
1059 
1060 			if (error == 0 &&
1061 			    mp->msg_size != 0 &&
1062 			    copyout(mp->msg_addr, buf, mp->msg_size))
1063 				error = EFAULT;
1064 			if (mdl == DATAMODEL_NATIVE)
1065 				buf += RND(mp->msg_size);
1066 			else
1067 				buf += RND32(mp->msg_size);
1068 		}
1069 		lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
1070 		msg_rele(mp);
1071 		/* Check for msg q deleted or reallocated */
1072 		if (IPC_FREE(&qp->msg_perm))
1073 			error = EIDRM;
1074 		mutex_exit(lock);
1075 	}
1076 
1077 	(void) ipc_lock(msq_svc, qp->msg_perm.ipc_id);
1078 	ipc_rele(msq_svc, (kipc_perm_t *)qp);
1079 
1080 	if (nmsg > 0)
1081 		kmem_free(snaplist, nmsg * sizeof (struct msg *));
1082 
1083 	if (error)
1084 		return (set_errno(error));
1085 	return (0);
1086 }
1087 
1088 #define	MSG_PREALLOC_LIMIT 8192
1089 
1090 /*
1091  * msgsnd system call.
1092  */
1093 static int
1094 msgsnd(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, int msgflg)
1095 {
1096 	kmsqid_t	*qp;
1097 	kmutex_t	*lock = NULL;
1098 	struct msg	*mp = NULL;
1099 	long		type;
1100 	int		error = 0, wait_wakeup = 0;
1101 	msgq_wakeup_t   msg_entry;
1102 	model_t		mdl = get_udatamodel();
1103 	STRUCT_HANDLE(ipcmsgbuf, umsgp);
1104 
1105 	CPU_STATS_ADDQ(CPU, sys, msg, 1);	/* bump msg send/rcv count */
1106 	STRUCT_SET_HANDLE(umsgp, mdl, msgp);
1107 
1108 	if (mdl == DATAMODEL_NATIVE) {
1109 		if (copyin(msgp, &type, sizeof (type)))
1110 			return (set_errno(EFAULT));
1111 	} else {
1112 		int32_t	type32;
1113 		if (copyin(msgp, &type32, sizeof (type32)))
1114 			return (set_errno(EFAULT));
1115 		type = type32;
1116 	}
1117 
1118 	if (type < 1)
1119 		return (set_errno(EINVAL));
1120 
1121 	/*
1122 	 * We want the value here large enough that most of the
1123 	 * the message operations will use the "lockless" path,
1124 	 * but small enough that a user can not reserve large
1125 	 * chunks of kernel memory unless they have a valid
1126 	 * reason to.
1127 	 */
1128 	if (msgsz <= MSG_PREALLOC_LIMIT) {
1129 		/*
1130 		 * We are small enough that we can afford to do the
1131 		 * allocation now.  This saves dropping the lock
1132 		 * and then reacquiring the lock.
1133 		 */
1134 		mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
1135 		mp->msg_copycnt = 1;
1136 		mp->msg_size = msgsz;
1137 		if (msgsz) {
1138 			mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
1139 			if (copyin(STRUCT_FADDR(umsgp, mtext),
1140 			    mp->msg_addr, msgsz) == -1) {
1141 				error = EFAULT;
1142 				goto msgsnd_out;
1143 			}
1144 		}
1145 	}
1146 
1147 	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
1148 		error = EINVAL;
1149 		goto msgsnd_out;
1150 	}
1151 
1152 	ipc_hold(msq_svc, (kipc_perm_t *)qp);
1153 
1154 	if (msgsz > qp->msg_qbytes) {
1155 		error = EINVAL;
1156 		goto msgsnd_out;
1157 	}
1158 
1159 	if (error = ipcperm_access(&qp->msg_perm, MSG_W, CRED()))
1160 		goto msgsnd_out;
1161 
1162 top:
1163 	/*
1164 	 * Allocate space on q, message header, & buffer space.
1165 	 */
1166 	ASSERT(qp->msg_qnum <= qp->msg_qmax);
1167 	while ((msgsz > qp->msg_qbytes - qp->msg_cbytes) ||
1168 	    (qp->msg_qnum == qp->msg_qmax)) {
1169 		int cvres;
1170 
1171 		if (msgflg & IPC_NOWAIT) {
1172 			error = EAGAIN;
1173 			goto msgsnd_out;
1174 		}
1175 
1176 		wait_wakeup = 0;
1177 		qp->msg_snd_cnt++;
1178 		msg_entry.msgw_snd_size = msgsz;
1179 		msg_entry.msgw_thrd = curthread;
1180 		msg_entry.msgw_type = type;
1181 		cv_init(&msg_entry.msgw_wake_cv, NULL, 0, NULL);
1182 		list_insert_tail(&qp->msg_wait_rcv, &msg_entry);
1183 		if (qp->msg_snd_smallest > msgsz)
1184 			qp->msg_snd_smallest = msgsz;
1185 		cvres = cv_wait_sig(&msg_entry.msgw_wake_cv, lock);
1186 		lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, lock);
1187 		qp->msg_snd_cnt--;
1188 		if (list_link_active(&msg_entry.msgw_list))
1189 			list_remove(&qp->msg_wait_rcv, &msg_entry);
1190 		if (error = msgq_check_err(qp, cvres)) {
1191 			goto msgsnd_out;
1192 		}
1193 		wait_wakeup = 1;
1194 	}
1195 
1196 	if (mp == NULL) {
1197 		int failure;
1198 
1199 		mutex_exit(lock);
1200 		ASSERT(msgsz > 0);
1201 		mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
1202 		mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
1203 		mp->msg_size = msgsz;
1204 		mp->msg_copycnt = 1;
1205 
1206 		failure = (copyin(STRUCT_FADDR(umsgp, mtext),
1207 		    mp->msg_addr, msgsz) == -1);
1208 		lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
1209 		if (IPC_FREE(&qp->msg_perm)) {
1210 			error = EIDRM;
1211 			goto msgsnd_out;
1212 		}
1213 		if (failure) {
1214 			error = EFAULT;
1215 			goto msgsnd_out;
1216 		}
1217 		goto top;
1218 	}
1219 
1220 	/*
1221 	 * Everything is available, put msg on q.
1222 	 */
1223 	qp->msg_qnum++;
1224 	qp->msg_cbytes += msgsz;
1225 	qp->msg_lspid = curproc->p_pid;
1226 	qp->msg_stime = gethrestime_sec();
1227 	mp->msg_type = type;
1228 	if (qp->msg_lowest_type > type)
1229 		qp->msg_lowest_type = type;
1230 	list_insert_tail(&qp->msg_list, mp);
1231 	/*
1232 	 * Get the proper receiver going.
1233 	 */
1234 	msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, type);
1235 
1236 msgsnd_out:
1237 	/*
1238 	 * We were woken up from the send wait list, but an
1239 	 * an error occured on placing the message onto the
1240 	 * msg queue.  Given that, we need to do the wakeup
1241 	 * dance again.
1242 	 */
1243 
1244 	if (wait_wakeup && error) {
1245 		msg_wakeup_senders(qp);
1246 	}
1247 	if (lock)
1248 		ipc_rele(msq_svc, (kipc_perm_t *)qp);	/* drops lock */
1249 
1250 	if (error) {
1251 		if (mp)
1252 			msg_rele(mp);
1253 		return (set_errno(error));
1254 	}
1255 
1256 	return (0);
1257 }
1258 
1259 static void
1260 msg_wakeup_rdr(kmsqid_t *qp, msg_select_t **flist, long type)
1261 {
1262 	msg_select_t	*walker = *flist;
1263 	msgq_wakeup_t	*wakeup;
1264 	uint_t		msg_hash;
1265 
1266 	msg_hash = msg_type_hash(type);
1267 
1268 	do {
1269 		wakeup = walker->selection(qp, msg_hash, type);
1270 		walker = walker->next_selection;
1271 	} while (!wakeup && walker != *flist);
1272 
1273 	*flist = (*flist)->next_selection;
1274 	if (wakeup) {
1275 		if (type) {
1276 			wakeup->msgw_snd_wake = type;
1277 		}
1278 		cv_signal(&wakeup->msgw_wake_cv);
1279 	}
1280 }
1281 
1282 static uint_t
1283 msg_type_hash(long msg_type)
1284 {
1285 	if (msg_type < 0) {
1286 		long	hash = -msg_type / MSG_NEG_INTERVAL;
1287 		/*
1288 		 * Negative message types are hashed over an
1289 		 * interval.  Any message type that hashes
1290 		 * beyond MSG_MAX_QNUM is automatically placed
1291 		 * in the last bucket.
1292 		 */
1293 		if (hash > MSG_MAX_QNUM)
1294 			hash = MSG_MAX_QNUM;
1295 		return (hash);
1296 	}
1297 
1298 	/*
1299 	 * 0 or positive message type.  The first bucket is reserved for
1300 	 * message receivers of type 0, the other buckets we hash into.
1301 	 */
1302 	if (msg_type)
1303 		return (1 + (msg_type % MSG_MAX_QNUM));
1304 	return (0);
1305 }
1306 
1307 /*
1308  * Routines to see if we have a receiver of type 0 either blocked waiting
1309  * for a message.  Simply return the first guy on the list.
1310  */
1311 
1312 static msgq_wakeup_t *
1313 /* ARGSUSED */
1314 msg_fnd_any_snd(kmsqid_t *qp, int msg_hash, long type)
1315 {
1316 	msgq_wakeup_t	*walker;
1317 
1318 	walker = list_head(&qp->msg_wait_snd[0]);
1319 
1320 	if (walker)
1321 		list_remove(&qp->msg_wait_snd[0], walker);
1322 	return (walker);
1323 }
1324 
1325 static msgq_wakeup_t *
1326 /* ARGSUSED */
1327 msg_fnd_any_rdr(kmsqid_t *qp, int msg_hash, long type)
1328 {
1329 	msgq_wakeup_t	*walker;
1330 
1331 	walker = list_head(&qp->msg_cpy_block);
1332 	if (walker)
1333 		list_remove(&qp->msg_cpy_block, walker);
1334 	return (walker);
1335 }
1336 
1337 static msgq_wakeup_t *
1338 msg_fnd_spc_snd(kmsqid_t *qp, int msg_hash, long type)
1339 {
1340 	msgq_wakeup_t	*walker;
1341 
1342 	walker = list_head(&qp->msg_wait_snd[msg_hash]);
1343 
1344 	while (walker && walker->msgw_type != type)
1345 		walker = list_next(&qp->msg_wait_snd[msg_hash], walker);
1346 	if (walker)
1347 		list_remove(&qp->msg_wait_snd[msg_hash], walker);
1348 	return (walker);
1349 }
1350 
1351 /* ARGSUSED */
1352 static msgq_wakeup_t *
1353 msg_fnd_neg_snd(kmsqid_t *qp, int msg_hash, long type)
1354 {
1355 	msgq_wakeup_t	*qptr;
1356 	int		count;
1357 	int		check_index;
1358 	int		neg_index;
1359 	int		nbuckets;
1360 
1361 	if (!qp->msg_ngt_cnt) {
1362 		return (NULL);
1363 	}
1364 	neg_index = msg_type_hash(-type);
1365 
1366 	/*
1367 	 * Check for a match among the negative type queues.  Any buckets
1368 	 * at neg_index or larger can match the type.  Use the last send
1369 	 * time to randomize the starting bucket to prevent starvation.
1370 	 * Search all buckets from neg_index to MSG_MAX_QNUM, starting
1371 	 * from the random starting point, and wrapping around after
1372 	 * MSG_MAX_QNUM.
1373 	 */
1374 
1375 	nbuckets = MSG_MAX_QNUM - neg_index + 1;
1376 	check_index = neg_index + (qp->msg_stime % nbuckets);
1377 
1378 	for (count = nbuckets; count > 0; count--) {
1379 		qptr = list_head(&qp->msg_wait_snd_ngt[check_index]);
1380 		while (qptr) {
1381 			/*
1382 			 * The lowest hash bucket may actually contain
1383 			 * message types that are not valid for this
1384 			 * request.  This can happen due to the fact that
1385 			 * the message buckets actually contain a consecutive
1386 			 * range of types.
1387 			 */
1388 			if (-qptr->msgw_type >= type) {
1389 				list_remove(&qp->msg_wait_snd_ngt[check_index],
1390 				    qptr);
1391 				return (qptr);
1392 			}
1393 			qptr = list_next(&qp->msg_wait_snd_ngt[check_index],
1394 			    qptr);
1395 		}
1396 		if (++check_index > MSG_MAX_QNUM) {
1397 			check_index = neg_index;
1398 		}
1399 	}
1400 	return (NULL);
1401 }
1402 
1403 static int
1404 msg_rcvq_sleep(list_t *queue, msgq_wakeup_t *entry, kmutex_t **lock,
1405     kmsqid_t *qp)
1406 {
1407 	int		cvres;
1408 
1409 	cv_init(&entry->msgw_wake_cv, NULL, 0, NULL);
1410 
1411 	list_insert_tail(queue, entry);
1412 
1413 	qp->msg_rcv_cnt++;
1414 	cvres = cv_wait_sig(&entry->msgw_wake_cv, *lock);
1415 	*lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, *lock);
1416 	qp->msg_rcv_cnt--;
1417 
1418 	if (list_link_active(&entry->msgw_list)) {
1419 		/*
1420 		 * We woke up unexpectedly, remove ourself.
1421 		 */
1422 		list_remove(queue, entry);
1423 	}
1424 
1425 	return (cvres);
1426 }
1427 
1428 static void
1429 msg_rcvq_wakeup_all(list_t *q_ptr)
1430 {
1431 	msgq_wakeup_t	*q_walk;
1432 
1433 	while (q_walk = list_head(q_ptr)) {
1434 		list_remove(q_ptr, q_walk);
1435 		cv_signal(&q_walk->msgw_wake_cv);
1436 	}
1437 }
1438 
1439 /*
1440  * msgsys - System entry point for msgctl, msgget, msgrcv, and msgsnd
1441  * system calls.
1442  */
1443 static ssize_t
1444 msgsys(int opcode, uintptr_t a1, uintptr_t a2, uintptr_t a3,
1445 	uintptr_t a4, uintptr_t a5)
1446 {
1447 	ssize_t error;
1448 
1449 	switch (opcode) {
1450 	case MSGGET:
1451 		error = msgget((key_t)a1, (int)a2);
1452 		break;
1453 	case MSGCTL:
1454 		error = msgctl((int)a1, (int)a2, (void *)a3);
1455 		break;
1456 	case MSGRCV:
1457 		error = msgrcv((int)a1, (struct ipcmsgbuf *)a2,
1458 		    (size_t)a3, (long)a4, (int)a5);
1459 		break;
1460 	case MSGSND:
1461 		error = msgsnd((int)a1, (struct ipcmsgbuf *)a2,
1462 		    (size_t)a3, (int)a4);
1463 		break;
1464 	case MSGIDS:
1465 		error = msgids((int *)a1, (uint_t)a2, (uint_t *)a3);
1466 		break;
1467 	case MSGSNAP:
1468 		error = msgsnap((int)a1, (caddr_t)a2, (size_t)a3, (long)a4);
1469 		break;
1470 	default:
1471 		error = set_errno(EINVAL);
1472 		break;
1473 	}
1474 
1475 	return (error);
1476 }
1477 
1478 /*
1479  * Determine if a writer who is waiting can process its message.  If so
1480  * wake it up.
1481  */
1482 static void
1483 msg_wakeup_senders(kmsqid_t *qp)
1484 
1485 {
1486 	struct msgq_wakeup *ptr, *optr;
1487 	size_t avail, smallest;
1488 	int msgs_out;
1489 
1490 	/*
1491 	 * Is there a writer waiting, and if so, can it be serviced? If
1492 	 * not return back to the caller.
1493 	 */
1494 	if (IPC_FREE(&qp->msg_perm) || qp->msg_qnum >= qp->msg_qmax)
1495 		return;
1496 
1497 	avail = qp->msg_qbytes - qp->msg_cbytes;
1498 	if (avail < qp->msg_snd_smallest)
1499 		return;
1500 
1501 	ptr = list_head(&qp->msg_wait_rcv);
1502 	if (ptr == NULL) {
1503 		qp->msg_snd_smallest = MSG_SMALL_INIT;
1504 		return;
1505 	}
1506 	optr = ptr;
1507 
1508 	/*
1509 	 * smallest:	minimum message size of all queued writers
1510 	 *
1511 	 * avail:	amount of space left on the msgq
1512 	 *		if all the writers we have woken up are successful.
1513 	 *
1514 	 * msgs_out:	is the number of messages on the message queue if
1515 	 *		all the writers we have woken up are successful.
1516 	 */
1517 
1518 	smallest = MSG_SMALL_INIT;
1519 	msgs_out = qp->msg_qnum;
1520 	while (ptr) {
1521 		ptr = list_next(&qp->msg_wait_rcv, ptr);
1522 		if (optr->msgw_snd_size <= avail) {
1523 			list_remove(&qp->msg_wait_rcv, optr);
1524 			avail -= optr->msgw_snd_size;
1525 			cv_signal(&optr->msgw_wake_cv);
1526 			msgs_out++;
1527 			if (msgs_out == qp->msg_qmax ||
1528 			    avail < qp->msg_snd_smallest)
1529 				break;
1530 		} else {
1531 			if (smallest > optr->msgw_snd_size)
1532 				smallest = optr->msgw_snd_size;
1533 		}
1534 		optr = ptr;
1535 	}
1536 
1537 	/*
1538 	 * Reset the smallest message size if the entire list has been visited
1539 	 */
1540 	if (ptr == NULL && smallest != MSG_SMALL_INIT)
1541 		qp->msg_snd_smallest = smallest;
1542 }
1543 
1544 #ifdef	_SYSCALL32_IMPL
1545 /*
1546  * msgsys32 - System entry point for msgctl, msgget, msgrcv, and msgsnd
1547  * system calls for 32-bit callers on LP64 kernel.
1548  */
1549 static ssize32_t
1550 msgsys32(int opcode, uint32_t a1, uint32_t a2, uint32_t a3,
1551 	uint32_t a4, uint32_t a5)
1552 {
1553 	ssize_t error;
1554 
1555 	switch (opcode) {
1556 	case MSGGET:
1557 		error = msgget((key_t)a1, (int)a2);
1558 		break;
1559 	case MSGCTL:
1560 		error = msgctl((int)a1, (int)a2, (void *)(uintptr_t)a3);
1561 		break;
1562 	case MSGRCV:
1563 		error = msgrcv((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
1564 		    (size_t)a3, (long)(int32_t)a4, (int)a5);
1565 		break;
1566 	case MSGSND:
1567 		error = msgsnd((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
1568 		    (size_t)(int32_t)a3, (int)a4);
1569 		break;
1570 	case MSGIDS:
1571 		error = msgids((int *)(uintptr_t)a1, (uint_t)a2,
1572 		    (uint_t *)(uintptr_t)a3);
1573 		break;
1574 	case MSGSNAP:
1575 		error = msgsnap((int)a1, (caddr_t)(uintptr_t)a2, (size_t)a3,
1576 		    (long)(int32_t)a4);
1577 		break;
1578 	default:
1579 		error = set_errno(EINVAL);
1580 		break;
1581 	}
1582 
1583 	return (error);
1584 }
1585 #endif	/* SYSCALL32_IMPL */
1586