1 /*- 2 * Copyright (c) 2012 The FreeBSD Foundation 3 * All rights reserved. 4 * 5 * This software was developed by Pawel Jakub Dawidek under sponsorship from 6 * the FreeBSD Foundation. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27 * SUCH DAMAGE. 28 * 29 * $P4: //depot/projects/trustedbsd/openbsm/bin/auditdistd/sender.c#3 $ 30 */ 31 32 #include <config/config.h> 33 34 #include <sys/param.h> 35 #if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP) 36 #include <sys/endian.h> 37 #else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 38 #ifdef HAVE_MACHINE_ENDIAN_H 39 #include <machine/endian.h> 40 #else /* !HAVE_MACHINE_ENDIAN_H */ 41 #ifdef HAVE_ENDIAN_H 42 #include <endian.h> 43 #else /* !HAVE_ENDIAN_H */ 44 #error "No supported endian.h" 45 #endif /* !HAVE_ENDIAN_H */ 46 #endif /* !HAVE_MACHINE_ENDIAN_H */ 47 #include <compat/endian.h> 48 #endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 49 #include <sys/queue.h> 50 #include <sys/stat.h> 51 #include <sys/wait.h> 52 53 #include <stdio.h> 54 #include <stdlib.h> 55 #include <unistd.h> 56 57 #include <ctype.h> 58 #include <dirent.h> 59 #include <err.h> 60 #include <errno.h> 61 #include <fcntl.h> 62 #ifdef HAVE_LIBUTIL_H 63 #include <libutil.h> 64 #endif 65 #include <signal.h> 66 #include <string.h> 67 #include <strings.h> 68 69 #include <openssl/hmac.h> 70 71 #ifndef HAVE_SIGTIMEDWAIT 72 #include "sigtimedwait.h" 73 #endif 74 75 #include "auditdistd.h" 76 #include "pjdlog.h" 77 #include "proto.h" 78 #include "sandbox.h" 79 #include "subr.h" 80 #include "synch.h" 81 #include "trail.h" 82 83 static struct adist_config *adcfg; 84 static struct adist_host *adhost; 85 86 static pthread_rwlock_t adist_remote_lock; 87 static pthread_mutex_t adist_remote_mtx; 88 static pthread_cond_t adist_remote_cond; 89 static struct trail *adist_trail; 90 91 static TAILQ_HEAD(, adreq) adist_free_list; 92 static pthread_mutex_t adist_free_list_lock; 93 static pthread_cond_t adist_free_list_cond; 94 static TAILQ_HEAD(, adreq) adist_send_list; 95 static pthread_mutex_t adist_send_list_lock; 96 static pthread_cond_t adist_send_list_cond; 97 static TAILQ_HEAD(, adreq) adist_recv_list; 98 static pthread_mutex_t adist_recv_list_lock; 99 static pthread_cond_t adist_recv_list_cond; 100 101 static void 102 init_environment(void) 103 { 104 struct adreq *adreq; 105 unsigned int ii; 106 107 rw_init(&adist_remote_lock); 108 mtx_init(&adist_remote_mtx); 109 cv_init(&adist_remote_cond); 110 TAILQ_INIT(&adist_free_list); 111 mtx_init(&adist_free_list_lock); 112 cv_init(&adist_free_list_cond); 113 TAILQ_INIT(&adist_send_list); 114 mtx_init(&adist_send_list_lock); 115 cv_init(&adist_send_list_cond); 116 TAILQ_INIT(&adist_recv_list); 117 mtx_init(&adist_recv_list_lock); 118 cv_init(&adist_recv_list_cond); 119 120 for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) { 121 adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE); 122 if (adreq == NULL) { 123 pjdlog_exitx(EX_TEMPFAIL, 124 "Unable to allocate %zu bytes of memory for adreq object.", 125 sizeof(*adreq) + ADIST_BUF_SIZE); 126 } 127 adreq->adr_byteorder = ADIST_BYTEORDER; 128 adreq->adr_cmd = ADIST_CMD_UNDEFINED; 129 adreq->adr_seq = 0; 130 adreq->adr_datasize = 0; 131 TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next); 132 } 133 } 134 135 static int 136 sender_connect(void) 137 { 138 unsigned char rnd[32], hash[32], resp[32]; 139 struct proto_conn *conn; 140 char welcome[8]; 141 int16_t val; 142 143 val = 1; 144 if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) { 145 pjdlog_exit(EX_TEMPFAIL, 146 "Unable to send connection request to parent"); 147 } 148 if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) { 149 pjdlog_exit(EX_TEMPFAIL, 150 "Unable to receive reply to connection request from parent"); 151 } 152 if (val != 0) { 153 errno = val; 154 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 155 adhost->adh_remoteaddr); 156 return (-1); 157 } 158 if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) { 159 pjdlog_exit(EX_TEMPFAIL, 160 "Unable to receive connection from parent"); 161 } 162 if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) { 163 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 164 adhost->adh_remoteaddr); 165 proto_close(conn); 166 return (-1); 167 } 168 pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr); 169 /* Error in setting timeout is not critical, but why should it fail? */ 170 if (proto_timeout(conn, adcfg->adc_timeout) < 0) 171 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 172 else 173 pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout); 174 175 /* Exchange welcome message, which includes version number. */ 176 (void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION); 177 if (proto_send(conn, welcome, sizeof(welcome)) < 0) { 178 pjdlog_errno(LOG_WARNING, 179 "Unable to send welcome message to %s", 180 adhost->adh_remoteaddr); 181 proto_close(conn); 182 return (-1); 183 } 184 pjdlog_debug(1, "Welcome message sent (%s).", welcome); 185 bzero(welcome, sizeof(welcome)); 186 if (proto_recv(conn, welcome, sizeof(welcome)) < 0) { 187 pjdlog_errno(LOG_WARNING, 188 "Unable to receive welcome message from %s", 189 adhost->adh_remoteaddr); 190 proto_close(conn); 191 return (-1); 192 } 193 if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) || 194 !isdigit(welcome[6]) || welcome[7] != '\0') { 195 pjdlog_warning("Invalid welcome message from %s.", 196 adhost->adh_remoteaddr); 197 proto_close(conn); 198 return (-1); 199 } 200 pjdlog_debug(1, "Welcome message received (%s).", welcome); 201 /* 202 * Receiver can only reply with version number lower or equal to 203 * the one we sent. 204 */ 205 adhost->adh_version = atoi(welcome + 5); 206 if (adhost->adh_version > ADIST_VERSION) { 207 pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).", 208 adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION); 209 proto_close(conn); 210 return (-1); 211 } 212 213 pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version, 214 adhost->adh_remoteaddr); 215 216 if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) { 217 pjdlog_errno(LOG_WARNING, "Unable to send name to %s", 218 adhost->adh_remoteaddr); 219 proto_close(conn); 220 return (-1); 221 } 222 pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name); 223 224 if (proto_recv(conn, rnd, sizeof(rnd)) == -1) { 225 pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s", 226 adhost->adh_remoteaddr); 227 proto_close(conn); 228 return (-1); 229 } 230 pjdlog_debug(1, "Challenge received."); 231 232 if (HMAC(EVP_sha256(), adhost->adh_password, 233 (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 234 NULL) == NULL) { 235 pjdlog_warning("Unable to generate response."); 236 proto_close(conn); 237 return (-1); 238 } 239 pjdlog_debug(1, "Response generated."); 240 241 if (proto_send(conn, hash, sizeof(hash)) == -1) { 242 pjdlog_errno(LOG_WARNING, "Unable to send response to %s", 243 adhost->adh_remoteaddr); 244 proto_close(conn); 245 return (-1); 246 } 247 pjdlog_debug(1, "Response sent."); 248 249 if (adist_random(rnd, sizeof(rnd)) == -1) { 250 pjdlog_warning("Unable to generate challenge."); 251 proto_close(conn); 252 return (-1); 253 } 254 pjdlog_debug(1, "Challenge generated."); 255 256 if (proto_send(conn, rnd, sizeof(rnd)) == -1) { 257 pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s", 258 adhost->adh_remoteaddr); 259 proto_close(conn); 260 return (-1); 261 } 262 pjdlog_debug(1, "Challenge sent."); 263 264 if (proto_recv(conn, resp, sizeof(resp)) == -1) { 265 pjdlog_errno(LOG_WARNING, "Unable to receive response from %s", 266 adhost->adh_remoteaddr); 267 proto_close(conn); 268 return (-1); 269 } 270 pjdlog_debug(1, "Response received."); 271 272 if (HMAC(EVP_sha256(), adhost->adh_password, 273 (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 274 NULL) == NULL) { 275 pjdlog_warning("Unable to generate hash."); 276 proto_close(conn); 277 return (-1); 278 } 279 pjdlog_debug(1, "Hash generated."); 280 281 if (memcmp(resp, hash, sizeof(hash)) != 0) { 282 pjdlog_warning("Invalid response from %s (wrong password?).", 283 adhost->adh_remoteaddr); 284 proto_close(conn); 285 return (-1); 286 } 287 pjdlog_info("Receiver authenticated."); 288 289 if (proto_recv(conn, &adhost->adh_trail_offset, 290 sizeof(adhost->adh_trail_offset)) == -1) { 291 pjdlog_errno(LOG_WARNING, 292 "Unable to receive size of the most recent trail file from %s", 293 adhost->adh_remoteaddr); 294 proto_close(conn); 295 return (-1); 296 } 297 adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset); 298 if (proto_recv(conn, &adhost->adh_trail_name, 299 sizeof(adhost->adh_trail_name)) == -1) { 300 pjdlog_errno(LOG_WARNING, 301 "Unable to receive name of the most recent trail file from %s", 302 adhost->adh_remoteaddr); 303 proto_close(conn); 304 return (-1); 305 } 306 pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.", 307 adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset); 308 309 rw_wlock(&adist_remote_lock); 310 mtx_lock(&adist_remote_mtx); 311 PJDLOG_ASSERT(adhost->adh_remote == NULL); 312 PJDLOG_ASSERT(conn != NULL); 313 adhost->adh_remote = conn; 314 mtx_unlock(&adist_remote_mtx); 315 rw_unlock(&adist_remote_lock); 316 cv_signal(&adist_remote_cond); 317 318 return (0); 319 } 320 321 static void 322 sender_disconnect(void) 323 { 324 325 rw_wlock(&adist_remote_lock); 326 /* 327 * Check for a race between dropping rlock and acquiring wlock - 328 * another thread can close connection in-between. 329 */ 330 if (adhost->adh_remote == NULL) { 331 rw_unlock(&adist_remote_lock); 332 return; 333 } 334 pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr); 335 proto_close(adhost->adh_remote); 336 mtx_lock(&adist_remote_mtx); 337 adhost->adh_remote = NULL; 338 adhost->adh_reset = true; 339 adhost->adh_trail_name[0] = '\0'; 340 adhost->adh_trail_offset = 0; 341 mtx_unlock(&adist_remote_mtx); 342 rw_unlock(&adist_remote_lock); 343 344 pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr); 345 346 /* Move all in-flight requests back onto free list. */ 347 mtx_lock(&adist_free_list_lock); 348 mtx_lock(&adist_send_list_lock); 349 TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next); 350 mtx_unlock(&adist_send_list_lock); 351 mtx_lock(&adist_recv_list_lock); 352 TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next); 353 mtx_unlock(&adist_recv_list_lock); 354 mtx_unlock(&adist_free_list_lock); 355 } 356 357 static void 358 adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data, 359 size_t size) 360 { 361 static uint64_t seq = 1; 362 363 PJDLOG_ASSERT(size <= ADIST_BUF_SIZE); 364 365 switch (cmd) { 366 case ADIST_CMD_OPEN: 367 case ADIST_CMD_CLOSE: 368 PJDLOG_ASSERT(data != NULL && size == 0); 369 size = strlen(data) + 1; 370 break; 371 case ADIST_CMD_APPEND: 372 PJDLOG_ASSERT(data != NULL && size > 0); 373 break; 374 case ADIST_CMD_KEEPALIVE: 375 case ADIST_CMD_ERROR: 376 PJDLOG_ASSERT(data == NULL && size == 0); 377 break; 378 default: 379 PJDLOG_ABORT("Invalid command (%hhu).", cmd); 380 } 381 382 adreq->adr_cmd = cmd; 383 adreq->adr_seq = seq++; 384 adreq->adr_datasize = size; 385 /* Don't copy if data is already in out buffer. */ 386 if (data != NULL && data != adreq->adr_data) 387 bcopy(data, adreq->adr_data, size); 388 } 389 390 static bool 391 read_thread_wait(void) 392 { 393 bool newfile = false; 394 395 mtx_lock(&adist_remote_mtx); 396 if (adhost->adh_reset) { 397 reset: 398 adhost->adh_reset = false; 399 if (trail_filefd(adist_trail) != -1) 400 trail_close(adist_trail); 401 trail_reset(adist_trail); 402 while (adhost->adh_remote == NULL) 403 cv_wait(&adist_remote_cond, &adist_remote_mtx); 404 trail_start(adist_trail, adhost->adh_trail_name, 405 adhost->adh_trail_offset); 406 newfile = true; 407 } 408 mtx_unlock(&adist_remote_mtx); 409 while (trail_filefd(adist_trail) == -1) { 410 newfile = true; 411 wait_for_dir(); 412 /* 413 * We may have been disconnected and reconnected in the 414 * meantime, check if reset is set. 415 */ 416 mtx_lock(&adist_remote_mtx); 417 if (adhost->adh_reset) 418 goto reset; 419 mtx_unlock(&adist_remote_mtx); 420 if (trail_filefd(adist_trail) == -1) 421 trail_next(adist_trail); 422 } 423 if (newfile) { 424 pjdlog_debug(1, "Trail file \"%s/%s\" opened.", 425 adhost->adh_directory, 426 trail_filename(adist_trail)); 427 (void)wait_for_file_init(trail_filefd(adist_trail)); 428 } 429 return (newfile); 430 } 431 432 static void * 433 read_thread(void *arg __unused) 434 { 435 struct adreq *adreq; 436 ssize_t done; 437 bool newfile; 438 439 pjdlog_debug(1, "%s started.", __func__); 440 441 for (;;) { 442 newfile = read_thread_wait(); 443 QUEUE_TAKE(adreq, &adist_free_list, 0); 444 if (newfile) { 445 adreq_fill(adreq, ADIST_CMD_OPEN, 446 trail_filename(adist_trail), 0); 447 newfile = false; 448 goto move; 449 } 450 451 done = read(trail_filefd(adist_trail), adreq->adr_data, 452 ADIST_BUF_SIZE); 453 if (done == -1) { 454 off_t offset; 455 int error; 456 457 error = errno; 458 offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR); 459 errno = error; 460 pjdlog_errno(LOG_ERR, 461 "Error while reading \"%s/%s\" at offset %jd", 462 adhost->adh_directory, trail_filename(adist_trail), 463 offset); 464 trail_close(adist_trail); 465 adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0); 466 goto move; 467 } else if (done == 0) { 468 /* End of file. */ 469 pjdlog_debug(3, "End of \"%s/%s\".", 470 adhost->adh_directory, trail_filename(adist_trail)); 471 if (!trail_switch(adist_trail)) { 472 /* More audit records can arrive. */ 473 mtx_lock(&adist_free_list_lock); 474 TAILQ_INSERT_TAIL(&adist_free_list, adreq, 475 adr_next); 476 mtx_unlock(&adist_free_list_lock); 477 wait_for_file(); 478 continue; 479 } 480 adreq_fill(adreq, ADIST_CMD_CLOSE, 481 trail_filename(adist_trail), 0); 482 trail_close(adist_trail); 483 goto move; 484 } 485 486 adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done); 487 move: 488 pjdlog_debug(3, 489 "read thread: Moving request %p to the send queue (%hhu).", 490 adreq, adreq->adr_cmd); 491 QUEUE_INSERT(adreq, &adist_send_list); 492 } 493 /* NOTREACHED */ 494 return (NULL); 495 } 496 497 static void 498 keepalive_send(void) 499 { 500 struct adreq *adreq; 501 502 rw_rlock(&adist_remote_lock); 503 if (adhost->adh_remote == NULL) { 504 rw_unlock(&adist_remote_lock); 505 return; 506 } 507 rw_unlock(&adist_remote_lock); 508 509 mtx_lock(&adist_free_list_lock); 510 adreq = TAILQ_FIRST(&adist_free_list); 511 if (adreq != NULL) 512 TAILQ_REMOVE(&adist_free_list, adreq, adr_next); 513 mtx_unlock(&adist_free_list_lock); 514 if (adreq == NULL) 515 return; 516 517 adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0); 518 519 QUEUE_INSERT(adreq, &adist_send_list); 520 521 pjdlog_debug(3, "keepalive_send: Request sent."); 522 } 523 524 /* 525 * Thread sends request to secondary node. 526 */ 527 static void * 528 send_thread(void *arg __unused) 529 { 530 time_t lastcheck, now; 531 struct adreq *adreq; 532 533 pjdlog_debug(1, "%s started.", __func__); 534 535 lastcheck = time(NULL); 536 537 for (;;) { 538 pjdlog_debug(3, "send thread: Taking request."); 539 for (;;) { 540 QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE); 541 if (adreq != NULL) 542 break; 543 now = time(NULL); 544 if (lastcheck + ADIST_KEEPALIVE <= now) { 545 keepalive_send(); 546 lastcheck = now; 547 } 548 } 549 PJDLOG_ASSERT(adreq != NULL); 550 pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq, 551 adreq->adr_cmd); 552 /* 553 * Protect connection from disappearing. 554 */ 555 rw_rlock(&adist_remote_lock); 556 /* 557 * Move the request to the recv queue first to avoid race 558 * where the recv thread receives the reply before we move 559 * the request to the recv queue. 560 */ 561 QUEUE_INSERT(adreq, &adist_recv_list); 562 if (adhost->adh_remote == NULL || 563 proto_send(adhost->adh_remote, &adreq->adr_packet, 564 ADPKT_SIZE(adreq)) == -1) { 565 rw_unlock(&adist_remote_lock); 566 pjdlog_debug(1, 567 "send thread: (%p) Unable to send request.", adreq); 568 if (adhost->adh_remote != NULL) 569 sender_disconnect(); 570 continue; 571 } else { 572 pjdlog_debug(3, "Request %p sent successfully.", adreq); 573 adreq_log(LOG_DEBUG, 2, -1, adreq, 574 "send: (%p) Request sent: ", adreq); 575 rw_unlock(&adist_remote_lock); 576 } 577 } 578 /* NOTREACHED */ 579 return (NULL); 580 } 581 582 static void 583 adrep_decode_header(struct adrep *adrep) 584 { 585 586 /* Byte-swap only is the receiver is using different byte order. */ 587 if (adrep->adrp_byteorder != ADIST_BYTEORDER) { 588 adrep->adrp_byteorder = ADIST_BYTEORDER; 589 adrep->adrp_seq = bswap64(adrep->adrp_seq); 590 adrep->adrp_error = bswap16(adrep->adrp_error); 591 } 592 } 593 594 /* 595 * Thread receives answer from secondary node and passes it to ggate_send 596 * thread. 597 */ 598 static void * 599 recv_thread(void *arg __unused) 600 { 601 struct adrep adrep; 602 struct adreq *adreq; 603 604 pjdlog_debug(1, "%s started.", __func__); 605 606 for (;;) { 607 /* Wait until there is anything to receive. */ 608 QUEUE_WAIT(&adist_recv_list); 609 pjdlog_debug(3, "recv thread: Got something."); 610 rw_rlock(&adist_remote_lock); 611 if (adhost->adh_remote == NULL) { 612 /* 613 * Connection is dead. 614 * XXX: We shouldn't be here. 615 */ 616 rw_unlock(&adist_remote_lock); 617 continue; 618 } 619 if (proto_recv(adhost->adh_remote, &adrep, 620 sizeof(adrep)) == -1) { 621 rw_unlock(&adist_remote_lock); 622 pjdlog_errno(LOG_ERR, "Unable to receive reply"); 623 sender_disconnect(); 624 continue; 625 } 626 rw_unlock(&adist_remote_lock); 627 adrep_decode_header(&adrep); 628 /* 629 * Find the request that was just confirmed. 630 */ 631 mtx_lock(&adist_recv_list_lock); 632 TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) { 633 if (adreq->adr_seq == adrep.adrp_seq) { 634 TAILQ_REMOVE(&adist_recv_list, adreq, 635 adr_next); 636 break; 637 } 638 } 639 if (adreq == NULL) { 640 /* 641 * If we disconnected in the meantime, just continue. 642 * On disconnect sender_disconnect() clears the queue, 643 * we can use that. 644 */ 645 if (TAILQ_EMPTY(&adist_recv_list)) { 646 mtx_unlock(&adist_recv_list_lock); 647 continue; 648 } 649 mtx_unlock(&adist_recv_list_lock); 650 pjdlog_error("Found no request matching received 'seq' field (%ju).", 651 (uintmax_t)adrep.adrp_seq); 652 sender_disconnect(); 653 continue; 654 } 655 mtx_unlock(&adist_recv_list_lock); 656 adreq_log(LOG_DEBUG, 2, -1, adreq, 657 "recv thread: (%p) Request confirmed: ", adreq); 658 pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq, 659 adreq->adr_cmd); 660 if (adrep.adrp_error != 0) { 661 pjdlog_error("Receiver returned error (%s), disconnecting.", 662 adist_errstr((int)adrep.adrp_error)); 663 sender_disconnect(); 664 continue; 665 } 666 if (adreq->adr_cmd == ADIST_CMD_CLOSE) 667 trail_unlink(adist_trail, adreq->adr_data); 668 pjdlog_debug(3, "Request received successfully."); 669 QUEUE_INSERT(adreq, &adist_free_list); 670 } 671 /* NOTREACHED */ 672 return (NULL); 673 } 674 675 static void 676 guard_check_connection(void) 677 { 678 679 PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER); 680 681 rw_rlock(&adist_remote_lock); 682 if (adhost->adh_remote != NULL) { 683 rw_unlock(&adist_remote_lock); 684 pjdlog_debug(3, "remote_guard: Connection to %s is ok.", 685 adhost->adh_remoteaddr); 686 return; 687 } 688 689 /* 690 * Upgrade the lock. It doesn't have to be atomic as no other thread 691 * can change connection status from disconnected to connected. 692 */ 693 rw_unlock(&adist_remote_lock); 694 pjdlog_debug(1, "remote_guard: Reconnecting to %s.", 695 adhost->adh_remoteaddr); 696 if (sender_connect() == 0) { 697 pjdlog_info("Successfully reconnected to %s.", 698 adhost->adh_remoteaddr); 699 } else { 700 pjdlog_debug(1, "remote_guard: Reconnect to %s failed.", 701 adhost->adh_remoteaddr); 702 } 703 } 704 705 /* 706 * Thread guards remote connections and reconnects when needed, handles 707 * signals, etc. 708 */ 709 static void * 710 guard_thread(void *arg __unused) 711 { 712 struct timespec timeout; 713 time_t lastcheck, now; 714 sigset_t mask; 715 int signo; 716 717 lastcheck = time(NULL); 718 719 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 720 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 721 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 722 723 timeout.tv_sec = ADIST_KEEPALIVE; 724 timeout.tv_nsec = 0; 725 signo = -1; 726 727 for (;;) { 728 switch (signo) { 729 case SIGINT: 730 case SIGTERM: 731 sigexit_received = true; 732 pjdlog_exitx(EX_OK, 733 "Termination signal received, exiting."); 734 break; 735 default: 736 break; 737 } 738 739 pjdlog_debug(3, "remote_guard: Checking connections."); 740 now = time(NULL); 741 if (lastcheck + ADIST_KEEPALIVE <= now) { 742 guard_check_connection(); 743 lastcheck = now; 744 } 745 signo = sigtimedwait(&mask, NULL, &timeout); 746 } 747 /* NOTREACHED */ 748 return (NULL); 749 } 750 751 void 752 adist_sender(struct adist_config *config, struct adist_host *adh) 753 { 754 pthread_t td; 755 pid_t pid; 756 int error, mode, debuglevel; 757 758 /* 759 * Create communication channel for sending connection requests from 760 * child to parent. 761 */ 762 if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) { 763 pjdlog_errno(LOG_ERR, 764 "Unable to create connection sockets between child and parent"); 765 return; 766 } 767 768 pid = fork(); 769 if (pid == -1) { 770 pjdlog_errno(LOG_ERR, "Unable to fork"); 771 proto_close(adh->adh_conn); 772 adh->adh_conn = NULL; 773 return; 774 } 775 776 if (pid > 0) { 777 /* This is parent. */ 778 adh->adh_worker_pid = pid; 779 /* Declare that we are receiver. */ 780 proto_recv(adh->adh_conn, NULL, 0); 781 return; 782 } 783 784 adcfg = config; 785 adhost = adh; 786 787 mode = pjdlog_mode_get(); 788 debuglevel = pjdlog_debug_get(); 789 790 /* Declare that we are sender. */ 791 proto_send(adhost->adh_conn, NULL, 0); 792 793 descriptors_cleanup(adhost); 794 795 #ifdef TODO 796 descriptors_assert(adhost, mode); 797 #endif 798 799 pjdlog_init(mode); 800 pjdlog_debug_set(debuglevel); 801 pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name, 802 role2str(adhost->adh_role)); 803 #ifdef HAVE_SETPROCTITLE 804 setproctitle("[%s] (%s) ", adhost->adh_name, 805 role2str(adhost->adh_role)); 806 #endif 807 808 /* 809 * The sender process should be able to remove entries from its 810 * trail directory, but it should not be able to write to the 811 * trail files, only read from them. 812 */ 813 adist_trail = trail_new(adhost->adh_directory, false); 814 if (adist_trail == NULL) 815 exit(EX_OSFILE); 816 817 if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)", 818 role2str(adhost->adh_role), adhost->adh_name) != 0) { 819 exit(EX_CONFIG); 820 } 821 pjdlog_info("Privileges successfully dropped."); 822 823 /* 824 * We can ignore wait_for_dir_init() failures. It will fall back to 825 * using sleep(3). 826 */ 827 (void)wait_for_dir_init(trail_dirfd(adist_trail)); 828 829 init_environment(); 830 if (sender_connect() == 0) { 831 pjdlog_info("Successfully connected to %s.", 832 adhost->adh_remoteaddr); 833 } 834 adhost->adh_reset = true; 835 836 /* 837 * Create the guard thread first, so we can handle signals from the 838 * very begining. 839 */ 840 error = pthread_create(&td, NULL, guard_thread, NULL); 841 PJDLOG_ASSERT(error == 0); 842 error = pthread_create(&td, NULL, send_thread, NULL); 843 PJDLOG_ASSERT(error == 0); 844 error = pthread_create(&td, NULL, recv_thread, NULL); 845 PJDLOG_ASSERT(error == 0); 846 (void)read_thread(NULL); 847 } 848