OpenBSD-4.6/usr.sbin/afs/src/arlad/kernel.c
/*
* Copyright (c) 1995 - 2002 Kungliga Tekniska Högskolan
* (Royal Institute of Technology, Stockholm, Sweden).
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the Institute nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include "arla_local.h"
RCSID("$arla: kernel.c,v 1.35 2003/01/10 17:33:44 tol Exp $");
/*
* The fd we use to talk with the kernel on.
*/
int kernel_fd = -1;
/* count of the number of messages in a read */
static unsigned recv_count[20];
/* for more than above... */
static unsigned recv_count_overflow;
/*
* Number of workers used and high
*/
static unsigned long workers_high, workers_used;
unsigned long
kernel_highworkers(void)
{
return workers_high;
}
unsigned long
kernel_usedworkers(void)
{
return workers_used;
}
/*
*
*/
static int
process_message (int msg_length, char *msg)
{
struct nnpfs_message_header *header;
char *p;
int cnt;
cnt = 0;
for (p = msg;
msg_length > 0;
p += header->size, msg_length -= header->size) {
header = (struct nnpfs_message_header *)p;
nnpfs_message_receive (kernel_fd, header, header->size);
++cnt;
}
if (cnt < sizeof(recv_count)/sizeof(recv_count[0]))
++recv_count[cnt];
else
++recv_count_overflow;
return 0;
}
/* no threads available to handle messages */
static int overload = FALSE;
/*
* The work threads.
*/
struct worker {
char name[10];
char pad[6]; /* make sure we're aligned for the data */
char data[MAX_XMSG_SIZE];
PROCESS pid;
int msg_length;
int busyp;
int number;
} *workers;
static void
sub_thread (void *v_myself)
{
struct worker *self = (struct worker *)v_myself;
for (;;) {
arla_warnx (ADEBKERNEL, "worker %d waiting", self->number);
LWP_WaitProcess (self);
self->busyp = 1;
++workers_used;
arla_warnx (ADEBKERNEL, "worker %d: processing", self->number);
process_message (self->msg_length, self->data);
arla_warnx (ADEBKERNEL, "worker %d: done", self->number);
--workers_used;
self->busyp = 0;
overload = FALSE;
LWP_NoYieldSignal(&overload);
}
}
PROCESS version_pid;
static void
version_thread (void *foo)
{
nnpfs_probe_version (kernel_fd, NNPFS_VERSION);
}
/*
* The tcp communication unit
*/
static int
tcp_open (const char *filename)
{
int s, ret, port;
struct sockaddr_in addr;
if (strlen (filename) == 0)
arla_errx (1, ADEBERROR, "tcp_open doesn't contain tcp-port");
port = atoi (filename);
if (port == 0)
arla_errx (1, ADEBERROR, "tcp_open couldn't parse %s as a port#",
filename);
s = socket (PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (s < 0) {
arla_warn (ADEBWARN, errno, "tcp_open: socket failed");
return s;
}
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
addr.sin_port = htons(port);
ret = connect (s, (struct sockaddr *)&addr, sizeof(addr));
if (ret < 0) {
arla_warn (ADEBWARN, errno, "tcp_open: connect failed");
return s;
}
return s;
}
static int
tcp_opendef (const char *filename)
{
if (strlen (filename) != 0)
arla_warnx (ADEBWARN, "tcp_opendef ignoring extra data");
return tcp_open ("5000"); /* XXX */
}
static ssize_t
tcp_read (int fd, void *data, size_t len)
{
int32_t slen;
char in_len[4];
if (recv (fd, in_len, sizeof(in_len), 0) != sizeof(in_len)) {
arla_warn (ADEBWARN, errno, "tcp_read: failed to read length");
return -1;
}
memcpy(&slen, in_len, sizeof(slen));
slen = ntohl(slen);
if (len < slen) {
arla_warnx (ADEBWARN,
"tcp_read: recv a too large message %d",
slen);
return -1;
}
return recv (fd, data, slen, 0) == slen ? slen : -1;
}
static ssize_t
tcp_write (int fd, const void *data, size_t len)
{
int ret;
int32_t slen = htonl(len);
char out_len[4];
memcpy (out_len, &slen, sizeof(slen));
if (send (fd, out_len, sizeof(slen), 0) != sizeof(out_len)) {
arla_warn (ADEBWARN, errno, "tcp_write: failed to write length");
return -1;
}
ret = send (fd, data, len, 0);
if (ret != len) {
arla_warn (ADEBWARN, errno, "tcp_write: failed to write msg (%d)",
ret);
return -1;
}
return ret;
}
/*
* The cdev communication unit
*/
static int
dev_open (const char *filename)
{
char fn[MAXPATHLEN];
snprintf (fn, MAXPATHLEN, "/%s", filename);
return open (fn, O_RDWR);
}
static int
dev_fileopen (const char *filename)
{
return dev_open (filename);
}
static ssize_t
dev_read (int fd, void *msg, size_t len)
{
return read (fd, msg, len);
}
static ssize_t
dev_write (int fd, const void *msg, size_t len)
{
return write (fd, msg, len);
}
/*
* The null communication unit
*/
static int
null_open (const char *filename)
{
return 0;
}
static ssize_t
null_read (int fd, void *msg, size_t len)
{
return 0;
}
static ssize_t
null_write (int fd, const void *msg, size_t len)
{
return len;
}
/*
* Way to communticate with the kernel
*/
struct kern_interface {
const char *prefix;
int (*open) (const char *filename);
ssize_t (*read) (int fd, void *msg, size_t len);
ssize_t (*write) (int fd, const void *msg, size_t len);
} kern_comm[] = {
{ "/", dev_open, dev_read, dev_write},
{ "file:/", dev_fileopen, dev_read, dev_write},
{ "tcpport:", tcp_open, tcp_read, tcp_write},
{ "tcp", tcp_opendef, tcp_read, tcp_write},
{ "null", null_open, null_read, null_write},
{ NULL }
} ;
struct kern_interface *kern_cur = NULL;
static int
kern_open (const char *filename)
{
struct kern_interface *ki = &kern_comm[0];
int len;
while (ki->prefix) {
len = strlen (ki->prefix);
if (strncasecmp (ki->prefix, filename, len) == 0) {
break;
}
ki++;
}
if (ki->prefix == NULL)
return -1;
kern_cur = ki;
return (ki->open) (filename+len);
}
ssize_t
kern_read (int fd, void *data, size_t len)
{
assert (kern_cur != NULL);
return (kern_cur->read) (fd, data, len);
}
ssize_t
kern_write (int fd, const void *data, size_t len)
{
assert (kern_cur != NULL);
return (kern_cur->write) (fd, data, len);
}
/*
*
*/
void
kernel_opendevice (const char *dev)
{
int fd;
fd = kern_open (dev);
if (fd < 0)
arla_err (1, ADEBERROR, errno, "kern_open %s", dev);
kernel_fd = fd;
if (kernel_fd >= FD_SETSIZE)
arla_errx (1, ADEBERROR, "kernel fd too large");
}
/*
*
*/
#define WORKER_STACKSIZE (16*1024)
void
kernel_interface (struct kernel_args *args)
{
int i;
assert (kernel_fd >= 0);
workers = malloc (sizeof(*workers) * args->num_workers);
if (workers == NULL)
arla_err (1, ADEBERROR, errno, "malloc %lu failed",
(unsigned long)sizeof(*workers) * args->num_workers);
workers_high = args->num_workers;
workers_used = 0;
for (i = 0; i < args->num_workers; ++i) {
workers[i].busyp = 0;
workers[i].number = i;
snprintf(workers[i].name, sizeof(workers[i].name), "worker %d", i);
if (LWP_CreateProcess (sub_thread, WORKER_STACKSIZE, 1,
(char *)&workers[i],
workers[i].name, &workers[i].pid))
arla_errx (1, ADEBERROR, "CreateProcess of worker failed");
}
if (LWP_CreateProcess (version_thread, WORKER_STACKSIZE, 1,
NULL,
"version", &version_pid))
arla_errx (1, ADEBERROR, "CreateProcess of version thread failed");
arla_warnx(ADEBKERNEL, "Arla: selecting on fd: %d", kernel_fd);
for (;;) {
fd_set readset;
int ret;
FD_ZERO(&readset);
FD_SET(kernel_fd, &readset);
ret = IOMGR_Select (kernel_fd + 1, &readset, NULL, NULL, NULL);
if (ret < 0)
arla_warn (ADEBKERNEL, errno, "select");
else if (ret == 0)
arla_warnx (ADEBKERNEL,
"Arla: select returned with 0. strange.");
else if (FD_ISSET(kernel_fd, &readset)) {
for (i = 0; i < args->num_workers; ++i) {
if (workers[i].busyp == 0) {
ret = kern_read (kernel_fd, workers[i].data,
sizeof(workers[i].data));
if (ret <= 0) {
arla_warn (ADEBWARN, errno, "read");
} else {
workers[i].msg_length = ret;
LWP_SignalProcess (&workers[i]);
}
break;
}
}
if (i == args->num_workers) {
arla_warnx (ADEBWARN, "kernel: all %u workers busy",
args->num_workers);
overload = TRUE;
LWP_WaitProcess(&overload);
}
}
}
}