Lines Matching +full:closed +full:- +full:loop
2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
92 /** stop from stop_flush event loop */
108 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, in dt_msg_queue_create()
111 mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq); in dt_msg_queue_create()
112 if(!mq->wakeup_timer) { in dt_msg_queue_create()
116 lock_basic_init(&mq->lock); in dt_msg_queue_create()
117 lock_protect(&mq->lock, mq, sizeof(*mq)); in dt_msg_queue_create()
125 struct dt_msg_entry* e = mq->first, *next=NULL; in dt_msg_queue_clear()
127 next = e->next; in dt_msg_queue_clear()
128 free(e->buf); in dt_msg_queue_clear()
132 mq->first = NULL; in dt_msg_queue_clear()
133 mq->last = NULL; in dt_msg_queue_clear()
134 mq->cursize = 0; in dt_msg_queue_clear()
135 mq->msgcount = 0; in dt_msg_queue_clear()
142 lock_basic_destroy(&mq->lock); in dt_msg_queue_delete()
144 comm_timer_delete(mq->wakeup_timer); in dt_msg_queue_delete()
153 if(!dtio->started) return; in dtio_wakeup()
156 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); in dtio_wakeup()
157 if(r == -1) { in dtio_wakeup()
180 lock_basic_lock(&mq->dtio->wakeup_timer_lock); in mq_wakeup_cb()
181 mq->dtio->wakeup_timer_enabled = 0; in mq_wakeup_cb()
182 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); in mq_wakeup_cb()
183 dtio_wakeup(mq->dtio); in mq_wakeup_cb()
207 lock_basic_lock(&mq->dtio->wakeup_timer_lock); in dt_msg_queue_start_timer()
208 if(mq->dtio->wakeup_timer_enabled) { in dt_msg_queue_start_timer()
212 comm_timer_set(mq->wakeup_timer, &tv); in dt_msg_queue_start_timer()
214 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); in dt_msg_queue_start_timer()
217 mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */ in dt_msg_queue_start_timer()
224 if(!comm_timer_is_set(mq->wakeup_timer)) in dt_msg_queue_start_timer()
225 comm_timer_set(mq->wakeup_timer, &tv); in dt_msg_queue_start_timer()
229 comm_timer_set(mq->wakeup_timer, &tv); in dt_msg_queue_start_timer()
231 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); in dt_msg_queue_start_timer()
262 entry->next = NULL; in dt_msg_queue_submit()
263 entry->buf = buf; in dt_msg_queue_submit()
264 entry->len = len; in dt_msg_queue_submit()
267 lock_basic_lock(&mq->lock); in dt_msg_queue_submit()
270 if(mq->first == NULL || !mq->dtio->event_added_is_write) in dt_msg_queue_submit()
274 if(mq->msgcount == DTIO_MSG_FOR_WAKEUP || in dt_msg_queue_submit()
275 (mq->cursize < mq->maxsize * 9 / 10 && in dt_msg_queue_submit()
276 mq->cursize+len >= mq->maxsize * 9 / 10)) in dt_msg_queue_submit()
279 if(mq->cursize + len > mq->maxsize) { in dt_msg_queue_submit()
282 lock_basic_unlock(&mq->lock); in dt_msg_queue_submit()
287 mq->cursize += len; in dt_msg_queue_submit()
288 mq->msgcount ++; in dt_msg_queue_submit()
290 if(mq->last) { in dt_msg_queue_submit()
291 mq->last->next = entry; in dt_msg_queue_submit()
293 mq->first = entry; in dt_msg_queue_submit()
295 mq->last = entry; in dt_msg_queue_submit()
297 lock_basic_unlock(&mq->lock); in dt_msg_queue_submit()
307 lock_basic_init(&dtio->wakeup_timer_lock); in dt_io_thread_create()
308 lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled, in dt_io_thread_create()
309 sizeof(dtio->wakeup_timer_enabled)); in dt_io_thread_create()
317 lock_basic_destroy(&dtio->wakeup_timer_lock); in dt_io_thread_delete()
318 item=dtio->io_list; in dt_io_thread_delete()
320 nextitem = item->next; in dt_io_thread_delete()
324 free(dtio->socket_path); in dt_io_thread_delete()
325 free(dtio->ip_str); in dt_io_thread_delete()
326 free(dtio->tls_server_name); in dt_io_thread_delete()
327 free(dtio->client_key_file); in dt_io_thread_delete()
328 free(dtio->client_cert_file); in dt_io_thread_delete()
329 if(dtio->ssl_ctx) { in dt_io_thread_delete()
331 SSL_CTX_free(dtio->ssl_ctx); in dt_io_thread_delete()
339 if(!cfg->dnstap) { in dt_io_thread_apply_cfg()
340 log_warn("cannot setup dnstap because dnstap-enable is no"); in dt_io_thread_apply_cfg()
345 if(cfg->dnstap_ip && cfg->dnstap_ip[0]) { in dt_io_thread_apply_cfg()
346 if(cfg->dnstap_tls) in dt_io_thread_apply_cfg()
347 dtio->upstream_is_tls = 1; in dt_io_thread_apply_cfg()
348 else dtio->upstream_is_tcp = 1; in dt_io_thread_apply_cfg()
350 dtio->upstream_is_unix = 1; in dt_io_thread_apply_cfg()
352 dtio->is_bidirectional = cfg->dnstap_bidirectional; in dt_io_thread_apply_cfg()
354 if(dtio->upstream_is_unix) { in dt_io_thread_apply_cfg()
356 if(!cfg->dnstap_socket_path || in dt_io_thread_apply_cfg()
357 cfg->dnstap_socket_path[0]==0) { in dt_io_thread_apply_cfg()
358 log_err("dnstap setup: no dnstap-socket-path for " in dt_io_thread_apply_cfg()
362 nm = cfg->dnstap_socket_path; in dt_io_thread_apply_cfg()
363 if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm, in dt_io_thread_apply_cfg()
364 cfg->chrootdir, strlen(cfg->chrootdir)) == 0) in dt_io_thread_apply_cfg()
365 nm += strlen(cfg->chrootdir); in dt_io_thread_apply_cfg()
366 free(dtio->socket_path); in dt_io_thread_apply_cfg()
367 dtio->socket_path = strdup(nm); in dt_io_thread_apply_cfg()
368 if(!dtio->socket_path) { in dt_io_thread_apply_cfg()
374 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { in dt_io_thread_apply_cfg()
375 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) { in dt_io_thread_apply_cfg()
376 log_err("dnstap setup: no dnstap-ip for TCP connect"); in dt_io_thread_apply_cfg()
379 free(dtio->ip_str); in dt_io_thread_apply_cfg()
380 dtio->ip_str = strdup(cfg->dnstap_ip); in dt_io_thread_apply_cfg()
381 if(!dtio->ip_str) { in dt_io_thread_apply_cfg()
387 if(dtio->upstream_is_tls) { in dt_io_thread_apply_cfg()
389 if(cfg->dnstap_tls_server_name && in dt_io_thread_apply_cfg()
390 cfg->dnstap_tls_server_name[0]) { in dt_io_thread_apply_cfg()
391 free(dtio->tls_server_name); in dt_io_thread_apply_cfg()
392 dtio->tls_server_name = strdup( in dt_io_thread_apply_cfg()
393 cfg->dnstap_tls_server_name); in dt_io_thread_apply_cfg()
394 if(!dtio->tls_server_name) { in dt_io_thread_apply_cfg()
398 if(!check_auth_name_for_ssl(dtio->tls_server_name)) in dt_io_thread_apply_cfg()
401 if(cfg->dnstap_tls_client_key_file && in dt_io_thread_apply_cfg()
402 cfg->dnstap_tls_client_key_file[0]) { in dt_io_thread_apply_cfg()
403 dtio->use_client_certs = 1; in dt_io_thread_apply_cfg()
404 free(dtio->client_key_file); in dt_io_thread_apply_cfg()
405 dtio->client_key_file = strdup( in dt_io_thread_apply_cfg()
406 cfg->dnstap_tls_client_key_file); in dt_io_thread_apply_cfg()
407 if(!dtio->client_key_file) { in dt_io_thread_apply_cfg()
411 if(!cfg->dnstap_tls_client_cert_file || in dt_io_thread_apply_cfg()
412 cfg->dnstap_tls_client_cert_file[0]==0) { in dt_io_thread_apply_cfg()
415 "dnstap-tls-client-key-file, but " in dt_io_thread_apply_cfg()
416 "no dnstap-tls-client-cert-file " in dt_io_thread_apply_cfg()
420 free(dtio->client_cert_file); in dt_io_thread_apply_cfg()
421 dtio->client_cert_file = strdup( in dt_io_thread_apply_cfg()
422 cfg->dnstap_tls_client_cert_file); in dt_io_thread_apply_cfg()
423 if(!dtio->client_cert_file) { in dt_io_thread_apply_cfg()
428 dtio->use_client_certs = 0; in dt_io_thread_apply_cfg()
429 dtio->client_key_file = NULL; in dt_io_thread_apply_cfg()
430 dtio->client_cert_file = NULL; in dt_io_thread_apply_cfg()
433 if(cfg->dnstap_tls_cert_bundle) { in dt_io_thread_apply_cfg()
434 dtio->ssl_ctx = connect_sslctx_create( in dt_io_thread_apply_cfg()
435 dtio->client_key_file, in dt_io_thread_apply_cfg()
436 dtio->client_cert_file, in dt_io_thread_apply_cfg()
437 cfg->dnstap_tls_cert_bundle, 0); in dt_io_thread_apply_cfg()
439 dtio->ssl_ctx = connect_sslctx_create( in dt_io_thread_apply_cfg()
440 dtio->client_key_file, in dt_io_thread_apply_cfg()
441 dtio->client_cert_file, in dt_io_thread_apply_cfg()
442 cfg->tls_cert_bundle, cfg->tls_win_cert); in dt_io_thread_apply_cfg()
444 if(!dtio->ssl_ctx) { in dt_io_thread_apply_cfg()
448 dtio->tls_use_sni = cfg->tls_use_sni; in dt_io_thread_apply_cfg()
459 lock_basic_lock(&mq->lock); in dt_io_thread_register_queue()
460 mq->dtio = dtio; in dt_io_thread_register_queue()
461 lock_basic_unlock(&mq->lock); in dt_io_thread_register_queue()
462 item->queue = mq; in dt_io_thread_register_queue()
463 item->next = dtio->io_list; in dt_io_thread_register_queue()
464 dtio->io_list = item; in dt_io_thread_register_queue()
465 dtio->io_list_iter = NULL; in dt_io_thread_register_queue()
474 item = dtio->io_list; in dt_io_thread_unregister_queue()
476 if(item->queue == mq) { in dt_io_thread_unregister_queue()
478 if(prev) prev->next = item->next; in dt_io_thread_unregister_queue()
479 else dtio->io_list = item->next; in dt_io_thread_unregister_queue()
481 lock_basic_lock(&item->queue->lock); in dt_io_thread_unregister_queue()
482 item->queue->dtio = NULL; in dt_io_thread_unregister_queue()
483 lock_basic_unlock(&item->queue->lock); in dt_io_thread_unregister_queue()
485 dtio->io_list_iter = NULL; in dt_io_thread_unregister_queue()
489 item = item->next; in dt_io_thread_unregister_queue()
498 lock_basic_lock(&mq->lock); in dt_msg_queue_pop()
499 if(mq->first) { in dt_msg_queue_pop()
500 struct dt_msg_entry* entry = mq->first; in dt_msg_queue_pop()
501 mq->first = entry->next; in dt_msg_queue_pop()
502 if(!entry->next) mq->last = NULL; in dt_msg_queue_pop()
503 mq->cursize -= entry->len; in dt_msg_queue_pop()
504 mq->msgcount --; in dt_msg_queue_pop()
505 lock_basic_unlock(&mq->lock); in dt_msg_queue_pop()
507 *buf = entry->buf; in dt_msg_queue_pop()
508 *len = entry->len; in dt_msg_queue_pop()
512 lock_basic_unlock(&mq->lock); in dt_msg_queue_pop()
523 dtio->cur_msg = buf; in dtio_find_in_queue()
524 dtio->cur_msg_len = len; in dtio_find_in_queue()
525 dtio->cur_msg_done = 0; in dtio_find_in_queue()
526 dtio->cur_msg_len_done = 0; in dtio_find_in_queue()
537 spot = dtio->io_list_iter; in dtio_find_msg()
541 dtio->io_list_iter = spot->next; in dtio_find_msg()
542 else if(dtio->io_list) in dtio_find_msg()
543 dtio->io_list_iter = dtio->io_list->next; in dtio_find_msg()
545 /* scan from spot to end-of-io_list */ in dtio_find_msg()
548 if(dtio_find_in_queue(dtio, item->queue)) in dtio_find_msg()
550 item = item->next; in dtio_find_msg()
552 /* scan starting at the start-of-list (to wrap around the end) */ in dtio_find_msg()
553 item = dtio->io_list; in dtio_find_msg()
555 if(dtio_find_in_queue(dtio, item->queue)) in dtio_find_msg()
557 item = item->next; in dtio_find_msg()
567 dtio->reconnect_is_added = 0; in dtio_reconnect_timeout_cb()
571 if(dtio->event) { in dtio_reconnect_timeout_cb()
586 if(dtio->want_to_exit) return; in dtio_reconnect_enable()
587 if(dtio->reconnect_is_added) in dtio_reconnect_enable()
591 msec = dtio->reconnect_timeout; in dtio_reconnect_enable()
593 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN; in dtio_reconnect_enable()
595 dtio->reconnect_timeout = msec*2; in dtio_reconnect_enable()
596 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX) in dtio_reconnect_enable()
597 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX; in dtio_reconnect_enable()
606 if(ub_timer_add(dtio->reconnect_timer, dtio->event_base, in dtio_reconnect_enable()
611 dtio->reconnect_is_added = 1; in dtio_reconnect_enable()
617 if(!dtio->reconnect_is_added) in dtio_reconnect_del()
619 ub_timer_del(dtio->reconnect_timer); in dtio_reconnect_del()
620 dtio->reconnect_is_added = 0; in dtio_reconnect_del()
627 dtio->reconnect_timeout = 0; in dtio_reconnect_clear()
635 dtio->reconnect_timeout = msec; in dtio_reconnect_slow()
642 free(dtio->cur_msg); in dtio_cur_msg_free()
643 dtio->cur_msg = NULL; in dtio_cur_msg_free()
644 dtio->cur_msg_len = 0; in dtio_cur_msg_free()
645 dtio->cur_msg_done = 0; in dtio_cur_msg_free()
646 dtio->cur_msg_len_done = 0; in dtio_cur_msg_free()
652 if(rb->buf) { in dtio_read_frame_free()
653 free(rb->buf); in dtio_read_frame_free()
654 rb->buf = NULL; in dtio_read_frame_free()
656 rb->buf_count = 0; in dtio_read_frame_free()
657 rb->buf_cap = 0; in dtio_read_frame_free()
658 rb->frame_len = 0; in dtio_read_frame_free()
659 rb->frame_len_done = 0; in dtio_read_frame_free()
660 rb->control_frame = 0; in dtio_read_frame_free()
666 if(!dtio->event_added) in dtio_del_output_event()
668 ub_event_del(dtio->event); in dtio_del_output_event()
669 dtio->event_added = 0; in dtio_del_output_event()
670 dtio->event_added_is_write = 0; in dtio_del_output_event()
673 /** close dtio socket and set it to -1 */
676 sock_close(dtio->fd); in dtio_close_fd()
677 dtio->fd = -1; in dtio_close_fd()
683 if(!dtio->event) in dtio_close_output()
685 ub_event_free(dtio->event); in dtio_close_output()
686 dtio->event = NULL; in dtio_close_output()
687 if(dtio->ssl) { in dtio_close_output()
689 SSL_shutdown(dtio->ssl); in dtio_close_output()
690 SSL_free(dtio->ssl); in dtio_close_output()
691 dtio->ssl = NULL; in dtio_close_output()
699 if(dtio->cur_msg) { in dtio_close_output()
703 dtio->ready_frame_sent = 0; in dtio_close_output()
704 dtio->accept_frame_received = 0; in dtio_close_output()
705 dtio_read_frame_free(&dtio->read_frame); in dtio_close_output()
711 * returns 1 if it is okay. -1 on error (close it), 0 to try later */
716 if(!dtio->check_nb_connect) in dtio_check_nb_connect()
718 if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error, in dtio_check_nb_connect()
735 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? in dtio_check_nb_connect()
736 dtio->stop_flush_event:dtio->event), UB_EV_WRITE); in dtio_check_nb_connect()
741 char* to = dtio->socket_path; in dtio_check_nb_connect()
742 if(!to) to = dtio->ip_str; in dtio_check_nb_connect()
746 return -1; /* error, close it */ in dtio_check_nb_connect()
749 if(dtio->ip_str) in dtio_check_nb_connect()
751 dtio->ip_str); in dtio_check_nb_connect()
752 else if(dtio->socket_path) in dtio_check_nb_connect()
754 dtio->socket_path); in dtio_check_nb_connect()
756 dtio->check_nb_connect = 0; in dtio_check_nb_connect()
763 * try again later, or -1 if the channel is to be closed. */
769 r = SSL_write(dtio->ssl, buf, len); in dtio_write_ssl()
771 int want = SSL_get_error(dtio->ssl, r); in dtio_write_ssl()
773 /* closed */ in dtio_write_ssl()
774 return -1; in dtio_write_ssl()
785 return -1; /* silence 'broken pipe' */ in dtio_write_ssl()
789 return -1; /* silence reset by peer */ in dtio_write_ssl()
795 return -1; in dtio_write_ssl()
798 return -1; in dtio_write_ssl()
806 * try again later, or -1 if the channel is to be closed. */
811 if(dtio->fd == -1) in dtio_write_buf()
812 return -1; in dtio_write_buf()
814 if(dtio->ssl) in dtio_write_buf()
817 ret = send(dtio->fd, (void*)buf, len, 0); in dtio_write_buf()
818 if(ret == -1) { in dtio_write_buf()
826 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? in dtio_write_buf()
827 dtio->stop_flush_event:dtio->event), in dtio_write_buf()
833 return -1; in dtio_write_buf()
843 uint32_t sendlen = htonl(dtio->cur_msg_len); in dtio_write_with_writev()
846 iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done; in dtio_write_with_writev()
847 iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done; in dtio_write_with_writev()
848 iov[1].iov_base = dtio->cur_msg; in dtio_write_with_writev()
849 iov[1].iov_len = dtio->cur_msg_len; in dtio_write_with_writev()
851 r = writev(dtio->fd, iov, 2); in dtio_write_with_writev()
852 if(r == -1) { in dtio_write_with_writev()
860 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? in dtio_write_with_writev()
861 dtio->stop_flush_event:dtio->event), in dtio_write_with_writev()
873 dtio->cur_msg_len_done += r; in dtio_write_with_writev()
874 if(dtio->cur_msg_len_done < 4) in dtio_write_with_writev()
876 if(dtio->cur_msg_len_done > 4) { in dtio_write_with_writev()
877 dtio->cur_msg_done = dtio->cur_msg_len_done-4; in dtio_write_with_writev()
878 dtio->cur_msg_len_done = 4; in dtio_write_with_writev()
880 if(dtio->cur_msg_done < dtio->cur_msg_len) in dtio_write_with_writev()
892 if(dtio->cur_msg_len_done >= 4) in dtio_write_more_of_len()
895 if(!dtio->ssl) { in dtio_write_more_of_len()
900 sendlen = htonl(dtio->cur_msg_len); in dtio_write_more_of_len()
902 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done, in dtio_write_more_of_len()
903 sizeof(sendlen)-dtio->cur_msg_len_done); in dtio_write_more_of_len()
904 if(r == -1) { in dtio_write_more_of_len()
913 dtio->cur_msg_len_done += r; in dtio_write_more_of_len()
914 if(dtio->cur_msg_len_done < 4) in dtio_write_more_of_len()
924 if(dtio->cur_msg_done >= dtio->cur_msg_len) in dtio_write_more_of_data()
927 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done, in dtio_write_more_of_data()
928 dtio->cur_msg_len - dtio->cur_msg_done); in dtio_write_more_of_data()
929 if(r == -1) { in dtio_write_more_of_data()
938 dtio->cur_msg_done += r; in dtio_write_more_of_data()
939 if(dtio->cur_msg_done < dtio->cur_msg_len) in dtio_write_more_of_data()
948 if(dtio->cur_msg_len_done < 4) { in dtio_write_more()
952 if(dtio->cur_msg_done < dtio->cur_msg_len) { in dtio_write_more()
959 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
960 * -1: continue, >0: number of bytes read into buffer */
963 r = recv(dtio->fd, (void*)buf, len, MSG_DONTWAIT); in receive_bytes()
964 if(r == -1) { in receive_bytes()
965 char* to = dtio->socket_path; in receive_bytes()
966 if(!to) to = dtio->ip_str; in receive_bytes()
970 return -1; /* try later */ in receive_bytes()
973 return -1; /* try later */ in receive_bytes()
976 (dtio->stop_flush_event? in receive_bytes()
977 dtio->stop_flush_event:dtio->event), in receive_bytes()
979 return -1; /* try later */ in receive_bytes()
982 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && in receive_bytes()
985 log_err("dnstap io: output closed, recv %s: %s", to, in receive_bytes()
991 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && in receive_bytes()
994 verbose(VERB_DETAIL, "dnstap io: output closed by the other side"); in receive_bytes()
1003 /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed,
1004 * -1: continue, >0: number of bytes read into buffer */
1009 r = SSL_read(dtio->ssl, buf, len); in ssl_read_bytes()
1011 int want = SSL_get_error(dtio->ssl, r); in ssl_read_bytes()
1013 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && in ssl_read_bytes()
1016 verbose(VERB_DETAIL, "dnstap io: output closed by the " in ssl_read_bytes()
1021 return -1; in ssl_read_bytes()
1024 return -1; in ssl_read_bytes()
1027 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && in ssl_read_bytes()
1034 verbose(VERB_DETAIL, "dnstap io: output closed by the " in ssl_read_bytes()
1039 verbose(VERB_DETAIL, "dnstap io: output closed by the " in ssl_read_bytes()
1047 /** check if the output fd has been closed,
1048 * it returns false if the stream is closed. */
1054 * read call can return that the stream has been closed by the in dtio_check_close()
1057 int r = -1; in dtio_check_close()
1060 if(dtio->fd == -1) return 0; in dtio_check_close()
1065 if(r == -1) in dtio_check_close()
1068 /* the other end has been closed */ in dtio_check_close()
1075 /** Read accept frame. Returns -1: continue reading, 0: closed,
1081 while(dtio->read_frame.frame_len_done < 4) { in dtio_read_accept_frame()
1083 if(dtio->ssl) { in dtio_read_accept_frame()
1085 (uint8_t*)&dtio->read_frame.frame_len+ in dtio_read_accept_frame()
1086 dtio->read_frame.frame_len_done, in dtio_read_accept_frame()
1087 4-dtio->read_frame.frame_len_done); in dtio_read_accept_frame()
1091 (uint8_t*)&dtio->read_frame.frame_len+ in dtio_read_accept_frame()
1092 dtio->read_frame.frame_len_done, in dtio_read_accept_frame()
1093 4-dtio->read_frame.frame_len_done); in dtio_read_accept_frame()
1097 if(r == -1) in dtio_read_accept_frame()
1098 return -1; /* continue reading */ in dtio_read_accept_frame()
1100 /* connection closed */ in dtio_read_accept_frame()
1103 dtio->read_frame.frame_len_done += r; in dtio_read_accept_frame()
1104 if(dtio->read_frame.frame_len_done < 4) in dtio_read_accept_frame()
1105 return -1; /* continue reading */ in dtio_read_accept_frame()
1107 if(dtio->read_frame.frame_len == 0) { in dtio_read_accept_frame()
1108 dtio->read_frame.frame_len_done = 0; in dtio_read_accept_frame()
1109 dtio->read_frame.control_frame = 1; in dtio_read_accept_frame()
1112 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len); in dtio_read_accept_frame()
1113 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) { in dtio_read_accept_frame()
1119 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len); in dtio_read_accept_frame()
1120 dtio->read_frame.buf_cap = dtio->read_frame.frame_len; in dtio_read_accept_frame()
1121 if(!dtio->read_frame.buf) { in dtio_read_accept_frame()
1127 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) { in dtio_read_accept_frame()
1129 if(dtio->ssl) { in dtio_read_accept_frame()
1130 r = ssl_read_bytes(dtio, dtio->read_frame.buf+ in dtio_read_accept_frame()
1131 dtio->read_frame.buf_count, in dtio_read_accept_frame()
1132 dtio->read_frame.buf_cap- in dtio_read_accept_frame()
1133 dtio->read_frame.buf_count); in dtio_read_accept_frame()
1136 r = receive_bytes(dtio, dtio->read_frame.buf+ in dtio_read_accept_frame()
1137 dtio->read_frame.buf_count, in dtio_read_accept_frame()
1138 dtio->read_frame.buf_cap- in dtio_read_accept_frame()
1139 dtio->read_frame.buf_count); in dtio_read_accept_frame()
1143 if(r == -1) in dtio_read_accept_frame()
1144 return -1; /* continue reading */ in dtio_read_accept_frame()
1146 /* connection closed */ in dtio_read_accept_frame()
1149 dtio->read_frame.buf_count += r; in dtio_read_accept_frame()
1150 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) in dtio_read_accept_frame()
1151 return -1; /* continue reading */ in dtio_read_accept_frame()
1156 if(dtio->read_frame.frame_len < 4) { in dtio_read_accept_frame()
1160 if(sldns_read_uint32(dtio->read_frame.buf) != in dtio_read_accept_frame()
1164 dtio->ready_frame_sent = 0; in dtio_read_accept_frame()
1165 dtio->accept_frame_received = 0; in dtio_read_accept_frame()
1166 dtio_read_frame_free(&dtio->read_frame); in dtio_read_accept_frame()
1167 return -1; in dtio_read_accept_frame()
1174 while(read_frame_done+8 < dtio->read_frame.frame_len) { in dtio_read_accept_frame()
1175 uint32_t type = sldns_read_uint32(dtio->read_frame.buf + in dtio_read_accept_frame()
1177 uint32_t len = sldns_read_uint32(dtio->read_frame.buf + in dtio_read_accept_frame()
1182 dtio->read_frame.frame_len && in dtio_read_accept_frame()
1183 memcmp(dtio->read_frame.buf + read_frame_done + in dtio_read_accept_frame()
1190 dtio->accept_frame_received = 1; in dtio_read_accept_frame()
1217 if(!dtio->event) in dtio_add_output_event_read()
1219 if(dtio->event_added && !dtio->event_added_is_write) in dtio_add_output_event_read()
1221 /* we have to (re-)register the event */ in dtio_add_output_event_read()
1222 if(dtio->event_added) in dtio_add_output_event_read()
1223 ub_event_del(dtio->event); in dtio_add_output_event_read()
1224 ub_event_del_bits(dtio->event, UB_EV_WRITE); in dtio_add_output_event_read()
1225 if(ub_event_add(dtio->event, NULL) != 0) { in dtio_add_output_event_read()
1227 dtio->event_added = 0; in dtio_add_output_event_read()
1228 dtio->event_added_is_write = 0; in dtio_add_output_event_read()
1233 dtio->event_added = 1; in dtio_add_output_event_read()
1234 dtio->event_added_is_write = 0; in dtio_add_output_event_read()
1241 if(!dtio->event) in dtio_add_output_event_write()
1243 if(dtio->event_added && dtio->event_added_is_write) in dtio_add_output_event_write()
1245 /* we have to (re-)register the event */ in dtio_add_output_event_write()
1246 if(dtio->event_added) in dtio_add_output_event_write()
1247 ub_event_del(dtio->event); in dtio_add_output_event_write()
1248 ub_event_add_bits(dtio->event, UB_EV_WRITE); in dtio_add_output_event_write()
1249 if(ub_event_add(dtio->event, NULL) != 0) { in dtio_add_output_event_write()
1251 dtio->event_added = 0; in dtio_add_output_event_write()
1252 dtio->event_added_is_write = 0; in dtio_add_output_event_write()
1257 dtio->event_added = 1; in dtio_add_output_event_write()
1258 dtio->event_added_is_write = 1; in dtio_add_output_event_write()
1272 lock_basic_lock(&dtio->wakeup_timer_lock); in dtio_sleep()
1273 dtio->wakeup_timer_enabled = 0; in dtio_sleep()
1274 lock_basic_unlock(&dtio->wakeup_timer_lock); in dtio_sleep()
1281 dtio->ssl_brief_read = 1; in dtio_enable_brief_read()
1282 if(dtio->stop_flush_event) { in dtio_enable_brief_read()
1283 ub_event_del(dtio->stop_flush_event); in dtio_enable_brief_read()
1284 ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE); in dtio_enable_brief_read()
1285 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { in dtio_enable_brief_read()
1299 dtio->ssl_brief_read = 0; in dtio_disable_brief_read()
1300 if(dtio->stop_flush_event) { in dtio_disable_brief_read()
1301 ub_event_del(dtio->stop_flush_event); in dtio_disable_brief_read()
1302 ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE); in dtio_disable_brief_read()
1303 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { in dtio_disable_brief_read()
1317 dtio->ssl_brief_write = 1; in dtio_enable_brief_write()
1326 dtio->ssl_brief_write = 0; in dtio_disable_brief_write()
1332 /** check peer verification after ssl handshake connection, false if closed*/
1335 if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) { in dtio_ssl_check_peer()
1337 if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) { in dtio_ssl_check_peer()
1339 X509* x = SSL_get1_peer_certificate(dtio->ssl); in dtio_ssl_check_peer()
1341 X509* x = SSL_get_peer_certificate(dtio->ssl); in dtio_ssl_check_peer()
1346 dtio->ip_str); in dtio_ssl_check_peer()
1352 if(SSL_get0_peername(dtio->ssl)) { in dtio_ssl_check_peer()
1355 dtio->ip_str, in dtio_ssl_check_peer()
1356 SSL_get0_peername(dtio->ssl)); in dtio_ssl_check_peer()
1361 dtio->ip_str); in dtio_ssl_check_peer()
1368 X509* x = SSL_get1_peer_certificate(dtio->ssl); in dtio_ssl_check_peer()
1370 X509* x = SSL_get_peer_certificate(dtio->ssl); in dtio_ssl_check_peer()
1379 dtio->ip_str); in dtio_ssl_check_peer()
1386 dtio->ip_str); in dtio_ssl_check_peer()
1398 if(dtio->ssl_brief_read) { in dtio_ssl_handshake()
1406 if(dtio->ssl_handshake_done) in dtio_ssl_handshake()
1410 r = SSL_do_handshake(dtio->ssl); in dtio_ssl_handshake()
1412 int want = SSL_get_error(dtio->ssl, r); in dtio_ssl_handshake()
1424 /* closed */ in dtio_ssl_handshake()
1431 /* SYSCALL and errno==0 means closed uncleanly */ in dtio_ssl_handshake()
1446 /* closed */ in dtio_ssl_handshake()
1458 "from %s", dtio->ip_str); in dtio_ssl_handshake()
1460 /* closed */ in dtio_ssl_handshake()
1470 dtio->ssl_handshake_done = 1; in dtio_ssl_handshake()
1473 /* closed */ in dtio_ssl_handshake()
1490 if(dtio->check_nb_connect) { in dtio_output_cb()
1492 if(connect_err == -1) { in dtio_output_cb()
1505 if(dtio->ssl && in dtio_output_cb()
1506 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { in dtio_output_cb()
1512 if((bits&UB_EV_READ) || dtio->ssl_brief_write) { in dtio_output_cb()
1514 if(dtio->ssl_brief_write) in dtio_output_cb()
1517 if(dtio->ready_frame_sent && !dtio->accept_frame_received) { in dtio_output_cb()
1524 /* loop to process a number of messages. This improves throughput, in dtio_output_cb()
1525 * because selecting on write-event if not needed for busy messages in dtio_output_cb()
1528 * because the event loop needs to stay responsive for other events. in dtio_output_cb()
1530 * full, it returns out of the loop. */ in dtio_output_cb()
1533 if(!dtio->cur_msg) { in dtio_output_cb()
1553 if(!dtio->cur_msg) in dtio_output_cb()
1559 if(dtio->cur_msg_done < dtio->cur_msg_len) { in dtio_output_cb()
1570 if(dtio->is_bidirectional && !dtio->ready_frame_sent) { in dtio_output_cb()
1571 dtio->ready_frame_sent = 1; in dtio_output_cb()
1584 if(dtio->want_to_exit) in dtio_cmd_cb()
1587 if(r == -1) { in dtio_cmd_cb()
1600 verbose(VERB_ALGO, "dnstap io: cmd channel closed"); in dtio_cmd_cb()
1606 if(dtio->is_bidirectional && !dtio->accept_frame_received) { in dtio_cmd_cb()
1619 dtio->want_to_exit = 1; in dtio_cmd_cb()
1620 if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base) in dtio_cmd_cb()
1632 dtio->event_base = ub_default_event_base(0, secs, now); in dtio_setup_base()
1633 if(!dtio->event_base) { in dtio_setup_base()
1643 fd_set_nonblock(dtio->commandpipe[0]); in dtio_setup_cmd()
1644 cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0], in dtio_setup_cmd()
1649 dtio->command_event = cmdev; in dtio_setup_cmd()
1659 dtio->reconnect_timer = ub_event_new(dtio->event_base, -1, in dtio_setup_reconnect()
1661 if(!dtio->reconnect_timer) { in dtio_setup_reconnect()
1672 /** did we already want to exit this stop-flush event base */
1689 if(info->want_to_exit_flush) in dtio_stop_flush_exit()
1691 info->want_to_exit_flush = 1; in dtio_stop_flush_exit()
1692 if(ub_event_base_loopexit(info->base) != 0) { in dtio_stop_flush_exit()
1701 struct dt_io_thread* dtio = info->dtio; in dtio_control_stop_send()
1703 if(info->stop_frame_done >= info->stop_frame_len) in dtio_control_stop_send()
1705 r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) + in dtio_control_stop_send()
1706 info->stop_frame_done, info->stop_frame_len - in dtio_control_stop_send()
1707 info->stop_frame_done); in dtio_control_stop_send()
1708 if(r == -1) { in dtio_control_stop_send()
1709 verbose(VERB_ALGO, "dnstap io: stop flush: output closed"); in dtio_control_stop_send()
1717 info->stop_frame_done += r; in dtio_control_stop_send()
1718 if(info->stop_frame_done < info->stop_frame_len) in dtio_control_stop_send()
1727 if(info->want_to_exit_flush) in dtio_stop_timer_cb()
1730 info->timer_done = 1; in dtio_stop_timer_cb()
1737 struct dt_io_thread* dtio = info->dtio; in dtio_stop_ev_cb()
1738 if(info->want_to_exit_flush) in dtio_stop_ev_cb()
1740 if(dtio->check_nb_connect) { in dtio_stop_ev_cb()
1744 if(connect_err == -1) { in dtio_stop_ev_cb()
1757 if(dtio->ssl && in dtio_stop_ev_cb()
1758 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { in dtio_stop_ev_cb()
1766 if(dtio->fd == -1) { in dtio_stop_ev_cb()
1768 "stop flush: output closed"); in dtio_stop_ev_cb()
1775 if(dtio->cur_msg) { in dtio_stop_ev_cb()
1776 if(dtio->cur_msg_done < dtio->cur_msg_len) { in dtio_stop_ev_cb()
1778 if(dtio->fd == -1) { in dtio_stop_ev_cb()
1780 "stop flush: output closed"); in dtio_stop_ev_cb()
1791 if(info->stop_frame_done < info->stop_frame_len) { in dtio_stop_ev_cb()
1812 if(dtio->fd == -1 || dtio->check_nb_connect) { in dtio_control_stop_flush()
1817 if(dtio->ssl && !dtio->ssl_handshake_done) { in dtio_control_stop_flush()
1830 timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT, in dtio_control_stop_flush()
1846 stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ | in dtio_control_stop_flush()
1874 dtio->stop_flush_event = stopev; in dtio_control_stop_flush()
1884 dtio->stop_flush_event = NULL; in dtio_control_stop_flush()
1898 ub_event_del(dtio->command_event); in dtio_desetup()
1899 ub_event_free(dtio->command_event); in dtio_desetup()
1901 close(dtio->commandpipe[0]); in dtio_desetup()
1903 _close(dtio->commandpipe[0]); in dtio_desetup()
1905 dtio->commandpipe[0] = -1; in dtio_desetup()
1907 ub_event_free(dtio->reconnect_timer); in dtio_desetup()
1910 ub_event_base_free(dtio->event_base); in dtio_desetup()
1917 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); in dtio_control_start_send()
1918 dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE, in dtio_control_start_send()
1919 &dtio->cur_msg_len); in dtio_control_start_send()
1920 if(!dtio->cur_msg) { in dtio_control_start_send()
1928 dtio->cur_msg_done = 0; in dtio_control_start_send()
1929 dtio->cur_msg_len_done = 4; in dtio_control_start_send()
1936 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); in dtio_control_ready_send()
1937 dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE, in dtio_control_ready_send()
1938 &dtio->cur_msg_len); in dtio_control_ready_send()
1939 if(!dtio->cur_msg) { in dtio_control_ready_send()
1947 dtio->cur_msg_done = 0; in dtio_control_ready_send()
1948 dtio->cur_msg_len_done = 4; in dtio_control_ready_send()
1957 dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0); in dtio_open_output_local()
1958 if(dtio->fd == -1) { in dtio_open_output_local()
1969 /* length is 92-108, 104 on FreeBSD */ in dtio_open_output_local()
1970 (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); in dtio_open_output_local()
1971 fd_set_nonblock(dtio->fd); in dtio_open_output_local()
1972 if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) in dtio_open_output_local()
1973 == -1) { in dtio_open_output_local()
1974 char* to = dtio->socket_path; in dtio_open_output_local()
1975 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && in dtio_open_output_local()
2000 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen, UNBOUND_DNS_PORT)) { in dtio_open_output_tcp()
2001 log_err("could not parse IP '%s'", dtio->ip_str); in dtio_open_output_tcp()
2004 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0); in dtio_open_output_tcp()
2005 if(dtio->fd == -1) { in dtio_open_output_tcp()
2009 fd_set_nonblock(dtio->fd); in dtio_open_output_tcp()
2010 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) { in dtio_open_output_tcp()
2013 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && in dtio_open_output_tcp()
2022 dtio->ip_str, strerror(errno)); in dtio_open_output_tcp()
2031 dtio->ip_str, wsa_strerror(WSAGetLastError())); in dtio_open_output_tcp()
2043 dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd); in dtio_setup_ssl()
2044 if(!dtio->ssl) return 0; in dtio_setup_ssl()
2045 dtio->ssl_handshake_done = 0; in dtio_setup_ssl()
2046 dtio->ssl_brief_read = 0; in dtio_setup_ssl()
2048 if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name, in dtio_setup_ssl()
2049 dtio->tls_use_sni)) { in dtio_setup_ssl()
2059 if(dtio->upstream_is_unix) { in dtio_open_output()
2064 } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { in dtio_open_output()
2069 if(dtio->upstream_is_tls) { in dtio_open_output()
2077 dtio->check_nb_connect = 1; in dtio_open_output()
2081 ev = ub_event_new(dtio->event_base, dtio->fd, in dtio_open_output()
2086 if(dtio->ssl) { in dtio_open_output()
2088 SSL_free(dtio->ssl); in dtio_open_output()
2089 dtio->ssl = NULL; in dtio_open_output()
2096 dtio->event = ev; in dtio_open_output()
2099 if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) || in dtio_open_output()
2100 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) { in dtio_open_output()
2102 ub_event_free(dtio->event); in dtio_open_output()
2103 dtio->event = NULL; in dtio_open_output()
2104 if(dtio->ssl) { in dtio_open_output()
2106 SSL_free(dtio->ssl); in dtio_open_output()
2107 dtio->ssl = NULL; in dtio_open_output()
2133 log_thread_set(&dtio->threadnum); in dnstap_io()
2141 if(ub_event_base_dispatch(dtio->event_base) < 0) { in dnstap_io()
2158 if(pipe(dtio->commandpipe) == -1) { in dt_io_thread_start()
2163 if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) { in dt_io_thread_start()
2171 dtio->threadnum = numworkers+1; in dt_io_thread_start()
2172 dtio->started = 1; in dt_io_thread_start()
2174 ub_thread_create(&dtio->tid, dnstap_io, dtio); in dt_io_thread_start()
2177 dtio->event_base = event_base_nothr; in dt_io_thread_start()
2189 if(!dtio->started) return; in dt_io_thread_stop()
2194 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); in dt_io_thread_stop()
2195 if(r == -1) { in dt_io_thread_stop()
2211 dtio->started = 0; in dt_io_thread_stop()
2215 close(dtio->commandpipe[1]); in dt_io_thread_stop()
2217 _close(dtio->commandpipe[1]); in dt_io_thread_stop()
2219 dtio->commandpipe[1] = -1; in dt_io_thread_stop()
2221 ub_thread_join(dtio->tid); in dt_io_thread_stop()
2223 dtio->want_to_exit = 1; in dt_io_thread_stop()