125039b37SCy Schubert /* 225039b37SCy Schubert * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP 325039b37SCy Schubert * 425039b37SCy Schubert * Copyright (c) 2020, NLnet Labs. All rights reserved. 525039b37SCy Schubert * 625039b37SCy Schubert * This software is open source. 725039b37SCy Schubert * 825039b37SCy Schubert * Redistribution and use in source and binary forms, with or without 925039b37SCy Schubert * modification, are permitted provided that the following conditions 1025039b37SCy Schubert * are met: 1125039b37SCy Schubert * 1225039b37SCy Schubert * Redistributions of source code must retain the above copyright notice, 1325039b37SCy Schubert * this list of conditions and the following disclaimer. 1425039b37SCy Schubert * 1525039b37SCy Schubert * Redistributions in binary form must reproduce the above copyright notice, 1625039b37SCy Schubert * this list of conditions and the following disclaimer in the documentation 1725039b37SCy Schubert * and/or other materials provided with the distribution. 1825039b37SCy Schubert * 1925039b37SCy Schubert * Neither the name of the NLNET LABS nor the names of its contributors may 2025039b37SCy Schubert * be used to endorse or promote products derived from this software without 2125039b37SCy Schubert * specific prior written permission. 2225039b37SCy Schubert * 2325039b37SCy Schubert * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 2425039b37SCy Schubert * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 2525039b37SCy Schubert * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 2625039b37SCy Schubert * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT 2725039b37SCy Schubert * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 2825039b37SCy Schubert * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 2925039b37SCy Schubert * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 3025039b37SCy Schubert * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 3125039b37SCy Schubert * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 3225039b37SCy Schubert * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 3325039b37SCy Schubert * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 3425039b37SCy Schubert * 3525039b37SCy Schubert */ 3625039b37SCy Schubert 3725039b37SCy Schubert /** 3825039b37SCy Schubert * \file 3925039b37SCy Schubert * 4025039b37SCy Schubert * An implementation of the Frame Streams data transport protocol for 4125039b37SCy Schubert * the Unbound DNSTAP message logging facility. 
4225039b37SCy Schubert */ 4325039b37SCy Schubert 4425039b37SCy Schubert #include "config.h" 4525039b37SCy Schubert #include "dnstap/dtstream.h" 4625039b37SCy Schubert #include "dnstap/dnstap_fstrm.h" 4725039b37SCy Schubert #include "util/config_file.h" 4825039b37SCy Schubert #include "util/ub_event.h" 4925039b37SCy Schubert #include "util/net_help.h" 5025039b37SCy Schubert #include "services/outside_network.h" 5125039b37SCy Schubert #include "sldns/sbuffer.h" 5225039b37SCy Schubert #ifdef HAVE_SYS_UN_H 5325039b37SCy Schubert #include <sys/un.h> 5425039b37SCy Schubert #endif 5525039b37SCy Schubert #include <fcntl.h> 5625039b37SCy Schubert #ifdef HAVE_OPENSSL_SSL_H 5725039b37SCy Schubert #include <openssl/ssl.h> 5825039b37SCy Schubert #endif 5925039b37SCy Schubert #ifdef HAVE_OPENSSL_ERR_H 6025039b37SCy Schubert #include <openssl/err.h> 6125039b37SCy Schubert #endif 6225039b37SCy Schubert 6325039b37SCy Schubert /** number of messages to process in one output callback */ 6425039b37SCy Schubert #define DTIO_MESSAGES_PER_CALLBACK 100 6525039b37SCy Schubert /** the msec to wait for reconnect (if not immediate, the first attempt) */ 6625039b37SCy Schubert #define DTIO_RECONNECT_TIMEOUT_MIN 10 6725039b37SCy Schubert /** the msec to wait for reconnect max after backoff */ 6825039b37SCy Schubert #define DTIO_RECONNECT_TIMEOUT_MAX 1000 6925039b37SCy Schubert /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */ 7025039b37SCy Schubert #define DTIO_RECONNECT_TIMEOUT_SLOW 1000 71c0caa2e2SCy Schubert /** number of messages before wakeup of thread */ 72c0caa2e2SCy Schubert #define DTIO_MSG_FOR_WAKEUP 32 7325039b37SCy Schubert 7425039b37SCy Schubert /** maximum length of received frame */ 7525039b37SCy Schubert #define DTIO_RECV_FRAME_MAX_LEN 1000 7625039b37SCy Schubert 7725039b37SCy Schubert struct stop_flush_info; 7825039b37SCy Schubert /** DTIO command channel commands */ 7925039b37SCy Schubert enum { 8025039b37SCy Schubert /** DTIO command channel stop 
*/ 8125039b37SCy Schubert DTIO_COMMAND_STOP = 0, 8225039b37SCy Schubert /** DTIO command channel wakeup */ 8325039b37SCy Schubert DTIO_COMMAND_WAKEUP = 1 8425039b37SCy Schubert } dtio_channel_command; 8525039b37SCy Schubert 8625039b37SCy Schubert /** open the output channel */ 8725039b37SCy Schubert static void dtio_open_output(struct dt_io_thread* dtio); 8825039b37SCy Schubert /** add output event for read and write */ 8925039b37SCy Schubert static int dtio_add_output_event_write(struct dt_io_thread* dtio); 9025039b37SCy Schubert /** start reconnection attempts */ 9125039b37SCy Schubert static void dtio_reconnect_enable(struct dt_io_thread* dtio); 9225039b37SCy Schubert /** stop from stop_flush event loop */ 9325039b37SCy Schubert static void dtio_stop_flush_exit(struct stop_flush_info* info); 9425039b37SCy Schubert /** setup a start control message */ 9525039b37SCy Schubert static int dtio_control_start_send(struct dt_io_thread* dtio); 9625039b37SCy Schubert #ifdef HAVE_SSL 9725039b37SCy Schubert /** enable briefly waiting for a read event, for SSL negotiation */ 9825039b37SCy Schubert static int dtio_enable_brief_read(struct dt_io_thread* dtio); 9925039b37SCy Schubert /** enable briefly waiting for a write event, for SSL negotiation */ 10025039b37SCy Schubert static int dtio_enable_brief_write(struct dt_io_thread* dtio); 10125039b37SCy Schubert #endif 10225039b37SCy Schubert 10325039b37SCy Schubert struct dt_msg_queue* 104c0caa2e2SCy Schubert dt_msg_queue_create(struct comm_base* base) 10525039b37SCy Schubert { 10625039b37SCy Schubert struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); 10725039b37SCy Schubert if(!mq) return NULL; 10825039b37SCy Schubert mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, 10925039b37SCy Schubert about 1 M should contain 64K messages with some overhead, 11025039b37SCy Schubert or a whole bunch smaller ones */ 111c0caa2e2SCy Schubert mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq); 112c0caa2e2SCy Schubert 
if(!mq->wakeup_timer) { 113c0caa2e2SCy Schubert free(mq); 114c0caa2e2SCy Schubert return NULL; 115c0caa2e2SCy Schubert } 11625039b37SCy Schubert lock_basic_init(&mq->lock); 11725039b37SCy Schubert lock_protect(&mq->lock, mq, sizeof(*mq)); 11825039b37SCy Schubert return mq; 11925039b37SCy Schubert } 12025039b37SCy Schubert 12125039b37SCy Schubert /** clear the message list, caller must hold the lock */ 12225039b37SCy Schubert static void 12325039b37SCy Schubert dt_msg_queue_clear(struct dt_msg_queue* mq) 12425039b37SCy Schubert { 12525039b37SCy Schubert struct dt_msg_entry* e = mq->first, *next=NULL; 12625039b37SCy Schubert while(e) { 12725039b37SCy Schubert next = e->next; 12825039b37SCy Schubert free(e->buf); 12925039b37SCy Schubert free(e); 13025039b37SCy Schubert e = next; 13125039b37SCy Schubert } 13225039b37SCy Schubert mq->first = NULL; 13325039b37SCy Schubert mq->last = NULL; 13425039b37SCy Schubert mq->cursize = 0; 135c0caa2e2SCy Schubert mq->msgcount = 0; 13625039b37SCy Schubert } 13725039b37SCy Schubert 13825039b37SCy Schubert void 13925039b37SCy Schubert dt_msg_queue_delete(struct dt_msg_queue* mq) 14025039b37SCy Schubert { 14125039b37SCy Schubert if(!mq) return; 14225039b37SCy Schubert lock_basic_destroy(&mq->lock); 14325039b37SCy Schubert dt_msg_queue_clear(mq); 144c0caa2e2SCy Schubert comm_timer_delete(mq->wakeup_timer); 14525039b37SCy Schubert free(mq); 14625039b37SCy Schubert } 14725039b37SCy Schubert 14825039b37SCy Schubert /** make the dtio wake up by sending a wakeup command */ 14925039b37SCy Schubert static void dtio_wakeup(struct dt_io_thread* dtio) 15025039b37SCy Schubert { 15125039b37SCy Schubert uint8_t cmd = DTIO_COMMAND_WAKEUP; 15225039b37SCy Schubert if(!dtio) return; 15325039b37SCy Schubert if(!dtio->started) return; 15425039b37SCy Schubert 15525039b37SCy Schubert while(1) { 15625039b37SCy Schubert ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 15725039b37SCy Schubert if(r == -1) { 15825039b37SCy Schubert #ifndef USE_WINSOCK 
15925039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 16025039b37SCy Schubert continue; 16125039b37SCy Schubert #else 16225039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 16325039b37SCy Schubert continue; 16425039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) 16525039b37SCy Schubert continue; 16625039b37SCy Schubert #endif 167c0caa2e2SCy Schubert log_err("dnstap io wakeup: write: %s", 168c0caa2e2SCy Schubert sock_strerror(errno)); 16925039b37SCy Schubert break; 17025039b37SCy Schubert } 17125039b37SCy Schubert break; 17225039b37SCy Schubert } 17325039b37SCy Schubert } 17425039b37SCy Schubert 17525039b37SCy Schubert void 176c0caa2e2SCy Schubert mq_wakeup_cb(void* arg) 177c0caa2e2SCy Schubert { 178c0caa2e2SCy Schubert struct dt_msg_queue* mq = (struct dt_msg_queue*)arg; 179c0caa2e2SCy Schubert /* even if the dtio is already active, because perhaps much 180c0caa2e2SCy Schubert * traffic suddenly, we leave the timer running to save on 181c0caa2e2SCy Schubert * managing it, the once a second timer is less work then 182c0caa2e2SCy Schubert * starting and stopping the timer frequently */ 183c0caa2e2SCy Schubert lock_basic_lock(&mq->dtio->wakeup_timer_lock); 184c0caa2e2SCy Schubert mq->dtio->wakeup_timer_enabled = 0; 185c0caa2e2SCy Schubert lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 186c0caa2e2SCy Schubert dtio_wakeup(mq->dtio); 187c0caa2e2SCy Schubert } 188c0caa2e2SCy Schubert 189c0caa2e2SCy Schubert /** start timer to wakeup dtio because there is content in the queue */ 190c0caa2e2SCy Schubert static void 1919cf5bc93SCy Schubert dt_msg_queue_start_timer(struct dt_msg_queue* mq, int wakeupnow) 192c0caa2e2SCy Schubert { 1939cf5bc93SCy Schubert struct timeval tv = {0}; 194c0caa2e2SCy Schubert /* Start a timer to process messages to be logged. 195c0caa2e2SCy Schubert * If we woke up the dtio thread for every message, the wakeup 196c0caa2e2SCy Schubert * messages take up too much processing power. 
If the queue 197c0caa2e2SCy Schubert * fills up the wakeup happens immediately. The timer wakes it up 198c0caa2e2SCy Schubert * if there are infrequent messages to log. */ 199c0caa2e2SCy Schubert 200c0caa2e2SCy Schubert /* we cannot start a timer in dtio thread, because it is a different 201c0caa2e2SCy Schubert * thread and its event base is in use by the other thread, it would 202c0caa2e2SCy Schubert * give race conditions if we tried to modify its event base, 203c0caa2e2SCy Schubert * and locks would wait until it woke up, and this is what we do. */ 204c0caa2e2SCy Schubert 205c0caa2e2SCy Schubert /* do not start the timer if a timer already exists, perhaps 206c0caa2e2SCy Schubert * in another worker. So this variable is protected by a lock in 2079cf5bc93SCy Schubert * dtio. */ 2089cf5bc93SCy Schubert 2099cf5bc93SCy Schubert /* If we need to wakeupnow, 0 the timer to force the callback. */ 210c0caa2e2SCy Schubert lock_basic_lock(&mq->dtio->wakeup_timer_lock); 211c0caa2e2SCy Schubert if(mq->dtio->wakeup_timer_enabled) { 2129cf5bc93SCy Schubert if(wakeupnow) { 2139cf5bc93SCy Schubert comm_timer_set(mq->wakeup_timer, &tv); 2149cf5bc93SCy Schubert } 215c0caa2e2SCy Schubert lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 216c0caa2e2SCy Schubert return; 217c0caa2e2SCy Schubert } 218c0caa2e2SCy Schubert mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */ 219c0caa2e2SCy Schubert 220c0caa2e2SCy Schubert /* start the timer, in mq, in the event base of our worker */ 2219cf5bc93SCy Schubert if(!wakeupnow) { 222c0caa2e2SCy Schubert tv.tv_sec = 1; 223c0caa2e2SCy Schubert tv.tv_usec = 0; 2249cf5bc93SCy Schubert } 225c0caa2e2SCy Schubert comm_timer_set(mq->wakeup_timer, &tv); 2269cf5bc93SCy Schubert lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 227c0caa2e2SCy Schubert } 228c0caa2e2SCy Schubert 229c0caa2e2SCy Schubert void 23025039b37SCy Schubert dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) 23125039b37SCy Schubert { 232c0caa2e2SCy 
Schubert int wakeupnow = 0, wakeupstarttimer = 0; 23325039b37SCy Schubert struct dt_msg_entry* entry; 23425039b37SCy Schubert 23525039b37SCy Schubert /* check conditions */ 23625039b37SCy Schubert if(!buf) return; 23725039b37SCy Schubert if(len == 0) { 23825039b37SCy Schubert /* it is not possible to log entries with zero length, 23925039b37SCy Schubert * because the framestream protocol does not carry it. 24025039b37SCy Schubert * However the protobuf serialization does not create zero 24125039b37SCy Schubert * length datagrams for dnstap, so this should not happen. */ 24225039b37SCy Schubert free(buf); 24325039b37SCy Schubert return; 24425039b37SCy Schubert } 24525039b37SCy Schubert if(!mq) { 24625039b37SCy Schubert free(buf); 24725039b37SCy Schubert return; 24825039b37SCy Schubert } 24925039b37SCy Schubert 25025039b37SCy Schubert /* allocate memory for queue entry */ 25125039b37SCy Schubert entry = malloc(sizeof(*entry)); 25225039b37SCy Schubert if(!entry) { 25325039b37SCy Schubert log_err("out of memory logging dnstap"); 25425039b37SCy Schubert free(buf); 25525039b37SCy Schubert return; 25625039b37SCy Schubert } 25725039b37SCy Schubert entry->next = NULL; 25825039b37SCy Schubert entry->buf = buf; 25925039b37SCy Schubert entry->len = len; 26025039b37SCy Schubert 26124e36522SCy Schubert /* acquire lock */ 26225039b37SCy Schubert lock_basic_lock(&mq->lock); 263c0caa2e2SCy Schubert /* if list was empty, start timer for (eventual) wakeup */ 26425039b37SCy Schubert if(mq->first == NULL) 265c0caa2e2SCy Schubert wakeupstarttimer = 1; 266c0caa2e2SCy Schubert /* if list contains more than wakeupnum elements, wakeup now, 267c0caa2e2SCy Schubert * or if list is (going to be) almost full */ 268c0caa2e2SCy Schubert if(mq->msgcount == DTIO_MSG_FOR_WAKEUP || 269c0caa2e2SCy Schubert (mq->cursize < mq->maxsize * 9 / 10 && 270c0caa2e2SCy Schubert mq->cursize+len >= mq->maxsize * 9 / 10)) 271c0caa2e2SCy Schubert wakeupnow = 1; 27225039b37SCy Schubert /* see if it is going to fit 
*/ 27325039b37SCy Schubert if(mq->cursize + len > mq->maxsize) { 27425039b37SCy Schubert /* buffer full, or congested. */ 27525039b37SCy Schubert /* drop */ 27625039b37SCy Schubert lock_basic_unlock(&mq->lock); 27725039b37SCy Schubert free(buf); 27825039b37SCy Schubert free(entry); 27925039b37SCy Schubert return; 28025039b37SCy Schubert } 28125039b37SCy Schubert mq->cursize += len; 282c0caa2e2SCy Schubert mq->msgcount ++; 28325039b37SCy Schubert /* append to list */ 28425039b37SCy Schubert if(mq->last) { 28525039b37SCy Schubert mq->last->next = entry; 28625039b37SCy Schubert } else { 28725039b37SCy Schubert mq->first = entry; 28825039b37SCy Schubert } 28925039b37SCy Schubert mq->last = entry; 29025039b37SCy Schubert /* release lock */ 29125039b37SCy Schubert lock_basic_unlock(&mq->lock); 29225039b37SCy Schubert 2939cf5bc93SCy Schubert if(wakeupnow || wakeupstarttimer) { 2949cf5bc93SCy Schubert dt_msg_queue_start_timer(mq, wakeupnow); 295c0caa2e2SCy Schubert } 29625039b37SCy Schubert } 29725039b37SCy Schubert 29825039b37SCy Schubert struct dt_io_thread* dt_io_thread_create(void) 29925039b37SCy Schubert { 30025039b37SCy Schubert struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); 301c0caa2e2SCy Schubert lock_basic_init(&dtio->wakeup_timer_lock); 302c0caa2e2SCy Schubert lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled, 303c0caa2e2SCy Schubert sizeof(dtio->wakeup_timer_enabled)); 30425039b37SCy Schubert return dtio; 30525039b37SCy Schubert } 30625039b37SCy Schubert 30725039b37SCy Schubert void dt_io_thread_delete(struct dt_io_thread* dtio) 30825039b37SCy Schubert { 30925039b37SCy Schubert struct dt_io_list_item* item, *nextitem; 31025039b37SCy Schubert if(!dtio) return; 311c0caa2e2SCy Schubert lock_basic_destroy(&dtio->wakeup_timer_lock); 31225039b37SCy Schubert item=dtio->io_list; 31325039b37SCy Schubert while(item) { 31425039b37SCy Schubert nextitem = item->next; 31525039b37SCy Schubert free(item); 31625039b37SCy Schubert item = nextitem; 
31725039b37SCy Schubert } 31825039b37SCy Schubert free(dtio->socket_path); 31925039b37SCy Schubert free(dtio->ip_str); 32025039b37SCy Schubert free(dtio->tls_server_name); 32125039b37SCy Schubert free(dtio->client_key_file); 32225039b37SCy Schubert free(dtio->client_cert_file); 32325039b37SCy Schubert if(dtio->ssl_ctx) { 32425039b37SCy Schubert #ifdef HAVE_SSL 32525039b37SCy Schubert SSL_CTX_free(dtio->ssl_ctx); 32625039b37SCy Schubert #endif 32725039b37SCy Schubert } 32825039b37SCy Schubert free(dtio); 32925039b37SCy Schubert } 33025039b37SCy Schubert 33125039b37SCy Schubert int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg) 33225039b37SCy Schubert { 33325039b37SCy Schubert if(!cfg->dnstap) { 33425039b37SCy Schubert log_warn("cannot setup dnstap because dnstap-enable is no"); 33525039b37SCy Schubert return 0; 33625039b37SCy Schubert } 33725039b37SCy Schubert 33825039b37SCy Schubert /* what type of connectivity do we have */ 33925039b37SCy Schubert if(cfg->dnstap_ip && cfg->dnstap_ip[0]) { 34025039b37SCy Schubert if(cfg->dnstap_tls) 34125039b37SCy Schubert dtio->upstream_is_tls = 1; 34225039b37SCy Schubert else dtio->upstream_is_tcp = 1; 34325039b37SCy Schubert } else { 34425039b37SCy Schubert dtio->upstream_is_unix = 1; 34525039b37SCy Schubert } 34625039b37SCy Schubert dtio->is_bidirectional = cfg->dnstap_bidirectional; 34725039b37SCy Schubert 34825039b37SCy Schubert if(dtio->upstream_is_unix) { 349369c6923SCy Schubert char* nm; 35025039b37SCy Schubert if(!cfg->dnstap_socket_path || 35125039b37SCy Schubert cfg->dnstap_socket_path[0]==0) { 35225039b37SCy Schubert log_err("dnstap setup: no dnstap-socket-path for " 35325039b37SCy Schubert "socket connect"); 35425039b37SCy Schubert return 0; 35525039b37SCy Schubert } 356369c6923SCy Schubert nm = cfg->dnstap_socket_path; 357369c6923SCy Schubert if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm, 358369c6923SCy Schubert cfg->chrootdir, strlen(cfg->chrootdir)) == 0) 359369c6923SCy Schubert 
nm += strlen(cfg->chrootdir); 36025039b37SCy Schubert free(dtio->socket_path); 361369c6923SCy Schubert dtio->socket_path = strdup(nm); 36225039b37SCy Schubert if(!dtio->socket_path) { 36325039b37SCy Schubert log_err("dnstap setup: malloc failure"); 36425039b37SCy Schubert return 0; 36525039b37SCy Schubert } 36625039b37SCy Schubert } 36725039b37SCy Schubert 36825039b37SCy Schubert if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 36925039b37SCy Schubert if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) { 37025039b37SCy Schubert log_err("dnstap setup: no dnstap-ip for TCP connect"); 37125039b37SCy Schubert return 0; 37225039b37SCy Schubert } 37325039b37SCy Schubert free(dtio->ip_str); 37425039b37SCy Schubert dtio->ip_str = strdup(cfg->dnstap_ip); 37525039b37SCy Schubert if(!dtio->ip_str) { 37625039b37SCy Schubert log_err("dnstap setup: malloc failure"); 37725039b37SCy Schubert return 0; 37825039b37SCy Schubert } 37925039b37SCy Schubert } 38025039b37SCy Schubert 38125039b37SCy Schubert if(dtio->upstream_is_tls) { 38225039b37SCy Schubert #ifdef HAVE_SSL 38325039b37SCy Schubert if(cfg->dnstap_tls_server_name && 38425039b37SCy Schubert cfg->dnstap_tls_server_name[0]) { 38525039b37SCy Schubert free(dtio->tls_server_name); 38625039b37SCy Schubert dtio->tls_server_name = strdup( 38725039b37SCy Schubert cfg->dnstap_tls_server_name); 38825039b37SCy Schubert if(!dtio->tls_server_name) { 38925039b37SCy Schubert log_err("dnstap setup: malloc failure"); 39025039b37SCy Schubert return 0; 39125039b37SCy Schubert } 39225039b37SCy Schubert if(!check_auth_name_for_ssl(dtio->tls_server_name)) 39325039b37SCy Schubert return 0; 39425039b37SCy Schubert } 39525039b37SCy Schubert if(cfg->dnstap_tls_client_key_file && 39625039b37SCy Schubert cfg->dnstap_tls_client_key_file[0]) { 39725039b37SCy Schubert dtio->use_client_certs = 1; 39825039b37SCy Schubert free(dtio->client_key_file); 39925039b37SCy Schubert dtio->client_key_file = strdup( 40025039b37SCy Schubert 
cfg->dnstap_tls_client_key_file); 40125039b37SCy Schubert if(!dtio->client_key_file) { 40225039b37SCy Schubert log_err("dnstap setup: malloc failure"); 40325039b37SCy Schubert return 0; 40425039b37SCy Schubert } 40525039b37SCy Schubert if(!cfg->dnstap_tls_client_cert_file || 40625039b37SCy Schubert cfg->dnstap_tls_client_cert_file[0]==0) { 40725039b37SCy Schubert log_err("dnstap setup: client key " 40825039b37SCy Schubert "authentication enabled with " 40925039b37SCy Schubert "dnstap-tls-client-key-file, but " 41025039b37SCy Schubert "no dnstap-tls-client-cert-file " 41125039b37SCy Schubert "is given"); 41225039b37SCy Schubert return 0; 41325039b37SCy Schubert } 41425039b37SCy Schubert free(dtio->client_cert_file); 41525039b37SCy Schubert dtio->client_cert_file = strdup( 41625039b37SCy Schubert cfg->dnstap_tls_client_cert_file); 41725039b37SCy Schubert if(!dtio->client_cert_file) { 41825039b37SCy Schubert log_err("dnstap setup: malloc failure"); 41925039b37SCy Schubert return 0; 42025039b37SCy Schubert } 42125039b37SCy Schubert } else { 42225039b37SCy Schubert dtio->use_client_certs = 0; 42325039b37SCy Schubert dtio->client_key_file = NULL; 42425039b37SCy Schubert dtio->client_cert_file = NULL; 42525039b37SCy Schubert } 42625039b37SCy Schubert 42725039b37SCy Schubert if(cfg->dnstap_tls_cert_bundle) { 42825039b37SCy Schubert dtio->ssl_ctx = connect_sslctx_create( 42925039b37SCy Schubert dtio->client_key_file, 43025039b37SCy Schubert dtio->client_cert_file, 43125039b37SCy Schubert cfg->dnstap_tls_cert_bundle, 0); 43225039b37SCy Schubert } else { 43325039b37SCy Schubert dtio->ssl_ctx = connect_sslctx_create( 43425039b37SCy Schubert dtio->client_key_file, 43525039b37SCy Schubert dtio->client_cert_file, 43625039b37SCy Schubert cfg->tls_cert_bundle, cfg->tls_win_cert); 43725039b37SCy Schubert } 43825039b37SCy Schubert if(!dtio->ssl_ctx) { 43925039b37SCy Schubert log_err("could not setup SSL CTX"); 44025039b37SCy Schubert return 0; 44125039b37SCy Schubert } 44225039b37SCy 
Schubert dtio->tls_use_sni = cfg->tls_use_sni; 44325039b37SCy Schubert #endif /* HAVE_SSL */ 44425039b37SCy Schubert } 44525039b37SCy Schubert return 1; 44625039b37SCy Schubert } 44725039b37SCy Schubert 44825039b37SCy Schubert int dt_io_thread_register_queue(struct dt_io_thread* dtio, 44925039b37SCy Schubert struct dt_msg_queue* mq) 45025039b37SCy Schubert { 45125039b37SCy Schubert struct dt_io_list_item* item = malloc(sizeof(*item)); 45225039b37SCy Schubert if(!item) return 0; 45325039b37SCy Schubert lock_basic_lock(&mq->lock); 45425039b37SCy Schubert mq->dtio = dtio; 45525039b37SCy Schubert lock_basic_unlock(&mq->lock); 45625039b37SCy Schubert item->queue = mq; 45725039b37SCy Schubert item->next = dtio->io_list; 45825039b37SCy Schubert dtio->io_list = item; 45925039b37SCy Schubert dtio->io_list_iter = NULL; 46025039b37SCy Schubert return 1; 46125039b37SCy Schubert } 46225039b37SCy Schubert 46325039b37SCy Schubert void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, 46425039b37SCy Schubert struct dt_msg_queue* mq) 46525039b37SCy Schubert { 46625039b37SCy Schubert struct dt_io_list_item* item, *prev=NULL; 46725039b37SCy Schubert if(!dtio) return; 46825039b37SCy Schubert item = dtio->io_list; 46925039b37SCy Schubert while(item) { 47025039b37SCy Schubert if(item->queue == mq) { 47125039b37SCy Schubert /* found it */ 47225039b37SCy Schubert if(prev) prev->next = item->next; 47325039b37SCy Schubert else dtio->io_list = item->next; 47425039b37SCy Schubert /* the queue itself only registered, not deleted */ 47525039b37SCy Schubert lock_basic_lock(&item->queue->lock); 47625039b37SCy Schubert item->queue->dtio = NULL; 47725039b37SCy Schubert lock_basic_unlock(&item->queue->lock); 47825039b37SCy Schubert free(item); 47925039b37SCy Schubert dtio->io_list_iter = NULL; 48025039b37SCy Schubert return; 48125039b37SCy Schubert } 48225039b37SCy Schubert prev = item; 48325039b37SCy Schubert item = item->next; 48425039b37SCy Schubert } 48525039b37SCy Schubert } 
48625039b37SCy Schubert 48725039b37SCy Schubert /** pick a message from the queue, the routine locks and unlocks, 48825039b37SCy Schubert * returns true if there is a message */ 48925039b37SCy Schubert static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf, 49025039b37SCy Schubert size_t* len) 49125039b37SCy Schubert { 49225039b37SCy Schubert lock_basic_lock(&mq->lock); 49325039b37SCy Schubert if(mq->first) { 49425039b37SCy Schubert struct dt_msg_entry* entry = mq->first; 49525039b37SCy Schubert mq->first = entry->next; 49625039b37SCy Schubert if(!entry->next) mq->last = NULL; 49725039b37SCy Schubert mq->cursize -= entry->len; 498c0caa2e2SCy Schubert mq->msgcount --; 49925039b37SCy Schubert lock_basic_unlock(&mq->lock); 50025039b37SCy Schubert 50125039b37SCy Schubert *buf = entry->buf; 50225039b37SCy Schubert *len = entry->len; 50325039b37SCy Schubert free(entry); 50425039b37SCy Schubert return 1; 50525039b37SCy Schubert } 50625039b37SCy Schubert lock_basic_unlock(&mq->lock); 50725039b37SCy Schubert return 0; 50825039b37SCy Schubert } 50925039b37SCy Schubert 51025039b37SCy Schubert /** find message in queue, false if no message, true if message to send */ 51125039b37SCy Schubert static int dtio_find_in_queue(struct dt_io_thread* dtio, 51225039b37SCy Schubert struct dt_msg_queue* mq) 51325039b37SCy Schubert { 51425039b37SCy Schubert void* buf=NULL; 51525039b37SCy Schubert size_t len=0; 51625039b37SCy Schubert if(dt_msg_queue_pop(mq, &buf, &len)) { 51725039b37SCy Schubert dtio->cur_msg = buf; 51825039b37SCy Schubert dtio->cur_msg_len = len; 51925039b37SCy Schubert dtio->cur_msg_done = 0; 52025039b37SCy Schubert dtio->cur_msg_len_done = 0; 52125039b37SCy Schubert return 1; 52225039b37SCy Schubert } 52325039b37SCy Schubert return 0; 52425039b37SCy Schubert } 52525039b37SCy Schubert 52625039b37SCy Schubert /** find a new message to write, search message queues, false if none */ 52725039b37SCy Schubert static int dtio_find_msg(struct dt_io_thread* dtio) 
52825039b37SCy Schubert { 52925039b37SCy Schubert struct dt_io_list_item *spot, *item; 53025039b37SCy Schubert 53125039b37SCy Schubert spot = dtio->io_list_iter; 53225039b37SCy Schubert /* use the next queue for the next message lookup, 53325039b37SCy Schubert * if we hit the end(NULL) the NULL restarts the iter at start. */ 53425039b37SCy Schubert if(spot) 53525039b37SCy Schubert dtio->io_list_iter = spot->next; 53625039b37SCy Schubert else if(dtio->io_list) 53725039b37SCy Schubert dtio->io_list_iter = dtio->io_list->next; 53825039b37SCy Schubert 53925039b37SCy Schubert /* scan from spot to end-of-io_list */ 54025039b37SCy Schubert item = spot; 54125039b37SCy Schubert while(item) { 54225039b37SCy Schubert if(dtio_find_in_queue(dtio, item->queue)) 54325039b37SCy Schubert return 1; 54425039b37SCy Schubert item = item->next; 54525039b37SCy Schubert } 54625039b37SCy Schubert /* scan starting at the start-of-list (to wrap around the end) */ 54725039b37SCy Schubert item = dtio->io_list; 54825039b37SCy Schubert while(item) { 54925039b37SCy Schubert if(dtio_find_in_queue(dtio, item->queue)) 55025039b37SCy Schubert return 1; 55125039b37SCy Schubert item = item->next; 55225039b37SCy Schubert } 55325039b37SCy Schubert return 0; 55425039b37SCy Schubert } 55525039b37SCy Schubert 55625039b37SCy Schubert /** callback for the dnstap reconnect, to start reconnecting to output */ 55725039b37SCy Schubert void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd), 55825039b37SCy Schubert short ATTR_UNUSED(bits), void* arg) 55925039b37SCy Schubert { 56025039b37SCy Schubert struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 56125039b37SCy Schubert dtio->reconnect_is_added = 0; 56225039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: reconnect timer"); 56325039b37SCy Schubert 56425039b37SCy Schubert dtio_open_output(dtio); 56525039b37SCy Schubert if(dtio->event) { 56625039b37SCy Schubert if(!dtio_add_output_event_write(dtio)) 56725039b37SCy Schubert return; 56825039b37SCy Schubert /* nothing 
wrong so far, wait on the output event */ 56925039b37SCy Schubert return; 57025039b37SCy Schubert } 57125039b37SCy Schubert /* exponential backoff and retry on timer */ 57225039b37SCy Schubert dtio_reconnect_enable(dtio); 57325039b37SCy Schubert } 57425039b37SCy Schubert 57525039b37SCy Schubert /** attempt to reconnect to the output, after a timeout */ 57625039b37SCy Schubert static void dtio_reconnect_enable(struct dt_io_thread* dtio) 57725039b37SCy Schubert { 57825039b37SCy Schubert struct timeval tv; 57925039b37SCy Schubert int msec; 58025039b37SCy Schubert if(dtio->want_to_exit) return; 58125039b37SCy Schubert if(dtio->reconnect_is_added) 58225039b37SCy Schubert return; /* already done */ 58325039b37SCy Schubert 58425039b37SCy Schubert /* exponential backoff, store the value for next timeout */ 58525039b37SCy Schubert msec = dtio->reconnect_timeout; 58625039b37SCy Schubert if(msec == 0) { 58725039b37SCy Schubert dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN; 58825039b37SCy Schubert } else { 58925039b37SCy Schubert dtio->reconnect_timeout = msec*2; 59025039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX) 59125039b37SCy Schubert dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX; 59225039b37SCy Schubert } 59325039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec", 59425039b37SCy Schubert msec); 59525039b37SCy Schubert 59625039b37SCy Schubert /* setup wait timer */ 59725039b37SCy Schubert memset(&tv, 0, sizeof(tv)); 59825039b37SCy Schubert tv.tv_sec = msec/1000; 59925039b37SCy Schubert tv.tv_usec = (msec%1000)*1000; 60025039b37SCy Schubert if(ub_timer_add(dtio->reconnect_timer, dtio->event_base, 60125039b37SCy Schubert &dtio_reconnect_timeout_cb, dtio, &tv) != 0) { 60225039b37SCy Schubert log_err("dnstap io: could not reconnect ev timer add"); 60325039b37SCy Schubert return; 60425039b37SCy Schubert } 60525039b37SCy Schubert dtio->reconnect_is_added = 1; 60625039b37SCy Schubert } 60725039b37SCy 
Schubert 60825039b37SCy Schubert /** remove dtio reconnect timer */ 60925039b37SCy Schubert static void dtio_reconnect_del(struct dt_io_thread* dtio) 61025039b37SCy Schubert { 61125039b37SCy Schubert if(!dtio->reconnect_is_added) 61225039b37SCy Schubert return; 61325039b37SCy Schubert ub_timer_del(dtio->reconnect_timer); 61425039b37SCy Schubert dtio->reconnect_is_added = 0; 61525039b37SCy Schubert } 61625039b37SCy Schubert 61725039b37SCy Schubert /** clear the reconnect exponential backoff timer. 61825039b37SCy Schubert * We have successfully connected so we can try again with short timeouts. */ 61925039b37SCy Schubert static void dtio_reconnect_clear(struct dt_io_thread* dtio) 62025039b37SCy Schubert { 62125039b37SCy Schubert dtio->reconnect_timeout = 0; 62225039b37SCy Schubert dtio_reconnect_del(dtio); 62325039b37SCy Schubert } 62425039b37SCy Schubert 62525039b37SCy Schubert /** reconnect slowly, because we already know we have to wait for a bit */ 62625039b37SCy Schubert static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec) 62725039b37SCy Schubert { 62825039b37SCy Schubert dtio_reconnect_del(dtio); 62925039b37SCy Schubert dtio->reconnect_timeout = msec; 63025039b37SCy Schubert dtio_reconnect_enable(dtio); 63125039b37SCy Schubert } 63225039b37SCy Schubert 63325039b37SCy Schubert /** delete the current message in the dtio, and reset counters */ 63425039b37SCy Schubert static void dtio_cur_msg_free(struct dt_io_thread* dtio) 63525039b37SCy Schubert { 63625039b37SCy Schubert free(dtio->cur_msg); 63725039b37SCy Schubert dtio->cur_msg = NULL; 63825039b37SCy Schubert dtio->cur_msg_len = 0; 63925039b37SCy Schubert dtio->cur_msg_done = 0; 64025039b37SCy Schubert dtio->cur_msg_len_done = 0; 64125039b37SCy Schubert } 64225039b37SCy Schubert 64325039b37SCy Schubert /** delete the buffer and counters used to read frame */ 64425039b37SCy Schubert static void dtio_read_frame_free(struct dt_frame_read_buf* rb) 64525039b37SCy Schubert { 64625039b37SCy Schubert 
if(rb->buf) { 64725039b37SCy Schubert free(rb->buf); 64825039b37SCy Schubert rb->buf = NULL; 64925039b37SCy Schubert } 65025039b37SCy Schubert rb->buf_count = 0; 65125039b37SCy Schubert rb->buf_cap = 0; 65225039b37SCy Schubert rb->frame_len = 0; 65325039b37SCy Schubert rb->frame_len_done = 0; 65425039b37SCy Schubert rb->control_frame = 0; 65525039b37SCy Schubert } 65625039b37SCy Schubert 65725039b37SCy Schubert /** del the output file descriptor event for listening */ 65825039b37SCy Schubert static void dtio_del_output_event(struct dt_io_thread* dtio) 65925039b37SCy Schubert { 66025039b37SCy Schubert if(!dtio->event_added) 66125039b37SCy Schubert return; 66225039b37SCy Schubert ub_event_del(dtio->event); 66325039b37SCy Schubert dtio->event_added = 0; 66425039b37SCy Schubert dtio->event_added_is_write = 0; 66525039b37SCy Schubert } 66625039b37SCy Schubert 66725039b37SCy Schubert /** close dtio socket and set it to -1 */ 66825039b37SCy Schubert static void dtio_close_fd(struct dt_io_thread* dtio) 66925039b37SCy Schubert { 670c0caa2e2SCy Schubert sock_close(dtio->fd); 67125039b37SCy Schubert dtio->fd = -1; 67225039b37SCy Schubert } 67325039b37SCy Schubert 67425039b37SCy Schubert /** close and stop the output file descriptor event */ 67525039b37SCy Schubert static void dtio_close_output(struct dt_io_thread* dtio) 67625039b37SCy Schubert { 67725039b37SCy Schubert if(!dtio->event) 67825039b37SCy Schubert return; 67925039b37SCy Schubert ub_event_free(dtio->event); 68025039b37SCy Schubert dtio->event = NULL; 68125039b37SCy Schubert if(dtio->ssl) { 68225039b37SCy Schubert #ifdef HAVE_SSL 68325039b37SCy Schubert SSL_shutdown(dtio->ssl); 68425039b37SCy Schubert SSL_free(dtio->ssl); 68525039b37SCy Schubert dtio->ssl = NULL; 68625039b37SCy Schubert #endif 68725039b37SCy Schubert } 68825039b37SCy Schubert dtio_close_fd(dtio); 68925039b37SCy Schubert 69025039b37SCy Schubert /* if there is a (partial) message, discard it 69125039b37SCy Schubert * we cannot send (the remainder of) 
it, and a new 69225039b37SCy Schubert * connection needs to start with a control frame. */ 69325039b37SCy Schubert if(dtio->cur_msg) { 69425039b37SCy Schubert dtio_cur_msg_free(dtio); 69525039b37SCy Schubert } 69625039b37SCy Schubert 69725039b37SCy Schubert dtio->ready_frame_sent = 0; 69825039b37SCy Schubert dtio->accept_frame_received = 0; 69925039b37SCy Schubert dtio_read_frame_free(&dtio->read_frame); 70025039b37SCy Schubert 70125039b37SCy Schubert dtio_reconnect_enable(dtio); 70225039b37SCy Schubert } 70325039b37SCy Schubert 70425039b37SCy Schubert /** check for pending nonblocking connect errors, 70525039b37SCy Schubert * returns 1 if it is okay. -1 on error (close it), 0 to try later */ 70625039b37SCy Schubert static int dtio_check_nb_connect(struct dt_io_thread* dtio) 70725039b37SCy Schubert { 70825039b37SCy Schubert int error = 0; 70925039b37SCy Schubert socklen_t len = (socklen_t)sizeof(error); 71025039b37SCy Schubert if(!dtio->check_nb_connect) 71125039b37SCy Schubert return 1; /* everything okay */ 71225039b37SCy Schubert if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error, 71325039b37SCy Schubert &len) < 0) { 71425039b37SCy Schubert #ifndef USE_WINSOCK 71525039b37SCy Schubert error = errno; /* on solaris errno is error */ 71625039b37SCy Schubert #else 71725039b37SCy Schubert error = WSAGetLastError(); 71825039b37SCy Schubert #endif 71925039b37SCy Schubert } 72025039b37SCy Schubert #ifndef USE_WINSOCK 72125039b37SCy Schubert #if defined(EINPROGRESS) && defined(EWOULDBLOCK) 72225039b37SCy Schubert if(error == EINPROGRESS || error == EWOULDBLOCK) 72325039b37SCy Schubert return 0; /* try again later */ 72425039b37SCy Schubert #endif 72525039b37SCy Schubert #else 72625039b37SCy Schubert if(error == WSAEINPROGRESS) { 72725039b37SCy Schubert return 0; /* try again later */ 72825039b37SCy Schubert } else if(error == WSAEWOULDBLOCK) { 72925039b37SCy Schubert ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 
73025039b37SCy Schubert dtio->stop_flush_event:dtio->event), UB_EV_WRITE); 73125039b37SCy Schubert return 0; /* try again later */ 73225039b37SCy Schubert } 73325039b37SCy Schubert #endif 73425039b37SCy Schubert if(error != 0) { 73525039b37SCy Schubert char* to = dtio->socket_path; 73625039b37SCy Schubert if(!to) to = dtio->ip_str; 73725039b37SCy Schubert if(!to) to = ""; 73825039b37SCy Schubert log_err("dnstap io: failed to connect to \"%s\": %s", 739c0caa2e2SCy Schubert to, sock_strerror(error)); 74025039b37SCy Schubert return -1; /* error, close it */ 74125039b37SCy Schubert } 74225039b37SCy Schubert 74325039b37SCy Schubert if(dtio->ip_str) 74425039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: connected to %s", 74525039b37SCy Schubert dtio->ip_str); 74625039b37SCy Schubert else if(dtio->socket_path) 74725039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"", 74825039b37SCy Schubert dtio->socket_path); 74925039b37SCy Schubert dtio_reconnect_clear(dtio); 75025039b37SCy Schubert dtio->check_nb_connect = 0; 75125039b37SCy Schubert return 1; /* everything okay */ 75225039b37SCy Schubert } 75325039b37SCy Schubert 75425039b37SCy Schubert #ifdef HAVE_SSL 75525039b37SCy Schubert /** write to ssl output 75625039b37SCy Schubert * returns number of bytes written, 0 if nothing happened, 75725039b37SCy Schubert * try again later, or -1 if the channel is to be closed. 
*/ 75825039b37SCy Schubert static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf, 75925039b37SCy Schubert size_t len) 76025039b37SCy Schubert { 76125039b37SCy Schubert int r; 76225039b37SCy Schubert ERR_clear_error(); 76325039b37SCy Schubert r = SSL_write(dtio->ssl, buf, len); 76425039b37SCy Schubert if(r <= 0) { 76525039b37SCy Schubert int want = SSL_get_error(dtio->ssl, r); 76625039b37SCy Schubert if(want == SSL_ERROR_ZERO_RETURN) { 76725039b37SCy Schubert /* closed */ 76825039b37SCy Schubert return -1; 76925039b37SCy Schubert } else if(want == SSL_ERROR_WANT_READ) { 77025039b37SCy Schubert /* we want a brief read event */ 77125039b37SCy Schubert dtio_enable_brief_read(dtio); 77225039b37SCy Schubert return 0; 77325039b37SCy Schubert } else if(want == SSL_ERROR_WANT_WRITE) { 77425039b37SCy Schubert /* write again later */ 77525039b37SCy Schubert return 0; 77625039b37SCy Schubert } else if(want == SSL_ERROR_SYSCALL) { 77725039b37SCy Schubert #ifdef EPIPE 77825039b37SCy Schubert if(errno == EPIPE && verbosity < 2) 77925039b37SCy Schubert return -1; /* silence 'broken pipe' */ 78025039b37SCy Schubert #endif 78125039b37SCy Schubert #ifdef ECONNRESET 78225039b37SCy Schubert if(errno == ECONNRESET && verbosity < 2) 78325039b37SCy Schubert return -1; /* silence reset by peer */ 78425039b37SCy Schubert #endif 78525039b37SCy Schubert if(errno != 0) { 78625039b37SCy Schubert log_err("dnstap io, SSL_write syscall: %s", 78725039b37SCy Schubert strerror(errno)); 78825039b37SCy Schubert } 78925039b37SCy Schubert return -1; 79025039b37SCy Schubert } 791*103ba509SCy Schubert log_crypto_err_io("dnstap io, could not SSL_write", want); 79225039b37SCy Schubert return -1; 79325039b37SCy Schubert } 79425039b37SCy Schubert return r; 79525039b37SCy Schubert } 79625039b37SCy Schubert #endif /* HAVE_SSL */ 79725039b37SCy Schubert 79825039b37SCy Schubert /** write buffer to output. 
79925039b37SCy Schubert * returns number of bytes written, 0 if nothing happened, 80025039b37SCy Schubert * try again later, or -1 if the channel is to be closed. */ 80125039b37SCy Schubert static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf, 80225039b37SCy Schubert size_t len) 80325039b37SCy Schubert { 80425039b37SCy Schubert ssize_t ret; 80525039b37SCy Schubert if(dtio->fd == -1) 80625039b37SCy Schubert return -1; 80725039b37SCy Schubert #ifdef HAVE_SSL 80825039b37SCy Schubert if(dtio->ssl) 80925039b37SCy Schubert return dtio_write_ssl(dtio, buf, len); 81025039b37SCy Schubert #endif 81125039b37SCy Schubert ret = send(dtio->fd, (void*)buf, len, 0); 81225039b37SCy Schubert if(ret == -1) { 81325039b37SCy Schubert #ifndef USE_WINSOCK 81425039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 81525039b37SCy Schubert return 0; 81625039b37SCy Schubert #else 81725039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 81825039b37SCy Schubert return 0; 81925039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) { 82025039b37SCy Schubert ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 82125039b37SCy Schubert dtio->stop_flush_event:dtio->event), 82225039b37SCy Schubert UB_EV_WRITE); 82325039b37SCy Schubert return 0; 82425039b37SCy Schubert } 82525039b37SCy Schubert #endif 826c0caa2e2SCy Schubert log_err("dnstap io: failed send: %s", sock_strerror(errno)); 82725039b37SCy Schubert return -1; 82825039b37SCy Schubert } 82925039b37SCy Schubert return ret; 83025039b37SCy Schubert } 83125039b37SCy Schubert 83225039b37SCy Schubert #ifdef HAVE_WRITEV 83325039b37SCy Schubert /** write with writev, len and message, in one write, if possible. 
83425039b37SCy Schubert * return true if message is done, false if incomplete */ 83525039b37SCy Schubert static int dtio_write_with_writev(struct dt_io_thread* dtio) 83625039b37SCy Schubert { 83725039b37SCy Schubert uint32_t sendlen = htonl(dtio->cur_msg_len); 83825039b37SCy Schubert struct iovec iov[2]; 83925039b37SCy Schubert ssize_t r; 84025039b37SCy Schubert iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done; 84125039b37SCy Schubert iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done; 84225039b37SCy Schubert iov[1].iov_base = dtio->cur_msg; 84325039b37SCy Schubert iov[1].iov_len = dtio->cur_msg_len; 84425039b37SCy Schubert log_assert(iov[0].iov_len > 0); 84525039b37SCy Schubert r = writev(dtio->fd, iov, 2); 84625039b37SCy Schubert if(r == -1) { 84725039b37SCy Schubert #ifndef USE_WINSOCK 84825039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 84925039b37SCy Schubert return 0; 85025039b37SCy Schubert #else 85125039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 85225039b37SCy Schubert return 0; 85325039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) { 85425039b37SCy Schubert ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 
85525039b37SCy Schubert dtio->stop_flush_event:dtio->event), 85625039b37SCy Schubert UB_EV_WRITE); 85725039b37SCy Schubert return 0; 85825039b37SCy Schubert } 85925039b37SCy Schubert #endif 860c0caa2e2SCy Schubert log_err("dnstap io: failed writev: %s", sock_strerror(errno)); 86125039b37SCy Schubert /* close the channel */ 86225039b37SCy Schubert dtio_del_output_event(dtio); 86325039b37SCy Schubert dtio_close_output(dtio); 86425039b37SCy Schubert return 0; 86525039b37SCy Schubert } 86625039b37SCy Schubert /* written r bytes */ 86725039b37SCy Schubert dtio->cur_msg_len_done += r; 86825039b37SCy Schubert if(dtio->cur_msg_len_done < 4) 86925039b37SCy Schubert return 0; 87025039b37SCy Schubert if(dtio->cur_msg_len_done > 4) { 87125039b37SCy Schubert dtio->cur_msg_done = dtio->cur_msg_len_done-4; 87225039b37SCy Schubert dtio->cur_msg_len_done = 4; 87325039b37SCy Schubert } 87425039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) 87525039b37SCy Schubert return 0; 87625039b37SCy Schubert return 1; 87725039b37SCy Schubert } 87825039b37SCy Schubert #endif /* HAVE_WRITEV */ 87925039b37SCy Schubert 88025039b37SCy Schubert /** write more of the length, preceding the data frame. 88125039b37SCy Schubert * return true if message is done, false if incomplete. 
*/ 88225039b37SCy Schubert static int dtio_write_more_of_len(struct dt_io_thread* dtio) 88325039b37SCy Schubert { 88425039b37SCy Schubert uint32_t sendlen; 88525039b37SCy Schubert int r; 88625039b37SCy Schubert if(dtio->cur_msg_len_done >= 4) 88725039b37SCy Schubert return 1; 88825039b37SCy Schubert #ifdef HAVE_WRITEV 88925039b37SCy Schubert if(!dtio->ssl) { 89025039b37SCy Schubert /* we try writev for everything.*/ 89125039b37SCy Schubert return dtio_write_with_writev(dtio); 89225039b37SCy Schubert } 89325039b37SCy Schubert #endif /* HAVE_WRITEV */ 89425039b37SCy Schubert sendlen = htonl(dtio->cur_msg_len); 89525039b37SCy Schubert r = dtio_write_buf(dtio, 89625039b37SCy Schubert ((uint8_t*)&sendlen)+dtio->cur_msg_len_done, 89725039b37SCy Schubert sizeof(sendlen)-dtio->cur_msg_len_done); 89825039b37SCy Schubert if(r == -1) { 89925039b37SCy Schubert /* close the channel */ 90025039b37SCy Schubert dtio_del_output_event(dtio); 90125039b37SCy Schubert dtio_close_output(dtio); 90225039b37SCy Schubert return 0; 90325039b37SCy Schubert } else if(r == 0) { 90425039b37SCy Schubert /* try again later */ 90525039b37SCy Schubert return 0; 90625039b37SCy Schubert } 90725039b37SCy Schubert dtio->cur_msg_len_done += r; 90825039b37SCy Schubert if(dtio->cur_msg_len_done < 4) 90925039b37SCy Schubert return 0; 91025039b37SCy Schubert return 1; 91125039b37SCy Schubert } 91225039b37SCy Schubert 91325039b37SCy Schubert /** write more of the data frame. 91425039b37SCy Schubert * return true if message is done, false if incomplete. 
*/ 91525039b37SCy Schubert static int dtio_write_more_of_data(struct dt_io_thread* dtio) 91625039b37SCy Schubert { 91725039b37SCy Schubert int r; 91825039b37SCy Schubert if(dtio->cur_msg_done >= dtio->cur_msg_len) 91925039b37SCy Schubert return 1; 92025039b37SCy Schubert r = dtio_write_buf(dtio, 92125039b37SCy Schubert ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done, 92225039b37SCy Schubert dtio->cur_msg_len - dtio->cur_msg_done); 92325039b37SCy Schubert if(r == -1) { 92425039b37SCy Schubert /* close the channel */ 92525039b37SCy Schubert dtio_del_output_event(dtio); 92625039b37SCy Schubert dtio_close_output(dtio); 92725039b37SCy Schubert return 0; 92825039b37SCy Schubert } else if(r == 0) { 92925039b37SCy Schubert /* try again later */ 93025039b37SCy Schubert return 0; 93125039b37SCy Schubert } 93225039b37SCy Schubert dtio->cur_msg_done += r; 93325039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) 93425039b37SCy Schubert return 0; 93525039b37SCy Schubert return 1; 93625039b37SCy Schubert } 93725039b37SCy Schubert 93824e36522SCy Schubert /** write more of the current message. false if incomplete, true if 93925039b37SCy Schubert * the message is done */ 94025039b37SCy Schubert static int dtio_write_more(struct dt_io_thread* dtio) 94125039b37SCy Schubert { 94225039b37SCy Schubert if(dtio->cur_msg_len_done < 4) { 94325039b37SCy Schubert if(!dtio_write_more_of_len(dtio)) 94425039b37SCy Schubert return 0; 94525039b37SCy Schubert } 94625039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) { 94725039b37SCy Schubert if(!dtio_write_more_of_data(dtio)) 94825039b37SCy Schubert return 0; 94925039b37SCy Schubert } 95025039b37SCy Schubert return 1; 95125039b37SCy Schubert } 95225039b37SCy Schubert 95325039b37SCy Schubert /** Receive bytes from dtio->fd, store in buffer. 
Returns 0: closed, 95425039b37SCy Schubert * -1: continue, >0: number of bytes read into buffer */ 95525039b37SCy Schubert static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) { 95625039b37SCy Schubert ssize_t r; 957865f46b2SCy Schubert r = recv(dtio->fd, (void*)buf, len, MSG_DONTWAIT); 95825039b37SCy Schubert if(r == -1) { 95925039b37SCy Schubert char* to = dtio->socket_path; 96025039b37SCy Schubert if(!to) to = dtio->ip_str; 96125039b37SCy Schubert if(!to) to = ""; 96225039b37SCy Schubert #ifndef USE_WINSOCK 96325039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 96425039b37SCy Schubert return -1; /* try later */ 96525039b37SCy Schubert #else 96625039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) { 96725039b37SCy Schubert return -1; /* try later */ 96825039b37SCy Schubert } else if(WSAGetLastError() == WSAEWOULDBLOCK) { 96925039b37SCy Schubert ub_winsock_tcp_wouldblock( 97025039b37SCy Schubert (dtio->stop_flush_event? 97125039b37SCy Schubert dtio->stop_flush_event:dtio->event), 97225039b37SCy Schubert UB_EV_READ); 97325039b37SCy Schubert return -1; /* try later */ 97425039b37SCy Schubert } 97525039b37SCy Schubert #endif 97625039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 97725039b37SCy Schubert verbosity < 4) 97825039b37SCy Schubert return 0; /* no log retries on low verbosity */ 97925039b37SCy Schubert log_err("dnstap io: output closed, recv %s: %s", to, 98025039b37SCy Schubert strerror(errno)); 98125039b37SCy Schubert /* and close below */ 98225039b37SCy Schubert return 0; 98325039b37SCy Schubert } 98425039b37SCy Schubert if(r == 0) { 98525039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 98625039b37SCy Schubert verbosity < 4) 98725039b37SCy Schubert return 0; /* no log retries on low verbosity */ 98825039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: output closed by the other side"); 98925039b37SCy Schubert /* and close below */ 99025039b37SCy Schubert return 
0; 99125039b37SCy Schubert } 99225039b37SCy Schubert /* something was received */ 99325039b37SCy Schubert return r; 99425039b37SCy Schubert } 99525039b37SCy Schubert 99625039b37SCy Schubert #ifdef HAVE_SSL 99725039b37SCy Schubert /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed, 99825039b37SCy Schubert * -1: continue, >0: number of bytes read into buffer */ 99925039b37SCy Schubert static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len) 100025039b37SCy Schubert { 100125039b37SCy Schubert int r; 100225039b37SCy Schubert ERR_clear_error(); 100325039b37SCy Schubert r = SSL_read(dtio->ssl, buf, len); 100425039b37SCy Schubert if(r <= 0) { 100525039b37SCy Schubert int want = SSL_get_error(dtio->ssl, r); 100625039b37SCy Schubert if(want == SSL_ERROR_ZERO_RETURN) { 100725039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 100825039b37SCy Schubert verbosity < 4) 100925039b37SCy Schubert return 0; /* no log retries on low verbosity */ 101025039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: output closed by the " 101125039b37SCy Schubert "other side"); 101225039b37SCy Schubert return 0; 101325039b37SCy Schubert } else if(want == SSL_ERROR_WANT_READ) { 101425039b37SCy Schubert /* continue later */ 101525039b37SCy Schubert return -1; 101625039b37SCy Schubert } else if(want == SSL_ERROR_WANT_WRITE) { 101725039b37SCy Schubert (void)dtio_enable_brief_write(dtio); 101825039b37SCy Schubert return -1; 101925039b37SCy Schubert } else if(want == SSL_ERROR_SYSCALL) { 102025039b37SCy Schubert #ifdef ECONNRESET 102125039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 102225039b37SCy Schubert errno == ECONNRESET && verbosity < 4) 102325039b37SCy Schubert return 0; /* silence reset by peer */ 102425039b37SCy Schubert #endif 102525039b37SCy Schubert if(errno != 0) 102625039b37SCy Schubert log_err("SSL_read syscall: %s", 102725039b37SCy Schubert strerror(errno)); 102825039b37SCy Schubert 
verbose(VERB_DETAIL, "dnstap io: output closed by the " 102925039b37SCy Schubert "other side"); 103025039b37SCy Schubert return 0; 103125039b37SCy Schubert } 1032*103ba509SCy Schubert log_crypto_err_io("could not SSL_read", want); 103325039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: output closed by the " 103425039b37SCy Schubert "other side"); 103525039b37SCy Schubert return 0; 103625039b37SCy Schubert } 103725039b37SCy Schubert return r; 103825039b37SCy Schubert } 103925039b37SCy Schubert #endif /* HAVE_SSL */ 104025039b37SCy Schubert 104125039b37SCy Schubert /** check if the output fd has been closed, 104225039b37SCy Schubert * it returns false if the stream is closed. */ 104325039b37SCy Schubert static int dtio_check_close(struct dt_io_thread* dtio) 104425039b37SCy Schubert { 104525039b37SCy Schubert /* we don't want to read any packets, but if there are we can 104625039b37SCy Schubert * discard the input (ignore it). Ignore of unknown (control) 104725039b37SCy Schubert * packets is okay for the framestream protocol. And also, the 104825039b37SCy Schubert * read call can return that the stream has been closed by the 104925039b37SCy Schubert * other side. 
*/ 105025039b37SCy Schubert uint8_t buf[1024]; 105125039b37SCy Schubert int r = -1; 105225039b37SCy Schubert 105325039b37SCy Schubert 105425039b37SCy Schubert if(dtio->fd == -1) return 0; 105525039b37SCy Schubert 105625039b37SCy Schubert while(r != 0) { 105725039b37SCy Schubert /* not interested in buffer content, overwrite */ 105825039b37SCy Schubert r = receive_bytes(dtio, (void*)buf, sizeof(buf)); 105925039b37SCy Schubert if(r == -1) 106025039b37SCy Schubert return 1; 106125039b37SCy Schubert } 106225039b37SCy Schubert /* the other end has been closed */ 106325039b37SCy Schubert /* close the channel */ 106425039b37SCy Schubert dtio_del_output_event(dtio); 106525039b37SCy Schubert dtio_close_output(dtio); 106625039b37SCy Schubert return 0; 106725039b37SCy Schubert } 106825039b37SCy Schubert 106925039b37SCy Schubert /** Read accept frame. Returns -1: continue reading, 0: closed, 107025039b37SCy Schubert * 1: valid accept received. */ 107125039b37SCy Schubert static int dtio_read_accept_frame(struct dt_io_thread* dtio) 107225039b37SCy Schubert { 107325039b37SCy Schubert int r; 107425039b37SCy Schubert size_t read_frame_done; 107525039b37SCy Schubert while(dtio->read_frame.frame_len_done < 4) { 107625039b37SCy Schubert #ifdef HAVE_SSL 107725039b37SCy Schubert if(dtio->ssl) { 107825039b37SCy Schubert r = ssl_read_bytes(dtio, 107925039b37SCy Schubert (uint8_t*)&dtio->read_frame.frame_len+ 108025039b37SCy Schubert dtio->read_frame.frame_len_done, 108125039b37SCy Schubert 4-dtio->read_frame.frame_len_done); 108225039b37SCy Schubert } else { 108325039b37SCy Schubert #endif 108425039b37SCy Schubert r = receive_bytes(dtio, 108525039b37SCy Schubert (uint8_t*)&dtio->read_frame.frame_len+ 108625039b37SCy Schubert dtio->read_frame.frame_len_done, 108725039b37SCy Schubert 4-dtio->read_frame.frame_len_done); 108825039b37SCy Schubert #ifdef HAVE_SSL 108925039b37SCy Schubert } 109025039b37SCy Schubert #endif 109125039b37SCy Schubert if(r == -1) 109225039b37SCy Schubert return -1; 
/* continue reading */ 109325039b37SCy Schubert if(r == 0) { 109425039b37SCy Schubert /* connection closed */ 109525039b37SCy Schubert goto close_connection; 109625039b37SCy Schubert } 109725039b37SCy Schubert dtio->read_frame.frame_len_done += r; 109825039b37SCy Schubert if(dtio->read_frame.frame_len_done < 4) 109925039b37SCy Schubert return -1; /* continue reading */ 110025039b37SCy Schubert 110125039b37SCy Schubert if(dtio->read_frame.frame_len == 0) { 110225039b37SCy Schubert dtio->read_frame.frame_len_done = 0; 110325039b37SCy Schubert dtio->read_frame.control_frame = 1; 110425039b37SCy Schubert continue; 110525039b37SCy Schubert } 110625039b37SCy Schubert dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len); 110725039b37SCy Schubert if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) { 110825039b37SCy Schubert verbose(VERB_OPS, "dnstap: received frame exceeds max " 110925039b37SCy Schubert "length of %d bytes, closing connection", 111025039b37SCy Schubert DTIO_RECV_FRAME_MAX_LEN); 111125039b37SCy Schubert goto close_connection; 111225039b37SCy Schubert } 111325039b37SCy Schubert dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len); 111425039b37SCy Schubert dtio->read_frame.buf_cap = dtio->read_frame.frame_len; 111525039b37SCy Schubert if(!dtio->read_frame.buf) { 111625039b37SCy Schubert log_err("dnstap io: out of memory (creating read " 111725039b37SCy Schubert "buffer)"); 111825039b37SCy Schubert goto close_connection; 111925039b37SCy Schubert } 112025039b37SCy Schubert } 112125039b37SCy Schubert if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) { 112225039b37SCy Schubert #ifdef HAVE_SSL 112325039b37SCy Schubert if(dtio->ssl) { 112425039b37SCy Schubert r = ssl_read_bytes(dtio, dtio->read_frame.buf+ 112525039b37SCy Schubert dtio->read_frame.buf_count, 112625039b37SCy Schubert dtio->read_frame.buf_cap- 112725039b37SCy Schubert dtio->read_frame.buf_count); 112825039b37SCy Schubert } else { 112925039b37SCy Schubert #endif 
113025039b37SCy Schubert r = receive_bytes(dtio, dtio->read_frame.buf+ 113125039b37SCy Schubert dtio->read_frame.buf_count, 113225039b37SCy Schubert dtio->read_frame.buf_cap- 113325039b37SCy Schubert dtio->read_frame.buf_count); 113425039b37SCy Schubert #ifdef HAVE_SSL 113525039b37SCy Schubert } 113625039b37SCy Schubert #endif 113725039b37SCy Schubert if(r == -1) 113825039b37SCy Schubert return -1; /* continue reading */ 113925039b37SCy Schubert if(r == 0) { 114025039b37SCy Schubert /* connection closed */ 114125039b37SCy Schubert goto close_connection; 114225039b37SCy Schubert } 114325039b37SCy Schubert dtio->read_frame.buf_count += r; 114425039b37SCy Schubert if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) 114525039b37SCy Schubert return -1; /* continue reading */ 114625039b37SCy Schubert } 114725039b37SCy Schubert 114825039b37SCy Schubert /* Complete frame received, check if this is a valid ACCEPT control 114925039b37SCy Schubert * frame. */ 115025039b37SCy Schubert if(dtio->read_frame.frame_len < 4) { 115125039b37SCy Schubert verbose(VERB_OPS, "dnstap: invalid data received"); 115225039b37SCy Schubert goto close_connection; 115325039b37SCy Schubert } 115425039b37SCy Schubert if(sldns_read_uint32(dtio->read_frame.buf) != 115525039b37SCy Schubert FSTRM_CONTROL_FRAME_ACCEPT) { 115625039b37SCy Schubert verbose(VERB_ALGO, "dnstap: invalid control type received, " 115725039b37SCy Schubert "ignored"); 115825039b37SCy Schubert dtio->ready_frame_sent = 0; 115925039b37SCy Schubert dtio->accept_frame_received = 0; 116025039b37SCy Schubert dtio_read_frame_free(&dtio->read_frame); 116125039b37SCy Schubert return -1; 116225039b37SCy Schubert } 116325039b37SCy Schubert read_frame_done = 4; /* control frame type */ 116425039b37SCy Schubert 116525039b37SCy Schubert /* Iterate over control fields, ignore unknown types. 116625039b37SCy Schubert * Need to be able to read at least 8 bytes (control field type + 116725039b37SCy Schubert * length). 
*/ 116825039b37SCy Schubert while(read_frame_done+8 < dtio->read_frame.frame_len) { 116925039b37SCy Schubert uint32_t type = sldns_read_uint32(dtio->read_frame.buf + 117025039b37SCy Schubert read_frame_done); 117125039b37SCy Schubert uint32_t len = sldns_read_uint32(dtio->read_frame.buf + 117225039b37SCy Schubert read_frame_done + 4); 117325039b37SCy Schubert if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) { 117425039b37SCy Schubert if(len == strlen(DNSTAP_CONTENT_TYPE) && 117525039b37SCy Schubert read_frame_done+8+len <= 117625039b37SCy Schubert dtio->read_frame.frame_len && 117725039b37SCy Schubert memcmp(dtio->read_frame.buf + read_frame_done + 117825039b37SCy Schubert + 8, DNSTAP_CONTENT_TYPE, len) == 0) { 117925039b37SCy Schubert if(!dtio_control_start_send(dtio)) { 118025039b37SCy Schubert verbose(VERB_OPS, "dnstap io: out of " 118125039b37SCy Schubert "memory while sending START frame"); 118225039b37SCy Schubert goto close_connection; 118325039b37SCy Schubert } 118425039b37SCy Schubert dtio->accept_frame_received = 1; 1185c0caa2e2SCy Schubert if(!dtio_add_output_event_write(dtio)) 1186c0caa2e2SCy Schubert goto close_connection; 118725039b37SCy Schubert return 1; 118825039b37SCy Schubert } else { 118924e36522SCy Schubert /* unknown content type */ 119025039b37SCy Schubert verbose(VERB_ALGO, "dnstap: ACCEPT frame " 119125039b37SCy Schubert "contains unknown content type, " 119225039b37SCy Schubert "closing connection"); 119325039b37SCy Schubert goto close_connection; 119425039b37SCy Schubert } 119525039b37SCy Schubert } 119625039b37SCy Schubert /* unknown option, try next */ 119725039b37SCy Schubert read_frame_done += 8+len; 119825039b37SCy Schubert } 119925039b37SCy Schubert 120025039b37SCy Schubert 120125039b37SCy Schubert close_connection: 120225039b37SCy Schubert dtio_del_output_event(dtio); 120325039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 120425039b37SCy Schubert dtio_close_output(dtio); 120525039b37SCy Schubert return 0; 
120625039b37SCy Schubert } 120725039b37SCy Schubert 120825039b37SCy Schubert /** add the output file descriptor event for listening, read only */ 120925039b37SCy Schubert static int dtio_add_output_event_read(struct dt_io_thread* dtio) 121025039b37SCy Schubert { 121125039b37SCy Schubert if(!dtio->event) 121225039b37SCy Schubert return 0; 121325039b37SCy Schubert if(dtio->event_added && !dtio->event_added_is_write) 121425039b37SCy Schubert return 1; 121525039b37SCy Schubert /* we have to (re-)register the event */ 121625039b37SCy Schubert if(dtio->event_added) 121725039b37SCy Schubert ub_event_del(dtio->event); 121825039b37SCy Schubert ub_event_del_bits(dtio->event, UB_EV_WRITE); 121925039b37SCy Schubert if(ub_event_add(dtio->event, NULL) != 0) { 122025039b37SCy Schubert log_err("dnstap io: out of memory (adding event)"); 122125039b37SCy Schubert dtio->event_added = 0; 122225039b37SCy Schubert dtio->event_added_is_write = 0; 122325039b37SCy Schubert /* close output and start reattempts to open it */ 122425039b37SCy Schubert dtio_close_output(dtio); 122525039b37SCy Schubert return 0; 122625039b37SCy Schubert } 122725039b37SCy Schubert dtio->event_added = 1; 122825039b37SCy Schubert dtio->event_added_is_write = 0; 122925039b37SCy Schubert return 1; 123025039b37SCy Schubert } 123125039b37SCy Schubert 123225039b37SCy Schubert /** add the output file descriptor event for listening, read and write */ 123325039b37SCy Schubert static int dtio_add_output_event_write(struct dt_io_thread* dtio) 123425039b37SCy Schubert { 123525039b37SCy Schubert if(!dtio->event) 123625039b37SCy Schubert return 0; 123725039b37SCy Schubert if(dtio->event_added && dtio->event_added_is_write) 123825039b37SCy Schubert return 1; 123925039b37SCy Schubert /* we have to (re-)register the event */ 124025039b37SCy Schubert if(dtio->event_added) 124125039b37SCy Schubert ub_event_del(dtio->event); 124225039b37SCy Schubert ub_event_add_bits(dtio->event, UB_EV_WRITE); 124325039b37SCy Schubert 
if(ub_event_add(dtio->event, NULL) != 0) { 124425039b37SCy Schubert log_err("dnstap io: out of memory (adding event)"); 124525039b37SCy Schubert dtio->event_added = 0; 124625039b37SCy Schubert dtio->event_added_is_write = 0; 124725039b37SCy Schubert /* close output and start reattempts to open it */ 124825039b37SCy Schubert dtio_close_output(dtio); 124925039b37SCy Schubert return 0; 125025039b37SCy Schubert } 125125039b37SCy Schubert dtio->event_added = 1; 125225039b37SCy Schubert dtio->event_added_is_write = 1; 125325039b37SCy Schubert return 1; 125425039b37SCy Schubert } 125525039b37SCy Schubert 125625039b37SCy Schubert /** put the dtio thread to sleep */ 125725039b37SCy Schubert static void dtio_sleep(struct dt_io_thread* dtio) 125825039b37SCy Schubert { 125925039b37SCy Schubert /* unregister the event polling for write, because there is 126025039b37SCy Schubert * nothing to be written */ 126125039b37SCy Schubert (void)dtio_add_output_event_read(dtio); 126225039b37SCy Schubert } 126325039b37SCy Schubert 126425039b37SCy Schubert #ifdef HAVE_SSL 126525039b37SCy Schubert /** enable the brief read condition */ 126625039b37SCy Schubert static int dtio_enable_brief_read(struct dt_io_thread* dtio) 126725039b37SCy Schubert { 126825039b37SCy Schubert dtio->ssl_brief_read = 1; 126925039b37SCy Schubert if(dtio->stop_flush_event) { 127025039b37SCy Schubert ub_event_del(dtio->stop_flush_event); 127125039b37SCy Schubert ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE); 127225039b37SCy Schubert if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 127325039b37SCy Schubert log_err("dnstap io, stop flush, could not ub_event_add"); 127425039b37SCy Schubert return 0; 127525039b37SCy Schubert } 127625039b37SCy Schubert return 1; 127725039b37SCy Schubert } 127825039b37SCy Schubert return dtio_add_output_event_read(dtio); 127925039b37SCy Schubert } 128025039b37SCy Schubert #endif /* HAVE_SSL */ 128125039b37SCy Schubert 128225039b37SCy Schubert #ifdef HAVE_SSL 128325039b37SCy 
Schubert /** disable the brief read condition */ 128425039b37SCy Schubert static int dtio_disable_brief_read(struct dt_io_thread* dtio) 128525039b37SCy Schubert { 128625039b37SCy Schubert dtio->ssl_brief_read = 0; 128725039b37SCy Schubert if(dtio->stop_flush_event) { 128825039b37SCy Schubert ub_event_del(dtio->stop_flush_event); 128925039b37SCy Schubert ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE); 129025039b37SCy Schubert if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 129125039b37SCy Schubert log_err("dnstap io, stop flush, could not ub_event_add"); 129225039b37SCy Schubert return 0; 129325039b37SCy Schubert } 129425039b37SCy Schubert return 1; 129525039b37SCy Schubert } 129625039b37SCy Schubert return dtio_add_output_event_write(dtio); 129725039b37SCy Schubert } 129825039b37SCy Schubert #endif /* HAVE_SSL */ 129925039b37SCy Schubert 130025039b37SCy Schubert #ifdef HAVE_SSL 130125039b37SCy Schubert /** enable the brief write condition */ 130225039b37SCy Schubert static int dtio_enable_brief_write(struct dt_io_thread* dtio) 130325039b37SCy Schubert { 130425039b37SCy Schubert dtio->ssl_brief_write = 1; 130525039b37SCy Schubert return dtio_add_output_event_write(dtio); 130625039b37SCy Schubert } 130725039b37SCy Schubert #endif /* HAVE_SSL */ 130825039b37SCy Schubert 130925039b37SCy Schubert #ifdef HAVE_SSL 131025039b37SCy Schubert /** disable the brief write condition */ 131125039b37SCy Schubert static int dtio_disable_brief_write(struct dt_io_thread* dtio) 131225039b37SCy Schubert { 131325039b37SCy Schubert dtio->ssl_brief_write = 0; 131425039b37SCy Schubert return dtio_add_output_event_read(dtio); 131525039b37SCy Schubert } 131625039b37SCy Schubert #endif /* HAVE_SSL */ 131725039b37SCy Schubert 131825039b37SCy Schubert #ifdef HAVE_SSL 131925039b37SCy Schubert /** check peer verification after ssl handshake connection, false if closed*/ 132025039b37SCy Schubert static int dtio_ssl_check_peer(struct dt_io_thread* dtio) 132125039b37SCy Schubert { 
132225039b37SCy Schubert if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) { 132325039b37SCy Schubert /* verification */ 132425039b37SCy Schubert if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) { 132525039b37SCy Schubert X509* x = SSL_get_peer_certificate(dtio->ssl); 132625039b37SCy Schubert if(!x) { 132725039b37SCy Schubert verbose(VERB_ALGO, "dnstap io, %s, SSL " 132825039b37SCy Schubert "connection failed no certificate", 132925039b37SCy Schubert dtio->ip_str); 133025039b37SCy Schubert return 0; 133125039b37SCy Schubert } 133225039b37SCy Schubert log_cert(VERB_ALGO, "dnstap io, peer certificate", 133325039b37SCy Schubert x); 133425039b37SCy Schubert #ifdef HAVE_SSL_GET0_PEERNAME 133525039b37SCy Schubert if(SSL_get0_peername(dtio->ssl)) { 133625039b37SCy Schubert verbose(VERB_ALGO, "dnstap io, %s, SSL " 133725039b37SCy Schubert "connection to %s authenticated", 133825039b37SCy Schubert dtio->ip_str, 133925039b37SCy Schubert SSL_get0_peername(dtio->ssl)); 134025039b37SCy Schubert } else { 134125039b37SCy Schubert #endif 134225039b37SCy Schubert verbose(VERB_ALGO, "dnstap io, %s, SSL " 134325039b37SCy Schubert "connection authenticated", 134425039b37SCy Schubert dtio->ip_str); 134525039b37SCy Schubert #ifdef HAVE_SSL_GET0_PEERNAME 134625039b37SCy Schubert } 134725039b37SCy Schubert #endif 134825039b37SCy Schubert X509_free(x); 134925039b37SCy Schubert } else { 135025039b37SCy Schubert X509* x = SSL_get_peer_certificate(dtio->ssl); 135125039b37SCy Schubert if(x) { 135225039b37SCy Schubert log_cert(VERB_ALGO, "dnstap io, peer " 135325039b37SCy Schubert "certificate", x); 135425039b37SCy Schubert X509_free(x); 135525039b37SCy Schubert } 135625039b37SCy Schubert verbose(VERB_ALGO, "dnstap io, %s, SSL connection " 135725039b37SCy Schubert "failed: failed to authenticate", 135825039b37SCy Schubert dtio->ip_str); 135925039b37SCy Schubert return 0; 136025039b37SCy Schubert } 136125039b37SCy Schubert } else { 136225039b37SCy Schubert /* unauthenticated, the verify peer 
flag was not set 136325039b37SCy Schubert * in ssl when the ssl object was created from ssl_ctx */ 136425039b37SCy Schubert verbose(VERB_ALGO, "dnstap io, %s, SSL connection", 136525039b37SCy Schubert dtio->ip_str); 136625039b37SCy Schubert } 136725039b37SCy Schubert return 1; 136825039b37SCy Schubert } 136925039b37SCy Schubert #endif /* HAVE_SSL */ 137025039b37SCy Schubert 137125039b37SCy Schubert #ifdef HAVE_SSL 137225039b37SCy Schubert /** perform ssl handshake, returns 1 if okay, 0 to stop */ 137325039b37SCy Schubert static int dtio_ssl_handshake(struct dt_io_thread* dtio, 137425039b37SCy Schubert struct stop_flush_info* info) 137525039b37SCy Schubert { 137625039b37SCy Schubert int r; 137725039b37SCy Schubert if(dtio->ssl_brief_read) { 137825039b37SCy Schubert /* assume the brief read condition is satisfied, 137925039b37SCy Schubert * if we need more or again, we can set it again */ 138025039b37SCy Schubert if(!dtio_disable_brief_read(dtio)) { 138125039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 138225039b37SCy Schubert return 0; 138325039b37SCy Schubert } 138425039b37SCy Schubert } 138525039b37SCy Schubert if(dtio->ssl_handshake_done) 138625039b37SCy Schubert return 1; 138725039b37SCy Schubert 138825039b37SCy Schubert ERR_clear_error(); 138925039b37SCy Schubert r = SSL_do_handshake(dtio->ssl); 139025039b37SCy Schubert if(r != 1) { 139125039b37SCy Schubert int want = SSL_get_error(dtio->ssl, r); 139225039b37SCy Schubert if(want == SSL_ERROR_WANT_READ) { 139325039b37SCy Schubert /* we want to read on the connection */ 139425039b37SCy Schubert if(!dtio_enable_brief_read(dtio)) { 139525039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 139625039b37SCy Schubert return 0; 139725039b37SCy Schubert } 139825039b37SCy Schubert return 0; 139925039b37SCy Schubert } else if(want == SSL_ERROR_WANT_WRITE) { 140025039b37SCy Schubert /* we want to write on the connection */ 140125039b37SCy Schubert return 0; 140225039b37SCy Schubert } else if(r == 0) { 
140325039b37SCy Schubert /* closed */ 140425039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 140525039b37SCy Schubert dtio_del_output_event(dtio); 140625039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 140725039b37SCy Schubert dtio_close_output(dtio); 140825039b37SCy Schubert return 0; 140925039b37SCy Schubert } else if(want == SSL_ERROR_SYSCALL) { 141025039b37SCy Schubert /* SYSCALL and errno==0 means closed uncleanly */ 141125039b37SCy Schubert int silent = 0; 141225039b37SCy Schubert #ifdef EPIPE 141325039b37SCy Schubert if(errno == EPIPE && verbosity < 2) 141425039b37SCy Schubert silent = 1; /* silence 'broken pipe' */ 141525039b37SCy Schubert #endif 141625039b37SCy Schubert #ifdef ECONNRESET 141725039b37SCy Schubert if(errno == ECONNRESET && verbosity < 2) 141825039b37SCy Schubert silent = 1; /* silence reset by peer */ 141925039b37SCy Schubert #endif 142025039b37SCy Schubert if(errno == 0) 142125039b37SCy Schubert silent = 1; 142225039b37SCy Schubert if(!silent) 142325039b37SCy Schubert log_err("dnstap io, SSL_handshake syscall: %s", 142425039b37SCy Schubert strerror(errno)); 142525039b37SCy Schubert /* closed */ 142625039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 142725039b37SCy Schubert dtio_del_output_event(dtio); 142825039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 142925039b37SCy Schubert dtio_close_output(dtio); 143025039b37SCy Schubert return 0; 143125039b37SCy Schubert } else { 143225039b37SCy Schubert unsigned long err = ERR_get_error(); 143325039b37SCy Schubert if(!squelch_err_ssl_handshake(err)) { 1434*103ba509SCy Schubert log_crypto_err_io_code("dnstap io, ssl handshake failed", 1435*103ba509SCy Schubert want, err); 143625039b37SCy Schubert verbose(VERB_OPS, "dnstap io, ssl handshake failed " 143725039b37SCy Schubert "from %s", dtio->ip_str); 143825039b37SCy Schubert } 143925039b37SCy Schubert /* closed */ 144025039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 
144125039b37SCy Schubert dtio_del_output_event(dtio); 144225039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 144325039b37SCy Schubert dtio_close_output(dtio); 144425039b37SCy Schubert return 0; 144525039b37SCy Schubert } 144625039b37SCy Schubert 144725039b37SCy Schubert } 144825039b37SCy Schubert /* check peer verification */ 144925039b37SCy Schubert dtio->ssl_handshake_done = 1; 145025039b37SCy Schubert 145125039b37SCy Schubert if(!dtio_ssl_check_peer(dtio)) { 145225039b37SCy Schubert /* closed */ 145325039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 145425039b37SCy Schubert dtio_del_output_event(dtio); 145525039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 145625039b37SCy Schubert dtio_close_output(dtio); 145725039b37SCy Schubert return 0; 145825039b37SCy Schubert } 145925039b37SCy Schubert return 1; 146025039b37SCy Schubert } 146125039b37SCy Schubert #endif /* HAVE_SSL */ 146225039b37SCy Schubert 146325039b37SCy Schubert /** callback for the dnstap events, to write to the output */ 146425039b37SCy Schubert void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg) 146525039b37SCy Schubert { 146625039b37SCy Schubert struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 146725039b37SCy Schubert int i; 146825039b37SCy Schubert 146925039b37SCy Schubert if(dtio->check_nb_connect) { 147025039b37SCy Schubert int connect_err = dtio_check_nb_connect(dtio); 147125039b37SCy Schubert if(connect_err == -1) { 147225039b37SCy Schubert /* close the channel */ 147325039b37SCy Schubert dtio_del_output_event(dtio); 147425039b37SCy Schubert dtio_close_output(dtio); 147525039b37SCy Schubert return; 147625039b37SCy Schubert } else if(connect_err == 0) { 147725039b37SCy Schubert /* try again later */ 147825039b37SCy Schubert return; 147925039b37SCy Schubert } 148025039b37SCy Schubert /* nonblocking connect check passed, continue */ 148125039b37SCy Schubert } 148225039b37SCy Schubert 148325039b37SCy Schubert #ifdef 
HAVE_SSL 148425039b37SCy Schubert if(dtio->ssl && 148525039b37SCy Schubert (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 148625039b37SCy Schubert if(!dtio_ssl_handshake(dtio, NULL)) 148725039b37SCy Schubert return; 148825039b37SCy Schubert } 148925039b37SCy Schubert #endif 149025039b37SCy Schubert 149125039b37SCy Schubert if((bits&UB_EV_READ || dtio->ssl_brief_write)) { 149225039b37SCy Schubert if(dtio->ssl_brief_write) 149325039b37SCy Schubert (void)dtio_disable_brief_write(dtio); 149425039b37SCy Schubert if(dtio->ready_frame_sent && !dtio->accept_frame_received) { 149525039b37SCy Schubert if(dtio_read_accept_frame(dtio) <= 0) 149625039b37SCy Schubert return; 149725039b37SCy Schubert } else if(!dtio_check_close(dtio)) 149825039b37SCy Schubert return; 149925039b37SCy Schubert } 150025039b37SCy Schubert 150125039b37SCy Schubert /* loop to process a number of messages. This improves throughput, 150225039b37SCy Schubert * because selecting on write-event if not needed for busy messages 150325039b37SCy Schubert * (dnstap log) generation and if they need to all be written back. 150425039b37SCy Schubert * The write event is usually not blocked up. But not forever, 150525039b37SCy Schubert * because the event loop needs to stay responsive for other events. 150625039b37SCy Schubert * If there are no (more) messages, or if the output buffers get 150725039b37SCy Schubert * full, it returns out of the loop. 
*/ 150825039b37SCy Schubert for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) { 150925039b37SCy Schubert /* see if there are messages that need writing */ 151025039b37SCy Schubert if(!dtio->cur_msg) { 151125039b37SCy Schubert if(!dtio_find_msg(dtio)) { 151225039b37SCy Schubert if(i == 0) { 151325039b37SCy Schubert /* no messages on the first iteration, 151425039b37SCy Schubert * the queues are all empty */ 151525039b37SCy Schubert dtio_sleep(dtio); 151625039b37SCy Schubert } 151725039b37SCy Schubert return; /* nothing to do */ 151825039b37SCy Schubert } 151925039b37SCy Schubert } 152025039b37SCy Schubert 152125039b37SCy Schubert /* write it */ 152225039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) { 152325039b37SCy Schubert if(!dtio_write_more(dtio)) 152425039b37SCy Schubert return; 152525039b37SCy Schubert } 152625039b37SCy Schubert 152725039b37SCy Schubert /* done with the current message */ 152825039b37SCy Schubert dtio_cur_msg_free(dtio); 152925039b37SCy Schubert 153025039b37SCy Schubert /* If this is a bidirectional stream the first message will be 153125039b37SCy Schubert * the READY control frame. We can only continue writing after 153225039b37SCy Schubert * receiving an ACCEPT control frame. 
*/ 153325039b37SCy Schubert if(dtio->is_bidirectional && !dtio->ready_frame_sent) { 153425039b37SCy Schubert dtio->ready_frame_sent = 1; 153525039b37SCy Schubert (void)dtio_add_output_event_read(dtio); 153625039b37SCy Schubert break; 153725039b37SCy Schubert } 153825039b37SCy Schubert } 153925039b37SCy Schubert } 154025039b37SCy Schubert 154125039b37SCy Schubert /** callback for the dnstap commandpipe, to stop the dnstap IO */ 154225039b37SCy Schubert void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) 154325039b37SCy Schubert { 154425039b37SCy Schubert struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 154525039b37SCy Schubert uint8_t cmd; 154625039b37SCy Schubert ssize_t r; 154725039b37SCy Schubert if(dtio->want_to_exit) 154825039b37SCy Schubert return; 154925039b37SCy Schubert r = read(fd, &cmd, sizeof(cmd)); 155025039b37SCy Schubert if(r == -1) { 155125039b37SCy Schubert #ifndef USE_WINSOCK 155225039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 155325039b37SCy Schubert return; /* ignore this */ 155425039b37SCy Schubert #else 155525039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 155625039b37SCy Schubert return; 155725039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) 155825039b37SCy Schubert return; 155925039b37SCy Schubert #endif 1560c0caa2e2SCy Schubert log_err("dnstap io: failed to read: %s", sock_strerror(errno)); 156125039b37SCy Schubert /* and then fall through to quit the thread */ 156225039b37SCy Schubert } else if(r == 0) { 156325039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel closed"); 156425039b37SCy Schubert } else if(r == 1 && cmd == DTIO_COMMAND_STOP) { 156525039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); 156625039b37SCy Schubert } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) { 156725039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup"); 156825039b37SCy Schubert 156925039b37SCy Schubert if(dtio->is_bidirectional && !dtio->accept_frame_received) { 
157025039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, " 157125039b37SCy Schubert "waiting for ACCEPT control frame"); 157225039b37SCy Schubert return; 157325039b37SCy Schubert } 157425039b37SCy Schubert 157525039b37SCy Schubert /* reregister event */ 157625039b37SCy Schubert if(!dtio_add_output_event_write(dtio)) 157725039b37SCy Schubert return; 157825039b37SCy Schubert return; 157925039b37SCy Schubert } else if(r == 1) { 158025039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel unknown command"); 158125039b37SCy Schubert } 158225039b37SCy Schubert dtio->want_to_exit = 1; 158325039b37SCy Schubert if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base) 158425039b37SCy Schubert != 0) { 158525039b37SCy Schubert log_err("dnstap io: could not loopexit"); 158625039b37SCy Schubert } 158725039b37SCy Schubert } 158825039b37SCy Schubert 158925039b37SCy Schubert #ifndef THREADS_DISABLED 159025039b37SCy Schubert /** setup the event base for the dnstap io thread */ 159125039b37SCy Schubert static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs, 159225039b37SCy Schubert struct timeval* now) 159325039b37SCy Schubert { 159425039b37SCy Schubert memset(now, 0, sizeof(*now)); 159525039b37SCy Schubert dtio->event_base = ub_default_event_base(0, secs, now); 159625039b37SCy Schubert if(!dtio->event_base) { 159725039b37SCy Schubert fatal_exit("dnstap io: could not create event_base"); 159825039b37SCy Schubert } 159925039b37SCy Schubert } 160025039b37SCy Schubert #endif /* THREADS_DISABLED */ 160125039b37SCy Schubert 160225039b37SCy Schubert /** setup the cmd event for dnstap io */ 160325039b37SCy Schubert static void dtio_setup_cmd(struct dt_io_thread* dtio) 160425039b37SCy Schubert { 160525039b37SCy Schubert struct ub_event* cmdev; 160625039b37SCy Schubert fd_set_nonblock(dtio->commandpipe[0]); 160725039b37SCy Schubert cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0], 160825039b37SCy Schubert UB_EV_READ | UB_EV_PERSIST, 
&dtio_cmd_cb, dtio); 160925039b37SCy Schubert if(!cmdev) { 161025039b37SCy Schubert fatal_exit("dnstap io: out of memory"); 161125039b37SCy Schubert } 161225039b37SCy Schubert dtio->command_event = cmdev; 161325039b37SCy Schubert if(ub_event_add(cmdev, NULL) != 0) { 161425039b37SCy Schubert fatal_exit("dnstap io: out of memory (adding event)"); 161525039b37SCy Schubert } 161625039b37SCy Schubert } 161725039b37SCy Schubert 161825039b37SCy Schubert /** setup the reconnect event for dnstap io */ 161925039b37SCy Schubert static void dtio_setup_reconnect(struct dt_io_thread* dtio) 162025039b37SCy Schubert { 162125039b37SCy Schubert dtio_reconnect_clear(dtio); 162225039b37SCy Schubert dtio->reconnect_timer = ub_event_new(dtio->event_base, -1, 162325039b37SCy Schubert UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio); 162425039b37SCy Schubert if(!dtio->reconnect_timer) { 162525039b37SCy Schubert fatal_exit("dnstap io: out of memory"); 162625039b37SCy Schubert } 162725039b37SCy Schubert } 162825039b37SCy Schubert 162925039b37SCy Schubert /** 163025039b37SCy Schubert * structure to keep track of information during stop flush 163125039b37SCy Schubert */ 163225039b37SCy Schubert struct stop_flush_info { 163325039b37SCy Schubert /** the event base during stop flush */ 163425039b37SCy Schubert struct ub_event_base* base; 163525039b37SCy Schubert /** did we already want to exit this stop-flush event base */ 163625039b37SCy Schubert int want_to_exit_flush; 163725039b37SCy Schubert /** has the timer fired */ 163825039b37SCy Schubert int timer_done; 163925039b37SCy Schubert /** the dtio */ 164025039b37SCy Schubert struct dt_io_thread* dtio; 164125039b37SCy Schubert /** the stop control frame */ 164225039b37SCy Schubert void* stop_frame; 164325039b37SCy Schubert /** length of the stop frame */ 164425039b37SCy Schubert size_t stop_frame_len; 164525039b37SCy Schubert /** how much we have done of the stop frame */ 164625039b37SCy Schubert size_t stop_frame_done; 164725039b37SCy Schubert 
}; 164825039b37SCy Schubert 164925039b37SCy Schubert /** exit the stop flush base */ 165025039b37SCy Schubert static void dtio_stop_flush_exit(struct stop_flush_info* info) 165125039b37SCy Schubert { 165225039b37SCy Schubert if(info->want_to_exit_flush) 165325039b37SCy Schubert return; 165425039b37SCy Schubert info->want_to_exit_flush = 1; 165525039b37SCy Schubert if(ub_event_base_loopexit(info->base) != 0) { 165625039b37SCy Schubert log_err("dnstap io: could not loopexit"); 165725039b37SCy Schubert } 165825039b37SCy Schubert } 165925039b37SCy Schubert 166025039b37SCy Schubert /** send the stop control, 166125039b37SCy Schubert * return true if completed the frame. */ 166225039b37SCy Schubert static int dtio_control_stop_send(struct stop_flush_info* info) 166325039b37SCy Schubert { 166425039b37SCy Schubert struct dt_io_thread* dtio = info->dtio; 166525039b37SCy Schubert int r; 166625039b37SCy Schubert if(info->stop_frame_done >= info->stop_frame_len) 166725039b37SCy Schubert return 1; 166825039b37SCy Schubert r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) + 166925039b37SCy Schubert info->stop_frame_done, info->stop_frame_len - 167025039b37SCy Schubert info->stop_frame_done); 167125039b37SCy Schubert if(r == -1) { 167225039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush: output closed"); 167325039b37SCy Schubert dtio_stop_flush_exit(info); 167425039b37SCy Schubert return 0; 167525039b37SCy Schubert } 167625039b37SCy Schubert if(r == 0) { 167725039b37SCy Schubert /* try again later, or timeout */ 167825039b37SCy Schubert return 0; 167925039b37SCy Schubert } 168025039b37SCy Schubert info->stop_frame_done += r; 168125039b37SCy Schubert if(info->stop_frame_done < info->stop_frame_len) 168225039b37SCy Schubert return 0; /* not done yet */ 168325039b37SCy Schubert return 1; 168425039b37SCy Schubert } 168525039b37SCy Schubert 168625039b37SCy Schubert void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), 168725039b37SCy Schubert void* arg) 
168825039b37SCy Schubert { 168925039b37SCy Schubert struct stop_flush_info* info = (struct stop_flush_info*)arg; 169025039b37SCy Schubert if(info->want_to_exit_flush) 169125039b37SCy Schubert return; 169225039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush"); 169325039b37SCy Schubert info->timer_done = 1; 169425039b37SCy Schubert dtio_stop_flush_exit(info); 169525039b37SCy Schubert } 169625039b37SCy Schubert 169725039b37SCy Schubert void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg) 169825039b37SCy Schubert { 169925039b37SCy Schubert struct stop_flush_info* info = (struct stop_flush_info*)arg; 170025039b37SCy Schubert struct dt_io_thread* dtio = info->dtio; 170125039b37SCy Schubert if(info->want_to_exit_flush) 170225039b37SCy Schubert return; 170325039b37SCy Schubert if(dtio->check_nb_connect) { 170425039b37SCy Schubert /* we don't start the stop_flush if connect still 170525039b37SCy Schubert * in progress, but the check code is here, just in case */ 170625039b37SCy Schubert int connect_err = dtio_check_nb_connect(dtio); 170725039b37SCy Schubert if(connect_err == -1) { 170825039b37SCy Schubert /* close the channel, exit the stop flush */ 170925039b37SCy Schubert dtio_stop_flush_exit(info); 171025039b37SCy Schubert dtio_del_output_event(dtio); 171125039b37SCy Schubert dtio_close_output(dtio); 171225039b37SCy Schubert return; 171325039b37SCy Schubert } else if(connect_err == 0) { 171425039b37SCy Schubert /* try again later */ 171525039b37SCy Schubert return; 171625039b37SCy Schubert } 171725039b37SCy Schubert /* nonblocking connect check passed, continue */ 171825039b37SCy Schubert } 171925039b37SCy Schubert #ifdef HAVE_SSL 172025039b37SCy Schubert if(dtio->ssl && 172125039b37SCy Schubert (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 172225039b37SCy Schubert if(!dtio_ssl_handshake(dtio, info)) 172325039b37SCy Schubert return; 172425039b37SCy Schubert } 172525039b37SCy Schubert #endif 172625039b37SCy Schubert 
172725039b37SCy Schubert if((bits&UB_EV_READ)) { 172825039b37SCy Schubert if(!dtio_check_close(dtio)) { 172925039b37SCy Schubert if(dtio->fd == -1) { 173025039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: " 173125039b37SCy Schubert "stop flush: output closed"); 173225039b37SCy Schubert dtio_stop_flush_exit(info); 173325039b37SCy Schubert } 173425039b37SCy Schubert return; 173525039b37SCy Schubert } 173625039b37SCy Schubert } 173725039b37SCy Schubert /* write remainder of last frame */ 173825039b37SCy Schubert if(dtio->cur_msg) { 173925039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) { 174025039b37SCy Schubert if(!dtio_write_more(dtio)) { 174125039b37SCy Schubert if(dtio->fd == -1) { 174225039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: " 174325039b37SCy Schubert "stop flush: output closed"); 174425039b37SCy Schubert dtio_stop_flush_exit(info); 174525039b37SCy Schubert } 174625039b37SCy Schubert return; 174725039b37SCy Schubert } 174825039b37SCy Schubert } 174925039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush completed " 175025039b37SCy Schubert "last frame"); 175125039b37SCy Schubert dtio_cur_msg_free(dtio); 175225039b37SCy Schubert } 175325039b37SCy Schubert /* write stop frame */ 175425039b37SCy Schubert if(info->stop_frame_done < info->stop_frame_len) { 175525039b37SCy Schubert if(!dtio_control_stop_send(info)) 175625039b37SCy Schubert return; 175725039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush completed " 175825039b37SCy Schubert "stop control frame"); 175925039b37SCy Schubert } 176025039b37SCy Schubert /* when last frame and stop frame are sent, exit */ 176125039b37SCy Schubert dtio_stop_flush_exit(info); 176225039b37SCy Schubert } 176325039b37SCy Schubert 176425039b37SCy Schubert /** flush at end, last packet and stop control */ 176525039b37SCy Schubert static void dtio_control_stop_flush(struct dt_io_thread* dtio) 176625039b37SCy Schubert { 176725039b37SCy Schubert /* briefly attempt to flush the previous packet to 
the output, 176825039b37SCy Schubert * this could be a partial packet, or even the start control frame */ 176925039b37SCy Schubert time_t secs = 0; 177025039b37SCy Schubert struct timeval now; 177125039b37SCy Schubert struct stop_flush_info info; 177225039b37SCy Schubert struct timeval tv; 177325039b37SCy Schubert struct ub_event* timer, *stopev; 177425039b37SCy Schubert 177525039b37SCy Schubert if(dtio->fd == -1 || dtio->check_nb_connect) { 177625039b37SCy Schubert /* no connection or we have just connected, so nothing is 177725039b37SCy Schubert * sent yet, so nothing to stop or flush */ 177825039b37SCy Schubert return; 177925039b37SCy Schubert } 178025039b37SCy Schubert if(dtio->ssl && !dtio->ssl_handshake_done) { 178125039b37SCy Schubert /* no SSL connection has been established yet */ 178225039b37SCy Schubert return; 178325039b37SCy Schubert } 178425039b37SCy Schubert 178525039b37SCy Schubert memset(&info, 0, sizeof(info)); 178625039b37SCy Schubert memset(&now, 0, sizeof(now)); 178725039b37SCy Schubert info.dtio = dtio; 178825039b37SCy Schubert info.base = ub_default_event_base(0, &secs, &now); 178925039b37SCy Schubert if(!info.base) { 179025039b37SCy Schubert log_err("dnstap io: malloc failure"); 179125039b37SCy Schubert return; 179225039b37SCy Schubert } 179325039b37SCy Schubert timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT, 179425039b37SCy Schubert &dtio_stop_timer_cb, &info); 179525039b37SCy Schubert if(!timer) { 179625039b37SCy Schubert log_err("dnstap io: malloc failure"); 179725039b37SCy Schubert ub_event_base_free(info.base); 179825039b37SCy Schubert return; 179925039b37SCy Schubert } 180025039b37SCy Schubert memset(&tv, 0, sizeof(tv)); 180125039b37SCy Schubert tv.tv_sec = 2; 180225039b37SCy Schubert if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info, 180325039b37SCy Schubert &tv) != 0) { 180425039b37SCy Schubert log_err("dnstap io: cannot event_timer_add"); 180525039b37SCy Schubert ub_event_free(timer); 180625039b37SCy Schubert 
ub_event_base_free(info.base); 180725039b37SCy Schubert return; 180825039b37SCy Schubert } 180925039b37SCy Schubert stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ | 181025039b37SCy Schubert UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info); 181125039b37SCy Schubert if(!stopev) { 181225039b37SCy Schubert log_err("dnstap io: malloc failure"); 181325039b37SCy Schubert ub_timer_del(timer); 181425039b37SCy Schubert ub_event_free(timer); 181525039b37SCy Schubert ub_event_base_free(info.base); 181625039b37SCy Schubert return; 181725039b37SCy Schubert } 181825039b37SCy Schubert if(ub_event_add(stopev, NULL) != 0) { 181925039b37SCy Schubert log_err("dnstap io: cannot event_add"); 182025039b37SCy Schubert ub_event_free(stopev); 182125039b37SCy Schubert ub_timer_del(timer); 182225039b37SCy Schubert ub_event_free(timer); 182325039b37SCy Schubert ub_event_base_free(info.base); 182425039b37SCy Schubert return; 182525039b37SCy Schubert } 182625039b37SCy Schubert info.stop_frame = fstrm_create_control_frame_stop( 182725039b37SCy Schubert &info.stop_frame_len); 182825039b37SCy Schubert if(!info.stop_frame) { 182925039b37SCy Schubert log_err("dnstap io: malloc failure"); 183025039b37SCy Schubert ub_event_del(stopev); 183125039b37SCy Schubert ub_event_free(stopev); 183225039b37SCy Schubert ub_timer_del(timer); 183325039b37SCy Schubert ub_event_free(timer); 183425039b37SCy Schubert ub_event_base_free(info.base); 183525039b37SCy Schubert return; 183625039b37SCy Schubert } 183725039b37SCy Schubert dtio->stop_flush_event = stopev; 183825039b37SCy Schubert 183925039b37SCy Schubert /* wait briefly, or until finished */ 184025039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush started"); 184125039b37SCy Schubert if(ub_event_base_dispatch(info.base) < 0) { 184225039b37SCy Schubert log_err("dnstap io: dispatch flush failed, errno is %s", 184325039b37SCy Schubert strerror(errno)); 184425039b37SCy Schubert } 184525039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush 
ended"); 184625039b37SCy Schubert free(info.stop_frame); 184725039b37SCy Schubert dtio->stop_flush_event = NULL; 184825039b37SCy Schubert ub_event_del(stopev); 184925039b37SCy Schubert ub_event_free(stopev); 185025039b37SCy Schubert ub_timer_del(timer); 185125039b37SCy Schubert ub_event_free(timer); 185225039b37SCy Schubert ub_event_base_free(info.base); 185325039b37SCy Schubert } 185425039b37SCy Schubert 185525039b37SCy Schubert /** perform desetup and free stuff when the dnstap io thread exits */ 185625039b37SCy Schubert static void dtio_desetup(struct dt_io_thread* dtio) 185725039b37SCy Schubert { 185825039b37SCy Schubert dtio_control_stop_flush(dtio); 185925039b37SCy Schubert dtio_del_output_event(dtio); 186025039b37SCy Schubert dtio_close_output(dtio); 186125039b37SCy Schubert ub_event_del(dtio->command_event); 186225039b37SCy Schubert ub_event_free(dtio->command_event); 186325039b37SCy Schubert #ifndef USE_WINSOCK 186425039b37SCy Schubert close(dtio->commandpipe[0]); 186525039b37SCy Schubert #else 186625039b37SCy Schubert _close(dtio->commandpipe[0]); 186725039b37SCy Schubert #endif 186825039b37SCy Schubert dtio->commandpipe[0] = -1; 186925039b37SCy Schubert dtio_reconnect_del(dtio); 187025039b37SCy Schubert ub_event_free(dtio->reconnect_timer); 187125039b37SCy Schubert dtio_cur_msg_free(dtio); 187225039b37SCy Schubert #ifndef THREADS_DISABLED 187325039b37SCy Schubert ub_event_base_free(dtio->event_base); 187425039b37SCy Schubert #endif 187525039b37SCy Schubert } 187625039b37SCy Schubert 187725039b37SCy Schubert /** setup a start control message */ 187825039b37SCy Schubert static int dtio_control_start_send(struct dt_io_thread* dtio) 187925039b37SCy Schubert { 188025039b37SCy Schubert log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 188125039b37SCy Schubert dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE, 188225039b37SCy Schubert &dtio->cur_msg_len); 188325039b37SCy Schubert if(!dtio->cur_msg) { 188425039b37SCy Schubert return 
0; 188525039b37SCy Schubert } 188625039b37SCy Schubert /* setup to send the control message */ 188725039b37SCy Schubert /* set that the buffer needs to be sent, but the length 188825039b37SCy Schubert * of that buffer is already written, that way the buffer can 188925039b37SCy Schubert * start with 0 length and then the length of the control frame 189025039b37SCy Schubert * in it */ 189125039b37SCy Schubert dtio->cur_msg_done = 0; 189225039b37SCy Schubert dtio->cur_msg_len_done = 4; 189325039b37SCy Schubert return 1; 189425039b37SCy Schubert }

/**
 * setup a ready control message
 * Stores a newly created READY control frame in dtio->cur_msg and its
 * length in dtio->cur_msg_len, and marks it as the message to write next.
 * @param dtio: the io thread structure; must have no current message.
 * @return 0 if the control frame could not be created, 1 on success.
 */
static int dtio_control_ready_send(struct dt_io_thread* dtio)
{
	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
	dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
		&dtio->cur_msg_len);
	if(!dtio->cur_msg) {
		return 0;
	}
	/* setup to send the control message */
	/* set that the buffer needs to be sent, but the length
	 * of that buffer is already written, that way the buffer can
	 * start with 0 length and then the length of the control frame
	 * in it */
	dtio->cur_msg_done = 0;
	dtio->cur_msg_len_done = 4;
	return 1;
}

/**
 * open the output file descriptor for af_local
 * Creates an AF_LOCAL stream socket in dtio->fd, sets it nonblocking and
 * connects it to dtio->socket_path.
 * @param dtio: the io thread structure.
 * @return 1 on success, 0 on failure. When connect fails, dtio_close_fd
 *	has been called on the socket. Without HAVE_SYS_UN_H this always
 *	fails.
 */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifdef HAVE_SYS_UN_H
	struct sockaddr_un s;
	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("dnstap io: failed to create socket: %s",
			sock_strerror(errno));
		return 0;
	}
	memset(&s, 0, sizeof(s));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	/* this member exists on BSDs, not Linux */
	s.sun_len = (unsigned)sizeof(s);
#endif
	s.sun_family = AF_LOCAL;
	/* length is 92-108, 104 on FreeBSD */
	(void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
	fd_set_nonblock(dtio->fd);
	if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
		== -1) {
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4) {
			dtio_close_fd(dtio);
			return 0; /* no log retries on low verbosity */
		}
		/* errno is read before dtio_close_fd can change it */
		log_err("dnstap io: failed to connect to \"%s\": %s",
			dtio->socket_path, sock_strerror(errno));
		dtio_close_fd(dtio);
		return 0;
	}
	return 1;
#else
	log_err("cannot create af_local socket");
	return 0;
#endif /* HAVE_SYS_UN_H */
}

/**
 * open the output file descriptor for af_inet and af_inet6
 * Parses dtio->ip_str, creates a nonblocking stream socket in dtio->fd and
 * starts connecting it.
 * @param dtio: the io thread structure.
 * @return 1 if the socket is connected or the connect is still in
 *	progress, 0 on failure (the socket, if created and the connect
 *	failed, has been passed to dtio_close_fd).
 */
static int dtio_open_output_tcp(struct dt_io_thread* dtio)
{
	struct sockaddr_storage addr;
	socklen_t addrlen;
	memset(&addr, 0, sizeof(addr));
	addrlen = (socklen_t)sizeof(addr);

	if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen, UNBOUND_DNS_PORT)) {
		log_err("could not parse IP '%s'", dtio->ip_str);
		return 0;
	}
	dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("can't create socket: %s", sock_strerror(errno));
		return 0;
	}
	fd_set_nonblock(dtio->fd);
	if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
		if(errno == EINPROGRESS)
			return 1; /* wait until connect done*/
#ifdef USE_WINSOCK
		/* winsock reports a pending nonblocking connect through
		 * WSAGetLastError; test it before the quiet-retry return
		 * below, otherwise a connect that is merely in progress
		 * is closed and treated as failed during retries */
		if(WSAGetLastError() == WSAEINPROGRESS ||
			WSAGetLastError() == WSAEWOULDBLOCK)
			return 1; /* wait until connect done*/
#endif
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4) {
			dtio_close_fd(dtio);
			return 0; /* no log retries on low verbosity */
		}
#ifndef USE_WINSOCK
		if(tcp_connect_errno_needs_log(
			(struct sockaddr *)&addr, addrlen)) {
			log_err("dnstap io: failed to connect to %s: %s",
				dtio->ip_str, strerror(errno));
		}
#else
		if(tcp_connect_errno_needs_log(
			(struct sockaddr *)&addr, addrlen)) {
			log_err("dnstap io: failed to connect to %s: %s",
				dtio->ip_str, wsa_strerror(WSAGetLastError()));
		}
#endif
		dtio_close_fd(dtio);
		return 0;
	}
	return 1;
}

/**
 * setup the SSL structure for new connection
 * Creates the SSL object for dtio->fd from dtio->ssl_ctx, resets the
 * handshake state and sets the name to authenticate on it.
 * @param dtio: the io thread structure, with fd already opened.
 * @return 1 on success. 0 on failure, in which case dtio->ssl is NULL;
 *	the fd is left for the caller to close.
 */
static int dtio_setup_ssl(struct dt_io_thread* dtio)
{
	dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
	if(!dtio->ssl) return 0;
	dtio->ssl_handshake_done = 0;
	dtio->ssl_brief_read = 0;

	if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
		dtio->tls_use_sni)) {
		/* the caller only closes the fd on failure, so release the
		 * SSL object here; otherwise it stays in dtio->ssl and is
		 * overwritten by the next connection attempt */
#ifdef HAVE_SSL
		SSL_free(dtio->ssl);
		dtio->ssl = NULL;
#endif
		return 0;
	}
	return 1;
}

/**
 * open the output file descriptor
 * Opens the connection for the configured upstream type (unix socket,
 * tcp or tls), creates the read/write event for it and prepares the first
 * control message (START when not bidirectional, READY when
 * bidirectional). On any failure the partially set up state is released
 * and dtio_reconnect_enable is called.
 * @param dtio: the io thread structure.
 */
static void dtio_open_output(struct dt_io_thread* dtio)
{
	struct ub_event* ev;
	if(dtio->upstream_is_unix) {
		if(!dtio_open_output_local(dtio)) {
			dtio_reconnect_enable(dtio);
			return;
		}
	} else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
		if(!dtio_open_output_tcp(dtio)) {
			dtio_reconnect_enable(dtio);
			return;
		}
		if(dtio->upstream_is_tls) {
			if(!dtio_setup_ssl(dtio)) {
				dtio_close_fd(dtio);
				dtio_reconnect_enable(dtio);
				return;
			}
		}
	}
	/* NOTE(review): presumably makes the output callback check the
	 * result of the nonblocking connect first - confirm at its use */
	dtio->check_nb_connect = 1;

	/* the EV_READ is to read ACCEPT control messages, and catch channel
	 * close. EV_WRITE is to write packets */
	ev = ub_event_new(dtio->event_base, dtio->fd,
		UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
		dtio);
	if(!ev) {
		log_err("dnstap io: out of memory");
		if(dtio->ssl) {
#ifdef HAVE_SSL
			SSL_free(dtio->ssl);
			dtio->ssl = NULL;
#endif
		}
		dtio_close_fd(dtio);
		dtio_reconnect_enable(dtio);
		return;
	}
	dtio->event = ev;

	/* setup protocol control message to start */
	if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
		(dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
		log_err("dnstap io: out of memory");
		ub_event_free(dtio->event);
		dtio->event = NULL;
		if(dtio->ssl) {
#ifdef HAVE_SSL
			SSL_free(dtio->ssl);
			dtio->ssl = NULL;
#endif
		}
		dtio_close_fd(dtio);
		dtio_reconnect_enable(dtio);
		return;
	}
}

/** perform the setup of the writer thread on the established event_base */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
	dtio_setup_cmd(dtio);
	dtio_setup_reconnect(dtio);
	dtio_open_output(dtio);
	/* the result is not acted upon here; nothing more to set up */
	(void)dtio_add_output_event_write(dtio);
}

#ifndef THREADS_DISABLED
/**
 * the IO thread function for the DNSTAP IO
 * Sets up the event base and the output, runs the event loop until it
 * returns and then tears the setup down again.
 * @param arg: the struct dt_io_thread for this thread.
 * @return NULL.
 */
static void* dnstap_io(void* arg)
{
	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
	time_t secs = 0;
	struct timeval now;
	log_thread_set(&dtio->threadnum);

	/* setup */
	verbose(VERB_ALGO, "start dnstap io thread");
	dtio_setup_base(dtio, &secs, &now);
	dtio_setup_on_base(dtio);

	/* run */
	if(ub_event_base_dispatch(dtio->event_base) < 0) {
		log_err("dnstap io: dispatch failed, errno is %s",
			strerror(errno));
	}

	/* cleanup */
	verbose(VERB_ALGO, "stop dnstap io thread");
	dtio_desetup(dtio);
	return NULL;
}
#endif /* THREADS_DISABLED */

/**
 * Start the io thread.
 * Creates the command pipe and then either starts the thread, or, when
 * threads are disabled, performs the setup on the caller's event base.
 * @param dtio: the io thread structure.
 * @param event_base_nothr: event base that is used when threads are
 *	disabled; ignored otherwise.
 * @param numworkers: number of worker threads; the io thread gets thread
 *	number numworkers+1.
 * @return 0 if the command pipe could not be created, otherwise 1.
 */
int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
	int numworkers)
{
	/* set up the thread, can fail */
#ifndef USE_WINSOCK
	if(pipe(dtio->commandpipe) == -1) {
		log_err("failed to create pipe: %s", strerror(errno));
		return 0;
	}
#else
	if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
		/* _pipe is a C runtime call and reports its error in
		 * errno, not through WSAGetLastError */
		log_err("failed to create _pipe: %s", strerror(errno));
		return 0;
	}
#endif

	/* start the thread */
	dtio->threadnum = numworkers+1;
	dtio->started = 1;
#ifndef THREADS_DISABLED
	ub_thread_create(&dtio->tid, dnstap_io, dtio);
	(void)event_base_nothr;
#else
	dtio->event_base = event_base_nothr;
	dtio_setup_on_base(dtio);
#endif
	return 1;
}

/**
 * Stop the io thread.
 * With threads, writes the stop command to the command pipe, closes the
 * write end of the pipe and joins the thread. Without threads, closes the
 * write end of the pipe and tears down the setup directly.
 * @param dtio: the io thread structure; NULL or a structure that was not
 *	started is ignored.
 */
void dt_io_thread_stop(struct dt_io_thread* dtio)
{
#ifndef THREADS_DISABLED
	uint8_t cmd = DTIO_COMMAND_STOP;
#endif
	if(!dtio) return;
	if(!dtio->started) return;
	verbose(VERB_ALGO, "dnstap io: send stop cmd");

#ifndef THREADS_DISABLED
	/* retry the write while it is interrupted or would block; any
	 * other error is logged and the stop continues */
	while(1) {
		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
		if(r == -1) {
#ifndef USE_WINSOCK
			if(errno == EINTR || errno == EAGAIN)
				continue;
#else
			if(WSAGetLastError() == WSAEINPROGRESS)
				continue;
			if(WSAGetLastError() == WSAEWOULDBLOCK)
				continue;
#endif
			log_err("dnstap io stop: write: %s",
				sock_strerror(errno));
			break;
		}
		break;
	}
	dtio->started = 0;
#endif /* THREADS_DISABLED */

#ifndef USE_WINSOCK
	close(dtio->commandpipe[1]);
#else
	_close(dtio->commandpipe[1]);
#endif
	dtio->commandpipe[1] = -1;
#ifndef THREADS_DISABLED
	ub_thread_join(dtio->tid);
#else
	dtio->want_to_exit = 1;
	dtio_desetup(dtio);
#endif
}