xref: /freebsd/contrib/unbound/dnstap/dtstream.c (revision 9e5787d2284e187abb5b654d924394a65772e004)
1 /*
2  * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3  *
4  * Copyright (c) 2020, NLnet Labs. All rights reserved.
5  *
6  * This software is open source.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * Redistributions of source code must retain the above copyright notice,
13  * this list of conditions and the following disclaimer.
14  *
15  * Redistributions in binary form must reproduce the above copyright notice,
16  * this list of conditions and the following disclaimer in the documentation
17  * and/or other materials provided with the distribution.
18  *
19  * Neither the name of the NLNET LABS nor the names of its contributors may
20  * be used to endorse or promote products derived from this software without
21  * specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34  *
35  */
36 
37 /**
38  * \file
39  *
40  * An implementation of the Frame Streams data transport protocol for
41  * the Unbound DNSTAP message logging facility.
42  */
43 
44 #include "config.h"
45 #include "dnstap/dtstream.h"
46 #include "dnstap/dnstap_fstrm.h"
47 #include "util/config_file.h"
48 #include "util/ub_event.h"
49 #include "util/net_help.h"
50 #include "services/outside_network.h"
51 #include "sldns/sbuffer.h"
52 #ifdef HAVE_SYS_UN_H
53 #include <sys/un.h>
54 #endif
55 #include <fcntl.h>
56 #ifdef HAVE_OPENSSL_SSL_H
57 #include <openssl/ssl.h>
58 #endif
59 #ifdef HAVE_OPENSSL_ERR_H
60 #include <openssl/err.h>
61 #endif
62 
/** number of messages to process in one output callback */
#define DTIO_MESSAGES_PER_CALLBACK 100
/** the msec to wait for reconnect (if not immediate, the first attempt);
 * dtio_reconnect_enable stores this as the next delay after an attempt
 * that was scheduled with a delay of 0 */
#define DTIO_RECONNECT_TIMEOUT_MIN 10
/** the msec to wait for reconnect max after backoff; the doubled delay
 * in dtio_reconnect_enable is capped at this value */
#define DTIO_RECONNECT_TIMEOUT_MAX 1000
/** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
#define DTIO_RECONNECT_TIMEOUT_SLOW 1000

/** maximum length, in bytes, of a received frame; dtio_read_accept_frame
 * closes the connection when a longer frame is announced */
#define DTIO_RECV_FRAME_MAX_LEN 1000
74 
75 struct stop_flush_info;
/** DTIO command channel commands; one of these values is written as a
 * single byte on the command pipe (see dtio_wakeup).
 * The name is the enum tag, so no global variable is defined here. */
enum dtio_channel_command {
	/** DTIO command channel stop */
	DTIO_COMMAND_STOP = 0,
	/** DTIO command channel wakeup */
	DTIO_COMMAND_WAKEUP = 1
};
83 
84 /** open the output channel */
85 static void dtio_open_output(struct dt_io_thread* dtio);
86 /** add output event for read and write */
87 static int dtio_add_output_event_write(struct dt_io_thread* dtio);
88 /** start reconnection attempts */
89 static void dtio_reconnect_enable(struct dt_io_thread* dtio);
90 /** stop from stop_flush event loop */
91 static void dtio_stop_flush_exit(struct stop_flush_info* info);
92 /** setup a start control message */
93 static int dtio_control_start_send(struct dt_io_thread* dtio);
94 #ifdef HAVE_SSL
95 /** enable briefly waiting for a read event, for SSL negotiation */
96 static int dtio_enable_brief_read(struct dt_io_thread* dtio);
97 /** enable briefly waiting for a write event, for SSL negotiation */
98 static int dtio_enable_brief_write(struct dt_io_thread* dtio);
99 #endif
100 
101 struct dt_msg_queue*
102 dt_msg_queue_create(void)
103 {
104 	struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
105 	if(!mq) return NULL;
106 	mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
107 		about 1 M should contain 64K messages with some overhead,
108 		or a whole bunch smaller ones */
109 	lock_basic_init(&mq->lock);
110 	lock_protect(&mq->lock, mq, sizeof(*mq));
111 	return mq;
112 }
113 
114 /** clear the message list, caller must hold the lock */
115 static void
116 dt_msg_queue_clear(struct dt_msg_queue* mq)
117 {
118 	struct dt_msg_entry* e = mq->first, *next=NULL;
119 	while(e) {
120 		next = e->next;
121 		free(e->buf);
122 		free(e);
123 		e = next;
124 	}
125 	mq->first = NULL;
126 	mq->last = NULL;
127 	mq->cursize = 0;
128 }
129 
130 void
131 dt_msg_queue_delete(struct dt_msg_queue* mq)
132 {
133 	if(!mq) return;
134 	lock_basic_destroy(&mq->lock);
135 	dt_msg_queue_clear(mq);
136 	free(mq);
137 }
138 
139 /** make the dtio wake up by sending a wakeup command */
140 static void dtio_wakeup(struct dt_io_thread* dtio)
141 {
142 	uint8_t cmd = DTIO_COMMAND_WAKEUP;
143 	if(!dtio) return;
144 	if(!dtio->started) return;
145 
146 	while(1) {
147 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
148 		if(r == -1) {
149 #ifndef USE_WINSOCK
150 			if(errno == EINTR || errno == EAGAIN)
151 				continue;
152 			log_err("dnstap io wakeup: write: %s", strerror(errno));
153 #else
154 			if(WSAGetLastError() == WSAEINPROGRESS)
155 				continue;
156 			if(WSAGetLastError() == WSAEWOULDBLOCK)
157 				continue;
158 			log_err("dnstap io stop: write: %s",
159 				wsa_strerror(WSAGetLastError()));
160 #endif
161 			break;
162 		}
163 		break;
164 	}
165 }
166 
167 void
168 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
169 {
170 	int wakeup = 0;
171 	struct dt_msg_entry* entry;
172 
173 	/* check conditions */
174 	if(!buf) return;
175 	if(len == 0) {
176 		/* it is not possible to log entries with zero length,
177 		 * because the framestream protocol does not carry it.
178 		 * However the protobuf serialization does not create zero
179 		 * length datagrams for dnstap, so this should not happen. */
180 		free(buf);
181 		return;
182 	}
183 	if(!mq) {
184 		free(buf);
185 		return;
186 	}
187 
188 	/* allocate memory for queue entry */
189 	entry = malloc(sizeof(*entry));
190 	if(!entry) {
191 		log_err("out of memory logging dnstap");
192 		free(buf);
193 		return;
194 	}
195 	entry->next = NULL;
196 	entry->buf = buf;
197 	entry->len = len;
198 
199 	/* aqcuire lock */
200 	lock_basic_lock(&mq->lock);
201 	/* list was empty, wakeup dtio */
202 	if(mq->first == NULL)
203 		wakeup = 1;
204 	/* see if it is going to fit */
205 	if(mq->cursize + len > mq->maxsize) {
206 		/* buffer full, or congested. */
207 		/* drop */
208 		lock_basic_unlock(&mq->lock);
209 		free(buf);
210 		free(entry);
211 		return;
212 	}
213 	mq->cursize += len;
214 	/* append to list */
215 	if(mq->last) {
216 		mq->last->next = entry;
217 	} else {
218 		mq->first = entry;
219 	}
220 	mq->last = entry;
221 	/* release lock */
222 	lock_basic_unlock(&mq->lock);
223 
224 	if(wakeup)
225 		dtio_wakeup(mq->dtio);
226 }
227 
228 struct dt_io_thread* dt_io_thread_create(void)
229 {
230 	struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
231 	return dtio;
232 }
233 
234 void dt_io_thread_delete(struct dt_io_thread* dtio)
235 {
236 	struct dt_io_list_item* item, *nextitem;
237 	if(!dtio) return;
238 	item=dtio->io_list;
239 	while(item) {
240 		nextitem = item->next;
241 		free(item);
242 		item = nextitem;
243 	}
244 	free(dtio->socket_path);
245 	free(dtio->ip_str);
246 	free(dtio->tls_server_name);
247 	free(dtio->client_key_file);
248 	free(dtio->client_cert_file);
249 	if(dtio->ssl_ctx) {
250 #ifdef HAVE_SSL
251 		SSL_CTX_free(dtio->ssl_ctx);
252 #endif
253 	}
254 	free(dtio);
255 }
256 
257 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
258 {
259 	if(!cfg->dnstap) {
260 		log_warn("cannot setup dnstap because dnstap-enable is no");
261 		return 0;
262 	}
263 
264 	/* what type of connectivity do we have */
265 	if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
266 		if(cfg->dnstap_tls)
267 			dtio->upstream_is_tls = 1;
268 		else	dtio->upstream_is_tcp = 1;
269 	} else {
270 		dtio->upstream_is_unix = 1;
271 	}
272 	dtio->is_bidirectional = cfg->dnstap_bidirectional;
273 
274 	if(dtio->upstream_is_unix) {
275 		if(!cfg->dnstap_socket_path ||
276 			cfg->dnstap_socket_path[0]==0) {
277 			log_err("dnstap setup: no dnstap-socket-path for "
278 				"socket connect");
279 			return 0;
280 		}
281 		free(dtio->socket_path);
282 		dtio->socket_path = strdup(cfg->dnstap_socket_path);
283 		if(!dtio->socket_path) {
284 			log_err("dnstap setup: malloc failure");
285 			return 0;
286 		}
287 	}
288 
289 	if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
290 		if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
291 			log_err("dnstap setup: no dnstap-ip for TCP connect");
292 			return 0;
293 		}
294 		free(dtio->ip_str);
295 		dtio->ip_str = strdup(cfg->dnstap_ip);
296 		if(!dtio->ip_str) {
297 			log_err("dnstap setup: malloc failure");
298 			return 0;
299 		}
300 	}
301 
302 	if(dtio->upstream_is_tls) {
303 #ifdef HAVE_SSL
304 		if(cfg->dnstap_tls_server_name &&
305 			cfg->dnstap_tls_server_name[0]) {
306 			free(dtio->tls_server_name);
307 			dtio->tls_server_name = strdup(
308 				cfg->dnstap_tls_server_name);
309 			if(!dtio->tls_server_name) {
310 				log_err("dnstap setup: malloc failure");
311 				return 0;
312 			}
313 			if(!check_auth_name_for_ssl(dtio->tls_server_name))
314 				return 0;
315 		}
316 		if(cfg->dnstap_tls_client_key_file &&
317 			cfg->dnstap_tls_client_key_file[0]) {
318 			dtio->use_client_certs = 1;
319 			free(dtio->client_key_file);
320 			dtio->client_key_file = strdup(
321 				cfg->dnstap_tls_client_key_file);
322 			if(!dtio->client_key_file) {
323 				log_err("dnstap setup: malloc failure");
324 				return 0;
325 			}
326 			if(!cfg->dnstap_tls_client_cert_file ||
327 				cfg->dnstap_tls_client_cert_file[0]==0) {
328 				log_err("dnstap setup: client key "
329 					"authentication enabled with "
330 					"dnstap-tls-client-key-file, but "
331 					"no dnstap-tls-client-cert-file "
332 					"is given");
333 				return 0;
334 			}
335 			free(dtio->client_cert_file);
336 			dtio->client_cert_file = strdup(
337 				cfg->dnstap_tls_client_cert_file);
338 			if(!dtio->client_cert_file) {
339 				log_err("dnstap setup: malloc failure");
340 				return 0;
341 			}
342 		} else {
343 			dtio->use_client_certs = 0;
344 			dtio->client_key_file = NULL;
345 			dtio->client_cert_file = NULL;
346 		}
347 
348 		if(cfg->dnstap_tls_cert_bundle) {
349 			dtio->ssl_ctx = connect_sslctx_create(
350 				dtio->client_key_file,
351 				dtio->client_cert_file,
352 				cfg->dnstap_tls_cert_bundle, 0);
353 		} else {
354 			dtio->ssl_ctx = connect_sslctx_create(
355 				dtio->client_key_file,
356 				dtio->client_cert_file,
357 				cfg->tls_cert_bundle, cfg->tls_win_cert);
358 		}
359 		if(!dtio->ssl_ctx) {
360 			log_err("could not setup SSL CTX");
361 			return 0;
362 		}
363 		dtio->tls_use_sni = cfg->tls_use_sni;
364 #endif /* HAVE_SSL */
365 	}
366 	return 1;
367 }
368 
369 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
370         struct dt_msg_queue* mq)
371 {
372 	struct dt_io_list_item* item = malloc(sizeof(*item));
373 	if(!item) return 0;
374 	lock_basic_lock(&mq->lock);
375 	mq->dtio = dtio;
376 	lock_basic_unlock(&mq->lock);
377 	item->queue = mq;
378 	item->next = dtio->io_list;
379 	dtio->io_list = item;
380 	dtio->io_list_iter = NULL;
381 	return 1;
382 }
383 
384 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
385         struct dt_msg_queue* mq)
386 {
387 	struct dt_io_list_item* item, *prev=NULL;
388 	if(!dtio) return;
389 	item = dtio->io_list;
390 	while(item) {
391 		if(item->queue == mq) {
392 			/* found it */
393 			if(prev) prev->next = item->next;
394 			else dtio->io_list = item->next;
395 			/* the queue itself only registered, not deleted */
396 			lock_basic_lock(&item->queue->lock);
397 			item->queue->dtio = NULL;
398 			lock_basic_unlock(&item->queue->lock);
399 			free(item);
400 			dtio->io_list_iter = NULL;
401 			return;
402 		}
403 		prev = item;
404 		item = item->next;
405 	}
406 }
407 
408 /** pick a message from the queue, the routine locks and unlocks,
409  * returns true if there is a message */
410 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
411 	size_t* len)
412 {
413 	lock_basic_lock(&mq->lock);
414 	if(mq->first) {
415 		struct dt_msg_entry* entry = mq->first;
416 		mq->first = entry->next;
417 		if(!entry->next) mq->last = NULL;
418 		mq->cursize -= entry->len;
419 		lock_basic_unlock(&mq->lock);
420 
421 		*buf = entry->buf;
422 		*len = entry->len;
423 		free(entry);
424 		return 1;
425 	}
426 	lock_basic_unlock(&mq->lock);
427 	return 0;
428 }
429 
430 /** find message in queue, false if no message, true if message to send */
431 static int dtio_find_in_queue(struct dt_io_thread* dtio,
432 	struct dt_msg_queue* mq)
433 {
434 	void* buf=NULL;
435 	size_t len=0;
436 	if(dt_msg_queue_pop(mq, &buf, &len)) {
437 		dtio->cur_msg = buf;
438 		dtio->cur_msg_len = len;
439 		dtio->cur_msg_done = 0;
440 		dtio->cur_msg_len_done = 0;
441 		return 1;
442 	}
443 	return 0;
444 }
445 
446 /** find a new message to write, search message queues, false if none */
447 static int dtio_find_msg(struct dt_io_thread* dtio)
448 {
449 	struct dt_io_list_item *spot, *item;
450 
451 	spot = dtio->io_list_iter;
452 	/* use the next queue for the next message lookup,
453 	 * if we hit the end(NULL) the NULL restarts the iter at start. */
454 	if(spot)
455 		dtio->io_list_iter = spot->next;
456 	else if(dtio->io_list)
457 		dtio->io_list_iter = dtio->io_list->next;
458 
459 	/* scan from spot to end-of-io_list */
460 	item = spot;
461 	while(item) {
462 		if(dtio_find_in_queue(dtio, item->queue))
463 			return 1;
464 		item = item->next;
465 	}
466 	/* scan starting at the start-of-list (to wrap around the end) */
467 	item = dtio->io_list;
468 	while(item) {
469 		if(dtio_find_in_queue(dtio, item->queue))
470 			return 1;
471 		item = item->next;
472 	}
473 	return 0;
474 }
475 
476 /** callback for the dnstap reconnect, to start reconnecting to output */
477 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
478 	short ATTR_UNUSED(bits), void* arg)
479 {
480 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
481 	dtio->reconnect_is_added = 0;
482 	verbose(VERB_ALGO, "dnstap io: reconnect timer");
483 
484 	dtio_open_output(dtio);
485 	if(dtio->event) {
486 		if(!dtio_add_output_event_write(dtio))
487 			return;
488 		/* nothing wrong so far, wait on the output event */
489 		return;
490 	}
491 	/* exponential backoff and retry on timer */
492 	dtio_reconnect_enable(dtio);
493 }
494 
495 /** attempt to reconnect to the output, after a timeout */
496 static void dtio_reconnect_enable(struct dt_io_thread* dtio)
497 {
498 	struct timeval tv;
499 	int msec;
500 	if(dtio->want_to_exit) return;
501 	if(dtio->reconnect_is_added)
502 		return; /* already done */
503 
504 	/* exponential backoff, store the value for next timeout */
505 	msec = dtio->reconnect_timeout;
506 	if(msec == 0) {
507 		dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
508 	} else {
509 		dtio->reconnect_timeout = msec*2;
510 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
511 			dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
512 	}
513 	verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
514 		msec);
515 
516 	/* setup wait timer */
517 	memset(&tv, 0, sizeof(tv));
518 	tv.tv_sec = msec/1000;
519 	tv.tv_usec = (msec%1000)*1000;
520 	if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
521 		&dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
522 		log_err("dnstap io: could not reconnect ev timer add");
523 		return;
524 	}
525 	dtio->reconnect_is_added = 1;
526 }
527 
528 /** remove dtio reconnect timer */
529 static void dtio_reconnect_del(struct dt_io_thread* dtio)
530 {
531 	if(!dtio->reconnect_is_added)
532 		return;
533 	ub_timer_del(dtio->reconnect_timer);
534 	dtio->reconnect_is_added = 0;
535 }
536 
537 /** clear the reconnect exponential backoff timer.
538  * We have successfully connected so we can try again with short timeouts. */
539 static void dtio_reconnect_clear(struct dt_io_thread* dtio)
540 {
541 	dtio->reconnect_timeout = 0;
542 	dtio_reconnect_del(dtio);
543 }
544 
545 /** reconnect slowly, because we already know we have to wait for a bit */
546 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
547 {
548 	dtio_reconnect_del(dtio);
549 	dtio->reconnect_timeout = msec;
550 	dtio_reconnect_enable(dtio);
551 }
552 
553 /** delete the current message in the dtio, and reset counters */
554 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
555 {
556 	free(dtio->cur_msg);
557 	dtio->cur_msg = NULL;
558 	dtio->cur_msg_len = 0;
559 	dtio->cur_msg_done = 0;
560 	dtio->cur_msg_len_done = 0;
561 }
562 
563 /** delete the buffer and counters used to read frame */
564 static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
565 {
566 	if(rb->buf) {
567 		free(rb->buf);
568 		rb->buf = NULL;
569 	}
570 	rb->buf_count = 0;
571 	rb->buf_cap = 0;
572 	rb->frame_len = 0;
573 	rb->frame_len_done = 0;
574 	rb->control_frame = 0;
575 }
576 
577 /** del the output file descriptor event for listening */
578 static void dtio_del_output_event(struct dt_io_thread* dtio)
579 {
580 	if(!dtio->event_added)
581 		return;
582 	ub_event_del(dtio->event);
583 	dtio->event_added = 0;
584 	dtio->event_added_is_write = 0;
585 }
586 
587 /** close dtio socket and set it to -1 */
588 static void dtio_close_fd(struct dt_io_thread* dtio)
589 {
590 #ifndef USE_WINSOCK
591 	close(dtio->fd);
592 #else
593 	closesocket(dtio->fd);
594 #endif
595 	dtio->fd = -1;
596 }
597 
598 /** close and stop the output file descriptor event */
599 static void dtio_close_output(struct dt_io_thread* dtio)
600 {
601 	if(!dtio->event)
602 		return;
603 	ub_event_free(dtio->event);
604 	dtio->event = NULL;
605 	if(dtio->ssl) {
606 #ifdef HAVE_SSL
607 		SSL_shutdown(dtio->ssl);
608 		SSL_free(dtio->ssl);
609 		dtio->ssl = NULL;
610 #endif
611 	}
612 	dtio_close_fd(dtio);
613 
614 	/* if there is a (partial) message, discard it
615 	 * we cannot send (the remainder of) it, and a new
616 	 * connection needs to start with a control frame. */
617 	if(dtio->cur_msg) {
618 		dtio_cur_msg_free(dtio);
619 	}
620 
621 	dtio->ready_frame_sent = 0;
622 	dtio->accept_frame_received = 0;
623 	dtio_read_frame_free(&dtio->read_frame);
624 
625 	dtio_reconnect_enable(dtio);
626 }
627 
628 /** check for pending nonblocking connect errors,
629  * returns 1 if it is okay. -1 on error (close it), 0 to try later */
630 static int dtio_check_nb_connect(struct dt_io_thread* dtio)
631 {
632 	int error = 0;
633 	socklen_t len = (socklen_t)sizeof(error);
634 	if(!dtio->check_nb_connect)
635 		return 1; /* everything okay */
636 	if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
637 		&len) < 0) {
638 #ifndef USE_WINSOCK
639 		error = errno; /* on solaris errno is error */
640 #else
641 		error = WSAGetLastError();
642 #endif
643 	}
644 #ifndef USE_WINSOCK
645 #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
646 	if(error == EINPROGRESS || error == EWOULDBLOCK)
647 		return 0; /* try again later */
648 #endif
649 #else
650 	if(error == WSAEINPROGRESS) {
651 		return 0; /* try again later */
652 	} else if(error == WSAEWOULDBLOCK) {
653 		ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
654 			dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
655 		return 0; /* try again later */
656 	}
657 #endif
658 	if(error != 0) {
659 		char* to = dtio->socket_path;
660 		if(!to) to = dtio->ip_str;
661 		if(!to) to = "";
662 #ifndef USE_WINSOCK
663 		log_err("dnstap io: failed to connect to \"%s\": %s",
664 			to, strerror(error));
665 #else
666 		log_err("dnstap io: failed to connect to \"%s\": %s",
667 			to, wsa_strerror(error));
668 #endif
669 		return -1; /* error, close it */
670 	}
671 
672 	if(dtio->ip_str)
673 		verbose(VERB_DETAIL, "dnstap io: connected to %s",
674 			dtio->ip_str);
675 	else if(dtio->socket_path)
676 		verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
677 			dtio->socket_path);
678 	dtio_reconnect_clear(dtio);
679 	dtio->check_nb_connect = 0;
680 	return 1; /* everything okay */
681 }
682 
683 #ifdef HAVE_SSL
684 /** write to ssl output
685  * returns number of bytes written, 0 if nothing happened,
686  * try again later, or -1 if the channel is to be closed. */
687 static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
688 	size_t len)
689 {
690 	int r;
691 	ERR_clear_error();
692 	r = SSL_write(dtio->ssl, buf, len);
693 	if(r <= 0) {
694 		int want = SSL_get_error(dtio->ssl, r);
695 		if(want == SSL_ERROR_ZERO_RETURN) {
696 			/* closed */
697 			return -1;
698 		} else if(want == SSL_ERROR_WANT_READ) {
699 			/* we want a brief read event */
700 			dtio_enable_brief_read(dtio);
701 			return 0;
702 		} else if(want == SSL_ERROR_WANT_WRITE) {
703 			/* write again later */
704 			return 0;
705 		} else if(want == SSL_ERROR_SYSCALL) {
706 #ifdef EPIPE
707 			if(errno == EPIPE && verbosity < 2)
708 				return -1; /* silence 'broken pipe' */
709 #endif
710 #ifdef ECONNRESET
711 			if(errno == ECONNRESET && verbosity < 2)
712 				return -1; /* silence reset by peer */
713 #endif
714 			if(errno != 0) {
715 				log_err("dnstap io, SSL_write syscall: %s",
716 					strerror(errno));
717 			}
718 			return -1;
719 		}
720 		log_crypto_err("dnstap io, could not SSL_write");
721 		return -1;
722 	}
723 	return r;
724 }
725 #endif /* HAVE_SSL */
726 
727 /** write buffer to output.
728  * returns number of bytes written, 0 if nothing happened,
729  * try again later, or -1 if the channel is to be closed. */
730 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
731 	size_t len)
732 {
733 	ssize_t ret;
734 	if(dtio->fd == -1)
735 		return -1;
736 #ifdef HAVE_SSL
737 	if(dtio->ssl)
738 		return dtio_write_ssl(dtio, buf, len);
739 #endif
740 	ret = send(dtio->fd, (void*)buf, len, 0);
741 	if(ret == -1) {
742 #ifndef USE_WINSOCK
743 		if(errno == EINTR || errno == EAGAIN)
744 			return 0;
745 		log_err("dnstap io: failed send: %s", strerror(errno));
746 #else
747 		if(WSAGetLastError() == WSAEINPROGRESS)
748 			return 0;
749 		if(WSAGetLastError() == WSAEWOULDBLOCK) {
750 			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
751 				dtio->stop_flush_event:dtio->event),
752 				UB_EV_WRITE);
753 			return 0;
754 		}
755 		log_err("dnstap io: failed send: %s",
756 			wsa_strerror(WSAGetLastError()));
757 #endif
758 		return -1;
759 	}
760 	return ret;
761 }
762 
763 #ifdef HAVE_WRITEV
764 /** write with writev, len and message, in one write, if possible.
765  * return true if message is done, false if incomplete */
766 static int dtio_write_with_writev(struct dt_io_thread* dtio)
767 {
768 	uint32_t sendlen = htonl(dtio->cur_msg_len);
769 	struct iovec iov[2];
770 	ssize_t r;
771 	iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
772 	iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
773 	iov[1].iov_base = dtio->cur_msg;
774 	iov[1].iov_len = dtio->cur_msg_len;
775 	log_assert(iov[0].iov_len > 0);
776 	r = writev(dtio->fd, iov, 2);
777 	if(r == -1) {
778 #ifndef USE_WINSOCK
779 		if(errno == EINTR || errno == EAGAIN)
780 			return 0;
781 		log_err("dnstap io: failed writev: %s", strerror(errno));
782 #else
783 		if(WSAGetLastError() == WSAEINPROGRESS)
784 			return 0;
785 		if(WSAGetLastError() == WSAEWOULDBLOCK) {
786 			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
787 				dtio->stop_flush_event:dtio->event),
788 				UB_EV_WRITE);
789 			return 0;
790 		}
791 		log_err("dnstap io: failed writev: %s",
792 			wsa_strerror(WSAGetLastError()));
793 #endif
794 		/* close the channel */
795 		dtio_del_output_event(dtio);
796 		dtio_close_output(dtio);
797 		return 0;
798 	}
799 	/* written r bytes */
800 	dtio->cur_msg_len_done += r;
801 	if(dtio->cur_msg_len_done < 4)
802 		return 0;
803 	if(dtio->cur_msg_len_done > 4) {
804 		dtio->cur_msg_done = dtio->cur_msg_len_done-4;
805 		dtio->cur_msg_len_done = 4;
806 	}
807 	if(dtio->cur_msg_done < dtio->cur_msg_len)
808 		return 0;
809 	return 1;
810 }
811 #endif /* HAVE_WRITEV */
812 
813 /** write more of the length, preceding the data frame.
814  * return true if message is done, false if incomplete. */
815 static int dtio_write_more_of_len(struct dt_io_thread* dtio)
816 {
817 	uint32_t sendlen;
818 	int r;
819 	if(dtio->cur_msg_len_done >= 4)
820 		return 1;
821 #ifdef HAVE_WRITEV
822 	if(!dtio->ssl) {
823 		/* we try writev for everything.*/
824 		return dtio_write_with_writev(dtio);
825 	}
826 #endif /* HAVE_WRITEV */
827 	sendlen = htonl(dtio->cur_msg_len);
828 	r = dtio_write_buf(dtio,
829 		((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
830 		sizeof(sendlen)-dtio->cur_msg_len_done);
831 	if(r == -1) {
832 		/* close the channel */
833 		dtio_del_output_event(dtio);
834 		dtio_close_output(dtio);
835 		return 0;
836 	} else if(r == 0) {
837 		/* try again later */
838 		return 0;
839 	}
840 	dtio->cur_msg_len_done += r;
841 	if(dtio->cur_msg_len_done < 4)
842 		return 0;
843 	return 1;
844 }
845 
846 /** write more of the data frame.
847  * return true if message is done, false if incomplete. */
848 static int dtio_write_more_of_data(struct dt_io_thread* dtio)
849 {
850 	int r;
851 	if(dtio->cur_msg_done >= dtio->cur_msg_len)
852 		return 1;
853 	r = dtio_write_buf(dtio,
854 		((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
855 		dtio->cur_msg_len - dtio->cur_msg_done);
856 	if(r == -1) {
857 		/* close the channel */
858 		dtio_del_output_event(dtio);
859 		dtio_close_output(dtio);
860 		return 0;
861 	} else if(r == 0) {
862 		/* try again later */
863 		return 0;
864 	}
865 	dtio->cur_msg_done += r;
866 	if(dtio->cur_msg_done < dtio->cur_msg_len)
867 		return 0;
868 	return 1;
869 }
870 
871 /** write more of the current messsage. false if incomplete, true if
872  * the message is done */
873 static int dtio_write_more(struct dt_io_thread* dtio)
874 {
875 	if(dtio->cur_msg_len_done < 4) {
876 		if(!dtio_write_more_of_len(dtio))
877 			return 0;
878 	}
879 	if(dtio->cur_msg_done < dtio->cur_msg_len) {
880 		if(!dtio_write_more_of_data(dtio))
881 			return 0;
882 	}
883 	return 1;
884 }
885 
886 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
887  * -1: continue, >0: number of bytes read into buffer */
888 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
889 	ssize_t r;
890 	r = recv(dtio->fd, (void*)buf, len, 0);
891 	if(r == -1) {
892 		char* to = dtio->socket_path;
893 		if(!to) to = dtio->ip_str;
894 		if(!to) to = "";
895 #ifndef USE_WINSOCK
896 		if(errno == EINTR || errno == EAGAIN)
897 			return -1; /* try later */
898 #else
899 		if(WSAGetLastError() == WSAEINPROGRESS) {
900 			return -1; /* try later */
901 		} else if(WSAGetLastError() == WSAEWOULDBLOCK) {
902 			ub_winsock_tcp_wouldblock(
903 				(dtio->stop_flush_event?
904 				dtio->stop_flush_event:dtio->event),
905 				UB_EV_READ);
906 			return -1; /* try later */
907 		}
908 #endif
909 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
910 			verbosity < 4)
911 			return 0; /* no log retries on low verbosity */
912 		log_err("dnstap io: output closed, recv %s: %s", to,
913 			strerror(errno));
914 		/* and close below */
915 		return 0;
916 	}
917 	if(r == 0) {
918 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
919 			verbosity < 4)
920 			return 0; /* no log retries on low verbosity */
921 		verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
922 		/* and close below */
923 		return 0;
924 	}
925 	/* something was received */
926 	return r;
927 }
928 
929 #ifdef HAVE_SSL
930 /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed,
931  * -1: continue, >0: number of bytes read into buffer */
932 static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
933 {
934 	int r;
935 	ERR_clear_error();
936 	r = SSL_read(dtio->ssl, buf, len);
937 	if(r <= 0) {
938 		int want = SSL_get_error(dtio->ssl, r);
939 		if(want == SSL_ERROR_ZERO_RETURN) {
940 			if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
941 				verbosity < 4)
942 				return 0; /* no log retries on low verbosity */
943 			verbose(VERB_DETAIL, "dnstap io: output closed by the "
944 				"other side");
945 			return 0;
946 		} else if(want == SSL_ERROR_WANT_READ) {
947 			/* continue later */
948 			return -1;
949 		} else if(want == SSL_ERROR_WANT_WRITE) {
950 			(void)dtio_enable_brief_write(dtio);
951 			return -1;
952 		} else if(want == SSL_ERROR_SYSCALL) {
953 #ifdef ECONNRESET
954 			if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
955 				errno == ECONNRESET && verbosity < 4)
956 				return 0; /* silence reset by peer */
957 #endif
958 			if(errno != 0)
959 				log_err("SSL_read syscall: %s",
960 					strerror(errno));
961 			verbose(VERB_DETAIL, "dnstap io: output closed by the "
962 				"other side");
963 			return 0;
964 		}
965 		log_crypto_err("could not SSL_read");
966 		verbose(VERB_DETAIL, "dnstap io: output closed by the "
967 				"other side");
968 		return 0;
969 	}
970 	return r;
971 }
972 #endif /* HAVE_SSL */
973 
974 /** check if the output fd has been closed,
975  * it returns false if the stream is closed. */
976 static int dtio_check_close(struct dt_io_thread* dtio)
977 {
978 	/* we don't want to read any packets, but if there are we can
979 	 * discard the input (ignore it).  Ignore of unknown (control)
980 	 * packets is okay for the framestream protocol.  And also, the
981 	 * read call can return that the stream has been closed by the
982 	 * other side. */
983 	uint8_t buf[1024];
984 	int r = -1;
985 
986 
987 	if(dtio->fd == -1) return 0;
988 
989 	while(r != 0) {
990 		/* not interested in buffer content, overwrite */
991 		r = receive_bytes(dtio, (void*)buf, sizeof(buf));
992 		if(r == -1)
993 			return 1;
994 	}
995 	/* the other end has been closed */
996 	/* close the channel */
997 	dtio_del_output_event(dtio);
998 	dtio_close_output(dtio);
999 	return 0;
1000 }
1001 
1002 /** Read accept frame. Returns -1: continue reading, 0: closed,
1003  * 1: valid accept received. */
1004 static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1005 {
1006 	int r;
1007 	size_t read_frame_done;
1008 	while(dtio->read_frame.frame_len_done < 4) {
1009 #ifdef HAVE_SSL
1010 		if(dtio->ssl) {
1011 			r = ssl_read_bytes(dtio,
1012 				(uint8_t*)&dtio->read_frame.frame_len+
1013 				dtio->read_frame.frame_len_done,
1014 				4-dtio->read_frame.frame_len_done);
1015 		} else {
1016 #endif
1017 			r = receive_bytes(dtio,
1018 				(uint8_t*)&dtio->read_frame.frame_len+
1019 				dtio->read_frame.frame_len_done,
1020 				4-dtio->read_frame.frame_len_done);
1021 #ifdef HAVE_SSL
1022 		}
1023 #endif
1024 		if(r == -1)
1025 			return -1; /* continue reading */
1026 		if(r == 0) {
1027 			 /* connection closed */
1028 			goto close_connection;
1029 		}
1030 		dtio->read_frame.frame_len_done += r;
1031 		if(dtio->read_frame.frame_len_done < 4)
1032 			return -1; /* continue reading */
1033 
1034 		if(dtio->read_frame.frame_len == 0) {
1035 			dtio->read_frame.frame_len_done = 0;
1036 			dtio->read_frame.control_frame = 1;
1037 			continue;
1038 		}
1039 		dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1040 		if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1041 			verbose(VERB_OPS, "dnstap: received frame exceeds max "
1042 				"length of %d bytes, closing connection",
1043 				DTIO_RECV_FRAME_MAX_LEN);
1044 			goto close_connection;
1045 		}
1046 		dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1047 		dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1048 		if(!dtio->read_frame.buf) {
1049 			log_err("dnstap io: out of memory (creating read "
1050 				"buffer)");
1051 			goto close_connection;
1052 		}
1053 	}
1054 	if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1055 #ifdef HAVE_SSL
1056 		if(dtio->ssl) {
1057 			r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1058 				dtio->read_frame.buf_count,
1059 				dtio->read_frame.buf_cap-
1060 				dtio->read_frame.buf_count);
1061 		} else {
1062 #endif
1063 			r = receive_bytes(dtio, dtio->read_frame.buf+
1064 				dtio->read_frame.buf_count,
1065 				dtio->read_frame.buf_cap-
1066 				dtio->read_frame.buf_count);
1067 #ifdef HAVE_SSL
1068 		}
1069 #endif
1070 		if(r == -1)
1071 			return -1; /* continue reading */
1072 		if(r == 0) {
1073 			 /* connection closed */
1074 			goto close_connection;
1075 		}
1076 		dtio->read_frame.buf_count += r;
1077 		if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1078 			return -1; /* continue reading */
1079 	}
1080 
1081 	/* Complete frame received, check if this is a valid ACCEPT control
1082 	 * frame. */
1083 	if(dtio->read_frame.frame_len < 4) {
1084 		verbose(VERB_OPS, "dnstap: invalid data received");
1085 		goto close_connection;
1086 	}
1087 	if(sldns_read_uint32(dtio->read_frame.buf) !=
1088 		FSTRM_CONTROL_FRAME_ACCEPT) {
1089 		verbose(VERB_ALGO, "dnstap: invalid control type received, "
1090 			"ignored");
1091 		dtio->ready_frame_sent = 0;
1092 		dtio->accept_frame_received = 0;
1093 		dtio_read_frame_free(&dtio->read_frame);
1094 		return -1;
1095 	}
1096 	read_frame_done = 4; /* control frame type */
1097 
1098 	/* Iterate over control fields, ignore unknown types.
1099 	 * Need to be able to read at least 8 bytes (control field type +
1100 	 * length). */
1101 	while(read_frame_done+8 < dtio->read_frame.frame_len) {
1102 		uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1103 			read_frame_done);
1104 		uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1105 			read_frame_done + 4);
1106 		if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1107 			if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1108 				read_frame_done+8+len <=
1109 				dtio->read_frame.frame_len &&
1110 				memcmp(dtio->read_frame.buf + read_frame_done +
1111 					+ 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1112 				if(!dtio_control_start_send(dtio)) {
1113 					verbose(VERB_OPS, "dnstap io: out of "
1114 					 "memory while sending START frame");
1115 					goto close_connection;
1116 				}
1117 				dtio->accept_frame_received = 1;
1118 				return 1;
1119 			} else {
1120 				/* unknow content type */
1121 				verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1122 					"contains unknown content type, "
1123 					"closing connection");
1124 				goto close_connection;
1125 			}
1126 		}
1127 		/* unknown option, try next */
1128 		read_frame_done += 8+len;
1129 	}
1130 
1131 
1132 close_connection:
1133 	dtio_del_output_event(dtio);
1134 	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1135 	dtio_close_output(dtio);
1136 	return 0;
1137 }
1138 
1139 /** add the output file descriptor event for listening, read only */
1140 static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1141 {
1142 	if(!dtio->event)
1143 		return 0;
1144 	if(dtio->event_added && !dtio->event_added_is_write)
1145 		return 1;
1146 	/* we have to (re-)register the event */
1147 	if(dtio->event_added)
1148 		ub_event_del(dtio->event);
1149 	ub_event_del_bits(dtio->event, UB_EV_WRITE);
1150 	if(ub_event_add(dtio->event, NULL) != 0) {
1151 		log_err("dnstap io: out of memory (adding event)");
1152 		dtio->event_added = 0;
1153 		dtio->event_added_is_write = 0;
1154 		/* close output and start reattempts to open it */
1155 		dtio_close_output(dtio);
1156 		return 0;
1157 	}
1158 	dtio->event_added = 1;
1159 	dtio->event_added_is_write = 0;
1160 	return 1;
1161 }
1162 
1163 /** add the output file descriptor event for listening, read and write */
1164 static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1165 {
1166 	if(!dtio->event)
1167 		return 0;
1168 	if(dtio->event_added && dtio->event_added_is_write)
1169 		return 1;
1170 	/* we have to (re-)register the event */
1171 	if(dtio->event_added)
1172 		ub_event_del(dtio->event);
1173 	ub_event_add_bits(dtio->event, UB_EV_WRITE);
1174 	if(ub_event_add(dtio->event, NULL) != 0) {
1175 		log_err("dnstap io: out of memory (adding event)");
1176 		dtio->event_added = 0;
1177 		dtio->event_added_is_write = 0;
1178 		/* close output and start reattempts to open it */
1179 		dtio_close_output(dtio);
1180 		return 0;
1181 	}
1182 	dtio->event_added = 1;
1183 	dtio->event_added_is_write = 1;
1184 	return 1;
1185 }
1186 
/**
 * Put the dtio thread to sleep: with nothing to write, stop polling for
 * writability and listen read-only. The result of the re-registration
 * is deliberately ignored.
 */
static void dtio_sleep(struct dt_io_thread* dtio)
{
	(void)dtio_add_output_event_read(dtio);
}
1194 
#ifdef HAVE_SSL
/**
 * Enable the brief read condition: set the ssl_brief_read flag and drop
 * the write bit from the event that is in use. During stop flush that is
 * the stop flush event, otherwise the regular output event.
 * @return false on failure to register the event.
 */
static int dtio_enable_brief_read(struct dt_io_thread* dtio)
{
	struct ub_event* flushev;
	dtio->ssl_brief_read = 1;
	flushev = dtio->stop_flush_event;
	if(!flushev)
		return dtio_add_output_event_read(dtio);
	ub_event_del(flushev);
	ub_event_del_bits(flushev, UB_EV_WRITE);
	if(ub_event_add(flushev, NULL) != 0) {
		log_err("dnstap io, stop flush, could not ub_event_add");
		return 0;
	}
	return 1;
}
#endif /* HAVE_SSL */
1212 
#ifdef HAVE_SSL
/**
 * Disable the brief read condition: clear the ssl_brief_read flag and
 * put the write bit back on the event that is in use. During stop flush
 * that is the stop flush event, otherwise the regular output event.
 * @return false on failure to register the event.
 */
static int dtio_disable_brief_read(struct dt_io_thread* dtio)
{
	struct ub_event* flushev;
	dtio->ssl_brief_read = 0;
	flushev = dtio->stop_flush_event;
	if(!flushev)
		return dtio_add_output_event_write(dtio);
	ub_event_del(flushev);
	ub_event_add_bits(flushev, UB_EV_WRITE);
	if(ub_event_add(flushev, NULL) != 0) {
		log_err("dnstap io, stop flush, could not ub_event_add");
		return 0;
	}
	return 1;
}
#endif /* HAVE_SSL */
1230 
#ifdef HAVE_SSL
/**
 * Enable the brief write condition: flag it and make the output event
 * listen for write.
 * @return the result of registering the event for write.
 */
static int dtio_enable_brief_write(struct dt_io_thread* dtio)
{
	int registered;
	dtio->ssl_brief_write = 1;
	registered = dtio_add_output_event_write(dtio);
	return registered;
}
#endif /* HAVE_SSL */
1239 
#ifdef HAVE_SSL
/**
 * Disable the brief write condition: clear the flag and make the output
 * event listen read-only again.
 * @return the result of registering the event for read.
 */
static int dtio_disable_brief_write(struct dt_io_thread* dtio)
{
	int registered;
	dtio->ssl_brief_write = 0;
	registered = dtio_add_output_event_read(dtio);
	return registered;
}
#endif /* HAVE_SSL */
1248 
#ifdef HAVE_SSL
/**
 * Check the peer verification after the ssl handshake.
 * Without the verify peer flag on the ssl object nothing is checked.
 * With the flag, the verify result must be OK and the peer must have
 * presented a certificate. The outcome is logged at VERB_ALGO.
 * @return false if the peer is not accepted (caller closes), else true.
 */
static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
{
	X509* cert;

	if(!(SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
		/* unauthenticated, the verify peer flag was not set
		 * in ssl when the ssl object was created from ssl_ctx */
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
			dtio->ip_str);
		return 1;
	}

	if(SSL_get_verify_result(dtio->ssl) != X509_V_OK) {
		/* log what the peer sent, if anything, then reject */
		cert = SSL_get_peer_certificate(dtio->ssl);
		if(cert) {
			log_cert(VERB_ALGO, "dnstap io, peer "
				"certificate", cert);
			X509_free(cert);
		}
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
			"failed: failed to authenticate",
			dtio->ip_str);
		return 0;
	}

	/* verification succeeded, a certificate has to be present */
	cert = SSL_get_peer_certificate(dtio->ssl);
	if(!cert) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection failed no certificate",
			dtio->ip_str);
		return 0;
	}
	log_cert(VERB_ALGO, "dnstap io, peer certificate", cert);
#ifdef HAVE_SSL_GET0_PEERNAME
	if(SSL_get0_peername(dtio->ssl)) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection to %s authenticated",
			dtio->ip_str,
			SSL_get0_peername(dtio->ssl));
	} else {
#endif
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection authenticated",
			dtio->ip_str);
#ifdef HAVE_SSL_GET0_PEERNAME
	}
#endif
	X509_free(cert);
	return 1;
}
#endif /* HAVE_SSL */
1301 
#ifdef HAVE_SSL
/**
 * Perform the ssl handshake, or continue it.
 * @param dtio: the io thread structure.
 * @param info: stop flush info, or NULL when not in stop flush. When it
 *	is set, the stop flush loop is asked to exit on failure.
 * @return 1 if okay (handshake done and peer accepted), 0 to stop: more
 *	events are needed, or the connection was closed and a slow
 *	reconnect was scheduled.
 */
static int dtio_ssl_handshake(struct dt_io_thread* dtio,
	struct stop_flush_info* info)
{
	int r, want;

	/* assume a pending brief read is satisfied by this event,
	 * if more is needed it is enabled again below */
	if(dtio->ssl_brief_read && !dtio_disable_brief_read(dtio)) {
		if(info) dtio_stop_flush_exit(info);
		return 0;
	}
	if(dtio->ssl_handshake_done)
		return 1;

	ERR_clear_error();
	r = SSL_do_handshake(dtio->ssl);
	if(r == 1) {
		/* handshake finished, check peer verification */
		dtio->ssl_handshake_done = 1;
		if(dtio_ssl_check_peer(dtio))
			return 1;
		goto closed;
	}

	want = SSL_get_error(dtio->ssl, r);
	if(want == SSL_ERROR_WANT_READ) {
		/* we want to read on the connection */
		if(!dtio_enable_brief_read(dtio) && info)
			dtio_stop_flush_exit(info);
		return 0;
	}
	if(want == SSL_ERROR_WANT_WRITE) {
		/* we want to write on the connection */
		return 0;
	}
	if(r == 0) {
		/* closed */
		goto closed;
	}
	if(want == SSL_ERROR_SYSCALL) {
		/* SYSCALL and errno==0 means closed uncleanly */
		int silent = 0;
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			silent = 1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			silent = 1; /* silence reset by peer */
#endif
		if(errno == 0)
			silent = 1;
		if(!silent)
			log_err("dnstap io, SSL_handshake syscall: %s",
				strerror(errno));
	} else {
		unsigned long err = ERR_get_error();
		if(!squelch_err_ssl_handshake(err)) {
			log_crypto_err_code("dnstap io, ssl handshake failed",
				err);
			verbose(VERB_OPS, "dnstap io, ssl handshake failed "
				"from %s", dtio->ip_str);
		}
	}

closed:
	/* shared teardown for every failure after the handshake attempt */
	if(info) dtio_stop_flush_exit(info);
	dtio_del_output_event(dtio);
	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
	dtio_close_output(dtio);
	return 0;
}
#endif /* HAVE_SSL */
1393 
1394 /** callback for the dnstap events, to write to the output */
1395 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1396 {
1397 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1398 	int i;
1399 
1400 	if(dtio->check_nb_connect) {
1401 		int connect_err = dtio_check_nb_connect(dtio);
1402 		if(connect_err == -1) {
1403 			/* close the channel */
1404 			dtio_del_output_event(dtio);
1405 			dtio_close_output(dtio);
1406 			return;
1407 		} else if(connect_err == 0) {
1408 			/* try again later */
1409 			return;
1410 		}
1411 		/* nonblocking connect check passed, continue */
1412 	}
1413 
1414 #ifdef HAVE_SSL
1415 	if(dtio->ssl &&
1416 		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1417 		if(!dtio_ssl_handshake(dtio, NULL))
1418 			return;
1419 	}
1420 #endif
1421 
1422 	if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1423 		if(dtio->ssl_brief_write)
1424 			(void)dtio_disable_brief_write(dtio);
1425 		if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1426 			if(dtio_read_accept_frame(dtio) <= 0)
1427 				return;
1428 		} else if(!dtio_check_close(dtio))
1429 			return;
1430 	}
1431 
1432 	/* loop to process a number of messages.  This improves throughput,
1433 	 * because selecting on write-event if not needed for busy messages
1434 	 * (dnstap log) generation and if they need to all be written back.
1435 	 * The write event is usually not blocked up.  But not forever,
1436 	 * because the event loop needs to stay responsive for other events.
1437 	 * If there are no (more) messages, or if the output buffers get
1438 	 * full, it returns out of the loop. */
1439 	for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1440 		/* see if there are messages that need writing */
1441 		if(!dtio->cur_msg) {
1442 			if(!dtio_find_msg(dtio)) {
1443 				if(i == 0) {
1444 					/* no messages on the first iteration,
1445 					 * the queues are all empty */
1446 					dtio_sleep(dtio);
1447 				}
1448 				return; /* nothing to do */
1449 			}
1450 		}
1451 
1452 		/* write it */
1453 		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1454 			if(!dtio_write_more(dtio))
1455 				return;
1456 		}
1457 
1458 		/* done with the current message */
1459 		dtio_cur_msg_free(dtio);
1460 
1461 		/* If this is a bidirectional stream the first message will be
1462 		 * the READY control frame. We can only continue writing after
1463 		 * receiving an ACCEPT control frame. */
1464 		if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1465 			dtio->ready_frame_sent = 1;
1466 			(void)dtio_add_output_event_read(dtio);
1467 			break;
1468 		}
1469 	}
1470 }
1471 
1472 /** callback for the dnstap commandpipe, to stop the dnstap IO */
1473 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1474 {
1475 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1476 	uint8_t cmd;
1477 	ssize_t r;
1478 	if(dtio->want_to_exit)
1479 		return;
1480 	r = read(fd, &cmd, sizeof(cmd));
1481 	if(r == -1) {
1482 #ifndef USE_WINSOCK
1483 		if(errno == EINTR || errno == EAGAIN)
1484 			return; /* ignore this */
1485 		log_err("dnstap io: failed to read: %s", strerror(errno));
1486 #else
1487 		if(WSAGetLastError() == WSAEINPROGRESS)
1488 			return;
1489 		if(WSAGetLastError() == WSAEWOULDBLOCK)
1490 			return;
1491 		log_err("dnstap io: failed to read: %s",
1492 			wsa_strerror(WSAGetLastError()));
1493 #endif
1494 		/* and then fall through to quit the thread */
1495 	} else if(r == 0) {
1496 		verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1497 	} else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1498 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1499 	} else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1500 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1501 
1502 		if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1503 			verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1504 				"waiting for ACCEPT control frame");
1505 			return;
1506 		}
1507 
1508 		/* reregister event */
1509 		if(!dtio_add_output_event_write(dtio))
1510 			return;
1511 		return;
1512 	} else if(r == 1) {
1513 		verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1514 	}
1515 	dtio->want_to_exit = 1;
1516 	if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1517 		!= 0) {
1518 		log_err("dnstap io: could not loopexit");
1519 	}
1520 }
1521 
1522 #ifndef THREADS_DISABLED
1523 /** setup the event base for the dnstap io thread */
1524 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1525 	struct timeval* now)
1526 {
1527 	memset(now, 0, sizeof(*now));
1528 	dtio->event_base = ub_default_event_base(0, secs, now);
1529 	if(!dtio->event_base) {
1530 		fatal_exit("dnstap io: could not create event_base");
1531 	}
1532 }
1533 #endif /* THREADS_DISABLED */
1534 
1535 /** setup the cmd event for dnstap io */
1536 static void dtio_setup_cmd(struct dt_io_thread* dtio)
1537 {
1538 	struct ub_event* cmdev;
1539 	fd_set_nonblock(dtio->commandpipe[0]);
1540 	cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1541 		UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1542 	if(!cmdev) {
1543 		fatal_exit("dnstap io: out of memory");
1544 	}
1545 	dtio->command_event = cmdev;
1546 	if(ub_event_add(cmdev, NULL) != 0) {
1547 		fatal_exit("dnstap io: out of memory (adding event)");
1548 	}
1549 }
1550 
1551 /** setup the reconnect event for dnstap io */
1552 static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1553 {
1554 	dtio_reconnect_clear(dtio);
1555 	dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1556 		UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1557 	if(!dtio->reconnect_timer) {
1558 		fatal_exit("dnstap io: out of memory");
1559 	}
1560 }
1561 
/**
 * State that is tracked while the stop flush runs, that is the brief
 * attempt at exit to write out the last frame and the STOP control frame
 * on its own event base.
 */
struct stop_flush_info {
	/** event base that is dispatched for the duration of stop flush */
	struct ub_event_base* base;
	/** set once loopexit of the stop flush base has been requested,
	 * so that it is requested only once */
	int want_to_exit_flush;
	/** set when the stop flush timer has expired */
	int timer_done;
	/** the dnstap io thread structure that is being flushed */
	struct dt_io_thread* dtio;
	/** buffer with the STOP control frame */
	void* stop_frame;
	/** number of bytes in stop_frame */
	size_t stop_frame_len;
	/** number of bytes of stop_frame that have been written */
	size_t stop_frame_done;
};
1581 
1582 /** exit the stop flush base */
1583 static void dtio_stop_flush_exit(struct stop_flush_info* info)
1584 {
1585 	if(info->want_to_exit_flush)
1586 		return;
1587 	info->want_to_exit_flush = 1;
1588 	if(ub_event_base_loopexit(info->base) != 0) {
1589 		log_err("dnstap io: could not loopexit");
1590 	}
1591 }
1592 
1593 /** send the stop control,
1594  * return true if completed the frame. */
1595 static int dtio_control_stop_send(struct stop_flush_info* info)
1596 {
1597 	struct dt_io_thread* dtio = info->dtio;
1598 	int r;
1599 	if(info->stop_frame_done >= info->stop_frame_len)
1600 		return 1;
1601 	r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1602 		info->stop_frame_done, info->stop_frame_len -
1603 		info->stop_frame_done);
1604 	if(r == -1) {
1605 		verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1606 		dtio_stop_flush_exit(info);
1607 		return 0;
1608 	}
1609 	if(r == 0) {
1610 		/* try again later, or timeout */
1611 		return 0;
1612 	}
1613 	info->stop_frame_done += r;
1614 	if(info->stop_frame_done < info->stop_frame_len)
1615 		return 0; /* not done yet */
1616 	return 1;
1617 }
1618 
1619 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1620 	void* arg)
1621 {
1622 	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1623 	if(info->want_to_exit_flush)
1624 		return;
1625 	verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1626 	info->timer_done = 1;
1627 	dtio_stop_flush_exit(info);
1628 }
1629 
1630 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1631 {
1632 	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1633 	struct dt_io_thread* dtio = info->dtio;
1634 	if(info->want_to_exit_flush)
1635 		return;
1636 	if(dtio->check_nb_connect) {
1637 		/* we don't start the stop_flush if connect still
1638 		 * in progress, but the check code is here, just in case */
1639 		int connect_err = dtio_check_nb_connect(dtio);
1640 		if(connect_err == -1) {
1641 			/* close the channel, exit the stop flush */
1642 			dtio_stop_flush_exit(info);
1643 			dtio_del_output_event(dtio);
1644 			dtio_close_output(dtio);
1645 			return;
1646 		} else if(connect_err == 0) {
1647 			/* try again later */
1648 			return;
1649 		}
1650 		/* nonblocking connect check passed, continue */
1651 	}
1652 #ifdef HAVE_SSL
1653 	if(dtio->ssl &&
1654 		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1655 		if(!dtio_ssl_handshake(dtio, info))
1656 			return;
1657 	}
1658 #endif
1659 
1660 	if((bits&UB_EV_READ)) {
1661 		if(!dtio_check_close(dtio)) {
1662 			if(dtio->fd == -1) {
1663 				verbose(VERB_ALGO, "dnstap io: "
1664 					"stop flush: output closed");
1665 				dtio_stop_flush_exit(info);
1666 			}
1667 			return;
1668 		}
1669 	}
1670 	/* write remainder of last frame */
1671 	if(dtio->cur_msg) {
1672 		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1673 			if(!dtio_write_more(dtio)) {
1674 				if(dtio->fd == -1) {
1675 					verbose(VERB_ALGO, "dnstap io: "
1676 						"stop flush: output closed");
1677 					dtio_stop_flush_exit(info);
1678 				}
1679 				return;
1680 			}
1681 		}
1682 		verbose(VERB_ALGO, "dnstap io: stop flush completed "
1683 			"last frame");
1684 		dtio_cur_msg_free(dtio);
1685 	}
1686 	/* write stop frame */
1687 	if(info->stop_frame_done < info->stop_frame_len) {
1688 		if(!dtio_control_stop_send(info))
1689 			return;
1690 		verbose(VERB_ALGO, "dnstap io: stop flush completed "
1691 			"stop control frame");
1692 	}
1693 	/* when last frame and stop frame are sent, exit */
1694 	dtio_stop_flush_exit(info);
1695 }
1696 
1697 /** flush at end, last packet and stop control */
1698 static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1699 {
1700 	/* briefly attempt to flush the previous packet to the output,
1701 	 * this could be a partial packet, or even the start control frame */
1702 	time_t secs = 0;
1703 	struct timeval now;
1704 	struct stop_flush_info info;
1705 	struct timeval tv;
1706 	struct ub_event* timer, *stopev;
1707 
1708 	if(dtio->fd == -1 || dtio->check_nb_connect) {
1709 		/* no connection or we have just connected, so nothing is
1710 		 * sent yet, so nothing to stop or flush */
1711 		return;
1712 	}
1713 	if(dtio->ssl && !dtio->ssl_handshake_done) {
1714 		/* no SSL connection has been established yet */
1715 		return;
1716 	}
1717 
1718 	memset(&info, 0, sizeof(info));
1719 	memset(&now, 0, sizeof(now));
1720 	info.dtio = dtio;
1721 	info.base = ub_default_event_base(0, &secs, &now);
1722 	if(!info.base) {
1723 		log_err("dnstap io: malloc failure");
1724 		return;
1725 	}
1726 	timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1727 		&dtio_stop_timer_cb, &info);
1728 	if(!timer) {
1729 		log_err("dnstap io: malloc failure");
1730 		ub_event_base_free(info.base);
1731 		return;
1732 	}
1733 	memset(&tv, 0, sizeof(tv));
1734 	tv.tv_sec = 2;
1735 	if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1736 		&tv) != 0) {
1737 		log_err("dnstap io: cannot event_timer_add");
1738 		ub_event_free(timer);
1739 		ub_event_base_free(info.base);
1740 		return;
1741 	}
1742 	stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1743 		UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1744 	if(!stopev) {
1745 		log_err("dnstap io: malloc failure");
1746 		ub_timer_del(timer);
1747 		ub_event_free(timer);
1748 		ub_event_base_free(info.base);
1749 		return;
1750 	}
1751 	if(ub_event_add(stopev, NULL) != 0) {
1752 		log_err("dnstap io: cannot event_add");
1753 		ub_event_free(stopev);
1754 		ub_timer_del(timer);
1755 		ub_event_free(timer);
1756 		ub_event_base_free(info.base);
1757 		return;
1758 	}
1759 	info.stop_frame = fstrm_create_control_frame_stop(
1760 		&info.stop_frame_len);
1761 	if(!info.stop_frame) {
1762 		log_err("dnstap io: malloc failure");
1763 		ub_event_del(stopev);
1764 		ub_event_free(stopev);
1765 		ub_timer_del(timer);
1766 		ub_event_free(timer);
1767 		ub_event_base_free(info.base);
1768 		return;
1769 	}
1770 	dtio->stop_flush_event = stopev;
1771 
1772 	/* wait briefly, or until finished */
1773 	verbose(VERB_ALGO, "dnstap io: stop flush started");
1774 	if(ub_event_base_dispatch(info.base) < 0) {
1775 		log_err("dnstap io: dispatch flush failed, errno is %s",
1776 			strerror(errno));
1777 	}
1778 	verbose(VERB_ALGO, "dnstap io: stop flush ended");
1779 	free(info.stop_frame);
1780 	dtio->stop_flush_event = NULL;
1781 	ub_event_del(stopev);
1782 	ub_event_free(stopev);
1783 	ub_timer_del(timer);
1784 	ub_event_free(timer);
1785 	ub_event_base_free(info.base);
1786 }
1787 
1788 /** perform desetup and free stuff when the dnstap io thread exits */
1789 static void dtio_desetup(struct dt_io_thread* dtio)
1790 {
1791 	dtio_control_stop_flush(dtio);
1792 	dtio_del_output_event(dtio);
1793 	dtio_close_output(dtio);
1794 	ub_event_del(dtio->command_event);
1795 	ub_event_free(dtio->command_event);
1796 #ifndef USE_WINSOCK
1797 	close(dtio->commandpipe[0]);
1798 #else
1799 	_close(dtio->commandpipe[0]);
1800 #endif
1801 	dtio->commandpipe[0] = -1;
1802 	dtio_reconnect_del(dtio);
1803 	ub_event_free(dtio->reconnect_timer);
1804 	dtio_cur_msg_free(dtio);
1805 #ifndef THREADS_DISABLED
1806 	ub_event_base_free(dtio->event_base);
1807 #endif
1808 }
1809 
1810 /** setup a start control message */
1811 static int dtio_control_start_send(struct dt_io_thread* dtio)
1812 {
1813 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1814 	dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1815 		&dtio->cur_msg_len);
1816 	if(!dtio->cur_msg) {
1817 		return 0;
1818 	}
1819 	/* setup to send the control message */
1820 	/* set that the buffer needs to be sent, but the length
1821 	 * of that buffer is already written, that way the buffer can
1822 	 * start with 0 length and then the length of the control frame
1823 	 * in it */
1824 	dtio->cur_msg_done = 0;
1825 	dtio->cur_msg_len_done = 4;
1826 	return 1;
1827 }
1828 
1829 /** setup a ready control message */
1830 static int dtio_control_ready_send(struct dt_io_thread* dtio)
1831 {
1832 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1833 	dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1834 		&dtio->cur_msg_len);
1835 	if(!dtio->cur_msg) {
1836 		return 0;
1837 	}
1838 	/* setup to send the control message */
1839 	/* set that the buffer needs to be sent, but the length
1840 	 * of that buffer is already written, that way the buffer can
1841 	 * start with 0 length and then the length of the control frame
1842 	 * in it */
1843 	dtio->cur_msg_done = 0;
1844 	dtio->cur_msg_len_done = 4;
1845 	return 1;
1846 }
1847 
/**
 * Open the output file descriptor for af_local: create a nonblocking
 * stream socket and connect it to dtio->socket_path.
 * @param dtio: the io thread structure, fd is set on success.
 * @return false on failure (logged); no socket is left open then.
 */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifdef HAVE_SYS_UN_H
	struct sockaddr_un s;
	memset(&s, 0, sizeof(s));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	/* this member exists on BSDs, not Linux */
	s.sun_len = (unsigned)sizeof(s);
#endif
	s.sun_family = AF_LOCAL;
	/* length is 92-108, 104 on FreeBSD */
	/* strlcpy returns the length of the source; if that does not fit,
	 * the path was truncated and connect would go to another path than
	 * the configured one, so refuse it */
	if(strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path))
		>= sizeof(s.sun_path)) {
		log_err("dnstap io: socket path \"%s\" is too long",
			dtio->socket_path);
		return 0;
	}
	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
#ifndef USE_WINSOCK
		log_err("dnstap io: failed to create socket: %s",
			strerror(errno));
#else
		log_err("dnstap io: failed to create socket: %s",
			wsa_strerror(WSAGetLastError()));
#endif
		return 0;
	}
	fd_set_nonblock(dtio->fd);
	if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
		== -1) {
		char* to = dtio->socket_path;
#ifndef USE_WINSOCK
		log_err("dnstap io: failed to connect to \"%s\": %s",
			to, strerror(errno));
#else
		log_err("dnstap io: failed to connect to \"%s\": %s",
			to, wsa_strerror(WSAGetLastError()));
#endif
		dtio_close_fd(dtio);
		return 0;
	}
	return 1;
#else
	log_err("cannot create af_local socket");
	return 0;
#endif /* HAVE_SYS_UN_H */
}
1892 
1893 /** open the output file descriptor for af_inet and af_inet6 */
1894 static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1895 {
1896 	struct sockaddr_storage addr;
1897 	socklen_t addrlen;
1898 	memset(&addr, 0, sizeof(addr));
1899 	addrlen = (socklen_t)sizeof(addr);
1900 
1901 	if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
1902 		log_err("could not parse IP '%s'", dtio->ip_str);
1903 		return 0;
1904 	}
1905 	dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1906 	if(dtio->fd == -1) {
1907 #ifndef USE_WINSOCK
1908 		log_err("can't create socket: %s", strerror(errno));
1909 #else
1910 		log_err("can't create socket: %s",
1911 			wsa_strerror(WSAGetLastError()));
1912 #endif
1913 		return 0;
1914 	}
1915 	fd_set_nonblock(dtio->fd);
1916 	if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1917 		if(errno == EINPROGRESS)
1918 			return 1; /* wait until connect done*/
1919 #ifndef USE_WINSOCK
1920 		if(tcp_connect_errno_needs_log(
1921 			(struct sockaddr *)&addr, addrlen)) {
1922 			log_err("dnstap io: failed to connect to %s: %s",
1923 				dtio->ip_str, strerror(errno));
1924 		}
1925 #else
1926 		if(WSAGetLastError() == WSAEINPROGRESS ||
1927 			WSAGetLastError() == WSAEWOULDBLOCK)
1928 			return 1; /* wait until connect done*/
1929 		if(tcp_connect_errno_needs_log(
1930 			(struct sockaddr *)&addr, addrlen)) {
1931 			log_err("dnstap io: failed to connect to %s: %s",
1932 				dtio->ip_str, wsa_strerror(WSAGetLastError()));
1933 		}
1934 #endif
1935 		dtio_close_fd(dtio);
1936 		return 0;
1937 	}
1938 	return 1;
1939 }
1940 
1941 /** setup the SSL structure for new connection */
1942 static int dtio_setup_ssl(struct dt_io_thread* dtio)
1943 {
1944 	dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
1945 	if(!dtio->ssl) return 0;
1946 	dtio->ssl_handshake_done = 0;
1947 	dtio->ssl_brief_read = 0;
1948 
1949 	if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
1950 		dtio->tls_use_sni)) {
1951 		return 0;
1952 	}
1953 	return 1;
1954 }
1955 
1956 /** open the output file descriptor */
1957 static void dtio_open_output(struct dt_io_thread* dtio)
1958 {
1959 	struct ub_event* ev;
1960 	if(dtio->upstream_is_unix) {
1961 		if(!dtio_open_output_local(dtio)) {
1962 			dtio_reconnect_enable(dtio);
1963 			return;
1964 		}
1965 	} else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
1966 		if(!dtio_open_output_tcp(dtio)) {
1967 			dtio_reconnect_enable(dtio);
1968 			return;
1969 		}
1970 		if(dtio->upstream_is_tls) {
1971 			if(!dtio_setup_ssl(dtio)) {
1972 				dtio_close_fd(dtio);
1973 				dtio_reconnect_enable(dtio);
1974 				return;
1975 			}
1976 		}
1977 	}
1978 	dtio->check_nb_connect = 1;
1979 
1980 	/* the EV_READ is to read ACCEPT control messages, and catch channel
1981 	 * close. EV_WRITE is to write packets */
1982 	ev = ub_event_new(dtio->event_base, dtio->fd,
1983 		UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
1984 		dtio);
1985 	if(!ev) {
1986 		log_err("dnstap io: out of memory");
1987 		if(dtio->ssl) {
1988 #ifdef HAVE_SSL
1989 			SSL_free(dtio->ssl);
1990 			dtio->ssl = NULL;
1991 #endif
1992 		}
1993 		dtio_close_fd(dtio);
1994 		dtio_reconnect_enable(dtio);
1995 		return;
1996 	}
1997 	dtio->event = ev;
1998 
1999 	/* setup protocol control message to start */
2000 	if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2001 		(dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2002 		log_err("dnstap io: out of memory");
2003 		ub_event_free(dtio->event);
2004 		dtio->event = NULL;
2005 		if(dtio->ssl) {
2006 #ifdef HAVE_SSL
2007 			SSL_free(dtio->ssl);
2008 			dtio->ssl = NULL;
2009 #endif
2010 		}
2011 		dtio_close_fd(dtio);
2012 		dtio_reconnect_enable(dtio);
2013 		return;
2014 	}
2015 }
2016 
/**
 * Perform the setup of the writer thread on the established event_base:
 * the command event, the reconnect timer, the output, and the output
 * event registered for write.
 */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
	dtio_setup_cmd(dtio);
	dtio_setup_reconnect(dtio);
	dtio_open_output(dtio);
	/* nothing else to do here if the registration fails */
	(void)dtio_add_output_event_write(dtio);
}
2026 
2027 #ifndef THREADS_DISABLED
2028 /** the IO thread function for the DNSTAP IO */
2029 static void* dnstap_io(void* arg)
2030 {
2031 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2032 	time_t secs = 0;
2033 	struct timeval now;
2034 	log_thread_set(&dtio->threadnum);
2035 
2036 	/* setup */
2037 	verbose(VERB_ALGO, "start dnstap io thread");
2038 	dtio_setup_base(dtio, &secs, &now);
2039 	dtio_setup_on_base(dtio);
2040 
2041 	/* run */
2042 	if(ub_event_base_dispatch(dtio->event_base) < 0) {
2043 		log_err("dnstap io: dispatch failed, errno is %s",
2044 			strerror(errno));
2045 	}
2046 
2047 	/* cleanup */
2048 	verbose(VERB_ALGO, "stop dnstap io thread");
2049 	dtio_desetup(dtio);
2050 	return NULL;
2051 }
2052 #endif /* THREADS_DISABLED */
2053 
2054 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2055 	int numworkers)
2056 {
2057 	/* set up the thread, can fail */
2058 #ifndef USE_WINSOCK
2059 	if(pipe(dtio->commandpipe) == -1) {
2060 		log_err("failed to create pipe: %s", strerror(errno));
2061 		return 0;
2062 	}
2063 #else
2064 	if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2065 		log_err("failed to create _pipe: %s",
2066 			wsa_strerror(WSAGetLastError()));
2067 		return 0;
2068 	}
2069 #endif
2070 
2071 	/* start the thread */
2072 	dtio->threadnum = numworkers+1;
2073 	dtio->started = 1;
2074 #ifndef THREADS_DISABLED
2075 	ub_thread_create(&dtio->tid, dnstap_io, dtio);
2076 	(void)event_base_nothr;
2077 #else
2078 	dtio->event_base = event_base_nothr;
2079 	dtio_setup_on_base(dtio);
2080 #endif
2081 	return 1;
2082 }
2083 
2084 void dt_io_thread_stop(struct dt_io_thread* dtio)
2085 {
2086 #ifndef THREADS_DISABLED
2087 	uint8_t cmd = DTIO_COMMAND_STOP;
2088 #endif
2089 	if(!dtio) return;
2090 	if(!dtio->started) return;
2091 	verbose(VERB_ALGO, "dnstap io: send stop cmd");
2092 
2093 #ifndef THREADS_DISABLED
2094 	while(1) {
2095 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2096 		if(r == -1) {
2097 #ifndef USE_WINSOCK
2098 			if(errno == EINTR || errno == EAGAIN)
2099 				continue;
2100 			log_err("dnstap io stop: write: %s", strerror(errno));
2101 #else
2102 			if(WSAGetLastError() == WSAEINPROGRESS)
2103 				continue;
2104 			if(WSAGetLastError() == WSAEWOULDBLOCK)
2105 				continue;
2106 			log_err("dnstap io stop: write: %s",
2107 				wsa_strerror(WSAGetLastError()));
2108 #endif
2109 			break;
2110 		}
2111 		break;
2112 	}
2113 	dtio->started = 0;
2114 #endif /* THREADS_DISABLED */
2115 
2116 #ifndef USE_WINSOCK
2117 	close(dtio->commandpipe[1]);
2118 #else
2119 	_close(dtio->commandpipe[1]);
2120 #endif
2121 	dtio->commandpipe[1] = -1;
2122 #ifndef THREADS_DISABLED
2123 	ub_thread_join(dtio->tid);
2124 #else
2125 	dtio->want_to_exit = 1;
2126 	dtio_desetup(dtio);
2127 #endif
2128 }
2129