1 /*- 2 * Copyright (c) 2012 The FreeBSD Foundation 3 * All rights reserved. 4 * 5 * This software was developed by Pawel Jakub Dawidek under sponsorship from 6 * the FreeBSD Foundation. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27 * SUCH DAMAGE. 28 * 29 * $P4: //depot/projects/trustedbsd/openbsm/bin/auditdistd/sender.c#3 $ 30 */ 31 32 #include <config/config.h> 33 34 #include <sys/param.h> 35 #if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP) 36 #include <sys/endian.h> 37 #else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 38 #ifdef HAVE_MACHINE_ENDIAN_H 39 #include <machine/endian.h> 40 #else /* !HAVE_MACHINE_ENDIAN_H */ 41 #ifdef HAVE_ENDIAN_H 42 #include <endian.h> 43 #else /* !HAVE_ENDIAN_H */ 44 #error "No supported endian.h" 45 #endif /* !HAVE_ENDIAN_H */ 46 #endif /* !HAVE_MACHINE_ENDIAN_H */ 47 #include <compat/endian.h> 48 #endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 49 #include <sys/queue.h> 50 #include <sys/stat.h> 51 #include <sys/wait.h> 52 53 #include <stdio.h> 54 #include <stdlib.h> 55 #include <unistd.h> 56 57 #include <ctype.h> 58 #include <dirent.h> 59 #include <err.h> 60 #include <errno.h> 61 #include <fcntl.h> 62 #ifdef HAVE_LIBUTIL_H 63 #include <libutil.h> 64 #endif 65 #include <signal.h> 66 #include <string.h> 67 #include <strings.h> 68 69 #include <openssl/hmac.h> 70 71 #ifndef HAVE_SIGTIMEDWAIT 72 #include "sigtimedwait.h" 73 #endif 74 75 #include "auditdistd.h" 76 #include "pjdlog.h" 77 #include "proto.h" 78 #include "sandbox.h" 79 #include "subr.h" 80 #include "synch.h" 81 #include "trail.h" 82 83 static struct adist_config *adcfg; 84 static struct adist_host *adhost; 85 86 static pthread_rwlock_t adist_remote_lock; 87 static pthread_mutex_t adist_remote_mtx; 88 static pthread_cond_t adist_remote_cond; 89 static struct trail *adist_trail; 90 91 static TAILQ_HEAD(, adreq) adist_free_list; 92 static pthread_mutex_t adist_free_list_lock; 93 static pthread_cond_t adist_free_list_cond; 94 static TAILQ_HEAD(, adreq) adist_send_list; 95 static pthread_mutex_t adist_send_list_lock; 96 static pthread_cond_t adist_send_list_cond; 97 static TAILQ_HEAD(, adreq) adist_recv_list; 98 static pthread_mutex_t adist_recv_list_lock; 99 static pthread_cond_t adist_recv_list_cond; 100 101 static void 102 init_environment(void) 103 { 104 struct adreq *adreq; 105 unsigned int ii; 106 107 rw_init(&adist_remote_lock); 108 mtx_init(&adist_remote_mtx); 109 cv_init(&adist_remote_cond); 110 TAILQ_INIT(&adist_free_list); 111 mtx_init(&adist_free_list_lock); 112 cv_init(&adist_free_list_cond); 113 TAILQ_INIT(&adist_send_list); 114 mtx_init(&adist_send_list_lock); 115 cv_init(&adist_send_list_cond); 116 TAILQ_INIT(&adist_recv_list); 117 mtx_init(&adist_recv_list_lock); 118 cv_init(&adist_recv_list_cond); 119 120 for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) { 121 adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE); 122 if (adreq == NULL) { 123 pjdlog_exitx(EX_TEMPFAIL, 124 "Unable to allocate %zu bytes of memory for adreq object.", 125 sizeof(*adreq) + ADIST_BUF_SIZE); 126 } 127 adreq->adr_byteorder = ADIST_BYTEORDER; 128 adreq->adr_cmd = ADIST_CMD_UNDEFINED; 129 adreq->adr_seq = 0; 130 adreq->adr_datasize = 0; 131 TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next); 132 } 133 } 134 135 static int 136 sender_connect(void) 137 { 138 unsigned char rnd[32], hash[32], resp[32]; 139 struct proto_conn *conn; 140 char welcome[8]; 141 int16_t val; 142 143 val = 1; 144 if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) { 145 pjdlog_exit(EX_TEMPFAIL, 146 "Unable to send connection request to parent"); 147 } 148 if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) { 149 pjdlog_exit(EX_TEMPFAIL, 150 "Unable to receive reply to connection request from parent"); 151 } 152 if (val != 0) { 153 errno = val; 154 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 155 adhost->adh_remoteaddr); 156 return (-1); 157 } 158 if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) { 159 pjdlog_exit(EX_TEMPFAIL, 160 "Unable to receive connection from parent"); 161 } 162 if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) { 163 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 164 adhost->adh_remoteaddr); 165 proto_close(conn); 166 return (-1); 167 } 168 pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr); 169 /* Error in setting timeout is not critical, but why should it fail? */ 170 if (proto_timeout(conn, adcfg->adc_timeout) < 0) 171 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 172 else 173 pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout); 174 175 /* Exchange welcome message, which includes version number. */ 176 (void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION); 177 if (proto_send(conn, welcome, sizeof(welcome)) < 0) { 178 pjdlog_errno(LOG_WARNING, 179 "Unable to send welcome message to %s", 180 adhost->adh_remoteaddr); 181 proto_close(conn); 182 return (-1); 183 } 184 pjdlog_debug(1, "Welcome message sent (%s).", welcome); 185 bzero(welcome, sizeof(welcome)); 186 if (proto_recv(conn, welcome, sizeof(welcome)) < 0) { 187 pjdlog_errno(LOG_WARNING, 188 "Unable to receive welcome message from %s", 189 adhost->adh_remoteaddr); 190 proto_close(conn); 191 return (-1); 192 } 193 if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) || 194 !isdigit(welcome[6]) || welcome[7] != '\0') { 195 pjdlog_warning("Invalid welcome message from %s.", 196 adhost->adh_remoteaddr); 197 proto_close(conn); 198 return (-1); 199 } 200 pjdlog_debug(1, "Welcome message received (%s).", welcome); 201 /* 202 * Receiver can only reply with version number lower or equal to 203 * the one we sent. 204 */ 205 adhost->adh_version = atoi(welcome + 5); 206 if (adhost->adh_version > ADIST_VERSION) { 207 pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).", 208 adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION); 209 proto_close(conn); 210 return (-1); 211 } 212 213 pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version, 214 adhost->adh_remoteaddr); 215 216 if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) { 217 pjdlog_errno(LOG_WARNING, "Unable to send name to %s", 218 adhost->adh_remoteaddr); 219 proto_close(conn); 220 return (-1); 221 } 222 pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name); 223 224 if (proto_recv(conn, rnd, sizeof(rnd)) == -1) { 225 pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s", 226 adhost->adh_remoteaddr); 227 proto_close(conn); 228 return (-1); 229 } 230 pjdlog_debug(1, "Challenge received."); 231 232 if (HMAC(EVP_sha256(), adhost->adh_password, 233 (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 234 NULL) == NULL) { 235 pjdlog_warning("Unable to generate response."); 236 proto_close(conn); 237 return (-1); 238 } 239 pjdlog_debug(1, "Response generated."); 240 241 if (proto_send(conn, hash, sizeof(hash)) == -1) { 242 pjdlog_errno(LOG_WARNING, "Unable to send response to %s", 243 adhost->adh_remoteaddr); 244 proto_close(conn); 245 return (-1); 246 } 247 pjdlog_debug(1, "Response sent."); 248 249 if (adist_random(rnd, sizeof(rnd)) == -1) { 250 pjdlog_warning("Unable to generate challenge."); 251 proto_close(conn); 252 return (-1); 253 } 254 pjdlog_debug(1, "Challenge generated."); 255 256 if (proto_send(conn, rnd, sizeof(rnd)) == -1) { 257 pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s", 258 adhost->adh_remoteaddr); 259 proto_close(conn); 260 return (-1); 261 } 262 pjdlog_debug(1, "Challenge sent."); 263 264 if (proto_recv(conn, resp, sizeof(resp)) == -1) { 265 pjdlog_errno(LOG_WARNING, "Unable to receive response from %s", 266 adhost->adh_remoteaddr); 267 proto_close(conn); 268 return (-1); 269 } 270 pjdlog_debug(1, "Response received."); 271 272 if (HMAC(EVP_sha256(), adhost->adh_password, 273 (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 274 NULL) == NULL) { 275 pjdlog_warning("Unable to generate hash."); 276 proto_close(conn); 277 return (-1); 278 } 279 pjdlog_debug(1, "Hash generated."); 280 281 if (memcmp(resp, hash, sizeof(hash)) != 0) { 282 pjdlog_warning("Invalid response from %s (wrong password?).", 283 adhost->adh_remoteaddr); 284 proto_close(conn); 285 return (-1); 286 } 287 pjdlog_info("Receiver authenticated."); 288 289 if (proto_recv(conn, &adhost->adh_trail_offset, 290 sizeof(adhost->adh_trail_offset)) == -1) { 291 pjdlog_errno(LOG_WARNING, 292 "Unable to receive size of the most recent trail file from %s", 293 adhost->adh_remoteaddr); 294 proto_close(conn); 295 return (-1); 296 } 297 adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset); 298 if (proto_recv(conn, &adhost->adh_trail_name, 299 sizeof(adhost->adh_trail_name)) == -1) { 300 pjdlog_errno(LOG_WARNING, 301 "Unable to receive name of the most recent trail file from %s", 302 adhost->adh_remoteaddr); 303 proto_close(conn); 304 return (-1); 305 } 306 pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.", 307 adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset); 308 309 rw_wlock(&adist_remote_lock); 310 mtx_lock(&adist_remote_mtx); 311 PJDLOG_ASSERT(adhost->adh_remote == NULL); 312 PJDLOG_ASSERT(conn != NULL); 313 adhost->adh_remote = conn; 314 mtx_unlock(&adist_remote_mtx); 315 rw_unlock(&adist_remote_lock); 316 cv_signal(&adist_remote_cond); 317 318 return (0); 319 } 320 321 static void 322 sender_disconnect(void) 323 { 324 325 rw_wlock(&adist_remote_lock); 326 /* 327 * Check for a race between dropping rlock and acquiring wlock - 328 * another thread can close connection in-between. 329 */ 330 if (adhost->adh_remote == NULL) { 331 rw_unlock(&adist_remote_lock); 332 return; 333 } 334 pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr); 335 proto_close(adhost->adh_remote); 336 mtx_lock(&adist_remote_mtx); 337 adhost->adh_remote = NULL; 338 adhost->adh_reset = true; 339 adhost->adh_trail_name[0] = '\0'; 340 adhost->adh_trail_offset = 0; 341 mtx_unlock(&adist_remote_mtx); 342 rw_unlock(&adist_remote_lock); 343 344 pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr); 345 346 /* Move all in-flight requests back onto free list. */ 347 mtx_lock(&adist_free_list_lock); 348 mtx_lock(&adist_send_list_lock); 349 TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next); 350 mtx_unlock(&adist_send_list_lock); 351 mtx_lock(&adist_recv_list_lock); 352 TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next); 353 mtx_unlock(&adist_recv_list_lock); 354 mtx_unlock(&adist_free_list_lock); 355 } 356 357 static void 358 adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data, 359 size_t size) 360 { 361 static uint64_t seq = 1; 362 363 PJDLOG_ASSERT(size <= ADIST_BUF_SIZE); 364 365 switch (cmd) { 366 case ADIST_CMD_OPEN: 367 case ADIST_CMD_CLOSE: 368 PJDLOG_ASSERT(data != NULL && size == 0); 369 size = strlen(data) + 1; 370 break; 371 case ADIST_CMD_APPEND: 372 PJDLOG_ASSERT(data != NULL && size > 0); 373 break; 374 case ADIST_CMD_KEEPALIVE: 375 case ADIST_CMD_ERROR: 376 PJDLOG_ASSERT(data == NULL && size == 0); 377 break; 378 default: 379 PJDLOG_ABORT("Invalid command (%hhu).", cmd); 380 } 381 382 adreq->adr_cmd = cmd; 383 adreq->adr_seq = seq++; 384 adreq->adr_datasize = size; 385 /* Don't copy if data is already in out buffer. */ 386 if (data != NULL && data != adreq->adr_data) 387 bcopy(data, adreq->adr_data, size); 388 } 389 390 static bool 391 read_thread_wait(void) 392 { 393 bool newfile = false; 394 395 mtx_lock(&adist_remote_mtx); 396 if (adhost->adh_reset) { 397 adhost->adh_reset = false; 398 if (trail_filefd(adist_trail) != -1) 399 trail_close(adist_trail); 400 trail_reset(adist_trail); 401 while (adhost->adh_remote == NULL) 402 cv_wait(&adist_remote_cond, &adist_remote_mtx); 403 trail_start(adist_trail, adhost->adh_trail_name, 404 adhost->adh_trail_offset); 405 newfile = true; 406 } 407 mtx_unlock(&adist_remote_mtx); 408 while (trail_filefd(adist_trail) == -1) { 409 newfile = true; 410 wait_for_dir(); 411 if (trail_filefd(adist_trail) == -1) 412 trail_next(adist_trail); 413 } 414 if (newfile) { 415 pjdlog_debug(1, "Trail file \"%s/%s\" opened.", 416 adhost->adh_directory, 417 trail_filename(adist_trail)); 418 (void)wait_for_file_init(trail_filefd(adist_trail)); 419 } 420 return (newfile); 421 } 422 423 static void * 424 read_thread(void *arg __unused) 425 { 426 struct adreq *adreq; 427 ssize_t done; 428 bool newfile; 429 430 pjdlog_debug(1, "%s started.", __func__); 431 432 for (;;) { 433 newfile = read_thread_wait(); 434 QUEUE_TAKE(adreq, &adist_free_list, 0); 435 if (newfile) { 436 adreq_fill(adreq, ADIST_CMD_OPEN, 437 trail_filename(adist_trail), 0); 438 newfile = false; 439 goto move; 440 } 441 442 done = read(trail_filefd(adist_trail), adreq->adr_data, 443 ADIST_BUF_SIZE); 444 if (done == -1) { 445 off_t offset; 446 int error; 447 448 error = errno; 449 offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR); 450 errno = error; 451 pjdlog_errno(LOG_ERR, 452 "Error while reading \"%s/%s\" at offset %jd", 453 adhost->adh_directory, trail_filename(adist_trail), 454 offset); 455 trail_close(adist_trail); 456 adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0); 457 goto move; 458 } else if (done == 0) { 459 /* End of file. */ 460 pjdlog_debug(3, "End of \"%s/%s\".", 461 adhost->adh_directory, trail_filename(adist_trail)); 462 if (!trail_switch(adist_trail)) { 463 /* More audit records can arrive. */ 464 mtx_lock(&adist_free_list_lock); 465 TAILQ_INSERT_TAIL(&adist_free_list, adreq, 466 adr_next); 467 mtx_unlock(&adist_free_list_lock); 468 wait_for_file(); 469 continue; 470 } 471 adreq_fill(adreq, ADIST_CMD_CLOSE, 472 trail_filename(adist_trail), 0); 473 trail_close(adist_trail); 474 goto move; 475 } 476 477 adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done); 478 move: 479 pjdlog_debug(3, 480 "read thread: Moving request %p to the send queue (%hhu).", 481 adreq, adreq->adr_cmd); 482 QUEUE_INSERT(adreq, &adist_send_list); 483 } 484 /* NOTREACHED */ 485 return (NULL); 486 } 487 488 static void 489 keepalive_send(void) 490 { 491 struct adreq *adreq; 492 493 rw_rlock(&adist_remote_lock); 494 if (adhost->adh_remote == NULL) { 495 rw_unlock(&adist_remote_lock); 496 return; 497 } 498 rw_unlock(&adist_remote_lock); 499 500 mtx_lock(&adist_free_list_lock); 501 adreq = TAILQ_FIRST(&adist_free_list); 502 if (adreq != NULL) 503 TAILQ_REMOVE(&adist_free_list, adreq, adr_next); 504 mtx_unlock(&adist_free_list_lock); 505 if (adreq == NULL) 506 return; 507 508 adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0); 509 510 QUEUE_INSERT(adreq, &adist_send_list); 511 512 pjdlog_debug(3, "keepalive_send: Request sent."); 513 } 514 515 /* 516 * Thread sends request to secondary node. 517 */ 518 static void * 519 send_thread(void *arg __unused) 520 { 521 time_t lastcheck, now; 522 struct adreq *adreq; 523 524 pjdlog_debug(1, "%s started.", __func__); 525 526 lastcheck = time(NULL); 527 528 for (;;) { 529 pjdlog_debug(3, "send thread: Taking request."); 530 for (;;) { 531 QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE); 532 if (adreq != NULL) 533 break; 534 now = time(NULL); 535 if (lastcheck + ADIST_KEEPALIVE <= now) { 536 keepalive_send(); 537 lastcheck = now; 538 } 539 } 540 PJDLOG_ASSERT(adreq != NULL); 541 pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq, 542 adreq->adr_cmd); 543 /* 544 * Protect connection from disappearing. 545 */ 546 rw_rlock(&adist_remote_lock); 547 /* 548 * Move the request to the recv queue first to avoid race 549 * where the recv thread receives the reply before we move 550 * the request to the recv queue. 551 */ 552 QUEUE_INSERT(adreq, &adist_recv_list); 553 if (adhost->adh_remote == NULL || 554 proto_send(adhost->adh_remote, &adreq->adr_packet, 555 ADPKT_SIZE(adreq)) == -1) { 556 rw_unlock(&adist_remote_lock); 557 pjdlog_debug(1, 558 "send thread: (%p) Unable to send request.", adreq); 559 if (adhost->adh_remote != NULL) 560 sender_disconnect(); 561 continue; 562 } else { 563 pjdlog_debug(3, "Request %p sent successfully.", adreq); 564 adreq_log(LOG_DEBUG, 2, -1, adreq, 565 "send: (%p) Request sent: ", adreq); 566 rw_unlock(&adist_remote_lock); 567 } 568 } 569 /* NOTREACHED */ 570 return (NULL); 571 } 572 573 static void 574 adrep_decode_header(struct adrep *adrep) 575 { 576 577 /* Byte-swap only is the receiver is using different byte order. */ 578 if (adrep->adrp_byteorder != ADIST_BYTEORDER) { 579 adrep->adrp_byteorder = ADIST_BYTEORDER; 580 adrep->adrp_seq = bswap64(adrep->adrp_seq); 581 adrep->adrp_error = bswap16(adrep->adrp_error); 582 } 583 } 584 585 /* 586 * Thread receives answer from secondary node and passes it to ggate_send 587 * thread. 588 */ 589 static void * 590 recv_thread(void *arg __unused) 591 { 592 struct adrep adrep; 593 struct adreq *adreq; 594 595 pjdlog_debug(1, "%s started.", __func__); 596 597 for (;;) { 598 /* Wait until there is anything to receive. */ 599 QUEUE_WAIT(&adist_recv_list); 600 pjdlog_debug(3, "recv thread: Got something."); 601 rw_rlock(&adist_remote_lock); 602 if (adhost->adh_remote == NULL) { 603 /* 604 * Connection is dead. 605 * XXX: We shouldn't be here. 606 */ 607 rw_unlock(&adist_remote_lock); 608 continue; 609 } 610 if (proto_recv(adhost->adh_remote, &adrep, 611 sizeof(adrep)) == -1) { 612 rw_unlock(&adist_remote_lock); 613 pjdlog_errno(LOG_ERR, "Unable to receive reply"); 614 sender_disconnect(); 615 continue; 616 } 617 rw_unlock(&adist_remote_lock); 618 adrep_decode_header(&adrep); 619 /* 620 * Find the request that was just confirmed. 621 */ 622 mtx_lock(&adist_recv_list_lock); 623 TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) { 624 if (adreq->adr_seq == adrep.adrp_seq) { 625 TAILQ_REMOVE(&adist_recv_list, adreq, 626 adr_next); 627 break; 628 } 629 } 630 if (adreq == NULL) { 631 /* 632 * If we disconnected in the meantime, just continue. 633 * On disconnect sender_disconnect() clears the queue, 634 * we can use that. 635 */ 636 if (TAILQ_EMPTY(&adist_recv_list)) { 637 rw_unlock(&adist_remote_lock); 638 continue; 639 } 640 mtx_unlock(&adist_recv_list_lock); 641 pjdlog_error("Found no request matching received 'seq' field (%ju).", 642 (uintmax_t)adrep.adrp_seq); 643 sender_disconnect(); 644 continue; 645 } 646 mtx_unlock(&adist_recv_list_lock); 647 adreq_log(LOG_DEBUG, 2, -1, adreq, 648 "recv thread: (%p) Request confirmed: ", adreq); 649 pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq, 650 adreq->adr_cmd); 651 if (adrep.adrp_error != 0) { 652 pjdlog_error("Receiver returned error (%s), disconnecting.", 653 adist_errstr((int)adrep.adrp_error)); 654 sender_disconnect(); 655 continue; 656 } 657 if (adreq->adr_cmd == ADIST_CMD_CLOSE) 658 trail_unlink(adist_trail, adreq->adr_data); 659 pjdlog_debug(3, "Request received successfully."); 660 QUEUE_INSERT(adreq, &adist_free_list); 661 } 662 /* NOTREACHED */ 663 return (NULL); 664 } 665 666 static void 667 guard_check_connection(void) 668 { 669 670 PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER); 671 672 rw_rlock(&adist_remote_lock); 673 if (adhost->adh_remote != NULL) { 674 rw_unlock(&adist_remote_lock); 675 pjdlog_debug(3, "remote_guard: Connection to %s is ok.", 676 adhost->adh_remoteaddr); 677 return; 678 } 679 680 /* 681 * Upgrade the lock. It doesn't have to be atomic as no other thread 682 * can change connection status from disconnected to connected. 683 */ 684 rw_unlock(&adist_remote_lock); 685 pjdlog_debug(1, "remote_guard: Reconnecting to %s.", 686 adhost->adh_remoteaddr); 687 if (sender_connect() == 0) { 688 pjdlog_info("Successfully reconnected to %s.", 689 adhost->adh_remoteaddr); 690 } else { 691 pjdlog_debug(1, "remote_guard: Reconnect to %s failed.", 692 adhost->adh_remoteaddr); 693 } 694 } 695 696 /* 697 * Thread guards remote connections and reconnects when needed, handles 698 * signals, etc. 699 */ 700 static void * 701 guard_thread(void *arg __unused) 702 { 703 struct timespec timeout; 704 time_t lastcheck, now; 705 sigset_t mask; 706 int signo; 707 708 lastcheck = time(NULL); 709 710 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 711 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 712 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 713 714 timeout.tv_sec = ADIST_KEEPALIVE; 715 timeout.tv_nsec = 0; 716 signo = -1; 717 718 for (;;) { 719 switch (signo) { 720 case SIGINT: 721 case SIGTERM: 722 sigexit_received = true; 723 pjdlog_exitx(EX_OK, 724 "Termination signal received, exiting."); 725 break; 726 default: 727 break; 728 } 729 730 pjdlog_debug(3, "remote_guard: Checking connections."); 731 now = time(NULL); 732 if (lastcheck + ADIST_KEEPALIVE <= now) { 733 guard_check_connection(); 734 lastcheck = now; 735 } 736 signo = sigtimedwait(&mask, NULL, &timeout); 737 } 738 /* NOTREACHED */ 739 return (NULL); 740 } 741 742 void 743 adist_sender(struct adist_config *config, struct adist_host *adh) 744 { 745 pthread_t td; 746 pid_t pid; 747 int error, mode, debuglevel; 748 749 /* 750 * Create communication channel for sending connection requests from 751 * child to parent. 752 */ 753 if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) { 754 pjdlog_errno(LOG_ERR, 755 "Unable to create connection sockets between child and parent"); 756 return; 757 } 758 759 pid = fork(); 760 if (pid == -1) { 761 pjdlog_errno(LOG_ERR, "Unable to fork"); 762 proto_close(adh->adh_conn); 763 adh->adh_conn = NULL; 764 return; 765 } 766 767 if (pid > 0) { 768 /* This is parent. */ 769 adh->adh_worker_pid = pid; 770 /* Declare that we are receiver. */ 771 proto_recv(adh->adh_conn, NULL, 0); 772 return; 773 } 774 775 adcfg = config; 776 adhost = adh; 777 778 mode = pjdlog_mode_get(); 779 debuglevel = pjdlog_debug_get(); 780 781 /* Declare that we are sender. */ 782 proto_send(adhost->adh_conn, NULL, 0); 783 784 descriptors_cleanup(adhost); 785 786 #ifdef TODO 787 descriptors_assert(adhost, mode); 788 #endif 789 790 pjdlog_init(mode); 791 pjdlog_debug_set(debuglevel); 792 pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name, 793 role2str(adhost->adh_role)); 794 #ifdef HAVE_SETPROCTITLE 795 setproctitle("[%s] (%s) ", adhost->adh_name, 796 role2str(adhost->adh_role)); 797 #endif 798 799 /* 800 * The sender process should be able to remove entries from its 801 * trail directory, but it should not be able to write to the 802 * trail files, only read from them. 803 */ 804 adist_trail = trail_new(adhost->adh_directory, false); 805 if (adist_trail == NULL) 806 exit(EX_OSFILE); 807 808 if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)", 809 role2str(adhost->adh_role), adhost->adh_name) != 0) { 810 exit(EX_CONFIG); 811 } 812 pjdlog_info("Privileges successfully dropped."); 813 814 /* 815 * We can ignore wait_for_dir_init() failures. It will fall back to 816 * using sleep(3). 817 */ 818 (void)wait_for_dir_init(trail_dirfd(adist_trail)); 819 820 init_environment(); 821 if (sender_connect() == 0) { 822 pjdlog_info("Successfully connected to %s.", 823 adhost->adh_remoteaddr); 824 } 825 adhost->adh_reset = true; 826 827 /* 828 * Create the guard thread first, so we can handle signals from the 829 * very begining. 830 */ 831 error = pthread_create(&td, NULL, guard_thread, NULL); 832 PJDLOG_ASSERT(error == 0); 833 error = pthread_create(&td, NULL, send_thread, NULL); 834 PJDLOG_ASSERT(error == 0); 835 error = pthread_create(&td, NULL, recv_thread, NULL); 836 PJDLOG_ASSERT(error == 0); 837 (void)read_thread(NULL); 838 } 839