1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22/*
23 * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
24 * Use is subject to license terms.
25 */
26
27#pragma ident	"%Z%%M%	%I%	%E% SMI"
28
29/*
30 * FMA Event Transport Module
31 *
 * Plugin for sending/receiving FMA events to/from a remote endpoint.
33 */
34
35#include <netinet/in.h>
36#include <errno.h>
37#include <sys/fm/protocol.h>
38#include <sys/sysmacros.h>
39#include <pthread.h>
40#include <strings.h>
41#include <ctype.h>
42#include <link.h>
43#include <libnvpair.h>
44#include "etm_xport_api.h"
45#include "etm_proto.h"
46
47/*
48 * ETM declarations
49 */
50
51typedef enum etm_connection_status {
52	C_UNINITIALIZED = 0,
53	C_OPEN,				/* Connection is open */
54	C_CLOSED,			/* Connection is closed */
55	C_LIMBO,			/* Bad value in header from peer */
56	C_TIMED_OUT			/* Reconnection to peer timed out */
57} etm_connstat_t;
58
59typedef enum etm_fmd_queue_status {
60	Q_UNINITIALIZED = 100,
61	Q_INIT_PENDING,			/* Queue initialization in progress */
62	Q_OPEN,				/* Queue is open */
63	Q_SUSPENDED			/* Queue is suspended */
64} etm_qstat_t;
65
66/* Per endpoint data */
67typedef struct etm_endpoint_map {
68	uint8_t epm_ver;		/* Protocol version being used */
69	char *epm_ep_str;		/* Endpoint ID string */
70	int epm_xprtflags;		/* FMD transport open flags */
71	etm_xport_hdl_t epm_tlhdl;	/* Transport Layer instance handle */
72	pthread_mutex_t epm_lock;	/* Protects remainder of struct */
73	pthread_cond_t epm_tx_cv;	/* Cond var for send/transmit */
74	int epm_txbusy;			/* Busy doing send/transmit */
75	fmd_xprt_t *epm_xprthdl;	/* FMD transport handle */
76	etm_qstat_t epm_qstat;		/* Status of fmd xprt queue */
77	nvlist_t *epm_ep_nvl;		/* Endpoint ID nv_list */
78	etm_xport_conn_t epm_oconn;	/* Connection for outgoing events */
79	etm_connstat_t epm_cstat;	/* Status of connection */
80	id_t epm_timer_id;		/* Timer id */
81	int epm_timer_in_use;		/* Indicates if timer is in use */
82	hrtime_t epm_reconn_end;	/* Reconnection end time */
83	struct etm_endpoint_map *epm_next;
84} etm_epmap_t;
85
86#define	ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1)
87#define	ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2)
88#define	ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3)
89#define	ETM_EP_INST_MAX 4		/* Max chars in endpt instance */
90#define	ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR
91#define	ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT)
92
93#define	ALLOC_BUF(hdl, buf, size) \
94	buf = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP);
95
96#define	FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size));
97
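/*
 * We act as the client for any endpoint whose transport was opened without
 * FMD_XPRT_ACCEPT (see the flag assignments in _fmd_init()).
 */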
98#define	IS_CLIENT(mp)	(((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1)
99
100#define	INCRSTAT(x)	{	(void) pthread_mutex_lock(&Etm_mod_lock);   \
101				(x)++;					    \
102				(void) pthread_mutex_unlock(&Etm_mod_lock); \
103			}
104
105#define	DECRSTAT(x)	{	(void) pthread_mutex_lock(&Etm_mod_lock);   \
106				(x)--;					    \
107				(void) pthread_mutex_unlock(&Etm_mod_lock); \
108			}
109
110#define	ADDSTAT(x, y)	{	(void) pthread_mutex_lock(&Etm_mod_lock);   \
111				(x) += (y);				    \
112				(void) pthread_mutex_unlock(&Etm_mod_lock); \
113			}
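
/*
 * The statistics counters are updated from multiple threads, so all updates
 * go through the macros above, which serialize on Etm_mod_lock.
 */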
114
115/*
116 * Global variables
117 */
118static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER;
119					/* Protects globals */
120static hrtime_t Reconn_interval;	/* Time between reconnection attempts */
121static hrtime_t Reconn_timeout;		/* Time allowed for reconnection */
122static hrtime_t Rw_timeout;		/* Time allowed for I/O operation  */
123static int Etm_dump = 0;		/* Enables hex dump for debug */
124static int Etm_exit = 0;		/* Flag for exit */
125static etm_epmap_t *Epmap_head = NULL;	/* Head of list of epmap structs */
126
127/* Module statistics */
128static struct etm_stats {
129	/* read counters */
130	fmd_stat_t read_ack;
131	fmd_stat_t read_bytes;
132	fmd_stat_t read_msg;
133	fmd_stat_t post_filter;
134	/* write counters */
135	fmd_stat_t write_ack;
136	fmd_stat_t write_bytes;
137	fmd_stat_t write_msg;
138	fmd_stat_t send_filter;
139	/* error counters */
140	fmd_stat_t error_protocol;
141	fmd_stat_t error_drop_read;
142	fmd_stat_t error_read;
143	fmd_stat_t error_read_badhdr;
144	fmd_stat_t error_write;
145	fmd_stat_t error_send_filter;
146	fmd_stat_t error_post_filter;
147	/* misc */
148	fmd_stat_t peer_count;
149
150} Etm_stats = {
151	/* read counters */
152	{ "read_ack", FMD_TYPE_UINT64, "ACKs read" },
153	{ "read_bytes", FMD_TYPE_UINT64, "Bytes read" },
154	{ "read_msg", FMD_TYPE_UINT64, "Messages read" },
155	{ "post_filter", FMD_TYPE_UINT64, "Drops by post_filter" },
156	/* write counters */
157	{ "write_ack", FMD_TYPE_UINT64, "ACKs sent" },
158	{ "write_bytes", FMD_TYPE_UINT64, "Bytes sent" },
159	{ "write_msg", FMD_TYPE_UINT64, "Messages sent" },
160	{ "send_filter", FMD_TYPE_UINT64, "Drops by send_filter" },
161	/* ETM error counters */
162	{ "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" },
163	{ "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" },
164	{ "error_read", FMD_TYPE_UINT64, "Read I/O errors" },
165	{ "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" },
166	{ "error_write", FMD_TYPE_UINT64, "Write I/O errors" },
167	{ "error_send_filter", FMD_TYPE_UINT64, "Send filter errors" },
168	{ "error_post_filter", FMD_TYPE_UINT64, "Post filter errors" },
169	/* ETM Misc */
170	{ "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" },
171};
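
/*
 * These statistics are registered in _fmd_init() via fmd_stat_create() and
 * can be inspected at runtime with fmstat(1M) and its -m option (passing
 * the name this module is loaded under).
 */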
172
173/*
174 * ETM Private functions
175 */
176
177/*
178 * Hex dump for debug.
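 * Output is produced only when the static Etm_dump flag is non-zero; there
 * is no configuration property for it, so it is typically enabled from a
 * debugger or by a temporary code change.  The buffer is rendered as
 * 16-bit words, eight per row.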
179 */
180static void
181etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction)
182{
183	int i, j, k;
184	int16_t *c;
185
186	if (Etm_dump == 0)
187		return;
188
	j = buflen / 16;	/* Number of complete 16-byte (8-column) rows */
	k = buflen % 16;	/* Bytes left over for a final partial row */
191
192	if (direction)
193		fmd_hdl_debug(hdl, "--- WRITE Message Dump ---");
194	else
195		fmd_hdl_debug(hdl, "---  READ Message Dump ---");
196
	fmd_hdl_debug(hdl, "   Displaying %d bytes", (int)buflen);
198
199	/* Dump the complete 8-column rows */
200	for (i = 0; i < j; i++) {
201		c = (int16_t *)buf + (i * 8);
202		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x %4x %4x", i,
203		    *(c+0), *(c+1), *(c+2), *(c+3),
204		    *(c+4), *(c+5), *(c+6), *(c+7));
205	}
206
207	/* Dump the last (incomplete) row */
208	c = (int16_t *)buf + (i * 8);
209	switch (k) {
210	case 4:
211		fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1));
212		break;
213	case 8:
214		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1),
215		    *(c+2), *(c+3));
216		break;
217	case 12:
218		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x", i, *(c+0),
219		    *(c+1), *(c+2), *(c+3), *(c+4), *(c+5));
220		break;
221	}
222
223	fmd_hdl_debug(hdl, "---      End Dump      ---");
224}
225
226/*
227 * Provide the length of a message based on the data in the given ETM header.
228 */
229static size_t
230etm_get_msglen(void *buf)
231{
232	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
233
234	return (ntohl(hp->hdr_msglen));
235}
236
237/*
238 * Check the contents of the ETM header for errors.
239 * Return the header type (hdr_type).
240 */
241static int
242etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf)
243{
244	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
245
246	if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) {
247		fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s "
248		    ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim);
249		return (ETM_HDR_INVALID);
250	}
251
252	if ((hp->hdr_type == ETM_HDR_C_HELLO) ||
253	    (hp->hdr_type == ETM_HDR_S_HELLO)) {
254		/* Until version is negotiated, other fields may be wrong */
255		return (hp->hdr_type);
256	}
257
258	if (hp->hdr_ver != mp->epm_ver) {
259		fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n",
260		    mp->epm_ep_str, hp->hdr_ver);
261		return (ETM_HDR_BADVERSION);
262	}
263
264	if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) ||
265	    (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) {
266		fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n",
267		    mp->epm_ep_str, hp->hdr_type);
268		return (ETM_HDR_BADTYPE);
269	}
270
271	return (hp->hdr_type);
272}
273
274/*
275 * Create an ETM header of a given type in the given buffer.
276 * Return length of header.
277 */
278static size_t
279etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen)
280{
281	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
282
283	bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN);
284	hp->hdr_ver = ver;
285	hp->hdr_type = type;
286	hp->hdr_msglen = htonl(msglen);
287
288	return (ETM_HDRLEN);
289}
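
/*
 * For reference, the header written by etm_create_hdr() consists of the
 * ETM_DELIM delimiter bytes followed by a one-byte protocol version, a
 * one-byte message type and a four-byte message length in network byte
 * order; see etm_proto_hdr_t in etm_proto.h for the authoritative layout.
 */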
290
291/*
292 * Convert message bytes to nvlist and post to fmd.
293 * Return zero for success, non-zero for failure.
294 *
295 * Note : nvl is free'd by fmd.
296 */
297static int
298etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen)
299{
300	nvlist_t *nvl;
301	int rv;
302
303	if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) {
304		fmd_hdl_error(hdl, "failed to unpack message");
305		return (1);
306	}
307
308	rv = etm_xport_post_filter(hdl, nvl, mp->epm_ep_str);
309	if (rv == ETM_XPORT_FILTER_DROP) {
310		fmd_hdl_debug(hdl, "post_filter dropped event");
311		INCRSTAT(Etm_stats.post_filter.fmds_value.ui64);
312		nvlist_free(nvl);
313		return (0);
314	} else if (rv == ETM_XPORT_FILTER_ERROR) {
315		fmd_hdl_debug(hdl, "post_filter error : %s", strerror(errno));
316		INCRSTAT(Etm_stats.error_post_filter.fmds_value.ui64);
317		/* Still post event */
318	}
319
320	(void) pthread_mutex_lock(&mp->epm_lock);
321	(void) pthread_mutex_lock(&Etm_mod_lock);
322	if (!Etm_exit) {
323		(void) pthread_mutex_unlock(&Etm_mod_lock);
324		if (mp->epm_qstat == Q_OPEN) {
325			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
326			rv = 0;
327		} else if (mp->epm_qstat == Q_SUSPENDED) {
328			fmd_xprt_resume(hdl, mp->epm_xprthdl);
329			if (mp->epm_timer_in_use) {
330				fmd_timer_remove(hdl, mp->epm_timer_id);
331				mp->epm_timer_in_use = 0;
332			}
333			mp->epm_qstat = Q_OPEN;
334			fmd_hdl_debug(hdl, "queue resumed for %s",
335			    mp->epm_ep_str);
336			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
337			rv = 0;
338		} else {
339			fmd_hdl_debug(hdl, "unable to post message, qstat = %d",
340			    mp->epm_qstat);
341			nvlist_free(nvl);
342			/* Remote peer will attempt to resend event */
343			rv = 2;
344		}
345	} else {
346		(void) pthread_mutex_unlock(&Etm_mod_lock);
347		fmd_hdl_debug(hdl, "unable to post message, module exiting");
348		nvlist_free(nvl);
349		/* Remote peer will attempt to resend event */
350		rv = 3;
351	}
352
353	(void) pthread_mutex_unlock(&mp->epm_lock);
354
355	return (rv);
356}
357
358/*
 * Handle the startup handshake with the server.  The client always initiates
360 * the startup handshake.  In the following sequence, we are the client and
361 * the remote endpoint is the server.
362 *
363 *	Client sends C_HELLO and transitions to Q_INIT_PENDING state.
364 *	Server sends S_HELLO and transitions to Q_INIT_PENDING state.
365 *	Client sends ACK and transitions to Q_OPEN state.
366 *	Server receives ACK and transitions to Q_OPEN state.
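 *
 * The server side of this exchange is implemented in etm_recv(), in its
 * ETM_HDR_C_HELLO and ETM_HDR_ACK cases.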
367 *
368 * Return 0 for success, nonzero for failure.
369 */
370static int
371etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp)
372{
373	etm_proto_hdr_t *hp;
374	size_t hdrlen = ETM_HDRLEN;
375	int hdrstat;
376	char hbuf[ETM_HDRLEN];
377
378	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
379		return (1);
380
381	mp->epm_cstat = C_OPEN;
382
383	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0);
384
385	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
386	    hdrlen)) != hdrlen) {
387		fmd_hdl_error(hdl, "Failed to write C_HELLO to %s",
388		    mp->epm_ep_str);
389		return (2);
390	}
391
392	mp->epm_qstat = Q_INIT_PENDING;
393
394	if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf,
395	    hdrlen)) != hdrlen) {
396		fmd_hdl_error(hdl, "Failed to read S_HELLO from %s",
397		    mp->epm_ep_str);
398		return (3);
399	}
400
401	hdrstat = etm_check_hdr(hdl, mp, hbuf);
402
403	if (hdrstat != ETM_HDR_S_HELLO) {
404		fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO "
405		    "from %s", mp->epm_ep_str);
406		return (4);
407	}
408
409	/*
410	 * Get version from the server.
411	 * Currently, only one version is supported.
412	 */
413	hp = (etm_proto_hdr_t *)(void *)hbuf;
414	if (hp->hdr_ver != ETM_PROTO_V1) {
415		fmd_hdl_error(hdl, "Unable to use same version as %s : %d",
416		    mp->epm_ep_str, hp->hdr_ver);
417		return (5);
418	}
419	mp->epm_ver = hp->hdr_ver;
420
421	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
422
423	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
424	    hdrlen)) != hdrlen) {
425		fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s",
426		    mp->epm_ep_str);
427		return (6);
428	}
429
430	/*
431	 * Call fmd_xprt_open and fmd_xprt_setspecific with
432	 * Etm_mod_lock held to avoid race with etm_send thread.
433	 */
434	(void) pthread_mutex_lock(&Etm_mod_lock);
435	if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags,
436	    mp->epm_ep_nvl, NULL)) == NULL) {
437		fmd_hdl_abort(hdl, "Failed to init xprthdl for %s",
438		    mp->epm_ep_str);
439	}
440	fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
441	(void) pthread_mutex_unlock(&Etm_mod_lock);
442
443	mp->epm_qstat = Q_OPEN;
444	fmd_hdl_debug(hdl, "queue open for %s",  mp->epm_ep_str);
445
446	return (0);
447}
448
449/*
450 * Open a connection to the peer, send a SHUTDOWN message,
451 * and close the connection.
452 */
453static void
454etm_send_shutdown(fmd_hdl_t *hdl, etm_epmap_t *mp)
455{
456	size_t hdrlen = ETM_HDRLEN;
457	char hbuf[ETM_HDRLEN];
458
459	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
460		return;
461
462	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_SHUTDOWN, 0);
463
464	(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, hdrlen);
465
466	(void) etm_xport_close(hdl, mp->epm_oconn);
467	mp->epm_oconn = NULL;
468}
469
470/*
471 * Alloc a nvlist and add a string for the endpoint.
472 * Return zero for success, non-zero for failure.
473 */
474static int
475etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
476{
477	/*
478	 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation
479	 * in fmd when this nvlist_t is free'd.
480	 */
481	(void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0);
482
483	if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) {
484		fmd_hdl_error(hdl, "failed to add domain-id string to nvlist "
485		    "for %s", mp->epm_ep_str);
486		nvlist_free(mp->epm_ep_nvl);
487		return (1);
488	}
489
490	return (0);
491}
492
493/*
494 * Free the nvlist for the endpoint_id string.
495 */
496/*ARGSUSED*/
497static void
498etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
499{
500	nvlist_free(mp->epm_ep_nvl);
501}
502
503/*
504 * Check for a duplicate endpoint/peer string.
505 */
506/*ARGSUSED*/
507static int
508etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname)
509{
510	etm_epmap_t *mp;
511
512	for (mp = Epmap_head; mp != NULL; mp = mp->epm_next)
513		if (strcmp(epname, mp->epm_ep_str) == 0)
514			return (1);
515
516	return (0);
517}
518
519/*
520 * Attempt to re-open a connection with the remote endpoint.
521 */
522static void
523etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
524{
525	if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) {
526		if (gethrtime() < mp->epm_reconn_end) {
527			if ((mp->epm_oconn = etm_xport_open(hdl,
528			    mp->epm_tlhdl)) == NULL) {
529				fmd_hdl_debug(hdl, "reconnect failed for %s",
530				    mp->epm_ep_str);
531				mp->epm_timer_id = fmd_timer_install(hdl, mp,
532				    NULL, Reconn_interval);
533				mp->epm_timer_in_use = 1;
534			} else {
535				fmd_hdl_debug(hdl, "reconnect success for %s",
536				    mp->epm_ep_str);
537				mp->epm_reconn_end = 0;
538				mp->epm_cstat = C_OPEN;
539			}
540		} else {
541			fmd_hdl_error(hdl, "Reconnect timed out for %s\n",
542			    mp->epm_ep_str);
543			mp->epm_reconn_end = 0;
544			mp->epm_cstat = C_TIMED_OUT;
545		}
546	}
547
548	if (mp->epm_cstat == C_OPEN) {
549		fmd_xprt_resume(hdl, mp->epm_xprthdl);
550		mp->epm_qstat = Q_OPEN;
551		fmd_hdl_debug(hdl, "queue resumed for %s",  mp->epm_ep_str);
552	}
553}
554
555/*
556 * Suspend a given connection and setup for reconnection retries.
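 * The full cycle: this function suspends the fmd event queue and arms a
 * timer; etm_timeout() then calls etm_reconnect(), which retries
 * etm_xport_open() every Reconn_interval until it succeeds or Reconn_timeout
 * expires.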
557 * Assume caller holds lock on epm_lock.
558 */
559static void
560etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
561{
562	(void) pthread_mutex_lock(&Etm_mod_lock);
563	if (Etm_exit) {
564		(void) pthread_mutex_unlock(&Etm_mod_lock);
565		return;
566	}
567	(void) pthread_mutex_unlock(&Etm_mod_lock);
568
569	if (mp->epm_oconn != NULL) {
570		(void) etm_xport_close(hdl, mp->epm_oconn);
571		mp->epm_oconn = NULL;
572	}
573
574	mp->epm_reconn_end = gethrtime() + Reconn_timeout;
575	mp->epm_cstat = C_UNINITIALIZED;
576
577	if (mp->epm_xprthdl != NULL) {
578		fmd_xprt_suspend(hdl, mp->epm_xprthdl);
579		mp->epm_qstat = Q_SUSPENDED;
580		fmd_hdl_debug(hdl, "queue suspended for %s",  mp->epm_ep_str);
581
582		if (mp->epm_timer_in_use == 0) {
583			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
584			    Reconn_interval);
585			mp->epm_timer_in_use = 1;
586		}
587	}
588}
589
590/*
591 * Reinitialize the connection. The old fmd_xprt_t handle must be
592 * removed/closed first.
593 * Assume caller holds lock on epm_lock.
594 */
595static void
596etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp)
597{
598	/*
599	 * To avoid a deadlock, wait for etm_send to finish before
600	 * calling fmd_xprt_close()
601	 */
602	while (mp->epm_txbusy)
603		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
604
605	if (mp->epm_xprthdl != NULL) {
606		fmd_xprt_close(hdl, mp->epm_xprthdl);
607		fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str);
608		mp->epm_xprthdl = NULL;
609		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
610		mp->epm_ep_nvl = NULL;
611	}
612
613	if (mp->epm_timer_in_use) {
614		fmd_timer_remove(hdl, mp->epm_timer_id);
615		mp->epm_timer_in_use = 0;
616	}
617
618	if (mp->epm_oconn != NULL) {
619		(void) etm_xport_close(hdl, mp->epm_oconn);
620		mp->epm_oconn = NULL;
621	}
622
623	mp->epm_cstat = C_UNINITIALIZED;
624	mp->epm_qstat = Q_UNINITIALIZED;
625}
626
/*
 * Receive data from the ETM transport layer.
 * Note : This is not the fmdo_recv entry point.
 */
632static int
633etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp)
634{
635	size_t buflen, hdrlen;
636	void *buf;
637	char hbuf[ETM_HDRLEN];
638	int hdrstat, rv;
639
640	hdrlen = ETM_HDRLEN;
641
642	if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) {
643		fmd_hdl_debug(hdl, "failed to read header from %s",
644		    mp->epm_ep_str);
645		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
646		return (EIO);
647	}
648
649	hdrstat = etm_check_hdr(hdl, mp, hbuf);
650
651	switch (hdrstat) {
652	case ETM_HDR_INVALID:
653		(void) pthread_mutex_lock(&mp->epm_lock);
654		if (mp->epm_cstat == C_OPEN)
655			mp->epm_cstat = C_CLOSED;
656		(void) pthread_mutex_unlock(&mp->epm_lock);
657
658		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
659		rv = ECANCELED;
660		break;
661
662	case ETM_HDR_BADTYPE:
663	case ETM_HDR_BADVERSION:
664		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0);
665
666		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
667		    hdrlen)) != hdrlen) {
668			fmd_hdl_debug(hdl, "failed to write NAK to %s",
669			    mp->epm_ep_str);
670			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
671			return (EIO);
672		}
673
674		(void) pthread_mutex_lock(&mp->epm_lock);
675		mp->epm_cstat = C_LIMBO;
676		(void) pthread_mutex_unlock(&mp->epm_lock);
677
678		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
679		rv = ENOTSUP;
680		break;
681
682	case ETM_HDR_C_HELLO:
683		/* Client is initiating a startup handshake */
684		(void) pthread_mutex_lock(&mp->epm_lock);
685		etm_reinit(hdl, mp);
686		mp->epm_qstat = Q_INIT_PENDING;
687		(void) pthread_mutex_unlock(&mp->epm_lock);
688
689		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0);
690
691		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
692		    hdrlen)) != hdrlen) {
693			fmd_hdl_debug(hdl, "failed to write S_HELLO to %s",
694			    mp->epm_ep_str);
695			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
696			return (EIO);
697		}
698
699		rv = 0;
700		break;
701
702	case ETM_HDR_ACK:
703		(void) pthread_mutex_lock(&mp->epm_lock);
704		if (mp->epm_qstat == Q_INIT_PENDING) {
705			/* This is client's ACK from startup handshake */
706			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
707			if (mp->epm_ep_nvl == NULL)
708				(void) etm_get_ep_nvl(hdl, mp);
709
710			/*
711			 * Call fmd_xprt_open and fmd_xprt_setspecific with
712			 * Etm_mod_lock held to avoid race with etm_send thread.
713			 */
714			(void) pthread_mutex_lock(&Etm_mod_lock);
715			if ((mp->epm_xprthdl = fmd_xprt_open(hdl,
716			    mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) {
717				fmd_hdl_abort(hdl, "Failed to init xprthdl "
718				    "for %s", mp->epm_ep_str);
719			}
720			fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
721			(void) pthread_mutex_unlock(&Etm_mod_lock);
722
723			mp->epm_qstat = Q_OPEN;
724			(void) pthread_mutex_unlock(&mp->epm_lock);
725			fmd_hdl_debug(hdl, "queue open for %s",
726			    mp->epm_ep_str);
727		} else {
728			(void) pthread_mutex_unlock(&mp->epm_lock);
729			fmd_hdl_debug(hdl, "protocol error, not expecting ACK "
730			    "from %s\n", mp->epm_ep_str);
731			INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
732		}
733
734		rv = 0;
735		break;
736
737	case ETM_HDR_SHUTDOWN:
738		fmd_hdl_debug(hdl, "received shutdown from %s",
739		    mp->epm_ep_str);
740
741		(void) pthread_mutex_lock(&mp->epm_lock);
742
743		etm_reinit(hdl, mp);
744
745		if (IS_CLIENT(mp)) {
746			/*
747			 * A server shutdown is considered to be temporary.
748			 * Prepare for reconnection.
749			 */
750			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
751			    Reconn_interval);
752
753			mp->epm_timer_in_use = 1;
754		}
755
756		(void) pthread_mutex_unlock(&mp->epm_lock);
757
758		rv = ECANCELED;
759		break;
760
761	case ETM_HDR_MSG:
762		(void) pthread_mutex_lock(&mp->epm_lock);
763		if (mp->epm_qstat == Q_UNINITIALIZED) {
764			/* Peer (client) is unaware that we've restarted */
765			(void) pthread_mutex_unlock(&mp->epm_lock);
766			hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
767			    ETM_HDR_S_RESTART, 0);
768
769			if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
770			    hdrlen)) != hdrlen) {
771				fmd_hdl_debug(hdl, "failed to write S_RESTART "
772				    "to %s", mp->epm_ep_str);
773				INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
774				return (EIO);
775			}
776
777			return (ECANCELED);
778		}
779		(void) pthread_mutex_unlock(&mp->epm_lock);
780
781		buflen = etm_get_msglen(hbuf);
782		ALLOC_BUF(hdl, buf, buflen);
783
784		if (etm_xport_read(hdl, conn, Rw_timeout, buf,
785		    buflen) != buflen) {
786			fmd_hdl_debug(hdl, "failed to read message from %s",
787			    mp->epm_ep_str);
788			FREE_BUF(hdl, buf, buflen);
789			INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
790			return (EIO);
791		}
792
793		INCRSTAT(Etm_stats.read_msg.fmds_value.ui64);
794		ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen);
795
796		etm_hex_dump(hdl, buf, buflen, 0);
797
798		if (etm_post_msg(hdl, mp, buf, buflen)) {
799			INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64);
800			FREE_BUF(hdl, buf, buflen);
801			return (EIO);
802		}
803
804		FREE_BUF(hdl, buf, buflen);
805
806		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
807
808		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
809		    hdrlen)) != hdrlen) {
810			fmd_hdl_debug(hdl, "failed to write ACK to %s",
811			    mp->epm_ep_str);
812			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
813			return (EIO);
814		}
815
816		INCRSTAT(Etm_stats.write_ack.fmds_value.ui64);
817
818		/*
819		 * If we got this far and the current state of the
820		 * outbound/sending connection is TIMED_OUT or
821		 * LIMBO, then we should reinitialize it.
822		 */
823		(void) pthread_mutex_lock(&mp->epm_lock);
824		if (mp->epm_cstat == C_TIMED_OUT ||
825		    mp->epm_cstat == C_LIMBO) {
826			if (mp->epm_oconn != NULL) {
827				(void) etm_xport_close(hdl, mp->epm_oconn);
828				mp->epm_oconn = NULL;
829			}
830			mp->epm_cstat = C_UNINITIALIZED;
831			fmd_xprt_resume(hdl, mp->epm_xprthdl);
832			if (mp->epm_timer_in_use) {
833				fmd_timer_remove(hdl, mp->epm_timer_id);
834				mp->epm_timer_in_use = 0;
835			}
836			mp->epm_qstat = Q_OPEN;
837			fmd_hdl_debug(hdl, "queue resumed for %s",
838			    mp->epm_ep_str);
839		}
840		(void) pthread_mutex_unlock(&mp->epm_lock);
841
842		rv = 0;
843		break;
844
845	default:
846		fmd_hdl_debug(hdl, "protocol error, unexpected header "
847		    "from %s : %d", mp->epm_ep_str, hdrstat);
848		INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
849		rv = 0;
850	}
851
852	return (rv);
853}
854
855/*
856 * ETM transport layer callback function.
857 * The transport layer calls this function to :
858 *	(a) pass an incoming message (flag == ETM_CBFLAG_RECV)
859 *	(b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT)
860 */
861static int
862etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
863    void *arg)
864{
865	etm_epmap_t *mp = (etm_epmap_t *)arg;
866	int rv = 0;
867
868	(void) pthread_mutex_lock(&Etm_mod_lock);
869	if (Etm_exit) {
870		(void) pthread_mutex_unlock(&Etm_mod_lock);
871		return (ECANCELED);
872	}
873	(void) pthread_mutex_unlock(&Etm_mod_lock);
874
875	switch (flag) {
876	case ETM_CBFLAG_RECV:
877		rv = etm_recv(hdl, conn, mp);
878		break;
879	case ETM_CBFLAG_REINIT:
880		(void) pthread_mutex_lock(&mp->epm_lock);
881		etm_reinit(hdl, mp);
882		etm_send_shutdown(hdl, mp);
883		(void) pthread_mutex_unlock(&mp->epm_lock);
884		/*
885		 * Return ECANCELED so the transport layer will close the
886		 * server connection.  The transport layer is responsible for
887		 * reestablishing this connection (should a connection request
888		 * arrive from the peer).
889		 */
890		rv = ECANCELED;
891		break;
892	default:
893		fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag);
894		rv = ENOTSUP;
895	}
896
897	return (rv);
898}
899
900/*
901 * Allocate and initialize an etm_epmap_t struct for the given endpoint
902 * name string.
903 */
904static void
905etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags)
906{
907	etm_epmap_t *newmap;
908
909	if (etm_check_dup_ep_str(hdl, epname)) {
910		fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname);
911		return;
912	}
913
914	newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP);
915	newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP);
916	newmap->epm_xprtflags = flags;
917	newmap->epm_cstat = C_UNINITIALIZED;
918	newmap->epm_qstat = Q_UNINITIALIZED;
919	newmap->epm_ver = ETM_PROTO_V1;	/* Currently support one proto ver */
920	newmap->epm_txbusy = 0;
921
922	(void) pthread_mutex_init(&newmap->epm_lock, NULL);
923	(void) pthread_cond_init(&newmap->epm_tx_cv, NULL);
924
925	if (etm_get_ep_nvl(hdl, newmap)) {
926		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
927		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
928		return;
929	}
930
931	(void) pthread_mutex_lock(&newmap->epm_lock);
932
933	if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str,
934	    etm_cb_func, newmap)) == NULL) {
935		fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n",
936		    newmap->epm_ep_str);
937		etm_free_ep_nvl(hdl, newmap);
938		(void) pthread_mutex_unlock(&newmap->epm_lock);
939		(void) pthread_mutex_destroy(&newmap->epm_lock);
940		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
941		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
942		return;
943	}
944
945	if (IS_CLIENT(newmap)) {
946		if (etm_handle_startup(hdl, newmap)) {
947			/*
948			 * For whatever reason, we could not complete the
949			 * startup handshake with the server.  Set the timer
950			 * and try again.
951			 */
952			if (newmap->epm_oconn != NULL) {
953				(void) etm_xport_close(hdl, newmap->epm_oconn);
954				newmap->epm_oconn = NULL;
955			}
956			newmap->epm_cstat = C_UNINITIALIZED;
957			newmap->epm_qstat = Q_UNINITIALIZED;
958			newmap->epm_timer_id = fmd_timer_install(hdl, newmap,
959			    NULL, Reconn_interval);
960			newmap->epm_timer_in_use = 1;
961		}
962	} else {
963		/*
964		 * We may be restarting after a crash.  If so, the client
965		 * may be unaware of this.
966		 */
967		etm_send_shutdown(hdl, newmap);
968	}
969
970	/* Add this transport instance handle to the list */
971	newmap->epm_next = Epmap_head;
972	Epmap_head = newmap;
973
974	(void) pthread_mutex_unlock(&newmap->epm_lock);
975
976	INCRSTAT(Etm_stats.peer_count.fmds_value.ui64);
977}
978
979/*
980 * Parse the given property list string and call etm_init_epmap
981 * for each endpoint.
982 */
983static void
984etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags)
985{
986	char *epstr, *ep, *prefix, *lasts, *numstr;
987	char epname[MAXPATHLEN];
988	size_t slen, nlen;
989	int beg, end, i;
990
991	if (eplist == NULL)
992		return;
993	/*
994	 * Create a copy of eplist for parsing.
 * strtok/strtok_r(3C) will insert null chars into the string.
 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used, as
 * fmd_hdl_strfree would compute the wrong length once NULs are embedded.
997	 */
998	slen = strlen(eplist);
999	epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP);
1000	(void) strcpy(epstr, eplist);
1001
1002	/*
1003	 * The following are supported for the "client_list" and
1004	 * "server_list" properties :
1005	 *
1006	 *    A space-separated list of endpoints.
1007	 *	"dev:///dom0 dev:///dom1 dev:///dom2"
1008	 *
1009	 *    An array syntax for a range of instances.
1010	 *	"dev:///dom[0:2]"
1011	 *
1012	 *    A combination of both.
1013	 *	"dev:///dom0 dev:///dom[1:2]"
1014	 */
1015	ep = strtok_r(epstr, " ", &lasts);
1016	while (ep != NULL) {
1017		if (strchr(ep, '[') != NULL) {
1018			/*
1019			 * This string is using array syntax.
1020			 * Check the string for correct syntax.
1021			 */
1022			if ((strchr(ep, ':') == NULL) ||
1023			    (strchr(ep, ']') == NULL)) {
1024				fmd_hdl_error(hdl, "Syntax error in property "
1025				    "that includes : %s\n", ep);
1026				ep = strtok_r(NULL, " ", &lasts);
1027				continue;
1028			}
1029
1030			/* expand the array syntax */
1031			prefix = strtok(ep, "[");
1032
1033			numstr = strtok(NULL, ":");
1034			if ((numstr == NULL) || (!isdigit(*numstr))) {
1035				fmd_hdl_error(hdl, "Syntax error in property "
1036				    "that includes : %s[\n", prefix);
1037				ep = strtok_r(NULL, " ", &lasts);
1038				continue;
1039			}
1040			beg = atoi(numstr);
1041
1042			numstr = strtok(NULL, "]");
1043			if ((numstr == NULL) || (!isdigit(*numstr))) {
1044				fmd_hdl_error(hdl, "Syntax error in property "
1045				    "that includes : %s[\n", prefix);
1046				ep = strtok_r(NULL, " ", &lasts);
1047				continue;
1048			}
1049			end = atoi(numstr);
1050
1051			nlen = strlen(prefix) + ETM_EP_INST_MAX;
1052
1053			if (nlen > MAXPATHLEN) {
1054				fmd_hdl_error(hdl, "Endpoint prop string "
1055				    "exceeds MAXPATHLEN\n");
1056				ep = strtok_r(NULL, " ", &lasts);
1057				continue;
1058			}
1059
1060			for (i = beg; i <= end; i++) {
1061				bzero(epname, MAXPATHLEN);
1062				(void) snprintf(epname, nlen, "%s%d",
1063				    prefix, i);
1064				etm_init_epmap(hdl, epname, flags);
1065			}
1066		} else {
1067			etm_init_epmap(hdl, ep, flags);
1068		}
1069
1070		ep = strtok_r(NULL, " ", &lasts);
1071	}
1072
1073	fmd_hdl_free(hdl, epstr, slen + 1);
1074}
1075
1076/*
1077 * Free the transport infrastructure for an endpoint.
1078 */
1079static void
1080etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp)
1081{
1082	size_t hdrlen;
1083	char hbuf[ETM_HDRLEN];
1084
1085	(void) pthread_mutex_lock(&mp->epm_lock);
1086
1087	/*
1088	 * If an etm_send thread is in progress, wait for it to finish.
1089	 * The etm_recv thread is managed by the transport layer and will
1090	 * be destroyed with etm_xport_fini().
1091	 */
1092	while (mp->epm_txbusy)
1093		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
1094
1095	if (mp->epm_timer_in_use)
1096		fmd_timer_remove(hdl, mp->epm_timer_id);
1097
1098	if (mp->epm_oconn != NULL) {
1099		hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
1100		    ETM_HDR_SHUTDOWN, 0);
1101		(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
1102		    hdrlen);
1103		(void) etm_xport_close(hdl, mp->epm_oconn);
1104		mp->epm_oconn = NULL;
1105	}
1106
1107	if (mp->epm_xprthdl != NULL) {
1108		fmd_xprt_close(hdl, mp->epm_xprthdl);
1109		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1110		mp->epm_ep_nvl = NULL;
1111	}
1112
1113	if (mp->epm_ep_nvl != NULL)
1114		etm_free_ep_nvl(hdl, mp);
1115
1116	if (mp->epm_tlhdl != NULL)
1117		(void) etm_xport_fini(hdl, mp->epm_tlhdl);
1118
1119	(void) pthread_mutex_unlock(&mp->epm_lock);
1120	(void) pthread_mutex_destroy(&mp->epm_lock);
1121	fmd_hdl_strfree(hdl, mp->epm_ep_str);
1122	fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t));
1123	DECRSTAT(Etm_stats.peer_count.fmds_value.ui64);
1124}
1125
1126/*
1127 * FMD entry points
1128 */
1129
1130/*
1131 * FMD fmdo_send entry point.
1132 * Send an event to the remote endpoint and receive an ACK.
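 * Per message, one ETM header plus the XDR-packed nvlist is written, then
 * we block (up to Rw_timeout) for the peer's ACK; a NAK suspends the queue,
 * and an S_RESTART causes the fmd transport to be closed and the startup
 * handshake redone from the timeout handler.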
1133 */
1134static int
1135etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl)
1136{
1137	etm_epmap_t *mp;
1138	nvlist_t *msgnvl;
1139	int hdrstat, rv, cnt = 0;
1140	char *buf, *nvbuf, *class;
1141	size_t nvsize, buflen, hdrlen;
1142	struct timespec tms;
1143
1144	(void) pthread_mutex_lock(&Etm_mod_lock);
1145	if (Etm_exit) {
1146		(void) pthread_mutex_unlock(&Etm_mod_lock);
1147		return (FMD_SEND_RETRY);
1148	}
1149	(void) pthread_mutex_unlock(&Etm_mod_lock);
1150
1151	mp = fmd_xprt_getspecific(hdl, xprthdl);
1152
1153	for (;;) {
1154		if (pthread_mutex_trylock(&mp->epm_lock) == 0) {
1155			break;
1156		} else {
1157			/*
1158			 * Another thread may be (1) trying to close this
1159			 * fmd_xprt_t, or (2) posting an event to it.
1160			 * If (1), don't want to spend too much time here.
1161			 * If (2), allow it to finish and release epm_lock.
1162			 */
1163			if (cnt++ < 10) {
1164				tms.tv_sec = 0;
1165				tms.tv_nsec = (cnt * 10000);
1166				(void) nanosleep(&tms, NULL);
1167
1168			} else {
1169				return (FMD_SEND_RETRY);
1170			}
1171		}
1172	}
1173
1174	mp->epm_txbusy++;
1175
1176	if (mp->epm_qstat == Q_UNINITIALIZED) {
1177		mp->epm_txbusy--;
1178		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1179		(void) pthread_mutex_unlock(&mp->epm_lock);
1180		return (FMD_SEND_FAILED);
1181	}
1182
1183	if (mp->epm_cstat == C_CLOSED) {
1184		etm_suspend_reconnect(hdl, mp);
1185		mp->epm_txbusy--;
1186		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1187		(void) pthread_mutex_unlock(&mp->epm_lock);
1188		return (FMD_SEND_RETRY);
1189	}
1190
1191	if (mp->epm_cstat == C_LIMBO) {
1192		if (mp->epm_oconn != NULL) {
1193			(void) etm_xport_close(hdl, mp->epm_oconn);
1194			mp->epm_oconn = NULL;
1195		}
1196
1197		fmd_xprt_suspend(hdl, xprthdl);
1198		mp->epm_qstat = Q_SUSPENDED;
1199		mp->epm_txbusy--;
1200		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1201		(void) pthread_mutex_unlock(&mp->epm_lock);
1202		fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);
1203		return (FMD_SEND_RETRY);
1204	}
1205
1206	if (mp->epm_oconn == NULL) {
1207		if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl))
1208		    == NULL) {
1209			etm_suspend_reconnect(hdl, mp);
1210			mp->epm_txbusy--;
1211			(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1212			(void) pthread_mutex_unlock(&mp->epm_lock);
1213			return (FMD_SEND_RETRY);
1214		} else {
1215			mp->epm_cstat = C_OPEN;
1216		}
1217	}
1218
1219	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0)
1220		fmd_hdl_abort(hdl, "No class string in nvlist");
1221
1222	msgnvl = fmd_xprt_translate(hdl, xprthdl, ep);
1223	if (msgnvl == NULL) {
1224		mp->epm_txbusy--;
1225		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1226		(void) pthread_mutex_unlock(&mp->epm_lock);
1227		fmd_hdl_error(hdl, "Failed to translate event %p\n",
1228		    (void *) ep);
1229		return (FMD_SEND_FAILED);
1230	}
1231
1232	rv = etm_xport_send_filter(hdl, msgnvl, mp->epm_ep_str);
1233	if (rv == ETM_XPORT_FILTER_DROP) {
1234		mp->epm_txbusy--;
1235		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1236		(void) pthread_mutex_unlock(&mp->epm_lock);
1237		fmd_hdl_debug(hdl, "send_filter dropped event");
1238		nvlist_free(msgnvl);
1239		INCRSTAT(Etm_stats.send_filter.fmds_value.ui64);
1240		return (FMD_SEND_SUCCESS);
1241	} else if (rv == ETM_XPORT_FILTER_ERROR) {
1242		fmd_hdl_debug(hdl, "send_filter error : %s", strerror(errno));
1243		INCRSTAT(Etm_stats.error_send_filter.fmds_value.ui64);
1244		/* Still send event */
1245	}
1246
1247	(void) pthread_mutex_unlock(&mp->epm_lock);
1248
1249	(void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR);
1250
1251	hdrlen = ETM_HDRLEN;
1252	buflen = nvsize + hdrlen;
1253
1254	ALLOC_BUF(hdl, buf, buflen);
1255
1256	nvbuf = buf + hdrlen;
1257
1258	(void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize);
1259
1260	if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) {
1261		(void) pthread_mutex_lock(&mp->epm_lock);
1262		mp->epm_txbusy--;
1263		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1264		(void) pthread_mutex_unlock(&mp->epm_lock);
1265		fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv));
1266		nvlist_free(msgnvl);
1267		FREE_BUF(hdl, buf, buflen);
1268		return (FMD_SEND_FAILED);
1269	}
1270
1271	nvlist_free(msgnvl);
1272
1273	if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf,
1274	    buflen) != buflen) {
1275		fmd_hdl_debug(hdl, "failed to send message to %s",
1276		    mp->epm_ep_str);
1277		(void) pthread_mutex_lock(&mp->epm_lock);
1278		etm_suspend_reconnect(hdl, mp);
1279		mp->epm_txbusy--;
1280		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1281		(void) pthread_mutex_unlock(&mp->epm_lock);
1282		FREE_BUF(hdl, buf, buflen);
1283		INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
1284		return (FMD_SEND_RETRY);
1285	}
1286
1287	INCRSTAT(Etm_stats.write_msg.fmds_value.ui64);
1288	ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize);
1289
1290	etm_hex_dump(hdl, nvbuf, nvsize, 1);
1291
1292	if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf,
1293	    hdrlen) != hdrlen) {
1294		fmd_hdl_debug(hdl, "failed to read ACK from %s",
1295		    mp->epm_ep_str);
1296		(void) pthread_mutex_lock(&mp->epm_lock);
1297		etm_suspend_reconnect(hdl, mp);
1298		mp->epm_txbusy--;
1299		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1300		(void) pthread_mutex_unlock(&mp->epm_lock);
1301		FREE_BUF(hdl, buf, buflen);
1302		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
1303		return (FMD_SEND_RETRY);
1304	}
1305
1306	hdrstat = etm_check_hdr(hdl, mp, buf);
1307	FREE_BUF(hdl, buf, buflen);
1308
1309	if (hdrstat == ETM_HDR_ACK) {
1310		INCRSTAT(Etm_stats.read_ack.fmds_value.ui64);
1311	} else {
1312		(void) pthread_mutex_lock(&mp->epm_lock);
1313
1314		(void) etm_xport_close(hdl, mp->epm_oconn);
1315		mp->epm_oconn = NULL;
1316
1317		if (hdrstat == ETM_HDR_NAK) {
1318			/* Peer received a bad value in the header */
1319			if (mp->epm_xprthdl != NULL) {
1320				mp->epm_cstat = C_LIMBO;
1321				fmd_xprt_suspend(hdl, xprthdl);
1322				mp->epm_qstat = Q_SUSPENDED;
1323				fmd_hdl_debug(hdl, "received NAK, queue "
1324				    "suspended for %s", mp->epm_ep_str);
1325			}
1326
1327			rv = FMD_SEND_RETRY;
1328
1329		} else if (hdrstat == ETM_HDR_S_RESTART) {
1330			/* Server has restarted */
1331			mp->epm_cstat = C_CLOSED;
1332			mp->epm_qstat = Q_UNINITIALIZED;
1333			fmd_hdl_debug(hdl, "server %s restarted",
1334			    mp->epm_ep_str);
1335			/*
1336			 * Cannot call fmd_xprt_close here, so we'll do it
1337			 * on the timeout thread.
1338			 */
1339			if (mp->epm_timer_in_use == 0) {
1340				mp->epm_timer_id = fmd_timer_install(
1341				    hdl, mp, NULL, 0);
1342				mp->epm_timer_in_use = 1;
1343			}
1344
1345			/*
1346			 * fault.* or list.* events will be replayed if a
1347			 * transport is opened with the same auth.
1348			 * Other events will be discarded.
1349			 */
1350			rv = FMD_SEND_FAILED;
1351
1352		} else {
1353			mp->epm_cstat = C_CLOSED;
1354			fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str);
1355
1356			rv = FMD_SEND_RETRY;
1357		}
1358
1359		mp->epm_txbusy--;
1360
1361		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1362		(void) pthread_mutex_unlock(&mp->epm_lock);
1363
1364		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
1365
1366		return (rv);
1367	}
1368
1369	(void) pthread_mutex_lock(&mp->epm_lock);
1370	mp->epm_txbusy--;
1371	(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1372	(void) pthread_mutex_unlock(&mp->epm_lock);
1373
1374	return (FMD_SEND_SUCCESS);
1375}
1376
1377/*
 * FMD fmdo_timeout entry point.
1379 */
1380/*ARGSUSED*/
1381static void
1382etm_timeout(fmd_hdl_t *hdl, id_t id, void *data)
1383{
1384	etm_epmap_t *mp = (etm_epmap_t *)data;
1385
1386	(void) pthread_mutex_lock(&mp->epm_lock);
1387
1388	mp->epm_timer_in_use = 0;
1389
1390	if (mp->epm_qstat == Q_UNINITIALIZED) {
1391		/* Server has shutdown and we (client) need to reconnect */
1392		if (mp->epm_xprthdl != NULL) {
1393			fmd_xprt_close(hdl, mp->epm_xprthdl);
1394			fmd_hdl_debug(hdl, "queue closed for %s",
1395			    mp->epm_ep_str);
1396			mp->epm_xprthdl = NULL;
1397			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1398			mp->epm_ep_nvl = NULL;
1399		}
1400
1401		if (mp->epm_ep_nvl == NULL)
1402			(void) etm_get_ep_nvl(hdl, mp);
1403
1404		if (etm_handle_startup(hdl, mp)) {
1405			if (mp->epm_oconn != NULL) {
1406				(void) etm_xport_close(hdl, mp->epm_oconn);
1407				mp->epm_oconn = NULL;
1408			}
1409			mp->epm_cstat = C_UNINITIALIZED;
1410			mp->epm_qstat = Q_UNINITIALIZED;
1411			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
1412			    Reconn_interval);
1413			mp->epm_timer_in_use = 1;
1414		}
1415	} else {
1416		etm_reconnect(hdl, mp);
1417	}
1418
1419	(void) pthread_mutex_unlock(&mp->epm_lock);
1420}
1421
1422/*
1423 * FMD Module declarations
1424 */
1425static const fmd_hdl_ops_t etm_ops = {
1426	NULL,		/* fmdo_recv */
1427	etm_timeout,	/* fmdo_timeout */
1428	NULL,		/* fmdo_close */
1429	NULL,		/* fmdo_stats */
1430	NULL,		/* fmdo_gc */
1431	etm_send,	/* fmdo_send */
1432};
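
/*
 * fmdo_recv is NULL because inbound ETM messages arrive via the transport
 * callback (etm_cb_func() -> etm_recv()) and are posted to fmd with
 * fmd_xprt_post(), rather than being delivered to this module by fmd.
 */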
1433
1434static const fmd_prop_t etm_props[] = {
1435	{ "client_list", FMD_TYPE_STRING, NULL },
1436	{ "server_list", FMD_TYPE_STRING, NULL },
1437	{ "reconnect_interval",	FMD_TYPE_UINT64, "10000000000" },
1438	{ "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" },
1439	{ "rw_timeout", FMD_TYPE_UINT64, "2000000000" },
1440	{ "filter_path", FMD_TYPE_STRING, NULL },
1441	{ NULL, 0, NULL }
1442};
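
/*
 * These properties are normally set in this module's .conf file.  A
 * hypothetical fragment (endpoint names are illustrative only):
 *
 *	setprop client_list "dev:///dom[1:3]"
 *	setprop server_list "dev:///dom0"
 *	setprop reconnect_interval 10000000000
 *
 * The array syntax expands to dom1, dom2 and dom3, as described in
 * etm_create_epmaps().
 */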
1443
1444static const fmd_hdl_info_t etm_info = {
1445	"Event Transport Module", "2.0", &etm_ops, etm_props
1446};
1447
1448/*
1449 * Initialize the transport for use by ETM.
1450 */
1451void
1452_fmd_init(fmd_hdl_t *hdl)
1453{
1454	char *propstr;
1455
1456	if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) {
1457		return; /* invalid data in configuration file */
1458	}
1459
1460	/* Create global stats */
1461	(void) fmd_stat_create(hdl, FMD_STAT_NOALLOC,
1462	    sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats);
1463
1464	/* Get module properties */
1465	Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout");
1466	Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval");
1467	Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout");
1468
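	/*
	 * Note the flag mapping: endpoints named in "client_list" are remote
	 * clients, so their transports are opened with FMD_XPRT_ACCEPT (we
	 * are the accepting, or server, side); endpoints in "server_list"
	 * are remote servers, so we act as the client and initiate the
	 * startup handshake.
	 */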
1469	propstr = fmd_prop_get_string(hdl, "client_list");
1470	etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS);
1471	fmd_prop_free_string(hdl, propstr);
1472
1473	propstr = fmd_prop_get_string(hdl, "server_list");
1474	etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS);
1475	fmd_prop_free_string(hdl, propstr);
1476
1477	if (Etm_stats.peer_count.fmds_value.ui64 == 0) {
1478		fmd_hdl_debug(hdl, "Failed to init any endpoint\n");
1479		fmd_hdl_unregister(hdl);
1480		return;
1481	}
1482}
1483
1484/*
 * Tear down the transport.
1486 */
1487void
1488_fmd_fini(fmd_hdl_t *hdl)
1489{
1490	etm_epmap_t *mp, *next;
1491
1492	(void) pthread_mutex_lock(&Etm_mod_lock);
1493	Etm_exit = 1;
1494	(void) pthread_mutex_unlock(&Etm_mod_lock);
1495
1496	mp = Epmap_head;
1497
1498	while (mp) {
1499		next = mp->epm_next;
1500		etm_free_epmap(hdl, mp);
1501		mp = next;
1502	}
1503
1504	fmd_hdl_unregister(hdl);
1505}
1506