xref: /illumos-gate/usr/src/cmd/fm/modules/common/event-transport/etm.c (revision 1a220b56b93ff1dc80855691548503117af4cc10)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 /*
30  * FMA Event Transport Module
31  *
 * Plugin for sending/receiving FMA events to/from a remote endpoint.
33  */
34 
#include <netinet/in.h>
#include <sys/fm/protocol.h>
#include <sys/sysmacros.h>
#include <pthread.h>
#include <stddef.h>
#include <strings.h>
#include <ctype.h>
#include <link.h>
#include <libnvpair.h>
#include "etm_xport_api.h"
#include "etm_proto.h"
45 
46 /*
47  * ETM declarations
48  */
49 
/*
 * State of the outbound connection to a peer (epm_cstat).
 */
typedef enum etm_connection_status {
	C_UNINITIALIZED = 0,	/* Initial/reset state, no open connection */
	C_OPEN = 1,		/* Connection is open */
	C_CLOSED = 2,		/* Connection is closed */
	C_LIMBO = 3,		/* Bad value in header from peer */
	C_TIMED_OUT = 4		/* Reconnection to peer timed out */
} etm_connstat_t;
57 
/*
 * State of the fmd transport queue for a peer (epm_qstat).  The values
 * start at 100, presumably so they are never mistaken for etm_connstat_t
 * values in debug output.
 */
typedef enum etm_fmd_queue_status {
	Q_UNINITIALIZED = 100,	/* No fmd transport queue set up */
	Q_INIT_PENDING = 101,	/* Queue initialization in progress */
	Q_OPEN = 102,		/* Queue is open */
	Q_SUSPENDED = 103	/* Queue is suspended */
} etm_qstat_t;
64 
65 /* Per endpoint data */
66 typedef struct etm_endpoint_map {
67 	uint8_t epm_ver;		/* Protocol version being used */
68 	char *epm_ep_str;		/* Endpoint ID string */
69 	int epm_xprtflags;		/* FMD transport open flags */
70 	etm_xport_hdl_t epm_tlhdl;	/* Transport Layer instance handle */
71 	pthread_mutex_t epm_lock;	/* Protects remainder of struct */
72 	pthread_cond_t epm_tx_cv;	/* Cond var for send/transmit */
73 	int epm_txbusy;			/* Busy doing send/transmit */
74 	fmd_xprt_t *epm_xprthdl;	/* FMD transport handle */
75 	etm_qstat_t epm_qstat;		/* Status of fmd xprt queue */
76 	nvlist_t *epm_ep_nvl;		/* Endpoint ID nv_list */
77 	etm_xport_conn_t epm_oconn;	/* Connection for outgoing events */
78 	etm_connstat_t epm_cstat;	/* Status of connection */
79 	id_t epm_timer_id;		/* Timer id */
80 	int epm_timer_in_use;		/* Indicates if timer is in use */
81 	hrtime_t epm_reconn_end;	/* Reconnection end time */
82 	struct etm_endpoint_map *epm_next;
83 } etm_epmap_t;
84 
#define	ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1)
#define	ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2)
#define	ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3)
#define	ETM_EP_INST_MAX 4		/* Max chars in endpt instance */
#define	ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR
#define	ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT)

/*
 * The helper macros below each expand to exactly one statement or
 * expression, with no embedded trailing semicolon.  Multi-statement bodies
 * are wrapped in do { } while (0).  This lets them be used safely as the
 * body of an unbraced if/else.
 */

/* Allocate size zeroed bytes into buf, sleeping until memory is available */
#define	ALLOC_BUF(hdl, buf, size) \
	((buf) = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP))

/* Free a buffer obtained with ALLOC_BUF */
#define	FREE_BUF(hdl, buf, size)	fmd_hdl_free((hdl), (buf), (size))

/* A map opened without FMD_XPRT_ACCEPT is the client side */
#define	IS_CLIENT(mp)	(((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1)

/* Statistics updates, serialized by Etm_mod_lock */
#define	INCRSTAT(x)	do {					\
	(void) pthread_mutex_lock(&Etm_mod_lock);		\
	(x)++;							\
	(void) pthread_mutex_unlock(&Etm_mod_lock);		\
} while (0)

#define	DECRSTAT(x)	do {					\
	(void) pthread_mutex_lock(&Etm_mod_lock);		\
	(x)--;							\
	(void) pthread_mutex_unlock(&Etm_mod_lock);		\
} while (0)

#define	ADDSTAT(x, y)	do {					\
	(void) pthread_mutex_lock(&Etm_mod_lock);		\
	(x) += (y);						\
	(void) pthread_mutex_unlock(&Etm_mod_lock);		\
} while (0)
113 
114 /*
115  * Global variables
116  */
117 static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER;
118 					/* Protects globals */
119 static hrtime_t Reconn_interval;	/* Time between reconnection attempts */
120 static hrtime_t Reconn_timeout;		/* Time allowed for reconnection */
121 static hrtime_t Rw_timeout;		/* Time allowed for I/O operation  */
122 static int Etm_dump = 0;		/* Enables hex dump for debug */
123 static int Etm_exit = 0;		/* Flag for exit */
124 static etm_epmap_t *Epmap_head = NULL;	/* Head of list of epmap structs */
125 
126 /* Module statistics */
127 static struct etm_stats {
128 	/* read counters */
129 	fmd_stat_t read_ack;
130 	fmd_stat_t read_bytes;
131 	fmd_stat_t read_msg;
132 	/* write counters */
133 	fmd_stat_t write_ack;
134 	fmd_stat_t write_bytes;
135 	fmd_stat_t write_msg;
136 	/* error counters */
137 	fmd_stat_t error_protocol;
138 	fmd_stat_t error_drop_read;
139 	fmd_stat_t error_read;
140 	fmd_stat_t error_read_badhdr;
141 	fmd_stat_t error_write;
142 	/* misc */
143 	fmd_stat_t peer_count;
144 
145 } Etm_stats = {
146 	/* read counters */
147 	{ "read_ack", FMD_TYPE_UINT64, "ACKs read" },
148 	{ "read_bytes", FMD_TYPE_UINT64, "Bytes read" },
149 	{ "read_msg", FMD_TYPE_UINT64, "Messages read" },
150 	/* write counters */
151 	{ "write_ack", FMD_TYPE_UINT64, "ACKs sent" },
152 	{ "write_bytes", FMD_TYPE_UINT64, "Bytes sent" },
153 	{ "write_msg", FMD_TYPE_UINT64, "Messages sent" },
154 	/* ETM error counters */
155 	{ "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" },
156 	{ "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" },
157 	{ "error_read", FMD_TYPE_UINT64, "Read I/O errors" },
158 	{ "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" },
159 	{ "error_write", FMD_TYPE_UINT64, "Write I/O errors" },
160 	/* ETM Misc */
161 	{ "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" },
162 };
163 
164 /*
165  * ETM Private functions
166  */
167 
168 /*
169  * Hex dump for debug.
170  */
171 static void
172 etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction)
173 {
174 	int i, j, k;
175 	int16_t *c;
176 
177 	if (Etm_dump == 0)
178 		return;
179 
180 	j = buflen / 16;	/* Number of complete 8-column rows */
181 	k = buflen % 16;	/* Is there a last (non-8-column) row? */
182 
183 	if (direction)
184 		fmd_hdl_debug(hdl, "--- WRITE Message Dump ---");
185 	else
186 		fmd_hdl_debug(hdl, "---  READ Message Dump ---");
187 
188 	fmd_hdl_debug(hdl, "   Displaying %d bytes", buflen);
189 
190 	/* Dump the complete 8-column rows */
191 	for (i = 0; i < j; i++) {
192 		c = (int16_t *)buf + (i * 8);
193 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x %4x %4x", i,
194 		    *(c+0), *(c+1), *(c+2), *(c+3),
195 		    *(c+4), *(c+5), *(c+6), *(c+7));
196 	}
197 
198 	/* Dump the last (incomplete) row */
199 	c = (int16_t *)buf + (i * 8);
200 	switch (k) {
201 	case 4:
202 		fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1));
203 		break;
204 	case 8:
205 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1),
206 		    *(c+2), *(c+3));
207 		break;
208 	case 12:
209 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x", i, *(c+0),
210 		    *(c+1), *(c+2), *(c+3), *(c+4), *(c+5));
211 		break;
212 	}
213 
214 	fmd_hdl_debug(hdl, "---      End Dump      ---");
215 }
216 
217 /*
218  * Provide the length of a message based on the data in the given ETM header.
219  */
220 static size_t
221 etm_get_msglen(void *buf)
222 {
223 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
224 
225 	return (ntohl(hp->hdr_msglen));
226 }
227 
228 /*
229  * Check the contents of the ETM header for errors.
230  * Return the header type (hdr_type).
231  */
232 static int
233 etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf)
234 {
235 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
236 
237 	if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) {
238 		fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s "
239 		    ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim);
240 		return (ETM_HDR_INVALID);
241 	}
242 
243 	if ((hp->hdr_type == ETM_HDR_C_HELLO) ||
244 	    (hp->hdr_type == ETM_HDR_S_HELLO)) {
245 		/* Until version is negotiated, other fields may be wrong */
246 		return (hp->hdr_type);
247 	}
248 
249 	if (hp->hdr_ver != mp->epm_ver) {
250 		fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n",
251 		    mp->epm_ep_str, hp->hdr_ver);
252 		return (ETM_HDR_BADVERSION);
253 	}
254 
255 	if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) ||
256 	    (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) {
257 		fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n",
258 		    mp->epm_ep_str, hp->hdr_type);
259 		return (ETM_HDR_BADTYPE);
260 	}
261 
262 	return (hp->hdr_type);
263 }
264 
265 /*
266  * Create an ETM header of a given type in the given buffer.
267  * Return length of header.
268  */
269 static size_t
270 etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen)
271 {
272 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
273 
274 	bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN);
275 	hp->hdr_ver = ver;
276 	hp->hdr_type = type;
277 	hp->hdr_msglen = htonl(msglen);
278 
279 	return (ETM_HDRLEN);
280 }
281 
282 /*
283  * Convert message bytes to nvlist and post to fmd.
284  * Return zero for success, non-zero for failure.
285  *
286  * Note : nvl is free'd by fmd.
287  */
288 static int
289 etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen)
290 {
291 	nvlist_t *nvl;
292 	int rv;
293 
294 	if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) {
295 		fmd_hdl_error(hdl, "failed to unpack message");
296 		return (1);
297 	}
298 
299 	(void) pthread_mutex_lock(&mp->epm_lock);
300 	(void) pthread_mutex_lock(&Etm_mod_lock);
301 	if (!Etm_exit) {
302 		(void) pthread_mutex_unlock(&Etm_mod_lock);
303 		if (mp->epm_qstat == Q_OPEN) {
304 			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
305 			rv = 0;
306 		} else if (mp->epm_qstat == Q_SUSPENDED) {
307 			fmd_xprt_resume(hdl, mp->epm_xprthdl);
308 			if (mp->epm_timer_in_use) {
309 				fmd_timer_remove(hdl, mp->epm_timer_id);
310 				mp->epm_timer_in_use = 0;
311 			}
312 			mp->epm_qstat = Q_OPEN;
313 			fmd_hdl_debug(hdl, "queue resumed for %s",
314 			    mp->epm_ep_str);
315 			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
316 			rv = 0;
317 		} else {
318 			fmd_hdl_debug(hdl, "unable to post message, qstat = %d",
319 			    mp->epm_qstat);
320 			nvlist_free(nvl);
321 			/* Remote peer will attempt to resend event */
322 			rv = 2;
323 		}
324 	} else {
325 		(void) pthread_mutex_unlock(&Etm_mod_lock);
326 		fmd_hdl_debug(hdl, "unable to post message, module exiting");
327 		nvlist_free(nvl);
328 		/* Remote peer will attempt to resend event */
329 		rv = 3;
330 	}
331 
332 	(void) pthread_mutex_unlock(&mp->epm_lock);
333 
334 	return (rv);
335 }
336 
337 /*
338  * Handle the startup handshake to the server.  The client always initiates
339  * the startup handshake.  In the following sequence, we are the client and
340  * the remote endpoint is the server.
341  *
342  *	Client sends C_HELLO and transitions to Q_INIT_PENDING state.
343  *	Server sends S_HELLO and transitions to Q_INIT_PENDING state.
344  *	Client sends ACK and transitions to Q_OPEN state.
345  *	Server receives ACK and transitions to Q_OPEN state.
346  *
347  * Return 0 for success, nonzero for failure.
348  */
349 static int
350 etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp)
351 {
352 	etm_proto_hdr_t *hp;
353 	size_t hdrlen = ETM_HDRLEN;
354 	int hdrstat;
355 	char hbuf[ETM_HDRLEN];
356 
357 	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
358 		return (1);
359 
360 	mp->epm_cstat = C_OPEN;
361 
362 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0);
363 
364 	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
365 	    hdrlen)) != hdrlen) {
366 		fmd_hdl_error(hdl, "Failed to write C_HELLO to %s",
367 		    mp->epm_ep_str);
368 		return (2);
369 	}
370 
371 	mp->epm_qstat = Q_INIT_PENDING;
372 
373 	if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf,
374 	    hdrlen)) != hdrlen) {
375 		fmd_hdl_error(hdl, "Failed to read S_HELLO from %s",
376 		    mp->epm_ep_str);
377 		return (3);
378 	}
379 
380 	hdrstat = etm_check_hdr(hdl, mp, hbuf);
381 
382 	if (hdrstat != ETM_HDR_S_HELLO) {
383 		fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO "
384 		    "from %s", mp->epm_ep_str);
385 		return (4);
386 	}
387 
388 	/*
389 	 * Get version from the server.
390 	 * Currently, only one version is supported.
391 	 */
392 	hp = (etm_proto_hdr_t *)(void *)hbuf;
393 	if (hp->hdr_ver != ETM_PROTO_V1) {
394 		fmd_hdl_error(hdl, "Unable to use same version as %s : %d",
395 		    mp->epm_ep_str, hp->hdr_ver);
396 		return (5);
397 	}
398 	mp->epm_ver = hp->hdr_ver;
399 
400 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
401 
402 	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
403 	    hdrlen)) != hdrlen) {
404 		fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s",
405 		    mp->epm_ep_str);
406 		return (6);
407 	}
408 
409 	/*
410 	 * Call fmd_xprt_open and fmd_xprt_setspecific with
411 	 * Etm_mod_lock held to avoid race with etm_send thread.
412 	 */
413 	(void) pthread_mutex_lock(&Etm_mod_lock);
414 	if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags,
415 	    mp->epm_ep_nvl, NULL)) == NULL) {
416 		fmd_hdl_abort(hdl, "Failed to init xprthdl for %s",
417 		    mp->epm_ep_str);
418 	}
419 	fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
420 	(void) pthread_mutex_unlock(&Etm_mod_lock);
421 
422 	mp->epm_qstat = Q_OPEN;
423 	fmd_hdl_debug(hdl, "queue open for %s",  mp->epm_ep_str);
424 
425 	return (0);
426 }
427 
428 /*
429  * Alloc a nvlist and add a string for the endpoint.
430  * Return zero for success, non-zero for failure.
431  */
432 static int
433 etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
434 {
435 	/*
436 	 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation
437 	 * in fmd when this nvlist_t is free'd.
438 	 */
439 	(void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0);
440 
441 	if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) {
442 		fmd_hdl_error(hdl, "failed to add domain-id string to nvlist "
443 		    "for %s", mp->epm_ep_str);
444 		nvlist_free(mp->epm_ep_nvl);
445 		return (1);
446 	}
447 
448 	return (0);
449 }
450 
451 /*
452  * Free the nvlist for the endpoint_id string.
453  */
454 /*ARGSUSED*/
455 static void
456 etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
457 {
458 	nvlist_free(mp->epm_ep_nvl);
459 }
460 
461 /*
462  * Check for a duplicate endpoint/peer string.
463  */
464 /*ARGSUSED*/
465 static int
466 etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname)
467 {
468 	etm_epmap_t *mp;
469 
470 	for (mp = Epmap_head; mp != NULL; mp = mp->epm_next)
471 		if (strcmp(epname, mp->epm_ep_str) == 0)
472 			return (1);
473 
474 	return (0);
475 }
476 
477 /*
478  * Attempt to re-open a connection with the remote endpoint.
479  */
480 static void
481 etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
482 {
483 	if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) {
484 		if (gethrtime() < mp->epm_reconn_end) {
485 			if ((mp->epm_oconn = etm_xport_open(hdl,
486 			    mp->epm_tlhdl)) == NULL) {
487 				fmd_hdl_debug(hdl, "reconnect failed for %s",
488 				    mp->epm_ep_str);
489 				mp->epm_timer_id = fmd_timer_install(hdl, mp,
490 				    NULL, Reconn_interval);
491 				mp->epm_timer_in_use = 1;
492 			} else {
493 				fmd_hdl_debug(hdl, "reconnect success for %s",
494 				    mp->epm_ep_str);
495 				mp->epm_reconn_end = 0;
496 				mp->epm_cstat = C_OPEN;
497 			}
498 		} else {
499 			fmd_hdl_error(hdl, "Reconnect timed out for %s\n",
500 			    mp->epm_ep_str);
501 			mp->epm_reconn_end = 0;
502 			mp->epm_cstat = C_TIMED_OUT;
503 		}
504 	}
505 
506 	if (mp->epm_cstat == C_OPEN) {
507 		fmd_xprt_resume(hdl, mp->epm_xprthdl);
508 		mp->epm_qstat = Q_OPEN;
509 		fmd_hdl_debug(hdl, "queue resumed for %s",  mp->epm_ep_str);
510 	}
511 }
512 
513 /*
514  * Suspend a given connection and setup for reconnection retries.
515  * Assume caller holds lock on epm_lock.
516  */
517 static void
518 etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
519 {
520 	(void) pthread_mutex_lock(&Etm_mod_lock);
521 	if (Etm_exit) {
522 		(void) pthread_mutex_unlock(&Etm_mod_lock);
523 		return;
524 	}
525 	(void) pthread_mutex_unlock(&Etm_mod_lock);
526 
527 	if (mp->epm_oconn != NULL) {
528 		(void) etm_xport_close(hdl, mp->epm_oconn);
529 		mp->epm_oconn = NULL;
530 	}
531 
532 	mp->epm_reconn_end = gethrtime() + Reconn_timeout;
533 	mp->epm_cstat = C_UNINITIALIZED;
534 
535 	if (mp->epm_xprthdl != NULL) {
536 		fmd_xprt_suspend(hdl, mp->epm_xprthdl);
537 		mp->epm_qstat = Q_SUSPENDED;
538 		fmd_hdl_debug(hdl, "queue suspended for %s",  mp->epm_ep_str);
539 
540 		if (mp->epm_timer_in_use == 0) {
541 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
542 			    Reconn_interval);
543 			mp->epm_timer_in_use = 1;
544 		}
545 	}
546 }
547 
548 /*
549  * Reinitialize the connection. The old fmd_xprt_t handle must be
550  * removed/closed first.
551  * Assume caller holds lock on epm_lock.
552  */
553 static void
554 etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp)
555 {
556 	/*
557 	 * To avoid a deadlock, wait for etm_send to finish before
558 	 * calling fmd_xprt_close()
559 	 */
560 	while (mp->epm_txbusy)
561 		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
562 
563 	if (mp->epm_xprthdl != NULL) {
564 		fmd_xprt_close(hdl, mp->epm_xprthdl);
565 		fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str);
566 		mp->epm_xprthdl = NULL;
567 		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
568 		mp->epm_ep_nvl = NULL;
569 	}
570 
571 	if (mp->epm_timer_in_use) {
572 		fmd_timer_remove(hdl, mp->epm_timer_id);
573 		mp->epm_timer_in_use = 0;
574 	}
575 
576 	if (mp->epm_oconn != NULL) {
577 		(void) etm_xport_close(hdl, mp->epm_oconn);
578 		mp->epm_oconn = NULL;
579 	}
580 
581 	mp->epm_cstat = C_UNINITIALIZED;
582 	mp->epm_qstat = Q_UNINITIALIZED;
583 }
584 
585 /*
586  * Receive data from ETM transport layer.
587  * Note : This is not the fmdo_recv entry point.
588  *
589  */
590 static int
591 etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp)
592 {
593 	size_t buflen, hdrlen;
594 	void *buf;
595 	char hbuf[ETM_HDRLEN];
596 	int hdrstat, rv;
597 
598 	hdrlen = ETM_HDRLEN;
599 
600 	if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) {
601 		fmd_hdl_debug(hdl, "failed to read header from %s",
602 		    mp->epm_ep_str);
603 		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
604 		return (EIO);
605 	}
606 
607 	hdrstat = etm_check_hdr(hdl, mp, hbuf);
608 
609 	switch (hdrstat) {
610 	case ETM_HDR_INVALID:
611 		(void) pthread_mutex_lock(&mp->epm_lock);
612 		if (mp->epm_cstat == C_OPEN)
613 			mp->epm_cstat = C_CLOSED;
614 		(void) pthread_mutex_unlock(&mp->epm_lock);
615 
616 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
617 		rv = ECANCELED;
618 		break;
619 
620 	case ETM_HDR_BADTYPE:
621 	case ETM_HDR_BADVERSION:
622 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0);
623 
624 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
625 		    hdrlen)) != hdrlen) {
626 			fmd_hdl_debug(hdl, "failed to write NAK to %s",
627 			    mp->epm_ep_str);
628 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
629 			return (EIO);
630 		}
631 
632 		(void) pthread_mutex_lock(&mp->epm_lock);
633 		mp->epm_cstat = C_LIMBO;
634 		(void) pthread_mutex_unlock(&mp->epm_lock);
635 
636 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
637 		rv = ENOTSUP;
638 		break;
639 
640 	case ETM_HDR_C_HELLO:
641 		/* Client is initiating a startup handshake */
642 		(void) pthread_mutex_lock(&mp->epm_lock);
643 		etm_reinit(hdl, mp);
644 		mp->epm_qstat = Q_INIT_PENDING;
645 		(void) pthread_mutex_unlock(&mp->epm_lock);
646 
647 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0);
648 
649 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
650 		    hdrlen)) != hdrlen) {
651 			fmd_hdl_debug(hdl, "failed to write S_HELLO to %s",
652 			    mp->epm_ep_str);
653 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
654 			return (EIO);
655 		}
656 
657 		rv = 0;
658 		break;
659 
660 	case ETM_HDR_ACK:
661 		(void) pthread_mutex_lock(&mp->epm_lock);
662 		if (mp->epm_qstat == Q_INIT_PENDING) {
663 			/* This is client's ACK from startup handshake */
664 			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
665 			if (mp->epm_ep_nvl == NULL)
666 				(void) etm_get_ep_nvl(hdl, mp);
667 
668 			/*
669 			 * Call fmd_xprt_open and fmd_xprt_setspecific with
670 			 * Etm_mod_lock held to avoid race with etm_send thread.
671 			 */
672 			(void) pthread_mutex_lock(&Etm_mod_lock);
673 			if ((mp->epm_xprthdl = fmd_xprt_open(hdl,
674 			    mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) {
675 				fmd_hdl_abort(hdl, "Failed to init xprthdl "
676 				    "for %s", mp->epm_ep_str);
677 			}
678 			fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
679 			(void) pthread_mutex_unlock(&Etm_mod_lock);
680 
681 			mp->epm_qstat = Q_OPEN;
682 			(void) pthread_mutex_unlock(&mp->epm_lock);
683 			fmd_hdl_debug(hdl, "queue open for %s",
684 			    mp->epm_ep_str);
685 		} else {
686 			(void) pthread_mutex_unlock(&mp->epm_lock);
687 			fmd_hdl_debug(hdl, "protocol error, not expecting ACK "
688 			    "from %s\n", mp->epm_ep_str);
689 			INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
690 		}
691 
692 		rv = 0;
693 		break;
694 
695 	case ETM_HDR_SHUTDOWN:
696 		fmd_hdl_debug(hdl, "received shutdown from %s",
697 		    mp->epm_ep_str);
698 
699 		(void) pthread_mutex_lock(&mp->epm_lock);
700 
701 		etm_reinit(hdl, mp);
702 
703 		if (IS_CLIENT(mp)) {
704 			/*
705 			 * A server shutdown is considered to be temporary.
706 			 * Prepare for reconnection.
707 			 */
708 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
709 			    Reconn_interval);
710 
711 			mp->epm_timer_in_use = 1;
712 		}
713 
714 		(void) pthread_mutex_unlock(&mp->epm_lock);
715 
716 		rv = ECANCELED;
717 		break;
718 
719 	case ETM_HDR_MSG:
720 		(void) pthread_mutex_lock(&mp->epm_lock);
721 		if (mp->epm_qstat == Q_UNINITIALIZED) {
722 			/* Peer (client) is unaware that we've restarted */
723 			(void) pthread_mutex_unlock(&mp->epm_lock);
724 			hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
725 			    ETM_HDR_S_RESTART, 0);
726 
727 			if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
728 			    hdrlen)) != hdrlen) {
729 				fmd_hdl_debug(hdl, "failed to write S_RESTART "
730 				    "to %s", mp->epm_ep_str);
731 				INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
732 				return (EIO);
733 			}
734 
735 			return (ECANCELED);
736 		}
737 		(void) pthread_mutex_unlock(&mp->epm_lock);
738 
739 		buflen = etm_get_msglen(hbuf);
740 		ALLOC_BUF(hdl, buf, buflen);
741 
742 		if (etm_xport_read(hdl, conn, Rw_timeout, buf,
743 		    buflen) != buflen) {
744 			fmd_hdl_debug(hdl, "failed to read message from %s",
745 			    mp->epm_ep_str);
746 			FREE_BUF(hdl, buf, buflen);
747 			INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
748 			return (EIO);
749 		}
750 
751 		INCRSTAT(Etm_stats.read_msg.fmds_value.ui64);
752 		ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen);
753 
754 		etm_hex_dump(hdl, buf, buflen, 0);
755 
756 		if (etm_post_msg(hdl, mp, buf, buflen)) {
757 			INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64);
758 			FREE_BUF(hdl, buf, buflen);
759 			return (EIO);
760 		}
761 
762 		FREE_BUF(hdl, buf, buflen);
763 
764 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
765 
766 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
767 		    hdrlen)) != hdrlen) {
768 			fmd_hdl_debug(hdl, "failed to write ACK to %s",
769 			    mp->epm_ep_str);
770 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
771 			return (EIO);
772 		}
773 
774 		INCRSTAT(Etm_stats.write_ack.fmds_value.ui64);
775 
776 		/*
777 		 * If we got this far and the current state of the
778 		 * outbound/sending connection is TIMED_OUT or
779 		 * LIMBO, then we should reinitialize it.
780 		 */
781 		(void) pthread_mutex_lock(&mp->epm_lock);
782 		if (mp->epm_cstat == C_TIMED_OUT ||
783 		    mp->epm_cstat == C_LIMBO) {
784 			if (mp->epm_oconn != NULL) {
785 				(void) etm_xport_close(hdl, mp->epm_oconn);
786 				mp->epm_oconn = NULL;
787 			}
788 			mp->epm_cstat = C_UNINITIALIZED;
789 			fmd_xprt_resume(hdl, mp->epm_xprthdl);
790 			if (mp->epm_timer_in_use) {
791 				fmd_timer_remove(hdl, mp->epm_timer_id);
792 				mp->epm_timer_in_use = 0;
793 			}
794 			mp->epm_qstat = Q_OPEN;
795 			fmd_hdl_debug(hdl, "queue resumed for %s",
796 			    mp->epm_ep_str);
797 		}
798 		(void) pthread_mutex_unlock(&mp->epm_lock);
799 
800 		rv = 0;
801 		break;
802 
803 	default:
804 		fmd_hdl_debug(hdl, "protocol error, unexpected header "
805 		    "from %s : %d", mp->epm_ep_str, hdrstat);
806 		INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
807 		rv = 0;
808 	}
809 
810 	return (rv);
811 }
812 
813 /*
814  * ETM transport layer callback function.
815  * The transport layer calls this function to :
816  *	(a) pass an incoming message (flag == ETM_CBFLAG_RECV)
817  *	(b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT)
818  */
819 static int
820 etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
821     void *arg)
822 {
823 	etm_epmap_t *mp = (etm_epmap_t *)arg;
824 	int rv = 0;
825 
826 	(void) pthread_mutex_lock(&Etm_mod_lock);
827 	if (Etm_exit) {
828 		(void) pthread_mutex_unlock(&Etm_mod_lock);
829 		return (ECANCELED);
830 	}
831 	(void) pthread_mutex_unlock(&Etm_mod_lock);
832 
833 	switch (flag) {
834 	case ETM_CBFLAG_RECV:
835 		rv = etm_recv(hdl, conn, mp);
836 		break;
837 	case ETM_CBFLAG_REINIT:
838 		(void) pthread_mutex_lock(&mp->epm_lock);
839 		etm_reinit(hdl, mp);
840 		(void) pthread_mutex_unlock(&mp->epm_lock);
841 		/*
842 		 * Return ECANCELED so the transport layer will close the
843 		 * server connection.  The transport layer is responsible for
844 		 * reestablishing this connection (should a connection request
845 		 * arrive from the peer).
846 		 */
847 		rv = ECANCELED;
848 		break;
849 	default:
850 		fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag);
851 		rv = ENOTSUP;
852 	}
853 
854 	return (rv);
855 }
856 
857 /*
858  * Allocate and initialize an etm_epmap_t struct for the given endpoint
859  * name string.
860  */
861 static void
862 etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags)
863 {
864 	etm_epmap_t *newmap;
865 
866 	if (etm_check_dup_ep_str(hdl, epname)) {
867 		fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname);
868 		return;
869 	}
870 
871 	newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP);
872 	newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP);
873 	newmap->epm_xprtflags = flags;
874 	newmap->epm_cstat = C_UNINITIALIZED;
875 	newmap->epm_qstat = Q_UNINITIALIZED;
876 	newmap->epm_ver = ETM_PROTO_V1;	/* Currently support one proto ver */
877 	newmap->epm_txbusy = 0;
878 
879 	(void) pthread_mutex_init(&newmap->epm_lock, NULL);
880 	(void) pthread_cond_init(&newmap->epm_tx_cv, NULL);
881 
882 	if (etm_get_ep_nvl(hdl, newmap)) {
883 		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
884 		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
885 		return;
886 	}
887 
888 	if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str,
889 	    etm_cb_func, newmap)) == NULL) {
890 		fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n",
891 		    newmap->epm_ep_str);
892 		etm_free_ep_nvl(hdl, newmap);
893 		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
894 		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
895 		return;
896 	}
897 
898 	if (IS_CLIENT(newmap)) {
899 		if (etm_handle_startup(hdl, newmap)) {
900 			/*
901 			 * For whatever reason, we could not complete the
902 			 * startup handshake with the server.  Set the timer
903 			 * and try again.
904 			 */
905 			if (newmap->epm_oconn != NULL) {
906 				(void) etm_xport_close(hdl, newmap->epm_oconn);
907 				newmap->epm_oconn = NULL;
908 			}
909 			newmap->epm_cstat = C_UNINITIALIZED;
910 			newmap->epm_qstat = Q_UNINITIALIZED;
911 			newmap->epm_timer_id = fmd_timer_install(hdl, newmap,
912 			    NULL, Reconn_interval);
913 			newmap->epm_timer_in_use = 1;
914 		}
915 	}
916 
917 	/* Add this transport instance handle to the list */
918 	newmap->epm_next = Epmap_head;
919 	Epmap_head = newmap;
920 
921 	INCRSTAT(Etm_stats.peer_count.fmds_value.ui64);
922 }
923 
924 /*
925  * Parse the given property list string and call etm_init_epmap
926  * for each endpoint.
927  */
928 static void
929 etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags)
930 {
931 	char *epstr, *ep, *prefix, *lasts, *numstr;
932 	char epname[MAXPATHLEN];
933 	size_t slen, nlen;
934 	int beg, end, i;
935 
936 	if (eplist == NULL)
937 		return;
938 	/*
939 	 * Create a copy of eplist for parsing.
940 	 * strtok/strtok_r(3C) will insert null chars to the string.
941 	 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used.
942 	 */
943 	slen = strlen(eplist);
944 	epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP);
945 	(void) strcpy(epstr, eplist);
946 
947 	/*
948 	 * The following are supported for the "client_list" and
949 	 * "server_list" properties :
950 	 *
951 	 *    A space-separated list of endpoints.
952 	 *	"dev:///dom0 dev:///dom1 dev:///dom2"
953 	 *
954 	 *    An array syntax for a range of instances.
955 	 *	"dev:///dom[0:2]"
956 	 *
957 	 *    A combination of both.
958 	 *	"dev:///dom0 dev:///dom[1:2]"
959 	 */
960 	ep = strtok_r(epstr, " ", &lasts);
961 	while (ep != NULL) {
962 		if (strchr(ep, '[') != NULL) {
963 			/*
964 			 * This string is using array syntax.
965 			 * Check the string for correct syntax.
966 			 */
967 			if ((strchr(ep, ':') == NULL) ||
968 			    (strchr(ep, ']') == NULL)) {
969 				fmd_hdl_error(hdl, "Syntax error in property "
970 				    "that includes : %s\n", ep);
971 				ep = strtok_r(NULL, " ", &lasts);
972 				continue;
973 			}
974 
975 			/* expand the array syntax */
976 			prefix = strtok(ep, "[");
977 
978 			numstr = strtok(NULL, ":");
979 			if ((numstr == NULL) || (!isdigit(*numstr))) {
980 				fmd_hdl_error(hdl, "Syntax error in property "
981 				    "that includes : %s[\n", prefix);
982 				ep = strtok_r(NULL, " ", &lasts);
983 				continue;
984 			}
985 			beg = atoi(numstr);
986 
987 			numstr = strtok(NULL, "]");
988 			if ((numstr == NULL) || (!isdigit(*numstr))) {
989 				fmd_hdl_error(hdl, "Syntax error in property "
990 				    "that includes : %s[\n", prefix);
991 				ep = strtok_r(NULL, " ", &lasts);
992 				continue;
993 			}
994 			end = atoi(numstr);
995 
996 			nlen = strlen(prefix) + ETM_EP_INST_MAX;
997 
998 			if (nlen > MAXPATHLEN) {
999 				fmd_hdl_error(hdl, "Endpoint prop string "
1000 				    "exceeds MAXPATHLEN\n");
1001 				ep = strtok_r(NULL, " ", &lasts);
1002 				continue;
1003 			}
1004 
1005 			for (i = beg; i <= end; i++) {
1006 				bzero(epname, MAXPATHLEN);
1007 				(void) snprintf(epname, nlen, "%s%d",
1008 				    prefix, i);
1009 				etm_init_epmap(hdl, epname, flags);
1010 			}
1011 		} else {
1012 			etm_init_epmap(hdl, ep, flags);
1013 		}
1014 
1015 		ep = strtok_r(NULL, " ", &lasts);
1016 	}
1017 
1018 	fmd_hdl_free(hdl, epstr, slen + 1);
1019 }
1020 
1021 /*
1022  * Free the transport infrastructure for an endpoint.
1023  */
1024 static void
1025 etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp)
1026 {
1027 	size_t hdrlen;
1028 	char hbuf[ETM_HDRLEN];
1029 
1030 	(void) pthread_mutex_lock(&mp->epm_lock);
1031 
1032 	/*
1033 	 * If an etm_send thread is in progress, wait for it to finish.
1034 	 * The etm_recv thread is managed by the transport layer and will
1035 	 * be destroyed with etm_xport_fini().
1036 	 */
1037 	while (mp->epm_txbusy)
1038 		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
1039 
1040 	if (mp->epm_timer_in_use)
1041 		fmd_timer_remove(hdl, mp->epm_timer_id);
1042 
1043 	if (mp->epm_oconn != NULL) {
1044 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
1045 		    ETM_HDR_SHUTDOWN, 0);
1046 		(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
1047 		    hdrlen);
1048 		(void) etm_xport_close(hdl, mp->epm_oconn);
1049 		mp->epm_oconn = NULL;
1050 	}
1051 
1052 	if (mp->epm_xprthdl != NULL) {
1053 		fmd_xprt_close(hdl, mp->epm_xprthdl);
1054 		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1055 		mp->epm_ep_nvl = NULL;
1056 	}
1057 
1058 	if (mp->epm_ep_nvl != NULL)
1059 		etm_free_ep_nvl(hdl, mp);
1060 
1061 	if (mp->epm_tlhdl != NULL)
1062 		(void) etm_xport_fini(hdl, mp->epm_tlhdl);
1063 
1064 	(void) pthread_mutex_unlock(&mp->epm_lock);
1065 	(void) pthread_mutex_destroy(&mp->epm_lock);
1066 	fmd_hdl_strfree(hdl, mp->epm_ep_str);
1067 	fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t));
1068 	DECRSTAT(Etm_stats.peer_count.fmds_value.ui64);
1069 }
1070 
1071 /*
1072  * FMD entry points
1073  */
1074 
1075 /*
1076  * FMD fmdo_send entry point.
1077  * Send an event to the remote endpoint and receive an ACK.
1078  */
1079 static int
1080 etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl)
1081 {
1082 	etm_epmap_t *mp;
1083 	nvlist_t *msgnvl;
1084 	int hdrstat, rv;
1085 	char *buf, *nvbuf, *class;
1086 	size_t nvsize, buflen, hdrlen;
1087 
1088 	(void) pthread_mutex_lock(&Etm_mod_lock);
1089 	if (Etm_exit) {
1090 		(void) pthread_mutex_unlock(&Etm_mod_lock);
1091 		return (FMD_SEND_RETRY);
1092 	}
1093 	(void) pthread_mutex_unlock(&Etm_mod_lock);
1094 
1095 	mp = fmd_xprt_getspecific(hdl, xprthdl);
1096 
1097 	if (pthread_mutex_trylock(&mp->epm_lock))
1098 		/* Another thread may be trying to close this fmd_xprt_t */
1099 		return (FMD_SEND_RETRY);
1100 
1101 	mp->epm_txbusy++;
1102 
1103 	if (mp->epm_qstat == Q_UNINITIALIZED) {
1104 		mp->epm_txbusy--;
1105 		(void) pthread_mutex_unlock(&mp->epm_lock);
1106 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1107 		return (FMD_SEND_FAILED);
1108 	}
1109 
1110 	if (mp->epm_cstat == C_CLOSED) {
1111 		etm_suspend_reconnect(hdl, mp);
1112 		mp->epm_txbusy--;
1113 		(void) pthread_mutex_unlock(&mp->epm_lock);
1114 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1115 		return (FMD_SEND_RETRY);
1116 	}
1117 
1118 	if (mp->epm_cstat == C_LIMBO) {
1119 		if (mp->epm_oconn != NULL) {
1120 			(void) etm_xport_close(hdl, mp->epm_oconn);
1121 			mp->epm_oconn = NULL;
1122 		}
1123 
1124 		fmd_xprt_suspend(hdl, xprthdl);
1125 		mp->epm_qstat = Q_SUSPENDED;
1126 		mp->epm_txbusy--;
1127 		(void) pthread_mutex_unlock(&mp->epm_lock);
1128 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1129 		fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);
1130 		return (FMD_SEND_RETRY);
1131 	}
1132 
1133 	if (mp->epm_oconn == NULL) {
1134 		if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl))
1135 		    == NULL) {
1136 			etm_suspend_reconnect(hdl, mp);
1137 			mp->epm_txbusy--;
1138 			(void) pthread_mutex_unlock(&mp->epm_lock);
1139 			(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1140 			return (FMD_SEND_RETRY);
1141 		} else {
1142 			mp->epm_cstat = C_OPEN;
1143 		}
1144 	}
1145 
1146 	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0)
1147 		fmd_hdl_abort(hdl, "No class string in nvlist");
1148 
1149 	msgnvl = fmd_xprt_translate(hdl, xprthdl, ep);
1150 	if (msgnvl == NULL) {
1151 		mp->epm_txbusy--;
1152 		(void) pthread_mutex_unlock(&mp->epm_lock);
1153 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1154 		fmd_hdl_error(hdl, "Failed to translate event %p\n",
1155 		    (void *) ep);
1156 		return (FMD_SEND_FAILED);
1157 	}
1158 
1159 	(void) pthread_mutex_unlock(&mp->epm_lock);
1160 
1161 	(void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR);
1162 
1163 	hdrlen = ETM_HDRLEN;
1164 	buflen = nvsize + hdrlen;
1165 
1166 	ALLOC_BUF(hdl, buf, buflen);
1167 
1168 	nvbuf = buf + hdrlen;
1169 
1170 	(void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize);
1171 
1172 	if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) {
1173 		(void) pthread_mutex_lock(&mp->epm_lock);
1174 		mp->epm_txbusy--;
1175 		(void) pthread_mutex_unlock(&mp->epm_lock);
1176 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1177 		fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv));
1178 		FREE_BUF(hdl, buf, buflen);
1179 		return (FMD_SEND_FAILED);
1180 	}
1181 
1182 	nvlist_free(msgnvl);
1183 
1184 	if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf,
1185 	    buflen) != buflen) {
1186 		fmd_hdl_debug(hdl, "failed to send message to %s",
1187 		    mp->epm_ep_str);
1188 		(void) pthread_mutex_lock(&mp->epm_lock);
1189 		etm_suspend_reconnect(hdl, mp);
1190 		mp->epm_txbusy--;
1191 		(void) pthread_mutex_unlock(&mp->epm_lock);
1192 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1193 		FREE_BUF(hdl, buf, buflen);
1194 		INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
1195 		return (FMD_SEND_RETRY);
1196 	}
1197 
1198 	INCRSTAT(Etm_stats.write_msg.fmds_value.ui64);
1199 	ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize);
1200 
1201 	etm_hex_dump(hdl, nvbuf, nvsize, 1);
1202 
1203 	if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf,
1204 	    hdrlen) != hdrlen) {
1205 		fmd_hdl_debug(hdl, "failed to read ACK from %s",
1206 		    mp->epm_ep_str);
1207 		(void) pthread_mutex_lock(&mp->epm_lock);
1208 		etm_suspend_reconnect(hdl, mp);
1209 		mp->epm_txbusy--;
1210 		(void) pthread_mutex_unlock(&mp->epm_lock);
1211 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1212 		FREE_BUF(hdl, buf, buflen);
1213 		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
1214 		return (FMD_SEND_RETRY);
1215 	}
1216 
1217 	hdrstat = etm_check_hdr(hdl, mp, buf);
1218 	FREE_BUF(hdl, buf, buflen);
1219 
1220 	if (hdrstat == ETM_HDR_ACK) {
1221 		INCRSTAT(Etm_stats.read_ack.fmds_value.ui64);
1222 	} else {
1223 		(void) pthread_mutex_lock(&mp->epm_lock);
1224 
1225 		(void) etm_xport_close(hdl, mp->epm_oconn);
1226 		mp->epm_oconn = NULL;
1227 
1228 		if (hdrstat == ETM_HDR_NAK) {
1229 			/* Peer received a bad value in the header */
1230 			if (mp->epm_xprthdl != NULL) {
1231 				mp->epm_cstat = C_LIMBO;
1232 				fmd_xprt_suspend(hdl, xprthdl);
1233 				mp->epm_qstat = Q_SUSPENDED;
1234 				fmd_hdl_debug(hdl, "received NAK, queue "
1235 				    "suspended for %s", mp->epm_ep_str);
1236 			}
1237 
1238 			rv = FMD_SEND_RETRY;
1239 
1240 		} else if (hdrstat == ETM_HDR_S_RESTART) {
1241 			/* Server has restarted */
1242 			mp->epm_cstat = C_CLOSED;
1243 			mp->epm_qstat = Q_UNINITIALIZED;
1244 			fmd_hdl_debug(hdl, "server %s restarted",
1245 			    mp->epm_ep_str);
1246 			/*
1247 			 * Cannot call fmd_xprt_close here, so we'll do it
1248 			 * on the timeout thread.
1249 			 */
1250 			if (mp->epm_timer_in_use == 0) {
1251 				mp->epm_timer_id = fmd_timer_install(
1252 				    hdl, mp, NULL, 0);
1253 				mp->epm_timer_in_use = 1;
1254 			}
1255 
1256 			/*
1257 			 * fault.* or list.* events will be replayed if a
1258 			 * transport is opened with the same auth.
1259 			 * Other events will be discarded.
1260 			 */
1261 			rv = FMD_SEND_FAILED;
1262 
1263 		} else {
1264 			mp->epm_cstat = C_CLOSED;
1265 			fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str);
1266 
1267 			rv = FMD_SEND_RETRY;
1268 		}
1269 
1270 		mp->epm_txbusy--;
1271 		(void) pthread_mutex_unlock(&mp->epm_lock);
1272 
1273 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1274 
1275 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
1276 
1277 		return (rv);
1278 	}
1279 
1280 	(void) pthread_mutex_lock(&mp->epm_lock);
1281 	mp->epm_txbusy--;
1282 	(void) pthread_mutex_unlock(&mp->epm_lock);
1283 
1284 	(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1285 
1286 	return (FMD_SEND_SUCCESS);
1287 }
1288 
1289 /*
1290  * FMD fmdo_timeout entry point..
1291  */
1292 /*ARGSUSED*/
1293 static void
1294 etm_timeout(fmd_hdl_t *hdl, id_t id, void *data)
1295 {
1296 	etm_epmap_t *mp = (etm_epmap_t *)data;
1297 
1298 	(void) pthread_mutex_lock(&mp->epm_lock);
1299 
1300 	mp->epm_timer_in_use = 0;
1301 
1302 	if (mp->epm_qstat == Q_UNINITIALIZED) {
1303 		/* Server has shutdown and we (client) need to reconnect */
1304 		if (mp->epm_xprthdl != NULL) {
1305 			fmd_xprt_close(hdl, mp->epm_xprthdl);
1306 			fmd_hdl_debug(hdl, "queue closed for %s",
1307 			    mp->epm_ep_str);
1308 			mp->epm_xprthdl = NULL;
1309 			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1310 			mp->epm_ep_nvl = NULL;
1311 		}
1312 
1313 		if (mp->epm_ep_nvl == NULL)
1314 			(void) etm_get_ep_nvl(hdl, mp);
1315 
1316 		if (etm_handle_startup(hdl, mp)) {
1317 			if (mp->epm_oconn != NULL) {
1318 				(void) etm_xport_close(hdl, mp->epm_oconn);
1319 				mp->epm_oconn = NULL;
1320 			}
1321 			mp->epm_cstat = C_UNINITIALIZED;
1322 			mp->epm_qstat = Q_UNINITIALIZED;
1323 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
1324 			    Reconn_interval);
1325 			mp->epm_timer_in_use = 1;
1326 		}
1327 	} else {
1328 		etm_reconnect(hdl, mp);
1329 	}
1330 
1331 	(void) pthread_mutex_unlock(&mp->epm_lock);
1332 }
1333 
1334 /*
1335  * FMD Module declarations
1336  */
1337 static const fmd_hdl_ops_t etm_ops = {
1338 	NULL,		/* fmdo_recv */
1339 	etm_timeout,	/* fmdo_timeout */
1340 	NULL,		/* fmdo_close */
1341 	NULL,		/* fmdo_stats */
1342 	NULL,		/* fmdo_gc */
1343 	etm_send,	/* fmdo_send */
1344 };
1345 
1346 static const fmd_prop_t etm_props[] = {
1347 	{ "client_list", FMD_TYPE_STRING, NULL },
1348 	{ "server_list", FMD_TYPE_STRING, NULL },
1349 	{ "reconnect_interval",	FMD_TYPE_UINT64, "10000000000" },
1350 	{ "reconnect_timeout", FMD_TYPE_UINT64, "300000000000"},
1351 	{ "rw_timeout", FMD_TYPE_UINT64, "2000000000"},
1352 	{ NULL, 0, NULL }
1353 };
1354 
1355 static const fmd_hdl_info_t etm_info = {
1356 	"Event Transport Module", "2.0", &etm_ops, etm_props
1357 };
1358 
1359 /*
1360  * Initialize the transport for use by ETM.
1361  */
1362 void
1363 _fmd_init(fmd_hdl_t *hdl)
1364 {
1365 	char *propstr;
1366 
1367 	if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) {
1368 		return; /* invalid data in configuration file */
1369 	}
1370 
1371 	/* Create global stats */
1372 	(void) fmd_stat_create(hdl, FMD_STAT_NOALLOC,
1373 	    sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats);
1374 
1375 	/* Get module properties */
1376 	Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout");
1377 	Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval");
1378 	Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout");
1379 
1380 	propstr = fmd_prop_get_string(hdl, "client_list");
1381 	etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS);
1382 	fmd_prop_free_string(hdl, propstr);
1383 
1384 	propstr = fmd_prop_get_string(hdl, "server_list");
1385 	etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS);
1386 	fmd_prop_free_string(hdl, propstr);
1387 
1388 	if (Etm_stats.peer_count.fmds_value.ui64 == 0) {
1389 		fmd_hdl_debug(hdl, "Failed to init any endpoint\n");
1390 		fmd_hdl_unregister(hdl);
1391 		return;
1392 	}
1393 }
1394 
1395 /*
1396  * Teardown the transport
1397  */
1398 void
1399 _fmd_fini(fmd_hdl_t *hdl)
1400 {
1401 	etm_epmap_t *mp, *next;
1402 
1403 	(void) pthread_mutex_lock(&Etm_mod_lock);
1404 	Etm_exit = 1;
1405 	(void) pthread_mutex_unlock(&Etm_mod_lock);
1406 
1407 	mp = Epmap_head;
1408 
1409 	while (mp) {
1410 		next = mp->epm_next;
1411 		etm_free_epmap(hdl, mp);
1412 		mp = next;
1413 	}
1414 
1415 	fmd_hdl_unregister(hdl);
1416 }
1417