1*25039b37SCy Schubert /* 2*25039b37SCy Schubert * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP 3*25039b37SCy Schubert * 4*25039b37SCy Schubert * Copyright (c) 2020, NLnet Labs. All rights reserved. 5*25039b37SCy Schubert * 6*25039b37SCy Schubert * This software is open source. 7*25039b37SCy Schubert * 8*25039b37SCy Schubert * Redistribution and use in source and binary forms, with or without 9*25039b37SCy Schubert * modification, are permitted provided that the following conditions 10*25039b37SCy Schubert * are met: 11*25039b37SCy Schubert * 12*25039b37SCy Schubert * Redistributions of source code must retain the above copyright notice, 13*25039b37SCy Schubert * this list of conditions and the following disclaimer. 14*25039b37SCy Schubert * 15*25039b37SCy Schubert * Redistributions in binary form must reproduce the above copyright notice, 16*25039b37SCy Schubert * this list of conditions and the following disclaimer in the documentation 17*25039b37SCy Schubert * and/or other materials provided with the distribution. 18*25039b37SCy Schubert * 19*25039b37SCy Schubert * Neither the name of the NLNET LABS nor the names of its contributors may 20*25039b37SCy Schubert * be used to endorse or promote products derived from this software without 21*25039b37SCy Schubert * specific prior written permission. 22*25039b37SCy Schubert * 23*25039b37SCy Schubert * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24*25039b37SCy Schubert * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25*25039b37SCy Schubert * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26*25039b37SCy Schubert * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27*25039b37SCy Schubert * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28*25039b37SCy Schubert * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29*25039b37SCy Schubert * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30*25039b37SCy Schubert * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31*25039b37SCy Schubert * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32*25039b37SCy Schubert * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33*25039b37SCy Schubert * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34*25039b37SCy Schubert * 35*25039b37SCy Schubert */ 36*25039b37SCy Schubert 37*25039b37SCy Schubert /** 38*25039b37SCy Schubert * \file 39*25039b37SCy Schubert * 40*25039b37SCy Schubert * An implementation of the Frame Streams data transport protocol for 41*25039b37SCy Schubert * the Unbound DNSTAP message logging facility. 42*25039b37SCy Schubert */ 43*25039b37SCy Schubert 44*25039b37SCy Schubert #include "config.h" 45*25039b37SCy Schubert #include "dnstap/dtstream.h" 46*25039b37SCy Schubert #include "dnstap/dnstap_fstrm.h" 47*25039b37SCy Schubert #include "util/config_file.h" 48*25039b37SCy Schubert #include "util/ub_event.h" 49*25039b37SCy Schubert #include "util/net_help.h" 50*25039b37SCy Schubert #include "services/outside_network.h" 51*25039b37SCy Schubert #include "sldns/sbuffer.h" 52*25039b37SCy Schubert #ifdef HAVE_SYS_UN_H 53*25039b37SCy Schubert #include <sys/un.h> 54*25039b37SCy Schubert #endif 55*25039b37SCy Schubert #include <fcntl.h> 56*25039b37SCy Schubert #ifdef HAVE_OPENSSL_SSL_H 57*25039b37SCy Schubert #include <openssl/ssl.h> 58*25039b37SCy Schubert #endif 59*25039b37SCy Schubert #ifdef HAVE_OPENSSL_ERR_H 60*25039b37SCy Schubert #include <openssl/err.h> 61*25039b37SCy Schubert #endif 62*25039b37SCy Schubert 63*25039b37SCy Schubert /** number of messages to process in one output callback */ 64*25039b37SCy Schubert #define DTIO_MESSAGES_PER_CALLBACK 100 65*25039b37SCy Schubert /** the msec to wait for reconnect (if not immediate, the first attempt) */ 66*25039b37SCy Schubert #define DTIO_RECONNECT_TIMEOUT_MIN 10 67*25039b37SCy Schubert /** the msec to wait for reconnect max after backoff */ 68*25039b37SCy Schubert #define DTIO_RECONNECT_TIMEOUT_MAX 1000 69*25039b37SCy Schubert /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */ 70*25039b37SCy Schubert #define DTIO_RECONNECT_TIMEOUT_SLOW 1000 71*25039b37SCy Schubert 72*25039b37SCy Schubert /** maximum length of received frame */ 73*25039b37SCy Schubert #define DTIO_RECV_FRAME_MAX_LEN 1000 74*25039b37SCy Schubert 75*25039b37SCy Schubert struct stop_flush_info; 76*25039b37SCy Schubert /** DTIO command channel commands */ 77*25039b37SCy Schubert enum { 78*25039b37SCy Schubert /** DTIO command channel stop */ 79*25039b37SCy Schubert DTIO_COMMAND_STOP = 0, 80*25039b37SCy Schubert /** DTIO command channel wakeup */ 81*25039b37SCy Schubert DTIO_COMMAND_WAKEUP = 1 82*25039b37SCy Schubert } dtio_channel_command; 83*25039b37SCy Schubert 84*25039b37SCy Schubert /** open the output channel */ 85*25039b37SCy Schubert static void dtio_open_output(struct dt_io_thread* dtio); 86*25039b37SCy Schubert /** add output event for read and write */ 87*25039b37SCy Schubert static int dtio_add_output_event_write(struct dt_io_thread* dtio); 88*25039b37SCy Schubert /** start reconnection attempts */ 89*25039b37SCy Schubert static void dtio_reconnect_enable(struct dt_io_thread* dtio); 90*25039b37SCy Schubert /** stop from stop_flush event loop */ 91*25039b37SCy Schubert static void dtio_stop_flush_exit(struct stop_flush_info* info); 92*25039b37SCy Schubert /** setup a start control message */ 93*25039b37SCy Schubert static int dtio_control_start_send(struct dt_io_thread* dtio); 94*25039b37SCy Schubert #ifdef HAVE_SSL 95*25039b37SCy Schubert /** enable briefly waiting for a read event, for SSL negotiation */ 96*25039b37SCy Schubert static int dtio_enable_brief_read(struct dt_io_thread* dtio); 97*25039b37SCy Schubert /** enable briefly waiting for a write event, for SSL negotiation */ 98*25039b37SCy Schubert static int dtio_enable_brief_write(struct dt_io_thread* dtio); 99*25039b37SCy Schubert #endif 100*25039b37SCy Schubert 101*25039b37SCy Schubert struct dt_msg_queue* 102*25039b37SCy Schubert dt_msg_queue_create(void) 103*25039b37SCy Schubert { 104*25039b37SCy Schubert struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); 105*25039b37SCy Schubert if(!mq) return NULL; 106*25039b37SCy Schubert mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, 107*25039b37SCy Schubert about 1 M should contain 64K messages with some overhead, 108*25039b37SCy Schubert or a whole bunch smaller ones */ 109*25039b37SCy Schubert lock_basic_init(&mq->lock); 110*25039b37SCy Schubert lock_protect(&mq->lock, mq, sizeof(*mq)); 111*25039b37SCy Schubert return mq; 112*25039b37SCy Schubert } 113*25039b37SCy Schubert 114*25039b37SCy Schubert /** clear the message list, caller must hold the lock */ 115*25039b37SCy Schubert static void 116*25039b37SCy Schubert dt_msg_queue_clear(struct dt_msg_queue* mq) 117*25039b37SCy Schubert { 118*25039b37SCy Schubert struct dt_msg_entry* e = mq->first, *next=NULL; 119*25039b37SCy Schubert while(e) { 120*25039b37SCy Schubert next = e->next; 121*25039b37SCy Schubert free(e->buf); 122*25039b37SCy Schubert free(e); 123*25039b37SCy Schubert e = next; 124*25039b37SCy Schubert } 125*25039b37SCy Schubert mq->first = NULL; 126*25039b37SCy Schubert mq->last = NULL; 127*25039b37SCy Schubert mq->cursize = 0; 128*25039b37SCy Schubert } 129*25039b37SCy Schubert 130*25039b37SCy Schubert void 131*25039b37SCy Schubert dt_msg_queue_delete(struct dt_msg_queue* mq) 132*25039b37SCy Schubert { 133*25039b37SCy Schubert if(!mq) return; 134*25039b37SCy Schubert lock_basic_destroy(&mq->lock); 135*25039b37SCy Schubert dt_msg_queue_clear(mq); 136*25039b37SCy Schubert free(mq); 137*25039b37SCy Schubert } 138*25039b37SCy Schubert 139*25039b37SCy Schubert /** make the dtio wake up by sending a wakeup command */ 140*25039b37SCy Schubert static void dtio_wakeup(struct dt_io_thread* dtio) 141*25039b37SCy Schubert { 142*25039b37SCy Schubert uint8_t cmd = DTIO_COMMAND_WAKEUP; 143*25039b37SCy Schubert if(!dtio) return; 144*25039b37SCy Schubert if(!dtio->started) return; 145*25039b37SCy Schubert 146*25039b37SCy Schubert while(1) { 147*25039b37SCy Schubert ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 148*25039b37SCy Schubert if(r == -1) { 149*25039b37SCy Schubert #ifndef USE_WINSOCK 150*25039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 151*25039b37SCy Schubert continue; 152*25039b37SCy Schubert log_err("dnstap io wakeup: write: %s", strerror(errno)); 153*25039b37SCy Schubert #else 154*25039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 155*25039b37SCy Schubert continue; 156*25039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) 157*25039b37SCy Schubert continue; 158*25039b37SCy Schubert log_err("dnstap io stop: write: %s", 159*25039b37SCy Schubert wsa_strerror(WSAGetLastError())); 160*25039b37SCy Schubert #endif 161*25039b37SCy Schubert break; 162*25039b37SCy Schubert } 163*25039b37SCy Schubert break; 164*25039b37SCy Schubert } 165*25039b37SCy Schubert } 166*25039b37SCy Schubert 167*25039b37SCy Schubert void 168*25039b37SCy Schubert dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) 169*25039b37SCy Schubert { 170*25039b37SCy Schubert int wakeup = 0; 171*25039b37SCy Schubert struct dt_msg_entry* entry; 172*25039b37SCy Schubert 173*25039b37SCy Schubert /* check conditions */ 174*25039b37SCy Schubert if(!buf) return; 175*25039b37SCy Schubert if(len == 0) { 176*25039b37SCy Schubert /* it is not possible to log entries with zero length, 177*25039b37SCy Schubert * because the framestream protocol does not carry it. 178*25039b37SCy Schubert * However the protobuf serialization does not create zero 179*25039b37SCy Schubert * length datagrams for dnstap, so this should not happen. */ 180*25039b37SCy Schubert free(buf); 181*25039b37SCy Schubert return; 182*25039b37SCy Schubert } 183*25039b37SCy Schubert if(!mq) { 184*25039b37SCy Schubert free(buf); 185*25039b37SCy Schubert return; 186*25039b37SCy Schubert } 187*25039b37SCy Schubert 188*25039b37SCy Schubert /* allocate memory for queue entry */ 189*25039b37SCy Schubert entry = malloc(sizeof(*entry)); 190*25039b37SCy Schubert if(!entry) { 191*25039b37SCy Schubert log_err("out of memory logging dnstap"); 192*25039b37SCy Schubert free(buf); 193*25039b37SCy Schubert return; 194*25039b37SCy Schubert } 195*25039b37SCy Schubert entry->next = NULL; 196*25039b37SCy Schubert entry->buf = buf; 197*25039b37SCy Schubert entry->len = len; 198*25039b37SCy Schubert 199*25039b37SCy Schubert /* aqcuire lock */ 200*25039b37SCy Schubert lock_basic_lock(&mq->lock); 201*25039b37SCy Schubert /* list was empty, wakeup dtio */ 202*25039b37SCy Schubert if(mq->first == NULL) 203*25039b37SCy Schubert wakeup = 1; 204*25039b37SCy Schubert /* see if it is going to fit */ 205*25039b37SCy Schubert if(mq->cursize + len > mq->maxsize) { 206*25039b37SCy Schubert /* buffer full, or congested. */ 207*25039b37SCy Schubert /* drop */ 208*25039b37SCy Schubert lock_basic_unlock(&mq->lock); 209*25039b37SCy Schubert free(buf); 210*25039b37SCy Schubert free(entry); 211*25039b37SCy Schubert return; 212*25039b37SCy Schubert } 213*25039b37SCy Schubert mq->cursize += len; 214*25039b37SCy Schubert /* append to list */ 215*25039b37SCy Schubert if(mq->last) { 216*25039b37SCy Schubert mq->last->next = entry; 217*25039b37SCy Schubert } else { 218*25039b37SCy Schubert mq->first = entry; 219*25039b37SCy Schubert } 220*25039b37SCy Schubert mq->last = entry; 221*25039b37SCy Schubert /* release lock */ 222*25039b37SCy Schubert lock_basic_unlock(&mq->lock); 223*25039b37SCy Schubert 224*25039b37SCy Schubert if(wakeup) 225*25039b37SCy Schubert dtio_wakeup(mq->dtio); 226*25039b37SCy Schubert } 227*25039b37SCy Schubert 228*25039b37SCy Schubert struct dt_io_thread* dt_io_thread_create(void) 229*25039b37SCy Schubert { 230*25039b37SCy Schubert struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); 231*25039b37SCy Schubert return dtio; 232*25039b37SCy Schubert } 233*25039b37SCy Schubert 234*25039b37SCy Schubert void dt_io_thread_delete(struct dt_io_thread* dtio) 235*25039b37SCy Schubert { 236*25039b37SCy Schubert struct dt_io_list_item* item, *nextitem; 237*25039b37SCy Schubert if(!dtio) return; 238*25039b37SCy Schubert item=dtio->io_list; 239*25039b37SCy Schubert while(item) { 240*25039b37SCy Schubert nextitem = item->next; 241*25039b37SCy Schubert free(item); 242*25039b37SCy Schubert item = nextitem; 243*25039b37SCy Schubert } 244*25039b37SCy Schubert free(dtio->socket_path); 245*25039b37SCy Schubert free(dtio->ip_str); 246*25039b37SCy Schubert free(dtio->tls_server_name); 247*25039b37SCy Schubert free(dtio->client_key_file); 248*25039b37SCy Schubert free(dtio->client_cert_file); 249*25039b37SCy Schubert if(dtio->ssl_ctx) { 250*25039b37SCy Schubert #ifdef HAVE_SSL 251*25039b37SCy Schubert SSL_CTX_free(dtio->ssl_ctx); 252*25039b37SCy Schubert #endif 253*25039b37SCy Schubert } 254*25039b37SCy Schubert free(dtio); 255*25039b37SCy Schubert } 256*25039b37SCy Schubert 257*25039b37SCy Schubert int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg) 258*25039b37SCy Schubert { 259*25039b37SCy Schubert if(!cfg->dnstap) { 260*25039b37SCy Schubert log_warn("cannot setup dnstap because dnstap-enable is no"); 261*25039b37SCy Schubert return 0; 262*25039b37SCy Schubert } 263*25039b37SCy Schubert 264*25039b37SCy Schubert /* what type of connectivity do we have */ 265*25039b37SCy Schubert if(cfg->dnstap_ip && cfg->dnstap_ip[0]) { 266*25039b37SCy Schubert if(cfg->dnstap_tls) 267*25039b37SCy Schubert dtio->upstream_is_tls = 1; 268*25039b37SCy Schubert else dtio->upstream_is_tcp = 1; 269*25039b37SCy Schubert } else { 270*25039b37SCy Schubert dtio->upstream_is_unix = 1; 271*25039b37SCy Schubert } 272*25039b37SCy Schubert dtio->is_bidirectional = cfg->dnstap_bidirectional; 273*25039b37SCy Schubert 274*25039b37SCy Schubert if(dtio->upstream_is_unix) { 275*25039b37SCy Schubert if(!cfg->dnstap_socket_path || 276*25039b37SCy Schubert cfg->dnstap_socket_path[0]==0) { 277*25039b37SCy Schubert log_err("dnstap setup: no dnstap-socket-path for " 278*25039b37SCy Schubert "socket connect"); 279*25039b37SCy Schubert return 0; 280*25039b37SCy Schubert } 281*25039b37SCy Schubert free(dtio->socket_path); 282*25039b37SCy Schubert dtio->socket_path = strdup(cfg->dnstap_socket_path); 283*25039b37SCy Schubert if(!dtio->socket_path) { 284*25039b37SCy Schubert log_err("dnstap setup: malloc failure"); 285*25039b37SCy Schubert return 0; 286*25039b37SCy Schubert } 287*25039b37SCy Schubert } 288*25039b37SCy Schubert 289*25039b37SCy Schubert if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 290*25039b37SCy Schubert if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) { 291*25039b37SCy Schubert log_err("dnstap setup: no dnstap-ip for TCP connect"); 292*25039b37SCy Schubert return 0; 293*25039b37SCy Schubert } 294*25039b37SCy Schubert free(dtio->ip_str); 295*25039b37SCy Schubert dtio->ip_str = strdup(cfg->dnstap_ip); 296*25039b37SCy Schubert if(!dtio->ip_str) { 297*25039b37SCy Schubert log_err("dnstap setup: malloc failure"); 298*25039b37SCy Schubert return 0; 299*25039b37SCy Schubert } 300*25039b37SCy Schubert } 301*25039b37SCy Schubert 302*25039b37SCy Schubert if(dtio->upstream_is_tls) { 303*25039b37SCy Schubert #ifdef HAVE_SSL 304*25039b37SCy Schubert if(cfg->dnstap_tls_server_name && 305*25039b37SCy Schubert cfg->dnstap_tls_server_name[0]) { 306*25039b37SCy Schubert free(dtio->tls_server_name); 307*25039b37SCy Schubert dtio->tls_server_name = strdup( 308*25039b37SCy Schubert cfg->dnstap_tls_server_name); 309*25039b37SCy Schubert if(!dtio->tls_server_name) { 310*25039b37SCy Schubert log_err("dnstap setup: malloc failure"); 311*25039b37SCy Schubert return 0; 312*25039b37SCy Schubert } 313*25039b37SCy Schubert if(!check_auth_name_for_ssl(dtio->tls_server_name)) 314*25039b37SCy Schubert return 0; 315*25039b37SCy Schubert } 316*25039b37SCy Schubert if(cfg->dnstap_tls_client_key_file && 317*25039b37SCy Schubert cfg->dnstap_tls_client_key_file[0]) { 318*25039b37SCy Schubert dtio->use_client_certs = 1; 319*25039b37SCy Schubert free(dtio->client_key_file); 320*25039b37SCy Schubert dtio->client_key_file = strdup( 321*25039b37SCy Schubert cfg->dnstap_tls_client_key_file); 322*25039b37SCy Schubert if(!dtio->client_key_file) { 323*25039b37SCy Schubert log_err("dnstap setup: malloc failure"); 324*25039b37SCy Schubert return 0; 325*25039b37SCy Schubert } 326*25039b37SCy Schubert if(!cfg->dnstap_tls_client_cert_file || 327*25039b37SCy Schubert cfg->dnstap_tls_client_cert_file[0]==0) { 328*25039b37SCy Schubert log_err("dnstap setup: client key " 329*25039b37SCy Schubert "authentication enabled with " 330*25039b37SCy Schubert "dnstap-tls-client-key-file, but " 331*25039b37SCy Schubert "no dnstap-tls-client-cert-file " 332*25039b37SCy Schubert "is given"); 333*25039b37SCy Schubert return 0; 334*25039b37SCy Schubert } 335*25039b37SCy Schubert free(dtio->client_cert_file); 336*25039b37SCy Schubert dtio->client_cert_file = strdup( 337*25039b37SCy Schubert cfg->dnstap_tls_client_cert_file); 338*25039b37SCy Schubert if(!dtio->client_cert_file) { 339*25039b37SCy Schubert log_err("dnstap setup: malloc failure"); 340*25039b37SCy Schubert return 0; 341*25039b37SCy Schubert } 342*25039b37SCy Schubert } else { 343*25039b37SCy Schubert dtio->use_client_certs = 0; 344*25039b37SCy Schubert dtio->client_key_file = NULL; 345*25039b37SCy Schubert dtio->client_cert_file = NULL; 346*25039b37SCy Schubert } 347*25039b37SCy Schubert 348*25039b37SCy Schubert if(cfg->dnstap_tls_cert_bundle) { 349*25039b37SCy Schubert dtio->ssl_ctx = connect_sslctx_create( 350*25039b37SCy Schubert dtio->client_key_file, 351*25039b37SCy Schubert dtio->client_cert_file, 352*25039b37SCy Schubert cfg->dnstap_tls_cert_bundle, 0); 353*25039b37SCy Schubert } else { 354*25039b37SCy Schubert dtio->ssl_ctx = connect_sslctx_create( 355*25039b37SCy Schubert dtio->client_key_file, 356*25039b37SCy Schubert dtio->client_cert_file, 357*25039b37SCy Schubert cfg->tls_cert_bundle, cfg->tls_win_cert); 358*25039b37SCy Schubert } 359*25039b37SCy Schubert if(!dtio->ssl_ctx) { 360*25039b37SCy Schubert log_err("could not setup SSL CTX"); 361*25039b37SCy Schubert return 0; 362*25039b37SCy Schubert } 363*25039b37SCy Schubert dtio->tls_use_sni = cfg->tls_use_sni; 364*25039b37SCy Schubert #endif /* HAVE_SSL */ 365*25039b37SCy Schubert } 366*25039b37SCy Schubert return 1; 367*25039b37SCy Schubert } 368*25039b37SCy Schubert 369*25039b37SCy Schubert int dt_io_thread_register_queue(struct dt_io_thread* dtio, 370*25039b37SCy Schubert struct dt_msg_queue* mq) 371*25039b37SCy Schubert { 372*25039b37SCy Schubert struct dt_io_list_item* item = malloc(sizeof(*item)); 373*25039b37SCy Schubert if(!item) return 0; 374*25039b37SCy Schubert lock_basic_lock(&mq->lock); 375*25039b37SCy Schubert mq->dtio = dtio; 376*25039b37SCy Schubert lock_basic_unlock(&mq->lock); 377*25039b37SCy Schubert item->queue = mq; 378*25039b37SCy Schubert item->next = dtio->io_list; 379*25039b37SCy Schubert dtio->io_list = item; 380*25039b37SCy Schubert dtio->io_list_iter = NULL; 381*25039b37SCy Schubert return 1; 382*25039b37SCy Schubert } 383*25039b37SCy Schubert 384*25039b37SCy Schubert void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, 385*25039b37SCy Schubert struct dt_msg_queue* mq) 386*25039b37SCy Schubert { 387*25039b37SCy Schubert struct dt_io_list_item* item, *prev=NULL; 388*25039b37SCy Schubert if(!dtio) return; 389*25039b37SCy Schubert item = dtio->io_list; 390*25039b37SCy Schubert while(item) { 391*25039b37SCy Schubert if(item->queue == mq) { 392*25039b37SCy Schubert /* found it */ 393*25039b37SCy Schubert if(prev) prev->next = item->next; 394*25039b37SCy Schubert else dtio->io_list = item->next; 395*25039b37SCy Schubert /* the queue itself only registered, not deleted */ 396*25039b37SCy Schubert lock_basic_lock(&item->queue->lock); 397*25039b37SCy Schubert item->queue->dtio = NULL; 398*25039b37SCy Schubert lock_basic_unlock(&item->queue->lock); 399*25039b37SCy Schubert free(item); 400*25039b37SCy Schubert dtio->io_list_iter = NULL; 401*25039b37SCy Schubert return; 402*25039b37SCy Schubert } 403*25039b37SCy Schubert prev = item; 404*25039b37SCy Schubert item = item->next; 405*25039b37SCy Schubert } 406*25039b37SCy Schubert } 407*25039b37SCy Schubert 408*25039b37SCy Schubert /** pick a message from the queue, the routine locks and unlocks, 409*25039b37SCy Schubert * returns true if there is a message */ 410*25039b37SCy Schubert static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf, 411*25039b37SCy Schubert size_t* len) 412*25039b37SCy Schubert { 413*25039b37SCy Schubert lock_basic_lock(&mq->lock); 414*25039b37SCy Schubert if(mq->first) { 415*25039b37SCy Schubert struct dt_msg_entry* entry = mq->first; 416*25039b37SCy Schubert mq->first = entry->next; 417*25039b37SCy Schubert if(!entry->next) mq->last = NULL; 418*25039b37SCy Schubert mq->cursize -= entry->len; 419*25039b37SCy Schubert lock_basic_unlock(&mq->lock); 420*25039b37SCy Schubert 421*25039b37SCy Schubert *buf = entry->buf; 422*25039b37SCy Schubert *len = entry->len; 423*25039b37SCy Schubert free(entry); 424*25039b37SCy Schubert return 1; 425*25039b37SCy Schubert } 426*25039b37SCy Schubert lock_basic_unlock(&mq->lock); 427*25039b37SCy Schubert return 0; 428*25039b37SCy Schubert } 429*25039b37SCy Schubert 430*25039b37SCy Schubert /** find message in queue, false if no message, true if message to send */ 431*25039b37SCy Schubert static int dtio_find_in_queue(struct dt_io_thread* dtio, 432*25039b37SCy Schubert struct dt_msg_queue* mq) 433*25039b37SCy Schubert { 434*25039b37SCy Schubert void* buf=NULL; 435*25039b37SCy Schubert size_t len=0; 436*25039b37SCy Schubert if(dt_msg_queue_pop(mq, &buf, &len)) { 437*25039b37SCy Schubert dtio->cur_msg = buf; 438*25039b37SCy Schubert dtio->cur_msg_len = len; 439*25039b37SCy Schubert dtio->cur_msg_done = 0; 440*25039b37SCy Schubert dtio->cur_msg_len_done = 0; 441*25039b37SCy Schubert return 1; 442*25039b37SCy Schubert } 443*25039b37SCy Schubert return 0; 444*25039b37SCy Schubert } 445*25039b37SCy Schubert 446*25039b37SCy Schubert /** find a new message to write, search message queues, false if none */ 447*25039b37SCy Schubert static int dtio_find_msg(struct dt_io_thread* dtio) 448*25039b37SCy Schubert { 449*25039b37SCy Schubert struct dt_io_list_item *spot, *item; 450*25039b37SCy Schubert 451*25039b37SCy Schubert spot = dtio->io_list_iter; 452*25039b37SCy Schubert /* use the next queue for the next message lookup, 453*25039b37SCy Schubert * if we hit the end(NULL) the NULL restarts the iter at start. */ 454*25039b37SCy Schubert if(spot) 455*25039b37SCy Schubert dtio->io_list_iter = spot->next; 456*25039b37SCy Schubert else if(dtio->io_list) 457*25039b37SCy Schubert dtio->io_list_iter = dtio->io_list->next; 458*25039b37SCy Schubert 459*25039b37SCy Schubert /* scan from spot to end-of-io_list */ 460*25039b37SCy Schubert item = spot; 461*25039b37SCy Schubert while(item) { 462*25039b37SCy Schubert if(dtio_find_in_queue(dtio, item->queue)) 463*25039b37SCy Schubert return 1; 464*25039b37SCy Schubert item = item->next; 465*25039b37SCy Schubert } 466*25039b37SCy Schubert /* scan starting at the start-of-list (to wrap around the end) */ 467*25039b37SCy Schubert item = dtio->io_list; 468*25039b37SCy Schubert while(item) { 469*25039b37SCy Schubert if(dtio_find_in_queue(dtio, item->queue)) 470*25039b37SCy Schubert return 1; 471*25039b37SCy Schubert item = item->next; 472*25039b37SCy Schubert } 473*25039b37SCy Schubert return 0; 474*25039b37SCy Schubert } 475*25039b37SCy Schubert 476*25039b37SCy Schubert /** callback for the dnstap reconnect, to start reconnecting to output */ 477*25039b37SCy Schubert void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd), 478*25039b37SCy Schubert short ATTR_UNUSED(bits), void* arg) 479*25039b37SCy Schubert { 480*25039b37SCy Schubert struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 481*25039b37SCy Schubert dtio->reconnect_is_added = 0; 482*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: reconnect timer"); 483*25039b37SCy Schubert 484*25039b37SCy Schubert dtio_open_output(dtio); 485*25039b37SCy Schubert if(dtio->event) { 486*25039b37SCy Schubert if(!dtio_add_output_event_write(dtio)) 487*25039b37SCy Schubert return; 488*25039b37SCy Schubert /* nothing wrong so far, wait on the output event */ 489*25039b37SCy Schubert return; 490*25039b37SCy Schubert } 491*25039b37SCy Schubert /* exponential backoff and retry on timer */ 492*25039b37SCy Schubert dtio_reconnect_enable(dtio); 493*25039b37SCy Schubert } 494*25039b37SCy Schubert 495*25039b37SCy Schubert /** attempt to reconnect to the output, after a timeout */ 496*25039b37SCy Schubert static void dtio_reconnect_enable(struct dt_io_thread* dtio) 497*25039b37SCy Schubert { 498*25039b37SCy Schubert struct timeval tv; 499*25039b37SCy Schubert int msec; 500*25039b37SCy Schubert if(dtio->want_to_exit) return; 501*25039b37SCy Schubert if(dtio->reconnect_is_added) 502*25039b37SCy Schubert return; /* already done */ 503*25039b37SCy Schubert 504*25039b37SCy Schubert /* exponential backoff, store the value for next timeout */ 505*25039b37SCy Schubert msec = dtio->reconnect_timeout; 506*25039b37SCy Schubert if(msec == 0) { 507*25039b37SCy Schubert dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN; 508*25039b37SCy Schubert } else { 509*25039b37SCy Schubert dtio->reconnect_timeout = msec*2; 510*25039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX) 511*25039b37SCy Schubert dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX; 512*25039b37SCy Schubert } 513*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec", 514*25039b37SCy Schubert msec); 515*25039b37SCy Schubert 516*25039b37SCy Schubert /* setup wait timer */ 517*25039b37SCy Schubert memset(&tv, 0, sizeof(tv)); 518*25039b37SCy Schubert tv.tv_sec = msec/1000; 519*25039b37SCy Schubert tv.tv_usec = (msec%1000)*1000; 520*25039b37SCy Schubert if(ub_timer_add(dtio->reconnect_timer, dtio->event_base, 521*25039b37SCy Schubert &dtio_reconnect_timeout_cb, dtio, &tv) != 0) { 522*25039b37SCy Schubert log_err("dnstap io: could not reconnect ev timer add"); 523*25039b37SCy Schubert return; 524*25039b37SCy Schubert } 525*25039b37SCy Schubert dtio->reconnect_is_added = 1; 526*25039b37SCy Schubert } 527*25039b37SCy Schubert 528*25039b37SCy Schubert /** remove dtio reconnect timer */ 529*25039b37SCy Schubert static void dtio_reconnect_del(struct dt_io_thread* dtio) 530*25039b37SCy Schubert { 531*25039b37SCy Schubert if(!dtio->reconnect_is_added) 532*25039b37SCy Schubert return; 533*25039b37SCy Schubert ub_timer_del(dtio->reconnect_timer); 534*25039b37SCy Schubert dtio->reconnect_is_added = 0; 535*25039b37SCy Schubert } 536*25039b37SCy Schubert 537*25039b37SCy Schubert /** clear the reconnect exponential backoff timer. 538*25039b37SCy Schubert * We have successfully connected so we can try again with short timeouts. */ 539*25039b37SCy Schubert static void dtio_reconnect_clear(struct dt_io_thread* dtio) 540*25039b37SCy Schubert { 541*25039b37SCy Schubert dtio->reconnect_timeout = 0; 542*25039b37SCy Schubert dtio_reconnect_del(dtio); 543*25039b37SCy Schubert } 544*25039b37SCy Schubert 545*25039b37SCy Schubert /** reconnect slowly, because we already know we have to wait for a bit */ 546*25039b37SCy Schubert static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec) 547*25039b37SCy Schubert { 548*25039b37SCy Schubert dtio_reconnect_del(dtio); 549*25039b37SCy Schubert dtio->reconnect_timeout = msec; 550*25039b37SCy Schubert dtio_reconnect_enable(dtio); 551*25039b37SCy Schubert } 552*25039b37SCy Schubert 553*25039b37SCy Schubert /** delete the current message in the dtio, and reset counters */ 554*25039b37SCy Schubert static void dtio_cur_msg_free(struct dt_io_thread* dtio) 555*25039b37SCy Schubert { 556*25039b37SCy Schubert free(dtio->cur_msg); 557*25039b37SCy Schubert dtio->cur_msg = NULL; 558*25039b37SCy Schubert dtio->cur_msg_len = 0; 559*25039b37SCy Schubert dtio->cur_msg_done = 0; 560*25039b37SCy Schubert dtio->cur_msg_len_done = 0; 561*25039b37SCy Schubert } 562*25039b37SCy Schubert 563*25039b37SCy Schubert /** delete the buffer and counters used to read frame */ 564*25039b37SCy Schubert static void dtio_read_frame_free(struct dt_frame_read_buf* rb) 565*25039b37SCy Schubert { 566*25039b37SCy Schubert if(rb->buf) { 567*25039b37SCy Schubert free(rb->buf); 568*25039b37SCy Schubert rb->buf = NULL; 569*25039b37SCy Schubert } 570*25039b37SCy Schubert rb->buf_count = 0; 571*25039b37SCy Schubert rb->buf_cap = 0; 572*25039b37SCy Schubert rb->frame_len = 0; 573*25039b37SCy Schubert rb->frame_len_done = 0; 574*25039b37SCy Schubert rb->control_frame = 0; 575*25039b37SCy Schubert } 576*25039b37SCy Schubert 577*25039b37SCy Schubert /** del the output file descriptor event for listening */ 578*25039b37SCy Schubert static void dtio_del_output_event(struct dt_io_thread* dtio) 579*25039b37SCy Schubert { 580*25039b37SCy Schubert if(!dtio->event_added) 581*25039b37SCy Schubert return; 582*25039b37SCy Schubert ub_event_del(dtio->event); 583*25039b37SCy Schubert dtio->event_added = 0; 584*25039b37SCy Schubert dtio->event_added_is_write = 0; 585*25039b37SCy Schubert } 586*25039b37SCy Schubert 587*25039b37SCy Schubert /** close dtio socket and set it to -1 */ 588*25039b37SCy Schubert static void dtio_close_fd(struct dt_io_thread* dtio) 589*25039b37SCy Schubert { 590*25039b37SCy Schubert #ifndef USE_WINSOCK 591*25039b37SCy Schubert close(dtio->fd); 592*25039b37SCy Schubert #else 593*25039b37SCy Schubert closesocket(dtio->fd); 594*25039b37SCy Schubert #endif 595*25039b37SCy Schubert dtio->fd = -1; 596*25039b37SCy Schubert } 597*25039b37SCy Schubert 598*25039b37SCy Schubert /** close and stop the output file descriptor event */ 599*25039b37SCy Schubert static void dtio_close_output(struct dt_io_thread* dtio) 600*25039b37SCy Schubert { 601*25039b37SCy Schubert if(!dtio->event) 602*25039b37SCy Schubert return; 603*25039b37SCy Schubert ub_event_free(dtio->event); 604*25039b37SCy Schubert dtio->event = NULL; 605*25039b37SCy Schubert if(dtio->ssl) { 606*25039b37SCy Schubert #ifdef HAVE_SSL 607*25039b37SCy Schubert SSL_shutdown(dtio->ssl); 608*25039b37SCy Schubert SSL_free(dtio->ssl); 609*25039b37SCy Schubert dtio->ssl = NULL; 610*25039b37SCy Schubert #endif 611*25039b37SCy Schubert } 612*25039b37SCy Schubert dtio_close_fd(dtio); 613*25039b37SCy Schubert 614*25039b37SCy Schubert /* if there is a (partial) message, discard it 615*25039b37SCy Schubert * we cannot send (the remainder of) it, and a new 616*25039b37SCy Schubert * connection needs to start with a control frame. */ 617*25039b37SCy Schubert if(dtio->cur_msg) { 618*25039b37SCy Schubert dtio_cur_msg_free(dtio); 619*25039b37SCy Schubert } 620*25039b37SCy Schubert 621*25039b37SCy Schubert dtio->ready_frame_sent = 0; 622*25039b37SCy Schubert dtio->accept_frame_received = 0; 623*25039b37SCy Schubert dtio_read_frame_free(&dtio->read_frame); 624*25039b37SCy Schubert 625*25039b37SCy Schubert dtio_reconnect_enable(dtio); 626*25039b37SCy Schubert } 627*25039b37SCy Schubert 628*25039b37SCy Schubert /** check for pending nonblocking connect errors, 629*25039b37SCy Schubert * returns 1 if it is okay. -1 on error (close it), 0 to try later */ 630*25039b37SCy Schubert static int dtio_check_nb_connect(struct dt_io_thread* dtio) 631*25039b37SCy Schubert { 632*25039b37SCy Schubert int error = 0; 633*25039b37SCy Schubert socklen_t len = (socklen_t)sizeof(error); 634*25039b37SCy Schubert if(!dtio->check_nb_connect) 635*25039b37SCy Schubert return 1; /* everything okay */ 636*25039b37SCy Schubert if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error, 637*25039b37SCy Schubert &len) < 0) { 638*25039b37SCy Schubert #ifndef USE_WINSOCK 639*25039b37SCy Schubert error = errno; /* on solaris errno is error */ 640*25039b37SCy Schubert #else 641*25039b37SCy Schubert error = WSAGetLastError(); 642*25039b37SCy Schubert #endif 643*25039b37SCy Schubert } 644*25039b37SCy Schubert #ifndef USE_WINSOCK 645*25039b37SCy Schubert #if defined(EINPROGRESS) && defined(EWOULDBLOCK) 646*25039b37SCy Schubert if(error == EINPROGRESS || error == EWOULDBLOCK) 647*25039b37SCy Schubert return 0; /* try again later */ 648*25039b37SCy Schubert #endif 649*25039b37SCy Schubert #else 650*25039b37SCy Schubert if(error == WSAEINPROGRESS) { 651*25039b37SCy Schubert return 0; /* try again later */ 652*25039b37SCy Schubert } else if(error == WSAEWOULDBLOCK) { 653*25039b37SCy Schubert ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 654*25039b37SCy Schubert dtio->stop_flush_event:dtio->event), UB_EV_WRITE); 655*25039b37SCy Schubert return 0; /* try again later */ 656*25039b37SCy Schubert } 657*25039b37SCy Schubert #endif 658*25039b37SCy Schubert if(error != 0) { 659*25039b37SCy Schubert char* to = dtio->socket_path; 660*25039b37SCy Schubert if(!to) to = dtio->ip_str; 661*25039b37SCy Schubert if(!to) to = ""; 662*25039b37SCy Schubert #ifndef USE_WINSOCK 663*25039b37SCy Schubert log_err("dnstap io: failed to connect to \"%s\": %s", 664*25039b37SCy Schubert to, strerror(error)); 665*25039b37SCy Schubert #else 666*25039b37SCy Schubert log_err("dnstap io: failed to connect to \"%s\": %s", 667*25039b37SCy Schubert to, wsa_strerror(error)); 668*25039b37SCy Schubert #endif 669*25039b37SCy Schubert return -1; /* error, close it */ 670*25039b37SCy Schubert } 671*25039b37SCy Schubert 672*25039b37SCy Schubert if(dtio->ip_str) 673*25039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: connected to %s", 674*25039b37SCy Schubert dtio->ip_str); 675*25039b37SCy Schubert else if(dtio->socket_path) 676*25039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"", 677*25039b37SCy Schubert dtio->socket_path); 678*25039b37SCy Schubert dtio_reconnect_clear(dtio); 679*25039b37SCy Schubert dtio->check_nb_connect = 0; 680*25039b37SCy Schubert return 1; /* everything okay */ 681*25039b37SCy Schubert } 682*25039b37SCy Schubert 683*25039b37SCy Schubert #ifdef HAVE_SSL 684*25039b37SCy Schubert /** write to ssl output 685*25039b37SCy Schubert * returns number of bytes written, 0 if nothing happened, 686*25039b37SCy Schubert * try again later, or -1 if the channel is to be closed. */ 687*25039b37SCy Schubert static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf, 688*25039b37SCy Schubert size_t len) 689*25039b37SCy Schubert { 690*25039b37SCy Schubert int r; 691*25039b37SCy Schubert ERR_clear_error(); 692*25039b37SCy Schubert r = SSL_write(dtio->ssl, buf, len); 693*25039b37SCy Schubert if(r <= 0) { 694*25039b37SCy Schubert int want = SSL_get_error(dtio->ssl, r); 695*25039b37SCy Schubert if(want == SSL_ERROR_ZERO_RETURN) { 696*25039b37SCy Schubert /* closed */ 697*25039b37SCy Schubert return -1; 698*25039b37SCy Schubert } else if(want == SSL_ERROR_WANT_READ) { 699*25039b37SCy Schubert /* we want a brief read event */ 700*25039b37SCy Schubert dtio_enable_brief_read(dtio); 701*25039b37SCy Schubert return 0; 702*25039b37SCy Schubert } else if(want == SSL_ERROR_WANT_WRITE) { 703*25039b37SCy Schubert /* write again later */ 704*25039b37SCy Schubert return 0; 705*25039b37SCy Schubert } else if(want == SSL_ERROR_SYSCALL) { 706*25039b37SCy Schubert #ifdef EPIPE 707*25039b37SCy Schubert if(errno == EPIPE && verbosity < 2) 708*25039b37SCy Schubert return -1; /* silence 'broken pipe' */ 709*25039b37SCy Schubert #endif 710*25039b37SCy Schubert #ifdef ECONNRESET 711*25039b37SCy Schubert if(errno == ECONNRESET && verbosity < 2) 712*25039b37SCy Schubert return -1; /* silence reset by peer */ 713*25039b37SCy Schubert #endif 714*25039b37SCy Schubert if(errno != 0) { 715*25039b37SCy Schubert log_err("dnstap io, SSL_write syscall: %s", 716*25039b37SCy Schubert strerror(errno)); 717*25039b37SCy Schubert } 718*25039b37SCy Schubert return -1; 719*25039b37SCy Schubert } 720*25039b37SCy Schubert log_crypto_err("dnstap io, could not SSL_write"); 721*25039b37SCy Schubert return -1; 722*25039b37SCy Schubert } 723*25039b37SCy Schubert return r; 724*25039b37SCy Schubert } 725*25039b37SCy Schubert #endif /* HAVE_SSL */ 726*25039b37SCy Schubert 727*25039b37SCy Schubert /** write buffer to output. 728*25039b37SCy Schubert * returns number of bytes written, 0 if nothing happened, 729*25039b37SCy Schubert * try again later, or -1 if the channel is to be closed. */ 730*25039b37SCy Schubert static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf, 731*25039b37SCy Schubert size_t len) 732*25039b37SCy Schubert { 733*25039b37SCy Schubert ssize_t ret; 734*25039b37SCy Schubert if(dtio->fd == -1) 735*25039b37SCy Schubert return -1; 736*25039b37SCy Schubert #ifdef HAVE_SSL 737*25039b37SCy Schubert if(dtio->ssl) 738*25039b37SCy Schubert return dtio_write_ssl(dtio, buf, len); 739*25039b37SCy Schubert #endif 740*25039b37SCy Schubert ret = send(dtio->fd, (void*)buf, len, 0); 741*25039b37SCy Schubert if(ret == -1) { 742*25039b37SCy Schubert #ifndef USE_WINSOCK 743*25039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 744*25039b37SCy Schubert return 0; 745*25039b37SCy Schubert log_err("dnstap io: failed send: %s", strerror(errno)); 746*25039b37SCy Schubert #else 747*25039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 748*25039b37SCy Schubert return 0; 749*25039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) { 750*25039b37SCy Schubert ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 751*25039b37SCy Schubert dtio->stop_flush_event:dtio->event), 752*25039b37SCy Schubert UB_EV_WRITE); 753*25039b37SCy Schubert return 0; 754*25039b37SCy Schubert } 755*25039b37SCy Schubert log_err("dnstap io: failed send: %s", 756*25039b37SCy Schubert wsa_strerror(WSAGetLastError())); 757*25039b37SCy Schubert #endif 758*25039b37SCy Schubert return -1; 759*25039b37SCy Schubert } 760*25039b37SCy Schubert return ret; 761*25039b37SCy Schubert } 762*25039b37SCy Schubert 763*25039b37SCy Schubert #ifdef HAVE_WRITEV 764*25039b37SCy Schubert /** write with writev, len and message, in one write, if possible. 765*25039b37SCy Schubert * return true if message is done, false if incomplete */ 766*25039b37SCy Schubert static int dtio_write_with_writev(struct dt_io_thread* dtio) 767*25039b37SCy Schubert { 768*25039b37SCy Schubert uint32_t sendlen = htonl(dtio->cur_msg_len); 769*25039b37SCy Schubert struct iovec iov[2]; 770*25039b37SCy Schubert ssize_t r; 771*25039b37SCy Schubert iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done; 772*25039b37SCy Schubert iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done; 773*25039b37SCy Schubert iov[1].iov_base = dtio->cur_msg; 774*25039b37SCy Schubert iov[1].iov_len = dtio->cur_msg_len; 775*25039b37SCy Schubert log_assert(iov[0].iov_len > 0); 776*25039b37SCy Schubert r = writev(dtio->fd, iov, 2); 777*25039b37SCy Schubert if(r == -1) { 778*25039b37SCy Schubert #ifndef USE_WINSOCK 779*25039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 780*25039b37SCy Schubert return 0; 781*25039b37SCy Schubert log_err("dnstap io: failed writev: %s", strerror(errno)); 782*25039b37SCy Schubert #else 783*25039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 784*25039b37SCy Schubert return 0; 785*25039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) { 786*25039b37SCy Schubert ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 787*25039b37SCy Schubert dtio->stop_flush_event:dtio->event), 788*25039b37SCy Schubert UB_EV_WRITE); 789*25039b37SCy Schubert return 0; 790*25039b37SCy Schubert } 791*25039b37SCy Schubert log_err("dnstap io: failed writev: %s", 792*25039b37SCy Schubert wsa_strerror(WSAGetLastError())); 793*25039b37SCy Schubert #endif 794*25039b37SCy Schubert /* close the channel */ 795*25039b37SCy Schubert dtio_del_output_event(dtio); 796*25039b37SCy Schubert dtio_close_output(dtio); 797*25039b37SCy Schubert return 0; 798*25039b37SCy Schubert } 799*25039b37SCy Schubert /* written r bytes */ 800*25039b37SCy Schubert dtio->cur_msg_len_done += r; 801*25039b37SCy Schubert if(dtio->cur_msg_len_done < 4) 802*25039b37SCy Schubert return 0; 803*25039b37SCy Schubert if(dtio->cur_msg_len_done > 4) { 804*25039b37SCy Schubert dtio->cur_msg_done = dtio->cur_msg_len_done-4; 805*25039b37SCy Schubert dtio->cur_msg_len_done = 4; 806*25039b37SCy Schubert } 807*25039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) 808*25039b37SCy Schubert return 0; 809*25039b37SCy Schubert return 1; 810*25039b37SCy Schubert } 811*25039b37SCy Schubert #endif /* HAVE_WRITEV */ 812*25039b37SCy Schubert 813*25039b37SCy Schubert /** write more of the length, preceding the data frame. 814*25039b37SCy Schubert * return true if message is done, false if incomplete. */ 815*25039b37SCy Schubert static int dtio_write_more_of_len(struct dt_io_thread* dtio) 816*25039b37SCy Schubert { 817*25039b37SCy Schubert uint32_t sendlen; 818*25039b37SCy Schubert int r; 819*25039b37SCy Schubert if(dtio->cur_msg_len_done >= 4) 820*25039b37SCy Schubert return 1; 821*25039b37SCy Schubert #ifdef HAVE_WRITEV 822*25039b37SCy Schubert if(!dtio->ssl) { 823*25039b37SCy Schubert /* we try writev for everything.*/ 824*25039b37SCy Schubert return dtio_write_with_writev(dtio); 825*25039b37SCy Schubert } 826*25039b37SCy Schubert #endif /* HAVE_WRITEV */ 827*25039b37SCy Schubert sendlen = htonl(dtio->cur_msg_len); 828*25039b37SCy Schubert r = dtio_write_buf(dtio, 829*25039b37SCy Schubert ((uint8_t*)&sendlen)+dtio->cur_msg_len_done, 830*25039b37SCy Schubert sizeof(sendlen)-dtio->cur_msg_len_done); 831*25039b37SCy Schubert if(r == -1) { 832*25039b37SCy Schubert /* close the channel */ 833*25039b37SCy Schubert dtio_del_output_event(dtio); 834*25039b37SCy Schubert dtio_close_output(dtio); 835*25039b37SCy Schubert return 0; 836*25039b37SCy Schubert } else if(r == 0) { 837*25039b37SCy Schubert /* try again later */ 838*25039b37SCy Schubert return 0; 839*25039b37SCy Schubert } 840*25039b37SCy Schubert dtio->cur_msg_len_done += r; 841*25039b37SCy Schubert if(dtio->cur_msg_len_done < 4) 842*25039b37SCy Schubert return 0; 843*25039b37SCy Schubert return 1; 844*25039b37SCy Schubert } 845*25039b37SCy Schubert 846*25039b37SCy Schubert /** write more of the data frame. 847*25039b37SCy Schubert * return true if message is done, false if incomplete. */ 848*25039b37SCy Schubert static int dtio_write_more_of_data(struct dt_io_thread* dtio) 849*25039b37SCy Schubert { 850*25039b37SCy Schubert int r; 851*25039b37SCy Schubert if(dtio->cur_msg_done >= dtio->cur_msg_len) 852*25039b37SCy Schubert return 1; 853*25039b37SCy Schubert r = dtio_write_buf(dtio, 854*25039b37SCy Schubert ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done, 855*25039b37SCy Schubert dtio->cur_msg_len - dtio->cur_msg_done); 856*25039b37SCy Schubert if(r == -1) { 857*25039b37SCy Schubert /* close the channel */ 858*25039b37SCy Schubert dtio_del_output_event(dtio); 859*25039b37SCy Schubert dtio_close_output(dtio); 860*25039b37SCy Schubert return 0; 861*25039b37SCy Schubert } else if(r == 0) { 862*25039b37SCy Schubert /* try again later */ 863*25039b37SCy Schubert return 0; 864*25039b37SCy Schubert } 865*25039b37SCy Schubert dtio->cur_msg_done += r; 866*25039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) 867*25039b37SCy Schubert return 0; 868*25039b37SCy Schubert return 1; 869*25039b37SCy Schubert } 870*25039b37SCy Schubert 871*25039b37SCy Schubert /** write more of the current messsage. false if incomplete, true if 872*25039b37SCy Schubert * the message is done */ 873*25039b37SCy Schubert static int dtio_write_more(struct dt_io_thread* dtio) 874*25039b37SCy Schubert { 875*25039b37SCy Schubert if(dtio->cur_msg_len_done < 4) { 876*25039b37SCy Schubert if(!dtio_write_more_of_len(dtio)) 877*25039b37SCy Schubert return 0; 878*25039b37SCy Schubert } 879*25039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) { 880*25039b37SCy Schubert if(!dtio_write_more_of_data(dtio)) 881*25039b37SCy Schubert return 0; 882*25039b37SCy Schubert } 883*25039b37SCy Schubert return 1; 884*25039b37SCy Schubert } 885*25039b37SCy Schubert 886*25039b37SCy Schubert /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed, 887*25039b37SCy Schubert * -1: continue, >0: number of bytes read into buffer */ 888*25039b37SCy Schubert static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) { 889*25039b37SCy Schubert ssize_t r; 890*25039b37SCy Schubert r = recv(dtio->fd, (void*)buf, len, 0); 891*25039b37SCy Schubert if(r == -1) { 892*25039b37SCy Schubert char* to = dtio->socket_path; 893*25039b37SCy Schubert if(!to) to = dtio->ip_str; 894*25039b37SCy Schubert if(!to) to = ""; 895*25039b37SCy Schubert #ifndef USE_WINSOCK 896*25039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 897*25039b37SCy Schubert return -1; /* try later */ 898*25039b37SCy Schubert #else 899*25039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) { 900*25039b37SCy Schubert return -1; /* try later */ 901*25039b37SCy Schubert } else if(WSAGetLastError() == WSAEWOULDBLOCK) { 902*25039b37SCy Schubert ub_winsock_tcp_wouldblock( 903*25039b37SCy Schubert (dtio->stop_flush_event? 904*25039b37SCy Schubert dtio->stop_flush_event:dtio->event), 905*25039b37SCy Schubert UB_EV_READ); 906*25039b37SCy Schubert return -1; /* try later */ 907*25039b37SCy Schubert } 908*25039b37SCy Schubert #endif 909*25039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 910*25039b37SCy Schubert verbosity < 4) 911*25039b37SCy Schubert return 0; /* no log retries on low verbosity */ 912*25039b37SCy Schubert log_err("dnstap io: output closed, recv %s: %s", to, 913*25039b37SCy Schubert strerror(errno)); 914*25039b37SCy Schubert /* and close below */ 915*25039b37SCy Schubert return 0; 916*25039b37SCy Schubert } 917*25039b37SCy Schubert if(r == 0) { 918*25039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 919*25039b37SCy Schubert verbosity < 4) 920*25039b37SCy Schubert return 0; /* no log retries on low verbosity */ 921*25039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: output closed by the other side"); 922*25039b37SCy Schubert /* and close below */ 923*25039b37SCy Schubert return 0; 924*25039b37SCy Schubert } 925*25039b37SCy Schubert /* something was received */ 926*25039b37SCy Schubert return r; 927*25039b37SCy Schubert } 928*25039b37SCy Schubert 929*25039b37SCy Schubert #ifdef HAVE_SSL 930*25039b37SCy Schubert /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed, 931*25039b37SCy Schubert * -1: continue, >0: number of bytes read into buffer */ 932*25039b37SCy Schubert static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len) 933*25039b37SCy Schubert { 934*25039b37SCy Schubert int r; 935*25039b37SCy Schubert ERR_clear_error(); 936*25039b37SCy Schubert r = SSL_read(dtio->ssl, buf, len); 937*25039b37SCy Schubert if(r <= 0) { 938*25039b37SCy Schubert int want = SSL_get_error(dtio->ssl, r); 939*25039b37SCy Schubert if(want == SSL_ERROR_ZERO_RETURN) { 940*25039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 941*25039b37SCy Schubert verbosity < 4) 942*25039b37SCy Schubert return 0; /* no log retries on low verbosity */ 943*25039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: output closed by the " 944*25039b37SCy Schubert "other side"); 945*25039b37SCy Schubert return 0; 946*25039b37SCy Schubert } else if(want == SSL_ERROR_WANT_READ) { 947*25039b37SCy Schubert /* continue later */ 948*25039b37SCy Schubert return -1; 949*25039b37SCy Schubert } else if(want == SSL_ERROR_WANT_WRITE) { 950*25039b37SCy Schubert (void)dtio_enable_brief_write(dtio); 951*25039b37SCy Schubert return -1; 952*25039b37SCy Schubert } else if(want == SSL_ERROR_SYSCALL) { 953*25039b37SCy Schubert #ifdef ECONNRESET 954*25039b37SCy Schubert if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 955*25039b37SCy Schubert errno == ECONNRESET && verbosity < 4) 956*25039b37SCy Schubert return 0; /* silence reset by peer */ 957*25039b37SCy Schubert #endif 958*25039b37SCy Schubert if(errno != 0) 959*25039b37SCy Schubert log_err("SSL_read syscall: %s", 960*25039b37SCy Schubert strerror(errno)); 961*25039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: output closed by the " 962*25039b37SCy Schubert "other side"); 963*25039b37SCy Schubert return 0; 964*25039b37SCy Schubert } 965*25039b37SCy Schubert log_crypto_err("could not SSL_read"); 966*25039b37SCy Schubert verbose(VERB_DETAIL, "dnstap io: output closed by the " 967*25039b37SCy Schubert "other side"); 968*25039b37SCy Schubert return 0; 969*25039b37SCy Schubert } 970*25039b37SCy Schubert return r; 971*25039b37SCy Schubert } 972*25039b37SCy Schubert #endif /* HAVE_SSL */ 973*25039b37SCy Schubert 974*25039b37SCy Schubert /** check if the output fd has been closed, 975*25039b37SCy Schubert * it returns false if the stream is closed. */ 976*25039b37SCy Schubert static int dtio_check_close(struct dt_io_thread* dtio) 977*25039b37SCy Schubert { 978*25039b37SCy Schubert /* we don't want to read any packets, but if there are we can 979*25039b37SCy Schubert * discard the input (ignore it). Ignore of unknown (control) 980*25039b37SCy Schubert * packets is okay for the framestream protocol. And also, the 981*25039b37SCy Schubert * read call can return that the stream has been closed by the 982*25039b37SCy Schubert * other side. */ 983*25039b37SCy Schubert uint8_t buf[1024]; 984*25039b37SCy Schubert int r = -1; 985*25039b37SCy Schubert 986*25039b37SCy Schubert 987*25039b37SCy Schubert if(dtio->fd == -1) return 0; 988*25039b37SCy Schubert 989*25039b37SCy Schubert while(r != 0) { 990*25039b37SCy Schubert /* not interested in buffer content, overwrite */ 991*25039b37SCy Schubert r = receive_bytes(dtio, (void*)buf, sizeof(buf)); 992*25039b37SCy Schubert if(r == -1) 993*25039b37SCy Schubert return 1; 994*25039b37SCy Schubert } 995*25039b37SCy Schubert /* the other end has been closed */ 996*25039b37SCy Schubert /* close the channel */ 997*25039b37SCy Schubert dtio_del_output_event(dtio); 998*25039b37SCy Schubert dtio_close_output(dtio); 999*25039b37SCy Schubert return 0; 1000*25039b37SCy Schubert } 1001*25039b37SCy Schubert 1002*25039b37SCy Schubert /** Read accept frame. Returns -1: continue reading, 0: closed, 1003*25039b37SCy Schubert * 1: valid accept received. */ 1004*25039b37SCy Schubert static int dtio_read_accept_frame(struct dt_io_thread* dtio) 1005*25039b37SCy Schubert { 1006*25039b37SCy Schubert int r; 1007*25039b37SCy Schubert size_t read_frame_done; 1008*25039b37SCy Schubert while(dtio->read_frame.frame_len_done < 4) { 1009*25039b37SCy Schubert #ifdef HAVE_SSL 1010*25039b37SCy Schubert if(dtio->ssl) { 1011*25039b37SCy Schubert r = ssl_read_bytes(dtio, 1012*25039b37SCy Schubert (uint8_t*)&dtio->read_frame.frame_len+ 1013*25039b37SCy Schubert dtio->read_frame.frame_len_done, 1014*25039b37SCy Schubert 4-dtio->read_frame.frame_len_done); 1015*25039b37SCy Schubert } else { 1016*25039b37SCy Schubert #endif 1017*25039b37SCy Schubert r = receive_bytes(dtio, 1018*25039b37SCy Schubert (uint8_t*)&dtio->read_frame.frame_len+ 1019*25039b37SCy Schubert dtio->read_frame.frame_len_done, 1020*25039b37SCy Schubert 4-dtio->read_frame.frame_len_done); 1021*25039b37SCy Schubert #ifdef HAVE_SSL 1022*25039b37SCy Schubert } 1023*25039b37SCy Schubert #endif 1024*25039b37SCy Schubert if(r == -1) 1025*25039b37SCy Schubert return -1; /* continue reading */ 1026*25039b37SCy Schubert if(r == 0) { 1027*25039b37SCy Schubert /* connection closed */ 1028*25039b37SCy Schubert goto close_connection; 1029*25039b37SCy Schubert } 1030*25039b37SCy Schubert dtio->read_frame.frame_len_done += r; 1031*25039b37SCy Schubert if(dtio->read_frame.frame_len_done < 4) 1032*25039b37SCy Schubert return -1; /* continue reading */ 1033*25039b37SCy Schubert 1034*25039b37SCy Schubert if(dtio->read_frame.frame_len == 0) { 1035*25039b37SCy Schubert dtio->read_frame.frame_len_done = 0; 1036*25039b37SCy Schubert dtio->read_frame.control_frame = 1; 1037*25039b37SCy Schubert continue; 1038*25039b37SCy Schubert } 1039*25039b37SCy Schubert dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len); 1040*25039b37SCy Schubert if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) { 1041*25039b37SCy Schubert verbose(VERB_OPS, "dnstap: received frame exceeds max " 1042*25039b37SCy Schubert "length of %d bytes, closing connection", 1043*25039b37SCy Schubert DTIO_RECV_FRAME_MAX_LEN); 1044*25039b37SCy Schubert goto close_connection; 1045*25039b37SCy Schubert } 1046*25039b37SCy Schubert dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len); 1047*25039b37SCy Schubert dtio->read_frame.buf_cap = dtio->read_frame.frame_len; 1048*25039b37SCy Schubert if(!dtio->read_frame.buf) { 1049*25039b37SCy Schubert log_err("dnstap io: out of memory (creating read " 1050*25039b37SCy Schubert "buffer)"); 1051*25039b37SCy Schubert goto close_connection; 1052*25039b37SCy Schubert } 1053*25039b37SCy Schubert } 1054*25039b37SCy Schubert if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) { 1055*25039b37SCy Schubert #ifdef HAVE_SSL 1056*25039b37SCy Schubert if(dtio->ssl) { 1057*25039b37SCy Schubert r = ssl_read_bytes(dtio, dtio->read_frame.buf+ 1058*25039b37SCy Schubert dtio->read_frame.buf_count, 1059*25039b37SCy Schubert dtio->read_frame.buf_cap- 1060*25039b37SCy Schubert dtio->read_frame.buf_count); 1061*25039b37SCy Schubert } else { 1062*25039b37SCy Schubert #endif 1063*25039b37SCy Schubert r = receive_bytes(dtio, dtio->read_frame.buf+ 1064*25039b37SCy Schubert dtio->read_frame.buf_count, 1065*25039b37SCy Schubert dtio->read_frame.buf_cap- 1066*25039b37SCy Schubert dtio->read_frame.buf_count); 1067*25039b37SCy Schubert #ifdef HAVE_SSL 1068*25039b37SCy Schubert } 1069*25039b37SCy Schubert #endif 1070*25039b37SCy Schubert if(r == -1) 1071*25039b37SCy Schubert return -1; /* continue reading */ 1072*25039b37SCy Schubert if(r == 0) { 1073*25039b37SCy Schubert /* connection closed */ 1074*25039b37SCy Schubert goto close_connection; 1075*25039b37SCy Schubert } 1076*25039b37SCy Schubert dtio->read_frame.buf_count += r; 1077*25039b37SCy Schubert if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) 1078*25039b37SCy Schubert return -1; /* continue reading */ 1079*25039b37SCy Schubert } 1080*25039b37SCy Schubert 1081*25039b37SCy Schubert /* Complete frame received, check if this is a valid ACCEPT control 1082*25039b37SCy Schubert * frame. */ 1083*25039b37SCy Schubert if(dtio->read_frame.frame_len < 4) { 1084*25039b37SCy Schubert verbose(VERB_OPS, "dnstap: invalid data received"); 1085*25039b37SCy Schubert goto close_connection; 1086*25039b37SCy Schubert } 1087*25039b37SCy Schubert if(sldns_read_uint32(dtio->read_frame.buf) != 1088*25039b37SCy Schubert FSTRM_CONTROL_FRAME_ACCEPT) { 1089*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap: invalid control type received, " 1090*25039b37SCy Schubert "ignored"); 1091*25039b37SCy Schubert dtio->ready_frame_sent = 0; 1092*25039b37SCy Schubert dtio->accept_frame_received = 0; 1093*25039b37SCy Schubert dtio_read_frame_free(&dtio->read_frame); 1094*25039b37SCy Schubert return -1; 1095*25039b37SCy Schubert } 1096*25039b37SCy Schubert read_frame_done = 4; /* control frame type */ 1097*25039b37SCy Schubert 1098*25039b37SCy Schubert /* Iterate over control fields, ignore unknown types. 1099*25039b37SCy Schubert * Need to be able to read at least 8 bytes (control field type + 1100*25039b37SCy Schubert * length). */ 1101*25039b37SCy Schubert while(read_frame_done+8 < dtio->read_frame.frame_len) { 1102*25039b37SCy Schubert uint32_t type = sldns_read_uint32(dtio->read_frame.buf + 1103*25039b37SCy Schubert read_frame_done); 1104*25039b37SCy Schubert uint32_t len = sldns_read_uint32(dtio->read_frame.buf + 1105*25039b37SCy Schubert read_frame_done + 4); 1106*25039b37SCy Schubert if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) { 1107*25039b37SCy Schubert if(len == strlen(DNSTAP_CONTENT_TYPE) && 1108*25039b37SCy Schubert read_frame_done+8+len <= 1109*25039b37SCy Schubert dtio->read_frame.frame_len && 1110*25039b37SCy Schubert memcmp(dtio->read_frame.buf + read_frame_done + 1111*25039b37SCy Schubert + 8, DNSTAP_CONTENT_TYPE, len) == 0) { 1112*25039b37SCy Schubert if(!dtio_control_start_send(dtio)) { 1113*25039b37SCy Schubert verbose(VERB_OPS, "dnstap io: out of " 1114*25039b37SCy Schubert "memory while sending START frame"); 1115*25039b37SCy Schubert goto close_connection; 1116*25039b37SCy Schubert } 1117*25039b37SCy Schubert dtio->accept_frame_received = 1; 1118*25039b37SCy Schubert return 1; 1119*25039b37SCy Schubert } else { 1120*25039b37SCy Schubert /* unknow content type */ 1121*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap: ACCEPT frame " 1122*25039b37SCy Schubert "contains unknown content type, " 1123*25039b37SCy Schubert "closing connection"); 1124*25039b37SCy Schubert goto close_connection; 1125*25039b37SCy Schubert } 1126*25039b37SCy Schubert } 1127*25039b37SCy Schubert /* unknown option, try next */ 1128*25039b37SCy Schubert read_frame_done += 8+len; 1129*25039b37SCy Schubert } 1130*25039b37SCy Schubert 1131*25039b37SCy Schubert 1132*25039b37SCy Schubert close_connection: 1133*25039b37SCy Schubert dtio_del_output_event(dtio); 1134*25039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1135*25039b37SCy Schubert dtio_close_output(dtio); 1136*25039b37SCy Schubert return 0; 1137*25039b37SCy Schubert } 1138*25039b37SCy Schubert 1139*25039b37SCy Schubert /** add the output file descriptor event for listening, read only */ 1140*25039b37SCy Schubert static int dtio_add_output_event_read(struct dt_io_thread* dtio) 1141*25039b37SCy Schubert { 1142*25039b37SCy Schubert if(!dtio->event) 1143*25039b37SCy Schubert return 0; 1144*25039b37SCy Schubert if(dtio->event_added && !dtio->event_added_is_write) 1145*25039b37SCy Schubert return 1; 1146*25039b37SCy Schubert /* we have to (re-)register the event */ 1147*25039b37SCy Schubert if(dtio->event_added) 1148*25039b37SCy Schubert ub_event_del(dtio->event); 1149*25039b37SCy Schubert ub_event_del_bits(dtio->event, UB_EV_WRITE); 1150*25039b37SCy Schubert if(ub_event_add(dtio->event, NULL) != 0) { 1151*25039b37SCy Schubert log_err("dnstap io: out of memory (adding event)"); 1152*25039b37SCy Schubert dtio->event_added = 0; 1153*25039b37SCy Schubert dtio->event_added_is_write = 0; 1154*25039b37SCy Schubert /* close output and start reattempts to open it */ 1155*25039b37SCy Schubert dtio_close_output(dtio); 1156*25039b37SCy Schubert return 0; 1157*25039b37SCy Schubert } 1158*25039b37SCy Schubert dtio->event_added = 1; 1159*25039b37SCy Schubert dtio->event_added_is_write = 0; 1160*25039b37SCy Schubert return 1; 1161*25039b37SCy Schubert } 1162*25039b37SCy Schubert 1163*25039b37SCy Schubert /** add the output file descriptor event for listening, read and write */ 1164*25039b37SCy Schubert static int dtio_add_output_event_write(struct dt_io_thread* dtio) 1165*25039b37SCy Schubert { 1166*25039b37SCy Schubert if(!dtio->event) 1167*25039b37SCy Schubert return 0; 1168*25039b37SCy Schubert if(dtio->event_added && dtio->event_added_is_write) 1169*25039b37SCy Schubert return 1; 1170*25039b37SCy Schubert /* we have to (re-)register the event */ 1171*25039b37SCy Schubert if(dtio->event_added) 1172*25039b37SCy Schubert ub_event_del(dtio->event); 1173*25039b37SCy Schubert ub_event_add_bits(dtio->event, UB_EV_WRITE); 1174*25039b37SCy Schubert if(ub_event_add(dtio->event, NULL) != 0) { 1175*25039b37SCy Schubert log_err("dnstap io: out of memory (adding event)"); 1176*25039b37SCy Schubert dtio->event_added = 0; 1177*25039b37SCy Schubert dtio->event_added_is_write = 0; 1178*25039b37SCy Schubert /* close output and start reattempts to open it */ 1179*25039b37SCy Schubert dtio_close_output(dtio); 1180*25039b37SCy Schubert return 0; 1181*25039b37SCy Schubert } 1182*25039b37SCy Schubert dtio->event_added = 1; 1183*25039b37SCy Schubert dtio->event_added_is_write = 1; 1184*25039b37SCy Schubert return 1; 1185*25039b37SCy Schubert } 1186*25039b37SCy Schubert 1187*25039b37SCy Schubert /** put the dtio thread to sleep */ 1188*25039b37SCy Schubert static void dtio_sleep(struct dt_io_thread* dtio) 1189*25039b37SCy Schubert { 1190*25039b37SCy Schubert /* unregister the event polling for write, because there is 1191*25039b37SCy Schubert * nothing to be written */ 1192*25039b37SCy Schubert (void)dtio_add_output_event_read(dtio); 1193*25039b37SCy Schubert } 1194*25039b37SCy Schubert 1195*25039b37SCy Schubert #ifdef HAVE_SSL 1196*25039b37SCy Schubert /** enable the brief read condition */ 1197*25039b37SCy Schubert static int dtio_enable_brief_read(struct dt_io_thread* dtio) 1198*25039b37SCy Schubert { 1199*25039b37SCy Schubert dtio->ssl_brief_read = 1; 1200*25039b37SCy Schubert if(dtio->stop_flush_event) { 1201*25039b37SCy Schubert ub_event_del(dtio->stop_flush_event); 1202*25039b37SCy Schubert ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE); 1203*25039b37SCy Schubert if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 1204*25039b37SCy Schubert log_err("dnstap io, stop flush, could not ub_event_add"); 1205*25039b37SCy Schubert return 0; 1206*25039b37SCy Schubert } 1207*25039b37SCy Schubert return 1; 1208*25039b37SCy Schubert } 1209*25039b37SCy Schubert return dtio_add_output_event_read(dtio); 1210*25039b37SCy Schubert } 1211*25039b37SCy Schubert #endif /* HAVE_SSL */ 1212*25039b37SCy Schubert 1213*25039b37SCy Schubert #ifdef HAVE_SSL 1214*25039b37SCy Schubert /** disable the brief read condition */ 1215*25039b37SCy Schubert static int dtio_disable_brief_read(struct dt_io_thread* dtio) 1216*25039b37SCy Schubert { 1217*25039b37SCy Schubert dtio->ssl_brief_read = 0; 1218*25039b37SCy Schubert if(dtio->stop_flush_event) { 1219*25039b37SCy Schubert ub_event_del(dtio->stop_flush_event); 1220*25039b37SCy Schubert ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE); 1221*25039b37SCy Schubert if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 1222*25039b37SCy Schubert log_err("dnstap io, stop flush, could not ub_event_add"); 1223*25039b37SCy Schubert return 0; 1224*25039b37SCy Schubert } 1225*25039b37SCy Schubert return 1; 1226*25039b37SCy Schubert } 1227*25039b37SCy Schubert return dtio_add_output_event_write(dtio); 1228*25039b37SCy Schubert } 1229*25039b37SCy Schubert #endif /* HAVE_SSL */ 1230*25039b37SCy Schubert 1231*25039b37SCy Schubert #ifdef HAVE_SSL 1232*25039b37SCy Schubert /** enable the brief write condition */ 1233*25039b37SCy Schubert static int dtio_enable_brief_write(struct dt_io_thread* dtio) 1234*25039b37SCy Schubert { 1235*25039b37SCy Schubert dtio->ssl_brief_write = 1; 1236*25039b37SCy Schubert return dtio_add_output_event_write(dtio); 1237*25039b37SCy Schubert } 1238*25039b37SCy Schubert #endif /* HAVE_SSL */ 1239*25039b37SCy Schubert 1240*25039b37SCy Schubert #ifdef HAVE_SSL 1241*25039b37SCy Schubert /** disable the brief write condition */ 1242*25039b37SCy Schubert static int dtio_disable_brief_write(struct dt_io_thread* dtio) 1243*25039b37SCy Schubert { 1244*25039b37SCy Schubert dtio->ssl_brief_write = 0; 1245*25039b37SCy Schubert return dtio_add_output_event_read(dtio); 1246*25039b37SCy Schubert } 1247*25039b37SCy Schubert #endif /* HAVE_SSL */ 1248*25039b37SCy Schubert 1249*25039b37SCy Schubert #ifdef HAVE_SSL 1250*25039b37SCy Schubert /** check peer verification after ssl handshake connection, false if closed*/ 1251*25039b37SCy Schubert static int dtio_ssl_check_peer(struct dt_io_thread* dtio) 1252*25039b37SCy Schubert { 1253*25039b37SCy Schubert if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) { 1254*25039b37SCy Schubert /* verification */ 1255*25039b37SCy Schubert if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) { 1256*25039b37SCy Schubert X509* x = SSL_get_peer_certificate(dtio->ssl); 1257*25039b37SCy Schubert if(!x) { 1258*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io, %s, SSL " 1259*25039b37SCy Schubert "connection failed no certificate", 1260*25039b37SCy Schubert dtio->ip_str); 1261*25039b37SCy Schubert return 0; 1262*25039b37SCy Schubert } 1263*25039b37SCy Schubert log_cert(VERB_ALGO, "dnstap io, peer certificate", 1264*25039b37SCy Schubert x); 1265*25039b37SCy Schubert #ifdef HAVE_SSL_GET0_PEERNAME 1266*25039b37SCy Schubert if(SSL_get0_peername(dtio->ssl)) { 1267*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io, %s, SSL " 1268*25039b37SCy Schubert "connection to %s authenticated", 1269*25039b37SCy Schubert dtio->ip_str, 1270*25039b37SCy Schubert SSL_get0_peername(dtio->ssl)); 1271*25039b37SCy Schubert } else { 1272*25039b37SCy Schubert #endif 1273*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io, %s, SSL " 1274*25039b37SCy Schubert "connection authenticated", 1275*25039b37SCy Schubert dtio->ip_str); 1276*25039b37SCy Schubert #ifdef HAVE_SSL_GET0_PEERNAME 1277*25039b37SCy Schubert } 1278*25039b37SCy Schubert #endif 1279*25039b37SCy Schubert X509_free(x); 1280*25039b37SCy Schubert } else { 1281*25039b37SCy Schubert X509* x = SSL_get_peer_certificate(dtio->ssl); 1282*25039b37SCy Schubert if(x) { 1283*25039b37SCy Schubert log_cert(VERB_ALGO, "dnstap io, peer " 1284*25039b37SCy Schubert "certificate", x); 1285*25039b37SCy Schubert X509_free(x); 1286*25039b37SCy Schubert } 1287*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io, %s, SSL connection " 1288*25039b37SCy Schubert "failed: failed to authenticate", 1289*25039b37SCy Schubert dtio->ip_str); 1290*25039b37SCy Schubert return 0; 1291*25039b37SCy Schubert } 1292*25039b37SCy Schubert } else { 1293*25039b37SCy Schubert /* unauthenticated, the verify peer flag was not set 1294*25039b37SCy Schubert * in ssl when the ssl object was created from ssl_ctx */ 1295*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io, %s, SSL connection", 1296*25039b37SCy Schubert dtio->ip_str); 1297*25039b37SCy Schubert } 1298*25039b37SCy Schubert return 1; 1299*25039b37SCy Schubert } 1300*25039b37SCy Schubert #endif /* HAVE_SSL */ 1301*25039b37SCy Schubert 1302*25039b37SCy Schubert #ifdef HAVE_SSL 1303*25039b37SCy Schubert /** perform ssl handshake, returns 1 if okay, 0 to stop */ 1304*25039b37SCy Schubert static int dtio_ssl_handshake(struct dt_io_thread* dtio, 1305*25039b37SCy Schubert struct stop_flush_info* info) 1306*25039b37SCy Schubert { 1307*25039b37SCy Schubert int r; 1308*25039b37SCy Schubert if(dtio->ssl_brief_read) { 1309*25039b37SCy Schubert /* assume the brief read condition is satisfied, 1310*25039b37SCy Schubert * if we need more or again, we can set it again */ 1311*25039b37SCy Schubert if(!dtio_disable_brief_read(dtio)) { 1312*25039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 1313*25039b37SCy Schubert return 0; 1314*25039b37SCy Schubert } 1315*25039b37SCy Schubert } 1316*25039b37SCy Schubert if(dtio->ssl_handshake_done) 1317*25039b37SCy Schubert return 1; 1318*25039b37SCy Schubert 1319*25039b37SCy Schubert ERR_clear_error(); 1320*25039b37SCy Schubert r = SSL_do_handshake(dtio->ssl); 1321*25039b37SCy Schubert if(r != 1) { 1322*25039b37SCy Schubert int want = SSL_get_error(dtio->ssl, r); 1323*25039b37SCy Schubert if(want == SSL_ERROR_WANT_READ) { 1324*25039b37SCy Schubert /* we want to read on the connection */ 1325*25039b37SCy Schubert if(!dtio_enable_brief_read(dtio)) { 1326*25039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 1327*25039b37SCy Schubert return 0; 1328*25039b37SCy Schubert } 1329*25039b37SCy Schubert return 0; 1330*25039b37SCy Schubert } else if(want == SSL_ERROR_WANT_WRITE) { 1331*25039b37SCy Schubert /* we want to write on the connection */ 1332*25039b37SCy Schubert return 0; 1333*25039b37SCy Schubert } else if(r == 0) { 1334*25039b37SCy Schubert /* closed */ 1335*25039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 1336*25039b37SCy Schubert dtio_del_output_event(dtio); 1337*25039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1338*25039b37SCy Schubert dtio_close_output(dtio); 1339*25039b37SCy Schubert return 0; 1340*25039b37SCy Schubert } else if(want == SSL_ERROR_SYSCALL) { 1341*25039b37SCy Schubert /* SYSCALL and errno==0 means closed uncleanly */ 1342*25039b37SCy Schubert int silent = 0; 1343*25039b37SCy Schubert #ifdef EPIPE 1344*25039b37SCy Schubert if(errno == EPIPE && verbosity < 2) 1345*25039b37SCy Schubert silent = 1; /* silence 'broken pipe' */ 1346*25039b37SCy Schubert #endif 1347*25039b37SCy Schubert #ifdef ECONNRESET 1348*25039b37SCy Schubert if(errno == ECONNRESET && verbosity < 2) 1349*25039b37SCy Schubert silent = 1; /* silence reset by peer */ 1350*25039b37SCy Schubert #endif 1351*25039b37SCy Schubert if(errno == 0) 1352*25039b37SCy Schubert silent = 1; 1353*25039b37SCy Schubert if(!silent) 1354*25039b37SCy Schubert log_err("dnstap io, SSL_handshake syscall: %s", 1355*25039b37SCy Schubert strerror(errno)); 1356*25039b37SCy Schubert /* closed */ 1357*25039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 1358*25039b37SCy Schubert dtio_del_output_event(dtio); 1359*25039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1360*25039b37SCy Schubert dtio_close_output(dtio); 1361*25039b37SCy Schubert return 0; 1362*25039b37SCy Schubert } else { 1363*25039b37SCy Schubert unsigned long err = ERR_get_error(); 1364*25039b37SCy Schubert if(!squelch_err_ssl_handshake(err)) { 1365*25039b37SCy Schubert log_crypto_err_code("dnstap io, ssl handshake failed", 1366*25039b37SCy Schubert err); 1367*25039b37SCy Schubert verbose(VERB_OPS, "dnstap io, ssl handshake failed " 1368*25039b37SCy Schubert "from %s", dtio->ip_str); 1369*25039b37SCy Schubert } 1370*25039b37SCy Schubert /* closed */ 1371*25039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 1372*25039b37SCy Schubert dtio_del_output_event(dtio); 1373*25039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1374*25039b37SCy Schubert dtio_close_output(dtio); 1375*25039b37SCy Schubert return 0; 1376*25039b37SCy Schubert } 1377*25039b37SCy Schubert 1378*25039b37SCy Schubert } 1379*25039b37SCy Schubert /* check peer verification */ 1380*25039b37SCy Schubert dtio->ssl_handshake_done = 1; 1381*25039b37SCy Schubert 1382*25039b37SCy Schubert if(!dtio_ssl_check_peer(dtio)) { 1383*25039b37SCy Schubert /* closed */ 1384*25039b37SCy Schubert if(info) dtio_stop_flush_exit(info); 1385*25039b37SCy Schubert dtio_del_output_event(dtio); 1386*25039b37SCy Schubert dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1387*25039b37SCy Schubert dtio_close_output(dtio); 1388*25039b37SCy Schubert return 0; 1389*25039b37SCy Schubert } 1390*25039b37SCy Schubert return 1; 1391*25039b37SCy Schubert } 1392*25039b37SCy Schubert #endif /* HAVE_SSL */ 1393*25039b37SCy Schubert 1394*25039b37SCy Schubert /** callback for the dnstap events, to write to the output */ 1395*25039b37SCy Schubert void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg) 1396*25039b37SCy Schubert { 1397*25039b37SCy Schubert struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 1398*25039b37SCy Schubert int i; 1399*25039b37SCy Schubert 1400*25039b37SCy Schubert if(dtio->check_nb_connect) { 1401*25039b37SCy Schubert int connect_err = dtio_check_nb_connect(dtio); 1402*25039b37SCy Schubert if(connect_err == -1) { 1403*25039b37SCy Schubert /* close the channel */ 1404*25039b37SCy Schubert dtio_del_output_event(dtio); 1405*25039b37SCy Schubert dtio_close_output(dtio); 1406*25039b37SCy Schubert return; 1407*25039b37SCy Schubert } else if(connect_err == 0) { 1408*25039b37SCy Schubert /* try again later */ 1409*25039b37SCy Schubert return; 1410*25039b37SCy Schubert } 1411*25039b37SCy Schubert /* nonblocking connect check passed, continue */ 1412*25039b37SCy Schubert } 1413*25039b37SCy Schubert 1414*25039b37SCy Schubert #ifdef HAVE_SSL 1415*25039b37SCy Schubert if(dtio->ssl && 1416*25039b37SCy Schubert (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 1417*25039b37SCy Schubert if(!dtio_ssl_handshake(dtio, NULL)) 1418*25039b37SCy Schubert return; 1419*25039b37SCy Schubert } 1420*25039b37SCy Schubert #endif 1421*25039b37SCy Schubert 1422*25039b37SCy Schubert if((bits&UB_EV_READ || dtio->ssl_brief_write)) { 1423*25039b37SCy Schubert if(dtio->ssl_brief_write) 1424*25039b37SCy Schubert (void)dtio_disable_brief_write(dtio); 1425*25039b37SCy Schubert if(dtio->ready_frame_sent && !dtio->accept_frame_received) { 1426*25039b37SCy Schubert if(dtio_read_accept_frame(dtio) <= 0) 1427*25039b37SCy Schubert return; 1428*25039b37SCy Schubert } else if(!dtio_check_close(dtio)) 1429*25039b37SCy Schubert return; 1430*25039b37SCy Schubert } 1431*25039b37SCy Schubert 1432*25039b37SCy Schubert /* loop to process a number of messages. This improves throughput, 1433*25039b37SCy Schubert * because selecting on write-event if not needed for busy messages 1434*25039b37SCy Schubert * (dnstap log) generation and if they need to all be written back. 1435*25039b37SCy Schubert * The write event is usually not blocked up. But not forever, 1436*25039b37SCy Schubert * because the event loop needs to stay responsive for other events. 1437*25039b37SCy Schubert * If there are no (more) messages, or if the output buffers get 1438*25039b37SCy Schubert * full, it returns out of the loop. */ 1439*25039b37SCy Schubert for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) { 1440*25039b37SCy Schubert /* see if there are messages that need writing */ 1441*25039b37SCy Schubert if(!dtio->cur_msg) { 1442*25039b37SCy Schubert if(!dtio_find_msg(dtio)) { 1443*25039b37SCy Schubert if(i == 0) { 1444*25039b37SCy Schubert /* no messages on the first iteration, 1445*25039b37SCy Schubert * the queues are all empty */ 1446*25039b37SCy Schubert dtio_sleep(dtio); 1447*25039b37SCy Schubert } 1448*25039b37SCy Schubert return; /* nothing to do */ 1449*25039b37SCy Schubert } 1450*25039b37SCy Schubert } 1451*25039b37SCy Schubert 1452*25039b37SCy Schubert /* write it */ 1453*25039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) { 1454*25039b37SCy Schubert if(!dtio_write_more(dtio)) 1455*25039b37SCy Schubert return; 1456*25039b37SCy Schubert } 1457*25039b37SCy Schubert 1458*25039b37SCy Schubert /* done with the current message */ 1459*25039b37SCy Schubert dtio_cur_msg_free(dtio); 1460*25039b37SCy Schubert 1461*25039b37SCy Schubert /* If this is a bidirectional stream the first message will be 1462*25039b37SCy Schubert * the READY control frame. We can only continue writing after 1463*25039b37SCy Schubert * receiving an ACCEPT control frame. */ 1464*25039b37SCy Schubert if(dtio->is_bidirectional && !dtio->ready_frame_sent) { 1465*25039b37SCy Schubert dtio->ready_frame_sent = 1; 1466*25039b37SCy Schubert (void)dtio_add_output_event_read(dtio); 1467*25039b37SCy Schubert break; 1468*25039b37SCy Schubert } 1469*25039b37SCy Schubert } 1470*25039b37SCy Schubert } 1471*25039b37SCy Schubert 1472*25039b37SCy Schubert /** callback for the dnstap commandpipe, to stop the dnstap IO */ 1473*25039b37SCy Schubert void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) 1474*25039b37SCy Schubert { 1475*25039b37SCy Schubert struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 1476*25039b37SCy Schubert uint8_t cmd; 1477*25039b37SCy Schubert ssize_t r; 1478*25039b37SCy Schubert if(dtio->want_to_exit) 1479*25039b37SCy Schubert return; 1480*25039b37SCy Schubert r = read(fd, &cmd, sizeof(cmd)); 1481*25039b37SCy Schubert if(r == -1) { 1482*25039b37SCy Schubert #ifndef USE_WINSOCK 1483*25039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 1484*25039b37SCy Schubert return; /* ignore this */ 1485*25039b37SCy Schubert log_err("dnstap io: failed to read: %s", strerror(errno)); 1486*25039b37SCy Schubert #else 1487*25039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 1488*25039b37SCy Schubert return; 1489*25039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) 1490*25039b37SCy Schubert return; 1491*25039b37SCy Schubert log_err("dnstap io: failed to read: %s", 1492*25039b37SCy Schubert wsa_strerror(WSAGetLastError())); 1493*25039b37SCy Schubert #endif 1494*25039b37SCy Schubert /* and then fall through to quit the thread */ 1495*25039b37SCy Schubert } else if(r == 0) { 1496*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel closed"); 1497*25039b37SCy Schubert } else if(r == 1 && cmd == DTIO_COMMAND_STOP) { 1498*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); 1499*25039b37SCy Schubert } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) { 1500*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup"); 1501*25039b37SCy Schubert 1502*25039b37SCy Schubert if(dtio->is_bidirectional && !dtio->accept_frame_received) { 1503*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, " 1504*25039b37SCy Schubert "waiting for ACCEPT control frame"); 1505*25039b37SCy Schubert return; 1506*25039b37SCy Schubert } 1507*25039b37SCy Schubert 1508*25039b37SCy Schubert /* reregister event */ 1509*25039b37SCy Schubert if(!dtio_add_output_event_write(dtio)) 1510*25039b37SCy Schubert return; 1511*25039b37SCy Schubert return; 1512*25039b37SCy Schubert } else if(r == 1) { 1513*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: cmd channel unknown command"); 1514*25039b37SCy Schubert } 1515*25039b37SCy Schubert dtio->want_to_exit = 1; 1516*25039b37SCy Schubert if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base) 1517*25039b37SCy Schubert != 0) { 1518*25039b37SCy Schubert log_err("dnstap io: could not loopexit"); 1519*25039b37SCy Schubert } 1520*25039b37SCy Schubert } 1521*25039b37SCy Schubert 1522*25039b37SCy Schubert #ifndef THREADS_DISABLED 1523*25039b37SCy Schubert /** setup the event base for the dnstap io thread */ 1524*25039b37SCy Schubert static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs, 1525*25039b37SCy Schubert struct timeval* now) 1526*25039b37SCy Schubert { 1527*25039b37SCy Schubert memset(now, 0, sizeof(*now)); 1528*25039b37SCy Schubert dtio->event_base = ub_default_event_base(0, secs, now); 1529*25039b37SCy Schubert if(!dtio->event_base) { 1530*25039b37SCy Schubert fatal_exit("dnstap io: could not create event_base"); 1531*25039b37SCy Schubert } 1532*25039b37SCy Schubert } 1533*25039b37SCy Schubert #endif /* THREADS_DISABLED */ 1534*25039b37SCy Schubert 1535*25039b37SCy Schubert /** setup the cmd event for dnstap io */ 1536*25039b37SCy Schubert static void dtio_setup_cmd(struct dt_io_thread* dtio) 1537*25039b37SCy Schubert { 1538*25039b37SCy Schubert struct ub_event* cmdev; 1539*25039b37SCy Schubert fd_set_nonblock(dtio->commandpipe[0]); 1540*25039b37SCy Schubert cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0], 1541*25039b37SCy Schubert UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio); 1542*25039b37SCy Schubert if(!cmdev) { 1543*25039b37SCy Schubert fatal_exit("dnstap io: out of memory"); 1544*25039b37SCy Schubert } 1545*25039b37SCy Schubert dtio->command_event = cmdev; 1546*25039b37SCy Schubert if(ub_event_add(cmdev, NULL) != 0) { 1547*25039b37SCy Schubert fatal_exit("dnstap io: out of memory (adding event)"); 1548*25039b37SCy Schubert } 1549*25039b37SCy Schubert } 1550*25039b37SCy Schubert 1551*25039b37SCy Schubert /** setup the reconnect event for dnstap io */ 1552*25039b37SCy Schubert static void dtio_setup_reconnect(struct dt_io_thread* dtio) 1553*25039b37SCy Schubert { 1554*25039b37SCy Schubert dtio_reconnect_clear(dtio); 1555*25039b37SCy Schubert dtio->reconnect_timer = ub_event_new(dtio->event_base, -1, 1556*25039b37SCy Schubert UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio); 1557*25039b37SCy Schubert if(!dtio->reconnect_timer) { 1558*25039b37SCy Schubert fatal_exit("dnstap io: out of memory"); 1559*25039b37SCy Schubert } 1560*25039b37SCy Schubert } 1561*25039b37SCy Schubert 1562*25039b37SCy Schubert /** 1563*25039b37SCy Schubert * structure to keep track of information during stop flush 1564*25039b37SCy Schubert */ 1565*25039b37SCy Schubert struct stop_flush_info { 1566*25039b37SCy Schubert /** the event base during stop flush */ 1567*25039b37SCy Schubert struct ub_event_base* base; 1568*25039b37SCy Schubert /** did we already want to exit this stop-flush event base */ 1569*25039b37SCy Schubert int want_to_exit_flush; 1570*25039b37SCy Schubert /** has the timer fired */ 1571*25039b37SCy Schubert int timer_done; 1572*25039b37SCy Schubert /** the dtio */ 1573*25039b37SCy Schubert struct dt_io_thread* dtio; 1574*25039b37SCy Schubert /** the stop control frame */ 1575*25039b37SCy Schubert void* stop_frame; 1576*25039b37SCy Schubert /** length of the stop frame */ 1577*25039b37SCy Schubert size_t stop_frame_len; 1578*25039b37SCy Schubert /** how much we have done of the stop frame */ 1579*25039b37SCy Schubert size_t stop_frame_done; 1580*25039b37SCy Schubert }; 1581*25039b37SCy Schubert 1582*25039b37SCy Schubert /** exit the stop flush base */ 1583*25039b37SCy Schubert static void dtio_stop_flush_exit(struct stop_flush_info* info) 1584*25039b37SCy Schubert { 1585*25039b37SCy Schubert if(info->want_to_exit_flush) 1586*25039b37SCy Schubert return; 1587*25039b37SCy Schubert info->want_to_exit_flush = 1; 1588*25039b37SCy Schubert if(ub_event_base_loopexit(info->base) != 0) { 1589*25039b37SCy Schubert log_err("dnstap io: could not loopexit"); 1590*25039b37SCy Schubert } 1591*25039b37SCy Schubert } 1592*25039b37SCy Schubert 1593*25039b37SCy Schubert /** send the stop control, 1594*25039b37SCy Schubert * return true if completed the frame. */ 1595*25039b37SCy Schubert static int dtio_control_stop_send(struct stop_flush_info* info) 1596*25039b37SCy Schubert { 1597*25039b37SCy Schubert struct dt_io_thread* dtio = info->dtio; 1598*25039b37SCy Schubert int r; 1599*25039b37SCy Schubert if(info->stop_frame_done >= info->stop_frame_len) 1600*25039b37SCy Schubert return 1; 1601*25039b37SCy Schubert r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) + 1602*25039b37SCy Schubert info->stop_frame_done, info->stop_frame_len - 1603*25039b37SCy Schubert info->stop_frame_done); 1604*25039b37SCy Schubert if(r == -1) { 1605*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush: output closed"); 1606*25039b37SCy Schubert dtio_stop_flush_exit(info); 1607*25039b37SCy Schubert return 0; 1608*25039b37SCy Schubert } 1609*25039b37SCy Schubert if(r == 0) { 1610*25039b37SCy Schubert /* try again later, or timeout */ 1611*25039b37SCy Schubert return 0; 1612*25039b37SCy Schubert } 1613*25039b37SCy Schubert info->stop_frame_done += r; 1614*25039b37SCy Schubert if(info->stop_frame_done < info->stop_frame_len) 1615*25039b37SCy Schubert return 0; /* not done yet */ 1616*25039b37SCy Schubert return 1; 1617*25039b37SCy Schubert } 1618*25039b37SCy Schubert 1619*25039b37SCy Schubert void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), 1620*25039b37SCy Schubert void* arg) 1621*25039b37SCy Schubert { 1622*25039b37SCy Schubert struct stop_flush_info* info = (struct stop_flush_info*)arg; 1623*25039b37SCy Schubert if(info->want_to_exit_flush) 1624*25039b37SCy Schubert return; 1625*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush"); 1626*25039b37SCy Schubert info->timer_done = 1; 1627*25039b37SCy Schubert dtio_stop_flush_exit(info); 1628*25039b37SCy Schubert } 1629*25039b37SCy Schubert 1630*25039b37SCy Schubert void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg) 1631*25039b37SCy Schubert { 1632*25039b37SCy Schubert struct stop_flush_info* info = (struct stop_flush_info*)arg; 1633*25039b37SCy Schubert struct dt_io_thread* dtio = info->dtio; 1634*25039b37SCy Schubert if(info->want_to_exit_flush) 1635*25039b37SCy Schubert return; 1636*25039b37SCy Schubert if(dtio->check_nb_connect) { 1637*25039b37SCy Schubert /* we don't start the stop_flush if connect still 1638*25039b37SCy Schubert * in progress, but the check code is here, just in case */ 1639*25039b37SCy Schubert int connect_err = dtio_check_nb_connect(dtio); 1640*25039b37SCy Schubert if(connect_err == -1) { 1641*25039b37SCy Schubert /* close the channel, exit the stop flush */ 1642*25039b37SCy Schubert dtio_stop_flush_exit(info); 1643*25039b37SCy Schubert dtio_del_output_event(dtio); 1644*25039b37SCy Schubert dtio_close_output(dtio); 1645*25039b37SCy Schubert return; 1646*25039b37SCy Schubert } else if(connect_err == 0) { 1647*25039b37SCy Schubert /* try again later */ 1648*25039b37SCy Schubert return; 1649*25039b37SCy Schubert } 1650*25039b37SCy Schubert /* nonblocking connect check passed, continue */ 1651*25039b37SCy Schubert } 1652*25039b37SCy Schubert #ifdef HAVE_SSL 1653*25039b37SCy Schubert if(dtio->ssl && 1654*25039b37SCy Schubert (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 1655*25039b37SCy Schubert if(!dtio_ssl_handshake(dtio, info)) 1656*25039b37SCy Schubert return; 1657*25039b37SCy Schubert } 1658*25039b37SCy Schubert #endif 1659*25039b37SCy Schubert 1660*25039b37SCy Schubert if((bits&UB_EV_READ)) { 1661*25039b37SCy Schubert if(!dtio_check_close(dtio)) { 1662*25039b37SCy Schubert if(dtio->fd == -1) { 1663*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: " 1664*25039b37SCy Schubert "stop flush: output closed"); 1665*25039b37SCy Schubert dtio_stop_flush_exit(info); 1666*25039b37SCy Schubert } 1667*25039b37SCy Schubert return; 1668*25039b37SCy Schubert } 1669*25039b37SCy Schubert } 1670*25039b37SCy Schubert /* write remainder of last frame */ 1671*25039b37SCy Schubert if(dtio->cur_msg) { 1672*25039b37SCy Schubert if(dtio->cur_msg_done < dtio->cur_msg_len) { 1673*25039b37SCy Schubert if(!dtio_write_more(dtio)) { 1674*25039b37SCy Schubert if(dtio->fd == -1) { 1675*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: " 1676*25039b37SCy Schubert "stop flush: output closed"); 1677*25039b37SCy Schubert dtio_stop_flush_exit(info); 1678*25039b37SCy Schubert } 1679*25039b37SCy Schubert return; 1680*25039b37SCy Schubert } 1681*25039b37SCy Schubert } 1682*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush completed " 1683*25039b37SCy Schubert "last frame"); 1684*25039b37SCy Schubert dtio_cur_msg_free(dtio); 1685*25039b37SCy Schubert } 1686*25039b37SCy Schubert /* write stop frame */ 1687*25039b37SCy Schubert if(info->stop_frame_done < info->stop_frame_len) { 1688*25039b37SCy Schubert if(!dtio_control_stop_send(info)) 1689*25039b37SCy Schubert return; 1690*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush completed " 1691*25039b37SCy Schubert "stop control frame"); 1692*25039b37SCy Schubert } 1693*25039b37SCy Schubert /* when last frame and stop frame are sent, exit */ 1694*25039b37SCy Schubert dtio_stop_flush_exit(info); 1695*25039b37SCy Schubert } 1696*25039b37SCy Schubert 1697*25039b37SCy Schubert /** flush at end, last packet and stop control */ 1698*25039b37SCy Schubert static void dtio_control_stop_flush(struct dt_io_thread* dtio) 1699*25039b37SCy Schubert { 1700*25039b37SCy Schubert /* briefly attempt to flush the previous packet to the output, 1701*25039b37SCy Schubert * this could be a partial packet, or even the start control frame */ 1702*25039b37SCy Schubert time_t secs = 0; 1703*25039b37SCy Schubert struct timeval now; 1704*25039b37SCy Schubert struct stop_flush_info info; 1705*25039b37SCy Schubert struct timeval tv; 1706*25039b37SCy Schubert struct ub_event* timer, *stopev; 1707*25039b37SCy Schubert 1708*25039b37SCy Schubert if(dtio->fd == -1 || dtio->check_nb_connect) { 1709*25039b37SCy Schubert /* no connection or we have just connected, so nothing is 1710*25039b37SCy Schubert * sent yet, so nothing to stop or flush */ 1711*25039b37SCy Schubert return; 1712*25039b37SCy Schubert } 1713*25039b37SCy Schubert if(dtio->ssl && !dtio->ssl_handshake_done) { 1714*25039b37SCy Schubert /* no SSL connection has been established yet */ 1715*25039b37SCy Schubert return; 1716*25039b37SCy Schubert } 1717*25039b37SCy Schubert 1718*25039b37SCy Schubert memset(&info, 0, sizeof(info)); 1719*25039b37SCy Schubert memset(&now, 0, sizeof(now)); 1720*25039b37SCy Schubert info.dtio = dtio; 1721*25039b37SCy Schubert info.base = ub_default_event_base(0, &secs, &now); 1722*25039b37SCy Schubert if(!info.base) { 1723*25039b37SCy Schubert log_err("dnstap io: malloc failure"); 1724*25039b37SCy Schubert return; 1725*25039b37SCy Schubert } 1726*25039b37SCy Schubert timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT, 1727*25039b37SCy Schubert &dtio_stop_timer_cb, &info); 1728*25039b37SCy Schubert if(!timer) { 1729*25039b37SCy Schubert log_err("dnstap io: malloc failure"); 1730*25039b37SCy Schubert ub_event_base_free(info.base); 1731*25039b37SCy Schubert return; 1732*25039b37SCy Schubert } 1733*25039b37SCy Schubert memset(&tv, 0, sizeof(tv)); 1734*25039b37SCy Schubert tv.tv_sec = 2; 1735*25039b37SCy Schubert if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info, 1736*25039b37SCy Schubert &tv) != 0) { 1737*25039b37SCy Schubert log_err("dnstap io: cannot event_timer_add"); 1738*25039b37SCy Schubert ub_event_free(timer); 1739*25039b37SCy Schubert ub_event_base_free(info.base); 1740*25039b37SCy Schubert return; 1741*25039b37SCy Schubert } 1742*25039b37SCy Schubert stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ | 1743*25039b37SCy Schubert UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info); 1744*25039b37SCy Schubert if(!stopev) { 1745*25039b37SCy Schubert log_err("dnstap io: malloc failure"); 1746*25039b37SCy Schubert ub_timer_del(timer); 1747*25039b37SCy Schubert ub_event_free(timer); 1748*25039b37SCy Schubert ub_event_base_free(info.base); 1749*25039b37SCy Schubert return; 1750*25039b37SCy Schubert } 1751*25039b37SCy Schubert if(ub_event_add(stopev, NULL) != 0) { 1752*25039b37SCy Schubert log_err("dnstap io: cannot event_add"); 1753*25039b37SCy Schubert ub_event_free(stopev); 1754*25039b37SCy Schubert ub_timer_del(timer); 1755*25039b37SCy Schubert ub_event_free(timer); 1756*25039b37SCy Schubert ub_event_base_free(info.base); 1757*25039b37SCy Schubert return; 1758*25039b37SCy Schubert } 1759*25039b37SCy Schubert info.stop_frame = fstrm_create_control_frame_stop( 1760*25039b37SCy Schubert &info.stop_frame_len); 1761*25039b37SCy Schubert if(!info.stop_frame) { 1762*25039b37SCy Schubert log_err("dnstap io: malloc failure"); 1763*25039b37SCy Schubert ub_event_del(stopev); 1764*25039b37SCy Schubert ub_event_free(stopev); 1765*25039b37SCy Schubert ub_timer_del(timer); 1766*25039b37SCy Schubert ub_event_free(timer); 1767*25039b37SCy Schubert ub_event_base_free(info.base); 1768*25039b37SCy Schubert return; 1769*25039b37SCy Schubert } 1770*25039b37SCy Schubert dtio->stop_flush_event = stopev; 1771*25039b37SCy Schubert 1772*25039b37SCy Schubert /* wait briefly, or until finished */ 1773*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush started"); 1774*25039b37SCy Schubert if(ub_event_base_dispatch(info.base) < 0) { 1775*25039b37SCy Schubert log_err("dnstap io: dispatch flush failed, errno is %s", 1776*25039b37SCy Schubert strerror(errno)); 1777*25039b37SCy Schubert } 1778*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: stop flush ended"); 1779*25039b37SCy Schubert free(info.stop_frame); 1780*25039b37SCy Schubert dtio->stop_flush_event = NULL; 1781*25039b37SCy Schubert ub_event_del(stopev); 1782*25039b37SCy Schubert ub_event_free(stopev); 1783*25039b37SCy Schubert ub_timer_del(timer); 1784*25039b37SCy Schubert ub_event_free(timer); 1785*25039b37SCy Schubert ub_event_base_free(info.base); 1786*25039b37SCy Schubert } 1787*25039b37SCy Schubert 1788*25039b37SCy Schubert /** perform desetup and free stuff when the dnstap io thread exits */ 1789*25039b37SCy Schubert static void dtio_desetup(struct dt_io_thread* dtio) 1790*25039b37SCy Schubert { 1791*25039b37SCy Schubert dtio_control_stop_flush(dtio); 1792*25039b37SCy Schubert dtio_del_output_event(dtio); 1793*25039b37SCy Schubert dtio_close_output(dtio); 1794*25039b37SCy Schubert ub_event_del(dtio->command_event); 1795*25039b37SCy Schubert ub_event_free(dtio->command_event); 1796*25039b37SCy Schubert #ifndef USE_WINSOCK 1797*25039b37SCy Schubert close(dtio->commandpipe[0]); 1798*25039b37SCy Schubert #else 1799*25039b37SCy Schubert _close(dtio->commandpipe[0]); 1800*25039b37SCy Schubert #endif 1801*25039b37SCy Schubert dtio->commandpipe[0] = -1; 1802*25039b37SCy Schubert dtio_reconnect_del(dtio); 1803*25039b37SCy Schubert ub_event_free(dtio->reconnect_timer); 1804*25039b37SCy Schubert dtio_cur_msg_free(dtio); 1805*25039b37SCy Schubert #ifndef THREADS_DISABLED 1806*25039b37SCy Schubert ub_event_base_free(dtio->event_base); 1807*25039b37SCy Schubert #endif 1808*25039b37SCy Schubert } 1809*25039b37SCy Schubert 1810*25039b37SCy Schubert /** setup a start control message */ 1811*25039b37SCy Schubert static int dtio_control_start_send(struct dt_io_thread* dtio) 1812*25039b37SCy Schubert { 1813*25039b37SCy Schubert log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 1814*25039b37SCy Schubert dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE, 1815*25039b37SCy Schubert &dtio->cur_msg_len); 1816*25039b37SCy Schubert if(!dtio->cur_msg) { 1817*25039b37SCy Schubert return 0; 1818*25039b37SCy Schubert } 1819*25039b37SCy Schubert /* setup to send the control message */ 1820*25039b37SCy Schubert /* set that the buffer needs to be sent, but the length 1821*25039b37SCy Schubert * of that buffer is already written, that way the buffer can 1822*25039b37SCy Schubert * start with 0 length and then the length of the control frame 1823*25039b37SCy Schubert * in it */ 1824*25039b37SCy Schubert dtio->cur_msg_done = 0; 1825*25039b37SCy Schubert dtio->cur_msg_len_done = 4; 1826*25039b37SCy Schubert return 1; 1827*25039b37SCy Schubert } 1828*25039b37SCy Schubert 1829*25039b37SCy Schubert /** setup a ready control message */ 1830*25039b37SCy Schubert static int dtio_control_ready_send(struct dt_io_thread* dtio) 1831*25039b37SCy Schubert { 1832*25039b37SCy Schubert log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 1833*25039b37SCy Schubert dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE, 1834*25039b37SCy Schubert &dtio->cur_msg_len); 1835*25039b37SCy Schubert if(!dtio->cur_msg) { 1836*25039b37SCy Schubert return 0; 1837*25039b37SCy Schubert } 1838*25039b37SCy Schubert /* setup to send the control message */ 1839*25039b37SCy Schubert /* set that the buffer needs to be sent, but the length 1840*25039b37SCy Schubert * of that buffer is already written, that way the buffer can 1841*25039b37SCy Schubert * start with 0 length and then the length of the control frame 1842*25039b37SCy Schubert * in it */ 1843*25039b37SCy Schubert dtio->cur_msg_done = 0; 1844*25039b37SCy Schubert dtio->cur_msg_len_done = 4; 1845*25039b37SCy Schubert return 1; 1846*25039b37SCy Schubert } 1847*25039b37SCy Schubert 1848*25039b37SCy Schubert /** open the output file descriptor for af_local */ 1849*25039b37SCy Schubert static int dtio_open_output_local(struct dt_io_thread* dtio) 1850*25039b37SCy Schubert { 1851*25039b37SCy Schubert #ifdef HAVE_SYS_UN_H 1852*25039b37SCy Schubert struct sockaddr_un s; 1853*25039b37SCy Schubert dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0); 1854*25039b37SCy Schubert if(dtio->fd == -1) { 1855*25039b37SCy Schubert #ifndef USE_WINSOCK 1856*25039b37SCy Schubert log_err("dnstap io: failed to create socket: %s", 1857*25039b37SCy Schubert strerror(errno)); 1858*25039b37SCy Schubert #else 1859*25039b37SCy Schubert log_err("dnstap io: failed to create socket: %s", 1860*25039b37SCy Schubert wsa_strerror(WSAGetLastError())); 1861*25039b37SCy Schubert #endif 1862*25039b37SCy Schubert return 0; 1863*25039b37SCy Schubert } 1864*25039b37SCy Schubert memset(&s, 0, sizeof(s)); 1865*25039b37SCy Schubert #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN 1866*25039b37SCy Schubert /* this member exists on BSDs, not Linux */ 1867*25039b37SCy Schubert s.sun_len = (unsigned)sizeof(s); 1868*25039b37SCy Schubert #endif 1869*25039b37SCy Schubert s.sun_family = AF_LOCAL; 1870*25039b37SCy Schubert /* length is 92-108, 104 on FreeBSD */ 1871*25039b37SCy Schubert (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); 1872*25039b37SCy Schubert fd_set_nonblock(dtio->fd); 1873*25039b37SCy Schubert if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) 1874*25039b37SCy Schubert == -1) { 1875*25039b37SCy Schubert char* to = dtio->socket_path; 1876*25039b37SCy Schubert #ifndef USE_WINSOCK 1877*25039b37SCy Schubert log_err("dnstap io: failed to connect to \"%s\": %s", 1878*25039b37SCy Schubert to, strerror(errno)); 1879*25039b37SCy Schubert #else 1880*25039b37SCy Schubert log_err("dnstap io: failed to connect to \"%s\": %s", 1881*25039b37SCy Schubert to, wsa_strerror(WSAGetLastError())); 1882*25039b37SCy Schubert #endif 1883*25039b37SCy Schubert dtio_close_fd(dtio); 1884*25039b37SCy Schubert return 0; 1885*25039b37SCy Schubert } 1886*25039b37SCy Schubert return 1; 1887*25039b37SCy Schubert #else 1888*25039b37SCy Schubert log_err("cannot create af_local socket"); 1889*25039b37SCy Schubert return 0; 1890*25039b37SCy Schubert #endif /* HAVE_SYS_UN_H */ 1891*25039b37SCy Schubert } 1892*25039b37SCy Schubert 1893*25039b37SCy Schubert /** open the output file descriptor for af_inet and af_inet6 */ 1894*25039b37SCy Schubert static int dtio_open_output_tcp(struct dt_io_thread* dtio) 1895*25039b37SCy Schubert { 1896*25039b37SCy Schubert struct sockaddr_storage addr; 1897*25039b37SCy Schubert socklen_t addrlen; 1898*25039b37SCy Schubert memset(&addr, 0, sizeof(addr)); 1899*25039b37SCy Schubert addrlen = (socklen_t)sizeof(addr); 1900*25039b37SCy Schubert 1901*25039b37SCy Schubert if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) { 1902*25039b37SCy Schubert log_err("could not parse IP '%s'", dtio->ip_str); 1903*25039b37SCy Schubert return 0; 1904*25039b37SCy Schubert } 1905*25039b37SCy Schubert dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0); 1906*25039b37SCy Schubert if(dtio->fd == -1) { 1907*25039b37SCy Schubert #ifndef USE_WINSOCK 1908*25039b37SCy Schubert log_err("can't create socket: %s", strerror(errno)); 1909*25039b37SCy Schubert #else 1910*25039b37SCy Schubert log_err("can't create socket: %s", 1911*25039b37SCy Schubert wsa_strerror(WSAGetLastError())); 1912*25039b37SCy Schubert #endif 1913*25039b37SCy Schubert return 0; 1914*25039b37SCy Schubert } 1915*25039b37SCy Schubert fd_set_nonblock(dtio->fd); 1916*25039b37SCy Schubert if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) { 1917*25039b37SCy Schubert if(errno == EINPROGRESS) 1918*25039b37SCy Schubert return 1; /* wait until connect done*/ 1919*25039b37SCy Schubert #ifndef USE_WINSOCK 1920*25039b37SCy Schubert if(tcp_connect_errno_needs_log( 1921*25039b37SCy Schubert (struct sockaddr *)&addr, addrlen)) { 1922*25039b37SCy Schubert log_err("dnstap io: failed to connect to %s: %s", 1923*25039b37SCy Schubert dtio->ip_str, strerror(errno)); 1924*25039b37SCy Schubert } 1925*25039b37SCy Schubert #else 1926*25039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS || 1927*25039b37SCy Schubert WSAGetLastError() == WSAEWOULDBLOCK) 1928*25039b37SCy Schubert return 1; /* wait until connect done*/ 1929*25039b37SCy Schubert if(tcp_connect_errno_needs_log( 1930*25039b37SCy Schubert (struct sockaddr *)&addr, addrlen)) { 1931*25039b37SCy Schubert log_err("dnstap io: failed to connect to %s: %s", 1932*25039b37SCy Schubert dtio->ip_str, wsa_strerror(WSAGetLastError())); 1933*25039b37SCy Schubert } 1934*25039b37SCy Schubert #endif 1935*25039b37SCy Schubert dtio_close_fd(dtio); 1936*25039b37SCy Schubert return 0; 1937*25039b37SCy Schubert } 1938*25039b37SCy Schubert return 1; 1939*25039b37SCy Schubert } 1940*25039b37SCy Schubert 1941*25039b37SCy Schubert /** setup the SSL structure for new connection */ 1942*25039b37SCy Schubert static int dtio_setup_ssl(struct dt_io_thread* dtio) 1943*25039b37SCy Schubert { 1944*25039b37SCy Schubert dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd); 1945*25039b37SCy Schubert if(!dtio->ssl) return 0; 1946*25039b37SCy Schubert dtio->ssl_handshake_done = 0; 1947*25039b37SCy Schubert dtio->ssl_brief_read = 0; 1948*25039b37SCy Schubert 1949*25039b37SCy Schubert if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name, 1950*25039b37SCy Schubert dtio->tls_use_sni)) { 1951*25039b37SCy Schubert return 0; 1952*25039b37SCy Schubert } 1953*25039b37SCy Schubert return 1; 1954*25039b37SCy Schubert } 1955*25039b37SCy Schubert 1956*25039b37SCy Schubert /** open the output file descriptor */ 1957*25039b37SCy Schubert static void dtio_open_output(struct dt_io_thread* dtio) 1958*25039b37SCy Schubert { 1959*25039b37SCy Schubert struct ub_event* ev; 1960*25039b37SCy Schubert if(dtio->upstream_is_unix) { 1961*25039b37SCy Schubert if(!dtio_open_output_local(dtio)) { 1962*25039b37SCy Schubert dtio_reconnect_enable(dtio); 1963*25039b37SCy Schubert return; 1964*25039b37SCy Schubert } 1965*25039b37SCy Schubert } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 1966*25039b37SCy Schubert if(!dtio_open_output_tcp(dtio)) { 1967*25039b37SCy Schubert dtio_reconnect_enable(dtio); 1968*25039b37SCy Schubert return; 1969*25039b37SCy Schubert } 1970*25039b37SCy Schubert if(dtio->upstream_is_tls) { 1971*25039b37SCy Schubert if(!dtio_setup_ssl(dtio)) { 1972*25039b37SCy Schubert dtio_close_fd(dtio); 1973*25039b37SCy Schubert dtio_reconnect_enable(dtio); 1974*25039b37SCy Schubert return; 1975*25039b37SCy Schubert } 1976*25039b37SCy Schubert } 1977*25039b37SCy Schubert } 1978*25039b37SCy Schubert dtio->check_nb_connect = 1; 1979*25039b37SCy Schubert 1980*25039b37SCy Schubert /* the EV_READ is to read ACCEPT control messages, and catch channel 1981*25039b37SCy Schubert * close. EV_WRITE is to write packets */ 1982*25039b37SCy Schubert ev = ub_event_new(dtio->event_base, dtio->fd, 1983*25039b37SCy Schubert UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb, 1984*25039b37SCy Schubert dtio); 1985*25039b37SCy Schubert if(!ev) { 1986*25039b37SCy Schubert log_err("dnstap io: out of memory"); 1987*25039b37SCy Schubert if(dtio->ssl) { 1988*25039b37SCy Schubert #ifdef HAVE_SSL 1989*25039b37SCy Schubert SSL_free(dtio->ssl); 1990*25039b37SCy Schubert dtio->ssl = NULL; 1991*25039b37SCy Schubert #endif 1992*25039b37SCy Schubert } 1993*25039b37SCy Schubert dtio_close_fd(dtio); 1994*25039b37SCy Schubert dtio_reconnect_enable(dtio); 1995*25039b37SCy Schubert return; 1996*25039b37SCy Schubert } 1997*25039b37SCy Schubert dtio->event = ev; 1998*25039b37SCy Schubert 1999*25039b37SCy Schubert /* setup protocol control message to start */ 2000*25039b37SCy Schubert if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) || 2001*25039b37SCy Schubert (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) { 2002*25039b37SCy Schubert log_err("dnstap io: out of memory"); 2003*25039b37SCy Schubert ub_event_free(dtio->event); 2004*25039b37SCy Schubert dtio->event = NULL; 2005*25039b37SCy Schubert if(dtio->ssl) { 2006*25039b37SCy Schubert #ifdef HAVE_SSL 2007*25039b37SCy Schubert SSL_free(dtio->ssl); 2008*25039b37SCy Schubert dtio->ssl = NULL; 2009*25039b37SCy Schubert #endif 2010*25039b37SCy Schubert } 2011*25039b37SCy Schubert dtio_close_fd(dtio); 2012*25039b37SCy Schubert dtio_reconnect_enable(dtio); 2013*25039b37SCy Schubert return; 2014*25039b37SCy Schubert } 2015*25039b37SCy Schubert } 2016*25039b37SCy Schubert 2017*25039b37SCy Schubert /** perform the setup of the writer thread on the established event_base */ 2018*25039b37SCy Schubert static void dtio_setup_on_base(struct dt_io_thread* dtio) 2019*25039b37SCy Schubert { 2020*25039b37SCy Schubert dtio_setup_cmd(dtio); 2021*25039b37SCy Schubert dtio_setup_reconnect(dtio); 2022*25039b37SCy Schubert dtio_open_output(dtio); 2023*25039b37SCy Schubert if(!dtio_add_output_event_write(dtio)) 2024*25039b37SCy Schubert return; 2025*25039b37SCy Schubert } 2026*25039b37SCy Schubert 2027*25039b37SCy Schubert #ifndef THREADS_DISABLED 2028*25039b37SCy Schubert /** the IO thread function for the DNSTAP IO */ 2029*25039b37SCy Schubert static void* dnstap_io(void* arg) 2030*25039b37SCy Schubert { 2031*25039b37SCy Schubert struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 2032*25039b37SCy Schubert time_t secs = 0; 2033*25039b37SCy Schubert struct timeval now; 2034*25039b37SCy Schubert log_thread_set(&dtio->threadnum); 2035*25039b37SCy Schubert 2036*25039b37SCy Schubert /* setup */ 2037*25039b37SCy Schubert verbose(VERB_ALGO, "start dnstap io thread"); 2038*25039b37SCy Schubert dtio_setup_base(dtio, &secs, &now); 2039*25039b37SCy Schubert dtio_setup_on_base(dtio); 2040*25039b37SCy Schubert 2041*25039b37SCy Schubert /* run */ 2042*25039b37SCy Schubert if(ub_event_base_dispatch(dtio->event_base) < 0) { 2043*25039b37SCy Schubert log_err("dnstap io: dispatch failed, errno is %s", 2044*25039b37SCy Schubert strerror(errno)); 2045*25039b37SCy Schubert } 2046*25039b37SCy Schubert 2047*25039b37SCy Schubert /* cleanup */ 2048*25039b37SCy Schubert verbose(VERB_ALGO, "stop dnstap io thread"); 2049*25039b37SCy Schubert dtio_desetup(dtio); 2050*25039b37SCy Schubert return NULL; 2051*25039b37SCy Schubert } 2052*25039b37SCy Schubert #endif /* THREADS_DISABLED */ 2053*25039b37SCy Schubert 2054*25039b37SCy Schubert int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, 2055*25039b37SCy Schubert int numworkers) 2056*25039b37SCy Schubert { 2057*25039b37SCy Schubert /* set up the thread, can fail */ 2058*25039b37SCy Schubert #ifndef USE_WINSOCK 2059*25039b37SCy Schubert if(pipe(dtio->commandpipe) == -1) { 2060*25039b37SCy Schubert log_err("failed to create pipe: %s", strerror(errno)); 2061*25039b37SCy Schubert return 0; 2062*25039b37SCy Schubert } 2063*25039b37SCy Schubert #else 2064*25039b37SCy Schubert if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) { 2065*25039b37SCy Schubert log_err("failed to create _pipe: %s", 2066*25039b37SCy Schubert wsa_strerror(WSAGetLastError())); 2067*25039b37SCy Schubert return 0; 2068*25039b37SCy Schubert } 2069*25039b37SCy Schubert #endif 2070*25039b37SCy Schubert 2071*25039b37SCy Schubert /* start the thread */ 2072*25039b37SCy Schubert dtio->threadnum = numworkers+1; 2073*25039b37SCy Schubert dtio->started = 1; 2074*25039b37SCy Schubert #ifndef THREADS_DISABLED 2075*25039b37SCy Schubert ub_thread_create(&dtio->tid, dnstap_io, dtio); 2076*25039b37SCy Schubert (void)event_base_nothr; 2077*25039b37SCy Schubert #else 2078*25039b37SCy Schubert dtio->event_base = event_base_nothr; 2079*25039b37SCy Schubert dtio_setup_on_base(dtio); 2080*25039b37SCy Schubert #endif 2081*25039b37SCy Schubert return 1; 2082*25039b37SCy Schubert } 2083*25039b37SCy Schubert 2084*25039b37SCy Schubert void dt_io_thread_stop(struct dt_io_thread* dtio) 2085*25039b37SCy Schubert { 2086*25039b37SCy Schubert #ifndef THREADS_DISABLED 2087*25039b37SCy Schubert uint8_t cmd = DTIO_COMMAND_STOP; 2088*25039b37SCy Schubert #endif 2089*25039b37SCy Schubert if(!dtio) return; 2090*25039b37SCy Schubert if(!dtio->started) return; 2091*25039b37SCy Schubert verbose(VERB_ALGO, "dnstap io: send stop cmd"); 2092*25039b37SCy Schubert 2093*25039b37SCy Schubert #ifndef THREADS_DISABLED 2094*25039b37SCy Schubert while(1) { 2095*25039b37SCy Schubert ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 2096*25039b37SCy Schubert if(r == -1) { 2097*25039b37SCy Schubert #ifndef USE_WINSOCK 2098*25039b37SCy Schubert if(errno == EINTR || errno == EAGAIN) 2099*25039b37SCy Schubert continue; 2100*25039b37SCy Schubert log_err("dnstap io stop: write: %s", strerror(errno)); 2101*25039b37SCy Schubert #else 2102*25039b37SCy Schubert if(WSAGetLastError() == WSAEINPROGRESS) 2103*25039b37SCy Schubert continue; 2104*25039b37SCy Schubert if(WSAGetLastError() == WSAEWOULDBLOCK) 2105*25039b37SCy Schubert continue; 2106*25039b37SCy Schubert log_err("dnstap io stop: write: %s", 2107*25039b37SCy Schubert wsa_strerror(WSAGetLastError())); 2108*25039b37SCy Schubert #endif 2109*25039b37SCy Schubert break; 2110*25039b37SCy Schubert } 2111*25039b37SCy Schubert break; 2112*25039b37SCy Schubert } 2113*25039b37SCy Schubert dtio->started = 0; 2114*25039b37SCy Schubert #endif /* THREADS_DISABLED */ 2115*25039b37SCy Schubert 2116*25039b37SCy Schubert #ifndef USE_WINSOCK 2117*25039b37SCy Schubert close(dtio->commandpipe[1]); 2118*25039b37SCy Schubert #else 2119*25039b37SCy Schubert _close(dtio->commandpipe[1]); 2120*25039b37SCy Schubert #endif 2121*25039b37SCy Schubert dtio->commandpipe[1] = -1; 2122*25039b37SCy Schubert #ifndef THREADS_DISABLED 2123*25039b37SCy Schubert ub_thread_join(dtio->tid); 2124*25039b37SCy Schubert #else 2125*25039b37SCy Schubert dtio->want_to_exit = 1; 2126*25039b37SCy Schubert dtio_desetup(dtio); 2127*25039b37SCy Schubert #endif 2128*25039b37SCy Schubert } 2129