125cf1a30Sjl /* 225cf1a30Sjl * CDDL HEADER START 325cf1a30Sjl * 425cf1a30Sjl * The contents of this file are subject to the terms of the 525cf1a30Sjl * Common Development and Distribution License (the "License"). 625cf1a30Sjl * You may not use this file except in compliance with the License. 725cf1a30Sjl * 825cf1a30Sjl * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 925cf1a30Sjl * or http://www.opensolaris.org/os/licensing. 1025cf1a30Sjl * See the License for the specific language governing permissions 1125cf1a30Sjl * and limitations under the License. 1225cf1a30Sjl * 1325cf1a30Sjl * When distributing Covered Code, include this CDDL HEADER in each 1425cf1a30Sjl * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 1525cf1a30Sjl * If applicable, add the following below this CDDL HEADER, with the 1625cf1a30Sjl * fields enclosed by brackets "[]" replaced with your own identifying 1725cf1a30Sjl * information: Portions Copyright [yyyy] [name of copyright owner] 1825cf1a30Sjl * 1925cf1a30Sjl * CDDL HEADER END 2025cf1a30Sjl */ 2125cf1a30Sjl 2225cf1a30Sjl /* 2325cf1a30Sjl * Copyright 2006 Sun Microsystems, Inc. All rights reserved. 2425cf1a30Sjl * Use is subject to license terms. 2525cf1a30Sjl */ 2625cf1a30Sjl 2725cf1a30Sjl #pragma ident "%Z%%M% %I% %E% SMI" 2825cf1a30Sjl 2925cf1a30Sjl /* 3025cf1a30Sjl * FMA Event Transport Module 3125cf1a30Sjl * 3225cf1a30Sjl * Plugin for sending/receiving FMA events to/from a remote endpoint.
3325cf1a30Sjl */ 3425cf1a30Sjl 3525cf1a30Sjl #include <netinet/in.h> 36*77a7fd96Sjrutt #include <errno.h> 3725cf1a30Sjl #include <sys/fm/protocol.h> 3825cf1a30Sjl #include <sys/sysmacros.h> 3925cf1a30Sjl #include <pthread.h> 4025cf1a30Sjl #include <strings.h> 4125cf1a30Sjl #include <ctype.h> 4225cf1a30Sjl #include <link.h> 4325cf1a30Sjl #include <libnvpair.h> 4425cf1a30Sjl #include "etm_xport_api.h" 4525cf1a30Sjl #include "etm_proto.h" 4625cf1a30Sjl 4725cf1a30Sjl /* 4825cf1a30Sjl * ETM declarations 4925cf1a30Sjl */ 5025cf1a30Sjl 5125cf1a30Sjl typedef enum etm_connection_status { 5225cf1a30Sjl C_UNINITIALIZED = 0, 5325cf1a30Sjl C_OPEN, /* Connection is open */ 5425cf1a30Sjl C_CLOSED, /* Connection is closed */ 5525cf1a30Sjl C_LIMBO, /* Bad value in header from peer */ 5625cf1a30Sjl C_TIMED_OUT /* Reconnection to peer timed out */ 5725cf1a30Sjl } etm_connstat_t; 5825cf1a30Sjl 5925cf1a30Sjl typedef enum etm_fmd_queue_status { 6025cf1a30Sjl Q_UNINITIALIZED = 100, 6125cf1a30Sjl Q_INIT_PENDING, /* Queue initialization in progress */ 6225cf1a30Sjl Q_OPEN, /* Queue is open */ 6325cf1a30Sjl Q_SUSPENDED /* Queue is suspended */ 6425cf1a30Sjl } etm_qstat_t; 6525cf1a30Sjl 6625cf1a30Sjl /* Per endpoint data */ 6725cf1a30Sjl typedef struct etm_endpoint_map { 6825cf1a30Sjl uint8_t epm_ver; /* Protocol version being used */ 6925cf1a30Sjl char *epm_ep_str; /* Endpoint ID string */ 7025cf1a30Sjl int epm_xprtflags; /* FMD transport open flags */ 7125cf1a30Sjl etm_xport_hdl_t epm_tlhdl; /* Transport Layer instance handle */ 7225cf1a30Sjl pthread_mutex_t epm_lock; /* Protects remainder of struct */ 7325cf1a30Sjl pthread_cond_t epm_tx_cv; /* Cond var for send/transmit */ 7425cf1a30Sjl int epm_txbusy; /* Busy doing send/transmit */ 7525cf1a30Sjl fmd_xprt_t *epm_xprthdl; /* FMD transport handle */ 7625cf1a30Sjl etm_qstat_t epm_qstat; /* Status of fmd xprt queue */ 7725cf1a30Sjl nvlist_t *epm_ep_nvl; /* Endpoint ID nv_list */ 7825cf1a30Sjl etm_xport_conn_t epm_oconn; /* Connection for 
outgoing events */ 7925cf1a30Sjl etm_connstat_t epm_cstat; /* Status of connection */ 8025cf1a30Sjl id_t epm_timer_id; /* Timer id */ 8125cf1a30Sjl int epm_timer_in_use; /* Indicates if timer is in use */ 8225cf1a30Sjl hrtime_t epm_reconn_end; /* Reconnection end time */ 8325cf1a30Sjl struct etm_endpoint_map *epm_next; 8425cf1a30Sjl } etm_epmap_t; 8525cf1a30Sjl 8625cf1a30Sjl #define ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1) 8725cf1a30Sjl #define ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2) 8825cf1a30Sjl #define ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3) 8925cf1a30Sjl #define ETM_EP_INST_MAX 4 /* Max chars in endpt instance */ 9025cf1a30Sjl #define ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR 9125cf1a30Sjl #define ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT) 9225cf1a30Sjl 9325cf1a30Sjl #define ALLOC_BUF(hdl, buf, size) \ 9425cf1a30Sjl buf = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP); 9525cf1a30Sjl 9625cf1a30Sjl #define FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size)); 9725cf1a30Sjl 9825cf1a30Sjl #define IS_CLIENT(mp) (((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 
0 : 1) 9925cf1a30Sjl 10025cf1a30Sjl #define INCRSTAT(x) { (void) pthread_mutex_lock(&Etm_mod_lock); \ 10125cf1a30Sjl (x)++; \ 10225cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); \ 10325cf1a30Sjl } 10425cf1a30Sjl 10525cf1a30Sjl #define DECRSTAT(x) { (void) pthread_mutex_lock(&Etm_mod_lock); \ 10625cf1a30Sjl (x)--; \ 10725cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); \ 10825cf1a30Sjl } 10925cf1a30Sjl 11025cf1a30Sjl #define ADDSTAT(x, y) { (void) pthread_mutex_lock(&Etm_mod_lock); \ 11125cf1a30Sjl (x) += (y); \ 11225cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); \ 11325cf1a30Sjl } 11425cf1a30Sjl 11525cf1a30Sjl /* 11625cf1a30Sjl * Global variables 11725cf1a30Sjl */ 11825cf1a30Sjl static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER; 11925cf1a30Sjl /* Protects globals */ 12025cf1a30Sjl static hrtime_t Reconn_interval; /* Time between reconnection attempts */ 12125cf1a30Sjl static hrtime_t Reconn_timeout; /* Time allowed for reconnection */ 12225cf1a30Sjl static hrtime_t Rw_timeout; /* Time allowed for I/O operation */ 12325cf1a30Sjl static int Etm_dump = 0; /* Enables hex dump for debug */ 12425cf1a30Sjl static int Etm_exit = 0; /* Flag for exit */ 12525cf1a30Sjl static etm_epmap_t *Epmap_head = NULL; /* Head of list of epmap structs */ 12625cf1a30Sjl 12725cf1a30Sjl /* Module statistics */ 12825cf1a30Sjl static struct etm_stats { 12925cf1a30Sjl /* read counters */ 13025cf1a30Sjl fmd_stat_t read_ack; 13125cf1a30Sjl fmd_stat_t read_bytes; 13225cf1a30Sjl fmd_stat_t read_msg; 133*77a7fd96Sjrutt fmd_stat_t post_filter; 13425cf1a30Sjl /* write counters */ 13525cf1a30Sjl fmd_stat_t write_ack; 13625cf1a30Sjl fmd_stat_t write_bytes; 13725cf1a30Sjl fmd_stat_t write_msg; 138*77a7fd96Sjrutt fmd_stat_t send_filter; 13925cf1a30Sjl /* error counters */ 14025cf1a30Sjl fmd_stat_t error_protocol; 14125cf1a30Sjl fmd_stat_t error_drop_read; 14225cf1a30Sjl fmd_stat_t error_read; 14325cf1a30Sjl fmd_stat_t error_read_badhdr; 14425cf1a30Sjl fmd_stat_t 
error_write; 145*77a7fd96Sjrutt fmd_stat_t error_send_filter; 146*77a7fd96Sjrutt fmd_stat_t error_post_filter; 14725cf1a30Sjl /* misc */ 14825cf1a30Sjl fmd_stat_t peer_count; 14925cf1a30Sjl 15025cf1a30Sjl } Etm_stats = { 15125cf1a30Sjl /* read counters */ 15225cf1a30Sjl { "read_ack", FMD_TYPE_UINT64, "ACKs read" }, 15325cf1a30Sjl { "read_bytes", FMD_TYPE_UINT64, "Bytes read" }, 15425cf1a30Sjl { "read_msg", FMD_TYPE_UINT64, "Messages read" }, 155*77a7fd96Sjrutt { "post_filter", FMD_TYPE_UINT64, "Drops by post_filter" }, 15625cf1a30Sjl /* write counters */ 15725cf1a30Sjl { "write_ack", FMD_TYPE_UINT64, "ACKs sent" }, 15825cf1a30Sjl { "write_bytes", FMD_TYPE_UINT64, "Bytes sent" }, 15925cf1a30Sjl { "write_msg", FMD_TYPE_UINT64, "Messages sent" }, 160*77a7fd96Sjrutt { "send_filter", FMD_TYPE_UINT64, "Drops by send_filter" }, 16125cf1a30Sjl /* ETM error counters */ 16225cf1a30Sjl { "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" }, 16325cf1a30Sjl { "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" }, 16425cf1a30Sjl { "error_read", FMD_TYPE_UINT64, "Read I/O errors" }, 16525cf1a30Sjl { "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" }, 16625cf1a30Sjl { "error_write", FMD_TYPE_UINT64, "Write I/O errors" }, 167*77a7fd96Sjrutt { "error_send_filter", FMD_TYPE_UINT64, "Send filter errors" }, 168*77a7fd96Sjrutt { "error_post_filter", FMD_TYPE_UINT64, "Post filter errors" }, 16925cf1a30Sjl /* ETM Misc */ 17025cf1a30Sjl { "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" }, 17125cf1a30Sjl }; 17225cf1a30Sjl 17325cf1a30Sjl /* 17425cf1a30Sjl * ETM Private functions 17525cf1a30Sjl */ 17625cf1a30Sjl 17725cf1a30Sjl /* 17825cf1a30Sjl * Hex dump for debug. 
17925cf1a30Sjl */ 18025cf1a30Sjl static void 18125cf1a30Sjl etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction) 18225cf1a30Sjl { 18325cf1a30Sjl int i, j, k; 18425cf1a30Sjl int16_t *c; 18525cf1a30Sjl 18625cf1a30Sjl if (Etm_dump == 0) 18725cf1a30Sjl return; 18825cf1a30Sjl 18925cf1a30Sjl j = buflen / 16; /* Number of complete 8-column rows */ 19025cf1a30Sjl k = buflen % 16; /* Is there a last (non-8-column) row? */ 19125cf1a30Sjl 19225cf1a30Sjl if (direction) 19325cf1a30Sjl fmd_hdl_debug(hdl, "--- WRITE Message Dump ---"); 19425cf1a30Sjl else 19525cf1a30Sjl fmd_hdl_debug(hdl, "--- READ Message Dump ---"); 19625cf1a30Sjl 19725cf1a30Sjl fmd_hdl_debug(hdl, " Displaying %d bytes", buflen); 19825cf1a30Sjl 19925cf1a30Sjl /* Dump the complete 8-column rows */ 20025cf1a30Sjl for (i = 0; i < j; i++) { 20125cf1a30Sjl c = (int16_t *)buf + (i * 8); 20225cf1a30Sjl fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x %4x %4x", i, 20325cf1a30Sjl *(c+0), *(c+1), *(c+2), *(c+3), 20425cf1a30Sjl *(c+4), *(c+5), *(c+6), *(c+7)); 20525cf1a30Sjl } 20625cf1a30Sjl 20725cf1a30Sjl /* Dump the last (incomplete) row */ 20825cf1a30Sjl c = (int16_t *)buf + (i * 8); 20925cf1a30Sjl switch (k) { 21025cf1a30Sjl case 4: 21125cf1a30Sjl fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1)); 21225cf1a30Sjl break; 21325cf1a30Sjl case 8: 21425cf1a30Sjl fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1), 21525cf1a30Sjl *(c+2), *(c+3)); 21625cf1a30Sjl break; 21725cf1a30Sjl case 12: 21825cf1a30Sjl fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x", i, *(c+0), 21925cf1a30Sjl *(c+1), *(c+2), *(c+3), *(c+4), *(c+5)); 22025cf1a30Sjl break; 22125cf1a30Sjl } 22225cf1a30Sjl 22325cf1a30Sjl fmd_hdl_debug(hdl, "--- End Dump ---"); 22425cf1a30Sjl } 22525cf1a30Sjl 22625cf1a30Sjl /* 22725cf1a30Sjl * Provide the length of a message based on the data in the given ETM header. 
22825cf1a30Sjl */ 22925cf1a30Sjl static size_t 23025cf1a30Sjl etm_get_msglen(void *buf) 23125cf1a30Sjl { 23225cf1a30Sjl etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf; 23325cf1a30Sjl 23425cf1a30Sjl return (ntohl(hp->hdr_msglen)); 23525cf1a30Sjl } 23625cf1a30Sjl 23725cf1a30Sjl /* 23825cf1a30Sjl * Check the contents of the ETM header for errors. 23925cf1a30Sjl * Return the header type (hdr_type). 24025cf1a30Sjl */ 24125cf1a30Sjl static int 24225cf1a30Sjl etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf) 24325cf1a30Sjl { 24425cf1a30Sjl etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf; 24525cf1a30Sjl 24625cf1a30Sjl if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) { 247154b1f02Sjrutt fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s " 24825cf1a30Sjl ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim); 24925cf1a30Sjl return (ETM_HDR_INVALID); 25025cf1a30Sjl } 25125cf1a30Sjl 25225cf1a30Sjl if ((hp->hdr_type == ETM_HDR_C_HELLO) || 25325cf1a30Sjl (hp->hdr_type == ETM_HDR_S_HELLO)) { 25425cf1a30Sjl /* Until version is negotiated, other fields may be wrong */ 25525cf1a30Sjl return (hp->hdr_type); 25625cf1a30Sjl } 25725cf1a30Sjl 25825cf1a30Sjl if (hp->hdr_ver != mp->epm_ver) { 259154b1f02Sjrutt fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n", 26025cf1a30Sjl mp->epm_ep_str, hp->hdr_ver); 26125cf1a30Sjl return (ETM_HDR_BADVERSION); 26225cf1a30Sjl } 26325cf1a30Sjl 26425cf1a30Sjl if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) || 26525cf1a30Sjl (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) { 266154b1f02Sjrutt fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n", 26725cf1a30Sjl mp->epm_ep_str, hp->hdr_type); 26825cf1a30Sjl return (ETM_HDR_BADTYPE); 26925cf1a30Sjl } 27025cf1a30Sjl 27125cf1a30Sjl return (hp->hdr_type); 27225cf1a30Sjl } 27325cf1a30Sjl 27425cf1a30Sjl /* 27525cf1a30Sjl * Create an ETM header of a given type in the given buffer. 27625cf1a30Sjl * Return length of header. 
27725cf1a30Sjl */ 27825cf1a30Sjl static size_t 27925cf1a30Sjl etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen) 28025cf1a30Sjl { 28125cf1a30Sjl etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf; 28225cf1a30Sjl 28325cf1a30Sjl bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN); 28425cf1a30Sjl hp->hdr_ver = ver; 28525cf1a30Sjl hp->hdr_type = type; 28625cf1a30Sjl hp->hdr_msglen = htonl(msglen); 28725cf1a30Sjl 28825cf1a30Sjl return (ETM_HDRLEN); 28925cf1a30Sjl } 29025cf1a30Sjl 29125cf1a30Sjl /* 29225cf1a30Sjl * Convert message bytes to nvlist and post to fmd. 29325cf1a30Sjl * Return zero for success, non-zero for failure. 29425cf1a30Sjl * 29525cf1a30Sjl * Note : nvl is free'd by fmd. 29625cf1a30Sjl */ 29725cf1a30Sjl static int 29825cf1a30Sjl etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen) 29925cf1a30Sjl { 30025cf1a30Sjl nvlist_t *nvl; 30125cf1a30Sjl int rv; 30225cf1a30Sjl 30325cf1a30Sjl if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) { 304154b1f02Sjrutt fmd_hdl_error(hdl, "failed to unpack message"); 30525cf1a30Sjl return (1); 30625cf1a30Sjl } 30725cf1a30Sjl 308*77a7fd96Sjrutt rv = etm_xport_post_filter(hdl, nvl, mp->epm_ep_str); 309*77a7fd96Sjrutt if (rv == ETM_XPORT_FILTER_DROP) { 310*77a7fd96Sjrutt fmd_hdl_debug(hdl, "post_filter dropped event"); 311*77a7fd96Sjrutt INCRSTAT(Etm_stats.post_filter.fmds_value.ui64); 312*77a7fd96Sjrutt nvlist_free(nvl); 313*77a7fd96Sjrutt return (0); 314*77a7fd96Sjrutt } else if (rv == ETM_XPORT_FILTER_ERROR) { 315*77a7fd96Sjrutt fmd_hdl_debug(hdl, "post_filter error : %s", strerror(errno)); 316*77a7fd96Sjrutt INCRSTAT(Etm_stats.error_post_filter.fmds_value.ui64); 317*77a7fd96Sjrutt /* Still post event */ 318*77a7fd96Sjrutt } 319*77a7fd96Sjrutt 32025cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 32125cf1a30Sjl (void) pthread_mutex_lock(&Etm_mod_lock); 32225cf1a30Sjl if (!Etm_exit) { 32325cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 32425cf1a30Sjl if (mp->epm_qstat == Q_OPEN) { 
32525cf1a30Sjl fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0); 32625cf1a30Sjl rv = 0; 32725cf1a30Sjl } else if (mp->epm_qstat == Q_SUSPENDED) { 32825cf1a30Sjl fmd_xprt_resume(hdl, mp->epm_xprthdl); 32925cf1a30Sjl if (mp->epm_timer_in_use) { 33025cf1a30Sjl fmd_timer_remove(hdl, mp->epm_timer_id); 33125cf1a30Sjl mp->epm_timer_in_use = 0; 33225cf1a30Sjl } 33325cf1a30Sjl mp->epm_qstat = Q_OPEN; 33425cf1a30Sjl fmd_hdl_debug(hdl, "queue resumed for %s", 33525cf1a30Sjl mp->epm_ep_str); 33625cf1a30Sjl fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0); 33725cf1a30Sjl rv = 0; 33825cf1a30Sjl } else { 33925cf1a30Sjl fmd_hdl_debug(hdl, "unable to post message, qstat = %d", 34025cf1a30Sjl mp->epm_qstat); 341154b1f02Sjrutt nvlist_free(nvl); 342154b1f02Sjrutt /* Remote peer will attempt to resend event */ 34325cf1a30Sjl rv = 2; 34425cf1a30Sjl } 34525cf1a30Sjl } else { 34625cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 34725cf1a30Sjl fmd_hdl_debug(hdl, "unable to post message, module exiting"); 348154b1f02Sjrutt nvlist_free(nvl); 349154b1f02Sjrutt /* Remote peer will attempt to resend event */ 35025cf1a30Sjl rv = 3; 35125cf1a30Sjl } 35225cf1a30Sjl 35325cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 35425cf1a30Sjl 35525cf1a30Sjl return (rv); 35625cf1a30Sjl } 35725cf1a30Sjl 35825cf1a30Sjl /* 35925cf1a30Sjl * Handle the startup handshake to the server. The client always initiates 36025cf1a30Sjl * the startup handshake. In the following sequence, we are the client and 36125cf1a30Sjl * the remote endpoint is the server. 36225cf1a30Sjl * 36325cf1a30Sjl * Client sends C_HELLO and transitions to Q_INIT_PENDING state. 36425cf1a30Sjl * Server sends S_HELLO and transitions to Q_INIT_PENDING state. 36525cf1a30Sjl * Client sends ACK and transitions to Q_OPEN state. 36625cf1a30Sjl * Server receives ACK and transitions to Q_OPEN state. 36725cf1a30Sjl * 36825cf1a30Sjl * Return 0 for success, nonzero for failure. 
36925cf1a30Sjl */ 37025cf1a30Sjl static int 37125cf1a30Sjl etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp) 37225cf1a30Sjl { 37325cf1a30Sjl etm_proto_hdr_t *hp; 37425cf1a30Sjl size_t hdrlen = ETM_HDRLEN; 37525cf1a30Sjl int hdrstat; 37625cf1a30Sjl char hbuf[ETM_HDRLEN]; 37725cf1a30Sjl 37825cf1a30Sjl if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL) 37925cf1a30Sjl return (1); 38025cf1a30Sjl 38125cf1a30Sjl mp->epm_cstat = C_OPEN; 38225cf1a30Sjl 38325cf1a30Sjl hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0); 38425cf1a30Sjl 38525cf1a30Sjl if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, 38625cf1a30Sjl hdrlen)) != hdrlen) { 38725cf1a30Sjl fmd_hdl_error(hdl, "Failed to write C_HELLO to %s", 38825cf1a30Sjl mp->epm_ep_str); 38925cf1a30Sjl return (2); 39025cf1a30Sjl } 39125cf1a30Sjl 39225cf1a30Sjl mp->epm_qstat = Q_INIT_PENDING; 39325cf1a30Sjl 39425cf1a30Sjl if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf, 39525cf1a30Sjl hdrlen)) != hdrlen) { 39625cf1a30Sjl fmd_hdl_error(hdl, "Failed to read S_HELLO from %s", 39725cf1a30Sjl mp->epm_ep_str); 39825cf1a30Sjl return (3); 39925cf1a30Sjl } 40025cf1a30Sjl 40125cf1a30Sjl hdrstat = etm_check_hdr(hdl, mp, hbuf); 40225cf1a30Sjl 40325cf1a30Sjl if (hdrstat != ETM_HDR_S_HELLO) { 40425cf1a30Sjl fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO " 40525cf1a30Sjl "from %s", mp->epm_ep_str); 40625cf1a30Sjl return (4); 40725cf1a30Sjl } 40825cf1a30Sjl 40925cf1a30Sjl /* 41025cf1a30Sjl * Get version from the server. 41125cf1a30Sjl * Currently, only one version is supported. 
41225cf1a30Sjl */ 41325cf1a30Sjl hp = (etm_proto_hdr_t *)(void *)hbuf; 41425cf1a30Sjl if (hp->hdr_ver != ETM_PROTO_V1) { 41525cf1a30Sjl fmd_hdl_error(hdl, "Unable to use same version as %s : %d", 41625cf1a30Sjl mp->epm_ep_str, hp->hdr_ver); 41725cf1a30Sjl return (5); 41825cf1a30Sjl } 41925cf1a30Sjl mp->epm_ver = hp->hdr_ver; 42025cf1a30Sjl 42125cf1a30Sjl hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0); 42225cf1a30Sjl 42325cf1a30Sjl if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, 42425cf1a30Sjl hdrlen)) != hdrlen) { 42525cf1a30Sjl fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s", 42625cf1a30Sjl mp->epm_ep_str); 42725cf1a30Sjl return (6); 42825cf1a30Sjl } 42925cf1a30Sjl 43025cf1a30Sjl /* 43125cf1a30Sjl * Call fmd_xprt_open and fmd_xprt_setspecific with 43225cf1a30Sjl * Etm_mod_lock held to avoid race with etm_send thread. 43325cf1a30Sjl */ 43425cf1a30Sjl (void) pthread_mutex_lock(&Etm_mod_lock); 43525cf1a30Sjl if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags, 43625cf1a30Sjl mp->epm_ep_nvl, NULL)) == NULL) { 43725cf1a30Sjl fmd_hdl_abort(hdl, "Failed to init xprthdl for %s", 43825cf1a30Sjl mp->epm_ep_str); 43925cf1a30Sjl } 44025cf1a30Sjl fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp); 44125cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 44225cf1a30Sjl 44325cf1a30Sjl mp->epm_qstat = Q_OPEN; 44425cf1a30Sjl fmd_hdl_debug(hdl, "queue open for %s", mp->epm_ep_str); 44525cf1a30Sjl 44625cf1a30Sjl return (0); 44725cf1a30Sjl } 44825cf1a30Sjl 44925cf1a30Sjl /* 45025cf1a30Sjl * Alloc a nvlist and add a string for the endpoint. 45125cf1a30Sjl * Return zero for success, non-zero for failure. 45225cf1a30Sjl */ 45325cf1a30Sjl static int 45425cf1a30Sjl etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp) 45525cf1a30Sjl { 45625cf1a30Sjl /* 45725cf1a30Sjl * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation 45825cf1a30Sjl * in fmd when this nvlist_t is free'd. 
45925cf1a30Sjl */ 46025cf1a30Sjl (void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0); 46125cf1a30Sjl 46225cf1a30Sjl if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) { 463154b1f02Sjrutt fmd_hdl_error(hdl, "failed to add domain-id string to nvlist " 46425cf1a30Sjl "for %s", mp->epm_ep_str); 46525cf1a30Sjl nvlist_free(mp->epm_ep_nvl); 46625cf1a30Sjl return (1); 46725cf1a30Sjl } 46825cf1a30Sjl 46925cf1a30Sjl return (0); 47025cf1a30Sjl } 47125cf1a30Sjl 47225cf1a30Sjl /* 47325cf1a30Sjl * Free the nvlist for the endpoint_id string. 47425cf1a30Sjl */ 47525cf1a30Sjl /*ARGSUSED*/ 47625cf1a30Sjl static void 47725cf1a30Sjl etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp) 47825cf1a30Sjl { 47925cf1a30Sjl nvlist_free(mp->epm_ep_nvl); 48025cf1a30Sjl } 48125cf1a30Sjl 48225cf1a30Sjl /* 48325cf1a30Sjl * Check for a duplicate endpoint/peer string. 48425cf1a30Sjl */ 48525cf1a30Sjl /*ARGSUSED*/ 48625cf1a30Sjl static int 48725cf1a30Sjl etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname) 48825cf1a30Sjl { 48925cf1a30Sjl etm_epmap_t *mp; 49025cf1a30Sjl 49125cf1a30Sjl for (mp = Epmap_head; mp != NULL; mp = mp->epm_next) 49225cf1a30Sjl if (strcmp(epname, mp->epm_ep_str) == 0) 49325cf1a30Sjl return (1); 49425cf1a30Sjl 49525cf1a30Sjl return (0); 49625cf1a30Sjl } 49725cf1a30Sjl 49825cf1a30Sjl /* 49925cf1a30Sjl * Attempt to re-open a connection with the remote endpoint. 
50025cf1a30Sjl */ 50125cf1a30Sjl static void 50225cf1a30Sjl etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp) 50325cf1a30Sjl { 50425cf1a30Sjl if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) { 50525cf1a30Sjl if (gethrtime() < mp->epm_reconn_end) { 50625cf1a30Sjl if ((mp->epm_oconn = etm_xport_open(hdl, 50725cf1a30Sjl mp->epm_tlhdl)) == NULL) { 50825cf1a30Sjl fmd_hdl_debug(hdl, "reconnect failed for %s", 50925cf1a30Sjl mp->epm_ep_str); 51025cf1a30Sjl mp->epm_timer_id = fmd_timer_install(hdl, mp, 51125cf1a30Sjl NULL, Reconn_interval); 51225cf1a30Sjl mp->epm_timer_in_use = 1; 51325cf1a30Sjl } else { 51425cf1a30Sjl fmd_hdl_debug(hdl, "reconnect success for %s", 51525cf1a30Sjl mp->epm_ep_str); 51625cf1a30Sjl mp->epm_reconn_end = 0; 51725cf1a30Sjl mp->epm_cstat = C_OPEN; 51825cf1a30Sjl } 51925cf1a30Sjl } else { 52025cf1a30Sjl fmd_hdl_error(hdl, "Reconnect timed out for %s\n", 52125cf1a30Sjl mp->epm_ep_str); 52225cf1a30Sjl mp->epm_reconn_end = 0; 52325cf1a30Sjl mp->epm_cstat = C_TIMED_OUT; 52425cf1a30Sjl } 52525cf1a30Sjl } 52625cf1a30Sjl 52725cf1a30Sjl if (mp->epm_cstat == C_OPEN) { 52825cf1a30Sjl fmd_xprt_resume(hdl, mp->epm_xprthdl); 52925cf1a30Sjl mp->epm_qstat = Q_OPEN; 53025cf1a30Sjl fmd_hdl_debug(hdl, "queue resumed for %s", mp->epm_ep_str); 53125cf1a30Sjl } 53225cf1a30Sjl } 53325cf1a30Sjl 53425cf1a30Sjl /* 53525cf1a30Sjl * Suspend a given connection and setup for reconnection retries. 536154b1f02Sjrutt * Assume caller holds lock on epm_lock. 
53725cf1a30Sjl */ 53825cf1a30Sjl static void 53925cf1a30Sjl etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp) 54025cf1a30Sjl { 54125cf1a30Sjl (void) pthread_mutex_lock(&Etm_mod_lock); 54225cf1a30Sjl if (Etm_exit) { 54325cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 54425cf1a30Sjl return; 54525cf1a30Sjl } 54625cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 54725cf1a30Sjl 54825cf1a30Sjl if (mp->epm_oconn != NULL) { 54925cf1a30Sjl (void) etm_xport_close(hdl, mp->epm_oconn); 55025cf1a30Sjl mp->epm_oconn = NULL; 55125cf1a30Sjl } 55225cf1a30Sjl 55325cf1a30Sjl mp->epm_reconn_end = gethrtime() + Reconn_timeout; 55425cf1a30Sjl mp->epm_cstat = C_UNINITIALIZED; 55525cf1a30Sjl 55625cf1a30Sjl if (mp->epm_xprthdl != NULL) { 55725cf1a30Sjl fmd_xprt_suspend(hdl, mp->epm_xprthdl); 55825cf1a30Sjl mp->epm_qstat = Q_SUSPENDED; 55925cf1a30Sjl fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str); 56025cf1a30Sjl 56125cf1a30Sjl if (mp->epm_timer_in_use == 0) { 56225cf1a30Sjl mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL, 56325cf1a30Sjl Reconn_interval); 56425cf1a30Sjl mp->epm_timer_in_use = 1; 56525cf1a30Sjl } 56625cf1a30Sjl } 56725cf1a30Sjl } 56825cf1a30Sjl 56925cf1a30Sjl /* 57025cf1a30Sjl * Reinitialize the connection. The old fmd_xprt_t handle must be 57125cf1a30Sjl * removed/closed first. 57225cf1a30Sjl * Assume caller holds lock on epm_lock. 
57325cf1a30Sjl */ 57425cf1a30Sjl static void 57525cf1a30Sjl etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp) 57625cf1a30Sjl { 57725cf1a30Sjl /* 57825cf1a30Sjl * To avoid a deadlock, wait for etm_send to finish before 57925cf1a30Sjl * calling fmd_xprt_close() 58025cf1a30Sjl */ 58125cf1a30Sjl while (mp->epm_txbusy) 58225cf1a30Sjl (void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock); 58325cf1a30Sjl 58425cf1a30Sjl if (mp->epm_xprthdl != NULL) { 58525cf1a30Sjl fmd_xprt_close(hdl, mp->epm_xprthdl); 586154b1f02Sjrutt fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str); 58725cf1a30Sjl mp->epm_xprthdl = NULL; 58825cf1a30Sjl /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 58925cf1a30Sjl mp->epm_ep_nvl = NULL; 59025cf1a30Sjl } 59125cf1a30Sjl 59225cf1a30Sjl if (mp->epm_timer_in_use) { 59325cf1a30Sjl fmd_timer_remove(hdl, mp->epm_timer_id); 59425cf1a30Sjl mp->epm_timer_in_use = 0; 59525cf1a30Sjl } 59625cf1a30Sjl 59725cf1a30Sjl if (mp->epm_oconn != NULL) { 59825cf1a30Sjl (void) etm_xport_close(hdl, mp->epm_oconn); 59925cf1a30Sjl mp->epm_oconn = NULL; 60025cf1a30Sjl } 60125cf1a30Sjl 60225cf1a30Sjl mp->epm_cstat = C_UNINITIALIZED; 60325cf1a30Sjl mp->epm_qstat = Q_UNINITIALIZED; 60425cf1a30Sjl } 60525cf1a30Sjl 60625cf1a30Sjl /* 60725cf1a30Sjl * Receive data from ETM transport layer. 60825cf1a30Sjl * Note : This is not the fmdo_recv entry point. 
60925cf1a30Sjl * 61025cf1a30Sjl */ 61125cf1a30Sjl static int 61225cf1a30Sjl etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp) 61325cf1a30Sjl { 61425cf1a30Sjl size_t buflen, hdrlen; 61525cf1a30Sjl void *buf; 61625cf1a30Sjl char hbuf[ETM_HDRLEN]; 61725cf1a30Sjl int hdrstat, rv; 61825cf1a30Sjl 61925cf1a30Sjl hdrlen = ETM_HDRLEN; 62025cf1a30Sjl 62125cf1a30Sjl if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) { 62225cf1a30Sjl fmd_hdl_debug(hdl, "failed to read header from %s", 62325cf1a30Sjl mp->epm_ep_str); 62425cf1a30Sjl INCRSTAT(Etm_stats.error_read.fmds_value.ui64); 62525cf1a30Sjl return (EIO); 62625cf1a30Sjl } 62725cf1a30Sjl 62825cf1a30Sjl hdrstat = etm_check_hdr(hdl, mp, hbuf); 62925cf1a30Sjl 63025cf1a30Sjl switch (hdrstat) { 63125cf1a30Sjl case ETM_HDR_INVALID: 63225cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 63325cf1a30Sjl if (mp->epm_cstat == C_OPEN) 63425cf1a30Sjl mp->epm_cstat = C_CLOSED; 63525cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 63625cf1a30Sjl 63725cf1a30Sjl INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64); 63825cf1a30Sjl rv = ECANCELED; 63925cf1a30Sjl break; 64025cf1a30Sjl 64125cf1a30Sjl case ETM_HDR_BADTYPE: 64225cf1a30Sjl case ETM_HDR_BADVERSION: 64325cf1a30Sjl hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0); 64425cf1a30Sjl 64525cf1a30Sjl if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 64625cf1a30Sjl hdrlen)) != hdrlen) { 64725cf1a30Sjl fmd_hdl_debug(hdl, "failed to write NAK to %s", 64825cf1a30Sjl mp->epm_ep_str); 64925cf1a30Sjl INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 65025cf1a30Sjl return (EIO); 65125cf1a30Sjl } 65225cf1a30Sjl 65325cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 65425cf1a30Sjl mp->epm_cstat = C_LIMBO; 65525cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 65625cf1a30Sjl 65725cf1a30Sjl INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64); 65825cf1a30Sjl rv = ENOTSUP; 65925cf1a30Sjl break; 66025cf1a30Sjl 66125cf1a30Sjl case ETM_HDR_C_HELLO: 
66225cf1a30Sjl /* Client is initiating a startup handshake */ 66325cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 66425cf1a30Sjl etm_reinit(hdl, mp); 66525cf1a30Sjl mp->epm_qstat = Q_INIT_PENDING; 66625cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 66725cf1a30Sjl 66825cf1a30Sjl hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0); 66925cf1a30Sjl 67025cf1a30Sjl if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 67125cf1a30Sjl hdrlen)) != hdrlen) { 67225cf1a30Sjl fmd_hdl_debug(hdl, "failed to write S_HELLO to %s", 67325cf1a30Sjl mp->epm_ep_str); 67425cf1a30Sjl INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 67525cf1a30Sjl return (EIO); 67625cf1a30Sjl } 67725cf1a30Sjl 67825cf1a30Sjl rv = 0; 67925cf1a30Sjl break; 68025cf1a30Sjl 68125cf1a30Sjl case ETM_HDR_ACK: 68225cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 68325cf1a30Sjl if (mp->epm_qstat == Q_INIT_PENDING) { 68425cf1a30Sjl /* This is client's ACK from startup handshake */ 68525cf1a30Sjl /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 68625cf1a30Sjl if (mp->epm_ep_nvl == NULL) 68725cf1a30Sjl (void) etm_get_ep_nvl(hdl, mp); 68825cf1a30Sjl 68925cf1a30Sjl /* 69025cf1a30Sjl * Call fmd_xprt_open and fmd_xprt_setspecific with 69125cf1a30Sjl * Etm_mod_lock held to avoid race with etm_send thread. 
69225cf1a30Sjl */ 69325cf1a30Sjl (void) pthread_mutex_lock(&Etm_mod_lock); 69425cf1a30Sjl if ((mp->epm_xprthdl = fmd_xprt_open(hdl, 69525cf1a30Sjl mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) { 69625cf1a30Sjl fmd_hdl_abort(hdl, "Failed to init xprthdl " 69725cf1a30Sjl "for %s", mp->epm_ep_str); 69825cf1a30Sjl } 69925cf1a30Sjl fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp); 70025cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 70125cf1a30Sjl 70225cf1a30Sjl mp->epm_qstat = Q_OPEN; 70325cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 70425cf1a30Sjl fmd_hdl_debug(hdl, "queue open for %s", 70525cf1a30Sjl mp->epm_ep_str); 70625cf1a30Sjl } else { 70725cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 70825cf1a30Sjl fmd_hdl_debug(hdl, "protocol error, not expecting ACK " 70925cf1a30Sjl "from %s\n", mp->epm_ep_str); 71025cf1a30Sjl INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64); 71125cf1a30Sjl } 71225cf1a30Sjl 71325cf1a30Sjl rv = 0; 71425cf1a30Sjl break; 71525cf1a30Sjl 71625cf1a30Sjl case ETM_HDR_SHUTDOWN: 71725cf1a30Sjl fmd_hdl_debug(hdl, "received shutdown from %s", 71825cf1a30Sjl mp->epm_ep_str); 71925cf1a30Sjl 72025cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 72125cf1a30Sjl 72225cf1a30Sjl etm_reinit(hdl, mp); 72325cf1a30Sjl 72425cf1a30Sjl if (IS_CLIENT(mp)) { 72525cf1a30Sjl /* 72625cf1a30Sjl * A server shutdown is considered to be temporary. 72725cf1a30Sjl * Prepare for reconnection. 
72825cf1a30Sjl */ 72925cf1a30Sjl mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL, 73025cf1a30Sjl Reconn_interval); 73125cf1a30Sjl 73225cf1a30Sjl mp->epm_timer_in_use = 1; 73325cf1a30Sjl } 73425cf1a30Sjl 73525cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 73625cf1a30Sjl 73725cf1a30Sjl rv = ECANCELED; 73825cf1a30Sjl break; 73925cf1a30Sjl 74025cf1a30Sjl case ETM_HDR_MSG: 74125cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 74225cf1a30Sjl if (mp->epm_qstat == Q_UNINITIALIZED) { 74325cf1a30Sjl /* Peer (client) is unaware that we've restarted */ 74425cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 74525cf1a30Sjl hdrlen = etm_create_hdr(hbuf, mp->epm_ver, 74625cf1a30Sjl ETM_HDR_S_RESTART, 0); 74725cf1a30Sjl 74825cf1a30Sjl if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 74925cf1a30Sjl hdrlen)) != hdrlen) { 75025cf1a30Sjl fmd_hdl_debug(hdl, "failed to write S_RESTART " 75125cf1a30Sjl "to %s", mp->epm_ep_str); 75225cf1a30Sjl INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 75325cf1a30Sjl return (EIO); 75425cf1a30Sjl } 75525cf1a30Sjl 75625cf1a30Sjl return (ECANCELED); 75725cf1a30Sjl } 75825cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 75925cf1a30Sjl 76025cf1a30Sjl buflen = etm_get_msglen(hbuf); 76125cf1a30Sjl ALLOC_BUF(hdl, buf, buflen); 76225cf1a30Sjl 76325cf1a30Sjl if (etm_xport_read(hdl, conn, Rw_timeout, buf, 76425cf1a30Sjl buflen) != buflen) { 76525cf1a30Sjl fmd_hdl_debug(hdl, "failed to read message from %s", 76625cf1a30Sjl mp->epm_ep_str); 76725cf1a30Sjl FREE_BUF(hdl, buf, buflen); 76825cf1a30Sjl INCRSTAT(Etm_stats.error_read.fmds_value.ui64); 76925cf1a30Sjl return (EIO); 77025cf1a30Sjl } 77125cf1a30Sjl 77225cf1a30Sjl INCRSTAT(Etm_stats.read_msg.fmds_value.ui64); 77325cf1a30Sjl ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen); 77425cf1a30Sjl 77525cf1a30Sjl etm_hex_dump(hdl, buf, buflen, 0); 77625cf1a30Sjl 77725cf1a30Sjl if (etm_post_msg(hdl, mp, buf, buflen)) { 77825cf1a30Sjl INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64); 
77925cf1a30Sjl FREE_BUF(hdl, buf, buflen); 78025cf1a30Sjl return (EIO); 78125cf1a30Sjl } 78225cf1a30Sjl 78325cf1a30Sjl FREE_BUF(hdl, buf, buflen); 78425cf1a30Sjl 78525cf1a30Sjl hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0); 78625cf1a30Sjl 78725cf1a30Sjl if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 78825cf1a30Sjl hdrlen)) != hdrlen) { 78925cf1a30Sjl fmd_hdl_debug(hdl, "failed to write ACK to %s", 79025cf1a30Sjl mp->epm_ep_str); 79125cf1a30Sjl INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 79225cf1a30Sjl return (EIO); 79325cf1a30Sjl } 79425cf1a30Sjl 79525cf1a30Sjl INCRSTAT(Etm_stats.write_ack.fmds_value.ui64); 79625cf1a30Sjl 79725cf1a30Sjl /* 79825cf1a30Sjl * If we got this far and the current state of the 79925cf1a30Sjl * outbound/sending connection is TIMED_OUT or 80025cf1a30Sjl * LIMBO, then we should reinitialize it. 80125cf1a30Sjl */ 80225cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 80325cf1a30Sjl if (mp->epm_cstat == C_TIMED_OUT || 80425cf1a30Sjl mp->epm_cstat == C_LIMBO) { 80525cf1a30Sjl if (mp->epm_oconn != NULL) { 80625cf1a30Sjl (void) etm_xport_close(hdl, mp->epm_oconn); 80725cf1a30Sjl mp->epm_oconn = NULL; 80825cf1a30Sjl } 80925cf1a30Sjl mp->epm_cstat = C_UNINITIALIZED; 81025cf1a30Sjl fmd_xprt_resume(hdl, mp->epm_xprthdl); 81125cf1a30Sjl if (mp->epm_timer_in_use) { 81225cf1a30Sjl fmd_timer_remove(hdl, mp->epm_timer_id); 81325cf1a30Sjl mp->epm_timer_in_use = 0; 81425cf1a30Sjl } 81525cf1a30Sjl mp->epm_qstat = Q_OPEN; 81625cf1a30Sjl fmd_hdl_debug(hdl, "queue resumed for %s", 81725cf1a30Sjl mp->epm_ep_str); 81825cf1a30Sjl } 81925cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 82025cf1a30Sjl 82125cf1a30Sjl rv = 0; 82225cf1a30Sjl break; 82325cf1a30Sjl 82425cf1a30Sjl default: 82525cf1a30Sjl fmd_hdl_debug(hdl, "protocol error, unexpected header " 82625cf1a30Sjl "from %s : %d", mp->epm_ep_str, hdrstat); 82725cf1a30Sjl INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64); 82825cf1a30Sjl rv = 0; 82925cf1a30Sjl } 83025cf1a30Sjl 
83125cf1a30Sjl return (rv); 83225cf1a30Sjl } 83325cf1a30Sjl 83425cf1a30Sjl /* 83525cf1a30Sjl * ETM transport layer callback function. 83625cf1a30Sjl * The transport layer calls this function to : 83725cf1a30Sjl * (a) pass an incoming message (flag == ETM_CBFLAG_RECV) 83825cf1a30Sjl * (b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT) 83925cf1a30Sjl */ 84025cf1a30Sjl static int 84125cf1a30Sjl etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag, 84225cf1a30Sjl void *arg) 84325cf1a30Sjl { 84425cf1a30Sjl etm_epmap_t *mp = (etm_epmap_t *)arg; 84525cf1a30Sjl int rv = 0; 84625cf1a30Sjl 84725cf1a30Sjl (void) pthread_mutex_lock(&Etm_mod_lock); 84825cf1a30Sjl if (Etm_exit) { 84925cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 85025cf1a30Sjl return (ECANCELED); 85125cf1a30Sjl } 85225cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 85325cf1a30Sjl 85425cf1a30Sjl switch (flag) { 85525cf1a30Sjl case ETM_CBFLAG_RECV: 85625cf1a30Sjl rv = etm_recv(hdl, conn, mp); 85725cf1a30Sjl break; 85825cf1a30Sjl case ETM_CBFLAG_REINIT: 85925cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 86025cf1a30Sjl etm_reinit(hdl, mp); 86125cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 86225cf1a30Sjl /* 86325cf1a30Sjl * Return ECANCELED so the transport layer will close the 86425cf1a30Sjl * server connection. The transport layer is responsible for 86525cf1a30Sjl * reestablishing this connection (should a connection request 86625cf1a30Sjl * arrive from the peer). 86725cf1a30Sjl */ 86825cf1a30Sjl rv = ECANCELED; 86925cf1a30Sjl break; 87025cf1a30Sjl default: 87125cf1a30Sjl fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag); 87225cf1a30Sjl rv = ENOTSUP; 87325cf1a30Sjl } 87425cf1a30Sjl 87525cf1a30Sjl return (rv); 87625cf1a30Sjl } 87725cf1a30Sjl 87825cf1a30Sjl /* 87925cf1a30Sjl * Allocate and initialize an etm_epmap_t struct for the given endpoint 88025cf1a30Sjl * name string. 
88125cf1a30Sjl */ 88225cf1a30Sjl static void 88325cf1a30Sjl etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags) 88425cf1a30Sjl { 88525cf1a30Sjl etm_epmap_t *newmap; 88625cf1a30Sjl 88725cf1a30Sjl if (etm_check_dup_ep_str(hdl, epname)) { 88825cf1a30Sjl fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname); 88925cf1a30Sjl return; 89025cf1a30Sjl } 89125cf1a30Sjl 89225cf1a30Sjl newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP); 89325cf1a30Sjl newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP); 89425cf1a30Sjl newmap->epm_xprtflags = flags; 89525cf1a30Sjl newmap->epm_cstat = C_UNINITIALIZED; 89625cf1a30Sjl newmap->epm_qstat = Q_UNINITIALIZED; 89725cf1a30Sjl newmap->epm_ver = ETM_PROTO_V1; /* Currently support one proto ver */ 89825cf1a30Sjl newmap->epm_txbusy = 0; 89925cf1a30Sjl 90025cf1a30Sjl (void) pthread_mutex_init(&newmap->epm_lock, NULL); 90125cf1a30Sjl (void) pthread_cond_init(&newmap->epm_tx_cv, NULL); 90225cf1a30Sjl 90325cf1a30Sjl if (etm_get_ep_nvl(hdl, newmap)) { 90425cf1a30Sjl fmd_hdl_strfree(hdl, newmap->epm_ep_str); 90525cf1a30Sjl fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t)); 90625cf1a30Sjl return; 90725cf1a30Sjl } 90825cf1a30Sjl 90925cf1a30Sjl if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str, 91025cf1a30Sjl etm_cb_func, newmap)) == NULL) { 91125cf1a30Sjl fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n", 91225cf1a30Sjl newmap->epm_ep_str); 91325cf1a30Sjl etm_free_ep_nvl(hdl, newmap); 91425cf1a30Sjl fmd_hdl_strfree(hdl, newmap->epm_ep_str); 91525cf1a30Sjl fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t)); 91625cf1a30Sjl return; 91725cf1a30Sjl } 91825cf1a30Sjl 91925cf1a30Sjl if (IS_CLIENT(newmap)) { 92025cf1a30Sjl if (etm_handle_startup(hdl, newmap)) { 921154b1f02Sjrutt /* 922154b1f02Sjrutt * For whatever reason, we could not complete the 923154b1f02Sjrutt * startup handshake with the server. Set the timer 924154b1f02Sjrutt * and try again. 
925154b1f02Sjrutt */ 926154b1f02Sjrutt if (newmap->epm_oconn != NULL) { 927154b1f02Sjrutt (void) etm_xport_close(hdl, newmap->epm_oconn); 928154b1f02Sjrutt newmap->epm_oconn = NULL; 929154b1f02Sjrutt } 930154b1f02Sjrutt newmap->epm_cstat = C_UNINITIALIZED; 931154b1f02Sjrutt newmap->epm_qstat = Q_UNINITIALIZED; 932154b1f02Sjrutt newmap->epm_timer_id = fmd_timer_install(hdl, newmap, 933154b1f02Sjrutt NULL, Reconn_interval); 934154b1f02Sjrutt newmap->epm_timer_in_use = 1; 93525cf1a30Sjl } 93625cf1a30Sjl } 93725cf1a30Sjl 93825cf1a30Sjl /* Add this transport instance handle to the list */ 93925cf1a30Sjl newmap->epm_next = Epmap_head; 94025cf1a30Sjl Epmap_head = newmap; 94125cf1a30Sjl 94225cf1a30Sjl INCRSTAT(Etm_stats.peer_count.fmds_value.ui64); 94325cf1a30Sjl } 94425cf1a30Sjl 94525cf1a30Sjl /* 94625cf1a30Sjl * Parse the given property list string and call etm_init_epmap 94725cf1a30Sjl * for each endpoint. 94825cf1a30Sjl */ 94925cf1a30Sjl static void 95025cf1a30Sjl etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags) 95125cf1a30Sjl { 95225cf1a30Sjl char *epstr, *ep, *prefix, *lasts, *numstr; 95325cf1a30Sjl char epname[MAXPATHLEN]; 95425cf1a30Sjl size_t slen, nlen; 95525cf1a30Sjl int beg, end, i; 95625cf1a30Sjl 95725cf1a30Sjl if (eplist == NULL) 95825cf1a30Sjl return; 95925cf1a30Sjl /* 96025cf1a30Sjl * Create a copy of eplist for parsing. 96125cf1a30Sjl * strtok/strtok_r(3C) will insert null chars to the string. 96225cf1a30Sjl * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used. 96325cf1a30Sjl */ 96425cf1a30Sjl slen = strlen(eplist); 96525cf1a30Sjl epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP); 96625cf1a30Sjl (void) strcpy(epstr, eplist); 96725cf1a30Sjl 96825cf1a30Sjl /* 96925cf1a30Sjl * The following are supported for the "client_list" and 97025cf1a30Sjl * "server_list" properties : 97125cf1a30Sjl * 97225cf1a30Sjl * A space-separated list of endpoints. 
97325cf1a30Sjl * "dev:///dom0 dev:///dom1 dev:///dom2" 97425cf1a30Sjl * 97525cf1a30Sjl * An array syntax for a range of instances. 97625cf1a30Sjl * "dev:///dom[0:2]" 97725cf1a30Sjl * 97825cf1a30Sjl * A combination of both. 97925cf1a30Sjl * "dev:///dom0 dev:///dom[1:2]" 98025cf1a30Sjl */ 98125cf1a30Sjl ep = strtok_r(epstr, " ", &lasts); 98225cf1a30Sjl while (ep != NULL) { 98325cf1a30Sjl if (strchr(ep, '[') != NULL) { 98425cf1a30Sjl /* 98525cf1a30Sjl * This string is using array syntax. 98625cf1a30Sjl * Check the string for correct syntax. 98725cf1a30Sjl */ 98825cf1a30Sjl if ((strchr(ep, ':') == NULL) || 98925cf1a30Sjl (strchr(ep, ']') == NULL)) { 99025cf1a30Sjl fmd_hdl_error(hdl, "Syntax error in property " 99125cf1a30Sjl "that includes : %s\n", ep); 99225cf1a30Sjl ep = strtok_r(NULL, " ", &lasts); 99325cf1a30Sjl continue; 99425cf1a30Sjl } 99525cf1a30Sjl 99625cf1a30Sjl /* expand the array syntax */ 99725cf1a30Sjl prefix = strtok(ep, "["); 99825cf1a30Sjl 99925cf1a30Sjl numstr = strtok(NULL, ":"); 100025cf1a30Sjl if ((numstr == NULL) || (!isdigit(*numstr))) { 100125cf1a30Sjl fmd_hdl_error(hdl, "Syntax error in property " 100225cf1a30Sjl "that includes : %s[\n", prefix); 100325cf1a30Sjl ep = strtok_r(NULL, " ", &lasts); 100425cf1a30Sjl continue; 100525cf1a30Sjl } 100625cf1a30Sjl beg = atoi(numstr); 100725cf1a30Sjl 100825cf1a30Sjl numstr = strtok(NULL, "]"); 100925cf1a30Sjl if ((numstr == NULL) || (!isdigit(*numstr))) { 101025cf1a30Sjl fmd_hdl_error(hdl, "Syntax error in property " 101125cf1a30Sjl "that includes : %s[\n", prefix); 101225cf1a30Sjl ep = strtok_r(NULL, " ", &lasts); 101325cf1a30Sjl continue; 101425cf1a30Sjl } 101525cf1a30Sjl end = atoi(numstr); 101625cf1a30Sjl 101725cf1a30Sjl nlen = strlen(prefix) + ETM_EP_INST_MAX; 101825cf1a30Sjl 101925cf1a30Sjl if (nlen > MAXPATHLEN) { 102025cf1a30Sjl fmd_hdl_error(hdl, "Endpoint prop string " 102125cf1a30Sjl "exceeds MAXPATHLEN\n"); 102225cf1a30Sjl ep = strtok_r(NULL, " ", &lasts); 102325cf1a30Sjl continue; 
102425cf1a30Sjl } 102525cf1a30Sjl 102625cf1a30Sjl for (i = beg; i <= end; i++) { 102725cf1a30Sjl bzero(epname, MAXPATHLEN); 102825cf1a30Sjl (void) snprintf(epname, nlen, "%s%d", 102925cf1a30Sjl prefix, i); 103025cf1a30Sjl etm_init_epmap(hdl, epname, flags); 103125cf1a30Sjl } 103225cf1a30Sjl } else { 103325cf1a30Sjl etm_init_epmap(hdl, ep, flags); 103425cf1a30Sjl } 103525cf1a30Sjl 103625cf1a30Sjl ep = strtok_r(NULL, " ", &lasts); 103725cf1a30Sjl } 103825cf1a30Sjl 103925cf1a30Sjl fmd_hdl_free(hdl, epstr, slen + 1); 104025cf1a30Sjl } 104125cf1a30Sjl 104225cf1a30Sjl /* 104325cf1a30Sjl * Free the transport infrastructure for an endpoint. 104425cf1a30Sjl */ 104525cf1a30Sjl static void 104625cf1a30Sjl etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp) 104725cf1a30Sjl { 104825cf1a30Sjl size_t hdrlen; 104925cf1a30Sjl char hbuf[ETM_HDRLEN]; 105025cf1a30Sjl 105125cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 105225cf1a30Sjl 105325cf1a30Sjl /* 105425cf1a30Sjl * If an etm_send thread is in progress, wait for it to finish. 105525cf1a30Sjl * The etm_recv thread is managed by the transport layer and will 105625cf1a30Sjl * be destroyed with etm_xport_fini(). 
105725cf1a30Sjl */ 105825cf1a30Sjl while (mp->epm_txbusy) 105925cf1a30Sjl (void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock); 106025cf1a30Sjl 106125cf1a30Sjl if (mp->epm_timer_in_use) 106225cf1a30Sjl fmd_timer_remove(hdl, mp->epm_timer_id); 106325cf1a30Sjl 106425cf1a30Sjl if (mp->epm_oconn != NULL) { 106525cf1a30Sjl hdrlen = etm_create_hdr(hbuf, mp->epm_ver, 106625cf1a30Sjl ETM_HDR_SHUTDOWN, 0); 106725cf1a30Sjl (void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, 106825cf1a30Sjl hdrlen); 106925cf1a30Sjl (void) etm_xport_close(hdl, mp->epm_oconn); 107025cf1a30Sjl mp->epm_oconn = NULL; 107125cf1a30Sjl } 107225cf1a30Sjl 107325cf1a30Sjl if (mp->epm_xprthdl != NULL) { 107425cf1a30Sjl fmd_xprt_close(hdl, mp->epm_xprthdl); 107525cf1a30Sjl /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 107625cf1a30Sjl mp->epm_ep_nvl = NULL; 107725cf1a30Sjl } 107825cf1a30Sjl 107925cf1a30Sjl if (mp->epm_ep_nvl != NULL) 108025cf1a30Sjl etm_free_ep_nvl(hdl, mp); 108125cf1a30Sjl 108225cf1a30Sjl if (mp->epm_tlhdl != NULL) 108325cf1a30Sjl (void) etm_xport_fini(hdl, mp->epm_tlhdl); 108425cf1a30Sjl 108525cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 108625cf1a30Sjl (void) pthread_mutex_destroy(&mp->epm_lock); 108725cf1a30Sjl fmd_hdl_strfree(hdl, mp->epm_ep_str); 108825cf1a30Sjl fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t)); 108925cf1a30Sjl DECRSTAT(Etm_stats.peer_count.fmds_value.ui64); 109025cf1a30Sjl } 109125cf1a30Sjl 109225cf1a30Sjl /* 109325cf1a30Sjl * FMD entry points 109425cf1a30Sjl */ 109525cf1a30Sjl 109625cf1a30Sjl /* 109725cf1a30Sjl * FMD fmdo_send entry point. 109825cf1a30Sjl * Send an event to the remote endpoint and receive an ACK. 
109925cf1a30Sjl */ 110025cf1a30Sjl static int 110125cf1a30Sjl etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl) 110225cf1a30Sjl { 110325cf1a30Sjl etm_epmap_t *mp; 110425cf1a30Sjl nvlist_t *msgnvl; 1105*77a7fd96Sjrutt int hdrstat, rv, cnt = 0; 110625cf1a30Sjl char *buf, *nvbuf, *class; 110725cf1a30Sjl size_t nvsize, buflen, hdrlen; 1108*77a7fd96Sjrutt struct timespec tms; 110925cf1a30Sjl 111025cf1a30Sjl (void) pthread_mutex_lock(&Etm_mod_lock); 111125cf1a30Sjl if (Etm_exit) { 111225cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 111325cf1a30Sjl return (FMD_SEND_RETRY); 111425cf1a30Sjl } 111525cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 111625cf1a30Sjl 111725cf1a30Sjl mp = fmd_xprt_getspecific(hdl, xprthdl); 111825cf1a30Sjl 1119*77a7fd96Sjrutt for (;;) { 1120*77a7fd96Sjrutt if (pthread_mutex_trylock(&mp->epm_lock) == 0) { 1121*77a7fd96Sjrutt break; 1122*77a7fd96Sjrutt } else { 1123*77a7fd96Sjrutt /* 1124*77a7fd96Sjrutt * Another thread may be (1) trying to close this 1125*77a7fd96Sjrutt * fmd_xprt_t, or (2) posting an event to it. 1126*77a7fd96Sjrutt * If (1), don't want to spend too much time here. 1127*77a7fd96Sjrutt * If (2), allow it to finish and release epm_lock. 
1128*77a7fd96Sjrutt */ 1129*77a7fd96Sjrutt if (cnt++ < 10) { 1130*77a7fd96Sjrutt tms.tv_sec = 0; 1131*77a7fd96Sjrutt tms.tv_nsec = (cnt * 10000); 1132*77a7fd96Sjrutt (void) nanosleep(&tms, NULL); 1133*77a7fd96Sjrutt 1134*77a7fd96Sjrutt } else { 1135*77a7fd96Sjrutt return (FMD_SEND_RETRY); 1136*77a7fd96Sjrutt } 1137*77a7fd96Sjrutt } 1138*77a7fd96Sjrutt } 113925cf1a30Sjl 114025cf1a30Sjl mp->epm_txbusy++; 114125cf1a30Sjl 1142154b1f02Sjrutt if (mp->epm_qstat == Q_UNINITIALIZED) { 114325cf1a30Sjl mp->epm_txbusy--; 114425cf1a30Sjl (void) pthread_cond_broadcast(&mp->epm_tx_cv); 114545736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 1146154b1f02Sjrutt return (FMD_SEND_FAILED); 1147154b1f02Sjrutt } 1148154b1f02Sjrutt 1149154b1f02Sjrutt if (mp->epm_cstat == C_CLOSED) { 115025cf1a30Sjl etm_suspend_reconnect(hdl, mp); 1151154b1f02Sjrutt mp->epm_txbusy--; 1152154b1f02Sjrutt (void) pthread_cond_broadcast(&mp->epm_tx_cv); 115345736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 115425cf1a30Sjl return (FMD_SEND_RETRY); 115525cf1a30Sjl } 115625cf1a30Sjl 115725cf1a30Sjl if (mp->epm_cstat == C_LIMBO) { 115825cf1a30Sjl if (mp->epm_oconn != NULL) { 115925cf1a30Sjl (void) etm_xport_close(hdl, mp->epm_oconn); 116025cf1a30Sjl mp->epm_oconn = NULL; 116125cf1a30Sjl } 116225cf1a30Sjl 116325cf1a30Sjl fmd_xprt_suspend(hdl, xprthdl); 116425cf1a30Sjl mp->epm_qstat = Q_SUSPENDED; 116525cf1a30Sjl mp->epm_txbusy--; 116625cf1a30Sjl (void) pthread_cond_broadcast(&mp->epm_tx_cv); 116745736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 116825cf1a30Sjl fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str); 116925cf1a30Sjl return (FMD_SEND_RETRY); 117025cf1a30Sjl } 117125cf1a30Sjl 117225cf1a30Sjl if (mp->epm_oconn == NULL) { 117325cf1a30Sjl if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) 117425cf1a30Sjl == NULL) { 1175154b1f02Sjrutt etm_suspend_reconnect(hdl, mp); 117625cf1a30Sjl mp->epm_txbusy--; 117725cf1a30Sjl (void) pthread_cond_broadcast(&mp->epm_tx_cv); 
117845736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 117925cf1a30Sjl return (FMD_SEND_RETRY); 118025cf1a30Sjl } else { 118125cf1a30Sjl mp->epm_cstat = C_OPEN; 118225cf1a30Sjl } 118325cf1a30Sjl } 118425cf1a30Sjl 118525cf1a30Sjl if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) 118625cf1a30Sjl fmd_hdl_abort(hdl, "No class string in nvlist"); 118725cf1a30Sjl 118825cf1a30Sjl msgnvl = fmd_xprt_translate(hdl, xprthdl, ep); 118925cf1a30Sjl if (msgnvl == NULL) { 1190154b1f02Sjrutt mp->epm_txbusy--; 1191154b1f02Sjrutt (void) pthread_cond_broadcast(&mp->epm_tx_cv); 119245736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 119325cf1a30Sjl fmd_hdl_error(hdl, "Failed to translate event %p\n", 119425cf1a30Sjl (void *) ep); 119525cf1a30Sjl return (FMD_SEND_FAILED); 119625cf1a30Sjl } 119725cf1a30Sjl 1198*77a7fd96Sjrutt rv = etm_xport_send_filter(hdl, msgnvl, mp->epm_ep_str); 1199*77a7fd96Sjrutt if (rv == ETM_XPORT_FILTER_DROP) { 1200*77a7fd96Sjrutt mp->epm_txbusy--; 1201*77a7fd96Sjrutt (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1202*77a7fd96Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 1203*77a7fd96Sjrutt fmd_hdl_debug(hdl, "send_filter dropped event"); 1204*77a7fd96Sjrutt nvlist_free(msgnvl); 1205*77a7fd96Sjrutt INCRSTAT(Etm_stats.send_filter.fmds_value.ui64); 1206*77a7fd96Sjrutt return (FMD_SEND_SUCCESS); 1207*77a7fd96Sjrutt } else if (rv == ETM_XPORT_FILTER_ERROR) { 1208*77a7fd96Sjrutt fmd_hdl_debug(hdl, "send_filter error : %s", strerror(errno)); 1209*77a7fd96Sjrutt INCRSTAT(Etm_stats.error_send_filter.fmds_value.ui64); 1210*77a7fd96Sjrutt /* Still send event */ 1211*77a7fd96Sjrutt } 1212*77a7fd96Sjrutt 121325cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 121425cf1a30Sjl 121525cf1a30Sjl (void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR); 121625cf1a30Sjl 121725cf1a30Sjl hdrlen = ETM_HDRLEN; 121825cf1a30Sjl buflen = nvsize + hdrlen; 121925cf1a30Sjl 122025cf1a30Sjl ALLOC_BUF(hdl, buf, buflen); 122125cf1a30Sjl 122225cf1a30Sjl nvbuf = buf + 
hdrlen; 122325cf1a30Sjl 122425cf1a30Sjl (void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize); 122525cf1a30Sjl 122625cf1a30Sjl if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) { 1227154b1f02Sjrutt (void) pthread_mutex_lock(&mp->epm_lock); 1228154b1f02Sjrutt mp->epm_txbusy--; 1229154b1f02Sjrutt (void) pthread_cond_broadcast(&mp->epm_tx_cv); 123045736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 123125cf1a30Sjl fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv)); 1232*77a7fd96Sjrutt nvlist_free(msgnvl); 123325cf1a30Sjl FREE_BUF(hdl, buf, buflen); 123425cf1a30Sjl return (FMD_SEND_FAILED); 123525cf1a30Sjl } 123625cf1a30Sjl 123725cf1a30Sjl nvlist_free(msgnvl); 123825cf1a30Sjl 123925cf1a30Sjl if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf, 124025cf1a30Sjl buflen) != buflen) { 1241154b1f02Sjrutt fmd_hdl_debug(hdl, "failed to send message to %s", 1242154b1f02Sjrutt mp->epm_ep_str); 124325cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 1244154b1f02Sjrutt etm_suspend_reconnect(hdl, mp); 124525cf1a30Sjl mp->epm_txbusy--; 124625cf1a30Sjl (void) pthread_cond_broadcast(&mp->epm_tx_cv); 124745736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 124825cf1a30Sjl FREE_BUF(hdl, buf, buflen); 124925cf1a30Sjl INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 125025cf1a30Sjl return (FMD_SEND_RETRY); 125125cf1a30Sjl } 125225cf1a30Sjl 125325cf1a30Sjl INCRSTAT(Etm_stats.write_msg.fmds_value.ui64); 125425cf1a30Sjl ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize); 125525cf1a30Sjl 125625cf1a30Sjl etm_hex_dump(hdl, nvbuf, nvsize, 1); 125725cf1a30Sjl 125825cf1a30Sjl if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf, 125925cf1a30Sjl hdrlen) != hdrlen) { 1260154b1f02Sjrutt fmd_hdl_debug(hdl, "failed to read ACK from %s", 1261154b1f02Sjrutt mp->epm_ep_str); 126225cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 1263154b1f02Sjrutt etm_suspend_reconnect(hdl, mp); 126425cf1a30Sjl mp->epm_txbusy--; 126525cf1a30Sjl (void) 
pthread_cond_broadcast(&mp->epm_tx_cv); 126645736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 126725cf1a30Sjl FREE_BUF(hdl, buf, buflen); 126825cf1a30Sjl INCRSTAT(Etm_stats.error_read.fmds_value.ui64); 126925cf1a30Sjl return (FMD_SEND_RETRY); 127025cf1a30Sjl } 127125cf1a30Sjl 127225cf1a30Sjl hdrstat = etm_check_hdr(hdl, mp, buf); 127325cf1a30Sjl FREE_BUF(hdl, buf, buflen); 127425cf1a30Sjl 127525cf1a30Sjl if (hdrstat == ETM_HDR_ACK) { 127625cf1a30Sjl INCRSTAT(Etm_stats.read_ack.fmds_value.ui64); 127725cf1a30Sjl } else { 127825cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 127925cf1a30Sjl 128025cf1a30Sjl (void) etm_xport_close(hdl, mp->epm_oconn); 128125cf1a30Sjl mp->epm_oconn = NULL; 128225cf1a30Sjl 128325cf1a30Sjl if (hdrstat == ETM_HDR_NAK) { 128425cf1a30Sjl /* Peer received a bad value in the header */ 128525cf1a30Sjl if (mp->epm_xprthdl != NULL) { 128625cf1a30Sjl mp->epm_cstat = C_LIMBO; 128725cf1a30Sjl fmd_xprt_suspend(hdl, xprthdl); 128825cf1a30Sjl mp->epm_qstat = Q_SUSPENDED; 128925cf1a30Sjl fmd_hdl_debug(hdl, "received NAK, queue " 129025cf1a30Sjl "suspended for %s", mp->epm_ep_str); 129125cf1a30Sjl } 129225cf1a30Sjl 129325cf1a30Sjl rv = FMD_SEND_RETRY; 129425cf1a30Sjl 129525cf1a30Sjl } else if (hdrstat == ETM_HDR_S_RESTART) { 129625cf1a30Sjl /* Server has restarted */ 1297154b1f02Sjrutt mp->epm_cstat = C_CLOSED; 1298154b1f02Sjrutt mp->epm_qstat = Q_UNINITIALIZED; 1299154b1f02Sjrutt fmd_hdl_debug(hdl, "server %s restarted", 1300154b1f02Sjrutt mp->epm_ep_str); 1301154b1f02Sjrutt /* 1302154b1f02Sjrutt * Cannot call fmd_xprt_close here, so we'll do it 1303154b1f02Sjrutt * on the timeout thread. 
1304154b1f02Sjrutt */ 1305154b1f02Sjrutt if (mp->epm_timer_in_use == 0) { 1306154b1f02Sjrutt mp->epm_timer_id = fmd_timer_install( 1307154b1f02Sjrutt hdl, mp, NULL, 0); 1308154b1f02Sjrutt mp->epm_timer_in_use = 1; 130925cf1a30Sjl } 131025cf1a30Sjl 131125cf1a30Sjl /* 131225cf1a30Sjl * fault.* or list.* events will be replayed if a 131325cf1a30Sjl * transport is opened with the same auth. 131425cf1a30Sjl * Other events will be discarded. 131525cf1a30Sjl */ 131625cf1a30Sjl rv = FMD_SEND_FAILED; 131725cf1a30Sjl 131825cf1a30Sjl } else { 131925cf1a30Sjl mp->epm_cstat = C_CLOSED; 132025cf1a30Sjl fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str); 132125cf1a30Sjl 132225cf1a30Sjl rv = FMD_SEND_RETRY; 132325cf1a30Sjl } 132425cf1a30Sjl 132525cf1a30Sjl mp->epm_txbusy--; 132625cf1a30Sjl 132725cf1a30Sjl (void) pthread_cond_broadcast(&mp->epm_tx_cv); 132845736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 132925cf1a30Sjl 133025cf1a30Sjl INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64); 133125cf1a30Sjl 133225cf1a30Sjl return (rv); 133325cf1a30Sjl } 133425cf1a30Sjl 133525cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 133625cf1a30Sjl mp->epm_txbusy--; 133725cf1a30Sjl (void) pthread_cond_broadcast(&mp->epm_tx_cv); 133845736083Sjrutt (void) pthread_mutex_unlock(&mp->epm_lock); 133925cf1a30Sjl 134025cf1a30Sjl return (FMD_SEND_SUCCESS); 134125cf1a30Sjl } 134225cf1a30Sjl 134325cf1a30Sjl /* 134425cf1a30Sjl * FMD fmdo_timeout entry point.. 
134525cf1a30Sjl */ 134625cf1a30Sjl /*ARGSUSED*/ 134725cf1a30Sjl static void 134825cf1a30Sjl etm_timeout(fmd_hdl_t *hdl, id_t id, void *data) 134925cf1a30Sjl { 135025cf1a30Sjl etm_epmap_t *mp = (etm_epmap_t *)data; 135125cf1a30Sjl 135225cf1a30Sjl (void) pthread_mutex_lock(&mp->epm_lock); 135325cf1a30Sjl 135425cf1a30Sjl mp->epm_timer_in_use = 0; 135525cf1a30Sjl 135625cf1a30Sjl if (mp->epm_qstat == Q_UNINITIALIZED) { 135725cf1a30Sjl /* Server has shutdown and we (client) need to reconnect */ 1358154b1f02Sjrutt if (mp->epm_xprthdl != NULL) { 1359154b1f02Sjrutt fmd_xprt_close(hdl, mp->epm_xprthdl); 1360154b1f02Sjrutt fmd_hdl_debug(hdl, "queue closed for %s", 1361154b1f02Sjrutt mp->epm_ep_str); 1362154b1f02Sjrutt mp->epm_xprthdl = NULL; 1363154b1f02Sjrutt /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 1364154b1f02Sjrutt mp->epm_ep_nvl = NULL; 1365154b1f02Sjrutt } 1366154b1f02Sjrutt 136725cf1a30Sjl if (mp->epm_ep_nvl == NULL) 136825cf1a30Sjl (void) etm_get_ep_nvl(hdl, mp); 136925cf1a30Sjl 137025cf1a30Sjl if (etm_handle_startup(hdl, mp)) { 1371154b1f02Sjrutt if (mp->epm_oconn != NULL) { 1372154b1f02Sjrutt (void) etm_xport_close(hdl, mp->epm_oconn); 1373154b1f02Sjrutt mp->epm_oconn = NULL; 1374154b1f02Sjrutt } 1375154b1f02Sjrutt mp->epm_cstat = C_UNINITIALIZED; 137625cf1a30Sjl mp->epm_qstat = Q_UNINITIALIZED; 137725cf1a30Sjl mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL, 137825cf1a30Sjl Reconn_interval); 137925cf1a30Sjl mp->epm_timer_in_use = 1; 138025cf1a30Sjl } 138125cf1a30Sjl } else { 138225cf1a30Sjl etm_reconnect(hdl, mp); 138325cf1a30Sjl } 138425cf1a30Sjl 138525cf1a30Sjl (void) pthread_mutex_unlock(&mp->epm_lock); 138625cf1a30Sjl } 138725cf1a30Sjl 138825cf1a30Sjl /* 138925cf1a30Sjl * FMD Module declarations 139025cf1a30Sjl */ 139125cf1a30Sjl static const fmd_hdl_ops_t etm_ops = { 139225cf1a30Sjl NULL, /* fmdo_recv */ 139325cf1a30Sjl etm_timeout, /* fmdo_timeout */ 139425cf1a30Sjl NULL, /* fmdo_close */ 139525cf1a30Sjl NULL, /* fmdo_stats */ 139625cf1a30Sjl NULL, 
/* fmdo_gc */ 139725cf1a30Sjl etm_send, /* fmdo_send */ 139825cf1a30Sjl }; 139925cf1a30Sjl 140025cf1a30Sjl static const fmd_prop_t etm_props[] = { 140125cf1a30Sjl { "client_list", FMD_TYPE_STRING, NULL }, 140225cf1a30Sjl { "server_list", FMD_TYPE_STRING, NULL }, 140325cf1a30Sjl { "reconnect_interval", FMD_TYPE_UINT64, "10000000000" }, 1404*77a7fd96Sjrutt { "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" }, 1405*77a7fd96Sjrutt { "rw_timeout", FMD_TYPE_UINT64, "2000000000" }, 1406*77a7fd96Sjrutt { "filter_path", FMD_TYPE_STRING, NULL }, 140725cf1a30Sjl { NULL, 0, NULL } 140825cf1a30Sjl }; 140925cf1a30Sjl 141025cf1a30Sjl static const fmd_hdl_info_t etm_info = { 141125cf1a30Sjl "Event Transport Module", "2.0", &etm_ops, etm_props 141225cf1a30Sjl }; 141325cf1a30Sjl 141425cf1a30Sjl /* 141525cf1a30Sjl * Initialize the transport for use by ETM. 141625cf1a30Sjl */ 141725cf1a30Sjl void 141825cf1a30Sjl _fmd_init(fmd_hdl_t *hdl) 141925cf1a30Sjl { 142025cf1a30Sjl char *propstr; 142125cf1a30Sjl 142225cf1a30Sjl if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) { 142325cf1a30Sjl return; /* invalid data in configuration file */ 142425cf1a30Sjl } 142525cf1a30Sjl 142625cf1a30Sjl /* Create global stats */ 142725cf1a30Sjl (void) fmd_stat_create(hdl, FMD_STAT_NOALLOC, 142825cf1a30Sjl sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats); 142925cf1a30Sjl 143025cf1a30Sjl /* Get module properties */ 143125cf1a30Sjl Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout"); 143225cf1a30Sjl Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval"); 143325cf1a30Sjl Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout"); 143425cf1a30Sjl 143525cf1a30Sjl propstr = fmd_prop_get_string(hdl, "client_list"); 143625cf1a30Sjl etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS); 143725cf1a30Sjl fmd_prop_free_string(hdl, propstr); 143825cf1a30Sjl 143925cf1a30Sjl propstr = fmd_prop_get_string(hdl, "server_list"); 144025cf1a30Sjl etm_create_epmaps(hdl, propstr, 
ETM_CLIENT_XPRT_FLAGS); 144125cf1a30Sjl fmd_prop_free_string(hdl, propstr); 144225cf1a30Sjl 144325cf1a30Sjl if (Etm_stats.peer_count.fmds_value.ui64 == 0) { 144425cf1a30Sjl fmd_hdl_debug(hdl, "Failed to init any endpoint\n"); 144525cf1a30Sjl fmd_hdl_unregister(hdl); 144625cf1a30Sjl return; 144725cf1a30Sjl } 144825cf1a30Sjl } 144925cf1a30Sjl 145025cf1a30Sjl /* 145125cf1a30Sjl * Teardown the transport 145225cf1a30Sjl */ 145325cf1a30Sjl void 145425cf1a30Sjl _fmd_fini(fmd_hdl_t *hdl) 145525cf1a30Sjl { 145625cf1a30Sjl etm_epmap_t *mp, *next; 145725cf1a30Sjl 145825cf1a30Sjl (void) pthread_mutex_lock(&Etm_mod_lock); 145925cf1a30Sjl Etm_exit = 1; 146025cf1a30Sjl (void) pthread_mutex_unlock(&Etm_mod_lock); 146125cf1a30Sjl 146225cf1a30Sjl mp = Epmap_head; 146325cf1a30Sjl 146425cf1a30Sjl while (mp) { 146525cf1a30Sjl next = mp->epm_next; 146625cf1a30Sjl etm_free_epmap(hdl, mp); 146725cf1a30Sjl mp = next; 146825cf1a30Sjl } 146925cf1a30Sjl 147025cf1a30Sjl fmd_hdl_unregister(hdl); 147125cf1a30Sjl } 1472