xref: /freebsd/contrib/openbsm/bin/auditdistd/sender.c (revision 595e514d0df2bac5b813d35f83e32875dbf16a83)
1 /*-
2  * Copyright (c) 2012 The FreeBSD Foundation
3  * All rights reserved.
4  *
5  * This software was developed by Pawel Jakub Dawidek under sponsorship from
6  * the FreeBSD Foundation.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
18  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
21  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27  * SUCH DAMAGE.
28  *
29  * $P4: //depot/projects/trustedbsd/openbsm/bin/auditdistd/sender.c#3 $
30  */
31 
32 #include <config/config.h>
33 
34 #include <sys/param.h>
35 #if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP)
36 #include <sys/endian.h>
37 #else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
38 #ifdef HAVE_MACHINE_ENDIAN_H
39 #include <machine/endian.h>
40 #else /* !HAVE_MACHINE_ENDIAN_H */
41 #ifdef HAVE_ENDIAN_H
42 #include <endian.h>
43 #else /* !HAVE_ENDIAN_H */
44 #error "No supported endian.h"
45 #endif /* !HAVE_ENDIAN_H */
46 #endif /* !HAVE_MACHINE_ENDIAN_H */
47 #include <compat/endian.h>
48 #endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
49 #include <sys/queue.h>
50 #include <sys/stat.h>
51 #include <sys/wait.h>
52 
53 #include <stdio.h>
54 #include <stdlib.h>
55 #include <unistd.h>
56 
57 #include <ctype.h>
58 #include <dirent.h>
59 #include <err.h>
60 #include <errno.h>
61 #include <fcntl.h>
62 #ifdef HAVE_LIBUTIL_H
63 #include <libutil.h>
64 #endif
65 #include <signal.h>
66 #include <string.h>
67 #include <strings.h>
68 
69 #include <openssl/hmac.h>
70 
71 #ifndef HAVE_SIGTIMEDWAIT
72 #include "sigtimedwait.h"
73 #endif
74 
75 #include "auditdistd.h"
76 #include "pjdlog.h"
77 #include "proto.h"
78 #include "sandbox.h"
79 #include "subr.h"
80 #include "synch.h"
81 #include "trail.h"
82 
83 static struct adist_config *adcfg;
84 static struct adist_host *adhost;
85 
86 static pthread_rwlock_t adist_remote_lock;
87 static pthread_mutex_t adist_remote_mtx;
88 static pthread_cond_t adist_remote_cond;
89 static struct trail *adist_trail;
90 
91 static TAILQ_HEAD(, adreq) adist_free_list;
92 static pthread_mutex_t adist_free_list_lock;
93 static pthread_cond_t adist_free_list_cond;
94 static TAILQ_HEAD(, adreq) adist_send_list;
95 static pthread_mutex_t adist_send_list_lock;
96 static pthread_cond_t adist_send_list_cond;
97 static TAILQ_HEAD(, adreq) adist_recv_list;
98 static pthread_mutex_t adist_recv_list_lock;
99 static pthread_cond_t adist_recv_list_cond;
100 
101 static void
102 init_environment(void)
103 {
104 	struct adreq *adreq;
105 	unsigned int ii;
106 
107 	rw_init(&adist_remote_lock);
108 	mtx_init(&adist_remote_mtx);
109 	cv_init(&adist_remote_cond);
110 	TAILQ_INIT(&adist_free_list);
111 	mtx_init(&adist_free_list_lock);
112 	cv_init(&adist_free_list_cond);
113 	TAILQ_INIT(&adist_send_list);
114 	mtx_init(&adist_send_list_lock);
115 	cv_init(&adist_send_list_cond);
116 	TAILQ_INIT(&adist_recv_list);
117 	mtx_init(&adist_recv_list_lock);
118 	cv_init(&adist_recv_list_cond);
119 
120 	for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) {
121 		adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE);
122 		if (adreq == NULL) {
123 			pjdlog_exitx(EX_TEMPFAIL,
124 			    "Unable to allocate %zu bytes of memory for adreq object.",
125 			    sizeof(*adreq) + ADIST_BUF_SIZE);
126 		}
127 		adreq->adr_byteorder = ADIST_BYTEORDER;
128 		adreq->adr_cmd = ADIST_CMD_UNDEFINED;
129 		adreq->adr_seq = 0;
130 		adreq->adr_datasize = 0;
131 		TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next);
132 	}
133 }
134 
135 static int
136 sender_connect(void)
137 {
138 	unsigned char rnd[32], hash[32], resp[32];
139 	struct proto_conn *conn;
140 	char welcome[8];
141 	int16_t val;
142 
143 	val = 1;
144 	if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) {
145 		pjdlog_exit(EX_TEMPFAIL,
146 		    "Unable to send connection request to parent");
147 	}
148 	if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) {
149 		pjdlog_exit(EX_TEMPFAIL,
150 		    "Unable to receive reply to connection request from parent");
151 	}
152 	if (val != 0) {
153 		errno = val;
154 		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
155 		    adhost->adh_remoteaddr);
156 		return (-1);
157 	}
158 	if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) {
159 		pjdlog_exit(EX_TEMPFAIL,
160 		    "Unable to receive connection from parent");
161 	}
162 	if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) {
163 		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
164 		    adhost->adh_remoteaddr);
165 		proto_close(conn);
166 		return (-1);
167 	}
168 	pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr);
169 	/* Error in setting timeout is not critical, but why should it fail? */
170 	if (proto_timeout(conn, adcfg->adc_timeout) < 0)
171 		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
172 	else
173 		pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout);
174 
175 	/* Exchange welcome message, which includes version number. */
176 	(void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION);
177 	if (proto_send(conn, welcome, sizeof(welcome)) < 0) {
178 		pjdlog_errno(LOG_WARNING,
179 		    "Unable to send welcome message to %s",
180 		    adhost->adh_remoteaddr);
181 		proto_close(conn);
182 		return (-1);
183 	}
184 	pjdlog_debug(1, "Welcome message sent (%s).", welcome);
185 	bzero(welcome, sizeof(welcome));
186 	if (proto_recv(conn, welcome, sizeof(welcome)) < 0) {
187 		pjdlog_errno(LOG_WARNING,
188 		    "Unable to receive welcome message from %s",
189 		    adhost->adh_remoteaddr);
190 		proto_close(conn);
191 		return (-1);
192 	}
193 	if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) ||
194 	    !isdigit(welcome[6]) || welcome[7] != '\0') {
195 		pjdlog_warning("Invalid welcome message from %s.",
196 		    adhost->adh_remoteaddr);
197 		proto_close(conn);
198 		return (-1);
199 	}
200 	pjdlog_debug(1, "Welcome message received (%s).", welcome);
201 	/*
202 	 * Receiver can only reply with version number lower or equal to
203 	 * the one we sent.
204 	 */
205 	adhost->adh_version = atoi(welcome + 5);
206 	if (adhost->adh_version > ADIST_VERSION) {
207 		pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).",
208 		    adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION);
209 		proto_close(conn);
210 		return (-1);
211 	}
212 
213 	pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version,
214 	    adhost->adh_remoteaddr);
215 
216 	if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) {
217 		pjdlog_errno(LOG_WARNING, "Unable to send name to %s",
218 		    adhost->adh_remoteaddr);
219 		proto_close(conn);
220 		return (-1);
221 	}
222 	pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name);
223 
224 	if (proto_recv(conn, rnd, sizeof(rnd)) == -1) {
225 		pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s",
226 		    adhost->adh_remoteaddr);
227 		proto_close(conn);
228 		return (-1);
229 	}
230 	pjdlog_debug(1, "Challenge received.");
231 
232 	if (HMAC(EVP_sha256(), adhost->adh_password,
233 	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
234 	    NULL) == NULL) {
235 		pjdlog_warning("Unable to generate response.");
236 		proto_close(conn);
237 		return (-1);
238 	}
239 	pjdlog_debug(1, "Response generated.");
240 
241 	if (proto_send(conn, hash, sizeof(hash)) == -1) {
242 		pjdlog_errno(LOG_WARNING, "Unable to send response to %s",
243 		    adhost->adh_remoteaddr);
244 		proto_close(conn);
245 		return (-1);
246 	}
247 	pjdlog_debug(1, "Response sent.");
248 
249 	if (adist_random(rnd, sizeof(rnd)) == -1) {
250 		pjdlog_warning("Unable to generate challenge.");
251 		proto_close(conn);
252 		return (-1);
253 	}
254 	pjdlog_debug(1, "Challenge generated.");
255 
256 	if (proto_send(conn, rnd, sizeof(rnd)) == -1) {
257 		pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s",
258 		    adhost->adh_remoteaddr);
259 		proto_close(conn);
260 		return (-1);
261 	}
262 	pjdlog_debug(1, "Challenge sent.");
263 
264 	if (proto_recv(conn, resp, sizeof(resp)) == -1) {
265 		pjdlog_errno(LOG_WARNING, "Unable to receive response from %s",
266 		    adhost->adh_remoteaddr);
267 		proto_close(conn);
268 		return (-1);
269 	}
270 	pjdlog_debug(1, "Response received.");
271 
272 	if (HMAC(EVP_sha256(), adhost->adh_password,
273 	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
274 	    NULL) == NULL) {
275 		pjdlog_warning("Unable to generate hash.");
276 		proto_close(conn);
277 		return (-1);
278 	}
279 	pjdlog_debug(1, "Hash generated.");
280 
281 	if (memcmp(resp, hash, sizeof(hash)) != 0) {
282 		pjdlog_warning("Invalid response from %s (wrong password?).",
283 		    adhost->adh_remoteaddr);
284 		proto_close(conn);
285 		return (-1);
286 	}
287 	pjdlog_info("Receiver authenticated.");
288 
289 	if (proto_recv(conn, &adhost->adh_trail_offset,
290 	    sizeof(adhost->adh_trail_offset)) == -1) {
291 		pjdlog_errno(LOG_WARNING,
292 		    "Unable to receive size of the most recent trail file from %s",
293 		    adhost->adh_remoteaddr);
294 		proto_close(conn);
295 		return (-1);
296 	}
297 	adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset);
298 	if (proto_recv(conn, &adhost->adh_trail_name,
299 	    sizeof(adhost->adh_trail_name)) == -1) {
300 		pjdlog_errno(LOG_WARNING,
301 		    "Unable to receive name of the most recent trail file from %s",
302 		    adhost->adh_remoteaddr);
303 		proto_close(conn);
304 		return (-1);
305 	}
306 	pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.",
307 	    adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset);
308 
309 	rw_wlock(&adist_remote_lock);
310 	mtx_lock(&adist_remote_mtx);
311 	PJDLOG_ASSERT(adhost->adh_remote == NULL);
312 	PJDLOG_ASSERT(conn != NULL);
313 	adhost->adh_remote = conn;
314 	mtx_unlock(&adist_remote_mtx);
315 	rw_unlock(&adist_remote_lock);
316 	cv_signal(&adist_remote_cond);
317 
318 	return (0);
319 }
320 
321 static void
322 sender_disconnect(void)
323 {
324 
325 	rw_wlock(&adist_remote_lock);
326 	/*
327 	 * Check for a race between dropping rlock and acquiring wlock -
328 	 * another thread can close connection in-between.
329 	 */
330 	if (adhost->adh_remote == NULL) {
331 		rw_unlock(&adist_remote_lock);
332 		return;
333 	}
334 	pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr);
335 	proto_close(adhost->adh_remote);
336 	mtx_lock(&adist_remote_mtx);
337 	adhost->adh_remote = NULL;
338 	adhost->adh_reset = true;
339 	adhost->adh_trail_name[0] = '\0';
340 	adhost->adh_trail_offset = 0;
341 	mtx_unlock(&adist_remote_mtx);
342 	rw_unlock(&adist_remote_lock);
343 
344 	pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr);
345 
346 	/* Move all in-flight requests back onto free list. */
347 	mtx_lock(&adist_free_list_lock);
348 	mtx_lock(&adist_send_list_lock);
349 	TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next);
350 	mtx_unlock(&adist_send_list_lock);
351 	mtx_lock(&adist_recv_list_lock);
352 	TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next);
353 	mtx_unlock(&adist_recv_list_lock);
354 	mtx_unlock(&adist_free_list_lock);
355 }
356 
357 static void
358 adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data,
359     size_t size)
360 {
361 	static uint64_t seq = 1;
362 
363 	PJDLOG_ASSERT(size <= ADIST_BUF_SIZE);
364 
365 	switch (cmd) {
366 	case ADIST_CMD_OPEN:
367 	case ADIST_CMD_CLOSE:
368 		PJDLOG_ASSERT(data != NULL && size == 0);
369 		size = strlen(data) + 1;
370 		break;
371 	case ADIST_CMD_APPEND:
372 		PJDLOG_ASSERT(data != NULL && size > 0);
373 		break;
374 	case ADIST_CMD_KEEPALIVE:
375 	case ADIST_CMD_ERROR:
376 		PJDLOG_ASSERT(data == NULL && size == 0);
377 		break;
378 	default:
379 		PJDLOG_ABORT("Invalid command (%hhu).", cmd);
380 	}
381 
382 	adreq->adr_cmd = cmd;
383 	adreq->adr_seq = seq++;
384 	adreq->adr_datasize = size;
385 	/* Don't copy if data is already in out buffer. */
386 	if (data != NULL && data != adreq->adr_data)
387 		bcopy(data, adreq->adr_data, size);
388 }
389 
390 static bool
391 read_thread_wait(void)
392 {
393 	bool newfile = false;
394 
395 	mtx_lock(&adist_remote_mtx);
396 	if (adhost->adh_reset) {
397 reset:
398 		adhost->adh_reset = false;
399 		if (trail_filefd(adist_trail) != -1)
400 			trail_close(adist_trail);
401 		trail_reset(adist_trail);
402 		while (adhost->adh_remote == NULL)
403 			cv_wait(&adist_remote_cond, &adist_remote_mtx);
404 		trail_start(adist_trail, adhost->adh_trail_name,
405 		    adhost->adh_trail_offset);
406 		newfile = true;
407 	}
408 	mtx_unlock(&adist_remote_mtx);
409 	while (trail_filefd(adist_trail) == -1) {
410 		newfile = true;
411 		wait_for_dir();
412 		/*
413 		 * We may have been disconnected and reconnected in the
414 		 * meantime, check if reset is set.
415 		 */
416 		mtx_lock(&adist_remote_mtx);
417 		if (adhost->adh_reset)
418 			goto reset;
419 		mtx_unlock(&adist_remote_mtx);
420 		if (trail_filefd(adist_trail) == -1)
421 			trail_next(adist_trail);
422 	}
423 	if (newfile) {
424 		pjdlog_debug(1, "Trail file \"%s/%s\" opened.",
425 		    adhost->adh_directory,
426 		    trail_filename(adist_trail));
427 		(void)wait_for_file_init(trail_filefd(adist_trail));
428 	}
429 	return (newfile);
430 }
431 
432 static void *
433 read_thread(void *arg __unused)
434 {
435 	struct adreq *adreq;
436 	ssize_t done;
437 	bool newfile;
438 
439 	pjdlog_debug(1, "%s started.", __func__);
440 
441 	for (;;) {
442 		newfile = read_thread_wait();
443 		QUEUE_TAKE(adreq, &adist_free_list, 0);
444 		if (newfile) {
445 			adreq_fill(adreq, ADIST_CMD_OPEN,
446 			    trail_filename(adist_trail), 0);
447 			newfile = false;
448 			goto move;
449 		}
450 
451 		done = read(trail_filefd(adist_trail), adreq->adr_data,
452 		    ADIST_BUF_SIZE);
453 		if (done == -1) {
454 			off_t offset;
455 			int error;
456 
457 			error = errno;
458 			offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR);
459 			errno = error;
460 			pjdlog_errno(LOG_ERR,
461 			    "Error while reading \"%s/%s\" at offset %jd",
462 			    adhost->adh_directory, trail_filename(adist_trail),
463 			    offset);
464 			trail_close(adist_trail);
465 			adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0);
466 			goto move;
467 		} else if (done == 0) {
468 			/* End of file. */
469 			pjdlog_debug(3, "End of \"%s/%s\".",
470 			    adhost->adh_directory, trail_filename(adist_trail));
471 			if (!trail_switch(adist_trail)) {
472 				/* More audit records can arrive. */
473 				mtx_lock(&adist_free_list_lock);
474 				TAILQ_INSERT_TAIL(&adist_free_list, adreq,
475 				    adr_next);
476 				mtx_unlock(&adist_free_list_lock);
477 				wait_for_file();
478 				continue;
479 			}
480 			adreq_fill(adreq, ADIST_CMD_CLOSE,
481 			    trail_filename(adist_trail), 0);
482 			trail_close(adist_trail);
483 			goto move;
484 		}
485 
486 		adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done);
487 move:
488 		pjdlog_debug(3,
489 		    "read thread: Moving request %p to the send queue (%hhu).",
490 		    adreq, adreq->adr_cmd);
491 		QUEUE_INSERT(adreq, &adist_send_list);
492 	}
493 	/* NOTREACHED */
494 	return (NULL);
495 }
496 
497 static void
498 keepalive_send(void)
499 {
500 	struct adreq *adreq;
501 
502 	rw_rlock(&adist_remote_lock);
503 	if (adhost->adh_remote == NULL) {
504 		rw_unlock(&adist_remote_lock);
505 		return;
506 	}
507 	rw_unlock(&adist_remote_lock);
508 
509 	mtx_lock(&adist_free_list_lock);
510 	adreq = TAILQ_FIRST(&adist_free_list);
511 	if (adreq != NULL)
512 		TAILQ_REMOVE(&adist_free_list, adreq, adr_next);
513 	mtx_unlock(&adist_free_list_lock);
514 	if (adreq == NULL)
515 		return;
516 
517 	adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0);
518 
519 	QUEUE_INSERT(adreq, &adist_send_list);
520 
521 	pjdlog_debug(3, "keepalive_send: Request sent.");
522 }
523 
524 /*
525  * Thread sends request to secondary node.
526  */
527 static void *
528 send_thread(void *arg __unused)
529 {
530 	time_t lastcheck, now;
531 	struct adreq *adreq;
532 
533 	pjdlog_debug(1, "%s started.", __func__);
534 
535 	lastcheck = time(NULL);
536 
537 	for (;;) {
538 		pjdlog_debug(3, "send thread: Taking request.");
539 		for (;;) {
540 			QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE);
541 			if (adreq != NULL)
542 				break;
543 			now = time(NULL);
544 			if (lastcheck + ADIST_KEEPALIVE <= now) {
545 				keepalive_send();
546 				lastcheck = now;
547 			}
548 		}
549 		PJDLOG_ASSERT(adreq != NULL);
550 		pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq,
551 		    adreq->adr_cmd);
552 		/*
553 		 * Protect connection from disappearing.
554 		 */
555 		rw_rlock(&adist_remote_lock);
556 		/*
557 		 * Move the request to the recv queue first to avoid race
558 		 * where the recv thread receives the reply before we move
559 		 * the request to the recv queue.
560 		 */
561 		QUEUE_INSERT(adreq, &adist_recv_list);
562 		if (adhost->adh_remote == NULL ||
563 		    proto_send(adhost->adh_remote, &adreq->adr_packet,
564 		    ADPKT_SIZE(adreq)) == -1) {
565 			rw_unlock(&adist_remote_lock);
566 			pjdlog_debug(1,
567 			    "send thread: (%p) Unable to send request.", adreq);
568 			if (adhost->adh_remote != NULL)
569 				sender_disconnect();
570 			continue;
571 		} else {
572 			pjdlog_debug(3, "Request %p sent successfully.", adreq);
573 			adreq_log(LOG_DEBUG, 2, -1, adreq,
574 			    "send: (%p) Request sent: ", adreq);
575 			rw_unlock(&adist_remote_lock);
576 		}
577 	}
578 	/* NOTREACHED */
579 	return (NULL);
580 }
581 
582 static void
583 adrep_decode_header(struct adrep *adrep)
584 {
585 
586 	/* Byte-swap only is the receiver is using different byte order. */
587 	if (adrep->adrp_byteorder != ADIST_BYTEORDER) {
588 		adrep->adrp_byteorder = ADIST_BYTEORDER;
589 		adrep->adrp_seq = bswap64(adrep->adrp_seq);
590 		adrep->adrp_error = bswap16(adrep->adrp_error);
591 	}
592 }
593 
594 /*
595  * Thread receives answer from secondary node and passes it to ggate_send
596  * thread.
597  */
598 static void *
599 recv_thread(void *arg __unused)
600 {
601 	struct adrep adrep;
602 	struct adreq *adreq;
603 
604 	pjdlog_debug(1, "%s started.", __func__);
605 
606 	for (;;) {
607 		/* Wait until there is anything to receive. */
608 		QUEUE_WAIT(&adist_recv_list);
609 		pjdlog_debug(3, "recv thread: Got something.");
610 		rw_rlock(&adist_remote_lock);
611 		if (adhost->adh_remote == NULL) {
612 			/*
613 			 * Connection is dead.
614 			 * XXX: We shouldn't be here.
615 			 */
616 			rw_unlock(&adist_remote_lock);
617 			continue;
618 		}
619 		if (proto_recv(adhost->adh_remote, &adrep,
620 		    sizeof(adrep)) == -1) {
621 			rw_unlock(&adist_remote_lock);
622 			pjdlog_errno(LOG_ERR, "Unable to receive reply");
623 			sender_disconnect();
624 			continue;
625 		}
626 		rw_unlock(&adist_remote_lock);
627 		adrep_decode_header(&adrep);
628 		/*
629 		 * Find the request that was just confirmed.
630 		 */
631 		mtx_lock(&adist_recv_list_lock);
632 		TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) {
633 			if (adreq->adr_seq == adrep.adrp_seq) {
634 				TAILQ_REMOVE(&adist_recv_list, adreq,
635 				    adr_next);
636 				break;
637 			}
638 		}
639 		if (adreq == NULL) {
640 			/*
641 			 * If we disconnected in the meantime, just continue.
642 			 * On disconnect sender_disconnect() clears the queue,
643 			 * we can use that.
644 			 */
645 			if (TAILQ_EMPTY(&adist_recv_list)) {
646 				rw_unlock(&adist_remote_lock);
647 				continue;
648 			}
649 			mtx_unlock(&adist_recv_list_lock);
650 			pjdlog_error("Found no request matching received 'seq' field (%ju).",
651 			    (uintmax_t)adrep.adrp_seq);
652 			sender_disconnect();
653 			continue;
654 		}
655 		mtx_unlock(&adist_recv_list_lock);
656 		adreq_log(LOG_DEBUG, 2, -1, adreq,
657 		    "recv thread: (%p) Request confirmed: ", adreq);
658 		pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq,
659 		    adreq->adr_cmd);
660 		if (adrep.adrp_error != 0) {
661 			pjdlog_error("Receiver returned error (%s), disconnecting.",
662 			    adist_errstr((int)adrep.adrp_error));
663 			sender_disconnect();
664 			continue;
665 		}
666 		if (adreq->adr_cmd == ADIST_CMD_CLOSE)
667 			trail_unlink(adist_trail, adreq->adr_data);
668 		pjdlog_debug(3, "Request received successfully.");
669 		QUEUE_INSERT(adreq, &adist_free_list);
670 	}
671 	/* NOTREACHED */
672 	return (NULL);
673 }
674 
675 static void
676 guard_check_connection(void)
677 {
678 
679 	PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER);
680 
681 	rw_rlock(&adist_remote_lock);
682 	if (adhost->adh_remote != NULL) {
683 		rw_unlock(&adist_remote_lock);
684 		pjdlog_debug(3, "remote_guard: Connection to %s is ok.",
685 		    adhost->adh_remoteaddr);
686 		return;
687 	}
688 
689 	/*
690 	 * Upgrade the lock. It doesn't have to be atomic as no other thread
691 	 * can change connection status from disconnected to connected.
692 	 */
693 	rw_unlock(&adist_remote_lock);
694 	pjdlog_debug(1, "remote_guard: Reconnecting to %s.",
695 	    adhost->adh_remoteaddr);
696 	if (sender_connect() == 0) {
697 		pjdlog_info("Successfully reconnected to %s.",
698 		    adhost->adh_remoteaddr);
699 	} else {
700 		pjdlog_debug(1, "remote_guard: Reconnect to %s failed.",
701 		    adhost->adh_remoteaddr);
702 	}
703 }
704 
705 /*
706  * Thread guards remote connections and reconnects when needed, handles
707  * signals, etc.
708  */
709 static void *
710 guard_thread(void *arg __unused)
711 {
712 	struct timespec timeout;
713 	time_t lastcheck, now;
714 	sigset_t mask;
715 	int signo;
716 
717 	lastcheck = time(NULL);
718 
719 	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
720 	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
721 	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
722 
723 	timeout.tv_sec = ADIST_KEEPALIVE;
724 	timeout.tv_nsec = 0;
725 	signo = -1;
726 
727 	for (;;) {
728 		switch (signo) {
729 		case SIGINT:
730 		case SIGTERM:
731 			sigexit_received = true;
732 			pjdlog_exitx(EX_OK,
733 			    "Termination signal received, exiting.");
734 			break;
735 		default:
736 			break;
737 		}
738 
739 		pjdlog_debug(3, "remote_guard: Checking connections.");
740 		now = time(NULL);
741 		if (lastcheck + ADIST_KEEPALIVE <= now) {
742 			guard_check_connection();
743 			lastcheck = now;
744 		}
745 		signo = sigtimedwait(&mask, NULL, &timeout);
746 	}
747 	/* NOTREACHED */
748 	return (NULL);
749 }
750 
751 void
752 adist_sender(struct adist_config *config, struct adist_host *adh)
753 {
754 	pthread_t td;
755 	pid_t pid;
756 	int error, mode, debuglevel;
757 
758 	/*
759 	 * Create communication channel for sending connection requests from
760 	 * child to parent.
761 	 */
762 	if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) {
763 		pjdlog_errno(LOG_ERR,
764 		    "Unable to create connection sockets between child and parent");
765 		return;
766 	}
767 
768 	pid = fork();
769 	if (pid == -1) {
770 		pjdlog_errno(LOG_ERR, "Unable to fork");
771 		proto_close(adh->adh_conn);
772 		adh->adh_conn = NULL;
773 		return;
774 	}
775 
776 	if (pid > 0) {
777 		/* This is parent. */
778 		adh->adh_worker_pid = pid;
779 		/* Declare that we are receiver. */
780 		proto_recv(adh->adh_conn, NULL, 0);
781 		return;
782 	}
783 
784 	adcfg = config;
785 	adhost = adh;
786 
787 	mode = pjdlog_mode_get();
788 	debuglevel = pjdlog_debug_get();
789 
790 	/* Declare that we are sender. */
791 	proto_send(adhost->adh_conn, NULL, 0);
792 
793 	descriptors_cleanup(adhost);
794 
795 #ifdef TODO
796 	descriptors_assert(adhost, mode);
797 #endif
798 
799 	pjdlog_init(mode);
800 	pjdlog_debug_set(debuglevel);
801 	pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name,
802 	    role2str(adhost->adh_role));
803 #ifdef HAVE_SETPROCTITLE
804 	setproctitle("[%s] (%s) ", adhost->adh_name,
805 	    role2str(adhost->adh_role));
806 #endif
807 
808 	/*
809 	 * The sender process should be able to remove entries from its
810 	 * trail directory, but it should not be able to write to the
811 	 * trail files, only read from them.
812 	 */
813 	adist_trail = trail_new(adhost->adh_directory, false);
814 	if (adist_trail == NULL)
815 		exit(EX_OSFILE);
816 
817 	if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)",
818 	    role2str(adhost->adh_role), adhost->adh_name) != 0) {
819 		exit(EX_CONFIG);
820 	}
821 	pjdlog_info("Privileges successfully dropped.");
822 
823 	/*
824 	 * We can ignore wait_for_dir_init() failures. It will fall back to
825 	 * using sleep(3).
826 	 */
827 	(void)wait_for_dir_init(trail_dirfd(adist_trail));
828 
829 	init_environment();
830 	if (sender_connect() == 0) {
831 		pjdlog_info("Successfully connected to %s.",
832 		    adhost->adh_remoteaddr);
833 	}
834 	adhost->adh_reset = true;
835 
836 	/*
837 	 * Create the guard thread first, so we can handle signals from the
838 	 * very begining.
839 	 */
840 	error = pthread_create(&td, NULL, guard_thread, NULL);
841 	PJDLOG_ASSERT(error == 0);
842 	error = pthread_create(&td, NULL, send_thread, NULL);
843 	PJDLOG_ASSERT(error == 0);
844 	error = pthread_create(&td, NULL, recv_thread, NULL);
845 	PJDLOG_ASSERT(error == 0);
846 	(void)read_thread(NULL);
847 }
848