125039b37SCy Schubert /* 225039b37SCy Schubert * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP 325039b37SCy Schubert * 425039b37SCy Schubert * Copyright (c) 2020, NLnet Labs. All rights reserved. 525039b37SCy Schubert * 625039b37SCy Schubert * This software is open source. 725039b37SCy Schubert * 825039b37SCy Schubert * Redistribution and use in source and binary forms, with or without 925039b37SCy Schubert * modification, are permitted provided that the following conditions 1025039b37SCy Schubert * are met: 1125039b37SCy Schubert * 1225039b37SCy Schubert * Redistributions of source code must retain the above copyright notice, 1325039b37SCy Schubert * this list of conditions and the following disclaimer. 1425039b37SCy Schubert * 1525039b37SCy Schubert * Redistributions in binary form must reproduce the above copyright notice, 1625039b37SCy Schubert * this list of conditions and the following disclaimer in the documentation 1725039b37SCy Schubert * and/or other materials provided with the distribution. 1825039b37SCy Schubert * 1925039b37SCy Schubert * Neither the name of the NLNET LABS nor the names of its contributors may 2025039b37SCy Schubert * be used to endorse or promote products derived from this software without 2125039b37SCy Schubert * specific prior written permission. 2225039b37SCy Schubert * 2325039b37SCy Schubert * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 2425039b37SCy Schubert * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 2525039b37SCy Schubert * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 2625039b37SCy Schubert * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 2725039b37SCy Schubert * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 2825039b37SCy Schubert * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 2925039b37SCy Schubert * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 3025039b37SCy Schubert * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 3125039b37SCy Schubert * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 3225039b37SCy Schubert * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 3325039b37SCy Schubert * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 3425039b37SCy Schubert * 3525039b37SCy Schubert */ 3625039b37SCy Schubert 3725039b37SCy Schubert /** 3825039b37SCy Schubert * \file 3925039b37SCy Schubert * 4025039b37SCy Schubert * An implementation of the Frame Streams data transport protocol for 4125039b37SCy Schubert * the Unbound DNSTAP message logging facility. 4225039b37SCy Schubert */ 4325039b37SCy Schubert 4425039b37SCy Schubert #include "config.h" 4525039b37SCy Schubert #include "dnstap/dtstream.h" 4625039b37SCy Schubert #include "dnstap/dnstap_fstrm.h" 4725039b37SCy Schubert #include "util/config_file.h" 4825039b37SCy Schubert #include "util/ub_event.h" 4925039b37SCy Schubert #include "util/net_help.h" 5025039b37SCy Schubert #include "services/outside_network.h" 5125039b37SCy Schubert #include "sldns/sbuffer.h" 5225039b37SCy Schubert #ifdef HAVE_SYS_UN_H 5325039b37SCy Schubert #include <sys/un.h> 5425039b37SCy Schubert #endif 5525039b37SCy Schubert #include <fcntl.h> 5625039b37SCy Schubert #ifdef HAVE_OPENSSL_SSL_H 5725039b37SCy Schubert #include <openssl/ssl.h> 5825039b37SCy Schubert #endif 5925039b37SCy Schubert #ifdef HAVE_OPENSSL_ERR_H 6025039b37SCy Schubert #include <openssl/err.h> 6125039b37SCy Schubert #endif 6225039b37SCy Schubert 6325039b37SCy Schubert /** number of messages to process in one output callback */ 6425039b37SCy Schubert #define DTIO_MESSAGES_PER_CALLBACK 100 6525039b37SCy Schubert /** the msec to wait for reconnect (if not immediate, the first attempt) */ 6625039b37SCy Schubert #define DTIO_RECONNECT_TIMEOUT_MIN 10 6725039b37SCy Schubert /** the msec to wait for reconnect max after backoff */ 6825039b37SCy Schubert #define DTIO_RECONNECT_TIMEOUT_MAX 1000 6925039b37SCy Schubert /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */ 7025039b37SCy Schubert #define DTIO_RECONNECT_TIMEOUT_SLOW 1000 71*c0caa2e2SCy Schubert /** number of messages before wakeup of thread */ 72*c0caa2e2SCy Schubert #define DTIO_MSG_FOR_WAKEUP 32 7325039b37SCy Schubert 7425039b37SCy Schubert /** maximum length of received frame */ 7525039b37SCy Schubert #define DTIO_RECV_FRAME_MAX_LEN 1000 7625039b37SCy Schubert 7725039b37SCy Schubert struct stop_flush_info; 7825039b37SCy Schubert /** DTIO command channel commands */ 7925039b37SCy Schubert enum { 8025039b37SCy Schubert /** DTIO command channel stop */ 8125039b37SCy Schubert DTIO_COMMAND_STOP = 0, 8225039b37SCy Schubert /** DTIO command channel wakeup */ 8325039b37SCy Schubert DTIO_COMMAND_WAKEUP = 1 8425039b37SCy Schubert } dtio_channel_command; 8525039b37SCy Schubert 8625039b37SCy Schubert /** open the output channel */ 8725039b37SCy Schubert static void dtio_open_output(struct dt_io_thread* dtio); 8825039b37SCy Schubert /** add output event for read and write */ 8925039b37SCy Schubert static int dtio_add_output_event_write(struct dt_io_thread* dtio); 9025039b37SCy Schubert /** start reconnection attempts */ 9125039b37SCy Schubert static void dtio_reconnect_enable(struct dt_io_thread* dtio); 9225039b37SCy Schubert /** stop from stop_flush event loop */ 9325039b37SCy Schubert static void dtio_stop_flush_exit(struct stop_flush_info* info); 9425039b37SCy Schubert /** setup a start control message */ 9525039b37SCy Schubert static int dtio_control_start_send(struct dt_io_thread* dtio); 9625039b37SCy Schubert #ifdef HAVE_SSL 9725039b37SCy Schubert /** enable briefly waiting for a read event, for SSL negotiation */ 9825039b37SCy Schubert static int dtio_enable_brief_read(struct dt_io_thread* dtio); 9925039b37SCy Schubert /** enable briefly waiting for a write event, for SSL negotiation */ 10025039b37SCy Schubert static int dtio_enable_brief_write(struct dt_io_thread* dtio); 10125039b37SCy Schubert #endif 10225039b37SCy Schubert 10325039b37SCy Schubert struct dt_msg_queue* 104*c0caa2e2SCy Schubert dt_msg_queue_create(struct comm_base* base) 10525039b37SCy Schubert { 10625039b37SCy Schubert struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); 10725039b37SCy Schubert if(!mq) return NULL; 10825039b37SCy Schubert mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, 10925039b37SCy Schubert about 1 M should contain 64K messages with some overhead, 11025039b37SCy Schubert or a whole bunch smaller ones */ 111*c0caa2e2SCy Schubert mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq); 112*c0caa2e2SCy Schubert if(!mq->wakeup_timer) { 113*c0caa2e2SCy Schubert free(mq); 114*c0caa2e2SCy Schubert return NULL; 115*c0caa2e2SCy Schubert } 11625039b37SCy Schubert lock_basic_init(&mq->lock); 11725039b37SCy Schubert lock_protect(&mq->lock, mq, sizeof(*mq)); 11825039b37SCy Schubert return mq; 11925039b37SCy Schubert } 12025039b37SCy Schubert 12125039b37SCy Schubert /** clear the message list, caller must hold the lock */ 12225039b37SCy Schubert static void 12325039b37SCy Schubert dt_msg_queue_clear(struct dt_msg_queue* mq) 12425039b37SCy Schubert { 12525039b37SCy Schubert struct dt_msg_entry* e = mq->first, *next=NULL; 12625039b37SCy Schubert while(e) { 12725039b37SCy Schubert next = e->next; 12825039b37SCy Schubert free(e->buf); 12925039b37SCy Schubert free(e); 13025039b37SCy Schubert e = next; 13125039b37SCy Schubert } 13225039b37SCy Schubert mq->first = NULL; 13325039b37SCy Schubert mq->last = NULL; 13425039b37SCy Schubert mq->cursize = 0; 135*c0caa2e2SCy Schubert mq->msgcount = 0; 13625039b37SCy Schubert } 13725039b37SCy Schubert 13825039b37SCy Schubert void 13925039b37SCy Schubert dt_msg_queue_delete(struct dt_msg_queue* mq) 14025039b37SCy Schubert { 14125039b37SCy Schubert if(!mq) return; 14225039b37SCy Schubert lock_basic_destroy(&mq->lock); 14325039b37SCy Schubert dt_msg_queue_clear(mq); 144*c0caa2e2SCy Schubert comm_timer_delete(mq->wakeup_timer); 14525039b37SCy Schubert free(mq); 14625039b37SCy Schubert } 14725039b37SCy Schubert 14825039b37SCy Schubert /** make the dtio wake up by sending a wakeup command */ 14925039b37SCy Schubert static void dtio_wakeup(struct dt_io_thread* dtio) 15025039b37SCy Schubert { 15125039b37SCy Schubert uint8_t cmd = DTIO_COMMAND_WAKEUP; 15225039b37SCy Schubert if(!dtio) return; 15325039b37SCy Schubert if(!dtio->started) return; 15425039b37SCy Schubert 15525039b37SCy Schubert while(1) { 15625039b37SCy Schubert ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 15725039b37SCy Schubert if(r == -1) { 15825039b37SCy Schubert #ifndef USE_WINSOCK 15925039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 16025039b37SCy Schubert continue; 16125039b37SCy Schubert #else 16225039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 16325039b37SCy Schubert continue; 16425039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) 16525039b37SCy Schubert continue; 16625039b37SCy Schubert #endif 167*c0caa2e2SCy Schubert log_err("dnstap io wakeup: write: %s", 168*c0caa2e2SCy Schubert sock_strerror(errno)); 16925039b37SCy Schubert break; 17025039b37SCy Schubert } 17125039b37SCy Schubert break; 17225039b37SCy Schubert } 17325039b37SCy Schubert } 17425039b37SCy Schubert 17525039b37SCy Schubert void 176*c0caa2e2SCy Schubert mq_wakeup_cb(void* arg) 177*c0caa2e2SCy Schubert { 178*c0caa2e2SCy Schubert struct dt_msg_queue* mq = (struct dt_msg_queue*)arg; 179*c0caa2e2SCy Schubert /* even if the dtio is already active, because perhaps much 180*c0caa2e2SCy Schubert * traffic suddenly, we leave the timer running to save on 181*c0caa2e2SCy Schubert * managing it, the once a second timer is less work then 182*c0caa2e2SCy Schubert * starting and stopping the timer frequently */ 183*c0caa2e2SCy Schubert lock_basic_lock(&mq->dtio->wakeup_timer_lock); 184*c0caa2e2SCy Schubert mq->dtio->wakeup_timer_enabled = 0; 185*c0caa2e2SCy Schubert lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 186*c0caa2e2SCy Schubert dtio_wakeup(mq->dtio); 187*c0caa2e2SCy Schubert } 188*c0caa2e2SCy Schubert 189*c0caa2e2SCy Schubert /** start timer to wakeup dtio because there is content in the queue */ 190*c0caa2e2SCy Schubert static void 191*c0caa2e2SCy Schubert dt_msg_queue_start_timer(struct dt_msg_queue* mq) 192*c0caa2e2SCy Schubert { 193*c0caa2e2SCy Schubert struct timeval tv; 194*c0caa2e2SCy Schubert /* Start a timer to process messages to be logged. 195*c0caa2e2SCy Schubert * If we woke up the dtio thread for every message, the wakeup 196*c0caa2e2SCy Schubert * messages take up too much processing power. If the queue 197*c0caa2e2SCy Schubert * fills up the wakeup happens immediately. The timer wakes it up 198*c0caa2e2SCy Schubert * if there are infrequent messages to log. */ 199*c0caa2e2SCy Schubert 200*c0caa2e2SCy Schubert /* we cannot start a timer in dtio thread, because it is a different 201*c0caa2e2SCy Schubert * thread and its event base is in use by the other thread, it would 202*c0caa2e2SCy Schubert * give race conditions if we tried to modify its event base, 203*c0caa2e2SCy Schubert * and locks would wait until it woke up, and this is what we do. */ 204*c0caa2e2SCy Schubert 205*c0caa2e2SCy Schubert /* do not start the timer if a timer already exists, perhaps 206*c0caa2e2SCy Schubert * in another worker. So this variable is protected by a lock in 207*c0caa2e2SCy Schubert * dtio */ 208*c0caa2e2SCy Schubert lock_basic_lock(&mq->dtio->wakeup_timer_lock); 209*c0caa2e2SCy Schubert if(mq->dtio->wakeup_timer_enabled) { 210*c0caa2e2SCy Schubert lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 211*c0caa2e2SCy Schubert return; 212*c0caa2e2SCy Schubert } 213*c0caa2e2SCy Schubert mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */ 214*c0caa2e2SCy Schubert lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 215*c0caa2e2SCy Schubert 216*c0caa2e2SCy Schubert /* start the timer, in mq, in the event base of our worker */ 217*c0caa2e2SCy Schubert tv.tv_sec = 1; 218*c0caa2e2SCy Schubert tv.tv_usec = 0; 219*c0caa2e2SCy Schubert comm_timer_set(mq->wakeup_timer, &tv); 220*c0caa2e2SCy Schubert } 221*c0caa2e2SCy Schubert 222*c0caa2e2SCy Schubert void 22325039b37SCy Schubert dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) 22425039b37SCy Schubert { 225*c0caa2e2SCy Schubert int wakeupnow = 0, wakeupstarttimer = 0; 22625039b37SCy Schubert struct dt_msg_entry* entry; 22725039b37SCy Schubert 22825039b37SCy Schubert /* check conditions */ 22925039b37SCy Schubert if(!buf) return; 23025039b37SCy Schubert if(len == 0) { 23125039b37SCy Schubert /* it is not possible to log entries with zero length, 23225039b37SCy Schubert * because the framestream protocol does not carry it. 23325039b37SCy Schubert * However the protobuf serialization does not create zero 23425039b37SCy Schubert * length datagrams for dnstap, so this should not happen. */ 23525039b37SCy Schubert free(buf); 23625039b37SCy Schubert return; 23725039b37SCy Schubert } 23825039b37SCy Schubert if(!mq) { 23925039b37SCy Schubert free(buf); 24025039b37SCy Schubert return; 24125039b37SCy Schubert } 24225039b37SCy Schubert 24325039b37SCy Schubert /* allocate memory for queue entry */ 24425039b37SCy Schubert entry = malloc(sizeof(*entry)); 24525039b37SCy Schubert if(!entry) { 24625039b37SCy Schubert log_err("out of memory logging dnstap"); 24725039b37SCy Schubert free(buf); 24825039b37SCy Schubert return; 24925039b37SCy Schubert } 25025039b37SCy Schubert entry->next = NULL; 25125039b37SCy Schubert entry->buf = buf; 25225039b37SCy Schubert entry->len = len; 25325039b37SCy Schubert 25425039b37SCy Schubert /* aqcuire lock */ 25525039b37SCy Schubert lock_basic_lock(&mq->lock); 256*c0caa2e2SCy Schubert /* if list was empty, start timer for (eventual) wakeup */ 25725039b37SCy Schubert if(mq->first == NULL) 258*c0caa2e2SCy Schubert wakeupstarttimer = 1; 259*c0caa2e2SCy Schubert /* if list contains more than wakeupnum elements, wakeup now, 260*c0caa2e2SCy Schubert * or if list is (going to be) almost full */ 261*c0caa2e2SCy Schubert if(mq->msgcount == DTIO_MSG_FOR_WAKEUP || 262*c0caa2e2SCy Schubert (mq->cursize < mq->maxsize * 9 / 10 && 263*c0caa2e2SCy Schubert mq->cursize+len >= mq->maxsize * 9 / 10)) 264*c0caa2e2SCy Schubert wakeupnow = 1; 26525039b37SCy Schubert /* see if it is going to fit */ 26625039b37SCy Schubert if(mq->cursize + len > mq->maxsize) { 26725039b37SCy Schubert /* buffer full, or congested. */ 26825039b37SCy Schubert /* drop */ 26925039b37SCy Schubert lock_basic_unlock(&mq->lock); 27025039b37SCy Schubert free(buf); 27125039b37SCy Schubert free(entry); 27225039b37SCy Schubert return; 27325039b37SCy Schubert } 27425039b37SCy Schubert mq->cursize += len; 275*c0caa2e2SCy Schubert mq->msgcount ++; 27625039b37SCy Schubert /* append to list */ 27725039b37SCy Schubert if(mq->last) { 27825039b37SCy Schubert mq->last->next = entry; 27925039b37SCy Schubert } else { 28025039b37SCy Schubert mq->first = entry; 28125039b37SCy Schubert } 28225039b37SCy Schubert mq->last = entry; 28325039b37SCy Schubert /* release lock */ 28425039b37SCy Schubert lock_basic_unlock(&mq->lock); 28525039b37SCy Schubert 286*c0caa2e2SCy Schubert if(wakeupnow) { 28725039b37SCy Schubert dtio_wakeup(mq->dtio); 288*c0caa2e2SCy Schubert } else if(wakeupstarttimer) { 289*c0caa2e2SCy Schubert dt_msg_queue_start_timer(mq); 290*c0caa2e2SCy Schubert } 29125039b37SCy Schubert } 29225039b37SCy Schubert 29325039b37SCy Schubert struct dt_io_thread* dt_io_thread_create(void) 29425039b37SCy Schubert { 29525039b37SCy Schubert struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); 296*c0caa2e2SCy Schubert lock_basic_init(&dtio->wakeup_timer_lock); 297*c0caa2e2SCy Schubert lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled, 298*c0caa2e2SCy Schubert sizeof(dtio->wakeup_timer_enabled)); 29925039b37SCy Schubert return dtio; 30025039b37SCy Schubert } 30125039b37SCy Schubert 30225039b37SCy Schubert void dt_io_thread_delete(struct dt_io_thread* dtio) 30325039b37SCy Schubert { 30425039b37SCy Schubert struct dt_io_list_item* item, *nextitem; 30525039b37SCy Schubert if(!dtio) return; 306*c0caa2e2SCy Schubert lock_basic_destroy(&dtio->wakeup_timer_lock); 30725039b37SCy Schubert item=dtio->io_list; 30825039b37SCy Schubert while(item) { 30925039b37SCy Schubert nextitem = item->next; 31025039b37SCy Schubert free(item); 31125039b37SCy Schubert item = nextitem; 31225039b37SCy Schubert } 31325039b37SCy Schubert free(dtio->socket_path); 31425039b37SCy Schubert free(dtio->ip_str); 31525039b37SCy Schubert free(dtio->tls_server_name); 31625039b37SCy Schubert free(dtio->client_key_file); 31725039b37SCy Schubert free(dtio->client_cert_file); 31825039b37SCy Schubert if(dtio->ssl_ctx) { 31925039b37SCy Schubert #ifdef HAVE_SSL 32025039b37SCy Schubert SSL_CTX_free(dtio->ssl_ctx); 32125039b37SCy Schubert #endif 32225039b37SCy Schubert } 32325039b37SCy Schubert free(dtio); 32425039b37SCy Schubert } 32525039b37SCy Schubert 32625039b37SCy Schubert int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg) 32725039b37SCy Schubert { 32825039b37SCy Schubert if(!cfg->dnstap) { 32925039b37SCy Schubert log_warn("cannot setup dnstap because dnstap-enable is no"); 33025039b37SCy Schubert return 0; 33125039b37SCy Schubert } 33225039b37SCy Schubert 33325039b37SCy Schubert /* what type of connectivity do we have */ 33425039b37SCy Schubert if(cfg->dnstap_ip && cfg->dnstap_ip[0]) { 33525039b37SCy Schubert if(cfg->dnstap_tls) 33625039b37SCy Schubert dtio->upstream_is_tls = 1; 33725039b37SCy Schubert else dtio->upstream_is_tcp = 1; 33825039b37SCy Schubert } else { 33925039b37SCy Schubert dtio->upstream_is_unix = 1; 34025039b37SCy Schubert } 34125039b37SCy Schubert dtio->is_bidirectional = cfg->dnstap_bidirectional; 34225039b37SCy Schubert 34325039b37SCy Schubert if(dtio->upstream_is_unix) { 34425039b37SCy Schubert if(!cfg->dnstap_socket_path || 34525039b37SCy Schubert cfg->dnstap_socket_path[0]==0) { 34625039b37SCy Schubert log_err("dnstap setup: no dnstap-socket-path for " 34725039b37SCy Schubert "socket connect"); 34825039b37SCy Schubert return 0; 34925039b37SCy Schubert } 35025039b37SCy Schubert free(dtio->socket_path); 351*c0caa2e2SCy Schubert dtio->socket_path = fname_after_chroot(cfg->dnstap_socket_path, 352*c0caa2e2SCy Schubert cfg, 1); 35325039b37SCy Schubert if(!dtio->socket_path) { 35425039b37SCy Schubert log_err("dnstap setup: malloc failure"); 35525039b37SCy Schubert return 0; 35625039b37SCy Schubert } 35725039b37SCy Schubert } 35825039b37SCy Schubert 35925039b37SCy Schubert if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 36025039b37SCy Schubert if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) { 36125039b37SCy Schubert log_err("dnstap setup: no dnstap-ip for TCP connect"); 36225039b37SCy Schubert return 0; 36325039b37SCy Schubert } 36425039b37SCy Schubert free(dtio->ip_str); 36525039b37SCy Schubert dtio->ip_str = strdup(cfg->dnstap_ip); 36625039b37SCy Schubert if(!dtio->ip_str) { 36725039b37SCy Schubert log_err("dnstap setup: malloc failure"); 36825039b37SCy Schubert return 0; 36925039b37SCy Schubert } 37025039b37SCy Schubert } 37125039b37SCy Schubert 37225039b37SCy Schubert if(dtio->upstream_is_tls) { 37325039b37SCy Schubert #ifdef HAVE_SSL 37425039b37SCy Schubert if(cfg->dnstap_tls_server_name && 37525039b37SCy Schubert cfg->dnstap_tls_server_name[0]) { 37625039b37SCy Schubert free(dtio->tls_server_name); 37725039b37SCy Schubert dtio->tls_server_name = strdup( 37825039b37SCy Schubert cfg->dnstap_tls_server_name); 37925039b37SCy Schubert if(!dtio->tls_server_name) { 38025039b37SCy Schubert log_err("dnstap setup: malloc failure"); 38125039b37SCy Schubert return 0; 38225039b37SCy Schubert } 38325039b37SCy Schubert if(!check_auth_name_for_ssl(dtio->tls_server_name)) 38425039b37SCy Schubert return 0; 38525039b37SCy Schubert } 38625039b37SCy Schubert if(cfg->dnstap_tls_client_key_file && 38725039b37SCy Schubert cfg->dnstap_tls_client_key_file[0]) { 38825039b37SCy Schubert dtio->use_client_certs = 1; 38925039b37SCy Schubert free(dtio->client_key_file); 39025039b37SCy Schubert dtio->client_key_file = strdup( 39125039b37SCy Schubert cfg->dnstap_tls_client_key_file); 39225039b37SCy Schubert if(!dtio->client_key_file) { 39325039b37SCy Schubert log_err("dnstap setup: malloc failure"); 39425039b37SCy Schubert return 0; 39525039b37SCy Schubert } 39625039b37SCy Schubert if(!cfg->dnstap_tls_client_cert_file || 39725039b37SCy Schubert cfg->dnstap_tls_client_cert_file[0]==0) { 39825039b37SCy Schubert log_err("dnstap setup: client key " 39925039b37SCy Schubert "authentication enabled with " 40025039b37SCy Schubert "dnstap-tls-client-key-file, but " 40125039b37SCy Schubert "no dnstap-tls-client-cert-file " 40225039b37SCy Schubert "is given"); 40325039b37SCy Schubert return 0; 40425039b37SCy Schubert } 40525039b37SCy Schubert free(dtio->client_cert_file); 40625039b37SCy Schubert dtio->client_cert_file = strdup( 40725039b37SCy Schubert cfg->dnstap_tls_client_cert_file); 40825039b37SCy Schubert if(!dtio->client_cert_file) { 40925039b37SCy Schubert log_err("dnstap setup: malloc failure"); 41025039b37SCy Schubert return 0; 41125039b37SCy Schubert } 41225039b37SCy Schubert } else { 41325039b37SCy Schubert dtio->use_client_certs = 0; 41425039b37SCy Schubert dtio->client_key_file = NULL; 41525039b37SCy Schubert dtio->client_cert_file = NULL; 41625039b37SCy Schubert } 41725039b37SCy Schubert 41825039b37SCy Schubert if(cfg->dnstap_tls_cert_bundle) { 41925039b37SCy Schubert dtio->ssl_ctx = connect_sslctx_create( 42025039b37SCy Schubert dtio->client_key_file, 42125039b37SCy Schubert dtio->client_cert_file, 42225039b37SCy Schubert cfg->dnstap_tls_cert_bundle, 0); 42325039b37SCy Schubert } else { 42425039b37SCy Schubert dtio->ssl_ctx = connect_sslctx_create( 42525039b37SCy Schubert dtio->client_key_file, 42625039b37SCy Schubert dtio->client_cert_file, 42725039b37SCy Schubert cfg->tls_cert_bundle, cfg->tls_win_cert); 42825039b37SCy Schubert } 42925039b37SCy Schubert if(!dtio->ssl_ctx) { 43025039b37SCy Schubert log_err("could not setup SSL CTX"); 43125039b37SCy Schubert return 0; 43225039b37SCy Schubert } 43325039b37SCy Schubert dtio->tls_use_sni = cfg->tls_use_sni; 43425039b37SCy Schubert #endif /* HAVE_SSL */ 43525039b37SCy Schubert } 43625039b37SCy Schubert return 1; 43725039b37SCy Schubert } 43825039b37SCy Schubert 43925039b37SCy Schubert int dt_io_thread_register_queue(struct dt_io_thread* dtio, 44025039b37SCy Schubert struct dt_msg_queue* mq) 44125039b37SCy Schubert { 44225039b37SCy Schubert struct dt_io_list_item* item = malloc(sizeof(*item)); 44325039b37SCy Schubert if(!item) return 0; 44425039b37SCy Schubert lock_basic_lock(&mq->lock); 44525039b37SCy Schubert mq->dtio = dtio; 44625039b37SCy Schubert lock_basic_unlock(&mq->lock); 44725039b37SCy Schubert item->queue = mq; 44825039b37SCy Schubert item->next = dtio->io_list; 44925039b37SCy Schubert dtio->io_list = item; 45025039b37SCy Schubert dtio->io_list_iter = NULL; 45125039b37SCy Schubert return 1; 45225039b37SCy Schubert } 45325039b37SCy Schubert 45425039b37SCy Schubert void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, 45525039b37SCy Schubert struct dt_msg_queue* mq) 45625039b37SCy Schubert { 45725039b37SCy Schubert struct dt_io_list_item* item, *prev=NULL; 45825039b37SCy Schubert if(!dtio) return; 45925039b37SCy Schubert item = dtio->io_list; 46025039b37SCy Schubert while(item) { 46125039b37SCy Schubert if(item->queue == mq) { 46225039b37SCy Schubert /* found it */ 46325039b37SCy Schubert if(prev) prev->next = item->next; 46425039b37SCy Schubert else dtio->io_list = item->next; 46525039b37SCy Schubert /* the queue itself only registered, not deleted */ 46625039b37SCy Schubert lock_basic_lock(&item->queue->lock); 46725039b37SCy Schubert item->queue->dtio = NULL; 46825039b37SCy Schubert lock_basic_unlock(&item->queue->lock); 46925039b37SCy Schubert free(item); 47025039b37SCy Schubert dtio->io_list_iter = NULL; 47125039b37SCy Schubert return; 47225039b37SCy Schubert } 47325039b37SCy Schubert prev = item; 47425039b37SCy Schubert item = item->next; 47525039b37SCy Schubert } 47625039b37SCy Schubert } 47725039b37SCy Schubert 47825039b37SCy Schubert /** pick a message from the queue, the routine locks and unlocks, 47925039b37SCy Schubert * returns true if there is a message */ 48025039b37SCy Schubert static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf, 48125039b37SCy Schubert size_t* len) 48225039b37SCy Schubert { 48325039b37SCy Schubert lock_basic_lock(&mq->lock); 48425039b37SCy Schubert if(mq->first) { 48525039b37SCy Schubert struct dt_msg_entry* entry = mq->first; 48625039b37SCy Schubert mq->first = entry->next; 48725039b37SCy Schubert if(!entry->next) mq->last = NULL; 48825039b37SCy Schubert mq->cursize -= entry->len; 489*c0caa2e2SCy Schubert mq->msgcount --; 49025039b37SCy Schubert lock_basic_unlock(&mq->lock); 49125039b37SCy Schubert 49225039b37SCy Schubert *buf = entry->buf; 49325039b37SCy Schubert *len = entry->len; 49425039b37SCy Schubert free(entry); 49525039b37SCy Schubert return 1; 49625039b37SCy Schubert } 49725039b37SCy Schubert lock_basic_unlock(&mq->lock); 49825039b37SCy Schubert return 0; 49925039b37SCy Schubert } 50025039b37SCy Schubert 50125039b37SCy Schubert /** find message in queue, false if no message, true if message to send */ 50225039b37SCy Schubert static int dtio_find_in_queue(struct dt_io_thread* dtio, 50325039b37SCy Schubert struct dt_msg_queue* mq) 50425039b37SCy Schubert { 50525039b37SCy Schubert void* buf=NULL; 50625039b37SCy Schubert size_t len=0; 50725039b37SCy Schubert if(dt_msg_queue_pop(mq, &buf, &len)) { 50825039b37SCy Schubert dtio->cur_msg = buf; 50925039b37SCy Schubert dtio->cur_msg_len = len; 51025039b37SCy Schubert dtio->cur_msg_done = 0; 51125039b37SCy Schubert dtio->cur_msg_len_done = 0; 51225039b37SCy Schubert return 1; 51325039b37SCy Schubert } 51425039b37SCy Schubert return 0; 51525039b37SCy Schubert } 51625039b37SCy Schubert 51725039b37SCy Schubert /** find a new message to write, search message queues, false if none */ 51825039b37SCy Schubert static int dtio_find_msg(struct dt_io_thread* dtio) 51925039b37SCy Schubert { 52025039b37SCy Schubert struct dt_io_list_item *spot, *item; 52125039b37SCy Schubert 52225039b37SCy Schubert spot = dtio->io_list_iter; 52325039b37SCy Schubert /* use the next queue for the next message lookup, 52425039b37SCy Schubert * if we hit the end(NULL) the NULL restarts the iter at start. */ 52525039b37SCy Schubert if(spot) 52625039b37SCy Schubert dtio->io_list_iter = spot->next; 52725039b37SCy Schubert else if(dtio->io_list) 52825039b37SCy Schubert dtio->io_list_iter = dtio->io_list->next; 52925039b37SCy Schubert 53025039b37SCy Schubert /* scan from spot to end-of-io_list */ 53125039b37SCy Schubert item = spot; 53225039b37SCy Schubert while(item) { 53325039b37SCy Schubert if(dtio_find_in_queue(dtio, item->queue)) 53425039b37SCy Schubert return 1; 53525039b37SCy Schubert item = item->next; 53625039b37SCy Schubert } 53725039b37SCy Schubert /* scan starting at the start-of-list (to wrap around the end) */ 53825039b37SCy Schubert item = dtio->io_list; 53925039b37SCy Schubert while(item) { 54025039b37SCy Schubert if(dtio_find_in_queue(dtio, item->queue)) 54125039b37SCy Schubert return 1; 54225039b37SCy Schubert item = item->next; 54325039b37SCy Schubert } 54425039b37SCy Schubert return 0; 54525039b37SCy Schubert } 54625039b37SCy Schubert 54725039b37SCy Schubert /** callback for the dnstap reconnect, to start reconnecting to output */ 54825039b37SCy Schubert void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd), 54925039b37SCy Schubert short ATTR_UNUSED(bits), void* arg) 55025039b37SCy Schubert { 55125039b37SCy Schubert struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 55225039b37SCy Schubert dtio->reconnect_is_added = 0; 55325039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: reconnect timer"); 55425039b37SCy Schubert 55525039b37SCy Schubert dtio_open_output(dtio); 55625039b37SCy Schubert if(dtio->event) { 55725039b37SCy Schubert if(!dtio_add_output_event_write(dtio)) 55825039b37SCy Schubert return; 55925039b37SCy Schubert /* nothing wrong so far, wait on the output event */ 56025039b37SCy Schubert return; 56125039b37SCy Schubert } 56225039b37SCy Schubert /* exponential backoff and retry on timer */ 56325039b37SCy Schubert dtio_reconnect_enable(dtio); 56425039b37SCy Schubert } 56525039b37SCy Schubert 56625039b37SCy Schubert /** attempt to reconnect to the output, after a timeout */ 56725039b37SCy Schubert static void dtio_reconnect_enable(struct dt_io_thread* dtio) 56825039b37SCy Schubert { 56925039b37SCy Schubert struct timeval tv; 57025039b37SCy Schubert int msec; 57125039b37SCy Schubert if(dtio->want_to_exit) return; 57225039b37SCy Schubert if(dtio->reconnect_is_added) 57325039b37SCy Schubert return; /* already done */ 57425039b37SCy Schubert 57525039b37SCy Schubert /* exponential backoff, store the value for next timeout */ 57625039b37SCy Schubert msec = dtio->reconnect_timeout; 57725039b37SCy Schubert if(msec == 0) { 57825039b37SCy Schubert dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN; 57925039b37SCy Schubert } else { 58025039b37SCy Schubert dtio->reconnect_timeout = msec*2; 58125039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX) 58225039b37SCy Schubert dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX; 58325039b37SCy Schubert } 58425039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec", 58525039b37SCy Schubert msec); 58625039b37SCy Schubert 58725039b37SCy Schubert /* setup wait timer */ 58825039b37SCy Schubert memset(&tv, 0, sizeof(tv)); 58925039b37SCy Schubert tv.tv_sec = msec/1000; 59025039b37SCy Schubert tv.tv_usec = (msec%1000)*1000; 59125039b37SCy Schubert if(ub_timer_add(dtio->reconnect_timer, dtio->event_base, 59225039b37SCy Schubert &dtio_reconnect_timeout_cb, dtio, &tv) != 0) { 59325039b37SCy Schubert log_err("dnstap io: could not reconnect ev timer add"); 59425039b37SCy Schubert return; 59525039b37SCy Schubert } 59625039b37SCy Schubert dtio->reconnect_is_added = 1; 59725039b37SCy Schubert } 59825039b37SCy Schubert 59925039b37SCy Schubert /** remove dtio reconnect timer */ 60025039b37SCy Schubert static void dtio_reconnect_del(struct dt_io_thread* dtio) 60125039b37SCy Schubert { 60225039b37SCy Schubert if(!dtio->reconnect_is_added) 60325039b37SCy Schubert return; 60425039b37SCy Schubert ub_timer_del(dtio->reconnect_timer); 60525039b37SCy Schubert dtio->reconnect_is_added = 0; 60625039b37SCy Schubert } 60725039b37SCy Schubert 60825039b37SCy Schubert /** clear the reconnect exponential backoff timer. 60925039b37SCy Schubert * We have successfully connected so we can try again with short timeouts. */ 61025039b37SCy Schubert static void dtio_reconnect_clear(struct dt_io_thread* dtio) 61125039b37SCy Schubert { 61225039b37SCy Schubert dtio->reconnect_timeout = 0; 61325039b37SCy Schubert dtio_reconnect_del(dtio); 61425039b37SCy Schubert } 61525039b37SCy Schubert 61625039b37SCy Schubert /** reconnect slowly, because we already know we have to wait for a bit */ 61725039b37SCy Schubert static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec) 61825039b37SCy Schubert { 61925039b37SCy Schubert dtio_reconnect_del(dtio); 62025039b37SCy Schubert dtio->reconnect_timeout = msec; 62125039b37SCy Schubert dtio_reconnect_enable(dtio); 62225039b37SCy Schubert } 62325039b37SCy Schubert 62425039b37SCy Schubert /** delete the current message in the dtio, and reset counters */ 62525039b37SCy Schubert static void dtio_cur_msg_free(struct dt_io_thread* dtio) 62625039b37SCy Schubert { 62725039b37SCy Schubert free(dtio->cur_msg); 62825039b37SCy Schubert dtio->cur_msg = NULL; 62925039b37SCy Schubert dtio->cur_msg_len = 0; 63025039b37SCy Schubert dtio->cur_msg_done = 0; 63125039b37SCy Schubert dtio->cur_msg_len_done = 0; 63225039b37SCy Schubert } 63325039b37SCy Schubert 63425039b37SCy Schubert /** delete the buffer and counters used to read frame */ 63525039b37SCy Schubert static void dtio_read_frame_free(struct dt_frame_read_buf* rb) 63625039b37SCy Schubert { 63725039b37SCy Schubert if(rb->buf) { 63825039b37SCy Schubert free(rb->buf); 63925039b37SCy Schubert rb->buf = NULL; 64025039b37SCy Schubert } 64125039b37SCy Schubert rb->buf_count = 0; 64225039b37SCy Schubert rb->buf_cap = 0; 64325039b37SCy Schubert rb->frame_len = 0; 64425039b37SCy Schubert rb->frame_len_done = 0; 64525039b37SCy Schubert rb->control_frame = 0; 64625039b37SCy Schubert } 64725039b37SCy Schubert 64825039b37SCy Schubert /** del the output file descriptor event for listening */ 64925039b37SCy Schubert static void dtio_del_output_event(struct dt_io_thread* dtio) 65025039b37SCy Schubert { 65125039b37SCy Schubert if(!dtio->event_added) 65225039b37SCy Schubert return; 65325039b37SCy Schubert ub_event_del(dtio->event); 65425039b37SCy Schubert dtio->event_added = 0; 65525039b37SCy Schubert dtio->event_added_is_write = 0; 65625039b37SCy Schubert } 65725039b37SCy Schubert 65825039b37SCy Schubert /** close dtio socket and set it to -1 */ 65925039b37SCy Schubert static void dtio_close_fd(struct dt_io_thread* dtio) 66025039b37SCy Schubert { 661*c0caa2e2SCy Schubert sock_close(dtio->fd); 66225039b37SCy Schubert dtio->fd = -1; 66325039b37SCy Schubert } 66425039b37SCy Schubert 66525039b37SCy Schubert /** close and stop the output file descriptor event */ 66625039b37SCy Schubert static void dtio_close_output(struct dt_io_thread* dtio) 66725039b37SCy Schubert { 66825039b37SCy Schubert if(!dtio->event) 66925039b37SCy Schubert return; 67025039b37SCy Schubert ub_event_free(dtio->event); 67125039b37SCy Schubert dtio->event = NULL; 67225039b37SCy Schubert if(dtio->ssl) { 67325039b37SCy Schubert #ifdef HAVE_SSL 67425039b37SCy Schubert SSL_shutdown(dtio->ssl); 67525039b37SCy Schubert SSL_free(dtio->ssl); 67625039b37SCy Schubert dtio->ssl = NULL; 67725039b37SCy Schubert #endif 67825039b37SCy Schubert } 67925039b37SCy Schubert dtio_close_fd(dtio); 68025039b37SCy Schubert 68125039b37SCy Schubert /* if there is a (partial) message, discard it 68225039b37SCy Schubert * we cannot send (the remainder of) it, and a new 68325039b37SCy Schubert * connection needs to start with a control frame. */ 68425039b37SCy Schubert if(dtio->cur_msg) { 68525039b37SCy Schubert dtio_cur_msg_free(dtio); 68625039b37SCy Schubert } 68725039b37SCy Schubert 68825039b37SCy Schubert dtio->ready_frame_sent = 0; 68925039b37SCy Schubert dtio->accept_frame_received = 0; 69025039b37SCy Schubert dtio_read_frame_free(&dtio->read_frame); 69125039b37SCy Schubert 69225039b37SCy Schubert dtio_reconnect_enable(dtio); 69325039b37SCy Schubert } 69425039b37SCy Schubert 69525039b37SCy Schubert /** check for pending nonblocking connect errors, 69625039b37SCy Schubert * returns 1 if it is okay. -1 on error (close it), 0 to try later */ 69725039b37SCy Schubert static int dtio_check_nb_connect(struct dt_io_thread* dtio) 69825039b37SCy Schubert { 69925039b37SCy Schubert int error = 0; 70025039b37SCy Schubert socklen_t len = (socklen_t)sizeof(error); 70125039b37SCy Schubert if(!dtio->check_nb_connect) 70225039b37SCy Schubert return 1; /* everything okay */ 70325039b37SCy Schubert if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error, 70425039b37SCy Schubert &len) < 0) { 70525039b37SCy Schubert #ifndef USE_WINSOCK 70625039b37SCy Schubert error = errno; /* on solaris errno is error */ 70725039b37SCy Schubert #else 70825039b37SCy Schubert error = WSAGetLastError(); 70925039b37SCy Schubert #endif 71025039b37SCy Schubert } 71125039b37SCy Schubert #ifndef USE_WINSOCK 71225039b37SCy Schubert #if defined(EINPROGRESS) && defined(EWOULDBLOCK) 71325039b37SCy Schubert if(error == EINPROGRESS || error == EWOULDBLOCK) 71425039b37SCy Schubert return 0; /* try again later */ 71525039b37SCy Schubert #endif 71625039b37SCy Schubert #else 71725039b37SCy Schubert if(error == WSAEINPROGRESS) { 71825039b37SCy Schubert return 0; /* try again later */ 71925039b37SCy Schubert } else if(error == WSAEWOULDBLOCK) { 72025039b37SCy Schubert ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 72125039b37SCy Schubert dtio->stop_flush_event:dtio->event), UB_EV_WRITE); 72225039b37SCy Schubert return 0; /* try again later */ 72325039b37SCy Schubert } 72425039b37SCy Schubert #endif 72525039b37SCy Schubert if(error != 0) { 72625039b37SCy Schubert char* to = dtio->socket_path; 72725039b37SCy Schubert if(!to) to = dtio->ip_str; 72825039b37SCy Schubert if(!to) to = ""; 72925039b37SCy Schubert log_err("dnstap io: failed to connect to \"%s\": %s", 730*c0caa2e2SCy Schubert to, sock_strerror(error)); 73125039b37SCy Schubert return -1; /* error, close it */ 73225039b37SCy Schubert } 73325039b37SCy Schubert 73425039b37SCy Schubert if(dtio->ip_str) 73525039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: connected to %s", 73625039b37SCy Schubert dtio->ip_str); 73725039b37SCy Schubert else if(dtio->socket_path) 73825039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"", 73925039b37SCy Schubert dtio->socket_path); 74025039b37SCy Schubert dtio_reconnect_clear(dtio); 74125039b37SCy Schubert dtio->check_nb_connect = 0; 74225039b37SCy Schubert return 1; /* everything okay */ 74325039b37SCy Schubert } 74425039b37SCy Schubert 74525039b37SCy Schubert #ifdef HAVE_SSL 74625039b37SCy Schubert /** write to ssl output 74725039b37SCy Schubert * returns number of bytes written, 0 if nothing happened, 74825039b37SCy Schubert * try again later, or -1 if the channel is to be closed. */ 74925039b37SCy Schubert static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf, 75025039b37SCy Schubert size_t len) 75125039b37SCy Schubert { 75225039b37SCy Schubert int r; 75325039b37SCy Schubert ERR_clear_error(); 75425039b37SCy Schubert r = SSL_write(dtio->ssl, buf, len); 75525039b37SCy Schubert if(r <= 0) { 75625039b37SCy Schubert int want = SSL_get_error(dtio->ssl, r); 75725039b37SCy Schubert if(want == SSL_ERROR_ZERO_RETURN) { 75825039b37SCy Schubert /* closed */ 75925039b37SCy Schubert return -1; 76025039b37SCy Schubert } else if(want == SSL_ERROR_WANT_READ) { 76125039b37SCy Schubert /* we want a brief read event */ 76225039b37SCy Schubert dtio_enable_brief_read(dtio); 76325039b37SCy Schubert return 0; 76425039b37SCy Schubert } else if(want == SSL_ERROR_WANT_WRITE) { 76525039b37SCy Schubert /* write again later */ 76625039b37SCy Schubert return 0; 76725039b37SCy Schubert } else if(want == SSL_ERROR_SYSCALL) { 76825039b37SCy Schubert #ifdef EPIPE 76925039b37SCy Schubert if(errno == EPIPE && verbosity < 2) 77025039b37SCy Schubert return -1; /* silence 'broken pipe' */ 77125039b37SCy Schubert #endif 77225039b37SCy Schubert #ifdef ECONNRESET 77325039b37SCy Schubert if(errno == ECONNRESET && verbosity < 2) 77425039b37SCy Schubert return -1; /* silence reset by peer */ 77525039b37SCy Schubert #endif 77625039b37SCy Schubert if(errno != 0) { 77725039b37SCy Schubert log_err("dnstap io, SSL_write syscall: %s", 77825039b37SCy Schubert strerror(errno)); 77925039b37SCy Schubert } 78025039b37SCy Schubert return -1; 78125039b37SCy Schubert } 78225039b37SCy Schubert log_crypto_err("dnstap io, could not SSL_write"); 78325039b37SCy Schubert return -1; 78425039b37SCy Schubert } 78525039b37SCy Schubert return r; 78625039b37SCy Schubert } 78725039b37SCy Schubert #endif /* HAVE_SSL */ 78825039b37SCy Schubert 78925039b37SCy Schubert /** write buffer to output. 79025039b37SCy Schubert * returns number of bytes written, 0 if nothing happened, 79125039b37SCy Schubert * try again later, or -1 if the channel is to be closed. */ 79225039b37SCy Schubert static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf, 79325039b37SCy Schubert size_t len) 79425039b37SCy Schubert { 79525039b37SCy Schubert ssize_t ret; 79625039b37SCy Schubert if(dtio->fd == -1) 79725039b37SCy Schubert return -1; 79825039b37SCy Schubert #ifdef HAVE_SSL 79925039b37SCy Schubert if(dtio->ssl) 80025039b37SCy Schubert return dtio_write_ssl(dtio, buf, len); 80125039b37SCy Schubert #endif 80225039b37SCy Schubert ret = send(dtio->fd, (void*)buf, len, 0); 80325039b37SCy Schubert if(ret == -1) { 80425039b37SCy Schubert #ifndef USE_WINSOCK 80525039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 80625039b37SCy Schubert return 0; 80725039b37SCy Schubert #else 80825039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 80925039b37SCy Schubert return 0; 81025039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) { 81125039b37SCy Schubert ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 81225039b37SCy Schubert dtio->stop_flush_event:dtio->event), 81325039b37SCy Schubert UB_EV_WRITE); 81425039b37SCy Schubert return 0; 81525039b37SCy Schubert } 81625039b37SCy Schubert #endif 817*c0caa2e2SCy Schubert log_err("dnstap io: failed send: %s", sock_strerror(errno)); 81825039b37SCy Schubert return -1; 81925039b37SCy Schubert } 82025039b37SCy Schubert return ret; 82125039b37SCy Schubert } 82225039b37SCy Schubert 82325039b37SCy Schubert #ifdef HAVE_WRITEV 82425039b37SCy Schubert /** write with writev, len and message, in one write, if possible. 82525039b37SCy Schubert * return true if message is done, false if incomplete */ 82625039b37SCy Schubert static int dtio_write_with_writev(struct dt_io_thread* dtio) 82725039b37SCy Schubert { 82825039b37SCy Schubert uint32_t sendlen = htonl(dtio->cur_msg_len); 82925039b37SCy Schubert struct iovec iov[2]; 83025039b37SCy Schubert ssize_t r; 83125039b37SCy Schubert iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done; 83225039b37SCy Schubert iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done; 83325039b37SCy Schubert iov[1].iov_base = dtio->cur_msg; 83425039b37SCy Schubert iov[1].iov_len = dtio->cur_msg_len; 83525039b37SCy Schubert log_assert(iov[0].iov_len > 0); 83625039b37SCy Schubert r = writev(dtio->fd, iov, 2); 83725039b37SCy Schubert if(r == -1) { 83825039b37SCy Schubert #ifndef USE_WINSOCK 83925039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 84025039b37SCy Schubert return 0; 84125039b37SCy Schubert #else 84225039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 84325039b37SCy Schubert return 0; 84425039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) { 84525039b37SCy Schubert ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 84625039b37SCy Schubert dtio->stop_flush_event:dtio->event), 84725039b37SCy Schubert UB_EV_WRITE); 84825039b37SCy Schubert return 0; 84925039b37SCy Schubert } 85025039b37SCy Schubert #endif 851*c0caa2e2SCy Schubert log_err("dnstap io: failed writev: %s", sock_strerror(errno)); 85225039b37SCy Schubert /* close the channel */ 85325039b37SCy Schubert dtio_del_output_event(dtio); 85425039b37SCy Schubert dtio_close_output(dtio); 85525039b37SCy Schubert return 0; 85625039b37SCy Schubert } 85725039b37SCy Schubert /* written r bytes */ 85825039b37SCy Schubert dtio->cur_msg_len_done += r; 85925039b37SCy Schubert if(dtio->cur_msg_len_done < 4) 86025039b37SCy Schubert return 0; 86125039b37SCy Schubert if(dtio->cur_msg_len_done > 4) { 86225039b37SCy Schubert dtio->cur_msg_done = dtio->cur_msg_len_done-4; 86325039b37SCy Schubert dtio->cur_msg_len_done = 4; 86425039b37SCy Schubert } 86525039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) 86625039b37SCy Schubert return 0; 86725039b37SCy Schubert return 1; 86825039b37SCy Schubert } 86925039b37SCy Schubert #endif /* HAVE_WRITEV */ 87025039b37SCy Schubert 87125039b37SCy Schubert /** write more of the length, preceding the data frame. 87225039b37SCy Schubert * return true if message is done, false if incomplete. */ 87325039b37SCy Schubert static int dtio_write_more_of_len(struct dt_io_thread* dtio) 87425039b37SCy Schubert { 87525039b37SCy Schubert uint32_t sendlen; 87625039b37SCy Schubert int r; 87725039b37SCy Schubert if(dtio->cur_msg_len_done >= 4) 87825039b37SCy Schubert return 1; 87925039b37SCy Schubert #ifdef HAVE_WRITEV 88025039b37SCy Schubert if(!dtio->ssl) { 88125039b37SCy Schubert /* we try writev for everything.*/ 88225039b37SCy Schubert return dtio_write_with_writev(dtio); 88325039b37SCy Schubert } 88425039b37SCy Schubert #endif /* HAVE_WRITEV */ 88525039b37SCy Schubert sendlen = htonl(dtio->cur_msg_len); 88625039b37SCy Schubert r = dtio_write_buf(dtio, 88725039b37SCy Schubert ((uint8_t*)&sendlen)+dtio->cur_msg_len_done, 88825039b37SCy Schubert sizeof(sendlen)-dtio->cur_msg_len_done); 88925039b37SCy Schubert if(r == -1) { 89025039b37SCy Schubert /* close the channel */ 89125039b37SCy Schubert dtio_del_output_event(dtio); 89225039b37SCy Schubert dtio_close_output(dtio); 89325039b37SCy Schubert return 0; 89425039b37SCy Schubert } else if(r == 0) { 89525039b37SCy Schubert /* try again later */ 89625039b37SCy Schubert return 0; 89725039b37SCy Schubert } 89825039b37SCy Schubert dtio->cur_msg_len_done += r; 89925039b37SCy Schubert if(dtio->cur_msg_len_done < 4) 90025039b37SCy Schubert return 0; 90125039b37SCy Schubert return 1; 90225039b37SCy Schubert } 90325039b37SCy Schubert 90425039b37SCy Schubert /** write more of the data frame. 90525039b37SCy Schubert * return true if message is done, false if incomplete. */ 90625039b37SCy Schubert static int dtio_write_more_of_data(struct dt_io_thread* dtio) 90725039b37SCy Schubert { 90825039b37SCy Schubert int r; 90925039b37SCy Schubert if(dtio->cur_msg_done >= dtio->cur_msg_len) 91025039b37SCy Schubert return 1; 91125039b37SCy Schubert r = dtio_write_buf(dtio, 91225039b37SCy Schubert ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done, 91325039b37SCy Schubert dtio->cur_msg_len - dtio->cur_msg_done); 91425039b37SCy Schubert if(r == -1) { 91525039b37SCy Schubert /* close the channel */ 91625039b37SCy Schubert dtio_del_output_event(dtio); 91725039b37SCy Schubert dtio_close_output(dtio); 91825039b37SCy Schubert return 0; 91925039b37SCy Schubert } else if(r == 0) { 92025039b37SCy Schubert /* try again later */ 92125039b37SCy Schubert return 0; 92225039b37SCy Schubert } 92325039b37SCy Schubert dtio->cur_msg_done += r; 92425039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) 92525039b37SCy Schubert return 0; 92625039b37SCy Schubert return 1; 92725039b37SCy Schubert } 92825039b37SCy Schubert 92925039b37SCy Schubert /** write more of the current messsage. false if incomplete, true if 93025039b37SCy Schubert * the message is done */ 93125039b37SCy Schubert static int dtio_write_more(struct dt_io_thread* dtio) 93225039b37SCy Schubert { 93325039b37SCy Schubert if(dtio->cur_msg_len_done < 4) { 93425039b37SCy Schubert if(!dtio_write_more_of_len(dtio)) 93525039b37SCy Schubert return 0; 93625039b37SCy Schubert } 93725039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) { 93825039b37SCy Schubert if(!dtio_write_more_of_data(dtio)) 93925039b37SCy Schubert return 0; 94025039b37SCy Schubert } 94125039b37SCy Schubert return 1; 94225039b37SCy Schubert } 94325039b37SCy Schubert 94425039b37SCy Schubert /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed, 94525039b37SCy Schubert * -1: continue, >0: number of bytes read into buffer */ 94625039b37SCy Schubert static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) { 94725039b37SCy Schubert ssize_t r; 94825039b37SCy Schubert r = recv(dtio->fd, (void*)buf, len, 0); 94925039b37SCy Schubert if(r == -1) { 95025039b37SCy Schubert char* to = dtio->socket_path; 95125039b37SCy Schubert if(!to) to = dtio->ip_str; 95225039b37SCy Schubert if(!to) to = ""; 95325039b37SCy Schubert #ifndef USE_WINSOCK 95425039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 95525039b37SCy Schubert return -1; /* try later */ 95625039b37SCy Schubert #else 95725039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) { 95825039b37SCy Schubert return -1; /* try later */ 95925039b37SCy Schubert } else if(WSAGetLastError() == WSAEWOULDBLOCK) { 96025039b37SCy Schubert ub_winsock_tcp_wouldblock( 96125039b37SCy Schubert (dtio->stop_flush_event? 96225039b37SCy Schubert dtio->stop_flush_event:dtio->event), 96325039b37SCy Schubert UB_EV_READ); 96425039b37SCy Schubert return -1; /* try later */ 96525039b37SCy Schubert } 96625039b37SCy Schubert #endif 96725039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 96825039b37SCy Schubert verbosity < 4) 96925039b37SCy Schubert return 0; /* no log retries on low verbosity */ 97025039b37SCy Schubert log_err("dnstap io: output closed, recv %s: %s", to, 97125039b37SCy Schubert strerror(errno)); 97225039b37SCy Schubert /* and close below */ 97325039b37SCy Schubert return 0; 97425039b37SCy Schubert } 97525039b37SCy Schubert if(r == 0) { 97625039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 97725039b37SCy Schubert verbosity < 4) 97825039b37SCy Schubert return 0; /* no log retries on low verbosity */ 97925039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: output closed by the other side"); 98025039b37SCy Schubert /* and close below */ 98125039b37SCy Schubert return 0; 98225039b37SCy Schubert } 98325039b37SCy Schubert /* something was received */ 98425039b37SCy Schubert return r; 98525039b37SCy Schubert } 98625039b37SCy Schubert 98725039b37SCy Schubert #ifdef HAVE_SSL 98825039b37SCy Schubert /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed, 98925039b37SCy Schubert * -1: continue, >0: number of bytes read into buffer */ 99025039b37SCy Schubert static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len) 99125039b37SCy Schubert { 99225039b37SCy Schubert int r; 99325039b37SCy Schubert ERR_clear_error(); 99425039b37SCy Schubert r = SSL_read(dtio->ssl, buf, len); 99525039b37SCy Schubert if(r <= 0) { 99625039b37SCy Schubert int want = SSL_get_error(dtio->ssl, r); 99725039b37SCy Schubert if(want == SSL_ERROR_ZERO_RETURN) { 99825039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 99925039b37SCy Schubert verbosity < 4) 100025039b37SCy Schubert return 0; /* no log retries on low verbosity */ 100125039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: output closed by the " 100225039b37SCy Schubert "other side"); 100325039b37SCy Schubert return 0; 100425039b37SCy Schubert } else if(want == SSL_ERROR_WANT_READ) { 100525039b37SCy Schubert /* continue later */ 100625039b37SCy Schubert return -1; 100725039b37SCy Schubert } else if(want == SSL_ERROR_WANT_WRITE) { 100825039b37SCy Schubert (void)dtio_enable_brief_write(dtio); 100925039b37SCy Schubert return -1; 101025039b37SCy Schubert } else if(want == SSL_ERROR_SYSCALL) { 101125039b37SCy Schubert #ifdef ECONNRESET 101225039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 101325039b37SCy Schubert errno == ECONNRESET && verbosity < 4) 101425039b37SCy Schubert return 0; /* silence reset by peer */ 101525039b37SCy Schubert #endif 101625039b37SCy Schubert if(errno != 0) 101725039b37SCy Schubert log_err("SSL_read syscall: %s", 101825039b37SCy Schubert strerror(errno)); 101925039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: output closed by the " 102025039b37SCy Schubert "other side"); 102125039b37SCy Schubert return 0; 102225039b37SCy Schubert } 102325039b37SCy Schubert log_crypto_err("could not SSL_read"); 102425039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: output closed by the " 102525039b37SCy Schubert "other side"); 102625039b37SCy Schubert return 0; 102725039b37SCy Schubert } 102825039b37SCy Schubert return r; 102925039b37SCy Schubert } 103025039b37SCy Schubert #endif /* HAVE_SSL */ 103125039b37SCy Schubert 103225039b37SCy Schubert /** check if the output fd has been closed, 103325039b37SCy Schubert * it returns false if the stream is closed. */ 103425039b37SCy Schubert static int dtio_check_close(struct dt_io_thread* dtio) 103525039b37SCy Schubert { 103625039b37SCy Schubert /* we don't want to read any packets, but if there are we can 103725039b37SCy Schubert * discard the input (ignore it). Ignore of unknown (control) 103825039b37SCy Schubert * packets is okay for the framestream protocol. And also, the 103925039b37SCy Schubert * read call can return that the stream has been closed by the 104025039b37SCy Schubert * other side. */ 104125039b37SCy Schubert uint8_t buf[1024]; 104225039b37SCy Schubert int r = -1; 104325039b37SCy Schubert 104425039b37SCy Schubert 104525039b37SCy Schubert if(dtio->fd == -1) return 0; 104625039b37SCy Schubert 104725039b37SCy Schubert while(r != 0) { 104825039b37SCy Schubert /* not interested in buffer content, overwrite */ 104925039b37SCy Schubert r = receive_bytes(dtio, (void*)buf, sizeof(buf)); 105025039b37SCy Schubert if(r == -1) 105125039b37SCy Schubert return 1; 105225039b37SCy Schubert } 105325039b37SCy Schubert /* the other end has been closed */ 105425039b37SCy Schubert /* close the channel */ 105525039b37SCy Schubert dtio_del_output_event(dtio); 105625039b37SCy Schubert dtio_close_output(dtio); 105725039b37SCy Schubert return 0; 105825039b37SCy Schubert } 105925039b37SCy Schubert 106025039b37SCy Schubert /** Read accept frame. Returns -1: continue reading, 0: closed, 106125039b37SCy Schubert * 1: valid accept received. */ 106225039b37SCy Schubert static int dtio_read_accept_frame(struct dt_io_thread* dtio) 106325039b37SCy Schubert { 106425039b37SCy Schubert int r; 106525039b37SCy Schubert size_t read_frame_done; 106625039b37SCy Schubert while(dtio->read_frame.frame_len_done < 4) { 106725039b37SCy Schubert #ifdef HAVE_SSL 106825039b37SCy Schubert if(dtio->ssl) { 106925039b37SCy Schubert r = ssl_read_bytes(dtio, 107025039b37SCy Schubert (uint8_t*)&dtio->read_frame.frame_len+ 107125039b37SCy Schubert dtio->read_frame.frame_len_done, 107225039b37SCy Schubert 4-dtio->read_frame.frame_len_done); 107325039b37SCy Schubert } else { 107425039b37SCy Schubert #endif 107525039b37SCy Schubert r = receive_bytes(dtio, 107625039b37SCy Schubert (uint8_t*)&dtio->read_frame.frame_len+ 107725039b37SCy Schubert dtio->read_frame.frame_len_done, 107825039b37SCy Schubert 4-dtio->read_frame.frame_len_done); 107925039b37SCy Schubert #ifdef HAVE_SSL 108025039b37SCy Schubert } 108125039b37SCy Schubert #endif 108225039b37SCy Schubert if(r == -1) 108325039b37SCy Schubert return -1; /* continue reading */ 108425039b37SCy Schubert if(r == 0) { 108525039b37SCy Schubert /* connection closed */ 108625039b37SCy Schubert goto close_connection; 108725039b37SCy Schubert } 108825039b37SCy Schubert dtio->read_frame.frame_len_done += r; 108925039b37SCy Schubert if(dtio->read_frame.frame_len_done < 4) 109025039b37SCy Schubert return -1; /* continue reading */ 109125039b37SCy Schubert 109225039b37SCy Schubert if(dtio->read_frame.frame_len == 0) { 109325039b37SCy Schubert dtio->read_frame.frame_len_done = 0; 109425039b37SCy Schubert dtio->read_frame.control_frame = 1; 109525039b37SCy Schubert continue; 109625039b37SCy Schubert } 109725039b37SCy Schubert dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len); 109825039b37SCy Schubert if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) { 109925039b37SCy Schubert verbose(VERB_OPS, "dnstap: received frame exceeds max " 110025039b37SCy Schubert "length of %d bytes, closing connection", 110125039b37SCy Schubert DTIO_RECV_FRAME_MAX_LEN); 110225039b37SCy Schubert goto close_connection; 110325039b37SCy Schubert } 110425039b37SCy Schubert dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len); 110525039b37SCy Schubert dtio->read_frame.buf_cap = dtio->read_frame.frame_len; 110625039b37SCy Schubert if(!dtio->read_frame.buf) { 110725039b37SCy Schubert log_err("dnstap io: out of memory (creating read " 110825039b37SCy Schubert "buffer)"); 110925039b37SCy Schubert goto close_connection; 111025039b37SCy Schubert } 111125039b37SCy Schubert } 111225039b37SCy Schubert if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) { 111325039b37SCy Schubert #ifdef HAVE_SSL 111425039b37SCy Schubert if(dtio->ssl) { 111525039b37SCy Schubert r = ssl_read_bytes(dtio, dtio->read_frame.buf+ 111625039b37SCy Schubert dtio->read_frame.buf_count, 111725039b37SCy Schubert dtio->read_frame.buf_cap- 111825039b37SCy Schubert dtio->read_frame.buf_count); 111925039b37SCy Schubert } else { 112025039b37SCy Schubert #endif 112125039b37SCy Schubert r = receive_bytes(dtio, dtio->read_frame.buf+ 112225039b37SCy Schubert dtio->read_frame.buf_count, 112325039b37SCy Schubert dtio->read_frame.buf_cap- 112425039b37SCy Schubert dtio->read_frame.buf_count); 112525039b37SCy Schubert #ifdef HAVE_SSL 112625039b37SCy Schubert } 112725039b37SCy Schubert #endif 112825039b37SCy Schubert if(r == -1) 112925039b37SCy Schubert return -1; /* continue reading */ 113025039b37SCy Schubert if(r == 0) { 113125039b37SCy Schubert /* connection closed */ 113225039b37SCy Schubert goto close_connection; 113325039b37SCy Schubert } 113425039b37SCy Schubert dtio->read_frame.buf_count += r; 113525039b37SCy Schubert if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) 113625039b37SCy Schubert return -1; /* continue reading */ 113725039b37SCy Schubert } 113825039b37SCy Schubert 113925039b37SCy Schubert /* Complete frame received, check if this is a valid ACCEPT control 114025039b37SCy Schubert * frame. */ 114125039b37SCy Schubert if(dtio->read_frame.frame_len < 4) { 114225039b37SCy Schubert verbose(VERB_OPS, "dnstap: invalid data received"); 114325039b37SCy Schubert goto close_connection; 114425039b37SCy Schubert } 114525039b37SCy Schubert if(sldns_read_uint32(dtio->read_frame.buf) != 114625039b37SCy Schubert FSTRM_CONTROL_FRAME_ACCEPT) { 114725039b37SCy Schubert verbose(VERB_ALGO, "dnstap: invalid control type received, " 114825039b37SCy Schubert "ignored"); 114925039b37SCy Schubert dtio->ready_frame_sent = 0; 115025039b37SCy Schubert dtio->accept_frame_received = 0; 115125039b37SCy Schubert dtio_read_frame_free(&dtio->read_frame); 115225039b37SCy Schubert return -1; 115325039b37SCy Schubert } 115425039b37SCy Schubert read_frame_done = 4; /* control frame type */ 115525039b37SCy Schubert 115625039b37SCy Schubert /* Iterate over control fields, ignore unknown types. 115725039b37SCy Schubert * Need to be able to read at least 8 bytes (control field type + 115825039b37SCy Schubert * length). */ 115925039b37SCy Schubert while(read_frame_done+8 < dtio->read_frame.frame_len) { 116025039b37SCy Schubert uint32_t type = sldns_read_uint32(dtio->read_frame.buf + 116125039b37SCy Schubert read_frame_done); 116225039b37SCy Schubert uint32_t len = sldns_read_uint32(dtio->read_frame.buf + 116325039b37SCy Schubert read_frame_done + 4); 116425039b37SCy Schubert if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) { 116525039b37SCy Schubert if(len == strlen(DNSTAP_CONTENT_TYPE) && 116625039b37SCy Schubert read_frame_done+8+len <= 116725039b37SCy Schubert dtio->read_frame.frame_len && 116825039b37SCy Schubert memcmp(dtio->read_frame.buf + read_frame_done + 116925039b37SCy Schubert + 8, DNSTAP_CONTENT_TYPE, len) == 0) { 117025039b37SCy Schubert if(!dtio_control_start_send(dtio)) { 117125039b37SCy Schubert verbose(VERB_OPS, "dnstap io: out of " 117225039b37SCy Schubert "memory while sending START frame"); 117325039b37SCy Schubert goto close_connection; 117425039b37SCy Schubert } 117525039b37SCy Schubert dtio->accept_frame_received = 1; 1176*c0caa2e2SCy Schubert if(!dtio_add_output_event_write(dtio)) 1177*c0caa2e2SCy Schubert goto close_connection; 117825039b37SCy Schubert return 1; 117925039b37SCy Schubert } else { 118025039b37SCy Schubert /* unknow content type */ 118125039b37SCy Schubert verbose(VERB_ALGO, "dnstap: ACCEPT frame " 118225039b37SCy Schubert "contains unknown content type, " 118325039b37SCy Schubert "closing connection"); 118425039b37SCy Schubert goto close_connection; 118525039b37SCy Schubert } 118625039b37SCy Schubert } 118725039b37SCy Schubert /* unknown option, try next */ 118825039b37SCy Schubert read_frame_done += 8+len; 118925039b37SCy Schubert } 119025039b37SCy Schubert 119125039b37SCy Schubert 119225039b37SCy Schubert close_connection: 119325039b37SCy Schubert dtio_del_output_event(dtio); 119425039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 119525039b37SCy Schubert dtio_close_output(dtio); 119625039b37SCy Schubert return 0; 119725039b37SCy Schubert } 119825039b37SCy Schubert 119925039b37SCy Schubert /** add the output file descriptor event for listening, read only */ 120025039b37SCy Schubert static int dtio_add_output_event_read(struct dt_io_thread* dtio) 120125039b37SCy Schubert { 120225039b37SCy Schubert if(!dtio->event) 120325039b37SCy Schubert return 0; 120425039b37SCy Schubert if(dtio->event_added && !dtio->event_added_is_write) 120525039b37SCy Schubert return 1; 120625039b37SCy Schubert /* we have to (re-)register the event */ 120725039b37SCy Schubert if(dtio->event_added) 120825039b37SCy Schubert ub_event_del(dtio->event); 120925039b37SCy Schubert ub_event_del_bits(dtio->event, UB_EV_WRITE); 121025039b37SCy Schubert if(ub_event_add(dtio->event, NULL) != 0) { 121125039b37SCy Schubert log_err("dnstap io: out of memory (adding event)"); 121225039b37SCy Schubert dtio->event_added = 0; 121325039b37SCy Schubert dtio->event_added_is_write = 0; 121425039b37SCy Schubert /* close output and start reattempts to open it */ 121525039b37SCy Schubert dtio_close_output(dtio); 121625039b37SCy Schubert return 0; 121725039b37SCy Schubert } 121825039b37SCy Schubert dtio->event_added = 1; 121925039b37SCy Schubert dtio->event_added_is_write = 0; 122025039b37SCy Schubert return 1; 122125039b37SCy Schubert } 122225039b37SCy Schubert 122325039b37SCy Schubert /** add the output file descriptor event for listening, read and write */ 122425039b37SCy Schubert static int dtio_add_output_event_write(struct dt_io_thread* dtio) 122525039b37SCy Schubert { 122625039b37SCy Schubert if(!dtio->event) 122725039b37SCy Schubert return 0; 122825039b37SCy Schubert if(dtio->event_added && dtio->event_added_is_write) 122925039b37SCy Schubert return 1; 123025039b37SCy Schubert /* we have to (re-)register the event */ 123125039b37SCy Schubert if(dtio->event_added) 123225039b37SCy Schubert ub_event_del(dtio->event); 123325039b37SCy Schubert ub_event_add_bits(dtio->event, UB_EV_WRITE); 123425039b37SCy Schubert if(ub_event_add(dtio->event, NULL) != 0) { 123525039b37SCy Schubert log_err("dnstap io: out of memory (adding event)"); 123625039b37SCy Schubert dtio->event_added = 0; 123725039b37SCy Schubert dtio->event_added_is_write = 0; 123825039b37SCy Schubert /* close output and start reattempts to open it */ 123925039b37SCy Schubert dtio_close_output(dtio); 124025039b37SCy Schubert return 0; 124125039b37SCy Schubert } 124225039b37SCy Schubert dtio->event_added = 1; 124325039b37SCy Schubert dtio->event_added_is_write = 1; 124425039b37SCy Schubert return 1; 124525039b37SCy Schubert } 124625039b37SCy Schubert 124725039b37SCy Schubert /** put the dtio thread to sleep */ 124825039b37SCy Schubert static void dtio_sleep(struct dt_io_thread* dtio) 124925039b37SCy Schubert { 125025039b37SCy Schubert /* unregister the event polling for write, because there is 125125039b37SCy Schubert * nothing to be written */ 125225039b37SCy Schubert (void)dtio_add_output_event_read(dtio); 125325039b37SCy Schubert } 125425039b37SCy Schubert 125525039b37SCy Schubert #ifdef HAVE_SSL 125625039b37SCy Schubert /** enable the brief read condition */ 125725039b37SCy Schubert static int dtio_enable_brief_read(struct dt_io_thread* dtio) 125825039b37SCy Schubert { 125925039b37SCy Schubert dtio->ssl_brief_read = 1; 126025039b37SCy Schubert if(dtio->stop_flush_event) { 126125039b37SCy Schubert ub_event_del(dtio->stop_flush_event); 126225039b37SCy Schubert ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE); 126325039b37SCy Schubert if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 126425039b37SCy Schubert log_err("dnstap io, stop flush, could not ub_event_add"); 126525039b37SCy Schubert return 0; 126625039b37SCy Schubert } 126725039b37SCy Schubert return 1; 126825039b37SCy Schubert } 126925039b37SCy Schubert return dtio_add_output_event_read(dtio); 127025039b37SCy Schubert } 127125039b37SCy Schubert #endif /* HAVE_SSL */ 127225039b37SCy Schubert 127325039b37SCy Schubert #ifdef HAVE_SSL 127425039b37SCy Schubert /** disable the brief read condition */ 127525039b37SCy Schubert static int dtio_disable_brief_read(struct dt_io_thread* dtio) 127625039b37SCy Schubert { 127725039b37SCy Schubert dtio->ssl_brief_read = 0; 127825039b37SCy Schubert if(dtio->stop_flush_event) { 127925039b37SCy Schubert ub_event_del(dtio->stop_flush_event); 128025039b37SCy Schubert ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE); 128125039b37SCy Schubert if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 128225039b37SCy Schubert log_err("dnstap io, stop flush, could not ub_event_add"); 128325039b37SCy Schubert return 0; 128425039b37SCy Schubert } 128525039b37SCy Schubert return 1; 128625039b37SCy Schubert } 128725039b37SCy Schubert return dtio_add_output_event_write(dtio); 128825039b37SCy Schubert } 128925039b37SCy Schubert #endif /* HAVE_SSL */ 129025039b37SCy Schubert 129125039b37SCy Schubert #ifdef HAVE_SSL 129225039b37SCy Schubert /** enable the brief write condition */ 129325039b37SCy Schubert static int dtio_enable_brief_write(struct dt_io_thread* dtio) 129425039b37SCy Schubert { 129525039b37SCy Schubert dtio->ssl_brief_write = 1; 129625039b37SCy Schubert return dtio_add_output_event_write(dtio); 129725039b37SCy Schubert } 129825039b37SCy Schubert #endif /* HAVE_SSL */ 129925039b37SCy Schubert 130025039b37SCy Schubert #ifdef HAVE_SSL 130125039b37SCy Schubert /** disable the brief write condition */ 130225039b37SCy Schubert static int dtio_disable_brief_write(struct dt_io_thread* dtio) 130325039b37SCy Schubert { 130425039b37SCy Schubert dtio->ssl_brief_write = 0; 130525039b37SCy Schubert return dtio_add_output_event_read(dtio); 130625039b37SCy Schubert } 130725039b37SCy Schubert #endif /* HAVE_SSL */ 130825039b37SCy Schubert 130925039b37SCy Schubert #ifdef HAVE_SSL 131025039b37SCy Schubert /** check peer verification after ssl handshake connection, false if closed*/ 131125039b37SCy Schubert static int dtio_ssl_check_peer(struct dt_io_thread* dtio) 131225039b37SCy Schubert { 131325039b37SCy Schubert if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) { 131425039b37SCy Schubert /* verification */ 131525039b37SCy Schubert if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) { 131625039b37SCy Schubert X509* x = SSL_get_peer_certificate(dtio->ssl); 131725039b37SCy Schubert if(!x) { 131825039b37SCy Schubert verbose(VERB_ALGO, "dnstap io, %s, SSL " 131925039b37SCy Schubert "connection failed no certificate", 132025039b37SCy Schubert dtio->ip_str); 132125039b37SCy Schubert return 0; 132225039b37SCy Schubert } 132325039b37SCy Schubert log_cert(VERB_ALGO, "dnstap io, peer certificate", 132425039b37SCy Schubert x); 132525039b37SCy Schubert #ifdef HAVE_SSL_GET0_PEERNAME 132625039b37SCy Schubert if(SSL_get0_peername(dtio->ssl)) { 132725039b37SCy Schubert verbose(VERB_ALGO, "dnstap io, %s, SSL " 132825039b37SCy Schubert "connection to %s authenticated", 132925039b37SCy Schubert dtio->ip_str, 133025039b37SCy Schubert SSL_get0_peername(dtio->ssl)); 133125039b37SCy Schubert } else { 133225039b37SCy Schubert #endif 133325039b37SCy Schubert verbose(VERB_ALGO, "dnstap io, %s, SSL " 133425039b37SCy Schubert "connection authenticated", 133525039b37SCy Schubert dtio->ip_str); 133625039b37SCy Schubert #ifdef HAVE_SSL_GET0_PEERNAME 133725039b37SCy Schubert } 133825039b37SCy Schubert #endif 133925039b37SCy Schubert X509_free(x); 134025039b37SCy Schubert } else { 134125039b37SCy Schubert X509* x = SSL_get_peer_certificate(dtio->ssl); 134225039b37SCy Schubert if(x) { 134325039b37SCy Schubert log_cert(VERB_ALGO, "dnstap io, peer " 134425039b37SCy Schubert "certificate", x); 134525039b37SCy Schubert X509_free(x); 134625039b37SCy Schubert } 134725039b37SCy Schubert verbose(VERB_ALGO, "dnstap io, %s, SSL connection " 134825039b37SCy Schubert "failed: failed to authenticate", 134925039b37SCy Schubert dtio->ip_str); 135025039b37SCy Schubert return 0; 135125039b37SCy Schubert } 135225039b37SCy Schubert } else { 135325039b37SCy Schubert /* unauthenticated, the verify peer flag was not set 135425039b37SCy Schubert * in ssl when the ssl object was created from ssl_ctx */ 135525039b37SCy Schubert verbose(VERB_ALGO, "dnstap io, %s, SSL connection", 135625039b37SCy Schubert dtio->ip_str); 135725039b37SCy Schubert } 135825039b37SCy Schubert return 1; 135925039b37SCy Schubert } 136025039b37SCy Schubert #endif /* HAVE_SSL */ 136125039b37SCy Schubert 136225039b37SCy Schubert #ifdef HAVE_SSL 136325039b37SCy Schubert /** perform ssl handshake, returns 1 if okay, 0 to stop */ 136425039b37SCy Schubert static int dtio_ssl_handshake(struct dt_io_thread* dtio, 136525039b37SCy Schubert struct stop_flush_info* info) 136625039b37SCy Schubert { 136725039b37SCy Schubert int r; 136825039b37SCy Schubert if(dtio->ssl_brief_read) { 136925039b37SCy Schubert /* assume the brief read condition is satisfied, 137025039b37SCy Schubert * if we need more or again, we can set it again */ 137125039b37SCy Schubert if(!dtio_disable_brief_read(dtio)) { 137225039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 137325039b37SCy Schubert return 0; 137425039b37SCy Schubert } 137525039b37SCy Schubert } 137625039b37SCy Schubert if(dtio->ssl_handshake_done) 137725039b37SCy Schubert return 1; 137825039b37SCy Schubert 137925039b37SCy Schubert ERR_clear_error(); 138025039b37SCy Schubert r = SSL_do_handshake(dtio->ssl); 138125039b37SCy Schubert if(r != 1) { 138225039b37SCy Schubert int want = SSL_get_error(dtio->ssl, r); 138325039b37SCy Schubert if(want == SSL_ERROR_WANT_READ) { 138425039b37SCy Schubert /* we want to read on the connection */ 138525039b37SCy Schubert if(!dtio_enable_brief_read(dtio)) { 138625039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 138725039b37SCy Schubert return 0; 138825039b37SCy Schubert } 138925039b37SCy Schubert return 0; 139025039b37SCy Schubert } else if(want == SSL_ERROR_WANT_WRITE) { 139125039b37SCy Schubert /* we want to write on the connection */ 139225039b37SCy Schubert return 0; 139325039b37SCy Schubert } else if(r == 0) { 139425039b37SCy Schubert /* closed */ 139525039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 139625039b37SCy Schubert dtio_del_output_event(dtio); 139725039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 139825039b37SCy Schubert dtio_close_output(dtio); 139925039b37SCy Schubert return 0; 140025039b37SCy Schubert } else if(want == SSL_ERROR_SYSCALL) { 140125039b37SCy Schubert /* SYSCALL and errno==0 means closed uncleanly */ 140225039b37SCy Schubert int silent = 0; 140325039b37SCy Schubert #ifdef EPIPE 140425039b37SCy Schubert if(errno == EPIPE && verbosity < 2) 140525039b37SCy Schubert silent = 1; /* silence 'broken pipe' */ 140625039b37SCy Schubert #endif 140725039b37SCy Schubert #ifdef ECONNRESET 140825039b37SCy Schubert if(errno == ECONNRESET && verbosity < 2) 140925039b37SCy Schubert silent = 1; /* silence reset by peer */ 141025039b37SCy Schubert #endif 141125039b37SCy Schubert if(errno == 0) 141225039b37SCy Schubert silent = 1; 141325039b37SCy Schubert if(!silent) 141425039b37SCy Schubert log_err("dnstap io, SSL_handshake syscall: %s", 141525039b37SCy Schubert strerror(errno)); 141625039b37SCy Schubert /* closed */ 141725039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 141825039b37SCy Schubert dtio_del_output_event(dtio); 141925039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 142025039b37SCy Schubert dtio_close_output(dtio); 142125039b37SCy Schubert return 0; 142225039b37SCy Schubert } else { 142325039b37SCy Schubert unsigned long err = ERR_get_error(); 142425039b37SCy Schubert if(!squelch_err_ssl_handshake(err)) { 142525039b37SCy Schubert log_crypto_err_code("dnstap io, ssl handshake failed", 142625039b37SCy Schubert err); 142725039b37SCy Schubert verbose(VERB_OPS, "dnstap io, ssl handshake failed " 142825039b37SCy Schubert "from %s", dtio->ip_str); 142925039b37SCy Schubert } 143025039b37SCy Schubert /* closed */ 143125039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 143225039b37SCy Schubert dtio_del_output_event(dtio); 143325039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 143425039b37SCy Schubert dtio_close_output(dtio); 143525039b37SCy Schubert return 0; 143625039b37SCy Schubert } 143725039b37SCy Schubert 143825039b37SCy Schubert } 143925039b37SCy Schubert /* check peer verification */ 144025039b37SCy Schubert dtio->ssl_handshake_done = 1; 144125039b37SCy Schubert 144225039b37SCy Schubert if(!dtio_ssl_check_peer(dtio)) { 144325039b37SCy Schubert /* closed */ 144425039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 144525039b37SCy Schubert dtio_del_output_event(dtio); 144625039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 144725039b37SCy Schubert dtio_close_output(dtio); 144825039b37SCy Schubert return 0; 144925039b37SCy Schubert } 145025039b37SCy Schubert return 1; 145125039b37SCy Schubert } 145225039b37SCy Schubert #endif /* HAVE_SSL */ 145325039b37SCy Schubert 145425039b37SCy Schubert /** callback for the dnstap events, to write to the output */ 145525039b37SCy Schubert void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg) 145625039b37SCy Schubert { 145725039b37SCy Schubert struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 145825039b37SCy Schubert int i; 145925039b37SCy Schubert 146025039b37SCy Schubert if(dtio->check_nb_connect) { 146125039b37SCy Schubert int connect_err = dtio_check_nb_connect(dtio); 146225039b37SCy Schubert if(connect_err == -1) { 146325039b37SCy Schubert /* close the channel */ 146425039b37SCy Schubert dtio_del_output_event(dtio); 146525039b37SCy Schubert dtio_close_output(dtio); 146625039b37SCy Schubert return; 146725039b37SCy Schubert } else if(connect_err == 0) { 146825039b37SCy Schubert /* try again later */ 146925039b37SCy Schubert return; 147025039b37SCy Schubert } 147125039b37SCy Schubert /* nonblocking connect check passed, continue */ 147225039b37SCy Schubert } 147325039b37SCy Schubert 147425039b37SCy Schubert #ifdef HAVE_SSL 147525039b37SCy Schubert if(dtio->ssl && 147625039b37SCy Schubert (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 147725039b37SCy Schubert if(!dtio_ssl_handshake(dtio, NULL)) 147825039b37SCy Schubert return; 147925039b37SCy Schubert } 148025039b37SCy Schubert #endif 148125039b37SCy Schubert 148225039b37SCy Schubert if((bits&UB_EV_READ || dtio->ssl_brief_write)) { 148325039b37SCy Schubert if(dtio->ssl_brief_write) 148425039b37SCy Schubert (void)dtio_disable_brief_write(dtio); 148525039b37SCy Schubert if(dtio->ready_frame_sent && !dtio->accept_frame_received) { 148625039b37SCy Schubert if(dtio_read_accept_frame(dtio) <= 0) 148725039b37SCy Schubert return; 148825039b37SCy Schubert } else if(!dtio_check_close(dtio)) 148925039b37SCy Schubert return; 149025039b37SCy Schubert } 149125039b37SCy Schubert 149225039b37SCy Schubert /* loop to process a number of messages. This improves throughput, 149325039b37SCy Schubert * because selecting on write-event if not needed for busy messages 149425039b37SCy Schubert * (dnstap log) generation and if they need to all be written back. 149525039b37SCy Schubert * The write event is usually not blocked up. But not forever, 149625039b37SCy Schubert * because the event loop needs to stay responsive for other events. 149725039b37SCy Schubert * If there are no (more) messages, or if the output buffers get 149825039b37SCy Schubert * full, it returns out of the loop. */ 149925039b37SCy Schubert for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) { 150025039b37SCy Schubert /* see if there are messages that need writing */ 150125039b37SCy Schubert if(!dtio->cur_msg) { 150225039b37SCy Schubert if(!dtio_find_msg(dtio)) { 150325039b37SCy Schubert if(i == 0) { 150425039b37SCy Schubert /* no messages on the first iteration, 150525039b37SCy Schubert * the queues are all empty */ 150625039b37SCy Schubert dtio_sleep(dtio); 150725039b37SCy Schubert } 150825039b37SCy Schubert return; /* nothing to do */ 150925039b37SCy Schubert } 151025039b37SCy Schubert } 151125039b37SCy Schubert 151225039b37SCy Schubert /* write it */ 151325039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) { 151425039b37SCy Schubert if(!dtio_write_more(dtio)) 151525039b37SCy Schubert return; 151625039b37SCy Schubert } 151725039b37SCy Schubert 151825039b37SCy Schubert /* done with the current message */ 151925039b37SCy Schubert dtio_cur_msg_free(dtio); 152025039b37SCy Schubert 152125039b37SCy Schubert /* If this is a bidirectional stream the first message will be 152225039b37SCy Schubert * the READY control frame. We can only continue writing after 152325039b37SCy Schubert * receiving an ACCEPT control frame. */ 152425039b37SCy Schubert if(dtio->is_bidirectional && !dtio->ready_frame_sent) { 152525039b37SCy Schubert dtio->ready_frame_sent = 1; 152625039b37SCy Schubert (void)dtio_add_output_event_read(dtio); 152725039b37SCy Schubert break; 152825039b37SCy Schubert } 152925039b37SCy Schubert } 153025039b37SCy Schubert } 153125039b37SCy Schubert 153225039b37SCy Schubert /** callback for the dnstap commandpipe, to stop the dnstap IO */ 153325039b37SCy Schubert void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) 153425039b37SCy Schubert { 153525039b37SCy Schubert struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 153625039b37SCy Schubert uint8_t cmd; 153725039b37SCy Schubert ssize_t r; 153825039b37SCy Schubert if(dtio->want_to_exit) 153925039b37SCy Schubert return; 154025039b37SCy Schubert r = read(fd, &cmd, sizeof(cmd)); 154125039b37SCy Schubert if(r == -1) { 154225039b37SCy Schubert #ifndef USE_WINSOCK 154325039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 154425039b37SCy Schubert return; /* ignore this */ 154525039b37SCy Schubert #else 154625039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 154725039b37SCy Schubert return; 154825039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) 154925039b37SCy Schubert return; 155025039b37SCy Schubert #endif 1551*c0caa2e2SCy Schubert log_err("dnstap io: failed to read: %s", sock_strerror(errno)); 155225039b37SCy Schubert /* and then fall through to quit the thread */ 155325039b37SCy Schubert } else if(r == 0) { 155425039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel closed"); 155525039b37SCy Schubert } else if(r == 1 && cmd == DTIO_COMMAND_STOP) { 155625039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); 155725039b37SCy Schubert } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) { 155825039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup"); 155925039b37SCy Schubert 156025039b37SCy Schubert if(dtio->is_bidirectional && !dtio->accept_frame_received) { 156125039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, " 156225039b37SCy Schubert "waiting for ACCEPT control frame"); 156325039b37SCy Schubert return; 156425039b37SCy Schubert } 156525039b37SCy Schubert 156625039b37SCy Schubert /* reregister event */ 156725039b37SCy Schubert if(!dtio_add_output_event_write(dtio)) 156825039b37SCy Schubert return; 156925039b37SCy Schubert return; 157025039b37SCy Schubert } else if(r == 1) { 157125039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel unknown command"); 157225039b37SCy Schubert } 157325039b37SCy Schubert dtio->want_to_exit = 1; 157425039b37SCy Schubert if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base) 157525039b37SCy Schubert != 0) { 157625039b37SCy Schubert log_err("dnstap io: could not loopexit"); 157725039b37SCy Schubert } 157825039b37SCy Schubert } 157925039b37SCy Schubert 158025039b37SCy Schubert #ifndef THREADS_DISABLED 158125039b37SCy Schubert /** setup the event base for the dnstap io thread */ 158225039b37SCy Schubert static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs, 158325039b37SCy Schubert struct timeval* now) 158425039b37SCy Schubert { 158525039b37SCy Schubert memset(now, 0, sizeof(*now)); 158625039b37SCy Schubert dtio->event_base = ub_default_event_base(0, secs, now); 158725039b37SCy Schubert if(!dtio->event_base) { 158825039b37SCy Schubert fatal_exit("dnstap io: could not create event_base"); 158925039b37SCy Schubert } 159025039b37SCy Schubert } 159125039b37SCy Schubert #endif /* THREADS_DISABLED */ 159225039b37SCy Schubert 159325039b37SCy Schubert /** setup the cmd event for dnstap io */ 159425039b37SCy Schubert static void dtio_setup_cmd(struct dt_io_thread* dtio) 159525039b37SCy Schubert { 159625039b37SCy Schubert struct ub_event* cmdev; 159725039b37SCy Schubert fd_set_nonblock(dtio->commandpipe[0]); 159825039b37SCy Schubert cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0], 159925039b37SCy Schubert UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio); 160025039b37SCy Schubert if(!cmdev) { 160125039b37SCy Schubert fatal_exit("dnstap io: out of memory"); 160225039b37SCy Schubert } 160325039b37SCy Schubert dtio->command_event = cmdev; 160425039b37SCy Schubert if(ub_event_add(cmdev, NULL) != 0) { 160525039b37SCy Schubert fatal_exit("dnstap io: out of memory (adding event)"); 160625039b37SCy Schubert } 160725039b37SCy Schubert } 160825039b37SCy Schubert 160925039b37SCy Schubert /** setup the reconnect event for dnstap io */ 161025039b37SCy Schubert static void dtio_setup_reconnect(struct dt_io_thread* dtio) 161125039b37SCy Schubert { 161225039b37SCy Schubert dtio_reconnect_clear(dtio); 161325039b37SCy Schubert dtio->reconnect_timer = ub_event_new(dtio->event_base, -1, 161425039b37SCy Schubert UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio); 161525039b37SCy Schubert if(!dtio->reconnect_timer) { 161625039b37SCy Schubert fatal_exit("dnstap io: out of memory"); 161725039b37SCy Schubert } 161825039b37SCy Schubert } 161925039b37SCy Schubert 162025039b37SCy Schubert /** 162125039b37SCy Schubert * structure to keep track of information during stop flush 162225039b37SCy Schubert */ 162325039b37SCy Schubert struct stop_flush_info { 162425039b37SCy Schubert /** the event base during stop flush */ 162525039b37SCy Schubert struct ub_event_base* base; 162625039b37SCy Schubert /** did we already want to exit this stop-flush event base */ 162725039b37SCy Schubert int want_to_exit_flush; 162825039b37SCy Schubert /** has the timer fired */ 162925039b37SCy Schubert int timer_done; 163025039b37SCy Schubert /** the dtio */ 163125039b37SCy Schubert struct dt_io_thread* dtio; 163225039b37SCy Schubert /** the stop control frame */ 163325039b37SCy Schubert void* stop_frame; 163425039b37SCy Schubert /** length of the stop frame */ 163525039b37SCy Schubert size_t stop_frame_len; 163625039b37SCy Schubert /** how much we have done of the stop frame */ 163725039b37SCy Schubert size_t stop_frame_done; 163825039b37SCy Schubert }; 163925039b37SCy Schubert 164025039b37SCy Schubert /** exit the stop flush base */ 164125039b37SCy Schubert static void dtio_stop_flush_exit(struct stop_flush_info* info) 164225039b37SCy Schubert { 164325039b37SCy Schubert if(info->want_to_exit_flush) 164425039b37SCy Schubert return; 164525039b37SCy Schubert info->want_to_exit_flush = 1; 164625039b37SCy Schubert if(ub_event_base_loopexit(info->base) != 0) { 164725039b37SCy Schubert log_err("dnstap io: could not loopexit"); 164825039b37SCy Schubert } 164925039b37SCy Schubert } 165025039b37SCy Schubert 165125039b37SCy Schubert /** send the stop control, 165225039b37SCy Schubert * return true if completed the frame. */ 165325039b37SCy Schubert static int dtio_control_stop_send(struct stop_flush_info* info) 165425039b37SCy Schubert { 165525039b37SCy Schubert struct dt_io_thread* dtio = info->dtio; 165625039b37SCy Schubert int r; 165725039b37SCy Schubert if(info->stop_frame_done >= info->stop_frame_len) 165825039b37SCy Schubert return 1; 165925039b37SCy Schubert r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) + 166025039b37SCy Schubert info->stop_frame_done, info->stop_frame_len - 166125039b37SCy Schubert info->stop_frame_done); 166225039b37SCy Schubert if(r == -1) { 166325039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush: output closed"); 166425039b37SCy Schubert dtio_stop_flush_exit(info); 166525039b37SCy Schubert return 0; 166625039b37SCy Schubert } 166725039b37SCy Schubert if(r == 0) { 166825039b37SCy Schubert /* try again later, or timeout */ 166925039b37SCy Schubert return 0; 167025039b37SCy Schubert } 167125039b37SCy Schubert info->stop_frame_done += r; 167225039b37SCy Schubert if(info->stop_frame_done < info->stop_frame_len) 167325039b37SCy Schubert return 0; /* not done yet */ 167425039b37SCy Schubert return 1; 167525039b37SCy Schubert } 167625039b37SCy Schubert 167725039b37SCy Schubert void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), 167825039b37SCy Schubert void* arg) 167925039b37SCy Schubert { 168025039b37SCy Schubert struct stop_flush_info* info = (struct stop_flush_info*)arg; 168125039b37SCy Schubert if(info->want_to_exit_flush) 168225039b37SCy Schubert return; 168325039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush"); 168425039b37SCy Schubert info->timer_done = 1; 168525039b37SCy Schubert dtio_stop_flush_exit(info); 168625039b37SCy Schubert } 168725039b37SCy Schubert 168825039b37SCy Schubert void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg) 168925039b37SCy Schubert { 169025039b37SCy Schubert struct stop_flush_info* info = (struct stop_flush_info*)arg; 169125039b37SCy Schubert struct dt_io_thread* dtio = info->dtio; 169225039b37SCy Schubert if(info->want_to_exit_flush) 169325039b37SCy Schubert return; 169425039b37SCy Schubert if(dtio->check_nb_connect) { 169525039b37SCy Schubert /* we don't start the stop_flush if connect still 169625039b37SCy Schubert * in progress, but the check code is here, just in case */ 169725039b37SCy Schubert int connect_err = dtio_check_nb_connect(dtio); 169825039b37SCy Schubert if(connect_err == -1) { 169925039b37SCy Schubert /* close the channel, exit the stop flush */ 170025039b37SCy Schubert dtio_stop_flush_exit(info); 170125039b37SCy Schubert dtio_del_output_event(dtio); 170225039b37SCy Schubert dtio_close_output(dtio); 170325039b37SCy Schubert return; 170425039b37SCy Schubert } else if(connect_err == 0) { 170525039b37SCy Schubert /* try again later */ 170625039b37SCy Schubert return; 170725039b37SCy Schubert } 170825039b37SCy Schubert /* nonblocking connect check passed, continue */ 170925039b37SCy Schubert } 171025039b37SCy Schubert #ifdef HAVE_SSL 171125039b37SCy Schubert if(dtio->ssl && 171225039b37SCy Schubert (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 171325039b37SCy Schubert if(!dtio_ssl_handshake(dtio, info)) 171425039b37SCy Schubert return; 171525039b37SCy Schubert } 171625039b37SCy Schubert #endif 171725039b37SCy Schubert 171825039b37SCy Schubert if((bits&UB_EV_READ)) { 171925039b37SCy Schubert if(!dtio_check_close(dtio)) { 172025039b37SCy Schubert if(dtio->fd == -1) { 172125039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: " 172225039b37SCy Schubert "stop flush: output closed"); 172325039b37SCy Schubert dtio_stop_flush_exit(info); 172425039b37SCy Schubert } 172525039b37SCy Schubert return; 172625039b37SCy Schubert } 172725039b37SCy Schubert } 172825039b37SCy Schubert /* write remainder of last frame */ 172925039b37SCy Schubert if(dtio->cur_msg) { 173025039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) { 173125039b37SCy Schubert if(!dtio_write_more(dtio)) { 173225039b37SCy Schubert if(dtio->fd == -1) { 173325039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: " 173425039b37SCy Schubert "stop flush: output closed"); 173525039b37SCy Schubert dtio_stop_flush_exit(info); 173625039b37SCy Schubert } 173725039b37SCy Schubert return; 173825039b37SCy Schubert } 173925039b37SCy Schubert } 174025039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush completed " 174125039b37SCy Schubert "last frame"); 174225039b37SCy Schubert dtio_cur_msg_free(dtio); 174325039b37SCy Schubert } 174425039b37SCy Schubert /* write stop frame */ 174525039b37SCy Schubert if(info->stop_frame_done < info->stop_frame_len) { 174625039b37SCy Schubert if(!dtio_control_stop_send(info)) 174725039b37SCy Schubert return; 174825039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush completed " 174925039b37SCy Schubert "stop control frame"); 175025039b37SCy Schubert } 175125039b37SCy Schubert /* when last frame and stop frame are sent, exit */ 175225039b37SCy Schubert dtio_stop_flush_exit(info); 175325039b37SCy Schubert } 175425039b37SCy Schubert 175525039b37SCy Schubert /** flush at end, last packet and stop control */ 175625039b37SCy Schubert static void dtio_control_stop_flush(struct dt_io_thread* dtio) 175725039b37SCy Schubert { 175825039b37SCy Schubert /* briefly attempt to flush the previous packet to the output, 175925039b37SCy Schubert * this could be a partial packet, or even the start control frame */ 176025039b37SCy Schubert time_t secs = 0; 176125039b37SCy Schubert struct timeval now; 176225039b37SCy Schubert struct stop_flush_info info; 176325039b37SCy Schubert struct timeval tv; 176425039b37SCy Schubert struct ub_event* timer, *stopev; 176525039b37SCy Schubert 176625039b37SCy Schubert if(dtio->fd == -1 || dtio->check_nb_connect) { 176725039b37SCy Schubert /* no connection or we have just connected, so nothing is 176825039b37SCy Schubert * sent yet, so nothing to stop or flush */ 176925039b37SCy Schubert return; 177025039b37SCy Schubert } 177125039b37SCy Schubert if(dtio->ssl && !dtio->ssl_handshake_done) { 177225039b37SCy Schubert /* no SSL connection has been established yet */ 177325039b37SCy Schubert return; 177425039b37SCy Schubert } 177525039b37SCy Schubert 177625039b37SCy Schubert memset(&info, 0, sizeof(info)); 177725039b37SCy Schubert memset(&now, 0, sizeof(now)); 177825039b37SCy Schubert info.dtio = dtio; 177925039b37SCy Schubert info.base = ub_default_event_base(0, &secs, &now); 178025039b37SCy Schubert if(!info.base) { 178125039b37SCy Schubert log_err("dnstap io: malloc failure"); 178225039b37SCy Schubert return; 178325039b37SCy Schubert } 178425039b37SCy Schubert timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT, 178525039b37SCy Schubert &dtio_stop_timer_cb, &info); 178625039b37SCy Schubert if(!timer) { 178725039b37SCy Schubert log_err("dnstap io: malloc failure"); 178825039b37SCy Schubert ub_event_base_free(info.base); 178925039b37SCy Schubert return; 179025039b37SCy Schubert } 179125039b37SCy Schubert memset(&tv, 0, sizeof(tv)); 179225039b37SCy Schubert tv.tv_sec = 2; 179325039b37SCy Schubert if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info, 179425039b37SCy Schubert &tv) != 0) { 179525039b37SCy Schubert log_err("dnstap io: cannot event_timer_add"); 179625039b37SCy Schubert ub_event_free(timer); 179725039b37SCy Schubert ub_event_base_free(info.base); 179825039b37SCy Schubert return; 179925039b37SCy Schubert } 180025039b37SCy Schubert stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ | 180125039b37SCy Schubert UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info); 180225039b37SCy Schubert if(!stopev) { 180325039b37SCy Schubert log_err("dnstap io: malloc failure"); 180425039b37SCy Schubert ub_timer_del(timer); 180525039b37SCy Schubert ub_event_free(timer); 180625039b37SCy Schubert ub_event_base_free(info.base); 180725039b37SCy Schubert return; 180825039b37SCy Schubert } 180925039b37SCy Schubert if(ub_event_add(stopev, NULL) != 0) { 181025039b37SCy Schubert log_err("dnstap io: cannot event_add"); 181125039b37SCy Schubert ub_event_free(stopev); 181225039b37SCy Schubert ub_timer_del(timer); 181325039b37SCy Schubert ub_event_free(timer); 181425039b37SCy Schubert ub_event_base_free(info.base); 181525039b37SCy Schubert return; 181625039b37SCy Schubert } 181725039b37SCy Schubert info.stop_frame = fstrm_create_control_frame_stop( 181825039b37SCy Schubert &info.stop_frame_len); 181925039b37SCy Schubert if(!info.stop_frame) { 182025039b37SCy Schubert log_err("dnstap io: malloc failure"); 182125039b37SCy Schubert ub_event_del(stopev); 182225039b37SCy Schubert ub_event_free(stopev); 182325039b37SCy Schubert ub_timer_del(timer); 182425039b37SCy Schubert ub_event_free(timer); 182525039b37SCy Schubert ub_event_base_free(info.base); 182625039b37SCy Schubert return; 182725039b37SCy Schubert } 182825039b37SCy Schubert dtio->stop_flush_event = stopev; 182925039b37SCy Schubert 183025039b37SCy Schubert /* wait briefly, or until finished */ 183125039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush started"); 183225039b37SCy Schubert if(ub_event_base_dispatch(info.base) < 0) { 183325039b37SCy Schubert log_err("dnstap io: dispatch flush failed, errno is %s", 183425039b37SCy Schubert strerror(errno)); 183525039b37SCy Schubert } 183625039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush ended"); 183725039b37SCy Schubert free(info.stop_frame); 183825039b37SCy Schubert dtio->stop_flush_event = NULL; 183925039b37SCy Schubert ub_event_del(stopev); 184025039b37SCy Schubert ub_event_free(stopev); 184125039b37SCy Schubert ub_timer_del(timer); 184225039b37SCy Schubert ub_event_free(timer); 184325039b37SCy Schubert ub_event_base_free(info.base); 184425039b37SCy Schubert } 184525039b37SCy Schubert 184625039b37SCy Schubert /** perform desetup and free stuff when the dnstap io thread exits */ 184725039b37SCy Schubert static void dtio_desetup(struct dt_io_thread* dtio) 184825039b37SCy Schubert { 184925039b37SCy Schubert dtio_control_stop_flush(dtio); 185025039b37SCy Schubert dtio_del_output_event(dtio); 185125039b37SCy Schubert dtio_close_output(dtio); 185225039b37SCy Schubert ub_event_del(dtio->command_event); 185325039b37SCy Schubert ub_event_free(dtio->command_event); 185425039b37SCy Schubert #ifndef USE_WINSOCK 185525039b37SCy Schubert close(dtio->commandpipe[0]); 185625039b37SCy Schubert #else 185725039b37SCy Schubert _close(dtio->commandpipe[0]); 185825039b37SCy Schubert #endif 185925039b37SCy Schubert dtio->commandpipe[0] = -1; 186025039b37SCy Schubert dtio_reconnect_del(dtio); 186125039b37SCy Schubert ub_event_free(dtio->reconnect_timer); 186225039b37SCy Schubert dtio_cur_msg_free(dtio); 186325039b37SCy Schubert #ifndef THREADS_DISABLED 186425039b37SCy Schubert ub_event_base_free(dtio->event_base); 186525039b37SCy Schubert #endif 186625039b37SCy Schubert } 186725039b37SCy Schubert 186825039b37SCy Schubert /** setup a start control message */ 186925039b37SCy Schubert static int dtio_control_start_send(struct dt_io_thread* dtio) 187025039b37SCy Schubert { 187125039b37SCy Schubert log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 187225039b37SCy Schubert dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE, 187325039b37SCy Schubert &dtio->cur_msg_len); 187425039b37SCy Schubert if(!dtio->cur_msg) { 187525039b37SCy Schubert return 0; 187625039b37SCy Schubert } 187725039b37SCy Schubert /* setup to send the control message */ 187825039b37SCy Schubert /* set that the buffer needs to be sent, but the length 187925039b37SCy Schubert * of that buffer is already written, that way the buffer can 188025039b37SCy Schubert * start with 0 length and then the length of the control frame 188125039b37SCy Schubert * in it */ 188225039b37SCy Schubert dtio->cur_msg_done = 0; 188325039b37SCy Schubert dtio->cur_msg_len_done = 4; 188425039b37SCy Schubert return 1; 188525039b37SCy Schubert } 188625039b37SCy Schubert 188725039b37SCy Schubert /** setup a ready control message */ 188825039b37SCy Schubert static int dtio_control_ready_send(struct dt_io_thread* dtio) 188925039b37SCy Schubert { 189025039b37SCy Schubert log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 189125039b37SCy Schubert dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE, 189225039b37SCy Schubert &dtio->cur_msg_len); 189325039b37SCy Schubert if(!dtio->cur_msg) { 189425039b37SCy Schubert return 0; 189525039b37SCy Schubert } 189625039b37SCy Schubert /* setup to send the control message */ 189725039b37SCy Schubert /* set that the buffer needs to be sent, but the length 189825039b37SCy Schubert * of that buffer is already written, that way the buffer can 189925039b37SCy Schubert * start with 0 length and then the length of the control frame 190025039b37SCy Schubert * in it */ 190125039b37SCy Schubert dtio->cur_msg_done = 0; 190225039b37SCy Schubert dtio->cur_msg_len_done = 4; 190325039b37SCy Schubert return 1; 190425039b37SCy Schubert } 190525039b37SCy Schubert 190625039b37SCy Schubert /** open the output file descriptor for af_local */ 190725039b37SCy Schubert static int dtio_open_output_local(struct dt_io_thread* dtio) 190825039b37SCy Schubert { 190925039b37SCy Schubert #ifdef HAVE_SYS_UN_H 191025039b37SCy Schubert struct sockaddr_un s; 191125039b37SCy Schubert dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0); 191225039b37SCy Schubert if(dtio->fd == -1) { 191325039b37SCy Schubert log_err("dnstap io: failed to create socket: %s", 1914*c0caa2e2SCy Schubert sock_strerror(errno)); 191525039b37SCy Schubert return 0; 191625039b37SCy Schubert } 191725039b37SCy Schubert memset(&s, 0, sizeof(s)); 191825039b37SCy Schubert #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN 191925039b37SCy Schubert /* this member exists on BSDs, not Linux */ 192025039b37SCy Schubert s.sun_len = (unsigned)sizeof(s); 192125039b37SCy Schubert #endif 192225039b37SCy Schubert s.sun_family = AF_LOCAL; 192325039b37SCy Schubert /* length is 92-108, 104 on FreeBSD */ 192425039b37SCy Schubert (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); 192525039b37SCy Schubert fd_set_nonblock(dtio->fd); 192625039b37SCy Schubert if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) 192725039b37SCy Schubert == -1) { 192825039b37SCy Schubert char* to = dtio->socket_path; 1929*c0caa2e2SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1930*c0caa2e2SCy Schubert verbosity < 4) { 1931*c0caa2e2SCy Schubert dtio_close_fd(dtio); 1932*c0caa2e2SCy Schubert return 0; /* no log retries on low verbosity */ 1933*c0caa2e2SCy Schubert } 193425039b37SCy Schubert log_err("dnstap io: failed to connect to \"%s\": %s", 1935*c0caa2e2SCy Schubert to, sock_strerror(errno)); 193625039b37SCy Schubert dtio_close_fd(dtio); 193725039b37SCy Schubert return 0; 193825039b37SCy Schubert } 193925039b37SCy Schubert return 1; 194025039b37SCy Schubert #else 194125039b37SCy Schubert log_err("cannot create af_local socket"); 194225039b37SCy Schubert return 0; 194325039b37SCy Schubert #endif /* HAVE_SYS_UN_H */ 194425039b37SCy Schubert } 194525039b37SCy Schubert 194625039b37SCy Schubert /** open the output file descriptor for af_inet and af_inet6 */ 194725039b37SCy Schubert static int dtio_open_output_tcp(struct dt_io_thread* dtio) 194825039b37SCy Schubert { 194925039b37SCy Schubert struct sockaddr_storage addr; 195025039b37SCy Schubert socklen_t addrlen; 195125039b37SCy Schubert memset(&addr, 0, sizeof(addr)); 195225039b37SCy Schubert addrlen = (socklen_t)sizeof(addr); 195325039b37SCy Schubert 195425039b37SCy Schubert if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) { 195525039b37SCy Schubert log_err("could not parse IP '%s'", dtio->ip_str); 195625039b37SCy Schubert return 0; 195725039b37SCy Schubert } 195825039b37SCy Schubert dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0); 195925039b37SCy Schubert if(dtio->fd == -1) { 1960*c0caa2e2SCy Schubert log_err("can't create socket: %s", sock_strerror(errno)); 196125039b37SCy Schubert return 0; 196225039b37SCy Schubert } 196325039b37SCy Schubert fd_set_nonblock(dtio->fd); 196425039b37SCy Schubert if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) { 196525039b37SCy Schubert if(errno == EINPROGRESS) 196625039b37SCy Schubert return 1; /* wait until connect done*/ 1967*c0caa2e2SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1968*c0caa2e2SCy Schubert verbosity < 4) { 1969*c0caa2e2SCy Schubert dtio_close_fd(dtio); 1970*c0caa2e2SCy Schubert return 0; /* no log retries on low verbosity */ 1971*c0caa2e2SCy Schubert } 197225039b37SCy Schubert #ifndef USE_WINSOCK 197325039b37SCy Schubert if(tcp_connect_errno_needs_log( 197425039b37SCy Schubert (struct sockaddr *)&addr, addrlen)) { 197525039b37SCy Schubert log_err("dnstap io: failed to connect to %s: %s", 197625039b37SCy Schubert dtio->ip_str, strerror(errno)); 197725039b37SCy Schubert } 197825039b37SCy Schubert #else 197925039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS || 198025039b37SCy Schubert WSAGetLastError() == WSAEWOULDBLOCK) 198125039b37SCy Schubert return 1; /* wait until connect done*/ 198225039b37SCy Schubert if(tcp_connect_errno_needs_log( 198325039b37SCy Schubert (struct sockaddr *)&addr, addrlen)) { 198425039b37SCy Schubert log_err("dnstap io: failed to connect to %s: %s", 198525039b37SCy Schubert dtio->ip_str, wsa_strerror(WSAGetLastError())); 198625039b37SCy Schubert } 198725039b37SCy Schubert #endif 198825039b37SCy Schubert dtio_close_fd(dtio); 198925039b37SCy Schubert return 0; 199025039b37SCy Schubert } 199125039b37SCy Schubert return 1; 199225039b37SCy Schubert } 199325039b37SCy Schubert 199425039b37SCy Schubert /** setup the SSL structure for new connection */ 199525039b37SCy Schubert static int dtio_setup_ssl(struct dt_io_thread* dtio) 199625039b37SCy Schubert { 199725039b37SCy Schubert dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd); 199825039b37SCy Schubert if(!dtio->ssl) return 0; 199925039b37SCy Schubert dtio->ssl_handshake_done = 0; 200025039b37SCy Schubert dtio->ssl_brief_read = 0; 200125039b37SCy Schubert 200225039b37SCy Schubert if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name, 200325039b37SCy Schubert dtio->tls_use_sni)) { 200425039b37SCy Schubert return 0; 200525039b37SCy Schubert } 200625039b37SCy Schubert return 1; 200725039b37SCy Schubert } 200825039b37SCy Schubert 200925039b37SCy Schubert /** open the output file descriptor */ 201025039b37SCy Schubert static void dtio_open_output(struct dt_io_thread* dtio) 201125039b37SCy Schubert { 201225039b37SCy Schubert struct ub_event* ev; 201325039b37SCy Schubert if(dtio->upstream_is_unix) { 201425039b37SCy Schubert if(!dtio_open_output_local(dtio)) { 201525039b37SCy Schubert dtio_reconnect_enable(dtio); 201625039b37SCy Schubert return; 201725039b37SCy Schubert } 201825039b37SCy Schubert } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 201925039b37SCy Schubert if(!dtio_open_output_tcp(dtio)) { 202025039b37SCy Schubert dtio_reconnect_enable(dtio); 202125039b37SCy Schubert return; 202225039b37SCy Schubert } 202325039b37SCy Schubert if(dtio->upstream_is_tls) { 202425039b37SCy Schubert if(!dtio_setup_ssl(dtio)) { 202525039b37SCy Schubert dtio_close_fd(dtio); 202625039b37SCy Schubert dtio_reconnect_enable(dtio); 202725039b37SCy Schubert return; 202825039b37SCy Schubert } 202925039b37SCy Schubert } 203025039b37SCy Schubert } 203125039b37SCy Schubert dtio->check_nb_connect = 1; 203225039b37SCy Schubert 203325039b37SCy Schubert /* the EV_READ is to read ACCEPT control messages, and catch channel 203425039b37SCy Schubert * close. EV_WRITE is to write packets */ 203525039b37SCy Schubert ev = ub_event_new(dtio->event_base, dtio->fd, 203625039b37SCy Schubert UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb, 203725039b37SCy Schubert dtio); 203825039b37SCy Schubert if(!ev) { 203925039b37SCy Schubert log_err("dnstap io: out of memory"); 204025039b37SCy Schubert if(dtio->ssl) { 204125039b37SCy Schubert #ifdef HAVE_SSL 204225039b37SCy Schubert SSL_free(dtio->ssl); 204325039b37SCy Schubert dtio->ssl = NULL; 204425039b37SCy Schubert #endif 204525039b37SCy Schubert } 204625039b37SCy Schubert dtio_close_fd(dtio); 204725039b37SCy Schubert dtio_reconnect_enable(dtio); 204825039b37SCy Schubert return; 204925039b37SCy Schubert } 205025039b37SCy Schubert dtio->event = ev; 205125039b37SCy Schubert 205225039b37SCy Schubert /* setup protocol control message to start */ 205325039b37SCy Schubert if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) || 205425039b37SCy Schubert (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) { 205525039b37SCy Schubert log_err("dnstap io: out of memory"); 205625039b37SCy Schubert ub_event_free(dtio->event); 205725039b37SCy Schubert dtio->event = NULL; 205825039b37SCy Schubert if(dtio->ssl) { 205925039b37SCy Schubert #ifdef HAVE_SSL 206025039b37SCy Schubert SSL_free(dtio->ssl); 206125039b37SCy Schubert dtio->ssl = NULL; 206225039b37SCy Schubert #endif 206325039b37SCy Schubert } 206425039b37SCy Schubert dtio_close_fd(dtio); 206525039b37SCy Schubert dtio_reconnect_enable(dtio); 206625039b37SCy Schubert return; 206725039b37SCy Schubert } 206825039b37SCy Schubert } 206925039b37SCy Schubert 207025039b37SCy Schubert /** perform the setup of the writer thread on the established event_base */ 207125039b37SCy Schubert static void dtio_setup_on_base(struct dt_io_thread* dtio) 207225039b37SCy Schubert { 207325039b37SCy Schubert dtio_setup_cmd(dtio); 207425039b37SCy Schubert dtio_setup_reconnect(dtio); 207525039b37SCy Schubert dtio_open_output(dtio); 207625039b37SCy Schubert if(!dtio_add_output_event_write(dtio)) 207725039b37SCy Schubert return; 207825039b37SCy Schubert } 207925039b37SCy Schubert 208025039b37SCy Schubert #ifndef THREADS_DISABLED 208125039b37SCy Schubert /** the IO thread function for the DNSTAP IO */ 208225039b37SCy Schubert static void* dnstap_io(void* arg) 208325039b37SCy Schubert { 208425039b37SCy Schubert struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 208525039b37SCy Schubert time_t secs = 0; 208625039b37SCy Schubert struct timeval now; 208725039b37SCy Schubert log_thread_set(&dtio->threadnum); 208825039b37SCy Schubert 208925039b37SCy Schubert /* setup */ 209025039b37SCy Schubert verbose(VERB_ALGO, "start dnstap io thread"); 209125039b37SCy Schubert dtio_setup_base(dtio, &secs, &now); 209225039b37SCy Schubert dtio_setup_on_base(dtio); 209325039b37SCy Schubert 209425039b37SCy Schubert /* run */ 209525039b37SCy Schubert if(ub_event_base_dispatch(dtio->event_base) < 0) { 209625039b37SCy Schubert log_err("dnstap io: dispatch failed, errno is %s", 209725039b37SCy Schubert strerror(errno)); 209825039b37SCy Schubert } 209925039b37SCy Schubert 210025039b37SCy Schubert /* cleanup */ 210125039b37SCy Schubert verbose(VERB_ALGO, "stop dnstap io thread"); 210225039b37SCy Schubert dtio_desetup(dtio); 210325039b37SCy Schubert return NULL; 210425039b37SCy Schubert } 210525039b37SCy Schubert #endif /* THREADS_DISABLED */ 210625039b37SCy Schubert 210725039b37SCy Schubert int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, 210825039b37SCy Schubert int numworkers) 210925039b37SCy Schubert { 211025039b37SCy Schubert /* set up the thread, can fail */ 211125039b37SCy Schubert #ifndef USE_WINSOCK 211225039b37SCy Schubert if(pipe(dtio->commandpipe) == -1) { 211325039b37SCy Schubert log_err("failed to create pipe: %s", strerror(errno)); 211425039b37SCy Schubert return 0; 211525039b37SCy Schubert } 211625039b37SCy Schubert #else 211725039b37SCy Schubert if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) { 211825039b37SCy Schubert log_err("failed to create _pipe: %s", 211925039b37SCy Schubert wsa_strerror(WSAGetLastError())); 212025039b37SCy Schubert return 0; 212125039b37SCy Schubert } 212225039b37SCy Schubert #endif 212325039b37SCy Schubert 212425039b37SCy Schubert /* start the thread */ 212525039b37SCy Schubert dtio->threadnum = numworkers+1; 212625039b37SCy Schubert dtio->started = 1; 212725039b37SCy Schubert #ifndef THREADS_DISABLED 212825039b37SCy Schubert ub_thread_create(&dtio->tid, dnstap_io, dtio); 212925039b37SCy Schubert (void)event_base_nothr; 213025039b37SCy Schubert #else 213125039b37SCy Schubert dtio->event_base = event_base_nothr; 213225039b37SCy Schubert dtio_setup_on_base(dtio); 213325039b37SCy Schubert #endif 213425039b37SCy Schubert return 1; 213525039b37SCy Schubert } 213625039b37SCy Schubert 213725039b37SCy Schubert void dt_io_thread_stop(struct dt_io_thread* dtio) 213825039b37SCy Schubert { 213925039b37SCy Schubert #ifndef THREADS_DISABLED 214025039b37SCy Schubert uint8_t cmd = DTIO_COMMAND_STOP; 214125039b37SCy Schubert #endif 214225039b37SCy Schubert if(!dtio) return; 214325039b37SCy Schubert if(!dtio->started) return; 214425039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: send stop cmd"); 214525039b37SCy Schubert 214625039b37SCy Schubert #ifndef THREADS_DISABLED 214725039b37SCy Schubert while(1) { 214825039b37SCy Schubert ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 214925039b37SCy Schubert if(r == -1) { 215025039b37SCy Schubert #ifndef USE_WINSOCK 215125039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 215225039b37SCy Schubert continue; 215325039b37SCy Schubert #else 215425039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 215525039b37SCy Schubert continue; 215625039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) 215725039b37SCy Schubert continue; 215825039b37SCy Schubert #endif 2159*c0caa2e2SCy Schubert log_err("dnstap io stop: write: %s", 2160*c0caa2e2SCy Schubert sock_strerror(errno)); 216125039b37SCy Schubert break; 216225039b37SCy Schubert } 216325039b37SCy Schubert break; 216425039b37SCy Schubert } 216525039b37SCy Schubert dtio->started = 0; 216625039b37SCy Schubert #endif /* THREADS_DISABLED */ 216725039b37SCy Schubert 216825039b37SCy Schubert #ifndef USE_WINSOCK 216925039b37SCy Schubert close(dtio->commandpipe[1]); 217025039b37SCy Schubert #else 217125039b37SCy Schubert _close(dtio->commandpipe[1]); 217225039b37SCy Schubert #endif 217325039b37SCy Schubert dtio->commandpipe[1] = -1; 217425039b37SCy Schubert #ifndef THREADS_DISABLED 217525039b37SCy Schubert ub_thread_join(dtio->tid); 217625039b37SCy Schubert #else 217725039b37SCy Schubert dtio->want_to_exit = 1; 217825039b37SCy Schubert dtio_desetup(dtio); 217925039b37SCy Schubert #endif 218025039b37SCy Schubert } 2181