xref: /freebsd/contrib/unbound/dnstap/dtstream.h (revision c0caa2e24e9a8c64aa01e4265e8b989ba74b9702)
125039b37SCy Schubert /*
225039b37SCy Schubert  * dnstap/dtstream.h - Frame Streams thread for unbound DNSTAP
325039b37SCy Schubert  *
425039b37SCy Schubert  * Copyright (c) 2020, NLnet Labs. All rights reserved.
525039b37SCy Schubert  *
625039b37SCy Schubert  * This software is open source.
725039b37SCy Schubert  *
825039b37SCy Schubert  * Redistribution and use in source and binary forms, with or without
925039b37SCy Schubert  * modification, are permitted provided that the following conditions
1025039b37SCy Schubert  * are met:
1125039b37SCy Schubert  *
1225039b37SCy Schubert  * Redistributions of source code must retain the above copyright notice,
1325039b37SCy Schubert  * this list of conditions and the following disclaimer.
1425039b37SCy Schubert  *
1525039b37SCy Schubert  * Redistributions in binary form must reproduce the above copyright notice,
1625039b37SCy Schubert  * this list of conditions and the following disclaimer in the documentation
1725039b37SCy Schubert  * and/or other materials provided with the distribution.
1825039b37SCy Schubert  *
1925039b37SCy Schubert  * Neither the name of the NLNET LABS nor the names of its contributors may
2025039b37SCy Schubert  * be used to endorse or promote products derived from this software without
2125039b37SCy Schubert  * specific prior written permission.
2225039b37SCy Schubert  *
2325039b37SCy Schubert  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
2425039b37SCy Schubert  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
2525039b37SCy Schubert  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
2625039b37SCy Schubert  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
2725039b37SCy Schubert  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
2825039b37SCy Schubert  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
2925039b37SCy Schubert  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
3025039b37SCy Schubert  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
3125039b37SCy Schubert  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
3225039b37SCy Schubert  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
3325039b37SCy Schubert  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
3425039b37SCy Schubert  *
3525039b37SCy Schubert  */
3625039b37SCy Schubert 
3725039b37SCy Schubert /**
3825039b37SCy Schubert  * \file
3925039b37SCy Schubert  *
4025039b37SCy Schubert  * An implementation of the Frame Streams data transport protocol for
4125039b37SCy Schubert  * the Unbound DNSTAP message logging facility.
4225039b37SCy Schubert  */
4325039b37SCy Schubert 
4425039b37SCy Schubert #ifndef DTSTREAM_H
4525039b37SCy Schubert #define DTSTREAM_H
4625039b37SCy Schubert 
4725039b37SCy Schubert #include "util/locks.h"
4825039b37SCy Schubert struct dt_msg_entry;
4925039b37SCy Schubert struct dt_io_list_item;
5025039b37SCy Schubert struct dt_io_thread;
5125039b37SCy Schubert struct config_file;
52*c0caa2e2SCy Schubert struct comm_base;
5325039b37SCy Schubert 
5425039b37SCy Schubert /**
5525039b37SCy Schubert  * A message buffer with dnstap messages queued up.  It is per-worker.
5625039b37SCy Schubert  * It has locks to synchronize.  If the buffer is full, a new message
5725039b37SCy Schubert  * cannot be added and is discarded.  A thread reads the messages and sends
5825039b37SCy Schubert  * them.
5925039b37SCy Schubert  */
6025039b37SCy Schubert struct dt_msg_queue {
6125039b37SCy Schubert 	/** lock of the buffer structure.  Hold this lock to add or remove
6225039b37SCy Schubert 	 * entries to the buffer.  Release it so that other threads can also
6325039b37SCy Schubert 	 * put messages to log, or a message can be taken out to send away
6425039b37SCy Schubert 	 * by the writer thread.
6525039b37SCy Schubert 	 */
6625039b37SCy Schubert 	lock_basic_type lock;
6725039b37SCy Schubert 	/** the maximum size of the buffer, in bytes */
6825039b37SCy Schubert 	size_t maxsize;
6925039b37SCy Schubert 	/** current size of the buffer, in bytes.  data bytes of messages.
7025039b37SCy Schubert 	 * If a new message make it more than maxsize, the buffer is full */
7125039b37SCy Schubert 	size_t cursize;
72*c0caa2e2SCy Schubert 	/** number of messages in the queue */
73*c0caa2e2SCy Schubert 	int msgcount;
7425039b37SCy Schubert 	/** list of messages.  The messages are added to the back and taken
7525039b37SCy Schubert 	 * out from the front. */
7625039b37SCy Schubert 	struct dt_msg_entry* first, *last;
7725039b37SCy Schubert 	/** reference to the io thread to wakeup */
7825039b37SCy Schubert 	struct dt_io_thread* dtio;
79*c0caa2e2SCy Schubert 	/** the wakeup timer for dtio, on worker event base */
80*c0caa2e2SCy Schubert 	struct comm_timer* wakeup_timer;
8125039b37SCy Schubert };
8225039b37SCy Schubert 
8325039b37SCy Schubert /**
8425039b37SCy Schubert  * An entry in the dt_msg_queue. contains one DNSTAP message.
8525039b37SCy Schubert  * It is malloced.
8625039b37SCy Schubert  */
8725039b37SCy Schubert struct dt_msg_entry {
8825039b37SCy Schubert 	/** next in the list. */
8925039b37SCy Schubert 	struct dt_msg_entry* next;
9025039b37SCy Schubert 	/** the buffer with the data to send, an encoded DNSTAP message */
9125039b37SCy Schubert 	void* buf;
9225039b37SCy Schubert 	/** the length to send. */
9325039b37SCy Schubert 	size_t len;
9425039b37SCy Schubert };
9525039b37SCy Schubert 
9625039b37SCy Schubert /**
9725039b37SCy Schubert  * Containing buffer and counter for reading DNSTAP frames.
9825039b37SCy Schubert  */
9925039b37SCy Schubert struct dt_frame_read_buf {
10025039b37SCy Schubert 	/** Buffer containing frame, except length counter(s). */
10125039b37SCy Schubert 	void* buf;
10225039b37SCy Schubert 	/** Number of bytes written to buffer. */
10325039b37SCy Schubert 	size_t buf_count;
10425039b37SCy Schubert 	/** Capacity of the buffer. */
10525039b37SCy Schubert 	size_t buf_cap;
10625039b37SCy Schubert 
10725039b37SCy Schubert 	/** Frame length field. Will contain the 2nd length field for control
10825039b37SCy Schubert 	 * frames. */
10925039b37SCy Schubert 	uint32_t frame_len;
11025039b37SCy Schubert 	/** Number of bytes that have been written to the frame_length field. */
11125039b37SCy Schubert 	size_t frame_len_done;
11225039b37SCy Schubert 
11325039b37SCy Schubert 	/** Set to 1 if this is a control frame, 0 otherwise (ie data frame). */
11425039b37SCy Schubert 	int control_frame;
11525039b37SCy Schubert };
11625039b37SCy Schubert 
11725039b37SCy Schubert /**
11825039b37SCy Schubert  * IO thread that reads from the queues and writes them.
11925039b37SCy Schubert  */
12025039b37SCy Schubert struct dt_io_thread {
12125039b37SCy Schubert 	/** the thread number for the dtio thread,
12225039b37SCy Schubert 	 * must be first to cast thread arg to int* in checklock code. */
12325039b37SCy Schubert 	int threadnum;
12425039b37SCy Schubert 	/** event base, for event handling */
12525039b37SCy Schubert 	void* event_base;
12625039b37SCy Schubert 	/** list of queues that is registered to get written */
12725039b37SCy Schubert 	struct dt_io_list_item* io_list;
12825039b37SCy Schubert 	/** iterator point in the io_list, to pick from them in a
12925039b37SCy Schubert 	 * round-robin fashion, instead of only from the first when busy.
13025039b37SCy Schubert 	 * if NULL it means start at the start of the list. */
13125039b37SCy Schubert 	struct dt_io_list_item* io_list_iter;
13225039b37SCy Schubert 	/** thread id, of the io thread */
13325039b37SCy Schubert 	ub_thread_type tid;
13425039b37SCy Schubert 	/** if the io processing has started */
13525039b37SCy Schubert 	int started;
13625039b37SCy Schubert 	/** ssl context for the io thread, for tls connections. type SSL_CTX* */
13725039b37SCy Schubert 	void* ssl_ctx;
13825039b37SCy Schubert 	/** if SNI will be used for TLS connections. */
13925039b37SCy Schubert 	int tls_use_sni;
14025039b37SCy Schubert 
14125039b37SCy Schubert 	/** file descriptor that the thread writes to */
14225039b37SCy Schubert 	int fd;
14325039b37SCy Schubert 	/** event structure that the thread uses */
14425039b37SCy Schubert 	void* event;
14525039b37SCy Schubert 	/** the event is added */
14625039b37SCy Schubert 	int event_added;
14725039b37SCy Schubert 	/** event added is a write event */
14825039b37SCy Schubert 	int event_added_is_write;
14925039b37SCy Schubert 	/** check for nonblocking connect errors on fd */
15025039b37SCy Schubert 	int check_nb_connect;
15125039b37SCy Schubert 	/** ssl for current connection, type SSL* */
15225039b37SCy Schubert 	void* ssl;
15325039b37SCy Schubert 	/** true if the handshake for SSL is done, 0 if not */
15425039b37SCy Schubert 	int ssl_handshake_done;
15525039b37SCy Schubert 	/** true if briefly the SSL wants a read event, 0 if not.
15625039b37SCy Schubert 	 * This happens during negotiation, we then do not want to write,
15725039b37SCy Schubert 	 * but wait for a read event. */
15825039b37SCy Schubert 	int ssl_brief_read;
15925039b37SCy Schubert 	/** true if SSL_read is waiting for a write event. Set back to 0 after
16025039b37SCy Schubert 	 * single write event is handled. */
16125039b37SCy Schubert 	int ssl_brief_write;
16225039b37SCy Schubert 
16325039b37SCy Schubert 	/** the buffer that currently getting written, or NULL if no
16425039b37SCy Schubert 	 * (partial) message written now */
16525039b37SCy Schubert 	void* cur_msg;
16625039b37SCy Schubert 	/** length of the current message */
16725039b37SCy Schubert 	size_t cur_msg_len;
16825039b37SCy Schubert 	/** number of bytes written for the current message */
16925039b37SCy Schubert 	size_t cur_msg_done;
17025039b37SCy Schubert 	/** number of bytes of the length that have been written,
17125039b37SCy Schubert 	 * for the current message length that precedes the frame */
17225039b37SCy Schubert 	size_t cur_msg_len_done;
17325039b37SCy Schubert 
174*c0caa2e2SCy Schubert 	/** lock on wakeup_timer_enabled */
175*c0caa2e2SCy Schubert 	lock_basic_type wakeup_timer_lock;
176*c0caa2e2SCy Schubert 	/** if wakeup timer is enabled in some thread */
177*c0caa2e2SCy Schubert 	int wakeup_timer_enabled;
17825039b37SCy Schubert 	/** command pipe that stops the pipe if closed.  Used to quit
17925039b37SCy Schubert 	 * the program. [0] is read, [1] is written to. */
18025039b37SCy Schubert 	int commandpipe[2];
18125039b37SCy Schubert 	/** the event to listen to the commandpipe */
18225039b37SCy Schubert 	void* command_event;
18325039b37SCy Schubert 	/** the io thread wants to exit */
18425039b37SCy Schubert 	int want_to_exit;
18525039b37SCy Schubert 
18625039b37SCy Schubert 	/** in stop flush, this is nonNULL and references the stop_ev */
18725039b37SCy Schubert 	void* stop_flush_event;
18825039b37SCy Schubert 
18925039b37SCy Schubert 	/** the timer event for connection retries */
19025039b37SCy Schubert 	void* reconnect_timer;
19125039b37SCy Schubert 	/** if the reconnect timer is added to the event base */
19225039b37SCy Schubert 	int reconnect_is_added;
19325039b37SCy Schubert 	/** the current reconnection timeout, it is increased with
19425039b37SCy Schubert 	 * exponential backoff, in msec */
19525039b37SCy Schubert 	int reconnect_timeout;
19625039b37SCy Schubert 
19725039b37SCy Schubert 	/** If the log server is connected to over unix domain sockets,
19825039b37SCy Schubert 	 * eg. a file is named that is created to log onto. */
19925039b37SCy Schubert 	int upstream_is_unix;
20025039b37SCy Schubert 	/** if the log server is connected to over TCP.  The ip address and
20125039b37SCy Schubert 	 * port are used */
20225039b37SCy Schubert 	int upstream_is_tcp;
20325039b37SCy Schubert 	/** if the log server is connected to over TLS.  ip address, port,
20425039b37SCy Schubert 	 * and client certificates can be used for authentication. */
20525039b37SCy Schubert 	int upstream_is_tls;
20625039b37SCy Schubert 
20725039b37SCy Schubert 	/** Perform bidirectional Frame Streams handshake before sending
20825039b37SCy Schubert 	 * messages. */
20925039b37SCy Schubert 	int is_bidirectional;
21025039b37SCy Schubert 	/** Set if the READY control frame has been sent. */
21125039b37SCy Schubert 	int ready_frame_sent;
21225039b37SCy Schubert 	/** Set if valid ACCEPT frame is received. */
21325039b37SCy Schubert 	int accept_frame_received;
21425039b37SCy Schubert 	/** (partially) read frame */
21525039b37SCy Schubert 	struct dt_frame_read_buf read_frame;
21625039b37SCy Schubert 
21725039b37SCy Schubert 	/** the file path for unix socket (or NULL) */
21825039b37SCy Schubert 	char* socket_path;
21925039b37SCy Schubert 	/** the ip address and port number (or NULL) */
22025039b37SCy Schubert 	char* ip_str;
22125039b37SCy Schubert 	/** is the TLS upstream authenticated by name, if nonNULL,
22225039b37SCy Schubert 	 * we use the same cert bundle as used by other TLS streams. */
22325039b37SCy Schubert 	char* tls_server_name;
22425039b37SCy Schubert 	/** are client certificates in use */
22525039b37SCy Schubert 	int use_client_certs;
22625039b37SCy Schubert 	/** client cert files: the .key file */
22725039b37SCy Schubert 	char* client_key_file;
22825039b37SCy Schubert 	/** client cert files: the .pem file */
22925039b37SCy Schubert 	char* client_cert_file;
23025039b37SCy Schubert };
23125039b37SCy Schubert 
23225039b37SCy Schubert /**
23325039b37SCy Schubert  * IO thread list of queues list item
23425039b37SCy Schubert  * lists a worker queue that should be looked at and sent to the log server.
23525039b37SCy Schubert  */
23625039b37SCy Schubert struct dt_io_list_item {
23725039b37SCy Schubert 	/** next in the list of buffers to inspect */
23825039b37SCy Schubert 	struct dt_io_list_item* next;
23925039b37SCy Schubert 	/** buffer of this worker */
24025039b37SCy Schubert 	struct dt_msg_queue* queue;
24125039b37SCy Schubert };
24225039b37SCy Schubert 
24325039b37SCy Schubert /**
24425039b37SCy Schubert  * Create new (empty) worker message queue. Limit set to default on max.
245*c0caa2e2SCy Schubert  * @param base: event base for wakeup timer.
24625039b37SCy Schubert  * @return NULL on malloc failure or a new queue (not locked).
24725039b37SCy Schubert  */
248*c0caa2e2SCy Schubert struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base);
24925039b37SCy Schubert 
25025039b37SCy Schubert /**
25125039b37SCy Schubert  * Delete a worker message queue.  It has to be unlinked from access,
25225039b37SCy Schubert  * so it can be deleted without lock worries.  The queue is emptied (deleted).
25325039b37SCy Schubert  * @param mq: message queue.
25425039b37SCy Schubert  */
25525039b37SCy Schubert void dt_msg_queue_delete(struct dt_msg_queue* mq);
25625039b37SCy Schubert 
25725039b37SCy Schubert /**
25825039b37SCy Schubert  * Submit a message to the queue.  The queue is locked by the routine,
25925039b37SCy Schubert  * the message is inserted, and then the queue is unlocked so the
26025039b37SCy Schubert  * message can be picked up by the writer thread.
26125039b37SCy Schubert  * @param mq: message queue.
26225039b37SCy Schubert  * @param buf: buffer with message (dnstap contents).
26325039b37SCy Schubert  * 	The buffer must have been malloced by caller.  It is linked in
26425039b37SCy Schubert  * 	the queue, and is free()d after use.  If the routine fails
26525039b37SCy Schubert  * 	the buffer is freed as well (and nothing happens, the item
26625039b37SCy Schubert  * 	could not be logged).
26725039b37SCy Schubert  * @param len: length of buffer.
26825039b37SCy Schubert  */
26925039b37SCy Schubert void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);
27025039b37SCy Schubert 
271*c0caa2e2SCy Schubert /** timer callback to wakeup dtio thread to process messages */
272*c0caa2e2SCy Schubert void mq_wakeup_cb(void* arg);
273*c0caa2e2SCy Schubert 
27425039b37SCy Schubert /**
27525039b37SCy Schubert  * Create IO thread.
27625039b37SCy Schubert  * @return new io thread object. not yet started. or NULL malloc failure.
27725039b37SCy Schubert  */
27825039b37SCy Schubert struct dt_io_thread* dt_io_thread_create(void);
27925039b37SCy Schubert 
28025039b37SCy Schubert /**
28125039b37SCy Schubert  * Delete the IO thread structure.
28225039b37SCy Schubert  * @param dtio: the io thread that is deleted.  It must not be running.
28325039b37SCy Schubert  */
28425039b37SCy Schubert void dt_io_thread_delete(struct dt_io_thread* dtio);
28525039b37SCy Schubert 
28625039b37SCy Schubert /**
28725039b37SCy Schubert  * Apply config to the dtio thread
28825039b37SCy Schubert  * @param dtio: io thread, not yet started.
28925039b37SCy Schubert  * @param cfg: config file struct.
29025039b37SCy Schubert  * @return false on malloc failure.
29125039b37SCy Schubert  */
29225039b37SCy Schubert int dt_io_thread_apply_cfg(struct dt_io_thread* dtio,
29325039b37SCy Schubert 	struct config_file *cfg);
29425039b37SCy Schubert 
29525039b37SCy Schubert /**
29625039b37SCy Schubert  * Register a msg queue to the io thread.  It will be polled to see if
29725039b37SCy Schubert  * there are messages and those then get removed and sent, when the thread
29825039b37SCy Schubert  * is running.
29925039b37SCy Schubert  * @param dtio: the io thread.
30025039b37SCy Schubert  * @param mq: message queue to register.
30125039b37SCy Schubert  * @return false on failure (malloc failure).
30225039b37SCy Schubert  */
30325039b37SCy Schubert int dt_io_thread_register_queue(struct dt_io_thread* dtio,
30425039b37SCy Schubert 	struct dt_msg_queue* mq);
30525039b37SCy Schubert 
30625039b37SCy Schubert /**
30725039b37SCy Schubert  * Unregister queue from io thread.
30825039b37SCy Schubert  * @param dtio: the io thread.
30925039b37SCy Schubert  * @param mq: message queue.
31025039b37SCy Schubert  */
31125039b37SCy Schubert void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
31225039b37SCy Schubert         struct dt_msg_queue* mq);
31325039b37SCy Schubert 
31425039b37SCy Schubert /**
31525039b37SCy Schubert  * Start the io thread
31625039b37SCy Schubert  * @param dtio: the io thread.
31725039b37SCy Schubert  * @param event_base_nothr: the event base to attach the events to, in case
31825039b37SCy Schubert  * 	we are running without threads.  With threads, this is ignored
31925039b37SCy Schubert  * 	and a thread is started to process the dnstap log messages.
32025039b37SCy Schubert  * @param numworkers: number of worker threads.  The dnstap io thread is
32125039b37SCy Schubert  * 	that number +1 as the threadnumber (in logs).
32225039b37SCy Schubert  * @return false on failure.
32325039b37SCy Schubert  */
32425039b37SCy Schubert int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
32525039b37SCy Schubert 	int numworkers);
32625039b37SCy Schubert 
32725039b37SCy Schubert /**
32825039b37SCy Schubert  * Stop the io thread
32925039b37SCy Schubert  * @param dtio: the io thread.
33025039b37SCy Schubert  */
33125039b37SCy Schubert void dt_io_thread_stop(struct dt_io_thread* dtio);
33225039b37SCy Schubert 
33325039b37SCy Schubert /** callback for the dnstap reconnect, to start reconnecting to output */
33425039b37SCy Schubert void dtio_reconnect_timeout_cb(int fd, short bits, void* arg);
33525039b37SCy Schubert 
33625039b37SCy Schubert /** callback for the dnstap events, to write to the output */
33725039b37SCy Schubert void dtio_output_cb(int fd, short bits, void* arg);
33825039b37SCy Schubert 
33925039b37SCy Schubert /** callback for the dnstap commandpipe, to stop the dnstap IO */
34025039b37SCy Schubert void dtio_cmd_cb(int fd, short bits, void* arg);
34125039b37SCy Schubert 
34225039b37SCy Schubert /** callback for the timer when the thread stops and wants to finish up */
34325039b37SCy Schubert void dtio_stop_timer_cb(int fd, short bits, void* arg);
34425039b37SCy Schubert 
34525039b37SCy Schubert /** callback for the output when the thread stops and wants to finish up */
34625039b37SCy Schubert void dtio_stop_ev_cb(int fd, short bits, void* arg);
34725039b37SCy Schubert 
34825039b37SCy Schubert /** callback for unbound-dnstap-socket */
34925039b37SCy Schubert void dtio_tap_callback(int fd, short bits, void* arg);
35025039b37SCy Schubert 
35125039b37SCy Schubert /** callback for unbound-dnstap-socket */
35225039b37SCy Schubert void dtio_mainfdcallback(int fd, short bits, void* arg);
35325039b37SCy Schubert 
35425039b37SCy Schubert #endif /* DTSTREAM_H */
355