1 /* 2 * dnstap/dtstream.h - Frame Streams thread for unbound DNSTAP 3 * 4 * Copyright (c) 2020, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * 15 * Redistributions in binary form must reproduce the above copyright notice, 16 * this list of conditions and the following disclaimer in the documentation 17 * and/or other materials provided with the distribution. 18 * 19 * Neither the name of the NLNET LABS nor the names of its contributors may 20 * be used to endorse or promote products derived from this software without 21 * specific prior written permission. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34 * 35 */ 36 37 /** 38 * \file 39 * 40 * An implementation of the Frame Streams data transport protocol for 41 * the Unbound DNSTAP message logging facility. 42 */ 43 44 #ifndef DTSTREAM_H 45 #define DTSTREAM_H 46 47 #include "util/locks.h" 48 struct dt_msg_entry; 49 struct dt_io_list_item; 50 struct dt_io_thread; 51 struct config_file; 52 struct comm_base; 53 54 /** 55 * A message buffer with dnstap messages queued up. It is per-worker. 56 * It has locks to synchronize. If the buffer is full, a new message 57 * cannot be added and is discarded. A thread reads the messages and sends 58 * them. 59 */ 60 struct dt_msg_queue { 61 /** lock of the buffer structure. Hold this lock to add or remove 62 * entries to the buffer. Release it so that other threads can also 63 * put messages to log, or a message can be taken out to send away 64 * by the writer thread. 65 */ 66 lock_basic_type lock; 67 /** the maximum size of the buffer, in bytes */ 68 size_t maxsize; 69 /** current size of the buffer, in bytes. data bytes of messages. 70 * If a new message make it more than maxsize, the buffer is full */ 71 size_t cursize; 72 /** number of messages in the queue */ 73 int msgcount; 74 /** list of messages. The messages are added to the back and taken 75 * out from the front. */ 76 struct dt_msg_entry* first, *last; 77 /** reference to the io thread to wakeup */ 78 struct dt_io_thread* dtio; 79 /** the wakeup timer for dtio, on worker event base */ 80 struct comm_timer* wakeup_timer; 81 }; 82 83 /** 84 * An entry in the dt_msg_queue. contains one DNSTAP message. 85 * It is malloced. 86 */ 87 struct dt_msg_entry { 88 /** next in the list. */ 89 struct dt_msg_entry* next; 90 /** the buffer with the data to send, an encoded DNSTAP message */ 91 void* buf; 92 /** the length to send. */ 93 size_t len; 94 }; 95 96 /** 97 * Containing buffer and counter for reading DNSTAP frames. 98 */ 99 struct dt_frame_read_buf { 100 /** Buffer containing frame, except length counter(s). */ 101 void* buf; 102 /** Number of bytes written to buffer. */ 103 size_t buf_count; 104 /** Capacity of the buffer. */ 105 size_t buf_cap; 106 107 /** Frame length field. Will contain the 2nd length field for control 108 * frames. */ 109 uint32_t frame_len; 110 /** Number of bytes that have been written to the frame_length field. */ 111 size_t frame_len_done; 112 113 /** Set to 1 if this is a control frame, 0 otherwise (ie data frame). */ 114 int control_frame; 115 }; 116 117 /** 118 * IO thread that reads from the queues and writes them. 119 */ 120 struct dt_io_thread { 121 /** the thread number for the dtio thread, 122 * must be first to cast thread arg to int* in checklock code. */ 123 int threadnum; 124 /** event base, for event handling */ 125 void* event_base; 126 /** list of queues that is registered to get written */ 127 struct dt_io_list_item* io_list; 128 /** iterator point in the io_list, to pick from them in a 129 * round-robin fashion, instead of only from the first when busy. 130 * if NULL it means start at the start of the list. */ 131 struct dt_io_list_item* io_list_iter; 132 /** thread id, of the io thread */ 133 ub_thread_type tid; 134 #ifdef HAVE_GETTID 135 /** thread tid, the LWP id */ 136 pid_t thread_tid; 137 /** if logging should include the LWP id */ 138 int thread_tid_log; 139 #endif 140 /** if the io processing has started */ 141 int started; 142 /** ssl context for the io thread, for tls connections. type SSL_CTX* */ 143 void* ssl_ctx; 144 /** if SNI will be used for TLS connections. */ 145 int tls_use_sni; 146 147 /** file descriptor that the thread writes to */ 148 int fd; 149 /** event structure that the thread uses */ 150 void* event; 151 /** the event is added */ 152 int event_added; 153 /** event added is a write event */ 154 int event_added_is_write; 155 /** check for nonblocking connect errors on fd */ 156 int check_nb_connect; 157 /** ssl for current connection, type SSL* */ 158 void* ssl; 159 /** true if the handshake for SSL is done, 0 if not */ 160 int ssl_handshake_done; 161 /** true if briefly the SSL wants a read event, 0 if not. 162 * This happens during negotiation, we then do not want to write, 163 * but wait for a read event. */ 164 int ssl_brief_read; 165 /** true if SSL_read is waiting for a write event. Set back to 0 after 166 * single write event is handled. */ 167 int ssl_brief_write; 168 169 /** the buffer that currently getting written, or NULL if no 170 * (partial) message written now */ 171 void* cur_msg; 172 /** length of the current message */ 173 size_t cur_msg_len; 174 /** number of bytes written for the current message */ 175 size_t cur_msg_done; 176 /** number of bytes of the length that have been written, 177 * for the current message length that precedes the frame */ 178 size_t cur_msg_len_done; 179 180 /** lock on wakeup_timer_enabled */ 181 lock_basic_type wakeup_timer_lock; 182 /** if wakeup timer is enabled in some thread */ 183 int wakeup_timer_enabled; 184 /** command pipe that stops the pipe if closed. Used to quit 185 * the program. [0] is read, [1] is written to. */ 186 int commandpipe[2]; 187 /** the event to listen to the commandpipe */ 188 void* command_event; 189 /** the io thread wants to exit */ 190 int want_to_exit; 191 192 /** in stop flush, this is nonNULL and references the stop_ev */ 193 void* stop_flush_event; 194 195 /** the timer event for connection retries */ 196 void* reconnect_timer; 197 /** if the reconnect timer is added to the event base */ 198 int reconnect_is_added; 199 /** the current reconnection timeout, it is increased with 200 * exponential backoff, in msec */ 201 int reconnect_timeout; 202 203 /** If the log server is connected to over unix domain sockets, 204 * eg. a file is named that is created to log onto. */ 205 int upstream_is_unix; 206 /** if the log server is connected to over TCP. The ip address and 207 * port are used */ 208 int upstream_is_tcp; 209 /** if the log server is connected to over TLS. ip address, port, 210 * and client certificates can be used for authentication. */ 211 int upstream_is_tls; 212 213 /** Perform bidirectional Frame Streams handshake before sending 214 * messages. */ 215 int is_bidirectional; 216 /** Set if the READY control frame has been sent. */ 217 int ready_frame_sent; 218 /** Set if valid ACCEPT frame is received. */ 219 int accept_frame_received; 220 /** (partially) read frame */ 221 struct dt_frame_read_buf read_frame; 222 223 /** the file path for unix socket (or NULL) */ 224 char* socket_path; 225 /** the ip address and port number (or NULL) */ 226 char* ip_str; 227 /** is the TLS upstream authenticated by name, if nonNULL, 228 * we use the same cert bundle as used by other TLS streams. */ 229 char* tls_server_name; 230 /** are client certificates in use */ 231 int use_client_certs; 232 /** client cert files: the .key file */ 233 char* client_key_file; 234 /** client cert files: the .pem file */ 235 char* client_cert_file; 236 }; 237 238 /** 239 * IO thread list of queues list item 240 * lists a worker queue that should be looked at and sent to the log server. 241 */ 242 struct dt_io_list_item { 243 /** next in the list of buffers to inspect */ 244 struct dt_io_list_item* next; 245 /** buffer of this worker */ 246 struct dt_msg_queue* queue; 247 }; 248 249 /** 250 * Create new (empty) worker message queue. Limit set to default on max. 251 * @param base: event base for wakeup timer. 252 * @return NULL on malloc failure or a new queue (not locked). 253 */ 254 struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base); 255 256 /** 257 * Delete a worker message queue. It has to be unlinked from access, 258 * so it can be deleted without lock worries. The queue is emptied (deleted). 259 * @param mq: message queue. 260 */ 261 void dt_msg_queue_delete(struct dt_msg_queue* mq); 262 263 /** 264 * Submit a message to the queue. The queue is locked by the routine, 265 * the message is inserted, and then the queue is unlocked so the 266 * message can be picked up by the writer thread. 267 * @param mq: message queue. 268 * @param buf: buffer with message (dnstap contents). 269 * The buffer must have been malloced by caller. It is linked in 270 * the queue, and is free()d after use. If the routine fails 271 * the buffer is freed as well (and nothing happens, the item 272 * could not be logged). 273 * @param len: length of buffer. 274 */ 275 void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len); 276 277 /** timer callback to wakeup dtio thread to process messages */ 278 void mq_wakeup_cb(void* arg); 279 280 /** 281 * Create IO thread. 282 * @return new io thread object. not yet started. or NULL malloc failure. 283 */ 284 struct dt_io_thread* dt_io_thread_create(void); 285 286 /** 287 * Delete the IO thread structure. 288 * @param dtio: the io thread that is deleted. It must not be running. 289 */ 290 void dt_io_thread_delete(struct dt_io_thread* dtio); 291 292 /** 293 * Apply config to the dtio thread 294 * @param dtio: io thread, not yet started. 295 * @param cfg: config file struct. 296 * @return false on malloc failure. 297 */ 298 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, 299 struct config_file *cfg); 300 301 /** 302 * Register a msg queue to the io thread. It will be polled to see if 303 * there are messages and those then get removed and sent, when the thread 304 * is running. 305 * @param dtio: the io thread. 306 * @param mq: message queue to register. 307 * @return false on failure (malloc failure). 308 */ 309 int dt_io_thread_register_queue(struct dt_io_thread* dtio, 310 struct dt_msg_queue* mq); 311 312 /** 313 * Unregister queue from io thread. 314 * @param dtio: the io thread. 315 * @param mq: message queue. 316 */ 317 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, 318 struct dt_msg_queue* mq); 319 320 /** 321 * Start the io thread 322 * @param dtio: the io thread. 323 * @param event_base_nothr: the event base to attach the events to, in case 324 * we are running without threads. With threads, this is ignored 325 * and a thread is started to process the dnstap log messages. 326 * @param numworkers: number of worker threads. The dnstap io thread is 327 * that number +1 as the threadnumber (in logs). 328 * @return false on failure. 329 */ 330 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, 331 int numworkers); 332 333 /** 334 * Stop the io thread 335 * @param dtio: the io thread. 336 */ 337 void dt_io_thread_stop(struct dt_io_thread* dtio); 338 339 /** callback for the dnstap reconnect, to start reconnecting to output */ 340 void dtio_reconnect_timeout_cb(int fd, short bits, void* arg); 341 342 /** callback for the dnstap events, to write to the output */ 343 void dtio_output_cb(int fd, short bits, void* arg); 344 345 /** callback for the dnstap commandpipe, to stop the dnstap IO */ 346 void dtio_cmd_cb(int fd, short bits, void* arg); 347 348 /** callback for the timer when the thread stops and wants to finish up */ 349 void dtio_stop_timer_cb(int fd, short bits, void* arg); 350 351 /** callback for the output when the thread stops and wants to finish up */ 352 void dtio_stop_ev_cb(int fd, short bits, void* arg); 353 354 /** callback for unbound-dnstap-socket */ 355 void dtio_tap_callback(int fd, short bits, void* arg); 356 357 /** callback for unbound-dnstap-socket */ 358 void dtio_mainfdcallback(int fd, short bits, void* arg); 359 360 #endif /* DTSTREAM_H */ 361