1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2006 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 /* 30 * FMA Event Transport Module 31 * 32 * Plugin for sending/receiving FMA events to/from a remote endpoint. 
33 */ 34 35 #include <netinet/in.h> 36 #include <errno.h> 37 #include <sys/fm/protocol.h> 38 #include <sys/sysmacros.h> 39 #include <pthread.h> 40 #include <strings.h> 41 #include <ctype.h> 42 #include <link.h> 43 #include <libnvpair.h> 44 #include "etm_xport_api.h" 45 #include "etm_proto.h" 46 47 /* 48 * ETM declarations 49 */ 50 51 typedef enum etm_connection_status { 52 C_UNINITIALIZED = 0, 53 C_OPEN, /* Connection is open */ 54 C_CLOSED, /* Connection is closed */ 55 C_LIMBO, /* Bad value in header from peer */ 56 C_TIMED_OUT /* Reconnection to peer timed out */ 57 } etm_connstat_t; 58 59 typedef enum etm_fmd_queue_status { 60 Q_UNINITIALIZED = 100, 61 Q_INIT_PENDING, /* Queue initialization in progress */ 62 Q_OPEN, /* Queue is open */ 63 Q_SUSPENDED /* Queue is suspended */ 64 } etm_qstat_t; 65 66 /* Per endpoint data */ 67 typedef struct etm_endpoint_map { 68 uint8_t epm_ver; /* Protocol version being used */ 69 char *epm_ep_str; /* Endpoint ID string */ 70 int epm_xprtflags; /* FMD transport open flags */ 71 etm_xport_hdl_t epm_tlhdl; /* Transport Layer instance handle */ 72 pthread_mutex_t epm_lock; /* Protects remainder of struct */ 73 pthread_cond_t epm_tx_cv; /* Cond var for send/transmit */ 74 int epm_txbusy; /* Busy doing send/transmit */ 75 fmd_xprt_t *epm_xprthdl; /* FMD transport handle */ 76 etm_qstat_t epm_qstat; /* Status of fmd xprt queue */ 77 nvlist_t *epm_ep_nvl; /* Endpoint ID nv_list */ 78 etm_xport_conn_t epm_oconn; /* Connection for outgoing events */ 79 etm_connstat_t epm_cstat; /* Status of connection */ 80 id_t epm_timer_id; /* Timer id */ 81 int epm_timer_in_use; /* Indicates if timer is in use */ 82 hrtime_t epm_reconn_end; /* Reconnection end time */ 83 struct etm_endpoint_map *epm_next; 84 } etm_epmap_t; 85 86 #define ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1) 87 #define ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2) 88 #define ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3) 89 #define ETM_EP_INST_MAX 4 /* Max chars in endpt instance 
*/ 90 #define ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR 91 #define ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT) 92 93 #define ALLOC_BUF(hdl, buf, size) \ 94 buf = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP); 95 96 #define FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size)); 97 98 #define IS_CLIENT(mp) (((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1) 99 100 #define INCRSTAT(x) { (void) pthread_mutex_lock(&Etm_mod_lock); \ 101 (x)++; \ 102 (void) pthread_mutex_unlock(&Etm_mod_lock); \ 103 } 104 105 #define DECRSTAT(x) { (void) pthread_mutex_lock(&Etm_mod_lock); \ 106 (x)--; \ 107 (void) pthread_mutex_unlock(&Etm_mod_lock); \ 108 } 109 110 #define ADDSTAT(x, y) { (void) pthread_mutex_lock(&Etm_mod_lock); \ 111 (x) += (y); \ 112 (void) pthread_mutex_unlock(&Etm_mod_lock); \ 113 } 114 115 /* 116 * Global variables 117 */ 118 static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER; 119 /* Protects globals */ 120 static hrtime_t Reconn_interval; /* Time between reconnection attempts */ 121 static hrtime_t Reconn_timeout; /* Time allowed for reconnection */ 122 static hrtime_t Rw_timeout; /* Time allowed for I/O operation */ 123 static int Etm_dump = 0; /* Enables hex dump for debug */ 124 static int Etm_exit = 0; /* Flag for exit */ 125 static etm_epmap_t *Epmap_head = NULL; /* Head of list of epmap structs */ 126 127 /* Module statistics */ 128 static struct etm_stats { 129 /* read counters */ 130 fmd_stat_t read_ack; 131 fmd_stat_t read_bytes; 132 fmd_stat_t read_msg; 133 fmd_stat_t post_filter; 134 /* write counters */ 135 fmd_stat_t write_ack; 136 fmd_stat_t write_bytes; 137 fmd_stat_t write_msg; 138 fmd_stat_t send_filter; 139 /* error counters */ 140 fmd_stat_t error_protocol; 141 fmd_stat_t error_drop_read; 142 fmd_stat_t error_read; 143 fmd_stat_t error_read_badhdr; 144 fmd_stat_t error_write; 145 fmd_stat_t error_send_filter; 146 fmd_stat_t error_post_filter; 147 /* misc */ 148 fmd_stat_t peer_count; 149 150 } Etm_stats = { 151 /* read counters 
*/ 152 { "read_ack", FMD_TYPE_UINT64, "ACKs read" }, 153 { "read_bytes", FMD_TYPE_UINT64, "Bytes read" }, 154 { "read_msg", FMD_TYPE_UINT64, "Messages read" }, 155 { "post_filter", FMD_TYPE_UINT64, "Drops by post_filter" }, 156 /* write counters */ 157 { "write_ack", FMD_TYPE_UINT64, "ACKs sent" }, 158 { "write_bytes", FMD_TYPE_UINT64, "Bytes sent" }, 159 { "write_msg", FMD_TYPE_UINT64, "Messages sent" }, 160 { "send_filter", FMD_TYPE_UINT64, "Drops by send_filter" }, 161 /* ETM error counters */ 162 { "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" }, 163 { "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" }, 164 { "error_read", FMD_TYPE_UINT64, "Read I/O errors" }, 165 { "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" }, 166 { "error_write", FMD_TYPE_UINT64, "Write I/O errors" }, 167 { "error_send_filter", FMD_TYPE_UINT64, "Send filter errors" }, 168 { "error_post_filter", FMD_TYPE_UINT64, "Post filter errors" }, 169 /* ETM Misc */ 170 { "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" }, 171 }; 172 173 /* 174 * ETM Private functions 175 */ 176 177 /* 178 * Hex dump for debug. 179 */ 180 static void 181 etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction) 182 { 183 int i, j, k; 184 int16_t *c; 185 186 if (Etm_dump == 0) 187 return; 188 189 j = buflen / 16; /* Number of complete 8-column rows */ 190 k = buflen % 16; /* Is there a last (non-8-column) row? 
*/ 191 192 if (direction) 193 fmd_hdl_debug(hdl, "--- WRITE Message Dump ---"); 194 else 195 fmd_hdl_debug(hdl, "--- READ Message Dump ---"); 196 197 fmd_hdl_debug(hdl, " Displaying %d bytes", buflen); 198 199 /* Dump the complete 8-column rows */ 200 for (i = 0; i < j; i++) { 201 c = (int16_t *)buf + (i * 8); 202 fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x %4x %4x", i, 203 *(c+0), *(c+1), *(c+2), *(c+3), 204 *(c+4), *(c+5), *(c+6), *(c+7)); 205 } 206 207 /* Dump the last (incomplete) row */ 208 c = (int16_t *)buf + (i * 8); 209 switch (k) { 210 case 4: 211 fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1)); 212 break; 213 case 8: 214 fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1), 215 *(c+2), *(c+3)); 216 break; 217 case 12: 218 fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x", i, *(c+0), 219 *(c+1), *(c+2), *(c+3), *(c+4), *(c+5)); 220 break; 221 } 222 223 fmd_hdl_debug(hdl, "--- End Dump ---"); 224 } 225 226 /* 227 * Provide the length of a message based on the data in the given ETM header. 228 */ 229 static size_t 230 etm_get_msglen(void *buf) 231 { 232 etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf; 233 234 return (ntohl(hp->hdr_msglen)); 235 } 236 237 /* 238 * Check the contents of the ETM header for errors. 239 * Return the header type (hdr_type). 
240 */ 241 static int 242 etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf) 243 { 244 etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf; 245 246 if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) { 247 fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s " 248 ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim); 249 return (ETM_HDR_INVALID); 250 } 251 252 if ((hp->hdr_type == ETM_HDR_C_HELLO) || 253 (hp->hdr_type == ETM_HDR_S_HELLO)) { 254 /* Until version is negotiated, other fields may be wrong */ 255 return (hp->hdr_type); 256 } 257 258 if (hp->hdr_ver != mp->epm_ver) { 259 fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n", 260 mp->epm_ep_str, hp->hdr_ver); 261 return (ETM_HDR_BADVERSION); 262 } 263 264 if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) || 265 (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) { 266 fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n", 267 mp->epm_ep_str, hp->hdr_type); 268 return (ETM_HDR_BADTYPE); 269 } 270 271 return (hp->hdr_type); 272 } 273 274 /* 275 * Create an ETM header of a given type in the given buffer. 276 * Return length of header. 277 */ 278 static size_t 279 etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen) 280 { 281 etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf; 282 283 bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN); 284 hp->hdr_ver = ver; 285 hp->hdr_type = type; 286 hp->hdr_msglen = htonl(msglen); 287 288 return (ETM_HDRLEN); 289 } 290 291 /* 292 * Convert message bytes to nvlist and post to fmd. 293 * Return zero for success, non-zero for failure. 294 * 295 * Note : nvl is free'd by fmd. 
296 */ 297 static int 298 etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen) 299 { 300 nvlist_t *nvl; 301 int rv; 302 303 if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) { 304 fmd_hdl_error(hdl, "failed to unpack message"); 305 return (1); 306 } 307 308 rv = etm_xport_post_filter(hdl, nvl, mp->epm_ep_str); 309 if (rv == ETM_XPORT_FILTER_DROP) { 310 fmd_hdl_debug(hdl, "post_filter dropped event"); 311 INCRSTAT(Etm_stats.post_filter.fmds_value.ui64); 312 nvlist_free(nvl); 313 return (0); 314 } else if (rv == ETM_XPORT_FILTER_ERROR) { 315 fmd_hdl_debug(hdl, "post_filter error : %s", strerror(errno)); 316 INCRSTAT(Etm_stats.error_post_filter.fmds_value.ui64); 317 /* Still post event */ 318 } 319 320 (void) pthread_mutex_lock(&mp->epm_lock); 321 (void) pthread_mutex_lock(&Etm_mod_lock); 322 if (!Etm_exit) { 323 (void) pthread_mutex_unlock(&Etm_mod_lock); 324 if (mp->epm_qstat == Q_OPEN) { 325 fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0); 326 rv = 0; 327 } else if (mp->epm_qstat == Q_SUSPENDED) { 328 fmd_xprt_resume(hdl, mp->epm_xprthdl); 329 if (mp->epm_timer_in_use) { 330 fmd_timer_remove(hdl, mp->epm_timer_id); 331 mp->epm_timer_in_use = 0; 332 } 333 mp->epm_qstat = Q_OPEN; 334 fmd_hdl_debug(hdl, "queue resumed for %s", 335 mp->epm_ep_str); 336 fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0); 337 rv = 0; 338 } else { 339 fmd_hdl_debug(hdl, "unable to post message, qstat = %d", 340 mp->epm_qstat); 341 nvlist_free(nvl); 342 /* Remote peer will attempt to resend event */ 343 rv = 2; 344 } 345 } else { 346 (void) pthread_mutex_unlock(&Etm_mod_lock); 347 fmd_hdl_debug(hdl, "unable to post message, module exiting"); 348 nvlist_free(nvl); 349 /* Remote peer will attempt to resend event */ 350 rv = 3; 351 } 352 353 (void) pthread_mutex_unlock(&mp->epm_lock); 354 355 return (rv); 356 } 357 358 /* 359 * Handle the startup handshake to the server. The client always initiates 360 * the startup handshake. 
In the following sequence, we are the client and 361 * the remote endpoint is the server. 362 * 363 * Client sends C_HELLO and transitions to Q_INIT_PENDING state. 364 * Server sends S_HELLO and transitions to Q_INIT_PENDING state. 365 * Client sends ACK and transitions to Q_OPEN state. 366 * Server receives ACK and transitions to Q_OPEN state. 367 * 368 * Return 0 for success, nonzero for failure. 369 */ 370 static int 371 etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp) 372 { 373 etm_proto_hdr_t *hp; 374 size_t hdrlen = ETM_HDRLEN; 375 int hdrstat; 376 char hbuf[ETM_HDRLEN]; 377 378 if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL) 379 return (1); 380 381 mp->epm_cstat = C_OPEN; 382 383 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0); 384 385 if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, 386 hdrlen)) != hdrlen) { 387 fmd_hdl_error(hdl, "Failed to write C_HELLO to %s", 388 mp->epm_ep_str); 389 return (2); 390 } 391 392 mp->epm_qstat = Q_INIT_PENDING; 393 394 if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf, 395 hdrlen)) != hdrlen) { 396 fmd_hdl_error(hdl, "Failed to read S_HELLO from %s", 397 mp->epm_ep_str); 398 return (3); 399 } 400 401 hdrstat = etm_check_hdr(hdl, mp, hbuf); 402 403 if (hdrstat != ETM_HDR_S_HELLO) { 404 fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO " 405 "from %s", mp->epm_ep_str); 406 return (4); 407 } 408 409 /* 410 * Get version from the server. 411 * Currently, only one version is supported. 
412 */ 413 hp = (etm_proto_hdr_t *)(void *)hbuf; 414 if (hp->hdr_ver != ETM_PROTO_V1) { 415 fmd_hdl_error(hdl, "Unable to use same version as %s : %d", 416 mp->epm_ep_str, hp->hdr_ver); 417 return (5); 418 } 419 mp->epm_ver = hp->hdr_ver; 420 421 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0); 422 423 if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, 424 hdrlen)) != hdrlen) { 425 fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s", 426 mp->epm_ep_str); 427 return (6); 428 } 429 430 /* 431 * Call fmd_xprt_open and fmd_xprt_setspecific with 432 * Etm_mod_lock held to avoid race with etm_send thread. 433 */ 434 (void) pthread_mutex_lock(&Etm_mod_lock); 435 if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags, 436 mp->epm_ep_nvl, NULL)) == NULL) { 437 fmd_hdl_abort(hdl, "Failed to init xprthdl for %s", 438 mp->epm_ep_str); 439 } 440 fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp); 441 (void) pthread_mutex_unlock(&Etm_mod_lock); 442 443 mp->epm_qstat = Q_OPEN; 444 fmd_hdl_debug(hdl, "queue open for %s", mp->epm_ep_str); 445 446 return (0); 447 } 448 449 /* 450 * Open a connection to the peer, send a SHUTDOWN message, 451 * and close the connection. 452 */ 453 static void 454 etm_send_shutdown(fmd_hdl_t *hdl, etm_epmap_t *mp) 455 { 456 size_t hdrlen = ETM_HDRLEN; 457 char hbuf[ETM_HDRLEN]; 458 459 if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL) 460 return; 461 462 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_SHUTDOWN, 0); 463 464 (void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, hdrlen); 465 466 (void) etm_xport_close(hdl, mp->epm_oconn); 467 mp->epm_oconn = NULL; 468 } 469 470 /* 471 * Alloc a nvlist and add a string for the endpoint. 472 * Return zero for success, non-zero for failure. 473 */ 474 static int 475 etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp) 476 { 477 /* 478 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation 479 * in fmd when this nvlist_t is free'd. 
480 */ 481 (void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0); 482 483 if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) { 484 fmd_hdl_error(hdl, "failed to add domain-id string to nvlist " 485 "for %s", mp->epm_ep_str); 486 nvlist_free(mp->epm_ep_nvl); 487 return (1); 488 } 489 490 return (0); 491 } 492 493 /* 494 * Free the nvlist for the endpoint_id string. 495 */ 496 /*ARGSUSED*/ 497 static void 498 etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp) 499 { 500 nvlist_free(mp->epm_ep_nvl); 501 } 502 503 /* 504 * Check for a duplicate endpoint/peer string. 505 */ 506 /*ARGSUSED*/ 507 static int 508 etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname) 509 { 510 etm_epmap_t *mp; 511 512 for (mp = Epmap_head; mp != NULL; mp = mp->epm_next) 513 if (strcmp(epname, mp->epm_ep_str) == 0) 514 return (1); 515 516 return (0); 517 } 518 519 /* 520 * Attempt to re-open a connection with the remote endpoint. 521 */ 522 static void 523 etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp) 524 { 525 if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) { 526 if (gethrtime() < mp->epm_reconn_end) { 527 if ((mp->epm_oconn = etm_xport_open(hdl, 528 mp->epm_tlhdl)) == NULL) { 529 fmd_hdl_debug(hdl, "reconnect failed for %s", 530 mp->epm_ep_str); 531 mp->epm_timer_id = fmd_timer_install(hdl, mp, 532 NULL, Reconn_interval); 533 mp->epm_timer_in_use = 1; 534 } else { 535 fmd_hdl_debug(hdl, "reconnect success for %s", 536 mp->epm_ep_str); 537 mp->epm_reconn_end = 0; 538 mp->epm_cstat = C_OPEN; 539 } 540 } else { 541 fmd_hdl_error(hdl, "Reconnect timed out for %s\n", 542 mp->epm_ep_str); 543 mp->epm_reconn_end = 0; 544 mp->epm_cstat = C_TIMED_OUT; 545 } 546 } 547 548 if (mp->epm_cstat == C_OPEN) { 549 fmd_xprt_resume(hdl, mp->epm_xprthdl); 550 mp->epm_qstat = Q_OPEN; 551 fmd_hdl_debug(hdl, "queue resumed for %s", mp->epm_ep_str); 552 } 553 } 554 555 /* 556 * Suspend a given connection and setup for reconnection retries. 
557 * Assume caller holds lock on epm_lock. 558 */ 559 static void 560 etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp) 561 { 562 (void) pthread_mutex_lock(&Etm_mod_lock); 563 if (Etm_exit) { 564 (void) pthread_mutex_unlock(&Etm_mod_lock); 565 return; 566 } 567 (void) pthread_mutex_unlock(&Etm_mod_lock); 568 569 if (mp->epm_oconn != NULL) { 570 (void) etm_xport_close(hdl, mp->epm_oconn); 571 mp->epm_oconn = NULL; 572 } 573 574 mp->epm_reconn_end = gethrtime() + Reconn_timeout; 575 mp->epm_cstat = C_UNINITIALIZED; 576 577 if (mp->epm_xprthdl != NULL) { 578 fmd_xprt_suspend(hdl, mp->epm_xprthdl); 579 mp->epm_qstat = Q_SUSPENDED; 580 fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str); 581 582 if (mp->epm_timer_in_use == 0) { 583 mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL, 584 Reconn_interval); 585 mp->epm_timer_in_use = 1; 586 } 587 } 588 } 589 590 /* 591 * Reinitialize the connection. The old fmd_xprt_t handle must be 592 * removed/closed first. 593 * Assume caller holds lock on epm_lock. 594 */ 595 static void 596 etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp) 597 { 598 /* 599 * To avoid a deadlock, wait for etm_send to finish before 600 * calling fmd_xprt_close() 601 */ 602 while (mp->epm_txbusy) 603 (void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock); 604 605 if (mp->epm_xprthdl != NULL) { 606 fmd_xprt_close(hdl, mp->epm_xprthdl); 607 fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str); 608 mp->epm_xprthdl = NULL; 609 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 610 mp->epm_ep_nvl = NULL; 611 } 612 613 if (mp->epm_timer_in_use) { 614 fmd_timer_remove(hdl, mp->epm_timer_id); 615 mp->epm_timer_in_use = 0; 616 } 617 618 if (mp->epm_oconn != NULL) { 619 (void) etm_xport_close(hdl, mp->epm_oconn); 620 mp->epm_oconn = NULL; 621 } 622 623 mp->epm_cstat = C_UNINITIALIZED; 624 mp->epm_qstat = Q_UNINITIALIZED; 625 } 626 627 /* 628 * Receive data from ETM transport layer. 629 * Note : This is not the fmdo_recv entry point. 
630 * 631 */ 632 static int 633 etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp) 634 { 635 size_t buflen, hdrlen; 636 void *buf; 637 char hbuf[ETM_HDRLEN]; 638 int hdrstat, rv; 639 640 hdrlen = ETM_HDRLEN; 641 642 if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) { 643 fmd_hdl_debug(hdl, "failed to read header from %s", 644 mp->epm_ep_str); 645 INCRSTAT(Etm_stats.error_read.fmds_value.ui64); 646 return (EIO); 647 } 648 649 hdrstat = etm_check_hdr(hdl, mp, hbuf); 650 651 switch (hdrstat) { 652 case ETM_HDR_INVALID: 653 (void) pthread_mutex_lock(&mp->epm_lock); 654 if (mp->epm_cstat == C_OPEN) 655 mp->epm_cstat = C_CLOSED; 656 (void) pthread_mutex_unlock(&mp->epm_lock); 657 658 INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64); 659 rv = ECANCELED; 660 break; 661 662 case ETM_HDR_BADTYPE: 663 case ETM_HDR_BADVERSION: 664 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0); 665 666 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 667 hdrlen)) != hdrlen) { 668 fmd_hdl_debug(hdl, "failed to write NAK to %s", 669 mp->epm_ep_str); 670 INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 671 return (EIO); 672 } 673 674 (void) pthread_mutex_lock(&mp->epm_lock); 675 mp->epm_cstat = C_LIMBO; 676 (void) pthread_mutex_unlock(&mp->epm_lock); 677 678 INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64); 679 rv = ENOTSUP; 680 break; 681 682 case ETM_HDR_C_HELLO: 683 /* Client is initiating a startup handshake */ 684 (void) pthread_mutex_lock(&mp->epm_lock); 685 etm_reinit(hdl, mp); 686 mp->epm_qstat = Q_INIT_PENDING; 687 (void) pthread_mutex_unlock(&mp->epm_lock); 688 689 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0); 690 691 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 692 hdrlen)) != hdrlen) { 693 fmd_hdl_debug(hdl, "failed to write S_HELLO to %s", 694 mp->epm_ep_str); 695 INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 696 return (EIO); 697 } 698 699 rv = 0; 700 break; 701 702 case ETM_HDR_ACK: 703 (void) 
pthread_mutex_lock(&mp->epm_lock); 704 if (mp->epm_qstat == Q_INIT_PENDING) { 705 /* This is client's ACK from startup handshake */ 706 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 707 if (mp->epm_ep_nvl == NULL) 708 (void) etm_get_ep_nvl(hdl, mp); 709 710 /* 711 * Call fmd_xprt_open and fmd_xprt_setspecific with 712 * Etm_mod_lock held to avoid race with etm_send thread. 713 */ 714 (void) pthread_mutex_lock(&Etm_mod_lock); 715 if ((mp->epm_xprthdl = fmd_xprt_open(hdl, 716 mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) { 717 fmd_hdl_abort(hdl, "Failed to init xprthdl " 718 "for %s", mp->epm_ep_str); 719 } 720 fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp); 721 (void) pthread_mutex_unlock(&Etm_mod_lock); 722 723 mp->epm_qstat = Q_OPEN; 724 (void) pthread_mutex_unlock(&mp->epm_lock); 725 fmd_hdl_debug(hdl, "queue open for %s", 726 mp->epm_ep_str); 727 } else { 728 (void) pthread_mutex_unlock(&mp->epm_lock); 729 fmd_hdl_debug(hdl, "protocol error, not expecting ACK " 730 "from %s\n", mp->epm_ep_str); 731 INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64); 732 } 733 734 rv = 0; 735 break; 736 737 case ETM_HDR_SHUTDOWN: 738 fmd_hdl_debug(hdl, "received shutdown from %s", 739 mp->epm_ep_str); 740 741 (void) pthread_mutex_lock(&mp->epm_lock); 742 743 etm_reinit(hdl, mp); 744 745 if (IS_CLIENT(mp)) { 746 /* 747 * A server shutdown is considered to be temporary. 748 * Prepare for reconnection. 
749 */ 750 mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL, 751 Reconn_interval); 752 753 mp->epm_timer_in_use = 1; 754 } 755 756 (void) pthread_mutex_unlock(&mp->epm_lock); 757 758 rv = ECANCELED; 759 break; 760 761 case ETM_HDR_MSG: 762 (void) pthread_mutex_lock(&mp->epm_lock); 763 if (mp->epm_qstat == Q_UNINITIALIZED) { 764 /* Peer (client) is unaware that we've restarted */ 765 (void) pthread_mutex_unlock(&mp->epm_lock); 766 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, 767 ETM_HDR_S_RESTART, 0); 768 769 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 770 hdrlen)) != hdrlen) { 771 fmd_hdl_debug(hdl, "failed to write S_RESTART " 772 "to %s", mp->epm_ep_str); 773 INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 774 return (EIO); 775 } 776 777 return (ECANCELED); 778 } 779 (void) pthread_mutex_unlock(&mp->epm_lock); 780 781 buflen = etm_get_msglen(hbuf); 782 ALLOC_BUF(hdl, buf, buflen); 783 784 if (etm_xport_read(hdl, conn, Rw_timeout, buf, 785 buflen) != buflen) { 786 fmd_hdl_debug(hdl, "failed to read message from %s", 787 mp->epm_ep_str); 788 FREE_BUF(hdl, buf, buflen); 789 INCRSTAT(Etm_stats.error_read.fmds_value.ui64); 790 return (EIO); 791 } 792 793 INCRSTAT(Etm_stats.read_msg.fmds_value.ui64); 794 ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen); 795 796 etm_hex_dump(hdl, buf, buflen, 0); 797 798 if (etm_post_msg(hdl, mp, buf, buflen)) { 799 INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64); 800 FREE_BUF(hdl, buf, buflen); 801 return (EIO); 802 } 803 804 FREE_BUF(hdl, buf, buflen); 805 806 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0); 807 808 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf, 809 hdrlen)) != hdrlen) { 810 fmd_hdl_debug(hdl, "failed to write ACK to %s", 811 mp->epm_ep_str); 812 INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 813 return (EIO); 814 } 815 816 INCRSTAT(Etm_stats.write_ack.fmds_value.ui64); 817 818 /* 819 * If we got this far and the current state of the 820 * outbound/sending connection is TIMED_OUT or 821 
* LIMBO, then we should reinitialize it. 822 */ 823 (void) pthread_mutex_lock(&mp->epm_lock); 824 if (mp->epm_cstat == C_TIMED_OUT || 825 mp->epm_cstat == C_LIMBO) { 826 if (mp->epm_oconn != NULL) { 827 (void) etm_xport_close(hdl, mp->epm_oconn); 828 mp->epm_oconn = NULL; 829 } 830 mp->epm_cstat = C_UNINITIALIZED; 831 fmd_xprt_resume(hdl, mp->epm_xprthdl); 832 if (mp->epm_timer_in_use) { 833 fmd_timer_remove(hdl, mp->epm_timer_id); 834 mp->epm_timer_in_use = 0; 835 } 836 mp->epm_qstat = Q_OPEN; 837 fmd_hdl_debug(hdl, "queue resumed for %s", 838 mp->epm_ep_str); 839 } 840 (void) pthread_mutex_unlock(&mp->epm_lock); 841 842 rv = 0; 843 break; 844 845 default: 846 fmd_hdl_debug(hdl, "protocol error, unexpected header " 847 "from %s : %d", mp->epm_ep_str, hdrstat); 848 INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64); 849 rv = 0; 850 } 851 852 return (rv); 853 } 854 855 /* 856 * ETM transport layer callback function. 857 * The transport layer calls this function to : 858 * (a) pass an incoming message (flag == ETM_CBFLAG_RECV) 859 * (b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT) 860 */ 861 static int 862 etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag, 863 void *arg) 864 { 865 etm_epmap_t *mp = (etm_epmap_t *)arg; 866 int rv = 0; 867 868 (void) pthread_mutex_lock(&Etm_mod_lock); 869 if (Etm_exit) { 870 (void) pthread_mutex_unlock(&Etm_mod_lock); 871 return (ECANCELED); 872 } 873 (void) pthread_mutex_unlock(&Etm_mod_lock); 874 875 switch (flag) { 876 case ETM_CBFLAG_RECV: 877 rv = etm_recv(hdl, conn, mp); 878 break; 879 case ETM_CBFLAG_REINIT: 880 (void) pthread_mutex_lock(&mp->epm_lock); 881 etm_reinit(hdl, mp); 882 etm_send_shutdown(hdl, mp); 883 (void) pthread_mutex_unlock(&mp->epm_lock); 884 /* 885 * Return ECANCELED so the transport layer will close the 886 * server connection. The transport layer is responsible for 887 * reestablishing this connection (should a connection request 888 * arrive from the peer). 
889 */ 890 rv = ECANCELED; 891 break; 892 default: 893 fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag); 894 rv = ENOTSUP; 895 } 896 897 return (rv); 898 } 899 900 /* 901 * Allocate and initialize an etm_epmap_t struct for the given endpoint 902 * name string. 903 */ 904 static void 905 etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags) 906 { 907 etm_epmap_t *newmap; 908 909 if (etm_check_dup_ep_str(hdl, epname)) { 910 fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname); 911 return; 912 } 913 914 newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP); 915 newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP); 916 newmap->epm_xprtflags = flags; 917 newmap->epm_cstat = C_UNINITIALIZED; 918 newmap->epm_qstat = Q_UNINITIALIZED; 919 newmap->epm_ver = ETM_PROTO_V1; /* Currently support one proto ver */ 920 newmap->epm_txbusy = 0; 921 922 (void) pthread_mutex_init(&newmap->epm_lock, NULL); 923 (void) pthread_cond_init(&newmap->epm_tx_cv, NULL); 924 925 if (etm_get_ep_nvl(hdl, newmap)) { 926 fmd_hdl_strfree(hdl, newmap->epm_ep_str); 927 fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t)); 928 return; 929 } 930 931 (void) pthread_mutex_lock(&newmap->epm_lock); 932 933 if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str, 934 etm_cb_func, newmap)) == NULL) { 935 fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n", 936 newmap->epm_ep_str); 937 etm_free_ep_nvl(hdl, newmap); 938 (void) pthread_mutex_unlock(&newmap->epm_lock); 939 (void) pthread_mutex_destroy(&newmap->epm_lock); 940 fmd_hdl_strfree(hdl, newmap->epm_ep_str); 941 fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t)); 942 return; 943 } 944 945 if (IS_CLIENT(newmap)) { 946 if (etm_handle_startup(hdl, newmap)) { 947 /* 948 * For whatever reason, we could not complete the 949 * startup handshake with the server. Set the timer 950 * and try again. 
951 */ 952 if (newmap->epm_oconn != NULL) { 953 (void) etm_xport_close(hdl, newmap->epm_oconn); 954 newmap->epm_oconn = NULL; 955 } 956 newmap->epm_cstat = C_UNINITIALIZED; 957 newmap->epm_qstat = Q_UNINITIALIZED; 958 newmap->epm_timer_id = fmd_timer_install(hdl, newmap, 959 NULL, Reconn_interval); 960 newmap->epm_timer_in_use = 1; 961 } 962 } else { 963 /* 964 * We may be restarting after a crash. If so, the client 965 * may be unaware of this. 966 */ 967 etm_send_shutdown(hdl, newmap); 968 } 969 970 /* Add this transport instance handle to the list */ 971 newmap->epm_next = Epmap_head; 972 Epmap_head = newmap; 973 974 (void) pthread_mutex_unlock(&newmap->epm_lock); 975 976 INCRSTAT(Etm_stats.peer_count.fmds_value.ui64); 977 } 978 979 /* 980 * Parse the given property list string and call etm_init_epmap 981 * for each endpoint. 982 */ 983 static void 984 etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags) 985 { 986 char *epstr, *ep, *prefix, *lasts, *numstr; 987 char epname[MAXPATHLEN]; 988 size_t slen, nlen; 989 int beg, end, i; 990 991 if (eplist == NULL) 992 return; 993 /* 994 * Create a copy of eplist for parsing. 995 * strtok/strtok_r(3C) will insert null chars to the string. 996 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used. 997 */ 998 slen = strlen(eplist); 999 epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP); 1000 (void) strcpy(epstr, eplist); 1001 1002 /* 1003 * The following are supported for the "client_list" and 1004 * "server_list" properties : 1005 * 1006 * A space-separated list of endpoints. 1007 * "dev:///dom0 dev:///dom1 dev:///dom2" 1008 * 1009 * An array syntax for a range of instances. 1010 * "dev:///dom[0:2]" 1011 * 1012 * A combination of both. 1013 * "dev:///dom0 dev:///dom[1:2]" 1014 */ 1015 ep = strtok_r(epstr, " ", &lasts); 1016 while (ep != NULL) { 1017 if (strchr(ep, '[') != NULL) { 1018 /* 1019 * This string is using array syntax. 1020 * Check the string for correct syntax. 
1021 */ 1022 if ((strchr(ep, ':') == NULL) || 1023 (strchr(ep, ']') == NULL)) { 1024 fmd_hdl_error(hdl, "Syntax error in property " 1025 "that includes : %s\n", ep); 1026 ep = strtok_r(NULL, " ", &lasts); 1027 continue; 1028 } 1029 1030 /* expand the array syntax */ 1031 prefix = strtok(ep, "["); 1032 1033 numstr = strtok(NULL, ":"); 1034 if ((numstr == NULL) || (!isdigit(*numstr))) { 1035 fmd_hdl_error(hdl, "Syntax error in property " 1036 "that includes : %s[\n", prefix); 1037 ep = strtok_r(NULL, " ", &lasts); 1038 continue; 1039 } 1040 beg = atoi(numstr); 1041 1042 numstr = strtok(NULL, "]"); 1043 if ((numstr == NULL) || (!isdigit(*numstr))) { 1044 fmd_hdl_error(hdl, "Syntax error in property " 1045 "that includes : %s[\n", prefix); 1046 ep = strtok_r(NULL, " ", &lasts); 1047 continue; 1048 } 1049 end = atoi(numstr); 1050 1051 nlen = strlen(prefix) + ETM_EP_INST_MAX; 1052 1053 if (nlen > MAXPATHLEN) { 1054 fmd_hdl_error(hdl, "Endpoint prop string " 1055 "exceeds MAXPATHLEN\n"); 1056 ep = strtok_r(NULL, " ", &lasts); 1057 continue; 1058 } 1059 1060 for (i = beg; i <= end; i++) { 1061 bzero(epname, MAXPATHLEN); 1062 (void) snprintf(epname, nlen, "%s%d", 1063 prefix, i); 1064 etm_init_epmap(hdl, epname, flags); 1065 } 1066 } else { 1067 etm_init_epmap(hdl, ep, flags); 1068 } 1069 1070 ep = strtok_r(NULL, " ", &lasts); 1071 } 1072 1073 fmd_hdl_free(hdl, epstr, slen + 1); 1074 } 1075 1076 /* 1077 * Free the transport infrastructure for an endpoint. 1078 */ 1079 static void 1080 etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp) 1081 { 1082 size_t hdrlen; 1083 char hbuf[ETM_HDRLEN]; 1084 1085 (void) pthread_mutex_lock(&mp->epm_lock); 1086 1087 /* 1088 * If an etm_send thread is in progress, wait for it to finish. 1089 * The etm_recv thread is managed by the transport layer and will 1090 * be destroyed with etm_xport_fini(). 
1091 */ 1092 while (mp->epm_txbusy) 1093 (void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock); 1094 1095 if (mp->epm_timer_in_use) 1096 fmd_timer_remove(hdl, mp->epm_timer_id); 1097 1098 if (mp->epm_oconn != NULL) { 1099 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, 1100 ETM_HDR_SHUTDOWN, 0); 1101 (void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, 1102 hdrlen); 1103 (void) etm_xport_close(hdl, mp->epm_oconn); 1104 mp->epm_oconn = NULL; 1105 } 1106 1107 if (mp->epm_xprthdl != NULL) { 1108 fmd_xprt_close(hdl, mp->epm_xprthdl); 1109 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 1110 mp->epm_ep_nvl = NULL; 1111 } 1112 1113 if (mp->epm_ep_nvl != NULL) 1114 etm_free_ep_nvl(hdl, mp); 1115 1116 if (mp->epm_tlhdl != NULL) 1117 (void) etm_xport_fini(hdl, mp->epm_tlhdl); 1118 1119 (void) pthread_mutex_unlock(&mp->epm_lock); 1120 (void) pthread_mutex_destroy(&mp->epm_lock); 1121 fmd_hdl_strfree(hdl, mp->epm_ep_str); 1122 fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t)); 1123 DECRSTAT(Etm_stats.peer_count.fmds_value.ui64); 1124 } 1125 1126 /* 1127 * FMD entry points 1128 */ 1129 1130 /* 1131 * FMD fmdo_send entry point. 1132 * Send an event to the remote endpoint and receive an ACK. 1133 */ 1134 static int 1135 etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl) 1136 { 1137 etm_epmap_t *mp; 1138 nvlist_t *msgnvl; 1139 int hdrstat, rv, cnt = 0; 1140 char *buf, *nvbuf, *class; 1141 size_t nvsize, buflen, hdrlen; 1142 struct timespec tms; 1143 1144 (void) pthread_mutex_lock(&Etm_mod_lock); 1145 if (Etm_exit) { 1146 (void) pthread_mutex_unlock(&Etm_mod_lock); 1147 return (FMD_SEND_RETRY); 1148 } 1149 (void) pthread_mutex_unlock(&Etm_mod_lock); 1150 1151 mp = fmd_xprt_getspecific(hdl, xprthdl); 1152 1153 for (;;) { 1154 if (pthread_mutex_trylock(&mp->epm_lock) == 0) { 1155 break; 1156 } else { 1157 /* 1158 * Another thread may be (1) trying to close this 1159 * fmd_xprt_t, or (2) posting an event to it. 
1160 * If (1), don't want to spend too much time here. 1161 * If (2), allow it to finish and release epm_lock. 1162 */ 1163 if (cnt++ < 10) { 1164 tms.tv_sec = 0; 1165 tms.tv_nsec = (cnt * 10000); 1166 (void) nanosleep(&tms, NULL); 1167 1168 } else { 1169 return (FMD_SEND_RETRY); 1170 } 1171 } 1172 } 1173 1174 mp->epm_txbusy++; 1175 1176 if (mp->epm_qstat == Q_UNINITIALIZED) { 1177 mp->epm_txbusy--; 1178 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1179 (void) pthread_mutex_unlock(&mp->epm_lock); 1180 return (FMD_SEND_FAILED); 1181 } 1182 1183 if (mp->epm_cstat == C_CLOSED) { 1184 etm_suspend_reconnect(hdl, mp); 1185 mp->epm_txbusy--; 1186 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1187 (void) pthread_mutex_unlock(&mp->epm_lock); 1188 return (FMD_SEND_RETRY); 1189 } 1190 1191 if (mp->epm_cstat == C_LIMBO) { 1192 if (mp->epm_oconn != NULL) { 1193 (void) etm_xport_close(hdl, mp->epm_oconn); 1194 mp->epm_oconn = NULL; 1195 } 1196 1197 fmd_xprt_suspend(hdl, xprthdl); 1198 mp->epm_qstat = Q_SUSPENDED; 1199 mp->epm_txbusy--; 1200 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1201 (void) pthread_mutex_unlock(&mp->epm_lock); 1202 fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str); 1203 return (FMD_SEND_RETRY); 1204 } 1205 1206 if (mp->epm_oconn == NULL) { 1207 if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) 1208 == NULL) { 1209 etm_suspend_reconnect(hdl, mp); 1210 mp->epm_txbusy--; 1211 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1212 (void) pthread_mutex_unlock(&mp->epm_lock); 1213 return (FMD_SEND_RETRY); 1214 } else { 1215 mp->epm_cstat = C_OPEN; 1216 } 1217 } 1218 1219 if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) 1220 fmd_hdl_abort(hdl, "No class string in nvlist"); 1221 1222 msgnvl = fmd_xprt_translate(hdl, xprthdl, ep); 1223 if (msgnvl == NULL) { 1224 mp->epm_txbusy--; 1225 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1226 (void) pthread_mutex_unlock(&mp->epm_lock); 1227 fmd_hdl_error(hdl, "Failed to translate event %p\n", 1228 
(void *) ep); 1229 return (FMD_SEND_FAILED); 1230 } 1231 1232 rv = etm_xport_send_filter(hdl, msgnvl, mp->epm_ep_str); 1233 if (rv == ETM_XPORT_FILTER_DROP) { 1234 mp->epm_txbusy--; 1235 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1236 (void) pthread_mutex_unlock(&mp->epm_lock); 1237 fmd_hdl_debug(hdl, "send_filter dropped event"); 1238 nvlist_free(msgnvl); 1239 INCRSTAT(Etm_stats.send_filter.fmds_value.ui64); 1240 return (FMD_SEND_SUCCESS); 1241 } else if (rv == ETM_XPORT_FILTER_ERROR) { 1242 fmd_hdl_debug(hdl, "send_filter error : %s", strerror(errno)); 1243 INCRSTAT(Etm_stats.error_send_filter.fmds_value.ui64); 1244 /* Still send event */ 1245 } 1246 1247 (void) pthread_mutex_unlock(&mp->epm_lock); 1248 1249 (void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR); 1250 1251 hdrlen = ETM_HDRLEN; 1252 buflen = nvsize + hdrlen; 1253 1254 ALLOC_BUF(hdl, buf, buflen); 1255 1256 nvbuf = buf + hdrlen; 1257 1258 (void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize); 1259 1260 if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) { 1261 (void) pthread_mutex_lock(&mp->epm_lock); 1262 mp->epm_txbusy--; 1263 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1264 (void) pthread_mutex_unlock(&mp->epm_lock); 1265 fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv)); 1266 nvlist_free(msgnvl); 1267 FREE_BUF(hdl, buf, buflen); 1268 return (FMD_SEND_FAILED); 1269 } 1270 1271 nvlist_free(msgnvl); 1272 1273 if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf, 1274 buflen) != buflen) { 1275 fmd_hdl_debug(hdl, "failed to send message to %s", 1276 mp->epm_ep_str); 1277 (void) pthread_mutex_lock(&mp->epm_lock); 1278 etm_suspend_reconnect(hdl, mp); 1279 mp->epm_txbusy--; 1280 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1281 (void) pthread_mutex_unlock(&mp->epm_lock); 1282 FREE_BUF(hdl, buf, buflen); 1283 INCRSTAT(Etm_stats.error_write.fmds_value.ui64); 1284 return (FMD_SEND_RETRY); 1285 } 1286 1287 INCRSTAT(Etm_stats.write_msg.fmds_value.ui64); 1288 
ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize); 1289 1290 etm_hex_dump(hdl, nvbuf, nvsize, 1); 1291 1292 if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf, 1293 hdrlen) != hdrlen) { 1294 fmd_hdl_debug(hdl, "failed to read ACK from %s", 1295 mp->epm_ep_str); 1296 (void) pthread_mutex_lock(&mp->epm_lock); 1297 etm_suspend_reconnect(hdl, mp); 1298 mp->epm_txbusy--; 1299 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1300 (void) pthread_mutex_unlock(&mp->epm_lock); 1301 FREE_BUF(hdl, buf, buflen); 1302 INCRSTAT(Etm_stats.error_read.fmds_value.ui64); 1303 return (FMD_SEND_RETRY); 1304 } 1305 1306 hdrstat = etm_check_hdr(hdl, mp, buf); 1307 FREE_BUF(hdl, buf, buflen); 1308 1309 if (hdrstat == ETM_HDR_ACK) { 1310 INCRSTAT(Etm_stats.read_ack.fmds_value.ui64); 1311 } else { 1312 (void) pthread_mutex_lock(&mp->epm_lock); 1313 1314 (void) etm_xport_close(hdl, mp->epm_oconn); 1315 mp->epm_oconn = NULL; 1316 1317 if (hdrstat == ETM_HDR_NAK) { 1318 /* Peer received a bad value in the header */ 1319 if (mp->epm_xprthdl != NULL) { 1320 mp->epm_cstat = C_LIMBO; 1321 fmd_xprt_suspend(hdl, xprthdl); 1322 mp->epm_qstat = Q_SUSPENDED; 1323 fmd_hdl_debug(hdl, "received NAK, queue " 1324 "suspended for %s", mp->epm_ep_str); 1325 } 1326 1327 rv = FMD_SEND_RETRY; 1328 1329 } else if (hdrstat == ETM_HDR_S_RESTART) { 1330 /* Server has restarted */ 1331 mp->epm_cstat = C_CLOSED; 1332 mp->epm_qstat = Q_UNINITIALIZED; 1333 fmd_hdl_debug(hdl, "server %s restarted", 1334 mp->epm_ep_str); 1335 /* 1336 * Cannot call fmd_xprt_close here, so we'll do it 1337 * on the timeout thread. 1338 */ 1339 if (mp->epm_timer_in_use == 0) { 1340 mp->epm_timer_id = fmd_timer_install( 1341 hdl, mp, NULL, 0); 1342 mp->epm_timer_in_use = 1; 1343 } 1344 1345 /* 1346 * fault.* or list.* events will be replayed if a 1347 * transport is opened with the same auth. 1348 * Other events will be discarded. 
1349 */ 1350 rv = FMD_SEND_FAILED; 1351 1352 } else { 1353 mp->epm_cstat = C_CLOSED; 1354 fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str); 1355 1356 rv = FMD_SEND_RETRY; 1357 } 1358 1359 mp->epm_txbusy--; 1360 1361 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1362 (void) pthread_mutex_unlock(&mp->epm_lock); 1363 1364 INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64); 1365 1366 return (rv); 1367 } 1368 1369 (void) pthread_mutex_lock(&mp->epm_lock); 1370 mp->epm_txbusy--; 1371 (void) pthread_cond_broadcast(&mp->epm_tx_cv); 1372 (void) pthread_mutex_unlock(&mp->epm_lock); 1373 1374 return (FMD_SEND_SUCCESS); 1375 } 1376 1377 /* 1378 * FMD fmdo_timeout entry point.. 1379 */ 1380 /*ARGSUSED*/ 1381 static void 1382 etm_timeout(fmd_hdl_t *hdl, id_t id, void *data) 1383 { 1384 etm_epmap_t *mp = (etm_epmap_t *)data; 1385 1386 (void) pthread_mutex_lock(&mp->epm_lock); 1387 1388 mp->epm_timer_in_use = 0; 1389 1390 if (mp->epm_qstat == Q_UNINITIALIZED) { 1391 /* Server has shutdown and we (client) need to reconnect */ 1392 if (mp->epm_xprthdl != NULL) { 1393 fmd_xprt_close(hdl, mp->epm_xprthdl); 1394 fmd_hdl_debug(hdl, "queue closed for %s", 1395 mp->epm_ep_str); 1396 mp->epm_xprthdl = NULL; 1397 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */ 1398 mp->epm_ep_nvl = NULL; 1399 } 1400 1401 if (mp->epm_ep_nvl == NULL) 1402 (void) etm_get_ep_nvl(hdl, mp); 1403 1404 if (etm_handle_startup(hdl, mp)) { 1405 if (mp->epm_oconn != NULL) { 1406 (void) etm_xport_close(hdl, mp->epm_oconn); 1407 mp->epm_oconn = NULL; 1408 } 1409 mp->epm_cstat = C_UNINITIALIZED; 1410 mp->epm_qstat = Q_UNINITIALIZED; 1411 mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL, 1412 Reconn_interval); 1413 mp->epm_timer_in_use = 1; 1414 } 1415 } else { 1416 etm_reconnect(hdl, mp); 1417 } 1418 1419 (void) pthread_mutex_unlock(&mp->epm_lock); 1420 } 1421 1422 /* 1423 * FMD Module declarations 1424 */ 1425 static const fmd_hdl_ops_t etm_ops = { 1426 NULL, /* fmdo_recv */ 1427 etm_timeout, /* fmdo_timeout 
*/ 1428 NULL, /* fmdo_close */ 1429 NULL, /* fmdo_stats */ 1430 NULL, /* fmdo_gc */ 1431 etm_send, /* fmdo_send */ 1432 }; 1433 1434 static const fmd_prop_t etm_props[] = { 1435 { "client_list", FMD_TYPE_STRING, NULL }, 1436 { "server_list", FMD_TYPE_STRING, NULL }, 1437 { "reconnect_interval", FMD_TYPE_UINT64, "10000000000" }, 1438 { "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" }, 1439 { "rw_timeout", FMD_TYPE_UINT64, "2000000000" }, 1440 { "filter_path", FMD_TYPE_STRING, NULL }, 1441 { NULL, 0, NULL } 1442 }; 1443 1444 static const fmd_hdl_info_t etm_info = { 1445 "Event Transport Module", "2.0", &etm_ops, etm_props 1446 }; 1447 1448 /* 1449 * Initialize the transport for use by ETM. 1450 */ 1451 void 1452 _fmd_init(fmd_hdl_t *hdl) 1453 { 1454 char *propstr; 1455 1456 if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) { 1457 return; /* invalid data in configuration file */ 1458 } 1459 1460 /* Create global stats */ 1461 (void) fmd_stat_create(hdl, FMD_STAT_NOALLOC, 1462 sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats); 1463 1464 /* Get module properties */ 1465 Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout"); 1466 Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval"); 1467 Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout"); 1468 1469 propstr = fmd_prop_get_string(hdl, "client_list"); 1470 etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS); 1471 fmd_prop_free_string(hdl, propstr); 1472 1473 propstr = fmd_prop_get_string(hdl, "server_list"); 1474 etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS); 1475 fmd_prop_free_string(hdl, propstr); 1476 1477 if (Etm_stats.peer_count.fmds_value.ui64 == 0) { 1478 fmd_hdl_debug(hdl, "Failed to init any endpoint\n"); 1479 fmd_hdl_unregister(hdl); 1480 return; 1481 } 1482 } 1483 1484 /* 1485 * Teardown the transport 1486 */ 1487 void 1488 _fmd_fini(fmd_hdl_t *hdl) 1489 { 1490 etm_epmap_t *mp, *next; 1491 1492 (void) pthread_mutex_lock(&Etm_mod_lock); 1493 Etm_exit 
= 1; 1494 (void) pthread_mutex_unlock(&Etm_mod_lock); 1495 1496 mp = Epmap_head; 1497 1498 while (mp) { 1499 next = mp->epm_next; 1500 etm_free_epmap(hdl, mp); 1501 mp = next; 1502 } 1503 1504 fmd_hdl_unregister(hdl); 1505 } 1506