2.11BSD/ingres/source/decomp/call_ovqp.c
# include "../pipes.h"
# include "../ingres.h"
# include "../aux.h"
# include "../unix.h"
# include "../tree.h"
# include "../symbol.h"
# include "decomp.h"
/*
** CALL_OVQP -- Routines which interface to the One Variable Query Processor.
**
** This file contains the routines associated with sending queries
** and receiving results from OVQP. The interface to these routines is
** still messy. Call_ovqp is given the query, mode, and result relation
** as parameters and gets the source relation, and two flags
** (Newq, Newr) as globals. The routines include:
**
** Call_ovqp -- Sends a One-var query to ovqp, flushes the pipe,
** and reads result from ovqp.
**
** Endovqp -- Informs ovqp that the query is over. Helps to synchronize
** the batch file (if any).
**
** Pipesym -- Writes a token symbol to ovqp.
*/
/*
** Pipe buffers for the conversation with OVQP.  In this file Outpipe
** is only passed to wrpipe() (together with W_ovqp) and Inpipe is
** only passed to rdpipe() (together with R_ovqp).
*/
struct pipfrmt Outpipe, Inpipe;
struct retcode Retcode; /* return code structure; rc_tupcount is cleared by startdecomp() and set by call_ovqp() */
/*
** CALL_OVQP -- send a one-variable query down the pipe to ovqp,
**	flush the pipe, and read back ovqp's reply.
**
**	Parameters:
**		qtree -- root of the query tree to send.
**		mode -- query mode (retrieve, append, etc.); sent
**			as the QMODE symbol.
**		resultnum -- result relation id.  If >= 0 the name
**			given by rnum_convert() is sent as RESULTID.
**
**	Globals:
**		Sourcevar -- if >= 0, the source variable; its range
**			name is sent as SOURCEID.
**		Newq -- if set, a CHANGESTRAT symbol is sent.
**		Newr -- if set, a REOPENRES symbol is sent and Newr
**			is cleared.
**		Retcode -- rc_tupcount is copied from ovqp's reply
**			when a user query comes back NONEMPTY.
**
**	Returns:
**		TRUE -- ovqp answered NONEMPTY (at least one tuple
**			satisfied the query).
**		FALSE -- ovqp answered EMPTY.
**
**	Any other reply status is treated as an ovqp user error and
**	handed to derror().
*/
call_ovqp(qtree, mode, resultnum)
struct querytree	*qtree;
int			mode;
int			resultnum;
{
	register struct querytree	*tree;
	register int			i;
	char				*rangename();
	/* used as a string below, so declare it rather than rely on implicit int */
	char				*rnum_convert();
	struct retcode			ret;
	struct symbol			s;
	int				j;

	tree = qtree;

	/*
	** Defensive default: derror() is not expected to come back
	** here, but if it ever does the caller gets a defined value.
	*/
	i = FALSE;

#	ifdef xDTR1
	if (tTf(8, -1))
	{
		if (tTf(8, 0))
			printf("CALL_OVQP-\n");

		if (tTf(8, 1))
		{
			if (Newq)
				printree(tree, "new query to ovqp");
			else
				printf("query same as previous\n");
		}

		if (tTf(8, 2))
		{
			printf("Sourcevar=%d\t", Sourcevar);
			if (Sourcevar >= 0)
				printf("relid=%s\t", rangename(Sourcevar));
			if (resultnum >= 0)
				printf("Resultname=%s", rnum_convert(resultnum));
			if (((struct qt_root *) tree)->rootuser)
				printf(", userquery");
			printf("\n");
		}
	}
#	endif

	/* header information needed by OVQP for each query */
	wrpipe(P_PRIME, &Outpipe, EXEC_OVQP, 0, 0);
	pipesym(QMODE, 1, mode);

	if (Newq)
		pipesym(CHANGESTRAT, 0);

	if (Newr)
	{
		pipesym(REOPENRES, 0);
		Newr = FALSE;
	}

	if (Sourcevar >= 0)
	{
		/* two-byte type/len header, then the MAXNAME-byte name */
		s.type = SOURCEID;
		s.len = MAXNAME;
		wrpipe(P_NORM, &Outpipe, W_ovqp, &s, 2);
		wrpipe(P_NORM, &Outpipe, W_ovqp, rangename(Sourcevar), MAXNAME);
	}

	if (resultnum >= 0)
	{
		s.type = RESULTID;
		s.len = MAXNAME;
		wrpipe(P_NORM, &Outpipe, W_ovqp, &s, 2);
		wrpipe(P_NORM, &Outpipe, W_ovqp, rnum_convert(resultnum), MAXNAME);
	}

	/*
	** This symbol, USERQRY, should be sent last because OVQP
	** requires the query mode and result relation when it
	** encounters USERQRY.
	*/
	if (((struct qt_root *) tree)->rootuser)
	{
		pipesym(USERQRY, 0);
	}

	/* now write the query list itself */
	pipesym(TREE, 0);
	if (tree->sym.type == AGHEAD)
	{
		pipesym(AGHEAD, 0);
		if (tree->left->sym.type == BYHEAD)
		{
			/* right subtree of the BYHEAD first, then the BYHEAD marker, then its left subtree */
			mklist(tree->left->right);
			pipesym(BYHEAD, 0);
			mklist(tree->left->left);
		}
		else
			mklist(tree->left);
	}
	else
		mklist(tree->left);

	pipesym(ROOT, 0);
	mklist(tree->right);
	pipesym(QLEND, 0);

	/* now flush the query to ovqp */
	wrpipe(P_FLUSH, &Outpipe, W_ovqp);

	/*
	** Read the result of the query from ovqp.  A retcode
	** struct is expected.
	**	The possible values of ret.rc_status are:
	**		EMPTY     -- no tuples satisfied
	**		NONEMPTY  -- at least one tuple satisfied
	**		error num -- ovqp user error
	*/
	rdpipe(P_PRIME, &Inpipe);
	if (rdpipe(P_NORM, &Inpipe, R_ovqp, &ret, sizeof (ret)) != sizeof (ret))
		syserr("readresult: read error on R_ovqp");

	j = ret.rc_status;
	switch (j)
	{
	  case EMPTY:
		i = FALSE;
		break;

	  case NONEMPTY:
		i = TRUE;
		/* only user queries report a tuple count */
		if (((struct qt_root *) tree)->rootuser)
			Retcode.rc_tupcount = ret.rc_tupcount;
		break;

	  default:
		derror(j);
		break;
	}

	return (i);
}
/*
** READAGG_RESULT -- read the results of simple aggregates from ovqp.
**
**	"Result" is a vector of node pointers ended by a zero entry.
**	For every node the two-byte symbol header is read from the
**	ovqp pipe over the node's own header, the length it carries
**	is checked against the length the node had before the read,
**	and then that many bytes are read into the node's value.
**
**	Any short read or length mismatch is a syserr.
*/
readagg_result(result)
struct querytree	*result[];
{
	register struct querytree	**next;
	register struct querytree	*node;
	register int			expect;
	int				got;

	for (next = result; (node = *next) != 0; next++)
	{
		/* remember the length before the header is overwritten */
		expect = node->sym.len & I1MASK;

		if (rdpipe(P_NORM, &Inpipe, R_ovqp, &node->sym, 2) != 2)
			syserr("ageval:rd err (1)");

		if ((node->sym.len & I1MASK) != expect)
			syserr("ageval:len %d,%d", expect, node->sym.len);

		got = rdpipe(P_NORM, &Inpipe, R_ovqp, node->sym.value, expect);
		if (got != expect)
			syserr("ageval:rd err %d,%d", got, expect);

#		ifdef xDTR1
		if (tTf(8, 3))
			writenod(node);
#		endif
	}
}
endovqp(ack)
int ack;
/*
** Endovqp -- Inform ovqp that processing is complete. "Ack" indicates
** whether to wait for an acknowledgement from ovqp. The overall
** mode of the query is sent followed by an EXIT command.
**
** Ovqp decides whether to use batch update or not. If ack == ACK
** then endovqp will read a RETVAL symbol from ovqp and return
** a token which specifies whether to call the update processor or not.
*/
{
register int done, j;
struct retcode ret;
extern int Qry_mode;
extern struct descriptor Inddes;
if (ack != RUBACK)
{
wrpipe(P_PRIME, &Outpipe, EXEC_OVQP, 0, 0);
/* send mode of the query */
pipesym(QMODE, 1, Qry_mode);
/* send exit code */
pipesym(EXIT, 1, ack);
wrpipe(P_END, &Outpipe, W_ovqp);
}
j = NOUPDATE;
if (ack == ACK)
{
/* read acknowledgement from ovqp */
rdpipe(P_PRIME, &Inpipe);
if (rdpipe(P_NORM, &Inpipe, R_ovqp, &ret, sizeof (ret)) != sizeof (ret))
syserr("endovqp no retcode");
j = ret.rc_status;
if (j != UPDATE && j != NOUPDATE)
syserr("endovqp:%d", j);
rdpipe(P_SYNC, &Inpipe, R_ovqp);
}
cleanrel(&Inddes); /* could be closecatalog(FALSE) except for
** text space limitations */
return (j);
}
/*
** PIPESYM -- write a token symbol to ovqp.
**
**	A symbol with type "token" and length "length" is built in a
**	small local buffer and its first length + 2 bytes are written
**	to the ovqp pipe (presumably the type and len bytes followed
**	by the value).
**
**	"Val" is stored as the first value element only when the
**	stored length is non-zero; callers in this file leave it out
**	when sending a zero-length symbol.
*/
pipesym(token, length, val)
int	token;
int	length;
int	val;
{
	char			buf[4];
	register struct symbol	*sym;

	sym = (struct symbol *) buf;
	sym->type = token;
	sym->len = length;

	/* test the length as stored in the symbol, not the raw argument */
	if (sym->len)
		sym->value[0] = val;

	wrpipe(P_NORM, &Outpipe, W_ovqp, sym, length + 2);
}
/*
** OVQPNOD -- write one query tree node to ovqp as a symbol.
**
**	What gets written depends on the type of the node:
**
**	TREE	nothing is written.
**	VAR	the node is first passed through ckvar().  If the
**		variable has been substituted for (valptr is set),
**		a header carrying its frmt and frml is written,
**		followed by the constant value itself.  Otherwise
**		the variable must belong to Sourcevar (syserr if
**		not) and a VAR symbol of length 1 holding the
**		attribute number is written.
**	AND, OR	a symbol of length 0 is written.
**	BOP, RESDOM, UOP
**		a symbol of length 1 holding the node's first
**		value element is written.
**	other	the node's own symbol is written as is: the header
**		plus sym.len (masked to 8 bits) bytes.
*/
ovqpnod(node)
struct querytree	*node;
{
	register struct querytree	*np;
	register struct symbol		*out;
	register int			kind;
	struct qt_var			*var;
	int				nbytes;
	char				buf[4];
	struct querytree		*ckvar();

	np = node;
	out = (struct symbol *) buf;
	kind = np->sym.type;
	out->type = kind;

	if (kind == TREE)
		return;

	if (kind == VAR)
	{
		var = (struct qt_var *) ckvar(np);

		/*
		** Check to see if this variable has been substituted
		** for.  If so, pass the constant value to OVQP.
		*/
		if (var->valptr)
		{
			out->type = var->frmt;
			out->len = var->frml;
			wrpipe(P_NORM, &Outpipe, W_ovqp, out, 2);
			wrpipe(P_NORM, &Outpipe, W_ovqp, var->valptr, out->len & 0377);
			return;
		}

		/* double check that variable is sourcevar */
		if (var->varno != Sourcevar)
			syserr("mklist:src=%d,var=%d", Sourcevar, var->varno);

		out->len = 1;
		out->value[0] = var->attno;
	}
	else if (kind == AND || kind == OR)
	{
		out->len = 0;
	}
	else if (kind == BOP || kind == RESDOM || kind == UOP)
	{
		out->len = 1;
		out->value[0] = *(np->sym.value);
	}
	else
	{
		/* everything else goes out exactly as stored in the tree */
		nbytes = (np->sym.len & 0377) + 2;
		wrpipe(P_NORM, &Outpipe, W_ovqp, &np->sym, nbytes);
		return;
	}

	/* send the symbol assembled in the local buffer */
	wrpipe(P_NORM, &Outpipe, W_ovqp, out, out->len + 2);
}
/*
** Use "inpcloser()" for closing relations. See
** desc_close in openrs.c for details.
**
** Des_closefunc is a pointer to a function returning int; it is
** initialized here to inpcloser, which is defined outside this file.
*/
extern int inpcloser();
int (*Des_closefunc)() = inpcloser;
/*
** INIT_DECOMP -- initialization entry point for decomp.
**
**	Takes no arguments and does nothing; the body is
**	intentionally empty.
*/
init_decomp()
{
/* this is a null routine called from main() */
}
/*
** STARTDECOMP -- per-query initialization.
**
**	Clears the tuple count in the global Retcode, then calls
**	initrange() and rnum_init() (both defined outside this file).
*/
startdecomp()
{
/* called at the start of each user query */
Retcode.rc_tupcount = 0;
initrange();
rnum_init();
}
/*
** FILES_AVAIL -- report how many file descriptors are available
**	for decomposition.
**
**	For decomp running by itself the fixed overhead for files is:
**
**		4 -- pipes
**		1 -- relation relation
**		1 -- attribute relation
**		1 -- indexes relation
**		1 -- standard output
**		1 -- concurrency
**
**	"Mode" is accepted but not looked at.
**
**	Returns:
**		MAXFILES less the fixed overhead above.
*/
files_avail(mode)
int	mode;
{
	register int	overhead;

	/* pipes + relation + attribute + indexes + stdout + concurrency */
	overhead = 4 + 1 + 1 + 1 + 1 + 1;

	return (MAXFILES - overhead);
}