1 /* 2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP 3 * 4 * Copyright (c) 2020, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * 15 * Redistributions in binary form must reproduce the above copyright notice, 16 * this list of conditions and the following disclaimer in the documentation 17 * and/or other materials provided with the distribution. 18 * 19 * Neither the name of the NLNET LABS nor the names of its contributors may 20 * be used to endorse or promote products derived from this software without 21 * specific prior written permission. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34 * 35 */ 36 37 /** 38 * \file 39 * 40 * An implementation of the Frame Streams data transport protocol for 41 * the Unbound DNSTAP message logging facility. 42 */ 43 44 #include "config.h" 45 #include "dnstap/dtstream.h" 46 #include "dnstap/dnstap_fstrm.h" 47 #include "util/config_file.h" 48 #include "util/ub_event.h" 49 #include "util/net_help.h" 50 #include "services/outside_network.h" 51 #include "sldns/sbuffer.h" 52 #ifdef HAVE_SYS_UN_H 53 #include <sys/un.h> 54 #endif 55 #include <fcntl.h> 56 #ifdef HAVE_OPENSSL_SSL_H 57 #include <openssl/ssl.h> 58 #endif 59 #ifdef HAVE_OPENSSL_ERR_H 60 #include <openssl/err.h> 61 #endif 62 63 /** number of messages to process in one output callback */ 64 #define DTIO_MESSAGES_PER_CALLBACK 100 65 /** the msec to wait for reconnect (if not immediate, the first attempt) */ 66 #define DTIO_RECONNECT_TIMEOUT_MIN 10 67 /** the msec to wait for reconnect max after backoff */ 68 #define DTIO_RECONNECT_TIMEOUT_MAX 1000 69 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */ 70 #define DTIO_RECONNECT_TIMEOUT_SLOW 1000 71 /** number of messages before wakeup of thread */ 72 #define DTIO_MSG_FOR_WAKEUP 32 73 74 /** maximum length of received frame */ 75 #define DTIO_RECV_FRAME_MAX_LEN 1000 76 77 struct stop_flush_info; 78 /** DTIO command channel commands */ 79 enum { 80 /** DTIO command channel stop */ 81 DTIO_COMMAND_STOP = 0, 82 /** DTIO command channel wakeup */ 83 DTIO_COMMAND_WAKEUP = 1 84 } dtio_channel_command; 85 86 /** open the output channel */ 87 static void dtio_open_output(struct dt_io_thread* dtio); 88 /** add output event for read and write */ 89 static int dtio_add_output_event_write(struct dt_io_thread* dtio); 90 /** start reconnection attempts */ 91 static void dtio_reconnect_enable(struct dt_io_thread* dtio); 92 /** stop from stop_flush event loop */ 93 static void dtio_stop_flush_exit(struct stop_flush_info* info); 94 /** setup a start control message */ 95 static int dtio_control_start_send(struct dt_io_thread* dtio); 96 #ifdef HAVE_SSL 97 /** enable briefly waiting for a read event, for SSL negotiation */ 98 static int dtio_enable_brief_read(struct dt_io_thread* dtio); 99 /** enable briefly waiting for a write event, for SSL negotiation */ 100 static int dtio_enable_brief_write(struct dt_io_thread* dtio); 101 #endif 102 103 struct dt_msg_queue* 104 dt_msg_queue_create(struct comm_base* base) 105 { 106 struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); 107 if(!mq) return NULL; 108 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, 109 about 1 M should contain 64K messages with some overhead, 110 or a whole bunch smaller ones */ 111 mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq); 112 if(!mq->wakeup_timer) { 113 free(mq); 114 return NULL; 115 } 116 lock_basic_init(&mq->lock); 117 lock_protect(&mq->lock, mq, sizeof(*mq)); 118 return mq; 119 } 120 121 /** clear the message list, caller must hold the lock */ 122 static void 123 dt_msg_queue_clear(struct dt_msg_queue* mq) 124 { 125 struct dt_msg_entry* e = mq->first, *next=NULL; 126 while(e) { 127 next = e->next; 128 free(e->buf); 129 free(e); 130 e = next; 131 } 132 mq->first = NULL; 133 mq->last = NULL; 134 mq->cursize = 0; 135 mq->msgcount = 0; 136 } 137 138 void 139 dt_msg_queue_delete(struct dt_msg_queue* mq) 140 { 141 if(!mq) return; 142 lock_basic_destroy(&mq->lock); 143 dt_msg_queue_clear(mq); 144 comm_timer_delete(mq->wakeup_timer); 145 free(mq); 146 } 147 148 /** make the dtio wake up by sending a wakeup command */ 149 static void dtio_wakeup(struct dt_io_thread* dtio) 150 { 151 uint8_t cmd = DTIO_COMMAND_WAKEUP; 152 if(!dtio) return; 153 if(!dtio->started) return; 154 155 while(1) { 156 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 157 if(r == -1) { 158 #ifndef USE_WINSOCK 159 if(errno == EINTR || errno == EAGAIN) 160 continue; 161 #else 162 if(WSAGetLastError() == WSAEINPROGRESS) 163 continue; 164 if(WSAGetLastError() == WSAEWOULDBLOCK) 165 continue; 166 #endif 167 log_err("dnstap io wakeup: write: %s", 168 sock_strerror(errno)); 169 break; 170 } 171 break; 172 } 173 } 174 175 void 176 mq_wakeup_cb(void* arg) 177 { 178 struct dt_msg_queue* mq = (struct dt_msg_queue*)arg; 179 /* even if the dtio is already active, because perhaps much 180 * traffic suddenly, we leave the timer running to save on 181 * managing it, the once a second timer is less work then 182 * starting and stopping the timer frequently */ 183 lock_basic_lock(&mq->dtio->wakeup_timer_lock); 184 mq->dtio->wakeup_timer_enabled = 0; 185 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 186 dtio_wakeup(mq->dtio); 187 } 188 189 /** start timer to wakeup dtio because there is content in the queue */ 190 static void 191 dt_msg_queue_start_timer(struct dt_msg_queue* mq, int wakeupnow) 192 { 193 struct timeval tv = {0}; 194 /* Start a timer to process messages to be logged. 195 * If we woke up the dtio thread for every message, the wakeup 196 * messages take up too much processing power. If the queue 197 * fills up the wakeup happens immediately. The timer wakes it up 198 * if there are infrequent messages to log. */ 199 200 /* we cannot start a timer in dtio thread, because it is a different 201 * thread and its event base is in use by the other thread, it would 202 * give race conditions if we tried to modify its event base, 203 * and locks would wait until it woke up, and this is what we do. */ 204 205 /* do not start the timer if a timer already exists, perhaps 206 * in another worker. So this variable is protected by a lock in 207 * dtio. */ 208 209 /* If we need to wakeupnow, 0 the timer to force the callback. */ 210 lock_basic_lock(&mq->dtio->wakeup_timer_lock); 211 if(mq->dtio->wakeup_timer_enabled) { 212 if(wakeupnow) { 213 comm_timer_set(mq->wakeup_timer, &tv); 214 } 215 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 216 return; 217 } 218 mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */ 219 220 /* start the timer, in mq, in the event base of our worker */ 221 if(!wakeupnow) { 222 tv.tv_sec = 1; 223 tv.tv_usec = 0; 224 } 225 comm_timer_set(mq->wakeup_timer, &tv); 226 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 227 } 228 229 void 230 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) 231 { 232 int wakeupnow = 0, wakeupstarttimer = 0; 233 struct dt_msg_entry* entry; 234 235 /* check conditions */ 236 if(!buf) return; 237 if(len == 0) { 238 /* it is not possible to log entries with zero length, 239 * because the framestream protocol does not carry it. 240 * However the protobuf serialization does not create zero 241 * length datagrams for dnstap, so this should not happen. */ 242 free(buf); 243 return; 244 } 245 if(!mq) { 246 free(buf); 247 return; 248 } 249 250 /* allocate memory for queue entry */ 251 entry = malloc(sizeof(*entry)); 252 if(!entry) { 253 log_err("out of memory logging dnstap"); 254 free(buf); 255 return; 256 } 257 entry->next = NULL; 258 entry->buf = buf; 259 entry->len = len; 260 261 /* acquire lock */ 262 lock_basic_lock(&mq->lock); 263 /* if list was empty, start timer for (eventual) wakeup */ 264 if(mq->first == NULL) 265 wakeupstarttimer = 1; 266 /* if list contains more than wakeupnum elements, wakeup now, 267 * or if list is (going to be) almost full */ 268 if(mq->msgcount == DTIO_MSG_FOR_WAKEUP || 269 (mq->cursize < mq->maxsize * 9 / 10 && 270 mq->cursize+len >= mq->maxsize * 9 / 10)) 271 wakeupnow = 1; 272 /* see if it is going to fit */ 273 if(mq->cursize + len > mq->maxsize) { 274 /* buffer full, or congested. */ 275 /* drop */ 276 lock_basic_unlock(&mq->lock); 277 free(buf); 278 free(entry); 279 return; 280 } 281 mq->cursize += len; 282 mq->msgcount ++; 283 /* append to list */ 284 if(mq->last) { 285 mq->last->next = entry; 286 } else { 287 mq->first = entry; 288 } 289 mq->last = entry; 290 /* release lock */ 291 lock_basic_unlock(&mq->lock); 292 293 if(wakeupnow || wakeupstarttimer) { 294 dt_msg_queue_start_timer(mq, wakeupnow); 295 } 296 } 297 298 struct dt_io_thread* dt_io_thread_create(void) 299 { 300 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); 301 lock_basic_init(&dtio->wakeup_timer_lock); 302 lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled, 303 sizeof(dtio->wakeup_timer_enabled)); 304 return dtio; 305 } 306 307 void dt_io_thread_delete(struct dt_io_thread* dtio) 308 { 309 struct dt_io_list_item* item, *nextitem; 310 if(!dtio) return; 311 lock_basic_destroy(&dtio->wakeup_timer_lock); 312 item=dtio->io_list; 313 while(item) { 314 nextitem = item->next; 315 free(item); 316 item = nextitem; 317 } 318 free(dtio->socket_path); 319 free(dtio->ip_str); 320 free(dtio->tls_server_name); 321 free(dtio->client_key_file); 322 free(dtio->client_cert_file); 323 if(dtio->ssl_ctx) { 324 #ifdef HAVE_SSL 325 SSL_CTX_free(dtio->ssl_ctx); 326 #endif 327 } 328 free(dtio); 329 } 330 331 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg) 332 { 333 if(!cfg->dnstap) { 334 log_warn("cannot setup dnstap because dnstap-enable is no"); 335 return 0; 336 } 337 338 /* what type of connectivity do we have */ 339 if(cfg->dnstap_ip && cfg->dnstap_ip[0]) { 340 if(cfg->dnstap_tls) 341 dtio->upstream_is_tls = 1; 342 else dtio->upstream_is_tcp = 1; 343 } else { 344 dtio->upstream_is_unix = 1; 345 } 346 dtio->is_bidirectional = cfg->dnstap_bidirectional; 347 348 if(dtio->upstream_is_unix) { 349 char* nm; 350 if(!cfg->dnstap_socket_path || 351 cfg->dnstap_socket_path[0]==0) { 352 log_err("dnstap setup: no dnstap-socket-path for " 353 "socket connect"); 354 return 0; 355 } 356 nm = cfg->dnstap_socket_path; 357 if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm, 358 cfg->chrootdir, strlen(cfg->chrootdir)) == 0) 359 nm += strlen(cfg->chrootdir); 360 free(dtio->socket_path); 361 dtio->socket_path = strdup(nm); 362 if(!dtio->socket_path) { 363 log_err("dnstap setup: malloc failure"); 364 return 0; 365 } 366 } 367 368 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 369 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) { 370 log_err("dnstap setup: no dnstap-ip for TCP connect"); 371 return 0; 372 } 373 free(dtio->ip_str); 374 dtio->ip_str = strdup(cfg->dnstap_ip); 375 if(!dtio->ip_str) { 376 log_err("dnstap setup: malloc failure"); 377 return 0; 378 } 379 } 380 381 if(dtio->upstream_is_tls) { 382 #ifdef HAVE_SSL 383 if(cfg->dnstap_tls_server_name && 384 cfg->dnstap_tls_server_name[0]) { 385 free(dtio->tls_server_name); 386 dtio->tls_server_name = strdup( 387 cfg->dnstap_tls_server_name); 388 if(!dtio->tls_server_name) { 389 log_err("dnstap setup: malloc failure"); 390 return 0; 391 } 392 if(!check_auth_name_for_ssl(dtio->tls_server_name)) 393 return 0; 394 } 395 if(cfg->dnstap_tls_client_key_file && 396 cfg->dnstap_tls_client_key_file[0]) { 397 dtio->use_client_certs = 1; 398 free(dtio->client_key_file); 399 dtio->client_key_file = strdup( 400 cfg->dnstap_tls_client_key_file); 401 if(!dtio->client_key_file) { 402 log_err("dnstap setup: malloc failure"); 403 return 0; 404 } 405 if(!cfg->dnstap_tls_client_cert_file || 406 cfg->dnstap_tls_client_cert_file[0]==0) { 407 log_err("dnstap setup: client key " 408 "authentication enabled with " 409 "dnstap-tls-client-key-file, but " 410 "no dnstap-tls-client-cert-file " 411 "is given"); 412 return 0; 413 } 414 free(dtio->client_cert_file); 415 dtio->client_cert_file = strdup( 416 cfg->dnstap_tls_client_cert_file); 417 if(!dtio->client_cert_file) { 418 log_err("dnstap setup: malloc failure"); 419 return 0; 420 } 421 } else { 422 dtio->use_client_certs = 0; 423 dtio->client_key_file = NULL; 424 dtio->client_cert_file = NULL; 425 } 426 427 if(cfg->dnstap_tls_cert_bundle) { 428 dtio->ssl_ctx = connect_sslctx_create( 429 dtio->client_key_file, 430 dtio->client_cert_file, 431 cfg->dnstap_tls_cert_bundle, 0); 432 } else { 433 dtio->ssl_ctx = connect_sslctx_create( 434 dtio->client_key_file, 435 dtio->client_cert_file, 436 cfg->tls_cert_bundle, cfg->tls_win_cert); 437 } 438 if(!dtio->ssl_ctx) { 439 log_err("could not setup SSL CTX"); 440 return 0; 441 } 442 dtio->tls_use_sni = cfg->tls_use_sni; 443 #endif /* HAVE_SSL */ 444 } 445 return 1; 446 } 447 448 int dt_io_thread_register_queue(struct dt_io_thread* dtio, 449 struct dt_msg_queue* mq) 450 { 451 struct dt_io_list_item* item = malloc(sizeof(*item)); 452 if(!item) return 0; 453 lock_basic_lock(&mq->lock); 454 mq->dtio = dtio; 455 lock_basic_unlock(&mq->lock); 456 item->queue = mq; 457 item->next = dtio->io_list; 458 dtio->io_list = item; 459 dtio->io_list_iter = NULL; 460 return 1; 461 } 462 463 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, 464 struct dt_msg_queue* mq) 465 { 466 struct dt_io_list_item* item, *prev=NULL; 467 if(!dtio) return; 468 item = dtio->io_list; 469 while(item) { 470 if(item->queue == mq) { 471 /* found it */ 472 if(prev) prev->next = item->next; 473 else dtio->io_list = item->next; 474 /* the queue itself only registered, not deleted */ 475 lock_basic_lock(&item->queue->lock); 476 item->queue->dtio = NULL; 477 lock_basic_unlock(&item->queue->lock); 478 free(item); 479 dtio->io_list_iter = NULL; 480 return; 481 } 482 prev = item; 483 item = item->next; 484 } 485 } 486 487 /** pick a message from the queue, the routine locks and unlocks, 488 * returns true if there is a message */ 489 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf, 490 size_t* len) 491 { 492 lock_basic_lock(&mq->lock); 493 if(mq->first) { 494 struct dt_msg_entry* entry = mq->first; 495 mq->first = entry->next; 496 if(!entry->next) mq->last = NULL; 497 mq->cursize -= entry->len; 498 mq->msgcount --; 499 lock_basic_unlock(&mq->lock); 500 501 *buf = entry->buf; 502 *len = entry->len; 503 free(entry); 504 return 1; 505 } 506 lock_basic_unlock(&mq->lock); 507 return 0; 508 } 509 510 /** find message in queue, false if no message, true if message to send */ 511 static int dtio_find_in_queue(struct dt_io_thread* dtio, 512 struct dt_msg_queue* mq) 513 { 514 void* buf=NULL; 515 size_t len=0; 516 if(dt_msg_queue_pop(mq, &buf, &len)) { 517 dtio->cur_msg = buf; 518 dtio->cur_msg_len = len; 519 dtio->cur_msg_done = 0; 520 dtio->cur_msg_len_done = 0; 521 return 1; 522 } 523 return 0; 524 } 525 526 /** find a new message to write, search message queues, false if none */ 527 static int dtio_find_msg(struct dt_io_thread* dtio) 528 { 529 struct dt_io_list_item *spot, *item; 530 531 spot = dtio->io_list_iter; 532 /* use the next queue for the next message lookup, 533 * if we hit the end(NULL) the NULL restarts the iter at start. */ 534 if(spot) 535 dtio->io_list_iter = spot->next; 536 else if(dtio->io_list) 537 dtio->io_list_iter = dtio->io_list->next; 538 539 /* scan from spot to end-of-io_list */ 540 item = spot; 541 while(item) { 542 if(dtio_find_in_queue(dtio, item->queue)) 543 return 1; 544 item = item->next; 545 } 546 /* scan starting at the start-of-list (to wrap around the end) */ 547 item = dtio->io_list; 548 while(item) { 549 if(dtio_find_in_queue(dtio, item->queue)) 550 return 1; 551 item = item->next; 552 } 553 return 0; 554 } 555 556 /** callback for the dnstap reconnect, to start reconnecting to output */ 557 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd), 558 short ATTR_UNUSED(bits), void* arg) 559 { 560 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 561 dtio->reconnect_is_added = 0; 562 verbose(VERB_ALGO, "dnstap io: reconnect timer"); 563 564 dtio_open_output(dtio); 565 if(dtio->event) { 566 if(!dtio_add_output_event_write(dtio)) 567 return; 568 /* nothing wrong so far, wait on the output event */ 569 return; 570 } 571 /* exponential backoff and retry on timer */ 572 dtio_reconnect_enable(dtio); 573 } 574 575 /** attempt to reconnect to the output, after a timeout */ 576 static void dtio_reconnect_enable(struct dt_io_thread* dtio) 577 { 578 struct timeval tv; 579 int msec; 580 if(dtio->want_to_exit) return; 581 if(dtio->reconnect_is_added) 582 return; /* already done */ 583 584 /* exponential backoff, store the value for next timeout */ 585 msec = dtio->reconnect_timeout; 586 if(msec == 0) { 587 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN; 588 } else { 589 dtio->reconnect_timeout = msec*2; 590 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX) 591 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX; 592 } 593 verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec", 594 msec); 595 596 /* setup wait timer */ 597 memset(&tv, 0, sizeof(tv)); 598 tv.tv_sec = msec/1000; 599 tv.tv_usec = (msec%1000)*1000; 600 if(ub_timer_add(dtio->reconnect_timer, dtio->event_base, 601 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) { 602 log_err("dnstap io: could not reconnect ev timer add"); 603 return; 604 } 605 dtio->reconnect_is_added = 1; 606 } 607 608 /** remove dtio reconnect timer */ 609 static void dtio_reconnect_del(struct dt_io_thread* dtio) 610 { 611 if(!dtio->reconnect_is_added) 612 return; 613 ub_timer_del(dtio->reconnect_timer); 614 dtio->reconnect_is_added = 0; 615 } 616 617 /** clear the reconnect exponential backoff timer. 618 * We have successfully connected so we can try again with short timeouts. */ 619 static void dtio_reconnect_clear(struct dt_io_thread* dtio) 620 { 621 dtio->reconnect_timeout = 0; 622 dtio_reconnect_del(dtio); 623 } 624 625 /** reconnect slowly, because we already know we have to wait for a bit */ 626 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec) 627 { 628 dtio_reconnect_del(dtio); 629 dtio->reconnect_timeout = msec; 630 dtio_reconnect_enable(dtio); 631 } 632 633 /** delete the current message in the dtio, and reset counters */ 634 static void dtio_cur_msg_free(struct dt_io_thread* dtio) 635 { 636 free(dtio->cur_msg); 637 dtio->cur_msg = NULL; 638 dtio->cur_msg_len = 0; 639 dtio->cur_msg_done = 0; 640 dtio->cur_msg_len_done = 0; 641 } 642 643 /** delete the buffer and counters used to read frame */ 644 static void dtio_read_frame_free(struct dt_frame_read_buf* rb) 645 { 646 if(rb->buf) { 647 free(rb->buf); 648 rb->buf = NULL; 649 } 650 rb->buf_count = 0; 651 rb->buf_cap = 0; 652 rb->frame_len = 0; 653 rb->frame_len_done = 0; 654 rb->control_frame = 0; 655 } 656 657 /** del the output file descriptor event for listening */ 658 static void dtio_del_output_event(struct dt_io_thread* dtio) 659 { 660 if(!dtio->event_added) 661 return; 662 ub_event_del(dtio->event); 663 dtio->event_added = 0; 664 dtio->event_added_is_write = 0; 665 } 666 667 /** close dtio socket and set it to -1 */ 668 static void dtio_close_fd(struct dt_io_thread* dtio) 669 { 670 sock_close(dtio->fd); 671 dtio->fd = -1; 672 } 673 674 /** close and stop the output file descriptor event */ 675 static void dtio_close_output(struct dt_io_thread* dtio) 676 { 677 if(!dtio->event) 678 return; 679 ub_event_free(dtio->event); 680 dtio->event = NULL; 681 if(dtio->ssl) { 682 #ifdef HAVE_SSL 683 SSL_shutdown(dtio->ssl); 684 SSL_free(dtio->ssl); 685 dtio->ssl = NULL; 686 #endif 687 } 688 dtio_close_fd(dtio); 689 690 /* if there is a (partial) message, discard it 691 * we cannot send (the remainder of) it, and a new 692 * connection needs to start with a control frame. */ 693 if(dtio->cur_msg) { 694 dtio_cur_msg_free(dtio); 695 } 696 697 dtio->ready_frame_sent = 0; 698 dtio->accept_frame_received = 0; 699 dtio_read_frame_free(&dtio->read_frame); 700 701 dtio_reconnect_enable(dtio); 702 } 703 704 /** check for pending nonblocking connect errors, 705 * returns 1 if it is okay. -1 on error (close it), 0 to try later */ 706 static int dtio_check_nb_connect(struct dt_io_thread* dtio) 707 { 708 int error = 0; 709 socklen_t len = (socklen_t)sizeof(error); 710 if(!dtio->check_nb_connect) 711 return 1; /* everything okay */ 712 if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error, 713 &len) < 0) { 714 #ifndef USE_WINSOCK 715 error = errno; /* on solaris errno is error */ 716 #else 717 error = WSAGetLastError(); 718 #endif 719 } 720 #ifndef USE_WINSOCK 721 #if defined(EINPROGRESS) && defined(EWOULDBLOCK) 722 if(error == EINPROGRESS || error == EWOULDBLOCK) 723 return 0; /* try again later */ 724 #endif 725 #else 726 if(error == WSAEINPROGRESS) { 727 return 0; /* try again later */ 728 } else if(error == WSAEWOULDBLOCK) { 729 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 730 dtio->stop_flush_event:dtio->event), UB_EV_WRITE); 731 return 0; /* try again later */ 732 } 733 #endif 734 if(error != 0) { 735 char* to = dtio->socket_path; 736 if(!to) to = dtio->ip_str; 737 if(!to) to = ""; 738 log_err("dnstap io: failed to connect to \"%s\": %s", 739 to, sock_strerror(error)); 740 return -1; /* error, close it */ 741 } 742 743 if(dtio->ip_str) 744 verbose(VERB_DETAIL, "dnstap io: connected to %s", 745 dtio->ip_str); 746 else if(dtio->socket_path) 747 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"", 748 dtio->socket_path); 749 dtio_reconnect_clear(dtio); 750 dtio->check_nb_connect = 0; 751 return 1; /* everything okay */ 752 } 753 754 #ifdef HAVE_SSL 755 /** write to ssl output 756 * returns number of bytes written, 0 if nothing happened, 757 * try again later, or -1 if the channel is to be closed. */ 758 static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf, 759 size_t len) 760 { 761 int r; 762 ERR_clear_error(); 763 r = SSL_write(dtio->ssl, buf, len); 764 if(r <= 0) { 765 int want = SSL_get_error(dtio->ssl, r); 766 if(want == SSL_ERROR_ZERO_RETURN) { 767 /* closed */ 768 return -1; 769 } else if(want == SSL_ERROR_WANT_READ) { 770 /* we want a brief read event */ 771 dtio_enable_brief_read(dtio); 772 return 0; 773 } else if(want == SSL_ERROR_WANT_WRITE) { 774 /* write again later */ 775 return 0; 776 } else if(want == SSL_ERROR_SYSCALL) { 777 #ifdef EPIPE 778 if(errno == EPIPE && verbosity < 2) 779 return -1; /* silence 'broken pipe' */ 780 #endif 781 #ifdef ECONNRESET 782 if(errno == ECONNRESET && verbosity < 2) 783 return -1; /* silence reset by peer */ 784 #endif 785 if(errno != 0) { 786 log_err("dnstap io, SSL_write syscall: %s", 787 strerror(errno)); 788 } 789 return -1; 790 } 791 log_crypto_err("dnstap io, could not SSL_write"); 792 return -1; 793 } 794 return r; 795 } 796 #endif /* HAVE_SSL */ 797 798 /** write buffer to output. 799 * returns number of bytes written, 0 if nothing happened, 800 * try again later, or -1 if the channel is to be closed. */ 801 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf, 802 size_t len) 803 { 804 ssize_t ret; 805 if(dtio->fd == -1) 806 return -1; 807 #ifdef HAVE_SSL 808 if(dtio->ssl) 809 return dtio_write_ssl(dtio, buf, len); 810 #endif 811 ret = send(dtio->fd, (void*)buf, len, 0); 812 if(ret == -1) { 813 #ifndef USE_WINSOCK 814 if(errno == EINTR || errno == EAGAIN) 815 return 0; 816 #else 817 if(WSAGetLastError() == WSAEINPROGRESS) 818 return 0; 819 if(WSAGetLastError() == WSAEWOULDBLOCK) { 820 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 821 dtio->stop_flush_event:dtio->event), 822 UB_EV_WRITE); 823 return 0; 824 } 825 #endif 826 log_err("dnstap io: failed send: %s", sock_strerror(errno)); 827 return -1; 828 } 829 return ret; 830 } 831 832 #ifdef HAVE_WRITEV 833 /** write with writev, len and message, in one write, if possible. 834 * return true if message is done, false if incomplete */ 835 static int dtio_write_with_writev(struct dt_io_thread* dtio) 836 { 837 uint32_t sendlen = htonl(dtio->cur_msg_len); 838 struct iovec iov[2]; 839 ssize_t r; 840 iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done; 841 iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done; 842 iov[1].iov_base = dtio->cur_msg; 843 iov[1].iov_len = dtio->cur_msg_len; 844 log_assert(iov[0].iov_len > 0); 845 r = writev(dtio->fd, iov, 2); 846 if(r == -1) { 847 #ifndef USE_WINSOCK 848 if(errno == EINTR || errno == EAGAIN) 849 return 0; 850 #else 851 if(WSAGetLastError() == WSAEINPROGRESS) 852 return 0; 853 if(WSAGetLastError() == WSAEWOULDBLOCK) { 854 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 855 dtio->stop_flush_event:dtio->event), 856 UB_EV_WRITE); 857 return 0; 858 } 859 #endif 860 log_err("dnstap io: failed writev: %s", sock_strerror(errno)); 861 /* close the channel */ 862 dtio_del_output_event(dtio); 863 dtio_close_output(dtio); 864 return 0; 865 } 866 /* written r bytes */ 867 dtio->cur_msg_len_done += r; 868 if(dtio->cur_msg_len_done < 4) 869 return 0; 870 if(dtio->cur_msg_len_done > 4) { 871 dtio->cur_msg_done = dtio->cur_msg_len_done-4; 872 dtio->cur_msg_len_done = 4; 873 } 874 if(dtio->cur_msg_done < dtio->cur_msg_len) 875 return 0; 876 return 1; 877 } 878 #endif /* HAVE_WRITEV */ 879 880 /** write more of the length, preceding the data frame. 881 * return true if message is done, false if incomplete. */ 882 static int dtio_write_more_of_len(struct dt_io_thread* dtio) 883 { 884 uint32_t sendlen; 885 int r; 886 if(dtio->cur_msg_len_done >= 4) 887 return 1; 888 #ifdef HAVE_WRITEV 889 if(!dtio->ssl) { 890 /* we try writev for everything.*/ 891 return dtio_write_with_writev(dtio); 892 } 893 #endif /* HAVE_WRITEV */ 894 sendlen = htonl(dtio->cur_msg_len); 895 r = dtio_write_buf(dtio, 896 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done, 897 sizeof(sendlen)-dtio->cur_msg_len_done); 898 if(r == -1) { 899 /* close the channel */ 900 dtio_del_output_event(dtio); 901 dtio_close_output(dtio); 902 return 0; 903 } else if(r == 0) { 904 /* try again later */ 905 return 0; 906 } 907 dtio->cur_msg_len_done += r; 908 if(dtio->cur_msg_len_done < 4) 909 return 0; 910 return 1; 911 } 912 913 /** write more of the data frame. 914 * return true if message is done, false if incomplete. */ 915 static int dtio_write_more_of_data(struct dt_io_thread* dtio) 916 { 917 int r; 918 if(dtio->cur_msg_done >= dtio->cur_msg_len) 919 return 1; 920 r = dtio_write_buf(dtio, 921 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done, 922 dtio->cur_msg_len - dtio->cur_msg_done); 923 if(r == -1) { 924 /* close the channel */ 925 dtio_del_output_event(dtio); 926 dtio_close_output(dtio); 927 return 0; 928 } else if(r == 0) { 929 /* try again later */ 930 return 0; 931 } 932 dtio->cur_msg_done += r; 933 if(dtio->cur_msg_done < dtio->cur_msg_len) 934 return 0; 935 return 1; 936 } 937 938 /** write more of the current message. false if incomplete, true if 939 * the message is done */ 940 static int dtio_write_more(struct dt_io_thread* dtio) 941 { 942 if(dtio->cur_msg_len_done < 4) { 943 if(!dtio_write_more_of_len(dtio)) 944 return 0; 945 } 946 if(dtio->cur_msg_done < dtio->cur_msg_len) { 947 if(!dtio_write_more_of_data(dtio)) 948 return 0; 949 } 950 return 1; 951 } 952 953 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed, 954 * -1: continue, >0: number of bytes read into buffer */ 955 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) { 956 ssize_t r; 957 r = recv(dtio->fd, (void*)buf, len, 0); 958 if(r == -1) { 959 char* to = dtio->socket_path; 960 if(!to) to = dtio->ip_str; 961 if(!to) to = ""; 962 #ifndef USE_WINSOCK 963 if(errno == EINTR || errno == EAGAIN) 964 return -1; /* try later */ 965 #else 966 if(WSAGetLastError() == WSAEINPROGRESS) { 967 return -1; /* try later */ 968 } else if(WSAGetLastError() == WSAEWOULDBLOCK) { 969 ub_winsock_tcp_wouldblock( 970 (dtio->stop_flush_event? 971 dtio->stop_flush_event:dtio->event), 972 UB_EV_READ); 973 return -1; /* try later */ 974 } 975 #endif 976 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 977 verbosity < 4) 978 return 0; /* no log retries on low verbosity */ 979 log_err("dnstap io: output closed, recv %s: %s", to, 980 strerror(errno)); 981 /* and close below */ 982 return 0; 983 } 984 if(r == 0) { 985 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 986 verbosity < 4) 987 return 0; /* no log retries on low verbosity */ 988 verbose(VERB_DETAIL, "dnstap io: output closed by the other side"); 989 /* and close below */ 990 return 0; 991 } 992 /* something was received */ 993 return r; 994 } 995 996 #ifdef HAVE_SSL 997 /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed, 998 * -1: continue, >0: number of bytes read into buffer */ 999 static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len) 1000 { 1001 int r; 1002 ERR_clear_error(); 1003 r = SSL_read(dtio->ssl, buf, len); 1004 if(r <= 0) { 1005 int want = SSL_get_error(dtio->ssl, r); 1006 if(want == SSL_ERROR_ZERO_RETURN) { 1007 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1008 verbosity < 4) 1009 return 0; /* no log retries on low verbosity */ 1010 verbose(VERB_DETAIL, "dnstap io: output closed by the " 1011 "other side"); 1012 return 0; 1013 } else if(want == SSL_ERROR_WANT_READ) { 1014 /* continue later */ 1015 return -1; 1016 } else if(want == SSL_ERROR_WANT_WRITE) { 1017 (void)dtio_enable_brief_write(dtio); 1018 return -1; 1019 } else if(want == SSL_ERROR_SYSCALL) { 1020 #ifdef ECONNRESET 1021 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1022 errno == ECONNRESET && verbosity < 4) 1023 return 0; /* silence reset by peer */ 1024 #endif 1025 if(errno != 0) 1026 log_err("SSL_read syscall: %s", 1027 strerror(errno)); 1028 verbose(VERB_DETAIL, "dnstap io: output closed by the " 1029 "other side"); 1030 return 0; 1031 } 1032 log_crypto_err("could not SSL_read"); 1033 verbose(VERB_DETAIL, "dnstap io: output closed by the " 1034 "other side"); 1035 return 0; 1036 } 1037 return r; 1038 } 1039 #endif /* HAVE_SSL */ 1040 1041 /** check if the output fd has been closed, 1042 * it returns false if the stream is closed. */ 1043 static int dtio_check_close(struct dt_io_thread* dtio) 1044 { 1045 /* we don't want to read any packets, but if there are we can 1046 * discard the input (ignore it). Ignore of unknown (control) 1047 * packets is okay for the framestream protocol. And also, the 1048 * read call can return that the stream has been closed by the 1049 * other side. */ 1050 uint8_t buf[1024]; 1051 int r = -1; 1052 1053 1054 if(dtio->fd == -1) return 0; 1055 1056 while(r != 0) { 1057 /* not interested in buffer content, overwrite */ 1058 r = receive_bytes(dtio, (void*)buf, sizeof(buf)); 1059 if(r == -1) 1060 return 1; 1061 } 1062 /* the other end has been closed */ 1063 /* close the channel */ 1064 dtio_del_output_event(dtio); 1065 dtio_close_output(dtio); 1066 return 0; 1067 } 1068 1069 /** Read accept frame. Returns -1: continue reading, 0: closed, 1070 * 1: valid accept received. */ 1071 static int dtio_read_accept_frame(struct dt_io_thread* dtio) 1072 { 1073 int r; 1074 size_t read_frame_done; 1075 while(dtio->read_frame.frame_len_done < 4) { 1076 #ifdef HAVE_SSL 1077 if(dtio->ssl) { 1078 r = ssl_read_bytes(dtio, 1079 (uint8_t*)&dtio->read_frame.frame_len+ 1080 dtio->read_frame.frame_len_done, 1081 4-dtio->read_frame.frame_len_done); 1082 } else { 1083 #endif 1084 r = receive_bytes(dtio, 1085 (uint8_t*)&dtio->read_frame.frame_len+ 1086 dtio->read_frame.frame_len_done, 1087 4-dtio->read_frame.frame_len_done); 1088 #ifdef HAVE_SSL 1089 } 1090 #endif 1091 if(r == -1) 1092 return -1; /* continue reading */ 1093 if(r == 0) { 1094 /* connection closed */ 1095 goto close_connection; 1096 } 1097 dtio->read_frame.frame_len_done += r; 1098 if(dtio->read_frame.frame_len_done < 4) 1099 return -1; /* continue reading */ 1100 1101 if(dtio->read_frame.frame_len == 0) { 1102 dtio->read_frame.frame_len_done = 0; 1103 dtio->read_frame.control_frame = 1; 1104 continue; 1105 } 1106 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len); 1107 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) { 1108 verbose(VERB_OPS, "dnstap: received frame exceeds max " 1109 "length of %d bytes, closing connection", 1110 DTIO_RECV_FRAME_MAX_LEN); 1111 goto close_connection; 1112 } 1113 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len); 1114 dtio->read_frame.buf_cap = dtio->read_frame.frame_len; 1115 if(!dtio->read_frame.buf) { 1116 log_err("dnstap io: out of memory (creating read " 1117 "buffer)"); 1118 goto close_connection; 1119 } 1120 } 1121 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) { 1122 #ifdef HAVE_SSL 1123 if(dtio->ssl) { 1124 r = ssl_read_bytes(dtio, dtio->read_frame.buf+ 1125 dtio->read_frame.buf_count, 1126 dtio->read_frame.buf_cap- 1127 dtio->read_frame.buf_count); 1128 } else { 1129 #endif 1130 r = receive_bytes(dtio, dtio->read_frame.buf+ 1131 dtio->read_frame.buf_count, 1132 dtio->read_frame.buf_cap- 1133 dtio->read_frame.buf_count); 1134 #ifdef HAVE_SSL 1135 } 1136 #endif 1137 if(r == -1) 1138 return -1; /* continue reading */ 1139 if(r == 0) { 1140 /* connection closed */ 1141 goto close_connection; 1142 } 1143 dtio->read_frame.buf_count += r; 1144 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) 1145 return -1; /* continue reading */ 1146 } 1147 1148 /* Complete frame received, check if this is a valid ACCEPT control 1149 * frame. */ 1150 if(dtio->read_frame.frame_len < 4) { 1151 verbose(VERB_OPS, "dnstap: invalid data received"); 1152 goto close_connection; 1153 } 1154 if(sldns_read_uint32(dtio->read_frame.buf) != 1155 FSTRM_CONTROL_FRAME_ACCEPT) { 1156 verbose(VERB_ALGO, "dnstap: invalid control type received, " 1157 "ignored"); 1158 dtio->ready_frame_sent = 0; 1159 dtio->accept_frame_received = 0; 1160 dtio_read_frame_free(&dtio->read_frame); 1161 return -1; 1162 } 1163 read_frame_done = 4; /* control frame type */ 1164 1165 /* Iterate over control fields, ignore unknown types. 1166 * Need to be able to read at least 8 bytes (control field type + 1167 * length). */ 1168 while(read_frame_done+8 < dtio->read_frame.frame_len) { 1169 uint32_t type = sldns_read_uint32(dtio->read_frame.buf + 1170 read_frame_done); 1171 uint32_t len = sldns_read_uint32(dtio->read_frame.buf + 1172 read_frame_done + 4); 1173 if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) { 1174 if(len == strlen(DNSTAP_CONTENT_TYPE) && 1175 read_frame_done+8+len <= 1176 dtio->read_frame.frame_len && 1177 memcmp(dtio->read_frame.buf + read_frame_done + 1178 + 8, DNSTAP_CONTENT_TYPE, len) == 0) { 1179 if(!dtio_control_start_send(dtio)) { 1180 verbose(VERB_OPS, "dnstap io: out of " 1181 "memory while sending START frame"); 1182 goto close_connection; 1183 } 1184 dtio->accept_frame_received = 1; 1185 if(!dtio_add_output_event_write(dtio)) 1186 goto close_connection; 1187 return 1; 1188 } else { 1189 /* unknown content type */ 1190 verbose(VERB_ALGO, "dnstap: ACCEPT frame " 1191 "contains unknown content type, " 1192 "closing connection"); 1193 goto close_connection; 1194 } 1195 } 1196 /* unknown option, try next */ 1197 read_frame_done += 8+len; 1198 } 1199 1200 1201 close_connection: 1202 dtio_del_output_event(dtio); 1203 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1204 dtio_close_output(dtio); 1205 return 0; 1206 } 1207 1208 /** add the output file descriptor event for listening, read only */ 1209 static int dtio_add_output_event_read(struct dt_io_thread* dtio) 1210 { 1211 if(!dtio->event) 1212 return 0; 1213 if(dtio->event_added && !dtio->event_added_is_write) 1214 return 1; 1215 /* we have to (re-)register the event */ 1216 if(dtio->event_added) 1217 ub_event_del(dtio->event); 1218 ub_event_del_bits(dtio->event, UB_EV_WRITE); 1219 if(ub_event_add(dtio->event, NULL) != 0) { 1220 log_err("dnstap io: out of memory (adding event)"); 1221 dtio->event_added = 0; 1222 dtio->event_added_is_write = 0; 1223 /* close output and start reattempts to open it */ 1224 dtio_close_output(dtio); 1225 return 0; 1226 } 1227 dtio->event_added = 1; 1228 dtio->event_added_is_write = 0; 1229 return 1; 1230 } 1231 1232 /** add the output file descriptor event for listening, read and write */ 1233 static int dtio_add_output_event_write(struct dt_io_thread* dtio) 1234 { 1235 if(!dtio->event) 1236 return 0; 1237 if(dtio->event_added && dtio->event_added_is_write) 1238 return 1; 1239 /* we have to (re-)register the event */ 1240 if(dtio->event_added) 1241 ub_event_del(dtio->event); 1242 ub_event_add_bits(dtio->event, UB_EV_WRITE); 1243 if(ub_event_add(dtio->event, NULL) != 0) { 1244 log_err("dnstap io: out of memory (adding event)"); 1245 dtio->event_added = 0; 1246 dtio->event_added_is_write = 0; 1247 /* close output and start reattempts to open it */ 1248 dtio_close_output(dtio); 1249 return 0; 1250 } 1251 dtio->event_added = 1; 1252 dtio->event_added_is_write = 1; 1253 return 1; 1254 } 1255 1256 /** put the dtio thread to sleep */ 1257 static void dtio_sleep(struct dt_io_thread* dtio) 1258 { 1259 /* unregister the event polling for write, because there is 1260 * nothing to be written */ 1261 (void)dtio_add_output_event_read(dtio); 1262 } 1263 1264 #ifdef HAVE_SSL 1265 /** enable the brief read condition */ 1266 static int dtio_enable_brief_read(struct dt_io_thread* dtio) 1267 { 1268 dtio->ssl_brief_read = 1; 1269 if(dtio->stop_flush_event) { 1270 ub_event_del(dtio->stop_flush_event); 1271 ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE); 1272 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 1273 log_err("dnstap io, stop flush, could not ub_event_add"); 1274 return 0; 1275 } 1276 return 1; 1277 } 1278 return dtio_add_output_event_read(dtio); 1279 } 1280 #endif /* HAVE_SSL */ 1281 1282 #ifdef HAVE_SSL 1283 /** disable the brief read condition */ 1284 static int dtio_disable_brief_read(struct dt_io_thread* dtio) 1285 { 1286 dtio->ssl_brief_read = 0; 1287 if(dtio->stop_flush_event) { 1288 ub_event_del(dtio->stop_flush_event); 1289 ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE); 1290 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 1291 log_err("dnstap io, stop flush, could not ub_event_add"); 1292 return 0; 1293 } 1294 return 1; 1295 } 1296 return dtio_add_output_event_write(dtio); 1297 } 1298 #endif /* HAVE_SSL */ 1299 1300 #ifdef HAVE_SSL 1301 /** enable the brief write condition */ 1302 static int dtio_enable_brief_write(struct dt_io_thread* dtio) 1303 { 1304 dtio->ssl_brief_write = 1; 1305 return dtio_add_output_event_write(dtio); 1306 } 1307 #endif /* HAVE_SSL */ 1308 1309 #ifdef HAVE_SSL 1310 /** disable the brief write condition */ 1311 static int dtio_disable_brief_write(struct dt_io_thread* dtio) 1312 { 1313 dtio->ssl_brief_write = 0; 1314 return dtio_add_output_event_read(dtio); 1315 } 1316 #endif /* HAVE_SSL */ 1317 1318 #ifdef HAVE_SSL 1319 /** check peer verification after ssl handshake connection, false if closed*/ 1320 static int dtio_ssl_check_peer(struct dt_io_thread* dtio) 1321 { 1322 if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) { 1323 /* verification */ 1324 if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) { 1325 X509* x = SSL_get_peer_certificate(dtio->ssl); 1326 if(!x) { 1327 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1328 "connection failed no certificate", 1329 dtio->ip_str); 1330 return 0; 1331 } 1332 log_cert(VERB_ALGO, "dnstap io, peer certificate", 1333 x); 1334 #ifdef HAVE_SSL_GET0_PEERNAME 1335 if(SSL_get0_peername(dtio->ssl)) { 1336 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1337 "connection to %s authenticated", 1338 dtio->ip_str, 1339 SSL_get0_peername(dtio->ssl)); 1340 } else { 1341 #endif 1342 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1343 "connection authenticated", 1344 dtio->ip_str); 1345 #ifdef HAVE_SSL_GET0_PEERNAME 1346 } 1347 #endif 1348 X509_free(x); 1349 } else { 1350 X509* x = SSL_get_peer_certificate(dtio->ssl); 1351 if(x) { 1352 log_cert(VERB_ALGO, "dnstap io, peer " 1353 "certificate", x); 1354 X509_free(x); 1355 } 1356 verbose(VERB_ALGO, "dnstap io, %s, SSL connection " 1357 "failed: failed to authenticate", 1358 dtio->ip_str); 1359 return 0; 1360 } 1361 } else { 1362 /* unauthenticated, the verify peer flag was not set 1363 * in ssl when the ssl object was created from ssl_ctx */ 1364 verbose(VERB_ALGO, "dnstap io, %s, SSL connection", 1365 dtio->ip_str); 1366 } 1367 return 1; 1368 } 1369 #endif /* HAVE_SSL */ 1370 1371 #ifdef HAVE_SSL 1372 /** perform ssl handshake, returns 1 if okay, 0 to stop */ 1373 static int dtio_ssl_handshake(struct dt_io_thread* dtio, 1374 struct stop_flush_info* info) 1375 { 1376 int r; 1377 if(dtio->ssl_brief_read) { 1378 /* assume the brief read condition is satisfied, 1379 * if we need more or again, we can set it again */ 1380 if(!dtio_disable_brief_read(dtio)) { 1381 if(info) dtio_stop_flush_exit(info); 1382 return 0; 1383 } 1384 } 1385 if(dtio->ssl_handshake_done) 1386 return 1; 1387 1388 ERR_clear_error(); 1389 r = SSL_do_handshake(dtio->ssl); 1390 if(r != 1) { 1391 int want = SSL_get_error(dtio->ssl, r); 1392 if(want == SSL_ERROR_WANT_READ) { 1393 /* we want to read on the connection */ 1394 if(!dtio_enable_brief_read(dtio)) { 1395 if(info) dtio_stop_flush_exit(info); 1396 return 0; 1397 } 1398 return 0; 1399 } else if(want == SSL_ERROR_WANT_WRITE) { 1400 /* we want to write on the connection */ 1401 return 0; 1402 } else if(r == 0) { 1403 /* closed */ 1404 if(info) dtio_stop_flush_exit(info); 1405 dtio_del_output_event(dtio); 1406 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1407 dtio_close_output(dtio); 1408 return 0; 1409 } else if(want == SSL_ERROR_SYSCALL) { 1410 /* SYSCALL and errno==0 means closed uncleanly */ 1411 int silent = 0; 1412 #ifdef EPIPE 1413 if(errno == EPIPE && verbosity < 2) 1414 silent = 1; /* silence 'broken pipe' */ 1415 #endif 1416 #ifdef ECONNRESET 1417 if(errno == ECONNRESET && verbosity < 2) 1418 silent = 1; /* silence reset by peer */ 1419 #endif 1420 if(errno == 0) 1421 silent = 1; 1422 if(!silent) 1423 log_err("dnstap io, SSL_handshake syscall: %s", 1424 strerror(errno)); 1425 /* closed */ 1426 if(info) dtio_stop_flush_exit(info); 1427 dtio_del_output_event(dtio); 1428 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1429 dtio_close_output(dtio); 1430 return 0; 1431 } else { 1432 unsigned long err = ERR_get_error(); 1433 if(!squelch_err_ssl_handshake(err)) { 1434 log_crypto_err_code("dnstap io, ssl handshake failed", 1435 err); 1436 verbose(VERB_OPS, "dnstap io, ssl handshake failed " 1437 "from %s", dtio->ip_str); 1438 } 1439 /* closed */ 1440 if(info) dtio_stop_flush_exit(info); 1441 dtio_del_output_event(dtio); 1442 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1443 dtio_close_output(dtio); 1444 return 0; 1445 } 1446 1447 } 1448 /* check peer verification */ 1449 dtio->ssl_handshake_done = 1; 1450 1451 if(!dtio_ssl_check_peer(dtio)) { 1452 /* closed */ 1453 if(info) dtio_stop_flush_exit(info); 1454 dtio_del_output_event(dtio); 1455 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1456 dtio_close_output(dtio); 1457 return 0; 1458 } 1459 return 1; 1460 } 1461 #endif /* HAVE_SSL */ 1462 1463 /** callback for the dnstap events, to write to the output */ 1464 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg) 1465 { 1466 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 1467 int i; 1468 1469 if(dtio->check_nb_connect) { 1470 int connect_err = dtio_check_nb_connect(dtio); 1471 if(connect_err == -1) { 1472 /* close the channel */ 1473 dtio_del_output_event(dtio); 1474 dtio_close_output(dtio); 1475 return; 1476 } else if(connect_err == 0) { 1477 /* try again later */ 1478 return; 1479 } 1480 /* nonblocking connect check passed, continue */ 1481 } 1482 1483 #ifdef HAVE_SSL 1484 if(dtio->ssl && 1485 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 1486 if(!dtio_ssl_handshake(dtio, NULL)) 1487 return; 1488 } 1489 #endif 1490 1491 if((bits&UB_EV_READ || dtio->ssl_brief_write)) { 1492 if(dtio->ssl_brief_write) 1493 (void)dtio_disable_brief_write(dtio); 1494 if(dtio->ready_frame_sent && !dtio->accept_frame_received) { 1495 if(dtio_read_accept_frame(dtio) <= 0) 1496 return; 1497 } else if(!dtio_check_close(dtio)) 1498 return; 1499 } 1500 1501 /* loop to process a number of messages. This improves throughput, 1502 * because selecting on write-event if not needed for busy messages 1503 * (dnstap log) generation and if they need to all be written back. 1504 * The write event is usually not blocked up. But not forever, 1505 * because the event loop needs to stay responsive for other events. 1506 * If there are no (more) messages, or if the output buffers get 1507 * full, it returns out of the loop. */ 1508 for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) { 1509 /* see if there are messages that need writing */ 1510 if(!dtio->cur_msg) { 1511 if(!dtio_find_msg(dtio)) { 1512 if(i == 0) { 1513 /* no messages on the first iteration, 1514 * the queues are all empty */ 1515 dtio_sleep(dtio); 1516 } 1517 return; /* nothing to do */ 1518 } 1519 } 1520 1521 /* write it */ 1522 if(dtio->cur_msg_done < dtio->cur_msg_len) { 1523 if(!dtio_write_more(dtio)) 1524 return; 1525 } 1526 1527 /* done with the current message */ 1528 dtio_cur_msg_free(dtio); 1529 1530 /* If this is a bidirectional stream the first message will be 1531 * the READY control frame. We can only continue writing after 1532 * receiving an ACCEPT control frame. */ 1533 if(dtio->is_bidirectional && !dtio->ready_frame_sent) { 1534 dtio->ready_frame_sent = 1; 1535 (void)dtio_add_output_event_read(dtio); 1536 break; 1537 } 1538 } 1539 } 1540 1541 /** callback for the dnstap commandpipe, to stop the dnstap IO */ 1542 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) 1543 { 1544 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 1545 uint8_t cmd; 1546 ssize_t r; 1547 if(dtio->want_to_exit) 1548 return; 1549 r = read(fd, &cmd, sizeof(cmd)); 1550 if(r == -1) { 1551 #ifndef USE_WINSOCK 1552 if(errno == EINTR || errno == EAGAIN) 1553 return; /* ignore this */ 1554 #else 1555 if(WSAGetLastError() == WSAEINPROGRESS) 1556 return; 1557 if(WSAGetLastError() == WSAEWOULDBLOCK) 1558 return; 1559 #endif 1560 log_err("dnstap io: failed to read: %s", sock_strerror(errno)); 1561 /* and then fall through to quit the thread */ 1562 } else if(r == 0) { 1563 verbose(VERB_ALGO, "dnstap io: cmd channel closed"); 1564 } else if(r == 1 && cmd == DTIO_COMMAND_STOP) { 1565 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); 1566 } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) { 1567 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup"); 1568 1569 if(dtio->is_bidirectional && !dtio->accept_frame_received) { 1570 verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, " 1571 "waiting for ACCEPT control frame"); 1572 return; 1573 } 1574 1575 /* reregister event */ 1576 if(!dtio_add_output_event_write(dtio)) 1577 return; 1578 return; 1579 } else if(r == 1) { 1580 verbose(VERB_ALGO, "dnstap io: cmd channel unknown command"); 1581 } 1582 dtio->want_to_exit = 1; 1583 if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base) 1584 != 0) { 1585 log_err("dnstap io: could not loopexit"); 1586 } 1587 } 1588 1589 #ifndef THREADS_DISABLED 1590 /** setup the event base for the dnstap io thread */ 1591 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs, 1592 struct timeval* now) 1593 { 1594 memset(now, 0, sizeof(*now)); 1595 dtio->event_base = ub_default_event_base(0, secs, now); 1596 if(!dtio->event_base) { 1597 fatal_exit("dnstap io: could not create event_base"); 1598 } 1599 } 1600 #endif /* THREADS_DISABLED */ 1601 1602 /** setup the cmd event for dnstap io */ 1603 static void dtio_setup_cmd(struct dt_io_thread* dtio) 1604 { 1605 struct ub_event* cmdev; 1606 fd_set_nonblock(dtio->commandpipe[0]); 1607 cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0], 1608 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio); 1609 if(!cmdev) { 1610 fatal_exit("dnstap io: out of memory"); 1611 } 1612 dtio->command_event = cmdev; 1613 if(ub_event_add(cmdev, NULL) != 0) { 1614 fatal_exit("dnstap io: out of memory (adding event)"); 1615 } 1616 } 1617 1618 /** setup the reconnect event for dnstap io */ 1619 static void dtio_setup_reconnect(struct dt_io_thread* dtio) 1620 { 1621 dtio_reconnect_clear(dtio); 1622 dtio->reconnect_timer = ub_event_new(dtio->event_base, -1, 1623 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio); 1624 if(!dtio->reconnect_timer) { 1625 fatal_exit("dnstap io: out of memory"); 1626 } 1627 } 1628 1629 /** 1630 * structure to keep track of information during stop flush 1631 */ 1632 struct stop_flush_info { 1633 /** the event base during stop flush */ 1634 struct ub_event_base* base; 1635 /** did we already want to exit this stop-flush event base */ 1636 int want_to_exit_flush; 1637 /** has the timer fired */ 1638 int timer_done; 1639 /** the dtio */ 1640 struct dt_io_thread* dtio; 1641 /** the stop control frame */ 1642 void* stop_frame; 1643 /** length of the stop frame */ 1644 size_t stop_frame_len; 1645 /** how much we have done of the stop frame */ 1646 size_t stop_frame_done; 1647 }; 1648 1649 /** exit the stop flush base */ 1650 static void dtio_stop_flush_exit(struct stop_flush_info* info) 1651 { 1652 if(info->want_to_exit_flush) 1653 return; 1654 info->want_to_exit_flush = 1; 1655 if(ub_event_base_loopexit(info->base) != 0) { 1656 log_err("dnstap io: could not loopexit"); 1657 } 1658 } 1659 1660 /** send the stop control, 1661 * return true if completed the frame. */ 1662 static int dtio_control_stop_send(struct stop_flush_info* info) 1663 { 1664 struct dt_io_thread* dtio = info->dtio; 1665 int r; 1666 if(info->stop_frame_done >= info->stop_frame_len) 1667 return 1; 1668 r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) + 1669 info->stop_frame_done, info->stop_frame_len - 1670 info->stop_frame_done); 1671 if(r == -1) { 1672 verbose(VERB_ALGO, "dnstap io: stop flush: output closed"); 1673 dtio_stop_flush_exit(info); 1674 return 0; 1675 } 1676 if(r == 0) { 1677 /* try again later, or timeout */ 1678 return 0; 1679 } 1680 info->stop_frame_done += r; 1681 if(info->stop_frame_done < info->stop_frame_len) 1682 return 0; /* not done yet */ 1683 return 1; 1684 } 1685 1686 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), 1687 void* arg) 1688 { 1689 struct stop_flush_info* info = (struct stop_flush_info*)arg; 1690 if(info->want_to_exit_flush) 1691 return; 1692 verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush"); 1693 info->timer_done = 1; 1694 dtio_stop_flush_exit(info); 1695 } 1696 1697 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg) 1698 { 1699 struct stop_flush_info* info = (struct stop_flush_info*)arg; 1700 struct dt_io_thread* dtio = info->dtio; 1701 if(info->want_to_exit_flush) 1702 return; 1703 if(dtio->check_nb_connect) { 1704 /* we don't start the stop_flush if connect still 1705 * in progress, but the check code is here, just in case */ 1706 int connect_err = dtio_check_nb_connect(dtio); 1707 if(connect_err == -1) { 1708 /* close the channel, exit the stop flush */ 1709 dtio_stop_flush_exit(info); 1710 dtio_del_output_event(dtio); 1711 dtio_close_output(dtio); 1712 return; 1713 } else if(connect_err == 0) { 1714 /* try again later */ 1715 return; 1716 } 1717 /* nonblocking connect check passed, continue */ 1718 } 1719 #ifdef HAVE_SSL 1720 if(dtio->ssl && 1721 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 1722 if(!dtio_ssl_handshake(dtio, info)) 1723 return; 1724 } 1725 #endif 1726 1727 if((bits&UB_EV_READ)) { 1728 if(!dtio_check_close(dtio)) { 1729 if(dtio->fd == -1) { 1730 verbose(VERB_ALGO, "dnstap io: " 1731 "stop flush: output closed"); 1732 dtio_stop_flush_exit(info); 1733 } 1734 return; 1735 } 1736 } 1737 /* write remainder of last frame */ 1738 if(dtio->cur_msg) { 1739 if(dtio->cur_msg_done < dtio->cur_msg_len) { 1740 if(!dtio_write_more(dtio)) { 1741 if(dtio->fd == -1) { 1742 verbose(VERB_ALGO, "dnstap io: " 1743 "stop flush: output closed"); 1744 dtio_stop_flush_exit(info); 1745 } 1746 return; 1747 } 1748 } 1749 verbose(VERB_ALGO, "dnstap io: stop flush completed " 1750 "last frame"); 1751 dtio_cur_msg_free(dtio); 1752 } 1753 /* write stop frame */ 1754 if(info->stop_frame_done < info->stop_frame_len) { 1755 if(!dtio_control_stop_send(info)) 1756 return; 1757 verbose(VERB_ALGO, "dnstap io: stop flush completed " 1758 "stop control frame"); 1759 } 1760 /* when last frame and stop frame are sent, exit */ 1761 dtio_stop_flush_exit(info); 1762 } 1763 1764 /** flush at end, last packet and stop control */ 1765 static void dtio_control_stop_flush(struct dt_io_thread* dtio) 1766 { 1767 /* briefly attempt to flush the previous packet to the output, 1768 * this could be a partial packet, or even the start control frame */ 1769 time_t secs = 0; 1770 struct timeval now; 1771 struct stop_flush_info info; 1772 struct timeval tv; 1773 struct ub_event* timer, *stopev; 1774 1775 if(dtio->fd == -1 || dtio->check_nb_connect) { 1776 /* no connection or we have just connected, so nothing is 1777 * sent yet, so nothing to stop or flush */ 1778 return; 1779 } 1780 if(dtio->ssl && !dtio->ssl_handshake_done) { 1781 /* no SSL connection has been established yet */ 1782 return; 1783 } 1784 1785 memset(&info, 0, sizeof(info)); 1786 memset(&now, 0, sizeof(now)); 1787 info.dtio = dtio; 1788 info.base = ub_default_event_base(0, &secs, &now); 1789 if(!info.base) { 1790 log_err("dnstap io: malloc failure"); 1791 return; 1792 } 1793 timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT, 1794 &dtio_stop_timer_cb, &info); 1795 if(!timer) { 1796 log_err("dnstap io: malloc failure"); 1797 ub_event_base_free(info.base); 1798 return; 1799 } 1800 memset(&tv, 0, sizeof(tv)); 1801 tv.tv_sec = 2; 1802 if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info, 1803 &tv) != 0) { 1804 log_err("dnstap io: cannot event_timer_add"); 1805 ub_event_free(timer); 1806 ub_event_base_free(info.base); 1807 return; 1808 } 1809 stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ | 1810 UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info); 1811 if(!stopev) { 1812 log_err("dnstap io: malloc failure"); 1813 ub_timer_del(timer); 1814 ub_event_free(timer); 1815 ub_event_base_free(info.base); 1816 return; 1817 } 1818 if(ub_event_add(stopev, NULL) != 0) { 1819 log_err("dnstap io: cannot event_add"); 1820 ub_event_free(stopev); 1821 ub_timer_del(timer); 1822 ub_event_free(timer); 1823 ub_event_base_free(info.base); 1824 return; 1825 } 1826 info.stop_frame = fstrm_create_control_frame_stop( 1827 &info.stop_frame_len); 1828 if(!info.stop_frame) { 1829 log_err("dnstap io: malloc failure"); 1830 ub_event_del(stopev); 1831 ub_event_free(stopev); 1832 ub_timer_del(timer); 1833 ub_event_free(timer); 1834 ub_event_base_free(info.base); 1835 return; 1836 } 1837 dtio->stop_flush_event = stopev; 1838 1839 /* wait briefly, or until finished */ 1840 verbose(VERB_ALGO, "dnstap io: stop flush started"); 1841 if(ub_event_base_dispatch(info.base) < 0) { 1842 log_err("dnstap io: dispatch flush failed, errno is %s", 1843 strerror(errno)); 1844 } 1845 verbose(VERB_ALGO, "dnstap io: stop flush ended"); 1846 free(info.stop_frame); 1847 dtio->stop_flush_event = NULL; 1848 ub_event_del(stopev); 1849 ub_event_free(stopev); 1850 ub_timer_del(timer); 1851 ub_event_free(timer); 1852 ub_event_base_free(info.base); 1853 } 1854 1855 /** perform desetup and free stuff when the dnstap io thread exits */ 1856 static void dtio_desetup(struct dt_io_thread* dtio) 1857 { 1858 dtio_control_stop_flush(dtio); 1859 dtio_del_output_event(dtio); 1860 dtio_close_output(dtio); 1861 ub_event_del(dtio->command_event); 1862 ub_event_free(dtio->command_event); 1863 #ifndef USE_WINSOCK 1864 close(dtio->commandpipe[0]); 1865 #else 1866 _close(dtio->commandpipe[0]); 1867 #endif 1868 dtio->commandpipe[0] = -1; 1869 dtio_reconnect_del(dtio); 1870 ub_event_free(dtio->reconnect_timer); 1871 dtio_cur_msg_free(dtio); 1872 #ifndef THREADS_DISABLED 1873 ub_event_base_free(dtio->event_base); 1874 #endif 1875 } 1876 1877 /** setup a start control message */ 1878 static int dtio_control_start_send(struct dt_io_thread* dtio) 1879 { 1880 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 1881 dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE, 1882 &dtio->cur_msg_len); 1883 if(!dtio->cur_msg) { 1884 return 0; 1885 } 1886 /* setup to send the control message */ 1887 /* set that the buffer needs to be sent, but the length 1888 * of that buffer is already written, that way the buffer can 1889 * start with 0 length and then the length of the control frame 1890 * in it */ 1891 dtio->cur_msg_done = 0; 1892 dtio->cur_msg_len_done = 4; 1893 return 1; 1894 } 1895 1896 /** setup a ready control message */ 1897 static int dtio_control_ready_send(struct dt_io_thread* dtio) 1898 { 1899 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 1900 dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE, 1901 &dtio->cur_msg_len); 1902 if(!dtio->cur_msg) { 1903 return 0; 1904 } 1905 /* setup to send the control message */ 1906 /* set that the buffer needs to be sent, but the length 1907 * of that buffer is already written, that way the buffer can 1908 * start with 0 length and then the length of the control frame 1909 * in it */ 1910 dtio->cur_msg_done = 0; 1911 dtio->cur_msg_len_done = 4; 1912 return 1; 1913 } 1914 1915 /** open the output file descriptor for af_local */ 1916 static int dtio_open_output_local(struct dt_io_thread* dtio) 1917 { 1918 #ifdef HAVE_SYS_UN_H 1919 struct sockaddr_un s; 1920 dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0); 1921 if(dtio->fd == -1) { 1922 log_err("dnstap io: failed to create socket: %s", 1923 sock_strerror(errno)); 1924 return 0; 1925 } 1926 memset(&s, 0, sizeof(s)); 1927 #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN 1928 /* this member exists on BSDs, not Linux */ 1929 s.sun_len = (unsigned)sizeof(s); 1930 #endif 1931 s.sun_family = AF_LOCAL; 1932 /* length is 92-108, 104 on FreeBSD */ 1933 (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); 1934 fd_set_nonblock(dtio->fd); 1935 if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) 1936 == -1) { 1937 char* to = dtio->socket_path; 1938 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1939 verbosity < 4) { 1940 dtio_close_fd(dtio); 1941 return 0; /* no log retries on low verbosity */ 1942 } 1943 log_err("dnstap io: failed to connect to \"%s\": %s", 1944 to, sock_strerror(errno)); 1945 dtio_close_fd(dtio); 1946 return 0; 1947 } 1948 return 1; 1949 #else 1950 log_err("cannot create af_local socket"); 1951 return 0; 1952 #endif /* HAVE_SYS_UN_H */ 1953 } 1954 1955 /** open the output file descriptor for af_inet and af_inet6 */ 1956 static int dtio_open_output_tcp(struct dt_io_thread* dtio) 1957 { 1958 struct sockaddr_storage addr; 1959 socklen_t addrlen; 1960 memset(&addr, 0, sizeof(addr)); 1961 addrlen = (socklen_t)sizeof(addr); 1962 1963 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) { 1964 log_err("could not parse IP '%s'", dtio->ip_str); 1965 return 0; 1966 } 1967 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0); 1968 if(dtio->fd == -1) { 1969 log_err("can't create socket: %s", sock_strerror(errno)); 1970 return 0; 1971 } 1972 fd_set_nonblock(dtio->fd); 1973 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) { 1974 if(errno == EINPROGRESS) 1975 return 1; /* wait until connect done*/ 1976 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1977 verbosity < 4) { 1978 dtio_close_fd(dtio); 1979 return 0; /* no log retries on low verbosity */ 1980 } 1981 #ifndef USE_WINSOCK 1982 if(tcp_connect_errno_needs_log( 1983 (struct sockaddr *)&addr, addrlen)) { 1984 log_err("dnstap io: failed to connect to %s: %s", 1985 dtio->ip_str, strerror(errno)); 1986 } 1987 #else 1988 if(WSAGetLastError() == WSAEINPROGRESS || 1989 WSAGetLastError() == WSAEWOULDBLOCK) 1990 return 1; /* wait until connect done*/ 1991 if(tcp_connect_errno_needs_log( 1992 (struct sockaddr *)&addr, addrlen)) { 1993 log_err("dnstap io: failed to connect to %s: %s", 1994 dtio->ip_str, wsa_strerror(WSAGetLastError())); 1995 } 1996 #endif 1997 dtio_close_fd(dtio); 1998 return 0; 1999 } 2000 return 1; 2001 } 2002 2003 /** setup the SSL structure for new connection */ 2004 static int dtio_setup_ssl(struct dt_io_thread* dtio) 2005 { 2006 dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd); 2007 if(!dtio->ssl) return 0; 2008 dtio->ssl_handshake_done = 0; 2009 dtio->ssl_brief_read = 0; 2010 2011 if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name, 2012 dtio->tls_use_sni)) { 2013 return 0; 2014 } 2015 return 1; 2016 } 2017 2018 /** open the output file descriptor */ 2019 static void dtio_open_output(struct dt_io_thread* dtio) 2020 { 2021 struct ub_event* ev; 2022 if(dtio->upstream_is_unix) { 2023 if(!dtio_open_output_local(dtio)) { 2024 dtio_reconnect_enable(dtio); 2025 return; 2026 } 2027 } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 2028 if(!dtio_open_output_tcp(dtio)) { 2029 dtio_reconnect_enable(dtio); 2030 return; 2031 } 2032 if(dtio->upstream_is_tls) { 2033 if(!dtio_setup_ssl(dtio)) { 2034 dtio_close_fd(dtio); 2035 dtio_reconnect_enable(dtio); 2036 return; 2037 } 2038 } 2039 } 2040 dtio->check_nb_connect = 1; 2041 2042 /* the EV_READ is to read ACCEPT control messages, and catch channel 2043 * close. EV_WRITE is to write packets */ 2044 ev = ub_event_new(dtio->event_base, dtio->fd, 2045 UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb, 2046 dtio); 2047 if(!ev) { 2048 log_err("dnstap io: out of memory"); 2049 if(dtio->ssl) { 2050 #ifdef HAVE_SSL 2051 SSL_free(dtio->ssl); 2052 dtio->ssl = NULL; 2053 #endif 2054 } 2055 dtio_close_fd(dtio); 2056 dtio_reconnect_enable(dtio); 2057 return; 2058 } 2059 dtio->event = ev; 2060 2061 /* setup protocol control message to start */ 2062 if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) || 2063 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) { 2064 log_err("dnstap io: out of memory"); 2065 ub_event_free(dtio->event); 2066 dtio->event = NULL; 2067 if(dtio->ssl) { 2068 #ifdef HAVE_SSL 2069 SSL_free(dtio->ssl); 2070 dtio->ssl = NULL; 2071 #endif 2072 } 2073 dtio_close_fd(dtio); 2074 dtio_reconnect_enable(dtio); 2075 return; 2076 } 2077 } 2078 2079 /** perform the setup of the writer thread on the established event_base */ 2080 static void dtio_setup_on_base(struct dt_io_thread* dtio) 2081 { 2082 dtio_setup_cmd(dtio); 2083 dtio_setup_reconnect(dtio); 2084 dtio_open_output(dtio); 2085 if(!dtio_add_output_event_write(dtio)) 2086 return; 2087 } 2088 2089 #ifndef THREADS_DISABLED 2090 /** the IO thread function for the DNSTAP IO */ 2091 static void* dnstap_io(void* arg) 2092 { 2093 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 2094 time_t secs = 0; 2095 struct timeval now; 2096 log_thread_set(&dtio->threadnum); 2097 2098 /* setup */ 2099 verbose(VERB_ALGO, "start dnstap io thread"); 2100 dtio_setup_base(dtio, &secs, &now); 2101 dtio_setup_on_base(dtio); 2102 2103 /* run */ 2104 if(ub_event_base_dispatch(dtio->event_base) < 0) { 2105 log_err("dnstap io: dispatch failed, errno is %s", 2106 strerror(errno)); 2107 } 2108 2109 /* cleanup */ 2110 verbose(VERB_ALGO, "stop dnstap io thread"); 2111 dtio_desetup(dtio); 2112 return NULL; 2113 } 2114 #endif /* THREADS_DISABLED */ 2115 2116 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, 2117 int numworkers) 2118 { 2119 /* set up the thread, can fail */ 2120 #ifndef USE_WINSOCK 2121 if(pipe(dtio->commandpipe) == -1) { 2122 log_err("failed to create pipe: %s", strerror(errno)); 2123 return 0; 2124 } 2125 #else 2126 if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) { 2127 log_err("failed to create _pipe: %s", 2128 wsa_strerror(WSAGetLastError())); 2129 return 0; 2130 } 2131 #endif 2132 2133 /* start the thread */ 2134 dtio->threadnum = numworkers+1; 2135 dtio->started = 1; 2136 #ifndef THREADS_DISABLED 2137 ub_thread_create(&dtio->tid, dnstap_io, dtio); 2138 (void)event_base_nothr; 2139 #else 2140 dtio->event_base = event_base_nothr; 2141 dtio_setup_on_base(dtio); 2142 #endif 2143 return 1; 2144 } 2145 2146 void dt_io_thread_stop(struct dt_io_thread* dtio) 2147 { 2148 #ifndef THREADS_DISABLED 2149 uint8_t cmd = DTIO_COMMAND_STOP; 2150 #endif 2151 if(!dtio) return; 2152 if(!dtio->started) return; 2153 verbose(VERB_ALGO, "dnstap io: send stop cmd"); 2154 2155 #ifndef THREADS_DISABLED 2156 while(1) { 2157 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 2158 if(r == -1) { 2159 #ifndef USE_WINSOCK 2160 if(errno == EINTR || errno == EAGAIN) 2161 continue; 2162 #else 2163 if(WSAGetLastError() == WSAEINPROGRESS) 2164 continue; 2165 if(WSAGetLastError() == WSAEWOULDBLOCK) 2166 continue; 2167 #endif 2168 log_err("dnstap io stop: write: %s", 2169 sock_strerror(errno)); 2170 break; 2171 } 2172 break; 2173 } 2174 dtio->started = 0; 2175 #endif /* THREADS_DISABLED */ 2176 2177 #ifndef USE_WINSOCK 2178 close(dtio->commandpipe[1]); 2179 #else 2180 _close(dtio->commandpipe[1]); 2181 #endif 2182 dtio->commandpipe[1] = -1; 2183 #ifndef THREADS_DISABLED 2184 ub_thread_join(dtio->tid); 2185 #else 2186 dtio->want_to_exit = 1; 2187 dtio_desetup(dtio); 2188 #endif 2189 } 2190