125039b37SCy Schubert /* 225039b37SCy Schubert * dnstap/dtstream.h - Frame Streams thread for unbound DNSTAP 325039b37SCy Schubert * 425039b37SCy Schubert * Copyright (c) 2020, NLnet Labs. All rights reserved. 525039b37SCy Schubert * 625039b37SCy Schubert * This software is open source. 725039b37SCy Schubert * 825039b37SCy Schubert * Redistribution and use in source and binary forms, with or without 925039b37SCy Schubert * modification, are permitted provided that the following conditions 1025039b37SCy Schubert * are met: 1125039b37SCy Schubert * 1225039b37SCy Schubert * Redistributions of source code must retain the above copyright notice, 1325039b37SCy Schubert * this list of conditions and the following disclaimer. 1425039b37SCy Schubert * 1525039b37SCy Schubert * Redistributions in binary form must reproduce the above copyright notice, 1625039b37SCy Schubert * this list of conditions and the following disclaimer in the documentation 1725039b37SCy Schubert * and/or other materials provided with the distribution. 1825039b37SCy Schubert * 1925039b37SCy Schubert * Neither the name of the NLNET LABS nor the names of its contributors may 2025039b37SCy Schubert * be used to endorse or promote products derived from this software without 2125039b37SCy Schubert * specific prior written permission. 2225039b37SCy Schubert * 2325039b37SCy Schubert * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 2425039b37SCy Schubert * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 2525039b37SCy Schubert * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 2625039b37SCy Schubert * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 2725039b37SCy Schubert * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 2825039b37SCy Schubert * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 2925039b37SCy Schubert * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 3025039b37SCy Schubert * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 3125039b37SCy Schubert * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 3225039b37SCy Schubert * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 3325039b37SCy Schubert * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 3425039b37SCy Schubert * 3525039b37SCy Schubert */ 3625039b37SCy Schubert 3725039b37SCy Schubert /** 3825039b37SCy Schubert * \file 3925039b37SCy Schubert * 4025039b37SCy Schubert * An implementation of the Frame Streams data transport protocol for 4125039b37SCy Schubert * the Unbound DNSTAP message logging facility. 4225039b37SCy Schubert */ 4325039b37SCy Schubert 4425039b37SCy Schubert #ifndef DTSTREAM_H 4525039b37SCy Schubert #define DTSTREAM_H 4625039b37SCy Schubert 4725039b37SCy Schubert #include "util/locks.h" 4825039b37SCy Schubert struct dt_msg_entry; 4925039b37SCy Schubert struct dt_io_list_item; 5025039b37SCy Schubert struct dt_io_thread; 5125039b37SCy Schubert struct config_file; 52*c0caa2e2SCy Schubert struct comm_base; 5325039b37SCy Schubert 5425039b37SCy Schubert /** 5525039b37SCy Schubert * A message buffer with dnstap messages queued up. It is per-worker. 5625039b37SCy Schubert * It has locks to synchronize. If the buffer is full, a new message 5725039b37SCy Schubert * cannot be added and is discarded. A thread reads the messages and sends 5825039b37SCy Schubert * them. 5925039b37SCy Schubert */ 6025039b37SCy Schubert struct dt_msg_queue { 6125039b37SCy Schubert /** lock of the buffer structure. Hold this lock to add or remove 6225039b37SCy Schubert * entries to the buffer. Release it so that other threads can also 6325039b37SCy Schubert * put messages to log, or a message can be taken out to send away 6425039b37SCy Schubert * by the writer thread. 6525039b37SCy Schubert */ 6625039b37SCy Schubert lock_basic_type lock; 6725039b37SCy Schubert /** the maximum size of the buffer, in bytes */ 6825039b37SCy Schubert size_t maxsize; 6925039b37SCy Schubert /** current size of the buffer, in bytes. data bytes of messages. 7025039b37SCy Schubert * If a new message make it more than maxsize, the buffer is full */ 7125039b37SCy Schubert size_t cursize; 72*c0caa2e2SCy Schubert /** number of messages in the queue */ 73*c0caa2e2SCy Schubert int msgcount; 7425039b37SCy Schubert /** list of messages. The messages are added to the back and taken 7525039b37SCy Schubert * out from the front. */ 7625039b37SCy Schubert struct dt_msg_entry* first, *last; 7725039b37SCy Schubert /** reference to the io thread to wakeup */ 7825039b37SCy Schubert struct dt_io_thread* dtio; 79*c0caa2e2SCy Schubert /** the wakeup timer for dtio, on worker event base */ 80*c0caa2e2SCy Schubert struct comm_timer* wakeup_timer; 8125039b37SCy Schubert }; 8225039b37SCy Schubert 8325039b37SCy Schubert /** 8425039b37SCy Schubert * An entry in the dt_msg_queue. contains one DNSTAP message. 8525039b37SCy Schubert * It is malloced. 8625039b37SCy Schubert */ 8725039b37SCy Schubert struct dt_msg_entry { 8825039b37SCy Schubert /** next in the list. */ 8925039b37SCy Schubert struct dt_msg_entry* next; 9025039b37SCy Schubert /** the buffer with the data to send, an encoded DNSTAP message */ 9125039b37SCy Schubert void* buf; 9225039b37SCy Schubert /** the length to send. */ 9325039b37SCy Schubert size_t len; 9425039b37SCy Schubert }; 9525039b37SCy Schubert 9625039b37SCy Schubert /** 9725039b37SCy Schubert * Containing buffer and counter for reading DNSTAP frames. 9825039b37SCy Schubert */ 9925039b37SCy Schubert struct dt_frame_read_buf { 10025039b37SCy Schubert /** Buffer containing frame, except length counter(s). */ 10125039b37SCy Schubert void* buf; 10225039b37SCy Schubert /** Number of bytes written to buffer. */ 10325039b37SCy Schubert size_t buf_count; 10425039b37SCy Schubert /** Capacity of the buffer. */ 10525039b37SCy Schubert size_t buf_cap; 10625039b37SCy Schubert 10725039b37SCy Schubert /** Frame length field. Will contain the 2nd length field for control 10825039b37SCy Schubert * frames. */ 10925039b37SCy Schubert uint32_t frame_len; 11025039b37SCy Schubert /** Number of bytes that have been written to the frame_length field. */ 11125039b37SCy Schubert size_t frame_len_done; 11225039b37SCy Schubert 11325039b37SCy Schubert /** Set to 1 if this is a control frame, 0 otherwise (ie data frame). */ 11425039b37SCy Schubert int control_frame; 11525039b37SCy Schubert }; 11625039b37SCy Schubert 11725039b37SCy Schubert /** 11825039b37SCy Schubert * IO thread that reads from the queues and writes them. 11925039b37SCy Schubert */ 12025039b37SCy Schubert struct dt_io_thread { 12125039b37SCy Schubert /** the thread number for the dtio thread, 12225039b37SCy Schubert * must be first to cast thread arg to int* in checklock code. */ 12325039b37SCy Schubert int threadnum; 12425039b37SCy Schubert /** event base, for event handling */ 12525039b37SCy Schubert void* event_base; 12625039b37SCy Schubert /** list of queues that is registered to get written */ 12725039b37SCy Schubert struct dt_io_list_item* io_list; 12825039b37SCy Schubert /** iterator point in the io_list, to pick from them in a 12925039b37SCy Schubert * round-robin fashion, instead of only from the first when busy. 13025039b37SCy Schubert * if NULL it means start at the start of the list. */ 13125039b37SCy Schubert struct dt_io_list_item* io_list_iter; 13225039b37SCy Schubert /** thread id, of the io thread */ 13325039b37SCy Schubert ub_thread_type tid; 13425039b37SCy Schubert /** if the io processing has started */ 13525039b37SCy Schubert int started; 13625039b37SCy Schubert /** ssl context for the io thread, for tls connections. type SSL_CTX* */ 13725039b37SCy Schubert void* ssl_ctx; 13825039b37SCy Schubert /** if SNI will be used for TLS connections. */ 13925039b37SCy Schubert int tls_use_sni; 14025039b37SCy Schubert 14125039b37SCy Schubert /** file descriptor that the thread writes to */ 14225039b37SCy Schubert int fd; 14325039b37SCy Schubert /** event structure that the thread uses */ 14425039b37SCy Schubert void* event; 14525039b37SCy Schubert /** the event is added */ 14625039b37SCy Schubert int event_added; 14725039b37SCy Schubert /** event added is a write event */ 14825039b37SCy Schubert int event_added_is_write; 14925039b37SCy Schubert /** check for nonblocking connect errors on fd */ 15025039b37SCy Schubert int check_nb_connect; 15125039b37SCy Schubert /** ssl for current connection, type SSL* */ 15225039b37SCy Schubert void* ssl; 15325039b37SCy Schubert /** true if the handshake for SSL is done, 0 if not */ 15425039b37SCy Schubert int ssl_handshake_done; 15525039b37SCy Schubert /** true if briefly the SSL wants a read event, 0 if not. 15625039b37SCy Schubert * This happens during negotiation, we then do not want to write, 15725039b37SCy Schubert * but wait for a read event. */ 15825039b37SCy Schubert int ssl_brief_read; 15925039b37SCy Schubert /** true if SSL_read is waiting for a write event. Set back to 0 after 16025039b37SCy Schubert * single write event is handled. */ 16125039b37SCy Schubert int ssl_brief_write; 16225039b37SCy Schubert 16325039b37SCy Schubert /** the buffer that currently getting written, or NULL if no 16425039b37SCy Schubert * (partial) message written now */ 16525039b37SCy Schubert void* cur_msg; 16625039b37SCy Schubert /** length of the current message */ 16725039b37SCy Schubert size_t cur_msg_len; 16825039b37SCy Schubert /** number of bytes written for the current message */ 16925039b37SCy Schubert size_t cur_msg_done; 17025039b37SCy Schubert /** number of bytes of the length that have been written, 17125039b37SCy Schubert * for the current message length that precedes the frame */ 17225039b37SCy Schubert size_t cur_msg_len_done; 17325039b37SCy Schubert 174*c0caa2e2SCy Schubert /** lock on wakeup_timer_enabled */ 175*c0caa2e2SCy Schubert lock_basic_type wakeup_timer_lock; 176*c0caa2e2SCy Schubert /** if wakeup timer is enabled in some thread */ 177*c0caa2e2SCy Schubert int wakeup_timer_enabled; 17825039b37SCy Schubert /** command pipe that stops the pipe if closed. Used to quit 17925039b37SCy Schubert * the program. [0] is read, [1] is written to. */ 18025039b37SCy Schubert int commandpipe[2]; 18125039b37SCy Schubert /** the event to listen to the commandpipe */ 18225039b37SCy Schubert void* command_event; 18325039b37SCy Schubert /** the io thread wants to exit */ 18425039b37SCy Schubert int want_to_exit; 18525039b37SCy Schubert 18625039b37SCy Schubert /** in stop flush, this is nonNULL and references the stop_ev */ 18725039b37SCy Schubert void* stop_flush_event; 18825039b37SCy Schubert 18925039b37SCy Schubert /** the timer event for connection retries */ 19025039b37SCy Schubert void* reconnect_timer; 19125039b37SCy Schubert /** if the reconnect timer is added to the event base */ 19225039b37SCy Schubert int reconnect_is_added; 19325039b37SCy Schubert /** the current reconnection timeout, it is increased with 19425039b37SCy Schubert * exponential backoff, in msec */ 19525039b37SCy Schubert int reconnect_timeout; 19625039b37SCy Schubert 19725039b37SCy Schubert /** If the log server is connected to over unix domain sockets, 19825039b37SCy Schubert * eg. a file is named that is created to log onto. */ 19925039b37SCy Schubert int upstream_is_unix; 20025039b37SCy Schubert /** if the log server is connected to over TCP. The ip address and 20125039b37SCy Schubert * port are used */ 20225039b37SCy Schubert int upstream_is_tcp; 20325039b37SCy Schubert /** if the log server is connected to over TLS. ip address, port, 20425039b37SCy Schubert * and client certificates can be used for authentication. */ 20525039b37SCy Schubert int upstream_is_tls; 20625039b37SCy Schubert 20725039b37SCy Schubert /** Perform bidirectional Frame Streams handshake before sending 20825039b37SCy Schubert * messages. */ 20925039b37SCy Schubert int is_bidirectional; 21025039b37SCy Schubert /** Set if the READY control frame has been sent. */ 21125039b37SCy Schubert int ready_frame_sent; 21225039b37SCy Schubert /** Set if valid ACCEPT frame is received. */ 21325039b37SCy Schubert int accept_frame_received; 21425039b37SCy Schubert /** (partially) read frame */ 21525039b37SCy Schubert struct dt_frame_read_buf read_frame; 21625039b37SCy Schubert 21725039b37SCy Schubert /** the file path for unix socket (or NULL) */ 21825039b37SCy Schubert char* socket_path; 21925039b37SCy Schubert /** the ip address and port number (or NULL) */ 22025039b37SCy Schubert char* ip_str; 22125039b37SCy Schubert /** is the TLS upstream authenticated by name, if nonNULL, 22225039b37SCy Schubert * we use the same cert bundle as used by other TLS streams. */ 22325039b37SCy Schubert char* tls_server_name; 22425039b37SCy Schubert /** are client certificates in use */ 22525039b37SCy Schubert int use_client_certs; 22625039b37SCy Schubert /** client cert files: the .key file */ 22725039b37SCy Schubert char* client_key_file; 22825039b37SCy Schubert /** client cert files: the .pem file */ 22925039b37SCy Schubert char* client_cert_file; 23025039b37SCy Schubert }; 23125039b37SCy Schubert 23225039b37SCy Schubert /** 23325039b37SCy Schubert * IO thread list of queues list item 23425039b37SCy Schubert * lists a worker queue that should be looked at and sent to the log server. 23525039b37SCy Schubert */ 23625039b37SCy Schubert struct dt_io_list_item { 23725039b37SCy Schubert /** next in the list of buffers to inspect */ 23825039b37SCy Schubert struct dt_io_list_item* next; 23925039b37SCy Schubert /** buffer of this worker */ 24025039b37SCy Schubert struct dt_msg_queue* queue; 24125039b37SCy Schubert }; 24225039b37SCy Schubert 24325039b37SCy Schubert /** 24425039b37SCy Schubert * Create new (empty) worker message queue. Limit set to default on max. 245*c0caa2e2SCy Schubert * @param base: event base for wakeup timer. 24625039b37SCy Schubert * @return NULL on malloc failure or a new queue (not locked). 24725039b37SCy Schubert */ 248*c0caa2e2SCy Schubert struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base); 24925039b37SCy Schubert 25025039b37SCy Schubert /** 25125039b37SCy Schubert * Delete a worker message queue. It has to be unlinked from access, 25225039b37SCy Schubert * so it can be deleted without lock worries. The queue is emptied (deleted). 25325039b37SCy Schubert * @param mq: message queue. 25425039b37SCy Schubert */ 25525039b37SCy Schubert void dt_msg_queue_delete(struct dt_msg_queue* mq); 25625039b37SCy Schubert 25725039b37SCy Schubert /** 25825039b37SCy Schubert * Submit a message to the queue. The queue is locked by the routine, 25925039b37SCy Schubert * the message is inserted, and then the queue is unlocked so the 26025039b37SCy Schubert * message can be picked up by the writer thread. 26125039b37SCy Schubert * @param mq: message queue. 26225039b37SCy Schubert * @param buf: buffer with message (dnstap contents). 26325039b37SCy Schubert * The buffer must have been malloced by caller. It is linked in 26425039b37SCy Schubert * the queue, and is free()d after use. If the routine fails 26525039b37SCy Schubert * the buffer is freed as well (and nothing happens, the item 26625039b37SCy Schubert * could not be logged). 26725039b37SCy Schubert * @param len: length of buffer. 26825039b37SCy Schubert */ 26925039b37SCy Schubert void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len); 27025039b37SCy Schubert 271*c0caa2e2SCy Schubert /** timer callback to wakeup dtio thread to process messages */ 272*c0caa2e2SCy Schubert void mq_wakeup_cb(void* arg); 273*c0caa2e2SCy Schubert 27425039b37SCy Schubert /** 27525039b37SCy Schubert * Create IO thread. 27625039b37SCy Schubert * @return new io thread object. not yet started. or NULL malloc failure. 27725039b37SCy Schubert */ 27825039b37SCy Schubert struct dt_io_thread* dt_io_thread_create(void); 27925039b37SCy Schubert 28025039b37SCy Schubert /** 28125039b37SCy Schubert * Delete the IO thread structure. 28225039b37SCy Schubert * @param dtio: the io thread that is deleted. It must not be running. 28325039b37SCy Schubert */ 28425039b37SCy Schubert void dt_io_thread_delete(struct dt_io_thread* dtio); 28525039b37SCy Schubert 28625039b37SCy Schubert /** 28725039b37SCy Schubert * Apply config to the dtio thread 28825039b37SCy Schubert * @param dtio: io thread, not yet started. 28925039b37SCy Schubert * @param cfg: config file struct. 29025039b37SCy Schubert * @return false on malloc failure. 29125039b37SCy Schubert */ 29225039b37SCy Schubert int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, 29325039b37SCy Schubert struct config_file *cfg); 29425039b37SCy Schubert 29525039b37SCy Schubert /** 29625039b37SCy Schubert * Register a msg queue to the io thread. It will be polled to see if 29725039b37SCy Schubert * there are messages and those then get removed and sent, when the thread 29825039b37SCy Schubert * is running. 29925039b37SCy Schubert * @param dtio: the io thread. 30025039b37SCy Schubert * @param mq: message queue to register. 30125039b37SCy Schubert * @return false on failure (malloc failure). 30225039b37SCy Schubert */ 30325039b37SCy Schubert int dt_io_thread_register_queue(struct dt_io_thread* dtio, 30425039b37SCy Schubert struct dt_msg_queue* mq); 30525039b37SCy Schubert 30625039b37SCy Schubert /** 30725039b37SCy Schubert * Unregister queue from io thread. 30825039b37SCy Schubert * @param dtio: the io thread. 30925039b37SCy Schubert * @param mq: message queue. 31025039b37SCy Schubert */ 31125039b37SCy Schubert void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, 31225039b37SCy Schubert struct dt_msg_queue* mq); 31325039b37SCy Schubert 31425039b37SCy Schubert /** 31525039b37SCy Schubert * Start the io thread 31625039b37SCy Schubert * @param dtio: the io thread. 31725039b37SCy Schubert * @param event_base_nothr: the event base to attach the events to, in case 31825039b37SCy Schubert * we are running without threads. With threads, this is ignored 31925039b37SCy Schubert * and a thread is started to process the dnstap log messages. 32025039b37SCy Schubert * @param numworkers: number of worker threads. The dnstap io thread is 32125039b37SCy Schubert * that number +1 as the threadnumber (in logs). 32225039b37SCy Schubert * @return false on failure. 32325039b37SCy Schubert */ 32425039b37SCy Schubert int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, 32525039b37SCy Schubert int numworkers); 32625039b37SCy Schubert 32725039b37SCy Schubert /** 32825039b37SCy Schubert * Stop the io thread 32925039b37SCy Schubert * @param dtio: the io thread. 33025039b37SCy Schubert */ 33125039b37SCy Schubert void dt_io_thread_stop(struct dt_io_thread* dtio); 33225039b37SCy Schubert 33325039b37SCy Schubert /** callback for the dnstap reconnect, to start reconnecting to output */ 33425039b37SCy Schubert void dtio_reconnect_timeout_cb(int fd, short bits, void* arg); 33525039b37SCy Schubert 33625039b37SCy Schubert /** callback for the dnstap events, to write to the output */ 33725039b37SCy Schubert void dtio_output_cb(int fd, short bits, void* arg); 33825039b37SCy Schubert 33925039b37SCy Schubert /** callback for the dnstap commandpipe, to stop the dnstap IO */ 34025039b37SCy Schubert void dtio_cmd_cb(int fd, short bits, void* arg); 34125039b37SCy Schubert 34225039b37SCy Schubert /** callback for the timer when the thread stops and wants to finish up */ 34325039b37SCy Schubert void dtio_stop_timer_cb(int fd, short bits, void* arg); 34425039b37SCy Schubert 34525039b37SCy Schubert /** callback for the output when the thread stops and wants to finish up */ 34625039b37SCy Schubert void dtio_stop_ev_cb(int fd, short bits, void* arg); 34725039b37SCy Schubert 34825039b37SCy Schubert /** callback for unbound-dnstap-socket */ 34925039b37SCy Schubert void dtio_tap_callback(int fd, short bits, void* arg); 35025039b37SCy Schubert 35125039b37SCy Schubert /** callback for unbound-dnstap-socket */ 35225039b37SCy Schubert void dtio_mainfdcallback(int fd, short bits, void* arg); 35325039b37SCy Schubert 35425039b37SCy Schubert #endif /* DTSTREAM_H */ 355