xref: /freebsd/usr.sbin/nscd/mp_rs_query.c (revision 8847579c57d6aff2b3371c707dce7a2cee8389aa)
1 /*-
2  * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  *
26  */
27 
28 #include <sys/cdefs.h>
29 __FBSDID("$FreeBSD$");
30 
31 #include <sys/socket.h>
32 #include <sys/time.h>
33 #include <sys/types.h>
34 #include <sys/event.h>
35 #include <assert.h>
36 #include <errno.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <stdio.h>
40 
41 #include "cachelib.h"
42 #include "config.h"
43 #include "debug.h"
44 #include "log.h"
45 #include "query.h"
46 #include "mp_rs_query.h"
47 #include "mp_ws_query.h"
48 #include "singletons.h"
49 
50 static int on_mp_read_session_close_notification(struct query_state *);
51 static void on_mp_read_session_destroy(struct query_state *);
52 static int on_mp_read_session_mapper(struct query_state *);
53 /* int on_mp_read_session_request_read1(struct query_state *); */
54 static int on_mp_read_session_request_read2(struct query_state *);
55 static int on_mp_read_session_request_process(struct query_state *);
56 static int on_mp_read_session_response_write1(struct query_state *);
57 static int on_mp_read_session_read_request_process(struct query_state *);
58 static int on_mp_read_session_read_response_write1(struct query_state *);
59 static int on_mp_read_session_read_response_write2(struct query_state *);
60 
61 /*
62  * This function is used as the query_state's destroy_func to make the
63  * proper cleanup in case of errors.
64  */
65 static void
66 on_mp_read_session_destroy(struct query_state *qstate)
67 {
68 	TRACE_IN(on_mp_read_session_destroy);
69 	finalize_comm_element(&qstate->request);
70 	finalize_comm_element(&qstate->response);
71 
72 	if (qstate->mdata != NULL) {
73 		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
74 		close_cache_mp_read_session(
75 	    		(cache_mp_read_session)qstate->mdata);
76 		configuration_unlock_entry(qstate->config_entry,
77 			CELT_MULTIPART);
78 	}
79 	TRACE_OUT(on_mp_read_session_destroy);
80 }
81 
82 /*
83  * The functions below are used to process multipart read session initiation
84  * requests.
85  * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
86  *   the request itself
87  * - on_mp_read_session_request_process processes it
88  * - on_mp_read_session_response_write1 sends the response
89  */
90 int
91 on_mp_read_session_request_read1(struct query_state *qstate)
92 {
93 	struct cache_mp_read_session_request	*c_mp_rs_request;
94 	ssize_t	result;
95 
96 	TRACE_IN(on_mp_read_session_request_read1);
97 	if (qstate->kevent_watermark == 0)
98 		qstate->kevent_watermark = sizeof(size_t);
99 	else {
100 		init_comm_element(&qstate->request,
101 	    		CET_MP_READ_SESSION_REQUEST);
102 		c_mp_rs_request = get_cache_mp_read_session_request(
103 	    		&qstate->request);
104 
105 		result = qstate->read_func(qstate,
106 	    		&c_mp_rs_request->entry_length, sizeof(size_t));
107 
108 		if (result != sizeof(size_t)) {
109 			TRACE_OUT(on_mp_read_session_request_read1);
110 			return (-1);
111 		}
112 
113 		if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
114 			TRACE_OUT(on_mp_read_session_request_read1);
115 			return (-1);
116 		}
117 
118 		c_mp_rs_request->entry = (char *)malloc(
119 			c_mp_rs_request->entry_length + 1);
120 		assert(c_mp_rs_request->entry != NULL);
121 		memset(c_mp_rs_request->entry, 0,
122 			c_mp_rs_request->entry_length + 1);
123 
124 		qstate->kevent_watermark = c_mp_rs_request->entry_length;
125 		qstate->process_func = on_mp_read_session_request_read2;
126 	}
127 	TRACE_OUT(on_mp_read_session_request_read1);
128 	return (0);
129 }
130 
131 static int
132 on_mp_read_session_request_read2(struct query_state *qstate)
133 {
134 	struct cache_mp_read_session_request	*c_mp_rs_request;
135 	ssize_t	result;
136 
137 	TRACE_IN(on_mp_read_session_request_read2);
138 	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
139 
140 	result = qstate->read_func(qstate, c_mp_rs_request->entry,
141 		c_mp_rs_request->entry_length);
142 
143 	if (result != qstate->kevent_watermark) {
144 		LOG_ERR_3("on_mp_read_session_request_read2",
145 			"read failed");
146 		TRACE_OUT(on_mp_read_session_request_read2);
147 		return (-1);
148 	}
149 
150 	qstate->kevent_watermark = 0;
151 	qstate->process_func = on_mp_read_session_request_process;
152 	TRACE_OUT(on_mp_read_session_request_read2);
153 	return (0);
154 }
155 
156 static int
157 on_mp_read_session_request_process(struct query_state *qstate)
158 {
159 	struct cache_mp_read_session_request	*c_mp_rs_request;
160 	struct cache_mp_read_session_response	*c_mp_rs_response;
161 	cache_mp_read_session	rs;
162 	cache_entry	c_entry;
163 	char	*dec_cache_entry_name;
164 
165 	char *buffer;
166 	size_t buffer_size;
167 	cache_mp_write_session ws;
168 	struct agent	*lookup_agent;
169 	struct multipart_agent *mp_agent;
170 	void *mdata;
171 	int res;
172 
173 	TRACE_IN(on_mp_read_session_request_process);
174 	init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
175 	c_mp_rs_response = get_cache_mp_read_session_response(
176 		&qstate->response);
177 	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
178 
179 	qstate->config_entry = configuration_find_entry(
180 		s_configuration, c_mp_rs_request->entry);
181 	if (qstate->config_entry == NULL) {
182 		c_mp_rs_response->error_code = ENOENT;
183 
184 		LOG_ERR_2("read_session_request",
185 			"can't find configuration entry '%s'."
186 			" aborting request", c_mp_rs_request->entry);
187 		goto fin;
188 	}
189 
190 	if (qstate->config_entry->enabled == 0) {
191 		c_mp_rs_response->error_code = EACCES;
192 
193 		LOG_ERR_2("read_session_request",
194 			"configuration entry '%s' is disabled",
195 			c_mp_rs_request->entry);
196 		goto fin;
197 	}
198 
199 	if (qstate->config_entry->perform_actual_lookups != 0)
200 		dec_cache_entry_name = strdup(
201 			qstate->config_entry->mp_cache_params.entry_name);
202 	else {
203 #ifdef NS_CACHED_EID_CHECKING
204 		if (check_query_eids(qstate) != 0) {
205 			c_mp_rs_response->error_code = EPERM;
206 			goto fin;
207 		}
208 #endif
209 
210 		asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
211 			qstate->config_entry->mp_cache_params.entry_name);
212 	}
213 
214 	assert(dec_cache_entry_name != NULL);
215 
216 	configuration_lock_rdlock(s_configuration);
217 	c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
218 	configuration_unlock(s_configuration);
219 
220 	if ((c_entry == INVALID_CACHE) &&
221 	   (qstate->config_entry->perform_actual_lookups != 0))
222 		c_entry = register_new_mp_cache_entry(qstate,
223 			dec_cache_entry_name);
224 
225 	free(dec_cache_entry_name);
226 
227 	if (c_entry != INVALID_CACHE_ENTRY) {
228 		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
229 		rs = open_cache_mp_read_session(c_entry);
230 		configuration_unlock_entry(qstate->config_entry,
231 			CELT_MULTIPART);
232 
233 		if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
234 		   (qstate->config_entry->perform_actual_lookups != 0)) {
235 			lookup_agent = find_agent(s_agent_table,
236 				c_mp_rs_request->entry, MULTIPART_AGENT);
237 
238 			if ((lookup_agent != NULL) &&
239 			(lookup_agent->type == MULTIPART_AGENT)) {
240 				mp_agent = (struct multipart_agent *)
241 					lookup_agent;
242 				mdata = mp_agent->mp_init_func();
243 
244 				/*
245 				 * Multipart agents read the whole snapshot
246 				 * of the data at one time.
247 				 */
248 				configuration_lock_entry(qstate->config_entry,
249 					CELT_MULTIPART);
250 				ws = open_cache_mp_write_session(c_entry);
251 				configuration_unlock_entry(qstate->config_entry,
252 					CELT_MULTIPART);
253 				if (ws != NULL) {
254 				    do {
255 					buffer = NULL;
256 					res = mp_agent->mp_lookup_func(&buffer,
257 						&buffer_size,
258 						mdata);
259 
260 					if ((res & NS_TERMINATE) &&
261 					   (buffer != NULL)) {
262 						configuration_lock_entry(
263 							qstate->config_entry,
264 						   	CELT_MULTIPART);
265 						if (cache_mp_write(ws, buffer,
266 						    buffer_size) != 0) {
267 							abandon_cache_mp_write_session(ws);
268 							ws = NULL;
269 						}
270 						configuration_unlock_entry(
271 							qstate->config_entry,
272 							CELT_MULTIPART);
273 
274 						free(buffer);
275 						buffer = NULL;
276 					} else {
277 						configuration_lock_entry(
278 							qstate->config_entry,
279 							CELT_MULTIPART);
280 						close_cache_mp_write_session(ws);
281 						configuration_unlock_entry(
282 							qstate->config_entry,
283 							CELT_MULTIPART);
284 
285 						free(buffer);
286 						buffer = NULL;
287 					}
288 				    } while ((res & NS_TERMINATE) &&
289 				    	    (ws != NULL));
290 				}
291 
292 				configuration_lock_entry(qstate->config_entry,
293 					CELT_MULTIPART);
294 				rs = open_cache_mp_read_session(c_entry);
295 				configuration_unlock_entry(qstate->config_entry,
296 					CELT_MULTIPART);
297 			}
298 		}
299 
300 		if (rs == INVALID_CACHE_MP_READ_SESSION)
301 			c_mp_rs_response->error_code = -1;
302 		else {
303 		    qstate->mdata = rs;
304 		    qstate->destroy_func = on_mp_read_session_destroy;
305 
306 		    configuration_lock_entry(qstate->config_entry,
307 			CELT_MULTIPART);
308 		    if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
309 		    (qstate->config_entry->mp_query_timeout.tv_usec != 0))
310 			memcpy(&qstate->timeout,
311 			    &qstate->config_entry->mp_query_timeout,
312 			    sizeof(struct timeval));
313 		    configuration_unlock_entry(qstate->config_entry,
314 			CELT_MULTIPART);
315 		}
316 	} else
317 		c_mp_rs_response->error_code = -1;
318 
319 fin:
320 	qstate->process_func = on_mp_read_session_response_write1;
321 	qstate->kevent_watermark = sizeof(int);
322 	qstate->kevent_filter = EVFILT_WRITE;
323 
324 	TRACE_OUT(on_mp_read_session_request_process);
325 	return (0);
326 }
327 
328 static int
329 on_mp_read_session_response_write1(struct query_state *qstate)
330 {
331 	struct cache_mp_read_session_response	*c_mp_rs_response;
332 	ssize_t	result;
333 
334 	TRACE_IN(on_mp_read_session_response_write1);
335 	c_mp_rs_response = get_cache_mp_read_session_response(
336 		&qstate->response);
337 	result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
338 		sizeof(int));
339 
340 	if (result != sizeof(int)) {
341 		LOG_ERR_3("on_mp_read_session_response_write1",
342 			"write failed");
343 		TRACE_OUT(on_mp_read_session_response_write1);
344 		return (-1);
345 	}
346 
347 	if (c_mp_rs_response->error_code == 0) {
348 		qstate->kevent_watermark = sizeof(int);
349 		qstate->process_func = on_mp_read_session_mapper;
350 		qstate->kevent_filter = EVFILT_READ;
351 	} else {
352 		qstate->kevent_watermark = 0;
353 		qstate->process_func = NULL;
354 	}
355 	TRACE_OUT(on_mp_read_session_response_write1);
356 	return (0);
357 }
358 
359 /*
360  * Mapper function is used to avoid multiple connections for each session
361  * write or read requests. After processing the request, it does not close
362  * the connection, but waits for the next request.
363  */
364 static int
365 on_mp_read_session_mapper(struct query_state *qstate)
366 {
367 	ssize_t	result;
368 	int elem_type;
369 
370 	TRACE_IN(on_mp_read_session_mapper);
371 	if (qstate->kevent_watermark == 0) {
372 		qstate->kevent_watermark = sizeof(int);
373 	} else {
374 		result = qstate->read_func(qstate, &elem_type, sizeof(int));
375 		if (result != sizeof(int)) {
376 			LOG_ERR_3("on_mp_read_session_mapper",
377 				"read failed");
378 			TRACE_OUT(on_mp_read_session_mapper);
379 			return (-1);
380 		}
381 
382 		switch (elem_type) {
383 		case CET_MP_READ_SESSION_READ_REQUEST:
384 			qstate->kevent_watermark = 0;
385 			qstate->process_func =
386 				on_mp_read_session_read_request_process;
387 			break;
388 		case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
389 			qstate->kevent_watermark = 0;
390 			qstate->process_func =
391 				on_mp_read_session_close_notification;
392 			break;
393 		default:
394 			qstate->kevent_watermark = 0;
395 			qstate->process_func = NULL;
396 			LOG_ERR_3("on_mp_read_session_mapper",
397 				"unknown element type");
398 			TRACE_OUT(on_mp_read_session_mapper);
399 			return (-1);
400 		}
401 	}
402 	TRACE_OUT(on_mp_read_session_mapper);
403 	return (0);
404 }
405 
406 /*
407  * The functions below are used to process multipart read sessions read
408  * requests. User doesn't have to pass any kind of data, besides the
409  * request identificator itself. So we don't need any XXX_read functions and
410  * start with the XXX_process function.
411  * - on_mp_read_session_read_request_process processes it
412  * - on_mp_read_session_read_response_write1 and
413  *   on_mp_read_session_read_response_write2 sends the response
414  */
415 static int
416 on_mp_read_session_read_request_process(struct query_state *qstate)
417 {
418 	struct cache_mp_read_session_read_response	*read_response;
419 
420 	TRACE_IN(on_mp_read_session_response_process);
421 	init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
422 	read_response = get_cache_mp_read_session_read_response(
423 		&qstate->response);
424 
425 	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
426 	read_response->error_code = cache_mp_read(
427 		(cache_mp_read_session)qstate->mdata, NULL,
428 		&read_response->data_size);
429 
430 	if (read_response->error_code == 0) {
431 		read_response->data = (char *)malloc(read_response->data_size);
432 		assert(read_response != NULL);
433 		read_response->error_code = cache_mp_read(
434 			(cache_mp_read_session)qstate->mdata,
435 	    		read_response->data,
436 			&read_response->data_size);
437 	}
438 	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
439 
440 	if (read_response->error_code == 0)
441 		qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
442 	else
443 		qstate->kevent_watermark = sizeof(int);
444 	qstate->process_func = on_mp_read_session_read_response_write1;
445 	qstate->kevent_filter = EVFILT_WRITE;
446 
447 	TRACE_OUT(on_mp_read_session_response_process);
448 	return (0);
449 }
450 
451 static int
452 on_mp_read_session_read_response_write1(struct query_state *qstate)
453 {
454 	struct cache_mp_read_session_read_response	*read_response;
455 	ssize_t	result;
456 
457 	TRACE_IN(on_mp_read_session_read_response_write1);
458 	read_response = get_cache_mp_read_session_read_response(
459 		&qstate->response);
460 
461 	result = qstate->write_func(qstate, &read_response->error_code,
462 		sizeof(int));
463 	if (read_response->error_code == 0) {
464 		result += qstate->write_func(qstate, &read_response->data_size,
465 			sizeof(size_t));
466 		if (result != qstate->kevent_watermark) {
467 			TRACE_OUT(on_mp_read_session_read_response_write1);
468 			LOG_ERR_3("on_mp_read_session_read_response_write1",
469 				"write failed");
470 			return (-1);
471 		}
472 
473 		qstate->kevent_watermark = read_response->data_size;
474 		qstate->process_func = on_mp_read_session_read_response_write2;
475 	} else {
476 		if (result != qstate->kevent_watermark) {
477 			LOG_ERR_3("on_mp_read_session_read_response_write1",
478 				"write failed");
479 			TRACE_OUT(on_mp_read_session_read_response_write1);
480 			return (-1);
481 		}
482 
483 		qstate->kevent_watermark = 0;
484 		qstate->process_func = NULL;
485 	}
486 
487 	TRACE_OUT(on_mp_read_session_read_response_write1);
488 	return (0);
489 }
490 
491 static int
492 on_mp_read_session_read_response_write2(struct query_state *qstate)
493 {
494 	struct cache_mp_read_session_read_response *read_response;
495 	ssize_t	result;
496 
497 	TRACE_IN(on_mp_read_session_read_response_write2);
498 	read_response = get_cache_mp_read_session_read_response(
499 		&qstate->response);
500 	result = qstate->write_func(qstate, read_response->data,
501 		read_response->data_size);
502 	if (result != qstate->kevent_watermark) {
503 		LOG_ERR_3("on_mp_read_session_read_response_write2",
504 			"write failed");
505 		TRACE_OUT(on_mp_read_session_read_response_write2);
506 		return (-1);
507 	}
508 
509 	finalize_comm_element(&qstate->request);
510 	finalize_comm_element(&qstate->response);
511 
512 	qstate->kevent_watermark = sizeof(int);
513 	qstate->process_func = on_mp_read_session_mapper;
514 	qstate->kevent_filter = EVFILT_READ;
515 
516 	TRACE_OUT(on_mp_read_session_read_response_write2);
517 	return (0);
518 }
519 
520 /*
521  * Handles session close notification by calling close_cache_mp_read_session
522  * function.
523  */
524 static int
525 on_mp_read_session_close_notification(struct query_state *qstate)
526 {
527 
528 	TRACE_IN(on_mp_read_session_close_notification);
529 	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
530 	close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
531 	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
532 	qstate->mdata = NULL;
533 	qstate->kevent_watermark = 0;
534 	qstate->process_func = NULL;
535 	TRACE_OUT(on_mp_read_session_close_notification);
536 	return (0);
537 }
538