xref: /illumos-gate/usr/src/uts/common/io/ib/clients/rds/rdsib_ep.c (revision b86efd96f8acd85ddaa930a2f0c1d664237e4aaf)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /*
26  * Copyright (c) 2005 SilverStorm Technologies, Inc. All rights reserved.
27  *
28  * This software is available to you under a choice of one of two
29  * licenses.  You may choose to be licensed under the terms of the GNU
30  * General Public License (GPL) Version 2, available from the file
31  * COPYING in the main directory of this source tree, or the
32  * OpenIB.org BSD license below:
33  *
34  *     Redistribution and use in source and binary forms, with or
35  *     without modification, are permitted provided that the following
36  *     conditions are met:
37  *
38  *	- Redistributions of source code must retain the above
39  *	  copyright notice, this list of conditions and the following
40  *	  disclaimer.
41  *
42  *	- Redistributions in binary form must reproduce the above
43  *	  copyright notice, this list of conditions and the following
44  *	  disclaimer in the documentation and/or other materials
45  *	  provided with the distribution.
46  *
47  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
48  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
49  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
50  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
51  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
52  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
53  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
54  * SOFTWARE.
55  *
56  */
57 /*
58  * Sun elects to include this software in Sun product
59  * under the OpenIB BSD license.
60  *
61  *
62  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
63  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
64  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
65  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
66  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
67  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
68  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
69  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
70  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
71  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
72  * POSSIBILITY OF SUCH DAMAGE.
73  */
74 
75 #pragma ident	"%Z%%M%	%I%	%E% SMI"
76 
77 #include <sys/stream.h>
78 #include <sys/ib/clients/rds/rdsib_cm.h>
79 #include <sys/ib/clients/rds/rdsib_ib.h>
80 #include <sys/ib/clients/rds/rdsib_buf.h>
81 #include <sys/ib/clients/rds/rdsib_ep.h>
82 #include <sys/ib/clients/rds/rds_kstat.h>
83 #include <sys/zone.h>
84 
85 #define	RDS_POLL_CQ_IN_2TICKS	1
86 
/*
 * This file contains the session and endpoint related routines.
 */
90 
91 extern int rds_get_ibaddr(ipaddr_t, ipaddr_t, ib_gid_t *, ib_gid_t *);
92 extern boolean_t rds_islocal(ipaddr_t addr);
93 extern uint_t rds_wc_signal;
94 
95 static uint8_t
96 rds_is_port_marked(rds_session_t *sp, in_port_t port)
97 {
98 	uint8_t	ret;
99 
100 	if (sp != NULL) {
101 		rw_enter(&sp->session_portmap_lock, RW_READER);
102 		ret = (sp->session_portmap[port/8] & (1 << (port % 8)));
103 		rw_exit(&sp->session_portmap_lock);
104 	} else {
105 		rw_enter(&rds_local_portmap_lock, RW_READER);
106 		ret = (rds_local_portmap[port/8] & (1 << (port % 8)));
107 		rw_exit(&rds_local_portmap_lock);
108 	}
109 
110 	return (ret);
111 }
112 
113 static uint8_t
114 rds_check_n_mark_port(rds_session_t *sp, in_port_t port)
115 {
116 	uint8_t	ret;
117 
118 	if (sp != NULL) {
119 		rw_enter(&sp->session_portmap_lock, RW_WRITER);
120 		ret = (sp->session_portmap[port/8] & (1 << (port % 8)));
121 		if (!ret) {
122 			/* port is not marked, mark it */
123 			sp->session_portmap[port/8] =
124 			    sp->session_portmap[port/8] | (1 << (port % 8));
125 		}
126 		rw_exit(&sp->session_portmap_lock);
127 	} else {
128 		rw_enter(&rds_local_portmap_lock, RW_WRITER);
129 		ret = (rds_local_portmap[port/8] & (1 << (port % 8)));
130 		if (!ret) {
131 			/* port is not marked, mark it */
132 			rds_local_portmap[port/8] =
133 			    rds_local_portmap[port/8] | (1 << (port % 8));
134 		}
135 		rw_exit(&rds_local_portmap_lock);
136 	}
137 
138 	return (ret);
139 }
140 
141 static uint8_t
142 rds_check_n_unmark_port(rds_session_t *sp, in_port_t port)
143 {
144 	uint8_t	ret;
145 
146 	if (sp != NULL) {
147 		rw_enter(&sp->session_portmap_lock, RW_WRITER);
148 		ret = (sp->session_portmap[port/8] & (1 << (port % 8)));
149 		if (ret) {
150 			/* port is marked, unmark it */
151 			sp->session_portmap[port/8] =
152 			    sp->session_portmap[port/8] & ~(1 << (port % 8));
153 		}
154 		rw_exit(&sp->session_portmap_lock);
155 	} else {
156 		rw_enter(&rds_local_portmap_lock, RW_WRITER);
157 		ret = (rds_local_portmap[port/8] & (1 << (port % 8)));
158 		if (ret) {
159 			/* port is marked, unmark it */
160 			rds_local_portmap[port/8] =
161 			    rds_local_portmap[port/8] & ~(1 << (port % 8));
162 		}
163 		rw_exit(&rds_local_portmap_lock);
164 	}
165 
166 	return (ret);
167 }
168 
169 static void
170 rds_mark_all_ports(rds_session_t *sp)
171 {
172 	if (sp != NULL) {
173 		rw_enter(&sp->session_portmap_lock, RW_WRITER);
174 		(void) memset(sp->session_portmap, 0xFF, RDS_PORT_MAP_SIZE);
175 		rw_exit(&sp->session_portmap_lock);
176 	} else {
177 		rw_enter(&rds_local_portmap_lock, RW_WRITER);
178 		(void) memset(rds_local_portmap, 0xFF, RDS_PORT_MAP_SIZE);
179 		rw_exit(&rds_local_portmap_lock);
180 	}
181 }
182 
183 static void
184 rds_unmark_all_ports(rds_session_t *sp)
185 {
186 	if (sp != NULL) {
187 		rw_enter(&sp->session_portmap_lock, RW_WRITER);
188 		bzero(sp->session_portmap, RDS_PORT_MAP_SIZE);
189 		rw_exit(&sp->session_portmap_lock);
190 	} else {
191 		rw_enter(&rds_local_portmap_lock, RW_WRITER);
192 		bzero(rds_local_portmap, RDS_PORT_MAP_SIZE);
193 		rw_exit(&rds_local_portmap_lock);
194 	}
195 }
196 
197 static void
198 rds_add_session(rds_session_t *sp, boolean_t locked)
199 {
200 	RDS_DPRINTF2("rds_add_session", "Enter: SP(%p)", sp);
201 
202 	if (!locked) {
203 		rw_enter(&rdsib_statep->rds_sessionlock, RW_WRITER);
204 	}
205 
206 	sp->session_nextp = rdsib_statep->rds_sessionlistp;
207 	rdsib_statep->rds_sessionlistp = sp;
208 	rdsib_statep->rds_nsessions++;
209 
210 	if (!locked) {
211 		rw_exit(&rdsib_statep->rds_sessionlock);
212 	}
213 	RDS_INCR_SESS();
214 
215 	RDS_DPRINTF2("rds_add_session", "Return: SP(%p)", sp);
216 }
217 
218 /* Session lookup based on destination IP or destination node guid */
219 rds_session_t *
220 rds_session_lkup(rds_state_t *statep, ipaddr_t remoteip, ib_guid_t node_guid)
221 {
222 	rds_session_t	*sp;
223 
224 	RDS_DPRINTF4("rds_session_lkup", "Enter: 0x%p 0x%x 0x%llx", statep,
225 	    remoteip, node_guid);
226 
227 	/* A read/write lock is expected, will panic if none of them are held */
228 	ASSERT(rw_lock_held(&statep->rds_sessionlock));
229 	sp = statep->rds_sessionlistp;
230 	while (sp) {
231 		if ((sp->session_rgid.gid_guid == node_guid) ||
232 		    (sp->session_remip == remoteip)) {
233 			break;
234 		}
235 
236 		sp = sp->session_nextp;
237 	}
238 
239 	RDS_DPRINTF4("rds_session_lkup", "Return: SP(%p)", sp);
240 
241 	return (sp);
242 }
243 
244 static void
245 rds_ep_fini(rds_ep_t *ep)
246 {
247 	RDS_DPRINTF3("rds_ep_fini", "Enter: EP(%p) type: %d", ep, ep->ep_type);
248 
249 	/* free send pool */
250 	rds_free_send_pool(ep);
251 
252 	/* free recv pool */
253 	rds_free_recv_pool(ep);
254 
255 	RDS_DPRINTF3("rds_ep_fini", "Return EP(%p)", ep);
256 }
257 
258 /* Assumes SP write lock is held */
259 int
260 rds_ep_init(rds_ep_t *ep)
261 {
262 	uint_t		ret;
263 
264 	RDS_DPRINTF3("rds_ep_init", "Enter: EP(%p) Type: %d", ep, ep->ep_type);
265 
266 	/* send pool */
267 	ret = rds_init_send_pool(ep);
268 	if (ret != 0) {
269 		RDS_DPRINTF2(LABEL, "EP(%p): rds_init_send_pool failed: %d",
270 		    ep, ret);
271 		return (-1);
272 	}
273 
274 	/* recv pool */
275 	ret = rds_init_recv_pool(ep);
276 	if (ret != 0) {
277 		RDS_DPRINTF2(LABEL, "EP(%p): rds_init_recv_pool failed: %d",
278 		    ep, ret);
279 		rds_free_send_pool(ep);
280 		return (-1);
281 	}
282 
283 	/* reset the ep state */
284 	mutex_enter(&ep->ep_lock);
285 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
286 	ep->ep_lbufid = NULL;
287 	ep->ep_rbufid = NULL;
288 	ep->ep_segfbp = NULL;
289 	ep->ep_seglbp = NULL;
290 
291 	/* Initialize the WR to send acknowledgements */
292 	ep->ep_ackwr.wr_id = RDS_RDMAW_WRID;
293 	ep->ep_ackwr.wr_flags = IBT_WR_SEND_SOLICIT;
294 	ep->ep_ackwr.wr_trans = IBT_RC_SRV;
295 	ep->ep_ackwr.wr_opcode = IBT_WRC_RDMAW;
296 	ep->ep_ackwr.wr_nds = 1;
297 	ep->ep_ackwr.wr_sgl = &ep->ep_ackds;
298 	ep->ep_ackwr.wr.rc.rcwr.rdma.rdma_raddr = NULL;
299 	ep->ep_ackwr.wr.rc.rcwr.rdma.rdma_rkey = 0;
300 	mutex_exit(&ep->ep_lock);
301 
302 	RDS_DPRINTF3("rds_ep_init", "Return: EP(%p) type: %d", ep, ep->ep_type);
303 
304 	return (0);
305 }
306 
307 void
308 rds_session_fini(rds_session_t *sp)
309 {
310 	RDS_DPRINTF2("rds_session_fini", "Enter: SP(0x%p)", sp);
311 
312 	rds_ep_fini(&sp->session_dataep);
313 	rds_ep_fini(&sp->session_ctrlep);
314 
315 	RDS_DPRINTF2("rds_session_fini", "Return: SP(0x%p)", sp);
316 }
317 
318 /*
319  * Allocate and initialize the resources needed for the control and
320  * data channels
321  */
322 int
323 rds_session_init(rds_session_t *sp)
324 {
325 	int		ret;
326 
327 	RDS_DPRINTF2("rds_session_init", "Enter: SP(0x%p)", sp);
328 
329 	/* CALLED WITH SESSION WRITE LOCK */
330 
331 	/* allocate and initialize the ctrl channel */
332 	ret = rds_ep_init(&sp->session_ctrlep);
333 	if (ret != 0) {
334 		RDS_DPRINTF2(LABEL, "SP(%p): Ctrl EP(%p) initialization "
335 		    "failed", sp, &sp->session_ctrlep);
336 		return (-1);
337 	}
338 
339 	RDS_DPRINTF2(LABEL, "SP(%p) Control EP(%p)", sp, &sp->session_ctrlep);
340 
341 	/* allocate and initialize the data channel */
342 	ret = rds_ep_init(&sp->session_dataep);
343 	if (ret != 0) {
344 		RDS_DPRINTF2(LABEL, "SP(%p): Data EP(%p) initialization "
345 		    "failed", sp, &sp->session_dataep);
346 		rds_ep_fini(&sp->session_ctrlep);
347 		return (-1);
348 	}
349 
350 	RDS_DPRINTF2(LABEL, "SP(%p) Data EP(%p)", sp, &sp->session_dataep);
351 
352 	RDS_DPRINTF2("rds_session_init", "Return");
353 
354 	return (0);
355 }
356 
357 static int
358 rds_session_connect(rds_session_t *sp)
359 {
360 	ibt_channel_hdl_t	ctrlchan, datachan;
361 	rds_ep_t		*ep;
362 	ibt_path_info_t		pinfo;
363 	ibt_path_attr_t		pattr;
364 	ib_gid_t		lgid, rgid;
365 	int			ret;
366 
367 	RDS_DPRINTF2("rds_session_connect", "Enter SP(%p)", sp);
368 
369 	rw_enter(&sp->session_lock, RW_READER);
370 	rgid = sp->session_rgid;
371 	lgid = sp->session_lgid;
372 	rw_exit(&sp->session_lock);
373 
374 	/* get paths to the destination */
375 	bzero(&pattr, sizeof (ibt_path_attr_t));
376 	pattr.pa_dgids = &rgid;
377 	pattr.pa_sgid = lgid;
378 	pattr.pa_sd_flags = IBT_NO_SDATA;
379 	pattr.pa_num_dgids = 1;
380 	ret = ibt_get_paths(rdsib_statep->rds_ibhdl, IBT_PATH_NO_FLAGS,
381 	    &pattr, 1, &pinfo, NULL);
382 	if (ret != IBT_SUCCESS) {
383 		RDS_DPRINTF2(LABEL, "ibt_get_paths failed: %d", ret);
384 		return (-1);
385 	}
386 	pinfo.pi_sid = RDS_SERVICE_ID;
387 
388 	/* Override the packet life time based on the conf file */
389 	if (IBPktLifeTime != 0) {
390 		pinfo.pi_prim_cep_path.cep_cm_opaque1 = IBPktLifeTime;
391 	}
392 
393 	/* Session type may change if we run into peer-to-peer case. */
394 	rw_enter(&sp->session_lock, RW_READER);
395 	if (sp->session_type == RDS_SESSION_PASSIVE) {
396 		RDS_DPRINTF2("rds_session_connect", "SP(%p) is no longer the "
397 		    "active end", sp);
398 		rw_exit(&sp->session_lock);
399 		return (0); /* return success */
400 	}
401 	rw_exit(&sp->session_lock);
402 
403 	/* connect the data ep first */
404 	ep = &sp->session_dataep;
405 	mutex_enter(&ep->ep_lock);
406 	if (ep->ep_state == RDS_EP_STATE_UNCONNECTED) {
407 		ep->ep_state = RDS_EP_STATE_ACTIVE_PENDING;
408 		mutex_exit(&ep->ep_lock);
409 		ret = rds_open_rc_channel(ep, &pinfo, IBT_BLOCKING, &datachan);
410 		if (ret != IBT_SUCCESS) {
411 			RDS_DPRINTF2(LABEL, "EP(%p): rds_open_rc_channel "
412 			    "failed: %d", ret);
413 			return (-1);
414 		}
415 		sp->session_dataep.ep_chanhdl = datachan;
416 	} else {
417 		RDS_DPRINTF2(LABEL, "SP(%p) Data EP(%p) is in "
418 		    "unexpected state: %d", sp, ep, ep->ep_state);
419 		mutex_exit(&ep->ep_lock);
420 		return (-1);
421 	}
422 
423 	RDS_DPRINTF3(LABEL, "SP(%p) EP(%p): Data channel is connected",
424 	    sp, ep);
425 
426 	ep = &sp->session_ctrlep;
427 	mutex_enter(&ep->ep_lock);
428 	if (ep->ep_state == RDS_EP_STATE_UNCONNECTED) {
429 		ep->ep_state = RDS_EP_STATE_ACTIVE_PENDING;
430 		mutex_exit(&ep->ep_lock);
431 		ret = rds_open_rc_channel(ep, &pinfo, IBT_BLOCKING, &ctrlchan);
432 		if (ret != IBT_SUCCESS) {
433 			RDS_DPRINTF2(LABEL, "EP(%p): rds_open_rc_channel "
434 			    "failed: %d", ep, ret);
435 			return (-1);
436 		}
437 		sp->session_ctrlep.ep_chanhdl = ctrlchan;
438 	} else {
439 		RDS_DPRINTF2(LABEL, "SP(%p) Control EP(%p) is in "
440 		    "unexpected state: %d", sp, ep, ep->ep_state);
441 		mutex_exit(&ep->ep_lock);
442 		return (-1);
443 	}
444 
445 	RDS_DPRINTF2("rds_session_connect", "Return SP(%p)", sp);
446 
447 	return (0);
448 }
449 
450 /*
451  * Can be called with or without session_lock.
452  */
453 void
454 rds_session_close(rds_session_t *sp, ibt_execution_mode_t mode, uint_t wait)
455 {
456 	rds_ep_t		*ep;
457 
458 	RDS_DPRINTF2("rds_session_close", "SP(%p) State: %d", sp,
459 	    sp->session_state);
460 
461 	ep = &sp->session_dataep;
462 	RDS_DPRINTF3(LABEL, "EP(%p) State: %d", ep, ep->ep_state);
463 
464 	/* wait until the SQ is empty before closing */
465 	(void) rds_is_sendq_empty(ep, wait);
466 
467 	mutex_enter(&ep->ep_lock);
468 	while (ep->ep_state == RDS_EP_STATE_CLOSING) {
469 		mutex_exit(&ep->ep_lock);
470 		delay(drv_usectohz(300000));
471 		mutex_enter(&ep->ep_lock);
472 	}
473 
474 	if (ep->ep_state == RDS_EP_STATE_CONNECTED) {
475 		ep->ep_state = RDS_EP_STATE_CLOSING;
476 		mutex_exit(&ep->ep_lock);
477 		(void) rds_close_rc_channel(ep->ep_chanhdl, mode);
478 		mutex_enter(&ep->ep_lock);
479 	}
480 	rds_ep_free_rc_channel(ep);
481 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
482 	ep->ep_segfbp = NULL;
483 	ep->ep_seglbp = NULL;
484 	mutex_exit(&ep->ep_lock);
485 
486 	ep = &sp->session_ctrlep;
487 	RDS_DPRINTF3(LABEL, "EP(%p) State: %d", ep, ep->ep_state);
488 
489 	/* wait until the SQ is empty before closing */
490 	(void) rds_is_sendq_empty(ep, 1);
491 
492 	mutex_enter(&ep->ep_lock);
493 	while (ep->ep_state == RDS_EP_STATE_CLOSING) {
494 		mutex_exit(&ep->ep_lock);
495 		delay(drv_usectohz(300000));
496 		mutex_enter(&ep->ep_lock);
497 	}
498 
499 	if (ep->ep_state == RDS_EP_STATE_CONNECTED) {
500 		mutex_exit(&ep->ep_lock);
501 		ep->ep_state = RDS_EP_STATE_CLOSING;
502 		(void) rds_close_rc_channel(ep->ep_chanhdl, mode);
503 		mutex_enter(&ep->ep_lock);
504 	}
505 	rds_ep_free_rc_channel(ep);
506 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
507 	ep->ep_segfbp = NULL;
508 	ep->ep_seglbp = NULL;
509 	mutex_exit(&ep->ep_lock);
510 
511 	RDS_DPRINTF2("rds_session_close", "Return (%p)", sp);
512 }
513 
514 /* Free the session */
515 static void
516 rds_destroy_session(rds_session_t *sp)
517 {
518 	rds_ep_t	*ep;
519 	rds_bufpool_t	*pool;
520 
521 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
522 	    (sp->session_state == RDS_SESSION_STATE_FAILED) ||
523 	    (sp->session_state == RDS_SESSION_STATE_FINI) ||
524 	    (sp->session_state == RDS_SESSION_STATE_PASSIVE_CLOSING));
525 
526 	rw_enter(&sp->session_lock, RW_READER);
527 	RDS_DPRINTF2("rds_destroy_session", "SP(%p) State: %d", sp,
528 	    sp->session_state);
529 	while (!((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
530 	    (sp->session_state == RDS_SESSION_STATE_FAILED) ||
531 	    (sp->session_state == RDS_SESSION_STATE_FINI))) {
532 		rw_exit(&sp->session_lock);
533 		delay(drv_usectohz(1000000));
534 		rw_enter(&sp->session_lock, RW_READER);
535 		RDS_DPRINTF2("rds_destroy_session", "SP(%p) State: %d WAITING "
536 		    "ON SESSION", sp, sp->session_state);
537 	}
538 	rw_exit(&sp->session_lock);
539 
540 	/* data channel */
541 	ep = &sp->session_dataep;
542 
543 	/* send pool locks */
544 	pool = &ep->ep_sndpool;
545 	cv_destroy(&pool->pool_cv);
546 	mutex_destroy(&pool->pool_lock);
547 
548 	/* recv pool locks */
549 	pool = &ep->ep_rcvpool;
550 	cv_destroy(&pool->pool_cv);
551 	mutex_destroy(&pool->pool_lock);
552 	mutex_destroy(&ep->ep_recvqp.qp_lock);
553 
554 	/* control channel */
555 	ep = &sp->session_ctrlep;
556 
557 	/* send pool locks */
558 	pool = &ep->ep_sndpool;
559 	cv_destroy(&pool->pool_cv);
560 	mutex_destroy(&pool->pool_lock);
561 
562 	/* recv pool locks */
563 	pool = &ep->ep_rcvpool;
564 	cv_destroy(&pool->pool_cv);
565 	mutex_destroy(&pool->pool_lock);
566 	mutex_destroy(&ep->ep_recvqp.qp_lock);
567 
568 	/* session */
569 	rw_destroy(&sp->session_lock);
570 	rw_destroy(&sp->session_portmap_lock);
571 
572 	/* free the session */
573 	kmem_free(sp, sizeof (rds_session_t));
574 
575 	RDS_DPRINTF2("rds_destroy_session", "SP(%p) Return", sp);
576 }
577 
578 /* This is called on the taskq thread */
579 static void
580 rds_failover_session(void *arg)
581 {
582 	rds_session_t	*sp = (rds_session_t *)arg;
583 	ib_gid_t	lgid, rgid;
584 	ipaddr_t	myip, remip;
585 	int		ret, cnt = 0;
586 
587 	RDS_DPRINTF2("rds_failover_session", "Enter: (%p)", sp);
588 
589 	RDS_INCR_FAILOVERS();
590 
591 	rw_enter(&sp->session_lock, RW_WRITER);
592 	if (sp->session_type != RDS_SESSION_ACTIVE) {
593 		/*
594 		 * The remote side must have seen the error and initiated
595 		 * a re-connect.
596 		 */
597 		RDS_DPRINTF2("rds_failover_session",
598 		    "SP(%p) has become passive", sp);
599 		rw_exit(&sp->session_lock);
600 		return;
601 	}
602 	sp->session_failover++;
603 	rw_exit(&sp->session_lock);
604 
605 	/*
606 	 * The session is in ERROR state but close both channels
607 	 * for a clean start.
608 	 */
609 	rds_session_close(sp, IBT_BLOCKING, 1);
610 
611 	/* wait 1 sec before re-connecting */
612 	delay(drv_usectohz(1000000));
613 
614 	do {
615 		/* The ipaddr should be in the network order */
616 		myip = sp->session_myip;
617 		remip = sp->session_remip;
618 		ret = rds_sc_path_lookup(&myip, &remip);
619 		if (ret == 0) {
620 			RDS_DPRINTF2(LABEL, "Path not found (0x%x 0x%x)",
621 			    myip, remip);
622 		}
623 		/* check if we have (new) path from the source to destination */
624 		ret = rds_get_ibaddr(htonl(myip), htonl(remip), &lgid, &rgid);
625 		if (ret == 0) {
626 			break;
627 		}
628 
629 		RDS_DPRINTF1(LABEL, "rds_get_ibaddr failed: %d", ret);
630 		/* wait 1 sec before re-trying */
631 		delay(drv_usectohz(1000000));
632 		cnt++;
633 	} while (cnt < 3);
634 
635 	if (ret != 0) {
636 		rw_enter(&sp->session_lock, RW_WRITER);
637 		if (sp->session_type == RDS_SESSION_ACTIVE) {
638 			rds_session_fini(sp);
639 			sp->session_state = RDS_SESSION_STATE_FAILED;
640 		} else {
641 			RDS_DPRINTF2("rds_failover_session",
642 			    "SP(%p) has become passive", sp);
643 		}
644 		rw_exit(&sp->session_lock);
645 		return;
646 	}
647 
648 	RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
649 	    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
650 	    rgid.gid_guid);
651 
652 	rw_enter(&sp->session_lock, RW_WRITER);
653 	if (sp->session_type != RDS_SESSION_ACTIVE) {
654 		/*
655 		 * The remote side must have seen the error and initiated
656 		 * a re-connect.
657 		 */
658 		RDS_DPRINTF2("rds_failover_session",
659 		    "SP(%p) has become passive", sp);
660 		rw_exit(&sp->session_lock);
661 		return;
662 	}
663 
664 	/* move the session to init state */
665 	sp->session_state = RDS_SESSION_STATE_INIT;
666 	sp->session_lgid = lgid;
667 	sp->session_rgid = rgid;
668 	rw_exit(&sp->session_lock);
669 
670 	rds_session_open(sp);
671 
672 	RDS_DPRINTF2("rds_failover_session", "Return: (%p)", sp);
673 }
674 
675 void
676 rds_handle_send_error(rds_ep_t *ep)
677 {
678 	if (rds_is_sendq_empty(ep, 0)) {
679 		/* Session should already be in ERROR, try to reconnect */
680 		RDS_DPRINTF2("rds_handle_send_error",
681 		    "Dispatching taskq to failover SP(%p)", ep->ep_sp);
682 		(void) ddi_taskq_dispatch(rds_taskq, rds_failover_session,
683 		    (void *)ep->ep_sp, DDI_SLEEP);
684 	}
685 }
686 
687 /*
688  * Called in the CM handler on the passive side
689  * Called on a taskq thread.
690  */
691 void
692 rds_cleanup_passive_session(void *arg)
693 {
694 	rds_session_t	*sp = arg;
695 
696 	RDS_DPRINTF2("rds_cleanup_passive_session", "SP(%p) State: %d", sp,
697 	    sp->session_state);
698 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
699 	    (sp->session_state == RDS_SESSION_STATE_ERROR));
700 
701 	rds_session_close(sp, IBT_BLOCKING, 1);
702 
703 	rw_enter(&sp->session_lock, RW_WRITER);
704 	if (sp->session_state == RDS_SESSION_STATE_CLOSED) {
705 		rds_session_fini(sp);
706 		sp->session_state = RDS_SESSION_STATE_FINI;
707 		RDS_DPRINTF3("rds_cleanup_passive_session",
708 		    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
709 	} else if (sp->session_state == RDS_SESSION_STATE_ERROR) {
710 		rds_session_fini(sp);
711 		sp->session_state = RDS_SESSION_STATE_FAILED;
712 		RDS_DPRINTF3("rds_cleanup_passive_session",
713 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
714 	}
715 	rw_exit(&sp->session_lock);
716 
717 	RDS_DPRINTF2("rds_cleanup_passive_session", "Return: SP (%p)", sp);
718 }
719 
720 /*
721  * Called by the CM handler on the passive side
722  * Called with WRITE lock on the session
723  */
724 void
725 rds_passive_session_fini(rds_session_t *sp)
726 {
727 	rds_ep_t	*ep;
728 
729 	RDS_DPRINTF2("rds_passive_session_fini", "SP(%p) State: %d", sp,
730 	    sp->session_state);
731 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
732 	    (sp->session_state == RDS_SESSION_STATE_ERROR));
733 
734 	/* clean the data channel */
735 	ep = &sp->session_dataep;
736 	(void) rds_is_sendq_empty(ep, 1);
737 	mutex_enter(&ep->ep_lock);
738 	RDS_DPRINTF2("rds_passive_session_fini", "EP(%p) State: %d", ep,
739 	    ep->ep_state);
740 	rds_ep_free_rc_channel(ep);
741 	mutex_exit(&ep->ep_lock);
742 
743 	/* clean the control channel */
744 	ep = &sp->session_ctrlep;
745 	(void) rds_is_sendq_empty(ep, 1);
746 	mutex_enter(&ep->ep_lock);
747 	RDS_DPRINTF2("rds_passive_session_fini", "EP(%p) State: %d", ep,
748 	    ep->ep_state);
749 	rds_ep_free_rc_channel(ep);
750 	mutex_exit(&ep->ep_lock);
751 
752 	rds_session_fini(sp);
753 
754 	RDS_DPRINTF2("rds_passive_session_fini", "Return: SP (%p)", sp);
755 }
756 
757 /*
758  * Can be called:
759  * 1. on driver detach
760  * 2. on taskq thread
761  * arg is always NULL
762  */
763 /* ARGSUSED */
764 void
765 rds_close_sessions(void *arg)
766 {
767 	rds_session_t *sp, *spnextp;
768 
769 	RDS_DPRINTF2("rds_close_sessions", "Enter");
770 
771 	/* wait until all the buffers are freed by the sockets */
772 	while (RDS_GET_RXPKTS_PEND() != 0) {
773 		/* wait one second and try again */
774 		RDS_DPRINTF2("rds_close_sessions", "waiting on "
775 		    "pending packets", RDS_GET_RXPKTS_PEND());
776 		delay(drv_usectohz(1000000));
777 	}
778 	RDS_DPRINTF2("rds_close_sessions", "No more RX packets pending");
779 
780 	/* close all the sessions */
781 	rw_enter(&rdsib_statep->rds_sessionlock, RW_WRITER);
782 	sp = rdsib_statep->rds_sessionlistp;
783 	while (sp) {
784 		rw_enter(&sp->session_lock, RW_WRITER);
785 		RDS_DPRINTF2("rds_close_sessions", "SP(%p) State: %d", sp,
786 		    sp->session_state);
787 
788 		switch (sp->session_state) {
789 		case RDS_SESSION_STATE_CONNECTED:
790 			sp->session_state = RDS_SESSION_STATE_ACTIVE_CLOSING;
791 			rw_exit(&sp->session_lock);
792 
793 			rds_session_close(sp, IBT_BLOCKING, 2);
794 
795 			rw_enter(&sp->session_lock, RW_WRITER);
796 			sp->session_state = RDS_SESSION_STATE_CLOSED;
797 			RDS_DPRINTF3("rds_close_sessions",
798 			    "SP(%p) State RDS_SESSION_STATE_CLOSED", sp);
799 			rds_session_fini(sp);
800 			sp->session_state = RDS_SESSION_STATE_FINI;
801 			RDS_DPRINTF3("rds_close_sessions",
802 			    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
803 			break;
804 
805 		case RDS_SESSION_STATE_ERROR:
806 		case RDS_SESSION_STATE_PASSIVE_CLOSING:
807 		case RDS_SESSION_STATE_INIT:
808 			sp->session_state = RDS_SESSION_STATE_ACTIVE_CLOSING;
809 			rw_exit(&sp->session_lock);
810 
811 			rds_session_close(sp, IBT_BLOCKING, 1);
812 
813 			rw_enter(&sp->session_lock, RW_WRITER);
814 			sp->session_state = RDS_SESSION_STATE_CLOSED;
815 			RDS_DPRINTF3("rds_close_sessions",
816 			    "SP(%p) State RDS_SESSION_STATE_CLOSED", sp);
817 			/* FALLTHRU */
818 		case RDS_SESSION_STATE_CLOSED:
819 			rds_session_fini(sp);
820 			sp->session_state = RDS_SESSION_STATE_FINI;
821 			RDS_DPRINTF3("rds_close_sessions",
822 			    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
823 			break;
824 		}
825 
826 		rw_exit(&sp->session_lock);
827 		sp = sp->session_nextp;
828 	}
829 
830 	sp = rdsib_statep->rds_sessionlistp;
831 	rdsib_statep->rds_sessionlistp = NULL;
832 	rdsib_statep->rds_nsessions = 0;
833 	rw_exit(&rdsib_statep->rds_sessionlock);
834 
835 	while (sp) {
836 		spnextp = sp->session_nextp;
837 		rds_destroy_session(sp);
838 		RDS_DECR_SESS();
839 		sp = spnextp;
840 	}
841 
842 	/* free the global pool */
843 	rds_free_recv_caches(rdsib_statep);
844 
845 	RDS_DPRINTF2("rds_close_sessions", "Return");
846 }
847 
848 void
849 rds_session_open(rds_session_t *sp)
850 {
851 	int		ret;
852 
853 	RDS_DPRINTF2("rds_session_open", "Enter SP(%p)", sp);
854 
855 	ret = rds_session_connect(sp);
856 	if (ret == -1) {
857 		/*
858 		 * may be the session has become passive due to
859 		 * hitting peer-to-peer case
860 		 */
861 		rw_enter(&sp->session_lock, RW_READER);
862 		if (sp->session_type == RDS_SESSION_PASSIVE) {
863 			RDS_DPRINTF2("rds_session_open", "SP(%p) "
864 			    "has become passive from active", sp);
865 			rw_exit(&sp->session_lock);
866 			return;
867 		}
868 
869 		/* get the lock for writing */
870 		rw_exit(&sp->session_lock);
871 		rw_enter(&sp->session_lock, RW_WRITER);
872 		sp->session_state = RDS_SESSION_STATE_ERROR;
873 		RDS_DPRINTF3("rds_session_open",
874 		    "SP(%p) State RDS_SESSION_STATE_ERROR", sp);
875 		rw_exit(&sp->session_lock);
876 
877 		/* Connect request failed */
878 		rds_session_close(sp, IBT_BLOCKING, 1);
879 
880 		rw_enter(&sp->session_lock, RW_WRITER);
881 		rds_session_fini(sp);
882 		sp->session_state = RDS_SESSION_STATE_FAILED;
883 		RDS_DPRINTF3("rds_session_open",
884 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
885 		rw_exit(&sp->session_lock);
886 
887 		return;
888 	}
889 
890 	RDS_DPRINTF2(LABEL, "Session (%p) 0x%x <--> 0x%x is CONNECTED",
891 	    sp, sp->session_myip, sp->session_remip);
892 
893 	RDS_DPRINTF2("rds_session_open", "Return: SP(%p)", sp);
894 }
895 
896 /*
897  * Creates a session and inserts it into the list of sessions. The session
898  * state would be CREATED.
899  * Return Values:
900  *	EWOULDBLOCK
901  */
902 rds_session_t *
903 rds_session_create(rds_state_t *statep, ipaddr_t localip, ipaddr_t remip,
904     ibt_cm_req_rcv_t *reqp, uint8_t type)
905 {
906 	ib_gid_t	lgid, rgid;
907 	rds_session_t	*newp, *oldp;
908 	rds_ep_t	*dataep, *ctrlep;
909 	rds_bufpool_t	*pool;
910 	rds_hca_t	*hcap;
911 	int		ret;
912 
913 	RDS_DPRINTF2("rds_session_create", "Enter: 0x%p 0x%x 0x%x",
914 	    statep, localip, remip);
915 
916 	/* Allocate and initialize global buffer pool */
917 	ret = rds_init_recv_caches(statep);
918 	if (ret != 0) {
919 		RDS_DPRINTF2(LABEL, "Buffer Cache Initialization failed");
920 		return (NULL);
921 	}
922 
923 	/* enough memory for session (includes 2 endpoints) */
924 	newp = kmem_zalloc(sizeof (rds_session_t), KM_SLEEP);
925 
926 	newp->session_remip = remip;
927 	newp->session_myip = localip;
928 	newp->session_type = type;
929 	newp->session_state = RDS_SESSION_STATE_CREATED;
930 	RDS_DPRINTF3("rds_session_create",
931 	    "SP(%p) State RDS_SESSION_STATE_CREATED", newp);
932 	rw_init(&newp->session_lock, NULL, RW_DRIVER, NULL);
933 	rw_init(&newp->session_portmap_lock, NULL, RW_DRIVER, NULL);
934 
935 	/* Initialize data endpoint */
936 	dataep = &newp->session_dataep;
937 	dataep->ep_remip = newp->session_remip;
938 	dataep->ep_myip = newp->session_myip;
939 	dataep->ep_state = RDS_EP_STATE_UNCONNECTED;
940 	dataep->ep_sp = newp;
941 	dataep->ep_type = RDS_EP_TYPE_DATA;
942 	mutex_init(&dataep->ep_lock, NULL, MUTEX_DRIVER, NULL);
943 
944 	/* Initialize send pool locks */
945 	pool = &dataep->ep_sndpool;
946 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
947 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
948 
949 	/* Initialize recv pool locks */
950 	pool = &dataep->ep_rcvpool;
951 	mutex_init(&dataep->ep_recvqp.qp_lock, NULL, MUTEX_DRIVER, NULL);
952 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
953 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
954 
955 	/* Initialize control endpoint */
956 	ctrlep = &newp->session_ctrlep;
957 	ctrlep->ep_remip = newp->session_remip;
958 	ctrlep->ep_myip = newp->session_myip;
959 	ctrlep->ep_state = RDS_EP_STATE_UNCONNECTED;
960 	ctrlep->ep_sp = newp;
961 	ctrlep->ep_type = RDS_EP_TYPE_CTRL;
962 	mutex_init(&ctrlep->ep_lock, NULL, MUTEX_DRIVER, NULL);
963 
964 	/* Initialize send pool locks */
965 	pool = &ctrlep->ep_sndpool;
966 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
967 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
968 
969 	/* Initialize recv pool locks */
970 	pool = &ctrlep->ep_rcvpool;
971 	mutex_init(&ctrlep->ep_recvqp.qp_lock, NULL, MUTEX_DRIVER, NULL);
972 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
973 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
974 
975 	/* lkup if there is already a session */
976 	rw_enter(&statep->rds_sessionlock, RW_WRITER);
977 	oldp = rds_session_lkup(statep, remip, 0);
978 	if (oldp != NULL) {
979 		/* A session to this destination exists */
980 		rw_exit(&statep->rds_sessionlock);
981 		rw_destroy(&newp->session_lock);
982 		rw_destroy(&newp->session_portmap_lock);
983 		mutex_destroy(&dataep->ep_lock);
984 		mutex_destroy(&ctrlep->ep_lock);
985 		kmem_free(newp, sizeof (rds_session_t));
986 		return (NULL);
987 	}
988 
989 	/* Insert this session into the list */
990 	rds_add_session(newp, B_TRUE);
991 
992 	/* unlock the session list */
993 	rw_exit(&statep->rds_sessionlock);
994 
995 	if (type == RDS_SESSION_ACTIVE) {
996 		ipaddr_t localip1, remip1;
997 
998 		/* The ipaddr should be in the network order */
999 		localip1 = localip;
1000 		remip1 = remip;
1001 		ret = rds_sc_path_lookup(&localip1, &remip1);
1002 		if (ret == 0) {
1003 			RDS_DPRINTF2(LABEL, "Path not found (0x%x 0x%x)",
1004 			    localip, remip);
1005 		}
1006 
1007 		/* Get the gids for the source and destination ip addrs */
1008 		ret = rds_get_ibaddr(ntohl(localip1), ntohl(remip1),
1009 		    &lgid, &rgid);
1010 		if (ret != 0) {
1011 			RDS_DPRINTF1(LABEL, "rds_get_ibaddr failed: %d", ret);
1012 			RDS_SESSION_TRANSITION(newp, RDS_SESSION_STATE_FAILED);
1013 			return (NULL);
1014 		}
1015 
1016 		RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
1017 		    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
1018 		    rgid.gid_guid);
1019 	}
1020 
1021 	rw_enter(&newp->session_lock, RW_WRITER);
1022 	/* check for peer-to-peer case */
1023 	if (type == newp->session_type) {
1024 		/* no peer-to-peer case */
1025 		if (type == RDS_SESSION_ACTIVE) {
1026 			newp->session_lgid = lgid;
1027 			newp->session_rgid = rgid;
1028 		} else {
1029 			/* rgid is requester gid & lgid is receiver gid */
1030 			newp->session_rgid = reqp->req_prim_addr.av_dgid;
1031 			newp->session_lgid = reqp->req_prim_addr.av_sgid;
1032 		}
1033 
1034 		hcap = rds_gid_to_hcap(statep, newp->session_lgid);
1035 		if (hcap == NULL) {
1036 			RDS_DPRINTF1(LABEL, "SGID is on an uninitialized "
1037 			    "HCA: %llx", newp->session_lgid.gid_guid);
1038 			newp->session_state = RDS_SESSION_STATE_FAILED;
1039 			RDS_DPRINTF3("rds_session_create",
1040 			    "SP(%p) State RDS_SESSION_STATE_FAILED", newp);
1041 			rw_exit(&newp->session_lock);
1042 			return (NULL);
1043 		}
1044 		dataep->ep_hca_guid = hcap->hca_guid;
1045 		ctrlep->ep_hca_guid = hcap->hca_guid;
1046 	}
1047 	rw_exit(&newp->session_lock);
1048 
1049 	RDS_DPRINTF2("rds_session_create", "Return SP(%p)", newp);
1050 
1051 	return (newp);
1052 }
1053 
1054 void
1055 rds_handle_control_message(rds_session_t *sp, rds_ctrl_pkt_t *cpkt)
1056 {
1057 	cpkt->rcp_port = cpkt->rcp_port;
1058 	RDS_DPRINTF4("rds_handle_control_message", "Enter: SP(%p) code: %d "
1059 	    "port: %d", sp, cpkt->rcp_code, cpkt->rcp_port);
1060 
1061 	switch (cpkt->rcp_code) {
1062 	case RDS_CTRL_CODE_STALL:
1063 		RDS_INCR_STALLS_RCVD();
1064 		(void) rds_check_n_mark_port(sp, cpkt->rcp_port);
1065 		break;
1066 	case RDS_CTRL_CODE_UNSTALL:
1067 		RDS_INCR_UNSTALLS_RCVD();
1068 		(void) rds_check_n_unmark_port(sp, cpkt->rcp_port);
1069 		break;
1070 	case RDS_CTRL_CODE_STALL_PORTS:
1071 		rds_mark_all_ports(sp);
1072 		break;
1073 	case RDS_CTRL_CODE_UNSTALL_PORTS:
1074 		rds_unmark_all_ports(sp);
1075 		break;
1076 	case RDS_CTRL_CODE_HEARTBEAT:
1077 		break;
1078 	default:
1079 		RDS_DPRINTF2(LABEL, "ERROR: Invalid Control code: %d",
1080 		    cpkt->rcp_code);
1081 		break;
1082 	}
1083 
1084 	RDS_DPRINTF4("rds_handle_control_message", "Return");
1085 }
1086 
1087 void
1088 rds_post_control_message(rds_session_t *sp, rds_ctrl_pkt_t *cpkt)
1089 {
1090 	ibt_send_wr_t	wr;
1091 	rds_ep_t	*ep;
1092 	rds_buf_t	*bp;
1093 	rds_ctrl_pkt_t	*cp;
1094 	int		ret;
1095 
1096 	RDS_DPRINTF4("rds_post_control_message", "Enter: SP(%p) Code: %d "
1097 	    "Port: %d", sp, cpkt->rcp_code, cpkt->rcp_port);
1098 
1099 	ep = &sp->session_ctrlep;
1100 
1101 	bp = rds_get_send_buf(ep, 1);
1102 	if (bp == NULL) {
1103 		RDS_DPRINTF2(LABEL, "No buffers available to send control "
1104 		    "message: SP(%p) Code: %d Port: %d", sp, cpkt->rcp_code,
1105 		    cpkt->rcp_port);
1106 		return;
1107 	}
1108 
1109 	cp = (rds_ctrl_pkt_t *)(uintptr_t)bp->buf_ds.ds_va;
1110 	cp->rcp_code = cpkt->rcp_code;
1111 	cp->rcp_port = cpkt->rcp_port;
1112 	bp->buf_ds.ds_len = RDS_CTRLPKT_SIZE;
1113 
1114 	wr.wr_id = (uintptr_t)bp;
1115 	wr.wr_flags = IBT_WR_SEND_SOLICIT;
1116 	wr.wr_trans = IBT_RC_SRV;
1117 	wr.wr_opcode = IBT_WRC_SEND;
1118 	wr.wr_nds = 1;
1119 	wr.wr_sgl = &bp->buf_ds;
1120 	RDS_DPRINTF5(LABEL, "ds_va %p ds_len %d ds_lkey 0x%llx",
1121 	    bp->buf_ds.ds_va, bp->buf_ds.ds_len, bp->buf_ds.ds_key);
1122 	ret = ibt_post_send(ep->ep_chanhdl, &wr, 1, NULL);
1123 	if (ret != IBT_SUCCESS) {
1124 		RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1125 		    "%d", ep, ret);
1126 		bp->buf_state = RDS_SNDBUF_FREE;
1127 		rds_free_send_buf(ep, bp, NULL, 1, B_FALSE);
1128 		return;
1129 	}
1130 
1131 	RDS_DPRINTF4("rds_post_control_message", "Return SP(%p) Code: %d "
1132 	    "Port: %d", sp, cpkt->rcp_code, cpkt->rcp_port);
1133 }
1134 
1135 void
1136 rds_send_control_message(void *arg)
1137 {
1138 	rds_buf_t	*bp;
1139 	rds_ctrl_pkt_t	*cp;
1140 	rds_session_t	*sp;
1141 	uint_t		ix;
1142 
1143 	RDS_DPRINTF4("rds_send_control_message", "Enter");
1144 
1145 	bp = (rds_buf_t *)arg;
1146 	cp = (rds_ctrl_pkt_t *)(uintptr_t)bp->buf_ds.ds_va;
1147 
1148 	/* send the stall message on all sessions */
1149 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1150 
1151 	sp = rdsib_statep->rds_sessionlistp;
1152 	for (ix = 0; ix < rdsib_statep->rds_nsessions; ix++) {
1153 		ASSERT(sp != NULL);
1154 		if (sp->session_state == RDS_SESSION_STATE_CONNECTED) {
1155 			rds_post_control_message(sp, cp);
1156 		}
1157 
1158 		sp = sp->session_nextp;
1159 	}
1160 
1161 	rw_exit(&rdsib_statep->rds_sessionlock);
1162 
1163 	/* free the arg */
1164 	rds_free_buf(&rds_cpool, bp, 1);
1165 
1166 	RDS_DPRINTF4("rds_send_control_message", "Return");
1167 }
1168 
1169 void
1170 rds_stall_port(in_port_t port)
1171 {
1172 	rds_ctrl_pkt_t	*cpkt;
1173 	rds_buf_t	*bp;
1174 	uint_t		ix;
1175 
1176 	RDS_DPRINTF4("rds_stall_port", "Enter: Port %d", port);
1177 
1178 	RDS_INCR_STALLS_TRIGGERED();
1179 	if (!rds_check_n_mark_port(NULL, port)) {
1180 
1181 		bp = rds_get_buf(&rds_cpool, 1, &ix);
1182 		if (bp == NULL) {
1183 			RDS_DPRINTF2(LABEL, "No buffers available "
1184 			    "to send control message: Code: %d "
1185 			    "Local Port: %d", RDS_CTRL_CODE_STALL, port);
1186 			(void) rds_check_n_unmark_port(NULL, port);
1187 			return;
1188 		}
1189 
1190 		cpkt = (rds_ctrl_pkt_t *)(uintptr_t)bp->buf_ds.ds_va;
1191 		cpkt->rcp_code = RDS_CTRL_CODE_STALL;
1192 		cpkt->rcp_port = port;
1193 #if 0
1194 		/*
1195 		 * Taskq runs at some later point in time and the port may
1196 		 * not be in stall state anymore at that time.
1197 		 */
1198 		(void) ddi_taskq_dispatch(rds_taskq,
1199 		    rds_send_control_message, (void *)bp, DDI_SLEEP);
1200 #else
1201 		rds_send_control_message((void *)bp);
1202 #endif
1203 		RDS_INCR_STALLS_SENT();
1204 	} else {
1205 		RDS_DPRINTF3(LABEL,
1206 		    "Port %d is already in stall state", port);
1207 	}
1208 
1209 	RDS_DPRINTF4("rds_stall_port", "Return: Port %d", port);
1210 }
1211 
1212 void
1213 rds_resume_port(in_port_t port)
1214 {
1215 	rds_ctrl_pkt_t	*cpkt;
1216 	rds_buf_t	*bp;
1217 	uint_t		ix;
1218 
1219 	RDS_DPRINTF4("rds_resume_port", "Enter: Port %d", port);
1220 
1221 	RDS_INCR_UNSTALLS_TRIGGERED();
1222 	if (rds_check_n_unmark_port(NULL, port)) {
1223 
1224 		bp = rds_get_buf(&rds_cpool, 1, &ix);
1225 		if (bp == NULL) {
1226 			RDS_DPRINTF2(LABEL, "No buffers available "
1227 			    "to send control message: Code: %d "
1228 			    "Local Port: %d", RDS_CTRL_CODE_UNSTALL, port);
1229 			(void) rds_check_n_mark_port(NULL, port);
1230 			return;
1231 		}
1232 
1233 		/* send control message to resume the port for remote traffic */
1234 		cpkt = (rds_ctrl_pkt_t *)(uintptr_t)bp->buf_ds.ds_va;
1235 		cpkt->rcp_code = RDS_CTRL_CODE_UNSTALL;
1236 		cpkt->rcp_port = port;
1237 		(void) ddi_taskq_dispatch(rds_taskq,
1238 		    rds_send_control_message, (void *)bp, DDI_SLEEP);
1239 		RDS_INCR_UNSTALLS_SENT();
1240 	} else {
1241 		RDS_DPRINTF5(LABEL,
1242 		    "Port %d is not stalled anymore", port);
1243 	}
1244 
1245 	RDS_DPRINTF4("rds_resume_port", "Return: Port %d", port);
1246 }
1247 
1248 static int
1249 rds_build_n_post_msg(rds_ep_t *ep, uio_t *uiop, in_port_t sendport,
1250     in_port_t recvport)
1251 {
1252 	ibt_send_wr_t	*wrp, wr;
1253 	rds_buf_t	*bp, *bp1;
1254 	rds_data_hdr_t	*pktp;
1255 	uint32_t	msgsize, npkts, residual, pktno, ix;
1256 	int		ret;
1257 
1258 	RDS_DPRINTF4("rds_build_n_post_msg", "Enter: EP(%p) UIOP(%p)",
1259 	    ep, uiop);
1260 
1261 	/* how many pkts are needed to carry this msg */
1262 	msgsize = uiop->uio_resid;
1263 	npkts = ((msgsize - 1) / UserBufferSize) + 1;
1264 	residual = ((msgsize - 1) % UserBufferSize) + 1;
1265 
1266 	RDS_DPRINTF5(LABEL, "EP(%p) UIOP(%p) msg size: %d npkts: %d", ep, uiop,
1267 	    msgsize, npkts);
1268 
1269 	/* Get the buffers needed to post this message */
1270 	bp = rds_get_send_buf(ep, npkts);
1271 	if (bp == NULL) {
1272 		RDS_INCR_ENOBUFS();
1273 		return (ENOBUFS);
1274 	}
1275 
1276 	if (npkts > 1) {
1277 		/*
1278 		 * multi-pkt messages are posted at the same time as a list
1279 		 * of WRs
1280 		 */
1281 		wrp = (ibt_send_wr_t *)kmem_zalloc(sizeof (ibt_send_wr_t) *
1282 		    npkts, KM_SLEEP);
1283 	}
1284 
1285 
1286 	pktno = 0;
1287 	bp1 = bp;
1288 	do {
1289 		/* prepare the header */
1290 		pktp = (rds_data_hdr_t *)(uintptr_t)bp1->buf_ds.ds_va;
1291 		pktp->dh_datalen = UserBufferSize;
1292 		pktp->dh_npkts = npkts - pktno;
1293 		pktp->dh_psn = pktno;
1294 		pktp->dh_sendport = sendport;
1295 		pktp->dh_recvport = recvport;
1296 		bp1->buf_ds.ds_len = RdsPktSize;
1297 
1298 		/* copy the data */
1299 		ret = uiomove((uint8_t *)pktp + RDS_DATA_HDR_SZ,
1300 		    UserBufferSize, UIO_WRITE, uiop);
1301 		if (ret != 0) {
1302 			break;
1303 		}
1304 
1305 		if (uiop->uio_resid == 0) {
1306 			pktp->dh_datalen = residual;
1307 			bp1->buf_ds.ds_len = residual + RDS_DATA_HDR_SZ;
1308 			break;
1309 		}
1310 		pktno++;
1311 		bp1 = bp1->buf_nextp;
1312 	} while (uiop->uio_resid);
1313 
1314 	if (ret) {
1315 		/* uiomove failed */
1316 		RDS_DPRINTF2("rds_build_n_post_msg", "UIO(%p) Move FAILED: %d",
1317 		    uiop, ret);
1318 		if (npkts > 1) {
1319 			kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
1320 		}
1321 		rds_free_send_buf(ep, bp, NULL, npkts, B_FALSE);
1322 		return (ret);
1323 	}
1324 
1325 	if (npkts > 1) {
1326 		/* multi-pkt message */
1327 		RDS_DPRINTF5(LABEL, "EP(%p) Sending Multiple Packets", ep);
1328 
1329 		bp1 = bp;
1330 		for (ix = 0; ix < npkts; ix++) {
1331 			wrp[ix].wr_id = (uintptr_t)bp1;
1332 			wrp[ix].wr_flags = IBT_WR_NO_FLAGS;
1333 			wrp[ix].wr_trans = IBT_RC_SRV;
1334 			wrp[ix].wr_opcode = IBT_WRC_SEND;
1335 			wrp[ix].wr_nds = 1;
1336 			wrp[ix].wr_sgl = &bp1->buf_ds;
1337 			bp1 = bp1->buf_nextp;
1338 		}
1339 		wrp[npkts - 1].wr_flags = IBT_WR_SEND_SOLICIT;
1340 
1341 		ret = ibt_post_send(ep->ep_chanhdl, wrp, npkts, &ix);
1342 		if (ret != IBT_SUCCESS) {
1343 			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1344 			    "%d for %d pkts", ep, ret, npkts);
1345 			rds_free_send_buf(ep, bp, NULL, npkts, B_FALSE);
1346 			kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
1347 			return (ret);
1348 		}
1349 
1350 		kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
1351 	} else {
1352 		/* single pkt */
1353 		RDS_DPRINTF5(LABEL, "EP(%p) Sending Single Packet", ep);
1354 		wr.wr_id = (uintptr_t)bp;
1355 		wr.wr_flags = IBT_WR_SEND_SOLICIT;
1356 		wr.wr_trans = IBT_RC_SRV;
1357 		wr.wr_opcode = IBT_WRC_SEND;
1358 		wr.wr_nds = 1;
1359 		wr.wr_sgl = &bp->buf_ds;
1360 		RDS_DPRINTF5(LABEL, "ds_va %p ds_key 0x%llx ds_len %d ",
1361 		    bp->buf_ds.ds_va, bp->buf_ds.ds_key, bp->buf_ds.ds_len);
1362 		ret = ibt_post_send(ep->ep_chanhdl, &wr, 1, NULL);
1363 		if (ret != IBT_SUCCESS) {
1364 			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1365 			    "%d", ep, ret);
1366 			rds_free_send_buf(ep, bp, NULL, 1, B_FALSE);
1367 			return (ret);
1368 		}
1369 	}
1370 
1371 	RDS_INCR_TXPKTS(npkts);
1372 	RDS_INCR_TXBYTES(msgsize);
1373 
1374 	RDS_DPRINTF4("rds_build_n_post_msg", "Return: EP(%p) UIOP(%p)",
1375 	    ep, uiop);
1376 
1377 	return (0);
1378 }
1379 
1380 static int
1381 rds_deliver_loopback_msg(uio_t *uiop, ipaddr_t recvip, ipaddr_t sendip,
1382     in_port_t recvport, in_port_t sendport, zoneid_t zoneid)
1383 {
1384 	mblk_t		*mp;
1385 	int		ret;
1386 
1387 	RDS_DPRINTF4("rds_deliver_loopback_msg", "Enter");
1388 
1389 	RDS_DPRINTF3(LABEL, "Loopback message: sendport: "
1390 	    "%d to recvport: %d", sendport, recvport);
1391 
1392 	mp = allocb(uiop->uio_resid, BPRI_MED);
1393 	if (mp == NULL) {
1394 		RDS_DPRINTF2(LABEL, "allocb failed, size: %d\n",
1395 		    uiop->uio_resid);
1396 		return (ENOSPC);
1397 	}
1398 	mp->b_wptr = mp->b_rptr + uiop->uio_resid;
1399 
1400 	ret = uiomove(mp->b_rptr, uiop->uio_resid, UIO_WRITE, uiop);
1401 	if (ret) {
1402 		RDS_DPRINTF2(LABEL, "ERROR: uiomove returned: %d", ret);
1403 		freeb(mp);
1404 		return (ret);
1405 	}
1406 
1407 	ret = rds_deliver_new_msg(mp, recvip, sendip, recvport, sendport,
1408 	    zoneid);
1409 	if (ret != 0) {
1410 		if (ret == ENOSPC) {
1411 			/*
1412 			 * The message is delivered but cannot take more,
1413 			 * stall the port, if it is not already stalled
1414 			 */
1415 			RDS_DPRINTF2(LABEL, "Port %d NO SPACE", recvport);
1416 			rds_stall_port(recvport);
1417 		} else {
1418 			RDS_DPRINTF2(LABEL, "Loopback message: port %d -> "
1419 			    "port %d failed: %d", sendport, recvport, ret);
1420 			return (ret);
1421 		}
1422 	}
1423 
1424 	RDS_DPRINTF4("rds_deliver_loopback_msg", "Return");
1425 	return (0);
1426 }
1427 
1428 static void
1429 rds_resend_messages(void *arg)
1430 {
1431 	rds_session_t	*sp = (rds_session_t *)arg;
1432 	rds_ep_t	*ep;
1433 	rds_bufpool_t	*spool;
1434 	rds_buf_t	*bp, *endp, *tmp;
1435 	ibt_send_wr_t	*wrp;
1436 	uint_t		nwr = 0, ix, jx;
1437 	int		ret;
1438 
1439 	RDS_DPRINTF2("rds_resend_messages", "Enter: SP(%p)", sp);
1440 
1441 	ep = &sp->session_dataep;
1442 
1443 	spool = &ep->ep_sndpool;
1444 	mutex_enter(&spool->pool_lock);
1445 
1446 	ASSERT(spool->pool_nfree == spool->pool_nbuffers);
1447 
1448 	if (ep->ep_lbufid == NULL) {
1449 		RDS_DPRINTF2("rds_resend_messages",
1450 		    "SP(%p) Remote session is cleaned up ", sp);
1451 		/*
1452 		 * The remote end cleaned up its session. There may be loss
1453 		 * of messages. Mark all buffers as acknowledged.
1454 		 */
1455 		tmp = spool->pool_tailp;
1456 	} else {
1457 		tmp = (rds_buf_t *)ep->ep_lbufid;
1458 		RDS_DPRINTF2("rds_resend_messages",
1459 		    "SP(%p) Last successful BP(%p) ", sp, tmp);
1460 	}
1461 
1462 	endp = spool->pool_tailp;
1463 	bp = spool->pool_headp;
1464 	jx = 0;
1465 	while ((bp != NULL) && (bp != tmp)) {
1466 		bp->buf_state = RDS_SNDBUF_FREE;
1467 		jx++;
1468 		bp = bp->buf_nextp;
1469 	}
1470 
1471 	if (bp == NULL) {
1472 		mutex_exit(&spool->pool_lock);
1473 		RDS_DPRINTF2("rds_resend_messages", "Alert: lbufid(%p) is not "
1474 		    "found in the list", tmp);
1475 
1476 		rw_enter(&sp->session_lock, RW_WRITER);
1477 		if (sp->session_state == RDS_SESSION_STATE_INIT) {
1478 			sp->session_state = RDS_SESSION_STATE_CONNECTED;
1479 		} else {
1480 			RDS_DPRINTF2("rds_resend_messages", "SP(%p) State: %d "
1481 			    "Expected State: %d", sp, sp->session_state,
1482 			    RDS_SESSION_STATE_CONNECTED);
1483 		}
1484 		sp->session_failover--;
1485 		rw_exit(&sp->session_lock);
1486 		return;
1487 	}
1488 
1489 	/* Found the match */
1490 	bp->buf_state = RDS_SNDBUF_FREE;
1491 	jx++;
1492 
1493 	spool->pool_tailp = bp;
1494 	bp = bp->buf_nextp;
1495 	spool->pool_tailp->buf_nextp = NULL;
1496 	nwr = spool->pool_nfree - jx;
1497 	spool->pool_nfree = jx;
1498 	mutex_exit(&spool->pool_lock);
1499 
1500 	RDS_DPRINTF2("rds_resend_messages", "SP(%p): Number of "
1501 	    "bufs (BP %p) to re-send: %d", sp, bp, nwr);
1502 
1503 	if (bp) {
1504 		wrp = (ibt_send_wr_t *)kmem_zalloc(sizeof (ibt_send_wr_t) * 100,
1505 		    KM_SLEEP);
1506 
1507 		while (nwr) {
1508 			jx = (nwr > 100) ? 100 : nwr;
1509 
1510 			tmp = bp;
1511 			for (ix = 0; ix < jx; ix++) {
1512 				bp->buf_state = RDS_SNDBUF_PENDING;
1513 				wrp[ix].wr_id = (uintptr_t)bp;
1514 				wrp[ix].wr_flags = IBT_WR_SEND_SOLICIT;
1515 				wrp[ix].wr_trans = IBT_RC_SRV;
1516 				wrp[ix].wr_opcode = IBT_WRC_SEND;
1517 				wrp[ix].wr_nds = 1;
1518 				wrp[ix].wr_sgl = &bp->buf_ds;
1519 				bp = bp->buf_nextp;
1520 			}
1521 
1522 			ret = ibt_post_send(ep->ep_chanhdl, wrp, jx, &ix);
1523 			if (ret != IBT_SUCCESS) {
1524 				RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send "
1525 				    "failed: %d for % pkts", ep, ret, jx);
1526 				break;
1527 			}
1528 
1529 			mutex_enter(&spool->pool_lock);
1530 			spool->pool_nbusy += jx;
1531 			mutex_exit(&spool->pool_lock);
1532 
1533 			nwr -= jx;
1534 		}
1535 
1536 		kmem_free(wrp, sizeof (ibt_send_wr_t) * 100);
1537 
1538 		if (nwr != 0) {
1539 
1540 			/*
1541 			 * An error while failover is in progress. Some WRs are
1542 			 * posted while other remain. If any of the posted WRs
1543 			 * complete in error then they would dispatch a taskq to
1544 			 * do a failover. Getting the session lock will prevent
1545 			 * the taskq to wait until we are done here.
1546 			 */
1547 			rw_enter(&sp->session_lock, RW_READER);
1548 
1549 			/*
1550 			 * Wait until all the previous WRs are completed and
1551 			 * then queue the remaining, otherwise the order of
1552 			 * the messages may change.
1553 			 */
1554 			(void) rds_is_sendq_empty(ep, 1);
1555 
1556 			/* free the remaining buffers */
1557 			rds_free_send_buf(ep, tmp, endp, nwr, B_FALSE);
1558 
1559 			rw_exit(&sp->session_lock);
1560 			return;
1561 		}
1562 	}
1563 
1564 	rw_enter(&sp->session_lock, RW_WRITER);
1565 	if (sp->session_state == RDS_SESSION_STATE_INIT) {
1566 		sp->session_state = RDS_SESSION_STATE_CONNECTED;
1567 	} else {
1568 		RDS_DPRINTF2("rds_resend_messages", "SP(%p) State: %d "
1569 		    "Expected State: %d", sp, sp->session_state,
1570 		    RDS_SESSION_STATE_CONNECTED);
1571 	}
1572 	sp->session_failover--;
1573 	rw_exit(&sp->session_lock);
1574 
1575 	RDS_DPRINTF2("rds_resend_messages", "Return: SP(%p)", sp);
1576 }
1577 
1578 /*
1579  * This is called when a channel is connected. Transition the session to
1580  * CONNECTED state iff both channels are connected.
1581  */
1582 void
1583 rds_session_active(rds_session_t *sp)
1584 {
1585 	rds_ep_t	*ep;
1586 	uint_t		failover;
1587 
1588 	RDS_DPRINTF2("rds_session_active", "Enter: 0x%p", sp);
1589 
1590 	rw_enter(&sp->session_lock, RW_READER);
1591 
1592 	failover = sp->session_failover;
1593 
1594 	/*
1595 	 * we establish the data channel first, so check the control channel
1596 	 * first but make sure it is initialized.
1597 	 */
1598 	ep = &sp->session_ctrlep;
1599 	mutex_enter(&ep->ep_lock);
1600 	if (ep->ep_state != RDS_EP_STATE_CONNECTED) {
1601 		/* the session is not ready yet */
1602 		mutex_exit(&ep->ep_lock);
1603 		rw_exit(&sp->session_lock);
1604 		return;
1605 	}
1606 	mutex_exit(&ep->ep_lock);
1607 
1608 	/* control channel is connected, check the data channel */
1609 	ep = &sp->session_dataep;
1610 	mutex_enter(&ep->ep_lock);
1611 	if (ep->ep_state != RDS_EP_STATE_CONNECTED) {
1612 		/* data channel is not yet connected */
1613 		mutex_exit(&ep->ep_lock);
1614 		rw_exit(&sp->session_lock);
1615 		return;
1616 	}
1617 	mutex_exit(&ep->ep_lock);
1618 
1619 	if (failover) {
1620 		rw_exit(&sp->session_lock);
1621 
1622 		/*
1623 		 * The session has failed over. Previous msgs have to be
1624 		 * re-sent before the session is moved to the connected
1625 		 * state.
1626 		 */
1627 		RDS_DPRINTF2("rds_session_active", "SP(%p) Dispatching taskq "
1628 		    "to re-send messages", sp);
1629 		(void) ddi_taskq_dispatch(rds_taskq,
1630 		    rds_resend_messages, (void *)sp, DDI_SLEEP);
1631 		return;
1632 	}
1633 
1634 	/* the session is ready */
1635 	sp->session_state = RDS_SESSION_STATE_CONNECTED;
1636 	RDS_DPRINTF3("rds_session_active",
1637 	    "SP(%p) State RDS_SESSION_STATE_CONNECTED", sp);
1638 
1639 	rw_exit(&sp->session_lock);
1640 
1641 	RDS_DPRINTF2("rds_session_active", "Return: SP(%p) is CONNECTED", sp);
1642 }
1643 
1644 static int
1645 rds_ep_sendmsg(rds_ep_t *ep, uio_t *uiop, in_port_t sendport,
1646     in_port_t recvport)
1647 {
1648 	int	ret;
1649 
1650 	RDS_DPRINTF4("rds_ep_sendmsg", "Enter: EP(%p) sendport: %d recvport: "
1651 	    "%d", ep, sendport, recvport);
1652 
1653 	/* make sure the port is not stalled */
1654 	if (rds_is_port_marked(ep->ep_sp, recvport)) {
1655 		RDS_DPRINTF2(LABEL, "SP(%p) Port:%d is in stall state",
1656 		    ep->ep_sp, recvport);
1657 		RDS_INCR_EWOULDBLOCK();
1658 		ret = ENOMEM;
1659 	} else {
1660 		ret = rds_build_n_post_msg(ep, uiop, sendport, recvport);
1661 	}
1662 
1663 	RDS_DPRINTF4("rds_ep_sendmsg", "Return: EP(%p)", ep);
1664 
1665 	return (ret);
1666 }
1667 
1668 /* Send a message to a destination socket */
1669 int
1670 rds_sendmsg(uio_t *uiop, ipaddr_t sendip, ipaddr_t recvip, in_port_t sendport,
1671     in_port_t recvport, zoneid_t zoneid)
1672 {
1673 	rds_session_t	*sp;
1674 	ib_gid_t	lgid, rgid;
1675 	rds_hca_t	*hcap;
1676 	int		ret;
1677 
1678 	RDS_DPRINTF4("rds_sendmsg", "Enter: uiop: 0x%p, srcIP: 0x%x destIP: "
1679 	    "0x%x sndport: %d recvport: %d", uiop, sendip, recvip,
1680 	    sendport, recvport);
1681 
1682 	/* If msg length is 0, just return success */
1683 	if (uiop->uio_resid == 0) {
1684 		RDS_DPRINTF2("rds_sendmsg", "Zero sized message");
1685 		return (0);
1686 	}
1687 
1688 	/* Is there a session to the destination? */
1689 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1690 	sp = rds_session_lkup(rdsib_statep, recvip, 0);
1691 	rw_exit(&rdsib_statep->rds_sessionlock);
1692 
1693 	/* Is this a loopback message? */
1694 	if ((sp == NULL) && (rds_islocal(recvip))) {
1695 		/* make sure the port is not stalled */
1696 		if (rds_is_port_marked(NULL, recvport)) {
1697 			RDS_DPRINTF2(LABEL, "Local Port:%d is in stall state",
1698 			    recvport);
1699 			RDS_INCR_EWOULDBLOCK();
1700 			return (ENOMEM);
1701 		}
1702 		ret = rds_deliver_loopback_msg(uiop, recvip, sendip, recvport,
1703 		    sendport, zoneid);
1704 		return (ret);
1705 	}
1706 
1707 	/* Not a loopback message */
1708 	if (sp == NULL) {
1709 		/* There is no session to the destination, create one. */
1710 		RDS_DPRINTF3(LABEL, "There is no session to the destination "
1711 		    "IP: 0x%x", recvip);
1712 		sp = rds_session_create(rdsib_statep, sendip, recvip, NULL,
1713 		    RDS_SESSION_ACTIVE);
1714 		if (sp != NULL) {
1715 			rw_enter(&sp->session_lock, RW_WRITER);
1716 			if (sp->session_type == RDS_SESSION_ACTIVE) {
1717 				ret = rds_session_init(sp);
1718 				if (ret != 0) {
1719 					RDS_DPRINTF2("rds_sendmsg",
1720 					    "SP(%p): rds_session_init failed",
1721 					    sp);
1722 					sp->session_state =
1723 					    RDS_SESSION_STATE_FAILED;
1724 					RDS_DPRINTF3("rds_sendmsg",
1725 					    "SP(%p) State "
1726 					    "RDS_SESSION_STATE_FAILED", sp);
1727 					rw_exit(&sp->session_lock);
1728 					return (EFAULT);
1729 				}
1730 				sp->session_state = RDS_SESSION_STATE_INIT;
1731 				RDS_DPRINTF3("rds_sendmsg",
1732 				    "SP(%p) State "
1733 				    "RDS_SESSION_STATE_INIT", sp);
1734 				rw_exit(&sp->session_lock);
1735 				rds_session_open(sp);
1736 			} else {
1737 				rw_exit(&sp->session_lock);
1738 			}
1739 		} else {
1740 			/* Is a session created for this destination */
1741 			rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1742 			sp = rds_session_lkup(rdsib_statep, recvip, 0);
1743 			rw_exit(&rdsib_statep->rds_sessionlock);
1744 			if (sp == NULL) {
1745 				return (EFAULT);
1746 			}
1747 		}
1748 	}
1749 
1750 	/* There is a session to the destination */
1751 	rw_enter(&sp->session_lock, RW_READER);
1752 	if (sp->session_state == RDS_SESSION_STATE_CONNECTED) {
1753 		rw_exit(&sp->session_lock);
1754 
1755 		ret = rds_ep_sendmsg(&sp->session_dataep, uiop, sendport,
1756 		    recvport);
1757 		return (ret);
1758 	} else if ((sp->session_state == RDS_SESSION_STATE_FAILED) ||
1759 	    (sp->session_state == RDS_SESSION_STATE_FINI)) {
1760 		ipaddr_t sendip1, recvip1;
1761 
1762 		RDS_DPRINTF3("rds_sendmsg", "SP(%p) is not connected, State: "
1763 		    "%d", sp);
1764 		rw_exit(&sp->session_lock);
1765 		rw_enter(&sp->session_lock, RW_WRITER);
1766 		if ((sp->session_state == RDS_SESSION_STATE_FAILED) ||
1767 		    (sp->session_state == RDS_SESSION_STATE_FINI)) {
1768 			sp->session_state = RDS_SESSION_STATE_CREATED;
1769 			sp->session_type = RDS_SESSION_ACTIVE;
1770 			RDS_DPRINTF3("rds_sendmsg", "SP(%p) State "
1771 			    "RDS_SESSION_STATE_CREATED", sp);
1772 			rw_exit(&sp->session_lock);
1773 
1774 
1775 			/* The ipaddr should be in the network order */
1776 			sendip1 = sendip;
1777 			recvip1 = recvip;
1778 			ret = rds_sc_path_lookup(&sendip1, &recvip1);
1779 			if (ret == 0) {
1780 				RDS_DPRINTF2(LABEL, "Path not found "
1781 				    "(0x%x 0x%x)", sendip1, recvip1);
1782 			}
1783 
1784 			/* Resolve the IP addresses */
1785 			ret = rds_get_ibaddr(htonl(sendip1), htonl(recvip1),
1786 			    &lgid, &rgid);
1787 			if (ret != 0) {
1788 				RDS_DPRINTF1(LABEL, "rds_get_ibaddr failed: %d",
1789 				    ret);
1790 				rw_enter(&sp->session_lock, RW_WRITER);
1791 				if (sp->session_type == RDS_SESSION_ACTIVE) {
1792 					sp->session_state =
1793 					    RDS_SESSION_STATE_FAILED;
1794 					RDS_DPRINTF3("rds_sendmsg",
1795 					    "SP(%p) State "
1796 					    "RDS_SESSION_STATE_FAILED", sp);
1797 					rw_exit(&sp->session_lock);
1798 					return (EFAULT);
1799 				} else {
1800 					rw_exit(&sp->session_lock);
1801 					return (ENOMEM);
1802 				}
1803 			}
1804 
1805 			RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
1806 			    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
1807 			    rgid.gid_guid);
1808 
1809 			rw_enter(&sp->session_lock, RW_WRITER);
1810 			if (sp->session_type == RDS_SESSION_ACTIVE) {
1811 				sp->session_lgid = lgid;
1812 				sp->session_rgid = rgid;
1813 				hcap = rds_gid_to_hcap(rdsib_statep, lgid);
1814 				if (hcap == NULL) {
1815 					RDS_DPRINTF1(LABEL, "REQ received on "
1816 					    "an uninitialized HCA: %llx",
1817 					    sp->session_lgid.gid_guid);
1818 					sp->session_state =
1819 					    RDS_SESSION_STATE_FAILED;
1820 					RDS_DPRINTF3("rds_sendmsg",
1821 					    "SP(%p) State "
1822 					    "RDS_SESSION_STATE_FAILED", sp);
1823 					rw_exit(&sp->session_lock);
1824 					return (ENOMEM);
1825 				}
1826 
1827 				ret = rds_session_init(sp);
1828 				if (ret != 0) {
1829 					RDS_DPRINTF2("rds_sendmsg",
1830 					    "SP(%p): rds_session_init failed",
1831 					    sp);
1832 					sp->session_state =
1833 					    RDS_SESSION_STATE_FAILED;
1834 					RDS_DPRINTF3("rds_sendmsg",
1835 					    "SP(%p) State "
1836 					    "RDS_SESSION_STATE_FAILED", sp);
1837 					rw_exit(&sp->session_lock);
1838 					return (EFAULT);
1839 				}
1840 				sp->session_state = RDS_SESSION_STATE_INIT;
1841 				rw_exit(&sp->session_lock);
1842 
1843 				rds_session_open(sp);
1844 
1845 			} else {
1846 				RDS_DPRINTF2(LABEL, "SP(%p): state changed "
1847 				    "to %d", sp, sp->session_state);
1848 				rw_exit(&sp->session_lock);
1849 				return (ENOMEM);
1850 			}
1851 		} else {
1852 			RDS_DPRINTF2(LABEL, "SP(%p): Session state %d changed",
1853 			    sp, sp->session_state);
1854 			rw_exit(&sp->session_lock);
1855 			return (ENOMEM);
1856 		}
1857 	} else {
1858 		RDS_DPRINTF2(LABEL, "SP(%p): Session is in %d state",
1859 		    sp, sp->session_state);
1860 		rw_exit(&sp->session_lock);
1861 		return (ENOMEM);
1862 	}
1863 
1864 	rw_enter(&sp->session_lock, RW_READER);
1865 	if (sp->session_state == RDS_SESSION_STATE_CONNECTED) {
1866 		rw_exit(&sp->session_lock);
1867 
1868 		ret = rds_ep_sendmsg(&sp->session_dataep, uiop, sendport,
1869 		    recvport);
1870 	} else {
1871 		RDS_DPRINTF2(LABEL, "SP(%p): state(%d) not connected",
1872 		    sp, sp->session_state);
1873 		rw_exit(&sp->session_lock);
1874 	}
1875 
1876 	RDS_DPRINTF4("rds_sendmsg", "Return: SP(%p) ret: %d", sp, ret);
1877 
1878 	return (ret);
1879 }
1880 
1881 /* Note: This is called on the CQ handler thread */
1882 void
1883 rds_received_msg(rds_ep_t *ep, rds_buf_t *bp)
1884 {
1885 	mblk_t		*mp, *mp1;
1886 	rds_data_hdr_t	*pktp, *pktp1;
1887 	uint8_t		*datap;
1888 	rds_buf_t	*bp1;
1889 	rds_bufpool_t	*rpool;
1890 	uint_t		npkts, ix;
1891 	int		ret;
1892 
1893 	RDS_DPRINTF4("rds_received_msg", "Enter: EP(%p)", ep);
1894 
1895 	pktp = (rds_data_hdr_t *)(uintptr_t)bp->buf_ds.ds_va;
1896 	datap = ((uint8_t *)(uintptr_t)bp->buf_ds.ds_va) + RDS_DATA_HDR_SZ;
1897 	npkts = pktp->dh_npkts;
1898 
1899 	/* increment rx pending here */
1900 	rpool = &ep->ep_rcvpool;
1901 	mutex_enter(&rpool->pool_lock);
1902 	rpool->pool_nbusy += npkts;
1903 	mutex_exit(&rpool->pool_lock);
1904 
1905 	/* this will get freed by sockfs */
1906 	mp = esballoc(datap, pktp->dh_datalen, BPRI_HI, &bp->buf_frtn);
1907 	if (mp == NULL) {
1908 		RDS_DPRINTF2(LABEL, "EP(%p) BP(%p): allocb failed",
1909 		    ep, bp);
1910 		rds_free_recv_buf(bp, npkts);
1911 		return;
1912 	}
1913 	mp->b_wptr = datap + pktp->dh_datalen;
1914 	mp->b_datap->db_type = M_DATA;
1915 
1916 	mp1 = mp;
1917 	bp1 = bp->buf_nextp;
1918 	while (bp1 != NULL) {
1919 		pktp1 = (rds_data_hdr_t *)(uintptr_t)bp1->buf_ds.ds_va;
1920 		datap = ((uint8_t *)(uintptr_t)bp1->buf_ds.ds_va) +
1921 		    RDS_DATA_HDR_SZ;
1922 
1923 		mp1->b_cont = esballoc(datap, pktp1->dh_datalen,
1924 		    BPRI_HI, &bp1->buf_frtn);
1925 		if (mp1->b_cont == NULL) {
1926 			RDS_DPRINTF2(LABEL, "EP(%p) BP(%p): allocb failed",
1927 			    ep, bp1);
1928 			freemsg(mp);
1929 			rds_free_recv_buf(bp1, pktp1->dh_npkts);
1930 			return;
1931 		}
1932 		mp1 = mp1->b_cont;
1933 		mp1->b_wptr = datap + pktp1->dh_datalen;
1934 		mp1->b_datap->db_type = M_DATA;
1935 
1936 		bp1 = bp1->buf_nextp;
1937 	}
1938 
1939 	RDS_INCR_RXPKTS_PEND(npkts);
1940 	RDS_INCR_RXPKTS(npkts);
1941 	RDS_INCR_RXBYTES(msgdsize(mp));
1942 
1943 	RDS_DPRINTF5(LABEL, "Deliver Message: sendIP: 0x%x recvIP: 0x%x "
1944 	    "sendport: %d recvport: %d npkts: %d pktno: %d", ep->ep_remip,
1945 	    ep->ep_myip, pktp->dh_sendport, pktp->dh_recvport,
1946 	    npkts, pktp->dh_psn);
1947 
1948 	/* store the last buffer id, no lock needed */
1949 	if (npkts > 1) {
1950 		ep->ep_rbufid = pktp1->dh_bufid;
1951 	} else {
1952 		ep->ep_rbufid = pktp->dh_bufid;
1953 	}
1954 
1955 	ret = rds_deliver_new_msg(mp, ep->ep_myip, ep->ep_remip,
1956 	    pktp->dh_recvport, pktp->dh_sendport, ALL_ZONES);
1957 	if (ret != 0) {
1958 		if (ret == ENOSPC) {
1959 			/*
1960 			 * The message is delivered but cannot take more,
1961 			 * stall the port
1962 			 */
1963 			RDS_DPRINTF2(LABEL, "Port %d NO SPACE",
1964 			    pktp->dh_recvport);
1965 			rds_stall_port(pktp->dh_recvport);
1966 		} else {
1967 			RDS_DPRINTF1(LABEL, "rds_deliver_new_msg returned: %d",
1968 			    ret);
1969 		}
1970 	}
1971 
1972 	mutex_enter(&ep->ep_lock);
1973 	if (ep->ep_rdmacnt == 0) {
1974 		ep->ep_rdmacnt++;
1975 		*(uintptr_t *)(uintptr_t)ep->ep_ackds.ds_va = ep->ep_rbufid;
1976 		mutex_exit(&ep->ep_lock);
1977 
1978 		/* send acknowledgement */
1979 		RDS_INCR_TXACKS();
1980 		ret = ibt_post_send(ep->ep_chanhdl, &ep->ep_ackwr, 1, &ix);
1981 		if (ret != IBT_SUCCESS) {
1982 			RDS_DPRINTF1(LABEL, "EP(%p): ibt_post_send for "
1983 			    "acknowledgement failed: %d, SQ depth: %d",
1984 			    ep, ret, ep->ep_sndpool.pool_nbusy);
1985 			mutex_enter(&ep->ep_lock);
1986 			ep->ep_rdmacnt--;
1987 			mutex_exit(&ep->ep_lock);
1988 		}
1989 	} else {
1990 		/* no room to send acknowledgement */
1991 		mutex_exit(&ep->ep_lock);
1992 	}
1993 
1994 	RDS_DPRINTF4("rds_received_msg", "Return: EP(%p)", ep);
1995 }
1996