125039b37SCy Schubert /* 225039b37SCy Schubert * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP 325039b37SCy Schubert * 425039b37SCy Schubert * Copyright (c) 2020, NLnet Labs. All rights reserved. 525039b37SCy Schubert * 625039b37SCy Schubert * This software is open source. 725039b37SCy Schubert * 825039b37SCy Schubert * Redistribution and use in source and binary forms, with or without 925039b37SCy Schubert * modification, are permitted provided that the following conditions 1025039b37SCy Schubert * are met: 1125039b37SCy Schubert * 1225039b37SCy Schubert * Redistributions of source code must retain the above copyright notice, 1325039b37SCy Schubert * this list of conditions and the following disclaimer. 1425039b37SCy Schubert * 1525039b37SCy Schubert * Redistributions in binary form must reproduce the above copyright notice, 1625039b37SCy Schubert * this list of conditions and the following disclaimer in the documentation 1725039b37SCy Schubert * and/or other materials provided with the distribution. 1825039b37SCy Schubert * 1925039b37SCy Schubert * Neither the name of the NLNET LABS nor the names of its contributors may 2025039b37SCy Schubert * be used to endorse or promote products derived from this software without 2125039b37SCy Schubert * specific prior written permission. 2225039b37SCy Schubert * 2325039b37SCy Schubert * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 2425039b37SCy Schubert * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 2525039b37SCy Schubert * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 2625039b37SCy Schubert * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT 2725039b37SCy Schubert * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 2825039b37SCy Schubert * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 2925039b37SCy Schubert * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 3025039b37SCy Schubert * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 3125039b37SCy Schubert * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 3225039b37SCy Schubert * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 3325039b37SCy Schubert * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 3425039b37SCy Schubert * 3525039b37SCy Schubert */ 3625039b37SCy Schubert 3725039b37SCy Schubert /** 3825039b37SCy Schubert * \file 3925039b37SCy Schubert * 4025039b37SCy Schubert * An implementation of the Frame Streams data transport protocol for 4125039b37SCy Schubert * the Unbound DNSTAP message logging facility. 
4225039b37SCy Schubert */ 4325039b37SCy Schubert 4425039b37SCy Schubert #include "config.h" 4525039b37SCy Schubert #include "dnstap/dtstream.h" 4625039b37SCy Schubert #include "dnstap/dnstap_fstrm.h" 4725039b37SCy Schubert #include "util/config_file.h" 4825039b37SCy Schubert #include "util/ub_event.h" 4925039b37SCy Schubert #include "util/net_help.h" 5025039b37SCy Schubert #include "services/outside_network.h" 5125039b37SCy Schubert #include "sldns/sbuffer.h" 5225039b37SCy Schubert #ifdef HAVE_SYS_UN_H 5325039b37SCy Schubert #include <sys/un.h> 5425039b37SCy Schubert #endif 5525039b37SCy Schubert #include <fcntl.h> 5625039b37SCy Schubert #ifdef HAVE_OPENSSL_SSL_H 5725039b37SCy Schubert #include <openssl/ssl.h> 5825039b37SCy Schubert #endif 5925039b37SCy Schubert #ifdef HAVE_OPENSSL_ERR_H 6025039b37SCy Schubert #include <openssl/err.h> 6125039b37SCy Schubert #endif 6225039b37SCy Schubert 6325039b37SCy Schubert /** number of messages to process in one output callback */ 6425039b37SCy Schubert #define DTIO_MESSAGES_PER_CALLBACK 100 6525039b37SCy Schubert /** the msec to wait for reconnect (if not immediate, the first attempt) */ 6625039b37SCy Schubert #define DTIO_RECONNECT_TIMEOUT_MIN 10 6725039b37SCy Schubert /** the msec to wait for reconnect max after backoff */ 6825039b37SCy Schubert #define DTIO_RECONNECT_TIMEOUT_MAX 1000 6925039b37SCy Schubert /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */ 7025039b37SCy Schubert #define DTIO_RECONNECT_TIMEOUT_SLOW 1000 71c0caa2e2SCy Schubert /** number of messages before wakeup of thread */ 72c0caa2e2SCy Schubert #define DTIO_MSG_FOR_WAKEUP 32 7325039b37SCy Schubert 7425039b37SCy Schubert /** maximum length of received frame */ 7525039b37SCy Schubert #define DTIO_RECV_FRAME_MAX_LEN 1000 7625039b37SCy Schubert 7725039b37SCy Schubert struct stop_flush_info; 7825039b37SCy Schubert /** DTIO command channel commands */ 7925039b37SCy Schubert enum { 8025039b37SCy Schubert /** DTIO command channel stop 
*/ 8125039b37SCy Schubert DTIO_COMMAND_STOP = 0, 8225039b37SCy Schubert /** DTIO command channel wakeup */ 8325039b37SCy Schubert DTIO_COMMAND_WAKEUP = 1 8425039b37SCy Schubert } dtio_channel_command; 8525039b37SCy Schubert 8625039b37SCy Schubert /** open the output channel */ 8725039b37SCy Schubert static void dtio_open_output(struct dt_io_thread* dtio); 8825039b37SCy Schubert /** add output event for read and write */ 8925039b37SCy Schubert static int dtio_add_output_event_write(struct dt_io_thread* dtio); 9025039b37SCy Schubert /** start reconnection attempts */ 9125039b37SCy Schubert static void dtio_reconnect_enable(struct dt_io_thread* dtio); 9225039b37SCy Schubert /** stop from stop_flush event loop */ 9325039b37SCy Schubert static void dtio_stop_flush_exit(struct stop_flush_info* info); 9425039b37SCy Schubert /** setup a start control message */ 9525039b37SCy Schubert static int dtio_control_start_send(struct dt_io_thread* dtio); 9625039b37SCy Schubert #ifdef HAVE_SSL 9725039b37SCy Schubert /** enable briefly waiting for a read event, for SSL negotiation */ 9825039b37SCy Schubert static int dtio_enable_brief_read(struct dt_io_thread* dtio); 9925039b37SCy Schubert /** enable briefly waiting for a write event, for SSL negotiation */ 10025039b37SCy Schubert static int dtio_enable_brief_write(struct dt_io_thread* dtio); 10125039b37SCy Schubert #endif 10225039b37SCy Schubert 10325039b37SCy Schubert struct dt_msg_queue* 104c0caa2e2SCy Schubert dt_msg_queue_create(struct comm_base* base) 10525039b37SCy Schubert { 10625039b37SCy Schubert struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); 10725039b37SCy Schubert if(!mq) return NULL; 10825039b37SCy Schubert mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, 10925039b37SCy Schubert about 1 M should contain 64K messages with some overhead, 11025039b37SCy Schubert or a whole bunch smaller ones */ 111c0caa2e2SCy Schubert mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq); 112c0caa2e2SCy Schubert 
if(!mq->wakeup_timer) { 113c0caa2e2SCy Schubert free(mq); 114c0caa2e2SCy Schubert return NULL; 115c0caa2e2SCy Schubert } 11625039b37SCy Schubert lock_basic_init(&mq->lock); 11725039b37SCy Schubert lock_protect(&mq->lock, mq, sizeof(*mq)); 11825039b37SCy Schubert return mq; 11925039b37SCy Schubert } 12025039b37SCy Schubert 12125039b37SCy Schubert /** clear the message list, caller must hold the lock */ 12225039b37SCy Schubert static void 12325039b37SCy Schubert dt_msg_queue_clear(struct dt_msg_queue* mq) 12425039b37SCy Schubert { 12525039b37SCy Schubert struct dt_msg_entry* e = mq->first, *next=NULL; 12625039b37SCy Schubert while(e) { 12725039b37SCy Schubert next = e->next; 12825039b37SCy Schubert free(e->buf); 12925039b37SCy Schubert free(e); 13025039b37SCy Schubert e = next; 13125039b37SCy Schubert } 13225039b37SCy Schubert mq->first = NULL; 13325039b37SCy Schubert mq->last = NULL; 13425039b37SCy Schubert mq->cursize = 0; 135c0caa2e2SCy Schubert mq->msgcount = 0; 13625039b37SCy Schubert } 13725039b37SCy Schubert 13825039b37SCy Schubert void 13925039b37SCy Schubert dt_msg_queue_delete(struct dt_msg_queue* mq) 14025039b37SCy Schubert { 14125039b37SCy Schubert if(!mq) return; 14225039b37SCy Schubert lock_basic_destroy(&mq->lock); 14325039b37SCy Schubert dt_msg_queue_clear(mq); 144c0caa2e2SCy Schubert comm_timer_delete(mq->wakeup_timer); 14525039b37SCy Schubert free(mq); 14625039b37SCy Schubert } 14725039b37SCy Schubert 14825039b37SCy Schubert /** make the dtio wake up by sending a wakeup command */ 14925039b37SCy Schubert static void dtio_wakeup(struct dt_io_thread* dtio) 15025039b37SCy Schubert { 15125039b37SCy Schubert uint8_t cmd = DTIO_COMMAND_WAKEUP; 15225039b37SCy Schubert if(!dtio) return; 15325039b37SCy Schubert if(!dtio->started) return; 15425039b37SCy Schubert 15525039b37SCy Schubert while(1) { 15625039b37SCy Schubert ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 15725039b37SCy Schubert if(r == -1) { 15825039b37SCy Schubert #ifndef USE_WINSOCK 
15925039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 16025039b37SCy Schubert continue; 16125039b37SCy Schubert #else 16225039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 16325039b37SCy Schubert continue; 16425039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) 16525039b37SCy Schubert continue; 16625039b37SCy Schubert #endif 167c0caa2e2SCy Schubert log_err("dnstap io wakeup: write: %s", 168c0caa2e2SCy Schubert sock_strerror(errno)); 16925039b37SCy Schubert break; 17025039b37SCy Schubert } 17125039b37SCy Schubert break; 17225039b37SCy Schubert } 17325039b37SCy Schubert } 17425039b37SCy Schubert 17525039b37SCy Schubert void 176c0caa2e2SCy Schubert mq_wakeup_cb(void* arg) 177c0caa2e2SCy Schubert { 178c0caa2e2SCy Schubert struct dt_msg_queue* mq = (struct dt_msg_queue*)arg; 179c0caa2e2SCy Schubert /* even if the dtio is already active, because perhaps much 180c0caa2e2SCy Schubert * traffic suddenly, we leave the timer running to save on 181c0caa2e2SCy Schubert * managing it, the once a second timer is less work then 182c0caa2e2SCy Schubert * starting and stopping the timer frequently */ 183c0caa2e2SCy Schubert lock_basic_lock(&mq->dtio->wakeup_timer_lock); 184c0caa2e2SCy Schubert mq->dtio->wakeup_timer_enabled = 0; 185c0caa2e2SCy Schubert lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 186c0caa2e2SCy Schubert dtio_wakeup(mq->dtio); 187c0caa2e2SCy Schubert } 188c0caa2e2SCy Schubert 189c0caa2e2SCy Schubert /** start timer to wakeup dtio because there is content in the queue */ 190c0caa2e2SCy Schubert static void 191c0caa2e2SCy Schubert dt_msg_queue_start_timer(struct dt_msg_queue* mq) 192c0caa2e2SCy Schubert { 193c0caa2e2SCy Schubert struct timeval tv; 194c0caa2e2SCy Schubert /* Start a timer to process messages to be logged. 195c0caa2e2SCy Schubert * If we woke up the dtio thread for every message, the wakeup 196c0caa2e2SCy Schubert * messages take up too much processing power. 
If the queue 197c0caa2e2SCy Schubert * fills up the wakeup happens immediately. The timer wakes it up 198c0caa2e2SCy Schubert * if there are infrequent messages to log. */ 199c0caa2e2SCy Schubert 200c0caa2e2SCy Schubert /* we cannot start a timer in dtio thread, because it is a different 201c0caa2e2SCy Schubert * thread and its event base is in use by the other thread, it would 202c0caa2e2SCy Schubert * give race conditions if we tried to modify its event base, 203c0caa2e2SCy Schubert * and locks would wait until it woke up, and this is what we do. */ 204c0caa2e2SCy Schubert 205c0caa2e2SCy Schubert /* do not start the timer if a timer already exists, perhaps 206c0caa2e2SCy Schubert * in another worker. So this variable is protected by a lock in 207c0caa2e2SCy Schubert * dtio */ 208c0caa2e2SCy Schubert lock_basic_lock(&mq->dtio->wakeup_timer_lock); 209c0caa2e2SCy Schubert if(mq->dtio->wakeup_timer_enabled) { 210c0caa2e2SCy Schubert lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 211c0caa2e2SCy Schubert return; 212c0caa2e2SCy Schubert } 213c0caa2e2SCy Schubert mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */ 214c0caa2e2SCy Schubert lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 215c0caa2e2SCy Schubert 216c0caa2e2SCy Schubert /* start the timer, in mq, in the event base of our worker */ 217c0caa2e2SCy Schubert tv.tv_sec = 1; 218c0caa2e2SCy Schubert tv.tv_usec = 0; 219c0caa2e2SCy Schubert comm_timer_set(mq->wakeup_timer, &tv); 220c0caa2e2SCy Schubert } 221c0caa2e2SCy Schubert 222c0caa2e2SCy Schubert void 22325039b37SCy Schubert dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) 22425039b37SCy Schubert { 225c0caa2e2SCy Schubert int wakeupnow = 0, wakeupstarttimer = 0; 22625039b37SCy Schubert struct dt_msg_entry* entry; 22725039b37SCy Schubert 22825039b37SCy Schubert /* check conditions */ 22925039b37SCy Schubert if(!buf) return; 23025039b37SCy Schubert if(len == 0) { 23125039b37SCy Schubert /* it is not possible to log entries with 
zero length, 23225039b37SCy Schubert * because the framestream protocol does not carry it. 23325039b37SCy Schubert * However the protobuf serialization does not create zero 23425039b37SCy Schubert * length datagrams for dnstap, so this should not happen. */ 23525039b37SCy Schubert free(buf); 23625039b37SCy Schubert return; 23725039b37SCy Schubert } 23825039b37SCy Schubert if(!mq) { 23925039b37SCy Schubert free(buf); 24025039b37SCy Schubert return; 24125039b37SCy Schubert } 24225039b37SCy Schubert 24325039b37SCy Schubert /* allocate memory for queue entry */ 24425039b37SCy Schubert entry = malloc(sizeof(*entry)); 24525039b37SCy Schubert if(!entry) { 24625039b37SCy Schubert log_err("out of memory logging dnstap"); 24725039b37SCy Schubert free(buf); 24825039b37SCy Schubert return; 24925039b37SCy Schubert } 25025039b37SCy Schubert entry->next = NULL; 25125039b37SCy Schubert entry->buf = buf; 25225039b37SCy Schubert entry->len = len; 25325039b37SCy Schubert 25425039b37SCy Schubert /* aqcuire lock */ 25525039b37SCy Schubert lock_basic_lock(&mq->lock); 256c0caa2e2SCy Schubert /* if list was empty, start timer for (eventual) wakeup */ 25725039b37SCy Schubert if(mq->first == NULL) 258c0caa2e2SCy Schubert wakeupstarttimer = 1; 259c0caa2e2SCy Schubert /* if list contains more than wakeupnum elements, wakeup now, 260c0caa2e2SCy Schubert * or if list is (going to be) almost full */ 261c0caa2e2SCy Schubert if(mq->msgcount == DTIO_MSG_FOR_WAKEUP || 262c0caa2e2SCy Schubert (mq->cursize < mq->maxsize * 9 / 10 && 263c0caa2e2SCy Schubert mq->cursize+len >= mq->maxsize * 9 / 10)) 264c0caa2e2SCy Schubert wakeupnow = 1; 26525039b37SCy Schubert /* see if it is going to fit */ 26625039b37SCy Schubert if(mq->cursize + len > mq->maxsize) { 26725039b37SCy Schubert /* buffer full, or congested. 
*/ 26825039b37SCy Schubert /* drop */ 26925039b37SCy Schubert lock_basic_unlock(&mq->lock); 27025039b37SCy Schubert free(buf); 27125039b37SCy Schubert free(entry); 27225039b37SCy Schubert return; 27325039b37SCy Schubert } 27425039b37SCy Schubert mq->cursize += len; 275c0caa2e2SCy Schubert mq->msgcount ++; 27625039b37SCy Schubert /* append to list */ 27725039b37SCy Schubert if(mq->last) { 27825039b37SCy Schubert mq->last->next = entry; 27925039b37SCy Schubert } else { 28025039b37SCy Schubert mq->first = entry; 28125039b37SCy Schubert } 28225039b37SCy Schubert mq->last = entry; 28325039b37SCy Schubert /* release lock */ 28425039b37SCy Schubert lock_basic_unlock(&mq->lock); 28525039b37SCy Schubert 286c0caa2e2SCy Schubert if(wakeupnow) { 28725039b37SCy Schubert dtio_wakeup(mq->dtio); 288c0caa2e2SCy Schubert } else if(wakeupstarttimer) { 289c0caa2e2SCy Schubert dt_msg_queue_start_timer(mq); 290c0caa2e2SCy Schubert } 29125039b37SCy Schubert } 29225039b37SCy Schubert 29325039b37SCy Schubert struct dt_io_thread* dt_io_thread_create(void) 29425039b37SCy Schubert { 29525039b37SCy Schubert struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); 296c0caa2e2SCy Schubert lock_basic_init(&dtio->wakeup_timer_lock); 297c0caa2e2SCy Schubert lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled, 298c0caa2e2SCy Schubert sizeof(dtio->wakeup_timer_enabled)); 29925039b37SCy Schubert return dtio; 30025039b37SCy Schubert } 30125039b37SCy Schubert 30225039b37SCy Schubert void dt_io_thread_delete(struct dt_io_thread* dtio) 30325039b37SCy Schubert { 30425039b37SCy Schubert struct dt_io_list_item* item, *nextitem; 30525039b37SCy Schubert if(!dtio) return; 306c0caa2e2SCy Schubert lock_basic_destroy(&dtio->wakeup_timer_lock); 30725039b37SCy Schubert item=dtio->io_list; 30825039b37SCy Schubert while(item) { 30925039b37SCy Schubert nextitem = item->next; 31025039b37SCy Schubert free(item); 31125039b37SCy Schubert item = nextitem; 31225039b37SCy Schubert } 31325039b37SCy Schubert 
free(dtio->socket_path); 31425039b37SCy Schubert free(dtio->ip_str); 31525039b37SCy Schubert free(dtio->tls_server_name); 31625039b37SCy Schubert free(dtio->client_key_file); 31725039b37SCy Schubert free(dtio->client_cert_file); 31825039b37SCy Schubert if(dtio->ssl_ctx) { 31925039b37SCy Schubert #ifdef HAVE_SSL 32025039b37SCy Schubert SSL_CTX_free(dtio->ssl_ctx); 32125039b37SCy Schubert #endif 32225039b37SCy Schubert } 32325039b37SCy Schubert free(dtio); 32425039b37SCy Schubert } 32525039b37SCy Schubert 32625039b37SCy Schubert int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg) 32725039b37SCy Schubert { 32825039b37SCy Schubert if(!cfg->dnstap) { 32925039b37SCy Schubert log_warn("cannot setup dnstap because dnstap-enable is no"); 33025039b37SCy Schubert return 0; 33125039b37SCy Schubert } 33225039b37SCy Schubert 33325039b37SCy Schubert /* what type of connectivity do we have */ 33425039b37SCy Schubert if(cfg->dnstap_ip && cfg->dnstap_ip[0]) { 33525039b37SCy Schubert if(cfg->dnstap_tls) 33625039b37SCy Schubert dtio->upstream_is_tls = 1; 33725039b37SCy Schubert else dtio->upstream_is_tcp = 1; 33825039b37SCy Schubert } else { 33925039b37SCy Schubert dtio->upstream_is_unix = 1; 34025039b37SCy Schubert } 34125039b37SCy Schubert dtio->is_bidirectional = cfg->dnstap_bidirectional; 34225039b37SCy Schubert 34325039b37SCy Schubert if(dtio->upstream_is_unix) { 344*369c6923SCy Schubert char* nm; 34525039b37SCy Schubert if(!cfg->dnstap_socket_path || 34625039b37SCy Schubert cfg->dnstap_socket_path[0]==0) { 34725039b37SCy Schubert log_err("dnstap setup: no dnstap-socket-path for " 34825039b37SCy Schubert "socket connect"); 34925039b37SCy Schubert return 0; 35025039b37SCy Schubert } 351*369c6923SCy Schubert nm = cfg->dnstap_socket_path; 352*369c6923SCy Schubert if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm, 353*369c6923SCy Schubert cfg->chrootdir, strlen(cfg->chrootdir)) == 0) 354*369c6923SCy Schubert nm += strlen(cfg->chrootdir); 35525039b37SCy 
Schubert free(dtio->socket_path); 356*369c6923SCy Schubert dtio->socket_path = strdup(nm); 35725039b37SCy Schubert if(!dtio->socket_path) { 35825039b37SCy Schubert log_err("dnstap setup: malloc failure"); 35925039b37SCy Schubert return 0; 36025039b37SCy Schubert } 36125039b37SCy Schubert } 36225039b37SCy Schubert 36325039b37SCy Schubert if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 36425039b37SCy Schubert if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) { 36525039b37SCy Schubert log_err("dnstap setup: no dnstap-ip for TCP connect"); 36625039b37SCy Schubert return 0; 36725039b37SCy Schubert } 36825039b37SCy Schubert free(dtio->ip_str); 36925039b37SCy Schubert dtio->ip_str = strdup(cfg->dnstap_ip); 37025039b37SCy Schubert if(!dtio->ip_str) { 37125039b37SCy Schubert log_err("dnstap setup: malloc failure"); 37225039b37SCy Schubert return 0; 37325039b37SCy Schubert } 37425039b37SCy Schubert } 37525039b37SCy Schubert 37625039b37SCy Schubert if(dtio->upstream_is_tls) { 37725039b37SCy Schubert #ifdef HAVE_SSL 37825039b37SCy Schubert if(cfg->dnstap_tls_server_name && 37925039b37SCy Schubert cfg->dnstap_tls_server_name[0]) { 38025039b37SCy Schubert free(dtio->tls_server_name); 38125039b37SCy Schubert dtio->tls_server_name = strdup( 38225039b37SCy Schubert cfg->dnstap_tls_server_name); 38325039b37SCy Schubert if(!dtio->tls_server_name) { 38425039b37SCy Schubert log_err("dnstap setup: malloc failure"); 38525039b37SCy Schubert return 0; 38625039b37SCy Schubert } 38725039b37SCy Schubert if(!check_auth_name_for_ssl(dtio->tls_server_name)) 38825039b37SCy Schubert return 0; 38925039b37SCy Schubert } 39025039b37SCy Schubert if(cfg->dnstap_tls_client_key_file && 39125039b37SCy Schubert cfg->dnstap_tls_client_key_file[0]) { 39225039b37SCy Schubert dtio->use_client_certs = 1; 39325039b37SCy Schubert free(dtio->client_key_file); 39425039b37SCy Schubert dtio->client_key_file = strdup( 39525039b37SCy Schubert cfg->dnstap_tls_client_key_file); 39625039b37SCy Schubert 
if(!dtio->client_key_file) { 39725039b37SCy Schubert log_err("dnstap setup: malloc failure"); 39825039b37SCy Schubert return 0; 39925039b37SCy Schubert } 40025039b37SCy Schubert if(!cfg->dnstap_tls_client_cert_file || 40125039b37SCy Schubert cfg->dnstap_tls_client_cert_file[0]==0) { 40225039b37SCy Schubert log_err("dnstap setup: client key " 40325039b37SCy Schubert "authentication enabled with " 40425039b37SCy Schubert "dnstap-tls-client-key-file, but " 40525039b37SCy Schubert "no dnstap-tls-client-cert-file " 40625039b37SCy Schubert "is given"); 40725039b37SCy Schubert return 0; 40825039b37SCy Schubert } 40925039b37SCy Schubert free(dtio->client_cert_file); 41025039b37SCy Schubert dtio->client_cert_file = strdup( 41125039b37SCy Schubert cfg->dnstap_tls_client_cert_file); 41225039b37SCy Schubert if(!dtio->client_cert_file) { 41325039b37SCy Schubert log_err("dnstap setup: malloc failure"); 41425039b37SCy Schubert return 0; 41525039b37SCy Schubert } 41625039b37SCy Schubert } else { 41725039b37SCy Schubert dtio->use_client_certs = 0; 41825039b37SCy Schubert dtio->client_key_file = NULL; 41925039b37SCy Schubert dtio->client_cert_file = NULL; 42025039b37SCy Schubert } 42125039b37SCy Schubert 42225039b37SCy Schubert if(cfg->dnstap_tls_cert_bundle) { 42325039b37SCy Schubert dtio->ssl_ctx = connect_sslctx_create( 42425039b37SCy Schubert dtio->client_key_file, 42525039b37SCy Schubert dtio->client_cert_file, 42625039b37SCy Schubert cfg->dnstap_tls_cert_bundle, 0); 42725039b37SCy Schubert } else { 42825039b37SCy Schubert dtio->ssl_ctx = connect_sslctx_create( 42925039b37SCy Schubert dtio->client_key_file, 43025039b37SCy Schubert dtio->client_cert_file, 43125039b37SCy Schubert cfg->tls_cert_bundle, cfg->tls_win_cert); 43225039b37SCy Schubert } 43325039b37SCy Schubert if(!dtio->ssl_ctx) { 43425039b37SCy Schubert log_err("could not setup SSL CTX"); 43525039b37SCy Schubert return 0; 43625039b37SCy Schubert } 43725039b37SCy Schubert dtio->tls_use_sni = cfg->tls_use_sni; 
43825039b37SCy Schubert #endif /* HAVE_SSL */ 43925039b37SCy Schubert } 44025039b37SCy Schubert return 1; 44125039b37SCy Schubert } 44225039b37SCy Schubert 44325039b37SCy Schubert int dt_io_thread_register_queue(struct dt_io_thread* dtio, 44425039b37SCy Schubert struct dt_msg_queue* mq) 44525039b37SCy Schubert { 44625039b37SCy Schubert struct dt_io_list_item* item = malloc(sizeof(*item)); 44725039b37SCy Schubert if(!item) return 0; 44825039b37SCy Schubert lock_basic_lock(&mq->lock); 44925039b37SCy Schubert mq->dtio = dtio; 45025039b37SCy Schubert lock_basic_unlock(&mq->lock); 45125039b37SCy Schubert item->queue = mq; 45225039b37SCy Schubert item->next = dtio->io_list; 45325039b37SCy Schubert dtio->io_list = item; 45425039b37SCy Schubert dtio->io_list_iter = NULL; 45525039b37SCy Schubert return 1; 45625039b37SCy Schubert } 45725039b37SCy Schubert 45825039b37SCy Schubert void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, 45925039b37SCy Schubert struct dt_msg_queue* mq) 46025039b37SCy Schubert { 46125039b37SCy Schubert struct dt_io_list_item* item, *prev=NULL; 46225039b37SCy Schubert if(!dtio) return; 46325039b37SCy Schubert item = dtio->io_list; 46425039b37SCy Schubert while(item) { 46525039b37SCy Schubert if(item->queue == mq) { 46625039b37SCy Schubert /* found it */ 46725039b37SCy Schubert if(prev) prev->next = item->next; 46825039b37SCy Schubert else dtio->io_list = item->next; 46925039b37SCy Schubert /* the queue itself only registered, not deleted */ 47025039b37SCy Schubert lock_basic_lock(&item->queue->lock); 47125039b37SCy Schubert item->queue->dtio = NULL; 47225039b37SCy Schubert lock_basic_unlock(&item->queue->lock); 47325039b37SCy Schubert free(item); 47425039b37SCy Schubert dtio->io_list_iter = NULL; 47525039b37SCy Schubert return; 47625039b37SCy Schubert } 47725039b37SCy Schubert prev = item; 47825039b37SCy Schubert item = item->next; 47925039b37SCy Schubert } 48025039b37SCy Schubert } 48125039b37SCy Schubert 48225039b37SCy Schubert /** pick a 
message from the queue, the routine locks and unlocks, 48325039b37SCy Schubert * returns true if there is a message */ 48425039b37SCy Schubert static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf, 48525039b37SCy Schubert size_t* len) 48625039b37SCy Schubert { 48725039b37SCy Schubert lock_basic_lock(&mq->lock); 48825039b37SCy Schubert if(mq->first) { 48925039b37SCy Schubert struct dt_msg_entry* entry = mq->first; 49025039b37SCy Schubert mq->first = entry->next; 49125039b37SCy Schubert if(!entry->next) mq->last = NULL; 49225039b37SCy Schubert mq->cursize -= entry->len; 493c0caa2e2SCy Schubert mq->msgcount --; 49425039b37SCy Schubert lock_basic_unlock(&mq->lock); 49525039b37SCy Schubert 49625039b37SCy Schubert *buf = entry->buf; 49725039b37SCy Schubert *len = entry->len; 49825039b37SCy Schubert free(entry); 49925039b37SCy Schubert return 1; 50025039b37SCy Schubert } 50125039b37SCy Schubert lock_basic_unlock(&mq->lock); 50225039b37SCy Schubert return 0; 50325039b37SCy Schubert } 50425039b37SCy Schubert 50525039b37SCy Schubert /** find message in queue, false if no message, true if message to send */ 50625039b37SCy Schubert static int dtio_find_in_queue(struct dt_io_thread* dtio, 50725039b37SCy Schubert struct dt_msg_queue* mq) 50825039b37SCy Schubert { 50925039b37SCy Schubert void* buf=NULL; 51025039b37SCy Schubert size_t len=0; 51125039b37SCy Schubert if(dt_msg_queue_pop(mq, &buf, &len)) { 51225039b37SCy Schubert dtio->cur_msg = buf; 51325039b37SCy Schubert dtio->cur_msg_len = len; 51425039b37SCy Schubert dtio->cur_msg_done = 0; 51525039b37SCy Schubert dtio->cur_msg_len_done = 0; 51625039b37SCy Schubert return 1; 51725039b37SCy Schubert } 51825039b37SCy Schubert return 0; 51925039b37SCy Schubert } 52025039b37SCy Schubert 52125039b37SCy Schubert /** find a new message to write, search message queues, false if none */ 52225039b37SCy Schubert static int dtio_find_msg(struct dt_io_thread* dtio) 52325039b37SCy Schubert { 52425039b37SCy Schubert struct 
dt_io_list_item *spot, *item; 52525039b37SCy Schubert 52625039b37SCy Schubert spot = dtio->io_list_iter; 52725039b37SCy Schubert /* use the next queue for the next message lookup, 52825039b37SCy Schubert * if we hit the end(NULL) the NULL restarts the iter at start. */ 52925039b37SCy Schubert if(spot) 53025039b37SCy Schubert dtio->io_list_iter = spot->next; 53125039b37SCy Schubert else if(dtio->io_list) 53225039b37SCy Schubert dtio->io_list_iter = dtio->io_list->next; 53325039b37SCy Schubert 53425039b37SCy Schubert /* scan from spot to end-of-io_list */ 53525039b37SCy Schubert item = spot; 53625039b37SCy Schubert while(item) { 53725039b37SCy Schubert if(dtio_find_in_queue(dtio, item->queue)) 53825039b37SCy Schubert return 1; 53925039b37SCy Schubert item = item->next; 54025039b37SCy Schubert } 54125039b37SCy Schubert /* scan starting at the start-of-list (to wrap around the end) */ 54225039b37SCy Schubert item = dtio->io_list; 54325039b37SCy Schubert while(item) { 54425039b37SCy Schubert if(dtio_find_in_queue(dtio, item->queue)) 54525039b37SCy Schubert return 1; 54625039b37SCy Schubert item = item->next; 54725039b37SCy Schubert } 54825039b37SCy Schubert return 0; 54925039b37SCy Schubert } 55025039b37SCy Schubert 55125039b37SCy Schubert /** callback for the dnstap reconnect, to start reconnecting to output */ 55225039b37SCy Schubert void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd), 55325039b37SCy Schubert short ATTR_UNUSED(bits), void* arg) 55425039b37SCy Schubert { 55525039b37SCy Schubert struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 55625039b37SCy Schubert dtio->reconnect_is_added = 0; 55725039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: reconnect timer"); 55825039b37SCy Schubert 55925039b37SCy Schubert dtio_open_output(dtio); 56025039b37SCy Schubert if(dtio->event) { 56125039b37SCy Schubert if(!dtio_add_output_event_write(dtio)) 56225039b37SCy Schubert return; 56325039b37SCy Schubert /* nothing wrong so far, wait on the output event */ 56425039b37SCy 
Schubert return; 56525039b37SCy Schubert } 56625039b37SCy Schubert /* exponential backoff and retry on timer */ 56725039b37SCy Schubert dtio_reconnect_enable(dtio); 56825039b37SCy Schubert } 56925039b37SCy Schubert 57025039b37SCy Schubert /** attempt to reconnect to the output, after a timeout */ 57125039b37SCy Schubert static void dtio_reconnect_enable(struct dt_io_thread* dtio) 57225039b37SCy Schubert { 57325039b37SCy Schubert struct timeval tv; 57425039b37SCy Schubert int msec; 57525039b37SCy Schubert if(dtio->want_to_exit) return; 57625039b37SCy Schubert if(dtio->reconnect_is_added) 57725039b37SCy Schubert return; /* already done */ 57825039b37SCy Schubert 57925039b37SCy Schubert /* exponential backoff, store the value for next timeout */ 58025039b37SCy Schubert msec = dtio->reconnect_timeout; 58125039b37SCy Schubert if(msec == 0) { 58225039b37SCy Schubert dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN; 58325039b37SCy Schubert } else { 58425039b37SCy Schubert dtio->reconnect_timeout = msec*2; 58525039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX) 58625039b37SCy Schubert dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX; 58725039b37SCy Schubert } 58825039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec", 58925039b37SCy Schubert msec); 59025039b37SCy Schubert 59125039b37SCy Schubert /* setup wait timer */ 59225039b37SCy Schubert memset(&tv, 0, sizeof(tv)); 59325039b37SCy Schubert tv.tv_sec = msec/1000; 59425039b37SCy Schubert tv.tv_usec = (msec%1000)*1000; 59525039b37SCy Schubert if(ub_timer_add(dtio->reconnect_timer, dtio->event_base, 59625039b37SCy Schubert &dtio_reconnect_timeout_cb, dtio, &tv) != 0) { 59725039b37SCy Schubert log_err("dnstap io: could not reconnect ev timer add"); 59825039b37SCy Schubert return; 59925039b37SCy Schubert } 60025039b37SCy Schubert dtio->reconnect_is_added = 1; 60125039b37SCy Schubert } 60225039b37SCy Schubert 60325039b37SCy Schubert /** remove dtio reconnect 
timer */ 60425039b37SCy Schubert static void dtio_reconnect_del(struct dt_io_thread* dtio) 60525039b37SCy Schubert { 60625039b37SCy Schubert if(!dtio->reconnect_is_added) 60725039b37SCy Schubert return; 60825039b37SCy Schubert ub_timer_del(dtio->reconnect_timer); 60925039b37SCy Schubert dtio->reconnect_is_added = 0; 61025039b37SCy Schubert } 61125039b37SCy Schubert 61225039b37SCy Schubert /** clear the reconnect exponential backoff timer. 61325039b37SCy Schubert * We have successfully connected so we can try again with short timeouts. */ 61425039b37SCy Schubert static void dtio_reconnect_clear(struct dt_io_thread* dtio) 61525039b37SCy Schubert { 61625039b37SCy Schubert dtio->reconnect_timeout = 0; 61725039b37SCy Schubert dtio_reconnect_del(dtio); 61825039b37SCy Schubert } 61925039b37SCy Schubert 62025039b37SCy Schubert /** reconnect slowly, because we already know we have to wait for a bit */ 62125039b37SCy Schubert static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec) 62225039b37SCy Schubert { 62325039b37SCy Schubert dtio_reconnect_del(dtio); 62425039b37SCy Schubert dtio->reconnect_timeout = msec; 62525039b37SCy Schubert dtio_reconnect_enable(dtio); 62625039b37SCy Schubert } 62725039b37SCy Schubert 62825039b37SCy Schubert /** delete the current message in the dtio, and reset counters */ 62925039b37SCy Schubert static void dtio_cur_msg_free(struct dt_io_thread* dtio) 63025039b37SCy Schubert { 63125039b37SCy Schubert free(dtio->cur_msg); 63225039b37SCy Schubert dtio->cur_msg = NULL; 63325039b37SCy Schubert dtio->cur_msg_len = 0; 63425039b37SCy Schubert dtio->cur_msg_done = 0; 63525039b37SCy Schubert dtio->cur_msg_len_done = 0; 63625039b37SCy Schubert } 63725039b37SCy Schubert 63825039b37SCy Schubert /** delete the buffer and counters used to read frame */ 63925039b37SCy Schubert static void dtio_read_frame_free(struct dt_frame_read_buf* rb) 64025039b37SCy Schubert { 64125039b37SCy Schubert if(rb->buf) { 64225039b37SCy Schubert free(rb->buf); 
64325039b37SCy Schubert rb->buf = NULL; 64425039b37SCy Schubert } 64525039b37SCy Schubert rb->buf_count = 0; 64625039b37SCy Schubert rb->buf_cap = 0; 64725039b37SCy Schubert rb->frame_len = 0; 64825039b37SCy Schubert rb->frame_len_done = 0; 64925039b37SCy Schubert rb->control_frame = 0; 65025039b37SCy Schubert } 65125039b37SCy Schubert 65225039b37SCy Schubert /** del the output file descriptor event for listening */ 65325039b37SCy Schubert static void dtio_del_output_event(struct dt_io_thread* dtio) 65425039b37SCy Schubert { 65525039b37SCy Schubert if(!dtio->event_added) 65625039b37SCy Schubert return; 65725039b37SCy Schubert ub_event_del(dtio->event); 65825039b37SCy Schubert dtio->event_added = 0; 65925039b37SCy Schubert dtio->event_added_is_write = 0; 66025039b37SCy Schubert } 66125039b37SCy Schubert 66225039b37SCy Schubert /** close dtio socket and set it to -1 */ 66325039b37SCy Schubert static void dtio_close_fd(struct dt_io_thread* dtio) 66425039b37SCy Schubert { 665c0caa2e2SCy Schubert sock_close(dtio->fd); 66625039b37SCy Schubert dtio->fd = -1; 66725039b37SCy Schubert } 66825039b37SCy Schubert 66925039b37SCy Schubert /** close and stop the output file descriptor event */ 67025039b37SCy Schubert static void dtio_close_output(struct dt_io_thread* dtio) 67125039b37SCy Schubert { 67225039b37SCy Schubert if(!dtio->event) 67325039b37SCy Schubert return; 67425039b37SCy Schubert ub_event_free(dtio->event); 67525039b37SCy Schubert dtio->event = NULL; 67625039b37SCy Schubert if(dtio->ssl) { 67725039b37SCy Schubert #ifdef HAVE_SSL 67825039b37SCy Schubert SSL_shutdown(dtio->ssl); 67925039b37SCy Schubert SSL_free(dtio->ssl); 68025039b37SCy Schubert dtio->ssl = NULL; 68125039b37SCy Schubert #endif 68225039b37SCy Schubert } 68325039b37SCy Schubert dtio_close_fd(dtio); 68425039b37SCy Schubert 68525039b37SCy Schubert /* if there is a (partial) message, discard it 68625039b37SCy Schubert * we cannot send (the remainder of) it, and a new 68725039b37SCy Schubert * connection 
needs to start with a control frame. */ 68825039b37SCy Schubert if(dtio->cur_msg) { 68925039b37SCy Schubert dtio_cur_msg_free(dtio); 69025039b37SCy Schubert } 69125039b37SCy Schubert 69225039b37SCy Schubert dtio->ready_frame_sent = 0; 69325039b37SCy Schubert dtio->accept_frame_received = 0; 69425039b37SCy Schubert dtio_read_frame_free(&dtio->read_frame); 69525039b37SCy Schubert 69625039b37SCy Schubert dtio_reconnect_enable(dtio); 69725039b37SCy Schubert } 69825039b37SCy Schubert 69925039b37SCy Schubert /** check for pending nonblocking connect errors, 70025039b37SCy Schubert * returns 1 if it is okay. -1 on error (close it), 0 to try later */ 70125039b37SCy Schubert static int dtio_check_nb_connect(struct dt_io_thread* dtio) 70225039b37SCy Schubert { 70325039b37SCy Schubert int error = 0; 70425039b37SCy Schubert socklen_t len = (socklen_t)sizeof(error); 70525039b37SCy Schubert if(!dtio->check_nb_connect) 70625039b37SCy Schubert return 1; /* everything okay */ 70725039b37SCy Schubert if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error, 70825039b37SCy Schubert &len) < 0) { 70925039b37SCy Schubert #ifndef USE_WINSOCK 71025039b37SCy Schubert error = errno; /* on solaris errno is error */ 71125039b37SCy Schubert #else 71225039b37SCy Schubert error = WSAGetLastError(); 71325039b37SCy Schubert #endif 71425039b37SCy Schubert } 71525039b37SCy Schubert #ifndef USE_WINSOCK 71625039b37SCy Schubert #if defined(EINPROGRESS) && defined(EWOULDBLOCK) 71725039b37SCy Schubert if(error == EINPROGRESS || error == EWOULDBLOCK) 71825039b37SCy Schubert return 0; /* try again later */ 71925039b37SCy Schubert #endif 72025039b37SCy Schubert #else 72125039b37SCy Schubert if(error == WSAEINPROGRESS) { 72225039b37SCy Schubert return 0; /* try again later */ 72325039b37SCy Schubert } else if(error == WSAEWOULDBLOCK) { 72425039b37SCy Schubert ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 
72525039b37SCy Schubert dtio->stop_flush_event:dtio->event), UB_EV_WRITE); 72625039b37SCy Schubert return 0; /* try again later */ 72725039b37SCy Schubert } 72825039b37SCy Schubert #endif 72925039b37SCy Schubert if(error != 0) { 73025039b37SCy Schubert char* to = dtio->socket_path; 73125039b37SCy Schubert if(!to) to = dtio->ip_str; 73225039b37SCy Schubert if(!to) to = ""; 73325039b37SCy Schubert log_err("dnstap io: failed to connect to \"%s\": %s", 734c0caa2e2SCy Schubert to, sock_strerror(error)); 73525039b37SCy Schubert return -1; /* error, close it */ 73625039b37SCy Schubert } 73725039b37SCy Schubert 73825039b37SCy Schubert if(dtio->ip_str) 73925039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: connected to %s", 74025039b37SCy Schubert dtio->ip_str); 74125039b37SCy Schubert else if(dtio->socket_path) 74225039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"", 74325039b37SCy Schubert dtio->socket_path); 74425039b37SCy Schubert dtio_reconnect_clear(dtio); 74525039b37SCy Schubert dtio->check_nb_connect = 0; 74625039b37SCy Schubert return 1; /* everything okay */ 74725039b37SCy Schubert } 74825039b37SCy Schubert 74925039b37SCy Schubert #ifdef HAVE_SSL 75025039b37SCy Schubert /** write to ssl output 75125039b37SCy Schubert * returns number of bytes written, 0 if nothing happened, 75225039b37SCy Schubert * try again later, or -1 if the channel is to be closed. 
*/ 75325039b37SCy Schubert static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf, 75425039b37SCy Schubert size_t len) 75525039b37SCy Schubert { 75625039b37SCy Schubert int r; 75725039b37SCy Schubert ERR_clear_error(); 75825039b37SCy Schubert r = SSL_write(dtio->ssl, buf, len); 75925039b37SCy Schubert if(r <= 0) { 76025039b37SCy Schubert int want = SSL_get_error(dtio->ssl, r); 76125039b37SCy Schubert if(want == SSL_ERROR_ZERO_RETURN) { 76225039b37SCy Schubert /* closed */ 76325039b37SCy Schubert return -1; 76425039b37SCy Schubert } else if(want == SSL_ERROR_WANT_READ) { 76525039b37SCy Schubert /* we want a brief read event */ 76625039b37SCy Schubert dtio_enable_brief_read(dtio); 76725039b37SCy Schubert return 0; 76825039b37SCy Schubert } else if(want == SSL_ERROR_WANT_WRITE) { 76925039b37SCy Schubert /* write again later */ 77025039b37SCy Schubert return 0; 77125039b37SCy Schubert } else if(want == SSL_ERROR_SYSCALL) { 77225039b37SCy Schubert #ifdef EPIPE 77325039b37SCy Schubert if(errno == EPIPE && verbosity < 2) 77425039b37SCy Schubert return -1; /* silence 'broken pipe' */ 77525039b37SCy Schubert #endif 77625039b37SCy Schubert #ifdef ECONNRESET 77725039b37SCy Schubert if(errno == ECONNRESET && verbosity < 2) 77825039b37SCy Schubert return -1; /* silence reset by peer */ 77925039b37SCy Schubert #endif 78025039b37SCy Schubert if(errno != 0) { 78125039b37SCy Schubert log_err("dnstap io, SSL_write syscall: %s", 78225039b37SCy Schubert strerror(errno)); 78325039b37SCy Schubert } 78425039b37SCy Schubert return -1; 78525039b37SCy Schubert } 78625039b37SCy Schubert log_crypto_err("dnstap io, could not SSL_write"); 78725039b37SCy Schubert return -1; 78825039b37SCy Schubert } 78925039b37SCy Schubert return r; 79025039b37SCy Schubert } 79125039b37SCy Schubert #endif /* HAVE_SSL */ 79225039b37SCy Schubert 79325039b37SCy Schubert /** write buffer to output. 
79425039b37SCy Schubert * returns number of bytes written, 0 if nothing happened, 79525039b37SCy Schubert * try again later, or -1 if the channel is to be closed. */ 79625039b37SCy Schubert static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf, 79725039b37SCy Schubert size_t len) 79825039b37SCy Schubert { 79925039b37SCy Schubert ssize_t ret; 80025039b37SCy Schubert if(dtio->fd == -1) 80125039b37SCy Schubert return -1; 80225039b37SCy Schubert #ifdef HAVE_SSL 80325039b37SCy Schubert if(dtio->ssl) 80425039b37SCy Schubert return dtio_write_ssl(dtio, buf, len); 80525039b37SCy Schubert #endif 80625039b37SCy Schubert ret = send(dtio->fd, (void*)buf, len, 0); 80725039b37SCy Schubert if(ret == -1) { 80825039b37SCy Schubert #ifndef USE_WINSOCK 80925039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 81025039b37SCy Schubert return 0; 81125039b37SCy Schubert #else 81225039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 81325039b37SCy Schubert return 0; 81425039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) { 81525039b37SCy Schubert ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 81625039b37SCy Schubert dtio->stop_flush_event:dtio->event), 81725039b37SCy Schubert UB_EV_WRITE); 81825039b37SCy Schubert return 0; 81925039b37SCy Schubert } 82025039b37SCy Schubert #endif 821c0caa2e2SCy Schubert log_err("dnstap io: failed send: %s", sock_strerror(errno)); 82225039b37SCy Schubert return -1; 82325039b37SCy Schubert } 82425039b37SCy Schubert return ret; 82525039b37SCy Schubert } 82625039b37SCy Schubert 82725039b37SCy Schubert #ifdef HAVE_WRITEV 82825039b37SCy Schubert /** write with writev, len and message, in one write, if possible. 
82925039b37SCy Schubert * return true if message is done, false if incomplete */ 83025039b37SCy Schubert static int dtio_write_with_writev(struct dt_io_thread* dtio) 83125039b37SCy Schubert { 83225039b37SCy Schubert uint32_t sendlen = htonl(dtio->cur_msg_len); 83325039b37SCy Schubert struct iovec iov[2]; 83425039b37SCy Schubert ssize_t r; 83525039b37SCy Schubert iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done; 83625039b37SCy Schubert iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done; 83725039b37SCy Schubert iov[1].iov_base = dtio->cur_msg; 83825039b37SCy Schubert iov[1].iov_len = dtio->cur_msg_len; 83925039b37SCy Schubert log_assert(iov[0].iov_len > 0); 84025039b37SCy Schubert r = writev(dtio->fd, iov, 2); 84125039b37SCy Schubert if(r == -1) { 84225039b37SCy Schubert #ifndef USE_WINSOCK 84325039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 84425039b37SCy Schubert return 0; 84525039b37SCy Schubert #else 84625039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 84725039b37SCy Schubert return 0; 84825039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) { 84925039b37SCy Schubert ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 
85025039b37SCy Schubert dtio->stop_flush_event:dtio->event), 85125039b37SCy Schubert UB_EV_WRITE); 85225039b37SCy Schubert return 0; 85325039b37SCy Schubert } 85425039b37SCy Schubert #endif 855c0caa2e2SCy Schubert log_err("dnstap io: failed writev: %s", sock_strerror(errno)); 85625039b37SCy Schubert /* close the channel */ 85725039b37SCy Schubert dtio_del_output_event(dtio); 85825039b37SCy Schubert dtio_close_output(dtio); 85925039b37SCy Schubert return 0; 86025039b37SCy Schubert } 86125039b37SCy Schubert /* written r bytes */ 86225039b37SCy Schubert dtio->cur_msg_len_done += r; 86325039b37SCy Schubert if(dtio->cur_msg_len_done < 4) 86425039b37SCy Schubert return 0; 86525039b37SCy Schubert if(dtio->cur_msg_len_done > 4) { 86625039b37SCy Schubert dtio->cur_msg_done = dtio->cur_msg_len_done-4; 86725039b37SCy Schubert dtio->cur_msg_len_done = 4; 86825039b37SCy Schubert } 86925039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) 87025039b37SCy Schubert return 0; 87125039b37SCy Schubert return 1; 87225039b37SCy Schubert } 87325039b37SCy Schubert #endif /* HAVE_WRITEV */ 87425039b37SCy Schubert 87525039b37SCy Schubert /** write more of the length, preceding the data frame. 87625039b37SCy Schubert * return true if message is done, false if incomplete. 
*/ 87725039b37SCy Schubert static int dtio_write_more_of_len(struct dt_io_thread* dtio) 87825039b37SCy Schubert { 87925039b37SCy Schubert uint32_t sendlen; 88025039b37SCy Schubert int r; 88125039b37SCy Schubert if(dtio->cur_msg_len_done >= 4) 88225039b37SCy Schubert return 1; 88325039b37SCy Schubert #ifdef HAVE_WRITEV 88425039b37SCy Schubert if(!dtio->ssl) { 88525039b37SCy Schubert /* we try writev for everything.*/ 88625039b37SCy Schubert return dtio_write_with_writev(dtio); 88725039b37SCy Schubert } 88825039b37SCy Schubert #endif /* HAVE_WRITEV */ 88925039b37SCy Schubert sendlen = htonl(dtio->cur_msg_len); 89025039b37SCy Schubert r = dtio_write_buf(dtio, 89125039b37SCy Schubert ((uint8_t*)&sendlen)+dtio->cur_msg_len_done, 89225039b37SCy Schubert sizeof(sendlen)-dtio->cur_msg_len_done); 89325039b37SCy Schubert if(r == -1) { 89425039b37SCy Schubert /* close the channel */ 89525039b37SCy Schubert dtio_del_output_event(dtio); 89625039b37SCy Schubert dtio_close_output(dtio); 89725039b37SCy Schubert return 0; 89825039b37SCy Schubert } else if(r == 0) { 89925039b37SCy Schubert /* try again later */ 90025039b37SCy Schubert return 0; 90125039b37SCy Schubert } 90225039b37SCy Schubert dtio->cur_msg_len_done += r; 90325039b37SCy Schubert if(dtio->cur_msg_len_done < 4) 90425039b37SCy Schubert return 0; 90525039b37SCy Schubert return 1; 90625039b37SCy Schubert } 90725039b37SCy Schubert 90825039b37SCy Schubert /** write more of the data frame. 90925039b37SCy Schubert * return true if message is done, false if incomplete. 
*/ 91025039b37SCy Schubert static int dtio_write_more_of_data(struct dt_io_thread* dtio) 91125039b37SCy Schubert { 91225039b37SCy Schubert int r; 91325039b37SCy Schubert if(dtio->cur_msg_done >= dtio->cur_msg_len) 91425039b37SCy Schubert return 1; 91525039b37SCy Schubert r = dtio_write_buf(dtio, 91625039b37SCy Schubert ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done, 91725039b37SCy Schubert dtio->cur_msg_len - dtio->cur_msg_done); 91825039b37SCy Schubert if(r == -1) { 91925039b37SCy Schubert /* close the channel */ 92025039b37SCy Schubert dtio_del_output_event(dtio); 92125039b37SCy Schubert dtio_close_output(dtio); 92225039b37SCy Schubert return 0; 92325039b37SCy Schubert } else if(r == 0) { 92425039b37SCy Schubert /* try again later */ 92525039b37SCy Schubert return 0; 92625039b37SCy Schubert } 92725039b37SCy Schubert dtio->cur_msg_done += r; 92825039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) 92925039b37SCy Schubert return 0; 93025039b37SCy Schubert return 1; 93125039b37SCy Schubert } 93225039b37SCy Schubert 93325039b37SCy Schubert /** write more of the current messsage. false if incomplete, true if 93425039b37SCy Schubert * the message is done */ 93525039b37SCy Schubert static int dtio_write_more(struct dt_io_thread* dtio) 93625039b37SCy Schubert { 93725039b37SCy Schubert if(dtio->cur_msg_len_done < 4) { 93825039b37SCy Schubert if(!dtio_write_more_of_len(dtio)) 93925039b37SCy Schubert return 0; 94025039b37SCy Schubert } 94125039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) { 94225039b37SCy Schubert if(!dtio_write_more_of_data(dtio)) 94325039b37SCy Schubert return 0; 94425039b37SCy Schubert } 94525039b37SCy Schubert return 1; 94625039b37SCy Schubert } 94725039b37SCy Schubert 94825039b37SCy Schubert /** Receive bytes from dtio->fd, store in buffer. 
Returns 0: closed, 94925039b37SCy Schubert * -1: continue, >0: number of bytes read into buffer */ 95025039b37SCy Schubert static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) { 95125039b37SCy Schubert ssize_t r; 95225039b37SCy Schubert r = recv(dtio->fd, (void*)buf, len, 0); 95325039b37SCy Schubert if(r == -1) { 95425039b37SCy Schubert char* to = dtio->socket_path; 95525039b37SCy Schubert if(!to) to = dtio->ip_str; 95625039b37SCy Schubert if(!to) to = ""; 95725039b37SCy Schubert #ifndef USE_WINSOCK 95825039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 95925039b37SCy Schubert return -1; /* try later */ 96025039b37SCy Schubert #else 96125039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) { 96225039b37SCy Schubert return -1; /* try later */ 96325039b37SCy Schubert } else if(WSAGetLastError() == WSAEWOULDBLOCK) { 96425039b37SCy Schubert ub_winsock_tcp_wouldblock( 96525039b37SCy Schubert (dtio->stop_flush_event? 96625039b37SCy Schubert dtio->stop_flush_event:dtio->event), 96725039b37SCy Schubert UB_EV_READ); 96825039b37SCy Schubert return -1; /* try later */ 96925039b37SCy Schubert } 97025039b37SCy Schubert #endif 97125039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 97225039b37SCy Schubert verbosity < 4) 97325039b37SCy Schubert return 0; /* no log retries on low verbosity */ 97425039b37SCy Schubert log_err("dnstap io: output closed, recv %s: %s", to, 97525039b37SCy Schubert strerror(errno)); 97625039b37SCy Schubert /* and close below */ 97725039b37SCy Schubert return 0; 97825039b37SCy Schubert } 97925039b37SCy Schubert if(r == 0) { 98025039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 98125039b37SCy Schubert verbosity < 4) 98225039b37SCy Schubert return 0; /* no log retries on low verbosity */ 98325039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: output closed by the other side"); 98425039b37SCy Schubert /* and close below */ 98525039b37SCy Schubert return 0; 
98625039b37SCy Schubert } 98725039b37SCy Schubert /* something was received */ 98825039b37SCy Schubert return r; 98925039b37SCy Schubert } 99025039b37SCy Schubert 99125039b37SCy Schubert #ifdef HAVE_SSL 99225039b37SCy Schubert /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed, 99325039b37SCy Schubert * -1: continue, >0: number of bytes read into buffer */ 99425039b37SCy Schubert static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len) 99525039b37SCy Schubert { 99625039b37SCy Schubert int r; 99725039b37SCy Schubert ERR_clear_error(); 99825039b37SCy Schubert r = SSL_read(dtio->ssl, buf, len); 99925039b37SCy Schubert if(r <= 0) { 100025039b37SCy Schubert int want = SSL_get_error(dtio->ssl, r); 100125039b37SCy Schubert if(want == SSL_ERROR_ZERO_RETURN) { 100225039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 100325039b37SCy Schubert verbosity < 4) 100425039b37SCy Schubert return 0; /* no log retries on low verbosity */ 100525039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: output closed by the " 100625039b37SCy Schubert "other side"); 100725039b37SCy Schubert return 0; 100825039b37SCy Schubert } else if(want == SSL_ERROR_WANT_READ) { 100925039b37SCy Schubert /* continue later */ 101025039b37SCy Schubert return -1; 101125039b37SCy Schubert } else if(want == SSL_ERROR_WANT_WRITE) { 101225039b37SCy Schubert (void)dtio_enable_brief_write(dtio); 101325039b37SCy Schubert return -1; 101425039b37SCy Schubert } else if(want == SSL_ERROR_SYSCALL) { 101525039b37SCy Schubert #ifdef ECONNRESET 101625039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 101725039b37SCy Schubert errno == ECONNRESET && verbosity < 4) 101825039b37SCy Schubert return 0; /* silence reset by peer */ 101925039b37SCy Schubert #endif 102025039b37SCy Schubert if(errno != 0) 102125039b37SCy Schubert log_err("SSL_read syscall: %s", 102225039b37SCy Schubert strerror(errno)); 102325039b37SCy Schubert 
verbose(VERB_DETAIL, "dnstap io: output closed by the " 102425039b37SCy Schubert "other side"); 102525039b37SCy Schubert return 0; 102625039b37SCy Schubert } 102725039b37SCy Schubert log_crypto_err("could not SSL_read"); 102825039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: output closed by the " 102925039b37SCy Schubert "other side"); 103025039b37SCy Schubert return 0; 103125039b37SCy Schubert } 103225039b37SCy Schubert return r; 103325039b37SCy Schubert } 103425039b37SCy Schubert #endif /* HAVE_SSL */ 103525039b37SCy Schubert 103625039b37SCy Schubert /** check if the output fd has been closed, 103725039b37SCy Schubert * it returns false if the stream is closed. */ 103825039b37SCy Schubert static int dtio_check_close(struct dt_io_thread* dtio) 103925039b37SCy Schubert { 104025039b37SCy Schubert /* we don't want to read any packets, but if there are we can 104125039b37SCy Schubert * discard the input (ignore it). Ignore of unknown (control) 104225039b37SCy Schubert * packets is okay for the framestream protocol. And also, the 104325039b37SCy Schubert * read call can return that the stream has been closed by the 104425039b37SCy Schubert * other side. 
*/ 104525039b37SCy Schubert uint8_t buf[1024]; 104625039b37SCy Schubert int r = -1; 104725039b37SCy Schubert 104825039b37SCy Schubert 104925039b37SCy Schubert if(dtio->fd == -1) return 0; 105025039b37SCy Schubert 105125039b37SCy Schubert while(r != 0) { 105225039b37SCy Schubert /* not interested in buffer content, overwrite */ 105325039b37SCy Schubert r = receive_bytes(dtio, (void*)buf, sizeof(buf)); 105425039b37SCy Schubert if(r == -1) 105525039b37SCy Schubert return 1; 105625039b37SCy Schubert } 105725039b37SCy Schubert /* the other end has been closed */ 105825039b37SCy Schubert /* close the channel */ 105925039b37SCy Schubert dtio_del_output_event(dtio); 106025039b37SCy Schubert dtio_close_output(dtio); 106125039b37SCy Schubert return 0; 106225039b37SCy Schubert } 106325039b37SCy Schubert 106425039b37SCy Schubert /** Read accept frame. Returns -1: continue reading, 0: closed, 106525039b37SCy Schubert * 1: valid accept received. */ 106625039b37SCy Schubert static int dtio_read_accept_frame(struct dt_io_thread* dtio) 106725039b37SCy Schubert { 106825039b37SCy Schubert int r; 106925039b37SCy Schubert size_t read_frame_done; 107025039b37SCy Schubert while(dtio->read_frame.frame_len_done < 4) { 107125039b37SCy Schubert #ifdef HAVE_SSL 107225039b37SCy Schubert if(dtio->ssl) { 107325039b37SCy Schubert r = ssl_read_bytes(dtio, 107425039b37SCy Schubert (uint8_t*)&dtio->read_frame.frame_len+ 107525039b37SCy Schubert dtio->read_frame.frame_len_done, 107625039b37SCy Schubert 4-dtio->read_frame.frame_len_done); 107725039b37SCy Schubert } else { 107825039b37SCy Schubert #endif 107925039b37SCy Schubert r = receive_bytes(dtio, 108025039b37SCy Schubert (uint8_t*)&dtio->read_frame.frame_len+ 108125039b37SCy Schubert dtio->read_frame.frame_len_done, 108225039b37SCy Schubert 4-dtio->read_frame.frame_len_done); 108325039b37SCy Schubert #ifdef HAVE_SSL 108425039b37SCy Schubert } 108525039b37SCy Schubert #endif 108625039b37SCy Schubert if(r == -1) 108725039b37SCy Schubert return -1; 
/* continue reading */ 108825039b37SCy Schubert if(r == 0) { 108925039b37SCy Schubert /* connection closed */ 109025039b37SCy Schubert goto close_connection; 109125039b37SCy Schubert } 109225039b37SCy Schubert dtio->read_frame.frame_len_done += r; 109325039b37SCy Schubert if(dtio->read_frame.frame_len_done < 4) 109425039b37SCy Schubert return -1; /* continue reading */ 109525039b37SCy Schubert 109625039b37SCy Schubert if(dtio->read_frame.frame_len == 0) { 109725039b37SCy Schubert dtio->read_frame.frame_len_done = 0; 109825039b37SCy Schubert dtio->read_frame.control_frame = 1; 109925039b37SCy Schubert continue; 110025039b37SCy Schubert } 110125039b37SCy Schubert dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len); 110225039b37SCy Schubert if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) { 110325039b37SCy Schubert verbose(VERB_OPS, "dnstap: received frame exceeds max " 110425039b37SCy Schubert "length of %d bytes, closing connection", 110525039b37SCy Schubert DTIO_RECV_FRAME_MAX_LEN); 110625039b37SCy Schubert goto close_connection; 110725039b37SCy Schubert } 110825039b37SCy Schubert dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len); 110925039b37SCy Schubert dtio->read_frame.buf_cap = dtio->read_frame.frame_len; 111025039b37SCy Schubert if(!dtio->read_frame.buf) { 111125039b37SCy Schubert log_err("dnstap io: out of memory (creating read " 111225039b37SCy Schubert "buffer)"); 111325039b37SCy Schubert goto close_connection; 111425039b37SCy Schubert } 111525039b37SCy Schubert } 111625039b37SCy Schubert if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) { 111725039b37SCy Schubert #ifdef HAVE_SSL 111825039b37SCy Schubert if(dtio->ssl) { 111925039b37SCy Schubert r = ssl_read_bytes(dtio, dtio->read_frame.buf+ 112025039b37SCy Schubert dtio->read_frame.buf_count, 112125039b37SCy Schubert dtio->read_frame.buf_cap- 112225039b37SCy Schubert dtio->read_frame.buf_count); 112325039b37SCy Schubert } else { 112425039b37SCy Schubert #endif 
112525039b37SCy Schubert r = receive_bytes(dtio, dtio->read_frame.buf+ 112625039b37SCy Schubert dtio->read_frame.buf_count, 112725039b37SCy Schubert dtio->read_frame.buf_cap- 112825039b37SCy Schubert dtio->read_frame.buf_count); 112925039b37SCy Schubert #ifdef HAVE_SSL 113025039b37SCy Schubert } 113125039b37SCy Schubert #endif 113225039b37SCy Schubert if(r == -1) 113325039b37SCy Schubert return -1; /* continue reading */ 113425039b37SCy Schubert if(r == 0) { 113525039b37SCy Schubert /* connection closed */ 113625039b37SCy Schubert goto close_connection; 113725039b37SCy Schubert } 113825039b37SCy Schubert dtio->read_frame.buf_count += r; 113925039b37SCy Schubert if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) 114025039b37SCy Schubert return -1; /* continue reading */ 114125039b37SCy Schubert } 114225039b37SCy Schubert 114325039b37SCy Schubert /* Complete frame received, check if this is a valid ACCEPT control 114425039b37SCy Schubert * frame. */ 114525039b37SCy Schubert if(dtio->read_frame.frame_len < 4) { 114625039b37SCy Schubert verbose(VERB_OPS, "dnstap: invalid data received"); 114725039b37SCy Schubert goto close_connection; 114825039b37SCy Schubert } 114925039b37SCy Schubert if(sldns_read_uint32(dtio->read_frame.buf) != 115025039b37SCy Schubert FSTRM_CONTROL_FRAME_ACCEPT) { 115125039b37SCy Schubert verbose(VERB_ALGO, "dnstap: invalid control type received, " 115225039b37SCy Schubert "ignored"); 115325039b37SCy Schubert dtio->ready_frame_sent = 0; 115425039b37SCy Schubert dtio->accept_frame_received = 0; 115525039b37SCy Schubert dtio_read_frame_free(&dtio->read_frame); 115625039b37SCy Schubert return -1; 115725039b37SCy Schubert } 115825039b37SCy Schubert read_frame_done = 4; /* control frame type */ 115925039b37SCy Schubert 116025039b37SCy Schubert /* Iterate over control fields, ignore unknown types. 116125039b37SCy Schubert * Need to be able to read at least 8 bytes (control field type + 116225039b37SCy Schubert * length). 
*/ 116325039b37SCy Schubert while(read_frame_done+8 < dtio->read_frame.frame_len) { 116425039b37SCy Schubert uint32_t type = sldns_read_uint32(dtio->read_frame.buf + 116525039b37SCy Schubert read_frame_done); 116625039b37SCy Schubert uint32_t len = sldns_read_uint32(dtio->read_frame.buf + 116725039b37SCy Schubert read_frame_done + 4); 116825039b37SCy Schubert if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) { 116925039b37SCy Schubert if(len == strlen(DNSTAP_CONTENT_TYPE) && 117025039b37SCy Schubert read_frame_done+8+len <= 117125039b37SCy Schubert dtio->read_frame.frame_len && 117225039b37SCy Schubert memcmp(dtio->read_frame.buf + read_frame_done + 117325039b37SCy Schubert + 8, DNSTAP_CONTENT_TYPE, len) == 0) { 117425039b37SCy Schubert if(!dtio_control_start_send(dtio)) { 117525039b37SCy Schubert verbose(VERB_OPS, "dnstap io: out of " 117625039b37SCy Schubert "memory while sending START frame"); 117725039b37SCy Schubert goto close_connection; 117825039b37SCy Schubert } 117925039b37SCy Schubert dtio->accept_frame_received = 1; 1180c0caa2e2SCy Schubert if(!dtio_add_output_event_write(dtio)) 1181c0caa2e2SCy Schubert goto close_connection; 118225039b37SCy Schubert return 1; 118325039b37SCy Schubert } else { 118425039b37SCy Schubert /* unknow content type */ 118525039b37SCy Schubert verbose(VERB_ALGO, "dnstap: ACCEPT frame " 118625039b37SCy Schubert "contains unknown content type, " 118725039b37SCy Schubert "closing connection"); 118825039b37SCy Schubert goto close_connection; 118925039b37SCy Schubert } 119025039b37SCy Schubert } 119125039b37SCy Schubert /* unknown option, try next */ 119225039b37SCy Schubert read_frame_done += 8+len; 119325039b37SCy Schubert } 119425039b37SCy Schubert 119525039b37SCy Schubert 119625039b37SCy Schubert close_connection: 119725039b37SCy Schubert dtio_del_output_event(dtio); 119825039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 119925039b37SCy Schubert dtio_close_output(dtio); 120025039b37SCy Schubert return 0; 
120125039b37SCy Schubert } 120225039b37SCy Schubert 120325039b37SCy Schubert /** add the output file descriptor event for listening, read only */ 120425039b37SCy Schubert static int dtio_add_output_event_read(struct dt_io_thread* dtio) 120525039b37SCy Schubert { 120625039b37SCy Schubert if(!dtio->event) 120725039b37SCy Schubert return 0; 120825039b37SCy Schubert if(dtio->event_added && !dtio->event_added_is_write) 120925039b37SCy Schubert return 1; 121025039b37SCy Schubert /* we have to (re-)register the event */ 121125039b37SCy Schubert if(dtio->event_added) 121225039b37SCy Schubert ub_event_del(dtio->event); 121325039b37SCy Schubert ub_event_del_bits(dtio->event, UB_EV_WRITE); 121425039b37SCy Schubert if(ub_event_add(dtio->event, NULL) != 0) { 121525039b37SCy Schubert log_err("dnstap io: out of memory (adding event)"); 121625039b37SCy Schubert dtio->event_added = 0; 121725039b37SCy Schubert dtio->event_added_is_write = 0; 121825039b37SCy Schubert /* close output and start reattempts to open it */ 121925039b37SCy Schubert dtio_close_output(dtio); 122025039b37SCy Schubert return 0; 122125039b37SCy Schubert } 122225039b37SCy Schubert dtio->event_added = 1; 122325039b37SCy Schubert dtio->event_added_is_write = 0; 122425039b37SCy Schubert return 1; 122525039b37SCy Schubert } 122625039b37SCy Schubert 122725039b37SCy Schubert /** add the output file descriptor event for listening, read and write */ 122825039b37SCy Schubert static int dtio_add_output_event_write(struct dt_io_thread* dtio) 122925039b37SCy Schubert { 123025039b37SCy Schubert if(!dtio->event) 123125039b37SCy Schubert return 0; 123225039b37SCy Schubert if(dtio->event_added && dtio->event_added_is_write) 123325039b37SCy Schubert return 1; 123425039b37SCy Schubert /* we have to (re-)register the event */ 123525039b37SCy Schubert if(dtio->event_added) 123625039b37SCy Schubert ub_event_del(dtio->event); 123725039b37SCy Schubert ub_event_add_bits(dtio->event, UB_EV_WRITE); 123825039b37SCy Schubert 
if(ub_event_add(dtio->event, NULL) != 0) { 123925039b37SCy Schubert log_err("dnstap io: out of memory (adding event)"); 124025039b37SCy Schubert dtio->event_added = 0; 124125039b37SCy Schubert dtio->event_added_is_write = 0; 124225039b37SCy Schubert /* close output and start reattempts to open it */ 124325039b37SCy Schubert dtio_close_output(dtio); 124425039b37SCy Schubert return 0; 124525039b37SCy Schubert } 124625039b37SCy Schubert dtio->event_added = 1; 124725039b37SCy Schubert dtio->event_added_is_write = 1; 124825039b37SCy Schubert return 1; 124925039b37SCy Schubert } 125025039b37SCy Schubert 125125039b37SCy Schubert /** put the dtio thread to sleep */ 125225039b37SCy Schubert static void dtio_sleep(struct dt_io_thread* dtio) 125325039b37SCy Schubert { 125425039b37SCy Schubert /* unregister the event polling for write, because there is 125525039b37SCy Schubert * nothing to be written */ 125625039b37SCy Schubert (void)dtio_add_output_event_read(dtio); 125725039b37SCy Schubert } 125825039b37SCy Schubert 125925039b37SCy Schubert #ifdef HAVE_SSL 126025039b37SCy Schubert /** enable the brief read condition */ 126125039b37SCy Schubert static int dtio_enable_brief_read(struct dt_io_thread* dtio) 126225039b37SCy Schubert { 126325039b37SCy Schubert dtio->ssl_brief_read = 1; 126425039b37SCy Schubert if(dtio->stop_flush_event) { 126525039b37SCy Schubert ub_event_del(dtio->stop_flush_event); 126625039b37SCy Schubert ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE); 126725039b37SCy Schubert if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 126825039b37SCy Schubert log_err("dnstap io, stop flush, could not ub_event_add"); 126925039b37SCy Schubert return 0; 127025039b37SCy Schubert } 127125039b37SCy Schubert return 1; 127225039b37SCy Schubert } 127325039b37SCy Schubert return dtio_add_output_event_read(dtio); 127425039b37SCy Schubert } 127525039b37SCy Schubert #endif /* HAVE_SSL */ 127625039b37SCy Schubert 127725039b37SCy Schubert #ifdef HAVE_SSL 127825039b37SCy 
#ifdef HAVE_SSL
/**
 * Disable the brief read condition.
 * Clears the ssl_brief_read flag and makes the active event wait for
 * write again.
 * @param dtio: the dnstap io thread structure.
 * @return 1 on success, 0 if the event could not be registered.
 */
static int dtio_disable_brief_read(struct dt_io_thread* dtio)
{
	struct ub_event* flushev;

	dtio->ssl_brief_read = 0;
	flushev = dtio->stop_flush_event;
	if(!flushev) {
		/* no stop flush event is set, use the regular output event */
		return dtio_add_output_event_write(dtio);
	}

	/* a stop flush event is set: put the write bit back on that
	 * event and add it again */
	ub_event_del(flushev);
	ub_event_add_bits(flushev, UB_EV_WRITE);
	if(ub_event_add(flushev, NULL) != 0) {
		log_err("dnstap io, stop flush, could not ub_event_add");
		return 0;
	}
	return 1;
}

/**
 * Enable the brief write condition.
 * @param dtio: the dnstap io thread structure.
 * @return the result of dtio_add_output_event_write().
 */
static int dtio_enable_brief_write(struct dt_io_thread* dtio)
{
	dtio->ssl_brief_write = 1;
	return dtio_add_output_event_write(dtio);
}

/**
 * Disable the brief write condition.
 * @param dtio: the dnstap io thread structure.
 * @return the result of dtio_add_output_event_read().
 */
static int dtio_disable_brief_write(struct dt_io_thread* dtio)
{
	dtio->ssl_brief_write = 0;
	return dtio_add_output_event_read(dtio);
}

/**
 * Check peer verification after the ssl handshake.
 * Only logs; the connection is not closed here.
 * @param dtio: the dnstap io thread structure, with dtio->ssl set.
 * @return 1 if the peer is acceptable (authenticated, or verification
 *	was not requested on the ssl object), 0 if the check failed.
 */
static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
{
	X509* cert;

	if(!(SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
		/* unauthenticated, the verify peer flag was not set
		 * in ssl when the ssl object was created from ssl_ctx */
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
			dtio->ip_str);
		return 1;
	}

	if(SSL_get_verify_result(dtio->ssl) != X509_V_OK) {
		/* verification failed, log the certificate if there is one */
		cert = SSL_get_peer_certificate(dtio->ssl);
		if(cert) {
			log_cert(VERB_ALGO, "dnstap io, peer "
				"certificate", cert);
			X509_free(cert);
		}
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
			"failed: failed to authenticate",
			dtio->ip_str);
		return 0;
	}

	/* verification succeeded, but a certificate must be present */
	cert = SSL_get_peer_certificate(dtio->ssl);
	if(!cert) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection failed no certificate",
			dtio->ip_str);
		return 0;
	}
	log_cert(VERB_ALGO, "dnstap io, peer certificate", cert);
#ifdef HAVE_SSL_GET0_PEERNAME
	if(SSL_get0_peername(dtio->ssl)) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection to %s authenticated",
			dtio->ip_str,
			SSL_get0_peername(dtio->ssl));
	} else
#endif
	{
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection authenticated",
			dtio->ip_str);
	}
	X509_free(cert);
	return 1;
}
#endif /* HAVE_SSL */
#ifdef HAVE_SSL
/**
 * Give up on the connection during the ssl handshake.
 * Exits the stop flush loop when one is given, removes the output
 * event, schedules a slow reconnect and closes the output.
 * @param dtio: the dnstap io thread structure.
 * @param info: stop flush information, or NULL.
 * @return always 0, so callers can return the result directly.
 */
static int dtio_ssl_handshake_closed(struct dt_io_thread* dtio,
	struct stop_flush_info* info)
{
	if(info) dtio_stop_flush_exit(info);
	dtio_del_output_event(dtio);
	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
	dtio_close_output(dtio);
	return 0;
}

/**
 * Perform the ssl handshake.
 * @param dtio: the dnstap io thread structure, with dtio->ssl set.
 * @param info: stop flush information when called from the stop flush
 *	callback, otherwise NULL.
 * @return 1 if okay (handshake done and peer checked), 0 to stop: the
 *	handshake needs more io, or the connection was closed.
 */
static int dtio_ssl_handshake(struct dt_io_thread* dtio,
	struct stop_flush_info* info)
{
	int ret, want;

	/* assume the brief read condition is satisfied,
	 * if we need more or again, we can set it again */
	if(dtio->ssl_brief_read && !dtio_disable_brief_read(dtio)) {
		if(info) dtio_stop_flush_exit(info);
		return 0;
	}
	if(dtio->ssl_handshake_done)
		return 1;

	ERR_clear_error();
	ret = SSL_do_handshake(dtio->ssl);
	if(ret == 1) {
		/* handshake complete, check peer verification */
		dtio->ssl_handshake_done = 1;
		if(!dtio_ssl_check_peer(dtio))
			return dtio_ssl_handshake_closed(dtio, info);
		return 1;
	}

	want = SSL_get_error(dtio->ssl, ret);
	if(want == SSL_ERROR_WANT_READ) {
		/* we want to read on the connection */
		if(!dtio_enable_brief_read(dtio) && info)
			dtio_stop_flush_exit(info);
		return 0;
	}
	if(want == SSL_ERROR_WANT_WRITE) {
		/* we want to write on the connection */
		return 0;
	}
	if(ret == 0) {
		/* closed */
		return dtio_ssl_handshake_closed(dtio, info);
	}
	if(want == SSL_ERROR_SYSCALL) {
		/* SYSCALL and errno==0 means closed uncleanly */
		int quiet = (errno == 0);
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			quiet = 1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			quiet = 1; /* silence reset by peer */
#endif
		if(!quiet)
			log_err("dnstap io, SSL_handshake syscall: %s",
				strerror(errno));
		return dtio_ssl_handshake_closed(dtio, info);
	}

	/* some other ssl failure, log it unless it is squelched */
	{
		unsigned long err = ERR_get_error();
		if(!squelch_err_ssl_handshake(err)) {
			log_crypto_err_code("dnstap io, ssl handshake failed",
				err);
			verbose(VERB_OPS, "dnstap io, ssl handshake failed "
				"from %s", dtio->ip_str);
		}
	}
	return dtio_ssl_handshake_closed(dtio, info);
}
#endif /* HAVE_SSL */
Schubert dtio_del_output_event(dtio); 143725039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 143825039b37SCy Schubert dtio_close_output(dtio); 143925039b37SCy Schubert return 0; 144025039b37SCy Schubert } 144125039b37SCy Schubert 144225039b37SCy Schubert } 144325039b37SCy Schubert /* check peer verification */ 144425039b37SCy Schubert dtio->ssl_handshake_done = 1; 144525039b37SCy Schubert 144625039b37SCy Schubert if(!dtio_ssl_check_peer(dtio)) { 144725039b37SCy Schubert /* closed */ 144825039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 144925039b37SCy Schubert dtio_del_output_event(dtio); 145025039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 145125039b37SCy Schubert dtio_close_output(dtio); 145225039b37SCy Schubert return 0; 145325039b37SCy Schubert } 145425039b37SCy Schubert return 1; 145525039b37SCy Schubert } 145625039b37SCy Schubert #endif /* HAVE_SSL */ 145725039b37SCy Schubert 145825039b37SCy Schubert /** callback for the dnstap events, to write to the output */ 145925039b37SCy Schubert void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg) 146025039b37SCy Schubert { 146125039b37SCy Schubert struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 146225039b37SCy Schubert int i; 146325039b37SCy Schubert 146425039b37SCy Schubert if(dtio->check_nb_connect) { 146525039b37SCy Schubert int connect_err = dtio_check_nb_connect(dtio); 146625039b37SCy Schubert if(connect_err == -1) { 146725039b37SCy Schubert /* close the channel */ 146825039b37SCy Schubert dtio_del_output_event(dtio); 146925039b37SCy Schubert dtio_close_output(dtio); 147025039b37SCy Schubert return; 147125039b37SCy Schubert } else if(connect_err == 0) { 147225039b37SCy Schubert /* try again later */ 147325039b37SCy Schubert return; 147425039b37SCy Schubert } 147525039b37SCy Schubert /* nonblocking connect check passed, continue */ 147625039b37SCy Schubert } 147725039b37SCy Schubert 147825039b37SCy Schubert #ifdef HAVE_SSL 
147925039b37SCy Schubert if(dtio->ssl && 148025039b37SCy Schubert (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 148125039b37SCy Schubert if(!dtio_ssl_handshake(dtio, NULL)) 148225039b37SCy Schubert return; 148325039b37SCy Schubert } 148425039b37SCy Schubert #endif 148525039b37SCy Schubert 148625039b37SCy Schubert if((bits&UB_EV_READ || dtio->ssl_brief_write)) { 148725039b37SCy Schubert if(dtio->ssl_brief_write) 148825039b37SCy Schubert (void)dtio_disable_brief_write(dtio); 148925039b37SCy Schubert if(dtio->ready_frame_sent && !dtio->accept_frame_received) { 149025039b37SCy Schubert if(dtio_read_accept_frame(dtio) <= 0) 149125039b37SCy Schubert return; 149225039b37SCy Schubert } else if(!dtio_check_close(dtio)) 149325039b37SCy Schubert return; 149425039b37SCy Schubert } 149525039b37SCy Schubert 149625039b37SCy Schubert /* loop to process a number of messages. This improves throughput, 149725039b37SCy Schubert * because selecting on write-event if not needed for busy messages 149825039b37SCy Schubert * (dnstap log) generation and if they need to all be written back. 149925039b37SCy Schubert * The write event is usually not blocked up. But not forever, 150025039b37SCy Schubert * because the event loop needs to stay responsive for other events. 150125039b37SCy Schubert * If there are no (more) messages, or if the output buffers get 150225039b37SCy Schubert * full, it returns out of the loop. 
*/ 150325039b37SCy Schubert for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) { 150425039b37SCy Schubert /* see if there are messages that need writing */ 150525039b37SCy Schubert if(!dtio->cur_msg) { 150625039b37SCy Schubert if(!dtio_find_msg(dtio)) { 150725039b37SCy Schubert if(i == 0) { 150825039b37SCy Schubert /* no messages on the first iteration, 150925039b37SCy Schubert * the queues are all empty */ 151025039b37SCy Schubert dtio_sleep(dtio); 151125039b37SCy Schubert } 151225039b37SCy Schubert return; /* nothing to do */ 151325039b37SCy Schubert } 151425039b37SCy Schubert } 151525039b37SCy Schubert 151625039b37SCy Schubert /* write it */ 151725039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) { 151825039b37SCy Schubert if(!dtio_write_more(dtio)) 151925039b37SCy Schubert return; 152025039b37SCy Schubert } 152125039b37SCy Schubert 152225039b37SCy Schubert /* done with the current message */ 152325039b37SCy Schubert dtio_cur_msg_free(dtio); 152425039b37SCy Schubert 152525039b37SCy Schubert /* If this is a bidirectional stream the first message will be 152625039b37SCy Schubert * the READY control frame. We can only continue writing after 152725039b37SCy Schubert * receiving an ACCEPT control frame. 
*/ 152825039b37SCy Schubert if(dtio->is_bidirectional && !dtio->ready_frame_sent) { 152925039b37SCy Schubert dtio->ready_frame_sent = 1; 153025039b37SCy Schubert (void)dtio_add_output_event_read(dtio); 153125039b37SCy Schubert break; 153225039b37SCy Schubert } 153325039b37SCy Schubert } 153425039b37SCy Schubert } 153525039b37SCy Schubert 153625039b37SCy Schubert /** callback for the dnstap commandpipe, to stop the dnstap IO */ 153725039b37SCy Schubert void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) 153825039b37SCy Schubert { 153925039b37SCy Schubert struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 154025039b37SCy Schubert uint8_t cmd; 154125039b37SCy Schubert ssize_t r; 154225039b37SCy Schubert if(dtio->want_to_exit) 154325039b37SCy Schubert return; 154425039b37SCy Schubert r = read(fd, &cmd, sizeof(cmd)); 154525039b37SCy Schubert if(r == -1) { 154625039b37SCy Schubert #ifndef USE_WINSOCK 154725039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 154825039b37SCy Schubert return; /* ignore this */ 154925039b37SCy Schubert #else 155025039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 155125039b37SCy Schubert return; 155225039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) 155325039b37SCy Schubert return; 155425039b37SCy Schubert #endif 1555c0caa2e2SCy Schubert log_err("dnstap io: failed to read: %s", sock_strerror(errno)); 155625039b37SCy Schubert /* and then fall through to quit the thread */ 155725039b37SCy Schubert } else if(r == 0) { 155825039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel closed"); 155925039b37SCy Schubert } else if(r == 1 && cmd == DTIO_COMMAND_STOP) { 156025039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); 156125039b37SCy Schubert } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) { 156225039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup"); 156325039b37SCy Schubert 156425039b37SCy Schubert if(dtio->is_bidirectional && !dtio->accept_frame_received) { 
156525039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, " 156625039b37SCy Schubert "waiting for ACCEPT control frame"); 156725039b37SCy Schubert return; 156825039b37SCy Schubert } 156925039b37SCy Schubert 157025039b37SCy Schubert /* reregister event */ 157125039b37SCy Schubert if(!dtio_add_output_event_write(dtio)) 157225039b37SCy Schubert return; 157325039b37SCy Schubert return; 157425039b37SCy Schubert } else if(r == 1) { 157525039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel unknown command"); 157625039b37SCy Schubert } 157725039b37SCy Schubert dtio->want_to_exit = 1; 157825039b37SCy Schubert if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base) 157925039b37SCy Schubert != 0) { 158025039b37SCy Schubert log_err("dnstap io: could not loopexit"); 158125039b37SCy Schubert } 158225039b37SCy Schubert } 158325039b37SCy Schubert 158425039b37SCy Schubert #ifndef THREADS_DISABLED 158525039b37SCy Schubert /** setup the event base for the dnstap io thread */ 158625039b37SCy Schubert static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs, 158725039b37SCy Schubert struct timeval* now) 158825039b37SCy Schubert { 158925039b37SCy Schubert memset(now, 0, sizeof(*now)); 159025039b37SCy Schubert dtio->event_base = ub_default_event_base(0, secs, now); 159125039b37SCy Schubert if(!dtio->event_base) { 159225039b37SCy Schubert fatal_exit("dnstap io: could not create event_base"); 159325039b37SCy Schubert } 159425039b37SCy Schubert } 159525039b37SCy Schubert #endif /* THREADS_DISABLED */ 159625039b37SCy Schubert 159725039b37SCy Schubert /** setup the cmd event for dnstap io */ 159825039b37SCy Schubert static void dtio_setup_cmd(struct dt_io_thread* dtio) 159925039b37SCy Schubert { 160025039b37SCy Schubert struct ub_event* cmdev; 160125039b37SCy Schubert fd_set_nonblock(dtio->commandpipe[0]); 160225039b37SCy Schubert cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0], 160325039b37SCy Schubert UB_EV_READ | UB_EV_PERSIST, 
&dtio_cmd_cb, dtio); 160425039b37SCy Schubert if(!cmdev) { 160525039b37SCy Schubert fatal_exit("dnstap io: out of memory"); 160625039b37SCy Schubert } 160725039b37SCy Schubert dtio->command_event = cmdev; 160825039b37SCy Schubert if(ub_event_add(cmdev, NULL) != 0) { 160925039b37SCy Schubert fatal_exit("dnstap io: out of memory (adding event)"); 161025039b37SCy Schubert } 161125039b37SCy Schubert } 161225039b37SCy Schubert 161325039b37SCy Schubert /** setup the reconnect event for dnstap io */ 161425039b37SCy Schubert static void dtio_setup_reconnect(struct dt_io_thread* dtio) 161525039b37SCy Schubert { 161625039b37SCy Schubert dtio_reconnect_clear(dtio); 161725039b37SCy Schubert dtio->reconnect_timer = ub_event_new(dtio->event_base, -1, 161825039b37SCy Schubert UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio); 161925039b37SCy Schubert if(!dtio->reconnect_timer) { 162025039b37SCy Schubert fatal_exit("dnstap io: out of memory"); 162125039b37SCy Schubert } 162225039b37SCy Schubert } 162325039b37SCy Schubert 162425039b37SCy Schubert /** 162525039b37SCy Schubert * structure to keep track of information during stop flush 162625039b37SCy Schubert */ 162725039b37SCy Schubert struct stop_flush_info { 162825039b37SCy Schubert /** the event base during stop flush */ 162925039b37SCy Schubert struct ub_event_base* base; 163025039b37SCy Schubert /** did we already want to exit this stop-flush event base */ 163125039b37SCy Schubert int want_to_exit_flush; 163225039b37SCy Schubert /** has the timer fired */ 163325039b37SCy Schubert int timer_done; 163425039b37SCy Schubert /** the dtio */ 163525039b37SCy Schubert struct dt_io_thread* dtio; 163625039b37SCy Schubert /** the stop control frame */ 163725039b37SCy Schubert void* stop_frame; 163825039b37SCy Schubert /** length of the stop frame */ 163925039b37SCy Schubert size_t stop_frame_len; 164025039b37SCy Schubert /** how much we have done of the stop frame */ 164125039b37SCy Schubert size_t stop_frame_done; 164225039b37SCy Schubert 
}; 164325039b37SCy Schubert 164425039b37SCy Schubert /** exit the stop flush base */ 164525039b37SCy Schubert static void dtio_stop_flush_exit(struct stop_flush_info* info) 164625039b37SCy Schubert { 164725039b37SCy Schubert if(info->want_to_exit_flush) 164825039b37SCy Schubert return; 164925039b37SCy Schubert info->want_to_exit_flush = 1; 165025039b37SCy Schubert if(ub_event_base_loopexit(info->base) != 0) { 165125039b37SCy Schubert log_err("dnstap io: could not loopexit"); 165225039b37SCy Schubert } 165325039b37SCy Schubert } 165425039b37SCy Schubert 165525039b37SCy Schubert /** send the stop control, 165625039b37SCy Schubert * return true if completed the frame. */ 165725039b37SCy Schubert static int dtio_control_stop_send(struct stop_flush_info* info) 165825039b37SCy Schubert { 165925039b37SCy Schubert struct dt_io_thread* dtio = info->dtio; 166025039b37SCy Schubert int r; 166125039b37SCy Schubert if(info->stop_frame_done >= info->stop_frame_len) 166225039b37SCy Schubert return 1; 166325039b37SCy Schubert r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) + 166425039b37SCy Schubert info->stop_frame_done, info->stop_frame_len - 166525039b37SCy Schubert info->stop_frame_done); 166625039b37SCy Schubert if(r == -1) { 166725039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush: output closed"); 166825039b37SCy Schubert dtio_stop_flush_exit(info); 166925039b37SCy Schubert return 0; 167025039b37SCy Schubert } 167125039b37SCy Schubert if(r == 0) { 167225039b37SCy Schubert /* try again later, or timeout */ 167325039b37SCy Schubert return 0; 167425039b37SCy Schubert } 167525039b37SCy Schubert info->stop_frame_done += r; 167625039b37SCy Schubert if(info->stop_frame_done < info->stop_frame_len) 167725039b37SCy Schubert return 0; /* not done yet */ 167825039b37SCy Schubert return 1; 167925039b37SCy Schubert } 168025039b37SCy Schubert 168125039b37SCy Schubert void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), 168225039b37SCy Schubert void* arg) 
168325039b37SCy Schubert { 168425039b37SCy Schubert struct stop_flush_info* info = (struct stop_flush_info*)arg; 168525039b37SCy Schubert if(info->want_to_exit_flush) 168625039b37SCy Schubert return; 168725039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush"); 168825039b37SCy Schubert info->timer_done = 1; 168925039b37SCy Schubert dtio_stop_flush_exit(info); 169025039b37SCy Schubert } 169125039b37SCy Schubert 169225039b37SCy Schubert void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg) 169325039b37SCy Schubert { 169425039b37SCy Schubert struct stop_flush_info* info = (struct stop_flush_info*)arg; 169525039b37SCy Schubert struct dt_io_thread* dtio = info->dtio; 169625039b37SCy Schubert if(info->want_to_exit_flush) 169725039b37SCy Schubert return; 169825039b37SCy Schubert if(dtio->check_nb_connect) { 169925039b37SCy Schubert /* we don't start the stop_flush if connect still 170025039b37SCy Schubert * in progress, but the check code is here, just in case */ 170125039b37SCy Schubert int connect_err = dtio_check_nb_connect(dtio); 170225039b37SCy Schubert if(connect_err == -1) { 170325039b37SCy Schubert /* close the channel, exit the stop flush */ 170425039b37SCy Schubert dtio_stop_flush_exit(info); 170525039b37SCy Schubert dtio_del_output_event(dtio); 170625039b37SCy Schubert dtio_close_output(dtio); 170725039b37SCy Schubert return; 170825039b37SCy Schubert } else if(connect_err == 0) { 170925039b37SCy Schubert /* try again later */ 171025039b37SCy Schubert return; 171125039b37SCy Schubert } 171225039b37SCy Schubert /* nonblocking connect check passed, continue */ 171325039b37SCy Schubert } 171425039b37SCy Schubert #ifdef HAVE_SSL 171525039b37SCy Schubert if(dtio->ssl && 171625039b37SCy Schubert (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 171725039b37SCy Schubert if(!dtio_ssl_handshake(dtio, info)) 171825039b37SCy Schubert return; 171925039b37SCy Schubert } 172025039b37SCy Schubert #endif 172125039b37SCy Schubert 
172225039b37SCy Schubert if((bits&UB_EV_READ)) { 172325039b37SCy Schubert if(!dtio_check_close(dtio)) { 172425039b37SCy Schubert if(dtio->fd == -1) { 172525039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: " 172625039b37SCy Schubert "stop flush: output closed"); 172725039b37SCy Schubert dtio_stop_flush_exit(info); 172825039b37SCy Schubert } 172925039b37SCy Schubert return; 173025039b37SCy Schubert } 173125039b37SCy Schubert } 173225039b37SCy Schubert /* write remainder of last frame */ 173325039b37SCy Schubert if(dtio->cur_msg) { 173425039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) { 173525039b37SCy Schubert if(!dtio_write_more(dtio)) { 173625039b37SCy Schubert if(dtio->fd == -1) { 173725039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: " 173825039b37SCy Schubert "stop flush: output closed"); 173925039b37SCy Schubert dtio_stop_flush_exit(info); 174025039b37SCy Schubert } 174125039b37SCy Schubert return; 174225039b37SCy Schubert } 174325039b37SCy Schubert } 174425039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush completed " 174525039b37SCy Schubert "last frame"); 174625039b37SCy Schubert dtio_cur_msg_free(dtio); 174725039b37SCy Schubert } 174825039b37SCy Schubert /* write stop frame */ 174925039b37SCy Schubert if(info->stop_frame_done < info->stop_frame_len) { 175025039b37SCy Schubert if(!dtio_control_stop_send(info)) 175125039b37SCy Schubert return; 175225039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush completed " 175325039b37SCy Schubert "stop control frame"); 175425039b37SCy Schubert } 175525039b37SCy Schubert /* when last frame and stop frame are sent, exit */ 175625039b37SCy Schubert dtio_stop_flush_exit(info); 175725039b37SCy Schubert } 175825039b37SCy Schubert 175925039b37SCy Schubert /** flush at end, last packet and stop control */ 176025039b37SCy Schubert static void dtio_control_stop_flush(struct dt_io_thread* dtio) 176125039b37SCy Schubert { 176225039b37SCy Schubert /* briefly attempt to flush the previous packet to 
the output, 176325039b37SCy Schubert * this could be a partial packet, or even the start control frame */ 176425039b37SCy Schubert time_t secs = 0; 176525039b37SCy Schubert struct timeval now; 176625039b37SCy Schubert struct stop_flush_info info; 176725039b37SCy Schubert struct timeval tv; 176825039b37SCy Schubert struct ub_event* timer, *stopev; 176925039b37SCy Schubert 177025039b37SCy Schubert if(dtio->fd == -1 || dtio->check_nb_connect) { 177125039b37SCy Schubert /* no connection or we have just connected, so nothing is 177225039b37SCy Schubert * sent yet, so nothing to stop or flush */ 177325039b37SCy Schubert return; 177425039b37SCy Schubert } 177525039b37SCy Schubert if(dtio->ssl && !dtio->ssl_handshake_done) { 177625039b37SCy Schubert /* no SSL connection has been established yet */ 177725039b37SCy Schubert return; 177825039b37SCy Schubert } 177925039b37SCy Schubert 178025039b37SCy Schubert memset(&info, 0, sizeof(info)); 178125039b37SCy Schubert memset(&now, 0, sizeof(now)); 178225039b37SCy Schubert info.dtio = dtio; 178325039b37SCy Schubert info.base = ub_default_event_base(0, &secs, &now); 178425039b37SCy Schubert if(!info.base) { 178525039b37SCy Schubert log_err("dnstap io: malloc failure"); 178625039b37SCy Schubert return; 178725039b37SCy Schubert } 178825039b37SCy Schubert timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT, 178925039b37SCy Schubert &dtio_stop_timer_cb, &info); 179025039b37SCy Schubert if(!timer) { 179125039b37SCy Schubert log_err("dnstap io: malloc failure"); 179225039b37SCy Schubert ub_event_base_free(info.base); 179325039b37SCy Schubert return; 179425039b37SCy Schubert } 179525039b37SCy Schubert memset(&tv, 0, sizeof(tv)); 179625039b37SCy Schubert tv.tv_sec = 2; 179725039b37SCy Schubert if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info, 179825039b37SCy Schubert &tv) != 0) { 179925039b37SCy Schubert log_err("dnstap io: cannot event_timer_add"); 180025039b37SCy Schubert ub_event_free(timer); 180125039b37SCy Schubert 
ub_event_base_free(info.base); 180225039b37SCy Schubert return; 180325039b37SCy Schubert } 180425039b37SCy Schubert stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ | 180525039b37SCy Schubert UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info); 180625039b37SCy Schubert if(!stopev) { 180725039b37SCy Schubert log_err("dnstap io: malloc failure"); 180825039b37SCy Schubert ub_timer_del(timer); 180925039b37SCy Schubert ub_event_free(timer); 181025039b37SCy Schubert ub_event_base_free(info.base); 181125039b37SCy Schubert return; 181225039b37SCy Schubert } 181325039b37SCy Schubert if(ub_event_add(stopev, NULL) != 0) { 181425039b37SCy Schubert log_err("dnstap io: cannot event_add"); 181525039b37SCy Schubert ub_event_free(stopev); 181625039b37SCy Schubert ub_timer_del(timer); 181725039b37SCy Schubert ub_event_free(timer); 181825039b37SCy Schubert ub_event_base_free(info.base); 181925039b37SCy Schubert return; 182025039b37SCy Schubert } 182125039b37SCy Schubert info.stop_frame = fstrm_create_control_frame_stop( 182225039b37SCy Schubert &info.stop_frame_len); 182325039b37SCy Schubert if(!info.stop_frame) { 182425039b37SCy Schubert log_err("dnstap io: malloc failure"); 182525039b37SCy Schubert ub_event_del(stopev); 182625039b37SCy Schubert ub_event_free(stopev); 182725039b37SCy Schubert ub_timer_del(timer); 182825039b37SCy Schubert ub_event_free(timer); 182925039b37SCy Schubert ub_event_base_free(info.base); 183025039b37SCy Schubert return; 183125039b37SCy Schubert } 183225039b37SCy Schubert dtio->stop_flush_event = stopev; 183325039b37SCy Schubert 183425039b37SCy Schubert /* wait briefly, or until finished */ 183525039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush started"); 183625039b37SCy Schubert if(ub_event_base_dispatch(info.base) < 0) { 183725039b37SCy Schubert log_err("dnstap io: dispatch flush failed, errno is %s", 183825039b37SCy Schubert strerror(errno)); 183925039b37SCy Schubert } 184025039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush 
ended"); 184125039b37SCy Schubert free(info.stop_frame); 184225039b37SCy Schubert dtio->stop_flush_event = NULL; 184325039b37SCy Schubert ub_event_del(stopev); 184425039b37SCy Schubert ub_event_free(stopev); 184525039b37SCy Schubert ub_timer_del(timer); 184625039b37SCy Schubert ub_event_free(timer); 184725039b37SCy Schubert ub_event_base_free(info.base); 184825039b37SCy Schubert } 184925039b37SCy Schubert 185025039b37SCy Schubert /** perform desetup and free stuff when the dnstap io thread exits */ 185125039b37SCy Schubert static void dtio_desetup(struct dt_io_thread* dtio) 185225039b37SCy Schubert { 185325039b37SCy Schubert dtio_control_stop_flush(dtio); 185425039b37SCy Schubert dtio_del_output_event(dtio); 185525039b37SCy Schubert dtio_close_output(dtio); 185625039b37SCy Schubert ub_event_del(dtio->command_event); 185725039b37SCy Schubert ub_event_free(dtio->command_event); 185825039b37SCy Schubert #ifndef USE_WINSOCK 185925039b37SCy Schubert close(dtio->commandpipe[0]); 186025039b37SCy Schubert #else 186125039b37SCy Schubert _close(dtio->commandpipe[0]); 186225039b37SCy Schubert #endif 186325039b37SCy Schubert dtio->commandpipe[0] = -1; 186425039b37SCy Schubert dtio_reconnect_del(dtio); 186525039b37SCy Schubert ub_event_free(dtio->reconnect_timer); 186625039b37SCy Schubert dtio_cur_msg_free(dtio); 186725039b37SCy Schubert #ifndef THREADS_DISABLED 186825039b37SCy Schubert ub_event_base_free(dtio->event_base); 186925039b37SCy Schubert #endif 187025039b37SCy Schubert } 187125039b37SCy Schubert 187225039b37SCy Schubert /** setup a start control message */ 187325039b37SCy Schubert static int dtio_control_start_send(struct dt_io_thread* dtio) 187425039b37SCy Schubert { 187525039b37SCy Schubert log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 187625039b37SCy Schubert dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE, 187725039b37SCy Schubert &dtio->cur_msg_len); 187825039b37SCy Schubert if(!dtio->cur_msg) { 187925039b37SCy Schubert return 
0; 188025039b37SCy Schubert } 188125039b37SCy Schubert /* setup to send the control message */ 188225039b37SCy Schubert /* set that the buffer needs to be sent, but the length 188325039b37SCy Schubert * of that buffer is already written, that way the buffer can 188425039b37SCy Schubert * start with 0 length and then the length of the control frame 188525039b37SCy Schubert * in it */ 188625039b37SCy Schubert dtio->cur_msg_done = 0; 188725039b37SCy Schubert dtio->cur_msg_len_done = 4; 188825039b37SCy Schubert return 1; 188925039b37SCy Schubert } 189025039b37SCy Schubert 189125039b37SCy Schubert /** setup a ready control message */ 189225039b37SCy Schubert static int dtio_control_ready_send(struct dt_io_thread* dtio) 189325039b37SCy Schubert { 189425039b37SCy Schubert log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 189525039b37SCy Schubert dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE, 189625039b37SCy Schubert &dtio->cur_msg_len); 189725039b37SCy Schubert if(!dtio->cur_msg) { 189825039b37SCy Schubert return 0; 189925039b37SCy Schubert } 190025039b37SCy Schubert /* setup to send the control message */ 190125039b37SCy Schubert /* set that the buffer needs to be sent, but the length 190225039b37SCy Schubert * of that buffer is already written, that way the buffer can 190325039b37SCy Schubert * start with 0 length and then the length of the control frame 190425039b37SCy Schubert * in it */ 190525039b37SCy Schubert dtio->cur_msg_done = 0; 190625039b37SCy Schubert dtio->cur_msg_len_done = 4; 190725039b37SCy Schubert return 1; 190825039b37SCy Schubert } 190925039b37SCy Schubert 191025039b37SCy Schubert /** open the output file descriptor for af_local */ 191125039b37SCy Schubert static int dtio_open_output_local(struct dt_io_thread* dtio) 191225039b37SCy Schubert { 191325039b37SCy Schubert #ifdef HAVE_SYS_UN_H 191425039b37SCy Schubert struct sockaddr_un s; 191525039b37SCy Schubert dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0); 191625039b37SCy 
Schubert if(dtio->fd == -1) { 191725039b37SCy Schubert log_err("dnstap io: failed to create socket: %s", 1918c0caa2e2SCy Schubert sock_strerror(errno)); 191925039b37SCy Schubert return 0; 192025039b37SCy Schubert } 192125039b37SCy Schubert memset(&s, 0, sizeof(s)); 192225039b37SCy Schubert #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN 192325039b37SCy Schubert /* this member exists on BSDs, not Linux */ 192425039b37SCy Schubert s.sun_len = (unsigned)sizeof(s); 192525039b37SCy Schubert #endif 192625039b37SCy Schubert s.sun_family = AF_LOCAL; 192725039b37SCy Schubert /* length is 92-108, 104 on FreeBSD */ 192825039b37SCy Schubert (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); 192925039b37SCy Schubert fd_set_nonblock(dtio->fd); 193025039b37SCy Schubert if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) 193125039b37SCy Schubert == -1) { 193225039b37SCy Schubert char* to = dtio->socket_path; 1933c0caa2e2SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1934c0caa2e2SCy Schubert verbosity < 4) { 1935c0caa2e2SCy Schubert dtio_close_fd(dtio); 1936c0caa2e2SCy Schubert return 0; /* no log retries on low verbosity */ 1937c0caa2e2SCy Schubert } 193825039b37SCy Schubert log_err("dnstap io: failed to connect to \"%s\": %s", 1939c0caa2e2SCy Schubert to, sock_strerror(errno)); 194025039b37SCy Schubert dtio_close_fd(dtio); 194125039b37SCy Schubert return 0; 194225039b37SCy Schubert } 194325039b37SCy Schubert return 1; 194425039b37SCy Schubert #else 194525039b37SCy Schubert log_err("cannot create af_local socket"); 194625039b37SCy Schubert return 0; 194725039b37SCy Schubert #endif /* HAVE_SYS_UN_H */ 194825039b37SCy Schubert } 194925039b37SCy Schubert 195025039b37SCy Schubert /** open the output file descriptor for af_inet and af_inet6 */ 195125039b37SCy Schubert static int dtio_open_output_tcp(struct dt_io_thread* dtio) 195225039b37SCy Schubert { 195325039b37SCy Schubert struct sockaddr_storage addr; 195425039b37SCy Schubert socklen_t 
addrlen; 195525039b37SCy Schubert memset(&addr, 0, sizeof(addr)); 195625039b37SCy Schubert addrlen = (socklen_t)sizeof(addr); 195725039b37SCy Schubert 195825039b37SCy Schubert if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) { 195925039b37SCy Schubert log_err("could not parse IP '%s'", dtio->ip_str); 196025039b37SCy Schubert return 0; 196125039b37SCy Schubert } 196225039b37SCy Schubert dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0); 196325039b37SCy Schubert if(dtio->fd == -1) { 1964c0caa2e2SCy Schubert log_err("can't create socket: %s", sock_strerror(errno)); 196525039b37SCy Schubert return 0; 196625039b37SCy Schubert } 196725039b37SCy Schubert fd_set_nonblock(dtio->fd); 196825039b37SCy Schubert if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) { 196925039b37SCy Schubert if(errno == EINPROGRESS) 197025039b37SCy Schubert return 1; /* wait until connect done*/ 1971c0caa2e2SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1972c0caa2e2SCy Schubert verbosity < 4) { 1973c0caa2e2SCy Schubert dtio_close_fd(dtio); 1974c0caa2e2SCy Schubert return 0; /* no log retries on low verbosity */ 1975c0caa2e2SCy Schubert } 197625039b37SCy Schubert #ifndef USE_WINSOCK 197725039b37SCy Schubert if(tcp_connect_errno_needs_log( 197825039b37SCy Schubert (struct sockaddr *)&addr, addrlen)) { 197925039b37SCy Schubert log_err("dnstap io: failed to connect to %s: %s", 198025039b37SCy Schubert dtio->ip_str, strerror(errno)); 198125039b37SCy Schubert } 198225039b37SCy Schubert #else 198325039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS || 198425039b37SCy Schubert WSAGetLastError() == WSAEWOULDBLOCK) 198525039b37SCy Schubert return 1; /* wait until connect done*/ 198625039b37SCy Schubert if(tcp_connect_errno_needs_log( 198725039b37SCy Schubert (struct sockaddr *)&addr, addrlen)) { 198825039b37SCy Schubert log_err("dnstap io: failed to connect to %s: %s", 198925039b37SCy Schubert dtio->ip_str, wsa_strerror(WSAGetLastError())); 199025039b37SCy Schubert 
} 199125039b37SCy Schubert #endif 199225039b37SCy Schubert dtio_close_fd(dtio); 199325039b37SCy Schubert return 0; 199425039b37SCy Schubert } 199525039b37SCy Schubert return 1; 199625039b37SCy Schubert } 199725039b37SCy Schubert 199825039b37SCy Schubert /** setup the SSL structure for new connection */ 199925039b37SCy Schubert static int dtio_setup_ssl(struct dt_io_thread* dtio) 200025039b37SCy Schubert { 200125039b37SCy Schubert dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd); 200225039b37SCy Schubert if(!dtio->ssl) return 0; 200325039b37SCy Schubert dtio->ssl_handshake_done = 0; 200425039b37SCy Schubert dtio->ssl_brief_read = 0; 200525039b37SCy Schubert 200625039b37SCy Schubert if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name, 200725039b37SCy Schubert dtio->tls_use_sni)) { 200825039b37SCy Schubert return 0; 200925039b37SCy Schubert } 201025039b37SCy Schubert return 1; 201125039b37SCy Schubert } 201225039b37SCy Schubert 201325039b37SCy Schubert /** open the output file descriptor */ 201425039b37SCy Schubert static void dtio_open_output(struct dt_io_thread* dtio) 201525039b37SCy Schubert { 201625039b37SCy Schubert struct ub_event* ev; 201725039b37SCy Schubert if(dtio->upstream_is_unix) { 201825039b37SCy Schubert if(!dtio_open_output_local(dtio)) { 201925039b37SCy Schubert dtio_reconnect_enable(dtio); 202025039b37SCy Schubert return; 202125039b37SCy Schubert } 202225039b37SCy Schubert } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 202325039b37SCy Schubert if(!dtio_open_output_tcp(dtio)) { 202425039b37SCy Schubert dtio_reconnect_enable(dtio); 202525039b37SCy Schubert return; 202625039b37SCy Schubert } 202725039b37SCy Schubert if(dtio->upstream_is_tls) { 202825039b37SCy Schubert if(!dtio_setup_ssl(dtio)) { 202925039b37SCy Schubert dtio_close_fd(dtio); 203025039b37SCy Schubert dtio_reconnect_enable(dtio); 203125039b37SCy Schubert return; 203225039b37SCy Schubert } 203325039b37SCy Schubert } 203425039b37SCy Schubert } 203525039b37SCy Schubert 
dtio->check_nb_connect = 1; 203625039b37SCy Schubert 203725039b37SCy Schubert /* the EV_READ is to read ACCEPT control messages, and catch channel 203825039b37SCy Schubert * close. EV_WRITE is to write packets */ 203925039b37SCy Schubert ev = ub_event_new(dtio->event_base, dtio->fd, 204025039b37SCy Schubert UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb, 204125039b37SCy Schubert dtio); 204225039b37SCy Schubert if(!ev) { 204325039b37SCy Schubert log_err("dnstap io: out of memory"); 204425039b37SCy Schubert if(dtio->ssl) { 204525039b37SCy Schubert #ifdef HAVE_SSL 204625039b37SCy Schubert SSL_free(dtio->ssl); 204725039b37SCy Schubert dtio->ssl = NULL; 204825039b37SCy Schubert #endif 204925039b37SCy Schubert } 205025039b37SCy Schubert dtio_close_fd(dtio); 205125039b37SCy Schubert dtio_reconnect_enable(dtio); 205225039b37SCy Schubert return; 205325039b37SCy Schubert } 205425039b37SCy Schubert dtio->event = ev; 205525039b37SCy Schubert 205625039b37SCy Schubert /* setup protocol control message to start */ 205725039b37SCy Schubert if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) || 205825039b37SCy Schubert (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) { 205925039b37SCy Schubert log_err("dnstap io: out of memory"); 206025039b37SCy Schubert ub_event_free(dtio->event); 206125039b37SCy Schubert dtio->event = NULL; 206225039b37SCy Schubert if(dtio->ssl) { 206325039b37SCy Schubert #ifdef HAVE_SSL 206425039b37SCy Schubert SSL_free(dtio->ssl); 206525039b37SCy Schubert dtio->ssl = NULL; 206625039b37SCy Schubert #endif 206725039b37SCy Schubert } 206825039b37SCy Schubert dtio_close_fd(dtio); 206925039b37SCy Schubert dtio_reconnect_enable(dtio); 207025039b37SCy Schubert return; 207125039b37SCy Schubert } 207225039b37SCy Schubert } 207325039b37SCy Schubert 207425039b37SCy Schubert /** perform the setup of the writer thread on the established event_base */ 207525039b37SCy Schubert static void dtio_setup_on_base(struct dt_io_thread* dtio) 
207625039b37SCy Schubert { 207725039b37SCy Schubert dtio_setup_cmd(dtio); 207825039b37SCy Schubert dtio_setup_reconnect(dtio); 207925039b37SCy Schubert dtio_open_output(dtio); 208025039b37SCy Schubert if(!dtio_add_output_event_write(dtio)) 208125039b37SCy Schubert return; 208225039b37SCy Schubert } 208325039b37SCy Schubert 208425039b37SCy Schubert #ifndef THREADS_DISABLED 208525039b37SCy Schubert /** the IO thread function for the DNSTAP IO */ 208625039b37SCy Schubert static void* dnstap_io(void* arg) 208725039b37SCy Schubert { 208825039b37SCy Schubert struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 208925039b37SCy Schubert time_t secs = 0; 209025039b37SCy Schubert struct timeval now; 209125039b37SCy Schubert log_thread_set(&dtio->threadnum); 209225039b37SCy Schubert 209325039b37SCy Schubert /* setup */ 209425039b37SCy Schubert verbose(VERB_ALGO, "start dnstap io thread"); 209525039b37SCy Schubert dtio_setup_base(dtio, &secs, &now); 209625039b37SCy Schubert dtio_setup_on_base(dtio); 209725039b37SCy Schubert 209825039b37SCy Schubert /* run */ 209925039b37SCy Schubert if(ub_event_base_dispatch(dtio->event_base) < 0) { 210025039b37SCy Schubert log_err("dnstap io: dispatch failed, errno is %s", 210125039b37SCy Schubert strerror(errno)); 210225039b37SCy Schubert } 210325039b37SCy Schubert 210425039b37SCy Schubert /* cleanup */ 210525039b37SCy Schubert verbose(VERB_ALGO, "stop dnstap io thread"); 210625039b37SCy Schubert dtio_desetup(dtio); 210725039b37SCy Schubert return NULL; 210825039b37SCy Schubert } 210925039b37SCy Schubert #endif /* THREADS_DISABLED */ 211025039b37SCy Schubert 211125039b37SCy Schubert int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, 211225039b37SCy Schubert int numworkers) 211325039b37SCy Schubert { 211425039b37SCy Schubert /* set up the thread, can fail */ 211525039b37SCy Schubert #ifndef USE_WINSOCK 211625039b37SCy Schubert if(pipe(dtio->commandpipe) == -1) { 211725039b37SCy Schubert log_err("failed to create pipe: 
%s", strerror(errno)); 211825039b37SCy Schubert return 0; 211925039b37SCy Schubert } 212025039b37SCy Schubert #else 212125039b37SCy Schubert if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) { 212225039b37SCy Schubert log_err("failed to create _pipe: %s", 212325039b37SCy Schubert wsa_strerror(WSAGetLastError())); 212425039b37SCy Schubert return 0; 212525039b37SCy Schubert } 212625039b37SCy Schubert #endif 212725039b37SCy Schubert 212825039b37SCy Schubert /* start the thread */ 212925039b37SCy Schubert dtio->threadnum = numworkers+1; 213025039b37SCy Schubert dtio->started = 1; 213125039b37SCy Schubert #ifndef THREADS_DISABLED 213225039b37SCy Schubert ub_thread_create(&dtio->tid, dnstap_io, dtio); 213325039b37SCy Schubert (void)event_base_nothr; 213425039b37SCy Schubert #else 213525039b37SCy Schubert dtio->event_base = event_base_nothr; 213625039b37SCy Schubert dtio_setup_on_base(dtio); 213725039b37SCy Schubert #endif 213825039b37SCy Schubert return 1; 213925039b37SCy Schubert } 214025039b37SCy Schubert 214125039b37SCy Schubert void dt_io_thread_stop(struct dt_io_thread* dtio) 214225039b37SCy Schubert { 214325039b37SCy Schubert #ifndef THREADS_DISABLED 214425039b37SCy Schubert uint8_t cmd = DTIO_COMMAND_STOP; 214525039b37SCy Schubert #endif 214625039b37SCy Schubert if(!dtio) return; 214725039b37SCy Schubert if(!dtio->started) return; 214825039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: send stop cmd"); 214925039b37SCy Schubert 215025039b37SCy Schubert #ifndef THREADS_DISABLED 215125039b37SCy Schubert while(1) { 215225039b37SCy Schubert ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 215325039b37SCy Schubert if(r == -1) { 215425039b37SCy Schubert #ifndef USE_WINSOCK 215525039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 215625039b37SCy Schubert continue; 215725039b37SCy Schubert #else 215825039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 215925039b37SCy Schubert continue; 216025039b37SCy Schubert if(WSAGetLastError() == 
WSAEWOULDBLOCK) 216125039b37SCy Schubert continue; 216225039b37SCy Schubert #endif 2163c0caa2e2SCy Schubert log_err("dnstap io stop: write: %s", 2164c0caa2e2SCy Schubert sock_strerror(errno)); 216525039b37SCy Schubert break; 216625039b37SCy Schubert } 216725039b37SCy Schubert break; 216825039b37SCy Schubert } 216925039b37SCy Schubert dtio->started = 0; 217025039b37SCy Schubert #endif /* THREADS_DISABLED */ 217125039b37SCy Schubert 217225039b37SCy Schubert #ifndef USE_WINSOCK 217325039b37SCy Schubert close(dtio->commandpipe[1]); 217425039b37SCy Schubert #else 217525039b37SCy Schubert _close(dtio->commandpipe[1]); 217625039b37SCy Schubert #endif 217725039b37SCy Schubert dtio->commandpipe[1] = -1; 217825039b37SCy Schubert #ifndef THREADS_DISABLED 217925039b37SCy Schubert ub_thread_join(dtio->tid); 218025039b37SCy Schubert #else 218125039b37SCy Schubert dtio->want_to_exit = 1; 218225039b37SCy Schubert dtio_desetup(dtio); 218325039b37SCy Schubert #endif 218425039b37SCy Schubert } 2185