OpenSolaris_b135/lib/udapl/udapl_tavor/common/dapl_evd_util.c

/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright (c) 2002-2003, Network Appliance, Inc. All rights reserved.
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

/*
 *
 * MODULE: dapl_evd_util.c
 *
 * PURPOSE: Manage EVD Info structure
 *
 * $Id: dapl_evd_util.c,v 1.41 2003/08/20 13:18:36 sjs2 Exp $
 */

#include <sys/time.h>
#include <strings.h>
#include "dapl_evd_util.h"
#include "dapl_ia_util.h"
#include "dapl_cno_util.h"
#include "dapl_ring_buffer_util.h"
#include "dapl_adapter_util.h"
#include "dapl_tavor_ibtf_impl.h"
#include "dapl_cookie.h"
#include "dapl.h"


#ifdef	DAPL_DBG	/* For debugging.  */
static void
dapli_evd_eh_print_cqe(
	IN  ib_work_completion_t	cqe);
#endif

static DAT_BOOLEAN
dapli_evd_cqe_to_event(
    IN DAPL_EVD			*evd_ptr,
    IN ib_work_completion_t	*cqe_ptr,
    IN DAT_BOOLEAN		process_premature_events,
    OUT DAT_EVENT		*event_ptr);

static DAT_RETURN
dapli_evd_event_alloc(
	IN  DAPL_EVD		*evd_ptr,
	IN  DAPL_CNO		*cno_ptr,
	IN  DAT_COUNT		qlen);


/*
 * dapls_evd_internal_create
 *
 * Actually create the EVD.  This is called after all parameter checking
 * has been performed in dapl_evd_create.  It is also called from dapl_ia_open
 * to create the default async EVD.
 *
 * Input:
 * 	ia_ptr
 *	cno_ptr
 *	qlen
 *	evd_flags
 *
 * Output:
 * 	evd_ptr_ptr
 *
 * Returns:
 * 	DAT_SUCCESS
 *	ERROR
 *
 */

DAT_RETURN
dapls_evd_internal_create(
    DAPL_IA		*ia_ptr,
    DAPL_CNO		*cno_ptr,
    DAT_COUNT		min_qlen,
    DAT_EVD_FLAGS	evd_flags,
    DAPL_EVD		**evd_ptr_ptr)
{
	DAPL_EVD	*evd_ptr;
	DAT_COUNT	cq_len;
	DAT_RETURN	dat_status;

	dat_status	= DAT_SUCCESS;
	*evd_ptr_ptr	= NULL;
	cq_len		= min_qlen;

	evd_ptr = dapls_evd_alloc(ia_ptr,
	    cno_ptr,
	    evd_flags,
	    min_qlen);
	if (!evd_ptr) {
		dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,
		    DAT_RESOURCE_MEMORY);
		goto bail;
	}

	/*
	 * If we are dealing with event streams besides a CQ event stream,
	 * be conservative and set producer side locking.  Otherwise, no.
	 */
	evd_ptr->evd_producer_locking_needed =
	    ((evd_flags & ~ (DAT_EVD_DTO_FLAG|DAT_EVD_RMR_BIND_FLAG)) != 0);

	/* Before we setup any callbacks, transition state to OPEN.  */
	evd_ptr->evd_state = DAPL_EVD_STATE_OPEN;

	/*
	 * We need to call cq_alloc even for connection/cr/async EVDs
	 * since all the allocation happens there.  Note that cq_len is an
	 * IN/OUT parameter; on return it holds the actual queue depth
	 * allocated, which is used below to size the event pool.
	 */
	dat_status = dapls_ib_cq_alloc(ia_ptr,
	    evd_ptr, cno_ptr, &cq_len);
	if (dat_status != DAT_SUCCESS) {
		goto bail;
	}

	dat_status = dapls_ib_setup_async_callback(
	    ia_ptr,
	    DAPL_ASYNC_CQ_COMPLETION,
	    (unsigned int *) evd_ptr->ib_cq_handle,
	    (ib_async_handler_t)dapl_evd_dto_callback,
	    evd_ptr);
	if (dat_status != DAT_SUCCESS) {
		goto bail;
	}
	/*
	 * cq_notify is not required here since the CQ is polled whenever
	 * evd_wait is called; notification is armed on demand in
	 * dapls_evd_copy_events instead.
	 * dat_status = dapls_set_cq_notify(ia_ptr, evd_ptr);
	 */

	/*
	 * We now have an accurate count of events, so allocate them into
	 * the EVD
	 */
	dat_status = dapli_evd_event_alloc(evd_ptr, cno_ptr, cq_len);
	if (dat_status != DAT_SUCCESS) {
		goto bail;
	}

	/* We're assuming success in the following.   */
	dapl_os_assert(dat_status == DAT_SUCCESS);
	dapl_ia_link_evd(ia_ptr, evd_ptr);
	*evd_ptr_ptr = evd_ptr;

bail:
	if (dat_status != DAT_SUCCESS) {
		if (evd_ptr) {
			(void) dapls_evd_dealloc(evd_ptr);
		}
	}

	return (dat_status);
}

/*
 * dapls_evd_alloc
 *
 * Alloc and initialize an EVD struct.
 *
 * Input:
 * 	ia
 *
 * Output:
 * 	none
 *
 * Returns:
 * 	pointer to the EVD, or NULL on failure
 *
 */
DAPL_EVD *
dapls_evd_alloc(
    IN DAPL_IA		*ia_ptr,
    IN DAPL_CNO		*cno_ptr,
    IN DAT_EVD_FLAGS	evd_flags,
    IN DAT_COUNT	qlen) /* ARGSUSED */
{
	DAPL_EVD	*evd_ptr;

	evd_ptr    = NULL;

	/* Allocate EVD */
	evd_ptr = (DAPL_EVD *)dapl_os_alloc(sizeof (DAPL_EVD));
	if (!evd_ptr) {
		goto bail;
	}

	/* zero the structure */
	(void) dapl_os_memzero(evd_ptr, sizeof (DAPL_EVD));

	/*
	 * initialize the header
	 */
	evd_ptr->header.provider		= ia_ptr->header.provider;
	evd_ptr->header.magic			= DAPL_MAGIC_EVD;
	evd_ptr->header.handle_type		= DAT_HANDLE_TYPE_EVD;
	evd_ptr->header.owner_ia		= ia_ptr;
	evd_ptr->header.user_context.as_64	= 0;
	evd_ptr->header.user_context.as_ptr	= NULL;
	dapl_llist_init_entry(&evd_ptr->header.ia_list_entry);
	dapl_os_lock_init(&evd_ptr->header.lock);

	/*
	 * Initialize the body
	 */
	evd_ptr->evd_state	= DAPL_EVD_STATE_INITIAL;
	evd_ptr->evd_flags	= evd_flags;
	evd_ptr->evd_enabled	= DAT_TRUE;
	evd_ptr->evd_waitable	= DAT_TRUE;
	evd_ptr->evd_producer_locking_needed = 1; /* Conservative value.  */
	evd_ptr->ib_cq_handle	= IB_INVALID_HANDLE;
	evd_ptr->evd_ref_count	= 0;
	evd_ptr->catastrophic_overflow = DAT_FALSE;
	evd_ptr->qlen		= qlen;

	dapl_llist_init_entry(&evd_ptr->cno_list_entry);
	evd_ptr->completion_type = DAPL_EVD_STATE_THRESHOLD;
	(void) dapl_os_wait_object_init(&evd_ptr->wait_object);

bail:
	return (evd_ptr);
}


/*
 * dapli_evd_event_alloc
 *
 * Alloc events into an EVD.
 *
 * Input:
 * 	evd_ptr
 *	qlen
 *
 * Output:
 * 	NONE
 *
 * Returns:
 * 	DAT_SUCCESS
 *	ERROR
 *
 */
DAT_RETURN
dapli_evd_event_alloc(
    IN DAPL_EVD		*evd_ptr,
    IN  DAPL_CNO	*cno_ptr,
    IN DAT_COUNT	qlen)
{
	DAT_EVENT	*event_ptr;
	DAT_COUNT	i;
	DAT_RETURN	dat_status;

	dat_status = DAT_SUCCESS;
	event_ptr  = NULL;

	/* Allocate EVENTs */
	event_ptr = (DAT_EVENT *) dapl_os_alloc(qlen * sizeof (DAT_EVENT));
	if (!event_ptr) {
		goto bail;
	}
	evd_ptr->events = event_ptr;
	evd_ptr->qlen = qlen;
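	/*
	 * The events array above backs both ring buffers below; each entry
	 * starts out on the free event queue and moves to the pending queue
	 * when an event is posted.
	 */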

	/* allocate free event queue */
	dat_status = dapls_rbuf_alloc(&evd_ptr->free_event_queue, qlen);
	if (dat_status != DAT_SUCCESS) {
		goto bail;
	}

	/* allocate pending event queue */
	dat_status = dapls_rbuf_alloc(&evd_ptr->pending_event_queue, qlen);
	if (dat_status != DAT_SUCCESS) {
		goto bail;
	}

	/* add events to free event queue */
	for (i = 0; i < qlen; i++) {
		dat_status = dapls_rbuf_add(&evd_ptr->free_event_queue,
		    (void *)event_ptr);
		dapl_os_assert(dat_status == DAT_SUCCESS);
		event_ptr++;
	}
	evd_ptr->cq_notified = DAT_FALSE;
	evd_ptr->cq_notified_when = 0;
	evd_ptr->cno_active_count = 0;
	if (cno_ptr != NULL) {
		dapl_os_lock(&cno_ptr->header.lock);
		dapl_llist_add_head(&cno_ptr->evd_list_head,
		    &evd_ptr->cno_list_entry, evd_ptr);
		/* Take a reference count on the CNO */
		dapl_os_atomic_inc(&cno_ptr->cno_ref_count);
		dapl_os_unlock(&cno_ptr->header.lock);
	}
	evd_ptr->cno_ptr = cno_ptr;
	evd_ptr->threshold = 0;

bail:
	return (dat_status);
}


/*
 * dapls_evd_dealloc
 *
 * Free the passed in EVD structure. If an error occurs, this function
 * will clean up all of the internal data structures and report the
 * error.
 *
 * Input:
 * 	evd_ptr
 *
 * Output:
 * 	none
 *
 * Returns:
 * 	status
 *
 */
DAT_RETURN
dapls_evd_dealloc(
    IN DAPL_EVD		*evd_ptr)
{
	DAT_RETURN	dat_status;
	DAPL_IA	*ia_ptr;

	dat_status = DAT_SUCCESS;

	dapl_os_assert(evd_ptr->header.magic == DAPL_MAGIC_EVD);
	dapl_os_assert(evd_ptr->evd_ref_count == 0);

	/*
	 * Destroy the CQ first, to keep any more callbacks from coming
	 * up from it.
	 */
	if (evd_ptr->ib_cq_handle != IB_INVALID_HANDLE) {
		ia_ptr = evd_ptr->header.owner_ia;

		dat_status = dapls_ib_cq_free(ia_ptr, evd_ptr);
		if (dat_status != DAT_SUCCESS) {
			goto bail;
		}
	}

	/*
	 * We should now be safe to invalidate the EVD; reset the
	 * magic to prevent reuse.
	 */
	evd_ptr->header.magic = DAPL_MAGIC_INVALID;

	/* Release reference on the CNO if it exists */
	if (evd_ptr->cno_ptr != NULL) {
		dapl_os_lock(&evd_ptr->cno_ptr->header.lock);
		(void) dapl_llist_remove_entry(&evd_ptr->cno_ptr->evd_list_head,
		    &evd_ptr->cno_list_entry);
		dapl_os_atomic_dec(&evd_ptr->cno_ptr->cno_ref_count);
		dapl_os_unlock(&evd_ptr->cno_ptr->header.lock);
	}

	/*
	 * If the ring buffer allocation failed, then the dapls_rbuf_destroy
	 * function will detect that the ring buffer's internal data (e.g. the
	 * base pointer) is invalid and will handle the situation appropriately.
	 */
	dapls_rbuf_destroy(&evd_ptr->free_event_queue);
	dapls_rbuf_destroy(&evd_ptr->pending_event_queue);

	if (evd_ptr->events) {
		dapl_os_free(evd_ptr->events,
		    evd_ptr->qlen * sizeof (DAT_EVENT));
	}

	(void) dapl_os_wait_object_destroy(&evd_ptr->wait_object);
	dapl_os_free(evd_ptr, sizeof (DAPL_EVD));

bail:
	return (dat_status);
}


/*
 * dapli_evd_eh_print_cqe
 *
 * Input:
 *	cqe
 *
 * Output:
 *	none
 *
 * Prints out a CQE for debug purposes
 *
 */

#ifdef	DAPL_DBG	/* For debugging.  */
void
dapli_evd_eh_print_cqe(
    IN 	ib_work_completion_t	cqe)
{
	static char *optable[] = {
		"",
		"OP_SEND",
		"OP_RDMA_READ",
		"OP_RDMA_WRITE",
		"OP_COMP_AND_SWAP",
		"OP_FETCH_AND_ADD",
		"OP_BIND_MW",
		"OP_RECEIVE",
		"OP_RECEIVE_RDMAWI",
		0
	};
	DAPL_COOKIE		*dto_cookie;

	dto_cookie = (DAPL_COOKIE *) (uintptr_t)DAPL_GET_CQE_WRID(&cqe);

	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
	    "\t >>>>>>>>>>>>>>>>>>>>>>><<<<<<<<<<<<<<<<<<<\n");
	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
	    "\t dapl_evd_dto_callback : CQE \n");
	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
	    "\t\t work_req_id 0x%llx\n", DAPL_GET_CQE_WRID(&cqe));
	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
	    "\t\t op_type: %s\n", optable[DAPL_GET_CQE_OPTYPE(&cqe)]);
	if ((DAPL_GET_CQE_OPTYPE(&cqe) == OP_SEND) ||
	    (DAPL_GET_CQE_OPTYPE(&cqe) == OP_RDMA_WRITE)) {
		dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
		    "\t\t bytes_num %d\n", dto_cookie->val.dto.size);
	} else {
		dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
		    "\t\t bytes_num %d\n", DAPL_GET_CQE_BYTESNUM(&cqe));
	}
	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
	    "\t\t status %d\n", DAPL_GET_CQE_STATUS(&cqe));
	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
	    "\t >>>>>>>>>>>>>>>>>>>>>>><<<<<<<<<<<<<<<<<<<\n");
}
#endif

/*
 * Event posting code follows.
 */

/*
 * These next two functions (dapli_evd_get_event and dapli_evd_post_event)
 * are a pair.  They are always called together, from one of the functions
 * at the end of this file (dapls_evd_post_*_event).
 *
 * Note that if producer side locking is enabled, the first one takes the
 * EVD lock and the second releases it.
 */
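
/*
 * For illustration, a sketch of the typical producer-side call pattern
 * (the real callers below go through dapli_evd_get_and_init_event):
 *
 *	event = dapli_evd_get_event(evd_ptr);	   may take the EVD lock
 *	if (event == NULL)
 *		dapli_evd_post_overflow_event(async_evd, evd_ptr);
 *	else {
 *		... fill in *event ...
 *		dapli_evd_post_event(evd_ptr, event);  releases the EVD lock
 *	}
 */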

/*
 * dapli_evd_get_event
 *
 * Get an event struct from the evd.  The caller should fill in the event
 * and call dapli_evd_post_event.
 *
 * If there are no events available, an overflow event is generated to the
 * async EVD handler.
 *
 * If this EVD requires producer locking, a successful return implies
 * that the lock is held.
 *
 * Input:
 * 	evd_ptr
 *
 * Output:
 *	event
 *
 */

static DAT_EVENT *
dapli_evd_get_event(
    DAPL_EVD *evd_ptr)
{
	DAT_EVENT	*event;

	if (evd_ptr->evd_producer_locking_needed) {
		dapl_os_lock(&evd_ptr->header.lock);
	}

	event = (DAT_EVENT *)dapls_rbuf_remove(&evd_ptr->free_event_queue);

	/* Release the lock if it was taken and the call failed.  */
	if (!event && evd_ptr->evd_producer_locking_needed) {
		dapl_os_unlock(&evd_ptr->header.lock);
	}

	return (event);
}

/*
 * dapli_evd_post_event
 *
 * Post the <event> onto the evd's pending queue.  If there is no waiter
 * and the evd is enabled, trigger the evd's CNO; if the evd is in the
 * waited state and this is a software event, wake the waiter.
 *
 * If producer side locking is required, the EVD lock must be held upon
 * entry to this function.
 *
 * Input:
 * 	evd_ptr
 * 	event
 *
 * Output:
 *	none
 *
 */

static void
dapli_evd_post_event(
    IN	DAPL_EVD	*evd_ptr,
    IN	const DAT_EVENT	*event_ptr)
{
	DAT_RETURN	dat_status;
	DAPL_CNO 	*cno_to_trigger = NULL;

	dapl_dbg_log(DAPL_DBG_TYPE_EVD,
	    "dapli_evd_post_event: Called with event # %x\n",
	    event_ptr->event_number);

	dat_status = dapls_rbuf_add(&evd_ptr->pending_event_queue,
	    (void *)event_ptr);
	dapl_os_assert(dat_status == DAT_SUCCESS);

	dapl_os_assert(evd_ptr->evd_state == DAPL_EVD_STATE_WAITED ||
	    evd_ptr->evd_state == DAPL_EVD_STATE_OPEN);

	if (evd_ptr->evd_state == DAPL_EVD_STATE_OPEN) {
		/* No waiter.  Arrange to trigger a CNO if it exists.  */

		if (evd_ptr->evd_enabled) {
			cno_to_trigger = evd_ptr->cno_ptr;
		}
		if (evd_ptr->evd_producer_locking_needed) {
			dapl_os_unlock(&evd_ptr->header.lock);
		}
	} else {
		/*
		 * This routine gets called
		 *  - In the context of the waiting thread when CQ, CM or ASYNC
		 *    events need to be put onto the EVD ring buffer.
		 *  - Due to a post of a software event.
		 *
		 * In the first case the waiting thread is pulling the events
		 * from various streams into the evd, so there is no need to
		 * wake any thread.  In the second case, if the evd is in the
		 * waited state, we need to wake up the waiting thread.
		 */
		if (event_ptr->event_number == DAT_SOFTWARE_EVENT) {
			/*
			 * We're in DAPL_EVD_STATE_WAITED.  Take the lock if
			 * we don't have it, recheck, and signal.
			 */

			if (!evd_ptr->evd_producer_locking_needed) {
				dapl_os_lock(&evd_ptr->header.lock);
			}

			if (evd_ptr->evd_state == DAPL_EVD_STATE_WAITED) {
				dapl_os_unlock(&evd_ptr->header.lock);
				(void) dapls_ib_event_wakeup(evd_ptr);
			} else {
				dapl_os_unlock(&evd_ptr->header.lock);
			}
		} else {
			if (evd_ptr->evd_producer_locking_needed) {
				dapl_os_unlock(&evd_ptr->header.lock);
			}
		}
	}

	if (cno_to_trigger != NULL) {
		dapl_cno_trigger(cno_to_trigger, evd_ptr);
	}
}

/*
 * dapli_evd_post_event_nosignal
 *
 * Post the <event> to the evd.  Do not do any wakeup processing.
 * This function should only be called if it is known that there are
 * no waiters that it is appropriate to wake up on this EVD.  An example
 * of such a situation is during internal dat_evd_wait() processing.
 *
 * If producer side locking is required, the EVD lock must be held upon
 * entry to this function.
 *
 * Input:
 * 	evd_ptr
 * 	event
 *
 * Output:
 *	none
 *
 */

static void
dapli_evd_post_event_nosignal(
    IN	DAPL_EVD	*evd_ptr,
    IN	const DAT_EVENT	*event_ptr)
{
	DAT_RETURN	dat_status;

	dapl_dbg_log(DAPL_DBG_TYPE_EVD,
	    "dapli_evd_post_event_nosignal: Called with event # %x\n",
	    event_ptr->event_number);

	dat_status = dapls_rbuf_add(&evd_ptr->pending_event_queue,
	    (void *)event_ptr);
	dapl_os_assert(dat_status == DAT_SUCCESS);

	dapl_os_assert(evd_ptr->evd_state == DAPL_EVD_STATE_WAITED ||
	    evd_ptr->evd_state == DAPL_EVD_STATE_OPEN);

	if (evd_ptr->evd_producer_locking_needed) {
		dapl_os_unlock(&evd_ptr->header.lock);
	}
}

/*
 * dapli_evd_format_overflow_event
 *
 * format an overflow event for posting
 *
 * Input:
 * 	evd_ptr
 * 	event_ptr
 *
 * Output:
 *	none
 *
 */
static void
dapli_evd_format_overflow_event(
	IN  DAPL_EVD  *evd_ptr,
	OUT DAT_EVENT *event_ptr)
{
	DAPL_IA *ia_ptr;

	ia_ptr = evd_ptr->header.owner_ia;

	event_ptr->evd_handle   = (DAT_EVD_HANDLE)evd_ptr;
	event_ptr->event_number = DAT_ASYNC_ERROR_EVD_OVERFLOW;
	event_ptr->event_data.asynch_error_event_data.dat_handle =
	    (DAT_HANDLE)ia_ptr;
}

/*
 * dapli_evd_post_overflow_event
 *
 * post an overflow event
 *
 * Input:
 * 	async_evd_ptr
 * 	evd_ptr
 *
 * Output:
 *	none
 *
 */
static void
dapli_evd_post_overflow_event(
    IN  DAPL_EVD  *async_evd_ptr,
    IN  DAPL_EVD  *overflow_evd_ptr)
{
	DAT_EVENT *overflow_event;

	/*
	 * The overflow_evd_ptr might be the same as the async EVD.
	 * In that case we've got a catastrophic overflow.
	 */
	if (async_evd_ptr == overflow_evd_ptr) {
		async_evd_ptr->catastrophic_overflow = DAT_TRUE;
		async_evd_ptr->evd_state = DAPL_EVD_STATE_DEAD;
		return;
	}

	overflow_event = dapli_evd_get_event(overflow_evd_ptr);
	if (!overflow_event) {
		/* this is not good */
		overflow_evd_ptr->catastrophic_overflow = DAT_TRUE;
		overflow_evd_ptr->evd_state = DAPL_EVD_STATE_DEAD;
		return;
	}
	dapli_evd_format_overflow_event(overflow_evd_ptr, overflow_event);
	dapli_evd_post_event(overflow_evd_ptr, overflow_event);
}

static DAT_EVENT *
dapli_evd_get_and_init_event(
    IN DAPL_EVD				*evd_ptr,
    IN DAT_EVENT_NUMBER			event_number)
{
	DAT_EVENT 		*event_ptr;

	event_ptr = dapli_evd_get_event(evd_ptr);
	if (NULL == event_ptr) {
		dapli_evd_post_overflow_event(
		    evd_ptr->header.owner_ia->async_error_evd, evd_ptr);
	} else {
		event_ptr->evd_handle = (DAT_EVD_HANDLE) evd_ptr;
		event_ptr->event_number = event_number;
	}

	return (event_ptr);
}

DAT_RETURN
dapls_evd_post_cr_arrival_event(
    IN DAPL_EVD				*evd_ptr,
    IN DAT_EVENT_NUMBER			event_number,
    IN DAT_SP_HANDLE			sp_handle,
    DAT_IA_ADDRESS_PTR			ia_address_ptr,
    DAT_CONN_QUAL			conn_qual,
    DAT_CR_HANDLE			cr_handle)
{
	DAT_EVENT 		*event_ptr;
	event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number);
	/*
	 * Note the event lock may be held on successful return,
	 * to be released by dapli_evd_post_event(), if producer side locking
	 * is needed.
	 */

	if (!event_ptr) {
		return (DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY);
	}

	event_ptr->event_data.cr_arrival_event_data.sp_handle = sp_handle;
	event_ptr->event_data.cr_arrival_event_data.local_ia_address_ptr
	    = ia_address_ptr;
	event_ptr->event_data.cr_arrival_event_data.conn_qual = conn_qual;
	event_ptr->event_data.cr_arrival_event_data.cr_handle = cr_handle;

	dapli_evd_post_event(evd_ptr, event_ptr);
	return (DAT_SUCCESS);
}


DAT_RETURN
dapls_evd_post_connection_event(
    IN DAPL_EVD				*evd_ptr,
    IN DAT_EVENT_NUMBER			event_number,
    IN DAT_EP_HANDLE			ep_handle,
    IN DAT_COUNT			private_data_size,
    IN DAT_PVOID			private_data)
{
	DAT_EVENT 		*event_ptr;
	event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number);
	/*
	 * Note the event lock may be held on successful return,
	 * to be released by dapli_evd_post_event(), if producer side locking
	 * is needed.
	 */

	if (!event_ptr) {
		return (DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY);
	}

	event_ptr->event_data.connect_event_data.ep_handle = ep_handle;
	event_ptr->event_data.connect_event_data.private_data_size
	    = private_data_size;
	event_ptr->event_data.connect_event_data.private_data = private_data;

	dapli_evd_post_event(evd_ptr, event_ptr);
	return (DAT_SUCCESS);
}


DAT_RETURN
dapls_evd_post_async_error_event(
    IN DAPL_EVD				*evd_ptr,
    IN DAT_EVENT_NUMBER			event_number,
    IN DAT_IA_HANDLE			ia_handle)
{
	DAT_EVENT 		*event_ptr;
	event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number);
	/*
	 * Note the event lock may be held on successful return,
	 * to be released by dapli_evd_post_event(), if producer side locking
	 * is needed.
	 */

	if (!event_ptr) {
		return (DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY);
	}

	event_ptr->event_data.asynch_error_event_data.dat_handle = ia_handle;

	dapli_evd_post_event(evd_ptr, event_ptr);
	return (DAT_SUCCESS);
}


DAT_RETURN
dapls_evd_post_software_event(
    IN DAPL_EVD				*evd_ptr,
    IN DAT_EVENT_NUMBER			event_number,
    IN DAT_PVOID			pointer)
{
	DAT_EVENT 		*event_ptr;
	event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number);
	/*
	 * Note the event lock may be held on successful return,
	 * to be released by dapli_evd_post_event(), if producer side locking
	 * is needed.
	 */

	if (!event_ptr) {
		return (DAT_QUEUE_FULL);
	}

	event_ptr->event_data.software_event_data.pointer = pointer;

	dapli_evd_post_event(evd_ptr, event_ptr);
	return (DAT_SUCCESS);
}

void
dapls_evd_post_premature_events(IN DAPL_EP *ep_ptr)
{
	DAPL_EVD		*evd_ptr;
	DAT_EVENT		*event;
	ib_work_completion_t	*cqe;
	uint32_t		qpn;
	int			prm_idx;
	int			nevents;
	int			i;

	dapls_ib_poll_premature_events(ep_ptr, &cqe, &nevents);
	/* premature events are always recv events */
	evd_ptr = ep_ptr->param.recv_evd_handle;
	qpn = ep_ptr->qpn;

	i = 0;
	prm_idx = 0;
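	/*
	 * i counts the premature events consumed for this QP; prm_idx walks
	 * the premature event list, which may be sparse when an SRQ is
	 * attached.
	 */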
	while (i < nevents) {
		/*
		 * If srq_attached, premature events cannot exceed max_recv_dtos
		 */
		dapl_os_assert(!ep_ptr->srq_attached ||
		    (prm_idx <= ((DAPL_SRQ *)ep_ptr->param.srq_handle)->
		    param.max_recv_dtos));

		/*
		 * The SRQ premature event list could potentially have
		 * holes (i.e. free entries in the middle) or premature
		 * events for other QPs. These need to be skipped.
		 */
		if (ep_ptr->srq_attached &&
		    (!DAPL_CQE_IS_VALID(&cqe[prm_idx]) ||
		    (DAPL_GET_CQE_QPN(&cqe[prm_idx]) != qpn))) {
			prm_idx++;
			continue;
		}

		dapl_dbg_log(DAPL_DBG_TYPE_DTO_COMP_ERR,
		    " Premature DTO processing\n");

#ifdef	DAPL_DBG	/* For debugging.  */
		dapli_evd_eh_print_cqe(cqe[i]);
#endif
		/*
		 * Can use DAT_DTO_COMPLETION_EVENT because
		 * dapli_evd_cqe_to_event will overwrite.
		 */
		event = dapli_evd_get_and_init_event(evd_ptr,
		    DAT_DTO_COMPLETION_EVENT);
		if (event == NULL) {
			/* We've already attempted the overflow post, return */
			return;
		}
		(void) dapli_evd_cqe_to_event(evd_ptr, &cqe[i], DAT_TRUE,
		    event);
		dapli_evd_post_event_nosignal(evd_ptr, event);
		/*
		 * For SRQ attached QPs recycle the premature event
		 */
		if (ep_ptr->srq_attached) {
			dapls_ib_free_premature_events(ep_ptr, prm_idx);
			prm_idx++;
		}
		i++;
	}
}

/*
 * dapli_evd_cqe_to_event
 *
 * Convert a CQE into an event structure.
 *
 * Input:
 *	evd_ptr
 * 	cqe_ptr
 *
 * Output:
 * 	event_ptr
 *
 * Returns:
 * 	DAT_TRUE if an event was generated, DAT_FALSE if the CQE was
 * 	deferred as a premature event
 *
 */
static DAT_BOOLEAN
dapli_evd_cqe_to_event(
    IN DAPL_EVD			*evd_ptr,
    IN ib_work_completion_t	*cqe_ptr,
    IN DAT_BOOLEAN		process_premature_events,
    OUT DAT_EVENT		*event_ptr)
{
	DAPL_EP			*ep_ptr;
	DAPL_SRQ		*srq_ptr;
	DAPL_COOKIE		*cookie;
	DAT_EP_STATE		ep_state;
	ib_qp_handle_t		qp;
	ib_uint32_t		ib_status;
	ib_uint32_t		ibtype;
	int			srq_enabled;
	int			dto_error = 0;


	/*
	 * All that can be relied on if the status is bad is the status
	 * and WRID.
	 */
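	/*
	 * The WRID carries the DAPL_COOKIE pointer that was stashed with
	 * the work request when it was posted.
	 */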
	ib_status = DAPL_GET_CQE_STATUS(cqe_ptr);

	cookie = (DAPL_COOKIE *)((uintptr_t)DAPL_GET_CQE_WRID(cqe_ptr));
	dapl_os_assert((NULL != cookie));

	if (cookie->queue_type == DAPL_COOKIE_QUEUE_EP) {
		srq_enabled = 0;
		ep_ptr = cookie->queue.ep;
	} else {
		srq_enabled = 1;
		srq_ptr = cookie->queue.srq;
		dapl_os_assert(NULL != srq_ptr);
		dapl_os_assert(srq_ptr->header.magic == DAPL_MAGIC_SRQ);
		ib_status = DAPL_GET_CQE_STATUS(cqe_ptr);
		ep_ptr = dapls_ib_srq_lookup_ep(srq_ptr, cqe_ptr);
	}

	dapl_os_assert((NULL != ep_ptr));
	dapl_os_assert((ep_ptr->header.magic == DAPL_MAGIC_EP) ||
	    (ep_ptr->header.magic == DAPL_MAGIC_EP_EXIT));

	event_ptr->evd_handle = (DAT_EVD_HANDLE) evd_ptr;

	/*
	 * Check if the DTO completion arrived before the
	 * CONNECTION_ESTABLISHED event -
	 *
	 * Send DTOs can occur only if the ep state is CONNECTED/DISCONNECTED,
	 * therefore they cannot complete before the connection established
	 * event.  A receive DTO can potentially complete before the
	 * connection established event has been delivered to the client.
	 * In this case, if the ep state is ACTIVE_CONNECTION_PENDING (active
	 * side) or COMPLETION_PENDING (passive side), or if earlier premature
	 * events are still queued on the QP, the completion is put in a
	 * special event queue in the qp_handle.
	 */
	if (!process_premature_events &&
	    (cookie->type == DAPL_COOKIE_TYPE_DTO) &&
	    (ib_status == IB_COMP_ST_SUCCESS)) {
		ep_state = ep_ptr->param.ep_state;
		qp = ep_ptr->qp_handle;
		if ((ep_state == DAT_EP_STATE_ACTIVE_CONNECTION_PENDING) ||
		    (ep_state == DAT_EP_STATE_COMPLETION_PENDING) ||
		    (qp->qp_num_premature_events > 0)) {
			/*
			 * not yet ready to put the event in the evd ring
			 * buffer
			 */
			dapls_ib_store_premature_events(qp, cqe_ptr);
			return (DAT_FALSE);
		}
	}

	switch (cookie->type) {
	case DAPL_COOKIE_TYPE_DTO:
	{
		DAPL_COOKIE_BUFFER	*buffer;

		if (DAPL_DTO_TYPE_RECV == cookie->val.dto.type) {
			if (srq_enabled) {
				dapl_os_atomic_dec(&srq_ptr->recv_count);
				buffer = &srq_ptr->recv_buffer;
			} else {
				dapl_os_atomic_dec(&ep_ptr->recv_count);
				buffer = &ep_ptr->recv_buffer;
			}
		} else {
			dapl_os_atomic_dec(&ep_ptr->req_count);
			buffer = &ep_ptr->req_buffer;
		}

		event_ptr->event_number = DAT_DTO_COMPLETION_EVENT;
		event_ptr->event_data.dto_completion_event_data.ep_handle =
		    ep_ptr;
		event_ptr->event_data.dto_completion_event_data.user_cookie =
		    cookie->val.dto.cookie;

		switch (ib_status) {
		case IB_COMP_ST_SUCCESS:
		{
			ibtype = DAPL_GET_CQE_OPTYPE(cqe_ptr);

			event_ptr->event_data.dto_completion_event_data.status =
			    DAT_DTO_SUCCESS;
			dapl_os_assert((ibtype == OP_SEND &&
			    cookie->val.dto.type == DAPL_DTO_TYPE_SEND) ||
			    (ibtype == OP_RECEIVE &&
			    cookie->val.dto.type == DAPL_DTO_TYPE_RECV) ||
			    (ibtype == OP_RDMA_WRITE &&
			    cookie->val.dto.type ==
			    DAPL_DTO_TYPE_RDMA_WRITE) ||
			    (ibtype == OP_RDMA_READ &&
			    cookie->val.dto.type ==
			    DAPL_DTO_TYPE_RDMA_READ));
			break;
		}
		case IB_COMP_ST_LOCAL_LEN_ERR:
		{
			event_ptr->event_data.dto_completion_event_data.status =
			    DAT_DTO_ERR_LOCAL_LENGTH;
			break;
		}
		case IB_COMP_ST_LOCAL_PROTECT_ERR:
		{
			event_ptr->event_data.dto_completion_event_data.status =
			    DAT_DTO_ERR_LOCAL_PROTECTION;
			break;
		}
		case IB_COMP_ST_WR_FLUSHED_ERR:
		{
			event_ptr->event_data.dto_completion_event_data.status =
			    DAT_DTO_ERR_FLUSHED;
			break;
		}
		case IB_COMP_ST_BAD_RESPONSE_ERR:
		{
			event_ptr->event_data.dto_completion_event_data.status =
			    DAT_DTO_ERR_BAD_RESPONSE;
			break;
		}
		case IB_COMP_ST_REM_REQ_ERR:
		case IB_COMP_ST_REM_OP_ERR:
		{
			event_ptr->event_data.dto_completion_event_data.status =
			    DAT_DTO_ERR_REMOTE_RESPONDER;
			break;
		}
		case IB_COMP_ST_REM_ACC_ERR:
		{
			event_ptr->event_data.dto_completion_event_data.status =
			    DAT_DTO_ERR_REMOTE_ACCESS;
			break;
		}
		/*
		 * Unsupported RD errors
		 * case IB_COMP_ST_EE_STATE_ERR:
		 * case IB_COMP_ST_EE_CTX_NO_ERR:
		 */
		case IB_COMP_ST_TRANSP_COUNTER:
		{
			event_ptr->event_data.dto_completion_event_data.status =
			    DAT_DTO_ERR_TRANSPORT;
			break;
		}
		case IB_COMP_ST_RNR_COUNTER:
		{
			event_ptr->event_data.dto_completion_event_data.status =
			    DAT_DTO_ERR_RECEIVER_NOT_READY;
			break;
		}
		case IB_COMP_ST_MW_BIND_ERR:
		{
			event_ptr->event_data.dto_completion_event_data.status =
			    DAT_RMR_OPERATION_FAILED;
			break;
		}
		case IB_COMP_ST_LOCAL_OP_ERR:
		{
			event_ptr->event_data.dto_completion_event_data.status =
			    DAT_DTO_ERR_LOCAL_EP;
			break;
		}
		default:
		{
			dapl_dbg_log(DAPL_DBG_TYPE_DTO_COMP_ERR,
			    " DTO completion ERROR: %d: op %#x\n",
			    DAPL_GET_CQE_STATUS(cqe_ptr),
			    DAPL_GET_CQE_OPTYPE(cqe_ptr));
			event_ptr->event_data.dto_completion_event_data.status =
			    DAT_DTO_FAILURE;
			break;
		}
		}

		/* Most error DTO ops result in disconnecting the EP */
		if ((event_ptr->event_data.dto_completion_event_data.status !=
		    DAT_DTO_SUCCESS) &&
		    (event_ptr->event_data.dto_completion_event_data.status !=
		    DAT_RMR_OPERATION_FAILED)) {
			dto_error = 1;
			dapl_dbg_log(DAPL_DBG_TYPE_DTO_COMP_ERR,
			    " DTO completion ERROR: %d: op %#x\n",
			    DAPL_GET_CQE_STATUS(cqe_ptr),
			    DAPL_GET_CQE_OPTYPE(cqe_ptr));
		}

		if (cookie->val.dto.type == DAPL_DTO_TYPE_SEND ||
		    cookie->val.dto.type == DAPL_DTO_TYPE_RDMA_WRITE) {
			/* Get size from DTO; CQE value may be off.  */
			event_ptr->event_data.dto_completion_event_data.
			    transfered_length = cookie->val.dto.size;
		} else {
			event_ptr->event_data.dto_completion_event_data.
			    transfered_length = DAPL_GET_CQE_BYTESNUM(cqe_ptr);
		}

		dapls_cookie_dealloc(buffer, cookie);
		break;
	}

	case DAPL_COOKIE_TYPE_RMR:
	{
		dapl_os_atomic_dec(&ep_ptr->req_count);

		event_ptr->event_number = DAT_RMR_BIND_COMPLETION_EVENT;

		event_ptr->event_data.rmr_completion_event_data.rmr_handle =
		    cookie->val.rmr.rmr;
		event_ptr->event_data.rmr_completion_event_data.user_cookie =
		    cookie->val.rmr.cookie;
		if (ib_status == IB_COMP_ST_SUCCESS) {
			ibtype = DAPL_GET_CQE_OPTYPE(cqe_ptr);

			event_ptr->event_data.rmr_completion_event_data.status =
			    DAT_RMR_BIND_SUCCESS;
			dapl_os_assert(ibtype == OP_BIND_MW);
		} else {
			event_ptr->event_data.rmr_completion_event_data.status =
			    DAT_RMR_BIND_FAILURE;
			dto_error = 1;
		}

		dapls_cookie_dealloc(&ep_ptr->req_buffer, cookie);
		break;
	}
	default:
	{
		dapl_os_assert(!"Invalid Operation type");
		break;
	}
	}

	/*
	 * A DTO failed; this will cause the connection to be broken.
	 */
	if ((dto_error) && (ep_ptr->param.ep_state == DAT_EP_STATE_CONNECTED)) {
		ep_ptr->param.ep_state = DAT_EP_STATE_DISCONNECTED;
		/*
		 * Disconnect at the IB level.
		 */
		dapls_ib_disconnect_clean(ep_ptr, DAT_TRUE, IB_CME_CONNECTED);
	}
	/* convert a premature recv to a flush error on disconnect */
	if (process_premature_events && (ep_ptr->param.ep_state ==
	    DAT_EP_STATE_DISCONNECTED) && (ib_status == IB_COMP_ST_SUCCESS)) {
		dapl_os_assert(ibtype == OP_RECEIVE &&
		    cookie->val.dto.type == DAPL_DTO_TYPE_RECV);
		event_ptr->event_data.dto_completion_event_data.status =
		    DAT_DTO_ERR_FLUSHED;
	}
	return (DAT_TRUE);
}

/*
 * dapls_evd_copy_cq
 *
 * Copy all entries on the CQ associated with the EVD onto that EVD.
 * It is up to the caller to handle races, if any.  Note that no EVD
 * waiters will be awoken by this copy.
 *
 * Input:
 *	evd_ptr
 *
 * Output:
 * 	nevents
 *
 * Returns:
 * 	none
 *
 */
void
dapls_evd_copy_cq(
	DAPL_EVD	*evd_ptr,
	int		*nevents)
{
	ib_work_completion_t	cqe[MAX_CQES_PER_POLL];
	DAT_RETURN		dat_status;
	ib_cq_handle_t		cq_handle;
	DAT_EVENT		*event;
	uint_t			num_cqes_polled = 0;
	int			cqe_events;
	int			i;

	cq_handle = evd_ptr->ib_cq_handle;

	*nevents = 0;

	if (cq_handle == IB_INVALID_HANDLE) {
		/* Nothing to do if no CQ.  */
		return;
	}
	dat_status = DAPL_POLL(evd_ptr)(cq_handle,
	    cqe, MAX_CQES_PER_POLL, &num_cqes_polled);

	if (dat_status == DAT_SUCCESS) {
		dapl_dbg_log(DAPL_DBG_TYPE_EVD, "dapls_evd_copy_cq: %u\n",
		    num_cqes_polled);
		cqe_events = 0;
		for (i = 0; i < num_cqes_polled; i++) {
#ifdef	DAPL_DBG	/* For debugging.  */
			dapli_evd_eh_print_cqe(cqe[i]);
#endif

			/*
			 * Can use DAT_DTO_COMPLETION_EVENT because
			 * dapli_evd_cqe_to_event will overwrite.
			 */

			event = dapli_evd_get_and_init_event(
			    evd_ptr, DAT_DTO_COMPLETION_EVENT);
			if (event == NULL) {
			/*
			 * We've already attempted the overflow post; return.
			 */
				return;
			}
			if (dapli_evd_cqe_to_event(evd_ptr, &cqe[i], DAT_FALSE,
			    event)) {
				dapli_evd_post_event_nosignal(evd_ptr, event);
				cqe_events++;
			} else {
				dapl_dbg_log(DAPL_DBG_TYPE_EVD,
				    "dapls_evd_copy_cq: premature event\n");
				/*
				 * We've deferred processing the CQE, so add
				 * the event_ptr back to free queue
				 */
				dat_status = dapls_rbuf_add(&evd_ptr->
				    free_event_queue, (void *)event);
				dapl_os_assert(dat_status == DAT_SUCCESS);
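				/*
				 * dapli_evd_get_and_init_event may have left
				 * the EVD lock held (producer side locking);
				 * release it here since the event was not
				 * posted.
				 */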
				if (evd_ptr->evd_producer_locking_needed) {
					dapl_os_unlock(&evd_ptr->header.lock);
				}
			}
		}
		*nevents = cqe_events;
	} else if (DAT_GET_TYPE(dat_status) != DAT_QUEUE_EMPTY) {
		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
		    "dapls_evd_copy_cq: dapls_ib_completion_poll "
		    "returned 0x%x\n", dat_status);
		dapl_os_assert(!"Bad return from dapls_ib_completion_poll");
	}
}

/*
 * dapls_evd_copy_events
 *
 * Copy all events associated with the EVD onto that EVD
 *
 * Input:
 *	evd_ptr
 *	timeout
 *
 * Output:
 * 	none
 *
 * Returns:
 * 	DAT_SUCCESS or error status
 *
 */
DAT_RETURN
dapls_evd_copy_events(
    DAPL_EVD 	*evd_ptr,
    DAT_TIMEOUT timeout)
{
	dapl_ib_event_t	evp_arr[NUM_EVENTS_PER_POLL];
	dapl_ib_event_t	*evpp_start;
	dapl_ib_event_t	*evpp;
	DAPL_IA		*ia_ptr;
	DAT_RETURN	dat_status;
	int		waited;
	uint64_t	curr_time;
	uint64_t	final_time;
	uint64_t	time_left;
	int		events_needed = 0;
	int		nevents = 0;
	int		num_cqe = 0;
	int		num_ke = 0; /* kernel events - CM or ASYNC events */
	int		i;

	/* rbuf count is zero on entry */

	if (evd_ptr->evd_flags & (DAT_EVD_CONNECTION_FLAG |
	    DAT_EVD_CR_FLAG | DAT_EVD_ASYNC_FLAG)) {
		if (evd_ptr->threshold <= NUM_EVENTS_PER_POLL) {
			evpp = evp_arr;
		} else {
			/* need to allocate on the heap */
			evpp = (dapl_ib_event_t *)dapl_os_alloc(
			    evd_ptr->threshold * sizeof (dapl_ib_event_t));
			if (evpp == NULL) {
				return (DAT_INSUFFICIENT_RESOURCES);
			}
		}
		evpp_start = evpp;
		/* for evd_dequeue, check for ke before returning Q_EMPTY */
		if (evd_ptr->threshold == 0 && timeout == 0)
			evd_ptr->threshold = 1;
	} else {
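		/*
		 * No kernel (CM/async) event stream on this EVD; any event
		 * poll below is used only as a timed wait for CQ
		 * notification.
		 */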
		evpp = NULL;
		evpp_start = NULL;
	}
	ia_ptr = evd_ptr->header.owner_ia;
	waited = 0;
	dat_status = DAT_SUCCESS;

	/* calculate various time wait elements */
	if (timeout == 0) {
		final_time = 0;
		time_left = 0;
	} else if (timeout == DAT_TIMEOUT_INFINITE) {
		/*
		 * The real value of DAT_TIMEOUT_INFINITE is fairly small
		 * (~71 minutes); to prevent premature timeouts, map it to
		 * 1 year.  NOTE: 64-bit integers are needed here
		 * because 32 bits is not enough.  Other types,
		 * such as clock_t are not 64-bit, so are not
		 * sufficient for this.  Similarly, hrtime_t is
		 * defined as a "nanosecond counter", which does not
		 * match our need for time in microseconds, so we
		 * just use the more general uint64_t here.
		 */
#define	DAPL_ONE_YEAR_IN_USEC	((365 * 24 * 3600) * 1000000LL)
		curr_time = gethrtime();
		time_left = DAPL_ONE_YEAR_IN_USEC;
		final_time = curr_time + DAPL_ONE_YEAR_IN_USEC * 1000;
	} else {
		/*
		 * final_time is the maximum time by which the routine needs
		 * to return.  DAT_TIMEOUT_INFINITE is defined as ~0, but the
		 * timeout is of type int, so mask off the MSB to avoid
		 * overflow.
		 */
		curr_time = gethrtime();
		final_time = curr_time + (uint64_t)(timeout&0x7fffffff)*1000;
		time_left = (final_time - curr_time)/1000;
	}
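
	/*
	 * Note: curr_time and final_time are in nanoseconds (gethrtime),
	 * while time_left is maintained in microseconds for
	 * dapls_ib_event_poll.
	 */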

	do {
		/*
		 * If this evd has a CQ event stream check the CQs first
		 */
		if (evd_ptr->evd_flags & (DAT_EVD_DTO_FLAG |
		    DAT_EVD_RMR_BIND_FLAG)) {
			/*
			 * Poll CQ for events, update the total number of CQEs
			 * so far
			 */
			nevents = 0;
			dapls_evd_copy_cq(evd_ptr, &nevents);
			num_cqe += nevents;
			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
			    "dapls_evd_copy_event: copy_cq num_cqe(%d)\n",
			    num_cqe);
		}

		/*
		 * We use the dapls_rbuf_count since it includes
		 *  - CQ events pulled by dapls_evd_copy_cq
		 *  - events added by dat_evd_post_se()
		 */
		events_needed = evd_ptr->threshold - num_ke -
		    dapls_rbuf_count(&evd_ptr->pending_event_queue);

		/*
		 * check for pending events
		 * note: threshold=0 implies dapl_evd_dequeue
		 */
		if (events_needed < 0) {
			/* There are more than sufficient events */
			break;
		} else if (events_needed == 0) {
			/*
			 * Report queue empty on dat_evd_dequeue; non-CQ
			 * events are expected to be polled by dat_evd_wait.
			 */
			if (evd_ptr->threshold == 0)
				dat_status =  DAT_ERROR(DAT_QUEUE_EMPTY, 0);
			/*
			 * when threshold > 0, we have sufficient events
			 */
			break;
		} else {
			/*
			 * When we reach here, this implies dat_evd_wait;
			 * return on any DTO completion, since a
			 * threshold > 1 is taken as a hint only.
			 */
			if (num_cqe)
				break;
		}

		/* check we've already waited */
		if (waited > 0) {
			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
			    "dapls_evd_copy_event: waited[%d]\n", waited);
			if (dat_status != DAT_SUCCESS)
				break;
			curr_time = gethrtime();
			/* exit on time expired */
			if (curr_time >= final_time)
				break;
			time_left = (final_time - curr_time)/1000;
		}

		/* check for DTO type evd's */
		if (evd_ptr->evd_flags & (DAT_EVD_DTO_FLAG |
		    DAT_EVD_RMR_BIND_FLAG)) {
			if (events_needed == 1) {
				/*
				 * Need only one event so enable cq
				 * notification
				 */
				/*
				 * XXX: Things need to be modified here to
				 * implement the NOTIFICATION suppression
				 * correctly - relies on THRESHOLD flag
				 * and UNSIGNALLED flag to be stored
				 * in the evd.
				 */
				dat_status = dapls_set_cq_notify(ia_ptr,
				    evd_ptr);
				if (dat_status != DAT_SUCCESS) {
					dapl_dbg_log(DAPL_DBG_TYPE_EVD,
					    "dapls_evd_copy_event:"
					    " set_cq_notify(%d)\n", dat_status);
					return (dat_status);
				}
			} else if (events_needed > 1) {
				/*
				 * We need multiple events, so enable the CQ
				 * for notification on N events.
				 * dat_status = dapls_set_cqN_notify(ia_ptr,
				 * evd_ptr, (uint32_t)events_needed);
				 */
				dat_status = dapls_set_cq_notify(ia_ptr,
				    evd_ptr);
				if (dat_status != DAT_SUCCESS) {
					dapl_dbg_log(DAPL_DBG_TYPE_EVD,
					    "dapls_evd_copy_event:"
					    " set_cqN_notify:%d\n", dat_status);
					return (dat_status);
				}
			}

			/*
			 * Per the Tavor PRM, if completions occur after
			 * polling the CQ and before arming it, the CQ handler
			 * is fired immediately upon arming.  Hence the PRM
			 * recommends that a re-poll of the CQ can be skipped
			 * as an optimization.
			 */
		}

		nevents = 0;

		/*
		 * A non-NULL evpp_start denotes that at least one of
		 * DAT_EVD_CONNECTION_FLAG, DAT_EVD_CR_FLAG or
		 * DAT_EVD_ASYNC_FLAG is set, and thus events from the kernel
		 * need to be checked.
		 */
		if (evpp_start) {
			/*
			 * Even if dat_status is not DAT_SUCCESS, num_events
			 * could be non-zero.
			 */
			dat_status = dapls_ib_event_poll(evd_ptr, time_left,
			    (evd_ptr->threshold - (num_cqe + num_ke)), evpp,
			    &nevents);
			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
			    "dapls_evd_copy_event: poll returned 0x%x(%d)\n",
			    dat_status, nevents);

			num_ke += nevents;
			evpp += nevents;
		} else {
			/* perform a timewait */
			dat_status = dapls_ib_event_poll(evd_ptr, time_left,
			    0, NULL, &nevents);
			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
			    "dapls_evd_copy_event: poll(cq_notification) "
			    "returned 0x%x\n", dat_status);
			if (DAT_GET_TYPE(dat_status) == DAT_INTERRUPTED_CALL)
				return (dat_status);
		}

		waited++;
	} while (dapls_rbuf_count(&evd_ptr->pending_event_queue) + num_ke <
	    evd_ptr->threshold);

	/* process the cm events now */
	for (i = 0; i < num_ke; i++) {
		switch (evpp_start[i].ibe_ev_family) {
		case DAPL_CR_EVENTS: /* PASSIVE side events */
		case DAPL_PASSIVE_CONNECTION_EVENTS:
			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
			    "dapls_evd_copy_event: Passive side Event %d\n",
			    evpp_start[i].ibe_ce.ibce_event);
			dapls_cr_callback((ib_cm_handle_t)
			    evpp_start[i].ibe_ce.ibce_psep_cookie,
			    evpp_start[i].ibe_ce.ibce_event,
			    evpp_start[i].ibe_ce.ibce_priv_data_ptr, (void *)
			    (uintptr_t)evpp_start[i].ibe_ce.ibce_cookie);
			break;
		case DAPL_ACTIVE_CONNECTION_EVENTS: /* ACTIVE side events */
			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
			    "dapls_evd_copy_event: Active Conn Event %d\n",
			    evpp_start[i].ibe_ce.ibce_event);
			dapl_evd_connection_callback((ib_cm_handle_t)
			    IB_INVALID_HANDLE,
			    evpp_start[i].ibe_ce.ibce_event,
			    evpp_start[i].ibe_ce.ibce_priv_data_ptr, (void *)
			    (uintptr_t)evpp_start[i].ibe_ce.ibce_cookie);
			break;
		case DAPL_ASYNC_EVENTS:
			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
			    "dapls_evd_copy_event: Async Event %d\n",
			    evpp_start[i].ibe_async.ibae_type);
			dapls_ib_async_callback(evd_ptr,
			    ia_ptr->hca_ptr->ib_hca_handle,
			    &(evpp_start[i].ibe_async), ia_ptr);
			break;
		default:
			dapl_dbg_log(DAPL_DBG_TYPE_ERR,
			    "dapls_evd_copy_event: dapls_ib_event_poll %d "
			    "returned 0x%x\n", i, evpp_start[i].ibe_ev_family);
			dapl_os_assert(!"Bad return from dapls_ib_event_poll");
			break;
		}
	}

	return (dat_status);
}

/*
 * dapls_evd_cq_poll_to_event
 *
 * Attempt to dequeue a single CQE from a CQ and turn it into
 * an event.
 *
 * Input:
 *	evd_ptr
 *
 * Output:
 * 	event
 *
 * Returns:
 * 	Status of operation
 *
 */
DAT_RETURN
dapls_evd_cq_poll_to_event(
    IN DAPL_EVD 	*evd_ptr,
    OUT DAT_EVENT	*event)
{
	DAT_RETURN		dat_status;
	ib_work_completion_t	cur_cqe;

	/* skip one layer of do-nothing function */
	dat_status = DAPL_POLL1(evd_ptr)(evd_ptr->ib_cq_handle, &cur_cqe);

	if (dat_status == DAT_SUCCESS) {
#ifdef	DAPL_DBG	/* For debugging.  */
		dapli_evd_eh_print_cqe(cur_cqe);
#endif
		(void) dapli_evd_cqe_to_event(evd_ptr, &cur_cqe, DAT_FALSE,
		    event);
	}

	return (dat_status);
}

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 *  tab-width: 8
 * End:
 */