Minix1.5/amoeba/kernel/trans.c

/****************************************************************************/
/*									    */
/* (c) Copyright 1988 by the Vrije Universiteit, Amsterdam, The Netherlands */
/*									    */
/*    This product is part of the  Amoeba  distributed operating system.    */
/*									    */
/*    Permission to use, sell, duplicate or disclose this software must be  */
/* obtained in writing.  Requests for such permissions may be sent to	    */
/*									    */
/*									    */
/*		Dr. Andrew S. Tanenbaum					    */
/*		Dept. of Mathematics and Computer Science		    */
/*		Vrije Universiteit					    */
/*		Postbus 7161						    */
/*		1007 MC Amsterdam					    */
/*		The Netherlands						    */
/*									    */
/****************************************************************************/

#define NDEBUG
#define TRANS

/*
 * This module implements the transaction mechanism. The transaction
 * calls are:
 *	getreq(header, buffer, count);
 *	putrep(header, buffer, count);
 *	trans(hdr1, buf1, cnt1, hdr2, buf2, cnt2);
 *
 * ``Getreq'' is called by servers that want to wait for a request.
 * ``Putrep'' is called by servers that what to send a reply to a client.
 * ``Trans'' is called by clients that want to send a request to a server.
 * Requests are addressed by the ``h_port'' field in the header structure.
 * Replies are automatically sent back to the corresponding client. Between
 * a ``getreq'' and a ``putrep'' the server may not call ``getreq''. All
 * the three calls are blocking.
 *
#ifndef NONET
 * The following network interface routines must be defined externally:
 *
 *	puthead(dest, source, ident, seq, type, size);
 *	append(data, size, send);
 *	pickoff(data, size);
 *
 * ``Puthead'' is called first when a packet has to be sent. Among other
 * things the destination and the size are specified. If this size is zero,
 * the packet must be sent immediately.
 * ``Append'' is called when more data must be appended to a packet. The
 * ``send'' parameter is set when the packet can be sent.
 * ``Pickoff'' is called when a packet has arrived. The specified number
 * of bytes must copied to the specified data buffer.
 *
 * When a packet arrives, the local routine ``handle'' must be called:
 *	handle(dest, source, ident, seq, type, size, hdr);
 * ``Hdr'' contains the first HEADERSIZE data bytes. With a call to
 * ``getall'' this buffer is enlarged to contain the full packet.
#endif NONET
 */

#include "kernel.h"
#include "amoeba.h"
#undef IDLE
#include "byteorder.h"
#include "exception.h"
#include "global.h"
#include "task.h"
#include "internet.h"
#include "amstat.h"
#include "assert.h"

#define	send		amsend
#define	received	amreceived
extern struct task	task[];

#define FIRSTSIZE	(PACKETSIZE - HEADERSIZE)

#define MAXCNT		30000		/* maximum buffer size */

#define taskptr(to)	(&task[tasknum(to)])

extern port NULLPORT;

#ifndef NONET
#ifndef NOCLIENT
extern unshort maxcrash;
#endif NOCLIENT
extern unshort retranstime, crashtime, clientcrash;
extern unshort maxretrans, mincrash;
#endif

address local;
extern long ticker;
extern address portlookup();
extern phys_bytes umap();

#ifdef STATISTICS
struct amstat amstat;
#endif
#ifdef BUFFERED

/* Some simple routines to test ``BUFFERED transactions''
 */

PRIVATE char bufs[10][BUFSIZE];
PRIVATE char used[11];

#define NOBUF	((buffer) 0)

buffer allocbuf(){
  register buffer b;

  for (b = 1; b <= 10; b++)
	if (!used[b]) {
		used[b] = 1;
		return(b);
	}
#ifndef NDEBUG
  printf("out of buffers\n");
#endif
  return(NOBUF);
}

freebuf(b)
buffer b;
{
  assert(used[b]);
  used[b] = 0;
}

putbuf(b, addr, size)
buffer b;
vir_bytes addr;
unshort size;
{
  register phys_bytes pb;

  assert(used[b]);
  if ((pb = umap(curtask, addr, (vir_bytes) size)) == 0)
	return(0);
  phys_copy((phys_bytes) bufs[b - 1], pb, (phys_bytes) size);
  return(1);
}

getbuf(b, addr, size)
buffer b;
vir_bytes addr;
unshort size;
{
  register phys_bytes pb;

  assert(used[b]);
  if ((pb = umap(curtask, addr, (vir_bytes) size)) == 0)
	return(0);
  phys_copy(pb, (phys_bytes) bufs[b - 1], (phys_bytes) size);
  return(1);
}

#endif BUFFERED

/* return where the ``location'' can be found:
 *	DONTKNOW:	the location is unknown;
 *	LOCAL:		it's on this machine;
 *	GLOBAL:		it's probably somewhere on the net.
 */
area(location)
register address location;
{
  if (location == SOMEWHERE)
	return(DONTKNOW);
  if (siteaddr(location) == local)
	return(LOCAL);
#ifdef NONET
  assert(0);
  /*NOTREACHED*/
#else
  return(GLOBAL);
#endif
}

/* (re)start getreq
 */
static getreqstart(t)
register struct task *t;
{
  t->ts_state = WAITBUF;
  t->ts_seq = 0;
  t->ts_offset = 0;
  portinstall(&t->ts_rhdr->h_port, t->ts_addr, WAIT);
}

#ifndef NONET	/*--------------------------------------------------*/

static headerfromnet(hdr)
header *hdr;
{
  dec_s_be(&hdr->h_command);
  dec_s_be(&hdr->h_size);
  dec_l_be(&hdr->h_offset);
  dec_s_be(&hdr->h_extra);
}

static headertonet(hdr)
header *hdr;
{
  enc_s_be(&hdr->h_command);
  enc_s_be(&hdr->h_size);
  enc_l_be(&hdr->h_offset);
  enc_s_be(&hdr->h_extra);
}

/* A locate message has arrived. This routine handles it by looking the
 * ports up in the cache, and if it finds some there that are local it
 * sends a reply back with those ports.
 */

static locreq(ph, ptab)         /* handle locate request */
register struct pktheader *ph;
register char *ptab;
{
  getall();
  portask(pktfrom(ph), (port *) ptab, ph->ph_size/PORTSIZE);
  netenable();
}

/* called from portcache.c */
hereport(asker, ptab, nports)
address asker;
port *ptab;
unsigned nports;
{

  nports *= PORTSIZE;
  puthead(asker, local, 0, 0, HERE, nports);
  append((phys_bytes) ptab, nports, SEND);
}

#ifndef NOCLIENT

/* called from portcache.c */
whereport(ptab, nports)
port *ptab;
unsigned nports;
{
  nports *= PORTSIZE;
  puthead(BROADCAST, local, 0, 0, LOCATE, nports);
  append((phys_bytes) ptab, nports, SEND);
}

/* start getrep
 */
static getrepstart(t)
register struct task *t;
{
  t->ts_flags &= ~PUTREQ;
  t->ts_state = WAITBUF;
  t->ts_seq = 0;
  t->ts_offset = 0;
  t->ts_flags |= GETREP;
  t->ts_timer = crashtime;
  t->ts_count = maxcrash;
}

/* A reply on a locate message has arrived. Store the ports in the cache.
 */
static here(ph, ptab)	/* handle locate reply */
register struct pktheader *ph;
register char *ptab;
{
  register port *p;
  register unshort from = pktfrom(ph);

  getall();
  p = ABSPTR(port *, ptab + ph->ph_size);
  while ((char *) --p >= ptab)
	portinstall(p, from, NOWAIT);
  netenable();
}

/* After I've enquired about the health of a server, the processor answered
 * that it's fine. Thank goodness. But keep asking.
 */
static alive(ph)
register struct pktheader *ph;
{
  register struct task *t = &task[ph->ph_dsttask & 0xFF];

  netenable();
  if (t->ts_server == pktfrom(ph) && t->ts_svident == ph->ph_ident &&
						(t->ts_flags & GETREP)) {
	t->ts_timer = crashtime;
	t->ts_count = maxcrash;
	t->ts_signal = 0;
  }
}

/* After I've enquired about the health of a server, the processor answered
 * that it's dead. Too bad. Notify client.
 */
static dead(ph)
register struct pktheader *ph;
{
  register struct task *t = &task[ph->ph_dsttask & 0xFF];

  netenable();
  if (t->ts_server == pktfrom(ph) && t->ts_svident == ph->ph_ident &&
		(t->ts_state == WAITBUF || t->ts_state == RECEIVING ||
		 t->ts_state == SENDING) &&
		(t->ts_flags & (PUTREQ | GETREP))) {
	t->ts_timer = 0;
	t->ts_state = FAILED;
	wakeup((event_t) &t->ts_state);
  }
}
#endif NOCLIENT

/* Someone enquires how a server doing. Inform him. A signal may be sent
 * along with the enquiry. Handle that.
 */
static enquiry(ph)
register struct pktheader *ph;
{
  register unshort from = pktfrom(ph), to = pktto(ph);
  register struct task *t = &task[ph->ph_dsttask & 0xFF];

  netenable();
  if (t->ts_client == from && t->ts_clident == ph->ph_ident) {
  	if (t->ts_flags & SERVING) {
		t->ts_cltim = clientcrash;
		puthead(from, to, ph->ph_ident, 0, ALIVE, 0);
		if (ph->ph_signal != 0)
			putsig(t, (unshort) (ph->ph_signal & 0xFF));

	}
  }
  else
	puthead(from, to, ph->ph_ident, 0, DEAD, 0);
}

/* Send a fragment of a packet. If it's the first, insert the header.
 */
static sendfragment(t, what)
struct task *t;
unshort what;
{
  register address to;
  register short size = t->ts_xcnt - t->ts_offset;
  register phys_bytes xbuf;

#ifndef NOCLIENT
  if (t->ts_flags & PUTREQ) {
	to = t->ts_server;
	what |= REQUEST;
  }
  else
#endif
  {
	to = t->ts_client;
	what |= REPLY;
  }
  if (t->ts_seq == 0) {		/* first fragment--append header */
	if (size <= FIRSTSIZE)		/* first and last */
		what |= LAST;
	else {				/* first but not last */
		size = (size + HEADERSIZE - 1) % PACKETSIZE - HEADERSIZE + 1;
		if (size < 0)
			size = 0;
	}
	puthead(to, t->ts_addr, t->ts_ident, t->ts_seq, (char) what,
						(unshort) size + HEADERSIZE);
	headertonet(t->ts_xhdr);
	append((phys_bytes) t->ts_xhdr, HEADERSIZE, size == 0 ? SEND : NOSEND);
	headerfromnet(t->ts_xhdr);
  }
  else {			/* not first */
	if (size <= PACKETSIZE)		/* last but not first */
		what |= LAST;
	else				/* not first and not last */
		size = PACKETSIZE;
	puthead(to, t->ts_addr, t->ts_ident, t->ts_seq, (char) what,
							(unshort) size);
  }
/*
** a change from original trans code because on an ibmpc kernel virtual
** addresses are different from kernel physical.  therefore we must
** replace call to append with am_doappend().
*/
  if (size != 0)
	if ((xbuf = umap(t, (vir_bytes) (t->ts_xbuf + t->ts_offset),
						(vir_bytes) size)) == 0)
		return(0);
	else
		am_doappend(xbuf, (unshort) size, SEND);
  return(1);
}

/* Send a message. Call sendfragment to send the first fragment. When an
 * acknowledgement arrives, the interrupt routine handling it will send
 * the next fragment, if necessary.
 */
static send(){
  register struct task *c = curtask;
  register unshort what = 0;

  c->ts_state = SENDING;
#ifdef NOCLIENT
  c->ts_ident = c->ts_clident;
#else
  c->ts_ident = c->ts_flags & PUTREQ ? c->ts_svident : c->ts_clident;
#endif
  c->ts_seq = 0;
  c->ts_count = maxretrans;
  do {
	if (!sendfragment(c, what)) {
		c->ts_state = MEMFAULT;
		return;
	}
	if (c->ts_state == SENDING) {
		c->ts_timer = retranstime;
		if (sleep((event_t) &c->ts_state)) {
			c->ts_timer = 0;
			c->ts_state = ABORT;
			return;
		}
	}
	if (c->ts_state == ACKED) {
		c->ts_state = SENDING;
		what = 0;
	}
	else /* if (c->ts_state == SENDING) */
		what = RETRANS;
  } while (c->ts_state == SENDING);
}

/* An acknowledgement for a fragment arrived. If it wasn't the last fragment,
 * send the next. Else if it was a putrep, restart the task. Else it was the
 * putreq part of a transaction, start up the getrep part.
 */
static gotack(ph)
register struct pktheader *ph;
{
  register unshort to = pktto(ph), from = pktfrom(ph);
  register struct task *t = &task[ph->ph_dsttask & 0xFF];

  netenable();
  if (t->ts_state == SENDING && t->ts_ident == ph->ph_ident &&
						t->ts_seq == ph->ph_seq) {
	if (ph->ph_seq == 0) {		/* first fragment */
		register short size = t->ts_xcnt - t->ts_offset;
		
#ifndef NOCLIENT
		if (t->ts_flags & PUTREQ)
			t->ts_server = from;
		else
#endif
		if (t->ts_client != from)
			return;
		if (size > 0)
			size = (size + HEADERSIZE - 1) % PACKETSIZE
							- HEADERSIZE + 1;
		if (size > 0)
			t->ts_offset += size;
	}
	else
		t->ts_offset += PACKETSIZE;
	t->ts_timer = 0;
	if (t->ts_offset >= t->ts_xcnt)		/* ack for last fragment */
#ifndef NOCLIENT
		if (t->ts_flags & PUTREQ) {	/* in a transaction */
			getrepstart(t);
			if (t->ts_signal != 0)
				puthead(from, to, ph->ph_ident, t->ts_signal,
								ENQUIRY, 0);
		}
		else
#endif
		{				/* putrep done */
			assert(t->ts_flags & PUTREP);
			t->ts_state = DONE;
			wakeup((event_t) &t->ts_state);
		}
	else {
		t->ts_seq++;
#ifdef BUFFERED
		t->ts_timer = 0;
		t->ts_count = maxretrans;
		t->ts_state = ACKED;
		wakeup((event_t) &t->ts_state);
#else
		if (sendfragment(t, 0)) {
			t->ts_timer = retranstime;
			t->ts_count = maxretrans;
		}
		else {
			t->ts_timer = 0;
			t->ts_state = MEMFAULT;
			wakeup((event_t) &t->ts_state);
		}
#endif BUFFERED
	}
  }
}

#ifndef NOCLIENT
/* A nak has arrived. Obviously the server was not at the assumed address.
 * Wake up the task, to do a new locate.
 */
static gotnak(ph)
register struct pktheader *ph;
{
  register struct task *t = &task[ph->ph_dsttask & 0xFF];

  netenable();
  if (t->ts_state == SENDING && t->ts_ident == ph->ph_ident && t->ts_seq == 0
	  && (t->ts_flags & PUTREQ) && t->ts_server == (ph->ph_srcnode & 0xFF)) {
	t->ts_timer = 0;
	t->ts_state = NACKED;
	wakeup((event_t) &t->ts_state);
  }
}
#endif NOCLIENT

/* A fragment has arrived. Get the data and see if more fragments are expected.
 */
static received(t, what, size, hdr)
struct task *t;
char what, *hdr;
unshort size;
{
  register unshort cnt = t->ts_rcnt - t->ts_offset, n;
  register phys_bytes rbuf;

  if (cnt > size)
	cnt = size;
  if (cnt != 0) {
	rbuf = umap(t, (vir_bytes) (t->ts_rbuf+t->ts_offset), (vir_bytes) cnt);
	if (rbuf == 0) {
		netenable();
		t->ts_timer = 0;
		t->ts_state = MEMFAULT;
		wakeup((event_t) &t->ts_state);
		return;
	}
	t->ts_offset += cnt;
	if (t->ts_seq != 0) {	/* copy the ``header'' */
		n = cnt < HEADERSIZE ? cnt : HEADERSIZE;
/* kernel virtual != kernel physical under minix */
		am_phys_copy((vir_bytes)hdr, rbuf, (phys_bytes) n);
		rbuf += n;
		cnt -= n;
	}
	if (cnt != 0)
		pickoff(rbuf, cnt);
  }
  netenable();
  if (what & LAST) {
	t->ts_timer = 0;
	t->ts_state = DONE;
#ifndef BUFFERED
	wakeup((event_t) &t->ts_state);
#endif
  }
  else {
	t->ts_seq++;
	t->ts_timer = crashtime;
	t->ts_count = mincrash;
	t->ts_state = RECEIVING;
  }
}

#ifdef BUFFERED

/* A network fragment is available. Wake up the waiting task.
 */
static gotbuffer(t, from, ident, what, size)
register struct task *t;
address from;
char ident, what;
unshort size;
{
  assert(t->ts_state == WAITBUF || t->ts_state == RECEIVING);
  t->ts_sender = from;
  t->ts_ident = ident;
  t->ts_buffer = NETBUF;
  t->ts_bufcnt = size;
  t->ts_what = what;
  t->ts_timer = 0;	/* shouldn't be necessary, but can't hurt */
  wakeup((event_t) &t->ts_state);
}

#endif BUFFERED

/* See if someone is handling the request for `from.'
 */
static struct task *find(from, ident)
address from;
char ident;
{
  register struct task *t;

  for (t = task; t < uppertask; t++)
	if ((t->ts_flags & (GETREQ | SERVING | PUTREP)) &&
				t->ts_client == from && t->ts_clident == ident)
		return(t);
  return(NILTASK);
}

/* A request packet has arrived. Find out which task this packet should go to.
 * Send an acknowledgement if it can't do any harm.
 */
static gotrequest(ph, hdr)
register struct pktheader *ph;
header *hdr;
{
  register struct task *t;
  register unsigned /* unshort */ from = pktfrom(ph), to, size = ph->ph_size;

  if (ph->ph_seq == 0)	/* only the first fragment is really interesting */
	if ((ph->ph_type & RETRANS) &&
				(t = find(from, ph->ph_ident)) != NILTASK) {
		netenable();		/* ack got lost, send it again */
		puthead(from, t->ts_addr, ph->ph_ident, ph->ph_seq, ACK, 0);
	}
	else {
		register address location;

		location = portlookup(&hdr->h_port, NOWAIT, DELETE);
		if (location == NOWHERE || siteaddr(location) != local) {
			netenable();
			puthead(from, pktto(ph), ph->ph_ident, 0, NAK, 0);
			return;
		}
		size -= HEADERSIZE;
		t = taskptr(location);
		t->ts_client = from;
		t->ts_clident = ph->ph_ident;
		*t->ts_rhdr = *hdr;
		headerfromnet(t->ts_rhdr);
#ifdef BUFFERED
		gotbuffer(t, from, ph->ph_ident, ph->ph_type, size);
#else
		if ((ph->ph_type & (LAST | RETRANS)) != LAST)
			puthead(from, location, ph->ph_ident, 0, ACK, 0);
		received(t, ph->ph_type, size, (char *) 0);
#endif BUFFERED
	}
  else {	/* seq != 0 */
	to = pktto(ph);
	t = &task[ph->ph_dsttask & 0xFF];
	if (t->ts_state != RECEIVING || ph->ph_seq != t->ts_seq) {
		netenable();
		puthead(from, to, ph->ph_ident, ph->ph_seq, ACK, 0);
	}
	else {
#ifdef BUFFERED
		t->ts_savehdr = (char *) hdr;
		gotbuffer(t, from, ph->ph_ident, ph->ph_type, size);
#else
		puthead(from, to, ph->ph_ident, ph->ph_seq, ACK, 0);
		received(t, ph->ph_type, size, (char *) hdr);
#endif BUFFERED
	}
  }
}

#ifndef NOCLIENT
/* A reply packet has arrived. Send an acknowledgement if it can't do any
 * harm.
 */
static gotreply(ph, hdr)
register struct pktheader *ph;
header *hdr;
{
  register unshort from = pktfrom(ph), to = pktto(ph), size = ph->ph_size;
  register struct task *t = &task[ph->ph_dsttask & 0xFF];

  if (ph->ph_ident != t->ts_svident || ph->ph_seq != t->ts_seq)
	t = NILTASK;
  else if ((t->ts_flags & GETREP) == 0)
	if (t->ts_flags & PUTREQ) {	/* ack for request got lost */
		compare(t->ts_ident, ==, ph->ph_ident);
		getrepstart(t);		/* start the getrep */
		t->ts_signal = 0;
	}
	else
		t = NILTASK;
  if (t != NILTASK) {
	if (ph->ph_seq == 0) {
		*t->ts_rhdr = *hdr;
		headerfromnet(t->ts_rhdr);
		size -= HEADERSIZE;
	}
	else if (t->ts_state != RECEIVING)
		t = NILTASK;
  }
  if (t == NILTASK) {
	netenable();
	puthead(from, to, ph->ph_ident, ph->ph_seq, ACK, 0);
  }
  else {
#ifdef BUFFERED
	t->ts_savehdr = (char *) hdr;
	gotbuffer(t, from, ph->ph_ident, ph->ph_type, size);
#else
	puthead(from, to, ph->ph_ident, ph->ph_seq, ACK, 0);
	received(t, ph->ph_type, size, (char *) hdr);
#endif BUFFERED
  }
}
#endif NOCLIENT

/* A packet has arrived. Call an appropiate routine, after checking some
 * things.
 */
pkthandle(ph, hdr)
register struct pktheader *ph;
register char *hdr;
{
  if (ph->ph_dsttask < ntask && ph->ph_size <= PACKETSIZE)
	switch (ph->ph_type & TYPE) {
  	case LOCATE:
		if (ph->ph_size == 0 || ph->ph_ident != 0 || ph->ph_seq != 0)
			break;
		if ((ph->ph_size % PORTSIZE) != 0)
			break;
		locreq(ph, hdr);
		return(1);
	case REQUEST:
		if (ph->ph_seq == 0 && ph->ph_size < HEADERSIZE)
			break;
		gotrequest(ph, ABSPTR(header *, hdr));
		return(1);
	case ACK:
		if (ph->ph_size != 0)
			break;
		gotack(ph);
		return(1);
	case ENQUIRY:
		if (ph->ph_size != 0)
			break;
		enquiry(ph);
		return(1);
#ifndef NOCLIENT
	case HERE:
		if (ph->ph_size == 0 || ph->ph_ident != 0 || ph->ph_seq != 0)
			break;
		if ((ph->ph_size % PORTSIZE) != 0)
			break;
		here(ph, hdr);
		return(1);
	case REPLY:
		if (ph->ph_seq == 0 && ph->ph_size < HEADERSIZE)
			break;
		gotreply(ph, ABSPTR(header *, hdr));
		return(1);
	case NAK:
		if (ph->ph_size != 0 || ph->ph_seq != 0)
			break;
		gotnak(ph);
		return(1);
	case ALIVE:
		if (ph->ph_size != 0 || ph->ph_seq != 0)
			break;
		alive(ph);
		return(1);
	case DEAD:
		if (ph->ph_size != 0 || ph->ph_seq != 0)
			break;
		dead(ph);
		return(1);
#endif
	case 0:
		return(0);
  }
  return(0);
}

#ifdef notdef  /* don't need this for minix */
handle(to, from, ident, seq, what, size, hdr)	/* compatibility */
address to, from;
char ident, seq, what;
unshort size;
char *hdr;
{
  struct pktheader ph;

  ph.ph_dstnode = siteaddr(to);
  ph.ph_srcnode = siteaddr(from);
  ph.ph_dsttask = tasknum(to);
  ph.ph_srctask = tasknum(from);
  ph.ph_ident = ident;
  ph.ph_seq = seq;
  ph.ph_size = size;
  ph.ph_type = what;
  return pkthandle(&ph, hdr);
}

#endif

/* A timer has gone off too many times. See what went wrong. Restart the task.
 */
static failed(t)
register struct task *t;
{
  assert(t->ts_flags & (GETREQ | PUTREP | GETREP | PUTREQ));
#ifndef NDEBUG
  if (t->ts_flags & (GETREQ | PUTREP))
	printf("%x: client %x failed (%d)\n", t - task,
					t->ts_client, t->ts_state);
  if (t->ts_flags & (GETREP | PUTREQ))
	printf("%x: server %x failed (%d)\n", t - task,
					t->ts_server, t->ts_state);
#endif
#ifdef STATISTICS
  if (t->ts_flags & (GETREQ | PUTREP))
	amstat.ams_clfail++;
  if (t->ts_flags & (GETREP | PUTREQ))
	amstat.ams_svfail++;
#endif
  switch (t->ts_state) {
  case SENDING:		/* Message didn't arrive */
	t->ts_state = FAILED;
	assert(t->ts_flags & (PUTREQ | PUTREP));
	break;
  case WAITBUF:		/* server site has crashed */
	assert(t->ts_flags & GETREP);
  case RECEIVING:
#ifndef NOCLIENT
	if (t->ts_flags & GETREP)
		t->ts_state = FAILED;
	else
#endif
	{
		getreqstart(t);	/* client failed, restart getreq */
		return;
	}
	break;
  default:
	assert(0);
  }
  wakeup((event_t) &t->ts_state);
}

/* A timer went off. See what is wrong.
 */
static again(t)
register struct task *t;
{
  switch (t->ts_state) {
  case SENDING:			/* retransmit */
#ifdef STATISTICS
  if (t->ts_flags & (GETREQ | PUTREP))
	amstat.ams_rxcl++;
  if (t->ts_flags & (GETREP | PUTREQ))
	amstat.ams_rxsv++;
#endif
#ifdef BUFFERED
	wakeup((event_t) &t->ts_state);
#else
	if (!sendfragment(t, RETRANS))
		assert(0);
	t->ts_timer = retranstime;
#endif
	break;
  case WAITBUF:			/* Check if the server is still alive */
	assert(t->ts_flags & GETREP);
  case RECEIVING:
#ifndef NOCLIENT
	if (t->ts_flags & GETREP)  /* See if the other side is still there */
		puthead(t->ts_server, t->ts_addr, t->ts_svident,
						t->ts_signal, ENQUIRY, 0);
#endif
	t->ts_timer = retranstime;
	break;
  default:
	assert(0);
  }
}

#endif NONET	/*--------------------------------------------------*/

/* First check all the timers. If any went off call the appropiate routine.
 * Then see if there are ports to locate.
 */
netsweep(){
  register struct task *t;

  for (t = task; t < uppertask; t++) {
	if (t->ts_timer != 0 && --t->ts_timer == 0)	/* timer expired */
#ifndef NOCLIENT
		if (t->ts_flags & LOCATING)
			portquit(&t->ts_xhdr->h_port, t);
#endif
#ifndef NONET
#ifndef NOCLIENT
		else
#endif
		{
			compare(t->ts_count, !=, 0);
			if (--t->ts_count == 0)		/* serious */
				failed(t);
			else				/* try again */
				again(t);
			break;
		}
	if (t->ts_cltim != 0 && --t->ts_cltim == 0) {	/* client crashed */
#ifdef STATISTICS
		amstat.ams_clcrash++;
#endif
		putsig(t, CRASH);
	}
#endif NONET
  }
}

#ifdef BUFFERED

/*  Data has arrived. Get it. If there's more, wait for it.
 */
static recvbuf(){
  register struct task *c = curtask, *t;

  c->ts_state = WAITBUF;
  for (;;) {
#ifndef NONET
	if (c->ts_buffer == NETBUF) {		/* something from the net */
		c->ts_buffer = NOBUF;
		puthead(c->ts_sender, c->ts_addr, c->ts_ident, c->ts_seq,
								ACK, 0);
		received(c, (char) c->ts_what, c->ts_bufcnt, c->ts_savehdr);
		if (c->ts_state != RECEIVING)
			return;
	}
	else
#endif NONET
	if (c->ts_buffer != NOBUF && !putbuf(c->ts_buffer, (vir_bytes)
				(c->ts_rbuf + c->ts_offset), c->ts_bufcnt)) {
#ifndef NDEBUG
		printf("%x: bad rbuf (received from %x)\n",
						c->ts_addr, t->ts_addr);
#endif
		c->ts_state = MEMFAULT;		/* receiver fails */
		freebuf(c->ts_buffer);
		c->ts_buffer = NOBUF;
		if (c->ts_sender != NOWHERE) {
			t = taskptr(c->ts_sender);
			t->ts_state = FAILED;
			wakeup((event_t) &t->ts_state);
		}
		return;
	}
	else {		/* local copy done */
		c->ts_offset += c->ts_bufcnt;
		if (c->ts_bufcnt < BUFSIZE) {	/* last buffer */
			if (c->ts_buffer != NOBUF) {
				freebuf(c->ts_buffer);
				c->ts_buffer = NOBUF;
			}
			c->ts_state = DONE;
			return;
		}
		else {			/* more buffers expected */
			compare(c->ts_sender, !=, NOWHERE);
			c->ts_state = RECEIVING;
			t = taskptr(c->ts_sender);
			compare(t->ts_state, ==, SENDING);
			assert(t->ts_flags & (PUTREQ | PUTREP));
			wakeup((event_t) &t->ts_state);
		}
	}
	if (sleep((event_t) &c->ts_state)) {		/* wait for rest */
		portremove(&c->ts_rhdr->h_port, c->ts_addr);
		if (c->ts_buffer != NOBUF) {
			freebuf(c->ts_buffer);
			c->ts_buffer = NOBUF;
		}
		c->ts_state = ABORT;
		return;
	}
  }
}

#endif BUFFERED

/* The transaction is local. This routine does the sending.
 */
static sendbuf(t)
register struct task *t;
{
  register struct task *c = curtask;
  register unshort cnt = t->ts_rcnt < c->ts_xcnt ? t->ts_rcnt : c->ts_xcnt;
#ifdef BUFFERED
  register unshort size;
  buffer allocbuf();

  c->ts_state = SENDING;
  t->ts_sender = c->ts_addr;
  if (cnt == 0)
	t->ts_buffer = NOBUF;
  else if ((t->ts_buffer = allocbuf()) == NOBUF) {
	c->ts_xcnt = FAIL;
	c->ts_state = FAILED;
	if (!(t->ts_flags & GETREQ)) {
		t->ts_state = FAILED;	/* trans fails */
		wakeup((event_t) &t->ts_state);
	}
	return;
  }
  do {
	if ((size = cnt) != 0) {
		if (size > BUFSIZE)
			size = BUFSIZE;
		if (!getbuf(t->ts_buffer, (vir_bytes) (c->ts_xbuf +
						c->ts_offset), size)) {
#ifndef NDEBUG
			printf("%x: bad xbuf (sending to %x)\n",
						c->ts_addr, t->ts_addr);
#endif
			freebuf(t->ts_buffer);
			t->ts_buffer = NOBUF;
			c->ts_xcnt = FAIL;		/* for putrep */
			c->ts_state = MEMFAULT;
			if (!(t->ts_flags & GETREQ)) {
				t->ts_state = FAILED;	/* trans fails */
				wakeup((event_t) &t->ts_state);
			}
			break;
		}
	}
	else if (t->ts_buffer != NOBUF) {
		freebuf(t->ts_buffer);
		t->ts_buffer = NOBUF;
	}
	t->ts_bufcnt = size;
	assert(t->ts_state == WAITBUF || t->ts_state == RECEIVING);
	wakeup((event_t) &t->ts_state);
	if (size != BUFSIZE) {		/* last buffer */
		t->ts_sender = NOWHERE;
		c->ts_state = DONE;
		break;
	}
	cnt -= size;
	c->ts_offset += size;
	if (sleep((event_t) &c->ts_state))
		c->ts_state = ABORT;
  } while (c->ts_state == SENDING);
#else
  compare(t->ts_state, ==, WAITBUF);
  if (cnt != 0) {
	register phys_bytes rbuf, xbuf;

	if ((rbuf = umap(t, t->ts_rbuf, (vir_bytes) cnt)) == 0) {
#ifndef NDEBUG
		printf("bad rbuf (%X,%X)\n", t->ts_rbuf, (vir_bytes) cnt);
#endif
		c->ts_state = FAILED;
		c->ts_xcnt = FAIL;
		t->ts_state = MEMFAULT;
		wakeup((event_t) &t->ts_state);
		return;
	}
	if ((xbuf = umap(c, c->ts_xbuf, (vir_bytes) cnt)) == 0) {
#ifndef NDEBUG
		printf("bad xbuf (%X,%X)\n", c->ts_xbuf, (vir_bytes) cnt);
#endif
		c->ts_state = MEMFAULT;
		c->ts_xcnt = FAIL;
		if (t->ts_flags & GETREQ)
			getreqstart(t);	/* client failed, restart getreq */
		else {
			t->ts_state = FAILED;
			wakeup((event_t) &t->ts_state);
		}
		return;
	}
	phys_copy(xbuf, rbuf, (phys_bytes) cnt);
  }
  t->ts_offset = cnt;
  t->ts_state = DONE;
  wakeup((event_t) &t->ts_state);
  c->ts_state = DONE;
#endif BUFFERED
}

#ifndef NOCLIENT
/* The transaction is local. Send the request to the server.
 */
static recvrequest(){
  register address to;
  register struct task *c = curtask, *t;

  if ((to = portlookup(&c->ts_xhdr->h_port, NOWAIT, DELETE)) == NOWHERE)
	return(0);
#ifndef NONET
  if (siteaddr(to) != local)
	return(0);
#endif
  t = taskptr(to);
  c->ts_server = to;
  t->ts_client = c->ts_addr;
  t->ts_clident = c->ts_svident;
  *t->ts_rhdr = *c->ts_xhdr;
  sendbuf(t);
  return(c->ts_xcnt != FAIL);
}
#endif

/* A task calls this routine when it wants to be blocked awaiting a request.
 * It specifies the header containing the port to wait for, a buffer where
 * the data must go and the size of this buffer. Getreq returns the size of
 * the request when one arrives.
 */
unshort getreq(hdr, buf, cnt)
header *hdr;
bufptr buf;
unshort cnt;
{
  register struct task *c = curtask;

  if (c->ts_flags != 0 || cnt > MAXCNT)
	return(FAIL);
  if (NullPort(&hdr->h_port))
	return(FAIL);
#ifdef STATISTICS
  amstat.ams_getreq++;
#endif
  c->ts_rhdr = hdr;
  c->ts_rbuf = buf;
  c->ts_rcnt = cnt;
  c->ts_flags |= GETREQ;
  getreqstart(c);
  if (sleep((event_t) &c->ts_state)) {
	portremove(&hdr->h_port, c->ts_addr);
	c->ts_state = ABORT;
  }
#ifdef BUFFERED
  if (c->ts_state == WAITBUF)
	recvbuf();
#endif
  c->ts_flags &= ~GETREQ;
  switch (c->ts_state) {
  case DONE:
	c->ts_flags |= SERVING;
#ifndef NONET
	if (area(c->ts_client) != LOCAL)
		c->ts_cltim = clientcrash;
#endif NONET
	cnt = c->ts_offset;
	break;
  case ABORT:
	cnt = ABORTED;
	break;
  case MEMFAULT:
	cnt = BADADDRESS;
	break;
  default:
	assert(0);
  }
  c->ts_state = IDLE;
  return(cnt);
}

/* A task wants to send a reply to its client. Putrep returns the size of
 * the reply.
 */
unshort putrep(hdr, buf, cnt)
header *hdr;
bufptr buf;
unshort cnt;
{
  register struct task *c = curtask;

  if (c->ts_flags != SERVING)
	return(FAIL);
  c->ts_flags &= ~SERVING;
  if (cnt > MAXCNT)
	return(FAIL);
#ifdef STATISTICS
  amstat.ams_putrep++;
#endif
  c->ts_cltim = 0;
  c->ts_xhdr = hdr;
  c->ts_xbuf = buf;
  c->ts_xcnt = cnt;
  c->ts_offset = 0;
  c->ts_flags |= PUTREP;
#ifndef NONET
  if (siteaddr(c->ts_client) != local)
	send();
  else
#endif
  {		/* local transaction */
	register struct task *t = taskptr(c->ts_client);

	if (t->ts_server == c->ts_addr) {
		*t->ts_rhdr = *hdr;
		sendbuf(t);
	}
  }
  c->ts_flags &= ~PUTREP;
  if (c->ts_state == MEMFAULT)
	cnt = BADADDRESS;
  c->ts_state = IDLE;
  return(cnt);
}

#ifndef NOCLIENT
/* Somebody wants to contact a server, and wait for a reply. The port this
 * server should listen to is specified in the first header. The reply
 * comes in the second. Trans returns the size of the reply, or FAIL if
 * a transaction fails after the server has been located.
 */
unshort trans(hdr1, buf1, cnt1, hdr2, buf2, cnt2)
header *hdr1, *hdr2;
bufptr buf1, buf2;
unshort cnt1, cnt2;
{
  register struct task *c = curtask;

  if ((c->ts_flags & ~SERVING) || cnt1 > MAXCNT || cnt2 > MAXCNT)
	return(FAIL);
  if (NullPort(&hdr1->h_port))
	return(FAIL);
#ifdef STATISTICS
  amstat.ams_trans++;
#endif
  for (;;) {
	c->ts_state = IDLE;
	c->ts_xhdr = hdr1; c->ts_xbuf = buf1; c->ts_xcnt = cnt1;
	c->ts_rhdr = hdr2; c->ts_rbuf = buf2; c->ts_rcnt = cnt2;
	c->ts_signal = 0;
	if (!PortCmp(&c->ts_portcache, &hdr1->h_port)) {
		c->ts_flags |= LOCATING;
		c->ts_timer = c->ts_maxloc;
		c->ts_totloc -= ticker;
		c->ts_server = portlookup(&hdr1->h_port, WAIT, LOOK);
		c->ts_totloc += ticker;
		c->ts_timer = 0;
		c->ts_flags &= ~LOCATING;
		switch (c->ts_server) {
		case NOWHERE:		/* server not found */
			c->ts_portcache = NULLPORT;
			return(c->ts_signal == 0 ? NOTFOUND : ABORTED);
		case SOMEWHERE:
			c->ts_portcache = NULLPORT;
			return(FAIL);
		}
		c->ts_portcache = hdr1->h_port;
	}
#ifdef notdef
	else
		c->ts_server = siteaddr(c->ts_server);
#endif
	c->ts_svident++;
	c->ts_offset = 0;
	c->ts_flags |= PUTREQ;
#ifndef NONET
	if (siteaddr(c->ts_server) != local) {
#ifdef STATISTICS
		amstat.ams_remtrans++;
#endif
		c->ts_totsvr -= ticker;
		send();
		c->ts_flags &= ~PUTREQ;
#ifdef BUFFERED
		if (c->ts_state == WAITBUF)
			recvbuf();	/* await the reply */
#endif
		c->ts_totsvr += ticker;
		if (c->ts_state == NACKED || c->ts_state == FAILED) {
			portremove(&hdr1->h_port, siteaddr(c->ts_server));
			c->ts_portcache = NULLPORT;
		}
		if (c->ts_state != NACKED)
			break;
#ifdef STATISTICS
		amstat.ams_naks++;
#endif
		c->ts_portcache = NULLPORT;
	}
	else
#endif NONET
	if (recvrequest()) {	/* local transaction */
#ifdef STATISTICS
		amstat.ams_loctrans++;
#endif
		c->ts_flags &= ~PUTREQ;
		c->ts_flags |= GETREP;
		c->ts_totsvr -= ticker;
		c->ts_offset = 0;
		c->ts_state = WAITBUF;
		if (c->ts_signal != 0) {
			putsig(taskptr(c->ts_server),
					(unshort) (c->ts_signal & 0xFF));
			c->ts_signal = 0;
		}
		if (sleep((event_t) &c->ts_state))
			c->ts_state = ABORT;
#ifdef BUFFERED
		else
			recvbuf();
#endif
		c->ts_totsvr += ticker;
		break;
	}
	else {		/* too bad, try again */
		c->ts_flags &= ~PUTREQ;
		if (c->ts_state == MEMFAULT)
			break;
	}
	c->ts_portcache = NULLPORT;
	if (c->ts_signal != 0) {
		c->ts_state = ABORT;
		break;
	}
  }
  c->ts_signal = 0;
  c->ts_flags &= ~(PUTREQ | GETREP);
  if (c->ts_state == DONE) {
	c->ts_state = IDLE;
	return c->ts_offset;
  }
#ifndef NDEBUG
  printf("trans failed with %x (state = %d; command = %d; port ",
			c->ts_server, c->ts_state, c->ts_xhdr->h_command);
  prport(&c->ts_xhdr->h_port);
  printf(")\n");
#endif
  switch (c->ts_state) {
  case FAILED:
  case ABORT:
	cnt2 = FAIL;
	break;
  case MEMFAULT:
	cnt2 = BADADDRESS;
	break;
  default:
	assert(0);
  }
  c->ts_state = IDLE;
  return(cnt2);
}
#endif NOCLIENT

/* If doing a transaction, send a signal to the server. For a remote server,
 * the signal is sent along with enquiries. If it's still locating a server,
 * abort that.  If it isn't doing a transaction, but blocked in a getreq,
 * abort that.
 */
sendsig(t, signal)
register struct task *t;
char signal;
{
#ifndef NOCLIENT
  if (t->ts_flags & (LOCATING | PUTREQ | GETREP))
	t->ts_signal = signal;
  if (t->ts_flags & LOCATING)
	portquit(&t->ts_xhdr->h_port, t);
  else if (t->ts_state == WAITBUF)
	if (t->ts_flags & GETREQ) {
		portremove(&t->ts_rhdr->h_port, t->ts_addr);
		t->ts_state = ABORT;
		wakeup((event_t) &t->ts_state);
	}
	else if (signal != 0 && (t->ts_flags & GETREP))
#ifndef NONET
		if (area(t->ts_server) != LOCAL)
			puthead(t->ts_server, t->ts_addr, t->ts_svident,
							signal, ENQUIRY, 0);
		else
#endif NONET
		{
			putsig(taskptr(t->ts_server),
						(unshort) (signal & 0xFF));
			t->ts_signal = 0;
		}
#endif NOCLIENT
}

#ifndef NOCLIENT
/* Abort anything task s is doing. If the task is serving somebody, notify
 * it that the server has failed.
 */
destroy(s)
register struct task *s;
{
  register struct task *t;

  sendsig(s, (char) CRASH);
  if (s->ts_flags & SERVING)
#ifndef NONET
	if (area(s->ts_client) != LOCAL) {
		puthead(s->ts_client, s->ts_addr, s->ts_clident, 0, DEAD, 0);
		s->ts_cltim = 0;
	}
	else
#endif
	{
#ifndef NDEBUG
		printf("%x destroyed, %x victim\n", s->ts_addr, s->ts_client);
#endif
		t = taskptr(s->ts_client);
		if (t->ts_state == WAITBUF) {
			assert(t->ts_flags & GETREP);
			t->ts_timer = 0;
			t->ts_state = FAILED;
			wakeup((event_t) &t->ts_state);
		}
	}
  s->ts_timer = 0;
  if (s->ts_flags & (LOCATING|GETREQ|GETREP|PUTREQ|PUTREP)) {
	s->ts_state = ABORT;
	wakeup((event_t) &s->ts_state);
  }
  else {
	s->ts_state = IDLE;
	s->ts_flags = 0;
  }
  s->ts_server = s->ts_client = 0;
  s->ts_portcache = NULLPORT;
}

/* Clean up the mess.
 */
cleanup(){
  register struct task *c = curtask;

  compare(c->ts_state, ==, IDLE);
  destroy(c);
}
#endif


/* Limit the maximum locate time & service time. 0 is infinite.
 */
unshort timeout(maxloc)
unshort maxloc;
{
  unshort oldloc = curtask->ts_maxloc;

  curtask->ts_maxloc = maxloc;
  return(oldloc);
}

#ifndef NDEBUG
transdump(){
  register struct task *t;
  static char *states[] = {
	"IDLE", "SENDING", "DONE", "ACKED", "NACKED", "FAILED",
	"WAITBUF", "RECEIVING", "ABORT", "MEMFAULT"
  };
  static struct ftab {
	unshort flag;
	char *name;
  } ftab[] = {
	{ GETREQ,	"GETREQ"  }, { SERVING,	 "SERVING" },
	{ PUTREP,	"PUTREP"  }, { LOCATING, "LOCATE"  },
	{ PUTREQ,	"PUTREQ"  }, { GETREP,	 "GETREP"  },
  };
  register struct ftab *p;

  printf("\nTK STATE    CTM TIM CNT  CLT  SRV CLI SVI SEQ SIG FLAGS\n");
  for (t = task; t < uppertask; t++) {
	if (t->ts_state == IDLE && t->ts_flags == 0) {
		compare(t->ts_cltim, ==, 0);
		compare(t->ts_timer, ==, 0);
		compare(t->ts_signal, ==, 0);
		continue;
	}
	printf("%2d %9s%3d %3d %3d %4x %4x %3d %3d %3d %3d", t - task,
		states[t->ts_state], t->ts_cltim, t->ts_timer, t->ts_count,
		t->ts_client, t->ts_server, t->ts_clident & 0xFF,
		t->ts_svident & 0xFF, t->ts_seq & 0xFF, t->ts_signal & 0xFF);
	for (p = ftab; p < &ftab[sizeoftable(ftab)]; p++)
		if (t->ts_flags & p->flag)
			printf(" %s", p->name);
	if (t->ts_flags & (GETREQ | LOCATING | GETREP)) {
		printf("  '");
		prport(t->ts_flags & GETREQ ? &t->ts_rhdr->h_port
							: &t->ts_xhdr->h_port);
		printf("'");
	}
	printf("\n");
  }
}
#endif NDEBUG

trinit(){
  curtask->ts_addr = ((curtask - task) << 8 | local);
}

/* Get the site address.
 */
transinit(){
#ifdef NONET
  local = 1;
#else
  extern address interinit();

  local = siteaddr(interinit());
/*
  netenable();
*/
#endif NONET
}

amdump()
{
#ifdef STATISTICS
  printf("\nAmoeba statistics:\n");
  printf("clfail  %7D ", amstat.ams_clfail);
  printf("svfail  %7D ", amstat.ams_svfail);
  printf("clcrash %7D ", amstat.ams_clcrash);
  printf("rxcl    %7D ", amstat.ams_rxcl);
  printf("rxsv    %7D\n",amstat.ams_rxsv);
  printf("trans   %7D ", amstat.ams_trans);
  printf("local   %7D ", amstat.ams_loctrans);
  printf("remote  %7D ", amstat.ams_remtrans);
  printf("getreq  %7D ", amstat.ams_getreq);
  printf("putrep  %7D\n",amstat.ams_putrep);
  printf("naks    %7D\n",amstat.ams_naks);
#endif
}