xref: /illumos-gate/usr/src/lib/libnsl/rpc/clnt_vc.c (revision 48bbca81)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2015 Nexenta Systems, Inc.  All rights reserved.
24  * Copyright (c) 2016 by Delphix. All rights reserved.
25  */
26 
27 /*
28  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
29  * Use is subject to license terms.
30  */
31 
32 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
33 /* All Rights Reserved */
34 /*
35  * Portions of this source code were derived from Berkeley
36  * 4.3 BSD under license from the Regents of the University of
37  * California.
38  */
39 
40 /*
41  * clnt_vc.c
42  *
43  * Implements a connectionful client side RPC.
44  *
45  * Connectionful RPC supports 'batched calls'.
 * A sequence of calls may be batched up in a send buffer. The rpc call
 * returns immediately to the client even though the call was not
 * necessarily sent. The batching occurs if the results' xdr routine is
 * NULL (0) AND the rpc timeout value is zero (see clnt.h, rpc).
50  *
51  * Clients should NOT casually batch calls that in fact return results; that
52  * is the server side should be aware that a call is batched and not produce
53  * any return message. Batched calls that produce many result messages can
54  * deadlock (netlock) the client and the server....
55  */
56 
57 
58 #include "mt.h"
59 #include "rpc_mt.h"
60 #include <assert.h>
61 #include <rpc/rpc.h>
62 #include <errno.h>
63 #include <sys/byteorder.h>
64 #include <sys/mkdev.h>
65 #include <sys/poll.h>
66 #include <syslog.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <netinet/tcp.h>
70 #include <limits.h>
71 
/* Size in bytes of the buffer that holds the pre-marshalled call message. */
#define	MCALL_MSG_SIZE	24

/*
 * Time unit conversions.  The *_TO_NS() forms widen their argument to
 * hrtime_t before scaling; NSECS_TO_MS() truncates toward zero.
 */
#define	SECS_TO_NS(x)	((hrtime_t)(x) * (hrtime_t)1000000000)
#define	MSECS_TO_NS(x)	((hrtime_t)(x) * (hrtime_t)1000000)
#define	USECS_TO_NS(x)	((hrtime_t)(x) * (hrtime_t)1000)
#define	NSECS_TO_MS(x)	((x) / 1000000)

/* Smaller of two values; either argument may be evaluated twice. */
#ifndef MIN
#define	MIN(a, b)	(((b) > (a)) ? (a) : (b))
#endif
80 
/*
 * Prototypes for functions with external linkage that this file refers
 * to.  _clnt_vc_create_timed() is defined further down in this file.
 */
extern int __rpc_timeval_to_msec(struct timeval *);
extern int __rpc_compress_pollfd(int, pollfd_t *, pollfd_t *);
extern bool_t xdr_opaque_auth(XDR *, struct opaque_auth *);
extern bool_t __rpc_gss_wrap(AUTH *, char *, uint_t, XDR *, bool_t (*)(),
								caddr_t);
extern bool_t __rpc_gss_unwrap(AUTH *, XDR *, bool_t (*)(), caddr_t);
extern CLIENT *_clnt_vc_create_timed(int, struct netbuf *, rpcprog_t,
		rpcvers_t, uint_t, uint_t, const struct timeval *);

/*
 * Forward declarations of file-local helpers.  read_vc() and write_vc()
 * are handed to xdrrec_create() as the record stream's I/O callbacks.
 */
static struct clnt_ops	*clnt_vc_ops(void);
static int		read_vc(void *, caddr_t, int);
static int		write_vc(void *, caddr_t, int);
static int		t_rcvall(int, char *, int);
static bool_t		time_not_ok(struct timeval *);

/*
 * Incomplete declaration so that the prototypes below can name
 * struct ct_data before its full definition appears.
 */
struct ct_data;
static bool_t		set_up_connection(int, struct netbuf *,
				struct ct_data *, const struct timeval *);
static bool_t		set_io_mode(struct ct_data *, int);
100 
/*
 * Lock table handle used by various MT sync. routines.
 *
 * vctbl is created on first use by _clnt_vc_create_timed() through
 * rpc_fd_init(), with vctbl_lock serializing that creation.  It is then
 * passed to rpc_fd_lock()/rpc_fd_unlock() together with the connection's
 * fd by the client operations.
 */
static mutex_t	vctbl_lock = DEFAULTMUTEX;
static void	*vctbl = NULL;

/*
 * Strings used in syslog() messages.  clnt_vc_errstr is a format taking
 * two strings: the reporting routine's name and the error text.
 */
static const char clnt_vc_errstr[] = "%s : %s";
static const char clnt_vc_str[] = "clnt_vc_create";
static const char clnt_read_vc_str[] = "read_vc";
static const char __no_mem_str[] = "out of memory";
static const char no_fcntl_getfl_str[] = "could not get status flags and modes";
static const char no_nonblock_str[] = "could not set transport blocking mode";
113 
/*
 * Private data structure
 *
 * One instance is allocated per client handle by _clnt_vc_create_timed()
 * and stored in the handle's cl_private field.
 */
struct ct_data {
	int		ct_fd;		/* connection's fd */
	bool_t		ct_closeit;	/* close it on destroy */
	int		ct_tsdu;	/* size of tsdu */
	int		ct_wait;	/* wait interval in milliseconds */
	bool_t		ct_waitset;	/* wait set by clnt_control? */
	struct netbuf	ct_addr;	/* remote addr */
	struct rpc_err	ct_error;
	/*
	 * The call header is serialized once at creation time into
	 * ct_mcall; ct_mpos records the XDR position reached by that
	 * serialization.  The xid is the first 32-bit word of ct_mcall.
	 */
	char		ct_mcall[MCALL_MSG_SIZE]; /* marshalled callmsg */
	uint_t		ct_mpos;	/* pos after marshal */
	XDR		ct_xdrs;	/* XDR stream */

	/* NON STANDARD INFO - 00-08-31 */
	bool_t		ct_is_oneway; /* True if the current call is oneway. */
	/* TRUE while O_NONBLOCK is clear on ct_fd. */
	bool_t		ct_is_blocking;
	/* I/O mode, e.g. RPC_CL_BLOCKING or RPC_CL_NONBLOCKING. */
	ushort_t	ct_io_mode;
	/* Default flush mode, e.g. RPC_CL_BLOCKING_FLUSH. */
	ushort_t	ct_blocking_mode;
	uint_t		ct_bufferSize; /* Total size of the buffer. */
	uint_t		ct_bufferPendingSize; /* Size of unsent data. */
	char 		*ct_buffer; /* Pointer to the buffer. */
	char 		*ct_bufferWritePtr; /* Ptr to the first free byte. */
	char 		*ct_bufferReadPtr; /* Ptr to the first byte of data. */
};
140 
/*
 * Node of a singly linked list that refers to a ct_data.  The 'next'
 * member must stay first: the list heads below are cast to node pointers
 * and used as the end-of-list sentinel.
 */
struct nb_reg_node {
	struct nb_reg_node *next;
	struct ct_data *ct;
};

/*
 * List heads.  An empty list is represented by the head pointing at its
 * own address (see LIST_ISEMPTY()).
 * NOTE(review): presumably nb_first holds the registered non-blocking
 * connections and nb_free holds recycled nodes - confirm against
 * register_nb()/unregister_nb().
 */
static struct nb_reg_node *nb_first = (struct nb_reg_node *)&nb_first;
static struct nb_reg_node *nb_free  = (struct nb_reg_node *)&nb_free;

/* NOTE(review): presumably records that an exit handler was installed. */
static bool_t exit_handler_set = FALSE;

/* NOTE(review): presumably protects the two lists above - confirm. */
static mutex_t nb_list_mutex = DEFAULTMUTEX;
152 
153 
/*
 * Macros to manage the singly linked lists of struct nb_reg_node.
 *
 * 'l' must be the list head variable itself (an lvalue of type
 * struct nb_reg_node *): its own address, cast to a node pointer, is the
 * end-of-list sentinel, so an empty list is a head that points at itself.
 *
 * LIST_ISEMPTY(l)	true when the list has no node
 * LIST_CLR(l)		reset the list to empty (nodes are not freed)
 * LIST_ADD(l, node)	push 'node' on the front of the list
 * LIST_EXTRACT(l, node) pop the front node into 'node'; the list must
 *			not be empty
 * LIST_FOR_EACH(l, node) iterate 'node' over every node of the list
 *
 * LIST_ADD() links the new node to the current head.  It previously
 * linked it to the head's successor, which dropped the current head
 * whenever the list was not empty.  Arguments are parenthesized; they may
 * still be evaluated more than once.
 */
#define	LIST_ISEMPTY(l) ((l) == (struct nb_reg_node *)&(l))
#define	LIST_CLR(l) ((l) = (struct nb_reg_node *)&(l))
#define	LIST_ADD(l, node) ((node)->next = (l), (l) = (node))
#define	LIST_EXTRACT(l, node) ((node) = (l), (l) = (l)->next)
#define	LIST_FOR_EACH(l, node) \
	for ((node) = (l); (node) != (struct nb_reg_node *)&(l); \
	    (node) = (node)->next)
161 
162 
/*
 * Default size of the IO buffer used in non blocking mode.
 * _clnt_vc_create_timed() stores it in ct_bufferSize; the buffer itself
 * is not allocated there.
 */
#define	DEFAULT_PENDING_ZONE_MAX_SIZE (16*1024)

/* Forward declarations of the non-blocking mode helpers. */
static int nb_send(struct ct_data *, void *, unsigned int);
static int do_flush(struct ct_data *, uint_t);
static bool_t set_flush_mode(struct ct_data *, int);
static bool_t set_blocking_connection(struct ct_data *, bool_t);

static int register_nb(struct ct_data *);
static int unregister_nb(struct ct_data *);
173 
174 
175 /*
176  * Change the mode of the underlying fd.
177  */
178 static bool_t
179 set_blocking_connection(struct ct_data *ct, bool_t blocking)
180 {
181 	int flag;
182 
183 	/*
184 	 * If the underlying fd is already in the required mode,
185 	 * avoid the syscall.
186 	 */
187 	if (ct->ct_is_blocking == blocking)
188 		return (TRUE);
189 
190 	if ((flag = fcntl(ct->ct_fd, F_GETFL, 0)) < 0) {
191 		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
192 		    no_fcntl_getfl_str);
193 		return (FALSE);
194 	}
195 
196 	flag = blocking? flag&~O_NONBLOCK : flag|O_NONBLOCK;
197 	if (fcntl(ct->ct_fd, F_SETFL, flag) != 0) {
198 		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
199 		    no_nonblock_str);
200 		return (FALSE);
201 	}
202 	ct->ct_is_blocking = blocking;
203 	return (TRUE);
204 }
205 
206 /*
207  * Create a client handle for a connection.
208  * Default options are set, which the user can change using clnt_control()'s.
209  * The rpc/vc package does buffering similar to stdio, so the client
210  * must pick send and receive buffer sizes, 0 => use the default.
211  * NB: fd is copied into a private area.
212  * NB: The rpch->cl_auth is set null authentication. Caller may wish to
213  * set this something more useful.
214  *
215  * fd should be open and bound.
216  */
217 CLIENT *
218 clnt_vc_create(const int fd, struct netbuf *svcaddr, const rpcprog_t prog,
219 	const rpcvers_t vers, const uint_t sendsz, const uint_t recvsz)
220 {
221 	return (_clnt_vc_create_timed(fd, svcaddr, prog, vers, sendsz,
222 	    recvsz, NULL));
223 }
224 
225 /*
226  * This has the same definition as clnt_vc_create(), except it
227  * takes an additional parameter - a pointer to a timeval structure.
228  *
229  * Not a public interface. This is for clnt_create_timed,
230  * clnt_create_vers_timed, clnt_tp_create_timed to pass down the timeout
231  * value to control a tcp connection attempt.
232  * (for bug 4049792: clnt_create_timed does not time out)
233  *
234  * If tp is NULL, use default timeout to set up the connection.
235  */
236 CLIENT *
237 _clnt_vc_create_timed(int fd, struct netbuf *svcaddr, rpcprog_t prog,
238 	rpcvers_t vers, uint_t sendsz, uint_t recvsz, const struct timeval *tp)
239 {
240 	CLIENT *cl;			/* client handle */
241 	struct ct_data *ct;		/* private data */
242 	struct timeval now;
243 	struct rpc_msg call_msg;
244 	struct t_info tinfo;
245 	int flag;
246 
247 	cl = malloc(sizeof (*cl));
248 	if ((ct = malloc(sizeof (*ct))) != NULL)
249 		ct->ct_addr.buf = NULL;
250 
251 	if ((cl == NULL) || (ct == NULL)) {
252 		(void) syslog(LOG_ERR, clnt_vc_errstr,
253 		    clnt_vc_str, __no_mem_str);
254 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
255 		rpc_createerr.cf_error.re_errno = errno;
256 		rpc_createerr.cf_error.re_terrno = 0;
257 		goto err;
258 	}
259 
260 	/*
261 	 * The only use of vctbl_lock is for serializing the creation of
262 	 * vctbl. Once created the lock needs to be released so we don't
263 	 * hold it across the set_up_connection() call and end up with a
264 	 * bunch of threads stuck waiting for the mutex.
265 	 */
266 	sig_mutex_lock(&vctbl_lock);
267 
268 	if ((vctbl == NULL) && ((vctbl = rpc_fd_init()) == NULL)) {
269 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
270 		rpc_createerr.cf_error.re_errno = errno;
271 		rpc_createerr.cf_error.re_terrno = 0;
272 		sig_mutex_unlock(&vctbl_lock);
273 		goto err;
274 	}
275 
276 	sig_mutex_unlock(&vctbl_lock);
277 
278 	ct->ct_io_mode = RPC_CL_BLOCKING;
279 	ct->ct_blocking_mode = RPC_CL_BLOCKING_FLUSH;
280 
281 	ct->ct_buffer = NULL;	/* We allocate the buffer when needed. */
282 	ct->ct_bufferSize = DEFAULT_PENDING_ZONE_MAX_SIZE;
283 	ct->ct_bufferPendingSize = 0;
284 	ct->ct_bufferWritePtr = NULL;
285 	ct->ct_bufferReadPtr = NULL;
286 
287 	/* Check the current state of the fd. */
288 	if ((flag = fcntl(fd, F_GETFL, 0)) < 0) {
289 		(void) syslog(LOG_ERR, "_clnt_vc_create_timed : %s",
290 		    no_fcntl_getfl_str);
291 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
292 		rpc_createerr.cf_error.re_terrno = errno;
293 		rpc_createerr.cf_error.re_errno = 0;
294 		goto err;
295 	}
296 	ct->ct_is_blocking = flag & O_NONBLOCK ? FALSE : TRUE;
297 
298 	if (set_up_connection(fd, svcaddr, ct, tp) == FALSE) {
299 		goto err;
300 	}
301 
302 	/*
303 	 * Set up other members of private data struct
304 	 */
305 	ct->ct_fd = fd;
306 	/*
307 	 * The actual value will be set by clnt_call or clnt_control
308 	 */
309 	ct->ct_wait = 30000;
310 	ct->ct_waitset = FALSE;
311 	/*
312 	 * By default, closeit is always FALSE. It is users responsibility
313 	 * to do a t_close on it, else the user may use clnt_control
314 	 * to let clnt_destroy do it for them.
315 	 */
316 	ct->ct_closeit = FALSE;
317 
318 	/*
319 	 * Initialize call message
320 	 */
321 	(void) gettimeofday(&now, (struct timezone *)0);
322 	call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec;
323 	call_msg.rm_call.cb_prog = prog;
324 	call_msg.rm_call.cb_vers = vers;
325 
326 	/*
327 	 * pre-serialize the static part of the call msg and stash it away
328 	 */
329 	xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE, XDR_ENCODE);
330 	if (!xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
331 		goto err;
332 	}
333 	ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
334 	XDR_DESTROY(&(ct->ct_xdrs));
335 
336 	if (t_getinfo(fd, &tinfo) == -1) {
337 		rpc_createerr.cf_stat = RPC_TLIERROR;
338 		rpc_createerr.cf_error.re_terrno = t_errno;
339 		rpc_createerr.cf_error.re_errno = 0;
340 		goto err;
341 	}
342 	/*
343 	 * Find the receive and the send size
344 	 */
345 	sendsz = __rpc_get_t_size((int)sendsz, tinfo.tsdu);
346 	recvsz = __rpc_get_t_size((int)recvsz, tinfo.tsdu);
347 	if ((sendsz == 0) || (recvsz == 0)) {
348 		rpc_createerr.cf_stat = RPC_TLIERROR;
349 		rpc_createerr.cf_error.re_terrno = 0;
350 		rpc_createerr.cf_error.re_errno = 0;
351 		goto err;
352 	}
353 	ct->ct_tsdu = tinfo.tsdu;
354 	/*
355 	 * Create a client handle which uses xdrrec for serialization
356 	 * and authnone for authentication.
357 	 */
358 	ct->ct_xdrs.x_ops = NULL;
359 	xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz, (caddr_t)ct,
360 	    read_vc, write_vc);
361 	if (ct->ct_xdrs.x_ops == NULL) {
362 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
363 		rpc_createerr.cf_error.re_terrno = 0;
364 		rpc_createerr.cf_error.re_errno = ENOMEM;
365 		goto err;
366 	}
367 	cl->cl_ops = clnt_vc_ops();
368 	cl->cl_private = (caddr_t)ct;
369 	cl->cl_auth = authnone_create();
370 	cl->cl_tp = NULL;
371 	cl->cl_netid = NULL;
372 	return (cl);
373 
374 err:
375 	if (ct) {
376 		free(ct->ct_addr.buf);
377 		free(ct);
378 	}
379 	free(cl);
380 
381 	return (NULL);
382 }
383 
384 #define	TCPOPT_BUFSIZE 128
385 
386 /*
387  * Set tcp connection timeout value.
388  * Retun 0 for success, -1 for failure.
389  */
390 static int
391 _set_tcp_conntime(int fd, int optval)
392 {
393 	struct t_optmgmt req, res;
394 	struct opthdr *opt;
395 	int *ip;
396 	char buf[TCPOPT_BUFSIZE];
397 
398 	/* LINTED pointer cast */
399 	opt = (struct opthdr *)buf;
400 	opt->level =  IPPROTO_TCP;
401 	opt->name = TCP_CONN_ABORT_THRESHOLD;
402 	opt->len = sizeof (int);
403 
404 	req.flags = T_NEGOTIATE;
405 	req.opt.len = sizeof (struct opthdr) + opt->len;
406 	req.opt.buf = (char *)opt;
407 	/* LINTED pointer cast */
408 	ip = (int *)((char *)buf + sizeof (struct opthdr));
409 	*ip = optval;
410 
411 	res.flags = 0;
412 	res.opt.buf = (char *)buf;
413 	res.opt.maxlen = sizeof (buf);
414 	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
415 		return (-1);
416 	}
417 	return (0);
418 }
419 
420 /*
421  * Get current tcp connection timeout value.
422  * Retun the timeout in milliseconds, or -1 for failure.
423  */
424 static int
425 _get_tcp_conntime(int fd)
426 {
427 	struct t_optmgmt req, res;
428 	struct opthdr *opt;
429 	int *ip, retval;
430 	char buf[TCPOPT_BUFSIZE];
431 
432 	/* LINTED pointer cast */
433 	opt = (struct opthdr *)buf;
434 	opt->level =  IPPROTO_TCP;
435 	opt->name = TCP_CONN_ABORT_THRESHOLD;
436 	opt->len = sizeof (int);
437 
438 	req.flags = T_CURRENT;
439 	req.opt.len = sizeof (struct opthdr) + opt->len;
440 	req.opt.buf = (char *)opt;
441 	/* LINTED pointer cast */
442 	ip = (int *)((char *)buf + sizeof (struct opthdr));
443 	*ip = 0;
444 
445 	res.flags = 0;
446 	res.opt.buf = (char *)buf;
447 	res.opt.maxlen = sizeof (buf);
448 	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
449 		return (-1);
450 	}
451 
452 	/* LINTED pointer cast */
453 	ip = (int *)((char *)buf + sizeof (struct opthdr));
454 	retval = *ip;
455 	return (retval);
456 }
457 
458 static bool_t
459 set_up_connection(int fd, struct netbuf *svcaddr, struct ct_data *ct,
460     const struct timeval *tp)
461 {
462 	int state;
463 	struct t_call sndcallstr, *rcvcall;
464 	int nconnect;
465 	bool_t connected, do_rcv_connect;
466 	int curr_time = -1;
467 	hrtime_t start;
468 	hrtime_t tout;	/* timeout in nanoseconds (from tp) */
469 
470 	ct->ct_addr.len = 0;
471 	state = t_getstate(fd);
472 	if (state == -1) {
473 		rpc_createerr.cf_stat = RPC_TLIERROR;
474 		rpc_createerr.cf_error.re_errno = 0;
475 		rpc_createerr.cf_error.re_terrno = t_errno;
476 		return (FALSE);
477 	}
478 
479 	switch (state) {
480 	case T_IDLE:
481 		if (svcaddr == NULL) {
482 			rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
483 			return (FALSE);
484 		}
485 		/*
486 		 * Connect only if state is IDLE and svcaddr known
487 		 */
488 /* LINTED pointer alignment */
489 		rcvcall = (struct t_call *)t_alloc(fd, T_CALL, T_OPT|T_ADDR);
490 		if (rcvcall == NULL) {
491 			rpc_createerr.cf_stat = RPC_TLIERROR;
492 			rpc_createerr.cf_error.re_terrno = t_errno;
493 			rpc_createerr.cf_error.re_errno = errno;
494 			return (FALSE);
495 		}
496 		rcvcall->udata.maxlen = 0;
497 		sndcallstr.addr = *svcaddr;
498 		sndcallstr.opt.len = 0;
499 		sndcallstr.udata.len = 0;
500 		/*
501 		 * Even NULL could have sufficed for rcvcall, because
502 		 * the address returned is same for all cases except
503 		 * for the gateway case, and hence required.
504 		 */
505 		connected = FALSE;
506 		do_rcv_connect = FALSE;
507 
508 		/*
509 		 * If there is a timeout value specified, we will try to
510 		 * reset the tcp connection timeout. If the transport does
511 		 * not support the TCP_CONN_ABORT_THRESHOLD option or fails
512 		 * for other reason, default timeout will be used.
513 		 */
514 		if (tp != NULL) {
515 			start = gethrtime();
516 
517 			/*
518 			 * Calculate the timeout in nanoseconds
519 			 */
520 			tout = SECS_TO_NS(tp->tv_sec) +
521 			    USECS_TO_NS(tp->tv_usec);
522 			curr_time = _get_tcp_conntime(fd);
523 		}
524 
525 		for (nconnect = 0; nconnect < 3; nconnect++) {
526 			if (tp != NULL) {
527 				/*
528 				 * Calculate the elapsed time
529 				 */
530 				hrtime_t elapsed = gethrtime() - start;
531 				if (elapsed >= tout)
532 					break;
533 
534 				if (curr_time != -1) {
535 					int ms;
536 
537 					/*
538 					 * TCP_CONN_ABORT_THRESHOLD takes int
539 					 * value in milliseconds.  Make sure we
540 					 * do not overflow.
541 					 */
542 					if (NSECS_TO_MS(tout - elapsed) >=
543 					    INT_MAX) {
544 						ms = INT_MAX;
545 					} else {
546 						ms = (int)
547 						    NSECS_TO_MS(tout - elapsed);
548 						if (MSECS_TO_NS(ms) !=
549 						    tout - elapsed)
550 							ms++;
551 					}
552 
553 					(void) _set_tcp_conntime(fd, ms);
554 				}
555 			}
556 
557 			if (t_connect(fd, &sndcallstr, rcvcall) != -1) {
558 				connected = TRUE;
559 				break;
560 			}
561 			if (t_errno == TLOOK) {
562 				switch (t_look(fd)) {
563 				case T_DISCONNECT:
564 					(void) t_rcvdis(fd, (struct
565 					    t_discon *) NULL);
566 					break;
567 				default:
568 					break;
569 				}
570 			} else if (!(t_errno == TSYSERR && errno == EINTR)) {
571 				break;
572 			}
573 			if ((state = t_getstate(fd)) == T_OUTCON) {
574 				do_rcv_connect = TRUE;
575 				break;
576 			}
577 			if (state != T_IDLE) {
578 				break;
579 			}
580 		}
581 		if (do_rcv_connect) {
582 			do {
583 				if (t_rcvconnect(fd, rcvcall) != -1) {
584 					connected = TRUE;
585 					break;
586 				}
587 			} while (t_errno == TSYSERR && errno == EINTR);
588 		}
589 
590 		/*
591 		 * Set the connection timeout back to its old value.
592 		 */
593 		if (curr_time != -1) {
594 			(void) _set_tcp_conntime(fd, curr_time);
595 		}
596 
597 		if (!connected) {
598 			rpc_createerr.cf_stat = RPC_TLIERROR;
599 			rpc_createerr.cf_error.re_terrno = t_errno;
600 			rpc_createerr.cf_error.re_errno = errno;
601 			(void) t_free((char *)rcvcall, T_CALL);
602 			return (FALSE);
603 		}
604 
605 		/* Free old area if allocated */
606 		if (ct->ct_addr.buf)
607 			free(ct->ct_addr.buf);
608 		ct->ct_addr = rcvcall->addr;	/* To get the new address */
609 		/* So that address buf does not get freed */
610 		rcvcall->addr.buf = NULL;
611 		(void) t_free((char *)rcvcall, T_CALL);
612 		break;
613 	case T_DATAXFER:
614 	case T_OUTCON:
615 		if (svcaddr == NULL) {
616 			/*
617 			 * svcaddr could also be NULL in cases where the
618 			 * client is already bound and connected.
619 			 */
620 			ct->ct_addr.len = 0;
621 		} else {
622 			ct->ct_addr.buf = malloc(svcaddr->len);
623 			if (ct->ct_addr.buf == NULL) {
624 				(void) syslog(LOG_ERR, clnt_vc_errstr,
625 				    clnt_vc_str, __no_mem_str);
626 				rpc_createerr.cf_stat = RPC_SYSTEMERROR;
627 				rpc_createerr.cf_error.re_errno = errno;
628 				rpc_createerr.cf_error.re_terrno = 0;
629 				return (FALSE);
630 			}
631 			(void) memcpy(ct->ct_addr.buf, svcaddr->buf,
632 			    (size_t)svcaddr->len);
633 			ct->ct_addr.len = ct->ct_addr.maxlen = svcaddr->len;
634 		}
635 		break;
636 	default:
637 		rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
638 		return (FALSE);
639 	}
640 	return (TRUE);
641 }
642 
/*
 * Perform a remote procedure call over the connection and, unless the
 * call is batched, wait for and decode the matching reply.
 *
 * cl		client handle; its cl_private is the struct ct_data
 * proc		remote procedure number
 * xdr_args	routine that encodes the arguments at args_ptr
 * xdr_results	routine that decodes the results into results_ptr
 * timeout	per-call timeout; replaced by ct_wait when a timeout was
 *		set through clnt_control()
 *
 * A call with a NULL xdr_results and a zero timeout is batched: it is
 * queued in the record stream without being forced out, and RPC_SUCCESS
 * is returned without waiting for a reply.
 *
 * Returns the call status; except on the two RPC_FAILED early returns it
 * is also left in rpc_callerr.re_status.  The fd is locked through
 * rpc_fd_lock() for the duration of the call.
 */
static enum clnt_stat
clnt_vc_call(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr,
	xdrproc_t xdr_results, caddr_t results_ptr, struct timeval timeout)
{
/* LINTED pointer alignment */
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	XDR *xdrs = &(ct->ct_xdrs);
	struct rpc_msg reply_msg;
	uint32_t x_id;
/* LINTED pointer alignment */
	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */
	bool_t shipnow;
	int refreshes = 2;	/* credential refresh attempts allowed */

	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
		rpc_callerr.re_status = RPC_FAILED;
		rpc_callerr.re_errno = errno;
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (RPC_FAILED);
	}

	ct->ct_is_oneway = FALSE;
	/* In non-blocking mode, flush with a blocking flush before calling. */
	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
		if (do_flush(ct, RPC_CL_BLOCKING_FLUSH) != 0) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (RPC_FAILED);  /* XXX */
		}
	}

	/*
	 * A timeout set through clnt_control() overrides the caller's;
	 * otherwise the caller's timeout is remembered in ct_wait.
	 */
	if (!ct->ct_waitset) {
		/* If time is not within limits, we ignore it. */
		if (time_not_ok(&timeout) == FALSE)
			ct->ct_wait = __rpc_timeval_to_msec(&timeout);
	} else {
		timeout.tv_sec = (ct->ct_wait / 1000);
		timeout.tv_usec = (ct->ct_wait % 1000) * 1000;
	}

	/* Batched call: no results routine and a zero timeout. */
	shipnow = ((xdr_results == (xdrproc_t)0) && (timeout.tv_sec == 0) &&
	    (timeout.tv_usec == 0)) ? FALSE : TRUE;
call_again:
	xdrs->x_op = XDR_ENCODE;
	rpc_callerr.re_status = RPC_SUCCESS;
	/*
	 * Due to little endian byte order, it is necessary to convert to host
	 * format before decrementing xid.
	 */
	x_id = ntohl(*msg_x_id) - 1;
	*msg_x_id = htonl(x_id);

	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
		/* Pre-marshalled header, procedure, credentials, arguments. */
		if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
		    (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
		    (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
		    (!xdr_args(xdrs, args_ptr))) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
	} else {
		/*
		 * RPCSEC_GSS: append the procedure number to the
		 * pre-marshalled header and hand both to __rpc_gss_wrap().
		 */
/* LINTED pointer alignment */
		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
		IXDR_PUT_U_INT32(u, proc);
		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTENCODEARGS;
			(void) xdrrec_endofrecord(xdrs, TRUE);
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
	}
	if (!xdrrec_endofrecord(xdrs, shipnow)) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (rpc_callerr.re_status = RPC_CANTSEND);
	}
	/* A batched call is done once it has been queued. */
	if (!shipnow) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (RPC_SUCCESS);
	}
	/*
	 * Hack to provide rpc-based message passing
	 */
	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
		rpc_fd_unlock(vctbl, ct->ct_fd);
		return (rpc_callerr.re_status = RPC_TIMEDOUT);
	}


	/*
	 * Keep receiving until we get a valid transaction id
	 */
	xdrs->x_op = XDR_DECODE;
	for (;;) {
		reply_msg.acpted_rply.ar_verf = _null_auth;
		reply_msg.acpted_rply.ar_results.where = NULL;
		reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
		if (!xdrrec_skiprecord(xdrs)) {
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
		/* now decode and validate the response header */
		if (!xdr_replymsg(xdrs, &reply_msg)) {
			/* No error recorded: try the next record. */
			if (rpc_callerr.re_status == RPC_SUCCESS)
				continue;
			rpc_fd_unlock(vctbl, ct->ct_fd);
			return (rpc_callerr.re_status);
		}
		if (reply_msg.rm_xid == x_id)
			break;
	}

	/*
	 * process header
	 */
	if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
	    (reply_msg.acpted_rply.ar_stat == SUCCESS))
		rpc_callerr.re_status = RPC_SUCCESS;
	else
		__seterr_reply(&reply_msg, &(rpc_callerr));

	if (rpc_callerr.re_status == RPC_SUCCESS) {
		/* Check the verifier, then decode the results. */
		if (!AUTH_VALIDATE(cl->cl_auth,
		    &reply_msg.acpted_rply.ar_verf)) {
			rpc_callerr.re_status = RPC_AUTHERROR;
			rpc_callerr.re_why = AUTH_INVALIDRESP;
		} else if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
			if (!(*xdr_results)(xdrs, results_ptr)) {
				if (rpc_callerr.re_status == RPC_SUCCESS)
					rpc_callerr.re_status =
					    RPC_CANTDECODERES;
			}
		} else if (!__rpc_gss_unwrap(cl->cl_auth, xdrs, xdr_results,
		    results_ptr)) {
			if (rpc_callerr.re_status == RPC_SUCCESS)
				rpc_callerr.re_status = RPC_CANTDECODERES;
		}
	}	/* end successful completion */
	/*
	 * If unsuccessful AND error is an authentication error
	 * then refresh credentials and try again, else break
	 */
	else if (rpc_callerr.re_status == RPC_AUTHERROR) {
		/* maybe our credentials need to be refreshed ... */
		if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg))
			goto call_again;
		else
			/*
			 * We are setting rpc_callerr here given that libnsl
			 * is not reentrant thereby reinitializing the TSD.
			 * If not set here then success could be returned even
			 * though refresh failed.
			 */
			rpc_callerr.re_status = RPC_AUTHERROR;
	} /* end of unsuccessful completion */
	/* free verifier ... */
	if (reply_msg.rm_reply.rp_stat == MSG_ACCEPTED &&
	    reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
		xdrs->x_op = XDR_FREE;
		(void) xdr_opaque_auth(xdrs, &(reply_msg.acpted_rply.ar_verf));
	}
	rpc_fd_unlock(vctbl, ct->ct_fd);
	return (rpc_callerr.re_status);
}
809 
810 static enum clnt_stat
811 clnt_vc_send(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr)
812 {
813 /* LINTED pointer alignment */
814 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
815 	XDR *xdrs = &(ct->ct_xdrs);
816 	uint32_t x_id;
817 /* LINTED pointer alignment */
818 	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */
819 
820 	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
821 		rpc_callerr.re_status = RPC_FAILED;
822 		rpc_callerr.re_errno = errno;
823 		rpc_fd_unlock(vctbl, ct->ct_fd);
824 		return (RPC_FAILED);
825 	}
826 
827 	ct->ct_is_oneway = TRUE;
828 
829 	xdrs->x_op = XDR_ENCODE;
830 	rpc_callerr.re_status = RPC_SUCCESS;
831 	/*
832 	 * Due to little endian byte order, it is necessary to convert to host
833 	 * format before decrementing xid.
834 	 */
835 	x_id = ntohl(*msg_x_id) - 1;
836 	*msg_x_id = htonl(x_id);
837 
838 	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
839 		if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
840 		    (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
841 		    (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
842 		    (!xdr_args(xdrs, args_ptr))) {
843 			if (rpc_callerr.re_status == RPC_SUCCESS)
844 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
845 			(void) xdrrec_endofrecord(xdrs, TRUE);
846 			rpc_fd_unlock(vctbl, ct->ct_fd);
847 			return (rpc_callerr.re_status);
848 		}
849 	} else {
850 /* LINTED pointer alignment */
851 		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
852 		IXDR_PUT_U_INT32(u, proc);
853 		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
854 		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
855 			if (rpc_callerr.re_status == RPC_SUCCESS)
856 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
857 			(void) xdrrec_endofrecord(xdrs, TRUE);
858 			rpc_fd_unlock(vctbl, ct->ct_fd);
859 			return (rpc_callerr.re_status);
860 		}
861 	}
862 
863 	/*
864 	 * Do not need to check errors, as the following code does
865 	 * not depend on the successful completion of the call.
866 	 * An error, if any occurs, is reported through
867 	 * rpc_callerr.re_status.
868 	 */
869 	(void) xdrrec_endofrecord(xdrs, TRUE);
870 
871 	rpc_fd_unlock(vctbl, ct->ct_fd);
872 	return (rpc_callerr.re_status);
873 }
874 
875 /* ARGSUSED */
876 static void
877 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
878 {
879 	*errp = rpc_callerr;
880 }
881 
882 static bool_t
883 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, caddr_t res_ptr)
884 {
885 /* LINTED pointer alignment */
886 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
887 	XDR *xdrs = &(ct->ct_xdrs);
888 	bool_t stat;
889 
890 	(void) rpc_fd_lock(vctbl, ct->ct_fd);
891 	xdrs->x_op = XDR_FREE;
892 	stat = (*xdr_res)(xdrs, res_ptr);
893 	rpc_fd_unlock(vctbl, ct->ct_fd);
894 	return (stat);
895 }
896 
/*
 * Abort operation; intentionally does nothing.
 * NOTE(review): presumably installed as the abort entry of the ops vector
 * returned by clnt_vc_ops() - confirm.
 */
static void
clnt_vc_abort(void)
{
}
901 
902 /*ARGSUSED*/
903 static bool_t
904 clnt_vc_control(CLIENT *cl, int request, char *info)
905 {
906 	bool_t ret;
907 /* LINTED pointer alignment */
908 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
909 
910 	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
911 		rpc_fd_unlock(vctbl, ct->ct_fd);
912 		return (FALSE);
913 	}
914 
915 	switch (request) {
916 	case CLSET_FD_CLOSE:
917 		ct->ct_closeit = TRUE;
918 		rpc_fd_unlock(vctbl, ct->ct_fd);
919 		return (TRUE);
920 	case CLSET_FD_NCLOSE:
921 		ct->ct_closeit = FALSE;
922 		rpc_fd_unlock(vctbl, ct->ct_fd);
923 		return (TRUE);
924 	case CLFLUSH:
925 		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
926 			int res;
927 			res = do_flush(ct, (info == NULL ||
928 			    /* LINTED pointer cast */
929 			    *(int *)info == RPC_CL_DEFAULT_FLUSH)?
930 			    /* LINTED pointer cast */
931 			    ct->ct_blocking_mode: *(int *)info);
932 			ret = (0 == res);
933 		} else {
934 			ret = FALSE;
935 		}
936 		rpc_fd_unlock(vctbl, ct->ct_fd);
937 		return (ret);
938 	}
939 
940 	/* for other requests which use info */
941 	if (info == NULL) {
942 		rpc_fd_unlock(vctbl, ct->ct_fd);
943 		return (FALSE);
944 	}
945 	switch (request) {
946 	case CLSET_TIMEOUT:
947 /* LINTED pointer alignment */
948 		if (time_not_ok((struct timeval *)info)) {
949 			rpc_fd_unlock(vctbl, ct->ct_fd);
950 			return (FALSE);
951 		}
952 /* LINTED pointer alignment */
953 		ct->ct_wait = __rpc_timeval_to_msec((struct timeval *)info);
954 		ct->ct_waitset = TRUE;
955 		break;
956 	case CLGET_TIMEOUT:
957 /* LINTED pointer alignment */
958 		((struct timeval *)info)->tv_sec = ct->ct_wait / 1000;
959 /* LINTED pointer alignment */
960 		((struct timeval *)info)->tv_usec = (ct->ct_wait % 1000) * 1000;
961 		break;
962 	case CLGET_SERVER_ADDR:	/* For compatibility only */
963 		(void) memcpy(info, ct->ct_addr.buf, (size_t)ct->ct_addr.len);
964 		break;
965 	case CLGET_FD:
966 /* LINTED pointer alignment */
967 		*(int *)info = ct->ct_fd;
968 		break;
969 	case CLGET_SVC_ADDR:
970 		/* The caller should not free this memory area */
971 /* LINTED pointer alignment */
972 		*(struct netbuf *)info = ct->ct_addr;
973 		break;
974 	case CLSET_SVC_ADDR:		/* set to new address */
975 #ifdef undef
976 		/*
977 		 * XXX: once the t_snddis(), followed by t_connect() starts to
978 		 * work, this ifdef should be removed.  CLIENT handle reuse
979 		 * would then be possible for COTS as well.
980 		 */
981 		if (t_snddis(ct->ct_fd, NULL) == -1) {
982 			rpc_createerr.cf_stat = RPC_TLIERROR;
983 			rpc_createerr.cf_error.re_terrno = t_errno;
984 			rpc_createerr.cf_error.re_errno = errno;
985 			rpc_fd_unlock(vctbl, ct->ct_fd);
986 			return (FALSE);
987 		}
988 		ret = set_up_connection(ct->ct_fd, (struct netbuf *)info,
989 		    ct, NULL);
990 		rpc_fd_unlock(vctbl, ct->ct_fd);
991 		return (ret);
992 #else
993 		rpc_fd_unlock(vctbl, ct->ct_fd);
994 		return (FALSE);
995 #endif
996 	case CLGET_XID:
997 		/*
998 		 * use the knowledge that xid is the
999 		 * first element in the call structure
1000 		 * This will get the xid of the PREVIOUS call
1001 		 */
1002 /* LINTED pointer alignment */
1003 		*(uint32_t *)info = ntohl(*(uint32_t *)ct->ct_mcall);
1004 		break;
1005 	case CLSET_XID:
1006 		/* This will set the xid of the NEXT call */
1007 /* LINTED pointer alignment */
1008 		*(uint32_t *)ct->ct_mcall =  htonl(*(uint32_t *)info + 1);
1009 		/* increment by 1 as clnt_vc_call() decrements once */
1010 		break;
1011 	case CLGET_VERS:
1012 		/*
1013 		 * This RELIES on the information that, in the call body,
1014 		 * the version number field is the fifth field from the
1015 		 * begining of the RPC header. MUST be changed if the
1016 		 * call_struct is changed
1017 		 */
1018 /* LINTED pointer alignment */
1019 		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
1020 		    4 * BYTES_PER_XDR_UNIT));
1021 		break;
1022 
1023 	case CLSET_VERS:
1024 /* LINTED pointer alignment */
1025 		*(uint32_t *)(ct->ct_mcall + 4 * BYTES_PER_XDR_UNIT) =
1026 /* LINTED pointer alignment */
1027 		    htonl(*(uint32_t *)info);
1028 		break;
1029 
1030 	case CLGET_PROG:
1031 		/*
1032 		 * This RELIES on the information that, in the call body,
1033 		 * the program number field is the fourth field from the
1034 		 * begining of the RPC header. MUST be changed if the
1035 		 * call_struct is changed
1036 		 */
1037 /* LINTED pointer alignment */
1038 		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
1039 		    3 * BYTES_PER_XDR_UNIT));
1040 		break;
1041 
1042 	case CLSET_PROG:
1043 /* LINTED pointer alignment */
1044 		*(uint32_t *)(ct->ct_mcall + 3 * BYTES_PER_XDR_UNIT) =
1045 /* LINTED pointer alignment */
1046 		    htonl(*(uint32_t *)info);
1047 		break;
1048 
1049 	case CLSET_IO_MODE:
1050 		/* LINTED pointer cast */
1051 		if (!set_io_mode(ct, *(int *)info)) {
1052 			rpc_fd_unlock(vctbl, ct->ct_fd);
1053 			return (FALSE);
1054 		}
1055 		break;
1056 	case CLSET_FLUSH_MODE:
1057 		/* Set a specific FLUSH_MODE */
1058 		/* LINTED pointer cast */
1059 		if (!set_flush_mode(ct, *(int *)info)) {
1060 			rpc_fd_unlock(vctbl, ct->ct_fd);
1061 			return (FALSE);
1062 		}
1063 		break;
1064 	case CLGET_FLUSH_MODE:
1065 		/* LINTED pointer cast */
1066 		*(rpcflushmode_t *)info = ct->ct_blocking_mode;
1067 		break;
1068 
1069 	case CLGET_IO_MODE:
1070 		/* LINTED pointer cast */
1071 		*(rpciomode_t *)info = ct->ct_io_mode;
1072 		break;
1073 
1074 	case CLGET_CURRENT_REC_SIZE:
1075 		/*
1076 		 * Returns the current amount of memory allocated
1077 		 * to pending requests
1078 		 */
1079 		/* LINTED pointer cast */
1080 		*(int *)info = ct->ct_bufferPendingSize;
1081 		break;
1082 
1083 	case CLSET_CONNMAXREC_SIZE:
1084 		/* Cannot resize the buffer if it is used. */
1085 		if (ct->ct_bufferPendingSize != 0) {
1086 			rpc_fd_unlock(vctbl, ct->ct_fd);
1087 			return (FALSE);
1088 		}
1089 		/*
1090 		 * If the new size is equal to the current size,
1091 		 * there is nothing to do.
1092 		 */
1093 		/* LINTED pointer cast */
1094 		if (ct->ct_bufferSize == *(uint_t *)info)
1095 			break;
1096 
1097 		/* LINTED pointer cast */
1098 		ct->ct_bufferSize = *(uint_t *)info;
1099 		if (ct->ct_buffer) {
1100 			free(ct->ct_buffer);
1101 			ct->ct_buffer = NULL;
1102 			ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = NULL;
1103 		}
1104 		break;
1105 
1106 	case CLGET_CONNMAXREC_SIZE:
1107 		/*
1108 		 * Returns the size of buffer allocated
1109 		 * to pending requests
1110 		 */
1111 		/* LINTED pointer cast */
1112 		*(uint_t *)info = ct->ct_bufferSize;
1113 		break;
1114 
1115 	default:
1116 		rpc_fd_unlock(vctbl, ct->ct_fd);
1117 		return (FALSE);
1118 	}
1119 	rpc_fd_unlock(vctbl, ct->ct_fd);
1120 	return (TRUE);
1121 }
1122 
1123 static void
1124 clnt_vc_destroy(CLIENT *cl)
1125 {
1126 /* LINTED pointer alignment */
1127 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
1128 	int ct_fd = ct->ct_fd;
1129 
1130 	(void) rpc_fd_lock(vctbl, ct_fd);
1131 
1132 	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1133 		(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
1134 		(void) unregister_nb(ct);
1135 	}
1136 
1137 	if (ct->ct_closeit)
1138 		(void) t_close(ct_fd);
1139 	XDR_DESTROY(&(ct->ct_xdrs));
1140 	if (ct->ct_addr.buf)
1141 		free(ct->ct_addr.buf);
1142 	free(ct);
1143 	if (cl->cl_netid && cl->cl_netid[0])
1144 		free(cl->cl_netid);
1145 	if (cl->cl_tp && cl->cl_tp[0])
1146 		free(cl->cl_tp);
1147 	free(cl);
1148 	rpc_fd_unlock(vctbl, ct_fd);
1149 }
1150 
1151 /*
1152  * Interface between xdr serializer and vc connection.
1153  * Behaves like the system calls, read & write, but keeps some error state
1154  * around for the rpc level.
1155  */
1156 static int
1157 read_vc(void *ct_tmp, caddr_t buf, int len)
1158 {
1159 	static pthread_key_t pfdp_key = PTHREAD_ONCE_KEY_NP;
1160 	struct pollfd *pfdp;
1161 	int npfd;		/* total number of pfdp allocated */
1162 	struct ct_data *ct = ct_tmp;
1163 	struct timeval starttime;
1164 	struct timeval curtime;
1165 	int poll_time;
1166 	int delta;
1167 
1168 	if (len == 0)
1169 		return (0);
1170 
1171 	/*
1172 	 * Allocate just one the first time.  thr_get_storage() may
1173 	 * return a larger buffer, left over from the last time we were
1174 	 * here, but that's OK.  realloc() will deal with it properly.
1175 	 */
1176 	npfd = 1;
1177 	pfdp = thr_get_storage(&pfdp_key, sizeof (struct pollfd), free);
1178 	if (pfdp == NULL) {
1179 		(void) syslog(LOG_ERR, clnt_vc_errstr,
1180 		    clnt_read_vc_str, __no_mem_str);
1181 		rpc_callerr.re_status = RPC_SYSTEMERROR;
1182 		rpc_callerr.re_errno = errno;
1183 		rpc_callerr.re_terrno = 0;
1184 		return (-1);
1185 	}
1186 
1187 	/*
1188 	 *	N.B.:  slot 0 in the pollfd array is reserved for the file
1189 	 *	descriptor we're really interested in (as opposed to the
1190 	 *	callback descriptors).
1191 	 */
1192 	pfdp[0].fd = ct->ct_fd;
1193 	pfdp[0].events = MASKVAL;
1194 	pfdp[0].revents = 0;
1195 	poll_time = ct->ct_wait;
1196 	if (gettimeofday(&starttime, NULL) == -1) {
1197 		syslog(LOG_ERR, "Unable to get time of day: %m");
1198 		return (-1);
1199 	}
1200 
1201 	for (;;) {
1202 		extern void (*_svc_getreqset_proc)();
1203 		extern pollfd_t *svc_pollfd;
1204 		extern int svc_max_pollfd;
1205 		int fds;
1206 
1207 		/* VARIABLES PROTECTED BY svc_fd_lock: svc_pollfd */
1208 
1209 		if (_svc_getreqset_proc) {
1210 			sig_rw_rdlock(&svc_fd_lock);
1211 
1212 			/* reallocate pfdp to svc_max_pollfd +1 */
1213 			if (npfd != (svc_max_pollfd + 1)) {
1214 				struct pollfd *tmp_pfdp = realloc(pfdp,
1215 				    sizeof (struct pollfd) *
1216 				    (svc_max_pollfd + 1));
1217 				if (tmp_pfdp == NULL) {
1218 					sig_rw_unlock(&svc_fd_lock);
1219 					(void) syslog(LOG_ERR, clnt_vc_errstr,
1220 					    clnt_read_vc_str, __no_mem_str);
1221 					rpc_callerr.re_status = RPC_SYSTEMERROR;
1222 					rpc_callerr.re_errno = errno;
1223 					rpc_callerr.re_terrno = 0;
1224 					return (-1);
1225 				}
1226 
1227 				pfdp = tmp_pfdp;
1228 				npfd = svc_max_pollfd + 1;
1229 				(void) pthread_setspecific(pfdp_key, pfdp);
1230 			}
1231 			if (npfd > 1)
1232 				(void) memcpy(&pfdp[1], svc_pollfd,
1233 				    sizeof (struct pollfd) * (npfd - 1));
1234 
1235 			sig_rw_unlock(&svc_fd_lock);
1236 		} else {
1237 			npfd = 1;	/* don't forget about pfdp[0] */
1238 		}
1239 
1240 		switch (fds = poll(pfdp, npfd, poll_time)) {
1241 		case 0:
1242 			rpc_callerr.re_status = RPC_TIMEDOUT;
1243 			return (-1);
1244 
1245 		case -1:
1246 			if (errno != EINTR)
1247 				continue;
1248 			else {
1249 				/*
1250 				 * interrupted by another signal,
1251 				 * update time_waited
1252 				 */
1253 
1254 				if (gettimeofday(&curtime, NULL) == -1) {
1255 					syslog(LOG_ERR,
1256 					    "Unable to get time of day:  %m");
1257 					errno = 0;
1258 					continue;
1259 				};
1260 				delta = (curtime.tv_sec -
1261 				    starttime.tv_sec) * 1000 +
1262 				    (curtime.tv_usec -
1263 				    starttime.tv_usec) / 1000;
1264 				poll_time -= delta;
1265 				if (poll_time < 0) {
1266 					rpc_callerr.re_status = RPC_TIMEDOUT;
1267 					errno = 0;
1268 					return (-1);
1269 				} else {
1270 					errno = 0; /* reset it */
1271 					continue;
1272 				}
1273 			}
1274 		}
1275 
1276 		if (pfdp[0].revents == 0) {
1277 			/* must be for server side of the house */
1278 			(*_svc_getreqset_proc)(&pfdp[1], fds);
1279 			continue;	/* do poll again */
1280 		}
1281 
1282 		if (pfdp[0].revents & POLLNVAL) {
1283 			rpc_callerr.re_status = RPC_CANTRECV;
1284 			/*
1285 			 *	Note:  we're faking errno here because we
1286 			 *	previously would have expected select() to
1287 			 *	return -1 with errno EBADF.  Poll(BA_OS)
1288 			 *	returns 0 and sets the POLLNVAL revents flag
1289 			 *	instead.
1290 			 */
1291 			rpc_callerr.re_errno = errno = EBADF;
1292 			return (-1);
1293 		}
1294 
1295 		if (pfdp[0].revents & (POLLERR | POLLHUP)) {
1296 			rpc_callerr.re_status = RPC_CANTRECV;
1297 			rpc_callerr.re_errno = errno = EPIPE;
1298 			return (-1);
1299 		}
1300 		break;
1301 	}
1302 
1303 	switch (len = t_rcvall(ct->ct_fd, buf, len)) {
1304 	case 0:
1305 		/* premature eof */
1306 		rpc_callerr.re_errno = ENOLINK;
1307 		rpc_callerr.re_terrno = 0;
1308 		rpc_callerr.re_status = RPC_CANTRECV;
1309 		len = -1;	/* it's really an error */
1310 		break;
1311 
1312 	case -1:
1313 		rpc_callerr.re_terrno = t_errno;
1314 		rpc_callerr.re_errno = 0;
1315 		rpc_callerr.re_status = RPC_CANTRECV;
1316 		break;
1317 	}
1318 	return (len);
1319 }
1320 
1321 static int
1322 write_vc(void *ct_tmp, caddr_t buf, int len)
1323 {
1324 	int i, cnt;
1325 	struct ct_data *ct = ct_tmp;
1326 	int flag;
1327 	int maxsz;
1328 
1329 	maxsz = ct->ct_tsdu;
1330 
1331 	/* Handle the non-blocking mode */
1332 	if (ct->ct_is_oneway && ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1333 		/*
1334 		 * Test a special case here. If the length of the current
1335 		 * write is greater than the transport data unit, and the
1336 		 * mode is non blocking, we return RPC_CANTSEND.
1337 		 * XXX  this is not very clean.
1338 		 */
1339 		if (maxsz > 0 && len > maxsz) {
1340 			rpc_callerr.re_terrno = errno;
1341 			rpc_callerr.re_errno = 0;
1342 			rpc_callerr.re_status = RPC_CANTSEND;
1343 			return (-1);
1344 		}
1345 
1346 		len = nb_send(ct, buf, (unsigned)len);
1347 		if (len == -1) {
1348 			rpc_callerr.re_terrno = errno;
1349 			rpc_callerr.re_errno = 0;
1350 			rpc_callerr.re_status = RPC_CANTSEND;
1351 		} else if (len == -2) {
1352 			rpc_callerr.re_terrno = 0;
1353 			rpc_callerr.re_errno = 0;
1354 			rpc_callerr.re_status = RPC_CANTSTORE;
1355 		}
1356 		return (len);
1357 	}
1358 
1359 	if ((maxsz == 0) || (maxsz == -1)) {
1360 		/*
1361 		 * T_snd may return -1 for error on connection (connection
1362 		 * needs to be repaired/closed, and -2 for flow-control
1363 		 * handling error (no operation to do, just wait and call
1364 		 * T_Flush()).
1365 		 */
1366 		if ((len = t_snd(ct->ct_fd, buf, (unsigned)len, 0)) == -1) {
1367 			rpc_callerr.re_terrno = t_errno;
1368 			rpc_callerr.re_errno = 0;
1369 			rpc_callerr.re_status = RPC_CANTSEND;
1370 		}
1371 		return (len);
1372 	}
1373 
1374 	/*
1375 	 * This for those transports which have a max size for data.
1376 	 */
1377 	for (cnt = len, i = 0; cnt > 0; cnt -= i, buf += i) {
1378 		flag = cnt > maxsz ? T_MORE : 0;
1379 		if ((i = t_snd(ct->ct_fd, buf, (unsigned)MIN(cnt, maxsz),
1380 		    flag)) == -1) {
1381 			rpc_callerr.re_terrno = t_errno;
1382 			rpc_callerr.re_errno = 0;
1383 			rpc_callerr.re_status = RPC_CANTSEND;
1384 			return (-1);
1385 		}
1386 	}
1387 	return (len);
1388 }
1389 
1390 /*
1391  * Receive the required bytes of data, even if it is fragmented.
1392  */
1393 static int
1394 t_rcvall(int fd, char *buf, int len)
1395 {
1396 	int moreflag;
1397 	int final = 0;
1398 	int res;
1399 
1400 	do {
1401 		moreflag = 0;
1402 		res = t_rcv(fd, buf, (unsigned)len, &moreflag);
1403 		if (res == -1) {
1404 			if (t_errno == TLOOK)
1405 				switch (t_look(fd)) {
1406 				case T_DISCONNECT:
1407 					(void) t_rcvdis(fd, NULL);
1408 					(void) t_snddis(fd, NULL);
1409 					return (-1);
1410 				case T_ORDREL:
1411 				/* Received orderly release indication */
1412 					(void) t_rcvrel(fd);
1413 				/* Send orderly release indicator */
1414 					(void) t_sndrel(fd);
1415 					return (-1);
1416 				default:
1417 					return (-1);
1418 				}
1419 		} else if (res == 0) {
1420 			return (0);
1421 		}
1422 		final += res;
1423 		buf += res;
1424 		len -= res;
1425 	} while ((len > 0) && (moreflag & T_MORE));
1426 	return (final);
1427 }
1428 
1429 static struct clnt_ops *
1430 clnt_vc_ops(void)
1431 {
1432 	static struct clnt_ops ops;
1433 	extern mutex_t	ops_lock;
1434 
1435 	/* VARIABLES PROTECTED BY ops_lock: ops */
1436 
1437 	sig_mutex_lock(&ops_lock);
1438 	if (ops.cl_call == NULL) {
1439 		ops.cl_call = clnt_vc_call;
1440 		ops.cl_send = clnt_vc_send;
1441 		ops.cl_abort = clnt_vc_abort;
1442 		ops.cl_geterr = clnt_vc_geterr;
1443 		ops.cl_freeres = clnt_vc_freeres;
1444 		ops.cl_destroy = clnt_vc_destroy;
1445 		ops.cl_control = clnt_vc_control;
1446 	}
1447 	sig_mutex_unlock(&ops_lock);
1448 	return (&ops);
1449 }
1450 
1451 /*
1452  * Make sure that the time is not garbage.   -1 value is disallowed.
1453  * Note this is different from time_not_ok in clnt_dg.c
1454  */
1455 static bool_t
1456 time_not_ok(struct timeval *t)
1457 {
1458 	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
1459 	    t->tv_usec <= -1 || t->tv_usec > 1000000);
1460 }
1461 
1462 
/*
 * Compute the number of bytes that remain between a position in the
 * pending-data buffer and the end of that buffer.  `p' is the name of a
 * ct_data pointer member without its ct_ prefix (bufferReadPtr or
 * bufferWritePtr).  The macro expands to an expression on a variable
 * named `ct', which must be in scope wherever it is used.
 */
#define	REMAIN_BYTES(p) (ct->ct_bufferSize-(ct->ct_##p - ct->ct_buffer))
1465 
1466 static int
1467 addInBuffer(struct ct_data *ct, char *dataToAdd, unsigned int nBytes)
1468 {
1469 	if (NULL == ct->ct_buffer) {
1470 		/* Buffer not allocated yet. */
1471 		char *buffer;
1472 
1473 		buffer = malloc(ct->ct_bufferSize);
1474 		if (NULL == buffer) {
1475 			errno = ENOMEM;
1476 			return (-1);
1477 		}
1478 		(void) memcpy(buffer, dataToAdd, nBytes);
1479 
1480 		ct->ct_buffer = buffer;
1481 		ct->ct_bufferReadPtr = buffer;
1482 		ct->ct_bufferWritePtr = buffer + nBytes;
1483 		ct->ct_bufferPendingSize = nBytes;
1484 	} else {
1485 		/*
1486 		 * For an already allocated buffer, two mem copies
1487 		 * might be needed, depending on the current
1488 		 * writing position.
1489 		 */
1490 
1491 		/* Compute the length of the first copy. */
1492 		int len = MIN(nBytes, REMAIN_BYTES(bufferWritePtr));
1493 
1494 		ct->ct_bufferPendingSize += nBytes;
1495 
1496 		(void) memcpy(ct->ct_bufferWritePtr, dataToAdd, len);
1497 		ct->ct_bufferWritePtr += len;
1498 		nBytes -= len;
1499 		if (0 == nBytes) {
1500 			/* One memcopy needed. */
1501 
1502 			/*
1503 			 * If the write pointer is at the end of the buffer,
1504 			 * wrap it now.
1505 			 */
1506 			if (ct->ct_bufferWritePtr ==
1507 			    (ct->ct_buffer + ct->ct_bufferSize)) {
1508 				ct->ct_bufferWritePtr = ct->ct_buffer;
1509 			}
1510 		} else {
1511 			/* Two memcopy needed. */
1512 			dataToAdd += len;
1513 
1514 			/*
1515 			 * Copy the remaining data to the beginning of the
1516 			 * buffer
1517 			 */
1518 			(void) memcpy(ct->ct_buffer, dataToAdd, nBytes);
1519 			ct->ct_bufferWritePtr = ct->ct_buffer + nBytes;
1520 		}
1521 	}
1522 	return (0);
1523 }
1524 
1525 static void
1526 consumeFromBuffer(struct ct_data *ct, unsigned int nBytes)
1527 {
1528 	ct->ct_bufferPendingSize -= nBytes;
1529 	if (ct->ct_bufferPendingSize == 0) {
1530 		/*
1531 		 * If the buffer contains no data, we set the two pointers at
1532 		 * the beginning of the buffer (to miminize buffer wraps).
1533 		 */
1534 		ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = ct->ct_buffer;
1535 	} else {
1536 		ct->ct_bufferReadPtr += nBytes;
1537 		if (ct->ct_bufferReadPtr >
1538 		    ct->ct_buffer + ct->ct_bufferSize) {
1539 			ct->ct_bufferReadPtr -= ct->ct_bufferSize;
1540 		}
1541 	}
1542 }
1543 
1544 static int
1545 iovFromBuffer(struct ct_data *ct, struct iovec *iov)
1546 {
1547 	int l;
1548 
1549 	if (ct->ct_bufferPendingSize == 0)
1550 		return (0);
1551 
1552 	l = REMAIN_BYTES(bufferReadPtr);
1553 	if (l < ct->ct_bufferPendingSize) {
1554 		/* Buffer in two fragments. */
1555 		iov[0].iov_base = ct->ct_bufferReadPtr;
1556 		iov[0].iov_len  = l;
1557 
1558 		iov[1].iov_base = ct->ct_buffer;
1559 		iov[1].iov_len  = ct->ct_bufferPendingSize - l;
1560 		return (2);
1561 	} else {
1562 		/* Buffer in one fragment. */
1563 		iov[0].iov_base = ct->ct_bufferReadPtr;
1564 		iov[0].iov_len  = ct->ct_bufferPendingSize;
1565 		return (1);
1566 	}
1567 }
1568 
1569 static bool_t
1570 set_flush_mode(struct ct_data *ct, int mode)
1571 {
1572 	switch (mode) {
1573 	case RPC_CL_BLOCKING_FLUSH:
1574 		/* flush as most as possible without blocking */
1575 	case RPC_CL_BESTEFFORT_FLUSH:
1576 		/* flush the buffer completely (possibly blocking) */
1577 	case RPC_CL_DEFAULT_FLUSH:
1578 		/* flush according to the currently defined policy */
1579 		ct->ct_blocking_mode = mode;
1580 		return (TRUE);
1581 	default:
1582 		return (FALSE);
1583 	}
1584 }
1585 
1586 static bool_t
1587 set_io_mode(struct ct_data *ct, int ioMode)
1588 {
1589 	switch (ioMode) {
1590 	case RPC_CL_BLOCKING:
1591 		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1592 			if (NULL != ct->ct_buffer) {
1593 				/*
1594 				 * If a buffer was allocated for this
1595 				 * connection, flush it now, and free it.
1596 				 */
1597 				(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
1598 				free(ct->ct_buffer);
1599 				ct->ct_buffer = NULL;
1600 			}
1601 			(void) unregister_nb(ct);
1602 			ct->ct_io_mode = ioMode;
1603 		}
1604 		break;
1605 	case RPC_CL_NONBLOCKING:
1606 		if (ct->ct_io_mode == RPC_CL_BLOCKING) {
1607 			if (-1 == register_nb(ct)) {
1608 				return (FALSE);
1609 			}
1610 			ct->ct_io_mode = ioMode;
1611 		}
1612 		break;
1613 	default:
1614 		return (FALSE);
1615 	}
1616 	return (TRUE);
1617 }
1618 
1619 static int
1620 do_flush(struct ct_data *ct, uint_t flush_mode)
1621 {
1622 	int result;
1623 	if (ct->ct_bufferPendingSize == 0) {
1624 		return (0);
1625 	}
1626 
1627 	switch (flush_mode) {
1628 	case RPC_CL_BLOCKING_FLUSH:
1629 		if (!set_blocking_connection(ct, TRUE)) {
1630 			return (-1);
1631 		}
1632 		while (ct->ct_bufferPendingSize > 0) {
1633 			if (REMAIN_BYTES(bufferReadPtr) <
1634 			    ct->ct_bufferPendingSize) {
1635 				struct iovec iov[2];
1636 				(void) iovFromBuffer(ct, iov);
1637 				result = writev(ct->ct_fd, iov, 2);
1638 			} else {
1639 				result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
1640 				    ct->ct_bufferPendingSize, 0);
1641 			}
1642 			if (result < 0) {
1643 				return (-1);
1644 			}
1645 			consumeFromBuffer(ct, result);
1646 		}
1647 
1648 		break;
1649 
1650 	case RPC_CL_BESTEFFORT_FLUSH:
1651 		(void) set_blocking_connection(ct, FALSE);
1652 		if (REMAIN_BYTES(bufferReadPtr) < ct->ct_bufferPendingSize) {
1653 			struct iovec iov[2];
1654 			(void) iovFromBuffer(ct, iov);
1655 			result = writev(ct->ct_fd, iov, 2);
1656 		} else {
1657 			result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
1658 			    ct->ct_bufferPendingSize, 0);
1659 		}
1660 		if (result < 0) {
1661 			if (errno != EWOULDBLOCK) {
1662 				perror("flush");
1663 				return (-1);
1664 			}
1665 			return (0);
1666 		}
1667 		if (result > 0)
1668 			consumeFromBuffer(ct, result);
1669 		break;
1670 	}
1671 	return (0);
1672 }
1673 
1674 /*
1675  * Non blocking send.
1676  */
1677 
1678 static int
1679 nb_send(struct ct_data *ct, void *buff, unsigned int nBytes)
1680 {
1681 	int result;
1682 
1683 	if (!(ntohl(*(uint32_t *)buff) & 2^31)) {
1684 		return (-1);
1685 	}
1686 
1687 	/*
1688 	 * Check to see if the current message can be stored fully in the
1689 	 * buffer. We have to check this now because it may be impossible
1690 	 * to send any data, so the message must be stored in the buffer.
1691 	 */
1692 	if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize)) {
1693 		/* Try to flush  (to free some space). */
1694 		(void) do_flush(ct, RPC_CL_BESTEFFORT_FLUSH);
1695 
1696 		/* Can we store the message now ? */
1697 		if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize))
1698 			return (-2);
1699 	}
1700 
1701 	(void) set_blocking_connection(ct, FALSE);
1702 
1703 	/*
1704 	 * If there is no data pending, we can simply try
1705 	 * to send our data.
1706 	 */
1707 	if (ct->ct_bufferPendingSize == 0) {
1708 		result = t_snd(ct->ct_fd, buff, nBytes, 0);
1709 		if (result == -1) {
1710 			if (errno == EWOULDBLOCK) {
1711 				result = 0;
1712 			} else {
1713 				perror("send");
1714 				return (-1);
1715 			}
1716 		}
1717 		/*
1718 		 * If we have not sent all data, we must store them
1719 		 * in the buffer.
1720 		 */
1721 		if (result != nBytes) {
1722 			if (addInBuffer(ct, (char *)buff + result,
1723 			    nBytes - result) == -1) {
1724 				return (-1);
1725 			}
1726 		}
1727 	} else {
1728 		/*
1729 		 * Some data pending in the buffer.  We try to send
1730 		 * both buffer data and current message in one shot.
1731 		 */
1732 		struct iovec iov[3];
1733 		int i = iovFromBuffer(ct, &iov[0]);
1734 
1735 		iov[i].iov_base = buff;
1736 		iov[i].iov_len  = nBytes;
1737 
1738 		result = writev(ct->ct_fd, iov, i+1);
1739 		if (result == -1) {
1740 			if (errno == EWOULDBLOCK) {
1741 				/* No bytes sent */
1742 				result = 0;
1743 			} else {
1744 				return (-1);
1745 			}
1746 		}
1747 
1748 		/*
1749 		 * Add the bytes from the message
1750 		 * that we have not sent.
1751 		 */
1752 		if (result <= ct->ct_bufferPendingSize) {
1753 			/* No bytes from the message sent */
1754 			consumeFromBuffer(ct, result);
1755 			if (addInBuffer(ct, buff, nBytes) == -1) {
1756 				return (-1);
1757 			}
1758 		} else {
1759 			/*
1760 			 * Some bytes of the message are sent.
1761 			 * Compute the length of the message that has
1762 			 * been sent.
1763 			 */
1764 			int len = result - ct->ct_bufferPendingSize;
1765 
1766 			/* So, empty the buffer. */
1767 			ct->ct_bufferReadPtr = ct->ct_buffer;
1768 			ct->ct_bufferWritePtr = ct->ct_buffer;
1769 			ct->ct_bufferPendingSize = 0;
1770 
1771 			/* And add the remaining part of the message. */
1772 			if (len != nBytes) {
1773 				if (addInBuffer(ct, (char *)buff + len,
1774 				    nBytes-len) == -1) {
1775 					return (-1);
1776 				}
1777 			}
1778 		}
1779 	}
1780 	return (nBytes);
1781 }
1782 
1783 static void
1784 flush_registered_clients(void)
1785 {
1786 	struct nb_reg_node *node;
1787 
1788 	if (LIST_ISEMPTY(nb_first)) {
1789 		return;
1790 	}
1791 
1792 	LIST_FOR_EACH(nb_first, node) {
1793 		(void) do_flush(node->ct, RPC_CL_BLOCKING_FLUSH);
1794 	}
1795 }
1796 
1797 static int
1798 allocate_chunk(void)
1799 {
1800 #define	CHUNK_SIZE 16
1801 	struct nb_reg_node *chk =
1802 	    malloc(sizeof (struct nb_reg_node) * CHUNK_SIZE);
1803 	struct nb_reg_node *n;
1804 	int i;
1805 
1806 	if (NULL == chk) {
1807 		return (-1);
1808 	}
1809 
1810 	n = chk;
1811 	for (i = 0; i < CHUNK_SIZE-1; ++i) {
1812 		n[i].next = &(n[i+1]);
1813 	}
1814 	n[CHUNK_SIZE-1].next = (struct nb_reg_node *)&nb_free;
1815 	nb_free = chk;
1816 	return (0);
1817 }
1818 
1819 static int
1820 register_nb(struct ct_data *ct)
1821 {
1822 	struct nb_reg_node *node;
1823 
1824 	(void) mutex_lock(&nb_list_mutex);
1825 
1826 	if (LIST_ISEMPTY(nb_free) && (allocate_chunk() == -1)) {
1827 		(void) mutex_unlock(&nb_list_mutex);
1828 		errno = ENOMEM;
1829 		return (-1);
1830 	}
1831 
1832 	if (!exit_handler_set) {
1833 		(void) atexit(flush_registered_clients);
1834 		exit_handler_set = TRUE;
1835 	}
1836 	/* Get the first free node */
1837 	LIST_EXTRACT(nb_free, node);
1838 
1839 	node->ct = ct;
1840 
1841 	LIST_ADD(nb_first, node);
1842 	(void) mutex_unlock(&nb_list_mutex);
1843 
1844 	return (0);
1845 }
1846 
1847 static int
1848 unregister_nb(struct ct_data *ct)
1849 {
1850 	struct nb_reg_node *node;
1851 
1852 	(void) mutex_lock(&nb_list_mutex);
1853 	assert(!LIST_ISEMPTY(nb_first));
1854 
1855 	node = nb_first;
1856 	LIST_FOR_EACH(nb_first, node) {
1857 		if (node->next->ct == ct) {
1858 			/* Get the node to unregister. */
1859 			struct nb_reg_node *n = node->next;
1860 			node->next = n->next;
1861 
1862 			n->ct = NULL;
1863 			LIST_ADD(nb_free, n);
1864 			break;
1865 		}
1866 	}
1867 	(void) mutex_unlock(&nb_list_mutex);
1868 	return (0);
1869 }
1870