1 /*
2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3 *
4 * Copyright (c) 2020, NLnet Labs. All rights reserved.
5 *
6 * This software is open source.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
14 *
15 * Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 *
19 * Neither the name of the NLNET LABS nor the names of its contributors may
20 * be used to endorse or promote products derived from this software without
21 * specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 *
35 */
36
37 /**
38 * \file
39 *
40 * An implementation of the Frame Streams data transport protocol for
41 * the Unbound DNSTAP message logging facility.
42 */
43
44 #include "config.h"
45 #include "dnstap/dtstream.h"
46 #include "dnstap/dnstap_fstrm.h"
47 #include "util/config_file.h"
48 #include "util/ub_event.h"
49 #include "util/net_help.h"
50 #include "services/outside_network.h"
51 #include "sldns/sbuffer.h"
52 #ifdef HAVE_SYS_UN_H
53 #include <sys/un.h>
54 #endif
55 #include <fcntl.h>
56 #ifdef HAVE_OPENSSL_SSL_H
57 #include <openssl/ssl.h>
58 #endif
59 #ifdef HAVE_OPENSSL_ERR_H
60 #include <openssl/err.h>
61 #endif
62
/** how many messages are processed in a single output callback */
#define DTIO_MESSAGES_PER_CALLBACK 100
/** msec to wait for the first timed (not immediate) reconnect attempt */
#define DTIO_RECONNECT_TIMEOUT_MIN 10
/** upper limit in msec for the reconnect wait after backoff */
#define DTIO_RECONNECT_TIMEOUT_MAX 1000
/** msec for a slow reconnect, so that reconnecting does not busy spin */
#define DTIO_RECONNECT_TIMEOUT_SLOW 1000
/** queued message count at which the io thread gets a wakeup */
#define DTIO_MSG_FOR_WAKEUP 32

/** largest frame length that is accepted when receiving */
#define DTIO_RECV_FRAME_MAX_LEN 1000
76
struct stop_flush_info;
/** commands that are written as a single byte on the DTIO command channel */
enum {
	/** tell the DTIO thread to stop */
	DTIO_COMMAND_STOP = 0,
	/** tell the DTIO thread to wake up */
	DTIO_COMMAND_WAKEUP = 1
} dtio_channel_command;
85
/* forward declarations of routines that are used before their definition */

/** open the output channel */
static void dtio_open_output(struct dt_io_thread* dtio);
/** add the output event, for read and write */
static int dtio_add_output_event_write(struct dt_io_thread* dtio);
/** start the reconnection attempts */
static void dtio_reconnect_enable(struct dt_io_thread* dtio);
/** stop, from the stop_flush event loop */
static void dtio_stop_flush_exit(struct stop_flush_info* info);
/** set up a start control message */
static int dtio_control_start_send(struct dt_io_thread* dtio);
#ifdef HAVE_SSL
/** briefly wait for a read event, used during SSL negotiation */
static int dtio_enable_brief_read(struct dt_io_thread* dtio);
/** briefly wait for a write event, used during SSL negotiation */
static int dtio_enable_brief_write(struct dt_io_thread* dtio);
#endif
102
103 struct dt_msg_queue*
dt_msg_queue_create(struct comm_base * base)104 dt_msg_queue_create(struct comm_base* base)
105 {
106 struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
107 if(!mq) return NULL;
108 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
109 about 1 M should contain 64K messages with some overhead,
110 or a whole bunch smaller ones */
111 mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112 if(!mq->wakeup_timer) {
113 free(mq);
114 return NULL;
115 }
116 lock_basic_init(&mq->lock);
117 lock_protect(&mq->lock, mq, sizeof(*mq));
118 return mq;
119 }
120
121 /** clear the message list, caller must hold the lock */
122 static void
dt_msg_queue_clear(struct dt_msg_queue * mq)123 dt_msg_queue_clear(struct dt_msg_queue* mq)
124 {
125 struct dt_msg_entry* e = mq->first, *next=NULL;
126 while(e) {
127 next = e->next;
128 free(e->buf);
129 free(e);
130 e = next;
131 }
132 mq->first = NULL;
133 mq->last = NULL;
134 mq->cursize = 0;
135 mq->msgcount = 0;
136 }
137
138 void
dt_msg_queue_delete(struct dt_msg_queue * mq)139 dt_msg_queue_delete(struct dt_msg_queue* mq)
140 {
141 if(!mq) return;
142 lock_basic_destroy(&mq->lock);
143 dt_msg_queue_clear(mq);
144 comm_timer_delete(mq->wakeup_timer);
145 free(mq);
146 }
147
148 /** make the dtio wake up by sending a wakeup command */
dtio_wakeup(struct dt_io_thread * dtio)149 static void dtio_wakeup(struct dt_io_thread* dtio)
150 {
151 uint8_t cmd = DTIO_COMMAND_WAKEUP;
152 if(!dtio) return;
153 if(!dtio->started) return;
154
155 while(1) {
156 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
157 if(r == -1) {
158 #ifndef USE_WINSOCK
159 if(errno == EINTR || errno == EAGAIN)
160 continue;
161 #else
162 if(WSAGetLastError() == WSAEINPROGRESS)
163 continue;
164 if(WSAGetLastError() == WSAEWOULDBLOCK)
165 continue;
166 #endif
167 log_err("dnstap io wakeup: write: %s",
168 sock_strerror(errno));
169 break;
170 }
171 break;
172 }
173 }
174
175 void
mq_wakeup_cb(void * arg)176 mq_wakeup_cb(void* arg)
177 {
178 struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179
180 lock_basic_lock(&mq->dtio->wakeup_timer_lock);
181 mq->dtio->wakeup_timer_enabled = 0;
182 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
183 dtio_wakeup(mq->dtio);
184 }
185
186 /** start timer to wakeup dtio because there is content in the queue */
187 static void
dt_msg_queue_start_timer(struct dt_msg_queue * mq,int wakeupnow)188 dt_msg_queue_start_timer(struct dt_msg_queue* mq, int wakeupnow)
189 {
190 struct timeval tv = {0};
191 /* Start a timer to process messages to be logged.
192 * If we woke up the dtio thread for every message, the wakeup
193 * messages take up too much processing power. If the queue
194 * fills up the wakeup happens immediately. The timer wakes it up
195 * if there are infrequent messages to log. */
196
197 /* we cannot start a timer in dtio thread, because it is a different
198 * thread and its event base is in use by the other thread, it would
199 * give race conditions if we tried to modify its event base,
200 * and locks would wait until it woke up, and this is what we do. */
201
202 /* do not start the timer if a timer already exists, perhaps
203 * in another worker. So this variable is protected by a lock in
204 * dtio. */
205
206 /* If we need to wakeupnow, 0 the timer to force the callback. */
207 lock_basic_lock(&mq->dtio->wakeup_timer_lock);
208 if(mq->dtio->wakeup_timer_enabled) {
209 if(wakeupnow) {
210 tv.tv_sec = 0;
211 tv.tv_usec = 0;
212 comm_timer_set(mq->wakeup_timer, &tv);
213 }
214 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
215 return;
216 }
217 mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
218
219 /* start the timer, in mq, in the event base of our worker */
220 if(!wakeupnow) {
221 tv.tv_sec = 1;
222 tv.tv_usec = 0;
223 /* If it is already set, keep it running. */
224 if(!comm_timer_is_set(mq->wakeup_timer))
225 comm_timer_set(mq->wakeup_timer, &tv);
226 } else {
227 tv.tv_sec = 0;
228 tv.tv_usec = 0;
229 comm_timer_set(mq->wakeup_timer, &tv);
230 }
231 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
232 }
233
234 void
dt_msg_queue_submit(struct dt_msg_queue * mq,void * buf,size_t len)235 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
236 {
237 int wakeupnow = 0, wakeupstarttimer = 0;
238 struct dt_msg_entry* entry;
239
240 /* check conditions */
241 if(!buf) return;
242 if(len == 0) {
243 /* it is not possible to log entries with zero length,
244 * because the framestream protocol does not carry it.
245 * However the protobuf serialization does not create zero
246 * length datagrams for dnstap, so this should not happen. */
247 free(buf);
248 return;
249 }
250 if(!mq) {
251 free(buf);
252 return;
253 }
254
255 /* allocate memory for queue entry */
256 entry = malloc(sizeof(*entry));
257 if(!entry) {
258 log_err("out of memory logging dnstap");
259 free(buf);
260 return;
261 }
262 entry->next = NULL;
263 entry->buf = buf;
264 entry->len = len;
265
266 /* acquire lock */
267 lock_basic_lock(&mq->lock);
268 /* if list was empty, start timer for (eventual) wakeup,
269 * or if dtio is not writing now an eventual wakeup is needed. */
270 if(mq->first == NULL || !mq->dtio->event_added_is_write)
271 wakeupstarttimer = 1;
272 /* if list contains more than wakeupnum elements, wakeup now,
273 * or if list is (going to be) almost full */
274 if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
275 (mq->cursize < mq->maxsize * 9 / 10 &&
276 mq->cursize+len >= mq->maxsize * 9 / 10))
277 wakeupnow = 1;
278 /* see if it is going to fit */
279 if(mq->cursize + len > mq->maxsize) {
280 /* buffer full, or congested. */
281 /* drop */
282 lock_basic_unlock(&mq->lock);
283 free(buf);
284 free(entry);
285 return;
286 }
287 mq->cursize += len;
288 mq->msgcount ++;
289 /* append to list */
290 if(mq->last) {
291 mq->last->next = entry;
292 } else {
293 mq->first = entry;
294 }
295 mq->last = entry;
296 /* release lock */
297 lock_basic_unlock(&mq->lock);
298
299 if(wakeupnow || wakeupstarttimer) {
300 dt_msg_queue_start_timer(mq, wakeupnow);
301 }
302 }
303
dt_io_thread_create(void)304 struct dt_io_thread* dt_io_thread_create(void)
305 {
306 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
307 lock_basic_init(&dtio->wakeup_timer_lock);
308 lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
309 sizeof(dtio->wakeup_timer_enabled));
310 return dtio;
311 }
312
dt_io_thread_delete(struct dt_io_thread * dtio)313 void dt_io_thread_delete(struct dt_io_thread* dtio)
314 {
315 struct dt_io_list_item* item, *nextitem;
316 if(!dtio) return;
317 lock_basic_destroy(&dtio->wakeup_timer_lock);
318 item=dtio->io_list;
319 while(item) {
320 nextitem = item->next;
321 free(item);
322 item = nextitem;
323 }
324 free(dtio->socket_path);
325 free(dtio->ip_str);
326 free(dtio->tls_server_name);
327 free(dtio->client_key_file);
328 free(dtio->client_cert_file);
329 if(dtio->ssl_ctx) {
330 #ifdef HAVE_SSL
331 SSL_CTX_free(dtio->ssl_ctx);
332 #endif
333 }
334 free(dtio);
335 }
336
dt_io_thread_apply_cfg(struct dt_io_thread * dtio,struct config_file * cfg)337 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
338 {
339 if(!cfg->dnstap) {
340 log_warn("cannot setup dnstap because dnstap-enable is no");
341 return 0;
342 }
343
344 /* what type of connectivity do we have */
345 if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
346 if(cfg->dnstap_tls)
347 dtio->upstream_is_tls = 1;
348 else dtio->upstream_is_tcp = 1;
349 } else {
350 dtio->upstream_is_unix = 1;
351 }
352 dtio->is_bidirectional = cfg->dnstap_bidirectional;
353
354 if(dtio->upstream_is_unix) {
355 char* nm;
356 if(!cfg->dnstap_socket_path ||
357 cfg->dnstap_socket_path[0]==0) {
358 log_err("dnstap setup: no dnstap-socket-path for "
359 "socket connect");
360 return 0;
361 }
362 nm = cfg->dnstap_socket_path;
363 if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm,
364 cfg->chrootdir, strlen(cfg->chrootdir)) == 0)
365 nm += strlen(cfg->chrootdir);
366 free(dtio->socket_path);
367 dtio->socket_path = strdup(nm);
368 if(!dtio->socket_path) {
369 log_err("dnstap setup: malloc failure");
370 return 0;
371 }
372 }
373
374 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
375 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
376 log_err("dnstap setup: no dnstap-ip for TCP connect");
377 return 0;
378 }
379 free(dtio->ip_str);
380 dtio->ip_str = strdup(cfg->dnstap_ip);
381 if(!dtio->ip_str) {
382 log_err("dnstap setup: malloc failure");
383 return 0;
384 }
385 }
386
387 if(dtio->upstream_is_tls) {
388 #ifdef HAVE_SSL
389 if(cfg->dnstap_tls_server_name &&
390 cfg->dnstap_tls_server_name[0]) {
391 free(dtio->tls_server_name);
392 dtio->tls_server_name = strdup(
393 cfg->dnstap_tls_server_name);
394 if(!dtio->tls_server_name) {
395 log_err("dnstap setup: malloc failure");
396 return 0;
397 }
398 if(!check_auth_name_for_ssl(dtio->tls_server_name))
399 return 0;
400 }
401 if(cfg->dnstap_tls_client_key_file &&
402 cfg->dnstap_tls_client_key_file[0]) {
403 dtio->use_client_certs = 1;
404 free(dtio->client_key_file);
405 dtio->client_key_file = strdup(
406 cfg->dnstap_tls_client_key_file);
407 if(!dtio->client_key_file) {
408 log_err("dnstap setup: malloc failure");
409 return 0;
410 }
411 if(!cfg->dnstap_tls_client_cert_file ||
412 cfg->dnstap_tls_client_cert_file[0]==0) {
413 log_err("dnstap setup: client key "
414 "authentication enabled with "
415 "dnstap-tls-client-key-file, but "
416 "no dnstap-tls-client-cert-file "
417 "is given");
418 return 0;
419 }
420 free(dtio->client_cert_file);
421 dtio->client_cert_file = strdup(
422 cfg->dnstap_tls_client_cert_file);
423 if(!dtio->client_cert_file) {
424 log_err("dnstap setup: malloc failure");
425 return 0;
426 }
427 } else {
428 dtio->use_client_certs = 0;
429 dtio->client_key_file = NULL;
430 dtio->client_cert_file = NULL;
431 }
432
433 if(cfg->dnstap_tls_cert_bundle) {
434 dtio->ssl_ctx = connect_sslctx_create(
435 dtio->client_key_file,
436 dtio->client_cert_file,
437 cfg->dnstap_tls_cert_bundle, 0);
438 } else {
439 dtio->ssl_ctx = connect_sslctx_create(
440 dtio->client_key_file,
441 dtio->client_cert_file,
442 cfg->tls_cert_bundle, cfg->tls_win_cert);
443 }
444 if(!dtio->ssl_ctx) {
445 log_err("could not setup SSL CTX");
446 return 0;
447 }
448 dtio->tls_use_sni = cfg->tls_use_sni;
449 #endif /* HAVE_SSL */
450 }
451 #ifdef HAVE_GETTID
452 dtio->thread_tid_log = cfg->log_thread_id;
453 #endif
454 return 1;
455 }
456
dt_io_thread_register_queue(struct dt_io_thread * dtio,struct dt_msg_queue * mq)457 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
458 struct dt_msg_queue* mq)
459 {
460 struct dt_io_list_item* item = malloc(sizeof(*item));
461 if(!item) return 0;
462 lock_basic_lock(&mq->lock);
463 mq->dtio = dtio;
464 lock_basic_unlock(&mq->lock);
465 item->queue = mq;
466 item->next = dtio->io_list;
467 dtio->io_list = item;
468 dtio->io_list_iter = NULL;
469 return 1;
470 }
471
dt_io_thread_unregister_queue(struct dt_io_thread * dtio,struct dt_msg_queue * mq)472 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
473 struct dt_msg_queue* mq)
474 {
475 struct dt_io_list_item* item, *prev=NULL;
476 if(!dtio) return;
477 item = dtio->io_list;
478 while(item) {
479 if(item->queue == mq) {
480 /* found it */
481 if(prev) prev->next = item->next;
482 else dtio->io_list = item->next;
483 /* the queue itself only registered, not deleted */
484 lock_basic_lock(&item->queue->lock);
485 item->queue->dtio = NULL;
486 lock_basic_unlock(&item->queue->lock);
487 free(item);
488 dtio->io_list_iter = NULL;
489 return;
490 }
491 prev = item;
492 item = item->next;
493 }
494 }
495
496 /** pick a message from the queue, the routine locks and unlocks,
497 * returns true if there is a message */
dt_msg_queue_pop(struct dt_msg_queue * mq,void ** buf,size_t * len)498 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
499 size_t* len)
500 {
501 lock_basic_lock(&mq->lock);
502 if(mq->first) {
503 struct dt_msg_entry* entry = mq->first;
504 mq->first = entry->next;
505 if(!entry->next) mq->last = NULL;
506 mq->cursize -= entry->len;
507 mq->msgcount --;
508 lock_basic_unlock(&mq->lock);
509
510 *buf = entry->buf;
511 *len = entry->len;
512 free(entry);
513 return 1;
514 }
515 lock_basic_unlock(&mq->lock);
516 return 0;
517 }
518
519 /** find message in queue, false if no message, true if message to send */
dtio_find_in_queue(struct dt_io_thread * dtio,struct dt_msg_queue * mq)520 static int dtio_find_in_queue(struct dt_io_thread* dtio,
521 struct dt_msg_queue* mq)
522 {
523 void* buf=NULL;
524 size_t len=0;
525 if(dt_msg_queue_pop(mq, &buf, &len)) {
526 dtio->cur_msg = buf;
527 dtio->cur_msg_len = len;
528 dtio->cur_msg_done = 0;
529 dtio->cur_msg_len_done = 0;
530 return 1;
531 }
532 return 0;
533 }
534
535 /** find a new message to write, search message queues, false if none */
dtio_find_msg(struct dt_io_thread * dtio)536 static int dtio_find_msg(struct dt_io_thread* dtio)
537 {
538 struct dt_io_list_item *spot, *item;
539
540 spot = dtio->io_list_iter;
541 /* use the next queue for the next message lookup,
542 * if we hit the end(NULL) the NULL restarts the iter at start. */
543 if(spot)
544 dtio->io_list_iter = spot->next;
545 else if(dtio->io_list)
546 dtio->io_list_iter = dtio->io_list->next;
547
548 /* scan from spot to end-of-io_list */
549 item = spot;
550 while(item) {
551 if(dtio_find_in_queue(dtio, item->queue))
552 return 1;
553 item = item->next;
554 }
555 /* scan starting at the start-of-list (to wrap around the end) */
556 item = dtio->io_list;
557 while(item) {
558 if(dtio_find_in_queue(dtio, item->queue))
559 return 1;
560 item = item->next;
561 }
562 return 0;
563 }
564
565 /** callback for the dnstap reconnect, to start reconnecting to output */
dtio_reconnect_timeout_cb(int ATTR_UNUSED (fd),short ATTR_UNUSED (bits),void * arg)566 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
567 short ATTR_UNUSED(bits), void* arg)
568 {
569 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
570 dtio->reconnect_is_added = 0;
571 verbose(VERB_ALGO, "dnstap io: reconnect timer");
572
573 dtio_open_output(dtio);
574 if(dtio->event) {
575 if(!dtio_add_output_event_write(dtio))
576 return;
577 /* nothing wrong so far, wait on the output event */
578 return;
579 }
580 /* exponential backoff and retry on timer */
581 dtio_reconnect_enable(dtio);
582 }
583
584 /** attempt to reconnect to the output, after a timeout */
dtio_reconnect_enable(struct dt_io_thread * dtio)585 static void dtio_reconnect_enable(struct dt_io_thread* dtio)
586 {
587 struct timeval tv;
588 int msec;
589 if(dtio->want_to_exit) return;
590 if(dtio->reconnect_is_added)
591 return; /* already done */
592
593 /* exponential backoff, store the value for next timeout */
594 msec = dtio->reconnect_timeout;
595 if(msec == 0) {
596 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
597 } else {
598 dtio->reconnect_timeout = msec*2;
599 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
600 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
601 }
602 verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
603 msec);
604
605 /* setup wait timer */
606 memset(&tv, 0, sizeof(tv));
607 tv.tv_sec = msec/1000;
608 tv.tv_usec = (msec%1000)*1000;
609 if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
610 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
611 log_err("dnstap io: could not reconnect ev timer add");
612 return;
613 }
614 dtio->reconnect_is_added = 1;
615 }
616
617 /** remove dtio reconnect timer */
dtio_reconnect_del(struct dt_io_thread * dtio)618 static void dtio_reconnect_del(struct dt_io_thread* dtio)
619 {
620 if(!dtio->reconnect_is_added)
621 return;
622 ub_timer_del(dtio->reconnect_timer);
623 dtio->reconnect_is_added = 0;
624 }
625
626 /** clear the reconnect exponential backoff timer.
627 * We have successfully connected so we can try again with short timeouts. */
dtio_reconnect_clear(struct dt_io_thread * dtio)628 static void dtio_reconnect_clear(struct dt_io_thread* dtio)
629 {
630 dtio->reconnect_timeout = 0;
631 dtio_reconnect_del(dtio);
632 }
633
634 /** reconnect slowly, because we already know we have to wait for a bit */
dtio_reconnect_slow(struct dt_io_thread * dtio,int msec)635 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
636 {
637 dtio_reconnect_del(dtio);
638 dtio->reconnect_timeout = msec;
639 dtio_reconnect_enable(dtio);
640 }
641
642 /** delete the current message in the dtio, and reset counters */
dtio_cur_msg_free(struct dt_io_thread * dtio)643 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
644 {
645 free(dtio->cur_msg);
646 dtio->cur_msg = NULL;
647 dtio->cur_msg_len = 0;
648 dtio->cur_msg_done = 0;
649 dtio->cur_msg_len_done = 0;
650 }
651
652 /** delete the buffer and counters used to read frame */
dtio_read_frame_free(struct dt_frame_read_buf * rb)653 static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
654 {
655 if(rb->buf) {
656 free(rb->buf);
657 rb->buf = NULL;
658 }
659 rb->buf_count = 0;
660 rb->buf_cap = 0;
661 rb->frame_len = 0;
662 rb->frame_len_done = 0;
663 rb->control_frame = 0;
664 }
665
666 /** del the output file descriptor event for listening */
dtio_del_output_event(struct dt_io_thread * dtio)667 static void dtio_del_output_event(struct dt_io_thread* dtio)
668 {
669 if(!dtio->event_added)
670 return;
671 ub_event_del(dtio->event);
672 dtio->event_added = 0;
673 dtio->event_added_is_write = 0;
674 }
675
676 /** close dtio socket and set it to -1 */
dtio_close_fd(struct dt_io_thread * dtio)677 static void dtio_close_fd(struct dt_io_thread* dtio)
678 {
679 sock_close(dtio->fd);
680 dtio->fd = -1;
681 }
682
683 /** close and stop the output file descriptor event */
dtio_close_output(struct dt_io_thread * dtio)684 static void dtio_close_output(struct dt_io_thread* dtio)
685 {
686 if(!dtio->event)
687 return;
688 ub_event_free(dtio->event);
689 dtio->event = NULL;
690 if(dtio->ssl) {
691 #ifdef HAVE_SSL
692 SSL_shutdown(dtio->ssl);
693 SSL_free(dtio->ssl);
694 dtio->ssl = NULL;
695 #endif
696 }
697 dtio_close_fd(dtio);
698
699 /* if there is a (partial) message, discard it
700 * we cannot send (the remainder of) it, and a new
701 * connection needs to start with a control frame. */
702 if(dtio->cur_msg) {
703 dtio_cur_msg_free(dtio);
704 }
705
706 dtio->ready_frame_sent = 0;
707 dtio->accept_frame_received = 0;
708 dtio_read_frame_free(&dtio->read_frame);
709
710 dtio_reconnect_enable(dtio);
711 }
712
713 /** check for pending nonblocking connect errors,
714 * returns 1 if it is okay. -1 on error (close it), 0 to try later */
dtio_check_nb_connect(struct dt_io_thread * dtio)715 static int dtio_check_nb_connect(struct dt_io_thread* dtio)
716 {
717 int error = 0;
718 socklen_t len = (socklen_t)sizeof(error);
719 if(!dtio->check_nb_connect)
720 return 1; /* everything okay */
721 if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
722 &len) < 0) {
723 #ifndef USE_WINSOCK
724 error = errno; /* on solaris errno is error */
725 #else
726 error = WSAGetLastError();
727 #endif
728 }
729 #ifndef USE_WINSOCK
730 #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
731 if(error == EINPROGRESS || error == EWOULDBLOCK)
732 return 0; /* try again later */
733 #endif
734 #else
735 if(error == WSAEINPROGRESS) {
736 return 0; /* try again later */
737 } else if(error == WSAEWOULDBLOCK) {
738 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
739 dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
740 return 0; /* try again later */
741 }
742 #endif
743 if(error != 0) {
744 char* to = dtio->socket_path;
745 if(!to) to = dtio->ip_str;
746 if(!to) to = "";
747 log_err("dnstap io: failed to connect to \"%s\": %s",
748 to, sock_strerror(error));
749 return -1; /* error, close it */
750 }
751
752 if(dtio->ip_str)
753 verbose(VERB_DETAIL, "dnstap io: connected to %s",
754 dtio->ip_str);
755 else if(dtio->socket_path)
756 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
757 dtio->socket_path);
758 dtio_reconnect_clear(dtio);
759 dtio->check_nb_connect = 0;
760 return 1; /* everything okay */
761 }
762
#ifdef HAVE_SSL
/** write a buffer to the ssl output.
 * returns the number of bytes written, 0 if nothing happened and the
 * write is to be tried again later, or -1 if the channel is to be
 * closed. */
static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
	size_t len)
{
	int written;
	int want;
	ERR_clear_error();
	written = SSL_write(dtio->ssl, buf, len);
	if(written > 0)
		return written;

	want = SSL_get_error(dtio->ssl, written);
	switch(want) {
	case SSL_ERROR_ZERO_RETURN:
		/* closed */
		return -1;
	case SSL_ERROR_WANT_READ:
		/* we want a brief read event */
		dtio_enable_brief_read(dtio);
		return 0;
	case SSL_ERROR_WANT_WRITE:
		/* write again later */
		return 0;
	case SSL_ERROR_SYSCALL:
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			return -1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			return -1; /* silence reset by peer */
#endif
		if(errno != 0) {
			log_err("dnstap io, SSL_write syscall: %s",
				strerror(errno));
		}
		return -1;
	default:
		break;
	}
	/* any other error is logged and closes the channel */
	log_crypto_err_io("dnstap io, could not SSL_write", want);
	return -1;
}
#endif /* HAVE_SSL */
806
807 /** write buffer to output.
808 * returns number of bytes written, 0 if nothing happened,
809 * try again later, or -1 if the channel is to be closed. */
dtio_write_buf(struct dt_io_thread * dtio,uint8_t * buf,size_t len)810 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
811 size_t len)
812 {
813 ssize_t ret;
814 if(dtio->fd == -1)
815 return -1;
816 #ifdef HAVE_SSL
817 if(dtio->ssl)
818 return dtio_write_ssl(dtio, buf, len);
819 #endif
820 ret = send(dtio->fd, (void*)buf, len, 0);
821 if(ret == -1) {
822 #ifndef USE_WINSOCK
823 if(errno == EINTR || errno == EAGAIN)
824 return 0;
825 #else
826 if(WSAGetLastError() == WSAEINPROGRESS)
827 return 0;
828 if(WSAGetLastError() == WSAEWOULDBLOCK) {
829 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
830 dtio->stop_flush_event:dtio->event),
831 UB_EV_WRITE);
832 return 0;
833 }
834 #endif
835 log_err("dnstap io: failed send: %s", sock_strerror(errno));
836 return -1;
837 }
838 return ret;
839 }
840
#ifdef HAVE_WRITEV
/** write the length and the message with writev, in one write if
 * possible. On a write failure that cannot be retried the channel is
 * closed.
 * return true if message is done, false if incomplete */
static int dtio_write_with_writev(struct dt_io_thread* dtio)
{
	uint32_t netlen = htonl(dtio->cur_msg_len);
	struct iovec vec[2];
	ssize_t written;
	/* the unsent part of the length prefix, followed by the message */
	vec[0].iov_base = ((uint8_t*)&netlen)+dtio->cur_msg_len_done;
	vec[0].iov_len = sizeof(netlen)-dtio->cur_msg_len_done;
	vec[1].iov_base = dtio->cur_msg;
	vec[1].iov_len = dtio->cur_msg_len;
	log_assert(vec[0].iov_len > 0);
	written = writev(dtio->fd, vec, 2);
	if(written == -1) {
#ifndef USE_WINSOCK
		if(errno == EINTR || errno == EAGAIN)
			return 0;
#else
		if(WSAGetLastError() == WSAEINPROGRESS)
			return 0;
		if(WSAGetLastError() == WSAEWOULDBLOCK) {
			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
				dtio->stop_flush_event:dtio->event),
				UB_EV_WRITE);
			return 0;
		}
#endif
		log_err("dnstap io: failed writev: %s", sock_strerror(errno));
		/* close the channel */
		dtio_del_output_event(dtio);
		dtio_close_output(dtio);
		return 0;
	}
	/* The written bytes are counted on the length prefix first.
	 * What exceeds the 4 bytes of the prefix was message data. */
	dtio->cur_msg_len_done += written;
	if(dtio->cur_msg_len_done < 4)
		return 0;
	if(dtio->cur_msg_len_done > 4) {
		dtio->cur_msg_done = dtio->cur_msg_len_done-4;
		dtio->cur_msg_len_done = 4;
	}
	return (dtio->cur_msg_done < dtio->cur_msg_len) ? 0 : 1;
}
#endif /* HAVE_WRITEV */
888
889 /** write more of the length, preceding the data frame.
890 * return true if message is done, false if incomplete. */
dtio_write_more_of_len(struct dt_io_thread * dtio)891 static int dtio_write_more_of_len(struct dt_io_thread* dtio)
892 {
893 uint32_t sendlen;
894 int r;
895 if(dtio->cur_msg_len_done >= 4)
896 return 1;
897 #ifdef HAVE_WRITEV
898 if(!dtio->ssl) {
899 /* we try writev for everything.*/
900 return dtio_write_with_writev(dtio);
901 }
902 #endif /* HAVE_WRITEV */
903 sendlen = htonl(dtio->cur_msg_len);
904 r = dtio_write_buf(dtio,
905 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
906 sizeof(sendlen)-dtio->cur_msg_len_done);
907 if(r == -1) {
908 /* close the channel */
909 dtio_del_output_event(dtio);
910 dtio_close_output(dtio);
911 return 0;
912 } else if(r == 0) {
913 /* try again later */
914 return 0;
915 }
916 dtio->cur_msg_len_done += r;
917 if(dtio->cur_msg_len_done < 4)
918 return 0;
919 return 1;
920 }
921
922 /** write more of the data frame.
923 * return true if message is done, false if incomplete. */
dtio_write_more_of_data(struct dt_io_thread * dtio)924 static int dtio_write_more_of_data(struct dt_io_thread* dtio)
925 {
926 int r;
927 if(dtio->cur_msg_done >= dtio->cur_msg_len)
928 return 1;
929 r = dtio_write_buf(dtio,
930 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
931 dtio->cur_msg_len - dtio->cur_msg_done);
932 if(r == -1) {
933 /* close the channel */
934 dtio_del_output_event(dtio);
935 dtio_close_output(dtio);
936 return 0;
937 } else if(r == 0) {
938 /* try again later */
939 return 0;
940 }
941 dtio->cur_msg_done += r;
942 if(dtio->cur_msg_done < dtio->cur_msg_len)
943 return 0;
944 return 1;
945 }
946
947 /** write more of the current message. false if incomplete, true if
948 * the message is done */
dtio_write_more(struct dt_io_thread * dtio)949 static int dtio_write_more(struct dt_io_thread* dtio)
950 {
951 if(dtio->cur_msg_len_done < 4) {
952 if(!dtio_write_more_of_len(dtio))
953 return 0;
954 }
955 if(dtio->cur_msg_done < dtio->cur_msg_len) {
956 if(!dtio_write_more_of_data(dtio))
957 return 0;
958 }
959 return 1;
960 }
961
962 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
963 * -1: continue, >0: number of bytes read into buffer */
receive_bytes(struct dt_io_thread * dtio,void * buf,size_t len)964 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
965 ssize_t r;
966 r = recv(dtio->fd, (void*)buf, len, MSG_DONTWAIT);
967 if(r == -1) {
968 char* to = dtio->socket_path;
969 if(!to) to = dtio->ip_str;
970 if(!to) to = "";
971 #ifndef USE_WINSOCK
972 if(errno == EINTR || errno == EAGAIN)
973 return -1; /* try later */
974 #else
975 if(WSAGetLastError() == WSAEINPROGRESS) {
976 return -1; /* try later */
977 } else if(WSAGetLastError() == WSAEWOULDBLOCK) {
978 ub_winsock_tcp_wouldblock(
979 (dtio->stop_flush_event?
980 dtio->stop_flush_event:dtio->event),
981 UB_EV_READ);
982 return -1; /* try later */
983 }
984 #endif
985 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
986 verbosity < 4)
987 return 0; /* no log retries on low verbosity */
988 log_err("dnstap io: output closed, recv %s: %s", to,
989 strerror(errno));
990 /* and close below */
991 return 0;
992 }
993 if(r == 0) {
994 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
995 verbosity < 4)
996 return 0; /* no log retries on low verbosity */
997 verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
998 /* and close below */
999 return 0;
1000 }
1001 /* something was received */
1002 return r;
1003 }
1004
#ifdef HAVE_SSL
/** Receive bytes over TLS with the ssl object of the dtio, store them in
 * the buffer. Returns 0: closed (or a read error), -1: continue,
 * >0: number of bytes read into buffer */
static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
{
	int got;
	int want;
	ERR_clear_error();
	got = SSL_read(dtio->ssl, buf, len);
	if(got > 0)
		return got;

	want = SSL_get_error(dtio->ssl, got);
	switch(want) {
	case SSL_ERROR_ZERO_RETURN:
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4)
			return 0; /* no log retries on low verbosity */
		verbose(VERB_DETAIL, "dnstap io: output closed by the "
			"other side");
		return 0;
	case SSL_ERROR_WANT_READ:
		/* continue later */
		return -1;
	case SSL_ERROR_WANT_WRITE:
		/* a brief write event is wanted, then continue */
		(void)dtio_enable_brief_write(dtio);
		return -1;
	case SSL_ERROR_SYSCALL:
#ifdef ECONNRESET
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			errno == ECONNRESET && verbosity < 4)
			return 0; /* silence reset by peer */
#endif
		if(errno != 0)
			log_err("SSL_read syscall: %s",
				strerror(errno));
		verbose(VERB_DETAIL, "dnstap io: output closed by the "
			"other side");
		return 0;
	default:
		break;
	}
	/* any other error is logged and treated as a close */
	log_crypto_err_io("could not SSL_read", want);
	verbose(VERB_DETAIL, "dnstap io: output closed by the "
		"other side");
	return 0;
}
#endif /* HAVE_SSL */
1049
1050 /** check if the output fd has been closed,
1051 * it returns false if the stream is closed. */
dtio_check_close(struct dt_io_thread * dtio)1052 static int dtio_check_close(struct dt_io_thread* dtio)
1053 {
1054 /* we don't want to read any packets, but if there are we can
1055 * discard the input (ignore it). Ignore of unknown (control)
1056 * packets is okay for the framestream protocol. And also, the
1057 * read call can return that the stream has been closed by the
1058 * other side. */
1059 uint8_t buf[1024];
1060 int r = -1;
1061
1062
1063 if(dtio->fd == -1) return 0;
1064
1065 while(r != 0) {
1066 /* not interested in buffer content, overwrite */
1067 r = receive_bytes(dtio, (void*)buf, sizeof(buf));
1068 if(r == -1)
1069 return 1;
1070 }
1071 /* the other end has been closed */
1072 /* close the channel */
1073 dtio_del_output_event(dtio);
1074 dtio_close_output(dtio);
1075 return 0;
1076 }
1077
1078 /** Read accept frame. Returns -1: continue reading, 0: closed,
1079 * 1: valid accept received. */
dtio_read_accept_frame(struct dt_io_thread * dtio)1080 static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1081 {
1082 int r;
1083 size_t read_frame_done;
1084 while(dtio->read_frame.frame_len_done < 4) {
1085 #ifdef HAVE_SSL
1086 if(dtio->ssl) {
1087 r = ssl_read_bytes(dtio,
1088 (uint8_t*)&dtio->read_frame.frame_len+
1089 dtio->read_frame.frame_len_done,
1090 4-dtio->read_frame.frame_len_done);
1091 } else {
1092 #endif
1093 r = receive_bytes(dtio,
1094 (uint8_t*)&dtio->read_frame.frame_len+
1095 dtio->read_frame.frame_len_done,
1096 4-dtio->read_frame.frame_len_done);
1097 #ifdef HAVE_SSL
1098 }
1099 #endif
1100 if(r == -1)
1101 return -1; /* continue reading */
1102 if(r == 0) {
1103 /* connection closed */
1104 goto close_connection;
1105 }
1106 dtio->read_frame.frame_len_done += r;
1107 if(dtio->read_frame.frame_len_done < 4)
1108 return -1; /* continue reading */
1109
1110 if(dtio->read_frame.frame_len == 0) {
1111 dtio->read_frame.frame_len_done = 0;
1112 dtio->read_frame.control_frame = 1;
1113 continue;
1114 }
1115 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1116 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1117 verbose(VERB_OPS, "dnstap: received frame exceeds max "
1118 "length of %d bytes, closing connection",
1119 DTIO_RECV_FRAME_MAX_LEN);
1120 goto close_connection;
1121 }
1122 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1123 dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1124 if(!dtio->read_frame.buf) {
1125 log_err("dnstap io: out of memory (creating read "
1126 "buffer)");
1127 goto close_connection;
1128 }
1129 }
1130 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1131 #ifdef HAVE_SSL
1132 if(dtio->ssl) {
1133 r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1134 dtio->read_frame.buf_count,
1135 dtio->read_frame.buf_cap-
1136 dtio->read_frame.buf_count);
1137 } else {
1138 #endif
1139 r = receive_bytes(dtio, dtio->read_frame.buf+
1140 dtio->read_frame.buf_count,
1141 dtio->read_frame.buf_cap-
1142 dtio->read_frame.buf_count);
1143 #ifdef HAVE_SSL
1144 }
1145 #endif
1146 if(r == -1)
1147 return -1; /* continue reading */
1148 if(r == 0) {
1149 /* connection closed */
1150 goto close_connection;
1151 }
1152 dtio->read_frame.buf_count += r;
1153 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1154 return -1; /* continue reading */
1155 }
1156
1157 /* Complete frame received, check if this is a valid ACCEPT control
1158 * frame. */
1159 if(dtio->read_frame.frame_len < 4) {
1160 verbose(VERB_OPS, "dnstap: invalid data received");
1161 goto close_connection;
1162 }
1163 if(sldns_read_uint32(dtio->read_frame.buf) !=
1164 FSTRM_CONTROL_FRAME_ACCEPT) {
1165 verbose(VERB_ALGO, "dnstap: invalid control type received, "
1166 "ignored");
1167 dtio->ready_frame_sent = 0;
1168 dtio->accept_frame_received = 0;
1169 dtio_read_frame_free(&dtio->read_frame);
1170 return -1;
1171 }
1172 read_frame_done = 4; /* control frame type */
1173
1174 /* Iterate over control fields, ignore unknown types.
1175 * Need to be able to read at least 8 bytes (control field type +
1176 * length). */
1177 while(read_frame_done+8 < dtio->read_frame.frame_len) {
1178 uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1179 read_frame_done);
1180 uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1181 read_frame_done + 4);
1182 if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1183 if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1184 read_frame_done+8+len <=
1185 dtio->read_frame.frame_len &&
1186 memcmp(dtio->read_frame.buf + read_frame_done +
1187 + 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1188 if(!dtio_control_start_send(dtio)) {
1189 verbose(VERB_OPS, "dnstap io: out of "
1190 "memory while sending START frame");
1191 goto close_connection;
1192 }
1193 dtio->accept_frame_received = 1;
1194 if(!dtio_add_output_event_write(dtio))
1195 goto close_connection;
1196 return 1;
1197 } else {
1198 /* unknown content type */
1199 verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1200 "contains unknown content type, "
1201 "closing connection");
1202 goto close_connection;
1203 }
1204 }
1205 /* unknown option, try next */
1206 read_frame_done += 8+len;
1207 }
1208
1209
1210 close_connection:
1211 dtio_del_output_event(dtio);
1212 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1213 dtio_close_output(dtio);
1214 return 0;
1215 }
1216
1217 /** add the output file descriptor event for listening, read only */
dtio_add_output_event_read(struct dt_io_thread * dtio)1218 static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1219 {
1220 if(!dtio->event)
1221 return 0;
1222 if(dtio->event_added && !dtio->event_added_is_write)
1223 return 1;
1224 /* we have to (re-)register the event */
1225 if(dtio->event_added)
1226 ub_event_del(dtio->event);
1227 ub_event_del_bits(dtio->event, UB_EV_WRITE);
1228 if(ub_event_add(dtio->event, NULL) != 0) {
1229 log_err("dnstap io: out of memory (adding event)");
1230 dtio->event_added = 0;
1231 dtio->event_added_is_write = 0;
1232 /* close output and start reattempts to open it */
1233 dtio_close_output(dtio);
1234 return 0;
1235 }
1236 dtio->event_added = 1;
1237 dtio->event_added_is_write = 0;
1238 return 1;
1239 }
1240
1241 /** add the output file descriptor event for listening, read and write */
dtio_add_output_event_write(struct dt_io_thread * dtio)1242 static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1243 {
1244 if(!dtio->event)
1245 return 0;
1246 if(dtio->event_added && dtio->event_added_is_write)
1247 return 1;
1248 /* we have to (re-)register the event */
1249 if(dtio->event_added)
1250 ub_event_del(dtio->event);
1251 ub_event_add_bits(dtio->event, UB_EV_WRITE);
1252 if(ub_event_add(dtio->event, NULL) != 0) {
1253 log_err("dnstap io: out of memory (adding event)");
1254 dtio->event_added = 0;
1255 dtio->event_added_is_write = 0;
1256 /* close output and start reattempts to open it */
1257 dtio_close_output(dtio);
1258 return 0;
1259 }
1260 dtio->event_added = 1;
1261 dtio->event_added_is_write = 1;
1262 return 1;
1263 }
1264
1265 /** put the dtio thread to sleep */
dtio_sleep(struct dt_io_thread * dtio)1266 static void dtio_sleep(struct dt_io_thread* dtio)
1267 {
1268 /* unregister the event polling for write, because there is
1269 * nothing to be written */
1270 (void)dtio_add_output_event_read(dtio);
1271
1272 /* Set wakeuptimer enabled off; so that the next worker thread that
1273 * wants to log starts a timer if needed, since the writer thread
1274 * has gone to sleep. */
1275 lock_basic_lock(&dtio->wakeup_timer_lock);
1276 dtio->wakeup_timer_enabled = 0;
1277 lock_basic_unlock(&dtio->wakeup_timer_lock);
1278 }
1279
#ifdef HAVE_SSL
/**
 * Enable the brief read condition: set ssl_brief_read and take the write
 * bit off the active event. Returns 0 on failure to register the event.
 */
static int dtio_enable_brief_read(struct dt_io_thread* dtio)
{
	dtio->ssl_brief_read = 1;
	if(dtio->stop_flush_event == NULL)
		return dtio_add_output_event_read(dtio);

	/* during stop flush, the stop flush event is the one to change */
	ub_event_del(dtio->stop_flush_event);
	ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
	if(ub_event_add(dtio->stop_flush_event, NULL) == 0)
		return 1;
	log_err("dnstap io, stop flush, could not ub_event_add");
	return 0;
}
#endif /* HAVE_SSL */
1297
#ifdef HAVE_SSL
/**
 * Disable the brief read condition: clear ssl_brief_read and put the
 * write bit back on the active event. Returns 0 on failure to register
 * the event.
 */
static int dtio_disable_brief_read(struct dt_io_thread* dtio)
{
	dtio->ssl_brief_read = 0;
	if(dtio->stop_flush_event == NULL)
		return dtio_add_output_event_write(dtio);

	/* during stop flush, the stop flush event is the one to change */
	ub_event_del(dtio->stop_flush_event);
	ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
	if(ub_event_add(dtio->stop_flush_event, NULL) == 0)
		return 1;
	log_err("dnstap io, stop flush, could not ub_event_add");
	return 0;
}
#endif /* HAVE_SSL */
1315
#ifdef HAVE_SSL
/**
 * Enable the brief write condition: set ssl_brief_write and register the
 * output event with the write bit. Returns the result of that
 * registration.
 */
static int dtio_enable_brief_write(struct dt_io_thread* dtio)
{
	int registered;
	dtio->ssl_brief_write = 1;
	registered = dtio_add_output_event_write(dtio);
	return registered;
}
#endif /* HAVE_SSL */
1324
#ifdef HAVE_SSL
/**
 * Disable the brief write condition: clear ssl_brief_write and register
 * the output event without the write bit. Returns the result of that
 * registration.
 */
static int dtio_disable_brief_write(struct dt_io_thread* dtio)
{
	int registered;
	dtio->ssl_brief_write = 0;
	registered = dtio_add_output_event_read(dtio);
	return registered;
}
#endif /* HAVE_SSL */
1333
#ifdef HAVE_SSL
/**
 * Check peer verification after the ssl handshake.
 * Returns false (0) if the peer must be rejected: verification was asked
 * for and failed, or no certificate was presented. Returns true (1)
 * otherwise. This routine only logs, the caller closes the connection.
 */
static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
{
	X509* cert;
	int verified;

	if(!(SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
		/* unauthenticated, the verify peer flag was not set
		 * in ssl when the ssl object was created from ssl_ctx */
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
			dtio->ip_str);
		return 1;
	}

	/* verification */
	verified = (SSL_get_verify_result(dtio->ssl) == X509_V_OK);
#ifdef HAVE_SSL_GET1_PEER_CERTIFICATE
	cert = SSL_get1_peer_certificate(dtio->ssl);
#else
	cert = SSL_get_peer_certificate(dtio->ssl);
#endif

	if(!verified) {
		/* log the offered certificate, if any, then reject */
		if(cert) {
			log_cert(VERB_ALGO, "dnstap io, peer certificate",
				cert);
			X509_free(cert);
		}
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
			"failed: failed to authenticate", dtio->ip_str);
		return 0;
	}
	if(!cert) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
			"failed no certificate", dtio->ip_str);
		return 0;
	}

	log_cert(VERB_ALGO, "dnstap io, peer certificate", cert);
#ifdef HAVE_SSL_GET0_PEERNAME
	if(SSL_get0_peername(dtio->ssl)) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
			"to %s authenticated", dtio->ip_str,
			SSL_get0_peername(dtio->ssl));
	} else
#endif
	{
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
			"authenticated", dtio->ip_str);
	}
	X509_free(cert);
	return 1;
}
#endif /* HAVE_SSL */
1394
#ifdef HAVE_SSL
/**
 * Perform the ssl handshake.
 * @param dtio: the dnstap io thread.
 * @param info: stop flush information, NULL when not in stop flush. If
 *	not NULL, the stop flush is exited when the connection fails.
 * @return 1 if okay (handshake done and the peer is accepted), 0 to
 *	stop for now: the handshake needs more io, or the connection has
 *	been closed.
 */
static int dtio_ssl_handshake(struct dt_io_thread* dtio,
	struct stop_flush_info* info)
{
	int rc;
	int sslerr;

	if(dtio->ssl_brief_read) {
		/* assume the brief read condition is satisfied,
		 * if we need more or again, we can set it again */
		if(!dtio_disable_brief_read(dtio)) {
			if(info) dtio_stop_flush_exit(info);
			return 0;
		}
	}
	if(dtio->ssl_handshake_done)
		return 1;

	ERR_clear_error();
	rc = SSL_do_handshake(dtio->ssl);
	if(rc == 1) {
		dtio->ssl_handshake_done = 1;
		/* check peer verification */
		if(!dtio_ssl_check_peer(dtio))
			goto closed;
		return 1;
	}

	sslerr = SSL_get_error(dtio->ssl, rc);
	if(sslerr == SSL_ERROR_WANT_READ) {
		/* we want to read on the connection */
		if(!dtio_enable_brief_read(dtio)) {
			if(info) dtio_stop_flush_exit(info);
		}
		return 0;
	}
	if(sslerr == SSL_ERROR_WANT_WRITE) {
		/* we want to write on the connection */
		return 0;
	}
	if(rc == 0) {
		/* closed */
		goto closed;
	}
	if(sslerr == SSL_ERROR_SYSCALL) {
		/* SYSCALL and errno==0 means closed uncleanly */
		int silent = (errno == 0);
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			silent = 1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			silent = 1; /* silence reset by peer */
#endif
		if(!silent)
			log_err("dnstap io, SSL_handshake syscall: %s",
				strerror(errno));
	} else {
		unsigned long err = ERR_get_error();
		if(!squelch_err_ssl_handshake(err)) {
			log_crypto_err_io_code("dnstap io, ssl handshake failed",
				sslerr, err);
			verbose(VERB_OPS, "dnstap io, ssl handshake failed "
				"from %s", dtio->ip_str);
		}
	}
	/* the failure has been logged, fall through to close */

closed:
	/* leave the stop flush if active, drop the output and set up
	 * the slow reconnect */
	if(info) dtio_stop_flush_exit(info);
	dtio_del_output_event(dtio);
	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
	dtio_close_output(dtio);
	return 0;
}
#endif /* HAVE_SSL */
1486
1487 /** callback for the dnstap events, to write to the output */
dtio_output_cb(int ATTR_UNUSED (fd),short bits,void * arg)1488 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1489 {
1490 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1491 int i;
1492
1493 if(dtio->check_nb_connect) {
1494 int connect_err = dtio_check_nb_connect(dtio);
1495 if(connect_err == -1) {
1496 /* close the channel */
1497 dtio_del_output_event(dtio);
1498 dtio_close_output(dtio);
1499 return;
1500 } else if(connect_err == 0) {
1501 /* try again later */
1502 return;
1503 }
1504 /* nonblocking connect check passed, continue */
1505 }
1506
1507 #ifdef HAVE_SSL
1508 if(dtio->ssl &&
1509 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1510 if(!dtio_ssl_handshake(dtio, NULL))
1511 return;
1512 }
1513 #endif
1514
1515 if((bits&UB_EV_READ) || dtio->ssl_brief_write) {
1516 #ifdef HAVE_SSL
1517 if(dtio->ssl_brief_write)
1518 (void)dtio_disable_brief_write(dtio);
1519 #endif
1520 if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1521 if(dtio_read_accept_frame(dtio) <= 0)
1522 return;
1523 } else if(!dtio_check_close(dtio))
1524 return;
1525 }
1526
1527 /* loop to process a number of messages. This improves throughput,
1528 * because selecting on write-event if not needed for busy messages
1529 * (dnstap log) generation and if they need to all be written back.
1530 * The write event is usually not blocked up. But not forever,
1531 * because the event loop needs to stay responsive for other events.
1532 * If there are no (more) messages, or if the output buffers get
1533 * full, it returns out of the loop. */
1534 for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1535 /* see if there are messages that need writing */
1536 if(!dtio->cur_msg) {
1537 if(!dtio_find_msg(dtio)) {
1538 if(i == 0) {
1539 /* no messages on the first iteration,
1540 * the queues are all empty */
1541 dtio_sleep(dtio);
1542 /* After putting to sleep, see if
1543 * a message is in a message queue,
1544 * if so, resume service. Stops a
1545 * race condition where a thread could
1546 * have one message but the dtio
1547 * also just went to sleep. With the
1548 * message queued between the
1549 * dtio_find_msg and dtio_sleep
1550 * calls. */
1551 if(dtio_find_msg(dtio)) {
1552 if(!dtio_add_output_event_write(dtio))
1553 return;
1554 }
1555 }
1556 if(!dtio->cur_msg)
1557 return; /* nothing to do */
1558 }
1559 }
1560
1561 /* write it */
1562 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1563 if(!dtio_write_more(dtio))
1564 return;
1565 }
1566
1567 /* done with the current message */
1568 dtio_cur_msg_free(dtio);
1569
1570 /* If this is a bidirectional stream the first message will be
1571 * the READY control frame. We can only continue writing after
1572 * receiving an ACCEPT control frame. */
1573 if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1574 dtio->ready_frame_sent = 1;
1575 (void)dtio_add_output_event_read(dtio);
1576 break;
1577 }
1578 }
1579 }
1580
1581 /** callback for the dnstap commandpipe, to stop the dnstap IO */
dtio_cmd_cb(int fd,short ATTR_UNUSED (bits),void * arg)1582 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1583 {
1584 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1585 uint8_t cmd;
1586 ssize_t r;
1587 if(dtio->want_to_exit)
1588 return;
1589 r = read(fd, &cmd, sizeof(cmd));
1590 if(r == -1) {
1591 #ifndef USE_WINSOCK
1592 if(errno == EINTR || errno == EAGAIN)
1593 return; /* ignore this */
1594 #else
1595 if(WSAGetLastError() == WSAEINPROGRESS)
1596 return;
1597 if(WSAGetLastError() == WSAEWOULDBLOCK)
1598 return;
1599 #endif
1600 log_err("dnstap io: failed to read: %s", sock_strerror(errno));
1601 /* and then fall through to quit the thread */
1602 } else if(r == 0) {
1603 verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1604 } else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1605 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1606 } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1607 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1608
1609 if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1610 verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1611 "waiting for ACCEPT control frame");
1612 return;
1613 }
1614
1615 /* reregister event */
1616 if(!dtio_add_output_event_write(dtio))
1617 return;
1618 return;
1619 } else if(r == 1) {
1620 verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1621 }
1622 dtio->want_to_exit = 1;
1623 if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1624 != 0) {
1625 log_err("dnstap io: could not loopexit");
1626 }
1627 }
1628
1629 #ifndef THREADS_DISABLED
1630 /** setup the event base for the dnstap io thread */
dtio_setup_base(struct dt_io_thread * dtio,time_t * secs,struct timeval * now)1631 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1632 struct timeval* now)
1633 {
1634 memset(now, 0, sizeof(*now));
1635 dtio->event_base = ub_default_event_base(0, secs, now);
1636 if(!dtio->event_base) {
1637 fatal_exit("dnstap io: could not create event_base");
1638 }
1639 }
1640 #endif /* THREADS_DISABLED */
1641
1642 /** setup the cmd event for dnstap io */
dtio_setup_cmd(struct dt_io_thread * dtio)1643 static void dtio_setup_cmd(struct dt_io_thread* dtio)
1644 {
1645 struct ub_event* cmdev;
1646 fd_set_nonblock(dtio->commandpipe[0]);
1647 cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1648 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1649 if(!cmdev) {
1650 fatal_exit("dnstap io: out of memory");
1651 }
1652 dtio->command_event = cmdev;
1653 if(ub_event_add(cmdev, NULL) != 0) {
1654 fatal_exit("dnstap io: out of memory (adding event)");
1655 }
1656 }
1657
1658 /** setup the reconnect event for dnstap io */
dtio_setup_reconnect(struct dt_io_thread * dtio)1659 static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1660 {
1661 dtio_reconnect_clear(dtio);
1662 dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1663 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1664 if(!dtio->reconnect_timer) {
1665 fatal_exit("dnstap io: out of memory");
1666 }
1667 }
1668
/**
 * Structure to keep track of information during stop flush. The stop
 * flush runs on its own event base and briefly attempts to write the
 * last frame and the stop control frame before the io thread exits.
 */
struct stop_flush_info {
	/** the event base during stop flush */
	struct ub_event_base* base;
	/** did we already want to exit this stop-flush event base,
	 * it makes the loopexit request happen only once */
	int want_to_exit_flush;
	/** has the timer fired, set by the stop timer callback */
	int timer_done;
	/** the dtio */
	struct dt_io_thread* dtio;
	/** the stop control frame, allocated, freed after the flush */
	void* stop_frame;
	/** length of the stop frame, in bytes */
	size_t stop_frame_len;
	/** how much we have done of the stop frame, in bytes written */
	size_t stop_frame_done;
};
1688
1689 /** exit the stop flush base */
dtio_stop_flush_exit(struct stop_flush_info * info)1690 static void dtio_stop_flush_exit(struct stop_flush_info* info)
1691 {
1692 if(info->want_to_exit_flush)
1693 return;
1694 info->want_to_exit_flush = 1;
1695 if(ub_event_base_loopexit(info->base) != 0) {
1696 log_err("dnstap io: could not loopexit");
1697 }
1698 }
1699
1700 /** send the stop control,
1701 * return true if completed the frame. */
dtio_control_stop_send(struct stop_flush_info * info)1702 static int dtio_control_stop_send(struct stop_flush_info* info)
1703 {
1704 struct dt_io_thread* dtio = info->dtio;
1705 int r;
1706 if(info->stop_frame_done >= info->stop_frame_len)
1707 return 1;
1708 r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1709 info->stop_frame_done, info->stop_frame_len -
1710 info->stop_frame_done);
1711 if(r == -1) {
1712 verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1713 dtio_stop_flush_exit(info);
1714 return 0;
1715 }
1716 if(r == 0) {
1717 /* try again later, or timeout */
1718 return 0;
1719 }
1720 info->stop_frame_done += r;
1721 if(info->stop_frame_done < info->stop_frame_len)
1722 return 0; /* not done yet */
1723 return 1;
1724 }
1725
dtio_stop_timer_cb(int ATTR_UNUSED (fd),short ATTR_UNUSED (bits),void * arg)1726 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1727 void* arg)
1728 {
1729 struct stop_flush_info* info = (struct stop_flush_info*)arg;
1730 if(info->want_to_exit_flush)
1731 return;
1732 verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1733 info->timer_done = 1;
1734 dtio_stop_flush_exit(info);
1735 }
1736
dtio_stop_ev_cb(int ATTR_UNUSED (fd),short bits,void * arg)1737 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1738 {
1739 struct stop_flush_info* info = (struct stop_flush_info*)arg;
1740 struct dt_io_thread* dtio = info->dtio;
1741 if(info->want_to_exit_flush)
1742 return;
1743 if(dtio->check_nb_connect) {
1744 /* we don't start the stop_flush if connect still
1745 * in progress, but the check code is here, just in case */
1746 int connect_err = dtio_check_nb_connect(dtio);
1747 if(connect_err == -1) {
1748 /* close the channel, exit the stop flush */
1749 dtio_stop_flush_exit(info);
1750 dtio_del_output_event(dtio);
1751 dtio_close_output(dtio);
1752 return;
1753 } else if(connect_err == 0) {
1754 /* try again later */
1755 return;
1756 }
1757 /* nonblocking connect check passed, continue */
1758 }
1759 #ifdef HAVE_SSL
1760 if(dtio->ssl &&
1761 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1762 if(!dtio_ssl_handshake(dtio, info))
1763 return;
1764 }
1765 #endif
1766
1767 if((bits&UB_EV_READ)) {
1768 if(!dtio_check_close(dtio)) {
1769 if(dtio->fd == -1) {
1770 verbose(VERB_ALGO, "dnstap io: "
1771 "stop flush: output closed");
1772 dtio_stop_flush_exit(info);
1773 }
1774 return;
1775 }
1776 }
1777 /* write remainder of last frame */
1778 if(dtio->cur_msg) {
1779 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1780 if(!dtio_write_more(dtio)) {
1781 if(dtio->fd == -1) {
1782 verbose(VERB_ALGO, "dnstap io: "
1783 "stop flush: output closed");
1784 dtio_stop_flush_exit(info);
1785 }
1786 return;
1787 }
1788 }
1789 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1790 "last frame");
1791 dtio_cur_msg_free(dtio);
1792 }
1793 /* write stop frame */
1794 if(info->stop_frame_done < info->stop_frame_len) {
1795 if(!dtio_control_stop_send(info))
1796 return;
1797 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1798 "stop control frame");
1799 }
1800 /* when last frame and stop frame are sent, exit */
1801 dtio_stop_flush_exit(info);
1802 }
1803
1804 /** flush at end, last packet and stop control */
dtio_control_stop_flush(struct dt_io_thread * dtio)1805 static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1806 {
1807 /* briefly attempt to flush the previous packet to the output,
1808 * this could be a partial packet, or even the start control frame */
1809 time_t secs = 0;
1810 struct timeval now;
1811 struct stop_flush_info info;
1812 struct timeval tv;
1813 struct ub_event* timer, *stopev;
1814
1815 if(dtio->fd == -1 || dtio->check_nb_connect) {
1816 /* no connection or we have just connected, so nothing is
1817 * sent yet, so nothing to stop or flush */
1818 return;
1819 }
1820 if(dtio->ssl && !dtio->ssl_handshake_done) {
1821 /* no SSL connection has been established yet */
1822 return;
1823 }
1824
1825 memset(&info, 0, sizeof(info));
1826 memset(&now, 0, sizeof(now));
1827 info.dtio = dtio;
1828 info.base = ub_default_event_base(0, &secs, &now);
1829 if(!info.base) {
1830 log_err("dnstap io: malloc failure");
1831 return;
1832 }
1833 timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1834 &dtio_stop_timer_cb, &info);
1835 if(!timer) {
1836 log_err("dnstap io: malloc failure");
1837 ub_event_base_free(info.base);
1838 return;
1839 }
1840 memset(&tv, 0, sizeof(tv));
1841 tv.tv_sec = 2;
1842 if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1843 &tv) != 0) {
1844 log_err("dnstap io: cannot event_timer_add");
1845 ub_event_free(timer);
1846 ub_event_base_free(info.base);
1847 return;
1848 }
1849 stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1850 UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1851 if(!stopev) {
1852 log_err("dnstap io: malloc failure");
1853 ub_timer_del(timer);
1854 ub_event_free(timer);
1855 ub_event_base_free(info.base);
1856 return;
1857 }
1858 if(ub_event_add(stopev, NULL) != 0) {
1859 log_err("dnstap io: cannot event_add");
1860 ub_event_free(stopev);
1861 ub_timer_del(timer);
1862 ub_event_free(timer);
1863 ub_event_base_free(info.base);
1864 return;
1865 }
1866 info.stop_frame = fstrm_create_control_frame_stop(
1867 &info.stop_frame_len);
1868 if(!info.stop_frame) {
1869 log_err("dnstap io: malloc failure");
1870 ub_event_del(stopev);
1871 ub_event_free(stopev);
1872 ub_timer_del(timer);
1873 ub_event_free(timer);
1874 ub_event_base_free(info.base);
1875 return;
1876 }
1877 dtio->stop_flush_event = stopev;
1878
1879 /* wait briefly, or until finished */
1880 verbose(VERB_ALGO, "dnstap io: stop flush started");
1881 if(ub_event_base_dispatch(info.base) < 0) {
1882 log_err("dnstap io: dispatch flush failed, errno is %s",
1883 strerror(errno));
1884 }
1885 verbose(VERB_ALGO, "dnstap io: stop flush ended");
1886 free(info.stop_frame);
1887 dtio->stop_flush_event = NULL;
1888 ub_event_del(stopev);
1889 ub_event_free(stopev);
1890 ub_timer_del(timer);
1891 ub_event_free(timer);
1892 ub_event_base_free(info.base);
1893 }
1894
1895 /** perform desetup and free stuff when the dnstap io thread exits */
dtio_desetup(struct dt_io_thread * dtio)1896 static void dtio_desetup(struct dt_io_thread* dtio)
1897 {
1898 dtio_control_stop_flush(dtio);
1899 dtio_del_output_event(dtio);
1900 dtio_close_output(dtio);
1901 ub_event_del(dtio->command_event);
1902 ub_event_free(dtio->command_event);
1903 #ifndef USE_WINSOCK
1904 close(dtio->commandpipe[0]);
1905 #else
1906 _close(dtio->commandpipe[0]);
1907 #endif
1908 dtio->commandpipe[0] = -1;
1909 dtio_reconnect_del(dtio);
1910 ub_event_free(dtio->reconnect_timer);
1911 dtio_cur_msg_free(dtio);
1912 #ifndef THREADS_DISABLED
1913 ub_event_base_free(dtio->event_base);
1914 #endif
1915 }
1916
1917 /** setup a start control message */
dtio_control_start_send(struct dt_io_thread * dtio)1918 static int dtio_control_start_send(struct dt_io_thread* dtio)
1919 {
1920 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1921 dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1922 &dtio->cur_msg_len);
1923 if(!dtio->cur_msg) {
1924 return 0;
1925 }
1926 /* setup to send the control message */
1927 /* set that the buffer needs to be sent, but the length
1928 * of that buffer is already written, that way the buffer can
1929 * start with 0 length and then the length of the control frame
1930 * in it */
1931 dtio->cur_msg_done = 0;
1932 dtio->cur_msg_len_done = 4;
1933 return 1;
1934 }
1935
1936 /** setup a ready control message */
dtio_control_ready_send(struct dt_io_thread * dtio)1937 static int dtio_control_ready_send(struct dt_io_thread* dtio)
1938 {
1939 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1940 dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1941 &dtio->cur_msg_len);
1942 if(!dtio->cur_msg) {
1943 return 0;
1944 }
1945 /* setup to send the control message */
1946 /* set that the buffer needs to be sent, but the length
1947 * of that buffer is already written, that way the buffer can
1948 * start with 0 length and then the length of the control frame
1949 * in it */
1950 dtio->cur_msg_done = 0;
1951 dtio->cur_msg_len_done = 4;
1952 return 1;
1953 }
1954
/**
 * Open the output file descriptor for af_local: create a nonblocking
 * stream socket in dtio->fd and connect it to dtio->socket_path.
 * Returns 1 on success, 0 on failure (the fd is closed again).
 */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifndef HAVE_SYS_UN_H
	log_err("cannot create af_local socket");
	return 0;
#else
	struct sockaddr_un dest;

	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("dnstap io: failed to create socket: %s",
			sock_strerror(errno));
		return 0;
	}

	/* fill in the destination address */
	memset(&dest, 0, sizeof(dest));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	/* this member exists on BSDs, not Linux */
	dest.sun_len = (unsigned)sizeof(dest);
#endif
	dest.sun_family = AF_LOCAL;
	/* length is 92-108, 104 on FreeBSD */
	(void)strlcpy(dest.sun_path, dtio->socket_path,
		sizeof(dest.sun_path));

	fd_set_nonblock(dtio->fd);
	if(connect(dtio->fd, (struct sockaddr*)&dest,
		(socklen_t)sizeof(dest)) != -1)
		return 1;

	/* failed; no log for retries on low verbosity */
	if(!(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
		verbosity < 4)) {
		log_err("dnstap io: failed to connect to \"%s\": %s",
			dtio->socket_path, sock_strerror(errno));
	}
	dtio_close_fd(dtio);
	return 0;
#endif /* HAVE_SYS_UN_H */
}
1994
1995 /** open the output file descriptor for af_inet and af_inet6 */
dtio_open_output_tcp(struct dt_io_thread * dtio)1996 static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1997 {
1998 struct sockaddr_storage addr;
1999 socklen_t addrlen;
2000 memset(&addr, 0, sizeof(addr));
2001 addrlen = (socklen_t)sizeof(addr);
2002
2003 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen, UNBOUND_DNS_PORT)) {
2004 log_err("could not parse IP '%s'", dtio->ip_str);
2005 return 0;
2006 }
2007 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
2008 if(dtio->fd == -1) {
2009 log_err("can't create socket: %s", sock_strerror(errno));
2010 return 0;
2011 }
2012 fd_set_nonblock(dtio->fd);
2013 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
2014 if(errno == EINPROGRESS)
2015 return 1; /* wait until connect done*/
2016 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
2017 verbosity < 4) {
2018 dtio_close_fd(dtio);
2019 return 0; /* no log retries on low verbosity */
2020 }
2021 #ifndef USE_WINSOCK
2022 if(tcp_connect_errno_needs_log(
2023 (struct sockaddr *)&addr, addrlen)) {
2024 log_err("dnstap io: failed to connect to %s: %s",
2025 dtio->ip_str, strerror(errno));
2026 }
2027 #else
2028 if(WSAGetLastError() == WSAEINPROGRESS ||
2029 WSAGetLastError() == WSAEWOULDBLOCK)
2030 return 1; /* wait until connect done*/
2031 if(tcp_connect_errno_needs_log(
2032 (struct sockaddr *)&addr, addrlen)) {
2033 log_err("dnstap io: failed to connect to %s: %s",
2034 dtio->ip_str, wsa_strerror(WSAGetLastError()));
2035 }
2036 #endif
2037 dtio_close_fd(dtio);
2038 return 0;
2039 }
2040 return 1;
2041 }
2042
2043 /** setup the SSL structure for new connection */
dtio_setup_ssl(struct dt_io_thread * dtio)2044 static int dtio_setup_ssl(struct dt_io_thread* dtio)
2045 {
2046 dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
2047 if(!dtio->ssl) return 0;
2048 dtio->ssl_handshake_done = 0;
2049 dtio->ssl_brief_read = 0;
2050
2051 if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
2052 dtio->tls_use_sni)) {
2053 return 0;
2054 }
2055 return 1;
2056 }
2057
2058 /** open the output file descriptor */
dtio_open_output(struct dt_io_thread * dtio)2059 static void dtio_open_output(struct dt_io_thread* dtio)
2060 {
2061 struct ub_event* ev;
2062 if(dtio->upstream_is_unix) {
2063 if(!dtio_open_output_local(dtio)) {
2064 dtio_reconnect_enable(dtio);
2065 return;
2066 }
2067 } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
2068 if(!dtio_open_output_tcp(dtio)) {
2069 dtio_reconnect_enable(dtio);
2070 return;
2071 }
2072 if(dtio->upstream_is_tls) {
2073 if(!dtio_setup_ssl(dtio)) {
2074 dtio_close_fd(dtio);
2075 dtio_reconnect_enable(dtio);
2076 return;
2077 }
2078 }
2079 }
2080 dtio->check_nb_connect = 1;
2081
2082 /* the EV_READ is to read ACCEPT control messages, and catch channel
2083 * close. EV_WRITE is to write packets */
2084 ev = ub_event_new(dtio->event_base, dtio->fd,
2085 UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
2086 dtio);
2087 if(!ev) {
2088 log_err("dnstap io: out of memory");
2089 if(dtio->ssl) {
2090 #ifdef HAVE_SSL
2091 SSL_free(dtio->ssl);
2092 dtio->ssl = NULL;
2093 #endif
2094 }
2095 dtio_close_fd(dtio);
2096 dtio_reconnect_enable(dtio);
2097 return;
2098 }
2099 dtio->event = ev;
2100
2101 /* setup protocol control message to start */
2102 if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2103 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2104 log_err("dnstap io: out of memory");
2105 ub_event_free(dtio->event);
2106 dtio->event = NULL;
2107 if(dtio->ssl) {
2108 #ifdef HAVE_SSL
2109 SSL_free(dtio->ssl);
2110 dtio->ssl = NULL;
2111 #endif
2112 }
2113 dtio_close_fd(dtio);
2114 dtio_reconnect_enable(dtio);
2115 return;
2116 }
2117 }
2118
/**
 * Perform the setup of the writer thread on the established event_base.
 * Sets up the command and reconnect handling, opens the output and
 * then requests the write event for the output.
 * @param dtio: the IO thread structure, with event_base set.
 */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
	dtio_setup_cmd(dtio);
	dtio_setup_reconnect(dtio);
	dtio_open_output(dtio);
	/* nothing further is done here whether or not this succeeds */
	(void)dtio_add_output_event_write(dtio);
}
2128
2129 #ifndef THREADS_DISABLED
2130 /** the IO thread function for the DNSTAP IO */
dnstap_io(void * arg)2131 static void* dnstap_io(void* arg)
2132 {
2133 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2134 time_t secs = 0;
2135 struct timeval now;
2136 const char name[16] = "unbound/dnstap"; /* seems to be the safest size
2137 between different OSes */
2138
2139 #if defined(HAVE_GETTID) && !defined(THREADS_DISABLED)
2140 dtio->thread_tid = gettid();
2141 if(dtio->thread_tid_log)
2142 log_thread_set(&dtio->thread_tid);
2143 else
2144 #endif
2145 log_thread_set(&dtio->threadnum);
2146
2147 ub_thread_setname(dtio->tid, name);
2148
2149 /* setup */
2150 verbose(VERB_ALGO, "start dnstap io thread");
2151 dtio_setup_base(dtio, &secs, &now);
2152 dtio_setup_on_base(dtio);
2153
2154 /* run */
2155 if(ub_event_base_dispatch(dtio->event_base) < 0) {
2156 log_err("dnstap io: dispatch failed, errno is %s",
2157 strerror(errno));
2158 }
2159
2160 /* cleanup */
2161 verbose(VERB_ALGO, "stop dnstap io thread");
2162 dtio_desetup(dtio);
2163 return NULL;
2164 }
2165 #endif /* THREADS_DISABLED */
2166
dt_io_thread_start(struct dt_io_thread * dtio,void * event_base_nothr,int numworkers)2167 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2168 int numworkers)
2169 {
2170 /* set up the thread, can fail */
2171 #ifndef USE_WINSOCK
2172 if(pipe(dtio->commandpipe) == -1) {
2173 log_err("failed to create pipe: %s", strerror(errno));
2174 return 0;
2175 }
2176 #else
2177 if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2178 log_err("failed to create _pipe: %s",
2179 wsa_strerror(WSAGetLastError()));
2180 return 0;
2181 }
2182 #endif
2183
2184 /* start the thread */
2185 dtio->threadnum = numworkers+1;
2186 dtio->started = 1;
2187 #ifndef THREADS_DISABLED
2188 ub_thread_create(&dtio->tid, dnstap_io, dtio);
2189 (void)event_base_nothr;
2190 #else
2191 dtio->event_base = event_base_nothr;
2192 dtio_setup_on_base(dtio);
2193 #endif
2194 return 1;
2195 }
2196
dt_io_thread_stop(struct dt_io_thread * dtio)2197 void dt_io_thread_stop(struct dt_io_thread* dtio)
2198 {
2199 #ifndef THREADS_DISABLED
2200 uint8_t cmd = DTIO_COMMAND_STOP;
2201 #endif
2202 if(!dtio) return;
2203 if(!dtio->started) return;
2204 verbose(VERB_ALGO, "dnstap io: send stop cmd");
2205
2206 #ifndef THREADS_DISABLED
2207 while(1) {
2208 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2209 if(r == -1) {
2210 #ifndef USE_WINSOCK
2211 if(errno == EINTR || errno == EAGAIN)
2212 continue;
2213 #else
2214 if(WSAGetLastError() == WSAEINPROGRESS)
2215 continue;
2216 if(WSAGetLastError() == WSAEWOULDBLOCK)
2217 continue;
2218 #endif
2219 log_err("dnstap io stop: write: %s",
2220 sock_strerror(errno));
2221 break;
2222 }
2223 break;
2224 }
2225 dtio->started = 0;
2226 #endif /* THREADS_DISABLED */
2227
2228 #ifndef USE_WINSOCK
2229 close(dtio->commandpipe[1]);
2230 #else
2231 _close(dtio->commandpipe[1]);
2232 #endif
2233 dtio->commandpipe[1] = -1;
2234 #ifndef THREADS_DISABLED
2235 ub_thread_join(dtio->tid);
2236 #else
2237 dtio->want_to_exit = 1;
2238 dtio_desetup(dtio);
2239 #endif
2240 }
2241