xref: /illumos-gate/usr/src/uts/common/io/ib/clients/rds/rdsib_buf.c (revision 00a3eaf3896a33935e11fd5c5fb5c1714225c067)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /*
26  * Copyright (c) 2005 SilverStorm Technologies, Inc. All rights reserved.
27  *
28  * This software is available to you under a choice of one of two
29  * licenses.  You may choose to be licensed under the terms of the GNU
30  * General Public License (GPL) Version 2, available from the file
31  * COPYING in the main directory of this source tree, or the
32  * OpenIB.org BSD license below:
33  *
34  *     Redistribution and use in source and binary forms, with or
35  *     without modification, are permitted provided that the following
36  *     conditions are met:
37  *
38  *	- Redistributions of source code must retain the above
39  *	  copyright notice, this list of conditions and the following
40  *	  disclaimer.
41  *
42  *	- Redistributions in binary form must reproduce the above
43  *	  copyright notice, this list of conditions and the following
44  *	  disclaimer in the documentation and/or other materials
45  *	  provided with the distribution.
46  *
47  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
48  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
49  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
50  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
51  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
52  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
53  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
54  * SOFTWARE.
55  *
56  */
57 /*
58  * Sun elects to include this software in Sun product
59  * under the OpenIB BSD license.
60  *
61  *
62  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
63  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
64  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
65  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
66  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
67  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
68  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
69  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
70  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
71  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
72  * POSSIBILITY OF SUCH DAMAGE.
73  */
74 
75 #include <sys/ib/clients/rds/rdsib_cm.h>
76 #include <sys/ib/clients/rds/rdsib_ib.h>
77 #include <sys/ib/clients/rds/rdsib_buf.h>
78 #include <sys/ib/clients/rds/rdsib_ep.h>
79 #include <sys/ib/clients/rds/rds_kstat.h>
80 
81 /*
82  * This File contains the buffer management code
83  */
84 
/*
 * Trace the current values of the RDS tunables through RDS_DPRINTF3.
 *
 * The body is wrapped in do { } while (0) so that the macro expands to a
 * single statement. Without the wrapper only the first trace call would be
 * governed by an enclosing unbraced if/else or loop.
 */
#define	DUMP_USER_PARAMS()	\
	do { \
		RDS_DPRINTF3(LABEL, "MaxNodes = %d", MaxNodes); \
		RDS_DPRINTF3(LABEL, "UserBufferSize = %d", UserBufferSize); \
		RDS_DPRINTF3(LABEL, "RdsPktSize = %d", RdsPktSize); \
		RDS_DPRINTF3(LABEL, "MaxDataSendBuffers = %d", \
		    MaxDataSendBuffers); \
		RDS_DPRINTF3(LABEL, "MaxDataRecvBuffers = %d", \
		    MaxDataRecvBuffers); \
		RDS_DPRINTF3(LABEL, "MaxCtrlSendBuffers = %d", \
		    MaxCtrlSendBuffers); \
		RDS_DPRINTF3(LABEL, "MaxCtrlRecvBuffers = %d", \
		    MaxCtrlRecvBuffers); \
		RDS_DPRINTF3(LABEL, "DataRecvBufferLWM = %d", \
		    DataRecvBufferLWM); \
		RDS_DPRINTF3(LABEL, "PendingRxPktsHWM = %d", \
		    PendingRxPktsHWM); \
		RDS_DPRINTF3(LABEL, "MinRnrRetry = %d", MinRnrRetry); \
	} while (0)
96 
97 static void
98 rds_free_mblk(char *arg)
99 {
100 	rds_buf_t *bp = (rds_buf_t *)(uintptr_t)arg;
101 
102 	/* Free the recv buffer */
103 	RDS_DPRINTF4("rds_free_mblk", "Enter: BP(%p)", bp);
104 	ASSERT(bp->buf_state == RDS_RCVBUF_ONSOCKQ);
105 	rds_free_recv_buf(bp, 1);
106 	RDS_DECR_RXPKTS_PEND(1);
107 	RDS_DPRINTF4("rds_free_mblk", "Return: BP(%p)", bp);
108 }
109 
110 void
111 rds_free_recv_caches(rds_state_t *statep)
112 {
113 	rds_hca_t	*hcap;
114 	int		ret;
115 
116 	RDS_DPRINTF4("rds_free_recv_caches", "Enter");
117 
118 	mutex_enter(&rds_dpool.pool_lock);
119 	if (rds_dpool.pool_memp == NULL) {
120 		RDS_DPRINTF2("rds_free_recv_caches", "Caches are empty");
121 		mutex_exit(&rds_dpool.pool_lock);
122 		return;
123 	}
124 
125 	/*
126 	 * All buffers must have been freed as all sessions are closed
127 	 * and destroyed
128 	 */
129 	ASSERT(rds_dpool.pool_nbusy == 0);
130 	RDS_DPRINTF2("rds_free_recv_caches", "Data Pool has "
131 	    "pending buffers: %d", rds_dpool.pool_nbusy);
132 	while (rds_dpool.pool_nbusy != 0) {
133 		mutex_exit(&rds_dpool.pool_lock);
134 		delay(drv_usectohz(1000000));
135 		mutex_enter(&rds_dpool.pool_lock);
136 	}
137 
138 	hcap = statep->rds_hcalistp;
139 	while (hcap != NULL) {
140 		if (hcap->hca_mrhdl != NULL) {
141 			ret = ibt_deregister_mr(hcap->hca_hdl,
142 			    hcap->hca_mrhdl);
143 			if (ret == IBT_SUCCESS) {
144 				hcap->hca_mrhdl = NULL;
145 				hcap->hca_lkey = 0;
146 				hcap->hca_rkey = 0;
147 			} else {
148 				RDS_DPRINTF2(LABEL, "ibt_deregister_mr "
149 				    "failed: %d, mrhdl: 0x%p", ret,
150 				    hcap->hca_mrhdl);
151 			}
152 		}
153 		hcap = hcap->hca_nextp;
154 	}
155 
156 	kmem_free(rds_dpool.pool_bufmemp, (rds_dpool.pool_nbuffers +
157 	    rds_cpool.pool_nbuffers) * sizeof (rds_buf_t));
158 	rds_dpool.pool_bufmemp = NULL;
159 
160 	kmem_free(rds_dpool.pool_memp, rds_dpool.pool_memsize);
161 	rds_dpool.pool_memp = NULL;
162 
163 	mutex_exit(&rds_dpool.pool_lock);
164 
165 	RDS_DPRINTF4("rds_free_recv_caches", "Return");
166 }
167 
168 int
169 rds_init_recv_caches(rds_state_t *statep)
170 {
171 	uint8_t		*mp;
172 	rds_buf_t	*bp;
173 	rds_hca_t	*hcap;
174 	uint32_t	nsessions;
175 	uint_t		ix;
176 	uint_t		nctrlrx;
177 	uint8_t		*memp;
178 	uint_t		memsize, nbuf;
179 	rds_buf_t	*bufmemp;
180 	ibt_mr_attr_t	mem_attr;
181 	ibt_mr_desc_t	mem_desc;
182 	int		ret;
183 
184 	RDS_DPRINTF4("rds_init_recv_caches", "Enter");
185 
186 	DUMP_USER_PARAMS();
187 
188 	mutex_enter(&rds_dpool.pool_lock);
189 	if (rds_dpool.pool_memp != NULL) {
190 		RDS_DPRINTF2("rds_init_recv_caches", "Pools are already "
191 		    "initialized");
192 		mutex_exit(&rds_dpool.pool_lock);
193 		return (0);
194 	}
195 
196 	/*
197 	 * High water mark for the receive buffers in the system. If the
198 	 * number of buffers used crosses this mark then all sockets in
199 	 * would be stalled. The port quota for the sockets is set based
200 	 * on this limit.
201 	 */
202 	rds_rx_pkts_pending_hwm = (PendingRxPktsHWM * NDataRX)/100;
203 
204 	/* nsessions can never be less than 1 */
205 	nsessions = MaxNodes - 1;
206 	nctrlrx = (nsessions + 1) * MaxCtrlRecvBuffers;
207 
208 	RDS_DPRINTF3(LABEL, "Number of Possible Sessions: %d", nsessions);
209 
210 	/* Add the hdr */
211 	RdsPktSize = UserBufferSize + RDS_DATA_HDR_SZ;
212 
213 	memsize = (NDataRX * RdsPktSize) + (nctrlrx * RDS_CTRLPKT_SIZE);
214 	nbuf = NDataRX + nctrlrx;
215 	RDS_DPRINTF3(LABEL, "RDS Buffer Pool Memory: %lld", memsize);
216 	RDS_DPRINTF3(LABEL, "Total Buffers: %d", nbuf);
217 
218 	memp = (uint8_t *)kmem_zalloc(memsize, KM_NOSLEEP);
219 	if (memp == NULL) {
220 		RDS_DPRINTF1(LABEL, "RDS Memory allocation failed");
221 		mutex_exit(&rds_dpool.pool_lock);
222 		return (-1);
223 	}
224 
225 	RDS_DPRINTF3(LABEL, "RDS Buffer Entries Memory: %lld",
226 	    nbuf * sizeof (rds_buf_t));
227 
228 	/* allocate memory for buffer entries */
229 	bufmemp = (rds_buf_t *)kmem_zalloc(nbuf * sizeof (rds_buf_t),
230 	    KM_SLEEP);
231 
232 	/* register the memory with all HCAs */
233 	mem_attr.mr_vaddr = (ib_vaddr_t)(uintptr_t)memp;
234 	mem_attr.mr_len = memsize;
235 	mem_attr.mr_as = NULL;
236 	mem_attr.mr_flags = IBT_MR_ENABLE_LOCAL_WRITE;
237 
238 	rw_enter(&statep->rds_hca_lock, RW_WRITER);
239 
240 	hcap = statep->rds_hcalistp;
241 	while (hcap != NULL) {
242 		if (hcap->hca_state != RDS_HCA_STATE_OPEN) {
243 			hcap = hcap->hca_nextp;
244 			continue;
245 		}
246 
247 		ret = ibt_register_mr(hcap->hca_hdl, hcap->hca_pdhdl,
248 		    &mem_attr, &hcap->hca_mrhdl, &mem_desc);
249 		if (ret != IBT_SUCCESS) {
250 			RDS_DPRINTF2(LABEL, "ibt_register_mr failed: %d", ret);
251 			hcap = statep->rds_hcalistp;
252 			while ((hcap) && (hcap->hca_mrhdl != NULL)) {
253 				ret = ibt_deregister_mr(hcap->hca_hdl,
254 				    hcap->hca_mrhdl);
255 				if (ret == IBT_SUCCESS) {
256 					hcap->hca_mrhdl = NULL;
257 					hcap->hca_lkey = 0;
258 					hcap->hca_rkey = 0;
259 				} else {
260 					RDS_DPRINTF2(LABEL, "ibt_deregister_mr "
261 					    "failed: %d, mrhdl: 0x%p", ret,
262 					    hcap->hca_mrhdl);
263 				}
264 				hcap = hcap->hca_nextp;
265 			}
266 			kmem_free(bufmemp, nbuf * sizeof (rds_buf_t));
267 			kmem_free(memp, memsize);
268 			rw_exit(&statep->rds_hca_lock);
269 			mutex_exit(&rds_dpool.pool_lock);
270 			return (-1);
271 		}
272 
273 		hcap->hca_state = RDS_HCA_STATE_MEM_REGISTERED;
274 		hcap->hca_lkey = mem_desc.md_lkey;
275 		hcap->hca_rkey = mem_desc.md_rkey;
276 
277 		hcap = hcap->hca_nextp;
278 	}
279 	rw_exit(&statep->rds_hca_lock);
280 
281 	/* Initialize data pool */
282 	rds_dpool.pool_memp = memp;
283 	rds_dpool.pool_memsize = memsize;
284 	rds_dpool.pool_bufmemp = bufmemp;
285 	rds_dpool.pool_nbuffers = NDataRX;
286 	rds_dpool.pool_nbusy = 0;
287 	rds_dpool.pool_nfree = NDataRX;
288 
289 	/* chain the buffers */
290 	mp = memp;
291 	bp = bufmemp;
292 	for (ix = 0; ix < NDataRX; ix++) {
293 		bp[ix].buf_nextp = &bp[ix + 1];
294 		bp[ix].buf_ds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
295 		bp[ix].buf_state = RDS_RCVBUF_FREE;
296 		bp[ix].buf_frtn.free_func = rds_free_mblk;
297 		bp[ix].buf_frtn.free_arg = (char *)&bp[ix];
298 		mp = mp + RdsPktSize;
299 	}
300 	bp[NDataRX - 1].buf_nextp = NULL;
301 	rds_dpool.pool_headp = &bp[0];
302 	rds_dpool.pool_tailp = &bp[NDataRX - 1];
303 
304 	/* Initialize ctrl pool */
305 	rds_cpool.pool_nbuffers = nctrlrx;
306 	rds_cpool.pool_nbusy = 0;
307 	rds_cpool.pool_nfree = nctrlrx;
308 
309 	/* chain the buffers */
310 	for (ix = NDataRX; ix < nbuf - 1; ix++) {
311 		bp[ix].buf_nextp = &bp[ix + 1];
312 		bp[ix].buf_ds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
313 		mp = mp + RDS_CTRLPKT_SIZE;
314 	}
315 	bp[nbuf - 1].buf_ds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
316 	bp[nbuf - 1].buf_nextp = NULL;
317 	rds_cpool.pool_headp = &bp[NDataRX];
318 	rds_cpool.pool_tailp = &bp[nbuf - 1];
319 
320 	mutex_exit(&rds_dpool.pool_lock);
321 
322 	RDS_DPRINTF3(LABEL, "rdsmemp start: %p end: %p", memp, mp);
323 	RDS_DPRINTF4("rds_init_recv_caches", "Return");
324 	return (0);
325 }
326 
/*
 * Prototype of the HCA lookup routine used by rds_free_send_pool() below
 * to find the rds_hca_t for a given HCA GUID; its definition is not in
 * this part of the file.
 */
rds_hca_t *rds_lkup_hca(ib_guid_t hca_guid);
328 
329 void
330 rds_free_send_pool(rds_ep_t *ep)
331 {
332 	rds_bufpool_t   *pool;
333 	rds_hca_t	*hcap;
334 	int		ret;
335 
336 	pool = &ep->ep_sndpool;
337 
338 	mutex_enter(&pool->pool_lock);
339 	if (pool->pool_memp == NULL) {
340 		mutex_exit(&pool->pool_lock);
341 		RDS_DPRINTF2("rds_free_send_pool",
342 		    "EP(%p) DOUBLE Free on Send Pool", ep);
343 		return;
344 	}
345 
346 	/* get the hcap for the HCA hosting this channel */
347 	hcap = rds_lkup_hca(ep->ep_hca_guid);
348 	if (hcap == NULL) {
349 		RDS_DPRINTF2("rds_free_send_pool", "HCA (0x%llx) not found",
350 		    ep->ep_hca_guid);
351 	} else {
352 		ret = ibt_deregister_mr(hcap->hca_hdl, ep->ep_snd_mrhdl);
353 		if (ret != IBT_SUCCESS) {
354 			RDS_DPRINTF2(LABEL,
355 			    "ibt_deregister_mr failed: %d, mrhdl: 0x%p",
356 			    ret, ep->ep_snd_mrhdl);
357 		}
358 
359 		if (ep->ep_ack_addr) {
360 			ret = ibt_deregister_mr(hcap->hca_hdl, ep->ep_ackhdl);
361 			if (ret != IBT_SUCCESS) {
362 				RDS_DPRINTF2(LABEL,
363 				    "ibt_deregister_mr ackhdl failed: %d, "
364 				    "mrhdl: 0x%p", ret, ep->ep_ackhdl);
365 			}
366 
367 			kmem_free((void *)ep->ep_ack_addr, sizeof (uintptr_t));
368 			ep->ep_ack_addr = NULL;
369 		}
370 	}
371 
372 	kmem_free(pool->pool_memp, pool->pool_memsize);
373 	kmem_free(pool->pool_bufmemp,
374 	    pool->pool_nbuffers * sizeof (rds_buf_t));
375 	pool->pool_memp = NULL;
376 	pool->pool_bufmemp = NULL;
377 	mutex_exit(&pool->pool_lock);
378 }
379 
380 int
381 rds_init_send_pool(rds_ep_t *ep, ib_guid_t hca_guid)
382 {
383 	uint8_t		*mp;
384 	rds_buf_t	*bp;
385 	rds_hca_t	*hcap;
386 	uint_t		ix, rcv_len;
387 	ibt_mr_attr_t   mem_attr;
388 	ibt_mr_desc_t   mem_desc;
389 	uint8_t		*memp;
390 	rds_buf_t	*bufmemp;
391 	uintptr_t	ack_addr = NULL;
392 	uint_t		memsize;
393 	uint_t		nbuf;
394 	rds_bufpool_t   *spool;
395 	rds_data_hdr_t	*pktp;
396 	int		ret;
397 
398 	RDS_DPRINTF2("rds_init_send_pool", "Enter");
399 
400 	spool = &ep->ep_sndpool;
401 
402 	ASSERT(spool->pool_memp == NULL);
403 	ASSERT(ep->ep_hca_guid == 0);
404 
405 	/* get the hcap for the HCA hosting this channel */
406 	hcap = rds_get_hcap(rdsib_statep, hca_guid);
407 	if (hcap == NULL) {
408 		RDS_DPRINTF2("rds_init_send_pool", "HCA (0x%llx) not found",
409 		    hca_guid);
410 		return (-1);
411 	}
412 
413 	if (ep->ep_type == RDS_EP_TYPE_DATA) {
414 		spool->pool_nbuffers = MaxDataSendBuffers;
415 		spool->pool_nbusy = 0;
416 		spool->pool_nfree = MaxDataSendBuffers;
417 		memsize = (MaxDataSendBuffers * RdsPktSize) +
418 		    sizeof (uintptr_t);
419 		rcv_len = RdsPktSize;
420 	} else {
421 		spool->pool_nbuffers = MaxCtrlSendBuffers;
422 		spool->pool_nbusy = 0;
423 		spool->pool_nfree = MaxCtrlSendBuffers;
424 		memsize = MaxCtrlSendBuffers * RDS_CTRLPKT_SIZE;
425 		rcv_len = RDS_CTRLPKT_SIZE;
426 	}
427 	nbuf = spool->pool_nbuffers;
428 
429 	RDS_DPRINTF3(LABEL, "RDS Send Pool Memory: %lld", memsize);
430 
431 	memp = (uint8_t *)kmem_zalloc(memsize, KM_NOSLEEP);
432 	if (memp == NULL) {
433 		RDS_DPRINTF1(LABEL, "RDS Send Memory allocation failed");
434 		return (-1);
435 	}
436 
437 	RDS_DPRINTF3(LABEL, "RDS Buffer Entries Memory: %lld",
438 	    nbuf * sizeof (rds_buf_t));
439 
440 	/* allocate memory for buffer entries */
441 	bufmemp = (rds_buf_t *)kmem_zalloc(nbuf * sizeof (rds_buf_t),
442 	    KM_SLEEP);
443 
444 	if (ep->ep_type == RDS_EP_TYPE_DATA) {
445 		ack_addr = (uintptr_t)kmem_zalloc(sizeof (uintptr_t), KM_SLEEP);
446 
447 		/* register the memory with the HCA for this channel */
448 		mem_attr.mr_vaddr = (ib_vaddr_t)ack_addr;
449 		mem_attr.mr_len = sizeof (uintptr_t);
450 		mem_attr.mr_as = NULL;
451 		mem_attr.mr_flags = IBT_MR_SLEEP | IBT_MR_ENABLE_LOCAL_WRITE |
452 		    IBT_MR_ENABLE_REMOTE_WRITE;
453 
454 		ret = ibt_register_mr(hcap->hca_hdl, hcap->hca_pdhdl,
455 		    &mem_attr, &ep->ep_ackhdl, &mem_desc);
456 		if (ret != IBT_SUCCESS) {
457 			RDS_DPRINTF2("rds_init_send_pool",
458 			    "EP(%p): ibt_register_mr for ack failed: %d",
459 			    ep, ret);
460 			kmem_free(memp, memsize);
461 			kmem_free(bufmemp, nbuf * sizeof (rds_buf_t));
462 			kmem_free((void *)ack_addr, sizeof (uintptr_t));
463 			return (-1);
464 		}
465 		ep->ep_ack_rkey = mem_desc.md_rkey;
466 		ep->ep_ack_addr = ack_addr;
467 	}
468 
469 	/* register the memory with the HCA for this channel */
470 	mem_attr.mr_vaddr = (ib_vaddr_t)(uintptr_t)memp;
471 	mem_attr.mr_len = memsize;
472 	mem_attr.mr_as = NULL;
473 	mem_attr.mr_flags = IBT_MR_SLEEP | IBT_MR_ENABLE_LOCAL_WRITE;
474 
475 	ret = ibt_register_mr(hcap->hca_hdl, hcap->hca_pdhdl,
476 	    &mem_attr, &ep->ep_snd_mrhdl, &mem_desc);
477 	if (ret != IBT_SUCCESS) {
478 		RDS_DPRINTF2("rds_init_send_pool", "EP(%p): ibt_register_mr "
479 		    "failed: %d", ep, ret);
480 		kmem_free(memp, memsize);
481 		kmem_free(bufmemp, nbuf * sizeof (rds_buf_t));
482 		if (ack_addr != NULL)
483 			kmem_free((void *)ack_addr, sizeof (uintptr_t));
484 		return (-1);
485 	}
486 	ep->ep_snd_lkey = mem_desc.md_lkey;
487 
488 
489 	/* Initialize the pool */
490 	spool->pool_memp = memp;
491 	spool->pool_memsize = memsize;
492 	spool->pool_bufmemp = bufmemp;
493 	spool->pool_sqpoll_pending = B_FALSE;
494 
495 	/* chain the buffers and initialize them */
496 	mp = memp;
497 	bp = bufmemp;
498 
499 	if (ep->ep_type == RDS_EP_TYPE_DATA) {
500 		for (ix = 0; ix < nbuf - 1; ix++) {
501 			bp[ix].buf_nextp = &bp[ix + 1];
502 			bp[ix].buf_ep = ep;
503 			bp[ix].buf_ds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
504 			bp[ix].buf_ds.ds_key = ep->ep_snd_lkey;
505 			bp[ix].buf_state = RDS_SNDBUF_FREE;
506 			pktp = (rds_data_hdr_t *)(uintptr_t)mp;
507 			pktp->dh_bufid = (uintptr_t)&bp[ix];
508 			mp = mp + rcv_len;
509 		}
510 		bp[nbuf - 1].buf_nextp = NULL;
511 		bp[nbuf - 1].buf_ep = ep;
512 		bp[nbuf - 1].buf_ds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
513 		bp[nbuf - 1].buf_ds.ds_key = ep->ep_snd_lkey;
514 		bp[nbuf - 1].buf_state = RDS_SNDBUF_FREE;
515 		pktp = (rds_data_hdr_t *)(uintptr_t)mp;
516 		pktp->dh_bufid = (uintptr_t)&bp[nbuf - 1];
517 
518 		spool->pool_headp = &bp[0];
519 		spool->pool_tailp = &bp[nbuf - 1];
520 
521 		mp = mp + rcv_len;
522 		ep->ep_ackds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
523 		ep->ep_ackds.ds_key = ep->ep_snd_lkey;
524 		ep->ep_ackds.ds_len = sizeof (uintptr_t);
525 
526 		*(uintptr_t *)ep->ep_ack_addr = (uintptr_t)spool->pool_tailp;
527 	} else {
528 		/* control send pool */
529 		for (ix = 0; ix < nbuf - 1; ix++) {
530 			bp[ix].buf_nextp = &bp[ix + 1];
531 			bp[ix].buf_ep = ep;
532 			bp[ix].buf_ds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
533 			bp[ix].buf_ds.ds_key = ep->ep_snd_lkey;
534 			bp[ix].buf_state = RDS_SNDBUF_FREE;
535 			mp = mp + rcv_len;
536 		}
537 		bp[nbuf - 1].buf_nextp = NULL;
538 		bp[nbuf - 1].buf_ep = ep;
539 		bp[nbuf - 1].buf_ds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
540 		bp[nbuf - 1].buf_ds.ds_key = ep->ep_snd_lkey;
541 		bp[nbuf - 1].buf_state = RDS_SNDBUF_FREE;
542 		spool->pool_headp = &bp[0];
543 		spool->pool_tailp = &bp[nbuf - 1];
544 	}
545 
546 	RDS_DPRINTF3(LABEL, "rdsmemp start: %p end: %p", memp, mp);
547 	RDS_DPRINTF2("rds_init_send_pool", "Return");
548 
549 	return (0);
550 }
551 
552 int
553 rds_reinit_send_pool(rds_ep_t *ep, ib_guid_t hca_guid)
554 {
555 	rds_buf_t	*bp;
556 	rds_hca_t	*hcap;
557 	ibt_mr_attr_t   mem_attr;
558 	ibt_mr_desc_t   mem_desc;
559 	rds_bufpool_t   *spool;
560 	int		ret;
561 
562 	RDS_DPRINTF2("rds_reinit_send_pool", "Enter: EP(%p)", ep);
563 
564 	spool = &ep->ep_sndpool;
565 	ASSERT(spool->pool_memp != NULL);
566 
567 	/* deregister the send pool memory from the previous HCA */
568 	hcap = rds_get_hcap(rdsib_statep, ep->ep_hca_guid);
569 	if (hcap == NULL) {
570 		RDS_DPRINTF2("rds_reinit_send_pool", "HCA (0x%llx) not found",
571 		    ep->ep_hca_guid);
572 	} else {
573 		if (ep->ep_snd_mrhdl != NULL) {
574 			(void) ibt_deregister_mr(hcap->hca_hdl,
575 			    ep->ep_snd_mrhdl);
576 			ep->ep_snd_mrhdl = NULL;
577 			ep->ep_snd_lkey = 0;
578 		}
579 
580 		if ((ep->ep_type == RDS_EP_TYPE_DATA) &&
581 		    (ep->ep_ackhdl != NULL)) {
582 			(void) ibt_deregister_mr(hcap->hca_hdl, ep->ep_ackhdl);
583 			ep->ep_ackhdl = NULL;
584 			ep->ep_ack_rkey = 0;
585 		}
586 
587 		ep->ep_hca_guid = NULL;
588 	}
589 
590 	/* get the hcap for the new HCA */
591 	hcap = rds_get_hcap(rdsib_statep, hca_guid);
592 	if (hcap == NULL) {
593 		RDS_DPRINTF2("rds_reinit_send_pool", "HCA (0x%llx) not found",
594 		    hca_guid);
595 		return (-1);
596 	}
597 
598 	/* register the send memory */
599 	mem_attr.mr_vaddr = (ib_vaddr_t)(uintptr_t)spool->pool_memp;
600 	mem_attr.mr_len = spool->pool_memsize;
601 	mem_attr.mr_as = NULL;
602 	mem_attr.mr_flags = IBT_MR_SLEEP | IBT_MR_ENABLE_LOCAL_WRITE;
603 
604 	ret = ibt_register_mr(hcap->hca_hdl, hcap->hca_pdhdl,
605 	    &mem_attr, &ep->ep_snd_mrhdl, &mem_desc);
606 	if (ret != IBT_SUCCESS) {
607 		RDS_DPRINTF2("rds_reinit_send_pool",
608 		    "EP(%p): ibt_register_mr failed: %d", ep, ret);
609 		return (-1);
610 	}
611 	ep->ep_snd_lkey = mem_desc.md_lkey;
612 
613 	/* register the acknowledgement space */
614 	if (ep->ep_type == RDS_EP_TYPE_DATA) {
615 		mem_attr.mr_vaddr = (ib_vaddr_t)ep->ep_ack_addr;
616 		mem_attr.mr_len = sizeof (uintptr_t);
617 		mem_attr.mr_as = NULL;
618 		mem_attr.mr_flags = IBT_MR_SLEEP | IBT_MR_ENABLE_LOCAL_WRITE |
619 		    IBT_MR_ENABLE_REMOTE_WRITE;
620 
621 		ret = ibt_register_mr(hcap->hca_hdl, hcap->hca_pdhdl,
622 		    &mem_attr, &ep->ep_ackhdl, &mem_desc);
623 		if (ret != IBT_SUCCESS) {
624 			RDS_DPRINTF2("rds_reinit_send_pool",
625 			    "EP(%p): ibt_register_mr for ack failed: %d",
626 			    ep, ret);
627 			(void) ibt_deregister_mr(hcap->hca_hdl,
628 			    ep->ep_snd_mrhdl);
629 			ep->ep_snd_mrhdl = NULL;
630 			ep->ep_snd_lkey = 0;
631 			return (-1);
632 		}
633 		ep->ep_ack_rkey = mem_desc.md_rkey;
634 
635 		/* update the LKEY in the acknowledgement WR */
636 		ep->ep_ackds.ds_key = ep->ep_snd_lkey;
637 	}
638 
639 	/* update the LKEY in each buffer */
640 	bp = spool->pool_headp;
641 	while (bp) {
642 		bp->buf_ds.ds_key = ep->ep_snd_lkey;
643 		bp = bp->buf_nextp;
644 	}
645 
646 	ep->ep_hca_guid = hca_guid;
647 
648 	RDS_DPRINTF2("rds_reinit_send_pool", "Return: EP(%p)", ep);
649 
650 	return (0);
651 }
652 
653 void
654 rds_free_recv_pool(rds_ep_t *ep)
655 {
656 	rds_bufpool_t *pool;
657 
658 	if (ep->ep_type == RDS_EP_TYPE_DATA) {
659 		pool = &rds_dpool;
660 	} else {
661 		pool = &rds_cpool;
662 	}
663 
664 	mutex_enter(&ep->ep_rcvpool.pool_lock);
665 	if (ep->ep_rcvpool.pool_nfree != 0) {
666 		rds_free_buf(pool, ep->ep_rcvpool.pool_headp,
667 		    ep->ep_rcvpool.pool_nfree);
668 		ep->ep_rcvpool.pool_nfree = 0;
669 		ep->ep_rcvpool.pool_headp = NULL;
670 		ep->ep_rcvpool.pool_tailp = NULL;
671 	}
672 	mutex_exit(&ep->ep_rcvpool.pool_lock);
673 }
674 
675 int
676 rds_init_recv_pool(rds_ep_t *ep)
677 {
678 	rds_bufpool_t	*rpool;
679 	rds_qp_t	*recvqp;
680 
681 	recvqp = &ep->ep_recvqp;
682 	rpool = &ep->ep_rcvpool;
683 	if (ep->ep_type == RDS_EP_TYPE_DATA) {
684 		recvqp->qp_depth = MaxDataRecvBuffers;
685 		recvqp->qp_level = 0;
686 		recvqp->qp_lwm = (DataRecvBufferLWM * MaxDataRecvBuffers)/100;
687 		recvqp->qp_taskqpending = B_FALSE;
688 
689 		rpool->pool_nbuffers = MaxDataRecvBuffers;
690 		rpool->pool_nbusy = 0;
691 		rpool->pool_nfree = 0;
692 	} else {
693 		recvqp->qp_depth = MaxCtrlRecvBuffers;
694 		recvqp->qp_level = 0;
695 		recvqp->qp_lwm = (CtrlRecvBufferLWM * MaxCtrlRecvBuffers)/100;
696 		recvqp->qp_taskqpending = B_FALSE;
697 
698 		rpool->pool_nbuffers = MaxCtrlRecvBuffers;
699 		rpool->pool_nbusy = 0;
700 		rpool->pool_nfree = 0;
701 	}
702 
703 	return (0);
704 }
705 
706 /* Free buffers to the global pool, either cpool or dpool */
707 void
708 rds_free_buf(rds_bufpool_t *pool, rds_buf_t *bp, uint_t nbuf)
709 {
710 	uint_t		ix;
711 
712 	RDS_DPRINTF4("rds_free_buf", "Enter");
713 
714 	ASSERT(nbuf != 0);
715 
716 	mutex_enter(&pool->pool_lock);
717 
718 	if (pool->pool_nfree != 0) {
719 		pool->pool_tailp->buf_nextp = bp;
720 	} else {
721 		pool->pool_headp = bp;
722 	}
723 
724 	if (nbuf == 1) {
725 		ASSERT(bp->buf_state == RDS_RCVBUF_FREE);
726 		bp->buf_ep = NULL;
727 		bp->buf_nextp = NULL;
728 		pool->pool_tailp = bp;
729 	} else {
730 		for (ix = 1; ix < nbuf; ix++) {
731 			ASSERT(bp->buf_state == RDS_RCVBUF_FREE);
732 			bp->buf_ep = NULL;
733 			bp = bp->buf_nextp;
734 		}
735 		ASSERT(bp->buf_state == RDS_RCVBUF_FREE);
736 		bp->buf_ep = NULL;
737 		bp->buf_nextp = NULL;
738 		pool->pool_tailp = bp;
739 	}
740 	/* tail is always the last buffer */
741 	pool->pool_tailp->buf_nextp = NULL;
742 
743 	pool->pool_nfree += nbuf;
744 	pool->pool_nbusy -= nbuf;
745 
746 	mutex_exit(&pool->pool_lock);
747 
748 	RDS_DPRINTF4("rds_free_buf", "Return");
749 }
750 
751 /* Get buffers from the global pools, either cpool or dpool */
752 rds_buf_t *
753 rds_get_buf(rds_bufpool_t *pool, uint_t nbuf, uint_t *nret)
754 {
755 	rds_buf_t	*bp = NULL, *bp1;
756 	uint_t		ix;
757 
758 	RDS_DPRINTF4("rds_get_buf", "Enter");
759 
760 	mutex_enter(&pool->pool_lock);
761 
762 	RDS_DPRINTF3("rds_get_buf", "Available: %d Needed: %d",
763 	    pool->pool_nfree, nbuf);
764 
765 	if (nbuf < pool->pool_nfree) {
766 		*nret = nbuf;
767 
768 		bp1 = pool->pool_headp;
769 		for (ix = 1; ix < nbuf; ix++) {
770 			bp1 = bp1->buf_nextp;
771 		}
772 
773 		bp = pool->pool_headp;
774 		pool->pool_headp = bp1->buf_nextp;
775 		bp1->buf_nextp = NULL;
776 
777 		pool->pool_nfree -= nbuf;
778 		pool->pool_nbusy += nbuf;
779 	} else if (nbuf >= pool->pool_nfree) {
780 		*nret = pool->pool_nfree;
781 
782 		bp = pool->pool_headp;
783 
784 		pool->pool_headp = NULL;
785 		pool->pool_tailp = NULL;
786 
787 		pool->pool_nbusy += pool->pool_nfree;
788 		pool->pool_nfree = 0;
789 	}
790 
791 	mutex_exit(&pool->pool_lock);
792 
793 	RDS_DPRINTF4("rds_get_buf", "Return");
794 
795 	return (bp);
796 }
797 
798 boolean_t
799 rds_is_recvq_empty(rds_ep_t *ep, boolean_t wait)
800 {
801 	rds_qp_t	*recvqp;
802 	rds_bufpool_t	*rpool;
803 	boolean_t ret = B_TRUE;
804 
805 	recvqp = &ep->ep_recvqp;
806 	mutex_enter(&recvqp->qp_lock);
807 	RDS_DPRINTF2("rds_is_recvq_empty", "EP(%p): QP has %d WRs",
808 	    ep, recvqp->qp_level);
809 	if (wait) {
810 		/* wait until the RQ is empty */
811 		while (recvqp->qp_level != 0) {
812 			/* wait one second and try again */
813 			mutex_exit(&recvqp->qp_lock);
814 			delay(drv_usectohz(1000000));
815 			mutex_enter(&recvqp->qp_lock);
816 		}
817 	} else if (recvqp->qp_level != 0) {
818 			ret = B_FALSE;
819 	}
820 	mutex_exit(&recvqp->qp_lock);
821 
822 	rpool = &ep->ep_rcvpool;
823 	mutex_enter(&rpool->pool_lock);
824 	RDS_DPRINTF2("rds_is_recvq_empty", "EP(%p): "
825 	    "There are %d pending buffers on sockqs", ep, rpool->pool_nbusy);
826 	if (wait) {
827 		/* Wait for all buffers to be freed by sockfs */
828 		while (rpool->pool_nbusy != 0) {
829 			/* wait one second and try again */
830 			mutex_exit(&rpool->pool_lock);
831 			delay(drv_usectohz(1000000));
832 			mutex_enter(&rpool->pool_lock);
833 		}
834 	} else if (rpool->pool_nbusy != 0) {
835 			ret = B_FALSE;
836 	}
837 	mutex_exit(&rpool->pool_lock);
838 
839 	return (ret);
840 }
841 
/*
 * Check whether the send side of an endpoint is idle: no busy buffers in
 * the send pool (pool_nbusy == 0) and no outstanding RDMA acks
 * (ep_rdmacnt == 0).
 *
 * ep:   endpoint to check.
 * wait: 0 - only sample the two counters.
 *       non-zero - block, re-checking once per second, until both
 *       counters are 0.
 *       2 - on a data endpoint additionally wait until the buffer
 *       recorded in the acknowledgement word equals the pool tail, giving
 *       up as soon as the word does not change between two checks.
 *
 * Returns B_FALSE only when wait is 0 and one of the counters is non-zero;
 * B_TRUE in every other case.
 */
boolean_t
rds_is_sendq_empty(rds_ep_t *ep, uint_t wait)
{
	rds_bufpool_t	*spool;
	rds_buf_t	*bp;
	boolean_t	ret1 = B_TRUE;

	/* check if all the sends completed */
	spool = &ep->ep_sndpool;
	mutex_enter(&spool->pool_lock);
	RDS_DPRINTF2("rds_is_sendq_empty", "EP(%p): "
	    "Send Pool contains: %d", ep, spool->pool_nbusy);
	if (wait) {
		while (spool->pool_nbusy != 0) {
			if (rds_no_interrupts) {
				/*
				 * wait one second and try again; the pool
				 * lock stays held across the delay and the
				 * poll. NOTE(review): B_TRUE presumably
				 * tells the poll routine that the pool lock
				 * is already held - confirm.
				 */
				delay(drv_usectohz(1000000));
				rds_poll_send_completions(ep->ep_sendcq, ep,
				    B_TRUE);
			} else {
				/* wait one second and try again */
				mutex_exit(&spool->pool_lock);
				delay(drv_usectohz(1000000));
				mutex_enter(&spool->pool_lock);
			}
		}

		if ((wait == 2) && (ep->ep_type == RDS_EP_TYPE_DATA)) {
			rds_buf_t	*ackbp;
			rds_buf_t	*prev_ackbp;

			/*
			 * If the last one is acknowledged then everything
			 * is acknowledged
			 */
			bp = spool->pool_tailp;
			/* the ack word holds a buffer pointer */
			ackbp = *(rds_buf_t **)ep->ep_ack_addr;
			prev_ackbp = ackbp;
			RDS_DPRINTF2("rds_is_sendq_empty", "EP(%p): "
			    "Checking for acknowledgements", ep);
			while (bp != ackbp) {
				RDS_DPRINTF2("rds_is_sendq_empty",
				    "EP(%p) BP(0x%p/0x%p) last "
				    "sent/acknowledged", ep, bp, ackbp);
				mutex_exit(&spool->pool_lock);
				delay(drv_usectohz(1000000));
				mutex_enter(&spool->pool_lock);

				/* re-read both after sleeping */
				bp = spool->pool_tailp;
				ackbp = *(rds_buf_t **)ep->ep_ack_addr;
				if (ackbp == prev_ackbp) {
					/* ack word unchanged for a second */
					RDS_DPRINTF2("rds_is_sendq_empty",
					    "There has been no progress,"
					    "give up and proceed");
					break;
				}
				prev_ackbp = ackbp;
			}
		}
	} else if (spool->pool_nbusy != 0) {
			ret1 = B_FALSE;
	}
	mutex_exit(&spool->pool_lock);

	/* check if all the rdma acks completed */
	mutex_enter(&ep->ep_lock);
	RDS_DPRINTF2("rds_is_sendq_empty", "EP(%p): "
	    "Outstanding RDMA Acks: %d", ep, ep->ep_rdmacnt);
	if (wait) {
		while (ep->ep_rdmacnt != 0) {
			if (rds_no_interrupts) {
				/*
				 * wait one second and try again; ep_lock
				 * stays held across the delay and the poll
				 */
				delay(drv_usectohz(1000000));
				rds_poll_send_completions(ep->ep_sendcq, ep,
				    B_FALSE);
			} else {
				/* wait one second and try again */
				mutex_exit(&ep->ep_lock);
				delay(drv_usectohz(1000000));
				mutex_enter(&ep->ep_lock);
			}
		}
	} else if (ep->ep_rdmacnt != 0) {
			ret1 = B_FALSE;
	}
	mutex_exit(&ep->ep_lock);

	return (ret1);
}
931 
932 /* Get buffers from the send pool */
933 rds_buf_t *
934 rds_get_send_buf(rds_ep_t *ep, uint_t nbuf)
935 {
936 	rds_buf_t	*bp = NULL, *bp1;
937 	rds_bufpool_t	*spool;
938 	uint_t		waittime = rds_waittime_ms * 1000;
939 	uint_t		ix;
940 	int		ret;
941 
942 	RDS_DPRINTF4("rds_get_send_buf", "Enter: EP(%p) Buffers requested: %d",
943 	    ep, nbuf);
944 
945 	spool = &ep->ep_sndpool;
946 	mutex_enter(&spool->pool_lock);
947 
948 	if (rds_no_interrupts) {
949 		if ((spool->pool_sqpoll_pending == B_FALSE) &&
950 		    (spool->pool_nbusy >
951 		    (spool->pool_nbuffers * rds_poll_percent_full)/100)) {
952 			spool->pool_sqpoll_pending = B_TRUE;
953 			mutex_exit(&spool->pool_lock);
954 			rds_poll_send_completions(ep->ep_sendcq, ep, B_FALSE);
955 			mutex_enter(&spool->pool_lock);
956 			spool->pool_sqpoll_pending = B_FALSE;
957 		}
958 	}
959 
960 	if (spool->pool_nfree < nbuf) {
961 		/* wait for buffers to become available */
962 		spool->pool_cv_count += nbuf;
963 		ret = cv_timedwait_sig(&spool->pool_cv, &spool->pool_lock,
964 		    ddi_get_lbolt() + drv_usectohz(waittime));
965 		/* ret = cv_wait_sig(&spool->pool_cv, &spool->pool_lock); */
966 		if (ret == 0) {
967 			/* signal pending */
968 			spool->pool_cv_count -= nbuf;
969 			mutex_exit(&spool->pool_lock);
970 			return (NULL);
971 		}
972 
973 		spool->pool_cv_count -= nbuf;
974 	}
975 
976 	/* Have the number of buffers needed */
977 	if (spool->pool_nfree > nbuf) {
978 		bp = spool->pool_headp;
979 
980 		if (ep->ep_type == RDS_EP_TYPE_DATA) {
981 			rds_buf_t *ackbp;
982 			ackbp = *(rds_buf_t **)ep->ep_ack_addr;
983 
984 			/* check if all the needed buffers are acknowledged */
985 			bp1 = bp;
986 			for (ix = 0; ix < nbuf; ix++) {
987 				if ((bp1 == ackbp) ||
988 				    (bp1->buf_state != RDS_SNDBUF_FREE)) {
989 					/*
990 					 * The buffer is not yet signalled or
991 					 * is not yet acknowledged
992 					 */
993 					RDS_DPRINTF5("rds_get_send_buf",
994 					    "EP(%p) Buffer (%p) not yet "
995 					    "acked/completed", ep, bp1);
996 					mutex_exit(&spool->pool_lock);
997 					return (NULL);
998 				}
999 
1000 				bp1 = bp1->buf_nextp;
1001 			}
1002 		}
1003 
1004 		/* mark the buffers as pending */
1005 		bp1 = bp;
1006 		for (ix = 1; ix < nbuf; ix++) {
1007 			ASSERT(bp1->buf_state == RDS_SNDBUF_FREE);
1008 			bp1->buf_state = RDS_SNDBUF_PENDING;
1009 			bp1 = bp1->buf_nextp;
1010 		}
1011 		ASSERT(bp1->buf_state == RDS_SNDBUF_FREE);
1012 		bp1->buf_state = RDS_SNDBUF_PENDING;
1013 
1014 		spool->pool_headp = bp1->buf_nextp;
1015 		bp1->buf_nextp = NULL;
1016 		if (spool->pool_headp == NULL)
1017 			spool->pool_tailp = NULL;
1018 		spool->pool_nfree -= nbuf;
1019 		spool->pool_nbusy += nbuf;
1020 	}
1021 	mutex_exit(&spool->pool_lock);
1022 
1023 	RDS_DPRINTF4("rds_get_send_buf", "Return: EP(%p) Buffers requested: %d",
1024 	    ep, nbuf);
1025 
1026 	return (bp);
1027 }
1028 
1029 #define	RDS_MIN_BUF_TO_WAKE_THREADS	10
1030 
1031 void
1032 rds_free_send_buf(rds_ep_t *ep, rds_buf_t *headp, rds_buf_t *tailp, uint_t nbuf,
1033     boolean_t lock)
1034 {
1035 	rds_bufpool_t	*spool;
1036 	rds_buf_t	*tmp;
1037 
1038 	RDS_DPRINTF4("rds_free_send_buf", "Enter");
1039 
1040 	ASSERT(nbuf != 0);
1041 
1042 	if (tailp == NULL) {
1043 		if (nbuf > 1) {
1044 			tmp = headp;
1045 			while (tmp->buf_nextp) {
1046 				tmp = tmp->buf_nextp;
1047 			}
1048 			tailp = tmp;
1049 		} else {
1050 			tailp = headp;
1051 		}
1052 	}
1053 
1054 	spool = &ep->ep_sndpool;
1055 
1056 	if (lock == B_FALSE) {
1057 		/* lock is not held outside */
1058 		mutex_enter(&spool->pool_lock);
1059 	}
1060 
1061 	if (spool->pool_nfree) {
1062 		spool->pool_tailp->buf_nextp = headp;
1063 	} else {
1064 		spool->pool_headp = headp;
1065 	}
1066 	spool->pool_tailp = tailp;
1067 
1068 	spool->pool_nfree += nbuf;
1069 	spool->pool_nbusy -= nbuf;
1070 
1071 	if ((spool->pool_cv_count > 0) &&
1072 	    (spool->pool_nfree > RDS_MIN_BUF_TO_WAKE_THREADS)) {
1073 		if (spool->pool_nfree >= spool->pool_cv_count)
1074 			cv_broadcast(&spool->pool_cv);
1075 		else
1076 			cv_signal(&spool->pool_cv);
1077 	}
1078 
1079 	if (lock == B_FALSE) {
1080 		mutex_exit(&spool->pool_lock);
1081 	}
1082 
1083 	RDS_DPRINTF4("rds_free_send_buf", "Return");
1084 }
1085 
1086 #define	RDS_NBUFFERS_TO_PUTBACK	100
1087 void
1088 rds_free_recv_buf(rds_buf_t *bp, uint_t nbuf)
1089 {
1090 	rds_ep_t	*ep;
1091 	rds_bufpool_t	*rpool;
1092 	rds_buf_t	*bp1;
1093 	uint_t		ix;
1094 
1095 	RDS_DPRINTF4("rds_free_recv_buf", "Enter");
1096 
1097 	ASSERT(nbuf != 0);
1098 
1099 	ep = bp->buf_ep;
1100 	rpool = &ep->ep_rcvpool;
1101 
1102 	mutex_enter(&rpool->pool_lock);
1103 
1104 	/* Add the buffers to the local pool */
1105 	if (rpool->pool_tailp == NULL) {
1106 		ASSERT(rpool->pool_headp == NULL);
1107 		ASSERT(rpool->pool_nfree == 0);
1108 		rpool->pool_headp = bp;
1109 		bp1 = bp;
1110 		for (ix = 1; ix < nbuf; ix++) {
1111 			if (bp1->buf_state == RDS_RCVBUF_ONSOCKQ) {
1112 				rpool->pool_nbusy--;
1113 			}
1114 			bp1->buf_state = RDS_RCVBUF_FREE;
1115 			bp1 = bp1->buf_nextp;
1116 		}
1117 		bp1->buf_nextp = NULL;
1118 		if (bp->buf_state == RDS_RCVBUF_ONSOCKQ) {
1119 			rpool->pool_nbusy--;
1120 		}
1121 		bp->buf_state = RDS_RCVBUF_FREE;
1122 		rpool->pool_tailp = bp1;
1123 		rpool->pool_nfree += nbuf;
1124 	} else {
1125 		bp1 = bp;
1126 		for (ix = 1; ix < nbuf; ix++) {
1127 			if (bp1->buf_state == RDS_RCVBUF_ONSOCKQ) {
1128 				rpool->pool_nbusy--;
1129 			}
1130 			bp1->buf_state = RDS_RCVBUF_FREE;
1131 			bp1 = bp1->buf_nextp;
1132 		}
1133 		bp1->buf_nextp = NULL;
1134 		if (bp->buf_state == RDS_RCVBUF_ONSOCKQ) {
1135 			rpool->pool_nbusy--;
1136 		}
1137 		bp->buf_state = RDS_RCVBUF_FREE;
1138 		rpool->pool_tailp->buf_nextp = bp;
1139 		rpool->pool_tailp = bp1;
1140 		rpool->pool_nfree += nbuf;
1141 	}
1142 
1143 	if (rpool->pool_nfree >= RDS_NBUFFERS_TO_PUTBACK) {
1144 		bp = rpool->pool_headp;
1145 		nbuf = rpool->pool_nfree;
1146 		rpool->pool_headp = NULL;
1147 		rpool->pool_tailp = NULL;
1148 		rpool->pool_nfree = 0;
1149 		mutex_exit(&rpool->pool_lock);
1150 
1151 		/* Free the buffers to the global pool */
1152 		if (ep->ep_type == RDS_EP_TYPE_DATA) {
1153 			rds_free_buf(&rds_dpool, bp, nbuf);
1154 		} else {
1155 			rds_free_buf(&rds_cpool, bp, nbuf);
1156 		}
1157 
1158 		return;
1159 	}
1160 	mutex_exit(&rpool->pool_lock);
1161 
1162 	RDS_DPRINTF4("rds_free_recv_buf", "Return");
1163 }
1164