xref: /freebsd/contrib/unbound/dnstap/dtstream.c (revision bce7ee9d412b6410e6d799c4a417617cbb148e09)
1 /*
2  * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3  *
4  * Copyright (c) 2020, NLnet Labs. All rights reserved.
5  *
6  * This software is open source.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * Redistributions of source code must retain the above copyright notice,
13  * this list of conditions and the following disclaimer.
14  *
15  * Redistributions in binary form must reproduce the above copyright notice,
16  * this list of conditions and the following disclaimer in the documentation
17  * and/or other materials provided with the distribution.
18  *
19  * Neither the name of the NLNET LABS nor the names of its contributors may
20  * be used to endorse or promote products derived from this software without
21  * specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34  *
35  */
36 
37 /**
38  * \file
39  *
40  * An implementation of the Frame Streams data transport protocol for
41  * the Unbound DNSTAP message logging facility.
42  */
43 
44 #include "config.h"
45 #include "dnstap/dtstream.h"
46 #include "dnstap/dnstap_fstrm.h"
47 #include "util/config_file.h"
48 #include "util/ub_event.h"
49 #include "util/net_help.h"
50 #include "services/outside_network.h"
51 #include "sldns/sbuffer.h"
52 #ifdef HAVE_SYS_UN_H
53 #include <sys/un.h>
54 #endif
55 #include <fcntl.h>
56 #ifdef HAVE_OPENSSL_SSL_H
57 #include <openssl/ssl.h>
58 #endif
59 #ifdef HAVE_OPENSSL_ERR_H
60 #include <openssl/err.h>
61 #endif
62 
/** upper bound on the messages handled in a single output callback */
#define DTIO_MESSAGES_PER_CALLBACK 100
/** reconnect wait in msec for the first attempt that is not immediate */
#define DTIO_RECONNECT_TIMEOUT_MIN 10
/** ceiling in msec for the reconnect wait after backoff */
#define DTIO_RECONNECT_TIMEOUT_MAX 1000
/** reconnect wait in msec for slow reconnects, so that reconnecting
 * does not busy spin */
#define DTIO_RECONNECT_TIMEOUT_SLOW 1000
/** message count at which the thread is woken up */
#define DTIO_MSG_FOR_WAKEUP 32

/** largest length accepted for a received frame */
#define DTIO_RECV_FRAME_MAX_LEN 1000
76 
struct stop_flush_info;
/** commands that travel over the DTIO command channel */
enum {
	/** stop command on the DTIO command channel */
	DTIO_COMMAND_STOP = 0,
	/** wakeup command on the DTIO command channel */
	DTIO_COMMAND_WAKEUP
} dtio_channel_command;
85 
/** open the output channel of the dtio */
static void dtio_open_output(struct dt_io_thread* dtio);
/** add the output event, for read and write */
static int dtio_add_output_event_write(struct dt_io_thread* dtio);
/** begin the reconnection attempts */
static void dtio_reconnect_enable(struct dt_io_thread* dtio);
/** leave the stop_flush event loop */
static void dtio_stop_flush_exit(struct stop_flush_info* info);
/** set up a start control message */
static int dtio_control_start_send(struct dt_io_thread* dtio);
#ifdef HAVE_SSL
/** wait briefly for a read event, used during SSL negotiation */
static int dtio_enable_brief_read(struct dt_io_thread* dtio);
/** wait briefly for a write event, used during SSL negotiation */
static int dtio_enable_brief_write(struct dt_io_thread* dtio);
#endif
102 
103 struct dt_msg_queue*
104 dt_msg_queue_create(struct comm_base* base)
105 {
106 	struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
107 	if(!mq) return NULL;
108 	mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
109 		about 1 M should contain 64K messages with some overhead,
110 		or a whole bunch smaller ones */
111 	mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112 	if(!mq->wakeup_timer) {
113 		free(mq);
114 		return NULL;
115 	}
116 	lock_basic_init(&mq->lock);
117 	lock_protect(&mq->lock, mq, sizeof(*mq));
118 	return mq;
119 }
120 
121 /** clear the message list, caller must hold the lock */
122 static void
123 dt_msg_queue_clear(struct dt_msg_queue* mq)
124 {
125 	struct dt_msg_entry* e = mq->first, *next=NULL;
126 	while(e) {
127 		next = e->next;
128 		free(e->buf);
129 		free(e);
130 		e = next;
131 	}
132 	mq->first = NULL;
133 	mq->last = NULL;
134 	mq->cursize = 0;
135 	mq->msgcount = 0;
136 }
137 
138 void
139 dt_msg_queue_delete(struct dt_msg_queue* mq)
140 {
141 	if(!mq) return;
142 	lock_basic_destroy(&mq->lock);
143 	dt_msg_queue_clear(mq);
144 	comm_timer_delete(mq->wakeup_timer);
145 	free(mq);
146 }
147 
148 /** make the dtio wake up by sending a wakeup command */
149 static void dtio_wakeup(struct dt_io_thread* dtio)
150 {
151 	uint8_t cmd = DTIO_COMMAND_WAKEUP;
152 	if(!dtio) return;
153 	if(!dtio->started) return;
154 
155 	while(1) {
156 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
157 		if(r == -1) {
158 #ifndef USE_WINSOCK
159 			if(errno == EINTR || errno == EAGAIN)
160 				continue;
161 #else
162 			if(WSAGetLastError() == WSAEINPROGRESS)
163 				continue;
164 			if(WSAGetLastError() == WSAEWOULDBLOCK)
165 				continue;
166 #endif
167 			log_err("dnstap io wakeup: write: %s",
168 				sock_strerror(errno));
169 			break;
170 		}
171 		break;
172 	}
173 }
174 
175 void
176 mq_wakeup_cb(void* arg)
177 {
178 	struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179 	/* even if the dtio is already active, because perhaps much
180 	 * traffic suddenly, we leave the timer running to save on
181 	 * managing it, the once a second timer is less work then
182 	 * starting and stopping the timer frequently */
183 	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
184 	mq->dtio->wakeup_timer_enabled = 0;
185 	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
186 	dtio_wakeup(mq->dtio);
187 }
188 
189 /** start timer to wakeup dtio because there is content in the queue */
190 static void
191 dt_msg_queue_start_timer(struct dt_msg_queue* mq)
192 {
193 	struct timeval tv;
194 	/* Start a timer to process messages to be logged.
195 	 * If we woke up the dtio thread for every message, the wakeup
196 	 * messages take up too much processing power.  If the queue
197 	 * fills up the wakeup happens immediately.  The timer wakes it up
198 	 * if there are infrequent messages to log. */
199 
200 	/* we cannot start a timer in dtio thread, because it is a different
201 	 * thread and its event base is in use by the other thread, it would
202 	 * give race conditions if we tried to modify its event base,
203 	 * and locks would wait until it woke up, and this is what we do. */
204 
205 	/* do not start the timer if a timer already exists, perhaps
206 	 * in another worker.  So this variable is protected by a lock in
207 	 * dtio */
208 	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
209 	if(mq->dtio->wakeup_timer_enabled) {
210 		lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
211 		return;
212 	}
213 	mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
214 	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
215 
216 	/* start the timer, in mq, in the event base of our worker */
217 	tv.tv_sec = 1;
218 	tv.tv_usec = 0;
219 	comm_timer_set(mq->wakeup_timer, &tv);
220 }
221 
222 void
223 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
224 {
225 	int wakeupnow = 0, wakeupstarttimer = 0;
226 	struct dt_msg_entry* entry;
227 
228 	/* check conditions */
229 	if(!buf) return;
230 	if(len == 0) {
231 		/* it is not possible to log entries with zero length,
232 		 * because the framestream protocol does not carry it.
233 		 * However the protobuf serialization does not create zero
234 		 * length datagrams for dnstap, so this should not happen. */
235 		free(buf);
236 		return;
237 	}
238 	if(!mq) {
239 		free(buf);
240 		return;
241 	}
242 
243 	/* allocate memory for queue entry */
244 	entry = malloc(sizeof(*entry));
245 	if(!entry) {
246 		log_err("out of memory logging dnstap");
247 		free(buf);
248 		return;
249 	}
250 	entry->next = NULL;
251 	entry->buf = buf;
252 	entry->len = len;
253 
254 	/* aqcuire lock */
255 	lock_basic_lock(&mq->lock);
256 	/* if list was empty, start timer for (eventual) wakeup */
257 	if(mq->first == NULL)
258 		wakeupstarttimer = 1;
259 	/* if list contains more than wakeupnum elements, wakeup now,
260 	 * or if list is (going to be) almost full */
261 	if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
262 		(mq->cursize < mq->maxsize * 9 / 10 &&
263 		mq->cursize+len >= mq->maxsize * 9 / 10))
264 		wakeupnow = 1;
265 	/* see if it is going to fit */
266 	if(mq->cursize + len > mq->maxsize) {
267 		/* buffer full, or congested. */
268 		/* drop */
269 		lock_basic_unlock(&mq->lock);
270 		free(buf);
271 		free(entry);
272 		return;
273 	}
274 	mq->cursize += len;
275 	mq->msgcount ++;
276 	/* append to list */
277 	if(mq->last) {
278 		mq->last->next = entry;
279 	} else {
280 		mq->first = entry;
281 	}
282 	mq->last = entry;
283 	/* release lock */
284 	lock_basic_unlock(&mq->lock);
285 
286 	if(wakeupnow) {
287 		dtio_wakeup(mq->dtio);
288 	} else if(wakeupstarttimer) {
289 		dt_msg_queue_start_timer(mq);
290 	}
291 }
292 
293 struct dt_io_thread* dt_io_thread_create(void)
294 {
295 	struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
296 	lock_basic_init(&dtio->wakeup_timer_lock);
297 	lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
298 		sizeof(dtio->wakeup_timer_enabled));
299 	return dtio;
300 }
301 
302 void dt_io_thread_delete(struct dt_io_thread* dtio)
303 {
304 	struct dt_io_list_item* item, *nextitem;
305 	if(!dtio) return;
306 	lock_basic_destroy(&dtio->wakeup_timer_lock);
307 	item=dtio->io_list;
308 	while(item) {
309 		nextitem = item->next;
310 		free(item);
311 		item = nextitem;
312 	}
313 	free(dtio->socket_path);
314 	free(dtio->ip_str);
315 	free(dtio->tls_server_name);
316 	free(dtio->client_key_file);
317 	free(dtio->client_cert_file);
318 	if(dtio->ssl_ctx) {
319 #ifdef HAVE_SSL
320 		SSL_CTX_free(dtio->ssl_ctx);
321 #endif
322 	}
323 	free(dtio);
324 }
325 
326 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
327 {
328 	if(!cfg->dnstap) {
329 		log_warn("cannot setup dnstap because dnstap-enable is no");
330 		return 0;
331 	}
332 
333 	/* what type of connectivity do we have */
334 	if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
335 		if(cfg->dnstap_tls)
336 			dtio->upstream_is_tls = 1;
337 		else	dtio->upstream_is_tcp = 1;
338 	} else {
339 		dtio->upstream_is_unix = 1;
340 	}
341 	dtio->is_bidirectional = cfg->dnstap_bidirectional;
342 
343 	if(dtio->upstream_is_unix) {
344 		if(!cfg->dnstap_socket_path ||
345 			cfg->dnstap_socket_path[0]==0) {
346 			log_err("dnstap setup: no dnstap-socket-path for "
347 				"socket connect");
348 			return 0;
349 		}
350 		free(dtio->socket_path);
351 		dtio->socket_path = fname_after_chroot(cfg->dnstap_socket_path,
352 			cfg, 1);
353 		if(!dtio->socket_path) {
354 			log_err("dnstap setup: malloc failure");
355 			return 0;
356 		}
357 	}
358 
359 	if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
360 		if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
361 			log_err("dnstap setup: no dnstap-ip for TCP connect");
362 			return 0;
363 		}
364 		free(dtio->ip_str);
365 		dtio->ip_str = strdup(cfg->dnstap_ip);
366 		if(!dtio->ip_str) {
367 			log_err("dnstap setup: malloc failure");
368 			return 0;
369 		}
370 	}
371 
372 	if(dtio->upstream_is_tls) {
373 #ifdef HAVE_SSL
374 		if(cfg->dnstap_tls_server_name &&
375 			cfg->dnstap_tls_server_name[0]) {
376 			free(dtio->tls_server_name);
377 			dtio->tls_server_name = strdup(
378 				cfg->dnstap_tls_server_name);
379 			if(!dtio->tls_server_name) {
380 				log_err("dnstap setup: malloc failure");
381 				return 0;
382 			}
383 			if(!check_auth_name_for_ssl(dtio->tls_server_name))
384 				return 0;
385 		}
386 		if(cfg->dnstap_tls_client_key_file &&
387 			cfg->dnstap_tls_client_key_file[0]) {
388 			dtio->use_client_certs = 1;
389 			free(dtio->client_key_file);
390 			dtio->client_key_file = strdup(
391 				cfg->dnstap_tls_client_key_file);
392 			if(!dtio->client_key_file) {
393 				log_err("dnstap setup: malloc failure");
394 				return 0;
395 			}
396 			if(!cfg->dnstap_tls_client_cert_file ||
397 				cfg->dnstap_tls_client_cert_file[0]==0) {
398 				log_err("dnstap setup: client key "
399 					"authentication enabled with "
400 					"dnstap-tls-client-key-file, but "
401 					"no dnstap-tls-client-cert-file "
402 					"is given");
403 				return 0;
404 			}
405 			free(dtio->client_cert_file);
406 			dtio->client_cert_file = strdup(
407 				cfg->dnstap_tls_client_cert_file);
408 			if(!dtio->client_cert_file) {
409 				log_err("dnstap setup: malloc failure");
410 				return 0;
411 			}
412 		} else {
413 			dtio->use_client_certs = 0;
414 			dtio->client_key_file = NULL;
415 			dtio->client_cert_file = NULL;
416 		}
417 
418 		if(cfg->dnstap_tls_cert_bundle) {
419 			dtio->ssl_ctx = connect_sslctx_create(
420 				dtio->client_key_file,
421 				dtio->client_cert_file,
422 				cfg->dnstap_tls_cert_bundle, 0);
423 		} else {
424 			dtio->ssl_ctx = connect_sslctx_create(
425 				dtio->client_key_file,
426 				dtio->client_cert_file,
427 				cfg->tls_cert_bundle, cfg->tls_win_cert);
428 		}
429 		if(!dtio->ssl_ctx) {
430 			log_err("could not setup SSL CTX");
431 			return 0;
432 		}
433 		dtio->tls_use_sni = cfg->tls_use_sni;
434 #endif /* HAVE_SSL */
435 	}
436 	return 1;
437 }
438 
439 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
440         struct dt_msg_queue* mq)
441 {
442 	struct dt_io_list_item* item = malloc(sizeof(*item));
443 	if(!item) return 0;
444 	lock_basic_lock(&mq->lock);
445 	mq->dtio = dtio;
446 	lock_basic_unlock(&mq->lock);
447 	item->queue = mq;
448 	item->next = dtio->io_list;
449 	dtio->io_list = item;
450 	dtio->io_list_iter = NULL;
451 	return 1;
452 }
453 
454 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
455         struct dt_msg_queue* mq)
456 {
457 	struct dt_io_list_item* item, *prev=NULL;
458 	if(!dtio) return;
459 	item = dtio->io_list;
460 	while(item) {
461 		if(item->queue == mq) {
462 			/* found it */
463 			if(prev) prev->next = item->next;
464 			else dtio->io_list = item->next;
465 			/* the queue itself only registered, not deleted */
466 			lock_basic_lock(&item->queue->lock);
467 			item->queue->dtio = NULL;
468 			lock_basic_unlock(&item->queue->lock);
469 			free(item);
470 			dtio->io_list_iter = NULL;
471 			return;
472 		}
473 		prev = item;
474 		item = item->next;
475 	}
476 }
477 
478 /** pick a message from the queue, the routine locks and unlocks,
479  * returns true if there is a message */
480 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
481 	size_t* len)
482 {
483 	lock_basic_lock(&mq->lock);
484 	if(mq->first) {
485 		struct dt_msg_entry* entry = mq->first;
486 		mq->first = entry->next;
487 		if(!entry->next) mq->last = NULL;
488 		mq->cursize -= entry->len;
489 		mq->msgcount --;
490 		lock_basic_unlock(&mq->lock);
491 
492 		*buf = entry->buf;
493 		*len = entry->len;
494 		free(entry);
495 		return 1;
496 	}
497 	lock_basic_unlock(&mq->lock);
498 	return 0;
499 }
500 
501 /** find message in queue, false if no message, true if message to send */
502 static int dtio_find_in_queue(struct dt_io_thread* dtio,
503 	struct dt_msg_queue* mq)
504 {
505 	void* buf=NULL;
506 	size_t len=0;
507 	if(dt_msg_queue_pop(mq, &buf, &len)) {
508 		dtio->cur_msg = buf;
509 		dtio->cur_msg_len = len;
510 		dtio->cur_msg_done = 0;
511 		dtio->cur_msg_len_done = 0;
512 		return 1;
513 	}
514 	return 0;
515 }
516 
517 /** find a new message to write, search message queues, false if none */
518 static int dtio_find_msg(struct dt_io_thread* dtio)
519 {
520 	struct dt_io_list_item *spot, *item;
521 
522 	spot = dtio->io_list_iter;
523 	/* use the next queue for the next message lookup,
524 	 * if we hit the end(NULL) the NULL restarts the iter at start. */
525 	if(spot)
526 		dtio->io_list_iter = spot->next;
527 	else if(dtio->io_list)
528 		dtio->io_list_iter = dtio->io_list->next;
529 
530 	/* scan from spot to end-of-io_list */
531 	item = spot;
532 	while(item) {
533 		if(dtio_find_in_queue(dtio, item->queue))
534 			return 1;
535 		item = item->next;
536 	}
537 	/* scan starting at the start-of-list (to wrap around the end) */
538 	item = dtio->io_list;
539 	while(item) {
540 		if(dtio_find_in_queue(dtio, item->queue))
541 			return 1;
542 		item = item->next;
543 	}
544 	return 0;
545 }
546 
547 /** callback for the dnstap reconnect, to start reconnecting to output */
548 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
549 	short ATTR_UNUSED(bits), void* arg)
550 {
551 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
552 	dtio->reconnect_is_added = 0;
553 	verbose(VERB_ALGO, "dnstap io: reconnect timer");
554 
555 	dtio_open_output(dtio);
556 	if(dtio->event) {
557 		if(!dtio_add_output_event_write(dtio))
558 			return;
559 		/* nothing wrong so far, wait on the output event */
560 		return;
561 	}
562 	/* exponential backoff and retry on timer */
563 	dtio_reconnect_enable(dtio);
564 }
565 
566 /** attempt to reconnect to the output, after a timeout */
567 static void dtio_reconnect_enable(struct dt_io_thread* dtio)
568 {
569 	struct timeval tv;
570 	int msec;
571 	if(dtio->want_to_exit) return;
572 	if(dtio->reconnect_is_added)
573 		return; /* already done */
574 
575 	/* exponential backoff, store the value for next timeout */
576 	msec = dtio->reconnect_timeout;
577 	if(msec == 0) {
578 		dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
579 	} else {
580 		dtio->reconnect_timeout = msec*2;
581 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
582 			dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
583 	}
584 	verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
585 		msec);
586 
587 	/* setup wait timer */
588 	memset(&tv, 0, sizeof(tv));
589 	tv.tv_sec = msec/1000;
590 	tv.tv_usec = (msec%1000)*1000;
591 	if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
592 		&dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
593 		log_err("dnstap io: could not reconnect ev timer add");
594 		return;
595 	}
596 	dtio->reconnect_is_added = 1;
597 }
598 
599 /** remove dtio reconnect timer */
600 static void dtio_reconnect_del(struct dt_io_thread* dtio)
601 {
602 	if(!dtio->reconnect_is_added)
603 		return;
604 	ub_timer_del(dtio->reconnect_timer);
605 	dtio->reconnect_is_added = 0;
606 }
607 
608 /** clear the reconnect exponential backoff timer.
609  * We have successfully connected so we can try again with short timeouts. */
610 static void dtio_reconnect_clear(struct dt_io_thread* dtio)
611 {
612 	dtio->reconnect_timeout = 0;
613 	dtio_reconnect_del(dtio);
614 }
615 
616 /** reconnect slowly, because we already know we have to wait for a bit */
617 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
618 {
619 	dtio_reconnect_del(dtio);
620 	dtio->reconnect_timeout = msec;
621 	dtio_reconnect_enable(dtio);
622 }
623 
624 /** delete the current message in the dtio, and reset counters */
625 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
626 {
627 	free(dtio->cur_msg);
628 	dtio->cur_msg = NULL;
629 	dtio->cur_msg_len = 0;
630 	dtio->cur_msg_done = 0;
631 	dtio->cur_msg_len_done = 0;
632 }
633 
634 /** delete the buffer and counters used to read frame */
635 static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
636 {
637 	if(rb->buf) {
638 		free(rb->buf);
639 		rb->buf = NULL;
640 	}
641 	rb->buf_count = 0;
642 	rb->buf_cap = 0;
643 	rb->frame_len = 0;
644 	rb->frame_len_done = 0;
645 	rb->control_frame = 0;
646 }
647 
648 /** del the output file descriptor event for listening */
649 static void dtio_del_output_event(struct dt_io_thread* dtio)
650 {
651 	if(!dtio->event_added)
652 		return;
653 	ub_event_del(dtio->event);
654 	dtio->event_added = 0;
655 	dtio->event_added_is_write = 0;
656 }
657 
658 /** close dtio socket and set it to -1 */
659 static void dtio_close_fd(struct dt_io_thread* dtio)
660 {
661 	sock_close(dtio->fd);
662 	dtio->fd = -1;
663 }
664 
665 /** close and stop the output file descriptor event */
666 static void dtio_close_output(struct dt_io_thread* dtio)
667 {
668 	if(!dtio->event)
669 		return;
670 	ub_event_free(dtio->event);
671 	dtio->event = NULL;
672 	if(dtio->ssl) {
673 #ifdef HAVE_SSL
674 		SSL_shutdown(dtio->ssl);
675 		SSL_free(dtio->ssl);
676 		dtio->ssl = NULL;
677 #endif
678 	}
679 	dtio_close_fd(dtio);
680 
681 	/* if there is a (partial) message, discard it
682 	 * we cannot send (the remainder of) it, and a new
683 	 * connection needs to start with a control frame. */
684 	if(dtio->cur_msg) {
685 		dtio_cur_msg_free(dtio);
686 	}
687 
688 	dtio->ready_frame_sent = 0;
689 	dtio->accept_frame_received = 0;
690 	dtio_read_frame_free(&dtio->read_frame);
691 
692 	dtio_reconnect_enable(dtio);
693 }
694 
695 /** check for pending nonblocking connect errors,
696  * returns 1 if it is okay. -1 on error (close it), 0 to try later */
697 static int dtio_check_nb_connect(struct dt_io_thread* dtio)
698 {
699 	int error = 0;
700 	socklen_t len = (socklen_t)sizeof(error);
701 	if(!dtio->check_nb_connect)
702 		return 1; /* everything okay */
703 	if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
704 		&len) < 0) {
705 #ifndef USE_WINSOCK
706 		error = errno; /* on solaris errno is error */
707 #else
708 		error = WSAGetLastError();
709 #endif
710 	}
711 #ifndef USE_WINSOCK
712 #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
713 	if(error == EINPROGRESS || error == EWOULDBLOCK)
714 		return 0; /* try again later */
715 #endif
716 #else
717 	if(error == WSAEINPROGRESS) {
718 		return 0; /* try again later */
719 	} else if(error == WSAEWOULDBLOCK) {
720 		ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
721 			dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
722 		return 0; /* try again later */
723 	}
724 #endif
725 	if(error != 0) {
726 		char* to = dtio->socket_path;
727 		if(!to) to = dtio->ip_str;
728 		if(!to) to = "";
729 		log_err("dnstap io: failed to connect to \"%s\": %s",
730 			to, sock_strerror(error));
731 		return -1; /* error, close it */
732 	}
733 
734 	if(dtio->ip_str)
735 		verbose(VERB_DETAIL, "dnstap io: connected to %s",
736 			dtio->ip_str);
737 	else if(dtio->socket_path)
738 		verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
739 			dtio->socket_path);
740 	dtio_reconnect_clear(dtio);
741 	dtio->check_nb_connect = 0;
742 	return 1; /* everything okay */
743 }
744 
#ifdef HAVE_SSL
/** write the buffer to the ssl output.
 * returns number of bytes written, 0 if nothing happened,
 * try again later, or -1 if the channel is to be closed. */
static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
	size_t len)
{
	int written, sslerr;

	ERR_clear_error();
	written = SSL_write(dtio->ssl, buf, len);
	if(written > 0)
		return written;

	sslerr = SSL_get_error(dtio->ssl, written);
	switch(sslerr) {
	case SSL_ERROR_ZERO_RETURN:
		/* closed */
		return -1;
	case SSL_ERROR_WANT_READ:
		/* we want a brief read event */
		dtio_enable_brief_read(dtio);
		return 0;
	case SSL_ERROR_WANT_WRITE:
		/* write again later */
		return 0;
	case SSL_ERROR_SYSCALL:
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			return -1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			return -1; /* silence reset by peer */
#endif
		if(errno != 0) {
			log_err("dnstap io, SSL_write syscall: %s",
				strerror(errno));
		}
		return -1;
	default:
		break;
	}
	log_crypto_err("dnstap io, could not SSL_write");
	return -1;
}
#endif /* HAVE_SSL */
788 
789 /** write buffer to output.
790  * returns number of bytes written, 0 if nothing happened,
791  * try again later, or -1 if the channel is to be closed. */
792 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
793 	size_t len)
794 {
795 	ssize_t ret;
796 	if(dtio->fd == -1)
797 		return -1;
798 #ifdef HAVE_SSL
799 	if(dtio->ssl)
800 		return dtio_write_ssl(dtio, buf, len);
801 #endif
802 	ret = send(dtio->fd, (void*)buf, len, 0);
803 	if(ret == -1) {
804 #ifndef USE_WINSOCK
805 		if(errno == EINTR || errno == EAGAIN)
806 			return 0;
807 #else
808 		if(WSAGetLastError() == WSAEINPROGRESS)
809 			return 0;
810 		if(WSAGetLastError() == WSAEWOULDBLOCK) {
811 			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
812 				dtio->stop_flush_event:dtio->event),
813 				UB_EV_WRITE);
814 			return 0;
815 		}
816 #endif
817 		log_err("dnstap io: failed send: %s", sock_strerror(errno));
818 		return -1;
819 	}
820 	return ret;
821 }
822 
#ifdef HAVE_WRITEV
/** write the remaining length bytes and the message with one writev,
 * if possible.  On a write error that is not temporary the channel is
 * closed.
 * return true if message is done, false if incomplete */
static int dtio_write_with_writev(struct dt_io_thread* dtio)
{
	uint32_t netlen = htonl(dtio->cur_msg_len);
	struct iovec parts[2];
	ssize_t written;

	/* first the part of the length that is still unsent, then the
	 * message itself */
	parts[0].iov_base = ((uint8_t*)&netlen)+dtio->cur_msg_len_done;
	parts[0].iov_len = sizeof(netlen)-dtio->cur_msg_len_done;
	parts[1].iov_base = dtio->cur_msg;
	parts[1].iov_len = dtio->cur_msg_len;
	log_assert(parts[0].iov_len > 0);
	written = writev(dtio->fd, parts, 2);
	if(written == -1) {
#ifndef USE_WINSOCK
		if(errno == EINTR || errno == EAGAIN)
			return 0;
#else
		if(WSAGetLastError() == WSAEINPROGRESS)
			return 0;
		if(WSAGetLastError() == WSAEWOULDBLOCK) {
			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
				dtio->stop_flush_event:dtio->event),
				UB_EV_WRITE);
			return 0;
		}
#endif
		log_err("dnstap io: failed writev: %s", sock_strerror(errno));
		/* close the channel */
		dtio_del_output_event(dtio);
		dtio_close_output(dtio);
		return 0;
	}
	/* count the written bytes for the length first; what goes beyond
	 * the 4 length bytes is progress on the message */
	dtio->cur_msg_len_done += written;
	if(dtio->cur_msg_len_done < 4)
		return 0;
	if(dtio->cur_msg_len_done > 4) {
		dtio->cur_msg_done = dtio->cur_msg_len_done-4;
		dtio->cur_msg_len_done = 4;
	}
	return dtio->cur_msg_done >= dtio->cur_msg_len;
}
#endif /* HAVE_WRITEV */
870 
871 /** write more of the length, preceding the data frame.
872  * return true if message is done, false if incomplete. */
873 static int dtio_write_more_of_len(struct dt_io_thread* dtio)
874 {
875 	uint32_t sendlen;
876 	int r;
877 	if(dtio->cur_msg_len_done >= 4)
878 		return 1;
879 #ifdef HAVE_WRITEV
880 	if(!dtio->ssl) {
881 		/* we try writev for everything.*/
882 		return dtio_write_with_writev(dtio);
883 	}
884 #endif /* HAVE_WRITEV */
885 	sendlen = htonl(dtio->cur_msg_len);
886 	r = dtio_write_buf(dtio,
887 		((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
888 		sizeof(sendlen)-dtio->cur_msg_len_done);
889 	if(r == -1) {
890 		/* close the channel */
891 		dtio_del_output_event(dtio);
892 		dtio_close_output(dtio);
893 		return 0;
894 	} else if(r == 0) {
895 		/* try again later */
896 		return 0;
897 	}
898 	dtio->cur_msg_len_done += r;
899 	if(dtio->cur_msg_len_done < 4)
900 		return 0;
901 	return 1;
902 }
903 
904 /** write more of the data frame.
905  * return true if message is done, false if incomplete. */
906 static int dtio_write_more_of_data(struct dt_io_thread* dtio)
907 {
908 	int r;
909 	if(dtio->cur_msg_done >= dtio->cur_msg_len)
910 		return 1;
911 	r = dtio_write_buf(dtio,
912 		((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
913 		dtio->cur_msg_len - dtio->cur_msg_done);
914 	if(r == -1) {
915 		/* close the channel */
916 		dtio_del_output_event(dtio);
917 		dtio_close_output(dtio);
918 		return 0;
919 	} else if(r == 0) {
920 		/* try again later */
921 		return 0;
922 	}
923 	dtio->cur_msg_done += r;
924 	if(dtio->cur_msg_done < dtio->cur_msg_len)
925 		return 0;
926 	return 1;
927 }
928 
929 /** write more of the current messsage. false if incomplete, true if
930  * the message is done */
931 static int dtio_write_more(struct dt_io_thread* dtio)
932 {
933 	if(dtio->cur_msg_len_done < 4) {
934 		if(!dtio_write_more_of_len(dtio))
935 			return 0;
936 	}
937 	if(dtio->cur_msg_done < dtio->cur_msg_len) {
938 		if(!dtio_write_more_of_data(dtio))
939 			return 0;
940 	}
941 	return 1;
942 }
943 
944 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
945  * -1: continue, >0: number of bytes read into buffer */
946 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
947 	ssize_t r;
948 	r = recv(dtio->fd, (void*)buf, len, 0);
949 	if(r == -1) {
950 		char* to = dtio->socket_path;
951 		if(!to) to = dtio->ip_str;
952 		if(!to) to = "";
953 #ifndef USE_WINSOCK
954 		if(errno == EINTR || errno == EAGAIN)
955 			return -1; /* try later */
956 #else
957 		if(WSAGetLastError() == WSAEINPROGRESS) {
958 			return -1; /* try later */
959 		} else if(WSAGetLastError() == WSAEWOULDBLOCK) {
960 			ub_winsock_tcp_wouldblock(
961 				(dtio->stop_flush_event?
962 				dtio->stop_flush_event:dtio->event),
963 				UB_EV_READ);
964 			return -1; /* try later */
965 		}
966 #endif
967 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
968 			verbosity < 4)
969 			return 0; /* no log retries on low verbosity */
970 		log_err("dnstap io: output closed, recv %s: %s", to,
971 			strerror(errno));
972 		/* and close below */
973 		return 0;
974 	}
975 	if(r == 0) {
976 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
977 			verbosity < 4)
978 			return 0; /* no log retries on low verbosity */
979 		verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
980 		/* and close below */
981 		return 0;
982 	}
983 	/* something was received */
984 	return r;
985 }
986 
#ifdef HAVE_SSL
/** Receive bytes over TLS with SSL_read, store in buffer.  Returns
 * 0: closed, -1: continue, >0: number of bytes read into buffer */
static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
{
	int got, sslerr;

	ERR_clear_error();
	got = SSL_read(dtio->ssl, buf, len);
	if(got > 0)
		return got;

	sslerr = SSL_get_error(dtio->ssl, got);
	switch(sslerr) {
	case SSL_ERROR_WANT_READ:
		/* continue later */
		return -1;
	case SSL_ERROR_WANT_WRITE:
		(void)dtio_enable_brief_write(dtio);
		return -1;
	case SSL_ERROR_ZERO_RETURN:
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4)
			return 0; /* no log retries on low verbosity */
		break;
	case SSL_ERROR_SYSCALL:
#ifdef ECONNRESET
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			errno == ECONNRESET && verbosity < 4)
			return 0; /* silence reset by peer */
#endif
		if(errno != 0)
			log_err("SSL_read syscall: %s",
				strerror(errno));
		break;
	default:
		log_crypto_err("could not SSL_read");
		break;
	}
	/* every case that gets here is reported as a closed output */
	verbose(VERB_DETAIL, "dnstap io: output closed by the "
		"other side");
	return 0;
}
#endif /* HAVE_SSL */
1031 
1032 /** check if the output fd has been closed,
1033  * it returns false if the stream is closed. */
1034 static int dtio_check_close(struct dt_io_thread* dtio)
1035 {
1036 	/* we don't want to read any packets, but if there are we can
1037 	 * discard the input (ignore it).  Ignore of unknown (control)
1038 	 * packets is okay for the framestream protocol.  And also, the
1039 	 * read call can return that the stream has been closed by the
1040 	 * other side. */
1041 	uint8_t buf[1024];
1042 	int r = -1;
1043 
1044 
1045 	if(dtio->fd == -1) return 0;
1046 
1047 	while(r != 0) {
1048 		/* not interested in buffer content, overwrite */
1049 		r = receive_bytes(dtio, (void*)buf, sizeof(buf));
1050 		if(r == -1)
1051 			return 1;
1052 	}
1053 	/* the other end has been closed */
1054 	/* close the channel */
1055 	dtio_del_output_event(dtio);
1056 	dtio_close_output(dtio);
1057 	return 0;
1058 }
1059 
1060 /** Read accept frame. Returns -1: continue reading, 0: closed,
1061  * 1: valid accept received. */
1062 static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1063 {
1064 	int r;
1065 	size_t read_frame_done;
1066 	while(dtio->read_frame.frame_len_done < 4) {
1067 #ifdef HAVE_SSL
1068 		if(dtio->ssl) {
1069 			r = ssl_read_bytes(dtio,
1070 				(uint8_t*)&dtio->read_frame.frame_len+
1071 				dtio->read_frame.frame_len_done,
1072 				4-dtio->read_frame.frame_len_done);
1073 		} else {
1074 #endif
1075 			r = receive_bytes(dtio,
1076 				(uint8_t*)&dtio->read_frame.frame_len+
1077 				dtio->read_frame.frame_len_done,
1078 				4-dtio->read_frame.frame_len_done);
1079 #ifdef HAVE_SSL
1080 		}
1081 #endif
1082 		if(r == -1)
1083 			return -1; /* continue reading */
1084 		if(r == 0) {
1085 			 /* connection closed */
1086 			goto close_connection;
1087 		}
1088 		dtio->read_frame.frame_len_done += r;
1089 		if(dtio->read_frame.frame_len_done < 4)
1090 			return -1; /* continue reading */
1091 
1092 		if(dtio->read_frame.frame_len == 0) {
1093 			dtio->read_frame.frame_len_done = 0;
1094 			dtio->read_frame.control_frame = 1;
1095 			continue;
1096 		}
1097 		dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1098 		if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1099 			verbose(VERB_OPS, "dnstap: received frame exceeds max "
1100 				"length of %d bytes, closing connection",
1101 				DTIO_RECV_FRAME_MAX_LEN);
1102 			goto close_connection;
1103 		}
1104 		dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1105 		dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1106 		if(!dtio->read_frame.buf) {
1107 			log_err("dnstap io: out of memory (creating read "
1108 				"buffer)");
1109 			goto close_connection;
1110 		}
1111 	}
1112 	if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1113 #ifdef HAVE_SSL
1114 		if(dtio->ssl) {
1115 			r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1116 				dtio->read_frame.buf_count,
1117 				dtio->read_frame.buf_cap-
1118 				dtio->read_frame.buf_count);
1119 		} else {
1120 #endif
1121 			r = receive_bytes(dtio, dtio->read_frame.buf+
1122 				dtio->read_frame.buf_count,
1123 				dtio->read_frame.buf_cap-
1124 				dtio->read_frame.buf_count);
1125 #ifdef HAVE_SSL
1126 		}
1127 #endif
1128 		if(r == -1)
1129 			return -1; /* continue reading */
1130 		if(r == 0) {
1131 			 /* connection closed */
1132 			goto close_connection;
1133 		}
1134 		dtio->read_frame.buf_count += r;
1135 		if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1136 			return -1; /* continue reading */
1137 	}
1138 
1139 	/* Complete frame received, check if this is a valid ACCEPT control
1140 	 * frame. */
1141 	if(dtio->read_frame.frame_len < 4) {
1142 		verbose(VERB_OPS, "dnstap: invalid data received");
1143 		goto close_connection;
1144 	}
1145 	if(sldns_read_uint32(dtio->read_frame.buf) !=
1146 		FSTRM_CONTROL_FRAME_ACCEPT) {
1147 		verbose(VERB_ALGO, "dnstap: invalid control type received, "
1148 			"ignored");
1149 		dtio->ready_frame_sent = 0;
1150 		dtio->accept_frame_received = 0;
1151 		dtio_read_frame_free(&dtio->read_frame);
1152 		return -1;
1153 	}
1154 	read_frame_done = 4; /* control frame type */
1155 
1156 	/* Iterate over control fields, ignore unknown types.
1157 	 * Need to be able to read at least 8 bytes (control field type +
1158 	 * length). */
1159 	while(read_frame_done+8 < dtio->read_frame.frame_len) {
1160 		uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1161 			read_frame_done);
1162 		uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1163 			read_frame_done + 4);
1164 		if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1165 			if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1166 				read_frame_done+8+len <=
1167 				dtio->read_frame.frame_len &&
1168 				memcmp(dtio->read_frame.buf + read_frame_done +
1169 					+ 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1170 				if(!dtio_control_start_send(dtio)) {
1171 					verbose(VERB_OPS, "dnstap io: out of "
1172 					 "memory while sending START frame");
1173 					goto close_connection;
1174 				}
1175 				dtio->accept_frame_received = 1;
1176 				if(!dtio_add_output_event_write(dtio))
1177 					goto close_connection;
1178 				return 1;
1179 			} else {
1180 				/* unknow content type */
1181 				verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1182 					"contains unknown content type, "
1183 					"closing connection");
1184 				goto close_connection;
1185 			}
1186 		}
1187 		/* unknown option, try next */
1188 		read_frame_done += 8+len;
1189 	}
1190 
1191 
1192 close_connection:
1193 	dtio_del_output_event(dtio);
1194 	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1195 	dtio_close_output(dtio);
1196 	return 0;
1197 }
1198 
1199 /** add the output file descriptor event for listening, read only */
1200 static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1201 {
1202 	if(!dtio->event)
1203 		return 0;
1204 	if(dtio->event_added && !dtio->event_added_is_write)
1205 		return 1;
1206 	/* we have to (re-)register the event */
1207 	if(dtio->event_added)
1208 		ub_event_del(dtio->event);
1209 	ub_event_del_bits(dtio->event, UB_EV_WRITE);
1210 	if(ub_event_add(dtio->event, NULL) != 0) {
1211 		log_err("dnstap io: out of memory (adding event)");
1212 		dtio->event_added = 0;
1213 		dtio->event_added_is_write = 0;
1214 		/* close output and start reattempts to open it */
1215 		dtio_close_output(dtio);
1216 		return 0;
1217 	}
1218 	dtio->event_added = 1;
1219 	dtio->event_added_is_write = 0;
1220 	return 1;
1221 }
1222 
1223 /** add the output file descriptor event for listening, read and write */
1224 static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1225 {
1226 	if(!dtio->event)
1227 		return 0;
1228 	if(dtio->event_added && dtio->event_added_is_write)
1229 		return 1;
1230 	/* we have to (re-)register the event */
1231 	if(dtio->event_added)
1232 		ub_event_del(dtio->event);
1233 	ub_event_add_bits(dtio->event, UB_EV_WRITE);
1234 	if(ub_event_add(dtio->event, NULL) != 0) {
1235 		log_err("dnstap io: out of memory (adding event)");
1236 		dtio->event_added = 0;
1237 		dtio->event_added_is_write = 0;
1238 		/* close output and start reattempts to open it */
1239 		dtio_close_output(dtio);
1240 		return 0;
1241 	}
1242 	dtio->event_added = 1;
1243 	dtio->event_added_is_write = 1;
1244 	return 1;
1245 }
1246 
/**
 * Put the dtio thread to sleep: with nothing to write, the output event
 * is switched to read only, so that it does not poll for write.
 * @param dtio: the dnstap io thread structure.
 */
static void dtio_sleep(struct dt_io_thread* dtio)
{
	/* the result is ignored here */
	(void)dtio_add_output_event_read(dtio);
}
1254 
1255 #ifdef HAVE_SSL
1256 /** enable the brief read condition */
1257 static int dtio_enable_brief_read(struct dt_io_thread* dtio)
1258 {
1259 	dtio->ssl_brief_read = 1;
1260 	if(dtio->stop_flush_event) {
1261 		ub_event_del(dtio->stop_flush_event);
1262 		ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
1263 		if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1264 			log_err("dnstap io, stop flush, could not ub_event_add");
1265 			return 0;
1266 		}
1267 		return 1;
1268 	}
1269 	return dtio_add_output_event_read(dtio);
1270 }
1271 #endif /* HAVE_SSL */
1272 
1273 #ifdef HAVE_SSL
1274 /** disable the brief read condition */
1275 static int dtio_disable_brief_read(struct dt_io_thread* dtio)
1276 {
1277 	dtio->ssl_brief_read = 0;
1278 	if(dtio->stop_flush_event) {
1279 		ub_event_del(dtio->stop_flush_event);
1280 		ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
1281 		if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1282 			log_err("dnstap io, stop flush, could not ub_event_add");
1283 			return 0;
1284 		}
1285 		return 1;
1286 	}
1287 	return dtio_add_output_event_write(dtio);
1288 }
1289 #endif /* HAVE_SSL */
1290 
1291 #ifdef HAVE_SSL
1292 /** enable the brief write condition */
1293 static int dtio_enable_brief_write(struct dt_io_thread* dtio)
1294 {
1295 	dtio->ssl_brief_write = 1;
1296 	return dtio_add_output_event_write(dtio);
1297 }
1298 #endif /* HAVE_SSL */
1299 
1300 #ifdef HAVE_SSL
1301 /** disable the brief write condition */
1302 static int dtio_disable_brief_write(struct dt_io_thread* dtio)
1303 {
1304 	dtio->ssl_brief_write = 0;
1305 	return dtio_add_output_event_read(dtio);
1306 }
1307 #endif /* HAVE_SSL */
1308 
1309 #ifdef HAVE_SSL
1310 /** check peer verification after ssl handshake connection, false if closed*/
1311 static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
1312 {
1313 	if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
1314 		/* verification */
1315 		if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) {
1316 			X509* x = SSL_get_peer_certificate(dtio->ssl);
1317 			if(!x) {
1318 				verbose(VERB_ALGO, "dnstap io, %s, SSL "
1319 					"connection failed no certificate",
1320 					dtio->ip_str);
1321 				return 0;
1322 			}
1323 			log_cert(VERB_ALGO, "dnstap io, peer certificate",
1324 				x);
1325 #ifdef HAVE_SSL_GET0_PEERNAME
1326 			if(SSL_get0_peername(dtio->ssl)) {
1327 				verbose(VERB_ALGO, "dnstap io, %s, SSL "
1328 					"connection to %s authenticated",
1329 					dtio->ip_str,
1330 					SSL_get0_peername(dtio->ssl));
1331 			} else {
1332 #endif
1333 				verbose(VERB_ALGO, "dnstap io, %s, SSL "
1334 					"connection authenticated",
1335 					dtio->ip_str);
1336 #ifdef HAVE_SSL_GET0_PEERNAME
1337 			}
1338 #endif
1339 			X509_free(x);
1340 		} else {
1341 			X509* x = SSL_get_peer_certificate(dtio->ssl);
1342 			if(x) {
1343 				log_cert(VERB_ALGO, "dnstap io, peer "
1344 					"certificate", x);
1345 				X509_free(x);
1346 			}
1347 			verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
1348 				"failed: failed to authenticate",
1349 				dtio->ip_str);
1350 			return 0;
1351 		}
1352 	} else {
1353 		/* unauthenticated, the verify peer flag was not set
1354 		 * in ssl when the ssl object was created from ssl_ctx */
1355 		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
1356 			dtio->ip_str);
1357 	}
1358 	return 1;
1359 }
1360 #endif /* HAVE_SSL */
1361 
1362 #ifdef HAVE_SSL
1363 /** perform ssl handshake, returns 1 if okay, 0 to stop */
1364 static int dtio_ssl_handshake(struct dt_io_thread* dtio,
1365 	struct stop_flush_info* info)
1366 {
1367 	int r;
1368 	if(dtio->ssl_brief_read) {
1369 		/* assume the brief read condition is satisfied,
1370 		 * if we need more or again, we can set it again */
1371 		if(!dtio_disable_brief_read(dtio)) {
1372 			if(info) dtio_stop_flush_exit(info);
1373 			return 0;
1374 		}
1375 	}
1376 	if(dtio->ssl_handshake_done)
1377 		return 1;
1378 
1379 	ERR_clear_error();
1380 	r = SSL_do_handshake(dtio->ssl);
1381 	if(r != 1) {
1382 		int want = SSL_get_error(dtio->ssl, r);
1383 		if(want == SSL_ERROR_WANT_READ) {
1384 			/* we want to read on the connection */
1385 			if(!dtio_enable_brief_read(dtio)) {
1386 				if(info) dtio_stop_flush_exit(info);
1387 				return 0;
1388 			}
1389 			return 0;
1390 		} else if(want == SSL_ERROR_WANT_WRITE) {
1391 			/* we want to write on the connection */
1392 			return 0;
1393 		} else if(r == 0) {
1394 			/* closed */
1395 			if(info) dtio_stop_flush_exit(info);
1396 			dtio_del_output_event(dtio);
1397 			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1398 			dtio_close_output(dtio);
1399 			return 0;
1400 		} else if(want == SSL_ERROR_SYSCALL) {
1401 			/* SYSCALL and errno==0 means closed uncleanly */
1402 			int silent = 0;
1403 #ifdef EPIPE
1404 			if(errno == EPIPE && verbosity < 2)
1405 				silent = 1; /* silence 'broken pipe' */
1406 #endif
1407 #ifdef ECONNRESET
1408 			if(errno == ECONNRESET && verbosity < 2)
1409 				silent = 1; /* silence reset by peer */
1410 #endif
1411 			if(errno == 0)
1412 				silent = 1;
1413 			if(!silent)
1414 				log_err("dnstap io, SSL_handshake syscall: %s",
1415 					strerror(errno));
1416 			/* closed */
1417 			if(info) dtio_stop_flush_exit(info);
1418 			dtio_del_output_event(dtio);
1419 			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1420 			dtio_close_output(dtio);
1421 			return 0;
1422 		} else {
1423 			unsigned long err = ERR_get_error();
1424 			if(!squelch_err_ssl_handshake(err)) {
1425 				log_crypto_err_code("dnstap io, ssl handshake failed",
1426 					err);
1427 				verbose(VERB_OPS, "dnstap io, ssl handshake failed "
1428 					"from %s", dtio->ip_str);
1429 			}
1430 			/* closed */
1431 			if(info) dtio_stop_flush_exit(info);
1432 			dtio_del_output_event(dtio);
1433 			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1434 			dtio_close_output(dtio);
1435 			return 0;
1436 		}
1437 
1438 	}
1439 	/* check peer verification */
1440 	dtio->ssl_handshake_done = 1;
1441 
1442 	if(!dtio_ssl_check_peer(dtio)) {
1443 		/* closed */
1444 		if(info) dtio_stop_flush_exit(info);
1445 		dtio_del_output_event(dtio);
1446 		dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1447 		dtio_close_output(dtio);
1448 		return 0;
1449 	}
1450 	return 1;
1451 }
1452 #endif /* HAVE_SSL */
1453 
1454 /** callback for the dnstap events, to write to the output */
1455 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1456 {
1457 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1458 	int i;
1459 
1460 	if(dtio->check_nb_connect) {
1461 		int connect_err = dtio_check_nb_connect(dtio);
1462 		if(connect_err == -1) {
1463 			/* close the channel */
1464 			dtio_del_output_event(dtio);
1465 			dtio_close_output(dtio);
1466 			return;
1467 		} else if(connect_err == 0) {
1468 			/* try again later */
1469 			return;
1470 		}
1471 		/* nonblocking connect check passed, continue */
1472 	}
1473 
1474 #ifdef HAVE_SSL
1475 	if(dtio->ssl &&
1476 		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1477 		if(!dtio_ssl_handshake(dtio, NULL))
1478 			return;
1479 	}
1480 #endif
1481 
1482 	if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1483 		if(dtio->ssl_brief_write)
1484 			(void)dtio_disable_brief_write(dtio);
1485 		if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1486 			if(dtio_read_accept_frame(dtio) <= 0)
1487 				return;
1488 		} else if(!dtio_check_close(dtio))
1489 			return;
1490 	}
1491 
1492 	/* loop to process a number of messages.  This improves throughput,
1493 	 * because selecting on write-event if not needed for busy messages
1494 	 * (dnstap log) generation and if they need to all be written back.
1495 	 * The write event is usually not blocked up.  But not forever,
1496 	 * because the event loop needs to stay responsive for other events.
1497 	 * If there are no (more) messages, or if the output buffers get
1498 	 * full, it returns out of the loop. */
1499 	for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1500 		/* see if there are messages that need writing */
1501 		if(!dtio->cur_msg) {
1502 			if(!dtio_find_msg(dtio)) {
1503 				if(i == 0) {
1504 					/* no messages on the first iteration,
1505 					 * the queues are all empty */
1506 					dtio_sleep(dtio);
1507 				}
1508 				return; /* nothing to do */
1509 			}
1510 		}
1511 
1512 		/* write it */
1513 		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1514 			if(!dtio_write_more(dtio))
1515 				return;
1516 		}
1517 
1518 		/* done with the current message */
1519 		dtio_cur_msg_free(dtio);
1520 
1521 		/* If this is a bidirectional stream the first message will be
1522 		 * the READY control frame. We can only continue writing after
1523 		 * receiving an ACCEPT control frame. */
1524 		if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1525 			dtio->ready_frame_sent = 1;
1526 			(void)dtio_add_output_event_read(dtio);
1527 			break;
1528 		}
1529 	}
1530 }
1531 
1532 /** callback for the dnstap commandpipe, to stop the dnstap IO */
1533 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1534 {
1535 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1536 	uint8_t cmd;
1537 	ssize_t r;
1538 	if(dtio->want_to_exit)
1539 		return;
1540 	r = read(fd, &cmd, sizeof(cmd));
1541 	if(r == -1) {
1542 #ifndef USE_WINSOCK
1543 		if(errno == EINTR || errno == EAGAIN)
1544 			return; /* ignore this */
1545 #else
1546 		if(WSAGetLastError() == WSAEINPROGRESS)
1547 			return;
1548 		if(WSAGetLastError() == WSAEWOULDBLOCK)
1549 			return;
1550 #endif
1551 		log_err("dnstap io: failed to read: %s", sock_strerror(errno));
1552 		/* and then fall through to quit the thread */
1553 	} else if(r == 0) {
1554 		verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1555 	} else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1556 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1557 	} else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1558 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1559 
1560 		if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1561 			verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1562 				"waiting for ACCEPT control frame");
1563 			return;
1564 		}
1565 
1566 		/* reregister event */
1567 		if(!dtio_add_output_event_write(dtio))
1568 			return;
1569 		return;
1570 	} else if(r == 1) {
1571 		verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1572 	}
1573 	dtio->want_to_exit = 1;
1574 	if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1575 		!= 0) {
1576 		log_err("dnstap io: could not loopexit");
1577 	}
1578 }
1579 
1580 #ifndef THREADS_DISABLED
1581 /** setup the event base for the dnstap io thread */
1582 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1583 	struct timeval* now)
1584 {
1585 	memset(now, 0, sizeof(*now));
1586 	dtio->event_base = ub_default_event_base(0, secs, now);
1587 	if(!dtio->event_base) {
1588 		fatal_exit("dnstap io: could not create event_base");
1589 	}
1590 }
1591 #endif /* THREADS_DISABLED */
1592 
1593 /** setup the cmd event for dnstap io */
1594 static void dtio_setup_cmd(struct dt_io_thread* dtio)
1595 {
1596 	struct ub_event* cmdev;
1597 	fd_set_nonblock(dtio->commandpipe[0]);
1598 	cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1599 		UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1600 	if(!cmdev) {
1601 		fatal_exit("dnstap io: out of memory");
1602 	}
1603 	dtio->command_event = cmdev;
1604 	if(ub_event_add(cmdev, NULL) != 0) {
1605 		fatal_exit("dnstap io: out of memory (adding event)");
1606 	}
1607 }
1608 
1609 /** setup the reconnect event for dnstap io */
1610 static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1611 {
1612 	dtio_reconnect_clear(dtio);
1613 	dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1614 		UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1615 	if(!dtio->reconnect_timer) {
1616 		fatal_exit("dnstap io: out of memory");
1617 	}
1618 }
1619 
/**
 * State that is kept while the stop flush runs.  The stop flush briefly
 * runs its own event base, to write out the last frame and the stop
 * control frame before the output is closed.
 */
struct stop_flush_info {
	/** the event base that is dispatched during the stop flush */
	struct ub_event_base* base;
	/** set once the exit of the stop flush event base has been
	 * requested, so that it is requested only once */
	int want_to_exit_flush;
	/** set when the stop flush timer has expired */
	int timer_done;
	/** the dtio that is being flushed */
	struct dt_io_thread* dtio;
	/** the stop control frame, allocated */
	void* stop_frame;
	/** the length of the stop control frame, in bytes */
	size_t stop_frame_len;
	/** the number of bytes of the stop control frame written so far */
	size_t stop_frame_done;
};
1639 
1640 /** exit the stop flush base */
1641 static void dtio_stop_flush_exit(struct stop_flush_info* info)
1642 {
1643 	if(info->want_to_exit_flush)
1644 		return;
1645 	info->want_to_exit_flush = 1;
1646 	if(ub_event_base_loopexit(info->base) != 0) {
1647 		log_err("dnstap io: could not loopexit");
1648 	}
1649 }
1650 
1651 /** send the stop control,
1652  * return true if completed the frame. */
1653 static int dtio_control_stop_send(struct stop_flush_info* info)
1654 {
1655 	struct dt_io_thread* dtio = info->dtio;
1656 	int r;
1657 	if(info->stop_frame_done >= info->stop_frame_len)
1658 		return 1;
1659 	r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1660 		info->stop_frame_done, info->stop_frame_len -
1661 		info->stop_frame_done);
1662 	if(r == -1) {
1663 		verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1664 		dtio_stop_flush_exit(info);
1665 		return 0;
1666 	}
1667 	if(r == 0) {
1668 		/* try again later, or timeout */
1669 		return 0;
1670 	}
1671 	info->stop_frame_done += r;
1672 	if(info->stop_frame_done < info->stop_frame_len)
1673 		return 0; /* not done yet */
1674 	return 1;
1675 }
1676 
1677 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1678 	void* arg)
1679 {
1680 	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1681 	if(info->want_to_exit_flush)
1682 		return;
1683 	verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1684 	info->timer_done = 1;
1685 	dtio_stop_flush_exit(info);
1686 }
1687 
1688 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1689 {
1690 	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1691 	struct dt_io_thread* dtio = info->dtio;
1692 	if(info->want_to_exit_flush)
1693 		return;
1694 	if(dtio->check_nb_connect) {
1695 		/* we don't start the stop_flush if connect still
1696 		 * in progress, but the check code is here, just in case */
1697 		int connect_err = dtio_check_nb_connect(dtio);
1698 		if(connect_err == -1) {
1699 			/* close the channel, exit the stop flush */
1700 			dtio_stop_flush_exit(info);
1701 			dtio_del_output_event(dtio);
1702 			dtio_close_output(dtio);
1703 			return;
1704 		} else if(connect_err == 0) {
1705 			/* try again later */
1706 			return;
1707 		}
1708 		/* nonblocking connect check passed, continue */
1709 	}
1710 #ifdef HAVE_SSL
1711 	if(dtio->ssl &&
1712 		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1713 		if(!dtio_ssl_handshake(dtio, info))
1714 			return;
1715 	}
1716 #endif
1717 
1718 	if((bits&UB_EV_READ)) {
1719 		if(!dtio_check_close(dtio)) {
1720 			if(dtio->fd == -1) {
1721 				verbose(VERB_ALGO, "dnstap io: "
1722 					"stop flush: output closed");
1723 				dtio_stop_flush_exit(info);
1724 			}
1725 			return;
1726 		}
1727 	}
1728 	/* write remainder of last frame */
1729 	if(dtio->cur_msg) {
1730 		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1731 			if(!dtio_write_more(dtio)) {
1732 				if(dtio->fd == -1) {
1733 					verbose(VERB_ALGO, "dnstap io: "
1734 						"stop flush: output closed");
1735 					dtio_stop_flush_exit(info);
1736 				}
1737 				return;
1738 			}
1739 		}
1740 		verbose(VERB_ALGO, "dnstap io: stop flush completed "
1741 			"last frame");
1742 		dtio_cur_msg_free(dtio);
1743 	}
1744 	/* write stop frame */
1745 	if(info->stop_frame_done < info->stop_frame_len) {
1746 		if(!dtio_control_stop_send(info))
1747 			return;
1748 		verbose(VERB_ALGO, "dnstap io: stop flush completed "
1749 			"stop control frame");
1750 	}
1751 	/* when last frame and stop frame are sent, exit */
1752 	dtio_stop_flush_exit(info);
1753 }
1754 
1755 /** flush at end, last packet and stop control */
1756 static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1757 {
1758 	/* briefly attempt to flush the previous packet to the output,
1759 	 * this could be a partial packet, or even the start control frame */
1760 	time_t secs = 0;
1761 	struct timeval now;
1762 	struct stop_flush_info info;
1763 	struct timeval tv;
1764 	struct ub_event* timer, *stopev;
1765 
1766 	if(dtio->fd == -1 || dtio->check_nb_connect) {
1767 		/* no connection or we have just connected, so nothing is
1768 		 * sent yet, so nothing to stop or flush */
1769 		return;
1770 	}
1771 	if(dtio->ssl && !dtio->ssl_handshake_done) {
1772 		/* no SSL connection has been established yet */
1773 		return;
1774 	}
1775 
1776 	memset(&info, 0, sizeof(info));
1777 	memset(&now, 0, sizeof(now));
1778 	info.dtio = dtio;
1779 	info.base = ub_default_event_base(0, &secs, &now);
1780 	if(!info.base) {
1781 		log_err("dnstap io: malloc failure");
1782 		return;
1783 	}
1784 	timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1785 		&dtio_stop_timer_cb, &info);
1786 	if(!timer) {
1787 		log_err("dnstap io: malloc failure");
1788 		ub_event_base_free(info.base);
1789 		return;
1790 	}
1791 	memset(&tv, 0, sizeof(tv));
1792 	tv.tv_sec = 2;
1793 	if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1794 		&tv) != 0) {
1795 		log_err("dnstap io: cannot event_timer_add");
1796 		ub_event_free(timer);
1797 		ub_event_base_free(info.base);
1798 		return;
1799 	}
1800 	stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1801 		UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1802 	if(!stopev) {
1803 		log_err("dnstap io: malloc failure");
1804 		ub_timer_del(timer);
1805 		ub_event_free(timer);
1806 		ub_event_base_free(info.base);
1807 		return;
1808 	}
1809 	if(ub_event_add(stopev, NULL) != 0) {
1810 		log_err("dnstap io: cannot event_add");
1811 		ub_event_free(stopev);
1812 		ub_timer_del(timer);
1813 		ub_event_free(timer);
1814 		ub_event_base_free(info.base);
1815 		return;
1816 	}
1817 	info.stop_frame = fstrm_create_control_frame_stop(
1818 		&info.stop_frame_len);
1819 	if(!info.stop_frame) {
1820 		log_err("dnstap io: malloc failure");
1821 		ub_event_del(stopev);
1822 		ub_event_free(stopev);
1823 		ub_timer_del(timer);
1824 		ub_event_free(timer);
1825 		ub_event_base_free(info.base);
1826 		return;
1827 	}
1828 	dtio->stop_flush_event = stopev;
1829 
1830 	/* wait briefly, or until finished */
1831 	verbose(VERB_ALGO, "dnstap io: stop flush started");
1832 	if(ub_event_base_dispatch(info.base) < 0) {
1833 		log_err("dnstap io: dispatch flush failed, errno is %s",
1834 			strerror(errno));
1835 	}
1836 	verbose(VERB_ALGO, "dnstap io: stop flush ended");
1837 	free(info.stop_frame);
1838 	dtio->stop_flush_event = NULL;
1839 	ub_event_del(stopev);
1840 	ub_event_free(stopev);
1841 	ub_timer_del(timer);
1842 	ub_event_free(timer);
1843 	ub_event_base_free(info.base);
1844 }
1845 
1846 /** perform desetup and free stuff when the dnstap io thread exits */
1847 static void dtio_desetup(struct dt_io_thread* dtio)
1848 {
1849 	dtio_control_stop_flush(dtio);
1850 	dtio_del_output_event(dtio);
1851 	dtio_close_output(dtio);
1852 	ub_event_del(dtio->command_event);
1853 	ub_event_free(dtio->command_event);
1854 #ifndef USE_WINSOCK
1855 	close(dtio->commandpipe[0]);
1856 #else
1857 	_close(dtio->commandpipe[0]);
1858 #endif
1859 	dtio->commandpipe[0] = -1;
1860 	dtio_reconnect_del(dtio);
1861 	ub_event_free(dtio->reconnect_timer);
1862 	dtio_cur_msg_free(dtio);
1863 #ifndef THREADS_DISABLED
1864 	ub_event_base_free(dtio->event_base);
1865 #endif
1866 }
1867 
1868 /** setup a start control message */
1869 static int dtio_control_start_send(struct dt_io_thread* dtio)
1870 {
1871 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1872 	dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1873 		&dtio->cur_msg_len);
1874 	if(!dtio->cur_msg) {
1875 		return 0;
1876 	}
1877 	/* setup to send the control message */
1878 	/* set that the buffer needs to be sent, but the length
1879 	 * of that buffer is already written, that way the buffer can
1880 	 * start with 0 length and then the length of the control frame
1881 	 * in it */
1882 	dtio->cur_msg_done = 0;
1883 	dtio->cur_msg_len_done = 4;
1884 	return 1;
1885 }
1886 
1887 /** setup a ready control message */
1888 static int dtio_control_ready_send(struct dt_io_thread* dtio)
1889 {
1890 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1891 	dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1892 		&dtio->cur_msg_len);
1893 	if(!dtio->cur_msg) {
1894 		return 0;
1895 	}
1896 	/* setup to send the control message */
1897 	/* set that the buffer needs to be sent, but the length
1898 	 * of that buffer is already written, that way the buffer can
1899 	 * start with 0 length and then the length of the control frame
1900 	 * in it */
1901 	dtio->cur_msg_done = 0;
1902 	dtio->cur_msg_len_done = 4;
1903 	return 1;
1904 }
1905 
/**
 * Open the output file descriptor for af_local.  Creates a nonblocking
 * stream socket in dtio->fd and connects it to dtio->socket_path.
 * A connect failure is not logged while reconnect attempts are being
 * repeated at low verbosity.
 * @param dtio: the dnstap io thread structure.
 * @return false on failure, the fd is closed again if connect failed.
 */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifndef HAVE_SYS_UN_H
	log_err("cannot create af_local socket");
	return 0;
#else
	struct sockaddr_un usock;

	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("dnstap io: failed to create socket: %s",
			sock_strerror(errno));
		return 0;
	}

	memset(&usock, 0, sizeof(usock));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	/* this member exists on BSDs, not Linux */
	usock.sun_len = (unsigned)sizeof(usock);
#endif
	usock.sun_family = AF_LOCAL;
	/* length is 92-108, 104 on FreeBSD */
	(void)strlcpy(usock.sun_path, dtio->socket_path,
		sizeof(usock.sun_path));
	fd_set_nonblock(dtio->fd);

	if(connect(dtio->fd, (struct sockaddr*)&usock,
		(socklen_t)sizeof(usock)) == -1) {
		/* no log retries on low verbosity */
		if(dtio->reconnect_timeout <= DTIO_RECONNECT_TIMEOUT_MIN ||
			verbosity >= 4) {
			log_err("dnstap io: failed to connect to \"%s\": %s",
				dtio->socket_path, sock_strerror(errno));
		}
		dtio_close_fd(dtio);
		return 0;
	}
	return 1;
#endif /* HAVE_SYS_UN_H */
}
1945 
1946 /** open the output file descriptor for af_inet and af_inet6 */
1947 static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1948 {
1949 	struct sockaddr_storage addr;
1950 	socklen_t addrlen;
1951 	memset(&addr, 0, sizeof(addr));
1952 	addrlen = (socklen_t)sizeof(addr);
1953 
1954 	if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
1955 		log_err("could not parse IP '%s'", dtio->ip_str);
1956 		return 0;
1957 	}
1958 	dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1959 	if(dtio->fd == -1) {
1960 		log_err("can't create socket: %s", sock_strerror(errno));
1961 		return 0;
1962 	}
1963 	fd_set_nonblock(dtio->fd);
1964 	if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1965 		if(errno == EINPROGRESS)
1966 			return 1; /* wait until connect done*/
1967 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1968 			verbosity < 4) {
1969 			dtio_close_fd(dtio);
1970 			return 0; /* no log retries on low verbosity */
1971 		}
1972 #ifndef USE_WINSOCK
1973 		if(tcp_connect_errno_needs_log(
1974 			(struct sockaddr *)&addr, addrlen)) {
1975 			log_err("dnstap io: failed to connect to %s: %s",
1976 				dtio->ip_str, strerror(errno));
1977 		}
1978 #else
1979 		if(WSAGetLastError() == WSAEINPROGRESS ||
1980 			WSAGetLastError() == WSAEWOULDBLOCK)
1981 			return 1; /* wait until connect done*/
1982 		if(tcp_connect_errno_needs_log(
1983 			(struct sockaddr *)&addr, addrlen)) {
1984 			log_err("dnstap io: failed to connect to %s: %s",
1985 				dtio->ip_str, wsa_strerror(WSAGetLastError()));
1986 		}
1987 #endif
1988 		dtio_close_fd(dtio);
1989 		return 0;
1990 	}
1991 	return 1;
1992 }
1993 
1994 /** setup the SSL structure for new connection */
1995 static int dtio_setup_ssl(struct dt_io_thread* dtio)
1996 {
1997 	dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
1998 	if(!dtio->ssl) return 0;
1999 	dtio->ssl_handshake_done = 0;
2000 	dtio->ssl_brief_read = 0;
2001 
2002 	if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
2003 		dtio->tls_use_sni)) {
2004 		return 0;
2005 	}
2006 	return 1;
2007 }
2008 
2009 /** open the output file descriptor */
2010 static void dtio_open_output(struct dt_io_thread* dtio)
2011 {
2012 	struct ub_event* ev;
2013 	if(dtio->upstream_is_unix) {
2014 		if(!dtio_open_output_local(dtio)) {
2015 			dtio_reconnect_enable(dtio);
2016 			return;
2017 		}
2018 	} else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
2019 		if(!dtio_open_output_tcp(dtio)) {
2020 			dtio_reconnect_enable(dtio);
2021 			return;
2022 		}
2023 		if(dtio->upstream_is_tls) {
2024 			if(!dtio_setup_ssl(dtio)) {
2025 				dtio_close_fd(dtio);
2026 				dtio_reconnect_enable(dtio);
2027 				return;
2028 			}
2029 		}
2030 	}
2031 	dtio->check_nb_connect = 1;
2032 
2033 	/* the EV_READ is to read ACCEPT control messages, and catch channel
2034 	 * close. EV_WRITE is to write packets */
2035 	ev = ub_event_new(dtio->event_base, dtio->fd,
2036 		UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
2037 		dtio);
2038 	if(!ev) {
2039 		log_err("dnstap io: out of memory");
2040 		if(dtio->ssl) {
2041 #ifdef HAVE_SSL
2042 			SSL_free(dtio->ssl);
2043 			dtio->ssl = NULL;
2044 #endif
2045 		}
2046 		dtio_close_fd(dtio);
2047 		dtio_reconnect_enable(dtio);
2048 		return;
2049 	}
2050 	dtio->event = ev;
2051 
2052 	/* setup protocol control message to start */
2053 	if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2054 		(dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2055 		log_err("dnstap io: out of memory");
2056 		ub_event_free(dtio->event);
2057 		dtio->event = NULL;
2058 		if(dtio->ssl) {
2059 #ifdef HAVE_SSL
2060 			SSL_free(dtio->ssl);
2061 			dtio->ssl = NULL;
2062 #endif
2063 		}
2064 		dtio_close_fd(dtio);
2065 		dtio_reconnect_enable(dtio);
2066 		return;
2067 	}
2068 }
2069 
/**
 * Perform the setup of the writer thread on the established event_base:
 * set up the command channel and the reconnect logic, open the output and
 * add the output write event.
 */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
	dtio_setup_cmd(dtio);
	dtio_setup_reconnect(dtio);
	dtio_open_output(dtio);
	/* nothing follows, so the result is deliberately discarded */
	(void)dtio_add_output_event_write(dtio);
}
2079 
2080 #ifndef THREADS_DISABLED
2081 /** the IO thread function for the DNSTAP IO */
2082 static void* dnstap_io(void* arg)
2083 {
2084 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2085 	time_t secs = 0;
2086 	struct timeval now;
2087 	log_thread_set(&dtio->threadnum);
2088 
2089 	/* setup */
2090 	verbose(VERB_ALGO, "start dnstap io thread");
2091 	dtio_setup_base(dtio, &secs, &now);
2092 	dtio_setup_on_base(dtio);
2093 
2094 	/* run */
2095 	if(ub_event_base_dispatch(dtio->event_base) < 0) {
2096 		log_err("dnstap io: dispatch failed, errno is %s",
2097 			strerror(errno));
2098 	}
2099 
2100 	/* cleanup */
2101 	verbose(VERB_ALGO, "stop dnstap io thread");
2102 	dtio_desetup(dtio);
2103 	return NULL;
2104 }
2105 #endif /* THREADS_DISABLED */
2106 
2107 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2108 	int numworkers)
2109 {
2110 	/* set up the thread, can fail */
2111 #ifndef USE_WINSOCK
2112 	if(pipe(dtio->commandpipe) == -1) {
2113 		log_err("failed to create pipe: %s", strerror(errno));
2114 		return 0;
2115 	}
2116 #else
2117 	if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2118 		log_err("failed to create _pipe: %s",
2119 			wsa_strerror(WSAGetLastError()));
2120 		return 0;
2121 	}
2122 #endif
2123 
2124 	/* start the thread */
2125 	dtio->threadnum = numworkers+1;
2126 	dtio->started = 1;
2127 #ifndef THREADS_DISABLED
2128 	ub_thread_create(&dtio->tid, dnstap_io, dtio);
2129 	(void)event_base_nothr;
2130 #else
2131 	dtio->event_base = event_base_nothr;
2132 	dtio_setup_on_base(dtio);
2133 #endif
2134 	return 1;
2135 }
2136 
2137 void dt_io_thread_stop(struct dt_io_thread* dtio)
2138 {
2139 #ifndef THREADS_DISABLED
2140 	uint8_t cmd = DTIO_COMMAND_STOP;
2141 #endif
2142 	if(!dtio) return;
2143 	if(!dtio->started) return;
2144 	verbose(VERB_ALGO, "dnstap io: send stop cmd");
2145 
2146 #ifndef THREADS_DISABLED
2147 	while(1) {
2148 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2149 		if(r == -1) {
2150 #ifndef USE_WINSOCK
2151 			if(errno == EINTR || errno == EAGAIN)
2152 				continue;
2153 #else
2154 			if(WSAGetLastError() == WSAEINPROGRESS)
2155 				continue;
2156 			if(WSAGetLastError() == WSAEWOULDBLOCK)
2157 				continue;
2158 #endif
2159 			log_err("dnstap io stop: write: %s",
2160 				sock_strerror(errno));
2161 			break;
2162 		}
2163 		break;
2164 	}
2165 	dtio->started = 0;
2166 #endif /* THREADS_DISABLED */
2167 
2168 #ifndef USE_WINSOCK
2169 	close(dtio->commandpipe[1]);
2170 #else
2171 	_close(dtio->commandpipe[1]);
2172 #endif
2173 	dtio->commandpipe[1] = -1;
2174 #ifndef THREADS_DISABLED
2175 	ub_thread_join(dtio->tid);
2176 #else
2177 	dtio->want_to_exit = 1;
2178 	dtio_desetup(dtio);
2179 #endif
2180 }
2181