OpenSolaris_b135/uts/common/rpc/clnt_rdma.c

Compare this file to the similar file:
Show the results in this format:

/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
/*
 * Portions of this source code were derived from Berkeley
 * 4.3 BSD under license from the Regents of the University of
 * California.
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/user.h>
#include <sys/systm.h>
#include <sys/sysmacros.h>
#include <sys/errno.h>
#include <sys/kmem.h>
#include <sys/debug.h>
#include <sys/systm.h>
#include <sys/kstat.h>
#include <sys/t_lock.h>
#include <sys/ddi.h>
#include <sys/cmn_err.h>
#include <sys/time.h>
#include <sys/isa_defs.h>
#include <sys/zone.h>
#include <sys/sdt.h>

#include <rpc/types.h>
#include <rpc/xdr.h>
#include <rpc/auth.h>
#include <rpc/clnt.h>
#include <rpc/rpc_msg.h>
#include <rpc/rpc_rdma.h>
#include <nfs/nfs.h>
#include <nfs/nfs4_kprot.h>

static uint32_t rdma_bufs_rqst = RDMA_BUFS_RQST;

static int clnt_compose_rpcmsg(CLIENT *, rpcproc_t, rdma_buf_t *,
			    XDR *, xdrproc_t, caddr_t);
static int  clnt_compose_rdma_header(CONN *, CLIENT *, rdma_buf_t *,
		    XDR **, uint_t *);
static int clnt_setup_rlist(CONN *, XDR *, XDR *);
static int clnt_setup_wlist(CONN *, XDR *, XDR *, rdma_buf_t *);
static int clnt_setup_long_reply(CONN *, struct clist **, uint_t);
static void clnt_check_credit(CONN *);
static void clnt_return_credit(CONN *);
static void clnt_decode_long_reply(CONN *, struct clist *,
		struct clist *, XDR *, XDR **, struct clist *,
		struct clist *, uint_t, uint_t);

static void clnt_update_credit(CONN *, uint32_t);

static enum clnt_stat clnt_rdma_kcallit(CLIENT *, rpcproc_t, xdrproc_t,
    caddr_t, xdrproc_t, caddr_t, struct timeval);
static void	clnt_rdma_kabort(CLIENT *);
static void	clnt_rdma_kerror(CLIENT *, struct rpc_err *);
static bool_t	clnt_rdma_kfreeres(CLIENT *, xdrproc_t, caddr_t);
static void	clnt_rdma_kdestroy(CLIENT *);
static bool_t	clnt_rdma_kcontrol(CLIENT *, int, char *);
static int	clnt_rdma_ksettimers(CLIENT *, struct rpc_timers *,
    struct rpc_timers *, int, void(*)(int, int, caddr_t), caddr_t, uint32_t);

/*
 * Operations vector for RDMA based RPC
 */
static struct clnt_ops rdma_clnt_ops = {
	clnt_rdma_kcallit,	/* do rpc call */
	clnt_rdma_kabort,	/* abort call */
	clnt_rdma_kerror,	/* return error status */
	clnt_rdma_kfreeres,	/* free results */
	clnt_rdma_kdestroy,	/* destroy rpc handle */
	clnt_rdma_kcontrol,	/* the ioctl() of rpc */
	clnt_rdma_ksettimers,	/* set retry timers */
};

/*
 * The size of the preserialized RPC header information.
 */
#define	CKU_HDRSIZE	20
#define	CLNT_RDMA_SUCCESS 0
#define	CLNT_RDMA_FAIL (-1)

#define	AUTH_REFRESH_COUNT 2

#define	IS_RPCSEC_GSS(authh)			\
	(authh->cl_auth->ah_cred.oa_flavor == RPCSEC_GSS)

/*
 * Per RPC RDMA endpoint details
 */
typedef struct cku_private {
	CLIENT			cku_client;	/* client handle */
	rdma_mod_t		*cku_rd_mod;	/* underlying RDMA mod */
	void			*cku_rd_handle;	/* underlying RDMA device */
	struct netbuf		cku_srcaddr;	/* source address for retries */
	struct netbuf		cku_addr;	/* remote netbuf address */
	int			cku_addrfmly;	/* for finding addr_type */
	struct rpc_err		cku_err;	/* error status */
	struct cred		*cku_cred;	/* credentials */
	XDR			cku_outxdr;	/* xdr stream for output */
	uint32_t		cku_outsz;
	XDR			cku_inxdr;	/* xdr stream for input */
	char			cku_rpchdr[CKU_HDRSIZE+4]; /* rpc header */
	uint32_t		cku_xid;	/* current XID */
} cku_private_t;

#define	CLNT_RDMA_DELAY	10	/* secs to delay after a connection failure */
static int clnt_rdma_min_delay = CLNT_RDMA_DELAY;

struct {
	kstat_named_t	rccalls;
	kstat_named_t	rcbadcalls;
	kstat_named_t	rcbadxids;
	kstat_named_t	rctimeouts;
	kstat_named_t	rcnewcreds;
	kstat_named_t	rcbadverfs;
	kstat_named_t	rctimers;
	kstat_named_t	rccantconn;
	kstat_named_t	rcnomem;
	kstat_named_t	rcintrs;
	kstat_named_t	rclongrpcs;
} rdmarcstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "badxids",	KSTAT_DATA_UINT64 },
	{ "timeouts",	KSTAT_DATA_UINT64 },
	{ "newcreds",	KSTAT_DATA_UINT64 },
	{ "badverfs",	KSTAT_DATA_UINT64 },
	{ "timers",	KSTAT_DATA_UINT64 },
	{ "cantconn",	KSTAT_DATA_UINT64 },
	{ "nomem",	KSTAT_DATA_UINT64 },
	{ "interrupts", KSTAT_DATA_UINT64 },
	{ "longrpc", 	KSTAT_DATA_UINT64 }
};

kstat_named_t *rdmarcstat_ptr = (kstat_named_t *)&rdmarcstat;
uint_t rdmarcstat_ndata = sizeof (rdmarcstat) / sizeof (kstat_named_t);

#ifdef DEBUG
int rdma_clnt_debug = 0;
#endif

#ifdef accurate_stats
extern kmutex_t rdmarcstat_lock;    /* mutex for rcstat updates */

#define	RCSTAT_INCR(x)			\
	mutex_enter(&rdmarcstat_lock);	\
	rdmarcstat.x.value.ui64++;	\
	mutex_exit(&rdmarcstat_lock);
#else
#define	RCSTAT_INCR(x)			\
	rdmarcstat.x.value.ui64++;
#endif

#define	ptoh(p)		(&((p)->cku_client))
#define	htop(h)		((cku_private_t *)((h)->cl_private))

uint_t
calc_length(uint_t len)
{
	len = RNDUP(len);

	if (len <= 64 * 1024) {
		if (len > 32 * 1024) {
			len = 64 * 1024;
		} else {
			if (len > 16 * 1024) {
				len = 32 * 1024;
			} else {
				if (len > 8 * 1024) {
					len = 16 * 1024;
				} else {
					len = 8 * 1024;
				}
			}
		}
	}
	return (len);
}
int
clnt_rdma_kcreate(char *proto, void *handle, struct netbuf *raddr, int family,
    rpcprog_t pgm, rpcvers_t vers, struct cred *cred, CLIENT **cl)
{
	CLIENT *h;
	struct cku_private *p;
	struct rpc_msg call_msg;
	rdma_registry_t *rp;

	ASSERT(INGLOBALZONE(curproc));

	if (cl == NULL)
		return (EINVAL);
	*cl = NULL;

	p = kmem_zalloc(sizeof (*p), KM_SLEEP);

	/*
	 * Find underlying RDMATF plugin
	 */
	rw_enter(&rdma_lock, RW_READER);
	rp = rdma_mod_head;
	while (rp != NULL) {
		if (strcmp(rp->r_mod->rdma_api, proto))
			rp = rp->r_next;
		else {
			p->cku_rd_mod = rp->r_mod;
			p->cku_rd_handle = handle;
			break;
		}
	}
	rw_exit(&rdma_lock);

	if (p->cku_rd_mod == NULL) {
		/*
		 * Should not happen.
		 * No matching RDMATF plugin.
		 */
		kmem_free(p, sizeof (struct cku_private));
		return (EINVAL);
	}

	h = ptoh(p);
	h->cl_ops = &rdma_clnt_ops;
	h->cl_private = (caddr_t)p;
	h->cl_auth = authkern_create();

	/* call message, just used to pre-serialize below */
	call_msg.rm_xid = 0;
	call_msg.rm_direction = CALL;
	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
	call_msg.rm_call.cb_prog = pgm;
	call_msg.rm_call.cb_vers = vers;

	xdrmem_create(&p->cku_outxdr, p->cku_rpchdr, CKU_HDRSIZE, XDR_ENCODE);
	/* pre-serialize call message header */
	if (!xdr_callhdr(&p->cku_outxdr, &call_msg)) {
		XDR_DESTROY(&p->cku_outxdr);
		auth_destroy(h->cl_auth);
		kmem_free(p, sizeof (struct cku_private));
		return (EINVAL);
	}

	/*
	 * Set up the rpc information
	 */
	p->cku_cred = cred;
	p->cku_srcaddr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
	p->cku_srcaddr.maxlen = raddr->maxlen;
	p->cku_srcaddr.len = 0;
	p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
	p->cku_addr.maxlen = raddr->maxlen;
	p->cku_addr.len = raddr->len;
	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
	p->cku_addrfmly = family;

	*cl = h;
	return (0);
}

static void
clnt_rdma_kdestroy(CLIENT *h)
{
	struct cku_private *p = htop(h);

	kmem_free(p->cku_srcaddr.buf, p->cku_srcaddr.maxlen);
	kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
	kmem_free(p, sizeof (*p));
}

void
clnt_rdma_kinit(CLIENT *h, char *proto, void *handle, struct netbuf *raddr,
    struct cred *cred)
{
	struct cku_private *p = htop(h);
	rdma_registry_t *rp;

	ASSERT(INGLOBALZONE(curproc));
	/*
	 * Find underlying RDMATF plugin
	 */
	p->cku_rd_mod = NULL;
	rw_enter(&rdma_lock, RW_READER);
	rp = rdma_mod_head;
	while (rp != NULL) {
		if (strcmp(rp->r_mod->rdma_api, proto))
			rp = rp->r_next;
		else {
			p->cku_rd_mod = rp->r_mod;
			p->cku_rd_handle = handle;
			break;
		}

	}
	rw_exit(&rdma_lock);

	/*
	 * Set up the rpc information
	 */
	p->cku_cred = cred;
	p->cku_xid = 0;

	if (p->cku_addr.maxlen < raddr->len) {
		if (p->cku_addr.maxlen != 0 && p->cku_addr.buf != NULL)
			kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
		p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
		p->cku_addr.maxlen = raddr->maxlen;
	}

	p->cku_srcaddr.len = 0;

	p->cku_addr.len = raddr->len;
	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
	h->cl_ops = &rdma_clnt_ops;
}

static int
clnt_compose_rpcmsg(CLIENT *h, rpcproc_t procnum,
    rdma_buf_t *rpcmsg, XDR *xdrs,
    xdrproc_t xdr_args, caddr_t argsp)
{
	cku_private_t *p = htop(h);

	if (h->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
		/*
		 * Copy in the preserialized RPC header
		 * information.
		 */
		bcopy(p->cku_rpchdr, rpcmsg->addr, CKU_HDRSIZE);

		/*
		 * transaction id is the 1st thing in the output
		 * buffer.
		 */
		/* LINTED pointer alignment */
		(*(uint32_t *)(rpcmsg->addr)) = p->cku_xid;

		/* Skip the preserialized stuff. */
		XDR_SETPOS(xdrs, CKU_HDRSIZE);

		/* Serialize dynamic stuff into the output buffer. */
		if ((!XDR_PUTINT32(xdrs, (int32_t *)&procnum)) ||
		    (!AUTH_MARSHALL(h->cl_auth, xdrs, p->cku_cred)) ||
		    (!(*xdr_args)(xdrs, argsp))) {
			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__dynargs);
			return (CLNT_RDMA_FAIL);
		}
		p->cku_outsz = XDR_GETPOS(xdrs);
	} else {
		uint32_t *uproc = (uint32_t *)&p->cku_rpchdr[CKU_HDRSIZE];
		IXDR_PUT_U_INT32(uproc, procnum);
		(*(uint32_t *)(&p->cku_rpchdr[0])) = p->cku_xid;
		XDR_SETPOS(xdrs, 0);

		/* Serialize the procedure number and the arguments. */
		if (!AUTH_WRAP(h->cl_auth, (caddr_t)p->cku_rpchdr,
		    CKU_HDRSIZE+4, xdrs, xdr_args, argsp)) {
			if (rpcmsg->addr != xdrs->x_base) {
				rpcmsg->addr = xdrs->x_base;
				rpcmsg->len = xdr_getbufsize(xdrs);
			}
			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__procnum);
			return (CLNT_RDMA_FAIL);
		}
		/*
		 * If we had to allocate a new buffer while encoding
		 * then update the addr and len.
		 */
		if (rpcmsg->addr != xdrs->x_base) {
			rpcmsg->addr = xdrs->x_base;
			rpcmsg->len = xdr_getbufsize(xdrs);
		}

		p->cku_outsz = XDR_GETPOS(xdrs);
		DTRACE_PROBE1(krpc__i__compose__size__sec, int, p->cku_outsz)
	}

	return (CLNT_RDMA_SUCCESS);
}

static int
clnt_compose_rdma_header(CONN *conn, CLIENT *h, rdma_buf_t *clmsg,
    XDR **xdrs, uint_t *op)
{
	cku_private_t *p = htop(h);
	uint_t vers;
	uint32_t rdma_credit = rdma_bufs_rqst;

	vers = RPCRDMA_VERS;
	clmsg->type = SEND_BUFFER;

	if (rdma_buf_alloc(conn, clmsg)) {
		return (CLNT_RDMA_FAIL);
	}

	*xdrs = &p->cku_outxdr;
	xdrmem_create(*xdrs, clmsg->addr, clmsg->len, XDR_ENCODE);

	(*(uint32_t *)clmsg->addr) = p->cku_xid;
	XDR_SETPOS(*xdrs, sizeof (uint32_t));
	(void) xdr_u_int(*xdrs, &vers);
	(void) xdr_u_int(*xdrs, &rdma_credit);
	(void) xdr_u_int(*xdrs, op);

	return (CLNT_RDMA_SUCCESS);
}

/*
 * If xp_cl is NULL value, then the RPC payload will NOT carry
 * an RDMA READ chunk list, in this case we insert FALSE into
 * the XDR stream. Otherwise we use the clist and RDMA register
 * the memory and encode the clist into the outbound XDR stream.
 */
static int
clnt_setup_rlist(CONN *conn, XDR *xdrs, XDR *call_xdrp)
{
	int status;
	struct clist *rclp;
	int32_t xdr_flag = XDR_RDMA_RLIST_REG;

	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &rclp);

	if (rclp != NULL) {
		status = clist_register(conn, rclp, CLIST_REG_SOURCE);
		if (status != RDMA_SUCCESS) {
			return (CLNT_RDMA_FAIL);
		}
		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
	}
	(void) xdr_do_clist(xdrs, &rclp);

	return (CLNT_RDMA_SUCCESS);
}

/*
 * If xp_wcl is NULL value, then the RPC payload will NOT carry
 * an RDMA WRITE chunk list, in this case we insert FALSE into
 * the XDR stream. Otherwise we use the clist and  RDMA register
 * the memory and encode the clist into the outbound XDR stream.
 */
static int
clnt_setup_wlist(CONN *conn, XDR *xdrs, XDR *call_xdrp, rdma_buf_t *rndbuf)
{
	int status;
	struct clist *wlist, *rndcl;
	int wlen, rndlen;
	int32_t xdr_flag = XDR_RDMA_WLIST_REG;

	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_WLIST, &wlist);

	if (wlist != NULL) {
		/*
		 * If we are sending a non 4-byte alligned length
		 * the server will roundup the length to 4-byte
		 * boundary. In such a case, a trailing chunk is
		 * added to take any spill over roundup bytes.
		 */
		wlen = clist_len(wlist);
		rndlen = (roundup(wlen, BYTES_PER_XDR_UNIT) - wlen);
		if (rndlen) {
			rndcl = clist_alloc();
			/*
			 * calc_length() will allocate a PAGESIZE
			 * buffer below.
			 */
			rndcl->c_len = calc_length(rndlen);
			rndcl->rb_longbuf.type = RDMA_LONG_BUFFER;
			rndcl->rb_longbuf.len = rndcl->c_len;
			if (rdma_buf_alloc(conn, &rndcl->rb_longbuf)) {
				clist_free(rndcl);
				return (CLNT_RDMA_FAIL);
			}

			/* Roundup buffer freed back in caller */
			*rndbuf = rndcl->rb_longbuf;

			rndcl->u.c_daddr3 = rndcl->rb_longbuf.addr;
			rndcl->c_next = NULL;
			rndcl->c_dmemhandle = rndcl->rb_longbuf.handle;
			wlist->c_next = rndcl;
		}

		status = clist_register(conn, wlist, CLIST_REG_DST);
		if (status != RDMA_SUCCESS) {
			rdma_buf_free(conn, rndbuf);
			bzero(rndbuf, sizeof (rdma_buf_t));
			return (CLNT_RDMA_FAIL);
		}
		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
	}

	if (!xdr_encode_wlist(xdrs, wlist)) {
		if (rndlen) {
			rdma_buf_free(conn, rndbuf);
			bzero(rndbuf, sizeof (rdma_buf_t));
		}
		return (CLNT_RDMA_FAIL);
	}

	return (CLNT_RDMA_SUCCESS);
}

static int
clnt_setup_long_reply(CONN *conn, struct clist **clpp, uint_t length)
{
	if (length == 0) {
		*clpp = NULL;
		return (CLNT_RDMA_SUCCESS);
	}

	*clpp = clist_alloc();

	(*clpp)->rb_longbuf.len = calc_length(length);
	(*clpp)->rb_longbuf.type = RDMA_LONG_BUFFER;

	if (rdma_buf_alloc(conn, &((*clpp)->rb_longbuf))) {
		clist_free(*clpp);
		*clpp = NULL;
		return (CLNT_RDMA_FAIL);
	}

	(*clpp)->u.c_daddr3 = (*clpp)->rb_longbuf.addr;
	(*clpp)->c_len = (*clpp)->rb_longbuf.len;
	(*clpp)->c_next = NULL;
	(*clpp)->c_dmemhandle = (*clpp)->rb_longbuf.handle;

	if (clist_register(conn, *clpp, CLIST_REG_DST)) {
		DTRACE_PROBE(krpc__e__clntrdma__longrep_regbuf);
		rdma_buf_free(conn, &((*clpp)->rb_longbuf));
		clist_free(*clpp);
		*clpp = NULL;
		return (CLNT_RDMA_FAIL);
	}

	return (CLNT_RDMA_SUCCESS);
}

/* ARGSUSED */
static enum clnt_stat
clnt_rdma_kcallit(CLIENT *h, rpcproc_t procnum, xdrproc_t xdr_args,
    caddr_t argsp, xdrproc_t xdr_results, caddr_t resultsp,
    struct timeval wait)
{
	cku_private_t *p = htop(h);

	int 	try_call_again;
	int	refresh_attempt = AUTH_REFRESH_COUNT;
	int 	status;
	int 	msglen;

	XDR	*call_xdrp, callxdr; /* for xdrrdma encoding the RPC call */
	XDR	*reply_xdrp, replyxdr; /* for xdrrdma decoding the RPC reply */
	XDR 	*rdmahdr_o_xdrs, *rdmahdr_i_xdrs;

	struct rpc_msg 	reply_msg;
	rdma_registry_t	*m;

	struct clist *cl_sendlist;
	struct clist *cl_recvlist;
	struct clist *cl;
	struct clist *cl_rpcmsg;
	struct clist *cl_rdma_reply;
	struct clist *cl_rpcreply_wlist;
	struct clist *cl_long_reply;
	rdma_buf_t  rndup;

	uint_t vers;
	uint_t op;
	uint_t off;
	uint32_t seg_array_len;
	uint_t long_reply_len;
	uint_t rpcsec_gss;
	uint_t gss_i_or_p;

	CONN *conn = NULL;
	rdma_buf_t clmsg;
	rdma_buf_t rpcmsg;
	rdma_chunkinfo_lengths_t rcil;

	clock_t	ticks;
	bool_t wlist_exists_reply;

	uint32_t rdma_credit = rdma_bufs_rqst;

	RCSTAT_INCR(rccalls);

call_again:

	bzero(&clmsg, sizeof (clmsg));
	bzero(&rpcmsg, sizeof (rpcmsg));
	bzero(&rndup, sizeof (rndup));
	try_call_again = 0;
	cl_sendlist = NULL;
	cl_recvlist = NULL;
	cl = NULL;
	cl_rpcmsg = NULL;
	cl_rdma_reply = NULL;
	call_xdrp = NULL;
	reply_xdrp = NULL;
	wlist_exists_reply  = FALSE;
	cl_rpcreply_wlist = NULL;
	cl_long_reply = NULL;
	rcil.rcil_len = 0;
	rcil.rcil_len_alt = 0;
	long_reply_len = 0;

	rw_enter(&rdma_lock, RW_READER);
	m = (rdma_registry_t *)p->cku_rd_handle;
	if (m->r_mod_state == RDMA_MOD_INACTIVE) {
		/*
		 * If we didn't find a matching RDMA module in the registry
		 * then there is no transport.
		 */
		rw_exit(&rdma_lock);
		p->cku_err.re_status = RPC_CANTSEND;
		p->cku_err.re_errno = EIO;
		ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
		if (h->cl_nosignal == TRUE) {
			delay(ticks);
		} else {
			if (delay_sig(ticks) == EINTR) {
				p->cku_err.re_status = RPC_INTR;
				p->cku_err.re_errno = EINTR;
			}
		}
		return (RPC_CANTSEND);
	}
	/*
	 * Get unique xid
	 */
	if (p->cku_xid == 0)
		p->cku_xid = alloc_xid();

	status = RDMA_GET_CONN(p->cku_rd_mod->rdma_ops, &p->cku_srcaddr,
	    &p->cku_addr, p->cku_addrfmly, p->cku_rd_handle, &conn);
	rw_exit(&rdma_lock);

	/*
	 * If there is a problem with the connection reflect the issue
	 * back to the higher level to address, we MAY delay for a short
	 * period so that we are kind to the transport.
	 */
	if (conn == NULL) {
		/*
		 * Connect failed to server. Could be because of one
		 * of several things. In some cases we don't want
		 * the caller to retry immediately - delay before
		 * returning to caller.
		 */
		switch (status) {
		case RDMA_TIMEDOUT:
			/*
			 * Already timed out. No need to delay
			 * some more.
			 */
			p->cku_err.re_status = RPC_TIMEDOUT;
			p->cku_err.re_errno = ETIMEDOUT;
			break;
		case RDMA_INTR:
			/*
			 * Failed because of an signal. Very likely
			 * the caller will not retry.
			 */
			p->cku_err.re_status = RPC_INTR;
			p->cku_err.re_errno = EINTR;
			break;
		default:
			/*
			 * All other failures - server down or service
			 * down or temporary resource failure. Delay before
			 * returning to caller.
			 */
			ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
			p->cku_err.re_status = RPC_CANTCONNECT;
			p->cku_err.re_errno = EIO;

			if (h->cl_nosignal == TRUE) {
				delay(ticks);
			} else {
				if (delay_sig(ticks) == EINTR) {
					p->cku_err.re_status = RPC_INTR;
					p->cku_err.re_errno = EINTR;
				}
			}
			break;
		}

		return (p->cku_err.re_status);
	}

	if (p->cku_srcaddr.maxlen < conn->c_laddr.len) {
		if ((p->cku_srcaddr.maxlen != 0) &&
		    (p->cku_srcaddr.buf != NULL))
			kmem_free(p->cku_srcaddr.buf, p->cku_srcaddr.maxlen);
		p->cku_srcaddr.buf = kmem_zalloc(conn->c_laddr.maxlen,
		    KM_SLEEP);
		p->cku_srcaddr.maxlen = conn->c_laddr.maxlen;
	}

	p->cku_srcaddr.len = conn->c_laddr.len;
	bcopy(conn->c_laddr.buf, p->cku_srcaddr.buf, conn->c_laddr.len);

	clnt_check_credit(conn);

	status = CLNT_RDMA_FAIL;

	rpcsec_gss = gss_i_or_p = FALSE;

	if (IS_RPCSEC_GSS(h)) {
		rpcsec_gss = TRUE;
		if (rpc_gss_get_service_type(h->cl_auth) ==
		    rpc_gss_svc_integrity ||
		    rpc_gss_get_service_type(h->cl_auth) ==
		    rpc_gss_svc_privacy)
			gss_i_or_p = TRUE;
	}

	/*
	 * Try a regular RDMA message if RPCSEC_GSS is not being used
	 * or if RPCSEC_GSS is being used for authentication only.
	 */
	if (rpcsec_gss == FALSE ||
	    (rpcsec_gss == TRUE && gss_i_or_p == FALSE)) {
		/*
		 * Grab a send buffer for the request.  Try to
		 * encode it to see if it fits. If not, then it
		 * needs to be sent in a chunk.
		 */
		rpcmsg.type = SEND_BUFFER;
		if (rdma_buf_alloc(conn, &rpcmsg)) {
			DTRACE_PROBE(krpc__e__clntrdma__callit_nobufs);
			goto done;
		}

		/* First try to encode into regular send buffer */
		op = RDMA_MSG;

		call_xdrp = &callxdr;

		xdrrdma_create(call_xdrp, rpcmsg.addr, rpcmsg.len,
		    rdma_minchunk, NULL, XDR_ENCODE, conn);

		status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
		    xdr_args, argsp);

		if (status != CLNT_RDMA_SUCCESS) {
			/* Clean up from previous encode attempt */
			rdma_buf_free(conn, &rpcmsg);
			XDR_DESTROY(call_xdrp);
		} else {
			XDR_CONTROL(call_xdrp, XDR_RDMA_GET_CHUNK_LEN, &rcil);
		}
	}

	/* If the encode didn't work, then try a NOMSG */
	if (status != CLNT_RDMA_SUCCESS) {

		msglen = CKU_HDRSIZE + BYTES_PER_XDR_UNIT + MAX_AUTH_BYTES +
		    xdr_sizeof(xdr_args, argsp);

		msglen = calc_length(msglen);

		/* pick up the lengths for the reply buffer needed */
		(void) xdrrdma_sizeof(xdr_args, argsp, 0,
		    &rcil.rcil_len, &rcil.rcil_len_alt);

		/*
		 * Construct a clist to describe the CHUNK_BUFFER
		 * for the rpcmsg.
		 */
		cl_rpcmsg = clist_alloc();
		cl_rpcmsg->c_len = msglen;
		cl_rpcmsg->rb_longbuf.type = RDMA_LONG_BUFFER;
		cl_rpcmsg->rb_longbuf.len = msglen;
		if (rdma_buf_alloc(conn, &cl_rpcmsg->rb_longbuf)) {
			clist_free(cl_rpcmsg);
			goto done;
		}
		cl_rpcmsg->w.c_saddr3 = cl_rpcmsg->rb_longbuf.addr;

		op = RDMA_NOMSG;
		call_xdrp = &callxdr;

		xdrrdma_create(call_xdrp, cl_rpcmsg->rb_longbuf.addr,
		    cl_rpcmsg->rb_longbuf.len, 0,
		    cl_rpcmsg, XDR_ENCODE, conn);

		status = clnt_compose_rpcmsg(h, procnum, &cl_rpcmsg->rb_longbuf,
		    call_xdrp, xdr_args, argsp);

		DTRACE_PROBE2(krpc__i__clntrdma__callit__longbuf, int, status,
		    int, msglen);
		if (status != CLNT_RDMA_SUCCESS) {
			p->cku_err.re_status = RPC_CANTENCODEARGS;
			p->cku_err.re_errno = EIO;
			DTRACE_PROBE(krpc__e__clntrdma__callit__composemsg);
			goto done;
		}
	}

	/*
	 * During the XDR_ENCODE we may have "allocated" an RDMA READ or
	 * RDMA WRITE clist.
	 *
	 * First pull the RDMA READ chunk list from the XDR private
	 * area to keep it handy.
	 */
	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &cl);

	if (gss_i_or_p) {
		long_reply_len = rcil.rcil_len + rcil.rcil_len_alt;
		long_reply_len += MAX_AUTH_BYTES;
	} else {
		long_reply_len = rcil.rcil_len;
	}

	/*
	 * Update the chunk size information for the Long RPC msg.
	 */
	if (cl && op == RDMA_NOMSG)
		cl->c_len = p->cku_outsz;

	/*
	 * Prepare the RDMA header. On success xdrs will hold the result
	 * of xdrmem_create() for a SEND_BUFFER.
	 */
	status = clnt_compose_rdma_header(conn, h, &clmsg,
	    &rdmahdr_o_xdrs, &op);

	if (status != CLNT_RDMA_SUCCESS) {
		p->cku_err.re_status = RPC_CANTSEND;
		p->cku_err.re_errno = EIO;
		RCSTAT_INCR(rcnomem);
		DTRACE_PROBE(krpc__e__clntrdma__callit__nobufs2);
		goto done;
	}

	/*
	 * Now insert the RDMA READ list iff present
	 */
	status = clnt_setup_rlist(conn, rdmahdr_o_xdrs, call_xdrp);
	if (status != CLNT_RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__clntrdma__callit__clistreg);
		rdma_buf_free(conn, &clmsg);
		p->cku_err.re_status = RPC_CANTSEND;
		p->cku_err.re_errno = EIO;
		goto done;
	}

	/*
	 * Setup RDMA WRITE chunk list for nfs read operation
	 * other operations will have a NULL which will result
	 * as a NULL list in the XDR stream.
	 */
	status = clnt_setup_wlist(conn, rdmahdr_o_xdrs, call_xdrp, &rndup);
	if (status != CLNT_RDMA_SUCCESS) {
		rdma_buf_free(conn, &clmsg);
		p->cku_err.re_status = RPC_CANTSEND;
		p->cku_err.re_errno = EIO;
		goto done;
	}

	/*
	 * If NULL call and RPCSEC_GSS, provide a chunk such that
	 * large responses can flow back to the client.
	 * If RPCSEC_GSS with integrity or privacy is in use, get chunk.
	 */
	if ((procnum == 0 && rpcsec_gss == TRUE) ||
	    (rpcsec_gss == TRUE && gss_i_or_p == TRUE))
		long_reply_len += 1024;

	status = clnt_setup_long_reply(conn, &cl_long_reply, long_reply_len);

	DTRACE_PROBE2(krpc__i__clntrdma__callit__longreply, int, status,
	    int, long_reply_len);

	if (status != CLNT_RDMA_SUCCESS) {
		rdma_buf_free(conn, &clmsg);
		p->cku_err.re_status = RPC_CANTSEND;
		p->cku_err.re_errno = EIO;
		goto done;
	}

	/*
	 * XDR encode the RDMA_REPLY write chunk
	 */
	seg_array_len = (cl_long_reply ? 1 : 0);
	(void) xdr_encode_reply_wchunk(rdmahdr_o_xdrs, cl_long_reply,
	    seg_array_len);

	/*
	 * Construct a clist in "sendlist" that represents what we
	 * will push over the wire.
	 *
	 * Start with the RDMA header and clist (if any)
	 */
	clist_add(&cl_sendlist, 0, XDR_GETPOS(rdmahdr_o_xdrs), &clmsg.handle,
	    clmsg.addr, NULL, NULL);

	/*
	 * Put the RPC call message in  sendlist if small RPC
	 */
	if (op == RDMA_MSG) {
		clist_add(&cl_sendlist, 0, p->cku_outsz, &rpcmsg.handle,
		    rpcmsg.addr, NULL, NULL);
	} else {
		/* Long RPC already in chunk list */
		RCSTAT_INCR(rclongrpcs);
	}

	/*
	 * Set up a reply buffer ready for the reply
	 */
	status = rdma_clnt_postrecv(conn, p->cku_xid);
	if (status != RDMA_SUCCESS) {
		rdma_buf_free(conn, &clmsg);
		p->cku_err.re_status = RPC_CANTSEND;
		p->cku_err.re_errno = EIO;
		goto done;
	}

	/*
	 * sync the memory for dma
	 */
	if (cl != NULL) {
		status = clist_syncmem(conn, cl, CLIST_REG_SOURCE);
		if (status != RDMA_SUCCESS) {
			(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
			rdma_buf_free(conn, &clmsg);
			p->cku_err.re_status = RPC_CANTSEND;
			p->cku_err.re_errno = EIO;
			goto done;
		}
	}

	/*
	 * Send the RDMA Header and RPC call message to the server
	 */
	status = RDMA_SEND(conn, cl_sendlist, p->cku_xid);
	if (status != RDMA_SUCCESS) {
		(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
		p->cku_err.re_status = RPC_CANTSEND;
		p->cku_err.re_errno = EIO;
		goto done;
	}

	/*
	 * RDMA plugin now owns the send msg buffers.
	 * Clear them out and don't free them.
	 */
	clmsg.addr = NULL;
	if (rpcmsg.type == SEND_BUFFER)
		rpcmsg.addr = NULL;

	/*
	 * Recv rpc reply
	 */
	status = RDMA_RECV(conn, &cl_recvlist, p->cku_xid);

	/*
	 * Now check recv status
	 */
	if (status != 0) {
		if (status == RDMA_INTR) {
			p->cku_err.re_status = RPC_INTR;
			p->cku_err.re_errno = EINTR;
			RCSTAT_INCR(rcintrs);
		} else if (status == RPC_TIMEDOUT) {
			p->cku_err.re_status = RPC_TIMEDOUT;
			p->cku_err.re_errno = ETIMEDOUT;
			RCSTAT_INCR(rctimeouts);
		} else {
			p->cku_err.re_status = RPC_CANTRECV;
			p->cku_err.re_errno = EIO;
		}
		goto done;
	}

	/*
	 * Process the reply message.
	 *
	 * First the chunk list (if any)
	 */
	rdmahdr_i_xdrs = &(p->cku_inxdr);
	xdrmem_create(rdmahdr_i_xdrs,
	    (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3,
	    cl_recvlist->c_len, XDR_DECODE);

	/*
	 * Treat xid as opaque (xid is the first entity
	 * in the rpc rdma message).
	 * Skip xid and set the xdr position accordingly.
	 */
	XDR_SETPOS(rdmahdr_i_xdrs, sizeof (uint32_t));
	(void) xdr_u_int(rdmahdr_i_xdrs, &vers);
	(void) xdr_u_int(rdmahdr_i_xdrs, &rdma_credit);
	(void) xdr_u_int(rdmahdr_i_xdrs, &op);
	(void) xdr_do_clist(rdmahdr_i_xdrs, &cl);

	clnt_update_credit(conn, rdma_credit);

	wlist_exists_reply = FALSE;
	if (! xdr_decode_wlist(rdmahdr_i_xdrs, &cl_rpcreply_wlist,
	    &wlist_exists_reply)) {
		DTRACE_PROBE(krpc__e__clntrdma__callit__wlist_decode);
		p->cku_err.re_status = RPC_CANTDECODERES;
		p->cku_err.re_errno = EIO;
		goto done;
	}

	/*
	 * The server shouldn't have sent a RDMA_SEND that
	 * the client needs to RDMA_WRITE a reply back to
	 * the server.  So silently ignoring what the
	 * server returns in the rdma_reply section of the
	 * header.
	 */
	(void) xdr_decode_reply_wchunk(rdmahdr_i_xdrs, &cl_rdma_reply);
	off = xdr_getpos(rdmahdr_i_xdrs);

	clnt_decode_long_reply(conn, cl_long_reply,
	    cl_rdma_reply, &replyxdr, &reply_xdrp,
	    cl, cl_recvlist, op, off);

	if (reply_xdrp == NULL)
		goto done;

	if (wlist_exists_reply) {
		XDR_CONTROL(reply_xdrp, XDR_RDMA_SET_WLIST, cl_rpcreply_wlist);
	}

	reply_msg.rm_direction = REPLY;
	reply_msg.rm_reply.rp_stat = MSG_ACCEPTED;
	reply_msg.acpted_rply.ar_stat = SUCCESS;
	reply_msg.acpted_rply.ar_verf = _null_auth;

	/*
	 *  xdr_results will be done in AUTH_UNWRAP.
	 */
	reply_msg.acpted_rply.ar_results.where = NULL;
	reply_msg.acpted_rply.ar_results.proc = xdr_void;

	/*
	 * Decode and validate the response.
	 */
	if (xdr_replymsg(reply_xdrp, &reply_msg)) {
		enum clnt_stat re_status;

		_seterr_reply(&reply_msg, &(p->cku_err));

		re_status = p->cku_err.re_status;
		if (re_status == RPC_SUCCESS) {
			/*
			 * Reply is good, check auth.
			 */
			if (!AUTH_VALIDATE(h->cl_auth,
			    &reply_msg.acpted_rply.ar_verf)) {
				p->cku_err.re_status = RPC_AUTHERROR;
				p->cku_err.re_why = AUTH_INVALIDRESP;
				RCSTAT_INCR(rcbadverfs);
				DTRACE_PROBE(
				    krpc__e__clntrdma__callit__authvalidate);
			} else if (!AUTH_UNWRAP(h->cl_auth, reply_xdrp,
			    xdr_results, resultsp)) {
				p->cku_err.re_status = RPC_CANTDECODERES;
				p->cku_err.re_errno = EIO;
				DTRACE_PROBE(
				    krpc__e__clntrdma__callit__authunwrap);
			}
		} else {
			/* set errno in case we can't recover */
			if (re_status != RPC_VERSMISMATCH &&
			    re_status != RPC_AUTHERROR &&
			    re_status != RPC_PROGVERSMISMATCH)
				p->cku_err.re_errno = EIO;

			if (re_status == RPC_AUTHERROR) {
				if ((refresh_attempt > 0) &&
				    AUTH_REFRESH(h->cl_auth, &reply_msg,
				    p->cku_cred)) {
					refresh_attempt--;
					try_call_again = 1;
					goto done;
				}

				try_call_again = 0;

				/*
				 * We have used the client handle to
				 * do an AUTH_REFRESH and the RPC status may
				 * be set to RPC_SUCCESS; Let's make sure to
				 * set it to RPC_AUTHERROR.
				 */
				p->cku_err.re_status = RPC_AUTHERROR;

				/*
				 * Map recoverable and unrecoverable
				 * authentication errors to appropriate
				 * errno
				 */
				switch (p->cku_err.re_why) {
				case AUTH_BADCRED:
				case AUTH_BADVERF:
				case AUTH_INVALIDRESP:
				case AUTH_TOOWEAK:
				case AUTH_FAILED:
				case RPCSEC_GSS_NOCRED:
				case RPCSEC_GSS_FAILED:
					p->cku_err.re_errno = EACCES;
					break;
				case AUTH_REJECTEDCRED:
				case AUTH_REJECTEDVERF:
				default:
					p->cku_err.re_errno = EIO;
					break;
				}
			}
			DTRACE_PROBE1(krpc__e__clntrdma__callit__rpcfailed,
			    int, p->cku_err.re_why);
		}
	} else {
		p->cku_err.re_status = RPC_CANTDECODERES;
		p->cku_err.re_errno = EIO;
		DTRACE_PROBE(krpc__e__clntrdma__callit__replymsg);
	}

done:
	clnt_return_credit(conn);

	if (cl_sendlist != NULL)
		clist_free(cl_sendlist);

	/*
	 * If rpc reply is in a chunk, free it now.
	 */
	if (cl_long_reply) {
		(void) clist_deregister(conn, cl_long_reply);
		rdma_buf_free(conn, &cl_long_reply->rb_longbuf);
		clist_free(cl_long_reply);
	}

	if (call_xdrp)
		XDR_DESTROY(call_xdrp);

	if (rndup.rb_private) {
		rdma_buf_free(conn, &rndup);
	}

	if (reply_xdrp) {
		(void) xdr_rpc_free_verifier(reply_xdrp, &reply_msg);
		XDR_DESTROY(reply_xdrp);
	}

	if (cl_rdma_reply) {
		clist_free(cl_rdma_reply);
	}

	if (cl_recvlist) {
		rdma_buf_t	recvmsg = {0};
		recvmsg.addr = (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3;
		recvmsg.type = RECV_BUFFER;
		RDMA_BUF_FREE(conn, &recvmsg);
		clist_free(cl_recvlist);
	}

	RDMA_REL_CONN(conn);

	if (try_call_again)
		goto call_again;

	if (p->cku_err.re_status != RPC_SUCCESS) {
		RCSTAT_INCR(rcbadcalls);
	}
	return (p->cku_err.re_status);
}


static void
clnt_decode_long_reply(CONN *conn,
    struct clist *cl_long_reply,
    struct clist *cl_rdma_reply, XDR *xdrs,
    XDR **rxdrp, struct clist *cl,
    struct clist *cl_recvlist,
    uint_t  op, uint_t off)
{
	if (op != RDMA_NOMSG) {
		DTRACE_PROBE1(krpc__i__longrepl__rdmamsg__len,
		    int, cl_recvlist->c_len - off);
		xdrrdma_create(xdrs,
		    (caddr_t)(uintptr_t)(cl_recvlist->w.c_saddr3 + off),
		    cl_recvlist->c_len - off, 0, cl, XDR_DECODE, conn);
		*rxdrp = xdrs;
		return;
	}

	/* op must be RDMA_NOMSG */
	if (cl) {
		DTRACE_PROBE(krpc__e__clntrdma__declongreply__serverreadlist);
		return;
	}

	if (cl_long_reply->u.c_daddr) {
		DTRACE_PROBE1(krpc__i__longrepl__rdmanomsg__len,
		    int, cl_rdma_reply->c_len);

		xdrrdma_create(xdrs, (caddr_t)cl_long_reply->u.c_daddr3,
		    cl_rdma_reply->c_len, 0, NULL, XDR_DECODE, conn);

		*rxdrp = xdrs;
	}
}

static void
clnt_return_credit(CONN *conn)
{
	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;

	mutex_enter(&conn->c_lock);
	cc_info->clnt_cc_in_flight_ops--;
	cv_signal(&cc_info->clnt_cc_cv);
	mutex_exit(&conn->c_lock);
}

static void
clnt_update_credit(CONN *conn, uint32_t rdma_credit)
{
	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;

	/*
	 * If the granted has not altered, avoid taking the
	 * mutex, to essentially do nothing..
	 */
	if (cc_info->clnt_cc_granted_ops == rdma_credit)
		return;
	/*
	 * Get the granted number of buffers for credit control.
	 */
	mutex_enter(&conn->c_lock);
	cc_info->clnt_cc_granted_ops = rdma_credit;
	mutex_exit(&conn->c_lock);
}

static void
clnt_check_credit(CONN *conn)
{
	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;

	/*
	 * Make sure we are not going over our allowed buffer use
	 * (and make sure we have gotten a granted value before).
	 */
	mutex_enter(&conn->c_lock);
	while (cc_info->clnt_cc_in_flight_ops >= cc_info->clnt_cc_granted_ops &&
	    cc_info->clnt_cc_granted_ops != 0) {
		/*
		 * Client has maxed out its granted buffers due to
		 * credit control.  Current handling is to block and wait.
		 */
		cv_wait(&cc_info->clnt_cc_cv, &conn->c_lock);
	}
	cc_info->clnt_cc_in_flight_ops++;
	mutex_exit(&conn->c_lock);
}

/* ARGSUSED */
static void
clnt_rdma_kabort(CLIENT *h)
{
}

static void
clnt_rdma_kerror(CLIENT *h, struct rpc_err *err)
{
	struct cku_private *p = htop(h);
	*err = p->cku_err;
}

static bool_t
clnt_rdma_kfreeres(CLIENT *h, xdrproc_t xdr_res, caddr_t res_ptr)
{
	struct cku_private *p = htop(h);
	XDR *xdrs;

	xdrs = &(p->cku_outxdr);
	xdrs->x_op = XDR_FREE;
	return ((*xdr_res)(xdrs, res_ptr));
}

/* ARGSUSED */
static bool_t
clnt_rdma_kcontrol(CLIENT *h, int cmd, char *arg)
{
	return (TRUE);
}

/* ARGSUSED */
static int
clnt_rdma_ksettimers(CLIENT *h, struct rpc_timers *t, struct rpc_timers *all,
	int minimum, void(*feedback)(int, int, caddr_t), caddr_t arg,
	uint32_t xid)
{
	RCSTAT_INCR(rctimers);
	return (0);
}

int
rdma_reachable(int addr_type, struct netbuf *addr, struct knetconfig **knconf)
{
	rdma_registry_t	*rp;
	void *handle = NULL;
	struct knetconfig *knc;
	char *pf, *p;
	rdma_stat status;
	int error = 0;

	if (!INGLOBALZONE(curproc))
		return (-1);

	/*
	 * modload the RDMA plugins if not already done.
	 */
	if (!rdma_modloaded) {
		mutex_enter(&rdma_modload_lock);
		if (!rdma_modloaded) {
			error = rdma_modload();
		}
		mutex_exit(&rdma_modload_lock);
		if (error)
			return (-1);
	}

	if (!rdma_dev_available)
		return (-1);

	rw_enter(&rdma_lock, RW_READER);
	rp = rdma_mod_head;
	while (rp != NULL) {
		if (rp->r_mod_state == RDMA_MOD_INACTIVE) {
			rp = rp->r_next;
			continue;
		}
		status = RDMA_REACHABLE(rp->r_mod->rdma_ops, addr_type, addr,
		    &handle);
		if (status == RDMA_SUCCESS) {
			knc = kmem_zalloc(sizeof (struct knetconfig),
			    KM_SLEEP);
			knc->knc_semantics = NC_TPI_RDMA;
			pf = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
			p = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
			if (addr_type == AF_INET)
				(void) strncpy(pf, NC_INET, KNC_STRSIZE);
			else if (addr_type == AF_INET6)
				(void) strncpy(pf, NC_INET6, KNC_STRSIZE);
			pf[KNC_STRSIZE - 1] = '\0';

			(void) strncpy(p, rp->r_mod->rdma_api, KNC_STRSIZE);
			p[KNC_STRSIZE - 1] = '\0';

			knc->knc_protofmly = pf;
			knc->knc_proto = p;
			knc->knc_rdev = (dev_t)rp;
			*knconf = knc;
			rw_exit(&rdma_lock);
			return (0);
		}
		rp = rp->r_next;
	}
	rw_exit(&rdma_lock);
	return (-1);
}