xref: /freebsd/contrib/unbound/dnstap/dtstream.c (revision 5fa84c6ec176d186ddad25d31f8760e50f48157f)
1 /*
2  * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3  *
4  * Copyright (c) 2020, NLnet Labs. All rights reserved.
5  *
6  * This software is open source.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * Redistributions of source code must retain the above copyright notice,
13  * this list of conditions and the following disclaimer.
14  *
15  * Redistributions in binary form must reproduce the above copyright notice,
16  * this list of conditions and the following disclaimer in the documentation
17  * and/or other materials provided with the distribution.
18  *
19  * Neither the name of the NLNET LABS nor the names of its contributors may
20  * be used to endorse or promote products derived from this software without
21  * specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34  *
35  */
36 
37 /**
38  * \file
39  *
40  * An implementation of the Frame Streams data transport protocol for
41  * the Unbound DNSTAP message logging facility.
42  */
43 
44 #include "config.h"
45 #include "dnstap/dtstream.h"
46 #include "dnstap/dnstap_fstrm.h"
47 #include "util/config_file.h"
48 #include "util/ub_event.h"
49 #include "util/net_help.h"
50 #include "services/outside_network.h"
51 #include "sldns/sbuffer.h"
52 #ifdef HAVE_SYS_UN_H
53 #include <sys/un.h>
54 #endif
55 #include <fcntl.h>
56 #ifdef HAVE_OPENSSL_SSL_H
57 #include <openssl/ssl.h>
58 #endif
59 #ifdef HAVE_OPENSSL_ERR_H
60 #include <openssl/err.h>
61 #endif
62 
63 /** number of messages to process in one output callback */
64 #define DTIO_MESSAGES_PER_CALLBACK 100
65 /** the msec to wait before the first delayed reconnect attempt (the attempt made right after the backoff was reset is immediate) */
66 #define DTIO_RECONNECT_TIMEOUT_MIN 10
67 /** the msec to wait for reconnect max after backoff */
68 #define DTIO_RECONNECT_TIMEOUT_MAX 1000
69 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
70 #define DTIO_RECONNECT_TIMEOUT_SLOW 1000
71 /** number of messages before wakeup of thread */
72 #define DTIO_MSG_FOR_WAKEUP 32
73 
74 /** maximum length of received frame */
75 #define DTIO_RECV_FRAME_MAX_LEN 1000
76 
77 struct stop_flush_info;
78 /** DTIO command channel commands.  dtio_wakeup() writes
 * DTIO_COMMAND_WAKEUP as a single uint8_t to commandpipe[1].
 * NOTE(review): as written this declares a file-scope variable named
 * dtio_channel_command of an anonymous enum type, not an enum tag;
 * confirm whether a tag or typedef was intended. */
79 enum {
80 	/** DTIO command channel stop */
81 	DTIO_COMMAND_STOP = 0,
82 	/** DTIO command channel wakeup */
83 	DTIO_COMMAND_WAKEUP = 1
84 } dtio_channel_command;
85 
86 /** open the output channel */
87 static void dtio_open_output(struct dt_io_thread* dtio);
88 /** add output event for read and write */
89 static int dtio_add_output_event_write(struct dt_io_thread* dtio);
90 /** start reconnection attempts */
91 static void dtio_reconnect_enable(struct dt_io_thread* dtio);
92 /** stop from stop_flush event loop */
93 static void dtio_stop_flush_exit(struct stop_flush_info* info);
94 /** setup a start control message */
95 static int dtio_control_start_send(struct dt_io_thread* dtio);
96 #ifdef HAVE_SSL
97 /** enable briefly waiting for a read event, for SSL negotiation */
98 static int dtio_enable_brief_read(struct dt_io_thread* dtio);
99 /** enable briefly waiting for a write event, for SSL negotiation */
100 static int dtio_enable_brief_write(struct dt_io_thread* dtio);
101 #endif
102 
103 struct dt_msg_queue*
dt_msg_queue_create(struct comm_base * base)104 dt_msg_queue_create(struct comm_base* base)
105 {
106 	struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
107 	if(!mq) return NULL;
108 	mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
109 		about 1 M should contain 64K messages with some overhead,
110 		or a whole bunch smaller ones */
111 	mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112 	if(!mq->wakeup_timer) {
113 		free(mq);
114 		return NULL;
115 	}
116 	lock_basic_init(&mq->lock);
117 	lock_protect(&mq->lock, mq, sizeof(*mq));
118 	return mq;
119 }
120 
121 /** clear the message list, caller must hold the lock */
122 static void
dt_msg_queue_clear(struct dt_msg_queue * mq)123 dt_msg_queue_clear(struct dt_msg_queue* mq)
124 {
125 	struct dt_msg_entry* e = mq->first, *next=NULL;
126 	while(e) {
127 		next = e->next;
128 		free(e->buf);
129 		free(e);
130 		e = next;
131 	}
132 	mq->first = NULL;
133 	mq->last = NULL;
134 	mq->cursize = 0;
135 	mq->msgcount = 0;
136 }
137 
138 void
dt_msg_queue_delete(struct dt_msg_queue * mq)139 dt_msg_queue_delete(struct dt_msg_queue* mq)
140 {
141 	if(!mq) return;
142 	lock_basic_destroy(&mq->lock);
143 	dt_msg_queue_clear(mq);
144 	comm_timer_delete(mq->wakeup_timer);
145 	free(mq);
146 }
147 
148 /** make the dtio wake up by sending a wakeup command */
dtio_wakeup(struct dt_io_thread * dtio)149 static void dtio_wakeup(struct dt_io_thread* dtio)
150 {
151 	uint8_t cmd = DTIO_COMMAND_WAKEUP;
152 	if(!dtio) return;
153 	if(!dtio->started) return;
154 
155 	while(1) {
156 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
157 		if(r == -1) {
158 #ifndef USE_WINSOCK
159 			if(errno == EINTR || errno == EAGAIN)
160 				continue;
161 #else
162 			if(WSAGetLastError() == WSAEINPROGRESS)
163 				continue;
164 			if(WSAGetLastError() == WSAEWOULDBLOCK)
165 				continue;
166 #endif
167 			log_err("dnstap io wakeup: write: %s",
168 				sock_strerror(errno));
169 			break;
170 		}
171 		break;
172 	}
173 }
174 
175 void
mq_wakeup_cb(void * arg)176 mq_wakeup_cb(void* arg)
177 {
178 	struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179 
180 	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
181 	mq->dtio->wakeup_timer_enabled = 0;
182 	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
183 	dtio_wakeup(mq->dtio);
184 }
185 
186 /** start timer to wakeup dtio because there is content in the queue */
187 static void
dt_msg_queue_start_timer(struct dt_msg_queue * mq,int wakeupnow)188 dt_msg_queue_start_timer(struct dt_msg_queue* mq, int wakeupnow)
189 {
190 	struct timeval tv = {0};
191 	/* Start a timer to process messages to be logged.
192 	 * If we woke up the dtio thread for every message, the wakeup
193 	 * messages take up too much processing power.  If the queue
194 	 * fills up the wakeup happens immediately.  The timer wakes it up
195 	 * if there are infrequent messages to log. */
196 
197 	/* we cannot start a timer in dtio thread, because it is a different
198 	 * thread and its event base is in use by the other thread, it would
199 	 * give race conditions if we tried to modify its event base,
200 	 * and locks would wait until it woke up, and this is what we do. */
201 
202 	/* do not start the timer if a timer already exists, perhaps
203 	 * in another worker.  So this variable is protected by a lock in
204 	 * dtio. */
205 
206 	/* If we need to wakeupnow, 0 the timer to force the callback. */
207 	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
208 	if(mq->dtio->wakeup_timer_enabled) {
209 		if(wakeupnow) {
210 			tv.tv_sec = 0;
211 			tv.tv_usec = 0;
212 			comm_timer_set(mq->wakeup_timer, &tv);
213 		}
214 		lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
215 		return;
216 	}
217 	mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
218 
219 	/* start the timer, in mq, in the event base of our worker */
220 	if(!wakeupnow) {
221 		tv.tv_sec = 1;
222 		tv.tv_usec = 0;
223 		/* If it is already set, keep it running. */
224 		if(!comm_timer_is_set(mq->wakeup_timer))
225 			comm_timer_set(mq->wakeup_timer, &tv);
226 	} else {
227 		tv.tv_sec = 0;
228 		tv.tv_usec = 0;
229 		comm_timer_set(mq->wakeup_timer, &tv);
230 	}
231 	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
232 }
233 
234 void
dt_msg_queue_submit(struct dt_msg_queue * mq,void * buf,size_t len)235 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
236 {
237 	int wakeupnow = 0, wakeupstarttimer = 0;
238 	struct dt_msg_entry* entry;
239 
240 	/* check conditions */
241 	if(!buf) return;
242 	if(len == 0) {
243 		/* it is not possible to log entries with zero length,
244 		 * because the framestream protocol does not carry it.
245 		 * However the protobuf serialization does not create zero
246 		 * length datagrams for dnstap, so this should not happen. */
247 		free(buf);
248 		return;
249 	}
250 	if(!mq) {
251 		free(buf);
252 		return;
253 	}
254 
255 	/* allocate memory for queue entry */
256 	entry = malloc(sizeof(*entry));
257 	if(!entry) {
258 		log_err("out of memory logging dnstap");
259 		free(buf);
260 		return;
261 	}
262 	entry->next = NULL;
263 	entry->buf = buf;
264 	entry->len = len;
265 
266 	/* acquire lock */
267 	lock_basic_lock(&mq->lock);
268 	/* if list was empty, start timer for (eventual) wakeup,
269 	 * or if dtio is not writing now an eventual wakeup is needed. */
270 	if(mq->first == NULL || !mq->dtio->event_added_is_write)
271 		wakeupstarttimer = 1;
272 	/* if list contains more than wakeupnum elements, wakeup now,
273 	 * or if list is (going to be) almost full */
274 	if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
275 		(mq->cursize < mq->maxsize * 9 / 10 &&
276 		mq->cursize+len >= mq->maxsize * 9 / 10))
277 		wakeupnow = 1;
278 	/* see if it is going to fit */
279 	if(mq->cursize + len > mq->maxsize) {
280 		/* buffer full, or congested. */
281 		/* drop */
282 		lock_basic_unlock(&mq->lock);
283 		free(buf);
284 		free(entry);
285 		return;
286 	}
287 	mq->cursize += len;
288 	mq->msgcount ++;
289 	/* append to list */
290 	if(mq->last) {
291 		mq->last->next = entry;
292 	} else {
293 		mq->first = entry;
294 	}
295 	mq->last = entry;
296 	/* release lock */
297 	lock_basic_unlock(&mq->lock);
298 
299 	if(wakeupnow || wakeupstarttimer) {
300 		dt_msg_queue_start_timer(mq, wakeupnow);
301 	}
302 }
303 
dt_io_thread_create(void)304 struct dt_io_thread* dt_io_thread_create(void)
305 {
306 	struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
307 	lock_basic_init(&dtio->wakeup_timer_lock);
308 	lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
309 		sizeof(dtio->wakeup_timer_enabled));
310 	return dtio;
311 }
312 
dt_io_thread_delete(struct dt_io_thread * dtio)313 void dt_io_thread_delete(struct dt_io_thread* dtio)
314 {
315 	struct dt_io_list_item* item, *nextitem;
316 	if(!dtio) return;
317 	lock_basic_destroy(&dtio->wakeup_timer_lock);
318 	item=dtio->io_list;
319 	while(item) {
320 		nextitem = item->next;
321 		free(item);
322 		item = nextitem;
323 	}
324 	free(dtio->socket_path);
325 	free(dtio->ip_str);
326 	free(dtio->tls_server_name);
327 	free(dtio->client_key_file);
328 	free(dtio->client_cert_file);
329 	if(dtio->ssl_ctx) {
330 #ifdef HAVE_SSL
331 		SSL_CTX_free(dtio->ssl_ctx);
332 #endif
333 	}
334 	free(dtio);
335 }
336 
dt_io_thread_apply_cfg(struct dt_io_thread * dtio,struct config_file * cfg)337 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
338 {
339 	if(!cfg->dnstap) {
340 		log_warn("cannot setup dnstap because dnstap-enable is no");
341 		return 0;
342 	}
343 
344 	/* what type of connectivity do we have */
345 	if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
346 		if(cfg->dnstap_tls)
347 			dtio->upstream_is_tls = 1;
348 		else	dtio->upstream_is_tcp = 1;
349 	} else {
350 		dtio->upstream_is_unix = 1;
351 	}
352 	dtio->is_bidirectional = cfg->dnstap_bidirectional;
353 
354 	if(dtio->upstream_is_unix) {
355 		char* nm;
356 		if(!cfg->dnstap_socket_path ||
357 			cfg->dnstap_socket_path[0]==0) {
358 			log_err("dnstap setup: no dnstap-socket-path for "
359 				"socket connect");
360 			return 0;
361 		}
362 		nm = cfg->dnstap_socket_path;
363 		if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm,
364 			cfg->chrootdir, strlen(cfg->chrootdir)) == 0)
365 			nm += strlen(cfg->chrootdir);
366 		free(dtio->socket_path);
367 		dtio->socket_path = strdup(nm);
368 		if(!dtio->socket_path) {
369 			log_err("dnstap setup: malloc failure");
370 			return 0;
371 		}
372 	}
373 
374 	if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
375 		if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
376 			log_err("dnstap setup: no dnstap-ip for TCP connect");
377 			return 0;
378 		}
379 		free(dtio->ip_str);
380 		dtio->ip_str = strdup(cfg->dnstap_ip);
381 		if(!dtio->ip_str) {
382 			log_err("dnstap setup: malloc failure");
383 			return 0;
384 		}
385 	}
386 
387 	if(dtio->upstream_is_tls) {
388 #ifdef HAVE_SSL
389 		if(cfg->dnstap_tls_server_name &&
390 			cfg->dnstap_tls_server_name[0]) {
391 			free(dtio->tls_server_name);
392 			dtio->tls_server_name = strdup(
393 				cfg->dnstap_tls_server_name);
394 			if(!dtio->tls_server_name) {
395 				log_err("dnstap setup: malloc failure");
396 				return 0;
397 			}
398 			if(!check_auth_name_for_ssl(dtio->tls_server_name))
399 				return 0;
400 		}
401 		if(cfg->dnstap_tls_client_key_file &&
402 			cfg->dnstap_tls_client_key_file[0]) {
403 			dtio->use_client_certs = 1;
404 			free(dtio->client_key_file);
405 			dtio->client_key_file = strdup(
406 				cfg->dnstap_tls_client_key_file);
407 			if(!dtio->client_key_file) {
408 				log_err("dnstap setup: malloc failure");
409 				return 0;
410 			}
411 			if(!cfg->dnstap_tls_client_cert_file ||
412 				cfg->dnstap_tls_client_cert_file[0]==0) {
413 				log_err("dnstap setup: client key "
414 					"authentication enabled with "
415 					"dnstap-tls-client-key-file, but "
416 					"no dnstap-tls-client-cert-file "
417 					"is given");
418 				return 0;
419 			}
420 			free(dtio->client_cert_file);
421 			dtio->client_cert_file = strdup(
422 				cfg->dnstap_tls_client_cert_file);
423 			if(!dtio->client_cert_file) {
424 				log_err("dnstap setup: malloc failure");
425 				return 0;
426 			}
427 		} else {
428 			dtio->use_client_certs = 0;
429 			dtio->client_key_file = NULL;
430 			dtio->client_cert_file = NULL;
431 		}
432 
433 		if(cfg->dnstap_tls_cert_bundle) {
434 			dtio->ssl_ctx = connect_sslctx_create(
435 				dtio->client_key_file,
436 				dtio->client_cert_file,
437 				cfg->dnstap_tls_cert_bundle, 0);
438 		} else {
439 			dtio->ssl_ctx = connect_sslctx_create(
440 				dtio->client_key_file,
441 				dtio->client_cert_file,
442 				cfg->tls_cert_bundle, cfg->tls_win_cert);
443 		}
444 		if(!dtio->ssl_ctx) {
445 			log_err("could not setup SSL CTX");
446 			return 0;
447 		}
448 		dtio->tls_use_sni = cfg->tls_use_sni;
449 #endif /* HAVE_SSL */
450 	}
451 #ifdef HAVE_GETTID
452 	dtio->thread_tid_log = cfg->log_thread_id;
453 #endif
454 	return 1;
455 }
456 
dt_io_thread_register_queue(struct dt_io_thread * dtio,struct dt_msg_queue * mq)457 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
458         struct dt_msg_queue* mq)
459 {
460 	struct dt_io_list_item* item = malloc(sizeof(*item));
461 	if(!item) return 0;
462 	lock_basic_lock(&mq->lock);
463 	mq->dtio = dtio;
464 	lock_basic_unlock(&mq->lock);
465 	item->queue = mq;
466 	item->next = dtio->io_list;
467 	dtio->io_list = item;
468 	dtio->io_list_iter = NULL;
469 	return 1;
470 }
471 
dt_io_thread_unregister_queue(struct dt_io_thread * dtio,struct dt_msg_queue * mq)472 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
473         struct dt_msg_queue* mq)
474 {
475 	struct dt_io_list_item* item, *prev=NULL;
476 	if(!dtio) return;
477 	item = dtio->io_list;
478 	while(item) {
479 		if(item->queue == mq) {
480 			/* found it */
481 			if(prev) prev->next = item->next;
482 			else dtio->io_list = item->next;
483 			/* the queue itself only registered, not deleted */
484 			lock_basic_lock(&item->queue->lock);
485 			item->queue->dtio = NULL;
486 			lock_basic_unlock(&item->queue->lock);
487 			free(item);
488 			dtio->io_list_iter = NULL;
489 			return;
490 		}
491 		prev = item;
492 		item = item->next;
493 	}
494 }
495 
496 /** pick a message from the queue, the routine locks and unlocks,
497  * returns true if there is a message */
dt_msg_queue_pop(struct dt_msg_queue * mq,void ** buf,size_t * len)498 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
499 	size_t* len)
500 {
501 	lock_basic_lock(&mq->lock);
502 	if(mq->first) {
503 		struct dt_msg_entry* entry = mq->first;
504 		mq->first = entry->next;
505 		if(!entry->next) mq->last = NULL;
506 		mq->cursize -= entry->len;
507 		mq->msgcount --;
508 		lock_basic_unlock(&mq->lock);
509 
510 		*buf = entry->buf;
511 		*len = entry->len;
512 		free(entry);
513 		return 1;
514 	}
515 	lock_basic_unlock(&mq->lock);
516 	return 0;
517 }
518 
519 /** find message in queue, false if no message, true if message to send */
dtio_find_in_queue(struct dt_io_thread * dtio,struct dt_msg_queue * mq)520 static int dtio_find_in_queue(struct dt_io_thread* dtio,
521 	struct dt_msg_queue* mq)
522 {
523 	void* buf=NULL;
524 	size_t len=0;
525 	if(dt_msg_queue_pop(mq, &buf, &len)) {
526 		dtio->cur_msg = buf;
527 		dtio->cur_msg_len = len;
528 		dtio->cur_msg_done = 0;
529 		dtio->cur_msg_len_done = 0;
530 		return 1;
531 	}
532 	return 0;
533 }
534 
535 /** find a new message to write, search message queues, false if none */
dtio_find_msg(struct dt_io_thread * dtio)536 static int dtio_find_msg(struct dt_io_thread* dtio)
537 {
538 	struct dt_io_list_item *spot, *item;
539 
540 	spot = dtio->io_list_iter;
541 	/* use the next queue for the next message lookup,
542 	 * if we hit the end(NULL) the NULL restarts the iter at start. */
543 	if(spot)
544 		dtio->io_list_iter = spot->next;
545 	else if(dtio->io_list)
546 		dtio->io_list_iter = dtio->io_list->next;
547 
548 	/* scan from spot to end-of-io_list */
549 	item = spot;
550 	while(item) {
551 		if(dtio_find_in_queue(dtio, item->queue))
552 			return 1;
553 		item = item->next;
554 	}
555 	/* scan starting at the start-of-list (to wrap around the end) */
556 	item = dtio->io_list;
557 	while(item) {
558 		if(dtio_find_in_queue(dtio, item->queue))
559 			return 1;
560 		item = item->next;
561 	}
562 	return 0;
563 }
564 
565 /** callback for the dnstap reconnect, to start reconnecting to output */
dtio_reconnect_timeout_cb(int ATTR_UNUSED (fd),short ATTR_UNUSED (bits),void * arg)566 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
567 	short ATTR_UNUSED(bits), void* arg)
568 {
569 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
570 	dtio->reconnect_is_added = 0;
571 	verbose(VERB_ALGO, "dnstap io: reconnect timer");
572 
573 	dtio_open_output(dtio);
574 	if(dtio->event) {
575 		if(!dtio_add_output_event_write(dtio))
576 			return;
577 		/* nothing wrong so far, wait on the output event */
578 		return;
579 	}
580 	/* exponential backoff and retry on timer */
581 	dtio_reconnect_enable(dtio);
582 }
583 
584 /** attempt to reconnect to the output, after a timeout */
dtio_reconnect_enable(struct dt_io_thread * dtio)585 static void dtio_reconnect_enable(struct dt_io_thread* dtio)
586 {
587 	struct timeval tv;
588 	int msec;
589 	if(dtio->want_to_exit) return;
590 	if(dtio->reconnect_is_added)
591 		return; /* already done */
592 
593 	/* exponential backoff, store the value for next timeout */
594 	msec = dtio->reconnect_timeout;
595 	if(msec == 0) {
596 		dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
597 	} else {
598 		dtio->reconnect_timeout = msec*2;
599 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
600 			dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
601 	}
602 	verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
603 		msec);
604 
605 	/* setup wait timer */
606 	memset(&tv, 0, sizeof(tv));
607 	tv.tv_sec = msec/1000;
608 	tv.tv_usec = (msec%1000)*1000;
609 	if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
610 		&dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
611 		log_err("dnstap io: could not reconnect ev timer add");
612 		return;
613 	}
614 	dtio->reconnect_is_added = 1;
615 }
616 
617 /** remove dtio reconnect timer */
dtio_reconnect_del(struct dt_io_thread * dtio)618 static void dtio_reconnect_del(struct dt_io_thread* dtio)
619 {
620 	if(!dtio->reconnect_is_added)
621 		return;
622 	ub_timer_del(dtio->reconnect_timer);
623 	dtio->reconnect_is_added = 0;
624 }
625 
626 /** clear the reconnect exponential backoff timer.
627  * We have successfully connected so we can try again with short timeouts. */
dtio_reconnect_clear(struct dt_io_thread * dtio)628 static void dtio_reconnect_clear(struct dt_io_thread* dtio)
629 {
630 	dtio->reconnect_timeout = 0;
631 	dtio_reconnect_del(dtio);
632 }
633 
634 /** reconnect slowly, because we already know we have to wait for a bit */
dtio_reconnect_slow(struct dt_io_thread * dtio,int msec)635 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
636 {
637 	dtio_reconnect_del(dtio);
638 	dtio->reconnect_timeout = msec;
639 	dtio_reconnect_enable(dtio);
640 }
641 
642 /** delete the current message in the dtio, and reset counters */
dtio_cur_msg_free(struct dt_io_thread * dtio)643 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
644 {
645 	free(dtio->cur_msg);
646 	dtio->cur_msg = NULL;
647 	dtio->cur_msg_len = 0;
648 	dtio->cur_msg_done = 0;
649 	dtio->cur_msg_len_done = 0;
650 }
651 
652 /** delete the buffer and counters used to read frame */
dtio_read_frame_free(struct dt_frame_read_buf * rb)653 static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
654 {
655 	if(rb->buf) {
656 		free(rb->buf);
657 		rb->buf = NULL;
658 	}
659 	rb->buf_count = 0;
660 	rb->buf_cap = 0;
661 	rb->frame_len = 0;
662 	rb->frame_len_done = 0;
663 	rb->control_frame = 0;
664 }
665 
666 /** del the output file descriptor event for listening */
dtio_del_output_event(struct dt_io_thread * dtio)667 static void dtio_del_output_event(struct dt_io_thread* dtio)
668 {
669 	if(!dtio->event_added)
670 		return;
671 	ub_event_del(dtio->event);
672 	dtio->event_added = 0;
673 	dtio->event_added_is_write = 0;
674 }
675 
676 /** close dtio socket and set it to -1 */
dtio_close_fd(struct dt_io_thread * dtio)677 static void dtio_close_fd(struct dt_io_thread* dtio)
678 {
679 	sock_close(dtio->fd);
680 	dtio->fd = -1;
681 }
682 
683 /** close and stop the output file descriptor event */
dtio_close_output(struct dt_io_thread * dtio)684 static void dtio_close_output(struct dt_io_thread* dtio)
685 {
686 	if(!dtio->event)
687 		return;
688 	ub_event_free(dtio->event);
689 	dtio->event = NULL;
690 	if(dtio->ssl) {
691 #ifdef HAVE_SSL
692 		SSL_shutdown(dtio->ssl);
693 		SSL_free(dtio->ssl);
694 		dtio->ssl = NULL;
695 #endif
696 	}
697 	dtio_close_fd(dtio);
698 
699 	/* if there is a (partial) message, discard it
700 	 * we cannot send (the remainder of) it, and a new
701 	 * connection needs to start with a control frame. */
702 	if(dtio->cur_msg) {
703 		dtio_cur_msg_free(dtio);
704 	}
705 
706 	dtio->ready_frame_sent = 0;
707 	dtio->accept_frame_received = 0;
708 	dtio_read_frame_free(&dtio->read_frame);
709 
710 	dtio_reconnect_enable(dtio);
711 }
712 
713 /** check for pending nonblocking connect errors,
714  * returns 1 if it is okay. -1 on error (close it), 0 to try later */
dtio_check_nb_connect(struct dt_io_thread * dtio)715 static int dtio_check_nb_connect(struct dt_io_thread* dtio)
716 {
717 	int error = 0;
718 	socklen_t len = (socklen_t)sizeof(error);
719 	if(!dtio->check_nb_connect)
720 		return 1; /* everything okay */
721 	if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
722 		&len) < 0) {
723 #ifndef USE_WINSOCK
724 		error = errno; /* on solaris errno is error */
725 #else
726 		error = WSAGetLastError();
727 #endif
728 	}
729 #ifndef USE_WINSOCK
730 #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
731 	if(error == EINPROGRESS || error == EWOULDBLOCK)
732 		return 0; /* try again later */
733 #endif
734 #else
735 	if(error == WSAEINPROGRESS) {
736 		return 0; /* try again later */
737 	} else if(error == WSAEWOULDBLOCK) {
738 		ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
739 			dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
740 		return 0; /* try again later */
741 	}
742 #endif
743 	if(error != 0) {
744 		char* to = dtio->socket_path;
745 		if(!to) to = dtio->ip_str;
746 		if(!to) to = "";
747 		log_err("dnstap io: failed to connect to \"%s\": %s",
748 			to, sock_strerror(error));
749 		return -1; /* error, close it */
750 	}
751 
752 	if(dtio->ip_str)
753 		verbose(VERB_DETAIL, "dnstap io: connected to %s",
754 			dtio->ip_str);
755 	else if(dtio->socket_path)
756 		verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
757 			dtio->socket_path);
758 	dtio_reconnect_clear(dtio);
759 	dtio->check_nb_connect = 0;
760 	return 1; /* everything okay */
761 }
762 
#ifdef HAVE_SSL
/** write to the ssl output.
 * returns the number of bytes written, 0 if nothing happened and the
 * write is to be tried again later, or -1 if the channel is to be
 * closed. */
static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
	size_t len)
{
	int written, want;

	ERR_clear_error();
	written = SSL_write(dtio->ssl, buf, len);
	if(written > 0)
		return written;

	want = SSL_get_error(dtio->ssl, written);
	switch(want) {
	case SSL_ERROR_ZERO_RETURN:
		/* closed */
		return -1;
	case SSL_ERROR_WANT_READ:
		/* we want a brief read event */
		dtio_enable_brief_read(dtio);
		return 0;
	case SSL_ERROR_WANT_WRITE:
		/* write again later */
		return 0;
	case SSL_ERROR_SYSCALL:
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			return -1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			return -1; /* silence reset by peer */
#endif
		if(errno != 0) {
			log_err("dnstap io, SSL_write syscall: %s",
				strerror(errno));
		}
		return -1;
	default:
		break;
	}
	/* any other SSL error is logged and closes the channel */
	log_crypto_err_io("dnstap io, could not SSL_write", want);
	return -1;
}
#endif /* HAVE_SSL */
806 
807 /** write buffer to output.
808  * returns number of bytes written, 0 if nothing happened,
809  * try again later, or -1 if the channel is to be closed. */
dtio_write_buf(struct dt_io_thread * dtio,uint8_t * buf,size_t len)810 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
811 	size_t len)
812 {
813 	ssize_t ret;
814 	if(dtio->fd == -1)
815 		return -1;
816 #ifdef HAVE_SSL
817 	if(dtio->ssl)
818 		return dtio_write_ssl(dtio, buf, len);
819 #endif
820 	ret = send(dtio->fd, (void*)buf, len, 0);
821 	if(ret == -1) {
822 #ifndef USE_WINSOCK
823 		if(errno == EINTR || errno == EAGAIN)
824 			return 0;
825 #else
826 		if(WSAGetLastError() == WSAEINPROGRESS)
827 			return 0;
828 		if(WSAGetLastError() == WSAEWOULDBLOCK) {
829 			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
830 				dtio->stop_flush_event:dtio->event),
831 				UB_EV_WRITE);
832 			return 0;
833 		}
834 #endif
835 		log_err("dnstap io: failed send: %s", sock_strerror(errno));
836 		return -1;
837 	}
838 	return ret;
839 }
840 
#ifdef HAVE_WRITEV
/** write the remainder of the length prefix and the message with one
 * writev, if possible.  On a write failure that is not temporary the
 * output is closed.
 * return true if message is done, false if incomplete */
static int dtio_write_with_writev(struct dt_io_thread* dtio)
{
	/* the length prefix is sent in network byte order */
	uint32_t netlen = htonl(dtio->cur_msg_len);
	struct iovec parts[2];
	ssize_t nwritten;

	/* first the part of the length that is not sent yet, then the
	 * whole message */
	parts[0].iov_base = ((uint8_t*)&netlen)+dtio->cur_msg_len_done;
	parts[0].iov_len = sizeof(netlen)-dtio->cur_msg_len_done;
	parts[1].iov_base = dtio->cur_msg;
	parts[1].iov_len = dtio->cur_msg_len;
	log_assert(parts[0].iov_len > 0);

	nwritten = writev(dtio->fd, parts, 2);
	if(nwritten == -1) {
#ifndef USE_WINSOCK
		if(errno == EINTR || errno == EAGAIN)
			return 0;
#else
		if(WSAGetLastError() == WSAEINPROGRESS)
			return 0;
		if(WSAGetLastError() == WSAEWOULDBLOCK) {
			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
				dtio->stop_flush_event:dtio->event),
				UB_EV_WRITE);
			return 0;
		}
#endif
		log_err("dnstap io: failed writev: %s", sock_strerror(errno));
		/* close the channel */
		dtio_del_output_event(dtio);
		dtio_close_output(dtio);
		return 0;
	}

	/* count the written bytes on the length first; what exceeds the
	 * 4 bytes of the length was message data */
	dtio->cur_msg_len_done += nwritten;
	if(dtio->cur_msg_len_done < 4)
		return 0;
	if(dtio->cur_msg_len_done > 4) {
		dtio->cur_msg_done = dtio->cur_msg_len_done-4;
		dtio->cur_msg_len_done = 4;
	}
	return dtio->cur_msg_done >= dtio->cur_msg_len;
}
#endif /* HAVE_WRITEV */
888 
889 /** write more of the length, preceding the data frame.
890  * return true if message is done, false if incomplete. */
dtio_write_more_of_len(struct dt_io_thread * dtio)891 static int dtio_write_more_of_len(struct dt_io_thread* dtio)
892 {
893 	uint32_t sendlen;
894 	int r;
895 	if(dtio->cur_msg_len_done >= 4)
896 		return 1;
897 #ifdef HAVE_WRITEV
898 	if(!dtio->ssl) {
899 		/* we try writev for everything.*/
900 		return dtio_write_with_writev(dtio);
901 	}
902 #endif /* HAVE_WRITEV */
903 	sendlen = htonl(dtio->cur_msg_len);
904 	r = dtio_write_buf(dtio,
905 		((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
906 		sizeof(sendlen)-dtio->cur_msg_len_done);
907 	if(r == -1) {
908 		/* close the channel */
909 		dtio_del_output_event(dtio);
910 		dtio_close_output(dtio);
911 		return 0;
912 	} else if(r == 0) {
913 		/* try again later */
914 		return 0;
915 	}
916 	dtio->cur_msg_len_done += r;
917 	if(dtio->cur_msg_len_done < 4)
918 		return 0;
919 	return 1;
920 }
921 
922 /** write more of the data frame.
923  * return true if message is done, false if incomplete. */
dtio_write_more_of_data(struct dt_io_thread * dtio)924 static int dtio_write_more_of_data(struct dt_io_thread* dtio)
925 {
926 	int r;
927 	if(dtio->cur_msg_done >= dtio->cur_msg_len)
928 		return 1;
929 	r = dtio_write_buf(dtio,
930 		((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
931 		dtio->cur_msg_len - dtio->cur_msg_done);
932 	if(r == -1) {
933 		/* close the channel */
934 		dtio_del_output_event(dtio);
935 		dtio_close_output(dtio);
936 		return 0;
937 	} else if(r == 0) {
938 		/* try again later */
939 		return 0;
940 	}
941 	dtio->cur_msg_done += r;
942 	if(dtio->cur_msg_done < dtio->cur_msg_len)
943 		return 0;
944 	return 1;
945 }
946 
947 /** write more of the current message. false if incomplete, true if
948  * the message is done */
dtio_write_more(struct dt_io_thread * dtio)949 static int dtio_write_more(struct dt_io_thread* dtio)
950 {
951 	if(dtio->cur_msg_len_done < 4) {
952 		if(!dtio_write_more_of_len(dtio))
953 			return 0;
954 	}
955 	if(dtio->cur_msg_done < dtio->cur_msg_len) {
956 		if(!dtio_write_more_of_data(dtio))
957 			return 0;
958 	}
959 	return 1;
960 }
961 
962 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
963  * -1: continue, >0: number of bytes read into buffer */
receive_bytes(struct dt_io_thread * dtio,void * buf,size_t len)964 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
965 	ssize_t r;
966 	r = recv(dtio->fd, (void*)buf, len, MSG_DONTWAIT);
967 	if(r == -1) {
968 		char* to = dtio->socket_path;
969 		if(!to) to = dtio->ip_str;
970 		if(!to) to = "";
971 #ifndef USE_WINSOCK
972 		if(errno == EINTR || errno == EAGAIN)
973 			return -1; /* try later */
974 #else
975 		if(WSAGetLastError() == WSAEINPROGRESS) {
976 			return -1; /* try later */
977 		} else if(WSAGetLastError() == WSAEWOULDBLOCK) {
978 			ub_winsock_tcp_wouldblock(
979 				(dtio->stop_flush_event?
980 				dtio->stop_flush_event:dtio->event),
981 				UB_EV_READ);
982 			return -1; /* try later */
983 		}
984 #endif
985 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
986 			verbosity < 4)
987 			return 0; /* no log retries on low verbosity */
988 		log_err("dnstap io: output closed, recv %s: %s", to,
989 			strerror(errno));
990 		/* and close below */
991 		return 0;
992 	}
993 	if(r == 0) {
994 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
995 			verbosity < 4)
996 			return 0; /* no log retries on low verbosity */
997 		verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
998 		/* and close below */
999 		return 0;
1000 	}
1001 	/* something was received */
1002 	return r;
1003 }
1004 
#ifdef HAVE_SSL
/** Receive bytes over TLS from dtio->ssl and store them in buf.
 * @param dtio: the dnstap io thread state.
 * @param buf: destination buffer.
 * @param len: capacity of buf in bytes.
 * @return 0: closed (clean shutdown or error),
 *	-1: continue later (TLS wants to read or write first),
 *	>0: number of bytes read into buf. */
static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
{
	int nread, want;

	ERR_clear_error();
	nread = SSL_read(dtio->ssl, buf, len);
	if(nread > 0)
		return nread;

	want = SSL_get_error(dtio->ssl, nread);
	switch(want) {
	case SSL_ERROR_ZERO_RETURN:
		/* the peer shut down the TLS session */
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4)
			return 0; /* no log retries on low verbosity */
		verbose(VERB_DETAIL,
			"dnstap io: output closed by the other side");
		return 0;
	case SSL_ERROR_WANT_READ:
		return -1; /* continue later */
	case SSL_ERROR_WANT_WRITE:
		/* TLS has to write before it can read, poll for write */
		(void)dtio_enable_brief_write(dtio);
		return -1;
	case SSL_ERROR_SYSCALL:
#ifdef ECONNRESET
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			errno == ECONNRESET && verbosity < 4)
			return 0; /* silence reset by peer */
#endif
		if(errno != 0)
			log_err("SSL_read syscall: %s", strerror(errno));
		verbose(VERB_DETAIL,
			"dnstap io: output closed by the other side");
		return 0;
	default:
		break;
	}
	/* any other TLS error */
	log_crypto_err_io("could not SSL_read", want);
	verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
	return 0;
}
#endif /* HAVE_SSL */
1049 
1050 /** check if the output fd has been closed,
1051  * it returns false if the stream is closed. */
dtio_check_close(struct dt_io_thread * dtio)1052 static int dtio_check_close(struct dt_io_thread* dtio)
1053 {
1054 	/* we don't want to read any packets, but if there are we can
1055 	 * discard the input (ignore it).  Ignore of unknown (control)
1056 	 * packets is okay for the framestream protocol.  And also, the
1057 	 * read call can return that the stream has been closed by the
1058 	 * other side. */
1059 	uint8_t buf[1024];
1060 	int r = -1;
1061 
1062 
1063 	if(dtio->fd == -1) return 0;
1064 
1065 	while(r != 0) {
1066 		/* not interested in buffer content, overwrite */
1067 		r = receive_bytes(dtio, (void*)buf, sizeof(buf));
1068 		if(r == -1)
1069 			return 1;
1070 	}
1071 	/* the other end has been closed */
1072 	/* close the channel */
1073 	dtio_del_output_event(dtio);
1074 	dtio_close_output(dtio);
1075 	return 0;
1076 }
1077 
1078 /** Read accept frame. Returns -1: continue reading, 0: closed,
1079  * 1: valid accept received. */
dtio_read_accept_frame(struct dt_io_thread * dtio)1080 static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1081 {
1082 	int r;
1083 	size_t read_frame_done;
1084 	while(dtio->read_frame.frame_len_done < 4) {
1085 #ifdef HAVE_SSL
1086 		if(dtio->ssl) {
1087 			r = ssl_read_bytes(dtio,
1088 				(uint8_t*)&dtio->read_frame.frame_len+
1089 				dtio->read_frame.frame_len_done,
1090 				4-dtio->read_frame.frame_len_done);
1091 		} else {
1092 #endif
1093 			r = receive_bytes(dtio,
1094 				(uint8_t*)&dtio->read_frame.frame_len+
1095 				dtio->read_frame.frame_len_done,
1096 				4-dtio->read_frame.frame_len_done);
1097 #ifdef HAVE_SSL
1098 		}
1099 #endif
1100 		if(r == -1)
1101 			return -1; /* continue reading */
1102 		if(r == 0) {
1103 			 /* connection closed */
1104 			goto close_connection;
1105 		}
1106 		dtio->read_frame.frame_len_done += r;
1107 		if(dtio->read_frame.frame_len_done < 4)
1108 			return -1; /* continue reading */
1109 
1110 		if(dtio->read_frame.frame_len == 0) {
1111 			dtio->read_frame.frame_len_done = 0;
1112 			dtio->read_frame.control_frame = 1;
1113 			continue;
1114 		}
1115 		dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1116 		if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1117 			verbose(VERB_OPS, "dnstap: received frame exceeds max "
1118 				"length of %d bytes, closing connection",
1119 				DTIO_RECV_FRAME_MAX_LEN);
1120 			goto close_connection;
1121 		}
1122 		dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1123 		dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1124 		if(!dtio->read_frame.buf) {
1125 			log_err("dnstap io: out of memory (creating read "
1126 				"buffer)");
1127 			goto close_connection;
1128 		}
1129 	}
1130 	if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1131 #ifdef HAVE_SSL
1132 		if(dtio->ssl) {
1133 			r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1134 				dtio->read_frame.buf_count,
1135 				dtio->read_frame.buf_cap-
1136 				dtio->read_frame.buf_count);
1137 		} else {
1138 #endif
1139 			r = receive_bytes(dtio, dtio->read_frame.buf+
1140 				dtio->read_frame.buf_count,
1141 				dtio->read_frame.buf_cap-
1142 				dtio->read_frame.buf_count);
1143 #ifdef HAVE_SSL
1144 		}
1145 #endif
1146 		if(r == -1)
1147 			return -1; /* continue reading */
1148 		if(r == 0) {
1149 			 /* connection closed */
1150 			goto close_connection;
1151 		}
1152 		dtio->read_frame.buf_count += r;
1153 		if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1154 			return -1; /* continue reading */
1155 	}
1156 
1157 	/* Complete frame received, check if this is a valid ACCEPT control
1158 	 * frame. */
1159 	if(dtio->read_frame.frame_len < 4) {
1160 		verbose(VERB_OPS, "dnstap: invalid data received");
1161 		goto close_connection;
1162 	}
1163 	if(sldns_read_uint32(dtio->read_frame.buf) !=
1164 		FSTRM_CONTROL_FRAME_ACCEPT) {
1165 		verbose(VERB_ALGO, "dnstap: invalid control type received, "
1166 			"ignored");
1167 		dtio->ready_frame_sent = 0;
1168 		dtio->accept_frame_received = 0;
1169 		dtio_read_frame_free(&dtio->read_frame);
1170 		return -1;
1171 	}
1172 	read_frame_done = 4; /* control frame type */
1173 
1174 	/* Iterate over control fields, ignore unknown types.
1175 	 * Need to be able to read at least 8 bytes (control field type +
1176 	 * length). */
1177 	while(read_frame_done+8 < dtio->read_frame.frame_len) {
1178 		uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1179 			read_frame_done);
1180 		uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1181 			read_frame_done + 4);
1182 		if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1183 			if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1184 				read_frame_done+8+len <=
1185 				dtio->read_frame.frame_len &&
1186 				memcmp(dtio->read_frame.buf + read_frame_done +
1187 					+ 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1188 				if(!dtio_control_start_send(dtio)) {
1189 					verbose(VERB_OPS, "dnstap io: out of "
1190 					 "memory while sending START frame");
1191 					goto close_connection;
1192 				}
1193 				dtio->accept_frame_received = 1;
1194 				if(!dtio_add_output_event_write(dtio))
1195 					goto close_connection;
1196 				return 1;
1197 			} else {
1198 				/* unknown content type */
1199 				verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1200 					"contains unknown content type, "
1201 					"closing connection");
1202 				goto close_connection;
1203 			}
1204 		}
1205 		/* unknown option, try next */
1206 		read_frame_done += 8+len;
1207 	}
1208 
1209 
1210 close_connection:
1211 	dtio_del_output_event(dtio);
1212 	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1213 	dtio_close_output(dtio);
1214 	return 0;
1215 }
1216 
1217 /** add the output file descriptor event for listening, read only */
dtio_add_output_event_read(struct dt_io_thread * dtio)1218 static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1219 {
1220 	if(!dtio->event)
1221 		return 0;
1222 	if(dtio->event_added && !dtio->event_added_is_write)
1223 		return 1;
1224 	/* we have to (re-)register the event */
1225 	if(dtio->event_added)
1226 		ub_event_del(dtio->event);
1227 	ub_event_del_bits(dtio->event, UB_EV_WRITE);
1228 	if(ub_event_add(dtio->event, NULL) != 0) {
1229 		log_err("dnstap io: out of memory (adding event)");
1230 		dtio->event_added = 0;
1231 		dtio->event_added_is_write = 0;
1232 		/* close output and start reattempts to open it */
1233 		dtio_close_output(dtio);
1234 		return 0;
1235 	}
1236 	dtio->event_added = 1;
1237 	dtio->event_added_is_write = 0;
1238 	return 1;
1239 }
1240 
1241 /** add the output file descriptor event for listening, read and write */
dtio_add_output_event_write(struct dt_io_thread * dtio)1242 static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1243 {
1244 	if(!dtio->event)
1245 		return 0;
1246 	if(dtio->event_added && dtio->event_added_is_write)
1247 		return 1;
1248 	/* we have to (re-)register the event */
1249 	if(dtio->event_added)
1250 		ub_event_del(dtio->event);
1251 	ub_event_add_bits(dtio->event, UB_EV_WRITE);
1252 	if(ub_event_add(dtio->event, NULL) != 0) {
1253 		log_err("dnstap io: out of memory (adding event)");
1254 		dtio->event_added = 0;
1255 		dtio->event_added_is_write = 0;
1256 		/* close output and start reattempts to open it */
1257 		dtio_close_output(dtio);
1258 		return 0;
1259 	}
1260 	dtio->event_added = 1;
1261 	dtio->event_added_is_write = 1;
1262 	return 1;
1263 }
1264 
1265 /** put the dtio thread to sleep */
dtio_sleep(struct dt_io_thread * dtio)1266 static void dtio_sleep(struct dt_io_thread* dtio)
1267 {
1268 	/* unregister the event polling for write, because there is
1269 	 * nothing to be written */
1270 	(void)dtio_add_output_event_read(dtio);
1271 
1272 	/* Set wakeuptimer enabled off; so that the next worker thread that
1273 	 * wants to log starts a timer if needed, since the writer thread
1274 	 * has gone to sleep. */
1275 	lock_basic_lock(&dtio->wakeup_timer_lock);
1276 	dtio->wakeup_timer_enabled = 0;
1277 	lock_basic_unlock(&dtio->wakeup_timer_lock);
1278 }
1279 
#ifdef HAVE_SSL
/** Enable the brief read condition: TLS has to read before it can go
 * on, so poll for read only.  During a stop flush the stop flush event
 * is re-registered, otherwise the regular output event.
 * @param dtio: the dnstap io thread state.
 * @return 1 on success, 0 if the event could not be registered. */
static int dtio_enable_brief_read(struct dt_io_thread* dtio)
{
	dtio->ssl_brief_read = 1;
	if(!dtio->stop_flush_event)
		return dtio_add_output_event_read(dtio);

	/* stop flush in progress, drop the write bit from its event */
	ub_event_del(dtio->stop_flush_event);
	ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
	if(ub_event_add(dtio->stop_flush_event, NULL) == 0)
		return 1;
	log_err("dnstap io, stop flush, could not ub_event_add");
	return 0;
}
#endif /* HAVE_SSL */
1297 
#ifdef HAVE_SSL
/** Disable the brief read condition and poll for write again.  During a
 * stop flush the stop flush event is re-registered, otherwise the
 * regular output event.
 * @param dtio: the dnstap io thread state.
 * @return 1 on success, 0 if the event could not be registered. */
static int dtio_disable_brief_read(struct dt_io_thread* dtio)
{
	dtio->ssl_brief_read = 0;
	if(!dtio->stop_flush_event)
		return dtio_add_output_event_write(dtio);

	/* stop flush in progress, put the write bit back on its event */
	ub_event_del(dtio->stop_flush_event);
	ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
	if(ub_event_add(dtio->stop_flush_event, NULL) == 0)
		return 1;
	log_err("dnstap io, stop flush, could not ub_event_add");
	return 0;
}
#endif /* HAVE_SSL */
1315 
#ifdef HAVE_SSL
/** Enable the brief write condition: set the flag and register the
 * output event for write.
 * @param dtio: the dnstap io thread state.
 * @return the result of dtio_add_output_event_write. */
static int dtio_enable_brief_write(struct dt_io_thread* dtio)
{
	int registered;
	dtio->ssl_brief_write = 1;
	registered = dtio_add_output_event_write(dtio);
	return registered;
}
#endif /* HAVE_SSL */
1324 
#ifdef HAVE_SSL
/** Disable the brief write condition: clear the flag and register the
 * output event for read only.
 * @param dtio: the dnstap io thread state.
 * @return the result of dtio_add_output_event_read. */
static int dtio_disable_brief_write(struct dt_io_thread* dtio)
{
	int registered;
	dtio->ssl_brief_write = 0;
	registered = dtio_add_output_event_read(dtio);
	return registered;
}
#endif /* HAVE_SSL */
1333 
#ifdef HAVE_SSL
/** Check the peer verification after the ssl handshake.
 * When the ssl object does not have the verify peer flag, the
 * connection is accepted unauthenticated.  Otherwise the verify result
 * must be X509_V_OK and a peer certificate must be present.
 * This routine only logs; closing the connection is left to the caller.
 * @param dtio: the dnstap io thread state.
 * @return 1 if the connection can be used, 0 if it has to be closed. */
static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
{
	X509* cert;
	const char* peername = NULL;
	int verified;

	if(!(SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
		/* unauthenticated, the verify peer flag was not set
		 * in ssl when the ssl object was created from ssl_ctx */
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
			dtio->ip_str);
		return 1;
	}

	verified = (SSL_get_verify_result(dtio->ssl) == X509_V_OK);
#ifdef HAVE_SSL_GET1_PEER_CERTIFICATE
	cert = SSL_get1_peer_certificate(dtio->ssl);
#else
	cert = SSL_get_peer_certificate(dtio->ssl);
#endif

	if(!verified) {
		/* log what the peer presented, if anything, then refuse */
		if(cert) {
			log_cert(VERB_ALGO, "dnstap io, peer certificate",
				cert);
			X509_free(cert);
		}
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
			"failed: failed to authenticate", dtio->ip_str);
		return 0;
	}
	if(!cert) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection failed no certificate", dtio->ip_str);
		return 0;
	}

	log_cert(VERB_ALGO, "dnstap io, peer certificate", cert);
#ifdef HAVE_SSL_GET0_PEERNAME
	peername = SSL_get0_peername(dtio->ssl);
#endif
	if(peername) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection to %s authenticated", dtio->ip_str,
			peername);
	} else {
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection authenticated", dtio->ip_str);
	}
	X509_free(cert);
	return 1;
}
#endif /* HAVE_SSL */
1394 
#ifdef HAVE_SSL
/** Perform (a step of) the ssl handshake.
 * A pending brief read condition is cleared first.  When the handshake
 * fails, or the peer does not pass dtio_ssl_check_peer, the output is
 * closed and a slow reconnect is scheduled.
 * @param dtio: the dnstap io thread state.
 * @param info: stop flush state if called during the stop flush, in
 *	which case the stop flush is exited on failure; NULL otherwise.
 * @return 1 if the handshake is done and okay, 0 to stop for now (more
 *	io is needed first, or the connection was closed). */
static int dtio_ssl_handshake(struct dt_io_thread* dtio,
	struct stop_flush_info* info)
{
	int r, want;

	if(dtio->ssl_brief_read) {
		/* assume the brief read condition is satisfied,
		 * if we need more or again, we can set it again */
		if(!dtio_disable_brief_read(dtio)) {
			if(info) dtio_stop_flush_exit(info);
			return 0;
		}
	}
	if(dtio->ssl_handshake_done)
		return 1;

	ERR_clear_error();
	r = SSL_do_handshake(dtio->ssl);
	if(r == 1) {
		/* handshake completed, check peer verification */
		dtio->ssl_handshake_done = 1;
		if(dtio_ssl_check_peer(dtio))
			return 1;
		goto closed;
	}

	want = SSL_get_error(dtio->ssl, r);
	if(want == SSL_ERROR_WANT_READ) {
		/* we want to read on the connection */
		if(!dtio_enable_brief_read(dtio)) {
			if(info) dtio_stop_flush_exit(info);
		}
		return 0;
	}
	if(want == SSL_ERROR_WANT_WRITE) {
		/* we want to write on the connection */
		return 0;
	}
	if(r == 0)
		goto closed;

	if(want == SSL_ERROR_SYSCALL) {
		/* SYSCALL and errno==0 means closed uncleanly */
		int silent = 0;
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			silent = 1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			silent = 1; /* silence reset by peer */
#endif
		if(errno == 0)
			silent = 1;
		if(!silent)
			log_err("dnstap io, SSL_handshake syscall: %s",
				strerror(errno));
	} else {
		unsigned long err = ERR_get_error();
		if(!squelch_err_ssl_handshake(err)) {
			log_crypto_err_io_code("dnstap io, ssl handshake failed",
				want, err);
			verbose(VERB_OPS, "dnstap io, ssl handshake failed "
				"from %s", dtio->ip_str);
		}
	}

closed:
	/* shared failure path: leave the stop flush if there is one,
	 * close the output and reconnect slowly */
	if(info) dtio_stop_flush_exit(info);
	dtio_del_output_event(dtio);
	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
	dtio_close_output(dtio);
	return 0;
}
#endif /* HAVE_SSL */
1486 
1487 /** callback for the dnstap events, to write to the output */
dtio_output_cb(int ATTR_UNUSED (fd),short bits,void * arg)1488 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1489 {
1490 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1491 	int i;
1492 
1493 	if(dtio->check_nb_connect) {
1494 		int connect_err = dtio_check_nb_connect(dtio);
1495 		if(connect_err == -1) {
1496 			/* close the channel */
1497 			dtio_del_output_event(dtio);
1498 			dtio_close_output(dtio);
1499 			return;
1500 		} else if(connect_err == 0) {
1501 			/* try again later */
1502 			return;
1503 		}
1504 		/* nonblocking connect check passed, continue */
1505 	}
1506 
1507 #ifdef HAVE_SSL
1508 	if(dtio->ssl &&
1509 		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1510 		if(!dtio_ssl_handshake(dtio, NULL))
1511 			return;
1512 	}
1513 #endif
1514 
1515 	if((bits&UB_EV_READ) || dtio->ssl_brief_write) {
1516 #ifdef HAVE_SSL
1517 		if(dtio->ssl_brief_write)
1518 			(void)dtio_disable_brief_write(dtio);
1519 #endif
1520 		if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1521 			if(dtio_read_accept_frame(dtio) <= 0)
1522 				return;
1523 		} else if(!dtio_check_close(dtio))
1524 			return;
1525 	}
1526 
1527 	/* loop to process a number of messages.  This improves throughput,
1528 	 * because selecting on write-event if not needed for busy messages
1529 	 * (dnstap log) generation and if they need to all be written back.
1530 	 * The write event is usually not blocked up.  But not forever,
1531 	 * because the event loop needs to stay responsive for other events.
1532 	 * If there are no (more) messages, or if the output buffers get
1533 	 * full, it returns out of the loop. */
1534 	for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1535 		/* see if there are messages that need writing */
1536 		if(!dtio->cur_msg) {
1537 			if(!dtio_find_msg(dtio)) {
1538 				if(i == 0) {
1539 					/* no messages on the first iteration,
1540 					 * the queues are all empty */
1541 					dtio_sleep(dtio);
1542 					/* After putting to sleep, see if
1543 					 * a message is in a message queue,
1544 					 * if so, resume service. Stops a
1545 					 * race condition where a thread could
1546 					 * have one message but the dtio
1547 					 * also just went to sleep. With the
1548 					 * message queued between the
1549 					 * dtio_find_msg and dtio_sleep
1550 					 * calls. */
1551 					if(dtio_find_msg(dtio)) {
1552 						if(!dtio_add_output_event_write(dtio))
1553 							return;
1554 					}
1555 				}
1556 				if(!dtio->cur_msg)
1557 					return; /* nothing to do */
1558 			}
1559 		}
1560 
1561 		/* write it */
1562 		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1563 			if(!dtio_write_more(dtio))
1564 				return;
1565 		}
1566 
1567 		/* done with the current message */
1568 		dtio_cur_msg_free(dtio);
1569 
1570 		/* If this is a bidirectional stream the first message will be
1571 		 * the READY control frame. We can only continue writing after
1572 		 * receiving an ACCEPT control frame. */
1573 		if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1574 			dtio->ready_frame_sent = 1;
1575 			(void)dtio_add_output_event_read(dtio);
1576 			break;
1577 		}
1578 	}
1579 }
1580 
1581 /** callback for the dnstap commandpipe, to stop the dnstap IO */
dtio_cmd_cb(int fd,short ATTR_UNUSED (bits),void * arg)1582 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1583 {
1584 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1585 	uint8_t cmd;
1586 	ssize_t r;
1587 	if(dtio->want_to_exit)
1588 		return;
1589 	r = read(fd, &cmd, sizeof(cmd));
1590 	if(r == -1) {
1591 #ifndef USE_WINSOCK
1592 		if(errno == EINTR || errno == EAGAIN)
1593 			return; /* ignore this */
1594 #else
1595 		if(WSAGetLastError() == WSAEINPROGRESS)
1596 			return;
1597 		if(WSAGetLastError() == WSAEWOULDBLOCK)
1598 			return;
1599 #endif
1600 		log_err("dnstap io: failed to read: %s", sock_strerror(errno));
1601 		/* and then fall through to quit the thread */
1602 	} else if(r == 0) {
1603 		verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1604 	} else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1605 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1606 	} else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1607 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1608 
1609 		if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1610 			verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1611 				"waiting for ACCEPT control frame");
1612 			return;
1613 		}
1614 
1615 		/* reregister event */
1616 		if(!dtio_add_output_event_write(dtio))
1617 			return;
1618 		return;
1619 	} else if(r == 1) {
1620 		verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1621 	}
1622 	dtio->want_to_exit = 1;
1623 	if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1624 		!= 0) {
1625 		log_err("dnstap io: could not loopexit");
1626 	}
1627 }
1628 
1629 #ifndef THREADS_DISABLED
1630 /** setup the event base for the dnstap io thread */
dtio_setup_base(struct dt_io_thread * dtio,time_t * secs,struct timeval * now)1631 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1632 	struct timeval* now)
1633 {
1634 	memset(now, 0, sizeof(*now));
1635 	dtio->event_base = ub_default_event_base(0, secs, now);
1636 	if(!dtio->event_base) {
1637 		fatal_exit("dnstap io: could not create event_base");
1638 	}
1639 }
1640 #endif /* THREADS_DISABLED */
1641 
1642 /** setup the cmd event for dnstap io */
dtio_setup_cmd(struct dt_io_thread * dtio)1643 static void dtio_setup_cmd(struct dt_io_thread* dtio)
1644 {
1645 	struct ub_event* cmdev;
1646 	fd_set_nonblock(dtio->commandpipe[0]);
1647 	cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1648 		UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1649 	if(!cmdev) {
1650 		fatal_exit("dnstap io: out of memory");
1651 	}
1652 	dtio->command_event = cmdev;
1653 	if(ub_event_add(cmdev, NULL) != 0) {
1654 		fatal_exit("dnstap io: out of memory (adding event)");
1655 	}
1656 }
1657 
1658 /** setup the reconnect event for dnstap io */
dtio_setup_reconnect(struct dt_io_thread * dtio)1659 static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1660 {
1661 	dtio_reconnect_clear(dtio);
1662 	dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1663 		UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1664 	if(!dtio->reconnect_timer) {
1665 		fatal_exit("dnstap io: out of memory");
1666 	}
1667 }
1668 
/**
 * State for the stop flush: the short-lived event loop that is run at
 * exit to write out the last frame and the STOP control frame.
 */
struct stop_flush_info {
	/** event base that the stop flush runs on */
	struct ub_event_base* base;
	/** set once the exit of the stop flush event base has been
	 * requested, so that it is requested only once */
	int want_to_exit_flush;
	/** set when the stop flush timer has fired */
	int timer_done;
	/** the dnstap io thread that is being flushed */
	struct dt_io_thread* dtio;
	/** the STOP control frame to send */
	void* stop_frame;
	/** length of the STOP control frame, in bytes */
	size_t stop_frame_len;
	/** number of bytes of the STOP control frame written so far */
	size_t stop_frame_done;
};
1688 
1689 /** exit the stop flush base */
dtio_stop_flush_exit(struct stop_flush_info * info)1690 static void dtio_stop_flush_exit(struct stop_flush_info* info)
1691 {
1692 	if(info->want_to_exit_flush)
1693 		return;
1694 	info->want_to_exit_flush = 1;
1695 	if(ub_event_base_loopexit(info->base) != 0) {
1696 		log_err("dnstap io: could not loopexit");
1697 	}
1698 }
1699 
1700 /** send the stop control,
1701  * return true if completed the frame. */
dtio_control_stop_send(struct stop_flush_info * info)1702 static int dtio_control_stop_send(struct stop_flush_info* info)
1703 {
1704 	struct dt_io_thread* dtio = info->dtio;
1705 	int r;
1706 	if(info->stop_frame_done >= info->stop_frame_len)
1707 		return 1;
1708 	r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1709 		info->stop_frame_done, info->stop_frame_len -
1710 		info->stop_frame_done);
1711 	if(r == -1) {
1712 		verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1713 		dtio_stop_flush_exit(info);
1714 		return 0;
1715 	}
1716 	if(r == 0) {
1717 		/* try again later, or timeout */
1718 		return 0;
1719 	}
1720 	info->stop_frame_done += r;
1721 	if(info->stop_frame_done < info->stop_frame_len)
1722 		return 0; /* not done yet */
1723 	return 1;
1724 }
1725 
dtio_stop_timer_cb(int ATTR_UNUSED (fd),short ATTR_UNUSED (bits),void * arg)1726 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1727 	void* arg)
1728 {
1729 	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1730 	if(info->want_to_exit_flush)
1731 		return;
1732 	verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1733 	info->timer_done = 1;
1734 	dtio_stop_flush_exit(info);
1735 }
1736 
dtio_stop_ev_cb(int ATTR_UNUSED (fd),short bits,void * arg)1737 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1738 {
1739 	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1740 	struct dt_io_thread* dtio = info->dtio;
1741 	if(info->want_to_exit_flush)
1742 		return;
1743 	if(dtio->check_nb_connect) {
1744 		/* we don't start the stop_flush if connect still
1745 		 * in progress, but the check code is here, just in case */
1746 		int connect_err = dtio_check_nb_connect(dtio);
1747 		if(connect_err == -1) {
1748 			/* close the channel, exit the stop flush */
1749 			dtio_stop_flush_exit(info);
1750 			dtio_del_output_event(dtio);
1751 			dtio_close_output(dtio);
1752 			return;
1753 		} else if(connect_err == 0) {
1754 			/* try again later */
1755 			return;
1756 		}
1757 		/* nonblocking connect check passed, continue */
1758 	}
1759 #ifdef HAVE_SSL
1760 	if(dtio->ssl &&
1761 		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1762 		if(!dtio_ssl_handshake(dtio, info))
1763 			return;
1764 	}
1765 #endif
1766 
1767 	if((bits&UB_EV_READ)) {
1768 		if(!dtio_check_close(dtio)) {
1769 			if(dtio->fd == -1) {
1770 				verbose(VERB_ALGO, "dnstap io: "
1771 					"stop flush: output closed");
1772 				dtio_stop_flush_exit(info);
1773 			}
1774 			return;
1775 		}
1776 	}
1777 	/* write remainder of last frame */
1778 	if(dtio->cur_msg) {
1779 		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1780 			if(!dtio_write_more(dtio)) {
1781 				if(dtio->fd == -1) {
1782 					verbose(VERB_ALGO, "dnstap io: "
1783 						"stop flush: output closed");
1784 					dtio_stop_flush_exit(info);
1785 				}
1786 				return;
1787 			}
1788 		}
1789 		verbose(VERB_ALGO, "dnstap io: stop flush completed "
1790 			"last frame");
1791 		dtio_cur_msg_free(dtio);
1792 	}
1793 	/* write stop frame */
1794 	if(info->stop_frame_done < info->stop_frame_len) {
1795 		if(!dtio_control_stop_send(info))
1796 			return;
1797 		verbose(VERB_ALGO, "dnstap io: stop flush completed "
1798 			"stop control frame");
1799 	}
1800 	/* when last frame and stop frame are sent, exit */
1801 	dtio_stop_flush_exit(info);
1802 }
1803 
1804 /** flush at end, last packet and stop control */
dtio_control_stop_flush(struct dt_io_thread * dtio)1805 static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1806 {
1807 	/* briefly attempt to flush the previous packet to the output,
1808 	 * this could be a partial packet, or even the start control frame */
1809 	time_t secs = 0;
1810 	struct timeval now;
1811 	struct stop_flush_info info;
1812 	struct timeval tv;
1813 	struct ub_event* timer, *stopev;
1814 
1815 	if(dtio->fd == -1 || dtio->check_nb_connect) {
1816 		/* no connection or we have just connected, so nothing is
1817 		 * sent yet, so nothing to stop or flush */
1818 		return;
1819 	}
1820 	if(dtio->ssl && !dtio->ssl_handshake_done) {
1821 		/* no SSL connection has been established yet */
1822 		return;
1823 	}
1824 
1825 	memset(&info, 0, sizeof(info));
1826 	memset(&now, 0, sizeof(now));
1827 	info.dtio = dtio;
1828 	info.base = ub_default_event_base(0, &secs, &now);
1829 	if(!info.base) {
1830 		log_err("dnstap io: malloc failure");
1831 		return;
1832 	}
1833 	timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1834 		&dtio_stop_timer_cb, &info);
1835 	if(!timer) {
1836 		log_err("dnstap io: malloc failure");
1837 		ub_event_base_free(info.base);
1838 		return;
1839 	}
1840 	memset(&tv, 0, sizeof(tv));
1841 	tv.tv_sec = 2;
1842 	if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1843 		&tv) != 0) {
1844 		log_err("dnstap io: cannot event_timer_add");
1845 		ub_event_free(timer);
1846 		ub_event_base_free(info.base);
1847 		return;
1848 	}
1849 	stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1850 		UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1851 	if(!stopev) {
1852 		log_err("dnstap io: malloc failure");
1853 		ub_timer_del(timer);
1854 		ub_event_free(timer);
1855 		ub_event_base_free(info.base);
1856 		return;
1857 	}
1858 	if(ub_event_add(stopev, NULL) != 0) {
1859 		log_err("dnstap io: cannot event_add");
1860 		ub_event_free(stopev);
1861 		ub_timer_del(timer);
1862 		ub_event_free(timer);
1863 		ub_event_base_free(info.base);
1864 		return;
1865 	}
1866 	info.stop_frame = fstrm_create_control_frame_stop(
1867 		&info.stop_frame_len);
1868 	if(!info.stop_frame) {
1869 		log_err("dnstap io: malloc failure");
1870 		ub_event_del(stopev);
1871 		ub_event_free(stopev);
1872 		ub_timer_del(timer);
1873 		ub_event_free(timer);
1874 		ub_event_base_free(info.base);
1875 		return;
1876 	}
1877 	dtio->stop_flush_event = stopev;
1878 
1879 	/* wait briefly, or until finished */
1880 	verbose(VERB_ALGO, "dnstap io: stop flush started");
1881 	if(ub_event_base_dispatch(info.base) < 0) {
1882 		log_err("dnstap io: dispatch flush failed, errno is %s",
1883 			strerror(errno));
1884 	}
1885 	verbose(VERB_ALGO, "dnstap io: stop flush ended");
1886 	free(info.stop_frame);
1887 	dtio->stop_flush_event = NULL;
1888 	ub_event_del(stopev);
1889 	ub_event_free(stopev);
1890 	ub_timer_del(timer);
1891 	ub_event_free(timer);
1892 	ub_event_base_free(info.base);
1893 }
1894 
1895 /** perform desetup and free stuff when the dnstap io thread exits */
dtio_desetup(struct dt_io_thread * dtio)1896 static void dtio_desetup(struct dt_io_thread* dtio)
1897 {
1898 	dtio_control_stop_flush(dtio);
1899 	dtio_del_output_event(dtio);
1900 	dtio_close_output(dtio);
1901 	ub_event_del(dtio->command_event);
1902 	ub_event_free(dtio->command_event);
1903 #ifndef USE_WINSOCK
1904 	close(dtio->commandpipe[0]);
1905 #else
1906 	_close(dtio->commandpipe[0]);
1907 #endif
1908 	dtio->commandpipe[0] = -1;
1909 	dtio_reconnect_del(dtio);
1910 	ub_event_free(dtio->reconnect_timer);
1911 	dtio_cur_msg_free(dtio);
1912 #ifndef THREADS_DISABLED
1913 	ub_event_base_free(dtio->event_base);
1914 #endif
1915 }
1916 
1917 /** setup a start control message */
dtio_control_start_send(struct dt_io_thread * dtio)1918 static int dtio_control_start_send(struct dt_io_thread* dtio)
1919 {
1920 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1921 	dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1922 		&dtio->cur_msg_len);
1923 	if(!dtio->cur_msg) {
1924 		return 0;
1925 	}
1926 	/* setup to send the control message */
1927 	/* set that the buffer needs to be sent, but the length
1928 	 * of that buffer is already written, that way the buffer can
1929 	 * start with 0 length and then the length of the control frame
1930 	 * in it */
1931 	dtio->cur_msg_done = 0;
1932 	dtio->cur_msg_len_done = 4;
1933 	return 1;
1934 }
1935 
1936 /** setup a ready control message */
dtio_control_ready_send(struct dt_io_thread * dtio)1937 static int dtio_control_ready_send(struct dt_io_thread* dtio)
1938 {
1939 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1940 	dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1941 		&dtio->cur_msg_len);
1942 	if(!dtio->cur_msg) {
1943 		return 0;
1944 	}
1945 	/* setup to send the control message */
1946 	/* set that the buffer needs to be sent, but the length
1947 	 * of that buffer is already written, that way the buffer can
1948 	 * start with 0 length and then the length of the control frame
1949 	 * in it */
1950 	dtio->cur_msg_done = 0;
1951 	dtio->cur_msg_len_done = 4;
1952 	return 1;
1953 }
1954 
/**
 * Open the output file descriptor for af_local (unix domain socket).
 * Creates a nonblocking stream socket in dtio->fd and connects it to
 * dtio->socket_path.
 * @param dtio: the io thread.
 * @return 1 on success, 0 on failure (the fd is closed again if the
 *	connect failed).
 */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifdef HAVE_SYS_UN_H
	struct sockaddr_un sa;
	int quiet;

	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("dnstap io: failed to create socket: %s",
			sock_strerror(errno));
		return 0;
	}

	memset(&sa, 0, sizeof(sa));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	/* this member exists on BSDs, not Linux */
	sa.sun_len = (unsigned)sizeof(sa);
#endif
	sa.sun_family = AF_LOCAL;
	/* length is 92-108, 104 on FreeBSD */
	(void)strlcpy(sa.sun_path, dtio->socket_path, sizeof(sa.sun_path));
	fd_set_nonblock(dtio->fd);

	if(connect(dtio->fd, (struct sockaddr*)&sa, (socklen_t)sizeof(sa))
		!= -1)
		return 1;

	/* connect failed; no log retries on low verbosity */
	quiet = (dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
		verbosity < 4);
	if(!quiet) {
		log_err("dnstap io: failed to connect to \"%s\": %s",
			dtio->socket_path, sock_strerror(errno));
	}
	dtio_close_fd(dtio);
	return 0;
#else
	log_err("cannot create af_local socket");
	return 0;
#endif /* HAVE_SYS_UN_H */
}
1994 
1995 /** open the output file descriptor for af_inet and af_inet6 */
dtio_open_output_tcp(struct dt_io_thread * dtio)1996 static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1997 {
1998 	struct sockaddr_storage addr;
1999 	socklen_t addrlen;
2000 	memset(&addr, 0, sizeof(addr));
2001 	addrlen = (socklen_t)sizeof(addr);
2002 
2003 	if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen, UNBOUND_DNS_PORT)) {
2004 		log_err("could not parse IP '%s'", dtio->ip_str);
2005 		return 0;
2006 	}
2007 	dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
2008 	if(dtio->fd == -1) {
2009 		log_err("can't create socket: %s", sock_strerror(errno));
2010 		return 0;
2011 	}
2012 	fd_set_nonblock(dtio->fd);
2013 	if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
2014 		if(errno == EINPROGRESS)
2015 			return 1; /* wait until connect done*/
2016 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
2017 			verbosity < 4) {
2018 			dtio_close_fd(dtio);
2019 			return 0; /* no log retries on low verbosity */
2020 		}
2021 #ifndef USE_WINSOCK
2022 		if(tcp_connect_errno_needs_log(
2023 			(struct sockaddr *)&addr, addrlen)) {
2024 			log_err("dnstap io: failed to connect to %s: %s",
2025 				dtio->ip_str, strerror(errno));
2026 		}
2027 #else
2028 		if(WSAGetLastError() == WSAEINPROGRESS ||
2029 			WSAGetLastError() == WSAEWOULDBLOCK)
2030 			return 1; /* wait until connect done*/
2031 		if(tcp_connect_errno_needs_log(
2032 			(struct sockaddr *)&addr, addrlen)) {
2033 			log_err("dnstap io: failed to connect to %s: %s",
2034 				dtio->ip_str, wsa_strerror(WSAGetLastError()));
2035 		}
2036 #endif
2037 		dtio_close_fd(dtio);
2038 		return 0;
2039 	}
2040 	return 1;
2041 }
2042 
2043 /** setup the SSL structure for new connection */
dtio_setup_ssl(struct dt_io_thread * dtio)2044 static int dtio_setup_ssl(struct dt_io_thread* dtio)
2045 {
2046 	dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
2047 	if(!dtio->ssl) return 0;
2048 	dtio->ssl_handshake_done = 0;
2049 	dtio->ssl_brief_read = 0;
2050 
2051 	if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
2052 		dtio->tls_use_sni)) {
2053 		return 0;
2054 	}
2055 	return 1;
2056 }
2057 
2058 /** open the output file descriptor */
dtio_open_output(struct dt_io_thread * dtio)2059 static void dtio_open_output(struct dt_io_thread* dtio)
2060 {
2061 	struct ub_event* ev;
2062 	if(dtio->upstream_is_unix) {
2063 		if(!dtio_open_output_local(dtio)) {
2064 			dtio_reconnect_enable(dtio);
2065 			return;
2066 		}
2067 	} else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
2068 		if(!dtio_open_output_tcp(dtio)) {
2069 			dtio_reconnect_enable(dtio);
2070 			return;
2071 		}
2072 		if(dtio->upstream_is_tls) {
2073 			if(!dtio_setup_ssl(dtio)) {
2074 				dtio_close_fd(dtio);
2075 				dtio_reconnect_enable(dtio);
2076 				return;
2077 			}
2078 		}
2079 	}
2080 	dtio->check_nb_connect = 1;
2081 
2082 	/* the EV_READ is to read ACCEPT control messages, and catch channel
2083 	 * close. EV_WRITE is to write packets */
2084 	ev = ub_event_new(dtio->event_base, dtio->fd,
2085 		UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
2086 		dtio);
2087 	if(!ev) {
2088 		log_err("dnstap io: out of memory");
2089 		if(dtio->ssl) {
2090 #ifdef HAVE_SSL
2091 			SSL_free(dtio->ssl);
2092 			dtio->ssl = NULL;
2093 #endif
2094 		}
2095 		dtio_close_fd(dtio);
2096 		dtio_reconnect_enable(dtio);
2097 		return;
2098 	}
2099 	dtio->event = ev;
2100 
2101 	/* setup protocol control message to start */
2102 	if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2103 		(dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2104 		log_err("dnstap io: out of memory");
2105 		ub_event_free(dtio->event);
2106 		dtio->event = NULL;
2107 		if(dtio->ssl) {
2108 #ifdef HAVE_SSL
2109 			SSL_free(dtio->ssl);
2110 			dtio->ssl = NULL;
2111 #endif
2112 		}
2113 		dtio_close_fd(dtio);
2114 		dtio_reconnect_enable(dtio);
2115 		return;
2116 	}
2117 }
2118 
/**
 * Perform the setup of the writer on the established event_base:
 * command event, reconnect timer, and the output with its write event.
 * @param dtio: the io thread, with event_base set.
 */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
	dtio_setup_cmd(dtio);
	dtio_setup_reconnect(dtio);
	dtio_open_output(dtio);
	/* nothing further to do here, whether or not the write event
	 * could be added */
	(void)dtio_add_output_event_write(dtio);
}
2128 
2129 #ifndef THREADS_DISABLED
2130 /** the IO thread function for the DNSTAP IO */
dnstap_io(void * arg)2131 static void* dnstap_io(void* arg)
2132 {
2133 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2134 	time_t secs = 0;
2135 	struct timeval now;
2136 	const char name[16] = "unbound/dnstap"; /* seems to be the safest size
2137 						   between different OSes */
2138 
2139 #if defined(HAVE_GETTID) && !defined(THREADS_DISABLED)
2140 	dtio->thread_tid = gettid();
2141 	if(dtio->thread_tid_log)
2142 		log_thread_set(&dtio->thread_tid);
2143 	else
2144 #endif
2145 		log_thread_set(&dtio->threadnum);
2146 
2147 	ub_thread_setname(dtio->tid, name);
2148 
2149 	/* setup */
2150 	verbose(VERB_ALGO, "start dnstap io thread");
2151 	dtio_setup_base(dtio, &secs, &now);
2152 	dtio_setup_on_base(dtio);
2153 
2154 	/* run */
2155 	if(ub_event_base_dispatch(dtio->event_base) < 0) {
2156 		log_err("dnstap io: dispatch failed, errno is %s",
2157 			strerror(errno));
2158 	}
2159 
2160 	/* cleanup */
2161 	verbose(VERB_ALGO, "stop dnstap io thread");
2162 	dtio_desetup(dtio);
2163 	return NULL;
2164 }
2165 #endif /* THREADS_DISABLED */
2166 
dt_io_thread_start(struct dt_io_thread * dtio,void * event_base_nothr,int numworkers)2167 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2168 	int numworkers)
2169 {
2170 	/* set up the thread, can fail */
2171 #ifndef USE_WINSOCK
2172 	if(pipe(dtio->commandpipe) == -1) {
2173 		log_err("failed to create pipe: %s", strerror(errno));
2174 		return 0;
2175 	}
2176 #else
2177 	if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2178 		log_err("failed to create _pipe: %s",
2179 			wsa_strerror(WSAGetLastError()));
2180 		return 0;
2181 	}
2182 #endif
2183 
2184 	/* start the thread */
2185 	dtio->threadnum = numworkers+1;
2186 	dtio->started = 1;
2187 #ifndef THREADS_DISABLED
2188 	ub_thread_create(&dtio->tid, dnstap_io, dtio);
2189 	(void)event_base_nothr;
2190 #else
2191 	dtio->event_base = event_base_nothr;
2192 	dtio_setup_on_base(dtio);
2193 #endif
2194 	return 1;
2195 }
2196 
dt_io_thread_stop(struct dt_io_thread * dtio)2197 void dt_io_thread_stop(struct dt_io_thread* dtio)
2198 {
2199 #ifndef THREADS_DISABLED
2200 	uint8_t cmd = DTIO_COMMAND_STOP;
2201 #endif
2202 	if(!dtio) return;
2203 	if(!dtio->started) return;
2204 	verbose(VERB_ALGO, "dnstap io: send stop cmd");
2205 
2206 #ifndef THREADS_DISABLED
2207 	while(1) {
2208 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2209 		if(r == -1) {
2210 #ifndef USE_WINSOCK
2211 			if(errno == EINTR || errno == EAGAIN)
2212 				continue;
2213 #else
2214 			if(WSAGetLastError() == WSAEINPROGRESS)
2215 				continue;
2216 			if(WSAGetLastError() == WSAEWOULDBLOCK)
2217 				continue;
2218 #endif
2219 			log_err("dnstap io stop: write: %s",
2220 				sock_strerror(errno));
2221 			break;
2222 		}
2223 		break;
2224 	}
2225 	dtio->started = 0;
2226 #endif /* THREADS_DISABLED */
2227 
2228 #ifndef USE_WINSOCK
2229 	close(dtio->commandpipe[1]);
2230 #else
2231 	_close(dtio->commandpipe[1]);
2232 #endif
2233 	dtio->commandpipe[1] = -1;
2234 #ifndef THREADS_DISABLED
2235 	ub_thread_join(dtio->tid);
2236 #else
2237 	dtio->want_to_exit = 1;
2238 	dtio_desetup(dtio);
2239 #endif
2240 }
2241