/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

/*
 * FMA Event Transport Module
 *
 * Plugin for sending/receiving FMA events to/from a remote endpoint.
 */

#include <netinet/in.h>
#include <errno.h>
#include <sys/fm/protocol.h>
#include <sys/sysmacros.h>
#include <pthread.h>
#include <strings.h>
#include <ctype.h>
#include <link.h>
#include <libnvpair.h>
#include "etm_xport_api.h"
#include "etm_proto.h"

/*
 * ETM declarations
 */

typedef enum etm_connection_status {
	C_UNINITIALIZED = 0,
	C_OPEN,		/* Connection is open */
	C_CLOSED,	/* Connection is closed */
	C_LIMBO,	/* Bad value in header from peer */
	C_TIMED_OUT	/* Reconnection to peer timed out */
} etm_connstat_t;

typedef enum etm_fmd_queue_status {
	Q_UNINITIALIZED = 100,
	Q_INIT_PENDING,	/* Queue initialization in progress */
	Q_OPEN,		/* Queue is open */
	Q_SUSPENDED	/* Queue is suspended */
} etm_qstat_t;

/* Per endpoint data */
typedef struct etm_endpoint_map {
	uint8_t epm_ver;		/* Protocol version being used */
	char *epm_ep_str;		/* Endpoint ID string */
	int epm_xprtflags;		/* FMD transport open flags */
	etm_xport_hdl_t epm_tlhdl;	/* Transport Layer instance handle */
	pthread_mutex_t epm_lock;	/* Protects remainder of struct */
	pthread_cond_t epm_tx_cv;	/* Cond var for send/transmit */
	int epm_txbusy;			/* Busy doing send/transmit */
	fmd_xprt_t *epm_xprthdl;	/* FMD transport handle */
	etm_qstat_t epm_qstat;		/* Status of fmd xprt queue */
	nvlist_t *epm_ep_nvl;		/* Endpoint ID nv_list */
	etm_xport_conn_t epm_oconn;	/* Connection for outgoing events */
	etm_connstat_t epm_cstat;	/* Status of connection */
	id_t epm_timer_id;		/* Timer id */
	int epm_timer_in_use;		/* Indicates if timer is in use */
	hrtime_t epm_reconn_end;	/* Reconnection end time */
	struct etm_endpoint_map *epm_next;
} etm_epmap_t;

#define	ETM_HDR_INVALID		(ETM_HDR_TYPE_TOO_HIGH + 1)
#define	ETM_HDR_BADVERSION	(ETM_HDR_TYPE_TOO_HIGH + 2)
#define	ETM_HDR_BADTYPE		(ETM_HDR_TYPE_TOO_HIGH + 3)
#define	ETM_EP_INST_MAX		4	/* Max chars in endpt instance */
#define	ETM_CLIENT_XPRT_FLAGS	FMD_XPRT_RDWR
#define	ETM_SERVER_XPRT_FLAGS	(FMD_XPRT_RDWR | FMD_XPRT_ACCEPT)

#define	ALLOC_BUF(hdl, buf, size) \
	buf = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP);

#define	FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size));

#define	IS_CLIENT(mp)	(((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1)
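
/*
 * Note on IS_CLIENT : a peer's role is inferred from the FMD transport
 * open flags stored in epm_xprtflags.  Peers taken from the "server_list"
 * property are opened with ETM_CLIENT_XPRT_FLAGS (we connect to them as a
 * client), while peers taken from the "client_list" property are opened
 * with ETM_SERVER_XPRT_FLAGS (FMD_XPRT_ACCEPT set; we accept connections
 * from them as a server).  See etm_create_epmaps() and _fmd_init() below.
 */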

#define	INCRSTAT(x)	{ (void) pthread_mutex_lock(&Etm_mod_lock); \
			(x)++; \
			(void) pthread_mutex_unlock(&Etm_mod_lock); \
			}

#define	DECRSTAT(x)	{ (void) pthread_mutex_lock(&Etm_mod_lock); \
			(x)--; \
			(void) pthread_mutex_unlock(&Etm_mod_lock); \
			}

#define	ADDSTAT(x, y)	{ (void) pthread_mutex_lock(&Etm_mod_lock); \
			(x) += (y); \
			(void) pthread_mutex_unlock(&Etm_mod_lock); \
			}

/*
 * Global variables
 */
static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER;
					/* Protects globals */
static hrtime_t Reconn_interval;	/* Time between reconnection attempts */
static hrtime_t Reconn_timeout;		/* Time allowed for reconnection */
static hrtime_t Rw_timeout;		/* Time allowed for I/O operation */
static int Etm_dump = 0;		/* Enables hex dump for debug */
static int Etm_exit = 0;		/* Flag for exit */
static etm_epmap_t *Epmap_head = NULL;	/* Head of list of epmap structs */

/* Module statistics */
static struct etm_stats {
	/* read counters */
	fmd_stat_t read_ack;
	fmd_stat_t read_bytes;
	fmd_stat_t read_msg;
	fmd_stat_t post_filter;
	/* write counters */
	fmd_stat_t write_ack;
	fmd_stat_t write_bytes;
	fmd_stat_t write_msg;
	fmd_stat_t send_filter;
	/* error counters */
	fmd_stat_t error_protocol;
	fmd_stat_t error_drop_read;
	fmd_stat_t error_read;
	fmd_stat_t error_read_badhdr;
	fmd_stat_t error_write;
	fmd_stat_t error_send_filter;
	fmd_stat_t error_post_filter;
	/* misc */
	fmd_stat_t peer_count;

} Etm_stats = {
	/* read counters */
	{ "read_ack", FMD_TYPE_UINT64, "ACKs read" },
	{ "read_bytes", FMD_TYPE_UINT64, "Bytes read" },
	{ "read_msg", FMD_TYPE_UINT64, "Messages read" },
	{ "post_filter", FMD_TYPE_UINT64, "Drops by post_filter" },
	/* write counters */
	{ "write_ack", FMD_TYPE_UINT64, "ACKs sent" },
	{ "write_bytes", FMD_TYPE_UINT64, "Bytes sent" },
	{ "write_msg", FMD_TYPE_UINT64, "Messages sent" },
	{ "send_filter", FMD_TYPE_UINT64, "Drops by send_filter" },
	/* ETM error counters */
	{ "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" },
	{ "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" },
	{ "error_read", FMD_TYPE_UINT64, "Read I/O errors" },
	{ "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" },
	{ "error_write", FMD_TYPE_UINT64, "Write I/O errors" },
	{ "error_send_filter", FMD_TYPE_UINT64, "Send filter errors" },
	{ "error_post_filter", FMD_TYPE_UINT64, "Post filter errors" },
	/* ETM Misc */
	{ "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" },
};

/*
 * ETM Private functions
 */

/*
 * Hex dump for debug.
 */
static void
etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction)
{
	int i, j, k;
	int16_t *c;

	if (Etm_dump == 0)
		return;

	j = buflen / 16;	/* Number of complete 8-column rows */
	k = buflen % 16;	/* Is there a last (non-8-column) row? */

	if (direction)
		fmd_hdl_debug(hdl, "--- WRITE Message Dump ---");
	else
		fmd_hdl_debug(hdl, "--- READ Message Dump ---");

	fmd_hdl_debug(hdl, " Displaying %d bytes", buflen);

	/* Dump the complete 8-column rows */
	for (i = 0; i < j; i++) {
		c = (int16_t *)buf + (i * 8);
		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x %4x %4x", i,
		    *(c+0), *(c+1), *(c+2), *(c+3),
		    *(c+4), *(c+5), *(c+6), *(c+7));
	}

	/* Dump the last (incomplete) row */
	c = (int16_t *)buf + (i * 8);
	switch (k) {
	case 4:
		fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1));
		break;
	case 8:
		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1),
		    *(c+2), *(c+3));
		break;
	case 12:
		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x", i, *(c+0),
		    *(c+1), *(c+2), *(c+3), *(c+4), *(c+5));
		break;
	}

	fmd_hdl_debug(hdl, "--- End Dump ---");
}

/*
 * Provide the length of a message based on the data in the given ETM header.
 */
static size_t
etm_get_msglen(void *buf)
{
	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;

	return (ntohl(hp->hdr_msglen));
}

/*
 * Check the contents of the ETM header for errors.
 * Return the header type (hdr_type).
 */
static int
etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf)
{
	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;

	if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) {
		fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s "
		    ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim);
		return (ETM_HDR_INVALID);
	}

	if ((hp->hdr_type == ETM_HDR_C_HELLO) ||
	    (hp->hdr_type == ETM_HDR_S_HELLO)) {
		/* Until version is negotiated, other fields may be wrong */
		return (hp->hdr_type);
	}

	if (hp->hdr_ver != mp->epm_ver) {
		fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n",
		    mp->epm_ep_str, hp->hdr_ver);
		return (ETM_HDR_BADVERSION);
	}

	if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) ||
	    (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) {
		fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n",
		    mp->epm_ep_str, hp->hdr_type);
		return (ETM_HDR_BADTYPE);
	}

	return (hp->hdr_type);
}

/*
 * Create an ETM header of a given type in the given buffer.
 * Return length of header.
 */
static size_t
etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen)
{
	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;

	bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN);
	hp->hdr_ver = ver;
	hp->hdr_type = type;
	hp->hdr_msglen = htonl(msglen);

	return (ETM_HDRLEN);
}
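
/*
 * For illustration only : header-only control messages (HELLO, ACK, NAK,
 * SHUTDOWN, S_RESTART) are built with etm_create_hdr() and sent with
 * etm_xport_write(), roughly as follows (sketch; error handling as in the
 * callers below) :
 *
 *	char hbuf[ETM_HDRLEN];
 *	size_t hdrlen;
 *
 *	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
 *	if (etm_xport_write(hdl, conn, Rw_timeout, hbuf, hdrlen) != hdrlen)
 *		INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
 */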

/*
 * Convert message bytes to nvlist and post to fmd.
 * Return zero for success, non-zero for failure.
 *
 * Note : nvl is free'd by fmd.
 */
static int
etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen)
{
	nvlist_t *nvl;
	int rv;

	if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) {
		fmd_hdl_error(hdl, "failed to unpack message");
		return (1);
	}

	rv = etm_xport_post_filter(hdl, nvl, mp->epm_ep_str);
	if (rv == ETM_XPORT_FILTER_DROP) {
		fmd_hdl_debug(hdl, "post_filter dropped event");
		INCRSTAT(Etm_stats.post_filter.fmds_value.ui64);
		nvlist_free(nvl);
		return (0);
	} else if (rv == ETM_XPORT_FILTER_ERROR) {
		fmd_hdl_debug(hdl, "post_filter error : %s", strerror(errno));
		INCRSTAT(Etm_stats.error_post_filter.fmds_value.ui64);
		/* Still post event */
	}

	(void) pthread_mutex_lock(&mp->epm_lock);
	(void) pthread_mutex_lock(&Etm_mod_lock);
	if (!Etm_exit) {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		if (mp->epm_qstat == Q_OPEN) {
			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
			rv = 0;
		} else if (mp->epm_qstat == Q_SUSPENDED) {
			fmd_xprt_resume(hdl, mp->epm_xprthdl);
			if (mp->epm_timer_in_use) {
				fmd_timer_remove(hdl, mp->epm_timer_id);
				mp->epm_timer_in_use = 0;
			}
			mp->epm_qstat = Q_OPEN;
			fmd_hdl_debug(hdl, "queue resumed for %s",
			    mp->epm_ep_str);
			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
			rv = 0;
		} else {
			fmd_hdl_debug(hdl, "unable to post message, qstat = %d",
			    mp->epm_qstat);
			nvlist_free(nvl);
			/* Remote peer will attempt to resend event */
			rv = 2;
		}
	} else {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		fmd_hdl_debug(hdl, "unable to post message, module exiting");
		nvlist_free(nvl);
		/* Remote peer will attempt to resend event */
		rv = 3;
	}

	(void) pthread_mutex_unlock(&mp->epm_lock);

	return (rv);
}

/*
 * Handle the startup handshake to the server.  The client always initiates
 * the startup handshake.  In the following sequence, we are the client and
 * the remote endpoint is the server.
 *
 *	Client sends C_HELLO and transitions to Q_INIT_PENDING state.
 *	Server sends S_HELLO and transitions to Q_INIT_PENDING state.
 *	Client sends ACK and transitions to Q_OPEN state.
 *	Server receives ACK and transitions to Q_OPEN state.
 *
 * Return 0 for success, nonzero for failure.
 */
static int
etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	etm_proto_hdr_t *hp;
	size_t hdrlen = ETM_HDRLEN;
	int hdrstat;
	char hbuf[ETM_HDRLEN];

	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
		return (1);

	mp->epm_cstat = C_OPEN;

	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0);

	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
	    hdrlen)) != hdrlen) {
		fmd_hdl_error(hdl, "Failed to write C_HELLO to %s",
		    mp->epm_ep_str);
		return (2);
	}

	mp->epm_qstat = Q_INIT_PENDING;

	if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf,
	    hdrlen)) != hdrlen) {
		fmd_hdl_error(hdl, "Failed to read S_HELLO from %s",
		    mp->epm_ep_str);
		return (3);
	}

	hdrstat = etm_check_hdr(hdl, mp, hbuf);

	if (hdrstat != ETM_HDR_S_HELLO) {
		fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO "
		    "from %s", mp->epm_ep_str);
		return (4);
	}

	/*
	 * Get version from the server.
	 * Currently, only one version is supported.
	 */
	hp = (etm_proto_hdr_t *)(void *)hbuf;
	if (hp->hdr_ver != ETM_PROTO_V1) {
		fmd_hdl_error(hdl, "Unable to use same version as %s : %d",
		    mp->epm_ep_str, hp->hdr_ver);
		return (5);
	}
	mp->epm_ver = hp->hdr_ver;

	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);

	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
	    hdrlen)) != hdrlen) {
		fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s",
		    mp->epm_ep_str);
		return (6);
	}

	/*
	 * Call fmd_xprt_open and fmd_xprt_setspecific with
	 * Etm_mod_lock held to avoid race with etm_send thread.
	 */
	(void) pthread_mutex_lock(&Etm_mod_lock);
	if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags,
	    mp->epm_ep_nvl, NULL)) == NULL) {
		fmd_hdl_abort(hdl, "Failed to init xprthdl for %s",
		    mp->epm_ep_str);
	}
	fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
	(void) pthread_mutex_unlock(&Etm_mod_lock);

	mp->epm_qstat = Q_OPEN;
	fmd_hdl_debug(hdl, "queue open for %s", mp->epm_ep_str);

	return (0);
}
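
/*
 * Note : the server side of the startup handshake above is implemented in
 * etm_recv(), which answers an incoming C_HELLO with S_HELLO and opens its
 * fmd transport queue when the client's ACK arrives.
 */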

/*
 * Open a connection to the peer, send a SHUTDOWN message,
 * and close the connection.
 */
static void
etm_send_shutdown(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	size_t hdrlen = ETM_HDRLEN;
	char hbuf[ETM_HDRLEN];

	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
		return;

	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_SHUTDOWN, 0);

	(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, hdrlen);

	(void) etm_xport_close(hdl, mp->epm_oconn);
	mp->epm_oconn = NULL;
}

/*
 * Allocate an nvlist and add a string for the endpoint.
 * Return zero for success, non-zero for failure.
 */
static int
etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	/*
	 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation
	 * in fmd when this nvlist_t is free'd.
	 */
	(void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0);

	if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) {
		fmd_hdl_error(hdl, "failed to add domain-id string to nvlist "
		    "for %s", mp->epm_ep_str);
		nvlist_free(mp->epm_ep_nvl);
		return (1);
	}

	return (0);
}

/*
 * Free the nvlist for the endpoint_id string.
 */
/*ARGSUSED*/
static void
etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	nvlist_free(mp->epm_ep_nvl);
}

/*
 * Check for a duplicate endpoint/peer string.
 */
/*ARGSUSED*/
static int
etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname)
{
	etm_epmap_t *mp;

	for (mp = Epmap_head; mp != NULL; mp = mp->epm_next)
		if (strcmp(epname, mp->epm_ep_str) == 0)
			return (1);

	return (0);
}

/*
 * Attempt to re-open a connection with the remote endpoint.
 */
static void
etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) {
		if (gethrtime() < mp->epm_reconn_end) {
			if ((mp->epm_oconn = etm_xport_open(hdl,
			    mp->epm_tlhdl)) == NULL) {
				fmd_hdl_debug(hdl, "reconnect failed for %s",
				    mp->epm_ep_str);
				mp->epm_timer_id = fmd_timer_install(hdl, mp,
				    NULL, Reconn_interval);
				mp->epm_timer_in_use = 1;
			} else {
				fmd_hdl_debug(hdl, "reconnect success for %s",
				    mp->epm_ep_str);
				mp->epm_reconn_end = 0;
				mp->epm_cstat = C_OPEN;
			}
		} else {
			fmd_hdl_error(hdl, "Reconnect timed out for %s\n",
			    mp->epm_ep_str);
			mp->epm_reconn_end = 0;
			mp->epm_cstat = C_TIMED_OUT;
		}
	}

	if (mp->epm_cstat == C_OPEN) {
		fmd_xprt_resume(hdl, mp->epm_xprthdl);
		mp->epm_qstat = Q_OPEN;
		fmd_hdl_debug(hdl, "queue resumed for %s", mp->epm_ep_str);
	}
}

/*
 * Suspend a given connection and set up for reconnection retries.
 * Assume caller holds lock on epm_lock.
 */
static void
etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	(void) pthread_mutex_lock(&Etm_mod_lock);
	if (Etm_exit) {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		return;
	}
	(void) pthread_mutex_unlock(&Etm_mod_lock);

	if (mp->epm_oconn != NULL) {
		(void) etm_xport_close(hdl, mp->epm_oconn);
		mp->epm_oconn = NULL;
	}

	mp->epm_reconn_end = gethrtime() + Reconn_timeout;
	mp->epm_cstat = C_UNINITIALIZED;

	if (mp->epm_xprthdl != NULL) {
		fmd_xprt_suspend(hdl, mp->epm_xprthdl);
		mp->epm_qstat = Q_SUSPENDED;
		fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);

		if (mp->epm_timer_in_use == 0) {
			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
			    Reconn_interval);
			mp->epm_timer_in_use = 1;
		}
	}
}

/*
 * Reinitialize the connection.  The old fmd_xprt_t handle must be
 * removed/closed first.
 * Assume caller holds lock on epm_lock.
 */
static void
etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	/*
	 * To avoid a deadlock, wait for etm_send to finish before
	 * calling fmd_xprt_close()
	 */
	while (mp->epm_txbusy)
		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);

	if (mp->epm_xprthdl != NULL) {
		fmd_xprt_close(hdl, mp->epm_xprthdl);
		fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str);
		mp->epm_xprthdl = NULL;
		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
		mp->epm_ep_nvl = NULL;
	}

	if (mp->epm_timer_in_use) {
		fmd_timer_remove(hdl, mp->epm_timer_id);
		mp->epm_timer_in_use = 0;
	}

	if (mp->epm_oconn != NULL) {
		(void) etm_xport_close(hdl, mp->epm_oconn);
		mp->epm_oconn = NULL;
	}

	mp->epm_cstat = C_UNINITIALIZED;
	mp->epm_qstat = Q_UNINITIALIZED;
}

/*
 * Receive data from the ETM transport layer.
 * Note : This is not the fmdo_recv entry point.
 */
static int
etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp)
{
	size_t buflen, hdrlen;
	void *buf;
	char hbuf[ETM_HDRLEN];
	int hdrstat, rv;

	hdrlen = ETM_HDRLEN;

	if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) {
		fmd_hdl_debug(hdl, "failed to read header from %s",
		    mp->epm_ep_str);
		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
		return (EIO);
	}

	hdrstat = etm_check_hdr(hdl, mp, hbuf);

	switch (hdrstat) {
	case ETM_HDR_INVALID:
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_cstat == C_OPEN)
			mp->epm_cstat = C_CLOSED;
		(void) pthread_mutex_unlock(&mp->epm_lock);

		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
		rv = ECANCELED;
		break;

	case ETM_HDR_BADTYPE:
	case ETM_HDR_BADVERSION:
		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0);

		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
		    hdrlen)) != hdrlen) {
			fmd_hdl_debug(hdl, "failed to write NAK to %s",
			    mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
			return (EIO);
		}

		(void) pthread_mutex_lock(&mp->epm_lock);
		mp->epm_cstat = C_LIMBO;
		(void) pthread_mutex_unlock(&mp->epm_lock);

		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
		rv = ENOTSUP;
		break;

	case ETM_HDR_C_HELLO:
		/* Client is initiating a startup handshake */
		(void) pthread_mutex_lock(&mp->epm_lock);
		etm_reinit(hdl, mp);
		mp->epm_qstat = Q_INIT_PENDING;
		(void) pthread_mutex_unlock(&mp->epm_lock);

		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0);

		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
		    hdrlen)) != hdrlen) {
			fmd_hdl_debug(hdl, "failed to write S_HELLO to %s",
			    mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
			return (EIO);
		}

		rv = 0;
		break;

	case ETM_HDR_ACK:
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_qstat == Q_INIT_PENDING) {
			/* This is client's ACK from startup handshake */
			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
			if (mp->epm_ep_nvl == NULL)
				(void) etm_get_ep_nvl(hdl, mp);

			/*
			 * Call fmd_xprt_open and fmd_xprt_setspecific with
			 * Etm_mod_lock held to avoid race with etm_send thread.
			 */
			(void) pthread_mutex_lock(&Etm_mod_lock);
			if ((mp->epm_xprthdl = fmd_xprt_open(hdl,
			    mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) {
				fmd_hdl_abort(hdl, "Failed to init xprthdl "
				    "for %s", mp->epm_ep_str);
			}
			fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
			(void) pthread_mutex_unlock(&Etm_mod_lock);

			mp->epm_qstat = Q_OPEN;
			(void) pthread_mutex_unlock(&mp->epm_lock);
			fmd_hdl_debug(hdl, "queue open for %s",
			    mp->epm_ep_str);
		} else {
			(void) pthread_mutex_unlock(&mp->epm_lock);
			fmd_hdl_debug(hdl, "protocol error, not expecting ACK "
			    "from %s\n", mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
		}

		rv = 0;
		break;

	case ETM_HDR_SHUTDOWN:
		fmd_hdl_debug(hdl, "received shutdown from %s",
		    mp->epm_ep_str);

		(void) pthread_mutex_lock(&mp->epm_lock);

		etm_reinit(hdl, mp);

		if (IS_CLIENT(mp)) {
			/*
			 * A server shutdown is considered to be temporary.
			 * Prepare for reconnection.
			 */
			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
			    Reconn_interval);

			mp->epm_timer_in_use = 1;
		}

		(void) pthread_mutex_unlock(&mp->epm_lock);

		rv = ECANCELED;
		break;

	case ETM_HDR_MSG:
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_qstat == Q_UNINITIALIZED) {
			/* Peer (client) is unaware that we've restarted */
			(void) pthread_mutex_unlock(&mp->epm_lock);
			hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
			    ETM_HDR_S_RESTART, 0);

			if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
			    hdrlen)) != hdrlen) {
				fmd_hdl_debug(hdl, "failed to write S_RESTART "
				    "to %s", mp->epm_ep_str);
				INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
				return (EIO);
			}

			return (ECANCELED);
		}
		(void) pthread_mutex_unlock(&mp->epm_lock);

		buflen = etm_get_msglen(hbuf);
		ALLOC_BUF(hdl, buf, buflen);

		if (etm_xport_read(hdl, conn, Rw_timeout, buf,
		    buflen) != buflen) {
			fmd_hdl_debug(hdl, "failed to read message from %s",
			    mp->epm_ep_str);
			FREE_BUF(hdl, buf, buflen);
			INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
			return (EIO);
		}

		INCRSTAT(Etm_stats.read_msg.fmds_value.ui64);
		ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen);

		etm_hex_dump(hdl, buf, buflen, 0);

		if (etm_post_msg(hdl, mp, buf, buflen)) {
			INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64);
			FREE_BUF(hdl, buf, buflen);
			return (EIO);
		}

		FREE_BUF(hdl, buf, buflen);

		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);

		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
		    hdrlen)) != hdrlen) {
			fmd_hdl_debug(hdl, "failed to write ACK to %s",
			    mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
			return (EIO);
		}

		INCRSTAT(Etm_stats.write_ack.fmds_value.ui64);

		/*
		 * If we got this far and the current state of the
		 * outbound/sending connection is TIMED_OUT or
		 * LIMBO, then we should reinitialize it.
		 */
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_cstat == C_TIMED_OUT ||
		    mp->epm_cstat == C_LIMBO) {
			if (mp->epm_oconn != NULL) {
				(void) etm_xport_close(hdl, mp->epm_oconn);
				mp->epm_oconn = NULL;
			}
			mp->epm_cstat = C_UNINITIALIZED;
			fmd_xprt_resume(hdl, mp->epm_xprthdl);
			if (mp->epm_timer_in_use) {
				fmd_timer_remove(hdl, mp->epm_timer_id);
				mp->epm_timer_in_use = 0;
			}
			mp->epm_qstat = Q_OPEN;
			fmd_hdl_debug(hdl, "queue resumed for %s",
			    mp->epm_ep_str);
		}
		(void) pthread_mutex_unlock(&mp->epm_lock);

		rv = 0;
		break;

	default:
		fmd_hdl_debug(hdl, "protocol error, unexpected header "
		    "from %s : %d", mp->epm_ep_str, hdrstat);
		INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
		rv = 0;
	}

	return (rv);
}
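
/*
 * For illustration only, the normal event exchange handled above and in
 * etm_send() below is (sketch; one event per exchange) :
 *
 *	sender					receiver
 *	------					--------
 *	write ETM_HDR_MSG header
 *	write packed nvlist (hdr_msglen bytes)
 *						read header
 *						read hdr_msglen bytes
 *						nvlist_unpack + fmd_xprt_post
 *						write ETM_HDR_ACK header
 *	read ACK header
 */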

/*
 * ETM transport layer callback function.
 * The transport layer calls this function to :
 *	(a) pass an incoming message (flag == ETM_CBFLAG_RECV)
 *	(b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT)
 */
static int
etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
    void *arg)
{
	etm_epmap_t *mp = (etm_epmap_t *)arg;
	int rv = 0;

	(void) pthread_mutex_lock(&Etm_mod_lock);
	if (Etm_exit) {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		return (ECANCELED);
	}
	(void) pthread_mutex_unlock(&Etm_mod_lock);

	switch (flag) {
	case ETM_CBFLAG_RECV:
		rv = etm_recv(hdl, conn, mp);
		break;
	case ETM_CBFLAG_REINIT:
		(void) pthread_mutex_lock(&mp->epm_lock);
		etm_reinit(hdl, mp);
		etm_send_shutdown(hdl, mp);
		(void) pthread_mutex_unlock(&mp->epm_lock);
		/*
		 * Return ECANCELED so the transport layer will close the
		 * server connection.  The transport layer is responsible for
		 * reestablishing this connection (should a connection request
		 * arrive from the peer).
		 */
		rv = ECANCELED;
		break;
	default:
		fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag);
		rv = ENOTSUP;
	}

	return (rv);
}

/*
 * Allocate and initialize an etm_epmap_t struct for the given endpoint
 * name string.
 */
static void
etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags)
{
	etm_epmap_t *newmap;

	if (etm_check_dup_ep_str(hdl, epname)) {
		fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname);
		return;
	}

	newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP);
	newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP);
	newmap->epm_xprtflags = flags;
	newmap->epm_cstat = C_UNINITIALIZED;
	newmap->epm_qstat = Q_UNINITIALIZED;
	newmap->epm_ver = ETM_PROTO_V1;	/* Currently support one proto ver */
	newmap->epm_txbusy = 0;

	(void) pthread_mutex_init(&newmap->epm_lock, NULL);
	(void) pthread_cond_init(&newmap->epm_tx_cv, NULL);

	if (etm_get_ep_nvl(hdl, newmap)) {
		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
		return;
	}

	(void) pthread_mutex_lock(&newmap->epm_lock);

	if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str,
	    etm_cb_func, newmap)) == NULL) {
		fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n",
		    newmap->epm_ep_str);
		etm_free_ep_nvl(hdl, newmap);
		(void) pthread_mutex_unlock(&newmap->epm_lock);
		(void) pthread_mutex_destroy(&newmap->epm_lock);
		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
		return;
	}

	if (IS_CLIENT(newmap)) {
		if (etm_handle_startup(hdl, newmap)) {
			/*
			 * For whatever reason, we could not complete the
			 * startup handshake with the server.  Set the timer
			 * and try again.
			 */
			if (newmap->epm_oconn != NULL) {
				(void) etm_xport_close(hdl, newmap->epm_oconn);
				newmap->epm_oconn = NULL;
			}
			newmap->epm_cstat = C_UNINITIALIZED;
			newmap->epm_qstat = Q_UNINITIALIZED;
			newmap->epm_timer_id = fmd_timer_install(hdl, newmap,
			    NULL, Reconn_interval);
			newmap->epm_timer_in_use = 1;
		}
	} else {
		/*
		 * We may be restarting after a crash.  If so, the client
		 * may be unaware of this.
		 */
		etm_send_shutdown(hdl, newmap);
	}

	/* Add this transport instance handle to the list */
	newmap->epm_next = Epmap_head;
	Epmap_head = newmap;

	(void) pthread_mutex_unlock(&newmap->epm_lock);

	INCRSTAT(Etm_stats.peer_count.fmds_value.ui64);
}

/*
 * Parse the given property list string and call etm_init_epmap
 * for each endpoint.
 */
static void
etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags)
{
	char *epstr, *ep, *prefix, *lasts, *numstr;
	char epname[MAXPATHLEN];
	size_t slen, nlen;
	int beg, end, i;

	if (eplist == NULL)
		return;
	/*
	 * Create a copy of eplist for parsing.
	 * strtok/strtok_r(3C) will insert null chars into the string.
	 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used.
	 */
	slen = strlen(eplist);
	epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP);
	(void) strcpy(epstr, eplist);

	/*
	 * The following are supported for the "client_list" and
	 * "server_list" properties :
	 *
	 *	A space-separated list of endpoints.
	 *	"dev:///dom0 dev:///dom1 dev:///dom2"
	 *
	 *	An array syntax for a range of instances.
	 *	"dev:///dom[0:2]"
	 *
	 *	A combination of both.
	 *	"dev:///dom0 dev:///dom[1:2]"
	 */
	ep = strtok_r(epstr, " ", &lasts);
	while (ep != NULL) {
		if (strchr(ep, '[') != NULL) {
			/*
			 * This string is using array syntax.
			 * Check the string for correct syntax.
			 */
			if ((strchr(ep, ':') == NULL) ||
			    (strchr(ep, ']') == NULL)) {
				fmd_hdl_error(hdl, "Syntax error in property "
				    "that includes : %s\n", ep);
				ep = strtok_r(NULL, " ", &lasts);
				continue;
			}

			/* expand the array syntax */
			prefix = strtok(ep, "[");

			numstr = strtok(NULL, ":");
			if ((numstr == NULL) || (!isdigit(*numstr))) {
				fmd_hdl_error(hdl, "Syntax error in property "
				    "that includes : %s[\n", prefix);
				ep = strtok_r(NULL, " ", &lasts);
				continue;
			}
			beg = atoi(numstr);

			numstr = strtok(NULL, "]");
			if ((numstr == NULL) || (!isdigit(*numstr))) {
				fmd_hdl_error(hdl, "Syntax error in property "
				    "that includes : %s[\n", prefix);
				ep = strtok_r(NULL, " ", &lasts);
				continue;
			}
			end = atoi(numstr);

			nlen = strlen(prefix) + ETM_EP_INST_MAX;

			if (nlen > MAXPATHLEN) {
				fmd_hdl_error(hdl, "Endpoint prop string "
				    "exceeds MAXPATHLEN\n");
				ep = strtok_r(NULL, " ", &lasts);
				continue;
			}

			for (i = beg; i <= end; i++) {
				bzero(epname, MAXPATHLEN);
				(void) snprintf(epname, nlen, "%s%d",
				    prefix, i);
				etm_init_epmap(hdl, epname, flags);
			}
		} else {
			etm_init_epmap(hdl, ep, flags);
		}

		ep = strtok_r(NULL, " ", &lasts);
	}

	fmd_hdl_free(hdl, epstr, slen + 1);
}
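
/*
 * For illustration only : with the array syntax above, a property value
 * such as "dev:///dom[1:3]" is expanded by etm_create_epmaps() into the
 * endpoints dev:///dom1, dev:///dom2 and dev:///dom3, each of which is
 * passed to etm_init_epmap().
 */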

/*
 * Free the transport infrastructure for an endpoint.
 */
static void
etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	size_t hdrlen;
	char hbuf[ETM_HDRLEN];

	(void) pthread_mutex_lock(&mp->epm_lock);

	/*
	 * If an etm_send thread is in progress, wait for it to finish.
	 * The etm_recv thread is managed by the transport layer and will
	 * be destroyed with etm_xport_fini().
	 */
	while (mp->epm_txbusy)
		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);

	if (mp->epm_timer_in_use)
		fmd_timer_remove(hdl, mp->epm_timer_id);

	if (mp->epm_oconn != NULL) {
		hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
		    ETM_HDR_SHUTDOWN, 0);
		(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
		    hdrlen);
		(void) etm_xport_close(hdl, mp->epm_oconn);
		mp->epm_oconn = NULL;
	}

	if (mp->epm_xprthdl != NULL) {
		fmd_xprt_close(hdl, mp->epm_xprthdl);
		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
		mp->epm_ep_nvl = NULL;
	}

	if (mp->epm_ep_nvl != NULL)
		etm_free_ep_nvl(hdl, mp);

	if (mp->epm_tlhdl != NULL)
		(void) etm_xport_fini(hdl, mp->epm_tlhdl);

	(void) pthread_mutex_unlock(&mp->epm_lock);
	(void) pthread_mutex_destroy(&mp->epm_lock);
	fmd_hdl_strfree(hdl, mp->epm_ep_str);
	fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t));
	DECRSTAT(Etm_stats.peer_count.fmds_value.ui64);
}

/*
 * FMD entry points
 */

/*
 * FMD fmdo_send entry point.
 * Send an event to the remote endpoint and receive an ACK.
 */
static int
etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl)
{
	etm_epmap_t *mp;
	nvlist_t *msgnvl;
	int hdrstat, rv, cnt = 0;
	char *buf, *nvbuf, *class;
	size_t nvsize, buflen, hdrlen;
	struct timespec tms;

	(void) pthread_mutex_lock(&Etm_mod_lock);
	if (Etm_exit) {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		return (FMD_SEND_RETRY);
	}
	(void) pthread_mutex_unlock(&Etm_mod_lock);

	mp = fmd_xprt_getspecific(hdl, xprthdl);

	for (;;) {
		if (pthread_mutex_trylock(&mp->epm_lock) == 0) {
			break;
		} else {
			/*
			 * Another thread may be (1) trying to close this
			 * fmd_xprt_t, or (2) posting an event to it.
			 * If (1), don't want to spend too much time here.
			 * If (2), allow it to finish and release epm_lock.
			 */
			if (cnt++ < 10) {
				tms.tv_sec = 0;
				tms.tv_nsec = (cnt * 10000);
				(void) nanosleep(&tms, NULL);

			} else {
				return (FMD_SEND_RETRY);
			}
		}
	}

	mp->epm_txbusy++;

	if (mp->epm_qstat == Q_UNINITIALIZED) {
		mp->epm_txbusy--;
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		(void) pthread_mutex_unlock(&mp->epm_lock);
		return (FMD_SEND_FAILED);
	}

	if (mp->epm_cstat == C_CLOSED) {
		etm_suspend_reconnect(hdl, mp);
		mp->epm_txbusy--;
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		(void) pthread_mutex_unlock(&mp->epm_lock);
		return (FMD_SEND_RETRY);
	}

	if (mp->epm_cstat == C_LIMBO) {
		if (mp->epm_oconn != NULL) {
			(void) etm_xport_close(hdl, mp->epm_oconn);
			mp->epm_oconn = NULL;
		}

		fmd_xprt_suspend(hdl, xprthdl);
		mp->epm_qstat = Q_SUSPENDED;
		mp->epm_txbusy--;
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		(void) pthread_mutex_unlock(&mp->epm_lock);
		fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);
		return (FMD_SEND_RETRY);
	}

	if (mp->epm_oconn == NULL) {
		if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl))
		    == NULL) {
			etm_suspend_reconnect(hdl, mp);
			mp->epm_txbusy--;
			(void) pthread_cond_broadcast(&mp->epm_tx_cv);
			(void) pthread_mutex_unlock(&mp->epm_lock);
			return (FMD_SEND_RETRY);
		} else {
			mp->epm_cstat = C_OPEN;
		}
	}

	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0)
		fmd_hdl_abort(hdl, "No class string in nvlist");

	msgnvl = fmd_xprt_translate(hdl, xprthdl, ep);
	if (msgnvl == NULL) {
		mp->epm_txbusy--;
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		(void) pthread_mutex_unlock(&mp->epm_lock);
		fmd_hdl_error(hdl, "Failed to translate event %p\n",
		    (void *) ep);
		return (FMD_SEND_FAILED);
	}

	rv = etm_xport_send_filter(hdl, msgnvl, mp->epm_ep_str);
	if (rv == ETM_XPORT_FILTER_DROP) {
		mp->epm_txbusy--;
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		(void) pthread_mutex_unlock(&mp->epm_lock);
		fmd_hdl_debug(hdl, "send_filter dropped event");
		nvlist_free(msgnvl);
		INCRSTAT(Etm_stats.send_filter.fmds_value.ui64);
		return (FMD_SEND_SUCCESS);
	} else if (rv == ETM_XPORT_FILTER_ERROR) {
		fmd_hdl_debug(hdl, "send_filter error : %s", strerror(errno));
		INCRSTAT(Etm_stats.error_send_filter.fmds_value.ui64);
		/* Still send event */
	}

	(void) pthread_mutex_unlock(&mp->epm_lock);

	(void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR);

	hdrlen = ETM_HDRLEN;
	buflen = nvsize + hdrlen;

	ALLOC_BUF(hdl, buf, buflen);

	nvbuf = buf + hdrlen;

	(void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize);

	if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) {
		(void) pthread_mutex_lock(&mp->epm_lock);
		mp->epm_txbusy--;
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		(void) pthread_mutex_unlock(&mp->epm_lock);
		fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv));
		nvlist_free(msgnvl);
		FREE_BUF(hdl, buf, buflen);
		return (FMD_SEND_FAILED);
	}

	nvlist_free(msgnvl);

	if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf,
	    buflen) != buflen) {
		fmd_hdl_debug(hdl, "failed to send message to %s",
		    mp->epm_ep_str);
		(void) pthread_mutex_lock(&mp->epm_lock);
		etm_suspend_reconnect(hdl, mp);
		mp->epm_txbusy--;
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		(void) pthread_mutex_unlock(&mp->epm_lock);
		FREE_BUF(hdl, buf, buflen);
		INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
		return (FMD_SEND_RETRY);
	}

	INCRSTAT(Etm_stats.write_msg.fmds_value.ui64);
	ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize);

	etm_hex_dump(hdl, nvbuf, nvsize, 1);

	if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf,
	    hdrlen) != hdrlen) {
		fmd_hdl_debug(hdl, "failed to read ACK from %s",
		    mp->epm_ep_str);
		(void) pthread_mutex_lock(&mp->epm_lock);
		etm_suspend_reconnect(hdl, mp);
		mp->epm_txbusy--;
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		(void) pthread_mutex_unlock(&mp->epm_lock);
		FREE_BUF(hdl, buf, buflen);
		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
		return (FMD_SEND_RETRY);
	}

	hdrstat = etm_check_hdr(hdl, mp, buf);
	FREE_BUF(hdl, buf, buflen);

	if (hdrstat == ETM_HDR_ACK) {
		INCRSTAT(Etm_stats.read_ack.fmds_value.ui64);
	} else {
		(void) pthread_mutex_lock(&mp->epm_lock);

		(void) etm_xport_close(hdl, mp->epm_oconn);
		mp->epm_oconn = NULL;

		if (hdrstat == ETM_HDR_NAK) {
			/* Peer received a bad value in the header */
			if (mp->epm_xprthdl != NULL) {
				mp->epm_cstat = C_LIMBO;
				fmd_xprt_suspend(hdl, xprthdl);
				mp->epm_qstat = Q_SUSPENDED;
				fmd_hdl_debug(hdl, "received NAK, queue "
				    "suspended for %s", mp->epm_ep_str);
			}

			rv = FMD_SEND_RETRY;

		} else if (hdrstat == ETM_HDR_S_RESTART) {
			/* Server has restarted */
			mp->epm_cstat = C_CLOSED;
			mp->epm_qstat = Q_UNINITIALIZED;
			fmd_hdl_debug(hdl, "server %s restarted",
			    mp->epm_ep_str);
			/*
			 * Cannot call fmd_xprt_close here, so we'll do it
			 * on the timeout thread.
			 */
			if (mp->epm_timer_in_use == 0) {
				mp->epm_timer_id = fmd_timer_install(
				    hdl, mp, NULL, 0);
				mp->epm_timer_in_use = 1;
			}

			/*
			 * fault.* or list.* events will be replayed if a
			 * transport is opened with the same auth.
			 * Other events will be discarded.
			 */
			rv = FMD_SEND_FAILED;

		} else {
			mp->epm_cstat = C_CLOSED;
			fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str);

			rv = FMD_SEND_RETRY;
		}

		mp->epm_txbusy--;

		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		(void) pthread_mutex_unlock(&mp->epm_lock);

		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);

		return (rv);
	}

	(void) pthread_mutex_lock(&mp->epm_lock);
	mp->epm_txbusy--;
	(void) pthread_cond_broadcast(&mp->epm_tx_cv);
	(void) pthread_mutex_unlock(&mp->epm_lock);

	return (FMD_SEND_SUCCESS);
}

/*
 * FMD fmdo_timeout entry point.
 */
/*ARGSUSED*/
static void
etm_timeout(fmd_hdl_t *hdl, id_t id, void *data)
{
	etm_epmap_t *mp = (etm_epmap_t *)data;

	(void) pthread_mutex_lock(&mp->epm_lock);

	mp->epm_timer_in_use = 0;

	if (mp->epm_qstat == Q_UNINITIALIZED) {
		/* Server has shut down and we (client) need to reconnect */
		if (mp->epm_xprthdl != NULL) {
			fmd_xprt_close(hdl, mp->epm_xprthdl);
			fmd_hdl_debug(hdl, "queue closed for %s",
			    mp->epm_ep_str);
			mp->epm_xprthdl = NULL;
			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
			mp->epm_ep_nvl = NULL;
		}

		if (mp->epm_ep_nvl == NULL)
			(void) etm_get_ep_nvl(hdl, mp);

		if (etm_handle_startup(hdl, mp)) {
			if (mp->epm_oconn != NULL) {
				(void) etm_xport_close(hdl, mp->epm_oconn);
				mp->epm_oconn = NULL;
			}
			mp->epm_cstat = C_UNINITIALIZED;
			mp->epm_qstat = Q_UNINITIALIZED;
			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
			    Reconn_interval);
			mp->epm_timer_in_use = 1;
		}
	} else {
		etm_reconnect(hdl, mp);
	}

	(void) pthread_mutex_unlock(&mp->epm_lock);
}

/*
 * FMD Module declarations
 */
static const fmd_hdl_ops_t etm_ops = {
	NULL,		/* fmdo_recv */
	etm_timeout,	/* fmdo_timeout */
	NULL,		/* fmdo_close */
	NULL,		/* fmdo_stats */
	NULL,		/* fmdo_gc */
	etm_send,	/* fmdo_send */
};

static const fmd_prop_t etm_props[] = {
	{ "client_list", FMD_TYPE_STRING, NULL },
	{ "server_list", FMD_TYPE_STRING, NULL },
	{ "reconnect_interval", FMD_TYPE_UINT64, "10000000000" },
	{ "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" },
	{ "rw_timeout", FMD_TYPE_UINT64, "2000000000" },
	{ "filter_path", FMD_TYPE_STRING, NULL },
	{ NULL, 0, NULL }
};

static const fmd_hdl_info_t etm_info = {
	"Event Transport Module", "2.0", &etm_ops, etm_props
};
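
/*
 * For illustration only : a module configuration fragment exercising the
 * properties above might look like the following.  The endpoint names and
 * values are examples, not defaults shipped with the module, and the exact
 * directive syntax is that of the fmd configuration file format.
 *
 *	setprop server_list "dev:///dom[1:2]"
 *	setprop reconnect_interval 10000000000
 *	setprop rw_timeout 2000000000
 */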

/*
 * Initialize the transport for use by ETM.
 */
void
_fmd_init(fmd_hdl_t *hdl)
{
	char *propstr;

	if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) {
		return;	/* invalid data in configuration file */
	}

	/* Create global stats */
	(void) fmd_stat_create(hdl, FMD_STAT_NOALLOC,
	    sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats);

	/* Get module properties */
	Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout");
	Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval");
	Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout");

	propstr = fmd_prop_get_string(hdl, "client_list");
	etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS);
	fmd_prop_free_string(hdl, propstr);

	propstr = fmd_prop_get_string(hdl, "server_list");
	etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS);
	fmd_prop_free_string(hdl, propstr);

	if (Etm_stats.peer_count.fmds_value.ui64 == 0) {
		fmd_hdl_debug(hdl, "Failed to init any endpoint\n");
		fmd_hdl_unregister(hdl);
		return;
	}
}

/*
 * Tear down the transport.
 */
void
_fmd_fini(fmd_hdl_t *hdl)
{
	etm_epmap_t *mp, *next;

	(void) pthread_mutex_lock(&Etm_mod_lock);
	Etm_exit = 1;
	(void) pthread_mutex_unlock(&Etm_mod_lock);

	mp = Epmap_head;

	while (mp) {
		next = mp->epm_next;
		etm_free_epmap(hdl, mp);
		mp = next;
	}

	fmd_hdl_unregister(hdl);
}