1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22 /*
23 * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
/*
 * FMA Event Transport Module
 *
 * Plugin for sending/receiving FMA events to/from a remote endpoint.
 */
32
33 #include <netinet/in.h>
34 #include <errno.h>
35 #include <sys/fm/protocol.h>
36 #include <sys/sysmacros.h>
37 #include <pthread.h>
38 #include <strings.h>
39 #include <ctype.h>
40 #include <link.h>
41 #include <libnvpair.h>
42 #include "etm_xport_api.h"
43 #include "etm_proto.h"
44
45 /*
46 * ETM declarations
47 */
48
/*
 * Status of the outgoing connection to a peer, stored in
 * etm_epmap_t.epm_cstat.
 */
typedef enum etm_connection_status {
	C_UNINITIALIZED = 0,	/* Initial state; also set when awaiting */
				/* (re)connection or after a reinit */
	C_OPEN,			/* Connection is open */
	C_CLOSED,		/* Connection is closed */
	C_LIMBO,		/* Bad value in header from peer */
	C_TIMED_OUT		/* Reconnection to peer timed out */
} etm_connstat_t;
56
/*
 * Status of the fmd transport queue for a peer, stored in
 * etm_epmap_t.epm_qstat.  Values start at 100, so they do not overlap
 * with the etm_connstat_t values.
 */
typedef enum etm_fmd_queue_status {
	Q_UNINITIALIZED = 100,	/* Initial state; also set by etm_reinit() */
	Q_INIT_PENDING,		/* Queue initialization in progress */
	Q_OPEN,			/* Queue is open */
	Q_SUSPENDED		/* Queue is suspended */
} etm_qstat_t;
63
/*
 * Per endpoint data.
 *
 * One of these is allocated by etm_init_epmap() for every peer and linked
 * onto the list headed by Epmap_head.  The fields that follow epm_lock are
 * documented as being protected by that mutex.
 */
typedef struct etm_endpoint_map {
	uint8_t epm_ver;		/* Protocol version being used */
	char *epm_ep_str;		/* Endpoint ID string */
	int epm_xprtflags;		/* FMD transport open flags */
	etm_xport_hdl_t epm_tlhdl;	/* Transport Layer instance handle */
	pthread_mutex_t epm_lock;	/* Protects remainder of struct */
	pthread_cond_t epm_tx_cv;	/* Cond var for send/transmit */
	int epm_txbusy;			/* Busy doing send/transmit */
	fmd_xprt_t *epm_xprthdl;	/* FMD transport handle */
	etm_qstat_t epm_qstat;		/* Status of fmd xprt queue */
	nvlist_t *epm_ep_nvl;		/* Endpoint ID nv_list */
	etm_xport_conn_t epm_oconn;	/* Connection for outgoing events */
	etm_connstat_t epm_cstat;	/* Status of connection */
	id_t epm_timer_id;		/* Timer id */
	int epm_timer_in_use;		/* Indicates if timer is in use */
	hrtime_t epm_reconn_end;	/* Reconnection end time */
	struct etm_endpoint_map *epm_next;	/* Next entry on the list */
} etm_epmap_t;
83
/*
 * Pseudo header types returned by etm_check_hdr() for a header that fails
 * validation.  They are numbered above ETM_HDR_TYPE_TOO_HIGH so that they
 * cannot collide with a header type taken from the protocol header.
 */
#define	ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1)
#define	ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2)
#define	ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3)
/*
 * etm_create_epmaps() adds this to the prefix length and uses the sum as
 * the snprintf() size, so the terminating null is included in the count.
 */
#define	ETM_EP_INST_MAX 4 /* Max chars in endpt instance */
/* fmd transport open flags; IS_CLIENT() tests for FMD_XPRT_ACCEPT */
#define	ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR
#define	ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT)

/*
 * Allocate a zeroed buffer of the given size and assign it to buf.
 * NOTE(review): the expansion carries its own trailing semicolon, so the
 * macro cannot safely be used as the body of an unbraced if/else.
 */
#define	ALLOC_BUF(hdl, buf, size) \
	buf = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP);

/*
 * Free a buffer of the given size.
 * NOTE(review): same trailing-semicolon caveat as ALLOC_BUF.
 */
#define	FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size));

/* Evaluates to 1 when the endpoint map was not opened with FMD_XPRT_ACCEPT */
#define	IS_CLIENT(mp) (((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1)

/*
 * Statistic update helpers.  Each one modifies its argument while holding
 * Etm_mod_lock.
 * NOTE(review): these expand to a bare brace block rather than
 * do { } while (0), so "if (c) INCRSTAT(x); else ..." will not compile.
 */
#define	INCRSTAT(x) { (void) pthread_mutex_lock(&Etm_mod_lock); \
	(x)++; \
	(void) pthread_mutex_unlock(&Etm_mod_lock); \
}

#define	DECRSTAT(x) { (void) pthread_mutex_lock(&Etm_mod_lock); \
	(x)--; \
	(void) pthread_mutex_unlock(&Etm_mod_lock); \
}

#define	ADDSTAT(x, y) { (void) pthread_mutex_lock(&Etm_mod_lock); \
	(x) += (y); \
	(void) pthread_mutex_unlock(&Etm_mod_lock); \
}
112
/*
 * Global variables
 *
 * Etm_mod_lock is also taken by the INCRSTAT/DECRSTAT/ADDSTAT macros when
 * they update the module statistics.
 */
static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER;
					/* Protects globals */
static hrtime_t Reconn_interval;	/* Time between reconnection attempts */
static hrtime_t Reconn_timeout;		/* Time allowed for reconnection */
static hrtime_t Rw_timeout;		/* Time allowed for I/O operation */
static int Etm_dump = 0;		/* Enables hex dump for debug */
static int Etm_exit = 0;		/* Flag for exit */
static etm_epmap_t *Epmap_head = NULL;	/* Head of list of epmap structs */
124
/*
 * Module statistics
 *
 * Each entry is an fmd_stat_t whose initializer gives the statistic name,
 * its type and a description.  The ui64 values are updated through the
 * INCRSTAT/DECRSTAT/ADDSTAT macros.
 */
static struct etm_stats {
	/* read counters */
	fmd_stat_t read_ack;
	fmd_stat_t read_bytes;
	fmd_stat_t read_msg;
	fmd_stat_t post_filter;
	/* write counters */
	fmd_stat_t write_ack;
	fmd_stat_t write_bytes;
	fmd_stat_t write_msg;
	fmd_stat_t send_filter;
	/* error counters */
	fmd_stat_t error_protocol;
	fmd_stat_t error_drop_read;
	fmd_stat_t error_read;
	fmd_stat_t error_read_badhdr;
	fmd_stat_t error_write;
	fmd_stat_t error_send_filter;
	fmd_stat_t error_post_filter;
	/* misc */
	fmd_stat_t peer_count;

} Etm_stats = {
	/* read counters */
	{ "read_ack", FMD_TYPE_UINT64, "ACKs read" },
	{ "read_bytes", FMD_TYPE_UINT64, "Bytes read" },
	{ "read_msg", FMD_TYPE_UINT64, "Messages read" },
	{ "post_filter", FMD_TYPE_UINT64, "Drops by post_filter" },
	/* write counters */
	{ "write_ack", FMD_TYPE_UINT64, "ACKs sent" },
	{ "write_bytes", FMD_TYPE_UINT64, "Bytes sent" },
	{ "write_msg", FMD_TYPE_UINT64, "Messages sent" },
	{ "send_filter", FMD_TYPE_UINT64, "Drops by send_filter" },
	/* error counters */
	{ "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" },
	{ "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" },
	{ "error_read", FMD_TYPE_UINT64, "Read I/O errors" },
	{ "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" },
	{ "error_write", FMD_TYPE_UINT64, "Write I/O errors" },
	{ "error_send_filter", FMD_TYPE_UINT64, "Send filter errors" },
	{ "error_post_filter", FMD_TYPE_UINT64, "Post filter errors" },
	/* misc */
	{ "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" },
};
170
171 /*
172 * ETM Private functions
173 */
174
175 /*
176 * Hex dump for debug.
177 */
178 static void
etm_hex_dump(fmd_hdl_t * hdl,void * buf,size_t buflen,int direction)179 etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction)
180 {
181 int i, j, k;
182 int16_t *c;
183
184 if (Etm_dump == 0)
185 return;
186
187 j = buflen / 16; /* Number of complete 8-column rows */
188 k = buflen % 16; /* Is there a last (non-8-column) row? */
189
190 if (direction)
191 fmd_hdl_debug(hdl, "--- WRITE Message Dump ---");
192 else
193 fmd_hdl_debug(hdl, "--- READ Message Dump ---");
194
195 fmd_hdl_debug(hdl, " Displaying %d bytes", buflen);
196
197 /* Dump the complete 8-column rows */
198 for (i = 0; i < j; i++) {
199 c = (int16_t *)buf + (i * 8);
200 fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x %4x %4x", i,
201 *(c+0), *(c+1), *(c+2), *(c+3),
202 *(c+4), *(c+5), *(c+6), *(c+7));
203 }
204
205 /* Dump the last (incomplete) row */
206 c = (int16_t *)buf + (i * 8);
207 switch (k) {
208 case 4:
209 fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1));
210 break;
211 case 8:
212 fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1),
213 *(c+2), *(c+3));
214 break;
215 case 12:
216 fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x", i, *(c+0),
217 *(c+1), *(c+2), *(c+3), *(c+4), *(c+5));
218 break;
219 }
220
221 fmd_hdl_debug(hdl, "--- End Dump ---");
222 }
223
224 /*
225 * Provide the length of a message based on the data in the given ETM header.
226 */
227 static size_t
etm_get_msglen(void * buf)228 etm_get_msglen(void *buf)
229 {
230 etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
231
232 return (ntohl(hp->hdr_msglen));
233 }
234
235 /*
236 * Check the contents of the ETM header for errors.
237 * Return the header type (hdr_type).
238 */
239 static int
etm_check_hdr(fmd_hdl_t * hdl,etm_epmap_t * mp,void * buf)240 etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf)
241 {
242 etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
243
244 if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) {
245 fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s "
246 ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim);
247 return (ETM_HDR_INVALID);
248 }
249
250 if ((hp->hdr_type == ETM_HDR_C_HELLO) ||
251 (hp->hdr_type == ETM_HDR_S_HELLO)) {
252 /* Until version is negotiated, other fields may be wrong */
253 return (hp->hdr_type);
254 }
255
256 if (hp->hdr_ver != mp->epm_ver) {
257 fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n",
258 mp->epm_ep_str, hp->hdr_ver);
259 return (ETM_HDR_BADVERSION);
260 }
261
262 if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) ||
263 (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) {
264 fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n",
265 mp->epm_ep_str, hp->hdr_type);
266 return (ETM_HDR_BADTYPE);
267 }
268
269 return (hp->hdr_type);
270 }
271
272 /*
273 * Create an ETM header of a given type in the given buffer.
274 * Return length of header.
275 */
276 static size_t
etm_create_hdr(void * buf,uint8_t ver,uint8_t type,uint32_t msglen)277 etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen)
278 {
279 etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
280
281 bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN);
282 hp->hdr_ver = ver;
283 hp->hdr_type = type;
284 hp->hdr_msglen = htonl(msglen);
285
286 return (ETM_HDRLEN);
287 }
288
289 /*
290 * Convert message bytes to nvlist and post to fmd.
291 * Return zero for success, non-zero for failure.
292 *
293 * Note : nvl is free'd by fmd.
294 */
295 static int
etm_post_msg(fmd_hdl_t * hdl,etm_epmap_t * mp,void * buf,size_t buflen)296 etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen)
297 {
298 nvlist_t *nvl;
299 int rv;
300
301 if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) {
302 fmd_hdl_error(hdl, "failed to unpack message");
303 return (1);
304 }
305
306 rv = etm_xport_post_filter(hdl, nvl, mp->epm_ep_str);
307 if (rv == ETM_XPORT_FILTER_DROP) {
308 fmd_hdl_debug(hdl, "post_filter dropped event");
309 INCRSTAT(Etm_stats.post_filter.fmds_value.ui64);
310 nvlist_free(nvl);
311 return (0);
312 } else if (rv == ETM_XPORT_FILTER_ERROR) {
313 fmd_hdl_debug(hdl, "post_filter error : %s", strerror(errno));
314 INCRSTAT(Etm_stats.error_post_filter.fmds_value.ui64);
315 /* Still post event */
316 }
317
318 (void) pthread_mutex_lock(&mp->epm_lock);
319 (void) pthread_mutex_lock(&Etm_mod_lock);
320 if (!Etm_exit) {
321 (void) pthread_mutex_unlock(&Etm_mod_lock);
322 if (mp->epm_qstat == Q_OPEN) {
323 fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
324 rv = 0;
325 } else if (mp->epm_qstat == Q_SUSPENDED) {
326 fmd_xprt_resume(hdl, mp->epm_xprthdl);
327 if (mp->epm_timer_in_use) {
328 fmd_timer_remove(hdl, mp->epm_timer_id);
329 mp->epm_timer_in_use = 0;
330 }
331 mp->epm_qstat = Q_OPEN;
332 fmd_hdl_debug(hdl, "queue resumed for %s",
333 mp->epm_ep_str);
334 fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
335 rv = 0;
336 } else {
337 fmd_hdl_debug(hdl, "unable to post message, qstat = %d",
338 mp->epm_qstat);
339 nvlist_free(nvl);
340 /* Remote peer will attempt to resend event */
341 rv = 2;
342 }
343 } else {
344 (void) pthread_mutex_unlock(&Etm_mod_lock);
345 fmd_hdl_debug(hdl, "unable to post message, module exiting");
346 nvlist_free(nvl);
347 /* Remote peer will attempt to resend event */
348 rv = 3;
349 }
350
351 (void) pthread_mutex_unlock(&mp->epm_lock);
352
353 return (rv);
354 }
355
356 /*
357 * Handle the startup handshake to the server. The client always initiates
358 * the startup handshake. In the following sequence, we are the client and
359 * the remote endpoint is the server.
360 *
361 * Client sends C_HELLO and transitions to Q_INIT_PENDING state.
362 * Server sends S_HELLO and transitions to Q_INIT_PENDING state.
363 * Client sends ACK and transitions to Q_OPEN state.
364 * Server receives ACK and transitions to Q_OPEN state.
365 *
366 * Return 0 for success, nonzero for failure.
367 */
368 static int
etm_handle_startup(fmd_hdl_t * hdl,etm_epmap_t * mp)369 etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp)
370 {
371 etm_proto_hdr_t *hp;
372 size_t hdrlen = ETM_HDRLEN;
373 int hdrstat;
374 char hbuf[ETM_HDRLEN];
375
376 if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
377 return (1);
378
379 mp->epm_cstat = C_OPEN;
380
381 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0);
382
383 if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
384 hdrlen)) != hdrlen) {
385 fmd_hdl_error(hdl, "Failed to write C_HELLO to %s",
386 mp->epm_ep_str);
387 return (2);
388 }
389
390 mp->epm_qstat = Q_INIT_PENDING;
391
392 if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf,
393 hdrlen)) != hdrlen) {
394 fmd_hdl_error(hdl, "Failed to read S_HELLO from %s",
395 mp->epm_ep_str);
396 return (3);
397 }
398
399 hdrstat = etm_check_hdr(hdl, mp, hbuf);
400
401 if (hdrstat != ETM_HDR_S_HELLO) {
402 fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO "
403 "from %s", mp->epm_ep_str);
404 return (4);
405 }
406
407 /*
408 * Get version from the server.
409 * Currently, only one version is supported.
410 */
411 hp = (etm_proto_hdr_t *)(void *)hbuf;
412 if (hp->hdr_ver != ETM_PROTO_V1) {
413 fmd_hdl_error(hdl, "Unable to use same version as %s : %d",
414 mp->epm_ep_str, hp->hdr_ver);
415 return (5);
416 }
417 mp->epm_ver = hp->hdr_ver;
418
419 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
420
421 if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
422 hdrlen)) != hdrlen) {
423 fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s",
424 mp->epm_ep_str);
425 return (6);
426 }
427
428 /*
429 * Call fmd_xprt_open and fmd_xprt_setspecific with
430 * Etm_mod_lock held to avoid race with etm_send thread.
431 */
432 (void) pthread_mutex_lock(&Etm_mod_lock);
433 if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags,
434 mp->epm_ep_nvl, NULL)) == NULL) {
435 fmd_hdl_abort(hdl, "Failed to init xprthdl for %s",
436 mp->epm_ep_str);
437 }
438 fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
439 (void) pthread_mutex_unlock(&Etm_mod_lock);
440
441 mp->epm_qstat = Q_OPEN;
442 fmd_hdl_debug(hdl, "queue open for %s", mp->epm_ep_str);
443
444 return (0);
445 }
446
447 /*
448 * Open a connection to the peer, send a SHUTDOWN message,
449 * and close the connection.
450 */
451 static void
etm_send_shutdown(fmd_hdl_t * hdl,etm_epmap_t * mp)452 etm_send_shutdown(fmd_hdl_t *hdl, etm_epmap_t *mp)
453 {
454 size_t hdrlen = ETM_HDRLEN;
455 char hbuf[ETM_HDRLEN];
456
457 if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
458 return;
459
460 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_SHUTDOWN, 0);
461
462 (void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, hdrlen);
463
464 (void) etm_xport_close(hdl, mp->epm_oconn);
465 mp->epm_oconn = NULL;
466 }
467
468 /*
469 * Alloc a nvlist and add a string for the endpoint.
470 * Return zero for success, non-zero for failure.
471 */
472 static int
etm_get_ep_nvl(fmd_hdl_t * hdl,etm_epmap_t * mp)473 etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
474 {
475 /*
476 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation
477 * in fmd when this nvlist_t is free'd.
478 */
479 (void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0);
480
481 if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) {
482 fmd_hdl_error(hdl, "failed to add domain-id string to nvlist "
483 "for %s", mp->epm_ep_str);
484 nvlist_free(mp->epm_ep_nvl);
485 return (1);
486 }
487
488 return (0);
489 }
490
491 /*
492 * Free the nvlist for the endpoint_id string.
493 */
494 /*ARGSUSED*/
495 static void
etm_free_ep_nvl(fmd_hdl_t * hdl,etm_epmap_t * mp)496 etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
497 {
498 nvlist_free(mp->epm_ep_nvl);
499 }
500
501 /*
502 * Check for a duplicate endpoint/peer string.
503 */
504 /*ARGSUSED*/
505 static int
etm_check_dup_ep_str(fmd_hdl_t * hdl,char * epname)506 etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname)
507 {
508 etm_epmap_t *mp;
509
510 for (mp = Epmap_head; mp != NULL; mp = mp->epm_next)
511 if (strcmp(epname, mp->epm_ep_str) == 0)
512 return (1);
513
514 return (0);
515 }
516
517 /*
518 * Attempt to re-open a connection with the remote endpoint.
519 */
520 static void
etm_reconnect(fmd_hdl_t * hdl,etm_epmap_t * mp)521 etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
522 {
523 if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) {
524 if (gethrtime() < mp->epm_reconn_end) {
525 if ((mp->epm_oconn = etm_xport_open(hdl,
526 mp->epm_tlhdl)) == NULL) {
527 fmd_hdl_debug(hdl, "reconnect failed for %s",
528 mp->epm_ep_str);
529 mp->epm_timer_id = fmd_timer_install(hdl, mp,
530 NULL, Reconn_interval);
531 mp->epm_timer_in_use = 1;
532 } else {
533 fmd_hdl_debug(hdl, "reconnect success for %s",
534 mp->epm_ep_str);
535 mp->epm_reconn_end = 0;
536 mp->epm_cstat = C_OPEN;
537 }
538 } else {
539 fmd_hdl_error(hdl, "Reconnect timed out for %s\n",
540 mp->epm_ep_str);
541 mp->epm_reconn_end = 0;
542 mp->epm_cstat = C_TIMED_OUT;
543 }
544 }
545
546 if (mp->epm_cstat == C_OPEN) {
547 fmd_xprt_resume(hdl, mp->epm_xprthdl);
548 mp->epm_qstat = Q_OPEN;
549 fmd_hdl_debug(hdl, "queue resumed for %s", mp->epm_ep_str);
550 }
551 }
552
553 /*
554 * Suspend a given connection and setup for reconnection retries.
555 * Assume caller holds lock on epm_lock.
556 */
557 static void
etm_suspend_reconnect(fmd_hdl_t * hdl,etm_epmap_t * mp)558 etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
559 {
560 (void) pthread_mutex_lock(&Etm_mod_lock);
561 if (Etm_exit) {
562 (void) pthread_mutex_unlock(&Etm_mod_lock);
563 return;
564 }
565 (void) pthread_mutex_unlock(&Etm_mod_lock);
566
567 if (mp->epm_oconn != NULL) {
568 (void) etm_xport_close(hdl, mp->epm_oconn);
569 mp->epm_oconn = NULL;
570 }
571
572 mp->epm_reconn_end = gethrtime() + Reconn_timeout;
573 mp->epm_cstat = C_UNINITIALIZED;
574
575 if (mp->epm_xprthdl != NULL) {
576 fmd_xprt_suspend(hdl, mp->epm_xprthdl);
577 mp->epm_qstat = Q_SUSPENDED;
578 fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);
579
580 if (mp->epm_timer_in_use == 0) {
581 mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
582 Reconn_interval);
583 mp->epm_timer_in_use = 1;
584 }
585 }
586 }
587
588 /*
589 * Reinitialize the connection. The old fmd_xprt_t handle must be
590 * removed/closed first.
591 * Assume caller holds lock on epm_lock.
592 */
593 static void
etm_reinit(fmd_hdl_t * hdl,etm_epmap_t * mp)594 etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp)
595 {
596 /*
597 * To avoid a deadlock, wait for etm_send to finish before
598 * calling fmd_xprt_close()
599 */
600 while (mp->epm_txbusy)
601 (void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
602
603 if (mp->epm_xprthdl != NULL) {
604 fmd_xprt_close(hdl, mp->epm_xprthdl);
605 fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str);
606 mp->epm_xprthdl = NULL;
607 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */
608 mp->epm_ep_nvl = NULL;
609 }
610
611 if (mp->epm_timer_in_use) {
612 fmd_timer_remove(hdl, mp->epm_timer_id);
613 mp->epm_timer_in_use = 0;
614 }
615
616 if (mp->epm_oconn != NULL) {
617 (void) etm_xport_close(hdl, mp->epm_oconn);
618 mp->epm_oconn = NULL;
619 }
620
621 mp->epm_cstat = C_UNINITIALIZED;
622 mp->epm_qstat = Q_UNINITIALIZED;
623 }
624
625 /*
626 * Receive data from ETM transport layer.
627 * Note : This is not the fmdo_recv entry point.
628 *
629 */
630 static int
etm_recv(fmd_hdl_t * hdl,etm_xport_conn_t conn,etm_epmap_t * mp)631 etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp)
632 {
633 size_t buflen, hdrlen;
634 void *buf;
635 char hbuf[ETM_HDRLEN];
636 int hdrstat, rv;
637
638 hdrlen = ETM_HDRLEN;
639
640 if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) {
641 fmd_hdl_debug(hdl, "failed to read header from %s",
642 mp->epm_ep_str);
643 INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
644 return (EIO);
645 }
646
647 hdrstat = etm_check_hdr(hdl, mp, hbuf);
648
649 switch (hdrstat) {
650 case ETM_HDR_INVALID:
651 (void) pthread_mutex_lock(&mp->epm_lock);
652 if (mp->epm_cstat == C_OPEN)
653 mp->epm_cstat = C_CLOSED;
654 (void) pthread_mutex_unlock(&mp->epm_lock);
655
656 INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
657 rv = ECANCELED;
658 break;
659
660 case ETM_HDR_BADTYPE:
661 case ETM_HDR_BADVERSION:
662 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0);
663
664 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
665 hdrlen)) != hdrlen) {
666 fmd_hdl_debug(hdl, "failed to write NAK to %s",
667 mp->epm_ep_str);
668 INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
669 return (EIO);
670 }
671
672 (void) pthread_mutex_lock(&mp->epm_lock);
673 mp->epm_cstat = C_LIMBO;
674 (void) pthread_mutex_unlock(&mp->epm_lock);
675
676 INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
677 rv = ENOTSUP;
678 break;
679
680 case ETM_HDR_C_HELLO:
681 /* Client is initiating a startup handshake */
682 (void) pthread_mutex_lock(&mp->epm_lock);
683 etm_reinit(hdl, mp);
684 mp->epm_qstat = Q_INIT_PENDING;
685 (void) pthread_mutex_unlock(&mp->epm_lock);
686
687 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0);
688
689 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
690 hdrlen)) != hdrlen) {
691 fmd_hdl_debug(hdl, "failed to write S_HELLO to %s",
692 mp->epm_ep_str);
693 INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
694 return (EIO);
695 }
696
697 rv = 0;
698 break;
699
700 case ETM_HDR_ACK:
701 (void) pthread_mutex_lock(&mp->epm_lock);
702 if (mp->epm_qstat == Q_INIT_PENDING) {
703 /* This is client's ACK from startup handshake */
704 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */
705 if (mp->epm_ep_nvl == NULL)
706 (void) etm_get_ep_nvl(hdl, mp);
707
708 /*
709 * Call fmd_xprt_open and fmd_xprt_setspecific with
710 * Etm_mod_lock held to avoid race with etm_send thread.
711 */
712 (void) pthread_mutex_lock(&Etm_mod_lock);
713 if ((mp->epm_xprthdl = fmd_xprt_open(hdl,
714 mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) {
715 fmd_hdl_abort(hdl, "Failed to init xprthdl "
716 "for %s", mp->epm_ep_str);
717 }
718 fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
719 (void) pthread_mutex_unlock(&Etm_mod_lock);
720
721 mp->epm_qstat = Q_OPEN;
722 (void) pthread_mutex_unlock(&mp->epm_lock);
723 fmd_hdl_debug(hdl, "queue open for %s",
724 mp->epm_ep_str);
725 } else {
726 (void) pthread_mutex_unlock(&mp->epm_lock);
727 fmd_hdl_debug(hdl, "protocol error, not expecting ACK "
728 "from %s\n", mp->epm_ep_str);
729 INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
730 }
731
732 rv = 0;
733 break;
734
735 case ETM_HDR_SHUTDOWN:
736 fmd_hdl_debug(hdl, "received shutdown from %s",
737 mp->epm_ep_str);
738
739 (void) pthread_mutex_lock(&mp->epm_lock);
740
741 etm_reinit(hdl, mp);
742
743 if (IS_CLIENT(mp)) {
744 /*
745 * A server shutdown is considered to be temporary.
746 * Prepare for reconnection.
747 */
748 mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
749 Reconn_interval);
750
751 mp->epm_timer_in_use = 1;
752 }
753
754 (void) pthread_mutex_unlock(&mp->epm_lock);
755
756 rv = ECANCELED;
757 break;
758
759 case ETM_HDR_MSG:
760 (void) pthread_mutex_lock(&mp->epm_lock);
761 if (mp->epm_qstat == Q_UNINITIALIZED) {
762 /* Peer (client) is unaware that we've restarted */
763 (void) pthread_mutex_unlock(&mp->epm_lock);
764 hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
765 ETM_HDR_S_RESTART, 0);
766
767 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
768 hdrlen)) != hdrlen) {
769 fmd_hdl_debug(hdl, "failed to write S_RESTART "
770 "to %s", mp->epm_ep_str);
771 INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
772 return (EIO);
773 }
774
775 return (ECANCELED);
776 }
777 (void) pthread_mutex_unlock(&mp->epm_lock);
778
779 buflen = etm_get_msglen(hbuf);
780 ALLOC_BUF(hdl, buf, buflen);
781
782 if (etm_xport_read(hdl, conn, Rw_timeout, buf,
783 buflen) != buflen) {
784 fmd_hdl_debug(hdl, "failed to read message from %s",
785 mp->epm_ep_str);
786 FREE_BUF(hdl, buf, buflen);
787 INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
788 return (EIO);
789 }
790
791 INCRSTAT(Etm_stats.read_msg.fmds_value.ui64);
792 ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen);
793
794 etm_hex_dump(hdl, buf, buflen, 0);
795
796 if (etm_post_msg(hdl, mp, buf, buflen)) {
797 INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64);
798 FREE_BUF(hdl, buf, buflen);
799 return (EIO);
800 }
801
802 FREE_BUF(hdl, buf, buflen);
803
804 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
805
806 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
807 hdrlen)) != hdrlen) {
808 fmd_hdl_debug(hdl, "failed to write ACK to %s",
809 mp->epm_ep_str);
810 INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
811 return (EIO);
812 }
813
814 INCRSTAT(Etm_stats.write_ack.fmds_value.ui64);
815
816 /*
817 * If we got this far and the current state of the
818 * outbound/sending connection is TIMED_OUT or
819 * LIMBO, then we should reinitialize it.
820 */
821 (void) pthread_mutex_lock(&mp->epm_lock);
822 if (mp->epm_cstat == C_TIMED_OUT ||
823 mp->epm_cstat == C_LIMBO) {
824 if (mp->epm_oconn != NULL) {
825 (void) etm_xport_close(hdl, mp->epm_oconn);
826 mp->epm_oconn = NULL;
827 }
828 mp->epm_cstat = C_UNINITIALIZED;
829 fmd_xprt_resume(hdl, mp->epm_xprthdl);
830 if (mp->epm_timer_in_use) {
831 fmd_timer_remove(hdl, mp->epm_timer_id);
832 mp->epm_timer_in_use = 0;
833 }
834 mp->epm_qstat = Q_OPEN;
835 fmd_hdl_debug(hdl, "queue resumed for %s",
836 mp->epm_ep_str);
837 }
838 (void) pthread_mutex_unlock(&mp->epm_lock);
839
840 rv = 0;
841 break;
842
843 default:
844 fmd_hdl_debug(hdl, "protocol error, unexpected header "
845 "from %s : %d", mp->epm_ep_str, hdrstat);
846 INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
847 rv = 0;
848 }
849
850 return (rv);
851 }
852
853 /*
854 * ETM transport layer callback function.
855 * The transport layer calls this function to :
856 * (a) pass an incoming message (flag == ETM_CBFLAG_RECV)
857 * (b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT)
858 */
859 static int
etm_cb_func(fmd_hdl_t * hdl,etm_xport_conn_t conn,etm_cb_flag_t flag,void * arg)860 etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
861 void *arg)
862 {
863 etm_epmap_t *mp = (etm_epmap_t *)arg;
864 int rv = 0;
865
866 (void) pthread_mutex_lock(&Etm_mod_lock);
867 if (Etm_exit) {
868 (void) pthread_mutex_unlock(&Etm_mod_lock);
869 return (ECANCELED);
870 }
871 (void) pthread_mutex_unlock(&Etm_mod_lock);
872
873 switch (flag) {
874 case ETM_CBFLAG_RECV:
875 rv = etm_recv(hdl, conn, mp);
876 break;
877 case ETM_CBFLAG_REINIT:
878 (void) pthread_mutex_lock(&mp->epm_lock);
879 etm_reinit(hdl, mp);
880 etm_send_shutdown(hdl, mp);
881 (void) pthread_mutex_unlock(&mp->epm_lock);
882 /*
883 * Return ECANCELED so the transport layer will close the
884 * server connection. The transport layer is responsible for
885 * reestablishing this connection (should a connection request
886 * arrive from the peer).
887 */
888 rv = ECANCELED;
889 break;
890 default:
891 fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag);
892 rv = ENOTSUP;
893 }
894
895 return (rv);
896 }
897
898 /*
899 * Allocate and initialize an etm_epmap_t struct for the given endpoint
900 * name string.
901 */
902 static void
etm_init_epmap(fmd_hdl_t * hdl,char * epname,int flags)903 etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags)
904 {
905 etm_epmap_t *newmap;
906
907 if (etm_check_dup_ep_str(hdl, epname)) {
908 fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname);
909 return;
910 }
911
912 newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP);
913 newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP);
914 newmap->epm_xprtflags = flags;
915 newmap->epm_cstat = C_UNINITIALIZED;
916 newmap->epm_qstat = Q_UNINITIALIZED;
917 newmap->epm_ver = ETM_PROTO_V1; /* Currently support one proto ver */
918 newmap->epm_txbusy = 0;
919
920 (void) pthread_mutex_init(&newmap->epm_lock, NULL);
921 (void) pthread_cond_init(&newmap->epm_tx_cv, NULL);
922
923 if (etm_get_ep_nvl(hdl, newmap)) {
924 fmd_hdl_strfree(hdl, newmap->epm_ep_str);
925 fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
926 return;
927 }
928
929 (void) pthread_mutex_lock(&newmap->epm_lock);
930
931 if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str,
932 etm_cb_func, newmap)) == NULL) {
933 fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n",
934 newmap->epm_ep_str);
935 etm_free_ep_nvl(hdl, newmap);
936 (void) pthread_mutex_unlock(&newmap->epm_lock);
937 (void) pthread_mutex_destroy(&newmap->epm_lock);
938 fmd_hdl_strfree(hdl, newmap->epm_ep_str);
939 fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
940 return;
941 }
942
943 if (IS_CLIENT(newmap)) {
944 if (etm_handle_startup(hdl, newmap)) {
945 /*
946 * For whatever reason, we could not complete the
947 * startup handshake with the server. Set the timer
948 * and try again.
949 */
950 if (newmap->epm_oconn != NULL) {
951 (void) etm_xport_close(hdl, newmap->epm_oconn);
952 newmap->epm_oconn = NULL;
953 }
954 newmap->epm_cstat = C_UNINITIALIZED;
955 newmap->epm_qstat = Q_UNINITIALIZED;
956 newmap->epm_timer_id = fmd_timer_install(hdl, newmap,
957 NULL, Reconn_interval);
958 newmap->epm_timer_in_use = 1;
959 }
960 } else {
961 /*
962 * We may be restarting after a crash. If so, the client
963 * may be unaware of this.
964 */
965 etm_send_shutdown(hdl, newmap);
966 }
967
968 /* Add this transport instance handle to the list */
969 newmap->epm_next = Epmap_head;
970 Epmap_head = newmap;
971
972 (void) pthread_mutex_unlock(&newmap->epm_lock);
973
974 INCRSTAT(Etm_stats.peer_count.fmds_value.ui64);
975 }
976
977 /*
978 * Parse the given property list string and call etm_init_epmap
979 * for each endpoint.
980 */
981 static void
etm_create_epmaps(fmd_hdl_t * hdl,char * eplist,int flags)982 etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags)
983 {
984 char *epstr, *ep, *prefix, *lasts, *numstr;
985 char epname[MAXPATHLEN];
986 size_t slen, nlen;
987 int beg, end, i;
988
989 if (eplist == NULL)
990 return;
991 /*
992 * Create a copy of eplist for parsing.
993 * strtok/strtok_r(3C) will insert null chars to the string.
994 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used.
995 */
996 slen = strlen(eplist);
997 epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP);
998 (void) strcpy(epstr, eplist);
999
1000 /*
1001 * The following are supported for the "client_list" and
1002 * "server_list" properties :
1003 *
1004 * A space-separated list of endpoints.
1005 * "dev:///dom0 dev:///dom1 dev:///dom2"
1006 *
1007 * An array syntax for a range of instances.
1008 * "dev:///dom[0:2]"
1009 *
1010 * A combination of both.
1011 * "dev:///dom0 dev:///dom[1:2]"
1012 */
1013 ep = strtok_r(epstr, " ", &lasts);
1014 while (ep != NULL) {
1015 if (strchr(ep, '[') != NULL) {
1016 /*
1017 * This string is using array syntax.
1018 * Check the string for correct syntax.
1019 */
1020 if ((strchr(ep, ':') == NULL) ||
1021 (strchr(ep, ']') == NULL)) {
1022 fmd_hdl_error(hdl, "Syntax error in property "
1023 "that includes : %s\n", ep);
1024 ep = strtok_r(NULL, " ", &lasts);
1025 continue;
1026 }
1027
1028 /* expand the array syntax */
1029 prefix = strtok(ep, "[");
1030
1031 numstr = strtok(NULL, ":");
1032 if ((numstr == NULL) || (!isdigit(*numstr))) {
1033 fmd_hdl_error(hdl, "Syntax error in property "
1034 "that includes : %s[\n", prefix);
1035 ep = strtok_r(NULL, " ", &lasts);
1036 continue;
1037 }
1038 beg = atoi(numstr);
1039
1040 numstr = strtok(NULL, "]");
1041 if ((numstr == NULL) || (!isdigit(*numstr))) {
1042 fmd_hdl_error(hdl, "Syntax error in property "
1043 "that includes : %s[\n", prefix);
1044 ep = strtok_r(NULL, " ", &lasts);
1045 continue;
1046 }
1047 end = atoi(numstr);
1048
1049 nlen = strlen(prefix) + ETM_EP_INST_MAX;
1050
1051 if (nlen > MAXPATHLEN) {
1052 fmd_hdl_error(hdl, "Endpoint prop string "
1053 "exceeds MAXPATHLEN\n");
1054 ep = strtok_r(NULL, " ", &lasts);
1055 continue;
1056 }
1057
1058 for (i = beg; i <= end; i++) {
1059 bzero(epname, MAXPATHLEN);
1060 (void) snprintf(epname, nlen, "%s%d",
1061 prefix, i);
1062 etm_init_epmap(hdl, epname, flags);
1063 }
1064 } else {
1065 etm_init_epmap(hdl, ep, flags);
1066 }
1067
1068 ep = strtok_r(NULL, " ", &lasts);
1069 }
1070
1071 fmd_hdl_free(hdl, epstr, slen + 1);
1072 }
1073
1074 /*
1075 * Free the transport infrastructure for an endpoint.
1076 */
1077 static void
etm_free_epmap(fmd_hdl_t * hdl,etm_epmap_t * mp)1078 etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp)
1079 {
1080 size_t hdrlen;
1081 char hbuf[ETM_HDRLEN];
1082
1083 (void) pthread_mutex_lock(&mp->epm_lock);
1084
1085 /*
1086 * If an etm_send thread is in progress, wait for it to finish.
1087 * The etm_recv thread is managed by the transport layer and will
1088 * be destroyed with etm_xport_fini().
1089 */
1090 while (mp->epm_txbusy)
1091 (void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
1092
1093 if (mp->epm_timer_in_use)
1094 fmd_timer_remove(hdl, mp->epm_timer_id);
1095
1096 if (mp->epm_oconn != NULL) {
1097 hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
1098 ETM_HDR_SHUTDOWN, 0);
1099 (void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
1100 hdrlen);
1101 (void) etm_xport_close(hdl, mp->epm_oconn);
1102 mp->epm_oconn = NULL;
1103 }
1104
1105 if (mp->epm_xprthdl != NULL) {
1106 fmd_xprt_close(hdl, mp->epm_xprthdl);
1107 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1108 mp->epm_ep_nvl = NULL;
1109 }
1110
1111 if (mp->epm_ep_nvl != NULL)
1112 etm_free_ep_nvl(hdl, mp);
1113
1114 if (mp->epm_tlhdl != NULL)
1115 (void) etm_xport_fini(hdl, mp->epm_tlhdl);
1116
1117 (void) pthread_mutex_unlock(&mp->epm_lock);
1118 (void) pthread_mutex_destroy(&mp->epm_lock);
1119 fmd_hdl_strfree(hdl, mp->epm_ep_str);
1120 fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t));
1121 DECRSTAT(Etm_stats.peer_count.fmds_value.ui64);
1122 }
1123
1124 /*
1125 * FMD entry points
1126 */
1127
1128 /*
1129 * FMD fmdo_send entry point.
1130 * Send an event to the remote endpoint and receive an ACK.
1131 */
1132 static int
etm_send(fmd_hdl_t * hdl,fmd_xprt_t * xprthdl,fmd_event_t * ep,nvlist_t * nvl)1133 etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl)
1134 {
1135 etm_epmap_t *mp;
1136 nvlist_t *msgnvl;
1137 int hdrstat, rv, cnt = 0;
1138 char *buf, *nvbuf, *class;
1139 size_t nvsize, buflen, hdrlen;
1140 struct timespec tms;
1141
1142 (void) pthread_mutex_lock(&Etm_mod_lock);
1143 if (Etm_exit) {
1144 (void) pthread_mutex_unlock(&Etm_mod_lock);
1145 return (FMD_SEND_RETRY);
1146 }
1147 (void) pthread_mutex_unlock(&Etm_mod_lock);
1148
1149 mp = fmd_xprt_getspecific(hdl, xprthdl);
1150
1151 for (;;) {
1152 if (pthread_mutex_trylock(&mp->epm_lock) == 0) {
1153 break;
1154 } else {
1155 /*
1156 * Another thread may be (1) trying to close this
1157 * fmd_xprt_t, or (2) posting an event to it.
1158 * If (1), don't want to spend too much time here.
1159 * If (2), allow it to finish and release epm_lock.
1160 */
1161 if (cnt++ < 10) {
1162 tms.tv_sec = 0;
1163 tms.tv_nsec = (cnt * 10000);
1164 (void) nanosleep(&tms, NULL);
1165
1166 } else {
1167 return (FMD_SEND_RETRY);
1168 }
1169 }
1170 }
1171
1172 mp->epm_txbusy++;
1173
1174 if (mp->epm_qstat == Q_UNINITIALIZED) {
1175 mp->epm_txbusy--;
1176 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1177 (void) pthread_mutex_unlock(&mp->epm_lock);
1178 return (FMD_SEND_FAILED);
1179 }
1180
1181 if (mp->epm_cstat == C_CLOSED) {
1182 etm_suspend_reconnect(hdl, mp);
1183 mp->epm_txbusy--;
1184 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1185 (void) pthread_mutex_unlock(&mp->epm_lock);
1186 return (FMD_SEND_RETRY);
1187 }
1188
1189 if (mp->epm_cstat == C_LIMBO) {
1190 if (mp->epm_oconn != NULL) {
1191 (void) etm_xport_close(hdl, mp->epm_oconn);
1192 mp->epm_oconn = NULL;
1193 }
1194
1195 fmd_xprt_suspend(hdl, xprthdl);
1196 mp->epm_qstat = Q_SUSPENDED;
1197 mp->epm_txbusy--;
1198 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1199 (void) pthread_mutex_unlock(&mp->epm_lock);
1200 fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);
1201 return (FMD_SEND_RETRY);
1202 }
1203
1204 if (mp->epm_oconn == NULL) {
1205 if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl))
1206 == NULL) {
1207 etm_suspend_reconnect(hdl, mp);
1208 mp->epm_txbusy--;
1209 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1210 (void) pthread_mutex_unlock(&mp->epm_lock);
1211 return (FMD_SEND_RETRY);
1212 } else {
1213 mp->epm_cstat = C_OPEN;
1214 }
1215 }
1216
1217 if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0)
1218 fmd_hdl_abort(hdl, "No class string in nvlist");
1219
1220 msgnvl = fmd_xprt_translate(hdl, xprthdl, ep);
1221 if (msgnvl == NULL) {
1222 mp->epm_txbusy--;
1223 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1224 (void) pthread_mutex_unlock(&mp->epm_lock);
1225 fmd_hdl_error(hdl, "Failed to translate event %p\n",
1226 (void *) ep);
1227 return (FMD_SEND_FAILED);
1228 }
1229
1230 rv = etm_xport_send_filter(hdl, msgnvl, mp->epm_ep_str);
1231 if (rv == ETM_XPORT_FILTER_DROP) {
1232 mp->epm_txbusy--;
1233 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1234 (void) pthread_mutex_unlock(&mp->epm_lock);
1235 fmd_hdl_debug(hdl, "send_filter dropped event");
1236 nvlist_free(msgnvl);
1237 INCRSTAT(Etm_stats.send_filter.fmds_value.ui64);
1238 return (FMD_SEND_SUCCESS);
1239 } else if (rv == ETM_XPORT_FILTER_ERROR) {
1240 fmd_hdl_debug(hdl, "send_filter error : %s", strerror(errno));
1241 INCRSTAT(Etm_stats.error_send_filter.fmds_value.ui64);
1242 /* Still send event */
1243 }
1244
1245 (void) pthread_mutex_unlock(&mp->epm_lock);
1246
1247 (void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR);
1248
1249 hdrlen = ETM_HDRLEN;
1250 buflen = nvsize + hdrlen;
1251
1252 ALLOC_BUF(hdl, buf, buflen);
1253
1254 nvbuf = buf + hdrlen;
1255
1256 (void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize);
1257
1258 if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) {
1259 (void) pthread_mutex_lock(&mp->epm_lock);
1260 mp->epm_txbusy--;
1261 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1262 (void) pthread_mutex_unlock(&mp->epm_lock);
1263 fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv));
1264 nvlist_free(msgnvl);
1265 FREE_BUF(hdl, buf, buflen);
1266 return (FMD_SEND_FAILED);
1267 }
1268
1269 nvlist_free(msgnvl);
1270
1271 if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf,
1272 buflen) != buflen) {
1273 fmd_hdl_debug(hdl, "failed to send message to %s",
1274 mp->epm_ep_str);
1275 (void) pthread_mutex_lock(&mp->epm_lock);
1276 etm_suspend_reconnect(hdl, mp);
1277 mp->epm_txbusy--;
1278 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1279 (void) pthread_mutex_unlock(&mp->epm_lock);
1280 FREE_BUF(hdl, buf, buflen);
1281 INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
1282 return (FMD_SEND_RETRY);
1283 }
1284
1285 INCRSTAT(Etm_stats.write_msg.fmds_value.ui64);
1286 ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize);
1287
1288 etm_hex_dump(hdl, nvbuf, nvsize, 1);
1289
1290 if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf,
1291 hdrlen) != hdrlen) {
1292 fmd_hdl_debug(hdl, "failed to read ACK from %s",
1293 mp->epm_ep_str);
1294 (void) pthread_mutex_lock(&mp->epm_lock);
1295 etm_suspend_reconnect(hdl, mp);
1296 mp->epm_txbusy--;
1297 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1298 (void) pthread_mutex_unlock(&mp->epm_lock);
1299 FREE_BUF(hdl, buf, buflen);
1300 INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
1301 return (FMD_SEND_RETRY);
1302 }
1303
1304 hdrstat = etm_check_hdr(hdl, mp, buf);
1305 FREE_BUF(hdl, buf, buflen);
1306
1307 if (hdrstat == ETM_HDR_ACK) {
1308 INCRSTAT(Etm_stats.read_ack.fmds_value.ui64);
1309 } else {
1310 (void) pthread_mutex_lock(&mp->epm_lock);
1311
1312 (void) etm_xport_close(hdl, mp->epm_oconn);
1313 mp->epm_oconn = NULL;
1314
1315 if (hdrstat == ETM_HDR_NAK) {
1316 /* Peer received a bad value in the header */
1317 if (mp->epm_xprthdl != NULL) {
1318 mp->epm_cstat = C_LIMBO;
1319 fmd_xprt_suspend(hdl, xprthdl);
1320 mp->epm_qstat = Q_SUSPENDED;
1321 fmd_hdl_debug(hdl, "received NAK, queue "
1322 "suspended for %s", mp->epm_ep_str);
1323 }
1324
1325 rv = FMD_SEND_RETRY;
1326
1327 } else if (hdrstat == ETM_HDR_S_RESTART) {
1328 /* Server has restarted */
1329 mp->epm_cstat = C_CLOSED;
1330 mp->epm_qstat = Q_UNINITIALIZED;
1331 fmd_hdl_debug(hdl, "server %s restarted",
1332 mp->epm_ep_str);
1333 /*
1334 * Cannot call fmd_xprt_close here, so we'll do it
1335 * on the timeout thread.
1336 */
1337 if (mp->epm_timer_in_use == 0) {
1338 mp->epm_timer_id = fmd_timer_install(
1339 hdl, mp, NULL, 0);
1340 mp->epm_timer_in_use = 1;
1341 }
1342
1343 /*
1344 * fault.* or list.* events will be replayed if a
1345 * transport is opened with the same auth.
1346 * Other events will be discarded.
1347 */
1348 rv = FMD_SEND_FAILED;
1349
1350 } else {
1351 mp->epm_cstat = C_CLOSED;
1352 fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str);
1353
1354 rv = FMD_SEND_RETRY;
1355 }
1356
1357 mp->epm_txbusy--;
1358
1359 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1360 (void) pthread_mutex_unlock(&mp->epm_lock);
1361
1362 INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
1363
1364 return (rv);
1365 }
1366
1367 (void) pthread_mutex_lock(&mp->epm_lock);
1368 mp->epm_txbusy--;
1369 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1370 (void) pthread_mutex_unlock(&mp->epm_lock);
1371
1372 return (FMD_SEND_SUCCESS);
1373 }
1374
1375 /*
1376 * FMD fmdo_timeout entry point..
1377 */
1378 /*ARGSUSED*/
1379 static void
etm_timeout(fmd_hdl_t * hdl,id_t id,void * data)1380 etm_timeout(fmd_hdl_t *hdl, id_t id, void *data)
1381 {
1382 etm_epmap_t *mp = (etm_epmap_t *)data;
1383
1384 (void) pthread_mutex_lock(&mp->epm_lock);
1385
1386 mp->epm_timer_in_use = 0;
1387
1388 if (mp->epm_qstat == Q_UNINITIALIZED) {
1389 /* Server has shutdown and we (client) need to reconnect */
1390 if (mp->epm_xprthdl != NULL) {
1391 fmd_xprt_close(hdl, mp->epm_xprthdl);
1392 fmd_hdl_debug(hdl, "queue closed for %s",
1393 mp->epm_ep_str);
1394 mp->epm_xprthdl = NULL;
1395 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1396 mp->epm_ep_nvl = NULL;
1397 }
1398
1399 if (mp->epm_ep_nvl == NULL)
1400 (void) etm_get_ep_nvl(hdl, mp);
1401
1402 if (etm_handle_startup(hdl, mp)) {
1403 if (mp->epm_oconn != NULL) {
1404 (void) etm_xport_close(hdl, mp->epm_oconn);
1405 mp->epm_oconn = NULL;
1406 }
1407 mp->epm_cstat = C_UNINITIALIZED;
1408 mp->epm_qstat = Q_UNINITIALIZED;
1409 mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
1410 Reconn_interval);
1411 mp->epm_timer_in_use = 1;
1412 }
1413 } else {
1414 etm_reconnect(hdl, mp);
1415 }
1416
1417 (void) pthread_mutex_unlock(&mp->epm_lock);
1418 }
1419
1420 /*
1421 * FMD Module declarations
1422 */
1423 static const fmd_hdl_ops_t etm_ops = {
1424 NULL, /* fmdo_recv */
1425 etm_timeout, /* fmdo_timeout */
1426 NULL, /* fmdo_close */
1427 NULL, /* fmdo_stats */
1428 NULL, /* fmdo_gc */
1429 etm_send, /* fmdo_send */
1430 };
1431
1432 static const fmd_prop_t etm_props[] = {
1433 { "client_list", FMD_TYPE_STRING, NULL },
1434 { "server_list", FMD_TYPE_STRING, NULL },
1435 { "reconnect_interval", FMD_TYPE_UINT64, "10000000000" },
1436 { "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" },
1437 { "rw_timeout", FMD_TYPE_UINT64, "2000000000" },
1438 { "filter_path", FMD_TYPE_STRING, NULL },
1439 { NULL, 0, NULL }
1440 };
1441
1442 static const fmd_hdl_info_t etm_info = {
1443 "Event Transport Module", "2.0", &etm_ops, etm_props
1444 };
1445
1446 /*
1447 * Initialize the transport for use by ETM.
1448 */
1449 void
_fmd_init(fmd_hdl_t * hdl)1450 _fmd_init(fmd_hdl_t *hdl)
1451 {
1452 char *propstr;
1453
1454 if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) {
1455 return; /* invalid data in configuration file */
1456 }
1457
1458 /* Create global stats */
1459 (void) fmd_stat_create(hdl, FMD_STAT_NOALLOC,
1460 sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats);
1461
1462 /* Get module properties */
1463 Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout");
1464 Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval");
1465 Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout");
1466
1467 propstr = fmd_prop_get_string(hdl, "client_list");
1468 etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS);
1469 fmd_prop_free_string(hdl, propstr);
1470
1471 propstr = fmd_prop_get_string(hdl, "server_list");
1472 etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS);
1473 fmd_prop_free_string(hdl, propstr);
1474
1475 if (Etm_stats.peer_count.fmds_value.ui64 == 0) {
1476 fmd_hdl_debug(hdl, "Failed to init any endpoint\n");
1477 fmd_hdl_unregister(hdl);
1478 return;
1479 }
1480 }
1481
1482 /*
1483 * Teardown the transport
1484 */
1485 void
_fmd_fini(fmd_hdl_t * hdl)1486 _fmd_fini(fmd_hdl_t *hdl)
1487 {
1488 etm_epmap_t *mp, *next;
1489
1490 (void) pthread_mutex_lock(&Etm_mod_lock);
1491 Etm_exit = 1;
1492 (void) pthread_mutex_unlock(&Etm_mod_lock);
1493
1494 mp = Epmap_head;
1495
1496 while (mp) {
1497 next = mp->epm_next;
1498 etm_free_epmap(hdl, mp);
1499 mp = next;
1500 }
1501
1502 fmd_hdl_unregister(hdl);
1503 }
1504