1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
22 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
23 * Use is subject to license terms.
24 */
25
26/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
27/*	  All Rights Reserved	*/
28
29
30/*
31 * Inter-Process Communication Message Facility.
32 *
33 * See os/ipc.c for a description of common IPC functionality.
34 *
35 * Resource controls
36 * -----------------
37 *
38 * Control:      zone.max-msg-ids (rc_zone_msgmni)
39 * Description:  Maximum number of message queue ids allowed a zone.
40 *
41 *   When msgget() is used to allocate a message queue, one id is
42 *   allocated.  If the id allocation doesn't succeed, msgget() fails
43 *   and errno is set to ENOSPC.  Upon successful msgctl(, IPC_RMID)
44 *   the id is deallocated.
45 *
46 * Control:      project.max-msg-ids (rc_project_msgmni)
47 * Description:  Maximum number of message queue ids allowed a project.
48 *
49 *   When msgget() is used to allocate a message queue, one id is
50 *   allocated.  If the id allocation doesn't succeed, msgget() fails
51 *   and errno is set to ENOSPC.  Upon successful msgctl(, IPC_RMID)
52 *   the id is deallocated.
53 *
54 * Control:      process.max-msg-qbytes (rc_process_msgmnb)
55 * Description:  Maximum number of bytes of messages on a message queue.
56 *
57 *   When msgget() successfully allocates a message queue, the minimum
58 *   enforced value of this limit is used to initialize msg_qbytes.
59 *
60 * Control:      process.max-msg-messages (rc_process_msgtql)
61 * Description:  Maximum number of messages on a message queue.
62 *
63 *   When msgget() successfully allocates a message queue, the minimum
64 *   enforced value of this limit is used to initialize a per-queue
65 *   limit on the number of messages.
66 */
67
68#include <sys/types.h>
69#include <sys/t_lock.h>
70#include <sys/param.h>
71#include <sys/cred.h>
72#include <sys/user.h>
73#include <sys/proc.h>
74#include <sys/time.h>
75#include <sys/ipc.h>
76#include <sys/ipc_impl.h>
77#include <sys/msg.h>
78#include <sys/msg_impl.h>
79#include <sys/list.h>
80#include <sys/systm.h>
81#include <sys/sysmacros.h>
82#include <sys/cpuvar.h>
83#include <sys/kmem.h>
84#include <sys/ddi.h>
85#include <sys/errno.h>
86#include <sys/cmn_err.h>
87#include <sys/debug.h>
88#include <sys/project.h>
89#include <sys/modctl.h>
90#include <sys/syscall.h>
91#include <sys/policy.h>
92#include <sys/zone.h>
93
94#include <c2/audit.h>
95
96/*
97 * The following tunables are obsolete.  Though for compatibility we
98 * still read and interpret msginfo_msgmnb, msginfo_msgmni, and
99 * msginfo_msgtql (see os/project.c and os/rctl_proc.c), the preferred
100 * mechanism for administrating the IPC Message facility is through the
101 * resource controls described at the top of this file.
102 */
103size_t	msginfo_msgmax = 2048;	/* (obsolete) */
104size_t	msginfo_msgmnb = 4096;	/* (obsolete) */
105int	msginfo_msgmni = 50;	/* (obsolete) */
106int	msginfo_msgtql = 40;	/* (obsolete) */
107int	msginfo_msgssz = 8;	/* (obsolete) */
108int	msginfo_msgmap = 0;	/* (obsolete) */
109ushort_t msginfo_msgseg = 1024;	/* (obsolete) */
110
111extern rctl_hndl_t rc_zone_msgmni;
112extern rctl_hndl_t rc_project_msgmni;
113extern rctl_hndl_t rc_process_msgmnb;
114extern rctl_hndl_t rc_process_msgtql;
115static ipc_service_t *msq_svc;
116static zone_key_t msg_zone_key;
117
118static void msg_dtor(kipc_perm_t *);
119static void msg_rmid(kipc_perm_t *);
120static void msg_remove_zone(zoneid_t, void *);
121
122/*
123 * Module linkage information for the kernel.
124 */
125static ssize_t msgsys(int opcode, uintptr_t a0, uintptr_t a1, uintptr_t a2,
126	uintptr_t a4, uintptr_t a5);
127
128static struct sysent ipcmsg_sysent = {
129	6,
130#ifdef	_LP64
131	SE_ARGC | SE_NOUNLOAD | SE_64RVAL,
132#else
133	SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
134#endif
135	(int (*)())(uintptr_t)msgsys
136};
137
138#ifdef	_SYSCALL32_IMPL
139static ssize32_t msgsys32(int opcode, uint32_t a0, uint32_t a1, uint32_t a2,
140	uint32_t a4, uint32_t a5);
141
142static struct sysent ipcmsg_sysent32 = {
143	6,
144	SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
145	msgsys32
146};
147#endif	/* _SYSCALL32_IMPL */
148
149static struct modlsys modlsys = {
150	&mod_syscallops, "System V message facility", &ipcmsg_sysent
151};
152
153#ifdef _SYSCALL32_IMPL
154static struct modlsys modlsys32 = {
155	&mod_syscallops32, "32-bit System V message facility", &ipcmsg_sysent32
156};
157#endif
158
159/*
160 *      Big Theory statement for message queue correctness
161 *
 * The msgrcv and msgsnd functions no longer use cv_broadcast to wake up
 * receivers who are waiting for an event.  Using the cv_broadcast method
 * resulted in negative scaling when the number of waiting receivers is large
165 * (the thundering herd problem).  Instead, the receivers waiting to receive a
166 * message are now linked in a queue-like fashion and awaken one at a time in
167 * a controlled manner.
168 *
169 * Receivers can block on two different classes of waiting list:
170 *    1) "sendwait" list, which is the more complex list of the two.  The
171 *	  receiver will be awakened by a sender posting a new message.  There
172 *	  are two types of "sendwait" list used:
173 *		a) msg_wait_snd: handles all receivers who are looking for
174 *		   a message type >= 0, but was unable to locate a match.
175 *
176 *		   slot 0: reserved for receivers that have designated they
177 *			   will take any message type.
178 *		   rest:   consist of receivers requesting a specific type
179 *			   but the type was not present.  The entries are
180 *			   hashed into a bucket in an attempt to keep
181 *			   any list search relatively short.
182 *		b) msg_wait_snd_ngt: handles all receivers that have designated
183 *		   a negative message type. Unlike msg_wait_snd, the hash bucket
184 *		   serves a range of negative message types (-1 to -5, -6 to -10
185 *		   and so forth), where the last bucket is reserved for all the
186 *		   negative message types that hash outside of MSG_MAX_QNUM - 1.
187 *		   This is done this way to simplify the operation of locating a
188 *		   negative message type.
189 *
190 *    2) "copyout" list, where the receiver is awakened by another
191 *	 receiver after a message is copied out.  This is a linked list
192 *	 of waiters that are awakened one at a time.  Although the solution is
193 *	 not optimal, the complexity that would be added in for waking
194 *	 up the right entry far exceeds any potential pay back (too many
195 *	 correctness and corner case issues).
196 *
197 * The lists are doubly linked.  In the case of the "sendwait"
198 * list, this allows the thread to remove itself from the list without having
199 * to traverse the list.  In the case of the "copyout" list it simply allows
200 * us to use common functions with the "sendwait" list.
201 *
202 * To make sure receivers are not hung out to dry, we must guarantee:
203 *    1. If any queued message matches any receiver, then at least one
204 *       matching receiver must be processing the request.
205 *    2. Blocking on the copyout queue is only temporary while messages
 *	 are being copied out.  The process is guaranteed to wake up
 *	 when it gets to the front of the queue (copyout is a FIFO).
208 *
209 * Rules for blocking and waking up:
210 *   1. A receiver entering msgrcv must examine all messages for a match
211 *      before blocking on a sendwait queue.
212 *   2. If the receiver blocks because the message it chose is already
 *	being copied out, then when it wakes up it needs to start
214 *	checking the messages from the beginning.
215 *   3) When ever a process returns from msgrcv for any reason, if it
216 *	had attempted to copy a message or blocked waiting for a copy
217 *	to complete it needs to wakeup the next receiver blocked on
218 *	a copy out.
219 *   4) When a message is sent, the sender selects a process waiting
220 *	for that type of message.  This selection process rotates between
 *	receiver types of 0, negative and positive to prevent starvation of
222 *	any one particular receiver type.
223 *   5) The following are the scenarios for processes that are awakened
224 *	by a msgsnd:
225 *		a) The process finds the message and is able to copy
226 *		   it out.  Once complete, the process returns.
227 *		b) The message that was sent that triggered the wakeup is no
228 *		   longer available (another process found the message first).
229 *		   We issue a wakeup on copy queue and then go back to
230 *		   sleep waiting for another matching message to be sent.
231 *		c) The message that was supposed to be processed was
232 *		   already serviced by another process.  However a different
233 *		   message is present which we can service.  The message
234 *		   is copied and the process returns.
235 *		d) The message is found, but some sort of error occurs that
236 *		   prevents the message from being copied.  The receiver
237 *		   wakes up the next sender that can service this message
238 *		   type and returns an error to the caller.
239 *		e) The message is found, but it is marked as being copied
240 *		   out.  The receiver then goes to sleep on the copyout
241 *		   queue where it will be awakened again sometime in the future.
242 *
243 *
244 *   6) Whenever a message is found that matches the message type designated,
245 *	but is being copied out we have to block on the copyout queue.
246 *	After process copying finishes the copy out, it  must wakeup (either
247 *	directly or indirectly) all receivers who blocked on its copyout,
248 *	so they are guaranteed a chance to examine the remaining messages.
249 *	This is implemented via a chain of wakeups: Y wakes X, who wakes Z,
250 *	and so on.  The chain cannot be broken.  This leads to the following
251 *	cases:
 *		a) A receiver is finished copying the message (or encountered
 *		   an error), the first entry on the copyout queue is woken
254 *		   up.
255 *		b) When the receiver is woken up, it attempts to locate
256 *		   a message type match.
257 *		c) If a message type is found and
258 *			-- MSG_RCVCOPY flag is not set, the message is
259 *			   marked for copying out.  Regardless of the copyout
260 *			   success the next entry on the copyout queue is
261 *			   awakened and the operation is completed.
262 *			-- MSG_RCVCOPY is set, we simply go back to sleep again
263 *			   on the copyout queue.
264 *		d) If the message type is not found then we wakeup the next
265 *		   process on the copyout queue.
 *   7) If a msgsnd is unable to complete for any of the following reasons
267 *	  a) the msgq has no space for the message
268 *	  b) the maximum number of messages allowed has been reached
269 *      then one of two things happen:
270 *	  1) If the passed in msg_flag has IPC_NOWAIT set, then
271 *	     an error is returned.
 *	  2) If the IPC_NOWAIT bit is not set in msg_flag, then
 *	     the thread is placed to sleep until the request can be
274 *	     serviced.
275 *   8) When waking a thread waiting to send a message, a check is done to
276 *      verify that the operation being asked for by the thread will complete.
277 *      This decision making process is done in a loop where the oldest request
278 *      is checked first. The search will continue until there is no more
279 *	room on the msgq or we have checked all the waiters.
280 */
281
/*
 * Forward declarations of the local helpers that implement the wakeup
 * protocol described in the Big Theory statement above.
 */
static uint_t msg_type_hash(long);
static int msgq_check_err(kmsqid_t *qp, int cvres);
static int msg_rcvq_sleep(list_t *, msgq_wakeup_t *, kmutex_t **,
    kmsqid_t *);
static int msg_copyout(kmsqid_t *, long, kmutex_t **, size_t *, size_t,
    struct msg *, struct ipcmsgbuf *, int);
static void msg_rcvq_wakeup_all(list_t *);
static void msg_wakeup_senders(kmsqid_t *);
static void msg_wakeup_rdr(kmsqid_t *, msg_select_t **, long);
static msgq_wakeup_t *msg_fnd_any_snd(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_any_rdr(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_neg_snd(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_spc_snd(kmsqid_t *, int, long);
static struct msg *msgrcv_lookup(kmsqid_t *, long);

/*
 * Circular list of receiver-selection policies used on the send side.
 * The next pointers chain any-type -> specific-type -> negative-type
 * and back, so senders rotate among the receiver classes and no class
 * is starved (rule 4 of the Big Theory statement).
 */
msg_select_t msg_fnd_sndr[] = {
	{ msg_fnd_any_snd, &msg_fnd_sndr[1] },
	{ msg_fnd_spc_snd, &msg_fnd_sndr[2] },
	{ msg_fnd_neg_snd, &msg_fnd_sndr[0] }
};

/* Single-policy "list" used when a receiver wakes the copyout queue. */
msg_select_t msg_fnd_rdr[1] = {
	{ msg_fnd_any_rdr, &msg_fnd_rdr[0] },
};

static struct modlinkage modlinkage = {
	MODREV_1,
	&modlsys,
#ifdef _SYSCALL32_IMPL
	&modlsys32,
#endif
	NULL
};
315
/*
 * "No type seen yet" sentinel for msg_lowest_type/msg_snd_smallest;
 * (size_t)-1 is larger than any real message type, so the first send
 * always updates it (see the comment in msgget).
 */
#define	MSG_SMALL_INIT (size_t)-1
/*
 * _init - module load entry point.  Registers the "msqids" IPC service
 * (with msg_dtor/msg_rmid callbacks) and the per-zone shutdown hook,
 * then installs the syscall entries.  If mod_install fails, both
 * registrations are unwound before returning the error.
 */
int
_init(void)
{
	int result;

	msq_svc = ipcs_create("msqids", rc_project_msgmni, rc_zone_msgmni,
	    sizeof (kmsqid_t), msg_dtor, msg_rmid, AT_IPC_MSG,
	    offsetof(ipc_rqty_t, ipcq_msgmni));
	zone_key_create(&msg_zone_key, NULL, msg_remove_zone, NULL);

	if ((result = mod_install(&modlinkage)) == 0)
		return (0);

	/* Installation failed: undo the zone key and IPC service. */
	(void) zone_key_delete(msg_zone_key);
	ipcs_destroy(msq_svc);

	return (result);
}
335
/*
 * _fini - the syscall entries are marked SE_NOUNLOAD, so this module
 * can never be unloaded; always report EBUSY.
 */
int
_fini(void)
{
	return (EBUSY);
}
341
/*
 * _info - report module information via the common modctl support.
 */
int
_info(struct modinfo *modinfop)
{
	return (mod_info(&modlinkage, modinfop));
}
347
348static void
349msg_dtor(kipc_perm_t *perm)
350{
351	kmsqid_t *qp = (kmsqid_t *)perm;
352	int		ii;
353
354	for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
355		ASSERT(list_is_empty(&qp->msg_wait_snd[ii]));
356		ASSERT(list_is_empty(&qp->msg_wait_snd_ngt[ii]));
357		list_destroy(&qp->msg_wait_snd[ii]);
358		list_destroy(&qp->msg_wait_snd_ngt[ii]);
359	}
360	ASSERT(list_is_empty(&qp->msg_cpy_block));
361	ASSERT(list_is_empty(&qp->msg_wait_rcv));
362	list_destroy(&qp->msg_cpy_block);
363	ASSERT(qp->msg_snd_cnt == 0);
364	ASSERT(qp->msg_cbytes == 0);
365	list_destroy(&qp->msg_list);
366	list_destroy(&qp->msg_wait_rcv);
367}
368
369
370#define	msg_hold(mp)	(mp)->msg_copycnt++
371
372/*
373 * msg_rele - decrement the reference count on the message.  When count
374 * reaches zero, free message header and contents.
375 */
376static void
377msg_rele(struct msg *mp)
378{
379	ASSERT(mp->msg_copycnt > 0);
380	if (mp->msg_copycnt-- == 1) {
381		if (mp->msg_addr)
382			kmem_free(mp->msg_addr, mp->msg_size);
383		kmem_free(mp, sizeof (struct msg));
384	}
385}
386
387/*
388 * msgunlink - Unlink msg from queue, decrement byte count and wake up anyone
389 * waiting for free bytes on queue.
390 *
391 * Called with queue locked.
392 */
393static void
394msgunlink(kmsqid_t *qp, struct msg *mp)
395{
396	list_remove(&qp->msg_list, mp);
397	qp->msg_qnum--;
398	qp->msg_cbytes -= mp->msg_size;
399	msg_rele(mp);
400
401	/* Wake up waiting writers */
402	msg_wakeup_senders(qp);
403}
404
/*
 * msg_rmid - remove-id callback registered with the IPC service in
 * _init.  Discards every message still on the queue and wakes all
 * blocked threads so they can observe that the queue id is gone.
 */
static void
msg_rmid(kipc_perm_t *perm)
{
	kmsqid_t *qp = (kmsqid_t *)perm;
	struct msg *mp;
	int		ii;


	/* Free every queued message; msgunlink also wakes waiting senders. */
	while ((mp = list_head(&qp->msg_list)) != NULL)
		msgunlink(qp, mp);
	ASSERT(qp->msg_cbytes == 0);

	/*
	 * Wake up everyone who is in a wait state of some sort
	 * for this message queue.
	 */
	for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
		msg_rcvq_wakeup_all(&qp->msg_wait_snd[ii]);
		msg_rcvq_wakeup_all(&qp->msg_wait_snd_ngt[ii]);
	}
	msg_rcvq_wakeup_all(&qp->msg_cpy_block);
	msg_rcvq_wakeup_all(&qp->msg_wait_rcv);
}
428
429/*
430 * msgctl system call.
431 *
432 * gets q lock (via ipc_lookup), releases before return.
433 * may call users of msg_lock
434 */
435static int
436msgctl(int msgid, int cmd, void *arg)
437{
438	STRUCT_DECL(msqid_ds, ds);		/* SVR4 queue work area */
439	kmsqid_t		*qp;		/* ptr to associated q */
440	int			error;
441	struct	cred		*cr;
442	model_t	mdl = get_udatamodel();
443	struct msqid_ds64	ds64;
444	kmutex_t		*lock;
445	proc_t			*pp = curproc;
446
447	STRUCT_INIT(ds, mdl);
448	cr = CRED();
449
450	/*
451	 * Perform pre- or non-lookup actions (e.g. copyins, RMID).
452	 */
453	switch (cmd) {
454	case IPC_SET:
455		if (copyin(arg, STRUCT_BUF(ds), STRUCT_SIZE(ds)))
456			return (set_errno(EFAULT));
457		break;
458
459	case IPC_SET64:
460		if (copyin(arg, &ds64, sizeof (struct msqid_ds64)))
461			return (set_errno(EFAULT));
462		break;
463
464	case IPC_RMID:
465		if (error = ipc_rmid(msq_svc, msgid, cr))
466			return (set_errno(error));
467		return (0);
468	}
469
470	/*
471	 * get msqid_ds for this msgid
472	 */
473	if ((lock = ipc_lookup(msq_svc, msgid, (kipc_perm_t **)&qp)) == NULL)
474		return (set_errno(EINVAL));
475
476	switch (cmd) {
477	case IPC_SET:
478		if (STRUCT_FGET(ds, msg_qbytes) > qp->msg_qbytes &&
479		    secpolicy_ipc_config(cr) != 0) {
480			mutex_exit(lock);
481			return (set_errno(EPERM));
482		}
483		if (error = ipcperm_set(msq_svc, cr, &qp->msg_perm,
484		    &STRUCT_BUF(ds)->msg_perm, mdl)) {
485			mutex_exit(lock);
486			return (set_errno(error));
487		}
488		qp->msg_qbytes = STRUCT_FGET(ds, msg_qbytes);
489		qp->msg_ctime = gethrestime_sec();
490		break;
491
492	case IPC_STAT:
493		if (error = ipcperm_access(&qp->msg_perm, MSG_R, cr)) {
494			mutex_exit(lock);
495			return (set_errno(error));
496		}
497
498		if (qp->msg_rcv_cnt)
499			qp->msg_perm.ipc_mode |= MSG_RWAIT;
500		if (qp->msg_snd_cnt)
501			qp->msg_perm.ipc_mode |= MSG_WWAIT;
502		ipcperm_stat(&STRUCT_BUF(ds)->msg_perm, &qp->msg_perm, mdl);
503		qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
504		STRUCT_FSETP(ds, msg_first, NULL);	/* kernel addr */
505		STRUCT_FSETP(ds, msg_last, NULL);
506		STRUCT_FSET(ds, msg_cbytes, qp->msg_cbytes);
507		STRUCT_FSET(ds, msg_qnum, qp->msg_qnum);
508		STRUCT_FSET(ds, msg_qbytes, qp->msg_qbytes);
509		STRUCT_FSET(ds, msg_lspid, qp->msg_lspid);
510		STRUCT_FSET(ds, msg_lrpid, qp->msg_lrpid);
511		STRUCT_FSET(ds, msg_stime, qp->msg_stime);
512		STRUCT_FSET(ds, msg_rtime, qp->msg_rtime);
513		STRUCT_FSET(ds, msg_ctime, qp->msg_ctime);
514		break;
515
516	case IPC_SET64:
517		mutex_enter(&pp->p_lock);
518		if ((ds64.msgx_qbytes > qp->msg_qbytes) &&
519		    secpolicy_ipc_config(cr) != 0 &&
520		    rctl_test(rc_process_msgmnb, pp->p_rctls, pp,
521		    ds64.msgx_qbytes, RCA_SAFE) & RCT_DENY) {
522			mutex_exit(&pp->p_lock);
523			mutex_exit(lock);
524			return (set_errno(EPERM));
525		}
526		mutex_exit(&pp->p_lock);
527		if (error = ipcperm_set64(msq_svc, cr, &qp->msg_perm,
528		    &ds64.msgx_perm)) {
529			mutex_exit(lock);
530			return (set_errno(error));
531		}
532		qp->msg_qbytes = ds64.msgx_qbytes;
533		qp->msg_ctime = gethrestime_sec();
534		break;
535
536	case IPC_STAT64:
537		if (qp->msg_rcv_cnt)
538			qp->msg_perm.ipc_mode |= MSG_RWAIT;
539		if (qp->msg_snd_cnt)
540			qp->msg_perm.ipc_mode |= MSG_WWAIT;
541		ipcperm_stat64(&ds64.msgx_perm, &qp->msg_perm);
542		qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
543		ds64.msgx_cbytes = qp->msg_cbytes;
544		ds64.msgx_qnum = qp->msg_qnum;
545		ds64.msgx_qbytes = qp->msg_qbytes;
546		ds64.msgx_lspid = qp->msg_lspid;
547		ds64.msgx_lrpid = qp->msg_lrpid;
548		ds64.msgx_stime = qp->msg_stime;
549		ds64.msgx_rtime = qp->msg_rtime;
550		ds64.msgx_ctime = qp->msg_ctime;
551		break;
552
553	default:
554		mutex_exit(lock);
555		return (set_errno(EINVAL));
556	}
557
558	mutex_exit(lock);
559
560	/*
561	 * Do copyout last (after releasing mutex).
562	 */
563	switch (cmd) {
564	case IPC_STAT:
565		if (copyout(STRUCT_BUF(ds), arg, STRUCT_SIZE(ds)))
566			return (set_errno(EFAULT));
567		break;
568
569	case IPC_STAT64:
570		if (copyout(&ds64, arg, sizeof (struct msqid_ds64)))
571			return (set_errno(EFAULT));
572		break;
573	}
574
575	return (0);
576}
577
578/*
579 * Remove all message queues associated with a given zone.  Called by
580 * zone_shutdown when the zone is halted.
581 */
582/*ARGSUSED1*/
583static void
584msg_remove_zone(zoneid_t zoneid, void *arg)
585{
586	ipc_remove_zone(msq_svc, zoneid);
587}
588
589/*
590 * msgget system call.
591 */
592static int
593msgget(key_t key, int msgflg)
594{
595	kmsqid_t	*qp;
596	kmutex_t	*lock;
597	int		id, error;
598	int		ii;
599	proc_t		*pp = curproc;
600
601top:
602	if (error = ipc_get(msq_svc, key, msgflg, (kipc_perm_t **)&qp, &lock))
603		return (set_errno(error));
604
605	if (IPC_FREE(&qp->msg_perm)) {
606		mutex_exit(lock);
607		mutex_exit(&pp->p_lock);
608
609		list_create(&qp->msg_list, sizeof (struct msg),
610		    offsetof(struct msg, msg_node));
611		qp->msg_qnum = 0;
612		qp->msg_lspid = qp->msg_lrpid = 0;
613		qp->msg_stime = qp->msg_rtime = 0;
614		qp->msg_ctime = gethrestime_sec();
615		qp->msg_ngt_cnt = 0;
616		qp->msg_neg_copy = 0;
617		for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
618			list_create(&qp->msg_wait_snd[ii],
619			    sizeof (msgq_wakeup_t),
620			    offsetof(msgq_wakeup_t, msgw_list));
621			list_create(&qp->msg_wait_snd_ngt[ii],
622			    sizeof (msgq_wakeup_t),
623			    offsetof(msgq_wakeup_t, msgw_list));
624		}
625		/*
626		 * The proper initialization of msg_lowest_type is to the
627		 * highest possible value.  By doing this we guarantee that
628		 * when the first send happens, the lowest type will be set
629		 * properly.
630		 */
631		qp->msg_lowest_type = MSG_SMALL_INIT;
632		list_create(&qp->msg_cpy_block,
633		    sizeof (msgq_wakeup_t),
634		    offsetof(msgq_wakeup_t, msgw_list));
635		list_create(&qp->msg_wait_rcv,
636		    sizeof (msgq_wakeup_t),
637		    offsetof(msgq_wakeup_t, msgw_list));
638		qp->msg_fnd_sndr = &msg_fnd_sndr[0];
639		qp->msg_fnd_rdr = &msg_fnd_rdr[0];
640		qp->msg_rcv_cnt = 0;
641		qp->msg_snd_cnt = 0;
642		qp->msg_snd_smallest = MSG_SMALL_INIT;
643
644		if (error = ipc_commit_begin(msq_svc, key, msgflg,
645		    (kipc_perm_t *)qp)) {
646			if (error == EAGAIN)
647				goto top;
648			return (set_errno(error));
649		}
650		qp->msg_qbytes = rctl_enforced_value(rc_process_msgmnb,
651		    pp->p_rctls, pp);
652		qp->msg_qmax = rctl_enforced_value(rc_process_msgtql,
653		    pp->p_rctls, pp);
654		lock = ipc_commit_end(msq_svc, &qp->msg_perm);
655	}
656
657	if (AU_AUDITING())
658		audit_ipcget(AT_IPC_MSG, (void *)qp);
659
660	id = qp->msg_perm.ipc_id;
661	mutex_exit(lock);
662	return (id);
663}
664
/*
 * msgrcv system call: dequeue a message matching msgtyp from queue
 * msqid and copy it to the caller's buffer.  Returns the number of
 * message bytes transferred, or (ssize_t)-1 with errno set.  See the
 * Big Theory statement above for the blocking/wakeup protocol this
 * function participates in.
 */
static ssize_t
msgrcv(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, long msgtyp, int msgflg)
{
	struct msg	*smp;	/* ptr to best msg on q */
	kmsqid_t	*qp;	/* ptr to associated q */
	kmutex_t	*lock;
	size_t		xtsz;	/* transfer byte count */
	int		error = 0;
	int		cvres;
	uint_t		msg_hash;
	msgq_wakeup_t	msg_entry;

	CPU_STATS_ADDQ(CPU, sys, msg, 1);	/* bump msg send/rcv count */

	msg_hash = msg_type_hash(msgtyp);
	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
		return ((ssize_t)set_errno(EINVAL));
	}
	ipc_hold(msq_svc, (kipc_perm_t *)qp);

	if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
		goto msgrcv_out;
	}

	/*
	 * Various information (including the condvar_t) required for the
	 * process to sleep is provided by its stack.
	 */
	msg_entry.msgw_thrd = curthread;
	msg_entry.msgw_snd_wake = 0;
	msg_entry.msgw_type = msgtyp;
findmsg:
	smp = msgrcv_lookup(qp, msgtyp);

	if (smp) {
		/*
		 * We found a possible message to copy out.
		 */
		if ((smp->msg_flags & MSG_RCVCOPY) == 0) {
			long t = msg_entry.msgw_snd_wake;
			long copy_type = smp->msg_type;

			/*
			 * It is available, attempt to copy it.
			 */
			error = msg_copyout(qp, msgtyp, &lock, &xtsz, msgsz,
			    smp, msgp, msgflg);

			/*
			 * It is possible to consume a different message
			 * type than the one that triggered our wakeup
			 * (negative types).  If this happens, a check must
			 * be done to determine if another receiver is
			 * available for the waking message type.  Failure
			 * to do this can result in a message on the queue
			 * that could be serviced by a sleeping receiver.
			 */
			if (!error && t && (copy_type != t))
				msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, t);

			/*
			 * Don't forget to wakeup a sleeper that blocked because
			 * we were copying things out.
			 */
			msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
			goto msgrcv_out;
		}
		/*
		 * The selected message is being copied out, so block.  We do
		 * not need to wake the next person up on the msg_cpy_block list
		 * due to the fact some one is copying out and they will get
		 * things moving again once the copy is completed.
		 */
		cvres = msg_rcvq_sleep(&qp->msg_cpy_block,
		    &msg_entry, &lock, qp);
		error = msgq_check_err(qp, cvres);
		if (error) {
			goto msgrcv_out;
		}
		goto findmsg;
	}
	/*
	 * There isn't a message to copy out that matches the designated
	 * criteria.
	 */
	if (msgflg & IPC_NOWAIT) {
		error = ENOMSG;
		goto msgrcv_out;
	}
	msg_wakeup_rdr(qp,  &qp->msg_fnd_rdr, 0);

	/*
	 * Wait for new message.  We keep the negative and positive types
	 * separate for performance reasons.
	 */
	msg_entry.msgw_snd_wake = 0;
	if (msgtyp >= 0) {
		cvres = msg_rcvq_sleep(&qp->msg_wait_snd[msg_hash],
		    &msg_entry, &lock, qp);
	} else {
		qp->msg_ngt_cnt++;
		cvres = msg_rcvq_sleep(&qp->msg_wait_snd_ngt[msg_hash],
		    &msg_entry, &lock, qp);
		qp->msg_ngt_cnt--;
	}

	if (!(error = msgq_check_err(qp, cvres))) {
		goto findmsg;
	}

msgrcv_out:
	if (error) {
		/*
		 * On the error path, keep the wakeup chain alive: poke
		 * the copyout queue, and if a sender woke us, pass that
		 * wakeup on to another matching receiver (rule 3 of the
		 * Big Theory statement).
		 */
		msg_wakeup_rdr(qp,  &qp->msg_fnd_rdr, 0);
		if (msg_entry.msgw_snd_wake) {
			msg_wakeup_rdr(qp, &qp->msg_fnd_sndr,
			    msg_entry.msgw_snd_wake);
		}
		ipc_rele(msq_svc, (kipc_perm_t *)qp);
		return ((ssize_t)set_errno(error));
	}
	ipc_rele(msq_svc, (kipc_perm_t *)qp);
	return ((ssize_t)xtsz);
}
788
789static int
790msgq_check_err(kmsqid_t *qp, int cvres)
791{
792	if (IPC_FREE(&qp->msg_perm)) {
793		return (EIDRM);
794	}
795
796	if (cvres == 0) {
797		return (EINTR);
798	}
799
800	return (0);
801}
802
/*
 * msg_copyout - copy the type and body of message smp out to the user
 * buffer msgp.
 *
 * The queue lock is dropped for the duration of the user copy (with
 * the message marked MSG_RCVCOPY and held) so a slow or hostile user
 * mapping cannot stall the queue; the lock is reacquired before
 * returning and *lock is updated to the new mutex.  On success the
 * message is unlinked from the queue, *xtsz_ret holds the number of
 * message bytes transferred, and 0 is returned.  Otherwise returns
 * E2BIG (buffer too small without MSG_NOERROR), EIDRM (queue removed
 * during the copy) or EFAULT (user copy failed).
 */
static int
msg_copyout(kmsqid_t *qp, long msgtyp, kmutex_t **lock, size_t *xtsz_ret,
    size_t msgsz, struct msg *smp, struct ipcmsgbuf *msgp, int msgflg)
{
	size_t		xtsz;
	STRUCT_HANDLE(ipcmsgbuf, umsgp);
	model_t		mdl = get_udatamodel();
	int		copyerror = 0;

	STRUCT_SET_HANDLE(umsgp, mdl, msgp);
	if (msgsz < smp->msg_size) {
		/* Buffer too small: fail unless truncation was requested. */
		if ((msgflg & MSG_NOERROR) == 0) {
			return (E2BIG);
		} else {
			xtsz = msgsz;
		}
	} else {
		xtsz = smp->msg_size;
	}
	*xtsz_ret = xtsz;

	/*
	 * To prevent a DOS attack we mark the message as being
	 * copied out and release mutex.  When the copy is completed
	 * we need to acquire the mutex and make the appropriate updates.
	 */
	ASSERT((smp->msg_flags & MSG_RCVCOPY) == 0);
	smp->msg_flags |= MSG_RCVCOPY;
	msg_hold(smp);
	if (msgtyp < 0) {
		/* Only one negative-type copyout may be in flight at once. */
		ASSERT(qp->msg_neg_copy == 0);
		qp->msg_neg_copy = 1;
	}
	mutex_exit(*lock);

	if (mdl == DATAMODEL_NATIVE) {
		copyerror = copyout(&smp->msg_type, msgp,
		    sizeof (smp->msg_type));
	} else {
		/*
		 * 32-bit callers need an imploded msg type.
		 */
		int32_t	msg_type32 = smp->msg_type;

		copyerror = copyout(&msg_type32, msgp,
		    sizeof (msg_type32));
	}

	if (copyerror == 0 && xtsz) {
		copyerror = copyout(smp->msg_addr,
		    STRUCT_FADDR(umsgp, mtext), xtsz);
	}

	/*
	 * Reclaim the mutex and make sure the message queue still exists.
	 */

	*lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
	if (msgtyp < 0) {
		qp->msg_neg_copy = 0;
	}
	ASSERT(smp->msg_flags & MSG_RCVCOPY);
	smp->msg_flags &= ~MSG_RCVCOPY;
	msg_rele(smp);
	if (IPC_FREE(&qp->msg_perm)) {
		/* The queue was removed while we were copying. */
		return (EIDRM);
	}
	if (copyerror) {
		return (EFAULT);
	}
	qp->msg_lrpid = ttoproc(curthread)->p_pid;
	qp->msg_rtime = gethrestime_sec();
	msgunlink(qp, smp);
	return (0);
}
878
/*
 * msgrcv_lookup - select the message on the queue that best matches
 * msgtyp:
 *    msgtyp == 0: the message at the head of the queue.
 *    msgtyp  > 0: the first message of exactly that type.
 *    msgtyp  < 0: the message with the lowest type that is <= -msgtyp.
 * Returns NULL when no candidate exists.  The returned message may
 * have MSG_RCVCOPY set (including the static placeholder returned
 * while a negative-type copyout is in progress), so the caller must
 * check msg_flags before using it.
 */
static struct msg *
msgrcv_lookup(kmsqid_t *qp, long msgtyp)
{
	struct msg		*smp = NULL;
	long			qp_low;
	struct msg		*mp;	/* ptr to msg on q */
	long			low_msgtype;
	/*
	 * Placeholder handed back while qp->msg_neg_copy is set; callers
	 * only examine its msg_flags, so one shared static suffices.
	 */
	static struct msg	neg_copy_smp;

	mp = list_head(&qp->msg_list);
	if (msgtyp == 0) {
		smp = mp;
	} else {
		qp_low = qp->msg_lowest_type;
		if (msgtyp > 0) {
			/*
			 * If our lowest possible message type is larger than
			 * the message type desired, then we know there is
			 * no entry present.
			 */
			if (qp_low > msgtyp) {
				return (NULL);
			}

			for (; mp; mp = list_next(&qp->msg_list, mp)) {
				if (msgtyp == mp->msg_type) {
					smp = mp;
					break;
				}
			}
		} else {
			/*
			 * We have kept track of the lowest possible message
			 * type on the send queue.  This allows us to terminate
			 * the search early if we find a message type of that
			 * type.  Note, the lowest type may not be the actual
			 * lowest value in the system, it is only guaranteed
			 * that there isn't a value lower than that.
			 */
			low_msgtype = -msgtyp;
			if (low_msgtype < qp_low) {
				return (NULL);
			}
			if (qp->msg_neg_copy) {
				neg_copy_smp.msg_flags = MSG_RCVCOPY;
				return (&neg_copy_smp);
			}
			for (; mp; mp = list_next(&qp->msg_list, mp)) {
				if (mp->msg_type <= low_msgtype &&
				    !(smp && smp->msg_type <= mp->msg_type)) {
					smp = mp;
					low_msgtype = mp->msg_type;
					if (low_msgtype == qp_low) {
						break;
					}
				}
			}
			if (smp) {
				/*
				 * Update the lowest message type.
				 */
				qp->msg_lowest_type = smp->msg_type;
			}
		}
	}
	return (smp);
}
946
947/*
948 * msgids system call.
949 */
950static int
951msgids(int *buf, uint_t nids, uint_t *pnids)
952{
953	int error;
954
955	if (error = ipc_ids(msq_svc, buf, nids, pnids))
956		return (set_errno(error));
957
958	return (0);
959}
960
961#define	RND(x)		roundup((x), sizeof (size_t))
962#define	RND32(x)	roundup((x), sizeof (size32_t))
963
964/*
965 * msgsnap system call.
966 */
967static int
968msgsnap(int msqid, caddr_t buf, size_t bufsz, long msgtyp)
969{
970	struct msg	*mp;	/* ptr to msg on q */
971	kmsqid_t	*qp;	/* ptr to associated q */
972	kmutex_t	*lock;
973	size_t		size;
974	size_t		nmsg;
975	struct msg	**snaplist;
976	int		error, i;
977	model_t		mdl = get_udatamodel();
978	STRUCT_DECL(msgsnap_head, head);
979	STRUCT_DECL(msgsnap_mhead, mhead);
980
981	STRUCT_INIT(head, mdl);
982	STRUCT_INIT(mhead, mdl);
983
984	if (bufsz < STRUCT_SIZE(head))
985		return (set_errno(EINVAL));
986
987	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL)
988		return (set_errno(EINVAL));
989
990	if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
991		mutex_exit(lock);
992		return (set_errno(error));
993	}
994	ipc_hold(msq_svc, (kipc_perm_t *)qp);
995
996	/*
997	 * First compute the required buffer size and
998	 * the number of messages on the queue.
999	 */
1000	size = nmsg = 0;
1001	for (mp = list_head(&qp->msg_list); mp;
1002	    mp = list_next(&qp->msg_list, mp)) {
1003		if (msgtyp == 0 ||
1004		    (msgtyp > 0 && msgtyp == mp->msg_type) ||
1005		    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
1006			nmsg++;
1007			if (mdl == DATAMODEL_NATIVE)
1008				size += RND(mp->msg_size);
1009			else
1010				size += RND32(mp->msg_size);
1011		}
1012	}
1013
1014	size += STRUCT_SIZE(head) + nmsg * STRUCT_SIZE(mhead);
1015	if (size > bufsz)
1016		nmsg = 0;
1017
1018	if (nmsg > 0) {
1019		/*
1020		 * Mark the messages as being copied.
1021		 */
1022		snaplist = (struct msg **)kmem_alloc(nmsg *
1023		    sizeof (struct msg *), KM_SLEEP);
1024		i = 0;
1025		for (mp = list_head(&qp->msg_list); mp;
1026		    mp = list_next(&qp->msg_list, mp)) {
1027			if (msgtyp == 0 ||
1028			    (msgtyp > 0 && msgtyp == mp->msg_type) ||
1029			    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
1030				msg_hold(mp);
1031				snaplist[i] = mp;
1032				i++;
1033			}
1034		}
1035	}
1036	mutex_exit(lock);
1037
1038	/*
1039	 * Copy out the buffer header.
1040	 */
1041	STRUCT_FSET(head, msgsnap_size, size);
1042	STRUCT_FSET(head, msgsnap_nmsg, nmsg);
1043	if (copyout(STRUCT_BUF(head), buf, STRUCT_SIZE(head)))
1044		error = EFAULT;
1045
1046	buf += STRUCT_SIZE(head);
1047
1048	/*
1049	 * Now copy out the messages one by one.
1050	 */
1051	for (i = 0; i < nmsg; i++) {
1052		mp = snaplist[i];
1053		if (error == 0) {
1054			STRUCT_FSET(mhead, msgsnap_mlen, mp->msg_size);
1055			STRUCT_FSET(mhead, msgsnap_mtype, mp->msg_type);
1056			if (copyout(STRUCT_BUF(mhead), buf, STRUCT_SIZE(mhead)))
1057				error = EFAULT;
1058			buf += STRUCT_SIZE(mhead);
1059
1060			if (error == 0 &&
1061			    mp->msg_size != 0 &&
1062			    copyout(mp->msg_addr, buf, mp->msg_size))
1063				error = EFAULT;
1064			if (mdl == DATAMODEL_NATIVE)
1065				buf += RND(mp->msg_size);
1066			else
1067				buf += RND32(mp->msg_size);
1068		}
1069		lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
1070		msg_rele(mp);
1071		/* Check for msg q deleted or reallocated */
1072		if (IPC_FREE(&qp->msg_perm))
1073			error = EIDRM;
1074		mutex_exit(lock);
1075	}
1076
1077	(void) ipc_lock(msq_svc, qp->msg_perm.ipc_id);
1078	ipc_rele(msq_svc, (kipc_perm_t *)qp);
1079
1080	if (nmsg > 0)
1081		kmem_free(snaplist, nmsg * sizeof (struct msg *));
1082
1083	if (error)
1084		return (set_errno(error));
1085	return (0);
1086}
1087
/*
 * Largest message body we pre-allocate before taking the queue lock;
 * see the comment in msgsnd() below.
 */
#define	MSG_PREALLOC_LIMIT 8192

/*
 * msgsnd system call.
 *
 * Place the user message 'msgp' (leading message type plus a body of
 * 'msgsz' bytes) onto queue 'msqid'.  If the queue is full, either in
 * bytes or in message count, the thread blocks until space is
 * available, unless IPC_NOWAIT is set in 'msgflg'.  Returns 0 on
 * success; on failure sets errno (EFAULT, EINVAL, EAGAIN, EIDRM or an
 * interrupt/access error) via set_errno().
 */
static int
msgsnd(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, int msgflg)
{
	kmsqid_t	*qp;
	kmutex_t	*lock = NULL;
	struct msg	*mp = NULL;
	long		type;
	int		error = 0, wait_wakeup = 0;
	msgq_wakeup_t   msg_entry;
	model_t		mdl = get_udatamodel();
	STRUCT_HANDLE(ipcmsgbuf, umsgp);

	CPU_STATS_ADDQ(CPU, sys, msg, 1);	/* bump msg send/rcv count */
	STRUCT_SET_HANDLE(umsgp, mdl, msgp);

	/* Fetch the message type, sized per the caller's data model. */
	if (mdl == DATAMODEL_NATIVE) {
		if (copyin(msgp, &type, sizeof (type)))
			return (set_errno(EFAULT));
	} else {
		int32_t	type32;
		if (copyin(msgp, &type32, sizeof (type32)))
			return (set_errno(EFAULT));
		type = type32;
	}

	if (type < 1)
		return (set_errno(EINVAL));

	/*
	 * We want the value here large enough that most of the
	 * message operations will use the "lockless" path,
	 * but small enough that a user can not reserve large
	 * chunks of kernel memory unless they have a valid
	 * reason to.
	 */
	if (msgsz <= MSG_PREALLOC_LIMIT) {
		/*
		 * We are small enough that we can afford to do the
		 * allocation now.  This saves dropping the lock
		 * and then reacquiring the lock.
		 */
		mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
		mp->msg_copycnt = 1;
		mp->msg_size = msgsz;
		if (msgsz) {
			mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
			if (copyin(STRUCT_FADDR(umsgp, mtext),
			    mp->msg_addr, msgsz) == -1) {
				error = EFAULT;
				goto msgsnd_out;
			}
		}
	}

	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
		error = EINVAL;
		goto msgsnd_out;
	}

	/* Keep the queue from being deallocated while we may sleep. */
	ipc_hold(msq_svc, (kipc_perm_t *)qp);

	if (msgsz > qp->msg_qbytes) {
		error = EINVAL;
		goto msgsnd_out;
	}

	if (error = ipcperm_access(&qp->msg_perm, MSG_W, CRED()))
		goto msgsnd_out;

top:
	/*
	 * Allocate space on q, message header, & buffer space.
	 */
	ASSERT(qp->msg_qnum <= qp->msg_qmax);
	while ((msgsz > qp->msg_qbytes - qp->msg_cbytes) ||
	    (qp->msg_qnum == qp->msg_qmax)) {
		int cvres;

		if (msgflg & IPC_NOWAIT) {
			error = EAGAIN;
			goto msgsnd_out;
		}

		/* Queue is full: join the sender wait list and sleep. */
		wait_wakeup = 0;
		qp->msg_snd_cnt++;
		msg_entry.msgw_snd_size = msgsz;
		msg_entry.msgw_thrd = curthread;
		msg_entry.msgw_type = type;
		cv_init(&msg_entry.msgw_wake_cv, NULL, 0, NULL);
		list_insert_tail(&qp->msg_wait_rcv, &msg_entry);
		if (qp->msg_snd_smallest > msgsz)
			qp->msg_snd_smallest = msgsz;
		cvres = cv_wait_sig(&msg_entry.msgw_wake_cv, lock);
		/* The id's lock may have changed while we slept. */
		lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, lock);
		qp->msg_snd_cnt--;
		if (list_link_active(&msg_entry.msgw_list))
			list_remove(&qp->msg_wait_rcv, &msg_entry);
		if (error = msgq_check_err(qp, cvres)) {
			goto msgsnd_out;
		}
		wait_wakeup = 1;
	}

	if (mp == NULL) {
		int failure;

		/*
		 * Large message: allocate and copy in the body with the
		 * queue unlocked, then revalidate the queue and retry
		 * the space check from the top.
		 */
		mutex_exit(lock);
		ASSERT(msgsz > 0);
		mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
		mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
		mp->msg_size = msgsz;
		mp->msg_copycnt = 1;

		failure = (copyin(STRUCT_FADDR(umsgp, mtext),
		    mp->msg_addr, msgsz) == -1);
		lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
		if (IPC_FREE(&qp->msg_perm)) {
			error = EIDRM;
			goto msgsnd_out;
		}
		if (failure) {
			error = EFAULT;
			goto msgsnd_out;
		}
		goto top;
	}

	/*
	 * Everything is available, put msg on q.
	 */
	qp->msg_qnum++;
	qp->msg_cbytes += msgsz;
	qp->msg_lspid = curproc->p_pid;
	qp->msg_stime = gethrestime_sec();
	mp->msg_type = type;
	if (qp->msg_lowest_type > type)
		qp->msg_lowest_type = type;
	list_insert_tail(&qp->msg_list, mp);
	/*
	 * Get the proper receiver going.
	 */
	msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, type);

msgsnd_out:
	/*
	 * If we were woken up from the send wait list but an error then
	 * occurred while placing the message onto the msg queue, the
	 * space we were granted is still free, so we need to do the
	 * wakeup dance again for the remaining waiting senders.
	 */

	if (wait_wakeup && error) {
		msg_wakeup_senders(qp);
	}
	if (lock)
		ipc_rele(msq_svc, (kipc_perm_t *)qp);	/* drops lock */

	if (error) {
		if (mp)
			msg_rele(mp);
		return (set_errno(error));
	}

	return (0);
}
1258
/*
 * Wake up one blocked receiver eligible for a message of the given
 * type, if any exists.
 *
 * '*flist' points into a circular list of search strategies (the
 * selection functions below); each is tried in turn until one yields a
 * waiter or we come back around to the starting point.  The list head
 * is rotated on every call so that no single strategy is always
 * consulted first.
 */
static void
msg_wakeup_rdr(kmsqid_t *qp, msg_select_t **flist, long type)
{
	msg_select_t	*walker = *flist;
	msgq_wakeup_t	*wakeup;
	uint_t		msg_hash;

	msg_hash = msg_type_hash(type);

	do {
		wakeup = walker->selection(qp, msg_hash, type);
		walker = walker->next_selection;
	} while (!wakeup && walker != *flist);

	/* Rotate the strategy list for the next call. */
	*flist = (*flist)->next_selection;
	if (wakeup) {
		if (type) {
			/* Tell the waiter which message type it was given. */
			wakeup->msgw_snd_wake = type;
		}
		cv_signal(&wakeup->msgw_wake_cv);
	}
}
1281
1282static uint_t
1283msg_type_hash(long msg_type)
1284{
1285	if (msg_type < 0) {
1286		long	hash = -msg_type / MSG_NEG_INTERVAL;
1287		/*
1288		 * Negative message types are hashed over an
1289		 * interval.  Any message type that hashes
1290		 * beyond MSG_MAX_QNUM is automatically placed
1291		 * in the last bucket.
1292		 */
1293		if (hash > MSG_MAX_QNUM)
1294			hash = MSG_MAX_QNUM;
1295		return (hash);
1296	}
1297
1298	/*
1299	 * 0 or positive message type.  The first bucket is reserved for
1300	 * message receivers of type 0, the other buckets we hash into.
1301	 */
1302	if (msg_type)
1303		return (1 + (msg_type % MSG_MAX_QNUM));
1304	return (0);
1305}
1306
1307/*
1308 * Routines to see if we have a receiver of type 0 either blocked waiting
1309 * for a message.  Simply return the first guy on the list.
1310 */
1311
1312static msgq_wakeup_t *
1313/* ARGSUSED */
1314msg_fnd_any_snd(kmsqid_t *qp, int msg_hash, long type)
1315{
1316	msgq_wakeup_t	*walker;
1317
1318	walker = list_head(&qp->msg_wait_snd[0]);
1319
1320	if (walker)
1321		list_remove(&qp->msg_wait_snd[0], walker);
1322	return (walker);
1323}
1324
1325static msgq_wakeup_t *
1326/* ARGSUSED */
1327msg_fnd_any_rdr(kmsqid_t *qp, int msg_hash, long type)
1328{
1329	msgq_wakeup_t	*walker;
1330
1331	walker = list_head(&qp->msg_cpy_block);
1332	if (walker)
1333		list_remove(&qp->msg_cpy_block, walker);
1334	return (walker);
1335}
1336
1337static msgq_wakeup_t *
1338msg_fnd_spc_snd(kmsqid_t *qp, int msg_hash, long type)
1339{
1340	msgq_wakeup_t	*walker;
1341
1342	walker = list_head(&qp->msg_wait_snd[msg_hash]);
1343
1344	while (walker && walker->msgw_type != type)
1345		walker = list_next(&qp->msg_wait_snd[msg_hash], walker);
1346	if (walker)
1347		list_remove(&qp->msg_wait_snd[msg_hash], walker);
1348	return (walker);
1349}
1350
/*
 * Find a receiver that blocked with a negative message type and can
 * accept a message of (positive) 'type'.  A receiver that asked for
 * type -n accepts any message whose type is <= n, so every bucket from
 * msg_type_hash(-type) up to MSG_MAX_QNUM may contain a match.
 */
/* ARGSUSED */
static msgq_wakeup_t *
msg_fnd_neg_snd(kmsqid_t *qp, int msg_hash, long type)
{
	msgq_wakeup_t	*qptr;
	int		count;
	int		check_index;
	int		neg_index;
	int		nbuckets;

	/* Fast path: no negative-type receivers are waiting at all. */
	if (!qp->msg_ngt_cnt) {
		return (NULL);
	}
	neg_index = msg_type_hash(-type);

	/*
	 * Check for a match among the negative type queues.  Any buckets
	 * at neg_index or larger can match the type.  Use the last send
	 * time to randomize the starting bucket to prevent starvation.
	 * Search all buckets from neg_index to MSG_MAX_QNUM, starting
	 * from the random starting point, and wrapping around after
	 * MSG_MAX_QNUM.
	 */

	nbuckets = MSG_MAX_QNUM - neg_index + 1;
	check_index = neg_index + (qp->msg_stime % nbuckets);

	for (count = nbuckets; count > 0; count--) {
		qptr = list_head(&qp->msg_wait_snd_ngt[check_index]);
		while (qptr) {
			/*
			 * The lowest hash bucket may actually contain
			 * message types that are not valid for this
			 * request.  This can happen due to the fact that
			 * the message buckets actually contain a consecutive
			 * range of types.
			 */
			if (-qptr->msgw_type >= type) {
				list_remove(&qp->msg_wait_snd_ngt[check_index],
				    qptr);
				return (qptr);
			}
			qptr = list_next(&qp->msg_wait_snd_ngt[check_index],
			    qptr);
		}
		/* Wrap around to the first eligible bucket. */
		if (++check_index > MSG_MAX_QNUM) {
			check_index = neg_index;
		}
	}
	return (NULL);
}
1402
/*
 * Put the calling thread to sleep on 'queue' (one of the queue's
 * receiver wait lists) until it is signalled or interrupted.
 *
 * Entered with the queue mutex '*lock' held; cv_wait_sig() drops it
 * while sleeping, and ipc_relock() re-derives the correct mutex for
 * the ipc id afterwards, updating '*lock' in place for the caller.
 * Returns the cv_wait_sig() result (0 when interrupted by a signal,
 * nonzero on a cv_signal() wakeup).
 */
static int
msg_rcvq_sleep(list_t *queue, msgq_wakeup_t *entry, kmutex_t **lock,
    kmsqid_t *qp)
{
	int		cvres;

	cv_init(&entry->msgw_wake_cv, NULL, 0, NULL);

	list_insert_tail(queue, entry);

	qp->msg_rcv_cnt++;
	cvres = cv_wait_sig(&entry->msgw_wake_cv, *lock);
	*lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, *lock);
	qp->msg_rcv_cnt--;

	if (list_link_active(&entry->msgw_list)) {
		/*
		 * We woke up unexpectedly, remove ourself.
		 */
		list_remove(queue, entry);
	}

	return (cvres);
}
1427
1428static void
1429msg_rcvq_wakeup_all(list_t *q_ptr)
1430{
1431	msgq_wakeup_t	*q_walk;
1432
1433	while (q_walk = list_head(q_ptr)) {
1434		list_remove(q_ptr, q_walk);
1435		cv_signal(&q_walk->msgw_wake_cv);
1436	}
1437}
1438
1439/*
1440 * msgsys - System entry point for msgctl, msgget, msgrcv, and msgsnd
1441 * system calls.
1442 */
1443static ssize_t
1444msgsys(int opcode, uintptr_t a1, uintptr_t a2, uintptr_t a3,
1445    uintptr_t a4, uintptr_t a5)
1446{
1447	ssize_t error;
1448
1449	switch (opcode) {
1450	case MSGGET:
1451		error = msgget((key_t)a1, (int)a2);
1452		break;
1453	case MSGCTL:
1454		error = msgctl((int)a1, (int)a2, (void *)a3);
1455		break;
1456	case MSGRCV:
1457		error = msgrcv((int)a1, (struct ipcmsgbuf *)a2,
1458		    (size_t)a3, (long)a4, (int)a5);
1459		break;
1460	case MSGSND:
1461		error = msgsnd((int)a1, (struct ipcmsgbuf *)a2,
1462		    (size_t)a3, (int)a4);
1463		break;
1464	case MSGIDS:
1465		error = msgids((int *)a1, (uint_t)a2, (uint_t *)a3);
1466		break;
1467	case MSGSNAP:
1468		error = msgsnap((int)a1, (caddr_t)a2, (size_t)a3, (long)a4);
1469		break;
1470	default:
1471		error = set_errno(EINVAL);
1472		break;
1473	}
1474
1475	return (error);
1476}
1477
1478/*
1479 * Determine if a writer who is waiting can process its message.  If so
1480 * wake it up.
1481 */
1482static void
1483msg_wakeup_senders(kmsqid_t *qp)
1484{
1485	struct msgq_wakeup *ptr, *optr;
1486	size_t avail, smallest;
1487	int msgs_out;
1488
1489	/*
1490	 * Is there a writer waiting, and if so, can it be serviced? If
1491	 * not return back to the caller.
1492	 */
1493	if (IPC_FREE(&qp->msg_perm) || qp->msg_qnum >= qp->msg_qmax)
1494		return;
1495
1496	avail = qp->msg_qbytes - qp->msg_cbytes;
1497	if (avail < qp->msg_snd_smallest)
1498		return;
1499
1500	ptr = list_head(&qp->msg_wait_rcv);
1501	if (ptr == NULL) {
1502		qp->msg_snd_smallest = MSG_SMALL_INIT;
1503		return;
1504	}
1505	optr = ptr;
1506
1507	/*
1508	 * smallest:	minimum message size of all queued writers
1509	 *
1510	 * avail:	amount of space left on the msgq
1511	 *		if all the writers we have woken up are successful.
1512	 *
1513	 * msgs_out:	is the number of messages on the message queue if
1514	 *		all the writers we have woken up are successful.
1515	 */
1516
1517	smallest = MSG_SMALL_INIT;
1518	msgs_out = qp->msg_qnum;
1519	while (ptr) {
1520		ptr = list_next(&qp->msg_wait_rcv, ptr);
1521		if (optr->msgw_snd_size <= avail) {
1522			list_remove(&qp->msg_wait_rcv, optr);
1523			avail -= optr->msgw_snd_size;
1524			cv_signal(&optr->msgw_wake_cv);
1525			msgs_out++;
1526			if (msgs_out == qp->msg_qmax ||
1527			    avail < qp->msg_snd_smallest)
1528				break;
1529		} else {
1530			if (smallest > optr->msgw_snd_size)
1531				smallest = optr->msgw_snd_size;
1532		}
1533		optr = ptr;
1534	}
1535
1536	/*
1537	 * Reset the smallest message size if the entire list has been visited
1538	 */
1539	if (ptr == NULL && smallest != MSG_SMALL_INIT)
1540		qp->msg_snd_smallest = smallest;
1541}
1542
1543#ifdef	_SYSCALL32_IMPL
1544/*
1545 * msgsys32 - System entry point for msgctl, msgget, msgrcv, and msgsnd
1546 * system calls for 32-bit callers on LP64 kernel.
1547 */
1548static ssize32_t
1549msgsys32(int opcode, uint32_t a1, uint32_t a2, uint32_t a3,
1550    uint32_t a4, uint32_t a5)
1551{
1552	ssize_t error;
1553
1554	switch (opcode) {
1555	case MSGGET:
1556		error = msgget((key_t)a1, (int)a2);
1557		break;
1558	case MSGCTL:
1559		error = msgctl((int)a1, (int)a2, (void *)(uintptr_t)a3);
1560		break;
1561	case MSGRCV:
1562		error = msgrcv((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
1563		    (size_t)a3, (long)(int32_t)a4, (int)a5);
1564		break;
1565	case MSGSND:
1566		error = msgsnd((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
1567		    (size_t)(int32_t)a3, (int)a4);
1568		break;
1569	case MSGIDS:
1570		error = msgids((int *)(uintptr_t)a1, (uint_t)a2,
1571		    (uint_t *)(uintptr_t)a3);
1572		break;
1573	case MSGSNAP:
1574		error = msgsnap((int)a1, (caddr_t)(uintptr_t)a2, (size_t)a3,
1575		    (long)(int32_t)a4);
1576		break;
1577	default:
1578		error = set_errno(EINVAL);
1579		break;
1580	}
1581
1582	return (error);
1583}
#endif	/* _SYSCALL32_IMPL */
1585