OpenSolaris_b135/lib/libslp/clib/slp_queue.c

/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

/*
 * A synchronized FIFO queue for inter-thread producer-consumer semantics.
 * This queue will handle multiple writers and readers simultaneously.
 *
 * The following operations are provided:
 * slp_new_queue:	create a new queue
 * slp_enqueue:		place a message at the end of the queue
 * slp_enqueue_at_head:	place a message at the start of the queue
 * slp_dequeue:		remove and return the next message on the queue
 *				(waits indefinitely)
 * slp_dequeue_timed:	remove and return the next message on the queue
 *				(waits only for a specified time)
 * slp_flush_queue:	flushes and frees all messages on a queue
 * slp_destroy_queue:	frees an empty queue.
 */

#include <stdio.h>
#include <stdlib.h>
#include <thread.h>
#include <synch.h>
#include <syslog.h>
#include <slp.h>
#include <slp-internal.h>

/* Private implementation details */

/* One node of the singly linked list that backs a queue. */
struct queue_entry {
	void *msg;			/* caller-supplied message in this node */
	struct queue_entry *next;	/* following node; NULL on the last */
};
typedef struct queue_entry slp_queue_entry_t;

/*
 * The concrete queue object. The routines in this file receive it
 * through slp_queue_t pointers; the enqueue and dequeue routines hold
 * 'lock' while they update head, tail and count.
 */
struct queue {
	slp_queue_entry_t *head;	/* first entry; the one dequeued next */
	slp_queue_entry_t *tail;	/* last entry; slp_enqueue appends here */
	mutex_t *lock;			/* held while the list is modified */
	cond_t *wait;			/* signaled after each enqueue */
	int count;			/* number of entries currently linked */
};

/*
 * Creates, initializes, and returns a new, empty queue.
 *
 * err	- out parameter; set to SLP_OK on success, or to
 *	  SLP_MEMORY_ALLOC_FAILED if any allocation fails.
 *
 * Returns the new queue, or NULL on failure. On failure everything
 * allocated here has been released again. A queue returned from here
 * is released with slp_destroy_queue.
 *
 * Whether a reader waits indefinitely or with a timeout is decided per
 * call (slp_dequeue vs. slp_dequeue_timed), not when the queue is
 * created.
 */
slp_queue_t *slp_new_queue(SLPError *err) {
	mutex_t *lock = NULL;
	cond_t *wait = NULL;
	struct queue *q = NULL;

	*err = SLP_OK;

	/*
	 * The mutex is used as handed back by calloc, i.e. zero-filled;
	 * no explicit mutex initialization is performed.
	 */
	if ((lock = calloc(1, sizeof (*lock))) == NULL)
		goto nomem;

	if ((wait = calloc(1, sizeof (*wait))) == NULL)
		goto nomem;

	if ((q = malloc(sizeof (*q))) == NULL)
		goto nomem;

	/*
	 * Initialize the condition variable only once every allocation
	 * has succeeded, so the failure path never has to undo it.
	 * The type argument is an int, hence 0 rather than NULL.
	 */
	(void) cond_init(wait, 0, NULL);

	q->head = NULL;
	q->tail = NULL;
	q->lock = lock;
	q->wait = wait;
	q->count = 0;

	return (q);

nomem:
	/* free(NULL) is a no-op, so partially completed setups are fine */
	free(wait);
	free(lock);
	*err = SLP_MEMORY_ALLOC_FAILED;
	slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
	return (NULL);
}

/*
 * Appends msg to the end of the queue qa, then signals the queue's
 * condition variable so that a waiting reader can pick it up.
 *
 * Returns SLP_OK on success, or SLP_MEMORY_ALLOC_FAILED when the list
 * node for msg cannot be allocated; in that case the queue is left
 * untouched.
 */
SLPError slp_enqueue(slp_queue_t *qa, void *msg) {
	struct queue *queue = qa;
	slp_queue_entry_t *node = malloc(sizeof (*node));

	if (node == NULL) {
		slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory");
		return (SLP_MEMORY_ALLOC_FAILED);
	}

	/* the node is still private to this thread; fill it in first */
	node->msg = msg;
	node->next = NULL;

	(void) mutex_lock(queue->lock);
	if (queue->head == NULL) {
		/* empty queue: the new node is first as well as last */
		queue->head = node;
	} else {
		/* non-empty queue: link behind the current last node */
		queue->tail->next = node;
	}
	queue->tail = node;
	queue->count++;
	(void) mutex_unlock(queue->lock);

	(void) cond_signal(queue->wait);

	return (SLP_OK);
}

/*
 * Inserts msg at the head of the queue qa, so that it is the next
 * message dequeued. This is useful for inserting things like cancel
 * messages. The queue's condition variable is signaled afterwards.
 *
 * Returns SLP_OK on success, or SLP_MEMORY_ALLOC_FAILED when the list
 * node for msg cannot be allocated; in that case the queue is left
 * untouched.
 */
SLPError slp_enqueue_at_head(slp_queue_t *qa, void *msg) {
	slp_queue_entry_t *qe;
	struct queue *q = qa;

	if ((qe = malloc(sizeof (*qe))) == NULL) {
		slp_err(LOG_CRIT, 0, "slp_enqueue_at_head", "out of memory");
		return (SLP_MEMORY_ALLOC_FAILED);
	}

	(void) mutex_lock(q->lock);
	qe->msg = msg;
	qe->next = q->head;
	/*
	 * On an empty queue the new entry is also the last one. Without
	 * this, a later slp_enqueue would see a non-NULL head and link
	 * its entry through a tail pointer that was never set.
	 */
	if (q->head == NULL)
		q->tail = qe;
	q->head = qe;

	q->count++;
	(void) mutex_unlock(q->lock);
	(void) cond_signal(q->wait);

	return (SLP_OK);
}

/*
 * The core of dequeue: unlinks the entry at the head of q, frees the
 * entry, and returns the message it carried. Returns NULL when the
 * queue has no entries.
 *
 * No locking is done here; slp_dequeue_timed calls this with q->lock
 * held.
 */
static void *dequeue_nolock(struct queue *q) {
	slp_queue_entry_t *first = q->head;
	void *payload;

	if (first == NULL)
		return (NULL);	/* shouldn't get here */

	payload = first->msg;

	/* advance head; if nothing follows, the queue is now empty */
	q->head = first->next;
	if (q->head == NULL)
		q->tail = NULL;

	free(first);
	q->count--;
	return (payload);
}

/*
 * Removes and returns the first message in the queue, blocking while
 * the queue is empty.
 *
 * qa		- the queue to read from.
 * to		- if non-NULL, passed unchanged to cond_timedwait as the
 *		  limit for the wait; if NULL, the wait has no time limit.
 *		  NOTE(review): cond_timedwait(3C) documents its time
 *		  argument as an absolute time -- confirm callers agree.
 * etimed	- optional out parameter (may be NULL). Set to SLP_FALSE
 *		  on entry and to SLP_TRUE only when the wait timed out.
 *
 * Returns the message, or NULL if the wait timed out ('etimed' true)
 * or the wait failed with an error ('etimed' false).
 */
void *slp_dequeue_timed(slp_queue_t *qa, timestruc_t *to, SLPBoolean *etimed) {
	int err;
	void *ans;
	struct queue *q = qa;

	if (etimed)
		*etimed = SLP_FALSE;

	(void) mutex_lock(q->lock);

	/*
	 * The count is re-tested after every wakeup: with several
	 * readers, another one may already have taken the message.
	 * If something is queued on entry the loop is skipped entirely.
	 */
	while (q->count == 0) {
		if (to) {
			err = cond_timedwait(q->wait, q->lock, to);
		} else {
			err = cond_wait(q->wait, q->lock);
		}

		/* normal or interrupted wakeup: look at the queue again */
		if (err == 0 || err == EINTR)
			continue;

		/*
		 * Timeout or hard error. Give up rather than call the
		 * failing wait again in a tight loop.
		 */
		(void) mutex_unlock(q->lock);
		if (err == ETIME && etimed)
			*etimed = SLP_TRUE;
		return (NULL);
	}

	ans = dequeue_nolock(q);
	(void) mutex_unlock(q->lock);
	return (ans);
}

/*
 * Removes the first message from the queue and returns it, waiting
 * without a time limit until one is available. Implemented as
 * slp_dequeue_timed with neither a timeout nor a timed-out flag.
 * Returns NULL only on internal error.
 */
void *slp_dequeue(slp_queue_t *qa) {
	void *msg;

	msg = slp_dequeue_timed(qa, NULL, NULL);
	return (msg);
}

/*
 * Flushes the queue: every entry is removed, each message is released
 * with the caller-specified free function, and the queue is left
 * empty (so it may then be destroyed or reused).
 *
 * qa		- the queue to flush.
 * free_f	- called once per queued message with that message as its
 *		  argument. If NULL, the messages themselves are left
 *		  alone and only the queue's own entries are freed.
 */
void slp_flush_queue(slp_queue_t *qa, void (*free_f)(void *)) {
	slp_queue_entry_t *p, *pn;
	struct queue *q = qa;

	/*
	 * Detach the whole list while holding the lock and reset the
	 * queue to empty; the entries are then private to this thread,
	 * so free_f is not invoked with the lock held.
	 */
	(void) mutex_lock(q->lock);
	p = q->head;
	q->head = q->tail = NULL;
	q->count = 0;
	(void) mutex_unlock(q->lock);

	for (; p != NULL; p = pn) {
		pn = p->next;	/* fetch before the entry is freed */
		if (free_f != NULL)
			free_f(p->msg);
		free(p);
	}
}

/*
 * Tears down a queue: destroys its mutex and condition variable and
 * frees them along with the queue structure itself.
 * Entries still linked into the queue are not touched, so the queue
 * must be empty before it is destroyed; slp_flush_queue can be used
 * to empty it.
 */
void slp_destroy_queue(slp_queue_t *qa) {
	struct queue *victim = qa;
	mutex_t *lk = victim->lock;
	cond_t *cv = victim->wait;

	/* retire the synchronization objects before freeing their storage */
	(void) mutex_destroy(lk);
	(void) cond_destroy(cv);

	free(lk);
	free(cv);
	free(victim);
}