xref: /illumos-gate/usr/src/cmd/fm/modules/common/event-transport/etm.c (revision 7a6d80f1660abd4755c68cbd094d4a914681d26e)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * FMA Event Transport Module
29  *
30  * Plugin for sending/receiving FMA events to/from a remote endpoint.
31  */
32 
33 #include <netinet/in.h>
34 #include <errno.h>
35 #include <sys/fm/protocol.h>
36 #include <sys/sysmacros.h>
37 #include <pthread.h>
38 #include <strings.h>
39 #include <ctype.h>
40 #include <link.h>
41 #include <libnvpair.h>
42 #include "etm_xport_api.h"
43 #include "etm_proto.h"
44 
45 /*
46  * ETM declarations
47  */
48 
/*
 * State of the connection to a peer, as tracked in epm_cstat.
 */
enum etm_connection_status {
	C_UNINITIALIZED = 0,		/* No connection established (yet) */
	C_OPEN = 1,			/* Connection is open */
	C_CLOSED = 2,			/* Connection is closed */
	C_LIMBO = 3,			/* Bad value in header from peer */
	C_TIMED_OUT = 4			/* Reconnection to peer timed out */
};

typedef enum etm_connection_status etm_connstat_t;
56 
/*
 * State of the fmd transport queue for a peer, as tracked in epm_qstat.
 * Values start at 100 so they never collide with etm_connstat_t values.
 */
enum etm_fmd_queue_status {
	Q_UNINITIALIZED = 100,		/* Queue has not been set up */
	Q_INIT_PENDING = 101,		/* Queue initialization in progress */
	Q_OPEN = 102,			/* Queue is open */
	Q_SUSPENDED = 103		/* Queue is suspended */
};

typedef enum etm_fmd_queue_status etm_qstat_t;
63 
64 /* Per endpoint data */
65 typedef struct etm_endpoint_map {
66 	uint8_t epm_ver;		/* Protocol version being used */
67 	char *epm_ep_str;		/* Endpoint ID string */
68 	int epm_xprtflags;		/* FMD transport open flags */
69 	etm_xport_hdl_t epm_tlhdl;	/* Transport Layer instance handle */
70 	pthread_mutex_t epm_lock;	/* Protects remainder of struct */
71 	pthread_cond_t epm_tx_cv;	/* Cond var for send/transmit */
72 	int epm_txbusy;			/* Busy doing send/transmit */
73 	fmd_xprt_t *epm_xprthdl;	/* FMD transport handle */
74 	etm_qstat_t epm_qstat;		/* Status of fmd xprt queue */
75 	nvlist_t *epm_ep_nvl;		/* Endpoint ID nv_list */
76 	etm_xport_conn_t epm_oconn;	/* Connection for outgoing events */
77 	etm_connstat_t epm_cstat;	/* Status of connection */
78 	id_t epm_timer_id;		/* Timer id */
79 	int epm_timer_in_use;		/* Indicates if timer is in use */
80 	hrtime_t epm_reconn_end;	/* Reconnection end time */
81 	struct etm_endpoint_map *epm_next;
82 } etm_epmap_t;
83 
/*
 * Pseudo header types, numbered above ETM_HDR_TYPE_TOO_HIGH so they cannot
 * collide with a real header type.  etm_check_hdr() returns these to report
 * a malformed header.
 */
#define	ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1)
#define	ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2)
#define	ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3)
#define	ETM_EP_INST_MAX 4		/* Max chars in endpt instance */
#define	ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR
#define	ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT)

/*
 * Buffer helpers.  Both expand to a single expression with no trailing
 * semicolon, so "ALLOC_BUF(...);" is exactly one statement and is safe as
 * the body of an unbraced if/else.
 */
#define	ALLOC_BUF(hdl, buf, size) \
	((buf) = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP))

#define	FREE_BUF(hdl, buf, size) \
	fmd_hdl_free((hdl), (buf), (size))

/* Evaluates to 1 when the endpoint was opened without FMD_XPRT_ACCEPT */
#define	IS_CLIENT(mp)	(((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1)

/*
 * Statistic updates, serialized by Etm_mod_lock.  Wrapped in
 * do { } while (0) so that each invocation plus its trailing semicolon
 * forms a single statement.
 */
#define	INCRSTAT(x)	do {						    \
				(void) pthread_mutex_lock(&Etm_mod_lock);   \
				(x)++;					    \
				(void) pthread_mutex_unlock(&Etm_mod_lock); \
			} while (0)

#define	DECRSTAT(x)	do {						    \
				(void) pthread_mutex_lock(&Etm_mod_lock);   \
				(x)--;					    \
				(void) pthread_mutex_unlock(&Etm_mod_lock); \
			} while (0)

#define	ADDSTAT(x, y)	do {						    \
				(void) pthread_mutex_lock(&Etm_mod_lock);   \
				(x) += (y);				    \
				(void) pthread_mutex_unlock(&Etm_mod_lock); \
			} while (0)
112 
113 /*
114  * Global variables
115  */
116 static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER;
117 					/* Protects globals */
118 static hrtime_t Reconn_interval;	/* Time between reconnection attempts */
119 static hrtime_t Reconn_timeout;		/* Time allowed for reconnection */
120 static hrtime_t Rw_timeout;		/* Time allowed for I/O operation  */
121 static int Etm_dump = 0;		/* Enables hex dump for debug */
122 static int Etm_exit = 0;		/* Flag for exit */
123 static etm_epmap_t *Epmap_head = NULL;	/* Head of list of epmap structs */
124 
125 /* Module statistics */
126 static struct etm_stats {
127 	/* read counters */
128 	fmd_stat_t read_ack;
129 	fmd_stat_t read_bytes;
130 	fmd_stat_t read_msg;
131 	fmd_stat_t post_filter;
132 	/* write counters */
133 	fmd_stat_t write_ack;
134 	fmd_stat_t write_bytes;
135 	fmd_stat_t write_msg;
136 	fmd_stat_t send_filter;
137 	/* error counters */
138 	fmd_stat_t error_protocol;
139 	fmd_stat_t error_drop_read;
140 	fmd_stat_t error_read;
141 	fmd_stat_t error_read_badhdr;
142 	fmd_stat_t error_write;
143 	fmd_stat_t error_send_filter;
144 	fmd_stat_t error_post_filter;
145 	/* misc */
146 	fmd_stat_t peer_count;
147 
148 } Etm_stats = {
149 	/* read counters */
150 	{ "read_ack", FMD_TYPE_UINT64, "ACKs read" },
151 	{ "read_bytes", FMD_TYPE_UINT64, "Bytes read" },
152 	{ "read_msg", FMD_TYPE_UINT64, "Messages read" },
153 	{ "post_filter", FMD_TYPE_UINT64, "Drops by post_filter" },
154 	/* write counters */
155 	{ "write_ack", FMD_TYPE_UINT64, "ACKs sent" },
156 	{ "write_bytes", FMD_TYPE_UINT64, "Bytes sent" },
157 	{ "write_msg", FMD_TYPE_UINT64, "Messages sent" },
158 	{ "send_filter", FMD_TYPE_UINT64, "Drops by send_filter" },
159 	/* ETM error counters */
160 	{ "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" },
161 	{ "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" },
162 	{ "error_read", FMD_TYPE_UINT64, "Read I/O errors" },
163 	{ "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" },
164 	{ "error_write", FMD_TYPE_UINT64, "Write I/O errors" },
165 	{ "error_send_filter", FMD_TYPE_UINT64, "Send filter errors" },
166 	{ "error_post_filter", FMD_TYPE_UINT64, "Post filter errors" },
167 	/* ETM Misc */
168 	{ "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" },
169 };
170 
171 /*
172  * ETM Private functions
173  */
174 
175 /*
176  * Hex dump for debug.
177  */
178 static void
179 etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction)
180 {
181 	int i, j, k;
182 	int16_t *c;
183 
184 	if (Etm_dump == 0)
185 		return;
186 
187 	j = buflen / 16;	/* Number of complete 8-column rows */
188 	k = buflen % 16;	/* Is there a last (non-8-column) row? */
189 
190 	if (direction)
191 		fmd_hdl_debug(hdl, "--- WRITE Message Dump ---");
192 	else
193 		fmd_hdl_debug(hdl, "---  READ Message Dump ---");
194 
195 	fmd_hdl_debug(hdl, "   Displaying %d bytes", buflen);
196 
197 	/* Dump the complete 8-column rows */
198 	for (i = 0; i < j; i++) {
199 		c = (int16_t *)buf + (i * 8);
200 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x %4x %4x", i,
201 		    *(c+0), *(c+1), *(c+2), *(c+3),
202 		    *(c+4), *(c+5), *(c+6), *(c+7));
203 	}
204 
205 	/* Dump the last (incomplete) row */
206 	c = (int16_t *)buf + (i * 8);
207 	switch (k) {
208 	case 4:
209 		fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1));
210 		break;
211 	case 8:
212 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1),
213 		    *(c+2), *(c+3));
214 		break;
215 	case 12:
216 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x", i, *(c+0),
217 		    *(c+1), *(c+2), *(c+3), *(c+4), *(c+5));
218 		break;
219 	}
220 
221 	fmd_hdl_debug(hdl, "---      End Dump      ---");
222 }
223 
224 /*
225  * Provide the length of a message based on the data in the given ETM header.
226  */
227 static size_t
228 etm_get_msglen(void *buf)
229 {
230 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
231 
232 	return (ntohl(hp->hdr_msglen));
233 }
234 
235 /*
236  * Check the contents of the ETM header for errors.
237  * Return the header type (hdr_type).
238  */
239 static int
240 etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf)
241 {
242 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
243 
244 	if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) {
245 		fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s "
246 		    ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim);
247 		return (ETM_HDR_INVALID);
248 	}
249 
250 	if ((hp->hdr_type == ETM_HDR_C_HELLO) ||
251 	    (hp->hdr_type == ETM_HDR_S_HELLO)) {
252 		/* Until version is negotiated, other fields may be wrong */
253 		return (hp->hdr_type);
254 	}
255 
256 	if (hp->hdr_ver != mp->epm_ver) {
257 		fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n",
258 		    mp->epm_ep_str, hp->hdr_ver);
259 		return (ETM_HDR_BADVERSION);
260 	}
261 
262 	if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) ||
263 	    (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) {
264 		fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n",
265 		    mp->epm_ep_str, hp->hdr_type);
266 		return (ETM_HDR_BADTYPE);
267 	}
268 
269 	return (hp->hdr_type);
270 }
271 
272 /*
273  * Create an ETM header of a given type in the given buffer.
274  * Return length of header.
275  */
276 static size_t
277 etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen)
278 {
279 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
280 
281 	bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN);
282 	hp->hdr_ver = ver;
283 	hp->hdr_type = type;
284 	hp->hdr_msglen = htonl(msglen);
285 
286 	return (ETM_HDRLEN);
287 }
288 
289 /*
290  * Convert message bytes to nvlist and post to fmd.
291  * Return zero for success, non-zero for failure.
292  *
293  * Note : nvl is free'd by fmd.
294  */
295 static int
296 etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen)
297 {
298 	nvlist_t *nvl;
299 	int rv;
300 
301 	if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) {
302 		fmd_hdl_error(hdl, "failed to unpack message");
303 		return (1);
304 	}
305 
306 	rv = etm_xport_post_filter(hdl, nvl, mp->epm_ep_str);
307 	if (rv == ETM_XPORT_FILTER_DROP) {
308 		fmd_hdl_debug(hdl, "post_filter dropped event");
309 		INCRSTAT(Etm_stats.post_filter.fmds_value.ui64);
310 		nvlist_free(nvl);
311 		return (0);
312 	} else if (rv == ETM_XPORT_FILTER_ERROR) {
313 		fmd_hdl_debug(hdl, "post_filter error : %s", strerror(errno));
314 		INCRSTAT(Etm_stats.error_post_filter.fmds_value.ui64);
315 		/* Still post event */
316 	}
317 
318 	(void) pthread_mutex_lock(&mp->epm_lock);
319 	(void) pthread_mutex_lock(&Etm_mod_lock);
320 	if (!Etm_exit) {
321 		(void) pthread_mutex_unlock(&Etm_mod_lock);
322 		if (mp->epm_qstat == Q_OPEN) {
323 			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
324 			rv = 0;
325 		} else if (mp->epm_qstat == Q_SUSPENDED) {
326 			fmd_xprt_resume(hdl, mp->epm_xprthdl);
327 			if (mp->epm_timer_in_use) {
328 				fmd_timer_remove(hdl, mp->epm_timer_id);
329 				mp->epm_timer_in_use = 0;
330 			}
331 			mp->epm_qstat = Q_OPEN;
332 			fmd_hdl_debug(hdl, "queue resumed for %s",
333 			    mp->epm_ep_str);
334 			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
335 			rv = 0;
336 		} else {
337 			fmd_hdl_debug(hdl, "unable to post message, qstat = %d",
338 			    mp->epm_qstat);
339 			nvlist_free(nvl);
340 			/* Remote peer will attempt to resend event */
341 			rv = 2;
342 		}
343 	} else {
344 		(void) pthread_mutex_unlock(&Etm_mod_lock);
345 		fmd_hdl_debug(hdl, "unable to post message, module exiting");
346 		nvlist_free(nvl);
347 		/* Remote peer will attempt to resend event */
348 		rv = 3;
349 	}
350 
351 	(void) pthread_mutex_unlock(&mp->epm_lock);
352 
353 	return (rv);
354 }
355 
356 /*
357  * Handle the startup handshake to the server.  The client always initiates
358  * the startup handshake.  In the following sequence, we are the client and
359  * the remote endpoint is the server.
360  *
361  *	Client sends C_HELLO and transitions to Q_INIT_PENDING state.
362  *	Server sends S_HELLO and transitions to Q_INIT_PENDING state.
363  *	Client sends ACK and transitions to Q_OPEN state.
364  *	Server receives ACK and transitions to Q_OPEN state.
365  *
366  * Return 0 for success, nonzero for failure.
367  */
368 static int
369 etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp)
370 {
371 	etm_proto_hdr_t *hp;
372 	size_t hdrlen = ETM_HDRLEN;
373 	int hdrstat;
374 	char hbuf[ETM_HDRLEN];
375 
376 	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
377 		return (1);
378 
379 	mp->epm_cstat = C_OPEN;
380 
381 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0);
382 
383 	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
384 	    hdrlen)) != hdrlen) {
385 		fmd_hdl_error(hdl, "Failed to write C_HELLO to %s",
386 		    mp->epm_ep_str);
387 		return (2);
388 	}
389 
390 	mp->epm_qstat = Q_INIT_PENDING;
391 
392 	if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf,
393 	    hdrlen)) != hdrlen) {
394 		fmd_hdl_error(hdl, "Failed to read S_HELLO from %s",
395 		    mp->epm_ep_str);
396 		return (3);
397 	}
398 
399 	hdrstat = etm_check_hdr(hdl, mp, hbuf);
400 
401 	if (hdrstat != ETM_HDR_S_HELLO) {
402 		fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO "
403 		    "from %s", mp->epm_ep_str);
404 		return (4);
405 	}
406 
407 	/*
408 	 * Get version from the server.
409 	 * Currently, only one version is supported.
410 	 */
411 	hp = (etm_proto_hdr_t *)(void *)hbuf;
412 	if (hp->hdr_ver != ETM_PROTO_V1) {
413 		fmd_hdl_error(hdl, "Unable to use same version as %s : %d",
414 		    mp->epm_ep_str, hp->hdr_ver);
415 		return (5);
416 	}
417 	mp->epm_ver = hp->hdr_ver;
418 
419 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
420 
421 	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
422 	    hdrlen)) != hdrlen) {
423 		fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s",
424 		    mp->epm_ep_str);
425 		return (6);
426 	}
427 
428 	/*
429 	 * Call fmd_xprt_open and fmd_xprt_setspecific with
430 	 * Etm_mod_lock held to avoid race with etm_send thread.
431 	 */
432 	(void) pthread_mutex_lock(&Etm_mod_lock);
433 	if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags,
434 	    mp->epm_ep_nvl, NULL)) == NULL) {
435 		fmd_hdl_abort(hdl, "Failed to init xprthdl for %s",
436 		    mp->epm_ep_str);
437 	}
438 	fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
439 	(void) pthread_mutex_unlock(&Etm_mod_lock);
440 
441 	mp->epm_qstat = Q_OPEN;
442 	fmd_hdl_debug(hdl, "queue open for %s",  mp->epm_ep_str);
443 
444 	return (0);
445 }
446 
447 /*
448  * Open a connection to the peer, send a SHUTDOWN message,
449  * and close the connection.
450  */
451 static void
452 etm_send_shutdown(fmd_hdl_t *hdl, etm_epmap_t *mp)
453 {
454 	size_t hdrlen = ETM_HDRLEN;
455 	char hbuf[ETM_HDRLEN];
456 
457 	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
458 		return;
459 
460 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_SHUTDOWN, 0);
461 
462 	(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, hdrlen);
463 
464 	(void) etm_xport_close(hdl, mp->epm_oconn);
465 	mp->epm_oconn = NULL;
466 }
467 
468 /*
469  * Alloc a nvlist and add a string for the endpoint.
470  * Return zero for success, non-zero for failure.
471  */
472 static int
473 etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
474 {
475 	/*
476 	 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation
477 	 * in fmd when this nvlist_t is free'd.
478 	 */
479 	(void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0);
480 
481 	if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) {
482 		fmd_hdl_error(hdl, "failed to add domain-id string to nvlist "
483 		    "for %s", mp->epm_ep_str);
484 		nvlist_free(mp->epm_ep_nvl);
485 		return (1);
486 	}
487 
488 	return (0);
489 }
490 
491 /*
492  * Free the nvlist for the endpoint_id string.
493  */
494 /*ARGSUSED*/
495 static void
496 etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
497 {
498 	nvlist_free(mp->epm_ep_nvl);
499 }
500 
501 /*
502  * Check for a duplicate endpoint/peer string.
503  */
504 /*ARGSUSED*/
505 static int
506 etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname)
507 {
508 	etm_epmap_t *mp;
509 
510 	for (mp = Epmap_head; mp != NULL; mp = mp->epm_next)
511 		if (strcmp(epname, mp->epm_ep_str) == 0)
512 			return (1);
513 
514 	return (0);
515 }
516 
517 /*
518  * Attempt to re-open a connection with the remote endpoint.
519  */
520 static void
521 etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
522 {
523 	if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) {
524 		if (gethrtime() < mp->epm_reconn_end) {
525 			if ((mp->epm_oconn = etm_xport_open(hdl,
526 			    mp->epm_tlhdl)) == NULL) {
527 				fmd_hdl_debug(hdl, "reconnect failed for %s",
528 				    mp->epm_ep_str);
529 				mp->epm_timer_id = fmd_timer_install(hdl, mp,
530 				    NULL, Reconn_interval);
531 				mp->epm_timer_in_use = 1;
532 			} else {
533 				fmd_hdl_debug(hdl, "reconnect success for %s",
534 				    mp->epm_ep_str);
535 				mp->epm_reconn_end = 0;
536 				mp->epm_cstat = C_OPEN;
537 			}
538 		} else {
539 			fmd_hdl_error(hdl, "Reconnect timed out for %s\n",
540 			    mp->epm_ep_str);
541 			mp->epm_reconn_end = 0;
542 			mp->epm_cstat = C_TIMED_OUT;
543 		}
544 	}
545 
546 	if (mp->epm_cstat == C_OPEN) {
547 		fmd_xprt_resume(hdl, mp->epm_xprthdl);
548 		mp->epm_qstat = Q_OPEN;
549 		fmd_hdl_debug(hdl, "queue resumed for %s",  mp->epm_ep_str);
550 	}
551 }
552 
553 /*
554  * Suspend a given connection and setup for reconnection retries.
555  * Assume caller holds lock on epm_lock.
556  */
557 static void
558 etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
559 {
560 	(void) pthread_mutex_lock(&Etm_mod_lock);
561 	if (Etm_exit) {
562 		(void) pthread_mutex_unlock(&Etm_mod_lock);
563 		return;
564 	}
565 	(void) pthread_mutex_unlock(&Etm_mod_lock);
566 
567 	if (mp->epm_oconn != NULL) {
568 		(void) etm_xport_close(hdl, mp->epm_oconn);
569 		mp->epm_oconn = NULL;
570 	}
571 
572 	mp->epm_reconn_end = gethrtime() + Reconn_timeout;
573 	mp->epm_cstat = C_UNINITIALIZED;
574 
575 	if (mp->epm_xprthdl != NULL) {
576 		fmd_xprt_suspend(hdl, mp->epm_xprthdl);
577 		mp->epm_qstat = Q_SUSPENDED;
578 		fmd_hdl_debug(hdl, "queue suspended for %s",  mp->epm_ep_str);
579 
580 		if (mp->epm_timer_in_use == 0) {
581 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
582 			    Reconn_interval);
583 			mp->epm_timer_in_use = 1;
584 		}
585 	}
586 }
587 
588 /*
589  * Reinitialize the connection. The old fmd_xprt_t handle must be
590  * removed/closed first.
591  * Assume caller holds lock on epm_lock.
592  */
593 static void
594 etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp)
595 {
596 	/*
597 	 * To avoid a deadlock, wait for etm_send to finish before
598 	 * calling fmd_xprt_close()
599 	 */
600 	while (mp->epm_txbusy)
601 		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
602 
603 	if (mp->epm_xprthdl != NULL) {
604 		fmd_xprt_close(hdl, mp->epm_xprthdl);
605 		fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str);
606 		mp->epm_xprthdl = NULL;
607 		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
608 		mp->epm_ep_nvl = NULL;
609 	}
610 
611 	if (mp->epm_timer_in_use) {
612 		fmd_timer_remove(hdl, mp->epm_timer_id);
613 		mp->epm_timer_in_use = 0;
614 	}
615 
616 	if (mp->epm_oconn != NULL) {
617 		(void) etm_xport_close(hdl, mp->epm_oconn);
618 		mp->epm_oconn = NULL;
619 	}
620 
621 	mp->epm_cstat = C_UNINITIALIZED;
622 	mp->epm_qstat = Q_UNINITIALIZED;
623 }
624 
625 /*
626  * Receive data from ETM transport layer.
627  * Note : This is not the fmdo_recv entry point.
628  *
629  */
630 static int
631 etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp)
632 {
633 	size_t buflen, hdrlen;
634 	void *buf;
635 	char hbuf[ETM_HDRLEN];
636 	int hdrstat, rv;
637 
638 	hdrlen = ETM_HDRLEN;
639 
640 	if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) {
641 		fmd_hdl_debug(hdl, "failed to read header from %s",
642 		    mp->epm_ep_str);
643 		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
644 		return (EIO);
645 	}
646 
647 	hdrstat = etm_check_hdr(hdl, mp, hbuf);
648 
649 	switch (hdrstat) {
650 	case ETM_HDR_INVALID:
651 		(void) pthread_mutex_lock(&mp->epm_lock);
652 		if (mp->epm_cstat == C_OPEN)
653 			mp->epm_cstat = C_CLOSED;
654 		(void) pthread_mutex_unlock(&mp->epm_lock);
655 
656 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
657 		rv = ECANCELED;
658 		break;
659 
660 	case ETM_HDR_BADTYPE:
661 	case ETM_HDR_BADVERSION:
662 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0);
663 
664 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
665 		    hdrlen)) != hdrlen) {
666 			fmd_hdl_debug(hdl, "failed to write NAK to %s",
667 			    mp->epm_ep_str);
668 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
669 			return (EIO);
670 		}
671 
672 		(void) pthread_mutex_lock(&mp->epm_lock);
673 		mp->epm_cstat = C_LIMBO;
674 		(void) pthread_mutex_unlock(&mp->epm_lock);
675 
676 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
677 		rv = ENOTSUP;
678 		break;
679 
680 	case ETM_HDR_C_HELLO:
681 		/* Client is initiating a startup handshake */
682 		(void) pthread_mutex_lock(&mp->epm_lock);
683 		etm_reinit(hdl, mp);
684 		mp->epm_qstat = Q_INIT_PENDING;
685 		(void) pthread_mutex_unlock(&mp->epm_lock);
686 
687 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0);
688 
689 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
690 		    hdrlen)) != hdrlen) {
691 			fmd_hdl_debug(hdl, "failed to write S_HELLO to %s",
692 			    mp->epm_ep_str);
693 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
694 			return (EIO);
695 		}
696 
697 		rv = 0;
698 		break;
699 
700 	case ETM_HDR_ACK:
701 		(void) pthread_mutex_lock(&mp->epm_lock);
702 		if (mp->epm_qstat == Q_INIT_PENDING) {
703 			/* This is client's ACK from startup handshake */
704 			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
705 			if (mp->epm_ep_nvl == NULL)
706 				(void) etm_get_ep_nvl(hdl, mp);
707 
708 			/*
709 			 * Call fmd_xprt_open and fmd_xprt_setspecific with
710 			 * Etm_mod_lock held to avoid race with etm_send thread.
711 			 */
712 			(void) pthread_mutex_lock(&Etm_mod_lock);
713 			if ((mp->epm_xprthdl = fmd_xprt_open(hdl,
714 			    mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) {
715 				fmd_hdl_abort(hdl, "Failed to init xprthdl "
716 				    "for %s", mp->epm_ep_str);
717 			}
718 			fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
719 			(void) pthread_mutex_unlock(&Etm_mod_lock);
720 
721 			mp->epm_qstat = Q_OPEN;
722 			(void) pthread_mutex_unlock(&mp->epm_lock);
723 			fmd_hdl_debug(hdl, "queue open for %s",
724 			    mp->epm_ep_str);
725 		} else {
726 			(void) pthread_mutex_unlock(&mp->epm_lock);
727 			fmd_hdl_debug(hdl, "protocol error, not expecting ACK "
728 			    "from %s\n", mp->epm_ep_str);
729 			INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
730 		}
731 
732 		rv = 0;
733 		break;
734 
735 	case ETM_HDR_SHUTDOWN:
736 		fmd_hdl_debug(hdl, "received shutdown from %s",
737 		    mp->epm_ep_str);
738 
739 		(void) pthread_mutex_lock(&mp->epm_lock);
740 
741 		etm_reinit(hdl, mp);
742 
743 		if (IS_CLIENT(mp)) {
744 			/*
745 			 * A server shutdown is considered to be temporary.
746 			 * Prepare for reconnection.
747 			 */
748 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
749 			    Reconn_interval);
750 
751 			mp->epm_timer_in_use = 1;
752 		}
753 
754 		(void) pthread_mutex_unlock(&mp->epm_lock);
755 
756 		rv = ECANCELED;
757 		break;
758 
759 	case ETM_HDR_MSG:
760 		(void) pthread_mutex_lock(&mp->epm_lock);
761 		if (mp->epm_qstat == Q_UNINITIALIZED) {
762 			/* Peer (client) is unaware that we've restarted */
763 			(void) pthread_mutex_unlock(&mp->epm_lock);
764 			hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
765 			    ETM_HDR_S_RESTART, 0);
766 
767 			if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
768 			    hdrlen)) != hdrlen) {
769 				fmd_hdl_debug(hdl, "failed to write S_RESTART "
770 				    "to %s", mp->epm_ep_str);
771 				INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
772 				return (EIO);
773 			}
774 
775 			return (ECANCELED);
776 		}
777 		(void) pthread_mutex_unlock(&mp->epm_lock);
778 
779 		buflen = etm_get_msglen(hbuf);
780 		ALLOC_BUF(hdl, buf, buflen);
781 
782 		if (etm_xport_read(hdl, conn, Rw_timeout, buf,
783 		    buflen) != buflen) {
784 			fmd_hdl_debug(hdl, "failed to read message from %s",
785 			    mp->epm_ep_str);
786 			FREE_BUF(hdl, buf, buflen);
787 			INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
788 			return (EIO);
789 		}
790 
791 		INCRSTAT(Etm_stats.read_msg.fmds_value.ui64);
792 		ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen);
793 
794 		etm_hex_dump(hdl, buf, buflen, 0);
795 
796 		if (etm_post_msg(hdl, mp, buf, buflen)) {
797 			INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64);
798 			FREE_BUF(hdl, buf, buflen);
799 			return (EIO);
800 		}
801 
802 		FREE_BUF(hdl, buf, buflen);
803 
804 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
805 
806 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
807 		    hdrlen)) != hdrlen) {
808 			fmd_hdl_debug(hdl, "failed to write ACK to %s",
809 			    mp->epm_ep_str);
810 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
811 			return (EIO);
812 		}
813 
814 		INCRSTAT(Etm_stats.write_ack.fmds_value.ui64);
815 
816 		/*
817 		 * If we got this far and the current state of the
818 		 * outbound/sending connection is TIMED_OUT or
819 		 * LIMBO, then we should reinitialize it.
820 		 */
821 		(void) pthread_mutex_lock(&mp->epm_lock);
822 		if (mp->epm_cstat == C_TIMED_OUT ||
823 		    mp->epm_cstat == C_LIMBO) {
824 			if (mp->epm_oconn != NULL) {
825 				(void) etm_xport_close(hdl, mp->epm_oconn);
826 				mp->epm_oconn = NULL;
827 			}
828 			mp->epm_cstat = C_UNINITIALIZED;
829 			fmd_xprt_resume(hdl, mp->epm_xprthdl);
830 			if (mp->epm_timer_in_use) {
831 				fmd_timer_remove(hdl, mp->epm_timer_id);
832 				mp->epm_timer_in_use = 0;
833 			}
834 			mp->epm_qstat = Q_OPEN;
835 			fmd_hdl_debug(hdl, "queue resumed for %s",
836 			    mp->epm_ep_str);
837 		}
838 		(void) pthread_mutex_unlock(&mp->epm_lock);
839 
840 		rv = 0;
841 		break;
842 
843 	default:
844 		fmd_hdl_debug(hdl, "protocol error, unexpected header "
845 		    "from %s : %d", mp->epm_ep_str, hdrstat);
846 		INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
847 		rv = 0;
848 	}
849 
850 	return (rv);
851 }
852 
853 /*
854  * ETM transport layer callback function.
855  * The transport layer calls this function to :
856  *	(a) pass an incoming message (flag == ETM_CBFLAG_RECV)
857  *	(b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT)
858  */
859 static int
860 etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
861     void *arg)
862 {
863 	etm_epmap_t *mp = (etm_epmap_t *)arg;
864 	int rv = 0;
865 
866 	(void) pthread_mutex_lock(&Etm_mod_lock);
867 	if (Etm_exit) {
868 		(void) pthread_mutex_unlock(&Etm_mod_lock);
869 		return (ECANCELED);
870 	}
871 	(void) pthread_mutex_unlock(&Etm_mod_lock);
872 
873 	switch (flag) {
874 	case ETM_CBFLAG_RECV:
875 		rv = etm_recv(hdl, conn, mp);
876 		break;
877 	case ETM_CBFLAG_REINIT:
878 		(void) pthread_mutex_lock(&mp->epm_lock);
879 		etm_reinit(hdl, mp);
880 		etm_send_shutdown(hdl, mp);
881 		(void) pthread_mutex_unlock(&mp->epm_lock);
882 		/*
883 		 * Return ECANCELED so the transport layer will close the
884 		 * server connection.  The transport layer is responsible for
885 		 * reestablishing this connection (should a connection request
886 		 * arrive from the peer).
887 		 */
888 		rv = ECANCELED;
889 		break;
890 	default:
891 		fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag);
892 		rv = ENOTSUP;
893 	}
894 
895 	return (rv);
896 }
897 
898 /*
899  * Allocate and initialize an etm_epmap_t struct for the given endpoint
900  * name string.
901  */
902 static void
903 etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags)
904 {
905 	etm_epmap_t *newmap;
906 
907 	if (etm_check_dup_ep_str(hdl, epname)) {
908 		fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname);
909 		return;
910 	}
911 
912 	newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP);
913 	newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP);
914 	newmap->epm_xprtflags = flags;
915 	newmap->epm_cstat = C_UNINITIALIZED;
916 	newmap->epm_qstat = Q_UNINITIALIZED;
917 	newmap->epm_ver = ETM_PROTO_V1;	/* Currently support one proto ver */
918 	newmap->epm_txbusy = 0;
919 
920 	(void) pthread_mutex_init(&newmap->epm_lock, NULL);
921 	(void) pthread_cond_init(&newmap->epm_tx_cv, NULL);
922 
923 	if (etm_get_ep_nvl(hdl, newmap)) {
924 		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
925 		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
926 		return;
927 	}
928 
929 	(void) pthread_mutex_lock(&newmap->epm_lock);
930 
931 	if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str,
932 	    etm_cb_func, newmap)) == NULL) {
933 		fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n",
934 		    newmap->epm_ep_str);
935 		etm_free_ep_nvl(hdl, newmap);
936 		(void) pthread_mutex_unlock(&newmap->epm_lock);
937 		(void) pthread_mutex_destroy(&newmap->epm_lock);
938 		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
939 		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
940 		return;
941 	}
942 
943 	if (IS_CLIENT(newmap)) {
944 		if (etm_handle_startup(hdl, newmap)) {
945 			/*
946 			 * For whatever reason, we could not complete the
947 			 * startup handshake with the server.  Set the timer
948 			 * and try again.
949 			 */
950 			if (newmap->epm_oconn != NULL) {
951 				(void) etm_xport_close(hdl, newmap->epm_oconn);
952 				newmap->epm_oconn = NULL;
953 			}
954 			newmap->epm_cstat = C_UNINITIALIZED;
955 			newmap->epm_qstat = Q_UNINITIALIZED;
956 			newmap->epm_timer_id = fmd_timer_install(hdl, newmap,
957 			    NULL, Reconn_interval);
958 			newmap->epm_timer_in_use = 1;
959 		}
960 	} else {
961 		/*
962 		 * We may be restarting after a crash.  If so, the client
963 		 * may be unaware of this.
964 		 */
965 		etm_send_shutdown(hdl, newmap);
966 	}
967 
968 	/* Add this transport instance handle to the list */
969 	newmap->epm_next = Epmap_head;
970 	Epmap_head = newmap;
971 
972 	(void) pthread_mutex_unlock(&newmap->epm_lock);
973 
974 	INCRSTAT(Etm_stats.peer_count.fmds_value.ui64);
975 }
976 
977 /*
978  * Parse the given property list string and call etm_init_epmap
979  * for each endpoint.
980  */
981 static void
982 etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags)
983 {
984 	char *epstr, *ep, *prefix, *lasts, *numstr;
985 	char epname[MAXPATHLEN];
986 	size_t slen, nlen;
987 	int beg, end, i;
988 
989 	if (eplist == NULL)
990 		return;
991 	/*
992 	 * Create a copy of eplist for parsing.
993 	 * strtok/strtok_r(3C) will insert null chars to the string.
994 	 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used.
995 	 */
996 	slen = strlen(eplist);
997 	epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP);
998 	(void) strcpy(epstr, eplist);
999 
1000 	/*
1001 	 * The following are supported for the "client_list" and
1002 	 * "server_list" properties :
1003 	 *
1004 	 *    A space-separated list of endpoints.
1005 	 *	"dev:///dom0 dev:///dom1 dev:///dom2"
1006 	 *
1007 	 *    An array syntax for a range of instances.
1008 	 *	"dev:///dom[0:2]"
1009 	 *
1010 	 *    A combination of both.
1011 	 *	"dev:///dom0 dev:///dom[1:2]"
1012 	 */
1013 	ep = strtok_r(epstr, " ", &lasts);
1014 	while (ep != NULL) {
1015 		if (strchr(ep, '[') != NULL) {
1016 			/*
1017 			 * This string is using array syntax.
1018 			 * Check the string for correct syntax.
1019 			 */
1020 			if ((strchr(ep, ':') == NULL) ||
1021 			    (strchr(ep, ']') == NULL)) {
1022 				fmd_hdl_error(hdl, "Syntax error in property "
1023 				    "that includes : %s\n", ep);
1024 				ep = strtok_r(NULL, " ", &lasts);
1025 				continue;
1026 			}
1027 
1028 			/* expand the array syntax */
1029 			prefix = strtok(ep, "[");
1030 
1031 			numstr = strtok(NULL, ":");
1032 			if ((numstr == NULL) || (!isdigit(*numstr))) {
1033 				fmd_hdl_error(hdl, "Syntax error in property "
1034 				    "that includes : %s[\n", prefix);
1035 				ep = strtok_r(NULL, " ", &lasts);
1036 				continue;
1037 			}
1038 			beg = atoi(numstr);
1039 
1040 			numstr = strtok(NULL, "]");
1041 			if ((numstr == NULL) || (!isdigit(*numstr))) {
1042 				fmd_hdl_error(hdl, "Syntax error in property "
1043 				    "that includes : %s[\n", prefix);
1044 				ep = strtok_r(NULL, " ", &lasts);
1045 				continue;
1046 			}
1047 			end = atoi(numstr);
1048 
1049 			nlen = strlen(prefix) + ETM_EP_INST_MAX;
1050 
1051 			if (nlen > MAXPATHLEN) {
1052 				fmd_hdl_error(hdl, "Endpoint prop string "
1053 				    "exceeds MAXPATHLEN\n");
1054 				ep = strtok_r(NULL, " ", &lasts);
1055 				continue;
1056 			}
1057 
1058 			for (i = beg; i <= end; i++) {
1059 				bzero(epname, MAXPATHLEN);
1060 				(void) snprintf(epname, nlen, "%s%d",
1061 				    prefix, i);
1062 				etm_init_epmap(hdl, epname, flags);
1063 			}
1064 		} else {
1065 			etm_init_epmap(hdl, ep, flags);
1066 		}
1067 
1068 		ep = strtok_r(NULL, " ", &lasts);
1069 	}
1070 
1071 	fmd_hdl_free(hdl, epstr, slen + 1);
1072 }
1073 
1074 /*
1075  * Free the transport infrastructure for an endpoint.
1076  */
1077 static void
1078 etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp)
1079 {
1080 	size_t hdrlen;
1081 	char hbuf[ETM_HDRLEN];
1082 
1083 	(void) pthread_mutex_lock(&mp->epm_lock);
1084 
1085 	/*
1086 	 * If an etm_send thread is in progress, wait for it to finish.
1087 	 * The etm_recv thread is managed by the transport layer and will
1088 	 * be destroyed with etm_xport_fini().
1089 	 */
1090 	while (mp->epm_txbusy)
1091 		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
1092 
1093 	if (mp->epm_timer_in_use)
1094 		fmd_timer_remove(hdl, mp->epm_timer_id);
1095 
1096 	if (mp->epm_oconn != NULL) {
1097 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
1098 		    ETM_HDR_SHUTDOWN, 0);
1099 		(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
1100 		    hdrlen);
1101 		(void) etm_xport_close(hdl, mp->epm_oconn);
1102 		mp->epm_oconn = NULL;
1103 	}
1104 
1105 	if (mp->epm_xprthdl != NULL) {
1106 		fmd_xprt_close(hdl, mp->epm_xprthdl);
1107 		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1108 		mp->epm_ep_nvl = NULL;
1109 	}
1110 
1111 	if (mp->epm_ep_nvl != NULL)
1112 		etm_free_ep_nvl(hdl, mp);
1113 
1114 	if (mp->epm_tlhdl != NULL)
1115 		(void) etm_xport_fini(hdl, mp->epm_tlhdl);
1116 
1117 	(void) pthread_mutex_unlock(&mp->epm_lock);
1118 	(void) pthread_mutex_destroy(&mp->epm_lock);
1119 	fmd_hdl_strfree(hdl, mp->epm_ep_str);
1120 	fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t));
1121 	DECRSTAT(Etm_stats.peer_count.fmds_value.ui64);
1122 }
1123 
1124 /*
1125  * FMD entry points
1126  */
1127 
1128 /*
1129  * FMD fmdo_send entry point.
1130  * Send an event to the remote endpoint and receive an ACK.
1131  */
1132 static int
1133 etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl)
1134 {
1135 	etm_epmap_t *mp;
1136 	nvlist_t *msgnvl;
1137 	int hdrstat, rv, cnt = 0;
1138 	char *buf, *nvbuf, *class;
1139 	size_t nvsize, buflen, hdrlen;
1140 	struct timespec tms;
1141 
1142 	(void) pthread_mutex_lock(&Etm_mod_lock);
1143 	if (Etm_exit) {
1144 		(void) pthread_mutex_unlock(&Etm_mod_lock);
1145 		return (FMD_SEND_RETRY);
1146 	}
1147 	(void) pthread_mutex_unlock(&Etm_mod_lock);
1148 
1149 	mp = fmd_xprt_getspecific(hdl, xprthdl);
1150 
1151 	for (;;) {
1152 		if (pthread_mutex_trylock(&mp->epm_lock) == 0) {
1153 			break;
1154 		} else {
1155 			/*
1156 			 * Another thread may be (1) trying to close this
1157 			 * fmd_xprt_t, or (2) posting an event to it.
1158 			 * If (1), don't want to spend too much time here.
1159 			 * If (2), allow it to finish and release epm_lock.
1160 			 */
1161 			if (cnt++ < 10) {
1162 				tms.tv_sec = 0;
1163 				tms.tv_nsec = (cnt * 10000);
1164 				(void) nanosleep(&tms, NULL);
1165 
1166 			} else {
1167 				return (FMD_SEND_RETRY);
1168 			}
1169 		}
1170 	}
1171 
1172 	mp->epm_txbusy++;
1173 
1174 	if (mp->epm_qstat == Q_UNINITIALIZED) {
1175 		mp->epm_txbusy--;
1176 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1177 		(void) pthread_mutex_unlock(&mp->epm_lock);
1178 		return (FMD_SEND_FAILED);
1179 	}
1180 
1181 	if (mp->epm_cstat == C_CLOSED) {
1182 		etm_suspend_reconnect(hdl, mp);
1183 		mp->epm_txbusy--;
1184 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1185 		(void) pthread_mutex_unlock(&mp->epm_lock);
1186 		return (FMD_SEND_RETRY);
1187 	}
1188 
1189 	if (mp->epm_cstat == C_LIMBO) {
1190 		if (mp->epm_oconn != NULL) {
1191 			(void) etm_xport_close(hdl, mp->epm_oconn);
1192 			mp->epm_oconn = NULL;
1193 		}
1194 
1195 		fmd_xprt_suspend(hdl, xprthdl);
1196 		mp->epm_qstat = Q_SUSPENDED;
1197 		mp->epm_txbusy--;
1198 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1199 		(void) pthread_mutex_unlock(&mp->epm_lock);
1200 		fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);
1201 		return (FMD_SEND_RETRY);
1202 	}
1203 
1204 	if (mp->epm_oconn == NULL) {
1205 		if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl))
1206 		    == NULL) {
1207 			etm_suspend_reconnect(hdl, mp);
1208 			mp->epm_txbusy--;
1209 			(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1210 			(void) pthread_mutex_unlock(&mp->epm_lock);
1211 			return (FMD_SEND_RETRY);
1212 		} else {
1213 			mp->epm_cstat = C_OPEN;
1214 		}
1215 	}
1216 
1217 	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0)
1218 		fmd_hdl_abort(hdl, "No class string in nvlist");
1219 
1220 	msgnvl = fmd_xprt_translate(hdl, xprthdl, ep);
1221 	if (msgnvl == NULL) {
1222 		mp->epm_txbusy--;
1223 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1224 		(void) pthread_mutex_unlock(&mp->epm_lock);
1225 		fmd_hdl_error(hdl, "Failed to translate event %p\n",
1226 		    (void *) ep);
1227 		return (FMD_SEND_FAILED);
1228 	}
1229 
1230 	rv = etm_xport_send_filter(hdl, msgnvl, mp->epm_ep_str);
1231 	if (rv == ETM_XPORT_FILTER_DROP) {
1232 		mp->epm_txbusy--;
1233 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1234 		(void) pthread_mutex_unlock(&mp->epm_lock);
1235 		fmd_hdl_debug(hdl, "send_filter dropped event");
1236 		nvlist_free(msgnvl);
1237 		INCRSTAT(Etm_stats.send_filter.fmds_value.ui64);
1238 		return (FMD_SEND_SUCCESS);
1239 	} else if (rv == ETM_XPORT_FILTER_ERROR) {
1240 		fmd_hdl_debug(hdl, "send_filter error : %s", strerror(errno));
1241 		INCRSTAT(Etm_stats.error_send_filter.fmds_value.ui64);
1242 		/* Still send event */
1243 	}
1244 
1245 	(void) pthread_mutex_unlock(&mp->epm_lock);
1246 
1247 	(void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR);
1248 
1249 	hdrlen = ETM_HDRLEN;
1250 	buflen = nvsize + hdrlen;
1251 
1252 	ALLOC_BUF(hdl, buf, buflen);
1253 
1254 	nvbuf = buf + hdrlen;
1255 
1256 	(void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize);
1257 
1258 	if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) {
1259 		(void) pthread_mutex_lock(&mp->epm_lock);
1260 		mp->epm_txbusy--;
1261 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1262 		(void) pthread_mutex_unlock(&mp->epm_lock);
1263 		fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv));
1264 		nvlist_free(msgnvl);
1265 		FREE_BUF(hdl, buf, buflen);
1266 		return (FMD_SEND_FAILED);
1267 	}
1268 
1269 	nvlist_free(msgnvl);
1270 
1271 	if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf,
1272 	    buflen) != buflen) {
1273 		fmd_hdl_debug(hdl, "failed to send message to %s",
1274 		    mp->epm_ep_str);
1275 		(void) pthread_mutex_lock(&mp->epm_lock);
1276 		etm_suspend_reconnect(hdl, mp);
1277 		mp->epm_txbusy--;
1278 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1279 		(void) pthread_mutex_unlock(&mp->epm_lock);
1280 		FREE_BUF(hdl, buf, buflen);
1281 		INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
1282 		return (FMD_SEND_RETRY);
1283 	}
1284 
1285 	INCRSTAT(Etm_stats.write_msg.fmds_value.ui64);
1286 	ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize);
1287 
1288 	etm_hex_dump(hdl, nvbuf, nvsize, 1);
1289 
1290 	if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf,
1291 	    hdrlen) != hdrlen) {
1292 		fmd_hdl_debug(hdl, "failed to read ACK from %s",
1293 		    mp->epm_ep_str);
1294 		(void) pthread_mutex_lock(&mp->epm_lock);
1295 		etm_suspend_reconnect(hdl, mp);
1296 		mp->epm_txbusy--;
1297 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1298 		(void) pthread_mutex_unlock(&mp->epm_lock);
1299 		FREE_BUF(hdl, buf, buflen);
1300 		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
1301 		return (FMD_SEND_RETRY);
1302 	}
1303 
1304 	hdrstat = etm_check_hdr(hdl, mp, buf);
1305 	FREE_BUF(hdl, buf, buflen);
1306 
1307 	if (hdrstat == ETM_HDR_ACK) {
1308 		INCRSTAT(Etm_stats.read_ack.fmds_value.ui64);
1309 	} else {
1310 		(void) pthread_mutex_lock(&mp->epm_lock);
1311 
1312 		(void) etm_xport_close(hdl, mp->epm_oconn);
1313 		mp->epm_oconn = NULL;
1314 
1315 		if (hdrstat == ETM_HDR_NAK) {
1316 			/* Peer received a bad value in the header */
1317 			if (mp->epm_xprthdl != NULL) {
1318 				mp->epm_cstat = C_LIMBO;
1319 				fmd_xprt_suspend(hdl, xprthdl);
1320 				mp->epm_qstat = Q_SUSPENDED;
1321 				fmd_hdl_debug(hdl, "received NAK, queue "
1322 				    "suspended for %s", mp->epm_ep_str);
1323 			}
1324 
1325 			rv = FMD_SEND_RETRY;
1326 
1327 		} else if (hdrstat == ETM_HDR_S_RESTART) {
1328 			/* Server has restarted */
1329 			mp->epm_cstat = C_CLOSED;
1330 			mp->epm_qstat = Q_UNINITIALIZED;
1331 			fmd_hdl_debug(hdl, "server %s restarted",
1332 			    mp->epm_ep_str);
1333 			/*
1334 			 * Cannot call fmd_xprt_close here, so we'll do it
1335 			 * on the timeout thread.
1336 			 */
1337 			if (mp->epm_timer_in_use == 0) {
1338 				mp->epm_timer_id = fmd_timer_install(
1339 				    hdl, mp, NULL, 0);
1340 				mp->epm_timer_in_use = 1;
1341 			}
1342 
1343 			/*
1344 			 * fault.* or list.* events will be replayed if a
1345 			 * transport is opened with the same auth.
1346 			 * Other events will be discarded.
1347 			 */
1348 			rv = FMD_SEND_FAILED;
1349 
1350 		} else {
1351 			mp->epm_cstat = C_CLOSED;
1352 			fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str);
1353 
1354 			rv = FMD_SEND_RETRY;
1355 		}
1356 
1357 		mp->epm_txbusy--;
1358 
1359 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1360 		(void) pthread_mutex_unlock(&mp->epm_lock);
1361 
1362 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
1363 
1364 		return (rv);
1365 	}
1366 
1367 	(void) pthread_mutex_lock(&mp->epm_lock);
1368 	mp->epm_txbusy--;
1369 	(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1370 	(void) pthread_mutex_unlock(&mp->epm_lock);
1371 
1372 	return (FMD_SEND_SUCCESS);
1373 }
1374 
1375 /*
1376  * FMD fmdo_timeout entry point..
1377  */
1378 /*ARGSUSED*/
1379 static void
1380 etm_timeout(fmd_hdl_t *hdl, id_t id, void *data)
1381 {
1382 	etm_epmap_t *mp = (etm_epmap_t *)data;
1383 
1384 	(void) pthread_mutex_lock(&mp->epm_lock);
1385 
1386 	mp->epm_timer_in_use = 0;
1387 
1388 	if (mp->epm_qstat == Q_UNINITIALIZED) {
1389 		/* Server has shutdown and we (client) need to reconnect */
1390 		if (mp->epm_xprthdl != NULL) {
1391 			fmd_xprt_close(hdl, mp->epm_xprthdl);
1392 			fmd_hdl_debug(hdl, "queue closed for %s",
1393 			    mp->epm_ep_str);
1394 			mp->epm_xprthdl = NULL;
1395 			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1396 			mp->epm_ep_nvl = NULL;
1397 		}
1398 
1399 		if (mp->epm_ep_nvl == NULL)
1400 			(void) etm_get_ep_nvl(hdl, mp);
1401 
1402 		if (etm_handle_startup(hdl, mp)) {
1403 			if (mp->epm_oconn != NULL) {
1404 				(void) etm_xport_close(hdl, mp->epm_oconn);
1405 				mp->epm_oconn = NULL;
1406 			}
1407 			mp->epm_cstat = C_UNINITIALIZED;
1408 			mp->epm_qstat = Q_UNINITIALIZED;
1409 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
1410 			    Reconn_interval);
1411 			mp->epm_timer_in_use = 1;
1412 		}
1413 	} else {
1414 		etm_reconnect(hdl, mp);
1415 	}
1416 
1417 	(void) pthread_mutex_unlock(&mp->epm_lock);
1418 }
1419 
1420 /*
1421  * FMD Module declarations
1422  */
1423 static const fmd_hdl_ops_t etm_ops = {
1424 	NULL,		/* fmdo_recv */
1425 	etm_timeout,	/* fmdo_timeout */
1426 	NULL,		/* fmdo_close */
1427 	NULL,		/* fmdo_stats */
1428 	NULL,		/* fmdo_gc */
1429 	etm_send,	/* fmdo_send */
1430 };
1431 
1432 static const fmd_prop_t etm_props[] = {
1433 	{ "client_list", FMD_TYPE_STRING, NULL },
1434 	{ "server_list", FMD_TYPE_STRING, NULL },
1435 	{ "reconnect_interval",	FMD_TYPE_UINT64, "10000000000" },
1436 	{ "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" },
1437 	{ "rw_timeout", FMD_TYPE_UINT64, "2000000000" },
1438 	{ "filter_path", FMD_TYPE_STRING, NULL },
1439 	{ NULL, 0, NULL }
1440 };
1441 
1442 static const fmd_hdl_info_t etm_info = {
1443 	"Event Transport Module", "2.0", &etm_ops, etm_props
1444 };
1445 
1446 /*
1447  * Initialize the transport for use by ETM.
1448  */
1449 void
1450 _fmd_init(fmd_hdl_t *hdl)
1451 {
1452 	char *propstr;
1453 
1454 	if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) {
1455 		return; /* invalid data in configuration file */
1456 	}
1457 
1458 	/* Create global stats */
1459 	(void) fmd_stat_create(hdl, FMD_STAT_NOALLOC,
1460 	    sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats);
1461 
1462 	/* Get module properties */
1463 	Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout");
1464 	Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval");
1465 	Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout");
1466 
1467 	propstr = fmd_prop_get_string(hdl, "client_list");
1468 	etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS);
1469 	fmd_prop_free_string(hdl, propstr);
1470 
1471 	propstr = fmd_prop_get_string(hdl, "server_list");
1472 	etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS);
1473 	fmd_prop_free_string(hdl, propstr);
1474 
1475 	if (Etm_stats.peer_count.fmds_value.ui64 == 0) {
1476 		fmd_hdl_debug(hdl, "Failed to init any endpoint\n");
1477 		fmd_hdl_unregister(hdl);
1478 		return;
1479 	}
1480 }
1481 
1482 /*
1483  * Teardown the transport
1484  */
1485 void
1486 _fmd_fini(fmd_hdl_t *hdl)
1487 {
1488 	etm_epmap_t *mp, *next;
1489 
1490 	(void) pthread_mutex_lock(&Etm_mod_lock);
1491 	Etm_exit = 1;
1492 	(void) pthread_mutex_unlock(&Etm_mod_lock);
1493 
1494 	mp = Epmap_head;
1495 
1496 	while (mp) {
1497 		next = mp->epm_next;
1498 		etm_free_epmap(hdl, mp);
1499 		mp = next;
1500 	}
1501 
1502 	fmd_hdl_unregister(hdl);
1503 }
1504