/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 1983, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright (c) 2012 by Delphix. All rights reserved.
 * Copyright 2013 Nexenta Systems, Inc.  All rights reserved.
 * Copyright 2012 Marcel Telka <marcel@telka.sk>
 * Copyright 2018 OmniOS Community Edition (OmniOSce) Association.
 */
/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
/*
 * Portions of this source code were derived from Berkeley
 * 4.3 BSD under license from the Regents of the University of
 * California.
 */

/*
 * Server side of RPC over RDMA in the kernel.
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/user.h>
#include <sys/sysmacros.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/errno.h>
#include <sys/kmem.h>
#include <sys/debug.h>
#include <sys/systm.h>
#include <sys/cmn_err.h>
#include <sys/kstat.h>
#include <sys/vtrace.h>
#include <sys/debug.h>

#include <rpc/types.h>
#include <rpc/xdr.h>
#include <rpc/auth.h>
#include <rpc/clnt.h>
#include <rpc/rpc_msg.h>
#include <rpc/svc.h>
#include <rpc/rpc_rdma.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

#include <inet/common.h>
#include <inet/ip.h>
#include <inet/ip6.h>

#include <nfs/nfs.h>
#include <sys/sdt.h>

#define	SVC_RDMA_SUCCESS 0
#define	SVC_RDMA_FAIL -1

#define	SVC_CREDIT_FACTOR (0.5)

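/* TRUE when the verifier flavor in the accepted reply indicates RPCSEC_GSS */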
#define	MSG_IS_RPCSEC_GSS(msg)		\
	((msg)->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS)


uint32_t rdma_bufs_granted = RDMA_BUFS_GRANT;

/*
 * RDMA transport specific data associated with SVCMASTERXPRT
 */
struct rdma_data {
	SVCMASTERXPRT	*rd_xprt;	/* back ptr to SVCMASTERXPRT */
	struct rdma_svc_data rd_data;	/* rdma data */
	rdma_mod_t	*r_mod;		/* RDMA module containing ops ptr */
};

/*
 * Plugin connection specific data stashed away in clone SVCXPRT
 */
struct clone_rdma_data {
	bool_t		cloned;		/* xprt cloned for thread processing */
	CONN		*conn;		/* RDMA connection */
	rdma_buf_t	rpcbuf;		/* RPC req/resp buffer */
	struct clist	*cl_reply;	/* reply chunk buffer info */
	struct clist	*cl_wlist;	/* write list clist */
};


#define	MAXADDRLEN	128	/* max length for address mask */

/*
 * Routines exported through ops vector.
 */
static bool_t		svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *);
static bool_t		svc_rdma_ksend(SVCXPRT *, struct rpc_msg *);
static bool_t		svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t);
static bool_t		svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t);
void			svc_rdma_kdestroy(SVCMASTERXPRT *);
static int		svc_rdma_kdup(struct svc_req *, caddr_t, int,
				struct dupreq **, bool_t *);
static void		svc_rdma_kdupdone(struct dupreq *, caddr_t,
				void (*)(), int, int);
static int32_t		*svc_rdma_kgetres(SVCXPRT *, int);
static void		svc_rdma_kfreeres(SVCXPRT *);
static void		svc_rdma_kclone_destroy(SVCXPRT *);
static void		svc_rdma_kstart(SVCMASTERXPRT *);
void			svc_rdma_kstop(SVCMASTERXPRT *);
static void		svc_rdma_kclone_xprt(SVCXPRT *, SVCXPRT *);
static void		svc_rdma_ktattrs(SVCXPRT *, int, void **);

static int	svc_process_long_reply(SVCXPRT *, xdrproc_t,
			caddr_t, struct rpc_msg *, bool_t, int *,
			int *, int *, unsigned int *);

static int	svc_compose_rpcmsg(SVCXPRT *, CONN *, xdrproc_t,
			caddr_t, rdma_buf_t *, XDR **, struct rpc_msg *,
			bool_t, uint_t *);
static bool_t rpcmsg_length(xdrproc_t,
		caddr_t,
		struct rpc_msg *, bool_t, int);

/*
 * Server transport operations vector.
 */
struct svc_ops rdma_svc_ops = {
	svc_rdma_krecv,		/* Get requests */
	svc_rdma_kgetargs,	/* Deserialize arguments */
	svc_rdma_ksend,		/* Send reply */
	svc_rdma_kfreeargs,	/* Free argument data space */
	svc_rdma_kdestroy,	/* Destroy transport handle */
	svc_rdma_kdup,		/* Check entry in dup req cache */
	svc_rdma_kdupdone,	/* Mark entry in dup req cache as done */
	svc_rdma_kgetres,	/* Get pointer to response buffer */
	svc_rdma_kfreeres,	/* Destroy pre-serialized response header */
	svc_rdma_kclone_destroy,	/* Destroy a clone xprt */
	svc_rdma_kstart,	/* Tell `ready-to-receive' to rpcmod */
	svc_rdma_kclone_xprt,	/* Transport specific clone xprt */
	svc_rdma_ktattrs,	/* Get Transport Attributes */
	NULL,			/* Increment transport reference count */
	NULL			/* Decrement transport reference count */
};

/*
 * Server statistics
 * NOTE: This structure type is duplicated in the NFS fast path.
 */
struct {
	kstat_named_t	rscalls;
	kstat_named_t	rsbadcalls;
	kstat_named_t	rsnullrecv;
	kstat_named_t	rsbadlen;
	kstat_named_t	rsxdrcall;
	kstat_named_t	rsdupchecks;
	kstat_named_t	rsdupreqs;
	kstat_named_t	rslongrpcs;
	kstat_named_t	rstotalreplies;
	kstat_named_t	rstotallongreplies;
	kstat_named_t	rstotalinlinereplies;
} rdmarsstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "nullrecv",	KSTAT_DATA_UINT64 },
	{ "badlen",	KSTAT_DATA_UINT64 },
	{ "xdrcall",	KSTAT_DATA_UINT64 },
	{ "dupchecks",	KSTAT_DATA_UINT64 },
	{ "dupreqs",	KSTAT_DATA_UINT64 },
	{ "longrpcs",	KSTAT_DATA_UINT64 },
	{ "totalreplies",	KSTAT_DATA_UINT64 },
	{ "totallongreplies",	KSTAT_DATA_UINT64 },
	{ "totalinlinereplies",	KSTAT_DATA_UINT64 },
};

kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat;
uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t);

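/* Atomically increment one of the server-side RDMA RPC statistics above */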
#define	RSSTAT_INCR(x)	atomic_inc_64(&rdmarsstat.x.value.ui64)
/*
 * Create a transport record.
 * The transport record, output buffer, and private data structure
 * are allocated.  The output buffer is serialized into using xdrmem.
 * There is one transport record per user process which implements a
 * set of services.
 */
/* ARGSUSED */
int
svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
    rdma_xprt_group_t *started_xprts)
{
	int error = 0;
	SVCMASTERXPRT *xprt;
	struct rdma_data *rd;
	rdma_registry_t *rmod;
	rdma_xprt_record_t *xprt_rec;
	queue_t	*q;
	/*
	 * modload the RDMA plugins if not already done.
	 */
	if (!rdma_modloaded) {
		/*CONSTANTCONDITION*/
		ASSERT(sizeof (struct clone_rdma_data) <= SVC_P2LEN);

		mutex_enter(&rdma_modload_lock);
		if (!rdma_modloaded) {
			error = rdma_modload();
		}
		mutex_exit(&rdma_modload_lock);

		if (error)
			return (error);
	}

	/*
	 * master_xprt_count is the count of master transport handles
	 * that were successfully created and are ready to receive
	 * RDMA based access.
	 */
	error = 0;
	xprt_rec = NULL;
	rw_enter(&rdma_lock, RW_READER);
	if (rdma_mod_head == NULL) {
		started_xprts->rtg_count = 0;
		rw_exit(&rdma_lock);
		if (rdma_dev_available)
			return (EPROTONOSUPPORT);
		else
			return (ENODEV);
	}

	/*
	 * If we have reached here, then at least one RDMA plugin has loaded.
	 * Create a master_xprt and make it start listening on the device;
	 * if an error is generated, record it, as we might need to shut
	 * down the master_xprt.
	 * SVC_START() calls svc_rdma_kstart which calls plugin binding
	 * routines.
	 */
	for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) {

		/*
		 * One SVCMASTERXPRT per RDMA plugin.
		 */
		xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP);
		xprt->xp_ops = &rdma_svc_ops;
		xprt->xp_sct = sct;
		xprt->xp_type = T_RDMA;
		mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
		xprt->xp_req_head = (mblk_t *)0;
		xprt->xp_req_tail = (mblk_t *)0;
		xprt->xp_full = FALSE;
		xprt->xp_enable = FALSE;
		xprt->xp_reqs = 0;
		xprt->xp_size = 0;
		xprt->xp_threads = 0;
		xprt->xp_detached_threads = 0;

		rd = kmem_zalloc(sizeof (*rd), KM_SLEEP);
		xprt->xp_p2 = (caddr_t)rd;
		rd->rd_xprt = xprt;
		rd->r_mod = rmod->r_mod;

		q = &rd->rd_data.q;
		xprt->xp_wq = q;
		q->q_ptr = &rd->rd_xprt;
		xprt->xp_netid = NULL;

		/*
		 * Each of the plugins will have their own Service ID
		 * to listener specific mapping, like port number for VI
		 * and service name for IB.
		 */
		rd->rd_data.svcid = id;
		error = svc_xprt_register(xprt, id);
		if (error) {
			DTRACE_PROBE(krpc__e__svcrdma__xprt__reg);
			goto cleanup;
		}

		SVC_START(xprt);
		if (!rd->rd_data.active) {
			svc_xprt_unregister(xprt);
			error = rd->rd_data.err_code;
			goto cleanup;
		}

		/*
		 * This is set only when at least one transport was
		 * successfully created. We insert the pointer to the
		 * created RDMA master xprt into a separately maintained
		 * list. This way we can easily reference it later to cleanup,
		 * when NFS kRPC service pool is going away/unregistered.
		 */
		started_xprts->rtg_count++;
		xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP);
		xprt_rec->rtr_xprt_ptr = xprt;
		xprt_rec->rtr_next = started_xprts->rtg_listhead;
		started_xprts->rtg_listhead = xprt_rec;
		continue;
cleanup:
		SVC_DESTROY(xprt);
		if (error == RDMA_FAILED)
			error = EPROTONOSUPPORT;
	}

	rw_exit(&rdma_lock);

	/*
	 * Don't return an error if even a single plugin was started
	 * successfully.
	 */
	if (started_xprts->rtg_count == 0)
		return (error);
	return (0);
}

/*
 * Cleanup routine for freeing up memory allocated by
 * svc_rdma_kcreate()
 */
void
svc_rdma_kdestroy(SVCMASTERXPRT *xprt)
{
	struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2;


	mutex_destroy(&xprt->xp_req_lock);
	mutex_destroy(&xprt->xp_thread_lock);
	kmem_free(rd, sizeof (*rd));
	kmem_free(xprt, sizeof (*xprt));
}


static void
svc_rdma_kstart(SVCMASTERXPRT *xprt)
{
	struct rdma_svc_data *svcdata;
	rdma_mod_t *rmod;

	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;

	/*
	 * Create a listener for the module at this port.
	 */

	if (rmod->rdma_count != 0)
		(*rmod->rdma_ops->rdma_svc_listen)(svcdata);
	else
		svcdata->err_code = RDMA_FAILED;
}

void
svc_rdma_kstop(SVCMASTERXPRT *xprt)
{
	struct rdma_svc_data *svcdata;
	rdma_mod_t *rmod;

	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;

	/*
	 * Call the stop listener routine for each plugin. If rdma_count is
	 * already zero set active to zero.
	 */
	if (rmod->rdma_count != 0)
		(*rmod->rdma_ops->rdma_svc_stop)(svcdata);
	else
		svcdata->active = 0;
	if (svcdata->active)
		DTRACE_PROBE(krpc__e__svcrdma__kstop);
}

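/*
 * Free the RPC buffer and the reply chunk list, and release the RDMA
 * connection held by a clone xprt, but only if the transport data was
 * actually cloned for detached thread processing.
 */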
/* ARGSUSED */
static void
svc_rdma_kclone_destroy(SVCXPRT *clone_xprt)
{

	struct clone_rdma_data *cdrp;
	cdrp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;

	/*
	 * Only free buffers and release connection when cloned is set.
	 */
	if (cdrp->cloned != TRUE)
		return;

	rdma_buf_free(cdrp->conn, &cdrp->rpcbuf);
	if (cdrp->cl_reply) {
		clist_free(cdrp->cl_reply);
		cdrp->cl_reply = NULL;
	}
	RDMA_REL_CONN(cdrp->conn);

	cdrp->cloned = 0;
}

/*
 * Clone the xprt specific information.  It will be freed by
 * SVC_CLONE_DESTROY.
 */
static void
svc_rdma_kclone_xprt(SVCXPRT *src_xprt, SVCXPRT *dst_xprt)
{
	struct clone_rdma_data *srcp2;
	struct clone_rdma_data *dstp2;

	srcp2 = (struct clone_rdma_data *)src_xprt->xp_p2buf;
	dstp2 = (struct clone_rdma_data *)dst_xprt->xp_p2buf;

	if (srcp2->conn != NULL) {
		srcp2->cloned = TRUE;
		*dstp2 = *srcp2;
	}
}

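/*
 * Return a transport specific attribute of a clone xprt; only the
 * address mask of the underlying RDMA connection is supported.
 */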
static void
svc_rdma_ktattrs(SVCXPRT *clone_xprt, int attrflag, void **tattr)
{
	CONN	*conn;
	*tattr = NULL;

	switch (attrflag) {
	case SVC_TATTR_ADDRMASK:
		conn = ((struct clone_rdma_data *)clone_xprt->xp_p2buf)->conn;
		ASSERT(conn != NULL);
		if (conn)
			*tattr = (void *)&conn->c_addrmask;
	}
}

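/*
 * Receive an RPC call message. Decode the RPC-over-RDMA transport header,
 * pull a long call message over with RDMA_READ when it was sent as a chunk
 * (RDMA_NOMSG), and set up the XDR stream used to decode the call arguments.
 */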
static bool_t
svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg)
{
	XDR	*xdrs;
	CONN	*conn;
	rdma_recv_data_t	*rdp = (rdma_recv_data_t *)mp->b_rptr;
	struct clone_rdma_data *crdp;
	struct clist	*cl = NULL;
	struct clist	*wcl = NULL;
	struct clist	*cllong = NULL;

	rdma_stat	status;
	uint32_t vers, op, pos, xid;
	uint32_t rdma_credit;
	uint32_t wcl_total_length = 0;
	bool_t	wwl = FALSE;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	RSSTAT_INCR(rscalls);
	conn = rdp->conn;

	status = rdma_svc_postrecv(conn);
	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__postrecv);
		goto badrpc_call;
	}

	xdrs = &clone_xprt->xp_xdrin;
	xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE);
	xid = *(uint32_t *)rdp->rpcmsg.addr;
	XDR_SETPOS(xdrs, sizeof (uint32_t));

	if (! xdr_u_int(xdrs, &vers) ||
	    ! xdr_u_int(xdrs, &rdma_credit) ||
	    ! xdr_u_int(xdrs, &op)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__uint);
		goto xdr_err;
	}

	/* Checking if the status of the recv operation was normal */
	if (rdp->status != 0) {
		DTRACE_PROBE1(krpc__e__svcrdma__krecv__invalid__status,
		    int, rdp->status);
		goto badrpc_call;
	}

	if (! xdr_do_clist(xdrs, &cl)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__do__clist);
		goto xdr_err;
	}

	if (!xdr_decode_wlist_svc(xdrs, &wcl, &wwl, &wcl_total_length, conn)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__decode__wlist);
		if (cl)
			clist_free(cl);
		goto xdr_err;
	}
	crdp->cl_wlist = wcl;

	crdp->cl_reply = NULL;
	(void) xdr_decode_reply_wchunk(xdrs, &crdp->cl_reply);

	/*
	 * A chunk at 0 offset indicates that the RPC call message
	 * is in a chunk. Get the RPC call message chunk.
	 */
	if (cl != NULL && op == RDMA_NOMSG) {

		/* Remove RPC call message chunk from chunklist */
		cllong = cl;
		cl = cl->c_next;
		cllong->c_next = NULL;


		/* Allocate and register memory for the RPC call msg chunk */
		cllong->rb_longbuf.type = RDMA_LONG_BUFFER;
		cllong->rb_longbuf.len = cllong->c_len > LONG_REPLY_LEN ?
		    cllong->c_len : LONG_REPLY_LEN;

		if (rdma_buf_alloc(conn, &cllong->rb_longbuf)) {
			clist_free(cllong);
			goto cll_malloc_err;
		}

		cllong->u.c_daddr3 = cllong->rb_longbuf.addr;

		if (cllong->u.c_daddr == 0) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__nomem);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_register(conn, cllong, CLIST_REG_DST);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__clist__reg);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		/*
		 * Now read the RPC call message in
		 */
		status = RDMA_READ(conn, cllong, WAIT);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__read);
			(void) clist_deregister(conn, cllong);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_syncmem(conn, cllong, CLIST_REG_DST);
		(void) clist_deregister(conn, cllong);

		xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->u.c_daddr3,
		    cllong->c_len, 0, cl, XDR_DECODE, conn);

		crdp->rpcbuf = cllong->rb_longbuf;
		crdp->rpcbuf.len = cllong->c_len;
		clist_free(cllong);
		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	} else {
		pos = XDR_GETPOS(xdrs);
		xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos,
		    rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn);
		crdp->rpcbuf = rdp->rpcmsg;

		/* Use xdrrdmablk_ops to indicate there is a read chunk list */
		if (cl != NULL) {
			int32_t flg = XDR_RDMA_RLIST_REG;

			XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
			xdrs->x_ops = &xdrrdmablk_ops;
		}
	}

	if (crdp->cl_wlist) {
		int32_t flg = XDR_RDMA_WLIST_REG;

		XDR_CONTROL(xdrs, XDR_RDMA_SET_WLIST, crdp->cl_wlist);
		XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
	}

	if (! xdr_callmsg(xdrs, msg)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__callmsg);
		RSSTAT_INCR(rsxdrcall);
		goto callmsg_err;
	}

	/*
	 * Point the remote transport address in the service_transport
	 * handle at the address in the request.
	 */
	clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf;
	clone_xprt->xp_rtaddr.len = conn->c_raddr.len;
	clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len;

	clone_xprt->xp_lcladdr.buf = conn->c_laddr.buf;
	clone_xprt->xp_lcladdr.len = conn->c_laddr.len;
	clone_xprt->xp_lcladdr.maxlen = conn->c_laddr.len;

	/*
	 * In case of RDMA, connection management is
	 * entirely done in rpcib module and netid in the
	 * SVCMASTERXPRT is NULL. Initialize the clone netid
	 * from the connection.
	 */

	clone_xprt->xp_netid = conn->c_netid;

	clone_xprt->xp_xid = xid;
	crdp->conn = conn;

	freeb(mp);

	return (TRUE);

callmsg_err:
	rdma_buf_free(conn, &crdp->rpcbuf);

cll_malloc_err:
	if (cl)
		clist_free(cl);
xdr_err:
	XDR_DESTROY(xdrs);

badrpc_call:
	RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	RDMA_REL_CONN(conn);
	freeb(mp);
	RSSTAT_INCR(rsbadcalls);
	return (FALSE);
}

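/*
 * Encode a reply that is too large to fit inline. The encoded reply is
 * RDMA_WRITEn to the client using the reply (write) chunk list the client
 * provided in the call.
 */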
static int
svc_process_long_reply(SVCXPRT * clone_xprt,
    xdrproc_t xdr_results, caddr_t xdr_location,
    struct rpc_msg *msg, bool_t has_args, int *msglen,
    int *freelen, int *numchunks, unsigned int *final_len)
{
	int status;
	XDR xdrslong;
	struct clist *wcl = NULL;
	int count = 0;
	int alloc_len;
	char  *memp;
	rdma_buf_t long_rpc = {0};
	struct clone_rdma_data *crdp;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;

	bzero(&xdrslong, sizeof (xdrslong));

	/* Choose a size for the long rpc response */
	if (MSG_IS_RPCSEC_GSS(msg)) {
		alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen);
	} else {
		alloc_len = RNDUP(*msglen);
	}

	if (alloc_len <= 64 * 1024) {
		if (alloc_len > 32 * 1024) {
			alloc_len = 64 * 1024;
		} else {
			if (alloc_len > 16 * 1024) {
				alloc_len = 32 * 1024;
			} else {
				alloc_len = 16 * 1024;
			}
		}
	}

	long_rpc.type = RDMA_LONG_BUFFER;
	long_rpc.len = alloc_len;
	if (rdma_buf_alloc(crdp->conn, &long_rpc)) {
		return (SVC_RDMA_FAIL);
	}

	memp = long_rpc.addr;
	xdrmem_create(&xdrslong, memp, alloc_len, XDR_ENCODE);

	msg->rm_xid = clone_xprt->xp_xid;

	if (!(xdr_replymsg(&xdrslong, msg) &&
	    (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, &xdrslong,
	    xdr_results, xdr_location)))) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__authwrap);
		return (SVC_RDMA_FAIL);
	}

	*final_len = XDR_GETPOS(&xdrslong);

	DTRACE_PROBE1(krpc__i__replylen, uint_t, *final_len);
	*numchunks = 0;
	*freelen = 0;

	wcl = crdp->cl_reply;
	wcl->rb_longbuf = long_rpc;

	count = *final_len;
	while ((wcl != NULL) && (count > 0)) {

		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;

		DTRACE_PROBE2(krpc__i__write__chunks, uint32_t, count,
		    uint32_t, wcl->c_len);

		if (wcl->c_len > count) {
			wcl->c_len = count;
		}
		wcl->w.c_saddr3 = (caddr_t)memp;

		count -= wcl->c_len;
		*numchunks += 1;
		memp += wcl->c_len;
		wcl = wcl->c_next;
	}

	/*
	 * Make rest of the chunks 0-len
	 */
	while (wcl != NULL) {
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;
		wcl->c_len = 0;
		wcl = wcl->c_next;
	}

	wcl = crdp->cl_reply;

	/*
	 * MUST fail if there is still more data
	 */
	if (count > 0) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__dlen__clist);
		return (SVC_RDMA_FAIL);
	}

	if (clist_register(crdp->conn, wcl, CLIST_REG_SOURCE) != RDMA_SUCCESS) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__clistreg);
		return (SVC_RDMA_FAIL);
	}

	status = clist_syncmem(crdp->conn, wcl, CLIST_REG_SOURCE);

	if (status) {
		(void) clist_deregister(crdp->conn, wcl);
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__syncmem);
		return (SVC_RDMA_FAIL);
	}

	status = RDMA_WRITE(crdp->conn, wcl, WAIT);

	(void) clist_deregister(crdp->conn, wcl);
	rdma_buf_free(crdp->conn, &wcl->rb_longbuf);

	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__longrep__write);
		return (SVC_RDMA_FAIL);
	}

	return (SVC_RDMA_SUCCESS);
}


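/*
 * Encode the RPC reply header and results into a pre-allocated SEND
 * buffer for an inline (RDMA_MSG) response.
 */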
static int
svc_compose_rpcmsg(SVCXPRT * clone_xprt, CONN * conn, xdrproc_t xdr_results,
    caddr_t xdr_location, rdma_buf_t *rpcreply, XDR ** xdrs,
    struct rpc_msg *msg, bool_t has_args, uint_t *len)
{
	/*
	 * Get a pre-allocated buffer for rpc reply
	 */
	rpcreply->type = SEND_BUFFER;
	if (rdma_buf_alloc(conn, rpcreply)) {
		DTRACE_PROBE(krpc__e__svcrdma__rpcmsg__reply__nofreebufs);
		return (SVC_RDMA_FAIL);
	}

	xdrrdma_create(*xdrs, rpcreply->addr, rpcreply->len,
	    0, NULL, XDR_ENCODE, conn);

	msg->rm_xid = clone_xprt->xp_xid;

	if (has_args) {
		if (!(xdr_replymsg(*xdrs, msg) &&
		    (!has_args ||
		    SVCAUTH_WRAP(&clone_xprt->xp_auth, *xdrs,
		    xdr_results, xdr_location)))) {
			rdma_buf_free(conn, rpcreply);
			DTRACE_PROBE(
			    krpc__e__svcrdma__rpcmsg__reply__authwrap1);
			return (SVC_RDMA_FAIL);
		}
	} else {
		if (!xdr_replymsg(*xdrs, msg)) {
			rdma_buf_free(conn, rpcreply);
			DTRACE_PROBE(
			    krpc__e__svcrdma__rpcmsg__reply__authwrap2);
			return (SVC_RDMA_FAIL);
		}
	}

	*len = XDR_GETPOS(*xdrs);

	return (SVC_RDMA_SUCCESS);
}

/*
 * Send rpc reply.
 */
static bool_t
svc_rdma_ksend(SVCXPRT * clone_xprt, struct rpc_msg *msg)
{
	XDR *xdrs_rpc = &(clone_xprt->xp_xdrout);
	XDR xdrs_rhdr;
	CONN *conn = NULL;
	rdma_buf_t rbuf_resp = {0}, rbuf_rpc_resp = {0};

	struct clone_rdma_data *crdp;
	struct clist *cl_read = NULL;
	struct clist *cl_send = NULL;
	struct clist *cl_write = NULL;
	xdrproc_t xdr_results;		/* results XDR encoding function */
	caddr_t xdr_location;		/* response results pointer */

	int retval = FALSE;
	int status, msglen, num_wreply_segments = 0;
	uint32_t rdma_credit = 0;
	int freelen = 0;
	bool_t has_args;
	uint_t  final_resp_len, rdma_response_op, vers;

	bzero(&xdrs_rhdr, sizeof (XDR));
	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	conn = crdp->conn;

	/*
	 * If there is a result procedure specified in the reply message,
	 * it will be processed in the xdr_replymsg and SVCAUTH_WRAP.
	 * We need to make sure it won't be processed twice, so we null
	 * it for xdr_replymsg here.
	 */
	has_args = FALSE;
	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
		if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) {
			has_args = TRUE;
			xdr_location = msg->acpted_rply.ar_results.where;
			msg->acpted_rply.ar_results.proc = xdr_void;
			msg->acpted_rply.ar_results.where = NULL;
		}
	}

	/*
	 * Given the limit on the inline response size (RPC_MSG_SZ),
	 * there is a need to make a guess as to the overall size of
	 * the response.  If the resultant size is beyond the inline
	 * size, then the server needs to use the "reply chunk list"
	 * provided by the client (if the client provided one).  An
	 * example of this type of response would be a READDIR
	 * response (e.g. a small directory read would fit in RPC_MSG_SZ
	 * and that is the preference, but it may not fit).
	 *
	 * Combine the encoded size and the size of the true results
	 * and then make the decision about where to encode and send results.
	 *
	 * One important note, this calculation is ignoring the size
	 * of the encoding of the authentication overhead.  The reason
	 * for this is rooted in the complexities of access to the
	 * encoded size of RPCSEC_GSS related authentication,
	 * integrity, and privacy.
	 *
	 * If it turns out that the encoded authentication bumps the
	 * response over the RPC_MSG_SZ limit, then it may need to
	 * attempt to encode for the reply chunk list.
	 */

	/*
	 * Calculating the "sizeof" the RPC response header and the
	 * encoded results.
	 */
	msglen = xdr_sizeof(xdr_replymsg, msg);

	if (msglen > 0) {
		RSSTAT_INCR(rstotalreplies);
	}
	if (has_args)
		msglen += xdrrdma_sizeof(xdr_results, xdr_location,
		    rdma_minchunk, NULL, NULL);

	DTRACE_PROBE1(krpc__i__svcrdma__ksend__msglen, int, msglen);

	status = SVC_RDMA_SUCCESS;

	if (msglen < RPC_MSG_SZ) {
		/*
		 * Looks like the response will fit in the inline
		 * response; let's try
		 */
		RSSTAT_INCR(rstotalinlinereplies);

		rdma_response_op = RDMA_MSG;

		status = svc_compose_rpcmsg(clone_xprt, conn, xdr_results,
		    xdr_location, &rbuf_rpc_resp, &xdrs_rpc, msg,
		    has_args, &final_resp_len);

		DTRACE_PROBE1(krpc__i__srdma__ksend__compose_status,
		    int, status);
		DTRACE_PROBE1(krpc__i__srdma__ksend__compose_len,
		    int, final_resp_len);

		if (status == SVC_RDMA_SUCCESS && crdp->cl_reply) {
			clist_free(crdp->cl_reply);
			crdp->cl_reply = NULL;
		}
	}

	/*
	 * If the encode failed (size?) or the message really is
	 * larger than what is allowed, try the response chunk list.
	 */
	if (status != SVC_RDMA_SUCCESS || msglen >= RPC_MSG_SZ) {
		/*
		 * attempting to use a reply chunk list when there
		 * isn't one won't get very far...
		 */
		if (crdp->cl_reply == NULL) {
			DTRACE_PROBE(krpc__e__svcrdma__ksend__noreplycl);
			goto out;
		}

		RSSTAT_INCR(rstotallongreplies);

		msglen = xdr_sizeof(xdr_replymsg, msg);
		msglen += xdrrdma_sizeof(xdr_results, xdr_location, 0,
		    NULL, NULL);

		status = svc_process_long_reply(clone_xprt, xdr_results,
		    xdr_location, msg, has_args, &msglen, &freelen,
		    &num_wreply_segments, &final_resp_len);

		DTRACE_PROBE1(krpc__i__svcrdma__ksend__longreplen,
		    int, final_resp_len);

		if (status != SVC_RDMA_SUCCESS) {
			DTRACE_PROBE(krpc__e__svcrdma__ksend__compose__failed);
			goto out;
		}

		rdma_response_op = RDMA_NOMSG;
	}

	DTRACE_PROBE1(krpc__i__svcrdma__ksend__rdmamsg__len,
	    int, final_resp_len);

	rbuf_resp.type = SEND_BUFFER;
	if (rdma_buf_alloc(conn, &rbuf_resp)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		DTRACE_PROBE(krpc__e__svcrdma__ksend__nofreebufs);
		goto out;
	}

	rdma_credit = rdma_bufs_granted;

	vers = RPCRDMA_VERS;
	xdrmem_create(&xdrs_rhdr, rbuf_resp.addr, rbuf_resp.len, XDR_ENCODE);
	(*(uint32_t *)rbuf_resp.addr) = msg->rm_xid;
	/* Skip xid and set the xdr position accordingly. */
	XDR_SETPOS(&xdrs_rhdr, sizeof (uint32_t));
	if (!xdr_u_int(&xdrs_rhdr, &vers) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_credit) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_response_op)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		DTRACE_PROBE(krpc__e__svcrdma__ksend__uint);
		goto out;
	}

	/*
	 * Now XDR the read chunk list, actually always NULL
	 */
	(void) xdr_encode_rlist_svc(&xdrs_rhdr, cl_read);

	/*
	 * encode write list -- we already drove RDMA_WRITEs
	 */
	cl_write = crdp->cl_wlist;
	if (!xdr_encode_wlist(&xdrs_rhdr, cl_write)) {
		DTRACE_PROBE(krpc__e__svcrdma__ksend__enc__wlist);
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	/*
	 * XDR encode the RDMA_REPLY write chunk
	 */
	if (!xdr_encode_reply_wchunk(&xdrs_rhdr, crdp->cl_reply,
	    num_wreply_segments)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	clist_add(&cl_send, 0, XDR_GETPOS(&xdrs_rhdr), &rbuf_resp.handle,
	    rbuf_resp.addr, NULL, NULL);

	if (rdma_response_op == RDMA_MSG) {
		clist_add(&cl_send, 0, final_resp_len, &rbuf_rpc_resp.handle,
		    rbuf_rpc_resp.addr, NULL, NULL);
	}

	status = RDMA_SEND(conn, cl_send, msg->rm_xid);

	if (status == RDMA_SUCCESS) {
		retval = TRUE;
	}

out:
	/*
	 * Free up sendlist chunks
	 */
	if (cl_send != NULL)
		clist_free(cl_send);

	/*
	 * Destroy private data for xdr rdma
	 */
	if (clone_xprt->xp_xdrout.x_ops != NULL) {
		XDR_DESTROY(&(clone_xprt->xp_xdrout));
	}

	if (crdp->cl_reply) {
		clist_free(crdp->cl_reply);
		crdp->cl_reply = NULL;
	}

	/*
	 * This is completely disgusting.  If public is set it is
	 * a pointer to a structure whose first field is the address
	 * of the function to free that structure and any related
	 * stuff.  (see rrokfree in nfs_xdr.c).
	 */
	if (xdrs_rpc->x_public) {
		/* LINTED pointer alignment */
		(**((int (**)()) xdrs_rpc->x_public)) (xdrs_rpc->x_public);
	}

	if (xdrs_rhdr.x_ops != NULL) {
		XDR_DESTROY(&xdrs_rhdr);
	}

	return (retval);
}

/*
 * Deserialize arguments.
 */
static bool_t
svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr)
{
	if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin,
	    xdr_args, args_ptr)) != TRUE)
		return (FALSE);
	return (TRUE);
}

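/*
 * Free the argument data and drop the per-request transport resources:
 * the XDR stream, the RPC buffer, any reply chunk list, and the
 * connection reference (unless they were handed off to a cloned xprt).
 */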
static bool_t
svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args,
    caddr_t args_ptr)
{
	struct clone_rdma_data *crdp;
	bool_t retval = TRUE;

	/*
	 * If the cloned bit is true, then this transport specific
	 * rdma data has been duplicated into another cloned xprt. Do
	 * not free it or release the connection; it is still in use.  The
	 * buffers will be freed and the connection released later by
	 * SVC_CLONE_DESTROY().
	 */
	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	if (crdp->cloned == TRUE) {
		crdp->cloned = 0;
		return (TRUE);
	}

	/*
	 * Free the args if needed then XDR_DESTROY
	 */
	if (args_ptr) {
		XDR	*xdrs = &clone_xprt->xp_xdrin;

		xdrs->x_op = XDR_FREE;
		retval = (*xdr_args)(xdrs, args_ptr);
	}

	XDR_DESTROY(&(clone_xprt->xp_xdrin));
	rdma_buf_free(crdp->conn, &crdp->rpcbuf);
	if (crdp->cl_reply) {
		clist_free(crdp->cl_reply);
		crdp->cl_reply = NULL;
	}
	RDMA_REL_CONN(crdp->conn);

	return (retval);
}

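/*
 * Pre-serialized response buffers are not supported for RDMA, so
 * svc_rdma_kgetres() always returns NULL and svc_rdma_kfreeres() is a no-op.
 */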
/* ARGSUSED */
static int32_t *
svc_rdma_kgetres(SVCXPRT *clone_xprt, int size)
{
	return (NULL);
}

/* ARGSUSED */
static void
svc_rdma_kfreeres(SVCXPRT *clone_xprt)
{
}

/*
 * The dup caching routines below provide a cache of non-failure
 * transaction id's.  rpc service routines can use this to detect
 * retransmissions and re-send a non-failure response.
 */

/*
 * MAXDUPREQS is the number of cached items.  It should be adjusted
 * to the service load so that there is likely to be a response entry
 * when the first retransmission comes in.
 */
#define	MAXDUPREQS	8192

/*
 * This should be appropriately scaled to MAXDUPREQS.  To keep collisions
 * as low as possible it is suggested to set this to a prime number.
 */
#define	DRHASHSZ	2053

#define	XIDHASH(xid)	((xid) % DRHASHSZ)
#define	DRHASH(dr)	XIDHASH((dr)->dr_xid)
#define	REQTOXID(req)	((req)->rq_xprt->xp_xid)

static int	rdmandupreqs = 0;
int	rdmamaxdupreqs = MAXDUPREQS;
static kmutex_t rdmadupreq_lock;
static struct dupreq *rdmadrhashtbl[DRHASHSZ];
static int	rdmadrhashstat[DRHASHSZ];

static void unhash(struct dupreq *);

/*
 * rdmadrmru points to the head of a circular linked list in lru order.
 * rdmadrmru->dr_next == drlru
 */
struct dupreq *rdmadrmru;

/*
 * svc_rdma_kdup searches the request cache and returns 0 if the
 * request is not found in the cache.  If it is found, then it
 * returns the state of the request (in progress or done) and
 * the status or attributes that were part of the original reply.
 */
static int
svc_rdma_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp,
    bool_t *dupcachedp)
{
	struct dupreq *dr;
	uint32_t xid;
	uint32_t drhash;
	int status;

	xid = REQTOXID(req);
	mutex_enter(&rdmadupreq_lock);
	RSSTAT_INCR(rsdupchecks);
	/*
	 * Check to see whether an entry already exists in the cache.
	 */
	dr = rdmadrhashtbl[XIDHASH(xid)];
	while (dr != NULL) {
		if (dr->dr_xid == xid &&
		    dr->dr_proc == req->rq_proc &&
		    dr->dr_prog == req->rq_prog &&
		    dr->dr_vers == req->rq_vers &&
		    dr->dr_addr.len == req->rq_xprt->xp_rtaddr.len &&
		    bcmp((caddr_t)dr->dr_addr.buf,
		    (caddr_t)req->rq_xprt->xp_rtaddr.buf,
		    dr->dr_addr.len) == 0) {
			status = dr->dr_status;
			if (status == DUP_DONE) {
				bcopy(dr->dr_resp.buf, res, size);
				if (dupcachedp != NULL)
					*dupcachedp = (dr->dr_resfree != NULL);
			} else {
				dr->dr_status = DUP_INPROGRESS;
				*drpp = dr;
			}
			RSSTAT_INCR(rsdupreqs);
			mutex_exit(&rdmadupreq_lock);
			return (status);
		}
		dr = dr->dr_chain;
	}

	/*
	 * There wasn't an entry, either allocate a new one or recycle
	 * an old one.
	 */
	if (rdmandupreqs < rdmamaxdupreqs) {
		dr = kmem_alloc(sizeof (*dr), KM_NOSLEEP);
		if (dr == NULL) {
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
		dr->dr_resp.buf = NULL;
		dr->dr_resp.maxlen = 0;
		dr->dr_addr.buf = NULL;
		dr->dr_addr.maxlen = 0;
		if (rdmadrmru) {
			dr->dr_next = rdmadrmru->dr_next;
			rdmadrmru->dr_next = dr;
		} else {
			dr->dr_next = dr;
		}
		rdmandupreqs++;
	} else {
		dr = rdmadrmru->dr_next;
		while (dr->dr_status == DUP_INPROGRESS) {
			dr = dr->dr_next;
			if (dr == rdmadrmru->dr_next) {
				mutex_exit(&rdmadupreq_lock);
				return (DUP_ERROR);
			}
		}
		unhash(dr);
		if (dr->dr_resfree) {
			(*dr->dr_resfree)(dr->dr_resp.buf);
		}
	}
	dr->dr_resfree = NULL;
	rdmadrmru = dr;

	dr->dr_xid = REQTOXID(req);
	dr->dr_prog = req->rq_prog;
	dr->dr_vers = req->rq_vers;
	dr->dr_proc = req->rq_proc;
	if (dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) {
		if (dr->dr_addr.buf != NULL)
			kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen);
		dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len;
		dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP);
		if (dr->dr_addr.buf == NULL) {
			dr->dr_addr.maxlen = 0;
			dr->dr_status = DUP_DROP;
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
	}
	dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len;
	bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len);
	if (dr->dr_resp.maxlen < size) {
		if (dr->dr_resp.buf != NULL)
			kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen);
		dr->dr_resp.maxlen = (unsigned int)size;
		dr->dr_resp.buf = kmem_alloc(size, KM_NOSLEEP);
		if (dr->dr_resp.buf == NULL) {
			dr->dr_resp.maxlen = 0;
			dr->dr_status = DUP_DROP;
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
	}
	dr->dr_status = DUP_INPROGRESS;

	drhash = (uint32_t)DRHASH(dr);
	dr->dr_chain = rdmadrhashtbl[drhash];
	rdmadrhashtbl[drhash] = dr;
	rdmadrhashstat[drhash]++;
	mutex_exit(&rdmadupreq_lock);
	*drpp = dr;
	return (DUP_NEW);
}

/*
 * svc_rdma_kdupdone marks the request done (DUP_DONE or DUP_DROP)
 * and stores the response.
 */
static void
svc_rdma_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(),
    int size, int status)
{
	ASSERT(dr->dr_resfree == NULL);
	if (status == DUP_DONE) {
		bcopy(res, dr->dr_resp.buf, size);
		dr->dr_resfree = dis_resfree;
	}
	dr->dr_status = status;
}

/*
 * This routine expects that the mutex, rdmadupreq_lock, is already held.
 */
static void
unhash(struct dupreq *dr)
{
	struct dupreq *drt;
	struct dupreq *drtprev = NULL;
	uint32_t drhash;

	ASSERT(MUTEX_HELD(&rdmadupreq_lock));

	drhash = (uint32_t)DRHASH(dr);
	drt = rdmadrhashtbl[drhash];
	while (drt != NULL) {
		if (drt == dr) {
			rdmadrhashstat[drhash]--;
			if (drtprev == NULL) {
				rdmadrhashtbl[drhash] = drt->dr_chain;
			} else {
				drtprev->dr_chain = drt->dr_chain;
			}
			return;
		}
		drtprev = drt;
		drt = drt->dr_chain;
	}
}

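/*
 * Build an iovec covering the write chunk list supplied by the client:
 * the address of the first segment and the total length of all segments.
 * Registration of the buffer is deferred until the data is RDMA_WRITEn
 * back to the client.
 */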
bool_t
rdma_get_wchunk(struct svc_req *req, iovec_t *iov, struct clist *wlist)
{
	struct clist	*clist;
	uint32_t	tlen;

	if (req->rq_xprt->xp_type != T_RDMA) {
		return (FALSE);
	}

	tlen = 0;
	clist = wlist;
	while (clist) {
		tlen += clist->c_len;
		clist = clist->c_next;
	}

	/*
	 * set iov to addr+len of first segment of first wchunk of
	 * wlist sent by client.  krecv() already malloc'd a buffer
	 * large enough, but registration is deferred until we write
	 * the buffer back to (NFS) client using RDMA_WRITE.
	 */
	iov->iov_base = (caddr_t)(uintptr_t)wlist->w.c_saddr;
	iov->iov_len = tlen;

	return (TRUE);
}

/*
 * routine to setup the read chunk lists
 */

int
rdma_setup_read_chunks(struct clist *wcl, uint32_t count, int *wcl_len)
{
	int		data_len, avail_len;
	uint_t		round_len;

	data_len = avail_len = 0;

	while (wcl != NULL && count > 0) {
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;

		if (wcl->c_len < count) {
			data_len += wcl->c_len;
			avail_len = 0;
		} else {
			data_len += count;
			avail_len = wcl->c_len - count;
			wcl->c_len = count;
		}
		count -= wcl->c_len;

		if (count == 0)
			break;

		wcl = wcl->c_next;
	}

	/*
	 * MUST fail if there is still more data
	 */
	if (count > 0) {
		DTRACE_PROBE2(krpc__e__rdma_setup_read_chunks_clist_len,
		    int, data_len, int, count);
		return (FALSE);
	}

	/*
	 * Round up the last chunk to 4-byte boundary
	 */
	*wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT);
	round_len = *wcl_len - data_len;

	if (round_len) {

		/*
		 * If there is space in the current chunk,
		 * add the roundup to the chunk.
		 */
		if (avail_len >= round_len) {
			wcl->c_len += round_len;
		} else {
			/*
			 * try the next one.
			 */
			wcl = wcl->c_next;
			if ((wcl == NULL) || (wcl->c_len < round_len)) {
				DTRACE_PROBE1(
				    krpc__e__rdma_setup_read_chunks_rndup,
				    int, round_len);
				return (FALSE);
			}
			wcl->c_len = round_len;
		}
	}

	wcl = wcl->c_next;

	/*
	 * Make rest of the chunks 0-len
	 */

	clist_zero_len(wcl);

	return (TRUE);
}
