4.4BSD/usr/src/contrib/news/inn/innd/chan.c
/* $Revision: 1.30 $
**
** I/O channel (and buffer) processing.
*/
#include "innd.h"
/* Descriptors of channels currently registered as readers (see RCHANadd). */
STATIC FDSET RCHANmask;
/* Descriptors of channels currently on the sleep list (see SCHANadd). */
STATIC FDSET SCHANmask;
/* Descriptors of channels with output waiting to be written (see WCHANadd). */
STATIC FDSET WCHANmask;
/* Number of descriptors currently set in SCHANmask. */
STATIC int SCHANcount;
/* Highest descriptor tracked for RCHANmask/WCHANmask; bounds select(). */
STATIC int CHANlastfd;
/* Highest descriptor tracked for the sleep list; bounds the channel scan. */
STATIC int CHANlastsleepfd;
/* Descriptor of the control channel, cached by CHANcreate. */
STATIC int CHANccfd;
/* Number of slots allocated in CHANtable. */
STATIC int CHANtablesize;
/* The channel table; slots are indexed by file descriptor. */
STATIC CHANNEL *CHANtable;
/* The control channel itself, cached by CHANcreate. */
STATIC CHANNEL *CHANcc;
/* Template for an unused slot.  NOTE(review): the initializer presumably
 * fills Type, State and fd in that order -- confirm against the CHANNEL
 * declaration.  CHANsetup fills in NextLog and Address at run time. */
STATIC CHANNEL CHANnull = { CTfree, CSerror, -1 };
/*
**  Set a buffer's contents, ignoring anything that might have
**  been there.
**
**  bp      buffer to overwrite; grown if it is too small.
**  p       source bytes; not examined when length is zero.
**  length  number of bytes to copy.
**
**  On return bp->Left is length and bp->Used is zero.
*/
void
BUFFset(bp, p, length)
    register BUFFER     *bp;
    register char       *p;
    register int        length;
{
    register char       *dest;
    register int        newsize;

    if ((bp->Left = length) != 0) {
        /* Need more space? */
        if (bp->Size < length) {
            /* GROW_AMOUNT is added to the current size as an increment
             * in CHANreadtext, so do not trust it to be at least length
             * on its own; make sure the new size always covers the
             * data about to be copied. */
            newsize = GROW_AMOUNT(length);
            if (newsize < length)
                newsize += length;
            bp->Size = newsize;
            RENEW(bp->Data, char, bp->Size);
        }

        /* Short copies are done by hand, long ones by memcpy. */
        if (length > MEMCPY_THRESHOLD)
            (void)memcpy((POINTER)bp->Data, (POINTER)p, (SIZE_T)length);
        else {
            for (dest = bp->Data, length++; --length > 0; )
                *dest++ = *p++;
        }
    }
    bp->Used = 0;
}
/*
**  Initialize all the I/O channels.
**
**  i  number of slots the channel table should have.
**
**  Clears the three descriptor masks, discards any previous table,
**  allocates a fresh one and marks every slot as unused.
*/
void
CHANsetup(i)
    register int        i;
{
    register int        slot;

    /* Nobody is reading, writing or sleeping yet. */
    FD_ZERO(&WCHANmask);
    FD_ZERO(&SCHANmask);
    FD_ZERO(&RCHANmask);

    /* Throw away the old table, if any, and get a zeroed new one. */
    if (CHANtable)
        DISPOSE(CHANtable);
    CHANtablesize = i;
    CHANtable = NEW(CHANNEL, CHANtablesize);
    (void)memset((POINTER)CHANtable, 0,
            (SIZE_T)(CHANtablesize * sizeof *CHANtable));

    /* Finish the template, then stamp it into every slot. */
    CHANnull.Address.s_addr = MyAddress.s_addr;
    CHANnull.NextLog = CHANNEL_INACTIVE_TIME;
    for (slot = 0; slot < i; slot++)
        CHANtable[slot] = CHANnull;
}
/*
**  Create a channel from a descriptor.
**
**  fd         descriptor; also the index of the slot that is used.
**  Type       kind of channel being created.
**  State      initial state.
**  Reader     function stored as the channel's Reader.
**  WriteDone  function stored as the channel's WriteDone.
**
**  Returns the slot that was set up.  Buffers already attached to the
**  slot are kept; missing ones are allocated.  The descriptor is made
**  close-on-exec and non-blocking.
*/
CHANNEL *
CHANcreate(fd, Type, State, Reader, WriteDone)
    int                 fd;
    CHANNELTYPE         Type;
    CHANNELSTATE        State;
    FUNCPTR             Reader;
    FUNCPTR             WriteDone;
{
    register CHANNEL    *slot;
    BUFFER              keepin;
    BUFFER              keepout;

    slot = &CHANtable[fd];

    /* Hold on to the slot's buffers; copying CHANnull would lose them. */
    keepin = slot->In;
    keepout = slot->Out;
    if (keepin.Size == 0) {
        keepin.Size = START_BUFF_SIZE;
        keepin.Data = NEW(char, keepin.Size);
    }
    if (keepout.Size == 0) {
        keepout.Size = SMBUF;
        keepout.Data = NEW(char, keepout.Size);
    }
    keepin.Used = 0;
    keepin.Left = keepin.Size;
    keepout.Used = 0;
    keepout.Left = 0;

    /* Start from the empty template and fill in this channel's data. */
    *slot = CHANnull;
    slot->In = keepin;
    slot->Out = keepout;
    slot->fd = fd;
    slot->Type = Type;
    slot->State = State;
    slot->Reader = Reader;
    slot->WriteDone = WriteDone;
    slot->Started = slot->LastActive = Now.time;
    slot->Tracing = Tracing;

    /* Make the descriptor close-on-exec and non-blocking.  Where the
     * system defines ENOTSOCK, that particular failure is not logged. */
    CloseOnExec(fd, TRUE);
    if (SetNonBlocking(fd, TRUE) < 0
#if     defined(ENOTSOCK)
     && errno != ENOTSOCK
#endif  /* defined(ENOTSOCK) */
        )
        syslog(L_ERROR, "%s cant nonblock %d %m", LogName, fd);

    /* Note control channel, for efficiency. */
    if (Type == CTcontrol) {
        CHANccfd = fd;
        CHANcc = slot;
    }
    return slot;
}
/*
**  Turn tracing of a channel on or off.
**
**  cp    channel to change.
**  Flag  new value for the channel's Tracing field.
**
**  The change is logged; when tracing is switched on, the channel's
**  counters, address, timers and mask membership are logged as well.
*/
void
CHANtracing(cp, Flag)
    register CHANNEL    *cp;
    BOOL                Flag;
{
    char                *p;

    p = CHANname(cp);
    syslog(L_NOTICE, "%s trace %s", p, Flag ? "on" : "off");
    cp->Tracing = Flag;
    if (Flag) {
        /* Arguments are listed in the same order as the labels. */
        syslog(L_NOTICE, "%s trace badwrites %d blockwrites %d badreads %d",
            p, cp->BadWrites, cp->BlockedWrites, cp->BadReads);
        /* Cast the time fields so they match %ld, as is done for
         * Waketime below. */
        syslog(L_NOTICE, "%s trace address %s lastactive %ld nextlog %ld",
            p, inet_ntoa(cp->Address),
            (long)cp->LastActive, (long)cp->NextLog);
        if (FD_ISSET(cp->fd, &SCHANmask))
            syslog(L_NOTICE, "%s trace sleeping %ld 0x%x",
                p, (long)cp->Waketime, cp->Waker);
        if (FD_ISSET(cp->fd, &RCHANmask))
            syslog(L_NOTICE, "%s trace reading %d %s",
                p, cp->In.Used, MaxLength(cp->In.Data, cp->In.Data));
        if (FD_ISSET(cp->fd, &WCHANmask))
            syslog(L_NOTICE, "%s trace writing %d %s",
                p, cp->Out.Left, MaxLength(cp->Out.Data, cp->Out.Data));
    }
}
/*
**  Close a channel.
**
**  cp    channel to shut down.
**  name  printable name used in the log messages.
**
**  A live channel is logged, taken out of all three masks, has its
**  Argument released and its descriptor closed.  In every case the slot
**  is then marked unused and any buffer larger than BIG_BUFFER is
**  released.
*/
void
CHANclose(cp, name)
    register CHANNEL    *cp;
    char                *name;
{
    if (cp->Type != CTfree) {
        /* NNTP channels get a statistics line, the rest a plain notice. */
        if (cp->Type != CTnntp)
            syslog(L_NOTICE, "%s closed", name);
        else
            syslog(L_NOTICE,
                "%s closed seconds %ld accepted %ld refused %ld rejected %ld",
                name, (long)(Now.time - cp->Started),
                cp->Received, cp->Refused, cp->Rejected);

        /* Nobody should be waiting on this descriptor any more. */
        WCHANremove(cp);
        RCHANremove(cp);
        SCHANremove(cp);

        /* The pointer itself is cleared further down. */
        if (cp->Argument != NULL)
            DISPOSE(cp->Argument);

        if (cp->fd >= 0 && close(cp->fd) < 0)
            syslog(L_ERROR, "%s cant close %s %m", LogName, name);
    }
    else
        syslog(L_ERROR, "%s internal closing free channel", name);

    /* Mark it unused. */
    cp->Argument = NULL;
    cp->fd = -1;
    cp->State = CSerror;
    cp->Type = CTfree;

    /* Free the buffers if they got big; a zero Size makes CHANcreate
     * allocate new ones. */
    if (cp->In.Size > BIG_BUFFER) {
        cp->In.Size = 0;
        DISPOSE(cp->In.Data);
    }
    if (cp->Out.Size > BIG_BUFFER) {
        cp->Out.Size = 0;
        DISPOSE(cp->Out.Data);
    }
}
/*
**  Return a printable name for the channel.
**
**  cp  channel to describe.
**
**  The result lives in a static buffer, so it is overwritten by the
**  next call; callers that need it longer must copy it.
*/
char *
CHANname(cp)
    register CHANNEL    *cp;
{
    static char         buff[SMBUF];
    register int        i;
    register SITE       *sp;
    STRING              p;
    PID_T               pid;

    switch (cp->Type) {
    default:
        /* Unknown type: show the raw type, descriptor and table index.
         * The casts make the arguments match %d; the pointer
         * difference in particular is not an int. */
        (void)sprintf(buff, "?%d(#%d@%d)?",
                (int)cp->Type, cp->fd, (int)(cp - CHANtable));
        break;
    case CTany:
        (void)sprintf(buff, "any:%d", cp->fd);
        break;
    case CTfree:
        (void)sprintf(buff, "free:%d", cp->fd);
        break;
    case CTremconn:
        (void)sprintf(buff, "remconn:%d", cp->fd);
        break;
    case CTnntp:
        (void)sprintf(buff, "%s:%d",
                cp->Address.s_addr == 0 ? "localhost" : RChostname(cp),
                cp->fd);
        break;
    case CTlocalconn:
        (void)sprintf(buff, "localconn:%d", cp->fd);
        break;
    case CTcontrol:
        (void)sprintf(buff, "control:%d", cp->fd);
        break;
    case CTexploder:
    case CTfile:
    case CTprocess:
        /* Find the site that has this channel; "?" if there is none.
         * The pid is only picked up for non-file channels. */
        for (p = "?", i = nSites, sp = Sites, pid = 0; --i >= 0; sp++)
            if (sp->Channel == cp) {
                p = sp->Name;
                if (cp->Type != CTfile)
                    pid = sp->pid;
                break;
            }
        if (pid == 0)
            (void)sprintf(buff, "%s:%d:%s",
                    MaxLength(p, p), cp->fd,
                    cp->Type == CTfile ? "file" : "proc");
        else
            (void)sprintf(buff, "%s:%d:%s:%ld",
                    MaxLength(p, p), cp->fd,
                    cp->Type == CTfile ? "file" : "proc", (long)pid);
        break;
    }
    return buff;
}
/*
**  Return the channel for a specified descriptor.
**
**  fd  descriptor to look up.
**
**  Returns the table slot for fd, or NULL when fd is outside the
**  table.  Valid indexes run from 0 to CHANtablesize - 1.
*/
CHANNEL *
CHANfromdescriptor(fd)
    int                 fd;
{
    if (fd < 0 || fd >= CHANtablesize)
        return NULL;
    return &CHANtable[fd];
}
/*
**  Iterate over all channels of a specified type.
**
**  ip    in: table index to start at; out: index just past the match.
**        Left untouched when nothing is found.
**  Type  type to look for; CTany matches every slot.
**
**  Returns the next matching channel, or NULL when the start index is
**  out of range or no slot from there on matches.
*/
CHANNEL *
CHANiter(ip, Type)
    int                 *ip;
    CHANNELTYPE         Type;
{
    register CHANNEL    *slot;
    register int        idx;

    idx = *ip;
    if (idx < 0)
        return NULL;

    for ( ; idx < CHANtablesize; idx++) {
        slot = &CHANtable[idx];
        if (Type != CTany && slot->Type != Type)
            continue;
        /* Remember where to pick up on the next call. */
        *ip = idx + 1;
        return slot;
    }
    return NULL;
}
/*
**  Mark a channel as an active reader.
**
**  cp  channel whose descriptor is added to the read mask.
*/
void
RCHANadd(cp)
    register CHANNEL    *cp;
{
    register int        fd;

    fd = cp->fd;
    FD_SET(fd, &RCHANmask);

    /* Keep track of the highest descriptor in use. */
    if (CHANlastfd < fd)
        CHANlastfd = fd;

    /* Start reading at the beginning of the buffer. */
    cp->In.Used = 0;
}
/*
**  Remove a channel from the set of readers.
**
**  cp  channel whose descriptor is dropped from the read mask.  Nothing
**      happens if it is not in the mask.
*/
void
RCHANremove(cp)
    register CHANNEL    *cp;
{
    if (!FD_ISSET(cp->fd, &RCHANmask))
        return;
    FD_CLR(cp->fd, &RCHANmask);

    if (cp->fd != CHANlastfd)
        return;

    /* This was the highest descriptor; walk down until we find one
     * that is still reading or writing, but never go below 1. */
    for ( ; ; ) {
        if (FD_ISSET(CHANlastfd, &RCHANmask))
            break;
        if (FD_ISSET(CHANlastfd, &WCHANmask))
            break;
        if (CHANlastfd <= 1)
            break;
        CHANlastfd--;
    }
}
/*
**  Put a channel to sleep, call a function when it wakes.
**  Note that the Argument must be NULL or allocated memory!
**
**  cp        channel to put on the sleep list.
**  Waketime  time stored as the channel's Waketime.
**  Event     event stored on the channel (matched by SCHANwakeup).
**  Waker     function stored as the channel's Waker.
**  Argument  replaces the channel's Argument; the old one is released
**            unless it is the same pointer.
*/
void
SCHANadd(cp, Waketime, Event, Waker, Argument)
    register CHANNEL    *cp;
    time_t              Waketime;
    POINTER             Event;
    FUNCPTR             Waker;
    POINTER             Argument;
{
    register int        fd;

    fd = cp->fd;

    /* Count the channel only the first time it goes to sleep. */
    if (!FD_ISSET(fd, &SCHANmask)) {
        FD_SET(fd, &SCHANmask);
        SCHANcount++;
    }
    if (CHANlastsleepfd < fd)
        CHANlastsleepfd = fd;

    if (cp->Argument != Argument) {
        DISPOSE(cp->Argument);
        cp->Argument = Argument;
    }
    cp->Event = Event;
    cp->Waker = Waker;
    cp->Waketime = Waketime;
}
/*
**  Take a channel off the sleep list.
**
**  cp  channel to remove.  Nothing happens if it is not sleeping.
**
**  Clears the channel's Waketime, drops the sleeper count and, when
**  this was the highest sleeping descriptor, finds the new highest one.
*/
void
SCHANremove(cp)
    register CHANNEL    *cp;
{
    if (FD_ISSET(cp->fd, &SCHANmask)) {
        FD_CLR(cp->fd, &SCHANmask);
        SCHANcount--;
        cp->Waketime = 0;
        if (cp->fd == CHANlastsleepfd) {
            /* This was the highest descriptor, get a new highest.  The
             * search has to look at the sleep mask: CHANlastsleepfd
             * tracks sleepers, and checking the write mask instead can
             * leave it below a channel that is still asleep. */
            while (!FD_ISSET(CHANlastsleepfd, &SCHANmask)
              && CHANlastsleepfd > 1)
                CHANlastsleepfd--;
        }
    }
}
/*
**  Is a channel on the sleep list?
**
**  cp  channel to test.
**
**  Returns the result of testing the channel's descriptor against the
**  sleep mask (non-zero when it is set).
*/
BOOL
CHANsleeping(cp)
    CHANNEL             *cp;
{
    int                 fd;

    fd = cp->fd;
    return FD_ISSET(fd, &SCHANmask);
}
/*
**  Wake up channels waiting for a specific event.
**
**  Event  event pointer to match against each channel's Event.
**
**  Matching sleepers get a Waketime of zero; they stay on the sleep
**  list until the main loop notices them.
*/
void
SCHANwakeup(Event)
    register POINTER    Event;
{
    register CHANNEL    *slot;
    register int        idx;

    for (idx = 0; idx < CHANtablesize; idx++) {
        slot = &CHANtable[idx];
        /* Skip unused slots before looking at anything else. */
        if (slot->Type == CTfree)
            continue;
        if (slot->Event != Event)
            continue;
        if (CHANsleeping(slot))
            slot->Waketime = 0;
    }
}
/*
**  Mark a channel as an active writer.  Don't reset the Out->Left field
**  since we could have buffered I/O already in there.
**
**  cp  channel to add to the write mask.  Nothing happens when its
**      output buffer has nothing left to send.
*/
void
WCHANadd(cp)
    register CHANNEL    *cp;
{
    register int        fd;

    if (cp->Out.Left <= 0)
        return;

    fd = cp->fd;
    FD_SET(fd, &WCHANmask);
    if (CHANlastfd < fd)
        CHANlastfd = fd;
}
/*
**  Remove a channel from the set of writers.
**
**  cp  channel to drop from the write mask.  Nothing happens if it is
**      not in the mask.
*/
void
WCHANremove(cp)
    register CHANNEL    *cp;
{
    if (!FD_ISSET(cp->fd, &WCHANmask))
        return;
    FD_CLR(cp->fd, &WCHANmask);

    /* No data left -- reset used so we don't grow the buffer. */
    if (cp->Out.Left <= 0) {
        cp->Out.Left = 0;
        cp->Out.Used = 0;
    }

    if (cp->fd != CHANlastfd)
        return;

    /* This was the highest descriptor; walk down until we find one
     * that is still reading or writing, but never go below 1. */
    for ( ; ; ) {
        if (FD_ISSET(CHANlastfd, &RCHANmask))
            break;
        if (FD_ISSET(CHANlastfd, &WCHANmask))
            break;
        if (CHANlastfd <= 1)
            break;
        CHANlastfd--;
    }
}
/*
**  Set a channel to start off with the contents of an existing buffer.
**
**  cp  channel whose pending output is replaced.
**  bp  buffer supplying the data: the bp->Left bytes that start at
**      offset bp->Used.
*/
void
WCHANsetfrombuffer(cp, bp)
    CHANNEL             *cp;
    BUFFER              *bp;
{
    WCHANset(cp, bp->Data + bp->Used, bp->Left);
}
/*
**  Read in text data, return the amount we read.
**
**  cp  channel to read from; data is appended to cp->In at In.Used.
**
**  Returns the number of bytes read, -1 if the read failed (logged), or
**  0 on end of file, in which case the channel has been closed.  One
**  byte of the free space is always left unused by the read.
*/
int
CHANreadtext(cp)
    register CHANNEL    *cp;
{
    register int        i;
    register int        grow;
    register BUFFER     *bp;
    char                *p;

    /* Get the name before the read, so that building it cannot come
     * between a failed read and the %m in the log message. */
    p = CHANname(cp);

    /* Read in whatever is there. */
    bp = &cp->In;
    bp->Left = bp->Size - bp->Used;
    i = read(cp->fd, &bp->Data[bp->Used], bp->Left - 1);
    if (i < 0) {
        syslog(L_ERROR, "%s cant read %m", p);
        return -1;
    }
    if (i == 0) {
        syslog(L_NOTICE, "%s readclose", p);
        CHANclose(cp, p);
        return 0;
    }

    /* Update values, grow buffer if we're getting close.  The growth
     * amount is kept apart from the byte count so that the return
     * value really is what was read. */
    bp->Used += i;
    bp->Left -= i;
    if (bp->Left <= LOW_WATER) {
        grow = GROW_AMOUNT(bp->Size);
        bp->Size += grow;
        bp->Left += grow;
        RENEW(bp->Data, char, bp->Size);
    }
    return i;
}
/*
**  If I/O backs up a lot, we can get EMSGSIZE on some systems.  If that
**  happens we want to do the I/O in chunks.  We assume stdio's BUFSIZ is
**  a good chunk value.
**
**  fd      descriptor to write to.
**  p       data to write.
**  length  number of bytes at p.
**
**  Returns what a single write() returned when that succeeded or failed
**  outright.  After EMSGSIZE or a zero-length write the data is sent in
**  pieces, and the result is the number of bytes that went out, or the
**  last write() result if none did.
*/
STATIC int
largewrite(fd, p, length)
    register int        fd;
    register char       *p;
    register int        length;
{
    register int        n;
    register int        chunk;
    register char       *start;

    /* Try the standard case -- write it all, retrying if interrupted. */
    for ( ; ; ) {
        n = write(fd, (POINTER)p, (SIZE_T)length);
        if (n > 0)
            return n;
        if (n == 0)
            break;
        if (errno == EINTR)
            continue;
        if (errno != EMSGSIZE)
            return n;
        break;
    }

    /* Write it in pieces. */
    start = p;
    n = 0;
    while (length) {
        chunk = length > BUFSIZ ? BUFSIZ : length;
        n = write(fd, (POINTER)p, (SIZE_T)chunk);
        if (n <= 0)
            break;
        p += n;
        length -= n;
    }

    /* Return error, or partial results if we got something. */
    if (p == start)
        return n;
    return p - start;
}
/*
**  Try to flush out the buffer.  Use this only on file channels!
**
**  cp  channel whose pending output is written out.
**
**  Returns TRUE when nothing is left to write, FALSE when a write
**  failed or wrote nothing (both are logged).  The channel is taken out
**  of the write mask once its buffer is empty.
*/
BOOL
WCHANflush(cp)
    register CHANNEL    *cp;
{
    register BUFFER     *bp;
    register int        i;
    int                 oerrno;
    char                *p;

    /* If nothing in there, or nothing left, nothing to do. */
    bp = &cp->Out;
    if (bp->Left == 0)
        return TRUE;

    /* Write it. */
    while (bp->Left > 0) {
        i = write(cp->fd, (POINTER)&bp->Data[bp->Used], (SIZE_T)bp->Left);
        if (i < 0) {
            /* Building the name may disturb errno, so keep the value
             * from write() for the %m in the message; CHANreadloop
             * protects its write error the same way. */
            oerrno = errno;
            p = CHANname(cp);
            errno = oerrno;
            syslog(L_ERROR, "%s cant flush count %d %m", p, bp->Left);
            return FALSE;
        }
        if (i == 0) {
            syslog(L_ERROR, "%s cant flush count %d",
                CHANname(cp), bp->Left);
            return FALSE;
        }
        bp->Left -= i;
        bp->Used += i;
        if (bp->Left <= 0)
            WCHANremove(cp);
    }
    return TRUE;
}
/*
**  Wakeup routine called after a write channel was put to sleep.
**
**  cp  channel that just woke up.
**
**  Logs the wakeup and puts the channel back among the writers.
*/
STATIC FUNCTYPE
CHANwakeup(cp)
    CHANNEL             *cp;
{
    char                *name;

    name = CHANname(cp);
    syslog(L_NOTICE, "%s wakeup", name);
    WCHANadd(cp);
}
/*
**  Attempting to write would block; stop output or give up.
**
**  cp  channel whose write would have blocked.
**  p   printable name of the channel, for the log.
**
**  Counts the blocked write.  Once the count passes BAD_IO_COUNT an
**  NNTP, file, exploder or process channel is closed; anything else,
**  and any channel under the limit, is put to sleep for a period that
**  grows with the count.
*/
STATIC void
CHANwritesleep(cp, p)
    register CHANNEL    *cp;
    char                *p;
{
    int                 delay;

    delay = ++(cp->BlockedWrites);

    /* Too many in a row?  Give up on the channel types we may close. */
    if (delay > BAD_IO_COUNT
     && (cp->Type == CTnntp || cp->Type == CTfile
      || cp->Type == CTexploder || cp->Type == CTprocess)) {
        syslog(L_ERROR, "%s blocked closing", p);
        SITEchanclose(cp);
        CHANclose(cp, p);
        return;
    }

    /* Back off longer each time, and retry when the sleep is over. */
    delay *= BLOCK_BACKOFF;
    syslog(L_ERROR, "%s blocked sleeping %d", p, delay);
    SCHANadd(cp, (time_t)(Now.time + delay), (POINTER)NULL,
        CHANwakeup, (POINTER)NULL);
}
#if defined(INND_FIND_BAD_FDS)
/*
**  We got an unknown error in select.  Find out the culprit.
**  Not really ready for production use yet, and it's expensive, too.
**
**  Every descriptor in the read and write masks is polled on its own
**  with a zero timeout.  One that makes select() fail with anything
**  but EINTR is logged and dropped from the mask it was found in.
*/
STATIC void
CHANdiagnose()
{
    FDSET               Probe;
    struct timeval      NoWait;
    int                 fd;

    FD_ZERO(&Probe);
    fd = CHANlastfd;
    while (fd >= 0) {
        /* Is this one a bad reader? */
        if (FD_ISSET(fd, &RCHANmask)) {
            FD_SET(fd, &Probe);
            NoWait.tv_sec = 0;
            NoWait.tv_usec = 0;
            if (select(fd + 1, &Probe, (FDSET *)NULL, (FDSET *)NULL,
                    &NoWait) < 0
             && errno != EINTR) {
                syslog(L_ERROR, "%s Bad Read File %d", LogName, fd);
                FD_CLR(fd, &RCHANmask);
                /* Probably do something about the file descriptor here;
                 * call CHANclose on it? */
            }
            FD_CLR(fd, &Probe);
        }

        /* Is this one a bad writer? */
        if (FD_ISSET(fd, &WCHANmask)) {
            FD_SET(fd, &Probe);
            NoWait.tv_sec = 0;
            NoWait.tv_usec = 0;
            if (select(fd + 1, (FDSET *)NULL, &Probe, (FDSET *)NULL,
                    &NoWait) < 0
             && errno != EINTR) {
                syslog(L_ERROR, "%s Bad Write File %d", LogName, fd);
                FD_CLR(fd, &WCHANmask);
                /* Probably do something about the file descriptor here;
                 * call CHANclose on it? */
            }
            FD_CLR(fd, &Probe);
        }
        fd--;
    }
}
#endif /* defined(INND_FIND_BAD_FDS) */
/*
** Main I/O loop. Wait for data, call the channel's handler when there is
** something to read or when the queued write is finished. In order to
** be fair (i.e., don't always give descriptor n priority over n+1), we
** remember where we last had something and pick up from there.
**
** The function never returns; it leaves only through CleanupAndExit
** once GotTerminate has been set. Each pass waits in select(), serves
** the control channel first, and then walks the channel table doing
** reads, writes, wakeups and inactivity checks for every live channel.
*/
void
CHANreadloop()
{
static char EXITING[] = "INND exiting because of signal\n";
/* Static, so that each pass resumes the scan where the last one stopped. */
static int fd;
register int i;
register int startpoint;
register int count;
register int lastfd;
int oerrno;
register CHANNEL *cp;
register BUFFER *bp;
FDSET MyRead;
FDSET MyWrite;
struct timeval MyTime;
long silence;
char *p;
for ( ; ; ) {
/* See if any processes died. */
PROCscan();
/* Wait for data, note the time. select() works on copies so that
 * the global masks and timeout are left alone. */
MyRead = RCHANmask;
MyWrite = WCHANmask;
MyTime = TimeOut;
count = select(CHANlastfd + 1, &MyRead, &MyWrite, (FDSET *)NULL,
&MyTime);
/* A termination request wins over whatever select() reported. */
if (GotTerminate) {
(void)write(2, EXITING, STRLEN(EXITING));
CleanupAndExit(0, "received signal");
}
if (count < 0) {
/* Being interrupted is routine; anything else gets logged. */
if (errno != EINTR) {
syslog(L_ERROR, "%s cant select %m", LogName);
#if defined(INND_FIND_BAD_FDS)
CHANdiagnose();
#endif /* defined(INND_FIND_BAD_FDS) */
}
continue;
}
if (count == 0) {
/* No channels active, so flush and skip if nobody's
 * sleeping. */
if (Mode == OMrunning)
ICDwrite();
if (SCHANcount == 0)
continue;
}
/* Update the "reasonably accurate" time. */
if (GetTimeInfo(&Now) < 0)
syslog(L_ERROR, "%s cant gettimeinfo %m", LogName);
/* Try the control channel first. Its bit is cleared from MyRead
 * afterwards so that the scan below does not serve it again. */
if (FD_ISSET(CHANccfd, &RCHANmask) && FD_ISSET(CHANccfd, &MyRead)) {
count--;
(*CHANcc->Reader)(CHANcc);
FD_CLR(CHANccfd, &MyRead);
}
/* Loop through all active channels. Somebody could have
 * closed a channel so we double-check the global mask before
 * looking at what select returned. The code here is written so
 * that a channel could be reading and writing and sleeping at the
 * same time, even though that's not possible. (Just as well,
 * since in SysVr4 the count would be wrong.) */
/* The scan has to reach the highest descriptor that is reading,
 * writing or sleeping. */
lastfd = CHANlastfd;
if (lastfd < CHANlastsleepfd)
lastfd = CHANlastsleepfd;
/* The saved position may be past the end by now; wrap it. */
if (fd > lastfd)
fd = 0;
startpoint = fd;
do {
cp = &CHANtable[fd];
if (cp->Type == CTfree)
goto Next;
/* Anything to read? */
if (FD_ISSET(fd, &RCHANmask) && FD_ISSET(fd, &MyRead)) {
count--;
cp->LastActive = Now.time;
(*cp->Reader)(cp);
}
/* Possibly recheck for dead children so we don't get SIGPIPE
 * on readerless channels. */
if (PROCneedscan)
PROCscan();
/* Ready to write? */
if (FD_ISSET(fd, &WCHANmask) && FD_ISSET(fd, &MyWrite)) {
count--;
bp = &cp->Out;
if (bp->Left) {
cp->LastActive = Now.time;
i = largewrite(fd, &bp->Data[bp->Used], bp->Left);
if (i <= 0) {
/* Keep errno safe while the name is built, so that %m
 * still describes the write. */
oerrno = errno;
p = CHANname(cp);
errno = oerrno;
if (i < 0)
syslog(L_ERROR, "%s cant write %m", p);
else
syslog(L_ERROR, "%s cant write", p);
cp->BadWrites++;
/* A broken pipe closes the channel; a write that would
 * block backs off through CHANwritesleep; any other
 * failure pauses the channel once BadWrites reaches
 * BAD_IO_COUNT. */
if (i < 0 && oerrno == EPIPE) {
SITEchanclose(cp);
CHANclose(cp, p);
}
else if (i < 0 && oerrno == EWOULDBLOCK) {
WCHANremove(cp);
CHANwritesleep(cp, p);
}
else if (cp->BadWrites >= BAD_IO_COUNT) {
syslog(L_ERROR, "%s sleeping", p);
WCHANremove(cp);
SCHANadd(cp, (time_t)(Now.time + PAUSE_RETRY_TIME),
(POINTER)NULL, CHANwakeup, (POINTER)NULL);
}
}
else {
/* Something went out: clear the error counters and advance
 * the buffer; tell the channel when all of it is gone. */
cp->BadWrites = 0;
cp->BlockedWrites = 0;
bp->Left -= i;
bp->Used += i;
if (bp->Left <= 0) {
WCHANremove(cp);
(*cp->WriteDone)(cp);
}
}
}
}
/* Coming off a sleep? */
if (FD_ISSET(fd, &SCHANmask) && cp->Waketime <= Now.time) {
cp->LastActive = Now.time;
SCHANremove(cp);
(*cp->Waker)(cp);
}
/* Has this channel been inactive very long? Only NNTP channels
 * are checked. Each report pushes NextLog out by another
 * CHANNEL_INACTIVE_TIME, and a channel silent for more than
 * PEER_TIMEOUT is closed. */
if (cp->Type == CTnntp
&& cp->LastActive + cp->NextLog < Now.time) {
p = CHANname(cp);
silence = Now.time - cp->LastActive;
cp->NextLog += CHANNEL_INACTIVE_TIME;
syslog(L_NOTICE, "%s inactive %ld", p, silence / 60L);
if (silence > PEER_TIMEOUT) {
syslog(L_NOTICE, "%s timeout", p);
CHANclose(cp, p);
}
}
Next:
/* Bump pointer, wrapping around after the highest descriptor. */
if (fd >= lastfd)
fd = 0;
else
fd++;
/* If there is nothing to do, break out. */
if (count == 0 && SCHANcount == 0)
break;
} while (fd != startpoint);
}
}