V9/sys/sys/streamio.c

Compare this file to the similar file:
Show the results in this format:

#include "../h/param.h"
#include "../h/dir.h"
#include "../h/user.h"
#include "../h/buf.h"
#include "../h/stream.h"
#include "../h/inode.h"
#include "../h/filio.h"
#include "../h/ttyio.h"
#include "../h/ttyld.h"
#include "../h/conf.h"
#include "../h/lnode.h"
#include "../h/proc.h"
#include "../h/file.h"
#include "sparam.h"
#include "../h/share.h"

/* queue table, defined elsewhere */
extern struct queue queue[];

/*
 * Priorities passed to tsleep() in this file: STIPRI by the read, ioctl,
 * receive-fd and close paths, STOPRI by the open and write paths.
 */
#define	STIPRI	28
#define	STOPRI	29

/* stream head table; stopen treats a slot whose wrq is NULL as free */
struct	stdata streams[NSTREAM];

struct  stdata *stenter();
int	strput(), stwsrv(), nulldev(), nilput(), nilopen();

/*
 * Queue initialisation tables.  stopen installs strdata on the read
 * queue of a new stream head and stwdata on its write queue; nilw and
 * nilinfo describe the "nil" module that stclose attaches to seal off
 * the far end of a cross-connected stream.
 * NOTE(review): the member order of struct qinit is declared in a
 * header not shown here -- confirm which slot each initialiser fills.
 */
struct	qinit strdata = { strput, NULL, nulldev, nulldev, 512, 256 };
struct	qinit stwdata = { nulldev, stwsrv, nulldev, nulldev, 0, 0};
struct	qinit nilw = { nilput, NULL, nilopen, nulldev, 1, 0 };
struct	streamtab nilinfo = { &nilw, &nilw };

/*
 * open a stream device
 *
 * qinfo	stream table attached below a newly created stream head
 * dev		device number passed to qattach and to the qopen routines
 * flag		not referenced in this routine
 * ip		inode being opened; ip->i_sptr is its stream head, if any
 *
 * Returns NULL in the ordinary case, with any failure reported in
 * u.u_error.  A qopen routine that returns something other than 0 or 1
 * is taken to have returned a replacement inode (device cloning): ip is
 * released with iput and the replacement is returned instead.
 */
struct inode *
stopen(qinfo, dev, flag, ip)
struct streamtab *qinfo;
dev_t dev;
struct inode *ip;
{
	register struct queue *qp;
	register struct stdata *sp;
	register struct inode *nip;
	register s;

	if (sp = ip->i_sptr) {		/* already streaming? */
		/*
		 * A stream marked EXCL refuses a non-superuser caller
		 * unless the stream has a process group and the caller
		 * belongs to it.
		 */
		if (sp->flag & EXCL && u.u_uid!=0
		   && (sp->pgrp==0 || sp->pgrp != u.u_procp->p_pgrp)) {
			u.u_error = ENXIO;
			return(NULL);
		}
		if (stenter(ip)==NULL)
			return(NULL);
		/* wait for a first open still in progress (STWOPEN) to finish */
		while (sp->flag&STWOPEN) {
			if (tsleep((caddr_t)sp, STOPRI, 0)!=TS_OK) {
				u.u_error = ENXIO;
				stexit(ip);
				return(NULL);
			}
		}
		qp = sp->wrq;
		/* a hung-up stream cannot be reopened; discard pending input */
		if (sp->flag&HUNGUP) {
			u.u_error = ENXIO;
			flushq(RD(qp), 1);
			stexit(ip);
			return(NULL);
		}
		/*
		 * Call the open routine of every queue below the head,
		 * stopping at the end of the chain or at a read queue
		 * (QREADR).
		 */
		while (qp->next) {
			if (qp->next->flag&QREADR)
				break;
			/* device-cloning;  qopen s.b. struct inode * */
			nip = (struct inode *)
			  (*qp->next->qinfo->qopen)(RD(qp->next), dev);
			if (nip == NULL) {
				u.u_error = ENXIO;
				break;
			}
			/* anything but 1 is a replacement inode: hand it back */
			if ((long)nip != 1) {
				stexit(ip);
				iput(ip);
				return(nip);
			}
			qp = qp->next;
		}
	} else {
		register struct queue *nq;
		/* first open: find a free stream head slot */
		for (sp = streams; sp < &streams[NSTREAM]; sp++)
			if (sp->wrq==NULL)
				break;
		if (sp >= &streams[NSTREAM]) {
			printf("out of streams\n");
			u.u_error = ENFILE;
			return(NULL);
		}
		if ((qp = allocq()) == NULL) {
			printf("Out of queues in stopen\n");
			u.u_error = ENFILE;
			return(NULL);
		}
		/*
		 * Initialise the head; qp is its read queue, WR(qp) its
		 * write queue.  count starts at 1, standing in for the
		 * stenter() that the stexit() calls below balance.
		 */
		sp->wrq = WR(qp);
		sp->pgrp = 0;
		sp->inode = ip;
		sp->flag = 0;
		sp->iocblk = NULL;
		sp->count = 1;
		qp->ptr = (caddr_t)sp;
		WR(qp)->ptr = (caddr_t)sp;
		qp->qinfo = &strdata;
		WR(qp)->qinfo = &stwdata;
		qp->flag |= QBIGB;
		WR(qp)->flag |= QDELIM;
		ip->i_sptr = sp;
		s = spl6();
		if (qattach(qinfo, qp, dev) == 0) {
			/*
			 * Marking the stream HUNGUP lets the stexit()
			 * below shut it down: stexit calls stclose when
			 * the count reaches zero on a hung-up stream.
			 */
			sp->flag |= HUNGUP;
			stexit(ip);
			u.u_error = ENXIO;
			splx(s);
			return(NULL);
		}
		nq = RD(WR(qp)->next);
		/* STWOPEN makes concurrent opens sleep until qopen returns */
		sp->flag |= STWOPEN;
		nip = (struct inode *)(*nq->qinfo->qopen)(nq, dev);
		sp->flag &= ~STWOPEN;
		splx(s);
		wakeup((caddr_t)sp);
		if (nip == NULL) {
			/* failed open: unlink the queue without calling its close */
			qdetach(nq, 0);
			sp->flag |= HUNGUP;
			stexit(ip);
			u.u_error = ENXIO;
			return(NULL);
		}
		/* anything but 1 is a replacement inode: hand it back */
		if ((long)nip != 1) {
			stexit(ip);
			iput(ip);
			return(nip);
		}
	}
	stexit(ip);
	return(NULL);
}

/*
 * Shut down a stream
 *  -- pop all line disciplines
 *  -- shut down the driver
 *
 * ip		inode whose stream is being closed
 * sleepOK	if nonzero and the stream is not hung up, give each
 *		downstream queue a chance to drain before detaching it
 *
 * Does nothing if the inode has no stream.  If the stenter/stexit
 * count is still nonzero, a message is printed and the stream is left
 * attached to the inode.
 */
stclose(ip, sleepOK)
struct inode *ip;
{
	register struct stdata *stp;
	register struct queue *qp;
	register s;
	register struct block *bp;

	s = spl6();
	if ((stp = ip->i_sptr) == NULL) {
		splx(s);
		return;
	}
	/* revoke read and write permission in every file entry on this stream */
	forceclose(stp, FREAD|FWRITE);
	if (stp->count != 0) {
		printf("count is %d in stclose, stream %x\n", stp->count, stp);
		splx(s);
		return;
	}
	/* from here on stenter() fails for this inode */
	ip->i_sptr = NULL;
	splx(s);
	qp = stp->wrq;
	/* detach whatever is directly below the head until nothing is left */
	while (qp->next) {
		register i;
		/* detect cross-connects, and seal off other end */
		if (qp->next->flag & QREADR) {
			putctl(qp->next, M_HANGUP);
			if (qattach(&nilinfo, qp->next, (dev_t)-1) == 0)
				panic("close/qattach");
			WR(qp->next)->next = NULL;
			break;
		}
		/* wait for the queue below to empty; give up after 60 tsleep calls */
		if (sleepOK && (stp->flag&HUNGUP)==0)
			for (i=0; qp->next->count!=0 && i<60; i++)
				tsleep((caddr_t)qp, STIPRI, 1);
		qdetach(RD(qp->next), 1);
	}
	flushq(qp, 1);
	/*
	 * Drain the read queue by hand: a passed file (M_PASS) that was
	 * never received holds a file reference that must be closed.
	 */
	while (bp = getq(RD(qp))) {
		if (bp->type == M_PASS)
			closef(((struct kpassfd *)bp->rptr)->f.fp);
		freeb(bp);
	}
	/* a NULL wrq marks the stream head slot free for stopen */
	stp->wrq = NULL;
	RD(qp)->flag = 0;
	qp->flag = 0;
}

/*
 * Read a stream
 *
 * Moves data from the stream head's read queue to the user with iomove
 * until one of the completion conditions listed below holds.  Errors
 * are reported through u.u_error.  If the inode has no stream the read
 * returns with no data and no error (EOF).
 *
 * Every return after a successful stenter() must be paired with
 * stexit(), otherwise the entry count never returns to zero and the
 * stream cannot be closed.
 */
stread(ip)
struct inode *ip;
{
	register struct stdata *stq;
	register struct block *bp;
	register n;
	register s, nc = 0;		/* nc: bytes delivered so far */

	if ((stq = stenter(ip)) == NULL) {
		u.u_error = 0;			/* return EOF not error */
		return;
	}
	/*
	 * Read is complete when:
	 * Q is empty and some data has been read and next Q might not generate
	 * delimiters, or
	 * There is data on Q and count has been satisfied, or
	 * A delimiter is taken from the Q, or
	 * There was an error.
	 */
	for (;;) {
		s = spl6();
		if ((bp = getq(RD(stq->wrq))) == NULL) {
			if ((nc && (OTHERQ(stq->wrq->next)->flag&QDELIM)==0)
			 || stq->flag&HUNGUP) {
				splx(s);
				stexit(ip);
				return;
			}
			/* ask strput for a wakeup, then wait for input */
			stq->flag |= RSLEEP;
			n = tsleep((caddr_t)RD(stq->wrq), STIPRI, 0);
			splx(s);
			if (n == TS_SIG) {
				stexit(ip);
				longjmp(u.u_qsav);
			}
			continue;
		}
		splx(s);
		switch (bp->type) {

		case M_DATA:
			n = min(u.u_count, bp->wptr - bp->rptr);
			if (n)
				iomove(bp->rptr, n, B_READ);
			bp->rptr += n;
			nc += n;
			if (bp->rptr >= bp->wptr)
				freeb(bp);
			else {
				/* block not used up: put the rest back */
				putbq(RD(stq->wrq), bp);
				if (u.u_count == 0)
					goto out;
			}
			if (u.u_error)
				goto out;
			continue;

		case M_DELIM:
			freeb(bp);
			goto out;

		case M_PASS:
			/*
			 * A passed file is not data; leave it at the
			 * front of the queue for FIORCVFD and end the
			 * read.  Leave through "out" so that stexit()
			 * balances the stenter() above.
			 */
			putbq(RD(stq->wrq), bp);
			goto out;

		default:
			printf("stread: msg %d\n", bp->type);
			freeb(bp);
			continue;
		}
	}
out:
	stexit(ip);
}

/*
 *  Internal version of stread: transfer to given address.
 *  Always returns; >= 0 is # bytes transferred, -1 is error.
 *  Always reads a whole record; excess discarded.
 *
 *  ip		inode of the stream
 *  addr	kernel address receiving the data (filled with bcopy)
 *  count	room available at addr
 *
 *  A wait for input that ends with TS_TIME is reported as an error.
 *  Every return after a successful stenter() is paired with stexit().
 */
istread(ip, addr, count)
struct inode *ip;
caddr_t addr;
{
	register struct stdata *stq;
	register struct block *bp;
	register n;
	register s, nc = 0;		/* nc: bytes copied so far */

	if ((stq = stenter(ip)) == NULL)
		return(-1);
	for (;;) {
		s = spl6();
		if ((bp = getq(RD(stq->wrq))) == NULL) {
			if ((nc && (OTHERQ(stq->wrq->next)->flag&QDELIM)==0)
			 || stq->flag&HUNGUP) {
				splx(s);
				stexit(ip);
				return(nc);
			}
			/* ask strput for a wakeup, then wait for input */
			stq->flag |= RSLEEP;
			n = tsleep((caddr_t)RD(stq->wrq), PRIBIO, 30);
			splx(s);
			if (n == TS_TIME) {
				stexit(ip);
				return(-1);
			}
			continue;
		}
		splx(s);
		switch (bp->type) {

		case M_DATA:
			/* copy what fits; the block is freed either way */
			n = min(count, bp->wptr - bp->rptr);
			if (n)
				bcopy(bp->rptr, addr, n);
			addr += n;
			nc += n;
			count -= n;
			freeb(bp);
			continue;

		case M_DELIM:
			freeb(bp);
			stexit(ip);
			return(nc);

		case M_PASS:
			/*
			 * Leave a passed file on the queue and stop.
			 * stexit() is needed here like on every other
			 * return, to balance the stenter() above.
			 */
			putbq(RD(stq->wrq), bp);
			stexit(ip);
			return(nc);

		default:
			printf("istread: msg %d\n", bp->type);
			freeb(bp);
			continue;
		}
	}
}

/*
 * Stream input put proc
 *  -- collect signals and ioctl acks
    -- put data and delims on data queue
    -- see hangups and seal off channel
 */
strput(q, bp)
register struct queue *q;
register struct block *bp;
{
	register struct stdata *stp = (struct stdata *)q->ptr;
	extern selwait;

	switch (bp->type) {

	case M_DATA:
		if (bp->rptr >= bp->wptr) {
			freeb(bp);
			return;
		}
		/* flow through */
	default:
		putq(q, bp);
		if (stp->flag & RSLEEP) {
			stp->flag &= ~RSLEEP;
			wakeup((caddr_t)q);
		}
		break;

	case M_HANGUP:
		stp->flag |= HUNGUP;
		freeb(bp);
		if (stp->pgrp)
			gsignal(stp->pgrp, SIGHUP);
		forceclose(stp, FWRITE);
		wakeup((caddr_t)q);	/* the readers */
		wakeup((caddr_t)WR(q));	/* the writers */
		wakeup((caddr_t)stp);	/* the ioctllers */
		break;

	case M_SIGNAL:
		gsignal(stp->pgrp, *bp->rptr);
	case M_FLUSH:
		flushq(q, 0);		/* flush races with select */
	case M_BREAK:
	case M_YDEL:
	case M_NDEL:
	case M_CTL:
		freeb(bp);
		return;

	case M_IOCACK:
	case M_IOCNAK:
	case M_IOCWAIT:
		if(stp->iocblk && stp->iocblk->type==M_IOCWAIT) {
			freeb(stp->iocblk);
			stp->iocblk = NULL;
		}
		if ((stp->flag&IOCWAIT)==0 || stp->iocblk) {
			freeb(bp);
			return;
		}
		stp->iocblk = bp;
		wakeup((caddr_t)stp);
		return;

	case M_IOCTL:
		bp->type = M_IOCNAK;
		bp->wptr = bp->rptr;	/* data means error */
		qreply(q, bp);
		return;
	}
	if (stp->rsel) {
		selwakeup(stp->rsel, stp->flag&RSEL);
		stp->flag &= ~RSEL;
		stp->rsel = NULL;
	}
}


/*
 * Write on a stream
 *
 * Copies user data (u.u_count bytes, moved with iomove) into M_DATA
 * blocks and hands each one to the put procedure of the queue below
 * the stream head.  When u.u_count reaches zero an M_DELIM is sent to
 * end the record.  Errors are reported through u.u_error.
 * NOTE(review): this relies on iomove() reducing u.u_count -- confirm.
 */
stwrite(ip)
struct inode *ip;
{
	register struct stdata *stq;
	register struct block *bp;
	/* n is the size of the last block sent; zero means nothing sent yet */
	register n = 0;
	register s;

	if ((stq = stenter(ip)) == NULL)
		return;
	for (;;) {
		s = spl6();
		/* flow control: wait while the queue below is full */
		while ((stq->flag&HUNGUP)==0 && stq->wrq->next->flag&QFULL) {
			/*
			 * With WNBLK set the write does not sleep: a partial
			 * write is ended with a delimiter, an empty one
			 * fails with EBUSY.
			 */
			if (stq->flag & WNBLK) {
				splx(s);
				if (n)
					putctl(stq->wrq->next, M_DELIM);
				else
					u.u_error = EBUSY;
				stexit(ip);
				return;
			}
			/* ask stwsrv for a wakeup when the queue drains */
			stq->flag |= WSLEEP;
			if (tsleep((caddr_t)stq->wrq, STOPRI, 0)==TS_SIG) {
				splx(s);
				stexit(ip);
				longjmp(u.u_qsav);
			}
		}
		splx(s);
		if (stq->flag & HUNGUP) {
			psignal(u.u_procp, SIGPIPE);
			u.u_error = ENXIO;
			break;
		}
		/* everything moved (or a zero-length write): end the record */
		if (u.u_count==0) {
			putctl(stq->wrq->next, M_DELIM);
			break;
		}
		/* ask for a 1024-byte block when much remains and the queue below takes big blocks */
		n = QBSIZE;
		if (u.u_count >= 512 && stq->wrq->next->flag&QBIGB)
			n = 1024;
		if ((bp = allocb(n)) == NULL) {
			/* retries at once, without sleeping */
			printf("stwrite can't alloc\n");
			continue;		/* temp */
		}
		bp->type = M_DATA;
		n = min(bp->lim - bp->wptr, u.u_count);
		iomove(bp->wptr, n, B_WRITE);
		if (u.u_error) {
			freeb(bp);
			break;
		}
		bp->wptr += n;
		(*stq->wrq->next->qinfo->putp)(stq->wrq->next, bp);
		/* accounting: charge the block to the process's lnode and count it */
		u.u_procp->p_lnode->kl_cost += shconsts.sc_tio;
		shconsts.sc_tioc++;
	}
	stexit(ip);
}

/*
 * Stream output -- internal version
 *  return -1 on error, 0 OK
 */
istwrite(ip, addr, count)
register count;
struct inode *ip;
caddr_t addr;
{
	register struct stdata *stq;
	register struct block *bp;
	register n;
	register s;

	if ((stq = stenter(ip)) == NULL)
		return(-1);
	while (count) {
		s = spl6();
		while (stq->wrq->next->flag&QFULL && (stq->flag&HUNGUP)==0) {
			stq->flag |= WSLEEP;
			if (tsleep((caddr_t)stq->wrq, PRIBIO, 30)==TS_TIME) {
				splx(s);
				stexit(ip);
				return(-1);
			}
		}
		splx(s);
		if (stq->flag & HUNGUP) {
			stexit(ip);
			return(-1);
		}
		if ((bp = allocb(QBSIZE)) == NULL) {
			printf("istwrite can't alloc\n");
			continue;		/* temp */
		}
		bp->type = M_DATA;
		n = min(bp->lim - bp->wptr, count);
		bcopy(addr, bp->wptr, n);
		bp->wptr += n;
		addr += n;
		count -= n;
		(*stq->wrq->next->qinfo->putp)(stq->wrq->next, bp);
	}
	putctl(stq->wrq->next, M_DELIM);
	stexit(ip);
	return(0);
}

/*
 * Service procedure for the write side of the stream head.
 * Wakes a writer that went to sleep with WSLEEP set, and any process
 * recorded by stselect as waiting to write.
 */
stwsrv(q)
register struct queue *q;
{
	register struct stdata *head;

	head = (struct stdata *)q->ptr;
	if ((head->flag & WSLEEP) != 0) {
		head->flag &= ~WSLEEP;
		wakeup((caddr_t)q);
	}
	if (head->wsel == NULL)
		return;
	selwakeup(head->wsel, head->flag&WSEL);
	head->flag &= ~WSEL;
	head->wsel = NULL;
}

/*
 * ioctl for streams
 *
 * ip	inode of the stream
 * cmd	ioctl command
 * arg	user address of the argument, or 0
 *
 * The commands named in the switch are carried out here at the stream
 * head.  Any other command is packaged as an M_IOCTL message and sent
 * down the stream; the caller then sleeps until strput delivers the
 * M_IOCACK or M_IOCNAK reply.  Only one such ioctl is outstanding per
 * stream at a time (IOCWAIT).  Errors are reported through u.u_error.
 */
stioctl(ip, cmd, arg)
struct inode *ip;
caddr_t arg;
{
	register struct stdata *stq;
	register struct block *bp;
	register struct queue *q;
	register s;
	int fmt, nld, ioctime;
	struct insld ld;
	extern nstream;

	if ((stq = stenter(ip)) == NULL)
		return;
	switch(cmd) {

	case FIONREAD:
		/* size of the first block on the read queue, if it is data */
		nld = 0;
		bp = RD(stq->wrq)->first;
		if (bp && bp->type==M_DATA)
			nld = bp->wptr - bp->rptr;
		if (copyout((caddr_t)&nld, arg, sizeof(nld)))
			u.u_error = EFAULT;
		goto out;

	case TIOCGPGRP:
		if (copyout ((caddr_t) &stq->pgrp, arg, sizeof (stq->pgrp)))
			u.u_error = EFAULT;
		goto out;

	case TIOCSPGRP:
		/* no argument: the caller starts a group and adopts the stream */
		if (arg==0) {
			stq->pgrp = u.u_procp->p_pgrp = u.u_procp->p_pid;
			u.u_ttydev = ip->i_dev;
			u.u_ttyino = ip->i_number;
		} else if (copyin(arg, (caddr_t)&stq->pgrp, sizeof(stq->pgrp)))
			u.u_error = EFAULT;
		goto out;

	case FIOWNBLK:
		/* no argument or a zero word sets WNBLK, nonzero clears it */
		if (arg==0)
			fmt = 0;
		else if (copyin(arg, (caddr_t)&fmt, sizeof(fmt))) {
			u.u_error = EFAULT;
			goto out;
		}
		if (fmt)
			stq->flag &= ~WNBLK;
		else
			stq->flag |= WNBLK;
		goto out;

	case TIOCEXCL:
		stq->flag |= EXCL;
		goto out;

	case TIOCNXCL:
		stq->flag &= ~EXCL;
		goto out;

	case TIOCFLUSH:
		if (stq->flag & HUNGUP) {
			u.u_error = ENXIO;
			goto out;
		}
		flushq(RD(stq->wrq), 0);
		putctl(stq->wrq->next, M_FLUSH);
		goto out;

	case TIOCSBRK:
		if (stq->flag & HUNGUP) {
			u.u_error = ENXIO;
			goto out;
		}
		putctl(stq->wrq->next, M_BREAK);
		goto out;

	case FIOSNDFD:
		if (stq->flag & HUNGUP) {
			u.u_error = ENXIO;
			goto out;
		}
		usndfile(stq, arg);
		goto out;

	case FIORCVFD:
		urcvfile(stq, arg);
		goto out;

	case FIOPUSHLD:			/* add a line discipline */
	case FIOINSLD:
		if (stq->flag & HUNGUP) {
			u.u_error = ENXIO;
			goto out;
		}
		if (copyin(arg, (caddr_t)&ld, sizeof(ld))) {
			u.u_error = EFAULT;
			goto out;
		}
		if(ld.ld<0 || ld.ld>=nstream || streamtab[ld.ld]==NULL){
			u.u_error = EINVAL;
			goto out;
		}
		/* a push always goes directly below the head */
		if (cmd==FIOPUSHLD)
			ld.level = 0;
		q = stq->wrq;
		while (ld.level > 0) {
			if (q->next == NULL) {
				u.u_error = EINVAL;
				goto out;
			}
			q = q->next;
			ld.level--;
		}
		s = spl6();
		if (qattach(streamtab[ld.ld], RD(q), ip->i_un.i_rdev)) {
			long nip =
			  (*q->next->qinfo->qopen)(RD(q->next), ip->i_un.i_rdev);
			if (nip==0) {
				/* open refused: unlink without calling close */
				qdetach(RD(q->next), 0);
				u.u_error = ENXIO;
			} else if (nip!=1)
				panic("pushld qopen returns inode");	
			splx(s);
			goto out;
		}
		splx(s);
		u.u_error = ENOMEM;
		goto out;

	case FIOPOPLD:
		if (stq->flag & HUNGUP) {
			u.u_error = ENXIO;
			goto out;
		}
		nld = 0;
		if (arg) {
			if (copyin(arg, (caddr_t)&nld, sizeof(nld))) {
				u.u_error = EFAULT;
				goto out;
			}
		}
		q = stq->wrq;
		while (nld > 0) {
			if (q == NULL || q->next==NULL
			 || q->next->flag&QREADR) {
				u.u_error = EINVAL;
				goto out;
			}
			q = q->next;
			nld--;
		}
		/*
		 * The walk above can stop on the last queue of the
		 * stream, where q->next is NULL; check it before
		 * looking one queue further.
		 */
		if (q->next && q->next->next) {	/* LD exists? */
			qdetach(RD(q->next), 1);
			goto out;
		}
		u.u_error = EINVAL;
		goto out;

	case FIOLOOKLD:
		nld = 0;
		if (arg) {
			if (copyin(arg, (caddr_t)&nld, sizeof(nld))) {
				u.u_error = EFAULT;
				goto out;
			}
		}
		q = stq->wrq;
		for (;;) {
			if (q == NULL || q->next==NULL || q->next->next == NULL
			 || q->next->flag&QREADR) {
				u.u_error = EINVAL;
				goto out;
			}
			if (--nld < 0)
				break;
			q = q->next;
		}
		/* identify the module by matching its write-side qinit */
		for (fmt=0; fmt<nstream; fmt++)
			if(streamtab[fmt] && streamtab[fmt]->wrinit==q->next->qinfo)
				break;
		if (fmt >= nstream) {
			u.u_error = ENXIO;
			goto out;
		}
		if (arg) {
			/* copy out the int itself, not a pointer's worth */
			if (copyout((caddr_t)&fmt, arg, sizeof(fmt))) {
				u.u_error = EINVAL;
				goto out;
			}
		}
		u.u_r.r_val1 = fmt;
		goto out;

	}
	/* not handled at the head: send the command down the stream */
	if (stq->flag & HUNGUP) {
		u.u_error = ENXIO;
		goto out;
	}
	if ((bp = allocb(QBSIZE)) == NULL) {
		u.u_error = ENOSPC;
		goto out;
	}
	((union stmsg *)bp->wptr)->ioc0.com = cmd;
	if (arg) {
		if (copyin(arg, (caddr_t)&((union stmsg *)bp->wptr)->ioc1.sb,
		    sizeof(union stmsg))) {
			u.u_error = EFAULT;
			freeb(bp);
			goto out;
		}
		bp->wptr += sizeof(union stmsg);
	} else
		bp->wptr += sizeof(struct ioc0);
	bp->type = M_IOCTL;
	/* wait our turn: one outstanding ioctl per stream */
	s = spl6();
	while (stq->flag & IOCWAIT) {
		if (tsleep((caddr_t)stq,STIPRI,0)!=TS_OK || stq->flag&HUNGUP) {
			splx(s);
			u.u_error = EIO;
			freeb(bp);
			goto out;
		}
	}
	stq->flag |= IOCWAIT;
	splx(s);
	(*stq->wrq->next->qinfo->putp)(stq->wrq->next, bp);
	/* wait for acknowledgment */
	s = spl6();
	ioctime = 15;
	while ((bp = stq->iocblk) == NULL || bp->type == M_IOCWAIT) {
		/* an M_IOCWAIT reply switches the wait to no timeout */
		if (bp)
			ioctime = 0;
		if (tsleep((caddr_t)stq, STIPRI, ioctime) != TS_OK
				|| stq->flag & HUNGUP) {
			if (stq->iocblk)
				freeb(stq->iocblk);
			stq->iocblk = NULL;
			u.u_error = EIO;
			stq->flag &= ~IOCWAIT;
			splx(s);
			/*
			 * IOCWAIT is free again: wake anyone sleeping in
			 * the wait-our-turn loop above, as the normal
			 * exit does, or they may never be woken.
			 */
			wakeup((caddr_t)stq);
			goto out;
		}
	}
	stq->iocblk = NULL;
	stq->flag &= ~IOCWAIT;
	splx(s);
	switch (bp->type) {
	case M_IOCACK:
		/* any data in the reply goes back to the user's argument */
		if (bp->wptr > bp->rptr) {
			bp->rptr =
			  (u_char *)&((union stmsg *)(bp->rptr))->ioc1.sb;
			if (copyout((caddr_t)bp->rptr, arg,
			    bp->wptr - bp->rptr))
				u.u_error = EFAULT;
		}
		freeb(bp);
		break;

	case M_IOCNAK:
		/* the first data byte, if any, is the error number */
		if (bp->rptr < bp->wptr)
			u.u_error = *bp->rptr;
		freeb(bp);
		if (u.u_error==0)
			u.u_error = ENOTTY;
		break;

	default:
		printf("strange stuff on ioctl ack cell %x\n", stq);
		break;
	}
	/* let the next ioctl waiting on IOCWAIT proceed */
	wakeup((caddr_t)stq);
out:
	stexit(ip);
}

/*
 * Splice a new queue pair (stream device or line discipline) into a
 * stream.
 *   qp is a read queue.  The new read queue's next pointer is set to
 *   qp, and the write queue that goes with qp is made to point at the
 *   new write queue; whatever was below qp before ends up below the
 *   new pair.
 *   Returns 1 on success, 0 if no queue pair could be allocated.
 *   dev is not used here.
 */
qattach(qinfo, qp, dev)
register struct streamtab *qinfo;
register struct queue *qp;
dev_t dev;
{
	register struct queue *rq;	/* read side of the new pair */
	register pri;

	rq = allocq();
	if (rq == NULL) {
		printf("allocq NULL\n");
		return(0);
	}
	pri = spl6();

	/* link the new pair in between qp and its former neighbour */
	rq->next = qp;
	WR(rq)->next = WR(qp)->next;
	if (WR(qp)->next != NULL)
		OTHERQ(WR(qp)->next)->next = rq;
	WR(qp)->next = WR(rq);

	/* read side */
	rq->qinfo = qinfo->rdinit;
	rq->flag |= QREADR|QWANTR;
	rq->ptr = NULL;

	/* write side */
	WR(rq)->qinfo = qinfo->wrinit;
	WR(rq)->flag |= QWANTR;
	WR(rq)->ptr = NULL;

	splx(pri);
	return(1);
}

/*
 * Detach a stream device or line discipline.
 *  Call its close routine, then evict it.
 *  The close routine is required to return.
 *  The flag (if 1) indicates that the close routine
 *  should be called (real shutdown) otherwise this
 *  is merely a failed open.
 *
 *  qp is the read queue of the pair being removed.  The whole
 *  routine runs at spl6.
 */
qdetach(qp, flag)
register struct queue *qp;
{
	register s = spl6();
	register i;

	if (flag) {
		/* run pending queue work, then let the module close */
		queuerun();
		(*qp->qinfo->qclose)(qp);
		/* keep running the queues until neither side is still enabled */
		for (i=0; (qp->flag|WR(qp)->flag)&QENAB; i++) {
			queuerun();
			if (i>10)
				panic("queue won't give up");
		}
		flushq(qp, 1);
		flushq(WR(qp), 1);
	}
	/*
	 * Unlink both sides by pointing each predecessor at the queue's
	 * successor.
	 * NOTE(review): backq() is defined elsewhere; presumably it
	 * returns the queue whose next pointer leads to its argument --
	 * confirm.
	 */
	if (WR(qp)->next)
		backq(qp)->next = qp->next;
	if (qp->next)
		backq(WR(qp))->next = WR(qp)->next;
	/* clearing the flags leaves the pair with no state bits set */
	qp->flag = 0;
	WR(qp)->flag = 0;
	splx(s);
}

/*
 * stenter and stexit bracket every routine that works on a stream,
 * keeping a count of how many are inside.  A stream that has received
 * a HANGUP is to be shut down forcibly, and the count makes sure that
 * does not happen while pointers to it are still in use.
 *
 * stenter returns the inode's stream head with its count raised, or
 * NULL (and u.u_error = ENXIO) if the inode has no stream.
 */
struct stdata *
stenter(ip)
register struct inode *ip;
{
	register struct stdata *head;
	register pri;

	pri = spl6();
	head = ip->i_sptr;
	if (head != NULL)
		head->count++;
	else
		u.u_error = ENXIO;
	splx(pri);
	return(head);
}

stexit(ip)
register struct inode *ip;
{
	register struct stdata *stp = ip->i_sptr;

	if(stp == 0) {
		printf("null stp in stexit\n");
		return;
	}	/* can't happen, of course */
	if (--stp->count==0 && stp->flag&HUNGUP && RD(stp->wrq)->count==0)
		stclose(ip, 1);
}

/*
 * Send an open file to the process at the other end of a stream.
 * arg is the user address of an int holding the file descriptor.
 * Sets u.u_error to EFAULT if the descriptor cannot be fetched and
 * to EBADF if getf() rejects it; otherwise sndfile() does the work.
 */
usndfile(stq, arg)
register struct stdata *stq;
caddr_t arg;
{
	int fd;
	register struct file *fp;

	if (copyin(arg, (caddr_t)&fd, sizeof(fd)) != 0) {
		u.u_error = EFAULT;
		return;
	}
	fp = getf(fd);
	if (fp == NULL) {
		u.u_error = EBADF;
		return;
	}
	sndfile(stq->wrq, fp);
}

sndfile(q, fp)
register struct queue *q;
register struct file *fp;
{
	register struct block *bp;

	for (;;) {
		if (q==NULL) {
			u.u_error = ENXIO;
			return(0);
		}
		if (q->qinfo == &strdata)
			break;
		/* follow pt's here */
		q = q->next;
	}
	if (q->flag&QFULL) {
		u.u_error = ENOMEM;
		return(0);
	}
	if ((bp = allocb(sizeof(struct kpassfd))) == NULL) {
		u.u_error = ENOMEM;
		return(0);
	}
	bp->type = M_PASS;
	((struct kpassfd *)bp->rptr)->uid = u.u_uid;
	((struct kpassfd *)bp->rptr)->gid = u.u_gid;
	((struct kpassfd *)bp->rptr)->nice = u.u_procp->p_nice-NZERO;
	bcopy(u.u_logname,((struct kpassfd *)bp->rptr)->logname,
		sizeof(u.u_logname));
	((struct kpassfd *)bp->rptr)->f.fp = fp;
	bp->wptr += sizeof(struct kpassfd);
	fp->f_count++;
	strput(q, bp);
	return(1);
}

/*
 * Receive a file passed over a stream.
 * Waits for a message on the stream head's read queue.  If it is an
 * M_PASS, the file it carries is installed in a free descriptor slot
 * and a struct passfd describing it is copied out to arg.
 * u.u_error: ENXIO if the stream is hung up with nothing queued, EIO
 * if the first message is not M_PASS (it is put back), EFAULT if the
 * copyout fails.
 */
urcvfile(stq, arg)
register struct stdata *stq;
{
	register struct block *bp;
	register fd;
	register struct file *fp;
	register struct kpassfd *kp;
	struct passfd pfd;

	for (;;) {
		bp = getq(RD(stq->wrq));
		if (bp != NULL)
			break;
		if (stq->flag&HUNGUP) {
			u.u_error = ENXIO;
			return;
		}
		stq->flag |= RSLEEP;
		if (tsleep((caddr_t)RD(stq->wrq), STIPRI, 0) != TS_SIG)
			continue;
		/* interrupted: leave the stream before unwinding */
		stexit(stq->inode);
		longjmp(u.u_qsav);
	}
	if (bp->type != M_PASS) {
		putbq(RD(stq->wrq), bp);
		u.u_error = EIO;
		return;
	}
	kp = (struct kpassfd *)bp->rptr;
	fp = kp->f.fp;
	fd = ufalloc();
	if (fd >= 0) {
		u.u_ofile[fd] = fp;
		pfd.fd = fd;
		pfd.uid = kp->uid;
		pfd.gid = kp->gid;
		bcopy(kp->logname, pfd.logname, sizeof(kp->logname));
		if (copyout((caddr_t)&pfd, arg, sizeof(struct passfd)))
			u.u_error = EFAULT;
	} else {
		/* no descriptor slot: drop the reference the message held */
		closef(fp);
	}
	freeb(bp);
}

stselect(stp, rw, anyyet)
register struct stdata *stp;
{
	register ret = 0;
	register s = spl6();
	extern int selwait;

	if (rw==FWRITE) {
		if ((stp->wrq->next->flag&QFULL) == 0)
			ret = 1;
		else if (!anyyet) {
			if (stp->wsel && stp->wsel->p_wchan==(caddr_t)&selwait)
				stp->flag |= WSEL;
			else
				stp->wsel = u.u_procp;
		}
	} else {
		if (streadable(RD(stp->wrq)) || stp->flag&HUNGUP)
			ret = 1;
		else if (!anyyet) {
			if (stp->rsel && stp->rsel->p_wchan==(caddr_t)&selwait)
				stp->flag |= RSEL;
			else
				stp->rsel = u.u_procp;
		}
	}
	splx(s);
	return(ret);
}

/*
 * Poison every in-use file table entry whose inode leads to this
 * stream: the bits in whichflags are cleared from f_flag and FHUNGUP
 * is set.
 */
forceclose(sptr, whichflags)
register struct stdata *sptr;
{
	register struct file *fp;

	fp = &file[0];
	while (fp < fileNFILE) {
		if (fp->f_count != 0 && fp->f_inode != NULL
		 && fp->f_inode->i_sptr == sptr)
			fp->f_flag = (fp->f_flag & ~whichflags) | FHUNGUP;
		fp++;
	}
}

/*
 * Guess whether a read on queue q would find something to return.
 * An empty queue is never readable.  A non-empty one is readable if
 * the queue feeding it does not have QDELIM set, if it holds at
 * least lolimit worth of data, or if a delimiter or passed file is
 * queued somewhere on it.
 */
streadable(q)
register struct queue *q;
{
	register struct block *bp;

	if (q->first == 0)
		return(0);
	if ((backq(q)->flag&QDELIM) == 0)
		return(1);
	if (q->count >= q->qinfo->lolimit)
		return(1);
	bp = q->first;
	while (bp) {
		switch (bp->type) {
		case M_DELIM:
		case M_PASS:
			return(1);
		}
		bp = bp->next;
	}
	return(0);
}


/*
 * The nil module: a black hole used to seal off closed pipes, and to
 * serve as a stub device.  Its open routine always reports success.
 * NOTE(review): the qopen calls visible in this file pass two
 * arguments (queue, dev), while this routine declares three, the
 * first a struct by value.  None is used -- confirm this is intended.
 */
nilopen(info, q, dev)
struct stdata info;
struct queue *q;
dev_t dev;
{
	return 1;
}

/*
 * Put procedure of the nil module: every message is discarded, except
 * that an ioctl is answered with an empty M_IOCNAK.
 */
nilput(q, bp)
struct queue *q;
struct block *bp;
{
	if (bp->type != M_IOCTL) {
		freeb(bp);
		return;
	}
	bp->type = M_IOCNAK;
	bp->wptr = bp->rptr;
	qreply(q, bp);
}