OpenSolaris_b135/lib/libslp/clib/slp_ua_common.c

Compare this file to the similar file:
Show the results in this format:

/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include <stdio.h>
#include <stdlib.h>
#include <syslog.h>
#include <string.h>
#include <thread.h>
#include <synch.h>
#include <slp-internal.h>

/*
 * Parameters shared by the producer (slp_call) and the consumer
 * (consumer) for a single call. Allocated and filled in by
 * slp_ua_common; freed by consumer, or by slp_ua_common itself when
 * setting up the call fails.
 */
struct thr_call_args {
	slp_handle_impl_t *hp;		/* handle the call runs on */
	SLPGenericAppCB *cb;		/* app callback, handed to msg_cb */
	void *cookie;			/* caller's argument, handed to msg_cb */
	SLPMsgReplyCB *msg_cb;		/* called by consumer for each reply */
	slp_target_list_t *targets;	/* destinations used by slp_call */
};

/* Forward declarations of the file-local routines defined below. */
static SLPError consumer(void *);
static void slp_call(void *);
static SLPError check_message_fit(slp_handle_impl_t *, slp_target_list_t *);

/*
 * Common driver for a user-agent request on the handle hSLP.
 *
 * Builds the target list for 'scopes', checks that a multicast
 * message would fit (check_message_fit), creates the reply queue and
 * starts the producer thread (slp_call). If the handle is marked
 * async, a second thread running consumer() is started and SLP_OK is
 * returned right away; otherwise consumer() runs in the calling
 * thread and its result is returned.
 *
 * cb and cookie are not interpreted here; they are stored in the
 * argument block and handed to msg_cb by consumer().
 *
 * Ownership: once the producer thread is running it destroys the
 * target list itself; consumer() frees the argument block and the
 * queue. Every failure path below releases whatever has been
 * allocated so far and has not been handed off yet.
 *
 * Returns SLP_OK or the error from the step that failed
 * (SLP_MEMORY_ALLOC_FAILED, SLP_INTERNAL_SYSTEM_ERROR, or whatever
 * slp_new_target_list / check_message_fit / slp_new_queue reported).
 */
SLPError slp_ua_common(SLPHandle hSLP, const char *scopes,
			SLPGenericAppCB cb, void *cookie,
			SLPMsgReplyCB msg_cb) {
	slp_handle_impl_t *hp;
	slp_target_list_t *targets;
	struct thr_call_args *args;
	slp_queue_t *q;
	SLPError err;
	thread_t tid;
	int terr;

	hp = (slp_handle_impl_t *)hSLP;

	/* select targets */
	if ((err = slp_new_target_list(hp, scopes, &targets)) != SLP_OK)
		return (err);
	if ((err = check_message_fit(hp, targets)) != SLP_OK) {
		slp_destroy_target_list(targets);
		return (err);
	}

	/* populate the args structure */
	args = malloc(sizeof (*args));
	if (args == NULL) {
		slp_err(LOG_CRIT, 0, "ua_common", "out of memory");
		err = SLP_MEMORY_ALLOC_FAILED;
		goto error_targets;
	}

	args->hp = hp;
	args->cb = cb;
	args->cookie = cookie;
	args->msg_cb = msg_cb;
	args->targets = targets;

	/* create the queue that this call will use */
	q = slp_new_queue(&err);	/* freed in consumer */
	if (err != SLP_OK)
		goto error_targets;
	hp->q = q;

	/* kick off the producer thread */
	if ((terr = thr_create(
		NULL, 0, (void *(*)(void *)) slp_call, args, 0, &tid)) != 0) {
		slp_err(LOG_CRIT, 0, "ua_common", "could not start thread: %s",
			strerror(terr));
		err = SLP_INTERNAL_SYSTEM_ERROR;
		goto error_queue;
	}
	hp->producer_tid = tid;

	if (hp->async) {
		/* kick off the consumer thread */
		if ((terr = thr_create(
			NULL, 0, (void *(*)(void *))consumer,
			args, 0, NULL)) != 0) {
			slp_err(LOG_CRIT, 0, "ua_common",
				"could not start thread: %s",
				strerror(terr));
			err = SLP_INTERNAL_SYSTEM_ERROR;
			/* stop the producer thread and wait for it */
			hp->cancel = 1;
			(void) thr_join(tid, NULL, NULL);

			/*
			 * The producer has destroyed the target list and
			 * queued its terminator by now, so only the queue
			 * is left: drain it the way consumer() would and
			 * release it. The target list must not be
			 * destroyed a second time.
			 */
			slp_flush_queue(q, free);
			slp_destroy_queue(q);
			hp->q = NULL;

			goto error;
		}
		return (SLP_OK);
	}
	/* else	sync */
	return (consumer(args));

error_queue:
	/* no thread was started, so nothing has been queued */
	slp_destroy_queue(q);
	hp->q = NULL;	/* don't leave a stale queue pointer in the handle */
error_targets:
	/* the producer never ran, so the target list is still ours */
	slp_destroy_target_list(targets);
error:
	free(args);
	return (err);
}

/*
 * Consumer side of a call: pulls replies off the handle's queue and
 * passes each one to the message callback from the argument block.
 * slp_ua_common runs this either directly or as a thread start
 * routine.
 *
 * The callback is always invoked once with a NULL reply as the "last
 * call": either when the queue yields NULL, or explicitly after the
 * callback has indicated that it wants no more replies.
 *
 * On the way out the producer thread is cancelled and joined, the
 * queue is flushed and destroyed, the argument block 'ap' is freed
 * and slp_end_call is invoked on the handle. Always returns SLP_OK.
 */
static SLPError consumer(void *ap) {
	struct thr_call_args *call = ap;
	slp_handle_impl_t *handle;
	void *collator;
	int result_count = 0;
	char *msg;

	handle = call->hp;
	collator = NULL;
	handle->consumer_tid = thr_self();

	while (1) {
		SLPBoolean want_more;

		/* NULL means nothing more is available, or SLPClosed */
		msg = slp_dequeue(handle->q);
		want_more = call->msg_cb(handle, msg, call->cb, call->cookie,
					&collator, &result_count);

		/* a NULL reply was itself the last call; we are done */
		if (msg == NULL)
			break;
		free(msg);

		if (want_more)
			continue;

		/* callback has had enough: deliver the last call and stop */
		call->msg_cb(handle, NULL, call->cb, call->cookie,
				&collator, &result_count);
		break;
	}

	/* ask the producer to stop and wait until it has */
	handle->cancel = 1;
	(void) thr_join(handle->producer_tid, NULL, NULL);

	/* discard anything still queued, then the queue itself */
	slp_flush_queue(handle->q, free);
	slp_destroy_queue(handle->q);

	free(call);
	slp_end_call(handle);
	return (SLP_OK);
}

/*
 * This is the producer thread
 */
static void slp_call(void *ap) {
	struct thr_call_args *args = (struct thr_call_args *)ap;
	slp_target_t *t;
	const char *uc_scopes, *mc_scopes;
	SLPBoolean use_tcp = SLP_FALSE;
	size_t len;

	/* Unicast */
	if (uc_scopes = slp_get_uc_scopes(args->targets)) {
		size_t mtu;
		int i;

		/* calculate msg length */
		len = slp_hdrlang_length(args->hp);
		for (i = 0; i < args->hp->msg.iovlen; i++) {
			len += args->hp->msg.iov[i].iov_len;
		}
		len += strlen(uc_scopes);

		mtu = slp_get_mtu();
		if (len > mtu)
			use_tcp = SLP_TRUE;

		for (
			t = slp_next_uc_target(args->targets);
			t;
			t = slp_next_uc_target(args->targets)) {
			if (args->hp->cancel)
				break;

			if (use_tcp)
				slp_uc_tcp_send(args->hp, t, uc_scopes,
						SLP_FALSE, 0);
			else
				slp_uc_udp_send(args->hp, t, uc_scopes);
		}
	}

	/* Multicast */
	if ((!args->hp->cancel) &&
	    (mc_scopes = slp_get_mc_scopes(args->targets)))
		slp_mc_send(args->hp, mc_scopes);

	/* Wait for TCP to complete, if necessary */
	if (args->hp->tcp_lock)
		slp_tcp_wait(args->hp);

	slp_destroy_target_list(args->targets);

	/* free the message */
	free(args->hp->msg.iov);
	free(args->hp->msg.msg);

	/* null terminate message queue */
	(void) slp_enqueue(args->hp->q, NULL);

	thr_exit(NULL);	/* we're outa here */
}

/*
 * Size check for requests that will be multicast.
 *
 * When the target list has no multicast scopes there is nothing to
 * check and SLP_OK is returned. Otherwise the header length, every
 * iov segment of the handle's message and the multicast scope string
 * are added up; SLP_BUFFER_OVERFLOW is returned if that total is
 * larger than the MTU, SLP_OK if it is not.
 */
static SLPError check_message_fit(slp_handle_impl_t *hp,
					slp_target_list_t *targets) {
	const char *scopes;
	size_t total;
	int seg;

	scopes = slp_get_mc_scopes(targets);
	if (scopes == NULL)
		return (SLP_OK);	/* nothing will be multicast */

	total = slp_hdrlang_length(hp);
	for (seg = 0; seg < hp->msg.iovlen; seg++)
		total += hp->msg.iov[seg].iov_len;
	total += strlen(scopes);

	if (total <= slp_get_mtu())
		return (SLP_OK);
	return (SLP_BUFFER_OVERFLOW);
}