xref: /illumos-gate/usr/src/cmd/fm/modules/common/event-transport/etm.c (revision d87d03b4c0f66bf125e607ef8b0d9c5481040d20)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 /*
30  * FMA Event Transport Module
31  *
 * Plugin for sending/receiving FMA events to/from a remote endpoint.
33  */
34 
35 #include <netinet/in.h>
36 #include <errno.h>
37 #include <sys/fm/protocol.h>
38 #include <sys/sysmacros.h>
39 #include <pthread.h>
40 #include <strings.h>
41 #include <ctype.h>
42 #include <link.h>
43 #include <libnvpair.h>
44 #include "etm_xport_api.h"
45 #include "etm_proto.h"
46 
47 /*
48  * ETM declarations
49  */
50 
/*
 * Status of the connection to a peer, kept in epm_cstat.
 * The numeric values are spelled out so they are easy to match against
 * debug output.
 */
typedef enum etm_connection_status {
	C_UNINITIALIZED	= 0,	/* Initial state; also set on teardown */
	C_OPEN		= 1,	/* Connection is open */
	C_CLOSED	= 2,	/* Connection is closed */
	C_LIMBO		= 3,	/* Bad value in header from peer */
	C_TIMED_OUT	= 4	/* Reconnection to peer timed out */
} etm_connstat_t;
58 
/*
 * Status of the fmd transport queue for a peer, kept in epm_qstat.
 * Values start at 100, so they do not overlap the etm_connstat_t values.
 */
typedef enum etm_fmd_queue_status {
	Q_UNINITIALIZED	= 100,	/* Initial state; also set on teardown */
	Q_INIT_PENDING	= 101,	/* Queue initialization in progress */
	Q_OPEN		= 102,	/* Queue is open */
	Q_SUSPENDED	= 103	/* Queue is suspended */
} etm_qstat_t;
65 
/*
 * Per endpoint data.
 *
 * One of these is allocated for each peer by etm_init_epmap() and linked
 * onto the list headed by Epmap_head.  Fields declared after epm_lock are
 * the ones the lock is documented to protect.
 */
typedef struct etm_endpoint_map {
	uint8_t epm_ver;		/* Protocol version being used */
	char *epm_ep_str;		/* Endpoint ID string */
	int epm_xprtflags;		/* FMD transport open flags */
	etm_xport_hdl_t epm_tlhdl;	/* Transport Layer instance handle */
	pthread_mutex_t epm_lock;	/* Protects remainder of struct */
	pthread_cond_t epm_tx_cv;	/* Cond var for send/transmit */
	int epm_txbusy;			/* Busy doing send/transmit */
	fmd_xprt_t *epm_xprthdl;	/* FMD transport handle */
	etm_qstat_t epm_qstat;		/* Status of fmd xprt queue */
	nvlist_t *epm_ep_nvl;		/* Endpoint ID nv_list */
	etm_xport_conn_t epm_oconn;	/* Connection for outgoing events */
	etm_connstat_t epm_cstat;	/* Status of connection */
	id_t epm_timer_id;		/* Timer id */
	int epm_timer_in_use;		/* Indicates if timer is in use */
	hrtime_t epm_reconn_end;	/* Reconnection end time */
	struct etm_endpoint_map *epm_next;	/* Next entry on Epmap_head list */
} etm_epmap_t;
85 
/*
 * Pseudo header types returned by etm_check_hdr() for headers that fail
 * validation.  They are numbered above ETM_HDR_TYPE_TOO_HIGH so they
 * cannot be confused with a header type that passes the range check.
 */
#define	ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1)
#define	ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2)
#define	ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3)
#define	ETM_EP_INST_MAX 4		/* Max chars in endpt instance */
/* fmd_xprt_open() flags; the server side additionally sets ACCEPT */
#define	ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR
#define	ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT)

/*
 * Allocate/free a zeroed buffer through fmd.
 * NOTE(review): both expansions already end in ';', so "ALLOC_BUF(...);"
 * expands to two statements and must not be the unbraced body of an
 * if/else.
 */
#define	ALLOC_BUF(hdl, buf, size) \
	buf = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP);

#define	FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size));

/* An endpoint map is a client unless its flags include FMD_XPRT_ACCEPT */
#define	IS_CLIENT(mp)	(((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1)

/*
 * Statistic update helpers; each update is made with Etm_mod_lock held.
 * NOTE(review): these expand to a bare { } block rather than
 * do { } while (0), so "INCRSTAT(x);" must not be the unbraced body of an
 * if that has an else.
 */
#define	INCRSTAT(x)	{	(void) pthread_mutex_lock(&Etm_mod_lock);   \
				(x)++;					    \
				(void) pthread_mutex_unlock(&Etm_mod_lock); \
			}

#define	DECRSTAT(x)	{	(void) pthread_mutex_lock(&Etm_mod_lock);   \
				(x)--;					    \
				(void) pthread_mutex_unlock(&Etm_mod_lock); \
			}

#define	ADDSTAT(x, y)	{	(void) pthread_mutex_lock(&Etm_mod_lock);   \
				(x) += (y);				    \
				(void) pthread_mutex_unlock(&Etm_mod_lock); \
			}
114 
/*
 * Global variables
 *
 * Etm_mod_lock is taken by the statistic macros and around reads of
 * Etm_exit; it is also held across fmd_xprt_open()/fmd_xprt_setspecific().
 */
static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER;
					/* Protects globals */
static hrtime_t Reconn_interval;	/* Time between reconnection attempts */
static hrtime_t Reconn_timeout;		/* Time allowed for reconnection */
static hrtime_t Rw_timeout;		/* Time allowed for I/O operation  */
static int Etm_dump = 0;		/* Enables hex dump for debug */
static int Etm_exit = 0;		/* Flag for exit */
static etm_epmap_t *Epmap_head = NULL;	/* Head of list of epmap structs */
126 
/*
 * Module statistics.
 *
 * The initializer below is positional: its entries must stay in the same
 * order as the fields of struct etm_stats.
 */
static struct etm_stats {
	/* read counters */
	fmd_stat_t read_ack;
	fmd_stat_t read_bytes;
	fmd_stat_t read_msg;
	fmd_stat_t post_filter;
	/* write counters */
	fmd_stat_t write_ack;
	fmd_stat_t write_bytes;
	fmd_stat_t write_msg;
	fmd_stat_t send_filter;
	/* error counters */
	fmd_stat_t error_protocol;
	fmd_stat_t error_drop_read;
	fmd_stat_t error_read;
	fmd_stat_t error_read_badhdr;
	fmd_stat_t error_write;
	fmd_stat_t error_send_filter;
	fmd_stat_t error_post_filter;
	/* misc */
	fmd_stat_t peer_count;

} Etm_stats = {
	/* read counters */
	{ "read_ack", FMD_TYPE_UINT64, "ACKs read" },
	{ "read_bytes", FMD_TYPE_UINT64, "Bytes read" },
	{ "read_msg", FMD_TYPE_UINT64, "Messages read" },
	{ "post_filter", FMD_TYPE_UINT64, "Drops by post_filter" },
	/* write counters */
	{ "write_ack", FMD_TYPE_UINT64, "ACKs sent" },
	{ "write_bytes", FMD_TYPE_UINT64, "Bytes sent" },
	{ "write_msg", FMD_TYPE_UINT64, "Messages sent" },
	{ "send_filter", FMD_TYPE_UINT64, "Drops by send_filter" },
	/* ETM error counters */
	{ "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" },
	{ "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" },
	{ "error_read", FMD_TYPE_UINT64, "Read I/O errors" },
	{ "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" },
	{ "error_write", FMD_TYPE_UINT64, "Write I/O errors" },
	{ "error_send_filter", FMD_TYPE_UINT64, "Send filter errors" },
	{ "error_post_filter", FMD_TYPE_UINT64, "Post filter errors" },
	/* ETM Misc */
	{ "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" },
};
172 
173 /*
174  * ETM Private functions
175  */
176 
177 /*
178  * Hex dump for debug.
179  */
180 static void
181 etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction)
182 {
183 	int i, j, k;
184 	int16_t *c;
185 
186 	if (Etm_dump == 0)
187 		return;
188 
189 	j = buflen / 16;	/* Number of complete 8-column rows */
190 	k = buflen % 16;	/* Is there a last (non-8-column) row? */
191 
192 	if (direction)
193 		fmd_hdl_debug(hdl, "--- WRITE Message Dump ---");
194 	else
195 		fmd_hdl_debug(hdl, "---  READ Message Dump ---");
196 
197 	fmd_hdl_debug(hdl, "   Displaying %d bytes", buflen);
198 
199 	/* Dump the complete 8-column rows */
200 	for (i = 0; i < j; i++) {
201 		c = (int16_t *)buf + (i * 8);
202 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x %4x %4x", i,
203 		    *(c+0), *(c+1), *(c+2), *(c+3),
204 		    *(c+4), *(c+5), *(c+6), *(c+7));
205 	}
206 
207 	/* Dump the last (incomplete) row */
208 	c = (int16_t *)buf + (i * 8);
209 	switch (k) {
210 	case 4:
211 		fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1));
212 		break;
213 	case 8:
214 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1),
215 		    *(c+2), *(c+3));
216 		break;
217 	case 12:
218 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x", i, *(c+0),
219 		    *(c+1), *(c+2), *(c+3), *(c+4), *(c+5));
220 		break;
221 	}
222 
223 	fmd_hdl_debug(hdl, "---      End Dump      ---");
224 }
225 
226 /*
227  * Provide the length of a message based on the data in the given ETM header.
228  */
229 static size_t
230 etm_get_msglen(void *buf)
231 {
232 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
233 
234 	return (ntohl(hp->hdr_msglen));
235 }
236 
237 /*
238  * Check the contents of the ETM header for errors.
239  * Return the header type (hdr_type).
240  */
241 static int
242 etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf)
243 {
244 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
245 
246 	if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) {
247 		fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s "
248 		    ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim);
249 		return (ETM_HDR_INVALID);
250 	}
251 
252 	if ((hp->hdr_type == ETM_HDR_C_HELLO) ||
253 	    (hp->hdr_type == ETM_HDR_S_HELLO)) {
254 		/* Until version is negotiated, other fields may be wrong */
255 		return (hp->hdr_type);
256 	}
257 
258 	if (hp->hdr_ver != mp->epm_ver) {
259 		fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n",
260 		    mp->epm_ep_str, hp->hdr_ver);
261 		return (ETM_HDR_BADVERSION);
262 	}
263 
264 	if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) ||
265 	    (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) {
266 		fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n",
267 		    mp->epm_ep_str, hp->hdr_type);
268 		return (ETM_HDR_BADTYPE);
269 	}
270 
271 	return (hp->hdr_type);
272 }
273 
274 /*
275  * Create an ETM header of a given type in the given buffer.
276  * Return length of header.
277  */
278 static size_t
279 etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen)
280 {
281 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
282 
283 	bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN);
284 	hp->hdr_ver = ver;
285 	hp->hdr_type = type;
286 	hp->hdr_msglen = htonl(msglen);
287 
288 	return (ETM_HDRLEN);
289 }
290 
291 /*
292  * Convert message bytes to nvlist and post to fmd.
293  * Return zero for success, non-zero for failure.
294  *
295  * Note : nvl is free'd by fmd.
296  */
297 static int
298 etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen)
299 {
300 	nvlist_t *nvl;
301 	int rv;
302 
303 	if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) {
304 		fmd_hdl_error(hdl, "failed to unpack message");
305 		return (1);
306 	}
307 
308 	rv = etm_xport_post_filter(hdl, nvl, mp->epm_ep_str);
309 	if (rv == ETM_XPORT_FILTER_DROP) {
310 		fmd_hdl_debug(hdl, "post_filter dropped event");
311 		INCRSTAT(Etm_stats.post_filter.fmds_value.ui64);
312 		nvlist_free(nvl);
313 		return (0);
314 	} else if (rv == ETM_XPORT_FILTER_ERROR) {
315 		fmd_hdl_debug(hdl, "post_filter error : %s", strerror(errno));
316 		INCRSTAT(Etm_stats.error_post_filter.fmds_value.ui64);
317 		/* Still post event */
318 	}
319 
320 	(void) pthread_mutex_lock(&mp->epm_lock);
321 	(void) pthread_mutex_lock(&Etm_mod_lock);
322 	if (!Etm_exit) {
323 		(void) pthread_mutex_unlock(&Etm_mod_lock);
324 		if (mp->epm_qstat == Q_OPEN) {
325 			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
326 			rv = 0;
327 		} else if (mp->epm_qstat == Q_SUSPENDED) {
328 			fmd_xprt_resume(hdl, mp->epm_xprthdl);
329 			if (mp->epm_timer_in_use) {
330 				fmd_timer_remove(hdl, mp->epm_timer_id);
331 				mp->epm_timer_in_use = 0;
332 			}
333 			mp->epm_qstat = Q_OPEN;
334 			fmd_hdl_debug(hdl, "queue resumed for %s",
335 			    mp->epm_ep_str);
336 			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
337 			rv = 0;
338 		} else {
339 			fmd_hdl_debug(hdl, "unable to post message, qstat = %d",
340 			    mp->epm_qstat);
341 			nvlist_free(nvl);
342 			/* Remote peer will attempt to resend event */
343 			rv = 2;
344 		}
345 	} else {
346 		(void) pthread_mutex_unlock(&Etm_mod_lock);
347 		fmd_hdl_debug(hdl, "unable to post message, module exiting");
348 		nvlist_free(nvl);
349 		/* Remote peer will attempt to resend event */
350 		rv = 3;
351 	}
352 
353 	(void) pthread_mutex_unlock(&mp->epm_lock);
354 
355 	return (rv);
356 }
357 
358 /*
359  * Handle the startup handshake to the server.  The client always initiates
360  * the startup handshake.  In the following sequence, we are the client and
361  * the remote endpoint is the server.
362  *
363  *	Client sends C_HELLO and transitions to Q_INIT_PENDING state.
364  *	Server sends S_HELLO and transitions to Q_INIT_PENDING state.
365  *	Client sends ACK and transitions to Q_OPEN state.
366  *	Server receives ACK and transitions to Q_OPEN state.
367  *
368  * Return 0 for success, nonzero for failure.
369  */
370 static int
371 etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp)
372 {
373 	etm_proto_hdr_t *hp;
374 	size_t hdrlen = ETM_HDRLEN;
375 	int hdrstat;
376 	char hbuf[ETM_HDRLEN];
377 
378 	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
379 		return (1);
380 
381 	mp->epm_cstat = C_OPEN;
382 
383 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0);
384 
385 	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
386 	    hdrlen)) != hdrlen) {
387 		fmd_hdl_error(hdl, "Failed to write C_HELLO to %s",
388 		    mp->epm_ep_str);
389 		return (2);
390 	}
391 
392 	mp->epm_qstat = Q_INIT_PENDING;
393 
394 	if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf,
395 	    hdrlen)) != hdrlen) {
396 		fmd_hdl_error(hdl, "Failed to read S_HELLO from %s",
397 		    mp->epm_ep_str);
398 		return (3);
399 	}
400 
401 	hdrstat = etm_check_hdr(hdl, mp, hbuf);
402 
403 	if (hdrstat != ETM_HDR_S_HELLO) {
404 		fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO "
405 		    "from %s", mp->epm_ep_str);
406 		return (4);
407 	}
408 
409 	/*
410 	 * Get version from the server.
411 	 * Currently, only one version is supported.
412 	 */
413 	hp = (etm_proto_hdr_t *)(void *)hbuf;
414 	if (hp->hdr_ver != ETM_PROTO_V1) {
415 		fmd_hdl_error(hdl, "Unable to use same version as %s : %d",
416 		    mp->epm_ep_str, hp->hdr_ver);
417 		return (5);
418 	}
419 	mp->epm_ver = hp->hdr_ver;
420 
421 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
422 
423 	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
424 	    hdrlen)) != hdrlen) {
425 		fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s",
426 		    mp->epm_ep_str);
427 		return (6);
428 	}
429 
430 	/*
431 	 * Call fmd_xprt_open and fmd_xprt_setspecific with
432 	 * Etm_mod_lock held to avoid race with etm_send thread.
433 	 */
434 	(void) pthread_mutex_lock(&Etm_mod_lock);
435 	if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags,
436 	    mp->epm_ep_nvl, NULL)) == NULL) {
437 		fmd_hdl_abort(hdl, "Failed to init xprthdl for %s",
438 		    mp->epm_ep_str);
439 	}
440 	fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
441 	(void) pthread_mutex_unlock(&Etm_mod_lock);
442 
443 	mp->epm_qstat = Q_OPEN;
444 	fmd_hdl_debug(hdl, "queue open for %s",  mp->epm_ep_str);
445 
446 	return (0);
447 }
448 
449 /*
450  * Open a connection to the peer, send a SHUTDOWN message,
451  * and close the connection.
452  */
453 static void
454 etm_send_shutdown(fmd_hdl_t *hdl, etm_epmap_t *mp)
455 {
456 	size_t hdrlen = ETM_HDRLEN;
457 	char hbuf[ETM_HDRLEN];
458 
459 	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
460 		return;
461 
462 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_SHUTDOWN, 0);
463 
464 	(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, hdrlen);
465 
466 	(void) etm_xport_close(hdl, mp->epm_oconn);
467 	mp->epm_oconn = NULL;
468 }
469 
470 /*
471  * Alloc a nvlist and add a string for the endpoint.
472  * Return zero for success, non-zero for failure.
473  */
474 static int
475 etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
476 {
477 	/*
478 	 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation
479 	 * in fmd when this nvlist_t is free'd.
480 	 */
481 	(void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0);
482 
483 	if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) {
484 		fmd_hdl_error(hdl, "failed to add domain-id string to nvlist "
485 		    "for %s", mp->epm_ep_str);
486 		nvlist_free(mp->epm_ep_nvl);
487 		return (1);
488 	}
489 
490 	return (0);
491 }
492 
493 /*
494  * Free the nvlist for the endpoint_id string.
495  */
496 /*ARGSUSED*/
497 static void
498 etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
499 {
500 	nvlist_free(mp->epm_ep_nvl);
501 }
502 
503 /*
504  * Check for a duplicate endpoint/peer string.
505  */
506 /*ARGSUSED*/
507 static int
508 etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname)
509 {
510 	etm_epmap_t *mp;
511 
512 	for (mp = Epmap_head; mp != NULL; mp = mp->epm_next)
513 		if (strcmp(epname, mp->epm_ep_str) == 0)
514 			return (1);
515 
516 	return (0);
517 }
518 
519 /*
520  * Attempt to re-open a connection with the remote endpoint.
521  */
522 static void
523 etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
524 {
525 	if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) {
526 		if (gethrtime() < mp->epm_reconn_end) {
527 			if ((mp->epm_oconn = etm_xport_open(hdl,
528 			    mp->epm_tlhdl)) == NULL) {
529 				fmd_hdl_debug(hdl, "reconnect failed for %s",
530 				    mp->epm_ep_str);
531 				mp->epm_timer_id = fmd_timer_install(hdl, mp,
532 				    NULL, Reconn_interval);
533 				mp->epm_timer_in_use = 1;
534 			} else {
535 				fmd_hdl_debug(hdl, "reconnect success for %s",
536 				    mp->epm_ep_str);
537 				mp->epm_reconn_end = 0;
538 				mp->epm_cstat = C_OPEN;
539 			}
540 		} else {
541 			fmd_hdl_error(hdl, "Reconnect timed out for %s\n",
542 			    mp->epm_ep_str);
543 			mp->epm_reconn_end = 0;
544 			mp->epm_cstat = C_TIMED_OUT;
545 		}
546 	}
547 
548 	if (mp->epm_cstat == C_OPEN) {
549 		fmd_xprt_resume(hdl, mp->epm_xprthdl);
550 		mp->epm_qstat = Q_OPEN;
551 		fmd_hdl_debug(hdl, "queue resumed for %s",  mp->epm_ep_str);
552 	}
553 }
554 
555 /*
556  * Suspend a given connection and setup for reconnection retries.
557  * Assume caller holds lock on epm_lock.
558  */
559 static void
560 etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
561 {
562 	(void) pthread_mutex_lock(&Etm_mod_lock);
563 	if (Etm_exit) {
564 		(void) pthread_mutex_unlock(&Etm_mod_lock);
565 		return;
566 	}
567 	(void) pthread_mutex_unlock(&Etm_mod_lock);
568 
569 	if (mp->epm_oconn != NULL) {
570 		(void) etm_xport_close(hdl, mp->epm_oconn);
571 		mp->epm_oconn = NULL;
572 	}
573 
574 	mp->epm_reconn_end = gethrtime() + Reconn_timeout;
575 	mp->epm_cstat = C_UNINITIALIZED;
576 
577 	if (mp->epm_xprthdl != NULL) {
578 		fmd_xprt_suspend(hdl, mp->epm_xprthdl);
579 		mp->epm_qstat = Q_SUSPENDED;
580 		fmd_hdl_debug(hdl, "queue suspended for %s",  mp->epm_ep_str);
581 
582 		if (mp->epm_timer_in_use == 0) {
583 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
584 			    Reconn_interval);
585 			mp->epm_timer_in_use = 1;
586 		}
587 	}
588 }
589 
/*
 * Reinitialize the connection. The old fmd_xprt_t handle must be
 * removed/closed first.
 *
 * In order: wait for any transmit in progress, close the fmd transport,
 * remove the reconnect timer, close the outgoing connection, and reset
 * both status fields to their UNINITIALIZED values.
 *
 * Assume caller holds lock on epm_lock.
 */
static void
etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	/*
	 * To avoid a deadlock, wait for etm_send to finish before
	 * calling fmd_xprt_close()
	 */
	/* pthread_cond_wait() drops epm_lock while waiting and retakes it */
	while (mp->epm_txbusy)
		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);

	if (mp->epm_xprthdl != NULL) {
		fmd_xprt_close(hdl, mp->epm_xprthdl);
		fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str);
		mp->epm_xprthdl = NULL;
		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
		mp->epm_ep_nvl = NULL;
	}

	/* Cancel any pending reconnect timer */
	if (mp->epm_timer_in_use) {
		fmd_timer_remove(hdl, mp->epm_timer_id);
		mp->epm_timer_in_use = 0;
	}

	/* Drop the outgoing connection, if there is one */
	if (mp->epm_oconn != NULL) {
		(void) etm_xport_close(hdl, mp->epm_oconn);
		mp->epm_oconn = NULL;
	}

	mp->epm_cstat = C_UNINITIALIZED;
	mp->epm_qstat = Q_UNINITIALIZED;
}
626 
/*
 * Receive data from ETM transport layer.
 * Note : This is not the fmdo_recv entry point.
 *
 * Reads one ETM header from conn, validates it with etm_check_hdr(), and
 * acts on the header type.
 *
 * Returns:
 *	0		header handled (including unexpected ACKs and
 *			unexpected header types, which are only counted)
 *	EIO		a read or write on conn failed, or a message could
 *			not be posted
 *	ECANCELED	bad delimiter, SHUTDOWN received, or S_RESTART was
 *			sent because our queue is uninitialized
 *	ENOTSUP		bad header type or version (a NAK was sent)
 */
static int
etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp)
{
	size_t buflen, hdrlen;
	void *buf;
	char hbuf[ETM_HDRLEN];
	int hdrstat, rv;

	hdrlen = ETM_HDRLEN;

	if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) {
		fmd_hdl_debug(hdl, "failed to read header from %s",
		    mp->epm_ep_str);
		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
		return (EIO);
	}

	hdrstat = etm_check_hdr(hdl, mp, hbuf);

	switch (hdrstat) {
	case ETM_HDR_INVALID:
		/* Bad delimiter: mark an open connection as closed */
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_cstat == C_OPEN)
			mp->epm_cstat = C_CLOSED;
		(void) pthread_mutex_unlock(&mp->epm_lock);

		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
		rv = ECANCELED;
		break;

	case ETM_HDR_BADTYPE:
	case ETM_HDR_BADVERSION:
		/* Reject the header with a NAK and put the connection in limbo */
		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0);

		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
		    hdrlen)) != hdrlen) {
			fmd_hdl_debug(hdl, "failed to write NAK to %s",
			    mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
			return (EIO);
		}

		(void) pthread_mutex_lock(&mp->epm_lock);
		mp->epm_cstat = C_LIMBO;
		(void) pthread_mutex_unlock(&mp->epm_lock);

		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
		rv = ENOTSUP;
		break;

	case ETM_HDR_C_HELLO:
		/* Client is initiating a startup handshake */
		(void) pthread_mutex_lock(&mp->epm_lock);
		etm_reinit(hdl, mp);
		mp->epm_qstat = Q_INIT_PENDING;
		(void) pthread_mutex_unlock(&mp->epm_lock);

		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0);

		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
		    hdrlen)) != hdrlen) {
			fmd_hdl_debug(hdl, "failed to write S_HELLO to %s",
			    mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
			return (EIO);
		}

		rv = 0;
		break;

	case ETM_HDR_ACK:
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_qstat == Q_INIT_PENDING) {
			/* This is client's ACK from startup handshake */
			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
			/*
			 * NOTE(review): the result of etm_get_ep_nvl() is
			 * ignored here; confirm fmd_xprt_open() copes with
			 * whatever epm_ep_nvl holds after a failure.
			 */
			if (mp->epm_ep_nvl == NULL)
				(void) etm_get_ep_nvl(hdl, mp);

			/*
			 * Call fmd_xprt_open and fmd_xprt_setspecific with
			 * Etm_mod_lock held to avoid race with etm_send thread.
			 */
			(void) pthread_mutex_lock(&Etm_mod_lock);
			if ((mp->epm_xprthdl = fmd_xprt_open(hdl,
			    mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) {
				fmd_hdl_abort(hdl, "Failed to init xprthdl "
				    "for %s", mp->epm_ep_str);
			}
			fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
			(void) pthread_mutex_unlock(&Etm_mod_lock);

			mp->epm_qstat = Q_OPEN;
			(void) pthread_mutex_unlock(&mp->epm_lock);
			fmd_hdl_debug(hdl, "queue open for %s",
			    mp->epm_ep_str);
		} else {
			/* An ACK outside the handshake is counted and ignored */
			(void) pthread_mutex_unlock(&mp->epm_lock);
			fmd_hdl_debug(hdl, "protocol error, not expecting ACK "
			    "from %s\n", mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
		}

		rv = 0;
		break;

	case ETM_HDR_SHUTDOWN:
		fmd_hdl_debug(hdl, "received shutdown from %s",
		    mp->epm_ep_str);

		(void) pthread_mutex_lock(&mp->epm_lock);

		etm_reinit(hdl, mp);

		if (IS_CLIENT(mp)) {
			/*
			 * A server shutdown is considered to be temporary.
			 * Prepare for reconnection.
			 */
			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
			    Reconn_interval);

			mp->epm_timer_in_use = 1;
		}

		(void) pthread_mutex_unlock(&mp->epm_lock);

		rv = ECANCELED;
		break;

	case ETM_HDR_MSG:
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_qstat == Q_UNINITIALIZED) {
			/* Peer (client) is unaware that we've restarted */
			(void) pthread_mutex_unlock(&mp->epm_lock);
			hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
			    ETM_HDR_S_RESTART, 0);

			if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
			    hdrlen)) != hdrlen) {
				fmd_hdl_debug(hdl, "failed to write S_RESTART "
				    "to %s", mp->epm_ep_str);
				INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
				return (EIO);
			}

			return (ECANCELED);
		}
		(void) pthread_mutex_unlock(&mp->epm_lock);

		/*
		 * Read the message body.  Its length comes from the header.
		 * NOTE(review): the length is supplied by the peer and is
		 * used for the allocation without a range check here.
		 */
		buflen = etm_get_msglen(hbuf);
		ALLOC_BUF(hdl, buf, buflen);

		if (etm_xport_read(hdl, conn, Rw_timeout, buf,
		    buflen) != buflen) {
			fmd_hdl_debug(hdl, "failed to read message from %s",
			    mp->epm_ep_str);
			FREE_BUF(hdl, buf, buflen);
			INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
			return (EIO);
		}

		INCRSTAT(Etm_stats.read_msg.fmds_value.ui64);
		ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen);

		etm_hex_dump(hdl, buf, buflen, 0);

		/* No ACK is sent for a message that could not be posted */
		if (etm_post_msg(hdl, mp, buf, buflen)) {
			INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64);
			FREE_BUF(hdl, buf, buflen);
			return (EIO);
		}

		FREE_BUF(hdl, buf, buflen);

		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);

		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
		    hdrlen)) != hdrlen) {
			fmd_hdl_debug(hdl, "failed to write ACK to %s",
			    mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
			return (EIO);
		}

		INCRSTAT(Etm_stats.write_ack.fmds_value.ui64);

		/*
		 * If we got this far and the current state of the
		 * outbound/sending connection is TIMED_OUT or
		 * LIMBO, then we should reinitialize it.
		 */
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_cstat == C_TIMED_OUT ||
		    mp->epm_cstat == C_LIMBO) {
			if (mp->epm_oconn != NULL) {
				(void) etm_xport_close(hdl, mp->epm_oconn);
				mp->epm_oconn = NULL;
			}
			mp->epm_cstat = C_UNINITIALIZED;
			fmd_xprt_resume(hdl, mp->epm_xprthdl);
			if (mp->epm_timer_in_use) {
				fmd_timer_remove(hdl, mp->epm_timer_id);
				mp->epm_timer_in_use = 0;
			}
			mp->epm_qstat = Q_OPEN;
			fmd_hdl_debug(hdl, "queue resumed for %s",
			    mp->epm_ep_str);
		}
		(void) pthread_mutex_unlock(&mp->epm_lock);

		rv = 0;
		break;

	default:
		/* Any other header type is counted as a protocol error */
		fmd_hdl_debug(hdl, "protocol error, unexpected header "
		    "from %s : %d", mp->epm_ep_str, hdrstat);
		INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
		rv = 0;
	}

	return (rv);
}
854 
855 /*
856  * ETM transport layer callback function.
857  * The transport layer calls this function to :
858  *	(a) pass an incoming message (flag == ETM_CBFLAG_RECV)
859  *	(b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT)
860  */
861 static int
862 etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
863     void *arg)
864 {
865 	etm_epmap_t *mp = (etm_epmap_t *)arg;
866 	int rv = 0;
867 
868 	(void) pthread_mutex_lock(&Etm_mod_lock);
869 	if (Etm_exit) {
870 		(void) pthread_mutex_unlock(&Etm_mod_lock);
871 		return (ECANCELED);
872 	}
873 	(void) pthread_mutex_unlock(&Etm_mod_lock);
874 
875 	switch (flag) {
876 	case ETM_CBFLAG_RECV:
877 		rv = etm_recv(hdl, conn, mp);
878 		break;
879 	case ETM_CBFLAG_REINIT:
880 		(void) pthread_mutex_lock(&mp->epm_lock);
881 		etm_reinit(hdl, mp);
882 		etm_send_shutdown(hdl, mp);
883 		(void) pthread_mutex_unlock(&mp->epm_lock);
884 		/*
885 		 * Return ECANCELED so the transport layer will close the
886 		 * server connection.  The transport layer is responsible for
887 		 * reestablishing this connection (should a connection request
888 		 * arrive from the peer).
889 		 */
890 		rv = ECANCELED;
891 		break;
892 	default:
893 		fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag);
894 		rv = ENOTSUP;
895 	}
896 
897 	return (rv);
898 }
899 
900 /*
901  * Allocate and initialize an etm_epmap_t struct for the given endpoint
902  * name string.
903  */
904 static void
905 etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags)
906 {
907 	etm_epmap_t *newmap;
908 
909 	if (etm_check_dup_ep_str(hdl, epname)) {
910 		fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname);
911 		return;
912 	}
913 
914 	newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP);
915 	newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP);
916 	newmap->epm_xprtflags = flags;
917 	newmap->epm_cstat = C_UNINITIALIZED;
918 	newmap->epm_qstat = Q_UNINITIALIZED;
919 	newmap->epm_ver = ETM_PROTO_V1;	/* Currently support one proto ver */
920 	newmap->epm_txbusy = 0;
921 
922 	(void) pthread_mutex_init(&newmap->epm_lock, NULL);
923 	(void) pthread_cond_init(&newmap->epm_tx_cv, NULL);
924 
925 	if (etm_get_ep_nvl(hdl, newmap)) {
926 		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
927 		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
928 		return;
929 	}
930 
931 	(void) pthread_mutex_lock(&newmap->epm_lock);
932 
933 	if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str,
934 	    etm_cb_func, newmap)) == NULL) {
935 		fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n",
936 		    newmap->epm_ep_str);
937 		etm_free_ep_nvl(hdl, newmap);
938 		(void) pthread_mutex_unlock(&newmap->epm_lock);
939 		(void) pthread_mutex_destroy(&newmap->epm_lock);
940 		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
941 		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
942 		return;
943 	}
944 
945 	if (IS_CLIENT(newmap)) {
946 		if (etm_handle_startup(hdl, newmap)) {
947 			/*
948 			 * For whatever reason, we could not complete the
949 			 * startup handshake with the server.  Set the timer
950 			 * and try again.
951 			 */
952 			if (newmap->epm_oconn != NULL) {
953 				(void) etm_xport_close(hdl, newmap->epm_oconn);
954 				newmap->epm_oconn = NULL;
955 			}
956 			newmap->epm_cstat = C_UNINITIALIZED;
957 			newmap->epm_qstat = Q_UNINITIALIZED;
958 			newmap->epm_timer_id = fmd_timer_install(hdl, newmap,
959 			    NULL, Reconn_interval);
960 			newmap->epm_timer_in_use = 1;
961 		}
962 	} else {
963 		/*
964 		 * We may be restarting after a crash.  If so, the client
965 		 * may be unaware of this.
966 		 */
967 		etm_send_shutdown(hdl, newmap);
968 	}
969 
970 	/* Add this transport instance handle to the list */
971 	newmap->epm_next = Epmap_head;
972 	Epmap_head = newmap;
973 
974 	(void) pthread_mutex_unlock(&newmap->epm_lock);
975 
976 	INCRSTAT(Etm_stats.peer_count.fmds_value.ui64);
977 }
978 
979 /*
980  * Parse the given property list string and call etm_init_epmap
981  * for each endpoint.
982  */
983 static void
984 etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags)
985 {
986 	char *epstr, *ep, *prefix, *lasts, *numstr;
987 	char epname[MAXPATHLEN];
988 	size_t slen, nlen;
989 	int beg, end, i;
990 
991 	if (eplist == NULL)
992 		return;
993 	/*
994 	 * Create a copy of eplist for parsing.
995 	 * strtok/strtok_r(3C) will insert null chars to the string.
996 	 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used.
997 	 */
998 	slen = strlen(eplist);
999 	epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP);
1000 	(void) strcpy(epstr, eplist);
1001 
1002 	/*
1003 	 * The following are supported for the "client_list" and
1004 	 * "server_list" properties :
1005 	 *
1006 	 *    A space-separated list of endpoints.
1007 	 *	"dev:///dom0 dev:///dom1 dev:///dom2"
1008 	 *
1009 	 *    An array syntax for a range of instances.
1010 	 *	"dev:///dom[0:2]"
1011 	 *
1012 	 *    A combination of both.
1013 	 *	"dev:///dom0 dev:///dom[1:2]"
1014 	 */
1015 	ep = strtok_r(epstr, " ", &lasts);
1016 	while (ep != NULL) {
1017 		if (strchr(ep, '[') != NULL) {
1018 			/*
1019 			 * This string is using array syntax.
1020 			 * Check the string for correct syntax.
1021 			 */
1022 			if ((strchr(ep, ':') == NULL) ||
1023 			    (strchr(ep, ']') == NULL)) {
1024 				fmd_hdl_error(hdl, "Syntax error in property "
1025 				    "that includes : %s\n", ep);
1026 				ep = strtok_r(NULL, " ", &lasts);
1027 				continue;
1028 			}
1029 
1030 			/* expand the array syntax */
1031 			prefix = strtok(ep, "[");
1032 
1033 			numstr = strtok(NULL, ":");
1034 			if ((numstr == NULL) || (!isdigit(*numstr))) {
1035 				fmd_hdl_error(hdl, "Syntax error in property "
1036 				    "that includes : %s[\n", prefix);
1037 				ep = strtok_r(NULL, " ", &lasts);
1038 				continue;
1039 			}
1040 			beg = atoi(numstr);
1041 
1042 			numstr = strtok(NULL, "]");
1043 			if ((numstr == NULL) || (!isdigit(*numstr))) {
1044 				fmd_hdl_error(hdl, "Syntax error in property "
1045 				    "that includes : %s[\n", prefix);
1046 				ep = strtok_r(NULL, " ", &lasts);
1047 				continue;
1048 			}
1049 			end = atoi(numstr);
1050 
1051 			nlen = strlen(prefix) + ETM_EP_INST_MAX;
1052 
1053 			if (nlen > MAXPATHLEN) {
1054 				fmd_hdl_error(hdl, "Endpoint prop string "
1055 				    "exceeds MAXPATHLEN\n");
1056 				ep = strtok_r(NULL, " ", &lasts);
1057 				continue;
1058 			}
1059 
1060 			for (i = beg; i <= end; i++) {
1061 				bzero(epname, MAXPATHLEN);
1062 				(void) snprintf(epname, nlen, "%s%d",
1063 				    prefix, i);
1064 				etm_init_epmap(hdl, epname, flags);
1065 			}
1066 		} else {
1067 			etm_init_epmap(hdl, ep, flags);
1068 		}
1069 
1070 		ep = strtok_r(NULL, " ", &lasts);
1071 	}
1072 
1073 	fmd_hdl_free(hdl, epstr, slen + 1);
1074 }
1075 
1076 /*
1077  * Free the transport infrastructure for an endpoint.
1078  */
1079 static void
1080 etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp)
1081 {
1082 	size_t hdrlen;
1083 	char hbuf[ETM_HDRLEN];
1084 
1085 	(void) pthread_mutex_lock(&mp->epm_lock);
1086 
1087 	/*
1088 	 * If an etm_send thread is in progress, wait for it to finish.
1089 	 * The etm_recv thread is managed by the transport layer and will
1090 	 * be destroyed with etm_xport_fini().
1091 	 */
1092 	while (mp->epm_txbusy)
1093 		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
1094 
1095 	if (mp->epm_timer_in_use)
1096 		fmd_timer_remove(hdl, mp->epm_timer_id);
1097 
1098 	if (mp->epm_oconn != NULL) {
1099 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
1100 		    ETM_HDR_SHUTDOWN, 0);
1101 		(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
1102 		    hdrlen);
1103 		(void) etm_xport_close(hdl, mp->epm_oconn);
1104 		mp->epm_oconn = NULL;
1105 	}
1106 
1107 	if (mp->epm_xprthdl != NULL) {
1108 		fmd_xprt_close(hdl, mp->epm_xprthdl);
1109 		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1110 		mp->epm_ep_nvl = NULL;
1111 	}
1112 
1113 	if (mp->epm_ep_nvl != NULL)
1114 		etm_free_ep_nvl(hdl, mp);
1115 
1116 	if (mp->epm_tlhdl != NULL)
1117 		(void) etm_xport_fini(hdl, mp->epm_tlhdl);
1118 
1119 	(void) pthread_mutex_unlock(&mp->epm_lock);
1120 	(void) pthread_mutex_destroy(&mp->epm_lock);
1121 	fmd_hdl_strfree(hdl, mp->epm_ep_str);
1122 	fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t));
1123 	DECRSTAT(Etm_stats.peer_count.fmds_value.ui64);
1124 }
1125 
1126 /*
1127  * FMD entry points
1128  */
1129 
1130 /*
1131  * FMD fmdo_send entry point.
1132  * Send an event to the remote endpoint and receive an ACK.
1133  */
1134 static int
1135 etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl)
1136 {
1137 	etm_epmap_t *mp;
1138 	nvlist_t *msgnvl;
1139 	int hdrstat, rv, cnt = 0;
1140 	char *buf, *nvbuf, *class;
1141 	size_t nvsize, buflen, hdrlen;
1142 	struct timespec tms;
1143 
1144 	(void) pthread_mutex_lock(&Etm_mod_lock);
1145 	if (Etm_exit) {
1146 		(void) pthread_mutex_unlock(&Etm_mod_lock);
1147 		return (FMD_SEND_RETRY);
1148 	}
1149 	(void) pthread_mutex_unlock(&Etm_mod_lock);
1150 
1151 	mp = fmd_xprt_getspecific(hdl, xprthdl);
1152 
1153 	for (;;) {
1154 		if (pthread_mutex_trylock(&mp->epm_lock) == 0) {
1155 			break;
1156 		} else {
1157 			/*
1158 			 * Another thread may be (1) trying to close this
1159 			 * fmd_xprt_t, or (2) posting an event to it.
1160 			 * If (1), don't want to spend too much time here.
1161 			 * If (2), allow it to finish and release epm_lock.
1162 			 */
1163 			if (cnt++ < 10) {
1164 				tms.tv_sec = 0;
1165 				tms.tv_nsec = (cnt * 10000);
1166 				(void) nanosleep(&tms, NULL);
1167 
1168 			} else {
1169 				return (FMD_SEND_RETRY);
1170 			}
1171 		}
1172 	}
1173 
1174 	mp->epm_txbusy++;
1175 
1176 	if (mp->epm_qstat == Q_UNINITIALIZED) {
1177 		mp->epm_txbusy--;
1178 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1179 		(void) pthread_mutex_unlock(&mp->epm_lock);
1180 		return (FMD_SEND_FAILED);
1181 	}
1182 
1183 	if (mp->epm_cstat == C_CLOSED) {
1184 		etm_suspend_reconnect(hdl, mp);
1185 		mp->epm_txbusy--;
1186 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1187 		(void) pthread_mutex_unlock(&mp->epm_lock);
1188 		return (FMD_SEND_RETRY);
1189 	}
1190 
1191 	if (mp->epm_cstat == C_LIMBO) {
1192 		if (mp->epm_oconn != NULL) {
1193 			(void) etm_xport_close(hdl, mp->epm_oconn);
1194 			mp->epm_oconn = NULL;
1195 		}
1196 
1197 		fmd_xprt_suspend(hdl, xprthdl);
1198 		mp->epm_qstat = Q_SUSPENDED;
1199 		mp->epm_txbusy--;
1200 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1201 		(void) pthread_mutex_unlock(&mp->epm_lock);
1202 		fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);
1203 		return (FMD_SEND_RETRY);
1204 	}
1205 
1206 	if (mp->epm_oconn == NULL) {
1207 		if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl))
1208 		    == NULL) {
1209 			etm_suspend_reconnect(hdl, mp);
1210 			mp->epm_txbusy--;
1211 			(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1212 			(void) pthread_mutex_unlock(&mp->epm_lock);
1213 			return (FMD_SEND_RETRY);
1214 		} else {
1215 			mp->epm_cstat = C_OPEN;
1216 		}
1217 	}
1218 
1219 	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0)
1220 		fmd_hdl_abort(hdl, "No class string in nvlist");
1221 
1222 	msgnvl = fmd_xprt_translate(hdl, xprthdl, ep);
1223 	if (msgnvl == NULL) {
1224 		mp->epm_txbusy--;
1225 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1226 		(void) pthread_mutex_unlock(&mp->epm_lock);
1227 		fmd_hdl_error(hdl, "Failed to translate event %p\n",
1228 		    (void *) ep);
1229 		return (FMD_SEND_FAILED);
1230 	}
1231 
1232 	rv = etm_xport_send_filter(hdl, msgnvl, mp->epm_ep_str);
1233 	if (rv == ETM_XPORT_FILTER_DROP) {
1234 		mp->epm_txbusy--;
1235 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1236 		(void) pthread_mutex_unlock(&mp->epm_lock);
1237 		fmd_hdl_debug(hdl, "send_filter dropped event");
1238 		nvlist_free(msgnvl);
1239 		INCRSTAT(Etm_stats.send_filter.fmds_value.ui64);
1240 		return (FMD_SEND_SUCCESS);
1241 	} else if (rv == ETM_XPORT_FILTER_ERROR) {
1242 		fmd_hdl_debug(hdl, "send_filter error : %s", strerror(errno));
1243 		INCRSTAT(Etm_stats.error_send_filter.fmds_value.ui64);
1244 		/* Still send event */
1245 	}
1246 
1247 	(void) pthread_mutex_unlock(&mp->epm_lock);
1248 
1249 	(void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR);
1250 
1251 	hdrlen = ETM_HDRLEN;
1252 	buflen = nvsize + hdrlen;
1253 
1254 	ALLOC_BUF(hdl, buf, buflen);
1255 
1256 	nvbuf = buf + hdrlen;
1257 
1258 	(void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize);
1259 
1260 	if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) {
1261 		(void) pthread_mutex_lock(&mp->epm_lock);
1262 		mp->epm_txbusy--;
1263 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1264 		(void) pthread_mutex_unlock(&mp->epm_lock);
1265 		fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv));
1266 		nvlist_free(msgnvl);
1267 		FREE_BUF(hdl, buf, buflen);
1268 		return (FMD_SEND_FAILED);
1269 	}
1270 
1271 	nvlist_free(msgnvl);
1272 
1273 	if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf,
1274 	    buflen) != buflen) {
1275 		fmd_hdl_debug(hdl, "failed to send message to %s",
1276 		    mp->epm_ep_str);
1277 		(void) pthread_mutex_lock(&mp->epm_lock);
1278 		etm_suspend_reconnect(hdl, mp);
1279 		mp->epm_txbusy--;
1280 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1281 		(void) pthread_mutex_unlock(&mp->epm_lock);
1282 		FREE_BUF(hdl, buf, buflen);
1283 		INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
1284 		return (FMD_SEND_RETRY);
1285 	}
1286 
1287 	INCRSTAT(Etm_stats.write_msg.fmds_value.ui64);
1288 	ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize);
1289 
1290 	etm_hex_dump(hdl, nvbuf, nvsize, 1);
1291 
1292 	if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf,
1293 	    hdrlen) != hdrlen) {
1294 		fmd_hdl_debug(hdl, "failed to read ACK from %s",
1295 		    mp->epm_ep_str);
1296 		(void) pthread_mutex_lock(&mp->epm_lock);
1297 		etm_suspend_reconnect(hdl, mp);
1298 		mp->epm_txbusy--;
1299 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1300 		(void) pthread_mutex_unlock(&mp->epm_lock);
1301 		FREE_BUF(hdl, buf, buflen);
1302 		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
1303 		return (FMD_SEND_RETRY);
1304 	}
1305 
1306 	hdrstat = etm_check_hdr(hdl, mp, buf);
1307 	FREE_BUF(hdl, buf, buflen);
1308 
1309 	if (hdrstat == ETM_HDR_ACK) {
1310 		INCRSTAT(Etm_stats.read_ack.fmds_value.ui64);
1311 	} else {
1312 		(void) pthread_mutex_lock(&mp->epm_lock);
1313 
1314 		(void) etm_xport_close(hdl, mp->epm_oconn);
1315 		mp->epm_oconn = NULL;
1316 
1317 		if (hdrstat == ETM_HDR_NAK) {
1318 			/* Peer received a bad value in the header */
1319 			if (mp->epm_xprthdl != NULL) {
1320 				mp->epm_cstat = C_LIMBO;
1321 				fmd_xprt_suspend(hdl, xprthdl);
1322 				mp->epm_qstat = Q_SUSPENDED;
1323 				fmd_hdl_debug(hdl, "received NAK, queue "
1324 				    "suspended for %s", mp->epm_ep_str);
1325 			}
1326 
1327 			rv = FMD_SEND_RETRY;
1328 
1329 		} else if (hdrstat == ETM_HDR_S_RESTART) {
1330 			/* Server has restarted */
1331 			mp->epm_cstat = C_CLOSED;
1332 			mp->epm_qstat = Q_UNINITIALIZED;
1333 			fmd_hdl_debug(hdl, "server %s restarted",
1334 			    mp->epm_ep_str);
1335 			/*
1336 			 * Cannot call fmd_xprt_close here, so we'll do it
1337 			 * on the timeout thread.
1338 			 */
1339 			if (mp->epm_timer_in_use == 0) {
1340 				mp->epm_timer_id = fmd_timer_install(
1341 				    hdl, mp, NULL, 0);
1342 				mp->epm_timer_in_use = 1;
1343 			}
1344 
1345 			/*
1346 			 * fault.* or list.* events will be replayed if a
1347 			 * transport is opened with the same auth.
1348 			 * Other events will be discarded.
1349 			 */
1350 			rv = FMD_SEND_FAILED;
1351 
1352 		} else {
1353 			mp->epm_cstat = C_CLOSED;
1354 			fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str);
1355 
1356 			rv = FMD_SEND_RETRY;
1357 		}
1358 
1359 		mp->epm_txbusy--;
1360 
1361 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1362 		(void) pthread_mutex_unlock(&mp->epm_lock);
1363 
1364 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
1365 
1366 		return (rv);
1367 	}
1368 
1369 	(void) pthread_mutex_lock(&mp->epm_lock);
1370 	mp->epm_txbusy--;
1371 	(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1372 	(void) pthread_mutex_unlock(&mp->epm_lock);
1373 
1374 	return (FMD_SEND_SUCCESS);
1375 }
1376 
1377 /*
1378  * FMD fmdo_timeout entry point..
1379  */
1380 /*ARGSUSED*/
1381 static void
1382 etm_timeout(fmd_hdl_t *hdl, id_t id, void *data)
1383 {
1384 	etm_epmap_t *mp = (etm_epmap_t *)data;
1385 
1386 	(void) pthread_mutex_lock(&mp->epm_lock);
1387 
1388 	mp->epm_timer_in_use = 0;
1389 
1390 	if (mp->epm_qstat == Q_UNINITIALIZED) {
1391 		/* Server has shutdown and we (client) need to reconnect */
1392 		if (mp->epm_xprthdl != NULL) {
1393 			fmd_xprt_close(hdl, mp->epm_xprthdl);
1394 			fmd_hdl_debug(hdl, "queue closed for %s",
1395 			    mp->epm_ep_str);
1396 			mp->epm_xprthdl = NULL;
1397 			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1398 			mp->epm_ep_nvl = NULL;
1399 		}
1400 
1401 		if (mp->epm_ep_nvl == NULL)
1402 			(void) etm_get_ep_nvl(hdl, mp);
1403 
1404 		if (etm_handle_startup(hdl, mp)) {
1405 			if (mp->epm_oconn != NULL) {
1406 				(void) etm_xport_close(hdl, mp->epm_oconn);
1407 				mp->epm_oconn = NULL;
1408 			}
1409 			mp->epm_cstat = C_UNINITIALIZED;
1410 			mp->epm_qstat = Q_UNINITIALIZED;
1411 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
1412 			    Reconn_interval);
1413 			mp->epm_timer_in_use = 1;
1414 		}
1415 	} else {
1416 		etm_reconnect(hdl, mp);
1417 	}
1418 
1419 	(void) pthread_mutex_unlock(&mp->epm_lock);
1420 }
1421 
1422 /*
1423  * FMD Module declarations
1424  */
/*
 * Entry-point table for this module, referenced from etm_info below.
 * Only the timeout (etm_timeout) and send (etm_send) entry points are
 * implemented; every other slot is left NULL.  The initializer is
 * positional, so the order of the entries must not be changed.
 */
1425 static const fmd_hdl_ops_t etm_ops = {
1426 	NULL,		/* fmdo_recv */
1427 	etm_timeout,	/* fmdo_timeout */
1428 	NULL,		/* fmdo_close */
1429 	NULL,		/* fmdo_stats */
1430 	NULL,		/* fmdo_gc */
1431 	etm_send,	/* fmdo_send */
1432 };
1433 
/*
 * Module properties: { name, type, default value as a string }.
 * The table is terminated by an entry whose name is NULL.
 *
 * "client_list" and "server_list" have no default; _fmd_init() hands
 * each to etm_create_epmaps() (client_list with ETM_SERVER_XPRT_FLAGS,
 * server_list with ETM_CLIENT_XPRT_FLAGS).  "reconnect_interval",
 * "reconnect_timeout" and "rw_timeout" are read by _fmd_init() into
 * Reconn_interval, Reconn_timeout and Rw_timeout.
 * NOTE(review): the three timing defaults look like nanosecond values
 * (10s, 300s and 2s) -- confirm against the fmd timer/transport APIs.
 * "filter_path" is not read anywhere in this part of the file.
 */
1434 static const fmd_prop_t etm_props[] = {
1435 	{ "client_list", FMD_TYPE_STRING, NULL },
1436 	{ "server_list", FMD_TYPE_STRING, NULL },
1437 	{ "reconnect_interval",	FMD_TYPE_UINT64, "10000000000" },
1438 	{ "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" },
1439 	{ "rw_timeout", FMD_TYPE_UINT64, "2000000000" },
1440 	{ "filter_path", FMD_TYPE_STRING, NULL },
1441 	{ NULL, 0, NULL }
1442 };
1443 
/*
 * Module information passed to fmd_hdl_register() in _fmd_init():
 * two strings (presumably the module description and version -- confirm
 * against the fmd_hdl_info_t definition), the entry-point table and the
 * property table defined above.
 */
1444 static const fmd_hdl_info_t etm_info = {
1445 	"Event Transport Module", "2.0", &etm_ops, etm_props
1446 };
1447 
1448 /*
1449  * Initialize the transport for use by ETM.
1450  */
1451 void
1452 _fmd_init(fmd_hdl_t *hdl)
1453 {
1454 	char *propstr;
1455 
1456 	if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) {
1457 		return; /* invalid data in configuration file */
1458 	}
1459 
1460 	/* Create global stats */
1461 	(void) fmd_stat_create(hdl, FMD_STAT_NOALLOC,
1462 	    sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats);
1463 
1464 	/* Get module properties */
1465 	Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout");
1466 	Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval");
1467 	Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout");
1468 
1469 	propstr = fmd_prop_get_string(hdl, "client_list");
1470 	etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS);
1471 	fmd_prop_free_string(hdl, propstr);
1472 
1473 	propstr = fmd_prop_get_string(hdl, "server_list");
1474 	etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS);
1475 	fmd_prop_free_string(hdl, propstr);
1476 
1477 	if (Etm_stats.peer_count.fmds_value.ui64 == 0) {
1478 		fmd_hdl_debug(hdl, "Failed to init any endpoint\n");
1479 		fmd_hdl_unregister(hdl);
1480 		return;
1481 	}
1482 }
1483 
1484 /*
1485  * Teardown the transport
1486  */
1487 void
1488 _fmd_fini(fmd_hdl_t *hdl)
1489 {
1490 	etm_epmap_t *mp, *next;
1491 
1492 	(void) pthread_mutex_lock(&Etm_mod_lock);
1493 	Etm_exit = 1;
1494 	(void) pthread_mutex_unlock(&Etm_mod_lock);
1495 
1496 	mp = Epmap_head;
1497 
1498 	while (mp) {
1499 		next = mp->epm_next;
1500 		etm_free_epmap(hdl, mp);
1501 		mp = next;
1502 	}
1503 
1504 	fmd_hdl_unregister(hdl);
1505 }
1506