1 /*-
2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 *
26 */
27
28 #include <sys/types.h>
29 #include <sys/event.h>
30 #include <sys/socket.h>
31 #include <sys/time.h>
32
33 #include <assert.h>
34 #include <errno.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38
39 #include "cachelib.h"
40 #include "config.h"
41 #include "debug.h"
42 #include "log.h"
43 #include "query.h"
44 #include "mp_ws_query.h"
45 #include "singletons.h"
46
/*
 * Prototypes of the handlers that are private to this file, grouped by the
 * request they serve.  on_mp_write_session_request_read1 is defined in this
 * file without "static" and therefore has no prototype here (presumably it
 * is declared in mp_ws_query.h - TODO confirm).
 */

/* Cleanup hook. */
static void on_mp_write_session_destroy(struct query_state *);

/* Session initiation request. */
static int on_mp_write_session_request_read2(struct query_state *);
static int on_mp_write_session_request_process(struct query_state *);
static int on_mp_write_session_response_write1(struct query_state *);

/* Dispatching of the requests that follow the session initiation. */
static int on_mp_write_session_mapper(struct query_state *);

/* Write request. */
static int on_mp_write_session_write_request_read1(struct query_state *);
static int on_mp_write_session_write_request_read2(struct query_state *);
static int on_mp_write_session_write_request_process(struct query_state *);
static int on_mp_write_session_write_response_write1(struct query_state *);

/* Session termination notifications. */
static int on_mp_write_session_abandon_notification(struct query_state *);
static int on_mp_write_session_close_notification(struct query_state *);
59
60 /*
61 * This function is used as the query_state's destroy_func to make the
62 * proper cleanup in case of errors.
63 */
64 static void
on_mp_write_session_destroy(struct query_state * qstate)65 on_mp_write_session_destroy(struct query_state *qstate)
66 {
67
68 TRACE_IN(on_mp_write_session_destroy);
69 finalize_comm_element(&qstate->request);
70 finalize_comm_element(&qstate->response);
71
72 if (qstate->mdata != NULL) {
73 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
74 abandon_cache_mp_write_session(
75 (cache_mp_write_session)qstate->mdata);
76 configuration_unlock_entry(qstate->config_entry,
77 CELT_MULTIPART);
78 }
79 TRACE_OUT(on_mp_write_session_destroy);
80 }
81
82 /*
83 * The functions below are used to process multipart write session initiation
84 * requests.
85 * - on_mp_write_session_request_read1 and on_mp_write_session_request_read2
86 * read the request itself
87 * - on_mp_write_session_request_process processes it
88 * - on_mp_write_session_response_write1 sends the response
89 */
90 int
on_mp_write_session_request_read1(struct query_state * qstate)91 on_mp_write_session_request_read1(struct query_state *qstate)
92 {
93 struct cache_mp_write_session_request *c_mp_ws_request;
94 ssize_t result;
95
96 TRACE_IN(on_mp_write_session_request_read1);
97 if (qstate->kevent_watermark == 0)
98 qstate->kevent_watermark = sizeof(size_t);
99 else {
100 init_comm_element(&qstate->request,
101 CET_MP_WRITE_SESSION_REQUEST);
102 c_mp_ws_request = get_cache_mp_write_session_request(
103 &qstate->request);
104
105 result = qstate->read_func(qstate,
106 &c_mp_ws_request->entry_length, sizeof(size_t));
107
108 if (result != sizeof(size_t)) {
109 LOG_ERR_3("on_mp_write_session_request_read1",
110 "read failed");
111 TRACE_OUT(on_mp_write_session_request_read1);
112 return (-1);
113 }
114
115 if (BUFSIZE_INVALID(c_mp_ws_request->entry_length)) {
116 LOG_ERR_3("on_mp_write_session_request_read1",
117 "invalid entry_length value");
118 TRACE_OUT(on_mp_write_session_request_read1);
119 return (-1);
120 }
121
122 c_mp_ws_request->entry = calloc(1,
123 c_mp_ws_request->entry_length + 1);
124 assert(c_mp_ws_request->entry != NULL);
125
126 qstate->kevent_watermark = c_mp_ws_request->entry_length;
127 qstate->process_func = on_mp_write_session_request_read2;
128 }
129 TRACE_OUT(on_mp_write_session_request_read1);
130 return (0);
131 }
132
133 static int
on_mp_write_session_request_read2(struct query_state * qstate)134 on_mp_write_session_request_read2(struct query_state *qstate)
135 {
136 struct cache_mp_write_session_request *c_mp_ws_request;
137 ssize_t result;
138
139 TRACE_IN(on_mp_write_session_request_read2);
140 c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request);
141
142 result = qstate->read_func(qstate, c_mp_ws_request->entry,
143 c_mp_ws_request->entry_length);
144
145 if (result < 0 || (size_t)result != qstate->kevent_watermark) {
146 LOG_ERR_3("on_mp_write_session_request_read2",
147 "read failed");
148 TRACE_OUT(on_mp_write_session_request_read2);
149 return (-1);
150 }
151
152 qstate->kevent_watermark = 0;
153 qstate->process_func = on_mp_write_session_request_process;
154
155 TRACE_OUT(on_mp_write_session_request_read2);
156 return (0);
157 }
158
159 static int
on_mp_write_session_request_process(struct query_state * qstate)160 on_mp_write_session_request_process(struct query_state *qstate)
161 {
162 struct cache_mp_write_session_request *c_mp_ws_request;
163 struct cache_mp_write_session_response *c_mp_ws_response;
164 cache_mp_write_session ws;
165 cache_entry c_entry;
166 char *dec_cache_entry_name;
167
168 TRACE_IN(on_mp_write_session_request_process);
169 init_comm_element(&qstate->response, CET_MP_WRITE_SESSION_RESPONSE);
170 c_mp_ws_response = get_cache_mp_write_session_response(
171 &qstate->response);
172 c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request);
173
174 qstate->config_entry = configuration_find_entry(
175 s_configuration, c_mp_ws_request->entry);
176 if (qstate->config_entry == NULL) {
177 c_mp_ws_response->error_code = ENOENT;
178
179 LOG_ERR_2("write_session_request",
180 "can't find configuration entry '%s'. "
181 "aborting request", c_mp_ws_request->entry);
182 goto fin;
183 }
184
185 if (qstate->config_entry->enabled == 0) {
186 c_mp_ws_response->error_code = EACCES;
187
188 LOG_ERR_2("write_session_request",
189 "configuration entry '%s' is disabled",
190 c_mp_ws_request->entry);
191 goto fin;
192 }
193
194 if (qstate->config_entry->perform_actual_lookups != 0) {
195 c_mp_ws_response->error_code = EOPNOTSUPP;
196
197 LOG_ERR_2("write_session_request",
198 "entry '%s' performs lookups by itself: "
199 "can't write to it", c_mp_ws_request->entry);
200 goto fin;
201 } else {
202 #ifdef NS_NSCD_EID_CHECKING
203 if (check_query_eids(qstate) != 0) {
204 c_mp_ws_response->error_code = EPERM;
205 goto fin;
206 }
207 #endif
208 }
209
210 /*
211 * All multipart entries are separated by their name decorations.
212 * For one configuration entry there will be a lot of multipart
213 * cache entries - each with its own decorated name.
214 */
215 asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
216 qstate->config_entry->mp_cache_params.cep.entry_name);
217 assert(dec_cache_entry_name != NULL);
218
219 configuration_lock_rdlock(s_configuration);
220 c_entry = find_cache_entry(s_cache,
221 dec_cache_entry_name);
222 configuration_unlock(s_configuration);
223
224 if (c_entry == INVALID_CACHE_ENTRY)
225 c_entry = register_new_mp_cache_entry(qstate,
226 dec_cache_entry_name);
227
228 free(dec_cache_entry_name);
229
230 assert(c_entry != NULL);
231 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
232 ws = open_cache_mp_write_session(c_entry);
233 if (ws == INVALID_CACHE_MP_WRITE_SESSION)
234 c_mp_ws_response->error_code = -1;
235 else {
236 qstate->mdata = ws;
237 qstate->destroy_func = on_mp_write_session_destroy;
238
239 if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
240 (qstate->config_entry->mp_query_timeout.tv_usec != 0))
241 memcpy(&qstate->timeout,
242 &qstate->config_entry->mp_query_timeout,
243 sizeof(struct timeval));
244 }
245 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
246
247 fin:
248 qstate->process_func = on_mp_write_session_response_write1;
249 qstate->kevent_watermark = sizeof(int);
250 qstate->kevent_filter = EVFILT_WRITE;
251
252 TRACE_OUT(on_mp_write_session_request_process);
253 return (0);
254 }
255
256 static int
on_mp_write_session_response_write1(struct query_state * qstate)257 on_mp_write_session_response_write1(struct query_state *qstate)
258 {
259 struct cache_mp_write_session_response *c_mp_ws_response;
260 ssize_t result;
261
262 TRACE_IN(on_mp_write_session_response_write1);
263 c_mp_ws_response = get_cache_mp_write_session_response(
264 &qstate->response);
265 result = qstate->write_func(qstate, &c_mp_ws_response->error_code,
266 sizeof(int));
267 if (result != sizeof(int)) {
268 LOG_ERR_3("on_mp_write_session_response_write1",
269 "write failed");
270 TRACE_OUT(on_mp_write_session_response_write1);
271 return (-1);
272 }
273
274 if (c_mp_ws_response->error_code == 0) {
275 qstate->kevent_watermark = sizeof(int);
276 qstate->process_func = on_mp_write_session_mapper;
277 qstate->kevent_filter = EVFILT_READ;
278 } else {
279 qstate->kevent_watermark = 0;
280 qstate->process_func = NULL;
281 }
282 TRACE_OUT(on_mp_write_session_response_write1);
283 return (0);
284 }
285
286 /*
287 * Mapper function is used to avoid multiple connections for each session
288 * write or read requests. After processing the request, it does not close
289 * the connection, but waits for the next request.
290 */
291 static int
on_mp_write_session_mapper(struct query_state * qstate)292 on_mp_write_session_mapper(struct query_state *qstate)
293 {
294 ssize_t result;
295 int elem_type;
296
297 TRACE_IN(on_mp_write_session_mapper);
298 if (qstate->kevent_watermark == 0) {
299 qstate->kevent_watermark = sizeof(int);
300 } else {
301 result = qstate->read_func(qstate, &elem_type, sizeof(int));
302 if (result != sizeof(int)) {
303 LOG_ERR_3("on_mp_write_session_mapper",
304 "read failed");
305 TRACE_OUT(on_mp_write_session_mapper);
306 return (-1);
307 }
308
309 switch (elem_type) {
310 case CET_MP_WRITE_SESSION_WRITE_REQUEST:
311 qstate->kevent_watermark = sizeof(size_t);
312 qstate->process_func =
313 on_mp_write_session_write_request_read1;
314 break;
315 case CET_MP_WRITE_SESSION_ABANDON_NOTIFICATION:
316 qstate->kevent_watermark = 0;
317 qstate->process_func =
318 on_mp_write_session_abandon_notification;
319 break;
320 case CET_MP_WRITE_SESSION_CLOSE_NOTIFICATION:
321 qstate->kevent_watermark = 0;
322 qstate->process_func =
323 on_mp_write_session_close_notification;
324 break;
325 default:
326 qstate->kevent_watermark = 0;
327 qstate->process_func = NULL;
328 LOG_ERR_2("on_mp_write_session_mapper",
329 "unknown element type");
330 TRACE_OUT(on_mp_write_session_mapper);
331 return (-1);
332 }
333 }
334 TRACE_OUT(on_mp_write_session_mapper);
335 return (0);
336 }
337
338 /*
339 * The functions below are used to process multipart write sessions write
340 * requests.
341 * - on_mp_write_session_write_request_read1 and
342 * on_mp_write_session_write_request_read2 read the request itself
343 * - on_mp_write_session_write_request_process processes it
344 * - on_mp_write_session_write_response_write1 sends the response
345 */
346 static int
on_mp_write_session_write_request_read1(struct query_state * qstate)347 on_mp_write_session_write_request_read1(struct query_state *qstate)
348 {
349 struct cache_mp_write_session_write_request *write_request;
350 ssize_t result;
351
352 TRACE_IN(on_mp_write_session_write_request_read1);
353 init_comm_element(&qstate->request,
354 CET_MP_WRITE_SESSION_WRITE_REQUEST);
355 write_request = get_cache_mp_write_session_write_request(
356 &qstate->request);
357
358 result = qstate->read_func(qstate, &write_request->data_size,
359 sizeof(size_t));
360
361 if (result != sizeof(size_t)) {
362 LOG_ERR_3("on_mp_write_session_write_request_read1",
363 "read failed");
364 TRACE_OUT(on_mp_write_session_write_request_read1);
365 return (-1);
366 }
367
368 if (BUFSIZE_INVALID(write_request->data_size)) {
369 LOG_ERR_3("on_mp_write_session_write_request_read1",
370 "invalid data_size value");
371 TRACE_OUT(on_mp_write_session_write_request_read1);
372 return (-1);
373 }
374
375 write_request->data = calloc(1, write_request->data_size);
376 assert(write_request->data != NULL);
377
378 qstate->kevent_watermark = write_request->data_size;
379 qstate->process_func = on_mp_write_session_write_request_read2;
380 TRACE_OUT(on_mp_write_session_write_request_read1);
381 return (0);
382 }
383
384 static int
on_mp_write_session_write_request_read2(struct query_state * qstate)385 on_mp_write_session_write_request_read2(struct query_state *qstate)
386 {
387 struct cache_mp_write_session_write_request *write_request;
388 ssize_t result;
389
390 TRACE_IN(on_mp_write_session_write_request_read2);
391 write_request = get_cache_mp_write_session_write_request(
392 &qstate->request);
393
394 result = qstate->read_func(qstate, write_request->data,
395 write_request->data_size);
396
397 if (result < 0 || (size_t)result != qstate->kevent_watermark) {
398 LOG_ERR_3("on_mp_write_session_write_request_read2",
399 "read failed");
400 TRACE_OUT(on_mp_write_session_write_request_read2);
401 return (-1);
402 }
403
404 qstate->kevent_watermark = 0;
405 qstate->process_func = on_mp_write_session_write_request_process;
406 TRACE_OUT(on_mp_write_session_write_request_read2);
407 return (0);
408 }
409
410 static int
on_mp_write_session_write_request_process(struct query_state * qstate)411 on_mp_write_session_write_request_process(struct query_state *qstate)
412 {
413 struct cache_mp_write_session_write_request *write_request;
414 struct cache_mp_write_session_write_response *write_response;
415
416 TRACE_IN(on_mp_write_session_write_request_process);
417 init_comm_element(&qstate->response,
418 CET_MP_WRITE_SESSION_WRITE_RESPONSE);
419 write_response = get_cache_mp_write_session_write_response(
420 &qstate->response);
421 write_request = get_cache_mp_write_session_write_request(
422 &qstate->request);
423
424 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
425 write_response->error_code = cache_mp_write(
426 (cache_mp_write_session)qstate->mdata,
427 write_request->data,
428 write_request->data_size);
429 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
430
431 qstate->kevent_watermark = sizeof(int);
432 qstate->process_func = on_mp_write_session_write_response_write1;
433 qstate->kevent_filter = EVFILT_WRITE;
434
435 TRACE_OUT(on_mp_write_session_write_request_process);
436 return (0);
437 }
438
439 static int
on_mp_write_session_write_response_write1(struct query_state * qstate)440 on_mp_write_session_write_response_write1(struct query_state *qstate)
441 {
442 struct cache_mp_write_session_write_response *write_response;
443 ssize_t result;
444
445 TRACE_IN(on_mp_write_session_write_response_write1);
446 write_response = get_cache_mp_write_session_write_response(
447 &qstate->response);
448 result = qstate->write_func(qstate, &write_response->error_code,
449 sizeof(int));
450 if (result != sizeof(int)) {
451 LOG_ERR_3("on_mp_write_session_write_response_write1",
452 "write failed");
453 TRACE_OUT(on_mp_write_session_write_response_write1);
454 return (-1);
455 }
456
457 if (write_response->error_code == 0) {
458 finalize_comm_element(&qstate->request);
459 finalize_comm_element(&qstate->response);
460
461 qstate->kevent_watermark = sizeof(int);
462 qstate->process_func = on_mp_write_session_mapper;
463 qstate->kevent_filter = EVFILT_READ;
464 } else {
465 qstate->kevent_watermark = 0;
466 qstate->process_func = 0;
467 }
468
469 TRACE_OUT(on_mp_write_session_write_response_write1);
470 return (0);
471 }
472
473 /*
474 * Handles abandon notifications. Destroys the session by calling the
475 * abandon_cache_mp_write_session.
476 */
477 static int
on_mp_write_session_abandon_notification(struct query_state * qstate)478 on_mp_write_session_abandon_notification(struct query_state *qstate)
479 {
480 TRACE_IN(on_mp_write_session_abandon_notification);
481 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
482 abandon_cache_mp_write_session((cache_mp_write_session)qstate->mdata);
483 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
484 qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION;
485
486 qstate->kevent_watermark = 0;
487 qstate->process_func = NULL;
488 TRACE_OUT(on_mp_write_session_abandon_notification);
489 return (0);
490 }
491
492 /*
493 * Handles close notifications. Commits the session by calling
494 * the close_cache_mp_write_session.
495 */
496 static int
on_mp_write_session_close_notification(struct query_state * qstate)497 on_mp_write_session_close_notification(struct query_state *qstate)
498 {
499 TRACE_IN(on_mp_write_session_close_notification);
500 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
501 close_cache_mp_write_session((cache_mp_write_session)qstate->mdata);
502 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
503 qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION;
504
505 qstate->kevent_watermark = 0;
506 qstate->process_func = NULL;
507 TRACE_OUT(on_mp_write_session_close_notification);
508 return (0);
509 }
510
register_new_mp_cache_entry(struct query_state * qstate,const char * dec_cache_entry_name)511 cache_entry register_new_mp_cache_entry(struct query_state *qstate,
512 const char *dec_cache_entry_name)
513 {
514 cache_entry c_entry;
515 char *en_bkp;
516
517 TRACE_IN(register_new_mp_cache_entry);
518 c_entry = INVALID_CACHE_ENTRY;
519 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
520
521 configuration_lock_wrlock(s_configuration);
522 en_bkp = qstate->config_entry->mp_cache_params.cep.entry_name;
523 qstate->config_entry->mp_cache_params.cep.entry_name =
524 (char *)dec_cache_entry_name;
525 register_cache_entry(s_cache, (struct cache_entry_params *)
526 &qstate->config_entry->mp_cache_params);
527 qstate->config_entry->mp_cache_params.cep.entry_name = en_bkp;
528 configuration_unlock(s_configuration);
529
530 configuration_lock_rdlock(s_configuration);
531 c_entry = find_cache_entry(s_cache,
532 dec_cache_entry_name);
533 configuration_unlock(s_configuration);
534
535 configuration_entry_add_mp_cache_entry(qstate->config_entry,
536 c_entry);
537
538 configuration_unlock_entry(qstate->config_entry,
539 CELT_MULTIPART);
540
541 TRACE_OUT(register_new_mp_cache_entry);
542 return (c_entry);
543 }
544