xref: /freebsd/contrib/ntp/libntp/ntp_worker.c (revision e8e8c939350bdf3c228a411caa9660c607c27a11)
1 /*
2  * ntp_worker.c
3  */
4 #include <config.h>
5 #include "ntp_workimpl.h"
6 
7 #ifdef WORKER
8 
9 #include <stdio.h>
10 #include <ctype.h>
11 #include <signal.h>
12 
13 #include "iosignal.h"
14 #include "ntp_stdlib.h"
15 #include "ntp_malloc.h"
16 #include "ntp_syslog.h"
17 #include "ntpd.h"
18 #include "ntp_io.h"
19 #include "ntp_assert.h"
20 #include "ntp_unixtime.h"
21 #include "intreswork.h"
22 
23 
24 #define CHILD_MAX_IDLE	(3 * 60)	/* seconds, idle worker limit */
25 
26 blocking_child **	blocking_children;
27 size_t			blocking_children_alloc;
28 int			worker_per_query;	/* boolean */
29 int			intres_req_pending;
30 
31 
32 #ifndef HAVE_IO_COMPLETION_PORT
33 /*
34  * pipe_socketpair()
35  *
36  * Provides an AF_UNIX socketpair on systems which have them, otherwise
37  * pair of unidirectional pipes.
38  */
39 int
40 pipe_socketpair(
41 	int	caller_fds[2],
42 	int *	is_pipe
43 	)
44 {
45 	int	rc;
46 	int	fds[2];
47 	int	called_pipe;
48 
49 #ifdef HAVE_SOCKETPAIR
50 	rc = socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]);
51 #else
52 	rc = -1;
53 #endif
54 
55 	if (-1 == rc) {
56 		rc = pipe(&fds[0]);
57 		called_pipe = TRUE;
58 	} else {
59 		called_pipe = FALSE;
60 	}
61 
62 	if (-1 == rc)
63 		return rc;
64 
65 	caller_fds[0] = fds[0];
66 	caller_fds[1] = fds[1];
67 	if (is_pipe != NULL)
68 		*is_pipe = called_pipe;
69 
70 	return 0;
71 }
72 
73 
74 /*
75  * close_all_except()
76  *
77  * Close all file descriptors except the given keep_fd.
78  */
79 void
80 close_all_except(
81 	int keep_fd
82 	)
83 {
84 	int fd;
85 
86 	for (fd = 0; fd < keep_fd; fd++)
87 		close(fd);
88 
89 	close_all_beyond(keep_fd);
90 }
91 
92 
93 /*
94  * close_all_beyond()
95  *
96  * Close all file descriptors after the given keep_fd, which is the
97  * highest fd to keep open.
98  */
99 void
100 close_all_beyond(
101 	int keep_fd
102 	)
103 {
104 # ifdef HAVE_CLOSEFROM
105 	closefrom(keep_fd + 1);
106 # elif defined(F_CLOSEM)
107 	/*
108 	 * From 'Writing Reliable AIX Daemons,' SG24-4946-00,
109 	 * by Eric Agar (saves us from doing 32767 system
110 	 * calls)
111 	 */
112 	if (fcntl(keep_fd + 1, F_CLOSEM, 0) == -1)
113 		msyslog(LOG_ERR, "F_CLOSEM(%d): %m", keep_fd + 1);
114 # else	/* !HAVE_CLOSEFROM && !F_CLOSEM follows */
115 	int fd;
116 	int max_fd;
117 
118 	max_fd = GETDTABLESIZE();
119 	for (fd = keep_fd + 1; fd < max_fd; fd++)
120 		close(fd);
121 # endif	/* !HAVE_CLOSEFROM && !F_CLOSEM */
122 }
123 #endif	/* HAVE_IO_COMPLETION_PORT */
124 
125 
126 u_int
127 available_blocking_child_slot(void)
128 {
129 	const size_t	each = sizeof(blocking_children[0]);
130 	u_int		slot;
131 	size_t		prev_alloc;
132 	size_t		new_alloc;
133 	size_t		prev_octets;
134 	size_t		octets;
135 
136 	for (slot = 0; slot < blocking_children_alloc; slot++) {
137 		if (NULL == blocking_children[slot])
138 			return slot;
139 		if (blocking_children[slot]->reusable) {
140 			blocking_children[slot]->reusable = FALSE;
141 			return slot;
142 		}
143 	}
144 
145 	prev_alloc = blocking_children_alloc;
146 	prev_octets = prev_alloc * each;
147 	new_alloc = blocking_children_alloc + 4;
148 	octets = new_alloc * each;
149 	blocking_children = erealloc_zero(blocking_children, octets,
150 					  prev_octets);
151 	blocking_children_alloc = new_alloc;
152 
153 	return prev_alloc;
154 }
155 
156 
157 int
158 queue_blocking_request(
159 	blocking_work_req	rtype,
160 	void *			req,
161 	size_t			reqsize,
162 	blocking_work_callback	done_func,
163 	void *			context
164 	)
165 {
166 	static u_int		intres_slot = UINT_MAX;
167 	u_int			child_slot;
168 	blocking_child *	c;
169 	blocking_pipe_header	req_hdr;
170 
171 	req_hdr.octets = sizeof(req_hdr) + reqsize;
172 	req_hdr.magic_sig = BLOCKING_REQ_MAGIC;
173 	req_hdr.rtype = rtype;
174 	req_hdr.done_func = done_func;
175 	req_hdr.context = context;
176 
177 	child_slot = UINT_MAX;
178 	if (worker_per_query || UINT_MAX == intres_slot ||
179 	    blocking_children[intres_slot]->reusable)
180 		child_slot = available_blocking_child_slot();
181 	if (!worker_per_query) {
182 		if (UINT_MAX == intres_slot)
183 			intres_slot = child_slot;
184 		else
185 			child_slot = intres_slot;
186 		if (0 == intres_req_pending)
187 			intres_timeout_req(0);
188 	}
189 	intres_req_pending++;
190 	INSIST(UINT_MAX != child_slot);
191 	c = blocking_children[child_slot];
192 	if (NULL == c) {
193 		c = emalloc_zero(sizeof(*c));
194 #ifdef WORK_FORK
195 		c->req_read_pipe = -1;
196 		c->req_write_pipe = -1;
197 #endif
198 #ifdef WORK_PIPE
199 		c->resp_read_pipe = -1;
200 		c->resp_write_pipe = -1;
201 #endif
202 		blocking_children[child_slot] = c;
203 	}
204 	req_hdr.child_idx = child_slot;
205 
206 	return send_blocking_req_internal(c, &req_hdr, req);
207 }
208 
209 
210 int queue_blocking_response(
211 	blocking_child *		c,
212 	blocking_pipe_header *		resp,
213 	size_t				respsize,
214 	const blocking_pipe_header *	req
215 	)
216 {
217 	resp->octets = respsize;
218 	resp->magic_sig = BLOCKING_RESP_MAGIC;
219 	resp->rtype = req->rtype;
220 	resp->context = req->context;
221 	resp->done_func = req->done_func;
222 
223 	return send_blocking_resp_internal(c, resp);
224 }
225 
226 
227 void
228 process_blocking_resp(
229 	blocking_child *	c
230 	)
231 {
232 	blocking_pipe_header *	resp;
233 	void *			data;
234 
235 	/*
236 	 * On Windows send_blocking_resp_internal() may signal the
237 	 * blocking_response_ready event multiple times while we're
238 	 * processing a response, so always consume all available
239 	 * responses before returning to test the event again.
240 	 */
241 #ifdef WORK_THREAD
242 	do {
243 #endif
244 		resp = receive_blocking_resp_internal(c);
245 		if (NULL != resp) {
246 			DEBUG_REQUIRE(BLOCKING_RESP_MAGIC ==
247 				      resp->magic_sig);
248 			data = (char *)resp + sizeof(*resp);
249 			intres_req_pending--;
250 			(*resp->done_func)(resp->rtype, resp->context,
251 					   resp->octets - sizeof(*resp),
252 					   data);
253 			free(resp);
254 		}
255 #ifdef WORK_THREAD
256 	} while (NULL != resp);
257 #endif
258 	if (!worker_per_query && 0 == intres_req_pending)
259 		intres_timeout_req(CHILD_MAX_IDLE);
260 	else if (worker_per_query)
261 		req_child_exit(c);
262 }
263 
264 
265 /*
266  * blocking_child_common runs as a forked child or a thread
267  */
268 int
269 blocking_child_common(
270 	blocking_child	*c
271 	)
272 {
273 	int say_bye;
274 	blocking_pipe_header *req;
275 
276 	say_bye = FALSE;
277 	while (!say_bye) {
278 		req = receive_blocking_req_internal(c);
279 		if (NULL == req) {
280 			say_bye = TRUE;
281 			break;
282 		}
283 
284 		DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == req->magic_sig);
285 
286 		switch (req->rtype) {
287 		case BLOCKING_GETADDRINFO:
288 			if (blocking_getaddrinfo(c, req))
289 				say_bye = TRUE;
290 			break;
291 
292 		case BLOCKING_GETNAMEINFO:
293 			if (blocking_getnameinfo(c, req))
294 				say_bye = TRUE;
295 			break;
296 
297 		default:
298 			msyslog(LOG_ERR, "unknown req %d to blocking worker", req->rtype);
299 			say_bye = TRUE;
300 		}
301 
302 		free(req);
303 	}
304 
305 	return 0;
306 }
307 
308 
309 /*
310  * worker_idle_timer_fired()
311  *
312  * The parent starts this timer when the last pending response has been
313  * received from the child, making it idle, and clears the timer when a
314  * request is dispatched to the child.  Once the timer expires, the
315  * child is sent packing.
316  *
317  * This is called when worker_idle_timer is nonzero and less than or
318  * equal to current_time.
319  */
320 void
321 worker_idle_timer_fired(void)
322 {
323 	u_int			idx;
324 	blocking_child *	c;
325 
326 	DEBUG_REQUIRE(0 == intres_req_pending);
327 
328 	intres_timeout_req(0);
329 	for (idx = 0; idx < blocking_children_alloc; idx++) {
330 		c = blocking_children[idx];
331 		if (NULL == c)
332 			continue;
333 		req_child_exit(c);
334 	}
335 }
336 
337 
338 #else	/* !WORKER follows */
339 int ntp_worker_nonempty_compilation_unit;
340 #endif
341