xref: /freebsd/contrib/unbound/dnstap/dtstream.c (revision ec0ea6efa1ad229d75c394c1a9b9cac33af2b1d3)
1 /*
2  * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3  *
4  * Copyright (c) 2020, NLnet Labs. All rights reserved.
5  *
6  * This software is open source.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * Redistributions of source code must retain the above copyright notice,
13  * this list of conditions and the following disclaimer.
14  *
15  * Redistributions in binary form must reproduce the above copyright notice,
16  * this list of conditions and the following disclaimer in the documentation
17  * and/or other materials provided with the distribution.
18  *
19  * Neither the name of the NLNET LABS nor the names of its contributors may
20  * be used to endorse or promote products derived from this software without
21  * specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34  *
35  */
36 
37 /**
38  * \file
39  *
40  * An implementation of the Frame Streams data transport protocol for
41  * the Unbound DNSTAP message logging facility.
42  */
43 
44 #include "config.h"
45 #include "dnstap/dtstream.h"
46 #include "dnstap/dnstap_fstrm.h"
47 #include "util/config_file.h"
48 #include "util/ub_event.h"
49 #include "util/net_help.h"
50 #include "services/outside_network.h"
51 #include "sldns/sbuffer.h"
52 #ifdef HAVE_SYS_UN_H
53 #include <sys/un.h>
54 #endif
55 #include <fcntl.h>
56 #ifdef HAVE_OPENSSL_SSL_H
57 #include <openssl/ssl.h>
58 #endif
59 #ifdef HAVE_OPENSSL_ERR_H
60 #include <openssl/err.h>
61 #endif
62 
63 /** number of messages to process in one output callback */
64 #define DTIO_MESSAGES_PER_CALLBACK 100
65 /** the msec to wait for reconnect (if not immediate, the first attempt) */
66 #define DTIO_RECONNECT_TIMEOUT_MIN 10
67 /** the msec to wait for reconnect max after backoff */
68 #define DTIO_RECONNECT_TIMEOUT_MAX 1000
69 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
70 #define DTIO_RECONNECT_TIMEOUT_SLOW 1000
71 /** number of messages before wakeup of thread */
72 #define DTIO_MSG_FOR_WAKEUP 32
73 
74 /** maximum length of received frame */
75 #define DTIO_RECV_FRAME_MAX_LEN 1000
76 
77 struct stop_flush_info;
78 /** DTIO command channel commands */
79 enum {
80 	/** DTIO command channel stop */
81 	DTIO_COMMAND_STOP = 0,
82 	/** DTIO command channel wakeup */
83 	DTIO_COMMAND_WAKEUP = 1
84 } dtio_channel_command;
85 
86 /** open the output channel */
87 static void dtio_open_output(struct dt_io_thread* dtio);
88 /** add output event for read and write */
89 static int dtio_add_output_event_write(struct dt_io_thread* dtio);
90 /** start reconnection attempts */
91 static void dtio_reconnect_enable(struct dt_io_thread* dtio);
92 /** stop from stop_flush event loop */
93 static void dtio_stop_flush_exit(struct stop_flush_info* info);
94 /** setup a start control message */
95 static int dtio_control_start_send(struct dt_io_thread* dtio);
96 #ifdef HAVE_SSL
97 /** enable briefly waiting for a read event, for SSL negotiation */
98 static int dtio_enable_brief_read(struct dt_io_thread* dtio);
99 /** enable briefly waiting for a write event, for SSL negotiation */
100 static int dtio_enable_brief_write(struct dt_io_thread* dtio);
101 #endif
102 
103 struct dt_msg_queue*
104 dt_msg_queue_create(struct comm_base* base)
105 {
106 	struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
107 	if(!mq) return NULL;
108 	mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
109 		about 1 M should contain 64K messages with some overhead,
110 		or a whole bunch smaller ones */
111 	mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112 	if(!mq->wakeup_timer) {
113 		free(mq);
114 		return NULL;
115 	}
116 	lock_basic_init(&mq->lock);
117 	lock_protect(&mq->lock, mq, sizeof(*mq));
118 	return mq;
119 }
120 
121 /** clear the message list, caller must hold the lock */
122 static void
123 dt_msg_queue_clear(struct dt_msg_queue* mq)
124 {
125 	struct dt_msg_entry* e = mq->first, *next=NULL;
126 	while(e) {
127 		next = e->next;
128 		free(e->buf);
129 		free(e);
130 		e = next;
131 	}
132 	mq->first = NULL;
133 	mq->last = NULL;
134 	mq->cursize = 0;
135 	mq->msgcount = 0;
136 }
137 
138 void
139 dt_msg_queue_delete(struct dt_msg_queue* mq)
140 {
141 	if(!mq) return;
142 	lock_basic_destroy(&mq->lock);
143 	dt_msg_queue_clear(mq);
144 	comm_timer_delete(mq->wakeup_timer);
145 	free(mq);
146 }
147 
148 /** make the dtio wake up by sending a wakeup command */
149 static void dtio_wakeup(struct dt_io_thread* dtio)
150 {
151 	uint8_t cmd = DTIO_COMMAND_WAKEUP;
152 	if(!dtio) return;
153 	if(!dtio->started) return;
154 
155 	while(1) {
156 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
157 		if(r == -1) {
158 #ifndef USE_WINSOCK
159 			if(errno == EINTR || errno == EAGAIN)
160 				continue;
161 #else
162 			if(WSAGetLastError() == WSAEINPROGRESS)
163 				continue;
164 			if(WSAGetLastError() == WSAEWOULDBLOCK)
165 				continue;
166 #endif
167 			log_err("dnstap io wakeup: write: %s",
168 				sock_strerror(errno));
169 			break;
170 		}
171 		break;
172 	}
173 }
174 
175 void
176 mq_wakeup_cb(void* arg)
177 {
178 	struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179 	/* even if the dtio is already active, because perhaps much
180 	 * traffic suddenly, we leave the timer running to save on
181 	 * managing it, the once a second timer is less work then
182 	 * starting and stopping the timer frequently */
183 	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
184 	mq->dtio->wakeup_timer_enabled = 0;
185 	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
186 	dtio_wakeup(mq->dtio);
187 }
188 
189 /** start timer to wakeup dtio because there is content in the queue */
190 static void
191 dt_msg_queue_start_timer(struct dt_msg_queue* mq)
192 {
193 	struct timeval tv;
194 	/* Start a timer to process messages to be logged.
195 	 * If we woke up the dtio thread for every message, the wakeup
196 	 * messages take up too much processing power.  If the queue
197 	 * fills up the wakeup happens immediately.  The timer wakes it up
198 	 * if there are infrequent messages to log. */
199 
200 	/* we cannot start a timer in dtio thread, because it is a different
201 	 * thread and its event base is in use by the other thread, it would
202 	 * give race conditions if we tried to modify its event base,
203 	 * and locks would wait until it woke up, and this is what we do. */
204 
205 	/* do not start the timer if a timer already exists, perhaps
206 	 * in another worker.  So this variable is protected by a lock in
207 	 * dtio */
208 	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
209 	if(mq->dtio->wakeup_timer_enabled) {
210 		lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
211 		return;
212 	}
213 	mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
214 	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
215 
216 	/* start the timer, in mq, in the event base of our worker */
217 	tv.tv_sec = 1;
218 	tv.tv_usec = 0;
219 	comm_timer_set(mq->wakeup_timer, &tv);
220 }
221 
222 void
223 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
224 {
225 	int wakeupnow = 0, wakeupstarttimer = 0;
226 	struct dt_msg_entry* entry;
227 
228 	/* check conditions */
229 	if(!buf) return;
230 	if(len == 0) {
231 		/* it is not possible to log entries with zero length,
232 		 * because the framestream protocol does not carry it.
233 		 * However the protobuf serialization does not create zero
234 		 * length datagrams for dnstap, so this should not happen. */
235 		free(buf);
236 		return;
237 	}
238 	if(!mq) {
239 		free(buf);
240 		return;
241 	}
242 
243 	/* allocate memory for queue entry */
244 	entry = malloc(sizeof(*entry));
245 	if(!entry) {
246 		log_err("out of memory logging dnstap");
247 		free(buf);
248 		return;
249 	}
250 	entry->next = NULL;
251 	entry->buf = buf;
252 	entry->len = len;
253 
254 	/* acquire lock */
255 	lock_basic_lock(&mq->lock);
256 	/* if list was empty, start timer for (eventual) wakeup */
257 	if(mq->first == NULL)
258 		wakeupstarttimer = 1;
259 	/* if list contains more than wakeupnum elements, wakeup now,
260 	 * or if list is (going to be) almost full */
261 	if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
262 		(mq->cursize < mq->maxsize * 9 / 10 &&
263 		mq->cursize+len >= mq->maxsize * 9 / 10))
264 		wakeupnow = 1;
265 	/* see if it is going to fit */
266 	if(mq->cursize + len > mq->maxsize) {
267 		/* buffer full, or congested. */
268 		/* drop */
269 		lock_basic_unlock(&mq->lock);
270 		free(buf);
271 		free(entry);
272 		return;
273 	}
274 	mq->cursize += len;
275 	mq->msgcount ++;
276 	/* append to list */
277 	if(mq->last) {
278 		mq->last->next = entry;
279 	} else {
280 		mq->first = entry;
281 	}
282 	mq->last = entry;
283 	/* release lock */
284 	lock_basic_unlock(&mq->lock);
285 
286 	if(wakeupnow) {
287 		dtio_wakeup(mq->dtio);
288 	} else if(wakeupstarttimer) {
289 		dt_msg_queue_start_timer(mq);
290 	}
291 }
292 
293 struct dt_io_thread* dt_io_thread_create(void)
294 {
295 	struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
296 	lock_basic_init(&dtio->wakeup_timer_lock);
297 	lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
298 		sizeof(dtio->wakeup_timer_enabled));
299 	return dtio;
300 }
301 
302 void dt_io_thread_delete(struct dt_io_thread* dtio)
303 {
304 	struct dt_io_list_item* item, *nextitem;
305 	if(!dtio) return;
306 	lock_basic_destroy(&dtio->wakeup_timer_lock);
307 	item=dtio->io_list;
308 	while(item) {
309 		nextitem = item->next;
310 		free(item);
311 		item = nextitem;
312 	}
313 	free(dtio->socket_path);
314 	free(dtio->ip_str);
315 	free(dtio->tls_server_name);
316 	free(dtio->client_key_file);
317 	free(dtio->client_cert_file);
318 	if(dtio->ssl_ctx) {
319 #ifdef HAVE_SSL
320 		SSL_CTX_free(dtio->ssl_ctx);
321 #endif
322 	}
323 	free(dtio);
324 }
325 
326 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
327 {
328 	if(!cfg->dnstap) {
329 		log_warn("cannot setup dnstap because dnstap-enable is no");
330 		return 0;
331 	}
332 
333 	/* what type of connectivity do we have */
334 	if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
335 		if(cfg->dnstap_tls)
336 			dtio->upstream_is_tls = 1;
337 		else	dtio->upstream_is_tcp = 1;
338 	} else {
339 		dtio->upstream_is_unix = 1;
340 	}
341 	dtio->is_bidirectional = cfg->dnstap_bidirectional;
342 
343 	if(dtio->upstream_is_unix) {
344 		char* nm;
345 		if(!cfg->dnstap_socket_path ||
346 			cfg->dnstap_socket_path[0]==0) {
347 			log_err("dnstap setup: no dnstap-socket-path for "
348 				"socket connect");
349 			return 0;
350 		}
351 		nm = cfg->dnstap_socket_path;
352 		if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm,
353 			cfg->chrootdir, strlen(cfg->chrootdir)) == 0)
354 			nm += strlen(cfg->chrootdir);
355 		free(dtio->socket_path);
356 		dtio->socket_path = strdup(nm);
357 		if(!dtio->socket_path) {
358 			log_err("dnstap setup: malloc failure");
359 			return 0;
360 		}
361 	}
362 
363 	if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
364 		if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
365 			log_err("dnstap setup: no dnstap-ip for TCP connect");
366 			return 0;
367 		}
368 		free(dtio->ip_str);
369 		dtio->ip_str = strdup(cfg->dnstap_ip);
370 		if(!dtio->ip_str) {
371 			log_err("dnstap setup: malloc failure");
372 			return 0;
373 		}
374 	}
375 
376 	if(dtio->upstream_is_tls) {
377 #ifdef HAVE_SSL
378 		if(cfg->dnstap_tls_server_name &&
379 			cfg->dnstap_tls_server_name[0]) {
380 			free(dtio->tls_server_name);
381 			dtio->tls_server_name = strdup(
382 				cfg->dnstap_tls_server_name);
383 			if(!dtio->tls_server_name) {
384 				log_err("dnstap setup: malloc failure");
385 				return 0;
386 			}
387 			if(!check_auth_name_for_ssl(dtio->tls_server_name))
388 				return 0;
389 		}
390 		if(cfg->dnstap_tls_client_key_file &&
391 			cfg->dnstap_tls_client_key_file[0]) {
392 			dtio->use_client_certs = 1;
393 			free(dtio->client_key_file);
394 			dtio->client_key_file = strdup(
395 				cfg->dnstap_tls_client_key_file);
396 			if(!dtio->client_key_file) {
397 				log_err("dnstap setup: malloc failure");
398 				return 0;
399 			}
400 			if(!cfg->dnstap_tls_client_cert_file ||
401 				cfg->dnstap_tls_client_cert_file[0]==0) {
402 				log_err("dnstap setup: client key "
403 					"authentication enabled with "
404 					"dnstap-tls-client-key-file, but "
405 					"no dnstap-tls-client-cert-file "
406 					"is given");
407 				return 0;
408 			}
409 			free(dtio->client_cert_file);
410 			dtio->client_cert_file = strdup(
411 				cfg->dnstap_tls_client_cert_file);
412 			if(!dtio->client_cert_file) {
413 				log_err("dnstap setup: malloc failure");
414 				return 0;
415 			}
416 		} else {
417 			dtio->use_client_certs = 0;
418 			dtio->client_key_file = NULL;
419 			dtio->client_cert_file = NULL;
420 		}
421 
422 		if(cfg->dnstap_tls_cert_bundle) {
423 			dtio->ssl_ctx = connect_sslctx_create(
424 				dtio->client_key_file,
425 				dtio->client_cert_file,
426 				cfg->dnstap_tls_cert_bundle, 0);
427 		} else {
428 			dtio->ssl_ctx = connect_sslctx_create(
429 				dtio->client_key_file,
430 				dtio->client_cert_file,
431 				cfg->tls_cert_bundle, cfg->tls_win_cert);
432 		}
433 		if(!dtio->ssl_ctx) {
434 			log_err("could not setup SSL CTX");
435 			return 0;
436 		}
437 		dtio->tls_use_sni = cfg->tls_use_sni;
438 #endif /* HAVE_SSL */
439 	}
440 	return 1;
441 }
442 
443 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
444         struct dt_msg_queue* mq)
445 {
446 	struct dt_io_list_item* item = malloc(sizeof(*item));
447 	if(!item) return 0;
448 	lock_basic_lock(&mq->lock);
449 	mq->dtio = dtio;
450 	lock_basic_unlock(&mq->lock);
451 	item->queue = mq;
452 	item->next = dtio->io_list;
453 	dtio->io_list = item;
454 	dtio->io_list_iter = NULL;
455 	return 1;
456 }
457 
458 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
459         struct dt_msg_queue* mq)
460 {
461 	struct dt_io_list_item* item, *prev=NULL;
462 	if(!dtio) return;
463 	item = dtio->io_list;
464 	while(item) {
465 		if(item->queue == mq) {
466 			/* found it */
467 			if(prev) prev->next = item->next;
468 			else dtio->io_list = item->next;
469 			/* the queue itself only registered, not deleted */
470 			lock_basic_lock(&item->queue->lock);
471 			item->queue->dtio = NULL;
472 			lock_basic_unlock(&item->queue->lock);
473 			free(item);
474 			dtio->io_list_iter = NULL;
475 			return;
476 		}
477 		prev = item;
478 		item = item->next;
479 	}
480 }
481 
482 /** pick a message from the queue, the routine locks and unlocks,
483  * returns true if there is a message */
484 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
485 	size_t* len)
486 {
487 	lock_basic_lock(&mq->lock);
488 	if(mq->first) {
489 		struct dt_msg_entry* entry = mq->first;
490 		mq->first = entry->next;
491 		if(!entry->next) mq->last = NULL;
492 		mq->cursize -= entry->len;
493 		mq->msgcount --;
494 		lock_basic_unlock(&mq->lock);
495 
496 		*buf = entry->buf;
497 		*len = entry->len;
498 		free(entry);
499 		return 1;
500 	}
501 	lock_basic_unlock(&mq->lock);
502 	return 0;
503 }
504 
505 /** find message in queue, false if no message, true if message to send */
506 static int dtio_find_in_queue(struct dt_io_thread* dtio,
507 	struct dt_msg_queue* mq)
508 {
509 	void* buf=NULL;
510 	size_t len=0;
511 	if(dt_msg_queue_pop(mq, &buf, &len)) {
512 		dtio->cur_msg = buf;
513 		dtio->cur_msg_len = len;
514 		dtio->cur_msg_done = 0;
515 		dtio->cur_msg_len_done = 0;
516 		return 1;
517 	}
518 	return 0;
519 }
520 
521 /** find a new message to write, search message queues, false if none */
522 static int dtio_find_msg(struct dt_io_thread* dtio)
523 {
524 	struct dt_io_list_item *spot, *item;
525 
526 	spot = dtio->io_list_iter;
527 	/* use the next queue for the next message lookup,
528 	 * if we hit the end(NULL) the NULL restarts the iter at start. */
529 	if(spot)
530 		dtio->io_list_iter = spot->next;
531 	else if(dtio->io_list)
532 		dtio->io_list_iter = dtio->io_list->next;
533 
534 	/* scan from spot to end-of-io_list */
535 	item = spot;
536 	while(item) {
537 		if(dtio_find_in_queue(dtio, item->queue))
538 			return 1;
539 		item = item->next;
540 	}
541 	/* scan starting at the start-of-list (to wrap around the end) */
542 	item = dtio->io_list;
543 	while(item) {
544 		if(dtio_find_in_queue(dtio, item->queue))
545 			return 1;
546 		item = item->next;
547 	}
548 	return 0;
549 }
550 
551 /** callback for the dnstap reconnect, to start reconnecting to output */
552 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
553 	short ATTR_UNUSED(bits), void* arg)
554 {
555 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
556 	dtio->reconnect_is_added = 0;
557 	verbose(VERB_ALGO, "dnstap io: reconnect timer");
558 
559 	dtio_open_output(dtio);
560 	if(dtio->event) {
561 		if(!dtio_add_output_event_write(dtio))
562 			return;
563 		/* nothing wrong so far, wait on the output event */
564 		return;
565 	}
566 	/* exponential backoff and retry on timer */
567 	dtio_reconnect_enable(dtio);
568 }
569 
570 /** attempt to reconnect to the output, after a timeout */
571 static void dtio_reconnect_enable(struct dt_io_thread* dtio)
572 {
573 	struct timeval tv;
574 	int msec;
575 	if(dtio->want_to_exit) return;
576 	if(dtio->reconnect_is_added)
577 		return; /* already done */
578 
579 	/* exponential backoff, store the value for next timeout */
580 	msec = dtio->reconnect_timeout;
581 	if(msec == 0) {
582 		dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
583 	} else {
584 		dtio->reconnect_timeout = msec*2;
585 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
586 			dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
587 	}
588 	verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
589 		msec);
590 
591 	/* setup wait timer */
592 	memset(&tv, 0, sizeof(tv));
593 	tv.tv_sec = msec/1000;
594 	tv.tv_usec = (msec%1000)*1000;
595 	if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
596 		&dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
597 		log_err("dnstap io: could not reconnect ev timer add");
598 		return;
599 	}
600 	dtio->reconnect_is_added = 1;
601 }
602 
603 /** remove dtio reconnect timer */
604 static void dtio_reconnect_del(struct dt_io_thread* dtio)
605 {
606 	if(!dtio->reconnect_is_added)
607 		return;
608 	ub_timer_del(dtio->reconnect_timer);
609 	dtio->reconnect_is_added = 0;
610 }
611 
612 /** clear the reconnect exponential backoff timer.
613  * We have successfully connected so we can try again with short timeouts. */
614 static void dtio_reconnect_clear(struct dt_io_thread* dtio)
615 {
616 	dtio->reconnect_timeout = 0;
617 	dtio_reconnect_del(dtio);
618 }
619 
620 /** reconnect slowly, because we already know we have to wait for a bit */
621 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
622 {
623 	dtio_reconnect_del(dtio);
624 	dtio->reconnect_timeout = msec;
625 	dtio_reconnect_enable(dtio);
626 }
627 
628 /** delete the current message in the dtio, and reset counters */
629 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
630 {
631 	free(dtio->cur_msg);
632 	dtio->cur_msg = NULL;
633 	dtio->cur_msg_len = 0;
634 	dtio->cur_msg_done = 0;
635 	dtio->cur_msg_len_done = 0;
636 }
637 
638 /** delete the buffer and counters used to read frame */
639 static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
640 {
641 	if(rb->buf) {
642 		free(rb->buf);
643 		rb->buf = NULL;
644 	}
645 	rb->buf_count = 0;
646 	rb->buf_cap = 0;
647 	rb->frame_len = 0;
648 	rb->frame_len_done = 0;
649 	rb->control_frame = 0;
650 }
651 
652 /** del the output file descriptor event for listening */
653 static void dtio_del_output_event(struct dt_io_thread* dtio)
654 {
655 	if(!dtio->event_added)
656 		return;
657 	ub_event_del(dtio->event);
658 	dtio->event_added = 0;
659 	dtio->event_added_is_write = 0;
660 }
661 
662 /** close dtio socket and set it to -1 */
663 static void dtio_close_fd(struct dt_io_thread* dtio)
664 {
665 	sock_close(dtio->fd);
666 	dtio->fd = -1;
667 }
668 
669 /** close and stop the output file descriptor event */
670 static void dtio_close_output(struct dt_io_thread* dtio)
671 {
672 	if(!dtio->event)
673 		return;
674 	ub_event_free(dtio->event);
675 	dtio->event = NULL;
676 	if(dtio->ssl) {
677 #ifdef HAVE_SSL
678 		SSL_shutdown(dtio->ssl);
679 		SSL_free(dtio->ssl);
680 		dtio->ssl = NULL;
681 #endif
682 	}
683 	dtio_close_fd(dtio);
684 
685 	/* if there is a (partial) message, discard it
686 	 * we cannot send (the remainder of) it, and a new
687 	 * connection needs to start with a control frame. */
688 	if(dtio->cur_msg) {
689 		dtio_cur_msg_free(dtio);
690 	}
691 
692 	dtio->ready_frame_sent = 0;
693 	dtio->accept_frame_received = 0;
694 	dtio_read_frame_free(&dtio->read_frame);
695 
696 	dtio_reconnect_enable(dtio);
697 }
698 
699 /** check for pending nonblocking connect errors,
700  * returns 1 if it is okay. -1 on error (close it), 0 to try later */
701 static int dtio_check_nb_connect(struct dt_io_thread* dtio)
702 {
703 	int error = 0;
704 	socklen_t len = (socklen_t)sizeof(error);
705 	if(!dtio->check_nb_connect)
706 		return 1; /* everything okay */
707 	if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
708 		&len) < 0) {
709 #ifndef USE_WINSOCK
710 		error = errno; /* on solaris errno is error */
711 #else
712 		error = WSAGetLastError();
713 #endif
714 	}
715 #ifndef USE_WINSOCK
716 #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
717 	if(error == EINPROGRESS || error == EWOULDBLOCK)
718 		return 0; /* try again later */
719 #endif
720 #else
721 	if(error == WSAEINPROGRESS) {
722 		return 0; /* try again later */
723 	} else if(error == WSAEWOULDBLOCK) {
724 		ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
725 			dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
726 		return 0; /* try again later */
727 	}
728 #endif
729 	if(error != 0) {
730 		char* to = dtio->socket_path;
731 		if(!to) to = dtio->ip_str;
732 		if(!to) to = "";
733 		log_err("dnstap io: failed to connect to \"%s\": %s",
734 			to, sock_strerror(error));
735 		return -1; /* error, close it */
736 	}
737 
738 	if(dtio->ip_str)
739 		verbose(VERB_DETAIL, "dnstap io: connected to %s",
740 			dtio->ip_str);
741 	else if(dtio->socket_path)
742 		verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
743 			dtio->socket_path);
744 	dtio_reconnect_clear(dtio);
745 	dtio->check_nb_connect = 0;
746 	return 1; /* everything okay */
747 }
748 
749 #ifdef HAVE_SSL
750 /** write to ssl output
751  * returns number of bytes written, 0 if nothing happened,
752  * try again later, or -1 if the channel is to be closed. */
753 static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
754 	size_t len)
755 {
756 	int r;
757 	ERR_clear_error();
758 	r = SSL_write(dtio->ssl, buf, len);
759 	if(r <= 0) {
760 		int want = SSL_get_error(dtio->ssl, r);
761 		if(want == SSL_ERROR_ZERO_RETURN) {
762 			/* closed */
763 			return -1;
764 		} else if(want == SSL_ERROR_WANT_READ) {
765 			/* we want a brief read event */
766 			dtio_enable_brief_read(dtio);
767 			return 0;
768 		} else if(want == SSL_ERROR_WANT_WRITE) {
769 			/* write again later */
770 			return 0;
771 		} else if(want == SSL_ERROR_SYSCALL) {
772 #ifdef EPIPE
773 			if(errno == EPIPE && verbosity < 2)
774 				return -1; /* silence 'broken pipe' */
775 #endif
776 #ifdef ECONNRESET
777 			if(errno == ECONNRESET && verbosity < 2)
778 				return -1; /* silence reset by peer */
779 #endif
780 			if(errno != 0) {
781 				log_err("dnstap io, SSL_write syscall: %s",
782 					strerror(errno));
783 			}
784 			return -1;
785 		}
786 		log_crypto_err("dnstap io, could not SSL_write");
787 		return -1;
788 	}
789 	return r;
790 }
791 #endif /* HAVE_SSL */
792 
793 /** write buffer to output.
794  * returns number of bytes written, 0 if nothing happened,
795  * try again later, or -1 if the channel is to be closed. */
796 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
797 	size_t len)
798 {
799 	ssize_t ret;
800 	if(dtio->fd == -1)
801 		return -1;
802 #ifdef HAVE_SSL
803 	if(dtio->ssl)
804 		return dtio_write_ssl(dtio, buf, len);
805 #endif
806 	ret = send(dtio->fd, (void*)buf, len, 0);
807 	if(ret == -1) {
808 #ifndef USE_WINSOCK
809 		if(errno == EINTR || errno == EAGAIN)
810 			return 0;
811 #else
812 		if(WSAGetLastError() == WSAEINPROGRESS)
813 			return 0;
814 		if(WSAGetLastError() == WSAEWOULDBLOCK) {
815 			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
816 				dtio->stop_flush_event:dtio->event),
817 				UB_EV_WRITE);
818 			return 0;
819 		}
820 #endif
821 		log_err("dnstap io: failed send: %s", sock_strerror(errno));
822 		return -1;
823 	}
824 	return ret;
825 }
826 
827 #ifdef HAVE_WRITEV
828 /** write with writev, len and message, in one write, if possible.
829  * return true if message is done, false if incomplete */
830 static int dtio_write_with_writev(struct dt_io_thread* dtio)
831 {
832 	uint32_t sendlen = htonl(dtio->cur_msg_len);
833 	struct iovec iov[2];
834 	ssize_t r;
835 	iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
836 	iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
837 	iov[1].iov_base = dtio->cur_msg;
838 	iov[1].iov_len = dtio->cur_msg_len;
839 	log_assert(iov[0].iov_len > 0);
840 	r = writev(dtio->fd, iov, 2);
841 	if(r == -1) {
842 #ifndef USE_WINSOCK
843 		if(errno == EINTR || errno == EAGAIN)
844 			return 0;
845 #else
846 		if(WSAGetLastError() == WSAEINPROGRESS)
847 			return 0;
848 		if(WSAGetLastError() == WSAEWOULDBLOCK) {
849 			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
850 				dtio->stop_flush_event:dtio->event),
851 				UB_EV_WRITE);
852 			return 0;
853 		}
854 #endif
855 		log_err("dnstap io: failed writev: %s", sock_strerror(errno));
856 		/* close the channel */
857 		dtio_del_output_event(dtio);
858 		dtio_close_output(dtio);
859 		return 0;
860 	}
861 	/* written r bytes */
862 	dtio->cur_msg_len_done += r;
863 	if(dtio->cur_msg_len_done < 4)
864 		return 0;
865 	if(dtio->cur_msg_len_done > 4) {
866 		dtio->cur_msg_done = dtio->cur_msg_len_done-4;
867 		dtio->cur_msg_len_done = 4;
868 	}
869 	if(dtio->cur_msg_done < dtio->cur_msg_len)
870 		return 0;
871 	return 1;
872 }
873 #endif /* HAVE_WRITEV */
874 
875 /** write more of the length, preceding the data frame.
876  * return true if message is done, false if incomplete. */
877 static int dtio_write_more_of_len(struct dt_io_thread* dtio)
878 {
879 	uint32_t sendlen;
880 	int r;
881 	if(dtio->cur_msg_len_done >= 4)
882 		return 1;
883 #ifdef HAVE_WRITEV
884 	if(!dtio->ssl) {
885 		/* we try writev for everything.*/
886 		return dtio_write_with_writev(dtio);
887 	}
888 #endif /* HAVE_WRITEV */
889 	sendlen = htonl(dtio->cur_msg_len);
890 	r = dtio_write_buf(dtio,
891 		((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
892 		sizeof(sendlen)-dtio->cur_msg_len_done);
893 	if(r == -1) {
894 		/* close the channel */
895 		dtio_del_output_event(dtio);
896 		dtio_close_output(dtio);
897 		return 0;
898 	} else if(r == 0) {
899 		/* try again later */
900 		return 0;
901 	}
902 	dtio->cur_msg_len_done += r;
903 	if(dtio->cur_msg_len_done < 4)
904 		return 0;
905 	return 1;
906 }
907 
908 /** write more of the data frame.
909  * return true if message is done, false if incomplete. */
910 static int dtio_write_more_of_data(struct dt_io_thread* dtio)
911 {
912 	int r;
913 	if(dtio->cur_msg_done >= dtio->cur_msg_len)
914 		return 1;
915 	r = dtio_write_buf(dtio,
916 		((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
917 		dtio->cur_msg_len - dtio->cur_msg_done);
918 	if(r == -1) {
919 		/* close the channel */
920 		dtio_del_output_event(dtio);
921 		dtio_close_output(dtio);
922 		return 0;
923 	} else if(r == 0) {
924 		/* try again later */
925 		return 0;
926 	}
927 	dtio->cur_msg_done += r;
928 	if(dtio->cur_msg_done < dtio->cur_msg_len)
929 		return 0;
930 	return 1;
931 }
932 
933 /** write more of the current message. false if incomplete, true if
934  * the message is done */
935 static int dtio_write_more(struct dt_io_thread* dtio)
936 {
937 	if(dtio->cur_msg_len_done < 4) {
938 		if(!dtio_write_more_of_len(dtio))
939 			return 0;
940 	}
941 	if(dtio->cur_msg_done < dtio->cur_msg_len) {
942 		if(!dtio_write_more_of_data(dtio))
943 			return 0;
944 	}
945 	return 1;
946 }
947 
948 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
949  * -1: continue, >0: number of bytes read into buffer */
950 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
951 	ssize_t r;
952 	r = recv(dtio->fd, (void*)buf, len, 0);
953 	if(r == -1) {
954 		char* to = dtio->socket_path;
955 		if(!to) to = dtio->ip_str;
956 		if(!to) to = "";
957 #ifndef USE_WINSOCK
958 		if(errno == EINTR || errno == EAGAIN)
959 			return -1; /* try later */
960 #else
961 		if(WSAGetLastError() == WSAEINPROGRESS) {
962 			return -1; /* try later */
963 		} else if(WSAGetLastError() == WSAEWOULDBLOCK) {
964 			ub_winsock_tcp_wouldblock(
965 				(dtio->stop_flush_event?
966 				dtio->stop_flush_event:dtio->event),
967 				UB_EV_READ);
968 			return -1; /* try later */
969 		}
970 #endif
971 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
972 			verbosity < 4)
973 			return 0; /* no log retries on low verbosity */
974 		log_err("dnstap io: output closed, recv %s: %s", to,
975 			strerror(errno));
976 		/* and close below */
977 		return 0;
978 	}
979 	if(r == 0) {
980 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
981 			verbosity < 4)
982 			return 0; /* no log retries on low verbosity */
983 		verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
984 		/* and close below */
985 		return 0;
986 	}
987 	/* something was received */
988 	return r;
989 }
990 
991 #ifdef HAVE_SSL
992 /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed,
993  * -1: continue, >0: number of bytes read into buffer */
994 static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
995 {
996 	int r;
997 	ERR_clear_error();
998 	r = SSL_read(dtio->ssl, buf, len);
999 	if(r <= 0) {
1000 		int want = SSL_get_error(dtio->ssl, r);
1001 		if(want == SSL_ERROR_ZERO_RETURN) {
1002 			if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1003 				verbosity < 4)
1004 				return 0; /* no log retries on low verbosity */
1005 			verbose(VERB_DETAIL, "dnstap io: output closed by the "
1006 				"other side");
1007 			return 0;
1008 		} else if(want == SSL_ERROR_WANT_READ) {
1009 			/* continue later */
1010 			return -1;
1011 		} else if(want == SSL_ERROR_WANT_WRITE) {
1012 			(void)dtio_enable_brief_write(dtio);
1013 			return -1;
1014 		} else if(want == SSL_ERROR_SYSCALL) {
1015 #ifdef ECONNRESET
1016 			if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1017 				errno == ECONNRESET && verbosity < 4)
1018 				return 0; /* silence reset by peer */
1019 #endif
1020 			if(errno != 0)
1021 				log_err("SSL_read syscall: %s",
1022 					strerror(errno));
1023 			verbose(VERB_DETAIL, "dnstap io: output closed by the "
1024 				"other side");
1025 			return 0;
1026 		}
1027 		log_crypto_err("could not SSL_read");
1028 		verbose(VERB_DETAIL, "dnstap io: output closed by the "
1029 				"other side");
1030 		return 0;
1031 	}
1032 	return r;
1033 }
1034 #endif /* HAVE_SSL */
1035 
1036 /** check if the output fd has been closed,
1037  * it returns false if the stream is closed. */
1038 static int dtio_check_close(struct dt_io_thread* dtio)
1039 {
1040 	/* we don't want to read any packets, but if there are we can
1041 	 * discard the input (ignore it).  Ignore of unknown (control)
1042 	 * packets is okay for the framestream protocol.  And also, the
1043 	 * read call can return that the stream has been closed by the
1044 	 * other side. */
1045 	uint8_t buf[1024];
1046 	int r = -1;
1047 
1048 
1049 	if(dtio->fd == -1) return 0;
1050 
1051 	while(r != 0) {
1052 		/* not interested in buffer content, overwrite */
1053 		r = receive_bytes(dtio, (void*)buf, sizeof(buf));
1054 		if(r == -1)
1055 			return 1;
1056 	}
1057 	/* the other end has been closed */
1058 	/* close the channel */
1059 	dtio_del_output_event(dtio);
1060 	dtio_close_output(dtio);
1061 	return 0;
1062 }
1063 
1064 /** Read accept frame. Returns -1: continue reading, 0: closed,
1065  * 1: valid accept received. */
1066 static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1067 {
1068 	int r;
1069 	size_t read_frame_done;
1070 	while(dtio->read_frame.frame_len_done < 4) {
1071 #ifdef HAVE_SSL
1072 		if(dtio->ssl) {
1073 			r = ssl_read_bytes(dtio,
1074 				(uint8_t*)&dtio->read_frame.frame_len+
1075 				dtio->read_frame.frame_len_done,
1076 				4-dtio->read_frame.frame_len_done);
1077 		} else {
1078 #endif
1079 			r = receive_bytes(dtio,
1080 				(uint8_t*)&dtio->read_frame.frame_len+
1081 				dtio->read_frame.frame_len_done,
1082 				4-dtio->read_frame.frame_len_done);
1083 #ifdef HAVE_SSL
1084 		}
1085 #endif
1086 		if(r == -1)
1087 			return -1; /* continue reading */
1088 		if(r == 0) {
1089 			 /* connection closed */
1090 			goto close_connection;
1091 		}
1092 		dtio->read_frame.frame_len_done += r;
1093 		if(dtio->read_frame.frame_len_done < 4)
1094 			return -1; /* continue reading */
1095 
1096 		if(dtio->read_frame.frame_len == 0) {
1097 			dtio->read_frame.frame_len_done = 0;
1098 			dtio->read_frame.control_frame = 1;
1099 			continue;
1100 		}
1101 		dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1102 		if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1103 			verbose(VERB_OPS, "dnstap: received frame exceeds max "
1104 				"length of %d bytes, closing connection",
1105 				DTIO_RECV_FRAME_MAX_LEN);
1106 			goto close_connection;
1107 		}
1108 		dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1109 		dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1110 		if(!dtio->read_frame.buf) {
1111 			log_err("dnstap io: out of memory (creating read "
1112 				"buffer)");
1113 			goto close_connection;
1114 		}
1115 	}
1116 	if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1117 #ifdef HAVE_SSL
1118 		if(dtio->ssl) {
1119 			r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1120 				dtio->read_frame.buf_count,
1121 				dtio->read_frame.buf_cap-
1122 				dtio->read_frame.buf_count);
1123 		} else {
1124 #endif
1125 			r = receive_bytes(dtio, dtio->read_frame.buf+
1126 				dtio->read_frame.buf_count,
1127 				dtio->read_frame.buf_cap-
1128 				dtio->read_frame.buf_count);
1129 #ifdef HAVE_SSL
1130 		}
1131 #endif
1132 		if(r == -1)
1133 			return -1; /* continue reading */
1134 		if(r == 0) {
1135 			 /* connection closed */
1136 			goto close_connection;
1137 		}
1138 		dtio->read_frame.buf_count += r;
1139 		if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1140 			return -1; /* continue reading */
1141 	}
1142 
1143 	/* Complete frame received, check if this is a valid ACCEPT control
1144 	 * frame. */
1145 	if(dtio->read_frame.frame_len < 4) {
1146 		verbose(VERB_OPS, "dnstap: invalid data received");
1147 		goto close_connection;
1148 	}
1149 	if(sldns_read_uint32(dtio->read_frame.buf) !=
1150 		FSTRM_CONTROL_FRAME_ACCEPT) {
1151 		verbose(VERB_ALGO, "dnstap: invalid control type received, "
1152 			"ignored");
1153 		dtio->ready_frame_sent = 0;
1154 		dtio->accept_frame_received = 0;
1155 		dtio_read_frame_free(&dtio->read_frame);
1156 		return -1;
1157 	}
1158 	read_frame_done = 4; /* control frame type */
1159 
1160 	/* Iterate over control fields, ignore unknown types.
1161 	 * Need to be able to read at least 8 bytes (control field type +
1162 	 * length). */
1163 	while(read_frame_done+8 < dtio->read_frame.frame_len) {
1164 		uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1165 			read_frame_done);
1166 		uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1167 			read_frame_done + 4);
1168 		if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1169 			if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1170 				read_frame_done+8+len <=
1171 				dtio->read_frame.frame_len &&
1172 				memcmp(dtio->read_frame.buf + read_frame_done +
1173 					+ 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1174 				if(!dtio_control_start_send(dtio)) {
1175 					verbose(VERB_OPS, "dnstap io: out of "
1176 					 "memory while sending START frame");
1177 					goto close_connection;
1178 				}
1179 				dtio->accept_frame_received = 1;
1180 				if(!dtio_add_output_event_write(dtio))
1181 					goto close_connection;
1182 				return 1;
1183 			} else {
1184 				/* unknown content type */
1185 				verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1186 					"contains unknown content type, "
1187 					"closing connection");
1188 				goto close_connection;
1189 			}
1190 		}
1191 		/* unknown option, try next */
1192 		read_frame_done += 8+len;
1193 	}
1194 
1195 
1196 close_connection:
1197 	dtio_del_output_event(dtio);
1198 	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1199 	dtio_close_output(dtio);
1200 	return 0;
1201 }
1202 
1203 /** add the output file descriptor event for listening, read only */
1204 static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1205 {
1206 	if(!dtio->event)
1207 		return 0;
1208 	if(dtio->event_added && !dtio->event_added_is_write)
1209 		return 1;
1210 	/* we have to (re-)register the event */
1211 	if(dtio->event_added)
1212 		ub_event_del(dtio->event);
1213 	ub_event_del_bits(dtio->event, UB_EV_WRITE);
1214 	if(ub_event_add(dtio->event, NULL) != 0) {
1215 		log_err("dnstap io: out of memory (adding event)");
1216 		dtio->event_added = 0;
1217 		dtio->event_added_is_write = 0;
1218 		/* close output and start reattempts to open it */
1219 		dtio_close_output(dtio);
1220 		return 0;
1221 	}
1222 	dtio->event_added = 1;
1223 	dtio->event_added_is_write = 0;
1224 	return 1;
1225 }
1226 
1227 /** add the output file descriptor event for listening, read and write */
1228 static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1229 {
1230 	if(!dtio->event)
1231 		return 0;
1232 	if(dtio->event_added && dtio->event_added_is_write)
1233 		return 1;
1234 	/* we have to (re-)register the event */
1235 	if(dtio->event_added)
1236 		ub_event_del(dtio->event);
1237 	ub_event_add_bits(dtio->event, UB_EV_WRITE);
1238 	if(ub_event_add(dtio->event, NULL) != 0) {
1239 		log_err("dnstap io: out of memory (adding event)");
1240 		dtio->event_added = 0;
1241 		dtio->event_added_is_write = 0;
1242 		/* close output and start reattempts to open it */
1243 		dtio_close_output(dtio);
1244 		return 0;
1245 	}
1246 	dtio->event_added = 1;
1247 	dtio->event_added_is_write = 1;
1248 	return 1;
1249 }
1250 
/** put the dtio thread to sleep.
 * Nothing is queued for writing, so stop polling for writability and only
 * keep polling for reads, which are still needed to see the ACCEPT frame
 * or a close by the other side.  A failure is already handled, and
 * logged, by dtio_add_output_event_read, so the result is not used. */
static void dtio_sleep(struct dt_io_thread* dtio)
{
	if(!dtio_add_output_event_read(dtio))
		return;
}
1258 
#ifdef HAVE_SSL
/** Enter the TLS brief-read state: TLS has to read before it can go on,
 * so stop polling for writes.  During a stop flush the stop flush event
 * is adjusted; otherwise the normal output event is made read-only.
 * @return 1 on success, 0 if the event could not be (re-)added. */
static int dtio_enable_brief_read(struct dt_io_thread* dtio)
{
	struct ub_event* flush_ev = dtio->stop_flush_event;

	dtio->ssl_brief_read = 1;
	if(!flush_ev)
		return dtio_add_output_event_read(dtio);

	ub_event_del(flush_ev);
	ub_event_del_bits(flush_ev, UB_EV_WRITE);
	if(ub_event_add(flush_ev, NULL) != 0) {
		log_err("dnstap io, stop flush, could not ub_event_add");
		return 0;
	}
	return 1;
}
#endif /* HAVE_SSL */
1276 
#ifdef HAVE_SSL
/** Leave the TLS brief-read state: poll for writes again.  During a stop
 * flush the stop flush event gets its write bit back; otherwise the
 * normal output event is registered for reads and writes.
 * @return 1 on success, 0 if the event could not be (re-)added. */
static int dtio_disable_brief_read(struct dt_io_thread* dtio)
{
	struct ub_event* flush_ev = dtio->stop_flush_event;

	dtio->ssl_brief_read = 0;
	if(!flush_ev)
		return dtio_add_output_event_write(dtio);

	ub_event_del(flush_ev);
	ub_event_add_bits(flush_ev, UB_EV_WRITE);
	if(ub_event_add(flush_ev, NULL) != 0) {
		log_err("dnstap io, stop flush, could not ub_event_add");
		return 0;
	}
	return 1;
}
#endif /* HAVE_SSL */
1294 
#ifdef HAVE_SSL
/** Enter the TLS brief-write state: TLS has to write before a read can
 * progress, so the output event also polls for writability.
 * @return the result of dtio_add_output_event_write. */
static int dtio_enable_brief_write(struct dt_io_thread* dtio)
{
	int registered;
	dtio->ssl_brief_write = 1;
	registered = dtio_add_output_event_write(dtio);
	return registered;
}
#endif /* HAVE_SSL */
1303 
#ifdef HAVE_SSL
/** Leave the TLS brief-write state: the output event goes back to
 * polling for reads only.
 * @return the result of dtio_add_output_event_read. */
static int dtio_disable_brief_write(struct dt_io_thread* dtio)
{
	int registered;
	dtio->ssl_brief_write = 0;
	registered = dtio_add_output_event_read(dtio);
	return registered;
}
#endif /* HAVE_SSL */
1312 
#ifdef HAVE_SSL
/** Check the peer verification result after the TLS handshake.
 * When the SSL object does not request peer verification, the connection
 * is accepted unauthenticated.  Otherwise the verify result must be
 * X509_V_OK and the peer must have presented a certificate.
 * @return 1 if the connection may be used, 0 if it has to be closed. */
static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
{
	X509* cert;

	if(!(SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
		/* unauthenticated, the verify peer flag was not set
		 * in ssl when the ssl object was created from ssl_ctx */
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
			dtio->ip_str);
		return 1;
	}

	if(SSL_get_verify_result(dtio->ssl) != X509_V_OK) {
		/* log what the peer offered, then refuse */
		cert = SSL_get_peer_certificate(dtio->ssl);
		if(cert) {
			log_cert(VERB_ALGO, "dnstap io, peer certificate",
				cert);
			X509_free(cert);
		}
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection failed: "
			"failed to authenticate", dtio->ip_str);
		return 0;
	}

	cert = SSL_get_peer_certificate(dtio->ssl);
	if(!cert) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection failed "
			"no certificate", dtio->ip_str);
		return 0;
	}
	log_cert(VERB_ALGO, "dnstap io, peer certificate", cert);
#ifdef HAVE_SSL_GET0_PEERNAME
	if(SSL_get0_peername(dtio->ssl)) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection to %s "
			"authenticated", dtio->ip_str,
			SSL_get0_peername(dtio->ssl));
	} else
#endif
	{
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
			"authenticated", dtio->ip_str);
	}
	X509_free(cert);
	return 1;
}
#endif /* HAVE_SSL */
1365 
#ifdef HAVE_SSL
/** perform ssl handshake, returns 1 if okay, 0 to stop
 *
 * Called from the output event callbacks while the TLS handshake is not
 * done, or while a brief read is pending.
 * @param dtio: the io thread with dtio->ssl set.
 * @param info: the stop flush state when called during the stop flush,
 *	or NULL.  When given, the stop flush event loop is told to exit on
 *	failure.
 * @return 1 if the handshake is complete and the peer check passed, so
 *	the caller can continue.  0 if the caller must stop processing this
 *	event: the handshake needs more I/O (the function is called again
 *	on a later event), or the connection failed and has been closed with
 *	a slow reconnect scheduled. */
static int dtio_ssl_handshake(struct dt_io_thread* dtio,
	struct stop_flush_info* info)
{
	int r;
	if(dtio->ssl_brief_read) {
		/* assume the brief read condition is satisfied,
		 * if we need more or again, we can set it again */
		if(!dtio_disable_brief_read(dtio)) {
			if(info) dtio_stop_flush_exit(info);
			return 0;
		}
	}
	if(dtio->ssl_handshake_done)
		return 1;

	ERR_clear_error();
	r = SSL_do_handshake(dtio->ssl);
	if(r != 1) {
		int want = SSL_get_error(dtio->ssl, r);
		if(want == SSL_ERROR_WANT_READ) {
			/* we want to read on the connection */
			if(!dtio_enable_brief_read(dtio)) {
				if(info) dtio_stop_flush_exit(info);
				return 0;
			}
			return 0;
		} else if(want == SSL_ERROR_WANT_WRITE) {
			/* we want to write on the connection */
			/* NOTE(review): no event change here; presumably
			 * relies on the event still polling for writes */
			return 0;
		} else if(r == 0) {
			/* closed */
			if(info) dtio_stop_flush_exit(info);
			dtio_del_output_event(dtio);
			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
			dtio_close_output(dtio);
			return 0;
		} else if(want == SSL_ERROR_SYSCALL) {
			/* SYSCALL and errno==0 means closed uncleanly */
			int silent = 0;
#ifdef EPIPE
			if(errno == EPIPE && verbosity < 2)
				silent = 1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
			if(errno == ECONNRESET && verbosity < 2)
				silent = 1; /* silence reset by peer */
#endif
			if(errno == 0)
				silent = 1;
			if(!silent)
				log_err("dnstap io, SSL_handshake syscall: %s",
					strerror(errno));
			/* closed */
			if(info) dtio_stop_flush_exit(info);
			dtio_del_output_event(dtio);
			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
			dtio_close_output(dtio);
			return 0;
		} else {
			/* TLS protocol error; some expected handshake errors
			 * are not logged (squelch_err_ssl_handshake) */
			unsigned long err = ERR_get_error();
			if(!squelch_err_ssl_handshake(err)) {
				log_crypto_err_code("dnstap io, ssl handshake failed",
					err);
				verbose(VERB_OPS, "dnstap io, ssl handshake failed "
					"from %s", dtio->ip_str);
			}
			/* closed */
			if(info) dtio_stop_flush_exit(info);
			dtio_del_output_event(dtio);
			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
			dtio_close_output(dtio);
			return 0;
		}

	}
	/* check peer verification */
	dtio->ssl_handshake_done = 1;

	if(!dtio_ssl_check_peer(dtio)) {
		/* closed */
		if(info) dtio_stop_flush_exit(info);
		dtio_del_output_event(dtio);
		dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
		dtio_close_output(dtio);
		return 0;
	}
	return 1;
}
#endif /* HAVE_SSL */
1457 
1458 /** callback for the dnstap events, to write to the output */
1459 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1460 {
1461 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1462 	int i;
1463 
1464 	if(dtio->check_nb_connect) {
1465 		int connect_err = dtio_check_nb_connect(dtio);
1466 		if(connect_err == -1) {
1467 			/* close the channel */
1468 			dtio_del_output_event(dtio);
1469 			dtio_close_output(dtio);
1470 			return;
1471 		} else if(connect_err == 0) {
1472 			/* try again later */
1473 			return;
1474 		}
1475 		/* nonblocking connect check passed, continue */
1476 	}
1477 
1478 #ifdef HAVE_SSL
1479 	if(dtio->ssl &&
1480 		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1481 		if(!dtio_ssl_handshake(dtio, NULL))
1482 			return;
1483 	}
1484 #endif
1485 
1486 	if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1487 		if(dtio->ssl_brief_write)
1488 			(void)dtio_disable_brief_write(dtio);
1489 		if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1490 			if(dtio_read_accept_frame(dtio) <= 0)
1491 				return;
1492 		} else if(!dtio_check_close(dtio))
1493 			return;
1494 	}
1495 
1496 	/* loop to process a number of messages.  This improves throughput,
1497 	 * because selecting on write-event if not needed for busy messages
1498 	 * (dnstap log) generation and if they need to all be written back.
1499 	 * The write event is usually not blocked up.  But not forever,
1500 	 * because the event loop needs to stay responsive for other events.
1501 	 * If there are no (more) messages, or if the output buffers get
1502 	 * full, it returns out of the loop. */
1503 	for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1504 		/* see if there are messages that need writing */
1505 		if(!dtio->cur_msg) {
1506 			if(!dtio_find_msg(dtio)) {
1507 				if(i == 0) {
1508 					/* no messages on the first iteration,
1509 					 * the queues are all empty */
1510 					dtio_sleep(dtio);
1511 				}
1512 				return; /* nothing to do */
1513 			}
1514 		}
1515 
1516 		/* write it */
1517 		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1518 			if(!dtio_write_more(dtio))
1519 				return;
1520 		}
1521 
1522 		/* done with the current message */
1523 		dtio_cur_msg_free(dtio);
1524 
1525 		/* If this is a bidirectional stream the first message will be
1526 		 * the READY control frame. We can only continue writing after
1527 		 * receiving an ACCEPT control frame. */
1528 		if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1529 			dtio->ready_frame_sent = 1;
1530 			(void)dtio_add_output_event_read(dtio);
1531 			break;
1532 		}
1533 	}
1534 }
1535 
1536 /** callback for the dnstap commandpipe, to stop the dnstap IO */
1537 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1538 {
1539 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1540 	uint8_t cmd;
1541 	ssize_t r;
1542 	if(dtio->want_to_exit)
1543 		return;
1544 	r = read(fd, &cmd, sizeof(cmd));
1545 	if(r == -1) {
1546 #ifndef USE_WINSOCK
1547 		if(errno == EINTR || errno == EAGAIN)
1548 			return; /* ignore this */
1549 #else
1550 		if(WSAGetLastError() == WSAEINPROGRESS)
1551 			return;
1552 		if(WSAGetLastError() == WSAEWOULDBLOCK)
1553 			return;
1554 #endif
1555 		log_err("dnstap io: failed to read: %s", sock_strerror(errno));
1556 		/* and then fall through to quit the thread */
1557 	} else if(r == 0) {
1558 		verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1559 	} else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1560 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1561 	} else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1562 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1563 
1564 		if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1565 			verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1566 				"waiting for ACCEPT control frame");
1567 			return;
1568 		}
1569 
1570 		/* reregister event */
1571 		if(!dtio_add_output_event_write(dtio))
1572 			return;
1573 		return;
1574 	} else if(r == 1) {
1575 		verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1576 	}
1577 	dtio->want_to_exit = 1;
1578 	if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1579 		!= 0) {
1580 		log_err("dnstap io: could not loopexit");
1581 	}
1582 }
1583 
1584 #ifndef THREADS_DISABLED
1585 /** setup the event base for the dnstap io thread */
1586 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1587 	struct timeval* now)
1588 {
1589 	memset(now, 0, sizeof(*now));
1590 	dtio->event_base = ub_default_event_base(0, secs, now);
1591 	if(!dtio->event_base) {
1592 		fatal_exit("dnstap io: could not create event_base");
1593 	}
1594 }
1595 #endif /* THREADS_DISABLED */
1596 
1597 /** setup the cmd event for dnstap io */
1598 static void dtio_setup_cmd(struct dt_io_thread* dtio)
1599 {
1600 	struct ub_event* cmdev;
1601 	fd_set_nonblock(dtio->commandpipe[0]);
1602 	cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1603 		UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1604 	if(!cmdev) {
1605 		fatal_exit("dnstap io: out of memory");
1606 	}
1607 	dtio->command_event = cmdev;
1608 	if(ub_event_add(cmdev, NULL) != 0) {
1609 		fatal_exit("dnstap io: out of memory (adding event)");
1610 	}
1611 }
1612 
1613 /** setup the reconnect event for dnstap io */
1614 static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1615 {
1616 	dtio_reconnect_clear(dtio);
1617 	dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1618 		UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1619 	if(!dtio->reconnect_timer) {
1620 		fatal_exit("dnstap io: out of memory");
1621 	}
1622 }
1623 
1624 /**
1625  * structure to keep track of information during stop flush
1626  */
1627 struct stop_flush_info {
1628 	/** the event base during stop flush */
1629 	struct ub_event_base* base;
1630 	/** did we already want to exit this stop-flush event base */
1631 	int want_to_exit_flush;
1632 	/** has the timer fired */
1633 	int timer_done;
1634 	/** the dtio */
1635 	struct dt_io_thread* dtio;
1636 	/** the stop control frame */
1637 	void* stop_frame;
1638 	/** length of the stop frame */
1639 	size_t stop_frame_len;
1640 	/** how much we have done of the stop frame */
1641 	size_t stop_frame_done;
1642 };
1643 
1644 /** exit the stop flush base */
1645 static void dtio_stop_flush_exit(struct stop_flush_info* info)
1646 {
1647 	if(info->want_to_exit_flush)
1648 		return;
1649 	info->want_to_exit_flush = 1;
1650 	if(ub_event_base_loopexit(info->base) != 0) {
1651 		log_err("dnstap io: could not loopexit");
1652 	}
1653 }
1654 
1655 /** send the stop control,
1656  * return true if completed the frame. */
1657 static int dtio_control_stop_send(struct stop_flush_info* info)
1658 {
1659 	struct dt_io_thread* dtio = info->dtio;
1660 	int r;
1661 	if(info->stop_frame_done >= info->stop_frame_len)
1662 		return 1;
1663 	r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1664 		info->stop_frame_done, info->stop_frame_len -
1665 		info->stop_frame_done);
1666 	if(r == -1) {
1667 		verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1668 		dtio_stop_flush_exit(info);
1669 		return 0;
1670 	}
1671 	if(r == 0) {
1672 		/* try again later, or timeout */
1673 		return 0;
1674 	}
1675 	info->stop_frame_done += r;
1676 	if(info->stop_frame_done < info->stop_frame_len)
1677 		return 0; /* not done yet */
1678 	return 1;
1679 }
1680 
1681 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1682 	void* arg)
1683 {
1684 	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1685 	if(info->want_to_exit_flush)
1686 		return;
1687 	verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1688 	info->timer_done = 1;
1689 	dtio_stop_flush_exit(info);
1690 }
1691 
/** Output event callback during the stop flush.
 * Finishes a pending connect check or TLS handshake, checks for a close
 * by the other side on read events, writes the rest of the current
 * message, and then writes the STOP control frame.  Once both are out,
 * or the output turns out to be closed, the stop flush event loop is told
 * to exit.  When a write cannot complete now, it returns and waits for
 * the next event (or for the stop flush timer).
 * @param fd: unused.
 * @param bits: event bits; UB_EV_READ means input is available.
 * @param arg: the struct stop_flush_info. */
void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
{
	struct stop_flush_info* info = (struct stop_flush_info*)arg;
	struct dt_io_thread* dtio = info->dtio;
	if(info->want_to_exit_flush)
		return;
	if(dtio->check_nb_connect) {
		/* we don't start the stop_flush if connect still
		 * in progress, but the check code is here, just in case */
		int connect_err = dtio_check_nb_connect(dtio);
		if(connect_err == -1) {
			/* close the channel, exit the stop flush */
			dtio_stop_flush_exit(info);
			dtio_del_output_event(dtio);
			dtio_close_output(dtio);
			return;
		} else if(connect_err == 0) {
			/* try again later */
			return;
		}
		/* nonblocking connect check passed, continue */
	}
#ifdef HAVE_SSL
	if(dtio->ssl &&
		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
		if(!dtio_ssl_handshake(dtio, info))
			return;
	}
#endif

	if((bits&UB_EV_READ)) {
		if(!dtio_check_close(dtio)) {
			/* the output is closed when fd is -1; only then is
			 * there nothing left to flush */
			if(dtio->fd == -1) {
				verbose(VERB_ALGO, "dnstap io: "
					"stop flush: output closed");
				dtio_stop_flush_exit(info);
			}
			return;
		}
	}
	/* write remainder of last frame */
	if(dtio->cur_msg) {
		if(dtio->cur_msg_done < dtio->cur_msg_len) {
			if(!dtio_write_more(dtio)) {
				/* not finished; exit only if the output got
				 * closed, otherwise wait for the next event */
				if(dtio->fd == -1) {
					verbose(VERB_ALGO, "dnstap io: "
						"stop flush: output closed");
					dtio_stop_flush_exit(info);
				}
				return;
			}
		}
		verbose(VERB_ALGO, "dnstap io: stop flush completed "
			"last frame");
		dtio_cur_msg_free(dtio);
	}
	/* write stop frame */
	if(info->stop_frame_done < info->stop_frame_len) {
		if(!dtio_control_stop_send(info))
			return;
		verbose(VERB_ALGO, "dnstap io: stop flush completed "
			"stop control frame");
	}
	/* when last frame and stop frame are sent, exit */
	dtio_stop_flush_exit(info);
}
1758 
1759 /** flush at end, last packet and stop control */
1760 static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1761 {
1762 	/* briefly attempt to flush the previous packet to the output,
1763 	 * this could be a partial packet, or even the start control frame */
1764 	time_t secs = 0;
1765 	struct timeval now;
1766 	struct stop_flush_info info;
1767 	struct timeval tv;
1768 	struct ub_event* timer, *stopev;
1769 
1770 	if(dtio->fd == -1 || dtio->check_nb_connect) {
1771 		/* no connection or we have just connected, so nothing is
1772 		 * sent yet, so nothing to stop or flush */
1773 		return;
1774 	}
1775 	if(dtio->ssl && !dtio->ssl_handshake_done) {
1776 		/* no SSL connection has been established yet */
1777 		return;
1778 	}
1779 
1780 	memset(&info, 0, sizeof(info));
1781 	memset(&now, 0, sizeof(now));
1782 	info.dtio = dtio;
1783 	info.base = ub_default_event_base(0, &secs, &now);
1784 	if(!info.base) {
1785 		log_err("dnstap io: malloc failure");
1786 		return;
1787 	}
1788 	timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1789 		&dtio_stop_timer_cb, &info);
1790 	if(!timer) {
1791 		log_err("dnstap io: malloc failure");
1792 		ub_event_base_free(info.base);
1793 		return;
1794 	}
1795 	memset(&tv, 0, sizeof(tv));
1796 	tv.tv_sec = 2;
1797 	if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1798 		&tv) != 0) {
1799 		log_err("dnstap io: cannot event_timer_add");
1800 		ub_event_free(timer);
1801 		ub_event_base_free(info.base);
1802 		return;
1803 	}
1804 	stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1805 		UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1806 	if(!stopev) {
1807 		log_err("dnstap io: malloc failure");
1808 		ub_timer_del(timer);
1809 		ub_event_free(timer);
1810 		ub_event_base_free(info.base);
1811 		return;
1812 	}
1813 	if(ub_event_add(stopev, NULL) != 0) {
1814 		log_err("dnstap io: cannot event_add");
1815 		ub_event_free(stopev);
1816 		ub_timer_del(timer);
1817 		ub_event_free(timer);
1818 		ub_event_base_free(info.base);
1819 		return;
1820 	}
1821 	info.stop_frame = fstrm_create_control_frame_stop(
1822 		&info.stop_frame_len);
1823 	if(!info.stop_frame) {
1824 		log_err("dnstap io: malloc failure");
1825 		ub_event_del(stopev);
1826 		ub_event_free(stopev);
1827 		ub_timer_del(timer);
1828 		ub_event_free(timer);
1829 		ub_event_base_free(info.base);
1830 		return;
1831 	}
1832 	dtio->stop_flush_event = stopev;
1833 
1834 	/* wait briefly, or until finished */
1835 	verbose(VERB_ALGO, "dnstap io: stop flush started");
1836 	if(ub_event_base_dispatch(info.base) < 0) {
1837 		log_err("dnstap io: dispatch flush failed, errno is %s",
1838 			strerror(errno));
1839 	}
1840 	verbose(VERB_ALGO, "dnstap io: stop flush ended");
1841 	free(info.stop_frame);
1842 	dtio->stop_flush_event = NULL;
1843 	ub_event_del(stopev);
1844 	ub_event_free(stopev);
1845 	ub_timer_del(timer);
1846 	ub_event_free(timer);
1847 	ub_event_base_free(info.base);
1848 }
1849 
1850 /** perform desetup and free stuff when the dnstap io thread exits */
1851 static void dtio_desetup(struct dt_io_thread* dtio)
1852 {
1853 	dtio_control_stop_flush(dtio);
1854 	dtio_del_output_event(dtio);
1855 	dtio_close_output(dtio);
1856 	ub_event_del(dtio->command_event);
1857 	ub_event_free(dtio->command_event);
1858 #ifndef USE_WINSOCK
1859 	close(dtio->commandpipe[0]);
1860 #else
1861 	_close(dtio->commandpipe[0]);
1862 #endif
1863 	dtio->commandpipe[0] = -1;
1864 	dtio_reconnect_del(dtio);
1865 	ub_event_free(dtio->reconnect_timer);
1866 	dtio_cur_msg_free(dtio);
1867 #ifndef THREADS_DISABLED
1868 	ub_event_base_free(dtio->event_base);
1869 #endif
1870 }
1871 
1872 /** setup a start control message */
1873 static int dtio_control_start_send(struct dt_io_thread* dtio)
1874 {
1875 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1876 	dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1877 		&dtio->cur_msg_len);
1878 	if(!dtio->cur_msg) {
1879 		return 0;
1880 	}
1881 	/* setup to send the control message */
1882 	/* set that the buffer needs to be sent, but the length
1883 	 * of that buffer is already written, that way the buffer can
1884 	 * start with 0 length and then the length of the control frame
1885 	 * in it */
1886 	dtio->cur_msg_done = 0;
1887 	dtio->cur_msg_len_done = 4;
1888 	return 1;
1889 }
1890 
1891 /** setup a ready control message */
1892 static int dtio_control_ready_send(struct dt_io_thread* dtio)
1893 {
1894 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1895 	dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1896 		&dtio->cur_msg_len);
1897 	if(!dtio->cur_msg) {
1898 		return 0;
1899 	}
1900 	/* setup to send the control message */
1901 	/* set that the buffer needs to be sent, but the length
1902 	 * of that buffer is already written, that way the buffer can
1903 	 * start with 0 length and then the length of the control frame
1904 	 * in it */
1905 	dtio->cur_msg_done = 0;
1906 	dtio->cur_msg_len_done = 4;
1907 	return 1;
1908 }
1909 
/** open the output file descriptor for af_local.
 * Creates a nonblocking AF_LOCAL stream socket in dtio->fd and connects it
 * to dtio->socket_path.  A path that does not fit in sun_path is refused;
 * copying it with strlcpy would silently truncate it, and the connect
 * would go to a different socket.  Failures are not logged while
 * reconnects back off (reconnect_timeout above the minimum) at verbosity
 * below 4.
 * @return 1 on success, 0 on failure (dtio->fd is then closed, or was
 *	never opened). */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifdef HAVE_SYS_UN_H
	struct sockaddr_un s;
	if(strlen(dtio->socket_path) >= sizeof(s.sun_path)) {
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4)
			return 0; /* no log retries on low verbosity */
		log_err("dnstap io: socket path \"%s\" is too long, at most "
			"%d characters fit", dtio->socket_path,
			(int)sizeof(s.sun_path)-1);
		return 0;
	}
	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("dnstap io: failed to create socket: %s",
			sock_strerror(errno));
		return 0;
	}
	memset(&s, 0, sizeof(s));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	/* this member exists on BSDs, not Linux */
	s.sun_len = (unsigned)sizeof(s);
#endif
	s.sun_family = AF_LOCAL;
	/* length is 92-108, 104 on FreeBSD; fits, checked above */
	(void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
	fd_set_nonblock(dtio->fd);
	if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
		== -1) {
		char* to = dtio->socket_path;
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4) {
			dtio_close_fd(dtio);
			return 0; /* no log retries on low verbosity */
		}
		log_err("dnstap io: failed to connect to \"%s\": %s",
			to, sock_strerror(errno));
		dtio_close_fd(dtio);
		return 0;
	}
	return 1;
#else
	log_err("cannot create af_local socket");
	return 0;
#endif /* HAVE_SYS_UN_H */
}
1949 
1950 /** open the output file descriptor for af_inet and af_inet6 */
1951 static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1952 {
1953 	struct sockaddr_storage addr;
1954 	socklen_t addrlen;
1955 	memset(&addr, 0, sizeof(addr));
1956 	addrlen = (socklen_t)sizeof(addr);
1957 
1958 	if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
1959 		log_err("could not parse IP '%s'", dtio->ip_str);
1960 		return 0;
1961 	}
1962 	dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1963 	if(dtio->fd == -1) {
1964 		log_err("can't create socket: %s", sock_strerror(errno));
1965 		return 0;
1966 	}
1967 	fd_set_nonblock(dtio->fd);
1968 	if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1969 		if(errno == EINPROGRESS)
1970 			return 1; /* wait until connect done*/
1971 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1972 			verbosity < 4) {
1973 			dtio_close_fd(dtio);
1974 			return 0; /* no log retries on low verbosity */
1975 		}
1976 #ifndef USE_WINSOCK
1977 		if(tcp_connect_errno_needs_log(
1978 			(struct sockaddr *)&addr, addrlen)) {
1979 			log_err("dnstap io: failed to connect to %s: %s",
1980 				dtio->ip_str, strerror(errno));
1981 		}
1982 #else
1983 		if(WSAGetLastError() == WSAEINPROGRESS ||
1984 			WSAGetLastError() == WSAEWOULDBLOCK)
1985 			return 1; /* wait until connect done*/
1986 		if(tcp_connect_errno_needs_log(
1987 			(struct sockaddr *)&addr, addrlen)) {
1988 			log_err("dnstap io: failed to connect to %s: %s",
1989 				dtio->ip_str, wsa_strerror(WSAGetLastError()));
1990 		}
1991 #endif
1992 		dtio_close_fd(dtio);
1993 		return 0;
1994 	}
1995 	return 1;
1996 }
1997 
1998 /** setup the SSL structure for new connection */
1999 static int dtio_setup_ssl(struct dt_io_thread* dtio)
2000 {
2001 	dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
2002 	if(!dtio->ssl) return 0;
2003 	dtio->ssl_handshake_done = 0;
2004 	dtio->ssl_brief_read = 0;
2005 
2006 	if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
2007 		dtio->tls_use_sni)) {
2008 		return 0;
2009 	}
2010 	return 1;
2011 }
2012 
2013 /** open the output file descriptor */
2014 static void dtio_open_output(struct dt_io_thread* dtio)
2015 {
2016 	struct ub_event* ev;
2017 	if(dtio->upstream_is_unix) {
2018 		if(!dtio_open_output_local(dtio)) {
2019 			dtio_reconnect_enable(dtio);
2020 			return;
2021 		}
2022 	} else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
2023 		if(!dtio_open_output_tcp(dtio)) {
2024 			dtio_reconnect_enable(dtio);
2025 			return;
2026 		}
2027 		if(dtio->upstream_is_tls) {
2028 			if(!dtio_setup_ssl(dtio)) {
2029 				dtio_close_fd(dtio);
2030 				dtio_reconnect_enable(dtio);
2031 				return;
2032 			}
2033 		}
2034 	}
2035 	dtio->check_nb_connect = 1;
2036 
2037 	/* the EV_READ is to read ACCEPT control messages, and catch channel
2038 	 * close. EV_WRITE is to write packets */
2039 	ev = ub_event_new(dtio->event_base, dtio->fd,
2040 		UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
2041 		dtio);
2042 	if(!ev) {
2043 		log_err("dnstap io: out of memory");
2044 		if(dtio->ssl) {
2045 #ifdef HAVE_SSL
2046 			SSL_free(dtio->ssl);
2047 			dtio->ssl = NULL;
2048 #endif
2049 		}
2050 		dtio_close_fd(dtio);
2051 		dtio_reconnect_enable(dtio);
2052 		return;
2053 	}
2054 	dtio->event = ev;
2055 
2056 	/* setup protocol control message to start */
2057 	if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2058 		(dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2059 		log_err("dnstap io: out of memory");
2060 		ub_event_free(dtio->event);
2061 		dtio->event = NULL;
2062 		if(dtio->ssl) {
2063 #ifdef HAVE_SSL
2064 			SSL_free(dtio->ssl);
2065 			dtio->ssl = NULL;
2066 #endif
2067 		}
2068 		dtio_close_fd(dtio);
2069 		dtio_reconnect_enable(dtio);
2070 		return;
2071 	}
2072 }
2073 
/**
 * Perform the setup of the writer on an already established event_base.
 *
 * In order: dtio_setup_cmd(), dtio_setup_reconnect(), dtio_open_output(),
 * and finally dtio_add_output_event_write().
 *
 * @param dtio: the dnstap io thread.
 */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
	dtio_setup_cmd(dtio);
	dtio_setup_reconnect(dtio);
	dtio_open_output(dtio);
	/* nothing else follows, so the result is deliberately discarded */
	(void)dtio_add_output_event_write(dtio);
}
2083 
2084 #ifndef THREADS_DISABLED
2085 /** the IO thread function for the DNSTAP IO */
2086 static void* dnstap_io(void* arg)
2087 {
2088 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2089 	time_t secs = 0;
2090 	struct timeval now;
2091 	log_thread_set(&dtio->threadnum);
2092 
2093 	/* setup */
2094 	verbose(VERB_ALGO, "start dnstap io thread");
2095 	dtio_setup_base(dtio, &secs, &now);
2096 	dtio_setup_on_base(dtio);
2097 
2098 	/* run */
2099 	if(ub_event_base_dispatch(dtio->event_base) < 0) {
2100 		log_err("dnstap io: dispatch failed, errno is %s",
2101 			strerror(errno));
2102 	}
2103 
2104 	/* cleanup */
2105 	verbose(VERB_ALGO, "stop dnstap io thread");
2106 	dtio_desetup(dtio);
2107 	return NULL;
2108 }
2109 #endif /* THREADS_DISABLED */
2110 
2111 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2112 	int numworkers)
2113 {
2114 	/* set up the thread, can fail */
2115 #ifndef USE_WINSOCK
2116 	if(pipe(dtio->commandpipe) == -1) {
2117 		log_err("failed to create pipe: %s", strerror(errno));
2118 		return 0;
2119 	}
2120 #else
2121 	if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2122 		log_err("failed to create _pipe: %s",
2123 			wsa_strerror(WSAGetLastError()));
2124 		return 0;
2125 	}
2126 #endif
2127 
2128 	/* start the thread */
2129 	dtio->threadnum = numworkers+1;
2130 	dtio->started = 1;
2131 #ifndef THREADS_DISABLED
2132 	ub_thread_create(&dtio->tid, dnstap_io, dtio);
2133 	(void)event_base_nothr;
2134 #else
2135 	dtio->event_base = event_base_nothr;
2136 	dtio_setup_on_base(dtio);
2137 #endif
2138 	return 1;
2139 }
2140 
2141 void dt_io_thread_stop(struct dt_io_thread* dtio)
2142 {
2143 #ifndef THREADS_DISABLED
2144 	uint8_t cmd = DTIO_COMMAND_STOP;
2145 #endif
2146 	if(!dtio) return;
2147 	if(!dtio->started) return;
2148 	verbose(VERB_ALGO, "dnstap io: send stop cmd");
2149 
2150 #ifndef THREADS_DISABLED
2151 	while(1) {
2152 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2153 		if(r == -1) {
2154 #ifndef USE_WINSOCK
2155 			if(errno == EINTR || errno == EAGAIN)
2156 				continue;
2157 #else
2158 			if(WSAGetLastError() == WSAEINPROGRESS)
2159 				continue;
2160 			if(WSAGetLastError() == WSAEWOULDBLOCK)
2161 				continue;
2162 #endif
2163 			log_err("dnstap io stop: write: %s",
2164 				sock_strerror(errno));
2165 			break;
2166 		}
2167 		break;
2168 	}
2169 	dtio->started = 0;
2170 #endif /* THREADS_DISABLED */
2171 
2172 #ifndef USE_WINSOCK
2173 	close(dtio->commandpipe[1]);
2174 #else
2175 	_close(dtio->commandpipe[1]);
2176 #endif
2177 	dtio->commandpipe[1] = -1;
2178 #ifndef THREADS_DISABLED
2179 	ub_thread_join(dtio->tid);
2180 #else
2181 	dtio->want_to_exit = 1;
2182 	dtio_desetup(dtio);
2183 #endif
2184 }
2185