xref: /freebsd/contrib/unbound/dnstap/dtstream.c (revision 25039b37d3883b8fdae50475cbea41a255a08ee2)
1*25039b37SCy Schubert /*
2*25039b37SCy Schubert  * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3*25039b37SCy Schubert  *
4*25039b37SCy Schubert  * Copyright (c) 2020, NLnet Labs. All rights reserved.
5*25039b37SCy Schubert  *
6*25039b37SCy Schubert  * This software is open source.
7*25039b37SCy Schubert  *
8*25039b37SCy Schubert  * Redistribution and use in source and binary forms, with or without
9*25039b37SCy Schubert  * modification, are permitted provided that the following conditions
10*25039b37SCy Schubert  * are met:
11*25039b37SCy Schubert  *
12*25039b37SCy Schubert  * Redistributions of source code must retain the above copyright notice,
13*25039b37SCy Schubert  * this list of conditions and the following disclaimer.
14*25039b37SCy Schubert  *
15*25039b37SCy Schubert  * Redistributions in binary form must reproduce the above copyright notice,
16*25039b37SCy Schubert  * this list of conditions and the following disclaimer in the documentation
17*25039b37SCy Schubert  * and/or other materials provided with the distribution.
18*25039b37SCy Schubert  *
19*25039b37SCy Schubert  * Neither the name of the NLNET LABS nor the names of its contributors may
20*25039b37SCy Schubert  * be used to endorse or promote products derived from this software without
21*25039b37SCy Schubert  * specific prior written permission.
22*25039b37SCy Schubert  *
23*25039b37SCy Schubert  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24*25039b37SCy Schubert  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25*25039b37SCy Schubert  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26*25039b37SCy Schubert  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27*25039b37SCy Schubert  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28*25039b37SCy Schubert  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29*25039b37SCy Schubert  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30*25039b37SCy Schubert  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31*25039b37SCy Schubert  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32*25039b37SCy Schubert  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33*25039b37SCy Schubert  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34*25039b37SCy Schubert  *
35*25039b37SCy Schubert  */
36*25039b37SCy Schubert 
37*25039b37SCy Schubert /**
38*25039b37SCy Schubert  * \file
39*25039b37SCy Schubert  *
40*25039b37SCy Schubert  * An implementation of the Frame Streams data transport protocol for
41*25039b37SCy Schubert  * the Unbound DNSTAP message logging facility.
42*25039b37SCy Schubert  */
43*25039b37SCy Schubert 
44*25039b37SCy Schubert #include "config.h"
45*25039b37SCy Schubert #include "dnstap/dtstream.h"
46*25039b37SCy Schubert #include "dnstap/dnstap_fstrm.h"
47*25039b37SCy Schubert #include "util/config_file.h"
48*25039b37SCy Schubert #include "util/ub_event.h"
49*25039b37SCy Schubert #include "util/net_help.h"
50*25039b37SCy Schubert #include "services/outside_network.h"
51*25039b37SCy Schubert #include "sldns/sbuffer.h"
52*25039b37SCy Schubert #ifdef HAVE_SYS_UN_H
53*25039b37SCy Schubert #include <sys/un.h>
54*25039b37SCy Schubert #endif
55*25039b37SCy Schubert #include <fcntl.h>
56*25039b37SCy Schubert #ifdef HAVE_OPENSSL_SSL_H
57*25039b37SCy Schubert #include <openssl/ssl.h>
58*25039b37SCy Schubert #endif
59*25039b37SCy Schubert #ifdef HAVE_OPENSSL_ERR_H
60*25039b37SCy Schubert #include <openssl/err.h>
61*25039b37SCy Schubert #endif
62*25039b37SCy Schubert 
63*25039b37SCy Schubert /** number of messages to process in one output callback */
64*25039b37SCy Schubert #define DTIO_MESSAGES_PER_CALLBACK 100
65*25039b37SCy Schubert /** the msec to wait for reconnect (if not immediate, the first attempt) */
66*25039b37SCy Schubert #define DTIO_RECONNECT_TIMEOUT_MIN 10
67*25039b37SCy Schubert /** the msec to wait for reconnect max after backoff */
68*25039b37SCy Schubert #define DTIO_RECONNECT_TIMEOUT_MAX 1000
69*25039b37SCy Schubert /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
70*25039b37SCy Schubert #define DTIO_RECONNECT_TIMEOUT_SLOW 1000
71*25039b37SCy Schubert 
72*25039b37SCy Schubert /** maximum length of received frame */
73*25039b37SCy Schubert #define DTIO_RECV_FRAME_MAX_LEN 1000
74*25039b37SCy Schubert 
75*25039b37SCy Schubert struct stop_flush_info;
76*25039b37SCy Schubert /** DTIO command channel commands */
77*25039b37SCy Schubert enum {
78*25039b37SCy Schubert 	/** DTIO command channel stop */
79*25039b37SCy Schubert 	DTIO_COMMAND_STOP = 0,
80*25039b37SCy Schubert 	/** DTIO command channel wakeup */
81*25039b37SCy Schubert 	DTIO_COMMAND_WAKEUP = 1
82*25039b37SCy Schubert } dtio_channel_command;
83*25039b37SCy Schubert 
84*25039b37SCy Schubert /** open the output channel */
85*25039b37SCy Schubert static void dtio_open_output(struct dt_io_thread* dtio);
86*25039b37SCy Schubert /** add output event for read and write */
87*25039b37SCy Schubert static int dtio_add_output_event_write(struct dt_io_thread* dtio);
88*25039b37SCy Schubert /** start reconnection attempts */
89*25039b37SCy Schubert static void dtio_reconnect_enable(struct dt_io_thread* dtio);
90*25039b37SCy Schubert /** stop from stop_flush event loop */
91*25039b37SCy Schubert static void dtio_stop_flush_exit(struct stop_flush_info* info);
92*25039b37SCy Schubert /** setup a start control message */
93*25039b37SCy Schubert static int dtio_control_start_send(struct dt_io_thread* dtio);
94*25039b37SCy Schubert #ifdef HAVE_SSL
95*25039b37SCy Schubert /** enable briefly waiting for a read event, for SSL negotiation */
96*25039b37SCy Schubert static int dtio_enable_brief_read(struct dt_io_thread* dtio);
97*25039b37SCy Schubert /** enable briefly waiting for a write event, for SSL negotiation */
98*25039b37SCy Schubert static int dtio_enable_brief_write(struct dt_io_thread* dtio);
99*25039b37SCy Schubert #endif
100*25039b37SCy Schubert 
101*25039b37SCy Schubert struct dt_msg_queue*
102*25039b37SCy Schubert dt_msg_queue_create(void)
103*25039b37SCy Schubert {
104*25039b37SCy Schubert 	struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
105*25039b37SCy Schubert 	if(!mq) return NULL;
106*25039b37SCy Schubert 	mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
107*25039b37SCy Schubert 		about 1 M should contain 64K messages with some overhead,
108*25039b37SCy Schubert 		or a whole bunch smaller ones */
109*25039b37SCy Schubert 	lock_basic_init(&mq->lock);
110*25039b37SCy Schubert 	lock_protect(&mq->lock, mq, sizeof(*mq));
111*25039b37SCy Schubert 	return mq;
112*25039b37SCy Schubert }
113*25039b37SCy Schubert 
114*25039b37SCy Schubert /** clear the message list, caller must hold the lock */
115*25039b37SCy Schubert static void
116*25039b37SCy Schubert dt_msg_queue_clear(struct dt_msg_queue* mq)
117*25039b37SCy Schubert {
118*25039b37SCy Schubert 	struct dt_msg_entry* e = mq->first, *next=NULL;
119*25039b37SCy Schubert 	while(e) {
120*25039b37SCy Schubert 		next = e->next;
121*25039b37SCy Schubert 		free(e->buf);
122*25039b37SCy Schubert 		free(e);
123*25039b37SCy Schubert 		e = next;
124*25039b37SCy Schubert 	}
125*25039b37SCy Schubert 	mq->first = NULL;
126*25039b37SCy Schubert 	mq->last = NULL;
127*25039b37SCy Schubert 	mq->cursize = 0;
128*25039b37SCy Schubert }
129*25039b37SCy Schubert 
130*25039b37SCy Schubert void
131*25039b37SCy Schubert dt_msg_queue_delete(struct dt_msg_queue* mq)
132*25039b37SCy Schubert {
133*25039b37SCy Schubert 	if(!mq) return;
134*25039b37SCy Schubert 	lock_basic_destroy(&mq->lock);
135*25039b37SCy Schubert 	dt_msg_queue_clear(mq);
136*25039b37SCy Schubert 	free(mq);
137*25039b37SCy Schubert }
138*25039b37SCy Schubert 
139*25039b37SCy Schubert /** make the dtio wake up by sending a wakeup command */
140*25039b37SCy Schubert static void dtio_wakeup(struct dt_io_thread* dtio)
141*25039b37SCy Schubert {
142*25039b37SCy Schubert 	uint8_t cmd = DTIO_COMMAND_WAKEUP;
143*25039b37SCy Schubert 	if(!dtio) return;
144*25039b37SCy Schubert 	if(!dtio->started) return;
145*25039b37SCy Schubert 
146*25039b37SCy Schubert 	while(1) {
147*25039b37SCy Schubert 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
148*25039b37SCy Schubert 		if(r == -1) {
149*25039b37SCy Schubert #ifndef USE_WINSOCK
150*25039b37SCy Schubert 			if(errno == EINTR || errno == EAGAIN)
151*25039b37SCy Schubert 				continue;
152*25039b37SCy Schubert 			log_err("dnstap io wakeup: write: %s", strerror(errno));
153*25039b37SCy Schubert #else
154*25039b37SCy Schubert 			if(WSAGetLastError() == WSAEINPROGRESS)
155*25039b37SCy Schubert 				continue;
156*25039b37SCy Schubert 			if(WSAGetLastError() == WSAEWOULDBLOCK)
157*25039b37SCy Schubert 				continue;
158*25039b37SCy Schubert 			log_err("dnstap io stop: write: %s",
159*25039b37SCy Schubert 				wsa_strerror(WSAGetLastError()));
160*25039b37SCy Schubert #endif
161*25039b37SCy Schubert 			break;
162*25039b37SCy Schubert 		}
163*25039b37SCy Schubert 		break;
164*25039b37SCy Schubert 	}
165*25039b37SCy Schubert }
166*25039b37SCy Schubert 
167*25039b37SCy Schubert void
168*25039b37SCy Schubert dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
169*25039b37SCy Schubert {
170*25039b37SCy Schubert 	int wakeup = 0;
171*25039b37SCy Schubert 	struct dt_msg_entry* entry;
172*25039b37SCy Schubert 
173*25039b37SCy Schubert 	/* check conditions */
174*25039b37SCy Schubert 	if(!buf) return;
175*25039b37SCy Schubert 	if(len == 0) {
176*25039b37SCy Schubert 		/* it is not possible to log entries with zero length,
177*25039b37SCy Schubert 		 * because the framestream protocol does not carry it.
178*25039b37SCy Schubert 		 * However the protobuf serialization does not create zero
179*25039b37SCy Schubert 		 * length datagrams for dnstap, so this should not happen. */
180*25039b37SCy Schubert 		free(buf);
181*25039b37SCy Schubert 		return;
182*25039b37SCy Schubert 	}
183*25039b37SCy Schubert 	if(!mq) {
184*25039b37SCy Schubert 		free(buf);
185*25039b37SCy Schubert 		return;
186*25039b37SCy Schubert 	}
187*25039b37SCy Schubert 
188*25039b37SCy Schubert 	/* allocate memory for queue entry */
189*25039b37SCy Schubert 	entry = malloc(sizeof(*entry));
190*25039b37SCy Schubert 	if(!entry) {
191*25039b37SCy Schubert 		log_err("out of memory logging dnstap");
192*25039b37SCy Schubert 		free(buf);
193*25039b37SCy Schubert 		return;
194*25039b37SCy Schubert 	}
195*25039b37SCy Schubert 	entry->next = NULL;
196*25039b37SCy Schubert 	entry->buf = buf;
197*25039b37SCy Schubert 	entry->len = len;
198*25039b37SCy Schubert 
199*25039b37SCy Schubert 	/* aqcuire lock */
200*25039b37SCy Schubert 	lock_basic_lock(&mq->lock);
201*25039b37SCy Schubert 	/* list was empty, wakeup dtio */
202*25039b37SCy Schubert 	if(mq->first == NULL)
203*25039b37SCy Schubert 		wakeup = 1;
204*25039b37SCy Schubert 	/* see if it is going to fit */
205*25039b37SCy Schubert 	if(mq->cursize + len > mq->maxsize) {
206*25039b37SCy Schubert 		/* buffer full, or congested. */
207*25039b37SCy Schubert 		/* drop */
208*25039b37SCy Schubert 		lock_basic_unlock(&mq->lock);
209*25039b37SCy Schubert 		free(buf);
210*25039b37SCy Schubert 		free(entry);
211*25039b37SCy Schubert 		return;
212*25039b37SCy Schubert 	}
213*25039b37SCy Schubert 	mq->cursize += len;
214*25039b37SCy Schubert 	/* append to list */
215*25039b37SCy Schubert 	if(mq->last) {
216*25039b37SCy Schubert 		mq->last->next = entry;
217*25039b37SCy Schubert 	} else {
218*25039b37SCy Schubert 		mq->first = entry;
219*25039b37SCy Schubert 	}
220*25039b37SCy Schubert 	mq->last = entry;
221*25039b37SCy Schubert 	/* release lock */
222*25039b37SCy Schubert 	lock_basic_unlock(&mq->lock);
223*25039b37SCy Schubert 
224*25039b37SCy Schubert 	if(wakeup)
225*25039b37SCy Schubert 		dtio_wakeup(mq->dtio);
226*25039b37SCy Schubert }
227*25039b37SCy Schubert 
228*25039b37SCy Schubert struct dt_io_thread* dt_io_thread_create(void)
229*25039b37SCy Schubert {
230*25039b37SCy Schubert 	struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
231*25039b37SCy Schubert 	return dtio;
232*25039b37SCy Schubert }
233*25039b37SCy Schubert 
234*25039b37SCy Schubert void dt_io_thread_delete(struct dt_io_thread* dtio)
235*25039b37SCy Schubert {
236*25039b37SCy Schubert 	struct dt_io_list_item* item, *nextitem;
237*25039b37SCy Schubert 	if(!dtio) return;
238*25039b37SCy Schubert 	item=dtio->io_list;
239*25039b37SCy Schubert 	while(item) {
240*25039b37SCy Schubert 		nextitem = item->next;
241*25039b37SCy Schubert 		free(item);
242*25039b37SCy Schubert 		item = nextitem;
243*25039b37SCy Schubert 	}
244*25039b37SCy Schubert 	free(dtio->socket_path);
245*25039b37SCy Schubert 	free(dtio->ip_str);
246*25039b37SCy Schubert 	free(dtio->tls_server_name);
247*25039b37SCy Schubert 	free(dtio->client_key_file);
248*25039b37SCy Schubert 	free(dtio->client_cert_file);
249*25039b37SCy Schubert 	if(dtio->ssl_ctx) {
250*25039b37SCy Schubert #ifdef HAVE_SSL
251*25039b37SCy Schubert 		SSL_CTX_free(dtio->ssl_ctx);
252*25039b37SCy Schubert #endif
253*25039b37SCy Schubert 	}
254*25039b37SCy Schubert 	free(dtio);
255*25039b37SCy Schubert }
256*25039b37SCy Schubert 
257*25039b37SCy Schubert int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
258*25039b37SCy Schubert {
259*25039b37SCy Schubert 	if(!cfg->dnstap) {
260*25039b37SCy Schubert 		log_warn("cannot setup dnstap because dnstap-enable is no");
261*25039b37SCy Schubert 		return 0;
262*25039b37SCy Schubert 	}
263*25039b37SCy Schubert 
264*25039b37SCy Schubert 	/* what type of connectivity do we have */
265*25039b37SCy Schubert 	if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
266*25039b37SCy Schubert 		if(cfg->dnstap_tls)
267*25039b37SCy Schubert 			dtio->upstream_is_tls = 1;
268*25039b37SCy Schubert 		else	dtio->upstream_is_tcp = 1;
269*25039b37SCy Schubert 	} else {
270*25039b37SCy Schubert 		dtio->upstream_is_unix = 1;
271*25039b37SCy Schubert 	}
272*25039b37SCy Schubert 	dtio->is_bidirectional = cfg->dnstap_bidirectional;
273*25039b37SCy Schubert 
274*25039b37SCy Schubert 	if(dtio->upstream_is_unix) {
275*25039b37SCy Schubert 		if(!cfg->dnstap_socket_path ||
276*25039b37SCy Schubert 			cfg->dnstap_socket_path[0]==0) {
277*25039b37SCy Schubert 			log_err("dnstap setup: no dnstap-socket-path for "
278*25039b37SCy Schubert 				"socket connect");
279*25039b37SCy Schubert 			return 0;
280*25039b37SCy Schubert 		}
281*25039b37SCy Schubert 		free(dtio->socket_path);
282*25039b37SCy Schubert 		dtio->socket_path = strdup(cfg->dnstap_socket_path);
283*25039b37SCy Schubert 		if(!dtio->socket_path) {
284*25039b37SCy Schubert 			log_err("dnstap setup: malloc failure");
285*25039b37SCy Schubert 			return 0;
286*25039b37SCy Schubert 		}
287*25039b37SCy Schubert 	}
288*25039b37SCy Schubert 
289*25039b37SCy Schubert 	if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
290*25039b37SCy Schubert 		if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
291*25039b37SCy Schubert 			log_err("dnstap setup: no dnstap-ip for TCP connect");
292*25039b37SCy Schubert 			return 0;
293*25039b37SCy Schubert 		}
294*25039b37SCy Schubert 		free(dtio->ip_str);
295*25039b37SCy Schubert 		dtio->ip_str = strdup(cfg->dnstap_ip);
296*25039b37SCy Schubert 		if(!dtio->ip_str) {
297*25039b37SCy Schubert 			log_err("dnstap setup: malloc failure");
298*25039b37SCy Schubert 			return 0;
299*25039b37SCy Schubert 		}
300*25039b37SCy Schubert 	}
301*25039b37SCy Schubert 
302*25039b37SCy Schubert 	if(dtio->upstream_is_tls) {
303*25039b37SCy Schubert #ifdef HAVE_SSL
304*25039b37SCy Schubert 		if(cfg->dnstap_tls_server_name &&
305*25039b37SCy Schubert 			cfg->dnstap_tls_server_name[0]) {
306*25039b37SCy Schubert 			free(dtio->tls_server_name);
307*25039b37SCy Schubert 			dtio->tls_server_name = strdup(
308*25039b37SCy Schubert 				cfg->dnstap_tls_server_name);
309*25039b37SCy Schubert 			if(!dtio->tls_server_name) {
310*25039b37SCy Schubert 				log_err("dnstap setup: malloc failure");
311*25039b37SCy Schubert 				return 0;
312*25039b37SCy Schubert 			}
313*25039b37SCy Schubert 			if(!check_auth_name_for_ssl(dtio->tls_server_name))
314*25039b37SCy Schubert 				return 0;
315*25039b37SCy Schubert 		}
316*25039b37SCy Schubert 		if(cfg->dnstap_tls_client_key_file &&
317*25039b37SCy Schubert 			cfg->dnstap_tls_client_key_file[0]) {
318*25039b37SCy Schubert 			dtio->use_client_certs = 1;
319*25039b37SCy Schubert 			free(dtio->client_key_file);
320*25039b37SCy Schubert 			dtio->client_key_file = strdup(
321*25039b37SCy Schubert 				cfg->dnstap_tls_client_key_file);
322*25039b37SCy Schubert 			if(!dtio->client_key_file) {
323*25039b37SCy Schubert 				log_err("dnstap setup: malloc failure");
324*25039b37SCy Schubert 				return 0;
325*25039b37SCy Schubert 			}
326*25039b37SCy Schubert 			if(!cfg->dnstap_tls_client_cert_file ||
327*25039b37SCy Schubert 				cfg->dnstap_tls_client_cert_file[0]==0) {
328*25039b37SCy Schubert 				log_err("dnstap setup: client key "
329*25039b37SCy Schubert 					"authentication enabled with "
330*25039b37SCy Schubert 					"dnstap-tls-client-key-file, but "
331*25039b37SCy Schubert 					"no dnstap-tls-client-cert-file "
332*25039b37SCy Schubert 					"is given");
333*25039b37SCy Schubert 				return 0;
334*25039b37SCy Schubert 			}
335*25039b37SCy Schubert 			free(dtio->client_cert_file);
336*25039b37SCy Schubert 			dtio->client_cert_file = strdup(
337*25039b37SCy Schubert 				cfg->dnstap_tls_client_cert_file);
338*25039b37SCy Schubert 			if(!dtio->client_cert_file) {
339*25039b37SCy Schubert 				log_err("dnstap setup: malloc failure");
340*25039b37SCy Schubert 				return 0;
341*25039b37SCy Schubert 			}
342*25039b37SCy Schubert 		} else {
343*25039b37SCy Schubert 			dtio->use_client_certs = 0;
344*25039b37SCy Schubert 			dtio->client_key_file = NULL;
345*25039b37SCy Schubert 			dtio->client_cert_file = NULL;
346*25039b37SCy Schubert 		}
347*25039b37SCy Schubert 
348*25039b37SCy Schubert 		if(cfg->dnstap_tls_cert_bundle) {
349*25039b37SCy Schubert 			dtio->ssl_ctx = connect_sslctx_create(
350*25039b37SCy Schubert 				dtio->client_key_file,
351*25039b37SCy Schubert 				dtio->client_cert_file,
352*25039b37SCy Schubert 				cfg->dnstap_tls_cert_bundle, 0);
353*25039b37SCy Schubert 		} else {
354*25039b37SCy Schubert 			dtio->ssl_ctx = connect_sslctx_create(
355*25039b37SCy Schubert 				dtio->client_key_file,
356*25039b37SCy Schubert 				dtio->client_cert_file,
357*25039b37SCy Schubert 				cfg->tls_cert_bundle, cfg->tls_win_cert);
358*25039b37SCy Schubert 		}
359*25039b37SCy Schubert 		if(!dtio->ssl_ctx) {
360*25039b37SCy Schubert 			log_err("could not setup SSL CTX");
361*25039b37SCy Schubert 			return 0;
362*25039b37SCy Schubert 		}
363*25039b37SCy Schubert 		dtio->tls_use_sni = cfg->tls_use_sni;
364*25039b37SCy Schubert #endif /* HAVE_SSL */
365*25039b37SCy Schubert 	}
366*25039b37SCy Schubert 	return 1;
367*25039b37SCy Schubert }
368*25039b37SCy Schubert 
369*25039b37SCy Schubert int dt_io_thread_register_queue(struct dt_io_thread* dtio,
370*25039b37SCy Schubert         struct dt_msg_queue* mq)
371*25039b37SCy Schubert {
372*25039b37SCy Schubert 	struct dt_io_list_item* item = malloc(sizeof(*item));
373*25039b37SCy Schubert 	if(!item) return 0;
374*25039b37SCy Schubert 	lock_basic_lock(&mq->lock);
375*25039b37SCy Schubert 	mq->dtio = dtio;
376*25039b37SCy Schubert 	lock_basic_unlock(&mq->lock);
377*25039b37SCy Schubert 	item->queue = mq;
378*25039b37SCy Schubert 	item->next = dtio->io_list;
379*25039b37SCy Schubert 	dtio->io_list = item;
380*25039b37SCy Schubert 	dtio->io_list_iter = NULL;
381*25039b37SCy Schubert 	return 1;
382*25039b37SCy Schubert }
383*25039b37SCy Schubert 
384*25039b37SCy Schubert void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
385*25039b37SCy Schubert         struct dt_msg_queue* mq)
386*25039b37SCy Schubert {
387*25039b37SCy Schubert 	struct dt_io_list_item* item, *prev=NULL;
388*25039b37SCy Schubert 	if(!dtio) return;
389*25039b37SCy Schubert 	item = dtio->io_list;
390*25039b37SCy Schubert 	while(item) {
391*25039b37SCy Schubert 		if(item->queue == mq) {
392*25039b37SCy Schubert 			/* found it */
393*25039b37SCy Schubert 			if(prev) prev->next = item->next;
394*25039b37SCy Schubert 			else dtio->io_list = item->next;
395*25039b37SCy Schubert 			/* the queue itself only registered, not deleted */
396*25039b37SCy Schubert 			lock_basic_lock(&item->queue->lock);
397*25039b37SCy Schubert 			item->queue->dtio = NULL;
398*25039b37SCy Schubert 			lock_basic_unlock(&item->queue->lock);
399*25039b37SCy Schubert 			free(item);
400*25039b37SCy Schubert 			dtio->io_list_iter = NULL;
401*25039b37SCy Schubert 			return;
402*25039b37SCy Schubert 		}
403*25039b37SCy Schubert 		prev = item;
404*25039b37SCy Schubert 		item = item->next;
405*25039b37SCy Schubert 	}
406*25039b37SCy Schubert }
407*25039b37SCy Schubert 
408*25039b37SCy Schubert /** pick a message from the queue, the routine locks and unlocks,
409*25039b37SCy Schubert  * returns true if there is a message */
410*25039b37SCy Schubert static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
411*25039b37SCy Schubert 	size_t* len)
412*25039b37SCy Schubert {
413*25039b37SCy Schubert 	lock_basic_lock(&mq->lock);
414*25039b37SCy Schubert 	if(mq->first) {
415*25039b37SCy Schubert 		struct dt_msg_entry* entry = mq->first;
416*25039b37SCy Schubert 		mq->first = entry->next;
417*25039b37SCy Schubert 		if(!entry->next) mq->last = NULL;
418*25039b37SCy Schubert 		mq->cursize -= entry->len;
419*25039b37SCy Schubert 		lock_basic_unlock(&mq->lock);
420*25039b37SCy Schubert 
421*25039b37SCy Schubert 		*buf = entry->buf;
422*25039b37SCy Schubert 		*len = entry->len;
423*25039b37SCy Schubert 		free(entry);
424*25039b37SCy Schubert 		return 1;
425*25039b37SCy Schubert 	}
426*25039b37SCy Schubert 	lock_basic_unlock(&mq->lock);
427*25039b37SCy Schubert 	return 0;
428*25039b37SCy Schubert }
429*25039b37SCy Schubert 
430*25039b37SCy Schubert /** find message in queue, false if no message, true if message to send */
431*25039b37SCy Schubert static int dtio_find_in_queue(struct dt_io_thread* dtio,
432*25039b37SCy Schubert 	struct dt_msg_queue* mq)
433*25039b37SCy Schubert {
434*25039b37SCy Schubert 	void* buf=NULL;
435*25039b37SCy Schubert 	size_t len=0;
436*25039b37SCy Schubert 	if(dt_msg_queue_pop(mq, &buf, &len)) {
437*25039b37SCy Schubert 		dtio->cur_msg = buf;
438*25039b37SCy Schubert 		dtio->cur_msg_len = len;
439*25039b37SCy Schubert 		dtio->cur_msg_done = 0;
440*25039b37SCy Schubert 		dtio->cur_msg_len_done = 0;
441*25039b37SCy Schubert 		return 1;
442*25039b37SCy Schubert 	}
443*25039b37SCy Schubert 	return 0;
444*25039b37SCy Schubert }
445*25039b37SCy Schubert 
446*25039b37SCy Schubert /** find a new message to write, search message queues, false if none */
447*25039b37SCy Schubert static int dtio_find_msg(struct dt_io_thread* dtio)
448*25039b37SCy Schubert {
449*25039b37SCy Schubert 	struct dt_io_list_item *spot, *item;
450*25039b37SCy Schubert 
451*25039b37SCy Schubert 	spot = dtio->io_list_iter;
452*25039b37SCy Schubert 	/* use the next queue for the next message lookup,
453*25039b37SCy Schubert 	 * if we hit the end(NULL) the NULL restarts the iter at start. */
454*25039b37SCy Schubert 	if(spot)
455*25039b37SCy Schubert 		dtio->io_list_iter = spot->next;
456*25039b37SCy Schubert 	else if(dtio->io_list)
457*25039b37SCy Schubert 		dtio->io_list_iter = dtio->io_list->next;
458*25039b37SCy Schubert 
459*25039b37SCy Schubert 	/* scan from spot to end-of-io_list */
460*25039b37SCy Schubert 	item = spot;
461*25039b37SCy Schubert 	while(item) {
462*25039b37SCy Schubert 		if(dtio_find_in_queue(dtio, item->queue))
463*25039b37SCy Schubert 			return 1;
464*25039b37SCy Schubert 		item = item->next;
465*25039b37SCy Schubert 	}
466*25039b37SCy Schubert 	/* scan starting at the start-of-list (to wrap around the end) */
467*25039b37SCy Schubert 	item = dtio->io_list;
468*25039b37SCy Schubert 	while(item) {
469*25039b37SCy Schubert 		if(dtio_find_in_queue(dtio, item->queue))
470*25039b37SCy Schubert 			return 1;
471*25039b37SCy Schubert 		item = item->next;
472*25039b37SCy Schubert 	}
473*25039b37SCy Schubert 	return 0;
474*25039b37SCy Schubert }
475*25039b37SCy Schubert 
476*25039b37SCy Schubert /** callback for the dnstap reconnect, to start reconnecting to output */
477*25039b37SCy Schubert void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
478*25039b37SCy Schubert 	short ATTR_UNUSED(bits), void* arg)
479*25039b37SCy Schubert {
480*25039b37SCy Schubert 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
481*25039b37SCy Schubert 	dtio->reconnect_is_added = 0;
482*25039b37SCy Schubert 	verbose(VERB_ALGO, "dnstap io: reconnect timer");
483*25039b37SCy Schubert 
484*25039b37SCy Schubert 	dtio_open_output(dtio);
485*25039b37SCy Schubert 	if(dtio->event) {
486*25039b37SCy Schubert 		if(!dtio_add_output_event_write(dtio))
487*25039b37SCy Schubert 			return;
488*25039b37SCy Schubert 		/* nothing wrong so far, wait on the output event */
489*25039b37SCy Schubert 		return;
490*25039b37SCy Schubert 	}
491*25039b37SCy Schubert 	/* exponential backoff and retry on timer */
492*25039b37SCy Schubert 	dtio_reconnect_enable(dtio);
493*25039b37SCy Schubert }
494*25039b37SCy Schubert 
495*25039b37SCy Schubert /** attempt to reconnect to the output, after a timeout */
496*25039b37SCy Schubert static void dtio_reconnect_enable(struct dt_io_thread* dtio)
497*25039b37SCy Schubert {
498*25039b37SCy Schubert 	struct timeval tv;
499*25039b37SCy Schubert 	int msec;
500*25039b37SCy Schubert 	if(dtio->want_to_exit) return;
501*25039b37SCy Schubert 	if(dtio->reconnect_is_added)
502*25039b37SCy Schubert 		return; /* already done */
503*25039b37SCy Schubert 
504*25039b37SCy Schubert 	/* exponential backoff, store the value for next timeout */
505*25039b37SCy Schubert 	msec = dtio->reconnect_timeout;
506*25039b37SCy Schubert 	if(msec == 0) {
507*25039b37SCy Schubert 		dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
508*25039b37SCy Schubert 	} else {
509*25039b37SCy Schubert 		dtio->reconnect_timeout = msec*2;
510*25039b37SCy Schubert 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
511*25039b37SCy Schubert 			dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
512*25039b37SCy Schubert 	}
513*25039b37SCy Schubert 	verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
514*25039b37SCy Schubert 		msec);
515*25039b37SCy Schubert 
516*25039b37SCy Schubert 	/* setup wait timer */
517*25039b37SCy Schubert 	memset(&tv, 0, sizeof(tv));
518*25039b37SCy Schubert 	tv.tv_sec = msec/1000;
519*25039b37SCy Schubert 	tv.tv_usec = (msec%1000)*1000;
520*25039b37SCy Schubert 	if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
521*25039b37SCy Schubert 		&dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
522*25039b37SCy Schubert 		log_err("dnstap io: could not reconnect ev timer add");
523*25039b37SCy Schubert 		return;
524*25039b37SCy Schubert 	}
525*25039b37SCy Schubert 	dtio->reconnect_is_added = 1;
526*25039b37SCy Schubert }
527*25039b37SCy Schubert 
528*25039b37SCy Schubert /** remove dtio reconnect timer */
529*25039b37SCy Schubert static void dtio_reconnect_del(struct dt_io_thread* dtio)
530*25039b37SCy Schubert {
531*25039b37SCy Schubert 	if(!dtio->reconnect_is_added)
532*25039b37SCy Schubert 		return;
533*25039b37SCy Schubert 	ub_timer_del(dtio->reconnect_timer);
534*25039b37SCy Schubert 	dtio->reconnect_is_added = 0;
535*25039b37SCy Schubert }
536*25039b37SCy Schubert 
537*25039b37SCy Schubert /** clear the reconnect exponential backoff timer.
538*25039b37SCy Schubert  * We have successfully connected so we can try again with short timeouts. */
539*25039b37SCy Schubert static void dtio_reconnect_clear(struct dt_io_thread* dtio)
540*25039b37SCy Schubert {
541*25039b37SCy Schubert 	dtio->reconnect_timeout = 0;
542*25039b37SCy Schubert 	dtio_reconnect_del(dtio);
543*25039b37SCy Schubert }
544*25039b37SCy Schubert 
545*25039b37SCy Schubert /** reconnect slowly, because we already know we have to wait for a bit */
546*25039b37SCy Schubert static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
547*25039b37SCy Schubert {
548*25039b37SCy Schubert 	dtio_reconnect_del(dtio);
549*25039b37SCy Schubert 	dtio->reconnect_timeout = msec;
550*25039b37SCy Schubert 	dtio_reconnect_enable(dtio);
551*25039b37SCy Schubert }
552*25039b37SCy Schubert 
553*25039b37SCy Schubert /** delete the current message in the dtio, and reset counters */
554*25039b37SCy Schubert static void dtio_cur_msg_free(struct dt_io_thread* dtio)
555*25039b37SCy Schubert {
556*25039b37SCy Schubert 	free(dtio->cur_msg);
557*25039b37SCy Schubert 	dtio->cur_msg = NULL;
558*25039b37SCy Schubert 	dtio->cur_msg_len = 0;
559*25039b37SCy Schubert 	dtio->cur_msg_done = 0;
560*25039b37SCy Schubert 	dtio->cur_msg_len_done = 0;
561*25039b37SCy Schubert }
562*25039b37SCy Schubert 
563*25039b37SCy Schubert /** delete the buffer and counters used to read frame */
564*25039b37SCy Schubert static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
565*25039b37SCy Schubert {
566*25039b37SCy Schubert 	if(rb->buf) {
567*25039b37SCy Schubert 		free(rb->buf);
568*25039b37SCy Schubert 		rb->buf = NULL;
569*25039b37SCy Schubert 	}
570*25039b37SCy Schubert 	rb->buf_count = 0;
571*25039b37SCy Schubert 	rb->buf_cap = 0;
572*25039b37SCy Schubert 	rb->frame_len = 0;
573*25039b37SCy Schubert 	rb->frame_len_done = 0;
574*25039b37SCy Schubert 	rb->control_frame = 0;
575*25039b37SCy Schubert }
576*25039b37SCy Schubert 
577*25039b37SCy Schubert /** del the output file descriptor event for listening */
578*25039b37SCy Schubert static void dtio_del_output_event(struct dt_io_thread* dtio)
579*25039b37SCy Schubert {
580*25039b37SCy Schubert 	if(!dtio->event_added)
581*25039b37SCy Schubert 		return;
582*25039b37SCy Schubert 	ub_event_del(dtio->event);
583*25039b37SCy Schubert 	dtio->event_added = 0;
584*25039b37SCy Schubert 	dtio->event_added_is_write = 0;
585*25039b37SCy Schubert }
586*25039b37SCy Schubert 
587*25039b37SCy Schubert /** close dtio socket and set it to -1 */
588*25039b37SCy Schubert static void dtio_close_fd(struct dt_io_thread* dtio)
589*25039b37SCy Schubert {
590*25039b37SCy Schubert #ifndef USE_WINSOCK
591*25039b37SCy Schubert 	close(dtio->fd);
592*25039b37SCy Schubert #else
593*25039b37SCy Schubert 	closesocket(dtio->fd);
594*25039b37SCy Schubert #endif
595*25039b37SCy Schubert 	dtio->fd = -1;
596*25039b37SCy Schubert }
597*25039b37SCy Schubert 
598*25039b37SCy Schubert /** close and stop the output file descriptor event */
599*25039b37SCy Schubert static void dtio_close_output(struct dt_io_thread* dtio)
600*25039b37SCy Schubert {
601*25039b37SCy Schubert 	if(!dtio->event)
602*25039b37SCy Schubert 		return;
603*25039b37SCy Schubert 	ub_event_free(dtio->event);
604*25039b37SCy Schubert 	dtio->event = NULL;
605*25039b37SCy Schubert 	if(dtio->ssl) {
606*25039b37SCy Schubert #ifdef HAVE_SSL
607*25039b37SCy Schubert 		SSL_shutdown(dtio->ssl);
608*25039b37SCy Schubert 		SSL_free(dtio->ssl);
609*25039b37SCy Schubert 		dtio->ssl = NULL;
610*25039b37SCy Schubert #endif
611*25039b37SCy Schubert 	}
612*25039b37SCy Schubert 	dtio_close_fd(dtio);
613*25039b37SCy Schubert 
614*25039b37SCy Schubert 	/* if there is a (partial) message, discard it
615*25039b37SCy Schubert 	 * we cannot send (the remainder of) it, and a new
616*25039b37SCy Schubert 	 * connection needs to start with a control frame. */
617*25039b37SCy Schubert 	if(dtio->cur_msg) {
618*25039b37SCy Schubert 		dtio_cur_msg_free(dtio);
619*25039b37SCy Schubert 	}
620*25039b37SCy Schubert 
621*25039b37SCy Schubert 	dtio->ready_frame_sent = 0;
622*25039b37SCy Schubert 	dtio->accept_frame_received = 0;
623*25039b37SCy Schubert 	dtio_read_frame_free(&dtio->read_frame);
624*25039b37SCy Schubert 
625*25039b37SCy Schubert 	dtio_reconnect_enable(dtio);
626*25039b37SCy Schubert }
627*25039b37SCy Schubert 
628*25039b37SCy Schubert /** check for pending nonblocking connect errors,
629*25039b37SCy Schubert  * returns 1 if it is okay. -1 on error (close it), 0 to try later */
630*25039b37SCy Schubert static int dtio_check_nb_connect(struct dt_io_thread* dtio)
631*25039b37SCy Schubert {
632*25039b37SCy Schubert 	int error = 0;
633*25039b37SCy Schubert 	socklen_t len = (socklen_t)sizeof(error);
634*25039b37SCy Schubert 	if(!dtio->check_nb_connect)
635*25039b37SCy Schubert 		return 1; /* everything okay */
636*25039b37SCy Schubert 	if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
637*25039b37SCy Schubert 		&len) < 0) {
638*25039b37SCy Schubert #ifndef USE_WINSOCK
639*25039b37SCy Schubert 		error = errno; /* on solaris errno is error */
640*25039b37SCy Schubert #else
641*25039b37SCy Schubert 		error = WSAGetLastError();
642*25039b37SCy Schubert #endif
643*25039b37SCy Schubert 	}
644*25039b37SCy Schubert #ifndef USE_WINSOCK
645*25039b37SCy Schubert #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
646*25039b37SCy Schubert 	if(error == EINPROGRESS || error == EWOULDBLOCK)
647*25039b37SCy Schubert 		return 0; /* try again later */
648*25039b37SCy Schubert #endif
649*25039b37SCy Schubert #else
650*25039b37SCy Schubert 	if(error == WSAEINPROGRESS) {
651*25039b37SCy Schubert 		return 0; /* try again later */
652*25039b37SCy Schubert 	} else if(error == WSAEWOULDBLOCK) {
653*25039b37SCy Schubert 		ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
654*25039b37SCy Schubert 			dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
655*25039b37SCy Schubert 		return 0; /* try again later */
656*25039b37SCy Schubert 	}
657*25039b37SCy Schubert #endif
658*25039b37SCy Schubert 	if(error != 0) {
659*25039b37SCy Schubert 		char* to = dtio->socket_path;
660*25039b37SCy Schubert 		if(!to) to = dtio->ip_str;
661*25039b37SCy Schubert 		if(!to) to = "";
662*25039b37SCy Schubert #ifndef USE_WINSOCK
663*25039b37SCy Schubert 		log_err("dnstap io: failed to connect to \"%s\": %s",
664*25039b37SCy Schubert 			to, strerror(error));
665*25039b37SCy Schubert #else
666*25039b37SCy Schubert 		log_err("dnstap io: failed to connect to \"%s\": %s",
667*25039b37SCy Schubert 			to, wsa_strerror(error));
668*25039b37SCy Schubert #endif
669*25039b37SCy Schubert 		return -1; /* error, close it */
670*25039b37SCy Schubert 	}
671*25039b37SCy Schubert 
672*25039b37SCy Schubert 	if(dtio->ip_str)
673*25039b37SCy Schubert 		verbose(VERB_DETAIL, "dnstap io: connected to %s",
674*25039b37SCy Schubert 			dtio->ip_str);
675*25039b37SCy Schubert 	else if(dtio->socket_path)
676*25039b37SCy Schubert 		verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
677*25039b37SCy Schubert 			dtio->socket_path);
678*25039b37SCy Schubert 	dtio_reconnect_clear(dtio);
679*25039b37SCy Schubert 	dtio->check_nb_connect = 0;
680*25039b37SCy Schubert 	return 1; /* everything okay */
681*25039b37SCy Schubert }
682*25039b37SCy Schubert 
683*25039b37SCy Schubert #ifdef HAVE_SSL
684*25039b37SCy Schubert /** write to ssl output
685*25039b37SCy Schubert  * returns number of bytes written, 0 if nothing happened,
686*25039b37SCy Schubert  * try again later, or -1 if the channel is to be closed. */
687*25039b37SCy Schubert static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
688*25039b37SCy Schubert 	size_t len)
689*25039b37SCy Schubert {
690*25039b37SCy Schubert 	int r;
691*25039b37SCy Schubert 	ERR_clear_error();
692*25039b37SCy Schubert 	r = SSL_write(dtio->ssl, buf, len);
693*25039b37SCy Schubert 	if(r <= 0) {
694*25039b37SCy Schubert 		int want = SSL_get_error(dtio->ssl, r);
695*25039b37SCy Schubert 		if(want == SSL_ERROR_ZERO_RETURN) {
696*25039b37SCy Schubert 			/* closed */
697*25039b37SCy Schubert 			return -1;
698*25039b37SCy Schubert 		} else if(want == SSL_ERROR_WANT_READ) {
699*25039b37SCy Schubert 			/* we want a brief read event */
700*25039b37SCy Schubert 			dtio_enable_brief_read(dtio);
701*25039b37SCy Schubert 			return 0;
702*25039b37SCy Schubert 		} else if(want == SSL_ERROR_WANT_WRITE) {
703*25039b37SCy Schubert 			/* write again later */
704*25039b37SCy Schubert 			return 0;
705*25039b37SCy Schubert 		} else if(want == SSL_ERROR_SYSCALL) {
706*25039b37SCy Schubert #ifdef EPIPE
707*25039b37SCy Schubert 			if(errno == EPIPE && verbosity < 2)
708*25039b37SCy Schubert 				return -1; /* silence 'broken pipe' */
709*25039b37SCy Schubert #endif
710*25039b37SCy Schubert #ifdef ECONNRESET
711*25039b37SCy Schubert 			if(errno == ECONNRESET && verbosity < 2)
712*25039b37SCy Schubert 				return -1; /* silence reset by peer */
713*25039b37SCy Schubert #endif
714*25039b37SCy Schubert 			if(errno != 0) {
715*25039b37SCy Schubert 				log_err("dnstap io, SSL_write syscall: %s",
716*25039b37SCy Schubert 					strerror(errno));
717*25039b37SCy Schubert 			}
718*25039b37SCy Schubert 			return -1;
719*25039b37SCy Schubert 		}
720*25039b37SCy Schubert 		log_crypto_err("dnstap io, could not SSL_write");
721*25039b37SCy Schubert 		return -1;
722*25039b37SCy Schubert 	}
723*25039b37SCy Schubert 	return r;
724*25039b37SCy Schubert }
725*25039b37SCy Schubert #endif /* HAVE_SSL */
726*25039b37SCy Schubert 
727*25039b37SCy Schubert /** write buffer to output.
728*25039b37SCy Schubert  * returns number of bytes written, 0 if nothing happened,
729*25039b37SCy Schubert  * try again later, or -1 if the channel is to be closed. */
730*25039b37SCy Schubert static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
731*25039b37SCy Schubert 	size_t len)
732*25039b37SCy Schubert {
733*25039b37SCy Schubert 	ssize_t ret;
734*25039b37SCy Schubert 	if(dtio->fd == -1)
735*25039b37SCy Schubert 		return -1;
736*25039b37SCy Schubert #ifdef HAVE_SSL
737*25039b37SCy Schubert 	if(dtio->ssl)
738*25039b37SCy Schubert 		return dtio_write_ssl(dtio, buf, len);
739*25039b37SCy Schubert #endif
740*25039b37SCy Schubert 	ret = send(dtio->fd, (void*)buf, len, 0);
741*25039b37SCy Schubert 	if(ret == -1) {
742*25039b37SCy Schubert #ifndef USE_WINSOCK
743*25039b37SCy Schubert 		if(errno == EINTR || errno == EAGAIN)
744*25039b37SCy Schubert 			return 0;
745*25039b37SCy Schubert 		log_err("dnstap io: failed send: %s", strerror(errno));
746*25039b37SCy Schubert #else
747*25039b37SCy Schubert 		if(WSAGetLastError() == WSAEINPROGRESS)
748*25039b37SCy Schubert 			return 0;
749*25039b37SCy Schubert 		if(WSAGetLastError() == WSAEWOULDBLOCK) {
750*25039b37SCy Schubert 			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
751*25039b37SCy Schubert 				dtio->stop_flush_event:dtio->event),
752*25039b37SCy Schubert 				UB_EV_WRITE);
753*25039b37SCy Schubert 			return 0;
754*25039b37SCy Schubert 		}
755*25039b37SCy Schubert 		log_err("dnstap io: failed send: %s",
756*25039b37SCy Schubert 			wsa_strerror(WSAGetLastError()));
757*25039b37SCy Schubert #endif
758*25039b37SCy Schubert 		return -1;
759*25039b37SCy Schubert 	}
760*25039b37SCy Schubert 	return ret;
761*25039b37SCy Schubert }
762*25039b37SCy Schubert 
763*25039b37SCy Schubert #ifdef HAVE_WRITEV
764*25039b37SCy Schubert /** write with writev, len and message, in one write, if possible.
765*25039b37SCy Schubert  * return true if message is done, false if incomplete */
766*25039b37SCy Schubert static int dtio_write_with_writev(struct dt_io_thread* dtio)
767*25039b37SCy Schubert {
768*25039b37SCy Schubert 	uint32_t sendlen = htonl(dtio->cur_msg_len);
769*25039b37SCy Schubert 	struct iovec iov[2];
770*25039b37SCy Schubert 	ssize_t r;
771*25039b37SCy Schubert 	iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
772*25039b37SCy Schubert 	iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
773*25039b37SCy Schubert 	iov[1].iov_base = dtio->cur_msg;
774*25039b37SCy Schubert 	iov[1].iov_len = dtio->cur_msg_len;
775*25039b37SCy Schubert 	log_assert(iov[0].iov_len > 0);
776*25039b37SCy Schubert 	r = writev(dtio->fd, iov, 2);
777*25039b37SCy Schubert 	if(r == -1) {
778*25039b37SCy Schubert #ifndef USE_WINSOCK
779*25039b37SCy Schubert 		if(errno == EINTR || errno == EAGAIN)
780*25039b37SCy Schubert 			return 0;
781*25039b37SCy Schubert 		log_err("dnstap io: failed writev: %s", strerror(errno));
782*25039b37SCy Schubert #else
783*25039b37SCy Schubert 		if(WSAGetLastError() == WSAEINPROGRESS)
784*25039b37SCy Schubert 			return 0;
785*25039b37SCy Schubert 		if(WSAGetLastError() == WSAEWOULDBLOCK) {
786*25039b37SCy Schubert 			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
787*25039b37SCy Schubert 				dtio->stop_flush_event:dtio->event),
788*25039b37SCy Schubert 				UB_EV_WRITE);
789*25039b37SCy Schubert 			return 0;
790*25039b37SCy Schubert 		}
791*25039b37SCy Schubert 		log_err("dnstap io: failed writev: %s",
792*25039b37SCy Schubert 			wsa_strerror(WSAGetLastError()));
793*25039b37SCy Schubert #endif
794*25039b37SCy Schubert 		/* close the channel */
795*25039b37SCy Schubert 		dtio_del_output_event(dtio);
796*25039b37SCy Schubert 		dtio_close_output(dtio);
797*25039b37SCy Schubert 		return 0;
798*25039b37SCy Schubert 	}
799*25039b37SCy Schubert 	/* written r bytes */
800*25039b37SCy Schubert 	dtio->cur_msg_len_done += r;
801*25039b37SCy Schubert 	if(dtio->cur_msg_len_done < 4)
802*25039b37SCy Schubert 		return 0;
803*25039b37SCy Schubert 	if(dtio->cur_msg_len_done > 4) {
804*25039b37SCy Schubert 		dtio->cur_msg_done = dtio->cur_msg_len_done-4;
805*25039b37SCy Schubert 		dtio->cur_msg_len_done = 4;
806*25039b37SCy Schubert 	}
807*25039b37SCy Schubert 	if(dtio->cur_msg_done < dtio->cur_msg_len)
808*25039b37SCy Schubert 		return 0;
809*25039b37SCy Schubert 	return 1;
810*25039b37SCy Schubert }
811*25039b37SCy Schubert #endif /* HAVE_WRITEV */
812*25039b37SCy Schubert 
813*25039b37SCy Schubert /** write more of the length, preceding the data frame.
814*25039b37SCy Schubert  * return true if message is done, false if incomplete. */
815*25039b37SCy Schubert static int dtio_write_more_of_len(struct dt_io_thread* dtio)
816*25039b37SCy Schubert {
817*25039b37SCy Schubert 	uint32_t sendlen;
818*25039b37SCy Schubert 	int r;
819*25039b37SCy Schubert 	if(dtio->cur_msg_len_done >= 4)
820*25039b37SCy Schubert 		return 1;
821*25039b37SCy Schubert #ifdef HAVE_WRITEV
822*25039b37SCy Schubert 	if(!dtio->ssl) {
823*25039b37SCy Schubert 		/* we try writev for everything.*/
824*25039b37SCy Schubert 		return dtio_write_with_writev(dtio);
825*25039b37SCy Schubert 	}
826*25039b37SCy Schubert #endif /* HAVE_WRITEV */
827*25039b37SCy Schubert 	sendlen = htonl(dtio->cur_msg_len);
828*25039b37SCy Schubert 	r = dtio_write_buf(dtio,
829*25039b37SCy Schubert 		((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
830*25039b37SCy Schubert 		sizeof(sendlen)-dtio->cur_msg_len_done);
831*25039b37SCy Schubert 	if(r == -1) {
832*25039b37SCy Schubert 		/* close the channel */
833*25039b37SCy Schubert 		dtio_del_output_event(dtio);
834*25039b37SCy Schubert 		dtio_close_output(dtio);
835*25039b37SCy Schubert 		return 0;
836*25039b37SCy Schubert 	} else if(r == 0) {
837*25039b37SCy Schubert 		/* try again later */
838*25039b37SCy Schubert 		return 0;
839*25039b37SCy Schubert 	}
840*25039b37SCy Schubert 	dtio->cur_msg_len_done += r;
841*25039b37SCy Schubert 	if(dtio->cur_msg_len_done < 4)
842*25039b37SCy Schubert 		return 0;
843*25039b37SCy Schubert 	return 1;
844*25039b37SCy Schubert }
845*25039b37SCy Schubert 
846*25039b37SCy Schubert /** write more of the data frame.
847*25039b37SCy Schubert  * return true if message is done, false if incomplete. */
848*25039b37SCy Schubert static int dtio_write_more_of_data(struct dt_io_thread* dtio)
849*25039b37SCy Schubert {
850*25039b37SCy Schubert 	int r;
851*25039b37SCy Schubert 	if(dtio->cur_msg_done >= dtio->cur_msg_len)
852*25039b37SCy Schubert 		return 1;
853*25039b37SCy Schubert 	r = dtio_write_buf(dtio,
854*25039b37SCy Schubert 		((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
855*25039b37SCy Schubert 		dtio->cur_msg_len - dtio->cur_msg_done);
856*25039b37SCy Schubert 	if(r == -1) {
857*25039b37SCy Schubert 		/* close the channel */
858*25039b37SCy Schubert 		dtio_del_output_event(dtio);
859*25039b37SCy Schubert 		dtio_close_output(dtio);
860*25039b37SCy Schubert 		return 0;
861*25039b37SCy Schubert 	} else if(r == 0) {
862*25039b37SCy Schubert 		/* try again later */
863*25039b37SCy Schubert 		return 0;
864*25039b37SCy Schubert 	}
865*25039b37SCy Schubert 	dtio->cur_msg_done += r;
866*25039b37SCy Schubert 	if(dtio->cur_msg_done < dtio->cur_msg_len)
867*25039b37SCy Schubert 		return 0;
868*25039b37SCy Schubert 	return 1;
869*25039b37SCy Schubert }
870*25039b37SCy Schubert 
871*25039b37SCy Schubert /** write more of the current messsage. false if incomplete, true if
872*25039b37SCy Schubert  * the message is done */
873*25039b37SCy Schubert static int dtio_write_more(struct dt_io_thread* dtio)
874*25039b37SCy Schubert {
875*25039b37SCy Schubert 	if(dtio->cur_msg_len_done < 4) {
876*25039b37SCy Schubert 		if(!dtio_write_more_of_len(dtio))
877*25039b37SCy Schubert 			return 0;
878*25039b37SCy Schubert 	}
879*25039b37SCy Schubert 	if(dtio->cur_msg_done < dtio->cur_msg_len) {
880*25039b37SCy Schubert 		if(!dtio_write_more_of_data(dtio))
881*25039b37SCy Schubert 			return 0;
882*25039b37SCy Schubert 	}
883*25039b37SCy Schubert 	return 1;
884*25039b37SCy Schubert }
885*25039b37SCy Schubert 
886*25039b37SCy Schubert /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
887*25039b37SCy Schubert  * -1: continue, >0: number of bytes read into buffer */
888*25039b37SCy Schubert static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
889*25039b37SCy Schubert 	ssize_t r;
890*25039b37SCy Schubert 	r = recv(dtio->fd, (void*)buf, len, 0);
891*25039b37SCy Schubert 	if(r == -1) {
892*25039b37SCy Schubert 		char* to = dtio->socket_path;
893*25039b37SCy Schubert 		if(!to) to = dtio->ip_str;
894*25039b37SCy Schubert 		if(!to) to = "";
895*25039b37SCy Schubert #ifndef USE_WINSOCK
896*25039b37SCy Schubert 		if(errno == EINTR || errno == EAGAIN)
897*25039b37SCy Schubert 			return -1; /* try later */
898*25039b37SCy Schubert #else
899*25039b37SCy Schubert 		if(WSAGetLastError() == WSAEINPROGRESS) {
900*25039b37SCy Schubert 			return -1; /* try later */
901*25039b37SCy Schubert 		} else if(WSAGetLastError() == WSAEWOULDBLOCK) {
902*25039b37SCy Schubert 			ub_winsock_tcp_wouldblock(
903*25039b37SCy Schubert 				(dtio->stop_flush_event?
904*25039b37SCy Schubert 				dtio->stop_flush_event:dtio->event),
905*25039b37SCy Schubert 				UB_EV_READ);
906*25039b37SCy Schubert 			return -1; /* try later */
907*25039b37SCy Schubert 		}
908*25039b37SCy Schubert #endif
909*25039b37SCy Schubert 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
910*25039b37SCy Schubert 			verbosity < 4)
911*25039b37SCy Schubert 			return 0; /* no log retries on low verbosity */
912*25039b37SCy Schubert 		log_err("dnstap io: output closed, recv %s: %s", to,
913*25039b37SCy Schubert 			strerror(errno));
914*25039b37SCy Schubert 		/* and close below */
915*25039b37SCy Schubert 		return 0;
916*25039b37SCy Schubert 	}
917*25039b37SCy Schubert 	if(r == 0) {
918*25039b37SCy Schubert 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
919*25039b37SCy Schubert 			verbosity < 4)
920*25039b37SCy Schubert 			return 0; /* no log retries on low verbosity */
921*25039b37SCy Schubert 		verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
922*25039b37SCy Schubert 		/* and close below */
923*25039b37SCy Schubert 		return 0;
924*25039b37SCy Schubert 	}
925*25039b37SCy Schubert 	/* something was received */
926*25039b37SCy Schubert 	return r;
927*25039b37SCy Schubert }
928*25039b37SCy Schubert 
929*25039b37SCy Schubert #ifdef HAVE_SSL
930*25039b37SCy Schubert /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed,
931*25039b37SCy Schubert  * -1: continue, >0: number of bytes read into buffer */
932*25039b37SCy Schubert static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
933*25039b37SCy Schubert {
934*25039b37SCy Schubert 	int r;
935*25039b37SCy Schubert 	ERR_clear_error();
936*25039b37SCy Schubert 	r = SSL_read(dtio->ssl, buf, len);
937*25039b37SCy Schubert 	if(r <= 0) {
938*25039b37SCy Schubert 		int want = SSL_get_error(dtio->ssl, r);
939*25039b37SCy Schubert 		if(want == SSL_ERROR_ZERO_RETURN) {
940*25039b37SCy Schubert 			if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
941*25039b37SCy Schubert 				verbosity < 4)
942*25039b37SCy Schubert 				return 0; /* no log retries on low verbosity */
943*25039b37SCy Schubert 			verbose(VERB_DETAIL, "dnstap io: output closed by the "
944*25039b37SCy Schubert 				"other side");
945*25039b37SCy Schubert 			return 0;
946*25039b37SCy Schubert 		} else if(want == SSL_ERROR_WANT_READ) {
947*25039b37SCy Schubert 			/* continue later */
948*25039b37SCy Schubert 			return -1;
949*25039b37SCy Schubert 		} else if(want == SSL_ERROR_WANT_WRITE) {
950*25039b37SCy Schubert 			(void)dtio_enable_brief_write(dtio);
951*25039b37SCy Schubert 			return -1;
952*25039b37SCy Schubert 		} else if(want == SSL_ERROR_SYSCALL) {
953*25039b37SCy Schubert #ifdef ECONNRESET
954*25039b37SCy Schubert 			if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
955*25039b37SCy Schubert 				errno == ECONNRESET && verbosity < 4)
956*25039b37SCy Schubert 				return 0; /* silence reset by peer */
957*25039b37SCy Schubert #endif
958*25039b37SCy Schubert 			if(errno != 0)
959*25039b37SCy Schubert 				log_err("SSL_read syscall: %s",
960*25039b37SCy Schubert 					strerror(errno));
961*25039b37SCy Schubert 			verbose(VERB_DETAIL, "dnstap io: output closed by the "
962*25039b37SCy Schubert 				"other side");
963*25039b37SCy Schubert 			return 0;
964*25039b37SCy Schubert 		}
965*25039b37SCy Schubert 		log_crypto_err("could not SSL_read");
966*25039b37SCy Schubert 		verbose(VERB_DETAIL, "dnstap io: output closed by the "
967*25039b37SCy Schubert 				"other side");
968*25039b37SCy Schubert 		return 0;
969*25039b37SCy Schubert 	}
970*25039b37SCy Schubert 	return r;
971*25039b37SCy Schubert }
972*25039b37SCy Schubert #endif /* HAVE_SSL */
973*25039b37SCy Schubert 
974*25039b37SCy Schubert /** check if the output fd has been closed,
975*25039b37SCy Schubert  * it returns false if the stream is closed. */
976*25039b37SCy Schubert static int dtio_check_close(struct dt_io_thread* dtio)
977*25039b37SCy Schubert {
978*25039b37SCy Schubert 	/* we don't want to read any packets, but if there are we can
979*25039b37SCy Schubert 	 * discard the input (ignore it).  Ignore of unknown (control)
980*25039b37SCy Schubert 	 * packets is okay for the framestream protocol.  And also, the
981*25039b37SCy Schubert 	 * read call can return that the stream has been closed by the
982*25039b37SCy Schubert 	 * other side. */
983*25039b37SCy Schubert 	uint8_t buf[1024];
984*25039b37SCy Schubert 	int r = -1;
985*25039b37SCy Schubert 
986*25039b37SCy Schubert 
987*25039b37SCy Schubert 	if(dtio->fd == -1) return 0;
988*25039b37SCy Schubert 
989*25039b37SCy Schubert 	while(r != 0) {
990*25039b37SCy Schubert 		/* not interested in buffer content, overwrite */
991*25039b37SCy Schubert 		r = receive_bytes(dtio, (void*)buf, sizeof(buf));
992*25039b37SCy Schubert 		if(r == -1)
993*25039b37SCy Schubert 			return 1;
994*25039b37SCy Schubert 	}
995*25039b37SCy Schubert 	/* the other end has been closed */
996*25039b37SCy Schubert 	/* close the channel */
997*25039b37SCy Schubert 	dtio_del_output_event(dtio);
998*25039b37SCy Schubert 	dtio_close_output(dtio);
999*25039b37SCy Schubert 	return 0;
1000*25039b37SCy Schubert }
1001*25039b37SCy Schubert 
1002*25039b37SCy Schubert /** Read accept frame. Returns -1: continue reading, 0: closed,
1003*25039b37SCy Schubert  * 1: valid accept received. */
1004*25039b37SCy Schubert static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1005*25039b37SCy Schubert {
1006*25039b37SCy Schubert 	int r;
1007*25039b37SCy Schubert 	size_t read_frame_done;
1008*25039b37SCy Schubert 	while(dtio->read_frame.frame_len_done < 4) {
1009*25039b37SCy Schubert #ifdef HAVE_SSL
1010*25039b37SCy Schubert 		if(dtio->ssl) {
1011*25039b37SCy Schubert 			r = ssl_read_bytes(dtio,
1012*25039b37SCy Schubert 				(uint8_t*)&dtio->read_frame.frame_len+
1013*25039b37SCy Schubert 				dtio->read_frame.frame_len_done,
1014*25039b37SCy Schubert 				4-dtio->read_frame.frame_len_done);
1015*25039b37SCy Schubert 		} else {
1016*25039b37SCy Schubert #endif
1017*25039b37SCy Schubert 			r = receive_bytes(dtio,
1018*25039b37SCy Schubert 				(uint8_t*)&dtio->read_frame.frame_len+
1019*25039b37SCy Schubert 				dtio->read_frame.frame_len_done,
1020*25039b37SCy Schubert 				4-dtio->read_frame.frame_len_done);
1021*25039b37SCy Schubert #ifdef HAVE_SSL
1022*25039b37SCy Schubert 		}
1023*25039b37SCy Schubert #endif
1024*25039b37SCy Schubert 		if(r == -1)
1025*25039b37SCy Schubert 			return -1; /* continue reading */
1026*25039b37SCy Schubert 		if(r == 0) {
1027*25039b37SCy Schubert 			 /* connection closed */
1028*25039b37SCy Schubert 			goto close_connection;
1029*25039b37SCy Schubert 		}
1030*25039b37SCy Schubert 		dtio->read_frame.frame_len_done += r;
1031*25039b37SCy Schubert 		if(dtio->read_frame.frame_len_done < 4)
1032*25039b37SCy Schubert 			return -1; /* continue reading */
1033*25039b37SCy Schubert 
1034*25039b37SCy Schubert 		if(dtio->read_frame.frame_len == 0) {
1035*25039b37SCy Schubert 			dtio->read_frame.frame_len_done = 0;
1036*25039b37SCy Schubert 			dtio->read_frame.control_frame = 1;
1037*25039b37SCy Schubert 			continue;
1038*25039b37SCy Schubert 		}
1039*25039b37SCy Schubert 		dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1040*25039b37SCy Schubert 		if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1041*25039b37SCy Schubert 			verbose(VERB_OPS, "dnstap: received frame exceeds max "
1042*25039b37SCy Schubert 				"length of %d bytes, closing connection",
1043*25039b37SCy Schubert 				DTIO_RECV_FRAME_MAX_LEN);
1044*25039b37SCy Schubert 			goto close_connection;
1045*25039b37SCy Schubert 		}
1046*25039b37SCy Schubert 		dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1047*25039b37SCy Schubert 		dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1048*25039b37SCy Schubert 		if(!dtio->read_frame.buf) {
1049*25039b37SCy Schubert 			log_err("dnstap io: out of memory (creating read "
1050*25039b37SCy Schubert 				"buffer)");
1051*25039b37SCy Schubert 			goto close_connection;
1052*25039b37SCy Schubert 		}
1053*25039b37SCy Schubert 	}
1054*25039b37SCy Schubert 	if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1055*25039b37SCy Schubert #ifdef HAVE_SSL
1056*25039b37SCy Schubert 		if(dtio->ssl) {
1057*25039b37SCy Schubert 			r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1058*25039b37SCy Schubert 				dtio->read_frame.buf_count,
1059*25039b37SCy Schubert 				dtio->read_frame.buf_cap-
1060*25039b37SCy Schubert 				dtio->read_frame.buf_count);
1061*25039b37SCy Schubert 		} else {
1062*25039b37SCy Schubert #endif
1063*25039b37SCy Schubert 			r = receive_bytes(dtio, dtio->read_frame.buf+
1064*25039b37SCy Schubert 				dtio->read_frame.buf_count,
1065*25039b37SCy Schubert 				dtio->read_frame.buf_cap-
1066*25039b37SCy Schubert 				dtio->read_frame.buf_count);
1067*25039b37SCy Schubert #ifdef HAVE_SSL
1068*25039b37SCy Schubert 		}
1069*25039b37SCy Schubert #endif
1070*25039b37SCy Schubert 		if(r == -1)
1071*25039b37SCy Schubert 			return -1; /* continue reading */
1072*25039b37SCy Schubert 		if(r == 0) {
1073*25039b37SCy Schubert 			 /* connection closed */
1074*25039b37SCy Schubert 			goto close_connection;
1075*25039b37SCy Schubert 		}
1076*25039b37SCy Schubert 		dtio->read_frame.buf_count += r;
1077*25039b37SCy Schubert 		if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1078*25039b37SCy Schubert 			return -1; /* continue reading */
1079*25039b37SCy Schubert 	}
1080*25039b37SCy Schubert 
1081*25039b37SCy Schubert 	/* Complete frame received, check if this is a valid ACCEPT control
1082*25039b37SCy Schubert 	 * frame. */
1083*25039b37SCy Schubert 	if(dtio->read_frame.frame_len < 4) {
1084*25039b37SCy Schubert 		verbose(VERB_OPS, "dnstap: invalid data received");
1085*25039b37SCy Schubert 		goto close_connection;
1086*25039b37SCy Schubert 	}
1087*25039b37SCy Schubert 	if(sldns_read_uint32(dtio->read_frame.buf) !=
1088*25039b37SCy Schubert 		FSTRM_CONTROL_FRAME_ACCEPT) {
1089*25039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap: invalid control type received, "
1090*25039b37SCy Schubert 			"ignored");
1091*25039b37SCy Schubert 		dtio->ready_frame_sent = 0;
1092*25039b37SCy Schubert 		dtio->accept_frame_received = 0;
1093*25039b37SCy Schubert 		dtio_read_frame_free(&dtio->read_frame);
1094*25039b37SCy Schubert 		return -1;
1095*25039b37SCy Schubert 	}
1096*25039b37SCy Schubert 	read_frame_done = 4; /* control frame type */
1097*25039b37SCy Schubert 
1098*25039b37SCy Schubert 	/* Iterate over control fields, ignore unknown types.
1099*25039b37SCy Schubert 	 * Need to be able to read at least 8 bytes (control field type +
1100*25039b37SCy Schubert 	 * length). */
1101*25039b37SCy Schubert 	while(read_frame_done+8 < dtio->read_frame.frame_len) {
1102*25039b37SCy Schubert 		uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1103*25039b37SCy Schubert 			read_frame_done);
1104*25039b37SCy Schubert 		uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1105*25039b37SCy Schubert 			read_frame_done + 4);
1106*25039b37SCy Schubert 		if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1107*25039b37SCy Schubert 			if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1108*25039b37SCy Schubert 				read_frame_done+8+len <=
1109*25039b37SCy Schubert 				dtio->read_frame.frame_len &&
1110*25039b37SCy Schubert 				memcmp(dtio->read_frame.buf + read_frame_done +
1111*25039b37SCy Schubert 					+ 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1112*25039b37SCy Schubert 				if(!dtio_control_start_send(dtio)) {
1113*25039b37SCy Schubert 					verbose(VERB_OPS, "dnstap io: out of "
1114*25039b37SCy Schubert 					 "memory while sending START frame");
1115*25039b37SCy Schubert 					goto close_connection;
1116*25039b37SCy Schubert 				}
1117*25039b37SCy Schubert 				dtio->accept_frame_received = 1;
1118*25039b37SCy Schubert 				return 1;
1119*25039b37SCy Schubert 			} else {
1120*25039b37SCy Schubert 				/* unknow content type */
1121*25039b37SCy Schubert 				verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1122*25039b37SCy Schubert 					"contains unknown content type, "
1123*25039b37SCy Schubert 					"closing connection");
1124*25039b37SCy Schubert 				goto close_connection;
1125*25039b37SCy Schubert 			}
1126*25039b37SCy Schubert 		}
1127*25039b37SCy Schubert 		/* unknown option, try next */
1128*25039b37SCy Schubert 		read_frame_done += 8+len;
1129*25039b37SCy Schubert 	}
1130*25039b37SCy Schubert 
1131*25039b37SCy Schubert 
1132*25039b37SCy Schubert close_connection:
1133*25039b37SCy Schubert 	dtio_del_output_event(dtio);
1134*25039b37SCy Schubert 	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1135*25039b37SCy Schubert 	dtio_close_output(dtio);
1136*25039b37SCy Schubert 	return 0;
1137*25039b37SCy Schubert }
1138*25039b37SCy Schubert 
1139*25039b37SCy Schubert /** add the output file descriptor event for listening, read only */
1140*25039b37SCy Schubert static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1141*25039b37SCy Schubert {
1142*25039b37SCy Schubert 	if(!dtio->event)
1143*25039b37SCy Schubert 		return 0;
1144*25039b37SCy Schubert 	if(dtio->event_added && !dtio->event_added_is_write)
1145*25039b37SCy Schubert 		return 1;
1146*25039b37SCy Schubert 	/* we have to (re-)register the event */
1147*25039b37SCy Schubert 	if(dtio->event_added)
1148*25039b37SCy Schubert 		ub_event_del(dtio->event);
1149*25039b37SCy Schubert 	ub_event_del_bits(dtio->event, UB_EV_WRITE);
1150*25039b37SCy Schubert 	if(ub_event_add(dtio->event, NULL) != 0) {
1151*25039b37SCy Schubert 		log_err("dnstap io: out of memory (adding event)");
1152*25039b37SCy Schubert 		dtio->event_added = 0;
1153*25039b37SCy Schubert 		dtio->event_added_is_write = 0;
1154*25039b37SCy Schubert 		/* close output and start reattempts to open it */
1155*25039b37SCy Schubert 		dtio_close_output(dtio);
1156*25039b37SCy Schubert 		return 0;
1157*25039b37SCy Schubert 	}
1158*25039b37SCy Schubert 	dtio->event_added = 1;
1159*25039b37SCy Schubert 	dtio->event_added_is_write = 0;
1160*25039b37SCy Schubert 	return 1;
1161*25039b37SCy Schubert }
1162*25039b37SCy Schubert 
1163*25039b37SCy Schubert /** add the output file descriptor event for listening, read and write */
1164*25039b37SCy Schubert static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1165*25039b37SCy Schubert {
1166*25039b37SCy Schubert 	if(!dtio->event)
1167*25039b37SCy Schubert 		return 0;
1168*25039b37SCy Schubert 	if(dtio->event_added && dtio->event_added_is_write)
1169*25039b37SCy Schubert 		return 1;
1170*25039b37SCy Schubert 	/* we have to (re-)register the event */
1171*25039b37SCy Schubert 	if(dtio->event_added)
1172*25039b37SCy Schubert 		ub_event_del(dtio->event);
1173*25039b37SCy Schubert 	ub_event_add_bits(dtio->event, UB_EV_WRITE);
1174*25039b37SCy Schubert 	if(ub_event_add(dtio->event, NULL) != 0) {
1175*25039b37SCy Schubert 		log_err("dnstap io: out of memory (adding event)");
1176*25039b37SCy Schubert 		dtio->event_added = 0;
1177*25039b37SCy Schubert 		dtio->event_added_is_write = 0;
1178*25039b37SCy Schubert 		/* close output and start reattempts to open it */
1179*25039b37SCy Schubert 		dtio_close_output(dtio);
1180*25039b37SCy Schubert 		return 0;
1181*25039b37SCy Schubert 	}
1182*25039b37SCy Schubert 	dtio->event_added = 1;
1183*25039b37SCy Schubert 	dtio->event_added_is_write = 1;
1184*25039b37SCy Schubert 	return 1;
1185*25039b37SCy Schubert }
1186*25039b37SCy Schubert 
1187*25039b37SCy Schubert /** put the dtio thread to sleep */
1188*25039b37SCy Schubert static void dtio_sleep(struct dt_io_thread* dtio)
1189*25039b37SCy Schubert {
1190*25039b37SCy Schubert 	/* unregister the event polling for write, because there is
1191*25039b37SCy Schubert 	 * nothing to be written */
1192*25039b37SCy Schubert 	(void)dtio_add_output_event_read(dtio);
1193*25039b37SCy Schubert }
1194*25039b37SCy Schubert 
1195*25039b37SCy Schubert #ifdef HAVE_SSL
1196*25039b37SCy Schubert /** enable the brief read condition */
1197*25039b37SCy Schubert static int dtio_enable_brief_read(struct dt_io_thread* dtio)
1198*25039b37SCy Schubert {
1199*25039b37SCy Schubert 	dtio->ssl_brief_read = 1;
1200*25039b37SCy Schubert 	if(dtio->stop_flush_event) {
1201*25039b37SCy Schubert 		ub_event_del(dtio->stop_flush_event);
1202*25039b37SCy Schubert 		ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
1203*25039b37SCy Schubert 		if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1204*25039b37SCy Schubert 			log_err("dnstap io, stop flush, could not ub_event_add");
1205*25039b37SCy Schubert 			return 0;
1206*25039b37SCy Schubert 		}
1207*25039b37SCy Schubert 		return 1;
1208*25039b37SCy Schubert 	}
1209*25039b37SCy Schubert 	return dtio_add_output_event_read(dtio);
1210*25039b37SCy Schubert }
1211*25039b37SCy Schubert #endif /* HAVE_SSL */
1212*25039b37SCy Schubert 
1213*25039b37SCy Schubert #ifdef HAVE_SSL
1214*25039b37SCy Schubert /** disable the brief read condition */
1215*25039b37SCy Schubert static int dtio_disable_brief_read(struct dt_io_thread* dtio)
1216*25039b37SCy Schubert {
1217*25039b37SCy Schubert 	dtio->ssl_brief_read = 0;
1218*25039b37SCy Schubert 	if(dtio->stop_flush_event) {
1219*25039b37SCy Schubert 		ub_event_del(dtio->stop_flush_event);
1220*25039b37SCy Schubert 		ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
1221*25039b37SCy Schubert 		if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1222*25039b37SCy Schubert 			log_err("dnstap io, stop flush, could not ub_event_add");
1223*25039b37SCy Schubert 			return 0;
1224*25039b37SCy Schubert 		}
1225*25039b37SCy Schubert 		return 1;
1226*25039b37SCy Schubert 	}
1227*25039b37SCy Schubert 	return dtio_add_output_event_write(dtio);
1228*25039b37SCy Schubert }
1229*25039b37SCy Schubert #endif /* HAVE_SSL */
1230*25039b37SCy Schubert 
1231*25039b37SCy Schubert #ifdef HAVE_SSL
1232*25039b37SCy Schubert /** enable the brief write condition */
1233*25039b37SCy Schubert static int dtio_enable_brief_write(struct dt_io_thread* dtio)
1234*25039b37SCy Schubert {
1235*25039b37SCy Schubert 	dtio->ssl_brief_write = 1;
1236*25039b37SCy Schubert 	return dtio_add_output_event_write(dtio);
1237*25039b37SCy Schubert }
1238*25039b37SCy Schubert #endif /* HAVE_SSL */
1239*25039b37SCy Schubert 
1240*25039b37SCy Schubert #ifdef HAVE_SSL
1241*25039b37SCy Schubert /** disable the brief write condition */
1242*25039b37SCy Schubert static int dtio_disable_brief_write(struct dt_io_thread* dtio)
1243*25039b37SCy Schubert {
1244*25039b37SCy Schubert 	dtio->ssl_brief_write = 0;
1245*25039b37SCy Schubert 	return dtio_add_output_event_read(dtio);
1246*25039b37SCy Schubert }
1247*25039b37SCy Schubert #endif /* HAVE_SSL */
1248*25039b37SCy Schubert 
1249*25039b37SCy Schubert #ifdef HAVE_SSL
1250*25039b37SCy Schubert /** check peer verification after ssl handshake connection, false if closed*/
1251*25039b37SCy Schubert static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
1252*25039b37SCy Schubert {
1253*25039b37SCy Schubert 	if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
1254*25039b37SCy Schubert 		/* verification */
1255*25039b37SCy Schubert 		if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) {
1256*25039b37SCy Schubert 			X509* x = SSL_get_peer_certificate(dtio->ssl);
1257*25039b37SCy Schubert 			if(!x) {
1258*25039b37SCy Schubert 				verbose(VERB_ALGO, "dnstap io, %s, SSL "
1259*25039b37SCy Schubert 					"connection failed no certificate",
1260*25039b37SCy Schubert 					dtio->ip_str);
1261*25039b37SCy Schubert 				return 0;
1262*25039b37SCy Schubert 			}
1263*25039b37SCy Schubert 			log_cert(VERB_ALGO, "dnstap io, peer certificate",
1264*25039b37SCy Schubert 				x);
1265*25039b37SCy Schubert #ifdef HAVE_SSL_GET0_PEERNAME
1266*25039b37SCy Schubert 			if(SSL_get0_peername(dtio->ssl)) {
1267*25039b37SCy Schubert 				verbose(VERB_ALGO, "dnstap io, %s, SSL "
1268*25039b37SCy Schubert 					"connection to %s authenticated",
1269*25039b37SCy Schubert 					dtio->ip_str,
1270*25039b37SCy Schubert 					SSL_get0_peername(dtio->ssl));
1271*25039b37SCy Schubert 			} else {
1272*25039b37SCy Schubert #endif
1273*25039b37SCy Schubert 				verbose(VERB_ALGO, "dnstap io, %s, SSL "
1274*25039b37SCy Schubert 					"connection authenticated",
1275*25039b37SCy Schubert 					dtio->ip_str);
1276*25039b37SCy Schubert #ifdef HAVE_SSL_GET0_PEERNAME
1277*25039b37SCy Schubert 			}
1278*25039b37SCy Schubert #endif
1279*25039b37SCy Schubert 			X509_free(x);
1280*25039b37SCy Schubert 		} else {
1281*25039b37SCy Schubert 			X509* x = SSL_get_peer_certificate(dtio->ssl);
1282*25039b37SCy Schubert 			if(x) {
1283*25039b37SCy Schubert 				log_cert(VERB_ALGO, "dnstap io, peer "
1284*25039b37SCy Schubert 					"certificate", x);
1285*25039b37SCy Schubert 				X509_free(x);
1286*25039b37SCy Schubert 			}
1287*25039b37SCy Schubert 			verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
1288*25039b37SCy Schubert 				"failed: failed to authenticate",
1289*25039b37SCy Schubert 				dtio->ip_str);
1290*25039b37SCy Schubert 			return 0;
1291*25039b37SCy Schubert 		}
1292*25039b37SCy Schubert 	} else {
1293*25039b37SCy Schubert 		/* unauthenticated, the verify peer flag was not set
1294*25039b37SCy Schubert 		 * in ssl when the ssl object was created from ssl_ctx */
1295*25039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
1296*25039b37SCy Schubert 			dtio->ip_str);
1297*25039b37SCy Schubert 	}
1298*25039b37SCy Schubert 	return 1;
1299*25039b37SCy Schubert }
1300*25039b37SCy Schubert #endif /* HAVE_SSL */
1301*25039b37SCy Schubert 
1302*25039b37SCy Schubert #ifdef HAVE_SSL
1303*25039b37SCy Schubert /** perform ssl handshake, returns 1 if okay, 0 to stop */
1304*25039b37SCy Schubert static int dtio_ssl_handshake(struct dt_io_thread* dtio,
1305*25039b37SCy Schubert 	struct stop_flush_info* info)
1306*25039b37SCy Schubert {
1307*25039b37SCy Schubert 	int r;
1308*25039b37SCy Schubert 	if(dtio->ssl_brief_read) {
1309*25039b37SCy Schubert 		/* assume the brief read condition is satisfied,
1310*25039b37SCy Schubert 		 * if we need more or again, we can set it again */
1311*25039b37SCy Schubert 		if(!dtio_disable_brief_read(dtio)) {
1312*25039b37SCy Schubert 			if(info) dtio_stop_flush_exit(info);
1313*25039b37SCy Schubert 			return 0;
1314*25039b37SCy Schubert 		}
1315*25039b37SCy Schubert 	}
1316*25039b37SCy Schubert 	if(dtio->ssl_handshake_done)
1317*25039b37SCy Schubert 		return 1;
1318*25039b37SCy Schubert 
1319*25039b37SCy Schubert 	ERR_clear_error();
1320*25039b37SCy Schubert 	r = SSL_do_handshake(dtio->ssl);
1321*25039b37SCy Schubert 	if(r != 1) {
1322*25039b37SCy Schubert 		int want = SSL_get_error(dtio->ssl, r);
1323*25039b37SCy Schubert 		if(want == SSL_ERROR_WANT_READ) {
1324*25039b37SCy Schubert 			/* we want to read on the connection */
1325*25039b37SCy Schubert 			if(!dtio_enable_brief_read(dtio)) {
1326*25039b37SCy Schubert 				if(info) dtio_stop_flush_exit(info);
1327*25039b37SCy Schubert 				return 0;
1328*25039b37SCy Schubert 			}
1329*25039b37SCy Schubert 			return 0;
1330*25039b37SCy Schubert 		} else if(want == SSL_ERROR_WANT_WRITE) {
1331*25039b37SCy Schubert 			/* we want to write on the connection */
1332*25039b37SCy Schubert 			return 0;
1333*25039b37SCy Schubert 		} else if(r == 0) {
1334*25039b37SCy Schubert 			/* closed */
1335*25039b37SCy Schubert 			if(info) dtio_stop_flush_exit(info);
1336*25039b37SCy Schubert 			dtio_del_output_event(dtio);
1337*25039b37SCy Schubert 			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1338*25039b37SCy Schubert 			dtio_close_output(dtio);
1339*25039b37SCy Schubert 			return 0;
1340*25039b37SCy Schubert 		} else if(want == SSL_ERROR_SYSCALL) {
1341*25039b37SCy Schubert 			/* SYSCALL and errno==0 means closed uncleanly */
1342*25039b37SCy Schubert 			int silent = 0;
1343*25039b37SCy Schubert #ifdef EPIPE
1344*25039b37SCy Schubert 			if(errno == EPIPE && verbosity < 2)
1345*25039b37SCy Schubert 				silent = 1; /* silence 'broken pipe' */
1346*25039b37SCy Schubert #endif
1347*25039b37SCy Schubert #ifdef ECONNRESET
1348*25039b37SCy Schubert 			if(errno == ECONNRESET && verbosity < 2)
1349*25039b37SCy Schubert 				silent = 1; /* silence reset by peer */
1350*25039b37SCy Schubert #endif
1351*25039b37SCy Schubert 			if(errno == 0)
1352*25039b37SCy Schubert 				silent = 1;
1353*25039b37SCy Schubert 			if(!silent)
1354*25039b37SCy Schubert 				log_err("dnstap io, SSL_handshake syscall: %s",
1355*25039b37SCy Schubert 					strerror(errno));
1356*25039b37SCy Schubert 			/* closed */
1357*25039b37SCy Schubert 			if(info) dtio_stop_flush_exit(info);
1358*25039b37SCy Schubert 			dtio_del_output_event(dtio);
1359*25039b37SCy Schubert 			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1360*25039b37SCy Schubert 			dtio_close_output(dtio);
1361*25039b37SCy Schubert 			return 0;
1362*25039b37SCy Schubert 		} else {
1363*25039b37SCy Schubert 			unsigned long err = ERR_get_error();
1364*25039b37SCy Schubert 			if(!squelch_err_ssl_handshake(err)) {
1365*25039b37SCy Schubert 				log_crypto_err_code("dnstap io, ssl handshake failed",
1366*25039b37SCy Schubert 					err);
1367*25039b37SCy Schubert 				verbose(VERB_OPS, "dnstap io, ssl handshake failed "
1368*25039b37SCy Schubert 					"from %s", dtio->ip_str);
1369*25039b37SCy Schubert 			}
1370*25039b37SCy Schubert 			/* closed */
1371*25039b37SCy Schubert 			if(info) dtio_stop_flush_exit(info);
1372*25039b37SCy Schubert 			dtio_del_output_event(dtio);
1373*25039b37SCy Schubert 			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1374*25039b37SCy Schubert 			dtio_close_output(dtio);
1375*25039b37SCy Schubert 			return 0;
1376*25039b37SCy Schubert 		}
1377*25039b37SCy Schubert 
1378*25039b37SCy Schubert 	}
1379*25039b37SCy Schubert 	/* check peer verification */
1380*25039b37SCy Schubert 	dtio->ssl_handshake_done = 1;
1381*25039b37SCy Schubert 
1382*25039b37SCy Schubert 	if(!dtio_ssl_check_peer(dtio)) {
1383*25039b37SCy Schubert 		/* closed */
1384*25039b37SCy Schubert 		if(info) dtio_stop_flush_exit(info);
1385*25039b37SCy Schubert 		dtio_del_output_event(dtio);
1386*25039b37SCy Schubert 		dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1387*25039b37SCy Schubert 		dtio_close_output(dtio);
1388*25039b37SCy Schubert 		return 0;
1389*25039b37SCy Schubert 	}
1390*25039b37SCy Schubert 	return 1;
1391*25039b37SCy Schubert }
1392*25039b37SCy Schubert #endif /* HAVE_SSL */
1393*25039b37SCy Schubert 
1394*25039b37SCy Schubert /** callback for the dnstap events, to write to the output */
1395*25039b37SCy Schubert void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1396*25039b37SCy Schubert {
1397*25039b37SCy Schubert 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1398*25039b37SCy Schubert 	int i;
1399*25039b37SCy Schubert 
1400*25039b37SCy Schubert 	if(dtio->check_nb_connect) {
1401*25039b37SCy Schubert 		int connect_err = dtio_check_nb_connect(dtio);
1402*25039b37SCy Schubert 		if(connect_err == -1) {
1403*25039b37SCy Schubert 			/* close the channel */
1404*25039b37SCy Schubert 			dtio_del_output_event(dtio);
1405*25039b37SCy Schubert 			dtio_close_output(dtio);
1406*25039b37SCy Schubert 			return;
1407*25039b37SCy Schubert 		} else if(connect_err == 0) {
1408*25039b37SCy Schubert 			/* try again later */
1409*25039b37SCy Schubert 			return;
1410*25039b37SCy Schubert 		}
1411*25039b37SCy Schubert 		/* nonblocking connect check passed, continue */
1412*25039b37SCy Schubert 	}
1413*25039b37SCy Schubert 
1414*25039b37SCy Schubert #ifdef HAVE_SSL
1415*25039b37SCy Schubert 	if(dtio->ssl &&
1416*25039b37SCy Schubert 		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1417*25039b37SCy Schubert 		if(!dtio_ssl_handshake(dtio, NULL))
1418*25039b37SCy Schubert 			return;
1419*25039b37SCy Schubert 	}
1420*25039b37SCy Schubert #endif
1421*25039b37SCy Schubert 
1422*25039b37SCy Schubert 	if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1423*25039b37SCy Schubert 		if(dtio->ssl_brief_write)
1424*25039b37SCy Schubert 			(void)dtio_disable_brief_write(dtio);
1425*25039b37SCy Schubert 		if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1426*25039b37SCy Schubert 			if(dtio_read_accept_frame(dtio) <= 0)
1427*25039b37SCy Schubert 				return;
1428*25039b37SCy Schubert 		} else if(!dtio_check_close(dtio))
1429*25039b37SCy Schubert 			return;
1430*25039b37SCy Schubert 	}
1431*25039b37SCy Schubert 
1432*25039b37SCy Schubert 	/* loop to process a number of messages.  This improves throughput,
1433*25039b37SCy Schubert 	 * because selecting on write-event if not needed for busy messages
1434*25039b37SCy Schubert 	 * (dnstap log) generation and if they need to all be written back.
1435*25039b37SCy Schubert 	 * The write event is usually not blocked up.  But not forever,
1436*25039b37SCy Schubert 	 * because the event loop needs to stay responsive for other events.
1437*25039b37SCy Schubert 	 * If there are no (more) messages, or if the output buffers get
1438*25039b37SCy Schubert 	 * full, it returns out of the loop. */
1439*25039b37SCy Schubert 	for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1440*25039b37SCy Schubert 		/* see if there are messages that need writing */
1441*25039b37SCy Schubert 		if(!dtio->cur_msg) {
1442*25039b37SCy Schubert 			if(!dtio_find_msg(dtio)) {
1443*25039b37SCy Schubert 				if(i == 0) {
1444*25039b37SCy Schubert 					/* no messages on the first iteration,
1445*25039b37SCy Schubert 					 * the queues are all empty */
1446*25039b37SCy Schubert 					dtio_sleep(dtio);
1447*25039b37SCy Schubert 				}
1448*25039b37SCy Schubert 				return; /* nothing to do */
1449*25039b37SCy Schubert 			}
1450*25039b37SCy Schubert 		}
1451*25039b37SCy Schubert 
1452*25039b37SCy Schubert 		/* write it */
1453*25039b37SCy Schubert 		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1454*25039b37SCy Schubert 			if(!dtio_write_more(dtio))
1455*25039b37SCy Schubert 				return;
1456*25039b37SCy Schubert 		}
1457*25039b37SCy Schubert 
1458*25039b37SCy Schubert 		/* done with the current message */
1459*25039b37SCy Schubert 		dtio_cur_msg_free(dtio);
1460*25039b37SCy Schubert 
1461*25039b37SCy Schubert 		/* If this is a bidirectional stream the first message will be
1462*25039b37SCy Schubert 		 * the READY control frame. We can only continue writing after
1463*25039b37SCy Schubert 		 * receiving an ACCEPT control frame. */
1464*25039b37SCy Schubert 		if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1465*25039b37SCy Schubert 			dtio->ready_frame_sent = 1;
1466*25039b37SCy Schubert 			(void)dtio_add_output_event_read(dtio);
1467*25039b37SCy Schubert 			break;
1468*25039b37SCy Schubert 		}
1469*25039b37SCy Schubert 	}
1470*25039b37SCy Schubert }
1471*25039b37SCy Schubert 
1472*25039b37SCy Schubert /** callback for the dnstap commandpipe, to stop the dnstap IO */
1473*25039b37SCy Schubert void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1474*25039b37SCy Schubert {
1475*25039b37SCy Schubert 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1476*25039b37SCy Schubert 	uint8_t cmd;
1477*25039b37SCy Schubert 	ssize_t r;
1478*25039b37SCy Schubert 	if(dtio->want_to_exit)
1479*25039b37SCy Schubert 		return;
1480*25039b37SCy Schubert 	r = read(fd, &cmd, sizeof(cmd));
1481*25039b37SCy Schubert 	if(r == -1) {
1482*25039b37SCy Schubert #ifndef USE_WINSOCK
1483*25039b37SCy Schubert 		if(errno == EINTR || errno == EAGAIN)
1484*25039b37SCy Schubert 			return; /* ignore this */
1485*25039b37SCy Schubert 		log_err("dnstap io: failed to read: %s", strerror(errno));
1486*25039b37SCy Schubert #else
1487*25039b37SCy Schubert 		if(WSAGetLastError() == WSAEINPROGRESS)
1488*25039b37SCy Schubert 			return;
1489*25039b37SCy Schubert 		if(WSAGetLastError() == WSAEWOULDBLOCK)
1490*25039b37SCy Schubert 			return;
1491*25039b37SCy Schubert 		log_err("dnstap io: failed to read: %s",
1492*25039b37SCy Schubert 			wsa_strerror(WSAGetLastError()));
1493*25039b37SCy Schubert #endif
1494*25039b37SCy Schubert 		/* and then fall through to quit the thread */
1495*25039b37SCy Schubert 	} else if(r == 0) {
1496*25039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1497*25039b37SCy Schubert 	} else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1498*25039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1499*25039b37SCy Schubert 	} else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1500*25039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1501*25039b37SCy Schubert 
1502*25039b37SCy Schubert 		if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1503*25039b37SCy Schubert 			verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1504*25039b37SCy Schubert 				"waiting for ACCEPT control frame");
1505*25039b37SCy Schubert 			return;
1506*25039b37SCy Schubert 		}
1507*25039b37SCy Schubert 
1508*25039b37SCy Schubert 		/* reregister event */
1509*25039b37SCy Schubert 		if(!dtio_add_output_event_write(dtio))
1510*25039b37SCy Schubert 			return;
1511*25039b37SCy Schubert 		return;
1512*25039b37SCy Schubert 	} else if(r == 1) {
1513*25039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1514*25039b37SCy Schubert 	}
1515*25039b37SCy Schubert 	dtio->want_to_exit = 1;
1516*25039b37SCy Schubert 	if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1517*25039b37SCy Schubert 		!= 0) {
1518*25039b37SCy Schubert 		log_err("dnstap io: could not loopexit");
1519*25039b37SCy Schubert 	}
1520*25039b37SCy Schubert }
1521*25039b37SCy Schubert 
1522*25039b37SCy Schubert #ifndef THREADS_DISABLED
1523*25039b37SCy Schubert /** setup the event base for the dnstap io thread */
1524*25039b37SCy Schubert static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1525*25039b37SCy Schubert 	struct timeval* now)
1526*25039b37SCy Schubert {
1527*25039b37SCy Schubert 	memset(now, 0, sizeof(*now));
1528*25039b37SCy Schubert 	dtio->event_base = ub_default_event_base(0, secs, now);
1529*25039b37SCy Schubert 	if(!dtio->event_base) {
1530*25039b37SCy Schubert 		fatal_exit("dnstap io: could not create event_base");
1531*25039b37SCy Schubert 	}
1532*25039b37SCy Schubert }
1533*25039b37SCy Schubert #endif /* THREADS_DISABLED */
1534*25039b37SCy Schubert 
1535*25039b37SCy Schubert /** setup the cmd event for dnstap io */
1536*25039b37SCy Schubert static void dtio_setup_cmd(struct dt_io_thread* dtio)
1537*25039b37SCy Schubert {
1538*25039b37SCy Schubert 	struct ub_event* cmdev;
1539*25039b37SCy Schubert 	fd_set_nonblock(dtio->commandpipe[0]);
1540*25039b37SCy Schubert 	cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1541*25039b37SCy Schubert 		UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1542*25039b37SCy Schubert 	if(!cmdev) {
1543*25039b37SCy Schubert 		fatal_exit("dnstap io: out of memory");
1544*25039b37SCy Schubert 	}
1545*25039b37SCy Schubert 	dtio->command_event = cmdev;
1546*25039b37SCy Schubert 	if(ub_event_add(cmdev, NULL) != 0) {
1547*25039b37SCy Schubert 		fatal_exit("dnstap io: out of memory (adding event)");
1548*25039b37SCy Schubert 	}
1549*25039b37SCy Schubert }
1550*25039b37SCy Schubert 
1551*25039b37SCy Schubert /** setup the reconnect event for dnstap io */
1552*25039b37SCy Schubert static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1553*25039b37SCy Schubert {
1554*25039b37SCy Schubert 	dtio_reconnect_clear(dtio);
1555*25039b37SCy Schubert 	dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1556*25039b37SCy Schubert 		UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1557*25039b37SCy Schubert 	if(!dtio->reconnect_timer) {
1558*25039b37SCy Schubert 		fatal_exit("dnstap io: out of memory");
1559*25039b37SCy Schubert 	}
1560*25039b37SCy Schubert }
1561*25039b37SCy Schubert 
1562*25039b37SCy Schubert /**
1563*25039b37SCy Schubert  * structure to keep track of information during stop flush
1564*25039b37SCy Schubert  */
1565*25039b37SCy Schubert struct stop_flush_info {
1566*25039b37SCy Schubert 	/** the event base during stop flush */
1567*25039b37SCy Schubert 	struct ub_event_base* base;
1568*25039b37SCy Schubert 	/** did we already want to exit this stop-flush event base */
1569*25039b37SCy Schubert 	int want_to_exit_flush;
1570*25039b37SCy Schubert 	/** has the timer fired */
1571*25039b37SCy Schubert 	int timer_done;
1572*25039b37SCy Schubert 	/** the dtio */
1573*25039b37SCy Schubert 	struct dt_io_thread* dtio;
1574*25039b37SCy Schubert 	/** the stop control frame */
1575*25039b37SCy Schubert 	void* stop_frame;
1576*25039b37SCy Schubert 	/** length of the stop frame */
1577*25039b37SCy Schubert 	size_t stop_frame_len;
1578*25039b37SCy Schubert 	/** how much we have done of the stop frame */
1579*25039b37SCy Schubert 	size_t stop_frame_done;
1580*25039b37SCy Schubert };
1581*25039b37SCy Schubert 
1582*25039b37SCy Schubert /** exit the stop flush base */
1583*25039b37SCy Schubert static void dtio_stop_flush_exit(struct stop_flush_info* info)
1584*25039b37SCy Schubert {
1585*25039b37SCy Schubert 	if(info->want_to_exit_flush)
1586*25039b37SCy Schubert 		return;
1587*25039b37SCy Schubert 	info->want_to_exit_flush = 1;
1588*25039b37SCy Schubert 	if(ub_event_base_loopexit(info->base) != 0) {
1589*25039b37SCy Schubert 		log_err("dnstap io: could not loopexit");
1590*25039b37SCy Schubert 	}
1591*25039b37SCy Schubert }
1592*25039b37SCy Schubert 
1593*25039b37SCy Schubert /** send the stop control,
1594*25039b37SCy Schubert  * return true if completed the frame. */
1595*25039b37SCy Schubert static int dtio_control_stop_send(struct stop_flush_info* info)
1596*25039b37SCy Schubert {
1597*25039b37SCy Schubert 	struct dt_io_thread* dtio = info->dtio;
1598*25039b37SCy Schubert 	int r;
1599*25039b37SCy Schubert 	if(info->stop_frame_done >= info->stop_frame_len)
1600*25039b37SCy Schubert 		return 1;
1601*25039b37SCy Schubert 	r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1602*25039b37SCy Schubert 		info->stop_frame_done, info->stop_frame_len -
1603*25039b37SCy Schubert 		info->stop_frame_done);
1604*25039b37SCy Schubert 	if(r == -1) {
1605*25039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1606*25039b37SCy Schubert 		dtio_stop_flush_exit(info);
1607*25039b37SCy Schubert 		return 0;
1608*25039b37SCy Schubert 	}
1609*25039b37SCy Schubert 	if(r == 0) {
1610*25039b37SCy Schubert 		/* try again later, or timeout */
1611*25039b37SCy Schubert 		return 0;
1612*25039b37SCy Schubert 	}
1613*25039b37SCy Schubert 	info->stop_frame_done += r;
1614*25039b37SCy Schubert 	if(info->stop_frame_done < info->stop_frame_len)
1615*25039b37SCy Schubert 		return 0; /* not done yet */
1616*25039b37SCy Schubert 	return 1;
1617*25039b37SCy Schubert }
1618*25039b37SCy Schubert 
1619*25039b37SCy Schubert void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1620*25039b37SCy Schubert 	void* arg)
1621*25039b37SCy Schubert {
1622*25039b37SCy Schubert 	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1623*25039b37SCy Schubert 	if(info->want_to_exit_flush)
1624*25039b37SCy Schubert 		return;
1625*25039b37SCy Schubert 	verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1626*25039b37SCy Schubert 	info->timer_done = 1;
1627*25039b37SCy Schubert 	dtio_stop_flush_exit(info);
1628*25039b37SCy Schubert }
1629*25039b37SCy Schubert 
1630*25039b37SCy Schubert void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1631*25039b37SCy Schubert {
1632*25039b37SCy Schubert 	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1633*25039b37SCy Schubert 	struct dt_io_thread* dtio = info->dtio;
1634*25039b37SCy Schubert 	if(info->want_to_exit_flush)
1635*25039b37SCy Schubert 		return;
1636*25039b37SCy Schubert 	if(dtio->check_nb_connect) {
1637*25039b37SCy Schubert 		/* we don't start the stop_flush if connect still
1638*25039b37SCy Schubert 		 * in progress, but the check code is here, just in case */
1639*25039b37SCy Schubert 		int connect_err = dtio_check_nb_connect(dtio);
1640*25039b37SCy Schubert 		if(connect_err == -1) {
1641*25039b37SCy Schubert 			/* close the channel, exit the stop flush */
1642*25039b37SCy Schubert 			dtio_stop_flush_exit(info);
1643*25039b37SCy Schubert 			dtio_del_output_event(dtio);
1644*25039b37SCy Schubert 			dtio_close_output(dtio);
1645*25039b37SCy Schubert 			return;
1646*25039b37SCy Schubert 		} else if(connect_err == 0) {
1647*25039b37SCy Schubert 			/* try again later */
1648*25039b37SCy Schubert 			return;
1649*25039b37SCy Schubert 		}
1650*25039b37SCy Schubert 		/* nonblocking connect check passed, continue */
1651*25039b37SCy Schubert 	}
1652*25039b37SCy Schubert #ifdef HAVE_SSL
1653*25039b37SCy Schubert 	if(dtio->ssl &&
1654*25039b37SCy Schubert 		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1655*25039b37SCy Schubert 		if(!dtio_ssl_handshake(dtio, info))
1656*25039b37SCy Schubert 			return;
1657*25039b37SCy Schubert 	}
1658*25039b37SCy Schubert #endif
1659*25039b37SCy Schubert 
1660*25039b37SCy Schubert 	if((bits&UB_EV_READ)) {
1661*25039b37SCy Schubert 		if(!dtio_check_close(dtio)) {
1662*25039b37SCy Schubert 			if(dtio->fd == -1) {
1663*25039b37SCy Schubert 				verbose(VERB_ALGO, "dnstap io: "
1664*25039b37SCy Schubert 					"stop flush: output closed");
1665*25039b37SCy Schubert 				dtio_stop_flush_exit(info);
1666*25039b37SCy Schubert 			}
1667*25039b37SCy Schubert 			return;
1668*25039b37SCy Schubert 		}
1669*25039b37SCy Schubert 	}
1670*25039b37SCy Schubert 	/* write remainder of last frame */
1671*25039b37SCy Schubert 	if(dtio->cur_msg) {
1672*25039b37SCy Schubert 		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1673*25039b37SCy Schubert 			if(!dtio_write_more(dtio)) {
1674*25039b37SCy Schubert 				if(dtio->fd == -1) {
1675*25039b37SCy Schubert 					verbose(VERB_ALGO, "dnstap io: "
1676*25039b37SCy Schubert 						"stop flush: output closed");
1677*25039b37SCy Schubert 					dtio_stop_flush_exit(info);
1678*25039b37SCy Schubert 				}
1679*25039b37SCy Schubert 				return;
1680*25039b37SCy Schubert 			}
1681*25039b37SCy Schubert 		}
1682*25039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap io: stop flush completed "
1683*25039b37SCy Schubert 			"last frame");
1684*25039b37SCy Schubert 		dtio_cur_msg_free(dtio);
1685*25039b37SCy Schubert 	}
1686*25039b37SCy Schubert 	/* write stop frame */
1687*25039b37SCy Schubert 	if(info->stop_frame_done < info->stop_frame_len) {
1688*25039b37SCy Schubert 		if(!dtio_control_stop_send(info))
1689*25039b37SCy Schubert 			return;
1690*25039b37SCy Schubert 		verbose(VERB_ALGO, "dnstap io: stop flush completed "
1691*25039b37SCy Schubert 			"stop control frame");
1692*25039b37SCy Schubert 	}
1693*25039b37SCy Schubert 	/* when last frame and stop frame are sent, exit */
1694*25039b37SCy Schubert 	dtio_stop_flush_exit(info);
1695*25039b37SCy Schubert }
1696*25039b37SCy Schubert 
1697*25039b37SCy Schubert /** flush at end, last packet and stop control */
1698*25039b37SCy Schubert static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1699*25039b37SCy Schubert {
1700*25039b37SCy Schubert 	/* briefly attempt to flush the previous packet to the output,
1701*25039b37SCy Schubert 	 * this could be a partial packet, or even the start control frame */
1702*25039b37SCy Schubert 	time_t secs = 0;
1703*25039b37SCy Schubert 	struct timeval now;
1704*25039b37SCy Schubert 	struct stop_flush_info info;
1705*25039b37SCy Schubert 	struct timeval tv;
1706*25039b37SCy Schubert 	struct ub_event* timer, *stopev;
1707*25039b37SCy Schubert 
1708*25039b37SCy Schubert 	if(dtio->fd == -1 || dtio->check_nb_connect) {
1709*25039b37SCy Schubert 		/* no connection or we have just connected, so nothing is
1710*25039b37SCy Schubert 		 * sent yet, so nothing to stop or flush */
1711*25039b37SCy Schubert 		return;
1712*25039b37SCy Schubert 	}
1713*25039b37SCy Schubert 	if(dtio->ssl && !dtio->ssl_handshake_done) {
1714*25039b37SCy Schubert 		/* no SSL connection has been established yet */
1715*25039b37SCy Schubert 		return;
1716*25039b37SCy Schubert 	}
1717*25039b37SCy Schubert 
1718*25039b37SCy Schubert 	memset(&info, 0, sizeof(info));
1719*25039b37SCy Schubert 	memset(&now, 0, sizeof(now));
1720*25039b37SCy Schubert 	info.dtio = dtio;
1721*25039b37SCy Schubert 	info.base = ub_default_event_base(0, &secs, &now);
1722*25039b37SCy Schubert 	if(!info.base) {
1723*25039b37SCy Schubert 		log_err("dnstap io: malloc failure");
1724*25039b37SCy Schubert 		return;
1725*25039b37SCy Schubert 	}
1726*25039b37SCy Schubert 	timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1727*25039b37SCy Schubert 		&dtio_stop_timer_cb, &info);
1728*25039b37SCy Schubert 	if(!timer) {
1729*25039b37SCy Schubert 		log_err("dnstap io: malloc failure");
1730*25039b37SCy Schubert 		ub_event_base_free(info.base);
1731*25039b37SCy Schubert 		return;
1732*25039b37SCy Schubert 	}
1733*25039b37SCy Schubert 	memset(&tv, 0, sizeof(tv));
1734*25039b37SCy Schubert 	tv.tv_sec = 2;
1735*25039b37SCy Schubert 	if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1736*25039b37SCy Schubert 		&tv) != 0) {
1737*25039b37SCy Schubert 		log_err("dnstap io: cannot event_timer_add");
1738*25039b37SCy Schubert 		ub_event_free(timer);
1739*25039b37SCy Schubert 		ub_event_base_free(info.base);
1740*25039b37SCy Schubert 		return;
1741*25039b37SCy Schubert 	}
1742*25039b37SCy Schubert 	stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1743*25039b37SCy Schubert 		UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1744*25039b37SCy Schubert 	if(!stopev) {
1745*25039b37SCy Schubert 		log_err("dnstap io: malloc failure");
1746*25039b37SCy Schubert 		ub_timer_del(timer);
1747*25039b37SCy Schubert 		ub_event_free(timer);
1748*25039b37SCy Schubert 		ub_event_base_free(info.base);
1749*25039b37SCy Schubert 		return;
1750*25039b37SCy Schubert 	}
1751*25039b37SCy Schubert 	if(ub_event_add(stopev, NULL) != 0) {
1752*25039b37SCy Schubert 		log_err("dnstap io: cannot event_add");
1753*25039b37SCy Schubert 		ub_event_free(stopev);
1754*25039b37SCy Schubert 		ub_timer_del(timer);
1755*25039b37SCy Schubert 		ub_event_free(timer);
1756*25039b37SCy Schubert 		ub_event_base_free(info.base);
1757*25039b37SCy Schubert 		return;
1758*25039b37SCy Schubert 	}
1759*25039b37SCy Schubert 	info.stop_frame = fstrm_create_control_frame_stop(
1760*25039b37SCy Schubert 		&info.stop_frame_len);
1761*25039b37SCy Schubert 	if(!info.stop_frame) {
1762*25039b37SCy Schubert 		log_err("dnstap io: malloc failure");
1763*25039b37SCy Schubert 		ub_event_del(stopev);
1764*25039b37SCy Schubert 		ub_event_free(stopev);
1765*25039b37SCy Schubert 		ub_timer_del(timer);
1766*25039b37SCy Schubert 		ub_event_free(timer);
1767*25039b37SCy Schubert 		ub_event_base_free(info.base);
1768*25039b37SCy Schubert 		return;
1769*25039b37SCy Schubert 	}
1770*25039b37SCy Schubert 	dtio->stop_flush_event = stopev;
1771*25039b37SCy Schubert 
1772*25039b37SCy Schubert 	/* wait briefly, or until finished */
1773*25039b37SCy Schubert 	verbose(VERB_ALGO, "dnstap io: stop flush started");
1774*25039b37SCy Schubert 	if(ub_event_base_dispatch(info.base) < 0) {
1775*25039b37SCy Schubert 		log_err("dnstap io: dispatch flush failed, errno is %s",
1776*25039b37SCy Schubert 			strerror(errno));
1777*25039b37SCy Schubert 	}
1778*25039b37SCy Schubert 	verbose(VERB_ALGO, "dnstap io: stop flush ended");
1779*25039b37SCy Schubert 	free(info.stop_frame);
1780*25039b37SCy Schubert 	dtio->stop_flush_event = NULL;
1781*25039b37SCy Schubert 	ub_event_del(stopev);
1782*25039b37SCy Schubert 	ub_event_free(stopev);
1783*25039b37SCy Schubert 	ub_timer_del(timer);
1784*25039b37SCy Schubert 	ub_event_free(timer);
1785*25039b37SCy Schubert 	ub_event_base_free(info.base);
1786*25039b37SCy Schubert }
1787*25039b37SCy Schubert 
1788*25039b37SCy Schubert /** perform desetup and free stuff when the dnstap io thread exits */
1789*25039b37SCy Schubert static void dtio_desetup(struct dt_io_thread* dtio)
1790*25039b37SCy Schubert {
1791*25039b37SCy Schubert 	dtio_control_stop_flush(dtio);
1792*25039b37SCy Schubert 	dtio_del_output_event(dtio);
1793*25039b37SCy Schubert 	dtio_close_output(dtio);
1794*25039b37SCy Schubert 	ub_event_del(dtio->command_event);
1795*25039b37SCy Schubert 	ub_event_free(dtio->command_event);
1796*25039b37SCy Schubert #ifndef USE_WINSOCK
1797*25039b37SCy Schubert 	close(dtio->commandpipe[0]);
1798*25039b37SCy Schubert #else
1799*25039b37SCy Schubert 	_close(dtio->commandpipe[0]);
1800*25039b37SCy Schubert #endif
1801*25039b37SCy Schubert 	dtio->commandpipe[0] = -1;
1802*25039b37SCy Schubert 	dtio_reconnect_del(dtio);
1803*25039b37SCy Schubert 	ub_event_free(dtio->reconnect_timer);
1804*25039b37SCy Schubert 	dtio_cur_msg_free(dtio);
1805*25039b37SCy Schubert #ifndef THREADS_DISABLED
1806*25039b37SCy Schubert 	ub_event_base_free(dtio->event_base);
1807*25039b37SCy Schubert #endif
1808*25039b37SCy Schubert }
1809*25039b37SCy Schubert 
1810*25039b37SCy Schubert /** setup a start control message */
1811*25039b37SCy Schubert static int dtio_control_start_send(struct dt_io_thread* dtio)
1812*25039b37SCy Schubert {
1813*25039b37SCy Schubert 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1814*25039b37SCy Schubert 	dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1815*25039b37SCy Schubert 		&dtio->cur_msg_len);
1816*25039b37SCy Schubert 	if(!dtio->cur_msg) {
1817*25039b37SCy Schubert 		return 0;
1818*25039b37SCy Schubert 	}
1819*25039b37SCy Schubert 	/* setup to send the control message */
1820*25039b37SCy Schubert 	/* set that the buffer needs to be sent, but the length
1821*25039b37SCy Schubert 	 * of that buffer is already written, that way the buffer can
1822*25039b37SCy Schubert 	 * start with 0 length and then the length of the control frame
1823*25039b37SCy Schubert 	 * in it */
1824*25039b37SCy Schubert 	dtio->cur_msg_done = 0;
1825*25039b37SCy Schubert 	dtio->cur_msg_len_done = 4;
1826*25039b37SCy Schubert 	return 1;
1827*25039b37SCy Schubert }
1828*25039b37SCy Schubert 
1829*25039b37SCy Schubert /** setup a ready control message */
1830*25039b37SCy Schubert static int dtio_control_ready_send(struct dt_io_thread* dtio)
1831*25039b37SCy Schubert {
1832*25039b37SCy Schubert 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1833*25039b37SCy Schubert 	dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1834*25039b37SCy Schubert 		&dtio->cur_msg_len);
1835*25039b37SCy Schubert 	if(!dtio->cur_msg) {
1836*25039b37SCy Schubert 		return 0;
1837*25039b37SCy Schubert 	}
1838*25039b37SCy Schubert 	/* setup to send the control message */
1839*25039b37SCy Schubert 	/* set that the buffer needs to be sent, but the length
1840*25039b37SCy Schubert 	 * of that buffer is already written, that way the buffer can
1841*25039b37SCy Schubert 	 * start with 0 length and then the length of the control frame
1842*25039b37SCy Schubert 	 * in it */
1843*25039b37SCy Schubert 	dtio->cur_msg_done = 0;
1844*25039b37SCy Schubert 	dtio->cur_msg_len_done = 4;
1845*25039b37SCy Schubert 	return 1;
1846*25039b37SCy Schubert }
1847*25039b37SCy Schubert 
1848*25039b37SCy Schubert /** open the output file descriptor for af_local */
1849*25039b37SCy Schubert static int dtio_open_output_local(struct dt_io_thread* dtio)
1850*25039b37SCy Schubert {
1851*25039b37SCy Schubert #ifdef HAVE_SYS_UN_H
1852*25039b37SCy Schubert 	struct sockaddr_un s;
1853*25039b37SCy Schubert 	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
1854*25039b37SCy Schubert 	if(dtio->fd == -1) {
1855*25039b37SCy Schubert #ifndef USE_WINSOCK
1856*25039b37SCy Schubert 		log_err("dnstap io: failed to create socket: %s",
1857*25039b37SCy Schubert 			strerror(errno));
1858*25039b37SCy Schubert #else
1859*25039b37SCy Schubert 		log_err("dnstap io: failed to create socket: %s",
1860*25039b37SCy Schubert 			wsa_strerror(WSAGetLastError()));
1861*25039b37SCy Schubert #endif
1862*25039b37SCy Schubert 		return 0;
1863*25039b37SCy Schubert 	}
1864*25039b37SCy Schubert 	memset(&s, 0, sizeof(s));
1865*25039b37SCy Schubert #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
1866*25039b37SCy Schubert         /* this member exists on BSDs, not Linux */
1867*25039b37SCy Schubert         s.sun_len = (unsigned)sizeof(s);
1868*25039b37SCy Schubert #endif
1869*25039b37SCy Schubert 	s.sun_family = AF_LOCAL;
1870*25039b37SCy Schubert 	/* length is 92-108, 104 on FreeBSD */
1871*25039b37SCy Schubert         (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
1872*25039b37SCy Schubert 	fd_set_nonblock(dtio->fd);
1873*25039b37SCy Schubert 	if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
1874*25039b37SCy Schubert 		== -1) {
1875*25039b37SCy Schubert 		char* to = dtio->socket_path;
1876*25039b37SCy Schubert #ifndef USE_WINSOCK
1877*25039b37SCy Schubert 		log_err("dnstap io: failed to connect to \"%s\": %s",
1878*25039b37SCy Schubert 			to, strerror(errno));
1879*25039b37SCy Schubert #else
1880*25039b37SCy Schubert 		log_err("dnstap io: failed to connect to \"%s\": %s",
1881*25039b37SCy Schubert 			to, wsa_strerror(WSAGetLastError()));
1882*25039b37SCy Schubert #endif
1883*25039b37SCy Schubert 		dtio_close_fd(dtio);
1884*25039b37SCy Schubert 		return 0;
1885*25039b37SCy Schubert 	}
1886*25039b37SCy Schubert 	return 1;
1887*25039b37SCy Schubert #else
1888*25039b37SCy Schubert 	log_err("cannot create af_local socket");
1889*25039b37SCy Schubert 	return 0;
1890*25039b37SCy Schubert #endif /* HAVE_SYS_UN_H */
1891*25039b37SCy Schubert }
1892*25039b37SCy Schubert 
1893*25039b37SCy Schubert /** open the output file descriptor for af_inet and af_inet6 */
1894*25039b37SCy Schubert static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1895*25039b37SCy Schubert {
1896*25039b37SCy Schubert 	struct sockaddr_storage addr;
1897*25039b37SCy Schubert 	socklen_t addrlen;
1898*25039b37SCy Schubert 	memset(&addr, 0, sizeof(addr));
1899*25039b37SCy Schubert 	addrlen = (socklen_t)sizeof(addr);
1900*25039b37SCy Schubert 
1901*25039b37SCy Schubert 	if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
1902*25039b37SCy Schubert 		log_err("could not parse IP '%s'", dtio->ip_str);
1903*25039b37SCy Schubert 		return 0;
1904*25039b37SCy Schubert 	}
1905*25039b37SCy Schubert 	dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1906*25039b37SCy Schubert 	if(dtio->fd == -1) {
1907*25039b37SCy Schubert #ifndef USE_WINSOCK
1908*25039b37SCy Schubert 		log_err("can't create socket: %s", strerror(errno));
1909*25039b37SCy Schubert #else
1910*25039b37SCy Schubert 		log_err("can't create socket: %s",
1911*25039b37SCy Schubert 			wsa_strerror(WSAGetLastError()));
1912*25039b37SCy Schubert #endif
1913*25039b37SCy Schubert 		return 0;
1914*25039b37SCy Schubert 	}
1915*25039b37SCy Schubert 	fd_set_nonblock(dtio->fd);
1916*25039b37SCy Schubert 	if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1917*25039b37SCy Schubert 		if(errno == EINPROGRESS)
1918*25039b37SCy Schubert 			return 1; /* wait until connect done*/
1919*25039b37SCy Schubert #ifndef USE_WINSOCK
1920*25039b37SCy Schubert 		if(tcp_connect_errno_needs_log(
1921*25039b37SCy Schubert 			(struct sockaddr *)&addr, addrlen)) {
1922*25039b37SCy Schubert 			log_err("dnstap io: failed to connect to %s: %s",
1923*25039b37SCy Schubert 				dtio->ip_str, strerror(errno));
1924*25039b37SCy Schubert 		}
1925*25039b37SCy Schubert #else
1926*25039b37SCy Schubert 		if(WSAGetLastError() == WSAEINPROGRESS ||
1927*25039b37SCy Schubert 			WSAGetLastError() == WSAEWOULDBLOCK)
1928*25039b37SCy Schubert 			return 1; /* wait until connect done*/
1929*25039b37SCy Schubert 		if(tcp_connect_errno_needs_log(
1930*25039b37SCy Schubert 			(struct sockaddr *)&addr, addrlen)) {
1931*25039b37SCy Schubert 			log_err("dnstap io: failed to connect to %s: %s",
1932*25039b37SCy Schubert 				dtio->ip_str, wsa_strerror(WSAGetLastError()));
1933*25039b37SCy Schubert 		}
1934*25039b37SCy Schubert #endif
1935*25039b37SCy Schubert 		dtio_close_fd(dtio);
1936*25039b37SCy Schubert 		return 0;
1937*25039b37SCy Schubert 	}
1938*25039b37SCy Schubert 	return 1;
1939*25039b37SCy Schubert }
1940*25039b37SCy Schubert 
1941*25039b37SCy Schubert /** setup the SSL structure for new connection */
1942*25039b37SCy Schubert static int dtio_setup_ssl(struct dt_io_thread* dtio)
1943*25039b37SCy Schubert {
1944*25039b37SCy Schubert 	dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
1945*25039b37SCy Schubert 	if(!dtio->ssl) return 0;
1946*25039b37SCy Schubert 	dtio->ssl_handshake_done = 0;
1947*25039b37SCy Schubert 	dtio->ssl_brief_read = 0;
1948*25039b37SCy Schubert 
1949*25039b37SCy Schubert 	if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
1950*25039b37SCy Schubert 		dtio->tls_use_sni)) {
1951*25039b37SCy Schubert 		return 0;
1952*25039b37SCy Schubert 	}
1953*25039b37SCy Schubert 	return 1;
1954*25039b37SCy Schubert }
1955*25039b37SCy Schubert 
1956*25039b37SCy Schubert /** open the output file descriptor */
1957*25039b37SCy Schubert static void dtio_open_output(struct dt_io_thread* dtio)
1958*25039b37SCy Schubert {
1959*25039b37SCy Schubert 	struct ub_event* ev;
1960*25039b37SCy Schubert 	if(dtio->upstream_is_unix) {
1961*25039b37SCy Schubert 		if(!dtio_open_output_local(dtio)) {
1962*25039b37SCy Schubert 			dtio_reconnect_enable(dtio);
1963*25039b37SCy Schubert 			return;
1964*25039b37SCy Schubert 		}
1965*25039b37SCy Schubert 	} else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
1966*25039b37SCy Schubert 		if(!dtio_open_output_tcp(dtio)) {
1967*25039b37SCy Schubert 			dtio_reconnect_enable(dtio);
1968*25039b37SCy Schubert 			return;
1969*25039b37SCy Schubert 		}
1970*25039b37SCy Schubert 		if(dtio->upstream_is_tls) {
1971*25039b37SCy Schubert 			if(!dtio_setup_ssl(dtio)) {
1972*25039b37SCy Schubert 				dtio_close_fd(dtio);
1973*25039b37SCy Schubert 				dtio_reconnect_enable(dtio);
1974*25039b37SCy Schubert 				return;
1975*25039b37SCy Schubert 			}
1976*25039b37SCy Schubert 		}
1977*25039b37SCy Schubert 	}
1978*25039b37SCy Schubert 	dtio->check_nb_connect = 1;
1979*25039b37SCy Schubert 
1980*25039b37SCy Schubert 	/* the EV_READ is to read ACCEPT control messages, and catch channel
1981*25039b37SCy Schubert 	 * close. EV_WRITE is to write packets */
1982*25039b37SCy Schubert 	ev = ub_event_new(dtio->event_base, dtio->fd,
1983*25039b37SCy Schubert 		UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
1984*25039b37SCy Schubert 		dtio);
1985*25039b37SCy Schubert 	if(!ev) {
1986*25039b37SCy Schubert 		log_err("dnstap io: out of memory");
1987*25039b37SCy Schubert 		if(dtio->ssl) {
1988*25039b37SCy Schubert #ifdef HAVE_SSL
1989*25039b37SCy Schubert 			SSL_free(dtio->ssl);
1990*25039b37SCy Schubert 			dtio->ssl = NULL;
1991*25039b37SCy Schubert #endif
1992*25039b37SCy Schubert 		}
1993*25039b37SCy Schubert 		dtio_close_fd(dtio);
1994*25039b37SCy Schubert 		dtio_reconnect_enable(dtio);
1995*25039b37SCy Schubert 		return;
1996*25039b37SCy Schubert 	}
1997*25039b37SCy Schubert 	dtio->event = ev;
1998*25039b37SCy Schubert 
1999*25039b37SCy Schubert 	/* setup protocol control message to start */
2000*25039b37SCy Schubert 	if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2001*25039b37SCy Schubert 		(dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2002*25039b37SCy Schubert 		log_err("dnstap io: out of memory");
2003*25039b37SCy Schubert 		ub_event_free(dtio->event);
2004*25039b37SCy Schubert 		dtio->event = NULL;
2005*25039b37SCy Schubert 		if(dtio->ssl) {
2006*25039b37SCy Schubert #ifdef HAVE_SSL
2007*25039b37SCy Schubert 			SSL_free(dtio->ssl);
2008*25039b37SCy Schubert 			dtio->ssl = NULL;
2009*25039b37SCy Schubert #endif
2010*25039b37SCy Schubert 		}
2011*25039b37SCy Schubert 		dtio_close_fd(dtio);
2012*25039b37SCy Schubert 		dtio_reconnect_enable(dtio);
2013*25039b37SCy Schubert 		return;
2014*25039b37SCy Schubert 	}
2015*25039b37SCy Schubert }
2016*25039b37SCy Schubert 
2017*25039b37SCy Schubert /** perform the setup of the writer thread on the established event_base */
2018*25039b37SCy Schubert static void dtio_setup_on_base(struct dt_io_thread* dtio)
2019*25039b37SCy Schubert {
2020*25039b37SCy Schubert 	dtio_setup_cmd(dtio);
2021*25039b37SCy Schubert 	dtio_setup_reconnect(dtio);
2022*25039b37SCy Schubert 	dtio_open_output(dtio);
2023*25039b37SCy Schubert 	if(!dtio_add_output_event_write(dtio))
2024*25039b37SCy Schubert 		return;
2025*25039b37SCy Schubert }
2026*25039b37SCy Schubert 
2027*25039b37SCy Schubert #ifndef THREADS_DISABLED
2028*25039b37SCy Schubert /** the IO thread function for the DNSTAP IO */
2029*25039b37SCy Schubert static void* dnstap_io(void* arg)
2030*25039b37SCy Schubert {
2031*25039b37SCy Schubert 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2032*25039b37SCy Schubert 	time_t secs = 0;
2033*25039b37SCy Schubert 	struct timeval now;
2034*25039b37SCy Schubert 	log_thread_set(&dtio->threadnum);
2035*25039b37SCy Schubert 
2036*25039b37SCy Schubert 	/* setup */
2037*25039b37SCy Schubert 	verbose(VERB_ALGO, "start dnstap io thread");
2038*25039b37SCy Schubert 	dtio_setup_base(dtio, &secs, &now);
2039*25039b37SCy Schubert 	dtio_setup_on_base(dtio);
2040*25039b37SCy Schubert 
2041*25039b37SCy Schubert 	/* run */
2042*25039b37SCy Schubert 	if(ub_event_base_dispatch(dtio->event_base) < 0) {
2043*25039b37SCy Schubert 		log_err("dnstap io: dispatch failed, errno is %s",
2044*25039b37SCy Schubert 			strerror(errno));
2045*25039b37SCy Schubert 	}
2046*25039b37SCy Schubert 
2047*25039b37SCy Schubert 	/* cleanup */
2048*25039b37SCy Schubert 	verbose(VERB_ALGO, "stop dnstap io thread");
2049*25039b37SCy Schubert 	dtio_desetup(dtio);
2050*25039b37SCy Schubert 	return NULL;
2051*25039b37SCy Schubert }
2052*25039b37SCy Schubert #endif /* THREADS_DISABLED */
2053*25039b37SCy Schubert 
2054*25039b37SCy Schubert int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2055*25039b37SCy Schubert 	int numworkers)
2056*25039b37SCy Schubert {
2057*25039b37SCy Schubert 	/* set up the thread, can fail */
2058*25039b37SCy Schubert #ifndef USE_WINSOCK
2059*25039b37SCy Schubert 	if(pipe(dtio->commandpipe) == -1) {
2060*25039b37SCy Schubert 		log_err("failed to create pipe: %s", strerror(errno));
2061*25039b37SCy Schubert 		return 0;
2062*25039b37SCy Schubert 	}
2063*25039b37SCy Schubert #else
2064*25039b37SCy Schubert 	if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2065*25039b37SCy Schubert 		log_err("failed to create _pipe: %s",
2066*25039b37SCy Schubert 			wsa_strerror(WSAGetLastError()));
2067*25039b37SCy Schubert 		return 0;
2068*25039b37SCy Schubert 	}
2069*25039b37SCy Schubert #endif
2070*25039b37SCy Schubert 
2071*25039b37SCy Schubert 	/* start the thread */
2072*25039b37SCy Schubert 	dtio->threadnum = numworkers+1;
2073*25039b37SCy Schubert 	dtio->started = 1;
2074*25039b37SCy Schubert #ifndef THREADS_DISABLED
2075*25039b37SCy Schubert 	ub_thread_create(&dtio->tid, dnstap_io, dtio);
2076*25039b37SCy Schubert 	(void)event_base_nothr;
2077*25039b37SCy Schubert #else
2078*25039b37SCy Schubert 	dtio->event_base = event_base_nothr;
2079*25039b37SCy Schubert 	dtio_setup_on_base(dtio);
2080*25039b37SCy Schubert #endif
2081*25039b37SCy Schubert 	return 1;
2082*25039b37SCy Schubert }
2083*25039b37SCy Schubert 
2084*25039b37SCy Schubert void dt_io_thread_stop(struct dt_io_thread* dtio)
2085*25039b37SCy Schubert {
2086*25039b37SCy Schubert #ifndef THREADS_DISABLED
2087*25039b37SCy Schubert 	uint8_t cmd = DTIO_COMMAND_STOP;
2088*25039b37SCy Schubert #endif
2089*25039b37SCy Schubert 	if(!dtio) return;
2090*25039b37SCy Schubert 	if(!dtio->started) return;
2091*25039b37SCy Schubert 	verbose(VERB_ALGO, "dnstap io: send stop cmd");
2092*25039b37SCy Schubert 
2093*25039b37SCy Schubert #ifndef THREADS_DISABLED
2094*25039b37SCy Schubert 	while(1) {
2095*25039b37SCy Schubert 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2096*25039b37SCy Schubert 		if(r == -1) {
2097*25039b37SCy Schubert #ifndef USE_WINSOCK
2098*25039b37SCy Schubert 			if(errno == EINTR || errno == EAGAIN)
2099*25039b37SCy Schubert 				continue;
2100*25039b37SCy Schubert 			log_err("dnstap io stop: write: %s", strerror(errno));
2101*25039b37SCy Schubert #else
2102*25039b37SCy Schubert 			if(WSAGetLastError() == WSAEINPROGRESS)
2103*25039b37SCy Schubert 				continue;
2104*25039b37SCy Schubert 			if(WSAGetLastError() == WSAEWOULDBLOCK)
2105*25039b37SCy Schubert 				continue;
2106*25039b37SCy Schubert 			log_err("dnstap io stop: write: %s",
2107*25039b37SCy Schubert 				wsa_strerror(WSAGetLastError()));
2108*25039b37SCy Schubert #endif
2109*25039b37SCy Schubert 			break;
2110*25039b37SCy Schubert 		}
2111*25039b37SCy Schubert 		break;
2112*25039b37SCy Schubert 	}
2113*25039b37SCy Schubert 	dtio->started = 0;
2114*25039b37SCy Schubert #endif /* THREADS_DISABLED */
2115*25039b37SCy Schubert 
2116*25039b37SCy Schubert #ifndef USE_WINSOCK
2117*25039b37SCy Schubert 	close(dtio->commandpipe[1]);
2118*25039b37SCy Schubert #else
2119*25039b37SCy Schubert 	_close(dtio->commandpipe[1]);
2120*25039b37SCy Schubert #endif
2121*25039b37SCy Schubert 	dtio->commandpipe[1] = -1;
2122*25039b37SCy Schubert #ifndef THREADS_DISABLED
2123*25039b37SCy Schubert 	ub_thread_join(dtio->tid);
2124*25039b37SCy Schubert #else
2125*25039b37SCy Schubert 	dtio->want_to_exit = 1;
2126*25039b37SCy Schubert 	dtio_desetup(dtio);
2127*25039b37SCy Schubert #endif
2128*25039b37SCy Schubert }
2129