125cf1a3jl/*
225cf1a3jl * CDDL HEADER START
325cf1a3jl *
425cf1a3jl * The contents of this file are subject to the terms of the
525cf1a3jl * Common Development and Distribution License (the "License").
625cf1a3jl * You may not use this file except in compliance with the License.
725cf1a3jl *
825cf1a3jl * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
925cf1a3jl * or http://www.opensolaris.org/os/licensing.
1025cf1a3jl * See the License for the specific language governing permissions
1125cf1a3jl * and limitations under the License.
1225cf1a3jl *
1325cf1a3jl * When distributing Covered Code, include this CDDL HEADER in each
1425cf1a3jl * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
1525cf1a3jl * If applicable, add the following below this CDDL HEADER, with the
1625cf1a3jl * fields enclosed by brackets "[]" replaced with your own identifying
1725cf1a3jl * information: Portions Copyright [yyyy] [name of copyright owner]
1825cf1a3jl *
1925cf1a3jl * CDDL HEADER END
2025cf1a3jl */
2125cf1a3jl
2225cf1a3jl/*
2325cf1a3jl * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
2425cf1a3jl * Use is subject to license terms.
2525cf1a3jl */
2625cf1a3jl
2725cf1a3jl#pragma ident	"%Z%%M%	%I%	%E% SMI"
2825cf1a3jl
2925cf1a3jl/*
3025cf1a3jl * FMA Event Transport Module
3125cf1a3jl *
3225cf1a3jl * Plugin for sending/receiving FMA events to/from a remote endoint.
3325cf1a3jl */
3425cf1a3jl
3525cf1a3jl#include <netinet/in.h>
3677a7fd9jrutt#include <errno.h>
3725cf1a3jl#include <sys/fm/protocol.h>
3825cf1a3jl#include <sys/sysmacros.h>
3925cf1a3jl#include <pthread.h>
4025cf1a3jl#include <strings.h>
4125cf1a3jl#include <ctype.h>
4225cf1a3jl#include <link.h>
4325cf1a3jl#include <libnvpair.h>
4425cf1a3jl#include "etm_xport_api.h"
4525cf1a3jl#include "etm_proto.h"
4625cf1a3jl
4725cf1a3jl/*
4825cf1a3jl * ETM declarations
4925cf1a3jl */
5025cf1a3jl
5125cf1a3jltypedef enum etm_connection_status {
5225cf1a3jl	C_UNINITIALIZED = 0,
5325cf1a3jl	C_OPEN,				/* Connection is open */
5425cf1a3jl	C_CLOSED,			/* Connection is closed */
5525cf1a3jl	C_LIMBO,			/* Bad value in header from peer */
5625cf1a3jl	C_TIMED_OUT			/* Reconnection to peer timed out */
5725cf1a3jl} etm_connstat_t;
5825cf1a3jl
5925cf1a3jltypedef enum etm_fmd_queue_status {
6025cf1a3jl	Q_UNINITIALIZED = 100,
6125cf1a3jl	Q_INIT_PENDING,			/* Queue initialization in progress */
6225cf1a3jl	Q_OPEN,				/* Queue is open */
6325cf1a3jl	Q_SUSPENDED			/* Queue is suspended */
6425cf1a3jl} etm_qstat_t;
6525cf1a3jl
6625cf1a3jl/* Per endpoint data */
6725cf1a3jltypedef struct etm_endpoint_map {
6825cf1a3jl	uint8_t epm_ver;		/* Protocol version being used */
6925cf1a3jl	char *epm_ep_str;		/* Endpoint ID string */
7025cf1a3jl	int epm_xprtflags;		/* FMD transport open flags */
7125cf1a3jl	etm_xport_hdl_t epm_tlhdl;	/* Transport Layer instance handle */
7225cf1a3jl	pthread_mutex_t epm_lock;	/* Protects remainder of struct */
7325cf1a3jl	pthread_cond_t epm_tx_cv;	/* Cond var for send/transmit */
7425cf1a3jl	int epm_txbusy;			/* Busy doing send/transmit */
7525cf1a3jl	fmd_xprt_t *epm_xprthdl;	/* FMD transport handle */
7625cf1a3jl	etm_qstat_t epm_qstat;		/* Status of fmd xprt queue */
7725cf1a3jl	nvlist_t *epm_ep_nvl;		/* Endpoint ID nv_list */
7825cf1a3jl	etm_xport_conn_t epm_oconn;	/* Connection for outgoing events */
7925cf1a3jl	etm_connstat_t epm_cstat;	/* Status of connection */
8025cf1a3jl	id_t epm_timer_id;		/* Timer id */
8125cf1a3jl	int epm_timer_in_use;		/* Indicates if timer is in use */
8225cf1a3jl	hrtime_t epm_reconn_end;	/* Reconnection end time */
8325cf1a3jl	struct etm_endpoint_map *epm_next;
8425cf1a3jl} etm_epmap_t;
8525cf1a3jl
8625cf1a3jl#define	ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1)
8725cf1a3jl#define	ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2)
8825cf1a3jl#define	ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3)
8925cf1a3jl#define	ETM_EP_INST_MAX 4		/* Max chars in endpt instance */
9025cf1a3jl#define	ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR
9125cf1a3jl#define	ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT)
9225cf1a3jl
9325cf1a3jl#define	ALLOC_BUF(hdl, buf, size) \
9425cf1a3jl	buf = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP);
9525cf1a3jl
9625cf1a3jl#define	FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size));
9725cf1a3jl
9825cf1a3jl#define	IS_CLIENT(mp)	(((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1)
9925cf1a3jl
10025cf1a3jl#define	INCRSTAT(x)	{	(void) pthread_mutex_lock(&Etm_mod_lock);   \
10125cf1a3jl				(x)++;					    \
10225cf1a3jl				(void) pthread_mutex_unlock(&Etm_mod_lock); \
10325cf1a3jl			}
10425cf1a3jl
10525cf1a3jl#define	DECRSTAT(x)	{	(void) pthread_mutex_lock(&Etm_mod_lock);   \
10625cf1a3jl				(x)--;					    \
10725cf1a3jl				(void) pthread_mutex_unlock(&Etm_mod_lock); \
10825cf1a3jl			}
10925cf1a3jl
11025cf1a3jl#define	ADDSTAT(x, y)	{	(void) pthread_mutex_lock(&Etm_mod_lock);   \
11125cf1a3jl				(x) += (y);				    \
11225cf1a3jl				(void) pthread_mutex_unlock(&Etm_mod_lock); \
11325cf1a3jl			}
11425cf1a3jl
11525cf1a3jl/*
11625cf1a3jl * Global variables
11725cf1a3jl */
11825cf1a3jlstatic pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER;
11925cf1a3jl					/* Protects globals */
12025cf1a3jlstatic hrtime_t Reconn_interval;	/* Time between reconnection attempts */
12125cf1a3jlstatic hrtime_t Reconn_timeout;		/* Time allowed for reconnection */
12225cf1a3jlstatic hrtime_t Rw_timeout;		/* Time allowed for I/O operation  */
12325cf1a3jlstatic int Etm_dump = 0;		/* Enables hex dump for debug */
12425cf1a3jlstatic int Etm_exit = 0;		/* Flag for exit */
12525cf1a3jlstatic etm_epmap_t *Epmap_head = NULL;	/* Head of list of epmap structs */
12625cf1a3jl
12725cf1a3jl/* Module statistics */
12825cf1a3jlstatic struct etm_stats {
12925cf1a3jl	/* read counters */
13025cf1a3jl	fmd_stat_t read_ack;
13125cf1a3jl	fmd_stat_t read_bytes;
13225cf1a3jl	fmd_stat_t read_msg;
13377a7fd9jrutt	fmd_stat_t post_filter;
13425cf1a3jl	/* write counters */
13525cf1a3jl	fmd_stat_t write_ack;
13625cf1a3jl	fmd_stat_t write_bytes;
13725cf1a3jl	fmd_stat_t write_msg;
13877a7fd9jrutt	fmd_stat_t send_filter;
13925cf1a3jl	/* error counters */
14025cf1a3jl	fmd_stat_t error_protocol;
14125cf1a3jl	fmd_stat_t error_drop_read;
14225cf1a3jl	fmd_stat_t error_read;
14325cf1a3jl	fmd_stat_t error_read_badhdr;
14425cf1a3jl	fmd_stat_t error_write;
14577a7fd9jrutt	fmd_stat_t error_send_filter;
14677a7fd9jrutt	fmd_stat_t error_post_filter;
14725cf1a3jl	/* misc */
14825cf1a3jl	fmd_stat_t peer_count;
14925cf1a3jl
15025cf1a3jl} Etm_stats = {
15125cf1a3jl	/* read counters */
15225cf1a3jl	{ "read_ack", FMD_TYPE_UINT64, "ACKs read" },
15325cf1a3jl	{ "read_bytes", FMD_TYPE_UINT64, "Bytes read" },
15425cf1a3jl	{ "read_msg", FMD_TYPE_UINT64, "Messages read" },
15577a7fd9jrutt	{ "post_filter", FMD_TYPE_UINT64, "Drops by post_filter" },
15625cf1a3jl	/* write counters */
15725cf1a3jl	{ "write_ack", FMD_TYPE_UINT64, "ACKs sent" },
15825cf1a3jl	{ "write_bytes", FMD_TYPE_UINT64, "Bytes sent" },
15925cf1a3jl	{ "write_msg", FMD_TYPE_UINT64, "Messages sent" },
16077a7fd9jrutt	{ "send_filter", FMD_TYPE_UINT64, "Drops by send_filter" },
16125cf1a3jl	/* ETM error counters */
16225cf1a3jl	{ "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" },
16325cf1a3jl	{ "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" },
16425cf1a3jl	{ "error_read", FMD_TYPE_UINT64, "Read I/O errors" },
16525cf1a3jl	{ "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" },
16625cf1a3jl	{ "error_write", FMD_TYPE_UINT64, "Write I/O errors" },
16777a7fd9jrutt	{ "error_send_filter", FMD_TYPE_UINT64, "Send filter errors" },
16877a7fd9jrutt	{ "error_post_filter", FMD_TYPE_UINT64, "Post filter errors" },
16925cf1a3jl	/* ETM Misc */
17025cf1a3jl	{ "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" },
17125cf1a3jl};
17225cf1a3jl
17325cf1a3jl/*
17425cf1a3jl * ETM Private functions
17525cf1a3jl */
17625cf1a3jl
17725cf1a3jl/*
17825cf1a3jl * Hex dump for debug.
17925cf1a3jl */
18025cf1a3jlstatic void
18125cf1a3jletm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction)
18225cf1a3jl{
18325cf1a3jl	int i, j, k;
18425cf1a3jl	int16_t *c;
18525cf1a3jl
18625cf1a3jl	if (Etm_dump == 0)
18725cf1a3jl		return;
18825cf1a3jl
18925cf1a3jl	j = buflen / 16;	/* Number of complete 8-column rows */
19025cf1a3jl	k = buflen % 16;	/* Is there a last (non-8-column) row? */
19125cf1a3jl
19225cf1a3jl	if (direction)
19325cf1a3jl		fmd_hdl_debug(hdl, "--- WRITE Message Dump ---");
19425cf1a3jl	else
19525cf1a3jl		fmd_hdl_debug(hdl, "---  READ Message Dump ---");
19625cf1a3jl
19725cf1a3jl	fmd_hdl_debug(hdl, "   Displaying %d bytes", buflen);
19825cf1a3jl
19925cf1a3jl	/* Dump the complete 8-column rows */
20025cf1a3jl	for (i = 0; i < j; i++) {
20125cf1a3jl		c = (int16_t *)buf + (i * 8);
20225cf1a3jl		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x %4x %4x", i,
20325cf1a3jl		    *(c+0), *(c+1), *(c+2), *(c+3),
20425cf1a3jl		    *(c+4), *(c+5), *(c+6), *(c+7));
20525cf1a3jl	}
20625cf1a3jl
20725cf1a3jl	/* Dump the last (incomplete) row */
20825cf1a3jl	c = (int16_t *)buf + (i * 8);
20925cf1a3jl	switch (k) {
21025cf1a3jl	case 4:
21125cf1a3jl		fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1));
21225cf1a3jl		break;
21325cf1a3jl	case 8:
21425cf1a3jl		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1),
21525cf1a3jl		    *(c+2), *(c+3));
21625cf1a3jl		break;
21725cf1a3jl	case 12:
21825cf1a3jl		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x", i, *(c+0),
21925cf1a3jl		    *(c+1), *(c+2), *(c+3), *(c+4), *(c+5));
22025cf1a3jl		break;
22125cf1a3jl	}
22225cf1a3jl
22325cf1a3jl	fmd_hdl_debug(hdl, "---      End Dump      ---");
22425cf1a3jl}
22525cf1a3jl
22625cf1a3jl/*
22725cf1a3jl * Provide the length of a message based on the data in the given ETM header.
22825cf1a3jl */
22925cf1a3jlstatic size_t
23025cf1a3jletm_get_msglen(void *buf)
23125cf1a3jl{
23225cf1a3jl	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
23325cf1a3jl
23425cf1a3jl	return (ntohl(hp->hdr_msglen));
23525cf1a3jl}
23625cf1a3jl
23725cf1a3jl/*
23825cf1a3jl * Check the contents of the ETM header for errors.
23925cf1a3jl * Return the header type (hdr_type).
24025cf1a3jl */
24125cf1a3jlstatic int
24225cf1a3jletm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf)
24325cf1a3jl{
24425cf1a3jl	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
24525cf1a3jl
24625cf1a3jl	if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) {
247154b1f0jrutt		fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s "
24825cf1a3jl		    ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim);
24925cf1a3jl		return (ETM_HDR_INVALID);
25025cf1a3jl	}
25125cf1a3jl
25225cf1a3jl	if ((hp->hdr_type == ETM_HDR_C_HELLO) ||
25325cf1a3jl	    (hp->hdr_type == ETM_HDR_S_HELLO)) {
25425cf1a3jl		/* Until version is negotiated, other fields may be wrong */
25525cf1a3jl		return (hp->hdr_type);
25625cf1a3jl	}
25725cf1a3jl
25825cf1a3jl	if (hp->hdr_ver != mp->epm_ver) {
259154b1f0jrutt		fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n",
26025cf1a3jl		    mp->epm_ep_str, hp->hdr_ver);
26125cf1a3jl		return (ETM_HDR_BADVERSION);
26225cf1a3jl	}
26325cf1a3jl
26425cf1a3jl	if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) ||
26525cf1a3jl	    (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) {
266154b1f0jrutt		fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n",
26725cf1a3jl		    mp->epm_ep_str, hp->hdr_type);
26825cf1a3jl		return (ETM_HDR_BADTYPE);
26925cf1a3jl	}
27025cf1a3jl
27125cf1a3jl	return (hp->hdr_type);
27225cf1a3jl}
27325cf1a3jl
27425cf1a3jl/*
27525cf1a3jl * Create an ETM header of a given type in the given buffer.
27625cf1a3jl * Return length of header.
27725cf1a3jl */
27825cf1a3jlstatic size_t
27925cf1a3jletm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen)
28025cf1a3jl{
28125cf1a3jl	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
28225cf1a3jl
28325cf1a3jl	bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN);
28425cf1a3jl	hp->hdr_ver = ver;
28525cf1a3jl	hp->hdr_type = type;
28625cf1a3jl	hp->hdr_msglen = htonl(msglen);
28725cf1a3jl
28825cf1a3jl	return (ETM_HDRLEN);
28925cf1a3jl}
29025cf1a3jl
29125cf1a3jl/*
29225cf1a3jl * Convert message bytes to nvlist and post to fmd.
29325cf1a3jl * Return zero for success, non-zero for failure.
29425cf1a3jl *
29525cf1a3jl * Note : nvl is free'd by fmd.
29625cf1a3jl */
29725cf1a3jlstatic int
29825cf1a3jletm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen)
29925cf1a3jl{
30025cf1a3jl	nvlist_t *nvl;
30125cf1a3jl	int rv;
30225cf1a3jl
30325cf1a3jl	if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) {
304154b1f0jrutt		fmd_hdl_error(hdl, "failed to unpack message");
30525cf1a3jl		return (1);
30625cf1a3jl	}
30725cf1a3jl
30877a7fd9jrutt	rv = etm_xport_post_filter(hdl, nvl, mp->epm_ep_str);
30977a7fd9jrutt	if (rv == ETM_XPORT_FILTER_DROP) {
31077a7fd9jrutt		fmd_hdl_debug(hdl, "post_filter dropped event");
31177a7fd9jrutt		INCRSTAT(Etm_stats.post_filter.fmds_value.ui64);
31277a7fd9jrutt		nvlist_free(nvl);
31377a7fd9jrutt		return (0);
31477a7fd9jrutt	} else if (rv == ETM_XPORT_FILTER_ERROR) {
31577a7fd9jrutt		fmd_hdl_debug(hdl, "post_filter error : %s", strerror(errno));
31677a7fd9jrutt		INCRSTAT(Etm_stats.error_post_filter.fmds_value.ui64);
31777a7fd9jrutt		/* Still post event */
31877a7fd9jrutt	}
31977a7fd9jrutt
32025cf1a3jl	(void) pthread_mutex_lock(&mp->epm_lock);
32125cf1a3jl	(void) pthread_mutex_lock(&Etm_mod_lock);
32225cf1a3jl	if (!Etm_exit) {
32325cf1a3jl		(void) pthread_mutex_unlock(&Etm_mod_lock);
32425cf1a3jl		if (mp->epm_qstat == Q_OPEN) {
32525cf1a3jl			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
32625cf1a3jl			rv = 0;
32725cf1a3jl		} else if (mp->epm_qstat == Q_SUSPENDED) {
32825cf1a3jl			fmd_xprt_resume(hdl, mp->epm_xprthdl);
32925cf1a3jl			if (mp->epm_timer_in_use) {
33025cf1a3jl				fmd_timer_remove(hdl, mp->epm_timer_id);
33125cf1a3jl				mp->epm_timer_in_use = 0;
33225cf1a3jl			}
33325cf1a3jl			mp->epm_qstat = Q_OPEN;
33425cf1a3jl			fmd_hdl_debug(hdl, "queue resumed for %s",
33525cf1a3jl			    mp->epm_ep_str);
33625cf1a3jl			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
33725cf1a3jl			rv = 0;
33825cf1a3jl		} else {
33925cf1a3jl			fmd_hdl_debug(hdl, "unable to post message, qstat = %d",
34025cf1a3jl			    mp->epm_qstat);
341154b1f0jrutt			nvlist_free(nvl);
342154b1f0jrutt			/* Remote peer will attempt to resend event */
34325cf1a3jl			rv = 2;
34425cf1a3jl		}
34525cf1a3jl	} else {
34625cf1a3jl		(void) pthread_mutex_unlock(&Etm_mod_lock);
34725cf1a3jl		fmd_hdl_debug(hdl, "unable to post message, module exiting");
348154b1f0jrutt		nvlist_free(nvl);
349154b1f0jrutt		/* Remote peer will attempt to resend event */
35025cf1a3jl		rv = 3;
35125cf1a3jl	}
35225cf1a3jl
35325cf1a3jl	(void) pthread_mutex_unlock(&mp->epm_lock);
35425cf1a3jl
35525cf1a3jl	return (rv);
35625cf1a3jl}
35725cf1a3jl
35825cf1a3jl/*
35925cf1a3jl * Handle the startup handshake to the server.  The client always initiates
36025cf1a3jl * the startup handshake.  In the following sequence, we are the client and
36125cf1a3jl * the remote endpoint is the server.
36225cf1a3jl *
36325cf1a3jl *	Client sends C_HELLO and transitions to Q_INIT_PENDING state.
36425cf1a3jl *	Server sends S_HELLO and transitions to Q_INIT_PENDING state.
36525cf1a3jl *	Client sends ACK and transitions to Q_OPEN state.
36625cf1a3jl *	Server receives ACK and transitions to Q_OPEN state.
36725cf1a3jl *
36825cf1a3jl * Return 0 for success, nonzero for failure.
36925cf1a3jl */
37025cf1a3jlstatic int
37125cf1a3jletm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp)
37225cf1a3jl{
37325cf1a3jl	etm_proto_hdr_t *hp;
37425cf1a3jl	size_t hdrlen = ETM_HDRLEN;
37525cf1a3jl	int hdrstat;
37625cf1a3jl	char hbuf[ETM_HDRLEN];
37725cf1a3jl
37825cf1a3jl	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
37925cf1a3jl		return (1);
38025cf1a3jl
38125cf1a3jl	mp->epm_cstat = C_OPEN;
38225cf1a3jl
38325cf1a3jl	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0);
38425cf1a3jl
38525cf1a3jl	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
38625cf1a3jl	    hdrlen)) != hdrlen) {
38725cf1a3jl		fmd_hdl_error(hdl, "Failed to write C_HELLO to %s",
38825cf1a3jl		    mp->epm_ep_str);
38925cf1a3jl		return (2);
39025cf1a3jl	}
39125cf1a3jl
39225cf1a3jl	mp->epm_qstat = Q_INIT_PENDING;
39325cf1a3jl
39425cf1a3jl	if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf,
39525cf1a3jl	    hdrlen)) != hdrlen) {
39625cf1a3jl		fmd_hdl_error(hdl, "Failed to read S_HELLO from %s",
39725cf1a3jl		    mp->epm_ep_str);
39825cf1a3jl		return (3);
39925cf1a3jl	}
40025cf1a3jl
40125cf1a3jl	hdrstat = etm_check_hdr(hdl, mp, hbuf);
40225cf1a3jl
40325cf1a3jl	if (hdrstat != ETM_HDR_S_HELLO) {
40425cf1a3jl		fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO "
40525cf1a3jl		    "from %s", mp->epm_ep_str);
40625cf1a3jl		return (4);
40725cf1a3jl	}
40825cf1a3jl
40925cf1a3jl	/*
41025cf1a3jl	 * Get version from the server.
41125cf1a3jl	 * Currently, only one version is supported.
41225cf1a3jl	 */
41325cf1a3jl	hp = (etm_proto_hdr_t *)(void *)hbuf;
41425cf1a3jl	if (hp->hdr_ver != ETM_PROTO_V1) {
41525cf1a3jl		fmd_hdl_error(hdl, "Unable to use same version as %s : %d",
41625cf1a3jl		    mp->epm_ep_str, hp->hdr_ver);
41725cf1a3jl		return (5);
41825cf1a3jl	}
41925cf1a3jl	mp->epm_ver = hp->hdr_ver;
42025cf1a3jl
42125cf1a3jl	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
42225cf1a3jl
42325cf1a3jl	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
42425cf1a3jl	    hdrlen)) != hdrlen) {
42525cf1a3jl		fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s",
42625cf1a3jl		    mp->epm_ep_str);
42725cf1a3jl		return (6);
42825cf1a3jl	}
42925cf1a3jl
43025cf1a3jl	/*
43125cf1a3jl	 * Call fmd_xprt_open and fmd_xprt_setspecific with
43225cf1a3jl	 * Etm_mod_lock held to avoid race with etm_send thread.
43325cf1a3jl	 */
43425cf1a3jl	(void) pthread_mutex_lock(&Etm_mod_lock);
43525cf1a3jl	if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags,
43625cf1a3jl	    mp->epm_ep_nvl, NULL)) == NULL) {
43725cf1a3jl		fmd_hdl_abort(hdl, "Failed to init xprthdl for %s",
43825cf1a3jl		    mp->epm_ep_str);
43925cf1a3jl	}
44025cf1a3jl	fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
44125cf1a3jl	(void) pthread_mutex_unlock(&Etm_mod_lock);
44225cf1a3jl
44325cf1a3jl	mp->epm_qstat = Q_OPEN;
44425cf1a3jl	fmd_hdl_debug(hdl, "queue open for %s",  mp->epm_ep_str);
44525cf1a3jl
44625cf1a3jl	return (0);
44725cf1a3jl}
44825cf1a3jl
44925cf1a3jl/*
450ac92251jrutt * Open a connection to the peer, send a SHUTDOWN message,
451ac92251jrutt * and close the connection.
452ac92251jrutt */
453ac92251jruttstatic void
454ac92251jruttetm_send_shutdown(fmd_hdl_t *hdl, etm_epmap_t *mp)
455ac92251jrutt{
456ac92251jrutt	size_t hdrlen = ETM_HDRLEN;
457ac92251jrutt	char hbuf[ETM_HDRLEN];
458ac92251jrutt
459ac92251jrutt	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
460ac92251jrutt		return;
461ac92251jrutt
462ac92251jrutt	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_SHUTDOWN, 0);
463ac92251jrutt
464ac92251jrutt	(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, hdrlen);
465ac92251jrutt
466ac92251jrutt	(void) etm_xport_close(hdl, mp->epm_oconn);
467ac92251jrutt	mp->epm_oconn = NULL;
468ac92251jrutt}
469ac92251jrutt
470ac92251jrutt/*
47125cf1a3jl * Alloc a nvlist and add a string for the endpoint.
47225cf1a3jl * Return zero for success, non-zero for failure.
47325cf1a3jl */
47425cf1a3jlstatic int
47525cf1a3jletm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
47625cf1a3jl{
47725cf1a3jl	/*
47825cf1a3jl	 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation
47925cf1a3jl	 * in fmd when this nvlist_t is free'd.
48025cf1a3jl	 */
48125cf1a3jl	(void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0);
48225cf1a3jl
48325cf1a3jl	if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) {
484154b1f0jrutt		fmd_hdl_error(hdl, "failed to add domain-id string to nvlist "
48525cf1a3jl		    "for %s", mp->epm_ep_str);
48625cf1a3jl		nvlist_free(mp->epm_ep_nvl);
48725cf1a3jl		return (1);
48825cf1a3jl	}
48925cf1a3jl
49025cf1a3jl	return (0);
49125cf1a3jl}
49225cf1a3jl
49325cf1a3jl/*
49425cf1a3jl * Free the nvlist for the endpoint_id string.
49525cf1a3jl */
49625cf1a3jl/*ARGSUSED*/
49725cf1a3jlstatic void
49825cf1a3jletm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
49925cf1a3jl{
50025cf1a3jl	nvlist_free(mp->epm_ep_nvl);
50125cf1a3jl}
50225cf1a3jl
50325cf1a3jl/*
50425cf1a3jl * Check for a duplicate endpoint/peer string.
50525cf1a3jl */
50625cf1a3jl/*ARGSUSED*/
50725cf1a3jlstatic int
50825cf1a3jletm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname)
50925cf1a3jl{
51025cf1a3jl	etm_epmap_t *mp;
51125cf1a3jl
51225cf1a3jl	for (mp = Epmap_head; mp != NULL; mp = mp->epm_next)
51325cf1a3jl		if (strcmp(epname, mp->epm_ep_str) == 0)
51425cf1a3jl			return (1);
51525cf1a3jl
51625cf1a3jl	return (0);
51725cf1a3jl}
51825cf1a3jl
51925cf1a3jl/*
52025cf1a3jl * Attempt to re-open a connection with the remote endpoint.
52125cf1a3jl */
52225cf1a3jlstatic void
52325cf1a3jletm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
52425cf1a3jl{
52525cf1a3jl	if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) {
52625cf1a3jl		if (gethrtime() < mp->epm_reconn_end) {
52725cf1a3jl			if ((mp->epm_oconn = etm_xport_open(hdl,
52825cf1a3jl			    mp->epm_tlhdl)) == NULL) {
52925cf1a3jl				fmd_hdl_debug(hdl, "reconnect failed for %s",
53025cf1a3jl				    mp->epm_ep_str);
53125cf1a3jl				mp->epm_timer_id = fmd_timer_install(hdl, mp,
53225cf1a3jl				    NULL, Reconn_interval);
53325cf1a3jl				mp->epm_timer_in_use = 1;
53425cf1a3jl			} else {
53525cf1a3jl				fmd_hdl_debug(hdl, "reconnect success for %s",
53625cf1a3jl				    mp->epm_ep_str);
53725cf1a3jl				mp->epm_reconn_end = 0;
53825cf1a3jl				mp->epm_cstat = C_OPEN;
53925cf1a3jl			}
54025cf1a3jl		} else {
54125cf1a3jl			fmd_hdl_error(hdl, "Reconnect timed out for %s\n",
54225cf1a3jl			    mp->epm_ep_str);
54325cf1a3jl			mp->epm_reconn_end = 0;
54425cf1a3jl			mp->epm_cstat = C_TIMED_OUT;
54525cf1a3jl		}
54625cf1a3jl	}
54725cf1a3jl
54825cf1a3jl	if (mp->epm_cstat == C_OPEN) {
54925cf1a3jl		fmd_xprt_resume(hdl, mp->epm_xprthdl);
55025cf1a3jl		mp->epm_qstat = Q_OPEN;
55125cf1a3jl		fmd_hdl_debug(hdl, "queue resumed for %s",  mp->epm_ep_str);
55225cf1a3jl	}
55325cf1a3jl}
55425cf1a3jl
55525cf1a3jl/*
55625cf1a3jl * Suspend a given connection and setup for reconnection retries.
557154b1f0jrutt * Assume caller holds lock on epm_lock.
55825cf1a3jl */
55925cf1a3jlstatic void
56025cf1a3jletm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
56125cf1a3jl{
56225cf1a3jl	(void) pthread_mutex_lock(&Etm_mod_lock);
56325cf1a3jl	if (Etm_exit) {
56425cf1a3jl		(void) pthread_mutex_unlock(&Etm_mod_lock);
56525cf1a3jl		return;
56625cf1a3jl	}
56725cf1a3jl	(void) pthread_mutex_unlock(&Etm_mod_lock);
56825cf1a3jl
56925cf1a3jl	if (mp->epm_oconn != NULL) {
57025cf1a3jl		(void) etm_xport_close(hdl, mp->epm_oconn);
57125cf1a3jl		mp->epm_oconn = NULL;
57225cf1a3jl	}
57325cf1a3jl
57425cf1a3jl	mp->epm_reconn_end = gethrtime() + Reconn_timeout;
57525cf1a3jl	mp->epm_cstat = C_UNINITIALIZED;
57625cf1a3jl
57725cf1a3jl	if (mp->epm_xprthdl != NULL) {
57825cf1a3jl		fmd_xprt_suspend(hdl, mp->epm_xprthdl);
57925cf1a3jl		mp->epm_qstat = Q_SUSPENDED;
58025cf1a3jl		fmd_hdl_debug(hdl, "queue suspended for %s",  mp->epm_ep_str);
58125cf1a3jl
58225cf1a3jl		if (mp->epm_timer_in_use == 0) {
58325cf1a3jl			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
58425cf1a3jl			    Reconn_interval);
58525cf1a3jl			mp->epm_timer_in_use = 1;
58625cf1a3jl		}
58725cf1a3jl	}
58825cf1a3jl}
58925cf1a3jl
59025cf1a3jl/*
59125cf1a3jl * Reinitialize the connection. The old fmd_xprt_t handle must be
59225cf1a3jl * removed/closed first.
59325cf1a3jl * Assume caller holds lock on epm_lock.
59425cf1a3jl */
59525cf1a3jlstatic void
59625cf1a3jletm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp)
59725cf1a3jl{
59825cf1a3jl	/*
59925cf1a3jl	 * To avoid a deadlock, wait for etm_send to finish before
60025cf1a3jl	 * calling fmd_xprt_close()
60125cf1a3jl	 */
60225cf1a3jl	while (mp->epm_txbusy)
60325cf1a3jl		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
60425cf1a3jl
60525cf1a3jl	if (mp->epm_xprthdl != NULL) {
60625cf1a3jl		fmd_xprt_close(hdl, mp->epm_xprthdl);
607154b1f0jrutt		fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str);
60825cf1a3jl		mp->epm_xprthdl = NULL;
60925cf1a3jl		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
61025cf1a3jl		mp->epm_ep_nvl = NULL;
61125cf1a3jl	}
61225cf1a3jl
61325cf1a3jl	if (mp->epm_timer_in_use) {
61425cf1a3jl		fmd_timer_remove(hdl, mp->epm_timer_id);
61525cf1a3jl		mp->epm_timer_in_use = 0;
61625cf1a3jl	}
61725cf1a3jl
61825cf1a3jl	if (mp->epm_oconn != NULL) {
61925cf1a3jl		(void) etm_xport_close(hdl, mp->epm_oconn);
62025cf1a3jl		mp->epm_oconn = NULL;
62125cf1a3jl	}
62225cf1a3jl
62325cf1a3jl	mp->epm_cstat = C_UNINITIALIZED;
62425cf1a3jl	mp->epm_qstat = Q_UNINITIALIZED;
62525cf1a3jl}
62625cf1a3jl
62725cf1a3jl/*
62825cf1a3jl * Receive data from ETM transport layer.
62925cf1a3jl * Note : This is not the fmdo_recv entry point.
63025cf1a3jl *
63125cf1a3jl */
63225cf1a3jlstatic int
63325cf1a3jletm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp)
63425cf1a3jl{
63525cf1a3jl	size_t buflen, hdrlen;
63625cf1a3jl	void *buf;
63725cf1a3jl	char hbuf[ETM_HDRLEN];
63825cf1a3jl	int hdrstat, rv;
63925cf1a3jl
64025cf1a3jl	hdrlen = ETM_HDRLEN;
64125cf1a3jl
64225cf1a3jl	if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) {
64325cf1a3jl		fmd_hdl_debug(hdl, "failed to read header from %s",
64425cf1a3jl		    mp->epm_ep_str);
64525cf1a3jl		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
64625cf1a3jl		return (EIO);
64725cf1a3jl	}
64825cf1a3jl
64925cf1a3jl	hdrstat = etm_check_hdr(hdl, mp, hbuf);
65025cf1a3jl
65125cf1a3jl	switch (hdrstat) {
65225cf1a3jl	case ETM_HDR_INVALID:
65325cf1a3jl		(void) pthread_mutex_lock(&mp->epm_lock);
65425cf1a3jl		if (mp->epm_cstat == C_OPEN)
65525cf1a3jl			mp->epm_cstat = C_CLOSED;
65625cf1a3jl		(void) pthread_mutex_unlock(&mp->epm_lock);
65725cf1a3jl
65825cf1a3jl		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
65925cf1a3jl		rv = ECANCELED;
66025cf1a3jl		break;
66125cf1a3jl
66225cf1a3jl	case ETM_HDR_BADTYPE:
66325cf1a3jl	case ETM_HDR_BADVERSION:
66425cf1a3jl		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0);
66525cf1a3jl
66625cf1a3jl		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
66725cf1a3jl		    hdrlen)) != hdrlen) {
66825cf1a3jl			fmd_hdl_debug(hdl, "failed to write NAK to %s",
66925cf1a3jl			    mp->epm_ep_str);
67025cf1a3jl			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
67125cf1a3jl			return (EIO);
67225cf1a3jl		}
67325cf1a3jl
67425cf1a3jl		(void) pthread_mutex_lock(&mp->epm_lock);
67525cf1a3jl		mp->epm_cstat = C_LIMBO;
67625cf1a3jl		(void) pthread_mutex_unlock(&mp->epm_lock);
67725cf1a3jl
67825cf1a3jl		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
67925cf1a3jl		rv = ENOTSUP;
68025cf1a3jl		break;
68125cf1a3jl
68225cf1a3jl	case ETM_HDR_C_HELLO:
68325cf1a3jl		/* Client is initiating a startup handshake */
68425cf1a3jl		(void) pthread_mutex_lock(&mp->epm_lock);
68525cf1a3jl		etm_reinit(hdl, mp);
68625cf1a3jl		mp->epm_qstat = Q_INIT_PENDING;
68725cf1a3jl		(void) pthread_mutex_unlock(&mp->epm_lock);
68825cf1a3jl
68925cf1a3jl		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0);
69025cf1a3jl
69125cf1a3jl		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
69225cf1a3jl		    hdrlen)) != hdrlen) {
69325cf1a3jl			fmd_hdl_debug(hdl, "failed to write S_HELLO to %s",
69425cf1a3jl			    mp->epm_ep_str);
69525cf1a3jl			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
69625cf1a3jl			return (EIO);
69725cf1a3jl		}
69825cf1a3jl
69925cf1a3jl		rv = 0;
70025cf1a3jl		break;
70125cf1a3jl
70225cf1a3jl	case ETM_HDR_ACK:
70325cf1a3jl		(void) pthread_mutex_lock(&mp->epm_lock);
70425cf1a3jl		if (mp->epm_qstat == Q_INIT_PENDING) {
70525cf1a3jl			/* This is client's ACK from startup handshake */
70625cf1a3jl			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
70725cf1a3jl			if (mp->epm_ep_nvl == NULL)
70825cf1a3jl				(void) etm_get_ep_nvl(hdl, mp);
70925cf1a3jl
71025cf1a3jl			/*
71125cf1a3jl			 * Call fmd_xprt_open and fmd_xprt_setspecific with
71225cf1a3jl			 * Etm_mod_lock held to avoid race with etm_send thread.
71325cf1a3jl			 */
71425cf1a3jl			(void) pthread_mutex_lock(&Etm_mod_lock);
71525cf1a3jl			if ((mp->epm_xprthdl = fmd_xprt_open(hdl,
71625cf1a3jl			    mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) {
71725cf1a3jl				fmd_hdl_abort(hdl, "Failed to init xprthdl "
71825cf1a3jl				    "for %s", mp->epm_ep_str);
71925cf1a3jl			}
72025cf1a3jl			fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
72125cf1a3jl			(void) pthread_mutex_unlock(&Etm_mod_lock);
72225cf1a3jl
72325cf1a3jl			mp->epm_qstat = Q_OPEN;
72425cf1a3jl			(void) pthread_mutex_unlock(&mp->epm_lock);
72525cf1a3jl			fmd_hdl_debug(hdl, "queue open for %s",
72625cf1a3jl			    mp->epm_ep_str);
72725cf1a3jl		} else {
72825cf1a3jl			(void) pthread_mutex_unlock(&mp->epm_lock);
72925cf1a3jl			fmd_hdl_debug(hdl, "protocol error, not expecting ACK "
73025cf1a3jl			    "from %s\n", mp->epm_ep_str);
73125cf1a3jl			INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
73225cf1a3jl		}
73325cf1a3jl
73425cf1a3jl		rv = 0;
73525cf1a3jl		break;
73625cf1a3jl
73725cf1a3jl	case ETM_HDR_SHUTDOWN:
73825cf1a3jl		fmd_hdl_debug(hdl, "received shutdown from %s",
73925cf1a3jl		    mp->epm_ep_str);
74025cf1a3jl
74125cf1a3jl		(void) pthread_mutex_lock(&mp->epm_lock);
74225cf1a3jl
74325cf1a3jl		etm_reinit(hdl, mp);
74425cf1a3jl
74525cf1a3jl		if (IS_CLIENT(mp)) {
74625cf1a3jl			/*
74725cf1a3jl			 * A server shutdown is considered to be temporary.
74825cf1a3jl			 * Prepare for reconnection.
74925cf1a3jl			 */
75025cf1a3jl			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
75125cf1a3jl			    Reconn_interval);
75225cf1a3jl
75325cf1a3jl			mp->epm_timer_in_use = 1;
75425cf1a3jl		}
75525cf1a3jl
75625cf1a3jl		(void) pthread_mutex_unlock(&mp->epm_lock);
75725cf1a3jl
75825cf1a3jl		rv = ECANCELED;
75925cf1a3jl		break;
76025cf1a3jl
76125cf1a3jl	case ETM_HDR_MSG:
76225cf1a3jl		(void) pthread_mutex_lock(&mp->epm_lock);
76325cf1a3jl		if (mp->epm_qstat == Q_UNINITIALIZED) {
76425cf1a3jl			/* Peer (client) is unaware that we've restarted */
76525cf1a3jl			(void) pthread_mutex_unlock(&mp->epm_lock);
76625cf1a3jl			hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
76725cf1a3jl			    ETM_HDR_S_RESTART, 0);
76825cf1a3jl
76925cf1a3jl			if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
77025cf1a3jl			    hdrlen)) != hdrlen) {
77125cf1a3jl				fmd_hdl_debug(hdl, "failed to write S_RESTART "
77225cf1a3jl				    "to %s", mp->epm_ep_str);
77325cf1a3jl				INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
77425cf1a3jl				return (EIO);
77525cf1a3jl			}
77625cf1a3jl
77725cf1a3jl			return (ECANCELED);
77825cf1a3jl		}
77925cf1a3jl		(void) pthread_mutex_unlock(&mp->epm_lock);
78025cf1a3jl
78125cf1a3jl		buflen = etm_get_msglen(hbuf);
78225cf1a3jl		ALLOC_BUF(hdl, buf, buflen);
78325cf1a3jl
78425cf1a3jl		if (etm_xport_read(hdl, conn, Rw_timeout, buf,
78525cf1a3jl		    buflen) != buflen) {
78625cf1a3jl			fmd_hdl_debug(hdl, "failed to read message from %s",
78725cf1a3jl			    mp->epm_ep_str);
78825cf1a3jl			FREE_BUF(hdl, buf, buflen);
78925cf1a3jl			INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
79025cf1a3jl			return (EIO);
79125cf1a3jl		}
79225cf1a3jl
79325cf1a3jl		INCRSTAT(Etm_stats.read_msg.fmds_value.ui64);
79425cf1a3jl		ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen);
79525cf1a3jl
79625cf1a3jl		etm_hex_dump(hdl, buf, buflen, 0);
79725cf1a3jl
79825cf1a3jl		if (etm_post_msg(hdl, mp, buf, buflen)) {
79925cf1a3jl			INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64);
80025cf1a3jl			FREE_BUF(hdl, buf, buflen);
80125cf1a3jl			return (EIO);
80225cf1a3jl		}
80325cf1a3jl
80425cf1a3jl		FREE_BUF(hdl, buf, buflen);
80525cf1a3jl
80625cf1a3jl		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
80725cf1a3jl
80825cf1a3jl		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
80925cf1a3jl		    hdrlen)) != hdrlen) {
81025cf1a3jl			fmd_hdl_debug(hdl, "failed to write ACK to %s",
81125cf1a3jl			    mp->epm_ep_str);
81225cf1a3jl			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
81325cf1a3jl			return (EIO);
81425cf1a3jl		}
81525cf1a3jl
81625cf1a3jl		INCRSTAT(Etm_stats.write_ack.fmds_value.ui64);
81725cf1a3jl
81825cf1a3jl		/*
81925cf1a3jl		 * If we got this far and the current state of the
82025cf1a3jl		 * outbound/sending connection is TIMED_OUT or
82125cf1a3jl		 * LIMBO, then we should reinitialize it.
82225cf1a3jl		 */
82325cf1a3jl		(void) pthread_mutex_lock(&mp->epm_lock);
82425cf1a3jl		if (mp->epm_cstat == C_TIMED_OUT ||
82525cf1a3jl		    mp->epm_cstat == C_LIMBO) {
82625cf1a3jl			if (mp->epm_oconn != NULL) {
82725cf1a3jl				(void) etm_xport_close(hdl, mp->epm_oconn);
82825cf1a3jl				mp->epm_oconn = NULL;
82925cf1a3jl			}
83025cf1a3jl			mp->epm_cstat = C_UNINITIALIZED;
83125cf1a3jl			fmd_xprt_resume(hdl, mp->epm_xprthdl);
83225cf1a3jl			if (mp->epm_timer_in_use) {
83325cf1a3jl				fmd_timer_remove(hdl, mp->epm_timer_id);
83425cf1a3jl				mp->epm_timer_in_use = 0;
83525cf1a3jl			}
83625cf1a3jl			mp->epm_qstat = Q_OPEN;
83725cf1a3jl			fmd_hdl_debug(hdl, "queue resumed for %s",
83825cf1a3jl			    mp->epm_ep_str);
83925cf1a3jl		}
84025cf1a3jl		(void) pthread_mutex_unlock(&mp->epm_lock);
84125cf1a3jl
84225cf1a3jl		rv = 0;
84325cf1a3jl		break;
84425cf1a3jl
84525cf1a3jl	default:
84625cf1a3jl		fmd_hdl_debug(hdl, "protocol error, unexpected header "
84725cf1a3jl		    "from %s : %d", mp->epm_ep_str, hdrstat);
84825cf1a3jl		INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
84925cf1a3jl		rv = 0;
85025cf1a3jl	}
85125cf1a3jl
85225cf1a3jl	return (rv);
85325cf1a3jl}
85425cf1a3jl
85525cf1a3jl/*
85625cf1a3jl * ETM transport layer callback function.
85725cf1a3jl * The transport layer calls this function to :
85825cf1a3jl *	(a) pass an incoming message (flag == ETM_CBFLAG_RECV)
85925cf1a3jl *	(b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT)
86025cf1a3jl */
86125cf1a3jlstatic int
86225cf1a3jletm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
86325cf1a3jl    void *arg)
86425cf1a3jl{
86525cf1a3jl	etm_epmap_t *mp = (etm_epmap_t *)arg;
86625cf1a3jl	int rv = 0;
86725cf1a3jl
86825cf1a3jl	(void) pthread_mutex_lock(&Etm_mod_lock);
86925cf1a3jl	if (Etm_exit) {
87025cf1a3jl		(void) pthread_mutex_unlock(&Etm_mod_lock);
87125cf1a3jl		return (ECANCELED);
87225cf1a3jl	}
87325cf1a3jl	(void) pthread_mutex_unlock(&Etm_mod_lock);
87425cf1a3jl
87525cf1a3jl	switch (flag) {
87625cf1a3jl	case ETM_CBFLAG_RECV:
87725cf1a3jl		rv = etm_recv(hdl, conn, mp);
87825cf1a3jl		break;
87925cf1a3jl	case ETM_CBFLAG_REINIT:
88025cf1a3jl		(void) pthread_mutex_lock(&mp->epm_lock);
88125cf1a3jl		etm_reinit(hdl, mp);
882ac92251jrutt		etm_send_shutdown(hdl, mp);
88325cf1a3jl		(void) pthread_mutex_unlock(&mp->epm_lock);
88425cf1a3jl		/*
88525cf1a3jl		 * Return ECANCELED so the transport layer will close the
88625cf1a3jl		 * server connection.  The transport layer is responsible for
88725cf1a3jl		 * reestablishing this connection (should a connection request
88825cf1a3jl		 * arrive from the peer).
88925cf1a3jl		 */
89025cf1a3jl		rv = ECANCELED;
89125cf1a3jl		break;
89225cf1a3jl	default:
89325cf1a3jl		fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag);
89425cf1a3jl		rv = ENOTSUP;
89525cf1a3jl	}
89625cf1a3jl
89725cf1a3jl	return (rv);
89825cf1a3jl}
89925cf1a3jl
90025cf1a3jl/*
90125cf1a3jl * Allocate and initialize an etm_epmap_t struct for the given endpoint
90225cf1a3jl * name string.
90325cf1a3jl */
90425cf1a3jlstatic void
90525cf1a3jletm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags)
90625cf1a3jl{
90725cf1a3jl	etm_epmap_t *newmap;
90825cf1a3jl
90925cf1a3jl	if (etm_check_dup_ep_str(hdl, epname)) {
91025cf1a3jl		fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname);
91125cf1a3jl		return;
91225cf1a3jl	}
91325cf1a3jl
91425cf1a3jl	newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP);
91525cf1a3jl	newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP);
91625cf1a3jl	newmap->epm_xprtflags = flags;
91725cf1a3jl	newmap->epm_cstat = C_UNINITIALIZED;
91825cf1a3jl	newmap->epm_qstat = Q_UNINITIALIZED;
91925cf1a3jl	newmap->epm_ver = ETM_PROTO_V1;	/* Currently support one proto ver */
92025cf1a3jl	newmap->epm_txbusy = 0;
92125cf1a3jl
92225cf1a3jl	(void) pthread_mutex_init(&newmap->epm_lock, NULL);
92325cf1a3jl	(void) pthread_cond_init(&newmap->epm_tx_cv, NULL);
92425cf1a3jl
92525cf1a3jl	if (etm_get_ep_nvl(hdl, newmap)) {
92625cf1a3jl		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
92725cf1a3jl		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
92825cf1a3jl		return;
92925cf1a3jl	}
93025cf1a3jl
931ac92251jrutt	(void) pthread_mutex_lock(&newmap->epm_lock);
932ac92251jrutt
93325cf1a3jl	if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str,
93425cf1a3jl	    etm_cb_func, newmap)) == NULL) {
93525cf1a3jl		fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n",
93625cf1a3jl		    newmap->epm_ep_str);
93725cf1a3jl		etm_free_ep_nvl(hdl, newmap);
938ac92251jrutt		(void) pthread_mutex_unlock(&newmap->epm_lock);
939ac92251jrutt		(void) pthread_mutex_destroy(&newmap->epm_lock);
94025cf1a3jl		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
94125cf1a3jl		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
94225cf1a3jl		return;
94325cf1a3jl	}
94425cf1a3jl
94525cf1a3jl	if (IS_CLIENT(newmap)) {
94625cf1a3jl		if (etm_handle_startup(hdl, newmap)) {
947154b1f0jrutt			/*
948154b1f0jrutt			 * For whatever reason, we could not complete the
949154b1f0jrutt			 * startup handshake with the server.  Set the timer
950154b1f0jrutt			 * and try again.
951154b1f0jrutt			 */
952154b1f0jrutt			if (newmap->epm_oconn != NULL) {
953154b1f0jrutt				(void) etm_xport_close(hdl, newmap->epm_oconn);
954154b1f0jrutt				newmap->epm_oconn = NULL;
955154b1f0jrutt			}
956154b1f0jrutt			newmap->epm_cstat = C_UNINITIALIZED;
957154b1f0jrutt			newmap->epm_qstat = Q_UNINITIALIZED;
958154b1f0jrutt			newmap->epm_timer_id = fmd_timer_install(hdl, newmap,
959154b1f0jrutt			    NULL, Reconn_interval);
960154b1f0jrutt			newmap->epm_timer_in_use = 1;
96125cf1a3jl		}
962ac92251jrutt	} else {
963ac92251jrutt		/*
964ac92251jrutt		 * We may be restarting after a crash.  If so, the client
965ac92251jrutt		 * may be unaware of this.
966ac92251jrutt		 */
967ac92251jrutt		etm_send_shutdown(hdl, newmap);
96825cf1a3jl	}
96925cf1a3jl
97025cf1a3jl	/* Add this transport instance handle to the list */
97125cf1a3jl	newmap->epm_next = Epmap_head;
97225cf1a3jl	Epmap_head = newmap;
97325cf1a3jl
974ac92251jrutt	(void) pthread_mutex_unlock(&newmap->epm_lock);
975ac92251jrutt
97625cf1a3jl	INCRSTAT(Etm_stats.peer_count.fmds_value.ui64);
97725cf1a3jl}
97825cf1a3jl
97925cf1a3jl/*
98025cf1a3jl * Parse the given property list string and call etm_init_epmap
98125cf1a3jl * for each endpoint.
98225cf1a3jl */
98325cf1a3jlstatic void
98425cf1a3jletm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags)
98525cf1a3jl{
98625cf1a3jl	char *epstr, *ep, *prefix, *lasts, *numstr;
98725cf1a3jl	char epname[MAXPATHLEN];
98825cf1a3jl	size_t slen, nlen;
98925cf1a3jl	int beg, end, i;
99025cf1a3jl
99125cf1a3jl	if (eplist == NULL)
99225cf1a3jl		return;
99325cf1a3jl	/*
99425cf1a3jl	 * Create a copy of eplist for parsing.
99525cf1a3jl	 * strtok/strtok_r(3C) will insert null chars to the string.
99625cf1a3jl	 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used.
99725cf1a3jl	 */
99825cf1a3jl	slen = strlen(eplist);
99925cf1a3jl	epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP);
100025cf1a3jl	(void) strcpy(epstr, eplist);
100125cf1a3jl
100225cf1a3jl	/*
100325cf1a3jl	 * The following are supported for the "client_list" and
100425cf1a3jl	 * "server_list" properties :
100525cf1a3jl	 *
100625cf1a3jl	 *    A space-separated list of endpoints.
100725cf1a3jl	 *	"dev:///dom0 dev:///dom1 dev:///dom2"
100825cf1a3jl	 *
100925cf1a3jl	 *    An array syntax for a range of instances.
101025cf1a3jl	 *	"dev:///dom[0:2]"
101125cf1a3jl	 *
101225cf1a3jl	 *    A combination of both.
101325cf1a3jl	 *	"dev:///dom0 dev:///dom[1:2]"
101425cf1a3jl	 */
101525cf1a3jl	ep = strtok_r(epstr, " ", &lasts);
101625cf1a3jl	while (ep != NULL) {
101725cf1a3jl		if (strchr(ep, '[') != NULL) {
101825cf1a3jl			/*
101925cf1a3jl			 * This string is using array syntax.
102025cf1a3jl			 * Check the string for correct syntax.
102125cf1a3jl			 */
102225cf1a3jl			if ((strchr(ep, ':') == NULL) ||
102325cf1a3jl			    (strchr(ep, ']') == NULL)) {
102425cf1a3jl				fmd_hdl_error(hdl, "Syntax error in property "
102525cf1a3jl				    "that includes : %s\n", ep);
102625cf1a3jl				ep = strtok_r(NULL, " ", &lasts);
102725cf1a3jl				continue;
102825cf1a3jl			}
102925cf1a3jl
103025cf1a3jl			/* expand the array syntax */
103125cf1a3jl			prefix = strtok(ep, "[");
103225cf1a3jl
103325cf1a3jl			numstr = strtok(NULL, ":");
103425cf1a3jl			if ((numstr == NULL) || (!isdigit(*numstr))) {
103525cf1a3jl				fmd_hdl_error(hdl, "Syntax error in property "
103625cf1a3jl				    "that includes : %s[\n", prefix);
103725cf1a3jl				ep = strtok_r(NULL, " ", &lasts);
103825cf1a3jl				continue;
103925cf1a3jl			}
104025cf1a3jl			beg = atoi(numstr);
104125cf1a3jl
104225cf1a3jl			numstr = strtok(NULL, "]");
104325cf1a3jl			if ((numstr == NULL) || (!isdigit(*numstr))) {
104425cf1a3jl				fmd_hdl_error(hdl, "Syntax error in property "
104525cf1a3jl				    "that includes : %s[\n", prefix);
104625cf1a3jl				ep = strtok_r(NULL, " ", &lasts);
104725cf1a3jl				continue;
104825cf1a3jl			}
104925cf1a3jl			end = atoi(numstr);
105025cf1a3jl
105125cf1a3jl			nlen = strlen(prefix) + ETM_EP_INST_MAX;
105225cf1a3jl
105325cf1a3jl			if (nlen > MAXPATHLEN) {
105425cf1a3jl				fmd_hdl_error(hdl, "Endpoint prop string "
105525cf1a3jl				    "exceeds MAXPATHLEN\n");
105625cf1a3jl				ep = strtok_r(NULL, " ", &lasts);
105725cf1a3jl				continue;
105825cf1a3jl			}
105925cf1a3jl
106025cf1a3jl			for (i = beg; i <= end; i++) {
106125cf1a3jl				bzero(epname, MAXPATHLEN);
106225cf1a3jl				(void) snprintf(epname, nlen, "%s%d",
106325cf1a3jl				    prefix, i);
106425cf1a3jl				etm_init_epmap(hdl, epname, flags);
106525cf1a3jl			}
106625cf1a3jl		} else {
106725cf1a3jl			etm_init_epmap(hdl, ep, flags);
106825cf1a3jl		}
106925cf1a3jl
107025cf1a3jl		ep = strtok_r(NULL, " ", &lasts);
107125cf1a3jl	}
107225cf1a3jl
107325cf1a3jl	fmd_hdl_free(hdl, epstr, slen + 1);
107425cf1a3jl}
107525cf1a3jl
107625cf1a3jl/*
107725cf1a3jl * Free the transport infrastructure for an endpoint.
107825cf1a3jl */
107925cf1a3jlstatic void
108025cf1a3jletm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp)
108125cf1a3jl{
108225cf1a3jl	size_t hdrlen;
108325cf1a3jl	char hbuf[ETM_HDRLEN];
1084