V10/sys/io/devnonet.c

Compare this file to the similar file:
Show the results in this format:

#include	"u.h"
#include	"lib.h"
#include	"mem.h"
#include	"dat.h"
#include	"fns.h"
#include	"io.h"
#include	"errno.h"

#define NOW (MACHP(0)->ticks*MS2HZ)

enum {
	/*
	 *  configuration parameters
	 */
	Nconv = 32,		/* total number of active circuits */
	Nifc = 2,		/* number of interfaces */
	MSrexmit = 300,		/* retranmission interval in ms */
	MSack = 50,		/* ms to sit on an ack */

	/*
	 *  relative or immutable
	 */
	Ndir = Nconv + 1,	/* entries in the nonet directory */
	Nsubdir = 2,		/* entries in the nonet directory */
	Nmsg = 128,		/* max number of outstanding messages */
	Nmask = Nmsg - 1,	/* mask for log(Nmsg) bits */
};

typedef struct Hdr		Hdr;
typedef struct Msg		Msg;
typedef struct Conversation	Conversation;
typedef struct Interface	Interface;
typedef struct Etherhdr		Etherhdr;

/*
 *  generic nonet header
 */
struct Hdr {
	uchar	circuit[3];	/* circuit number */
	uchar	flag;
	uchar	mid;		/* message id */
	uchar	ack;		/* piggy back ack */
	uchar	remain[2];	/* count of remaing bytes of data */
	uchar	sum[2];		/* checksum (0 means none) */
};
#define HDRSIZE 10
#define NEWCALL	0x1		/* flag bit marking a new circuit */
#define HANGUP 0x2		/* flag bit requesting hangup */
#define ACKME 0x4		/* acknowledge this message */

/*
 *  a buffer describing a nonet message
 */
struct Msg {
	QLock;
	Blist;
	int	mid;		/* sequence number */
	int	rem;		/* remaining */
	long	time;
	int	acked;
};

/*
 *  Nonet conversation states (for Conversation.state)
 */
enum {
	Cclosed,
	Copen,
	Clistening,
	Cconnected,
	Cconnecting,
	Chungup,
	Cclosing,
};

/*
 *  one exists for each Nonet conversation.
 */
struct Conversation {
	QLock;

	Queue	*rq;		/* input queue */
	int	version;	/* incremented each time struct is changed */
	int	state;		/* true if listening */

	Msg	in[Nmsg];	/* messages being received */
	int	rcvcircuit;	/* circuit number of incoming packets */

	uchar	ack[Nmsg];	/* acknowledgements waiting to be sent */
	long	atime[Nmsg];
	int	afirst;
	int	anext;

	QLock	xlock;		/* one trasmitter at a time */
	Rendez	r;		/* process waiting for an output mid */
	Msg	ctl;		/* for control messages */
	Msg	out[Nmsg];	/* messages being sent */
	int	first;		/* first unacknowledged message */
	int	next;		/* next message buffer to use */
	int	lastacked;	/* last message acked */		
	Block	*media;		/* prototype media output header */
	Hdr	*hdr;		/* nonet header inside of media header */

	Interface *ifc;
	int	kstarted;
	char	raddr[64];	/* remote address */
	int	rexmit;		/* statistics */
	int	retry;
	int	bad;
	int	sent;
	int	rcvd;
};

/*
 *  a nonet interface.  one exists for every stream that a 
 *  nonet multiplexor is pushed onto.
 */
struct Interface {
	QLock;
	int	ref;
	char	name[64];	/* interface name */		
	int	maxtu;		/* maximum transfer unit */
	int	mintu;		/* minimum transfer unit */
	int	hsize;		/* media header size */
	Queue	*wq;		/* interface output queue */
	void	(*connect)(Conversation *, char *);
	Conversation conv[Nconv];
};
static Interface interface[Nifc];

void nonetkproc(void *);
int cksum(Block*, int);
extern Qinfo noetherinfo;

/*
 *  start a new conversation.  start an ack/retransmit process if
 *  none already exists for this circuit.
 */
static void
startconv(Conversation *cp, int circuit, int state)
{
	int i;
	char name[32];
	Interface *ifc;

	ifc = cp->ifc;

	/*
	 *  allocate the prototype header
	 */
	cp->media = allocb(ifc->hsize + HDRSIZE);
	cp->media->wptr += ifc->hsize + HDRSIZE;
	cp->hdr = (Hdr *)(cp->media->rptr + ifc->hsize);

	/*
	 *  fill in the circuit number
	 */
	cp->hdr->flag = NEWCALL|ACKME;
	cp->hdr->circuit[2] = circuit>>16;
	cp->hdr->circuit[1] = circuit>>8;
	cp->hdr->circuit[0] = circuit;

	/*
	 *  set the state variables
	 */
	cp->state = state;
	cp->retry = 0;
	for(i = 1; i < Nmsg; i++){
		cp->in[i].mid = i;
		cp->in[i].acked = 0;
		cp->in[i].rem = 0;
		cp->out[i].mid = i | Nmsg;
		cp->out[i].acked = 1;
		cp->out[i].rem = 0;
	}
	cp->in[0].mid = Nmsg;
	cp->in[0].acked = 0;
	cp->in[0].rem = 0;
	cp->out[0].mid = 0;
	cp->out[0].acked = 1;
	cp->out[0].rem = 0;
	cp->first = cp->next = 1;
	cp->rexmit = cp->bad = cp->sent = cp->rcvd = cp->lastacked = 0;

	/*
	 *  used for demultiplexing
	 */
	cp->rcvcircuit = circuit ^ 1;

	/*
	 *  start the ack/rexmit process
	 */
	if(cp->kstarted == 0){
		cp->kstarted = 1;
		sprint(name, "**nonet%d**", cp - ifc->conv);
		kproc(name, nonetkproc, cp);
	}
}

/*
 *  connect to the destination whose name is pointed to by bp->rptr.
 */
void
connect(Conversation *cp, Block *bp)
{
	Interface *ifc;
 	char *iname;
	int len;
	int circuit;

	ifc = cp->ifc;
	qlock(ifc);
	if(waserror()){
		qunlock(ifc);
		if(cp->media){
			freeb(cp->media);
			cp->media = 0;
		}
		freeb(bp);
		nexterror();
	}

	/*
	 *  init
	 */
	startconv(cp, (++(cp->version) * Nconv) + 2*(cp - ifc->conv), Cconnecting);
	(*ifc->connect)(cp, (char *)bp->rptr);
	strncpy(cp->raddr, (char *)bp->rptr, sizeof(cp->raddr));

	qunlock(ifc);
	freeb(bp);
	poperror();
}

/*
 *  listen for calls from any interface
 */
static int
listendone(void *a)
{
	Conversation *cp;

	cp = (Conversation *)a;
	return cp->state != Clistening;
}
static void
listen(Conversation *cp, Block *bp)
{
	freeb(bp);
	if(cp->state >= Clistening){
		print("listen in use %d %ux %ux\n", cp->state, cp->ifc, interface);
		error(0, Einuse);
	}
	cp->state = Clistening;
	sleep(&cp->r, listendone, cp);
}

/*
 *  send a hangup signal up the stream to get all line disciplines
 *  to cease and desist
 */
static void
hangup(Conversation *cp)
{
	Block *bp;
	Queue *q;

	cp->state = Chungup;
	bp = allocb(0);
	bp->type = M_HANGUP;
	q = cp->rq;
	PUTNEXT(q, bp);
	wakeup(&cp->r);
}

/*
 *  process a message acknowledgement.  if the message
 *  has any xmit buffers queued, free them.
 */
static void
rcvack(Conversation *cp, int mid)
{
	Msg *mp;
	Block *bp;
	int i;

	mp = &cp->out[mid & Nmask];

	/*
	 *  if already acked, ignore
	 */
	if(mp->acked || mp->mid != mid)
		return;
	mp->acked = 1;
	cp->lastacked = mid;

	/*
	 *  free buffers
	 */
	qlock(mp);
	while(bp = getb(mp))
		freeb(bp);
	qunlock(mp);

	/*
	 *  advance the first pointer and wakeup any processes waiting for a mid
	 */
	if((mid&Nmask) == cp->first){
		cp->retry = 0;
		for(i = cp->first; i!=cp->next && cp->out[i].acked; i = (i+1)&Nmask)
			;
		cp->first = i;
		wakeup(&cp->r);
	}
}

/*
 *  queue an acknowledgement to be sent.  ignore it if we already have Nmsg
 *  acknowledgements queued.
 */
static void
queueack(Conversation *cp, int mid)
{
	int next;
	ulong now;

	now = NOW;
	next = (cp->anext + 1)&Nmask;
	if(next != cp->afirst){
		cp->ack[cp->anext] = mid;
		cp->atime[cp->anext] = now + MSack;
		cp->anext = next;
	}
	if(now > cp->atime[cp->afirst])
		wakeup(&cp->rq->r);
}

/*
 *  make a packet header
 */
Block *
mkhdr(Conversation *cp, int rem)
{
	Block *bp;
	Hdr *hp;

	bp = allocb(cp->ifc->hsize + HDRSIZE + cp->ifc->mintu);
	memcpy(bp->wptr, cp->media->rptr, cp->ifc->hsize + HDRSIZE);
	bp->wptr += cp->ifc->hsize + HDRSIZE;
	hp = (Hdr *)(bp->rptr + cp->ifc->hsize);
	hp->remain[1] = rem>>8;
	hp->remain[0] = rem;
	hp->sum[0] = hp->sum[1] = 0;
	return bp;
}

/*
 *  transmit a message.  this involves breaking a possibly multi-block message into
 *  a train of packets on the media.
 *
 *  called by nonetoput() and nonetkproc().  the qlock(mp) synchronizes these two
 *  processes.
 */
static void
sendmsg(Conversation *cp, Msg *mp)
{
	Interface *ifc;
	Queue *wq;
	int msgrem;
	int pktrem;
	int n;
	Block *bp, *pkt, *last;
	uchar *rptr;

	ifc = cp->ifc;
	if(ifc == 0)
		return;
	wq = ifc->wq->next;

	/*
	 *  one transmitter at a time
	 */
	qlock(&cp->xlock);

	/*
	 *  synchronize with rcvack, don't want to send while the
	 *  message is being freed.
	 */
	qlock(mp);

	if(waserror() || mp->acked){
		qunlock(&cp->xlock);
		qunlock(mp);
		return;
	}

	/*
	 *  get the next acknowledge to use if the next queue up
	 *  is not full.
	 */
	if(cp->afirst != cp->anext && cp->rq->next->len < 16*1024){
		cp->hdr->ack = cp->ack[cp->afirst];
		cp->afirst = (cp->afirst+1)&Nmask;
	}
	cp->hdr->mid = mp->mid;

	if(ifc->mintu > mp->len) {
		/*
		 *  short message:
		 *  copy the whole message into the header block
		 */
		last = pkt = mkhdr(cp, mp->len);
		for(bp = mp->first; bp; bp = bp->next){
			memcpy(pkt->wptr, bp->rptr, n = BLEN(bp));
			pkt->wptr += n;
		}
		memset(pkt->wptr, 0, n = ifc->mintu - mp->len);
		pkt->wptr += n;
	} else {
		/*
		 *  long message:
		 *  break up the message into interface packets and send them.
		 *  once around this loop for each non-header block generated.
		 */
		msgrem = mp->len;
		pktrem = msgrem > ifc->maxtu ? ifc->maxtu : msgrem;
		bp = mp->first;
		if(bp)
			rptr = bp->rptr;
		last = pkt = mkhdr(cp, msgrem);
		while(bp){
			/*
			 *  if pkt full, send and create new header block
			 */
			if(pktrem == 0){
				cksum(pkt, ifc->hsize);
				last->flags |= S_DELIM;
				(*wq->put)(wq, pkt);
				last = pkt = mkhdr(cp, -msgrem);
				pktrem = msgrem > ifc->maxtu ? ifc->maxtu : msgrem;
			}
			n = bp->wptr - rptr;
			if(n > pktrem)
				n = pktrem;
			last = last->next = allocb(0);
			last->rptr = rptr;
			last->wptr = rptr = rptr + n;
			msgrem -= n;
			pktrem -= n;
			if(rptr >= bp->wptr){
				bp = bp->next;
				if(bp)
					rptr = bp->rptr;
			}
		}
	}
	cksum(pkt, ifc->hsize);
	last->flags |= S_DELIM;
	(*wq->put)(wq, pkt);
	mp->time = NOW + MSrexmit;
	qunlock(mp);
	qunlock(&cp->xlock);
	poperror();
}

/*
 *  send a control message (hangup or acknowledgement).
 */
sendctlmsg(Conversation *cp, int flag, int new)
{
	cp->ctl.len = 0;
	cp->ctl.first = 0;
	cp->ctl.acked = 0;
	if(new)
		cp->ctl.mid = Nmsg^cp->out[cp->next].mid;
	else
		cp->ctl.mid = cp->lastacked;
	cp->hdr->flag |= flag;
	sendmsg(cp, &cp->ctl);
}

/*
 *  receive a message (called by the multiplexor; noetheriput, nofddiiput, ...)
 */
static void
rcvmsg(Conversation *cp, Block *bp)
{
	Block *nbp;
	Hdr *h;
	short r;
	int c;
	Msg *mp;
	int f;
	Queue *q;

	q = cp->rq;

	/*
	 *  grab the packet header, push the pointer past the nonet header
	 */
	h = (Hdr *)bp->rptr;
	bp->rptr += HDRSIZE;
	mp = &cp->in[h->mid & Nmask];
	r = (h->remain[1]<<8) | h->remain[0];
	f = h->flag;

	/*
	 *  if a new call request comes in on a connected channel, hang up the call
	 */
	if(h->mid==0 && (f & NEWCALL) && cp->state==Cconnected){
		freeb(bp);
		hangup(cp);
		return;
	}

	/*
	 *  ignore old messages and process the acknowledgement
	 */
	if(h->mid != mp->mid){
		if(r == 0){
			rcvack(cp, h->ack);
			if(f & HANGUP)
				hangup(cp);
		} else {
			if(r>0)
				queueack(cp, h->mid);
			cp->bad++;
		}
		freeb(bp);
		return;
	}

	if(r>=0){
		/*
		 *  start of message packet
		 */
		if(mp->first){
			cp->bad++;
			freeb(bp);
			return;
		}
		mp->rem = r;
	} else {
		/*
		 *  a continuation
		 */
		if(-r != mp->rem) {
			cp->bad++;
			freeb(bp);
			return;
		}
	}

	/*
	 *  take care of packets that were padded up
	 */
	mp->rem -= BLEN(bp);
	if(mp->rem < 0){
		if(-mp->rem <= BLEN(bp)){
			bp->wptr += mp->rem;
			mp->rem = 0;
		} else
			panic("rcvmsg: short packet");
	}
	putb(mp, bp);

	/*
	 *  if the last chunk - pass it up the stream and wake any
	 *  waiting process.
	 *
	 *  if not, strip off the delimiter.
	 */
	if(mp->rem == 0){
		rcvack(cp, h->ack);
		if(f & ACKME)
			queueack(cp, h->mid);
		mp->last->flags |= S_DELIM;
		PUTNEXT(q, mp->first);
		mp->first = mp->last = 0;
		mp->len = 0;
		cp->rcvd++;

		/*
		 *  cycle bufffer to next expected mid
		 */
		mp->mid ^= Nmsg;

		/*
		 *  stop xmitting the NEWCALL flag
		 */
		if(cp->state==Cconnecting && !(f & NEWCALL))
			cp->state = Cconnected;
	} else
		mp->last->flags &= ~S_DELIM;

}

/*
 *  the device stream module definition
 */
static void nonetstopen(Queue *, Stream *);
static void nonetstclose(Queue *);
static void nonetoput(Queue *, Block *);
static void nonetiput(Queue *, Block *);
Qinfo nonetinfo = { nonetiput, nonetoput, nonetstopen, nonetstclose, "nonet" } ;

/*
 *  store the device end of the stream so that the multiplexor can
 *  send blocks upstream.  this is called by streamopen() when a
 *  nonet device steam is created.
 */
static void
nonetstopen(Queue *q, Stream *s)
{
	Interface *ifc;
	Conversation *cp;

	ifc = &interface[s->dev];
	cp = &ifc->conv[s->id];
	cp->ifc = ifc;
	cp->rq = RD(q);
	cp->state = Copen;
	RD(q)->ptr = WR(q)->ptr = (void *)cp;
}

/*
 *  wait until all output has drained or a hangup is received.
 *  then send a hangup message (until one is received).
 */
static int
isflushed(void *a)
{
	Conversation *cp;

	cp = (Conversation *)a;
	return cp->first == cp->next || cp->state == Chungup;
}
static int
ishungup(void *a)
{
	Conversation *cp;

	cp = (Conversation *)a;
	return cp->state == Chungup;
}
static int
isdead(void *a)
{
	Conversation *cp;

	cp = (Conversation *)a;
	return cp->kstarted == 0;
}
static void
nonetstclose(Queue *q)
{
	Conversation *cp;
	Msg *mp;
	int i;

	cp = (Conversation *)q->ptr;

	/*
	 *  wait for all messages to drain
	 */
	while(!isflushed(cp))
		sleep(&cp->r, isflushed, cp);

	/*
	 *  ack all outstanding messages
	 */
	for(; cp->first != cp->next; cp->first = (cp->first+1)&Nmask) {
		mp = &cp->out[cp->first];
		if(!mp->acked)
			rcvack(cp, mp->mid);
	}
	cp->first = cp->next;

	/*
	 *  send hangup messages to the other side
	 *  until it hangs up.
	 */
	if(cp->state >= Cconnected){
		sendctlmsg(cp, HANGUP, 1);
		for(i=0; i<10 && !ishungup(cp); i++){
			sendctlmsg(cp, HANGUP, 1);
			tsleep(&cp->r, ishungup, cp, MSrexmit);
		}
	}

	/*
	 *  kill off the nonetkproc
	 */
	cp->state = Cclosed;
	wakeup(&cp->rq->r);
	sleep(&cp->r, isdead, cp);

	/*
	 *  close down, synchronizing with interface
	 */
	qlock(cp);
	cp->ifc = 0;
	qunlock(cp);
}

/*
 *  send all messages up stream.  this should only be control messages
 */
static void
nonetiput(Queue *q, Block *bp)
{
	Conversation *cp;

	if(bp->type == M_HANGUP){
		cp = (Conversation *)q->ptr;
		cp->state = Chungup;
	}
	PUTNEXT(q, bp);
}

/*
 *  queue a block
 */
static int
windowopen(void *a)
{
	Conversation *cp;

	cp = (Conversation *)a;
	return (cp->next + 1)&Nmask != cp->first;	
}
static void
nonetoput(Queue *q, Block *bp)
{
	Conversation *cp;
	int next;
	Msg *mp;

	cp = (Conversation *)(q->ptr);

	/*
	 *  do all control functions
	 */
	if(bp->type != M_DATA){
		if(streamparse("connect", bp))
			connect(cp, bp);
		else if(streamparse("listen", bp))
			listen(cp, bp);
		else
			freeb(bp);
		return;
	}

	/*
	 *  collect till we see a delim
	 */
	if(!putb(q, bp))
		return;

	/*
	 *  block if we don't have any free mid's
	 */
	while((next = (cp->next + 1)&Nmask) == cp->first)
		sleep(&cp->r, windowopen, cp);

	/*
	 *  stick the message in a Msg structure
	 */
	mp = &cp->out[cp->next];
	mp->time = NOW + MSrexmit;
	mp->first = q->first;
	mp->last = q->last;
	mp->len = q->len;
	mp->mid ^= Nmsg;
	mp->acked = 0;

	/*
	 *  init the queue for new messages
	 */
	q->len = 0;
	q->first = q->last = 0;

	/*
	 *  send it
	 */
	cp->next = next;
	sendmsg(cp, mp);
	cp->sent++;
}

/*
 *  wake up every 250 ms to send and ack or resend a message
 */
static int
always(void *a)
{
	return 0;
}
void
nonetkproc(void *arg)
{
	Conversation *cp;
	Interface *ifc;
	Queue *wq;
	Msg *mp;
	int i;

	cp = (Conversation *)arg;

	for(;;){
		/*
		 *  die on request
		 */
		if(cp->state == Cclosed){
			cp->kstarted = 0;
			wakeup(&cp->r);
			return;
		}

		/*
		 *  retransmit first message
		 */
		if(cp->first != cp->next){
			mp = &cp->out[cp->first];
			if(!mp->acked && NOW >= mp->time){
				if(cp->retry++ > 400)
					hangup(cp);
				else {
					cp->rexmit++;
					sendmsg(cp, mp);
				}
			}
		}

		/*
		 *  send any ack whose time is come
		 */
		while(cp->afirst != cp->anext && NOW >= cp->atime[cp->anext]
		      && cp->rq->next->len < Streamhi)
			sendctlmsg(cp, 0, 0);
		tsleep(&cp->rq->r, always, 0, MSrexmit/2);
	}
}

/*
 *  nonet directory and subdirectory
 */
enum {
	Nraddrqid,
	Nstatsqid,
	Nchanqid,
	Ncloneqid,
};
Dirtab nonetdir[Ndir];
Dirtab nosubdir[]={
	"raddr",	Nraddrqid,	0,	0600,
	"stats",	Nstatsqid,	0,	0600,
};

/*
 *  nonet file system.  most of the calls use dev.c to access the nonet
 *  directory and stream.c to access the nonet devices.
 */
void
nonetreset(void)
{
	newqinfo(&noetherinfo);
}

/*
 *  create the nonet directory.  the files are `clone' and stream
 * directories '1' to '32' (or whatever Nconv is in decimal)
 */
void
nonetinit(void)
{
	int i;

	/*
	 *  create the directory.
	 */
	/*
	 *  the circuits
	 */
	for(i = 0; i < Nconv; i++) {
		sprint(nonetdir[i].name, "%d", i);
		nonetdir[i].qid = CHDIR|STREAMQID(i, Nchanqid);
		nonetdir[i].length = 0;
		nonetdir[i].perm = 0600;
	}

	/*
	 *  the clone device
	 */
	strcpy(nonetdir[i].name, "clone");
	nonetdir[i].qid = Ncloneqid;
	nonetdir[i].length = 0;
	nonetdir[i].perm = 0600;

}

Chan*
nonetattach(char *spec)
{
	Interface *ifc;
	Chan *c;

	/*
	 *  find an interface with the same name
	 */
	for(ifc = interface; ifc < &interface[Nifc]; ifc++){
		qlock(ifc);
		if(strcmp(spec, ifc->name)==0 && ifc->wq) {
			ifc->ref++;
			qunlock(ifc);
			break;
		}
		qunlock(ifc);
	}
	if(ifc == &interface[Nifc])
		error(0, Enoifc);
	c = devattach('n', spec);
	c->dev = 0;
	return c;
}

Chan*
nonetclone(Chan *c, Chan *nc)
{
	Interface *ifc;

	c = devclone(c, nc);
	ifc = &interface[c->dev];
	qlock(ifc);
	ifc->ref++;
	qunlock(ifc);
	return c;
}

int	 
nonetwalk(Chan *c, char *name)
{
	if(c->qid == CHDIR)
		return devwalk(c, name, nonetdir, Ndir, devgen);
	else
		return devwalk(c, name, nosubdir, Nsubdir, streamgen);
}

void	 
nonetstat(Chan *c, char *dp)
{
	if(c->qid == CHDIR)
		devstat(c, dp, nonetdir, Ndir, devgen);
	else
		devstat(c, dp, nosubdir, Nsubdir, streamgen);
}

/*
 *  opening a nonet device allocates a Conversation.  Opening the `clone'
 *  device is a ``macro'' for finding a free Conversation and opening
 *  it's ctl file.
 */
Chan*
nonetopen(Chan *c, int omode)
{
	extern Qinfo nonetinfo;
	Stream *s;
	Conversation *cp;
	Interface *ifc;

	if(c->qid == Ncloneqid){
		ifc = &interface[c->dev];
		for(cp = &ifc->conv[0]; cp < &ifc->conv[Nconv]; cp++){
			if(cp->state == Cclosed && canqlock(cp)){
				if(cp->state != Cclosed){
					qunlock(cp);
					continue;
				}
				c->qid = CHDIR|STREAMQID(cp-ifc->conv, Nchanqid);
				devwalk(c, "ctl", 0, 0, streamgen);
				streamopen(c, &nonetinfo);
				qunlock(cp);
				break;
			}
		}
		if(cp == &ifc->conv[Nconv])
			error(0, Enodev);
	} else if(c->qid != CHDIR)
		streamopen(c, &nonetinfo);

	c->mode = openmode(omode);
	c->flag |= COPEN;
	c->offset = 0;
	return c;
}

void	 
nonetcreate(Chan *c, char *name, int omode, ulong perm)
{
	error(0, Eperm);
}

void	 
nonetclose(Chan *c)
{
	Interface *ifc;

	/* real closing happens in lancestclose */
	if(c->qid != CHDIR)
		streamclose(c);

	ifc = &interface[c->dev];
	qlock(ifc);
	ifc->ref--;
	qunlock(ifc);
}

long	 
nonetread(Chan *c, void *a, long n)
{
	int t;
	Conversation *cp;
	char stats[256];

	t = STREAMTYPE(c->qid);
	if(t >= Slowqid)
		return streamread(c, a, n);

	if(c->qid == CHDIR)
		return devdirread(c, a, n, nonetdir, Ndir, devgen);
	if(c->qid & CHDIR)
		return devdirread(c, a, n, nosubdir, Nsubdir, streamgen);

	cp = &interface[c->dev].conv[STREAMID(c->qid)];
	switch(t){
	case Nraddrqid:
		return stringread(c, a, n, cp->raddr);
	case Nstatsqid:
		sprint(stats, "sent: %d\nrcved: %d\nrexmit: %d\nbad: %d\n",
			cp->sent, cp->rcvd, cp->rexmit, cp->bad);
		return stringread(c, a, n, stats);
	}
	error(0, Eperm);
}

long	 
nonetwrite(Chan *c, void *a, long n)
{
	int t;

	t = STREAMTYPE(c->qid);
	if(t >= Slowqid)
		return streamwrite(c, a, n, 0);
	
	error(0, Eperm);
}

void	 
nonetremove(Chan *c)
{
	error(0, Eperm);
}

void	 
nonetwstat(Chan *c, char *dp)
{
	error(0, Eperm);
}

void	 
noneterrstr(Error *e, char *buf)
{
	rooterrstr(e, buf);
}

void	 
nonetuserstr(Error *e, char *buf)
{
	extern consuserstr(Error *, char *);

	consuserstr(e, buf);
}

/*
 *  interface
 */
/*
 *  Create an interface.  The qlock on ifc prevents a circuit
 *  from connecting to (nonetconnect) or outputting on (nonetoput)
 *  the interface while it is being created.
 */
Interface *
newifc(Queue *q, Stream *s, int maxtu, int mintu, int hsize,
	void (*connect)(Conversation *, char *))
{
	Interface *ifc;

	for(ifc = interface; ifc < &interface[Nifc]; ifc++){
		if(ifc->wq == 0){
			qlock(ifc);
			if(ifc->wq) {
				/* someone was faster than us */
				qunlock(ifc);
				continue;
			}
			RD(q)->ptr = WR(q)->ptr = (void *)ifc;
			ifc->maxtu = maxtu - hsize - HDRSIZE;
			ifc->mintu = mintu - hsize - HDRSIZE;
			ifc->hsize = hsize;
			ifc->connect = connect;
			ifc->wq = WR(q);
			ifc->name[0] = 0;
			qunlock(ifc);
			return ifc;
		}
	}
	error(0, Enoifc);
}

/*
 *  Free an interface.
 */
void
freeifc(Interface *ifc)
{
	qlock(ifc);
	if(ifc->ref){
		qunlock(ifc);
		print("freeifc in use\n");
		error(0, Einuse);
	}
	ifc->wq = 0;
	qunlock(ifc);
}
/*
 *  ethernet specific multiplexor
 */
/*
 *  ethernet header of a packet
 */
struct Etherhdr {
	uchar	d[6];
	uchar	s[6];
	uchar	type[2];
	uchar	circuit[3];	/* circuit number */
	uchar	flag;
	uchar	mid;		/* message id */
	uchar	ack;		/* piggy back ack */
	uchar	remain[2];	/* count of remaing bytes of data */
	uchar	sum[2];		/* checksum (0 means none) */
};
#define EHDRSIZE 24
#define EMAXBODY	(1514-HDRSIZE)	/* maximum ethernet packet body */
#define ETHER_TYPE	0x900	/* most significant byte last */

/*
 *  the ethernet multiplexor stream module definition
 */
static void noetheropen(Queue *, Stream *);
static void noetherclose(Queue *);
static void noetheroput(Queue *, Block *);
static void noetheriput(Queue *, Block *);
Qinfo noetherinfo = { noetheriput, nullput, noetheropen, noetherclose, "noether" };

/*
 *  parse an ethernet address (assumed to be 12 ascii hex digits)
 */
void
etherparse(uchar *to, char *from)
{
	int tdig;
	int fdig;
	int i;

	if(strlen(from) != 12)
		error(0, Ebadnet);

	for(i = 0; i < 6; i++){
		fdig = *from++;
		tdig = fdig > 'a' ? (fdig - 'a' + 10)
				: (fdig > 'A' ? (fdig - 'A' + 10) : (fdig - '0'));
		fdig = *from++;
		tdig <<= 4;
		tdig |= fdig > 'a' ? (fdig - 'a' + 10)
				: (fdig > 'A' ? (fdig - 'A' + 10) : (fdig - '0'));
		*to++ = tdig;
	}
}

/*
 *  perfrorm the ether specific part of nonetconnect.  just stick
 *  the address into the prototype header.
 */
void
noetherconnect(Conversation *cp, char *ea)
{
	Etherhdr *eh;

	/*
	 *  special hack for ross
	 */
	if(strcmp(ea, "020701005eff")==0)
		cp->hdr->flag &= ~ACKME;

	eh = (Etherhdr *)cp->media->rptr;
	etherparse(eh->d, ea);
	eh->type[0] = ETHER_TYPE>>8;
	eh->type[1] = ETHER_TYPE;
}

/*
 *  set up an ether interface
 */
static void
noetheropen(Queue *q, Stream *s)
{
	newifc(q, s, 1514, 60, 14, noetherconnect);
}

/*
 *  tear down an ether interface
 */
static void
noetherclose(Queue *q)
{
	Interface *ifc;

	ifc = (Interface *)(q->ptr);
	freeifc(ifc);
}

/*
 *  Input a packet and use the ether address to select the correct
 *  nonet device to pass it to.
 *
 *  Simplifying assumption:  one put == one packet && the complete header
 *	is in the first block.  If this isn't true, demultiplexing will not work.
 */
static void
noetheriput(Queue *q, Block *bp)
{
	Interface *ifc;
	int circuit;
	Conversation *cp;
	Etherhdr *h;
	Etherhdr *ph;
	ulong s;
	Block *end;

	if(bp->type != M_DATA){
		PUTNEXT(q, bp);
		return;
	}

	ifc = (Interface *)(q->ptr);
	h = (Etherhdr *)(bp->rptr);
	circuit = (h->circuit[2]<<16) | (h->circuit[1]<<8) | h->circuit[0];
	s = (h->sum[1]<<8) | h->sum[0];
	if(s && s != cksum(bp, 14)){
		print("checksum error %ux %ux\n", s, (h->sum[1]<<8) | h->sum[0]);
		freeb(bp);
		return;
	}

	/*
	 *  look for an existing circuit.
	 */
	for(cp = &ifc->conv[0]; cp < &ifc->conv[Nconv]; cp++){
		if(cp->state > Clistening && circuit == cp->rcvcircuit && canqlock(cp)){
			ph = (Etherhdr *)(cp->media->rptr);
			if(ifc == cp->ifc
			&& circuit == cp->rcvcircuit
			&& cp->state > Clistening
			&& memcmp(ph->d, h->s, sizeof(h->s)) == 0){
				cp->hdr->flag &= ~NEWCALL;
				bp->rptr += ifc->hsize;
				rcvmsg(cp, bp);
				qunlock(cp);
				return;
			}
			qunlock(cp);
		}
	}

	/*
	 *  ... or one just listening
	 */
	if((h->flag & NEWCALL) == 0) {
		freeb(bp);
		return;
	}
	for(cp = &ifc->conv[0]; cp < &ifc->conv[Nconv]; cp++){
		if(cp->state == Clistening && canqlock(cp)) {
			/*
			 *  initialize the conversation
			 */
			startconv(cp, circuit^1, Cconnecting);
			wakeup(&cp->r);
			sprint(cp->raddr, "%.2ux%.2ux%.2ux%.2ux%.2ux%.2ux", h->s[0],
				h->s[1], h->s[2], h->s[3], h->s[4], h->s[5]);

			/*
			 *  fill in media dependent prototype header
			 */
			ph = (Etherhdr *)(cp->media->rptr);
			memcpy(ph->d, h->s, sizeof(h->s));
			ph->type[0] = ETHER_TYPE>>8;
			ph->type[1] = ETHER_TYPE;

			/*
			 *  pass on the packet
			 */
			bp->rptr += ifc->hsize;
			rcvmsg(cp, bp);
			qunlock(cp);
			return;
		}
	}

	/*
	 *  not found 
	 */
	freeb(bp);
}

/*
 *  calculate the checksum of a list of blocks.  ignore the first `offset' bytes.
 */
int
cksum(Block *bp, int offset)
{
	Block *nbp = bp;
	uchar *ep, *p;
	int n;
	ulong s;
	Hdr *hp;

	s = 0;
	p = bp->rptr + offset;
	n = bp->wptr - p;
	hp = (Hdr *)p;
	hp->sum[0] = hp->sum[1] = 0;
	for(;;){
		ep = p+(n&~0x7);
		while(p < ep) {
			s = s + s + p[0];
			s = s + s + p[1];
			s = s + s + p[2];
			s = s + s + p[3];
			s = s + s + p[4];
			s = s + s + p[5];
			s = s + s + p[6];
			s = s + s + p[7];
			s = (s&0xffff) + (s>>16);
			p += 8;
		}
		ep = p+(n&0x7);
		while(p < ep) {
			s = s + s + *p;
			p++;
		}
		s = (s&0xffff) + (s>>16);
		bp = bp->next;
		if(bp == 0)
			break;
		p = bp->rptr;
		n = BLEN(bp);
	}
	s = (s&0xffff) + (s>>16);
	hp->sum[1] = s>>8;
	hp->sum[0] = s;
	return s & 0xffff;
}