/* * Copyright (c) 1995 - 2002 Kungliga Tekniska Högskolan * (Royal Institute of Technology, Stockholm, Sweden). * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * 3. Neither the name of the Institute nor the names of its contributors * may be used to endorse or promote products derived from this software * without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ #include "arla_local.h" RCSID("$arla: kernel.c,v 1.35 2003/01/10 17:33:44 tol Exp $"); /* * The fd we use to talk with the kernel on. */ int kernel_fd = -1; /* count of the number of messages in a read */ static unsigned recv_count[20]; /* for more than above... */ static unsigned recv_count_overflow; /* * Number of workers used and high */ static unsigned long workers_high, workers_used; unsigned long kernel_highworkers(void) { return workers_high; } unsigned long kernel_usedworkers(void) { return workers_used; } /* * */ static int process_message (int msg_length, char *msg) { struct nnpfs_message_header *header; char *p; int cnt; cnt = 0; for (p = msg; msg_length > 0; p += header->size, msg_length -= header->size) { header = (struct nnpfs_message_header *)p; nnpfs_message_receive (kernel_fd, header, header->size); ++cnt; } if (cnt < sizeof(recv_count)/sizeof(recv_count[0])) ++recv_count[cnt]; else ++recv_count_overflow; return 0; } /* no threads available to handle messages */ static int overload = FALSE; /* * The work threads. */ struct worker { char name[10]; char pad[6]; /* make sure we're aligned for the data */ char data[MAX_XMSG_SIZE]; PROCESS pid; int msg_length; int busyp; int number; } *workers; static void sub_thread (void *v_myself) { struct worker *self = (struct worker *)v_myself; for (;;) { arla_warnx (ADEBKERNEL, "worker %d waiting", self->number); LWP_WaitProcess (self); self->busyp = 1; ++workers_used; arla_warnx (ADEBKERNEL, "worker %d: processing", self->number); process_message (self->msg_length, self->data); arla_warnx (ADEBKERNEL, "worker %d: done", self->number); --workers_used; self->busyp = 0; overload = FALSE; LWP_NoYieldSignal(&overload); } } PROCESS version_pid; static void version_thread (void *foo) { nnpfs_probe_version (kernel_fd, NNPFS_VERSION); } /* * The tcp communication unit */ static int tcp_open (const char *filename) { int s, ret, port; struct sockaddr_in addr; if (strlen (filename) == 0) arla_errx (1, ADEBERROR, "tcp_open doesn't contain tcp-port"); port = atoi (filename); if (port == 0) arla_errx (1, ADEBERROR, "tcp_open couldn't parse %s as a port#", filename); s = socket (PF_INET, SOCK_STREAM, IPPROTO_TCP); if (s < 0) { arla_warn (ADEBWARN, errno, "tcp_open: socket failed"); return s; } addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */ addr.sin_port = htons(port); ret = connect (s, (struct sockaddr *)&addr, sizeof(addr)); if (ret < 0) { arla_warn (ADEBWARN, errno, "tcp_open: connect failed"); return s; } return s; } static int tcp_opendef (const char *filename) { if (strlen (filename) != 0) arla_warnx (ADEBWARN, "tcp_opendef ignoring extra data"); return tcp_open ("5000"); /* XXX */ } static ssize_t tcp_read (int fd, void *data, size_t len) { int32_t slen; char in_len[4]; if (recv (fd, in_len, sizeof(in_len), 0) != sizeof(in_len)) { arla_warn (ADEBWARN, errno, "tcp_read: failed to read length"); return -1; } memcpy(&slen, in_len, sizeof(slen)); slen = ntohl(slen); if (len < slen) { arla_warnx (ADEBWARN, "tcp_read: recv a too large message %d", slen); return -1; } return recv (fd, data, slen, 0) == slen ? slen : -1; } static ssize_t tcp_write (int fd, const void *data, size_t len) { int ret; int32_t slen = htonl(len); char out_len[4]; memcpy (out_len, &slen, sizeof(slen)); if (send (fd, out_len, sizeof(slen), 0) != sizeof(out_len)) { arla_warn (ADEBWARN, errno, "tcp_write: failed to write length"); return -1; } ret = send (fd, data, len, 0); if (ret != len) { arla_warn (ADEBWARN, errno, "tcp_write: failed to write msg (%d)", ret); return -1; } return ret; } /* * The cdev communication unit */ static int dev_open (const char *filename) { char fn[MAXPATHLEN]; snprintf (fn, MAXPATHLEN, "/%s", filename); return open (fn, O_RDWR); } static int dev_fileopen (const char *filename) { return dev_open (filename); } static ssize_t dev_read (int fd, void *msg, size_t len) { return read (fd, msg, len); } static ssize_t dev_write (int fd, const void *msg, size_t len) { return write (fd, msg, len); } /* * The null communication unit */ static int null_open (const char *filename) { return 0; } static ssize_t null_read (int fd, void *msg, size_t len) { return 0; } static ssize_t null_write (int fd, const void *msg, size_t len) { return len; } /* * Way to communticate with the kernel */ struct kern_interface { const char *prefix; int (*open) (const char *filename); ssize_t (*read) (int fd, void *msg, size_t len); ssize_t (*write) (int fd, const void *msg, size_t len); } kern_comm[] = { { "/", dev_open, dev_read, dev_write}, { "file:/", dev_fileopen, dev_read, dev_write}, { "tcpport:", tcp_open, tcp_read, tcp_write}, { "tcp", tcp_opendef, tcp_read, tcp_write}, { "null", null_open, null_read, null_write}, { NULL } } ; struct kern_interface *kern_cur = NULL; static int kern_open (const char *filename) { struct kern_interface *ki = &kern_comm[0]; int len; while (ki->prefix) { len = strlen (ki->prefix); if (strncasecmp (ki->prefix, filename, len) == 0) { break; } ki++; } if (ki->prefix == NULL) return -1; kern_cur = ki; return (ki->open) (filename+len); } ssize_t kern_read (int fd, void *data, size_t len) { assert (kern_cur != NULL); return (kern_cur->read) (fd, data, len); } ssize_t kern_write (int fd, const void *data, size_t len) { assert (kern_cur != NULL); return (kern_cur->write) (fd, data, len); } /* * */ void kernel_opendevice (const char *dev) { int fd; fd = kern_open (dev); if (fd < 0) arla_err (1, ADEBERROR, errno, "kern_open %s", dev); kernel_fd = fd; if (kernel_fd >= FD_SETSIZE) arla_errx (1, ADEBERROR, "kernel fd too large"); } /* * */ #define WORKER_STACKSIZE (16*1024) void kernel_interface (struct kernel_args *args) { int i; assert (kernel_fd >= 0); workers = malloc (sizeof(*workers) * args->num_workers); if (workers == NULL) arla_err (1, ADEBERROR, errno, "malloc %lu failed", (unsigned long)sizeof(*workers) * args->num_workers); workers_high = args->num_workers; workers_used = 0; for (i = 0; i < args->num_workers; ++i) { workers[i].busyp = 0; workers[i].number = i; snprintf(workers[i].name, sizeof(workers[i].name), "worker %d", i); if (LWP_CreateProcess (sub_thread, WORKER_STACKSIZE, 1, (char *)&workers[i], workers[i].name, &workers[i].pid)) arla_errx (1, ADEBERROR, "CreateProcess of worker failed"); } if (LWP_CreateProcess (version_thread, WORKER_STACKSIZE, 1, NULL, "version", &version_pid)) arla_errx (1, ADEBERROR, "CreateProcess of version thread failed"); arla_warnx(ADEBKERNEL, "Arla: selecting on fd: %d", kernel_fd); for (;;) { fd_set readset; int ret; FD_ZERO(&readset); FD_SET(kernel_fd, &readset); ret = IOMGR_Select (kernel_fd + 1, &readset, NULL, NULL, NULL); if (ret < 0) arla_warn (ADEBKERNEL, errno, "select"); else if (ret == 0) arla_warnx (ADEBKERNEL, "Arla: select returned with 0. strange."); else if (FD_ISSET(kernel_fd, &readset)) { for (i = 0; i < args->num_workers; ++i) { if (workers[i].busyp == 0) { ret = kern_read (kernel_fd, workers[i].data, sizeof(workers[i].data)); if (ret <= 0) { arla_warn (ADEBWARN, errno, "read"); } else { workers[i].msg_length = ret; LWP_SignalProcess (&workers[i]); } break; } } if (i == args->num_workers) { arla_warnx (ADEBWARN, "kernel: all %u workers busy", args->num_workers); overload = TRUE; LWP_WaitProcess(&overload); } } } }