xref: /freebsd/contrib/unbound/dnstap/dtstream.c (revision 5685098846d7f11ad642d9804d94dc7429a7b212)
/*
 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
 *
 * Copyright (c) 2020, NLnet Labs. All rights reserved.
 *
 * This software is open source.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * Redistributions of source code must retain the above copyright notice,
 * this list of conditions and the following disclaimer.
 *
 * Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation
 * and/or other materials provided with the distribution.
 *
 * Neither the name of the NLNET LABS nor the names of its contributors may
 * be used to endorse or promote products derived from this software without
 * specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
3625039b37SCy Schubert 
/**
 * \file
 *
 * An implementation of the Frame Streams data transport protocol for
 * the Unbound DNSTAP message logging facility.
 */
4325039b37SCy Schubert 
4425039b37SCy Schubert #include "config.h"
4525039b37SCy Schubert #include "dnstap/dtstream.h"
4625039b37SCy Schubert #include "dnstap/dnstap_fstrm.h"
4725039b37SCy Schubert #include "util/config_file.h"
4825039b37SCy Schubert #include "util/ub_event.h"
4925039b37SCy Schubert #include "util/net_help.h"
5025039b37SCy Schubert #include "services/outside_network.h"
5125039b37SCy Schubert #include "sldns/sbuffer.h"
5225039b37SCy Schubert #ifdef HAVE_SYS_UN_H
5325039b37SCy Schubert #include <sys/un.h>
5425039b37SCy Schubert #endif
5525039b37SCy Schubert #include <fcntl.h>
5625039b37SCy Schubert #ifdef HAVE_OPENSSL_SSL_H
5725039b37SCy Schubert #include <openssl/ssl.h>
5825039b37SCy Schubert #endif
5925039b37SCy Schubert #ifdef HAVE_OPENSSL_ERR_H
6025039b37SCy Schubert #include <openssl/err.h>
6125039b37SCy Schubert #endif
6225039b37SCy Schubert 
/** upper bound on the messages handled in one output callback */
#define DTIO_MESSAGES_PER_CALLBACK 100
/** reconnect wait in msec for the first delayed attempt (used when the
 * attempt is not immediate) */
#define DTIO_RECONNECT_TIMEOUT_MIN 10
/** ceiling in msec for the reconnect wait after backoff */
#define DTIO_RECONNECT_TIMEOUT_MAX 1000
/** slow reconnect wait in msec, to stop busy spinning on reconnect */
#define DTIO_RECONNECT_TIMEOUT_SLOW 1000
/** queued message count at which the thread is woken up */
#define DTIO_MSG_FOR_WAKEUP 32

/** largest accepted length of a received frame */
#define DTIO_RECV_FRAME_MAX_LEN 1000
7625039b37SCy Schubert 
struct stop_flush_info;
/** commands that are sent over the DTIO command channel */
enum {
	/** stop command */
	DTIO_COMMAND_STOP = 0,
	/** wakeup command */
	DTIO_COMMAND_WAKEUP = 1
} dtio_channel_command;

/** open the output channel */
static void dtio_open_output(struct dt_io_thread* dtio);
/** add output event for read and write */
static int dtio_add_output_event_write(struct dt_io_thread* dtio);
/** start reconnection attempts */
static void dtio_reconnect_enable(struct dt_io_thread* dtio);
/** stop from stop_flush event loop */
static void dtio_stop_flush_exit(struct stop_flush_info* info);
/** setup a start control message */
static int dtio_control_start_send(struct dt_io_thread* dtio);
#ifdef HAVE_SSL
/** briefly wait for a read event, needed during SSL negotiation */
static int dtio_enable_brief_read(struct dt_io_thread* dtio);
/** briefly wait for a write event, needed during SSL negotiation */
static int dtio_enable_brief_write(struct dt_io_thread* dtio);
#endif
10225039b37SCy Schubert 
10325039b37SCy Schubert struct dt_msg_queue*
dt_msg_queue_create(struct comm_base * base)104c0caa2e2SCy Schubert dt_msg_queue_create(struct comm_base* base)
10525039b37SCy Schubert {
10625039b37SCy Schubert 	struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
10725039b37SCy Schubert 	if(!mq) return NULL;
10825039b37SCy Schubert 	mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
10925039b37SCy Schubert 		about 1 M should contain 64K messages with some overhead,
11025039b37SCy Schubert 		or a whole bunch smaller ones */
111c0caa2e2SCy Schubert 	mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112c0caa2e2SCy Schubert 	if(!mq->wakeup_timer) {
113c0caa2e2SCy Schubert 		free(mq);
114c0caa2e2SCy Schubert 		return NULL;
115c0caa2e2SCy Schubert 	}
11625039b37SCy Schubert 	lock_basic_init(&mq->lock);
11725039b37SCy Schubert 	lock_protect(&mq->lock, mq, sizeof(*mq));
11825039b37SCy Schubert 	return mq;
11925039b37SCy Schubert }
12025039b37SCy Schubert 
12125039b37SCy Schubert /** clear the message list, caller must hold the lock */
12225039b37SCy Schubert static void
dt_msg_queue_clear(struct dt_msg_queue * mq)12325039b37SCy Schubert dt_msg_queue_clear(struct dt_msg_queue* mq)
12425039b37SCy Schubert {
12525039b37SCy Schubert 	struct dt_msg_entry* e = mq->first, *next=NULL;
12625039b37SCy Schubert 	while(e) {
12725039b37SCy Schubert 		next = e->next;
12825039b37SCy Schubert 		free(e->buf);
12925039b37SCy Schubert 		free(e);
13025039b37SCy Schubert 		e = next;
13125039b37SCy Schubert 	}
13225039b37SCy Schubert 	mq->first = NULL;
13325039b37SCy Schubert 	mq->last = NULL;
13425039b37SCy Schubert 	mq->cursize = 0;
135c0caa2e2SCy Schubert 	mq->msgcount = 0;
13625039b37SCy Schubert }
13725039b37SCy Schubert 
13825039b37SCy Schubert void
dt_msg_queue_delete(struct dt_msg_queue * mq)13925039b37SCy Schubert dt_msg_queue_delete(struct dt_msg_queue* mq)
14025039b37SCy Schubert {
14125039b37SCy Schubert 	if(!mq) return;
14225039b37SCy Schubert 	lock_basic_destroy(&mq->lock);
14325039b37SCy Schubert 	dt_msg_queue_clear(mq);
144c0caa2e2SCy Schubert 	comm_timer_delete(mq->wakeup_timer);
14525039b37SCy Schubert 	free(mq);
14625039b37SCy Schubert }
14725039b37SCy Schubert 
14825039b37SCy Schubert /** make the dtio wake up by sending a wakeup command */
dtio_wakeup(struct dt_io_thread * dtio)14925039b37SCy Schubert static void dtio_wakeup(struct dt_io_thread* dtio)
15025039b37SCy Schubert {
15125039b37SCy Schubert 	uint8_t cmd = DTIO_COMMAND_WAKEUP;
15225039b37SCy Schubert 	if(!dtio) return;
15325039b37SCy Schubert 	if(!dtio->started) return;
15425039b37SCy Schubert 
15525039b37SCy Schubert 	while(1) {
15625039b37SCy Schubert 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
15725039b37SCy Schubert 		if(r == -1) {
15825039b37SCy Schubert #ifndef USE_WINSOCK
15925039b37SCy Schubert 			if(errno == EINTR || errno == EAGAIN)
16025039b37SCy Schubert 				continue;
16125039b37SCy Schubert #else
16225039b37SCy Schubert 			if(WSAGetLastError() == WSAEINPROGRESS)
16325039b37SCy Schubert 				continue;
16425039b37SCy Schubert 			if(WSAGetLastError() == WSAEWOULDBLOCK)
16525039b37SCy Schubert 				continue;
16625039b37SCy Schubert #endif
167c0caa2e2SCy Schubert 			log_err("dnstap io wakeup: write: %s",
168c0caa2e2SCy Schubert 				sock_strerror(errno));
16925039b37SCy Schubert 			break;
17025039b37SCy Schubert 		}
17125039b37SCy Schubert 		break;
17225039b37SCy Schubert 	}
17325039b37SCy Schubert }
17425039b37SCy Schubert 
17525039b37SCy Schubert void
mq_wakeup_cb(void * arg)176c0caa2e2SCy Schubert mq_wakeup_cb(void* arg)
177c0caa2e2SCy Schubert {
178c0caa2e2SCy Schubert 	struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179*56850988SCy Schubert 
180c0caa2e2SCy Schubert 	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
181c0caa2e2SCy Schubert 	mq->dtio->wakeup_timer_enabled = 0;
182c0caa2e2SCy Schubert 	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
183c0caa2e2SCy Schubert 	dtio_wakeup(mq->dtio);
184c0caa2e2SCy Schubert }
185c0caa2e2SCy Schubert 
186c0caa2e2SCy Schubert /** start timer to wakeup dtio because there is content in the queue */
187c0caa2e2SCy Schubert static void
dt_msg_queue_start_timer(struct dt_msg_queue * mq,int wakeupnow)1889cf5bc93SCy Schubert dt_msg_queue_start_timer(struct dt_msg_queue* mq, int wakeupnow)
189c0caa2e2SCy Schubert {
1909cf5bc93SCy Schubert 	struct timeval tv = {0};
191c0caa2e2SCy Schubert 	/* Start a timer to process messages to be logged.
192c0caa2e2SCy Schubert 	 * If we woke up the dtio thread for every message, the wakeup
193c0caa2e2SCy Schubert 	 * messages take up too much processing power.  If the queue
194c0caa2e2SCy Schubert 	 * fills up the wakeup happens immediately.  The timer wakes it up
195c0caa2e2SCy Schubert 	 * if there are infrequent messages to log. */
196c0caa2e2SCy Schubert 
197c0caa2e2SCy Schubert 	/* we cannot start a timer in dtio thread, because it is a different
198c0caa2e2SCy Schubert 	 * thread and its event base is in use by the other thread, it would
199c0caa2e2SCy Schubert 	 * give race conditions if we tried to modify its event base,
200c0caa2e2SCy Schubert 	 * and locks would wait until it woke up, and this is what we do. */
201c0caa2e2SCy Schubert 
202c0caa2e2SCy Schubert 	/* do not start the timer if a timer already exists, perhaps
203c0caa2e2SCy Schubert 	 * in another worker.  So this variable is protected by a lock in
2049cf5bc93SCy Schubert 	 * dtio. */
2059cf5bc93SCy Schubert 
2069cf5bc93SCy Schubert 	/* If we need to wakeupnow, 0 the timer to force the callback. */
207c0caa2e2SCy Schubert 	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
208c0caa2e2SCy Schubert 	if(mq->dtio->wakeup_timer_enabled) {
2099cf5bc93SCy Schubert 		if(wakeupnow) {
210*56850988SCy Schubert 			tv.tv_sec = 0;
211*56850988SCy Schubert 			tv.tv_usec = 0;
2129cf5bc93SCy Schubert 			comm_timer_set(mq->wakeup_timer, &tv);
2139cf5bc93SCy Schubert 		}
214c0caa2e2SCy Schubert 		lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
215c0caa2e2SCy Schubert 		return;
216c0caa2e2SCy Schubert 	}
217c0caa2e2SCy Schubert 	mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
218c0caa2e2SCy Schubert 
219c0caa2e2SCy Schubert 	/* start the timer, in mq, in the event base of our worker */
2209cf5bc93SCy Schubert 	if(!wakeupnow) {
221c0caa2e2SCy Schubert 		tv.tv_sec = 1;
222c0caa2e2SCy Schubert 		tv.tv_usec = 0;
223*56850988SCy Schubert 		/* If it is already set, keep it running. */
224*56850988SCy Schubert 		if(!comm_timer_is_set(mq->wakeup_timer))
225c0caa2e2SCy Schubert 			comm_timer_set(mq->wakeup_timer, &tv);
226*56850988SCy Schubert 	} else {
227*56850988SCy Schubert 		tv.tv_sec = 0;
228*56850988SCy Schubert 		tv.tv_usec = 0;
229*56850988SCy Schubert 		comm_timer_set(mq->wakeup_timer, &tv);
230*56850988SCy Schubert 	}
2319cf5bc93SCy Schubert 	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
232c0caa2e2SCy Schubert }
233c0caa2e2SCy Schubert 
234c0caa2e2SCy Schubert void
dt_msg_queue_submit(struct dt_msg_queue * mq,void * buf,size_t len)23525039b37SCy Schubert dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
23625039b37SCy Schubert {
237c0caa2e2SCy Schubert 	int wakeupnow = 0, wakeupstarttimer = 0;
23825039b37SCy Schubert 	struct dt_msg_entry* entry;
23925039b37SCy Schubert 
24025039b37SCy Schubert 	/* check conditions */
24125039b37SCy Schubert 	if(!buf) return;
24225039b37SCy Schubert 	if(len == 0) {
24325039b37SCy Schubert 		/* it is not possible to log entries with zero length,
24425039b37SCy Schubert 		 * because the framestream protocol does not carry it.
24525039b37SCy Schubert 		 * However the protobuf serialization does not create zero
24625039b37SCy Schubert 		 * length datagrams for dnstap, so this should not happen. */
24725039b37SCy Schubert 		free(buf);
24825039b37SCy Schubert 		return;
24925039b37SCy Schubert 	}
25025039b37SCy Schubert 	if(!mq) {
25125039b37SCy Schubert 		free(buf);
25225039b37SCy Schubert 		return;
25325039b37SCy Schubert 	}
25425039b37SCy Schubert 
25525039b37SCy Schubert 	/* allocate memory for queue entry */
25625039b37SCy Schubert 	entry = malloc(sizeof(*entry));
25725039b37SCy Schubert 	if(!entry) {
25825039b37SCy Schubert 		log_err("out of memory logging dnstap");
25925039b37SCy Schubert 		free(buf);
26025039b37SCy Schubert 		return;
26125039b37SCy Schubert 	}
26225039b37SCy Schubert 	entry->next = NULL;
26325039b37SCy Schubert 	entry->buf = buf;
26425039b37SCy Schubert 	entry->len = len;
26525039b37SCy Schubert 
26624e36522SCy Schubert 	/* acquire lock */
26725039b37SCy Schubert 	lock_basic_lock(&mq->lock);
268*56850988SCy Schubert 	/* if list was empty, start timer for (eventual) wakeup,
269*56850988SCy Schubert 	 * or if dtio is not writing now an eventual wakeup is needed. */
270*56850988SCy Schubert 	if(mq->first == NULL || !mq->dtio->event_added_is_write)
271c0caa2e2SCy Schubert 		wakeupstarttimer = 1;
272c0caa2e2SCy Schubert 	/* if list contains more than wakeupnum elements, wakeup now,
273c0caa2e2SCy Schubert 	 * or if list is (going to be) almost full */
274c0caa2e2SCy Schubert 	if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
275c0caa2e2SCy Schubert 		(mq->cursize < mq->maxsize * 9 / 10 &&
276c0caa2e2SCy Schubert 		mq->cursize+len >= mq->maxsize * 9 / 10))
277c0caa2e2SCy Schubert 		wakeupnow = 1;
27825039b37SCy Schubert 	/* see if it is going to fit */
27925039b37SCy Schubert 	if(mq->cursize + len > mq->maxsize) {
28025039b37SCy Schubert 		/* buffer full, or congested. */
28125039b37SCy Schubert 		/* drop */
28225039b37SCy Schubert 		lock_basic_unlock(&mq->lock);
28325039b37SCy Schubert 		free(buf);
28425039b37SCy Schubert 		free(entry);
28525039b37SCy Schubert 		return;
28625039b37SCy Schubert 	}
28725039b37SCy Schubert 	mq->cursize += len;
288c0caa2e2SCy Schubert 	mq->msgcount ++;
28925039b37SCy Schubert 	/* append to list */
29025039b37SCy Schubert 	if(mq->last) {
29125039b37SCy Schubert 		mq->last->next = entry;
29225039b37SCy Schubert 	} else {
29325039b37SCy Schubert 		mq->first = entry;
29425039b37SCy Schubert 	}
29525039b37SCy Schubert 	mq->last = entry;
29625039b37SCy Schubert 	/* release lock */
29725039b37SCy Schubert 	lock_basic_unlock(&mq->lock);
29825039b37SCy Schubert 
2999cf5bc93SCy Schubert 	if(wakeupnow || wakeupstarttimer) {
3009cf5bc93SCy Schubert 		dt_msg_queue_start_timer(mq, wakeupnow);
301c0caa2e2SCy Schubert 	}
30225039b37SCy Schubert }
30325039b37SCy Schubert 
dt_io_thread_create(void)30425039b37SCy Schubert struct dt_io_thread* dt_io_thread_create(void)
30525039b37SCy Schubert {
30625039b37SCy Schubert 	struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
307c0caa2e2SCy Schubert 	lock_basic_init(&dtio->wakeup_timer_lock);
308c0caa2e2SCy Schubert 	lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
309c0caa2e2SCy Schubert 		sizeof(dtio->wakeup_timer_enabled));
31025039b37SCy Schubert 	return dtio;
31125039b37SCy Schubert }
31225039b37SCy Schubert 
dt_io_thread_delete(struct dt_io_thread * dtio)31325039b37SCy Schubert void dt_io_thread_delete(struct dt_io_thread* dtio)
31425039b37SCy Schubert {
31525039b37SCy Schubert 	struct dt_io_list_item* item, *nextitem;
31625039b37SCy Schubert 	if(!dtio) return;
317c0caa2e2SCy Schubert 	lock_basic_destroy(&dtio->wakeup_timer_lock);
31825039b37SCy Schubert 	item=dtio->io_list;
31925039b37SCy Schubert 	while(item) {
32025039b37SCy Schubert 		nextitem = item->next;
32125039b37SCy Schubert 		free(item);
32225039b37SCy Schubert 		item = nextitem;
32325039b37SCy Schubert 	}
32425039b37SCy Schubert 	free(dtio->socket_path);
32525039b37SCy Schubert 	free(dtio->ip_str);
32625039b37SCy Schubert 	free(dtio->tls_server_name);
32725039b37SCy Schubert 	free(dtio->client_key_file);
32825039b37SCy Schubert 	free(dtio->client_cert_file);
32925039b37SCy Schubert 	if(dtio->ssl_ctx) {
33025039b37SCy Schubert #ifdef HAVE_SSL
33125039b37SCy Schubert 		SSL_CTX_free(dtio->ssl_ctx);
33225039b37SCy Schubert #endif
33325039b37SCy Schubert 	}
33425039b37SCy Schubert 	free(dtio);
33525039b37SCy Schubert }
33625039b37SCy Schubert 
dt_io_thread_apply_cfg(struct dt_io_thread * dtio,struct config_file * cfg)33725039b37SCy Schubert int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
33825039b37SCy Schubert {
33925039b37SCy Schubert 	if(!cfg->dnstap) {
34025039b37SCy Schubert 		log_warn("cannot setup dnstap because dnstap-enable is no");
34125039b37SCy Schubert 		return 0;
34225039b37SCy Schubert 	}
34325039b37SCy Schubert 
34425039b37SCy Schubert 	/* what type of connectivity do we have */
34525039b37SCy Schubert 	if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
34625039b37SCy Schubert 		if(cfg->dnstap_tls)
34725039b37SCy Schubert 			dtio->upstream_is_tls = 1;
34825039b37SCy Schubert 		else	dtio->upstream_is_tcp = 1;
34925039b37SCy Schubert 	} else {
35025039b37SCy Schubert 		dtio->upstream_is_unix = 1;
35125039b37SCy Schubert 	}
35225039b37SCy Schubert 	dtio->is_bidirectional = cfg->dnstap_bidirectional;
35325039b37SCy Schubert 
35425039b37SCy Schubert 	if(dtio->upstream_is_unix) {
355369c6923SCy Schubert 		char* nm;
35625039b37SCy Schubert 		if(!cfg->dnstap_socket_path ||
35725039b37SCy Schubert 			cfg->dnstap_socket_path[0]==0) {
35825039b37SCy Schubert 			log_err("dnstap setup: no dnstap-socket-path for "
35925039b37SCy Schubert 				"socket connect");
36025039b37SCy Schubert 			return 0;
36125039b37SCy Schubert 		}
362369c6923SCy Schubert 		nm = cfg->dnstap_socket_path;
363369c6923SCy Schubert 		if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm,
364369c6923SCy Schubert 			cfg->chrootdir, strlen(cfg->chrootdir)) == 0)
365369c6923SCy Schubert 			nm += strlen(cfg->chrootdir);
36625039b37SCy Schubert 		free(dtio->socket_path);
367369c6923SCy Schubert 		dtio->socket_path = strdup(nm);
36825039b37SCy Schubert 		if(!dtio->socket_path) {
36925039b37SCy Schubert 			log_err("dnstap setup: malloc failure");
37025039b37SCy Schubert 			return 0;
37125039b37SCy Schubert 		}
37225039b37SCy Schubert 	}
37325039b37SCy Schubert 
37425039b37SCy Schubert 	if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
37525039b37SCy Schubert 		if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
37625039b37SCy Schubert 			log_err("dnstap setup: no dnstap-ip for TCP connect");
37725039b37SCy Schubert 			return 0;
37825039b37SCy Schubert 		}
37925039b37SCy Schubert 		free(dtio->ip_str);
38025039b37SCy Schubert 		dtio->ip_str = strdup(cfg->dnstap_ip);
38125039b37SCy Schubert 		if(!dtio->ip_str) {
38225039b37SCy Schubert 			log_err("dnstap setup: malloc failure");
38325039b37SCy Schubert 			return 0;
38425039b37SCy Schubert 		}
38525039b37SCy Schubert 	}
38625039b37SCy Schubert 
38725039b37SCy Schubert 	if(dtio->upstream_is_tls) {
38825039b37SCy Schubert #ifdef HAVE_SSL
38925039b37SCy Schubert 		if(cfg->dnstap_tls_server_name &&
39025039b37SCy Schubert 			cfg->dnstap_tls_server_name[0]) {
39125039b37SCy Schubert 			free(dtio->tls_server_name);
39225039b37SCy Schubert 			dtio->tls_server_name = strdup(
39325039b37SCy Schubert 				cfg->dnstap_tls_server_name);
39425039b37SCy Schubert 			if(!dtio->tls_server_name) {
39525039b37SCy Schubert 				log_err("dnstap setup: malloc failure");
39625039b37SCy Schubert 				return 0;
39725039b37SCy Schubert 			}
39825039b37SCy Schubert 			if(!check_auth_name_for_ssl(dtio->tls_server_name))
39925039b37SCy Schubert 				return 0;
40025039b37SCy Schubert 		}
40125039b37SCy Schubert 		if(cfg->dnstap_tls_client_key_file &&
40225039b37SCy Schubert 			cfg->dnstap_tls_client_key_file[0]) {
40325039b37SCy Schubert 			dtio->use_client_certs = 1;
40425039b37SCy Schubert 			free(dtio->client_key_file);
40525039b37SCy Schubert 			dtio->client_key_file = strdup(
40625039b37SCy Schubert 				cfg->dnstap_tls_client_key_file);
40725039b37SCy Schubert 			if(!dtio->client_key_file) {
40825039b37SCy Schubert 				log_err("dnstap setup: malloc failure");
40925039b37SCy Schubert 				return 0;
41025039b37SCy Schubert 			}
41125039b37SCy Schubert 			if(!cfg->dnstap_tls_client_cert_file ||
41225039b37SCy Schubert 				cfg->dnstap_tls_client_cert_file[0]==0) {
41325039b37SCy Schubert 				log_err("dnstap setup: client key "
41425039b37SCy Schubert 					"authentication enabled with "
41525039b37SCy Schubert 					"dnstap-tls-client-key-file, but "
41625039b37SCy Schubert 					"no dnstap-tls-client-cert-file "
41725039b37SCy Schubert 					"is given");
41825039b37SCy Schubert 				return 0;
41925039b37SCy Schubert 			}
42025039b37SCy Schubert 			free(dtio->client_cert_file);
42125039b37SCy Schubert 			dtio->client_cert_file = strdup(
42225039b37SCy Schubert 				cfg->dnstap_tls_client_cert_file);
42325039b37SCy Schubert 			if(!dtio->client_cert_file) {
42425039b37SCy Schubert 				log_err("dnstap setup: malloc failure");
42525039b37SCy Schubert 				return 0;
42625039b37SCy Schubert 			}
42725039b37SCy Schubert 		} else {
42825039b37SCy Schubert 			dtio->use_client_certs = 0;
42925039b37SCy Schubert 			dtio->client_key_file = NULL;
43025039b37SCy Schubert 			dtio->client_cert_file = NULL;
43125039b37SCy Schubert 		}
43225039b37SCy Schubert 
43325039b37SCy Schubert 		if(cfg->dnstap_tls_cert_bundle) {
43425039b37SCy Schubert 			dtio->ssl_ctx = connect_sslctx_create(
43525039b37SCy Schubert 				dtio->client_key_file,
43625039b37SCy Schubert 				dtio->client_cert_file,
43725039b37SCy Schubert 				cfg->dnstap_tls_cert_bundle, 0);
43825039b37SCy Schubert 		} else {
43925039b37SCy Schubert 			dtio->ssl_ctx = connect_sslctx_create(
44025039b37SCy Schubert 				dtio->client_key_file,
44125039b37SCy Schubert 				dtio->client_cert_file,
44225039b37SCy Schubert 				cfg->tls_cert_bundle, cfg->tls_win_cert);
44325039b37SCy Schubert 		}
44425039b37SCy Schubert 		if(!dtio->ssl_ctx) {
44525039b37SCy Schubert 			log_err("could not setup SSL CTX");
44625039b37SCy Schubert 			return 0;
44725039b37SCy Schubert 		}
44825039b37SCy Schubert 		dtio->tls_use_sni = cfg->tls_use_sni;
44925039b37SCy Schubert #endif /* HAVE_SSL */
45025039b37SCy Schubert 	}
45125039b37SCy Schubert 	return 1;
45225039b37SCy Schubert }
45325039b37SCy Schubert 
dt_io_thread_register_queue(struct dt_io_thread * dtio,struct dt_msg_queue * mq)45425039b37SCy Schubert int dt_io_thread_register_queue(struct dt_io_thread* dtio,
45525039b37SCy Schubert         struct dt_msg_queue* mq)
45625039b37SCy Schubert {
45725039b37SCy Schubert 	struct dt_io_list_item* item = malloc(sizeof(*item));
45825039b37SCy Schubert 	if(!item) return 0;
45925039b37SCy Schubert 	lock_basic_lock(&mq->lock);
46025039b37SCy Schubert 	mq->dtio = dtio;
46125039b37SCy Schubert 	lock_basic_unlock(&mq->lock);
46225039b37SCy Schubert 	item->queue = mq;
46325039b37SCy Schubert 	item->next = dtio->io_list;
46425039b37SCy Schubert 	dtio->io_list = item;
46525039b37SCy Schubert 	dtio->io_list_iter = NULL;
46625039b37SCy Schubert 	return 1;
46725039b37SCy Schubert }
46825039b37SCy Schubert 
dt_io_thread_unregister_queue(struct dt_io_thread * dtio,struct dt_msg_queue * mq)46925039b37SCy Schubert void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
47025039b37SCy Schubert         struct dt_msg_queue* mq)
47125039b37SCy Schubert {
47225039b37SCy Schubert 	struct dt_io_list_item* item, *prev=NULL;
47325039b37SCy Schubert 	if(!dtio) return;
47425039b37SCy Schubert 	item = dtio->io_list;
47525039b37SCy Schubert 	while(item) {
47625039b37SCy Schubert 		if(item->queue == mq) {
47725039b37SCy Schubert 			/* found it */
47825039b37SCy Schubert 			if(prev) prev->next = item->next;
47925039b37SCy Schubert 			else dtio->io_list = item->next;
48025039b37SCy Schubert 			/* the queue itself only registered, not deleted */
48125039b37SCy Schubert 			lock_basic_lock(&item->queue->lock);
48225039b37SCy Schubert 			item->queue->dtio = NULL;
48325039b37SCy Schubert 			lock_basic_unlock(&item->queue->lock);
48425039b37SCy Schubert 			free(item);
48525039b37SCy Schubert 			dtio->io_list_iter = NULL;
48625039b37SCy Schubert 			return;
48725039b37SCy Schubert 		}
48825039b37SCy Schubert 		prev = item;
48925039b37SCy Schubert 		item = item->next;
49025039b37SCy Schubert 	}
49125039b37SCy Schubert }
49225039b37SCy Schubert 
49325039b37SCy Schubert /** pick a message from the queue, the routine locks and unlocks,
49425039b37SCy Schubert  * returns true if there is a message */
dt_msg_queue_pop(struct dt_msg_queue * mq,void ** buf,size_t * len)49525039b37SCy Schubert static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
49625039b37SCy Schubert 	size_t* len)
49725039b37SCy Schubert {
49825039b37SCy Schubert 	lock_basic_lock(&mq->lock);
49925039b37SCy Schubert 	if(mq->first) {
50025039b37SCy Schubert 		struct dt_msg_entry* entry = mq->first;
50125039b37SCy Schubert 		mq->first = entry->next;
50225039b37SCy Schubert 		if(!entry->next) mq->last = NULL;
50325039b37SCy Schubert 		mq->cursize -= entry->len;
504c0caa2e2SCy Schubert 		mq->msgcount --;
50525039b37SCy Schubert 		lock_basic_unlock(&mq->lock);
50625039b37SCy Schubert 
50725039b37SCy Schubert 		*buf = entry->buf;
50825039b37SCy Schubert 		*len = entry->len;
50925039b37SCy Schubert 		free(entry);
51025039b37SCy Schubert 		return 1;
51125039b37SCy Schubert 	}
51225039b37SCy Schubert 	lock_basic_unlock(&mq->lock);
51325039b37SCy Schubert 	return 0;
51425039b37SCy Schubert }
51525039b37SCy Schubert 
51625039b37SCy Schubert /** find message in queue, false if no message, true if message to send */
dtio_find_in_queue(struct dt_io_thread * dtio,struct dt_msg_queue * mq)51725039b37SCy Schubert static int dtio_find_in_queue(struct dt_io_thread* dtio,
51825039b37SCy Schubert 	struct dt_msg_queue* mq)
51925039b37SCy Schubert {
52025039b37SCy Schubert 	void* buf=NULL;
52125039b37SCy Schubert 	size_t len=0;
52225039b37SCy Schubert 	if(dt_msg_queue_pop(mq, &buf, &len)) {
52325039b37SCy Schubert 		dtio->cur_msg = buf;
52425039b37SCy Schubert 		dtio->cur_msg_len = len;
52525039b37SCy Schubert 		dtio->cur_msg_done = 0;
52625039b37SCy Schubert 		dtio->cur_msg_len_done = 0;
52725039b37SCy Schubert 		return 1;
52825039b37SCy Schubert 	}
52925039b37SCy Schubert 	return 0;
53025039b37SCy Schubert }
53125039b37SCy Schubert 
53225039b37SCy Schubert /** find a new message to write, search message queues, false if none */
dtio_find_msg(struct dt_io_thread * dtio)53325039b37SCy Schubert static int dtio_find_msg(struct dt_io_thread* dtio)
53425039b37SCy Schubert {
53525039b37SCy Schubert 	struct dt_io_list_item *spot, *item;
53625039b37SCy Schubert 
53725039b37SCy Schubert 	spot = dtio->io_list_iter;
53825039b37SCy Schubert 	/* use the next queue for the next message lookup,
53925039b37SCy Schubert 	 * if we hit the end(NULL) the NULL restarts the iter at start. */
54025039b37SCy Schubert 	if(spot)
54125039b37SCy Schubert 		dtio->io_list_iter = spot->next;
54225039b37SCy Schubert 	else if(dtio->io_list)
54325039b37SCy Schubert 		dtio->io_list_iter = dtio->io_list->next;
54425039b37SCy Schubert 
54525039b37SCy Schubert 	/* scan from spot to end-of-io_list */
54625039b37SCy Schubert 	item = spot;
54725039b37SCy Schubert 	while(item) {
54825039b37SCy Schubert 		if(dtio_find_in_queue(dtio, item->queue))
54925039b37SCy Schubert 			return 1;
55025039b37SCy Schubert 		item = item->next;
55125039b37SCy Schubert 	}
55225039b37SCy Schubert 	/* scan starting at the start-of-list (to wrap around the end) */
55325039b37SCy Schubert 	item = dtio->io_list;
55425039b37SCy Schubert 	while(item) {
55525039b37SCy Schubert 		if(dtio_find_in_queue(dtio, item->queue))
55625039b37SCy Schubert 			return 1;
55725039b37SCy Schubert 		item = item->next;
55825039b37SCy Schubert 	}
55925039b37SCy Schubert 	return 0;
56025039b37SCy Schubert }
56125039b37SCy Schubert 
56225039b37SCy Schubert /** callback for the dnstap reconnect, to start reconnecting to output */
dtio_reconnect_timeout_cb(int ATTR_UNUSED (fd),short ATTR_UNUSED (bits),void * arg)56325039b37SCy Schubert void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
56425039b37SCy Schubert 	short ATTR_UNUSED(bits), void* arg)
56525039b37SCy Schubert {
56625039b37SCy Schubert 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
56725039b37SCy Schubert 	dtio->reconnect_is_added = 0;
56825039b37SCy Schubert 	verbose(VERB_ALGO, "dnstap io: reconnect timer");
56925039b37SCy Schubert 
57025039b37SCy Schubert 	dtio_open_output(dtio);
57125039b37SCy Schubert 	if(dtio->event) {
57225039b37SCy Schubert 		if(!dtio_add_output_event_write(dtio))
57325039b37SCy Schubert 			return;
57425039b37SCy Schubert 		/* nothing wrong so far, wait on the output event */
57525039b37SCy Schubert 		return;
57625039b37SCy Schubert 	}
57725039b37SCy Schubert 	/* exponential backoff and retry on timer */
57825039b37SCy Schubert 	dtio_reconnect_enable(dtio);
57925039b37SCy Schubert }
58025039b37SCy Schubert 
58125039b37SCy Schubert /** attempt to reconnect to the output, after a timeout */
dtio_reconnect_enable(struct dt_io_thread * dtio)58225039b37SCy Schubert static void dtio_reconnect_enable(struct dt_io_thread* dtio)
58325039b37SCy Schubert {
58425039b37SCy Schubert 	struct timeval tv;
58525039b37SCy Schubert 	int msec;
58625039b37SCy Schubert 	if(dtio->want_to_exit) return;
58725039b37SCy Schubert 	if(dtio->reconnect_is_added)
58825039b37SCy Schubert 		return; /* already done */
58925039b37SCy Schubert 
59025039b37SCy Schubert 	/* exponential backoff, store the value for next timeout */
59125039b37SCy Schubert 	msec = dtio->reconnect_timeout;
59225039b37SCy Schubert 	if(msec == 0) {
59325039b37SCy Schubert 		dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
59425039b37SCy Schubert 	} else {
59525039b37SCy Schubert 		dtio->reconnect_timeout = msec*2;
59625039b37SCy Schubert 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
59725039b37SCy Schubert 			dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
59825039b37SCy Schubert 	}
59925039b37SCy Schubert 	verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
60025039b37SCy Schubert 		msec);
60125039b37SCy Schubert 
60225039b37SCy Schubert 	/* setup wait timer */
60325039b37SCy Schubert 	memset(&tv, 0, sizeof(tv));
60425039b37SCy Schubert 	tv.tv_sec = msec/1000;
60525039b37SCy Schubert 	tv.tv_usec = (msec%1000)*1000;
60625039b37SCy Schubert 	if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
60725039b37SCy Schubert 		&dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
60825039b37SCy Schubert 		log_err("dnstap io: could not reconnect ev timer add");
60925039b37SCy Schubert 		return;
61025039b37SCy Schubert 	}
61125039b37SCy Schubert 	dtio->reconnect_is_added = 1;
61225039b37SCy Schubert }
61325039b37SCy Schubert 
61425039b37SCy Schubert /** remove dtio reconnect timer */
dtio_reconnect_del(struct dt_io_thread * dtio)61525039b37SCy Schubert static void dtio_reconnect_del(struct dt_io_thread* dtio)
61625039b37SCy Schubert {
61725039b37SCy Schubert 	if(!dtio->reconnect_is_added)
61825039b37SCy Schubert 		return;
61925039b37SCy Schubert 	ub_timer_del(dtio->reconnect_timer);
62025039b37SCy Schubert 	dtio->reconnect_is_added = 0;
62125039b37SCy Schubert }
62225039b37SCy Schubert 
62325039b37SCy Schubert /** clear the reconnect exponential backoff timer.
62425039b37SCy Schubert  * We have successfully connected so we can try again with short timeouts. */
dtio_reconnect_clear(struct dt_io_thread * dtio)62525039b37SCy Schubert static void dtio_reconnect_clear(struct dt_io_thread* dtio)
62625039b37SCy Schubert {
62725039b37SCy Schubert 	dtio->reconnect_timeout = 0;
62825039b37SCy Schubert 	dtio_reconnect_del(dtio);
62925039b37SCy Schubert }
63025039b37SCy Schubert 
63125039b37SCy Schubert /** reconnect slowly, because we already know we have to wait for a bit */
dtio_reconnect_slow(struct dt_io_thread * dtio,int msec)63225039b37SCy Schubert static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
63325039b37SCy Schubert {
63425039b37SCy Schubert 	dtio_reconnect_del(dtio);
63525039b37SCy Schubert 	dtio->reconnect_timeout = msec;
63625039b37SCy Schubert 	dtio_reconnect_enable(dtio);
63725039b37SCy Schubert }
63825039b37SCy Schubert 
63925039b37SCy Schubert /** delete the current message in the dtio, and reset counters */
dtio_cur_msg_free(struct dt_io_thread * dtio)64025039b37SCy Schubert static void dtio_cur_msg_free(struct dt_io_thread* dtio)
64125039b37SCy Schubert {
64225039b37SCy Schubert 	free(dtio->cur_msg);
64325039b37SCy Schubert 	dtio->cur_msg = NULL;
64425039b37SCy Schubert 	dtio->cur_msg_len = 0;
64525039b37SCy Schubert 	dtio->cur_msg_done = 0;
64625039b37SCy Schubert 	dtio->cur_msg_len_done = 0;
64725039b37SCy Schubert }
64825039b37SCy Schubert 
64925039b37SCy Schubert /** delete the buffer and counters used to read frame */
dtio_read_frame_free(struct dt_frame_read_buf * rb)65025039b37SCy Schubert static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
65125039b37SCy Schubert {
65225039b37SCy Schubert 	if(rb->buf) {
65325039b37SCy Schubert 		free(rb->buf);
65425039b37SCy Schubert 		rb->buf = NULL;
65525039b37SCy Schubert 	}
65625039b37SCy Schubert 	rb->buf_count = 0;
65725039b37SCy Schubert 	rb->buf_cap = 0;
65825039b37SCy Schubert 	rb->frame_len = 0;
65925039b37SCy Schubert 	rb->frame_len_done = 0;
66025039b37SCy Schubert 	rb->control_frame = 0;
66125039b37SCy Schubert }
66225039b37SCy Schubert 
66325039b37SCy Schubert /** del the output file descriptor event for listening */
dtio_del_output_event(struct dt_io_thread * dtio)66425039b37SCy Schubert static void dtio_del_output_event(struct dt_io_thread* dtio)
66525039b37SCy Schubert {
66625039b37SCy Schubert 	if(!dtio->event_added)
66725039b37SCy Schubert 		return;
66825039b37SCy Schubert 	ub_event_del(dtio->event);
66925039b37SCy Schubert 	dtio->event_added = 0;
67025039b37SCy Schubert 	dtio->event_added_is_write = 0;
67125039b37SCy Schubert }
67225039b37SCy Schubert 
67325039b37SCy Schubert /** close dtio socket and set it to -1 */
dtio_close_fd(struct dt_io_thread * dtio)67425039b37SCy Schubert static void dtio_close_fd(struct dt_io_thread* dtio)
67525039b37SCy Schubert {
676c0caa2e2SCy Schubert 	sock_close(dtio->fd);
67725039b37SCy Schubert 	dtio->fd = -1;
67825039b37SCy Schubert }
67925039b37SCy Schubert 
68025039b37SCy Schubert /** close and stop the output file descriptor event */
dtio_close_output(struct dt_io_thread * dtio)68125039b37SCy Schubert static void dtio_close_output(struct dt_io_thread* dtio)
68225039b37SCy Schubert {
68325039b37SCy Schubert 	if(!dtio->event)
68425039b37SCy Schubert 		return;
68525039b37SCy Schubert 	ub_event_free(dtio->event);
68625039b37SCy Schubert 	dtio->event = NULL;
68725039b37SCy Schubert 	if(dtio->ssl) {
68825039b37SCy Schubert #ifdef HAVE_SSL
68925039b37SCy Schubert 		SSL_shutdown(dtio->ssl);
69025039b37SCy Schubert 		SSL_free(dtio->ssl);
69125039b37SCy Schubert 		dtio->ssl = NULL;
69225039b37SCy Schubert #endif
69325039b37SCy Schubert 	}
69425039b37SCy Schubert 	dtio_close_fd(dtio);
69525039b37SCy Schubert 
69625039b37SCy Schubert 	/* if there is a (partial) message, discard it
69725039b37SCy Schubert 	 * we cannot send (the remainder of) it, and a new
69825039b37SCy Schubert 	 * connection needs to start with a control frame. */
69925039b37SCy Schubert 	if(dtio->cur_msg) {
70025039b37SCy Schubert 		dtio_cur_msg_free(dtio);
70125039b37SCy Schubert 	}
70225039b37SCy Schubert 
70325039b37SCy Schubert 	dtio->ready_frame_sent = 0;
70425039b37SCy Schubert 	dtio->accept_frame_received = 0;
70525039b37SCy Schubert 	dtio_read_frame_free(&dtio->read_frame);
70625039b37SCy Schubert 
70725039b37SCy Schubert 	dtio_reconnect_enable(dtio);
70825039b37SCy Schubert }
70925039b37SCy Schubert 
71025039b37SCy Schubert /** check for pending nonblocking connect errors,
71125039b37SCy Schubert  * returns 1 if it is okay. -1 on error (close it), 0 to try later */
dtio_check_nb_connect(struct dt_io_thread * dtio)71225039b37SCy Schubert static int dtio_check_nb_connect(struct dt_io_thread* dtio)
71325039b37SCy Schubert {
71425039b37SCy Schubert 	int error = 0;
71525039b37SCy Schubert 	socklen_t len = (socklen_t)sizeof(error);
71625039b37SCy Schubert 	if(!dtio->check_nb_connect)
71725039b37SCy Schubert 		return 1; /* everything okay */
71825039b37SCy Schubert 	if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
71925039b37SCy Schubert 		&len) < 0) {
72025039b37SCy Schubert #ifndef USE_WINSOCK
72125039b37SCy Schubert 		error = errno; /* on solaris errno is error */
72225039b37SCy Schubert #else
72325039b37SCy Schubert 		error = WSAGetLastError();
72425039b37SCy Schubert #endif
72525039b37SCy Schubert 	}
72625039b37SCy Schubert #ifndef USE_WINSOCK
72725039b37SCy Schubert #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
72825039b37SCy Schubert 	if(error == EINPROGRESS || error == EWOULDBLOCK)
72925039b37SCy Schubert 		return 0; /* try again later */
73025039b37SCy Schubert #endif
73125039b37SCy Schubert #else
73225039b37SCy Schubert 	if(error == WSAEINPROGRESS) {
73325039b37SCy Schubert 		return 0; /* try again later */
73425039b37SCy Schubert 	} else if(error == WSAEWOULDBLOCK) {
73525039b37SCy Schubert 		ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
73625039b37SCy Schubert 			dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
73725039b37SCy Schubert 		return 0; /* try again later */
73825039b37SCy Schubert 	}
73925039b37SCy Schubert #endif
74025039b37SCy Schubert 	if(error != 0) {
74125039b37SCy Schubert 		char* to = dtio->socket_path;
74225039b37SCy Schubert 		if(!to) to = dtio->ip_str;
74325039b37SCy Schubert 		if(!to) to = "";
74425039b37SCy Schubert 		log_err("dnstap io: failed to connect to \"%s\": %s",
745c0caa2e2SCy Schubert 			to, sock_strerror(error));
74625039b37SCy Schubert 		return -1; /* error, close it */
74725039b37SCy Schubert 	}
74825039b37SCy Schubert 
74925039b37SCy Schubert 	if(dtio->ip_str)
75025039b37SCy Schubert 		verbose(VERB_DETAIL, "dnstap io: connected to %s",
75125039b37SCy Schubert 			dtio->ip_str);
75225039b37SCy Schubert 	else if(dtio->socket_path)
75325039b37SCy Schubert 		verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
75425039b37SCy Schubert 			dtio->socket_path);
75525039b37SCy Schubert 	dtio_reconnect_clear(dtio);
75625039b37SCy Schubert 	dtio->check_nb_connect = 0;
75725039b37SCy Schubert 	return 1; /* everything okay */
75825039b37SCy Schubert }
75925039b37SCy Schubert 
#ifdef HAVE_SSL
/** write buffer to the TLS output.
 * returns the number of bytes written, 0 if nothing was written and the
 * write has to be tried again later, or -1 if the channel is to be closed. */
static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
	size_t len)
{
	int written, want;

	ERR_clear_error();
	written = SSL_write(dtio->ssl, buf, len);
	if(written > 0)
		return written;

	want = SSL_get_error(dtio->ssl, written);
	switch(want) {
	case SSL_ERROR_ZERO_RETURN:
		/* closed */
		return -1;
	case SSL_ERROR_WANT_READ:
		/* TLS needs to read first, ask for a brief read event */
		dtio_enable_brief_read(dtio);
		return 0;
	case SSL_ERROR_WANT_WRITE:
		/* write again later */
		return 0;
	case SSL_ERROR_SYSCALL:
		/* broken pipe and reset by peer are only logged at
		 * higher verbosity */
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			return -1;
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			return -1;
#endif
		if(errno != 0) {
			log_err("dnstap io, SSL_write syscall: %s",
				strerror(errno));
		}
		return -1;
	default:
		break;
	}
	/* any other TLS error */
	log_crypto_err_io("dnstap io, could not SSL_write", want);
	return -1;
}
#endif /* HAVE_SSL */
80325039b37SCy Schubert 
80425039b37SCy Schubert /** write buffer to output.
80525039b37SCy Schubert  * returns number of bytes written, 0 if nothing happened,
80625039b37SCy Schubert  * try again later, or -1 if the channel is to be closed. */
dtio_write_buf(struct dt_io_thread * dtio,uint8_t * buf,size_t len)80725039b37SCy Schubert static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
80825039b37SCy Schubert 	size_t len)
80925039b37SCy Schubert {
81025039b37SCy Schubert 	ssize_t ret;
81125039b37SCy Schubert 	if(dtio->fd == -1)
81225039b37SCy Schubert 		return -1;
81325039b37SCy Schubert #ifdef HAVE_SSL
81425039b37SCy Schubert 	if(dtio->ssl)
81525039b37SCy Schubert 		return dtio_write_ssl(dtio, buf, len);
81625039b37SCy Schubert #endif
81725039b37SCy Schubert 	ret = send(dtio->fd, (void*)buf, len, 0);
81825039b37SCy Schubert 	if(ret == -1) {
81925039b37SCy Schubert #ifndef USE_WINSOCK
82025039b37SCy Schubert 		if(errno == EINTR || errno == EAGAIN)
82125039b37SCy Schubert 			return 0;
82225039b37SCy Schubert #else
82325039b37SCy Schubert 		if(WSAGetLastError() == WSAEINPROGRESS)
82425039b37SCy Schubert 			return 0;
82525039b37SCy Schubert 		if(WSAGetLastError() == WSAEWOULDBLOCK) {
82625039b37SCy Schubert 			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
82725039b37SCy Schubert 				dtio->stop_flush_event:dtio->event),
82825039b37SCy Schubert 				UB_EV_WRITE);
82925039b37SCy Schubert 			return 0;
83025039b37SCy Schubert 		}
83125039b37SCy Schubert #endif
832c0caa2e2SCy Schubert 		log_err("dnstap io: failed send: %s", sock_strerror(errno));
83325039b37SCy Schubert 		return -1;
83425039b37SCy Schubert 	}
83525039b37SCy Schubert 	return ret;
83625039b37SCy Schubert }
83725039b37SCy Schubert 
#ifdef HAVE_WRITEV
/** send the length prefix and the message with one writev call.
 * The first vector is the part of the 4 byte network order length that
 * has not been sent yet, the second vector is the message itself.
 * return true if message is done, false if incomplete. On a write error
 * the output is closed and false is returned. */
static int dtio_write_with_writev(struct dt_io_thread* dtio)
{
	uint32_t netlen = htonl(dtio->cur_msg_len);
	struct iovec vec[2];
	ssize_t written;

	vec[0].iov_base = ((uint8_t*)&netlen)+dtio->cur_msg_len_done;
	vec[0].iov_len = sizeof(netlen)-dtio->cur_msg_len_done;
	vec[1].iov_base = dtio->cur_msg;
	vec[1].iov_len = dtio->cur_msg_len;
	log_assert(vec[0].iov_len > 0);

	written = writev(dtio->fd, vec, 2);
	if(written == -1) {
#ifndef USE_WINSOCK
		if(errno == EINTR || errno == EAGAIN)
			return 0;
#else
		if(WSAGetLastError() == WSAEINPROGRESS)
			return 0;
		if(WSAGetLastError() == WSAEWOULDBLOCK) {
			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
				dtio->stop_flush_event:dtio->event),
				UB_EV_WRITE);
			return 0;
		}
#endif
		log_err("dnstap io: failed writev: %s", sock_strerror(errno));
		/* close the channel */
		dtio_del_output_event(dtio);
		dtio_close_output(dtio);
		return 0;
	}

	/* account all written bytes to the length prefix first */
	dtio->cur_msg_len_done += written;
	if(dtio->cur_msg_len_done < 4)
		return 0; /* length prefix not complete yet */
	if(dtio->cur_msg_len_done > 4) {
		/* what is beyond the 4 length bytes was message data */
		dtio->cur_msg_done = dtio->cur_msg_len_done-4;
		dtio->cur_msg_len_done = 4;
	}
	return dtio->cur_msg_done >= dtio->cur_msg_len;
}
#endif /* HAVE_WRITEV */
88525039b37SCy Schubert 
88625039b37SCy Schubert /** write more of the length, preceding the data frame.
88725039b37SCy Schubert  * return true if message is done, false if incomplete. */
dtio_write_more_of_len(struct dt_io_thread * dtio)88825039b37SCy Schubert static int dtio_write_more_of_len(struct dt_io_thread* dtio)
88925039b37SCy Schubert {
89025039b37SCy Schubert 	uint32_t sendlen;
89125039b37SCy Schubert 	int r;
89225039b37SCy Schubert 	if(dtio->cur_msg_len_done >= 4)
89325039b37SCy Schubert 		return 1;
89425039b37SCy Schubert #ifdef HAVE_WRITEV
89525039b37SCy Schubert 	if(!dtio->ssl) {
89625039b37SCy Schubert 		/* we try writev for everything.*/
89725039b37SCy Schubert 		return dtio_write_with_writev(dtio);
89825039b37SCy Schubert 	}
89925039b37SCy Schubert #endif /* HAVE_WRITEV */
90025039b37SCy Schubert 	sendlen = htonl(dtio->cur_msg_len);
90125039b37SCy Schubert 	r = dtio_write_buf(dtio,
90225039b37SCy Schubert 		((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
90325039b37SCy Schubert 		sizeof(sendlen)-dtio->cur_msg_len_done);
90425039b37SCy Schubert 	if(r == -1) {
90525039b37SCy Schubert 		/* close the channel */
90625039b37SCy Schubert 		dtio_del_output_event(dtio);
90725039b37SCy Schubert 		dtio_close_output(dtio);
90825039b37SCy Schubert 		return 0;
90925039b37SCy Schubert 	} else if(r == 0) {
91025039b37SCy Schubert 		/* try again later */
91125039b37SCy Schubert 		return 0;
91225039b37SCy Schubert 	}
91325039b37SCy Schubert 	dtio->cur_msg_len_done += r;
91425039b37SCy Schubert 	if(dtio->cur_msg_len_done < 4)
91525039b37SCy Schubert 		return 0;
91625039b37SCy Schubert 	return 1;
91725039b37SCy Schubert }
91825039b37SCy Schubert 
91925039b37SCy Schubert /** write more of the data frame.
92025039b37SCy Schubert  * return true if message is done, false if incomplete. */
dtio_write_more_of_data(struct dt_io_thread * dtio)92125039b37SCy Schubert static int dtio_write_more_of_data(struct dt_io_thread* dtio)
92225039b37SCy Schubert {
92325039b37SCy Schubert 	int r;
92425039b37SCy Schubert 	if(dtio->cur_msg_done >= dtio->cur_msg_len)
92525039b37SCy Schubert 		return 1;
92625039b37SCy Schubert 	r = dtio_write_buf(dtio,
92725039b37SCy Schubert 		((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
92825039b37SCy Schubert 		dtio->cur_msg_len - dtio->cur_msg_done);
92925039b37SCy Schubert 	if(r == -1) {
93025039b37SCy Schubert 		/* close the channel */
93125039b37SCy Schubert 		dtio_del_output_event(dtio);
93225039b37SCy Schubert 		dtio_close_output(dtio);
93325039b37SCy Schubert 		return 0;
93425039b37SCy Schubert 	} else if(r == 0) {
93525039b37SCy Schubert 		/* try again later */
93625039b37SCy Schubert 		return 0;
93725039b37SCy Schubert 	}
93825039b37SCy Schubert 	dtio->cur_msg_done += r;
93925039b37SCy Schubert 	if(dtio->cur_msg_done < dtio->cur_msg_len)
94025039b37SCy Schubert 		return 0;
94125039b37SCy Schubert 	return 1;
94225039b37SCy Schubert }
94325039b37SCy Schubert 
94424e36522SCy Schubert /** write more of the current message. false if incomplete, true if
94525039b37SCy Schubert  * the message is done */
dtio_write_more(struct dt_io_thread * dtio)94625039b37SCy Schubert static int dtio_write_more(struct dt_io_thread* dtio)
94725039b37SCy Schubert {
94825039b37SCy Schubert 	if(dtio->cur_msg_len_done < 4) {
94925039b37SCy Schubert 		if(!dtio_write_more_of_len(dtio))
95025039b37SCy Schubert 			return 0;
95125039b37SCy Schubert 	}
95225039b37SCy Schubert 	if(dtio->cur_msg_done < dtio->cur_msg_len) {
95325039b37SCy Schubert 		if(!dtio_write_more_of_data(dtio))
95425039b37SCy Schubert 			return 0;
95525039b37SCy Schubert 	}
95625039b37SCy Schubert 	return 1;
95725039b37SCy Schubert }
95825039b37SCy Schubert 
95925039b37SCy Schubert /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
96025039b37SCy Schubert  * -1: continue, >0: number of bytes read into buffer */
receive_bytes(struct dt_io_thread * dtio,void * buf,size_t len)96125039b37SCy Schubert static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
96225039b37SCy Schubert 	ssize_t r;
963865f46b2SCy Schubert 	r = recv(dtio->fd, (void*)buf, len, MSG_DONTWAIT);
96425039b37SCy Schubert 	if(r == -1) {
96525039b37SCy Schubert 		char* to = dtio->socket_path;
96625039b37SCy Schubert 		if(!to) to = dtio->ip_str;
96725039b37SCy Schubert 		if(!to) to = "";
96825039b37SCy Schubert #ifndef USE_WINSOCK
96925039b37SCy Schubert 		if(errno == EINTR || errno == EAGAIN)
97025039b37SCy Schubert 			return -1; /* try later */
97125039b37SCy Schubert #else
97225039b37SCy Schubert 		if(WSAGetLastError() == WSAEINPROGRESS) {
97325039b37SCy Schubert 			return -1; /* try later */
97425039b37SCy Schubert 		} else if(WSAGetLastError() == WSAEWOULDBLOCK) {
97525039b37SCy Schubert 			ub_winsock_tcp_wouldblock(
97625039b37SCy Schubert 				(dtio->stop_flush_event?
97725039b37SCy Schubert 				dtio->stop_flush_event:dtio->event),
97825039b37SCy Schubert 				UB_EV_READ);
97925039b37SCy Schubert 			return -1; /* try later */
98025039b37SCy Schubert 		}
98125039b37SCy Schubert #endif
98225039b37SCy Schubert 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
98325039b37SCy Schubert 			verbosity < 4)
98425039b37SCy Schubert 			return 0; /* no log retries on low verbosity */
98525039b37SCy Schubert 		log_err("dnstap io: output closed, recv %s: %s", to,
98625039b37SCy Schubert 			strerror(errno));
98725039b37SCy Schubert 		/* and close below */
98825039b37SCy Schubert 		return 0;
98925039b37SCy Schubert 	}
99025039b37SCy Schubert 	if(r == 0) {
99125039b37SCy Schubert 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
99225039b37SCy Schubert 			verbosity < 4)
99325039b37SCy Schubert 			return 0; /* no log retries on low verbosity */
99425039b37SCy Schubert 		verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
99525039b37SCy Schubert 		/* and close below */
99625039b37SCy Schubert 		return 0;
99725039b37SCy Schubert 	}
99825039b37SCy Schubert 	/* something was received */
99925039b37SCy Schubert 	return r;
100025039b37SCy Schubert }
100125039b37SCy Schubert 
#ifdef HAVE_SSL
/** Receive bytes over TLS from dtio->ssl, store in buffer. Returns 0:
 * closed, -1: continue (try later), >0: number of bytes read into buffer */
static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
{
	int got, want;

	ERR_clear_error();
	got = SSL_read(dtio->ssl, buf, len);
	if(got > 0)
		return got;

	want = SSL_get_error(dtio->ssl, got);
	switch(want) {
	case SSL_ERROR_ZERO_RETURN:
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4)
			return 0; /* no log retries on low verbosity */
		verbose(VERB_DETAIL, "dnstap io: output closed by the "
			"other side");
		return 0;
	case SSL_ERROR_WANT_READ:
		/* continue later */
		return -1;
	case SSL_ERROR_WANT_WRITE:
		/* TLS needs to write first, ask for a brief write event */
		(void)dtio_enable_brief_write(dtio);
		return -1;
	case SSL_ERROR_SYSCALL:
#ifdef ECONNRESET
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			errno == ECONNRESET && verbosity < 4)
			return 0; /* silence reset by peer */
#endif
		if(errno != 0)
			log_err("SSL_read syscall: %s",
				strerror(errno));
		verbose(VERB_DETAIL, "dnstap io: output closed by the "
			"other side");
		return 0;
	default:
		break;
	}
	/* any other TLS error */
	log_crypto_err_io("could not SSL_read", want);
	verbose(VERB_DETAIL, "dnstap io: output closed by the "
			"other side");
	return 0;
}
#endif /* HAVE_SSL */
104625039b37SCy Schubert 
104725039b37SCy Schubert /** check if the output fd has been closed,
104825039b37SCy Schubert  * it returns false if the stream is closed. */
dtio_check_close(struct dt_io_thread * dtio)104925039b37SCy Schubert static int dtio_check_close(struct dt_io_thread* dtio)
105025039b37SCy Schubert {
105125039b37SCy Schubert 	/* we don't want to read any packets, but if there are we can
105225039b37SCy Schubert 	 * discard the input (ignore it).  Ignore of unknown (control)
105325039b37SCy Schubert 	 * packets is okay for the framestream protocol.  And also, the
105425039b37SCy Schubert 	 * read call can return that the stream has been closed by the
105525039b37SCy Schubert 	 * other side. */
105625039b37SCy Schubert 	uint8_t buf[1024];
105725039b37SCy Schubert 	int r = -1;
105825039b37SCy Schubert 
105925039b37SCy Schubert 
106025039b37SCy Schubert 	if(dtio->fd == -1) return 0;
106125039b37SCy Schubert 
106225039b37SCy Schubert 	while(r != 0) {
106325039b37SCy Schubert 		/* not interested in buffer content, overwrite */
106425039b37SCy Schubert 		r = receive_bytes(dtio, (void*)buf, sizeof(buf));
106525039b37SCy Schubert 		if(r == -1)
106625039b37SCy Schubert 			return 1;
106725039b37SCy Schubert 	}
106825039b37SCy Schubert 	/* the other end has been closed */
106925039b37SCy Schubert 	/* close the channel */
107025039b37SCy Schubert 	dtio_del_output_event(dtio);
107125039b37SCy Schubert 	dtio_close_output(dtio);
107225039b37SCy Schubert 	return 0;
107325039b37SCy Schubert }
107425039b37SCy Schubert 
107525039b37SCy Schubert /** Read accept frame. Returns -1: continue reading, 0: closed,
107625039b37SCy Schubert  * 1: valid accept received. */
dtio_read_accept_frame(struct dt_io_thread * dtio)107725039b37SCy Schubert static int dtio_read_accept_frame(struct dt_io_thread* dtio)
107825039b37SCy Schubert {
107925039b37SCy Schubert 	int r;
108025039b37SCy Schubert 	size_t read_frame_done;
108125039b37SCy Schubert 	while(dtio->read_frame.frame_len_done < 4) {
108225039b37SCy Schubert #ifdef HAVE_SSL
108325039b37SCy Schubert 		if(dtio->ssl) {
108425039b37SCy Schubert 			r = ssl_read_bytes(dtio,
108525039b37SCy Schubert 				(uint8_t*)&dtio->read_frame.frame_len+
108625039b37SCy Schubert 				dtio->read_frame.frame_len_done,
108725039b37SCy Schubert 				4-dtio->read_frame.frame_len_done);
108825039b37SCy Schubert 		} else {
108925039b37SCy Schubert #endif
109025039b37SCy Schubert 			r = receive_bytes(dtio,
109125039b37SCy Schubert 				(uint8_t*)&dtio->read_frame.frame_len+
109225039b37SCy Schubert 				dtio->read_frame.frame_len_done,
109325039b37SCy Schubert 				4-dtio->read_frame.frame_len_done);
109425039b37SCy Schubert #ifdef HAVE_SSL
109525039b37SCy Schubert 		}
109625039b37SCy Schubert #endif
109725039b37SCy Schubert 		if(r == -1)
109825039b37SCy Schubert 			return -1; /* continue reading */
109925039b37SCy Schubert 		if(r == 0) {
110025039b37SCy Schubert 			 /* connection closed */
110125039b37SCy Schubert 			goto close_connection;
110225039b37SCy Schubert 		}
110325039b37SCy Schubert 		dtio->read_frame.frame_len_done += r;
110425039b37SCy Schubert 		if(dtio->read_frame.frame_len_done < 4)
110525039b37SCy Schubert 			return -1; /* continue reading */
110625039b37SCy Schubert 
110725039b37SCy Schubert 		if(dtio->read_frame.frame_len == 0) {
110825039b37SCy Schubert 			dtio->read_frame.frame_len_done = 0;
110925039b37SCy Schubert 			dtio->read_frame.control_frame = 1;
111025039b37SCy Schubert 			continue;
111125039b37SCy Schubert 		}
111225039b37SCy Schubert 		dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
111325039b37SCy Schubert 		if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
111425039b37SCy Schubert 			verbose(VERB_OPS, "dnstap: received frame exceeds max "
111525039b37SCy Schubert 				"length of %d bytes, closing connection",
111625039b37SCy Schubert 				DTIO_RECV_FRAME_MAX_LEN);
111725039b37SCy Schubert 			goto close_connection;
111825039b37SCy Schubert 		}
111925039b37SCy Schubert 		dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
112025039b37SCy Schubert 		dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
112125039b37SCy Schubert 		if(!dtio->read_frame.buf) {
112225039b37SCy Schubert 			log_err("dnstap io: out of memory (creating read "
112325039b37SCy Schubert 				"buffer)");
112425039b37SCy Schubert 			goto close_connection;
112525039b37SCy Schubert 		}
112625039b37SCy Schubert 	}
112725039b37SCy Schubert 	if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
112825039b37SCy Schubert #ifdef HAVE_SSL
112925039b37SCy Schubert 		if(dtio->ssl) {
113025039b37SCy Schubert 			r = ssl_read_bytes(dtio, dtio->read_frame.buf+
113125039b37SCy Schubert 				dtio->read_frame.buf_count,
113225039b37SCy Schubert 				dtio->read_frame.buf_cap-
113325039b37SCy Schubert 				dtio->read_frame.buf_count);
113425039b37SCy Schubert 		} else {
113525039b37SCy Schubert #endif
113625039b37SCy Schubert 			r = receive_bytes(dtio, dtio->read_frame.buf+
113725039b37SCy Schubert 				dtio->read_frame.buf_count,
113825039b37SCy Schubert 				dtio->read_frame.buf_cap-
113925039b37SCy Schubert 				dtio->read_frame.buf_count);
114025039b37SCy Schubert #ifdef HAVE_SSL
114125039b37SCy Schubert 		}
114225039b37SCy Schubert #endif
114325039b37SCy Schubert 		if(r == -1)
114425039b37SCy Schubert 			return -1; /* continue reading */
114525039b37SCy Schubert 		if(r == 0) {
114625039b37SCy Schubert 			 /* connection closed */
114725039b37SCy Schubert 			goto close_connection;
114825039b37SCy Schubert 		}
114925039b37SCy Schubert 		dtio->read_frame.buf_count += r;
115025039b37SCy Schubert 		if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
115125039b37SCy Schubert 			return -1; /* continue reading */
115225039b37SCy Schubert 	}
115325039b37SCy Schubert 
115425039b37SCy Schubert 	/* Complete frame received, check if this is a valid ACCEPT control
115525039b37SCy Schubert 	 * frame. */
115625039b37SCy Schubert 	if(dtio->read_frame.frame_len < 4) {
115725039b37SCy Schubert 		verbose(VERB_OPS, "dnstap: invalid data received");
115825039b37SCy Schubert 		goto close_connection;
115925039b37SCy Schubert 	}
116025039b37SCy Schubert 	if(sldns_read_uint32(dtio->read_frame.buf) !=
116125039b37SCy Schubert 		FSTRM_CONTROL_FRAME_ACCEPT) {
116225039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap: invalid control type received, "
116325039b37SCy Schubert 			"ignored");
116425039b37SCy Schubert 		dtio->ready_frame_sent = 0;
116525039b37SCy Schubert 		dtio->accept_frame_received = 0;
116625039b37SCy Schubert 		dtio_read_frame_free(&dtio->read_frame);
116725039b37SCy Schubert 		return -1;
116825039b37SCy Schubert 	}
116925039b37SCy Schubert 	read_frame_done = 4; /* control frame type */
117025039b37SCy Schubert 
117125039b37SCy Schubert 	/* Iterate over control fields, ignore unknown types.
117225039b37SCy Schubert 	 * Need to be able to read at least 8 bytes (control field type +
117325039b37SCy Schubert 	 * length). */
117425039b37SCy Schubert 	while(read_frame_done+8 < dtio->read_frame.frame_len) {
117525039b37SCy Schubert 		uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
117625039b37SCy Schubert 			read_frame_done);
117725039b37SCy Schubert 		uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
117825039b37SCy Schubert 			read_frame_done + 4);
117925039b37SCy Schubert 		if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
118025039b37SCy Schubert 			if(len == strlen(DNSTAP_CONTENT_TYPE) &&
118125039b37SCy Schubert 				read_frame_done+8+len <=
118225039b37SCy Schubert 				dtio->read_frame.frame_len &&
118325039b37SCy Schubert 				memcmp(dtio->read_frame.buf + read_frame_done +
118425039b37SCy Schubert 					+ 8, DNSTAP_CONTENT_TYPE, len) == 0) {
118525039b37SCy Schubert 				if(!dtio_control_start_send(dtio)) {
118625039b37SCy Schubert 					verbose(VERB_OPS, "dnstap io: out of "
118725039b37SCy Schubert 					 "memory while sending START frame");
118825039b37SCy Schubert 					goto close_connection;
118925039b37SCy Schubert 				}
119025039b37SCy Schubert 				dtio->accept_frame_received = 1;
1191c0caa2e2SCy Schubert 				if(!dtio_add_output_event_write(dtio))
1192c0caa2e2SCy Schubert 					goto close_connection;
119325039b37SCy Schubert 				return 1;
119425039b37SCy Schubert 			} else {
119524e36522SCy Schubert 				/* unknown content type */
119625039b37SCy Schubert 				verbose(VERB_ALGO, "dnstap: ACCEPT frame "
119725039b37SCy Schubert 					"contains unknown content type, "
119825039b37SCy Schubert 					"closing connection");
119925039b37SCy Schubert 				goto close_connection;
120025039b37SCy Schubert 			}
120125039b37SCy Schubert 		}
120225039b37SCy Schubert 		/* unknown option, try next */
120325039b37SCy Schubert 		read_frame_done += 8+len;
120425039b37SCy Schubert 	}
120525039b37SCy Schubert 
120625039b37SCy Schubert 
120725039b37SCy Schubert close_connection:
120825039b37SCy Schubert 	dtio_del_output_event(dtio);
120925039b37SCy Schubert 	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
121025039b37SCy Schubert 	dtio_close_output(dtio);
121125039b37SCy Schubert 	return 0;
121225039b37SCy Schubert }
121325039b37SCy Schubert 
121425039b37SCy Schubert /** add the output file descriptor event for listening, read only */
dtio_add_output_event_read(struct dt_io_thread * dtio)121525039b37SCy Schubert static int dtio_add_output_event_read(struct dt_io_thread* dtio)
121625039b37SCy Schubert {
121725039b37SCy Schubert 	if(!dtio->event)
121825039b37SCy Schubert 		return 0;
121925039b37SCy Schubert 	if(dtio->event_added && !dtio->event_added_is_write)
122025039b37SCy Schubert 		return 1;
122125039b37SCy Schubert 	/* we have to (re-)register the event */
122225039b37SCy Schubert 	if(dtio->event_added)
122325039b37SCy Schubert 		ub_event_del(dtio->event);
122425039b37SCy Schubert 	ub_event_del_bits(dtio->event, UB_EV_WRITE);
122525039b37SCy Schubert 	if(ub_event_add(dtio->event, NULL) != 0) {
122625039b37SCy Schubert 		log_err("dnstap io: out of memory (adding event)");
122725039b37SCy Schubert 		dtio->event_added = 0;
122825039b37SCy Schubert 		dtio->event_added_is_write = 0;
122925039b37SCy Schubert 		/* close output and start reattempts to open it */
123025039b37SCy Schubert 		dtio_close_output(dtio);
123125039b37SCy Schubert 		return 0;
123225039b37SCy Schubert 	}
123325039b37SCy Schubert 	dtio->event_added = 1;
123425039b37SCy Schubert 	dtio->event_added_is_write = 0;
123525039b37SCy Schubert 	return 1;
123625039b37SCy Schubert }
123725039b37SCy Schubert 
123825039b37SCy Schubert /** add the output file descriptor event for listening, read and write */
dtio_add_output_event_write(struct dt_io_thread * dtio)123925039b37SCy Schubert static int dtio_add_output_event_write(struct dt_io_thread* dtio)
124025039b37SCy Schubert {
124125039b37SCy Schubert 	if(!dtio->event)
124225039b37SCy Schubert 		return 0;
124325039b37SCy Schubert 	if(dtio->event_added && dtio->event_added_is_write)
124425039b37SCy Schubert 		return 1;
124525039b37SCy Schubert 	/* we have to (re-)register the event */
124625039b37SCy Schubert 	if(dtio->event_added)
124725039b37SCy Schubert 		ub_event_del(dtio->event);
124825039b37SCy Schubert 	ub_event_add_bits(dtio->event, UB_EV_WRITE);
124925039b37SCy Schubert 	if(ub_event_add(dtio->event, NULL) != 0) {
125025039b37SCy Schubert 		log_err("dnstap io: out of memory (adding event)");
125125039b37SCy Schubert 		dtio->event_added = 0;
125225039b37SCy Schubert 		dtio->event_added_is_write = 0;
125325039b37SCy Schubert 		/* close output and start reattempts to open it */
125425039b37SCy Schubert 		dtio_close_output(dtio);
125525039b37SCy Schubert 		return 0;
125625039b37SCy Schubert 	}
125725039b37SCy Schubert 	dtio->event_added = 1;
125825039b37SCy Schubert 	dtio->event_added_is_write = 1;
125925039b37SCy Schubert 	return 1;
126025039b37SCy Schubert }
126125039b37SCy Schubert 
126225039b37SCy Schubert /** put the dtio thread to sleep */
dtio_sleep(struct dt_io_thread * dtio)126325039b37SCy Schubert static void dtio_sleep(struct dt_io_thread* dtio)
126425039b37SCy Schubert {
126525039b37SCy Schubert 	/* unregister the event polling for write, because there is
126625039b37SCy Schubert 	 * nothing to be written */
126725039b37SCy Schubert 	(void)dtio_add_output_event_read(dtio);
1268*56850988SCy Schubert 
1269*56850988SCy Schubert 	/* Set wakeuptimer enabled off; so that the next worker thread that
1270*56850988SCy Schubert 	 * wants to log starts a timer if needed, since the writer thread
1271*56850988SCy Schubert 	 * has gone to sleep. */
1272*56850988SCy Schubert 	lock_basic_lock(&dtio->wakeup_timer_lock);
1273*56850988SCy Schubert 	dtio->wakeup_timer_enabled = 0;
1274*56850988SCy Schubert 	lock_basic_unlock(&dtio->wakeup_timer_lock);
127525039b37SCy Schubert }
127625039b37SCy Schubert 
#ifdef HAVE_SSL
/**
 * Turn on the brief read condition: set the ssl_brief_read flag and
 * listen on the connection without write interest.
 * @param dtio: the io thread state.
 * @return 1 on success, 0 if the event could not be (re-)added.
 */
static int dtio_enable_brief_read(struct dt_io_thread* dtio)
{
	struct ub_event* flushev;

	dtio->ssl_brief_read = 1;
	if(!dtio->stop_flush_event) {
		/* no stop flush event set, use the regular output event */
		return dtio_add_output_event_read(dtio);
	}
	/* a stop flush event is set; re-register that one without write */
	flushev = dtio->stop_flush_event;
	ub_event_del(flushev);
	ub_event_del_bits(flushev, UB_EV_WRITE);
	if(ub_event_add(flushev, NULL) == 0)
		return 1;
	log_err("dnstap io, stop flush, could not ub_event_add");
	return 0;
}
#endif /* HAVE_SSL */
129425039b37SCy Schubert 
#ifdef HAVE_SSL
/**
 * Turn off the brief read condition: clear the ssl_brief_read flag and
 * listen on the connection with write interest again.
 * @param dtio: the io thread state.
 * @return 1 on success, 0 if the event could not be (re-)added.
 */
static int dtio_disable_brief_read(struct dt_io_thread* dtio)
{
	struct ub_event* flushev;

	dtio->ssl_brief_read = 0;
	if(!dtio->stop_flush_event) {
		/* no stop flush event set, use the regular output event */
		return dtio_add_output_event_write(dtio);
	}
	/* a stop flush event is set; re-register that one with write */
	flushev = dtio->stop_flush_event;
	ub_event_del(flushev);
	ub_event_add_bits(flushev, UB_EV_WRITE);
	if(ub_event_add(flushev, NULL) == 0)
		return 1;
	log_err("dnstap io, stop flush, could not ub_event_add");
	return 0;
}
#endif /* HAVE_SSL */
131225039b37SCy Schubert 
#ifdef HAVE_SSL
/**
 * Turn on the brief write condition: set the ssl_brief_write flag and
 * register the output event with write interest.
 * @param dtio: the io thread state.
 * @return the result of dtio_add_output_event_write.
 */
static int dtio_enable_brief_write(struct dt_io_thread* dtio)
{
	int registered;

	dtio->ssl_brief_write = 1;
	registered = dtio_add_output_event_write(dtio);
	return registered;
}
#endif /* HAVE_SSL */
132125039b37SCy Schubert 
#ifdef HAVE_SSL
/**
 * Turn off the brief write condition: clear the ssl_brief_write flag and
 * register the output event without write interest.
 * @param dtio: the io thread state.
 * @return the result of dtio_add_output_event_read.
 */
static int dtio_disable_brief_write(struct dt_io_thread* dtio)
{
	int registered;

	dtio->ssl_brief_write = 0;
	registered = dtio_add_output_event_read(dtio);
	return registered;
}
#endif /* HAVE_SSL */
133025039b37SCy Schubert 
#ifdef HAVE_SSL
/**
 * Check the peer verification result once the ssl handshake is complete.
 * @param dtio: the io thread state with the ssl connection.
 * @return 1 if the connection can be used: either peer verification is
 *	not enabled on the ssl object, or verification succeeded and a peer
 *	certificate is present. 0 if verification failed or there is no
 *	peer certificate.
 */
static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
{
	X509* cert;
	int verified;

	if(!(SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
		/* the verify peer flag is not set on this ssl object,
		 * so the connection is unauthenticated */
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
			dtio->ip_str);
		return 1;
	}

	verified = (SSL_get_verify_result(dtio->ssl) == X509_V_OK);
	/* the certificate is released with X509_free below */
#ifdef HAVE_SSL_GET1_PEER_CERTIFICATE
	cert = SSL_get1_peer_certificate(dtio->ssl);
#else
	cert = SSL_get_peer_certificate(dtio->ssl);
#endif

	if(!verified) {
		/* log the certificate that failed, if there is one */
		if(cert) {
			log_cert(VERB_ALGO, "dnstap io, peer certificate",
				cert);
			X509_free(cert);
		}
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
			"failed: failed to authenticate", dtio->ip_str);
		return 0;
	}
	if(!cert) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
			"failed no certificate", dtio->ip_str);
		return 0;
	}

	log_cert(VERB_ALGO, "dnstap io, peer certificate", cert);
#ifdef HAVE_SSL_GET0_PEERNAME
	if(SSL_get0_peername(dtio->ssl)) {
		/* include the peer name in the log line */
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection to %s "
			"authenticated", dtio->ip_str,
			SSL_get0_peername(dtio->ssl));
		X509_free(cert);
		return 1;
	}
#endif
	verbose(VERB_ALGO, "dnstap io, %s, SSL connection authenticated",
		dtio->ip_str);
	X509_free(cert);
	return 1;
}
#endif /* HAVE_SSL */
139125039b37SCy Schubert 
#ifdef HAVE_SSL
/**
 * Perform (a step of) the ssl handshake on the dnstap output.
 * @param dtio: the io thread state with the ssl connection.
 * @param info: the stop flush state, or NULL. If not NULL, the stop flush
 *	event base is asked to exit when the handshake cannot continue.
 * @return 1 if the handshake is done and the peer check passed, so the
 *	caller may continue. 0 if the caller has to stop: either more
 *	handshake IO is needed, or the connection was closed.
 */
static int dtio_ssl_handshake(struct dt_io_thread* dtio,
	struct stop_flush_info* info)
{
	int rc, want;

	/* Assume a pending brief read condition is satisfied now; it is
	 * enabled again below if the handshake wants to read once more. */
	if(dtio->ssl_brief_read && !dtio_disable_brief_read(dtio)) {
		if(info) dtio_stop_flush_exit(info);
		return 0;
	}
	if(dtio->ssl_handshake_done)
		return 1;

	ERR_clear_error();
	rc = SSL_do_handshake(dtio->ssl);
	if(rc == 1) {
		/* handshake complete, check peer verification */
		dtio->ssl_handshake_done = 1;
		if(dtio_ssl_check_peer(dtio))
			return 1;
		goto closed;
	}

	want = SSL_get_error(dtio->ssl, rc);
	if(want == SSL_ERROR_WANT_READ) {
		/* we want to read on the connection */
		if(!dtio_enable_brief_read(dtio) && info)
			dtio_stop_flush_exit(info);
		return 0;
	}
	if(want == SSL_ERROR_WANT_WRITE) {
		/* we want to write on the connection */
		return 0;
	}

	if(rc == 0) {
		/* closed, nothing is logged for this case */
	} else if(want == SSL_ERROR_SYSCALL) {
		/* SYSCALL and errno==0 means closed uncleanly */
		int silent = (errno == 0);
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			silent = 1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			silent = 1; /* silence reset by peer */
#endif
		if(!silent)
			log_err("dnstap io, SSL_handshake syscall: %s",
				strerror(errno));
	} else {
		unsigned long err = ERR_get_error();
		if(!squelch_err_ssl_handshake(err)) {
			log_crypto_err_io_code("dnstap io, ssl handshake failed",
				want, err);
			verbose(VERB_OPS, "dnstap io, ssl handshake failed "
				"from %s", dtio->ip_str);
		}
	}

closed:
	/* shared teardown: leave the stop flush loop (if any), drop the
	 * output event, schedule a slow reconnect, close the output */
	if(info) dtio_stop_flush_exit(info);
	dtio_del_output_event(dtio);
	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
	dtio_close_output(dtio);
	return 0;
}
#endif /* HAVE_SSL */
148325039b37SCy Schubert 
148425039b37SCy Schubert /** callback for the dnstap events, to write to the output */
dtio_output_cb(int ATTR_UNUSED (fd),short bits,void * arg)148525039b37SCy Schubert void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
148625039b37SCy Schubert {
148725039b37SCy Schubert 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
148825039b37SCy Schubert 	int i;
148925039b37SCy Schubert 
149025039b37SCy Schubert 	if(dtio->check_nb_connect) {
149125039b37SCy Schubert 		int connect_err = dtio_check_nb_connect(dtio);
149225039b37SCy Schubert 		if(connect_err == -1) {
149325039b37SCy Schubert 			/* close the channel */
149425039b37SCy Schubert 			dtio_del_output_event(dtio);
149525039b37SCy Schubert 			dtio_close_output(dtio);
149625039b37SCy Schubert 			return;
149725039b37SCy Schubert 		} else if(connect_err == 0) {
149825039b37SCy Schubert 			/* try again later */
149925039b37SCy Schubert 			return;
150025039b37SCy Schubert 		}
150125039b37SCy Schubert 		/* nonblocking connect check passed, continue */
150225039b37SCy Schubert 	}
150325039b37SCy Schubert 
150425039b37SCy Schubert #ifdef HAVE_SSL
150525039b37SCy Schubert 	if(dtio->ssl &&
150625039b37SCy Schubert 		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
150725039b37SCy Schubert 		if(!dtio_ssl_handshake(dtio, NULL))
150825039b37SCy Schubert 			return;
150925039b37SCy Schubert 	}
151025039b37SCy Schubert #endif
151125039b37SCy Schubert 
151225039b37SCy Schubert 	if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1513*56850988SCy Schubert #ifdef HAVE_SSL
151425039b37SCy Schubert 		if(dtio->ssl_brief_write)
151525039b37SCy Schubert 			(void)dtio_disable_brief_write(dtio);
1516*56850988SCy Schubert #endif
151725039b37SCy Schubert 		if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
151825039b37SCy Schubert 			if(dtio_read_accept_frame(dtio) <= 0)
151925039b37SCy Schubert 				return;
152025039b37SCy Schubert 		} else if(!dtio_check_close(dtio))
152125039b37SCy Schubert 			return;
152225039b37SCy Schubert 	}
152325039b37SCy Schubert 
152425039b37SCy Schubert 	/* loop to process a number of messages.  This improves throughput,
152525039b37SCy Schubert 	 * because selecting on write-event if not needed for busy messages
152625039b37SCy Schubert 	 * (dnstap log) generation and if they need to all be written back.
152725039b37SCy Schubert 	 * The write event is usually not blocked up.  But not forever,
152825039b37SCy Schubert 	 * because the event loop needs to stay responsive for other events.
152925039b37SCy Schubert 	 * If there are no (more) messages, or if the output buffers get
153025039b37SCy Schubert 	 * full, it returns out of the loop. */
153125039b37SCy Schubert 	for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
153225039b37SCy Schubert 		/* see if there are messages that need writing */
153325039b37SCy Schubert 		if(!dtio->cur_msg) {
153425039b37SCy Schubert 			if(!dtio_find_msg(dtio)) {
153525039b37SCy Schubert 				if(i == 0) {
153625039b37SCy Schubert 					/* no messages on the first iteration,
153725039b37SCy Schubert 					 * the queues are all empty */
153825039b37SCy Schubert 					dtio_sleep(dtio);
1539*56850988SCy Schubert 					/* After putting to sleep, see if
1540*56850988SCy Schubert 					 * a message is in a message queue,
1541*56850988SCy Schubert 					 * if so, resume service. Stops a
1542*56850988SCy Schubert 					 * race condition where a thread could
1543*56850988SCy Schubert 					 * have one message but the dtio
1544*56850988SCy Schubert 					 * also just went to sleep. With the
1545*56850988SCy Schubert 					 * message queued between the
1546*56850988SCy Schubert 					 * dtio_find_msg and dtio_sleep
1547*56850988SCy Schubert 					 * calls. */
1548*56850988SCy Schubert 					if(dtio_find_msg(dtio)) {
1549*56850988SCy Schubert 						if(!dtio_add_output_event_write(dtio))
1550*56850988SCy Schubert 							return;
155125039b37SCy Schubert 					}
1552*56850988SCy Schubert 				}
1553*56850988SCy Schubert 				if(!dtio->cur_msg)
155425039b37SCy Schubert 					return; /* nothing to do */
155525039b37SCy Schubert 			}
155625039b37SCy Schubert 		}
155725039b37SCy Schubert 
155825039b37SCy Schubert 		/* write it */
155925039b37SCy Schubert 		if(dtio->cur_msg_done < dtio->cur_msg_len) {
156025039b37SCy Schubert 			if(!dtio_write_more(dtio))
156125039b37SCy Schubert 				return;
156225039b37SCy Schubert 		}
156325039b37SCy Schubert 
156425039b37SCy Schubert 		/* done with the current message */
156525039b37SCy Schubert 		dtio_cur_msg_free(dtio);
156625039b37SCy Schubert 
156725039b37SCy Schubert 		/* If this is a bidirectional stream the first message will be
156825039b37SCy Schubert 		 * the READY control frame. We can only continue writing after
156925039b37SCy Schubert 		 * receiving an ACCEPT control frame. */
157025039b37SCy Schubert 		if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
157125039b37SCy Schubert 			dtio->ready_frame_sent = 1;
157225039b37SCy Schubert 			(void)dtio_add_output_event_read(dtio);
157325039b37SCy Schubert 			break;
157425039b37SCy Schubert 		}
157525039b37SCy Schubert 	}
157625039b37SCy Schubert }
157725039b37SCy Schubert 
157825039b37SCy Schubert /** callback for the dnstap commandpipe, to stop the dnstap IO */
dtio_cmd_cb(int fd,short ATTR_UNUSED (bits),void * arg)157925039b37SCy Schubert void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
158025039b37SCy Schubert {
158125039b37SCy Schubert 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
158225039b37SCy Schubert 	uint8_t cmd;
158325039b37SCy Schubert 	ssize_t r;
158425039b37SCy Schubert 	if(dtio->want_to_exit)
158525039b37SCy Schubert 		return;
158625039b37SCy Schubert 	r = read(fd, &cmd, sizeof(cmd));
158725039b37SCy Schubert 	if(r == -1) {
158825039b37SCy Schubert #ifndef USE_WINSOCK
158925039b37SCy Schubert 		if(errno == EINTR || errno == EAGAIN)
159025039b37SCy Schubert 			return; /* ignore this */
159125039b37SCy Schubert #else
159225039b37SCy Schubert 		if(WSAGetLastError() == WSAEINPROGRESS)
159325039b37SCy Schubert 			return;
159425039b37SCy Schubert 		if(WSAGetLastError() == WSAEWOULDBLOCK)
159525039b37SCy Schubert 			return;
159625039b37SCy Schubert #endif
1597c0caa2e2SCy Schubert 		log_err("dnstap io: failed to read: %s", sock_strerror(errno));
159825039b37SCy Schubert 		/* and then fall through to quit the thread */
159925039b37SCy Schubert 	} else if(r == 0) {
160025039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap io: cmd channel closed");
160125039b37SCy Schubert 	} else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
160225039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
160325039b37SCy Schubert 	} else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
160425039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
160525039b37SCy Schubert 
160625039b37SCy Schubert 		if(dtio->is_bidirectional && !dtio->accept_frame_received) {
160725039b37SCy Schubert 			verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
160825039b37SCy Schubert 				"waiting for ACCEPT control frame");
160925039b37SCy Schubert 			return;
161025039b37SCy Schubert 		}
161125039b37SCy Schubert 
161225039b37SCy Schubert 		/* reregister event */
161325039b37SCy Schubert 		if(!dtio_add_output_event_write(dtio))
161425039b37SCy Schubert 			return;
161525039b37SCy Schubert 		return;
161625039b37SCy Schubert 	} else if(r == 1) {
161725039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
161825039b37SCy Schubert 	}
161925039b37SCy Schubert 	dtio->want_to_exit = 1;
162025039b37SCy Schubert 	if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
162125039b37SCy Schubert 		!= 0) {
162225039b37SCy Schubert 		log_err("dnstap io: could not loopexit");
162325039b37SCy Schubert 	}
162425039b37SCy Schubert }
162525039b37SCy Schubert 
162625039b37SCy Schubert #ifndef THREADS_DISABLED
162725039b37SCy Schubert /** setup the event base for the dnstap io thread */
dtio_setup_base(struct dt_io_thread * dtio,time_t * secs,struct timeval * now)162825039b37SCy Schubert static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
162925039b37SCy Schubert 	struct timeval* now)
163025039b37SCy Schubert {
163125039b37SCy Schubert 	memset(now, 0, sizeof(*now));
163225039b37SCy Schubert 	dtio->event_base = ub_default_event_base(0, secs, now);
163325039b37SCy Schubert 	if(!dtio->event_base) {
163425039b37SCy Schubert 		fatal_exit("dnstap io: could not create event_base");
163525039b37SCy Schubert 	}
163625039b37SCy Schubert }
163725039b37SCy Schubert #endif /* THREADS_DISABLED */
163825039b37SCy Schubert 
163925039b37SCy Schubert /** setup the cmd event for dnstap io */
dtio_setup_cmd(struct dt_io_thread * dtio)164025039b37SCy Schubert static void dtio_setup_cmd(struct dt_io_thread* dtio)
164125039b37SCy Schubert {
164225039b37SCy Schubert 	struct ub_event* cmdev;
164325039b37SCy Schubert 	fd_set_nonblock(dtio->commandpipe[0]);
164425039b37SCy Schubert 	cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
164525039b37SCy Schubert 		UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
164625039b37SCy Schubert 	if(!cmdev) {
164725039b37SCy Schubert 		fatal_exit("dnstap io: out of memory");
164825039b37SCy Schubert 	}
164925039b37SCy Schubert 	dtio->command_event = cmdev;
165025039b37SCy Schubert 	if(ub_event_add(cmdev, NULL) != 0) {
165125039b37SCy Schubert 		fatal_exit("dnstap io: out of memory (adding event)");
165225039b37SCy Schubert 	}
165325039b37SCy Schubert }
165425039b37SCy Schubert 
165525039b37SCy Schubert /** setup the reconnect event for dnstap io */
dtio_setup_reconnect(struct dt_io_thread * dtio)165625039b37SCy Schubert static void dtio_setup_reconnect(struct dt_io_thread* dtio)
165725039b37SCy Schubert {
165825039b37SCy Schubert 	dtio_reconnect_clear(dtio);
165925039b37SCy Schubert 	dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
166025039b37SCy Schubert 		UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
166125039b37SCy Schubert 	if(!dtio->reconnect_timer) {
166225039b37SCy Schubert 		fatal_exit("dnstap io: out of memory");
166325039b37SCy Schubert 	}
166425039b37SCy Schubert }
166525039b37SCy Schubert 
/**
 * State that is tracked while the stop flush is in progress.
 */
struct stop_flush_info {
	/** event base that is used during the stop flush */
	struct ub_event_base* base;
	/** set once exit of the stop flush event base has been requested */
	int want_to_exit_flush;
	/** set when the stop flush timer has fired */
	int timer_done;
	/** the dnstap io thread state */
	struct dt_io_thread* dtio;
	/** buffer with the stop control frame */
	void* stop_frame;
	/** number of bytes in the stop control frame */
	size_t stop_frame_len;
	/** number of bytes of the stop control frame handled so far */
	size_t stop_frame_done;
};
168525039b37SCy Schubert 
168625039b37SCy Schubert /** exit the stop flush base */
dtio_stop_flush_exit(struct stop_flush_info * info)168725039b37SCy Schubert static void dtio_stop_flush_exit(struct stop_flush_info* info)
168825039b37SCy Schubert {
168925039b37SCy Schubert 	if(info->want_to_exit_flush)
169025039b37SCy Schubert 		return;
169125039b37SCy Schubert 	info->want_to_exit_flush = 1;
169225039b37SCy Schubert 	if(ub_event_base_loopexit(info->base) != 0) {
169325039b37SCy Schubert 		log_err("dnstap io: could not loopexit");
169425039b37SCy Schubert 	}
169525039b37SCy Schubert }
169625039b37SCy Schubert 
169725039b37SCy Schubert /** send the stop control,
169825039b37SCy Schubert  * return true if completed the frame. */
dtio_control_stop_send(struct stop_flush_info * info)169925039b37SCy Schubert static int dtio_control_stop_send(struct stop_flush_info* info)
170025039b37SCy Schubert {
170125039b37SCy Schubert 	struct dt_io_thread* dtio = info->dtio;
170225039b37SCy Schubert 	int r;
170325039b37SCy Schubert 	if(info->stop_frame_done >= info->stop_frame_len)
170425039b37SCy Schubert 		return 1;
170525039b37SCy Schubert 	r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
170625039b37SCy Schubert 		info->stop_frame_done, info->stop_frame_len -
170725039b37SCy Schubert 		info->stop_frame_done);
170825039b37SCy Schubert 	if(r == -1) {
170925039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
171025039b37SCy Schubert 		dtio_stop_flush_exit(info);
171125039b37SCy Schubert 		return 0;
171225039b37SCy Schubert 	}
171325039b37SCy Schubert 	if(r == 0) {
171425039b37SCy Schubert 		/* try again later, or timeout */
171525039b37SCy Schubert 		return 0;
171625039b37SCy Schubert 	}
171725039b37SCy Schubert 	info->stop_frame_done += r;
171825039b37SCy Schubert 	if(info->stop_frame_done < info->stop_frame_len)
171925039b37SCy Schubert 		return 0; /* not done yet */
172025039b37SCy Schubert 	return 1;
172125039b37SCy Schubert }
172225039b37SCy Schubert 
dtio_stop_timer_cb(int ATTR_UNUSED (fd),short ATTR_UNUSED (bits),void * arg)172325039b37SCy Schubert void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
172425039b37SCy Schubert 	void* arg)
172525039b37SCy Schubert {
172625039b37SCy Schubert 	struct stop_flush_info* info = (struct stop_flush_info*)arg;
172725039b37SCy Schubert 	if(info->want_to_exit_flush)
172825039b37SCy Schubert 		return;
172925039b37SCy Schubert 	verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
173025039b37SCy Schubert 	info->timer_done = 1;
173125039b37SCy Schubert 	dtio_stop_flush_exit(info);
173225039b37SCy Schubert }
173325039b37SCy Schubert 
dtio_stop_ev_cb(int ATTR_UNUSED (fd),short bits,void * arg)173425039b37SCy Schubert void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
173525039b37SCy Schubert {
173625039b37SCy Schubert 	struct stop_flush_info* info = (struct stop_flush_info*)arg;
173725039b37SCy Schubert 	struct dt_io_thread* dtio = info->dtio;
173825039b37SCy Schubert 	if(info->want_to_exit_flush)
173925039b37SCy Schubert 		return;
174025039b37SCy Schubert 	if(dtio->check_nb_connect) {
174125039b37SCy Schubert 		/* we don't start the stop_flush if connect still
174225039b37SCy Schubert 		 * in progress, but the check code is here, just in case */
174325039b37SCy Schubert 		int connect_err = dtio_check_nb_connect(dtio);
174425039b37SCy Schubert 		if(connect_err == -1) {
174525039b37SCy Schubert 			/* close the channel, exit the stop flush */
174625039b37SCy Schubert 			dtio_stop_flush_exit(info);
174725039b37SCy Schubert 			dtio_del_output_event(dtio);
174825039b37SCy Schubert 			dtio_close_output(dtio);
174925039b37SCy Schubert 			return;
175025039b37SCy Schubert 		} else if(connect_err == 0) {
175125039b37SCy Schubert 			/* try again later */
175225039b37SCy Schubert 			return;
175325039b37SCy Schubert 		}
175425039b37SCy Schubert 		/* nonblocking connect check passed, continue */
175525039b37SCy Schubert 	}
175625039b37SCy Schubert #ifdef HAVE_SSL
175725039b37SCy Schubert 	if(dtio->ssl &&
175825039b37SCy Schubert 		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
175925039b37SCy Schubert 		if(!dtio_ssl_handshake(dtio, info))
176025039b37SCy Schubert 			return;
176125039b37SCy Schubert 	}
176225039b37SCy Schubert #endif
176325039b37SCy Schubert 
176425039b37SCy Schubert 	if((bits&UB_EV_READ)) {
176525039b37SCy Schubert 		if(!dtio_check_close(dtio)) {
176625039b37SCy Schubert 			if(dtio->fd == -1) {
176725039b37SCy Schubert 				verbose(VERB_ALGO, "dnstap io: "
176825039b37SCy Schubert 					"stop flush: output closed");
176925039b37SCy Schubert 				dtio_stop_flush_exit(info);
177025039b37SCy Schubert 			}
177125039b37SCy Schubert 			return;
177225039b37SCy Schubert 		}
177325039b37SCy Schubert 	}
177425039b37SCy Schubert 	/* write remainder of last frame */
177525039b37SCy Schubert 	if(dtio->cur_msg) {
177625039b37SCy Schubert 		if(dtio->cur_msg_done < dtio->cur_msg_len) {
177725039b37SCy Schubert 			if(!dtio_write_more(dtio)) {
177825039b37SCy Schubert 				if(dtio->fd == -1) {
177925039b37SCy Schubert 					verbose(VERB_ALGO, "dnstap io: "
178025039b37SCy Schubert 						"stop flush: output closed");
178125039b37SCy Schubert 					dtio_stop_flush_exit(info);
178225039b37SCy Schubert 				}
178325039b37SCy Schubert 				return;
178425039b37SCy Schubert 			}
178525039b37SCy Schubert 		}
178625039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap io: stop flush completed "
178725039b37SCy Schubert 			"last frame");
178825039b37SCy Schubert 		dtio_cur_msg_free(dtio);
178925039b37SCy Schubert 	}
179025039b37SCy Schubert 	/* write stop frame */
179125039b37SCy Schubert 	if(info->stop_frame_done < info->stop_frame_len) {
179225039b37SCy Schubert 		if(!dtio_control_stop_send(info))
179325039b37SCy Schubert 			return;
179425039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap io: stop flush completed "
179525039b37SCy Schubert 			"stop control frame");
179625039b37SCy Schubert 	}
179725039b37SCy Schubert 	/* when last frame and stop frame are sent, exit */
179825039b37SCy Schubert 	dtio_stop_flush_exit(info);
179925039b37SCy Schubert }
180025039b37SCy Schubert 
180125039b37SCy Schubert /** flush at end, last packet and stop control */
dtio_control_stop_flush(struct dt_io_thread * dtio)180225039b37SCy Schubert static void dtio_control_stop_flush(struct dt_io_thread* dtio)
180325039b37SCy Schubert {
180425039b37SCy Schubert 	/* briefly attempt to flush the previous packet to the output,
180525039b37SCy Schubert 	 * this could be a partial packet, or even the start control frame */
180625039b37SCy Schubert 	time_t secs = 0;
180725039b37SCy Schubert 	struct timeval now;
180825039b37SCy Schubert 	struct stop_flush_info info;
180925039b37SCy Schubert 	struct timeval tv;
181025039b37SCy Schubert 	struct ub_event* timer, *stopev;
181125039b37SCy Schubert 
181225039b37SCy Schubert 	if(dtio->fd == -1 || dtio->check_nb_connect) {
181325039b37SCy Schubert 		/* no connection or we have just connected, so nothing is
181425039b37SCy Schubert 		 * sent yet, so nothing to stop or flush */
181525039b37SCy Schubert 		return;
181625039b37SCy Schubert 	}
181725039b37SCy Schubert 	if(dtio->ssl && !dtio->ssl_handshake_done) {
181825039b37SCy Schubert 		/* no SSL connection has been established yet */
181925039b37SCy Schubert 		return;
182025039b37SCy Schubert 	}
182125039b37SCy Schubert 
182225039b37SCy Schubert 	memset(&info, 0, sizeof(info));
182325039b37SCy Schubert 	memset(&now, 0, sizeof(now));
182425039b37SCy Schubert 	info.dtio = dtio;
182525039b37SCy Schubert 	info.base = ub_default_event_base(0, &secs, &now);
182625039b37SCy Schubert 	if(!info.base) {
182725039b37SCy Schubert 		log_err("dnstap io: malloc failure");
182825039b37SCy Schubert 		return;
182925039b37SCy Schubert 	}
183025039b37SCy Schubert 	timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
183125039b37SCy Schubert 		&dtio_stop_timer_cb, &info);
183225039b37SCy Schubert 	if(!timer) {
183325039b37SCy Schubert 		log_err("dnstap io: malloc failure");
183425039b37SCy Schubert 		ub_event_base_free(info.base);
183525039b37SCy Schubert 		return;
183625039b37SCy Schubert 	}
183725039b37SCy Schubert 	memset(&tv, 0, sizeof(tv));
183825039b37SCy Schubert 	tv.tv_sec = 2;
183925039b37SCy Schubert 	if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
184025039b37SCy Schubert 		&tv) != 0) {
184125039b37SCy Schubert 		log_err("dnstap io: cannot event_timer_add");
184225039b37SCy Schubert 		ub_event_free(timer);
184325039b37SCy Schubert 		ub_event_base_free(info.base);
184425039b37SCy Schubert 		return;
184525039b37SCy Schubert 	}
184625039b37SCy Schubert 	stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
184725039b37SCy Schubert 		UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
184825039b37SCy Schubert 	if(!stopev) {
184925039b37SCy Schubert 		log_err("dnstap io: malloc failure");
185025039b37SCy Schubert 		ub_timer_del(timer);
185125039b37SCy Schubert 		ub_event_free(timer);
185225039b37SCy Schubert 		ub_event_base_free(info.base);
185325039b37SCy Schubert 		return;
185425039b37SCy Schubert 	}
185525039b37SCy Schubert 	if(ub_event_add(stopev, NULL) != 0) {
185625039b37SCy Schubert 		log_err("dnstap io: cannot event_add");
185725039b37SCy Schubert 		ub_event_free(stopev);
185825039b37SCy Schubert 		ub_timer_del(timer);
185925039b37SCy Schubert 		ub_event_free(timer);
186025039b37SCy Schubert 		ub_event_base_free(info.base);
186125039b37SCy Schubert 		return;
186225039b37SCy Schubert 	}
186325039b37SCy Schubert 	info.stop_frame = fstrm_create_control_frame_stop(
186425039b37SCy Schubert 		&info.stop_frame_len);
186525039b37SCy Schubert 	if(!info.stop_frame) {
186625039b37SCy Schubert 		log_err("dnstap io: malloc failure");
186725039b37SCy Schubert 		ub_event_del(stopev);
186825039b37SCy Schubert 		ub_event_free(stopev);
186925039b37SCy Schubert 		ub_timer_del(timer);
187025039b37SCy Schubert 		ub_event_free(timer);
187125039b37SCy Schubert 		ub_event_base_free(info.base);
187225039b37SCy Schubert 		return;
187325039b37SCy Schubert 	}
187425039b37SCy Schubert 	dtio->stop_flush_event = stopev;
187525039b37SCy Schubert 
187625039b37SCy Schubert 	/* wait briefly, or until finished */
187725039b37SCy Schubert 	verbose(VERB_ALGO, "dnstap io: stop flush started");
187825039b37SCy Schubert 	if(ub_event_base_dispatch(info.base) < 0) {
187925039b37SCy Schubert 		log_err("dnstap io: dispatch flush failed, errno is %s",
188025039b37SCy Schubert 			strerror(errno));
188125039b37SCy Schubert 	}
188225039b37SCy Schubert 	verbose(VERB_ALGO, "dnstap io: stop flush ended");
188325039b37SCy Schubert 	free(info.stop_frame);
188425039b37SCy Schubert 	dtio->stop_flush_event = NULL;
188525039b37SCy Schubert 	ub_event_del(stopev);
188625039b37SCy Schubert 	ub_event_free(stopev);
188725039b37SCy Schubert 	ub_timer_del(timer);
188825039b37SCy Schubert 	ub_event_free(timer);
188925039b37SCy Schubert 	ub_event_base_free(info.base);
189025039b37SCy Schubert }
189125039b37SCy Schubert 
189225039b37SCy Schubert /** perform desetup and free stuff when the dnstap io thread exits */
dtio_desetup(struct dt_io_thread * dtio)189325039b37SCy Schubert static void dtio_desetup(struct dt_io_thread* dtio)
189425039b37SCy Schubert {
189525039b37SCy Schubert 	dtio_control_stop_flush(dtio);
189625039b37SCy Schubert 	dtio_del_output_event(dtio);
189725039b37SCy Schubert 	dtio_close_output(dtio);
189825039b37SCy Schubert 	ub_event_del(dtio->command_event);
189925039b37SCy Schubert 	ub_event_free(dtio->command_event);
190025039b37SCy Schubert #ifndef USE_WINSOCK
190125039b37SCy Schubert 	close(dtio->commandpipe[0]);
190225039b37SCy Schubert #else
190325039b37SCy Schubert 	_close(dtio->commandpipe[0]);
190425039b37SCy Schubert #endif
190525039b37SCy Schubert 	dtio->commandpipe[0] = -1;
190625039b37SCy Schubert 	dtio_reconnect_del(dtio);
190725039b37SCy Schubert 	ub_event_free(dtio->reconnect_timer);
190825039b37SCy Schubert 	dtio_cur_msg_free(dtio);
190925039b37SCy Schubert #ifndef THREADS_DISABLED
191025039b37SCy Schubert 	ub_event_base_free(dtio->event_base);
191125039b37SCy Schubert #endif
191225039b37SCy Schubert }
191325039b37SCy Schubert 
191425039b37SCy Schubert /** setup a start control message */
dtio_control_start_send(struct dt_io_thread * dtio)191525039b37SCy Schubert static int dtio_control_start_send(struct dt_io_thread* dtio)
191625039b37SCy Schubert {
191725039b37SCy Schubert 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
191825039b37SCy Schubert 	dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
191925039b37SCy Schubert 		&dtio->cur_msg_len);
192025039b37SCy Schubert 	if(!dtio->cur_msg) {
192125039b37SCy Schubert 		return 0;
192225039b37SCy Schubert 	}
192325039b37SCy Schubert 	/* setup to send the control message */
192425039b37SCy Schubert 	/* set that the buffer needs to be sent, but the length
192525039b37SCy Schubert 	 * of that buffer is already written, that way the buffer can
192625039b37SCy Schubert 	 * start with 0 length and then the length of the control frame
192725039b37SCy Schubert 	 * in it */
192825039b37SCy Schubert 	dtio->cur_msg_done = 0;
192925039b37SCy Schubert 	dtio->cur_msg_len_done = 4;
193025039b37SCy Schubert 	return 1;
193125039b37SCy Schubert }
193225039b37SCy Schubert 
193325039b37SCy Schubert /** setup a ready control message */
dtio_control_ready_send(struct dt_io_thread * dtio)193425039b37SCy Schubert static int dtio_control_ready_send(struct dt_io_thread* dtio)
193525039b37SCy Schubert {
193625039b37SCy Schubert 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
193725039b37SCy Schubert 	dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
193825039b37SCy Schubert 		&dtio->cur_msg_len);
193925039b37SCy Schubert 	if(!dtio->cur_msg) {
194025039b37SCy Schubert 		return 0;
194125039b37SCy Schubert 	}
194225039b37SCy Schubert 	/* setup to send the control message */
194325039b37SCy Schubert 	/* set that the buffer needs to be sent, but the length
194425039b37SCy Schubert 	 * of that buffer is already written, that way the buffer can
194525039b37SCy Schubert 	 * start with 0 length and then the length of the control frame
194625039b37SCy Schubert 	 * in it */
194725039b37SCy Schubert 	dtio->cur_msg_done = 0;
194825039b37SCy Schubert 	dtio->cur_msg_len_done = 4;
194925039b37SCy Schubert 	return 1;
195025039b37SCy Schubert }
195125039b37SCy Schubert 
/**
 * Open the output file descriptor for af_local.
 * Creates an AF_LOCAL stream socket in dtio->fd, makes it nonblocking and
 * connects it to dtio->socket_path.
 * @return 1 when connected.  0 on failure, in which case a socket that was
 *	created is closed again; also 0 when built without sys/un.h.
 */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifndef HAVE_SYS_UN_H
	log_err("cannot create af_local socket");
	return 0;
#else
	struct sockaddr_un sa;

	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("dnstap io: failed to create socket: %s",
			sock_strerror(errno));
		return 0;
	}

	memset(&sa, 0, sizeof(sa));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	/* this member exists on BSDs, not Linux */
	sa.sun_len = (unsigned)sizeof(sa);
#endif
	sa.sun_family = AF_LOCAL;
	/* length is 92-108, 104 on FreeBSD */
	(void)strlcpy(sa.sun_path, dtio->socket_path, sizeof(sa.sun_path));

	fd_set_nonblock(dtio->fd);
	if(connect(dtio->fd, (struct sockaddr*)&sa, (socklen_t)sizeof(sa))
		!= -1)
		return 1;

	/* connect failed; no log for retries on low verbosity */
	if(dtio->reconnect_timeout <= DTIO_RECONNECT_TIMEOUT_MIN ||
		verbosity >= 4) {
		log_err("dnstap io: failed to connect to \"%s\": %s",
			dtio->socket_path, sock_strerror(errno));
	}
	dtio_close_fd(dtio);
	return 0;
#endif /* HAVE_SYS_UN_H */
}
199125039b37SCy Schubert 
199225039b37SCy Schubert /** open the output file descriptor for af_inet and af_inet6 */
dtio_open_output_tcp(struct dt_io_thread * dtio)199325039b37SCy Schubert static int dtio_open_output_tcp(struct dt_io_thread* dtio)
199425039b37SCy Schubert {
199525039b37SCy Schubert 	struct sockaddr_storage addr;
199625039b37SCy Schubert 	socklen_t addrlen;
199725039b37SCy Schubert 	memset(&addr, 0, sizeof(addr));
199825039b37SCy Schubert 	addrlen = (socklen_t)sizeof(addr);
199925039b37SCy Schubert 
2000865f46b2SCy Schubert 	if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen, UNBOUND_DNS_PORT)) {
200125039b37SCy Schubert 		log_err("could not parse IP '%s'", dtio->ip_str);
200225039b37SCy Schubert 		return 0;
200325039b37SCy Schubert 	}
200425039b37SCy Schubert 	dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
200525039b37SCy Schubert 	if(dtio->fd == -1) {
2006c0caa2e2SCy Schubert 		log_err("can't create socket: %s", sock_strerror(errno));
200725039b37SCy Schubert 		return 0;
200825039b37SCy Schubert 	}
200925039b37SCy Schubert 	fd_set_nonblock(dtio->fd);
201025039b37SCy Schubert 	if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
201125039b37SCy Schubert 		if(errno == EINPROGRESS)
201225039b37SCy Schubert 			return 1; /* wait until connect done*/
2013c0caa2e2SCy Schubert 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
2014c0caa2e2SCy Schubert 			verbosity < 4) {
2015c0caa2e2SCy Schubert 			dtio_close_fd(dtio);
2016c0caa2e2SCy Schubert 			return 0; /* no log retries on low verbosity */
2017c0caa2e2SCy Schubert 		}
201825039b37SCy Schubert #ifndef USE_WINSOCK
201925039b37SCy Schubert 		if(tcp_connect_errno_needs_log(
202025039b37SCy Schubert 			(struct sockaddr *)&addr, addrlen)) {
202125039b37SCy Schubert 			log_err("dnstap io: failed to connect to %s: %s",
202225039b37SCy Schubert 				dtio->ip_str, strerror(errno));
202325039b37SCy Schubert 		}
202425039b37SCy Schubert #else
202525039b37SCy Schubert 		if(WSAGetLastError() == WSAEINPROGRESS ||
202625039b37SCy Schubert 			WSAGetLastError() == WSAEWOULDBLOCK)
202725039b37SCy Schubert 			return 1; /* wait until connect done*/
202825039b37SCy Schubert 		if(tcp_connect_errno_needs_log(
202925039b37SCy Schubert 			(struct sockaddr *)&addr, addrlen)) {
203025039b37SCy Schubert 			log_err("dnstap io: failed to connect to %s: %s",
203125039b37SCy Schubert 				dtio->ip_str, wsa_strerror(WSAGetLastError()));
203225039b37SCy Schubert 		}
203325039b37SCy Schubert #endif
203425039b37SCy Schubert 		dtio_close_fd(dtio);
203525039b37SCy Schubert 		return 0;
203625039b37SCy Schubert 	}
203725039b37SCy Schubert 	return 1;
203825039b37SCy Schubert }
203925039b37SCy Schubert 
204025039b37SCy Schubert /** setup the SSL structure for new connection */
dtio_setup_ssl(struct dt_io_thread * dtio)204125039b37SCy Schubert static int dtio_setup_ssl(struct dt_io_thread* dtio)
204225039b37SCy Schubert {
204325039b37SCy Schubert 	dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
204425039b37SCy Schubert 	if(!dtio->ssl) return 0;
204525039b37SCy Schubert 	dtio->ssl_handshake_done = 0;
204625039b37SCy Schubert 	dtio->ssl_brief_read = 0;
204725039b37SCy Schubert 
204825039b37SCy Schubert 	if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
204925039b37SCy Schubert 		dtio->tls_use_sni)) {
205025039b37SCy Schubert 		return 0;
205125039b37SCy Schubert 	}
205225039b37SCy Schubert 	return 1;
205325039b37SCy Schubert }
205425039b37SCy Schubert 
205525039b37SCy Schubert /** open the output file descriptor */
dtio_open_output(struct dt_io_thread * dtio)205625039b37SCy Schubert static void dtio_open_output(struct dt_io_thread* dtio)
205725039b37SCy Schubert {
205825039b37SCy Schubert 	struct ub_event* ev;
205925039b37SCy Schubert 	if(dtio->upstream_is_unix) {
206025039b37SCy Schubert 		if(!dtio_open_output_local(dtio)) {
206125039b37SCy Schubert 			dtio_reconnect_enable(dtio);
206225039b37SCy Schubert 			return;
206325039b37SCy Schubert 		}
206425039b37SCy Schubert 	} else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
206525039b37SCy Schubert 		if(!dtio_open_output_tcp(dtio)) {
206625039b37SCy Schubert 			dtio_reconnect_enable(dtio);
206725039b37SCy Schubert 			return;
206825039b37SCy Schubert 		}
206925039b37SCy Schubert 		if(dtio->upstream_is_tls) {
207025039b37SCy Schubert 			if(!dtio_setup_ssl(dtio)) {
207125039b37SCy Schubert 				dtio_close_fd(dtio);
207225039b37SCy Schubert 				dtio_reconnect_enable(dtio);
207325039b37SCy Schubert 				return;
207425039b37SCy Schubert 			}
207525039b37SCy Schubert 		}
207625039b37SCy Schubert 	}
207725039b37SCy Schubert 	dtio->check_nb_connect = 1;
207825039b37SCy Schubert 
207925039b37SCy Schubert 	/* the EV_READ is to read ACCEPT control messages, and catch channel
208025039b37SCy Schubert 	 * close. EV_WRITE is to write packets */
208125039b37SCy Schubert 	ev = ub_event_new(dtio->event_base, dtio->fd,
208225039b37SCy Schubert 		UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
208325039b37SCy Schubert 		dtio);
208425039b37SCy Schubert 	if(!ev) {
208525039b37SCy Schubert 		log_err("dnstap io: out of memory");
208625039b37SCy Schubert 		if(dtio->ssl) {
208725039b37SCy Schubert #ifdef HAVE_SSL
208825039b37SCy Schubert 			SSL_free(dtio->ssl);
208925039b37SCy Schubert 			dtio->ssl = NULL;
209025039b37SCy Schubert #endif
209125039b37SCy Schubert 		}
209225039b37SCy Schubert 		dtio_close_fd(dtio);
209325039b37SCy Schubert 		dtio_reconnect_enable(dtio);
209425039b37SCy Schubert 		return;
209525039b37SCy Schubert 	}
209625039b37SCy Schubert 	dtio->event = ev;
209725039b37SCy Schubert 
209825039b37SCy Schubert 	/* setup protocol control message to start */
209925039b37SCy Schubert 	if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
210025039b37SCy Schubert 		(dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
210125039b37SCy Schubert 		log_err("dnstap io: out of memory");
210225039b37SCy Schubert 		ub_event_free(dtio->event);
210325039b37SCy Schubert 		dtio->event = NULL;
210425039b37SCy Schubert 		if(dtio->ssl) {
210525039b37SCy Schubert #ifdef HAVE_SSL
210625039b37SCy Schubert 			SSL_free(dtio->ssl);
210725039b37SCy Schubert 			dtio->ssl = NULL;
210825039b37SCy Schubert #endif
210925039b37SCy Schubert 		}
211025039b37SCy Schubert 		dtio_close_fd(dtio);
211125039b37SCy Schubert 		dtio_reconnect_enable(dtio);
211225039b37SCy Schubert 		return;
211325039b37SCy Schubert 	}
211425039b37SCy Schubert }
211525039b37SCy Schubert 
/**
 * Perform the setup of the writer thread on the established event_base:
 * the command event, the reconnect timer and the output, and then ask for
 * the output write event to be added.
 */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
	dtio_setup_cmd(dtio);
	dtio_setup_reconnect(dtio);
	dtio_open_output(dtio);
	/* the result is not acted upon, there is nothing left to do here
	 * whether or not the event was added */
	(void)dtio_add_output_event_write(dtio);
}
212525039b37SCy Schubert 
212625039b37SCy Schubert #ifndef THREADS_DISABLED
212725039b37SCy Schubert /** the IO thread function for the DNSTAP IO */
dnstap_io(void * arg)212825039b37SCy Schubert static void* dnstap_io(void* arg)
212925039b37SCy Schubert {
213025039b37SCy Schubert 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
213125039b37SCy Schubert 	time_t secs = 0;
213225039b37SCy Schubert 	struct timeval now;
213325039b37SCy Schubert 	log_thread_set(&dtio->threadnum);
213425039b37SCy Schubert 
213525039b37SCy Schubert 	/* setup */
213625039b37SCy Schubert 	verbose(VERB_ALGO, "start dnstap io thread");
213725039b37SCy Schubert 	dtio_setup_base(dtio, &secs, &now);
213825039b37SCy Schubert 	dtio_setup_on_base(dtio);
213925039b37SCy Schubert 
214025039b37SCy Schubert 	/* run */
214125039b37SCy Schubert 	if(ub_event_base_dispatch(dtio->event_base) < 0) {
214225039b37SCy Schubert 		log_err("dnstap io: dispatch failed, errno is %s",
214325039b37SCy Schubert 			strerror(errno));
214425039b37SCy Schubert 	}
214525039b37SCy Schubert 
214625039b37SCy Schubert 	/* cleanup */
214725039b37SCy Schubert 	verbose(VERB_ALGO, "stop dnstap io thread");
214825039b37SCy Schubert 	dtio_desetup(dtio);
214925039b37SCy Schubert 	return NULL;
215025039b37SCy Schubert }
215125039b37SCy Schubert #endif /* THREADS_DISABLED */
215225039b37SCy Schubert 
dt_io_thread_start(struct dt_io_thread * dtio,void * event_base_nothr,int numworkers)215325039b37SCy Schubert int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
215425039b37SCy Schubert 	int numworkers)
215525039b37SCy Schubert {
215625039b37SCy Schubert 	/* set up the thread, can fail */
215725039b37SCy Schubert #ifndef USE_WINSOCK
215825039b37SCy Schubert 	if(pipe(dtio->commandpipe) == -1) {
215925039b37SCy Schubert 		log_err("failed to create pipe: %s", strerror(errno));
216025039b37SCy Schubert 		return 0;
216125039b37SCy Schubert 	}
216225039b37SCy Schubert #else
216325039b37SCy Schubert 	if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
216425039b37SCy Schubert 		log_err("failed to create _pipe: %s",
216525039b37SCy Schubert 			wsa_strerror(WSAGetLastError()));
216625039b37SCy Schubert 		return 0;
216725039b37SCy Schubert 	}
216825039b37SCy Schubert #endif
216925039b37SCy Schubert 
217025039b37SCy Schubert 	/* start the thread */
217125039b37SCy Schubert 	dtio->threadnum = numworkers+1;
217225039b37SCy Schubert 	dtio->started = 1;
217325039b37SCy Schubert #ifndef THREADS_DISABLED
217425039b37SCy Schubert 	ub_thread_create(&dtio->tid, dnstap_io, dtio);
217525039b37SCy Schubert 	(void)event_base_nothr;
217625039b37SCy Schubert #else
217725039b37SCy Schubert 	dtio->event_base = event_base_nothr;
217825039b37SCy Schubert 	dtio_setup_on_base(dtio);
217925039b37SCy Schubert #endif
218025039b37SCy Schubert 	return 1;
218125039b37SCy Schubert }
218225039b37SCy Schubert 
dt_io_thread_stop(struct dt_io_thread * dtio)218325039b37SCy Schubert void dt_io_thread_stop(struct dt_io_thread* dtio)
218425039b37SCy Schubert {
218525039b37SCy Schubert #ifndef THREADS_DISABLED
218625039b37SCy Schubert 	uint8_t cmd = DTIO_COMMAND_STOP;
218725039b37SCy Schubert #endif
218825039b37SCy Schubert 	if(!dtio) return;
218925039b37SCy Schubert 	if(!dtio->started) return;
219025039b37SCy Schubert 	verbose(VERB_ALGO, "dnstap io: send stop cmd");
219125039b37SCy Schubert 
219225039b37SCy Schubert #ifndef THREADS_DISABLED
219325039b37SCy Schubert 	while(1) {
219425039b37SCy Schubert 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
219525039b37SCy Schubert 		if(r == -1) {
219625039b37SCy Schubert #ifndef USE_WINSOCK
219725039b37SCy Schubert 			if(errno == EINTR || errno == EAGAIN)
219825039b37SCy Schubert 				continue;
219925039b37SCy Schubert #else
220025039b37SCy Schubert 			if(WSAGetLastError() == WSAEINPROGRESS)
220125039b37SCy Schubert 				continue;
220225039b37SCy Schubert 			if(WSAGetLastError() == WSAEWOULDBLOCK)
220325039b37SCy Schubert 				continue;
220425039b37SCy Schubert #endif
2205c0caa2e2SCy Schubert 			log_err("dnstap io stop: write: %s",
2206c0caa2e2SCy Schubert 				sock_strerror(errno));
220725039b37SCy Schubert 			break;
220825039b37SCy Schubert 		}
220925039b37SCy Schubert 		break;
221025039b37SCy Schubert 	}
221125039b37SCy Schubert 	dtio->started = 0;
221225039b37SCy Schubert #endif /* THREADS_DISABLED */
221325039b37SCy Schubert 
221425039b37SCy Schubert #ifndef USE_WINSOCK
221525039b37SCy Schubert 	close(dtio->commandpipe[1]);
221625039b37SCy Schubert #else
221725039b37SCy Schubert 	_close(dtio->commandpipe[1]);
221825039b37SCy Schubert #endif
221925039b37SCy Schubert 	dtio->commandpipe[1] = -1;
222025039b37SCy Schubert #ifndef THREADS_DISABLED
222125039b37SCy Schubert 	ub_thread_join(dtio->tid);
222225039b37SCy Schubert #else
222325039b37SCy Schubert 	dtio->want_to_exit = 1;
222425039b37SCy Schubert 	dtio_desetup(dtio);
222525039b37SCy Schubert #endif
222625039b37SCy Schubert }
2227