1 /* 2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP 3 * 4 * Copyright (c) 2020, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * 15 * Redistributions in binary form must reproduce the above copyright notice, 16 * this list of conditions and the following disclaimer in the documentation 17 * and/or other materials provided with the distribution. 18 * 19 * Neither the name of the NLNET LABS nor the names of its contributors may 20 * be used to endorse or promote products derived from this software without 21 * specific prior written permission. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34 * 35 */ 36 37 /** 38 * \file 39 * 40 * An implementation of the Frame Streams data transport protocol for 41 * the Unbound DNSTAP message logging facility. 42 */ 43 44 #include "config.h" 45 #include "dnstap/dtstream.h" 46 #include "dnstap/dnstap_fstrm.h" 47 #include "util/config_file.h" 48 #include "util/ub_event.h" 49 #include "util/net_help.h" 50 #include "services/outside_network.h" 51 #include "sldns/sbuffer.h" 52 #ifdef HAVE_SYS_UN_H 53 #include <sys/un.h> 54 #endif 55 #include <fcntl.h> 56 #ifdef HAVE_OPENSSL_SSL_H 57 #include <openssl/ssl.h> 58 #endif 59 #ifdef HAVE_OPENSSL_ERR_H 60 #include <openssl/err.h> 61 #endif 62 63 /** number of messages to process in one output callback */ 64 #define DTIO_MESSAGES_PER_CALLBACK 100 65 /** the msec to wait for reconnect (if not immediate, the first attempt) */ 66 #define DTIO_RECONNECT_TIMEOUT_MIN 10 67 /** the msec to wait for reconnect max after backoff */ 68 #define DTIO_RECONNECT_TIMEOUT_MAX 1000 69 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */ 70 #define DTIO_RECONNECT_TIMEOUT_SLOW 1000 71 /** number of messages before wakeup of thread */ 72 #define DTIO_MSG_FOR_WAKEUP 32 73 74 /** maximum length of received frame */ 75 #define DTIO_RECV_FRAME_MAX_LEN 1000 76 77 struct stop_flush_info; 78 /** DTIO command channel commands */ 79 enum { 80 /** DTIO command channel stop */ 81 DTIO_COMMAND_STOP = 0, 82 /** DTIO command channel wakeup */ 83 DTIO_COMMAND_WAKEUP = 1 84 } dtio_channel_command; 85 86 /** open the output channel */ 87 static void dtio_open_output(struct dt_io_thread* dtio); 88 /** add output event for read and write */ 89 static int dtio_add_output_event_write(struct dt_io_thread* dtio); 90 /** start reconnection attempts */ 91 static void dtio_reconnect_enable(struct dt_io_thread* dtio); 92 /** stop from stop_flush event loop */ 93 static void dtio_stop_flush_exit(struct stop_flush_info* info); 94 /** setup a start control message */ 95 static int dtio_control_start_send(struct dt_io_thread* dtio); 96 #ifdef HAVE_SSL 97 /** enable briefly waiting for a read event, for SSL negotiation */ 98 static int dtio_enable_brief_read(struct dt_io_thread* dtio); 99 /** enable briefly waiting for a write event, for SSL negotiation */ 100 static int dtio_enable_brief_write(struct dt_io_thread* dtio); 101 #endif 102 103 struct dt_msg_queue* 104 dt_msg_queue_create(struct comm_base* base) 105 { 106 struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); 107 if(!mq) return NULL; 108 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, 109 about 1 M should contain 64K messages with some overhead, 110 or a whole bunch smaller ones */ 111 mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq); 112 if(!mq->wakeup_timer) { 113 free(mq); 114 return NULL; 115 } 116 lock_basic_init(&mq->lock); 117 lock_protect(&mq->lock, mq, sizeof(*mq)); 118 return mq; 119 } 120 121 /** clear the message list, caller must hold the lock */ 122 static void 123 dt_msg_queue_clear(struct dt_msg_queue* mq) 124 { 125 struct dt_msg_entry* e = mq->first, *next=NULL; 126 while(e) { 127 next = e->next; 128 free(e->buf); 129 free(e); 130 e = next; 131 } 132 mq->first = NULL; 133 mq->last = NULL; 134 mq->cursize = 0; 135 mq->msgcount = 0; 136 } 137 138 void 139 dt_msg_queue_delete(struct dt_msg_queue* mq) 140 { 141 if(!mq) return; 142 lock_basic_destroy(&mq->lock); 143 dt_msg_queue_clear(mq); 144 comm_timer_delete(mq->wakeup_timer); 145 free(mq); 146 } 147 148 /** make the dtio wake up by sending a wakeup command */ 149 static void dtio_wakeup(struct dt_io_thread* dtio) 150 { 151 uint8_t cmd = DTIO_COMMAND_WAKEUP; 152 if(!dtio) return; 153 if(!dtio->started) return; 154 155 while(1) { 156 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 157 if(r == -1) { 158 #ifndef USE_WINSOCK 159 if(errno == EINTR || errno == EAGAIN) 160 continue; 161 #else 162 if(WSAGetLastError() == WSAEINPROGRESS) 163 continue; 164 if(WSAGetLastError() == WSAEWOULDBLOCK) 165 continue; 166 #endif 167 log_err("dnstap io wakeup: write: %s", 168 sock_strerror(errno)); 169 break; 170 } 171 break; 172 } 173 } 174 175 void 176 mq_wakeup_cb(void* arg) 177 { 178 struct dt_msg_queue* mq = (struct dt_msg_queue*)arg; 179 /* even if the dtio is already active, because perhaps much 180 * traffic suddenly, we leave the timer running to save on 181 * managing it, the once a second timer is less work then 182 * starting and stopping the timer frequently */ 183 lock_basic_lock(&mq->dtio->wakeup_timer_lock); 184 mq->dtio->wakeup_timer_enabled = 0; 185 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 186 dtio_wakeup(mq->dtio); 187 } 188 189 /** start timer to wakeup dtio because there is content in the queue */ 190 static void 191 dt_msg_queue_start_timer(struct dt_msg_queue* mq) 192 { 193 struct timeval tv; 194 /* Start a timer to process messages to be logged. 195 * If we woke up the dtio thread for every message, the wakeup 196 * messages take up too much processing power. If the queue 197 * fills up the wakeup happens immediately. The timer wakes it up 198 * if there are infrequent messages to log. */ 199 200 /* we cannot start a timer in dtio thread, because it is a different 201 * thread and its event base is in use by the other thread, it would 202 * give race conditions if we tried to modify its event base, 203 * and locks would wait until it woke up, and this is what we do. */ 204 205 /* do not start the timer if a timer already exists, perhaps 206 * in another worker. So this variable is protected by a lock in 207 * dtio */ 208 lock_basic_lock(&mq->dtio->wakeup_timer_lock); 209 if(mq->dtio->wakeup_timer_enabled) { 210 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 211 return; 212 } 213 mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */ 214 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 215 216 /* start the timer, in mq, in the event base of our worker */ 217 tv.tv_sec = 1; 218 tv.tv_usec = 0; 219 comm_timer_set(mq->wakeup_timer, &tv); 220 } 221 222 void 223 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) 224 { 225 int wakeupnow = 0, wakeupstarttimer = 0; 226 struct dt_msg_entry* entry; 227 228 /* check conditions */ 229 if(!buf) return; 230 if(len == 0) { 231 /* it is not possible to log entries with zero length, 232 * because the framestream protocol does not carry it. 233 * However the protobuf serialization does not create zero 234 * length datagrams for dnstap, so this should not happen. */ 235 free(buf); 236 return; 237 } 238 if(!mq) { 239 free(buf); 240 return; 241 } 242 243 /* allocate memory for queue entry */ 244 entry = malloc(sizeof(*entry)); 245 if(!entry) { 246 log_err("out of memory logging dnstap"); 247 free(buf); 248 return; 249 } 250 entry->next = NULL; 251 entry->buf = buf; 252 entry->len = len; 253 254 /* aqcuire lock */ 255 lock_basic_lock(&mq->lock); 256 /* if list was empty, start timer for (eventual) wakeup */ 257 if(mq->first == NULL) 258 wakeupstarttimer = 1; 259 /* if list contains more than wakeupnum elements, wakeup now, 260 * or if list is (going to be) almost full */ 261 if(mq->msgcount == DTIO_MSG_FOR_WAKEUP || 262 (mq->cursize < mq->maxsize * 9 / 10 && 263 mq->cursize+len >= mq->maxsize * 9 / 10)) 264 wakeupnow = 1; 265 /* see if it is going to fit */ 266 if(mq->cursize + len > mq->maxsize) { 267 /* buffer full, or congested. */ 268 /* drop */ 269 lock_basic_unlock(&mq->lock); 270 free(buf); 271 free(entry); 272 return; 273 } 274 mq->cursize += len; 275 mq->msgcount ++; 276 /* append to list */ 277 if(mq->last) { 278 mq->last->next = entry; 279 } else { 280 mq->first = entry; 281 } 282 mq->last = entry; 283 /* release lock */ 284 lock_basic_unlock(&mq->lock); 285 286 if(wakeupnow) { 287 dtio_wakeup(mq->dtio); 288 } else if(wakeupstarttimer) { 289 dt_msg_queue_start_timer(mq); 290 } 291 } 292 293 struct dt_io_thread* dt_io_thread_create(void) 294 { 295 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); 296 lock_basic_init(&dtio->wakeup_timer_lock); 297 lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled, 298 sizeof(dtio->wakeup_timer_enabled)); 299 return dtio; 300 } 301 302 void dt_io_thread_delete(struct dt_io_thread* dtio) 303 { 304 struct dt_io_list_item* item, *nextitem; 305 if(!dtio) return; 306 lock_basic_destroy(&dtio->wakeup_timer_lock); 307 item=dtio->io_list; 308 while(item) { 309 nextitem = item->next; 310 free(item); 311 item = nextitem; 312 } 313 free(dtio->socket_path); 314 free(dtio->ip_str); 315 free(dtio->tls_server_name); 316 free(dtio->client_key_file); 317 free(dtio->client_cert_file); 318 if(dtio->ssl_ctx) { 319 #ifdef HAVE_SSL 320 SSL_CTX_free(dtio->ssl_ctx); 321 #endif 322 } 323 free(dtio); 324 } 325 326 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg) 327 { 328 if(!cfg->dnstap) { 329 log_warn("cannot setup dnstap because dnstap-enable is no"); 330 return 0; 331 } 332 333 /* what type of connectivity do we have */ 334 if(cfg->dnstap_ip && cfg->dnstap_ip[0]) { 335 if(cfg->dnstap_tls) 336 dtio->upstream_is_tls = 1; 337 else dtio->upstream_is_tcp = 1; 338 } else { 339 dtio->upstream_is_unix = 1; 340 } 341 dtio->is_bidirectional = cfg->dnstap_bidirectional; 342 343 if(dtio->upstream_is_unix) { 344 char* nm; 345 if(!cfg->dnstap_socket_path || 346 cfg->dnstap_socket_path[0]==0) { 347 log_err("dnstap setup: no dnstap-socket-path for " 348 "socket connect"); 349 return 0; 350 } 351 nm = cfg->dnstap_socket_path; 352 if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm, 353 cfg->chrootdir, strlen(cfg->chrootdir)) == 0) 354 nm += strlen(cfg->chrootdir); 355 free(dtio->socket_path); 356 dtio->socket_path = strdup(nm); 357 if(!dtio->socket_path) { 358 log_err("dnstap setup: malloc failure"); 359 return 0; 360 } 361 } 362 363 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 364 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) { 365 log_err("dnstap setup: no dnstap-ip for TCP connect"); 366 return 0; 367 } 368 free(dtio->ip_str); 369 dtio->ip_str = strdup(cfg->dnstap_ip); 370 if(!dtio->ip_str) { 371 log_err("dnstap setup: malloc failure"); 372 return 0; 373 } 374 } 375 376 if(dtio->upstream_is_tls) { 377 #ifdef HAVE_SSL 378 if(cfg->dnstap_tls_server_name && 379 cfg->dnstap_tls_server_name[0]) { 380 free(dtio->tls_server_name); 381 dtio->tls_server_name = strdup( 382 cfg->dnstap_tls_server_name); 383 if(!dtio->tls_server_name) { 384 log_err("dnstap setup: malloc failure"); 385 return 0; 386 } 387 if(!check_auth_name_for_ssl(dtio->tls_server_name)) 388 return 0; 389 } 390 if(cfg->dnstap_tls_client_key_file && 391 cfg->dnstap_tls_client_key_file[0]) { 392 dtio->use_client_certs = 1; 393 free(dtio->client_key_file); 394 dtio->client_key_file = strdup( 395 cfg->dnstap_tls_client_key_file); 396 if(!dtio->client_key_file) { 397 log_err("dnstap setup: malloc failure"); 398 return 0; 399 } 400 if(!cfg->dnstap_tls_client_cert_file || 401 cfg->dnstap_tls_client_cert_file[0]==0) { 402 log_err("dnstap setup: client key " 403 "authentication enabled with " 404 "dnstap-tls-client-key-file, but " 405 "no dnstap-tls-client-cert-file " 406 "is given"); 407 return 0; 408 } 409 free(dtio->client_cert_file); 410 dtio->client_cert_file = strdup( 411 cfg->dnstap_tls_client_cert_file); 412 if(!dtio->client_cert_file) { 413 log_err("dnstap setup: malloc failure"); 414 return 0; 415 } 416 } else { 417 dtio->use_client_certs = 0; 418 dtio->client_key_file = NULL; 419 dtio->client_cert_file = NULL; 420 } 421 422 if(cfg->dnstap_tls_cert_bundle) { 423 dtio->ssl_ctx = connect_sslctx_create( 424 dtio->client_key_file, 425 dtio->client_cert_file, 426 cfg->dnstap_tls_cert_bundle, 0); 427 } else { 428 dtio->ssl_ctx = connect_sslctx_create( 429 dtio->client_key_file, 430 dtio->client_cert_file, 431 cfg->tls_cert_bundle, cfg->tls_win_cert); 432 } 433 if(!dtio->ssl_ctx) { 434 log_err("could not setup SSL CTX"); 435 return 0; 436 } 437 dtio->tls_use_sni = cfg->tls_use_sni; 438 #endif /* HAVE_SSL */ 439 } 440 return 1; 441 } 442 443 int dt_io_thread_register_queue(struct dt_io_thread* dtio, 444 struct dt_msg_queue* mq) 445 { 446 struct dt_io_list_item* item = malloc(sizeof(*item)); 447 if(!item) return 0; 448 lock_basic_lock(&mq->lock); 449 mq->dtio = dtio; 450 lock_basic_unlock(&mq->lock); 451 item->queue = mq; 452 item->next = dtio->io_list; 453 dtio->io_list = item; 454 dtio->io_list_iter = NULL; 455 return 1; 456 } 457 458 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, 459 struct dt_msg_queue* mq) 460 { 461 struct dt_io_list_item* item, *prev=NULL; 462 if(!dtio) return; 463 item = dtio->io_list; 464 while(item) { 465 if(item->queue == mq) { 466 /* found it */ 467 if(prev) prev->next = item->next; 468 else dtio->io_list = item->next; 469 /* the queue itself only registered, not deleted */ 470 lock_basic_lock(&item->queue->lock); 471 item->queue->dtio = NULL; 472 lock_basic_unlock(&item->queue->lock); 473 free(item); 474 dtio->io_list_iter = NULL; 475 return; 476 } 477 prev = item; 478 item = item->next; 479 } 480 } 481 482 /** pick a message from the queue, the routine locks and unlocks, 483 * returns true if there is a message */ 484 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf, 485 size_t* len) 486 { 487 lock_basic_lock(&mq->lock); 488 if(mq->first) { 489 struct dt_msg_entry* entry = mq->first; 490 mq->first = entry->next; 491 if(!entry->next) mq->last = NULL; 492 mq->cursize -= entry->len; 493 mq->msgcount --; 494 lock_basic_unlock(&mq->lock); 495 496 *buf = entry->buf; 497 *len = entry->len; 498 free(entry); 499 return 1; 500 } 501 lock_basic_unlock(&mq->lock); 502 return 0; 503 } 504 505 /** find message in queue, false if no message, true if message to send */ 506 static int dtio_find_in_queue(struct dt_io_thread* dtio, 507 struct dt_msg_queue* mq) 508 { 509 void* buf=NULL; 510 size_t len=0; 511 if(dt_msg_queue_pop(mq, &buf, &len)) { 512 dtio->cur_msg = buf; 513 dtio->cur_msg_len = len; 514 dtio->cur_msg_done = 0; 515 dtio->cur_msg_len_done = 0; 516 return 1; 517 } 518 return 0; 519 } 520 521 /** find a new message to write, search message queues, false if none */ 522 static int dtio_find_msg(struct dt_io_thread* dtio) 523 { 524 struct dt_io_list_item *spot, *item; 525 526 spot = dtio->io_list_iter; 527 /* use the next queue for the next message lookup, 528 * if we hit the end(NULL) the NULL restarts the iter at start. */ 529 if(spot) 530 dtio->io_list_iter = spot->next; 531 else if(dtio->io_list) 532 dtio->io_list_iter = dtio->io_list->next; 533 534 /* scan from spot to end-of-io_list */ 535 item = spot; 536 while(item) { 537 if(dtio_find_in_queue(dtio, item->queue)) 538 return 1; 539 item = item->next; 540 } 541 /* scan starting at the start-of-list (to wrap around the end) */ 542 item = dtio->io_list; 543 while(item) { 544 if(dtio_find_in_queue(dtio, item->queue)) 545 return 1; 546 item = item->next; 547 } 548 return 0; 549 } 550 551 /** callback for the dnstap reconnect, to start reconnecting to output */ 552 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd), 553 short ATTR_UNUSED(bits), void* arg) 554 { 555 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 556 dtio->reconnect_is_added = 0; 557 verbose(VERB_ALGO, "dnstap io: reconnect timer"); 558 559 dtio_open_output(dtio); 560 if(dtio->event) { 561 if(!dtio_add_output_event_write(dtio)) 562 return; 563 /* nothing wrong so far, wait on the output event */ 564 return; 565 } 566 /* exponential backoff and retry on timer */ 567 dtio_reconnect_enable(dtio); 568 } 569 570 /** attempt to reconnect to the output, after a timeout */ 571 static void dtio_reconnect_enable(struct dt_io_thread* dtio) 572 { 573 struct timeval tv; 574 int msec; 575 if(dtio->want_to_exit) return; 576 if(dtio->reconnect_is_added) 577 return; /* already done */ 578 579 /* exponential backoff, store the value for next timeout */ 580 msec = dtio->reconnect_timeout; 581 if(msec == 0) { 582 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN; 583 } else { 584 dtio->reconnect_timeout = msec*2; 585 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX) 586 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX; 587 } 588 verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec", 589 msec); 590 591 /* setup wait timer */ 592 memset(&tv, 0, sizeof(tv)); 593 tv.tv_sec = msec/1000; 594 tv.tv_usec = (msec%1000)*1000; 595 if(ub_timer_add(dtio->reconnect_timer, dtio->event_base, 596 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) { 597 log_err("dnstap io: could not reconnect ev timer add"); 598 return; 599 } 600 dtio->reconnect_is_added = 1; 601 } 602 603 /** remove dtio reconnect timer */ 604 static void dtio_reconnect_del(struct dt_io_thread* dtio) 605 { 606 if(!dtio->reconnect_is_added) 607 return; 608 ub_timer_del(dtio->reconnect_timer); 609 dtio->reconnect_is_added = 0; 610 } 611 612 /** clear the reconnect exponential backoff timer. 613 * We have successfully connected so we can try again with short timeouts. */ 614 static void dtio_reconnect_clear(struct dt_io_thread* dtio) 615 { 616 dtio->reconnect_timeout = 0; 617 dtio_reconnect_del(dtio); 618 } 619 620 /** reconnect slowly, because we already know we have to wait for a bit */ 621 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec) 622 { 623 dtio_reconnect_del(dtio); 624 dtio->reconnect_timeout = msec; 625 dtio_reconnect_enable(dtio); 626 } 627 628 /** delete the current message in the dtio, and reset counters */ 629 static void dtio_cur_msg_free(struct dt_io_thread* dtio) 630 { 631 free(dtio->cur_msg); 632 dtio->cur_msg = NULL; 633 dtio->cur_msg_len = 0; 634 dtio->cur_msg_done = 0; 635 dtio->cur_msg_len_done = 0; 636 } 637 638 /** delete the buffer and counters used to read frame */ 639 static void dtio_read_frame_free(struct dt_frame_read_buf* rb) 640 { 641 if(rb->buf) { 642 free(rb->buf); 643 rb->buf = NULL; 644 } 645 rb->buf_count = 0; 646 rb->buf_cap = 0; 647 rb->frame_len = 0; 648 rb->frame_len_done = 0; 649 rb->control_frame = 0; 650 } 651 652 /** del the output file descriptor event for listening */ 653 static void dtio_del_output_event(struct dt_io_thread* dtio) 654 { 655 if(!dtio->event_added) 656 return; 657 ub_event_del(dtio->event); 658 dtio->event_added = 0; 659 dtio->event_added_is_write = 0; 660 } 661 662 /** close dtio socket and set it to -1 */ 663 static void dtio_close_fd(struct dt_io_thread* dtio) 664 { 665 sock_close(dtio->fd); 666 dtio->fd = -1; 667 } 668 669 /** close and stop the output file descriptor event */ 670 static void dtio_close_output(struct dt_io_thread* dtio) 671 { 672 if(!dtio->event) 673 return; 674 ub_event_free(dtio->event); 675 dtio->event = NULL; 676 if(dtio->ssl) { 677 #ifdef HAVE_SSL 678 SSL_shutdown(dtio->ssl); 679 SSL_free(dtio->ssl); 680 dtio->ssl = NULL; 681 #endif 682 } 683 dtio_close_fd(dtio); 684 685 /* if there is a (partial) message, discard it 686 * we cannot send (the remainder of) it, and a new 687 * connection needs to start with a control frame. */ 688 if(dtio->cur_msg) { 689 dtio_cur_msg_free(dtio); 690 } 691 692 dtio->ready_frame_sent = 0; 693 dtio->accept_frame_received = 0; 694 dtio_read_frame_free(&dtio->read_frame); 695 696 dtio_reconnect_enable(dtio); 697 } 698 699 /** check for pending nonblocking connect errors, 700 * returns 1 if it is okay. -1 on error (close it), 0 to try later */ 701 static int dtio_check_nb_connect(struct dt_io_thread* dtio) 702 { 703 int error = 0; 704 socklen_t len = (socklen_t)sizeof(error); 705 if(!dtio->check_nb_connect) 706 return 1; /* everything okay */ 707 if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error, 708 &len) < 0) { 709 #ifndef USE_WINSOCK 710 error = errno; /* on solaris errno is error */ 711 #else 712 error = WSAGetLastError(); 713 #endif 714 } 715 #ifndef USE_WINSOCK 716 #if defined(EINPROGRESS) && defined(EWOULDBLOCK) 717 if(error == EINPROGRESS || error == EWOULDBLOCK) 718 return 0; /* try again later */ 719 #endif 720 #else 721 if(error == WSAEINPROGRESS) { 722 return 0; /* try again later */ 723 } else if(error == WSAEWOULDBLOCK) { 724 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 725 dtio->stop_flush_event:dtio->event), UB_EV_WRITE); 726 return 0; /* try again later */ 727 } 728 #endif 729 if(error != 0) { 730 char* to = dtio->socket_path; 731 if(!to) to = dtio->ip_str; 732 if(!to) to = ""; 733 log_err("dnstap io: failed to connect to \"%s\": %s", 734 to, sock_strerror(error)); 735 return -1; /* error, close it */ 736 } 737 738 if(dtio->ip_str) 739 verbose(VERB_DETAIL, "dnstap io: connected to %s", 740 dtio->ip_str); 741 else if(dtio->socket_path) 742 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"", 743 dtio->socket_path); 744 dtio_reconnect_clear(dtio); 745 dtio->check_nb_connect = 0; 746 return 1; /* everything okay */ 747 } 748 749 #ifdef HAVE_SSL 750 /** write to ssl output 751 * returns number of bytes written, 0 if nothing happened, 752 * try again later, or -1 if the channel is to be closed. */ 753 static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf, 754 size_t len) 755 { 756 int r; 757 ERR_clear_error(); 758 r = SSL_write(dtio->ssl, buf, len); 759 if(r <= 0) { 760 int want = SSL_get_error(dtio->ssl, r); 761 if(want == SSL_ERROR_ZERO_RETURN) { 762 /* closed */ 763 return -1; 764 } else if(want == SSL_ERROR_WANT_READ) { 765 /* we want a brief read event */ 766 dtio_enable_brief_read(dtio); 767 return 0; 768 } else if(want == SSL_ERROR_WANT_WRITE) { 769 /* write again later */ 770 return 0; 771 } else if(want == SSL_ERROR_SYSCALL) { 772 #ifdef EPIPE 773 if(errno == EPIPE && verbosity < 2) 774 return -1; /* silence 'broken pipe' */ 775 #endif 776 #ifdef ECONNRESET 777 if(errno == ECONNRESET && verbosity < 2) 778 return -1; /* silence reset by peer */ 779 #endif 780 if(errno != 0) { 781 log_err("dnstap io, SSL_write syscall: %s", 782 strerror(errno)); 783 } 784 return -1; 785 } 786 log_crypto_err("dnstap io, could not SSL_write"); 787 return -1; 788 } 789 return r; 790 } 791 #endif /* HAVE_SSL */ 792 793 /** write buffer to output. 794 * returns number of bytes written, 0 if nothing happened, 795 * try again later, or -1 if the channel is to be closed. */ 796 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf, 797 size_t len) 798 { 799 ssize_t ret; 800 if(dtio->fd == -1) 801 return -1; 802 #ifdef HAVE_SSL 803 if(dtio->ssl) 804 return dtio_write_ssl(dtio, buf, len); 805 #endif 806 ret = send(dtio->fd, (void*)buf, len, 0); 807 if(ret == -1) { 808 #ifndef USE_WINSOCK 809 if(errno == EINTR || errno == EAGAIN) 810 return 0; 811 #else 812 if(WSAGetLastError() == WSAEINPROGRESS) 813 return 0; 814 if(WSAGetLastError() == WSAEWOULDBLOCK) { 815 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 816 dtio->stop_flush_event:dtio->event), 817 UB_EV_WRITE); 818 return 0; 819 } 820 #endif 821 log_err("dnstap io: failed send: %s", sock_strerror(errno)); 822 return -1; 823 } 824 return ret; 825 } 826 827 #ifdef HAVE_WRITEV 828 /** write with writev, len and message, in one write, if possible. 829 * return true if message is done, false if incomplete */ 830 static int dtio_write_with_writev(struct dt_io_thread* dtio) 831 { 832 uint32_t sendlen = htonl(dtio->cur_msg_len); 833 struct iovec iov[2]; 834 ssize_t r; 835 iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done; 836 iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done; 837 iov[1].iov_base = dtio->cur_msg; 838 iov[1].iov_len = dtio->cur_msg_len; 839 log_assert(iov[0].iov_len > 0); 840 r = writev(dtio->fd, iov, 2); 841 if(r == -1) { 842 #ifndef USE_WINSOCK 843 if(errno == EINTR || errno == EAGAIN) 844 return 0; 845 #else 846 if(WSAGetLastError() == WSAEINPROGRESS) 847 return 0; 848 if(WSAGetLastError() == WSAEWOULDBLOCK) { 849 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 850 dtio->stop_flush_event:dtio->event), 851 UB_EV_WRITE); 852 return 0; 853 } 854 #endif 855 log_err("dnstap io: failed writev: %s", sock_strerror(errno)); 856 /* close the channel */ 857 dtio_del_output_event(dtio); 858 dtio_close_output(dtio); 859 return 0; 860 } 861 /* written r bytes */ 862 dtio->cur_msg_len_done += r; 863 if(dtio->cur_msg_len_done < 4) 864 return 0; 865 if(dtio->cur_msg_len_done > 4) { 866 dtio->cur_msg_done = dtio->cur_msg_len_done-4; 867 dtio->cur_msg_len_done = 4; 868 } 869 if(dtio->cur_msg_done < dtio->cur_msg_len) 870 return 0; 871 return 1; 872 } 873 #endif /* HAVE_WRITEV */ 874 875 /** write more of the length, preceding the data frame. 876 * return true if message is done, false if incomplete. */ 877 static int dtio_write_more_of_len(struct dt_io_thread* dtio) 878 { 879 uint32_t sendlen; 880 int r; 881 if(dtio->cur_msg_len_done >= 4) 882 return 1; 883 #ifdef HAVE_WRITEV 884 if(!dtio->ssl) { 885 /* we try writev for everything.*/ 886 return dtio_write_with_writev(dtio); 887 } 888 #endif /* HAVE_WRITEV */ 889 sendlen = htonl(dtio->cur_msg_len); 890 r = dtio_write_buf(dtio, 891 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done, 892 sizeof(sendlen)-dtio->cur_msg_len_done); 893 if(r == -1) { 894 /* close the channel */ 895 dtio_del_output_event(dtio); 896 dtio_close_output(dtio); 897 return 0; 898 } else if(r == 0) { 899 /* try again later */ 900 return 0; 901 } 902 dtio->cur_msg_len_done += r; 903 if(dtio->cur_msg_len_done < 4) 904 return 0; 905 return 1; 906 } 907 908 /** write more of the data frame. 909 * return true if message is done, false if incomplete. */ 910 static int dtio_write_more_of_data(struct dt_io_thread* dtio) 911 { 912 int r; 913 if(dtio->cur_msg_done >= dtio->cur_msg_len) 914 return 1; 915 r = dtio_write_buf(dtio, 916 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done, 917 dtio->cur_msg_len - dtio->cur_msg_done); 918 if(r == -1) { 919 /* close the channel */ 920 dtio_del_output_event(dtio); 921 dtio_close_output(dtio); 922 return 0; 923 } else if(r == 0) { 924 /* try again later */ 925 return 0; 926 } 927 dtio->cur_msg_done += r; 928 if(dtio->cur_msg_done < dtio->cur_msg_len) 929 return 0; 930 return 1; 931 } 932 933 /** write more of the current messsage. false if incomplete, true if 934 * the message is done */ 935 static int dtio_write_more(struct dt_io_thread* dtio) 936 { 937 if(dtio->cur_msg_len_done < 4) { 938 if(!dtio_write_more_of_len(dtio)) 939 return 0; 940 } 941 if(dtio->cur_msg_done < dtio->cur_msg_len) { 942 if(!dtio_write_more_of_data(dtio)) 943 return 0; 944 } 945 return 1; 946 } 947 948 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed, 949 * -1: continue, >0: number of bytes read into buffer */ 950 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) { 951 ssize_t r; 952 r = recv(dtio->fd, (void*)buf, len, 0); 953 if(r == -1) { 954 char* to = dtio->socket_path; 955 if(!to) to = dtio->ip_str; 956 if(!to) to = ""; 957 #ifndef USE_WINSOCK 958 if(errno == EINTR || errno == EAGAIN) 959 return -1; /* try later */ 960 #else 961 if(WSAGetLastError() == WSAEINPROGRESS) { 962 return -1; /* try later */ 963 } else if(WSAGetLastError() == WSAEWOULDBLOCK) { 964 ub_winsock_tcp_wouldblock( 965 (dtio->stop_flush_event? 966 dtio->stop_flush_event:dtio->event), 967 UB_EV_READ); 968 return -1; /* try later */ 969 } 970 #endif 971 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 972 verbosity < 4) 973 return 0; /* no log retries on low verbosity */ 974 log_err("dnstap io: output closed, recv %s: %s", to, 975 strerror(errno)); 976 /* and close below */ 977 return 0; 978 } 979 if(r == 0) { 980 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 981 verbosity < 4) 982 return 0; /* no log retries on low verbosity */ 983 verbose(VERB_DETAIL, "dnstap io: output closed by the other side"); 984 /* and close below */ 985 return 0; 986 } 987 /* something was received */ 988 return r; 989 } 990 991 #ifdef HAVE_SSL 992 /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed, 993 * -1: continue, >0: number of bytes read into buffer */ 994 static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len) 995 { 996 int r; 997 ERR_clear_error(); 998 r = SSL_read(dtio->ssl, buf, len); 999 if(r <= 0) { 1000 int want = SSL_get_error(dtio->ssl, r); 1001 if(want == SSL_ERROR_ZERO_RETURN) { 1002 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1003 verbosity < 4) 1004 return 0; /* no log retries on low verbosity */ 1005 verbose(VERB_DETAIL, "dnstap io: output closed by the " 1006 "other side"); 1007 return 0; 1008 } else if(want == SSL_ERROR_WANT_READ) { 1009 /* continue later */ 1010 return -1; 1011 } else if(want == SSL_ERROR_WANT_WRITE) { 1012 (void)dtio_enable_brief_write(dtio); 1013 return -1; 1014 } else if(want == SSL_ERROR_SYSCALL) { 1015 #ifdef ECONNRESET 1016 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1017 errno == ECONNRESET && verbosity < 4) 1018 return 0; /* silence reset by peer */ 1019 #endif 1020 if(errno != 0) 1021 log_err("SSL_read syscall: %s", 1022 strerror(errno)); 1023 verbose(VERB_DETAIL, "dnstap io: output closed by the " 1024 "other side"); 1025 return 0; 1026 } 1027 log_crypto_err("could not SSL_read"); 1028 verbose(VERB_DETAIL, "dnstap io: output closed by the " 1029 "other side"); 1030 return 0; 1031 } 1032 return r; 1033 } 1034 #endif /* HAVE_SSL */ 1035 1036 /** check if the output fd has been closed, 1037 * it returns false if the stream is closed. */ 1038 static int dtio_check_close(struct dt_io_thread* dtio) 1039 { 1040 /* we don't want to read any packets, but if there are we can 1041 * discard the input (ignore it). Ignore of unknown (control) 1042 * packets is okay for the framestream protocol. And also, the 1043 * read call can return that the stream has been closed by the 1044 * other side. */ 1045 uint8_t buf[1024]; 1046 int r = -1; 1047 1048 1049 if(dtio->fd == -1) return 0; 1050 1051 while(r != 0) { 1052 /* not interested in buffer content, overwrite */ 1053 r = receive_bytes(dtio, (void*)buf, sizeof(buf)); 1054 if(r == -1) 1055 return 1; 1056 } 1057 /* the other end has been closed */ 1058 /* close the channel */ 1059 dtio_del_output_event(dtio); 1060 dtio_close_output(dtio); 1061 return 0; 1062 } 1063 1064 /** Read accept frame. Returns -1: continue reading, 0: closed, 1065 * 1: valid accept received. */ 1066 static int dtio_read_accept_frame(struct dt_io_thread* dtio) 1067 { 1068 int r; 1069 size_t read_frame_done; 1070 while(dtio->read_frame.frame_len_done < 4) { 1071 #ifdef HAVE_SSL 1072 if(dtio->ssl) { 1073 r = ssl_read_bytes(dtio, 1074 (uint8_t*)&dtio->read_frame.frame_len+ 1075 dtio->read_frame.frame_len_done, 1076 4-dtio->read_frame.frame_len_done); 1077 } else { 1078 #endif 1079 r = receive_bytes(dtio, 1080 (uint8_t*)&dtio->read_frame.frame_len+ 1081 dtio->read_frame.frame_len_done, 1082 4-dtio->read_frame.frame_len_done); 1083 #ifdef HAVE_SSL 1084 } 1085 #endif 1086 if(r == -1) 1087 return -1; /* continue reading */ 1088 if(r == 0) { 1089 /* connection closed */ 1090 goto close_connection; 1091 } 1092 dtio->read_frame.frame_len_done += r; 1093 if(dtio->read_frame.frame_len_done < 4) 1094 return -1; /* continue reading */ 1095 1096 if(dtio->read_frame.frame_len == 0) { 1097 dtio->read_frame.frame_len_done = 0; 1098 dtio->read_frame.control_frame = 1; 1099 continue; 1100 } 1101 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len); 1102 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) { 1103 verbose(VERB_OPS, "dnstap: received frame exceeds max " 1104 "length of %d bytes, closing connection", 1105 DTIO_RECV_FRAME_MAX_LEN); 1106 goto close_connection; 1107 } 1108 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len); 1109 dtio->read_frame.buf_cap = dtio->read_frame.frame_len; 1110 if(!dtio->read_frame.buf) { 1111 log_err("dnstap io: out of memory (creating read " 1112 "buffer)"); 1113 goto close_connection; 1114 } 1115 } 1116 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) { 1117 #ifdef HAVE_SSL 1118 if(dtio->ssl) { 1119 r = ssl_read_bytes(dtio, dtio->read_frame.buf+ 1120 dtio->read_frame.buf_count, 1121 dtio->read_frame.buf_cap- 1122 dtio->read_frame.buf_count); 1123 } else { 1124 #endif 1125 r = receive_bytes(dtio, dtio->read_frame.buf+ 1126 dtio->read_frame.buf_count, 1127 dtio->read_frame.buf_cap- 1128 dtio->read_frame.buf_count); 1129 #ifdef HAVE_SSL 1130 } 1131 #endif 1132 if(r == -1) 1133 return -1; /* continue reading */ 1134 if(r == 0) { 1135 /* connection closed */ 1136 goto close_connection; 1137 } 1138 dtio->read_frame.buf_count += r; 1139 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) 1140 return -1; /* continue reading */ 1141 } 1142 1143 /* Complete frame received, check if this is a valid ACCEPT control 1144 * frame. */ 1145 if(dtio->read_frame.frame_len < 4) { 1146 verbose(VERB_OPS, "dnstap: invalid data received"); 1147 goto close_connection; 1148 } 1149 if(sldns_read_uint32(dtio->read_frame.buf) != 1150 FSTRM_CONTROL_FRAME_ACCEPT) { 1151 verbose(VERB_ALGO, "dnstap: invalid control type received, " 1152 "ignored"); 1153 dtio->ready_frame_sent = 0; 1154 dtio->accept_frame_received = 0; 1155 dtio_read_frame_free(&dtio->read_frame); 1156 return -1; 1157 } 1158 read_frame_done = 4; /* control frame type */ 1159 1160 /* Iterate over control fields, ignore unknown types. 1161 * Need to be able to read at least 8 bytes (control field type + 1162 * length). */ 1163 while(read_frame_done+8 < dtio->read_frame.frame_len) { 1164 uint32_t type = sldns_read_uint32(dtio->read_frame.buf + 1165 read_frame_done); 1166 uint32_t len = sldns_read_uint32(dtio->read_frame.buf + 1167 read_frame_done + 4); 1168 if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) { 1169 if(len == strlen(DNSTAP_CONTENT_TYPE) && 1170 read_frame_done+8+len <= 1171 dtio->read_frame.frame_len && 1172 memcmp(dtio->read_frame.buf + read_frame_done + 1173 + 8, DNSTAP_CONTENT_TYPE, len) == 0) { 1174 if(!dtio_control_start_send(dtio)) { 1175 verbose(VERB_OPS, "dnstap io: out of " 1176 "memory while sending START frame"); 1177 goto close_connection; 1178 } 1179 dtio->accept_frame_received = 1; 1180 if(!dtio_add_output_event_write(dtio)) 1181 goto close_connection; 1182 return 1; 1183 } else { 1184 /* unknow content type */ 1185 verbose(VERB_ALGO, "dnstap: ACCEPT frame " 1186 "contains unknown content type, " 1187 "closing connection"); 1188 goto close_connection; 1189 } 1190 } 1191 /* unknown option, try next */ 1192 read_frame_done += 8+len; 1193 } 1194 1195 1196 close_connection: 1197 dtio_del_output_event(dtio); 1198 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1199 dtio_close_output(dtio); 1200 return 0; 1201 } 1202 1203 /** add the output file descriptor event for listening, read only */ 1204 static int dtio_add_output_event_read(struct dt_io_thread* dtio) 1205 { 1206 if(!dtio->event) 1207 return 0; 1208 if(dtio->event_added && !dtio->event_added_is_write) 1209 return 1; 1210 /* we have to (re-)register the event */ 1211 if(dtio->event_added) 1212 ub_event_del(dtio->event); 1213 ub_event_del_bits(dtio->event, UB_EV_WRITE); 1214 if(ub_event_add(dtio->event, NULL) != 0) { 1215 log_err("dnstap io: out of memory (adding event)"); 1216 dtio->event_added = 0; 1217 dtio->event_added_is_write = 0; 1218 /* close output and start reattempts to open it */ 1219 dtio_close_output(dtio); 1220 return 0; 1221 } 1222 dtio->event_added = 1; 1223 dtio->event_added_is_write = 0; 1224 return 1; 1225 } 1226 1227 /** add the output file descriptor event for listening, read and write */ 1228 static int dtio_add_output_event_write(struct dt_io_thread* dtio) 1229 { 1230 if(!dtio->event) 1231 return 0; 1232 if(dtio->event_added && dtio->event_added_is_write) 1233 return 1; 1234 /* we have to (re-)register the event */ 1235 if(dtio->event_added) 1236 ub_event_del(dtio->event); 1237 ub_event_add_bits(dtio->event, UB_EV_WRITE); 1238 if(ub_event_add(dtio->event, NULL) != 0) { 1239 log_err("dnstap io: out of memory (adding event)"); 1240 dtio->event_added = 0; 1241 dtio->event_added_is_write = 0; 1242 /* close output and start reattempts to open it */ 1243 dtio_close_output(dtio); 1244 return 0; 1245 } 1246 dtio->event_added = 1; 1247 dtio->event_added_is_write = 1; 1248 return 1; 1249 } 1250 1251 /** put the dtio thread to sleep */ 1252 static void dtio_sleep(struct dt_io_thread* dtio) 1253 { 1254 /* unregister the event polling for write, because there is 1255 * nothing to be written */ 1256 (void)dtio_add_output_event_read(dtio); 1257 } 1258 1259 #ifdef HAVE_SSL 1260 /** enable the brief read condition */ 1261 static int dtio_enable_brief_read(struct dt_io_thread* dtio) 1262 { 1263 dtio->ssl_brief_read = 1; 1264 if(dtio->stop_flush_event) { 1265 ub_event_del(dtio->stop_flush_event); 1266 ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE); 1267 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 1268 log_err("dnstap io, stop flush, could not ub_event_add"); 1269 return 0; 1270 } 1271 return 1; 1272 } 1273 return dtio_add_output_event_read(dtio); 1274 } 1275 #endif /* HAVE_SSL */ 1276 1277 #ifdef HAVE_SSL 1278 /** disable the brief read condition */ 1279 static int dtio_disable_brief_read(struct dt_io_thread* dtio) 1280 { 1281 dtio->ssl_brief_read = 0; 1282 if(dtio->stop_flush_event) { 1283 ub_event_del(dtio->stop_flush_event); 1284 ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE); 1285 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 1286 log_err("dnstap io, stop flush, could not ub_event_add"); 1287 return 0; 1288 } 1289 return 1; 1290 } 1291 return dtio_add_output_event_write(dtio); 1292 } 1293 #endif /* HAVE_SSL */ 1294 1295 #ifdef HAVE_SSL 1296 /** enable the brief write condition */ 1297 static int dtio_enable_brief_write(struct dt_io_thread* dtio) 1298 { 1299 dtio->ssl_brief_write = 1; 1300 return dtio_add_output_event_write(dtio); 1301 } 1302 #endif /* HAVE_SSL */ 1303 1304 #ifdef HAVE_SSL 1305 /** disable the brief write condition */ 1306 static int dtio_disable_brief_write(struct dt_io_thread* dtio) 1307 { 1308 dtio->ssl_brief_write = 0; 1309 return dtio_add_output_event_read(dtio); 1310 } 1311 #endif /* HAVE_SSL */ 1312 1313 #ifdef HAVE_SSL 1314 /** check peer verification after ssl handshake connection, false if closed*/ 1315 static int dtio_ssl_check_peer(struct dt_io_thread* dtio) 1316 { 1317 if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) { 1318 /* verification */ 1319 if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) { 1320 X509* x = SSL_get_peer_certificate(dtio->ssl); 1321 if(!x) { 1322 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1323 "connection failed no certificate", 1324 dtio->ip_str); 1325 return 0; 1326 } 1327 log_cert(VERB_ALGO, "dnstap io, peer certificate", 1328 x); 1329 #ifdef HAVE_SSL_GET0_PEERNAME 1330 if(SSL_get0_peername(dtio->ssl)) { 1331 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1332 "connection to %s authenticated", 1333 dtio->ip_str, 1334 SSL_get0_peername(dtio->ssl)); 1335 } else { 1336 #endif 1337 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1338 "connection authenticated", 1339 dtio->ip_str); 1340 #ifdef HAVE_SSL_GET0_PEERNAME 1341 } 1342 #endif 1343 X509_free(x); 1344 } else { 1345 X509* x = SSL_get_peer_certificate(dtio->ssl); 1346 if(x) { 1347 log_cert(VERB_ALGO, "dnstap io, peer " 1348 "certificate", x); 1349 X509_free(x); 1350 } 1351 verbose(VERB_ALGO, "dnstap io, %s, SSL connection " 1352 "failed: failed to authenticate", 1353 dtio->ip_str); 1354 return 0; 1355 } 1356 } else { 1357 /* unauthenticated, the verify peer flag was not set 1358 * in ssl when the ssl object was created from ssl_ctx */ 1359 verbose(VERB_ALGO, "dnstap io, %s, SSL connection", 1360 dtio->ip_str); 1361 } 1362 return 1; 1363 } 1364 #endif /* HAVE_SSL */ 1365 1366 #ifdef HAVE_SSL 1367 /** perform ssl handshake, returns 1 if okay, 0 to stop */ 1368 static int dtio_ssl_handshake(struct dt_io_thread* dtio, 1369 struct stop_flush_info* info) 1370 { 1371 int r; 1372 if(dtio->ssl_brief_read) { 1373 /* assume the brief read condition is satisfied, 1374 * if we need more or again, we can set it again */ 1375 if(!dtio_disable_brief_read(dtio)) { 1376 if(info) dtio_stop_flush_exit(info); 1377 return 0; 1378 } 1379 } 1380 if(dtio->ssl_handshake_done) 1381 return 1; 1382 1383 ERR_clear_error(); 1384 r = SSL_do_handshake(dtio->ssl); 1385 if(r != 1) { 1386 int want = SSL_get_error(dtio->ssl, r); 1387 if(want == SSL_ERROR_WANT_READ) { 1388 /* we want to read on the connection */ 1389 if(!dtio_enable_brief_read(dtio)) { 1390 if(info) dtio_stop_flush_exit(info); 1391 return 0; 1392 } 1393 return 0; 1394 } else if(want == SSL_ERROR_WANT_WRITE) { 1395 /* we want to write on the connection */ 1396 return 0; 1397 } else if(r == 0) { 1398 /* closed */ 1399 if(info) dtio_stop_flush_exit(info); 1400 dtio_del_output_event(dtio); 1401 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1402 dtio_close_output(dtio); 1403 return 0; 1404 } else if(want == SSL_ERROR_SYSCALL) { 1405 /* SYSCALL and errno==0 means closed uncleanly */ 1406 int silent = 0; 1407 #ifdef EPIPE 1408 if(errno == EPIPE && verbosity < 2) 1409 silent = 1; /* silence 'broken pipe' */ 1410 #endif 1411 #ifdef ECONNRESET 1412 if(errno == ECONNRESET && verbosity < 2) 1413 silent = 1; /* silence reset by peer */ 1414 #endif 1415 if(errno == 0) 1416 silent = 1; 1417 if(!silent) 1418 log_err("dnstap io, SSL_handshake syscall: %s", 1419 strerror(errno)); 1420 /* closed */ 1421 if(info) dtio_stop_flush_exit(info); 1422 dtio_del_output_event(dtio); 1423 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1424 dtio_close_output(dtio); 1425 return 0; 1426 } else { 1427 unsigned long err = ERR_get_error(); 1428 if(!squelch_err_ssl_handshake(err)) { 1429 log_crypto_err_code("dnstap io, ssl handshake failed", 1430 err); 1431 verbose(VERB_OPS, "dnstap io, ssl handshake failed " 1432 "from %s", dtio->ip_str); 1433 } 1434 /* closed */ 1435 if(info) dtio_stop_flush_exit(info); 1436 dtio_del_output_event(dtio); 1437 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1438 dtio_close_output(dtio); 1439 return 0; 1440 } 1441 1442 } 1443 /* check peer verification */ 1444 dtio->ssl_handshake_done = 1; 1445 1446 if(!dtio_ssl_check_peer(dtio)) { 1447 /* closed */ 1448 if(info) dtio_stop_flush_exit(info); 1449 dtio_del_output_event(dtio); 1450 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1451 dtio_close_output(dtio); 1452 return 0; 1453 } 1454 return 1; 1455 } 1456 #endif /* HAVE_SSL */ 1457 1458 /** callback for the dnstap events, to write to the output */ 1459 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg) 1460 { 1461 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 1462 int i; 1463 1464 if(dtio->check_nb_connect) { 1465 int connect_err = dtio_check_nb_connect(dtio); 1466 if(connect_err == -1) { 1467 /* close the channel */ 1468 dtio_del_output_event(dtio); 1469 dtio_close_output(dtio); 1470 return; 1471 } else if(connect_err == 0) { 1472 /* try again later */ 1473 return; 1474 } 1475 /* nonblocking connect check passed, continue */ 1476 } 1477 1478 #ifdef HAVE_SSL 1479 if(dtio->ssl && 1480 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 1481 if(!dtio_ssl_handshake(dtio, NULL)) 1482 return; 1483 } 1484 #endif 1485 1486 if((bits&UB_EV_READ || dtio->ssl_brief_write)) { 1487 if(dtio->ssl_brief_write) 1488 (void)dtio_disable_brief_write(dtio); 1489 if(dtio->ready_frame_sent && !dtio->accept_frame_received) { 1490 if(dtio_read_accept_frame(dtio) <= 0) 1491 return; 1492 } else if(!dtio_check_close(dtio)) 1493 return; 1494 } 1495 1496 /* loop to process a number of messages. This improves throughput, 1497 * because selecting on write-event if not needed for busy messages 1498 * (dnstap log) generation and if they need to all be written back. 1499 * The write event is usually not blocked up. But not forever, 1500 * because the event loop needs to stay responsive for other events. 1501 * If there are no (more) messages, or if the output buffers get 1502 * full, it returns out of the loop. */ 1503 for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) { 1504 /* see if there are messages that need writing */ 1505 if(!dtio->cur_msg) { 1506 if(!dtio_find_msg(dtio)) { 1507 if(i == 0) { 1508 /* no messages on the first iteration, 1509 * the queues are all empty */ 1510 dtio_sleep(dtio); 1511 } 1512 return; /* nothing to do */ 1513 } 1514 } 1515 1516 /* write it */ 1517 if(dtio->cur_msg_done < dtio->cur_msg_len) { 1518 if(!dtio_write_more(dtio)) 1519 return; 1520 } 1521 1522 /* done with the current message */ 1523 dtio_cur_msg_free(dtio); 1524 1525 /* If this is a bidirectional stream the first message will be 1526 * the READY control frame. We can only continue writing after 1527 * receiving an ACCEPT control frame. */ 1528 if(dtio->is_bidirectional && !dtio->ready_frame_sent) { 1529 dtio->ready_frame_sent = 1; 1530 (void)dtio_add_output_event_read(dtio); 1531 break; 1532 } 1533 } 1534 } 1535 1536 /** callback for the dnstap commandpipe, to stop the dnstap IO */ 1537 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) 1538 { 1539 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 1540 uint8_t cmd; 1541 ssize_t r; 1542 if(dtio->want_to_exit) 1543 return; 1544 r = read(fd, &cmd, sizeof(cmd)); 1545 if(r == -1) { 1546 #ifndef USE_WINSOCK 1547 if(errno == EINTR || errno == EAGAIN) 1548 return; /* ignore this */ 1549 #else 1550 if(WSAGetLastError() == WSAEINPROGRESS) 1551 return; 1552 if(WSAGetLastError() == WSAEWOULDBLOCK) 1553 return; 1554 #endif 1555 log_err("dnstap io: failed to read: %s", sock_strerror(errno)); 1556 /* and then fall through to quit the thread */ 1557 } else if(r == 0) { 1558 verbose(VERB_ALGO, "dnstap io: cmd channel closed"); 1559 } else if(r == 1 && cmd == DTIO_COMMAND_STOP) { 1560 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); 1561 } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) { 1562 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup"); 1563 1564 if(dtio->is_bidirectional && !dtio->accept_frame_received) { 1565 verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, " 1566 "waiting for ACCEPT control frame"); 1567 return; 1568 } 1569 1570 /* reregister event */ 1571 if(!dtio_add_output_event_write(dtio)) 1572 return; 1573 return; 1574 } else if(r == 1) { 1575 verbose(VERB_ALGO, "dnstap io: cmd channel unknown command"); 1576 } 1577 dtio->want_to_exit = 1; 1578 if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base) 1579 != 0) { 1580 log_err("dnstap io: could not loopexit"); 1581 } 1582 } 1583 1584 #ifndef THREADS_DISABLED 1585 /** setup the event base for the dnstap io thread */ 1586 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs, 1587 struct timeval* now) 1588 { 1589 memset(now, 0, sizeof(*now)); 1590 dtio->event_base = ub_default_event_base(0, secs, now); 1591 if(!dtio->event_base) { 1592 fatal_exit("dnstap io: could not create event_base"); 1593 } 1594 } 1595 #endif /* THREADS_DISABLED */ 1596 1597 /** setup the cmd event for dnstap io */ 1598 static void dtio_setup_cmd(struct dt_io_thread* dtio) 1599 { 1600 struct ub_event* cmdev; 1601 fd_set_nonblock(dtio->commandpipe[0]); 1602 cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0], 1603 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio); 1604 if(!cmdev) { 1605 fatal_exit("dnstap io: out of memory"); 1606 } 1607 dtio->command_event = cmdev; 1608 if(ub_event_add(cmdev, NULL) != 0) { 1609 fatal_exit("dnstap io: out of memory (adding event)"); 1610 } 1611 } 1612 1613 /** setup the reconnect event for dnstap io */ 1614 static void dtio_setup_reconnect(struct dt_io_thread* dtio) 1615 { 1616 dtio_reconnect_clear(dtio); 1617 dtio->reconnect_timer = ub_event_new(dtio->event_base, -1, 1618 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio); 1619 if(!dtio->reconnect_timer) { 1620 fatal_exit("dnstap io: out of memory"); 1621 } 1622 } 1623 1624 /** 1625 * structure to keep track of information during stop flush 1626 */ 1627 struct stop_flush_info { 1628 /** the event base during stop flush */ 1629 struct ub_event_base* base; 1630 /** did we already want to exit this stop-flush event base */ 1631 int want_to_exit_flush; 1632 /** has the timer fired */ 1633 int timer_done; 1634 /** the dtio */ 1635 struct dt_io_thread* dtio; 1636 /** the stop control frame */ 1637 void* stop_frame; 1638 /** length of the stop frame */ 1639 size_t stop_frame_len; 1640 /** how much we have done of the stop frame */ 1641 size_t stop_frame_done; 1642 }; 1643 1644 /** exit the stop flush base */ 1645 static void dtio_stop_flush_exit(struct stop_flush_info* info) 1646 { 1647 if(info->want_to_exit_flush) 1648 return; 1649 info->want_to_exit_flush = 1; 1650 if(ub_event_base_loopexit(info->base) != 0) { 1651 log_err("dnstap io: could not loopexit"); 1652 } 1653 } 1654 1655 /** send the stop control, 1656 * return true if completed the frame. */ 1657 static int dtio_control_stop_send(struct stop_flush_info* info) 1658 { 1659 struct dt_io_thread* dtio = info->dtio; 1660 int r; 1661 if(info->stop_frame_done >= info->stop_frame_len) 1662 return 1; 1663 r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) + 1664 info->stop_frame_done, info->stop_frame_len - 1665 info->stop_frame_done); 1666 if(r == -1) { 1667 verbose(VERB_ALGO, "dnstap io: stop flush: output closed"); 1668 dtio_stop_flush_exit(info); 1669 return 0; 1670 } 1671 if(r == 0) { 1672 /* try again later, or timeout */ 1673 return 0; 1674 } 1675 info->stop_frame_done += r; 1676 if(info->stop_frame_done < info->stop_frame_len) 1677 return 0; /* not done yet */ 1678 return 1; 1679 } 1680 1681 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), 1682 void* arg) 1683 { 1684 struct stop_flush_info* info = (struct stop_flush_info*)arg; 1685 if(info->want_to_exit_flush) 1686 return; 1687 verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush"); 1688 info->timer_done = 1; 1689 dtio_stop_flush_exit(info); 1690 } 1691 1692 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg) 1693 { 1694 struct stop_flush_info* info = (struct stop_flush_info*)arg; 1695 struct dt_io_thread* dtio = info->dtio; 1696 if(info->want_to_exit_flush) 1697 return; 1698 if(dtio->check_nb_connect) { 1699 /* we don't start the stop_flush if connect still 1700 * in progress, but the check code is here, just in case */ 1701 int connect_err = dtio_check_nb_connect(dtio); 1702 if(connect_err == -1) { 1703 /* close the channel, exit the stop flush */ 1704 dtio_stop_flush_exit(info); 1705 dtio_del_output_event(dtio); 1706 dtio_close_output(dtio); 1707 return; 1708 } else if(connect_err == 0) { 1709 /* try again later */ 1710 return; 1711 } 1712 /* nonblocking connect check passed, continue */ 1713 } 1714 #ifdef HAVE_SSL 1715 if(dtio->ssl && 1716 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 1717 if(!dtio_ssl_handshake(dtio, info)) 1718 return; 1719 } 1720 #endif 1721 1722 if((bits&UB_EV_READ)) { 1723 if(!dtio_check_close(dtio)) { 1724 if(dtio->fd == -1) { 1725 verbose(VERB_ALGO, "dnstap io: " 1726 "stop flush: output closed"); 1727 dtio_stop_flush_exit(info); 1728 } 1729 return; 1730 } 1731 } 1732 /* write remainder of last frame */ 1733 if(dtio->cur_msg) { 1734 if(dtio->cur_msg_done < dtio->cur_msg_len) { 1735 if(!dtio_write_more(dtio)) { 1736 if(dtio->fd == -1) { 1737 verbose(VERB_ALGO, "dnstap io: " 1738 "stop flush: output closed"); 1739 dtio_stop_flush_exit(info); 1740 } 1741 return; 1742 } 1743 } 1744 verbose(VERB_ALGO, "dnstap io: stop flush completed " 1745 "last frame"); 1746 dtio_cur_msg_free(dtio); 1747 } 1748 /* write stop frame */ 1749 if(info->stop_frame_done < info->stop_frame_len) { 1750 if(!dtio_control_stop_send(info)) 1751 return; 1752 verbose(VERB_ALGO, "dnstap io: stop flush completed " 1753 "stop control frame"); 1754 } 1755 /* when last frame and stop frame are sent, exit */ 1756 dtio_stop_flush_exit(info); 1757 } 1758 1759 /** flush at end, last packet and stop control */ 1760 static void dtio_control_stop_flush(struct dt_io_thread* dtio) 1761 { 1762 /* briefly attempt to flush the previous packet to the output, 1763 * this could be a partial packet, or even the start control frame */ 1764 time_t secs = 0; 1765 struct timeval now; 1766 struct stop_flush_info info; 1767 struct timeval tv; 1768 struct ub_event* timer, *stopev; 1769 1770 if(dtio->fd == -1 || dtio->check_nb_connect) { 1771 /* no connection or we have just connected, so nothing is 1772 * sent yet, so nothing to stop or flush */ 1773 return; 1774 } 1775 if(dtio->ssl && !dtio->ssl_handshake_done) { 1776 /* no SSL connection has been established yet */ 1777 return; 1778 } 1779 1780 memset(&info, 0, sizeof(info)); 1781 memset(&now, 0, sizeof(now)); 1782 info.dtio = dtio; 1783 info.base = ub_default_event_base(0, &secs, &now); 1784 if(!info.base) { 1785 log_err("dnstap io: malloc failure"); 1786 return; 1787 } 1788 timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT, 1789 &dtio_stop_timer_cb, &info); 1790 if(!timer) { 1791 log_err("dnstap io: malloc failure"); 1792 ub_event_base_free(info.base); 1793 return; 1794 } 1795 memset(&tv, 0, sizeof(tv)); 1796 tv.tv_sec = 2; 1797 if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info, 1798 &tv) != 0) { 1799 log_err("dnstap io: cannot event_timer_add"); 1800 ub_event_free(timer); 1801 ub_event_base_free(info.base); 1802 return; 1803 } 1804 stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ | 1805 UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info); 1806 if(!stopev) { 1807 log_err("dnstap io: malloc failure"); 1808 ub_timer_del(timer); 1809 ub_event_free(timer); 1810 ub_event_base_free(info.base); 1811 return; 1812 } 1813 if(ub_event_add(stopev, NULL) != 0) { 1814 log_err("dnstap io: cannot event_add"); 1815 ub_event_free(stopev); 1816 ub_timer_del(timer); 1817 ub_event_free(timer); 1818 ub_event_base_free(info.base); 1819 return; 1820 } 1821 info.stop_frame = fstrm_create_control_frame_stop( 1822 &info.stop_frame_len); 1823 if(!info.stop_frame) { 1824 log_err("dnstap io: malloc failure"); 1825 ub_event_del(stopev); 1826 ub_event_free(stopev); 1827 ub_timer_del(timer); 1828 ub_event_free(timer); 1829 ub_event_base_free(info.base); 1830 return; 1831 } 1832 dtio->stop_flush_event = stopev; 1833 1834 /* wait briefly, or until finished */ 1835 verbose(VERB_ALGO, "dnstap io: stop flush started"); 1836 if(ub_event_base_dispatch(info.base) < 0) { 1837 log_err("dnstap io: dispatch flush failed, errno is %s", 1838 strerror(errno)); 1839 } 1840 verbose(VERB_ALGO, "dnstap io: stop flush ended"); 1841 free(info.stop_frame); 1842 dtio->stop_flush_event = NULL; 1843 ub_event_del(stopev); 1844 ub_event_free(stopev); 1845 ub_timer_del(timer); 1846 ub_event_free(timer); 1847 ub_event_base_free(info.base); 1848 } 1849 1850 /** perform desetup and free stuff when the dnstap io thread exits */ 1851 static void dtio_desetup(struct dt_io_thread* dtio) 1852 { 1853 dtio_control_stop_flush(dtio); 1854 dtio_del_output_event(dtio); 1855 dtio_close_output(dtio); 1856 ub_event_del(dtio->command_event); 1857 ub_event_free(dtio->command_event); 1858 #ifndef USE_WINSOCK 1859 close(dtio->commandpipe[0]); 1860 #else 1861 _close(dtio->commandpipe[0]); 1862 #endif 1863 dtio->commandpipe[0] = -1; 1864 dtio_reconnect_del(dtio); 1865 ub_event_free(dtio->reconnect_timer); 1866 dtio_cur_msg_free(dtio); 1867 #ifndef THREADS_DISABLED 1868 ub_event_base_free(dtio->event_base); 1869 #endif 1870 } 1871 1872 /** setup a start control message */ 1873 static int dtio_control_start_send(struct dt_io_thread* dtio) 1874 { 1875 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 1876 dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE, 1877 &dtio->cur_msg_len); 1878 if(!dtio->cur_msg) { 1879 return 0; 1880 } 1881 /* setup to send the control message */ 1882 /* set that the buffer needs to be sent, but the length 1883 * of that buffer is already written, that way the buffer can 1884 * start with 0 length and then the length of the control frame 1885 * in it */ 1886 dtio->cur_msg_done = 0; 1887 dtio->cur_msg_len_done = 4; 1888 return 1; 1889 } 1890 1891 /** setup a ready control message */ 1892 static int dtio_control_ready_send(struct dt_io_thread* dtio) 1893 { 1894 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 1895 dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE, 1896 &dtio->cur_msg_len); 1897 if(!dtio->cur_msg) { 1898 return 0; 1899 } 1900 /* setup to send the control message */ 1901 /* set that the buffer needs to be sent, but the length 1902 * of that buffer is already written, that way the buffer can 1903 * start with 0 length and then the length of the control frame 1904 * in it */ 1905 dtio->cur_msg_done = 0; 1906 dtio->cur_msg_len_done = 4; 1907 return 1; 1908 } 1909 1910 /** open the output file descriptor for af_local */ 1911 static int dtio_open_output_local(struct dt_io_thread* dtio) 1912 { 1913 #ifdef HAVE_SYS_UN_H 1914 struct sockaddr_un s; 1915 dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0); 1916 if(dtio->fd == -1) { 1917 log_err("dnstap io: failed to create socket: %s", 1918 sock_strerror(errno)); 1919 return 0; 1920 } 1921 memset(&s, 0, sizeof(s)); 1922 #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN 1923 /* this member exists on BSDs, not Linux */ 1924 s.sun_len = (unsigned)sizeof(s); 1925 #endif 1926 s.sun_family = AF_LOCAL; 1927 /* length is 92-108, 104 on FreeBSD */ 1928 (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); 1929 fd_set_nonblock(dtio->fd); 1930 if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) 1931 == -1) { 1932 char* to = dtio->socket_path; 1933 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1934 verbosity < 4) { 1935 dtio_close_fd(dtio); 1936 return 0; /* no log retries on low verbosity */ 1937 } 1938 log_err("dnstap io: failed to connect to \"%s\": %s", 1939 to, sock_strerror(errno)); 1940 dtio_close_fd(dtio); 1941 return 0; 1942 } 1943 return 1; 1944 #else 1945 log_err("cannot create af_local socket"); 1946 return 0; 1947 #endif /* HAVE_SYS_UN_H */ 1948 } 1949 1950 /** open the output file descriptor for af_inet and af_inet6 */ 1951 static int dtio_open_output_tcp(struct dt_io_thread* dtio) 1952 { 1953 struct sockaddr_storage addr; 1954 socklen_t addrlen; 1955 memset(&addr, 0, sizeof(addr)); 1956 addrlen = (socklen_t)sizeof(addr); 1957 1958 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) { 1959 log_err("could not parse IP '%s'", dtio->ip_str); 1960 return 0; 1961 } 1962 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0); 1963 if(dtio->fd == -1) { 1964 log_err("can't create socket: %s", sock_strerror(errno)); 1965 return 0; 1966 } 1967 fd_set_nonblock(dtio->fd); 1968 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) { 1969 if(errno == EINPROGRESS) 1970 return 1; /* wait until connect done*/ 1971 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1972 verbosity < 4) { 1973 dtio_close_fd(dtio); 1974 return 0; /* no log retries on low verbosity */ 1975 } 1976 #ifndef USE_WINSOCK 1977 if(tcp_connect_errno_needs_log( 1978 (struct sockaddr *)&addr, addrlen)) { 1979 log_err("dnstap io: failed to connect to %s: %s", 1980 dtio->ip_str, strerror(errno)); 1981 } 1982 #else 1983 if(WSAGetLastError() == WSAEINPROGRESS || 1984 WSAGetLastError() == WSAEWOULDBLOCK) 1985 return 1; /* wait until connect done*/ 1986 if(tcp_connect_errno_needs_log( 1987 (struct sockaddr *)&addr, addrlen)) { 1988 log_err("dnstap io: failed to connect to %s: %s", 1989 dtio->ip_str, wsa_strerror(WSAGetLastError())); 1990 } 1991 #endif 1992 dtio_close_fd(dtio); 1993 return 0; 1994 } 1995 return 1; 1996 } 1997 1998 /** setup the SSL structure for new connection */ 1999 static int dtio_setup_ssl(struct dt_io_thread* dtio) 2000 { 2001 dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd); 2002 if(!dtio->ssl) return 0; 2003 dtio->ssl_handshake_done = 0; 2004 dtio->ssl_brief_read = 0; 2005 2006 if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name, 2007 dtio->tls_use_sni)) { 2008 return 0; 2009 } 2010 return 1; 2011 } 2012 2013 /** open the output file descriptor */ 2014 static void dtio_open_output(struct dt_io_thread* dtio) 2015 { 2016 struct ub_event* ev; 2017 if(dtio->upstream_is_unix) { 2018 if(!dtio_open_output_local(dtio)) { 2019 dtio_reconnect_enable(dtio); 2020 return; 2021 } 2022 } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 2023 if(!dtio_open_output_tcp(dtio)) { 2024 dtio_reconnect_enable(dtio); 2025 return; 2026 } 2027 if(dtio->upstream_is_tls) { 2028 if(!dtio_setup_ssl(dtio)) { 2029 dtio_close_fd(dtio); 2030 dtio_reconnect_enable(dtio); 2031 return; 2032 } 2033 } 2034 } 2035 dtio->check_nb_connect = 1; 2036 2037 /* the EV_READ is to read ACCEPT control messages, and catch channel 2038 * close. EV_WRITE is to write packets */ 2039 ev = ub_event_new(dtio->event_base, dtio->fd, 2040 UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb, 2041 dtio); 2042 if(!ev) { 2043 log_err("dnstap io: out of memory"); 2044 if(dtio->ssl) { 2045 #ifdef HAVE_SSL 2046 SSL_free(dtio->ssl); 2047 dtio->ssl = NULL; 2048 #endif 2049 } 2050 dtio_close_fd(dtio); 2051 dtio_reconnect_enable(dtio); 2052 return; 2053 } 2054 dtio->event = ev; 2055 2056 /* setup protocol control message to start */ 2057 if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) || 2058 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) { 2059 log_err("dnstap io: out of memory"); 2060 ub_event_free(dtio->event); 2061 dtio->event = NULL; 2062 if(dtio->ssl) { 2063 #ifdef HAVE_SSL 2064 SSL_free(dtio->ssl); 2065 dtio->ssl = NULL; 2066 #endif 2067 } 2068 dtio_close_fd(dtio); 2069 dtio_reconnect_enable(dtio); 2070 return; 2071 } 2072 } 2073 2074 /** perform the setup of the writer thread on the established event_base */ 2075 static void dtio_setup_on_base(struct dt_io_thread* dtio) 2076 { 2077 dtio_setup_cmd(dtio); 2078 dtio_setup_reconnect(dtio); 2079 dtio_open_output(dtio); 2080 if(!dtio_add_output_event_write(dtio)) 2081 return; 2082 } 2083 2084 #ifndef THREADS_DISABLED 2085 /** the IO thread function for the DNSTAP IO */ 2086 static void* dnstap_io(void* arg) 2087 { 2088 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 2089 time_t secs = 0; 2090 struct timeval now; 2091 log_thread_set(&dtio->threadnum); 2092 2093 /* setup */ 2094 verbose(VERB_ALGO, "start dnstap io thread"); 2095 dtio_setup_base(dtio, &secs, &now); 2096 dtio_setup_on_base(dtio); 2097 2098 /* run */ 2099 if(ub_event_base_dispatch(dtio->event_base) < 0) { 2100 log_err("dnstap io: dispatch failed, errno is %s", 2101 strerror(errno)); 2102 } 2103 2104 /* cleanup */ 2105 verbose(VERB_ALGO, "stop dnstap io thread"); 2106 dtio_desetup(dtio); 2107 return NULL; 2108 } 2109 #endif /* THREADS_DISABLED */ 2110 2111 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, 2112 int numworkers) 2113 { 2114 /* set up the thread, can fail */ 2115 #ifndef USE_WINSOCK 2116 if(pipe(dtio->commandpipe) == -1) { 2117 log_err("failed to create pipe: %s", strerror(errno)); 2118 return 0; 2119 } 2120 #else 2121 if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) { 2122 log_err("failed to create _pipe: %s", 2123 wsa_strerror(WSAGetLastError())); 2124 return 0; 2125 } 2126 #endif 2127 2128 /* start the thread */ 2129 dtio->threadnum = numworkers+1; 2130 dtio->started = 1; 2131 #ifndef THREADS_DISABLED 2132 ub_thread_create(&dtio->tid, dnstap_io, dtio); 2133 (void)event_base_nothr; 2134 #else 2135 dtio->event_base = event_base_nothr; 2136 dtio_setup_on_base(dtio); 2137 #endif 2138 return 1; 2139 } 2140 2141 void dt_io_thread_stop(struct dt_io_thread* dtio) 2142 { 2143 #ifndef THREADS_DISABLED 2144 uint8_t cmd = DTIO_COMMAND_STOP; 2145 #endif 2146 if(!dtio) return; 2147 if(!dtio->started) return; 2148 verbose(VERB_ALGO, "dnstap io: send stop cmd"); 2149 2150 #ifndef THREADS_DISABLED 2151 while(1) { 2152 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 2153 if(r == -1) { 2154 #ifndef USE_WINSOCK 2155 if(errno == EINTR || errno == EAGAIN) 2156 continue; 2157 #else 2158 if(WSAGetLastError() == WSAEINPROGRESS) 2159 continue; 2160 if(WSAGetLastError() == WSAEWOULDBLOCK) 2161 continue; 2162 #endif 2163 log_err("dnstap io stop: write: %s", 2164 sock_strerror(errno)); 2165 break; 2166 } 2167 break; 2168 } 2169 dtio->started = 0; 2170 #endif /* THREADS_DISABLED */ 2171 2172 #ifndef USE_WINSOCK 2173 close(dtio->commandpipe[1]); 2174 #else 2175 _close(dtio->commandpipe[1]); 2176 #endif 2177 dtio->commandpipe[1] = -1; 2178 #ifndef THREADS_DISABLED 2179 ub_thread_join(dtio->tid); 2180 #else 2181 dtio->want_to_exit = 1; 2182 dtio_desetup(dtio); 2183 #endif 2184 } 2185