/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

/*
 * FMA Event Transport Module
 *
 * Plugin for sending/receiving FMA events to/from a remote endpoint.
 */

#include <netinet/in.h>
#include <sys/fm/protocol.h>
#include <sys/sysmacros.h>
#include <pthread.h>
#include <strings.h>
#include <ctype.h>
#include <link.h>
#include <libnvpair.h>
#include "etm_xport_api.h"
#include "etm_proto.h"

/*
 * ETM declarations
 */

typedef enum etm_connection_status {
	C_UNINITIALIZED = 0,
	C_OPEN,			/* Connection is open */
	C_CLOSED,		/* Connection is closed */
	C_LIMBO,		/* Bad value in header from peer */
	C_TIMED_OUT		/* Reconnection to peer timed out */
} etm_connstat_t;

typedef enum etm_fmd_queue_status {
	Q_UNINITIALIZED = 100,
	Q_INIT_PENDING,		/* Queue initialization in progress */
	Q_OPEN,			/* Queue is open */
	Q_SUSPENDED		/* Queue is suspended */
} etm_qstat_t;

/* Per endpoint data */
typedef struct etm_endpoint_map {
	uint8_t epm_ver;		/* Protocol version being used */
	char *epm_ep_str;		/* Endpoint ID string */
	int epm_xprtflags;		/* FMD transport open flags */
	etm_xport_hdl_t epm_tlhdl;	/* Transport Layer instance handle */
	pthread_mutex_t epm_lock;	/* Protects remainder of struct */
	pthread_cond_t epm_tx_cv;	/* Cond var for send/transmit */
	int epm_txbusy;			/* Busy doing send/transmit */
	fmd_xprt_t *epm_xprthdl;	/* FMD transport handle */
	etm_qstat_t epm_qstat;		/* Status of fmd xprt queue */
	nvlist_t *epm_ep_nvl;		/* Endpoint ID nv_list */
	etm_xport_conn_t epm_oconn;	/* Connection for outgoing events */
	etm_connstat_t epm_cstat;	/* Status of connection */
	id_t epm_timer_id;		/* Timer id */
	int epm_timer_in_use;		/* Indicates if timer is in use */
	hrtime_t epm_reconn_end;	/* Reconnection end time */
	struct etm_endpoint_map *epm_next;
} etm_epmap_t;

#define	ETM_HDR_INVALID		(ETM_HDR_TYPE_TOO_HIGH + 1)
#define	ETM_HDR_BADVERSION	(ETM_HDR_TYPE_TOO_HIGH + 2)
#define	ETM_HDR_BADTYPE		(ETM_HDR_TYPE_TOO_HIGH + 3)
#define	ETM_EP_INST_MAX		4	/* Max chars in endpt instance */
#define	ETM_CLIENT_XPRT_FLAGS	FMD_XPRT_RDWR
#define	ETM_SERVER_XPRT_FLAGS	(FMD_XPRT_RDWR | FMD_XPRT_ACCEPT)

#define	ALLOC_BUF(hdl, buf, size) \
	buf = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP);

#define	FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size));

#define	IS_CLIENT(mp)	(((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1)
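/*
 * IS_CLIENT() is nonzero when the local transport is not opened with
 * FMD_XPRT_ACCEPT, i.e. this instance initiates the startup handshake
 * toward the peer (see etm_init_epmap() and etm_handle_startup()).
 */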
#define	INCRSTAT(x)	{ (void) pthread_mutex_lock(&Etm_mod_lock); \
			(x)++; \
			(void) pthread_mutex_unlock(&Etm_mod_lock); \
			}

#define	DECRSTAT(x)	{ (void) pthread_mutex_lock(&Etm_mod_lock); \
			(x)--; \
			(void) pthread_mutex_unlock(&Etm_mod_lock); \
			}

#define	ADDSTAT(x, y)	{ (void) pthread_mutex_lock(&Etm_mod_lock); \
			(x) += (y); \
			(void) pthread_mutex_unlock(&Etm_mod_lock); \
			}

/*
 * Global variables
 */
static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER;
					/* Protects globals */
static hrtime_t Reconn_interval;	/* Time between reconnection attempts */
static hrtime_t Reconn_timeout;		/* Time allowed for reconnection */
static hrtime_t Rw_timeout;		/* Time allowed for I/O operation */
static int Etm_dump = 0;		/* Enables hex dump for debug */
static int Etm_exit = 0;		/* Flag for exit */
static etm_epmap_t *Epmap_head = NULL;	/* Head of list of epmap structs */

/* Module statistics */
static struct etm_stats {
	/* read counters */
	fmd_stat_t read_ack;
	fmd_stat_t read_bytes;
	fmd_stat_t read_msg;
	/* write counters */
	fmd_stat_t write_ack;
	fmd_stat_t write_bytes;
	fmd_stat_t write_msg;
	/* error counters */
	fmd_stat_t error_protocol;
	fmd_stat_t error_drop_read;
	fmd_stat_t error_read;
	fmd_stat_t error_read_badhdr;
	fmd_stat_t error_write;
	/* misc */
	fmd_stat_t peer_count;

} Etm_stats = {
	/* read counters */
	{ "read_ack", FMD_TYPE_UINT64, "ACKs read" },
	{ "read_bytes", FMD_TYPE_UINT64, "Bytes read" },
	{ "read_msg", FMD_TYPE_UINT64, "Messages read" },
	/* write counters */
	{ "write_ack", FMD_TYPE_UINT64, "ACKs sent" },
	{ "write_bytes", FMD_TYPE_UINT64, "Bytes sent" },
	{ "write_msg", FMD_TYPE_UINT64, "Messages sent" },
	/* ETM error counters */
	{ "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" },
	{ "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" },
	{ "error_read", FMD_TYPE_UINT64, "Read I/O errors" },
	{ "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" },
	{ "error_write", FMD_TYPE_UINT64, "Write I/O errors" },
	/* ETM Misc */
	{ "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" },
};

/*
 * ETM Private functions
 */

/*
 * Hex dump for debug.
 */
static void
etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction)
{
	int i, j, k;
	int16_t *c;

	if (Etm_dump == 0)
		return;

	j = buflen / 16;	/* Number of complete 8-column rows */
	k = buflen % 16;	/* Is there a last (non-8-column) row? */
*/ 182 183 if (direction) 184 fmd_hdl_debug(hdl, "--- WRITE Message Dump ---"); 185 else 186 fmd_hdl_debug(hdl, "--- READ Message Dump ---"); 187 188 fmd_hdl_debug(hdl, " Displaying %d bytes", buflen); 189 190 /* Dump the complete 8-column rows */ 191 for (i = 0; i < j; i++) { 192 c = (int16_t *)buf + (i * 8); 193 fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x %4x %4x", i, 194 *(c+0), *(c+1), *(c+2), *(c+3), 195 *(c+4), *(c+5), *(c+6), *(c+7)); 196 } 197 198 /* Dump the last (incomplete) row */ 199 c = (int16_t *)buf + (i * 8); 200 switch (k) { 201 case 4: 202 fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1)); 203 break; 204 case 8: 205 fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1), 206 *(c+2), *(c+3)); 207 break; 208 case 12: 209 fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x", i, *(c+0), 210 *(c+1), *(c+2), *(c+3), *(c+4), *(c+5)); 211 break; 212 } 213 214 fmd_hdl_debug(hdl, "--- End Dump ---"); 215 } 216 217 /* 218 * Provide the length of a message based on the data in the given ETM header. 219 */ 220 static size_t 221 etm_get_msglen(void *buf) 222 { 223 etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf; 224 225 return (ntohl(hp->hdr_msglen)); 226 } 227 228 /* 229 * Check the contents of the ETM header for errors. 230 * Return the header type (hdr_type). 231 */ 232 static int 233 etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf) 234 { 235 etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf; 236 237 if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) { 238 fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s " 239 ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim); 240 return (ETM_HDR_INVALID); 241 } 242 243 if ((hp->hdr_type == ETM_HDR_C_HELLO) || 244 (hp->hdr_type == ETM_HDR_S_HELLO)) { 245 /* Until version is negotiated, other fields may be wrong */ 246 return (hp->hdr_type); 247 } 248 249 if (hp->hdr_ver != mp->epm_ver) { 250 fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n", 251 mp->epm_ep_str, hp->hdr_ver); 252 return (ETM_HDR_BADVERSION); 253 } 254 255 if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) || 256 (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) { 257 fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n", 258 mp->epm_ep_str, hp->hdr_type); 259 return (ETM_HDR_BADTYPE); 260 } 261 262 return (hp->hdr_type); 263 } 264 265 /* 266 * Create an ETM header of a given type in the given buffer. 267 * Return length of header. 268 */ 269 static size_t 270 etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen) 271 { 272 etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf; 273 274 bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN); 275 hp->hdr_ver = ver; 276 hp->hdr_type = type; 277 hp->hdr_msglen = htonl(msglen); 278 279 return (ETM_HDRLEN); 280 } 281 282 /* 283 * Convert message bytes to nvlist and post to fmd. 284 * Return zero for success, non-zero for failure. 285 * 286 * Note : nvl is free'd by fmd. 
 */
static int
etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen)
{
	nvlist_t *nvl;
	int rv;

	if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) {
		fmd_hdl_error(hdl, "failed to unpack message");
		return (1);
	}

	(void) pthread_mutex_lock(&mp->epm_lock);
	(void) pthread_mutex_lock(&Etm_mod_lock);
	if (!Etm_exit) {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		if (mp->epm_qstat == Q_OPEN) {
			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
			rv = 0;
		} else if (mp->epm_qstat == Q_SUSPENDED) {
			fmd_xprt_resume(hdl, mp->epm_xprthdl);
			if (mp->epm_timer_in_use) {
				fmd_timer_remove(hdl, mp->epm_timer_id);
				mp->epm_timer_in_use = 0;
			}
			mp->epm_qstat = Q_OPEN;
			fmd_hdl_debug(hdl, "queue resumed for %s",
			    mp->epm_ep_str);
			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
			rv = 0;
		} else {
			fmd_hdl_debug(hdl, "unable to post message, qstat = %d",
			    mp->epm_qstat);
			nvlist_free(nvl);
			/* Remote peer will attempt to resend event */
			rv = 2;
		}
	} else {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		fmd_hdl_debug(hdl, "unable to post message, module exiting");
		nvlist_free(nvl);
		/* Remote peer will attempt to resend event */
		rv = 3;
	}

	(void) pthread_mutex_unlock(&mp->epm_lock);

	return (rv);
}

/*
 * Handle the startup handshake to the server. The client always initiates
 * the startup handshake. In the following sequence, we are the client and
 * the remote endpoint is the server.
 *
 *	Client sends C_HELLO and transitions to Q_INIT_PENDING state.
 *	Server sends S_HELLO and transitions to Q_INIT_PENDING state.
 *	Client sends ACK and transitions to Q_OPEN state.
 *	Server receives ACK and transitions to Q_OPEN state.
 *
 * Return 0 for success, nonzero for failure.
 */
static int
etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	etm_proto_hdr_t *hp;
	size_t hdrlen = ETM_HDRLEN;
	int hdrstat;
	char hbuf[ETM_HDRLEN];

	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
		return (1);

	mp->epm_cstat = C_OPEN;

	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0);

	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
	    hdrlen)) != hdrlen) {
		fmd_hdl_error(hdl, "Failed to write C_HELLO to %s",
		    mp->epm_ep_str);
		return (2);
	}

	mp->epm_qstat = Q_INIT_PENDING;

	if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf,
	    hdrlen)) != hdrlen) {
		fmd_hdl_error(hdl, "Failed to read S_HELLO from %s",
		    mp->epm_ep_str);
		return (3);
	}

	hdrstat = etm_check_hdr(hdl, mp, hbuf);

	if (hdrstat != ETM_HDR_S_HELLO) {
		fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO "
		    "from %s", mp->epm_ep_str);
		return (4);
	}

	/*
	 * Get version from the server.
	 * Currently, only one version is supported.
	 */
	hp = (etm_proto_hdr_t *)(void *)hbuf;
	if (hp->hdr_ver != ETM_PROTO_V1) {
		fmd_hdl_error(hdl, "Unable to use same version as %s : %d",
		    mp->epm_ep_str, hp->hdr_ver);
		return (5);
	}
	mp->epm_ver = hp->hdr_ver;

	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);

	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
	    hdrlen)) != hdrlen) {
		fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s",
		    mp->epm_ep_str);
		return (6);
	}

	/*
	 * Call fmd_xprt_open and fmd_xprt_setspecific with
	 * Etm_mod_lock held to avoid race with etm_send thread.
	 */
	(void) pthread_mutex_lock(&Etm_mod_lock);
	if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags,
	    mp->epm_ep_nvl, NULL)) == NULL) {
		fmd_hdl_abort(hdl, "Failed to init xprthdl for %s",
		    mp->epm_ep_str);
	}
	fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
	(void) pthread_mutex_unlock(&Etm_mod_lock);

	mp->epm_qstat = Q_OPEN;
	fmd_hdl_debug(hdl, "queue open for %s", mp->epm_ep_str);

	return (0);
}

/*
 * Alloc a nvlist and add a string for the endpoint.
 * Return zero for success, non-zero for failure.
 */
static int
etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	/*
	 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation
	 * in fmd when this nvlist_t is free'd.
	 */
	(void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0);

	if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) {
		fmd_hdl_error(hdl, "failed to add domain-id string to nvlist "
		    "for %s", mp->epm_ep_str);
		nvlist_free(mp->epm_ep_nvl);
		return (1);
	}

	return (0);
}

/*
 * Free the nvlist for the endpoint_id string.
 */
/*ARGSUSED*/
static void
etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	nvlist_free(mp->epm_ep_nvl);
}

/*
 * Check for a duplicate endpoint/peer string.
 */
/*ARGSUSED*/
static int
etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname)
{
	etm_epmap_t *mp;

	for (mp = Epmap_head; mp != NULL; mp = mp->epm_next)
		if (strcmp(epname, mp->epm_ep_str) == 0)
			return (1);

	return (0);
}

/*
 * Attempt to re-open a connection with the remote endpoint.
 */
static void
etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) {
		if (gethrtime() < mp->epm_reconn_end) {
			if ((mp->epm_oconn = etm_xport_open(hdl,
			    mp->epm_tlhdl)) == NULL) {
				fmd_hdl_debug(hdl, "reconnect failed for %s",
				    mp->epm_ep_str);
				mp->epm_timer_id = fmd_timer_install(hdl, mp,
				    NULL, Reconn_interval);
				mp->epm_timer_in_use = 1;
			} else {
				fmd_hdl_debug(hdl, "reconnect success for %s",
				    mp->epm_ep_str);
				mp->epm_reconn_end = 0;
				mp->epm_cstat = C_OPEN;
			}
		} else {
			fmd_hdl_error(hdl, "Reconnect timed out for %s\n",
			    mp->epm_ep_str);
			mp->epm_reconn_end = 0;
			mp->epm_cstat = C_TIMED_OUT;
		}
	}

	if (mp->epm_cstat == C_OPEN) {
		fmd_xprt_resume(hdl, mp->epm_xprthdl);
		mp->epm_qstat = Q_OPEN;
		fmd_hdl_debug(hdl, "queue resumed for %s", mp->epm_ep_str);
	}
}

/*
 * Suspend a given connection and setup for reconnection retries.
 * Assume caller holds lock on epm_lock.
 */
static void
etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	(void) pthread_mutex_lock(&Etm_mod_lock);
	if (Etm_exit) {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		return;
	}
	(void) pthread_mutex_unlock(&Etm_mod_lock);

	if (mp->epm_oconn != NULL) {
		(void) etm_xport_close(hdl, mp->epm_oconn);
		mp->epm_oconn = NULL;
	}

	mp->epm_reconn_end = gethrtime() + Reconn_timeout;
	mp->epm_cstat = C_UNINITIALIZED;

	if (mp->epm_xprthdl != NULL) {
		fmd_xprt_suspend(hdl, mp->epm_xprthdl);
		mp->epm_qstat = Q_SUSPENDED;
		fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);

		if (mp->epm_timer_in_use == 0) {
			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
			    Reconn_interval);
			mp->epm_timer_in_use = 1;
		}
	}
}

/*
 * Reinitialize the connection. The old fmd_xprt_t handle must be
 * removed/closed first.
 * Assume caller holds lock on epm_lock.
 */
static void
etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	/*
	 * To avoid a deadlock, wait for etm_send to finish before
	 * calling fmd_xprt_close()
	 */
	while (mp->epm_txbusy)
		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);

	if (mp->epm_xprthdl != NULL) {
		fmd_xprt_close(hdl, mp->epm_xprthdl);
		fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str);
		mp->epm_xprthdl = NULL;
		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
		mp->epm_ep_nvl = NULL;
	}

	if (mp->epm_timer_in_use) {
		fmd_timer_remove(hdl, mp->epm_timer_id);
		mp->epm_timer_in_use = 0;
	}

	if (mp->epm_oconn != NULL) {
		(void) etm_xport_close(hdl, mp->epm_oconn);
		mp->epm_oconn = NULL;
	}

	mp->epm_cstat = C_UNINITIALIZED;
	mp->epm_qstat = Q_UNINITIALIZED;
}

/*
 * Receive data from ETM transport layer.
 * Note : This is not the fmdo_recv entry point.
 */
static int
etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp)
{
	size_t buflen, hdrlen;
	void *buf;
	char hbuf[ETM_HDRLEN];
	int hdrstat, rv;

	hdrlen = ETM_HDRLEN;

	if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) {
		fmd_hdl_debug(hdl, "failed to read header from %s",
		    mp->epm_ep_str);
		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
		return (EIO);
	}

	hdrstat = etm_check_hdr(hdl, mp, hbuf);

	switch (hdrstat) {
	case ETM_HDR_INVALID:
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_cstat == C_OPEN)
			mp->epm_cstat = C_CLOSED;
		(void) pthread_mutex_unlock(&mp->epm_lock);

		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
		rv = ECANCELED;
		break;

	case ETM_HDR_BADTYPE:
	case ETM_HDR_BADVERSION:
		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0);

		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
		    hdrlen)) != hdrlen) {
			fmd_hdl_debug(hdl, "failed to write NAK to %s",
			    mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
			return (EIO);
		}

		(void) pthread_mutex_lock(&mp->epm_lock);
		mp->epm_cstat = C_LIMBO;
		(void) pthread_mutex_unlock(&mp->epm_lock);

		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
		rv = ENOTSUP;
		break;

	case ETM_HDR_C_HELLO:
		/* Client is initiating a startup handshake */
		(void) pthread_mutex_lock(&mp->epm_lock);
		etm_reinit(hdl, mp);
		mp->epm_qstat = Q_INIT_PENDING;
		(void) pthread_mutex_unlock(&mp->epm_lock);

		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0);

		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
		    hdrlen)) != hdrlen) {
			fmd_hdl_debug(hdl, "failed to write S_HELLO to %s",
			    mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
			return (EIO);
		}

		rv = 0;
		break;

	case ETM_HDR_ACK:
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_qstat == Q_INIT_PENDING) {
			/* This is client's ACK from startup handshake */
			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
			if (mp->epm_ep_nvl == NULL)
				(void) etm_get_ep_nvl(hdl, mp);

			/*
			 * Call fmd_xprt_open and fmd_xprt_setspecific with
			 * Etm_mod_lock held to avoid race with etm_send thread.
			 */
			(void) pthread_mutex_lock(&Etm_mod_lock);
			if ((mp->epm_xprthdl = fmd_xprt_open(hdl,
			    mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) {
				fmd_hdl_abort(hdl, "Failed to init xprthdl "
				    "for %s", mp->epm_ep_str);
			}
			fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
			(void) pthread_mutex_unlock(&Etm_mod_lock);

			mp->epm_qstat = Q_OPEN;
			(void) pthread_mutex_unlock(&mp->epm_lock);
			fmd_hdl_debug(hdl, "queue open for %s",
			    mp->epm_ep_str);
		} else {
			(void) pthread_mutex_unlock(&mp->epm_lock);
			fmd_hdl_debug(hdl, "protocol error, not expecting ACK "
			    "from %s\n", mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
		}

		rv = 0;
		break;

	case ETM_HDR_SHUTDOWN:
		fmd_hdl_debug(hdl, "received shutdown from %s",
		    mp->epm_ep_str);

		(void) pthread_mutex_lock(&mp->epm_lock);

		etm_reinit(hdl, mp);

		if (IS_CLIENT(mp)) {
			/*
			 * A server shutdown is considered to be temporary.
			 * Prepare for reconnection.
			 */
			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
			    Reconn_interval);

			mp->epm_timer_in_use = 1;
		}

		(void) pthread_mutex_unlock(&mp->epm_lock);

		rv = ECANCELED;
		break;

	case ETM_HDR_MSG:
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_qstat == Q_UNINITIALIZED) {
			/* Peer (client) is unaware that we've restarted */
			(void) pthread_mutex_unlock(&mp->epm_lock);
			hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
			    ETM_HDR_S_RESTART, 0);

			if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
			    hdrlen)) != hdrlen) {
				fmd_hdl_debug(hdl, "failed to write S_RESTART "
				    "to %s", mp->epm_ep_str);
				INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
				return (EIO);
			}

			return (ECANCELED);
		}
		(void) pthread_mutex_unlock(&mp->epm_lock);

		buflen = etm_get_msglen(hbuf);
		ALLOC_BUF(hdl, buf, buflen);

		if (etm_xport_read(hdl, conn, Rw_timeout, buf,
		    buflen) != buflen) {
			fmd_hdl_debug(hdl, "failed to read message from %s",
			    mp->epm_ep_str);
			FREE_BUF(hdl, buf, buflen);
			INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
			return (EIO);
		}

		INCRSTAT(Etm_stats.read_msg.fmds_value.ui64);
		ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen);

		etm_hex_dump(hdl, buf, buflen, 0);

		if (etm_post_msg(hdl, mp, buf, buflen)) {
			INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64);
			FREE_BUF(hdl, buf, buflen);
			return (EIO);
		}

		FREE_BUF(hdl, buf, buflen);

		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);

		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
		    hdrlen)) != hdrlen) {
			fmd_hdl_debug(hdl, "failed to write ACK to %s",
			    mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
			return (EIO);
		}

		INCRSTAT(Etm_stats.write_ack.fmds_value.ui64);

		/*
		 * If we got this far and the current state of the
		 * outbound/sending connection is TIMED_OUT or
		 * LIMBO, then we should reinitialize it.
		 */
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_cstat == C_TIMED_OUT ||
		    mp->epm_cstat == C_LIMBO) {
			if (mp->epm_oconn != NULL) {
				(void) etm_xport_close(hdl, mp->epm_oconn);
				mp->epm_oconn = NULL;
			}
			mp->epm_cstat = C_UNINITIALIZED;
			fmd_xprt_resume(hdl, mp->epm_xprthdl);
			if (mp->epm_timer_in_use) {
				fmd_timer_remove(hdl, mp->epm_timer_id);
				mp->epm_timer_in_use = 0;
			}
			mp->epm_qstat = Q_OPEN;
			fmd_hdl_debug(hdl, "queue resumed for %s",
			    mp->epm_ep_str);
		}
		(void) pthread_mutex_unlock(&mp->epm_lock);

		rv = 0;
		break;

	default:
		fmd_hdl_debug(hdl, "protocol error, unexpected header "
		    "from %s : %d", mp->epm_ep_str, hdrstat);
		INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
		rv = 0;
	}

	return (rv);
}

/*
 * ETM transport layer callback function.
 * The transport layer calls this function to :
 *	(a) pass an incoming message (flag == ETM_CBFLAG_RECV)
 *	(b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT)
 */
static int
etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
    void *arg)
{
	etm_epmap_t *mp = (etm_epmap_t *)arg;
	int rv = 0;

	(void) pthread_mutex_lock(&Etm_mod_lock);
	if (Etm_exit) {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		return (ECANCELED);
	}
	(void) pthread_mutex_unlock(&Etm_mod_lock);

	switch (flag) {
	case ETM_CBFLAG_RECV:
		rv = etm_recv(hdl, conn, mp);
		break;
	case ETM_CBFLAG_REINIT:
		(void) pthread_mutex_lock(&mp->epm_lock);
		etm_reinit(hdl, mp);
		(void) pthread_mutex_unlock(&mp->epm_lock);
		/*
		 * Return ECANCELED so the transport layer will close the
		 * server connection. The transport layer is responsible for
		 * reestablishing this connection (should a connection request
		 * arrive from the peer).
		 */
		rv = ECANCELED;
		break;
	default:
		fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag);
		rv = ENOTSUP;
	}

	return (rv);
}

/*
 * Allocate and initialize an etm_epmap_t struct for the given endpoint
 * name string.
 */
static void
etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags)
{
	etm_epmap_t *newmap;

	if (etm_check_dup_ep_str(hdl, epname)) {
		fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname);
		return;
	}

	newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP);
	newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP);
	newmap->epm_xprtflags = flags;
	newmap->epm_cstat = C_UNINITIALIZED;
	newmap->epm_qstat = Q_UNINITIALIZED;
	newmap->epm_ver = ETM_PROTO_V1;	/* Currently support one proto ver */
	newmap->epm_txbusy = 0;

	(void) pthread_mutex_init(&newmap->epm_lock, NULL);
	(void) pthread_cond_init(&newmap->epm_tx_cv, NULL);

	if (etm_get_ep_nvl(hdl, newmap)) {
		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
		return;
	}

	if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str,
	    etm_cb_func, newmap)) == NULL) {
		fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n",
		    newmap->epm_ep_str);
		etm_free_ep_nvl(hdl, newmap);
		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
		return;
	}

	if (IS_CLIENT(newmap)) {
		if (etm_handle_startup(hdl, newmap)) {
			/*
			 * For whatever reason, we could not complete the
			 * startup handshake with the server. Set the timer
			 * and try again.
			 */
			if (newmap->epm_oconn != NULL) {
				(void) etm_xport_close(hdl, newmap->epm_oconn);
				newmap->epm_oconn = NULL;
			}
			newmap->epm_cstat = C_UNINITIALIZED;
			newmap->epm_qstat = Q_UNINITIALIZED;
			newmap->epm_timer_id = fmd_timer_install(hdl, newmap,
			    NULL, Reconn_interval);
			newmap->epm_timer_in_use = 1;
		}
	}

	/* Add this transport instance handle to the list */
	newmap->epm_next = Epmap_head;
	Epmap_head = newmap;

	INCRSTAT(Etm_stats.peer_count.fmds_value.ui64);
}

/*
 * Parse the given property list string and call etm_init_epmap
 * for each endpoint.
 */
static void
etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags)
{
	char *epstr, *ep, *prefix, *lasts, *numstr;
	char epname[MAXPATHLEN];
	size_t slen, nlen;
	int beg, end, i;

	if (eplist == NULL)
		return;
	/*
	 * Create a copy of eplist for parsing.
	 * strtok/strtok_r(3C) will insert null chars to the string.
	 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used.
	 */
	slen = strlen(eplist);
	epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP);
	(void) strcpy(epstr, eplist);

	/*
	 * The following are supported for the "client_list" and
	 * "server_list" properties :
	 *
	 *	A space-separated list of endpoints.
	 *	"dev:///dom0 dev:///dom1 dev:///dom2"
	 *
	 *	An array syntax for a range of instances.
	 *	"dev:///dom[0:2]"
	 *
	 *	A combination of both.
	 *	"dev:///dom0 dev:///dom[1:2]"
	 */
	ep = strtok_r(epstr, " ", &lasts);
	while (ep != NULL) {
		if (strchr(ep, '[') != NULL) {
			/*
			 * This string is using array syntax.
			 * Check the string for correct syntax.
			 */
			if ((strchr(ep, ':') == NULL) ||
			    (strchr(ep, ']') == NULL)) {
				fmd_hdl_error(hdl, "Syntax error in property "
				    "that includes : %s\n", ep);
				ep = strtok_r(NULL, " ", &lasts);
				continue;
			}

			/* expand the array syntax */
			prefix = strtok(ep, "[");

			numstr = strtok(NULL, ":");
			if ((numstr == NULL) || (!isdigit(*numstr))) {
				fmd_hdl_error(hdl, "Syntax error in property "
				    "that includes : %s[\n", prefix);
				ep = strtok_r(NULL, " ", &lasts);
				continue;
			}
			beg = atoi(numstr);

			numstr = strtok(NULL, "]");
			if ((numstr == NULL) || (!isdigit(*numstr))) {
				fmd_hdl_error(hdl, "Syntax error in property "
				    "that includes : %s[\n", prefix);
				ep = strtok_r(NULL, " ", &lasts);
				continue;
			}
			end = atoi(numstr);

			nlen = strlen(prefix) + ETM_EP_INST_MAX;

			if (nlen > MAXPATHLEN) {
				fmd_hdl_error(hdl, "Endpoint prop string "
				    "exceeds MAXPATHLEN\n");
				ep = strtok_r(NULL, " ", &lasts);
				continue;
			}

			for (i = beg; i <= end; i++) {
				bzero(epname, MAXPATHLEN);
				(void) snprintf(epname, nlen, "%s%d",
				    prefix, i);
				etm_init_epmap(hdl, epname, flags);
			}
		} else {
			etm_init_epmap(hdl, ep, flags);
		}

		ep = strtok_r(NULL, " ", &lasts);
	}

	fmd_hdl_free(hdl, epstr, slen + 1);
}

/*
 * Free the transport infrastructure for an endpoint.
 */
static void
etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	size_t hdrlen;
	char hbuf[ETM_HDRLEN];

	(void) pthread_mutex_lock(&mp->epm_lock);

	/*
	 * If an etm_send thread is in progress, wait for it to finish.
	 * The etm_recv thread is managed by the transport layer and will
	 * be destroyed with etm_xport_fini().
	 */
	while (mp->epm_txbusy)
		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);

	if (mp->epm_timer_in_use)
		fmd_timer_remove(hdl, mp->epm_timer_id);

	if (mp->epm_oconn != NULL) {
		hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
		    ETM_HDR_SHUTDOWN, 0);
		(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
		    hdrlen);
		(void) etm_xport_close(hdl, mp->epm_oconn);
		mp->epm_oconn = NULL;
	}

	if (mp->epm_xprthdl != NULL) {
		fmd_xprt_close(hdl, mp->epm_xprthdl);
		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
		mp->epm_ep_nvl = NULL;
	}

	if (mp->epm_ep_nvl != NULL)
		etm_free_ep_nvl(hdl, mp);

	if (mp->epm_tlhdl != NULL)
		(void) etm_xport_fini(hdl, mp->epm_tlhdl);

	(void) pthread_mutex_unlock(&mp->epm_lock);
	(void) pthread_mutex_destroy(&mp->epm_lock);
	fmd_hdl_strfree(hdl, mp->epm_ep_str);
	fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t));
	DECRSTAT(Etm_stats.peer_count.fmds_value.ui64);
}

/*
 * FMD entry points
 */

/*
 * FMD fmdo_send entry point.
 * Send an event to the remote endpoint and receive an ACK.
 */
static int
etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl)
{
	etm_epmap_t *mp;
	nvlist_t *msgnvl;
	int hdrstat, rv;
	char *buf, *nvbuf, *class;
	size_t nvsize, buflen, hdrlen;

	(void) pthread_mutex_lock(&Etm_mod_lock);
	if (Etm_exit) {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		return (FMD_SEND_RETRY);
	}
	(void) pthread_mutex_unlock(&Etm_mod_lock);

	mp = fmd_xprt_getspecific(hdl, xprthdl);

	if (pthread_mutex_trylock(&mp->epm_lock))
		/* Another thread may be trying to close this fmd_xprt_t */
		return (FMD_SEND_RETRY);

	mp->epm_txbusy++;

	if (mp->epm_qstat == Q_UNINITIALIZED) {
		mp->epm_txbusy--;
		(void) pthread_mutex_unlock(&mp->epm_lock);
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		return (FMD_SEND_FAILED);
	}

	if (mp->epm_cstat == C_CLOSED) {
		etm_suspend_reconnect(hdl, mp);
		mp->epm_txbusy--;
		(void) pthread_mutex_unlock(&mp->epm_lock);
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		return (FMD_SEND_RETRY);
	}

	if (mp->epm_cstat == C_LIMBO) {
		if (mp->epm_oconn != NULL) {
			(void) etm_xport_close(hdl, mp->epm_oconn);
			mp->epm_oconn = NULL;
		}

		fmd_xprt_suspend(hdl, xprthdl);
		mp->epm_qstat = Q_SUSPENDED;
		mp->epm_txbusy--;
		(void) pthread_mutex_unlock(&mp->epm_lock);
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);
		return (FMD_SEND_RETRY);
	}

	if (mp->epm_oconn == NULL) {
		if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl))
		    == NULL) {
			etm_suspend_reconnect(hdl, mp);
			mp->epm_txbusy--;
			(void) pthread_mutex_unlock(&mp->epm_lock);
			(void) pthread_cond_broadcast(&mp->epm_tx_cv);
			return (FMD_SEND_RETRY);
		} else {
			mp->epm_cstat = C_OPEN;
		}
	}

	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0)
		fmd_hdl_abort(hdl, "No class string in nvlist");

	msgnvl = fmd_xprt_translate(hdl, xprthdl, ep);
	if (msgnvl == NULL) {
		mp->epm_txbusy--;
		(void) pthread_mutex_unlock(&mp->epm_lock);
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		fmd_hdl_error(hdl, "Failed to translate event %p\n",
		    (void *) ep);
		return (FMD_SEND_FAILED);
	}

	(void) pthread_mutex_unlock(&mp->epm_lock);

	(void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR);

	hdrlen = ETM_HDRLEN;
	buflen = nvsize + hdrlen;

	ALLOC_BUF(hdl, buf, buflen);

	nvbuf = buf + hdrlen;

	(void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize);

	if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) {
		(void) pthread_mutex_lock(&mp->epm_lock);
		mp->epm_txbusy--;
		(void) pthread_mutex_unlock(&mp->epm_lock);
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv));
		FREE_BUF(hdl, buf, buflen);
		return (FMD_SEND_FAILED);
	}

	nvlist_free(msgnvl);

	if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf,
	    buflen) != buflen) {
		fmd_hdl_debug(hdl, "failed to send message to %s",
		    mp->epm_ep_str);
		(void) pthread_mutex_lock(&mp->epm_lock);
		etm_suspend_reconnect(hdl, mp);
		mp->epm_txbusy--;
		(void) pthread_mutex_unlock(&mp->epm_lock);
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		FREE_BUF(hdl, buf, buflen);
		INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
		return (FMD_SEND_RETRY);
	}

	INCRSTAT(Etm_stats.write_msg.fmds_value.ui64);
	ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize);

	etm_hex_dump(hdl, nvbuf, nvsize, 1);

	if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf,
	    hdrlen) != hdrlen) {
		fmd_hdl_debug(hdl, "failed to read ACK from %s",
		    mp->epm_ep_str);
		(void) pthread_mutex_lock(&mp->epm_lock);
		etm_suspend_reconnect(hdl, mp);
		mp->epm_txbusy--;
		(void) pthread_mutex_unlock(&mp->epm_lock);
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		FREE_BUF(hdl, buf, buflen);
		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
		return (FMD_SEND_RETRY);
	}

	hdrstat = etm_check_hdr(hdl, mp, buf);
	FREE_BUF(hdl, buf, buflen);

	if (hdrstat == ETM_HDR_ACK) {
		INCRSTAT(Etm_stats.read_ack.fmds_value.ui64);
	} else {
		(void) pthread_mutex_lock(&mp->epm_lock);

		(void) etm_xport_close(hdl, mp->epm_oconn);
		mp->epm_oconn = NULL;

		if (hdrstat == ETM_HDR_NAK) {
			/* Peer received a bad value in the header */
			if (mp->epm_xprthdl != NULL) {
				mp->epm_cstat = C_LIMBO;
				fmd_xprt_suspend(hdl, xprthdl);
				mp->epm_qstat = Q_SUSPENDED;
				fmd_hdl_debug(hdl, "received NAK, queue "
				    "suspended for %s", mp->epm_ep_str);
			}

			rv = FMD_SEND_RETRY;

		} else if (hdrstat == ETM_HDR_S_RESTART) {
			/* Server has restarted */
			mp->epm_cstat = C_CLOSED;
			mp->epm_qstat = Q_UNINITIALIZED;
			fmd_hdl_debug(hdl, "server %s restarted",
			    mp->epm_ep_str);
			/*
			 * Cannot call fmd_xprt_close here, so we'll do it
			 * on the timeout thread.
			 */
			if (mp->epm_timer_in_use == 0) {
				mp->epm_timer_id = fmd_timer_install(
				    hdl, mp, NULL, 0);
				mp->epm_timer_in_use = 1;
			}

			/*
			 * fault.* or list.* events will be replayed if a
			 * transport is opened with the same auth.
			 * Other events will be discarded.
			 */
			rv = FMD_SEND_FAILED;

		} else {
			mp->epm_cstat = C_CLOSED;
			fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str);

			rv = FMD_SEND_RETRY;
		}

		mp->epm_txbusy--;
		(void) pthread_mutex_unlock(&mp->epm_lock);

		(void) pthread_cond_broadcast(&mp->epm_tx_cv);

		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);

		return (rv);
	}

	(void) pthread_mutex_lock(&mp->epm_lock);
	mp->epm_txbusy--;
	(void) pthread_mutex_unlock(&mp->epm_lock);

	(void) pthread_cond_broadcast(&mp->epm_tx_cv);

	return (FMD_SEND_SUCCESS);
}

/*
 * FMD fmdo_timeout entry point.
 */
/*ARGSUSED*/
static void
etm_timeout(fmd_hdl_t *hdl, id_t id, void *data)
{
	etm_epmap_t *mp = (etm_epmap_t *)data;

	(void) pthread_mutex_lock(&mp->epm_lock);

	mp->epm_timer_in_use = 0;

	if (mp->epm_qstat == Q_UNINITIALIZED) {
		/* Server has shutdown and we (client) need to reconnect */
		if (mp->epm_xprthdl != NULL) {
			fmd_xprt_close(hdl, mp->epm_xprthdl);
			fmd_hdl_debug(hdl, "queue closed for %s",
			    mp->epm_ep_str);
			mp->epm_xprthdl = NULL;
			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
			mp->epm_ep_nvl = NULL;
		}

		if (mp->epm_ep_nvl == NULL)
			(void) etm_get_ep_nvl(hdl, mp);

		if (etm_handle_startup(hdl, mp)) {
			if (mp->epm_oconn != NULL) {
				(void) etm_xport_close(hdl, mp->epm_oconn);
				mp->epm_oconn = NULL;
			}
			mp->epm_cstat = C_UNINITIALIZED;
			mp->epm_qstat = Q_UNINITIALIZED;
			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
			    Reconn_interval);
			mp->epm_timer_in_use = 1;
		}
	} else {
		etm_reconnect(hdl, mp);
	}

	(void) pthread_mutex_unlock(&mp->epm_lock);
}

/*
 * FMD Module declarations
 */
static const fmd_hdl_ops_t etm_ops = {
	NULL,		/* fmdo_recv */
	etm_timeout,	/* fmdo_timeout */
	NULL,		/* fmdo_close */
	NULL,		/* fmdo_stats */
	NULL,		/* fmdo_gc */
	etm_send,	/* fmdo_send */
};

static const fmd_prop_t etm_props[] = {
	{ "client_list", FMD_TYPE_STRING, NULL },
	{ "server_list", FMD_TYPE_STRING, NULL },
	{ "reconnect_interval", FMD_TYPE_UINT64, "10000000000" },
	{ "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" },
	{ "rw_timeout", FMD_TYPE_UINT64, "2000000000" },
	{ NULL, 0, NULL }
};
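/*
 * The time-valued properties above are used as hrtime_t nanosecond values
 * (they are compared against gethrtime() and passed to fmd_timer_install()):
 * reconnect_interval defaults to 10 seconds between reconnection attempts,
 * reconnect_timeout allows 300 seconds of retries, and rw_timeout bounds
 * each transport read/write at 2 seconds.
 */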

static const fmd_hdl_info_t etm_info = {
	"Event Transport Module", "2.0", &etm_ops, etm_props
};

/*
 * Initialize the transport for use by ETM.
 */
void
_fmd_init(fmd_hdl_t *hdl)
{
	char *propstr;

	if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) {
		return; /* invalid data in configuration file */
	}

	/* Create global stats */
	(void) fmd_stat_create(hdl, FMD_STAT_NOALLOC,
	    sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats);

	/* Get module properties */
	Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout");
	Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval");
	Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout");

	propstr = fmd_prop_get_string(hdl, "client_list");
	etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS);
	fmd_prop_free_string(hdl, propstr);

	propstr = fmd_prop_get_string(hdl, "server_list");
	etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS);
	fmd_prop_free_string(hdl, propstr);

	if (Etm_stats.peer_count.fmds_value.ui64 == 0) {
		fmd_hdl_debug(hdl, "Failed to init any endpoint\n");
		fmd_hdl_unregister(hdl);
		return;
	}
}

/*
 * Teardown the transport
 */
void
_fmd_fini(fmd_hdl_t *hdl)
{
	etm_epmap_t *mp, *next;

	(void) pthread_mutex_lock(&Etm_mod_lock);
	Etm_exit = 1;
	(void) pthread_mutex_unlock(&Etm_mod_lock);

	mp = Epmap_head;

	while (mp) {
		next = mp->epm_next;
		etm_free_epmap(hdl, mp);
		mp = next;
	}

	fmd_hdl_unregister(hdl);
}