xref: /illumos-gate/usr/src/cmd/fm/modules/common/event-transport/etm.c (revision a0e56b0eb1fdc159ff8348ca0e77d884bb7d126b)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
/*
 * FMA Event Transport Module
 *
 * Plugin for sending/receiving FMA events to/from a remote endpoint.
 */
34 
35 #include <netinet/in.h>
36 #include <errno.h>
37 #include <sys/fm/protocol.h>
38 #include <sys/sysmacros.h>
39 #include <pthread.h>
40 #include <strings.h>
41 #include <ctype.h>
42 #include <link.h>
43 #include <libnvpair.h>
44 #include "etm_xport_api.h"
45 #include "etm_proto.h"
46 
47 /*
48  * ETM declarations
49  */
50 
/*
 * Status of the connection to a peer, stored in etm_epmap_t.epm_cstat.
 */
typedef enum etm_connection_status {
	C_UNINITIALIZED = 0,		/* Initial state; also set on teardown */
	C_OPEN,				/* Connection is open */
	C_CLOSED,			/* Connection is closed */
	C_LIMBO,			/* Bad value in header from peer */
	C_TIMED_OUT			/* Reconnection to peer timed out */
} etm_connstat_t;
58 
/*
 * Status of the fmd transport queue for a peer, stored in
 * etm_epmap_t.epm_qstat.
 * NOTE(review): values start at 100, presumably to keep them distinct from
 * etm_connstat_t values -- confirm.
 */
typedef enum etm_fmd_queue_status {
	Q_UNINITIALIZED = 100,		/* Initial state; also set by reinit */
	Q_INIT_PENDING,			/* Queue initialization in progress */
	Q_OPEN,				/* Queue is open */
	Q_SUSPENDED			/* Queue is suspended */
} etm_qstat_t;
65 
/*
 * Per endpoint data.
 *
 * One of these is allocated for every peer endpoint and linked onto the
 * list headed by Epmap_head.
 */
typedef struct etm_endpoint_map {
	uint8_t epm_ver;		/* Protocol version being used */
	char *epm_ep_str;		/* Endpoint ID string */
	int epm_xprtflags;		/* FMD transport open flags */
	etm_xport_hdl_t epm_tlhdl;	/* Transport Layer instance handle */
	pthread_mutex_t epm_lock;	/* Protects remainder of struct */
	pthread_cond_t epm_tx_cv;	/* Cond var for send/transmit */
	int epm_txbusy;			/* Busy doing send/transmit */
	fmd_xprt_t *epm_xprthdl;	/* FMD transport handle */
	etm_qstat_t epm_qstat;		/* Status of fmd xprt queue */
	nvlist_t *epm_ep_nvl;		/* Endpoint ID nv_list */
	etm_xport_conn_t epm_oconn;	/* Connection for outgoing events */
	etm_connstat_t epm_cstat;	/* Status of connection */
	id_t epm_timer_id;		/* Timer id */
	int epm_timer_in_use;		/* Indicates if timer is in use */
	hrtime_t epm_reconn_end;	/* Reconnection end time */
	struct etm_endpoint_map *epm_next;	/* Next endpoint in the list */
} etm_epmap_t;
85 
/*
 * Pseudo header types returned by etm_check_hdr() for a malformed header.
 * They sit above ETM_HDR_TYPE_TOO_HIGH, the bound etm_check_hdr() uses to
 * reject header types received from a peer.
 */
#define	ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1)
#define	ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2)
#define	ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3)
#define	ETM_EP_INST_MAX 4		/* Max chars in endpt instance */
#define	ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR
#define	ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT)

/*
 * Buffer helpers.  Both expand to a single expression with no trailing
 * semicolon, so they can be used anywhere a statement is expected
 * (including an unbraced if/else arm).
 */
#define	ALLOC_BUF(hdl, buf, size) \
	((buf) = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP))

#define	FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size))

/* An endpoint opened without FMD_XPRT_ACCEPT is a client */
#define	IS_CLIENT(mp)	(((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1)

/*
 * Statistic update helpers.  Each takes Etm_mod_lock around the update.
 * They are wrapped in do { } while (0) so that "MACRO(x);" is exactly one
 * statement and is safe in an unbraced if/else.
 */
#define	INCRSTAT(x)	do {						    \
				(void) pthread_mutex_lock(&Etm_mod_lock);   \
				(x)++;					    \
				(void) pthread_mutex_unlock(&Etm_mod_lock); \
			} while (0)

#define	DECRSTAT(x)	do {						    \
				(void) pthread_mutex_lock(&Etm_mod_lock);   \
				(x)--;					    \
				(void) pthread_mutex_unlock(&Etm_mod_lock); \
			} while (0)

#define	ADDSTAT(x, y)	do {						    \
				(void) pthread_mutex_lock(&Etm_mod_lock);   \
				(x) += (y);				    \
				(void) pthread_mutex_unlock(&Etm_mod_lock); \
			} while (0)
114 
115 /*
116  * Global variables
117  */
/*
 * Etm_mod_lock is taken by the INCRSTAT/DECRSTAT/ADDSTAT macros and around
 * reads of Etm_exit.
 */
static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER;
					/* Protects globals */
static hrtime_t Reconn_interval;	/* Time between reconnection attempts */
static hrtime_t Reconn_timeout;		/* Time allowed for reconnection */
static hrtime_t Rw_timeout;		/* Time allowed for I/O operation  */
static int Etm_dump = 0;		/* Enables hex dump for debug */
static int Etm_exit = 0;		/* Flag for exit */
static etm_epmap_t *Epmap_head = NULL;	/* Head of list of epmap structs */
126 
/*
 * Module statistics.
 *
 * The initializer below must list the entries in the same order as the
 * struct members.  Counters are updated through the INCRSTAT, DECRSTAT
 * and ADDSTAT macros.
 */
static struct etm_stats {
	/* read counters */
	fmd_stat_t read_ack;
	fmd_stat_t read_bytes;
	fmd_stat_t read_msg;
	fmd_stat_t post_filter;
	/* write counters */
	fmd_stat_t write_ack;
	fmd_stat_t write_bytes;
	fmd_stat_t write_msg;
	fmd_stat_t send_filter;
	/* error counters */
	fmd_stat_t error_protocol;
	fmd_stat_t error_drop_read;
	fmd_stat_t error_read;
	fmd_stat_t error_read_badhdr;
	fmd_stat_t error_write;
	fmd_stat_t error_send_filter;
	fmd_stat_t error_post_filter;
	/* misc */
	fmd_stat_t peer_count;

} Etm_stats = {
	/* read counters */
	{ "read_ack", FMD_TYPE_UINT64, "ACKs read" },
	{ "read_bytes", FMD_TYPE_UINT64, "Bytes read" },
	{ "read_msg", FMD_TYPE_UINT64, "Messages read" },
	{ "post_filter", FMD_TYPE_UINT64, "Drops by post_filter" },
	/* write counters */
	{ "write_ack", FMD_TYPE_UINT64, "ACKs sent" },
	{ "write_bytes", FMD_TYPE_UINT64, "Bytes sent" },
	{ "write_msg", FMD_TYPE_UINT64, "Messages sent" },
	{ "send_filter", FMD_TYPE_UINT64, "Drops by send_filter" },
	/* ETM error counters */
	{ "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" },
	{ "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" },
	{ "error_read", FMD_TYPE_UINT64, "Read I/O errors" },
	{ "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" },
	{ "error_write", FMD_TYPE_UINT64, "Write I/O errors" },
	{ "error_send_filter", FMD_TYPE_UINT64, "Send filter errors" },
	{ "error_post_filter", FMD_TYPE_UINT64, "Post filter errors" },
	/* ETM Misc */
	{ "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" },
};
172 
173 /*
174  * ETM Private functions
175  */
176 
177 /*
178  * Hex dump for debug.
179  */
180 static void
181 etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction)
182 {
183 	int i, j, k;
184 	int16_t *c;
185 
186 	if (Etm_dump == 0)
187 		return;
188 
189 	j = buflen / 16;	/* Number of complete 8-column rows */
190 	k = buflen % 16;	/* Is there a last (non-8-column) row? */
191 
192 	if (direction)
193 		fmd_hdl_debug(hdl, "--- WRITE Message Dump ---");
194 	else
195 		fmd_hdl_debug(hdl, "---  READ Message Dump ---");
196 
197 	fmd_hdl_debug(hdl, "   Displaying %d bytes", buflen);
198 
199 	/* Dump the complete 8-column rows */
200 	for (i = 0; i < j; i++) {
201 		c = (int16_t *)buf + (i * 8);
202 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x %4x %4x", i,
203 		    *(c+0), *(c+1), *(c+2), *(c+3),
204 		    *(c+4), *(c+5), *(c+6), *(c+7));
205 	}
206 
207 	/* Dump the last (incomplete) row */
208 	c = (int16_t *)buf + (i * 8);
209 	switch (k) {
210 	case 4:
211 		fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1));
212 		break;
213 	case 8:
214 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1),
215 		    *(c+2), *(c+3));
216 		break;
217 	case 12:
218 		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x   %4x %4x", i, *(c+0),
219 		    *(c+1), *(c+2), *(c+3), *(c+4), *(c+5));
220 		break;
221 	}
222 
223 	fmd_hdl_debug(hdl, "---      End Dump      ---");
224 }
225 
226 /*
227  * Provide the length of a message based on the data in the given ETM header.
228  */
229 static size_t
230 etm_get_msglen(void *buf)
231 {
232 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
233 
234 	return (ntohl(hp->hdr_msglen));
235 }
236 
237 /*
238  * Check the contents of the ETM header for errors.
239  * Return the header type (hdr_type).
240  */
241 static int
242 etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf)
243 {
244 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
245 
246 	if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) {
247 		fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s "
248 		    ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim);
249 		return (ETM_HDR_INVALID);
250 	}
251 
252 	if ((hp->hdr_type == ETM_HDR_C_HELLO) ||
253 	    (hp->hdr_type == ETM_HDR_S_HELLO)) {
254 		/* Until version is negotiated, other fields may be wrong */
255 		return (hp->hdr_type);
256 	}
257 
258 	if (hp->hdr_ver != mp->epm_ver) {
259 		fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n",
260 		    mp->epm_ep_str, hp->hdr_ver);
261 		return (ETM_HDR_BADVERSION);
262 	}
263 
264 	if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) ||
265 	    (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) {
266 		fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n",
267 		    mp->epm_ep_str, hp->hdr_type);
268 		return (ETM_HDR_BADTYPE);
269 	}
270 
271 	return (hp->hdr_type);
272 }
273 
274 /*
275  * Create an ETM header of a given type in the given buffer.
276  * Return length of header.
277  */
278 static size_t
279 etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen)
280 {
281 	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
282 
283 	bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN);
284 	hp->hdr_ver = ver;
285 	hp->hdr_type = type;
286 	hp->hdr_msglen = htonl(msglen);
287 
288 	return (ETM_HDRLEN);
289 }
290 
291 /*
292  * Convert message bytes to nvlist and post to fmd.
293  * Return zero for success, non-zero for failure.
294  *
295  * Note : nvl is free'd by fmd.
296  */
297 static int
298 etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen)
299 {
300 	nvlist_t *nvl;
301 	int rv;
302 
303 	if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) {
304 		fmd_hdl_error(hdl, "failed to unpack message");
305 		return (1);
306 	}
307 
308 	rv = etm_xport_post_filter(hdl, nvl, mp->epm_ep_str);
309 	if (rv == ETM_XPORT_FILTER_DROP) {
310 		fmd_hdl_debug(hdl, "post_filter dropped event");
311 		INCRSTAT(Etm_stats.post_filter.fmds_value.ui64);
312 		nvlist_free(nvl);
313 		return (0);
314 	} else if (rv == ETM_XPORT_FILTER_ERROR) {
315 		fmd_hdl_debug(hdl, "post_filter error : %s", strerror(errno));
316 		INCRSTAT(Etm_stats.error_post_filter.fmds_value.ui64);
317 		/* Still post event */
318 	}
319 
320 	(void) pthread_mutex_lock(&mp->epm_lock);
321 	(void) pthread_mutex_lock(&Etm_mod_lock);
322 	if (!Etm_exit) {
323 		(void) pthread_mutex_unlock(&Etm_mod_lock);
324 		if (mp->epm_qstat == Q_OPEN) {
325 			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
326 			rv = 0;
327 		} else if (mp->epm_qstat == Q_SUSPENDED) {
328 			fmd_xprt_resume(hdl, mp->epm_xprthdl);
329 			if (mp->epm_timer_in_use) {
330 				fmd_timer_remove(hdl, mp->epm_timer_id);
331 				mp->epm_timer_in_use = 0;
332 			}
333 			mp->epm_qstat = Q_OPEN;
334 			fmd_hdl_debug(hdl, "queue resumed for %s",
335 			    mp->epm_ep_str);
336 			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
337 			rv = 0;
338 		} else {
339 			fmd_hdl_debug(hdl, "unable to post message, qstat = %d",
340 			    mp->epm_qstat);
341 			nvlist_free(nvl);
342 			/* Remote peer will attempt to resend event */
343 			rv = 2;
344 		}
345 	} else {
346 		(void) pthread_mutex_unlock(&Etm_mod_lock);
347 		fmd_hdl_debug(hdl, "unable to post message, module exiting");
348 		nvlist_free(nvl);
349 		/* Remote peer will attempt to resend event */
350 		rv = 3;
351 	}
352 
353 	(void) pthread_mutex_unlock(&mp->epm_lock);
354 
355 	return (rv);
356 }
357 
358 /*
359  * Handle the startup handshake to the server.  The client always initiates
360  * the startup handshake.  In the following sequence, we are the client and
361  * the remote endpoint is the server.
362  *
363  *	Client sends C_HELLO and transitions to Q_INIT_PENDING state.
364  *	Server sends S_HELLO and transitions to Q_INIT_PENDING state.
365  *	Client sends ACK and transitions to Q_OPEN state.
366  *	Server receives ACK and transitions to Q_OPEN state.
367  *
368  * Return 0 for success, nonzero for failure.
369  */
370 static int
371 etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp)
372 {
373 	etm_proto_hdr_t *hp;
374 	size_t hdrlen = ETM_HDRLEN;
375 	int hdrstat;
376 	char hbuf[ETM_HDRLEN];
377 
378 	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
379 		return (1);
380 
381 	mp->epm_cstat = C_OPEN;
382 
383 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0);
384 
385 	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
386 	    hdrlen)) != hdrlen) {
387 		fmd_hdl_error(hdl, "Failed to write C_HELLO to %s",
388 		    mp->epm_ep_str);
389 		return (2);
390 	}
391 
392 	mp->epm_qstat = Q_INIT_PENDING;
393 
394 	if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf,
395 	    hdrlen)) != hdrlen) {
396 		fmd_hdl_error(hdl, "Failed to read S_HELLO from %s",
397 		    mp->epm_ep_str);
398 		return (3);
399 	}
400 
401 	hdrstat = etm_check_hdr(hdl, mp, hbuf);
402 
403 	if (hdrstat != ETM_HDR_S_HELLO) {
404 		fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO "
405 		    "from %s", mp->epm_ep_str);
406 		return (4);
407 	}
408 
409 	/*
410 	 * Get version from the server.
411 	 * Currently, only one version is supported.
412 	 */
413 	hp = (etm_proto_hdr_t *)(void *)hbuf;
414 	if (hp->hdr_ver != ETM_PROTO_V1) {
415 		fmd_hdl_error(hdl, "Unable to use same version as %s : %d",
416 		    mp->epm_ep_str, hp->hdr_ver);
417 		return (5);
418 	}
419 	mp->epm_ver = hp->hdr_ver;
420 
421 	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
422 
423 	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
424 	    hdrlen)) != hdrlen) {
425 		fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s",
426 		    mp->epm_ep_str);
427 		return (6);
428 	}
429 
430 	/*
431 	 * Call fmd_xprt_open and fmd_xprt_setspecific with
432 	 * Etm_mod_lock held to avoid race with etm_send thread.
433 	 */
434 	(void) pthread_mutex_lock(&Etm_mod_lock);
435 	if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags,
436 	    mp->epm_ep_nvl, NULL)) == NULL) {
437 		fmd_hdl_abort(hdl, "Failed to init xprthdl for %s",
438 		    mp->epm_ep_str);
439 	}
440 	fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
441 	(void) pthread_mutex_unlock(&Etm_mod_lock);
442 
443 	mp->epm_qstat = Q_OPEN;
444 	fmd_hdl_debug(hdl, "queue open for %s",  mp->epm_ep_str);
445 
446 	return (0);
447 }
448 
449 /*
450  * Alloc a nvlist and add a string for the endpoint.
451  * Return zero for success, non-zero for failure.
452  */
453 static int
454 etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
455 {
456 	/*
457 	 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation
458 	 * in fmd when this nvlist_t is free'd.
459 	 */
460 	(void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0);
461 
462 	if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) {
463 		fmd_hdl_error(hdl, "failed to add domain-id string to nvlist "
464 		    "for %s", mp->epm_ep_str);
465 		nvlist_free(mp->epm_ep_nvl);
466 		return (1);
467 	}
468 
469 	return (0);
470 }
471 
472 /*
473  * Free the nvlist for the endpoint_id string.
474  */
475 /*ARGSUSED*/
476 static void
477 etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
478 {
479 	nvlist_free(mp->epm_ep_nvl);
480 }
481 
482 /*
483  * Check for a duplicate endpoint/peer string.
484  */
485 /*ARGSUSED*/
486 static int
487 etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname)
488 {
489 	etm_epmap_t *mp;
490 
491 	for (mp = Epmap_head; mp != NULL; mp = mp->epm_next)
492 		if (strcmp(epname, mp->epm_ep_str) == 0)
493 			return (1);
494 
495 	return (0);
496 }
497 
498 /*
499  * Attempt to re-open a connection with the remote endpoint.
500  */
501 static void
502 etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
503 {
504 	if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) {
505 		if (gethrtime() < mp->epm_reconn_end) {
506 			if ((mp->epm_oconn = etm_xport_open(hdl,
507 			    mp->epm_tlhdl)) == NULL) {
508 				fmd_hdl_debug(hdl, "reconnect failed for %s",
509 				    mp->epm_ep_str);
510 				mp->epm_timer_id = fmd_timer_install(hdl, mp,
511 				    NULL, Reconn_interval);
512 				mp->epm_timer_in_use = 1;
513 			} else {
514 				fmd_hdl_debug(hdl, "reconnect success for %s",
515 				    mp->epm_ep_str);
516 				mp->epm_reconn_end = 0;
517 				mp->epm_cstat = C_OPEN;
518 			}
519 		} else {
520 			fmd_hdl_error(hdl, "Reconnect timed out for %s\n",
521 			    mp->epm_ep_str);
522 			mp->epm_reconn_end = 0;
523 			mp->epm_cstat = C_TIMED_OUT;
524 		}
525 	}
526 
527 	if (mp->epm_cstat == C_OPEN) {
528 		fmd_xprt_resume(hdl, mp->epm_xprthdl);
529 		mp->epm_qstat = Q_OPEN;
530 		fmd_hdl_debug(hdl, "queue resumed for %s",  mp->epm_ep_str);
531 	}
532 }
533 
534 /*
535  * Suspend a given connection and setup for reconnection retries.
536  * Assume caller holds lock on epm_lock.
537  */
538 static void
539 etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
540 {
541 	(void) pthread_mutex_lock(&Etm_mod_lock);
542 	if (Etm_exit) {
543 		(void) pthread_mutex_unlock(&Etm_mod_lock);
544 		return;
545 	}
546 	(void) pthread_mutex_unlock(&Etm_mod_lock);
547 
548 	if (mp->epm_oconn != NULL) {
549 		(void) etm_xport_close(hdl, mp->epm_oconn);
550 		mp->epm_oconn = NULL;
551 	}
552 
553 	mp->epm_reconn_end = gethrtime() + Reconn_timeout;
554 	mp->epm_cstat = C_UNINITIALIZED;
555 
556 	if (mp->epm_xprthdl != NULL) {
557 		fmd_xprt_suspend(hdl, mp->epm_xprthdl);
558 		mp->epm_qstat = Q_SUSPENDED;
559 		fmd_hdl_debug(hdl, "queue suspended for %s",  mp->epm_ep_str);
560 
561 		if (mp->epm_timer_in_use == 0) {
562 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
563 			    Reconn_interval);
564 			mp->epm_timer_in_use = 1;
565 		}
566 	}
567 }
568 
569 /*
570  * Reinitialize the connection. The old fmd_xprt_t handle must be
571  * removed/closed first.
572  * Assume caller holds lock on epm_lock.
573  */
574 static void
575 etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp)
576 {
577 	/*
578 	 * To avoid a deadlock, wait for etm_send to finish before
579 	 * calling fmd_xprt_close()
580 	 */
581 	while (mp->epm_txbusy)
582 		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
583 
584 	if (mp->epm_xprthdl != NULL) {
585 		fmd_xprt_close(hdl, mp->epm_xprthdl);
586 		fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str);
587 		mp->epm_xprthdl = NULL;
588 		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
589 		mp->epm_ep_nvl = NULL;
590 	}
591 
592 	if (mp->epm_timer_in_use) {
593 		fmd_timer_remove(hdl, mp->epm_timer_id);
594 		mp->epm_timer_in_use = 0;
595 	}
596 
597 	if (mp->epm_oconn != NULL) {
598 		(void) etm_xport_close(hdl, mp->epm_oconn);
599 		mp->epm_oconn = NULL;
600 	}
601 
602 	mp->epm_cstat = C_UNINITIALIZED;
603 	mp->epm_qstat = Q_UNINITIALIZED;
604 }
605 
606 /*
607  * Receive data from ETM transport layer.
608  * Note : This is not the fmdo_recv entry point.
609  *
610  */
611 static int
612 etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp)
613 {
614 	size_t buflen, hdrlen;
615 	void *buf;
616 	char hbuf[ETM_HDRLEN];
617 	int hdrstat, rv;
618 
619 	hdrlen = ETM_HDRLEN;
620 
621 	if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) {
622 		fmd_hdl_debug(hdl, "failed to read header from %s",
623 		    mp->epm_ep_str);
624 		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
625 		return (EIO);
626 	}
627 
628 	hdrstat = etm_check_hdr(hdl, mp, hbuf);
629 
630 	switch (hdrstat) {
631 	case ETM_HDR_INVALID:
632 		(void) pthread_mutex_lock(&mp->epm_lock);
633 		if (mp->epm_cstat == C_OPEN)
634 			mp->epm_cstat = C_CLOSED;
635 		(void) pthread_mutex_unlock(&mp->epm_lock);
636 
637 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
638 		rv = ECANCELED;
639 		break;
640 
641 	case ETM_HDR_BADTYPE:
642 	case ETM_HDR_BADVERSION:
643 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0);
644 
645 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
646 		    hdrlen)) != hdrlen) {
647 			fmd_hdl_debug(hdl, "failed to write NAK to %s",
648 			    mp->epm_ep_str);
649 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
650 			return (EIO);
651 		}
652 
653 		(void) pthread_mutex_lock(&mp->epm_lock);
654 		mp->epm_cstat = C_LIMBO;
655 		(void) pthread_mutex_unlock(&mp->epm_lock);
656 
657 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
658 		rv = ENOTSUP;
659 		break;
660 
661 	case ETM_HDR_C_HELLO:
662 		/* Client is initiating a startup handshake */
663 		(void) pthread_mutex_lock(&mp->epm_lock);
664 		etm_reinit(hdl, mp);
665 		mp->epm_qstat = Q_INIT_PENDING;
666 		(void) pthread_mutex_unlock(&mp->epm_lock);
667 
668 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0);
669 
670 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
671 		    hdrlen)) != hdrlen) {
672 			fmd_hdl_debug(hdl, "failed to write S_HELLO to %s",
673 			    mp->epm_ep_str);
674 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
675 			return (EIO);
676 		}
677 
678 		rv = 0;
679 		break;
680 
681 	case ETM_HDR_ACK:
682 		(void) pthread_mutex_lock(&mp->epm_lock);
683 		if (mp->epm_qstat == Q_INIT_PENDING) {
684 			/* This is client's ACK from startup handshake */
685 			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
686 			if (mp->epm_ep_nvl == NULL)
687 				(void) etm_get_ep_nvl(hdl, mp);
688 
689 			/*
690 			 * Call fmd_xprt_open and fmd_xprt_setspecific with
691 			 * Etm_mod_lock held to avoid race with etm_send thread.
692 			 */
693 			(void) pthread_mutex_lock(&Etm_mod_lock);
694 			if ((mp->epm_xprthdl = fmd_xprt_open(hdl,
695 			    mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) {
696 				fmd_hdl_abort(hdl, "Failed to init xprthdl "
697 				    "for %s", mp->epm_ep_str);
698 			}
699 			fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
700 			(void) pthread_mutex_unlock(&Etm_mod_lock);
701 
702 			mp->epm_qstat = Q_OPEN;
703 			(void) pthread_mutex_unlock(&mp->epm_lock);
704 			fmd_hdl_debug(hdl, "queue open for %s",
705 			    mp->epm_ep_str);
706 		} else {
707 			(void) pthread_mutex_unlock(&mp->epm_lock);
708 			fmd_hdl_debug(hdl, "protocol error, not expecting ACK "
709 			    "from %s\n", mp->epm_ep_str);
710 			INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
711 		}
712 
713 		rv = 0;
714 		break;
715 
716 	case ETM_HDR_SHUTDOWN:
717 		fmd_hdl_debug(hdl, "received shutdown from %s",
718 		    mp->epm_ep_str);
719 
720 		(void) pthread_mutex_lock(&mp->epm_lock);
721 
722 		etm_reinit(hdl, mp);
723 
724 		if (IS_CLIENT(mp)) {
725 			/*
726 			 * A server shutdown is considered to be temporary.
727 			 * Prepare for reconnection.
728 			 */
729 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
730 			    Reconn_interval);
731 
732 			mp->epm_timer_in_use = 1;
733 		}
734 
735 		(void) pthread_mutex_unlock(&mp->epm_lock);
736 
737 		rv = ECANCELED;
738 		break;
739 
740 	case ETM_HDR_MSG:
741 		(void) pthread_mutex_lock(&mp->epm_lock);
742 		if (mp->epm_qstat == Q_UNINITIALIZED) {
743 			/* Peer (client) is unaware that we've restarted */
744 			(void) pthread_mutex_unlock(&mp->epm_lock);
745 			hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
746 			    ETM_HDR_S_RESTART, 0);
747 
748 			if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
749 			    hdrlen)) != hdrlen) {
750 				fmd_hdl_debug(hdl, "failed to write S_RESTART "
751 				    "to %s", mp->epm_ep_str);
752 				INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
753 				return (EIO);
754 			}
755 
756 			return (ECANCELED);
757 		}
758 		(void) pthread_mutex_unlock(&mp->epm_lock);
759 
760 		buflen = etm_get_msglen(hbuf);
761 		ALLOC_BUF(hdl, buf, buflen);
762 
763 		if (etm_xport_read(hdl, conn, Rw_timeout, buf,
764 		    buflen) != buflen) {
765 			fmd_hdl_debug(hdl, "failed to read message from %s",
766 			    mp->epm_ep_str);
767 			FREE_BUF(hdl, buf, buflen);
768 			INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
769 			return (EIO);
770 		}
771 
772 		INCRSTAT(Etm_stats.read_msg.fmds_value.ui64);
773 		ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen);
774 
775 		etm_hex_dump(hdl, buf, buflen, 0);
776 
777 		if (etm_post_msg(hdl, mp, buf, buflen)) {
778 			INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64);
779 			FREE_BUF(hdl, buf, buflen);
780 			return (EIO);
781 		}
782 
783 		FREE_BUF(hdl, buf, buflen);
784 
785 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
786 
787 		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
788 		    hdrlen)) != hdrlen) {
789 			fmd_hdl_debug(hdl, "failed to write ACK to %s",
790 			    mp->epm_ep_str);
791 			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
792 			return (EIO);
793 		}
794 
795 		INCRSTAT(Etm_stats.write_ack.fmds_value.ui64);
796 
797 		/*
798 		 * If we got this far and the current state of the
799 		 * outbound/sending connection is TIMED_OUT or
800 		 * LIMBO, then we should reinitialize it.
801 		 */
802 		(void) pthread_mutex_lock(&mp->epm_lock);
803 		if (mp->epm_cstat == C_TIMED_OUT ||
804 		    mp->epm_cstat == C_LIMBO) {
805 			if (mp->epm_oconn != NULL) {
806 				(void) etm_xport_close(hdl, mp->epm_oconn);
807 				mp->epm_oconn = NULL;
808 			}
809 			mp->epm_cstat = C_UNINITIALIZED;
810 			fmd_xprt_resume(hdl, mp->epm_xprthdl);
811 			if (mp->epm_timer_in_use) {
812 				fmd_timer_remove(hdl, mp->epm_timer_id);
813 				mp->epm_timer_in_use = 0;
814 			}
815 			mp->epm_qstat = Q_OPEN;
816 			fmd_hdl_debug(hdl, "queue resumed for %s",
817 			    mp->epm_ep_str);
818 		}
819 		(void) pthread_mutex_unlock(&mp->epm_lock);
820 
821 		rv = 0;
822 		break;
823 
824 	default:
825 		fmd_hdl_debug(hdl, "protocol error, unexpected header "
826 		    "from %s : %d", mp->epm_ep_str, hdrstat);
827 		INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
828 		rv = 0;
829 	}
830 
831 	return (rv);
832 }
833 
834 /*
835  * ETM transport layer callback function.
836  * The transport layer calls this function to :
837  *	(a) pass an incoming message (flag == ETM_CBFLAG_RECV)
838  *	(b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT)
839  */
840 static int
841 etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
842     void *arg)
843 {
844 	etm_epmap_t *mp = (etm_epmap_t *)arg;
845 	int rv = 0;
846 
847 	(void) pthread_mutex_lock(&Etm_mod_lock);
848 	if (Etm_exit) {
849 		(void) pthread_mutex_unlock(&Etm_mod_lock);
850 		return (ECANCELED);
851 	}
852 	(void) pthread_mutex_unlock(&Etm_mod_lock);
853 
854 	switch (flag) {
855 	case ETM_CBFLAG_RECV:
856 		rv = etm_recv(hdl, conn, mp);
857 		break;
858 	case ETM_CBFLAG_REINIT:
859 		(void) pthread_mutex_lock(&mp->epm_lock);
860 		etm_reinit(hdl, mp);
861 		(void) pthread_mutex_unlock(&mp->epm_lock);
862 		/*
863 		 * Return ECANCELED so the transport layer will close the
864 		 * server connection.  The transport layer is responsible for
865 		 * reestablishing this connection (should a connection request
866 		 * arrive from the peer).
867 		 */
868 		rv = ECANCELED;
869 		break;
870 	default:
871 		fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag);
872 		rv = ENOTSUP;
873 	}
874 
875 	return (rv);
876 }
877 
878 /*
879  * Allocate and initialize an etm_epmap_t struct for the given endpoint
880  * name string.
881  */
882 static void
883 etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags)
884 {
885 	etm_epmap_t *newmap;
886 
887 	if (etm_check_dup_ep_str(hdl, epname)) {
888 		fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname);
889 		return;
890 	}
891 
892 	newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP);
893 	newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP);
894 	newmap->epm_xprtflags = flags;
895 	newmap->epm_cstat = C_UNINITIALIZED;
896 	newmap->epm_qstat = Q_UNINITIALIZED;
897 	newmap->epm_ver = ETM_PROTO_V1;	/* Currently support one proto ver */
898 	newmap->epm_txbusy = 0;
899 
900 	(void) pthread_mutex_init(&newmap->epm_lock, NULL);
901 	(void) pthread_cond_init(&newmap->epm_tx_cv, NULL);
902 
903 	if (etm_get_ep_nvl(hdl, newmap)) {
904 		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
905 		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
906 		return;
907 	}
908 
909 	if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str,
910 	    etm_cb_func, newmap)) == NULL) {
911 		fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n",
912 		    newmap->epm_ep_str);
913 		etm_free_ep_nvl(hdl, newmap);
914 		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
915 		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
916 		return;
917 	}
918 
919 	if (IS_CLIENT(newmap)) {
920 		if (etm_handle_startup(hdl, newmap)) {
921 			/*
922 			 * For whatever reason, we could not complete the
923 			 * startup handshake with the server.  Set the timer
924 			 * and try again.
925 			 */
926 			if (newmap->epm_oconn != NULL) {
927 				(void) etm_xport_close(hdl, newmap->epm_oconn);
928 				newmap->epm_oconn = NULL;
929 			}
930 			newmap->epm_cstat = C_UNINITIALIZED;
931 			newmap->epm_qstat = Q_UNINITIALIZED;
932 			newmap->epm_timer_id = fmd_timer_install(hdl, newmap,
933 			    NULL, Reconn_interval);
934 			newmap->epm_timer_in_use = 1;
935 		}
936 	}
937 
938 	/* Add this transport instance handle to the list */
939 	newmap->epm_next = Epmap_head;
940 	Epmap_head = newmap;
941 
942 	INCRSTAT(Etm_stats.peer_count.fmds_value.ui64);
943 }
944 
945 /*
946  * Parse the given property list string and call etm_init_epmap
947  * for each endpoint.
948  */
949 static void
950 etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags)
951 {
952 	char *epstr, *ep, *prefix, *lasts, *numstr;
953 	char epname[MAXPATHLEN];
954 	size_t slen, nlen;
955 	int beg, end, i;
956 
957 	if (eplist == NULL)
958 		return;
959 	/*
960 	 * Create a copy of eplist for parsing.
961 	 * strtok/strtok_r(3C) will insert null chars to the string.
962 	 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used.
963 	 */
964 	slen = strlen(eplist);
965 	epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP);
966 	(void) strcpy(epstr, eplist);
967 
968 	/*
969 	 * The following are supported for the "client_list" and
970 	 * "server_list" properties :
971 	 *
972 	 *    A space-separated list of endpoints.
973 	 *	"dev:///dom0 dev:///dom1 dev:///dom2"
974 	 *
975 	 *    An array syntax for a range of instances.
976 	 *	"dev:///dom[0:2]"
977 	 *
978 	 *    A combination of both.
979 	 *	"dev:///dom0 dev:///dom[1:2]"
980 	 */
981 	ep = strtok_r(epstr, " ", &lasts);
982 	while (ep != NULL) {
983 		if (strchr(ep, '[') != NULL) {
984 			/*
985 			 * This string is using array syntax.
986 			 * Check the string for correct syntax.
987 			 */
988 			if ((strchr(ep, ':') == NULL) ||
989 			    (strchr(ep, ']') == NULL)) {
990 				fmd_hdl_error(hdl, "Syntax error in property "
991 				    "that includes : %s\n", ep);
992 				ep = strtok_r(NULL, " ", &lasts);
993 				continue;
994 			}
995 
996 			/* expand the array syntax */
997 			prefix = strtok(ep, "[");
998 
999 			numstr = strtok(NULL, ":");
1000 			if ((numstr == NULL) || (!isdigit(*numstr))) {
1001 				fmd_hdl_error(hdl, "Syntax error in property "
1002 				    "that includes : %s[\n", prefix);
1003 				ep = strtok_r(NULL, " ", &lasts);
1004 				continue;
1005 			}
1006 			beg = atoi(numstr);
1007 
1008 			numstr = strtok(NULL, "]");
1009 			if ((numstr == NULL) || (!isdigit(*numstr))) {
1010 				fmd_hdl_error(hdl, "Syntax error in property "
1011 				    "that includes : %s[\n", prefix);
1012 				ep = strtok_r(NULL, " ", &lasts);
1013 				continue;
1014 			}
1015 			end = atoi(numstr);
1016 
1017 			nlen = strlen(prefix) + ETM_EP_INST_MAX;
1018 
1019 			if (nlen > MAXPATHLEN) {
1020 				fmd_hdl_error(hdl, "Endpoint prop string "
1021 				    "exceeds MAXPATHLEN\n");
1022 				ep = strtok_r(NULL, " ", &lasts);
1023 				continue;
1024 			}
1025 
1026 			for (i = beg; i <= end; i++) {
1027 				bzero(epname, MAXPATHLEN);
1028 				(void) snprintf(epname, nlen, "%s%d",
1029 				    prefix, i);
1030 				etm_init_epmap(hdl, epname, flags);
1031 			}
1032 		} else {
1033 			etm_init_epmap(hdl, ep, flags);
1034 		}
1035 
1036 		ep = strtok_r(NULL, " ", &lasts);
1037 	}
1038 
1039 	fmd_hdl_free(hdl, epstr, slen + 1);
1040 }
1041 
1042 /*
1043  * Free the transport infrastructure for an endpoint.
1044  */
1045 static void
1046 etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp)
1047 {
1048 	size_t hdrlen;
1049 	char hbuf[ETM_HDRLEN];
1050 
1051 	(void) pthread_mutex_lock(&mp->epm_lock);
1052 
1053 	/*
1054 	 * If an etm_send thread is in progress, wait for it to finish.
1055 	 * The etm_recv thread is managed by the transport layer and will
1056 	 * be destroyed with etm_xport_fini().
1057 	 */
1058 	while (mp->epm_txbusy)
1059 		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
1060 
1061 	if (mp->epm_timer_in_use)
1062 		fmd_timer_remove(hdl, mp->epm_timer_id);
1063 
1064 	if (mp->epm_oconn != NULL) {
1065 		hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
1066 		    ETM_HDR_SHUTDOWN, 0);
1067 		(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
1068 		    hdrlen);
1069 		(void) etm_xport_close(hdl, mp->epm_oconn);
1070 		mp->epm_oconn = NULL;
1071 	}
1072 
1073 	if (mp->epm_xprthdl != NULL) {
1074 		fmd_xprt_close(hdl, mp->epm_xprthdl);
1075 		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1076 		mp->epm_ep_nvl = NULL;
1077 	}
1078 
1079 	if (mp->epm_ep_nvl != NULL)
1080 		etm_free_ep_nvl(hdl, mp);
1081 
1082 	if (mp->epm_tlhdl != NULL)
1083 		(void) etm_xport_fini(hdl, mp->epm_tlhdl);
1084 
1085 	(void) pthread_mutex_unlock(&mp->epm_lock);
1086 	(void) pthread_mutex_destroy(&mp->epm_lock);
1087 	fmd_hdl_strfree(hdl, mp->epm_ep_str);
1088 	fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t));
1089 	DECRSTAT(Etm_stats.peer_count.fmds_value.ui64);
1090 }
1091 
1092 /*
1093  * FMD entry points
1094  */
1095 
1096 /*
1097  * FMD fmdo_send entry point.
1098  * Send an event to the remote endpoint and receive an ACK.
1099  */
1100 static int
1101 etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl)
1102 {
1103 	etm_epmap_t *mp;
1104 	nvlist_t *msgnvl;
1105 	int hdrstat, rv, cnt = 0;
1106 	char *buf, *nvbuf, *class;
1107 	size_t nvsize, buflen, hdrlen;
1108 	struct timespec tms;
1109 
1110 	(void) pthread_mutex_lock(&Etm_mod_lock);
1111 	if (Etm_exit) {
1112 		(void) pthread_mutex_unlock(&Etm_mod_lock);
1113 		return (FMD_SEND_RETRY);
1114 	}
1115 	(void) pthread_mutex_unlock(&Etm_mod_lock);
1116 
1117 	mp = fmd_xprt_getspecific(hdl, xprthdl);
1118 
1119 	for (;;) {
1120 		if (pthread_mutex_trylock(&mp->epm_lock) == 0) {
1121 			break;
1122 		} else {
1123 			/*
1124 			 * Another thread may be (1) trying to close this
1125 			 * fmd_xprt_t, or (2) posting an event to it.
1126 			 * If (1), don't want to spend too much time here.
1127 			 * If (2), allow it to finish and release epm_lock.
1128 			 */
1129 			if (cnt++ < 10) {
1130 				tms.tv_sec = 0;
1131 				tms.tv_nsec = (cnt * 10000);
1132 				(void) nanosleep(&tms, NULL);
1133 
1134 			} else {
1135 				return (FMD_SEND_RETRY);
1136 			}
1137 		}
1138 	}
1139 
1140 	mp->epm_txbusy++;
1141 
1142 	if (mp->epm_qstat == Q_UNINITIALIZED) {
1143 		mp->epm_txbusy--;
1144 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1145 		(void) pthread_mutex_unlock(&mp->epm_lock);
1146 		return (FMD_SEND_FAILED);
1147 	}
1148 
1149 	if (mp->epm_cstat == C_CLOSED) {
1150 		etm_suspend_reconnect(hdl, mp);
1151 		mp->epm_txbusy--;
1152 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1153 		(void) pthread_mutex_unlock(&mp->epm_lock);
1154 		return (FMD_SEND_RETRY);
1155 	}
1156 
1157 	if (mp->epm_cstat == C_LIMBO) {
1158 		if (mp->epm_oconn != NULL) {
1159 			(void) etm_xport_close(hdl, mp->epm_oconn);
1160 			mp->epm_oconn = NULL;
1161 		}
1162 
1163 		fmd_xprt_suspend(hdl, xprthdl);
1164 		mp->epm_qstat = Q_SUSPENDED;
1165 		mp->epm_txbusy--;
1166 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1167 		(void) pthread_mutex_unlock(&mp->epm_lock);
1168 		fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);
1169 		return (FMD_SEND_RETRY);
1170 	}
1171 
1172 	if (mp->epm_oconn == NULL) {
1173 		if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl))
1174 		    == NULL) {
1175 			etm_suspend_reconnect(hdl, mp);
1176 			mp->epm_txbusy--;
1177 			(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1178 			(void) pthread_mutex_unlock(&mp->epm_lock);
1179 			return (FMD_SEND_RETRY);
1180 		} else {
1181 			mp->epm_cstat = C_OPEN;
1182 		}
1183 	}
1184 
1185 	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0)
1186 		fmd_hdl_abort(hdl, "No class string in nvlist");
1187 
1188 	msgnvl = fmd_xprt_translate(hdl, xprthdl, ep);
1189 	if (msgnvl == NULL) {
1190 		mp->epm_txbusy--;
1191 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1192 		(void) pthread_mutex_unlock(&mp->epm_lock);
1193 		fmd_hdl_error(hdl, "Failed to translate event %p\n",
1194 		    (void *) ep);
1195 		return (FMD_SEND_FAILED);
1196 	}
1197 
1198 	rv = etm_xport_send_filter(hdl, msgnvl, mp->epm_ep_str);
1199 	if (rv == ETM_XPORT_FILTER_DROP) {
1200 		mp->epm_txbusy--;
1201 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1202 		(void) pthread_mutex_unlock(&mp->epm_lock);
1203 		fmd_hdl_debug(hdl, "send_filter dropped event");
1204 		nvlist_free(msgnvl);
1205 		INCRSTAT(Etm_stats.send_filter.fmds_value.ui64);
1206 		return (FMD_SEND_SUCCESS);
1207 	} else if (rv == ETM_XPORT_FILTER_ERROR) {
1208 		fmd_hdl_debug(hdl, "send_filter error : %s", strerror(errno));
1209 		INCRSTAT(Etm_stats.error_send_filter.fmds_value.ui64);
1210 		/* Still send event */
1211 	}
1212 
1213 	(void) pthread_mutex_unlock(&mp->epm_lock);
1214 
1215 	(void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR);
1216 
1217 	hdrlen = ETM_HDRLEN;
1218 	buflen = nvsize + hdrlen;
1219 
1220 	ALLOC_BUF(hdl, buf, buflen);
1221 
1222 	nvbuf = buf + hdrlen;
1223 
1224 	(void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize);
1225 
1226 	if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) {
1227 		(void) pthread_mutex_lock(&mp->epm_lock);
1228 		mp->epm_txbusy--;
1229 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1230 		(void) pthread_mutex_unlock(&mp->epm_lock);
1231 		fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv));
1232 		nvlist_free(msgnvl);
1233 		FREE_BUF(hdl, buf, buflen);
1234 		return (FMD_SEND_FAILED);
1235 	}
1236 
1237 	nvlist_free(msgnvl);
1238 
1239 	if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf,
1240 	    buflen) != buflen) {
1241 		fmd_hdl_debug(hdl, "failed to send message to %s",
1242 		    mp->epm_ep_str);
1243 		(void) pthread_mutex_lock(&mp->epm_lock);
1244 		etm_suspend_reconnect(hdl, mp);
1245 		mp->epm_txbusy--;
1246 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1247 		(void) pthread_mutex_unlock(&mp->epm_lock);
1248 		FREE_BUF(hdl, buf, buflen);
1249 		INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
1250 		return (FMD_SEND_RETRY);
1251 	}
1252 
1253 	INCRSTAT(Etm_stats.write_msg.fmds_value.ui64);
1254 	ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize);
1255 
1256 	etm_hex_dump(hdl, nvbuf, nvsize, 1);
1257 
1258 	if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf,
1259 	    hdrlen) != hdrlen) {
1260 		fmd_hdl_debug(hdl, "failed to read ACK from %s",
1261 		    mp->epm_ep_str);
1262 		(void) pthread_mutex_lock(&mp->epm_lock);
1263 		etm_suspend_reconnect(hdl, mp);
1264 		mp->epm_txbusy--;
1265 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1266 		(void) pthread_mutex_unlock(&mp->epm_lock);
1267 		FREE_BUF(hdl, buf, buflen);
1268 		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
1269 		return (FMD_SEND_RETRY);
1270 	}
1271 
1272 	hdrstat = etm_check_hdr(hdl, mp, buf);
1273 	FREE_BUF(hdl, buf, buflen);
1274 
1275 	if (hdrstat == ETM_HDR_ACK) {
1276 		INCRSTAT(Etm_stats.read_ack.fmds_value.ui64);
1277 	} else {
1278 		(void) pthread_mutex_lock(&mp->epm_lock);
1279 
1280 		(void) etm_xport_close(hdl, mp->epm_oconn);
1281 		mp->epm_oconn = NULL;
1282 
1283 		if (hdrstat == ETM_HDR_NAK) {
1284 			/* Peer received a bad value in the header */
1285 			if (mp->epm_xprthdl != NULL) {
1286 				mp->epm_cstat = C_LIMBO;
1287 				fmd_xprt_suspend(hdl, xprthdl);
1288 				mp->epm_qstat = Q_SUSPENDED;
1289 				fmd_hdl_debug(hdl, "received NAK, queue "
1290 				    "suspended for %s", mp->epm_ep_str);
1291 			}
1292 
1293 			rv = FMD_SEND_RETRY;
1294 
1295 		} else if (hdrstat == ETM_HDR_S_RESTART) {
1296 			/* Server has restarted */
1297 			mp->epm_cstat = C_CLOSED;
1298 			mp->epm_qstat = Q_UNINITIALIZED;
1299 			fmd_hdl_debug(hdl, "server %s restarted",
1300 			    mp->epm_ep_str);
1301 			/*
1302 			 * Cannot call fmd_xprt_close here, so we'll do it
1303 			 * on the timeout thread.
1304 			 */
1305 			if (mp->epm_timer_in_use == 0) {
1306 				mp->epm_timer_id = fmd_timer_install(
1307 				    hdl, mp, NULL, 0);
1308 				mp->epm_timer_in_use = 1;
1309 			}
1310 
1311 			/*
1312 			 * fault.* or list.* events will be replayed if a
1313 			 * transport is opened with the same auth.
1314 			 * Other events will be discarded.
1315 			 */
1316 			rv = FMD_SEND_FAILED;
1317 
1318 		} else {
1319 			mp->epm_cstat = C_CLOSED;
1320 			fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str);
1321 
1322 			rv = FMD_SEND_RETRY;
1323 		}
1324 
1325 		mp->epm_txbusy--;
1326 
1327 		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1328 		(void) pthread_mutex_unlock(&mp->epm_lock);
1329 
1330 		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
1331 
1332 		return (rv);
1333 	}
1334 
1335 	(void) pthread_mutex_lock(&mp->epm_lock);
1336 	mp->epm_txbusy--;
1337 	(void) pthread_cond_broadcast(&mp->epm_tx_cv);
1338 	(void) pthread_mutex_unlock(&mp->epm_lock);
1339 
1340 	return (FMD_SEND_SUCCESS);
1341 }
1342 
1343 /*
1344  * FMD fmdo_timeout entry point..
1345  */
1346 /*ARGSUSED*/
1347 static void
1348 etm_timeout(fmd_hdl_t *hdl, id_t id, void *data)
1349 {
1350 	etm_epmap_t *mp = (etm_epmap_t *)data;
1351 
1352 	(void) pthread_mutex_lock(&mp->epm_lock);
1353 
1354 	mp->epm_timer_in_use = 0;
1355 
1356 	if (mp->epm_qstat == Q_UNINITIALIZED) {
1357 		/* Server has shutdown and we (client) need to reconnect */
1358 		if (mp->epm_xprthdl != NULL) {
1359 			fmd_xprt_close(hdl, mp->epm_xprthdl);
1360 			fmd_hdl_debug(hdl, "queue closed for %s",
1361 			    mp->epm_ep_str);
1362 			mp->epm_xprthdl = NULL;
1363 			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1364 			mp->epm_ep_nvl = NULL;
1365 		}
1366 
1367 		if (mp->epm_ep_nvl == NULL)
1368 			(void) etm_get_ep_nvl(hdl, mp);
1369 
1370 		if (etm_handle_startup(hdl, mp)) {
1371 			if (mp->epm_oconn != NULL) {
1372 				(void) etm_xport_close(hdl, mp->epm_oconn);
1373 				mp->epm_oconn = NULL;
1374 			}
1375 			mp->epm_cstat = C_UNINITIALIZED;
1376 			mp->epm_qstat = Q_UNINITIALIZED;
1377 			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
1378 			    Reconn_interval);
1379 			mp->epm_timer_in_use = 1;
1380 		}
1381 	} else {
1382 		etm_reconnect(hdl, mp);
1383 	}
1384 
1385 	(void) pthread_mutex_unlock(&mp->epm_lock);
1386 }
1387 
1388 /*
1389  * FMD Module declarations
1390  */
1391 static const fmd_hdl_ops_t etm_ops = {
1392 	NULL,		/* fmdo_recv */
1393 	etm_timeout,	/* fmdo_timeout */
1394 	NULL,		/* fmdo_close */
1395 	NULL,		/* fmdo_stats */
1396 	NULL,		/* fmdo_gc */
1397 	etm_send,	/* fmdo_send */
1398 };
1399 
1400 static const fmd_prop_t etm_props[] = {
1401 	{ "client_list", FMD_TYPE_STRING, NULL },
1402 	{ "server_list", FMD_TYPE_STRING, NULL },
1403 	{ "reconnect_interval",	FMD_TYPE_UINT64, "10000000000" },
1404 	{ "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" },
1405 	{ "rw_timeout", FMD_TYPE_UINT64, "2000000000" },
1406 	{ "filter_path", FMD_TYPE_STRING, NULL },
1407 	{ NULL, 0, NULL }
1408 };
1409 
1410 static const fmd_hdl_info_t etm_info = {
1411 	"Event Transport Module", "2.0", &etm_ops, etm_props
1412 };
1413 
1414 /*
1415  * Initialize the transport for use by ETM.
1416  */
1417 void
1418 _fmd_init(fmd_hdl_t *hdl)
1419 {
1420 	char *propstr;
1421 
1422 	if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) {
1423 		return; /* invalid data in configuration file */
1424 	}
1425 
1426 	/* Create global stats */
1427 	(void) fmd_stat_create(hdl, FMD_STAT_NOALLOC,
1428 	    sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats);
1429 
1430 	/* Get module properties */
1431 	Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout");
1432 	Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval");
1433 	Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout");
1434 
1435 	propstr = fmd_prop_get_string(hdl, "client_list");
1436 	etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS);
1437 	fmd_prop_free_string(hdl, propstr);
1438 
1439 	propstr = fmd_prop_get_string(hdl, "server_list");
1440 	etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS);
1441 	fmd_prop_free_string(hdl, propstr);
1442 
1443 	if (Etm_stats.peer_count.fmds_value.ui64 == 0) {
1444 		fmd_hdl_debug(hdl, "Failed to init any endpoint\n");
1445 		fmd_hdl_unregister(hdl);
1446 		return;
1447 	}
1448 }
1449 
1450 /*
1451  * Teardown the transport
1452  */
1453 void
1454 _fmd_fini(fmd_hdl_t *hdl)
1455 {
1456 	etm_epmap_t *mp, *next;
1457 
1458 	(void) pthread_mutex_lock(&Etm_mod_lock);
1459 	Etm_exit = 1;
1460 	(void) pthread_mutex_unlock(&Etm_mod_lock);
1461 
1462 	mp = Epmap_head;
1463 
1464 	while (mp) {
1465 		next = mp->epm_next;
1466 		etm_free_epmap(hdl, mp);
1467 		mp = next;
1468 	}
1469 
1470 	fmd_hdl_unregister(hdl);
1471 }
1472