xref: /freebsd/contrib/unbound/dnstap/dtstream.h (revision 5fa84c6ec176d186ddad25d31f8760e50f48157f)
1 /*
2  * dnstap/dtstream.h - Frame Streams thread for unbound DNSTAP
3  *
4  * Copyright (c) 2020, NLnet Labs. All rights reserved.
5  *
6  * This software is open source.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * Redistributions of source code must retain the above copyright notice,
13  * this list of conditions and the following disclaimer.
14  *
15  * Redistributions in binary form must reproduce the above copyright notice,
16  * this list of conditions and the following disclaimer in the documentation
17  * and/or other materials provided with the distribution.
18  *
19  * Neither the name of the NLNET LABS nor the names of its contributors may
20  * be used to endorse or promote products derived from this software without
21  * specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34  *
35  */
36 
37 /**
38  * \file
39  *
40  * An implementation of the Frame Streams data transport protocol for
41  * the Unbound DNSTAP message logging facility.
42  */
43 
44 #ifndef DTSTREAM_H
45 #define DTSTREAM_H
46 
47 #include "util/locks.h"
48 struct dt_msg_entry;
49 struct dt_io_list_item;
50 struct dt_io_thread;
51 struct config_file;
52 struct comm_base;
53 
54 /**
55  * A message buffer with dnstap messages queued up.  It is per-worker.
56  * It has locks to synchronize.  If the buffer is full, a new message
57  * cannot be added and is discarded.  A thread reads the messages and sends
58  * them.
59  */
60 struct dt_msg_queue {
61 	/** lock of the buffer structure.  Hold this lock to add or remove
62 	 * entries to the buffer.  Release it so that other threads can also
63 	 * put messages to log, or a message can be taken out to send away
64 	 * by the writer thread.
65 	 */
66 	lock_basic_type lock;
67 	/** the maximum size of the buffer, in bytes */
68 	size_t maxsize;
69 	/** current size of the buffer, in bytes.  data bytes of messages.
70 	 * If a new message make it more than maxsize, the buffer is full */
71 	size_t cursize;
72 	/** number of messages in the queue */
73 	int msgcount;
74 	/** list of messages.  The messages are added to the back and taken
75 	 * out from the front. */
76 	struct dt_msg_entry* first, *last;
77 	/** reference to the io thread to wakeup */
78 	struct dt_io_thread* dtio;
79 	/** the wakeup timer for dtio, on worker event base */
80 	struct comm_timer* wakeup_timer;
81 };
82 
83 /**
84  * An entry in the dt_msg_queue. contains one DNSTAP message.
85  * It is malloced.
86  */
87 struct dt_msg_entry {
88 	/** next in the list. */
89 	struct dt_msg_entry* next;
90 	/** the buffer with the data to send, an encoded DNSTAP message */
91 	void* buf;
92 	/** the length to send. */
93 	size_t len;
94 };
95 
96 /**
97  * Containing buffer and counter for reading DNSTAP frames.
98  */
99 struct dt_frame_read_buf {
100 	/** Buffer containing frame, except length counter(s). */
101 	void* buf;
102 	/** Number of bytes written to buffer. */
103 	size_t buf_count;
104 	/** Capacity of the buffer. */
105 	size_t buf_cap;
106 
107 	/** Frame length field. Will contain the 2nd length field for control
108 	 * frames. */
109 	uint32_t frame_len;
110 	/** Number of bytes that have been written to the frame_length field. */
111 	size_t frame_len_done;
112 
113 	/** Set to 1 if this is a control frame, 0 otherwise (ie data frame). */
114 	int control_frame;
115 };
116 
117 /**
118  * IO thread that reads from the queues and writes them.
119  */
120 struct dt_io_thread {
121 	/** the thread number for the dtio thread,
122 	 * must be first to cast thread arg to int* in checklock code. */
123 	int threadnum;
124 	/** event base, for event handling */
125 	void* event_base;
126 	/** list of queues that is registered to get written */
127 	struct dt_io_list_item* io_list;
128 	/** iterator point in the io_list, to pick from them in a
129 	 * round-robin fashion, instead of only from the first when busy.
130 	 * if NULL it means start at the start of the list. */
131 	struct dt_io_list_item* io_list_iter;
132 	/** thread id, of the io thread */
133 	ub_thread_type tid;
134 #ifdef HAVE_GETTID
135 	/** thread tid, the LWP id */
136 	pid_t thread_tid;
137 	/** if logging should include the LWP id */
138 	int thread_tid_log;
139 #endif
140 	/** if the io processing has started */
141 	int started;
142 	/** ssl context for the io thread, for tls connections. type SSL_CTX* */
143 	void* ssl_ctx;
144 	/** if SNI will be used for TLS connections. */
145 	int tls_use_sni;
146 
147 	/** file descriptor that the thread writes to */
148 	int fd;
149 	/** event structure that the thread uses */
150 	void* event;
151 	/** the event is added */
152 	int event_added;
153 	/** event added is a write event */
154 	int event_added_is_write;
155 	/** check for nonblocking connect errors on fd */
156 	int check_nb_connect;
157 	/** ssl for current connection, type SSL* */
158 	void* ssl;
159 	/** true if the handshake for SSL is done, 0 if not */
160 	int ssl_handshake_done;
161 	/** true if briefly the SSL wants a read event, 0 if not.
162 	 * This happens during negotiation, we then do not want to write,
163 	 * but wait for a read event. */
164 	int ssl_brief_read;
165 	/** true if SSL_read is waiting for a write event. Set back to 0 after
166 	 * single write event is handled. */
167 	int ssl_brief_write;
168 
169 	/** the buffer that currently getting written, or NULL if no
170 	 * (partial) message written now */
171 	void* cur_msg;
172 	/** length of the current message */
173 	size_t cur_msg_len;
174 	/** number of bytes written for the current message */
175 	size_t cur_msg_done;
176 	/** number of bytes of the length that have been written,
177 	 * for the current message length that precedes the frame */
178 	size_t cur_msg_len_done;
179 
180 	/** lock on wakeup_timer_enabled */
181 	lock_basic_type wakeup_timer_lock;
182 	/** if wakeup timer is enabled in some thread */
183 	int wakeup_timer_enabled;
184 	/** command pipe that stops the pipe if closed.  Used to quit
185 	 * the program. [0] is read, [1] is written to. */
186 	int commandpipe[2];
187 	/** the event to listen to the commandpipe */
188 	void* command_event;
189 	/** the io thread wants to exit */
190 	int want_to_exit;
191 
192 	/** in stop flush, this is nonNULL and references the stop_ev */
193 	void* stop_flush_event;
194 
195 	/** the timer event for connection retries */
196 	void* reconnect_timer;
197 	/** if the reconnect timer is added to the event base */
198 	int reconnect_is_added;
199 	/** the current reconnection timeout, it is increased with
200 	 * exponential backoff, in msec */
201 	int reconnect_timeout;
202 
203 	/** If the log server is connected to over unix domain sockets,
204 	 * eg. a file is named that is created to log onto. */
205 	int upstream_is_unix;
206 	/** if the log server is connected to over TCP.  The ip address and
207 	 * port are used */
208 	int upstream_is_tcp;
209 	/** if the log server is connected to over TLS.  ip address, port,
210 	 * and client certificates can be used for authentication. */
211 	int upstream_is_tls;
212 
213 	/** Perform bidirectional Frame Streams handshake before sending
214 	 * messages. */
215 	int is_bidirectional;
216 	/** Set if the READY control frame has been sent. */
217 	int ready_frame_sent;
218 	/** Set if valid ACCEPT frame is received. */
219 	int accept_frame_received;
220 	/** (partially) read frame */
221 	struct dt_frame_read_buf read_frame;
222 
223 	/** the file path for unix socket (or NULL) */
224 	char* socket_path;
225 	/** the ip address and port number (or NULL) */
226 	char* ip_str;
227 	/** is the TLS upstream authenticated by name, if nonNULL,
228 	 * we use the same cert bundle as used by other TLS streams. */
229 	char* tls_server_name;
230 	/** are client certificates in use */
231 	int use_client_certs;
232 	/** client cert files: the .key file */
233 	char* client_key_file;
234 	/** client cert files: the .pem file */
235 	char* client_cert_file;
236 };
237 
238 /**
239  * IO thread list of queues list item
240  * lists a worker queue that should be looked at and sent to the log server.
241  */
242 struct dt_io_list_item {
243 	/** next in the list of buffers to inspect */
244 	struct dt_io_list_item* next;
245 	/** buffer of this worker */
246 	struct dt_msg_queue* queue;
247 };
248 
249 /**
250  * Create new (empty) worker message queue. Limit set to default on max.
251  * @param base: event base for wakeup timer.
252  * @return NULL on malloc failure or a new queue (not locked).
253  */
254 struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base);
255 
256 /**
257  * Delete a worker message queue.  It has to be unlinked from access,
258  * so it can be deleted without lock worries.  The queue is emptied (deleted).
259  * @param mq: message queue.
260  */
261 void dt_msg_queue_delete(struct dt_msg_queue* mq);
262 
263 /**
264  * Submit a message to the queue.  The queue is locked by the routine,
265  * the message is inserted, and then the queue is unlocked so the
266  * message can be picked up by the writer thread.
267  * @param mq: message queue.
268  * @param buf: buffer with message (dnstap contents).
269  * 	The buffer must have been malloced by caller.  It is linked in
270  * 	the queue, and is free()d after use.  If the routine fails
271  * 	the buffer is freed as well (and nothing happens, the item
272  * 	could not be logged).
273  * @param len: length of buffer.
274  */
275 void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);
276 
277 /** timer callback to wakeup dtio thread to process messages */
278 void mq_wakeup_cb(void* arg);
279 
280 /**
281  * Create IO thread.
282  * @return new io thread object. not yet started. or NULL malloc failure.
283  */
284 struct dt_io_thread* dt_io_thread_create(void);
285 
286 /**
287  * Delete the IO thread structure.
288  * @param dtio: the io thread that is deleted.  It must not be running.
289  */
290 void dt_io_thread_delete(struct dt_io_thread* dtio);
291 
292 /**
293  * Apply config to the dtio thread
294  * @param dtio: io thread, not yet started.
295  * @param cfg: config file struct.
296  * @return false on malloc failure.
297  */
298 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio,
299 	struct config_file *cfg);
300 
301 /**
302  * Register a msg queue to the io thread.  It will be polled to see if
303  * there are messages and those then get removed and sent, when the thread
304  * is running.
305  * @param dtio: the io thread.
306  * @param mq: message queue to register.
307  * @return false on failure (malloc failure).
308  */
309 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
310 	struct dt_msg_queue* mq);
311 
312 /**
313  * Unregister queue from io thread.
314  * @param dtio: the io thread.
315  * @param mq: message queue.
316  */
317 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
318         struct dt_msg_queue* mq);
319 
320 /**
321  * Start the io thread
322  * @param dtio: the io thread.
323  * @param event_base_nothr: the event base to attach the events to, in case
324  * 	we are running without threads.  With threads, this is ignored
325  * 	and a thread is started to process the dnstap log messages.
326  * @param numworkers: number of worker threads.  The dnstap io thread is
327  * 	that number +1 as the threadnumber (in logs).
328  * @return false on failure.
329  */
330 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
331 	int numworkers);
332 
333 /**
334  * Stop the io thread
335  * @param dtio: the io thread.
336  */
337 void dt_io_thread_stop(struct dt_io_thread* dtio);
338 
339 /** callback for the dnstap reconnect, to start reconnecting to output */
340 void dtio_reconnect_timeout_cb(int fd, short bits, void* arg);
341 
342 /** callback for the dnstap events, to write to the output */
343 void dtio_output_cb(int fd, short bits, void* arg);
344 
345 /** callback for the dnstap commandpipe, to stop the dnstap IO */
346 void dtio_cmd_cb(int fd, short bits, void* arg);
347 
348 /** callback for the timer when the thread stops and wants to finish up */
349 void dtio_stop_timer_cb(int fd, short bits, void* arg);
350 
351 /** callback for the output when the thread stops and wants to finish up */
352 void dtio_stop_ev_cb(int fd, short bits, void* arg);
353 
354 /** callback for unbound-dnstap-socket */
355 void dtio_tap_callback(int fd, short bits, void* arg);
356 
357 /** callback for unbound-dnstap-socket */
358 void dtio_mainfdcallback(int fd, short bits, void* arg);
359 
360 #endif /* DTSTREAM_H */
361