/* * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP * * Copyright (c) 2020, NLnet Labs. All rights reserved. * * This software is open source. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * Neither the name of the NLNET LABS nor the names of its contributors may * be used to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ /** * \file * * An implementation of the Frame Streams data transport protocol for * the Unbound DNSTAP message logging facility. */ #include "config.h" #include "dnstap/dtstream.h" #include "dnstap/dnstap_fstrm.h" #include "util/config_file.h" #include "util/ub_event.h" #include "util/net_help.h" #include "services/outside_network.h" #include "sldns/sbuffer.h" #ifdef HAVE_SYS_UN_H #include #endif #include #ifdef HAVE_OPENSSL_SSL_H #include #endif #ifdef HAVE_OPENSSL_ERR_H #include #endif /** number of messages to process in one output callback */ #define DTIO_MESSAGES_PER_CALLBACK 100 /** the msec to wait for reconnect (if not immediate, the first attempt) */ #define DTIO_RECONNECT_TIMEOUT_MIN 10 /** the msec to wait for reconnect max after backoff */ #define DTIO_RECONNECT_TIMEOUT_MAX 1000 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */ #define DTIO_RECONNECT_TIMEOUT_SLOW 1000 /** number of messages before wakeup of thread */ #define DTIO_MSG_FOR_WAKEUP 32 /** maximum length of received frame */ #define DTIO_RECV_FRAME_MAX_LEN 1000 struct stop_flush_info; /** DTIO command channel commands */ enum { /** DTIO command channel stop */ DTIO_COMMAND_STOP = 0, /** DTIO command channel wakeup */ DTIO_COMMAND_WAKEUP = 1 } dtio_channel_command; /** open the output channel */ static void dtio_open_output(struct dt_io_thread* dtio); /** add output event for read and write */ static int dtio_add_output_event_write(struct dt_io_thread* dtio); /** start reconnection attempts */ static void dtio_reconnect_enable(struct dt_io_thread* dtio); /** stop from stop_flush event loop */ static void dtio_stop_flush_exit(struct stop_flush_info* info); /** setup a start control message */ static int dtio_control_start_send(struct dt_io_thread* dtio); #ifdef HAVE_SSL /** enable briefly waiting for a read event, for SSL negotiation */ static int dtio_enable_brief_read(struct dt_io_thread* dtio); /** enable briefly waiting for a write event, for SSL negotiation */ static int dtio_enable_brief_write(struct dt_io_thread* dtio); #endif struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base) { struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); if(!mq) return NULL; mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, about 1 M should contain 64K messages with some overhead, or a whole bunch smaller ones */ mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq); if(!mq->wakeup_timer) { free(mq); return NULL; } lock_basic_init(&mq->lock); lock_protect(&mq->lock, mq, sizeof(*mq)); return mq; } /** clear the message list, caller must hold the lock */ static void dt_msg_queue_clear(struct dt_msg_queue* mq) { struct dt_msg_entry* e = mq->first, *next=NULL; while(e) { next = e->next; free(e->buf); free(e); e = next; } mq->first = NULL; mq->last = NULL; mq->cursize = 0; mq->msgcount = 0; } void dt_msg_queue_delete(struct dt_msg_queue* mq) { if(!mq) return; lock_basic_destroy(&mq->lock); dt_msg_queue_clear(mq); comm_timer_delete(mq->wakeup_timer); free(mq); } /** make the dtio wake up by sending a wakeup command */ static void dtio_wakeup(struct dt_io_thread* dtio) { uint8_t cmd = DTIO_COMMAND_WAKEUP; if(!dtio) return; if(!dtio->started) return; while(1) { ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); if(r == -1) { #ifndef USE_WINSOCK if(errno == EINTR || errno == EAGAIN) continue; #else if(WSAGetLastError() == WSAEINPROGRESS) continue; if(WSAGetLastError() == WSAEWOULDBLOCK) continue; #endif log_err("dnstap io wakeup: write: %s", sock_strerror(errno)); break; } break; } } void mq_wakeup_cb(void* arg) { struct dt_msg_queue* mq = (struct dt_msg_queue*)arg; /* even if the dtio is already active, because perhaps much * traffic suddenly, we leave the timer running to save on * managing it, the once a second timer is less work then * starting and stopping the timer frequently */ lock_basic_lock(&mq->dtio->wakeup_timer_lock); mq->dtio->wakeup_timer_enabled = 0; lock_basic_unlock(&mq->dtio->wakeup_timer_lock); dtio_wakeup(mq->dtio); } /** start timer to wakeup dtio because there is content in the queue */ static void dt_msg_queue_start_timer(struct dt_msg_queue* mq) { struct timeval tv; /* Start a timer to process messages to be logged. * If we woke up the dtio thread for every message, the wakeup * messages take up too much processing power. If the queue * fills up the wakeup happens immediately. The timer wakes it up * if there are infrequent messages to log. */ /* we cannot start a timer in dtio thread, because it is a different * thread and its event base is in use by the other thread, it would * give race conditions if we tried to modify its event base, * and locks would wait until it woke up, and this is what we do. */ /* do not start the timer if a timer already exists, perhaps * in another worker. So this variable is protected by a lock in * dtio */ lock_basic_lock(&mq->dtio->wakeup_timer_lock); if(mq->dtio->wakeup_timer_enabled) { lock_basic_unlock(&mq->dtio->wakeup_timer_lock); return; } mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */ lock_basic_unlock(&mq->dtio->wakeup_timer_lock); /* start the timer, in mq, in the event base of our worker */ tv.tv_sec = 1; tv.tv_usec = 0; comm_timer_set(mq->wakeup_timer, &tv); } void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) { int wakeupnow = 0, wakeupstarttimer = 0; struct dt_msg_entry* entry; /* check conditions */ if(!buf) return; if(len == 0) { /* it is not possible to log entries with zero length, * because the framestream protocol does not carry it. * However the protobuf serialization does not create zero * length datagrams for dnstap, so this should not happen. */ free(buf); return; } if(!mq) { free(buf); return; } /* allocate memory for queue entry */ entry = malloc(sizeof(*entry)); if(!entry) { log_err("out of memory logging dnstap"); free(buf); return; } entry->next = NULL; entry->buf = buf; entry->len = len; /* aqcuire lock */ lock_basic_lock(&mq->lock); /* if list was empty, start timer for (eventual) wakeup */ if(mq->first == NULL) wakeupstarttimer = 1; /* if list contains more than wakeupnum elements, wakeup now, * or if list is (going to be) almost full */ if(mq->msgcount == DTIO_MSG_FOR_WAKEUP || (mq->cursize < mq->maxsize * 9 / 10 && mq->cursize+len >= mq->maxsize * 9 / 10)) wakeupnow = 1; /* see if it is going to fit */ if(mq->cursize + len > mq->maxsize) { /* buffer full, or congested. */ /* drop */ lock_basic_unlock(&mq->lock); free(buf); free(entry); return; } mq->cursize += len; mq->msgcount ++; /* append to list */ if(mq->last) { mq->last->next = entry; } else { mq->first = entry; } mq->last = entry; /* release lock */ lock_basic_unlock(&mq->lock); if(wakeupnow) { dtio_wakeup(mq->dtio); } else if(wakeupstarttimer) { dt_msg_queue_start_timer(mq); } } struct dt_io_thread* dt_io_thread_create(void) { struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); lock_basic_init(&dtio->wakeup_timer_lock); lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled, sizeof(dtio->wakeup_timer_enabled)); return dtio; } void dt_io_thread_delete(struct dt_io_thread* dtio) { struct dt_io_list_item* item, *nextitem; if(!dtio) return; lock_basic_destroy(&dtio->wakeup_timer_lock); item=dtio->io_list; while(item) { nextitem = item->next; free(item); item = nextitem; } free(dtio->socket_path); free(dtio->ip_str); free(dtio->tls_server_name); free(dtio->client_key_file); free(dtio->client_cert_file); if(dtio->ssl_ctx) { #ifdef HAVE_SSL SSL_CTX_free(dtio->ssl_ctx); #endif } free(dtio); } int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg) { if(!cfg->dnstap) { log_warn("cannot setup dnstap because dnstap-enable is no"); return 0; } /* what type of connectivity do we have */ if(cfg->dnstap_ip && cfg->dnstap_ip[0]) { if(cfg->dnstap_tls) dtio->upstream_is_tls = 1; else dtio->upstream_is_tcp = 1; } else { dtio->upstream_is_unix = 1; } dtio->is_bidirectional = cfg->dnstap_bidirectional; if(dtio->upstream_is_unix) { char* nm; if(!cfg->dnstap_socket_path || cfg->dnstap_socket_path[0]==0) { log_err("dnstap setup: no dnstap-socket-path for " "socket connect"); return 0; } nm = cfg->dnstap_socket_path; if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm, cfg->chrootdir, strlen(cfg->chrootdir)) == 0) nm += strlen(cfg->chrootdir); free(dtio->socket_path); dtio->socket_path = strdup(nm); if(!dtio->socket_path) { log_err("dnstap setup: malloc failure"); return 0; } } if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) { log_err("dnstap setup: no dnstap-ip for TCP connect"); return 0; } free(dtio->ip_str); dtio->ip_str = strdup(cfg->dnstap_ip); if(!dtio->ip_str) { log_err("dnstap setup: malloc failure"); return 0; } } if(dtio->upstream_is_tls) { #ifdef HAVE_SSL if(cfg->dnstap_tls_server_name && cfg->dnstap_tls_server_name[0]) { free(dtio->tls_server_name); dtio->tls_server_name = strdup( cfg->dnstap_tls_server_name); if(!dtio->tls_server_name) { log_err("dnstap setup: malloc failure"); return 0; } if(!check_auth_name_for_ssl(dtio->tls_server_name)) return 0; } if(cfg->dnstap_tls_client_key_file && cfg->dnstap_tls_client_key_file[0]) { dtio->use_client_certs = 1; free(dtio->client_key_file); dtio->client_key_file = strdup( cfg->dnstap_tls_client_key_file); if(!dtio->client_key_file) { log_err("dnstap setup: malloc failure"); return 0; } if(!cfg->dnstap_tls_client_cert_file || cfg->dnstap_tls_client_cert_file[0]==0) { log_err("dnstap setup: client key " "authentication enabled with " "dnstap-tls-client-key-file, but " "no dnstap-tls-client-cert-file " "is given"); return 0; } free(dtio->client_cert_file); dtio->client_cert_file = strdup( cfg->dnstap_tls_client_cert_file); if(!dtio->client_cert_file) { log_err("dnstap setup: malloc failure"); return 0; } } else { dtio->use_client_certs = 0; dtio->client_key_file = NULL; dtio->client_cert_file = NULL; } if(cfg->dnstap_tls_cert_bundle) { dtio->ssl_ctx = connect_sslctx_create( dtio->client_key_file, dtio->client_cert_file, cfg->dnstap_tls_cert_bundle, 0); } else { dtio->ssl_ctx = connect_sslctx_create( dtio->client_key_file, dtio->client_cert_file, cfg->tls_cert_bundle, cfg->tls_win_cert); } if(!dtio->ssl_ctx) { log_err("could not setup SSL CTX"); return 0; } dtio->tls_use_sni = cfg->tls_use_sni; #endif /* HAVE_SSL */ } return 1; } int dt_io_thread_register_queue(struct dt_io_thread* dtio, struct dt_msg_queue* mq) { struct dt_io_list_item* item = malloc(sizeof(*item)); if(!item) return 0; lock_basic_lock(&mq->lock); mq->dtio = dtio; lock_basic_unlock(&mq->lock); item->queue = mq; item->next = dtio->io_list; dtio->io_list = item; dtio->io_list_iter = NULL; return 1; } void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, struct dt_msg_queue* mq) { struct dt_io_list_item* item, *prev=NULL; if(!dtio) return; item = dtio->io_list; while(item) { if(item->queue == mq) { /* found it */ if(prev) prev->next = item->next; else dtio->io_list = item->next; /* the queue itself only registered, not deleted */ lock_basic_lock(&item->queue->lock); item->queue->dtio = NULL; lock_basic_unlock(&item->queue->lock); free(item); dtio->io_list_iter = NULL; return; } prev = item; item = item->next; } } /** pick a message from the queue, the routine locks and unlocks, * returns true if there is a message */ static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf, size_t* len) { lock_basic_lock(&mq->lock); if(mq->first) { struct dt_msg_entry* entry = mq->first; mq->first = entry->next; if(!entry->next) mq->last = NULL; mq->cursize -= entry->len; mq->msgcount --; lock_basic_unlock(&mq->lock); *buf = entry->buf; *len = entry->len; free(entry); return 1; } lock_basic_unlock(&mq->lock); return 0; } /** find message in queue, false if no message, true if message to send */ static int dtio_find_in_queue(struct dt_io_thread* dtio, struct dt_msg_queue* mq) { void* buf=NULL; size_t len=0; if(dt_msg_queue_pop(mq, &buf, &len)) { dtio->cur_msg = buf; dtio->cur_msg_len = len; dtio->cur_msg_done = 0; dtio->cur_msg_len_done = 0; return 1; } return 0; } /** find a new message to write, search message queues, false if none */ static int dtio_find_msg(struct dt_io_thread* dtio) { struct dt_io_list_item *spot, *item; spot = dtio->io_list_iter; /* use the next queue for the next message lookup, * if we hit the end(NULL) the NULL restarts the iter at start. */ if(spot) dtio->io_list_iter = spot->next; else if(dtio->io_list) dtio->io_list_iter = dtio->io_list->next; /* scan from spot to end-of-io_list */ item = spot; while(item) { if(dtio_find_in_queue(dtio, item->queue)) return 1; item = item->next; } /* scan starting at the start-of-list (to wrap around the end) */ item = dtio->io_list; while(item) { if(dtio_find_in_queue(dtio, item->queue)) return 1; item = item->next; } return 0; } /** callback for the dnstap reconnect, to start reconnecting to output */ void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), void* arg) { struct dt_io_thread* dtio = (struct dt_io_thread*)arg; dtio->reconnect_is_added = 0; verbose(VERB_ALGO, "dnstap io: reconnect timer"); dtio_open_output(dtio); if(dtio->event) { if(!dtio_add_output_event_write(dtio)) return; /* nothing wrong so far, wait on the output event */ return; } /* exponential backoff and retry on timer */ dtio_reconnect_enable(dtio); } /** attempt to reconnect to the output, after a timeout */ static void dtio_reconnect_enable(struct dt_io_thread* dtio) { struct timeval tv; int msec; if(dtio->want_to_exit) return; if(dtio->reconnect_is_added) return; /* already done */ /* exponential backoff, store the value for next timeout */ msec = dtio->reconnect_timeout; if(msec == 0) { dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN; } else { dtio->reconnect_timeout = msec*2; if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX) dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX; } verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec", msec); /* setup wait timer */ memset(&tv, 0, sizeof(tv)); tv.tv_sec = msec/1000; tv.tv_usec = (msec%1000)*1000; if(ub_timer_add(dtio->reconnect_timer, dtio->event_base, &dtio_reconnect_timeout_cb, dtio, &tv) != 0) { log_err("dnstap io: could not reconnect ev timer add"); return; } dtio->reconnect_is_added = 1; } /** remove dtio reconnect timer */ static void dtio_reconnect_del(struct dt_io_thread* dtio) { if(!dtio->reconnect_is_added) return; ub_timer_del(dtio->reconnect_timer); dtio->reconnect_is_added = 0; } /** clear the reconnect exponential backoff timer. * We have successfully connected so we can try again with short timeouts. */ static void dtio_reconnect_clear(struct dt_io_thread* dtio) { dtio->reconnect_timeout = 0; dtio_reconnect_del(dtio); } /** reconnect slowly, because we already know we have to wait for a bit */ static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec) { dtio_reconnect_del(dtio); dtio->reconnect_timeout = msec; dtio_reconnect_enable(dtio); } /** delete the current message in the dtio, and reset counters */ static void dtio_cur_msg_free(struct dt_io_thread* dtio) { free(dtio->cur_msg); dtio->cur_msg = NULL; dtio->cur_msg_len = 0; dtio->cur_msg_done = 0; dtio->cur_msg_len_done = 0; } /** delete the buffer and counters used to read frame */ static void dtio_read_frame_free(struct dt_frame_read_buf* rb) { if(rb->buf) { free(rb->buf); rb->buf = NULL; } rb->buf_count = 0; rb->buf_cap = 0; rb->frame_len = 0; rb->frame_len_done = 0; rb->control_frame = 0; } /** del the output file descriptor event for listening */ static void dtio_del_output_event(struct dt_io_thread* dtio) { if(!dtio->event_added) return; ub_event_del(dtio->event); dtio->event_added = 0; dtio->event_added_is_write = 0; } /** close dtio socket and set it to -1 */ static void dtio_close_fd(struct dt_io_thread* dtio) { sock_close(dtio->fd); dtio->fd = -1; } /** close and stop the output file descriptor event */ static void dtio_close_output(struct dt_io_thread* dtio) { if(!dtio->event) return; ub_event_free(dtio->event); dtio->event = NULL; if(dtio->ssl) { #ifdef HAVE_SSL SSL_shutdown(dtio->ssl); SSL_free(dtio->ssl); dtio->ssl = NULL; #endif } dtio_close_fd(dtio); /* if there is a (partial) message, discard it * we cannot send (the remainder of) it, and a new * connection needs to start with a control frame. */ if(dtio->cur_msg) { dtio_cur_msg_free(dtio); } dtio->ready_frame_sent = 0; dtio->accept_frame_received = 0; dtio_read_frame_free(&dtio->read_frame); dtio_reconnect_enable(dtio); } /** check for pending nonblocking connect errors, * returns 1 if it is okay. -1 on error (close it), 0 to try later */ static int dtio_check_nb_connect(struct dt_io_thread* dtio) { int error = 0; socklen_t len = (socklen_t)sizeof(error); if(!dtio->check_nb_connect) return 1; /* everything okay */ if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error, &len) < 0) { #ifndef USE_WINSOCK error = errno; /* on solaris errno is error */ #else error = WSAGetLastError(); #endif } #ifndef USE_WINSOCK #if defined(EINPROGRESS) && defined(EWOULDBLOCK) if(error == EINPROGRESS || error == EWOULDBLOCK) return 0; /* try again later */ #endif #else if(error == WSAEINPROGRESS) { return 0; /* try again later */ } else if(error == WSAEWOULDBLOCK) { ub_winsock_tcp_wouldblock((dtio->stop_flush_event? dtio->stop_flush_event:dtio->event), UB_EV_WRITE); return 0; /* try again later */ } #endif if(error != 0) { char* to = dtio->socket_path; if(!to) to = dtio->ip_str; if(!to) to = ""; log_err("dnstap io: failed to connect to \"%s\": %s", to, sock_strerror(error)); return -1; /* error, close it */ } if(dtio->ip_str) verbose(VERB_DETAIL, "dnstap io: connected to %s", dtio->ip_str); else if(dtio->socket_path) verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"", dtio->socket_path); dtio_reconnect_clear(dtio); dtio->check_nb_connect = 0; return 1; /* everything okay */ } #ifdef HAVE_SSL /** write to ssl output * returns number of bytes written, 0 if nothing happened, * try again later, or -1 if the channel is to be closed. */ static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf, size_t len) { int r; ERR_clear_error(); r = SSL_write(dtio->ssl, buf, len); if(r <= 0) { int want = SSL_get_error(dtio->ssl, r); if(want == SSL_ERROR_ZERO_RETURN) { /* closed */ return -1; } else if(want == SSL_ERROR_WANT_READ) { /* we want a brief read event */ dtio_enable_brief_read(dtio); return 0; } else if(want == SSL_ERROR_WANT_WRITE) { /* write again later */ return 0; } else if(want == SSL_ERROR_SYSCALL) { #ifdef EPIPE if(errno == EPIPE && verbosity < 2) return -1; /* silence 'broken pipe' */ #endif #ifdef ECONNRESET if(errno == ECONNRESET && verbosity < 2) return -1; /* silence reset by peer */ #endif if(errno != 0) { log_err("dnstap io, SSL_write syscall: %s", strerror(errno)); } return -1; } log_crypto_err("dnstap io, could not SSL_write"); return -1; } return r; } #endif /* HAVE_SSL */ /** write buffer to output. * returns number of bytes written, 0 if nothing happened, * try again later, or -1 if the channel is to be closed. */ static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf, size_t len) { ssize_t ret; if(dtio->fd == -1) return -1; #ifdef HAVE_SSL if(dtio->ssl) return dtio_write_ssl(dtio, buf, len); #endif ret = send(dtio->fd, (void*)buf, len, 0); if(ret == -1) { #ifndef USE_WINSOCK if(errno == EINTR || errno == EAGAIN) return 0; #else if(WSAGetLastError() == WSAEINPROGRESS) return 0; if(WSAGetLastError() == WSAEWOULDBLOCK) { ub_winsock_tcp_wouldblock((dtio->stop_flush_event? dtio->stop_flush_event:dtio->event), UB_EV_WRITE); return 0; } #endif log_err("dnstap io: failed send: %s", sock_strerror(errno)); return -1; } return ret; } #ifdef HAVE_WRITEV /** write with writev, len and message, in one write, if possible. * return true if message is done, false if incomplete */ static int dtio_write_with_writev(struct dt_io_thread* dtio) { uint32_t sendlen = htonl(dtio->cur_msg_len); struct iovec iov[2]; ssize_t r; iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done; iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done; iov[1].iov_base = dtio->cur_msg; iov[1].iov_len = dtio->cur_msg_len; log_assert(iov[0].iov_len > 0); r = writev(dtio->fd, iov, 2); if(r == -1) { #ifndef USE_WINSOCK if(errno == EINTR || errno == EAGAIN) return 0; #else if(WSAGetLastError() == WSAEINPROGRESS) return 0; if(WSAGetLastError() == WSAEWOULDBLOCK) { ub_winsock_tcp_wouldblock((dtio->stop_flush_event? dtio->stop_flush_event:dtio->event), UB_EV_WRITE); return 0; } #endif log_err("dnstap io: failed writev: %s", sock_strerror(errno)); /* close the channel */ dtio_del_output_event(dtio); dtio_close_output(dtio); return 0; } /* written r bytes */ dtio->cur_msg_len_done += r; if(dtio->cur_msg_len_done < 4) return 0; if(dtio->cur_msg_len_done > 4) { dtio->cur_msg_done = dtio->cur_msg_len_done-4; dtio->cur_msg_len_done = 4; } if(dtio->cur_msg_done < dtio->cur_msg_len) return 0; return 1; } #endif /* HAVE_WRITEV */ /** write more of the length, preceding the data frame. * return true if message is done, false if incomplete. */ static int dtio_write_more_of_len(struct dt_io_thread* dtio) { uint32_t sendlen; int r; if(dtio->cur_msg_len_done >= 4) return 1; #ifdef HAVE_WRITEV if(!dtio->ssl) { /* we try writev for everything.*/ return dtio_write_with_writev(dtio); } #endif /* HAVE_WRITEV */ sendlen = htonl(dtio->cur_msg_len); r = dtio_write_buf(dtio, ((uint8_t*)&sendlen)+dtio->cur_msg_len_done, sizeof(sendlen)-dtio->cur_msg_len_done); if(r == -1) { /* close the channel */ dtio_del_output_event(dtio); dtio_close_output(dtio); return 0; } else if(r == 0) { /* try again later */ return 0; } dtio->cur_msg_len_done += r; if(dtio->cur_msg_len_done < 4) return 0; return 1; } /** write more of the data frame. * return true if message is done, false if incomplete. */ static int dtio_write_more_of_data(struct dt_io_thread* dtio) { int r; if(dtio->cur_msg_done >= dtio->cur_msg_len) return 1; r = dtio_write_buf(dtio, ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done, dtio->cur_msg_len - dtio->cur_msg_done); if(r == -1) { /* close the channel */ dtio_del_output_event(dtio); dtio_close_output(dtio); return 0; } else if(r == 0) { /* try again later */ return 0; } dtio->cur_msg_done += r; if(dtio->cur_msg_done < dtio->cur_msg_len) return 0; return 1; } /** write more of the current messsage. false if incomplete, true if * the message is done */ static int dtio_write_more(struct dt_io_thread* dtio) { if(dtio->cur_msg_len_done < 4) { if(!dtio_write_more_of_len(dtio)) return 0; } if(dtio->cur_msg_done < dtio->cur_msg_len) { if(!dtio_write_more_of_data(dtio)) return 0; } return 1; } /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed, * -1: continue, >0: number of bytes read into buffer */ static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) { ssize_t r; r = recv(dtio->fd, (void*)buf, len, 0); if(r == -1) { char* to = dtio->socket_path; if(!to) to = dtio->ip_str; if(!to) to = ""; #ifndef USE_WINSOCK if(errno == EINTR || errno == EAGAIN) return -1; /* try later */ #else if(WSAGetLastError() == WSAEINPROGRESS) { return -1; /* try later */ } else if(WSAGetLastError() == WSAEWOULDBLOCK) { ub_winsock_tcp_wouldblock( (dtio->stop_flush_event? dtio->stop_flush_event:dtio->event), UB_EV_READ); return -1; /* try later */ } #endif if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && verbosity < 4) return 0; /* no log retries on low verbosity */ log_err("dnstap io: output closed, recv %s: %s", to, strerror(errno)); /* and close below */ return 0; } if(r == 0) { if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && verbosity < 4) return 0; /* no log retries on low verbosity */ verbose(VERB_DETAIL, "dnstap io: output closed by the other side"); /* and close below */ return 0; } /* something was received */ return r; } #ifdef HAVE_SSL /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed, * -1: continue, >0: number of bytes read into buffer */ static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len) { int r; ERR_clear_error(); r = SSL_read(dtio->ssl, buf, len); if(r <= 0) { int want = SSL_get_error(dtio->ssl, r); if(want == SSL_ERROR_ZERO_RETURN) { if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && verbosity < 4) return 0; /* no log retries on low verbosity */ verbose(VERB_DETAIL, "dnstap io: output closed by the " "other side"); return 0; } else if(want == SSL_ERROR_WANT_READ) { /* continue later */ return -1; } else if(want == SSL_ERROR_WANT_WRITE) { (void)dtio_enable_brief_write(dtio); return -1; } else if(want == SSL_ERROR_SYSCALL) { #ifdef ECONNRESET if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && errno == ECONNRESET && verbosity < 4) return 0; /* silence reset by peer */ #endif if(errno != 0) log_err("SSL_read syscall: %s", strerror(errno)); verbose(VERB_DETAIL, "dnstap io: output closed by the " "other side"); return 0; } log_crypto_err("could not SSL_read"); verbose(VERB_DETAIL, "dnstap io: output closed by the " "other side"); return 0; } return r; } #endif /* HAVE_SSL */ /** check if the output fd has been closed, * it returns false if the stream is closed. */ static int dtio_check_close(struct dt_io_thread* dtio) { /* we don't want to read any packets, but if there are we can * discard the input (ignore it). Ignore of unknown (control) * packets is okay for the framestream protocol. And also, the * read call can return that the stream has been closed by the * other side. */ uint8_t buf[1024]; int r = -1; if(dtio->fd == -1) return 0; while(r != 0) { /* not interested in buffer content, overwrite */ r = receive_bytes(dtio, (void*)buf, sizeof(buf)); if(r == -1) return 1; } /* the other end has been closed */ /* close the channel */ dtio_del_output_event(dtio); dtio_close_output(dtio); return 0; } /** Read accept frame. Returns -1: continue reading, 0: closed, * 1: valid accept received. */ static int dtio_read_accept_frame(struct dt_io_thread* dtio) { int r; size_t read_frame_done; while(dtio->read_frame.frame_len_done < 4) { #ifdef HAVE_SSL if(dtio->ssl) { r = ssl_read_bytes(dtio, (uint8_t*)&dtio->read_frame.frame_len+ dtio->read_frame.frame_len_done, 4-dtio->read_frame.frame_len_done); } else { #endif r = receive_bytes(dtio, (uint8_t*)&dtio->read_frame.frame_len+ dtio->read_frame.frame_len_done, 4-dtio->read_frame.frame_len_done); #ifdef HAVE_SSL } #endif if(r == -1) return -1; /* continue reading */ if(r == 0) { /* connection closed */ goto close_connection; } dtio->read_frame.frame_len_done += r; if(dtio->read_frame.frame_len_done < 4) return -1; /* continue reading */ if(dtio->read_frame.frame_len == 0) { dtio->read_frame.frame_len_done = 0; dtio->read_frame.control_frame = 1; continue; } dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len); if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) { verbose(VERB_OPS, "dnstap: received frame exceeds max " "length of %d bytes, closing connection", DTIO_RECV_FRAME_MAX_LEN); goto close_connection; } dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len); dtio->read_frame.buf_cap = dtio->read_frame.frame_len; if(!dtio->read_frame.buf) { log_err("dnstap io: out of memory (creating read " "buffer)"); goto close_connection; } } if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) { #ifdef HAVE_SSL if(dtio->ssl) { r = ssl_read_bytes(dtio, dtio->read_frame.buf+ dtio->read_frame.buf_count, dtio->read_frame.buf_cap- dtio->read_frame.buf_count); } else { #endif r = receive_bytes(dtio, dtio->read_frame.buf+ dtio->read_frame.buf_count, dtio->read_frame.buf_cap- dtio->read_frame.buf_count); #ifdef HAVE_SSL } #endif if(r == -1) return -1; /* continue reading */ if(r == 0) { /* connection closed */ goto close_connection; } dtio->read_frame.buf_count += r; if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) return -1; /* continue reading */ } /* Complete frame received, check if this is a valid ACCEPT control * frame. */ if(dtio->read_frame.frame_len < 4) { verbose(VERB_OPS, "dnstap: invalid data received"); goto close_connection; } if(sldns_read_uint32(dtio->read_frame.buf) != FSTRM_CONTROL_FRAME_ACCEPT) { verbose(VERB_ALGO, "dnstap: invalid control type received, " "ignored"); dtio->ready_frame_sent = 0; dtio->accept_frame_received = 0; dtio_read_frame_free(&dtio->read_frame); return -1; } read_frame_done = 4; /* control frame type */ /* Iterate over control fields, ignore unknown types. * Need to be able to read at least 8 bytes (control field type + * length). */ while(read_frame_done+8 < dtio->read_frame.frame_len) { uint32_t type = sldns_read_uint32(dtio->read_frame.buf + read_frame_done); uint32_t len = sldns_read_uint32(dtio->read_frame.buf + read_frame_done + 4); if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) { if(len == strlen(DNSTAP_CONTENT_TYPE) && read_frame_done+8+len <= dtio->read_frame.frame_len && memcmp(dtio->read_frame.buf + read_frame_done + + 8, DNSTAP_CONTENT_TYPE, len) == 0) { if(!dtio_control_start_send(dtio)) { verbose(VERB_OPS, "dnstap io: out of " "memory while sending START frame"); goto close_connection; } dtio->accept_frame_received = 1; if(!dtio_add_output_event_write(dtio)) goto close_connection; return 1; } else { /* unknow content type */ verbose(VERB_ALGO, "dnstap: ACCEPT frame " "contains unknown content type, " "closing connection"); goto close_connection; } } /* unknown option, try next */ read_frame_done += 8+len; } close_connection: dtio_del_output_event(dtio); dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); dtio_close_output(dtio); return 0; } /** add the output file descriptor event for listening, read only */ static int dtio_add_output_event_read(struct dt_io_thread* dtio) { if(!dtio->event) return 0; if(dtio->event_added && !dtio->event_added_is_write) return 1; /* we have to (re-)register the event */ if(dtio->event_added) ub_event_del(dtio->event); ub_event_del_bits(dtio->event, UB_EV_WRITE); if(ub_event_add(dtio->event, NULL) != 0) { log_err("dnstap io: out of memory (adding event)"); dtio->event_added = 0; dtio->event_added_is_write = 0; /* close output and start reattempts to open it */ dtio_close_output(dtio); return 0; } dtio->event_added = 1; dtio->event_added_is_write = 0; return 1; } /** add the output file descriptor event for listening, read and write */ static int dtio_add_output_event_write(struct dt_io_thread* dtio) { if(!dtio->event) return 0; if(dtio->event_added && dtio->event_added_is_write) return 1; /* we have to (re-)register the event */ if(dtio->event_added) ub_event_del(dtio->event); ub_event_add_bits(dtio->event, UB_EV_WRITE); if(ub_event_add(dtio->event, NULL) != 0) { log_err("dnstap io: out of memory (adding event)"); dtio->event_added = 0; dtio->event_added_is_write = 0; /* close output and start reattempts to open it */ dtio_close_output(dtio); return 0; } dtio->event_added = 1; dtio->event_added_is_write = 1; return 1; } /** put the dtio thread to sleep */ static void dtio_sleep(struct dt_io_thread* dtio) { /* unregister the event polling for write, because there is * nothing to be written */ (void)dtio_add_output_event_read(dtio); } #ifdef HAVE_SSL /** enable the brief read condition */ static int dtio_enable_brief_read(struct dt_io_thread* dtio) { dtio->ssl_brief_read = 1; if(dtio->stop_flush_event) { ub_event_del(dtio->stop_flush_event); ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE); if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { log_err("dnstap io, stop flush, could not ub_event_add"); return 0; } return 1; } return dtio_add_output_event_read(dtio); } #endif /* HAVE_SSL */ #ifdef HAVE_SSL /** disable the brief read condition */ static int dtio_disable_brief_read(struct dt_io_thread* dtio) { dtio->ssl_brief_read = 0; if(dtio->stop_flush_event) { ub_event_del(dtio->stop_flush_event); ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE); if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { log_err("dnstap io, stop flush, could not ub_event_add"); return 0; } return 1; } return dtio_add_output_event_write(dtio); } #endif /* HAVE_SSL */ #ifdef HAVE_SSL /** enable the brief write condition */ static int dtio_enable_brief_write(struct dt_io_thread* dtio) { dtio->ssl_brief_write = 1; return dtio_add_output_event_write(dtio); } #endif /* HAVE_SSL */ #ifdef HAVE_SSL /** disable the brief write condition */ static int dtio_disable_brief_write(struct dt_io_thread* dtio) { dtio->ssl_brief_write = 0; return dtio_add_output_event_read(dtio); } #endif /* HAVE_SSL */ #ifdef HAVE_SSL /** check peer verification after ssl handshake connection, false if closed*/ static int dtio_ssl_check_peer(struct dt_io_thread* dtio) { if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) { /* verification */ if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) { X509* x = SSL_get_peer_certificate(dtio->ssl); if(!x) { verbose(VERB_ALGO, "dnstap io, %s, SSL " "connection failed no certificate", dtio->ip_str); return 0; } log_cert(VERB_ALGO, "dnstap io, peer certificate", x); #ifdef HAVE_SSL_GET0_PEERNAME if(SSL_get0_peername(dtio->ssl)) { verbose(VERB_ALGO, "dnstap io, %s, SSL " "connection to %s authenticated", dtio->ip_str, SSL_get0_peername(dtio->ssl)); } else { #endif verbose(VERB_ALGO, "dnstap io, %s, SSL " "connection authenticated", dtio->ip_str); #ifdef HAVE_SSL_GET0_PEERNAME } #endif X509_free(x); } else { X509* x = SSL_get_peer_certificate(dtio->ssl); if(x) { log_cert(VERB_ALGO, "dnstap io, peer " "certificate", x); X509_free(x); } verbose(VERB_ALGO, "dnstap io, %s, SSL connection " "failed: failed to authenticate", dtio->ip_str); return 0; } } else { /* unauthenticated, the verify peer flag was not set * in ssl when the ssl object was created from ssl_ctx */ verbose(VERB_ALGO, "dnstap io, %s, SSL connection", dtio->ip_str); } return 1; } #endif /* HAVE_SSL */ #ifdef HAVE_SSL /** perform ssl handshake, returns 1 if okay, 0 to stop */ static int dtio_ssl_handshake(struct dt_io_thread* dtio, struct stop_flush_info* info) { int r; if(dtio->ssl_brief_read) { /* assume the brief read condition is satisfied, * if we need more or again, we can set it again */ if(!dtio_disable_brief_read(dtio)) { if(info) dtio_stop_flush_exit(info); return 0; } } if(dtio->ssl_handshake_done) return 1; ERR_clear_error(); r = SSL_do_handshake(dtio->ssl); if(r != 1) { int want = SSL_get_error(dtio->ssl, r); if(want == SSL_ERROR_WANT_READ) { /* we want to read on the connection */ if(!dtio_enable_brief_read(dtio)) { if(info) dtio_stop_flush_exit(info); return 0; } return 0; } else if(want == SSL_ERROR_WANT_WRITE) { /* we want to write on the connection */ return 0; } else if(r == 0) { /* closed */ if(info) dtio_stop_flush_exit(info); dtio_del_output_event(dtio); dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); dtio_close_output(dtio); return 0; } else if(want == SSL_ERROR_SYSCALL) { /* SYSCALL and errno==0 means closed uncleanly */ int silent = 0; #ifdef EPIPE if(errno == EPIPE && verbosity < 2) silent = 1; /* silence 'broken pipe' */ #endif #ifdef ECONNRESET if(errno == ECONNRESET && verbosity < 2) silent = 1; /* silence reset by peer */ #endif if(errno == 0) silent = 1; if(!silent) log_err("dnstap io, SSL_handshake syscall: %s", strerror(errno)); /* closed */ if(info) dtio_stop_flush_exit(info); dtio_del_output_event(dtio); dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); dtio_close_output(dtio); return 0; } else { unsigned long err = ERR_get_error(); if(!squelch_err_ssl_handshake(err)) { log_crypto_err_code("dnstap io, ssl handshake failed", err); verbose(VERB_OPS, "dnstap io, ssl handshake failed " "from %s", dtio->ip_str); } /* closed */ if(info) dtio_stop_flush_exit(info); dtio_del_output_event(dtio); dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); dtio_close_output(dtio); return 0; } } /* check peer verification */ dtio->ssl_handshake_done = 1; if(!dtio_ssl_check_peer(dtio)) { /* closed */ if(info) dtio_stop_flush_exit(info); dtio_del_output_event(dtio); dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); dtio_close_output(dtio); return 0; } return 1; } #endif /* HAVE_SSL */ /** callback for the dnstap events, to write to the output */ void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg) { struct dt_io_thread* dtio = (struct dt_io_thread*)arg; int i; if(dtio->check_nb_connect) { int connect_err = dtio_check_nb_connect(dtio); if(connect_err == -1) { /* close the channel */ dtio_del_output_event(dtio); dtio_close_output(dtio); return; } else if(connect_err == 0) { /* try again later */ return; } /* nonblocking connect check passed, continue */ } #ifdef HAVE_SSL if(dtio->ssl && (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { if(!dtio_ssl_handshake(dtio, NULL)) return; } #endif if((bits&UB_EV_READ || dtio->ssl_brief_write)) { if(dtio->ssl_brief_write) (void)dtio_disable_brief_write(dtio); if(dtio->ready_frame_sent && !dtio->accept_frame_received) { if(dtio_read_accept_frame(dtio) <= 0) return; } else if(!dtio_check_close(dtio)) return; } /* loop to process a number of messages. This improves throughput, * because selecting on write-event if not needed for busy messages * (dnstap log) generation and if they need to all be written back. * The write event is usually not blocked up. But not forever, * because the event loop needs to stay responsive for other events. * If there are no (more) messages, or if the output buffers get * full, it returns out of the loop. */ for(i=0; icur_msg) { if(!dtio_find_msg(dtio)) { if(i == 0) { /* no messages on the first iteration, * the queues are all empty */ dtio_sleep(dtio); } return; /* nothing to do */ } } /* write it */ if(dtio->cur_msg_done < dtio->cur_msg_len) { if(!dtio_write_more(dtio)) return; } /* done with the current message */ dtio_cur_msg_free(dtio); /* If this is a bidirectional stream the first message will be * the READY control frame. We can only continue writing after * receiving an ACCEPT control frame. */ if(dtio->is_bidirectional && !dtio->ready_frame_sent) { dtio->ready_frame_sent = 1; (void)dtio_add_output_event_read(dtio); break; } } } /** callback for the dnstap commandpipe, to stop the dnstap IO */ void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) { struct dt_io_thread* dtio = (struct dt_io_thread*)arg; uint8_t cmd; ssize_t r; if(dtio->want_to_exit) return; r = read(fd, &cmd, sizeof(cmd)); if(r == -1) { #ifndef USE_WINSOCK if(errno == EINTR || errno == EAGAIN) return; /* ignore this */ #else if(WSAGetLastError() == WSAEINPROGRESS) return; if(WSAGetLastError() == WSAEWOULDBLOCK) return; #endif log_err("dnstap io: failed to read: %s", sock_strerror(errno)); /* and then fall through to quit the thread */ } else if(r == 0) { verbose(VERB_ALGO, "dnstap io: cmd channel closed"); } else if(r == 1 && cmd == DTIO_COMMAND_STOP) { verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) { verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup"); if(dtio->is_bidirectional && !dtio->accept_frame_received) { verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, " "waiting for ACCEPT control frame"); return; } /* reregister event */ if(!dtio_add_output_event_write(dtio)) return; return; } else if(r == 1) { verbose(VERB_ALGO, "dnstap io: cmd channel unknown command"); } dtio->want_to_exit = 1; if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base) != 0) { log_err("dnstap io: could not loopexit"); } } #ifndef THREADS_DISABLED /** setup the event base for the dnstap io thread */ static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs, struct timeval* now) { memset(now, 0, sizeof(*now)); dtio->event_base = ub_default_event_base(0, secs, now); if(!dtio->event_base) { fatal_exit("dnstap io: could not create event_base"); } } #endif /* THREADS_DISABLED */ /** setup the cmd event for dnstap io */ static void dtio_setup_cmd(struct dt_io_thread* dtio) { struct ub_event* cmdev; fd_set_nonblock(dtio->commandpipe[0]); cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0], UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio); if(!cmdev) { fatal_exit("dnstap io: out of memory"); } dtio->command_event = cmdev; if(ub_event_add(cmdev, NULL) != 0) { fatal_exit("dnstap io: out of memory (adding event)"); } } /** setup the reconnect event for dnstap io */ static void dtio_setup_reconnect(struct dt_io_thread* dtio) { dtio_reconnect_clear(dtio); dtio->reconnect_timer = ub_event_new(dtio->event_base, -1, UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio); if(!dtio->reconnect_timer) { fatal_exit("dnstap io: out of memory"); } } /** * structure to keep track of information during stop flush */ struct stop_flush_info { /** the event base during stop flush */ struct ub_event_base* base; /** did we already want to exit this stop-flush event base */ int want_to_exit_flush; /** has the timer fired */ int timer_done; /** the dtio */ struct dt_io_thread* dtio; /** the stop control frame */ void* stop_frame; /** length of the stop frame */ size_t stop_frame_len; /** how much we have done of the stop frame */ size_t stop_frame_done; }; /** exit the stop flush base */ static void dtio_stop_flush_exit(struct stop_flush_info* info) { if(info->want_to_exit_flush) return; info->want_to_exit_flush = 1; if(ub_event_base_loopexit(info->base) != 0) { log_err("dnstap io: could not loopexit"); } } /** send the stop control, * return true if completed the frame. */ static int dtio_control_stop_send(struct stop_flush_info* info) { struct dt_io_thread* dtio = info->dtio; int r; if(info->stop_frame_done >= info->stop_frame_len) return 1; r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) + info->stop_frame_done, info->stop_frame_len - info->stop_frame_done); if(r == -1) { verbose(VERB_ALGO, "dnstap io: stop flush: output closed"); dtio_stop_flush_exit(info); return 0; } if(r == 0) { /* try again later, or timeout */ return 0; } info->stop_frame_done += r; if(info->stop_frame_done < info->stop_frame_len) return 0; /* not done yet */ return 1; } void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), void* arg) { struct stop_flush_info* info = (struct stop_flush_info*)arg; if(info->want_to_exit_flush) return; verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush"); info->timer_done = 1; dtio_stop_flush_exit(info); } void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg) { struct stop_flush_info* info = (struct stop_flush_info*)arg; struct dt_io_thread* dtio = info->dtio; if(info->want_to_exit_flush) return; if(dtio->check_nb_connect) { /* we don't start the stop_flush if connect still * in progress, but the check code is here, just in case */ int connect_err = dtio_check_nb_connect(dtio); if(connect_err == -1) { /* close the channel, exit the stop flush */ dtio_stop_flush_exit(info); dtio_del_output_event(dtio); dtio_close_output(dtio); return; } else if(connect_err == 0) { /* try again later */ return; } /* nonblocking connect check passed, continue */ } #ifdef HAVE_SSL if(dtio->ssl && (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { if(!dtio_ssl_handshake(dtio, info)) return; } #endif if((bits&UB_EV_READ)) { if(!dtio_check_close(dtio)) { if(dtio->fd == -1) { verbose(VERB_ALGO, "dnstap io: " "stop flush: output closed"); dtio_stop_flush_exit(info); } return; } } /* write remainder of last frame */ if(dtio->cur_msg) { if(dtio->cur_msg_done < dtio->cur_msg_len) { if(!dtio_write_more(dtio)) { if(dtio->fd == -1) { verbose(VERB_ALGO, "dnstap io: " "stop flush: output closed"); dtio_stop_flush_exit(info); } return; } } verbose(VERB_ALGO, "dnstap io: stop flush completed " "last frame"); dtio_cur_msg_free(dtio); } /* write stop frame */ if(info->stop_frame_done < info->stop_frame_len) { if(!dtio_control_stop_send(info)) return; verbose(VERB_ALGO, "dnstap io: stop flush completed " "stop control frame"); } /* when last frame and stop frame are sent, exit */ dtio_stop_flush_exit(info); } /** flush at end, last packet and stop control */ static void dtio_control_stop_flush(struct dt_io_thread* dtio) { /* briefly attempt to flush the previous packet to the output, * this could be a partial packet, or even the start control frame */ time_t secs = 0; struct timeval now; struct stop_flush_info info; struct timeval tv; struct ub_event* timer, *stopev; if(dtio->fd == -1 || dtio->check_nb_connect) { /* no connection or we have just connected, so nothing is * sent yet, so nothing to stop or flush */ return; } if(dtio->ssl && !dtio->ssl_handshake_done) { /* no SSL connection has been established yet */ return; } memset(&info, 0, sizeof(info)); memset(&now, 0, sizeof(now)); info.dtio = dtio; info.base = ub_default_event_base(0, &secs, &now); if(!info.base) { log_err("dnstap io: malloc failure"); return; } timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT, &dtio_stop_timer_cb, &info); if(!timer) { log_err("dnstap io: malloc failure"); ub_event_base_free(info.base); return; } memset(&tv, 0, sizeof(tv)); tv.tv_sec = 2; if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info, &tv) != 0) { log_err("dnstap io: cannot event_timer_add"); ub_event_free(timer); ub_event_base_free(info.base); return; } stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info); if(!stopev) { log_err("dnstap io: malloc failure"); ub_timer_del(timer); ub_event_free(timer); ub_event_base_free(info.base); return; } if(ub_event_add(stopev, NULL) != 0) { log_err("dnstap io: cannot event_add"); ub_event_free(stopev); ub_timer_del(timer); ub_event_free(timer); ub_event_base_free(info.base); return; } info.stop_frame = fstrm_create_control_frame_stop( &info.stop_frame_len); if(!info.stop_frame) { log_err("dnstap io: malloc failure"); ub_event_del(stopev); ub_event_free(stopev); ub_timer_del(timer); ub_event_free(timer); ub_event_base_free(info.base); return; } dtio->stop_flush_event = stopev; /* wait briefly, or until finished */ verbose(VERB_ALGO, "dnstap io: stop flush started"); if(ub_event_base_dispatch(info.base) < 0) { log_err("dnstap io: dispatch flush failed, errno is %s", strerror(errno)); } verbose(VERB_ALGO, "dnstap io: stop flush ended"); free(info.stop_frame); dtio->stop_flush_event = NULL; ub_event_del(stopev); ub_event_free(stopev); ub_timer_del(timer); ub_event_free(timer); ub_event_base_free(info.base); } /** perform desetup and free stuff when the dnstap io thread exits */ static void dtio_desetup(struct dt_io_thread* dtio) { dtio_control_stop_flush(dtio); dtio_del_output_event(dtio); dtio_close_output(dtio); ub_event_del(dtio->command_event); ub_event_free(dtio->command_event); #ifndef USE_WINSOCK close(dtio->commandpipe[0]); #else _close(dtio->commandpipe[0]); #endif dtio->commandpipe[0] = -1; dtio_reconnect_del(dtio); ub_event_free(dtio->reconnect_timer); dtio_cur_msg_free(dtio); #ifndef THREADS_DISABLED ub_event_base_free(dtio->event_base); #endif } /** setup a start control message */ static int dtio_control_start_send(struct dt_io_thread* dtio) { log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE, &dtio->cur_msg_len); if(!dtio->cur_msg) { return 0; } /* setup to send the control message */ /* set that the buffer needs to be sent, but the length * of that buffer is already written, that way the buffer can * start with 0 length and then the length of the control frame * in it */ dtio->cur_msg_done = 0; dtio->cur_msg_len_done = 4; return 1; } /** setup a ready control message */ static int dtio_control_ready_send(struct dt_io_thread* dtio) { log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE, &dtio->cur_msg_len); if(!dtio->cur_msg) { return 0; } /* setup to send the control message */ /* set that the buffer needs to be sent, but the length * of that buffer is already written, that way the buffer can * start with 0 length and then the length of the control frame * in it */ dtio->cur_msg_done = 0; dtio->cur_msg_len_done = 4; return 1; } /** open the output file descriptor for af_local */ static int dtio_open_output_local(struct dt_io_thread* dtio) { #ifdef HAVE_SYS_UN_H struct sockaddr_un s; dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0); if(dtio->fd == -1) { log_err("dnstap io: failed to create socket: %s", sock_strerror(errno)); return 0; } memset(&s, 0, sizeof(s)); #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN /* this member exists on BSDs, not Linux */ s.sun_len = (unsigned)sizeof(s); #endif s.sun_family = AF_LOCAL; /* length is 92-108, 104 on FreeBSD */ (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); fd_set_nonblock(dtio->fd); if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) == -1) { char* to = dtio->socket_path; if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && verbosity < 4) { dtio_close_fd(dtio); return 0; /* no log retries on low verbosity */ } log_err("dnstap io: failed to connect to \"%s\": %s", to, sock_strerror(errno)); dtio_close_fd(dtio); return 0; } return 1; #else log_err("cannot create af_local socket"); return 0; #endif /* HAVE_SYS_UN_H */ } /** open the output file descriptor for af_inet and af_inet6 */ static int dtio_open_output_tcp(struct dt_io_thread* dtio) { struct sockaddr_storage addr; socklen_t addrlen; memset(&addr, 0, sizeof(addr)); addrlen = (socklen_t)sizeof(addr); if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) { log_err("could not parse IP '%s'", dtio->ip_str); return 0; } dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0); if(dtio->fd == -1) { log_err("can't create socket: %s", sock_strerror(errno)); return 0; } fd_set_nonblock(dtio->fd); if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) { if(errno == EINPROGRESS) return 1; /* wait until connect done*/ if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && verbosity < 4) { dtio_close_fd(dtio); return 0; /* no log retries on low verbosity */ } #ifndef USE_WINSOCK if(tcp_connect_errno_needs_log( (struct sockaddr *)&addr, addrlen)) { log_err("dnstap io: failed to connect to %s: %s", dtio->ip_str, strerror(errno)); } #else if(WSAGetLastError() == WSAEINPROGRESS || WSAGetLastError() == WSAEWOULDBLOCK) return 1; /* wait until connect done*/ if(tcp_connect_errno_needs_log( (struct sockaddr *)&addr, addrlen)) { log_err("dnstap io: failed to connect to %s: %s", dtio->ip_str, wsa_strerror(WSAGetLastError())); } #endif dtio_close_fd(dtio); return 0; } return 1; } /** setup the SSL structure for new connection */ static int dtio_setup_ssl(struct dt_io_thread* dtio) { dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd); if(!dtio->ssl) return 0; dtio->ssl_handshake_done = 0; dtio->ssl_brief_read = 0; if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name, dtio->tls_use_sni)) { return 0; } return 1; } /** open the output file descriptor */ static void dtio_open_output(struct dt_io_thread* dtio) { struct ub_event* ev; if(dtio->upstream_is_unix) { if(!dtio_open_output_local(dtio)) { dtio_reconnect_enable(dtio); return; } } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { if(!dtio_open_output_tcp(dtio)) { dtio_reconnect_enable(dtio); return; } if(dtio->upstream_is_tls) { if(!dtio_setup_ssl(dtio)) { dtio_close_fd(dtio); dtio_reconnect_enable(dtio); return; } } } dtio->check_nb_connect = 1; /* the EV_READ is to read ACCEPT control messages, and catch channel * close. EV_WRITE is to write packets */ ev = ub_event_new(dtio->event_base, dtio->fd, UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb, dtio); if(!ev) { log_err("dnstap io: out of memory"); if(dtio->ssl) { #ifdef HAVE_SSL SSL_free(dtio->ssl); dtio->ssl = NULL; #endif } dtio_close_fd(dtio); dtio_reconnect_enable(dtio); return; } dtio->event = ev; /* setup protocol control message to start */ if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) || (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) { log_err("dnstap io: out of memory"); ub_event_free(dtio->event); dtio->event = NULL; if(dtio->ssl) { #ifdef HAVE_SSL SSL_free(dtio->ssl); dtio->ssl = NULL; #endif } dtio_close_fd(dtio); dtio_reconnect_enable(dtio); return; } } /** perform the setup of the writer thread on the established event_base */ static void dtio_setup_on_base(struct dt_io_thread* dtio) { dtio_setup_cmd(dtio); dtio_setup_reconnect(dtio); dtio_open_output(dtio); if(!dtio_add_output_event_write(dtio)) return; } #ifndef THREADS_DISABLED /** the IO thread function for the DNSTAP IO */ static void* dnstap_io(void* arg) { struct dt_io_thread* dtio = (struct dt_io_thread*)arg; time_t secs = 0; struct timeval now; log_thread_set(&dtio->threadnum); /* setup */ verbose(VERB_ALGO, "start dnstap io thread"); dtio_setup_base(dtio, &secs, &now); dtio_setup_on_base(dtio); /* run */ if(ub_event_base_dispatch(dtio->event_base) < 0) { log_err("dnstap io: dispatch failed, errno is %s", strerror(errno)); } /* cleanup */ verbose(VERB_ALGO, "stop dnstap io thread"); dtio_desetup(dtio); return NULL; } #endif /* THREADS_DISABLED */ int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, int numworkers) { /* set up the thread, can fail */ #ifndef USE_WINSOCK if(pipe(dtio->commandpipe) == -1) { log_err("failed to create pipe: %s", strerror(errno)); return 0; } #else if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) { log_err("failed to create _pipe: %s", wsa_strerror(WSAGetLastError())); return 0; } #endif /* start the thread */ dtio->threadnum = numworkers+1; dtio->started = 1; #ifndef THREADS_DISABLED ub_thread_create(&dtio->tid, dnstap_io, dtio); (void)event_base_nothr; #else dtio->event_base = event_base_nothr; dtio_setup_on_base(dtio); #endif return 1; } void dt_io_thread_stop(struct dt_io_thread* dtio) { #ifndef THREADS_DISABLED uint8_t cmd = DTIO_COMMAND_STOP; #endif if(!dtio) return; if(!dtio->started) return; verbose(VERB_ALGO, "dnstap io: send stop cmd"); #ifndef THREADS_DISABLED while(1) { ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); if(r == -1) { #ifndef USE_WINSOCK if(errno == EINTR || errno == EAGAIN) continue; #else if(WSAGetLastError() == WSAEINPROGRESS) continue; if(WSAGetLastError() == WSAEWOULDBLOCK) continue; #endif log_err("dnstap io stop: write: %s", sock_strerror(errno)); break; } break; } dtio->started = 0; #endif /* THREADS_DISABLED */ #ifndef USE_WINSOCK close(dtio->commandpipe[1]); #else _close(dtio->commandpipe[1]); #endif dtio->commandpipe[1] = -1; #ifndef THREADS_DISABLED ub_thread_join(dtio->tid); #else dtio->want_to_exit = 1; dtio_desetup(dtio); #endif }