xref: /illumos-gate/usr/src/cmd/fm/modules/common/event-transport/etm.c (revision d12abe7ce2663ac39e686a14960eb4febf560195)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 /*
30  * FMA Event Transport Module
31  *
32  * Plugin for sending/receiving FMA events to/from a remote endpoint.
33  */
34 
35 #include <netinet/in.h>
36 #include <sys/fm/protocol.h>
37 #include <sys/sysmacros.h>
38 #include <pthread.h>
39 #include <strings.h>
40 #include <ctype.h>
41 #include <link.h>
42 #include <libnvpair.h>
43 #include "etm_xport_api.h"
44 #include "etm_proto.h"
45 
46 /*
47  * ETM declarations
48  */
49 
/*
 * State of the outgoing connection to a peer endpoint.
 */
typedef enum etm_connection_status {
	C_UNINITIALIZED = 0,	/* No connection established (yet) */
	C_OPEN = 1,		/* Connection is open */
	C_CLOSED = 2,		/* Connection is closed */
	C_LIMBO = 3,		/* Bad value in header from peer */
	C_TIMED_OUT = 4		/* Reconnection to peer timed out */
} etm_connstat_t;
57 
/*
 * State of the fmd transport queue for a peer endpoint.  Values start at
 * 100, presumably to keep them distinct from etm_connstat_t values.
 */
typedef enum etm_fmd_queue_status {
	Q_UNINITIALIZED = 100,	/* No fmd transport queue */
	Q_INIT_PENDING = 101,	/* Queue initialization in progress */
	Q_OPEN = 102,		/* Queue is open */
	Q_SUSPENDED = 103	/* Queue is suspended */
} etm_qstat_t;
64 
65 /* Per endpoint data */
66 typedef struct etm_endpoint_map {
67 	uint8_t epm_ver;		/* Protocol version being used */
68 	char *epm_ep_str;		/* Endpoint ID string */
69 	int epm_xprtflags;		/* FMD transport open flags */
70 	etm_xport_hdl_t epm_tlhdl;	/* Transport Layer instance handle */
71 	pthread_mutex_t epm_lock;	/* Protects remainder of struct */
72 	pthread_cond_t epm_tx_cv;	/* Cond var for send/transmit */
73 	int epm_txbusy;			/* Busy doing send/transmit */
74 	fmd_xprt_t *epm_xprthdl;	/* FMD transport handle */
75 	etm_qstat_t epm_qstat;		/* Status of fmd xprt queue */
76 	nvlist_t *epm_ep_nvl;		/* Endpoint ID nv_list */
77 	etm_xport_conn_t epm_oconn;	/* Connection for outgoing events */
78 	etm_connstat_t epm_cstat;	/* Status of connection */
79 	id_t epm_timer_id;		/* Timer id */
80 	int epm_timer_in_use;		/* Indicates if timer is in use */
81 	hrtime_t epm_reconn_end;	/* Reconnection end time */
82 	struct etm_endpoint_map *epm_next;
83 } etm_epmap_t;
84 
#define	ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1)
#define	ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2)
#define	ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3)
/* Chars reserved for an endpoint instance number, including the NUL */
#define	ETM_EP_INST_MAX 4
#define	ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR
#define	ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT)

/*
 * Buffer helpers.  Each expands to a single expression, so the call site
 * supplies the terminating semicolon like any ordinary function call.
 */
#define	ALLOC_BUF(hdl, buf, size) \
	((buf) = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP))

#define	FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size))

/* Non-zero if mp was set up as a client, i.e. without FMD_XPRT_ACCEPT */
#define	IS_CLIENT(mp)	(((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1)

/*
 * Statistic updates, serialized by Etm_mod_lock.  Wrapped in
 * do { } while (0) so each use is a single statement and remains safe in
 * an unbraced if/else.
 */
#define	INCRSTAT(x)	do {					\
		(void) pthread_mutex_lock(&Etm_mod_lock);	\
		(x)++;						\
		(void) pthread_mutex_unlock(&Etm_mod_lock);	\
	} while (0)

#define	DECRSTAT(x)	do {					\
		(void) pthread_mutex_lock(&Etm_mod_lock);	\
		(x)--;						\
		(void) pthread_mutex_unlock(&Etm_mod_lock);	\
	} while (0)

#define	ADDSTAT(x, y)	do {					\
		(void) pthread_mutex_lock(&Etm_mod_lock);	\
		(x) += (y);					\
		(void) pthread_mutex_unlock(&Etm_mod_lock);	\
	} while (0)
113 
114 /*
115  * Global variables
116  */
117 static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER;
118 					/* Protects globals */
119 static hrtime_t Reconn_interval;	/* Time between reconnection attempts */
120 static hrtime_t Reconn_timeout;		/* Time allowed for reconnection */
121 static hrtime_t Rw_timeout;		/* Time allowed for I/O operation  */
122 static int Etm_dump = 0;		/* Enables hex dump for debug */
123 static int Etm_exit = 0;		/* Flag for exit */
124 static etm_epmap_t *Epmap_head = NULL;	/* Head of list of epmap structs */
125 
126 /* Module statistics */
127 static struct etm_stats {
128 	/* read counters */
129 	fmd_stat_t read_ack;
130 	fmd_stat_t read_bytes;
131 	fmd_stat_t read_msg;
132 	/* write counters */
133 	fmd_stat_t write_ack;
134 	fmd_stat_t write_bytes;
135 	fmd_stat_t write_msg;
136 	/* error counters */
137 	fmd_stat_t error_protocol;
138 	fmd_stat_t error_drop_read;
139 	fmd_stat_t error_read;
140 	fmd_stat_t error_read_badhdr;
141 	fmd_stat_t error_write;
142 	/* misc */
143 	fmd_stat_t peer_count;
144 
145 } Etm_stats = {
146 	/* read counters */
147 	{ "read_ack", FMD_TYPE_UINT64, "ACKs read" },
148 	{ "read_bytes", FMD_TYPE_UINT64, "Bytes read" },
149 	{ "read_msg", FMD_TYPE_UINT64, "Messages read" },
150 	/* write counters */
151 	{ "write_ack", FMD_TYPE_UINT64, "ACKs sent" },
152 	{ "write_bytes", FMD_TYPE_UINT64, "Bytes sent" },
153 	{ "write_msg", FMD_TYPE_UINT64, "Messages sent" },
154 	/* ETM error counters */
155 	{ "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" },
156 	{ "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" },
157 	{ "error_read", FMD_TYPE_UINT64, "Read I/O errors" },
158 	{ "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" },
159 	{ "error_write", FMD_TYPE_UINT64, "Write I/O errors" },
160 	/* ETM Misc */
161 	{ "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" },
162 };
163 
164 /*
165  * ETM Private functions
166  */
167 
168 /*
169  * Hex dump for debug.
170  */
171 static void
172 etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction)
173 {
174 	int i, j, k;
175 	int16_t *c;
176 
177 	if (Etm_dump == 0)
178 		return;
179 
180 	j = buflen / 16;	/* Number of complete 8-column rows */
181 	k = buflen % 16;	/* Is there a last (non-8-column) row? */
182 
183 	if (direction)
184 		fmd_hdl_debug(hdl, "--- WRITE Message Dump ---");
185 	else
186 		fmd_hdl_debug(hdl, "---  READ Message Dump ---");
187 
188 	fmd_hdl_debug(hdl, "   Displaying %d bytes", buflen);
189 
190 	/* Dump the complete 8-column rows */
191 	for (i = 0; i < j; i++) {
192 		c = (int16_t *)buf + (i * 8);
193 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x %4x %4x", i,
194 		    *(c+0), *(c+1), *(c+2), *(c+3),
195 		    *(c+4), *(c+5), *(c+6), *(c+7));
196 	}
197 
198 	/* Dump the last (incomplete) row */
199 	c = (int16_t *)buf + (i * 8);
200 	switch (k) {
201 	case 4:
202 		fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1));
203 		break;
204 	case 8:
205 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1),
206 		    *(c+2), *(c+3));
207 		break;
208 	case 12:
209 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x", i, *(c+0),
210 		    *(c+1), *(c+2), *(c+3), *(c+4), *(c+5));
211 		break;
212 	}
213 
214 	fmd_hdl_debug(hdl, "---      End Dump      ---");
215 }
216 
217 /*
218  * Provide the length of a message based on the data in the given ETM header.
219  */
220 static size_t
221 etm_get_msglen(void *buf)
222 {
223 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
224 
225 	return (ntohl(hp->hdr_msglen));
226 }
227 
228 /*
229  * Check the contents of the ETM header for errors.
230  * Return the header type (hdr_type).
231  */
232 static int
233 etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf)
234 {
235 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
236 
237 	if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) {
238 		fmd_hdl_error(hdl, "Bad delimiter in ETM header from %s "
239 		    ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim);
240 		return (ETM_HDR_INVALID);
241 	}
242 
243 	if ((hp->hdr_type == ETM_HDR_C_HELLO) ||
244 	    (hp->hdr_type == ETM_HDR_S_HELLO)) {
245 		/* Until version is negotiated, other fields may be wrong */
246 		return (hp->hdr_type);
247 	}
248 
249 	if (hp->hdr_ver != mp->epm_ver) {
250 		fmd_hdl_error(hdl, "Bad version in ETM header from %s : 0x%x\n",
251 		    mp->epm_ep_str, hp->hdr_ver);
252 		return (ETM_HDR_BADVERSION);
253 	}
254 
255 	if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) ||
256 	    (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) {
257 		fmd_hdl_error(hdl, "Bad type in ETM header from %s : 0x%x\n",
258 		    mp->epm_ep_str, hp->hdr_type);
259 		return (ETM_HDR_BADTYPE);
260 	}
261 
262 	return (hp->hdr_type);
263 }
264 
265 /*
266  * Create an ETM header of a given type in the given buffer.
267  * Return length of header.
268  */
269 static size_t
270 etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen)
271 {
272 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
273 
274 	bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN);
275 	hp->hdr_ver = ver;
276 	hp->hdr_type = type;
277 	hp->hdr_msglen = htonl(msglen);
278 
279 	return (ETM_HDRLEN);
280 }
281 
/*
 * Convert message bytes to nvlist and post to fmd.
 *
 * Returns 0 when the event was posted, 1 when buf could not be unpacked,
 * 2 when the fmd transport queue is neither open nor suspended, and 3 when
 * the module is exiting.  A suspended queue is resumed (and any pending
 * reconnection timer removed) before posting.
 *
 * Note : nvl is free'd by fmd once posted; if it is not posted it is
 * freed here.
 */
static int
etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen)
{
	nvlist_t *nvl;
	int rv;

	if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) {
		fmd_hdl_debug(hdl, "failed to unpack message");
		return (1);
	}

	(void) pthread_mutex_lock(&mp->epm_lock);
	(void) pthread_mutex_lock(&Etm_mod_lock);
	if (!Etm_exit) {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		if (mp->epm_qstat == Q_OPEN) {
			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
			rv = 0;
		} else if (mp->epm_qstat == Q_SUSPENDED) {
			fmd_xprt_resume(hdl, mp->epm_xprthdl);
			if (mp->epm_timer_in_use) {
				fmd_timer_remove(hdl, mp->epm_timer_id);
				mp->epm_timer_in_use = 0;
			}
			mp->epm_qstat = Q_OPEN;
			fmd_hdl_debug(hdl, "queue resumed for %s",
			    mp->epm_ep_str);
			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
			rv = 0;
		} else {
			fmd_hdl_debug(hdl, "unable to post message, qstat = %d",
			    mp->epm_qstat);
			rv = 2;
		}
	} else {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		fmd_hdl_debug(hdl, "unable to post message, module exiting");
		rv = 3;
	}

	(void) pthread_mutex_unlock(&mp->epm_lock);

	/*
	 * Ownership of nvl passes to fmd only through fmd_xprt_post().  If
	 * it was not posted it is still ours, so free it to avoid a leak.
	 */
	if (rv != 0)
		nvlist_free(nvl);

	return (rv);
}
332 
333 /*
334  * Handle the startup handshake to the server.  The client always initiates
335  * the startup handshake.  In the following sequence, we are the client and
336  * the remote endpoint is the server.
337  *
338  *	Client sends C_HELLO and transitions to Q_INIT_PENDING state.
339  *	Server sends S_HELLO and transitions to Q_INIT_PENDING state.
340  *	Client sends ACK and transitions to Q_OPEN state.
341  *	Server receives ACK and transitions to Q_OPEN state.
342  *
343  * Return 0 for success, nonzero for failure.
344  */
345 static int
346 etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp)
347 {
348 	etm_proto_hdr_t *hp;
349 	size_t hdrlen = ETM_HDRLEN;
350 	int hdrstat;
351 	char hbuf[ETM_HDRLEN];
352 
353 	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
354 		return (1);
355 
356 	mp->epm_cstat = C_OPEN;
357 
358 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0);
359 
360 	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
361 	    hdrlen)) != hdrlen) {
362 		fmd_hdl_error(hdl, "Failed to write C_HELLO to %s",
363 		    mp->epm_ep_str);
364 		return (2);
365 	}
366 
367 	mp->epm_qstat = Q_INIT_PENDING;
368 
369 	if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf,
370 	    hdrlen)) != hdrlen) {
371 		fmd_hdl_error(hdl, "Failed to read S_HELLO from %s",
372 		    mp->epm_ep_str);
373 		return (3);
374 	}
375 
376 	hdrstat = etm_check_hdr(hdl, mp, hbuf);
377 
378 	if (hdrstat != ETM_HDR_S_HELLO) {
379 		fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO "
380 		    "from %s", mp->epm_ep_str);
381 		return (4);
382 	}
383 
384 	/*
385 	 * Get version from the server.
386 	 * Currently, only one version is supported.
387 	 */
388 	hp = (etm_proto_hdr_t *)(void *)hbuf;
389 	if (hp->hdr_ver != ETM_PROTO_V1) {
390 		fmd_hdl_error(hdl, "Unable to use same version as %s : %d",
391 		    mp->epm_ep_str, hp->hdr_ver);
392 		return (5);
393 	}
394 	mp->epm_ver = hp->hdr_ver;
395 
396 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
397 
398 	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
399 	    hdrlen)) != hdrlen) {
400 		fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s",
401 		    mp->epm_ep_str);
402 		return (6);
403 	}
404 
405 	/*
406 	 * Call fmd_xprt_open and fmd_xprt_setspecific with
407 	 * Etm_mod_lock held to avoid race with etm_send thread.
408 	 */
409 	(void) pthread_mutex_lock(&Etm_mod_lock);
410 	if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags,
411 	    mp->epm_ep_nvl, NULL)) == NULL) {
412 		fmd_hdl_abort(hdl, "Failed to init xprthdl for %s",
413 		    mp->epm_ep_str);
414 	}
415 	fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
416 	(void) pthread_mutex_unlock(&Etm_mod_lock);
417 
418 	mp->epm_qstat = Q_OPEN;
419 	fmd_hdl_debug(hdl, "queue open for %s",  mp->epm_ep_str);
420 
421 	return (0);
422 }
423 
/*
 * Alloc a nvlist and add a string for the endpoint.  The nvlist is stored
 * in mp->epm_ep_nvl under the name "domain-id".
 *
 * Return zero for success, non-zero for failure.  On failure
 * mp->epm_ep_nvl is left NULL (never pointing at freed memory).
 */
static int
etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	/*
	 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation
	 * in fmd when this nvlist_t is free'd.
	 */
	if (nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0) != 0) {
		fmd_hdl_debug(hdl, "failed to allocate nvlist for %s",
		    mp->epm_ep_str);
		mp->epm_ep_nvl = NULL;
		return (1);
	}

	if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) {
		fmd_hdl_debug(hdl, "failed to add domain-id string to nvlist "
		    "for %s", mp->epm_ep_str);
		nvlist_free(mp->epm_ep_nvl);
		/* Callers test for NULL; don't leave a dangling pointer */
		mp->epm_ep_nvl = NULL;
		return (1);
	}

	return (0);
}
446 
/*
 * Free the nvlist for the endpoint_id string and clear the pointer so it
 * cannot be freed or used again.
 */
/*ARGSUSED*/
static void
etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	nvlist_free(mp->epm_ep_nvl);
	mp->epm_ep_nvl = NULL;
}
456 
457 /*
458  * Check for a duplicate endpoint/peer string.
459  */
460 /*ARGSUSED*/
461 static int
462 etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname)
463 {
464 	etm_epmap_t *mp;
465 
466 	for (mp = Epmap_head; mp != NULL; mp = mp->epm_next)
467 		if (strcmp(epname, mp->epm_ep_str) == 0)
468 			return (1);
469 
470 	return (0);
471 }
472 
473 /*
474  * Attempt to re-open a connection with the remote endpoint.
475  */
476 static void
477 etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
478 {
479 	if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) {
480 		if (gethrtime() < mp->epm_reconn_end) {
481 			if ((mp->epm_oconn = etm_xport_open(hdl,
482 			    mp->epm_tlhdl)) == NULL) {
483 				fmd_hdl_debug(hdl, "reconnect failed for %s",
484 				    mp->epm_ep_str);
485 				mp->epm_timer_id = fmd_timer_install(hdl, mp,
486 				    NULL, Reconn_interval);
487 				mp->epm_timer_in_use = 1;
488 			} else {
489 				fmd_hdl_debug(hdl, "reconnect success for %s",
490 				    mp->epm_ep_str);
491 				mp->epm_reconn_end = 0;
492 				mp->epm_cstat = C_OPEN;
493 			}
494 		} else {
495 			fmd_hdl_error(hdl, "Reconnect timed out for %s\n",
496 			    mp->epm_ep_str);
497 			mp->epm_reconn_end = 0;
498 			mp->epm_cstat = C_TIMED_OUT;
499 		}
500 	}
501 
502 	if (mp->epm_cstat == C_OPEN) {
503 		fmd_xprt_resume(hdl, mp->epm_xprthdl);
504 		mp->epm_qstat = Q_OPEN;
505 		fmd_hdl_debug(hdl, "queue resumed for %s",  mp->epm_ep_str);
506 	}
507 }
508 
509 /*
510  * Suspend a given connection and setup for reconnection retries.
511  */
512 static void
513 etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
514 {
515 	(void) pthread_mutex_lock(&Etm_mod_lock);
516 	if (Etm_exit) {
517 		(void) pthread_mutex_unlock(&Etm_mod_lock);
518 		return;
519 	}
520 	(void) pthread_mutex_unlock(&Etm_mod_lock);
521 
522 	(void) pthread_mutex_lock(&mp->epm_lock);
523 
524 	if (mp->epm_oconn != NULL) {
525 		(void) etm_xport_close(hdl, mp->epm_oconn);
526 		mp->epm_oconn = NULL;
527 	}
528 
529 	mp->epm_reconn_end = gethrtime() + Reconn_timeout;
530 	mp->epm_cstat = C_UNINITIALIZED;
531 
532 	if (mp->epm_xprthdl != NULL) {
533 		fmd_xprt_suspend(hdl, mp->epm_xprthdl);
534 		mp->epm_qstat = Q_SUSPENDED;
535 		fmd_hdl_debug(hdl, "queue suspended for %s",  mp->epm_ep_str);
536 
537 		if (mp->epm_timer_in_use == 0) {
538 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
539 			    Reconn_interval);
540 			mp->epm_timer_in_use = 1;
541 		}
542 	}
543 
544 	(void) pthread_mutex_unlock(&mp->epm_lock);
545 }
546 
547 /*
548  * Reinitialize the connection. The old fmd_xprt_t handle must be
549  * removed/closed first.
550  * Assume caller holds lock on epm_lock.
551  */
552 static void
553 etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp)
554 {
555 	/*
556 	 * To avoid a deadlock, wait for etm_send to finish before
557 	 * calling fmd_xprt_close()
558 	 */
559 	while (mp->epm_txbusy)
560 		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
561 
562 	if (mp->epm_xprthdl != NULL) {
563 		fmd_xprt_close(hdl, mp->epm_xprthdl);
564 		fmd_hdl_debug(hdl, "queue closed for %s",  mp->epm_ep_str);
565 		mp->epm_xprthdl = NULL;
566 		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
567 		mp->epm_ep_nvl = NULL;
568 	}
569 
570 	if (mp->epm_timer_in_use) {
571 		fmd_timer_remove(hdl, mp->epm_timer_id);
572 		mp->epm_timer_in_use = 0;
573 	}
574 
575 	if (mp->epm_oconn != NULL) {
576 		(void) etm_xport_close(hdl, mp->epm_oconn);
577 		mp->epm_oconn = NULL;
578 	}
579 
580 	mp->epm_cstat = C_UNINITIALIZED;
581 	mp->epm_qstat = Q_UNINITIALIZED;
582 }
583 
584 /*
585  * Receive data from ETM transport layer.
586  * Note : This is not the fmdo_recv entry point.
587  *
588  */
589 static int
590 etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp)
591 {
592 	size_t buflen, hdrlen;
593 	void *buf;
594 	char hbuf[ETM_HDRLEN];
595 	int hdrstat, rv;
596 
597 	hdrlen = ETM_HDRLEN;
598 
599 	if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) {
600 		fmd_hdl_debug(hdl, "failed to read header from %s",
601 		    mp->epm_ep_str);
602 		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
603 		return (EIO);
604 	}
605 
606 	hdrstat = etm_check_hdr(hdl, mp, hbuf);
607 
608 	switch (hdrstat) {
609 	case ETM_HDR_INVALID:
610 		(void) pthread_mutex_lock(&mp->epm_lock);
611 		if (mp->epm_cstat == C_OPEN)
612 			mp->epm_cstat = C_CLOSED;
613 		(void) pthread_mutex_unlock(&mp->epm_lock);
614 
615 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
616 		rv = ECANCELED;
617 		break;
618 
619 	case ETM_HDR_BADTYPE:
620 	case ETM_HDR_BADVERSION:
621 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0);
622 
623 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
624 		    hdrlen)) != hdrlen) {
625 			fmd_hdl_debug(hdl, "failed to write NAK to %s",
626 			    mp->epm_ep_str);
627 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
628 			return (EIO);
629 		}
630 
631 		(void) pthread_mutex_lock(&mp->epm_lock);
632 		mp->epm_cstat = C_LIMBO;
633 		(void) pthread_mutex_unlock(&mp->epm_lock);
634 
635 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
636 		rv = ENOTSUP;
637 		break;
638 
639 	case ETM_HDR_C_HELLO:
640 		/* Client is initiating a startup handshake */
641 		(void) pthread_mutex_lock(&mp->epm_lock);
642 		etm_reinit(hdl, mp);
643 		mp->epm_qstat = Q_INIT_PENDING;
644 		(void) pthread_mutex_unlock(&mp->epm_lock);
645 
646 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0);
647 
648 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
649 		    hdrlen)) != hdrlen) {
650 			fmd_hdl_debug(hdl, "failed to write S_HELLO to %s",
651 			    mp->epm_ep_str);
652 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
653 			return (EIO);
654 		}
655 
656 		rv = 0;
657 		break;
658 
659 	case ETM_HDR_ACK:
660 		(void) pthread_mutex_lock(&mp->epm_lock);
661 		if (mp->epm_qstat == Q_INIT_PENDING) {
662 			/* This is client's ACK from startup handshake */
663 			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
664 			if (mp->epm_ep_nvl == NULL)
665 				(void) etm_get_ep_nvl(hdl, mp);
666 
667 			/*
668 			 * Call fmd_xprt_open and fmd_xprt_setspecific with
669 			 * Etm_mod_lock held to avoid race with etm_send thread.
670 			 */
671 			(void) pthread_mutex_lock(&Etm_mod_lock);
672 			if ((mp->epm_xprthdl = fmd_xprt_open(hdl,
673 			    mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) {
674 				fmd_hdl_abort(hdl, "Failed to init xprthdl "
675 				    "for %s", mp->epm_ep_str);
676 			}
677 			fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
678 			(void) pthread_mutex_unlock(&Etm_mod_lock);
679 
680 			mp->epm_qstat = Q_OPEN;
681 			(void) pthread_mutex_unlock(&mp->epm_lock);
682 			fmd_hdl_debug(hdl, "queue open for %s",
683 			    mp->epm_ep_str);
684 		} else {
685 			(void) pthread_mutex_unlock(&mp->epm_lock);
686 			fmd_hdl_debug(hdl, "protocol error, not expecting ACK "
687 			    "from %s\n", mp->epm_ep_str);
688 			INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
689 		}
690 
691 		rv = 0;
692 		break;
693 
694 	case ETM_HDR_SHUTDOWN:
695 		fmd_hdl_debug(hdl, "received shutdown from %s",
696 		    mp->epm_ep_str);
697 
698 		(void) pthread_mutex_lock(&mp->epm_lock);
699 
700 		etm_reinit(hdl, mp);
701 
702 		if (IS_CLIENT(mp)) {
703 			/*
704 			 * A server shutdown is considered to be temporary.
705 			 * Prepare for reconnection.
706 			 */
707 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
708 			    Reconn_interval);
709 
710 			mp->epm_timer_in_use = 1;
711 		}
712 
713 		(void) pthread_mutex_unlock(&mp->epm_lock);
714 
715 		rv = ECANCELED;
716 		break;
717 
718 	case ETM_HDR_MSG:
719 		(void) pthread_mutex_lock(&mp->epm_lock);
720 		if (mp->epm_qstat == Q_UNINITIALIZED) {
721 			/* Peer (client) is unaware that we've restarted */
722 			(void) pthread_mutex_unlock(&mp->epm_lock);
723 			hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
724 			    ETM_HDR_S_RESTART, 0);
725 
726 			if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
727 			    hdrlen)) != hdrlen) {
728 				fmd_hdl_debug(hdl, "failed to write S_RESTART "
729 				    "to %s", mp->epm_ep_str);
730 				INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
731 				return (EIO);
732 			}
733 
734 			return (ECANCELED);
735 		}
736 		(void) pthread_mutex_unlock(&mp->epm_lock);
737 
738 		buflen = etm_get_msglen(hbuf);
739 		ALLOC_BUF(hdl, buf, buflen);
740 
741 		if (etm_xport_read(hdl, conn, Rw_timeout, buf,
742 		    buflen) != buflen) {
743 			fmd_hdl_debug(hdl, "failed to read message from %s",
744 			    mp->epm_ep_str);
745 			FREE_BUF(hdl, buf, buflen);
746 			INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
747 			return (EIO);
748 		}
749 
750 		INCRSTAT(Etm_stats.read_msg.fmds_value.ui64);
751 		ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen);
752 
753 		etm_hex_dump(hdl, buf, buflen, 0);
754 
755 		if (etm_post_msg(hdl, mp, buf, buflen)) {
756 			INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64);
757 			FREE_BUF(hdl, buf, buflen);
758 			return (EIO);
759 		}
760 
761 		FREE_BUF(hdl, buf, buflen);
762 
763 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
764 
765 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
766 		    hdrlen)) != hdrlen) {
767 			fmd_hdl_debug(hdl, "failed to write ACK to %s",
768 			    mp->epm_ep_str);
769 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
770 			return (EIO);
771 		}
772 
773 		INCRSTAT(Etm_stats.write_ack.fmds_value.ui64);
774 
775 		/*
776 		 * If we got this far and the current state of the
777 		 * outbound/sending connection is TIMED_OUT or
778 		 * LIMBO, then we should reinitialize it.
779 		 */
780 		(void) pthread_mutex_lock(&mp->epm_lock);
781 		if (mp->epm_cstat == C_TIMED_OUT ||
782 		    mp->epm_cstat == C_LIMBO) {
783 			if (mp->epm_oconn != NULL) {
784 				(void) etm_xport_close(hdl, mp->epm_oconn);
785 				mp->epm_oconn = NULL;
786 			}
787 			mp->epm_cstat = C_UNINITIALIZED;
788 			fmd_xprt_resume(hdl, mp->epm_xprthdl);
789 			if (mp->epm_timer_in_use) {
790 				fmd_timer_remove(hdl, mp->epm_timer_id);
791 				mp->epm_timer_in_use = 0;
792 			}
793 			mp->epm_qstat = Q_OPEN;
794 			fmd_hdl_debug(hdl, "queue resumed for %s",
795 			    mp->epm_ep_str);
796 		}
797 		(void) pthread_mutex_unlock(&mp->epm_lock);
798 
799 		rv = 0;
800 		break;
801 
802 	default:
803 		fmd_hdl_debug(hdl, "protocol error, unexpected header "
804 		    "from %s : %d", mp->epm_ep_str, hdrstat);
805 		INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
806 		rv = 0;
807 	}
808 
809 	return (rv);
810 }
811 
812 /*
813  * ETM transport layer callback function.
814  * The transport layer calls this function to :
815  *	(a) pass an incoming message (flag == ETM_CBFLAG_RECV)
816  *	(b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT)
817  */
818 static int
819 etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
820     void *arg)
821 {
822 	etm_epmap_t *mp = (etm_epmap_t *)arg;
823 	int rv = 0;
824 
825 	(void) pthread_mutex_lock(&Etm_mod_lock);
826 	if (Etm_exit) {
827 		(void) pthread_mutex_unlock(&Etm_mod_lock);
828 		return (ECANCELED);
829 	}
830 	(void) pthread_mutex_unlock(&Etm_mod_lock);
831 
832 	switch (flag) {
833 	case ETM_CBFLAG_RECV:
834 		rv = etm_recv(hdl, conn, mp);
835 		break;
836 	case ETM_CBFLAG_REINIT:
837 		(void) pthread_mutex_lock(&mp->epm_lock);
838 		etm_reinit(hdl, mp);
839 		(void) pthread_mutex_unlock(&mp->epm_lock);
840 		/*
841 		 * Return ECANCELED so the transport layer will close the
842 		 * server connection.  The transport layer is responsible for
843 		 * reestablishing this connection (should a connection request
844 		 * arrive from the peer).
845 		 */
846 		rv = ECANCELED;
847 		break;
848 	default:
849 		fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag);
850 		rv = ENOTSUP;
851 	}
852 
853 	return (rv);
854 }
855 
856 /*
857  * Allocate and initialize an etm_epmap_t struct for the given endpoint
858  * name string.
859  */
860 static void
861 etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags)
862 {
863 	etm_epmap_t *newmap;
864 
865 	if (etm_check_dup_ep_str(hdl, epname)) {
866 		fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname);
867 		return;
868 	}
869 
870 	newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP);
871 	newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP);
872 	newmap->epm_xprtflags = flags;
873 	newmap->epm_cstat = C_UNINITIALIZED;
874 	newmap->epm_qstat = Q_UNINITIALIZED;
875 	newmap->epm_ver = ETM_PROTO_V1;	/* Currently support one proto ver */
876 	newmap->epm_txbusy = 0;
877 
878 	(void) pthread_mutex_init(&newmap->epm_lock, NULL);
879 	(void) pthread_cond_init(&newmap->epm_tx_cv, NULL);
880 
881 	if (etm_get_ep_nvl(hdl, newmap)) {
882 		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
883 		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
884 		return;
885 	}
886 
887 	if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str,
888 	    etm_cb_func, newmap)) == NULL) {
889 		fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n",
890 		    newmap->epm_ep_str);
891 		etm_free_ep_nvl(hdl, newmap);
892 		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
893 		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
894 		return;
895 	}
896 
897 	if (IS_CLIENT(newmap)) {
898 		if (etm_handle_startup(hdl, newmap)) {
899 			etm_free_ep_nvl(hdl, newmap);
900 			(void) etm_xport_fini(hdl, newmap->epm_tlhdl);
901 			fmd_hdl_strfree(hdl, newmap->epm_ep_str);
902 			fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
903 			return;
904 		}
905 	}
906 
907 	/* Add this transport instance handle to the list */
908 	newmap->epm_next = Epmap_head;
909 	Epmap_head = newmap;
910 
911 	INCRSTAT(Etm_stats.peer_count.fmds_value.ui64);
912 }
913 
914 /*
915  * Parse the given property list string and call etm_init_epmap
916  * for each endpoint.
917  */
918 static void
919 etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags)
920 {
921 	char *epstr, *ep, *prefix, *lasts, *numstr;
922 	char epname[MAXPATHLEN];
923 	size_t slen, nlen;
924 	int beg, end, i;
925 
926 	if (eplist == NULL)
927 		return;
928 	/*
929 	 * Create a copy of eplist for parsing.
930 	 * strtok/strtok_r(3C) will insert null chars to the string.
931 	 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used.
932 	 */
933 	slen = strlen(eplist);
934 	epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP);
935 	(void) strcpy(epstr, eplist);
936 
937 	/*
938 	 * The following are supported for the "client_list" and
939 	 * "server_list" properties :
940 	 *
941 	 *    A space-separated list of endpoints.
942 	 *	"dev:///dom0 dev:///dom1 dev:///dom2"
943 	 *
944 	 *    An array syntax for a range of instances.
945 	 *	"dev:///dom[0:2]"
946 	 *
947 	 *    A combination of both.
948 	 *	"dev:///dom0 dev:///dom[1:2]"
949 	 */
950 	ep = strtok_r(epstr, " ", &lasts);
951 	while (ep != NULL) {
952 		if (strchr(ep, '[') != NULL) {
953 			/*
954 			 * This string is using array syntax.
955 			 * Check the string for correct syntax.
956 			 */
957 			if ((strchr(ep, ':') == NULL) ||
958 			    (strchr(ep, ']') == NULL)) {
959 				fmd_hdl_error(hdl, "Syntax error in property "
960 				    "that includes : %s\n", ep);
961 				ep = strtok_r(NULL, " ", &lasts);
962 				continue;
963 			}
964 
965 			/* expand the array syntax */
966 			prefix = strtok(ep, "[");
967 
968 			numstr = strtok(NULL, ":");
969 			if ((numstr == NULL) || (!isdigit(*numstr))) {
970 				fmd_hdl_error(hdl, "Syntax error in property "
971 				    "that includes : %s[\n", prefix);
972 				ep = strtok_r(NULL, " ", &lasts);
973 				continue;
974 			}
975 			beg = atoi(numstr);
976 
977 			numstr = strtok(NULL, "]");
978 			if ((numstr == NULL) || (!isdigit(*numstr))) {
979 				fmd_hdl_error(hdl, "Syntax error in property "
980 				    "that includes : %s[\n", prefix);
981 				ep = strtok_r(NULL, " ", &lasts);
982 				continue;
983 			}
984 			end = atoi(numstr);
985 
986 			nlen = strlen(prefix) + ETM_EP_INST_MAX;
987 
988 			if (nlen > MAXPATHLEN) {
989 				fmd_hdl_error(hdl, "Endpoint prop string "
990 				    "exceeds MAXPATHLEN\n");
991 				ep = strtok_r(NULL, " ", &lasts);
992 				continue;
993 			}
994 
995 			for (i = beg; i <= end; i++) {
996 				bzero(epname, MAXPATHLEN);
997 				(void) snprintf(epname, nlen, "%s%d",
998 				    prefix, i);
999 				etm_init_epmap(hdl, epname, flags);
1000 			}
1001 		} else {
1002 			etm_init_epmap(hdl, ep, flags);
1003 		}
1004 
1005 		ep = strtok_r(NULL, " ", &lasts);
1006 	}
1007 
1008 	fmd_hdl_free(hdl, epstr, slen + 1);
1009 }
1010 
1011 /*
1012  * Free the transport infrastructure for an endpoint.
1013  */
1014 static void
1015 etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp)
1016 {
1017 	size_t hdrlen;
1018 	char hbuf[ETM_HDRLEN];
1019 
1020 	(void) pthread_mutex_lock(&mp->epm_lock);
1021 
1022 	/*
1023 	 * If an etm_send thread is in progress, wait for it to finish.
1024 	 * The etm_recv thread is managed by the transport layer and will
1025 	 * be destroyed with etm_xport_fini().
1026 	 */
1027 	while (mp->epm_txbusy)
1028 		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
1029 
1030 	if (mp->epm_timer_in_use)
1031 		fmd_timer_remove(hdl, mp->epm_timer_id);
1032 
1033 	if (mp->epm_oconn != NULL) {
1034 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
1035 		    ETM_HDR_SHUTDOWN, 0);
1036 		(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
1037 		    hdrlen);
1038 		(void) etm_xport_close(hdl, mp->epm_oconn);
1039 		mp->epm_oconn = NULL;
1040 	}
1041 
1042 	if (mp->epm_xprthdl != NULL) {
1043 		fmd_xprt_close(hdl, mp->epm_xprthdl);
1044 		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1045 		mp->epm_ep_nvl = NULL;
1046 	}
1047 
1048 	if (mp->epm_ep_nvl != NULL)
1049 		etm_free_ep_nvl(hdl, mp);
1050 
1051 	if (mp->epm_tlhdl != NULL)
1052 		(void) etm_xport_fini(hdl, mp->epm_tlhdl);
1053 
1054 	(void) pthread_mutex_unlock(&mp->epm_lock);
1055 	(void) pthread_mutex_destroy(&mp->epm_lock);
1056 	fmd_hdl_strfree(hdl, mp->epm_ep_str);
1057 	fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t));
1058 	DECRSTAT(Etm_stats.peer_count.fmds_value.ui64);
1059 }
1060 
1061 /*
1062  * FMD entry points
1063  */
1064 
1065 /*
1066  * FMD fmdo_send entry point.
1067  * Send an event to the remote endpoint and receive an ACK.
1068  */
1069 static int
1070 etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl)
1071 {
1072 	etm_epmap_t *mp;
1073 	nvlist_t *msgnvl;
1074 	int hdrstat, rv;
1075 	char *buf, *nvbuf, *class;
1076 	size_t nvsize, buflen, hdrlen;
1077 
1078 	(void) pthread_mutex_lock(&Etm_mod_lock);
1079 	if (Etm_exit) {
1080 		(void) pthread_mutex_unlock(&Etm_mod_lock);
1081 		return (FMD_SEND_RETRY);
1082 	}
1083 	(void) pthread_mutex_unlock(&Etm_mod_lock);
1084 
1085 	mp = fmd_xprt_getspecific(hdl, xprthdl);
1086 
1087 	(void) pthread_mutex_lock(&mp->epm_lock);
1088 
1089 	mp->epm_txbusy++;
1090 
1091 	if (mp->epm_cstat == C_CLOSED) {
1092 		mp->epm_txbusy--;
1093 		(void) pthread_mutex_unlock(&mp->epm_lock);
1094 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1095 		etm_suspend_reconnect(hdl, mp);
1096 		return (FMD_SEND_RETRY);
1097 	}
1098 
1099 	if (mp->epm_cstat == C_LIMBO) {
1100 		if (mp->epm_oconn != NULL) {
1101 			(void) etm_xport_close(hdl, mp->epm_oconn);
1102 			mp->epm_oconn = NULL;
1103 		}
1104 
1105 		fmd_xprt_suspend(hdl, xprthdl);
1106 		mp->epm_qstat = Q_SUSPENDED;
1107 		mp->epm_txbusy--;
1108 		(void) pthread_mutex_unlock(&mp->epm_lock);
1109 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1110 		fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);
1111 		return (FMD_SEND_RETRY);
1112 	}
1113 
1114 	if (mp->epm_oconn == NULL) {
1115 		if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl))
1116 		    == NULL) {
1117 			mp->epm_txbusy--;
1118 			(void) pthread_mutex_unlock(&mp->epm_lock);
1119 			(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1120 			etm_suspend_reconnect(hdl, mp);
1121 			return (FMD_SEND_RETRY);
1122 		} else {
1123 			mp->epm_cstat = C_OPEN;
1124 		}
1125 	}
1126 
1127 	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0)
1128 		fmd_hdl_abort(hdl, "No class string in nvlist");
1129 
1130 	msgnvl = fmd_xprt_translate(hdl, xprthdl, ep);
1131 	if (msgnvl == NULL) {
1132 		mp->epm_qstat = Q_UNINITIALIZED;
1133 		(void) pthread_mutex_unlock(&mp->epm_lock);
1134 		fmd_hdl_error(hdl, "Failed to translate event %p\n",
1135 		    (void *) ep);
1136 		return (FMD_SEND_FAILED);
1137 	}
1138 
1139 	(void) pthread_mutex_unlock(&mp->epm_lock);
1140 
1141 	(void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR);
1142 
1143 	hdrlen = ETM_HDRLEN;
1144 	buflen = nvsize + hdrlen;
1145 
1146 	ALLOC_BUF(hdl, buf, buflen);
1147 
1148 	nvbuf = buf + hdrlen;
1149 
1150 	(void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize);
1151 
1152 	if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) {
1153 		fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv));
1154 		FREE_BUF(hdl, buf, buflen);
1155 		return (FMD_SEND_FAILED);
1156 	}
1157 
1158 	nvlist_free(msgnvl);
1159 
1160 	if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf,
1161 	    buflen) != buflen) {
1162 		(void) pthread_mutex_lock(&mp->epm_lock);
1163 		mp->epm_txbusy--;
1164 		(void) pthread_mutex_unlock(&mp->epm_lock);
1165 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1166 		fmd_hdl_debug(hdl, "failed to send message to %s",
1167 		    mp->epm_ep_str);
1168 		FREE_BUF(hdl, buf, buflen);
1169 		INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
1170 		etm_suspend_reconnect(hdl, mp);
1171 		return (FMD_SEND_RETRY);
1172 	}
1173 
1174 	INCRSTAT(Etm_stats.write_msg.fmds_value.ui64);
1175 	ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize);
1176 
1177 	etm_hex_dump(hdl, nvbuf, nvsize, 1);
1178 
1179 	if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf,
1180 	    hdrlen) != hdrlen) {
1181 		(void) pthread_mutex_lock(&mp->epm_lock);
1182 		mp->epm_txbusy--;
1183 		(void) pthread_mutex_unlock(&mp->epm_lock);
1184 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1185 		fmd_hdl_debug(hdl, "failed to read ACK from %s",
1186 		    mp->epm_ep_str);
1187 		FREE_BUF(hdl, buf, buflen);
1188 		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
1189 		etm_suspend_reconnect(hdl, mp);
1190 		return (FMD_SEND_RETRY);
1191 	}
1192 
1193 	hdrstat = etm_check_hdr(hdl, mp, buf);
1194 	FREE_BUF(hdl, buf, buflen);
1195 
1196 	if (hdrstat == ETM_HDR_ACK) {
1197 		INCRSTAT(Etm_stats.read_ack.fmds_value.ui64);
1198 	} else {
1199 		(void) pthread_mutex_lock(&mp->epm_lock);
1200 
1201 		(void) etm_xport_close(hdl, mp->epm_oconn);
1202 		mp->epm_oconn = NULL;
1203 
1204 		if (hdrstat == ETM_HDR_NAK) {
1205 			/* Peer received a bad value in the header */
1206 			if (mp->epm_xprthdl != NULL) {
1207 				mp->epm_cstat = C_LIMBO;
1208 				fmd_xprt_suspend(hdl, xprthdl);
1209 				mp->epm_qstat = Q_SUSPENDED;
1210 				fmd_hdl_debug(hdl, "received NAK, queue "
1211 				    "suspended for %s", mp->epm_ep_str);
1212 			}
1213 
1214 			rv = FMD_SEND_RETRY;
1215 
1216 		} else if (hdrstat == ETM_HDR_S_RESTART) {
1217 			/* Server has restarted */
1218 			if (mp->epm_xprthdl != NULL) {
1219 				mp->epm_cstat = C_CLOSED;
1220 				fmd_xprt_close(hdl, xprthdl);
1221 				/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1222 				mp->epm_ep_nvl = NULL;
1223 				mp->epm_qstat = Q_UNINITIALIZED;
1224 				fmd_hdl_debug(hdl, "server restarted, queue "
1225 				    "closed for %s", mp->epm_ep_str);
1226 				if (mp->epm_timer_in_use == 0) {
1227 					mp->epm_timer_id = fmd_timer_install(
1228 					    hdl, mp, NULL, Reconn_interval);
1229 					mp->epm_timer_in_use = 1;
1230 				}
1231 			}
1232 
1233 			/*
1234 			 * fault.* or list.* events will be replayed if a
1235 			 * transport is opened with the same auth.
1236 			 * Other events will be discarded.
1237 			 */
1238 			rv = FMD_SEND_FAILED;
1239 
1240 		} else {
1241 			mp->epm_cstat = C_CLOSED;
1242 			fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str);
1243 
1244 			rv = FMD_SEND_RETRY;
1245 		}
1246 
1247 		mp->epm_txbusy--;
1248 		(void) pthread_mutex_unlock(&mp->epm_lock);
1249 
1250 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1251 
1252 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
1253 
1254 		return (rv);
1255 	}
1256 
1257 	(void) pthread_mutex_lock(&mp->epm_lock);
1258 	mp->epm_txbusy--;
1259 	(void) pthread_mutex_unlock(&mp->epm_lock);
1260 
1261 	(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1262 
1263 	return (FMD_SEND_SUCCESS);
1264 }
1265 
1266 /*
1267  * FMD fmdo_timeout entry point..
1268  */
1269 /*ARGSUSED*/
1270 static void
1271 etm_timeout(fmd_hdl_t *hdl, id_t id, void *data)
1272 {
1273 	etm_epmap_t *mp = (etm_epmap_t *)data;
1274 
1275 	(void) pthread_mutex_lock(&mp->epm_lock);
1276 
1277 	mp->epm_timer_in_use = 0;
1278 
1279 	if (mp->epm_qstat == Q_UNINITIALIZED) {
1280 		/* Server has shutdown and we (client) need to reconnect */
1281 		if (mp->epm_ep_nvl == NULL)
1282 			(void) etm_get_ep_nvl(hdl, mp);
1283 
1284 		if (etm_handle_startup(hdl, mp)) {
1285 			mp->epm_qstat = Q_UNINITIALIZED;
1286 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
1287 			    Reconn_interval);
1288 			mp->epm_timer_in_use = 1;
1289 		}
1290 	} else {
1291 		etm_reconnect(hdl, mp);
1292 	}
1293 
1294 	(void) pthread_mutex_unlock(&mp->epm_lock);
1295 }
1296 
1297 /*
1298  * FMD Module declarations
1299  */
1300 static const fmd_hdl_ops_t etm_ops = {
1301 	NULL,		/* fmdo_recv */
1302 	etm_timeout,	/* fmdo_timeout */
1303 	NULL,		/* fmdo_close */
1304 	NULL,		/* fmdo_stats */
1305 	NULL,		/* fmdo_gc */
1306 	etm_send,	/* fmdo_send */
1307 };
1308 
/*
 * Module properties, read in _fmd_init():
 *
 *   client_list	space-separated endpoint list handed to
 *			etm_create_epmaps() with ETM_SERVER_XPRT_FLAGS
 *			(no default).
 *   server_list	space-separated endpoint list handed to
 *			etm_create_epmaps() with ETM_CLIENT_XPRT_FLAGS
 *			(no default).
 *   reconnect_interval	stored in Reconn_interval; used as the delay for
 *			fmd_timer_install() when scheduling reconnects.
 *			Default 10000000000, presumably nanoseconds (10s)
 *			-- confirm.
 *   reconnect_timeout	stored in Reconn_timeout (consumed outside this
 *			section).  Default 300000000000, presumably
 *			nanoseconds (300s) -- confirm.
 *   rw_timeout		stored in Rw_timeout; the timeout passed to
 *			etm_xport_read() and etm_xport_write().  Default
 *			2000000000, presumably nanoseconds (2s) -- confirm.
 *
 * NOTE(review): the numeric properties are declared FMD_TYPE_UINT64 but
 * are read with fmd_prop_get_int64().
 */
static const fmd_prop_t etm_props[] = {
	{ "client_list", FMD_TYPE_STRING, NULL },
	{ "server_list", FMD_TYPE_STRING, NULL },
	{ "reconnect_interval",	FMD_TYPE_UINT64, "10000000000" },
	{ "reconnect_timeout", FMD_TYPE_UINT64, "300000000000"},
	{ "rw_timeout", FMD_TYPE_UINT64, "2000000000"},
	{ NULL, 0, NULL }	/* table terminator */
};
1317 
/*
 * Module descriptor passed to fmd_hdl_register() in _fmd_init().  It holds
 * the description string, the module version, the entry-point table
 * (etm_ops) and the property table (etm_props).
 */
static const fmd_hdl_info_t etm_info = {
	"Event Transport Module", "2.0", &etm_ops, etm_props
};
1321 
1322 /*
1323  * Initialize the transport for use by ETM.
1324  */
1325 void
1326 _fmd_init(fmd_hdl_t *hdl)
1327 {
1328 	char *propstr;
1329 
1330 	if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) {
1331 		return; /* invalid data in configuration file */
1332 	}
1333 
1334 	/* Create global stats */
1335 	(void) fmd_stat_create(hdl, FMD_STAT_NOALLOC,
1336 	    sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats);
1337 
1338 	/* Get module properties */
1339 	Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout");
1340 	Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval");
1341 	Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout");
1342 
1343 	propstr = fmd_prop_get_string(hdl, "client_list");
1344 	etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS);
1345 	fmd_prop_free_string(hdl, propstr);
1346 
1347 	propstr = fmd_prop_get_string(hdl, "server_list");
1348 	etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS);
1349 	fmd_prop_free_string(hdl, propstr);
1350 
1351 	if (Etm_stats.peer_count.fmds_value.ui64 == 0) {
1352 		fmd_hdl_debug(hdl, "Failed to init any endpoint\n");
1353 		fmd_hdl_unregister(hdl);
1354 		return;
1355 	}
1356 }
1357 
1358 /*
1359  * Teardown the transport
1360  */
1361 void
1362 _fmd_fini(fmd_hdl_t *hdl)
1363 {
1364 	etm_epmap_t *mp, *next;
1365 
1366 	(void) pthread_mutex_lock(&Etm_mod_lock);
1367 	Etm_exit = 1;
1368 	(void) pthread_mutex_unlock(&Etm_mod_lock);
1369 
1370 	mp = Epmap_head;
1371 
1372 	while (mp) {
1373 		next = mp->epm_next;
1374 		etm_free_epmap(hdl, mp);
1375 		mp = next;
1376 	}
1377 
1378 	fmd_hdl_unregister(hdl);
1379 }
1380