4.4BSD/usr/src/contrib/news/inn/innd/chan.c

Compare this file to the similar file:
Show the results in this format:

/*  $Revision: 1.30 $
**
**  I/O channel (and buffer) processing.
*/
#include "innd.h"


/* Descriptor sets of the channels that are reading, sleeping, and writing. */
STATIC FDSET	RCHANmask;
STATIC FDSET	SCHANmask;
STATIC FDSET	WCHANmask;
/* Number of channels on the sleep list (bumped in SCHANadd, dropped in
 * SCHANremove). */
STATIC int	SCHANcount;
/* Highest descriptor noted by RCHANadd/WCHANadd; used to size select(). */
STATIC int	CHANlastfd;
/* Highest descriptor noted by SCHANadd. */
STATIC int	CHANlastsleepfd;
/* Descriptor of the control channel, recorded by CHANcreate. */
STATIC int	CHANccfd;
/* Number of entries in CHANtable. */
STATIC int	CHANtablesize;
/* The channel table; it is indexed by descriptor. */
STATIC CHANNEL	*CHANtable;
/* The control channel itself, recorded by CHANcreate. */
STATIC CHANNEL	*CHANcc;
/* Template copied over a slot to reset it; the three initializers are
 * presumably Type, State and fd (they match what CHANclose stores). */
STATIC CHANNEL	CHANnull = { CTfree, CSerror, -1 };


/*
**  Replace whatever a buffer holds with a copy of length bytes taken
**  from p.  The data area is enlarged first when it is too small, and
**  the buffer is left with nothing consumed.
*/
void
BUFFset(bp, p, length)
    register BUFFER	*bp;
    register char	*p;
    register int	length;
{
    register char	*to;
    register int	todo;

    bp->Left = length;
    bp->Used = 0;
    if (bp->Left == 0)
	return;

    /* Make sure the data area can hold all of it. */
    if (bp->Size < length) {
	bp->Size = GROW_AMOUNT(length);
	RENEW(bp->Data, char, bp->Size);
    }

    /* Long copies go to the library; short ones are done inline. */
    if (length > MEMCPY_THRESHOLD) {
	(void)memcpy((POINTER)bp->Data, (POINTER)p, (SIZE_T)length);
	return;
    }
    for (to = bp->Data, todo = length; todo > 0; todo--)
	*to++ = *p++;
}


/*
**  Build the channel table with room for i descriptors and mark every
**  slot free.  Any previous table is discarded and all three descriptor
**  sets are emptied.
*/
void
CHANsetup(i)
    register int	i;
{
    register CHANNEL	*cp;

    /* Nobody is reading, sleeping, or writing yet. */
    FD_ZERO(&RCHANmask);
    FD_ZERO(&SCHANmask);
    FD_ZERO(&WCHANmask);

    /* Throw away an old table, then get a zeroed new one. */
    if (CHANtable)
	DISPOSE(CHANtable);
    CHANtablesize = i;
    CHANtable = NEW(CHANNEL, CHANtablesize);
    (void)memset((POINTER)CHANtable, 0,
	    (SIZE_T)(CHANtablesize * sizeof *CHANtable));

    /* Finish off the template and stamp it into every slot. */
    CHANnull.NextLog = CHANNEL_INACTIVE_TIME;
    CHANnull.Address.s_addr = MyAddress.s_addr;
    cp = CHANtable;
    while (i-- > 0)
	*cp++ = CHANnull;
}


/*
**  Set up the channel table slot for descriptor fd and return it.  The
**  slot is reset from the CHANnull template but keeps (or is given) its
**  input and output buffers.  The descriptor is made close-on-exec and
**  non-blocking, and a control channel is remembered for the main loop.
*/
CHANNEL *
CHANcreate(fd, Type, State, Reader, WriteDone)
    int			fd;
    CHANNELTYPE		Type;
    CHANNELSTATE	State;
    FUNCPTR		Reader;
    FUNCPTR		WriteDone;
{
    register CHANNEL	*cp;
    BUFFER		savein;
    BUFFER		saveout;
    int			failed;

    cp = &CHANtable[fd];

    /* Hold on to the slot's buffers across the reset below, allocating
     * them if the slot has none. */
    savein = cp->In;
    if (savein.Size == 0) {
	savein.Size = START_BUFF_SIZE;
	savein.Data = NEW(char, savein.Size);
    }
    saveout = cp->Out;
    if (saveout.Size == 0) {
	saveout.Size = SMBUF;
	saveout.Data = NEW(char, saveout.Size);
    }
    savein.Used = 0;
    savein.Left = savein.Size;
    saveout.Used = 0;
    saveout.Left = 0;

    /* Reset the slot, then fill in what the caller gave us. */
    *cp = CHANnull;
    cp->In = savein;
    cp->Out = saveout;
    cp->fd = fd;
    cp->Type = Type;
    cp->State = State;
    cp->Reader = Reader;
    cp->WriteDone = WriteDone;
    cp->LastActive = Now.time;
    cp->Started = cp->LastActive;
    cp->Tracing = Tracing;

    /* Close-on-exec and non-blocking; where ENOTSOCK exists, that error
     * from SetNonBlocking is not worth a log entry. */
    CloseOnExec(fd, TRUE);
    failed = SetNonBlocking(fd, TRUE) < 0;
#if	defined(ENOTSOCK)
    if (failed && errno == ENOTSOCK)
	failed = 0;
#endif	/* defined(ENOTSOCK) */
    if (failed)
	syslog(L_ERROR, "%s cant nonblock %d %m", LogName, fd);

    /* Remember the control channel so the main loop can find it fast. */
    if (Type == CTcontrol) {
	CHANccfd = fd;
	CHANcc = cp;
    }
    return cp;
}


/*
**  Turn tracing on or off for a channel and log the change.  When
**  tracing is turned on, also log the channel's I/O error counters, its
**  address and activity times, and whatever it is currently doing
**  (sleeping, reading, and/or writing).
*/
void
CHANtracing(cp, Flag)
    register CHANNEL	*cp;
    BOOL		Flag;
{
    char		*p;

    p = CHANname(cp);
    syslog(L_NOTICE, "%s trace %s", p, Flag ? "on" : "off");
    cp->Tracing = Flag;
    if (Flag) {
	/* The arguments must follow the order of the labels in the format. */
	syslog(L_NOTICE, "%s trace badwrites %d blockwrites %d badreads %d",
	    p, cp->BadWrites, cp->BlockedWrites, cp->BadReads);
	/* Cast the time values so that they really match %ld. */
	syslog(L_NOTICE, "%s trace address %s lastactive %ld nextlog %ld",
	    p, inet_ntoa(cp->Address), (long)cp->LastActive,
	    (long)cp->NextLog);
	if (FD_ISSET(cp->fd, &SCHANmask))
	    syslog(L_NOTICE, "%s trace sleeping %ld 0x%x",
		p, (long)cp->Waketime, cp->Waker);
	if (FD_ISSET(cp->fd, &RCHANmask))
	    syslog(L_NOTICE, "%s trace reading %d %s",
		p, cp->In.Used, MaxLength(cp->In.Data, cp->In.Data));
	if (FD_ISSET(cp->fd, &WCHANmask))
	    syslog(L_NOTICE, "%s trace writing %d %s",
		p, cp->Out.Left, MaxLength(cp->Out.Data, cp->Out.Data));
    }
}


/*
**  Close a channel and mark its table slot free.  The name is only used
**  in log messages.  Closing a slot that is already free is logged as an
**  internal error, but the slot is still reset.  Buffers that have grown
**  past BIG_BUFFER are released; smaller ones stay with the slot.
*/
void
CHANclose(cp, name)
    register CHANNEL	*cp;
    char		*name;
{
    if (cp->Type != CTfree) {
	/* Say goodbye; NNTP channels also report their statistics. */
	if (cp->Type != CTnntp)
	    syslog(L_NOTICE, "%s closed", name);
	else
	    syslog(L_NOTICE,
		"%s closed seconds %ld accepted %ld refused %ld rejected %ld",
		name, (long)(Now.time - cp->Started),
		cp->Received, cp->Refused, cp->Rejected);

	/* Nobody should be selecting on it or waiting for it any more. */
	WCHANremove(cp);
	RCHANremove(cp);
	SCHANremove(cp);

	/* The Argument pointer itself is cleared further down. */
	if (cp->Argument != NULL)
	    DISPOSE(cp->Argument);
	if (cp->fd >= 0 && close(cp->fd) < 0)
	    syslog(L_ERROR, "%s cant close %s %m", LogName, name);
    }
    else
	syslog(L_ERROR, "%s internal closing free channel", name);

    /* The slot is now available. */
    cp->fd = -1;
    cp->Type = CTfree;
    cp->State = CSerror;
    cp->Argument = NULL;

    /* Give back buffers that got big; a zero Size records that the
     * buffer is gone. */
    if (cp->In.Size > BIG_BUFFER) {
	cp->In.Size = 0;
	DISPOSE(cp->In.Data);
    }
    if (cp->Out.Size > BIG_BUFFER) {
	cp->Out.Size = 0;
	DISPOSE(cp->Out.Data);
    }
}


/*
**  Return a printable name for the channel, built from its type and
**  descriptor (plus the site name, and the process ID if there is one,
**  for site channels).  The result lives in a static buffer, so every
**  call overwrites the name returned by the previous one.
*/
char *
CHANname(cp)
    register CHANNEL	*cp;
{
    static char		buff[SMBUF];
    register int	i;
    register SITE	*sp;
    STRING		p;
    PID_T		pid;

    switch (cp->Type) {
    default:
	/* A pointer difference is not an int; convert it to match %d. */
	(void)sprintf(buff, "?%d(#%d@%d)?", cp->Type, cp->fd,
		(int)(cp - CHANtable));
	break;
    case CTany:
	(void)sprintf(buff, "any:%d", cp->fd);
	break;
    case CTfree:
	(void)sprintf(buff, "free:%d", cp->fd);
	break;
    case CTremconn:
	(void)sprintf(buff, "remconn:%d", cp->fd);
	break;
    case CTnntp:
	(void)sprintf(buff, "%s:%d",
		cp->Address.s_addr == 0 ? "localhost" : RChostname(cp),
		cp->fd);
	break;
    case CTlocalconn:
	(void)sprintf(buff, "localconn:%d", cp->fd);
	break;
    case CTcontrol:
	(void)sprintf(buff, "control:%d", cp->fd);
	break;
    case CTexploder:
    case CTfile:
    case CTprocess:
	/* Find the site that has this channel; "?" if none does. */
	for (p = "?", i = nSites, sp = Sites, pid = 0; --i >= 0; sp++)
	    if (sp->Channel == cp) {
		p = sp->Name;
		/* File channels have no process to report. */
		if (cp->Type != CTfile)
		    pid = sp->pid;
		break;
	    }
	if (pid == 0)
	    (void)sprintf(buff, "%s:%d:%s",
		    MaxLength(p, p), cp->fd,
		    cp->Type == CTfile ? "file" : "proc");
	else
	    (void)sprintf(buff, "%s:%d:%s:%ld",
		    MaxLength(p, p), cp->fd,
		    cp->Type == CTfile ? "file" : "proc", (long)pid);
	break;
    }
    return buff;
}


/*
**  Return the channel table entry for a descriptor, or NULL if the
**  descriptor does not index a slot in the table.
*/
CHANNEL *
CHANfromdescriptor(fd)
    int		fd;
{
    /* Valid slots are 0 .. CHANtablesize - 1. */
    if (fd < 0 || fd >= CHANtablesize)
	return NULL;
    return &CHANtable[fd];
}


/*
**  Step through the channels of a given type (CTany matches them all).
**  *ip is the slot to start looking at; when a channel is found, *ip is
**  set to the slot after it and the channel is returned.  When nothing
**  more matches, NULL is returned and *ip is left alone.
*/
CHANNEL *
CHANiter(ip, Type)
    int			*ip;
    CHANNELTYPE		Type;
{
    register CHANNEL	*cp;
    register int	slot;

    slot = *ip;
    if (slot < 0)
	return NULL;

    for ( ; slot < CHANtablesize; slot++) {
	cp = &CHANtable[slot];
	if (Type == CTany || cp->Type == Type) {
	    *ip = slot + 1;
	    return cp;
	}
    }
    return NULL;
}


/*
**  Add a channel to the set of readers, raising the highest-descriptor
**  mark if needed, and start its input buffer from the beginning.
*/
void
RCHANadd(cp)
    register CHANNEL	*cp;
{
    register int	fd;

    fd = cp->fd;
    FD_SET(fd, &RCHANmask);
    if (CHANlastfd < fd)
	CHANlastfd = fd;

    /* Whatever was read before is forgotten. */
    cp->In.Used = 0;
}


/*
**  Take a channel out of the set of readers.  If it had the highest
**  descriptor, walk the mark down to the next descriptor that is still
**  reading or writing (but never below 1).
*/
void
RCHANremove(cp)
    register CHANNEL	*cp;
{
    if (!FD_ISSET(cp->fd, &RCHANmask))
	return;
    FD_CLR(cp->fd, &RCHANmask);

    if (cp->fd != CHANlastfd)
	return;

    /* This was the top descriptor; find the new top. */
    for ( ; ; ) {
	if (FD_ISSET(CHANlastfd, &RCHANmask)
	 || FD_ISSET(CHANlastfd, &WCHANmask)
	 || CHANlastfd <= 1)
	    break;
	CHANlastfd--;
    }
}


/*
**  Put a channel on the sleep list until Waketime, or until Event is
**  signalled; Waker is the function to call when it comes off.
**  Note that the Argument must be NULL or allocated memory, because a
**  different Argument already stored in the channel is freed here.
*/
void
SCHANadd(cp, Waketime, Event, Waker, Argument)
    register CHANNEL	*cp;
    time_t		Waketime;
    POINTER		Event;
    FUNCPTR		Waker;
    POINTER		Argument;
{
    /* Record what to wait for and what to do afterwards. */
    cp->Event = Event;
    cp->Waker = Waker;
    cp->Waketime = Waketime;

    /* Swap in the new argument, releasing the old one. */
    if (cp->Argument != Argument) {
	DISPOSE(cp->Argument);
	cp->Argument = Argument;
    }

    /* Count the channel only if it was not already asleep. */
    if (!FD_ISSET(cp->fd, &SCHANmask)) {
	FD_SET(cp->fd, &SCHANmask);
	SCHANcount++;
    }
    if (CHANlastsleepfd < cp->fd)
	CHANlastsleepfd = cp->fd;
}


/*
**  Take a channel off the sleep list and clear its wake-up time.  If it
**  had the highest sleeping descriptor, walk the mark down to the next
**  descriptor that is still sleeping (but never below 1).
*/
void
SCHANremove(cp)
    register CHANNEL	*cp;
{
    if (FD_ISSET(cp->fd, &SCHANmask)) {
	FD_CLR(cp->fd, &SCHANmask);
	SCHANcount--;
	cp->Waketime = 0;
	if (cp->fd == CHANlastsleepfd) {
	    /* This was the highest descriptor, get a new highest.  The
	     * mark tracks sleepers, so it is the sleep mask that has
	     * to be consulted here. */
	    while (!FD_ISSET(CHANlastsleepfd, &SCHANmask)
	      && CHANlastsleepfd > 1)
		CHANlastsleepfd--;
	}
    }
}


/*
**  Report whether the channel's descriptor is in the sleep set.
*/
BOOL
CHANsleeping(cp)
    CHANNEL	*cp;
{
    register int	fd;

    fd = cp->fd;
    return FD_ISSET(fd, &SCHANmask);
}


/*
**  Make every sleeping channel that is waiting for the given event due
**  at once, by zeroing its wake-up time.  The channels stay on the sleep
**  list; the main loop takes them off when it next sees them.
*/
void
SCHANwakeup(Event)
    register POINTER	Event;
{
    register CHANNEL	*cp;
    register int	slot;

    for (slot = 0; slot < CHANtablesize; slot++) {
	cp = &CHANtable[slot];
	if (cp->Type == CTfree || cp->Event != Event)
	    continue;
	if (CHANsleeping(cp))
	    cp->Waketime = 0;
    }
}


/*
**  Add a channel to the set of writers, provided it has output pending.
**  The Out.Left field is deliberately left alone since there could be
**  buffered I/O in there already.
*/
void
WCHANadd(cp)
    register CHANNEL	*cp;
{
    /* Nothing queued means nothing to select for. */
    if (cp->Out.Left <= 0)
	return;

    FD_SET(cp->fd, &WCHANmask);
    if (CHANlastfd < cp->fd)
	CHANlastfd = cp->fd;
}


/*
**  Take a channel out of the set of writers.  An output buffer that has
**  been drained is rewound.  If the channel had the highest descriptor,
**  walk the mark down to the next descriptor that is still reading or
**  writing (but never below 1).
*/
void
WCHANremove(cp)
    register CHANNEL	*cp;
{
    if (!FD_ISSET(cp->fd, &WCHANmask))
	return;
    FD_CLR(cp->fd, &WCHANmask);

    /* With no data left, start over at the front of the buffer so that
     * it does not have to grow. */
    if (cp->Out.Left <= 0) {
	cp->Out.Left = 0;
	cp->Out.Used = 0;
    }

    if (cp->fd != CHANlastfd)
	return;

    /* This was the top descriptor; find the new top. */
    for ( ; ; ) {
	if (FD_ISSET(CHANlastfd, &RCHANmask)
	 || FD_ISSET(CHANlastfd, &WCHANmask)
	 || CHANlastfd <= 1)
	    break;
	CHANlastfd--;
    }
}


/*
**  Set a channel's output to the unconsumed part of an existing buffer,
**  that is, the bp->Left bytes that start at offset bp->Used.
*/
void
WCHANsetfrombuffer(cp, bp)
    CHANNEL	*cp;
    BUFFER	*bp;
{
    char	*start;

    start = &bp->Data[bp->Used];
    WCHANset(cp, start, bp->Left);
}



/*
**  Read in text data, return the amount we read.  Returns -1 if the
**  read failed, and 0 at end of file, in which case the channel has
**  been closed.  The input buffer is grown once the space remaining
**  in it falls to LOW_WATER or less.
*/
int
CHANreadtext(cp)
    register CHANNEL	*cp;
{
    register int	i;
    register int	grow;
    register BUFFER	*bp;
    char		*p;

    /* Get the name now, so that nothing comes between the read and the
     * syslog calls that report on it. */
    p = CHANname(cp);

    /* Read in whatever is there; one byte of the space is held back. */
    bp = &cp->In;
    bp->Left = bp->Size - bp->Used;
    i = read(cp->fd, &bp->Data[bp->Used], bp->Left - 1);
    if (i < 0) {
	syslog(L_ERROR, "%s cant read %m", p);
	return -1;
    }
    if (i == 0) {
	syslog(L_NOTICE, "%s readclose", p);
	CHANclose(cp, p);
	return 0;
    }

    /* Update values, grow buffer if we're getting close.  The growth
     * is kept apart from the byte count, which is what gets returned. */
    bp->Used += i;
    bp->Left -= i;
    if (bp->Left <= LOW_WATER) {
	grow = GROW_AMOUNT(bp->Size);
	bp->Size += grow;
	bp->Left += grow;
	RENEW(bp->Data, char, bp->Size);
    }
    return i;
}


/*
**  Write length bytes from p to fd.  If I/O backs up a lot, we can get
**  EMSGSIZE on some systems; when that happens (or when the write moves
**  nothing) the data is written in chunks instead, on the assumption
**  that stdio's BUFSIZ is a good chunk size.  Returns the number of
**  bytes written, or the result of the failing write if nothing at all
**  was written.
*/
STATIC int
largewrite(fd, p, length)
    register int	fd;
    register char	*p;
    register int	length;
{
    register int	n;
    register int	chunk;
    register char	*start;

    /* Try the standard case -- write it all, retrying if interrupted. */
    for ( ; ; ) {
	n = write(fd, (POINTER)p, (SIZE_T)length);
	if (n > 0)
	    return n;
	if (n == 0)
	    break;
	if (errno == EINTR)
	    continue;
	if (errno != EMSGSIZE)
	    return n;
	break;
    }

    /* Write it in pieces, stopping at the first one that fails. */
    start = p;
    n = 0;
    while (length) {
	chunk = length > BUFSIZ ? BUFSIZ : length;
	n = write(fd, (POINTER)p, (SIZE_T)chunk);
	if (n <= 0)
	    break;
	p += n;
	length -= n;
    }

    /* Return error, or partial results if we got something. */
    if (p == start)
	return n;
    return p - start;
}


/*
**  Try to flush out the buffer.  Use this only on file channels!
**  Keeps writing until the output buffer is empty.  Returns TRUE when
**  there is nothing left to write, FALSE (after logging) if a write
**  fails or writes nothing.
*/
BOOL
WCHANflush(cp)
    register CHANNEL	*cp;
{
    register BUFFER	*bp;
    register int	i;
    int			oerrno;
    char		*p;

    /* If nothing in there, or nothing left, nothing to do. */
    bp = &cp->Out;
    if (bp->Left == 0)
	return TRUE;

    /* Write it. */
    while (bp->Left > 0) {
	i = write(cp->fd, (POINTER)&bp->Data[bp->Used], (SIZE_T)bp->Left);
	if (i < 0) {
	    /* Building the name must not disturb the errno that %m is
	     * going to report. */
	    oerrno = errno;
	    p = CHANname(cp);
	    errno = oerrno;
	    syslog(L_ERROR, "%s cant flush count %d %m", p, bp->Left);
	    return FALSE;
	}
	if (i == 0) {
	    syslog(L_ERROR, "%s cant flush count %d",
		CHANname(cp), bp->Left);
	    return FALSE;
	}
	bp->Left -= i;
	bp->Used += i;
	/* All gone; stop selecting for write on this channel. */
	if (bp->Left <= 0)
	    WCHANremove(cp);
    }
    return TRUE;
}



/*
**  Waker used for write channels that were put to sleep: log the event
**  and make the channel a writer again.
*/
STATIC FUNCTYPE
CHANwakeup(cp)
    CHANNEL	*cp;
{
    char	*name;

    name = CHANname(cp);
    syslog(L_NOTICE, "%s wakeup", name);
    WCHANadd(cp);
}


/*
**  A write on the channel would have blocked.  Count it, and once the
**  count passes BAD_IO_COUNT give up on NNTP, file, exploder and process
**  channels by closing them.  Otherwise put the channel to sleep for a
**  time that grows with the number of blocked writes.  The name p is
**  only used in log messages.
*/
STATIC void
CHANwritesleep(cp, p)
    register CHANNEL	*cp;
    char		*p;
{
    int			delay;

    delay = ++(cp->BlockedWrites);
    if (delay > BAD_IO_COUNT
     && (cp->Type == CTnntp || cp->Type == CTfile
      || cp->Type == CTexploder || cp->Type == CTprocess)) {
	syslog(L_ERROR, "%s blocked closing", p);
	SITEchanclose(cp);
	CHANclose(cp, p);
	return;
    }

    /* Back off a bit longer for every write that has blocked. */
    delay *= BLOCK_BACKOFF;
    syslog(L_ERROR, "%s blocked sleeping %d", p, delay);
    SCHANadd(cp, (time_t)(Now.time + delay), (POINTER)NULL,
	CHANwakeup, (POINTER)NULL);
}


#if	defined(INND_FIND_BAD_FDS)
/*
**  We got an unknown error in select.  Find out the culprit by polling
**  each reading and each writing descriptor on its own; one that makes
**  select fail (other than with EINTR) is logged and dropped from its
**  mask.  Not really ready for production use yet, and it's expensive,
**  too.
*/
STATIC void
CHANdiagnose()
{
    FDSET		Probe;
    struct timeval	poll;
    int			fd;

    FD_ZERO(&Probe);
    fd = CHANlastfd;
    while (fd >= 0) {
	/* Is it a reader that select chokes on? */
	if (FD_ISSET(fd, &RCHANmask)) {
	    FD_SET(fd, &Probe);
	    poll.tv_sec = 0;
	    poll.tv_usec = 0;
	    if (select(fd + 1, &Probe, (FDSET *)NULL, (FDSET *)NULL, &poll) < 0
	      && errno != EINTR) {
		syslog(L_ERROR, "%s Bad Read File %d", LogName, fd);
		FD_CLR(fd, &RCHANmask);
		/* Probably do something about the file descriptor here; call
		 * CHANclose on it? */
	    }
	    FD_CLR(fd, &Probe);
	}

	/* Same question for a writer. */
	if (FD_ISSET(fd, &WCHANmask)) {
	    FD_SET(fd, &Probe);
	    poll.tv_sec = 0;
	    poll.tv_usec = 0;
	    if (select(fd + 1, (FDSET *)NULL, &Probe, (FDSET *)NULL, &poll) < 0
	      && errno != EINTR) {
		syslog(L_ERROR, "%s Bad Write File %d", LogName, fd);
		FD_CLR(fd, &WCHANmask);
		/* Probably do something about the file descriptor here; call
		 * CHANclose on it? */
	    }
	    FD_CLR(fd, &Probe);
	}
	fd--;
    }
}
#endif	/* defined(INND_FIND_BAD_FDS) */


/*
**  Main I/O loop.  Wait for data, call the channel's handler when there is
**  something to read or when the queued write is finished.  In order to
**  be fair (i.e., don't always give descriptor n priority over n+1), we
**  remember where we last had something and pick up from there.
**
**  The loop never returns; the only way out visible here is the call to
**  CleanupAndExit once GotTerminate is found set.  Each pass selects on
**  copies of the read and write masks, gives the control channel first
**  shot, and then walks the channel table doing reads, writes, wake-ups,
**  and inactivity checks.
*/
void
CHANreadloop()
{
    static char		EXITING[] = "INND exiting because of signal\n";
    /* Static, so the table walk resumes where the last pass stopped. */
    static int		fd;
    register int	i;
    register int	startpoint;
    register int	count;
    register int	lastfd;
    int			oerrno;
    register CHANNEL	*cp;
    register BUFFER	*bp;
    FDSET		MyRead;
    FDSET		MyWrite;
    struct timeval	MyTime;
    long		silence;
    char		*p;

    for ( ; ; ) {
	/* See if any processes died. */
	PROCscan();

	/* Wait for data, note the time.  Copies are handed to select so
	 * that the global masks and TimeOut are not modified by it. */
	MyRead = RCHANmask;
	MyWrite = WCHANmask;
	MyTime = TimeOut;
	count = select(CHANlastfd + 1, &MyRead, &MyWrite, (FDSET *)NULL,
		&MyTime);
	if (GotTerminate) {
	    (void)write(2, EXITING, STRLEN(EXITING));
	    CleanupAndExit(0, "received signal");
	}
	if (count < 0) {
	    /* An interrupted select is not worth logging; just retry. */
	    if (errno != EINTR) {
		syslog(L_ERROR, "%s cant select %m", LogName);
#if	defined(INND_FIND_BAD_FDS)
		CHANdiagnose();
#endif	/* defined(INND_FIND_BAD_FDS) */
	    }
	    continue;
	}

	if (count == 0) {
	    /* No channels active, so flush and skip if nobody's
	     * sleeping. */
	    if (Mode == OMrunning)
		ICDwrite();
	    if (SCHANcount == 0)
		continue;
	}

	/* Update the "reasonably accurate" time. */
	if (GetTimeInfo(&Now) < 0)
	    syslog(L_ERROR, "%s cant gettimeinfo %m", LogName);

	/* Try the control channel first.  Its bit is cleared afterwards
	 * so that the table walk below does not read from it again. */
	if (FD_ISSET(CHANccfd, &RCHANmask) && FD_ISSET(CHANccfd, &MyRead)) {
	    count--;
	    (*CHANcc->Reader)(CHANcc);
	    FD_CLR(CHANccfd, &MyRead);
	}

	/* Loop through all active channels.  Somebody could have
	 * closed a channel so we double-check the global mask before
	 * looking at what select returned.  The code here is written so
	 * that a channel could be reading and writing and sleeping at the
	 * same time, even though that's not possible.  (Just as well,
	 * since in SysVr4 the count would be wrong.) */
	lastfd = CHANlastfd;
	if (lastfd < CHANlastsleepfd)
	    lastfd = CHANlastsleepfd;
	if (fd > lastfd)
	    fd = 0;
	startpoint = fd;
	do {
	    cp = &CHANtable[fd];
	    if (cp->Type == CTfree)
		goto Next;

	    /* Anything to read? */
	    if (FD_ISSET(fd, &RCHANmask) && FD_ISSET(fd, &MyRead)) {
		count--;
		cp->LastActive = Now.time;
		(*cp->Reader)(cp);
	    }

	    /* Possibly recheck for dead children so we don't get SIGPIPE
	     * on readerless channels. */
	    if (PROCneedscan)
		PROCscan();

	    /* Ready to write? */
	    if (FD_ISSET(fd, &WCHANmask) && FD_ISSET(fd, &MyWrite)) {
		count--;
		bp = &cp->Out;
		if (bp->Left) {
		    cp->LastActive = Now.time;
		    i = largewrite(fd, &bp->Data[bp->Used], bp->Left);
		    if (i <= 0) {
			/* Save errno around CHANname so that %m and the
			 * tests below see the write's error. */
			oerrno = errno;
			p = CHANname(cp);
			errno = oerrno;
			if (i < 0)
			    syslog(L_ERROR, "%s cant write %m", p);
			else
			    syslog(L_ERROR, "%s cant write", p);
			cp->BadWrites++;
			if (i < 0 && oerrno == EPIPE) {
			    /* The other end is gone; close it down. */
			    SITEchanclose(cp);
			    CHANclose(cp, p);
			}
			else if (i < 0 && oerrno == EWOULDBLOCK) {
			    /* Stop writing; sleep or give up. */
			    WCHANremove(cp);
			    CHANwritesleep(cp, p);
			}
			else if (cp->BadWrites >= BAD_IO_COUNT) {
			    /* Too many failures; pause before retrying. */
			    syslog(L_ERROR, "%s sleeping", p);
			    WCHANremove(cp);
			    SCHANadd(cp, (time_t)(Now.time + PAUSE_RETRY_TIME),
				(POINTER)NULL, CHANwakeup, (POINTER)NULL);
			}
		    }
		    else {
			/* Progress was made, so reset the error counts
			 * and advance past what was written. */
			cp->BadWrites = 0;
			cp->BlockedWrites = 0;
			bp->Left -= i;
			bp->Used += i;
			if (bp->Left <= 0) {
			    WCHANremove(cp);
			    (*cp->WriteDone)(cp);
			}
		    }
		}
	    }

	    /* Coming off a sleep? */
	    if (FD_ISSET(fd, &SCHANmask) && cp->Waketime <= Now.time) {
		cp->LastActive = Now.time;
		SCHANremove(cp);
		(*cp->Waker)(cp);
	    }

	    /* Has this channel been inactive very long?  Each report
	     * pushes NextLog out so the next one comes later; a peer
	     * silent for more than PEER_TIMEOUT is closed. */
	    if (cp->Type == CTnntp
	     && cp->LastActive + cp->NextLog < Now.time) {
		p = CHANname(cp);
		silence = Now.time - cp->LastActive;
		cp->NextLog += CHANNEL_INACTIVE_TIME;
		syslog(L_NOTICE, "%s inactive %ld", p, silence / 60L);
		if (silence > PEER_TIMEOUT) {
		    syslog(L_NOTICE, "%s timeout", p);
		    CHANclose(cp, p);
		}
	    }

    Next:
	    /* Bump pointer, wrapping to 0 after the highest descriptor
	     * in use. */
	    if (fd >= lastfd)
		fd = 0;
	    else
		fd++;

	    /* If there is nothing to do, break out. */
	    if (count == 0 && SCHANcount == 0)
		break;

	} while (fd != startpoint);
    }
}