1 /*- 2 * Copyright (c) 2012 The FreeBSD Foundation 3 * All rights reserved. 4 * 5 * This software was developed by Pawel Jakub Dawidek under sponsorship from 6 * the FreeBSD Foundation. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27 * SUCH DAMAGE. 28 */ 29 30 #include <config/config.h> 31 32 #include <sys/param.h> 33 #if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP) 34 #include <sys/endian.h> 35 #else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 36 #ifdef HAVE_MACHINE_ENDIAN_H 37 #include <machine/endian.h> 38 #else /* !HAVE_MACHINE_ENDIAN_H */ 39 #ifdef HAVE_ENDIAN_H 40 #include <endian.h> 41 #else /* !HAVE_ENDIAN_H */ 42 #error "No supported endian.h" 43 #endif /* !HAVE_ENDIAN_H */ 44 #endif /* !HAVE_MACHINE_ENDIAN_H */ 45 #include <compat/endian.h> 46 #endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 47 #include <sys/queue.h> 48 #include <sys/stat.h> 49 #include <sys/wait.h> 50 51 #include <stdio.h> 52 #include <stdlib.h> 53 #include <unistd.h> 54 55 #include <ctype.h> 56 #include <dirent.h> 57 #include <err.h> 58 #include <errno.h> 59 #include <fcntl.h> 60 #ifdef HAVE_LIBUTIL_H 61 #include <libutil.h> 62 #endif 63 #include <signal.h> 64 #include <string.h> 65 #include <strings.h> 66 67 #include <openssl/hmac.h> 68 69 #ifndef HAVE_SIGTIMEDWAIT 70 #include "sigtimedwait.h" 71 #endif 72 73 #include "auditdistd.h" 74 #include "pjdlog.h" 75 #include "proto.h" 76 #include "sandbox.h" 77 #include "subr.h" 78 #include "synch.h" 79 #include "trail.h" 80 81 static struct adist_config *adcfg; 82 static struct adist_host *adhost; 83 84 static pthread_rwlock_t adist_remote_lock; 85 static pthread_mutex_t adist_remote_mtx; 86 static pthread_cond_t adist_remote_cond; 87 static struct trail *adist_trail; 88 89 static TAILQ_HEAD(, adreq) adist_free_list; 90 static pthread_mutex_t adist_free_list_lock; 91 static pthread_cond_t adist_free_list_cond; 92 static TAILQ_HEAD(, adreq) adist_send_list; 93 static pthread_mutex_t adist_send_list_lock; 94 static pthread_cond_t adist_send_list_cond; 95 static TAILQ_HEAD(, adreq) adist_recv_list; 96 static pthread_mutex_t adist_recv_list_lock; 97 static pthread_cond_t adist_recv_list_cond; 98 99 static void 100 init_environment(void) 101 { 102 struct adreq *adreq; 103 unsigned int ii; 104 105 rw_init(&adist_remote_lock); 106 mtx_init(&adist_remote_mtx); 107 cv_init(&adist_remote_cond); 108 TAILQ_INIT(&adist_free_list); 109 mtx_init(&adist_free_list_lock); 110 cv_init(&adist_free_list_cond); 111 TAILQ_INIT(&adist_send_list); 112 mtx_init(&adist_send_list_lock); 113 cv_init(&adist_send_list_cond); 114 TAILQ_INIT(&adist_recv_list); 115 mtx_init(&adist_recv_list_lock); 116 cv_init(&adist_recv_list_cond); 117 118 for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) { 119 adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE); 120 if (adreq == NULL) { 121 pjdlog_exitx(EX_TEMPFAIL, 122 "Unable to allocate %zu bytes of memory for adreq object.", 123 sizeof(*adreq) + ADIST_BUF_SIZE); 124 } 125 adreq->adr_byteorder = ADIST_BYTEORDER; 126 adreq->adr_cmd = ADIST_CMD_UNDEFINED; 127 adreq->adr_seq = 0; 128 adreq->adr_datasize = 0; 129 TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next); 130 } 131 } 132 133 static int 134 sender_connect(void) 135 { 136 unsigned char rnd[32], hash[32], resp[32]; 137 struct proto_conn *conn; 138 char welcome[8]; 139 int16_t val; 140 141 val = 1; 142 if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) { 143 pjdlog_exit(EX_TEMPFAIL, 144 "Unable to send connection request to parent"); 145 } 146 if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) { 147 pjdlog_exit(EX_TEMPFAIL, 148 "Unable to receive reply to connection request from parent"); 149 } 150 if (val != 0) { 151 errno = val; 152 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 153 adhost->adh_remoteaddr); 154 return (-1); 155 } 156 if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) { 157 pjdlog_exit(EX_TEMPFAIL, 158 "Unable to receive connection from parent"); 159 } 160 if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) { 161 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 162 adhost->adh_remoteaddr); 163 proto_close(conn); 164 return (-1); 165 } 166 pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr); 167 /* Error in setting timeout is not critical, but why should it fail? */ 168 if (proto_timeout(conn, adcfg->adc_timeout) < 0) 169 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 170 else 171 pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout); 172 173 /* Exchange welcome message, which includes version number. */ 174 (void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION); 175 if (proto_send(conn, welcome, sizeof(welcome)) < 0) { 176 pjdlog_errno(LOG_WARNING, 177 "Unable to send welcome message to %s", 178 adhost->adh_remoteaddr); 179 proto_close(conn); 180 return (-1); 181 } 182 pjdlog_debug(1, "Welcome message sent (%s).", welcome); 183 bzero(welcome, sizeof(welcome)); 184 if (proto_recv(conn, welcome, sizeof(welcome)) < 0) { 185 pjdlog_errno(LOG_WARNING, 186 "Unable to receive welcome message from %s", 187 adhost->adh_remoteaddr); 188 proto_close(conn); 189 return (-1); 190 } 191 if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) || 192 !isdigit(welcome[6]) || welcome[7] != '\0') { 193 pjdlog_warning("Invalid welcome message from %s.", 194 adhost->adh_remoteaddr); 195 proto_close(conn); 196 return (-1); 197 } 198 pjdlog_debug(1, "Welcome message received (%s).", welcome); 199 /* 200 * Receiver can only reply with version number lower or equal to 201 * the one we sent. 202 */ 203 adhost->adh_version = atoi(welcome + 5); 204 if (adhost->adh_version > ADIST_VERSION) { 205 pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).", 206 adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION); 207 proto_close(conn); 208 return (-1); 209 } 210 211 pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version, 212 adhost->adh_remoteaddr); 213 214 if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) { 215 pjdlog_errno(LOG_WARNING, "Unable to send name to %s", 216 adhost->adh_remoteaddr); 217 proto_close(conn); 218 return (-1); 219 } 220 pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name); 221 222 if (proto_recv(conn, rnd, sizeof(rnd)) == -1) { 223 pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s", 224 adhost->adh_remoteaddr); 225 proto_close(conn); 226 return (-1); 227 } 228 pjdlog_debug(1, "Challenge received."); 229 230 if (HMAC(EVP_sha256(), adhost->adh_password, 231 (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 232 NULL) == NULL) { 233 pjdlog_warning("Unable to generate response."); 234 proto_close(conn); 235 return (-1); 236 } 237 pjdlog_debug(1, "Response generated."); 238 239 if (proto_send(conn, hash, sizeof(hash)) == -1) { 240 pjdlog_errno(LOG_WARNING, "Unable to send response to %s", 241 adhost->adh_remoteaddr); 242 proto_close(conn); 243 return (-1); 244 } 245 pjdlog_debug(1, "Response sent."); 246 247 if (adist_random(rnd, sizeof(rnd)) == -1) { 248 pjdlog_warning("Unable to generate challenge."); 249 proto_close(conn); 250 return (-1); 251 } 252 pjdlog_debug(1, "Challenge generated."); 253 254 if (proto_send(conn, rnd, sizeof(rnd)) == -1) { 255 pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s", 256 adhost->adh_remoteaddr); 257 proto_close(conn); 258 return (-1); 259 } 260 pjdlog_debug(1, "Challenge sent."); 261 262 if (proto_recv(conn, resp, sizeof(resp)) == -1) { 263 pjdlog_errno(LOG_WARNING, "Unable to receive response from %s", 264 adhost->adh_remoteaddr); 265 proto_close(conn); 266 return (-1); 267 } 268 pjdlog_debug(1, "Response received."); 269 270 if (HMAC(EVP_sha256(), adhost->adh_password, 271 (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 272 NULL) == NULL) { 273 pjdlog_warning("Unable to generate hash."); 274 proto_close(conn); 275 return (-1); 276 } 277 pjdlog_debug(1, "Hash generated."); 278 279 if (memcmp(resp, hash, sizeof(hash)) != 0) { 280 pjdlog_warning("Invalid response from %s (wrong password?).", 281 adhost->adh_remoteaddr); 282 proto_close(conn); 283 return (-1); 284 } 285 pjdlog_info("Receiver authenticated."); 286 287 if (proto_recv(conn, &adhost->adh_trail_offset, 288 sizeof(adhost->adh_trail_offset)) == -1) { 289 pjdlog_errno(LOG_WARNING, 290 "Unable to receive size of the most recent trail file from %s", 291 adhost->adh_remoteaddr); 292 proto_close(conn); 293 return (-1); 294 } 295 adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset); 296 if (proto_recv(conn, &adhost->adh_trail_name, 297 sizeof(adhost->adh_trail_name)) == -1) { 298 pjdlog_errno(LOG_WARNING, 299 "Unable to receive name of the most recent trail file from %s", 300 adhost->adh_remoteaddr); 301 proto_close(conn); 302 return (-1); 303 } 304 pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.", 305 adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset); 306 307 rw_wlock(&adist_remote_lock); 308 mtx_lock(&adist_remote_mtx); 309 PJDLOG_ASSERT(adhost->adh_remote == NULL); 310 PJDLOG_ASSERT(conn != NULL); 311 adhost->adh_remote = conn; 312 mtx_unlock(&adist_remote_mtx); 313 rw_unlock(&adist_remote_lock); 314 cv_signal(&adist_remote_cond); 315 316 return (0); 317 } 318 319 static void 320 sender_disconnect(void) 321 { 322 323 rw_wlock(&adist_remote_lock); 324 /* 325 * Check for a race between dropping rlock and acquiring wlock - 326 * another thread can close connection in-between. 327 */ 328 if (adhost->adh_remote == NULL) { 329 rw_unlock(&adist_remote_lock); 330 return; 331 } 332 pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr); 333 proto_close(adhost->adh_remote); 334 mtx_lock(&adist_remote_mtx); 335 adhost->adh_remote = NULL; 336 adhost->adh_reset = true; 337 adhost->adh_trail_name[0] = '\0'; 338 adhost->adh_trail_offset = 0; 339 mtx_unlock(&adist_remote_mtx); 340 rw_unlock(&adist_remote_lock); 341 342 pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr); 343 344 /* Move all in-flight requests back onto free list. */ 345 mtx_lock(&adist_free_list_lock); 346 mtx_lock(&adist_send_list_lock); 347 TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next); 348 mtx_unlock(&adist_send_list_lock); 349 mtx_lock(&adist_recv_list_lock); 350 TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next); 351 mtx_unlock(&adist_recv_list_lock); 352 mtx_unlock(&adist_free_list_lock); 353 } 354 355 static void 356 adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data, 357 size_t size) 358 { 359 static uint64_t seq = 1; 360 361 PJDLOG_ASSERT(size <= ADIST_BUF_SIZE); 362 363 switch (cmd) { 364 case ADIST_CMD_OPEN: 365 case ADIST_CMD_CLOSE: 366 PJDLOG_ASSERT(data != NULL && size == 0); 367 size = strlen(data) + 1; 368 break; 369 case ADIST_CMD_APPEND: 370 PJDLOG_ASSERT(data != NULL && size > 0); 371 break; 372 case ADIST_CMD_KEEPALIVE: 373 case ADIST_CMD_ERROR: 374 PJDLOG_ASSERT(data == NULL && size == 0); 375 break; 376 default: 377 PJDLOG_ABORT("Invalid command (%hhu).", cmd); 378 } 379 380 adreq->adr_cmd = cmd; 381 adreq->adr_seq = seq++; 382 adreq->adr_datasize = size; 383 /* Don't copy if data is already in out buffer. */ 384 if (data != NULL && data != adreq->adr_data) 385 bcopy(data, adreq->adr_data, size); 386 } 387 388 static bool 389 read_thread_wait(void) 390 { 391 bool newfile = false; 392 393 mtx_lock(&adist_remote_mtx); 394 if (adhost->adh_reset) { 395 reset: 396 adhost->adh_reset = false; 397 if (trail_filefd(adist_trail) != -1) 398 trail_close(adist_trail); 399 trail_reset(adist_trail); 400 while (adhost->adh_remote == NULL) 401 cv_wait(&adist_remote_cond, &adist_remote_mtx); 402 trail_start(adist_trail, adhost->adh_trail_name, 403 adhost->adh_trail_offset); 404 newfile = true; 405 } 406 mtx_unlock(&adist_remote_mtx); 407 while (trail_filefd(adist_trail) == -1) { 408 newfile = true; 409 wait_for_dir(); 410 /* 411 * We may have been disconnected and reconnected in the 412 * meantime, check if reset is set. 413 */ 414 mtx_lock(&adist_remote_mtx); 415 if (adhost->adh_reset) 416 goto reset; 417 mtx_unlock(&adist_remote_mtx); 418 if (trail_filefd(adist_trail) == -1) 419 trail_next(adist_trail); 420 } 421 if (newfile) { 422 pjdlog_debug(1, "Trail file \"%s/%s\" opened.", 423 adhost->adh_directory, 424 trail_filename(adist_trail)); 425 (void)wait_for_file_init(trail_filefd(adist_trail)); 426 } 427 return (newfile); 428 } 429 430 static void * 431 read_thread(void *arg __unused) 432 { 433 struct adreq *adreq; 434 ssize_t done; 435 bool newfile; 436 437 pjdlog_debug(1, "%s started.", __func__); 438 439 for (;;) { 440 newfile = read_thread_wait(); 441 QUEUE_TAKE(adreq, &adist_free_list, 0); 442 if (newfile) { 443 adreq_fill(adreq, ADIST_CMD_OPEN, 444 trail_filename(adist_trail), 0); 445 newfile = false; 446 goto move; 447 } 448 449 done = read(trail_filefd(adist_trail), adreq->adr_data, 450 ADIST_BUF_SIZE); 451 if (done == -1) { 452 off_t offset; 453 int error; 454 455 error = errno; 456 offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR); 457 errno = error; 458 pjdlog_errno(LOG_ERR, 459 "Error while reading \"%s/%s\" at offset %jd", 460 adhost->adh_directory, trail_filename(adist_trail), 461 offset); 462 trail_close(adist_trail); 463 adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0); 464 goto move; 465 } else if (done == 0) { 466 /* End of file. */ 467 pjdlog_debug(3, "End of \"%s/%s\".", 468 adhost->adh_directory, trail_filename(adist_trail)); 469 if (!trail_switch(adist_trail)) { 470 /* More audit records can arrive. */ 471 mtx_lock(&adist_free_list_lock); 472 TAILQ_INSERT_TAIL(&adist_free_list, adreq, 473 adr_next); 474 mtx_unlock(&adist_free_list_lock); 475 wait_for_file(); 476 continue; 477 } 478 adreq_fill(adreq, ADIST_CMD_CLOSE, 479 trail_filename(adist_trail), 0); 480 trail_close(adist_trail); 481 goto move; 482 } 483 484 adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done); 485 move: 486 pjdlog_debug(3, 487 "read thread: Moving request %p to the send queue (%hhu).", 488 adreq, adreq->adr_cmd); 489 QUEUE_INSERT(adreq, &adist_send_list); 490 } 491 /* NOTREACHED */ 492 return (NULL); 493 } 494 495 static void 496 keepalive_send(void) 497 { 498 struct adreq *adreq; 499 500 rw_rlock(&adist_remote_lock); 501 if (adhost->adh_remote == NULL) { 502 rw_unlock(&adist_remote_lock); 503 return; 504 } 505 rw_unlock(&adist_remote_lock); 506 507 mtx_lock(&adist_free_list_lock); 508 adreq = TAILQ_FIRST(&adist_free_list); 509 if (adreq != NULL) 510 TAILQ_REMOVE(&adist_free_list, adreq, adr_next); 511 mtx_unlock(&adist_free_list_lock); 512 if (adreq == NULL) 513 return; 514 515 adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0); 516 517 QUEUE_INSERT(adreq, &adist_send_list); 518 519 pjdlog_debug(3, "keepalive_send: Request sent."); 520 } 521 522 /* 523 * Thread sends request to secondary node. 524 */ 525 static void * 526 send_thread(void *arg __unused) 527 { 528 time_t lastcheck, now; 529 struct adreq *adreq; 530 531 pjdlog_debug(1, "%s started.", __func__); 532 533 lastcheck = time(NULL); 534 535 for (;;) { 536 pjdlog_debug(3, "send thread: Taking request."); 537 for (;;) { 538 QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE); 539 if (adreq != NULL) 540 break; 541 now = time(NULL); 542 if (lastcheck + ADIST_KEEPALIVE <= now) { 543 keepalive_send(); 544 lastcheck = now; 545 } 546 } 547 PJDLOG_ASSERT(adreq != NULL); 548 pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq, 549 adreq->adr_cmd); 550 /* 551 * Protect connection from disappearing. 552 */ 553 rw_rlock(&adist_remote_lock); 554 /* 555 * Move the request to the recv queue first to avoid race 556 * where the recv thread receives the reply before we move 557 * the request to the recv queue. 558 */ 559 QUEUE_INSERT(adreq, &adist_recv_list); 560 if (adhost->adh_remote == NULL || 561 proto_send(adhost->adh_remote, &adreq->adr_packet, 562 ADPKT_SIZE(adreq)) == -1) { 563 rw_unlock(&adist_remote_lock); 564 pjdlog_debug(1, 565 "send thread: (%p) Unable to send request.", adreq); 566 if (adhost->adh_remote != NULL) 567 sender_disconnect(); 568 continue; 569 } else { 570 pjdlog_debug(3, "Request %p sent successfully.", adreq); 571 adreq_log(LOG_DEBUG, 2, -1, adreq, 572 "send: (%p) Request sent: ", adreq); 573 rw_unlock(&adist_remote_lock); 574 } 575 } 576 /* NOTREACHED */ 577 return (NULL); 578 } 579 580 static void 581 adrep_decode_header(struct adrep *adrep) 582 { 583 584 /* Byte-swap only is the receiver is using different byte order. */ 585 if (adrep->adrp_byteorder != ADIST_BYTEORDER) { 586 adrep->adrp_byteorder = ADIST_BYTEORDER; 587 adrep->adrp_seq = bswap64(adrep->adrp_seq); 588 adrep->adrp_error = bswap16(adrep->adrp_error); 589 } 590 } 591 592 /* 593 * Thread receives answer from secondary node and passes it to ggate_send 594 * thread. 595 */ 596 static void * 597 recv_thread(void *arg __unused) 598 { 599 struct adrep adrep; 600 struct adreq *adreq; 601 602 pjdlog_debug(1, "%s started.", __func__); 603 604 for (;;) { 605 /* Wait until there is anything to receive. */ 606 QUEUE_WAIT(&adist_recv_list); 607 pjdlog_debug(3, "recv thread: Got something."); 608 rw_rlock(&adist_remote_lock); 609 if (adhost->adh_remote == NULL) { 610 /* 611 * Connection is dead. 612 * XXX: We shouldn't be here. 613 */ 614 rw_unlock(&adist_remote_lock); 615 continue; 616 } 617 if (proto_recv(adhost->adh_remote, &adrep, 618 sizeof(adrep)) == -1) { 619 rw_unlock(&adist_remote_lock); 620 pjdlog_errno(LOG_ERR, "Unable to receive reply"); 621 sender_disconnect(); 622 continue; 623 } 624 rw_unlock(&adist_remote_lock); 625 adrep_decode_header(&adrep); 626 /* 627 * Find the request that was just confirmed. 628 */ 629 mtx_lock(&adist_recv_list_lock); 630 TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) { 631 if (adreq->adr_seq == adrep.adrp_seq) { 632 TAILQ_REMOVE(&adist_recv_list, adreq, 633 adr_next); 634 break; 635 } 636 } 637 if (adreq == NULL) { 638 /* 639 * If we disconnected in the meantime, just continue. 640 * On disconnect sender_disconnect() clears the queue, 641 * we can use that. 642 */ 643 if (TAILQ_EMPTY(&adist_recv_list)) { 644 mtx_unlock(&adist_recv_list_lock); 645 continue; 646 } 647 mtx_unlock(&adist_recv_list_lock); 648 pjdlog_error("Found no request matching received 'seq' field (%ju).", 649 (uintmax_t)adrep.adrp_seq); 650 sender_disconnect(); 651 continue; 652 } 653 mtx_unlock(&adist_recv_list_lock); 654 adreq_log(LOG_DEBUG, 2, -1, adreq, 655 "recv thread: (%p) Request confirmed: ", adreq); 656 pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq, 657 adreq->adr_cmd); 658 if (adrep.adrp_error != 0) { 659 pjdlog_error("Receiver returned error (%s), disconnecting.", 660 adist_errstr((int)adrep.adrp_error)); 661 sender_disconnect(); 662 continue; 663 } 664 if (adreq->adr_cmd == ADIST_CMD_CLOSE) 665 trail_unlink(adist_trail, adreq->adr_data); 666 pjdlog_debug(3, "Request received successfully."); 667 QUEUE_INSERT(adreq, &adist_free_list); 668 } 669 /* NOTREACHED */ 670 return (NULL); 671 } 672 673 static void 674 guard_check_connection(void) 675 { 676 677 PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER); 678 679 rw_rlock(&adist_remote_lock); 680 if (adhost->adh_remote != NULL) { 681 rw_unlock(&adist_remote_lock); 682 pjdlog_debug(3, "remote_guard: Connection to %s is ok.", 683 adhost->adh_remoteaddr); 684 return; 685 } 686 687 /* 688 * Upgrade the lock. It doesn't have to be atomic as no other thread 689 * can change connection status from disconnected to connected. 690 */ 691 rw_unlock(&adist_remote_lock); 692 pjdlog_debug(1, "remote_guard: Reconnecting to %s.", 693 adhost->adh_remoteaddr); 694 if (sender_connect() == 0) { 695 pjdlog_info("Successfully reconnected to %s.", 696 adhost->adh_remoteaddr); 697 } else { 698 pjdlog_debug(1, "remote_guard: Reconnect to %s failed.", 699 adhost->adh_remoteaddr); 700 } 701 } 702 703 /* 704 * Thread guards remote connections and reconnects when needed, handles 705 * signals, etc. 706 */ 707 static void * 708 guard_thread(void *arg __unused) 709 { 710 struct timespec timeout; 711 time_t lastcheck, now; 712 sigset_t mask; 713 int signo; 714 715 lastcheck = time(NULL); 716 717 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 718 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 719 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 720 721 timeout.tv_sec = ADIST_KEEPALIVE; 722 timeout.tv_nsec = 0; 723 signo = -1; 724 725 for (;;) { 726 switch (signo) { 727 case SIGINT: 728 case SIGTERM: 729 sigexit_received = true; 730 pjdlog_exitx(EX_OK, 731 "Termination signal received, exiting."); 732 break; 733 default: 734 break; 735 } 736 737 pjdlog_debug(3, "remote_guard: Checking connections."); 738 now = time(NULL); 739 if (lastcheck + ADIST_KEEPALIVE <= now) { 740 guard_check_connection(); 741 lastcheck = now; 742 } 743 signo = sigtimedwait(&mask, NULL, &timeout); 744 } 745 /* NOTREACHED */ 746 return (NULL); 747 } 748 749 void 750 adist_sender(struct adist_config *config, struct adist_host *adh) 751 { 752 pthread_t td; 753 pid_t pid; 754 int error, mode, debuglevel; 755 756 /* 757 * Create communication channel for sending connection requests from 758 * child to parent. 759 */ 760 if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) { 761 pjdlog_errno(LOG_ERR, 762 "Unable to create connection sockets between child and parent"); 763 return; 764 } 765 766 pid = fork(); 767 if (pid == -1) { 768 pjdlog_errno(LOG_ERR, "Unable to fork"); 769 proto_close(adh->adh_conn); 770 adh->adh_conn = NULL; 771 return; 772 } 773 774 if (pid > 0) { 775 /* This is parent. */ 776 adh->adh_worker_pid = pid; 777 /* Declare that we are receiver. */ 778 proto_recv(adh->adh_conn, NULL, 0); 779 return; 780 } 781 782 adcfg = config; 783 adhost = adh; 784 785 mode = pjdlog_mode_get(); 786 debuglevel = pjdlog_debug_get(); 787 788 /* Declare that we are sender. */ 789 proto_send(adhost->adh_conn, NULL, 0); 790 791 descriptors_cleanup(adhost); 792 793 #ifdef TODO 794 descriptors_assert(adhost, mode); 795 #endif 796 797 pjdlog_init(mode); 798 pjdlog_debug_set(debuglevel); 799 pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name, 800 role2str(adhost->adh_role)); 801 #ifdef HAVE_SETPROCTITLE 802 setproctitle("[%s] (%s) ", adhost->adh_name, 803 role2str(adhost->adh_role)); 804 #endif 805 806 /* 807 * The sender process should be able to remove entries from its 808 * trail directory, but it should not be able to write to the 809 * trail files, only read from them. 810 */ 811 adist_trail = trail_new(adhost->adh_directory, false); 812 if (adist_trail == NULL) 813 exit(EX_OSFILE); 814 815 if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)", 816 role2str(adhost->adh_role), adhost->adh_name) != 0) { 817 exit(EX_CONFIG); 818 } 819 pjdlog_info("Privileges successfully dropped."); 820 821 /* 822 * We can ignore wait_for_dir_init() failures. It will fall back to 823 * using sleep(3). 824 */ 825 (void)wait_for_dir_init(trail_dirfd(adist_trail)); 826 827 init_environment(); 828 if (sender_connect() == 0) { 829 pjdlog_info("Successfully connected to %s.", 830 adhost->adh_remoteaddr); 831 } 832 adhost->adh_reset = true; 833 834 /* 835 * Create the guard thread first, so we can handle signals from the 836 * very begining. 837 */ 838 error = pthread_create(&td, NULL, guard_thread, NULL); 839 PJDLOG_ASSERT(error == 0); 840 error = pthread_create(&td, NULL, send_thread, NULL); 841 PJDLOG_ASSERT(error == 0); 842 error = pthread_create(&td, NULL, recv_thread, NULL); 843 PJDLOG_ASSERT(error == 0); 844 (void)read_thread(NULL); 845 } 846