OpenSolaris_b135/uts/common/io/bufmod.c

Compare this file to the similar file:
Show the results in this format:

/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

/*
 * STREAMS Buffering module
 *
 * This streams module collects incoming messages from modules below
 * it on the stream and buffers them up into a smaller number of
 * aggregated messages.  Its main purpose is to reduce overhead by
 * cutting down on the number of read (or getmsg) calls its client
 * user process makes.
 *  - only M_DATA is buffered.
 *  - multithreading assumes configured as D_MTQPAIR
 *  - packets are lost only if flag SB_NO_HEADER is clear and buffer
 *    allocation fails.
 *  - in order message transmission. This is enforced for messages other
 *    than high priority messages.
 *  - zero length messages on the read side are not passed up the
 *    stream but used internally for synchronization.
 * FLAGS:
 * - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
 *   (conversion is the default for backwards compatibility
 *    hence the negative logic).
 * - SB_NO_HEADER - no headers in buffered data.
 *   (adding headers is the default for backwards compatibility
 *    hence the negative logic).
 * - SB_DEFER_CHUNK - provides improved response time in question-answer
 *   applications. Buffering is not enabled until the second message
 *   is received on the read side within the sb_ticks interval.
 *   This option will often be used in combination with flag SB_SEND_ON_WRITE.
 * - SB_SEND_ON_WRITE - a write message results in any pending buffered read
 *   data being immediately sent upstream.
 * - SB_NO_DROPS - bufmod behaves transparently in flow control and propagates
 *   the blocked flow condition downstream. If this flag is clear (default)
 *   messages will be dropped if the upstream flow is blocked.
 */


#include	<sys/types.h>
#include	<sys/errno.h>
#include	<sys/debug.h>
#include	<sys/stropts.h>
#include	<sys/time.h>
#include	<sys/stream.h>
#include	<sys/conf.h>
#include	<sys/ddi.h>
#include	<sys/sunddi.h>
#include	<sys/kmem.h>
#include	<sys/strsun.h>
#include	<sys/bufmod.h>
#include	<sys/modctl.h>
#include	<sys/isa_defs.h>

/*
 * Per-Stream state information.
 *
 * If sb_ticks is negative, we don't deliver chunks until they're
 * full.  If it's zero, we deliver every packet as it arrives.  (In
 * this case we force sb_chunk to zero, to make the implementation
 * easier.)  Otherwise, sb_ticks gives the number of ticks in a
 * buffering interval.  The interval begins when a read-side data
 * message is received and a timeout is not active.  If sb_snap is
 * zero, no truncation of the msg is done.
 *
 * sb_mp, sb_head and sb_tail are always reset to NULL together when a
 * chunk is closed or flushed.  sb_tail is maintained only while
 * SB_NO_HEADER is set; sb_head only while it is clear.
 */
struct sb {
	queue_t	*sb_rq;		/* our rq */
	mblk_t	*sb_mp;		/* partial chunk (first mblk of the chain) */
	mblk_t  *sb_head;	/* pre-allocated space for the next header */
	mblk_t	*sb_tail;	/* first mblk of last message appended */
	uint_t	sb_mlen;	/* sb_mp length, incl. headers and padding */
	uint_t	sb_mcount;	/* input msg count in sb_mp */
	uint_t	sb_chunk;	/* max chunk size; 0 disables chunking */
	clock_t	sb_ticks;	/* timeout interval (<0 none, 0 immediate) */
	timeout_id_t sb_timeoutid; /* qtimeout() id, 0 when no timer pending */
	uint_t	sb_drops;	/* cumulative # discarded msgs */
	uint_t	sb_snap;	/* snapshot length; 0 means no truncation */
	uint_t	sb_flags;	/* flags field (SB_* option bits) */
	uint_t	sb_state;	/* state variable (SB_FRCVD) */
};

/*
 * Function prototypes.
 *
 * Every file-local routine is declared here so that definition order
 * within the file does not matter.
 */
static	int	sbopen(queue_t *, dev_t *, int, int, cred_t *);
static	int	sbclose(queue_t *, int, cred_t *);
static	void	sbwput(queue_t *, mblk_t *);
static	void	sbrput(queue_t *, mblk_t *);
static	void	sbrsrv(queue_t *);
static	void	sbioc(queue_t *, mblk_t *);
static	void	sbioctl(queue_t *, mblk_t *);
static	void	sbaddmsg(queue_t *, mblk_t *);
static	void	sbtick(void *);
static	void	sbclosechunk(struct sb *);
static	void	sbsendit(queue_t *, mblk_t *);

/*
 * Module identification and queue limits, shared by the read and
 * write qinit structures below.
 */
static struct module_info	sb_minfo = {
	21,		/* mi_idnum */
	"bufmod",	/* mi_idname */
	0,		/* mi_minpsz */
	INFPSZ,		/* mi_maxpsz */
	1,		/* mi_hiwat */
	0		/* mi_lowat */
};

/*
 * Read-side queue entry points.  The put and service routines are
 * declared void in this file, hence the casts to int (*)().
 * Open and close are supplied on the read side only.
 */
static struct qinit	sb_rinit = {
	(int (*)())sbrput,	/* qi_putp */
	(int (*)())sbrsrv,	/* qi_srvp */
	sbopen,			/* qi_qopen */
	sbclose,		/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};

/*
 * Write-side queue entry points.  Only a put procedure is provided;
 * there is no write service procedure.
 */
static struct qinit	sb_winit = {
	(int (*)())sbwput,	/* qi_putp */
	NULL,			/* qi_srvp */
	NULL,			/* qi_qopen */
	NULL,			/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};

/*
 * Stream table tying the read and write qinit structures together.
 * The multiplexor entries are unused.
 */
static struct streamtab	sb_info = {
	&sb_rinit,	/* st_rdinit */
	&sb_winit,	/* st_wrinit */
	NULL,		/* st_muxrinit */
	NULL		/* st_muxwinit */
};


/*
 * This is the loadable module wrapper.
 *
 * The module is registered under the name "bufmod" with the
 * D_MTQPAIR | D_MP flags; the rest of this file assumes the
 * D_MTQPAIR configuration (see the block comment at the top).
 */

static struct fmodsw fsw = {
	"bufmod",
	&sb_info,
	D_MTQPAIR | D_MP
};

/*
 * Module linkage information for the kernel.
 *
 * Describes this object as a STREAMS module (mod_strmodops) and
 * points at the fmodsw entry above.
 */

static struct modlstrmod modlstrmod = {
	&mod_strmodops, "streams buffer mod", &fsw
};

/*
 * Top-level linkage handed to mod_install(), mod_remove() and
 * mod_info(); the single linkage element list is NULL-terminated.
 */
static struct modlinkage modlinkage = {
	MODREV_1, &modlstrmod, NULL
};


/*
 * Loadable-module entry point: install this module's linkage.
 * Returns the result of mod_install() unchanged.
 */
int
_init(void)
{
	int	rv;

	rv = mod_install(&modlinkage);
	return (rv);
}

/*
 * Loadable-module entry point: remove this module's linkage.
 * Returns the result of mod_remove() unchanged.
 */
int
_fini(void)
{
	int	rv;

	rv = mod_remove(&modlinkage);
	return (rv);
}

/*
 * Loadable-module entry point: report module information into
 * *modinfop.  Returns the result of mod_info() unchanged.
 */
int
_info(struct modinfo *modinfop)
{
	int	rv;

	rv = mod_info(&modlinkage, modinfop);
	return (rv);
}


/*
 * Module open routine.
 *
 * Only module opens (sflag == MODOPEN) are accepted; anything else
 * fails with EINVAL.  If the queue already carries private state the
 * open is a no-op.  Otherwise a per-Stream sb structure is allocated,
 * given its defaults (no timeout, SB_DFLT_CHUNK chunk size, everything
 * else cleared), attached to both queues of the pair, and the queue
 * procedures are switched on.
 *
 * Returns 0 on success or EINVAL for a non-module open.
 */
/* ARGSUSED */
static int
sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
{
	struct sb	*state;

	ASSERT(rq);

	if (sflag != MODOPEN)
		return (EINVAL);

	/* Re-open of an already initialized queue pair. */
	if (rq->q_ptr != NULL)
		return (0);

	state = kmem_alloc(sizeof (struct sb), KM_SLEEP);

	/* Buffering parameters. */
	state->sb_chunk = SB_DFLT_CHUNK;
	state->sb_ticks = -1;
	state->sb_snap = 0;
	state->sb_flags = 0;
	state->sb_state = 0;

	/* Empty chunk, no timer pending, nothing dropped yet. */
	state->sb_mp = NULL;
	state->sb_head = NULL;
	state->sb_tail = NULL;
	state->sb_mlen = 0;
	state->sb_mcount = 0;
	state->sb_timeoutid = 0;
	state->sb_drops = 0;

	/* Both halves of the queue pair share the same state. */
	state->sb_rq = rq;
	WR(rq)->q_ptr = state;
	rq->q_ptr = state;

	qprocson(rq);

	return (0);
}

/*
 * Module close routine.
 *
 * Turns the queue procedures off, cancels any pending chunk timeout,
 * discards a partially built chunk without sending it upstream, and
 * releases the per-Stream state.  Always returns 0.
 */
/* ARGSUSED1 */
static int
sbclose(queue_t *rq, int flag, cred_t *credp)
{
	struct sb	*sbp;

	sbp = (struct sb *)rq->q_ptr;
	ASSERT(sbp);

	qprocsoff(rq);

	/* Stop the buffering timer if one is outstanding. */
	if (sbp->sb_timeoutid != 0) {
		(void) quntimeout(rq, sbp->sb_timeoutid);
		sbp->sb_timeoutid = 0;
	}

	/* Throw away whatever has been accumulated so far. */
	if (sbp->sb_mp != NULL) {
		freemsg(sbp->sb_mp);
		sbp->sb_mp = NULL;
		sbp->sb_head = NULL;
		sbp->sb_tail = NULL;
		sbp->sb_mlen = 0;
	}

	/* Release the state and detach it from both queues. */
	kmem_free((caddr_t)sbp, sizeof (struct sb));
	WR(rq)->q_ptr = NULL;
	rq->q_ptr = NULL;

	return (0);
}

/*
 * Stream head high/low water marks derived from a message size.
 *
 * The correction factor ("fudge") is introduced to compensate for
 * whatever assumptions the modules below have made about
 * how much traffic is flowing through the stream and the fact
 * that bufmod may be snipping messages with the sb_snap length.
 *
 * Both arguments are fully parenthesized so that expression arguments
 * (e.g. "a + b") are scaled as a whole.
 */
#define	SNIT_HIWAT(msgsize, fudge)	((4 * (msgsize) * (fudge)) + 512)
#define	SNIT_LOWAT(msgsize, fudge)	((2 * (msgsize) * (fudge)) + 256)


/*
 * Complete a transparent ioctl once its M_IOCDATA response arrives.
 *
 * For the "get" commands the copyout has already happened, so the
 * ioctl is simply acknowledged.  For the "set" commands the copied-in
 * argument is read from mp->b_cont and applied to the per-Stream
 * state exactly as the non-transparent path in sbioctl() does.
 * The message is always consumed (acked or nacked), except for an
 * unknown command, which trips an ASSERT.
 */
static void
sbioc(queue_t *wq, mblk_t *mp)
{
	struct sb		*sbp = (struct sb *)wq->q_ptr;
	struct iocblk		*iocp = (struct iocblk *)mp->b_rptr;
	struct stroptions	*sop;
	mblk_t			*optmp;
	clock_t			nticks;
	uint_t			val;
	int			fudge;

	switch (iocp->ioc_cmd) {
	case SBIOCGCHUNK:
	case SBIOCGSNAP:
	case SBIOCGFLAGS:
	case SBIOCGTIME:
		/* Nothing left to do but finish the ioctl. */
		miocack(wq, mp, 0, 0);
		break;

	case SBIOCSTIME:
#ifdef _SYSCALL32_IMPL
		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
			struct timeval32 *tv32;

			tv32 = (struct timeval32 *)mp->b_cont->b_rptr;
			if (tv32->tv_sec < 0 || tv32->tv_usec < 0) {
				miocnak(wq, mp, 0, EINVAL);
				return;
			}
			nticks = TIMEVAL_TO_TICK(tv32);
		} else
#endif /* _SYSCALL32_IMPL */
		{
			struct timeval *tv;

			tv = (struct timeval *)mp->b_cont->b_rptr;
			if (tv->tv_sec < 0 || tv->tv_usec < 0) {
				miocnak(wq, mp, 0, EINVAL);
				return;
			}
			nticks = TIMEVAL_TO_TICK(tv);
		}

		/* A zero interval also switches chunking off. */
		sbp->sb_ticks = nticks;
		if (nticks == 0)
			sbp->sb_chunk = 0;
		miocack(wq, mp, 0, 0);
		sbclosechunk(sbp);
		break;

	case SBIOCSCHUNK:
		val = *(uint_t *)mp->b_cont->b_rptr;

		/*
		 * Set up hi/lo water marks on the stream head read queue.
		 * An allocation failure is unlikely and merely skips the
		 * adjustment.  Fix at later date.
		 */
		optmp = allocb(sizeof (struct stroptions), BPRI_MED);
		if (optmp != NULL) {
			optmp->b_datap->db_type = M_SETOPTS;
			optmp->b_wptr += sizeof (struct stroptions);
			sop = (struct stroptions *)optmp->b_rptr;
			sop->so_flags = SO_HIWAT | SO_LOWAT;
			sop->so_hiwat = SNIT_HIWAT(val, 1);
			sop->so_lowat = SNIT_LOWAT(val, 1);
			qreply(wq, optmp);
		}

		sbp->sb_chunk = val;
		miocack(wq, mp, 0, 0);
		sbclosechunk(sbp);
		break;

	case SBIOCSFLAGS:
		sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		break;

	case SBIOCSSNAP:
		val = *(uint_t *)mp->b_cont->b_rptr;

		/*
		 * When chunking, don't worry about the effect that
		 * snipping messages has on stream head flow control: it
		 * has a relatively small bearing on the data rate onto
		 * the stream head.  Only when not chunking are the
		 * stream head water marks adjusted (best effort, as above).
		 */
		if (sbp->sb_chunk == 0 &&
		    (optmp = allocb(sizeof (struct stroptions),
		    BPRI_MED)) != NULL) {
			optmp->b_datap->db_type = M_SETOPTS;
			optmp->b_wptr += sizeof (struct stroptions);
			sop = (struct stroptions *)optmp->b_rptr;
			sop->so_flags = SO_HIWAT | SO_LOWAT;

			/* Smaller snapshots get a larger correction. */
			if (val <= 100)
				fudge = 4;
			else if (val <= 400)
				fudge = 2;
			else
				fudge = 1;

			sop->so_hiwat = SNIT_HIWAT(val, fudge);
			sop->so_lowat = SNIT_LOWAT(val, fudge);
			qreply(wq, optmp);
		}

		sbp->sb_snap = val;
		miocack(wq, mp, 0, 0);
		break;

	default:
		/* sbwput() only forwards the commands handled above. */
		ASSERT(0);
		break;
	}
}

/*
 * Write-side put procedure.
 *
 * If SB_SEND_ON_WRITE is set, any write-side message first pushes the
 * pending read-side chunk upstream.  M_IOCTL messages are handed to
 * sbioctl().  M_IOCDATA messages that report a failure are freed;
 * successful ones for this module's own commands go to sbioc().
 * Everything else is passed on downstream untouched.
 */
static void
sbwput(queue_t *wq, mblk_t *mp)
{
	struct sb	*sbp = (struct sb *)wq->q_ptr;
	struct copyresp	*cresp;

	if (sbp->sb_flags & SB_SEND_ON_WRITE)
		sbclosechunk(sbp);

	if (mp->b_datap->db_type == M_IOCTL) {
		sbioctl(wq, mp);
		return;
	}

	if (mp->b_datap->db_type != M_IOCDATA) {
		putnext(wq, mp);
		return;
	}

	cresp = (struct copyresp *)mp->b_rptr;
	if (cresp->cp_rval) {
		/* Copy request failed: just free the message. */
		freemsg(mp);
		return;
	}

	switch (cresp->cp_cmd) {
	case SBIOCSTIME:
	case SBIOCGTIME:
	case SBIOCSCHUNK:
	case SBIOCGCHUNK:
	case SBIOCSSNAP:
	case SBIOCGSNAP:
	case SBIOCSFLAGS:
	case SBIOCGFLAGS:
		/* One of ours: finish the transparent ioctl. */
		sbioc(wq, mp);
		break;

	default:
		/* Belongs to some other module or driver. */
		putnext(wq, mp);
		break;
	}
}

/*
 * Read-side put procedure.
 *
 * Dispatches on message type:
 *  - M_PROTO is sent up on its own (after closing the chunk) when
 *    SB_NO_PROTO_CVT is set; otherwise it is relabelled M_DATA and
 *    treated as data.
 *  - M_DATA is either added to the current chunk, or, under
 *    SB_DEFER_CHUNK when no first message has been seen yet, sent up
 *    immediately.  A buffering timer is started if one is configured
 *    and none is running.
 *  - M_FLUSH with FLUSHR discards the chunk in progress, cancels the
 *    timer and flushes the read queue before being passed up.
 *  - A zero-length M_CTL is the timer notification from sbtick() and
 *    closes the chunk; any other M_CTL closes the chunk and is sent up.
 *  - Other types at or below QPCTL close the chunk and follow it
 *    upstream; types above QPCTL bypass the chunk entirely.
 */
static void
sbrput(queue_t *rq, mblk_t *mp)
{
	struct sb	*sbp = (struct sb *)rq->q_ptr;
	unsigned char	type;
	int		is_tick;

	ASSERT(sbp);

	type = mp->b_datap->db_type;

	if (type == M_PROTO) {
		if (sbp->sb_flags & SB_NO_PROTO_CVT) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
			return;
		}
		/* Convert M_PROTO to M_DATA and handle it as data. */
		mp->b_datap->db_type = M_DATA;
		type = M_DATA;
	}

	switch (type) {
	case M_DATA:
		if (!(sbp->sb_flags & SB_DEFER_CHUNK) ||
		    (sbp->sb_state & SB_FRCVD)) {
			sbaddmsg(rq, mp);
		} else {
			/* First message of an exchange: don't buffer it. */
			sbclosechunk(sbp);
			sbsendit(rq, mp);
			sbp->sb_state |= SB_FRCVD;
		}

		/* Start a buffering interval unless one is in progress. */
		if (sbp->sb_ticks > 0 && !(sbp->sb_timeoutid)) {
			sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
			    sbp, sbp->sb_ticks);
		}
		return;

	case M_FLUSH:
		if (*mp->b_rptr & FLUSHR) {
			/*
			 * Reset timeout, flush the chunk currently in
			 * progress, and start a new chunk.
			 */
			if (sbp->sb_timeoutid) {
				(void) quntimeout(sbp->sb_rq,
				    sbp->sb_timeoutid);
				sbp->sb_timeoutid = 0;
			}
			if (sbp->sb_mp != NULL) {
				freemsg(sbp->sb_mp);
				sbp->sb_mp = NULL;
				sbp->sb_head = NULL;
				sbp->sb_tail = NULL;
				sbp->sb_mlen = 0;
				sbp->sb_mcount = 0;
			}
			flushq(rq, FLUSHALL);
		}
		putnext(rq, mp);
		return;

	case M_CTL:
		/*
		 * Zero-length M_CTL means our timeout() popped; it is
		 * consumed here.  Either way the chunk gets closed.
		 */
		is_tick = (MBLKL(mp) == 0);
		if (is_tick)
			freemsg(mp);
		sbclosechunk(sbp);
		if (!is_tick)
			sbsendit(rq, mp);
		return;

	default:
		if (type > QPCTL) {
			/* Note: out of band */
			putnext(rq, mp);
			return;
		}
		sbclosechunk(sbp);
		sbsendit(rq, mp);
		return;
	}
}

/*
 * Read service procedure.
 *
 * Drains messages that sbsendit() had to queue, forwarding each one
 * upstream.  Draining stops, with the message put back on the queue,
 * as soon as upstream is flow controlled.
 */
/* ARGSUSED */
static void
sbrsrv(queue_t *rq)
{
	mblk_t	*mp;

	for (;;) {
		mp = getq(rq);
		if (mp == NULL)
			break;

		/*
		 * High priority messages shouldn't get here but if
		 * one does, jam it through to avoid infinite loop.
		 */
		if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
			/* should only get here if SB_NO_DROPS */
			(void) putbq(rq, mp);
			break;
		}

		putnext(rq, mp);
	}
}

/*
 * Handle write-side M_IOCTL messages.
 *
 * Each SBIOC* command may arrive either as a transparent ioctl
 * (ioc_count == TRANSPARENT) or as an I_STR ioctl carrying its
 * argument in mp->b_cont:
 *
 *  - "Set" commands, transparent: a copyin request is sent upstream
 *    with mcopyin()/qreply(); the work is finished in sbioc() when the
 *    M_IOCDATA response comes back through sbwput().
 *  - "Set" commands, non-transparent: the argument length is verified
 *    with miocpullup(), the value is applied and the ioctl is acked.
 *  - "Get" commands, transparent: a data block is allocated, filled in
 *    and a copyout request is sent upstream with mcopyout()/qreply().
 *  - "Get" commands, non-transparent: the value is written into the
 *    caller-supplied b_cont and returned with miocack().
 *
 * Unrecognized ioctls are passed downstream.
 */
static void
sbioctl(queue_t *wq, mblk_t *mp)
{
	struct	sb	*sbp = (struct sb *)wq->q_ptr;
	struct iocblk	*iocp = (struct iocblk *)mp->b_rptr;
	struct	timeval	*t;
	clock_t	ticks;
	mblk_t	*mop;
	/* ioc_count as seen on entry; compared against TRANSPARENT below */
	int	transparent = iocp->ioc_count;
	mblk_t	*datamp;
	int	error;

	switch (iocp->ioc_cmd) {
	case SBIOCSTIME:
		if (iocp->ioc_count == TRANSPARENT) {
			/*
			 * Request a copyin of the timeval, sized for the
			 * caller's data model.
			 */
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				mcopyin(mp, NULL, sizeof (struct timeval32),
				    NULL);
			} else
#endif /* _SYSCALL32_IMPL */
			{
				mcopyin(mp, NULL, sizeof (*t), NULL);
			}
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				struct timeval32 *t32;

				error = miocpullup(mp,
				    sizeof (struct timeval32));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}
				t32 = (struct timeval32 *)mp->b_cont->b_rptr;
				/* Negative intervals are rejected. */
				if (t32->tv_sec < 0 || t32->tv_usec < 0) {
					miocnak(wq, mp, 0, EINVAL);
					break;
				}
				ticks = TIMEVAL_TO_TICK(t32);
			} else
#endif /* _SYSCALL32_IMPL */
			{
				error = miocpullup(mp, sizeof (struct timeval));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}

				t = (struct timeval *)mp->b_cont->b_rptr;
				/* Negative intervals are rejected. */
				if (t->tv_sec < 0 || t->tv_usec < 0) {
					miocnak(wq, mp, 0, EINVAL);
					break;
				}
				ticks = TIMEVAL_TO_TICK(t);
			}
			sbp->sb_ticks = ticks;
			/* A zero interval also switches chunking off. */
			if (ticks == 0)
				sbp->sb_chunk = 0;
			miocack(wq, mp, 0, 0);
			/* Push up whatever was buffered under the old setting. */
			sbclosechunk(sbp);
		}
		break;

	case SBIOCGTIME: {
		/* NOTE: shadows the function-scope 't' declared above. */
		struct timeval *t;

		/*
		 * Verify argument length.
		 *
		 * NOTE: when _SYSCALL32_IMPL is defined, the "else" below
		 * binds only to the single miocpullup() statement after
		 * the #endif.  The error check that follows it is reached
		 * on both paths; on the 32-bit path 'error' is already
		 * known to be zero there, so the check is a no-op.
		 */
		if (transparent != TRANSPARENT) {
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				error = miocpullup(mp,
				    sizeof (struct timeval32));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}
			} else
#endif /* _SYSCALL32_IMPL */
			error = miocpullup(mp, sizeof (struct timeval));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		/*
		 * If infinite timeout, return range error
		 * for the ioctl.
		 */
		if (sbp->sb_ticks < 0) {
			miocnak(wq, mp, 0, ERANGE);
			break;
		}

#ifdef _SYSCALL32_IMPL
		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
			struct timeval32 *t32;

			if (transparent == TRANSPARENT) {
				/* Build the copyout request. */
				datamp = allocb(sizeof (*t32), BPRI_MED);
				if (datamp == NULL) {
					miocnak(wq, mp, 0, EAGAIN);
					break;
				}
				mcopyout(mp, NULL, sizeof (*t32), NULL, datamp);
			}

			/* Convert the interval into the reply buffer. */
			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
			TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);

			if (transparent == TRANSPARENT)
				qreply(wq, mp);
			else
				miocack(wq, mp, sizeof (*t32), 0);
		} else
#endif /* _SYSCALL32_IMPL */
		{
			if (transparent == TRANSPARENT) {
				/* Build the copyout request. */
				datamp = allocb(sizeof (*t), BPRI_MED);
				if (datamp == NULL) {
					miocnak(wq, mp, 0, EAGAIN);
					break;
				}
				mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
			}

			/* Convert the interval into the reply buffer. */
			t = (struct timeval *)mp->b_cont->b_rptr;
			TICK_TO_TIMEVAL(sbp->sb_ticks, t);

			if (transparent == TRANSPARENT)
				qreply(wq, mp);
			else
				miocack(wq, mp, sizeof (*t), 0);
		}
		break;
	}

	case SBIOCCTIME:
		/* Clear the timeout: negative means "deliver when full". */
		sbp->sb_ticks = -1;
		miocack(wq, mp, 0, 0);
		break;

	case SBIOCSCHUNK:
		if (iocp->ioc_count == TRANSPARENT) {
			/* Completed in sbioc() once the data is copied in. */
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}

			/*
			 * set up hi/lo water marks on stream head read queue.
			 * unlikely to run out of resources. Fix at later date.
			 * (On allocation failure the adjustment is skipped.)
			 */
			if ((mop = allocb(sizeof (struct stroptions),
			    BPRI_MED)) != NULL) {
				struct stroptions *sop;
				uint_t chunk;

				chunk = *(uint_t *)mp->b_cont->b_rptr;
				mop->b_datap->db_type = M_SETOPTS;
				mop->b_wptr += sizeof (struct stroptions);
				sop = (struct stroptions *)mop->b_rptr;
				sop->so_flags = SO_HIWAT | SO_LOWAT;
				sop->so_hiwat = SNIT_HIWAT(chunk, 1);
				sop->so_lowat = SNIT_LOWAT(chunk, 1);
				qreply(wq, mop);
			}

			sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
			miocack(wq, mp, 0, 0);
			/* Push up whatever was buffered under the old size. */
			sbclosechunk(sbp);
		}
		break;

	case SBIOCGCHUNK:
		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			/* Build the copyout request. */
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	case SBIOCSSNAP:
		if (iocp->ioc_count == TRANSPARENT) {
			/* Completed in sbioc() once the data is copied in. */
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}

			/*
			 * If chunking, don't worry about the effects of
			 * snipping of message size on head flow control
			 * since it has a relatively small bearing on the
			 * data rate onto the stream head.
			 */
			if (!sbp->sb_chunk) {
				/*
				 * set up hi/lo water marks on stream
				 * head read queue.  unlikely to run out
				 * of resources. Fix at later date.
				 */
				if ((mop = allocb(sizeof (struct stroptions),
				    BPRI_MED)) != NULL) {
					struct stroptions *sop;
					uint_t snap;
					int fudge;

					snap = *(uint_t *)mp->b_cont->b_rptr;
					mop->b_datap->db_type = M_SETOPTS;
					mop->b_wptr += sizeof (*sop);
					sop = (struct stroptions *)mop->b_rptr;
					sop->so_flags = SO_HIWAT | SO_LOWAT;
					/* smaller snapshots, larger factor */
					fudge = (snap <= 100) ? 4 :
					    (snap <= 400) ? 2 : 1;
					sop->so_hiwat = SNIT_HIWAT(snap, fudge);
					sop->so_lowat = SNIT_LOWAT(snap, fudge);
					qreply(wq, mop);
				}
			}

			sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;

			miocack(wq, mp, 0, 0);
		}
		break;

	case SBIOCGSNAP:
		/*
		 * Verify argument length
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			/* Build the copyout request. */
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	case SBIOCSFLAGS:
		/*
		 * set the flags.
		 * The current chunk is not closed here; the new flags
		 * take effect from the next message processed.
		 */
		if (iocp->ioc_count == TRANSPARENT) {
			/* Completed in sbioc() once the data is copied in. */
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
			sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
			miocack(wq, mp, 0, 0);
		}
		break;

	case SBIOCGFLAGS:
		/*
		 * Verify argument length
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			/* Build the copyout request. */
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;


	default:
		/* Not one of ours: let the modules below handle it. */
		putnext(wq, mp);
		break;
	}
}

/*
 * Given a length l, calculate the amount of extra storage
 * required to round it up to the next multiple of the alignment a.
 * Yields 0 when l is already a multiple of a.
 */
#define	RoundUpAmt(l, a)	((l) % (a) ? (a) - ((l) % (a)) : 0)
/*
 * Calculate additional amount of space required for alignment.
 */
#define	Align(l)		RoundUpAmt(l, sizeof (ulong_t))
/*
 * Smallest possible message size when headers are enabled.
 * This is used to calculate whether a chunk is nearly full.
 * The expansion is parenthesized so the macro behaves as a single
 * value in any surrounding expression.
 */
#define	SMALLEST_MESSAGE	(sizeof (struct sb_hdr) + _POINTER_ALIGNMENT)

/*
 * Process a read-side M_DATA message.
 *
 * If the currently accumulating chunk doesn't have enough room
 * for the message, close off the chunk, pass it upward, and start
 * a new one.  Then add the message to the current chunk, taking
 * account of the possibility that the message's size exceeds the
 * chunk size.
 *
 * If headers are enabled add an sb_hdr header and trailing alignment padding.
 *
 * To optimise performance the total number of msgbs should be kept
 * to a minimum. This is achieved by using any remaining space in message N
 * for both its own padding as well as the header of message N+1 if possible.
 * If there's insufficient space we allocate one message to hold this 'wrapper'.
 * (there's likely to be space beyond message N, since allocb would have
 * rounded up the required size to one of the dblk_sizes).
 *
 * The SB_NO_HEADER flag can be changed by ioctl while a chunk is open.
 * The two modes keep different bookkeeping (sb_tail without headers,
 * sb_head with headers), so a chunk that was started in the other mode
 * is closed and sent up before this message is processed.  Without
 * that, the no-header path would pass a NULL sb_tail to linkb() and the
 * header path would overwrite sb_mp, losing the open chunk.
 *
 * rq is our read queue; mp is the message, which is always consumed
 * (linked into the chunk, sent upstream, or freed on allocation failure).
 */
static void
sbaddmsg(queue_t *rq, mblk_t *mp)
{
	struct sb	*sbp;
	struct timeval	t;
	struct sb_hdr	hp;
	mblk_t *wrapper;	/* padding for msg N, header for msg N+1 */
	mblk_t *last;		/* last mblk of current message */
	size_t wrapperlen;	/* length of header + padding */
	size_t origlen;		/* data length before truncation */
	size_t pad;		/* bytes required to align header */

	sbp = (struct sb *)rq->q_ptr;

	origlen = msgdsize(mp);

	/*
	 * Truncate the message to the snapshot length, trimming from the
	 * tail (negative length).  If adjmsg() fails the message is left
	 * at its original length.
	 */
	if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
	    (adjmsg(mp, -(ssize_t)(origlen - sbp->sb_snap)) == 1))
		hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
	else
		hp.sbh_totlen = hp.sbh_msglen = origlen;

	if (sbp->sb_flags & SB_NO_HEADER) {

		/*
		 * A chunk that was started while headers were enabled
		 * has no sb_tail.  Send it up as it is and start afresh.
		 */
		if (sbp->sb_mp != NULL && sbp->sb_tail == NULL)
			sbclosechunk(sbp);

		/*
		 * Would the inclusion of this message overflow the current
		 * chunk? If so close the chunk off and start a new one.
		 */
		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
			sbclosechunk(sbp);
		/*
		 * First message too big for chunk - just send it up.
		 * This will always be true when we're not chunking.
		 */
		if (hp.sbh_totlen > sbp->sb_chunk) {
			sbsendit(rq, mp);
			return;
		}

		/*
		 * We now know that the msg will fit in the chunk.
		 * Link it onto the end of the chunk.
		 * Since linkb() walks the entire chain, we keep a pointer to
		 * the first mblk of the last msgb added and call linkb on
		 * that last message, rather than performing the
		 * O(n) linkb() operation on the whole chain.
		 * sb_head isn't needed in this SB_NO_HEADER mode.
		 */
		if (sbp->sb_mp)
			linkb(sbp->sb_tail, mp);
		else
			sbp->sb_mp = mp;

		sbp->sb_tail = mp;
		sbp->sb_mlen += hp.sbh_totlen;
		sbp->sb_mcount++;
	} else {
		/* Timestamp must be done immediately */
		uniqtime(&t);
		TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);

		pad = Align(hp.sbh_totlen);
		hp.sbh_totlen += sizeof (hp);
		hp.sbh_totlen += pad;

		/*
		 * A chunk that was started while SB_NO_HEADER was set
		 * has no sb_head.  Send it up as it is rather than
		 * overwriting sb_mp below.
		 */
		if (sbp->sb_mp != NULL && sbp->sb_head == NULL)
			sbclosechunk(sbp);

		/*
		 * Would the inclusion of this message overflow the current
		 * chunk? If so close the chunk off and start a new one.
		 */
		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
			sbclosechunk(sbp);

		if (sbp->sb_head == NULL) {
			/* Allocate leading header of new chunk */
			sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
			if (sbp->sb_head == NULL) {
				/*
				 * Memory allocation failure.
				 * This will need to be revisited
				 * since using certain flag combinations
				 * can result in messages being dropped
				 * silently.
				 */
				freemsg(mp);
				sbp->sb_drops++;
				return;
			}
			sbp->sb_mp = sbp->sb_head;
		}

		/*
		 * Copy header into message
		 */
		hp.sbh_drops = sbp->sb_drops;
		hp.sbh_origlen = origlen;
		(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
		sbp->sb_head->b_wptr += sizeof (hp);

		ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);

		/*
		 * Join message to the chunk
		 */
		linkb(sbp->sb_head, mp);

		sbp->sb_mcount++;
		sbp->sb_mlen += hp.sbh_totlen;

		/*
		 * If the first message alone is too big for the chunk close
		 * the chunk now.
		 * If the next message would immediately cause the chunk to
		 * overflow we may as well close the chunk now. The next
		 * message is certain to be at least SMALLEST_MESSAGE size.
		 */
		if (hp.sbh_totlen + SMALLEST_MESSAGE > sbp->sb_chunk) {
			sbclosechunk(sbp);
			return;
		}

		/*
		 * Find space for the wrapper. The wrapper consists of:
		 *
		 * 1) Padding for this message (this is to ensure each header
		 * begins on an aligned boundary in the userland buffer).
		 *
		 * 2) Space for the next message's header, in case the
		 * next message will fit in this chunk.
		 *
		 * It may be possible to append the wrapper to the last mblk
		 * of the message, but only if we 'own' the data. If the dblk
		 * has been shared through dupmsg() we mustn't alter it.
		 */

		wrapperlen = (sizeof (hp) + pad);

		/* Is there space for the wrapper beyond the message's data ? */
		for (last = mp; last->b_cont; last = last->b_cont)
			;

		if ((wrapperlen <= MBLKTAIL(last)) &&
		    (last->b_datap->db_ref == 1)) {
			if (pad > 0) {
				/*
				 * Pad with zeroes to the next pointer boundary
				 * (we don't want to disclose kernel data to
				 * users), then advance wptr.
				 */
				(void) memset(last->b_wptr, 0, pad);
				last->b_wptr += pad;
			}
			/* Remember where to write the header information */
			sbp->sb_head = last;
		} else {
			/* Have to allocate additional space for the wrapper */
			wrapper = allocb(wrapperlen, BPRI_MED);
			if (wrapper == NULL) {
				/* No room for a next header: send what we have */
				sbclosechunk(sbp);
				return;
			}
			if (pad > 0) {
				/*
				 * Pad with zeroes (we don't want to disclose
				 * kernel data to users).
				 */
				(void) memset(wrapper->b_wptr, 0, pad);
				wrapper->b_wptr += pad;
			}
			/* Link the wrapper msg onto the end of the chunk */
			linkb(mp, wrapper);
			/* Remember to write the next header in this wrapper */
			sbp->sb_head = wrapper;
		}
	}
}

/*
 * Timeout callback (arg is the per-Stream sb structure).
 *
 * Signals expiry of the buffering interval by sending a zero-length
 * M_CTL message to our own read queue, so that the chunk is closed
 * from sbrput() in step with any active module threads (open, close,
 * wput, rput).  If the control message cannot be sent, the timer is
 * re-armed for another interval.
 */
static void
sbtick(void *arg)
{
	struct sb *sbp = arg;
	queue_t	*rq;

	ASSERT(sbp);

	/* The timeout has fired, so its id is no longer valid. */
	sbp->sb_timeoutid = 0;
	rq = sbp->sb_rq;

	if (putctl(rq, M_CTL) != 0)
		return;

	/* putctl() failed: try again after another interval. */
	sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks);
}

/*
 * Close off the currently accumulating chunk and pass
 * it upward.  Takes care of resetting timers as well.
 *
 * Any pending buffering timeout is cancelled, the chunk (if there is
 * one) is handed to sbsendit(), and the chunk bookkeeping is reset.
 * With SB_DEFER_CHUNK set, the "first message received" state is
 * cleared as well.
 *
 * This routine is called both directly and as a result
 * of the chunk timeout expiring.
 */
static void
sbclosechunk(struct sb *sbp)
{
	ASSERT(sbp);

	if (sbp->sb_timeoutid) {
		(void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid);
		sbp->sb_timeoutid = 0;
	}

	/*
	 * Send the chunk in progress, if any, before the counters are
	 * reset: sbsendit() reads sb_mcount when it has to drop.
	 */
	if (sbp->sb_mp != NULL)
		sbsendit(sbp->sb_rq, sbp->sb_mp);

	/*
	 * Clear old chunk.  Ready for new msgs.
	 */
	sbp->sb_mp = NULL;
	sbp->sb_head = NULL;
	sbp->sb_tail = NULL;
	sbp->sb_mcount = 0;
	sbp->sb_mlen = 0;

	if (sbp->sb_flags & SB_DEFER_CHUNK)
		sbp->sb_state &= ~SB_FRCVD;
}

/*
 * Deliver a message (or a closed chunk) toward the stream head.
 *
 * If upstream can accept it, the message is passed on directly,
 * unless earlier messages are still waiting on our read queue, in
 * which case it is queued behind them to preserve ordering.
 * If upstream is flow controlled, the message is queued when
 * SB_NO_DROPS is set; otherwise it is freed and sb_drops is advanced
 * by the current sb_mcount.
 */
static void
sbsendit(queue_t *rq, mblk_t *mp)
{
	struct sb	*sbp = (struct sb *)rq->q_ptr;

	if (canputnext(rq)) {
		/*
		 * If there are messages on the q already, keep
		 * queueing them since they need to be processed in order.
		 */
		if (qsize(rq) > 0) {
			/* should only get here if SB_NO_DROPS */
			(void) putq(rq, mp);
		} else {
			putnext(rq, mp);
		}
		return;
	}

	/* Upstream is blocked: hold on to the message or drop it. */
	if (sbp->sb_flags & SB_NO_DROPS) {
		(void) putq(rq, mp);
	} else {
		freemsg(mp);
		sbp->sb_drops += sbp->sb_mcount;
	}
}