1 /* 2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP 3 * 4 * Copyright (c) 2020, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * 15 * Redistributions in binary form must reproduce the above copyright notice, 16 * this list of conditions and the following disclaimer in the documentation 17 * and/or other materials provided with the distribution. 18 * 19 * Neither the name of the NLNET LABS nor the names of its contributors may 20 * be used to endorse or promote products derived from this software without 21 * specific prior written permission. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34 * 35 */ 36 37 /** 38 * \file 39 * 40 * An implementation of the Frame Streams data transport protocol for 41 * the Unbound DNSTAP message logging facility. 42 */ 43 44 #include "config.h" 45 #include "dnstap/dtstream.h" 46 #include "dnstap/dnstap_fstrm.h" 47 #include "util/config_file.h" 48 #include "util/ub_event.h" 49 #include "util/net_help.h" 50 #include "services/outside_network.h" 51 #include "sldns/sbuffer.h" 52 #ifdef HAVE_SYS_UN_H 53 #include <sys/un.h> 54 #endif 55 #include <fcntl.h> 56 #ifdef HAVE_OPENSSL_SSL_H 57 #include <openssl/ssl.h> 58 #endif 59 #ifdef HAVE_OPENSSL_ERR_H 60 #include <openssl/err.h> 61 #endif 62 63 /** number of messages to process in one output callback */ 64 #define DTIO_MESSAGES_PER_CALLBACK 100 65 /** the msec to wait for reconnect (if not immediate, the first attempt) */ 66 #define DTIO_RECONNECT_TIMEOUT_MIN 10 67 /** the msec to wait for reconnect max after backoff */ 68 #define DTIO_RECONNECT_TIMEOUT_MAX 1000 69 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */ 70 #define DTIO_RECONNECT_TIMEOUT_SLOW 1000 71 72 /** maximum length of received frame */ 73 #define DTIO_RECV_FRAME_MAX_LEN 1000 74 75 struct stop_flush_info; 76 /** DTIO command channel commands */ 77 enum { 78 /** DTIO command channel stop */ 79 DTIO_COMMAND_STOP = 0, 80 /** DTIO command channel wakeup */ 81 DTIO_COMMAND_WAKEUP = 1 82 } dtio_channel_command; 83 84 /** open the output channel */ 85 static void dtio_open_output(struct dt_io_thread* dtio); 86 /** add output event for read and write */ 87 static int dtio_add_output_event_write(struct dt_io_thread* dtio); 88 /** start reconnection attempts */ 89 static void dtio_reconnect_enable(struct dt_io_thread* dtio); 90 /** stop from stop_flush event loop */ 91 static void dtio_stop_flush_exit(struct stop_flush_info* info); 92 /** setup a start control message */ 93 static int dtio_control_start_send(struct dt_io_thread* dtio); 94 #ifdef HAVE_SSL 95 /** enable briefly waiting for a read event, for SSL negotiation */ 96 static int dtio_enable_brief_read(struct dt_io_thread* dtio); 97 /** enable briefly waiting for a write event, for SSL negotiation */ 98 static int dtio_enable_brief_write(struct dt_io_thread* dtio); 99 #endif 100 101 struct dt_msg_queue* 102 dt_msg_queue_create(void) 103 { 104 struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); 105 if(!mq) return NULL; 106 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, 107 about 1 M should contain 64K messages with some overhead, 108 or a whole bunch smaller ones */ 109 lock_basic_init(&mq->lock); 110 lock_protect(&mq->lock, mq, sizeof(*mq)); 111 return mq; 112 } 113 114 /** clear the message list, caller must hold the lock */ 115 static void 116 dt_msg_queue_clear(struct dt_msg_queue* mq) 117 { 118 struct dt_msg_entry* e = mq->first, *next=NULL; 119 while(e) { 120 next = e->next; 121 free(e->buf); 122 free(e); 123 e = next; 124 } 125 mq->first = NULL; 126 mq->last = NULL; 127 mq->cursize = 0; 128 } 129 130 void 131 dt_msg_queue_delete(struct dt_msg_queue* mq) 132 { 133 if(!mq) return; 134 lock_basic_destroy(&mq->lock); 135 dt_msg_queue_clear(mq); 136 free(mq); 137 } 138 139 /** make the dtio wake up by sending a wakeup command */ 140 static void dtio_wakeup(struct dt_io_thread* dtio) 141 { 142 uint8_t cmd = DTIO_COMMAND_WAKEUP; 143 if(!dtio) return; 144 if(!dtio->started) return; 145 146 while(1) { 147 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 148 if(r == -1) { 149 #ifndef USE_WINSOCK 150 if(errno == EINTR || errno == EAGAIN) 151 continue; 152 log_err("dnstap io wakeup: write: %s", strerror(errno)); 153 #else 154 if(WSAGetLastError() == WSAEINPROGRESS) 155 continue; 156 if(WSAGetLastError() == WSAEWOULDBLOCK) 157 continue; 158 log_err("dnstap io stop: write: %s", 159 wsa_strerror(WSAGetLastError())); 160 #endif 161 break; 162 } 163 break; 164 } 165 } 166 167 void 168 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) 169 { 170 int wakeup = 0; 171 struct dt_msg_entry* entry; 172 173 /* check conditions */ 174 if(!buf) return; 175 if(len == 0) { 176 /* it is not possible to log entries with zero length, 177 * because the framestream protocol does not carry it. 178 * However the protobuf serialization does not create zero 179 * length datagrams for dnstap, so this should not happen. */ 180 free(buf); 181 return; 182 } 183 if(!mq) { 184 free(buf); 185 return; 186 } 187 188 /* allocate memory for queue entry */ 189 entry = malloc(sizeof(*entry)); 190 if(!entry) { 191 log_err("out of memory logging dnstap"); 192 free(buf); 193 return; 194 } 195 entry->next = NULL; 196 entry->buf = buf; 197 entry->len = len; 198 199 /* aqcuire lock */ 200 lock_basic_lock(&mq->lock); 201 /* list was empty, wakeup dtio */ 202 if(mq->first == NULL) 203 wakeup = 1; 204 /* see if it is going to fit */ 205 if(mq->cursize + len > mq->maxsize) { 206 /* buffer full, or congested. */ 207 /* drop */ 208 lock_basic_unlock(&mq->lock); 209 free(buf); 210 free(entry); 211 return; 212 } 213 mq->cursize += len; 214 /* append to list */ 215 if(mq->last) { 216 mq->last->next = entry; 217 } else { 218 mq->first = entry; 219 } 220 mq->last = entry; 221 /* release lock */ 222 lock_basic_unlock(&mq->lock); 223 224 if(wakeup) 225 dtio_wakeup(mq->dtio); 226 } 227 228 struct dt_io_thread* dt_io_thread_create(void) 229 { 230 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); 231 return dtio; 232 } 233 234 void dt_io_thread_delete(struct dt_io_thread* dtio) 235 { 236 struct dt_io_list_item* item, *nextitem; 237 if(!dtio) return; 238 item=dtio->io_list; 239 while(item) { 240 nextitem = item->next; 241 free(item); 242 item = nextitem; 243 } 244 free(dtio->socket_path); 245 free(dtio->ip_str); 246 free(dtio->tls_server_name); 247 free(dtio->client_key_file); 248 free(dtio->client_cert_file); 249 if(dtio->ssl_ctx) { 250 #ifdef HAVE_SSL 251 SSL_CTX_free(dtio->ssl_ctx); 252 #endif 253 } 254 free(dtio); 255 } 256 257 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg) 258 { 259 if(!cfg->dnstap) { 260 log_warn("cannot setup dnstap because dnstap-enable is no"); 261 return 0; 262 } 263 264 /* what type of connectivity do we have */ 265 if(cfg->dnstap_ip && cfg->dnstap_ip[0]) { 266 if(cfg->dnstap_tls) 267 dtio->upstream_is_tls = 1; 268 else dtio->upstream_is_tcp = 1; 269 } else { 270 dtio->upstream_is_unix = 1; 271 } 272 dtio->is_bidirectional = cfg->dnstap_bidirectional; 273 274 if(dtio->upstream_is_unix) { 275 if(!cfg->dnstap_socket_path || 276 cfg->dnstap_socket_path[0]==0) { 277 log_err("dnstap setup: no dnstap-socket-path for " 278 "socket connect"); 279 return 0; 280 } 281 free(dtio->socket_path); 282 dtio->socket_path = strdup(cfg->dnstap_socket_path); 283 if(!dtio->socket_path) { 284 log_err("dnstap setup: malloc failure"); 285 return 0; 286 } 287 } 288 289 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 290 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) { 291 log_err("dnstap setup: no dnstap-ip for TCP connect"); 292 return 0; 293 } 294 free(dtio->ip_str); 295 dtio->ip_str = strdup(cfg->dnstap_ip); 296 if(!dtio->ip_str) { 297 log_err("dnstap setup: malloc failure"); 298 return 0; 299 } 300 } 301 302 if(dtio->upstream_is_tls) { 303 #ifdef HAVE_SSL 304 if(cfg->dnstap_tls_server_name && 305 cfg->dnstap_tls_server_name[0]) { 306 free(dtio->tls_server_name); 307 dtio->tls_server_name = strdup( 308 cfg->dnstap_tls_server_name); 309 if(!dtio->tls_server_name) { 310 log_err("dnstap setup: malloc failure"); 311 return 0; 312 } 313 if(!check_auth_name_for_ssl(dtio->tls_server_name)) 314 return 0; 315 } 316 if(cfg->dnstap_tls_client_key_file && 317 cfg->dnstap_tls_client_key_file[0]) { 318 dtio->use_client_certs = 1; 319 free(dtio->client_key_file); 320 dtio->client_key_file = strdup( 321 cfg->dnstap_tls_client_key_file); 322 if(!dtio->client_key_file) { 323 log_err("dnstap setup: malloc failure"); 324 return 0; 325 } 326 if(!cfg->dnstap_tls_client_cert_file || 327 cfg->dnstap_tls_client_cert_file[0]==0) { 328 log_err("dnstap setup: client key " 329 "authentication enabled with " 330 "dnstap-tls-client-key-file, but " 331 "no dnstap-tls-client-cert-file " 332 "is given"); 333 return 0; 334 } 335 free(dtio->client_cert_file); 336 dtio->client_cert_file = strdup( 337 cfg->dnstap_tls_client_cert_file); 338 if(!dtio->client_cert_file) { 339 log_err("dnstap setup: malloc failure"); 340 return 0; 341 } 342 } else { 343 dtio->use_client_certs = 0; 344 dtio->client_key_file = NULL; 345 dtio->client_cert_file = NULL; 346 } 347 348 if(cfg->dnstap_tls_cert_bundle) { 349 dtio->ssl_ctx = connect_sslctx_create( 350 dtio->client_key_file, 351 dtio->client_cert_file, 352 cfg->dnstap_tls_cert_bundle, 0); 353 } else { 354 dtio->ssl_ctx = connect_sslctx_create( 355 dtio->client_key_file, 356 dtio->client_cert_file, 357 cfg->tls_cert_bundle, cfg->tls_win_cert); 358 } 359 if(!dtio->ssl_ctx) { 360 log_err("could not setup SSL CTX"); 361 return 0; 362 } 363 dtio->tls_use_sni = cfg->tls_use_sni; 364 #endif /* HAVE_SSL */ 365 } 366 return 1; 367 } 368 369 int dt_io_thread_register_queue(struct dt_io_thread* dtio, 370 struct dt_msg_queue* mq) 371 { 372 struct dt_io_list_item* item = malloc(sizeof(*item)); 373 if(!item) return 0; 374 lock_basic_lock(&mq->lock); 375 mq->dtio = dtio; 376 lock_basic_unlock(&mq->lock); 377 item->queue = mq; 378 item->next = dtio->io_list; 379 dtio->io_list = item; 380 dtio->io_list_iter = NULL; 381 return 1; 382 } 383 384 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, 385 struct dt_msg_queue* mq) 386 { 387 struct dt_io_list_item* item, *prev=NULL; 388 if(!dtio) return; 389 item = dtio->io_list; 390 while(item) { 391 if(item->queue == mq) { 392 /* found it */ 393 if(prev) prev->next = item->next; 394 else dtio->io_list = item->next; 395 /* the queue itself only registered, not deleted */ 396 lock_basic_lock(&item->queue->lock); 397 item->queue->dtio = NULL; 398 lock_basic_unlock(&item->queue->lock); 399 free(item); 400 dtio->io_list_iter = NULL; 401 return; 402 } 403 prev = item; 404 item = item->next; 405 } 406 } 407 408 /** pick a message from the queue, the routine locks and unlocks, 409 * returns true if there is a message */ 410 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf, 411 size_t* len) 412 { 413 lock_basic_lock(&mq->lock); 414 if(mq->first) { 415 struct dt_msg_entry* entry = mq->first; 416 mq->first = entry->next; 417 if(!entry->next) mq->last = NULL; 418 mq->cursize -= entry->len; 419 lock_basic_unlock(&mq->lock); 420 421 *buf = entry->buf; 422 *len = entry->len; 423 free(entry); 424 return 1; 425 } 426 lock_basic_unlock(&mq->lock); 427 return 0; 428 } 429 430 /** find message in queue, false if no message, true if message to send */ 431 static int dtio_find_in_queue(struct dt_io_thread* dtio, 432 struct dt_msg_queue* mq) 433 { 434 void* buf=NULL; 435 size_t len=0; 436 if(dt_msg_queue_pop(mq, &buf, &len)) { 437 dtio->cur_msg = buf; 438 dtio->cur_msg_len = len; 439 dtio->cur_msg_done = 0; 440 dtio->cur_msg_len_done = 0; 441 return 1; 442 } 443 return 0; 444 } 445 446 /** find a new message to write, search message queues, false if none */ 447 static int dtio_find_msg(struct dt_io_thread* dtio) 448 { 449 struct dt_io_list_item *spot, *item; 450 451 spot = dtio->io_list_iter; 452 /* use the next queue for the next message lookup, 453 * if we hit the end(NULL) the NULL restarts the iter at start. */ 454 if(spot) 455 dtio->io_list_iter = spot->next; 456 else if(dtio->io_list) 457 dtio->io_list_iter = dtio->io_list->next; 458 459 /* scan from spot to end-of-io_list */ 460 item = spot; 461 while(item) { 462 if(dtio_find_in_queue(dtio, item->queue)) 463 return 1; 464 item = item->next; 465 } 466 /* scan starting at the start-of-list (to wrap around the end) */ 467 item = dtio->io_list; 468 while(item) { 469 if(dtio_find_in_queue(dtio, item->queue)) 470 return 1; 471 item = item->next; 472 } 473 return 0; 474 } 475 476 /** callback for the dnstap reconnect, to start reconnecting to output */ 477 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd), 478 short ATTR_UNUSED(bits), void* arg) 479 { 480 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 481 dtio->reconnect_is_added = 0; 482 verbose(VERB_ALGO, "dnstap io: reconnect timer"); 483 484 dtio_open_output(dtio); 485 if(dtio->event) { 486 if(!dtio_add_output_event_write(dtio)) 487 return; 488 /* nothing wrong so far, wait on the output event */ 489 return; 490 } 491 /* exponential backoff and retry on timer */ 492 dtio_reconnect_enable(dtio); 493 } 494 495 /** attempt to reconnect to the output, after a timeout */ 496 static void dtio_reconnect_enable(struct dt_io_thread* dtio) 497 { 498 struct timeval tv; 499 int msec; 500 if(dtio->want_to_exit) return; 501 if(dtio->reconnect_is_added) 502 return; /* already done */ 503 504 /* exponential backoff, store the value for next timeout */ 505 msec = dtio->reconnect_timeout; 506 if(msec == 0) { 507 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN; 508 } else { 509 dtio->reconnect_timeout = msec*2; 510 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX) 511 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX; 512 } 513 verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec", 514 msec); 515 516 /* setup wait timer */ 517 memset(&tv, 0, sizeof(tv)); 518 tv.tv_sec = msec/1000; 519 tv.tv_usec = (msec%1000)*1000; 520 if(ub_timer_add(dtio->reconnect_timer, dtio->event_base, 521 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) { 522 log_err("dnstap io: could not reconnect ev timer add"); 523 return; 524 } 525 dtio->reconnect_is_added = 1; 526 } 527 528 /** remove dtio reconnect timer */ 529 static void dtio_reconnect_del(struct dt_io_thread* dtio) 530 { 531 if(!dtio->reconnect_is_added) 532 return; 533 ub_timer_del(dtio->reconnect_timer); 534 dtio->reconnect_is_added = 0; 535 } 536 537 /** clear the reconnect exponential backoff timer. 538 * We have successfully connected so we can try again with short timeouts. */ 539 static void dtio_reconnect_clear(struct dt_io_thread* dtio) 540 { 541 dtio->reconnect_timeout = 0; 542 dtio_reconnect_del(dtio); 543 } 544 545 /** reconnect slowly, because we already know we have to wait for a bit */ 546 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec) 547 { 548 dtio_reconnect_del(dtio); 549 dtio->reconnect_timeout = msec; 550 dtio_reconnect_enable(dtio); 551 } 552 553 /** delete the current message in the dtio, and reset counters */ 554 static void dtio_cur_msg_free(struct dt_io_thread* dtio) 555 { 556 free(dtio->cur_msg); 557 dtio->cur_msg = NULL; 558 dtio->cur_msg_len = 0; 559 dtio->cur_msg_done = 0; 560 dtio->cur_msg_len_done = 0; 561 } 562 563 /** delete the buffer and counters used to read frame */ 564 static void dtio_read_frame_free(struct dt_frame_read_buf* rb) 565 { 566 if(rb->buf) { 567 free(rb->buf); 568 rb->buf = NULL; 569 } 570 rb->buf_count = 0; 571 rb->buf_cap = 0; 572 rb->frame_len = 0; 573 rb->frame_len_done = 0; 574 rb->control_frame = 0; 575 } 576 577 /** del the output file descriptor event for listening */ 578 static void dtio_del_output_event(struct dt_io_thread* dtio) 579 { 580 if(!dtio->event_added) 581 return; 582 ub_event_del(dtio->event); 583 dtio->event_added = 0; 584 dtio->event_added_is_write = 0; 585 } 586 587 /** close dtio socket and set it to -1 */ 588 static void dtio_close_fd(struct dt_io_thread* dtio) 589 { 590 #ifndef USE_WINSOCK 591 close(dtio->fd); 592 #else 593 closesocket(dtio->fd); 594 #endif 595 dtio->fd = -1; 596 } 597 598 /** close and stop the output file descriptor event */ 599 static void dtio_close_output(struct dt_io_thread* dtio) 600 { 601 if(!dtio->event) 602 return; 603 ub_event_free(dtio->event); 604 dtio->event = NULL; 605 if(dtio->ssl) { 606 #ifdef HAVE_SSL 607 SSL_shutdown(dtio->ssl); 608 SSL_free(dtio->ssl); 609 dtio->ssl = NULL; 610 #endif 611 } 612 dtio_close_fd(dtio); 613 614 /* if there is a (partial) message, discard it 615 * we cannot send (the remainder of) it, and a new 616 * connection needs to start with a control frame. */ 617 if(dtio->cur_msg) { 618 dtio_cur_msg_free(dtio); 619 } 620 621 dtio->ready_frame_sent = 0; 622 dtio->accept_frame_received = 0; 623 dtio_read_frame_free(&dtio->read_frame); 624 625 dtio_reconnect_enable(dtio); 626 } 627 628 /** check for pending nonblocking connect errors, 629 * returns 1 if it is okay. -1 on error (close it), 0 to try later */ 630 static int dtio_check_nb_connect(struct dt_io_thread* dtio) 631 { 632 int error = 0; 633 socklen_t len = (socklen_t)sizeof(error); 634 if(!dtio->check_nb_connect) 635 return 1; /* everything okay */ 636 if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error, 637 &len) < 0) { 638 #ifndef USE_WINSOCK 639 error = errno; /* on solaris errno is error */ 640 #else 641 error = WSAGetLastError(); 642 #endif 643 } 644 #ifndef USE_WINSOCK 645 #if defined(EINPROGRESS) && defined(EWOULDBLOCK) 646 if(error == EINPROGRESS || error == EWOULDBLOCK) 647 return 0; /* try again later */ 648 #endif 649 #else 650 if(error == WSAEINPROGRESS) { 651 return 0; /* try again later */ 652 } else if(error == WSAEWOULDBLOCK) { 653 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 654 dtio->stop_flush_event:dtio->event), UB_EV_WRITE); 655 return 0; /* try again later */ 656 } 657 #endif 658 if(error != 0) { 659 char* to = dtio->socket_path; 660 if(!to) to = dtio->ip_str; 661 if(!to) to = ""; 662 #ifndef USE_WINSOCK 663 log_err("dnstap io: failed to connect to \"%s\": %s", 664 to, strerror(error)); 665 #else 666 log_err("dnstap io: failed to connect to \"%s\": %s", 667 to, wsa_strerror(error)); 668 #endif 669 return -1; /* error, close it */ 670 } 671 672 if(dtio->ip_str) 673 verbose(VERB_DETAIL, "dnstap io: connected to %s", 674 dtio->ip_str); 675 else if(dtio->socket_path) 676 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"", 677 dtio->socket_path); 678 dtio_reconnect_clear(dtio); 679 dtio->check_nb_connect = 0; 680 return 1; /* everything okay */ 681 } 682 683 #ifdef HAVE_SSL 684 /** write to ssl output 685 * returns number of bytes written, 0 if nothing happened, 686 * try again later, or -1 if the channel is to be closed. */ 687 static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf, 688 size_t len) 689 { 690 int r; 691 ERR_clear_error(); 692 r = SSL_write(dtio->ssl, buf, len); 693 if(r <= 0) { 694 int want = SSL_get_error(dtio->ssl, r); 695 if(want == SSL_ERROR_ZERO_RETURN) { 696 /* closed */ 697 return -1; 698 } else if(want == SSL_ERROR_WANT_READ) { 699 /* we want a brief read event */ 700 dtio_enable_brief_read(dtio); 701 return 0; 702 } else if(want == SSL_ERROR_WANT_WRITE) { 703 /* write again later */ 704 return 0; 705 } else if(want == SSL_ERROR_SYSCALL) { 706 #ifdef EPIPE 707 if(errno == EPIPE && verbosity < 2) 708 return -1; /* silence 'broken pipe' */ 709 #endif 710 #ifdef ECONNRESET 711 if(errno == ECONNRESET && verbosity < 2) 712 return -1; /* silence reset by peer */ 713 #endif 714 if(errno != 0) { 715 log_err("dnstap io, SSL_write syscall: %s", 716 strerror(errno)); 717 } 718 return -1; 719 } 720 log_crypto_err("dnstap io, could not SSL_write"); 721 return -1; 722 } 723 return r; 724 } 725 #endif /* HAVE_SSL */ 726 727 /** write buffer to output. 728 * returns number of bytes written, 0 if nothing happened, 729 * try again later, or -1 if the channel is to be closed. */ 730 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf, 731 size_t len) 732 { 733 ssize_t ret; 734 if(dtio->fd == -1) 735 return -1; 736 #ifdef HAVE_SSL 737 if(dtio->ssl) 738 return dtio_write_ssl(dtio, buf, len); 739 #endif 740 ret = send(dtio->fd, (void*)buf, len, 0); 741 if(ret == -1) { 742 #ifndef USE_WINSOCK 743 if(errno == EINTR || errno == EAGAIN) 744 return 0; 745 log_err("dnstap io: failed send: %s", strerror(errno)); 746 #else 747 if(WSAGetLastError() == WSAEINPROGRESS) 748 return 0; 749 if(WSAGetLastError() == WSAEWOULDBLOCK) { 750 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 751 dtio->stop_flush_event:dtio->event), 752 UB_EV_WRITE); 753 return 0; 754 } 755 log_err("dnstap io: failed send: %s", 756 wsa_strerror(WSAGetLastError())); 757 #endif 758 return -1; 759 } 760 return ret; 761 } 762 763 #ifdef HAVE_WRITEV 764 /** write with writev, len and message, in one write, if possible. 765 * return true if message is done, false if incomplete */ 766 static int dtio_write_with_writev(struct dt_io_thread* dtio) 767 { 768 uint32_t sendlen = htonl(dtio->cur_msg_len); 769 struct iovec iov[2]; 770 ssize_t r; 771 iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done; 772 iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done; 773 iov[1].iov_base = dtio->cur_msg; 774 iov[1].iov_len = dtio->cur_msg_len; 775 log_assert(iov[0].iov_len > 0); 776 r = writev(dtio->fd, iov, 2); 777 if(r == -1) { 778 #ifndef USE_WINSOCK 779 if(errno == EINTR || errno == EAGAIN) 780 return 0; 781 log_err("dnstap io: failed writev: %s", strerror(errno)); 782 #else 783 if(WSAGetLastError() == WSAEINPROGRESS) 784 return 0; 785 if(WSAGetLastError() == WSAEWOULDBLOCK) { 786 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 787 dtio->stop_flush_event:dtio->event), 788 UB_EV_WRITE); 789 return 0; 790 } 791 log_err("dnstap io: failed writev: %s", 792 wsa_strerror(WSAGetLastError())); 793 #endif 794 /* close the channel */ 795 dtio_del_output_event(dtio); 796 dtio_close_output(dtio); 797 return 0; 798 } 799 /* written r bytes */ 800 dtio->cur_msg_len_done += r; 801 if(dtio->cur_msg_len_done < 4) 802 return 0; 803 if(dtio->cur_msg_len_done > 4) { 804 dtio->cur_msg_done = dtio->cur_msg_len_done-4; 805 dtio->cur_msg_len_done = 4; 806 } 807 if(dtio->cur_msg_done < dtio->cur_msg_len) 808 return 0; 809 return 1; 810 } 811 #endif /* HAVE_WRITEV */ 812 813 /** write more of the length, preceding the data frame. 814 * return true if message is done, false if incomplete. */ 815 static int dtio_write_more_of_len(struct dt_io_thread* dtio) 816 { 817 uint32_t sendlen; 818 int r; 819 if(dtio->cur_msg_len_done >= 4) 820 return 1; 821 #ifdef HAVE_WRITEV 822 if(!dtio->ssl) { 823 /* we try writev for everything.*/ 824 return dtio_write_with_writev(dtio); 825 } 826 #endif /* HAVE_WRITEV */ 827 sendlen = htonl(dtio->cur_msg_len); 828 r = dtio_write_buf(dtio, 829 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done, 830 sizeof(sendlen)-dtio->cur_msg_len_done); 831 if(r == -1) { 832 /* close the channel */ 833 dtio_del_output_event(dtio); 834 dtio_close_output(dtio); 835 return 0; 836 } else if(r == 0) { 837 /* try again later */ 838 return 0; 839 } 840 dtio->cur_msg_len_done += r; 841 if(dtio->cur_msg_len_done < 4) 842 return 0; 843 return 1; 844 } 845 846 /** write more of the data frame. 847 * return true if message is done, false if incomplete. */ 848 static int dtio_write_more_of_data(struct dt_io_thread* dtio) 849 { 850 int r; 851 if(dtio->cur_msg_done >= dtio->cur_msg_len) 852 return 1; 853 r = dtio_write_buf(dtio, 854 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done, 855 dtio->cur_msg_len - dtio->cur_msg_done); 856 if(r == -1) { 857 /* close the channel */ 858 dtio_del_output_event(dtio); 859 dtio_close_output(dtio); 860 return 0; 861 } else if(r == 0) { 862 /* try again later */ 863 return 0; 864 } 865 dtio->cur_msg_done += r; 866 if(dtio->cur_msg_done < dtio->cur_msg_len) 867 return 0; 868 return 1; 869 } 870 871 /** write more of the current messsage. false if incomplete, true if 872 * the message is done */ 873 static int dtio_write_more(struct dt_io_thread* dtio) 874 { 875 if(dtio->cur_msg_len_done < 4) { 876 if(!dtio_write_more_of_len(dtio)) 877 return 0; 878 } 879 if(dtio->cur_msg_done < dtio->cur_msg_len) { 880 if(!dtio_write_more_of_data(dtio)) 881 return 0; 882 } 883 return 1; 884 } 885 886 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed, 887 * -1: continue, >0: number of bytes read into buffer */ 888 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) { 889 ssize_t r; 890 r = recv(dtio->fd, (void*)buf, len, 0); 891 if(r == -1) { 892 char* to = dtio->socket_path; 893 if(!to) to = dtio->ip_str; 894 if(!to) to = ""; 895 #ifndef USE_WINSOCK 896 if(errno == EINTR || errno == EAGAIN) 897 return -1; /* try later */ 898 #else 899 if(WSAGetLastError() == WSAEINPROGRESS) { 900 return -1; /* try later */ 901 } else if(WSAGetLastError() == WSAEWOULDBLOCK) { 902 ub_winsock_tcp_wouldblock( 903 (dtio->stop_flush_event? 904 dtio->stop_flush_event:dtio->event), 905 UB_EV_READ); 906 return -1; /* try later */ 907 } 908 #endif 909 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 910 verbosity < 4) 911 return 0; /* no log retries on low verbosity */ 912 log_err("dnstap io: output closed, recv %s: %s", to, 913 strerror(errno)); 914 /* and close below */ 915 return 0; 916 } 917 if(r == 0) { 918 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 919 verbosity < 4) 920 return 0; /* no log retries on low verbosity */ 921 verbose(VERB_DETAIL, "dnstap io: output closed by the other side"); 922 /* and close below */ 923 return 0; 924 } 925 /* something was received */ 926 return r; 927 } 928 929 #ifdef HAVE_SSL 930 /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed, 931 * -1: continue, >0: number of bytes read into buffer */ 932 static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len) 933 { 934 int r; 935 ERR_clear_error(); 936 r = SSL_read(dtio->ssl, buf, len); 937 if(r <= 0) { 938 int want = SSL_get_error(dtio->ssl, r); 939 if(want == SSL_ERROR_ZERO_RETURN) { 940 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 941 verbosity < 4) 942 return 0; /* no log retries on low verbosity */ 943 verbose(VERB_DETAIL, "dnstap io: output closed by the " 944 "other side"); 945 return 0; 946 } else if(want == SSL_ERROR_WANT_READ) { 947 /* continue later */ 948 return -1; 949 } else if(want == SSL_ERROR_WANT_WRITE) { 950 (void)dtio_enable_brief_write(dtio); 951 return -1; 952 } else if(want == SSL_ERROR_SYSCALL) { 953 #ifdef ECONNRESET 954 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 955 errno == ECONNRESET && verbosity < 4) 956 return 0; /* silence reset by peer */ 957 #endif 958 if(errno != 0) 959 log_err("SSL_read syscall: %s", 960 strerror(errno)); 961 verbose(VERB_DETAIL, "dnstap io: output closed by the " 962 "other side"); 963 return 0; 964 } 965 log_crypto_err("could not SSL_read"); 966 verbose(VERB_DETAIL, "dnstap io: output closed by the " 967 "other side"); 968 return 0; 969 } 970 return r; 971 } 972 #endif /* HAVE_SSL */ 973 974 /** check if the output fd has been closed, 975 * it returns false if the stream is closed. */ 976 static int dtio_check_close(struct dt_io_thread* dtio) 977 { 978 /* we don't want to read any packets, but if there are we can 979 * discard the input (ignore it). Ignore of unknown (control) 980 * packets is okay for the framestream protocol. And also, the 981 * read call can return that the stream has been closed by the 982 * other side. */ 983 uint8_t buf[1024]; 984 int r = -1; 985 986 987 if(dtio->fd == -1) return 0; 988 989 while(r != 0) { 990 /* not interested in buffer content, overwrite */ 991 r = receive_bytes(dtio, (void*)buf, sizeof(buf)); 992 if(r == -1) 993 return 1; 994 } 995 /* the other end has been closed */ 996 /* close the channel */ 997 dtio_del_output_event(dtio); 998 dtio_close_output(dtio); 999 return 0; 1000 } 1001 1002 /** Read accept frame. Returns -1: continue reading, 0: closed, 1003 * 1: valid accept received. */ 1004 static int dtio_read_accept_frame(struct dt_io_thread* dtio) 1005 { 1006 int r; 1007 size_t read_frame_done; 1008 while(dtio->read_frame.frame_len_done < 4) { 1009 #ifdef HAVE_SSL 1010 if(dtio->ssl) { 1011 r = ssl_read_bytes(dtio, 1012 (uint8_t*)&dtio->read_frame.frame_len+ 1013 dtio->read_frame.frame_len_done, 1014 4-dtio->read_frame.frame_len_done); 1015 } else { 1016 #endif 1017 r = receive_bytes(dtio, 1018 (uint8_t*)&dtio->read_frame.frame_len+ 1019 dtio->read_frame.frame_len_done, 1020 4-dtio->read_frame.frame_len_done); 1021 #ifdef HAVE_SSL 1022 } 1023 #endif 1024 if(r == -1) 1025 return -1; /* continue reading */ 1026 if(r == 0) { 1027 /* connection closed */ 1028 goto close_connection; 1029 } 1030 dtio->read_frame.frame_len_done += r; 1031 if(dtio->read_frame.frame_len_done < 4) 1032 return -1; /* continue reading */ 1033 1034 if(dtio->read_frame.frame_len == 0) { 1035 dtio->read_frame.frame_len_done = 0; 1036 dtio->read_frame.control_frame = 1; 1037 continue; 1038 } 1039 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len); 1040 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) { 1041 verbose(VERB_OPS, "dnstap: received frame exceeds max " 1042 "length of %d bytes, closing connection", 1043 DTIO_RECV_FRAME_MAX_LEN); 1044 goto close_connection; 1045 } 1046 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len); 1047 dtio->read_frame.buf_cap = dtio->read_frame.frame_len; 1048 if(!dtio->read_frame.buf) { 1049 log_err("dnstap io: out of memory (creating read " 1050 "buffer)"); 1051 goto close_connection; 1052 } 1053 } 1054 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) { 1055 #ifdef HAVE_SSL 1056 if(dtio->ssl) { 1057 r = ssl_read_bytes(dtio, dtio->read_frame.buf+ 1058 dtio->read_frame.buf_count, 1059 dtio->read_frame.buf_cap- 1060 dtio->read_frame.buf_count); 1061 } else { 1062 #endif 1063 r = receive_bytes(dtio, dtio->read_frame.buf+ 1064 dtio->read_frame.buf_count, 1065 dtio->read_frame.buf_cap- 1066 dtio->read_frame.buf_count); 1067 #ifdef HAVE_SSL 1068 } 1069 #endif 1070 if(r == -1) 1071 return -1; /* continue reading */ 1072 if(r == 0) { 1073 /* connection closed */ 1074 goto close_connection; 1075 } 1076 dtio->read_frame.buf_count += r; 1077 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) 1078 return -1; /* continue reading */ 1079 } 1080 1081 /* Complete frame received, check if this is a valid ACCEPT control 1082 * frame. */ 1083 if(dtio->read_frame.frame_len < 4) { 1084 verbose(VERB_OPS, "dnstap: invalid data received"); 1085 goto close_connection; 1086 } 1087 if(sldns_read_uint32(dtio->read_frame.buf) != 1088 FSTRM_CONTROL_FRAME_ACCEPT) { 1089 verbose(VERB_ALGO, "dnstap: invalid control type received, " 1090 "ignored"); 1091 dtio->ready_frame_sent = 0; 1092 dtio->accept_frame_received = 0; 1093 dtio_read_frame_free(&dtio->read_frame); 1094 return -1; 1095 } 1096 read_frame_done = 4; /* control frame type */ 1097 1098 /* Iterate over control fields, ignore unknown types. 1099 * Need to be able to read at least 8 bytes (control field type + 1100 * length). */ 1101 while(read_frame_done+8 < dtio->read_frame.frame_len) { 1102 uint32_t type = sldns_read_uint32(dtio->read_frame.buf + 1103 read_frame_done); 1104 uint32_t len = sldns_read_uint32(dtio->read_frame.buf + 1105 read_frame_done + 4); 1106 if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) { 1107 if(len == strlen(DNSTAP_CONTENT_TYPE) && 1108 read_frame_done+8+len <= 1109 dtio->read_frame.frame_len && 1110 memcmp(dtio->read_frame.buf + read_frame_done + 1111 + 8, DNSTAP_CONTENT_TYPE, len) == 0) { 1112 if(!dtio_control_start_send(dtio)) { 1113 verbose(VERB_OPS, "dnstap io: out of " 1114 "memory while sending START frame"); 1115 goto close_connection; 1116 } 1117 dtio->accept_frame_received = 1; 1118 return 1; 1119 } else { 1120 /* unknow content type */ 1121 verbose(VERB_ALGO, "dnstap: ACCEPT frame " 1122 "contains unknown content type, " 1123 "closing connection"); 1124 goto close_connection; 1125 } 1126 } 1127 /* unknown option, try next */ 1128 read_frame_done += 8+len; 1129 } 1130 1131 1132 close_connection: 1133 dtio_del_output_event(dtio); 1134 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1135 dtio_close_output(dtio); 1136 return 0; 1137 } 1138 1139 /** add the output file descriptor event for listening, read only */ 1140 static int dtio_add_output_event_read(struct dt_io_thread* dtio) 1141 { 1142 if(!dtio->event) 1143 return 0; 1144 if(dtio->event_added && !dtio->event_added_is_write) 1145 return 1; 1146 /* we have to (re-)register the event */ 1147 if(dtio->event_added) 1148 ub_event_del(dtio->event); 1149 ub_event_del_bits(dtio->event, UB_EV_WRITE); 1150 if(ub_event_add(dtio->event, NULL) != 0) { 1151 log_err("dnstap io: out of memory (adding event)"); 1152 dtio->event_added = 0; 1153 dtio->event_added_is_write = 0; 1154 /* close output and start reattempts to open it */ 1155 dtio_close_output(dtio); 1156 return 0; 1157 } 1158 dtio->event_added = 1; 1159 dtio->event_added_is_write = 0; 1160 return 1; 1161 } 1162 1163 /** add the output file descriptor event for listening, read and write */ 1164 static int dtio_add_output_event_write(struct dt_io_thread* dtio) 1165 { 1166 if(!dtio->event) 1167 return 0; 1168 if(dtio->event_added && dtio->event_added_is_write) 1169 return 1; 1170 /* we have to (re-)register the event */ 1171 if(dtio->event_added) 1172 ub_event_del(dtio->event); 1173 ub_event_add_bits(dtio->event, UB_EV_WRITE); 1174 if(ub_event_add(dtio->event, NULL) != 0) { 1175 log_err("dnstap io: out of memory (adding event)"); 1176 dtio->event_added = 0; 1177 dtio->event_added_is_write = 0; 1178 /* close output and start reattempts to open it */ 1179 dtio_close_output(dtio); 1180 return 0; 1181 } 1182 dtio->event_added = 1; 1183 dtio->event_added_is_write = 1; 1184 return 1; 1185 } 1186 1187 /** put the dtio thread to sleep */ 1188 static void dtio_sleep(struct dt_io_thread* dtio) 1189 { 1190 /* unregister the event polling for write, because there is 1191 * nothing to be written */ 1192 (void)dtio_add_output_event_read(dtio); 1193 } 1194 1195 #ifdef HAVE_SSL 1196 /** enable the brief read condition */ 1197 static int dtio_enable_brief_read(struct dt_io_thread* dtio) 1198 { 1199 dtio->ssl_brief_read = 1; 1200 if(dtio->stop_flush_event) { 1201 ub_event_del(dtio->stop_flush_event); 1202 ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE); 1203 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 1204 log_err("dnstap io, stop flush, could not ub_event_add"); 1205 return 0; 1206 } 1207 return 1; 1208 } 1209 return dtio_add_output_event_read(dtio); 1210 } 1211 #endif /* HAVE_SSL */ 1212 1213 #ifdef HAVE_SSL 1214 /** disable the brief read condition */ 1215 static int dtio_disable_brief_read(struct dt_io_thread* dtio) 1216 { 1217 dtio->ssl_brief_read = 0; 1218 if(dtio->stop_flush_event) { 1219 ub_event_del(dtio->stop_flush_event); 1220 ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE); 1221 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 1222 log_err("dnstap io, stop flush, could not ub_event_add"); 1223 return 0; 1224 } 1225 return 1; 1226 } 1227 return dtio_add_output_event_write(dtio); 1228 } 1229 #endif /* HAVE_SSL */ 1230 1231 #ifdef HAVE_SSL 1232 /** enable the brief write condition */ 1233 static int dtio_enable_brief_write(struct dt_io_thread* dtio) 1234 { 1235 dtio->ssl_brief_write = 1; 1236 return dtio_add_output_event_write(dtio); 1237 } 1238 #endif /* HAVE_SSL */ 1239 1240 #ifdef HAVE_SSL 1241 /** disable the brief write condition */ 1242 static int dtio_disable_brief_write(struct dt_io_thread* dtio) 1243 { 1244 dtio->ssl_brief_write = 0; 1245 return dtio_add_output_event_read(dtio); 1246 } 1247 #endif /* HAVE_SSL */ 1248 1249 #ifdef HAVE_SSL 1250 /** check peer verification after ssl handshake connection, false if closed*/ 1251 static int dtio_ssl_check_peer(struct dt_io_thread* dtio) 1252 { 1253 if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) { 1254 /* verification */ 1255 if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) { 1256 X509* x = SSL_get_peer_certificate(dtio->ssl); 1257 if(!x) { 1258 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1259 "connection failed no certificate", 1260 dtio->ip_str); 1261 return 0; 1262 } 1263 log_cert(VERB_ALGO, "dnstap io, peer certificate", 1264 x); 1265 #ifdef HAVE_SSL_GET0_PEERNAME 1266 if(SSL_get0_peername(dtio->ssl)) { 1267 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1268 "connection to %s authenticated", 1269 dtio->ip_str, 1270 SSL_get0_peername(dtio->ssl)); 1271 } else { 1272 #endif 1273 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1274 "connection authenticated", 1275 dtio->ip_str); 1276 #ifdef HAVE_SSL_GET0_PEERNAME 1277 } 1278 #endif 1279 X509_free(x); 1280 } else { 1281 X509* x = SSL_get_peer_certificate(dtio->ssl); 1282 if(x) { 1283 log_cert(VERB_ALGO, "dnstap io, peer " 1284 "certificate", x); 1285 X509_free(x); 1286 } 1287 verbose(VERB_ALGO, "dnstap io, %s, SSL connection " 1288 "failed: failed to authenticate", 1289 dtio->ip_str); 1290 return 0; 1291 } 1292 } else { 1293 /* unauthenticated, the verify peer flag was not set 1294 * in ssl when the ssl object was created from ssl_ctx */ 1295 verbose(VERB_ALGO, "dnstap io, %s, SSL connection", 1296 dtio->ip_str); 1297 } 1298 return 1; 1299 } 1300 #endif /* HAVE_SSL */ 1301 1302 #ifdef HAVE_SSL 1303 /** perform ssl handshake, returns 1 if okay, 0 to stop */ 1304 static int dtio_ssl_handshake(struct dt_io_thread* dtio, 1305 struct stop_flush_info* info) 1306 { 1307 int r; 1308 if(dtio->ssl_brief_read) { 1309 /* assume the brief read condition is satisfied, 1310 * if we need more or again, we can set it again */ 1311 if(!dtio_disable_brief_read(dtio)) { 1312 if(info) dtio_stop_flush_exit(info); 1313 return 0; 1314 } 1315 } 1316 if(dtio->ssl_handshake_done) 1317 return 1; 1318 1319 ERR_clear_error(); 1320 r = SSL_do_handshake(dtio->ssl); 1321 if(r != 1) { 1322 int want = SSL_get_error(dtio->ssl, r); 1323 if(want == SSL_ERROR_WANT_READ) { 1324 /* we want to read on the connection */ 1325 if(!dtio_enable_brief_read(dtio)) { 1326 if(info) dtio_stop_flush_exit(info); 1327 return 0; 1328 } 1329 return 0; 1330 } else if(want == SSL_ERROR_WANT_WRITE) { 1331 /* we want to write on the connection */ 1332 return 0; 1333 } else if(r == 0) { 1334 /* closed */ 1335 if(info) dtio_stop_flush_exit(info); 1336 dtio_del_output_event(dtio); 1337 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1338 dtio_close_output(dtio); 1339 return 0; 1340 } else if(want == SSL_ERROR_SYSCALL) { 1341 /* SYSCALL and errno==0 means closed uncleanly */ 1342 int silent = 0; 1343 #ifdef EPIPE 1344 if(errno == EPIPE && verbosity < 2) 1345 silent = 1; /* silence 'broken pipe' */ 1346 #endif 1347 #ifdef ECONNRESET 1348 if(errno == ECONNRESET && verbosity < 2) 1349 silent = 1; /* silence reset by peer */ 1350 #endif 1351 if(errno == 0) 1352 silent = 1; 1353 if(!silent) 1354 log_err("dnstap io, SSL_handshake syscall: %s", 1355 strerror(errno)); 1356 /* closed */ 1357 if(info) dtio_stop_flush_exit(info); 1358 dtio_del_output_event(dtio); 1359 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1360 dtio_close_output(dtio); 1361 return 0; 1362 } else { 1363 unsigned long err = ERR_get_error(); 1364 if(!squelch_err_ssl_handshake(err)) { 1365 log_crypto_err_code("dnstap io, ssl handshake failed", 1366 err); 1367 verbose(VERB_OPS, "dnstap io, ssl handshake failed " 1368 "from %s", dtio->ip_str); 1369 } 1370 /* closed */ 1371 if(info) dtio_stop_flush_exit(info); 1372 dtio_del_output_event(dtio); 1373 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1374 dtio_close_output(dtio); 1375 return 0; 1376 } 1377 1378 } 1379 /* check peer verification */ 1380 dtio->ssl_handshake_done = 1; 1381 1382 if(!dtio_ssl_check_peer(dtio)) { 1383 /* closed */ 1384 if(info) dtio_stop_flush_exit(info); 1385 dtio_del_output_event(dtio); 1386 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1387 dtio_close_output(dtio); 1388 return 0; 1389 } 1390 return 1; 1391 } 1392 #endif /* HAVE_SSL */ 1393 1394 /** callback for the dnstap events, to write to the output */ 1395 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg) 1396 { 1397 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 1398 int i; 1399 1400 if(dtio->check_nb_connect) { 1401 int connect_err = dtio_check_nb_connect(dtio); 1402 if(connect_err == -1) { 1403 /* close the channel */ 1404 dtio_del_output_event(dtio); 1405 dtio_close_output(dtio); 1406 return; 1407 } else if(connect_err == 0) { 1408 /* try again later */ 1409 return; 1410 } 1411 /* nonblocking connect check passed, continue */ 1412 } 1413 1414 #ifdef HAVE_SSL 1415 if(dtio->ssl && 1416 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 1417 if(!dtio_ssl_handshake(dtio, NULL)) 1418 return; 1419 } 1420 #endif 1421 1422 if((bits&UB_EV_READ || dtio->ssl_brief_write)) { 1423 if(dtio->ssl_brief_write) 1424 (void)dtio_disable_brief_write(dtio); 1425 if(dtio->ready_frame_sent && !dtio->accept_frame_received) { 1426 if(dtio_read_accept_frame(dtio) <= 0) 1427 return; 1428 } else if(!dtio_check_close(dtio)) 1429 return; 1430 } 1431 1432 /* loop to process a number of messages. This improves throughput, 1433 * because selecting on write-event if not needed for busy messages 1434 * (dnstap log) generation and if they need to all be written back. 1435 * The write event is usually not blocked up. But not forever, 1436 * because the event loop needs to stay responsive for other events. 1437 * If there are no (more) messages, or if the output buffers get 1438 * full, it returns out of the loop. */ 1439 for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) { 1440 /* see if there are messages that need writing */ 1441 if(!dtio->cur_msg) { 1442 if(!dtio_find_msg(dtio)) { 1443 if(i == 0) { 1444 /* no messages on the first iteration, 1445 * the queues are all empty */ 1446 dtio_sleep(dtio); 1447 } 1448 return; /* nothing to do */ 1449 } 1450 } 1451 1452 /* write it */ 1453 if(dtio->cur_msg_done < dtio->cur_msg_len) { 1454 if(!dtio_write_more(dtio)) 1455 return; 1456 } 1457 1458 /* done with the current message */ 1459 dtio_cur_msg_free(dtio); 1460 1461 /* If this is a bidirectional stream the first message will be 1462 * the READY control frame. We can only continue writing after 1463 * receiving an ACCEPT control frame. */ 1464 if(dtio->is_bidirectional && !dtio->ready_frame_sent) { 1465 dtio->ready_frame_sent = 1; 1466 (void)dtio_add_output_event_read(dtio); 1467 break; 1468 } 1469 } 1470 } 1471 1472 /** callback for the dnstap commandpipe, to stop the dnstap IO */ 1473 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) 1474 { 1475 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 1476 uint8_t cmd; 1477 ssize_t r; 1478 if(dtio->want_to_exit) 1479 return; 1480 r = read(fd, &cmd, sizeof(cmd)); 1481 if(r == -1) { 1482 #ifndef USE_WINSOCK 1483 if(errno == EINTR || errno == EAGAIN) 1484 return; /* ignore this */ 1485 log_err("dnstap io: failed to read: %s", strerror(errno)); 1486 #else 1487 if(WSAGetLastError() == WSAEINPROGRESS) 1488 return; 1489 if(WSAGetLastError() == WSAEWOULDBLOCK) 1490 return; 1491 log_err("dnstap io: failed to read: %s", 1492 wsa_strerror(WSAGetLastError())); 1493 #endif 1494 /* and then fall through to quit the thread */ 1495 } else if(r == 0) { 1496 verbose(VERB_ALGO, "dnstap io: cmd channel closed"); 1497 } else if(r == 1 && cmd == DTIO_COMMAND_STOP) { 1498 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); 1499 } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) { 1500 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup"); 1501 1502 if(dtio->is_bidirectional && !dtio->accept_frame_received) { 1503 verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, " 1504 "waiting for ACCEPT control frame"); 1505 return; 1506 } 1507 1508 /* reregister event */ 1509 if(!dtio_add_output_event_write(dtio)) 1510 return; 1511 return; 1512 } else if(r == 1) { 1513 verbose(VERB_ALGO, "dnstap io: cmd channel unknown command"); 1514 } 1515 dtio->want_to_exit = 1; 1516 if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base) 1517 != 0) { 1518 log_err("dnstap io: could not loopexit"); 1519 } 1520 } 1521 1522 #ifndef THREADS_DISABLED 1523 /** setup the event base for the dnstap io thread */ 1524 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs, 1525 struct timeval* now) 1526 { 1527 memset(now, 0, sizeof(*now)); 1528 dtio->event_base = ub_default_event_base(0, secs, now); 1529 if(!dtio->event_base) { 1530 fatal_exit("dnstap io: could not create event_base"); 1531 } 1532 } 1533 #endif /* THREADS_DISABLED */ 1534 1535 /** setup the cmd event for dnstap io */ 1536 static void dtio_setup_cmd(struct dt_io_thread* dtio) 1537 { 1538 struct ub_event* cmdev; 1539 fd_set_nonblock(dtio->commandpipe[0]); 1540 cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0], 1541 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio); 1542 if(!cmdev) { 1543 fatal_exit("dnstap io: out of memory"); 1544 } 1545 dtio->command_event = cmdev; 1546 if(ub_event_add(cmdev, NULL) != 0) { 1547 fatal_exit("dnstap io: out of memory (adding event)"); 1548 } 1549 } 1550 1551 /** setup the reconnect event for dnstap io */ 1552 static void dtio_setup_reconnect(struct dt_io_thread* dtio) 1553 { 1554 dtio_reconnect_clear(dtio); 1555 dtio->reconnect_timer = ub_event_new(dtio->event_base, -1, 1556 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio); 1557 if(!dtio->reconnect_timer) { 1558 fatal_exit("dnstap io: out of memory"); 1559 } 1560 } 1561 1562 /** 1563 * structure to keep track of information during stop flush 1564 */ 1565 struct stop_flush_info { 1566 /** the event base during stop flush */ 1567 struct ub_event_base* base; 1568 /** did we already want to exit this stop-flush event base */ 1569 int want_to_exit_flush; 1570 /** has the timer fired */ 1571 int timer_done; 1572 /** the dtio */ 1573 struct dt_io_thread* dtio; 1574 /** the stop control frame */ 1575 void* stop_frame; 1576 /** length of the stop frame */ 1577 size_t stop_frame_len; 1578 /** how much we have done of the stop frame */ 1579 size_t stop_frame_done; 1580 }; 1581 1582 /** exit the stop flush base */ 1583 static void dtio_stop_flush_exit(struct stop_flush_info* info) 1584 { 1585 if(info->want_to_exit_flush) 1586 return; 1587 info->want_to_exit_flush = 1; 1588 if(ub_event_base_loopexit(info->base) != 0) { 1589 log_err("dnstap io: could not loopexit"); 1590 } 1591 } 1592 1593 /** send the stop control, 1594 * return true if completed the frame. */ 1595 static int dtio_control_stop_send(struct stop_flush_info* info) 1596 { 1597 struct dt_io_thread* dtio = info->dtio; 1598 int r; 1599 if(info->stop_frame_done >= info->stop_frame_len) 1600 return 1; 1601 r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) + 1602 info->stop_frame_done, info->stop_frame_len - 1603 info->stop_frame_done); 1604 if(r == -1) { 1605 verbose(VERB_ALGO, "dnstap io: stop flush: output closed"); 1606 dtio_stop_flush_exit(info); 1607 return 0; 1608 } 1609 if(r == 0) { 1610 /* try again later, or timeout */ 1611 return 0; 1612 } 1613 info->stop_frame_done += r; 1614 if(info->stop_frame_done < info->stop_frame_len) 1615 return 0; /* not done yet */ 1616 return 1; 1617 } 1618 1619 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), 1620 void* arg) 1621 { 1622 struct stop_flush_info* info = (struct stop_flush_info*)arg; 1623 if(info->want_to_exit_flush) 1624 return; 1625 verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush"); 1626 info->timer_done = 1; 1627 dtio_stop_flush_exit(info); 1628 } 1629 1630 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg) 1631 { 1632 struct stop_flush_info* info = (struct stop_flush_info*)arg; 1633 struct dt_io_thread* dtio = info->dtio; 1634 if(info->want_to_exit_flush) 1635 return; 1636 if(dtio->check_nb_connect) { 1637 /* we don't start the stop_flush if connect still 1638 * in progress, but the check code is here, just in case */ 1639 int connect_err = dtio_check_nb_connect(dtio); 1640 if(connect_err == -1) { 1641 /* close the channel, exit the stop flush */ 1642 dtio_stop_flush_exit(info); 1643 dtio_del_output_event(dtio); 1644 dtio_close_output(dtio); 1645 return; 1646 } else if(connect_err == 0) { 1647 /* try again later */ 1648 return; 1649 } 1650 /* nonblocking connect check passed, continue */ 1651 } 1652 #ifdef HAVE_SSL 1653 if(dtio->ssl && 1654 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 1655 if(!dtio_ssl_handshake(dtio, info)) 1656 return; 1657 } 1658 #endif 1659 1660 if((bits&UB_EV_READ)) { 1661 if(!dtio_check_close(dtio)) { 1662 if(dtio->fd == -1) { 1663 verbose(VERB_ALGO, "dnstap io: " 1664 "stop flush: output closed"); 1665 dtio_stop_flush_exit(info); 1666 } 1667 return; 1668 } 1669 } 1670 /* write remainder of last frame */ 1671 if(dtio->cur_msg) { 1672 if(dtio->cur_msg_done < dtio->cur_msg_len) { 1673 if(!dtio_write_more(dtio)) { 1674 if(dtio->fd == -1) { 1675 verbose(VERB_ALGO, "dnstap io: " 1676 "stop flush: output closed"); 1677 dtio_stop_flush_exit(info); 1678 } 1679 return; 1680 } 1681 } 1682 verbose(VERB_ALGO, "dnstap io: stop flush completed " 1683 "last frame"); 1684 dtio_cur_msg_free(dtio); 1685 } 1686 /* write stop frame */ 1687 if(info->stop_frame_done < info->stop_frame_len) { 1688 if(!dtio_control_stop_send(info)) 1689 return; 1690 verbose(VERB_ALGO, "dnstap io: stop flush completed " 1691 "stop control frame"); 1692 } 1693 /* when last frame and stop frame are sent, exit */ 1694 dtio_stop_flush_exit(info); 1695 } 1696 1697 /** flush at end, last packet and stop control */ 1698 static void dtio_control_stop_flush(struct dt_io_thread* dtio) 1699 { 1700 /* briefly attempt to flush the previous packet to the output, 1701 * this could be a partial packet, or even the start control frame */ 1702 time_t secs = 0; 1703 struct timeval now; 1704 struct stop_flush_info info; 1705 struct timeval tv; 1706 struct ub_event* timer, *stopev; 1707 1708 if(dtio->fd == -1 || dtio->check_nb_connect) { 1709 /* no connection or we have just connected, so nothing is 1710 * sent yet, so nothing to stop or flush */ 1711 return; 1712 } 1713 if(dtio->ssl && !dtio->ssl_handshake_done) { 1714 /* no SSL connection has been established yet */ 1715 return; 1716 } 1717 1718 memset(&info, 0, sizeof(info)); 1719 memset(&now, 0, sizeof(now)); 1720 info.dtio = dtio; 1721 info.base = ub_default_event_base(0, &secs, &now); 1722 if(!info.base) { 1723 log_err("dnstap io: malloc failure"); 1724 return; 1725 } 1726 timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT, 1727 &dtio_stop_timer_cb, &info); 1728 if(!timer) { 1729 log_err("dnstap io: malloc failure"); 1730 ub_event_base_free(info.base); 1731 return; 1732 } 1733 memset(&tv, 0, sizeof(tv)); 1734 tv.tv_sec = 2; 1735 if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info, 1736 &tv) != 0) { 1737 log_err("dnstap io: cannot event_timer_add"); 1738 ub_event_free(timer); 1739 ub_event_base_free(info.base); 1740 return; 1741 } 1742 stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ | 1743 UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info); 1744 if(!stopev) { 1745 log_err("dnstap io: malloc failure"); 1746 ub_timer_del(timer); 1747 ub_event_free(timer); 1748 ub_event_base_free(info.base); 1749 return; 1750 } 1751 if(ub_event_add(stopev, NULL) != 0) { 1752 log_err("dnstap io: cannot event_add"); 1753 ub_event_free(stopev); 1754 ub_timer_del(timer); 1755 ub_event_free(timer); 1756 ub_event_base_free(info.base); 1757 return; 1758 } 1759 info.stop_frame = fstrm_create_control_frame_stop( 1760 &info.stop_frame_len); 1761 if(!info.stop_frame) { 1762 log_err("dnstap io: malloc failure"); 1763 ub_event_del(stopev); 1764 ub_event_free(stopev); 1765 ub_timer_del(timer); 1766 ub_event_free(timer); 1767 ub_event_base_free(info.base); 1768 return; 1769 } 1770 dtio->stop_flush_event = stopev; 1771 1772 /* wait briefly, or until finished */ 1773 verbose(VERB_ALGO, "dnstap io: stop flush started"); 1774 if(ub_event_base_dispatch(info.base) < 0) { 1775 log_err("dnstap io: dispatch flush failed, errno is %s", 1776 strerror(errno)); 1777 } 1778 verbose(VERB_ALGO, "dnstap io: stop flush ended"); 1779 free(info.stop_frame); 1780 dtio->stop_flush_event = NULL; 1781 ub_event_del(stopev); 1782 ub_event_free(stopev); 1783 ub_timer_del(timer); 1784 ub_event_free(timer); 1785 ub_event_base_free(info.base); 1786 } 1787 1788 /** perform desetup and free stuff when the dnstap io thread exits */ 1789 static void dtio_desetup(struct dt_io_thread* dtio) 1790 { 1791 dtio_control_stop_flush(dtio); 1792 dtio_del_output_event(dtio); 1793 dtio_close_output(dtio); 1794 ub_event_del(dtio->command_event); 1795 ub_event_free(dtio->command_event); 1796 #ifndef USE_WINSOCK 1797 close(dtio->commandpipe[0]); 1798 #else 1799 _close(dtio->commandpipe[0]); 1800 #endif 1801 dtio->commandpipe[0] = -1; 1802 dtio_reconnect_del(dtio); 1803 ub_event_free(dtio->reconnect_timer); 1804 dtio_cur_msg_free(dtio); 1805 #ifndef THREADS_DISABLED 1806 ub_event_base_free(dtio->event_base); 1807 #endif 1808 } 1809 1810 /** setup a start control message */ 1811 static int dtio_control_start_send(struct dt_io_thread* dtio) 1812 { 1813 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 1814 dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE, 1815 &dtio->cur_msg_len); 1816 if(!dtio->cur_msg) { 1817 return 0; 1818 } 1819 /* setup to send the control message */ 1820 /* set that the buffer needs to be sent, but the length 1821 * of that buffer is already written, that way the buffer can 1822 * start with 0 length and then the length of the control frame 1823 * in it */ 1824 dtio->cur_msg_done = 0; 1825 dtio->cur_msg_len_done = 4; 1826 return 1; 1827 } 1828 1829 /** setup a ready control message */ 1830 static int dtio_control_ready_send(struct dt_io_thread* dtio) 1831 { 1832 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 1833 dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE, 1834 &dtio->cur_msg_len); 1835 if(!dtio->cur_msg) { 1836 return 0; 1837 } 1838 /* setup to send the control message */ 1839 /* set that the buffer needs to be sent, but the length 1840 * of that buffer is already written, that way the buffer can 1841 * start with 0 length and then the length of the control frame 1842 * in it */ 1843 dtio->cur_msg_done = 0; 1844 dtio->cur_msg_len_done = 4; 1845 return 1; 1846 } 1847 1848 /** open the output file descriptor for af_local */ 1849 static int dtio_open_output_local(struct dt_io_thread* dtio) 1850 { 1851 #ifdef HAVE_SYS_UN_H 1852 struct sockaddr_un s; 1853 dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0); 1854 if(dtio->fd == -1) { 1855 #ifndef USE_WINSOCK 1856 log_err("dnstap io: failed to create socket: %s", 1857 strerror(errno)); 1858 #else 1859 log_err("dnstap io: failed to create socket: %s", 1860 wsa_strerror(WSAGetLastError())); 1861 #endif 1862 return 0; 1863 } 1864 memset(&s, 0, sizeof(s)); 1865 #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN 1866 /* this member exists on BSDs, not Linux */ 1867 s.sun_len = (unsigned)sizeof(s); 1868 #endif 1869 s.sun_family = AF_LOCAL; 1870 /* length is 92-108, 104 on FreeBSD */ 1871 (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); 1872 fd_set_nonblock(dtio->fd); 1873 if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) 1874 == -1) { 1875 char* to = dtio->socket_path; 1876 #ifndef USE_WINSOCK 1877 log_err("dnstap io: failed to connect to \"%s\": %s", 1878 to, strerror(errno)); 1879 #else 1880 log_err("dnstap io: failed to connect to \"%s\": %s", 1881 to, wsa_strerror(WSAGetLastError())); 1882 #endif 1883 dtio_close_fd(dtio); 1884 return 0; 1885 } 1886 return 1; 1887 #else 1888 log_err("cannot create af_local socket"); 1889 return 0; 1890 #endif /* HAVE_SYS_UN_H */ 1891 } 1892 1893 /** open the output file descriptor for af_inet and af_inet6 */ 1894 static int dtio_open_output_tcp(struct dt_io_thread* dtio) 1895 { 1896 struct sockaddr_storage addr; 1897 socklen_t addrlen; 1898 memset(&addr, 0, sizeof(addr)); 1899 addrlen = (socklen_t)sizeof(addr); 1900 1901 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) { 1902 log_err("could not parse IP '%s'", dtio->ip_str); 1903 return 0; 1904 } 1905 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0); 1906 if(dtio->fd == -1) { 1907 #ifndef USE_WINSOCK 1908 log_err("can't create socket: %s", strerror(errno)); 1909 #else 1910 log_err("can't create socket: %s", 1911 wsa_strerror(WSAGetLastError())); 1912 #endif 1913 return 0; 1914 } 1915 fd_set_nonblock(dtio->fd); 1916 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) { 1917 if(errno == EINPROGRESS) 1918 return 1; /* wait until connect done*/ 1919 #ifndef USE_WINSOCK 1920 if(tcp_connect_errno_needs_log( 1921 (struct sockaddr *)&addr, addrlen)) { 1922 log_err("dnstap io: failed to connect to %s: %s", 1923 dtio->ip_str, strerror(errno)); 1924 } 1925 #else 1926 if(WSAGetLastError() == WSAEINPROGRESS || 1927 WSAGetLastError() == WSAEWOULDBLOCK) 1928 return 1; /* wait until connect done*/ 1929 if(tcp_connect_errno_needs_log( 1930 (struct sockaddr *)&addr, addrlen)) { 1931 log_err("dnstap io: failed to connect to %s: %s", 1932 dtio->ip_str, wsa_strerror(WSAGetLastError())); 1933 } 1934 #endif 1935 dtio_close_fd(dtio); 1936 return 0; 1937 } 1938 return 1; 1939 } 1940 1941 /** setup the SSL structure for new connection */ 1942 static int dtio_setup_ssl(struct dt_io_thread* dtio) 1943 { 1944 dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd); 1945 if(!dtio->ssl) return 0; 1946 dtio->ssl_handshake_done = 0; 1947 dtio->ssl_brief_read = 0; 1948 1949 if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name, 1950 dtio->tls_use_sni)) { 1951 return 0; 1952 } 1953 return 1; 1954 } 1955 1956 /** open the output file descriptor */ 1957 static void dtio_open_output(struct dt_io_thread* dtio) 1958 { 1959 struct ub_event* ev; 1960 if(dtio->upstream_is_unix) { 1961 if(!dtio_open_output_local(dtio)) { 1962 dtio_reconnect_enable(dtio); 1963 return; 1964 } 1965 } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 1966 if(!dtio_open_output_tcp(dtio)) { 1967 dtio_reconnect_enable(dtio); 1968 return; 1969 } 1970 if(dtio->upstream_is_tls) { 1971 if(!dtio_setup_ssl(dtio)) { 1972 dtio_close_fd(dtio); 1973 dtio_reconnect_enable(dtio); 1974 return; 1975 } 1976 } 1977 } 1978 dtio->check_nb_connect = 1; 1979 1980 /* the EV_READ is to read ACCEPT control messages, and catch channel 1981 * close. EV_WRITE is to write packets */ 1982 ev = ub_event_new(dtio->event_base, dtio->fd, 1983 UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb, 1984 dtio); 1985 if(!ev) { 1986 log_err("dnstap io: out of memory"); 1987 if(dtio->ssl) { 1988 #ifdef HAVE_SSL 1989 SSL_free(dtio->ssl); 1990 dtio->ssl = NULL; 1991 #endif 1992 } 1993 dtio_close_fd(dtio); 1994 dtio_reconnect_enable(dtio); 1995 return; 1996 } 1997 dtio->event = ev; 1998 1999 /* setup protocol control message to start */ 2000 if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) || 2001 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) { 2002 log_err("dnstap io: out of memory"); 2003 ub_event_free(dtio->event); 2004 dtio->event = NULL; 2005 if(dtio->ssl) { 2006 #ifdef HAVE_SSL 2007 SSL_free(dtio->ssl); 2008 dtio->ssl = NULL; 2009 #endif 2010 } 2011 dtio_close_fd(dtio); 2012 dtio_reconnect_enable(dtio); 2013 return; 2014 } 2015 } 2016 2017 /** perform the setup of the writer thread on the established event_base */ 2018 static void dtio_setup_on_base(struct dt_io_thread* dtio) 2019 { 2020 dtio_setup_cmd(dtio); 2021 dtio_setup_reconnect(dtio); 2022 dtio_open_output(dtio); 2023 if(!dtio_add_output_event_write(dtio)) 2024 return; 2025 } 2026 2027 #ifndef THREADS_DISABLED 2028 /** the IO thread function for the DNSTAP IO */ 2029 static void* dnstap_io(void* arg) 2030 { 2031 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 2032 time_t secs = 0; 2033 struct timeval now; 2034 log_thread_set(&dtio->threadnum); 2035 2036 /* setup */ 2037 verbose(VERB_ALGO, "start dnstap io thread"); 2038 dtio_setup_base(dtio, &secs, &now); 2039 dtio_setup_on_base(dtio); 2040 2041 /* run */ 2042 if(ub_event_base_dispatch(dtio->event_base) < 0) { 2043 log_err("dnstap io: dispatch failed, errno is %s", 2044 strerror(errno)); 2045 } 2046 2047 /* cleanup */ 2048 verbose(VERB_ALGO, "stop dnstap io thread"); 2049 dtio_desetup(dtio); 2050 return NULL; 2051 } 2052 #endif /* THREADS_DISABLED */ 2053 2054 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, 2055 int numworkers) 2056 { 2057 /* set up the thread, can fail */ 2058 #ifndef USE_WINSOCK 2059 if(pipe(dtio->commandpipe) == -1) { 2060 log_err("failed to create pipe: %s", strerror(errno)); 2061 return 0; 2062 } 2063 #else 2064 if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) { 2065 log_err("failed to create _pipe: %s", 2066 wsa_strerror(WSAGetLastError())); 2067 return 0; 2068 } 2069 #endif 2070 2071 /* start the thread */ 2072 dtio->threadnum = numworkers+1; 2073 dtio->started = 1; 2074 #ifndef THREADS_DISABLED 2075 ub_thread_create(&dtio->tid, dnstap_io, dtio); 2076 (void)event_base_nothr; 2077 #else 2078 dtio->event_base = event_base_nothr; 2079 dtio_setup_on_base(dtio); 2080 #endif 2081 return 1; 2082 } 2083 2084 void dt_io_thread_stop(struct dt_io_thread* dtio) 2085 { 2086 #ifndef THREADS_DISABLED 2087 uint8_t cmd = DTIO_COMMAND_STOP; 2088 #endif 2089 if(!dtio) return; 2090 if(!dtio->started) return; 2091 verbose(VERB_ALGO, "dnstap io: send stop cmd"); 2092 2093 #ifndef THREADS_DISABLED 2094 while(1) { 2095 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 2096 if(r == -1) { 2097 #ifndef USE_WINSOCK 2098 if(errno == EINTR || errno == EAGAIN) 2099 continue; 2100 log_err("dnstap io stop: write: %s", strerror(errno)); 2101 #else 2102 if(WSAGetLastError() == WSAEINPROGRESS) 2103 continue; 2104 if(WSAGetLastError() == WSAEWOULDBLOCK) 2105 continue; 2106 log_err("dnstap io stop: write: %s", 2107 wsa_strerror(WSAGetLastError())); 2108 #endif 2109 break; 2110 } 2111 break; 2112 } 2113 dtio->started = 0; 2114 #endif /* THREADS_DISABLED */ 2115 2116 #ifndef USE_WINSOCK 2117 close(dtio->commandpipe[1]); 2118 #else 2119 _close(dtio->commandpipe[1]); 2120 #endif 2121 dtio->commandpipe[1] = -1; 2122 #ifndef THREADS_DISABLED 2123 ub_thread_join(dtio->tid); 2124 #else 2125 dtio->want_to_exit = 1; 2126 dtio_desetup(dtio); 2127 #endif 2128 } 2129