/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident "%Z%%M% %I% %E% SMI"

/*
 * FMA Event Transport Module
 *
 * Plugin for sending/receiving FMA events to/from a remote endpoint.
33 */ 34 35 #include <netinet/in.h> 36 #include <errno.h> 37 #include <sys/fm/protocol.h> 38 #include <sys/sysmacros.h> 39 #include <pthread.h> 40 #include <strings.h> 41 #include <ctype.h> 42 #include <link.h> 43 #include <libnvpair.h> 44 #include "etm_xport_api.h" 45 #include "etm_proto.h" 46 47 /* 48 * ETM declarations 49 */ 50 51 typedef enum etm_connection_status { 52 C_UNINITIALIZED = 0, 53 C_OPEN, /* Connection is open */ 54 C_CLOSED, /* Connection is closed */ 55 C_LIMBO, /* Bad value in header from peer */ 56 C_TIMED_OUT /* Reconnection to peer timed out */ 57 } etm_connstat_t; 58 59 typedef enum etm_fmd_queue_status { 60 Q_UNINITIALIZED = 100, 61 Q_INIT_PENDING, /* Queue initialization in progress */ 62 Q_OPEN, /* Queue is open */ 63 Q_SUSPENDED /* Queue is suspended */ 64 } etm_qstat_t; 65 66 /* Per endpoint data */ 67 typedef struct etm_endpoint_map { 68 uint8_t epm_ver; /* Protocol version being used */ 69 char *epm_ep_str; /* Endpoint ID string */ 70 int epm_xprtflags; /* FMD transport open flags */ 71 etm_xport_hdl_t epm_tlhdl; /* Transport Layer instance handle */ 72 pthread_mutex_t epm_lock; /* Protects remainder of struct */ 73 pthread_cond_t epm_tx_cv; /* Cond var for send/transmit */ 74 int epm_txbusy; /* Busy doing send/transmit */ 75 fmd_xprt_t *epm_xprthdl; /* FMD transport handle */ 76 etm_qstat_t epm_qstat; /* Status of fmd xprt queue */ 77 nvlist_t *epm_ep_nvl; /* Endpoint ID nv_list */ 78 etm_xport_conn_t epm_oconn; /* Connection for outgoing events */ 79 etm_connstat_t epm_cstat; /* Status of connection */ 80 id_t epm_timer_id; /* Timer id */ 81 int epm_timer_in_use; /* Indicates if timer is in use */ 82 hrtime_t epm_reconn_end; /* Reconnection end time */ 83 struct etm_endpoint_map *epm_next; 84 } etm_epmap_t; 85 86 #define ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1) 87 #define ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2) 88 #define ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3) 89 #define ETM_EP_INST_MAX 4 /* Max chars in endpt instance 
*/ 90 #define ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR 91 #define ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT) 92 93 #define ALLOC_BUF(hdl, buf, size) \ 94 buf = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP); 95 96 #define FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size)); 97 98 #define IS_CLIENT(mp) (((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1) 99 100 #define INCRSTAT(x) { (void) pthread_mutex_lock(&Etm_mod_lock); \ 101 (x)++; \ 102 (void) pthread_mutex_unlock(&Etm_mod_lock); \ 103 } 104 105 #define DECRSTAT(x) { (void) pthread_mutex_lock(&Etm_mod_lock); \ 106 (x)--; \ 107 (void) pthread_mutex_unlock(&Etm_mod_lock); \ 108 } 109 110 #define ADDSTAT(x, y) { (void) pthread_mutex_lock(&Etm_mod_lock); \ 111 (x) += (y); \ 112 (void) pthread_mutex_unlock(&Etm_mod_lock); \ 113 } 114 115 /* 116 * Global variables 117 */ 118 static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER; 119 /* Protects globals */ 120 static hrtime_t Reconn_interval; /* Time between reconnection attempts */ 121 static hrtime_t Reconn_timeout; /* Time allowed for reconnection */ 122 static hrtime_t Rw_timeout; /* Time allowed for I/O operation */ 123 static int Etm_dump = 0; /* Enables hex dump for debug */ 124 static int Etm_exit = 0; /* Flag for exit */ 125 static etm_epmap_t *Epmap_head = NULL; /* Head of list of epmap structs */ 126 127 /* Module statistics */ 128 static struct etm_stats { 129 /* read counters */ 130 fmd_stat_t read_ack; 131 fmd_stat_t read_bytes; 132 fmd_stat_t read_msg; 133 fmd_stat_t post_filter; 134 /* write counters */ 135 fmd_stat_t write_ack; 136 fmd_stat_t write_bytes; 137 fmd_stat_t write_msg; 138 fmd_stat_t send_filter; 139 /* error counters */ 140 fmd_stat_t error_protocol; 141 fmd_stat_t error_drop_read; 142 fmd_stat_t error_read; 143 fmd_stat_t error_read_badhdr; 144 fmd_stat_t error_write; 145 fmd_stat_t error_send_filter; 146 fmd_stat_t error_post_filter; 147 /* misc */ 148 fmd_stat_t peer_count; 149 150 } Etm_stats = { 151 /* read counters 
*/ 152 { "read_ack", FMD_TYPE_UINT64, "ACKs read" }, 153 { "read_bytes", FMD_TYPE_UINT64, "Bytes read" }, 154 { "read_msg", FMD_TYPE_UINT64, "Messages read" }, 155 { "post_filter", FMD_TYPE_UINT64, "Drops by post_filter" }, 156 /* write counters */ 157 { "write_ack", FMD_TYPE_UINT64, "ACKs sent" }, 158 { "write_bytes", FMD_TYPE_UINT64, "Bytes sent" }, 159 { "write_msg", FMD_TYPE_UINT64, "Messages sent" }, 160 { "send_filter", FMD_TYPE_UINT64, "Drops by send_filter" }, 161 /* ETM error counters */ 162 { "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" }, 163 { "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" }, 164 { "error_read", FMD_TYPE_UINT64, "Read I/O errors" }, 165 { "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" }, 166 { "error_write", FMD_TYPE_UINT64, "Write I/O errors" }, 167 { "error_send_filter", FMD_TYPE_UINT64, "Send filter errors" }, 168 { "error_post_filter", FMD_TYPE_UINT64, "Post filter errors" }, 169 /* ETM Misc */ 170 { "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" }, 171 }; 172 173 /* 174 * ETM Private functions 175 */ 176 177 /* 178 * Hex dump for debug. 179 */ 180 static void 181 etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction) 182 { 183 int i, j, k; 184 int16_t *c; 185 186 if (Etm_dump == 0) 187 return; 188 189 j = buflen / 16; /* Number of complete 8-column rows */ 190 k = buflen % 16; /* Is there a last (non-8-column) row? 
*/ 191 192 if (direction) 193 fmd_hdl_debug(hdl, "--- WRITE Message Dump ---"); 194 else 195 fmd_hdl_debug(hdl, "--- READ Message Dump ---"); 196 197 fmd_hdl_debug(hdl, " Displaying %d bytes", buflen); 198 199 /* Dump the complete 8-column rows */ 200 for (i = 0; i < j; i++) { 201 c = (int16_t *)buf + (i * 8); 202 fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x %4x %4x", i, 203 *(c+0), *(c+1), *(c+2), *(c+3), 204 *(c+4), *(c+5), *(c+6), *(c+7)); 205 } 206 207 /* Dump the last (incomplete) row */ 208 c = (int16_t *)buf + (i * 8); 209 switch (k) { 210 case 4: 211 fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1)); 212 break; 213 case 8: 214 fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1), 215 *(c+2), *(c+3)); 216 break; 217 case 12: 218 fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x", i, *(c+0), 219 *(c+1), *(c+2), *(c+3), *(c+4), *(c+5)); 220 break; 221 } 222 223 fmd_hdl_debug(hdl, "--- End Dump ---"); 224 } 225 226 /* 227 * Provide the length of a message based on the data in the given ETM header. 228 */ 229 static size_t 230 etm_get_msglen(void *buf) 231 { 232 etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf; 233 234 return (ntohl(hp->hdr_msglen)); 235 } 236 237 /* 238 * Check the contents of the ETM header for errors. 239 * Return the header type (hdr_type). 
240 */ 241 static int 242 etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf) 243 { 244 etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf; 245 246 if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) { 247 fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s " 248 ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim); 249 return (ETM_HDR_INVALID); 250 } 251 252 if ((hp->hdr_type == ETM_HDR_C_HELLO) || 253 (hp->hdr_type == ETM_HDR_S_HELLO)) { 254 /* Until version is negotiated, other fields may be wrong */ 255 return (hp->hdr_type); 256 } 257 258 if (hp->hdr_ver != mp->epm_ver) { 259 fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n", 260 mp->epm_ep_str, hp->hdr_ver); 261 return (ETM_HDR_BADVERSION); 262 } 263 264 if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) || 265 (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) { 266 fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n", 267 mp->epm_ep_str, hp->hdr_type); 268 return (ETM_HDR_BADTYPE); 269 } 270 271 return (hp->hdr_type); 272 } 273 274 /* 275 * Create an ETM header of a given type in the given buffer. 276 * Return length of header. 277 */ 278 static size_t 279 etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen) 280 { 281 etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf; 282 283 bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN); 284 hp->hdr_ver = ver; 285 hp->hdr_type = type; 286 hp->hdr_msglen = htonl(msglen); 287 288 return (ETM_HDRLEN); 289 } 290 291 /* 292 * Convert message bytes to nvlist and post to fmd. 293 * Return zero for success, non-zero for failure. 294 * 295 * Note : nvl is free'd by fmd. 
296 */ 297 static int 298 etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen) 299 { 300 nvlist_t *nvl; 301 int rv; 302 303 if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) { 304 fmd_hdl_error(hdl, "failed to unpack message"); 305 return (1); 306 } 307 308 rv = etm_xport_post_filter(hdl, nvl, mp->epm_ep_str); 309 if (rv == ETM_XPORT_FILTER_DROP) { 310 fmd_hdl_debug(hdl, "post_filter dropped event"); 311 INCRSTAT(Etm_stats.post_filter.fmds_value.ui64); 312 nvlist_free(nvl); 313 return (0); 314 } else if (rv == ETM_XPORT_FILTER_ERROR) { 315 fmd_hdl_debug(hdl, "post_filter error : %s", strerror(errno)); 316 INCRSTAT(Etm_stats.error_post_filter.fmds_value.ui64); 317 /* Still post event */ 318 } 319 320 (void) pthread_mutex_lock(&mp->epm_lock); 321 (void) pthread_mutex_lock(&Etm_mod_lock); 322 if (!Etm_exit) { 323 (void) pthread_mutex_unlock(&Etm_mod_lock); 324 if (mp->epm_qstat == Q_OPEN) { 325 fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0); 326 rv = 0; 327 } else if (mp->epm_qstat == Q_SUSPENDED) { 328 fmd_xprt_resume(hdl, mp->epm_xprthdl); 329 if (mp->epm_timer_in_use) { 330 fmd_timer_remove(hdl, mp->epm_timer_id); 331 mp->epm_timer_in_use = 0; 332 } 333 mp->epm_qstat = Q_OPEN; 334 fmd_hdl_debug(hdl, "queue resumed for %s", 335 mp->epm_ep_str); 336 fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0); 337 rv = 0; 338 } else { 339 fmd_hdl_debug(hdl, "unable to post message, qstat = %d", 340 mp->epm_qstat); 341 nvlist_free(nvl); 342 /* Remote peer will attempt to resend event */ 343 rv = 2; 344 } 345 } else { 346 (void) pthread_mutex_unlock(&Etm_mod_lock); 347 fmd_hdl_debug(hdl, "unable to post message, module exiting"); 348 nvlist_free(nvl); 349 /* Remote peer will attempt to resend event */ 350 rv = 3; 351 } 352 353 (void) pthread_mutex_unlock(&mp->epm_lock); 354 355 return (rv); 356 } 357 358 /* 359 * Handle the startup handshake to the server. The client always initiates 360 * the startup handshake. 
In the following sequence, we are the client and 361 * the remote endpoint is the server. 362 * 363 * Client sends C_HELLO and transitions to Q_INIT_PENDING state. 364 * Server sends S_HELLO and transitions to Q_INIT_PENDING state. 365 * Client sends ACK and transitions to Q_OPEN state. 366 * Server receives ACK and transitions to Q_OPEN state. 367 * 368 * Return 0 for success, nonzero for failure. 369 */ 370 static int 371 etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp) 372 { 373 etm_proto_hdr_t *hp; 374 size_t hdrlen = ETM_HDRLEN; 375 int hdrstat; 376 char hbuf[ETM_HDRLEN]; 377 378 if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL) 379 return (1); 380 381 mp->epm_cstat = C_OPEN; 382 383 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0); 384 385 if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, 386 hdrlen)) != hdrlen) { 387 fmd_hdl_error(hdl, "Failed to write C_HELLO to %s", 388 mp->epm_ep_str); 389 return (2); 390 } 391 392 mp->epm_qstat = Q_INIT_PENDING; 393 394 if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf, 395 hdrlen)) != hdrlen) { 396 fmd_hdl_error(hdl, "Failed to read S_HELLO from %s", 397 mp->epm_ep_str); 398 return (3); 399 } 400 401 hdrstat = etm_check_hdr(hdl, mp, hbuf); 402 403 if (hdrstat != ETM_HDR_S_HELLO) { 404 fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO " 405 "from %s", mp->epm_ep_str); 406 return (4); 407 } 408 409 /* 410 * Get version from the server. 411 * Currently, only one version is supported. 
412 */ 413 hp = (etm_proto_hdr_t *)(void *)hbuf; 414 if (hp->hdr_ver != ETM_PROTO_V1) { 415 fmd_hdl_error(hdl, "Unable to use same version as %s : %d", 416 mp->epm_ep_str, hp->hdr_ver); 417 return (5); 418 } 419 mp->epm_ver = hp->hdr_ver; 420 421 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0); 422 423 if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, 424 hdrlen)) != hdrlen) { 425 fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s", 426 mp->epm_ep_str); 427 return (6); 428 } 429 430 /* 431 * Call fmd_xprt_open and fmd_xprt_setspecific with 432 * Etm_mod_lock held to avoid race with etm_send thread. 433 */ 434 (void) pthread_mutex_lock(&Etm_mod_lock); 435 if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags, 436 mp->epm_ep_nvl, NULL)) == NULL) { 437 fmd_hdl_abort(hdl, "Failed to init xprthdl for %s", 438 mp->epm_ep_str); 439 } 440 fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp); 441 (void) pthread_mutex_unlock(&Etm_mod_lock); 442 443 mp->epm_qstat = Q_OPEN; 444 fmd_hdl_debug(hdl, "queue open for %s", mp->epm_ep_str); 445 446 return (0); 447 } 448 449 /* 450 * Alloc a nvlist and add a string for the endpoint. 451 * Return zero for success, non-zero for failure. 452 */ 453 static int 454 etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp) 455 { 456 /* 457 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation 458 * in fmd when this nvlist_t is free'd. 459 */ 460 (void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0); 461 462 if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) { 463 fmd_hdl_error(hdl, "failed to add domain-id string to nvlist " 464 "for %s", mp->epm_ep_str); 465 nvlist_free(mp->epm_ep_nvl); 466 return (1); 467 } 468 469 return (0); 470 } 471 472 /* 473 * Free the nvlist for the endpoint_id string. 
474 */ 475 /*ARGSUSED*/ 476 static void 477 etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp) 478 { 479 nvlist_free(mp->epm_ep_nvl); 480 } 481 482 /* 483 * Check for a duplicate endpoint/peer string. 484 */ 485 /*ARGSUSED*/ 486 static int 487 etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname) 488 { 489 etm_epmap_t *mp; 490 491 for (mp = Epmap_head; mp != NULL; mp = mp->epm_next) 492 if (strcmp(epname, mp->epm_ep_str) == 0) 493 return (1); 494 495 return (0); 496 } 497 498 /* 499 * Attempt to re-open a connection with the remote endpoint. 500 */ 501 static void 502 etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp) 503 { 504 if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) { 505 if (gethrtime() < mp->epm_reconn_end) { 506 if ((mp->epm_oconn = etm_xport_open(hdl, 507 mp->epm_tlhdl)) == NULL) { 508 fmd_hdl_debug(hdl, "reconnect failed for %s", 509 mp->epm_ep_str); 510 mp->epm_timer_id = fmd_timer_install(hdl, mp, 511 NULL, Reconn_interval); 512 mp->epm_timer_in_use = 1; 513 } else { 514 fmd_hdl_debug(hdl, "reconnect success for %s", 515 mp->epm_ep_str); 516 mp->epm_reconn_end = 0; 517 mp->epm_cstat = C_OPEN; 518 } 519 } else { 520 fmd_hdl_error(hdl, "Reconnect timed out for %s\n", 521 mp->epm_ep_str); 522 mp->epm_reconn_end = 0; 523 mp->epm_cstat = C_TIMED_OUT; 524 } 525 } 526 527 if (mp->epm_cstat == C_OPEN) { 528 fmd_xprt_resume(hdl, mp->epm_xprthdl); 529 mp->epm_qstat = Q_OPEN; 530 fmd_hdl_debug(hdl, "queue resumed for %s", mp->epm_ep_str); 531 } 532 } 533 534 /* 535 * Suspend a given connection and setup for reconnection retries. 536 * Assume caller holds lock on epm_lock. 
537 */ 538 static void 539 etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp) 540 { 541 (void) pthread_mutex_lock(&Etm_mod_lock); 542 if (Etm_exit) { 543 (void) pthread_mutex_unlock(&Etm_mod_lock); 544 return; 545 } 546 (void) pthread_mutex_unlock(&Etm_mod_lock); 547 548 if (mp->epm_oconn != NULL) { 549 (void) etm_xport_close(hdl, mp->epm_oconn); 550 mp->epm_oconn = NULL; 551 } 552 553 mp->epm_reconn_end = gethrtime() + Reconn_timeout; 554 mp->epm_cstat = C_UNINITIALIZED; 555 556 if (mp->epm_xprthdl != NULL) { 557 fmd_xprt_suspend(hdl, mp->epm_xprthdl); 558 mp->epm_qstat = Q_SUSPENDED; 559 fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str); 560 561 if (mp->epm_timer_in_use == 0) { 562 mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL, 563 Reconn_interval); 564 mp->epm_timer_in_use = 1; 565 } 566 } 567 } 568 569 /* 570 * Reinitialize the connection. The old fmd_xprt_t handle must be 571 * removed/closed first. 572 * Assume caller holds lock on epm_lock. 573 */ 574 static void 575 etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp) 576 { 577 /* 578 * To avoid a deadlock, wait for etm_send to finish before 579 * calling fmd_xprt_close() 580 */ 581 while (mp->epm_txbusy) 582 (void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock); 583 584 if (mp->epm_xprthdl != NULL) { 585 fmd_xprt_close(hdl, mp->epm_xprthdl); 586 fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str); 587 mp->epm_xprthdl = NULL; 588 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 589 mp->epm_ep_nvl = NULL; 590 } 591 592 if (mp->epm_timer_in_use) { 593 fmd_timer_remove(hdl, mp->epm_timer_id); 594 mp->epm_timer_in_use = 0; 595 } 596 597 if (mp->epm_oconn != NULL) { 598 (void) etm_xport_close(hdl, mp->epm_oconn); 599 mp->epm_oconn = NULL; 600 } 601 602 mp->epm_cstat = C_UNINITIALIZED; 603 mp->epm_qstat = Q_UNINITIALIZED; 604 } 605 606 /* 607 * Receive data from ETM transport layer. 608 * Note : This is not the fmdo_recv entry point. 
609 * 610 */ 611 static int 612 etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp) 613 { 614 size_t buflen, hdrlen; 615 void *buf; 616 char hbuf[ETM_HDRLEN]; 617 int hdrstat, rv; 618 619 hdrlen = ETM_HDRLEN; 620 621 if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) { 622 fmd_hdl_debug(hdl, "failed to read header from %s", 623 mp->epm_ep_str); 624 INCRSTAT(Etm_stats.error_read.fmds_value.ui64); 625 return (EIO); 626 } 627 628 hdrstat = etm_check_hdr(hdl, mp, hbuf); 629 630 switch (hdrstat) { 631 case ETM_HDR_INVALID: 632 (void) pthread_mutex_lock(&mp->epm_lock); 633 if (mp->epm_cstat == C_OPEN) 634 mp->epm_cstat = C_CLOSED; 635 (void) pthread_mutex_unlock(&mp->epm_lock); 636 637 INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64); 638 rv = ECANCELED; 639 break; 640 641 case ETM_HDR_BADTYPE: 642 case ETM_HDR_BADVERSION: 643 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0); 644 645 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 646 hdrlen)) != hdrlen) { 647 fmd_hdl_debug(hdl, "failed to write NAK to %s", 648 mp->epm_ep_str); 649 INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 650 return (EIO); 651 } 652 653 (void) pthread_mutex_lock(&mp->epm_lock); 654 mp->epm_cstat = C_LIMBO; 655 (void) pthread_mutex_unlock(&mp->epm_lock); 656 657 INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64); 658 rv = ENOTSUP; 659 break; 660 661 case ETM_HDR_C_HELLO: 662 /* Client is initiating a startup handshake */ 663 (void) pthread_mutex_lock(&mp->epm_lock); 664 etm_reinit(hdl, mp); 665 mp->epm_qstat = Q_INIT_PENDING; 666 (void) pthread_mutex_unlock(&mp->epm_lock); 667 668 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0); 669 670 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 671 hdrlen)) != hdrlen) { 672 fmd_hdl_debug(hdl, "failed to write S_HELLO to %s", 673 mp->epm_ep_str); 674 INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 675 return (EIO); 676 } 677 678 rv = 0; 679 break; 680 681 case ETM_HDR_ACK: 682 (void) 
pthread_mutex_lock(&mp->epm_lock); 683 if (mp->epm_qstat == Q_INIT_PENDING) { 684 /* This is client's ACK from startup handshake */ 685 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 686 if (mp->epm_ep_nvl == NULL) 687 (void) etm_get_ep_nvl(hdl, mp); 688 689 /* 690 * Call fmd_xprt_open and fmd_xprt_setspecific with 691 * Etm_mod_lock held to avoid race with etm_send thread. 692 */ 693 (void) pthread_mutex_lock(&Etm_mod_lock); 694 if ((mp->epm_xprthdl = fmd_xprt_open(hdl, 695 mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) { 696 fmd_hdl_abort(hdl, "Failed to init xprthdl " 697 "for %s", mp->epm_ep_str); 698 } 699 fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp); 700 (void) pthread_mutex_unlock(&Etm_mod_lock); 701 702 mp->epm_qstat = Q_OPEN; 703 (void) pthread_mutex_unlock(&mp->epm_lock); 704 fmd_hdl_debug(hdl, "queue open for %s", 705 mp->epm_ep_str); 706 } else { 707 (void) pthread_mutex_unlock(&mp->epm_lock); 708 fmd_hdl_debug(hdl, "protocol error, not expecting ACK " 709 "from %s\n", mp->epm_ep_str); 710 INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64); 711 } 712 713 rv = 0; 714 break; 715 716 case ETM_HDR_SHUTDOWN: 717 fmd_hdl_debug(hdl, "received shutdown from %s", 718 mp->epm_ep_str); 719 720 (void) pthread_mutex_lock(&mp->epm_lock); 721 722 etm_reinit(hdl, mp); 723 724 if (IS_CLIENT(mp)) { 725 /* 726 * A server shutdown is considered to be temporary. 727 * Prepare for reconnection. 
728 */ 729 mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL, 730 Reconn_interval); 731 732 mp->epm_timer_in_use = 1; 733 } 734 735 (void) pthread_mutex_unlock(&mp->epm_lock); 736 737 rv = ECANCELED; 738 break; 739 740 case ETM_HDR_MSG: 741 (void) pthread_mutex_lock(&mp->epm_lock); 742 if (mp->epm_qstat == Q_UNINITIALIZED) { 743 /* Peer (client) is unaware that we've restarted */ 744 (void) pthread_mutex_unlock(&mp->epm_lock); 745 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, 746 ETM_HDR_S_RESTART, 0); 747 748 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 749 hdrlen)) != hdrlen) { 750 fmd_hdl_debug(hdl, "failed to write S_RESTART " 751 "to %s", mp->epm_ep_str); 752 INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 753 return (EIO); 754 } 755 756 return (ECANCELED); 757 } 758 (void) pthread_mutex_unlock(&mp->epm_lock); 759 760 buflen = etm_get_msglen(hbuf); 761 ALLOC_BUF(hdl, buf, buflen); 762 763 if (etm_xport_read(hdl, conn, Rw_timeout, buf, 764 buflen) != buflen) { 765 fmd_hdl_debug(hdl, "failed to read message from %s", 766 mp->epm_ep_str); 767 FREE_BUF(hdl, buf, buflen); 768 INCRSTAT(Etm_stats.error_read.fmds_value.ui64); 769 return (EIO); 770 } 771 772 INCRSTAT(Etm_stats.read_msg.fmds_value.ui64); 773 ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen); 774 775 etm_hex_dump(hdl, buf, buflen, 0); 776 777 if (etm_post_msg(hdl, mp, buf, buflen)) { 778 INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64); 779 FREE_BUF(hdl, buf, buflen); 780 return (EIO); 781 } 782 783 FREE_BUF(hdl, buf, buflen); 784 785 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0); 786 787 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 788 hdrlen)) != hdrlen) { 789 fmd_hdl_debug(hdl, "failed to write ACK to %s", 790 mp->epm_ep_str); 791 INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 792 return (EIO); 793 } 794 795 INCRSTAT(Etm_stats.write_ack.fmds_value.ui64); 796 797 /* 798 * If we got this far and the current state of the 799 * outbound/sending connection is TIMED_OUT or 800 
* LIMBO, then we should reinitialize it. 801 */ 802 (void) pthread_mutex_lock(&mp->epm_lock); 803 if (mp->epm_cstat == C_TIMED_OUT || 804 mp->epm_cstat == C_LIMBO) { 805 if (mp->epm_oconn != NULL) { 806 (void) etm_xport_close(hdl, mp->epm_oconn); 807 mp->epm_oconn = NULL; 808 } 809 mp->epm_cstat = C_UNINITIALIZED; 810 fmd_xprt_resume(hdl, mp->epm_xprthdl); 811 if (mp->epm_timer_in_use) { 812 fmd_timer_remove(hdl, mp->epm_timer_id); 813 mp->epm_timer_in_use = 0; 814 } 815 mp->epm_qstat = Q_OPEN; 816 fmd_hdl_debug(hdl, "queue resumed for %s", 817 mp->epm_ep_str); 818 } 819 (void) pthread_mutex_unlock(&mp->epm_lock); 820 821 rv = 0; 822 break; 823 824 default: 825 fmd_hdl_debug(hdl, "protocol error, unexpected header " 826 "from %s : %d", mp->epm_ep_str, hdrstat); 827 INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64); 828 rv = 0; 829 } 830 831 return (rv); 832 } 833 834 /* 835 * ETM transport layer callback function. 836 * The transport layer calls this function to : 837 * (a) pass an incoming message (flag == ETM_CBFLAG_RECV) 838 * (b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT) 839 */ 840 static int 841 etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag, 842 void *arg) 843 { 844 etm_epmap_t *mp = (etm_epmap_t *)arg; 845 int rv = 0; 846 847 (void) pthread_mutex_lock(&Etm_mod_lock); 848 if (Etm_exit) { 849 (void) pthread_mutex_unlock(&Etm_mod_lock); 850 return (ECANCELED); 851 } 852 (void) pthread_mutex_unlock(&Etm_mod_lock); 853 854 switch (flag) { 855 case ETM_CBFLAG_RECV: 856 rv = etm_recv(hdl, conn, mp); 857 break; 858 case ETM_CBFLAG_REINIT: 859 (void) pthread_mutex_lock(&mp->epm_lock); 860 etm_reinit(hdl, mp); 861 (void) pthread_mutex_unlock(&mp->epm_lock); 862 /* 863 * Return ECANCELED so the transport layer will close the 864 * server connection. The transport layer is responsible for 865 * reestablishing this connection (should a connection request 866 * arrive from the peer). 
867 */ 868 rv = ECANCELED; 869 break; 870 default: 871 fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag); 872 rv = ENOTSUP; 873 } 874 875 return (rv); 876 } 877 878 /* 879 * Allocate and initialize an etm_epmap_t struct for the given endpoint 880 * name string. 881 */ 882 static void 883 etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags) 884 { 885 etm_epmap_t *newmap; 886 887 if (etm_check_dup_ep_str(hdl, epname)) { 888 fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname); 889 return; 890 } 891 892 newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP); 893 newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP); 894 newmap->epm_xprtflags = flags; 895 newmap->epm_cstat = C_UNINITIALIZED; 896 newmap->epm_qstat = Q_UNINITIALIZED; 897 newmap->epm_ver = ETM_PROTO_V1; /* Currently support one proto ver */ 898 newmap->epm_txbusy = 0; 899 900 (void) pthread_mutex_init(&newmap->epm_lock, NULL); 901 (void) pthread_cond_init(&newmap->epm_tx_cv, NULL); 902 903 if (etm_get_ep_nvl(hdl, newmap)) { 904 fmd_hdl_strfree(hdl, newmap->epm_ep_str); 905 fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t)); 906 return; 907 } 908 909 if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str, 910 etm_cb_func, newmap)) == NULL) { 911 fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n", 912 newmap->epm_ep_str); 913 etm_free_ep_nvl(hdl, newmap); 914 fmd_hdl_strfree(hdl, newmap->epm_ep_str); 915 fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t)); 916 return; 917 } 918 919 if (IS_CLIENT(newmap)) { 920 if (etm_handle_startup(hdl, newmap)) { 921 /* 922 * For whatever reason, we could not complete the 923 * startup handshake with the server. Set the timer 924 * and try again. 
925 */ 926 if (newmap->epm_oconn != NULL) { 927 (void) etm_xport_close(hdl, newmap->epm_oconn); 928 newmap->epm_oconn = NULL; 929 } 930 newmap->epm_cstat = C_UNINITIALIZED; 931 newmap->epm_qstat = Q_UNINITIALIZED; 932 newmap->epm_timer_id = fmd_timer_install(hdl, newmap, 933 NULL, Reconn_interval); 934 newmap->epm_timer_in_use = 1; 935 } 936 } 937 938 /* Add this transport instance handle to the list */ 939 newmap->epm_next = Epmap_head; 940 Epmap_head = newmap; 941 942 INCRSTAT(Etm_stats.peer_count.fmds_value.ui64); 943 } 944 945 /* 946 * Parse the given property list string and call etm_init_epmap 947 * for each endpoint. 948 */ 949 static void 950 etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags) 951 { 952 char *epstr, *ep, *prefix, *lasts, *numstr; 953 char epname[MAXPATHLEN]; 954 size_t slen, nlen; 955 int beg, end, i; 956 957 if (eplist == NULL) 958 return; 959 /* 960 * Create a copy of eplist for parsing. 961 * strtok/strtok_r(3C) will insert null chars to the string. 962 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used. 963 */ 964 slen = strlen(eplist); 965 epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP); 966 (void) strcpy(epstr, eplist); 967 968 /* 969 * The following are supported for the "client_list" and 970 * "server_list" properties : 971 * 972 * A space-separated list of endpoints. 973 * "dev:///dom0 dev:///dom1 dev:///dom2" 974 * 975 * An array syntax for a range of instances. 976 * "dev:///dom[0:2]" 977 * 978 * A combination of both. 979 * "dev:///dom0 dev:///dom[1:2]" 980 */ 981 ep = strtok_r(epstr, " ", &lasts); 982 while (ep != NULL) { 983 if (strchr(ep, '[') != NULL) { 984 /* 985 * This string is using array syntax. 986 * Check the string for correct syntax. 
987 */ 988 if ((strchr(ep, ':') == NULL) || 989 (strchr(ep, ']') == NULL)) { 990 fmd_hdl_error(hdl, "Syntax error in property " 991 "that includes : %s\n", ep); 992 ep = strtok_r(NULL, " ", &lasts); 993 continue; 994 } 995 996 /* expand the array syntax */ 997 prefix = strtok(ep, "["); 998 999 numstr = strtok(NULL, ":"); 1000 if ((numstr == NULL) || (!isdigit(*numstr))) { 1001 fmd_hdl_error(hdl, "Syntax error in property " 1002 "that includes : %s[\n", prefix); 1003 ep = strtok_r(NULL, " ", &lasts); 1004 continue; 1005 } 1006 beg = atoi(numstr); 1007 1008 numstr = strtok(NULL, "]"); 1009 if ((numstr == NULL) || (!isdigit(*numstr))) { 1010 fmd_hdl_error(hdl, "Syntax error in property " 1011 "that includes : %s[\n", prefix); 1012 ep = strtok_r(NULL, " ", &lasts); 1013 continue; 1014 } 1015 end = atoi(numstr); 1016 1017 nlen = strlen(prefix) + ETM_EP_INST_MAX; 1018 1019 if (nlen > MAXPATHLEN) { 1020 fmd_hdl_error(hdl, "Endpoint prop string " 1021 "exceeds MAXPATHLEN\n"); 1022 ep = strtok_r(NULL, " ", &lasts); 1023 continue; 1024 } 1025 1026 for (i = beg; i <= end; i++) { 1027 bzero(epname, MAXPATHLEN); 1028 (void) snprintf(epname, nlen, "%s%d", 1029 prefix, i); 1030 etm_init_epmap(hdl, epname, flags); 1031 } 1032 } else { 1033 etm_init_epmap(hdl, ep, flags); 1034 } 1035 1036 ep = strtok_r(NULL, " ", &lasts); 1037 } 1038 1039 fmd_hdl_free(hdl, epstr, slen + 1); 1040 } 1041 1042 /* 1043 * Free the transport infrastructure for an endpoint. 1044 */ 1045 static void 1046 etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp) 1047 { 1048 size_t hdrlen; 1049 char hbuf[ETM_HDRLEN]; 1050 1051 (void) pthread_mutex_lock(&mp->epm_lock); 1052 1053 /* 1054 * If an etm_send thread is in progress, wait for it to finish. 1055 * The etm_recv thread is managed by the transport layer and will 1056 * be destroyed with etm_xport_fini(). 
1057 */ 1058 while (mp->epm_txbusy) 1059 (void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock); 1060 1061 if (mp->epm_timer_in_use) 1062 fmd_timer_remove(hdl, mp->epm_timer_id); 1063 1064 if (mp->epm_oconn != NULL) { 1065 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, 1066 ETM_HDR_SHUTDOWN, 0); 1067 (void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, 1068 hdrlen); 1069 (void) etm_xport_close(hdl, mp->epm_oconn); 1070 mp->epm_oconn = NULL; 1071 } 1072 1073 if (mp->epm_xprthdl != NULL) { 1074 fmd_xprt_close(hdl, mp->epm_xprthdl); 1075 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 1076 mp->epm_ep_nvl = NULL; 1077 } 1078 1079 if (mp->epm_ep_nvl != NULL) 1080 etm_free_ep_nvl(hdl, mp); 1081 1082 if (mp->epm_tlhdl != NULL) 1083 (void) etm_xport_fini(hdl, mp->epm_tlhdl); 1084 1085 (void) pthread_mutex_unlock(&mp->epm_lock); 1086 (void) pthread_mutex_destroy(&mp->epm_lock); 1087 fmd_hdl_strfree(hdl, mp->epm_ep_str); 1088 fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t)); 1089 DECRSTAT(Etm_stats.peer_count.fmds_value.ui64); 1090 } 1091 1092 /* 1093 * FMD entry points 1094 */ 1095 1096 /* 1097 * FMD fmdo_send entry point. 1098 * Send an event to the remote endpoint and receive an ACK. 1099 */ 1100 static int 1101 etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl) 1102 { 1103 etm_epmap_t *mp; 1104 nvlist_t *msgnvl; 1105 int hdrstat, rv, cnt = 0; 1106 char *buf, *nvbuf, *class; 1107 size_t nvsize, buflen, hdrlen; 1108 struct timespec tms; 1109 1110 (void) pthread_mutex_lock(&Etm_mod_lock); 1111 if (Etm_exit) { 1112 (void) pthread_mutex_unlock(&Etm_mod_lock); 1113 return (FMD_SEND_RETRY); 1114 } 1115 (void) pthread_mutex_unlock(&Etm_mod_lock); 1116 1117 mp = fmd_xprt_getspecific(hdl, xprthdl); 1118 1119 for (;;) { 1120 if (pthread_mutex_trylock(&mp->epm_lock) == 0) { 1121 break; 1122 } else { 1123 /* 1124 * Another thread may be (1) trying to close this 1125 * fmd_xprt_t, or (2) posting an event to it. 
1126 * If (1), don't want to spend too much time here. 1127 * If (2), allow it to finish and release epm_lock. 1128 */ 1129 if (cnt++ < 10) { 1130 tms.tv_sec = 0; 1131 tms.tv_nsec = (cnt * 10000); 1132 (void) nanosleep(&tms, NULL); 1133 1134 } else { 1135 return (FMD_SEND_RETRY); 1136 } 1137 } 1138 } 1139 1140 mp->epm_txbusy++; 1141 1142 if (mp->epm_qstat == Q_UNINITIALIZED) { 1143 mp->epm_txbusy--; 1144 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1145 (void) pthread_mutex_unlock(&mp->epm_lock); 1146 return (FMD_SEND_FAILED); 1147 } 1148 1149 if (mp->epm_cstat == C_CLOSED) { 1150 etm_suspend_reconnect(hdl, mp); 1151 mp->epm_txbusy--; 1152 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1153 (void) pthread_mutex_unlock(&mp->epm_lock); 1154 return (FMD_SEND_RETRY); 1155 } 1156 1157 if (mp->epm_cstat == C_LIMBO) { 1158 if (mp->epm_oconn != NULL) { 1159 (void) etm_xport_close(hdl, mp->epm_oconn); 1160 mp->epm_oconn = NULL; 1161 } 1162 1163 fmd_xprt_suspend(hdl, xprthdl); 1164 mp->epm_qstat = Q_SUSPENDED; 1165 mp->epm_txbusy--; 1166 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1167 (void) pthread_mutex_unlock(&mp->epm_lock); 1168 fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str); 1169 return (FMD_SEND_RETRY); 1170 } 1171 1172 if (mp->epm_oconn == NULL) { 1173 if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) 1174 == NULL) { 1175 etm_suspend_reconnect(hdl, mp); 1176 mp->epm_txbusy--; 1177 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1178 (void) pthread_mutex_unlock(&mp->epm_lock); 1179 return (FMD_SEND_RETRY); 1180 } else { 1181 mp->epm_cstat = C_OPEN; 1182 } 1183 } 1184 1185 if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) 1186 fmd_hdl_abort(hdl, "No class string in nvlist"); 1187 1188 msgnvl = fmd_xprt_translate(hdl, xprthdl, ep); 1189 if (msgnvl == NULL) { 1190 mp->epm_txbusy--; 1191 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1192 (void) pthread_mutex_unlock(&mp->epm_lock); 1193 fmd_hdl_error(hdl, "Failed to translate event %p\n", 1194 
(void *) ep); 1195 return (FMD_SEND_FAILED); 1196 } 1197 1198 rv = etm_xport_send_filter(hdl, msgnvl, mp->epm_ep_str); 1199 if (rv == ETM_XPORT_FILTER_DROP) { 1200 mp->epm_txbusy--; 1201 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1202 (void) pthread_mutex_unlock(&mp->epm_lock); 1203 fmd_hdl_debug(hdl, "send_filter dropped event"); 1204 nvlist_free(msgnvl); 1205 INCRSTAT(Etm_stats.send_filter.fmds_value.ui64); 1206 return (FMD_SEND_SUCCESS); 1207 } else if (rv == ETM_XPORT_FILTER_ERROR) { 1208 fmd_hdl_debug(hdl, "send_filter error : %s", strerror(errno)); 1209 INCRSTAT(Etm_stats.error_send_filter.fmds_value.ui64); 1210 /* Still send event */ 1211 } 1212 1213 (void) pthread_mutex_unlock(&mp->epm_lock); 1214 1215 (void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR); 1216 1217 hdrlen = ETM_HDRLEN; 1218 buflen = nvsize + hdrlen; 1219 1220 ALLOC_BUF(hdl, buf, buflen); 1221 1222 nvbuf = buf + hdrlen; 1223 1224 (void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize); 1225 1226 if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) { 1227 (void) pthread_mutex_lock(&mp->epm_lock); 1228 mp->epm_txbusy--; 1229 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1230 (void) pthread_mutex_unlock(&mp->epm_lock); 1231 fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv)); 1232 nvlist_free(msgnvl); 1233 FREE_BUF(hdl, buf, buflen); 1234 return (FMD_SEND_FAILED); 1235 } 1236 1237 nvlist_free(msgnvl); 1238 1239 if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf, 1240 buflen) != buflen) { 1241 fmd_hdl_debug(hdl, "failed to send message to %s", 1242 mp->epm_ep_str); 1243 (void) pthread_mutex_lock(&mp->epm_lock); 1244 etm_suspend_reconnect(hdl, mp); 1245 mp->epm_txbusy--; 1246 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1247 (void) pthread_mutex_unlock(&mp->epm_lock); 1248 FREE_BUF(hdl, buf, buflen); 1249 INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 1250 return (FMD_SEND_RETRY); 1251 } 1252 1253 INCRSTAT(Etm_stats.write_msg.fmds_value.ui64); 1254 
ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize); 1255 1256 etm_hex_dump(hdl, nvbuf, nvsize, 1); 1257 1258 if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf, 1259 hdrlen) != hdrlen) { 1260 fmd_hdl_debug(hdl, "failed to read ACK from %s", 1261 mp->epm_ep_str); 1262 (void) pthread_mutex_lock(&mp->epm_lock); 1263 etm_suspend_reconnect(hdl, mp); 1264 mp->epm_txbusy--; 1265 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1266 (void) pthread_mutex_unlock(&mp->epm_lock); 1267 FREE_BUF(hdl, buf, buflen); 1268 INCRSTAT(Etm_stats.error_read.fmds_value.ui64); 1269 return (FMD_SEND_RETRY); 1270 } 1271 1272 hdrstat = etm_check_hdr(hdl, mp, buf); 1273 FREE_BUF(hdl, buf, buflen); 1274 1275 if (hdrstat == ETM_HDR_ACK) { 1276 INCRSTAT(Etm_stats.read_ack.fmds_value.ui64); 1277 } else { 1278 (void) pthread_mutex_lock(&mp->epm_lock); 1279 1280 (void) etm_xport_close(hdl, mp->epm_oconn); 1281 mp->epm_oconn = NULL; 1282 1283 if (hdrstat == ETM_HDR_NAK) { 1284 /* Peer received a bad value in the header */ 1285 if (mp->epm_xprthdl != NULL) { 1286 mp->epm_cstat = C_LIMBO; 1287 fmd_xprt_suspend(hdl, xprthdl); 1288 mp->epm_qstat = Q_SUSPENDED; 1289 fmd_hdl_debug(hdl, "received NAK, queue " 1290 "suspended for %s", mp->epm_ep_str); 1291 } 1292 1293 rv = FMD_SEND_RETRY; 1294 1295 } else if (hdrstat == ETM_HDR_S_RESTART) { 1296 /* Server has restarted */ 1297 mp->epm_cstat = C_CLOSED; 1298 mp->epm_qstat = Q_UNINITIALIZED; 1299 fmd_hdl_debug(hdl, "server %s restarted", 1300 mp->epm_ep_str); 1301 /* 1302 * Cannot call fmd_xprt_close here, so we'll do it 1303 * on the timeout thread. 1304 */ 1305 if (mp->epm_timer_in_use == 0) { 1306 mp->epm_timer_id = fmd_timer_install( 1307 hdl, mp, NULL, 0); 1308 mp->epm_timer_in_use = 1; 1309 } 1310 1311 /* 1312 * fault.* or list.* events will be replayed if a 1313 * transport is opened with the same auth. 1314 * Other events will be discarded. 
1315 */ 1316 rv = FMD_SEND_FAILED; 1317 1318 } else { 1319 mp->epm_cstat = C_CLOSED; 1320 fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str); 1321 1322 rv = FMD_SEND_RETRY; 1323 } 1324 1325 mp->epm_txbusy--; 1326 1327 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1328 (void) pthread_mutex_unlock(&mp->epm_lock); 1329 1330 INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64); 1331 1332 return (rv); 1333 } 1334 1335 (void) pthread_mutex_lock(&mp->epm_lock); 1336 mp->epm_txbusy--; 1337 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1338 (void) pthread_mutex_unlock(&mp->epm_lock); 1339 1340 return (FMD_SEND_SUCCESS); 1341 } 1342 1343 /* 1344 * FMD fmdo_timeout entry point.. 1345 */ 1346 /*ARGSUSED*/ 1347 static void 1348 etm_timeout(fmd_hdl_t *hdl, id_t id, void *data) 1349 { 1350 etm_epmap_t *mp = (etm_epmap_t *)data; 1351 1352 (void) pthread_mutex_lock(&mp->epm_lock); 1353 1354 mp->epm_timer_in_use = 0; 1355 1356 if (mp->epm_qstat == Q_UNINITIALIZED) { 1357 /* Server has shutdown and we (client) need to reconnect */ 1358 if (mp->epm_xprthdl != NULL) { 1359 fmd_xprt_close(hdl, mp->epm_xprthdl); 1360 fmd_hdl_debug(hdl, "queue closed for %s", 1361 mp->epm_ep_str); 1362 mp->epm_xprthdl = NULL; 1363 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 1364 mp->epm_ep_nvl = NULL; 1365 } 1366 1367 if (mp->epm_ep_nvl == NULL) 1368 (void) etm_get_ep_nvl(hdl, mp); 1369 1370 if (etm_handle_startup(hdl, mp)) { 1371 if (mp->epm_oconn != NULL) { 1372 (void) etm_xport_close(hdl, mp->epm_oconn); 1373 mp->epm_oconn = NULL; 1374 } 1375 mp->epm_cstat = C_UNINITIALIZED; 1376 mp->epm_qstat = Q_UNINITIALIZED; 1377 mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL, 1378 Reconn_interval); 1379 mp->epm_timer_in_use = 1; 1380 } 1381 } else { 1382 etm_reconnect(hdl, mp); 1383 } 1384 1385 (void) pthread_mutex_unlock(&mp->epm_lock); 1386 } 1387 1388 /* 1389 * FMD Module declarations 1390 */ 1391 static const fmd_hdl_ops_t etm_ops = { 1392 NULL, /* fmdo_recv */ 1393 etm_timeout, /* fmdo_timeout 
*/ 1394 NULL, /* fmdo_close */ 1395 NULL, /* fmdo_stats */ 1396 NULL, /* fmdo_gc */ 1397 etm_send, /* fmdo_send */ 1398 }; 1399 1400 static const fmd_prop_t etm_props[] = { 1401 { "client_list", FMD_TYPE_STRING, NULL }, 1402 { "server_list", FMD_TYPE_STRING, NULL }, 1403 { "reconnect_interval", FMD_TYPE_UINT64, "10000000000" }, 1404 { "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" }, 1405 { "rw_timeout", FMD_TYPE_UINT64, "2000000000" }, 1406 { "filter_path", FMD_TYPE_STRING, NULL }, 1407 { NULL, 0, NULL } 1408 }; 1409 1410 static const fmd_hdl_info_t etm_info = { 1411 "Event Transport Module", "2.0", &etm_ops, etm_props 1412 }; 1413 1414 /* 1415 * Initialize the transport for use by ETM. 1416 */ 1417 void 1418 _fmd_init(fmd_hdl_t *hdl) 1419 { 1420 char *propstr; 1421 1422 if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) { 1423 return; /* invalid data in configuration file */ 1424 } 1425 1426 /* Create global stats */ 1427 (void) fmd_stat_create(hdl, FMD_STAT_NOALLOC, 1428 sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats); 1429 1430 /* Get module properties */ 1431 Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout"); 1432 Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval"); 1433 Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout"); 1434 1435 propstr = fmd_prop_get_string(hdl, "client_list"); 1436 etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS); 1437 fmd_prop_free_string(hdl, propstr); 1438 1439 propstr = fmd_prop_get_string(hdl, "server_list"); 1440 etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS); 1441 fmd_prop_free_string(hdl, propstr); 1442 1443 if (Etm_stats.peer_count.fmds_value.ui64 == 0) { 1444 fmd_hdl_debug(hdl, "Failed to init any endpoint\n"); 1445 fmd_hdl_unregister(hdl); 1446 return; 1447 } 1448 } 1449 1450 /* 1451 * Teardown the transport 1452 */ 1453 void 1454 _fmd_fini(fmd_hdl_t *hdl) 1455 { 1456 etm_epmap_t *mp, *next; 1457 1458 (void) pthread_mutex_lock(&Etm_mod_lock); 1459 Etm_exit 
= 1; 1460 (void) pthread_mutex_unlock(&Etm_mod_lock); 1461 1462 mp = Epmap_head; 1463 1464 while (mp) { 1465 next = mp->epm_next; 1466 etm_free_epmap(hdl, mp); 1467 mp = next; 1468 } 1469 1470 fmd_hdl_unregister(hdl); 1471 } 1472