125cf1a30Sjl /* 225cf1a30Sjl * CDDL HEADER START 325cf1a30Sjl * 425cf1a30Sjl * The contents of this file are subject to the terms of the 525cf1a30Sjl * Common Development and Distribution License (the "License"). 625cf1a30Sjl * You may not use this file except in compliance with the License. 725cf1a30Sjl * 825cf1a30Sjl * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 925cf1a30Sjl * or http://www.opensolaris.org/os/licensing. 1025cf1a30Sjl * See the License for the specific language governing permissions 1125cf1a30Sjl * and limitations under the License. 1225cf1a30Sjl * 1325cf1a30Sjl * When distributing Covered Code, include this CDDL HEADER in each 1425cf1a30Sjl * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 1525cf1a30Sjl * If applicable, add the following below this CDDL HEADER, with the 1625cf1a30Sjl * fields enclosed by brackets "[]" replaced with your own identifying 1725cf1a30Sjl * information: Portions Copyright [yyyy] [name of copyright owner] 1825cf1a30Sjl * 1925cf1a30Sjl * CDDL HEADER END 2025cf1a30Sjl */ 2125cf1a30Sjl 2225cf1a30Sjl /* 2325cf1a30Sjl * Copyright 2006 Sun Microsystems, Inc. All rights reserved. 2425cf1a30Sjl * Use is subject to license terms. 2525cf1a30Sjl */ 2625cf1a30Sjl 2725cf1a30Sjl #pragma ident "%Z%%M% %I% %E% SMI" 2825cf1a30Sjl 2925cf1a30Sjl /* 3025cf1a30Sjl * FMA Event Transport Module 3125cf1a30Sjl * 3225cf1a30Sjl * Plugin for sending/receiving FMA events to/from a remote endoint. 3325cf1a30Sjl */ 3425cf1a30Sjl 3525cf1a30Sjl #include <netinet/in.h> 3677a7fd96Sjrutt #include <errno.h> 3725cf1a30Sjl #include <sys/fm/protocol.h> 3825cf1a30Sjl #include <sys/sysmacros.h> 3925cf1a30Sjl #include <pthread.h> 4025cf1a30Sjl #include <strings.h> 4125cf1a30Sjl #include <ctype.h> 4225cf1a30Sjl #include <link.h> 4325cf1a30Sjl #include <libnvpair.h> 4425cf1a30Sjl #include "etm_xport_api.h" 4525cf1a30Sjl #include "etm_proto.h" 4625cf1a30Sjl 4725cf1a30Sjl /* 4825cf1a30Sjl * ETM declarations 4925cf1a30Sjl */ 5025cf1a30Sjl 5125cf1a30Sjl typedef enum etm_connection_status { 5225cf1a30Sjl C_UNINITIALIZED = 0, 5325cf1a30Sjl C_OPEN, /* Connection is open */ 5425cf1a30Sjl C_CLOSED, /* Connection is closed */ 5525cf1a30Sjl C_LIMBO, /* Bad value in header from peer */ 5625cf1a30Sjl C_TIMED_OUT /* Reconnection to peer timed out */ 5725cf1a30Sjl } etm_connstat_t; 5825cf1a30Sjl 5925cf1a30Sjl typedef enum etm_fmd_queue_status { 6025cf1a30Sjl Q_UNINITIALIZED = 100, 6125cf1a30Sjl Q_INIT_PENDING, /* Queue initialization in progress */ 6225cf1a30Sjl Q_OPEN, /* Queue is open */ 6325cf1a30Sjl Q_SUSPENDED /* Queue is suspended */ 6425cf1a30Sjl } etm_qstat_t; 6525cf1a30Sjl 6625cf1a30Sjl /* Per endpoint data */ 6725cf1a30Sjl typedef struct etm_endpoint_map { 6825cf1a30Sjl uint8_t epm_ver; /* Protocol version being used */ 6925cf1a30Sjl char *epm_ep_str; /* Endpoint ID string */ 7025cf1a30Sjl int epm_xprtflags; /* FMD transport open flags */ 7125cf1a30Sjl etm_xport_hdl_t epm_tlhdl; /* Transport Layer instance handle */ 7225cf1a30Sjl pthread_mutex_t epm_lock; /* Protects remainder of struct */ 7325cf1a30Sjl pthread_cond_t epm_tx_cv; /* Cond var for send/transmit */ 7425cf1a30Sjl int epm_txbusy; /* Busy doing send/transmit */ 7525cf1a30Sjl fmd_xprt_t *epm_xprthdl; /* FMD transport handle */ 7625cf1a30Sjl etm_qstat_t epm_qstat; /* Status of fmd xprt queue */ 7725cf1a30Sjl nvlist_t *epm_ep_nvl; /* Endpoint ID nv_list */ 7825cf1a30Sjl etm_xport_conn_t epm_oconn; /* Connection for outgoing events */ 7925cf1a30Sjl etm_connstat_t epm_cstat; /* Status of connection */ 8025cf1a30Sjl id_t epm_timer_id; /* Timer id */ 8125cf1a30Sjl int epm_timer_in_use; /* Indicates if timer is in use */ 8225cf1a30Sjl hrtime_t epm_reconn_end; /* Reconnection end time */ 8325cf1a30Sjl struct etm_endpoint_map *epm_next; 8425cf1a30Sjl } etm_epmap_t; 8525cf1a30Sjl 8625cf1a30Sjl #define ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1) 8725cf1a30Sjl #define ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2) 8825cf1a30Sjl #define ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3) 8925cf1a30Sjl #define ETM_EP_INST_MAX 4 /* Max chars in endpt instance */ 9025cf1a30Sjl #define ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR 9125cf1a30Sjl #define ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT) 9225cf1a30Sjl 9325cf1a30Sjl #define ALLOC_BUF(hdl, buf, size) \ 9425cf1a30Sjl buf = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP); 9525cf1a30Sjl 9625cf1a30Sjl #define FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size)); 9725cf1a30Sjl 9825cf1a30Sjl #define IS_CLIENT(mp) (((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1) 9925cf1a30Sjl 10025cf1a30Sjl #define INCRSTAT(x) { (void) pthread_mutex_lock(&Etm_mod_lock); \ 10125cf1a30Sjl (x)++; \ 10225cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); \ 10325cf1a30Sjl } 10425cf1a30Sjl 10525cf1a30Sjl #define DECRSTAT(x) { (void) pthread_mutex_lock(&Etm_mod_lock); \ 10625cf1a30Sjl (x)--; \ 10725cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); \ 10825cf1a30Sjl } 10925cf1a30Sjl 11025cf1a30Sjl #define ADDSTAT(x, y) { (void) pthread_mutex_lock(&Etm_mod_lock); \ 11125cf1a30Sjl (x) += (y); \ 11225cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); \ 11325cf1a30Sjl } 11425cf1a30Sjl 11525cf1a30Sjl /* 11625cf1a30Sjl * Global variables 11725cf1a30Sjl */ 11825cf1a30Sjl static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER; 11925cf1a30Sjl /* Protects globals */ 12025cf1a30Sjl static hrtime_t Reconn_interval; /* Time between reconnection attempts */ 12125cf1a30Sjl static hrtime_t Reconn_timeout; /* Time allowed for reconnection */ 12225cf1a30Sjl static hrtime_t Rw_timeout; /* Time allowed for I/O operation */ 12325cf1a30Sjl static int Etm_dump = 0; /* Enables hex dump for debug */ 12425cf1a30Sjl static int Etm_exit = 0; /* Flag for exit */ 12525cf1a30Sjl static etm_epmap_t *Epmap_head = NULL; /* Head of list of epmap structs */ 12625cf1a30Sjl 12725cf1a30Sjl /* Module statistics */ 12825cf1a30Sjl static struct etm_stats { 12925cf1a30Sjl /* read counters */ 13025cf1a30Sjl fmd_stat_t read_ack; 13125cf1a30Sjl fmd_stat_t read_bytes; 13225cf1a30Sjl fmd_stat_t read_msg; 13377a7fd96Sjrutt fmd_stat_t post_filter; 13425cf1a30Sjl /* write counters */ 13525cf1a30Sjl fmd_stat_t write_ack; 13625cf1a30Sjl fmd_stat_t write_bytes; 13725cf1a30Sjl fmd_stat_t write_msg; 13877a7fd96Sjrutt fmd_stat_t send_filter; 13925cf1a30Sjl /* error counters */ 14025cf1a30Sjl fmd_stat_t error_protocol; 14125cf1a30Sjl fmd_stat_t error_drop_read; 14225cf1a30Sjl fmd_stat_t error_read; 14325cf1a30Sjl fmd_stat_t error_read_badhdr; 14425cf1a30Sjl fmd_stat_t error_write; 14577a7fd96Sjrutt fmd_stat_t error_send_filter; 14677a7fd96Sjrutt fmd_stat_t error_post_filter; 14725cf1a30Sjl /* misc */ 14825cf1a30Sjl fmd_stat_t peer_count; 14925cf1a30Sjl 15025cf1a30Sjl } Etm_stats = { 15125cf1a30Sjl /* read counters */ 15225cf1a30Sjl { "read_ack", FMD_TYPE_UINT64, "ACKs read" }, 15325cf1a30Sjl { "read_bytes", FMD_TYPE_UINT64, "Bytes read" }, 15425cf1a30Sjl { "read_msg", FMD_TYPE_UINT64, "Messages read" }, 15577a7fd96Sjrutt { "post_filter", FMD_TYPE_UINT64, "Drops by post_filter" }, 15625cf1a30Sjl /* write counters */ 15725cf1a30Sjl { "write_ack", FMD_TYPE_UINT64, "ACKs sent" }, 15825cf1a30Sjl { "write_bytes", FMD_TYPE_UINT64, "Bytes sent" }, 15925cf1a30Sjl { "write_msg", FMD_TYPE_UINT64, "Messages sent" }, 16077a7fd96Sjrutt { "send_filter", FMD_TYPE_UINT64, "Drops by send_filter" }, 16125cf1a30Sjl /* ETM error counters */ 16225cf1a30Sjl { "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" }, 16325cf1a30Sjl { "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" }, 16425cf1a30Sjl { "error_read", FMD_TYPE_UINT64, "Read I/O errors" }, 16525cf1a30Sjl { "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" }, 16625cf1a30Sjl { "error_write", FMD_TYPE_UINT64, "Write I/O errors" }, 16777a7fd96Sjrutt { "error_send_filter", FMD_TYPE_UINT64, "Send filter errors" }, 16877a7fd96Sjrutt { "error_post_filter", FMD_TYPE_UINT64, "Post filter errors" }, 16925cf1a30Sjl /* ETM Misc */ 17025cf1a30Sjl { "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" }, 17125cf1a30Sjl }; 17225cf1a30Sjl 17325cf1a30Sjl /* 17425cf1a30Sjl * ETM Private functions 17525cf1a30Sjl */ 17625cf1a30Sjl 17725cf1a30Sjl /* 17825cf1a30Sjl * Hex dump for debug. 17925cf1a30Sjl */ 18025cf1a30Sjl static void 18125cf1a30Sjl etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction) 18225cf1a30Sjl { 18325cf1a30Sjl int i, j, k; 18425cf1a30Sjl int16_t *c; 18525cf1a30Sjl 18625cf1a30Sjl if (Etm_dump == 0) 18725cf1a30Sjl return; 18825cf1a30Sjl 18925cf1a30Sjl j = buflen / 16; /* Number of complete 8-column rows */ 19025cf1a30Sjl k = buflen % 16; /* Is there a last (non-8-column) row? */ 19125cf1a30Sjl 19225cf1a30Sjl if (direction) 19325cf1a30Sjl fmd_hdl_debug(hdl, "--- WRITE Message Dump ---"); 19425cf1a30Sjl else 19525cf1a30Sjl fmd_hdl_debug(hdl, "--- READ Message Dump ---"); 19625cf1a30Sjl 19725cf1a30Sjl fmd_hdl_debug(hdl, " Displaying %d bytes", buflen); 19825cf1a30Sjl 19925cf1a30Sjl /* Dump the complete 8-column rows */ 20025cf1a30Sjl for (i = 0; i < j; i++) { 20125cf1a30Sjl c = (int16_t *)buf + (i * 8); 20225cf1a30Sjl fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x %4x %4x", i, 20325cf1a30Sjl *(c+0), *(c+1), *(c+2), *(c+3), 20425cf1a30Sjl *(c+4), *(c+5), *(c+6), *(c+7)); 20525cf1a30Sjl } 20625cf1a30Sjl 20725cf1a30Sjl /* Dump the last (incomplete) row */ 20825cf1a30Sjl c = (int16_t *)buf + (i * 8); 20925cf1a30Sjl switch (k) { 21025cf1a30Sjl case 4: 21125cf1a30Sjl fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1)); 21225cf1a30Sjl break; 21325cf1a30Sjl case 8: 21425cf1a30Sjl fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1), 21525cf1a30Sjl *(c+2), *(c+3)); 21625cf1a30Sjl break; 21725cf1a30Sjl case 12: 21825cf1a30Sjl fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x", i, *(c+0), 21925cf1a30Sjl *(c+1), *(c+2), *(c+3), *(c+4), *(c+5)); 22025cf1a30Sjl break; 22125cf1a30Sjl } 22225cf1a30Sjl 22325cf1a30Sjl fmd_hdl_debug(hdl, "--- End Dump ---"); 22425cf1a30Sjl } 22525cf1a30Sjl 22625cf1a30Sjl /* 22725cf1a30Sjl * Provide the length of a message based on the data in the given ETM header. 22825cf1a30Sjl */ 22925cf1a30Sjl static size_t 23025cf1a30Sjl etm_get_msglen(void *buf) 23125cf1a30Sjl { 23225cf1a30Sjl etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf; 23325cf1a30Sjl 23425cf1a30Sjl return (ntohl(hp->hdr_msglen)); 23525cf1a30Sjl } 23625cf1a30Sjl 23725cf1a30Sjl /* 23825cf1a30Sjl * Check the contents of the ETM header for errors. 23925cf1a30Sjl * Return the header type (hdr_type). 24025cf1a30Sjl */ 24125cf1a30Sjl static int 24225cf1a30Sjl etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf) 24325cf1a30Sjl { 24425cf1a30Sjl etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf; 24525cf1a30Sjl 24625cf1a30Sjl if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) { 247154b1f02Sjrutt fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s " 24825cf1a30Sjl ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim); 24925cf1a30Sjl return (ETM_HDR_INVALID); 25025cf1a30Sjl } 25125cf1a30Sjl 25225cf1a30Sjl if ((hp->hdr_type == ETM_HDR_C_HELLO) || 25325cf1a30Sjl (hp->hdr_type == ETM_HDR_S_HELLO)) { 25425cf1a30Sjl /* Until version is negotiated, other fields may be wrong */ 25525cf1a30Sjl return (hp->hdr_type); 25625cf1a30Sjl } 25725cf1a30Sjl 25825cf1a30Sjl if (hp->hdr_ver != mp->epm_ver) { 259154b1f02Sjrutt fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n", 26025cf1a30Sjl mp->epm_ep_str, hp->hdr_ver); 26125cf1a30Sjl return (ETM_HDR_BADVERSION); 26225cf1a30Sjl } 26325cf1a30Sjl 26425cf1a30Sjl if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) || 26525cf1a30Sjl (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) { 266154b1f02Sjrutt fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n", 26725cf1a30Sjl mp->epm_ep_str, hp->hdr_type); 26825cf1a30Sjl return (ETM_HDR_BADTYPE); 26925cf1a30Sjl } 27025cf1a30Sjl 27125cf1a30Sjl return (hp->hdr_type); 27225cf1a30Sjl } 27325cf1a30Sjl 27425cf1a30Sjl /* 27525cf1a30Sjl * Create an ETM header of a given type in the given buffer. 27625cf1a30Sjl * Return length of header. 27725cf1a30Sjl */ 27825cf1a30Sjl static size_t 27925cf1a30Sjl etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen) 28025cf1a30Sjl { 28125cf1a30Sjl etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf; 28225cf1a30Sjl 28325cf1a30Sjl bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN); 28425cf1a30Sjl hp->hdr_ver = ver; 28525cf1a30Sjl hp->hdr_type = type; 28625cf1a30Sjl hp->hdr_msglen = htonl(msglen); 28725cf1a30Sjl 28825cf1a30Sjl return (ETM_HDRLEN); 28925cf1a30Sjl } 29025cf1a30Sjl 29125cf1a30Sjl /* 29225cf1a30Sjl * Convert message bytes to nvlist and post to fmd. 29325cf1a30Sjl * Return zero for success, non-zero for failure. 29425cf1a30Sjl * 29525cf1a30Sjl * Note : nvl is free'd by fmd. 29625cf1a30Sjl */ 29725cf1a30Sjl static int 29825cf1a30Sjl etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen) 29925cf1a30Sjl { 30025cf1a30Sjl nvlist_t *nvl; 30125cf1a30Sjl int rv; 30225cf1a30Sjl 30325cf1a30Sjl if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) { 304154b1f02Sjrutt fmd_hdl_error(hdl, "failed to unpack message"); 30525cf1a30Sjl return (1); 30625cf1a30Sjl } 30725cf1a30Sjl 30877a7fd96Sjrutt rv = etm_xport_post_filter(hdl, nvl, mp->epm_ep_str); 30977a7fd96Sjrutt if (rv == ETM_XPORT_FILTER_DROP) { 31077a7fd96Sjrutt fmd_hdl_debug(hdl, "post_filter dropped event"); 31177a7fd96Sjrutt INCRSTAT(Etm_stats.post_filter.fmds_value.ui64); 31277a7fd96Sjrutt nvlist_free(nvl); 31377a7fd96Sjrutt return (0); 31477a7fd96Sjrutt } else if (rv == ETM_XPORT_FILTER_ERROR) { 31577a7fd96Sjrutt fmd_hdl_debug(hdl, "post_filter error : %s", strerror(errno)); 31677a7fd96Sjrutt INCRSTAT(Etm_stats.error_post_filter.fmds_value.ui64); 31777a7fd96Sjrutt /* Still post event */ 31877a7fd96Sjrutt } 31977a7fd96Sjrutt 32025cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 32125cf1a30Sjl (void) pthread_mutex_lock(&Etm_mod_lock); 32225cf1a30Sjl if (!Etm_exit) { 32325cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 32425cf1a30Sjl if (mp->epm_qstat == Q_OPEN) { 32525cf1a30Sjl fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0); 32625cf1a30Sjl rv = 0; 32725cf1a30Sjl } else if (mp->epm_qstat == Q_SUSPENDED) { 32825cf1a30Sjl fmd_xprt_resume(hdl, mp->epm_xprthdl); 32925cf1a30Sjl if (mp->epm_timer_in_use) { 33025cf1a30Sjl fmd_timer_remove(hdl, mp->epm_timer_id); 33125cf1a30Sjl mp->epm_timer_in_use = 0; 33225cf1a30Sjl } 33325cf1a30Sjl mp->epm_qstat = Q_OPEN; 33425cf1a30Sjl fmd_hdl_debug(hdl, "queue resumed for %s", 33525cf1a30Sjl mp->epm_ep_str); 33625cf1a30Sjl fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0); 33725cf1a30Sjl rv = 0; 33825cf1a30Sjl } else { 33925cf1a30Sjl fmd_hdl_debug(hdl, "unable to post message, qstat = %d", 34025cf1a30Sjl mp->epm_qstat); 341154b1f02Sjrutt nvlist_free(nvl); 342154b1f02Sjrutt /* Remote peer will attempt to resend event */ 34325cf1a30Sjl rv = 2; 34425cf1a30Sjl } 34525cf1a30Sjl } else { 34625cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 34725cf1a30Sjl fmd_hdl_debug(hdl, "unable to post message, module exiting"); 348154b1f02Sjrutt nvlist_free(nvl); 349154b1f02Sjrutt /* Remote peer will attempt to resend event */ 35025cf1a30Sjl rv = 3; 35125cf1a30Sjl } 35225cf1a30Sjl 35325cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 35425cf1a30Sjl 35525cf1a30Sjl return (rv); 35625cf1a30Sjl } 35725cf1a30Sjl 35825cf1a30Sjl /* 35925cf1a30Sjl * Handle the startup handshake to the server. The client always initiates 36025cf1a30Sjl * the startup handshake. In the following sequence, we are the client and 36125cf1a30Sjl * the remote endpoint is the server. 36225cf1a30Sjl * 36325cf1a30Sjl * Client sends C_HELLO and transitions to Q_INIT_PENDING state. 36425cf1a30Sjl * Server sends S_HELLO and transitions to Q_INIT_PENDING state. 36525cf1a30Sjl * Client sends ACK and transitions to Q_OPEN state. 36625cf1a30Sjl * Server receives ACK and transitions to Q_OPEN state. 36725cf1a30Sjl * 36825cf1a30Sjl * Return 0 for success, nonzero for failure. 36925cf1a30Sjl */ 37025cf1a30Sjl static int 37125cf1a30Sjl etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp) 37225cf1a30Sjl { 37325cf1a30Sjl etm_proto_hdr_t *hp; 37425cf1a30Sjl size_t hdrlen = ETM_HDRLEN; 37525cf1a30Sjl int hdrstat; 37625cf1a30Sjl char hbuf[ETM_HDRLEN]; 37725cf1a30Sjl 37825cf1a30Sjl if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL) 37925cf1a30Sjl return (1); 38025cf1a30Sjl 38125cf1a30Sjl mp->epm_cstat = C_OPEN; 38225cf1a30Sjl 38325cf1a30Sjl hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0); 38425cf1a30Sjl 38525cf1a30Sjl if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, 38625cf1a30Sjl hdrlen)) != hdrlen) { 38725cf1a30Sjl fmd_hdl_error(hdl, "Failed to write C_HELLO to %s", 38825cf1a30Sjl mp->epm_ep_str); 38925cf1a30Sjl return (2); 39025cf1a30Sjl } 39125cf1a30Sjl 39225cf1a30Sjl mp->epm_qstat = Q_INIT_PENDING; 39325cf1a30Sjl 39425cf1a30Sjl if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf, 39525cf1a30Sjl hdrlen)) != hdrlen) { 39625cf1a30Sjl fmd_hdl_error(hdl, "Failed to read S_HELLO from %s", 39725cf1a30Sjl mp->epm_ep_str); 39825cf1a30Sjl return (3); 39925cf1a30Sjl } 40025cf1a30Sjl 40125cf1a30Sjl hdrstat = etm_check_hdr(hdl, mp, hbuf); 40225cf1a30Sjl 40325cf1a30Sjl if (hdrstat != ETM_HDR_S_HELLO) { 40425cf1a30Sjl fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO " 40525cf1a30Sjl "from %s", mp->epm_ep_str); 40625cf1a30Sjl return (4); 40725cf1a30Sjl } 40825cf1a30Sjl 40925cf1a30Sjl /* 41025cf1a30Sjl * Get version from the server. 41125cf1a30Sjl * Currently, only one version is supported. 41225cf1a30Sjl */ 41325cf1a30Sjl hp = (etm_proto_hdr_t *)(void *)hbuf; 41425cf1a30Sjl if (hp->hdr_ver != ETM_PROTO_V1) { 41525cf1a30Sjl fmd_hdl_error(hdl, "Unable to use same version as %s : %d", 41625cf1a30Sjl mp->epm_ep_str, hp->hdr_ver); 41725cf1a30Sjl return (5); 41825cf1a30Sjl } 41925cf1a30Sjl mp->epm_ver = hp->hdr_ver; 42025cf1a30Sjl 42125cf1a30Sjl hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0); 42225cf1a30Sjl 42325cf1a30Sjl if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, 42425cf1a30Sjl hdrlen)) != hdrlen) { 42525cf1a30Sjl fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s", 42625cf1a30Sjl mp->epm_ep_str); 42725cf1a30Sjl return (6); 42825cf1a30Sjl } 42925cf1a30Sjl 43025cf1a30Sjl /* 43125cf1a30Sjl * Call fmd_xprt_open and fmd_xprt_setspecific with 43225cf1a30Sjl * Etm_mod_lock held to avoid race with etm_send thread. 43325cf1a30Sjl */ 43425cf1a30Sjl (void) pthread_mutex_lock(&Etm_mod_lock); 43525cf1a30Sjl if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags, 43625cf1a30Sjl mp->epm_ep_nvl, NULL)) == NULL) { 43725cf1a30Sjl fmd_hdl_abort(hdl, "Failed to init xprthdl for %s", 43825cf1a30Sjl mp->epm_ep_str); 43925cf1a30Sjl } 44025cf1a30Sjl fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp); 44125cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 44225cf1a30Sjl 44325cf1a30Sjl mp->epm_qstat = Q_OPEN; 44425cf1a30Sjl fmd_hdl_debug(hdl, "queue open for %s", mp->epm_ep_str); 44525cf1a30Sjl 44625cf1a30Sjl return (0); 44725cf1a30Sjl } 44825cf1a30Sjl 449*ac92251dSjrutt /* 450*ac92251dSjrutt * Open a connection to the peer, send a SHUTDOWN message, 451*ac92251dSjrutt * and close the connection. 452*ac92251dSjrutt */ 453*ac92251dSjrutt static void 454*ac92251dSjrutt etm_send_shutdown(fmd_hdl_t *hdl, etm_epmap_t *mp) 455*ac92251dSjrutt { 456*ac92251dSjrutt size_t hdrlen = ETM_HDRLEN; 457*ac92251dSjrutt char hbuf[ETM_HDRLEN]; 458*ac92251dSjrutt 459*ac92251dSjrutt if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL) 460*ac92251dSjrutt return; 461*ac92251dSjrutt 462*ac92251dSjrutt hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_SHUTDOWN, 0); 463*ac92251dSjrutt 464*ac92251dSjrutt (void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, hdrlen); 465*ac92251dSjrutt 466*ac92251dSjrutt (void) etm_xport_close(hdl, mp->epm_oconn); 467*ac92251dSjrutt mp->epm_oconn = NULL; 468*ac92251dSjrutt } 469*ac92251dSjrutt 47025cf1a30Sjl /* 47125cf1a30Sjl * Alloc a nvlist and add a string for the endpoint. 47225cf1a30Sjl * Return zero for success, non-zero for failure. 47325cf1a30Sjl */ 47425cf1a30Sjl static int 47525cf1a30Sjl etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp) 47625cf1a30Sjl { 47725cf1a30Sjl /* 47825cf1a30Sjl * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation 47925cf1a30Sjl * in fmd when this nvlist_t is free'd. 48025cf1a30Sjl */ 48125cf1a30Sjl (void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0); 48225cf1a30Sjl 48325cf1a30Sjl if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) { 484154b1f02Sjrutt fmd_hdl_error(hdl, "failed to add domain-id string to nvlist " 48525cf1a30Sjl "for %s", mp->epm_ep_str); 48625cf1a30Sjl nvlist_free(mp->epm_ep_nvl); 48725cf1a30Sjl return (1); 48825cf1a30Sjl } 48925cf1a30Sjl 49025cf1a30Sjl return (0); 49125cf1a30Sjl } 49225cf1a30Sjl 49325cf1a30Sjl /* 49425cf1a30Sjl * Free the nvlist for the endpoint_id string. 49525cf1a30Sjl */ 49625cf1a30Sjl /*ARGSUSED*/ 49725cf1a30Sjl static void 49825cf1a30Sjl etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp) 49925cf1a30Sjl { 50025cf1a30Sjl nvlist_free(mp->epm_ep_nvl); 50125cf1a30Sjl } 50225cf1a30Sjl 50325cf1a30Sjl /* 50425cf1a30Sjl * Check for a duplicate endpoint/peer string. 50525cf1a30Sjl */ 50625cf1a30Sjl /*ARGSUSED*/ 50725cf1a30Sjl static int 50825cf1a30Sjl etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname) 50925cf1a30Sjl { 51025cf1a30Sjl etm_epmap_t *mp; 51125cf1a30Sjl 51225cf1a30Sjl for (mp = Epmap_head; mp != NULL; mp = mp->epm_next) 51325cf1a30Sjl if (strcmp(epname, mp->epm_ep_str) == 0) 51425cf1a30Sjl return (1); 51525cf1a30Sjl 51625cf1a30Sjl return (0); 51725cf1a30Sjl } 51825cf1a30Sjl 51925cf1a30Sjl /* 52025cf1a30Sjl * Attempt to re-open a connection with the remote endpoint. 52125cf1a30Sjl */ 52225cf1a30Sjl static void 52325cf1a30Sjl etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp) 52425cf1a30Sjl { 52525cf1a30Sjl if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) { 52625cf1a30Sjl if (gethrtime() < mp->epm_reconn_end) { 52725cf1a30Sjl if ((mp->epm_oconn = etm_xport_open(hdl, 52825cf1a30Sjl mp->epm_tlhdl)) == NULL) { 52925cf1a30Sjl fmd_hdl_debug(hdl, "reconnect failed for %s", 53025cf1a30Sjl mp->epm_ep_str); 53125cf1a30Sjl mp->epm_timer_id = fmd_timer_install(hdl, mp, 53225cf1a30Sjl NULL, Reconn_interval); 53325cf1a30Sjl mp->epm_timer_in_use = 1; 53425cf1a30Sjl } else { 53525cf1a30Sjl fmd_hdl_debug(hdl, "reconnect success for %s", 53625cf1a30Sjl mp->epm_ep_str); 53725cf1a30Sjl mp->epm_reconn_end = 0; 53825cf1a30Sjl mp->epm_cstat = C_OPEN; 53925cf1a30Sjl } 54025cf1a30Sjl } else { 54125cf1a30Sjl fmd_hdl_error(hdl, "Reconnect timed out for %s\n", 54225cf1a30Sjl mp->epm_ep_str); 54325cf1a30Sjl mp->epm_reconn_end = 0; 54425cf1a30Sjl mp->epm_cstat = C_TIMED_OUT; 54525cf1a30Sjl } 54625cf1a30Sjl } 54725cf1a30Sjl 54825cf1a30Sjl if (mp->epm_cstat == C_OPEN) { 54925cf1a30Sjl fmd_xprt_resume(hdl, mp->epm_xprthdl); 55025cf1a30Sjl mp->epm_qstat = Q_OPEN; 55125cf1a30Sjl fmd_hdl_debug(hdl, "queue resumed for %s", mp->epm_ep_str); 55225cf1a30Sjl } 55325cf1a30Sjl } 55425cf1a30Sjl 55525cf1a30Sjl /* 55625cf1a30Sjl * Suspend a given connection and setup for reconnection retries. 557154b1f02Sjrutt * Assume caller holds lock on epm_lock. 55825cf1a30Sjl */ 55925cf1a30Sjl static void 56025cf1a30Sjl etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp) 56125cf1a30Sjl { 56225cf1a30Sjl (void) pthread_mutex_lock(&Etm_mod_lock); 56325cf1a30Sjl if (Etm_exit) { 56425cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 56525cf1a30Sjl return; 56625cf1a30Sjl } 56725cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 56825cf1a30Sjl 56925cf1a30Sjl if (mp->epm_oconn != NULL) { 57025cf1a30Sjl (void) etm_xport_close(hdl, mp->epm_oconn); 57125cf1a30Sjl mp->epm_oconn = NULL; 57225cf1a30Sjl } 57325cf1a30Sjl 57425cf1a30Sjl mp->epm_reconn_end = gethrtime() + Reconn_timeout; 57525cf1a30Sjl mp->epm_cstat = C_UNINITIALIZED; 57625cf1a30Sjl 57725cf1a30Sjl if (mp->epm_xprthdl != NULL) { 57825cf1a30Sjl fmd_xprt_suspend(hdl, mp->epm_xprthdl); 57925cf1a30Sjl mp->epm_qstat = Q_SUSPENDED; 58025cf1a30Sjl fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str); 58125cf1a30Sjl 58225cf1a30Sjl if (mp->epm_timer_in_use == 0) { 58325cf1a30Sjl mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL, 58425cf1a30Sjl Reconn_interval); 58525cf1a30Sjl mp->epm_timer_in_use = 1; 58625cf1a30Sjl } 58725cf1a30Sjl } 58825cf1a30Sjl } 58925cf1a30Sjl 59025cf1a30Sjl /* 59125cf1a30Sjl * Reinitialize the connection. The old fmd_xprt_t handle must be 59225cf1a30Sjl * removed/closed first. 59325cf1a30Sjl * Assume caller holds lock on epm_lock. 59425cf1a30Sjl */ 59525cf1a30Sjl static void 59625cf1a30Sjl etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp) 59725cf1a30Sjl { 59825cf1a30Sjl /* 59925cf1a30Sjl * To avoid a deadlock, wait for etm_send to finish before 60025cf1a30Sjl * calling fmd_xprt_close() 60125cf1a30Sjl */ 60225cf1a30Sjl while (mp->epm_txbusy) 60325cf1a30Sjl (void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock); 60425cf1a30Sjl 60525cf1a30Sjl if (mp->epm_xprthdl != NULL) { 60625cf1a30Sjl fmd_xprt_close(hdl, mp->epm_xprthdl); 607154b1f02Sjrutt fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str); 60825cf1a30Sjl mp->epm_xprthdl = NULL; 60925cf1a30Sjl /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 61025cf1a30Sjl mp->epm_ep_nvl = NULL; 61125cf1a30Sjl } 61225cf1a30Sjl 61325cf1a30Sjl if (mp->epm_timer_in_use) { 61425cf1a30Sjl fmd_timer_remove(hdl, mp->epm_timer_id); 61525cf1a30Sjl mp->epm_timer_in_use = 0; 61625cf1a30Sjl } 61725cf1a30Sjl 61825cf1a30Sjl if (mp->epm_oconn != NULL) { 61925cf1a30Sjl (void) etm_xport_close(hdl, mp->epm_oconn); 62025cf1a30Sjl mp->epm_oconn = NULL; 62125cf1a30Sjl } 62225cf1a30Sjl 62325cf1a30Sjl mp->epm_cstat = C_UNINITIALIZED; 62425cf1a30Sjl mp->epm_qstat = Q_UNINITIALIZED; 62525cf1a30Sjl } 62625cf1a30Sjl 62725cf1a30Sjl /* 62825cf1a30Sjl * Receive data from ETM transport layer. 62925cf1a30Sjl * Note : This is not the fmdo_recv entry point. 63025cf1a30Sjl * 63125cf1a30Sjl */ 63225cf1a30Sjl static int 63325cf1a30Sjl etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp) 63425cf1a30Sjl { 63525cf1a30Sjl size_t buflen, hdrlen; 63625cf1a30Sjl void *buf; 63725cf1a30Sjl char hbuf[ETM_HDRLEN]; 63825cf1a30Sjl int hdrstat, rv; 63925cf1a30Sjl 64025cf1a30Sjl hdrlen = ETM_HDRLEN; 64125cf1a30Sjl 64225cf1a30Sjl if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) { 64325cf1a30Sjl fmd_hdl_debug(hdl, "failed to read header from %s", 64425cf1a30Sjl mp->epm_ep_str); 64525cf1a30Sjl INCRSTAT(Etm_stats.error_read.fmds_value.ui64); 64625cf1a30Sjl return (EIO); 64725cf1a30Sjl } 64825cf1a30Sjl 64925cf1a30Sjl hdrstat = etm_check_hdr(hdl, mp, hbuf); 65025cf1a30Sjl 65125cf1a30Sjl switch (hdrstat) { 65225cf1a30Sjl case ETM_HDR_INVALID: 65325cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 65425cf1a30Sjl if (mp->epm_cstat == C_OPEN) 65525cf1a30Sjl mp->epm_cstat = C_CLOSED; 65625cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 65725cf1a30Sjl 65825cf1a30Sjl INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64); 65925cf1a30Sjl rv = ECANCELED; 66025cf1a30Sjl break; 66125cf1a30Sjl 66225cf1a30Sjl case ETM_HDR_BADTYPE: 66325cf1a30Sjl case ETM_HDR_BADVERSION: 66425cf1a30Sjl hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0); 66525cf1a30Sjl 66625cf1a30Sjl if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 66725cf1a30Sjl hdrlen)) != hdrlen) { 66825cf1a30Sjl fmd_hdl_debug(hdl, "failed to write NAK to %s", 66925cf1a30Sjl mp->epm_ep_str); 67025cf1a30Sjl INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 67125cf1a30Sjl return (EIO); 67225cf1a30Sjl } 67325cf1a30Sjl 67425cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 67525cf1a30Sjl mp->epm_cstat = C_LIMBO; 67625cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 67725cf1a30Sjl 67825cf1a30Sjl INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64); 67925cf1a30Sjl rv = ENOTSUP; 68025cf1a30Sjl break; 68125cf1a30Sjl 68225cf1a30Sjl case ETM_HDR_C_HELLO: 68325cf1a30Sjl /* Client is initiating a startup handshake */ 68425cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 68525cf1a30Sjl etm_reinit(hdl, mp); 68625cf1a30Sjl mp->epm_qstat = Q_INIT_PENDING; 68725cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 68825cf1a30Sjl 68925cf1a30Sjl hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0); 69025cf1a30Sjl 69125cf1a30Sjl if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 69225cf1a30Sjl hdrlen)) != hdrlen) { 69325cf1a30Sjl fmd_hdl_debug(hdl, "failed to write S_HELLO to %s", 69425cf1a30Sjl mp->epm_ep_str); 69525cf1a30Sjl INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 69625cf1a30Sjl return (EIO); 69725cf1a30Sjl } 69825cf1a30Sjl 69925cf1a30Sjl rv = 0; 70025cf1a30Sjl break; 70125cf1a30Sjl 70225cf1a30Sjl case ETM_HDR_ACK: 70325cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 70425cf1a30Sjl if (mp->epm_qstat == Q_INIT_PENDING) { 70525cf1a30Sjl /* This is client's ACK from startup handshake */ 70625cf1a30Sjl /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 70725cf1a30Sjl if (mp->epm_ep_nvl == NULL) 70825cf1a30Sjl (void) etm_get_ep_nvl(hdl, mp); 70925cf1a30Sjl 71025cf1a30Sjl /* 71125cf1a30Sjl * Call fmd_xprt_open and fmd_xprt_setspecific with 71225cf1a30Sjl * Etm_mod_lock held to avoid race with etm_send thread. 71325cf1a30Sjl */ 71425cf1a30Sjl (void) pthread_mutex_lock(&Etm_mod_lock); 71525cf1a30Sjl if ((mp->epm_xprthdl = fmd_xprt_open(hdl, 71625cf1a30Sjl mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) { 71725cf1a30Sjl fmd_hdl_abort(hdl, "Failed to init xprthdl " 71825cf1a30Sjl "for %s", mp->epm_ep_str); 71925cf1a30Sjl } 72025cf1a30Sjl fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp); 72125cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 72225cf1a30Sjl 72325cf1a30Sjl mp->epm_qstat = Q_OPEN; 72425cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 72525cf1a30Sjl fmd_hdl_debug(hdl, "queue open for %s", 72625cf1a30Sjl mp->epm_ep_str); 72725cf1a30Sjl } else { 72825cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 72925cf1a30Sjl fmd_hdl_debug(hdl, "protocol error, not expecting ACK " 73025cf1a30Sjl "from %s\n", mp->epm_ep_str); 73125cf1a30Sjl INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64); 73225cf1a30Sjl } 73325cf1a30Sjl 73425cf1a30Sjl rv = 0; 73525cf1a30Sjl break; 73625cf1a30Sjl 73725cf1a30Sjl case ETM_HDR_SHUTDOWN: 73825cf1a30Sjl fmd_hdl_debug(hdl, "received shutdown from %s", 73925cf1a30Sjl mp->epm_ep_str); 74025cf1a30Sjl 74125cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 74225cf1a30Sjl 74325cf1a30Sjl etm_reinit(hdl, mp); 74425cf1a30Sjl 74525cf1a30Sjl if (IS_CLIENT(mp)) { 74625cf1a30Sjl /* 74725cf1a30Sjl * A server shutdown is considered to be temporary. 74825cf1a30Sjl * Prepare for reconnection. 74925cf1a30Sjl */ 75025cf1a30Sjl mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL, 75125cf1a30Sjl Reconn_interval); 75225cf1a30Sjl 75325cf1a30Sjl mp->epm_timer_in_use = 1; 75425cf1a30Sjl } 75525cf1a30Sjl 75625cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 75725cf1a30Sjl 75825cf1a30Sjl rv = ECANCELED; 75925cf1a30Sjl break; 76025cf1a30Sjl 76125cf1a30Sjl case ETM_HDR_MSG: 76225cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 76325cf1a30Sjl if (mp->epm_qstat == Q_UNINITIALIZED) { 76425cf1a30Sjl /* Peer (client) is unaware that we've restarted */ 76525cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 76625cf1a30Sjl hdrlen = etm_create_hdr(hbuf, mp->epm_ver, 76725cf1a30Sjl ETM_HDR_S_RESTART, 0); 76825cf1a30Sjl 76925cf1a30Sjl if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 77025cf1a30Sjl hdrlen)) != hdrlen) { 77125cf1a30Sjl fmd_hdl_debug(hdl, "failed to write S_RESTART " 77225cf1a30Sjl "to %s", mp->epm_ep_str); 77325cf1a30Sjl INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 77425cf1a30Sjl return (EIO); 77525cf1a30Sjl } 77625cf1a30Sjl 77725cf1a30Sjl return (ECANCELED); 77825cf1a30Sjl } 77925cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 78025cf1a30Sjl 78125cf1a30Sjl buflen = etm_get_msglen(hbuf); 78225cf1a30Sjl ALLOC_BUF(hdl, buf, buflen); 78325cf1a30Sjl 78425cf1a30Sjl if (etm_xport_read(hdl, conn, Rw_timeout, buf, 78525cf1a30Sjl buflen) != buflen) { 78625cf1a30Sjl fmd_hdl_debug(hdl, "failed to read message from %s", 78725cf1a30Sjl mp->epm_ep_str); 78825cf1a30Sjl FREE_BUF(hdl, buf, buflen); 78925cf1a30Sjl INCRSTAT(Etm_stats.error_read.fmds_value.ui64); 79025cf1a30Sjl return (EIO); 79125cf1a30Sjl } 79225cf1a30Sjl 79325cf1a30Sjl INCRSTAT(Etm_stats.read_msg.fmds_value.ui64); 79425cf1a30Sjl ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen); 79525cf1a30Sjl 79625cf1a30Sjl etm_hex_dump(hdl, buf, buflen, 0); 79725cf1a30Sjl 79825cf1a30Sjl if (etm_post_msg(hdl, mp, buf, buflen)) { 79925cf1a30Sjl INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64); 80025cf1a30Sjl FREE_BUF(hdl, buf, buflen); 80125cf1a30Sjl return (EIO); 80225cf1a30Sjl } 80325cf1a30Sjl 80425cf1a30Sjl FREE_BUF(hdl, buf, buflen); 80525cf1a30Sjl 80625cf1a30Sjl hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0); 80725cf1a30Sjl 80825cf1a30Sjl if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 80925cf1a30Sjl hdrlen)) != hdrlen) { 81025cf1a30Sjl fmd_hdl_debug(hdl, "failed to write ACK to %s", 81125cf1a30Sjl mp->epm_ep_str); 81225cf1a30Sjl INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 81325cf1a30Sjl return (EIO); 81425cf1a30Sjl } 81525cf1a30Sjl 81625cf1a30Sjl INCRSTAT(Etm_stats.write_ack.fmds_value.ui64); 81725cf1a30Sjl 81825cf1a30Sjl /* 81925cf1a30Sjl * If we got this far and the current state of the 82025cf1a30Sjl * outbound/sending connection is TIMED_OUT or 82125cf1a30Sjl * LIMBO, then we should reinitialize it. 82225cf1a30Sjl */ 82325cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 82425cf1a30Sjl if (mp->epm_cstat == C_TIMED_OUT || 82525cf1a30Sjl mp->epm_cstat == C_LIMBO) { 82625cf1a30Sjl if (mp->epm_oconn != NULL) { 82725cf1a30Sjl (void) etm_xport_close(hdl, mp->epm_oconn); 82825cf1a30Sjl mp->epm_oconn = NULL; 82925cf1a30Sjl } 83025cf1a30Sjl mp->epm_cstat = C_UNINITIALIZED; 83125cf1a30Sjl fmd_xprt_resume(hdl, mp->epm_xprthdl); 83225cf1a30Sjl if (mp->epm_timer_in_use) { 83325cf1a30Sjl fmd_timer_remove(hdl, mp->epm_timer_id); 83425cf1a30Sjl mp->epm_timer_in_use = 0; 83525cf1a30Sjl } 83625cf1a30Sjl mp->epm_qstat = Q_OPEN; 83725cf1a30Sjl fmd_hdl_debug(hdl, "queue resumed for %s", 83825cf1a30Sjl mp->epm_ep_str); 83925cf1a30Sjl } 84025cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 84125cf1a30Sjl 84225cf1a30Sjl rv = 0; 84325cf1a30Sjl break; 84425cf1a30Sjl 84525cf1a30Sjl default: 84625cf1a30Sjl fmd_hdl_debug(hdl, "protocol error, unexpected header " 84725cf1a30Sjl "from %s : %d", mp->epm_ep_str, hdrstat); 84825cf1a30Sjl INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64); 84925cf1a30Sjl rv = 0; 85025cf1a30Sjl } 85125cf1a30Sjl 85225cf1a30Sjl return (rv); 85325cf1a30Sjl } 85425cf1a30Sjl 85525cf1a30Sjl /* 85625cf1a30Sjl * ETM transport layer callback function. 85725cf1a30Sjl * The transport layer calls this function to : 85825cf1a30Sjl * (a) pass an incoming message (flag == ETM_CBFLAG_RECV) 85925cf1a30Sjl * (b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT) 86025cf1a30Sjl */ 86125cf1a30Sjl static int 86225cf1a30Sjl etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag, 86325cf1a30Sjl void *arg) 86425cf1a30Sjl { 86525cf1a30Sjl etm_epmap_t *mp = (etm_epmap_t *)arg; 86625cf1a30Sjl int rv = 0; 86725cf1a30Sjl 86825cf1a30Sjl (void) pthread_mutex_lock(&Etm_mod_lock); 86925cf1a30Sjl if (Etm_exit) { 87025cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 87125cf1a30Sjl return (ECANCELED); 87225cf1a30Sjl } 87325cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 87425cf1a30Sjl 87525cf1a30Sjl switch (flag) { 87625cf1a30Sjl case ETM_CBFLAG_RECV: 87725cf1a30Sjl rv = etm_recv(hdl, conn, mp); 87825cf1a30Sjl break; 87925cf1a30Sjl case ETM_CBFLAG_REINIT: 88025cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 88125cf1a30Sjl etm_reinit(hdl, mp); 882*ac92251dSjrutt etm_send_shutdown(hdl, mp); 88325cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 88425cf1a30Sjl /* 88525cf1a30Sjl * Return ECANCELED so the transport layer will close the 88625cf1a30Sjl * server connection. The transport layer is responsible for 88725cf1a30Sjl * reestablishing this connection (should a connection request 88825cf1a30Sjl * arrive from the peer). 88925cf1a30Sjl */ 89025cf1a30Sjl rv = ECANCELED; 89125cf1a30Sjl break; 89225cf1a30Sjl default: 89325cf1a30Sjl fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag); 89425cf1a30Sjl rv = ENOTSUP; 89525cf1a30Sjl } 89625cf1a30Sjl 89725cf1a30Sjl return (rv); 89825cf1a30Sjl } 89925cf1a30Sjl 90025cf1a30Sjl /* 90125cf1a30Sjl * Allocate and initialize an etm_epmap_t struct for the given endpoint 90225cf1a30Sjl * name string. 90325cf1a30Sjl */ 90425cf1a30Sjl static void 90525cf1a30Sjl etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags) 90625cf1a30Sjl { 90725cf1a30Sjl etm_epmap_t *newmap; 90825cf1a30Sjl 90925cf1a30Sjl if (etm_check_dup_ep_str(hdl, epname)) { 91025cf1a30Sjl fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname); 91125cf1a30Sjl return; 91225cf1a30Sjl } 91325cf1a30Sjl 91425cf1a30Sjl newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP); 91525cf1a30Sjl newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP); 91625cf1a30Sjl newmap->epm_xprtflags = flags; 91725cf1a30Sjl newmap->epm_cstat = C_UNINITIALIZED; 91825cf1a30Sjl newmap->epm_qstat = Q_UNINITIALIZED; 91925cf1a30Sjl newmap->epm_ver = ETM_PROTO_V1; /* Currently support one proto ver */ 92025cf1a30Sjl newmap->epm_txbusy = 0; 92125cf1a30Sjl 92225cf1a30Sjl (void) pthread_mutex_init(&newmap->epm_lock, NULL); 92325cf1a30Sjl (void) pthread_cond_init(&newmap->epm_tx_cv, NULL); 92425cf1a30Sjl 92525cf1a30Sjl if (etm_get_ep_nvl(hdl, newmap)) { 92625cf1a30Sjl fmd_hdl_strfree(hdl, newmap->epm_ep_str); 92725cf1a30Sjl fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t)); 92825cf1a30Sjl return; 92925cf1a30Sjl } 93025cf1a30Sjl 931*ac92251dSjrutt (void) pthread_mutex_lock(&newmap->epm_lock); 932*ac92251dSjrutt 93325cf1a30Sjl if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str, 93425cf1a30Sjl etm_cb_func, newmap)) == NULL) { 93525cf1a30Sjl fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n", 93625cf1a30Sjl newmap->epm_ep_str); 93725cf1a30Sjl etm_free_ep_nvl(hdl, newmap); 938*ac92251dSjrutt (void) pthread_mutex_unlock(&newmap->epm_lock); 939*ac92251dSjrutt (void) pthread_mutex_destroy(&newmap->epm_lock); 94025cf1a30Sjl fmd_hdl_strfree(hdl, newmap->epm_ep_str); 94125cf1a30Sjl fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t)); 94225cf1a30Sjl return; 94325cf1a30Sjl } 94425cf1a30Sjl 94525cf1a30Sjl if (IS_CLIENT(newmap)) { 94625cf1a30Sjl if (etm_handle_startup(hdl, newmap)) { 947154b1f02Sjrutt /* 948154b1f02Sjrutt * For whatever reason, we could not complete the 949154b1f02Sjrutt * startup handshake with the server. Set the timer 950154b1f02Sjrutt * and try again. 951154b1f02Sjrutt */ 952154b1f02Sjrutt if (newmap->epm_oconn != NULL) { 953154b1f02Sjrutt (void) etm_xport_close(hdl, newmap->epm_oconn); 954154b1f02Sjrutt newmap->epm_oconn = NULL; 955154b1f02Sjrutt } 956154b1f02Sjrutt newmap->epm_cstat = C_UNINITIALIZED; 957154b1f02Sjrutt newmap->epm_qstat = Q_UNINITIALIZED; 958154b1f02Sjrutt newmap->epm_timer_id = fmd_timer_install(hdl, newmap, 959154b1f02Sjrutt NULL, Reconn_interval); 960154b1f02Sjrutt newmap->epm_timer_in_use = 1; 96125cf1a30Sjl } 962*ac92251dSjrutt } else { 963*ac92251dSjrutt /* 964*ac92251dSjrutt * We may be restarting after a crash. If so, the client 965*ac92251dSjrutt * may be unaware of this. 966*ac92251dSjrutt */ 967*ac92251dSjrutt etm_send_shutdown(hdl, newmap); 96825cf1a30Sjl } 96925cf1a30Sjl 97025cf1a30Sjl /* Add this transport instance handle to the list */ 97125cf1a30Sjl newmap->epm_next = Epmap_head; 97225cf1a30Sjl Epmap_head = newmap; 97325cf1a30Sjl 974*ac92251dSjrutt (void) pthread_mutex_unlock(&newmap->epm_lock); 975*ac92251dSjrutt 97625cf1a30Sjl INCRSTAT(Etm_stats.peer_count.fmds_value.ui64); 97725cf1a30Sjl } 97825cf1a30Sjl 97925cf1a30Sjl /* 98025cf1a30Sjl * Parse the given property list string and call etm_init_epmap 98125cf1a30Sjl * for each endpoint. 98225cf1a30Sjl */ 98325cf1a30Sjl static void 98425cf1a30Sjl etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags) 98525cf1a30Sjl { 98625cf1a30Sjl char *epstr, *ep, *prefix, *lasts, *numstr; 98725cf1a30Sjl char epname[MAXPATHLEN]; 98825cf1a30Sjl size_t slen, nlen; 98925cf1a30Sjl int beg, end, i; 99025cf1a30Sjl 99125cf1a30Sjl if (eplist == NULL) 99225cf1a30Sjl return; 99325cf1a30Sjl /* 99425cf1a30Sjl * Create a copy of eplist for parsing. 99525cf1a30Sjl * strtok/strtok_r(3C) will insert null chars to the string. 99625cf1a30Sjl * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used. 99725cf1a30Sjl */ 99825cf1a30Sjl slen = strlen(eplist); 99925cf1a30Sjl epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP); 100025cf1a30Sjl (void) strcpy(epstr, eplist); 100125cf1a30Sjl 100225cf1a30Sjl /* 100325cf1a30Sjl * The following are supported for the "client_list" and 100425cf1a30Sjl * "server_list" properties : 100525cf1a30Sjl * 100625cf1a30Sjl * A space-separated list of endpoints. 100725cf1a30Sjl * "dev:///dom0 dev:///dom1 dev:///dom2" 100825cf1a30Sjl * 100925cf1a30Sjl * An array syntax for a range of instances. 101025cf1a30Sjl * "dev:///dom[0:2]" 101125cf1a30Sjl * 101225cf1a30Sjl * A combination of both. 101325cf1a30Sjl * "dev:///dom0 dev:///dom[1:2]" 101425cf1a30Sjl */ 101525cf1a30Sjl ep = strtok_r(epstr, " ", &lasts); 101625cf1a30Sjl while (ep != NULL) { 101725cf1a30Sjl if (strchr(ep, '[') != NULL) { 101825cf1a30Sjl /* 101925cf1a30Sjl * This string is using array syntax. 102025cf1a30Sjl * Check the string for correct syntax. 102125cf1a30Sjl */ 102225cf1a30Sjl if ((strchr(ep, ':') == NULL) || 102325cf1a30Sjl (strchr(ep, ']') == NULL)) { 102425cf1a30Sjl fmd_hdl_error(hdl, "Syntax error in property " 102525cf1a30Sjl "that includes : %s\n", ep); 102625cf1a30Sjl ep = strtok_r(NULL, " ", &lasts); 102725cf1a30Sjl continue; 102825cf1a30Sjl } 102925cf1a30Sjl 103025cf1a30Sjl /* expand the array syntax */ 103125cf1a30Sjl prefix = strtok(ep, "["); 103225cf1a30Sjl 103325cf1a30Sjl numstr = strtok(NULL, ":"); 103425cf1a30Sjl if ((numstr == NULL) || (!isdigit(*numstr))) { 103525cf1a30Sjl fmd_hdl_error(hdl, "Syntax error in property " 103625cf1a30Sjl "that includes : %s[\n", prefix); 103725cf1a30Sjl ep = strtok_r(NULL, " ", &lasts); 103825cf1a30Sjl continue; 103925cf1a30Sjl } 104025cf1a30Sjl beg = atoi(numstr); 104125cf1a30Sjl 104225cf1a30Sjl numstr = strtok(NULL, "]"); 104325cf1a30Sjl if ((numstr == NULL) || (!isdigit(*numstr))) { 104425cf1a30Sjl fmd_hdl_error(hdl, "Syntax error in property " 104525cf1a30Sjl "that includes : %s[\n", prefix); 104625cf1a30Sjl ep = strtok_r(NULL, " ", &lasts); 104725cf1a30Sjl continue; 104825cf1a30Sjl } 104925cf1a30Sjl end = atoi(numstr); 105025cf1a30Sjl 105125cf1a30Sjl nlen = strlen(prefix) + ETM_EP_INST_MAX; 105225cf1a30Sjl 105325cf1a30Sjl if (nlen > MAXPATHLEN) { 105425cf1a30Sjl fmd_hdl_error(hdl, "Endpoint prop string " 105525cf1a30Sjl "exceeds MAXPATHLEN\n"); 105625cf1a30Sjl ep = strtok_r(NULL, " ", &lasts); 105725cf1a30Sjl continue; 105825cf1a30Sjl } 105925cf1a30Sjl 106025cf1a30Sjl for (i = beg; i <= end; i++) { 106125cf1a30Sjl bzero(epname, MAXPATHLEN); 106225cf1a30Sjl (void) snprintf(epname, nlen, "%s%d", 106325cf1a30Sjl prefix, i); 106425cf1a30Sjl etm_init_epmap(hdl, epname, flags); 106525cf1a30Sjl } 106625cf1a30Sjl } else { 106725cf1a30Sjl etm_init_epmap(hdl, ep, flags); 106825cf1a30Sjl } 106925cf1a30Sjl 107025cf1a30Sjl ep = strtok_r(NULL, " ", &lasts); 107125cf1a30Sjl } 107225cf1a30Sjl 107325cf1a30Sjl fmd_hdl_free(hdl, epstr, slen + 1); 107425cf1a30Sjl } 107525cf1a30Sjl 107625cf1a30Sjl /* 107725cf1a30Sjl * Free the transport infrastructure for an endpoint. 107825cf1a30Sjl */ 107925cf1a30Sjl static void 108025cf1a30Sjl etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp) 108125cf1a30Sjl { 108225cf1a30Sjl size_t hdrlen; 108325cf1a30Sjl char hbuf[ETM_HDRLEN]; 108425cf1a30Sjl 108525cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 108625cf1a30Sjl 108725cf1a30Sjl /* 108825cf1a30Sjl * If an etm_send thread is in progress, wait for it to finish. 108925cf1a30Sjl * The etm_recv thread is managed by the transport layer and will 109025cf1a30Sjl * be destroyed with etm_xport_fini(). 109125cf1a30Sjl */ 109225cf1a30Sjl while (mp->epm_txbusy) 109325cf1a30Sjl (void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock); 109425cf1a30Sjl 109525cf1a30Sjl if (mp->epm_timer_in_use) 109625cf1a30Sjl fmd_timer_remove(hdl, mp->epm_timer_id); 109725cf1a30Sjl 109825cf1a30Sjl if (mp->epm_oconn != NULL) { 109925cf1a30Sjl hdrlen = etm_create_hdr(hbuf, mp->epm_ver, 110025cf1a30Sjl ETM_HDR_SHUTDOWN, 0); 110125cf1a30Sjl (void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, 110225cf1a30Sjl hdrlen); 110325cf1a30Sjl (void) etm_xport_close(hdl, mp->epm_oconn); 110425cf1a30Sjl mp->epm_oconn = NULL; 110525cf1a30Sjl } 110625cf1a30Sjl 110725cf1a30Sjl if (mp->epm_xprthdl != NULL) { 110825cf1a30Sjl fmd_xprt_close(hdl, mp->epm_xprthdl); 110925cf1a30Sjl /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 111025cf1a30Sjl mp->epm_ep_nvl = NULL; 111125cf1a30Sjl } 111225cf1a30Sjl 111325cf1a30Sjl if (mp->epm_ep_nvl != NULL) 111425cf1a30Sjl etm_free_ep_nvl(hdl, mp); 111525cf1a30Sjl 111625cf1a30Sjl if (mp->epm_tlhdl != NULL) 111725cf1a30Sjl (void) etm_xport_fini(hdl, mp->epm_tlhdl); 111825cf1a30Sjl 111925cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 112025cf1a30Sjl (void) pthread_mutex_destroy(&mp->epm_lock); 112125cf1a30Sjl fmd_hdl_strfree(hdl, mp->epm_ep_str); 112225cf1a30Sjl fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t)); 112325cf1a30Sjl DECRSTAT(Etm_stats.peer_count.fmds_value.ui64); 112425cf1a30Sjl } 112525cf1a30Sjl 112625cf1a30Sjl /* 112725cf1a30Sjl * FMD entry points 112825cf1a30Sjl */ 112925cf1a30Sjl 113025cf1a30Sjl /* 113125cf1a30Sjl * FMD fmdo_send entry point. 113225cf1a30Sjl * Send an event to the remote endpoint and receive an ACK. 113325cf1a30Sjl */ 113425cf1a30Sjl static int 113525cf1a30Sjl etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl) 113625cf1a30Sjl { 113725cf1a30Sjl etm_epmap_t *mp; 113825cf1a30Sjl nvlist_t *msgnvl; 113977a7fd96Sjrutt int hdrstat, rv, cnt = 0; 114025cf1a30Sjl char *buf, *nvbuf, *class; 114125cf1a30Sjl size_t nvsize, buflen, hdrlen; 114277a7fd96Sjrutt struct timespec tms; 114325cf1a30Sjl 114425cf1a30Sjl (void) pthread_mutex_lock(&Etm_mod_lock); 114525cf1a30Sjl if (Etm_exit) { 114625cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 114725cf1a30Sjl return (FMD_SEND_RETRY); 114825cf1a30Sjl } 114925cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 115025cf1a30Sjl 115125cf1a30Sjl mp = fmd_xprt_getspecific(hdl, xprthdl); 115225cf1a30Sjl 115377a7fd96Sjrutt for (;;) { 115477a7fd96Sjrutt if (pthread_mutex_trylock(&mp->epm_lock) == 0) { 115577a7fd96Sjrutt break; 115677a7fd96Sjrutt } else { 115777a7fd96Sjrutt /* 115877a7fd96Sjrutt * Another thread may be (1) trying to close this 115977a7fd96Sjrutt * fmd_xprt_t, or (2) posting an event to it. 116077a7fd96Sjrutt * If (1), don't want to spend too much time here. 116177a7fd96Sjrutt * If (2), allow it to finish and release epm_lock. 116277a7fd96Sjrutt */ 116377a7fd96Sjrutt if (cnt++ < 10) { 116477a7fd96Sjrutt tms.tv_sec = 0; 116577a7fd96Sjrutt tms.tv_nsec = (cnt * 10000); 116677a7fd96Sjrutt (void) nanosleep(&tms, NULL); 116777a7fd96Sjrutt 116877a7fd96Sjrutt } else { 116977a7fd96Sjrutt return (FMD_SEND_RETRY); 117077a7fd96Sjrutt } 117177a7fd96Sjrutt } 117277a7fd96Sjrutt } 117325cf1a30Sjl 117425cf1a30Sjl mp->epm_txbusy++; 117525cf1a30Sjl 1176154b1f02Sjrutt if (mp->epm_qstat == Q_UNINITIALIZED) { 117725cf1a30Sjl mp->epm_txbusy--; 117825cf1a30Sjl (void) pthread_cond_broadcast(&mp->epm_tx_cv); 117945736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 1180154b1f02Sjrutt return (FMD_SEND_FAILED); 1181154b1f02Sjrutt } 1182154b1f02Sjrutt 1183154b1f02Sjrutt if (mp->epm_cstat == C_CLOSED) { 118425cf1a30Sjl etm_suspend_reconnect(hdl, mp); 1185154b1f02Sjrutt mp->epm_txbusy--; 1186154b1f02Sjrutt (void) pthread_cond_broadcast(&mp->epm_tx_cv); 118745736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 118825cf1a30Sjl return (FMD_SEND_RETRY); 118925cf1a30Sjl } 119025cf1a30Sjl 119125cf1a30Sjl if (mp->epm_cstat == C_LIMBO) { 119225cf1a30Sjl if (mp->epm_oconn != NULL) { 119325cf1a30Sjl (void) etm_xport_close(hdl, mp->epm_oconn); 119425cf1a30Sjl mp->epm_oconn = NULL; 119525cf1a30Sjl } 119625cf1a30Sjl 119725cf1a30Sjl fmd_xprt_suspend(hdl, xprthdl); 119825cf1a30Sjl mp->epm_qstat = Q_SUSPENDED; 119925cf1a30Sjl mp->epm_txbusy--; 120025cf1a30Sjl (void) pthread_cond_broadcast(&mp->epm_tx_cv); 120145736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 120225cf1a30Sjl fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str); 120325cf1a30Sjl return (FMD_SEND_RETRY); 120425cf1a30Sjl } 120525cf1a30Sjl 120625cf1a30Sjl if (mp->epm_oconn == NULL) { 120725cf1a30Sjl if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) 120825cf1a30Sjl == NULL) { 1209154b1f02Sjrutt etm_suspend_reconnect(hdl, mp); 121025cf1a30Sjl mp->epm_txbusy--; 121125cf1a30Sjl (void) pthread_cond_broadcast(&mp->epm_tx_cv); 121245736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 121325cf1a30Sjl return (FMD_SEND_RETRY); 121425cf1a30Sjl } else { 121525cf1a30Sjl mp->epm_cstat = C_OPEN; 121625cf1a30Sjl } 121725cf1a30Sjl } 121825cf1a30Sjl 121925cf1a30Sjl if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) 122025cf1a30Sjl fmd_hdl_abort(hdl, "No class string in nvlist"); 122125cf1a30Sjl 122225cf1a30Sjl msgnvl = fmd_xprt_translate(hdl, xprthdl, ep); 122325cf1a30Sjl if (msgnvl == NULL) { 1224154b1f02Sjrutt mp->epm_txbusy--; 1225154b1f02Sjrutt (void) pthread_cond_broadcast(&mp->epm_tx_cv); 122645736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 122725cf1a30Sjl fmd_hdl_error(hdl, "Failed to translate event %p\n", 122825cf1a30Sjl (void *) ep); 122925cf1a30Sjl return (FMD_SEND_FAILED); 123025cf1a30Sjl } 123125cf1a30Sjl 123277a7fd96Sjrutt rv = etm_xport_send_filter(hdl, msgnvl, mp->epm_ep_str); 123377a7fd96Sjrutt if (rv == ETM_XPORT_FILTER_DROP) { 123477a7fd96Sjrutt mp->epm_txbusy--; 123577a7fd96Sjrutt (void) pthread_cond_broadcast(&mp->epm_tx_cv); 123677a7fd96Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 123777a7fd96Sjrutt fmd_hdl_debug(hdl, "send_filter dropped event"); 123877a7fd96Sjrutt nvlist_free(msgnvl); 123977a7fd96Sjrutt INCRSTAT(Etm_stats.send_filter.fmds_value.ui64); 124077a7fd96Sjrutt return (FMD_SEND_SUCCESS); 124177a7fd96Sjrutt } else if (rv == ETM_XPORT_FILTER_ERROR) { 124277a7fd96Sjrutt fmd_hdl_debug(hdl, "send_filter error : %s", strerror(errno)); 124377a7fd96Sjrutt INCRSTAT(Etm_stats.error_send_filter.fmds_value.ui64); 124477a7fd96Sjrutt /* Still send event */ 124577a7fd96Sjrutt } 124677a7fd96Sjrutt 124725cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 124825cf1a30Sjl 124925cf1a30Sjl (void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR); 125025cf1a30Sjl 125125cf1a30Sjl hdrlen = ETM_HDRLEN; 125225cf1a30Sjl buflen = nvsize + hdrlen; 125325cf1a30Sjl 125425cf1a30Sjl ALLOC_BUF(hdl, buf, buflen); 125525cf1a30Sjl 125625cf1a30Sjl nvbuf = buf + hdrlen; 125725cf1a30Sjl 125825cf1a30Sjl (void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize); 125925cf1a30Sjl 126025cf1a30Sjl if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) { 1261154b1f02Sjrutt (void) pthread_mutex_lock(&mp->epm_lock); 1262154b1f02Sjrutt mp->epm_txbusy--; 1263154b1f02Sjrutt (void) pthread_cond_broadcast(&mp->epm_tx_cv); 126445736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 126525cf1a30Sjl fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv)); 126677a7fd96Sjrutt nvlist_free(msgnvl); 126725cf1a30Sjl FREE_BUF(hdl, buf, buflen); 126825cf1a30Sjl return (FMD_SEND_FAILED); 126925cf1a30Sjl } 127025cf1a30Sjl 127125cf1a30Sjl nvlist_free(msgnvl); 127225cf1a30Sjl 127325cf1a30Sjl if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf, 127425cf1a30Sjl buflen) != buflen) { 1275154b1f02Sjrutt fmd_hdl_debug(hdl, "failed to send message to %s", 1276154b1f02Sjrutt mp->epm_ep_str); 127725cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 1278154b1f02Sjrutt etm_suspend_reconnect(hdl, mp); 127925cf1a30Sjl mp->epm_txbusy--; 128025cf1a30Sjl (void) pthread_cond_broadcast(&mp->epm_tx_cv); 128145736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 128225cf1a30Sjl FREE_BUF(hdl, buf, buflen); 128325cf1a30Sjl INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 128425cf1a30Sjl return (FMD_SEND_RETRY); 128525cf1a30Sjl } 128625cf1a30Sjl 128725cf1a30Sjl INCRSTAT(Etm_stats.write_msg.fmds_value.ui64); 128825cf1a30Sjl ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize); 128925cf1a30Sjl 129025cf1a30Sjl etm_hex_dump(hdl, nvbuf, nvsize, 1); 129125cf1a30Sjl 129225cf1a30Sjl if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf, 129325cf1a30Sjl hdrlen) != hdrlen) { 1294154b1f02Sjrutt fmd_hdl_debug(hdl, "failed to read ACK from %s", 1295154b1f02Sjrutt mp->epm_ep_str); 129625cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 1297154b1f02Sjrutt etm_suspend_reconnect(hdl, mp); 129825cf1a30Sjl mp->epm_txbusy--; 129925cf1a30Sjl (void) pthread_cond_broadcast(&mp->epm_tx_cv); 130045736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 130125cf1a30Sjl FREE_BUF(hdl, buf, buflen); 130225cf1a30Sjl INCRSTAT(Etm_stats.error_read.fmds_value.ui64); 130325cf1a30Sjl return (FMD_SEND_RETRY); 130425cf1a30Sjl } 130525cf1a30Sjl 130625cf1a30Sjl hdrstat = etm_check_hdr(hdl, mp, buf); 130725cf1a30Sjl FREE_BUF(hdl, buf, buflen); 130825cf1a30Sjl 130925cf1a30Sjl if (hdrstat == ETM_HDR_ACK) { 131025cf1a30Sjl INCRSTAT(Etm_stats.read_ack.fmds_value.ui64); 131125cf1a30Sjl } else { 131225cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 131325cf1a30Sjl 131425cf1a30Sjl (void) etm_xport_close(hdl, mp->epm_oconn); 131525cf1a30Sjl mp->epm_oconn = NULL; 131625cf1a30Sjl 131725cf1a30Sjl if (hdrstat == ETM_HDR_NAK) { 131825cf1a30Sjl /* Peer received a bad value in the header */ 131925cf1a30Sjl if (mp->epm_xprthdl != NULL) { 132025cf1a30Sjl mp->epm_cstat = C_LIMBO; 132125cf1a30Sjl fmd_xprt_suspend(hdl, xprthdl); 132225cf1a30Sjl mp->epm_qstat = Q_SUSPENDED; 132325cf1a30Sjl fmd_hdl_debug(hdl, "received NAK, queue " 132425cf1a30Sjl "suspended for %s", mp->epm_ep_str); 132525cf1a30Sjl } 132625cf1a30Sjl 132725cf1a30Sjl rv = FMD_SEND_RETRY; 132825cf1a30Sjl 132925cf1a30Sjl } else if (hdrstat == ETM_HDR_S_RESTART) { 133025cf1a30Sjl /* Server has restarted */ 1331154b1f02Sjrutt mp->epm_cstat = C_CLOSED; 1332154b1f02Sjrutt mp->epm_qstat = Q_UNINITIALIZED; 1333154b1f02Sjrutt fmd_hdl_debug(hdl, "server %s restarted", 1334154b1f02Sjrutt mp->epm_ep_str); 1335154b1f02Sjrutt /* 1336154b1f02Sjrutt * Cannot call fmd_xprt_close here, so we'll do it 1337154b1f02Sjrutt * on the timeout thread. 1338154b1f02Sjrutt */ 1339154b1f02Sjrutt if (mp->epm_timer_in_use == 0) { 1340154b1f02Sjrutt mp->epm_timer_id = fmd_timer_install( 1341154b1f02Sjrutt hdl, mp, NULL, 0); 1342154b1f02Sjrutt mp->epm_timer_in_use = 1; 134325cf1a30Sjl } 134425cf1a30Sjl 134525cf1a30Sjl /* 134625cf1a30Sjl * fault.* or list.* events will be replayed if a 134725cf1a30Sjl * transport is opened with the same auth. 134825cf1a30Sjl * Other events will be discarded. 134925cf1a30Sjl */ 135025cf1a30Sjl rv = FMD_SEND_FAILED; 135125cf1a30Sjl 135225cf1a30Sjl } else { 135325cf1a30Sjl mp->epm_cstat = C_CLOSED; 135425cf1a30Sjl fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str); 135525cf1a30Sjl 135625cf1a30Sjl rv = FMD_SEND_RETRY; 135725cf1a30Sjl } 135825cf1a30Sjl 135925cf1a30Sjl mp->epm_txbusy--; 136025cf1a30Sjl 136125cf1a30Sjl (void) pthread_cond_broadcast(&mp->epm_tx_cv); 136245736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 136325cf1a30Sjl 136425cf1a30Sjl INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64); 136525cf1a30Sjl 136625cf1a30Sjl return (rv); 136725cf1a30Sjl } 136825cf1a30Sjl 136925cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 137025cf1a30Sjl mp->epm_txbusy--; 137125cf1a30Sjl (void) pthread_cond_broadcast(&mp->epm_tx_cv); 137245736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 137325cf1a30Sjl 137425cf1a30Sjl return (FMD_SEND_SUCCESS); 137525cf1a30Sjl } 137625cf1a30Sjl 137725cf1a30Sjl /* 137825cf1a30Sjl * FMD fmdo_timeout entry point.. 137925cf1a30Sjl */ 138025cf1a30Sjl /*ARGSUSED*/ 138125cf1a30Sjl static void 138225cf1a30Sjl etm_timeout(fmd_hdl_t *hdl, id_t id, void *data) 138325cf1a30Sjl { 138425cf1a30Sjl etm_epmap_t *mp = (etm_epmap_t *)data; 138525cf1a30Sjl 138625cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 138725cf1a30Sjl 138825cf1a30Sjl mp->epm_timer_in_use = 0; 138925cf1a30Sjl 139025cf1a30Sjl if (mp->epm_qstat == Q_UNINITIALIZED) { 139125cf1a30Sjl /* Server has shutdown and we (client) need to reconnect */ 1392154b1f02Sjrutt if (mp->epm_xprthdl != NULL) { 1393154b1f02Sjrutt fmd_xprt_close(hdl, mp->epm_xprthdl); 1394154b1f02Sjrutt fmd_hdl_debug(hdl, "queue closed for %s", 1395154b1f02Sjrutt mp->epm_ep_str); 1396154b1f02Sjrutt mp->epm_xprthdl = NULL; 1397154b1f02Sjrutt /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 1398154b1f02Sjrutt mp->epm_ep_nvl = NULL; 1399154b1f02Sjrutt } 1400154b1f02Sjrutt 140125cf1a30Sjl if (mp->epm_ep_nvl == NULL) 140225cf1a30Sjl (void) etm_get_ep_nvl(hdl, mp); 140325cf1a30Sjl 140425cf1a30Sjl if (etm_handle_startup(hdl, mp)) { 1405154b1f02Sjrutt if (mp->epm_oconn != NULL) { 1406154b1f02Sjrutt (void) etm_xport_close(hdl, mp->epm_oconn); 1407154b1f02Sjrutt mp->epm_oconn = NULL; 1408154b1f02Sjrutt } 1409154b1f02Sjrutt mp->epm_cstat = C_UNINITIALIZED; 141025cf1a30Sjl mp->epm_qstat = Q_UNINITIALIZED; 141125cf1a30Sjl mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL, 141225cf1a30Sjl Reconn_interval); 141325cf1a30Sjl mp->epm_timer_in_use = 1; 141425cf1a30Sjl } 141525cf1a30Sjl } else { 141625cf1a30Sjl etm_reconnect(hdl, mp); 141725cf1a30Sjl } 141825cf1a30Sjl 141925cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 142025cf1a30Sjl } 142125cf1a30Sjl 142225cf1a30Sjl /* 142325cf1a30Sjl * FMD Module declarations 142425cf1a30Sjl */ 142525cf1a30Sjl static const fmd_hdl_ops_t etm_ops = { 142625cf1a30Sjl NULL, /* fmdo_recv */ 142725cf1a30Sjl etm_timeout, /* fmdo_timeout */ 142825cf1a30Sjl NULL, /* fmdo_close */ 142925cf1a30Sjl NULL, /* fmdo_stats */ 143025cf1a30Sjl NULL, /* fmdo_gc */ 143125cf1a30Sjl etm_send, /* fmdo_send */ 143225cf1a30Sjl }; 143325cf1a30Sjl 143425cf1a30Sjl static const fmd_prop_t etm_props[] = { 143525cf1a30Sjl { "client_list", FMD_TYPE_STRING, NULL }, 143625cf1a30Sjl { "server_list", FMD_TYPE_STRING, NULL }, 143725cf1a30Sjl { "reconnect_interval", FMD_TYPE_UINT64, "10000000000" }, 143877a7fd96Sjrutt { "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" }, 143977a7fd96Sjrutt { "rw_timeout", FMD_TYPE_UINT64, "2000000000" }, 144077a7fd96Sjrutt { "filter_path", FMD_TYPE_STRING, NULL }, 144125cf1a30Sjl { NULL, 0, NULL } 144225cf1a30Sjl }; 144325cf1a30Sjl 144425cf1a30Sjl static const fmd_hdl_info_t etm_info = { 144525cf1a30Sjl "Event Transport Module", "2.0", &etm_ops, etm_props 144625cf1a30Sjl }; 144725cf1a30Sjl 144825cf1a30Sjl /* 144925cf1a30Sjl * Initialize the transport for use by ETM. 145025cf1a30Sjl */ 145125cf1a30Sjl void 145225cf1a30Sjl _fmd_init(fmd_hdl_t *hdl) 145325cf1a30Sjl { 145425cf1a30Sjl char *propstr; 145525cf1a30Sjl 145625cf1a30Sjl if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) { 145725cf1a30Sjl return; /* invalid data in configuration file */ 145825cf1a30Sjl } 145925cf1a30Sjl 146025cf1a30Sjl /* Create global stats */ 146125cf1a30Sjl (void) fmd_stat_create(hdl, FMD_STAT_NOALLOC, 146225cf1a30Sjl sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats); 146325cf1a30Sjl 146425cf1a30Sjl /* Get module properties */ 146525cf1a30Sjl Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout"); 146625cf1a30Sjl Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval"); 146725cf1a30Sjl Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout"); 146825cf1a30Sjl 146925cf1a30Sjl propstr = fmd_prop_get_string(hdl, "client_list"); 147025cf1a30Sjl etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS); 147125cf1a30Sjl fmd_prop_free_string(hdl, propstr); 147225cf1a30Sjl 147325cf1a30Sjl propstr = fmd_prop_get_string(hdl, "server_list"); 147425cf1a30Sjl etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS); 147525cf1a30Sjl fmd_prop_free_string(hdl, propstr); 147625cf1a30Sjl 147725cf1a30Sjl if (Etm_stats.peer_count.fmds_value.ui64 == 0) { 147825cf1a30Sjl fmd_hdl_debug(hdl, "Failed to init any endpoint\n"); 147925cf1a30Sjl fmd_hdl_unregister(hdl); 148025cf1a30Sjl return; 148125cf1a30Sjl } 148225cf1a30Sjl } 148325cf1a30Sjl 148425cf1a30Sjl /* 148525cf1a30Sjl * Teardown the transport 148625cf1a30Sjl */ 148725cf1a30Sjl void 148825cf1a30Sjl _fmd_fini(fmd_hdl_t *hdl) 148925cf1a30Sjl { 149025cf1a30Sjl etm_epmap_t *mp, *next; 149125cf1a30Sjl 149225cf1a30Sjl (void) pthread_mutex_lock(&Etm_mod_lock); 149325cf1a30Sjl Etm_exit = 1; 149425cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 149525cf1a30Sjl 149625cf1a30Sjl mp = Epmap_head; 149725cf1a30Sjl 149825cf1a30Sjl while (mp) { 149925cf1a30Sjl next = mp->epm_next; 150025cf1a30Sjl etm_free_epmap(hdl, mp); 150125cf1a30Sjl mp = next; 150225cf1a30Sjl } 150325cf1a30Sjl 150425cf1a30Sjl fmd_hdl_unregister(hdl); 150525cf1a30Sjl } 1506