xref: /illumos-gate/usr/src/uts/common/io/ib/clients/rdsv3/connection.c (revision 5d5562f583b2b6affe19bdce0b3c8b1840d667a4)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
23  */
24 
25 /*
26  * Copyright (c) 2006 Oracle.  All rights reserved.
27  *
28  * This software is available to you under a choice of one of two
29  * licenses.  You may choose to be licensed under the terms of the GNU
30  * General Public License (GPL) Version 2, available from the file
31  * COPYING in the main directory of this source tree, or the
32  * OpenIB.org BSD license below:
33  *
34  *     Redistribution and use in source and binary forms, with or
35  *     without modification, are permitted provided that the following
36  *     conditions are met:
37  *
38  *      - Redistributions of source code must retain the above
39  *        copyright notice, this list of conditions and the following
40  *        disclaimer.
41  *
42  *      - Redistributions in binary form must reproduce the above
43  *        copyright notice, this list of conditions and the following
44  *        disclaimer in the documentation and/or other materials
45  *        provided with the distribution.
46  *
47  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
48  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
49  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
50  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
51  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
52  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
53  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
54  * SOFTWARE.
55  *
56  */
57 #include <sys/types.h>
58 #include <sys/kmem.h>
59 #include <sys/rds.h>
60 
61 #include <sys/ib/clients/rdsv3/rdsv3.h>
62 #include <sys/ib/clients/rdsv3/loop.h>
63 #include <sys/ib/clients/rdsv3/rdsv3_debug.h>
64 
/*
 * File-scope connection bookkeeping.
 *
 * converting this to RCU is a chore for another day..
 */
/* Taken by the create and info paths in this file around tree accesses. */
static krwlock_t rdsv3_conn_lock;
/* Incremented in __rdsv3_conn_create(), decremented in rdsv3_conn_destroy(). */
static unsigned long rdsv3_conn_count;
/* Despite the name this is an AVL tree; it is set up in rdsv3_conn_init(). */
struct avl_tree rdsv3_conn_hash;
/* kmem cache from which struct rdsv3_connection objects are allocated. */
static struct kmem_cache *rdsv3_conn_slab = NULL;
70 
/*
 * OR RDSV3_INFO_CONNECTION_FLAG_<suffix> into 'var' when 'test' is true;
 * 'var' is left untouched otherwise.  Usable as a single statement.
 */
#define	rdsv3_conn_info_set(var, test, suffix)				\
	do {								\
		if (test) {						\
			var |= RDSV3_INFO_CONNECTION_FLAG_##suffix;	\
		}							\
	} while (0)
75 
76 
77 static struct rdsv3_connection *
78 rdsv3_conn_lookup(uint32_be_t laddr, uint32_be_t faddr, avl_index_t *pos)
79 {
80 	struct rdsv3_connection *conn;
81 	struct rdsv3_conn_info_s conn_info;
82 	avl_index_t place = 0;
83 
84 	conn_info.c_laddr = laddr;
85 	conn_info.c_faddr = faddr;
86 
87 	conn = avl_find(&rdsv3_conn_hash, &conn_info, &place);
88 
89 	RDSV3_DPRINTF5("rdsv3_conn_lookup",
90 	    "returning conn %p for %u.%u.%u.%u -> %u.%u.%u.%u",
91 	    conn, NIPQUAD(laddr), NIPQUAD(faddr));
92 
93 	if (pos != NULL)
94 		*pos = place;
95 
96 	return (conn);
97 }
98 
/*
 * This is called by transports as they're bringing down a connection.
 * It clears partial message state so that the transport can start sending
 * and receiving over this connection again in the future.  It is up to
 * the transport to have serialized this call with its send and recv.
 *
 * conn: the connection being reset.  The function bumps the s_conn_reset
 * statistic, calls rdsv3_send_reset() and zeroes conn->c_flags; it returns
 * nothing.
 */
void
rdsv3_conn_reset(struct rdsv3_connection *conn)
{
	RDSV3_DPRINTF2("rdsv3_conn_reset",
	    "connection %u.%u.%u.%u to %u.%u.%u.%u reset",
	    NIPQUAD(conn->c_laddr), NIPQUAD(conn->c_faddr));

	rdsv3_stats_inc(s_conn_reset);
	rdsv3_send_reset(conn);
	/* Drop every flag bit in one go. */
	conn->c_flags = 0;

	/*
	 * Do not clear next_rx_seq here, else we cannot distinguish
	 * retransmitted packets from new packets, and will hand all
	 * of them to the application. That is not consistent with the
	 * reliability guarantees of RDS.
	 */
}
123 
/*
 * There is only ever one 'conn' for a given pair of addresses in the
 * system at a time.  They contain messages to be retransmitted and so
 * span the lifetime of the actual underlying transport connections.
 *
 * For now they are not garbage collected once they're created.  They
 * are torn down as the module is removed, if ever.
 *
 * Parameters:
 *   laddr, faddr  - local and foreign addresses identifying the connection.
 *   trans         - transport requested by the caller; replaced by
 *                   rdsv3_loop_transport for outgoing loopback connections
 *                   when the transport sets t_prefer_loopback.
 *   gfp           - allocation flags handed to kmem_cache_alloc() and to
 *                   the transport's conn_alloc().
 *   is_outgoing   - non-zero when called on behalf of an outgoing connect.
 *
 * Returns the existing connection when one is already present, the newly
 * created one otherwise, or ERR_PTR(-ENOMEM) / ERR_PTR(ret) when the
 * allocation, rdsv3_cong_get_maps() or conn_alloc() fails.
 */
static struct rdsv3_connection *
__rdsv3_conn_create(uint32_be_t laddr, uint32_be_t faddr,
    struct rdsv3_transport *trans, int gfp, int is_outgoing)
{
	struct rdsv3_connection *conn, *parent = NULL;
	avl_index_t pos;
	int ret;

	/* Fast path: look for an existing connection under the reader lock. */
	rw_enter(&rdsv3_conn_lock, RW_READER);
	conn = rdsv3_conn_lookup(laddr, faddr, &pos);
	if (conn &&
	    conn->c_loopback &&
	    conn->c_trans != &rdsv3_loop_transport &&
	    !is_outgoing) {
		/*
		 * This is a looped back IB connection, and we're
		 * called by the code handling the incoming connect.
		 * We need a second connection object into which we
		 * can stick the other QP.
		 */
		parent = conn;
		conn = parent->c_passive;
	}
	rw_exit(&rdsv3_conn_lock);
	/* Either the normal conn or the parent's passive conn already exists. */
	if (conn)
		goto out;

	RDSV3_DPRINTF2("__rdsv3_conn_create", "Enter(%x -> %x)",
	    ntohl(laddr), ntohl(faddr));

	conn = kmem_cache_alloc(rdsv3_conn_slab, gfp);
	if (!conn) {
		conn = ERR_PTR(-ENOMEM);
		goto out;
	}

	/* see rdsv3_conn_constructor */
	conn->c_laddr = laddr;
	conn->c_faddr = faddr;

	ret = rdsv3_cong_get_maps(conn);
	if (ret) {
		kmem_cache_free(rdsv3_conn_slab, conn);
		conn = ERR_PTR(ret);
		goto out;
	}

	/*
	 * This is where a connection becomes loopback.  If *any* RDS sockets
	 * can bind to the destination address then we'd rather the messages
	 * flow through loopback rather than either transport.
	 */
	if (rdsv3_trans_get_preferred(faddr)) {
		conn->c_loopback = 1;
		if (is_outgoing && trans->t_prefer_loopback) {
			/*
			 * "outgoing" connection - and the transport
			 * says it wants the connection handled by the
			 * loopback transport. This is what TCP does.
			 */
			trans = &rdsv3_loop_transport;
		}
	}

	conn->c_trans = trans;

	/* Let the chosen transport allocate its per-connection state. */
	ret = trans->conn_alloc(conn, gfp);
	if (ret) {
		kmem_cache_free(rdsv3_conn_slab, conn);
		conn = ERR_PTR(ret);
		goto out;
	}

	conn->c_state = RDSV3_CONN_DOWN;
	conn->c_reconnect_jiffies = 0;
	RDSV3_INIT_DELAYED_WORK(&conn->c_send_w, rdsv3_send_worker);
	RDSV3_INIT_DELAYED_WORK(&conn->c_recv_w, rdsv3_recv_worker);
	RDSV3_INIT_DELAYED_WORK(&conn->c_conn_w, rdsv3_connect_worker);
	RDSV3_INIT_DELAYED_WORK(&conn->c_reap_w, rdsv3_reaper_worker);
	RDSV3_INIT_WORK(&conn->c_down_w, rdsv3_shutdown_worker);
	mutex_init(&conn->c_cm_lock, NULL, MUTEX_DRIVER, NULL);
	conn->c_flags = 0;

	RDSV3_DPRINTF2("__rdsv3_conn_create",
	    "allocated conn %p for %u.%u.%u.%u -> %u.%u.%u.%u over %s %s",
	    conn, NIPQUAD(laddr), NIPQUAD(faddr),
	    trans->t_name ? trans->t_name : "[unknown]",
	    is_outgoing ? "(outgoing)" : "");

	/*
	 * Since we ran without holding the conn lock, someone could
	 * have created the same conn (either normal or passive) in the
	 * interim. We check while holding the lock. If we won, we complete
	 * init and return our conn. If we lost, we rollback and return the
	 * other one.
	 */
	rw_enter(&rdsv3_conn_lock, RW_WRITER);
	if (parent) {
		/* Creating passive conn */
		if (parent->c_passive) {
			trans->conn_free(conn->c_transport_data);
			kmem_cache_free(rdsv3_conn_slab, conn);
			conn = parent->c_passive;
		} else {
			/*
			 * The passive conn hangs off its parent only; it is
			 * not inserted into rdsv3_conn_hash.
			 */
			parent->c_passive = conn;
			rdsv3_cong_add_conn(conn);
			rdsv3_conn_count++;
		}
	} else {
		/* Creating normal conn */
		struct rdsv3_connection *found;

		/* Repeat the lookup; this also refreshes the insert position. */
		found = rdsv3_conn_lookup(laddr, faddr, &pos);
		if (found) {
			trans->conn_free(conn->c_transport_data);
			kmem_cache_free(rdsv3_conn_slab, conn);
			conn = found;
		} else {
			avl_insert(&rdsv3_conn_hash, conn, pos);
			rdsv3_cong_add_conn(conn);
			rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_reap_w,
			    RDSV3_REAPER_WAIT_JIFFIES);
			rdsv3_conn_count++;
		}
	}

	rw_exit(&rdsv3_conn_lock);

	RDSV3_DPRINTF2("__rdsv3_conn_create", "Return(conn: %p)", conn);

out:
	return (conn);
}
265 
/*
 * Look up or create the connection for (laddr, faddr) over 'trans'.
 * Calls __rdsv3_conn_create() with is_outgoing set to 0 and returns its
 * result unchanged.
 */
struct rdsv3_connection *
rdsv3_conn_create(uint32_be_t laddr, uint32_be_t faddr,
    struct rdsv3_transport *trans, int gfp)
{
	return (__rdsv3_conn_create(laddr, faddr, trans, gfp, 0));
}
272 
/*
 * Look up or create the connection for (laddr, faddr) over 'trans' on
 * behalf of an outgoing connect.  Calls __rdsv3_conn_create() with
 * is_outgoing set to 1 and returns its result unchanged.
 */
struct rdsv3_connection *
rdsv3_conn_create_outgoing(uint32_be_t laddr, uint32_be_t faddr,
    struct rdsv3_transport *trans, int gfp)
{
	return (__rdsv3_conn_create(laddr, faddr, trans, gfp, 1));
}
279 
/*
 * Redundant re-declaration: rdsv3_conn_hash is already defined near the
 * top of this file.
 */
extern struct avl_tree	rdsv3_conn_hash;
281 
282 void
283 rdsv3_conn_shutdown(struct rdsv3_connection *conn)
284 {
285 	RDSV3_DPRINTF2("rdsv3_conn_shutdown", "Enter(conn: %p)", conn);
286 
287 	/* shut it down unless it's down already */
288 	if (!rdsv3_conn_transition(conn, RDSV3_CONN_DOWN, RDSV3_CONN_DOWN)) {
289 		/*
290 		 * Quiesce the connection mgmt handlers before we start tearing
291 		 * things down. We don't hold the mutex for the entire
292 		 * duration of the shutdown operation, else we may be
293 		 * deadlocking with the CM handler. Instead, the CM event
294 		 * handler is supposed to check for state DISCONNECTING
295 		 */
296 		mutex_enter(&conn->c_cm_lock);
297 		if (!rdsv3_conn_transition(conn, RDSV3_CONN_UP,
298 		    RDSV3_CONN_DISCONNECTING) &&
299 		    !rdsv3_conn_transition(conn, RDSV3_CONN_ERROR,
300 		    RDSV3_CONN_DISCONNECTING)) {
301 			RDSV3_DPRINTF2("rdsv3_conn_shutdown",
302 			    "shutdown called in state %d",
303 			    atomic_get(&conn->c_state));
304 			rdsv3_conn_drop(conn);
305 			mutex_exit(&conn->c_cm_lock);
306 			return;
307 		}
308 		mutex_exit(&conn->c_cm_lock);
309 
310 		/* verify everybody's out of rds_send_xmit() */
311 		mutex_enter(&conn->c_send_lock);
312 		while (atomic_get(&conn->c_senders)) {
313 			mutex_exit(&conn->c_send_lock);
314 			delay(1);
315 			mutex_enter(&conn->c_send_lock);
316 		}
317 
318 		conn->c_trans->conn_shutdown(conn);
319 		rdsv3_conn_reset(conn);
320 		mutex_exit(&conn->c_send_lock);
321 
322 		if (!rdsv3_conn_transition(conn, RDSV3_CONN_DISCONNECTING,
323 		    RDSV3_CONN_DOWN)) {
324 			/*
325 			 * This can happen - eg when we're in the middle of
326 			 * tearing down the connection, and someone unloads
327 			 * the rds module.
328 			 * Quite reproduceable with loopback connections.
329 			 * Mostly harmless.
330 			 */
331 #ifndef __lock_lint
332 			RDSV3_DPRINTF2("rdsv3_conn_shutdown",
333 			    "failed to transition to state DOWN, "
334 			    "current statis is: %d",
335 			    atomic_get(&conn->c_state));
336 			rdsv3_conn_drop(conn);
337 #endif
338 			return;
339 		}
340 	}
341 
342 	/*
343 	 * Then reconnect if it's still live.
344 	 * The passive side of an IB loopback connection is never added
345 	 * to the conn hash, so we never trigger a reconnect on this
346 	 * conn - the reconnect is always triggered by the active peer.
347 	 */
348 	rdsv3_cancel_delayed_work(&conn->c_conn_w);
349 
350 	{
351 		struct rdsv3_conn_info_s conn_info;
352 
353 		conn_info.c_laddr = conn->c_laddr;
354 		conn_info.c_faddr = conn->c_faddr;
355 		if (avl_find(&rdsv3_conn_hash, &conn_info, NULL) == conn)
356 			rdsv3_queue_reconnect(conn);
357 	}
358 	RDSV3_DPRINTF2("rdsv3_conn_shutdown", "Exit");
359 }
360 
361 /*
362  * Stop and free a connection.
363  */
364 void
365 rdsv3_conn_destroy(struct rdsv3_connection *conn)
366 {
367 	struct rdsv3_message *rm, *rtmp;
368 	list_t to_be_dropped;
369 
370 	RDSV3_DPRINTF4("rdsv3_conn_destroy",
371 	    "freeing conn %p for %u.%u.%u.%u -> %u.%u.%u.%u",
372 	    conn, NIPQUAD(conn->c_laddr), NIPQUAD(conn->c_faddr));
373 
374 	avl_remove(&rdsv3_conn_hash, conn);
375 
376 	rdsv3_cancel_delayed_work(&conn->c_reap_w);
377 	rdsv3_cancel_delayed_work(&conn->c_send_w);
378 	rdsv3_cancel_delayed_work(&conn->c_recv_w);
379 
380 	rdsv3_conn_shutdown(conn);
381 
382 	/* tear down queued messages */
383 
384 	list_create(&to_be_dropped, sizeof (struct rdsv3_message),
385 	    offsetof(struct rdsv3_message, m_conn_item));
386 
387 	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, rtmp, &conn->c_retrans, m_conn_item) {
388 		list_remove_node(&rm->m_conn_item);
389 		list_insert_tail(&to_be_dropped, rm);
390 	}
391 
392 	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, rtmp, &conn->c_send_queue,
393 	    m_conn_item) {
394 		list_remove_node(&rm->m_conn_item);
395 		list_insert_tail(&to_be_dropped, rm);
396 	}
397 
398 	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, rtmp, &to_be_dropped, m_conn_item) {
399 		clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
400 		list_remove_node(&rm->m_conn_item);
401 		rdsv3_message_put(rm);
402 	}
403 
404 	if (conn->c_xmit_rm)
405 		rdsv3_message_put(conn->c_xmit_rm);
406 
407 	conn->c_trans->conn_free(conn->c_transport_data);
408 
409 	/*
410 	 * The congestion maps aren't freed up here.  They're
411 	 * freed by rdsv3_cong_exit() after all the connections
412 	 * have been freed.
413 	 */
414 	rdsv3_cong_remove_conn(conn);
415 
416 	ASSERT(list_is_empty(&conn->c_retrans));
417 	kmem_cache_free(rdsv3_conn_slab, conn);
418 
419 	rdsv3_conn_count--;
420 }
421 
422 /* ARGSUSED */
423 static void
424 rdsv3_conn_message_info(struct rsock *sock, unsigned int len,
425     struct rdsv3_info_iterator *iter,
426     struct rdsv3_info_lengths *lens,
427     int want_send)
428 {
429 	struct list *list;
430 	struct rdsv3_connection *conn;
431 	struct rdsv3_message *rm;
432 	unsigned int total = 0;
433 
434 	RDSV3_DPRINTF4("rdsv3_conn_message_info", "Enter");
435 
436 	len /= sizeof (struct rdsv3_info_message);
437 
438 	rw_enter(&rdsv3_conn_lock, RW_READER);
439 
440 	if (avl_is_empty(&rdsv3_conn_hash)) {
441 		/* no connections */
442 		rw_exit(&rdsv3_conn_lock);
443 		return;
444 	}
445 
446 	conn = (struct rdsv3_connection *)avl_first(&rdsv3_conn_hash);
447 
448 	do {
449 		if (want_send)
450 			list = &conn->c_send_queue;
451 		else
452 			list = &conn->c_retrans;
453 
454 		mutex_enter(&conn->c_lock);
455 
456 		/* XXX too lazy to maintain counts.. */
457 		RDSV3_FOR_EACH_LIST_NODE(rm, list, m_conn_item) {
458 			total++;
459 			if (total <= len)
460 				rdsv3_inc_info_copy(&rm->m_inc, iter,
461 				    conn->c_laddr, conn->c_faddr, 0);
462 		}
463 
464 		mutex_exit(&conn->c_lock);
465 
466 		conn = AVL_NEXT(&rdsv3_conn_hash, conn);
467 	} while (conn != NULL);
468 	rw_exit(&rdsv3_conn_lock);
469 
470 	lens->nr = total;
471 	lens->each = sizeof (struct rdsv3_info_message);
472 
473 	RDSV3_DPRINTF4("rdsv3_conn_message_info", "Return");
474 }
475 
/*
 * Info callback registered for RDSV3_INFO_SEND_MESSAGES: forwards to
 * rdsv3_conn_message_info() with want_send set to 1 (c_send_queue).
 */
static void
rdsv3_conn_message_info_send(struct rsock *sock, unsigned int len,
    struct rdsv3_info_iterator *iter,
    struct rdsv3_info_lengths *lens)
{
	rdsv3_conn_message_info(sock, len, iter, lens, 1);
}
483 
/*
 * Info callback registered for RDSV3_INFO_RETRANS_MESSAGES: forwards to
 * rdsv3_conn_message_info() with want_send set to 0 (c_retrans).
 */
static void
rdsv3_conn_message_info_retrans(struct rsock *sock,
    unsigned int len,
    struct rdsv3_info_iterator *iter,
    struct rdsv3_info_lengths *lens)
{
	rdsv3_conn_message_info(sock, len, iter, lens, 0);
}
492 
493 /* ARGSUSED */
494 void
495 rdsv3_for_each_conn_info(struct rsock *sock, unsigned int len,
496     struct rdsv3_info_iterator *iter,
497     struct rdsv3_info_lengths *lens,
498     int (*visitor)(struct rdsv3_connection *, void *),
499     size_t item_len)
500 {
501 	uint8_t *buffer;
502 	struct rdsv3_connection *conn;
503 
504 	rw_enter(&rdsv3_conn_lock, RW_READER);
505 
506 	lens->nr = 0;
507 	lens->each = item_len;
508 
509 	if (avl_is_empty(&rdsv3_conn_hash)) {
510 		/* no connections */
511 		rw_exit(&rdsv3_conn_lock);
512 		return;
513 	}
514 
515 	/* allocate a little extra as this can get cast to a uint64_t */
516 	buffer = kmem_zalloc(item_len + 8, KM_SLEEP);
517 
518 	conn = (struct rdsv3_connection *)avl_first(&rdsv3_conn_hash);
519 
520 	do {
521 		/* XXX no c_lock usage.. */
522 		if (visitor(conn, buffer)) {
523 			/*
524 			 * We copy as much as we can fit in the buffer,
525 			 * but we count all items so that the caller
526 			 * can resize the buffer.
527 			 */
528 			if (len >= item_len) {
529 				RDSV3_DPRINTF4("rdsv3_for_each_conn_info",
530 				    "buffer: %p iter: %p bytes: %d", buffer,
531 				    iter->addr + iter->offset, item_len);
532 				rdsv3_info_copy(iter, buffer, item_len);
533 				len -= item_len;
534 			}
535 			lens->nr++;
536 		}
537 		conn = AVL_NEXT(&rdsv3_conn_hash, conn);
538 	} while (conn != NULL);
539 	rw_exit(&rdsv3_conn_lock);
540 
541 	kmem_free(buffer, item_len + 8);
542 }
543 
544 static int
545 rdsv3_conn_info_visitor(struct rdsv3_connection *conn, void *buffer)
546 {
547 	struct rdsv3_info_connection *cinfo = buffer;
548 
549 	cinfo->next_tx_seq = conn->c_next_tx_seq;
550 	cinfo->next_rx_seq = conn->c_next_rx_seq;
551 	cinfo->laddr = conn->c_laddr;
552 	cinfo->faddr = conn->c_faddr;
553 	(void) strncpy((char *)cinfo->transport, conn->c_trans->t_name,
554 	    sizeof (cinfo->transport));
555 	cinfo->flags = 0;
556 
557 	rdsv3_conn_info_set(cinfo->flags,
558 	    MUTEX_HELD(&conn->c_send_lock), SENDING);
559 
560 	/* XXX Future: return the state rather than these funky bits */
561 	rdsv3_conn_info_set(cinfo->flags,
562 	    atomic_get(&conn->c_state) == RDSV3_CONN_CONNECTING,
563 	    CONNECTING);
564 	rdsv3_conn_info_set(cinfo->flags,
565 	    atomic_get(&conn->c_state) == RDSV3_CONN_UP,
566 	    CONNECTED);
567 	return (1);
568 }
569 
/*
 * Info callback registered for RDSV3_INFO_CONNECTIONS: walks all
 * connections with rdsv3_conn_info_visitor, producing one
 * struct rdsv3_info_connection per connection.
 */
static void
rdsv3_conn_info(struct rsock *sock, unsigned int len,
    struct rdsv3_info_iterator *iter, struct rdsv3_info_lengths *lens)
{
	rdsv3_for_each_conn_info(sock, len, iter, lens,
	    rdsv3_conn_info_visitor, sizeof (struct rdsv3_info_connection));
}
577 
578 int
579 rdsv3_conn_init()
580 {
581 	RDSV3_DPRINTF4("rdsv3_conn_init", "Enter");
582 
583 	rdsv3_conn_slab = kmem_cache_create("rdsv3_connection",
584 	    sizeof (struct rdsv3_connection), 0, rdsv3_conn_constructor,
585 	    rdsv3_conn_destructor, NULL, NULL, NULL, 0);
586 	if (!rdsv3_conn_slab) {
587 		RDSV3_DPRINTF2("rdsv3_conn_init",
588 		    "kmem_cache_create(rdsv3_conn_slab) failed");
589 		return (-ENOMEM);
590 	}
591 
592 	avl_create(&rdsv3_conn_hash, rdsv3_conn_compare,
593 	    sizeof (struct rdsv3_connection), offsetof(struct rdsv3_connection,
594 	    c_hash_node));
595 
596 	rw_init(&rdsv3_conn_lock, NULL, RW_DRIVER, NULL);
597 
598 	rdsv3_loop_init();
599 
600 	rdsv3_info_register_func(RDSV3_INFO_CONNECTIONS, rdsv3_conn_info);
601 	rdsv3_info_register_func(RDSV3_INFO_SEND_MESSAGES,
602 	    rdsv3_conn_message_info_send);
603 	rdsv3_info_register_func(RDSV3_INFO_RETRANS_MESSAGES,
604 	    rdsv3_conn_message_info_retrans);
605 
606 	RDSV3_DPRINTF4("rdsv3_conn_init", "Return");
607 
608 	return (0);
609 }
610 
/*
 * Tear down what rdsv3_conn_init() set up: the loopback transport first,
 * then the connection lock and tree, and finally the connection cache.
 * The info callbacks are not unregistered here.
 */
void
rdsv3_conn_exit()
{
	RDSV3_DPRINTF4("rdsv3_conn_exit", "Enter");

	rdsv3_loop_exit();

	rw_destroy(&rdsv3_conn_lock);
	avl_destroy(&rdsv3_conn_hash);

	/* The cache must have been created by rdsv3_conn_init(). */
	ASSERT(rdsv3_conn_slab);
	kmem_cache_destroy(rdsv3_conn_slab);

	RDSV3_DPRINTF4("rdsv3_conn_exit", "Return");
}
626 
/*
 * Force a disconnect
 *
 * Marks the connection as RDSV3_CONN_ERROR and queues its c_down_w work
 * item on rdsv3_wq; __rdsv3_conn_create() binds that work item to
 * rdsv3_shutdown_worker.
 */
void
rdsv3_conn_drop(struct rdsv3_connection *conn)
{
	conn->c_state = RDSV3_CONN_ERROR;
	rdsv3_queue_work(rdsv3_wq, &conn->c_down_w);
}
636