/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

/*
 * FMA Event Transport Module
 *
 * Plugin for sending/receiving FMA events to/from a remote endpoint.
 */

#include <netinet/in.h>
#include <sys/fm/protocol.h>
#include <sys/sysmacros.h>
#include <pthread.h>
#include <strings.h>
#include <ctype.h>
#include <link.h>
#include <libnvpair.h>
#include "etm_xport_api.h"
#include "etm_proto.h"

/*
 * ETM declarations
 */

typedef enum etm_connection_status {
	C_UNINITIALIZED = 0,
	C_OPEN,		/* Connection is open */
	C_CLOSED,	/* Connection is closed */
	C_LIMBO,	/* Bad value in header from peer */
	C_TIMED_OUT	/* Reconnection to peer timed out */
} etm_connstat_t;

typedef enum etm_fmd_queue_status {
	Q_UNINITIALIZED = 100,
	Q_INIT_PENDING,	/* Queue initialization in progress */
	Q_OPEN,		/* Queue is open */
	Q_SUSPENDED	/* Queue is suspended */
} etm_qstat_t;

/* Per endpoint data */
typedef struct etm_endpoint_map {
	uint8_t epm_ver;		/* Protocol version being used */
	char *epm_ep_str;		/* Endpoint ID string */
	int epm_xprtflags;		/* FMD transport open flags */
	etm_xport_hdl_t epm_tlhdl;	/* Transport Layer instance handle */
	pthread_mutex_t epm_lock;	/* Protects remainder of struct */
	pthread_cond_t epm_tx_cv;	/* Cond var for send/transmit */
	int epm_txbusy;			/* Busy doing send/transmit */
	fmd_xprt_t *epm_xprthdl;	/* FMD transport handle */
	etm_qstat_t epm_qstat;		/* Status of fmd xprt queue */
	nvlist_t *epm_ep_nvl;		/* Endpoint ID nv_list */
	etm_xport_conn_t epm_oconn;	/* Connection for outgoing events */
	etm_connstat_t epm_cstat;	/* Status of connection */
	id_t epm_timer_id;		/* Timer id */
	int epm_timer_in_use;		/* Indicates if timer is in use */
	hrtime_t epm_reconn_end;	/* Reconnection end time */
	struct etm_endpoint_map *epm_next;
} etm_epmap_t;

#define	ETM_HDR_INVALID		(ETM_HDR_TYPE_TOO_HIGH + 1)
#define	ETM_HDR_BADVERSION	(ETM_HDR_TYPE_TOO_HIGH + 2)
#define	ETM_HDR_BADTYPE		(ETM_HDR_TYPE_TOO_HIGH + 3)
#define	ETM_EP_INST_MAX		4	/* Max chars in endpt instance */
#define	ETM_CLIENT_XPRT_FLAGS	FMD_XPRT_RDWR
#define	ETM_SERVER_XPRT_FLAGS	(FMD_XPRT_RDWR | FMD_XPRT_ACCEPT)

#define	ALLOC_BUF(hdl, buf, size) \
	buf = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP);

#define	FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size));

#define	IS_CLIENT(mp)	(((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? \
	0 : 1)

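/*
 * Note that IS_CLIENT() reflects which side initiates the startup handshake:
 * peers named in the "server_list" property are opened with
 * ETM_CLIENT_XPRT_FLAGS (no FMD_XPRT_ACCEPT), so this module acts as the
 * client and sends C_HELLO, while peers named in "client_list" are opened
 * with ETM_SERVER_XPRT_FLAGS and this module waits for their handshake.
 */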

#define	INCRSTAT(x)	{ (void) pthread_mutex_lock(&Etm_mod_lock); \
			(x)++; \
			(void) pthread_mutex_unlock(&Etm_mod_lock); \
			}

#define	DECRSTAT(x)	{ (void) pthread_mutex_lock(&Etm_mod_lock); \
			(x)--; \
			(void) pthread_mutex_unlock(&Etm_mod_lock); \
			}

#define	ADDSTAT(x, y)	{ (void) pthread_mutex_lock(&Etm_mod_lock); \
			(x) += (y); \
			(void) pthread_mutex_unlock(&Etm_mod_lock); \
			}

/*
 * Global variables
 */
static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER;
					/* Protects globals */
static hrtime_t Reconn_interval;	/* Time between reconnection attempts */
static hrtime_t Reconn_timeout;		/* Time allowed for reconnection */
static hrtime_t Rw_timeout;		/* Time allowed for I/O operation */
static int Etm_dump = 0;		/* Enables hex dump for debug */
static int Etm_exit = 0;		/* Flag for exit */
static etm_epmap_t *Epmap_head = NULL;	/* Head of list of epmap structs */

/* Module statistics */
static struct etm_stats {
	/* read counters */
	fmd_stat_t read_ack;
	fmd_stat_t read_bytes;
	fmd_stat_t read_msg;
	/* write counters */
	fmd_stat_t write_ack;
	fmd_stat_t write_bytes;
	fmd_stat_t write_msg;
	/* error counters */
	fmd_stat_t error_protocol;
	fmd_stat_t error_drop_read;
	fmd_stat_t error_read;
	fmd_stat_t error_read_badhdr;
	fmd_stat_t error_write;
	/* misc */
	fmd_stat_t peer_count;

} Etm_stats = {
	/* read counters */
	{ "read_ack", FMD_TYPE_UINT64, "ACKs read" },
	{ "read_bytes", FMD_TYPE_UINT64, "Bytes read" },
	{ "read_msg", FMD_TYPE_UINT64, "Messages read" },
	/* write counters */
	{ "write_ack", FMD_TYPE_UINT64, "ACKs sent" },
	{ "write_bytes", FMD_TYPE_UINT64, "Bytes sent" },
	{ "write_msg", FMD_TYPE_UINT64, "Messages sent" },
	/* ETM error counters */
	{ "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" },
	{ "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" },
	{ "error_read", FMD_TYPE_UINT64, "Read I/O errors" },
	{ "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" },
	{ "error_write", FMD_TYPE_UINT64, "Write I/O errors" },
	/* ETM Misc */
	{ "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" },
};

/*
 * ETM Private functions
 */

/*
 * Hex dump for debug.
 */
static void
etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction)
{
	int i, j, k;
	int16_t *c;

	if (Etm_dump == 0)
		return;

	j = buflen / 16;	/* Number of complete 8-column rows */
	k = buflen % 16;	/* Is there a last (non-8-column) row? */

	if (direction)
		fmd_hdl_debug(hdl, "--- WRITE Message Dump ---");
	else
		fmd_hdl_debug(hdl, "--- READ Message Dump ---");

	fmd_hdl_debug(hdl, " Displaying %d bytes", buflen);

	/* Dump the complete 8-column rows */
	for (i = 0; i < j; i++) {
		c = (int16_t *)buf + (i * 8);
		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x %4x %4x", i,
		    *(c+0), *(c+1), *(c+2), *(c+3),
		    *(c+4), *(c+5), *(c+6), *(c+7));
	}

	/* Dump the last (incomplete) row */
	c = (int16_t *)buf + (i * 8);
	switch (k) {
	case 4:
		fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1));
		break;
	case 8:
		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1),
		    *(c+2), *(c+3));
		break;
	case 12:
		fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x", i, *(c+0),
		    *(c+1), *(c+2), *(c+3), *(c+4), *(c+5));
		break;
	}

	fmd_hdl_debug(hdl, "--- End Dump ---");
}

/*
 * Provide the length of a message based on the data in the given ETM header.
 */
static size_t
etm_get_msglen(void *buf)
{
	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;

	return (ntohl(hp->hdr_msglen));
}

/*
 * Check the contents of the ETM header for errors.
 * Return the header type (hdr_type).
 */
static int
etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf)
{
	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;

	if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) {
		fmd_hdl_error(hdl, "Bad delimiter in ETM header from %s "
		    ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim);
		return (ETM_HDR_INVALID);
	}

	if ((hp->hdr_type == ETM_HDR_C_HELLO) ||
	    (hp->hdr_type == ETM_HDR_S_HELLO)) {
		/* Until version is negotiated, other fields may be wrong */
		return (hp->hdr_type);
	}

	if (hp->hdr_ver != mp->epm_ver) {
		fmd_hdl_error(hdl, "Bad version in ETM header from %s : 0x%x\n",
		    mp->epm_ep_str, hp->hdr_ver);
		return (ETM_HDR_BADVERSION);
	}

	if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) ||
	    (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) {
		fmd_hdl_error(hdl, "Bad type in ETM header from %s : 0x%x\n",
		    mp->epm_ep_str, hp->hdr_type);
		return (ETM_HDR_BADTYPE);
	}

	return (hp->hdr_type);
}

/*
 * Create an ETM header of a given type in the given buffer.
 * Return length of header.
 */
static size_t
etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen)
{
	etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;

	bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN);
	hp->hdr_ver = ver;
	hp->hdr_type = type;
	hp->hdr_msglen = htonl(msglen);

	return (ETM_HDRLEN);
}

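/*
 * On the wire, every exchange begins with an ETM_HDRLEN header consisting of
 * the delimiter bytes, the protocol version, the header type and a message
 * length in network byte order.  For ETM_HDR_MSG the header is followed by
 * hdr_msglen bytes of XDR-encoded nvlist data; the other header types carry
 * no payload.
 */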

/*
 * Convert message bytes to nvlist and post to fmd.
 * Return zero for success, non-zero for failure.
 *
 * Note : nvl is free'd by fmd.
 */
static int
etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen)
{
	nvlist_t *nvl;
	int rv;

	if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) {
		fmd_hdl_debug(hdl, "failed to unpack message");
		return (1);
	}

	(void) pthread_mutex_lock(&mp->epm_lock);
	(void) pthread_mutex_lock(&Etm_mod_lock);
	if (!Etm_exit) {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		if (mp->epm_qstat == Q_OPEN) {
			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
			rv = 0;
		} else if (mp->epm_qstat == Q_SUSPENDED) {
			fmd_xprt_resume(hdl, mp->epm_xprthdl);
			if (mp->epm_timer_in_use) {
				fmd_timer_remove(hdl, mp->epm_timer_id);
				mp->epm_timer_in_use = 0;
			}
			mp->epm_qstat = Q_OPEN;
			fmd_hdl_debug(hdl, "queue resumed for %s",
			    mp->epm_ep_str);
			fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
			rv = 0;
		} else {
			fmd_hdl_debug(hdl, "unable to post message, qstat = %d",
			    mp->epm_qstat);
			rv = 2;
		}
	} else {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		fmd_hdl_debug(hdl, "unable to post message, module exiting");
		rv = 3;
	}

	(void) pthread_mutex_unlock(&mp->epm_lock);

	return (rv);
}

/*
 * Handle the startup handshake to the server.  The client always initiates
 * the startup handshake.  In the following sequence, we are the client and
 * the remote endpoint is the server.
 *
 * Client sends C_HELLO and transitions to Q_INIT_PENDING state.
 * Server sends S_HELLO and transitions to Q_INIT_PENDING state.
 * Client sends ACK and transitions to Q_OPEN state.
 * Server receives ACK and transitions to Q_OPEN state.
 *
 * Return 0 for success, nonzero for failure.
 */
static int
etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	etm_proto_hdr_t *hp;
	size_t hdrlen = ETM_HDRLEN;
	int hdrstat;
	char hbuf[ETM_HDRLEN];

	if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
		return (1);

	mp->epm_cstat = C_OPEN;

	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0);

	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
	    hdrlen)) != hdrlen) {
		fmd_hdl_error(hdl, "Failed to write C_HELLO to %s",
		    mp->epm_ep_str);
		return (2);
	}

	mp->epm_qstat = Q_INIT_PENDING;

	if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf,
	    hdrlen)) != hdrlen) {
		fmd_hdl_error(hdl, "Failed to read S_HELLO from %s",
		    mp->epm_ep_str);
		return (3);
	}

	hdrstat = etm_check_hdr(hdl, mp, hbuf);

	if (hdrstat != ETM_HDR_S_HELLO) {
		fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO "
		    "from %s", mp->epm_ep_str);
		return (4);
	}

	/*
	 * Get version from the server.
	 * Currently, only one version is supported.
	 */
	hp = (etm_proto_hdr_t *)(void *)hbuf;
	if (hp->hdr_ver != ETM_PROTO_V1) {
		fmd_hdl_error(hdl, "Unable to use same version as %s : %d",
		    mp->epm_ep_str, hp->hdr_ver);
		return (5);
	}
	mp->epm_ver = hp->hdr_ver;

	hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);

	if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
	    hdrlen)) != hdrlen) {
		fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s",
		    mp->epm_ep_str);
		return (6);
	}

	/*
	 * Call fmd_xprt_open and fmd_xprt_setspecific with
	 * Etm_mod_lock held to avoid race with etm_send thread.
	 */
	(void) pthread_mutex_lock(&Etm_mod_lock);
	if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags,
	    mp->epm_ep_nvl, NULL)) == NULL) {
		fmd_hdl_abort(hdl, "Failed to init xprthdl for %s",
		    mp->epm_ep_str);
	}
	fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
	(void) pthread_mutex_unlock(&Etm_mod_lock);

	mp->epm_qstat = Q_OPEN;
	fmd_hdl_debug(hdl, "queue open for %s", mp->epm_ep_str);

	return (0);
}

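/*
 * The server's half of the above handshake is implemented in etm_recv():
 * an incoming C_HELLO is answered with S_HELLO and moves the queue to
 * Q_INIT_PENDING, and the client's subsequent ACK causes the server to open
 * its fmd transport and move to Q_OPEN.
 */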

/*
 * Alloc a nvlist and add a string for the endpoint.
 * Return zero for success, non-zero for failure.
 */
static int
etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	/*
	 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation
	 * in fmd when this nvlist_t is free'd.
	 */
	(void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0);

	if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) {
		fmd_hdl_debug(hdl, "failed to add domain-id string to nvlist "
		    "for %s", mp->epm_ep_str);
		nvlist_free(mp->epm_ep_nvl);
		return (1);
	}

	return (0);
}

/*
 * Free the nvlist for the endpoint_id string.
 */
/*ARGSUSED*/
static void
etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	nvlist_free(mp->epm_ep_nvl);
}

/*
 * Check for a duplicate endpoint/peer string.
 */
/*ARGSUSED*/
static int
etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname)
{
	etm_epmap_t *mp;

	for (mp = Epmap_head; mp != NULL; mp = mp->epm_next)
		if (strcmp(epname, mp->epm_ep_str) == 0)
			return (1);

	return (0);
}

/*
 * Attempt to re-open a connection with the remote endpoint.
 */
static void
etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) {
		if (gethrtime() < mp->epm_reconn_end) {
			if ((mp->epm_oconn = etm_xport_open(hdl,
			    mp->epm_tlhdl)) == NULL) {
				fmd_hdl_debug(hdl, "reconnect failed for %s",
				    mp->epm_ep_str);
				mp->epm_timer_id = fmd_timer_install(hdl, mp,
				    NULL, Reconn_interval);
				mp->epm_timer_in_use = 1;
			} else {
				fmd_hdl_debug(hdl, "reconnect success for %s",
				    mp->epm_ep_str);
				mp->epm_reconn_end = 0;
				mp->epm_cstat = C_OPEN;
			}
		} else {
			fmd_hdl_error(hdl, "Reconnect timed out for %s\n",
			    mp->epm_ep_str);
			mp->epm_reconn_end = 0;
			mp->epm_cstat = C_TIMED_OUT;
		}
	}

	if (mp->epm_cstat == C_OPEN) {
		fmd_xprt_resume(hdl, mp->epm_xprthdl);
		mp->epm_qstat = Q_OPEN;
		fmd_hdl_debug(hdl, "queue resumed for %s", mp->epm_ep_str);
	}
}

/*
 * Suspend a given connection and setup for reconnection retries.
 */
static void
etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	(void) pthread_mutex_lock(&Etm_mod_lock);
	if (Etm_exit) {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		return;
	}
	(void) pthread_mutex_unlock(&Etm_mod_lock);

	(void) pthread_mutex_lock(&mp->epm_lock);

	if (mp->epm_oconn != NULL) {
		(void) etm_xport_close(hdl, mp->epm_oconn);
		mp->epm_oconn = NULL;
	}

	mp->epm_reconn_end = gethrtime() + Reconn_timeout;
	mp->epm_cstat = C_UNINITIALIZED;

	if (mp->epm_xprthdl != NULL) {
		fmd_xprt_suspend(hdl, mp->epm_xprthdl);
		mp->epm_qstat = Q_SUSPENDED;
		fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);

		if (mp->epm_timer_in_use == 0) {
			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
			    Reconn_interval);
			mp->epm_timer_in_use = 1;
		}
	}

	(void) pthread_mutex_unlock(&mp->epm_lock);
}

/*
 * Reinitialize the connection.  The old fmd_xprt_t handle must be
 * removed/closed first.
 * Assume caller holds lock on epm_lock.
 */
static void
etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	/*
	 * To avoid a deadlock, wait for etm_send to finish before
	 * calling fmd_xprt_close()
	 */
	while (mp->epm_txbusy)
		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);

	if (mp->epm_xprthdl != NULL) {
		fmd_xprt_close(hdl, mp->epm_xprthdl);
		fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str);
		mp->epm_xprthdl = NULL;
		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
		mp->epm_ep_nvl = NULL;
	}

	if (mp->epm_timer_in_use) {
		fmd_timer_remove(hdl, mp->epm_timer_id);
		mp->epm_timer_in_use = 0;
	}

	if (mp->epm_oconn != NULL) {
		(void) etm_xport_close(hdl, mp->epm_oconn);
		mp->epm_oconn = NULL;
	}

	mp->epm_cstat = C_UNINITIALIZED;
	mp->epm_qstat = Q_UNINITIALIZED;
}

/*
 * Receive data from ETM transport layer.
 * Note : This is not the fmdo_recv entry point.
 *
 */
static int
etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp)
{
	size_t buflen, hdrlen;
	void *buf;
	char hbuf[ETM_HDRLEN];
	int hdrstat, rv;

	hdrlen = ETM_HDRLEN;

	if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) {
		fmd_hdl_debug(hdl, "failed to read header from %s",
		    mp->epm_ep_str);
		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
		return (EIO);
	}

	hdrstat = etm_check_hdr(hdl, mp, hbuf);

	switch (hdrstat) {
	case ETM_HDR_INVALID:
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_cstat == C_OPEN)
			mp->epm_cstat = C_CLOSED;
		(void) pthread_mutex_unlock(&mp->epm_lock);

		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
		rv = ECANCELED;
		break;

	case ETM_HDR_BADTYPE:
	case ETM_HDR_BADVERSION:
		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0);

		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
		    hdrlen)) != hdrlen) {
			fmd_hdl_debug(hdl, "failed to write NAK to %s",
			    mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
			return (EIO);
		}

		(void) pthread_mutex_lock(&mp->epm_lock);
		mp->epm_cstat = C_LIMBO;
		(void) pthread_mutex_unlock(&mp->epm_lock);

		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
		rv = ENOTSUP;
		break;

	case ETM_HDR_C_HELLO:
		/* Client is initiating a startup handshake */
		(void) pthread_mutex_lock(&mp->epm_lock);
		etm_reinit(hdl, mp);
		mp->epm_qstat = Q_INIT_PENDING;
		(void) pthread_mutex_unlock(&mp->epm_lock);

		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0);

		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
		    hdrlen)) != hdrlen) {
			fmd_hdl_debug(hdl, "failed to write S_HELLO to %s",
			    mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
			return (EIO);
		}

		rv = 0;
		break;

	case ETM_HDR_ACK:
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_qstat == Q_INIT_PENDING) {
			/* This is client's ACK from startup handshake */
			/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
			if (mp->epm_ep_nvl == NULL)
				(void) etm_get_ep_nvl(hdl, mp);

			/*
			 * Call fmd_xprt_open and fmd_xprt_setspecific with
			 * Etm_mod_lock held to avoid race with etm_send thread.
			 */
			(void) pthread_mutex_lock(&Etm_mod_lock);
			if ((mp->epm_xprthdl = fmd_xprt_open(hdl,
			    mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) {
				fmd_hdl_abort(hdl, "Failed to init xprthdl "
				    "for %s", mp->epm_ep_str);
			}
			fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
			(void) pthread_mutex_unlock(&Etm_mod_lock);

			mp->epm_qstat = Q_OPEN;
			(void) pthread_mutex_unlock(&mp->epm_lock);
			fmd_hdl_debug(hdl, "queue open for %s",
			    mp->epm_ep_str);
		} else {
			(void) pthread_mutex_unlock(&mp->epm_lock);
			fmd_hdl_debug(hdl, "protocol error, not expecting ACK "
			    "from %s\n", mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
		}

		rv = 0;
		break;

	case ETM_HDR_SHUTDOWN:
		fmd_hdl_debug(hdl, "received shutdown from %s",
		    mp->epm_ep_str);

		(void) pthread_mutex_lock(&mp->epm_lock);

		etm_reinit(hdl, mp);

		if (IS_CLIENT(mp)) {
			/*
			 * A server shutdown is considered to be temporary.
			 * Prepare for reconnection.
			 */
			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
			    Reconn_interval);

			mp->epm_timer_in_use = 1;
		}

		(void) pthread_mutex_unlock(&mp->epm_lock);

		rv = ECANCELED;
		break;

	case ETM_HDR_MSG:
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_qstat == Q_UNINITIALIZED) {
			/* Peer (client) is unaware that we've restarted */
			(void) pthread_mutex_unlock(&mp->epm_lock);
			hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
			    ETM_HDR_S_RESTART, 0);

			if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
			    hdrlen)) != hdrlen) {
				fmd_hdl_debug(hdl, "failed to write S_RESTART "
				    "to %s", mp->epm_ep_str);
				INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
				return (EIO);
			}

			return (ECANCELED);
		}
		(void) pthread_mutex_unlock(&mp->epm_lock);

		buflen = etm_get_msglen(hbuf);
		ALLOC_BUF(hdl, buf, buflen);

		if (etm_xport_read(hdl, conn, Rw_timeout, buf,
		    buflen) != buflen) {
			fmd_hdl_debug(hdl, "failed to read message from %s",
			    mp->epm_ep_str);
			FREE_BUF(hdl, buf, buflen);
			INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
			return (EIO);
		}

		INCRSTAT(Etm_stats.read_msg.fmds_value.ui64);
		ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen);

		etm_hex_dump(hdl, buf, buflen, 0);

		if (etm_post_msg(hdl, mp, buf, buflen)) {
			INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64);
			FREE_BUF(hdl, buf, buflen);
			return (EIO);
		}

		FREE_BUF(hdl, buf, buflen);

		hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);

		if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
		    hdrlen)) != hdrlen) {
			fmd_hdl_debug(hdl, "failed to write ACK to %s",
			    mp->epm_ep_str);
			INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
			return (EIO);
		}

		INCRSTAT(Etm_stats.write_ack.fmds_value.ui64);

		/*
		 * If we got this far and the current state of the
		 * outbound/sending connection is TIMED_OUT or
		 * LIMBO, then we should reinitialize it.
		 */
		(void) pthread_mutex_lock(&mp->epm_lock);
		if (mp->epm_cstat == C_TIMED_OUT ||
		    mp->epm_cstat == C_LIMBO) {
			if (mp->epm_oconn != NULL) {
				(void) etm_xport_close(hdl, mp->epm_oconn);
				mp->epm_oconn = NULL;
			}
			mp->epm_cstat = C_UNINITIALIZED;
			fmd_xprt_resume(hdl, mp->epm_xprthdl);
			if (mp->epm_timer_in_use) {
				fmd_timer_remove(hdl, mp->epm_timer_id);
				mp->epm_timer_in_use = 0;
			}
			mp->epm_qstat = Q_OPEN;
			fmd_hdl_debug(hdl, "queue resumed for %s",
			    mp->epm_ep_str);
		}
		(void) pthread_mutex_unlock(&mp->epm_lock);

		rv = 0;
		break;

	default:
		fmd_hdl_debug(hdl, "protocol error, unexpected header "
		    "from %s : %d", mp->epm_ep_str, hdrstat);
		INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
		rv = 0;
	}

	return (rv);
}

/*
 * ETM transport layer callback function.
 * The transport layer calls this function to :
 * (a) pass an incoming message (flag == ETM_CBFLAG_RECV)
 * (b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT)
 */
static int
etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
    void *arg)
{
	etm_epmap_t *mp = (etm_epmap_t *)arg;
	int rv = 0;

	(void) pthread_mutex_lock(&Etm_mod_lock);
	if (Etm_exit) {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		return (ECANCELED);
	}
	(void) pthread_mutex_unlock(&Etm_mod_lock);

	switch (flag) {
	case ETM_CBFLAG_RECV:
		rv = etm_recv(hdl, conn, mp);
		break;
	case ETM_CBFLAG_REINIT:
		(void) pthread_mutex_lock(&mp->epm_lock);
		etm_reinit(hdl, mp);
		(void) pthread_mutex_unlock(&mp->epm_lock);
		/*
		 * Return ECANCELED so the transport layer will close the
		 * server connection.  The transport layer is responsible for
		 * reestablishing this connection (should a connection request
		 * arrive from the peer).
		 */
		rv = ECANCELED;
		break;
	default:
		fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag);
		rv = ENOTSUP;
	}

	return (rv);
}

/*
 * Allocate and initialize an etm_epmap_t struct for the given endpoint
 * name string.
 */
static void
etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags)
{
	etm_epmap_t *newmap;

	if (etm_check_dup_ep_str(hdl, epname)) {
		fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname);
		return;
	}

	newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP);
	newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP);
	newmap->epm_xprtflags = flags;
	newmap->epm_cstat = C_UNINITIALIZED;
	newmap->epm_qstat = Q_UNINITIALIZED;
	newmap->epm_ver = ETM_PROTO_V1;	/* Currently support one proto ver */
	newmap->epm_txbusy = 0;

	(void) pthread_mutex_init(&newmap->epm_lock, NULL);
	(void) pthread_cond_init(&newmap->epm_tx_cv, NULL);

	if (etm_get_ep_nvl(hdl, newmap)) {
		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
		return;
	}

	if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str,
	    etm_cb_func, newmap)) == NULL) {
		fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n",
		    newmap->epm_ep_str);
		etm_free_ep_nvl(hdl, newmap);
		fmd_hdl_strfree(hdl, newmap->epm_ep_str);
		fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
		return;
	}

	if (IS_CLIENT(newmap)) {
		if (etm_handle_startup(hdl, newmap)) {
			etm_free_ep_nvl(hdl, newmap);
			(void) etm_xport_fini(hdl, newmap->epm_tlhdl);
			fmd_hdl_strfree(hdl, newmap->epm_ep_str);
			fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
			return;
		}
	}

	/* Add this transport instance handle to the list */
	newmap->epm_next = Epmap_head;
	Epmap_head = newmap;

	INCRSTAT(Etm_stats.peer_count.fmds_value.ui64);
}

/*
 * Parse the given property list string and call etm_init_epmap
 * for each endpoint.
 */
static void
etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags)
{
	char *epstr, *ep, *prefix, *lasts, *numstr;
	char epname[MAXPATHLEN];
	size_t slen, nlen;
	int beg, end, i;

	if (eplist == NULL)
		return;
	/*
	 * Create a copy of eplist for parsing.
	 * strtok/strtok_r(3C) will insert null chars into the string.
	 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used.
	 */
	slen = strlen(eplist);
	epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP);
	(void) strcpy(epstr, eplist);

	/*
	 * The following are supported for the "client_list" and
	 * "server_list" properties :
	 *
	 * A space-separated list of endpoints.
	 *	"dev:///dom0 dev:///dom1 dev:///dom2"
	 *
	 * An array syntax for a range of instances.
	 *	"dev:///dom[0:2]"
	 *
	 * A combination of both.
	 *	"dev:///dom0 dev:///dom[1:2]"
	 */
	ep = strtok_r(epstr, " ", &lasts);
	while (ep != NULL) {
		if (strchr(ep, '[') != NULL) {
			/*
			 * This string is using array syntax.
			 * Check the string for correct syntax.
			 */
			if ((strchr(ep, ':') == NULL) ||
			    (strchr(ep, ']') == NULL)) {
				fmd_hdl_error(hdl, "Syntax error in property "
				    "that includes : %s\n", ep);
				ep = strtok_r(NULL, " ", &lasts);
				continue;
			}

			/* expand the array syntax */
			prefix = strtok(ep, "[");

			numstr = strtok(NULL, ":");
			if ((numstr == NULL) || (!isdigit(*numstr))) {
				fmd_hdl_error(hdl, "Syntax error in property "
				    "that includes : %s[\n", prefix);
				ep = strtok_r(NULL, " ", &lasts);
				continue;
			}
			beg = atoi(numstr);

			numstr = strtok(NULL, "]");
			if ((numstr == NULL) || (!isdigit(*numstr))) {
				fmd_hdl_error(hdl, "Syntax error in property "
				    "that includes : %s[\n", prefix);
				ep = strtok_r(NULL, " ", &lasts);
				continue;
			}
			end = atoi(numstr);

			nlen = strlen(prefix) + ETM_EP_INST_MAX;

			if (nlen > MAXPATHLEN) {
				fmd_hdl_error(hdl, "Endpoint prop string "
				    "exceeds MAXPATHLEN\n");
				ep = strtok_r(NULL, " ", &lasts);
				continue;
			}

			for (i = beg; i <= end; i++) {
				bzero(epname, MAXPATHLEN);
				(void) snprintf(epname, nlen, "%s%d",
				    prefix, i);
				etm_init_epmap(hdl, epname, flags);
			}
		} else {
			etm_init_epmap(hdl, ep, flags);
		}

		ep = strtok_r(NULL, " ", &lasts);
	}

	fmd_hdl_free(hdl, epstr, slen + 1);
}

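/*
 * For example, a property value of "dev:///dom[1:3]" is expanded by the
 * loop above into the endpoints dev:///dom1, dev:///dom2 and dev:///dom3,
 * each of which is handed to etm_init_epmap().
 */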

/*
 * Free the transport infrastructure for an endpoint.
 */
static void
etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp)
{
	size_t hdrlen;
	char hbuf[ETM_HDRLEN];

	(void) pthread_mutex_lock(&mp->epm_lock);

	/*
	 * If an etm_send thread is in progress, wait for it to finish.
	 * The etm_recv thread is managed by the transport layer and will
	 * be destroyed with etm_xport_fini().
	 */
	while (mp->epm_txbusy)
		(void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);

	if (mp->epm_timer_in_use)
		fmd_timer_remove(hdl, mp->epm_timer_id);

	if (mp->epm_oconn != NULL) {
		hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
		    ETM_HDR_SHUTDOWN, 0);
		(void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
		    hdrlen);
		(void) etm_xport_close(hdl, mp->epm_oconn);
		mp->epm_oconn = NULL;
	}

	if (mp->epm_xprthdl != NULL) {
		fmd_xprt_close(hdl, mp->epm_xprthdl);
		/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
		mp->epm_ep_nvl = NULL;
	}

	if (mp->epm_ep_nvl != NULL)
		etm_free_ep_nvl(hdl, mp);

	if (mp->epm_tlhdl != NULL)
		(void) etm_xport_fini(hdl, mp->epm_tlhdl);

	(void) pthread_mutex_unlock(&mp->epm_lock);
	(void) pthread_mutex_destroy(&mp->epm_lock);
	fmd_hdl_strfree(hdl, mp->epm_ep_str);
	fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t));
	DECRSTAT(Etm_stats.peer_count.fmds_value.ui64);
}

/*
 * FMD entry points
 */

/*
 * FMD fmdo_send entry point.
 * Send an event to the remote endpoint and receive an ACK.
 */
static int
etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl)
{
	etm_epmap_t *mp;
	nvlist_t *msgnvl;
	int hdrstat, rv;
	char *buf, *nvbuf, *class;
	size_t nvsize, buflen, hdrlen;

	(void) pthread_mutex_lock(&Etm_mod_lock);
	if (Etm_exit) {
		(void) pthread_mutex_unlock(&Etm_mod_lock);
		return (FMD_SEND_RETRY);
	}
	(void) pthread_mutex_unlock(&Etm_mod_lock);

	mp = fmd_xprt_getspecific(hdl, xprthdl);

	(void) pthread_mutex_lock(&mp->epm_lock);

	mp->epm_txbusy++;

	if (mp->epm_cstat == C_CLOSED) {
		mp->epm_txbusy--;
		(void) pthread_mutex_unlock(&mp->epm_lock);
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		etm_suspend_reconnect(hdl, mp);
		return (FMD_SEND_RETRY);
	}

	if (mp->epm_cstat == C_LIMBO) {
		if (mp->epm_oconn != NULL) {
			(void) etm_xport_close(hdl, mp->epm_oconn);
			mp->epm_oconn = NULL;
		}

		fmd_xprt_suspend(hdl, xprthdl);
		mp->epm_qstat = Q_SUSPENDED;
		mp->epm_txbusy--;
		(void) pthread_mutex_unlock(&mp->epm_lock);
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);
		return (FMD_SEND_RETRY);
	}

	if (mp->epm_oconn == NULL) {
		if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl))
		    == NULL) {
			mp->epm_txbusy--;
			(void) pthread_mutex_unlock(&mp->epm_lock);
			(void) pthread_cond_broadcast(&mp->epm_tx_cv);
			etm_suspend_reconnect(hdl, mp);
			return (FMD_SEND_RETRY);
		} else {
			mp->epm_cstat = C_OPEN;
		}
	}

	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0)
		fmd_hdl_abort(hdl, "No class string in nvlist");

	msgnvl = fmd_xprt_translate(hdl, xprthdl, ep);
	if (msgnvl == NULL) {
		mp->epm_qstat = Q_UNINITIALIZED;
		(void) pthread_mutex_unlock(&mp->epm_lock);
		fmd_hdl_error(hdl, "Failed to translate event %p\n",
		    (void *) ep);
		return (FMD_SEND_FAILED);
	}

	(void) pthread_mutex_unlock(&mp->epm_lock);

	(void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR);

	hdrlen = ETM_HDRLEN;
	buflen = nvsize + hdrlen;

	ALLOC_BUF(hdl, buf, buflen);

	nvbuf = buf + hdrlen;

	(void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize);

	if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) {
		fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv));
		FREE_BUF(hdl, buf, buflen);
		return (FMD_SEND_FAILED);
	}

	nvlist_free(msgnvl);

	if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf,
	    buflen) != buflen) {
		(void) pthread_mutex_lock(&mp->epm_lock);
		mp->epm_txbusy--;
		(void) pthread_mutex_unlock(&mp->epm_lock);
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		fmd_hdl_debug(hdl, "failed to send message to %s",
		    mp->epm_ep_str);
		FREE_BUF(hdl, buf, buflen);
		INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
		etm_suspend_reconnect(hdl, mp);
		return (FMD_SEND_RETRY);
	}

	INCRSTAT(Etm_stats.write_msg.fmds_value.ui64);
	ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize);

	etm_hex_dump(hdl, nvbuf, nvsize, 1);

	if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf,
	    hdrlen) != hdrlen) {
		(void) pthread_mutex_lock(&mp->epm_lock);
		mp->epm_txbusy--;
		(void) pthread_mutex_unlock(&mp->epm_lock);
		(void) pthread_cond_broadcast(&mp->epm_tx_cv);
		fmd_hdl_debug(hdl, "failed to read ACK from %s",
		    mp->epm_ep_str);
		FREE_BUF(hdl, buf, buflen);
		INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
		etm_suspend_reconnect(hdl, mp);
		return (FMD_SEND_RETRY);
	}

	hdrstat = etm_check_hdr(hdl, mp, buf);
	FREE_BUF(hdl, buf, buflen);

	if (hdrstat == ETM_HDR_ACK) {
		INCRSTAT(Etm_stats.read_ack.fmds_value.ui64);
	} else {
		(void) pthread_mutex_lock(&mp->epm_lock);

		(void) etm_xport_close(hdl, mp->epm_oconn);
		mp->epm_oconn = NULL;

		if (hdrstat == ETM_HDR_NAK) {
			/* Peer received a bad value in the header */
			if (mp->epm_xprthdl != NULL) {
				mp->epm_cstat = C_LIMBO;
				fmd_xprt_suspend(hdl, xprthdl);
				mp->epm_qstat = Q_SUSPENDED;
				fmd_hdl_debug(hdl, "received NAK, queue "
				    "suspended for %s", mp->epm_ep_str);
			}

			rv = FMD_SEND_RETRY;

		} else if (hdrstat == ETM_HDR_S_RESTART) {
			/* Server has restarted */
			if (mp->epm_xprthdl != NULL) {
				mp->epm_cstat = C_CLOSED;
				fmd_xprt_close(hdl, xprthdl);
				/* mp->epm_ep_nvl is free'd in fmd_xprt_close */
				mp->epm_ep_nvl = NULL;
				mp->epm_qstat = Q_UNINITIALIZED;
				fmd_hdl_debug(hdl, "server restarted, queue "
				    "closed for %s", mp->epm_ep_str);
				if (mp->epm_timer_in_use == 0) {
					mp->epm_timer_id = fmd_timer_install(
					    hdl, mp, NULL, Reconn_interval);
					mp->epm_timer_in_use = 1;
				}
			}

			/*
			 * fault.* or list.* events will be replayed if a
			 * transport is opened with the same auth.
			 * Other events will be discarded.
			 */
			rv = FMD_SEND_FAILED;

		} else {
			mp->epm_cstat = C_CLOSED;
			fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str);

			rv = FMD_SEND_RETRY;
		}

		mp->epm_txbusy--;
		(void) pthread_mutex_unlock(&mp->epm_lock);

		(void) pthread_cond_broadcast(&mp->epm_tx_cv);

		INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);

		return (rv);
	}

	(void) pthread_mutex_lock(&mp->epm_lock);
	mp->epm_txbusy--;
	(void) pthread_mutex_unlock(&mp->epm_lock);

	(void) pthread_cond_broadcast(&mp->epm_tx_cv);

	return (FMD_SEND_SUCCESS);
}

/*
 * FMD fmdo_timeout entry point.
 */
/*ARGSUSED*/
static void
etm_timeout(fmd_hdl_t *hdl, id_t id, void *data)
{
	etm_epmap_t *mp = (etm_epmap_t *)data;

	(void) pthread_mutex_lock(&mp->epm_lock);

	mp->epm_timer_in_use = 0;

	if (mp->epm_qstat == Q_UNINITIALIZED) {
		/* Server has shutdown and we (client) need to reconnect */
		if (mp->epm_ep_nvl == NULL)
			(void) etm_get_ep_nvl(hdl, mp);

		if (etm_handle_startup(hdl, mp)) {
			mp->epm_qstat = Q_UNINITIALIZED;
			mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
			    Reconn_interval);
			mp->epm_timer_in_use = 1;
		}
	} else {
		etm_reconnect(hdl, mp);
	}

	(void) pthread_mutex_unlock(&mp->epm_lock);
}

/*
 * FMD Module declarations
 */
static const fmd_hdl_ops_t etm_ops = {
	NULL,		/* fmdo_recv */
	etm_timeout,	/* fmdo_timeout */
	NULL,		/* fmdo_close */
	NULL,		/* fmdo_stats */
	NULL,		/* fmdo_gc */
	etm_send,	/* fmdo_send */
};

static const fmd_prop_t etm_props[] = {
	{ "client_list", FMD_TYPE_STRING, NULL },
	{ "server_list", FMD_TYPE_STRING, NULL },
	{ "reconnect_interval", FMD_TYPE_UINT64, "10000000000" },
	{ "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" },
	{ "rw_timeout", FMD_TYPE_UINT64, "2000000000" },
	{ NULL, 0, NULL }
};

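/*
 * The three time-related properties above are hrtime values expressed in
 * nanoseconds: by default reconnection is attempted every 10 seconds for up
 * to 5 minutes, and each transport read/write may block for up to 2 seconds.
 *
 * An administrator could override them from this module's .conf file, for
 * example (illustrative values and endpoint names only):
 *
 *	setprop rw_timeout 5000000000
 *	setprop server_list "dev:///dom[0:3]"
 */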

static const fmd_hdl_info_t etm_info = {
	"Event Transport Module", "2.0", &etm_ops, etm_props
};

/*
 * Initialize the transport for use by ETM.
 */
void
_fmd_init(fmd_hdl_t *hdl)
{
	char *propstr;

	if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) {
		return; /* invalid data in configuration file */
	}

	/* Create global stats */
	(void) fmd_stat_create(hdl, FMD_STAT_NOALLOC,
	    sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats);

	/* Get module properties */
	Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout");
	Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval");
	Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout");

	propstr = fmd_prop_get_string(hdl, "client_list");
	etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS);
	fmd_prop_free_string(hdl, propstr);

	propstr = fmd_prop_get_string(hdl, "server_list");
	etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS);
	fmd_prop_free_string(hdl, propstr);

	if (Etm_stats.peer_count.fmds_value.ui64 == 0) {
		fmd_hdl_debug(hdl, "Failed to init any endpoint\n");
		fmd_hdl_unregister(hdl);
		return;
	}
}

/*
 * Teardown the transport
 */
void
_fmd_fini(fmd_hdl_t *hdl)
{
	etm_epmap_t *mp, *next;

	(void) pthread_mutex_lock(&Etm_mod_lock);
	Etm_exit = 1;
	(void) pthread_mutex_unlock(&Etm_mod_lock);

	mp = Epmap_head;

	while (mp) {
		next = mp->epm_next;
		etm_free_epmap(hdl, mp);
		mp = next;
	}

	fmd_hdl_unregister(hdl);
}