USG_PG3/usr/source/opsys/messag.c
#include "../head/param.h"
#include "../head/reg.h"
#include "../head/ipcomm.h"
#include "../head/user.h"
#include "../head/userx.h"
#include "../head/proc.h"
/*
 * Global state of the message facility.
 */
/* queue used for error logging; msgflush() calls termlog() when this queue is released */
struct msgqhdr *logqp; /* for error logging */
/* table of message queue headers; a slot is free while its mq_procp is NULL */
struct msgqhdr msgqhdr[NMQHDR];
/* allocation map handed to malloc()/mfree() for message buffers */
int msgmap[MMAPSIZ]; /* space for message allocation */
/* arena holding headers and message text; its address is also the sleep channel for senders awaiting space */
int msgcore[MSGMEM*16]; /* space for messages */
/*
 * movrhead - added to a message size before masking with ~31 when
 *            allocating or freeing; set once in msginit().
 * msgslept - incremented each time a sender sleeps for buffer space.
 * mmemwant - nonzero while some sender is asleep waiting for buffer
 *            space; cleared by msgfree() when it issues the wakeup.
 */
int movrhead, msgslept, mmemwant;
/*
 * Message System Call
 *
 * u.u_arg[0] selects the operation:
 *	MDISAB	release the caller's message queue (see msgflush).
 *	MENAB	claim a free queue header for the caller; a no-op if
 *		the caller already has one, ETABLE if none is free.
 *	MSEND	send without waiting; ENOMEM if no buffer space.
 *	MSENDW	send, sleeping until buffer space is available.
 *	MRECV	receive without waiting; ENOMSG if nothing matches.
 *	MRECVW	receive, sleeping until a matching message arrives.
 * Any other value yields EINVAL.
 *
 * For the send calls u.u_ar0[R0] is the message length (0..MAXMLEN),
 * u.u_arg[1] the user buffer (see msgmove), u.u_arg[2] the pid of the
 * destination and u.u_arg[3] the message type (1..128).
 * For the receive calls u.u_ar0[R0] is the size of the user buffer,
 * u.u_arg[2] the user address that receives the sender/type pair
 * (see msgrecv) and u.u_arg[3] the wanted type (0..128).
 */
messag()
{
	register int n;
	register struct msgqhdr *rqp;
	register struct msghdr *hp;
	int mtest;		/* nonzero for the non-waiting variants */

	mtest = 0;
	switch(u.u_arg[0]) {

	default:
		u.u_error = EINVAL;
		return;

	case MDISAB:
		msgflush();
		return;

	case MENAB:
		if(u.u_msgqhdr != NULL)
			return;
		/* take the first queue header not owned by any process */
		for(rqp = &msgqhdr[0]; rqp < &msgqhdr[NMQHDR]; rqp++) {
			if(rqp->mq_procp == NULL) {
				rqp->mq_forw = NULL;
				/* empty queue: tail pointer refers to the head link itself */
				rqp->mq_last = &rqp->mq_forw;
				rqp->mq_cnt = 0;
				rqp->mq_flag = 0;
				u.u_msgqhdr = rqp;
				rqp->mq_procp = u.u_procp;
				return;
			}
		}
		u.u_error = ETABLE;
		return;

	case MSEND:
		mtest++;
		/* fall through */
	case MSENDW:
		if((n = u.u_ar0[R0]) < 0 || n > MAXMLEN ||
		   u.u_arg[3] <= 0 || u.u_arg[3] > 128) {
			u.u_error = EINVAL;
			return;
		}
		if((rqp = mqsrch(u.u_arg[2])) == NULL) {
			u.u_error = ESRCH;
			return;
		}
		if(rqp->mq_cnt >= MAXMSG) {
			u.u_error = ETABLE;
			return;
		}
		while((hp = msgsetup(n)) == NULL) {
			if(mtest) {
				u.u_error = ENOMEM;
				return;
			}
			mmemwant++;
			msgslept++;
			sleep(msgcore, PMSG);
			/*
			 * Everything checked before the sleep is stale now.
			 * The destination may have released its queue, or
			 * the header may have been claimed by another process.
			 */
			if(rqp->mq_procp == NULL ||
			   rqp->mq_procp->p_pid != u.u_arg[2]) {
				u.u_error = ESRCH;
				return;
			}
			/*
			 * Other senders may have filled the queue while we
			 * slept; without this recheck several sleepers could
			 * push the queue past MAXMSG.
			 */
			if(rqp->mq_cnt >= MAXMSG) {
				u.u_error = ETABLE;
				return;
			}
		}
		msgmove(hp, n, MSGIN);
		if(u.u_error)
			msgfree(hp);
		else {
			hp->mq_type = u.u_arg[3];
			hp->mq_sender = u.u_procp->p_pid;
			msgsend(rqp, hp);
		}
		return;

	case MRECV:
		mtest++;
		/* fall through */
	case MRECVW:
		if((rqp = u.u_msgqhdr) == NULL) {
			u.u_error = ENOALOC;
			return;
		}
		if((n = u.u_arg[3]) < 0 || n > 128 || u.u_ar0[R0] < 0) {
			u.u_error = EINVAL;
			return;
		}
		while(!msgrecv(rqp, n)) {
			if(mtest) {
				u.u_error = ENOMSG;
				return;
			}
			/* ask msgsend() to wake us when something is queued */
			rqp->mq_flag =| IP_WANTED;
			sleep(rqp, PMSG);
		}
		return;
	}
}
/*
* Scan a process's message Q for a message of
* the desired type. If found, try to effect transfer
* of the message to the process.
*/
msgrecv(qp, type)
struct msgqhdr *qp;
{ register int n;
register struct msghdr *rhp1, *rhp2;
n = type;
for(rhp1 = qp; rhp2 = rhp1->mq_forw; rhp1 = rhp2)
if(n == 0 || n == rhp2->mq_type) {
n = min(rhp2->mq_size, u.u_ar0[R0]);
msgmove(rhp2, n, MSGOUT);
if(u.u_error) return(1);
if(suword(u.u_arg[2], rhp2->mq_sender) == -1 ||
suword(u.u_arg[2]+2, rhp2->mq_type) == -1)
u.u_error = EFAULT;
else {
msgremov(qp, rhp1);
msgfree(rhp2);
u.u_ar0[R0] = n;
}
return(1);
}
return(0);
}
/*
 * Append the message "hp" to the tail of the message Q "qp".
 * If the owner of the Q has flagged that it is waiting for a
 * message (IP_WANTED), clear the flag and wake it.
 * The list update is done at spl6 and the previous processor
 * status is restored afterwards.
 */
msgsend(qp, hp)
struct msgqhdr *qp;
struct msghdr *hp;
{
	register struct msgqhdr *q;
	register int oldps;

	q = qp;
	hp->mq_forw = NULL;

	oldps = PS->integ;
	spl6();
	q->mq_last->mq_forw = hp;
	q->mq_last = hp;
	q->mq_cnt++;
	PS->integ = oldps;

	if((q->mq_flag & IP_WANTED) == 0)
		return;
	q->mq_flag =& ~IP_WANTED;
	wakeup(q);
}
/*
 * Allocate room for a message of "size" bytes together with
 * its header, and record "size" in the new header.
 * Returns the header, or NULL when malloc() finds no space.
 * The spl6 matters only when messages are used for system
 * error logging.
 */
msgsetup(size)
{
	register struct msghdr *mp;
	register int oldps;

	oldps = PS->integ;
	spl6();
	/* same size computation as msgfree() */
	mp = malloc(msgmap, (size+movrhead)&(~31));
	if(mp != NULL)
		mp->mq_size = size;
	PS->integ = oldps;
	return(mp);
}
/*
* Deallocate a message Q header and any messages
* pending on the Q. Messages requiring an ACK
* (types 1-63) are returned to the sending process as
* type 128, if possible.
*/
msgflush()
{ register struct msgqhdr *rqp1, *rqp2;
register struct msghdr *rhp;
if((rqp1 = u.u_msgqhdr) == NULL) {
u.u_error = ENOALOC;
return;
}
if(rqp1 == logqp) /* implicit termination of error log */
termlog();
u.u_msgqhdr = NULL;
rqp1->mq_procp = NULL;
while((rhp = rqp1->mq_forw) != NULL) {
msgremov(rqp1, rqp1);
if((rhp->mq_type) < 64 && (rqp2 = mqsrch(rhp->mq_sender))
&& rqp2->mq_cnt < MAXMSG) {
rhp->mq_type = 128;
msgsend(rqp2, rhp);
}
else msgfree(rhp);
}
}
/*
 * Unlink from the message Q "qp" the message that follows
 * "prevhp".  "prevhp" is either the preceding message or the
 * Q header itself when the first message is being removed.
 * The caller must ensure that such a message exists.
 */
msgremov(qp, prevhp)
struct msgqhdr *qp;
struct msghdr *prevhp;
{
	register struct msghdr *prev, *gone;
	register int oldps;

	prev = prevhp;
	gone = prev->mq_forw;

	oldps = PS->integ;
	spl6();
	qp->mq_cnt--;
	prev->mq_forw = gone->mq_forw;
	/* removed the tail: the predecessor becomes the new tail */
	if(prev->mq_forw == NULL)
		qp->mq_last = prev;
	PS->integ = oldps;
}
/*
 * Return the space occupied by the message "hp" and its header
 * to the allocation map, then wake any senders that went to
 * sleep on msgcore for lack of buffer space.
 */
msgfree(hp)
struct msghdr *hp;
{
	register int oldps;

	oldps = PS->integ;
	spl6();
	/* same size computation as msgsetup() */
	mfree(msgmap, (hp->mq_size+movrhead)&(~31), hp);
	PS->integ = oldps;

	if(mmemwant == 0)
		return;
	mmemwant = 0;
	wakeup(msgcore);
}
/*
* Message interface to iomove()
*/
msgmove(hp, count, mode)
struct msghdr *hp;
{ register int len;
if((len = count) == 0) return;
u.u_offset[0] = 0;
u.u_offset[1] = 0;
u.u_count = len;
u.u_base = u.u_arg[1];
u.u_segflg = 0;
iomove(++hp, 0, len, mode | MSGIO);
}
/*
 * See if a process is enabled for messages.
 * Returns the message Q header owned by the process whose
 * pid is "pid", or NULL if no header belongs to it.
 */
mqsrch(pid)
{
	register struct msgqhdr *qp;
	register struct proc *pp;
	register want;

	want = pid;
	qp = &msgqhdr[0];
	while(qp < &msgqhdr[NMQHDR]) {
		pp = qp->mq_procp;
		if(pp != NULL && pp->p_pid == want)
			return(qp);
		qp++;
	}
	return(NULL);
}
/*
* Initialization
*/
msginit()
{ struct msghdr proto;
movrhead = sizeof(proto) + 31;
mfree(msgmap, sizeof(msgcore), msgcore);
}