1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2010 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 /* 28 * FMA Event Transport Module Transport Layer API implementation. 29 * 30 * Library for establishing connections and transporting FMA events between 31 * ETMs (event-transport modules) in separate fault domains. 32 * 33 * The transport for this library is internet socket based and uses the DSCP 34 * client services library (libdscp). 35 */ 36 37 #include "ex_dscp.h" 38 39 /* 40 * On the SP, there is one DSCP interface for every domain. 41 * Each domain has one and only one DSCP interface to the SP. 42 * 43 * The DSCP interface is created when the domain powers-on. On the SP, 44 * a sysevent will be generated when the DSCP interface is up. On the domain, 45 * the DSCP interface should be up when ETM loads. 46 */ 47 48 exs_conn_t Acc; /* Connection for accepting/listening */ 49 pthread_t Acc_tid; /* Thread ID for accepting conns */ 50 int Acc_quit; /* Signal to quit the acceptor thread */ 51 int Acc_destroy; /* Destroy accept/listen thread? */ 52 exs_hdl_t *Exh_head = NULL; /* Head of ex_hdl_t list */ 53 pthread_mutex_t List_lock = PTHREAD_MUTEX_INITIALIZER; 54 /* Protects linked list of ex_hdl_t */ 55 static void *Dlp = NULL; /* Handle for dlopen/dlclose/dlsym */ 56 static int (*Send_filter)(fmd_hdl_t *hdl, nvlist_t *event, const char *dest); 57 static int (*Post_filter)(fmd_hdl_t *hdl, nvlist_t *event, const char *src); 58 59 /* 60 * * * * * * * * * * * * * * 61 * Module specific routines 62 * * * * * * * * * * * * * * 63 */ 64 65 /* 66 * Allocate and initialize a transport instance handle. 67 * Return hdl pointer for success, NULL for failure. 68 */ 69 static exs_hdl_t * 70 exs_hdl_alloc(fmd_hdl_t *hdl, char *endpoint_id, 71 int (*cb_func)(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag, 72 void *arg), void *cb_func_arg, int dom) 73 { 74 exs_hdl_t *hp; 75 76 hp = fmd_hdl_zalloc(hdl, sizeof (exs_hdl_t), FMD_SLEEP); 77 78 hp->h_endpt_id = fmd_hdl_strdup(hdl, endpoint_id, FMD_SLEEP); 79 hp->h_dom = dom; 80 hp->h_client.c_sd = EXS_SD_FREE; 81 hp->h_server.c_sd = EXS_SD_FREE; 82 hp->h_tid = EXS_TID_FREE; 83 hp->h_destroy = 0; 84 hp->h_hdl = hdl; 85 hp->h_cb_func = cb_func; 86 hp->h_cb_func_arg = cb_func_arg; 87 hp->h_quit = 0; 88 89 return (hp); 90 } 91 92 /* 93 * dlopen() the platform filter library and dlsym() the filter funcs. 94 */ 95 static void 96 exs_filter_init(fmd_hdl_t *hdl) 97 { 98 char *propstr = fmd_prop_get_string(hdl, "filter_path"); 99 100 if (propstr == NULL) { 101 fmd_hdl_debug(hdl, "No filter plugin specified"); 102 Send_filter = NULL; 103 Post_filter = NULL; 104 return; 105 } else { 106 if ((Dlp = dlopen(propstr, RTLD_LOCAL | RTLD_NOW)) == NULL) { 107 fmd_hdl_debug(hdl, "Failed to dlopen filter plugin"); 108 Send_filter = NULL; 109 Post_filter = NULL; 110 fmd_prop_free_string(hdl, propstr); 111 return; 112 } 113 114 if ((Send_filter = (int (*)())dlsym(Dlp, "send_filter")) 115 == NULL) { 116 fmd_hdl_debug(hdl, "failed to dlsym send_filter()"); 117 Send_filter = NULL; 118 } 119 120 if ((Post_filter = (int (*)())dlsym(Dlp, "post_filter")) 121 == NULL) { 122 fmd_hdl_debug(hdl, "failed to dlsym post_filter()"); 123 Post_filter = NULL; 124 } 125 } 126 127 fmd_prop_free_string(hdl, propstr); 128 } 129 130 /* 131 * If open, dlclose() the platform filter library. 132 */ 133 /*ARGSUSED*/ 134 static void 135 exs_filter_fini(fmd_hdl_t *hdl) 136 { 137 if (Dlp != NULL) 138 (void) dlclose(Dlp); 139 } 140 141 /* 142 * Translate endpoint_id string to int. 143 * Return the domain ID via "dom_id". 144 * Return 0 for success, nonzero for failure 145 */ 146 static int 147 exs_get_id(fmd_hdl_t *hdl, char *endpoint_id, int *dom_id) 148 { 149 char *ptr; 150 151 if (strstr(endpoint_id, EXS_SP_PREFIX) != NULL) { 152 /* Remote endpoint is the SP */ 153 *dom_id = DSCP_IDENT_SP; 154 return (0); 155 } else { 156 if ((ptr = strstr(endpoint_id, EXS_DOMAIN_PREFIX)) == NULL) { 157 fmd_hdl_error(hdl, "Property parsing error : %s not " 158 "found in %s. Check event-transport.conf\n", 159 EXS_DOMAIN_PREFIX, endpoint_id); 160 return (1); 161 } 162 163 ptr += EXS_DOMAIN_PREFIX_LEN; 164 165 if ((sscanf(ptr, "%d", dom_id)) != 1) { 166 fmd_hdl_error(hdl, "Property parsing error : no " 167 "integer found in %s. Check event-transport.conf\n", 168 endpoint_id); 169 return (2); 170 } 171 } 172 173 return (0); 174 } 175 176 /* 177 * Prepare the client connection. 178 * Return 0 for success, nonzero for failure. 179 */ 180 static int 181 exs_prep_client(exs_hdl_t *hp) 182 { 183 int rv, optval = 1; 184 struct linger ling; 185 186 /* Find the DSCP address for the remote endpoint */ 187 if ((rv = dscpAddr(hp->h_dom, DSCP_ADDR_REMOTE, 188 (struct sockaddr *)&hp->h_client.c_saddr, 189 &hp->h_client.c_len)) != DSCP_OK) { 190 fmd_hdl_debug(hp->h_hdl, "dscpAddr on the client socket " 191 "failed for %s : rv = %d\n", hp->h_endpt_id, rv); 192 return (1); 193 } 194 195 if ((hp->h_client.c_sd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { 196 fmd_hdl_error(hp->h_hdl, "Failed to create the client socket " 197 "for %s", hp->h_endpt_id); 198 return (2); 199 } 200 201 if (setsockopt(hp->h_client.c_sd, SOL_SOCKET, SO_REUSEADDR, 202 &optval, sizeof (optval))) { 203 fmd_hdl_error(hp->h_hdl, "Failed to set REUSEADDR on the " 204 "client socket for %s", hp->h_endpt_id); 205 EXS_CLOSE_CLR(hp->h_client); 206 return (3); 207 } 208 209 /* 210 * Set SO_LINGER so TCP aborts the connection when closed. 211 * If the domain's client socket goes into the TIME_WAIT state, 212 * ETM will be unable to connect to the SP until this clears. 213 * This connection is over DSCP, which is a simple point-to-point 214 * connection and therefore has no routers or multiple forwarding. 215 * The risk of receiving old packets from a previously terminated 216 * connection is very small. 217 */ 218 ling.l_onoff = 1; 219 ling.l_linger = 0; 220 if (setsockopt(hp->h_client.c_sd, SOL_SOCKET, SO_LINGER, &ling, 221 sizeof (ling))) { 222 fmd_hdl_error(hp->h_hdl, "Failed to set SO_LINGER on the " 223 "client socket for %s", hp->h_endpt_id); 224 EXS_CLOSE_CLR(hp->h_client); 225 return (4); 226 } 227 228 /* Bind the socket to the local IP address of the DSCP link */ 229 if ((rv = dscpBind(hp->h_dom, hp->h_client.c_sd, 230 EXS_CLIENT_PORT)) != DSCP_OK) { 231 if (rv == DSCP_ERROR_DOWN) { 232 fmd_hdl_debug(hp->h_hdl, "xport - dscp link for %s " 233 "is down", hp->h_endpt_id); 234 } else { 235 fmd_hdl_debug(hp->h_hdl, "dscpBind on the client " 236 "socket failed : rv = %d\n", rv); 237 } 238 EXS_CLOSE_CLR(hp->h_client); 239 return (5); 240 } 241 242 hp->h_client.c_saddr.sin_port = htons(EXS_SERVER_PORT); 243 244 /* Set IPsec security policy for this socket */ 245 if ((rv = dscpSecure(hp->h_dom, hp->h_client.c_sd)) != DSCP_OK) { 246 fmd_hdl_error(hp->h_hdl, "dscpSecure on the client socket " 247 "failed for %s : rv = %d\n", hp->h_endpt_id, rv); 248 EXS_CLOSE_CLR(hp->h_client); 249 return (6); 250 } 251 252 return (0); 253 } 254 255 /* 256 * Server function/thread. There is one thread per endpoint. 257 * Accepts incoming connections and notifies ETM of incoming data. 258 */ 259 void 260 exs_server(void *arg) 261 { 262 exs_hdl_t *hp = (exs_hdl_t *)arg; 263 struct pollfd pfd; 264 265 while (!hp->h_quit) { 266 pfd.events = POLLIN; 267 pfd.revents = 0; 268 pfd.fd = hp->h_server.c_sd; 269 270 if (poll(&pfd, 1, -1) <= 0) 271 continue; /* loop around and check h_quit */ 272 273 if (pfd.revents & (POLLHUP | POLLERR)) { 274 fmd_hdl_debug(hp->h_hdl, "xport - poll hangup/err for " 275 "%s server socket", hp->h_endpt_id); 276 EXS_CLOSE_CLR(hp->h_server); 277 hp->h_destroy++; 278 break; /* thread exits */ 279 } 280 281 if (pfd.revents & POLLIN) { 282 /* Notify ETM that incoming data is available */ 283 if (hp->h_cb_func(hp->h_hdl, &hp->h_server, 284 ETM_CBFLAG_RECV, hp->h_cb_func_arg)) { 285 /* 286 * For any non-zero return, close the 287 * connection and exit the thread. 288 */ 289 EXS_CLOSE_CLR(hp->h_server); 290 hp->h_destroy++; 291 break; /* thread exits */ 292 } 293 } 294 } 295 296 fmd_hdl_debug(hp->h_hdl, "xport - exiting server thread for %s", 297 hp->h_endpt_id); 298 } 299 300 /* 301 * Accept a new incoming connection. 302 */ 303 static void 304 exs_accept(fmd_hdl_t *hdl) 305 { 306 int new_sd, dom, flags, rv; 307 struct sockaddr_in new_saddr; 308 socklen_t new_len = sizeof (struct sockaddr); 309 exs_hdl_t *hp; 310 311 if ((new_sd = accept(Acc.c_sd, (struct sockaddr *)&new_saddr, 312 &new_len)) != -1) { 313 /* Translate saddr to domain id */ 314 if ((rv = dscpIdent((struct sockaddr *)&new_saddr, (int)new_len, 315 &dom)) != DSCP_OK) { 316 fmd_hdl_error(hdl, "dscpIdent failed : rv = %d\n", rv); 317 (void) close(new_sd); 318 return; 319 } 320 321 /* Find the exs_hdl_t for the domain trying to connect */ 322 (void) pthread_mutex_lock(&List_lock); 323 for (hp = Exh_head; hp; hp = hp->h_next) { 324 if (hp->h_dom == dom) 325 break; 326 } 327 (void) pthread_mutex_unlock(&List_lock); 328 329 if (hp == NULL) { 330 fmd_hdl_error(hdl, "Not configured to accept a " 331 "connection from domain %d. Check " 332 "event-transport.conf\n", dom); 333 (void) close(new_sd); 334 return; 335 } 336 337 /* Authenticate this connection request */ 338 if ((rv = dscpAuth(dom, (struct sockaddr *)&new_saddr, 339 (int)new_len)) != DSCP_OK) { 340 fmd_hdl_error(hdl, "dscpAuth failed for %s : rv = %d ", 341 " Possible spoofing attack\n", hp->h_endpt_id, rv); 342 (void) close(new_sd); 343 return; 344 } 345 346 if (hp->h_tid != EXS_TID_FREE) { 347 hp->h_quit = 1; 348 fmd_thr_signal(hp->h_hdl, hp->h_tid); 349 fmd_thr_destroy(hp->h_hdl, hp->h_tid); 350 hp->h_destroy = 0; 351 hp->h_quit = 0; 352 } 353 354 if (hp->h_server.c_sd != EXS_SD_FREE) 355 EXS_CLOSE_CLR(hp->h_server); 356 357 /* Set the socket to be non-blocking */ 358 flags = fcntl(new_sd, F_GETFL, 0); 359 (void) fcntl(new_sd, F_SETFL, flags | O_NONBLOCK); 360 361 hp->h_server.c_sd = new_sd; 362 363 hp->h_tid = fmd_thr_create(hdl, exs_server, hp); 364 365 } else { 366 fmd_hdl_error(hdl, "Failed to accept() a new connection"); 367 } 368 } 369 370 /* 371 * Listen for and accept incoming connections. 372 * There is only one such thread. 373 */ 374 void 375 exs_listen(void *arg) 376 { 377 fmd_hdl_t *hdl = (fmd_hdl_t *)arg; 378 struct pollfd pfd; 379 380 while (!Acc_quit) { 381 pfd.events = POLLIN; 382 pfd.revents = 0; 383 pfd.fd = Acc.c_sd; 384 385 if (poll(&pfd, 1, -1) <= 0) 386 continue; /* loop around and check Acc_quit */ 387 388 if (pfd.revents & (POLLHUP | POLLERR)) { 389 fmd_hdl_debug(hdl, "xport - poll hangup/err on " 390 "accept socket"); 391 EXS_CLOSE_CLR(Acc); 392 Acc_destroy++; 393 break; /* thread exits */ 394 } 395 396 if (pfd.revents & POLLIN) 397 exs_accept(hdl); 398 } 399 400 fmd_hdl_debug(hdl, "xport - exiting accept-listen thread"); 401 } 402 403 /* 404 * Prepare to accept a connection. 405 * Return 0 for success, nonzero for failure. 406 */ 407 void 408 exs_prep_accept(fmd_hdl_t *hdl, int dom) 409 { 410 int flags, optval = 1; 411 int rv; 412 413 if (Acc.c_sd != EXS_SD_FREE) 414 return; /* nothing to do */ 415 416 if (Acc_destroy) { 417 fmd_thr_destroy(hdl, Acc_tid); 418 Acc_tid = EXS_TID_FREE; 419 } 420 421 /* Check to see if the DSCP interface is configured */ 422 if ((rv = dscpAddr(dom, DSCP_ADDR_LOCAL, 423 (struct sockaddr *)&Acc.c_saddr, &Acc.c_len)) != DSCP_OK) { 424 fmd_hdl_debug(hdl, "xport - dscpAddr on the accept socket " 425 "failed for domain %d : rv = %d", dom, rv); 426 return; 427 } 428 429 if ((Acc.c_sd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { 430 fmd_hdl_error(hdl, "Failed to create the accept socket"); 431 return; 432 } 433 434 if (setsockopt(Acc.c_sd, SOL_SOCKET, SO_REUSEADDR, &optval, 435 sizeof (optval))) { 436 fmd_hdl_error(hdl, "Failed to set REUSEADDR for the accept " 437 "socket"); 438 EXS_CLOSE_CLR(Acc); 439 return; 440 } 441 442 /* Bind the socket to the local IP address of the DSCP link */ 443 if ((rv = dscpBind(dom, Acc.c_sd, EXS_SERVER_PORT)) != DSCP_OK) { 444 if (rv == DSCP_ERROR_DOWN) { 445 fmd_hdl_debug(hdl, "xport - dscp link for domain %d " 446 "is down", dom); 447 } else { 448 fmd_hdl_debug(hdl, "dscpBind on the accept socket " 449 "failed : rv = %d\n", rv); 450 } 451 EXS_CLOSE_CLR(Acc); 452 return; 453 } 454 455 /* Activate IPsec security policy for this socket */ 456 if ((rv = dscpSecure(dom, Acc.c_sd)) != DSCP_OK) { 457 fmd_hdl_error(hdl, "dscpSecure on the accept socket failed : " 458 "rv = %d\n", dom, rv); 459 EXS_CLOSE_CLR(Acc); 460 return; 461 } 462 463 if ((listen(Acc.c_sd, EXS_NUM_SOCKS)) == -1) { 464 fmd_hdl_debug(hdl, "Failed to listen() for connections"); 465 EXS_CLOSE_CLR(Acc); 466 return; 467 } 468 469 flags = fcntl(Acc.c_sd, F_GETFL, 0); 470 (void) fcntl(Acc.c_sd, F_SETFL, flags | O_NONBLOCK); 471 472 Acc_tid = fmd_thr_create(hdl, exs_listen, hdl); 473 } 474 475 /* 476 * * * * * * * * * * * * * * * * * * * * * * * * * * * 477 * ETM-to-Transport API Connection Management routines 478 * * * * * * * * * * * * * * * * * * * * * * * * * * * 479 */ 480 481 /* 482 * Initialize and setup any transport infrastructure before any connections 483 * are opened. 484 * Return etm_xport_hdl_t for success, NULL for failure. 485 */ 486 etm_xport_hdl_t 487 etm_xport_init(fmd_hdl_t *hdl, char *endpoint_id, 488 int (*cb_func)(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag, 489 void *arg), void *cb_func_arg) 490 { 491 exs_hdl_t *hp, *curr; 492 int dom; 493 494 if (exs_get_id(hdl, endpoint_id, &dom)) 495 return (NULL); 496 497 (void) pthread_mutex_lock(&List_lock); 498 499 /* Check for a duplicate endpoint_id on the list */ 500 for (curr = Exh_head; curr; curr = curr->h_next) { 501 if (dom == curr->h_dom) { 502 fmd_hdl_debug(hdl, "xport - init failed, " 503 "duplicate domain id : %d\n", dom); 504 (void) pthread_mutex_unlock(&List_lock); 505 return (NULL); 506 } 507 } 508 509 if (Exh_head == NULL) { 510 /* Do one-time initializations */ 511 exs_filter_init(hdl); 512 513 /* Initialize the accept/listen vars */ 514 Acc.c_sd = EXS_SD_FREE; 515 Acc_tid = EXS_TID_FREE; 516 Acc_destroy = 0; 517 Acc_quit = 0; 518 } 519 520 hp = exs_hdl_alloc(hdl, endpoint_id, cb_func, cb_func_arg, dom); 521 522 /* Add this transport instance handle to the list */ 523 hp->h_next = Exh_head; 524 Exh_head = hp; 525 526 (void) pthread_mutex_unlock(&List_lock); 527 528 exs_prep_accept(hdl, dom); 529 530 return ((etm_xport_hdl_t)hp); 531 } 532 533 /* 534 * Teardown any transport infrastructure after all connections are closed. 535 * Return 0 for success, or nonzero for failure. 536 */ 537 int 538 etm_xport_fini(fmd_hdl_t *hdl, etm_xport_hdl_t tlhdl) 539 { 540 exs_hdl_t *hp = (exs_hdl_t *)tlhdl; 541 exs_hdl_t *xp, **ppx; 542 543 (void) pthread_mutex_lock(&List_lock); 544 545 ppx = &Exh_head; 546 547 for (xp = *ppx; xp; xp = xp->h_next) { 548 if (xp != hp) 549 ppx = &xp->h_next; 550 else 551 break; 552 } 553 554 if (xp != hp) { 555 (void) pthread_mutex_unlock(&List_lock); 556 fmd_hdl_abort(hdl, "xport - fini failed, tlhdl %p not on list", 557 (void *)hp); 558 } 559 560 *ppx = hp->h_next; 561 hp->h_next = NULL; 562 563 if (hp->h_tid != EXS_TID_FREE) { 564 hp->h_quit = 1; 565 fmd_thr_signal(hdl, hp->h_tid); 566 fmd_thr_destroy(hdl, hp->h_tid); 567 } 568 569 if (hp->h_server.c_sd != EXS_SD_FREE) 570 (void) close(hp->h_server.c_sd); 571 572 if (hp->h_client.c_sd != EXS_SD_FREE) 573 (void) close(hp->h_client.c_sd); 574 575 fmd_hdl_strfree(hdl, hp->h_endpt_id); 576 fmd_hdl_free(hdl, hp, sizeof (exs_hdl_t)); 577 578 if (Exh_head == NULL) { 579 /* Undo one-time initializations */ 580 exs_filter_fini(hdl); 581 582 /* Destroy the accept/listen thread */ 583 if (Acc_tid != EXS_TID_FREE) { 584 Acc_quit = 1; 585 fmd_thr_signal(hdl, Acc_tid); 586 fmd_thr_destroy(hdl, Acc_tid); 587 } 588 589 if (Acc.c_sd != EXS_SD_FREE) 590 EXS_CLOSE_CLR(Acc); 591 } 592 593 (void) pthread_mutex_unlock(&List_lock); 594 595 return (0); 596 } 597 598 /* 599 * Open a connection with the given endpoint, 600 * Return etm_xport_conn_t for success, NULL and set errno for failure. 601 */ 602 etm_xport_conn_t 603 etm_xport_open(fmd_hdl_t *hdl, etm_xport_hdl_t tlhdl) 604 { 605 int flags; 606 exs_hdl_t *hp = (exs_hdl_t *)tlhdl; 607 608 if (hp->h_destroy) { 609 fmd_thr_destroy(hp->h_hdl, hp->h_tid); 610 hp->h_tid = EXS_TID_FREE; 611 hp->h_destroy = 0; 612 } 613 614 if (hp->h_client.c_sd == EXS_SD_FREE) { 615 if (exs_prep_client(hp) != 0) 616 return (NULL); 617 } 618 619 /* Set the socket to be non-blocking */ 620 flags = fcntl(hp->h_client.c_sd, F_GETFL, 0); 621 (void) fcntl(hp->h_client.c_sd, F_SETFL, flags | O_NONBLOCK); 622 623 if ((connect(hp->h_client.c_sd, 624 (struct sockaddr *)&hp->h_client.c_saddr, 625 hp->h_client.c_len)) == -1) { 626 if (errno != EINPROGRESS) { 627 fmd_hdl_debug(hdl, "xport - failed to connect to %s", 628 hp->h_endpt_id); 629 EXS_CLOSE_CLR(hp->h_client); 630 return (NULL); 631 } 632 } 633 634 fmd_hdl_debug(hdl, "xport - connected client socket for %s", 635 hp->h_endpt_id); 636 637 return (&hp->h_client); 638 } 639 640 /* 641 * Close a connection from either endpoint. 642 * Return zero for success, nonzero for failure. 643 */ 644 /*ARGSUSED*/ 645 int 646 etm_xport_close(fmd_hdl_t *hdl, etm_xport_conn_t conn) 647 { 648 exs_conn_t *cp = (exs_conn_t *)conn; 649 650 if (cp->c_sd == EXS_SD_FREE) 651 return (0); /* Connection already closed */ 652 653 (void) close(cp->c_sd); 654 cp->c_sd = EXS_SD_FREE; 655 656 return (0); 657 } 658 659 /* 660 * * * * * * * * * * * * * * * * * * 661 * ETM-to-Transport API I/O routines 662 * * * * * * * * * * * * * * * * * * 663 */ 664 665 /* 666 * Try to read byte_cnt bytes from the connection into the given buffer. 667 * Return how many bytes actually read for success, negative value for failure. 668 */ 669 ssize_t 670 etm_xport_read(fmd_hdl_t *hdl, etm_xport_conn_t conn, hrtime_t timeout, 671 void *buf, size_t byte_cnt) 672 { 673 ssize_t len, nbytes = 0; 674 hrtime_t endtime, sleeptime; 675 struct timespec tms; 676 char *ptr = (char *)buf; 677 exs_conn_t *cp = (exs_conn_t *)conn; 678 679 if (cp->c_sd == EXS_SD_FREE) { 680 fmd_hdl_debug(hdl, "xport - read socket %d is closed\n", 681 cp->c_sd); 682 return (-EBADF); 683 } 684 685 endtime = gethrtime() + timeout; 686 sleeptime = timeout / EXS_IO_SLEEP_DIV; 687 688 tms.tv_sec = 0; 689 tms.tv_nsec = sleeptime; 690 691 while (nbytes < byte_cnt) { 692 if (gethrtime() < endtime) { 693 if ((len = recv(cp->c_sd, ptr, byte_cnt - nbytes, 694 0)) < 0) { 695 if (errno != EINTR && errno != EWOULDBLOCK) { 696 fmd_hdl_debug(hdl, "xport - recv " 697 "failed for socket %d", cp->c_sd); 698 } 699 700 (void) nanosleep(&tms, 0); 701 continue; 702 } else if (len == 0) { 703 fmd_hdl_debug(hdl, "xport - remote endpt " 704 "closed for socket %d", cp->c_sd); 705 return (0); 706 } 707 708 ptr += len; 709 nbytes += len; 710 } else { 711 fmd_hdl_debug(hdl, "xport - read timed out for socket " 712 "%d", cp->c_sd); 713 break; 714 } 715 } 716 717 if (nbytes) 718 return (nbytes); 719 else 720 return (-1); 721 } 722 723 /* 724 * Try to write byte_cnt bytes to the connection from the given buffer. 725 * Return how many bytes actually written for success, negative value 726 * for failure. 727 */ 728 ssize_t 729 etm_xport_write(fmd_hdl_t *hdl, etm_xport_conn_t conn, hrtime_t timeout, 730 void *buf, size_t byte_cnt) 731 { 732 ssize_t len, nbytes = 0; 733 hrtime_t endtime, sleeptime; 734 struct timespec tms; 735 char *ptr = (char *)buf; 736 exs_conn_t *cp = (exs_conn_t *)conn; 737 738 if (cp->c_sd == EXS_SD_FREE) { 739 fmd_hdl_debug(hdl, "xport - write socket %d is closed\n", 740 cp->c_sd); 741 return (-EBADF); 742 } 743 744 endtime = gethrtime() + timeout; 745 sleeptime = timeout / EXS_IO_SLEEP_DIV; 746 747 tms.tv_sec = 0; 748 tms.tv_nsec = sleeptime; 749 750 while (nbytes < byte_cnt) { 751 if (gethrtime() < endtime) { 752 if ((len = send(cp->c_sd, ptr, byte_cnt - nbytes, 753 0)) < 0) { 754 if (errno != EINTR && errno != EWOULDBLOCK) { 755 fmd_hdl_debug(hdl, "xport - send " 756 "failed for socket %d", cp->c_sd); 757 } 758 759 (void) nanosleep(&tms, 0); 760 continue; 761 } 762 763 ptr += len; 764 nbytes += len; 765 } else { 766 fmd_hdl_debug(hdl, "xport - write timed out for socket " 767 "%d", cp->c_sd); 768 break; 769 } 770 } 771 772 if (nbytes) 773 return (nbytes); 774 else 775 return (-1); 776 } 777 778 /* 779 * * * * * * * * * * * * * * * * * * * * 780 * ETM-to-Transport API Filter routines 781 * * * * * * * * * * * * * * * * * * * * 782 */ 783 784 /* 785 * Call the platform's send_filter function. 786 * Otherwise return ETM_XPORT_FILTER_OK. 787 */ 788 int 789 etm_xport_send_filter(fmd_hdl_t *hdl, nvlist_t *event, const char *dest) 790 { 791 if (Send_filter != NULL) 792 return (Send_filter(hdl, event, dest)); 793 else 794 return (ETM_XPORT_FILTER_OK); 795 } 796 797 /* 798 * Call the platform's post_filter function. 799 * Otherwise return ETM_XPORT_FILTER_OK. 800 */ 801 int 802 etm_xport_post_filter(fmd_hdl_t *hdl, nvlist_t *event, const char *src) 803 { 804 if (Post_filter != NULL) 805 return (Post_filter(hdl, event, src)); 806 else 807 return (ETM_XPORT_FILTER_OK); 808 } 809