/*
 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
 *
 * Copyright (c) 2020, NLnet Labs. All rights reserved.
 *
 * This software is open source.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * Redistributions of source code must retain the above copyright notice,
 * this list of conditions and the following disclaimer.
 *
 * Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation
 * and/or other materials provided with the distribution.
 *
 * Neither the name of the NLNET LABS nor the names of its contributors may
 * be used to endorse or promote products derived from this software without
 * specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

/**
 * \file
 *
 * An implementation of the Frame Streams data transport protocol for
 * the Unbound DNSTAP message logging facility.
42 */ 43 44 #include "config.h" 45 #include "dnstap/dtstream.h" 46 #include "dnstap/dnstap_fstrm.h" 47 #include "util/config_file.h" 48 #include "util/ub_event.h" 49 #include "util/net_help.h" 50 #include "services/outside_network.h" 51 #include "sldns/sbuffer.h" 52 #ifdef HAVE_SYS_UN_H 53 #include <sys/un.h> 54 #endif 55 #include <fcntl.h> 56 #ifdef HAVE_OPENSSL_SSL_H 57 #include <openssl/ssl.h> 58 #endif 59 #ifdef HAVE_OPENSSL_ERR_H 60 #include <openssl/err.h> 61 #endif 62 63 /** number of messages to process in one output callback */ 64 #define DTIO_MESSAGES_PER_CALLBACK 100 65 /** the msec to wait for reconnect (if not immediate, the first attempt) */ 66 #define DTIO_RECONNECT_TIMEOUT_MIN 10 67 /** the msec to wait for reconnect max after backoff */ 68 #define DTIO_RECONNECT_TIMEOUT_MAX 1000 69 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */ 70 #define DTIO_RECONNECT_TIMEOUT_SLOW 1000 71 /** number of messages before wakeup of thread */ 72 #define DTIO_MSG_FOR_WAKEUP 32 73 74 /** maximum length of received frame */ 75 #define DTIO_RECV_FRAME_MAX_LEN 1000 76 77 struct stop_flush_info; 78 /** DTIO command channel commands */ 79 enum { 80 /** DTIO command channel stop */ 81 DTIO_COMMAND_STOP = 0, 82 /** DTIO command channel wakeup */ 83 DTIO_COMMAND_WAKEUP = 1 84 } dtio_channel_command; 85 86 /** open the output channel */ 87 static void dtio_open_output(struct dt_io_thread* dtio); 88 /** add output event for read and write */ 89 static int dtio_add_output_event_write(struct dt_io_thread* dtio); 90 /** start reconnection attempts */ 91 static void dtio_reconnect_enable(struct dt_io_thread* dtio); 92 /** stop from stop_flush event loop */ 93 static void dtio_stop_flush_exit(struct stop_flush_info* info); 94 /** setup a start control message */ 95 static int dtio_control_start_send(struct dt_io_thread* dtio); 96 #ifdef HAVE_SSL 97 /** enable briefly waiting for a read event, for SSL negotiation */ 98 static int 
dtio_enable_brief_read(struct dt_io_thread* dtio); 99 /** enable briefly waiting for a write event, for SSL negotiation */ 100 static int dtio_enable_brief_write(struct dt_io_thread* dtio); 101 #endif 102 103 struct dt_msg_queue* 104 dt_msg_queue_create(struct comm_base* base) 105 { 106 struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); 107 if(!mq) return NULL; 108 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, 109 about 1 M should contain 64K messages with some overhead, 110 or a whole bunch smaller ones */ 111 mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq); 112 if(!mq->wakeup_timer) { 113 free(mq); 114 return NULL; 115 } 116 lock_basic_init(&mq->lock); 117 lock_protect(&mq->lock, mq, sizeof(*mq)); 118 return mq; 119 } 120 121 /** clear the message list, caller must hold the lock */ 122 static void 123 dt_msg_queue_clear(struct dt_msg_queue* mq) 124 { 125 struct dt_msg_entry* e = mq->first, *next=NULL; 126 while(e) { 127 next = e->next; 128 free(e->buf); 129 free(e); 130 e = next; 131 } 132 mq->first = NULL; 133 mq->last = NULL; 134 mq->cursize = 0; 135 mq->msgcount = 0; 136 } 137 138 void 139 dt_msg_queue_delete(struct dt_msg_queue* mq) 140 { 141 if(!mq) return; 142 lock_basic_destroy(&mq->lock); 143 dt_msg_queue_clear(mq); 144 comm_timer_delete(mq->wakeup_timer); 145 free(mq); 146 } 147 148 /** make the dtio wake up by sending a wakeup command */ 149 static void dtio_wakeup(struct dt_io_thread* dtio) 150 { 151 uint8_t cmd = DTIO_COMMAND_WAKEUP; 152 if(!dtio) return; 153 if(!dtio->started) return; 154 155 while(1) { 156 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 157 if(r == -1) { 158 #ifndef USE_WINSOCK 159 if(errno == EINTR || errno == EAGAIN) 160 continue; 161 #else 162 if(WSAGetLastError() == WSAEINPROGRESS) 163 continue; 164 if(WSAGetLastError() == WSAEWOULDBLOCK) 165 continue; 166 #endif 167 log_err("dnstap io wakeup: write: %s", 168 sock_strerror(errno)); 169 break; 170 } 171 break; 172 } 173 } 174 175 void 
176 mq_wakeup_cb(void* arg) 177 { 178 struct dt_msg_queue* mq = (struct dt_msg_queue*)arg; 179 180 lock_basic_lock(&mq->dtio->wakeup_timer_lock); 181 mq->dtio->wakeup_timer_enabled = 0; 182 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 183 dtio_wakeup(mq->dtio); 184 } 185 186 /** start timer to wakeup dtio because there is content in the queue */ 187 static void 188 dt_msg_queue_start_timer(struct dt_msg_queue* mq, int wakeupnow) 189 { 190 struct timeval tv = {0}; 191 /* Start a timer to process messages to be logged. 192 * If we woke up the dtio thread for every message, the wakeup 193 * messages take up too much processing power. If the queue 194 * fills up the wakeup happens immediately. The timer wakes it up 195 * if there are infrequent messages to log. */ 196 197 /* we cannot start a timer in dtio thread, because it is a different 198 * thread and its event base is in use by the other thread, it would 199 * give race conditions if we tried to modify its event base, 200 * and locks would wait until it woke up, and this is what we do. */ 201 202 /* do not start the timer if a timer already exists, perhaps 203 * in another worker. So this variable is protected by a lock in 204 * dtio. */ 205 206 /* If we need to wakeupnow, 0 the timer to force the callback. */ 207 lock_basic_lock(&mq->dtio->wakeup_timer_lock); 208 if(mq->dtio->wakeup_timer_enabled) { 209 if(wakeupnow) { 210 tv.tv_sec = 0; 211 tv.tv_usec = 0; 212 comm_timer_set(mq->wakeup_timer, &tv); 213 } 214 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 215 return; 216 } 217 mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */ 218 219 /* start the timer, in mq, in the event base of our worker */ 220 if(!wakeupnow) { 221 tv.tv_sec = 1; 222 tv.tv_usec = 0; 223 /* If it is already set, keep it running. 
*/ 224 if(!comm_timer_is_set(mq->wakeup_timer)) 225 comm_timer_set(mq->wakeup_timer, &tv); 226 } else { 227 tv.tv_sec = 0; 228 tv.tv_usec = 0; 229 comm_timer_set(mq->wakeup_timer, &tv); 230 } 231 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 232 } 233 234 void 235 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) 236 { 237 int wakeupnow = 0, wakeupstarttimer = 0; 238 struct dt_msg_entry* entry; 239 240 /* check conditions */ 241 if(!buf) return; 242 if(len == 0) { 243 /* it is not possible to log entries with zero length, 244 * because the framestream protocol does not carry it. 245 * However the protobuf serialization does not create zero 246 * length datagrams for dnstap, so this should not happen. */ 247 free(buf); 248 return; 249 } 250 if(!mq) { 251 free(buf); 252 return; 253 } 254 255 /* allocate memory for queue entry */ 256 entry = malloc(sizeof(*entry)); 257 if(!entry) { 258 log_err("out of memory logging dnstap"); 259 free(buf); 260 return; 261 } 262 entry->next = NULL; 263 entry->buf = buf; 264 entry->len = len; 265 266 /* acquire lock */ 267 lock_basic_lock(&mq->lock); 268 /* if list was empty, start timer for (eventual) wakeup, 269 * or if dtio is not writing now an eventual wakeup is needed. */ 270 if(mq->first == NULL || !mq->dtio->event_added_is_write) 271 wakeupstarttimer = 1; 272 /* if list contains more than wakeupnum elements, wakeup now, 273 * or if list is (going to be) almost full */ 274 if(mq->msgcount == DTIO_MSG_FOR_WAKEUP || 275 (mq->cursize < mq->maxsize * 9 / 10 && 276 mq->cursize+len >= mq->maxsize * 9 / 10)) 277 wakeupnow = 1; 278 /* see if it is going to fit */ 279 if(mq->cursize + len > mq->maxsize) { 280 /* buffer full, or congested. 
*/ 281 /* drop */ 282 lock_basic_unlock(&mq->lock); 283 free(buf); 284 free(entry); 285 return; 286 } 287 mq->cursize += len; 288 mq->msgcount ++; 289 /* append to list */ 290 if(mq->last) { 291 mq->last->next = entry; 292 } else { 293 mq->first = entry; 294 } 295 mq->last = entry; 296 /* release lock */ 297 lock_basic_unlock(&mq->lock); 298 299 if(wakeupnow || wakeupstarttimer) { 300 dt_msg_queue_start_timer(mq, wakeupnow); 301 } 302 } 303 304 struct dt_io_thread* dt_io_thread_create(void) 305 { 306 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); 307 lock_basic_init(&dtio->wakeup_timer_lock); 308 lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled, 309 sizeof(dtio->wakeup_timer_enabled)); 310 return dtio; 311 } 312 313 void dt_io_thread_delete(struct dt_io_thread* dtio) 314 { 315 struct dt_io_list_item* item, *nextitem; 316 if(!dtio) return; 317 lock_basic_destroy(&dtio->wakeup_timer_lock); 318 item=dtio->io_list; 319 while(item) { 320 nextitem = item->next; 321 free(item); 322 item = nextitem; 323 } 324 free(dtio->socket_path); 325 free(dtio->ip_str); 326 free(dtio->tls_server_name); 327 free(dtio->client_key_file); 328 free(dtio->client_cert_file); 329 if(dtio->ssl_ctx) { 330 #ifdef HAVE_SSL 331 SSL_CTX_free(dtio->ssl_ctx); 332 #endif 333 } 334 free(dtio); 335 } 336 337 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg) 338 { 339 if(!cfg->dnstap) { 340 log_warn("cannot setup dnstap because dnstap-enable is no"); 341 return 0; 342 } 343 344 /* what type of connectivity do we have */ 345 if(cfg->dnstap_ip && cfg->dnstap_ip[0]) { 346 if(cfg->dnstap_tls) 347 dtio->upstream_is_tls = 1; 348 else dtio->upstream_is_tcp = 1; 349 } else { 350 dtio->upstream_is_unix = 1; 351 } 352 dtio->is_bidirectional = cfg->dnstap_bidirectional; 353 354 if(dtio->upstream_is_unix) { 355 char* nm; 356 if(!cfg->dnstap_socket_path || 357 cfg->dnstap_socket_path[0]==0) { 358 log_err("dnstap setup: no dnstap-socket-path for " 359 "socket 
connect"); 360 return 0; 361 } 362 nm = cfg->dnstap_socket_path; 363 if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm, 364 cfg->chrootdir, strlen(cfg->chrootdir)) == 0) 365 nm += strlen(cfg->chrootdir); 366 free(dtio->socket_path); 367 dtio->socket_path = strdup(nm); 368 if(!dtio->socket_path) { 369 log_err("dnstap setup: malloc failure"); 370 return 0; 371 } 372 } 373 374 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 375 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) { 376 log_err("dnstap setup: no dnstap-ip for TCP connect"); 377 return 0; 378 } 379 free(dtio->ip_str); 380 dtio->ip_str = strdup(cfg->dnstap_ip); 381 if(!dtio->ip_str) { 382 log_err("dnstap setup: malloc failure"); 383 return 0; 384 } 385 } 386 387 if(dtio->upstream_is_tls) { 388 #ifdef HAVE_SSL 389 if(cfg->dnstap_tls_server_name && 390 cfg->dnstap_tls_server_name[0]) { 391 free(dtio->tls_server_name); 392 dtio->tls_server_name = strdup( 393 cfg->dnstap_tls_server_name); 394 if(!dtio->tls_server_name) { 395 log_err("dnstap setup: malloc failure"); 396 return 0; 397 } 398 if(!check_auth_name_for_ssl(dtio->tls_server_name)) 399 return 0; 400 } 401 if(cfg->dnstap_tls_client_key_file && 402 cfg->dnstap_tls_client_key_file[0]) { 403 dtio->use_client_certs = 1; 404 free(dtio->client_key_file); 405 dtio->client_key_file = strdup( 406 cfg->dnstap_tls_client_key_file); 407 if(!dtio->client_key_file) { 408 log_err("dnstap setup: malloc failure"); 409 return 0; 410 } 411 if(!cfg->dnstap_tls_client_cert_file || 412 cfg->dnstap_tls_client_cert_file[0]==0) { 413 log_err("dnstap setup: client key " 414 "authentication enabled with " 415 "dnstap-tls-client-key-file, but " 416 "no dnstap-tls-client-cert-file " 417 "is given"); 418 return 0; 419 } 420 free(dtio->client_cert_file); 421 dtio->client_cert_file = strdup( 422 cfg->dnstap_tls_client_cert_file); 423 if(!dtio->client_cert_file) { 424 log_err("dnstap setup: malloc failure"); 425 return 0; 426 } 427 } else { 428 dtio->use_client_certs = 0; 429 
dtio->client_key_file = NULL; 430 dtio->client_cert_file = NULL; 431 } 432 433 if(cfg->dnstap_tls_cert_bundle) { 434 dtio->ssl_ctx = connect_sslctx_create( 435 dtio->client_key_file, 436 dtio->client_cert_file, 437 cfg->dnstap_tls_cert_bundle, 0); 438 } else { 439 dtio->ssl_ctx = connect_sslctx_create( 440 dtio->client_key_file, 441 dtio->client_cert_file, 442 cfg->tls_cert_bundle, cfg->tls_win_cert); 443 } 444 if(!dtio->ssl_ctx) { 445 log_err("could not setup SSL CTX"); 446 return 0; 447 } 448 dtio->tls_use_sni = cfg->tls_use_sni; 449 #endif /* HAVE_SSL */ 450 } 451 return 1; 452 } 453 454 int dt_io_thread_register_queue(struct dt_io_thread* dtio, 455 struct dt_msg_queue* mq) 456 { 457 struct dt_io_list_item* item = malloc(sizeof(*item)); 458 if(!item) return 0; 459 lock_basic_lock(&mq->lock); 460 mq->dtio = dtio; 461 lock_basic_unlock(&mq->lock); 462 item->queue = mq; 463 item->next = dtio->io_list; 464 dtio->io_list = item; 465 dtio->io_list_iter = NULL; 466 return 1; 467 } 468 469 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, 470 struct dt_msg_queue* mq) 471 { 472 struct dt_io_list_item* item, *prev=NULL; 473 if(!dtio) return; 474 item = dtio->io_list; 475 while(item) { 476 if(item->queue == mq) { 477 /* found it */ 478 if(prev) prev->next = item->next; 479 else dtio->io_list = item->next; 480 /* the queue itself only registered, not deleted */ 481 lock_basic_lock(&item->queue->lock); 482 item->queue->dtio = NULL; 483 lock_basic_unlock(&item->queue->lock); 484 free(item); 485 dtio->io_list_iter = NULL; 486 return; 487 } 488 prev = item; 489 item = item->next; 490 } 491 } 492 493 /** pick a message from the queue, the routine locks and unlocks, 494 * returns true if there is a message */ 495 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf, 496 size_t* len) 497 { 498 lock_basic_lock(&mq->lock); 499 if(mq->first) { 500 struct dt_msg_entry* entry = mq->first; 501 mq->first = entry->next; 502 if(!entry->next) mq->last = NULL; 503 
mq->cursize -= entry->len; 504 mq->msgcount --; 505 lock_basic_unlock(&mq->lock); 506 507 *buf = entry->buf; 508 *len = entry->len; 509 free(entry); 510 return 1; 511 } 512 lock_basic_unlock(&mq->lock); 513 return 0; 514 } 515 516 /** find message in queue, false if no message, true if message to send */ 517 static int dtio_find_in_queue(struct dt_io_thread* dtio, 518 struct dt_msg_queue* mq) 519 { 520 void* buf=NULL; 521 size_t len=0; 522 if(dt_msg_queue_pop(mq, &buf, &len)) { 523 dtio->cur_msg = buf; 524 dtio->cur_msg_len = len; 525 dtio->cur_msg_done = 0; 526 dtio->cur_msg_len_done = 0; 527 return 1; 528 } 529 return 0; 530 } 531 532 /** find a new message to write, search message queues, false if none */ 533 static int dtio_find_msg(struct dt_io_thread* dtio) 534 { 535 struct dt_io_list_item *spot, *item; 536 537 spot = dtio->io_list_iter; 538 /* use the next queue for the next message lookup, 539 * if we hit the end(NULL) the NULL restarts the iter at start. */ 540 if(spot) 541 dtio->io_list_iter = spot->next; 542 else if(dtio->io_list) 543 dtio->io_list_iter = dtio->io_list->next; 544 545 /* scan from spot to end-of-io_list */ 546 item = spot; 547 while(item) { 548 if(dtio_find_in_queue(dtio, item->queue)) 549 return 1; 550 item = item->next; 551 } 552 /* scan starting at the start-of-list (to wrap around the end) */ 553 item = dtio->io_list; 554 while(item) { 555 if(dtio_find_in_queue(dtio, item->queue)) 556 return 1; 557 item = item->next; 558 } 559 return 0; 560 } 561 562 /** callback for the dnstap reconnect, to start reconnecting to output */ 563 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd), 564 short ATTR_UNUSED(bits), void* arg) 565 { 566 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 567 dtio->reconnect_is_added = 0; 568 verbose(VERB_ALGO, "dnstap io: reconnect timer"); 569 570 dtio_open_output(dtio); 571 if(dtio->event) { 572 if(!dtio_add_output_event_write(dtio)) 573 return; 574 /* nothing wrong so far, wait on the output event */ 575 
return; 576 } 577 /* exponential backoff and retry on timer */ 578 dtio_reconnect_enable(dtio); 579 } 580 581 /** attempt to reconnect to the output, after a timeout */ 582 static void dtio_reconnect_enable(struct dt_io_thread* dtio) 583 { 584 struct timeval tv; 585 int msec; 586 if(dtio->want_to_exit) return; 587 if(dtio->reconnect_is_added) 588 return; /* already done */ 589 590 /* exponential backoff, store the value for next timeout */ 591 msec = dtio->reconnect_timeout; 592 if(msec == 0) { 593 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN; 594 } else { 595 dtio->reconnect_timeout = msec*2; 596 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX) 597 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX; 598 } 599 verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec", 600 msec); 601 602 /* setup wait timer */ 603 memset(&tv, 0, sizeof(tv)); 604 tv.tv_sec = msec/1000; 605 tv.tv_usec = (msec%1000)*1000; 606 if(ub_timer_add(dtio->reconnect_timer, dtio->event_base, 607 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) { 608 log_err("dnstap io: could not reconnect ev timer add"); 609 return; 610 } 611 dtio->reconnect_is_added = 1; 612 } 613 614 /** remove dtio reconnect timer */ 615 static void dtio_reconnect_del(struct dt_io_thread* dtio) 616 { 617 if(!dtio->reconnect_is_added) 618 return; 619 ub_timer_del(dtio->reconnect_timer); 620 dtio->reconnect_is_added = 0; 621 } 622 623 /** clear the reconnect exponential backoff timer. 624 * We have successfully connected so we can try again with short timeouts. 
*/ 625 static void dtio_reconnect_clear(struct dt_io_thread* dtio) 626 { 627 dtio->reconnect_timeout = 0; 628 dtio_reconnect_del(dtio); 629 } 630 631 /** reconnect slowly, because we already know we have to wait for a bit */ 632 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec) 633 { 634 dtio_reconnect_del(dtio); 635 dtio->reconnect_timeout = msec; 636 dtio_reconnect_enable(dtio); 637 } 638 639 /** delete the current message in the dtio, and reset counters */ 640 static void dtio_cur_msg_free(struct dt_io_thread* dtio) 641 { 642 free(dtio->cur_msg); 643 dtio->cur_msg = NULL; 644 dtio->cur_msg_len = 0; 645 dtio->cur_msg_done = 0; 646 dtio->cur_msg_len_done = 0; 647 } 648 649 /** delete the buffer and counters used to read frame */ 650 static void dtio_read_frame_free(struct dt_frame_read_buf* rb) 651 { 652 if(rb->buf) { 653 free(rb->buf); 654 rb->buf = NULL; 655 } 656 rb->buf_count = 0; 657 rb->buf_cap = 0; 658 rb->frame_len = 0; 659 rb->frame_len_done = 0; 660 rb->control_frame = 0; 661 } 662 663 /** del the output file descriptor event for listening */ 664 static void dtio_del_output_event(struct dt_io_thread* dtio) 665 { 666 if(!dtio->event_added) 667 return; 668 ub_event_del(dtio->event); 669 dtio->event_added = 0; 670 dtio->event_added_is_write = 0; 671 } 672 673 /** close dtio socket and set it to -1 */ 674 static void dtio_close_fd(struct dt_io_thread* dtio) 675 { 676 sock_close(dtio->fd); 677 dtio->fd = -1; 678 } 679 680 /** close and stop the output file descriptor event */ 681 static void dtio_close_output(struct dt_io_thread* dtio) 682 { 683 if(!dtio->event) 684 return; 685 ub_event_free(dtio->event); 686 dtio->event = NULL; 687 if(dtio->ssl) { 688 #ifdef HAVE_SSL 689 SSL_shutdown(dtio->ssl); 690 SSL_free(dtio->ssl); 691 dtio->ssl = NULL; 692 #endif 693 } 694 dtio_close_fd(dtio); 695 696 /* if there is a (partial) message, discard it 697 * we cannot send (the remainder of) it, and a new 698 * connection needs to start with a control 
frame. */ 699 if(dtio->cur_msg) { 700 dtio_cur_msg_free(dtio); 701 } 702 703 dtio->ready_frame_sent = 0; 704 dtio->accept_frame_received = 0; 705 dtio_read_frame_free(&dtio->read_frame); 706 707 dtio_reconnect_enable(dtio); 708 } 709 710 /** check for pending nonblocking connect errors, 711 * returns 1 if it is okay. -1 on error (close it), 0 to try later */ 712 static int dtio_check_nb_connect(struct dt_io_thread* dtio) 713 { 714 int error = 0; 715 socklen_t len = (socklen_t)sizeof(error); 716 if(!dtio->check_nb_connect) 717 return 1; /* everything okay */ 718 if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error, 719 &len) < 0) { 720 #ifndef USE_WINSOCK 721 error = errno; /* on solaris errno is error */ 722 #else 723 error = WSAGetLastError(); 724 #endif 725 } 726 #ifndef USE_WINSOCK 727 #if defined(EINPROGRESS) && defined(EWOULDBLOCK) 728 if(error == EINPROGRESS || error == EWOULDBLOCK) 729 return 0; /* try again later */ 730 #endif 731 #else 732 if(error == WSAEINPROGRESS) { 733 return 0; /* try again later */ 734 } else if(error == WSAEWOULDBLOCK) { 735 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 736 dtio->stop_flush_event:dtio->event), UB_EV_WRITE); 737 return 0; /* try again later */ 738 } 739 #endif 740 if(error != 0) { 741 char* to = dtio->socket_path; 742 if(!to) to = dtio->ip_str; 743 if(!to) to = ""; 744 log_err("dnstap io: failed to connect to \"%s\": %s", 745 to, sock_strerror(error)); 746 return -1; /* error, close it */ 747 } 748 749 if(dtio->ip_str) 750 verbose(VERB_DETAIL, "dnstap io: connected to %s", 751 dtio->ip_str); 752 else if(dtio->socket_path) 753 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"", 754 dtio->socket_path); 755 dtio_reconnect_clear(dtio); 756 dtio->check_nb_connect = 0; 757 return 1; /* everything okay */ 758 } 759 760 #ifdef HAVE_SSL 761 /** write to ssl output 762 * returns number of bytes written, 0 if nothing happened, 763 * try again later, or -1 if the channel is to be closed. 
*/ 764 static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf, 765 size_t len) 766 { 767 int r; 768 ERR_clear_error(); 769 r = SSL_write(dtio->ssl, buf, len); 770 if(r <= 0) { 771 int want = SSL_get_error(dtio->ssl, r); 772 if(want == SSL_ERROR_ZERO_RETURN) { 773 /* closed */ 774 return -1; 775 } else if(want == SSL_ERROR_WANT_READ) { 776 /* we want a brief read event */ 777 dtio_enable_brief_read(dtio); 778 return 0; 779 } else if(want == SSL_ERROR_WANT_WRITE) { 780 /* write again later */ 781 return 0; 782 } else if(want == SSL_ERROR_SYSCALL) { 783 #ifdef EPIPE 784 if(errno == EPIPE && verbosity < 2) 785 return -1; /* silence 'broken pipe' */ 786 #endif 787 #ifdef ECONNRESET 788 if(errno == ECONNRESET && verbosity < 2) 789 return -1; /* silence reset by peer */ 790 #endif 791 if(errno != 0) { 792 log_err("dnstap io, SSL_write syscall: %s", 793 strerror(errno)); 794 } 795 return -1; 796 } 797 log_crypto_err_io("dnstap io, could not SSL_write", want); 798 return -1; 799 } 800 return r; 801 } 802 #endif /* HAVE_SSL */ 803 804 /** write buffer to output. 805 * returns number of bytes written, 0 if nothing happened, 806 * try again later, or -1 if the channel is to be closed. */ 807 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf, 808 size_t len) 809 { 810 ssize_t ret; 811 if(dtio->fd == -1) 812 return -1; 813 #ifdef HAVE_SSL 814 if(dtio->ssl) 815 return dtio_write_ssl(dtio, buf, len); 816 #endif 817 ret = send(dtio->fd, (void*)buf, len, 0); 818 if(ret == -1) { 819 #ifndef USE_WINSOCK 820 if(errno == EINTR || errno == EAGAIN) 821 return 0; 822 #else 823 if(WSAGetLastError() == WSAEINPROGRESS) 824 return 0; 825 if(WSAGetLastError() == WSAEWOULDBLOCK) { 826 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 
827 dtio->stop_flush_event:dtio->event), 828 UB_EV_WRITE); 829 return 0; 830 } 831 #endif 832 log_err("dnstap io: failed send: %s", sock_strerror(errno)); 833 return -1; 834 } 835 return ret; 836 } 837 838 #ifdef HAVE_WRITEV 839 /** write with writev, len and message, in one write, if possible. 840 * return true if message is done, false if incomplete */ 841 static int dtio_write_with_writev(struct dt_io_thread* dtio) 842 { 843 uint32_t sendlen = htonl(dtio->cur_msg_len); 844 struct iovec iov[2]; 845 ssize_t r; 846 iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done; 847 iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done; 848 iov[1].iov_base = dtio->cur_msg; 849 iov[1].iov_len = dtio->cur_msg_len; 850 log_assert(iov[0].iov_len > 0); 851 r = writev(dtio->fd, iov, 2); 852 if(r == -1) { 853 #ifndef USE_WINSOCK 854 if(errno == EINTR || errno == EAGAIN) 855 return 0; 856 #else 857 if(WSAGetLastError() == WSAEINPROGRESS) 858 return 0; 859 if(WSAGetLastError() == WSAEWOULDBLOCK) { 860 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 861 dtio->stop_flush_event:dtio->event), 862 UB_EV_WRITE); 863 return 0; 864 } 865 #endif 866 log_err("dnstap io: failed writev: %s", sock_strerror(errno)); 867 /* close the channel */ 868 dtio_del_output_event(dtio); 869 dtio_close_output(dtio); 870 return 0; 871 } 872 /* written r bytes */ 873 dtio->cur_msg_len_done += r; 874 if(dtio->cur_msg_len_done < 4) 875 return 0; 876 if(dtio->cur_msg_len_done > 4) { 877 dtio->cur_msg_done = dtio->cur_msg_len_done-4; 878 dtio->cur_msg_len_done = 4; 879 } 880 if(dtio->cur_msg_done < dtio->cur_msg_len) 881 return 0; 882 return 1; 883 } 884 #endif /* HAVE_WRITEV */ 885 886 /** write more of the length, preceding the data frame. 887 * return true if message is done, false if incomplete. 
*/ 888 static int dtio_write_more_of_len(struct dt_io_thread* dtio) 889 { 890 uint32_t sendlen; 891 int r; 892 if(dtio->cur_msg_len_done >= 4) 893 return 1; 894 #ifdef HAVE_WRITEV 895 if(!dtio->ssl) { 896 /* we try writev for everything.*/ 897 return dtio_write_with_writev(dtio); 898 } 899 #endif /* HAVE_WRITEV */ 900 sendlen = htonl(dtio->cur_msg_len); 901 r = dtio_write_buf(dtio, 902 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done, 903 sizeof(sendlen)-dtio->cur_msg_len_done); 904 if(r == -1) { 905 /* close the channel */ 906 dtio_del_output_event(dtio); 907 dtio_close_output(dtio); 908 return 0; 909 } else if(r == 0) { 910 /* try again later */ 911 return 0; 912 } 913 dtio->cur_msg_len_done += r; 914 if(dtio->cur_msg_len_done < 4) 915 return 0; 916 return 1; 917 } 918 919 /** write more of the data frame. 920 * return true if message is done, false if incomplete. */ 921 static int dtio_write_more_of_data(struct dt_io_thread* dtio) 922 { 923 int r; 924 if(dtio->cur_msg_done >= dtio->cur_msg_len) 925 return 1; 926 r = dtio_write_buf(dtio, 927 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done, 928 dtio->cur_msg_len - dtio->cur_msg_done); 929 if(r == -1) { 930 /* close the channel */ 931 dtio_del_output_event(dtio); 932 dtio_close_output(dtio); 933 return 0; 934 } else if(r == 0) { 935 /* try again later */ 936 return 0; 937 } 938 dtio->cur_msg_done += r; 939 if(dtio->cur_msg_done < dtio->cur_msg_len) 940 return 0; 941 return 1; 942 } 943 944 /** write more of the current message. false if incomplete, true if 945 * the message is done */ 946 static int dtio_write_more(struct dt_io_thread* dtio) 947 { 948 if(dtio->cur_msg_len_done < 4) { 949 if(!dtio_write_more_of_len(dtio)) 950 return 0; 951 } 952 if(dtio->cur_msg_done < dtio->cur_msg_len) { 953 if(!dtio_write_more_of_data(dtio)) 954 return 0; 955 } 956 return 1; 957 } 958 959 /** Receive bytes from dtio->fd, store in buffer. 
Returns 0: closed, 960 * -1: continue, >0: number of bytes read into buffer */ 961 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) { 962 ssize_t r; 963 r = recv(dtio->fd, (void*)buf, len, MSG_DONTWAIT); 964 if(r == -1) { 965 char* to = dtio->socket_path; 966 if(!to) to = dtio->ip_str; 967 if(!to) to = ""; 968 #ifndef USE_WINSOCK 969 if(errno == EINTR || errno == EAGAIN) 970 return -1; /* try later */ 971 #else 972 if(WSAGetLastError() == WSAEINPROGRESS) { 973 return -1; /* try later */ 974 } else if(WSAGetLastError() == WSAEWOULDBLOCK) { 975 ub_winsock_tcp_wouldblock( 976 (dtio->stop_flush_event? 977 dtio->stop_flush_event:dtio->event), 978 UB_EV_READ); 979 return -1; /* try later */ 980 } 981 #endif 982 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 983 verbosity < 4) 984 return 0; /* no log retries on low verbosity */ 985 log_err("dnstap io: output closed, recv %s: %s", to, 986 strerror(errno)); 987 /* and close below */ 988 return 0; 989 } 990 if(r == 0) { 991 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 992 verbosity < 4) 993 return 0; /* no log retries on low verbosity */ 994 verbose(VERB_DETAIL, "dnstap io: output closed by the other side"); 995 /* and close below */ 996 return 0; 997 } 998 /* something was received */ 999 return r; 1000 } 1001 1002 #ifdef HAVE_SSL 1003 /** Receive bytes over TLS from dtio->fd, store in buffer. 
Returns 0: closed, 1004 * -1: continue, >0: number of bytes read into buffer */ 1005 static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len) 1006 { 1007 int r; 1008 ERR_clear_error(); 1009 r = SSL_read(dtio->ssl, buf, len); 1010 if(r <= 0) { 1011 int want = SSL_get_error(dtio->ssl, r); 1012 if(want == SSL_ERROR_ZERO_RETURN) { 1013 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1014 verbosity < 4) 1015 return 0; /* no log retries on low verbosity */ 1016 verbose(VERB_DETAIL, "dnstap io: output closed by the " 1017 "other side"); 1018 return 0; 1019 } else if(want == SSL_ERROR_WANT_READ) { 1020 /* continue later */ 1021 return -1; 1022 } else if(want == SSL_ERROR_WANT_WRITE) { 1023 (void)dtio_enable_brief_write(dtio); 1024 return -1; 1025 } else if(want == SSL_ERROR_SYSCALL) { 1026 #ifdef ECONNRESET 1027 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1028 errno == ECONNRESET && verbosity < 4) 1029 return 0; /* silence reset by peer */ 1030 #endif 1031 if(errno != 0) 1032 log_err("SSL_read syscall: %s", 1033 strerror(errno)); 1034 verbose(VERB_DETAIL, "dnstap io: output closed by the " 1035 "other side"); 1036 return 0; 1037 } 1038 log_crypto_err_io("could not SSL_read", want); 1039 verbose(VERB_DETAIL, "dnstap io: output closed by the " 1040 "other side"); 1041 return 0; 1042 } 1043 return r; 1044 } 1045 #endif /* HAVE_SSL */ 1046 1047 /** check if the output fd has been closed, 1048 * it returns false if the stream is closed. */ 1049 static int dtio_check_close(struct dt_io_thread* dtio) 1050 { 1051 /* we don't want to read any packets, but if there are we can 1052 * discard the input (ignore it). Ignore of unknown (control) 1053 * packets is okay for the framestream protocol. And also, the 1054 * read call can return that the stream has been closed by the 1055 * other side. 
*/ 1056 uint8_t buf[1024]; 1057 int r = -1; 1058 1059 1060 if(dtio->fd == -1) return 0; 1061 1062 while(r != 0) { 1063 /* not interested in buffer content, overwrite */ 1064 r = receive_bytes(dtio, (void*)buf, sizeof(buf)); 1065 if(r == -1) 1066 return 1; 1067 } 1068 /* the other end has been closed */ 1069 /* close the channel */ 1070 dtio_del_output_event(dtio); 1071 dtio_close_output(dtio); 1072 return 0; 1073 } 1074 1075 /** Read accept frame. Returns -1: continue reading, 0: closed, 1076 * 1: valid accept received. */ 1077 static int dtio_read_accept_frame(struct dt_io_thread* dtio) 1078 { 1079 int r; 1080 size_t read_frame_done; 1081 while(dtio->read_frame.frame_len_done < 4) { 1082 #ifdef HAVE_SSL 1083 if(dtio->ssl) { 1084 r = ssl_read_bytes(dtio, 1085 (uint8_t*)&dtio->read_frame.frame_len+ 1086 dtio->read_frame.frame_len_done, 1087 4-dtio->read_frame.frame_len_done); 1088 } else { 1089 #endif 1090 r = receive_bytes(dtio, 1091 (uint8_t*)&dtio->read_frame.frame_len+ 1092 dtio->read_frame.frame_len_done, 1093 4-dtio->read_frame.frame_len_done); 1094 #ifdef HAVE_SSL 1095 } 1096 #endif 1097 if(r == -1) 1098 return -1; /* continue reading */ 1099 if(r == 0) { 1100 /* connection closed */ 1101 goto close_connection; 1102 } 1103 dtio->read_frame.frame_len_done += r; 1104 if(dtio->read_frame.frame_len_done < 4) 1105 return -1; /* continue reading */ 1106 1107 if(dtio->read_frame.frame_len == 0) { 1108 dtio->read_frame.frame_len_done = 0; 1109 dtio->read_frame.control_frame = 1; 1110 continue; 1111 } 1112 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len); 1113 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) { 1114 verbose(VERB_OPS, "dnstap: received frame exceeds max " 1115 "length of %d bytes, closing connection", 1116 DTIO_RECV_FRAME_MAX_LEN); 1117 goto close_connection; 1118 } 1119 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len); 1120 dtio->read_frame.buf_cap = dtio->read_frame.frame_len; 1121 if(!dtio->read_frame.buf) { 1122 
log_err("dnstap io: out of memory (creating read " 1123 "buffer)"); 1124 goto close_connection; 1125 } 1126 } 1127 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) { 1128 #ifdef HAVE_SSL 1129 if(dtio->ssl) { 1130 r = ssl_read_bytes(dtio, dtio->read_frame.buf+ 1131 dtio->read_frame.buf_count, 1132 dtio->read_frame.buf_cap- 1133 dtio->read_frame.buf_count); 1134 } else { 1135 #endif 1136 r = receive_bytes(dtio, dtio->read_frame.buf+ 1137 dtio->read_frame.buf_count, 1138 dtio->read_frame.buf_cap- 1139 dtio->read_frame.buf_count); 1140 #ifdef HAVE_SSL 1141 } 1142 #endif 1143 if(r == -1) 1144 return -1; /* continue reading */ 1145 if(r == 0) { 1146 /* connection closed */ 1147 goto close_connection; 1148 } 1149 dtio->read_frame.buf_count += r; 1150 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) 1151 return -1; /* continue reading */ 1152 } 1153 1154 /* Complete frame received, check if this is a valid ACCEPT control 1155 * frame. */ 1156 if(dtio->read_frame.frame_len < 4) { 1157 verbose(VERB_OPS, "dnstap: invalid data received"); 1158 goto close_connection; 1159 } 1160 if(sldns_read_uint32(dtio->read_frame.buf) != 1161 FSTRM_CONTROL_FRAME_ACCEPT) { 1162 verbose(VERB_ALGO, "dnstap: invalid control type received, " 1163 "ignored"); 1164 dtio->ready_frame_sent = 0; 1165 dtio->accept_frame_received = 0; 1166 dtio_read_frame_free(&dtio->read_frame); 1167 return -1; 1168 } 1169 read_frame_done = 4; /* control frame type */ 1170 1171 /* Iterate over control fields, ignore unknown types. 1172 * Need to be able to read at least 8 bytes (control field type + 1173 * length). 
*/ 1174 while(read_frame_done+8 < dtio->read_frame.frame_len) { 1175 uint32_t type = sldns_read_uint32(dtio->read_frame.buf + 1176 read_frame_done); 1177 uint32_t len = sldns_read_uint32(dtio->read_frame.buf + 1178 read_frame_done + 4); 1179 if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) { 1180 if(len == strlen(DNSTAP_CONTENT_TYPE) && 1181 read_frame_done+8+len <= 1182 dtio->read_frame.frame_len && 1183 memcmp(dtio->read_frame.buf + read_frame_done + 1184 + 8, DNSTAP_CONTENT_TYPE, len) == 0) { 1185 if(!dtio_control_start_send(dtio)) { 1186 verbose(VERB_OPS, "dnstap io: out of " 1187 "memory while sending START frame"); 1188 goto close_connection; 1189 } 1190 dtio->accept_frame_received = 1; 1191 if(!dtio_add_output_event_write(dtio)) 1192 goto close_connection; 1193 return 1; 1194 } else { 1195 /* unknown content type */ 1196 verbose(VERB_ALGO, "dnstap: ACCEPT frame " 1197 "contains unknown content type, " 1198 "closing connection"); 1199 goto close_connection; 1200 } 1201 } 1202 /* unknown option, try next */ 1203 read_frame_done += 8+len; 1204 } 1205 1206 1207 close_connection: 1208 dtio_del_output_event(dtio); 1209 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1210 dtio_close_output(dtio); 1211 return 0; 1212 } 1213 1214 /** add the output file descriptor event for listening, read only */ 1215 static int dtio_add_output_event_read(struct dt_io_thread* dtio) 1216 { 1217 if(!dtio->event) 1218 return 0; 1219 if(dtio->event_added && !dtio->event_added_is_write) 1220 return 1; 1221 /* we have to (re-)register the event */ 1222 if(dtio->event_added) 1223 ub_event_del(dtio->event); 1224 ub_event_del_bits(dtio->event, UB_EV_WRITE); 1225 if(ub_event_add(dtio->event, NULL) != 0) { 1226 log_err("dnstap io: out of memory (adding event)"); 1227 dtio->event_added = 0; 1228 dtio->event_added_is_write = 0; 1229 /* close output and start reattempts to open it */ 1230 dtio_close_output(dtio); 1231 return 0; 1232 } 1233 dtio->event_added = 1; 1234 
dtio->event_added_is_write = 0; 1235 return 1; 1236 } 1237 1238 /** add the output file descriptor event for listening, read and write */ 1239 static int dtio_add_output_event_write(struct dt_io_thread* dtio) 1240 { 1241 if(!dtio->event) 1242 return 0; 1243 if(dtio->event_added && dtio->event_added_is_write) 1244 return 1; 1245 /* we have to (re-)register the event */ 1246 if(dtio->event_added) 1247 ub_event_del(dtio->event); 1248 ub_event_add_bits(dtio->event, UB_EV_WRITE); 1249 if(ub_event_add(dtio->event, NULL) != 0) { 1250 log_err("dnstap io: out of memory (adding event)"); 1251 dtio->event_added = 0; 1252 dtio->event_added_is_write = 0; 1253 /* close output and start reattempts to open it */ 1254 dtio_close_output(dtio); 1255 return 0; 1256 } 1257 dtio->event_added = 1; 1258 dtio->event_added_is_write = 1; 1259 return 1; 1260 } 1261 1262 /** put the dtio thread to sleep */ 1263 static void dtio_sleep(struct dt_io_thread* dtio) 1264 { 1265 /* unregister the event polling for write, because there is 1266 * nothing to be written */ 1267 (void)dtio_add_output_event_read(dtio); 1268 1269 /* Set wakeuptimer enabled off; so that the next worker thread that 1270 * wants to log starts a timer if needed, since the writer thread 1271 * has gone to sleep. 
*/ 1272 lock_basic_lock(&dtio->wakeup_timer_lock); 1273 dtio->wakeup_timer_enabled = 0; 1274 lock_basic_unlock(&dtio->wakeup_timer_lock); 1275 } 1276 1277 #ifdef HAVE_SSL 1278 /** enable the brief read condition */ 1279 static int dtio_enable_brief_read(struct dt_io_thread* dtio) 1280 { 1281 dtio->ssl_brief_read = 1; 1282 if(dtio->stop_flush_event) { 1283 ub_event_del(dtio->stop_flush_event); 1284 ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE); 1285 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 1286 log_err("dnstap io, stop flush, could not ub_event_add"); 1287 return 0; 1288 } 1289 return 1; 1290 } 1291 return dtio_add_output_event_read(dtio); 1292 } 1293 #endif /* HAVE_SSL */ 1294 1295 #ifdef HAVE_SSL 1296 /** disable the brief read condition */ 1297 static int dtio_disable_brief_read(struct dt_io_thread* dtio) 1298 { 1299 dtio->ssl_brief_read = 0; 1300 if(dtio->stop_flush_event) { 1301 ub_event_del(dtio->stop_flush_event); 1302 ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE); 1303 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 1304 log_err("dnstap io, stop flush, could not ub_event_add"); 1305 return 0; 1306 } 1307 return 1; 1308 } 1309 return dtio_add_output_event_write(dtio); 1310 } 1311 #endif /* HAVE_SSL */ 1312 1313 #ifdef HAVE_SSL 1314 /** enable the brief write condition */ 1315 static int dtio_enable_brief_write(struct dt_io_thread* dtio) 1316 { 1317 dtio->ssl_brief_write = 1; 1318 return dtio_add_output_event_write(dtio); 1319 } 1320 #endif /* HAVE_SSL */ 1321 1322 #ifdef HAVE_SSL 1323 /** disable the brief write condition */ 1324 static int dtio_disable_brief_write(struct dt_io_thread* dtio) 1325 { 1326 dtio->ssl_brief_write = 0; 1327 return dtio_add_output_event_read(dtio); 1328 } 1329 #endif /* HAVE_SSL */ 1330 1331 #ifdef HAVE_SSL 1332 /** check peer verification after ssl handshake connection, false if closed*/ 1333 static int dtio_ssl_check_peer(struct dt_io_thread* dtio) 1334 { 1335 
if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) { 1336 /* verification */ 1337 if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) { 1338 #ifdef HAVE_SSL_GET1_PEER_CERTIFICATE 1339 X509* x = SSL_get1_peer_certificate(dtio->ssl); 1340 #else 1341 X509* x = SSL_get_peer_certificate(dtio->ssl); 1342 #endif 1343 if(!x) { 1344 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1345 "connection failed no certificate", 1346 dtio->ip_str); 1347 return 0; 1348 } 1349 log_cert(VERB_ALGO, "dnstap io, peer certificate", 1350 x); 1351 #ifdef HAVE_SSL_GET0_PEERNAME 1352 if(SSL_get0_peername(dtio->ssl)) { 1353 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1354 "connection to %s authenticated", 1355 dtio->ip_str, 1356 SSL_get0_peername(dtio->ssl)); 1357 } else { 1358 #endif 1359 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1360 "connection authenticated", 1361 dtio->ip_str); 1362 #ifdef HAVE_SSL_GET0_PEERNAME 1363 } 1364 #endif 1365 X509_free(x); 1366 } else { 1367 #ifdef HAVE_SSL_GET1_PEER_CERTIFICATE 1368 X509* x = SSL_get1_peer_certificate(dtio->ssl); 1369 #else 1370 X509* x = SSL_get_peer_certificate(dtio->ssl); 1371 #endif 1372 if(x) { 1373 log_cert(VERB_ALGO, "dnstap io, peer " 1374 "certificate", x); 1375 X509_free(x); 1376 } 1377 verbose(VERB_ALGO, "dnstap io, %s, SSL connection " 1378 "failed: failed to authenticate", 1379 dtio->ip_str); 1380 return 0; 1381 } 1382 } else { 1383 /* unauthenticated, the verify peer flag was not set 1384 * in ssl when the ssl object was created from ssl_ctx */ 1385 verbose(VERB_ALGO, "dnstap io, %s, SSL connection", 1386 dtio->ip_str); 1387 } 1388 return 1; 1389 } 1390 #endif /* HAVE_SSL */ 1391 1392 #ifdef HAVE_SSL 1393 /** perform ssl handshake, returns 1 if okay, 0 to stop */ 1394 static int dtio_ssl_handshake(struct dt_io_thread* dtio, 1395 struct stop_flush_info* info) 1396 { 1397 int r; 1398 if(dtio->ssl_brief_read) { 1399 /* assume the brief read condition is satisfied, 1400 * if we need more or again, we can set it again */ 1401 
if(!dtio_disable_brief_read(dtio)) { 1402 if(info) dtio_stop_flush_exit(info); 1403 return 0; 1404 } 1405 } 1406 if(dtio->ssl_handshake_done) 1407 return 1; 1408 1409 ERR_clear_error(); 1410 r = SSL_do_handshake(dtio->ssl); 1411 if(r != 1) { 1412 int want = SSL_get_error(dtio->ssl, r); 1413 if(want == SSL_ERROR_WANT_READ) { 1414 /* we want to read on the connection */ 1415 if(!dtio_enable_brief_read(dtio)) { 1416 if(info) dtio_stop_flush_exit(info); 1417 return 0; 1418 } 1419 return 0; 1420 } else if(want == SSL_ERROR_WANT_WRITE) { 1421 /* we want to write on the connection */ 1422 return 0; 1423 } else if(r == 0) { 1424 /* closed */ 1425 if(info) dtio_stop_flush_exit(info); 1426 dtio_del_output_event(dtio); 1427 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1428 dtio_close_output(dtio); 1429 return 0; 1430 } else if(want == SSL_ERROR_SYSCALL) { 1431 /* SYSCALL and errno==0 means closed uncleanly */ 1432 int silent = 0; 1433 #ifdef EPIPE 1434 if(errno == EPIPE && verbosity < 2) 1435 silent = 1; /* silence 'broken pipe' */ 1436 #endif 1437 #ifdef ECONNRESET 1438 if(errno == ECONNRESET && verbosity < 2) 1439 silent = 1; /* silence reset by peer */ 1440 #endif 1441 if(errno == 0) 1442 silent = 1; 1443 if(!silent) 1444 log_err("dnstap io, SSL_handshake syscall: %s", 1445 strerror(errno)); 1446 /* closed */ 1447 if(info) dtio_stop_flush_exit(info); 1448 dtio_del_output_event(dtio); 1449 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1450 dtio_close_output(dtio); 1451 return 0; 1452 } else { 1453 unsigned long err = ERR_get_error(); 1454 if(!squelch_err_ssl_handshake(err)) { 1455 log_crypto_err_io_code("dnstap io, ssl handshake failed", 1456 want, err); 1457 verbose(VERB_OPS, "dnstap io, ssl handshake failed " 1458 "from %s", dtio->ip_str); 1459 } 1460 /* closed */ 1461 if(info) dtio_stop_flush_exit(info); 1462 dtio_del_output_event(dtio); 1463 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1464 dtio_close_output(dtio); 1465 return 0; 1466 } 
1467 1468 } 1469 /* check peer verification */ 1470 dtio->ssl_handshake_done = 1; 1471 1472 if(!dtio_ssl_check_peer(dtio)) { 1473 /* closed */ 1474 if(info) dtio_stop_flush_exit(info); 1475 dtio_del_output_event(dtio); 1476 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1477 dtio_close_output(dtio); 1478 return 0; 1479 } 1480 return 1; 1481 } 1482 #endif /* HAVE_SSL */ 1483 1484 /** callback for the dnstap events, to write to the output */ 1485 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg) 1486 { 1487 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 1488 int i; 1489 1490 if(dtio->check_nb_connect) { 1491 int connect_err = dtio_check_nb_connect(dtio); 1492 if(connect_err == -1) { 1493 /* close the channel */ 1494 dtio_del_output_event(dtio); 1495 dtio_close_output(dtio); 1496 return; 1497 } else if(connect_err == 0) { 1498 /* try again later */ 1499 return; 1500 } 1501 /* nonblocking connect check passed, continue */ 1502 } 1503 1504 #ifdef HAVE_SSL 1505 if(dtio->ssl && 1506 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 1507 if(!dtio_ssl_handshake(dtio, NULL)) 1508 return; 1509 } 1510 #endif 1511 1512 if((bits&UB_EV_READ || dtio->ssl_brief_write)) { 1513 #ifdef HAVE_SSL 1514 if(dtio->ssl_brief_write) 1515 (void)dtio_disable_brief_write(dtio); 1516 #endif 1517 if(dtio->ready_frame_sent && !dtio->accept_frame_received) { 1518 if(dtio_read_accept_frame(dtio) <= 0) 1519 return; 1520 } else if(!dtio_check_close(dtio)) 1521 return; 1522 } 1523 1524 /* loop to process a number of messages. This improves throughput, 1525 * because selecting on write-event if not needed for busy messages 1526 * (dnstap log) generation and if they need to all be written back. 1527 * The write event is usually not blocked up. But not forever, 1528 * because the event loop needs to stay responsive for other events. 1529 * If there are no (more) messages, or if the output buffers get 1530 * full, it returns out of the loop. 
*/ 1531 for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) { 1532 /* see if there are messages that need writing */ 1533 if(!dtio->cur_msg) { 1534 if(!dtio_find_msg(dtio)) { 1535 if(i == 0) { 1536 /* no messages on the first iteration, 1537 * the queues are all empty */ 1538 dtio_sleep(dtio); 1539 /* After putting to sleep, see if 1540 * a message is in a message queue, 1541 * if so, resume service. Stops a 1542 * race condition where a thread could 1543 * have one message but the dtio 1544 * also just went to sleep. With the 1545 * message queued between the 1546 * dtio_find_msg and dtio_sleep 1547 * calls. */ 1548 if(dtio_find_msg(dtio)) { 1549 if(!dtio_add_output_event_write(dtio)) 1550 return; 1551 } 1552 } 1553 if(!dtio->cur_msg) 1554 return; /* nothing to do */ 1555 } 1556 } 1557 1558 /* write it */ 1559 if(dtio->cur_msg_done < dtio->cur_msg_len) { 1560 if(!dtio_write_more(dtio)) 1561 return; 1562 } 1563 1564 /* done with the current message */ 1565 dtio_cur_msg_free(dtio); 1566 1567 /* If this is a bidirectional stream the first message will be 1568 * the READY control frame. We can only continue writing after 1569 * receiving an ACCEPT control frame. 
*/ 1570 if(dtio->is_bidirectional && !dtio->ready_frame_sent) { 1571 dtio->ready_frame_sent = 1; 1572 (void)dtio_add_output_event_read(dtio); 1573 break; 1574 } 1575 } 1576 } 1577 1578 /** callback for the dnstap commandpipe, to stop the dnstap IO */ 1579 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) 1580 { 1581 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 1582 uint8_t cmd; 1583 ssize_t r; 1584 if(dtio->want_to_exit) 1585 return; 1586 r = read(fd, &cmd, sizeof(cmd)); 1587 if(r == -1) { 1588 #ifndef USE_WINSOCK 1589 if(errno == EINTR || errno == EAGAIN) 1590 return; /* ignore this */ 1591 #else 1592 if(WSAGetLastError() == WSAEINPROGRESS) 1593 return; 1594 if(WSAGetLastError() == WSAEWOULDBLOCK) 1595 return; 1596 #endif 1597 log_err("dnstap io: failed to read: %s", sock_strerror(errno)); 1598 /* and then fall through to quit the thread */ 1599 } else if(r == 0) { 1600 verbose(VERB_ALGO, "dnstap io: cmd channel closed"); 1601 } else if(r == 1 && cmd == DTIO_COMMAND_STOP) { 1602 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); 1603 } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) { 1604 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup"); 1605 1606 if(dtio->is_bidirectional && !dtio->accept_frame_received) { 1607 verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, " 1608 "waiting for ACCEPT control frame"); 1609 return; 1610 } 1611 1612 /* reregister event */ 1613 if(!dtio_add_output_event_write(dtio)) 1614 return; 1615 return; 1616 } else if(r == 1) { 1617 verbose(VERB_ALGO, "dnstap io: cmd channel unknown command"); 1618 } 1619 dtio->want_to_exit = 1; 1620 if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base) 1621 != 0) { 1622 log_err("dnstap io: could not loopexit"); 1623 } 1624 } 1625 1626 #ifndef THREADS_DISABLED 1627 /** setup the event base for the dnstap io thread */ 1628 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs, 1629 struct timeval* now) 1630 { 1631 memset(now, 0, sizeof(*now)); 1632 
dtio->event_base = ub_default_event_base(0, secs, now); 1633 if(!dtio->event_base) { 1634 fatal_exit("dnstap io: could not create event_base"); 1635 } 1636 } 1637 #endif /* THREADS_DISABLED */ 1638 1639 /** setup the cmd event for dnstap io */ 1640 static void dtio_setup_cmd(struct dt_io_thread* dtio) 1641 { 1642 struct ub_event* cmdev; 1643 fd_set_nonblock(dtio->commandpipe[0]); 1644 cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0], 1645 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio); 1646 if(!cmdev) { 1647 fatal_exit("dnstap io: out of memory"); 1648 } 1649 dtio->command_event = cmdev; 1650 if(ub_event_add(cmdev, NULL) != 0) { 1651 fatal_exit("dnstap io: out of memory (adding event)"); 1652 } 1653 } 1654 1655 /** setup the reconnect event for dnstap io */ 1656 static void dtio_setup_reconnect(struct dt_io_thread* dtio) 1657 { 1658 dtio_reconnect_clear(dtio); 1659 dtio->reconnect_timer = ub_event_new(dtio->event_base, -1, 1660 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio); 1661 if(!dtio->reconnect_timer) { 1662 fatal_exit("dnstap io: out of memory"); 1663 } 1664 } 1665 1666 /** 1667 * structure to keep track of information during stop flush 1668 */ 1669 struct stop_flush_info { 1670 /** the event base during stop flush */ 1671 struct ub_event_base* base; 1672 /** did we already want to exit this stop-flush event base */ 1673 int want_to_exit_flush; 1674 /** has the timer fired */ 1675 int timer_done; 1676 /** the dtio */ 1677 struct dt_io_thread* dtio; 1678 /** the stop control frame */ 1679 void* stop_frame; 1680 /** length of the stop frame */ 1681 size_t stop_frame_len; 1682 /** how much we have done of the stop frame */ 1683 size_t stop_frame_done; 1684 }; 1685 1686 /** exit the stop flush base */ 1687 static void dtio_stop_flush_exit(struct stop_flush_info* info) 1688 { 1689 if(info->want_to_exit_flush) 1690 return; 1691 info->want_to_exit_flush = 1; 1692 if(ub_event_base_loopexit(info->base) != 0) { 1693 log_err("dnstap io: could not 
loopexit"); 1694 } 1695 } 1696 1697 /** send the stop control, 1698 * return true if completed the frame. */ 1699 static int dtio_control_stop_send(struct stop_flush_info* info) 1700 { 1701 struct dt_io_thread* dtio = info->dtio; 1702 int r; 1703 if(info->stop_frame_done >= info->stop_frame_len) 1704 return 1; 1705 r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) + 1706 info->stop_frame_done, info->stop_frame_len - 1707 info->stop_frame_done); 1708 if(r == -1) { 1709 verbose(VERB_ALGO, "dnstap io: stop flush: output closed"); 1710 dtio_stop_flush_exit(info); 1711 return 0; 1712 } 1713 if(r == 0) { 1714 /* try again later, or timeout */ 1715 return 0; 1716 } 1717 info->stop_frame_done += r; 1718 if(info->stop_frame_done < info->stop_frame_len) 1719 return 0; /* not done yet */ 1720 return 1; 1721 } 1722 1723 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), 1724 void* arg) 1725 { 1726 struct stop_flush_info* info = (struct stop_flush_info*)arg; 1727 if(info->want_to_exit_flush) 1728 return; 1729 verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush"); 1730 info->timer_done = 1; 1731 dtio_stop_flush_exit(info); 1732 } 1733 1734 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg) 1735 { 1736 struct stop_flush_info* info = (struct stop_flush_info*)arg; 1737 struct dt_io_thread* dtio = info->dtio; 1738 if(info->want_to_exit_flush) 1739 return; 1740 if(dtio->check_nb_connect) { 1741 /* we don't start the stop_flush if connect still 1742 * in progress, but the check code is here, just in case */ 1743 int connect_err = dtio_check_nb_connect(dtio); 1744 if(connect_err == -1) { 1745 /* close the channel, exit the stop flush */ 1746 dtio_stop_flush_exit(info); 1747 dtio_del_output_event(dtio); 1748 dtio_close_output(dtio); 1749 return; 1750 } else if(connect_err == 0) { 1751 /* try again later */ 1752 return; 1753 } 1754 /* nonblocking connect check passed, continue */ 1755 } 1756 #ifdef HAVE_SSL 1757 if(dtio->ssl && 1758 
(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 1759 if(!dtio_ssl_handshake(dtio, info)) 1760 return; 1761 } 1762 #endif 1763 1764 if((bits&UB_EV_READ)) { 1765 if(!dtio_check_close(dtio)) { 1766 if(dtio->fd == -1) { 1767 verbose(VERB_ALGO, "dnstap io: " 1768 "stop flush: output closed"); 1769 dtio_stop_flush_exit(info); 1770 } 1771 return; 1772 } 1773 } 1774 /* write remainder of last frame */ 1775 if(dtio->cur_msg) { 1776 if(dtio->cur_msg_done < dtio->cur_msg_len) { 1777 if(!dtio_write_more(dtio)) { 1778 if(dtio->fd == -1) { 1779 verbose(VERB_ALGO, "dnstap io: " 1780 "stop flush: output closed"); 1781 dtio_stop_flush_exit(info); 1782 } 1783 return; 1784 } 1785 } 1786 verbose(VERB_ALGO, "dnstap io: stop flush completed " 1787 "last frame"); 1788 dtio_cur_msg_free(dtio); 1789 } 1790 /* write stop frame */ 1791 if(info->stop_frame_done < info->stop_frame_len) { 1792 if(!dtio_control_stop_send(info)) 1793 return; 1794 verbose(VERB_ALGO, "dnstap io: stop flush completed " 1795 "stop control frame"); 1796 } 1797 /* when last frame and stop frame are sent, exit */ 1798 dtio_stop_flush_exit(info); 1799 } 1800 1801 /** flush at end, last packet and stop control */ 1802 static void dtio_control_stop_flush(struct dt_io_thread* dtio) 1803 { 1804 /* briefly attempt to flush the previous packet to the output, 1805 * this could be a partial packet, or even the start control frame */ 1806 time_t secs = 0; 1807 struct timeval now; 1808 struct stop_flush_info info; 1809 struct timeval tv; 1810 struct ub_event* timer, *stopev; 1811 1812 if(dtio->fd == -1 || dtio->check_nb_connect) { 1813 /* no connection or we have just connected, so nothing is 1814 * sent yet, so nothing to stop or flush */ 1815 return; 1816 } 1817 if(dtio->ssl && !dtio->ssl_handshake_done) { 1818 /* no SSL connection has been established yet */ 1819 return; 1820 } 1821 1822 memset(&info, 0, sizeof(info)); 1823 memset(&now, 0, sizeof(now)); 1824 info.dtio = dtio; 1825 info.base = ub_default_event_base(0, 
&secs, &now); 1826 if(!info.base) { 1827 log_err("dnstap io: malloc failure"); 1828 return; 1829 } 1830 timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT, 1831 &dtio_stop_timer_cb, &info); 1832 if(!timer) { 1833 log_err("dnstap io: malloc failure"); 1834 ub_event_base_free(info.base); 1835 return; 1836 } 1837 memset(&tv, 0, sizeof(tv)); 1838 tv.tv_sec = 2; 1839 if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info, 1840 &tv) != 0) { 1841 log_err("dnstap io: cannot event_timer_add"); 1842 ub_event_free(timer); 1843 ub_event_base_free(info.base); 1844 return; 1845 } 1846 stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ | 1847 UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info); 1848 if(!stopev) { 1849 log_err("dnstap io: malloc failure"); 1850 ub_timer_del(timer); 1851 ub_event_free(timer); 1852 ub_event_base_free(info.base); 1853 return; 1854 } 1855 if(ub_event_add(stopev, NULL) != 0) { 1856 log_err("dnstap io: cannot event_add"); 1857 ub_event_free(stopev); 1858 ub_timer_del(timer); 1859 ub_event_free(timer); 1860 ub_event_base_free(info.base); 1861 return; 1862 } 1863 info.stop_frame = fstrm_create_control_frame_stop( 1864 &info.stop_frame_len); 1865 if(!info.stop_frame) { 1866 log_err("dnstap io: malloc failure"); 1867 ub_event_del(stopev); 1868 ub_event_free(stopev); 1869 ub_timer_del(timer); 1870 ub_event_free(timer); 1871 ub_event_base_free(info.base); 1872 return; 1873 } 1874 dtio->stop_flush_event = stopev; 1875 1876 /* wait briefly, or until finished */ 1877 verbose(VERB_ALGO, "dnstap io: stop flush started"); 1878 if(ub_event_base_dispatch(info.base) < 0) { 1879 log_err("dnstap io: dispatch flush failed, errno is %s", 1880 strerror(errno)); 1881 } 1882 verbose(VERB_ALGO, "dnstap io: stop flush ended"); 1883 free(info.stop_frame); 1884 dtio->stop_flush_event = NULL; 1885 ub_event_del(stopev); 1886 ub_event_free(stopev); 1887 ub_timer_del(timer); 1888 ub_event_free(timer); 1889 ub_event_base_free(info.base); 1890 } 1891 1892 /** perform desetup 
and free stuff when the dnstap io thread exits */ 1893 static void dtio_desetup(struct dt_io_thread* dtio) 1894 { 1895 dtio_control_stop_flush(dtio); 1896 dtio_del_output_event(dtio); 1897 dtio_close_output(dtio); 1898 ub_event_del(dtio->command_event); 1899 ub_event_free(dtio->command_event); 1900 #ifndef USE_WINSOCK 1901 close(dtio->commandpipe[0]); 1902 #else 1903 _close(dtio->commandpipe[0]); 1904 #endif 1905 dtio->commandpipe[0] = -1; 1906 dtio_reconnect_del(dtio); 1907 ub_event_free(dtio->reconnect_timer); 1908 dtio_cur_msg_free(dtio); 1909 #ifndef THREADS_DISABLED 1910 ub_event_base_free(dtio->event_base); 1911 #endif 1912 } 1913 1914 /** setup a start control message */ 1915 static int dtio_control_start_send(struct dt_io_thread* dtio) 1916 { 1917 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 1918 dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE, 1919 &dtio->cur_msg_len); 1920 if(!dtio->cur_msg) { 1921 return 0; 1922 } 1923 /* setup to send the control message */ 1924 /* set that the buffer needs to be sent, but the length 1925 * of that buffer is already written, that way the buffer can 1926 * start with 0 length and then the length of the control frame 1927 * in it */ 1928 dtio->cur_msg_done = 0; 1929 dtio->cur_msg_len_done = 4; 1930 return 1; 1931 } 1932 1933 /** setup a ready control message */ 1934 static int dtio_control_ready_send(struct dt_io_thread* dtio) 1935 { 1936 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 1937 dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE, 1938 &dtio->cur_msg_len); 1939 if(!dtio->cur_msg) { 1940 return 0; 1941 } 1942 /* setup to send the control message */ 1943 /* set that the buffer needs to be sent, but the length 1944 * of that buffer is already written, that way the buffer can 1945 * start with 0 length and then the length of the control frame 1946 * in it */ 1947 dtio->cur_msg_done = 0; 1948 dtio->cur_msg_len_done = 4; 1949 return 1; 1950 } 1951 
1952 /** open the output file descriptor for af_local */ 1953 static int dtio_open_output_local(struct dt_io_thread* dtio) 1954 { 1955 #ifdef HAVE_SYS_UN_H 1956 struct sockaddr_un s; 1957 dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0); 1958 if(dtio->fd == -1) { 1959 log_err("dnstap io: failed to create socket: %s", 1960 sock_strerror(errno)); 1961 return 0; 1962 } 1963 memset(&s, 0, sizeof(s)); 1964 #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN 1965 /* this member exists on BSDs, not Linux */ 1966 s.sun_len = (unsigned)sizeof(s); 1967 #endif 1968 s.sun_family = AF_LOCAL; 1969 /* length is 92-108, 104 on FreeBSD */ 1970 (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); 1971 fd_set_nonblock(dtio->fd); 1972 if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) 1973 == -1) { 1974 char* to = dtio->socket_path; 1975 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1976 verbosity < 4) { 1977 dtio_close_fd(dtio); 1978 return 0; /* no log retries on low verbosity */ 1979 } 1980 log_err("dnstap io: failed to connect to \"%s\": %s", 1981 to, sock_strerror(errno)); 1982 dtio_close_fd(dtio); 1983 return 0; 1984 } 1985 return 1; 1986 #else 1987 log_err("cannot create af_local socket"); 1988 return 0; 1989 #endif /* HAVE_SYS_UN_H */ 1990 } 1991 1992 /** open the output file descriptor for af_inet and af_inet6 */ 1993 static int dtio_open_output_tcp(struct dt_io_thread* dtio) 1994 { 1995 struct sockaddr_storage addr; 1996 socklen_t addrlen; 1997 memset(&addr, 0, sizeof(addr)); 1998 addrlen = (socklen_t)sizeof(addr); 1999 2000 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen, UNBOUND_DNS_PORT)) { 2001 log_err("could not parse IP '%s'", dtio->ip_str); 2002 return 0; 2003 } 2004 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0); 2005 if(dtio->fd == -1) { 2006 log_err("can't create socket: %s", sock_strerror(errno)); 2007 return 0; 2008 } 2009 fd_set_nonblock(dtio->fd); 2010 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) { 2011 if(errno == 
EINPROGRESS) 2012 return 1; /* wait until connect done*/ 2013 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 2014 verbosity < 4) { 2015 dtio_close_fd(dtio); 2016 return 0; /* no log retries on low verbosity */ 2017 } 2018 #ifndef USE_WINSOCK 2019 if(tcp_connect_errno_needs_log( 2020 (struct sockaddr *)&addr, addrlen)) { 2021 log_err("dnstap io: failed to connect to %s: %s", 2022 dtio->ip_str, strerror(errno)); 2023 } 2024 #else 2025 if(WSAGetLastError() == WSAEINPROGRESS || 2026 WSAGetLastError() == WSAEWOULDBLOCK) 2027 return 1; /* wait until connect done*/ 2028 if(tcp_connect_errno_needs_log( 2029 (struct sockaddr *)&addr, addrlen)) { 2030 log_err("dnstap io: failed to connect to %s: %s", 2031 dtio->ip_str, wsa_strerror(WSAGetLastError())); 2032 } 2033 #endif 2034 dtio_close_fd(dtio); 2035 return 0; 2036 } 2037 return 1; 2038 } 2039 2040 /** setup the SSL structure for new connection */ 2041 static int dtio_setup_ssl(struct dt_io_thread* dtio) 2042 { 2043 dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd); 2044 if(!dtio->ssl) return 0; 2045 dtio->ssl_handshake_done = 0; 2046 dtio->ssl_brief_read = 0; 2047 2048 if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name, 2049 dtio->tls_use_sni)) { 2050 return 0; 2051 } 2052 return 1; 2053 } 2054 2055 /** open the output file descriptor */ 2056 static void dtio_open_output(struct dt_io_thread* dtio) 2057 { 2058 struct ub_event* ev; 2059 if(dtio->upstream_is_unix) { 2060 if(!dtio_open_output_local(dtio)) { 2061 dtio_reconnect_enable(dtio); 2062 return; 2063 } 2064 } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 2065 if(!dtio_open_output_tcp(dtio)) { 2066 dtio_reconnect_enable(dtio); 2067 return; 2068 } 2069 if(dtio->upstream_is_tls) { 2070 if(!dtio_setup_ssl(dtio)) { 2071 dtio_close_fd(dtio); 2072 dtio_reconnect_enable(dtio); 2073 return; 2074 } 2075 } 2076 } 2077 dtio->check_nb_connect = 1; 2078 2079 /* the EV_READ is to read ACCEPT control messages, and catch channel 2080 * close. 
EV_WRITE is to write packets */ 2081 ev = ub_event_new(dtio->event_base, dtio->fd, 2082 UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb, 2083 dtio); 2084 if(!ev) { 2085 log_err("dnstap io: out of memory"); 2086 if(dtio->ssl) { 2087 #ifdef HAVE_SSL 2088 SSL_free(dtio->ssl); 2089 dtio->ssl = NULL; 2090 #endif 2091 } 2092 dtio_close_fd(dtio); 2093 dtio_reconnect_enable(dtio); 2094 return; 2095 } 2096 dtio->event = ev; 2097 2098 /* setup protocol control message to start */ 2099 if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) || 2100 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) { 2101 log_err("dnstap io: out of memory"); 2102 ub_event_free(dtio->event); 2103 dtio->event = NULL; 2104 if(dtio->ssl) { 2105 #ifdef HAVE_SSL 2106 SSL_free(dtio->ssl); 2107 dtio->ssl = NULL; 2108 #endif 2109 } 2110 dtio_close_fd(dtio); 2111 dtio_reconnect_enable(dtio); 2112 return; 2113 } 2114 } 2115 2116 /** perform the setup of the writer thread on the established event_base */ 2117 static void dtio_setup_on_base(struct dt_io_thread* dtio) 2118 { 2119 dtio_setup_cmd(dtio); 2120 dtio_setup_reconnect(dtio); 2121 dtio_open_output(dtio); 2122 if(!dtio_add_output_event_write(dtio)) 2123 return; 2124 } 2125 2126 #ifndef THREADS_DISABLED 2127 /** the IO thread function for the DNSTAP IO */ 2128 static void* dnstap_io(void* arg) 2129 { 2130 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 2131 time_t secs = 0; 2132 struct timeval now; 2133 log_thread_set(&dtio->threadnum); 2134 2135 /* setup */ 2136 verbose(VERB_ALGO, "start dnstap io thread"); 2137 dtio_setup_base(dtio, &secs, &now); 2138 dtio_setup_on_base(dtio); 2139 2140 /* run */ 2141 if(ub_event_base_dispatch(dtio->event_base) < 0) { 2142 log_err("dnstap io: dispatch failed, errno is %s", 2143 strerror(errno)); 2144 } 2145 2146 /* cleanup */ 2147 verbose(VERB_ALGO, "stop dnstap io thread"); 2148 dtio_desetup(dtio); 2149 return NULL; 2150 } 2151 #endif /* THREADS_DISABLED */ 2152 2153 int 
dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, 2154 int numworkers) 2155 { 2156 /* set up the thread, can fail */ 2157 #ifndef USE_WINSOCK 2158 if(pipe(dtio->commandpipe) == -1) { 2159 log_err("failed to create pipe: %s", strerror(errno)); 2160 return 0; 2161 } 2162 #else 2163 if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) { 2164 log_err("failed to create _pipe: %s", 2165 wsa_strerror(WSAGetLastError())); 2166 return 0; 2167 } 2168 #endif 2169 2170 /* start the thread */ 2171 dtio->threadnum = numworkers+1; 2172 dtio->started = 1; 2173 #ifndef THREADS_DISABLED 2174 ub_thread_create(&dtio->tid, dnstap_io, dtio); 2175 (void)event_base_nothr; 2176 #else 2177 dtio->event_base = event_base_nothr; 2178 dtio_setup_on_base(dtio); 2179 #endif 2180 return 1; 2181 } 2182 2183 void dt_io_thread_stop(struct dt_io_thread* dtio) 2184 { 2185 #ifndef THREADS_DISABLED 2186 uint8_t cmd = DTIO_COMMAND_STOP; 2187 #endif 2188 if(!dtio) return; 2189 if(!dtio->started) return; 2190 verbose(VERB_ALGO, "dnstap io: send stop cmd"); 2191 2192 #ifndef THREADS_DISABLED 2193 while(1) { 2194 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 2195 if(r == -1) { 2196 #ifndef USE_WINSOCK 2197 if(errno == EINTR || errno == EAGAIN) 2198 continue; 2199 #else 2200 if(WSAGetLastError() == WSAEINPROGRESS) 2201 continue; 2202 if(WSAGetLastError() == WSAEWOULDBLOCK) 2203 continue; 2204 #endif 2205 log_err("dnstap io stop: write: %s", 2206 sock_strerror(errno)); 2207 break; 2208 } 2209 break; 2210 } 2211 dtio->started = 0; 2212 #endif /* THREADS_DISABLED */ 2213 2214 #ifndef USE_WINSOCK 2215 close(dtio->commandpipe[1]); 2216 #else 2217 _close(dtio->commandpipe[1]); 2218 #endif 2219 dtio->commandpipe[1] = -1; 2220 #ifndef THREADS_DISABLED 2221 ub_thread_join(dtio->tid); 2222 #else 2223 dtio->want_to_exit = 1; 2224 dtio_desetup(dtio); 2225 #endif 2226 } 2227