xref: /freebsd/contrib/unbound/dnstap/dtstream.c (revision 3078531de10dcae44b253a35125c949ff4235284)
1 /*
2  * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3  *
4  * Copyright (c) 2020, NLnet Labs. All rights reserved.
5  *
6  * This software is open source.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * Redistributions of source code must retain the above copyright notice,
13  * this list of conditions and the following disclaimer.
14  *
15  * Redistributions in binary form must reproduce the above copyright notice,
16  * this list of conditions and the following disclaimer in the documentation
17  * and/or other materials provided with the distribution.
18  *
19  * Neither the name of the NLNET LABS nor the names of its contributors may
20  * be used to endorse or promote products derived from this software without
21  * specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34  *
35  */
36 
37 /**
38  * \file
39  *
40  * An implementation of the Frame Streams data transport protocol for
41  * the Unbound DNSTAP message logging facility.
42  */
43 
44 #include "config.h"
45 #include "dnstap/dtstream.h"
46 #include "dnstap/dnstap_fstrm.h"
47 #include "util/config_file.h"
48 #include "util/ub_event.h"
49 #include "util/net_help.h"
50 #include "services/outside_network.h"
51 #include "sldns/sbuffer.h"
52 #ifdef HAVE_SYS_UN_H
53 #include <sys/un.h>
54 #endif
55 #include <fcntl.h>
56 #ifdef HAVE_OPENSSL_SSL_H
57 #include <openssl/ssl.h>
58 #endif
59 #ifdef HAVE_OPENSSL_ERR_H
60 #include <openssl/err.h>
61 #endif
62 
63 /** number of messages to process in one output callback */
64 #define DTIO_MESSAGES_PER_CALLBACK 100
65 /** the msec to wait for reconnect (if not immediate, the first attempt) */
66 #define DTIO_RECONNECT_TIMEOUT_MIN 10
67 /** the msec to wait for reconnect max after backoff */
68 #define DTIO_RECONNECT_TIMEOUT_MAX 1000
69 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
70 #define DTIO_RECONNECT_TIMEOUT_SLOW 1000
71 /** number of messages before wakeup of thread */
72 #define DTIO_MSG_FOR_WAKEUP 32
73 
74 /** maximum length of received frame */
75 #define DTIO_RECV_FRAME_MAX_LEN 1000
76 
77 struct stop_flush_info;
78 /** DTIO command channel commands */
79 enum {
80 	/** DTIO command channel stop */
81 	DTIO_COMMAND_STOP = 0,
82 	/** DTIO command channel wakeup */
83 	DTIO_COMMAND_WAKEUP = 1
84 } dtio_channel_command;
85 
86 /** open the output channel */
87 static void dtio_open_output(struct dt_io_thread* dtio);
88 /** add output event for read and write */
89 static int dtio_add_output_event_write(struct dt_io_thread* dtio);
90 /** start reconnection attempts */
91 static void dtio_reconnect_enable(struct dt_io_thread* dtio);
92 /** stop from stop_flush event loop */
93 static void dtio_stop_flush_exit(struct stop_flush_info* info);
94 /** setup a start control message */
95 static int dtio_control_start_send(struct dt_io_thread* dtio);
96 #ifdef HAVE_SSL
97 /** enable briefly waiting for a read event, for SSL negotiation */
98 static int dtio_enable_brief_read(struct dt_io_thread* dtio);
99 /** enable briefly waiting for a write event, for SSL negotiation */
100 static int dtio_enable_brief_write(struct dt_io_thread* dtio);
101 #endif
102 
103 struct dt_msg_queue*
104 dt_msg_queue_create(struct comm_base* base)
105 {
106 	struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
107 	if(!mq) return NULL;
108 	mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
109 		about 1 M should contain 64K messages with some overhead,
110 		or a whole bunch smaller ones */
111 	mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112 	if(!mq->wakeup_timer) {
113 		free(mq);
114 		return NULL;
115 	}
116 	lock_basic_init(&mq->lock);
117 	lock_protect(&mq->lock, mq, sizeof(*mq));
118 	return mq;
119 }
120 
121 /** clear the message list, caller must hold the lock */
122 static void
123 dt_msg_queue_clear(struct dt_msg_queue* mq)
124 {
125 	struct dt_msg_entry* e = mq->first, *next=NULL;
126 	while(e) {
127 		next = e->next;
128 		free(e->buf);
129 		free(e);
130 		e = next;
131 	}
132 	mq->first = NULL;
133 	mq->last = NULL;
134 	mq->cursize = 0;
135 	mq->msgcount = 0;
136 }
137 
138 void
139 dt_msg_queue_delete(struct dt_msg_queue* mq)
140 {
141 	if(!mq) return;
142 	lock_basic_destroy(&mq->lock);
143 	dt_msg_queue_clear(mq);
144 	comm_timer_delete(mq->wakeup_timer);
145 	free(mq);
146 }
147 
148 /** make the dtio wake up by sending a wakeup command */
149 static void dtio_wakeup(struct dt_io_thread* dtio)
150 {
151 	uint8_t cmd = DTIO_COMMAND_WAKEUP;
152 	if(!dtio) return;
153 	if(!dtio->started) return;
154 
155 	while(1) {
156 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
157 		if(r == -1) {
158 #ifndef USE_WINSOCK
159 			if(errno == EINTR || errno == EAGAIN)
160 				continue;
161 #else
162 			if(WSAGetLastError() == WSAEINPROGRESS)
163 				continue;
164 			if(WSAGetLastError() == WSAEWOULDBLOCK)
165 				continue;
166 #endif
167 			log_err("dnstap io wakeup: write: %s",
168 				sock_strerror(errno));
169 			break;
170 		}
171 		break;
172 	}
173 }
174 
175 void
176 mq_wakeup_cb(void* arg)
177 {
178 	struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179 	/* even if the dtio is already active, because perhaps much
180 	 * traffic suddenly, we leave the timer running to save on
181 	 * managing it, the once a second timer is less work then
182 	 * starting and stopping the timer frequently */
183 	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
184 	mq->dtio->wakeup_timer_enabled = 0;
185 	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
186 	dtio_wakeup(mq->dtio);
187 }
188 
189 /** start timer to wakeup dtio because there is content in the queue */
190 static void
191 dt_msg_queue_start_timer(struct dt_msg_queue* mq, int wakeupnow)
192 {
193 	struct timeval tv = {0};
194 	/* Start a timer to process messages to be logged.
195 	 * If we woke up the dtio thread for every message, the wakeup
196 	 * messages take up too much processing power.  If the queue
197 	 * fills up the wakeup happens immediately.  The timer wakes it up
198 	 * if there are infrequent messages to log. */
199 
200 	/* we cannot start a timer in dtio thread, because it is a different
201 	 * thread and its event base is in use by the other thread, it would
202 	 * give race conditions if we tried to modify its event base,
203 	 * and locks would wait until it woke up, and this is what we do. */
204 
205 	/* do not start the timer if a timer already exists, perhaps
206 	 * in another worker.  So this variable is protected by a lock in
207 	 * dtio. */
208 
209 	/* If we need to wakeupnow, 0 the timer to force the callback. */
210 	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
211 	if(mq->dtio->wakeup_timer_enabled) {
212 		if(wakeupnow) {
213 			comm_timer_set(mq->wakeup_timer, &tv);
214 		}
215 		lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
216 		return;
217 	}
218 	mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
219 
220 	/* start the timer, in mq, in the event base of our worker */
221 	if(!wakeupnow) {
222 		tv.tv_sec = 1;
223 		tv.tv_usec = 0;
224 	}
225 	comm_timer_set(mq->wakeup_timer, &tv);
226 	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
227 }
228 
229 void
230 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
231 {
232 	int wakeupnow = 0, wakeupstarttimer = 0;
233 	struct dt_msg_entry* entry;
234 
235 	/* check conditions */
236 	if(!buf) return;
237 	if(len == 0) {
238 		/* it is not possible to log entries with zero length,
239 		 * because the framestream protocol does not carry it.
240 		 * However the protobuf serialization does not create zero
241 		 * length datagrams for dnstap, so this should not happen. */
242 		free(buf);
243 		return;
244 	}
245 	if(!mq) {
246 		free(buf);
247 		return;
248 	}
249 
250 	/* allocate memory for queue entry */
251 	entry = malloc(sizeof(*entry));
252 	if(!entry) {
253 		log_err("out of memory logging dnstap");
254 		free(buf);
255 		return;
256 	}
257 	entry->next = NULL;
258 	entry->buf = buf;
259 	entry->len = len;
260 
261 	/* acquire lock */
262 	lock_basic_lock(&mq->lock);
263 	/* if list was empty, start timer for (eventual) wakeup */
264 	if(mq->first == NULL)
265 		wakeupstarttimer = 1;
266 	/* if list contains more than wakeupnum elements, wakeup now,
267 	 * or if list is (going to be) almost full */
268 	if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
269 		(mq->cursize < mq->maxsize * 9 / 10 &&
270 		mq->cursize+len >= mq->maxsize * 9 / 10))
271 		wakeupnow = 1;
272 	/* see if it is going to fit */
273 	if(mq->cursize + len > mq->maxsize) {
274 		/* buffer full, or congested. */
275 		/* drop */
276 		lock_basic_unlock(&mq->lock);
277 		free(buf);
278 		free(entry);
279 		return;
280 	}
281 	mq->cursize += len;
282 	mq->msgcount ++;
283 	/* append to list */
284 	if(mq->last) {
285 		mq->last->next = entry;
286 	} else {
287 		mq->first = entry;
288 	}
289 	mq->last = entry;
290 	/* release lock */
291 	lock_basic_unlock(&mq->lock);
292 
293 	if(wakeupnow || wakeupstarttimer) {
294 		dt_msg_queue_start_timer(mq, wakeupnow);
295 	}
296 }
297 
298 struct dt_io_thread* dt_io_thread_create(void)
299 {
300 	struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
301 	lock_basic_init(&dtio->wakeup_timer_lock);
302 	lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
303 		sizeof(dtio->wakeup_timer_enabled));
304 	return dtio;
305 }
306 
307 void dt_io_thread_delete(struct dt_io_thread* dtio)
308 {
309 	struct dt_io_list_item* item, *nextitem;
310 	if(!dtio) return;
311 	lock_basic_destroy(&dtio->wakeup_timer_lock);
312 	item=dtio->io_list;
313 	while(item) {
314 		nextitem = item->next;
315 		free(item);
316 		item = nextitem;
317 	}
318 	free(dtio->socket_path);
319 	free(dtio->ip_str);
320 	free(dtio->tls_server_name);
321 	free(dtio->client_key_file);
322 	free(dtio->client_cert_file);
323 	if(dtio->ssl_ctx) {
324 #ifdef HAVE_SSL
325 		SSL_CTX_free(dtio->ssl_ctx);
326 #endif
327 	}
328 	free(dtio);
329 }
330 
331 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
332 {
333 	if(!cfg->dnstap) {
334 		log_warn("cannot setup dnstap because dnstap-enable is no");
335 		return 0;
336 	}
337 
338 	/* what type of connectivity do we have */
339 	if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
340 		if(cfg->dnstap_tls)
341 			dtio->upstream_is_tls = 1;
342 		else	dtio->upstream_is_tcp = 1;
343 	} else {
344 		dtio->upstream_is_unix = 1;
345 	}
346 	dtio->is_bidirectional = cfg->dnstap_bidirectional;
347 
348 	if(dtio->upstream_is_unix) {
349 		char* nm;
350 		if(!cfg->dnstap_socket_path ||
351 			cfg->dnstap_socket_path[0]==0) {
352 			log_err("dnstap setup: no dnstap-socket-path for "
353 				"socket connect");
354 			return 0;
355 		}
356 		nm = cfg->dnstap_socket_path;
357 		if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm,
358 			cfg->chrootdir, strlen(cfg->chrootdir)) == 0)
359 			nm += strlen(cfg->chrootdir);
360 		free(dtio->socket_path);
361 		dtio->socket_path = strdup(nm);
362 		if(!dtio->socket_path) {
363 			log_err("dnstap setup: malloc failure");
364 			return 0;
365 		}
366 	}
367 
368 	if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
369 		if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
370 			log_err("dnstap setup: no dnstap-ip for TCP connect");
371 			return 0;
372 		}
373 		free(dtio->ip_str);
374 		dtio->ip_str = strdup(cfg->dnstap_ip);
375 		if(!dtio->ip_str) {
376 			log_err("dnstap setup: malloc failure");
377 			return 0;
378 		}
379 	}
380 
381 	if(dtio->upstream_is_tls) {
382 #ifdef HAVE_SSL
383 		if(cfg->dnstap_tls_server_name &&
384 			cfg->dnstap_tls_server_name[0]) {
385 			free(dtio->tls_server_name);
386 			dtio->tls_server_name = strdup(
387 				cfg->dnstap_tls_server_name);
388 			if(!dtio->tls_server_name) {
389 				log_err("dnstap setup: malloc failure");
390 				return 0;
391 			}
392 			if(!check_auth_name_for_ssl(dtio->tls_server_name))
393 				return 0;
394 		}
395 		if(cfg->dnstap_tls_client_key_file &&
396 			cfg->dnstap_tls_client_key_file[0]) {
397 			dtio->use_client_certs = 1;
398 			free(dtio->client_key_file);
399 			dtio->client_key_file = strdup(
400 				cfg->dnstap_tls_client_key_file);
401 			if(!dtio->client_key_file) {
402 				log_err("dnstap setup: malloc failure");
403 				return 0;
404 			}
405 			if(!cfg->dnstap_tls_client_cert_file ||
406 				cfg->dnstap_tls_client_cert_file[0]==0) {
407 				log_err("dnstap setup: client key "
408 					"authentication enabled with "
409 					"dnstap-tls-client-key-file, but "
410 					"no dnstap-tls-client-cert-file "
411 					"is given");
412 				return 0;
413 			}
414 			free(dtio->client_cert_file);
415 			dtio->client_cert_file = strdup(
416 				cfg->dnstap_tls_client_cert_file);
417 			if(!dtio->client_cert_file) {
418 				log_err("dnstap setup: malloc failure");
419 				return 0;
420 			}
421 		} else {
422 			dtio->use_client_certs = 0;
423 			dtio->client_key_file = NULL;
424 			dtio->client_cert_file = NULL;
425 		}
426 
427 		if(cfg->dnstap_tls_cert_bundle) {
428 			dtio->ssl_ctx = connect_sslctx_create(
429 				dtio->client_key_file,
430 				dtio->client_cert_file,
431 				cfg->dnstap_tls_cert_bundle, 0);
432 		} else {
433 			dtio->ssl_ctx = connect_sslctx_create(
434 				dtio->client_key_file,
435 				dtio->client_cert_file,
436 				cfg->tls_cert_bundle, cfg->tls_win_cert);
437 		}
438 		if(!dtio->ssl_ctx) {
439 			log_err("could not setup SSL CTX");
440 			return 0;
441 		}
442 		dtio->tls_use_sni = cfg->tls_use_sni;
443 #endif /* HAVE_SSL */
444 	}
445 	return 1;
446 }
447 
448 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
449         struct dt_msg_queue* mq)
450 {
451 	struct dt_io_list_item* item = malloc(sizeof(*item));
452 	if(!item) return 0;
453 	lock_basic_lock(&mq->lock);
454 	mq->dtio = dtio;
455 	lock_basic_unlock(&mq->lock);
456 	item->queue = mq;
457 	item->next = dtio->io_list;
458 	dtio->io_list = item;
459 	dtio->io_list_iter = NULL;
460 	return 1;
461 }
462 
463 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
464         struct dt_msg_queue* mq)
465 {
466 	struct dt_io_list_item* item, *prev=NULL;
467 	if(!dtio) return;
468 	item = dtio->io_list;
469 	while(item) {
470 		if(item->queue == mq) {
471 			/* found it */
472 			if(prev) prev->next = item->next;
473 			else dtio->io_list = item->next;
474 			/* the queue itself only registered, not deleted */
475 			lock_basic_lock(&item->queue->lock);
476 			item->queue->dtio = NULL;
477 			lock_basic_unlock(&item->queue->lock);
478 			free(item);
479 			dtio->io_list_iter = NULL;
480 			return;
481 		}
482 		prev = item;
483 		item = item->next;
484 	}
485 }
486 
487 /** pick a message from the queue, the routine locks and unlocks,
488  * returns true if there is a message */
489 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
490 	size_t* len)
491 {
492 	lock_basic_lock(&mq->lock);
493 	if(mq->first) {
494 		struct dt_msg_entry* entry = mq->first;
495 		mq->first = entry->next;
496 		if(!entry->next) mq->last = NULL;
497 		mq->cursize -= entry->len;
498 		mq->msgcount --;
499 		lock_basic_unlock(&mq->lock);
500 
501 		*buf = entry->buf;
502 		*len = entry->len;
503 		free(entry);
504 		return 1;
505 	}
506 	lock_basic_unlock(&mq->lock);
507 	return 0;
508 }
509 
510 /** find message in queue, false if no message, true if message to send */
511 static int dtio_find_in_queue(struct dt_io_thread* dtio,
512 	struct dt_msg_queue* mq)
513 {
514 	void* buf=NULL;
515 	size_t len=0;
516 	if(dt_msg_queue_pop(mq, &buf, &len)) {
517 		dtio->cur_msg = buf;
518 		dtio->cur_msg_len = len;
519 		dtio->cur_msg_done = 0;
520 		dtio->cur_msg_len_done = 0;
521 		return 1;
522 	}
523 	return 0;
524 }
525 
526 /** find a new message to write, search message queues, false if none */
527 static int dtio_find_msg(struct dt_io_thread* dtio)
528 {
529 	struct dt_io_list_item *spot, *item;
530 
531 	spot = dtio->io_list_iter;
532 	/* use the next queue for the next message lookup,
533 	 * if we hit the end(NULL) the NULL restarts the iter at start. */
534 	if(spot)
535 		dtio->io_list_iter = spot->next;
536 	else if(dtio->io_list)
537 		dtio->io_list_iter = dtio->io_list->next;
538 
539 	/* scan from spot to end-of-io_list */
540 	item = spot;
541 	while(item) {
542 		if(dtio_find_in_queue(dtio, item->queue))
543 			return 1;
544 		item = item->next;
545 	}
546 	/* scan starting at the start-of-list (to wrap around the end) */
547 	item = dtio->io_list;
548 	while(item) {
549 		if(dtio_find_in_queue(dtio, item->queue))
550 			return 1;
551 		item = item->next;
552 	}
553 	return 0;
554 }
555 
556 /** callback for the dnstap reconnect, to start reconnecting to output */
557 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
558 	short ATTR_UNUSED(bits), void* arg)
559 {
560 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
561 	dtio->reconnect_is_added = 0;
562 	verbose(VERB_ALGO, "dnstap io: reconnect timer");
563 
564 	dtio_open_output(dtio);
565 	if(dtio->event) {
566 		if(!dtio_add_output_event_write(dtio))
567 			return;
568 		/* nothing wrong so far, wait on the output event */
569 		return;
570 	}
571 	/* exponential backoff and retry on timer */
572 	dtio_reconnect_enable(dtio);
573 }
574 
575 /** attempt to reconnect to the output, after a timeout */
576 static void dtio_reconnect_enable(struct dt_io_thread* dtio)
577 {
578 	struct timeval tv;
579 	int msec;
580 	if(dtio->want_to_exit) return;
581 	if(dtio->reconnect_is_added)
582 		return; /* already done */
583 
584 	/* exponential backoff, store the value for next timeout */
585 	msec = dtio->reconnect_timeout;
586 	if(msec == 0) {
587 		dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
588 	} else {
589 		dtio->reconnect_timeout = msec*2;
590 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
591 			dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
592 	}
593 	verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
594 		msec);
595 
596 	/* setup wait timer */
597 	memset(&tv, 0, sizeof(tv));
598 	tv.tv_sec = msec/1000;
599 	tv.tv_usec = (msec%1000)*1000;
600 	if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
601 		&dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
602 		log_err("dnstap io: could not reconnect ev timer add");
603 		return;
604 	}
605 	dtio->reconnect_is_added = 1;
606 }
607 
608 /** remove dtio reconnect timer */
609 static void dtio_reconnect_del(struct dt_io_thread* dtio)
610 {
611 	if(!dtio->reconnect_is_added)
612 		return;
613 	ub_timer_del(dtio->reconnect_timer);
614 	dtio->reconnect_is_added = 0;
615 }
616 
617 /** clear the reconnect exponential backoff timer.
618  * We have successfully connected so we can try again with short timeouts. */
619 static void dtio_reconnect_clear(struct dt_io_thread* dtio)
620 {
621 	dtio->reconnect_timeout = 0;
622 	dtio_reconnect_del(dtio);
623 }
624 
625 /** reconnect slowly, because we already know we have to wait for a bit */
626 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
627 {
628 	dtio_reconnect_del(dtio);
629 	dtio->reconnect_timeout = msec;
630 	dtio_reconnect_enable(dtio);
631 }
632 
633 /** delete the current message in the dtio, and reset counters */
634 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
635 {
636 	free(dtio->cur_msg);
637 	dtio->cur_msg = NULL;
638 	dtio->cur_msg_len = 0;
639 	dtio->cur_msg_done = 0;
640 	dtio->cur_msg_len_done = 0;
641 }
642 
643 /** delete the buffer and counters used to read frame */
644 static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
645 {
646 	if(rb->buf) {
647 		free(rb->buf);
648 		rb->buf = NULL;
649 	}
650 	rb->buf_count = 0;
651 	rb->buf_cap = 0;
652 	rb->frame_len = 0;
653 	rb->frame_len_done = 0;
654 	rb->control_frame = 0;
655 }
656 
657 /** del the output file descriptor event for listening */
658 static void dtio_del_output_event(struct dt_io_thread* dtio)
659 {
660 	if(!dtio->event_added)
661 		return;
662 	ub_event_del(dtio->event);
663 	dtio->event_added = 0;
664 	dtio->event_added_is_write = 0;
665 }
666 
667 /** close dtio socket and set it to -1 */
668 static void dtio_close_fd(struct dt_io_thread* dtio)
669 {
670 	sock_close(dtio->fd);
671 	dtio->fd = -1;
672 }
673 
674 /** close and stop the output file descriptor event */
675 static void dtio_close_output(struct dt_io_thread* dtio)
676 {
677 	if(!dtio->event)
678 		return;
679 	ub_event_free(dtio->event);
680 	dtio->event = NULL;
681 	if(dtio->ssl) {
682 #ifdef HAVE_SSL
683 		SSL_shutdown(dtio->ssl);
684 		SSL_free(dtio->ssl);
685 		dtio->ssl = NULL;
686 #endif
687 	}
688 	dtio_close_fd(dtio);
689 
690 	/* if there is a (partial) message, discard it
691 	 * we cannot send (the remainder of) it, and a new
692 	 * connection needs to start with a control frame. */
693 	if(dtio->cur_msg) {
694 		dtio_cur_msg_free(dtio);
695 	}
696 
697 	dtio->ready_frame_sent = 0;
698 	dtio->accept_frame_received = 0;
699 	dtio_read_frame_free(&dtio->read_frame);
700 
701 	dtio_reconnect_enable(dtio);
702 }
703 
704 /** check for pending nonblocking connect errors,
705  * returns 1 if it is okay. -1 on error (close it), 0 to try later */
706 static int dtio_check_nb_connect(struct dt_io_thread* dtio)
707 {
708 	int error = 0;
709 	socklen_t len = (socklen_t)sizeof(error);
710 	if(!dtio->check_nb_connect)
711 		return 1; /* everything okay */
712 	if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
713 		&len) < 0) {
714 #ifndef USE_WINSOCK
715 		error = errno; /* on solaris errno is error */
716 #else
717 		error = WSAGetLastError();
718 #endif
719 	}
720 #ifndef USE_WINSOCK
721 #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
722 	if(error == EINPROGRESS || error == EWOULDBLOCK)
723 		return 0; /* try again later */
724 #endif
725 #else
726 	if(error == WSAEINPROGRESS) {
727 		return 0; /* try again later */
728 	} else if(error == WSAEWOULDBLOCK) {
729 		ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
730 			dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
731 		return 0; /* try again later */
732 	}
733 #endif
734 	if(error != 0) {
735 		char* to = dtio->socket_path;
736 		if(!to) to = dtio->ip_str;
737 		if(!to) to = "";
738 		log_err("dnstap io: failed to connect to \"%s\": %s",
739 			to, sock_strerror(error));
740 		return -1; /* error, close it */
741 	}
742 
743 	if(dtio->ip_str)
744 		verbose(VERB_DETAIL, "dnstap io: connected to %s",
745 			dtio->ip_str);
746 	else if(dtio->socket_path)
747 		verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
748 			dtio->socket_path);
749 	dtio_reconnect_clear(dtio);
750 	dtio->check_nb_connect = 0;
751 	return 1; /* everything okay */
752 }
753 
754 #ifdef HAVE_SSL
755 /** write to ssl output
756  * returns number of bytes written, 0 if nothing happened,
757  * try again later, or -1 if the channel is to be closed. */
758 static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
759 	size_t len)
760 {
761 	int r;
762 	ERR_clear_error();
763 	r = SSL_write(dtio->ssl, buf, len);
764 	if(r <= 0) {
765 		int want = SSL_get_error(dtio->ssl, r);
766 		if(want == SSL_ERROR_ZERO_RETURN) {
767 			/* closed */
768 			return -1;
769 		} else if(want == SSL_ERROR_WANT_READ) {
770 			/* we want a brief read event */
771 			dtio_enable_brief_read(dtio);
772 			return 0;
773 		} else if(want == SSL_ERROR_WANT_WRITE) {
774 			/* write again later */
775 			return 0;
776 		} else if(want == SSL_ERROR_SYSCALL) {
777 #ifdef EPIPE
778 			if(errno == EPIPE && verbosity < 2)
779 				return -1; /* silence 'broken pipe' */
780 #endif
781 #ifdef ECONNRESET
782 			if(errno == ECONNRESET && verbosity < 2)
783 				return -1; /* silence reset by peer */
784 #endif
785 			if(errno != 0) {
786 				log_err("dnstap io, SSL_write syscall: %s",
787 					strerror(errno));
788 			}
789 			return -1;
790 		}
791 		log_crypto_err("dnstap io, could not SSL_write");
792 		return -1;
793 	}
794 	return r;
795 }
796 #endif /* HAVE_SSL */
797 
798 /** write buffer to output.
799  * returns number of bytes written, 0 if nothing happened,
800  * try again later, or -1 if the channel is to be closed. */
801 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
802 	size_t len)
803 {
804 	ssize_t ret;
805 	if(dtio->fd == -1)
806 		return -1;
807 #ifdef HAVE_SSL
808 	if(dtio->ssl)
809 		return dtio_write_ssl(dtio, buf, len);
810 #endif
811 	ret = send(dtio->fd, (void*)buf, len, 0);
812 	if(ret == -1) {
813 #ifndef USE_WINSOCK
814 		if(errno == EINTR || errno == EAGAIN)
815 			return 0;
816 #else
817 		if(WSAGetLastError() == WSAEINPROGRESS)
818 			return 0;
819 		if(WSAGetLastError() == WSAEWOULDBLOCK) {
820 			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
821 				dtio->stop_flush_event:dtio->event),
822 				UB_EV_WRITE);
823 			return 0;
824 		}
825 #endif
826 		log_err("dnstap io: failed send: %s", sock_strerror(errno));
827 		return -1;
828 	}
829 	return ret;
830 }
831 
832 #ifdef HAVE_WRITEV
833 /** write with writev, len and message, in one write, if possible.
834  * return true if message is done, false if incomplete */
835 static int dtio_write_with_writev(struct dt_io_thread* dtio)
836 {
837 	uint32_t sendlen = htonl(dtio->cur_msg_len);
838 	struct iovec iov[2];
839 	ssize_t r;
840 	iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
841 	iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
842 	iov[1].iov_base = dtio->cur_msg;
843 	iov[1].iov_len = dtio->cur_msg_len;
844 	log_assert(iov[0].iov_len > 0);
845 	r = writev(dtio->fd, iov, 2);
846 	if(r == -1) {
847 #ifndef USE_WINSOCK
848 		if(errno == EINTR || errno == EAGAIN)
849 			return 0;
850 #else
851 		if(WSAGetLastError() == WSAEINPROGRESS)
852 			return 0;
853 		if(WSAGetLastError() == WSAEWOULDBLOCK) {
854 			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
855 				dtio->stop_flush_event:dtio->event),
856 				UB_EV_WRITE);
857 			return 0;
858 		}
859 #endif
860 		log_err("dnstap io: failed writev: %s", sock_strerror(errno));
861 		/* close the channel */
862 		dtio_del_output_event(dtio);
863 		dtio_close_output(dtio);
864 		return 0;
865 	}
866 	/* written r bytes */
867 	dtio->cur_msg_len_done += r;
868 	if(dtio->cur_msg_len_done < 4)
869 		return 0;
870 	if(dtio->cur_msg_len_done > 4) {
871 		dtio->cur_msg_done = dtio->cur_msg_len_done-4;
872 		dtio->cur_msg_len_done = 4;
873 	}
874 	if(dtio->cur_msg_done < dtio->cur_msg_len)
875 		return 0;
876 	return 1;
877 }
878 #endif /* HAVE_WRITEV */
879 
880 /** write more of the length, preceding the data frame.
881  * return true if message is done, false if incomplete. */
882 static int dtio_write_more_of_len(struct dt_io_thread* dtio)
883 {
884 	uint32_t sendlen;
885 	int r;
886 	if(dtio->cur_msg_len_done >= 4)
887 		return 1;
888 #ifdef HAVE_WRITEV
889 	if(!dtio->ssl) {
890 		/* we try writev for everything.*/
891 		return dtio_write_with_writev(dtio);
892 	}
893 #endif /* HAVE_WRITEV */
894 	sendlen = htonl(dtio->cur_msg_len);
895 	r = dtio_write_buf(dtio,
896 		((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
897 		sizeof(sendlen)-dtio->cur_msg_len_done);
898 	if(r == -1) {
899 		/* close the channel */
900 		dtio_del_output_event(dtio);
901 		dtio_close_output(dtio);
902 		return 0;
903 	} else if(r == 0) {
904 		/* try again later */
905 		return 0;
906 	}
907 	dtio->cur_msg_len_done += r;
908 	if(dtio->cur_msg_len_done < 4)
909 		return 0;
910 	return 1;
911 }
912 
913 /** write more of the data frame.
914  * return true if message is done, false if incomplete. */
915 static int dtio_write_more_of_data(struct dt_io_thread* dtio)
916 {
917 	int r;
918 	if(dtio->cur_msg_done >= dtio->cur_msg_len)
919 		return 1;
920 	r = dtio_write_buf(dtio,
921 		((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
922 		dtio->cur_msg_len - dtio->cur_msg_done);
923 	if(r == -1) {
924 		/* close the channel */
925 		dtio_del_output_event(dtio);
926 		dtio_close_output(dtio);
927 		return 0;
928 	} else if(r == 0) {
929 		/* try again later */
930 		return 0;
931 	}
932 	dtio->cur_msg_done += r;
933 	if(dtio->cur_msg_done < dtio->cur_msg_len)
934 		return 0;
935 	return 1;
936 }
937 
938 /** write more of the current message. false if incomplete, true if
939  * the message is done */
940 static int dtio_write_more(struct dt_io_thread* dtio)
941 {
942 	if(dtio->cur_msg_len_done < 4) {
943 		if(!dtio_write_more_of_len(dtio))
944 			return 0;
945 	}
946 	if(dtio->cur_msg_done < dtio->cur_msg_len) {
947 		if(!dtio_write_more_of_data(dtio))
948 			return 0;
949 	}
950 	return 1;
951 }
952 
953 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
954  * -1: continue, >0: number of bytes read into buffer */
955 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
956 	ssize_t r;
957 	r = recv(dtio->fd, (void*)buf, len, 0);
958 	if(r == -1) {
959 		char* to = dtio->socket_path;
960 		if(!to) to = dtio->ip_str;
961 		if(!to) to = "";
962 #ifndef USE_WINSOCK
963 		if(errno == EINTR || errno == EAGAIN)
964 			return -1; /* try later */
965 #else
966 		if(WSAGetLastError() == WSAEINPROGRESS) {
967 			return -1; /* try later */
968 		} else if(WSAGetLastError() == WSAEWOULDBLOCK) {
969 			ub_winsock_tcp_wouldblock(
970 				(dtio->stop_flush_event?
971 				dtio->stop_flush_event:dtio->event),
972 				UB_EV_READ);
973 			return -1; /* try later */
974 		}
975 #endif
976 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
977 			verbosity < 4)
978 			return 0; /* no log retries on low verbosity */
979 		log_err("dnstap io: output closed, recv %s: %s", to,
980 			strerror(errno));
981 		/* and close below */
982 		return 0;
983 	}
984 	if(r == 0) {
985 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
986 			verbosity < 4)
987 			return 0; /* no log retries on low verbosity */
988 		verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
989 		/* and close below */
990 		return 0;
991 	}
992 	/* something was received */
993 	return r;
994 }
995 
996 #ifdef HAVE_SSL
997 /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed,
998  * -1: continue, >0: number of bytes read into buffer */
999 static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
1000 {
1001 	int r;
1002 	ERR_clear_error();
1003 	r = SSL_read(dtio->ssl, buf, len);
1004 	if(r <= 0) {
1005 		int want = SSL_get_error(dtio->ssl, r);
1006 		if(want == SSL_ERROR_ZERO_RETURN) {
1007 			if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1008 				verbosity < 4)
1009 				return 0; /* no log retries on low verbosity */
1010 			verbose(VERB_DETAIL, "dnstap io: output closed by the "
1011 				"other side");
1012 			return 0;
1013 		} else if(want == SSL_ERROR_WANT_READ) {
1014 			/* continue later */
1015 			return -1;
1016 		} else if(want == SSL_ERROR_WANT_WRITE) {
1017 			(void)dtio_enable_brief_write(dtio);
1018 			return -1;
1019 		} else if(want == SSL_ERROR_SYSCALL) {
1020 #ifdef ECONNRESET
1021 			if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1022 				errno == ECONNRESET && verbosity < 4)
1023 				return 0; /* silence reset by peer */
1024 #endif
1025 			if(errno != 0)
1026 				log_err("SSL_read syscall: %s",
1027 					strerror(errno));
1028 			verbose(VERB_DETAIL, "dnstap io: output closed by the "
1029 				"other side");
1030 			return 0;
1031 		}
1032 		log_crypto_err("could not SSL_read");
1033 		verbose(VERB_DETAIL, "dnstap io: output closed by the "
1034 				"other side");
1035 		return 0;
1036 	}
1037 	return r;
1038 }
1039 #endif /* HAVE_SSL */
1040 
1041 /** check if the output fd has been closed,
1042  * it returns false if the stream is closed. */
1043 static int dtio_check_close(struct dt_io_thread* dtio)
1044 {
1045 	/* we don't want to read any packets, but if there are we can
1046 	 * discard the input (ignore it).  Ignore of unknown (control)
1047 	 * packets is okay for the framestream protocol.  And also, the
1048 	 * read call can return that the stream has been closed by the
1049 	 * other side. */
1050 	uint8_t buf[1024];
1051 	int r = -1;
1052 
1053 
1054 	if(dtio->fd == -1) return 0;
1055 
1056 	while(r != 0) {
1057 		/* not interested in buffer content, overwrite */
1058 		r = receive_bytes(dtio, (void*)buf, sizeof(buf));
1059 		if(r == -1)
1060 			return 1;
1061 	}
1062 	/* the other end has been closed */
1063 	/* close the channel */
1064 	dtio_del_output_event(dtio);
1065 	dtio_close_output(dtio);
1066 	return 0;
1067 }
1068 
1069 /** Read accept frame. Returns -1: continue reading, 0: closed,
1070  * 1: valid accept received. */
1071 static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1072 {
1073 	int r;
1074 	size_t read_frame_done;
1075 	while(dtio->read_frame.frame_len_done < 4) {
1076 #ifdef HAVE_SSL
1077 		if(dtio->ssl) {
1078 			r = ssl_read_bytes(dtio,
1079 				(uint8_t*)&dtio->read_frame.frame_len+
1080 				dtio->read_frame.frame_len_done,
1081 				4-dtio->read_frame.frame_len_done);
1082 		} else {
1083 #endif
1084 			r = receive_bytes(dtio,
1085 				(uint8_t*)&dtio->read_frame.frame_len+
1086 				dtio->read_frame.frame_len_done,
1087 				4-dtio->read_frame.frame_len_done);
1088 #ifdef HAVE_SSL
1089 		}
1090 #endif
1091 		if(r == -1)
1092 			return -1; /* continue reading */
1093 		if(r == 0) {
1094 			 /* connection closed */
1095 			goto close_connection;
1096 		}
1097 		dtio->read_frame.frame_len_done += r;
1098 		if(dtio->read_frame.frame_len_done < 4)
1099 			return -1; /* continue reading */
1100 
1101 		if(dtio->read_frame.frame_len == 0) {
1102 			dtio->read_frame.frame_len_done = 0;
1103 			dtio->read_frame.control_frame = 1;
1104 			continue;
1105 		}
1106 		dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1107 		if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1108 			verbose(VERB_OPS, "dnstap: received frame exceeds max "
1109 				"length of %d bytes, closing connection",
1110 				DTIO_RECV_FRAME_MAX_LEN);
1111 			goto close_connection;
1112 		}
1113 		dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1114 		dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1115 		if(!dtio->read_frame.buf) {
1116 			log_err("dnstap io: out of memory (creating read "
1117 				"buffer)");
1118 			goto close_connection;
1119 		}
1120 	}
1121 	if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1122 #ifdef HAVE_SSL
1123 		if(dtio->ssl) {
1124 			r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1125 				dtio->read_frame.buf_count,
1126 				dtio->read_frame.buf_cap-
1127 				dtio->read_frame.buf_count);
1128 		} else {
1129 #endif
1130 			r = receive_bytes(dtio, dtio->read_frame.buf+
1131 				dtio->read_frame.buf_count,
1132 				dtio->read_frame.buf_cap-
1133 				dtio->read_frame.buf_count);
1134 #ifdef HAVE_SSL
1135 		}
1136 #endif
1137 		if(r == -1)
1138 			return -1; /* continue reading */
1139 		if(r == 0) {
1140 			 /* connection closed */
1141 			goto close_connection;
1142 		}
1143 		dtio->read_frame.buf_count += r;
1144 		if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1145 			return -1; /* continue reading */
1146 	}
1147 
1148 	/* Complete frame received, check if this is a valid ACCEPT control
1149 	 * frame. */
1150 	if(dtio->read_frame.frame_len < 4) {
1151 		verbose(VERB_OPS, "dnstap: invalid data received");
1152 		goto close_connection;
1153 	}
1154 	if(sldns_read_uint32(dtio->read_frame.buf) !=
1155 		FSTRM_CONTROL_FRAME_ACCEPT) {
1156 		verbose(VERB_ALGO, "dnstap: invalid control type received, "
1157 			"ignored");
1158 		dtio->ready_frame_sent = 0;
1159 		dtio->accept_frame_received = 0;
1160 		dtio_read_frame_free(&dtio->read_frame);
1161 		return -1;
1162 	}
1163 	read_frame_done = 4; /* control frame type */
1164 
1165 	/* Iterate over control fields, ignore unknown types.
1166 	 * Need to be able to read at least 8 bytes (control field type +
1167 	 * length). */
1168 	while(read_frame_done+8 < dtio->read_frame.frame_len) {
1169 		uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1170 			read_frame_done);
1171 		uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1172 			read_frame_done + 4);
1173 		if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1174 			if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1175 				read_frame_done+8+len <=
1176 				dtio->read_frame.frame_len &&
1177 				memcmp(dtio->read_frame.buf + read_frame_done +
1178 					+ 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1179 				if(!dtio_control_start_send(dtio)) {
1180 					verbose(VERB_OPS, "dnstap io: out of "
1181 					 "memory while sending START frame");
1182 					goto close_connection;
1183 				}
1184 				dtio->accept_frame_received = 1;
1185 				if(!dtio_add_output_event_write(dtio))
1186 					goto close_connection;
1187 				return 1;
1188 			} else {
1189 				/* unknown content type */
1190 				verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1191 					"contains unknown content type, "
1192 					"closing connection");
1193 				goto close_connection;
1194 			}
1195 		}
1196 		/* unknown option, try next */
1197 		read_frame_done += 8+len;
1198 	}
1199 
1200 
1201 close_connection:
1202 	dtio_del_output_event(dtio);
1203 	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1204 	dtio_close_output(dtio);
1205 	return 0;
1206 }
1207 
1208 /** add the output file descriptor event for listening, read only */
1209 static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1210 {
1211 	if(!dtio->event)
1212 		return 0;
1213 	if(dtio->event_added && !dtio->event_added_is_write)
1214 		return 1;
1215 	/* we have to (re-)register the event */
1216 	if(dtio->event_added)
1217 		ub_event_del(dtio->event);
1218 	ub_event_del_bits(dtio->event, UB_EV_WRITE);
1219 	if(ub_event_add(dtio->event, NULL) != 0) {
1220 		log_err("dnstap io: out of memory (adding event)");
1221 		dtio->event_added = 0;
1222 		dtio->event_added_is_write = 0;
1223 		/* close output and start reattempts to open it */
1224 		dtio_close_output(dtio);
1225 		return 0;
1226 	}
1227 	dtio->event_added = 1;
1228 	dtio->event_added_is_write = 0;
1229 	return 1;
1230 }
1231 
1232 /** add the output file descriptor event for listening, read and write */
1233 static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1234 {
1235 	if(!dtio->event)
1236 		return 0;
1237 	if(dtio->event_added && dtio->event_added_is_write)
1238 		return 1;
1239 	/* we have to (re-)register the event */
1240 	if(dtio->event_added)
1241 		ub_event_del(dtio->event);
1242 	ub_event_add_bits(dtio->event, UB_EV_WRITE);
1243 	if(ub_event_add(dtio->event, NULL) != 0) {
1244 		log_err("dnstap io: out of memory (adding event)");
1245 		dtio->event_added = 0;
1246 		dtio->event_added_is_write = 0;
1247 		/* close output and start reattempts to open it */
1248 		dtio_close_output(dtio);
1249 		return 0;
1250 	}
1251 	dtio->event_added = 1;
1252 	dtio->event_added_is_write = 1;
1253 	return 1;
1254 }
1255 
1256 /** put the dtio thread to sleep */
1257 static void dtio_sleep(struct dt_io_thread* dtio)
1258 {
1259 	/* unregister the event polling for write, because there is
1260 	 * nothing to be written */
1261 	(void)dtio_add_output_event_read(dtio);
1262 }
1263 
1264 #ifdef HAVE_SSL
1265 /** enable the brief read condition */
1266 static int dtio_enable_brief_read(struct dt_io_thread* dtio)
1267 {
1268 	dtio->ssl_brief_read = 1;
1269 	if(dtio->stop_flush_event) {
1270 		ub_event_del(dtio->stop_flush_event);
1271 		ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
1272 		if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1273 			log_err("dnstap io, stop flush, could not ub_event_add");
1274 			return 0;
1275 		}
1276 		return 1;
1277 	}
1278 	return dtio_add_output_event_read(dtio);
1279 }
1280 #endif /* HAVE_SSL */
1281 
1282 #ifdef HAVE_SSL
1283 /** disable the brief read condition */
1284 static int dtio_disable_brief_read(struct dt_io_thread* dtio)
1285 {
1286 	dtio->ssl_brief_read = 0;
1287 	if(dtio->stop_flush_event) {
1288 		ub_event_del(dtio->stop_flush_event);
1289 		ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
1290 		if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1291 			log_err("dnstap io, stop flush, could not ub_event_add");
1292 			return 0;
1293 		}
1294 		return 1;
1295 	}
1296 	return dtio_add_output_event_write(dtio);
1297 }
1298 #endif /* HAVE_SSL */
1299 
1300 #ifdef HAVE_SSL
1301 /** enable the brief write condition */
1302 static int dtio_enable_brief_write(struct dt_io_thread* dtio)
1303 {
1304 	dtio->ssl_brief_write = 1;
1305 	return dtio_add_output_event_write(dtio);
1306 }
1307 #endif /* HAVE_SSL */
1308 
1309 #ifdef HAVE_SSL
1310 /** disable the brief write condition */
1311 static int dtio_disable_brief_write(struct dt_io_thread* dtio)
1312 {
1313 	dtio->ssl_brief_write = 0;
1314 	return dtio_add_output_event_read(dtio);
1315 }
1316 #endif /* HAVE_SSL */
1317 
1318 #ifdef HAVE_SSL
1319 /** check peer verification after ssl handshake connection, false if closed*/
1320 static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
1321 {
1322 	if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
1323 		/* verification */
1324 		if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) {
1325 			X509* x = SSL_get_peer_certificate(dtio->ssl);
1326 			if(!x) {
1327 				verbose(VERB_ALGO, "dnstap io, %s, SSL "
1328 					"connection failed no certificate",
1329 					dtio->ip_str);
1330 				return 0;
1331 			}
1332 			log_cert(VERB_ALGO, "dnstap io, peer certificate",
1333 				x);
1334 #ifdef HAVE_SSL_GET0_PEERNAME
1335 			if(SSL_get0_peername(dtio->ssl)) {
1336 				verbose(VERB_ALGO, "dnstap io, %s, SSL "
1337 					"connection to %s authenticated",
1338 					dtio->ip_str,
1339 					SSL_get0_peername(dtio->ssl));
1340 			} else {
1341 #endif
1342 				verbose(VERB_ALGO, "dnstap io, %s, SSL "
1343 					"connection authenticated",
1344 					dtio->ip_str);
1345 #ifdef HAVE_SSL_GET0_PEERNAME
1346 			}
1347 #endif
1348 			X509_free(x);
1349 		} else {
1350 			X509* x = SSL_get_peer_certificate(dtio->ssl);
1351 			if(x) {
1352 				log_cert(VERB_ALGO, "dnstap io, peer "
1353 					"certificate", x);
1354 				X509_free(x);
1355 			}
1356 			verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
1357 				"failed: failed to authenticate",
1358 				dtio->ip_str);
1359 			return 0;
1360 		}
1361 	} else {
1362 		/* unauthenticated, the verify peer flag was not set
1363 		 * in ssl when the ssl object was created from ssl_ctx */
1364 		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
1365 			dtio->ip_str);
1366 	}
1367 	return 1;
1368 }
1369 #endif /* HAVE_SSL */
1370 
1371 #ifdef HAVE_SSL
1372 /** perform ssl handshake, returns 1 if okay, 0 to stop */
1373 static int dtio_ssl_handshake(struct dt_io_thread* dtio,
1374 	struct stop_flush_info* info)
1375 {
1376 	int r;
1377 	if(dtio->ssl_brief_read) {
1378 		/* assume the brief read condition is satisfied,
1379 		 * if we need more or again, we can set it again */
1380 		if(!dtio_disable_brief_read(dtio)) {
1381 			if(info) dtio_stop_flush_exit(info);
1382 			return 0;
1383 		}
1384 	}
1385 	if(dtio->ssl_handshake_done)
1386 		return 1;
1387 
1388 	ERR_clear_error();
1389 	r = SSL_do_handshake(dtio->ssl);
1390 	if(r != 1) {
1391 		int want = SSL_get_error(dtio->ssl, r);
1392 		if(want == SSL_ERROR_WANT_READ) {
1393 			/* we want to read on the connection */
1394 			if(!dtio_enable_brief_read(dtio)) {
1395 				if(info) dtio_stop_flush_exit(info);
1396 				return 0;
1397 			}
1398 			return 0;
1399 		} else if(want == SSL_ERROR_WANT_WRITE) {
1400 			/* we want to write on the connection */
1401 			return 0;
1402 		} else if(r == 0) {
1403 			/* closed */
1404 			if(info) dtio_stop_flush_exit(info);
1405 			dtio_del_output_event(dtio);
1406 			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1407 			dtio_close_output(dtio);
1408 			return 0;
1409 		} else if(want == SSL_ERROR_SYSCALL) {
1410 			/* SYSCALL and errno==0 means closed uncleanly */
1411 			int silent = 0;
1412 #ifdef EPIPE
1413 			if(errno == EPIPE && verbosity < 2)
1414 				silent = 1; /* silence 'broken pipe' */
1415 #endif
1416 #ifdef ECONNRESET
1417 			if(errno == ECONNRESET && verbosity < 2)
1418 				silent = 1; /* silence reset by peer */
1419 #endif
1420 			if(errno == 0)
1421 				silent = 1;
1422 			if(!silent)
1423 				log_err("dnstap io, SSL_handshake syscall: %s",
1424 					strerror(errno));
1425 			/* closed */
1426 			if(info) dtio_stop_flush_exit(info);
1427 			dtio_del_output_event(dtio);
1428 			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1429 			dtio_close_output(dtio);
1430 			return 0;
1431 		} else {
1432 			unsigned long err = ERR_get_error();
1433 			if(!squelch_err_ssl_handshake(err)) {
1434 				log_crypto_err_code("dnstap io, ssl handshake failed",
1435 					err);
1436 				verbose(VERB_OPS, "dnstap io, ssl handshake failed "
1437 					"from %s", dtio->ip_str);
1438 			}
1439 			/* closed */
1440 			if(info) dtio_stop_flush_exit(info);
1441 			dtio_del_output_event(dtio);
1442 			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1443 			dtio_close_output(dtio);
1444 			return 0;
1445 		}
1446 
1447 	}
1448 	/* check peer verification */
1449 	dtio->ssl_handshake_done = 1;
1450 
1451 	if(!dtio_ssl_check_peer(dtio)) {
1452 		/* closed */
1453 		if(info) dtio_stop_flush_exit(info);
1454 		dtio_del_output_event(dtio);
1455 		dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1456 		dtio_close_output(dtio);
1457 		return 0;
1458 	}
1459 	return 1;
1460 }
1461 #endif /* HAVE_SSL */
1462 
1463 /** callback for the dnstap events, to write to the output */
1464 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1465 {
1466 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1467 	int i;
1468 
1469 	if(dtio->check_nb_connect) {
1470 		int connect_err = dtio_check_nb_connect(dtio);
1471 		if(connect_err == -1) {
1472 			/* close the channel */
1473 			dtio_del_output_event(dtio);
1474 			dtio_close_output(dtio);
1475 			return;
1476 		} else if(connect_err == 0) {
1477 			/* try again later */
1478 			return;
1479 		}
1480 		/* nonblocking connect check passed, continue */
1481 	}
1482 
1483 #ifdef HAVE_SSL
1484 	if(dtio->ssl &&
1485 		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1486 		if(!dtio_ssl_handshake(dtio, NULL))
1487 			return;
1488 	}
1489 #endif
1490 
1491 	if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1492 		if(dtio->ssl_brief_write)
1493 			(void)dtio_disable_brief_write(dtio);
1494 		if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1495 			if(dtio_read_accept_frame(dtio) <= 0)
1496 				return;
1497 		} else if(!dtio_check_close(dtio))
1498 			return;
1499 	}
1500 
1501 	/* loop to process a number of messages.  This improves throughput,
1502 	 * because selecting on write-event if not needed for busy messages
1503 	 * (dnstap log) generation and if they need to all be written back.
1504 	 * The write event is usually not blocked up.  But not forever,
1505 	 * because the event loop needs to stay responsive for other events.
1506 	 * If there are no (more) messages, or if the output buffers get
1507 	 * full, it returns out of the loop. */
1508 	for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1509 		/* see if there are messages that need writing */
1510 		if(!dtio->cur_msg) {
1511 			if(!dtio_find_msg(dtio)) {
1512 				if(i == 0) {
1513 					/* no messages on the first iteration,
1514 					 * the queues are all empty */
1515 					dtio_sleep(dtio);
1516 				}
1517 				return; /* nothing to do */
1518 			}
1519 		}
1520 
1521 		/* write it */
1522 		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1523 			if(!dtio_write_more(dtio))
1524 				return;
1525 		}
1526 
1527 		/* done with the current message */
1528 		dtio_cur_msg_free(dtio);
1529 
1530 		/* If this is a bidirectional stream the first message will be
1531 		 * the READY control frame. We can only continue writing after
1532 		 * receiving an ACCEPT control frame. */
1533 		if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1534 			dtio->ready_frame_sent = 1;
1535 			(void)dtio_add_output_event_read(dtio);
1536 			break;
1537 		}
1538 	}
1539 }
1540 
1541 /** callback for the dnstap commandpipe, to stop the dnstap IO */
1542 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1543 {
1544 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1545 	uint8_t cmd;
1546 	ssize_t r;
1547 	if(dtio->want_to_exit)
1548 		return;
1549 	r = read(fd, &cmd, sizeof(cmd));
1550 	if(r == -1) {
1551 #ifndef USE_WINSOCK
1552 		if(errno == EINTR || errno == EAGAIN)
1553 			return; /* ignore this */
1554 #else
1555 		if(WSAGetLastError() == WSAEINPROGRESS)
1556 			return;
1557 		if(WSAGetLastError() == WSAEWOULDBLOCK)
1558 			return;
1559 #endif
1560 		log_err("dnstap io: failed to read: %s", sock_strerror(errno));
1561 		/* and then fall through to quit the thread */
1562 	} else if(r == 0) {
1563 		verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1564 	} else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1565 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1566 	} else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1567 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1568 
1569 		if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1570 			verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1571 				"waiting for ACCEPT control frame");
1572 			return;
1573 		}
1574 
1575 		/* reregister event */
1576 		if(!dtio_add_output_event_write(dtio))
1577 			return;
1578 		return;
1579 	} else if(r == 1) {
1580 		verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1581 	}
1582 	dtio->want_to_exit = 1;
1583 	if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1584 		!= 0) {
1585 		log_err("dnstap io: could not loopexit");
1586 	}
1587 }
1588 
1589 #ifndef THREADS_DISABLED
1590 /** setup the event base for the dnstap io thread */
1591 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1592 	struct timeval* now)
1593 {
1594 	memset(now, 0, sizeof(*now));
1595 	dtio->event_base = ub_default_event_base(0, secs, now);
1596 	if(!dtio->event_base) {
1597 		fatal_exit("dnstap io: could not create event_base");
1598 	}
1599 }
1600 #endif /* THREADS_DISABLED */
1601 
1602 /** setup the cmd event for dnstap io */
1603 static void dtio_setup_cmd(struct dt_io_thread* dtio)
1604 {
1605 	struct ub_event* cmdev;
1606 	fd_set_nonblock(dtio->commandpipe[0]);
1607 	cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1608 		UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1609 	if(!cmdev) {
1610 		fatal_exit("dnstap io: out of memory");
1611 	}
1612 	dtio->command_event = cmdev;
1613 	if(ub_event_add(cmdev, NULL) != 0) {
1614 		fatal_exit("dnstap io: out of memory (adding event)");
1615 	}
1616 }
1617 
1618 /** setup the reconnect event for dnstap io */
1619 static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1620 {
1621 	dtio_reconnect_clear(dtio);
1622 	dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1623 		UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1624 	if(!dtio->reconnect_timer) {
1625 		fatal_exit("dnstap io: out of memory");
1626 	}
1627 }
1628 
1629 /**
1630  * structure to keep track of information during stop flush
1631  */
1632 struct stop_flush_info {
1633 	/** the event base during stop flush */
1634 	struct ub_event_base* base;
1635 	/** did we already want to exit this stop-flush event base */
1636 	int want_to_exit_flush;
1637 	/** has the timer fired */
1638 	int timer_done;
1639 	/** the dtio */
1640 	struct dt_io_thread* dtio;
1641 	/** the stop control frame */
1642 	void* stop_frame;
1643 	/** length of the stop frame */
1644 	size_t stop_frame_len;
1645 	/** how much we have done of the stop frame */
1646 	size_t stop_frame_done;
1647 };
1648 
1649 /** exit the stop flush base */
1650 static void dtio_stop_flush_exit(struct stop_flush_info* info)
1651 {
1652 	if(info->want_to_exit_flush)
1653 		return;
1654 	info->want_to_exit_flush = 1;
1655 	if(ub_event_base_loopexit(info->base) != 0) {
1656 		log_err("dnstap io: could not loopexit");
1657 	}
1658 }
1659 
1660 /** send the stop control,
1661  * return true if completed the frame. */
1662 static int dtio_control_stop_send(struct stop_flush_info* info)
1663 {
1664 	struct dt_io_thread* dtio = info->dtio;
1665 	int r;
1666 	if(info->stop_frame_done >= info->stop_frame_len)
1667 		return 1;
1668 	r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1669 		info->stop_frame_done, info->stop_frame_len -
1670 		info->stop_frame_done);
1671 	if(r == -1) {
1672 		verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1673 		dtio_stop_flush_exit(info);
1674 		return 0;
1675 	}
1676 	if(r == 0) {
1677 		/* try again later, or timeout */
1678 		return 0;
1679 	}
1680 	info->stop_frame_done += r;
1681 	if(info->stop_frame_done < info->stop_frame_len)
1682 		return 0; /* not done yet */
1683 	return 1;
1684 }
1685 
1686 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1687 	void* arg)
1688 {
1689 	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1690 	if(info->want_to_exit_flush)
1691 		return;
1692 	verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1693 	info->timer_done = 1;
1694 	dtio_stop_flush_exit(info);
1695 }
1696 
1697 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1698 {
1699 	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1700 	struct dt_io_thread* dtio = info->dtio;
1701 	if(info->want_to_exit_flush)
1702 		return;
1703 	if(dtio->check_nb_connect) {
1704 		/* we don't start the stop_flush if connect still
1705 		 * in progress, but the check code is here, just in case */
1706 		int connect_err = dtio_check_nb_connect(dtio);
1707 		if(connect_err == -1) {
1708 			/* close the channel, exit the stop flush */
1709 			dtio_stop_flush_exit(info);
1710 			dtio_del_output_event(dtio);
1711 			dtio_close_output(dtio);
1712 			return;
1713 		} else if(connect_err == 0) {
1714 			/* try again later */
1715 			return;
1716 		}
1717 		/* nonblocking connect check passed, continue */
1718 	}
1719 #ifdef HAVE_SSL
1720 	if(dtio->ssl &&
1721 		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1722 		if(!dtio_ssl_handshake(dtio, info))
1723 			return;
1724 	}
1725 #endif
1726 
1727 	if((bits&UB_EV_READ)) {
1728 		if(!dtio_check_close(dtio)) {
1729 			if(dtio->fd == -1) {
1730 				verbose(VERB_ALGO, "dnstap io: "
1731 					"stop flush: output closed");
1732 				dtio_stop_flush_exit(info);
1733 			}
1734 			return;
1735 		}
1736 	}
1737 	/* write remainder of last frame */
1738 	if(dtio->cur_msg) {
1739 		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1740 			if(!dtio_write_more(dtio)) {
1741 				if(dtio->fd == -1) {
1742 					verbose(VERB_ALGO, "dnstap io: "
1743 						"stop flush: output closed");
1744 					dtio_stop_flush_exit(info);
1745 				}
1746 				return;
1747 			}
1748 		}
1749 		verbose(VERB_ALGO, "dnstap io: stop flush completed "
1750 			"last frame");
1751 		dtio_cur_msg_free(dtio);
1752 	}
1753 	/* write stop frame */
1754 	if(info->stop_frame_done < info->stop_frame_len) {
1755 		if(!dtio_control_stop_send(info))
1756 			return;
1757 		verbose(VERB_ALGO, "dnstap io: stop flush completed "
1758 			"stop control frame");
1759 	}
1760 	/* when last frame and stop frame are sent, exit */
1761 	dtio_stop_flush_exit(info);
1762 }
1763 
1764 /** flush at end, last packet and stop control */
1765 static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1766 {
1767 	/* briefly attempt to flush the previous packet to the output,
1768 	 * this could be a partial packet, or even the start control frame */
1769 	time_t secs = 0;
1770 	struct timeval now;
1771 	struct stop_flush_info info;
1772 	struct timeval tv;
1773 	struct ub_event* timer, *stopev;
1774 
1775 	if(dtio->fd == -1 || dtio->check_nb_connect) {
1776 		/* no connection or we have just connected, so nothing is
1777 		 * sent yet, so nothing to stop or flush */
1778 		return;
1779 	}
1780 	if(dtio->ssl && !dtio->ssl_handshake_done) {
1781 		/* no SSL connection has been established yet */
1782 		return;
1783 	}
1784 
1785 	memset(&info, 0, sizeof(info));
1786 	memset(&now, 0, sizeof(now));
1787 	info.dtio = dtio;
1788 	info.base = ub_default_event_base(0, &secs, &now);
1789 	if(!info.base) {
1790 		log_err("dnstap io: malloc failure");
1791 		return;
1792 	}
1793 	timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1794 		&dtio_stop_timer_cb, &info);
1795 	if(!timer) {
1796 		log_err("dnstap io: malloc failure");
1797 		ub_event_base_free(info.base);
1798 		return;
1799 	}
1800 	memset(&tv, 0, sizeof(tv));
1801 	tv.tv_sec = 2;
1802 	if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1803 		&tv) != 0) {
1804 		log_err("dnstap io: cannot event_timer_add");
1805 		ub_event_free(timer);
1806 		ub_event_base_free(info.base);
1807 		return;
1808 	}
1809 	stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1810 		UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1811 	if(!stopev) {
1812 		log_err("dnstap io: malloc failure");
1813 		ub_timer_del(timer);
1814 		ub_event_free(timer);
1815 		ub_event_base_free(info.base);
1816 		return;
1817 	}
1818 	if(ub_event_add(stopev, NULL) != 0) {
1819 		log_err("dnstap io: cannot event_add");
1820 		ub_event_free(stopev);
1821 		ub_timer_del(timer);
1822 		ub_event_free(timer);
1823 		ub_event_base_free(info.base);
1824 		return;
1825 	}
1826 	info.stop_frame = fstrm_create_control_frame_stop(
1827 		&info.stop_frame_len);
1828 	if(!info.stop_frame) {
1829 		log_err("dnstap io: malloc failure");
1830 		ub_event_del(stopev);
1831 		ub_event_free(stopev);
1832 		ub_timer_del(timer);
1833 		ub_event_free(timer);
1834 		ub_event_base_free(info.base);
1835 		return;
1836 	}
1837 	dtio->stop_flush_event = stopev;
1838 
1839 	/* wait briefly, or until finished */
1840 	verbose(VERB_ALGO, "dnstap io: stop flush started");
1841 	if(ub_event_base_dispatch(info.base) < 0) {
1842 		log_err("dnstap io: dispatch flush failed, errno is %s",
1843 			strerror(errno));
1844 	}
1845 	verbose(VERB_ALGO, "dnstap io: stop flush ended");
1846 	free(info.stop_frame);
1847 	dtio->stop_flush_event = NULL;
1848 	ub_event_del(stopev);
1849 	ub_event_free(stopev);
1850 	ub_timer_del(timer);
1851 	ub_event_free(timer);
1852 	ub_event_base_free(info.base);
1853 }
1854 
1855 /** perform desetup and free stuff when the dnstap io thread exits */
1856 static void dtio_desetup(struct dt_io_thread* dtio)
1857 {
1858 	dtio_control_stop_flush(dtio);
1859 	dtio_del_output_event(dtio);
1860 	dtio_close_output(dtio);
1861 	ub_event_del(dtio->command_event);
1862 	ub_event_free(dtio->command_event);
1863 #ifndef USE_WINSOCK
1864 	close(dtio->commandpipe[0]);
1865 #else
1866 	_close(dtio->commandpipe[0]);
1867 #endif
1868 	dtio->commandpipe[0] = -1;
1869 	dtio_reconnect_del(dtio);
1870 	ub_event_free(dtio->reconnect_timer);
1871 	dtio_cur_msg_free(dtio);
1872 #ifndef THREADS_DISABLED
1873 	ub_event_base_free(dtio->event_base);
1874 #endif
1875 }
1876 
1877 /** setup a start control message */
1878 static int dtio_control_start_send(struct dt_io_thread* dtio)
1879 {
1880 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1881 	dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1882 		&dtio->cur_msg_len);
1883 	if(!dtio->cur_msg) {
1884 		return 0;
1885 	}
1886 	/* setup to send the control message */
1887 	/* set that the buffer needs to be sent, but the length
1888 	 * of that buffer is already written, that way the buffer can
1889 	 * start with 0 length and then the length of the control frame
1890 	 * in it */
1891 	dtio->cur_msg_done = 0;
1892 	dtio->cur_msg_len_done = 4;
1893 	return 1;
1894 }
1895 
1896 /** setup a ready control message */
1897 static int dtio_control_ready_send(struct dt_io_thread* dtio)
1898 {
1899 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1900 	dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1901 		&dtio->cur_msg_len);
1902 	if(!dtio->cur_msg) {
1903 		return 0;
1904 	}
1905 	/* setup to send the control message */
1906 	/* set that the buffer needs to be sent, but the length
1907 	 * of that buffer is already written, that way the buffer can
1908 	 * start with 0 length and then the length of the control frame
1909 	 * in it */
1910 	dtio->cur_msg_done = 0;
1911 	dtio->cur_msg_len_done = 4;
1912 	return 1;
1913 }
1914 
1915 /** open the output file descriptor for af_local */
1916 static int dtio_open_output_local(struct dt_io_thread* dtio)
1917 {
1918 #ifdef HAVE_SYS_UN_H
1919 	struct sockaddr_un s;
1920 	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
1921 	if(dtio->fd == -1) {
1922 		log_err("dnstap io: failed to create socket: %s",
1923 			sock_strerror(errno));
1924 		return 0;
1925 	}
1926 	memset(&s, 0, sizeof(s));
1927 #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
1928         /* this member exists on BSDs, not Linux */
1929         s.sun_len = (unsigned)sizeof(s);
1930 #endif
1931 	s.sun_family = AF_LOCAL;
1932 	/* length is 92-108, 104 on FreeBSD */
1933         (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
1934 	fd_set_nonblock(dtio->fd);
1935 	if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
1936 		== -1) {
1937 		char* to = dtio->socket_path;
1938 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1939 			verbosity < 4) {
1940 			dtio_close_fd(dtio);
1941 			return 0; /* no log retries on low verbosity */
1942 		}
1943 		log_err("dnstap io: failed to connect to \"%s\": %s",
1944 			to, sock_strerror(errno));
1945 		dtio_close_fd(dtio);
1946 		return 0;
1947 	}
1948 	return 1;
1949 #else
1950 	log_err("cannot create af_local socket");
1951 	return 0;
1952 #endif /* HAVE_SYS_UN_H */
1953 }
1954 
1955 /** open the output file descriptor for af_inet and af_inet6 */
1956 static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1957 {
1958 	struct sockaddr_storage addr;
1959 	socklen_t addrlen;
1960 	memset(&addr, 0, sizeof(addr));
1961 	addrlen = (socklen_t)sizeof(addr);
1962 
1963 	if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
1964 		log_err("could not parse IP '%s'", dtio->ip_str);
1965 		return 0;
1966 	}
1967 	dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1968 	if(dtio->fd == -1) {
1969 		log_err("can't create socket: %s", sock_strerror(errno));
1970 		return 0;
1971 	}
1972 	fd_set_nonblock(dtio->fd);
1973 	if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1974 		if(errno == EINPROGRESS)
1975 			return 1; /* wait until connect done*/
1976 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1977 			verbosity < 4) {
1978 			dtio_close_fd(dtio);
1979 			return 0; /* no log retries on low verbosity */
1980 		}
1981 #ifndef USE_WINSOCK
1982 		if(tcp_connect_errno_needs_log(
1983 			(struct sockaddr *)&addr, addrlen)) {
1984 			log_err("dnstap io: failed to connect to %s: %s",
1985 				dtio->ip_str, strerror(errno));
1986 		}
1987 #else
1988 		if(WSAGetLastError() == WSAEINPROGRESS ||
1989 			WSAGetLastError() == WSAEWOULDBLOCK)
1990 			return 1; /* wait until connect done*/
1991 		if(tcp_connect_errno_needs_log(
1992 			(struct sockaddr *)&addr, addrlen)) {
1993 			log_err("dnstap io: failed to connect to %s: %s",
1994 				dtio->ip_str, wsa_strerror(WSAGetLastError()));
1995 		}
1996 #endif
1997 		dtio_close_fd(dtio);
1998 		return 0;
1999 	}
2000 	return 1;
2001 }
2002 
2003 /** setup the SSL structure for new connection */
2004 static int dtio_setup_ssl(struct dt_io_thread* dtio)
2005 {
2006 	dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
2007 	if(!dtio->ssl) return 0;
2008 	dtio->ssl_handshake_done = 0;
2009 	dtio->ssl_brief_read = 0;
2010 
2011 	if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
2012 		dtio->tls_use_sni)) {
2013 		return 0;
2014 	}
2015 	return 1;
2016 }
2017 
2018 /** open the output file descriptor */
2019 static void dtio_open_output(struct dt_io_thread* dtio)
2020 {
2021 	struct ub_event* ev;
2022 	if(dtio->upstream_is_unix) {
2023 		if(!dtio_open_output_local(dtio)) {
2024 			dtio_reconnect_enable(dtio);
2025 			return;
2026 		}
2027 	} else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
2028 		if(!dtio_open_output_tcp(dtio)) {
2029 			dtio_reconnect_enable(dtio);
2030 			return;
2031 		}
2032 		if(dtio->upstream_is_tls) {
2033 			if(!dtio_setup_ssl(dtio)) {
2034 				dtio_close_fd(dtio);
2035 				dtio_reconnect_enable(dtio);
2036 				return;
2037 			}
2038 		}
2039 	}
2040 	dtio->check_nb_connect = 1;
2041 
2042 	/* the EV_READ is to read ACCEPT control messages, and catch channel
2043 	 * close. EV_WRITE is to write packets */
2044 	ev = ub_event_new(dtio->event_base, dtio->fd,
2045 		UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
2046 		dtio);
2047 	if(!ev) {
2048 		log_err("dnstap io: out of memory");
2049 		if(dtio->ssl) {
2050 #ifdef HAVE_SSL
2051 			SSL_free(dtio->ssl);
2052 			dtio->ssl = NULL;
2053 #endif
2054 		}
2055 		dtio_close_fd(dtio);
2056 		dtio_reconnect_enable(dtio);
2057 		return;
2058 	}
2059 	dtio->event = ev;
2060 
2061 	/* setup protocol control message to start */
2062 	if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2063 		(dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2064 		log_err("dnstap io: out of memory");
2065 		ub_event_free(dtio->event);
2066 		dtio->event = NULL;
2067 		if(dtio->ssl) {
2068 #ifdef HAVE_SSL
2069 			SSL_free(dtio->ssl);
2070 			dtio->ssl = NULL;
2071 #endif
2072 		}
2073 		dtio_close_fd(dtio);
2074 		dtio_reconnect_enable(dtio);
2075 		return;
2076 	}
2077 }
2078 
2079 /** perform the setup of the writer thread on the established event_base */
2080 static void dtio_setup_on_base(struct dt_io_thread* dtio)
2081 {
2082 	dtio_setup_cmd(dtio);
2083 	dtio_setup_reconnect(dtio);
2084 	dtio_open_output(dtio);
2085 	if(!dtio_add_output_event_write(dtio))
2086 		return;
2087 }
2088 
2089 #ifndef THREADS_DISABLED
2090 /** the IO thread function for the DNSTAP IO */
2091 static void* dnstap_io(void* arg)
2092 {
2093 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2094 	time_t secs = 0;
2095 	struct timeval now;
2096 	log_thread_set(&dtio->threadnum);
2097 
2098 	/* setup */
2099 	verbose(VERB_ALGO, "start dnstap io thread");
2100 	dtio_setup_base(dtio, &secs, &now);
2101 	dtio_setup_on_base(dtio);
2102 
2103 	/* run */
2104 	if(ub_event_base_dispatch(dtio->event_base) < 0) {
2105 		log_err("dnstap io: dispatch failed, errno is %s",
2106 			strerror(errno));
2107 	}
2108 
2109 	/* cleanup */
2110 	verbose(VERB_ALGO, "stop dnstap io thread");
2111 	dtio_desetup(dtio);
2112 	return NULL;
2113 }
2114 #endif /* THREADS_DISABLED */
2115 
2116 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2117 	int numworkers)
2118 {
2119 	/* set up the thread, can fail */
2120 #ifndef USE_WINSOCK
2121 	if(pipe(dtio->commandpipe) == -1) {
2122 		log_err("failed to create pipe: %s", strerror(errno));
2123 		return 0;
2124 	}
2125 #else
2126 	if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2127 		log_err("failed to create _pipe: %s",
2128 			wsa_strerror(WSAGetLastError()));
2129 		return 0;
2130 	}
2131 #endif
2132 
2133 	/* start the thread */
2134 	dtio->threadnum = numworkers+1;
2135 	dtio->started = 1;
2136 #ifndef THREADS_DISABLED
2137 	ub_thread_create(&dtio->tid, dnstap_io, dtio);
2138 	(void)event_base_nothr;
2139 #else
2140 	dtio->event_base = event_base_nothr;
2141 	dtio_setup_on_base(dtio);
2142 #endif
2143 	return 1;
2144 }
2145 
2146 void dt_io_thread_stop(struct dt_io_thread* dtio)
2147 {
2148 #ifndef THREADS_DISABLED
2149 	uint8_t cmd = DTIO_COMMAND_STOP;
2150 #endif
2151 	if(!dtio) return;
2152 	if(!dtio->started) return;
2153 	verbose(VERB_ALGO, "dnstap io: send stop cmd");
2154 
2155 #ifndef THREADS_DISABLED
2156 	while(1) {
2157 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2158 		if(r == -1) {
2159 #ifndef USE_WINSOCK
2160 			if(errno == EINTR || errno == EAGAIN)
2161 				continue;
2162 #else
2163 			if(WSAGetLastError() == WSAEINPROGRESS)
2164 				continue;
2165 			if(WSAGetLastError() == WSAEWOULDBLOCK)
2166 				continue;
2167 #endif
2168 			log_err("dnstap io stop: write: %s",
2169 				sock_strerror(errno));
2170 			break;
2171 		}
2172 		break;
2173 	}
2174 	dtio->started = 0;
2175 #endif /* THREADS_DISABLED */
2176 
2177 #ifndef USE_WINSOCK
2178 	close(dtio->commandpipe[1]);
2179 #else
2180 	_close(dtio->commandpipe[1]);
2181 #endif
2182 	dtio->commandpipe[1] = -1;
2183 #ifndef THREADS_DISABLED
2184 	ub_thread_join(dtio->tid);
2185 #else
2186 	dtio->want_to_exit = 1;
2187 	dtio_desetup(dtio);
2188 #endif
2189 }
2190