xref: /freebsd/contrib/ntp/libntp/ntp_worker.c (revision 4990d495fcc77c51b3f46c91ba3a064b565afae0)
1 /*
2  * ntp_worker.c
3  */
4 #include <config.h>
5 #include "ntp_workimpl.h"
6 
7 #ifdef WORKER
8 
9 #include <stdio.h>
10 #include <ctype.h>
11 #include <signal.h>
12 
13 #include "iosignal.h"
14 #include "ntp_stdlib.h"
15 #include "ntp_malloc.h"
16 #include "ntp_syslog.h"
17 #include "ntpd.h"
18 #include "ntp_io.h"
19 #include "ntp_assert.h"
20 #include "ntp_unixtime.h"
21 #include "intreswork.h"
22 
23 
24 #define CHILD_MAX_IDLE	(3 * 60)	/* seconds, idle worker limit */
25 
26 blocking_child **	blocking_children;
27 size_t			blocking_children_alloc;
28 int			worker_per_query;	/* boolean */
29 int			intres_req_pending;
30 volatile u_int		blocking_child_ready_seen;
31 volatile u_int		blocking_child_ready_done;
32 
33 
34 #ifndef HAVE_IO_COMPLETION_PORT
35 /*
36  * pipe_socketpair()
37  *
38  * Provides an AF_UNIX socketpair on systems which have them, otherwise
39  * pair of unidirectional pipes.
40  */
41 int
pipe_socketpair(int caller_fds[2],int * is_pipe)42 pipe_socketpair(
43 	int	caller_fds[2],
44 	int *	is_pipe
45 	)
46 {
47 	int	rc;
48 	int	fds[2];
49 	int	called_pipe;
50 
51 #ifdef HAVE_SOCKETPAIR
52 	rc = socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]);
53 #else
54 	rc = -1;
55 #endif
56 
57 	if (-1 == rc) {
58 		rc = pipe(&fds[0]);
59 		called_pipe = TRUE;
60 	} else {
61 		called_pipe = FALSE;
62 	}
63 
64 	if (-1 == rc)
65 		return rc;
66 
67 	caller_fds[0] = fds[0];
68 	caller_fds[1] = fds[1];
69 	if (is_pipe != NULL)
70 		*is_pipe = called_pipe;
71 
72 	return 0;
73 }
74 
75 
76 /*
77  * close_all_except()
78  *
79  * Close all file descriptors except the given keep_fd.
80  */
81 void
close_all_except(int keep_fd)82 close_all_except(
83 	int keep_fd
84 	)
85 {
86 	int fd;
87 
88 	for (fd = 0; fd < keep_fd; fd++)
89 		close(fd);
90 
91 	close_all_beyond(keep_fd);
92 }
93 
94 
95 /*
96  * close_all_beyond()
97  *
98  * Close all file descriptors after the given keep_fd, which is the
99  * highest fd to keep open.
100  */
101 void
close_all_beyond(int keep_fd)102 close_all_beyond(
103 	int keep_fd
104 	)
105 {
106 # ifdef HAVE_CLOSEFROM
107 	closefrom(keep_fd + 1);
108 # elif defined(F_CLOSEM)
109 	/*
110 	 * From 'Writing Reliable AIX Daemons,' SG24-4946-00,
111 	 * by Eric Agar (saves us from doing 32767 system
112 	 * calls)
113 	 */
114 	if (fcntl(keep_fd + 1, F_CLOSEM, 0) == -1)
115 		msyslog(LOG_ERR, "F_CLOSEM(%d): %m", keep_fd + 1);
116 # else	/* !HAVE_CLOSEFROM && !F_CLOSEM follows */
117 	int fd;
118 	int max_fd;
119 
120 	max_fd = GETDTABLESIZE();
121 	for (fd = keep_fd + 1; fd < max_fd; fd++)
122 		close(fd);
123 # endif	/* !HAVE_CLOSEFROM && !F_CLOSEM */
124 }
125 #endif	/* HAVE_IO_COMPLETION_PORT */
126 
127 
128 u_int
available_blocking_child_slot(void)129 available_blocking_child_slot(void)
130 {
131 	const size_t	each = sizeof(blocking_children[0]);
132 	u_int		slot;
133 	size_t		prev_alloc;
134 	size_t		new_alloc;
135 	size_t		prev_octets;
136 	size_t		octets;
137 
138 	for (slot = 0; slot < blocking_children_alloc; slot++) {
139 		if (NULL == blocking_children[slot])
140 			return slot;
141 		if (blocking_children[slot]->reusable) {
142 			blocking_children[slot]->reusable = FALSE;
143 			return slot;
144 		}
145 	}
146 
147 	prev_alloc = blocking_children_alloc;
148 	prev_octets = prev_alloc * each;
149 	new_alloc = blocking_children_alloc + 4;
150 	octets = new_alloc * each;
151 	blocking_children = erealloc_zero(blocking_children, octets,
152 					  prev_octets);
153 	blocking_children_alloc = new_alloc;
154 
155 	/* assume we'll never have enough workers to overflow u_int */
156 	return (u_int)prev_alloc;
157 }
158 
159 
160 int
queue_blocking_request(blocking_work_req rtype,void * req,size_t reqsize,blocking_work_callback done_func,void * context)161 queue_blocking_request(
162 	blocking_work_req	rtype,
163 	void *			req,
164 	size_t			reqsize,
165 	blocking_work_callback	done_func,
166 	void *			context
167 	)
168 {
169 	static u_int		intres_slot = UINT_MAX;
170 	u_int			child_slot;
171 	blocking_child *	c;
172 	blocking_pipe_header	req_hdr;
173 
174 	req_hdr.octets = sizeof(req_hdr) + reqsize;
175 	req_hdr.magic_sig = BLOCKING_REQ_MAGIC;
176 	req_hdr.rtype = rtype;
177 	req_hdr.done_func = done_func;
178 	req_hdr.context = context;
179 
180 	child_slot = UINT_MAX;
181 	if (worker_per_query || UINT_MAX == intres_slot ||
182 	    blocking_children[intres_slot]->reusable)
183 		child_slot = available_blocking_child_slot();
184 	if (!worker_per_query) {
185 		if (UINT_MAX == intres_slot)
186 			intres_slot = child_slot;
187 		else
188 			child_slot = intres_slot;
189 		if (0 == intres_req_pending)
190 			intres_timeout_req(0);
191 	}
192 	intres_req_pending++;
193 	INSIST(UINT_MAX != child_slot);
194 	c = blocking_children[child_slot];
195 	if (NULL == c) {
196 		c = emalloc_zero(sizeof(*c));
197 #ifdef WORK_FORK
198 		c->req_read_pipe = -1;
199 		c->req_write_pipe = -1;
200 #endif
201 #ifdef WORK_PIPE
202 		c->resp_read_pipe = -1;
203 		c->resp_write_pipe = -1;
204 #endif
205 		blocking_children[child_slot] = c;
206 	}
207 	req_hdr.child_idx = child_slot;
208 
209 	return send_blocking_req_internal(c, &req_hdr, req);
210 }
211 
212 
queue_blocking_response(blocking_child * c,blocking_pipe_header * resp,size_t respsize,const blocking_pipe_header * req)213 int queue_blocking_response(
214 	blocking_child *		c,
215 	blocking_pipe_header *		resp,
216 	size_t				respsize,
217 	const blocking_pipe_header *	req
218 	)
219 {
220 	resp->octets = respsize;
221 	resp->magic_sig = BLOCKING_RESP_MAGIC;
222 	resp->rtype = req->rtype;
223 	resp->context = req->context;
224 	resp->done_func = req->done_func;
225 
226 	return send_blocking_resp_internal(c, resp);
227 }
228 
229 
230 void
process_blocking_resp(blocking_child * c)231 process_blocking_resp(
232 	blocking_child *	c
233 	)
234 {
235 	blocking_pipe_header *	resp;
236 	void *			data;
237 
238 	/*
239 	 * On Windows send_blocking_resp_internal() may signal the
240 	 * blocking_response_ready event multiple times while we're
241 	 * processing a response, so always consume all available
242 	 * responses before returning to test the event again.
243 	 */
244 #ifdef WORK_THREAD
245 	do {
246 #endif
247 		resp = receive_blocking_resp_internal(c);
248 		if (NULL != resp) {
249 			DEBUG_REQUIRE(BLOCKING_RESP_MAGIC ==
250 				      resp->magic_sig);
251 			data = (char *)resp + sizeof(*resp);
252 			intres_req_pending--;
253 			(*resp->done_func)(resp->rtype, resp->context,
254 					   resp->octets - sizeof(*resp),
255 					   data);
256 			free(resp);
257 		}
258 #ifdef WORK_THREAD
259 	} while (NULL != resp);
260 #endif
261 	if (!worker_per_query && 0 == intres_req_pending)
262 		intres_timeout_req(CHILD_MAX_IDLE);
263 	else if (worker_per_query)
264 		req_child_exit(c);
265 }
266 
267 void
harvest_blocking_responses(void)268 harvest_blocking_responses(void)
269 {
270 	size_t		idx;
271 	blocking_child*	cp;
272 	u_int		scseen, scdone;
273 
274 	scseen = blocking_child_ready_seen;
275 	scdone = blocking_child_ready_done;
276 	if (scdone != scseen) {
277 		blocking_child_ready_done = scseen;
278 		for (idx = 0; idx < blocking_children_alloc; idx++) {
279 			cp = blocking_children[idx];
280 			if (NULL == cp)
281 				continue;
282 			scseen = cp->resp_ready_seen;
283 			scdone = cp->resp_ready_done;
284 			if (scdone != scseen) {
285 				cp->resp_ready_done = scseen;
286 				process_blocking_resp(cp);
287 			}
288 		}
289 	}
290 }
291 
292 
293 /*
294  * blocking_child_common runs as a forked child or a thread
295  */
296 int
blocking_child_common(blocking_child * c)297 blocking_child_common(
298 	blocking_child	*c
299 	)
300 {
301 	int say_bye;
302 	blocking_pipe_header *req;
303 
304 	say_bye = FALSE;
305 	while (!say_bye) {
306 		req = receive_blocking_req_internal(c);
307 		if (NULL == req) {
308 			say_bye = TRUE;
309 			continue;
310 		}
311 
312 		DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == req->magic_sig);
313 
314 		switch (req->rtype) {
315 		case BLOCKING_GETADDRINFO:
316 			if (blocking_getaddrinfo(c, req))
317 				say_bye = TRUE;
318 			break;
319 
320 		case BLOCKING_GETNAMEINFO:
321 			if (blocking_getnameinfo(c, req))
322 				say_bye = TRUE;
323 			break;
324 
325 		default:
326 			msyslog(LOG_ERR, "unknown req %d to blocking worker", req->rtype);
327 			say_bye = TRUE;
328 		}
329 
330 		free(req);
331 	}
332 
333 	return 0;
334 }
335 
336 
337 /*
338  * worker_idle_timer_fired()
339  *
340  * The parent starts this timer when the last pending response has been
341  * received from the child, making it idle, and clears the timer when a
342  * request is dispatched to the child.  Once the timer expires, the
343  * child is sent packing.
344  *
345  * This is called when worker_idle_timer is nonzero and less than or
346  * equal to current_time.
347  */
348 void
worker_idle_timer_fired(void)349 worker_idle_timer_fired(void)
350 {
351 	u_int			idx;
352 	blocking_child *	c;
353 
354 	DEBUG_REQUIRE(0 == intres_req_pending);
355 
356 	intres_timeout_req(0);
357 	for (idx = 0; idx < blocking_children_alloc; idx++) {
358 		c = blocking_children[idx];
359 		if (NULL == c)
360 			continue;
361 		req_child_exit(c);
362 	}
363 }
364 
365 
366 #else	/* !WORKER follows */
367 int ntp_worker_nonempty_compilation_unit;
368 #endif
369