xref: /illumos-gate/usr/src/cmd/fm/modules/common/event-transport/etm.c (revision 77a7fd96f77f04bbd7634db14755686062589eca)
125cf1a30Sjl /*
225cf1a30Sjl  * CDDL HEADER START
325cf1a30Sjl  *
425cf1a30Sjl  * The contents of this file are subject to the terms of the
525cf1a30Sjl  * Common Development and Distribution License (the "License").
625cf1a30Sjl  * You may not use this file except in compliance with the License.
725cf1a30Sjl  *
825cf1a30Sjl  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
925cf1a30Sjl  * or http://www.opensolaris.org/os/licensing.
1025cf1a30Sjl  * See the License for the specific language governing permissions
1125cf1a30Sjl  * and limitations under the License.
1225cf1a30Sjl  *
1325cf1a30Sjl  * When distributing Covered Code, include this CDDL HEADER in each
1425cf1a30Sjl  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
1525cf1a30Sjl  * If applicable, add the following below this CDDL HEADER, with the
1625cf1a30Sjl  * fields enclosed by brackets "[]" replaced with your own identifying
1725cf1a30Sjl  * information: Portions Copyright [yyyy] [name of copyright owner]
1825cf1a30Sjl  *
1925cf1a30Sjl  * CDDL HEADER END
2025cf1a30Sjl  */
2125cf1a30Sjl 
2225cf1a30Sjl /*
2325cf1a30Sjl  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
2425cf1a30Sjl  * Use is subject to license terms.
2525cf1a30Sjl  */
2625cf1a30Sjl 
2725cf1a30Sjl #pragma ident	"%Z%%M%	%I%	%E% SMI"
2825cf1a30Sjl 
2925cf1a30Sjl /*
3025cf1a30Sjl  * FMA Event Transport Module
3125cf1a30Sjl  *
3225cf1a30Sjl  * Plugin for sending/receiving FMA events to/from a remote endpoint.
3325cf1a30Sjl  */
3425cf1a30Sjl 
3525cf1a30Sjl #include <netinet/in.h>
36*77a7fd96Sjrutt #include <errno.h>
3725cf1a30Sjl #include <sys/fm/protocol.h>
3825cf1a30Sjl #include <sys/sysmacros.h>
3925cf1a30Sjl #include <pthread.h>
4025cf1a30Sjl #include <strings.h>
4125cf1a30Sjl #include <ctype.h>
4225cf1a30Sjl #include <link.h>
4325cf1a30Sjl #include <libnvpair.h>
4425cf1a30Sjl #include "etm_xport_api.h"
4525cf1a30Sjl #include "etm_proto.h"
4625cf1a30Sjl 
4725cf1a30Sjl /*
4825cf1a30Sjl  * ETM declarations
4925cf1a30Sjl  */
5025cf1a30Sjl 
/*
 * Status of the connection to a peer, stored in epm_cstat.
 */
typedef enum etm_connection_status {
	C_UNINITIALIZED	= 0,	/* No connection set up (or torn down) */
	C_OPEN		= 1,	/* Connection is open */
	C_CLOSED	= 2,	/* Connection is closed */
	C_LIMBO		= 3,	/* Bad value in header from peer */
	C_TIMED_OUT	= 4	/* Reconnection to peer timed out */
} etm_connstat_t;
5825cf1a30Sjl 
/*
 * Status of the fmd transport queue for a peer, stored in epm_qstat.
 * The values start at 100, so they do not overlap etm_connstat_t.
 */
typedef enum etm_fmd_queue_status {
	Q_UNINITIALIZED	= 100,	/* Queue has not been set up */
	Q_INIT_PENDING	= 101,	/* Queue initialization in progress */
	Q_OPEN		= 102,	/* Queue is open */
	Q_SUSPENDED	= 103	/* Queue is suspended */
} etm_qstat_t;
6525cf1a30Sjl 
/*
 * Per endpoint data.
 *
 * One etm_epmap_t describes one remote endpoint (peer).  The structs are
 * chained through epm_next; Epmap_head is the head of that list.
 *
 * epm_lock protects the members that are declared after it.
 */
typedef struct etm_endpoint_map {
	uint8_t epm_ver;		/* Protocol version being used */
	char *epm_ep_str;		/* Endpoint ID string */
	int epm_xprtflags;		/* FMD transport open flags */
	etm_xport_hdl_t epm_tlhdl;	/* Transport Layer instance handle */
	pthread_mutex_t epm_lock;	/* Protects remainder of struct */
	pthread_cond_t epm_tx_cv;	/* Cond var for send/transmit */
	int epm_txbusy;			/* Busy doing send/transmit */
	fmd_xprt_t *epm_xprthdl;	/* FMD transport handle */
	etm_qstat_t epm_qstat;		/* Status of fmd xprt queue */
	nvlist_t *epm_ep_nvl;		/* Endpoint ID nv_list */
	etm_xport_conn_t epm_oconn;	/* Connection for outgoing events */
	etm_connstat_t epm_cstat;	/* Status of connection */
	id_t epm_timer_id;		/* Timer id */
	int epm_timer_in_use;		/* Indicates if timer is in use */
	hrtime_t epm_reconn_end;	/* Reconnection end time */
	struct etm_endpoint_map *epm_next;	/* Next endpoint in the list */
} etm_epmap_t;
8525cf1a30Sjl 
/*
 * Pseudo header types returned by etm_check_hdr() for a malformed header.
 * They are placed above ETM_HDR_TYPE_TOO_HIGH, the upper bound that
 * etm_check_hdr() enforces on header types read from the peer.
 */
#define	ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1)
#define	ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2)
#define	ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3)
#define	ETM_EP_INST_MAX 4		/* Max chars in endpt instance */
#define	ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR
#define	ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT)

/*
 * Buffer helpers.  Both expand to a single expression without a trailing
 * semicolon, so the caller supplies the ';' and they can be used safely
 * as the body of an unbraced if/else.
 */
#define	ALLOC_BUF(hdl, buf, size) \
	((buf) = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP))

#define	FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size))

/* An endpoint is a client when its xprt flags lack FMD_XPRT_ACCEPT */
#define	IS_CLIENT(mp)	(((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1)

/*
 * Statistic updates, serialized by Etm_mod_lock.  Wrapped in
 * do { } while (0) so each macro behaves as exactly one statement.
 */
#define	INCRSTAT(x)	do {						\
				(void) pthread_mutex_lock(&Etm_mod_lock);   \
				(x)++;					    \
				(void) pthread_mutex_unlock(&Etm_mod_lock); \
			} while (0)

#define	DECRSTAT(x)	do {						\
				(void) pthread_mutex_lock(&Etm_mod_lock);   \
				(x)--;					    \
				(void) pthread_mutex_unlock(&Etm_mod_lock); \
			} while (0)

#define	ADDSTAT(x, y)	do {						\
				(void) pthread_mutex_lock(&Etm_mod_lock);   \
				(x) += (y);				    \
				(void) pthread_mutex_unlock(&Etm_mod_lock); \
			} while (0)
11425cf1a30Sjl 
/*
 * Global variables
 *
 * Etm_mod_lock guards the globals below and is also the lock taken by the
 * INCRSTAT/DECRSTAT/ADDSTAT macros when they update Etm_stats counters.
 */
static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER;
					/* Protects globals */
static hrtime_t Reconn_interval;	/* Time between reconnection attempts */
static hrtime_t Reconn_timeout;		/* Time allowed for reconnection */
static hrtime_t Rw_timeout;		/* Time allowed for I/O operation  */
static int Etm_dump = 0;		/* Enables hex dump for debug */
static int Etm_exit = 0;		/* Flag for exit */
static etm_epmap_t *Epmap_head = NULL;	/* Head of list of epmap structs */
12625cf1a30Sjl 
/*
 * Module statistics
 *
 * The initializers below are positional, so they must be kept in the same
 * order as the members of struct etm_stats.  Each entry gives the stat
 * name, its type and a short description.
 */
static struct etm_stats {
	/* read counters */
	fmd_stat_t read_ack;
	fmd_stat_t read_bytes;
	fmd_stat_t read_msg;
	fmd_stat_t post_filter;
	/* write counters */
	fmd_stat_t write_ack;
	fmd_stat_t write_bytes;
	fmd_stat_t write_msg;
	fmd_stat_t send_filter;
	/* error counters */
	fmd_stat_t error_protocol;
	fmd_stat_t error_drop_read;
	fmd_stat_t error_read;
	fmd_stat_t error_read_badhdr;
	fmd_stat_t error_write;
	fmd_stat_t error_send_filter;
	fmd_stat_t error_post_filter;
	/* misc */
	fmd_stat_t peer_count;

} Etm_stats = {
	/* read counters */
	{ "read_ack", FMD_TYPE_UINT64, "ACKs read" },
	{ "read_bytes", FMD_TYPE_UINT64, "Bytes read" },
	{ "read_msg", FMD_TYPE_UINT64, "Messages read" },
	{ "post_filter", FMD_TYPE_UINT64, "Drops by post_filter" },
	/* write counters */
	{ "write_ack", FMD_TYPE_UINT64, "ACKs sent" },
	{ "write_bytes", FMD_TYPE_UINT64, "Bytes sent" },
	{ "write_msg", FMD_TYPE_UINT64, "Messages sent" },
	{ "send_filter", FMD_TYPE_UINT64, "Drops by send_filter" },
	/* ETM error counters */
	{ "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" },
	{ "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" },
	{ "error_read", FMD_TYPE_UINT64, "Read I/O errors" },
	{ "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" },
	{ "error_write", FMD_TYPE_UINT64, "Write I/O errors" },
	{ "error_send_filter", FMD_TYPE_UINT64, "Send filter errors" },
	{ "error_post_filter", FMD_TYPE_UINT64, "Post filter errors" },
	/* ETM Misc */
	{ "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" },
};
17225cf1a30Sjl 
17325cf1a30Sjl /*
17425cf1a30Sjl  * ETM Private functions
17525cf1a30Sjl  */
17625cf1a30Sjl 
17725cf1a30Sjl /*
17825cf1a30Sjl  * Hex dump for debug.
17925cf1a30Sjl  */
18025cf1a30Sjl static void
18125cf1a30Sjl etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction)
18225cf1a30Sjl {
18325cf1a30Sjl 	int i, j, k;
18425cf1a30Sjl 	int16_t *c;
18525cf1a30Sjl 
18625cf1a30Sjl 	if (Etm_dump == 0)
18725cf1a30Sjl 		return;
18825cf1a30Sjl 
18925cf1a30Sjl 	j = buflen / 16;	/* Number of complete 8-column rows */
19025cf1a30Sjl 	k = buflen % 16;	/* Is there a last (non-8-column) row? */
19125cf1a30Sjl 
19225cf1a30Sjl 	if (direction)
19325cf1a30Sjl 		fmd_hdl_debug(hdl, "--- WRITE Message Dump ---");
19425cf1a30Sjl 	else
19525cf1a30Sjl 		fmd_hdl_debug(hdl, "---  READ Message Dump ---");
19625cf1a30Sjl 
19725cf1a30Sjl 	fmd_hdl_debug(hdl, "   Displaying %d bytes", buflen);
19825cf1a30Sjl 
19925cf1a30Sjl 	/* Dump the complete 8-column rows */
20025cf1a30Sjl 	for (i = 0; i < j; i++) {
20125cf1a30Sjl 		c = (int16_t *)buf + (i * 8);
20225cf1a30Sjl 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x %4x %4x", i,
20325cf1a30Sjl 		    *(c+0), *(c+1), *(c+2), *(c+3),
20425cf1a30Sjl 		    *(c+4), *(c+5), *(c+6), *(c+7));
20525cf1a30Sjl 	}
20625cf1a30Sjl 
20725cf1a30Sjl 	/* Dump the last (incomplete) row */
20825cf1a30Sjl 	c = (int16_t *)buf + (i * 8);
20925cf1a30Sjl 	switch (k) {
21025cf1a30Sjl 	case 4:
21125cf1a30Sjl 		fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1));
21225cf1a30Sjl 		break;
21325cf1a30Sjl 	case 8:
21425cf1a30Sjl 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1),
21525cf1a30Sjl 		    *(c+2), *(c+3));
21625cf1a30Sjl 		break;
21725cf1a30Sjl 	case 12:
21825cf1a30Sjl 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x", i, *(c+0),
21925cf1a30Sjl 		    *(c+1), *(c+2), *(c+3), *(c+4), *(c+5));
22025cf1a30Sjl 		break;
22125cf1a30Sjl 	}
22225cf1a30Sjl 
22325cf1a30Sjl 	fmd_hdl_debug(hdl, "---      End Dump      ---");
22425cf1a30Sjl }
22525cf1a30Sjl 
22625cf1a30Sjl /*
22725cf1a30Sjl  * Provide the length of a message based on the data in the given ETM header.
22825cf1a30Sjl  */
22925cf1a30Sjl static size_t
23025cf1a30Sjl etm_get_msglen(void *buf)
23125cf1a30Sjl {
23225cf1a30Sjl 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
23325cf1a30Sjl 
23425cf1a30Sjl 	return (ntohl(hp->hdr_msglen));
23525cf1a30Sjl }
23625cf1a30Sjl 
23725cf1a30Sjl /*
23825cf1a30Sjl  * Check the contents of the ETM header for errors.
23925cf1a30Sjl  * Return the header type (hdr_type).
24025cf1a30Sjl  */
24125cf1a30Sjl static int
24225cf1a30Sjl etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf)
24325cf1a30Sjl {
24425cf1a30Sjl 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
24525cf1a30Sjl 
24625cf1a30Sjl 	if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) {
247154b1f02Sjrutt 		fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s "
24825cf1a30Sjl 		    ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim);
24925cf1a30Sjl 		return (ETM_HDR_INVALID);
25025cf1a30Sjl 	}
25125cf1a30Sjl 
25225cf1a30Sjl 	if ((hp->hdr_type == ETM_HDR_C_HELLO) ||
25325cf1a30Sjl 	    (hp->hdr_type == ETM_HDR_S_HELLO)) {
25425cf1a30Sjl 		/* Until version is negotiated, other fields may be wrong */
25525cf1a30Sjl 		return (hp->hdr_type);
25625cf1a30Sjl 	}
25725cf1a30Sjl 
25825cf1a30Sjl 	if (hp->hdr_ver != mp->epm_ver) {
259154b1f02Sjrutt 		fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n",
26025cf1a30Sjl 		    mp->epm_ep_str, hp->hdr_ver);
26125cf1a30Sjl 		return (ETM_HDR_BADVERSION);
26225cf1a30Sjl 	}
26325cf1a30Sjl 
26425cf1a30Sjl 	if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) ||
26525cf1a30Sjl 	    (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) {
266154b1f02Sjrutt 		fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n",
26725cf1a30Sjl 		    mp->epm_ep_str, hp->hdr_type);
26825cf1a30Sjl 		return (ETM_HDR_BADTYPE);
26925cf1a30Sjl 	}
27025cf1a30Sjl 
27125cf1a30Sjl 	return (hp->hdr_type);
27225cf1a30Sjl }
27325cf1a30Sjl 
27425cf1a30Sjl /*
27525cf1a30Sjl  * Create an ETM header of a given type in the given buffer.
27625cf1a30Sjl  * Return length of header.
27725cf1a30Sjl  */
27825cf1a30Sjl static size_t
27925cf1a30Sjl etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen)
28025cf1a30Sjl {
28125cf1a30Sjl 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
28225cf1a30Sjl 
28325cf1a30Sjl 	bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN);
28425cf1a30Sjl 	hp->hdr_ver = ver;
28525cf1a30Sjl 	hp->hdr_type = type;
28625cf1a30Sjl 	hp->hdr_msglen = htonl(msglen);
28725cf1a30Sjl 
28825cf1a30Sjl 	return (ETM_HDRLEN);
28925cf1a30Sjl }
29025cf1a30Sjl 
29125cf1a30Sjl /*
29225cf1a30Sjl  * Convert message bytes to nvlist and post to fmd.
29325cf1a30Sjl  * Return zero for success, non-zero for failure.
29425cf1a30Sjl  *
29525cf1a30Sjl  * Note : nvl is free'd by fmd.
29625cf1a30Sjl  */
29725cf1a30Sjl static int
29825cf1a30Sjl etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen)
29925cf1a30Sjl {
30025cf1a30Sjl 	nvlist_t *nvl;
30125cf1a30Sjl 	int rv;
30225cf1a30Sjl 
30325cf1a30Sjl 	if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) {
304154b1f02Sjrutt 		fmd_hdl_error(hdl, "failed to unpack message");
30525cf1a30Sjl 		return (1);
30625cf1a30Sjl 	}
30725cf1a30Sjl 
308*77a7fd96Sjrutt 	rv = etm_xport_post_filter(hdl, nvl, mp->epm_ep_str);
309*77a7fd96Sjrutt 	if (rv == ETM_XPORT_FILTER_DROP) {
310*77a7fd96Sjrutt 		fmd_hdl_debug(hdl, "post_filter dropped event");
311*77a7fd96Sjrutt 		INCRSTAT(Etm_stats.post_filter.fmds_value.ui64);
312*77a7fd96Sjrutt 		nvlist_free(nvl);
313*77a7fd96Sjrutt 		return (0);
314*77a7fd96Sjrutt 	} else if (rv == ETM_XPORT_FILTER_ERROR) {
315*77a7fd96Sjrutt 		fmd_hdl_debug(hdl, "post_filter error : %s", strerror(errno));
316*77a7fd96Sjrutt 		INCRSTAT(Etm_stats.error_post_filter.fmds_value.ui64);
317*77a7fd96Sjrutt 		/* Still post event */
318*77a7fd96Sjrutt 	}
319*77a7fd96Sjrutt 
32025cf1a30Sjl 	(void) pthread_mutex_lock(&mp->epm_lock);
32125cf1a30Sjl 	(void) pthread_mutex_lock(&Etm_mod_lock);
32225cf1a30Sjl 	if (!Etm_exit) {
32325cf1a30Sjl 		(void) pthread_mutex_unlock(&Etm_mod_lock);
32425cf1a30Sjl 		if (mp->epm_qstat == Q_OPEN) {
32525cf1a30Sjl 			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
32625cf1a30Sjl 			rv = 0;
32725cf1a30Sjl 		} else if (mp->epm_qstat == Q_SUSPENDED) {
32825cf1a30Sjl 			fmd_xprt_resume(hdl, mp->epm_xprthdl);
32925cf1a30Sjl 			if (mp->epm_timer_in_use) {
33025cf1a30Sjl 				fmd_timer_remove(hdl, mp->epm_timer_id);
33125cf1a30Sjl 				mp->epm_timer_in_use = 0;
33225cf1a30Sjl 			}
33325cf1a30Sjl 			mp->epm_qstat = Q_OPEN;
33425cf1a30Sjl 			fmd_hdl_debug(hdl, "queue resumed for %s",
33525cf1a30Sjl 			    mp->epm_ep_str);
33625cf1a30Sjl 			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
33725cf1a30Sjl 			rv = 0;
33825cf1a30Sjl 		} else {
33925cf1a30Sjl 			fmd_hdl_debug(hdl, "unable to post message, qstat = %d",
34025cf1a30Sjl 			    mp->epm_qstat);
341154b1f02Sjrutt 			nvlist_free(nvl);
342154b1f02Sjrutt 			/* Remote peer will attempt to resend event */
34325cf1a30Sjl 			rv = 2;
34425cf1a30Sjl 		}
34525cf1a30Sjl 	} else {
34625cf1a30Sjl 		(void) pthread_mutex_unlock(&Etm_mod_lock);
34725cf1a30Sjl 		fmd_hdl_debug(hdl, "unable to post message, module exiting");
348154b1f02Sjrutt 		nvlist_free(nvl);
349154b1f02Sjrutt 		/* Remote peer will attempt to resend event */
35025cf1a30Sjl 		rv = 3;
35125cf1a30Sjl 	}
35225cf1a30Sjl 
35325cf1a30Sjl 	(void) pthread_mutex_unlock(&mp->epm_lock);
35425cf1a30Sjl 
35525cf1a30Sjl 	return (rv);
35625cf1a30Sjl }
35725cf1a30Sjl 
35825cf1a30Sjl /*
35925cf1a30Sjl  * Handle the startup handshake to the server.  The client always initiates
36025cf1a30Sjl  * the startup handshake.  In the following sequence, we are the client and
36125cf1a30Sjl  * the remote endpoint is the server.
36225cf1a30Sjl  *
36325cf1a30Sjl  *	Client sends C_HELLO and transitions to Q_INIT_PENDING state.
36425cf1a30Sjl  *	Server sends S_HELLO and transitions to Q_INIT_PENDING state.
36525cf1a30Sjl  *	Client sends ACK and transitions to Q_OPEN state.
36625cf1a30Sjl  *	Server receives ACK and transitions to Q_OPEN state.
36725cf1a30Sjl  *
36825cf1a30Sjl  * Return 0 for success, nonzero for failure.
36925cf1a30Sjl  */
37025cf1a30Sjl static int
37125cf1a30Sjl etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp)
37225cf1a30Sjl {
37325cf1a30Sjl 	etm_proto_hdr_t *hp;
37425cf1a30Sjl 	size_t hdrlen = ETM_HDRLEN;
37525cf1a30Sjl 	int hdrstat;
37625cf1a30Sjl 	char hbuf[ETM_HDRLEN];
37725cf1a30Sjl 
37825cf1a30Sjl 	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
37925cf1a30Sjl 		return (1);
38025cf1a30Sjl 
38125cf1a30Sjl 	mp->epm_cstat = C_OPEN;
38225cf1a30Sjl 
38325cf1a30Sjl 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0);
38425cf1a30Sjl 
38525cf1a30Sjl 	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
38625cf1a30Sjl 	    hdrlen)) != hdrlen) {
38725cf1a30Sjl 		fmd_hdl_error(hdl, "Failed to write C_HELLO to %s",
38825cf1a30Sjl 		    mp->epm_ep_str);
38925cf1a30Sjl 		return (2);
39025cf1a30Sjl 	}
39125cf1a30Sjl 
39225cf1a30Sjl 	mp->epm_qstat = Q_INIT_PENDING;
39325cf1a30Sjl 
39425cf1a30Sjl 	if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf,
39525cf1a30Sjl 	    hdrlen)) != hdrlen) {
39625cf1a30Sjl 		fmd_hdl_error(hdl, "Failed to read S_HELLO from %s",
39725cf1a30Sjl 		    mp->epm_ep_str);
39825cf1a30Sjl 		return (3);
39925cf1a30Sjl 	}
40025cf1a30Sjl 
40125cf1a30Sjl 	hdrstat = etm_check_hdr(hdl, mp, hbuf);
40225cf1a30Sjl 
40325cf1a30Sjl 	if (hdrstat != ETM_HDR_S_HELLO) {
40425cf1a30Sjl 		fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO "
40525cf1a30Sjl 		    "from %s", mp->epm_ep_str);
40625cf1a30Sjl 		return (4);
40725cf1a30Sjl 	}
40825cf1a30Sjl 
40925cf1a30Sjl 	/*
41025cf1a30Sjl 	 * Get version from the server.
41125cf1a30Sjl 	 * Currently, only one version is supported.
41225cf1a30Sjl 	 */
41325cf1a30Sjl 	hp = (etm_proto_hdr_t *)(void *)hbuf;
41425cf1a30Sjl 	if (hp->hdr_ver != ETM_PROTO_V1) {
41525cf1a30Sjl 		fmd_hdl_error(hdl, "Unable to use same version as %s : %d",
41625cf1a30Sjl 		    mp->epm_ep_str, hp->hdr_ver);
41725cf1a30Sjl 		return (5);
41825cf1a30Sjl 	}
41925cf1a30Sjl 	mp->epm_ver = hp->hdr_ver;
42025cf1a30Sjl 
42125cf1a30Sjl 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
42225cf1a30Sjl 
42325cf1a30Sjl 	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
42425cf1a30Sjl 	    hdrlen)) != hdrlen) {
42525cf1a30Sjl 		fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s",
42625cf1a30Sjl 		    mp->epm_ep_str);
42725cf1a30Sjl 		return (6);
42825cf1a30Sjl 	}
42925cf1a30Sjl 
43025cf1a30Sjl 	/*
43125cf1a30Sjl 	 * Call fmd_xprt_open and fmd_xprt_setspecific with
43225cf1a30Sjl 	 * Etm_mod_lock held to avoid race with etm_send thread.
43325cf1a30Sjl 	 */
43425cf1a30Sjl 	(void) pthread_mutex_lock(&Etm_mod_lock);
43525cf1a30Sjl 	if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags,
43625cf1a30Sjl 	    mp->epm_ep_nvl, NULL)) == NULL) {
43725cf1a30Sjl 		fmd_hdl_abort(hdl, "Failed to init xprthdl for %s",
43825cf1a30Sjl 		    mp->epm_ep_str);
43925cf1a30Sjl 	}
44025cf1a30Sjl 	fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
44125cf1a30Sjl 	(void) pthread_mutex_unlock(&Etm_mod_lock);
44225cf1a30Sjl 
44325cf1a30Sjl 	mp->epm_qstat = Q_OPEN;
44425cf1a30Sjl 	fmd_hdl_debug(hdl, "queue open for %s",  mp->epm_ep_str);
44525cf1a30Sjl 
44625cf1a30Sjl 	return (0);
44725cf1a30Sjl }
44825cf1a30Sjl 
44925cf1a30Sjl /*
45025cf1a30Sjl  * Alloc a nvlist and add a string for the endpoint.
45125cf1a30Sjl  * Return zero for success, non-zero for failure.
45225cf1a30Sjl  */
45325cf1a30Sjl static int
45425cf1a30Sjl etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
45525cf1a30Sjl {
45625cf1a30Sjl 	/*
45725cf1a30Sjl 	 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation
45825cf1a30Sjl 	 * in fmd when this nvlist_t is free'd.
45925cf1a30Sjl 	 */
46025cf1a30Sjl 	(void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0);
46125cf1a30Sjl 
46225cf1a30Sjl 	if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) {
463154b1f02Sjrutt 		fmd_hdl_error(hdl, "failed to add domain-id string to nvlist "
46425cf1a30Sjl 		    "for %s", mp->epm_ep_str);
46525cf1a30Sjl 		nvlist_free(mp->epm_ep_nvl);
46625cf1a30Sjl 		return (1);
46725cf1a30Sjl 	}
46825cf1a30Sjl 
46925cf1a30Sjl 	return (0);
47025cf1a30Sjl }
47125cf1a30Sjl 
47225cf1a30Sjl /*
47325cf1a30Sjl  * Free the nvlist for the endpoint_id string.
47425cf1a30Sjl  */
47525cf1a30Sjl /*ARGSUSED*/
47625cf1a30Sjl static void
47725cf1a30Sjl etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
47825cf1a30Sjl {
47925cf1a30Sjl 	nvlist_free(mp->epm_ep_nvl);
48025cf1a30Sjl }
48125cf1a30Sjl 
48225cf1a30Sjl /*
48325cf1a30Sjl  * Check for a duplicate endpoint/peer string.
48425cf1a30Sjl  */
48525cf1a30Sjl /*ARGSUSED*/
48625cf1a30Sjl static int
48725cf1a30Sjl etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname)
48825cf1a30Sjl {
48925cf1a30Sjl 	etm_epmap_t *mp;
49025cf1a30Sjl 
49125cf1a30Sjl 	for (mp = Epmap_head; mp != NULL; mp = mp->epm_next)
49225cf1a30Sjl 		if (strcmp(epname, mp->epm_ep_str) == 0)
49325cf1a30Sjl 			return (1);
49425cf1a30Sjl 
49525cf1a30Sjl 	return (0);
49625cf1a30Sjl }
49725cf1a30Sjl 
49825cf1a30Sjl /*
49925cf1a30Sjl  * Attempt to re-open a connection with the remote endpoint.
50025cf1a30Sjl  */
50125cf1a30Sjl static void
50225cf1a30Sjl etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
50325cf1a30Sjl {
50425cf1a30Sjl 	if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) {
50525cf1a30Sjl 		if (gethrtime() < mp->epm_reconn_end) {
50625cf1a30Sjl 			if ((mp->epm_oconn = etm_xport_open(hdl,
50725cf1a30Sjl 			    mp->epm_tlhdl)) == NULL) {
50825cf1a30Sjl 				fmd_hdl_debug(hdl, "reconnect failed for %s",
50925cf1a30Sjl 				    mp->epm_ep_str);
51025cf1a30Sjl 				mp->epm_timer_id = fmd_timer_install(hdl, mp,
51125cf1a30Sjl 				    NULL, Reconn_interval);
51225cf1a30Sjl 				mp->epm_timer_in_use = 1;
51325cf1a30Sjl 			} else {
51425cf1a30Sjl 				fmd_hdl_debug(hdl, "reconnect success for %s",
51525cf1a30Sjl 				    mp->epm_ep_str);
51625cf1a30Sjl 				mp->epm_reconn_end = 0;
51725cf1a30Sjl 				mp->epm_cstat = C_OPEN;
51825cf1a30Sjl 			}
51925cf1a30Sjl 		} else {
52025cf1a30Sjl 			fmd_hdl_error(hdl, "Reconnect timed out for %s\n",
52125cf1a30Sjl 			    mp->epm_ep_str);
52225cf1a30Sjl 			mp->epm_reconn_end = 0;
52325cf1a30Sjl 			mp->epm_cstat = C_TIMED_OUT;
52425cf1a30Sjl 		}
52525cf1a30Sjl 	}
52625cf1a30Sjl 
52725cf1a30Sjl 	if (mp->epm_cstat == C_OPEN) {
52825cf1a30Sjl 		fmd_xprt_resume(hdl, mp->epm_xprthdl);
52925cf1a30Sjl 		mp->epm_qstat = Q_OPEN;
53025cf1a30Sjl 		fmd_hdl_debug(hdl, "queue resumed for %s",  mp->epm_ep_str);
53125cf1a30Sjl 	}
53225cf1a30Sjl }
53325cf1a30Sjl 
53425cf1a30Sjl /*
53525cf1a30Sjl  * Suspend a given connection and setup for reconnection retries.
536154b1f02Sjrutt  * Assume caller holds lock on epm_lock.
53725cf1a30Sjl  */
53825cf1a30Sjl static void
53925cf1a30Sjl etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
54025cf1a30Sjl {
54125cf1a30Sjl 	(void) pthread_mutex_lock(&Etm_mod_lock);
54225cf1a30Sjl 	if (Etm_exit) {
54325cf1a30Sjl 		(void) pthread_mutex_unlock(&Etm_mod_lock);
54425cf1a30Sjl 		return;
54525cf1a30Sjl 	}
54625cf1a30Sjl 	(void) pthread_mutex_unlock(&Etm_mod_lock);
54725cf1a30Sjl 
54825cf1a30Sjl 	if (mp->epm_oconn != NULL) {
54925cf1a30Sjl 		(void) etm_xport_close(hdl, mp->epm_oconn);
55025cf1a30Sjl 		mp->epm_oconn = NULL;
55125cf1a30Sjl 	}
55225cf1a30Sjl 
55325cf1a30Sjl 	mp->epm_reconn_end = gethrtime() + Reconn_timeout;
55425cf1a30Sjl 	mp->epm_cstat = C_UNINITIALIZED;
55525cf1a30Sjl 
55625cf1a30Sjl 	if (mp->epm_xprthdl != NULL) {
55725cf1a30Sjl 		fmd_xprt_suspend(hdl, mp->epm_xprthdl);
55825cf1a30Sjl 		mp->epm_qstat = Q_SUSPENDED;
55925cf1a30Sjl 		fmd_hdl_debug(hdl, "queue suspended for %s",  mp->epm_ep_str);
56025cf1a30Sjl 
56125cf1a30Sjl 		if (mp->epm_timer_in_use == 0) {
56225cf1a30Sjl 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
56325cf1a30Sjl 			    Reconn_interval);
56425cf1a30Sjl 			mp->epm_timer_in_use = 1;
56525cf1a30Sjl 		}
56625cf1a30Sjl 	}
56725cf1a30Sjl }
56825cf1a30Sjl 
/*
 * Reinitialize the connection. The old fmd_xprt_t handle must be
 * removed/closed first.
 * Assume caller holds lock on epm_lock.
 *
 * On return the endpoint has no fmd transport handle, no endpoint nvlist,
 * no pending timer and no outgoing connection, and both epm_cstat and
 * epm_qstat are back to their UNINITIALIZED values.
 */
static void
etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	/*
	 * To avoid a deadlock, wait for etm_send to finish before
	 * calling fmd_xprt_close()
	 */
	/* pthread_cond_wait() releases epm_lock while waiting on epm_tx_cv */
	while (mp->epm_txbusy)
		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);

	if (mp->epm_xprthdl != NULL) {
		fmd_xprt_close(hdl, mp->epm_xprthdl);
		fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str);
		mp->epm_xprthdl = NULL;
		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
		mp->epm_ep_nvl = NULL;
	}

	/* Cancel any pending reconnect timer */
	if (mp->epm_timer_in_use) {
		fmd_timer_remove(hdl, mp->epm_timer_id);
		mp->epm_timer_in_use = 0;
	}

	/* Drop the outgoing connection, if there is one */
	if (mp->epm_oconn != NULL) {
		(void) etm_xport_close(hdl, mp->epm_oconn);
		mp->epm_oconn = NULL;
	}

	mp->epm_cstat = C_UNINITIALIZED;
	mp->epm_qstat = Q_UNINITIALIZED;
}
60525cf1a30Sjl 
60625cf1a30Sjl /*
60725cf1a30Sjl  * Receive data from ETM transport layer.
60825cf1a30Sjl  * Note : This is not the fmdo_recv entry point.
60925cf1a30Sjl  *
61025cf1a30Sjl  */
61125cf1a30Sjl static int
61225cf1a30Sjl etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp)
61325cf1a30Sjl {
61425cf1a30Sjl 	size_t buflen, hdrlen;
61525cf1a30Sjl 	void *buf;
61625cf1a30Sjl 	char hbuf[ETM_HDRLEN];
61725cf1a30Sjl 	int hdrstat, rv;
61825cf1a30Sjl 
61925cf1a30Sjl 	hdrlen = ETM_HDRLEN;
62025cf1a30Sjl 
62125cf1a30Sjl 	if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) {
62225cf1a30Sjl 		fmd_hdl_debug(hdl, "failed to read header from %s",
62325cf1a30Sjl 		    mp->epm_ep_str);
62425cf1a30Sjl 		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
62525cf1a30Sjl 		return (EIO);
62625cf1a30Sjl 	}
62725cf1a30Sjl 
62825cf1a30Sjl 	hdrstat = etm_check_hdr(hdl, mp, hbuf);
62925cf1a30Sjl 
63025cf1a30Sjl 	switch (hdrstat) {
63125cf1a30Sjl 	case ETM_HDR_INVALID:
63225cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
63325cf1a30Sjl 		if (mp->epm_cstat == C_OPEN)
63425cf1a30Sjl 			mp->epm_cstat = C_CLOSED;
63525cf1a30Sjl 		(void) pthread_mutex_unlock(&mp->epm_lock);
63625cf1a30Sjl 
63725cf1a30Sjl 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
63825cf1a30Sjl 		rv = ECANCELED;
63925cf1a30Sjl 		break;
64025cf1a30Sjl 
64125cf1a30Sjl 	case ETM_HDR_BADTYPE:
64225cf1a30Sjl 	case ETM_HDR_BADVERSION:
64325cf1a30Sjl 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0);
64425cf1a30Sjl 
64525cf1a30Sjl 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
64625cf1a30Sjl 		    hdrlen)) != hdrlen) {
64725cf1a30Sjl 			fmd_hdl_debug(hdl, "failed to write NAK to %s",
64825cf1a30Sjl 			    mp->epm_ep_str);
64925cf1a30Sjl 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
65025cf1a30Sjl 			return (EIO);
65125cf1a30Sjl 		}
65225cf1a30Sjl 
65325cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
65425cf1a30Sjl 		mp->epm_cstat = C_LIMBO;
65525cf1a30Sjl 		(void) pthread_mutex_unlock(&mp->epm_lock);
65625cf1a30Sjl 
65725cf1a30Sjl 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
65825cf1a30Sjl 		rv = ENOTSUP;
65925cf1a30Sjl 		break;
66025cf1a30Sjl 
66125cf1a30Sjl 	case ETM_HDR_C_HELLO:
66225cf1a30Sjl 		/* Client is initiating a startup handshake */
66325cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
66425cf1a30Sjl 		etm_reinit(hdl, mp);
66525cf1a30Sjl 		mp->epm_qstat = Q_INIT_PENDING;
66625cf1a30Sjl 		(void) pthread_mutex_unlock(&mp->epm_lock);
66725cf1a30Sjl 
66825cf1a30Sjl 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0);
66925cf1a30Sjl 
67025cf1a30Sjl 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
67125cf1a30Sjl 		    hdrlen)) != hdrlen) {
67225cf1a30Sjl 			fmd_hdl_debug(hdl, "failed to write S_HELLO to %s",
67325cf1a30Sjl 			    mp->epm_ep_str);
67425cf1a30Sjl 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
67525cf1a30Sjl 			return (EIO);
67625cf1a30Sjl 		}
67725cf1a30Sjl 
67825cf1a30Sjl 		rv = 0;
67925cf1a30Sjl 		break;
68025cf1a30Sjl 
68125cf1a30Sjl 	case ETM_HDR_ACK:
68225cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
68325cf1a30Sjl 		if (mp->epm_qstat == Q_INIT_PENDING) {
68425cf1a30Sjl 			/* This is client's ACK from startup handshake */
68525cf1a30Sjl 			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
68625cf1a30Sjl 			if (mp->epm_ep_nvl == NULL)
68725cf1a30Sjl 				(void) etm_get_ep_nvl(hdl, mp);
68825cf1a30Sjl 
68925cf1a30Sjl 			/*
69025cf1a30Sjl 			 * Call fmd_xprt_open and fmd_xprt_setspecific with
69125cf1a30Sjl 			 * Etm_mod_lock held to avoid race with etm_send thread.
69225cf1a30Sjl 			 */
69325cf1a30Sjl 			(void) pthread_mutex_lock(&Etm_mod_lock);
69425cf1a30Sjl 			if ((mp->epm_xprthdl = fmd_xprt_open(hdl,
69525cf1a30Sjl 			    mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) {
69625cf1a30Sjl 				fmd_hdl_abort(hdl, "Failed to init xprthdl "
69725cf1a30Sjl 				    "for %s", mp->epm_ep_str);
69825cf1a30Sjl 			}
69925cf1a30Sjl 			fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
70025cf1a30Sjl 			(void) pthread_mutex_unlock(&Etm_mod_lock);
70125cf1a30Sjl 
70225cf1a30Sjl 			mp->epm_qstat = Q_OPEN;
70325cf1a30Sjl 			(void) pthread_mutex_unlock(&mp->epm_lock);
70425cf1a30Sjl 			fmd_hdl_debug(hdl, "queue open for %s",
70525cf1a30Sjl 			    mp->epm_ep_str);
70625cf1a30Sjl 		} else {
70725cf1a30Sjl 			(void) pthread_mutex_unlock(&mp->epm_lock);
70825cf1a30Sjl 			fmd_hdl_debug(hdl, "protocol error, not expecting ACK "
70925cf1a30Sjl 			    "from %s\n", mp->epm_ep_str);
71025cf1a30Sjl 			INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
71125cf1a30Sjl 		}
71225cf1a30Sjl 
71325cf1a30Sjl 		rv = 0;
71425cf1a30Sjl 		break;
71525cf1a30Sjl 
71625cf1a30Sjl 	case ETM_HDR_SHUTDOWN:
71725cf1a30Sjl 		fmd_hdl_debug(hdl, "received shutdown from %s",
71825cf1a30Sjl 		    mp->epm_ep_str);
71925cf1a30Sjl 
72025cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
72125cf1a30Sjl 
72225cf1a30Sjl 		etm_reinit(hdl, mp);
72325cf1a30Sjl 
72425cf1a30Sjl 		if (IS_CLIENT(mp)) {
72525cf1a30Sjl 			/*
72625cf1a30Sjl 			 * A server shutdown is considered to be temporary.
72725cf1a30Sjl 			 * Prepare for reconnection.
72825cf1a30Sjl 			 */
72925cf1a30Sjl 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
73025cf1a30Sjl 			    Reconn_interval);
73125cf1a30Sjl 
73225cf1a30Sjl 			mp->epm_timer_in_use = 1;
73325cf1a30Sjl 		}
73425cf1a30Sjl 
73525cf1a30Sjl 		(void) pthread_mutex_unlock(&mp->epm_lock);
73625cf1a30Sjl 
73725cf1a30Sjl 		rv = ECANCELED;
73825cf1a30Sjl 		break;
73925cf1a30Sjl 
74025cf1a30Sjl 	case ETM_HDR_MSG:
74125cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
74225cf1a30Sjl 		if (mp->epm_qstat == Q_UNINITIALIZED) {
74325cf1a30Sjl 			/* Peer (client) is unaware that we've restarted */
74425cf1a30Sjl 			(void) pthread_mutex_unlock(&mp->epm_lock);
74525cf1a30Sjl 			hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
74625cf1a30Sjl 			    ETM_HDR_S_RESTART, 0);
74725cf1a30Sjl 
74825cf1a30Sjl 			if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
74925cf1a30Sjl 			    hdrlen)) != hdrlen) {
75025cf1a30Sjl 				fmd_hdl_debug(hdl, "failed to write S_RESTART "
75125cf1a30Sjl 				    "to %s", mp->epm_ep_str);
75225cf1a30Sjl 				INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
75325cf1a30Sjl 				return (EIO);
75425cf1a30Sjl 			}
75525cf1a30Sjl 
75625cf1a30Sjl 			return (ECANCELED);
75725cf1a30Sjl 		}
75825cf1a30Sjl 		(void) pthread_mutex_unlock(&mp->epm_lock);
75925cf1a30Sjl 
76025cf1a30Sjl 		buflen = etm_get_msglen(hbuf);
76125cf1a30Sjl 		ALLOC_BUF(hdl, buf, buflen);
76225cf1a30Sjl 
76325cf1a30Sjl 		if (etm_xport_read(hdl, conn, Rw_timeout, buf,
76425cf1a30Sjl 		    buflen) != buflen) {
76525cf1a30Sjl 			fmd_hdl_debug(hdl, "failed to read message from %s",
76625cf1a30Sjl 			    mp->epm_ep_str);
76725cf1a30Sjl 			FREE_BUF(hdl, buf, buflen);
76825cf1a30Sjl 			INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
76925cf1a30Sjl 			return (EIO);
77025cf1a30Sjl 		}
77125cf1a30Sjl 
77225cf1a30Sjl 		INCRSTAT(Etm_stats.read_msg.fmds_value.ui64);
77325cf1a30Sjl 		ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen);
77425cf1a30Sjl 
77525cf1a30Sjl 		etm_hex_dump(hdl, buf, buflen, 0);
77625cf1a30Sjl 
77725cf1a30Sjl 		if (etm_post_msg(hdl, mp, buf, buflen)) {
77825cf1a30Sjl 			INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64);
77925cf1a30Sjl 			FREE_BUF(hdl, buf, buflen);
78025cf1a30Sjl 			return (EIO);
78125cf1a30Sjl 		}
78225cf1a30Sjl 
78325cf1a30Sjl 		FREE_BUF(hdl, buf, buflen);
78425cf1a30Sjl 
78525cf1a30Sjl 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
78625cf1a30Sjl 
78725cf1a30Sjl 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
78825cf1a30Sjl 		    hdrlen)) != hdrlen) {
78925cf1a30Sjl 			fmd_hdl_debug(hdl, "failed to write ACK to %s",
79025cf1a30Sjl 			    mp->epm_ep_str);
79125cf1a30Sjl 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
79225cf1a30Sjl 			return (EIO);
79325cf1a30Sjl 		}
79425cf1a30Sjl 
79525cf1a30Sjl 		INCRSTAT(Etm_stats.write_ack.fmds_value.ui64);
79625cf1a30Sjl 
79725cf1a30Sjl 		/*
79825cf1a30Sjl 		 * If we got this far and the current state of the
79925cf1a30Sjl 		 * outbound/sending connection is TIMED_OUT or
80025cf1a30Sjl 		 * LIMBO, then we should reinitialize it.
80125cf1a30Sjl 		 */
80225cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
80325cf1a30Sjl 		if (mp->epm_cstat == C_TIMED_OUT ||
80425cf1a30Sjl 		    mp->epm_cstat == C_LIMBO) {
80525cf1a30Sjl 			if (mp->epm_oconn != NULL) {
80625cf1a30Sjl 				(void) etm_xport_close(hdl, mp->epm_oconn);
80725cf1a30Sjl 				mp->epm_oconn = NULL;
80825cf1a30Sjl 			}
80925cf1a30Sjl 			mp->epm_cstat = C_UNINITIALIZED;
81025cf1a30Sjl 			fmd_xprt_resume(hdl, mp->epm_xprthdl);
81125cf1a30Sjl 			if (mp->epm_timer_in_use) {
81225cf1a30Sjl 				fmd_timer_remove(hdl, mp->epm_timer_id);
81325cf1a30Sjl 				mp->epm_timer_in_use = 0;
81425cf1a30Sjl 			}
81525cf1a30Sjl 			mp->epm_qstat = Q_OPEN;
81625cf1a30Sjl 			fmd_hdl_debug(hdl, "queue resumed for %s",
81725cf1a30Sjl 			    mp->epm_ep_str);
81825cf1a30Sjl 		}
81925cf1a30Sjl 		(void) pthread_mutex_unlock(&mp->epm_lock);
82025cf1a30Sjl 
82125cf1a30Sjl 		rv = 0;
82225cf1a30Sjl 		break;
82325cf1a30Sjl 
82425cf1a30Sjl 	default:
82525cf1a30Sjl 		fmd_hdl_debug(hdl, "protocol error, unexpected header "
82625cf1a30Sjl 		    "from %s : %d", mp->epm_ep_str, hdrstat);
82725cf1a30Sjl 		INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
82825cf1a30Sjl 		rv = 0;
82925cf1a30Sjl 	}
83025cf1a30Sjl 
83125cf1a30Sjl 	return (rv);
83225cf1a30Sjl }
83325cf1a30Sjl 
83425cf1a30Sjl /*
83525cf1a30Sjl  * ETM transport layer callback function.
83625cf1a30Sjl  * The transport layer calls this function to :
83725cf1a30Sjl  *	(a) pass an incoming message (flag == ETM_CBFLAG_RECV)
83825cf1a30Sjl  *	(b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT)
83925cf1a30Sjl  */
84025cf1a30Sjl static int
84125cf1a30Sjl etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
84225cf1a30Sjl     void *arg)
84325cf1a30Sjl {
84425cf1a30Sjl 	etm_epmap_t *mp = (etm_epmap_t *)arg;
84525cf1a30Sjl 	int rv = 0;
84625cf1a30Sjl 
84725cf1a30Sjl 	(void) pthread_mutex_lock(&Etm_mod_lock);
84825cf1a30Sjl 	if (Etm_exit) {
84925cf1a30Sjl 		(void) pthread_mutex_unlock(&Etm_mod_lock);
85025cf1a30Sjl 		return (ECANCELED);
85125cf1a30Sjl 	}
85225cf1a30Sjl 	(void) pthread_mutex_unlock(&Etm_mod_lock);
85325cf1a30Sjl 
85425cf1a30Sjl 	switch (flag) {
85525cf1a30Sjl 	case ETM_CBFLAG_RECV:
85625cf1a30Sjl 		rv = etm_recv(hdl, conn, mp);
85725cf1a30Sjl 		break;
85825cf1a30Sjl 	case ETM_CBFLAG_REINIT:
85925cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
86025cf1a30Sjl 		etm_reinit(hdl, mp);
86125cf1a30Sjl 		(void) pthread_mutex_unlock(&mp->epm_lock);
86225cf1a30Sjl 		/*
86325cf1a30Sjl 		 * Return ECANCELED so the transport layer will close the
86425cf1a30Sjl 		 * server connection.  The transport layer is responsible for
86525cf1a30Sjl 		 * reestablishing this connection (should a connection request
86625cf1a30Sjl 		 * arrive from the peer).
86725cf1a30Sjl 		 */
86825cf1a30Sjl 		rv = ECANCELED;
86925cf1a30Sjl 		break;
87025cf1a30Sjl 	default:
87125cf1a30Sjl 		fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag);
87225cf1a30Sjl 		rv = ENOTSUP;
87325cf1a30Sjl 	}
87425cf1a30Sjl 
87525cf1a30Sjl 	return (rv);
87625cf1a30Sjl }
87725cf1a30Sjl 
87825cf1a30Sjl /*
87925cf1a30Sjl  * Allocate and initialize an etm_epmap_t struct for the given endpoint
88025cf1a30Sjl  * name string.
88125cf1a30Sjl  */
88225cf1a30Sjl static void
88325cf1a30Sjl etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags)
88425cf1a30Sjl {
88525cf1a30Sjl 	etm_epmap_t *newmap;
88625cf1a30Sjl 
88725cf1a30Sjl 	if (etm_check_dup_ep_str(hdl, epname)) {
88825cf1a30Sjl 		fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname);
88925cf1a30Sjl 		return;
89025cf1a30Sjl 	}
89125cf1a30Sjl 
89225cf1a30Sjl 	newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP);
89325cf1a30Sjl 	newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP);
89425cf1a30Sjl 	newmap->epm_xprtflags = flags;
89525cf1a30Sjl 	newmap->epm_cstat = C_UNINITIALIZED;
89625cf1a30Sjl 	newmap->epm_qstat = Q_UNINITIALIZED;
89725cf1a30Sjl 	newmap->epm_ver = ETM_PROTO_V1;	/* Currently support one proto ver */
89825cf1a30Sjl 	newmap->epm_txbusy = 0;
89925cf1a30Sjl 
90025cf1a30Sjl 	(void) pthread_mutex_init(&newmap->epm_lock, NULL);
90125cf1a30Sjl 	(void) pthread_cond_init(&newmap->epm_tx_cv, NULL);
90225cf1a30Sjl 
90325cf1a30Sjl 	if (etm_get_ep_nvl(hdl, newmap)) {
90425cf1a30Sjl 		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
90525cf1a30Sjl 		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
90625cf1a30Sjl 		return;
90725cf1a30Sjl 	}
90825cf1a30Sjl 
90925cf1a30Sjl 	if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str,
91025cf1a30Sjl 	    etm_cb_func, newmap)) == NULL) {
91125cf1a30Sjl 		fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n",
91225cf1a30Sjl 		    newmap->epm_ep_str);
91325cf1a30Sjl 		etm_free_ep_nvl(hdl, newmap);
91425cf1a30Sjl 		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
91525cf1a30Sjl 		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
91625cf1a30Sjl 		return;
91725cf1a30Sjl 	}
91825cf1a30Sjl 
91925cf1a30Sjl 	if (IS_CLIENT(newmap)) {
92025cf1a30Sjl 		if (etm_handle_startup(hdl, newmap)) {
921154b1f02Sjrutt 			/*
922154b1f02Sjrutt 			 * For whatever reason, we could not complete the
923154b1f02Sjrutt 			 * startup handshake with the server.  Set the timer
924154b1f02Sjrutt 			 * and try again.
925154b1f02Sjrutt 			 */
926154b1f02Sjrutt 			if (newmap->epm_oconn != NULL) {
927154b1f02Sjrutt 				(void) etm_xport_close(hdl, newmap->epm_oconn);
928154b1f02Sjrutt 				newmap->epm_oconn = NULL;
929154b1f02Sjrutt 			}
930154b1f02Sjrutt 			newmap->epm_cstat = C_UNINITIALIZED;
931154b1f02Sjrutt 			newmap->epm_qstat = Q_UNINITIALIZED;
932154b1f02Sjrutt 			newmap->epm_timer_id = fmd_timer_install(hdl, newmap,
933154b1f02Sjrutt 			    NULL, Reconn_interval);
934154b1f02Sjrutt 			newmap->epm_timer_in_use = 1;
93525cf1a30Sjl 		}
93625cf1a30Sjl 	}
93725cf1a30Sjl 
93825cf1a30Sjl 	/* Add this transport instance handle to the list */
93925cf1a30Sjl 	newmap->epm_next = Epmap_head;
94025cf1a30Sjl 	Epmap_head = newmap;
94125cf1a30Sjl 
94225cf1a30Sjl 	INCRSTAT(Etm_stats.peer_count.fmds_value.ui64);
94325cf1a30Sjl }
94425cf1a30Sjl 
94525cf1a30Sjl /*
94625cf1a30Sjl  * Parse the given property list string and call etm_init_epmap
94725cf1a30Sjl  * for each endpoint.
94825cf1a30Sjl  */
94925cf1a30Sjl static void
95025cf1a30Sjl etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags)
95125cf1a30Sjl {
95225cf1a30Sjl 	char *epstr, *ep, *prefix, *lasts, *numstr;
95325cf1a30Sjl 	char epname[MAXPATHLEN];
95425cf1a30Sjl 	size_t slen, nlen;
95525cf1a30Sjl 	int beg, end, i;
95625cf1a30Sjl 
95725cf1a30Sjl 	if (eplist == NULL)
95825cf1a30Sjl 		return;
95925cf1a30Sjl 	/*
96025cf1a30Sjl 	 * Create a copy of eplist for parsing.
96125cf1a30Sjl 	 * strtok/strtok_r(3C) will insert null chars to the string.
96225cf1a30Sjl 	 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used.
96325cf1a30Sjl 	 */
96425cf1a30Sjl 	slen = strlen(eplist);
96525cf1a30Sjl 	epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP);
96625cf1a30Sjl 	(void) strcpy(epstr, eplist);
96725cf1a30Sjl 
96825cf1a30Sjl 	/*
96925cf1a30Sjl 	 * The following are supported for the "client_list" and
97025cf1a30Sjl 	 * "server_list" properties :
97125cf1a30Sjl 	 *
97225cf1a30Sjl 	 *    A space-separated list of endpoints.
97325cf1a30Sjl 	 *	"dev:///dom0 dev:///dom1 dev:///dom2"
97425cf1a30Sjl 	 *
97525cf1a30Sjl 	 *    An array syntax for a range of instances.
97625cf1a30Sjl 	 *	"dev:///dom[0:2]"
97725cf1a30Sjl 	 *
97825cf1a30Sjl 	 *    A combination of both.
97925cf1a30Sjl 	 *	"dev:///dom0 dev:///dom[1:2]"
98025cf1a30Sjl 	 */
98125cf1a30Sjl 	ep = strtok_r(epstr, " ", &lasts);
98225cf1a30Sjl 	while (ep != NULL) {
98325cf1a30Sjl 		if (strchr(ep, '[') != NULL) {
98425cf1a30Sjl 			/*
98525cf1a30Sjl 			 * This string is using array syntax.
98625cf1a30Sjl 			 * Check the string for correct syntax.
98725cf1a30Sjl 			 */
98825cf1a30Sjl 			if ((strchr(ep, ':') == NULL) ||
98925cf1a30Sjl 			    (strchr(ep, ']') == NULL)) {
99025cf1a30Sjl 				fmd_hdl_error(hdl, "Syntax error in property "
99125cf1a30Sjl 				    "that includes : %s\n", ep);
99225cf1a30Sjl 				ep = strtok_r(NULL, " ", &lasts);
99325cf1a30Sjl 				continue;
99425cf1a30Sjl 			}
99525cf1a30Sjl 
99625cf1a30Sjl 			/* expand the array syntax */
99725cf1a30Sjl 			prefix = strtok(ep, "[");
99825cf1a30Sjl 
99925cf1a30Sjl 			numstr = strtok(NULL, ":");
100025cf1a30Sjl 			if ((numstr == NULL) || (!isdigit(*numstr))) {
100125cf1a30Sjl 				fmd_hdl_error(hdl, "Syntax error in property "
100225cf1a30Sjl 				    "that includes : %s[\n", prefix);
100325cf1a30Sjl 				ep = strtok_r(NULL, " ", &lasts);
100425cf1a30Sjl 				continue;
100525cf1a30Sjl 			}
100625cf1a30Sjl 			beg = atoi(numstr);
100725cf1a30Sjl 
100825cf1a30Sjl 			numstr = strtok(NULL, "]");
100925cf1a30Sjl 			if ((numstr == NULL) || (!isdigit(*numstr))) {
101025cf1a30Sjl 				fmd_hdl_error(hdl, "Syntax error in property "
101125cf1a30Sjl 				    "that includes : %s[\n", prefix);
101225cf1a30Sjl 				ep = strtok_r(NULL, " ", &lasts);
101325cf1a30Sjl 				continue;
101425cf1a30Sjl 			}
101525cf1a30Sjl 			end = atoi(numstr);
101625cf1a30Sjl 
101725cf1a30Sjl 			nlen = strlen(prefix) + ETM_EP_INST_MAX;
101825cf1a30Sjl 
101925cf1a30Sjl 			if (nlen > MAXPATHLEN) {
102025cf1a30Sjl 				fmd_hdl_error(hdl, "Endpoint prop string "
102125cf1a30Sjl 				    "exceeds MAXPATHLEN\n");
102225cf1a30Sjl 				ep = strtok_r(NULL, " ", &lasts);
102325cf1a30Sjl 				continue;
102425cf1a30Sjl 			}
102525cf1a30Sjl 
102625cf1a30Sjl 			for (i = beg; i <= end; i++) {
102725cf1a30Sjl 				bzero(epname, MAXPATHLEN);
102825cf1a30Sjl 				(void) snprintf(epname, nlen, "%s%d",
102925cf1a30Sjl 				    prefix, i);
103025cf1a30Sjl 				etm_init_epmap(hdl, epname, flags);
103125cf1a30Sjl 			}
103225cf1a30Sjl 		} else {
103325cf1a30Sjl 			etm_init_epmap(hdl, ep, flags);
103425cf1a30Sjl 		}
103525cf1a30Sjl 
103625cf1a30Sjl 		ep = strtok_r(NULL, " ", &lasts);
103725cf1a30Sjl 	}
103825cf1a30Sjl 
103925cf1a30Sjl 	fmd_hdl_free(hdl, epstr, slen + 1);
104025cf1a30Sjl }
104125cf1a30Sjl 
104225cf1a30Sjl /*
104325cf1a30Sjl  * Free the transport infrastructure for an endpoint.
104425cf1a30Sjl  */
104525cf1a30Sjl static void
104625cf1a30Sjl etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp)
104725cf1a30Sjl {
104825cf1a30Sjl 	size_t hdrlen;
104925cf1a30Sjl 	char hbuf[ETM_HDRLEN];
105025cf1a30Sjl 
105125cf1a30Sjl 	(void) pthread_mutex_lock(&mp->epm_lock);
105225cf1a30Sjl 
105325cf1a30Sjl 	/*
105425cf1a30Sjl 	 * If an etm_send thread is in progress, wait for it to finish.
105525cf1a30Sjl 	 * The etm_recv thread is managed by the transport layer and will
105625cf1a30Sjl 	 * be destroyed with etm_xport_fini().
105725cf1a30Sjl 	 */
105825cf1a30Sjl 	while (mp->epm_txbusy)
105925cf1a30Sjl 		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
106025cf1a30Sjl 
106125cf1a30Sjl 	if (mp->epm_timer_in_use)
106225cf1a30Sjl 		fmd_timer_remove(hdl, mp->epm_timer_id);
106325cf1a30Sjl 
106425cf1a30Sjl 	if (mp->epm_oconn != NULL) {
106525cf1a30Sjl 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
106625cf1a30Sjl 		    ETM_HDR_SHUTDOWN, 0);
106725cf1a30Sjl 		(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
106825cf1a30Sjl 		    hdrlen);
106925cf1a30Sjl 		(void) etm_xport_close(hdl, mp->epm_oconn);
107025cf1a30Sjl 		mp->epm_oconn = NULL;
107125cf1a30Sjl 	}
107225cf1a30Sjl 
107325cf1a30Sjl 	if (mp->epm_xprthdl != NULL) {
107425cf1a30Sjl 		fmd_xprt_close(hdl, mp->epm_xprthdl);
107525cf1a30Sjl 		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
107625cf1a30Sjl 		mp->epm_ep_nvl = NULL;
107725cf1a30Sjl 	}
107825cf1a30Sjl 
107925cf1a30Sjl 	if (mp->epm_ep_nvl != NULL)
108025cf1a30Sjl 		etm_free_ep_nvl(hdl, mp);
108125cf1a30Sjl 
108225cf1a30Sjl 	if (mp->epm_tlhdl != NULL)
108325cf1a30Sjl 		(void) etm_xport_fini(hdl, mp->epm_tlhdl);
108425cf1a30Sjl 
108525cf1a30Sjl 	(void) pthread_mutex_unlock(&mp->epm_lock);
108625cf1a30Sjl 	(void) pthread_mutex_destroy(&mp->epm_lock);
108725cf1a30Sjl 	fmd_hdl_strfree(hdl, mp->epm_ep_str);
108825cf1a30Sjl 	fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t));
108925cf1a30Sjl 	DECRSTAT(Etm_stats.peer_count.fmds_value.ui64);
109025cf1a30Sjl }
109125cf1a30Sjl 
109225cf1a30Sjl /*
109325cf1a30Sjl  * FMD entry points
109425cf1a30Sjl  */
109525cf1a30Sjl 
109625cf1a30Sjl /*
109725cf1a30Sjl  * FMD fmdo_send entry point.
109825cf1a30Sjl  * Send an event to the remote endpoint and receive an ACK.
109925cf1a30Sjl  */
110025cf1a30Sjl static int
110125cf1a30Sjl etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl)
110225cf1a30Sjl {
110325cf1a30Sjl 	etm_epmap_t *mp;
110425cf1a30Sjl 	nvlist_t *msgnvl;
1105*77a7fd96Sjrutt 	int hdrstat, rv, cnt = 0;
110625cf1a30Sjl 	char *buf, *nvbuf, *class;
110725cf1a30Sjl 	size_t nvsize, buflen, hdrlen;
1108*77a7fd96Sjrutt 	struct timespec tms;
110925cf1a30Sjl 
111025cf1a30Sjl 	(void) pthread_mutex_lock(&Etm_mod_lock);
111125cf1a30Sjl 	if (Etm_exit) {
111225cf1a30Sjl 		(void) pthread_mutex_unlock(&Etm_mod_lock);
111325cf1a30Sjl 		return (FMD_SEND_RETRY);
111425cf1a30Sjl 	}
111525cf1a30Sjl 	(void) pthread_mutex_unlock(&Etm_mod_lock);
111625cf1a30Sjl 
111725cf1a30Sjl 	mp = fmd_xprt_getspecific(hdl, xprthdl);
111825cf1a30Sjl 
1119*77a7fd96Sjrutt 	for (;;) {
1120*77a7fd96Sjrutt 		if (pthread_mutex_trylock(&mp->epm_lock) == 0) {
1121*77a7fd96Sjrutt 			break;
1122*77a7fd96Sjrutt 		} else {
1123*77a7fd96Sjrutt 			/*
1124*77a7fd96Sjrutt 			 * Another thread may be (1) trying to close this
1125*77a7fd96Sjrutt 			 * fmd_xprt_t, or (2) posting an event to it.
1126*77a7fd96Sjrutt 			 * If (1), don't want to spend too much time here.
1127*77a7fd96Sjrutt 			 * If (2), allow it to finish and release epm_lock.
1128*77a7fd96Sjrutt 			 */
1129*77a7fd96Sjrutt 			if (cnt++ < 10) {
1130*77a7fd96Sjrutt 				tms.tv_sec = 0;
1131*77a7fd96Sjrutt 				tms.tv_nsec = (cnt * 10000);
1132*77a7fd96Sjrutt 				(void) nanosleep(&tms, NULL);
1133*77a7fd96Sjrutt 
1134*77a7fd96Sjrutt 			} else {
1135*77a7fd96Sjrutt 				return (FMD_SEND_RETRY);
1136*77a7fd96Sjrutt 			}
1137*77a7fd96Sjrutt 		}
1138*77a7fd96Sjrutt 	}
113925cf1a30Sjl 
114025cf1a30Sjl 	mp->epm_txbusy++;
114125cf1a30Sjl 
1142154b1f02Sjrutt 	if (mp->epm_qstat == Q_UNINITIALIZED) {
114325cf1a30Sjl 		mp->epm_txbusy--;
114425cf1a30Sjl 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
114545736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
1146154b1f02Sjrutt 		return (FMD_SEND_FAILED);
1147154b1f02Sjrutt 	}
1148154b1f02Sjrutt 
1149154b1f02Sjrutt 	if (mp->epm_cstat == C_CLOSED) {
115025cf1a30Sjl 		etm_suspend_reconnect(hdl, mp);
1151154b1f02Sjrutt 		mp->epm_txbusy--;
1152154b1f02Sjrutt 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
115345736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
115425cf1a30Sjl 		return (FMD_SEND_RETRY);
115525cf1a30Sjl 	}
115625cf1a30Sjl 
115725cf1a30Sjl 	if (mp->epm_cstat == C_LIMBO) {
115825cf1a30Sjl 		if (mp->epm_oconn != NULL) {
115925cf1a30Sjl 			(void) etm_xport_close(hdl, mp->epm_oconn);
116025cf1a30Sjl 			mp->epm_oconn = NULL;
116125cf1a30Sjl 		}
116225cf1a30Sjl 
116325cf1a30Sjl 		fmd_xprt_suspend(hdl, xprthdl);
116425cf1a30Sjl 		mp->epm_qstat = Q_SUSPENDED;
116525cf1a30Sjl 		mp->epm_txbusy--;
116625cf1a30Sjl 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
116745736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
116825cf1a30Sjl 		fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);
116925cf1a30Sjl 		return (FMD_SEND_RETRY);
117025cf1a30Sjl 	}
117125cf1a30Sjl 
117225cf1a30Sjl 	if (mp->epm_oconn == NULL) {
117325cf1a30Sjl 		if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl))
117425cf1a30Sjl 		    == NULL) {
1175154b1f02Sjrutt 			etm_suspend_reconnect(hdl, mp);
117625cf1a30Sjl 			mp->epm_txbusy--;
117725cf1a30Sjl 			(void) pthread_cond_broadcast(&mp->epm_tx_cv);
117845736083Sjrutt 			(void) pthread_mutex_unlock(&mp->epm_lock);
117925cf1a30Sjl 			return (FMD_SEND_RETRY);
118025cf1a30Sjl 		} else {
118125cf1a30Sjl 			mp->epm_cstat = C_OPEN;
118225cf1a30Sjl 		}
118325cf1a30Sjl 	}
118425cf1a30Sjl 
118525cf1a30Sjl 	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0)
118625cf1a30Sjl 		fmd_hdl_abort(hdl, "No class string in nvlist");
118725cf1a30Sjl 
118825cf1a30Sjl 	msgnvl = fmd_xprt_translate(hdl, xprthdl, ep);
118925cf1a30Sjl 	if (msgnvl == NULL) {
1190154b1f02Sjrutt 		mp->epm_txbusy--;
1191154b1f02Sjrutt 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
119245736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
119325cf1a30Sjl 		fmd_hdl_error(hdl, "Failed to translate event %p\n",
119425cf1a30Sjl 		    (void *) ep);
119525cf1a30Sjl 		return (FMD_SEND_FAILED);
119625cf1a30Sjl 	}
119725cf1a30Sjl 
1198*77a7fd96Sjrutt 	rv = etm_xport_send_filter(hdl, msgnvl, mp->epm_ep_str);
1199*77a7fd96Sjrutt 	if (rv == ETM_XPORT_FILTER_DROP) {
1200*77a7fd96Sjrutt 		mp->epm_txbusy--;
1201*77a7fd96Sjrutt 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1202*77a7fd96Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
1203*77a7fd96Sjrutt 		fmd_hdl_debug(hdl, "send_filter dropped event");
1204*77a7fd96Sjrutt 		nvlist_free(msgnvl);
1205*77a7fd96Sjrutt 		INCRSTAT(Etm_stats.send_filter.fmds_value.ui64);
1206*77a7fd96Sjrutt 		return (FMD_SEND_SUCCESS);
1207*77a7fd96Sjrutt 	} else if (rv == ETM_XPORT_FILTER_ERROR) {
1208*77a7fd96Sjrutt 		fmd_hdl_debug(hdl, "send_filter error : %s", strerror(errno));
1209*77a7fd96Sjrutt 		INCRSTAT(Etm_stats.error_send_filter.fmds_value.ui64);
1210*77a7fd96Sjrutt 		/* Still send event */
1211*77a7fd96Sjrutt 	}
1212*77a7fd96Sjrutt 
121325cf1a30Sjl 	(void) pthread_mutex_unlock(&mp->epm_lock);
121425cf1a30Sjl 
121525cf1a30Sjl 	(void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR);
121625cf1a30Sjl 
121725cf1a30Sjl 	hdrlen = ETM_HDRLEN;
121825cf1a30Sjl 	buflen = nvsize + hdrlen;
121925cf1a30Sjl 
122025cf1a30Sjl 	ALLOC_BUF(hdl, buf, buflen);
122125cf1a30Sjl 
122225cf1a30Sjl 	nvbuf = buf + hdrlen;
122325cf1a30Sjl 
122425cf1a30Sjl 	(void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize);
122525cf1a30Sjl 
122625cf1a30Sjl 	if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) {
1227154b1f02Sjrutt 		(void) pthread_mutex_lock(&mp->epm_lock);
1228154b1f02Sjrutt 		mp->epm_txbusy--;
1229154b1f02Sjrutt 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
123045736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
123125cf1a30Sjl 		fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv));
1232*77a7fd96Sjrutt 		nvlist_free(msgnvl);
123325cf1a30Sjl 		FREE_BUF(hdl, buf, buflen);
123425cf1a30Sjl 		return (FMD_SEND_FAILED);
123525cf1a30Sjl 	}
123625cf1a30Sjl 
123725cf1a30Sjl 	nvlist_free(msgnvl);
123825cf1a30Sjl 
123925cf1a30Sjl 	if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf,
124025cf1a30Sjl 	    buflen) != buflen) {
1241154b1f02Sjrutt 		fmd_hdl_debug(hdl, "failed to send message to %s",
1242154b1f02Sjrutt 		    mp->epm_ep_str);
124325cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
1244154b1f02Sjrutt 		etm_suspend_reconnect(hdl, mp);
124525cf1a30Sjl 		mp->epm_txbusy--;
124625cf1a30Sjl 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
124745736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
124825cf1a30Sjl 		FREE_BUF(hdl, buf, buflen);
124925cf1a30Sjl 		INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
125025cf1a30Sjl 		return (FMD_SEND_RETRY);
125125cf1a30Sjl 	}
125225cf1a30Sjl 
125325cf1a30Sjl 	INCRSTAT(Etm_stats.write_msg.fmds_value.ui64);
125425cf1a30Sjl 	ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize);
125525cf1a30Sjl 
125625cf1a30Sjl 	etm_hex_dump(hdl, nvbuf, nvsize, 1);
125725cf1a30Sjl 
125825cf1a30Sjl 	if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf,
125925cf1a30Sjl 	    hdrlen) != hdrlen) {
1260154b1f02Sjrutt 		fmd_hdl_debug(hdl, "failed to read ACK from %s",
1261154b1f02Sjrutt 		    mp->epm_ep_str);
126225cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
1263154b1f02Sjrutt 		etm_suspend_reconnect(hdl, mp);
126425cf1a30Sjl 		mp->epm_txbusy--;
126525cf1a30Sjl 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
126645736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
126725cf1a30Sjl 		FREE_BUF(hdl, buf, buflen);
126825cf1a30Sjl 		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
126925cf1a30Sjl 		return (FMD_SEND_RETRY);
127025cf1a30Sjl 	}
127125cf1a30Sjl 
127225cf1a30Sjl 	hdrstat = etm_check_hdr(hdl, mp, buf);
127325cf1a30Sjl 	FREE_BUF(hdl, buf, buflen);
127425cf1a30Sjl 
127525cf1a30Sjl 	if (hdrstat == ETM_HDR_ACK) {
127625cf1a30Sjl 		INCRSTAT(Etm_stats.read_ack.fmds_value.ui64);
127725cf1a30Sjl 	} else {
127825cf1a30Sjl 		(void) pthread_mutex_lock(&mp->epm_lock);
127925cf1a30Sjl 
128025cf1a30Sjl 		(void) etm_xport_close(hdl, mp->epm_oconn);
128125cf1a30Sjl 		mp->epm_oconn = NULL;
128225cf1a30Sjl 
128325cf1a30Sjl 		if (hdrstat == ETM_HDR_NAK) {
128425cf1a30Sjl 			/* Peer received a bad value in the header */
128525cf1a30Sjl 			if (mp->epm_xprthdl != NULL) {
128625cf1a30Sjl 				mp->epm_cstat = C_LIMBO;
128725cf1a30Sjl 				fmd_xprt_suspend(hdl, xprthdl);
128825cf1a30Sjl 				mp->epm_qstat = Q_SUSPENDED;
128925cf1a30Sjl 				fmd_hdl_debug(hdl, "received NAK, queue "
129025cf1a30Sjl 				    "suspended for %s", mp->epm_ep_str);
129125cf1a30Sjl 			}
129225cf1a30Sjl 
129325cf1a30Sjl 			rv = FMD_SEND_RETRY;
129425cf1a30Sjl 
129525cf1a30Sjl 		} else if (hdrstat == ETM_HDR_S_RESTART) {
129625cf1a30Sjl 			/* Server has restarted */
1297154b1f02Sjrutt 			mp->epm_cstat = C_CLOSED;
1298154b1f02Sjrutt 			mp->epm_qstat = Q_UNINITIALIZED;
1299154b1f02Sjrutt 			fmd_hdl_debug(hdl, "server %s restarted",
1300154b1f02Sjrutt 			    mp->epm_ep_str);
1301154b1f02Sjrutt 			/*
1302154b1f02Sjrutt 			 * Cannot call fmd_xprt_close here, so we'll do it
1303154b1f02Sjrutt 			 * on the timeout thread.
1304154b1f02Sjrutt 			 */
1305154b1f02Sjrutt 			if (mp->epm_timer_in_use == 0) {
1306154b1f02Sjrutt 				mp->epm_timer_id = fmd_timer_install(
1307154b1f02Sjrutt 				    hdl, mp, NULL, 0);
1308154b1f02Sjrutt 				mp->epm_timer_in_use = 1;
130925cf1a30Sjl 			}
131025cf1a30Sjl 
131125cf1a30Sjl 			/*
131225cf1a30Sjl 			 * fault.* or list.* events will be replayed if a
131325cf1a30Sjl 			 * transport is opened with the same auth.
131425cf1a30Sjl 			 * Other events will be discarded.
131525cf1a30Sjl 			 */
131625cf1a30Sjl 			rv = FMD_SEND_FAILED;
131725cf1a30Sjl 
131825cf1a30Sjl 		} else {
131925cf1a30Sjl 			mp->epm_cstat = C_CLOSED;
132025cf1a30Sjl 			fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str);
132125cf1a30Sjl 
132225cf1a30Sjl 			rv = FMD_SEND_RETRY;
132325cf1a30Sjl 		}
132425cf1a30Sjl 
132525cf1a30Sjl 		mp->epm_txbusy--;
132625cf1a30Sjl 
132725cf1a30Sjl 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
132845736083Sjrutt 		(void) pthread_mutex_unlock(&mp->epm_lock);
132925cf1a30Sjl 
133025cf1a30Sjl 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
133125cf1a30Sjl 
133225cf1a30Sjl 		return (rv);
133325cf1a30Sjl 	}
133425cf1a30Sjl 
133525cf1a30Sjl 	(void) pthread_mutex_lock(&mp->epm_lock);
133625cf1a30Sjl 	mp->epm_txbusy--;
133725cf1a30Sjl 	(void) pthread_cond_broadcast(&mp->epm_tx_cv);
133845736083Sjrutt 	(void) pthread_mutex_unlock(&mp->epm_lock);
133925cf1a30Sjl 
134025cf1a30Sjl 	return (FMD_SEND_SUCCESS);
134125cf1a30Sjl }
134225cf1a30Sjl 
134325cf1a30Sjl /*
134425cf1a30Sjl  * FMD fmdo_timeout entry point..
134525cf1a30Sjl  */
134625cf1a30Sjl /*ARGSUSED*/
134725cf1a30Sjl static void
134825cf1a30Sjl etm_timeout(fmd_hdl_t *hdl, id_t id, void *data)
134925cf1a30Sjl {
135025cf1a30Sjl 	etm_epmap_t *mp = (etm_epmap_t *)data;
135125cf1a30Sjl 
135225cf1a30Sjl 	(void) pthread_mutex_lock(&mp->epm_lock);
135325cf1a30Sjl 
135425cf1a30Sjl 	mp->epm_timer_in_use = 0;
135525cf1a30Sjl 
135625cf1a30Sjl 	if (mp->epm_qstat == Q_UNINITIALIZED) {
135725cf1a30Sjl 		/* Server has shutdown and we (client) need to reconnect */
1358154b1f02Sjrutt 		if (mp->epm_xprthdl != NULL) {
1359154b1f02Sjrutt 			fmd_xprt_close(hdl, mp->epm_xprthdl);
1360154b1f02Sjrutt 			fmd_hdl_debug(hdl, "queue closed for %s",
1361154b1f02Sjrutt 			    mp->epm_ep_str);
1362154b1f02Sjrutt 			mp->epm_xprthdl = NULL;
1363154b1f02Sjrutt 			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1364154b1f02Sjrutt 			mp->epm_ep_nvl = NULL;
1365154b1f02Sjrutt 		}
1366154b1f02Sjrutt 
136725cf1a30Sjl 		if (mp->epm_ep_nvl == NULL)
136825cf1a30Sjl 			(void) etm_get_ep_nvl(hdl, mp);
136925cf1a30Sjl 
137025cf1a30Sjl 		if (etm_handle_startup(hdl, mp)) {
1371154b1f02Sjrutt 			if (mp->epm_oconn != NULL) {
1372154b1f02Sjrutt 				(void) etm_xport_close(hdl, mp->epm_oconn);
1373154b1f02Sjrutt 				mp->epm_oconn = NULL;
1374154b1f02Sjrutt 			}
1375154b1f02Sjrutt 			mp->epm_cstat = C_UNINITIALIZED;
137625cf1a30Sjl 			mp->epm_qstat = Q_UNINITIALIZED;
137725cf1a30Sjl 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
137825cf1a30Sjl 			    Reconn_interval);
137925cf1a30Sjl 			mp->epm_timer_in_use = 1;
138025cf1a30Sjl 		}
138125cf1a30Sjl 	} else {
138225cf1a30Sjl 		etm_reconnect(hdl, mp);
138325cf1a30Sjl 	}
138425cf1a30Sjl 
138525cf1a30Sjl 	(void) pthread_mutex_unlock(&mp->epm_lock);
138625cf1a30Sjl }
138725cf1a30Sjl 
138825cf1a30Sjl /*
138925cf1a30Sjl  * FMD Module declarations
139025cf1a30Sjl  */
139125cf1a30Sjl static const fmd_hdl_ops_t etm_ops = {
139225cf1a30Sjl 	NULL,		/* fmdo_recv */
139325cf1a30Sjl 	etm_timeout,	/* fmdo_timeout */
139425cf1a30Sjl 	NULL,		/* fmdo_close */
139525cf1a30Sjl 	NULL,		/* fmdo_stats */
139625cf1a30Sjl 	NULL,		/* fmdo_gc */
139725cf1a30Sjl 	etm_send,	/* fmdo_send */
139825cf1a30Sjl };
139925cf1a30Sjl 
140025cf1a30Sjl static const fmd_prop_t etm_props[] = {
140125cf1a30Sjl 	{ "client_list", FMD_TYPE_STRING, NULL },
140225cf1a30Sjl 	{ "server_list", FMD_TYPE_STRING, NULL },
140325cf1a30Sjl 	{ "reconnect_interval",	FMD_TYPE_UINT64, "10000000000" },
1404*77a7fd96Sjrutt 	{ "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" },
1405*77a7fd96Sjrutt 	{ "rw_timeout", FMD_TYPE_UINT64, "2000000000" },
1406*77a7fd96Sjrutt 	{ "filter_path", FMD_TYPE_STRING, NULL },
140725cf1a30Sjl 	{ NULL, 0, NULL }
140825cf1a30Sjl };
140925cf1a30Sjl 
141025cf1a30Sjl static const fmd_hdl_info_t etm_info = {
141125cf1a30Sjl 	"Event Transport Module", "2.0", &etm_ops, etm_props
141225cf1a30Sjl };
141325cf1a30Sjl 
141425cf1a30Sjl /*
141525cf1a30Sjl  * Initialize the transport for use by ETM.
141625cf1a30Sjl  */
141725cf1a30Sjl void
141825cf1a30Sjl _fmd_init(fmd_hdl_t *hdl)
141925cf1a30Sjl {
142025cf1a30Sjl 	char *propstr;
142125cf1a30Sjl 
142225cf1a30Sjl 	if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) {
142325cf1a30Sjl 		return; /* invalid data in configuration file */
142425cf1a30Sjl 	}
142525cf1a30Sjl 
142625cf1a30Sjl 	/* Create global stats */
142725cf1a30Sjl 	(void) fmd_stat_create(hdl, FMD_STAT_NOALLOC,
142825cf1a30Sjl 	    sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats);
142925cf1a30Sjl 
143025cf1a30Sjl 	/* Get module properties */
143125cf1a30Sjl 	Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout");
143225cf1a30Sjl 	Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval");
143325cf1a30Sjl 	Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout");
143425cf1a30Sjl 
143525cf1a30Sjl 	propstr = fmd_prop_get_string(hdl, "client_list");
143625cf1a30Sjl 	etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS);
143725cf1a30Sjl 	fmd_prop_free_string(hdl, propstr);
143825cf1a30Sjl 
143925cf1a30Sjl 	propstr = fmd_prop_get_string(hdl, "server_list");
144025cf1a30Sjl 	etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS);
144125cf1a30Sjl 	fmd_prop_free_string(hdl, propstr);
144225cf1a30Sjl 
144325cf1a30Sjl 	if (Etm_stats.peer_count.fmds_value.ui64 == 0) {
144425cf1a30Sjl 		fmd_hdl_debug(hdl, "Failed to init any endpoint\n");
144525cf1a30Sjl 		fmd_hdl_unregister(hdl);
144625cf1a30Sjl 		return;
144725cf1a30Sjl 	}
144825cf1a30Sjl }
144925cf1a30Sjl 
145025cf1a30Sjl /*
145125cf1a30Sjl  * Teardown the transport
145225cf1a30Sjl  */
145325cf1a30Sjl void
145425cf1a30Sjl _fmd_fini(fmd_hdl_t *hdl)
145525cf1a30Sjl {
145625cf1a30Sjl 	etm_epmap_t *mp, *next;
145725cf1a30Sjl 
145825cf1a30Sjl 	(void) pthread_mutex_lock(&Etm_mod_lock);
145925cf1a30Sjl 	Etm_exit = 1;
146025cf1a30Sjl 	(void) pthread_mutex_unlock(&Etm_mod_lock);
146125cf1a30Sjl 
146225cf1a30Sjl 	mp = Epmap_head;
146325cf1a30Sjl 
146425cf1a30Sjl 	while (mp) {
146525cf1a30Sjl 		next = mp->epm_next;
146625cf1a30Sjl 		etm_free_epmap(hdl, mp);
146725cf1a30Sjl 		mp = next;
146825cf1a30Sjl 	}
146925cf1a30Sjl 
147025cf1a30Sjl 	fmd_hdl_unregister(hdl);
147125cf1a30Sjl }
1472