1 /*-
2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 *
26 */
27
28 #include <sys/types.h>
29 #include <sys/event.h>
30 #include <sys/socket.h>
31 #include <sys/time.h>
32
33 #include <assert.h>
34 #include <errno.h>
35 #include <nsswitch.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39
40 #include "cachelib.h"
41 #include "config.h"
42 #include "debug.h"
43 #include "log.h"
44 #include "query.h"
45 #include "mp_rs_query.h"
46 #include "mp_ws_query.h"
47 #include "singletons.h"
48
49 static int on_mp_read_session_close_notification(struct query_state *);
50 static void on_mp_read_session_destroy(struct query_state *);
51 static int on_mp_read_session_mapper(struct query_state *);
52 /* int on_mp_read_session_request_read1(struct query_state *); */
53 static int on_mp_read_session_request_read2(struct query_state *);
54 static int on_mp_read_session_request_process(struct query_state *);
55 static int on_mp_read_session_response_write1(struct query_state *);
56 static int on_mp_read_session_read_request_process(struct query_state *);
57 static int on_mp_read_session_read_response_write1(struct query_state *);
58 static int on_mp_read_session_read_response_write2(struct query_state *);
59
60 /*
61 * This function is used as the query_state's destroy_func to make the
62 * proper cleanup in case of errors.
63 */
64 static void
on_mp_read_session_destroy(struct query_state * qstate)65 on_mp_read_session_destroy(struct query_state *qstate)
66 {
67 TRACE_IN(on_mp_read_session_destroy);
68 finalize_comm_element(&qstate->request);
69 finalize_comm_element(&qstate->response);
70
71 if (qstate->mdata != NULL) {
72 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
73 close_cache_mp_read_session(
74 (cache_mp_read_session)qstate->mdata);
75 configuration_unlock_entry(qstate->config_entry,
76 CELT_MULTIPART);
77 }
78 TRACE_OUT(on_mp_read_session_destroy);
79 }
80
81 /*
82 * The functions below are used to process multipart read session initiation
83 * requests.
84 * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
85 * the request itself
86 * - on_mp_read_session_request_process processes it
87 * - on_mp_read_session_response_write1 sends the response
88 */
89 int
on_mp_read_session_request_read1(struct query_state * qstate)90 on_mp_read_session_request_read1(struct query_state *qstate)
91 {
92 struct cache_mp_read_session_request *c_mp_rs_request;
93 ssize_t result;
94
95 TRACE_IN(on_mp_read_session_request_read1);
96 if (qstate->kevent_watermark == 0)
97 qstate->kevent_watermark = sizeof(size_t);
98 else {
99 init_comm_element(&qstate->request,
100 CET_MP_READ_SESSION_REQUEST);
101 c_mp_rs_request = get_cache_mp_read_session_request(
102 &qstate->request);
103
104 result = qstate->read_func(qstate,
105 &c_mp_rs_request->entry_length, sizeof(size_t));
106
107 if (result != sizeof(size_t)) {
108 TRACE_OUT(on_mp_read_session_request_read1);
109 return (-1);
110 }
111
112 if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
113 TRACE_OUT(on_mp_read_session_request_read1);
114 return (-1);
115 }
116
117 c_mp_rs_request->entry = calloc(1,
118 c_mp_rs_request->entry_length + 1);
119 assert(c_mp_rs_request->entry != NULL);
120
121 qstate->kevent_watermark = c_mp_rs_request->entry_length;
122 qstate->process_func = on_mp_read_session_request_read2;
123 }
124 TRACE_OUT(on_mp_read_session_request_read1);
125 return (0);
126 }
127
128 static int
on_mp_read_session_request_read2(struct query_state * qstate)129 on_mp_read_session_request_read2(struct query_state *qstate)
130 {
131 struct cache_mp_read_session_request *c_mp_rs_request;
132 ssize_t result;
133
134 TRACE_IN(on_mp_read_session_request_read2);
135 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
136
137 result = qstate->read_func(qstate, c_mp_rs_request->entry,
138 c_mp_rs_request->entry_length);
139
140 if (result < 0 || (size_t)result != qstate->kevent_watermark) {
141 LOG_ERR_3("on_mp_read_session_request_read2",
142 "read failed");
143 TRACE_OUT(on_mp_read_session_request_read2);
144 return (-1);
145 }
146
147 qstate->kevent_watermark = 0;
148 qstate->process_func = on_mp_read_session_request_process;
149 TRACE_OUT(on_mp_read_session_request_read2);
150 return (0);
151 }
152
153 static int
on_mp_read_session_request_process(struct query_state * qstate)154 on_mp_read_session_request_process(struct query_state *qstate)
155 {
156 struct cache_mp_read_session_request *c_mp_rs_request;
157 struct cache_mp_read_session_response *c_mp_rs_response;
158 cache_mp_read_session rs;
159 cache_entry c_entry;
160 char *dec_cache_entry_name;
161
162 char *buffer;
163 size_t buffer_size;
164 cache_mp_write_session ws;
165 struct agent *lookup_agent;
166 struct multipart_agent *mp_agent;
167 void *mdata;
168 int res;
169
170 TRACE_IN(on_mp_read_session_request_process);
171 init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
172 c_mp_rs_response = get_cache_mp_read_session_response(
173 &qstate->response);
174 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
175
176 qstate->config_entry = configuration_find_entry(
177 s_configuration, c_mp_rs_request->entry);
178 if (qstate->config_entry == NULL) {
179 c_mp_rs_response->error_code = ENOENT;
180
181 LOG_ERR_2("read_session_request",
182 "can't find configuration entry '%s'."
183 " aborting request", c_mp_rs_request->entry);
184 goto fin;
185 }
186
187 if (qstate->config_entry->enabled == 0) {
188 c_mp_rs_response->error_code = EACCES;
189
190 LOG_ERR_2("read_session_request",
191 "configuration entry '%s' is disabled",
192 c_mp_rs_request->entry);
193 goto fin;
194 }
195
196 if (qstate->config_entry->perform_actual_lookups != 0)
197 dec_cache_entry_name = strdup(
198 qstate->config_entry->mp_cache_params.cep.entry_name);
199 else {
200 #ifdef NS_NSCD_EID_CHECKING
201 if (check_query_eids(qstate) != 0) {
202 c_mp_rs_response->error_code = EPERM;
203 goto fin;
204 }
205 #endif
206
207 asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
208 qstate->config_entry->mp_cache_params.cep.entry_name);
209 }
210
211 assert(dec_cache_entry_name != NULL);
212
213 configuration_lock_rdlock(s_configuration);
214 c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
215 configuration_unlock(s_configuration);
216
217 if ((c_entry == INVALID_CACHE) &&
218 (qstate->config_entry->perform_actual_lookups != 0))
219 c_entry = register_new_mp_cache_entry(qstate,
220 dec_cache_entry_name);
221
222 free(dec_cache_entry_name);
223
224 if (c_entry != INVALID_CACHE_ENTRY) {
225 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
226 rs = open_cache_mp_read_session(c_entry);
227 configuration_unlock_entry(qstate->config_entry,
228 CELT_MULTIPART);
229
230 if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
231 (qstate->config_entry->perform_actual_lookups != 0)) {
232 lookup_agent = find_agent(s_agent_table,
233 c_mp_rs_request->entry, MULTIPART_AGENT);
234
235 if ((lookup_agent != NULL) &&
236 (lookup_agent->type == MULTIPART_AGENT)) {
237 mp_agent = (struct multipart_agent *)
238 lookup_agent;
239 mdata = mp_agent->mp_init_func();
240
241 /*
242 * Multipart agents read the whole snapshot
243 * of the data at one time.
244 */
245 configuration_lock_entry(qstate->config_entry,
246 CELT_MULTIPART);
247 ws = open_cache_mp_write_session(c_entry);
248 configuration_unlock_entry(qstate->config_entry,
249 CELT_MULTIPART);
250 if (ws != NULL) {
251 do {
252 buffer = NULL;
253 res = mp_agent->mp_lookup_func(&buffer,
254 &buffer_size,
255 mdata);
256
257 if ((res & NS_TERMINATE) &&
258 (buffer != NULL)) {
259 configuration_lock_entry(
260 qstate->config_entry,
261 CELT_MULTIPART);
262 if (cache_mp_write(ws, buffer,
263 buffer_size) != 0) {
264 abandon_cache_mp_write_session(ws);
265 ws = NULL;
266 }
267 configuration_unlock_entry(
268 qstate->config_entry,
269 CELT_MULTIPART);
270
271 free(buffer);
272 buffer = NULL;
273 } else {
274 configuration_lock_entry(
275 qstate->config_entry,
276 CELT_MULTIPART);
277 close_cache_mp_write_session(ws);
278 configuration_unlock_entry(
279 qstate->config_entry,
280 CELT_MULTIPART);
281
282 free(buffer);
283 buffer = NULL;
284 }
285 } while ((res & NS_TERMINATE) &&
286 (ws != NULL));
287 }
288
289 configuration_lock_entry(qstate->config_entry,
290 CELT_MULTIPART);
291 rs = open_cache_mp_read_session(c_entry);
292 configuration_unlock_entry(qstate->config_entry,
293 CELT_MULTIPART);
294 }
295 }
296
297 if (rs == INVALID_CACHE_MP_READ_SESSION)
298 c_mp_rs_response->error_code = -1;
299 else {
300 qstate->mdata = rs;
301 qstate->destroy_func = on_mp_read_session_destroy;
302
303 configuration_lock_entry(qstate->config_entry,
304 CELT_MULTIPART);
305 if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
306 (qstate->config_entry->mp_query_timeout.tv_usec != 0))
307 memcpy(&qstate->timeout,
308 &qstate->config_entry->mp_query_timeout,
309 sizeof(struct timeval));
310 configuration_unlock_entry(qstate->config_entry,
311 CELT_MULTIPART);
312 }
313 } else
314 c_mp_rs_response->error_code = -1;
315
316 fin:
317 qstate->process_func = on_mp_read_session_response_write1;
318 qstate->kevent_watermark = sizeof(int);
319 qstate->kevent_filter = EVFILT_WRITE;
320
321 TRACE_OUT(on_mp_read_session_request_process);
322 return (0);
323 }
324
325 static int
on_mp_read_session_response_write1(struct query_state * qstate)326 on_mp_read_session_response_write1(struct query_state *qstate)
327 {
328 struct cache_mp_read_session_response *c_mp_rs_response;
329 ssize_t result;
330
331 TRACE_IN(on_mp_read_session_response_write1);
332 c_mp_rs_response = get_cache_mp_read_session_response(
333 &qstate->response);
334 result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
335 sizeof(int));
336
337 if (result != sizeof(int)) {
338 LOG_ERR_3("on_mp_read_session_response_write1",
339 "write failed");
340 TRACE_OUT(on_mp_read_session_response_write1);
341 return (-1);
342 }
343
344 if (c_mp_rs_response->error_code == 0) {
345 qstate->kevent_watermark = sizeof(int);
346 qstate->process_func = on_mp_read_session_mapper;
347 qstate->kevent_filter = EVFILT_READ;
348 } else {
349 qstate->kevent_watermark = 0;
350 qstate->process_func = NULL;
351 }
352 TRACE_OUT(on_mp_read_session_response_write1);
353 return (0);
354 }
355
356 /*
357 * Mapper function is used to avoid multiple connections for each session
358 * write or read requests. After processing the request, it does not close
359 * the connection, but waits for the next request.
360 */
361 static int
on_mp_read_session_mapper(struct query_state * qstate)362 on_mp_read_session_mapper(struct query_state *qstate)
363 {
364 ssize_t result;
365 int elem_type;
366
367 TRACE_IN(on_mp_read_session_mapper);
368 if (qstate->kevent_watermark == 0) {
369 qstate->kevent_watermark = sizeof(int);
370 } else {
371 result = qstate->read_func(qstate, &elem_type, sizeof(int));
372 if (result != sizeof(int)) {
373 LOG_ERR_3("on_mp_read_session_mapper",
374 "read failed");
375 TRACE_OUT(on_mp_read_session_mapper);
376 return (-1);
377 }
378
379 switch (elem_type) {
380 case CET_MP_READ_SESSION_READ_REQUEST:
381 qstate->kevent_watermark = 0;
382 qstate->process_func =
383 on_mp_read_session_read_request_process;
384 break;
385 case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
386 qstate->kevent_watermark = 0;
387 qstate->process_func =
388 on_mp_read_session_close_notification;
389 break;
390 default:
391 qstate->kevent_watermark = 0;
392 qstate->process_func = NULL;
393 LOG_ERR_3("on_mp_read_session_mapper",
394 "unknown element type");
395 TRACE_OUT(on_mp_read_session_mapper);
396 return (-1);
397 }
398 }
399 TRACE_OUT(on_mp_read_session_mapper);
400 return (0);
401 }
402
403 /*
404 * The functions below are used to process multipart read sessions read
405 * requests. User doesn't have to pass any kind of data, besides the
406 * request identificator itself. So we don't need any XXX_read functions and
407 * start with the XXX_process function.
408 * - on_mp_read_session_read_request_process processes it
409 * - on_mp_read_session_read_response_write1 and
410 * on_mp_read_session_read_response_write2 sends the response
411 */
412 static int
on_mp_read_session_read_request_process(struct query_state * qstate)413 on_mp_read_session_read_request_process(struct query_state *qstate)
414 {
415 struct cache_mp_read_session_read_response *read_response;
416
417 TRACE_IN(on_mp_read_session_response_process);
418 init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
419 read_response = get_cache_mp_read_session_read_response(
420 &qstate->response);
421
422 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
423 read_response->error_code = cache_mp_read(
424 (cache_mp_read_session)qstate->mdata, NULL,
425 &read_response->data_size);
426
427 if (read_response->error_code == 0) {
428 read_response->data = malloc(read_response->data_size);
429 assert(read_response != NULL);
430 read_response->error_code = cache_mp_read(
431 (cache_mp_read_session)qstate->mdata,
432 read_response->data,
433 &read_response->data_size);
434 }
435 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
436
437 if (read_response->error_code == 0)
438 qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
439 else
440 qstate->kevent_watermark = sizeof(int);
441 qstate->process_func = on_mp_read_session_read_response_write1;
442 qstate->kevent_filter = EVFILT_WRITE;
443
444 TRACE_OUT(on_mp_read_session_response_process);
445 return (0);
446 }
447
448 static int
on_mp_read_session_read_response_write1(struct query_state * qstate)449 on_mp_read_session_read_response_write1(struct query_state *qstate)
450 {
451 struct cache_mp_read_session_read_response *read_response;
452 ssize_t result;
453
454 TRACE_IN(on_mp_read_session_read_response_write1);
455 read_response = get_cache_mp_read_session_read_response(
456 &qstate->response);
457
458 result = qstate->write_func(qstate, &read_response->error_code,
459 sizeof(int));
460 if (read_response->error_code == 0) {
461 result += qstate->write_func(qstate, &read_response->data_size,
462 sizeof(size_t));
463 if (result < 0 || (size_t)result != qstate->kevent_watermark) {
464 TRACE_OUT(on_mp_read_session_read_response_write1);
465 LOG_ERR_3("on_mp_read_session_read_response_write1",
466 "write failed");
467 return (-1);
468 }
469
470 qstate->kevent_watermark = read_response->data_size;
471 qstate->process_func = on_mp_read_session_read_response_write2;
472 } else {
473 if (result < 0 || (size_t)result != qstate->kevent_watermark) {
474 LOG_ERR_3("on_mp_read_session_read_response_write1",
475 "write failed");
476 TRACE_OUT(on_mp_read_session_read_response_write1);
477 return (-1);
478 }
479
480 qstate->kevent_watermark = 0;
481 qstate->process_func = NULL;
482 }
483
484 TRACE_OUT(on_mp_read_session_read_response_write1);
485 return (0);
486 }
487
488 static int
on_mp_read_session_read_response_write2(struct query_state * qstate)489 on_mp_read_session_read_response_write2(struct query_state *qstate)
490 {
491 struct cache_mp_read_session_read_response *read_response;
492 ssize_t result;
493
494 TRACE_IN(on_mp_read_session_read_response_write2);
495 read_response = get_cache_mp_read_session_read_response(
496 &qstate->response);
497 result = qstate->write_func(qstate, read_response->data,
498 read_response->data_size);
499 if (result < 0 || (size_t)result != qstate->kevent_watermark) {
500 LOG_ERR_3("on_mp_read_session_read_response_write2",
501 "write failed");
502 TRACE_OUT(on_mp_read_session_read_response_write2);
503 return (-1);
504 }
505
506 finalize_comm_element(&qstate->request);
507 finalize_comm_element(&qstate->response);
508
509 qstate->kevent_watermark = sizeof(int);
510 qstate->process_func = on_mp_read_session_mapper;
511 qstate->kevent_filter = EVFILT_READ;
512
513 TRACE_OUT(on_mp_read_session_read_response_write2);
514 return (0);
515 }
516
517 /*
518 * Handles session close notification by calling close_cache_mp_read_session
519 * function.
520 */
521 static int
on_mp_read_session_close_notification(struct query_state * qstate)522 on_mp_read_session_close_notification(struct query_state *qstate)
523 {
524
525 TRACE_IN(on_mp_read_session_close_notification);
526 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
527 close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
528 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
529 qstate->mdata = NULL;
530 qstate->kevent_watermark = 0;
531 qstate->process_func = NULL;
532 TRACE_OUT(on_mp_read_session_close_notification);
533 return (0);
534 }
535