OpenSolaris_b135/uts/common/os/msg.c

/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved  	*/


/*
 * Inter-Process Communication Message Facility.
 *
 * See os/ipc.c for a description of common IPC functionality.
 *
 * Resource controls
 * -----------------
 *
 * Control:      zone.max-msg-ids (rc_zone_msgmni)
 * Description:  Maximum number of message queue ids allowed a zone.
 *
 *   When msgget() is used to allocate a message queue, one id is
 *   allocated.  If the id allocation doesn't succeed, msgget() fails
 *   and errno is set to ENOSPC.  Upon successful msgctl(, IPC_RMID)
 *   the id is deallocated.
 *
 * Control:      project.max-msg-ids (rc_project_msgmni)
 * Description:  Maximum number of message queue ids allowed a project.
 *
 *   When msgget() is used to allocate a message queue, one id is
 *   allocated.  If the id allocation doesn't succeed, msgget() fails
 *   and errno is set to ENOSPC.  Upon successful msgctl(, IPC_RMID)
 *   the id is deallocated.
 *
 * Control:      process.max-msg-qbytes (rc_process_msgmnb)
 * Description:  Maximum number of bytes of messages on a message queue.
 *
 *   When msgget() successfully allocates a message queue, the minimum
 *   enforced value of this limit is used to initialize msg_qbytes.
 *
 * Control:      process.max-msg-messages (rc_process_msgtql)
 * Description:  Maximum number of messages on a message queue.
 *
 *   When msgget() successfully allocates a message queue, the minimum
 *   enforced value of this limit is used to initialize a per-queue
 *   limit on the number of messages.
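 *
 * As an illustrative userland sketch (not part of this file), the id
 * limits above surface to applications as ENOSPC:
 *
 *	int id = msgget(IPC_PRIVATE, IPC_CREAT | 0600);
 *	if (id == -1 && errno == ENOSPC)
 *		... the zone or project max-msg-ids limit was hit ...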
 */

#include <sys/types.h>
#include <sys/t_lock.h>
#include <sys/param.h>
#include <sys/cred.h>
#include <sys/user.h>
#include <sys/proc.h>
#include <sys/time.h>
#include <sys/ipc.h>
#include <sys/ipc_impl.h>
#include <sys/msg.h>
#include <sys/msg_impl.h>
#include <sys/list.h>
#include <sys/systm.h>
#include <sys/sysmacros.h>
#include <sys/cpuvar.h>
#include <sys/kmem.h>
#include <sys/ddi.h>
#include <sys/errno.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/project.h>
#include <sys/modctl.h>
#include <sys/syscall.h>
#include <sys/policy.h>
#include <sys/zone.h>

#include <c2/audit.h>

/*
 * The following tunables are obsolete.  Though for compatibility we
 * still read and interpret msginfo_msgmnb, msginfo_msgmni, and
 * msginfo_msgtql (see os/project.c and os/rctl_proc.c), the preferred
 * mechanism for administering the IPC Message facility is through the
 * resource controls described at the top of this file.
 */
size_t	msginfo_msgmax = 2048;	/* (obsolete) */
size_t	msginfo_msgmnb = 4096;	/* (obsolete) */
int	msginfo_msgmni = 50;	/* (obsolete) */
int	msginfo_msgtql = 40;	/* (obsolete) */
int	msginfo_msgssz = 8;	/* (obsolete) */
int	msginfo_msgmap = 0;	/* (obsolete) */
ushort_t msginfo_msgseg = 1024;	/* (obsolete) */

extern rctl_hndl_t rc_zone_msgmni;
extern rctl_hndl_t rc_project_msgmni;
extern rctl_hndl_t rc_process_msgmnb;
extern rctl_hndl_t rc_process_msgtql;
static ipc_service_t *msq_svc;
static zone_key_t msg_zone_key;

static void msg_dtor(kipc_perm_t *);
static void msg_rmid(kipc_perm_t *);
static void msg_remove_zone(zoneid_t, void *);

/*
 * Module linkage information for the kernel.
 */
static ssize_t msgsys(int opcode, uintptr_t a1, uintptr_t a2, uintptr_t a3,
	uintptr_t a4, uintptr_t a5);

static struct sysent ipcmsg_sysent = {
	6,
#ifdef	_LP64
	SE_ARGC | SE_NOUNLOAD | SE_64RVAL,
#else
	SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
#endif
	(int (*)())msgsys
};

#ifdef	_SYSCALL32_IMPL
static ssize32_t msgsys32(int opcode, uint32_t a1, uint32_t a2, uint32_t a3,
	uint32_t a4, uint32_t a5);

static struct sysent ipcmsg_sysent32 = {
	6,
	SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
	(int (*)())msgsys32
};
#endif	/* _SYSCALL32_IMPL */

static struct modlsys modlsys = {
	&mod_syscallops, "System V message facility", &ipcmsg_sysent
};

#ifdef _SYSCALL32_IMPL
static struct modlsys modlsys32 = {
	&mod_syscallops32, "32-bit System V message facility", &ipcmsg_sysent32
};
#endif

/*
 *      Big Theory statement for message queue correctness
 *
 * The msgrcv and msgsnd functions no longer use cv_broadcast to wake up
 * receivers who are waiting for an event.  Using the cv_broadcast method
 * resulted in negative scaling when the number of waiting receivers was
 * large (the thundering herd problem).  Instead, the receivers waiting to
 * receive a message are now linked in a queue-like fashion and awakened
 * one at a time in a controlled manner.
 *
 * Receivers can block on two different classes of waiting list:
 *    1) "sendwait" list, which is the more complex list of the two.  The
 *	  receiver will be awakened by a sender posting a new message.  There
 *	  are two types of "sendwait" list used:
 *		a) msg_wait_snd: handles all receivers who are looking for
 *		   a message type >= 0, but were unable to locate a match.
 *
 *		   slot 0: reserved for receivers that have designated they
 *			   will take any message type.
 *		   rest:   consists of receivers requesting a specific
 *			   type that was not present.  The entries are
 *			   hashed into a bucket in an attempt to keep
 *			   any list search relatively short.
 * 		b) msg_wait_snd_ngt: handles all receivers that have designated
 *		   a negative message type. Unlike msg_wait_snd, the hash bucket
 *		   serves a range of negative message types (-1 to -5, -6 to -10
 *		   and so forth), where the last bucket is reserved for all the
 *		   negative message types that hash outside of MSG_MAX_QNUM - 1.
 *		   This is done to simplify the operation of locating a
 *		   negative message type.
 *
 *    2) "copyout" list, where the receiver is awakened by another
 *	 receiver after a message is copied out.  This is a linked list
 *	 of waiters that are awakened one at a time.  Although the solution
 *	 is not optimal, the complexity that waking up exactly the right
 *	 entry would add far exceeds any potential payback (too many
 *	 correctness and corner case issues).
 *
 * The lists are doubly linked.  In the case of the "sendwait"
 * list, this allows the thread to remove itself from the list without having
 * to traverse the list.  In the case of the "copyout" list it simply allows
 * us to use common functions with the "sendwait" list.
 *
 * To make sure receivers are not hung out to dry, we must guarantee:
 *    1. If any queued message matches any receiver, then at least one
 *       matching receiver must be processing the request.
 *    2. Blocking on the copyout queue is only temporary while messages
 *	 are being copied out.  The process is guaranteed to wake up
 *	 when it gets to the front of the queue (copyout is a FIFO).
 *
 * Rules for blocking and waking up:
 *   1) A receiver entering msgrcv must examine all messages for a match
 *	before blocking on a sendwait queue.
 *   2) If the receiver blocks because the message it chose is already
 *	being copied out, then when it wakes up it needs to start
 *	checking the messages from the beginning.
 *   3) Whenever a process returns from msgrcv for any reason, if it
 *	had attempted to copy a message or blocked waiting for a copy
 *	to complete, it needs to wake up the next receiver blocked on
 *	a copyout.
 *   4) When a message is sent, the sender selects a process waiting
 *	for that type of message.  This selection process rotates among
 *	receiver types of 0, negative and positive to prevent starvation
 *	of any one particular receiver type.
 *   5) The following are the scenarios for processes that are awakened
 *	by a msgsnd:
 *		a) The process finds the message and is able to copy
 *		   it out.  Once complete, the process returns.
 *		b) The message that triggered the wakeup is no longer
 *		   available (another process found the message first).
 *		   We issue a wakeup on the copyout queue and then go back
 *		   to sleep waiting for another matching message to be sent.
 *		c) The message that was supposed to be processed was
 *		   already serviced by another process.  However, a different
 *		   message is present which we can service.  The message
 *		   is copied and the process returns.
 *		d) The message is found, but some sort of error occurs that
 *		   prevents the message from being copied.  The receiver
 *		   wakes up the next sender that can service this message
 *		   type and returns an error to the caller.
 *		e) The message is found, but it is marked as being copied
 *		   out.  The receiver then goes to sleep on the copyout
 *		   queue where it will be awakened again sometime in the future.
 *
 *
 *   6) Whenever a message is found that matches the message type designated,
 *	but is being copied out, we have to block on the copyout queue.
 *	After the copying process finishes the copyout, it must wake up
 *	(either directly or indirectly) all receivers who blocked on its
 *	copyout, so they are guaranteed a chance to examine the remaining
 *	messages.
 *	This is implemented via a chain of wakeups: Y wakes X, who wakes Z,
 *	and so on.  The chain cannot be broken.  This leads to the following
 *	cases:
 *		a) When a receiver finishes copying the message (or encounters
 *		   an error), the first entry on the copyout queue is woken
 *		   up.
 *		b) When the receiver is woken up, it attempts to locate
 *		   a message type match.
 *		c) If a message type is found and
 *			-- MSG_RCVCOPY flag is not set, the message is
 *			   marked for copying out.  Regardless of the
 *			   copyout's success, the next entry on the copyout
 *			   queue is awakened and the operation is completed.
 *			-- MSG_RCVCOPY is set, we simply go back to sleep again
 *			   on the copyout queue.
 *		d) If the message type is not found, then we wake up the
 *		   next process on the copyout queue.
 *   7) If a msgsnd is unable to complete for any of the following reasons
 *	  a) the msgq has no space for the message
 *	  b) the maximum number of messages allowed has been reached
 *      then one of two things happens:
 *	  1) If the passed in msg_flag has IPC_NOWAIT set, then
 *	     an error is returned.
 *	  2) If the IPC_NOWAIT bit is not set in msg_flag, the
 *	     thread is put to sleep until the request can be
 *	     serviced.
 *   8) When waking a thread waiting to send a message, a check is done to
 *      verify that the operation being asked for by the thread will complete.
 *      This decision making process is done in a loop where the oldest request
 *      is checked first. The search will continue until there is no more
 *	room on the msgq or we have checked all the waiters.
 */

static uint_t msg_type_hash(long);
static int msgq_check_err(kmsqid_t *qp, int cvres);
static int msg_rcvq_sleep(list_t *, msgq_wakeup_t *, kmutex_t **,
    kmsqid_t *);
static int msg_copyout(kmsqid_t *, long, kmutex_t **, size_t *, size_t,
    struct msg *, struct ipcmsgbuf *, int);
static void msg_rcvq_wakeup_all(list_t *);
static void msg_wakeup_senders(kmsqid_t *);
static void msg_wakeup_rdr(kmsqid_t *, msg_select_t **, long);
static msgq_wakeup_t *msg_fnd_any_snd(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_any_rdr(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_neg_snd(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_spc_snd(kmsqid_t *, int, long);
static struct msg *msgrcv_lookup(kmsqid_t *, long);

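/*
 * The circularly linked selection tables below implement the rotation
 * described in rule 4 of the Big Theory statement: a sender's search
 * for a blocked receiver cycles among the "any type" (slot 0), specific
 * positive type, and negative type waiters so that no one receiver
 * class is starved.
 */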
msg_select_t msg_fnd_sndr[] = {
	{ msg_fnd_any_snd, &msg_fnd_sndr[1] },
	{ msg_fnd_spc_snd, &msg_fnd_sndr[2] },
	{ msg_fnd_neg_snd, &msg_fnd_sndr[0] }
};

msg_select_t msg_fnd_rdr[1] = {
	{ msg_fnd_any_rdr, &msg_fnd_rdr[0] },
};

static struct modlinkage modlinkage = {
	MODREV_1,
	&modlsys,
#ifdef _SYSCALL32_IMPL
	&modlsys32,
#endif
	NULL
};

#define	MSG_SMALL_INIT (size_t)-1
int
_init(void)
{
	int result;

	msq_svc = ipcs_create("msqids", rc_project_msgmni, rc_zone_msgmni,
	    sizeof (kmsqid_t), msg_dtor, msg_rmid, AT_IPC_MSG,
	    offsetof(ipc_rqty_t, ipcq_msgmni));
	zone_key_create(&msg_zone_key, NULL, msg_remove_zone, NULL);

	if ((result = mod_install(&modlinkage)) == 0)
		return (0);

	(void) zone_key_delete(msg_zone_key);
	ipcs_destroy(msq_svc);

	return (result);
}

int
_fini(void)
{
	return (EBUSY);
}

int
_info(struct modinfo *modinfop)
{
	return (mod_info(&modlinkage, modinfop));
}

static void
msg_dtor(kipc_perm_t *perm)
{
	kmsqid_t *qp = (kmsqid_t *)perm;
	int		ii;

	for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
		ASSERT(list_is_empty(&qp->msg_wait_snd[ii]));
		ASSERT(list_is_empty(&qp->msg_wait_snd_ngt[ii]));
		list_destroy(&qp->msg_wait_snd[ii]);
		list_destroy(&qp->msg_wait_snd_ngt[ii]);
	}
	ASSERT(list_is_empty(&qp->msg_cpy_block));
	ASSERT(list_is_empty(&qp->msg_wait_rcv));
	list_destroy(&qp->msg_cpy_block);
	ASSERT(qp->msg_snd_cnt == 0);
	ASSERT(qp->msg_cbytes == 0);
	list_destroy(&qp->msg_list);
	list_destroy(&qp->msg_wait_rcv);
}


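/*
 * msg_hold - increment the reference count on the message.
 */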
#define	msg_hold(mp)	(mp)->msg_copycnt++

/*
 * msg_rele - decrement the reference count on the message.  When count
 * reaches zero, free message header and contents.
 */
static void
msg_rele(struct msg *mp)
{
	ASSERT(mp->msg_copycnt > 0);
	if (mp->msg_copycnt-- == 1) {
		if (mp->msg_addr)
			kmem_free(mp->msg_addr, mp->msg_size);
		kmem_free(mp, sizeof (struct msg));
	}
}

/*
 * msgunlink - Unlink msg from queue, decrement byte count and wake up anyone
 * waiting for free bytes on queue.
 *
 * Called with queue locked.
 */
static void
msgunlink(kmsqid_t *qp, struct msg *mp)
{
	list_remove(&qp->msg_list, mp);
	qp->msg_qnum--;
	qp->msg_cbytes -= mp->msg_size;
	msg_rele(mp);

	/* Wake up waiting writers */
	msg_wakeup_senders(qp);
}

static void
msg_rmid(kipc_perm_t *perm)
{
	kmsqid_t *qp = (kmsqid_t *)perm;
	struct msg *mp;
	int		ii;


	while ((mp = list_head(&qp->msg_list)) != NULL)
		msgunlink(qp, mp);
	ASSERT(qp->msg_cbytes == 0);

	/*
	 * Wake up everyone who is in a wait state of some sort
	 * for this message queue.
	 */
	for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
		msg_rcvq_wakeup_all(&qp->msg_wait_snd[ii]);
		msg_rcvq_wakeup_all(&qp->msg_wait_snd_ngt[ii]);
	}
	msg_rcvq_wakeup_all(&qp->msg_cpy_block);
	msg_rcvq_wakeup_all(&qp->msg_wait_rcv);
}

/*
 * msgctl system call.
 *
 * gets q lock (via ipc_lookup), releases before return.
 * may call users of msg_lock
 */
static int
msgctl(int msgid, int cmd, void *arg)
{
	STRUCT_DECL(msqid_ds, ds);		/* SVR4 queue work area */
	kmsqid_t		*qp;		/* ptr to associated q */
	int			error;
	struct	cred		*cr;
	model_t	mdl = get_udatamodel();
	struct msqid_ds64	ds64;
	kmutex_t		*lock;
	proc_t			*pp = curproc;

	STRUCT_INIT(ds, mdl);
	cr = CRED();

	/*
	 * Perform pre- or non-lookup actions (e.g. copyins, RMID).
	 */
	switch (cmd) {
	case IPC_SET:
		if (copyin(arg, STRUCT_BUF(ds), STRUCT_SIZE(ds)))
			return (set_errno(EFAULT));
		break;

	case IPC_SET64:
		if (copyin(arg, &ds64, sizeof (struct msqid_ds64)))
			return (set_errno(EFAULT));
		break;

	case IPC_RMID:
		if (error = ipc_rmid(msq_svc, msgid, cr))
			return (set_errno(error));
		return (0);
	}

	/*
	 * get msqid_ds for this msgid
	 */
	if ((lock = ipc_lookup(msq_svc, msgid, (kipc_perm_t **)&qp)) == NULL)
		return (set_errno(EINVAL));

	switch (cmd) {
	case IPC_SET:
		if (STRUCT_FGET(ds, msg_qbytes) > qp->msg_qbytes &&
		    secpolicy_ipc_config(cr) != 0) {
			mutex_exit(lock);
			return (set_errno(EPERM));
		}
		if (error = ipcperm_set(msq_svc, cr, &qp->msg_perm,
		    &STRUCT_BUF(ds)->msg_perm, mdl)) {
			mutex_exit(lock);
			return (set_errno(error));
		}
		qp->msg_qbytes = STRUCT_FGET(ds, msg_qbytes);
		qp->msg_ctime = gethrestime_sec();
		break;

	case IPC_STAT:
		if (error = ipcperm_access(&qp->msg_perm, MSG_R, cr)) {
			mutex_exit(lock);
			return (set_errno(error));
		}

		if (qp->msg_rcv_cnt)
			qp->msg_perm.ipc_mode |= MSG_RWAIT;
		if (qp->msg_snd_cnt)
			qp->msg_perm.ipc_mode |= MSG_WWAIT;
		ipcperm_stat(&STRUCT_BUF(ds)->msg_perm, &qp->msg_perm, mdl);
		qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
		STRUCT_FSETP(ds, msg_first, NULL); 	/* kernel addr */
		STRUCT_FSETP(ds, msg_last, NULL);
		STRUCT_FSET(ds, msg_cbytes, qp->msg_cbytes);
		STRUCT_FSET(ds, msg_qnum, qp->msg_qnum);
		STRUCT_FSET(ds, msg_qbytes, qp->msg_qbytes);
		STRUCT_FSET(ds, msg_lspid, qp->msg_lspid);
		STRUCT_FSET(ds, msg_lrpid, qp->msg_lrpid);
		STRUCT_FSET(ds, msg_stime, qp->msg_stime);
		STRUCT_FSET(ds, msg_rtime, qp->msg_rtime);
		STRUCT_FSET(ds, msg_ctime, qp->msg_ctime);
		break;

	case IPC_SET64:
		mutex_enter(&pp->p_lock);
		if ((ds64.msgx_qbytes > qp->msg_qbytes) &&
		    secpolicy_ipc_config(cr) != 0 &&
		    rctl_test(rc_process_msgmnb, pp->p_rctls, pp,
		    ds64.msgx_qbytes, RCA_SAFE) & RCT_DENY) {
			mutex_exit(&pp->p_lock);
			mutex_exit(lock);
			return (set_errno(EPERM));
		}
		mutex_exit(&pp->p_lock);
		if (error = ipcperm_set64(msq_svc, cr, &qp->msg_perm,
		    &ds64.msgx_perm)) {
			mutex_exit(lock);
			return (set_errno(error));
		}
		qp->msg_qbytes = ds64.msgx_qbytes;
		qp->msg_ctime = gethrestime_sec();
		break;

	case IPC_STAT64:
		if (qp->msg_rcv_cnt)
			qp->msg_perm.ipc_mode |= MSG_RWAIT;
		if (qp->msg_snd_cnt)
			qp->msg_perm.ipc_mode |= MSG_WWAIT;
		ipcperm_stat64(&ds64.msgx_perm, &qp->msg_perm);
		qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
		ds64.msgx_cbytes = qp->msg_cbytes;
		ds64.msgx_qnum = qp->msg_qnum;
		ds64.msgx_qbytes = qp->msg_qbytes;
		ds64.msgx_lspid = qp->msg_lspid;
		ds64.msgx_lrpid = qp->msg_lrpid;
		ds64.msgx_stime = qp->msg_stime;
		ds64.msgx_rtime = qp->msg_rtime;
		ds64.msgx_ctime = qp->msg_ctime;
		break;

	default:
		mutex_exit(lock);
		return (set_errno(EINVAL));
	}

	mutex_exit(lock);

	/*
	 * Do copyout last (after releasing mutex).
	 */
	switch (cmd) {
	case IPC_STAT:
		if (copyout(STRUCT_BUF(ds), arg, STRUCT_SIZE(ds)))
			return (set_errno(EFAULT));
		break;

	case IPC_STAT64:
		if (copyout(&ds64, arg, sizeof (struct msqid_ds64)))
			return (set_errno(EFAULT));
		break;
	}

	return (0);
}

/*
 * Remove all message queues associated with a given zone.  Called by
 * zone_shutdown when the zone is halted.
 */
/*ARGSUSED1*/
static void
msg_remove_zone(zoneid_t zoneid, void *arg)
{
	ipc_remove_zone(msq_svc, zoneid);
}

/*
 * msgget system call.
 */
static int
msgget(key_t key, int msgflg)
{
	kmsqid_t	*qp;
	kmutex_t	*lock;
	int		id, error;
	int		ii;
	proc_t		*pp = curproc;

top:
	if (error = ipc_get(msq_svc, key, msgflg, (kipc_perm_t **)&qp, &lock))
		return (set_errno(error));

	if (IPC_FREE(&qp->msg_perm)) {
		mutex_exit(lock);
		mutex_exit(&pp->p_lock);

		list_create(&qp->msg_list, sizeof (struct msg),
		    offsetof(struct msg, msg_node));
		qp->msg_qnum = 0;
		qp->msg_lspid = qp->msg_lrpid = 0;
		qp->msg_stime = qp->msg_rtime = 0;
		qp->msg_ctime = gethrestime_sec();
		qp->msg_ngt_cnt = 0;
		qp->msg_neg_copy = 0;
		for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
			list_create(&qp->msg_wait_snd[ii],
			    sizeof (msgq_wakeup_t),
			    offsetof(msgq_wakeup_t, msgw_list));
			list_create(&qp->msg_wait_snd_ngt[ii],
			    sizeof (msgq_wakeup_t),
			    offsetof(msgq_wakeup_t, msgw_list));
		}
		/*
		 * The proper initialization of msg_lowest_type is to the
		 * highest possible value.  By doing this we guarantee that
		 * when the first send happens, the lowest type will be set
		 * properly.
		 */
		qp->msg_lowest_type = MSG_SMALL_INIT;
		list_create(&qp->msg_cpy_block,
		    sizeof (msgq_wakeup_t),
		    offsetof(msgq_wakeup_t, msgw_list));
		list_create(&qp->msg_wait_rcv,
		    sizeof (msgq_wakeup_t),
		    offsetof(msgq_wakeup_t, msgw_list));
		qp->msg_fnd_sndr = &msg_fnd_sndr[0];
		qp->msg_fnd_rdr = &msg_fnd_rdr[0];
		qp->msg_rcv_cnt = 0;
		qp->msg_snd_cnt = 0;
		qp->msg_snd_smallest = MSG_SMALL_INIT;

		if (error = ipc_commit_begin(msq_svc, key, msgflg,
		    (kipc_perm_t *)qp)) {
			if (error == EAGAIN)
				goto top;
			return (set_errno(error));
		}
		qp->msg_qbytes = rctl_enforced_value(rc_process_msgmnb,
		    pp->p_rctls, pp);
		qp->msg_qmax = rctl_enforced_value(rc_process_msgtql,
		    pp->p_rctls, pp);
		lock = ipc_commit_end(msq_svc, &qp->msg_perm);
	}
	if (audit_active)
		audit_ipcget(AT_IPC_MSG, (void *)qp);
	id = qp->msg_perm.ipc_id;
	mutex_exit(lock);
	return (id);
}

static ssize_t
msgrcv(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, long msgtyp, int msgflg)
{
	struct msg	*smp;	/* ptr to best msg on q */
	kmsqid_t	*qp;	/* ptr to associated q */
	kmutex_t	*lock;
	size_t		xtsz;	/* transfer byte count */
	int		error = 0;
	int		cvres;
	uint_t		msg_hash;
	msgq_wakeup_t	msg_entry;

	CPU_STATS_ADDQ(CPU, sys, msg, 1);	/* bump msg send/rcv count */

	msg_hash = msg_type_hash(msgtyp);
	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
		return ((ssize_t)set_errno(EINVAL));
	}
	ipc_hold(msq_svc, (kipc_perm_t *)qp);

	if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
		goto msgrcv_out;
	}

	/*
	 * Various information (including the condvar_t) required for the
	 * process to sleep is provided by its stack.
	 */
	msg_entry.msgw_thrd = curthread;
	msg_entry.msgw_snd_wake = 0;
	msg_entry.msgw_type = msgtyp;
findmsg:
	smp = msgrcv_lookup(qp, msgtyp);

	if (smp) {
		/*
		 * We found a possible message to copy out.
		 */
		if ((smp->msg_flags & MSG_RCVCOPY) == 0) {
			long t = msg_entry.msgw_snd_wake;
			long copy_type = smp->msg_type;

			/*
			 * It is available, attempt to copy it.
			 */
			error = msg_copyout(qp, msgtyp, &lock, &xtsz, msgsz,
			    smp, msgp, msgflg);

			/*
			 * It is possible to consume a different message
			 * type than the one that triggered the wakeup
			 * (negative types).  If this happens, a check must
			 * be done to determine if another receiver is
			 * available for the waking message type.  Failing
			 * to do this can leave a message on the queue that
			 * could be serviced by a sleeping receiver.
			 */
			if (!error && t && (copy_type != t))
				msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, t);

			/*
			 * Don't forget to wake up a sleeper that blocked
			 * because we were copying things out.
			 */
			msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
			goto msgrcv_out;
		}
		/*
		 * The selected message is being copied out, so block.  We do
		 * not need to wake the next person up on the msg_cpy_block
		 * list because someone is copying out and will get things
		 * moving again once the copy is completed.
		 */
		cvres = msg_rcvq_sleep(&qp->msg_cpy_block,
		    &msg_entry, &lock, qp);
		error = msgq_check_err(qp, cvres);
		if (error) {
			goto msgrcv_out;
		}
		goto findmsg;
	}
	/*
	 * There isn't a message to copy out that matches the designated
	 * criteria.
	 */
	if (msgflg & IPC_NOWAIT) {
		error = ENOMSG;
		goto msgrcv_out;
	}
	msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);

	/*
	 * Wait for a new message.  We keep the negative and positive types
	 * separate for performance reasons.
	 */
	msg_entry.msgw_snd_wake = 0;
	if (msgtyp >= 0) {
		cvres = msg_rcvq_sleep(&qp->msg_wait_snd[msg_hash],
		    &msg_entry, &lock, qp);
	} else {
		qp->msg_ngt_cnt++;
		cvres = msg_rcvq_sleep(&qp->msg_wait_snd_ngt[msg_hash],
		    &msg_entry, &lock, qp);
		qp->msg_ngt_cnt--;
	}

	if (!(error = msgq_check_err(qp, cvres))) {
		goto findmsg;
	}

msgrcv_out:
	if (error) {
		msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
		if (msg_entry.msgw_snd_wake) {
			msg_wakeup_rdr(qp, &qp->msg_fnd_sndr,
			    msg_entry.msgw_snd_wake);
		}
		ipc_rele(msq_svc, (kipc_perm_t *)qp);
		return ((ssize_t)set_errno(error));
	}
	ipc_rele(msq_svc, (kipc_perm_t *)qp);
	return ((ssize_t)xtsz);
}

static int
msgq_check_err(kmsqid_t *qp, int cvres)
{
	if (IPC_FREE(&qp->msg_perm)) {
		return (EIDRM);
	}

	if (cvres == 0) {
		return (EINTR);
	}

	return (0);
}

static int
msg_copyout(kmsqid_t *qp, long msgtyp, kmutex_t **lock, size_t *xtsz_ret,
    size_t msgsz, struct msg *smp, struct ipcmsgbuf *msgp, int msgflg)
{
	size_t		xtsz;
	STRUCT_HANDLE(ipcmsgbuf, umsgp);
	model_t		mdl = get_udatamodel();
	int		copyerror = 0;

	STRUCT_SET_HANDLE(umsgp, mdl, msgp);
	if (msgsz < smp->msg_size) {
		if ((msgflg & MSG_NOERROR) == 0) {
			return (E2BIG);
		} else {
			xtsz = msgsz;
		}
	} else {
		xtsz = smp->msg_size;
	}
	*xtsz_ret = xtsz;

	/*
	 * To prevent a DOS attack we mark the message as being
	 * copied out and release the mutex.  When the copy is completed
	 * we need to reacquire the mutex and make the appropriate updates.
	 */
	ASSERT((smp->msg_flags & MSG_RCVCOPY) == 0);
	smp->msg_flags |= MSG_RCVCOPY;
	msg_hold(smp);
	if (msgtyp < 0) {
		ASSERT(qp->msg_neg_copy == 0);
		qp->msg_neg_copy = 1;
	}
	mutex_exit(*lock);

	if (mdl == DATAMODEL_NATIVE) {
		copyerror = copyout(&smp->msg_type, msgp,
		    sizeof (smp->msg_type));
	} else {
		/*
		 * 32-bit callers need an imploded msg type.
		 */
		int32_t	msg_type32 = smp->msg_type;

		copyerror = copyout(&msg_type32, msgp,
		    sizeof (msg_type32));
	}

	if (copyerror == 0 && xtsz) {
		copyerror = copyout(smp->msg_addr,
		    STRUCT_FADDR(umsgp, mtext), xtsz);
	}

	/*
	 * Reclaim the mutex and make sure the message queue still exists.
	 */

	*lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
	if (msgtyp < 0) {
		qp->msg_neg_copy = 0;
	}
	ASSERT(smp->msg_flags & MSG_RCVCOPY);
	smp->msg_flags &= ~MSG_RCVCOPY;
	msg_rele(smp);
	if (IPC_FREE(&qp->msg_perm)) {
		return (EIDRM);
	}
	if (copyerror) {
		return (EFAULT);
	}
	qp->msg_lrpid = ttoproc(curthread)->p_pid;
	qp->msg_rtime = gethrestime_sec();
	msgunlink(qp, smp);
	return (0);
}

static struct msg *
msgrcv_lookup(kmsqid_t *qp, long msgtyp)
{
	struct msg 		*smp = NULL;
	long			qp_low;
	struct msg		*mp;	/* ptr to msg on q */
	long			low_msgtype;
	static struct msg	neg_copy_smp;

	mp = list_head(&qp->msg_list);
	if (msgtyp == 0) {
		smp = mp;
	} else {
		qp_low = qp->msg_lowest_type;
		if (msgtyp > 0) {
			/*
			 * If our lowest possible message type is larger than
			 * the message type desired, then we know there is
			 * no entry present.
			 */
			if (qp_low > msgtyp) {
				return (NULL);
			}

			for (; mp; mp = list_next(&qp->msg_list, mp)) {
				if (msgtyp == mp->msg_type) {
					smp = mp;
					break;
				}
			}
		} else {
			/*
			 * We have kept track of the lowest possible message
			 * type on the send queue.  This allows us to terminate
			 * the search early if we find a message type of that
			 * type.  Note, the lowest type may not be the actual
			 * lowest value in the system, it is only guaranteed
			 * that there isn't a value lower than that.
			 */
			low_msgtype = -msgtyp;
			if (low_msgtype < qp_low) {
				return (NULL);
			}
			if (qp->msg_neg_copy) {
				neg_copy_smp.msg_flags = MSG_RCVCOPY;
				return (&neg_copy_smp);
			}
			for (; mp; mp = list_next(&qp->msg_list, mp)) {
				if (mp->msg_type <= low_msgtype &&
				    !(smp && smp->msg_type <= mp->msg_type)) {
					smp = mp;
					low_msgtype = mp->msg_type;
					if (low_msgtype == qp_low) {
						break;
					}
				}
			}
			if (smp) {
				/*
				 * Update the lowest message type.
				 */
				qp->msg_lowest_type = smp->msg_type;
			}
		}
	}
	return (smp);
}

/*
 * msgids system call.
 */
static int
msgids(int *buf, uint_t nids, uint_t *pnids)
{
	int error;

	if (error = ipc_ids(msq_svc, buf, nids, pnids))
		return (set_errno(error));

	return (0);
}

#define	RND(x)		roundup((x), sizeof (size_t))
#define	RND32(x)	roundup((x), sizeof (size32_t))

/*
 * msgsnap system call.
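 *
 * The caller's buffer is filled with a msgsnap_head followed, for each
 * snapshotted message, by a msgsnap_mhead and the message text, with
 * each message's size rounded up to a size_t (size32_t for 32-bit
 * callers) boundary via the RND/RND32 macros above.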
 */
static int
msgsnap(int msqid, caddr_t buf, size_t bufsz, long msgtyp)
{
	struct msg	*mp;	/* ptr to msg on q */
	kmsqid_t	*qp;	/* ptr to associated q */
	kmutex_t	*lock;
	size_t		size;
	size_t		nmsg;
	struct msg	**snaplist;
	int		error, i;
	model_t		mdl = get_udatamodel();
	STRUCT_DECL(msgsnap_head, head);
	STRUCT_DECL(msgsnap_mhead, mhead);

	STRUCT_INIT(head, mdl);
	STRUCT_INIT(mhead, mdl);

	if (bufsz < STRUCT_SIZE(head))
		return (set_errno(EINVAL));

	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL)
		return (set_errno(EINVAL));

	if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
		mutex_exit(lock);
		return (set_errno(error));
	}
	ipc_hold(msq_svc, (kipc_perm_t *)qp);

	/*
	 * First compute the required buffer size and
	 * the number of messages on the queue.
	 */
	size = nmsg = 0;
	for (mp = list_head(&qp->msg_list); mp;
	    mp = list_next(&qp->msg_list, mp)) {
		if (msgtyp == 0 ||
		    (msgtyp > 0 && msgtyp == mp->msg_type) ||
		    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
			nmsg++;
			if (mdl == DATAMODEL_NATIVE)
				size += RND(mp->msg_size);
			else
				size += RND32(mp->msg_size);
		}
	}

	size += STRUCT_SIZE(head) + nmsg * STRUCT_SIZE(mhead);
	if (size > bufsz)
		nmsg = 0;

	if (nmsg > 0) {
		/*
		 * Mark the messages as being copied.
		 */
		snaplist = (struct msg **)kmem_alloc(nmsg *
		    sizeof (struct msg *), KM_SLEEP);
		i = 0;
		for (mp = list_head(&qp->msg_list); mp;
		    mp = list_next(&qp->msg_list, mp)) {
			if (msgtyp == 0 ||
			    (msgtyp > 0 && msgtyp == mp->msg_type) ||
			    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
				msg_hold(mp);
				snaplist[i] = mp;
				i++;
			}
		}
	}
	mutex_exit(lock);

	/*
	 * Copy out the buffer header.
	 */
	STRUCT_FSET(head, msgsnap_size, size);
	STRUCT_FSET(head, msgsnap_nmsg, nmsg);
	if (copyout(STRUCT_BUF(head), buf, STRUCT_SIZE(head)))
		error = EFAULT;

	buf += STRUCT_SIZE(head);

	/*
	 * Now copy out the messages one by one.
	 */
	for (i = 0; i < nmsg; i++) {
		mp = snaplist[i];
		if (error == 0) {
			STRUCT_FSET(mhead, msgsnap_mlen, mp->msg_size);
			STRUCT_FSET(mhead, msgsnap_mtype, mp->msg_type);
			if (copyout(STRUCT_BUF(mhead), buf, STRUCT_SIZE(mhead)))
				error = EFAULT;
			buf += STRUCT_SIZE(mhead);

			if (error == 0 &&
			    mp->msg_size != 0 &&
			    copyout(mp->msg_addr, buf, mp->msg_size))
				error = EFAULT;
			if (mdl == DATAMODEL_NATIVE)
				buf += RND(mp->msg_size);
			else
				buf += RND32(mp->msg_size);
		}
		lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
		msg_rele(mp);
		/* Check for msg q deleted or reallocated */
		if (IPC_FREE(&qp->msg_perm))
			error = EIDRM;
		mutex_exit(lock);
	}

	(void) ipc_lock(msq_svc, qp->msg_perm.ipc_id);
	ipc_rele(msq_svc, (kipc_perm_t *)qp);

	if (nmsg > 0)
		kmem_free(snaplist, nmsg * sizeof (struct msg *));

	if (error)
		return (set_errno(error));
	return (0);
}

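/*
 * Largest message size for which msgsnd() preallocates the message
 * header and text and copies the text in before taking the queue lock;
 * see the comment in msgsnd() below.
 */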
#define	MSG_PREALLOC_LIMIT 8192

/*
 * msgsnd system call.
 */
static int
msgsnd(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, int msgflg)
{
	kmsqid_t	*qp;
	kmutex_t	*lock = NULL;
	struct msg	*mp = NULL;
	long		type;
	int		error = 0, wait_wakeup = 0;
	msgq_wakeup_t   msg_entry;
	model_t		mdl = get_udatamodel();
	STRUCT_HANDLE(ipcmsgbuf, umsgp);

	CPU_STATS_ADDQ(CPU, sys, msg, 1);	/* bump msg send/rcv count */
	STRUCT_SET_HANDLE(umsgp, mdl, msgp);

	if (mdl == DATAMODEL_NATIVE) {
		if (copyin(msgp, &type, sizeof (type)))
			return (set_errno(EFAULT));
	} else {
		int32_t	type32;
		if (copyin(msgp, &type32, sizeof (type32)))
			return (set_errno(EFAULT));
		type = type32;
	}

	if (type < 1)
		return (set_errno(EINVAL));

	/*
	 * We want the value here large enough that most of the
	 * message operations will use the "lockless" path,
	 * but small enough that a user can not reserve large
	 * chunks of kernel memory unless they have a valid
	 * reason to.
	 */
	if (msgsz <= MSG_PREALLOC_LIMIT) {
		/*
		 * We are small enough that we can afford to do the
		 * allocation now.  This saves dropping the lock
		 * and then reacquiring the lock.
		 */
		mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
		mp->msg_copycnt = 1;
		mp->msg_size = msgsz;
		if (msgsz) {
			mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
			if (copyin(STRUCT_FADDR(umsgp, mtext),
			    mp->msg_addr, msgsz) == -1) {
				error = EFAULT;
				goto msgsnd_out;
			}
		}
	}

	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
		error = EINVAL;
		goto msgsnd_out;
	}

	ipc_hold(msq_svc, (kipc_perm_t *)qp);

	if (msgsz > qp->msg_qbytes) {
		error = EINVAL;
		goto msgsnd_out;
	}

	if (error = ipcperm_access(&qp->msg_perm, MSG_W, CRED()))
		goto msgsnd_out;

top:
	/*
	 * Allocate space on q, message header, & buffer space.
	 */
	ASSERT(qp->msg_qnum <= qp->msg_qmax);
	while ((msgsz > qp->msg_qbytes - qp->msg_cbytes) ||
	    (qp->msg_qnum == qp->msg_qmax)) {
		int cvres;

		if (msgflg & IPC_NOWAIT) {
			error = EAGAIN;
			goto msgsnd_out;
		}

		wait_wakeup = 0;
		qp->msg_snd_cnt++;
		msg_entry.msgw_snd_size = msgsz;
		msg_entry.msgw_thrd = curthread;
		msg_entry.msgw_type = type;
		cv_init(&msg_entry.msgw_wake_cv, NULL, 0, NULL);
		list_insert_tail(&qp->msg_wait_rcv, &msg_entry);
		if (qp->msg_snd_smallest > msgsz)
			qp->msg_snd_smallest = msgsz;
		cvres = cv_wait_sig(&msg_entry.msgw_wake_cv, lock);
		lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, lock);
		qp->msg_snd_cnt--;
		if (list_link_active(&msg_entry.msgw_list))
			list_remove(&qp->msg_wait_rcv, &msg_entry);
		if (error = msgq_check_err(qp, cvres)) {
			goto msgsnd_out;
		}
		wait_wakeup = 1;
	}

	if (mp == NULL) {
		int failure;

		mutex_exit(lock);
		ASSERT(msgsz > 0);
		mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
		mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
		mp->msg_size = msgsz;
		mp->msg_copycnt = 1;

		failure = (copyin(STRUCT_FADDR(umsgp, mtext),
		    mp->msg_addr, msgsz) == -1);
		lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
		if (IPC_FREE(&qp->msg_perm)) {
			error = EIDRM;
			goto msgsnd_out;
		}
		if (failure) {
			error = EFAULT;
			goto msgsnd_out;
		}
		goto top;
	}

	/*
	 * Everything is available, put msg on q.
	 */
	qp->msg_qnum++;
	qp->msg_cbytes += msgsz;
	qp->msg_lspid = curproc->p_pid;
	qp->msg_stime = gethrestime_sec();
	mp->msg_type = type;
	if (qp->msg_lowest_type > type)
		qp->msg_lowest_type = type;
	list_insert_tail(&qp->msg_list, mp);
	/*
	 * Get the proper receiver going.
	 */
	msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, type);

msgsnd_out:
	/*
	 * We were woken up from the send wait list, but an
	 * error occurred on placing the message onto the
	 * msg queue.  Given that, we need to do the wakeup
	 * dance again.
	 */

	if (wait_wakeup && error) {
		msg_wakeup_senders(qp);
	}
	if (lock)
		ipc_rele(msq_svc, (kipc_perm_t *)qp);	/* drops lock */

	if (error) {
		if (mp)
			msg_rele(mp);
		return (set_errno(error));
	}

	return (0);
}

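/*
 * Wake up one blocked reader that can service a message of the given
 * type (for the msg_fnd_rdr list this is simply the next copyout-queue
 * waiter).  The selection list is advanced on every call so that the
 * search rotates among receiver classes, as described in rule 4 of the
 * Big Theory statement.
 */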
static void
msg_wakeup_rdr(kmsqid_t *qp, msg_select_t **flist, long type)
{
	msg_select_t	*walker = *flist;
	msgq_wakeup_t	*wakeup;
	uint_t		msg_hash;

	msg_hash = msg_type_hash(type);

	do {
		wakeup = walker->selection(qp, msg_hash, type);
		walker = walker->next_selection;
	} while (!wakeup && walker != *flist);

	*flist = (*flist)->next_selection;
	if (wakeup) {
		if (type) {
			wakeup->msgw_snd_wake = type;
		}
		cv_signal(&wakeup->msgw_wake_cv);
	}
}

static uint_t
msg_type_hash(long msg_type)
{
	if (msg_type < 0) {
		long	hash = -msg_type / MSG_NEG_INTERVAL;
		/*
		 * Negative message types are hashed over an
		 * interval.  Any message type that hashes
		 * beyond MSG_MAX_QNUM is automatically placed
		 * in the last bucket.
		 */
		if (hash > MSG_MAX_QNUM)
			hash = MSG_MAX_QNUM;
		return (hash);
	}

	/*
	 * 0 or positive message type.  The first bucket is reserved for
	 * message receivers of type 0, the other buckets we hash into.
	 */
	if (msg_type)
		return (1 + (msg_type % MSG_MAX_QNUM));
	return (0);
}
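
/*
 * For illustration (the real constants are defined in the msg headers,
 * not here): assuming MSG_NEG_INTERVAL were 5, a request for type -7
 * would hash to negative bucket 7 / 5 == 1, and a type whose quotient
 * exceeds MSG_MAX_QNUM would be clamped into the last bucket.  A
 * positive type t always lands in bucket 1 + (t % MSG_MAX_QNUM),
 * leaving bucket 0 for type-0 receivers.
 */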

/*
 * Routines to see if we have a receiver of type 0 blocked waiting
 * for a message.  Simply return the first entry on the list.
 */

/* ARGSUSED */
static msgq_wakeup_t *
msg_fnd_any_snd(kmsqid_t *qp, int msg_hash, long type)
{
	msgq_wakeup_t	*walker;

	walker = list_head(&qp->msg_wait_snd[0]);

	if (walker)
		list_remove(&qp->msg_wait_snd[0], walker);
	return (walker);
}

/* ARGSUSED */
static msgq_wakeup_t *
msg_fnd_any_rdr(kmsqid_t *qp, int msg_hash, long type)
{
	msgq_wakeup_t	*walker;

	walker = list_head(&qp->msg_cpy_block);
	if (walker)
		list_remove(&qp->msg_cpy_block, walker);
	return (walker);
}

static msgq_wakeup_t *
msg_fnd_spc_snd(kmsqid_t *qp, int msg_hash, long type)
{
	msgq_wakeup_t	*walker;

	walker = list_head(&qp->msg_wait_snd[msg_hash]);

	while (walker && walker->msgw_type != type)
		walker = list_next(&qp->msg_wait_snd[msg_hash], walker);
	if (walker)
		list_remove(&qp->msg_wait_snd[msg_hash], walker);
	return (walker);
}

/* ARGSUSED */
static msgq_wakeup_t *
msg_fnd_neg_snd(kmsqid_t *qp, int msg_hash, long type)
{
	msgq_wakeup_t	*qptr;
	int		count;
	int		check_index;
	int		neg_index;
	int		nbuckets;

	if (!qp->msg_ngt_cnt) {
		return (NULL);
	}
	neg_index = msg_type_hash(-type);

	/*
	 * Check for a match among the negative type queues.  Any buckets
	 * at neg_index or larger can match the type.  Use the last send
	 * time to randomize the starting bucket to prevent starvation.
	 * Search all buckets from neg_index to MSG_MAX_QNUM, starting
	 * from the random starting point, and wrapping around after
	 * MSG_MAX_QNUM.
	 */

	nbuckets = MSG_MAX_QNUM - neg_index + 1;
	check_index = neg_index + (qp->msg_stime % nbuckets);

	for (count = nbuckets; count > 0; count--) {
		qptr = list_head(&qp->msg_wait_snd_ngt[check_index]);
		while (qptr) {
			/*
			 * The lowest hash bucket may actually contain
			 * message types that are not valid for this
			 * request.  This can happen because each
			 * bucket actually covers a consecutive range
			 * of types.
			 */
			if (-qptr->msgw_type >= type) {
				list_remove(&qp->msg_wait_snd_ngt[check_index],
				    qptr);
				return (qptr);
			}
			qptr = list_next(&qp->msg_wait_snd_ngt[check_index],
			    qptr);
		}
		if (++check_index > MSG_MAX_QNUM) {
			check_index = neg_index;
		}
	}
	return (NULL);
}

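/*
 * Place the calling thread on the given wait list and sleep until it
 * is signalled or interrupted.  Returns the cv_wait_sig() result with
 * the queue lock reacquired; the caller must still check whether the
 * queue was removed while we slept.
 */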
static int
msg_rcvq_sleep(list_t *queue, msgq_wakeup_t *entry, kmutex_t **lock,
    kmsqid_t *qp)
{
	int		cvres;

	cv_init(&entry->msgw_wake_cv, NULL, 0, NULL);

	list_insert_tail(queue, entry);

	qp->msg_rcv_cnt++;
	cvres = cv_wait_sig(&entry->msgw_wake_cv, *lock);
	*lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, *lock);
	qp->msg_rcv_cnt--;

	if (list_link_active(&entry->msgw_list)) {
		/*
		 * We woke up unexpectedly; remove ourselves.
		 */
		list_remove(queue, entry);
	}

	return (cvres);
}

static void
msg_rcvq_wakeup_all(list_t *q_ptr)
{
	msgq_wakeup_t	*q_walk;

	while (q_walk = list_head(q_ptr)) {
		list_remove(q_ptr, q_walk);
		cv_signal(&q_walk->msgw_wake_cv);
	}
}

/*
 * msgsys - System entry point for msgctl, msgget, msgrcv, and msgsnd
 * system calls.
 */
static ssize_t
msgsys(int opcode, uintptr_t a1, uintptr_t a2, uintptr_t a3,
	uintptr_t a4, uintptr_t a5)
{
	ssize_t error;

	switch (opcode) {
	case MSGGET:
		error = msgget((key_t)a1, (int)a2);
		break;
	case MSGCTL:
		error = msgctl((int)a1, (int)a2, (void *)a3);
		break;
	case MSGRCV:
		error = msgrcv((int)a1, (struct ipcmsgbuf *)a2,
		    (size_t)a3, (long)a4, (int)a5);
		break;
	case MSGSND:
		error = msgsnd((int)a1, (struct ipcmsgbuf *)a2,
		    (size_t)a3, (int)a4);
		break;
	case MSGIDS:
		error = msgids((int *)a1, (uint_t)a2, (uint_t *)a3);
		break;
	case MSGSNAP:
		error = msgsnap((int)a1, (caddr_t)a2, (size_t)a3, (long)a4);
		break;
	default:
		error = set_errno(EINVAL);
		break;
	}

	return (error);
}

/*
 * Determine if a writer who is waiting can process its message.  If so,
 * wake it up.
 */
static void
msg_wakeup_senders(kmsqid_t *qp)
{
	struct msgq_wakeup *ptr, *optr;
	size_t avail, smallest;
	int msgs_out;

	/*
	 * Is there a writer waiting, and if so, can it be serviced?  If
	 * not, return to the caller.
	 */
	if (IPC_FREE(&qp->msg_perm) || qp->msg_qnum >= qp->msg_qmax)
		return;

	avail = qp->msg_qbytes - qp->msg_cbytes;
	if (avail < qp->msg_snd_smallest)
		return;

	ptr = list_head(&qp->msg_wait_rcv);
	if (ptr == NULL) {
		qp->msg_snd_smallest = MSG_SMALL_INIT;
		return;
	}
	optr = ptr;

	/*
	 * smallest:	minimum message size of all queued writers
	 *
	 * avail:	amount of space left on the msgq
	 *		if all the writers we have woken up are successful.
	 *
	 * msgs_out:	the number of messages on the message queue if
	 *		all the writers we have woken up are successful.
	 */

	smallest = MSG_SMALL_INIT;
	msgs_out = qp->msg_qnum;
	while (ptr) {
		ptr = list_next(&qp->msg_wait_rcv, ptr);
		if (optr->msgw_snd_size <= avail) {
			list_remove(&qp->msg_wait_rcv, optr);
			avail -= optr->msgw_snd_size;
			cv_signal(&optr->msgw_wake_cv);
			msgs_out++;
			if (msgs_out == qp->msg_qmax ||
			    avail < qp->msg_snd_smallest)
				break;
		} else {
			if (smallest > optr->msgw_snd_size)
				smallest = optr->msgw_snd_size;
		}
		optr = ptr;
	}

	/*
	 * Reset the smallest message size if the entire list has been visited
	 */
	if (ptr == NULL && smallest != MSG_SMALL_INIT)
		qp->msg_snd_smallest = smallest;
}

#ifdef	_SYSCALL32_IMPL
/*
 * msgsys32 - System entry point for msgctl, msgget, msgrcv, and msgsnd
 * system calls for 32-bit callers on LP64 kernel.
 */
static ssize32_t
msgsys32(int opcode, uint32_t a1, uint32_t a2, uint32_t a3,
	uint32_t a4, uint32_t a5)
{
	ssize_t error;

	switch (opcode) {
	case MSGGET:
		error = msgget((key_t)a1, (int)a2);
		break;
	case MSGCTL:
		error = msgctl((int)a1, (int)a2, (void *)(uintptr_t)a3);
		break;
	case MSGRCV:
		error = msgrcv((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
		    (size_t)a3, (long)(int32_t)a4, (int)a5);
		break;
	case MSGSND:
		error = msgsnd((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
		    (size_t)(int32_t)a3, (int)a4);
		break;
	case MSGIDS:
		error = msgids((int *)(uintptr_t)a1, (uint_t)a2,
		    (uint_t *)(uintptr_t)a3);
		break;
	case MSGSNAP:
		error = msgsnap((int)a1, (caddr_t)(uintptr_t)a2, (size_t)a3,
		    (long)(int32_t)a4);
		break;
	default:
		error = set_errno(EINVAL);
		break;
	}

	return (error);
}
#endif	/* _SYSCALL32_IMPL */