xref: /illumos-gate/usr/src/cmd/fm/modules/common/event-transport/etm.c (revision ac92251dc182f030faf6a5f76981d551b0b16072)
125cf1a30Sjl /*
225cf1a30Sjl  * CDDL HEADER START
325cf1a30Sjl  *
425cf1a30Sjl  * The contents of this file are subject to the terms of the
525cf1a30Sjl  * Common Development and Distribution License (the "License").
625cf1a30Sjl  * You may not use this file except in compliance with the License.
725cf1a30Sjl  *
825cf1a30Sjl  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
925cf1a30Sjl  * or http://www.opensolaris.org/os/licensing.
1025cf1a30Sjl  * See the License for the specific language governing permissions
1125cf1a30Sjl  * and limitations under the License.
1225cf1a30Sjl  *
1325cf1a30Sjl  * When distributing Covered Code, include this CDDL HEADER in each
1425cf1a30Sjl  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
1525cf1a30Sjl  * If applicable, add the following below this CDDL HEADER, with the
1625cf1a30Sjl  * fields enclosed by brackets "[]" replaced with your own identifying
1725cf1a30Sjl  * information: Portions Copyright [yyyy] [name of copyright owner]
1825cf1a30Sjl  *
1925cf1a30Sjl  * CDDL HEADER END
2025cf1a30Sjl  */
2125cf1a30Sjl 
2225cf1a30Sjl /*
2325cf1a30Sjl  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
2425cf1a30Sjl  * Use is subject to license terms.
2525cf1a30Sjl  */
2625cf1a30Sjl 
2725cf1a30Sjl #pragma ident	"%Z%%M%	%I%	%E% SMI"
2825cf1a30Sjl 
2925cf1a30Sjl /*
3025cf1a30Sjl  * FMA Event Transport Module
3125cf1a30Sjl  *
3225cf1a30Sjl  * Plugin for sending/receiving FMA events to/from a remote endoint.
3325cf1a30Sjl  */
3425cf1a30Sjl 
3525cf1a30Sjl #include <netinet/in.h>
3677a7fd96Sjrutt #include <errno.h>
3725cf1a30Sjl #include <sys/fm/protocol.h>
3825cf1a30Sjl #include <sys/sysmacros.h>
3925cf1a30Sjl #include <pthread.h>
4025cf1a30Sjl #include <strings.h>
4125cf1a30Sjl #include <ctype.h>
4225cf1a30Sjl #include <link.h>
4325cf1a30Sjl #include <libnvpair.h>
4425cf1a30Sjl #include "etm_xport_api.h"
4525cf1a30Sjl #include "etm_proto.h"
4625cf1a30Sjl 
4725cf1a30Sjl /*
4825cf1a30Sjl  * ETM declarations
4925cf1a30Sjl  */
5025cf1a30Sjl 
5125cf1a30Sjl typedef enum etm_connection_status {
5225cf1a30Sjl 	C_UNINITIALIZED = 0,
5325cf1a30Sjl 	C_OPEN,				/* Connection is open */
5425cf1a30Sjl 	C_CLOSED,			/* Connection is closed */
5525cf1a30Sjl 	C_LIMBO,			/* Bad value in header from peer */
5625cf1a30Sjl 	C_TIMED_OUT			/* Reconnection to peer timed out */
5725cf1a30Sjl } etm_connstat_t;
5825cf1a30Sjl 
5925cf1a30Sjl typedef enum etm_fmd_queue_status {
6025cf1a30Sjl 	Q_UNINITIALIZED = 100,
6125cf1a30Sjl 	Q_INIT_PENDING,			/* Queue initialization in progress */
6225cf1a30Sjl 	Q_OPEN,				/* Queue is open */
6325cf1a30Sjl 	Q_SUSPENDED			/* Queue is suspended */
6425cf1a30Sjl } etm_qstat_t;
6525cf1a30Sjl 
6625cf1a30Sjl /* Per endpoint data */
6725cf1a30Sjl typedef struct etm_endpoint_map {
6825cf1a30Sjl 	uint8_t epm_ver;		/* Protocol version being used */
6925cf1a30Sjl 	char *epm_ep_str;		/* Endpoint ID string */
7025cf1a30Sjl 	int epm_xprtflags;		/* FMD transport open flags */
7125cf1a30Sjl 	etm_xport_hdl_t epm_tlhdl;	/* Transport Layer instance handle */
7225cf1a30Sjl 	pthread_mutex_t epm_lock;	/* Protects remainder of struct */
7325cf1a30Sjl 	pthread_cond_t epm_tx_cv;	/* Cond var for send/transmit */
7425cf1a30Sjl 	int epm_txbusy;			/* Busy doing send/transmit */
7525cf1a30Sjl 	fmd_xprt_t *epm_xprthdl;	/* FMD transport handle */
7625cf1a30Sjl 	etm_qstat_t epm_qstat;		/* Status of fmd xprt queue */
7725cf1a30Sjl 	nvlist_t *epm_ep_nvl;		/* Endpoint ID nv_list */
7825cf1a30Sjl 	etm_xport_conn_t epm_oconn;	/* Connection for outgoing events */
7925cf1a30Sjl 	etm_connstat_t epm_cstat;	/* Status of connection */
8025cf1a30Sjl 	id_t epm_timer_id;		/* Timer id */
8125cf1a30Sjl 	int epm_timer_in_use;		/* Indicates if timer is in use */
8225cf1a30Sjl 	hrtime_t epm_reconn_end;	/* Reconnection end time */
8325cf1a30Sjl 	struct etm_endpoint_map *epm_next;
8425cf1a30Sjl } etm_epmap_t;
8525cf1a30Sjl 
8625cf1a30Sjl #define	ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1)
8725cf1a30Sjl #define	ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2)
8825cf1a30Sjl #define	ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3)
8925cf1a30Sjl #define	ETM_EP_INST_MAX 4		/* Max chars in endpt instance */
9025cf1a30Sjl #define	ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR
9125cf1a30Sjl #define	ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT)
9225cf1a30Sjl 
9325cf1a30Sjl #define	ALLOC_BUF(hdl, buf, size) \
9425cf1a30Sjl 	buf = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP);
9525cf1a30Sjl 
9625cf1a30Sjl #define	FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size));
9725cf1a30Sjl 
9825cf1a30Sjl #define	IS_CLIENT(mp)	(((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1)
9925cf1a30Sjl 
10025cf1a30Sjl #define	INCRSTAT(x)	{	(void) pthread_mutex_lock(&Etm_mod_lock);   \
10125cf1a30Sjl 				(x)++;					    \
10225cf1a30Sjl 				(void) pthread_mutex_unlock(&Etm_mod_lock); \
10325cf1a30Sjl 			}
10425cf1a30Sjl 
10525cf1a30Sjl #define	DECRSTAT(x)	{	(void) pthread_mutex_lock(&Etm_mod_lock);   \
10625cf1a30Sjl 				(x)--;					    \
10725cf1a30Sjl 				(void) pthread_mutex_unlock(&Etm_mod_lock); \
10825cf1a30Sjl 			}
10925cf1a30Sjl 
11025cf1a30Sjl #define	ADDSTAT(x, y)	{	(void) pthread_mutex_lock(&Etm_mod_lock);   \
11125cf1a30Sjl 				(x) += (y);				    \
11225cf1a30Sjl 				(void) pthread_mutex_unlock(&Etm_mod_lock); \
11325cf1a30Sjl 			}
11425cf1a30Sjl 
11525cf1a30Sjl /*
11625cf1a30Sjl  * Global variables
11725cf1a30Sjl  */
11825cf1a30Sjl static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER;
11925cf1a30Sjl 					/* Protects globals */
12025cf1a30Sjl static hrtime_t Reconn_interval;	/* Time between reconnection attempts */
12125cf1a30Sjl static hrtime_t Reconn_timeout;		/* Time allowed for reconnection */
12225cf1a30Sjl static hrtime_t Rw_timeout;		/* Time allowed for I/O operation  */
12325cf1a30Sjl static int Etm_dump = 0;		/* Enables hex dump for debug */
12425cf1a30Sjl static int Etm_exit = 0;		/* Flag for exit */
12525cf1a30Sjl static etm_epmap_t *Epmap_head = NULL;	/* Head of list of epmap structs */
12625cf1a30Sjl 
12725cf1a30Sjl /* Module statistics */
12825cf1a30Sjl static struct etm_stats {
12925cf1a30Sjl 	/* read counters */
13025cf1a30Sjl 	fmd_stat_t read_ack;
13125cf1a30Sjl 	fmd_stat_t read_bytes;
13225cf1a30Sjl 	fmd_stat_t read_msg;
13377a7fd96Sjrutt 	fmd_stat_t post_filter;
13425cf1a30Sjl 	/* write counters */
13525cf1a30Sjl 	fmd_stat_t write_ack;
13625cf1a30Sjl 	fmd_stat_t write_bytes;
13725cf1a30Sjl 	fmd_stat_t write_msg;
13877a7fd96Sjrutt 	fmd_stat_t send_filter;
13925cf1a30Sjl 	/* error counters */
14025cf1a30Sjl 	fmd_stat_t error_protocol;
14125cf1a30Sjl 	fmd_stat_t error_drop_read;
14225cf1a30Sjl 	fmd_stat_t error_read;
14325cf1a30Sjl 	fmd_stat_t error_read_badhdr;
14425cf1a30Sjl 	fmd_stat_t error_write;
14577a7fd96Sjrutt 	fmd_stat_t error_send_filter;
14677a7fd96Sjrutt 	fmd_stat_t error_post_filter;
14725cf1a30Sjl 	/* misc */
14825cf1a30Sjl 	fmd_stat_t peer_count;
14925cf1a30Sjl 
15025cf1a30Sjl } Etm_stats = {
15125cf1a30Sjl 	/* read counters */
15225cf1a30Sjl 	{ "read_ack", FMD_TYPE_UINT64, "ACKs read" },
15325cf1a30Sjl 	{ "read_bytes", FMD_TYPE_UINT64, "Bytes read" },
15425cf1a30Sjl 	{ "read_msg", FMD_TYPE_UINT64, "Messages read" },
15577a7fd96Sjrutt 	{ "post_filter", FMD_TYPE_UINT64, "Drops by post_filter" },
15625cf1a30Sjl 	/* write counters */
15725cf1a30Sjl 	{ "write_ack", FMD_TYPE_UINT64, "ACKs sent" },
15825cf1a30Sjl 	{ "write_bytes", FMD_TYPE_UINT64, "Bytes sent" },
15925cf1a30Sjl 	{ "write_msg", FMD_TYPE_UINT64, "Messages sent" },
16077a7fd96Sjrutt 	{ "send_filter", FMD_TYPE_UINT64, "Drops by send_filter" },
16125cf1a30Sjl 	/* ETM error counters */
16225cf1a30Sjl 	{ "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" },
16325cf1a30Sjl 	{ "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" },
16425cf1a30Sjl 	{ "error_read", FMD_TYPE_UINT64, "Read I/O errors" },
16525cf1a30Sjl 	{ "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" },
16625cf1a30Sjl 	{ "error_write", FMD_TYPE_UINT64, "Write I/O errors" },
16777a7fd96Sjrutt 	{ "error_send_filter", FMD_TYPE_UINT64, "Send filter errors" },
16877a7fd96Sjrutt 	{ "error_post_filter", FMD_TYPE_UINT64, "Post filter errors" },
16925cf1a30Sjl 	/* ETM Misc */
17025cf1a30Sjl 	{ "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" },
17125cf1a30Sjl };
17225cf1a30Sjl 
17325cf1a30Sjl /*
17425cf1a30Sjl  * ETM Private functions
17525cf1a30Sjl  */
17625cf1a30Sjl 
17725cf1a30Sjl /*
17825cf1a30Sjl  * Hex dump for debug.
17925cf1a30Sjl  */
18025cf1a30Sjl static void
18125cf1a30Sjl etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction)
18225cf1a30Sjl {
18325cf1a30Sjl 	int i, j, k;
18425cf1a30Sjl 	int16_t *c;
18525cf1a30Sjl 
18625cf1a30Sjl 	if (Etm_dump == 0)
18725cf1a30Sjl 		return;
18825cf1a30Sjl 
18925cf1a30Sjl 	j = buflen / 16;	/* Number of complete 8-column rows */
19025cf1a30Sjl 	k = buflen % 16;	/* Is there a last (non-8-column) row? */
19125cf1a30Sjl 
19225cf1a30Sjl 	if (direction)
19325cf1a30Sjl 		fmd_hdl_debug(hdl, "--- WRITE Message Dump ---");
19425cf1a30Sjl 	else
19525cf1a30Sjl 		fmd_hdl_debug(hdl, "---  READ Message Dump ---");
19625cf1a30Sjl 
19725cf1a30Sjl 	fmd_hdl_debug(hdl, "   Displaying %d bytes", buflen);
19825cf1a30Sjl 
19925cf1a30Sjl 	/* Dump the complete 8-column rows */
20025cf1a30Sjl 	for (i = 0; i < j; i++) {
20125cf1a30Sjl 		c = (int16_t *)buf + (i * 8);
20225cf1a30Sjl 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x %4x %4x", i,
20325cf1a30Sjl 		    *(c+0), *(c+1), *(c+2), *(c+3),
20425cf1a30Sjl 		    *(c+4), *(c+5), *(c+6), *(c+7));
20525cf1a30Sjl 	}
20625cf1a30Sjl 
20725cf1a30Sjl 	/* Dump the last (incomplete) row */
20825cf1a30Sjl 	c = (int16_t *)buf + (i * 8);
20925cf1a30Sjl 	switch (k) {
21025cf1a30Sjl 	case 4:
21125cf1a30Sjl 		fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1));
21225cf1a30Sjl 		break;
21325cf1a30Sjl 	case 8:
21425cf1a30Sjl 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1),
21525cf1a30Sjl 		    *(c+2), *(c+3));
21625cf1a30Sjl 		break;
21725cf1a30Sjl 	case 12:
21825cf1a30Sjl 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x", i, *(c+0),
21925cf1a30Sjl 		    *(c+1), *(c+2), *(c+3), *(c+4), *(c+5));
22025cf1a30Sjl 		break;
22125cf1a30Sjl 	}
22225cf1a30Sjl 
22325cf1a30Sjl 	fmd_hdl_debug(hdl, "---      End Dump      ---");
22425cf1a30Sjl }
22525cf1a30Sjl 
22625cf1a30Sjl /*
22725cf1a30Sjl  * Provide the length of a message based on the data in the given ETM header.
22825cf1a30Sjl  */
22925cf1a30Sjl static size_t
23025cf1a30Sjl etm_get_msglen(void *buf)
23125cf1a30Sjl {
23225cf1a30Sjl 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
23325cf1a30Sjl 
23425cf1a30Sjl 	return (ntohl(hp->hdr_msglen));
23525cf1a30Sjl }
23625cf1a30Sjl 
23725cf1a30Sjl /*
23825cf1a30Sjl  * Check the contents of the ETM header for errors.
23925cf1a30Sjl  * Return the header type (hdr_type).
24025cf1a30Sjl  */
24125cf1a30Sjl static int
24225cf1a30Sjl etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf)
24325cf1a30Sjl {
24425cf1a30Sjl 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
24525cf1a30Sjl 
24625cf1a30Sjl 	if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) {
247154b1f02Sjrutt 		fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s "
24825cf1a30Sjl 		    ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim);
24925cf1a30Sjl 		return (ETM_HDR_INVALID);
25025cf1a30Sjl 	}
25125cf1a30Sjl 
25225cf1a30Sjl 	if ((hp->hdr_type == ETM_HDR_C_HELLO) ||
25325cf1a30Sjl 	    (hp->hdr_type == ETM_HDR_S_HELLO)) {
25425cf1a30Sjl 		/* Until version is negotiated, other fields may be wrong */
25525cf1a30Sjl 		return (hp->hdr_type);
25625cf1a30Sjl 	}
25725cf1a30Sjl 
25825cf1a30Sjl 	if (hp->hdr_ver != mp->epm_ver) {
259154b1f02Sjrutt 		fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n",
26025cf1a30Sjl 		    mp->epm_ep_str, hp->hdr_ver);
26125cf1a30Sjl 		return (ETM_HDR_BADVERSION);
26225cf1a30Sjl 	}
26325cf1a30Sjl 
26425cf1a30Sjl 	if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) ||
26525cf1a30Sjl 	    (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) {
266154b1f02Sjrutt 		fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n",
26725cf1a30Sjl 		    mp->epm_ep_str, hp->hdr_type);
26825cf1a30Sjl 		return (ETM_HDR_BADTYPE);
26925cf1a30Sjl 	}
27025cf1a30Sjl 
27125cf1a30Sjl 	return (hp->hdr_type);
27225cf1a30Sjl }
27325cf1a30Sjl 
27425cf1a30Sjl /*
27525cf1a30Sjl  * Create an ETM header of a given type in the given buffer.
27625cf1a30Sjl  * Return length of header.
27725cf1a30Sjl  */
27825cf1a30Sjl static size_t
27925cf1a30Sjl etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen)
28025cf1a30Sjl {
28125cf1a30Sjl 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
28225cf1a30Sjl 
28325cf1a30Sjl 	bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN);
28425cf1a30Sjl 	hp->hdr_ver = ver;
28525cf1a30Sjl 	hp->hdr_type = type;
28625cf1a30Sjl 	hp->hdr_msglen = htonl(msglen);
28725cf1a30Sjl 
28825cf1a30Sjl 	return (ETM_HDRLEN);
28925cf1a30Sjl }
29025cf1a30Sjl 
29125cf1a30Sjl /*
29225cf1a30Sjl  * Convert message bytes to nvlist and post to fmd.
29325cf1a30Sjl  * Return zero for success, non-zero for failure.
29425cf1a30Sjl  *
29525cf1a30Sjl  * Note : nvl is free'd by fmd.
29625cf1a30Sjl  */
29725cf1a30Sjl static int
29825cf1a30Sjl etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen)
29925cf1a30Sjl {
30025cf1a30Sjl 	nvlist_t *nvl;
30125cf1a30Sjl 	int rv;
30225cf1a30Sjl 
30325cf1a30Sjl 	if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) {
304154b1f02Sjrutt 		fmd_hdl_error(hdl, "failed to unpack message");
30525cf1a30Sjl 		return (1);
30625cf1a30Sjl 	}
30725cf1a30Sjl 
30877a7fd96Sjrutt 	rv = etm_xport_post_filter(hdl, nvl, mp->epm_ep_str);
30977a7fd96Sjrutt 	if (rv == ETM_XPORT_FILTER_DROP) {
31077a7fd96Sjrutt 		fmd_hdl_debug(hdl, "post_filter dropped event");
31177a7fd96Sjrutt 		INCRSTAT(Etm_stats.post_filter.fmds_value.ui64);
31277a7fd96Sjrutt 		nvlist_free(nvl);
31377a7fd96Sjrutt 		return (0);
31477a7fd96Sjrutt 	} else if (rv == ETM_XPORT_FILTER_ERROR) {
31577a7fd96Sjrutt 		fmd_hdl_debug(hdl, "post_filter error : %s", strerror(errno));
31677a7fd96Sjrutt 		INCRSTAT(Etm_stats.error_post_filter.fmds_value.ui64);
31777a7fd96Sjrutt 		/* Still post event */
31877a7fd96Sjrutt 	}
31977a7fd96Sjrutt 
32025cf1a30Sjl 	(void) pthread_mutex_lock(&mp->epm_lock);
32125cf1a30Sjl 	(void) pthread_mutex_lock(&Etm_mod_lock);
32225cf1a30Sjl 	if (!Etm_exit) {
32325cf1a30Sjl 		(void) pthread_mutex_unlock(&Etm_mod_lock);
32425cf1a30Sjl 		if (mp->epm_qstat == Q_OPEN) {
32525cf1a30Sjl 			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
32625cf1a30Sjl 			rv = 0;
32725cf1a30Sjl 		} else if (mp->epm_qstat == Q_SUSPENDED) {
32825cf1a30Sjl 			fmd_xprt_resume(hdl, mp->epm_xprthdl);
32925cf1a30Sjl 			if (mp->epm_timer_in_use) {
33025cf1a30Sjl 				fmd_timer_remove(hdl, mp->epm_timer_id);
33125cf1a30Sjl 				mp->epm_timer_in_use = 0;
33225cf1a30Sjl 			}
33325cf1a30Sjl 			mp->epm_qstat = Q_OPEN;
33425cf1a30Sjl 			fmd_hdl_debug(hdl, "queue resumed for %s",
33525cf1a30Sjl 			    mp->epm_ep_str);
33625cf1a30Sjl 			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
33725cf1a30Sjl 			rv = 0;
33825cf1a30Sjl 		} else {
33925cf1a30Sjl 			fmd_hdl_debug(hdl, "unable to post message, qstat = %d",
34025cf1a30Sjl 			    mp->epm_qstat);
341154b1f02Sjrutt 			nvlist_free(nvl);
342154b1f02Sjrutt 			/* Remote peer will attempt to resend event */
34325cf1a30Sjl 			rv = 2;
34425cf1a30Sjl 		}
34525cf1a30Sjl 	} else {
34625cf1a30Sjl 		(void) pthread_mutex_unlock(&Etm_mod_lock);
34725cf1a30Sjl 		fmd_hdl_debug(hdl, "unable to post message, module exiting");
348154b1f02Sjrutt 		nvlist_free(nvl);
349154b1f02Sjrutt 		/* Remote peer will attempt to resend event */
35025cf1a30Sjl 		rv = 3;
35125cf1a30Sjl 	}
35225cf1a30Sjl 
35325cf1a30Sjl 	(void) pthread_mutex_unlock(&mp->epm_lock);
35425cf1a30Sjl 
35525cf1a30Sjl 	return (rv);
35625cf1a30Sjl }
35725cf1a30Sjl 
35825cf1a30Sjl /*
35925cf1a30Sjl  * Handle the startup handshake to the server.  The client always initiates
36025cf1a30Sjl  * the startup handshake.  In the following sequence, we are the client and
36125cf1a30Sjl  * the remote endpoint is the server.
36225cf1a30Sjl  *
36325cf1a30Sjl  *	Client sends C_HELLO and transitions to Q_INIT_PENDING state.
36425cf1a30Sjl  *	Server sends S_HELLO and transitions to Q_INIT_PENDING state.
36525cf1a30Sjl  *	Client sends ACK and transitions to Q_OPEN state.
36625cf1a30Sjl  *	Server receives ACK and transitions to Q_OPEN state.
36725cf1a30Sjl  *
36825cf1a30Sjl  * Return 0 for success, nonzero for failure.
36925cf1a30Sjl  */
37025cf1a30Sjl static int
37125cf1a30Sjl etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp)
37225cf1a30Sjl {
37325cf1a30Sjl 	etm_proto_hdr_t *hp;
37425cf1a30Sjl 	size_t hdrlen = ETM_HDRLEN;
37525cf1a30Sjl 	int hdrstat;
37625cf1a30Sjl 	char hbuf[ETM_HDRLEN];
37725cf1a30Sjl 
37825cf1a30Sjl 	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
37925cf1a30Sjl 		return (1);
38025cf1a30Sjl 
38125cf1a30Sjl 	mp->epm_cstat = C_OPEN;
38225cf1a30Sjl 
38325cf1a30Sjl 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0);
38425cf1a30Sjl 
38525cf1a30Sjl 	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
38625cf1a30Sjl 	    hdrlen)) != hdrlen) {
38725cf1a30Sjl 		fmd_hdl_error(hdl, "Failed to write C_HELLO to %s",
38825cf1a30Sjl 		    mp->epm_ep_str);
38925cf1a30Sjl 		return (2);
39025cf1a30Sjl 	}
39125cf1a30Sjl 
39225cf1a30Sjl 	mp->epm_qstat = Q_INIT_PENDING;
39325cf1a30Sjl 
39425cf1a30Sjl 	if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf,
39525cf1a30Sjl 	    hdrlen)) != hdrlen) {
39625cf1a30Sjl 		fmd_hdl_error(hdl, "Failed to read S_HELLO from %s",
39725cf1a30Sjl 		    mp->epm_ep_str);
39825cf1a30Sjl 		return (3);
39925cf1a30Sjl 	}
40025cf1a30Sjl 
40125cf1a30Sjl 	hdrstat = etm_check_hdr(hdl, mp, hbuf);
40225cf1a30Sjl 
40325cf1a30Sjl 	if (hdrstat != ETM_HDR_S_HELLO) {
40425cf1a30Sjl 		fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO "
40525cf1a30Sjl 		    "from %s", mp->epm_ep_str);
40625cf1a30Sjl 		return (4);
40725cf1a30Sjl 	}
40825cf1a30Sjl 
40925cf1a30Sjl 	/*
41025cf1a30Sjl 	 * Get version from the server.
41125cf1a30Sjl 	 * Currently, only one version is supported.
41225cf1a30Sjl 	 */
41325cf1a30Sjl 	hp = (etm_proto_hdr_t *)(void *)hbuf;
41425cf1a30Sjl 	if (hp->hdr_ver != ETM_PROTO_V1) {
41525cf1a30Sjl 		fmd_hdl_error(hdl, "Unable to use same version as %s : %d",
41625cf1a30Sjl 		    mp->epm_ep_str, hp->hdr_ver);
41725cf1a30Sjl 		return (5);
41825cf1a30Sjl 	}
41925cf1a30Sjl 	mp->epm_ver = hp->hdr_ver;
42025cf1a30Sjl 
42125cf1a30Sjl 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
42225cf1a30Sjl 
42325cf1a30Sjl 	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
42425cf1a30Sjl 	    hdrlen)) != hdrlen) {
42525cf1a30Sjl 		fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s",
42625cf1a30Sjl 		    mp->epm_ep_str);
42725cf1a30Sjl 		return (6);
42825cf1a30Sjl 	}
42925cf1a30Sjl 
43025cf1a30Sjl 	/*
43125cf1a30Sjl 	 * Call fmd_xprt_open and fmd_xprt_setspecific with
43225cf1a30Sjl 	 * Etm_mod_lock held to avoid race with etm_send thread.
43325cf1a30Sjl 	 */
43425cf1a30Sjl 	(void) pthread_mutex_lock(&Etm_mod_lock);
43525cf1a30Sjl 	if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags,
43625cf1a30Sjl 	    mp->epm_ep_nvl, NULL)) == NULL) {
43725cf1a30Sjl 		fmd_hdl_abort(hdl, "Failed to init xprthdl for %s",
43825cf1a30Sjl 		    mp->epm_ep_str);
43925cf1a30Sjl 	}
44025cf1a30Sjl 	fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
44125cf1a30Sjl 	(void) pthread_mutex_unlock(&Etm_mod_lock);
44225cf1a30Sjl 
44325cf1a30Sjl 	mp->epm_qstat = Q_OPEN;
44425cf1a30Sjl 	fmd_hdl_debug(hdl, "queue open for %s",  mp->epm_ep_str);
44525cf1a30Sjl 
44625cf1a30Sjl 	return (0);
44725cf1a30Sjl }
44825cf1a30Sjl 
449*ac92251dSjrutt /*
450*ac92251dSjrutt  * Open a connection to the peer, send a SHUTDOWN message,
451*ac92251dSjrutt  * and close the connection.
452*ac92251dSjrutt  */
453*ac92251dSjrutt static void
454*ac92251dSjrutt etm_send_shutdown(fmd_hdl_t *hdl, etm_epmap_t *mp)
455*ac92251dSjrutt {
456*ac92251dSjrutt 	size_t hdrlen = ETM_HDRLEN;
457*ac92251dSjrutt 	char hbuf[ETM_HDRLEN];
458*ac92251dSjrutt 
459*ac92251dSjrutt 	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
460*ac92251dSjrutt 		return;
461*ac92251dSjrutt 
462*ac92251dSjrutt 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_SHUTDOWN, 0);
463*ac92251dSjrutt 
464*ac92251dSjrutt 	(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, hdrlen);
465*ac92251dSjrutt 
466*ac92251dSjrutt 	(void) etm_xport_close(hdl, mp->epm_oconn);
467*ac92251dSjrutt 	mp->epm_oconn = NULL;
468*ac92251dSjrutt }
469*ac92251dSjrutt 
47025cf1a30Sjl /*
47125cf1a30Sjl  * Alloc a nvlist and add a string for the endpoint.
47225cf1a30Sjl  * Return zero for success, non-zero for failure.
47325cf1a30Sjl  */
47425cf1a30Sjl static int
47525cf1a30Sjl etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
47625cf1a30Sjl {
47725cf1a30Sjl 	/*
47825cf1a30Sjl 	 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation
47925cf1a30Sjl 	 * in fmd when this nvlist_t is free'd.
48025cf1a30Sjl 	 */
48125cf1a30Sjl 	(void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0);
48225cf1a30Sjl 
48325cf1a30Sjl 	if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) {
484154b1f02Sjrutt 		fmd_hdl_error(hdl, "failed to add domain-id string to nvlist "
48525cf1a30Sjl 		    "for %s", mp->epm_ep_str);
48625cf1a30Sjl 		nvlist_free(mp->epm_ep_nvl);
48725cf1a30Sjl 		return (1);
48825cf1a30Sjl 	}
48925cf1a30Sjl 
49025cf1a30Sjl 	return (0);
49125cf1a30Sjl }
49225cf1a30Sjl 
49325cf1a30Sjl /*
49425cf1a30Sjl  * Free the nvlist for the endpoint_id string.
49525cf1a30Sjl  */
49625cf1a30Sjl /*ARGSUSED*/
49725cf1a30Sjl static void
49825cf1a30Sjl etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
49925cf1a30Sjl {
50025cf1a30Sjl 	nvlist_free(mp->epm_ep_nvl);
50125cf1a30Sjl }
50225cf1a30Sjl 
50325cf1a30Sjl /*
50425cf1a30Sjl  * Check for a duplicate endpoint/peer string.
50525cf1a30Sjl  */
50625cf1a30Sjl /*ARGSUSED*/
50725cf1a30Sjl static int
50825cf1a30Sjl etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname)
50925cf1a30Sjl {
51025cf1a30Sjl 	etm_epmap_t *mp;
51125cf1a30Sjl 
51225cf1a30Sjl 	for (mp = Epmap_head; mp != NULL; mp = mp->epm_next)
51325cf1a30Sjl 		if (strcmp(epname, mp->epm_ep_str) == 0)
51425cf1a30Sjl 			return (1);
51525cf1a30Sjl 
51625cf1a30Sjl 	return (0);
51725cf1a30Sjl }
51825cf1a30Sjl 
51925cf1a30Sjl /*
52025cf1a30Sjl  * Attempt to re-open a connection with the remote endpoint.
52125cf1a30Sjl  */
52225cf1a30Sjl static void
52325cf1a30Sjl etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
52425cf1a30Sjl {
52525cf1a30Sjl 	if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) {
52625cf1a30Sjl 		if (gethrtime() < mp->epm_reconn_end) {
52725cf1a30Sjl 			if ((mp->epm_oconn = etm_xport_open(hdl,
52825cf1a30Sjl 			    mp->epm_tlhdl)) == NULL) {
52925cf1a30Sjl 				fmd_hdl_debug(hdl, "reconnect failed for %s",
53025cf1a30Sjl 				    mp->epm_ep_str);
53125cf1a30Sjl 				mp->epm_timer_id = fmd_timer_install(hdl, mp,
53225cf1a30Sjl 				    NULL, Reconn_interval);
53325cf1a30Sjl 				mp->epm_timer_in_use = 1;
53425cf1a30Sjl 			} else {
53525cf1a30Sjl 				fmd_hdl_debug(hdl, "reconnect success for %s",
53625cf1a30Sjl 				    mp->epm_ep_str);
53725cf1a30Sjl 				mp->epm_reconn_end = 0;
53825cf1a30Sjl 				mp->epm_cstat = C_OPEN;
53925cf1a30Sjl 			}
54025cf1a30Sjl 		} else {
54125cf1a30Sjl 			fmd_hdl_error(hdl, "Reconnect timed out for %s\n",
54225cf1a30Sjl 			    mp->epm_ep_str);
54325cf1a30Sjl 			mp->epm_reconn_end = 0;
54425cf1a30Sjl 			mp->epm_cstat = C_TIMED_OUT;
54525cf1a30Sjl 		}
54625cf1a30Sjl 	}
54725cf1a30Sjl 
54825cf1a30Sjl 	if (mp->epm_cstat == C_OPEN) {
54925cf1a30Sjl 		fmd_xprt_resume(hdl, mp->epm_xprthdl);
55025cf1a30Sjl 		mp->epm_qstat = Q_OPEN;
55125cf1a30Sjl 		fmd_hdl_debug(hdl, "queue resumed for %s",  mp->epm_ep_str);
55225cf1a30Sjl 	}
55325cf1a30Sjl }
55425cf1a30Sjl 
55525cf1a30Sjl /*
55625cf1a30Sjl  * Suspend a given connection and setup for reconnection retries.
557154b1f02Sjrutt  * Assume caller holds lock on epm_lock.
55825cf1a30Sjl  */
55925cf1a30Sjl static void
56025cf1a30Sjl etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
56125cf1a30Sjl {
56225cf1a30Sjl 	(void) pthread_mutex_lock(&Etm_mod_lock);
56325cf1a30Sjl 	if (Etm_exit) {
56425cf1a30Sjl 		(void) pthread_mutex_unlock(&Etm_mod_lock);
56525cf1a30Sjl 		return;
56625cf1a30Sjl 	}
56725cf1a30Sjl 	(void) pthread_mutex_unlock(&Etm_mod_lock);
56825cf1a30Sjl 
56925cf1a30Sjl 	if (mp->epm_oconn != NULL) {
57025cf1a30Sjl 		(void) etm_xport_close(hdl, mp->epm_oconn);
57125cf1a30Sjl 		mp->epm_oconn = NULL;
57225cf1a30Sjl 	}
57325cf1a30Sjl 
57425cf1a30Sjl 	mp->epm_reconn_end = gethrtime() + Reconn_timeout;
57525cf1a30Sjl 	mp->epm_cstat = C_UNINITIALIZED;
57625cf1a30Sjl 
57725cf1a30Sjl 	if (mp->epm_xprthdl != NULL) {
57825cf1a30Sjl 		fmd_xprt_suspend(hdl, mp->epm_xprthdl);
57925cf1a30Sjl 		mp->epm_qstat = Q_SUSPENDED;
58025cf1a30Sjl 		fmd_hdl_debug(hdl, "queue suspended for %s",  mp->epm_ep_str);
58125cf1a30Sjl 
58225cf1a30Sjl 		if (mp->epm_timer_in_use == 0) {
58325cf1a30Sjl 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
58425cf1a30Sjl 			    Reconn_interval);
58525cf1a30Sjl 			mp->epm_timer_in_use = 1;
58625cf1a30Sjl 		}
58725cf1a30Sjl 	}
58825cf1a30Sjl }
58925cf1a30Sjl 
59025cf1a30Sjl /*
59125cf1a30Sjl  * Reinitialize the connection. The old fmd_xprt_t handle must be
59225cf1a30Sjl  * removed/closed first.
59325cf1a30Sjl  * Assume caller holds lock on epm_lock.
59425cf1a30Sjl  */
59525cf1a30Sjl static void
59625cf1a30Sjl etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp)
59725cf1a30Sjl {
59825cf1a30Sjl 	/*
59925cf1a30Sjl 	 * To avoid a deadlock, wait for etm_send to finish before
60025cf1a30Sjl 	 * calling fmd_xprt_close()
60125cf1a30Sjl 	 */
60225cf1a30Sjl 	while (mp->epm_txbusy)
60325cf1a30Sjl 		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
60425cf1a30Sjl 
60525cf1a30Sjl 	if (mp->epm_xprthdl != NULL) {
60625cf1a30Sjl 		fmd_xprt_close(hdl, mp->epm_xprthdl);
607154b1f02Sjrutt 		fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str);
60825cf1a30Sjl 		mp->epm_xprthdl = NULL;
60925cf1a30Sjl 		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
61025cf1a30Sjl 		mp->epm_ep_nvl = NULL;
61125cf1a30Sjl 	}
61225cf1a30Sjl 
61325cf1a30Sjl 	if (mp->epm_timer_in_use) {
61425cf1a30Sjl 		fmd_timer_remove(hdl, mp->epm_timer_id);
61525cf1a30Sjl 		mp->epm_timer_in_use = 0;
61625cf1a30Sjl 	}
61725cf1a30Sjl 
61825cf1a30Sjl 	if (mp->epm_oconn != NULL) {
61925cf1a30Sjl 		(void) etm_xport_close(hdl, mp->epm_oconn);
62025cf1a30Sjl 		mp->epm_oconn = NULL;
62125cf1a30Sjl 	}
62225cf1a30Sjl 
62325cf1a30Sjl 	mp->epm_cstat = C_UNINITIALIZED;
62425cf1a30Sjl 	mp->epm_qstat = Q_UNINITIALIZED;
62525cf1a30Sjl }
62625cf1a30Sjl 
62725cf1a30Sjl /*
62825cf1a30Sjl  * Receive data from ETM transport layer.
62925cf1a30Sjl  * Note : This is not the fmdo_recv entry point.
63025cf1a30Sjl  *
63125cf1a30Sjl  */
63225cf1a30Sjl static int
63325cf1a30Sjl etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp)
63425cf1a30Sjl {
63525cf1a30Sjl 	size_t buflen, hdrlen;
63625cf1a30Sjl 	void *buf;
63725cf1a30Sjl 	char hbuf[ETM_HDRLEN];
63825cf1a30Sjl 	int hdrstat, rv;
63925cf1a30Sjl 
64025cf1a30Sjl 	hdrlen = ETM_HDRLEN;
64125cf1a30Sjl 
64225cf1a30Sjl 	if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) {
64325cf1a30Sjl 		fmd_hdl_debug(hdl, "failed to read header from %s",
64425cf1a30Sjl 		    mp->epm_ep_str);
64525cf1a30Sjl 		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
64625cf1a30Sjl 		return (EIO);
64725cf1a30Sjl 	}
64825cf1a30Sjl 
64925cf1a30Sjl 	hdrstat = etm_check_hdr(hdl, mp, hbuf);
65025cf1a30Sjl 
65125cf1a30Sjl 	switch (hdrstat) {
65225cf1a30Sjl 	case ETM_HDR_INVALID:
65325cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
65425cf1a30Sjl 		if (mp->epm_cstat == C_OPEN)
65525cf1a30Sjl 			mp->epm_cstat = C_CLOSED;
65625cf1a30Sjl 		(void) pthread_mutex_unlock(&mp->epm_lock);
65725cf1a30Sjl 
65825cf1a30Sjl 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
65925cf1a30Sjl 		rv = ECANCELED;
66025cf1a30Sjl 		break;
66125cf1a30Sjl 
66225cf1a30Sjl 	case ETM_HDR_BADTYPE:
66325cf1a30Sjl 	case ETM_HDR_BADVERSION:
66425cf1a30Sjl 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0);
66525cf1a30Sjl 
66625cf1a30Sjl 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
66725cf1a30Sjl 		    hdrlen)) != hdrlen) {
66825cf1a30Sjl 			fmd_hdl_debug(hdl, "failed to write NAK to %s",
66925cf1a30Sjl 			    mp->epm_ep_str);
67025cf1a30Sjl 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
67125cf1a30Sjl 			return (EIO);
67225cf1a30Sjl 		}
67325cf1a30Sjl 
67425cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
67525cf1a30Sjl 		mp->epm_cstat = C_LIMBO;
67625cf1a30Sjl 		(void) pthread_mutex_unlock(&mp->epm_lock);
67725cf1a30Sjl 
67825cf1a30Sjl 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
67925cf1a30Sjl 		rv = ENOTSUP;
68025cf1a30Sjl 		break;
68125cf1a30Sjl 
68225cf1a30Sjl 	case ETM_HDR_C_HELLO:
68325cf1a30Sjl 		/* Client is initiating a startup handshake */
68425cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
68525cf1a30Sjl 		etm_reinit(hdl, mp);
68625cf1a30Sjl 		mp->epm_qstat = Q_INIT_PENDING;
68725cf1a30Sjl 		(void) pthread_mutex_unlock(&mp->epm_lock);
68825cf1a30Sjl 
68925cf1a30Sjl 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0);
69025cf1a30Sjl 
69125cf1a30Sjl 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
69225cf1a30Sjl 		    hdrlen)) != hdrlen) {
69325cf1a30Sjl 			fmd_hdl_debug(hdl, "failed to write S_HELLO to %s",
69425cf1a30Sjl 			    mp->epm_ep_str);
69525cf1a30Sjl 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
69625cf1a30Sjl 			return (EIO);
69725cf1a30Sjl 		}
69825cf1a30Sjl 
69925cf1a30Sjl 		rv = 0;
70025cf1a30Sjl 		break;
70125cf1a30Sjl 
70225cf1a30Sjl 	case ETM_HDR_ACK:
70325cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
70425cf1a30Sjl 		if (mp->epm_qstat == Q_INIT_PENDING) {
70525cf1a30Sjl 			/* This is client's ACK from startup handshake */
70625cf1a30Sjl 			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
70725cf1a30Sjl 			if (mp->epm_ep_nvl == NULL)
70825cf1a30Sjl 				(void) etm_get_ep_nvl(hdl, mp);
70925cf1a30Sjl 
71025cf1a30Sjl 			/*
71125cf1a30Sjl 			 * Call fmd_xprt_open and fmd_xprt_setspecific with
71225cf1a30Sjl 			 * Etm_mod_lock held to avoid race with etm_send thread.
71325cf1a30Sjl 			 */
71425cf1a30Sjl 			(void) pthread_mutex_lock(&Etm_mod_lock);
71525cf1a30Sjl 			if ((mp->epm_xprthdl = fmd_xprt_open(hdl,
71625cf1a30Sjl 			    mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) {
71725cf1a30Sjl 				fmd_hdl_abort(hdl, "Failed to init xprthdl "
71825cf1a30Sjl 				    "for %s", mp->epm_ep_str);
71925cf1a30Sjl 			}
72025cf1a30Sjl 			fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
72125cf1a30Sjl 			(void) pthread_mutex_unlock(&Etm_mod_lock);
72225cf1a30Sjl 
72325cf1a30Sjl 			mp->epm_qstat = Q_OPEN;
72425cf1a30Sjl 			(void) pthread_mutex_unlock(&mp->epm_lock);
72525cf1a30Sjl 			fmd_hdl_debug(hdl, "queue open for %s",
72625cf1a30Sjl 			    mp->epm_ep_str);
72725cf1a30Sjl 		} else {
72825cf1a30Sjl 			(void) pthread_mutex_unlock(&mp->epm_lock);
72925cf1a30Sjl 			fmd_hdl_debug(hdl, "protocol error, not expecting ACK "
73025cf1a30Sjl 			    "from %s\n", mp->epm_ep_str);
73125cf1a30Sjl 			INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
73225cf1a30Sjl 		}
73325cf1a30Sjl 
73425cf1a30Sjl 		rv = 0;
73525cf1a30Sjl 		break;
73625cf1a30Sjl 
73725cf1a30Sjl 	case ETM_HDR_SHUTDOWN:
73825cf1a30Sjl 		fmd_hdl_debug(hdl, "received shutdown from %s",
73925cf1a30Sjl 		    mp->epm_ep_str);
74025cf1a30Sjl 
74125cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
74225cf1a30Sjl 
74325cf1a30Sjl 		etm_reinit(hdl, mp);
74425cf1a30Sjl 
74525cf1a30Sjl 		if (IS_CLIENT(mp)) {
74625cf1a30Sjl 			/*
74725cf1a30Sjl 			 * A server shutdown is considered to be temporary.
74825cf1a30Sjl 			 * Prepare for reconnection.
74925cf1a30Sjl 			 */
75025cf1a30Sjl 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
75125cf1a30Sjl 			    Reconn_interval);
75225cf1a30Sjl 
75325cf1a30Sjl 			mp->epm_timer_in_use = 1;
75425cf1a30Sjl 		}
75525cf1a30Sjl 
75625cf1a30Sjl 		(void) pthread_mutex_unlock(&mp->epm_lock);
75725cf1a30Sjl 
75825cf1a30Sjl 		rv = ECANCELED;
75925cf1a30Sjl 		break;
76025cf1a30Sjl 
76125cf1a30Sjl 	case ETM_HDR_MSG:
76225cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
76325cf1a30Sjl 		if (mp->epm_qstat == Q_UNINITIALIZED) {
76425cf1a30Sjl 			/* Peer (client) is unaware that we've restarted */
76525cf1a30Sjl 			(void) pthread_mutex_unlock(&mp->epm_lock);
76625cf1a30Sjl 			hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
76725cf1a30Sjl 			    ETM_HDR_S_RESTART, 0);
76825cf1a30Sjl 
76925cf1a30Sjl 			if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
77025cf1a30Sjl 			    hdrlen)) != hdrlen) {
77125cf1a30Sjl 				fmd_hdl_debug(hdl, "failed to write S_RESTART "
77225cf1a30Sjl 				    "to %s", mp->epm_ep_str);
77325cf1a30Sjl 				INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
77425cf1a30Sjl 				return (EIO);
77525cf1a30Sjl 			}
77625cf1a30Sjl 
77725cf1a30Sjl 			return (ECANCELED);
77825cf1a30Sjl 		}
77925cf1a30Sjl 		(void) pthread_mutex_unlock(&mp->epm_lock);
78025cf1a30Sjl 
78125cf1a30Sjl 		buflen = etm_get_msglen(hbuf);
78225cf1a30Sjl 		ALLOC_BUF(hdl, buf, buflen);
78325cf1a30Sjl 
78425cf1a30Sjl 		if (etm_xport_read(hdl, conn, Rw_timeout, buf,
78525cf1a30Sjl 		    buflen) != buflen) {
78625cf1a30Sjl 			fmd_hdl_debug(hdl, "failed to read message from %s",
78725cf1a30Sjl 			    mp->epm_ep_str);
78825cf1a30Sjl 			FREE_BUF(hdl, buf, buflen);
78925cf1a30Sjl 			INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
79025cf1a30Sjl 			return (EIO);
79125cf1a30Sjl 		}
79225cf1a30Sjl 
79325cf1a30Sjl 		INCRSTAT(Etm_stats.read_msg.fmds_value.ui64);
79425cf1a30Sjl 		ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen);
79525cf1a30Sjl 
79625cf1a30Sjl 		etm_hex_dump(hdl, buf, buflen, 0);
79725cf1a30Sjl 
79825cf1a30Sjl 		if (etm_post_msg(hdl, mp, buf, buflen)) {
79925cf1a30Sjl 			INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64);
80025cf1a30Sjl 			FREE_BUF(hdl, buf, buflen);
80125cf1a30Sjl 			return (EIO);
80225cf1a30Sjl 		}
80325cf1a30Sjl 
80425cf1a30Sjl 		FREE_BUF(hdl, buf, buflen);
80525cf1a30Sjl 
80625cf1a30Sjl 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
80725cf1a30Sjl 
80825cf1a30Sjl 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
80925cf1a30Sjl 		    hdrlen)) != hdrlen) {
81025cf1a30Sjl 			fmd_hdl_debug(hdl, "failed to write ACK to %s",
81125cf1a30Sjl 			    mp->epm_ep_str);
81225cf1a30Sjl 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
81325cf1a30Sjl 			return (EIO);
81425cf1a30Sjl 		}
81525cf1a30Sjl 
81625cf1a30Sjl 		INCRSTAT(Etm_stats.write_ack.fmds_value.ui64);
81725cf1a30Sjl 
81825cf1a30Sjl 		/*
81925cf1a30Sjl 		 * If we got this far and the current state of the
82025cf1a30Sjl 		 * outbound/sending connection is TIMED_OUT or
82125cf1a30Sjl 		 * LIMBO, then we should reinitialize it.
82225cf1a30Sjl 		 */
82325cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
82425cf1a30Sjl 		if (mp->epm_cstat == C_TIMED_OUT ||
82525cf1a30Sjl 		    mp->epm_cstat == C_LIMBO) {
82625cf1a30Sjl 			if (mp->epm_oconn != NULL) {
82725cf1a30Sjl 				(void) etm_xport_close(hdl, mp->epm_oconn);
82825cf1a30Sjl 				mp->epm_oconn = NULL;
82925cf1a30Sjl 			}
83025cf1a30Sjl 			mp->epm_cstat = C_UNINITIALIZED;
83125cf1a30Sjl 			fmd_xprt_resume(hdl, mp->epm_xprthdl);
83225cf1a30Sjl 			if (mp->epm_timer_in_use) {
83325cf1a30Sjl 				fmd_timer_remove(hdl, mp->epm_timer_id);
83425cf1a30Sjl 				mp->epm_timer_in_use = 0;
83525cf1a30Sjl 			}
83625cf1a30Sjl 			mp->epm_qstat = Q_OPEN;
83725cf1a30Sjl 			fmd_hdl_debug(hdl, "queue resumed for %s",
83825cf1a30Sjl 			    mp->epm_ep_str);
83925cf1a30Sjl 		}
84025cf1a30Sjl 		(void) pthread_mutex_unlock(&mp->epm_lock);
84125cf1a30Sjl 
84225cf1a30Sjl 		rv = 0;
84325cf1a30Sjl 		break;
84425cf1a30Sjl 
84525cf1a30Sjl 	default:
84625cf1a30Sjl 		fmd_hdl_debug(hdl, "protocol error, unexpected header "
84725cf1a30Sjl 		    "from %s : %d", mp->epm_ep_str, hdrstat);
84825cf1a30Sjl 		INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
84925cf1a30Sjl 		rv = 0;
85025cf1a30Sjl 	}
85125cf1a30Sjl 
85225cf1a30Sjl 	return (rv);
85325cf1a30Sjl }
85425cf1a30Sjl 
85525cf1a30Sjl /*
85625cf1a30Sjl  * ETM transport layer callback function.
85725cf1a30Sjl  * The transport layer calls this function to :
85825cf1a30Sjl  *	(a) pass an incoming message (flag == ETM_CBFLAG_RECV)
85925cf1a30Sjl  *	(b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT)
86025cf1a30Sjl  */
86125cf1a30Sjl static int
86225cf1a30Sjl etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
86325cf1a30Sjl     void *arg)
86425cf1a30Sjl {
86525cf1a30Sjl 	etm_epmap_t *mp = (etm_epmap_t *)arg;
86625cf1a30Sjl 	int rv = 0;
86725cf1a30Sjl 
86825cf1a30Sjl 	(void) pthread_mutex_lock(&Etm_mod_lock);
86925cf1a30Sjl 	if (Etm_exit) {
87025cf1a30Sjl 		(void) pthread_mutex_unlock(&Etm_mod_lock);
87125cf1a30Sjl 		return (ECANCELED);
87225cf1a30Sjl 	}
87325cf1a30Sjl 	(void) pthread_mutex_unlock(&Etm_mod_lock);
87425cf1a30Sjl 
87525cf1a30Sjl 	switch (flag) {
87625cf1a30Sjl 	case ETM_CBFLAG_RECV:
87725cf1a30Sjl 		rv = etm_recv(hdl, conn, mp);
87825cf1a30Sjl 		break;
87925cf1a30Sjl 	case ETM_CBFLAG_REINIT:
88025cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
88125cf1a30Sjl 		etm_reinit(hdl, mp);
882*ac92251dSjrutt 		etm_send_shutdown(hdl, mp);
88325cf1a30Sjl 		(void) pthread_mutex_unlock(&mp->epm_lock);
88425cf1a30Sjl 		/*
88525cf1a30Sjl 		 * Return ECANCELED so the transport layer will close the
88625cf1a30Sjl 		 * server connection.  The transport layer is responsible for
88725cf1a30Sjl 		 * reestablishing this connection (should a connection request
88825cf1a30Sjl 		 * arrive from the peer).
88925cf1a30Sjl 		 */
89025cf1a30Sjl 		rv = ECANCELED;
89125cf1a30Sjl 		break;
89225cf1a30Sjl 	default:
89325cf1a30Sjl 		fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag);
89425cf1a30Sjl 		rv = ENOTSUP;
89525cf1a30Sjl 	}
89625cf1a30Sjl 
89725cf1a30Sjl 	return (rv);
89825cf1a30Sjl }
89925cf1a30Sjl 
90025cf1a30Sjl /*
90125cf1a30Sjl  * Allocate and initialize an etm_epmap_t struct for the given endpoint
90225cf1a30Sjl  * name string.
90325cf1a30Sjl  */
90425cf1a30Sjl static void
90525cf1a30Sjl etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags)
90625cf1a30Sjl {
90725cf1a30Sjl 	etm_epmap_t *newmap;
90825cf1a30Sjl 
90925cf1a30Sjl 	if (etm_check_dup_ep_str(hdl, epname)) {
91025cf1a30Sjl 		fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname);
91125cf1a30Sjl 		return;
91225cf1a30Sjl 	}
91325cf1a30Sjl 
91425cf1a30Sjl 	newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP);
91525cf1a30Sjl 	newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP);
91625cf1a30Sjl 	newmap->epm_xprtflags = flags;
91725cf1a30Sjl 	newmap->epm_cstat = C_UNINITIALIZED;
91825cf1a30Sjl 	newmap->epm_qstat = Q_UNINITIALIZED;
91925cf1a30Sjl 	newmap->epm_ver = ETM_PROTO_V1;	/* Currently support one proto ver */
92025cf1a30Sjl 	newmap->epm_txbusy = 0;
92125cf1a30Sjl 
92225cf1a30Sjl 	(void) pthread_mutex_init(&newmap->epm_lock, NULL);
92325cf1a30Sjl 	(void) pthread_cond_init(&newmap->epm_tx_cv, NULL);
92425cf1a30Sjl 
92525cf1a30Sjl 	if (etm_get_ep_nvl(hdl, newmap)) {
92625cf1a30Sjl 		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
92725cf1a30Sjl 		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
92825cf1a30Sjl 		return;
92925cf1a30Sjl 	}
93025cf1a30Sjl 
931*ac92251dSjrutt 	(void) pthread_mutex_lock(&newmap->epm_lock);
932*ac92251dSjrutt 
93325cf1a30Sjl 	if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str,
93425cf1a30Sjl 	    etm_cb_func, newmap)) == NULL) {
93525cf1a30Sjl 		fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n",
93625cf1a30Sjl 		    newmap->epm_ep_str);
93725cf1a30Sjl 		etm_free_ep_nvl(hdl, newmap);
938*ac92251dSjrutt 		(void) pthread_mutex_unlock(&newmap->epm_lock);
939*ac92251dSjrutt 		(void) pthread_mutex_destroy(&newmap->epm_lock);
94025cf1a30Sjl 		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
94125cf1a30Sjl 		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
94225cf1a30Sjl 		return;
94325cf1a30Sjl 	}
94425cf1a30Sjl 
94525cf1a30Sjl 	if (IS_CLIENT(newmap)) {
94625cf1a30Sjl 		if (etm_handle_startup(hdl, newmap)) {
947154b1f02Sjrutt 			/*
948154b1f02Sjrutt 			 * For whatever reason, we could not complete the
949154b1f02Sjrutt 			 * startup handshake with the server.  Set the timer
950154b1f02Sjrutt 			 * and try again.
951154b1f02Sjrutt 			 */
952154b1f02Sjrutt 			if (newmap->epm_oconn != NULL) {
953154b1f02Sjrutt 				(void) etm_xport_close(hdl, newmap->epm_oconn);
954154b1f02Sjrutt 				newmap->epm_oconn = NULL;
955154b1f02Sjrutt 			}
956154b1f02Sjrutt 			newmap->epm_cstat = C_UNINITIALIZED;
957154b1f02Sjrutt 			newmap->epm_qstat = Q_UNINITIALIZED;
958154b1f02Sjrutt 			newmap->epm_timer_id = fmd_timer_install(hdl, newmap,
959154b1f02Sjrutt 			    NULL, Reconn_interval);
960154b1f02Sjrutt 			newmap->epm_timer_in_use = 1;
96125cf1a30Sjl 		}
962*ac92251dSjrutt 	} else {
963*ac92251dSjrutt 		/*
964*ac92251dSjrutt 		 * We may be restarting after a crash.  If so, the client
965*ac92251dSjrutt 		 * may be unaware of this.
966*ac92251dSjrutt 		 */
967*ac92251dSjrutt 		etm_send_shutdown(hdl, newmap);
96825cf1a30Sjl 	}
96925cf1a30Sjl 
97025cf1a30Sjl 	/* Add this transport instance handle to the list */
97125cf1a30Sjl 	newmap->epm_next = Epmap_head;
97225cf1a30Sjl 	Epmap_head = newmap;
97325cf1a30Sjl 
974*ac92251dSjrutt 	(void) pthread_mutex_unlock(&newmap->epm_lock);
975*ac92251dSjrutt 
97625cf1a30Sjl 	INCRSTAT(Etm_stats.peer_count.fmds_value.ui64);
97725cf1a30Sjl }
97825cf1a30Sjl 
97925cf1a30Sjl /*
98025cf1a30Sjl  * Parse the given property list string and call etm_init_epmap
98125cf1a30Sjl  * for each endpoint.
98225cf1a30Sjl  */
98325cf1a30Sjl static void
98425cf1a30Sjl etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags)
98525cf1a30Sjl {
98625cf1a30Sjl 	char *epstr, *ep, *prefix, *lasts, *numstr;
98725cf1a30Sjl 	char epname[MAXPATHLEN];
98825cf1a30Sjl 	size_t slen, nlen;
98925cf1a30Sjl 	int beg, end, i;
99025cf1a30Sjl 
99125cf1a30Sjl 	if (eplist == NULL)
99225cf1a30Sjl 		return;
99325cf1a30Sjl 	/*
99425cf1a30Sjl 	 * Create a copy of eplist for parsing.
99525cf1a30Sjl 	 * strtok/strtok_r(3C) will insert null chars to the string.
99625cf1a30Sjl 	 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used.
99725cf1a30Sjl 	 */
99825cf1a30Sjl 	slen = strlen(eplist);
99925cf1a30Sjl 	epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP);
100025cf1a30Sjl 	(void) strcpy(epstr, eplist);
100125cf1a30Sjl 
100225cf1a30Sjl 	/*
100325cf1a30Sjl 	 * The following are supported for the "client_list" and
100425cf1a30Sjl 	 * "server_list" properties :
100525cf1a30Sjl 	 *
100625cf1a30Sjl 	 *    A space-separated list of endpoints.
100725cf1a30Sjl 	 *	"dev:///dom0 dev:///dom1 dev:///dom2"
100825cf1a30Sjl 	 *
100925cf1a30Sjl 	 *    An array syntax for a range of instances.
101025cf1a30Sjl 	 *	"dev:///dom[0:2]"
101125cf1a30Sjl 	 *
101225cf1a30Sjl 	 *    A combination of both.
101325cf1a30Sjl 	 *	"dev:///dom0 dev:///dom[1:2]"
101425cf1a30Sjl 	 */
101525cf1a30Sjl 	ep = strtok_r(epstr, " ", &lasts);
101625cf1a30Sjl 	while (ep != NULL) {
101725cf1a30Sjl 		if (strchr(ep, '[') != NULL) {
101825cf1a30Sjl 			/*
101925cf1a30Sjl 			 * This string is using array syntax.
102025cf1a30Sjl 			 * Check the string for correct syntax.
102125cf1a30Sjl 			 */
102225cf1a30Sjl 			if ((strchr(ep, ':') == NULL) ||
102325cf1a30Sjl 			    (strchr(ep, ']') == NULL)) {
102425cf1a30Sjl 				fmd_hdl_error(hdl, "Syntax error in property "
102525cf1a30Sjl 				    "that includes : %s\n", ep);
102625cf1a30Sjl 				ep = strtok_r(NULL, " ", &lasts);
102725cf1a30Sjl 				continue;
102825cf1a30Sjl 			}
102925cf1a30Sjl 
103025cf1a30Sjl 			/* expand the array syntax */
103125cf1a30Sjl 			prefix = strtok(ep, "[");
103225cf1a30Sjl 
103325cf1a30Sjl 			numstr = strtok(NULL, ":");
103425cf1a30Sjl 			if ((numstr == NULL) || (!isdigit(*numstr))) {
103525cf1a30Sjl 				fmd_hdl_error(hdl, "Syntax error in property "
103625cf1a30Sjl 				    "that includes : %s[\n", prefix);
103725cf1a30Sjl 				ep = strtok_r(NULL, " ", &lasts);
103825cf1a30Sjl 				continue;
103925cf1a30Sjl 			}
104025cf1a30Sjl 			beg = atoi(numstr);
104125cf1a30Sjl 
104225cf1a30Sjl 			numstr = strtok(NULL, "]");
104325cf1a30Sjl 			if ((numstr == NULL) || (!isdigit(*numstr))) {
104425cf1a30Sjl 				fmd_hdl_error(hdl, "Syntax error in property "
104525cf1a30Sjl 				    "that includes : %s[\n", prefix);
104625cf1a30Sjl 				ep = strtok_r(NULL, " ", &lasts);
104725cf1a30Sjl 				continue;
104825cf1a30Sjl 			}
104925cf1a30Sjl 			end = atoi(numstr);
105025cf1a30Sjl 
105125cf1a30Sjl 			nlen = strlen(prefix) + ETM_EP_INST_MAX;
105225cf1a30Sjl 
105325cf1a30Sjl 			if (nlen > MAXPATHLEN) {
105425cf1a30Sjl 				fmd_hdl_error(hdl, "Endpoint prop string "
105525cf1a30Sjl 				    "exceeds MAXPATHLEN\n");
105625cf1a30Sjl 				ep = strtok_r(NULL, " ", &lasts);
105725cf1a30Sjl 				continue;
105825cf1a30Sjl 			}
105925cf1a30Sjl 
106025cf1a30Sjl 			for (i = beg; i <= end; i++) {
106125cf1a30Sjl 				bzero(epname, MAXPATHLEN);
106225cf1a30Sjl 				(void) snprintf(epname, nlen, "%s%d",
106325cf1a30Sjl 				    prefix, i);
106425cf1a30Sjl 				etm_init_epmap(hdl, epname, flags);
106525cf1a30Sjl 			}
106625cf1a30Sjl 		} else {
106725cf1a30Sjl 			etm_init_epmap(hdl, ep, flags);
106825cf1a30Sjl 		}
106925cf1a30Sjl 
107025cf1a30Sjl 		ep = strtok_r(NULL, " ", &lasts);
107125cf1a30Sjl 	}
107225cf1a30Sjl 
107325cf1a30Sjl 	fmd_hdl_free(hdl, epstr, slen + 1);
107425cf1a30Sjl }
107525cf1a30Sjl 
107625cf1a30Sjl /*
107725cf1a30Sjl  * Free the transport infrastructure for an endpoint.
107825cf1a30Sjl  */
107925cf1a30Sjl static void
108025cf1a30Sjl etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp)
108125cf1a30Sjl {
108225cf1a30Sjl 	size_t hdrlen;
108325cf1a30Sjl 	char hbuf[ETM_HDRLEN];
108425cf1a30Sjl 
108525cf1a30Sjl 	(void) pthread_mutex_lock(&mp->epm_lock);
108625cf1a30Sjl 
108725cf1a30Sjl 	/*
108825cf1a30Sjl 	 * If an etm_send thread is in progress, wait for it to finish.
108925cf1a30Sjl 	 * The etm_recv thread is managed by the transport layer and will
109025cf1a30Sjl 	 * be destroyed with etm_xport_fini().
109125cf1a30Sjl 	 */
109225cf1a30Sjl 	while (mp->epm_txbusy)
109325cf1a30Sjl 		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
109425cf1a30Sjl 
109525cf1a30Sjl 	if (mp->epm_timer_in_use)
109625cf1a30Sjl 		fmd_timer_remove(hdl, mp->epm_timer_id);
109725cf1a30Sjl 
109825cf1a30Sjl 	if (mp->epm_oconn != NULL) {
109925cf1a30Sjl 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
110025cf1a30Sjl 		    ETM_HDR_SHUTDOWN, 0);
110125cf1a30Sjl 		(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
110225cf1a30Sjl 		    hdrlen);
110325cf1a30Sjl 		(void) etm_xport_close(hdl, mp->epm_oconn);
110425cf1a30Sjl 		mp->epm_oconn = NULL;
110525cf1a30Sjl 	}
110625cf1a30Sjl 
110725cf1a30Sjl 	if (mp->epm_xprthdl != NULL) {
110825cf1a30Sjl 		fmd_xprt_close(hdl, mp->epm_xprthdl);
110925cf1a30Sjl 		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
111025cf1a30Sjl 		mp->epm_ep_nvl = NULL;
111125cf1a30Sjl 	}
111225cf1a30Sjl 
111325cf1a30Sjl 	if (mp->epm_ep_nvl != NULL)
111425cf1a30Sjl 		etm_free_ep_nvl(hdl, mp);
111525cf1a30Sjl 
111625cf1a30Sjl 	if (mp->epm_tlhdl != NULL)
111725cf1a30Sjl 		(void) etm_xport_fini(hdl, mp->epm_tlhdl);
111825cf1a30Sjl 
111925cf1a30Sjl 	(void) pthread_mutex_unlock(&mp->epm_lock);
112025cf1a30Sjl 	(void) pthread_mutex_destroy(&mp->epm_lock);
112125cf1a30Sjl 	fmd_hdl_strfree(hdl, mp->epm_ep_str);
112225cf1a30Sjl 	fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t));
112325cf1a30Sjl 	DECRSTAT(Etm_stats.peer_count.fmds_value.ui64);
112425cf1a30Sjl }
112525cf1a30Sjl 
112625cf1a30Sjl /*
112725cf1a30Sjl  * FMD entry points
112825cf1a30Sjl  */
112925cf1a30Sjl 
113025cf1a30Sjl /*
113125cf1a30Sjl  * FMD fmdo_send entry point.
113225cf1a30Sjl  * Send an event to the remote endpoint and receive an ACK.
113325cf1a30Sjl  */
113425cf1a30Sjl static int
113525cf1a30Sjl etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl)
113625cf1a30Sjl {
113725cf1a30Sjl 	etm_epmap_t *mp;
113825cf1a30Sjl 	nvlist_t *msgnvl;
113977a7fd96Sjrutt 	int hdrstat, rv, cnt = 0;
114025cf1a30Sjl 	char *buf, *nvbuf, *class;
114125cf1a30Sjl 	size_t nvsize, buflen, hdrlen;
114277a7fd96Sjrutt 	struct timespec tms;
114325cf1a30Sjl 
114425cf1a30Sjl 	(void) pthread_mutex_lock(&Etm_mod_lock);
114525cf1a30Sjl 	if (Etm_exit) {
114625cf1a30Sjl 		(void) pthread_mutex_unlock(&Etm_mod_lock);
114725cf1a30Sjl 		return (FMD_SEND_RETRY);
114825cf1a30Sjl 	}
114925cf1a30Sjl 	(void) pthread_mutex_unlock(&Etm_mod_lock);
115025cf1a30Sjl 
115125cf1a30Sjl 	mp = fmd_xprt_getspecific(hdl, xprthdl);
115225cf1a30Sjl 
115377a7fd96Sjrutt 	for (;;) {
115477a7fd96Sjrutt 		if (pthread_mutex_trylock(&mp->epm_lock) == 0) {
115577a7fd96Sjrutt 			break;
115677a7fd96Sjrutt 		} else {
115777a7fd96Sjrutt 			/*
115877a7fd96Sjrutt 			 * Another thread may be (1) trying to close this
115977a7fd96Sjrutt 			 * fmd_xprt_t, or (2) posting an event to it.
116077a7fd96Sjrutt 			 * If (1), don't want to spend too much time here.
116177a7fd96Sjrutt 			 * If (2), allow it to finish and release epm_lock.
116277a7fd96Sjrutt 			 */
116377a7fd96Sjrutt 			if (cnt++ < 10) {
116477a7fd96Sjrutt 				tms.tv_sec = 0;
116577a7fd96Sjrutt 				tms.tv_nsec = (cnt * 10000);
116677a7fd96Sjrutt 				(void) nanosleep(&tms, NULL);
116777a7fd96Sjrutt 
116877a7fd96Sjrutt 			} else {
116977a7fd96Sjrutt 				return (FMD_SEND_RETRY);
117077a7fd96Sjrutt 			}
117177a7fd96Sjrutt 		}
117277a7fd96Sjrutt 	}
117325cf1a30Sjl 
117425cf1a30Sjl 	mp->epm_txbusy++;
117525cf1a30Sjl 
1176154b1f02Sjrutt 	if (mp->epm_qstat == Q_UNINITIALIZED) {
117725cf1a30Sjl 		mp->epm_txbusy--;
117825cf1a30Sjl 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
117945736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
1180154b1f02Sjrutt 		return (FMD_SEND_FAILED);
1181154b1f02Sjrutt 	}
1182154b1f02Sjrutt 
1183154b1f02Sjrutt 	if (mp->epm_cstat == C_CLOSED) {
118425cf1a30Sjl 		etm_suspend_reconnect(hdl, mp);
1185154b1f02Sjrutt 		mp->epm_txbusy--;
1186154b1f02Sjrutt 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
118745736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
118825cf1a30Sjl 		return (FMD_SEND_RETRY);
118925cf1a30Sjl 	}
119025cf1a30Sjl 
119125cf1a30Sjl 	if (mp->epm_cstat == C_LIMBO) {
119225cf1a30Sjl 		if (mp->epm_oconn != NULL) {
119325cf1a30Sjl 			(void) etm_xport_close(hdl, mp->epm_oconn);
119425cf1a30Sjl 			mp->epm_oconn = NULL;
119525cf1a30Sjl 		}
119625cf1a30Sjl 
119725cf1a30Sjl 		fmd_xprt_suspend(hdl, xprthdl);
119825cf1a30Sjl 		mp->epm_qstat = Q_SUSPENDED;
119925cf1a30Sjl 		mp->epm_txbusy--;
120025cf1a30Sjl 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
120145736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
120225cf1a30Sjl 		fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);
120325cf1a30Sjl 		return (FMD_SEND_RETRY);
120425cf1a30Sjl 	}
120525cf1a30Sjl 
120625cf1a30Sjl 	if (mp->epm_oconn == NULL) {
120725cf1a30Sjl 		if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl))
120825cf1a30Sjl 		    == NULL) {
1209154b1f02Sjrutt 			etm_suspend_reconnect(hdl, mp);
121025cf1a30Sjl 			mp->epm_txbusy--;
121125cf1a30Sjl 			(void) pthread_cond_broadcast(&mp->epm_tx_cv);
121245736083Sjrutt 			(void) pthread_mutex_unlock(&mp->epm_lock);
121325cf1a30Sjl 			return (FMD_SEND_RETRY);
121425cf1a30Sjl 		} else {
121525cf1a30Sjl 			mp->epm_cstat = C_OPEN;
121625cf1a30Sjl 		}
121725cf1a30Sjl 	}
121825cf1a30Sjl 
121925cf1a30Sjl 	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0)
122025cf1a30Sjl 		fmd_hdl_abort(hdl, "No class string in nvlist");
122125cf1a30Sjl 
122225cf1a30Sjl 	msgnvl = fmd_xprt_translate(hdl, xprthdl, ep);
122325cf1a30Sjl 	if (msgnvl == NULL) {
1224154b1f02Sjrutt 		mp->epm_txbusy--;
1225154b1f02Sjrutt 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
122645736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
122725cf1a30Sjl 		fmd_hdl_error(hdl, "Failed to translate event %p\n",
122825cf1a30Sjl 		    (void *) ep);
122925cf1a30Sjl 		return (FMD_SEND_FAILED);
123025cf1a30Sjl 	}
123125cf1a30Sjl 
123277a7fd96Sjrutt 	rv = etm_xport_send_filter(hdl, msgnvl, mp->epm_ep_str);
123377a7fd96Sjrutt 	if (rv == ETM_XPORT_FILTER_DROP) {
123477a7fd96Sjrutt 		mp->epm_txbusy--;
123577a7fd96Sjrutt 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
123677a7fd96Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
123777a7fd96Sjrutt 		fmd_hdl_debug(hdl, "send_filter dropped event");
123877a7fd96Sjrutt 		nvlist_free(msgnvl);
123977a7fd96Sjrutt 		INCRSTAT(Etm_stats.send_filter.fmds_value.ui64);
124077a7fd96Sjrutt 		return (FMD_SEND_SUCCESS);
124177a7fd96Sjrutt 	} else if (rv == ETM_XPORT_FILTER_ERROR) {
124277a7fd96Sjrutt 		fmd_hdl_debug(hdl, "send_filter error : %s", strerror(errno));
124377a7fd96Sjrutt 		INCRSTAT(Etm_stats.error_send_filter.fmds_value.ui64);
124477a7fd96Sjrutt 		/* Still send event */
124577a7fd96Sjrutt 	}
124677a7fd96Sjrutt 
124725cf1a30Sjl 	(void) pthread_mutex_unlock(&mp->epm_lock);
124825cf1a30Sjl 
124925cf1a30Sjl 	(void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR);
125025cf1a30Sjl 
125125cf1a30Sjl 	hdrlen = ETM_HDRLEN;
125225cf1a30Sjl 	buflen = nvsize + hdrlen;
125325cf1a30Sjl 
125425cf1a30Sjl 	ALLOC_BUF(hdl, buf, buflen);
125525cf1a30Sjl 
125625cf1a30Sjl 	nvbuf = buf + hdrlen;
125725cf1a30Sjl 
125825cf1a30Sjl 	(void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize);
125925cf1a30Sjl 
126025cf1a30Sjl 	if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) {
1261154b1f02Sjrutt 		(void) pthread_mutex_lock(&mp->epm_lock);
1262154b1f02Sjrutt 		mp->epm_txbusy--;
1263154b1f02Sjrutt 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
126445736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
126525cf1a30Sjl 		fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv));
126677a7fd96Sjrutt 		nvlist_free(msgnvl);
126725cf1a30Sjl 		FREE_BUF(hdl, buf, buflen);
126825cf1a30Sjl 		return (FMD_SEND_FAILED);
126925cf1a30Sjl 	}
127025cf1a30Sjl 
127125cf1a30Sjl 	nvlist_free(msgnvl);
127225cf1a30Sjl 
127325cf1a30Sjl 	if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf,
127425cf1a30Sjl 	    buflen) != buflen) {
1275154b1f02Sjrutt 		fmd_hdl_debug(hdl, "failed to send message to %s",
1276154b1f02Sjrutt 		    mp->epm_ep_str);
127725cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
1278154b1f02Sjrutt 		etm_suspend_reconnect(hdl, mp);
127925cf1a30Sjl 		mp->epm_txbusy--;
128025cf1a30Sjl 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
128145736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
128225cf1a30Sjl 		FREE_BUF(hdl, buf, buflen);
128325cf1a30Sjl 		INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
128425cf1a30Sjl 		return (FMD_SEND_RETRY);
128525cf1a30Sjl 	}
128625cf1a30Sjl 
128725cf1a30Sjl 	INCRSTAT(Etm_stats.write_msg.fmds_value.ui64);
128825cf1a30Sjl 	ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize);
128925cf1a30Sjl 
129025cf1a30Sjl 	etm_hex_dump(hdl, nvbuf, nvsize, 1);
129125cf1a30Sjl 
129225cf1a30Sjl 	if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf,
129325cf1a30Sjl 	    hdrlen) != hdrlen) {
1294154b1f02Sjrutt 		fmd_hdl_debug(hdl, "failed to read ACK from %s",
1295154b1f02Sjrutt 		    mp->epm_ep_str);
129625cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
1297154b1f02Sjrutt 		etm_suspend_reconnect(hdl, mp);
129825cf1a30Sjl 		mp->epm_txbusy--;
129925cf1a30Sjl 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
130045736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
130125cf1a30Sjl 		FREE_BUF(hdl, buf, buflen);
130225cf1a30Sjl 		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
130325cf1a30Sjl 		return (FMD_SEND_RETRY);
130425cf1a30Sjl 	}
130525cf1a30Sjl 
130625cf1a30Sjl 	hdrstat = etm_check_hdr(hdl, mp, buf);
130725cf1a30Sjl 	FREE_BUF(hdl, buf, buflen);
130825cf1a30Sjl 
130925cf1a30Sjl 	if (hdrstat == ETM_HDR_ACK) {
131025cf1a30Sjl 		INCRSTAT(Etm_stats.read_ack.fmds_value.ui64);
131125cf1a30Sjl 	} else {
131225cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
131325cf1a30Sjl 
131425cf1a30Sjl 		(void) etm_xport_close(hdl, mp->epm_oconn);
131525cf1a30Sjl 		mp->epm_oconn = NULL;
131625cf1a30Sjl 
131725cf1a30Sjl 		if (hdrstat == ETM_HDR_NAK) {
131825cf1a30Sjl 			/* Peer received a bad value in the header */
131925cf1a30Sjl 			if (mp->epm_xprthdl != NULL) {
132025cf1a30Sjl 				mp->epm_cstat = C_LIMBO;
132125cf1a30Sjl 				fmd_xprt_suspend(hdl, xprthdl);
132225cf1a30Sjl 				mp->epm_qstat = Q_SUSPENDED;
132325cf1a30Sjl 				fmd_hdl_debug(hdl, "received NAK, queue "
132425cf1a30Sjl 				    "suspended for %s", mp->epm_ep_str);
132525cf1a30Sjl 			}
132625cf1a30Sjl 
132725cf1a30Sjl 			rv = FMD_SEND_RETRY;
132825cf1a30Sjl 
132925cf1a30Sjl 		} else if (hdrstat == ETM_HDR_S_RESTART) {
133025cf1a30Sjl 			/* Server has restarted */
1331154b1f02Sjrutt 			mp->epm_cstat = C_CLOSED;
1332154b1f02Sjrutt 			mp->epm_qstat = Q_UNINITIALIZED;
1333154b1f02Sjrutt 			fmd_hdl_debug(hdl, "server %s restarted",
1334154b1f02Sjrutt 			    mp->epm_ep_str);
1335154b1f02Sjrutt 			/*
1336154b1f02Sjrutt 			 * Cannot call fmd_xprt_close here, so we'll do it
1337154b1f02Sjrutt 			 * on the timeout thread.
1338154b1f02Sjrutt 			 */
1339154b1f02Sjrutt 			if (mp->epm_timer_in_use == 0) {
1340154b1f02Sjrutt 				mp->epm_timer_id = fmd_timer_install(
1341154b1f02Sjrutt 				    hdl, mp, NULL, 0);
1342154b1f02Sjrutt 				mp->epm_timer_in_use = 1;
134325cf1a30Sjl 			}
134425cf1a30Sjl 
134525cf1a30Sjl 			/*
134625cf1a30Sjl 			 * fault.* or list.* events will be replayed if a
134725cf1a30Sjl 			 * transport is opened with the same auth.
134825cf1a30Sjl 			 * Other events will be discarded.
134925cf1a30Sjl 			 */
135025cf1a30Sjl 			rv = FMD_SEND_FAILED;
135125cf1a30Sjl 
135225cf1a30Sjl 		} else {
135325cf1a30Sjl 			mp->epm_cstat = C_CLOSED;
135425cf1a30Sjl 			fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str);
135525cf1a30Sjl 
135625cf1a30Sjl 			rv = FMD_SEND_RETRY;
135725cf1a30Sjl 		}
135825cf1a30Sjl 
135925cf1a30Sjl 		mp->epm_txbusy--;
136025cf1a30Sjl 
136125cf1a30Sjl 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
136245736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
136325cf1a30Sjl 
136425cf1a30Sjl 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
136525cf1a30Sjl 
136625cf1a30Sjl 		return (rv);
136725cf1a30Sjl 	}
136825cf1a30Sjl 
136925cf1a30Sjl 	(void) pthread_mutex_lock(&mp->epm_lock);
137025cf1a30Sjl 	mp->epm_txbusy--;
137125cf1a30Sjl 	(void) pthread_cond_broadcast(&mp->epm_tx_cv);
137245736083Sjrutt 	(void) pthread_mutex_unlock(&mp->epm_lock);
137325cf1a30Sjl 
137425cf1a30Sjl 	return (FMD_SEND_SUCCESS);
137525cf1a30Sjl }
137625cf1a30Sjl 
137725cf1a30Sjl /*
137825cf1a30Sjl  * FMD fmdo_timeout entry point..
137925cf1a30Sjl  */
138025cf1a30Sjl /*ARGSUSED*/
138125cf1a30Sjl static void
138225cf1a30Sjl etm_timeout(fmd_hdl_t *hdl, id_t id, void *data)
138325cf1a30Sjl {
138425cf1a30Sjl 	etm_epmap_t *mp = (etm_epmap_t *)data;
138525cf1a30Sjl 
138625cf1a30Sjl 	(void) pthread_mutex_lock(&mp->epm_lock);
138725cf1a30Sjl 
138825cf1a30Sjl 	mp->epm_timer_in_use = 0;
138925cf1a30Sjl 
139025cf1a30Sjl 	if (mp->epm_qstat == Q_UNINITIALIZED) {
139125cf1a30Sjl 		/* Server has shutdown and we (client) need to reconnect */
1392154b1f02Sjrutt 		if (mp->epm_xprthdl != NULL) {
1393154b1f02Sjrutt 			fmd_xprt_close(hdl, mp->epm_xprthdl);
1394154b1f02Sjrutt 			fmd_hdl_debug(hdl, "queue closed for %s",
1395154b1f02Sjrutt 			    mp->epm_ep_str);
1396154b1f02Sjrutt 			mp->epm_xprthdl = NULL;
1397154b1f02Sjrutt 			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1398154b1f02Sjrutt 			mp->epm_ep_nvl = NULL;
1399154b1f02Sjrutt 		}
1400154b1f02Sjrutt 
140125cf1a30Sjl 		if (mp->epm_ep_nvl == NULL)
140225cf1a30Sjl 			(void) etm_get_ep_nvl(hdl, mp);
140325cf1a30Sjl 
140425cf1a30Sjl 		if (etm_handle_startup(hdl, mp)) {
1405154b1f02Sjrutt 			if (mp->epm_oconn != NULL) {
1406154b1f02Sjrutt 				(void) etm_xport_close(hdl, mp->epm_oconn);
1407154b1f02Sjrutt 				mp->epm_oconn = NULL;
1408154b1f02Sjrutt 			}
1409154b1f02Sjrutt 			mp->epm_cstat = C_UNINITIALIZED;
141025cf1a30Sjl 			mp->epm_qstat = Q_UNINITIALIZED;
141125cf1a30Sjl 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
141225cf1a30Sjl 			    Reconn_interval);
141325cf1a30Sjl 			mp->epm_timer_in_use = 1;
141425cf1a30Sjl 		}
141525cf1a30Sjl 	} else {
141625cf1a30Sjl 		etm_reconnect(hdl, mp);
141725cf1a30Sjl 	}
141825cf1a30Sjl 
141925cf1a30Sjl 	(void) pthread_mutex_unlock(&mp->epm_lock);
142025cf1a30Sjl }
142125cf1a30Sjl 
142225cf1a30Sjl /*
142325cf1a30Sjl  * FMD Module declarations
142425cf1a30Sjl  */
142525cf1a30Sjl static const fmd_hdl_ops_t etm_ops = {
142625cf1a30Sjl 	NULL,		/* fmdo_recv */
142725cf1a30Sjl 	etm_timeout,	/* fmdo_timeout */
142825cf1a30Sjl 	NULL,		/* fmdo_close */
142925cf1a30Sjl 	NULL,		/* fmdo_stats */
143025cf1a30Sjl 	NULL,		/* fmdo_gc */
143125cf1a30Sjl 	etm_send,	/* fmdo_send */
143225cf1a30Sjl };
143325cf1a30Sjl 
143425cf1a30Sjl static const fmd_prop_t etm_props[] = {
143525cf1a30Sjl 	{ "client_list", FMD_TYPE_STRING, NULL },
143625cf1a30Sjl 	{ "server_list", FMD_TYPE_STRING, NULL },
143725cf1a30Sjl 	{ "reconnect_interval",	FMD_TYPE_UINT64, "10000000000" },
143877a7fd96Sjrutt 	{ "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" },
143977a7fd96Sjrutt 	{ "rw_timeout", FMD_TYPE_UINT64, "2000000000" },
144077a7fd96Sjrutt 	{ "filter_path", FMD_TYPE_STRING, NULL },
144125cf1a30Sjl 	{ NULL, 0, NULL }
144225cf1a30Sjl };
144325cf1a30Sjl 
144425cf1a30Sjl static const fmd_hdl_info_t etm_info = {
144525cf1a30Sjl 	"Event Transport Module", "2.0", &etm_ops, etm_props
144625cf1a30Sjl };
144725cf1a30Sjl 
144825cf1a30Sjl /*
144925cf1a30Sjl  * Initialize the transport for use by ETM.
145025cf1a30Sjl  */
145125cf1a30Sjl void
145225cf1a30Sjl _fmd_init(fmd_hdl_t *hdl)
145325cf1a30Sjl {
145425cf1a30Sjl 	char *propstr;
145525cf1a30Sjl 
145625cf1a30Sjl 	if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) {
145725cf1a30Sjl 		return; /* invalid data in configuration file */
145825cf1a30Sjl 	}
145925cf1a30Sjl 
146025cf1a30Sjl 	/* Create global stats */
146125cf1a30Sjl 	(void) fmd_stat_create(hdl, FMD_STAT_NOALLOC,
146225cf1a30Sjl 	    sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats);
146325cf1a30Sjl 
146425cf1a30Sjl 	/* Get module properties */
146525cf1a30Sjl 	Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout");
146625cf1a30Sjl 	Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval");
146725cf1a30Sjl 	Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout");
146825cf1a30Sjl 
146925cf1a30Sjl 	propstr = fmd_prop_get_string(hdl, "client_list");
147025cf1a30Sjl 	etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS);
147125cf1a30Sjl 	fmd_prop_free_string(hdl, propstr);
147225cf1a30Sjl 
147325cf1a30Sjl 	propstr = fmd_prop_get_string(hdl, "server_list");
147425cf1a30Sjl 	etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS);
147525cf1a30Sjl 	fmd_prop_free_string(hdl, propstr);
147625cf1a30Sjl 
147725cf1a30Sjl 	if (Etm_stats.peer_count.fmds_value.ui64 == 0) {
147825cf1a30Sjl 		fmd_hdl_debug(hdl, "Failed to init any endpoint\n");
147925cf1a30Sjl 		fmd_hdl_unregister(hdl);
148025cf1a30Sjl 		return;
148125cf1a30Sjl 	}
148225cf1a30Sjl }
148325cf1a30Sjl 
148425cf1a30Sjl /*
148525cf1a30Sjl  * Teardown the transport
148625cf1a30Sjl  */
148725cf1a30Sjl void
148825cf1a30Sjl _fmd_fini(fmd_hdl_t *hdl)
148925cf1a30Sjl {
149025cf1a30Sjl 	etm_epmap_t *mp, *next;
149125cf1a30Sjl 
149225cf1a30Sjl 	(void) pthread_mutex_lock(&Etm_mod_lock);
149325cf1a30Sjl 	Etm_exit = 1;
149425cf1a30Sjl 	(void) pthread_mutex_unlock(&Etm_mod_lock);
149525cf1a30Sjl 
149625cf1a30Sjl 	mp = Epmap_head;
149725cf1a30Sjl 
149825cf1a30Sjl 	while (mp) {
149925cf1a30Sjl 		next = mp->epm_next;
150025cf1a30Sjl 		etm_free_epmap(hdl, mp);
150125cf1a30Sjl 		mp = next;
150225cf1a30Sjl 	}
150325cf1a30Sjl 
150425cf1a30Sjl 	fmd_hdl_unregister(hdl);
150525cf1a30Sjl }
1506