1 /* 2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP 3 * 4 * Copyright (c) 2020, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * 15 * Redistributions in binary form must reproduce the above copyright notice, 16 * this list of conditions and the following disclaimer in the documentation 17 * and/or other materials provided with the distribution. 18 * 19 * Neither the name of the NLNET LABS nor the names of its contributors may 20 * be used to endorse or promote products derived from this software without 21 * specific prior written permission. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34 * 35 */ 36 37 /** 38 * \file 39 * 40 * An implementation of the Frame Streams data transport protocol for 41 * the Unbound DNSTAP message logging facility. 42 */ 43 44 #include "config.h" 45 #include "dnstap/dtstream.h" 46 #include "dnstap/dnstap_fstrm.h" 47 #include "util/config_file.h" 48 #include "util/ub_event.h" 49 #include "util/net_help.h" 50 #include "services/outside_network.h" 51 #include "sldns/sbuffer.h" 52 #ifdef HAVE_SYS_UN_H 53 #include <sys/un.h> 54 #endif 55 #include <fcntl.h> 56 #ifdef HAVE_OPENSSL_SSL_H 57 #include <openssl/ssl.h> 58 #endif 59 #ifdef HAVE_OPENSSL_ERR_H 60 #include <openssl/err.h> 61 #endif 62 63 /** number of messages to process in one output callback */ 64 #define DTIO_MESSAGES_PER_CALLBACK 100 65 /** the msec to wait for reconnect (if not immediate, the first attempt) */ 66 #define DTIO_RECONNECT_TIMEOUT_MIN 10 67 /** the msec to wait for reconnect max after backoff */ 68 #define DTIO_RECONNECT_TIMEOUT_MAX 1000 69 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */ 70 #define DTIO_RECONNECT_TIMEOUT_SLOW 1000 71 /** number of messages before wakeup of thread */ 72 #define DTIO_MSG_FOR_WAKEUP 32 73 74 /** maximum length of received frame */ 75 #define DTIO_RECV_FRAME_MAX_LEN 1000 76 77 struct stop_flush_info; 78 /** DTIO command channel commands */ 79 enum { 80 /** DTIO command channel stop */ 81 DTIO_COMMAND_STOP = 0, 82 /** DTIO command channel wakeup */ 83 DTIO_COMMAND_WAKEUP = 1 84 } dtio_channel_command; 85 86 /** open the output channel */ 87 static void dtio_open_output(struct dt_io_thread* dtio); 88 /** add output event for read and write */ 89 static int dtio_add_output_event_write(struct dt_io_thread* dtio); 90 /** start reconnection attempts */ 91 static void dtio_reconnect_enable(struct dt_io_thread* dtio); 92 /** stop from stop_flush event loop */ 93 static void dtio_stop_flush_exit(struct stop_flush_info* info); 94 /** setup a start control message */ 95 static int dtio_control_start_send(struct dt_io_thread* dtio); 96 #ifdef HAVE_SSL 97 /** enable briefly waiting for a read event, for SSL negotiation */ 98 static int dtio_enable_brief_read(struct dt_io_thread* dtio); 99 /** enable briefly waiting for a write event, for SSL negotiation */ 100 static int dtio_enable_brief_write(struct dt_io_thread* dtio); 101 #endif 102 103 struct dt_msg_queue* 104 dt_msg_queue_create(struct comm_base* base) 105 { 106 struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); 107 if(!mq) return NULL; 108 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, 109 about 1 M should contain 64K messages with some overhead, 110 or a whole bunch smaller ones */ 111 mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq); 112 if(!mq->wakeup_timer) { 113 free(mq); 114 return NULL; 115 } 116 lock_basic_init(&mq->lock); 117 lock_protect(&mq->lock, mq, sizeof(*mq)); 118 return mq; 119 } 120 121 /** clear the message list, caller must hold the lock */ 122 static void 123 dt_msg_queue_clear(struct dt_msg_queue* mq) 124 { 125 struct dt_msg_entry* e = mq->first, *next=NULL; 126 while(e) { 127 next = e->next; 128 free(e->buf); 129 free(e); 130 e = next; 131 } 132 mq->first = NULL; 133 mq->last = NULL; 134 mq->cursize = 0; 135 mq->msgcount = 0; 136 } 137 138 void 139 dt_msg_queue_delete(struct dt_msg_queue* mq) 140 { 141 if(!mq) return; 142 lock_basic_destroy(&mq->lock); 143 dt_msg_queue_clear(mq); 144 comm_timer_delete(mq->wakeup_timer); 145 free(mq); 146 } 147 148 /** make the dtio wake up by sending a wakeup command */ 149 static void dtio_wakeup(struct dt_io_thread* dtio) 150 { 151 uint8_t cmd = DTIO_COMMAND_WAKEUP; 152 if(!dtio) return; 153 if(!dtio->started) return; 154 155 while(1) { 156 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 157 if(r == -1) { 158 #ifndef USE_WINSOCK 159 if(errno == EINTR || errno == EAGAIN) 160 continue; 161 #else 162 if(WSAGetLastError() == WSAEINPROGRESS) 163 continue; 164 if(WSAGetLastError() == WSAEWOULDBLOCK) 165 continue; 166 #endif 167 log_err("dnstap io wakeup: write: %s", 168 sock_strerror(errno)); 169 break; 170 } 171 break; 172 } 173 } 174 175 void 176 mq_wakeup_cb(void* arg) 177 { 178 struct dt_msg_queue* mq = (struct dt_msg_queue*)arg; 179 /* even if the dtio is already active, because perhaps much 180 * traffic suddenly, we leave the timer running to save on 181 * managing it, the once a second timer is less work then 182 * starting and stopping the timer frequently */ 183 lock_basic_lock(&mq->dtio->wakeup_timer_lock); 184 mq->dtio->wakeup_timer_enabled = 0; 185 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 186 dtio_wakeup(mq->dtio); 187 } 188 189 /** start timer to wakeup dtio because there is content in the queue */ 190 static void 191 dt_msg_queue_start_timer(struct dt_msg_queue* mq) 192 { 193 struct timeval tv; 194 /* Start a timer to process messages to be logged. 195 * If we woke up the dtio thread for every message, the wakeup 196 * messages take up too much processing power. If the queue 197 * fills up the wakeup happens immediately. The timer wakes it up 198 * if there are infrequent messages to log. */ 199 200 /* we cannot start a timer in dtio thread, because it is a different 201 * thread and its event base is in use by the other thread, it would 202 * give race conditions if we tried to modify its event base, 203 * and locks would wait until it woke up, and this is what we do. */ 204 205 /* do not start the timer if a timer already exists, perhaps 206 * in another worker. So this variable is protected by a lock in 207 * dtio */ 208 lock_basic_lock(&mq->dtio->wakeup_timer_lock); 209 if(mq->dtio->wakeup_timer_enabled) { 210 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 211 return; 212 } 213 mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */ 214 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 215 216 /* start the timer, in mq, in the event base of our worker */ 217 tv.tv_sec = 1; 218 tv.tv_usec = 0; 219 comm_timer_set(mq->wakeup_timer, &tv); 220 } 221 222 void 223 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) 224 { 225 int wakeupnow = 0, wakeupstarttimer = 0; 226 struct dt_msg_entry* entry; 227 228 /* check conditions */ 229 if(!buf) return; 230 if(len == 0) { 231 /* it is not possible to log entries with zero length, 232 * because the framestream protocol does not carry it. 233 * However the protobuf serialization does not create zero 234 * length datagrams for dnstap, so this should not happen. */ 235 free(buf); 236 return; 237 } 238 if(!mq) { 239 free(buf); 240 return; 241 } 242 243 /* allocate memory for queue entry */ 244 entry = malloc(sizeof(*entry)); 245 if(!entry) { 246 log_err("out of memory logging dnstap"); 247 free(buf); 248 return; 249 } 250 entry->next = NULL; 251 entry->buf = buf; 252 entry->len = len; 253 254 /* aqcuire lock */ 255 lock_basic_lock(&mq->lock); 256 /* if list was empty, start timer for (eventual) wakeup */ 257 if(mq->first == NULL) 258 wakeupstarttimer = 1; 259 /* if list contains more than wakeupnum elements, wakeup now, 260 * or if list is (going to be) almost full */ 261 if(mq->msgcount == DTIO_MSG_FOR_WAKEUP || 262 (mq->cursize < mq->maxsize * 9 / 10 && 263 mq->cursize+len >= mq->maxsize * 9 / 10)) 264 wakeupnow = 1; 265 /* see if it is going to fit */ 266 if(mq->cursize + len > mq->maxsize) { 267 /* buffer full, or congested. */ 268 /* drop */ 269 lock_basic_unlock(&mq->lock); 270 free(buf); 271 free(entry); 272 return; 273 } 274 mq->cursize += len; 275 mq->msgcount ++; 276 /* append to list */ 277 if(mq->last) { 278 mq->last->next = entry; 279 } else { 280 mq->first = entry; 281 } 282 mq->last = entry; 283 /* release lock */ 284 lock_basic_unlock(&mq->lock); 285 286 if(wakeupnow) { 287 dtio_wakeup(mq->dtio); 288 } else if(wakeupstarttimer) { 289 dt_msg_queue_start_timer(mq); 290 } 291 } 292 293 struct dt_io_thread* dt_io_thread_create(void) 294 { 295 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); 296 lock_basic_init(&dtio->wakeup_timer_lock); 297 lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled, 298 sizeof(dtio->wakeup_timer_enabled)); 299 return dtio; 300 } 301 302 void dt_io_thread_delete(struct dt_io_thread* dtio) 303 { 304 struct dt_io_list_item* item, *nextitem; 305 if(!dtio) return; 306 lock_basic_destroy(&dtio->wakeup_timer_lock); 307 item=dtio->io_list; 308 while(item) { 309 nextitem = item->next; 310 free(item); 311 item = nextitem; 312 } 313 free(dtio->socket_path); 314 free(dtio->ip_str); 315 free(dtio->tls_server_name); 316 free(dtio->client_key_file); 317 free(dtio->client_cert_file); 318 if(dtio->ssl_ctx) { 319 #ifdef HAVE_SSL 320 SSL_CTX_free(dtio->ssl_ctx); 321 #endif 322 } 323 free(dtio); 324 } 325 326 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg) 327 { 328 if(!cfg->dnstap) { 329 log_warn("cannot setup dnstap because dnstap-enable is no"); 330 return 0; 331 } 332 333 /* what type of connectivity do we have */ 334 if(cfg->dnstap_ip && cfg->dnstap_ip[0]) { 335 if(cfg->dnstap_tls) 336 dtio->upstream_is_tls = 1; 337 else dtio->upstream_is_tcp = 1; 338 } else { 339 dtio->upstream_is_unix = 1; 340 } 341 dtio->is_bidirectional = cfg->dnstap_bidirectional; 342 343 if(dtio->upstream_is_unix) { 344 if(!cfg->dnstap_socket_path || 345 cfg->dnstap_socket_path[0]==0) { 346 log_err("dnstap setup: no dnstap-socket-path for " 347 "socket connect"); 348 return 0; 349 } 350 free(dtio->socket_path); 351 dtio->socket_path = fname_after_chroot(cfg->dnstap_socket_path, 352 cfg, 1); 353 if(!dtio->socket_path) { 354 log_err("dnstap setup: malloc failure"); 355 return 0; 356 } 357 } 358 359 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 360 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) { 361 log_err("dnstap setup: no dnstap-ip for TCP connect"); 362 return 0; 363 } 364 free(dtio->ip_str); 365 dtio->ip_str = strdup(cfg->dnstap_ip); 366 if(!dtio->ip_str) { 367 log_err("dnstap setup: malloc failure"); 368 return 0; 369 } 370 } 371 372 if(dtio->upstream_is_tls) { 373 #ifdef HAVE_SSL 374 if(cfg->dnstap_tls_server_name && 375 cfg->dnstap_tls_server_name[0]) { 376 free(dtio->tls_server_name); 377 dtio->tls_server_name = strdup( 378 cfg->dnstap_tls_server_name); 379 if(!dtio->tls_server_name) { 380 log_err("dnstap setup: malloc failure"); 381 return 0; 382 } 383 if(!check_auth_name_for_ssl(dtio->tls_server_name)) 384 return 0; 385 } 386 if(cfg->dnstap_tls_client_key_file && 387 cfg->dnstap_tls_client_key_file[0]) { 388 dtio->use_client_certs = 1; 389 free(dtio->client_key_file); 390 dtio->client_key_file = strdup( 391 cfg->dnstap_tls_client_key_file); 392 if(!dtio->client_key_file) { 393 log_err("dnstap setup: malloc failure"); 394 return 0; 395 } 396 if(!cfg->dnstap_tls_client_cert_file || 397 cfg->dnstap_tls_client_cert_file[0]==0) { 398 log_err("dnstap setup: client key " 399 "authentication enabled with " 400 "dnstap-tls-client-key-file, but " 401 "no dnstap-tls-client-cert-file " 402 "is given"); 403 return 0; 404 } 405 free(dtio->client_cert_file); 406 dtio->client_cert_file = strdup( 407 cfg->dnstap_tls_client_cert_file); 408 if(!dtio->client_cert_file) { 409 log_err("dnstap setup: malloc failure"); 410 return 0; 411 } 412 } else { 413 dtio->use_client_certs = 0; 414 dtio->client_key_file = NULL; 415 dtio->client_cert_file = NULL; 416 } 417 418 if(cfg->dnstap_tls_cert_bundle) { 419 dtio->ssl_ctx = connect_sslctx_create( 420 dtio->client_key_file, 421 dtio->client_cert_file, 422 cfg->dnstap_tls_cert_bundle, 0); 423 } else { 424 dtio->ssl_ctx = connect_sslctx_create( 425 dtio->client_key_file, 426 dtio->client_cert_file, 427 cfg->tls_cert_bundle, cfg->tls_win_cert); 428 } 429 if(!dtio->ssl_ctx) { 430 log_err("could not setup SSL CTX"); 431 return 0; 432 } 433 dtio->tls_use_sni = cfg->tls_use_sni; 434 #endif /* HAVE_SSL */ 435 } 436 return 1; 437 } 438 439 int dt_io_thread_register_queue(struct dt_io_thread* dtio, 440 struct dt_msg_queue* mq) 441 { 442 struct dt_io_list_item* item = malloc(sizeof(*item)); 443 if(!item) return 0; 444 lock_basic_lock(&mq->lock); 445 mq->dtio = dtio; 446 lock_basic_unlock(&mq->lock); 447 item->queue = mq; 448 item->next = dtio->io_list; 449 dtio->io_list = item; 450 dtio->io_list_iter = NULL; 451 return 1; 452 } 453 454 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, 455 struct dt_msg_queue* mq) 456 { 457 struct dt_io_list_item* item, *prev=NULL; 458 if(!dtio) return; 459 item = dtio->io_list; 460 while(item) { 461 if(item->queue == mq) { 462 /* found it */ 463 if(prev) prev->next = item->next; 464 else dtio->io_list = item->next; 465 /* the queue itself only registered, not deleted */ 466 lock_basic_lock(&item->queue->lock); 467 item->queue->dtio = NULL; 468 lock_basic_unlock(&item->queue->lock); 469 free(item); 470 dtio->io_list_iter = NULL; 471 return; 472 } 473 prev = item; 474 item = item->next; 475 } 476 } 477 478 /** pick a message from the queue, the routine locks and unlocks, 479 * returns true if there is a message */ 480 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf, 481 size_t* len) 482 { 483 lock_basic_lock(&mq->lock); 484 if(mq->first) { 485 struct dt_msg_entry* entry = mq->first; 486 mq->first = entry->next; 487 if(!entry->next) mq->last = NULL; 488 mq->cursize -= entry->len; 489 mq->msgcount --; 490 lock_basic_unlock(&mq->lock); 491 492 *buf = entry->buf; 493 *len = entry->len; 494 free(entry); 495 return 1; 496 } 497 lock_basic_unlock(&mq->lock); 498 return 0; 499 } 500 501 /** find message in queue, false if no message, true if message to send */ 502 static int dtio_find_in_queue(struct dt_io_thread* dtio, 503 struct dt_msg_queue* mq) 504 { 505 void* buf=NULL; 506 size_t len=0; 507 if(dt_msg_queue_pop(mq, &buf, &len)) { 508 dtio->cur_msg = buf; 509 dtio->cur_msg_len = len; 510 dtio->cur_msg_done = 0; 511 dtio->cur_msg_len_done = 0; 512 return 1; 513 } 514 return 0; 515 } 516 517 /** find a new message to write, search message queues, false if none */ 518 static int dtio_find_msg(struct dt_io_thread* dtio) 519 { 520 struct dt_io_list_item *spot, *item; 521 522 spot = dtio->io_list_iter; 523 /* use the next queue for the next message lookup, 524 * if we hit the end(NULL) the NULL restarts the iter at start. */ 525 if(spot) 526 dtio->io_list_iter = spot->next; 527 else if(dtio->io_list) 528 dtio->io_list_iter = dtio->io_list->next; 529 530 /* scan from spot to end-of-io_list */ 531 item = spot; 532 while(item) { 533 if(dtio_find_in_queue(dtio, item->queue)) 534 return 1; 535 item = item->next; 536 } 537 /* scan starting at the start-of-list (to wrap around the end) */ 538 item = dtio->io_list; 539 while(item) { 540 if(dtio_find_in_queue(dtio, item->queue)) 541 return 1; 542 item = item->next; 543 } 544 return 0; 545 } 546 547 /** callback for the dnstap reconnect, to start reconnecting to output */ 548 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd), 549 short ATTR_UNUSED(bits), void* arg) 550 { 551 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 552 dtio->reconnect_is_added = 0; 553 verbose(VERB_ALGO, "dnstap io: reconnect timer"); 554 555 dtio_open_output(dtio); 556 if(dtio->event) { 557 if(!dtio_add_output_event_write(dtio)) 558 return; 559 /* nothing wrong so far, wait on the output event */ 560 return; 561 } 562 /* exponential backoff and retry on timer */ 563 dtio_reconnect_enable(dtio); 564 } 565 566 /** attempt to reconnect to the output, after a timeout */ 567 static void dtio_reconnect_enable(struct dt_io_thread* dtio) 568 { 569 struct timeval tv; 570 int msec; 571 if(dtio->want_to_exit) return; 572 if(dtio->reconnect_is_added) 573 return; /* already done */ 574 575 /* exponential backoff, store the value for next timeout */ 576 msec = dtio->reconnect_timeout; 577 if(msec == 0) { 578 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN; 579 } else { 580 dtio->reconnect_timeout = msec*2; 581 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX) 582 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX; 583 } 584 verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec", 585 msec); 586 587 /* setup wait timer */ 588 memset(&tv, 0, sizeof(tv)); 589 tv.tv_sec = msec/1000; 590 tv.tv_usec = (msec%1000)*1000; 591 if(ub_timer_add(dtio->reconnect_timer, dtio->event_base, 592 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) { 593 log_err("dnstap io: could not reconnect ev timer add"); 594 return; 595 } 596 dtio->reconnect_is_added = 1; 597 } 598 599 /** remove dtio reconnect timer */ 600 static void dtio_reconnect_del(struct dt_io_thread* dtio) 601 { 602 if(!dtio->reconnect_is_added) 603 return; 604 ub_timer_del(dtio->reconnect_timer); 605 dtio->reconnect_is_added = 0; 606 } 607 608 /** clear the reconnect exponential backoff timer. 609 * We have successfully connected so we can try again with short timeouts. */ 610 static void dtio_reconnect_clear(struct dt_io_thread* dtio) 611 { 612 dtio->reconnect_timeout = 0; 613 dtio_reconnect_del(dtio); 614 } 615 616 /** reconnect slowly, because we already know we have to wait for a bit */ 617 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec) 618 { 619 dtio_reconnect_del(dtio); 620 dtio->reconnect_timeout = msec; 621 dtio_reconnect_enable(dtio); 622 } 623 624 /** delete the current message in the dtio, and reset counters */ 625 static void dtio_cur_msg_free(struct dt_io_thread* dtio) 626 { 627 free(dtio->cur_msg); 628 dtio->cur_msg = NULL; 629 dtio->cur_msg_len = 0; 630 dtio->cur_msg_done = 0; 631 dtio->cur_msg_len_done = 0; 632 } 633 634 /** delete the buffer and counters used to read frame */ 635 static void dtio_read_frame_free(struct dt_frame_read_buf* rb) 636 { 637 if(rb->buf) { 638 free(rb->buf); 639 rb->buf = NULL; 640 } 641 rb->buf_count = 0; 642 rb->buf_cap = 0; 643 rb->frame_len = 0; 644 rb->frame_len_done = 0; 645 rb->control_frame = 0; 646 } 647 648 /** del the output file descriptor event for listening */ 649 static void dtio_del_output_event(struct dt_io_thread* dtio) 650 { 651 if(!dtio->event_added) 652 return; 653 ub_event_del(dtio->event); 654 dtio->event_added = 0; 655 dtio->event_added_is_write = 0; 656 } 657 658 /** close dtio socket and set it to -1 */ 659 static void dtio_close_fd(struct dt_io_thread* dtio) 660 { 661 sock_close(dtio->fd); 662 dtio->fd = -1; 663 } 664 665 /** close and stop the output file descriptor event */ 666 static void dtio_close_output(struct dt_io_thread* dtio) 667 { 668 if(!dtio->event) 669 return; 670 ub_event_free(dtio->event); 671 dtio->event = NULL; 672 if(dtio->ssl) { 673 #ifdef HAVE_SSL 674 SSL_shutdown(dtio->ssl); 675 SSL_free(dtio->ssl); 676 dtio->ssl = NULL; 677 #endif 678 } 679 dtio_close_fd(dtio); 680 681 /* if there is a (partial) message, discard it 682 * we cannot send (the remainder of) it, and a new 683 * connection needs to start with a control frame. */ 684 if(dtio->cur_msg) { 685 dtio_cur_msg_free(dtio); 686 } 687 688 dtio->ready_frame_sent = 0; 689 dtio->accept_frame_received = 0; 690 dtio_read_frame_free(&dtio->read_frame); 691 692 dtio_reconnect_enable(dtio); 693 } 694 695 /** check for pending nonblocking connect errors, 696 * returns 1 if it is okay. -1 on error (close it), 0 to try later */ 697 static int dtio_check_nb_connect(struct dt_io_thread* dtio) 698 { 699 int error = 0; 700 socklen_t len = (socklen_t)sizeof(error); 701 if(!dtio->check_nb_connect) 702 return 1; /* everything okay */ 703 if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error, 704 &len) < 0) { 705 #ifndef USE_WINSOCK 706 error = errno; /* on solaris errno is error */ 707 #else 708 error = WSAGetLastError(); 709 #endif 710 } 711 #ifndef USE_WINSOCK 712 #if defined(EINPROGRESS) && defined(EWOULDBLOCK) 713 if(error == EINPROGRESS || error == EWOULDBLOCK) 714 return 0; /* try again later */ 715 #endif 716 #else 717 if(error == WSAEINPROGRESS) { 718 return 0; /* try again later */ 719 } else if(error == WSAEWOULDBLOCK) { 720 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 721 dtio->stop_flush_event:dtio->event), UB_EV_WRITE); 722 return 0; /* try again later */ 723 } 724 #endif 725 if(error != 0) { 726 char* to = dtio->socket_path; 727 if(!to) to = dtio->ip_str; 728 if(!to) to = ""; 729 log_err("dnstap io: failed to connect to \"%s\": %s", 730 to, sock_strerror(error)); 731 return -1; /* error, close it */ 732 } 733 734 if(dtio->ip_str) 735 verbose(VERB_DETAIL, "dnstap io: connected to %s", 736 dtio->ip_str); 737 else if(dtio->socket_path) 738 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"", 739 dtio->socket_path); 740 dtio_reconnect_clear(dtio); 741 dtio->check_nb_connect = 0; 742 return 1; /* everything okay */ 743 } 744 745 #ifdef HAVE_SSL 746 /** write to ssl output 747 * returns number of bytes written, 0 if nothing happened, 748 * try again later, or -1 if the channel is to be closed. */ 749 static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf, 750 size_t len) 751 { 752 int r; 753 ERR_clear_error(); 754 r = SSL_write(dtio->ssl, buf, len); 755 if(r <= 0) { 756 int want = SSL_get_error(dtio->ssl, r); 757 if(want == SSL_ERROR_ZERO_RETURN) { 758 /* closed */ 759 return -1; 760 } else if(want == SSL_ERROR_WANT_READ) { 761 /* we want a brief read event */ 762 dtio_enable_brief_read(dtio); 763 return 0; 764 } else if(want == SSL_ERROR_WANT_WRITE) { 765 /* write again later */ 766 return 0; 767 } else if(want == SSL_ERROR_SYSCALL) { 768 #ifdef EPIPE 769 if(errno == EPIPE && verbosity < 2) 770 return -1; /* silence 'broken pipe' */ 771 #endif 772 #ifdef ECONNRESET 773 if(errno == ECONNRESET && verbosity < 2) 774 return -1; /* silence reset by peer */ 775 #endif 776 if(errno != 0) { 777 log_err("dnstap io, SSL_write syscall: %s", 778 strerror(errno)); 779 } 780 return -1; 781 } 782 log_crypto_err("dnstap io, could not SSL_write"); 783 return -1; 784 } 785 return r; 786 } 787 #endif /* HAVE_SSL */ 788 789 /** write buffer to output. 790 * returns number of bytes written, 0 if nothing happened, 791 * try again later, or -1 if the channel is to be closed. */ 792 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf, 793 size_t len) 794 { 795 ssize_t ret; 796 if(dtio->fd == -1) 797 return -1; 798 #ifdef HAVE_SSL 799 if(dtio->ssl) 800 return dtio_write_ssl(dtio, buf, len); 801 #endif 802 ret = send(dtio->fd, (void*)buf, len, 0); 803 if(ret == -1) { 804 #ifndef USE_WINSOCK 805 if(errno == EINTR || errno == EAGAIN) 806 return 0; 807 #else 808 if(WSAGetLastError() == WSAEINPROGRESS) 809 return 0; 810 if(WSAGetLastError() == WSAEWOULDBLOCK) { 811 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 812 dtio->stop_flush_event:dtio->event), 813 UB_EV_WRITE); 814 return 0; 815 } 816 #endif 817 log_err("dnstap io: failed send: %s", sock_strerror(errno)); 818 return -1; 819 } 820 return ret; 821 } 822 823 #ifdef HAVE_WRITEV 824 /** write with writev, len and message, in one write, if possible. 825 * return true if message is done, false if incomplete */ 826 static int dtio_write_with_writev(struct dt_io_thread* dtio) 827 { 828 uint32_t sendlen = htonl(dtio->cur_msg_len); 829 struct iovec iov[2]; 830 ssize_t r; 831 iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done; 832 iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done; 833 iov[1].iov_base = dtio->cur_msg; 834 iov[1].iov_len = dtio->cur_msg_len; 835 log_assert(iov[0].iov_len > 0); 836 r = writev(dtio->fd, iov, 2); 837 if(r == -1) { 838 #ifndef USE_WINSOCK 839 if(errno == EINTR || errno == EAGAIN) 840 return 0; 841 #else 842 if(WSAGetLastError() == WSAEINPROGRESS) 843 return 0; 844 if(WSAGetLastError() == WSAEWOULDBLOCK) { 845 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 846 dtio->stop_flush_event:dtio->event), 847 UB_EV_WRITE); 848 return 0; 849 } 850 #endif 851 log_err("dnstap io: failed writev: %s", sock_strerror(errno)); 852 /* close the channel */ 853 dtio_del_output_event(dtio); 854 dtio_close_output(dtio); 855 return 0; 856 } 857 /* written r bytes */ 858 dtio->cur_msg_len_done += r; 859 if(dtio->cur_msg_len_done < 4) 860 return 0; 861 if(dtio->cur_msg_len_done > 4) { 862 dtio->cur_msg_done = dtio->cur_msg_len_done-4; 863 dtio->cur_msg_len_done = 4; 864 } 865 if(dtio->cur_msg_done < dtio->cur_msg_len) 866 return 0; 867 return 1; 868 } 869 #endif /* HAVE_WRITEV */ 870 871 /** write more of the length, preceding the data frame. 872 * return true if message is done, false if incomplete. */ 873 static int dtio_write_more_of_len(struct dt_io_thread* dtio) 874 { 875 uint32_t sendlen; 876 int r; 877 if(dtio->cur_msg_len_done >= 4) 878 return 1; 879 #ifdef HAVE_WRITEV 880 if(!dtio->ssl) { 881 /* we try writev for everything.*/ 882 return dtio_write_with_writev(dtio); 883 } 884 #endif /* HAVE_WRITEV */ 885 sendlen = htonl(dtio->cur_msg_len); 886 r = dtio_write_buf(dtio, 887 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done, 888 sizeof(sendlen)-dtio->cur_msg_len_done); 889 if(r == -1) { 890 /* close the channel */ 891 dtio_del_output_event(dtio); 892 dtio_close_output(dtio); 893 return 0; 894 } else if(r == 0) { 895 /* try again later */ 896 return 0; 897 } 898 dtio->cur_msg_len_done += r; 899 if(dtio->cur_msg_len_done < 4) 900 return 0; 901 return 1; 902 } 903 904 /** write more of the data frame. 905 * return true if message is done, false if incomplete. */ 906 static int dtio_write_more_of_data(struct dt_io_thread* dtio) 907 { 908 int r; 909 if(dtio->cur_msg_done >= dtio->cur_msg_len) 910 return 1; 911 r = dtio_write_buf(dtio, 912 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done, 913 dtio->cur_msg_len - dtio->cur_msg_done); 914 if(r == -1) { 915 /* close the channel */ 916 dtio_del_output_event(dtio); 917 dtio_close_output(dtio); 918 return 0; 919 } else if(r == 0) { 920 /* try again later */ 921 return 0; 922 } 923 dtio->cur_msg_done += r; 924 if(dtio->cur_msg_done < dtio->cur_msg_len) 925 return 0; 926 return 1; 927 } 928 929 /** write more of the current messsage. false if incomplete, true if 930 * the message is done */ 931 static int dtio_write_more(struct dt_io_thread* dtio) 932 { 933 if(dtio->cur_msg_len_done < 4) { 934 if(!dtio_write_more_of_len(dtio)) 935 return 0; 936 } 937 if(dtio->cur_msg_done < dtio->cur_msg_len) { 938 if(!dtio_write_more_of_data(dtio)) 939 return 0; 940 } 941 return 1; 942 } 943 944 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed, 945 * -1: continue, >0: number of bytes read into buffer */ 946 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) { 947 ssize_t r; 948 r = recv(dtio->fd, (void*)buf, len, 0); 949 if(r == -1) { 950 char* to = dtio->socket_path; 951 if(!to) to = dtio->ip_str; 952 if(!to) to = ""; 953 #ifndef USE_WINSOCK 954 if(errno == EINTR || errno == EAGAIN) 955 return -1; /* try later */ 956 #else 957 if(WSAGetLastError() == WSAEINPROGRESS) { 958 return -1; /* try later */ 959 } else if(WSAGetLastError() == WSAEWOULDBLOCK) { 960 ub_winsock_tcp_wouldblock( 961 (dtio->stop_flush_event? 962 dtio->stop_flush_event:dtio->event), 963 UB_EV_READ); 964 return -1; /* try later */ 965 } 966 #endif 967 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 968 verbosity < 4) 969 return 0; /* no log retries on low verbosity */ 970 log_err("dnstap io: output closed, recv %s: %s", to, 971 strerror(errno)); 972 /* and close below */ 973 return 0; 974 } 975 if(r == 0) { 976 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 977 verbosity < 4) 978 return 0; /* no log retries on low verbosity */ 979 verbose(VERB_DETAIL, "dnstap io: output closed by the other side"); 980 /* and close below */ 981 return 0; 982 } 983 /* something was received */ 984 return r; 985 } 986 987 #ifdef HAVE_SSL 988 /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed, 989 * -1: continue, >0: number of bytes read into buffer */ 990 static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len) 991 { 992 int r; 993 ERR_clear_error(); 994 r = SSL_read(dtio->ssl, buf, len); 995 if(r <= 0) { 996 int want = SSL_get_error(dtio->ssl, r); 997 if(want == SSL_ERROR_ZERO_RETURN) { 998 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 999 verbosity < 4) 1000 return 0; /* no log retries on low verbosity */ 1001 verbose(VERB_DETAIL, "dnstap io: output closed by the " 1002 "other side"); 1003 return 0; 1004 } else if(want == SSL_ERROR_WANT_READ) { 1005 /* continue later */ 1006 return -1; 1007 } else if(want == SSL_ERROR_WANT_WRITE) { 1008 (void)dtio_enable_brief_write(dtio); 1009 return -1; 1010 } else if(want == SSL_ERROR_SYSCALL) { 1011 #ifdef ECONNRESET 1012 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1013 errno == ECONNRESET && verbosity < 4) 1014 return 0; /* silence reset by peer */ 1015 #endif 1016 if(errno != 0) 1017 log_err("SSL_read syscall: %s", 1018 strerror(errno)); 1019 verbose(VERB_DETAIL, "dnstap io: output closed by the " 1020 "other side"); 1021 return 0; 1022 } 1023 log_crypto_err("could not SSL_read"); 1024 verbose(VERB_DETAIL, "dnstap io: output closed by the " 1025 "other side"); 1026 return 0; 1027 } 1028 return r; 1029 } 1030 #endif /* HAVE_SSL */ 1031 1032 /** check if the output fd has been closed, 1033 * it returns false if the stream is closed. */ 1034 static int dtio_check_close(struct dt_io_thread* dtio) 1035 { 1036 /* we don't want to read any packets, but if there are we can 1037 * discard the input (ignore it). Ignore of unknown (control) 1038 * packets is okay for the framestream protocol. And also, the 1039 * read call can return that the stream has been closed by the 1040 * other side. */ 1041 uint8_t buf[1024]; 1042 int r = -1; 1043 1044 1045 if(dtio->fd == -1) return 0; 1046 1047 while(r != 0) { 1048 /* not interested in buffer content, overwrite */ 1049 r = receive_bytes(dtio, (void*)buf, sizeof(buf)); 1050 if(r == -1) 1051 return 1; 1052 } 1053 /* the other end has been closed */ 1054 /* close the channel */ 1055 dtio_del_output_event(dtio); 1056 dtio_close_output(dtio); 1057 return 0; 1058 } 1059 1060 /** Read accept frame. Returns -1: continue reading, 0: closed, 1061 * 1: valid accept received. */ 1062 static int dtio_read_accept_frame(struct dt_io_thread* dtio) 1063 { 1064 int r; 1065 size_t read_frame_done; 1066 while(dtio->read_frame.frame_len_done < 4) { 1067 #ifdef HAVE_SSL 1068 if(dtio->ssl) { 1069 r = ssl_read_bytes(dtio, 1070 (uint8_t*)&dtio->read_frame.frame_len+ 1071 dtio->read_frame.frame_len_done, 1072 4-dtio->read_frame.frame_len_done); 1073 } else { 1074 #endif 1075 r = receive_bytes(dtio, 1076 (uint8_t*)&dtio->read_frame.frame_len+ 1077 dtio->read_frame.frame_len_done, 1078 4-dtio->read_frame.frame_len_done); 1079 #ifdef HAVE_SSL 1080 } 1081 #endif 1082 if(r == -1) 1083 return -1; /* continue reading */ 1084 if(r == 0) { 1085 /* connection closed */ 1086 goto close_connection; 1087 } 1088 dtio->read_frame.frame_len_done += r; 1089 if(dtio->read_frame.frame_len_done < 4) 1090 return -1; /* continue reading */ 1091 1092 if(dtio->read_frame.frame_len == 0) { 1093 dtio->read_frame.frame_len_done = 0; 1094 dtio->read_frame.control_frame = 1; 1095 continue; 1096 } 1097 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len); 1098 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) { 1099 verbose(VERB_OPS, "dnstap: received frame exceeds max " 1100 "length of %d bytes, closing connection", 1101 DTIO_RECV_FRAME_MAX_LEN); 1102 goto close_connection; 1103 } 1104 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len); 1105 dtio->read_frame.buf_cap = dtio->read_frame.frame_len; 1106 if(!dtio->read_frame.buf) { 1107 log_err("dnstap io: out of memory (creating read " 1108 "buffer)"); 1109 goto close_connection; 1110 } 1111 } 1112 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) { 1113 #ifdef HAVE_SSL 1114 if(dtio->ssl) { 1115 r = ssl_read_bytes(dtio, dtio->read_frame.buf+ 1116 dtio->read_frame.buf_count, 1117 dtio->read_frame.buf_cap- 1118 dtio->read_frame.buf_count); 1119 } else { 1120 #endif 1121 r = receive_bytes(dtio, dtio->read_frame.buf+ 1122 dtio->read_frame.buf_count, 1123 dtio->read_frame.buf_cap- 1124 dtio->read_frame.buf_count); 1125 #ifdef HAVE_SSL 1126 } 1127 #endif 1128 if(r == -1) 1129 return -1; /* continue reading */ 1130 if(r == 0) { 1131 /* connection closed */ 1132 goto close_connection; 1133 } 1134 dtio->read_frame.buf_count += r; 1135 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) 1136 return -1; /* continue reading */ 1137 } 1138 1139 /* Complete frame received, check if this is a valid ACCEPT control 1140 * frame. */ 1141 if(dtio->read_frame.frame_len < 4) { 1142 verbose(VERB_OPS, "dnstap: invalid data received"); 1143 goto close_connection; 1144 } 1145 if(sldns_read_uint32(dtio->read_frame.buf) != 1146 FSTRM_CONTROL_FRAME_ACCEPT) { 1147 verbose(VERB_ALGO, "dnstap: invalid control type received, " 1148 "ignored"); 1149 dtio->ready_frame_sent = 0; 1150 dtio->accept_frame_received = 0; 1151 dtio_read_frame_free(&dtio->read_frame); 1152 return -1; 1153 } 1154 read_frame_done = 4; /* control frame type */ 1155 1156 /* Iterate over control fields, ignore unknown types. 1157 * Need to be able to read at least 8 bytes (control field type + 1158 * length). */ 1159 while(read_frame_done+8 < dtio->read_frame.frame_len) { 1160 uint32_t type = sldns_read_uint32(dtio->read_frame.buf + 1161 read_frame_done); 1162 uint32_t len = sldns_read_uint32(dtio->read_frame.buf + 1163 read_frame_done + 4); 1164 if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) { 1165 if(len == strlen(DNSTAP_CONTENT_TYPE) && 1166 read_frame_done+8+len <= 1167 dtio->read_frame.frame_len && 1168 memcmp(dtio->read_frame.buf + read_frame_done + 1169 + 8, DNSTAP_CONTENT_TYPE, len) == 0) { 1170 if(!dtio_control_start_send(dtio)) { 1171 verbose(VERB_OPS, "dnstap io: out of " 1172 "memory while sending START frame"); 1173 goto close_connection; 1174 } 1175 dtio->accept_frame_received = 1; 1176 if(!dtio_add_output_event_write(dtio)) 1177 goto close_connection; 1178 return 1; 1179 } else { 1180 /* unknow content type */ 1181 verbose(VERB_ALGO, "dnstap: ACCEPT frame " 1182 "contains unknown content type, " 1183 "closing connection"); 1184 goto close_connection; 1185 } 1186 } 1187 /* unknown option, try next */ 1188 read_frame_done += 8+len; 1189 } 1190 1191 1192 close_connection: 1193 dtio_del_output_event(dtio); 1194 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1195 dtio_close_output(dtio); 1196 return 0; 1197 } 1198 1199 /** add the output file descriptor event for listening, read only */ 1200 static int dtio_add_output_event_read(struct dt_io_thread* dtio) 1201 { 1202 if(!dtio->event) 1203 return 0; 1204 if(dtio->event_added && !dtio->event_added_is_write) 1205 return 1; 1206 /* we have to (re-)register the event */ 1207 if(dtio->event_added) 1208 ub_event_del(dtio->event); 1209 ub_event_del_bits(dtio->event, UB_EV_WRITE); 1210 if(ub_event_add(dtio->event, NULL) != 0) { 1211 log_err("dnstap io: out of memory (adding event)"); 1212 dtio->event_added = 0; 1213 dtio->event_added_is_write = 0; 1214 /* close output and start reattempts to open it */ 1215 dtio_close_output(dtio); 1216 return 0; 1217 } 1218 dtio->event_added = 1; 1219 dtio->event_added_is_write = 0; 1220 return 1; 1221 } 1222 1223 /** add the output file descriptor event for listening, read and write */ 1224 static int dtio_add_output_event_write(struct dt_io_thread* dtio) 1225 { 1226 if(!dtio->event) 1227 return 0; 1228 if(dtio->event_added && dtio->event_added_is_write) 1229 return 1; 1230 /* we have to (re-)register the event */ 1231 if(dtio->event_added) 1232 ub_event_del(dtio->event); 1233 ub_event_add_bits(dtio->event, UB_EV_WRITE); 1234 if(ub_event_add(dtio->event, NULL) != 0) { 1235 log_err("dnstap io: out of memory (adding event)"); 1236 dtio->event_added = 0; 1237 dtio->event_added_is_write = 0; 1238 /* close output and start reattempts to open it */ 1239 dtio_close_output(dtio); 1240 return 0; 1241 } 1242 dtio->event_added = 1; 1243 dtio->event_added_is_write = 1; 1244 return 1; 1245 } 1246 1247 /** put the dtio thread to sleep */ 1248 static void dtio_sleep(struct dt_io_thread* dtio) 1249 { 1250 /* unregister the event polling for write, because there is 1251 * nothing to be written */ 1252 (void)dtio_add_output_event_read(dtio); 1253 } 1254 1255 #ifdef HAVE_SSL 1256 /** enable the brief read condition */ 1257 static int dtio_enable_brief_read(struct dt_io_thread* dtio) 1258 { 1259 dtio->ssl_brief_read = 1; 1260 if(dtio->stop_flush_event) { 1261 ub_event_del(dtio->stop_flush_event); 1262 ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE); 1263 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 1264 log_err("dnstap io, stop flush, could not ub_event_add"); 1265 return 0; 1266 } 1267 return 1; 1268 } 1269 return dtio_add_output_event_read(dtio); 1270 } 1271 #endif /* HAVE_SSL */ 1272 1273 #ifdef HAVE_SSL 1274 /** disable the brief read condition */ 1275 static int dtio_disable_brief_read(struct dt_io_thread* dtio) 1276 { 1277 dtio->ssl_brief_read = 0; 1278 if(dtio->stop_flush_event) { 1279 ub_event_del(dtio->stop_flush_event); 1280 ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE); 1281 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 1282 log_err("dnstap io, stop flush, could not ub_event_add"); 1283 return 0; 1284 } 1285 return 1; 1286 } 1287 return dtio_add_output_event_write(dtio); 1288 } 1289 #endif /* HAVE_SSL */ 1290 1291 #ifdef HAVE_SSL 1292 /** enable the brief write condition */ 1293 static int dtio_enable_brief_write(struct dt_io_thread* dtio) 1294 { 1295 dtio->ssl_brief_write = 1; 1296 return dtio_add_output_event_write(dtio); 1297 } 1298 #endif /* HAVE_SSL */ 1299 1300 #ifdef HAVE_SSL 1301 /** disable the brief write condition */ 1302 static int dtio_disable_brief_write(struct dt_io_thread* dtio) 1303 { 1304 dtio->ssl_brief_write = 0; 1305 return dtio_add_output_event_read(dtio); 1306 } 1307 #endif /* HAVE_SSL */ 1308 1309 #ifdef HAVE_SSL 1310 /** check peer verification after ssl handshake connection, false if closed*/ 1311 static int dtio_ssl_check_peer(struct dt_io_thread* dtio) 1312 { 1313 if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) { 1314 /* verification */ 1315 if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) { 1316 X509* x = SSL_get_peer_certificate(dtio->ssl); 1317 if(!x) { 1318 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1319 "connection failed no certificate", 1320 dtio->ip_str); 1321 return 0; 1322 } 1323 log_cert(VERB_ALGO, "dnstap io, peer certificate", 1324 x); 1325 #ifdef HAVE_SSL_GET0_PEERNAME 1326 if(SSL_get0_peername(dtio->ssl)) { 1327 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1328 "connection to %s authenticated", 1329 dtio->ip_str, 1330 SSL_get0_peername(dtio->ssl)); 1331 } else { 1332 #endif 1333 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1334 "connection authenticated", 1335 dtio->ip_str); 1336 #ifdef HAVE_SSL_GET0_PEERNAME 1337 } 1338 #endif 1339 X509_free(x); 1340 } else { 1341 X509* x = SSL_get_peer_certificate(dtio->ssl); 1342 if(x) { 1343 log_cert(VERB_ALGO, "dnstap io, peer " 1344 "certificate", x); 1345 X509_free(x); 1346 } 1347 verbose(VERB_ALGO, "dnstap io, %s, SSL connection " 1348 "failed: failed to authenticate", 1349 dtio->ip_str); 1350 return 0; 1351 } 1352 } else { 1353 /* unauthenticated, the verify peer flag was not set 1354 * in ssl when the ssl object was created from ssl_ctx */ 1355 verbose(VERB_ALGO, "dnstap io, %s, SSL connection", 1356 dtio->ip_str); 1357 } 1358 return 1; 1359 } 1360 #endif /* HAVE_SSL */ 1361 1362 #ifdef HAVE_SSL 1363 /** perform ssl handshake, returns 1 if okay, 0 to stop */ 1364 static int dtio_ssl_handshake(struct dt_io_thread* dtio, 1365 struct stop_flush_info* info) 1366 { 1367 int r; 1368 if(dtio->ssl_brief_read) { 1369 /* assume the brief read condition is satisfied, 1370 * if we need more or again, we can set it again */ 1371 if(!dtio_disable_brief_read(dtio)) { 1372 if(info) dtio_stop_flush_exit(info); 1373 return 0; 1374 } 1375 } 1376 if(dtio->ssl_handshake_done) 1377 return 1; 1378 1379 ERR_clear_error(); 1380 r = SSL_do_handshake(dtio->ssl); 1381 if(r != 1) { 1382 int want = SSL_get_error(dtio->ssl, r); 1383 if(want == SSL_ERROR_WANT_READ) { 1384 /* we want to read on the connection */ 1385 if(!dtio_enable_brief_read(dtio)) { 1386 if(info) dtio_stop_flush_exit(info); 1387 return 0; 1388 } 1389 return 0; 1390 } else if(want == SSL_ERROR_WANT_WRITE) { 1391 /* we want to write on the connection */ 1392 return 0; 1393 } else if(r == 0) { 1394 /* closed */ 1395 if(info) dtio_stop_flush_exit(info); 1396 dtio_del_output_event(dtio); 1397 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1398 dtio_close_output(dtio); 1399 return 0; 1400 } else if(want == SSL_ERROR_SYSCALL) { 1401 /* SYSCALL and errno==0 means closed uncleanly */ 1402 int silent = 0; 1403 #ifdef EPIPE 1404 if(errno == EPIPE && verbosity < 2) 1405 silent = 1; /* silence 'broken pipe' */ 1406 #endif 1407 #ifdef ECONNRESET 1408 if(errno == ECONNRESET && verbosity < 2) 1409 silent = 1; /* silence reset by peer */ 1410 #endif 1411 if(errno == 0) 1412 silent = 1; 1413 if(!silent) 1414 log_err("dnstap io, SSL_handshake syscall: %s", 1415 strerror(errno)); 1416 /* closed */ 1417 if(info) dtio_stop_flush_exit(info); 1418 dtio_del_output_event(dtio); 1419 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1420 dtio_close_output(dtio); 1421 return 0; 1422 } else { 1423 unsigned long err = ERR_get_error(); 1424 if(!squelch_err_ssl_handshake(err)) { 1425 log_crypto_err_code("dnstap io, ssl handshake failed", 1426 err); 1427 verbose(VERB_OPS, "dnstap io, ssl handshake failed " 1428 "from %s", dtio->ip_str); 1429 } 1430 /* closed */ 1431 if(info) dtio_stop_flush_exit(info); 1432 dtio_del_output_event(dtio); 1433 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1434 dtio_close_output(dtio); 1435 return 0; 1436 } 1437 1438 } 1439 /* check peer verification */ 1440 dtio->ssl_handshake_done = 1; 1441 1442 if(!dtio_ssl_check_peer(dtio)) { 1443 /* closed */ 1444 if(info) dtio_stop_flush_exit(info); 1445 dtio_del_output_event(dtio); 1446 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1447 dtio_close_output(dtio); 1448 return 0; 1449 } 1450 return 1; 1451 } 1452 #endif /* HAVE_SSL */ 1453 1454 /** callback for the dnstap events, to write to the output */ 1455 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg) 1456 { 1457 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 1458 int i; 1459 1460 if(dtio->check_nb_connect) { 1461 int connect_err = dtio_check_nb_connect(dtio); 1462 if(connect_err == -1) { 1463 /* close the channel */ 1464 dtio_del_output_event(dtio); 1465 dtio_close_output(dtio); 1466 return; 1467 } else if(connect_err == 0) { 1468 /* try again later */ 1469 return; 1470 } 1471 /* nonblocking connect check passed, continue */ 1472 } 1473 1474 #ifdef HAVE_SSL 1475 if(dtio->ssl && 1476 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 1477 if(!dtio_ssl_handshake(dtio, NULL)) 1478 return; 1479 } 1480 #endif 1481 1482 if((bits&UB_EV_READ || dtio->ssl_brief_write)) { 1483 if(dtio->ssl_brief_write) 1484 (void)dtio_disable_brief_write(dtio); 1485 if(dtio->ready_frame_sent && !dtio->accept_frame_received) { 1486 if(dtio_read_accept_frame(dtio) <= 0) 1487 return; 1488 } else if(!dtio_check_close(dtio)) 1489 return; 1490 } 1491 1492 /* loop to process a number of messages. This improves throughput, 1493 * because selecting on write-event if not needed for busy messages 1494 * (dnstap log) generation and if they need to all be written back. 1495 * The write event is usually not blocked up. But not forever, 1496 * because the event loop needs to stay responsive for other events. 1497 * If there are no (more) messages, or if the output buffers get 1498 * full, it returns out of the loop. */ 1499 for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) { 1500 /* see if there are messages that need writing */ 1501 if(!dtio->cur_msg) { 1502 if(!dtio_find_msg(dtio)) { 1503 if(i == 0) { 1504 /* no messages on the first iteration, 1505 * the queues are all empty */ 1506 dtio_sleep(dtio); 1507 } 1508 return; /* nothing to do */ 1509 } 1510 } 1511 1512 /* write it */ 1513 if(dtio->cur_msg_done < dtio->cur_msg_len) { 1514 if(!dtio_write_more(dtio)) 1515 return; 1516 } 1517 1518 /* done with the current message */ 1519 dtio_cur_msg_free(dtio); 1520 1521 /* If this is a bidirectional stream the first message will be 1522 * the READY control frame. We can only continue writing after 1523 * receiving an ACCEPT control frame. */ 1524 if(dtio->is_bidirectional && !dtio->ready_frame_sent) { 1525 dtio->ready_frame_sent = 1; 1526 (void)dtio_add_output_event_read(dtio); 1527 break; 1528 } 1529 } 1530 } 1531 1532 /** callback for the dnstap commandpipe, to stop the dnstap IO */ 1533 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) 1534 { 1535 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 1536 uint8_t cmd; 1537 ssize_t r; 1538 if(dtio->want_to_exit) 1539 return; 1540 r = read(fd, &cmd, sizeof(cmd)); 1541 if(r == -1) { 1542 #ifndef USE_WINSOCK 1543 if(errno == EINTR || errno == EAGAIN) 1544 return; /* ignore this */ 1545 #else 1546 if(WSAGetLastError() == WSAEINPROGRESS) 1547 return; 1548 if(WSAGetLastError() == WSAEWOULDBLOCK) 1549 return; 1550 #endif 1551 log_err("dnstap io: failed to read: %s", sock_strerror(errno)); 1552 /* and then fall through to quit the thread */ 1553 } else if(r == 0) { 1554 verbose(VERB_ALGO, "dnstap io: cmd channel closed"); 1555 } else if(r == 1 && cmd == DTIO_COMMAND_STOP) { 1556 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); 1557 } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) { 1558 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup"); 1559 1560 if(dtio->is_bidirectional && !dtio->accept_frame_received) { 1561 verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, " 1562 "waiting for ACCEPT control frame"); 1563 return; 1564 } 1565 1566 /* reregister event */ 1567 if(!dtio_add_output_event_write(dtio)) 1568 return; 1569 return; 1570 } else if(r == 1) { 1571 verbose(VERB_ALGO, "dnstap io: cmd channel unknown command"); 1572 } 1573 dtio->want_to_exit = 1; 1574 if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base) 1575 != 0) { 1576 log_err("dnstap io: could not loopexit"); 1577 } 1578 } 1579 1580 #ifndef THREADS_DISABLED 1581 /** setup the event base for the dnstap io thread */ 1582 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs, 1583 struct timeval* now) 1584 { 1585 memset(now, 0, sizeof(*now)); 1586 dtio->event_base = ub_default_event_base(0, secs, now); 1587 if(!dtio->event_base) { 1588 fatal_exit("dnstap io: could not create event_base"); 1589 } 1590 } 1591 #endif /* THREADS_DISABLED */ 1592 1593 /** setup the cmd event for dnstap io */ 1594 static void dtio_setup_cmd(struct dt_io_thread* dtio) 1595 { 1596 struct ub_event* cmdev; 1597 fd_set_nonblock(dtio->commandpipe[0]); 1598 cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0], 1599 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio); 1600 if(!cmdev) { 1601 fatal_exit("dnstap io: out of memory"); 1602 } 1603 dtio->command_event = cmdev; 1604 if(ub_event_add(cmdev, NULL) != 0) { 1605 fatal_exit("dnstap io: out of memory (adding event)"); 1606 } 1607 } 1608 1609 /** setup the reconnect event for dnstap io */ 1610 static void dtio_setup_reconnect(struct dt_io_thread* dtio) 1611 { 1612 dtio_reconnect_clear(dtio); 1613 dtio->reconnect_timer = ub_event_new(dtio->event_base, -1, 1614 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio); 1615 if(!dtio->reconnect_timer) { 1616 fatal_exit("dnstap io: out of memory"); 1617 } 1618 } 1619 1620 /** 1621 * structure to keep track of information during stop flush 1622 */ 1623 struct stop_flush_info { 1624 /** the event base during stop flush */ 1625 struct ub_event_base* base; 1626 /** did we already want to exit this stop-flush event base */ 1627 int want_to_exit_flush; 1628 /** has the timer fired */ 1629 int timer_done; 1630 /** the dtio */ 1631 struct dt_io_thread* dtio; 1632 /** the stop control frame */ 1633 void* stop_frame; 1634 /** length of the stop frame */ 1635 size_t stop_frame_len; 1636 /** how much we have done of the stop frame */ 1637 size_t stop_frame_done; 1638 }; 1639 1640 /** exit the stop flush base */ 1641 static void dtio_stop_flush_exit(struct stop_flush_info* info) 1642 { 1643 if(info->want_to_exit_flush) 1644 return; 1645 info->want_to_exit_flush = 1; 1646 if(ub_event_base_loopexit(info->base) != 0) { 1647 log_err("dnstap io: could not loopexit"); 1648 } 1649 } 1650 1651 /** send the stop control, 1652 * return true if completed the frame. */ 1653 static int dtio_control_stop_send(struct stop_flush_info* info) 1654 { 1655 struct dt_io_thread* dtio = info->dtio; 1656 int r; 1657 if(info->stop_frame_done >= info->stop_frame_len) 1658 return 1; 1659 r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) + 1660 info->stop_frame_done, info->stop_frame_len - 1661 info->stop_frame_done); 1662 if(r == -1) { 1663 verbose(VERB_ALGO, "dnstap io: stop flush: output closed"); 1664 dtio_stop_flush_exit(info); 1665 return 0; 1666 } 1667 if(r == 0) { 1668 /* try again later, or timeout */ 1669 return 0; 1670 } 1671 info->stop_frame_done += r; 1672 if(info->stop_frame_done < info->stop_frame_len) 1673 return 0; /* not done yet */ 1674 return 1; 1675 } 1676 1677 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), 1678 void* arg) 1679 { 1680 struct stop_flush_info* info = (struct stop_flush_info*)arg; 1681 if(info->want_to_exit_flush) 1682 return; 1683 verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush"); 1684 info->timer_done = 1; 1685 dtio_stop_flush_exit(info); 1686 } 1687 1688 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg) 1689 { 1690 struct stop_flush_info* info = (struct stop_flush_info*)arg; 1691 struct dt_io_thread* dtio = info->dtio; 1692 if(info->want_to_exit_flush) 1693 return; 1694 if(dtio->check_nb_connect) { 1695 /* we don't start the stop_flush if connect still 1696 * in progress, but the check code is here, just in case */ 1697 int connect_err = dtio_check_nb_connect(dtio); 1698 if(connect_err == -1) { 1699 /* close the channel, exit the stop flush */ 1700 dtio_stop_flush_exit(info); 1701 dtio_del_output_event(dtio); 1702 dtio_close_output(dtio); 1703 return; 1704 } else if(connect_err == 0) { 1705 /* try again later */ 1706 return; 1707 } 1708 /* nonblocking connect check passed, continue */ 1709 } 1710 #ifdef HAVE_SSL 1711 if(dtio->ssl && 1712 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 1713 if(!dtio_ssl_handshake(dtio, info)) 1714 return; 1715 } 1716 #endif 1717 1718 if((bits&UB_EV_READ)) { 1719 if(!dtio_check_close(dtio)) { 1720 if(dtio->fd == -1) { 1721 verbose(VERB_ALGO, "dnstap io: " 1722 "stop flush: output closed"); 1723 dtio_stop_flush_exit(info); 1724 } 1725 return; 1726 } 1727 } 1728 /* write remainder of last frame */ 1729 if(dtio->cur_msg) { 1730 if(dtio->cur_msg_done < dtio->cur_msg_len) { 1731 if(!dtio_write_more(dtio)) { 1732 if(dtio->fd == -1) { 1733 verbose(VERB_ALGO, "dnstap io: " 1734 "stop flush: output closed"); 1735 dtio_stop_flush_exit(info); 1736 } 1737 return; 1738 } 1739 } 1740 verbose(VERB_ALGO, "dnstap io: stop flush completed " 1741 "last frame"); 1742 dtio_cur_msg_free(dtio); 1743 } 1744 /* write stop frame */ 1745 if(info->stop_frame_done < info->stop_frame_len) { 1746 if(!dtio_control_stop_send(info)) 1747 return; 1748 verbose(VERB_ALGO, "dnstap io: stop flush completed " 1749 "stop control frame"); 1750 } 1751 /* when last frame and stop frame are sent, exit */ 1752 dtio_stop_flush_exit(info); 1753 } 1754 1755 /** flush at end, last packet and stop control */ 1756 static void dtio_control_stop_flush(struct dt_io_thread* dtio) 1757 { 1758 /* briefly attempt to flush the previous packet to the output, 1759 * this could be a partial packet, or even the start control frame */ 1760 time_t secs = 0; 1761 struct timeval now; 1762 struct stop_flush_info info; 1763 struct timeval tv; 1764 struct ub_event* timer, *stopev; 1765 1766 if(dtio->fd == -1 || dtio->check_nb_connect) { 1767 /* no connection or we have just connected, so nothing is 1768 * sent yet, so nothing to stop or flush */ 1769 return; 1770 } 1771 if(dtio->ssl && !dtio->ssl_handshake_done) { 1772 /* no SSL connection has been established yet */ 1773 return; 1774 } 1775 1776 memset(&info, 0, sizeof(info)); 1777 memset(&now, 0, sizeof(now)); 1778 info.dtio = dtio; 1779 info.base = ub_default_event_base(0, &secs, &now); 1780 if(!info.base) { 1781 log_err("dnstap io: malloc failure"); 1782 return; 1783 } 1784 timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT, 1785 &dtio_stop_timer_cb, &info); 1786 if(!timer) { 1787 log_err("dnstap io: malloc failure"); 1788 ub_event_base_free(info.base); 1789 return; 1790 } 1791 memset(&tv, 0, sizeof(tv)); 1792 tv.tv_sec = 2; 1793 if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info, 1794 &tv) != 0) { 1795 log_err("dnstap io: cannot event_timer_add"); 1796 ub_event_free(timer); 1797 ub_event_base_free(info.base); 1798 return; 1799 } 1800 stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ | 1801 UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info); 1802 if(!stopev) { 1803 log_err("dnstap io: malloc failure"); 1804 ub_timer_del(timer); 1805 ub_event_free(timer); 1806 ub_event_base_free(info.base); 1807 return; 1808 } 1809 if(ub_event_add(stopev, NULL) != 0) { 1810 log_err("dnstap io: cannot event_add"); 1811 ub_event_free(stopev); 1812 ub_timer_del(timer); 1813 ub_event_free(timer); 1814 ub_event_base_free(info.base); 1815 return; 1816 } 1817 info.stop_frame = fstrm_create_control_frame_stop( 1818 &info.stop_frame_len); 1819 if(!info.stop_frame) { 1820 log_err("dnstap io: malloc failure"); 1821 ub_event_del(stopev); 1822 ub_event_free(stopev); 1823 ub_timer_del(timer); 1824 ub_event_free(timer); 1825 ub_event_base_free(info.base); 1826 return; 1827 } 1828 dtio->stop_flush_event = stopev; 1829 1830 /* wait briefly, or until finished */ 1831 verbose(VERB_ALGO, "dnstap io: stop flush started"); 1832 if(ub_event_base_dispatch(info.base) < 0) { 1833 log_err("dnstap io: dispatch flush failed, errno is %s", 1834 strerror(errno)); 1835 } 1836 verbose(VERB_ALGO, "dnstap io: stop flush ended"); 1837 free(info.stop_frame); 1838 dtio->stop_flush_event = NULL; 1839 ub_event_del(stopev); 1840 ub_event_free(stopev); 1841 ub_timer_del(timer); 1842 ub_event_free(timer); 1843 ub_event_base_free(info.base); 1844 } 1845 1846 /** perform desetup and free stuff when the dnstap io thread exits */ 1847 static void dtio_desetup(struct dt_io_thread* dtio) 1848 { 1849 dtio_control_stop_flush(dtio); 1850 dtio_del_output_event(dtio); 1851 dtio_close_output(dtio); 1852 ub_event_del(dtio->command_event); 1853 ub_event_free(dtio->command_event); 1854 #ifndef USE_WINSOCK 1855 close(dtio->commandpipe[0]); 1856 #else 1857 _close(dtio->commandpipe[0]); 1858 #endif 1859 dtio->commandpipe[0] = -1; 1860 dtio_reconnect_del(dtio); 1861 ub_event_free(dtio->reconnect_timer); 1862 dtio_cur_msg_free(dtio); 1863 #ifndef THREADS_DISABLED 1864 ub_event_base_free(dtio->event_base); 1865 #endif 1866 } 1867 1868 /** setup a start control message */ 1869 static int dtio_control_start_send(struct dt_io_thread* dtio) 1870 { 1871 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 1872 dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE, 1873 &dtio->cur_msg_len); 1874 if(!dtio->cur_msg) { 1875 return 0; 1876 } 1877 /* setup to send the control message */ 1878 /* set that the buffer needs to be sent, but the length 1879 * of that buffer is already written, that way the buffer can 1880 * start with 0 length and then the length of the control frame 1881 * in it */ 1882 dtio->cur_msg_done = 0; 1883 dtio->cur_msg_len_done = 4; 1884 return 1; 1885 } 1886 1887 /** setup a ready control message */ 1888 static int dtio_control_ready_send(struct dt_io_thread* dtio) 1889 { 1890 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 1891 dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE, 1892 &dtio->cur_msg_len); 1893 if(!dtio->cur_msg) { 1894 return 0; 1895 } 1896 /* setup to send the control message */ 1897 /* set that the buffer needs to be sent, but the length 1898 * of that buffer is already written, that way the buffer can 1899 * start with 0 length and then the length of the control frame 1900 * in it */ 1901 dtio->cur_msg_done = 0; 1902 dtio->cur_msg_len_done = 4; 1903 return 1; 1904 } 1905 1906 /** open the output file descriptor for af_local */ 1907 static int dtio_open_output_local(struct dt_io_thread* dtio) 1908 { 1909 #ifdef HAVE_SYS_UN_H 1910 struct sockaddr_un s; 1911 dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0); 1912 if(dtio->fd == -1) { 1913 log_err("dnstap io: failed to create socket: %s", 1914 sock_strerror(errno)); 1915 return 0; 1916 } 1917 memset(&s, 0, sizeof(s)); 1918 #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN 1919 /* this member exists on BSDs, not Linux */ 1920 s.sun_len = (unsigned)sizeof(s); 1921 #endif 1922 s.sun_family = AF_LOCAL; 1923 /* length is 92-108, 104 on FreeBSD */ 1924 (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); 1925 fd_set_nonblock(dtio->fd); 1926 if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) 1927 == -1) { 1928 char* to = dtio->socket_path; 1929 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1930 verbosity < 4) { 1931 dtio_close_fd(dtio); 1932 return 0; /* no log retries on low verbosity */ 1933 } 1934 log_err("dnstap io: failed to connect to \"%s\": %s", 1935 to, sock_strerror(errno)); 1936 dtio_close_fd(dtio); 1937 return 0; 1938 } 1939 return 1; 1940 #else 1941 log_err("cannot create af_local socket"); 1942 return 0; 1943 #endif /* HAVE_SYS_UN_H */ 1944 } 1945 1946 /** open the output file descriptor for af_inet and af_inet6 */ 1947 static int dtio_open_output_tcp(struct dt_io_thread* dtio) 1948 { 1949 struct sockaddr_storage addr; 1950 socklen_t addrlen; 1951 memset(&addr, 0, sizeof(addr)); 1952 addrlen = (socklen_t)sizeof(addr); 1953 1954 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) { 1955 log_err("could not parse IP '%s'", dtio->ip_str); 1956 return 0; 1957 } 1958 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0); 1959 if(dtio->fd == -1) { 1960 log_err("can't create socket: %s", sock_strerror(errno)); 1961 return 0; 1962 } 1963 fd_set_nonblock(dtio->fd); 1964 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) { 1965 if(errno == EINPROGRESS) 1966 return 1; /* wait until connect done*/ 1967 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1968 verbosity < 4) { 1969 dtio_close_fd(dtio); 1970 return 0; /* no log retries on low verbosity */ 1971 } 1972 #ifndef USE_WINSOCK 1973 if(tcp_connect_errno_needs_log( 1974 (struct sockaddr *)&addr, addrlen)) { 1975 log_err("dnstap io: failed to connect to %s: %s", 1976 dtio->ip_str, strerror(errno)); 1977 } 1978 #else 1979 if(WSAGetLastError() == WSAEINPROGRESS || 1980 WSAGetLastError() == WSAEWOULDBLOCK) 1981 return 1; /* wait until connect done*/ 1982 if(tcp_connect_errno_needs_log( 1983 (struct sockaddr *)&addr, addrlen)) { 1984 log_err("dnstap io: failed to connect to %s: %s", 1985 dtio->ip_str, wsa_strerror(WSAGetLastError())); 1986 } 1987 #endif 1988 dtio_close_fd(dtio); 1989 return 0; 1990 } 1991 return 1; 1992 } 1993 1994 /** setup the SSL structure for new connection */ 1995 static int dtio_setup_ssl(struct dt_io_thread* dtio) 1996 { 1997 dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd); 1998 if(!dtio->ssl) return 0; 1999 dtio->ssl_handshake_done = 0; 2000 dtio->ssl_brief_read = 0; 2001 2002 if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name, 2003 dtio->tls_use_sni)) { 2004 return 0; 2005 } 2006 return 1; 2007 } 2008 2009 /** open the output file descriptor */ 2010 static void dtio_open_output(struct dt_io_thread* dtio) 2011 { 2012 struct ub_event* ev; 2013 if(dtio->upstream_is_unix) { 2014 if(!dtio_open_output_local(dtio)) { 2015 dtio_reconnect_enable(dtio); 2016 return; 2017 } 2018 } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 2019 if(!dtio_open_output_tcp(dtio)) { 2020 dtio_reconnect_enable(dtio); 2021 return; 2022 } 2023 if(dtio->upstream_is_tls) { 2024 if(!dtio_setup_ssl(dtio)) { 2025 dtio_close_fd(dtio); 2026 dtio_reconnect_enable(dtio); 2027 return; 2028 } 2029 } 2030 } 2031 dtio->check_nb_connect = 1; 2032 2033 /* the EV_READ is to read ACCEPT control messages, and catch channel 2034 * close. EV_WRITE is to write packets */ 2035 ev = ub_event_new(dtio->event_base, dtio->fd, 2036 UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb, 2037 dtio); 2038 if(!ev) { 2039 log_err("dnstap io: out of memory"); 2040 if(dtio->ssl) { 2041 #ifdef HAVE_SSL 2042 SSL_free(dtio->ssl); 2043 dtio->ssl = NULL; 2044 #endif 2045 } 2046 dtio_close_fd(dtio); 2047 dtio_reconnect_enable(dtio); 2048 return; 2049 } 2050 dtio->event = ev; 2051 2052 /* setup protocol control message to start */ 2053 if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) || 2054 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) { 2055 log_err("dnstap io: out of memory"); 2056 ub_event_free(dtio->event); 2057 dtio->event = NULL; 2058 if(dtio->ssl) { 2059 #ifdef HAVE_SSL 2060 SSL_free(dtio->ssl); 2061 dtio->ssl = NULL; 2062 #endif 2063 } 2064 dtio_close_fd(dtio); 2065 dtio_reconnect_enable(dtio); 2066 return; 2067 } 2068 } 2069 2070 /** perform the setup of the writer thread on the established event_base */ 2071 static void dtio_setup_on_base(struct dt_io_thread* dtio) 2072 { 2073 dtio_setup_cmd(dtio); 2074 dtio_setup_reconnect(dtio); 2075 dtio_open_output(dtio); 2076 if(!dtio_add_output_event_write(dtio)) 2077 return; 2078 } 2079 2080 #ifndef THREADS_DISABLED 2081 /** the IO thread function for the DNSTAP IO */ 2082 static void* dnstap_io(void* arg) 2083 { 2084 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 2085 time_t secs = 0; 2086 struct timeval now; 2087 log_thread_set(&dtio->threadnum); 2088 2089 /* setup */ 2090 verbose(VERB_ALGO, "start dnstap io thread"); 2091 dtio_setup_base(dtio, &secs, &now); 2092 dtio_setup_on_base(dtio); 2093 2094 /* run */ 2095 if(ub_event_base_dispatch(dtio->event_base) < 0) { 2096 log_err("dnstap io: dispatch failed, errno is %s", 2097 strerror(errno)); 2098 } 2099 2100 /* cleanup */ 2101 verbose(VERB_ALGO, "stop dnstap io thread"); 2102 dtio_desetup(dtio); 2103 return NULL; 2104 } 2105 #endif /* THREADS_DISABLED */ 2106 2107 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, 2108 int numworkers) 2109 { 2110 /* set up the thread, can fail */ 2111 #ifndef USE_WINSOCK 2112 if(pipe(dtio->commandpipe) == -1) { 2113 log_err("failed to create pipe: %s", strerror(errno)); 2114 return 0; 2115 } 2116 #else 2117 if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) { 2118 log_err("failed to create _pipe: %s", 2119 wsa_strerror(WSAGetLastError())); 2120 return 0; 2121 } 2122 #endif 2123 2124 /* start the thread */ 2125 dtio->threadnum = numworkers+1; 2126 dtio->started = 1; 2127 #ifndef THREADS_DISABLED 2128 ub_thread_create(&dtio->tid, dnstap_io, dtio); 2129 (void)event_base_nothr; 2130 #else 2131 dtio->event_base = event_base_nothr; 2132 dtio_setup_on_base(dtio); 2133 #endif 2134 return 1; 2135 } 2136 2137 void dt_io_thread_stop(struct dt_io_thread* dtio) 2138 { 2139 #ifndef THREADS_DISABLED 2140 uint8_t cmd = DTIO_COMMAND_STOP; 2141 #endif 2142 if(!dtio) return; 2143 if(!dtio->started) return; 2144 verbose(VERB_ALGO, "dnstap io: send stop cmd"); 2145 2146 #ifndef THREADS_DISABLED 2147 while(1) { 2148 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 2149 if(r == -1) { 2150 #ifndef USE_WINSOCK 2151 if(errno == EINTR || errno == EAGAIN) 2152 continue; 2153 #else 2154 if(WSAGetLastError() == WSAEINPROGRESS) 2155 continue; 2156 if(WSAGetLastError() == WSAEWOULDBLOCK) 2157 continue; 2158 #endif 2159 log_err("dnstap io stop: write: %s", 2160 sock_strerror(errno)); 2161 break; 2162 } 2163 break; 2164 } 2165 dtio->started = 0; 2166 #endif /* THREADS_DISABLED */ 2167 2168 #ifndef USE_WINSOCK 2169 close(dtio->commandpipe[1]); 2170 #else 2171 _close(dtio->commandpipe[1]); 2172 #endif 2173 dtio->commandpipe[1] = -1; 2174 #ifndef THREADS_DISABLED 2175 ub_thread_join(dtio->tid); 2176 #else 2177 dtio->want_to_exit = 1; 2178 dtio_desetup(dtio); 2179 #endif 2180 } 2181