1 /*- 2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 */ 27 28 #include <sys/cdefs.h> 29 #include <sys/types.h> 30 #include <sys/event.h> 31 #include <sys/socket.h> 32 #include <sys/time.h> 33 34 #include <assert.h> 35 #include <errno.h> 36 #include <nsswitch.h> 37 #include <stdio.h> 38 #include <stdlib.h> 39 #include <string.h> 40 41 #include "cachelib.h" 42 #include "config.h" 43 #include "debug.h" 44 #include "log.h" 45 #include "query.h" 46 #include "mp_rs_query.h" 47 #include "mp_ws_query.h" 48 #include "singletons.h" 49 50 static int on_mp_read_session_close_notification(struct query_state *); 51 static void on_mp_read_session_destroy(struct query_state *); 52 static int on_mp_read_session_mapper(struct query_state *); 53 /* int on_mp_read_session_request_read1(struct query_state *); */ 54 static int on_mp_read_session_request_read2(struct query_state *); 55 static int on_mp_read_session_request_process(struct query_state *); 56 static int on_mp_read_session_response_write1(struct query_state *); 57 static int on_mp_read_session_read_request_process(struct query_state *); 58 static int on_mp_read_session_read_response_write1(struct query_state *); 59 static int on_mp_read_session_read_response_write2(struct query_state *); 60 61 /* 62 * This function is used as the query_state's destroy_func to make the 63 * proper cleanup in case of errors. 64 */ 65 static void 66 on_mp_read_session_destroy(struct query_state *qstate) 67 { 68 TRACE_IN(on_mp_read_session_destroy); 69 finalize_comm_element(&qstate->request); 70 finalize_comm_element(&qstate->response); 71 72 if (qstate->mdata != NULL) { 73 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 74 close_cache_mp_read_session( 75 (cache_mp_read_session)qstate->mdata); 76 configuration_unlock_entry(qstate->config_entry, 77 CELT_MULTIPART); 78 } 79 TRACE_OUT(on_mp_read_session_destroy); 80 } 81 82 /* 83 * The functions below are used to process multipart read session initiation 84 * requests. 85 * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read 86 * the request itself 87 * - on_mp_read_session_request_process processes it 88 * - on_mp_read_session_response_write1 sends the response 89 */ 90 int 91 on_mp_read_session_request_read1(struct query_state *qstate) 92 { 93 struct cache_mp_read_session_request *c_mp_rs_request; 94 ssize_t result; 95 96 TRACE_IN(on_mp_read_session_request_read1); 97 if (qstate->kevent_watermark == 0) 98 qstate->kevent_watermark = sizeof(size_t); 99 else { 100 init_comm_element(&qstate->request, 101 CET_MP_READ_SESSION_REQUEST); 102 c_mp_rs_request = get_cache_mp_read_session_request( 103 &qstate->request); 104 105 result = qstate->read_func(qstate, 106 &c_mp_rs_request->entry_length, sizeof(size_t)); 107 108 if (result != sizeof(size_t)) { 109 TRACE_OUT(on_mp_read_session_request_read1); 110 return (-1); 111 } 112 113 if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) { 114 TRACE_OUT(on_mp_read_session_request_read1); 115 return (-1); 116 } 117 118 c_mp_rs_request->entry = calloc(1, 119 c_mp_rs_request->entry_length + 1); 120 assert(c_mp_rs_request->entry != NULL); 121 122 qstate->kevent_watermark = c_mp_rs_request->entry_length; 123 qstate->process_func = on_mp_read_session_request_read2; 124 } 125 TRACE_OUT(on_mp_read_session_request_read1); 126 return (0); 127 } 128 129 static int 130 on_mp_read_session_request_read2(struct query_state *qstate) 131 { 132 struct cache_mp_read_session_request *c_mp_rs_request; 133 ssize_t result; 134 135 TRACE_IN(on_mp_read_session_request_read2); 136 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request); 137 138 result = qstate->read_func(qstate, c_mp_rs_request->entry, 139 c_mp_rs_request->entry_length); 140 141 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 142 LOG_ERR_3("on_mp_read_session_request_read2", 143 "read failed"); 144 TRACE_OUT(on_mp_read_session_request_read2); 145 return (-1); 146 } 147 148 qstate->kevent_watermark = 0; 149 qstate->process_func = on_mp_read_session_request_process; 150 TRACE_OUT(on_mp_read_session_request_read2); 151 return (0); 152 } 153 154 static int 155 on_mp_read_session_request_process(struct query_state *qstate) 156 { 157 struct cache_mp_read_session_request *c_mp_rs_request; 158 struct cache_mp_read_session_response *c_mp_rs_response; 159 cache_mp_read_session rs; 160 cache_entry c_entry; 161 char *dec_cache_entry_name; 162 163 char *buffer; 164 size_t buffer_size; 165 cache_mp_write_session ws; 166 struct agent *lookup_agent; 167 struct multipart_agent *mp_agent; 168 void *mdata; 169 int res; 170 171 TRACE_IN(on_mp_read_session_request_process); 172 init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE); 173 c_mp_rs_response = get_cache_mp_read_session_response( 174 &qstate->response); 175 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request); 176 177 qstate->config_entry = configuration_find_entry( 178 s_configuration, c_mp_rs_request->entry); 179 if (qstate->config_entry == NULL) { 180 c_mp_rs_response->error_code = ENOENT; 181 182 LOG_ERR_2("read_session_request", 183 "can't find configuration entry '%s'." 184 " aborting request", c_mp_rs_request->entry); 185 goto fin; 186 } 187 188 if (qstate->config_entry->enabled == 0) { 189 c_mp_rs_response->error_code = EACCES; 190 191 LOG_ERR_2("read_session_request", 192 "configuration entry '%s' is disabled", 193 c_mp_rs_request->entry); 194 goto fin; 195 } 196 197 if (qstate->config_entry->perform_actual_lookups != 0) 198 dec_cache_entry_name = strdup( 199 qstate->config_entry->mp_cache_params.cep.entry_name); 200 else { 201 #ifdef NS_NSCD_EID_CHECKING 202 if (check_query_eids(qstate) != 0) { 203 c_mp_rs_response->error_code = EPERM; 204 goto fin; 205 } 206 #endif 207 208 asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str, 209 qstate->config_entry->mp_cache_params.cep.entry_name); 210 } 211 212 assert(dec_cache_entry_name != NULL); 213 214 configuration_lock_rdlock(s_configuration); 215 c_entry = find_cache_entry(s_cache, dec_cache_entry_name); 216 configuration_unlock(s_configuration); 217 218 if ((c_entry == INVALID_CACHE) && 219 (qstate->config_entry->perform_actual_lookups != 0)) 220 c_entry = register_new_mp_cache_entry(qstate, 221 dec_cache_entry_name); 222 223 free(dec_cache_entry_name); 224 225 if (c_entry != INVALID_CACHE_ENTRY) { 226 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 227 rs = open_cache_mp_read_session(c_entry); 228 configuration_unlock_entry(qstate->config_entry, 229 CELT_MULTIPART); 230 231 if ((rs == INVALID_CACHE_MP_READ_SESSION) && 232 (qstate->config_entry->perform_actual_lookups != 0)) { 233 lookup_agent = find_agent(s_agent_table, 234 c_mp_rs_request->entry, MULTIPART_AGENT); 235 236 if ((lookup_agent != NULL) && 237 (lookup_agent->type == MULTIPART_AGENT)) { 238 mp_agent = (struct multipart_agent *) 239 lookup_agent; 240 mdata = mp_agent->mp_init_func(); 241 242 /* 243 * Multipart agents read the whole snapshot 244 * of the data at one time. 245 */ 246 configuration_lock_entry(qstate->config_entry, 247 CELT_MULTIPART); 248 ws = open_cache_mp_write_session(c_entry); 249 configuration_unlock_entry(qstate->config_entry, 250 CELT_MULTIPART); 251 if (ws != NULL) { 252 do { 253 buffer = NULL; 254 res = mp_agent->mp_lookup_func(&buffer, 255 &buffer_size, 256 mdata); 257 258 if ((res & NS_TERMINATE) && 259 (buffer != NULL)) { 260 configuration_lock_entry( 261 qstate->config_entry, 262 CELT_MULTIPART); 263 if (cache_mp_write(ws, buffer, 264 buffer_size) != 0) { 265 abandon_cache_mp_write_session(ws); 266 ws = NULL; 267 } 268 configuration_unlock_entry( 269 qstate->config_entry, 270 CELT_MULTIPART); 271 272 free(buffer); 273 buffer = NULL; 274 } else { 275 configuration_lock_entry( 276 qstate->config_entry, 277 CELT_MULTIPART); 278 close_cache_mp_write_session(ws); 279 configuration_unlock_entry( 280 qstate->config_entry, 281 CELT_MULTIPART); 282 283 free(buffer); 284 buffer = NULL; 285 } 286 } while ((res & NS_TERMINATE) && 287 (ws != NULL)); 288 } 289 290 configuration_lock_entry(qstate->config_entry, 291 CELT_MULTIPART); 292 rs = open_cache_mp_read_session(c_entry); 293 configuration_unlock_entry(qstate->config_entry, 294 CELT_MULTIPART); 295 } 296 } 297 298 if (rs == INVALID_CACHE_MP_READ_SESSION) 299 c_mp_rs_response->error_code = -1; 300 else { 301 qstate->mdata = rs; 302 qstate->destroy_func = on_mp_read_session_destroy; 303 304 configuration_lock_entry(qstate->config_entry, 305 CELT_MULTIPART); 306 if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) || 307 (qstate->config_entry->mp_query_timeout.tv_usec != 0)) 308 memcpy(&qstate->timeout, 309 &qstate->config_entry->mp_query_timeout, 310 sizeof(struct timeval)); 311 configuration_unlock_entry(qstate->config_entry, 312 CELT_MULTIPART); 313 } 314 } else 315 c_mp_rs_response->error_code = -1; 316 317 fin: 318 qstate->process_func = on_mp_read_session_response_write1; 319 qstate->kevent_watermark = sizeof(int); 320 qstate->kevent_filter = EVFILT_WRITE; 321 322 TRACE_OUT(on_mp_read_session_request_process); 323 return (0); 324 } 325 326 static int 327 on_mp_read_session_response_write1(struct query_state *qstate) 328 { 329 struct cache_mp_read_session_response *c_mp_rs_response; 330 ssize_t result; 331 332 TRACE_IN(on_mp_read_session_response_write1); 333 c_mp_rs_response = get_cache_mp_read_session_response( 334 &qstate->response); 335 result = qstate->write_func(qstate, &c_mp_rs_response->error_code, 336 sizeof(int)); 337 338 if (result != sizeof(int)) { 339 LOG_ERR_3("on_mp_read_session_response_write1", 340 "write failed"); 341 TRACE_OUT(on_mp_read_session_response_write1); 342 return (-1); 343 } 344 345 if (c_mp_rs_response->error_code == 0) { 346 qstate->kevent_watermark = sizeof(int); 347 qstate->process_func = on_mp_read_session_mapper; 348 qstate->kevent_filter = EVFILT_READ; 349 } else { 350 qstate->kevent_watermark = 0; 351 qstate->process_func = NULL; 352 } 353 TRACE_OUT(on_mp_read_session_response_write1); 354 return (0); 355 } 356 357 /* 358 * Mapper function is used to avoid multiple connections for each session 359 * write or read requests. After processing the request, it does not close 360 * the connection, but waits for the next request. 361 */ 362 static int 363 on_mp_read_session_mapper(struct query_state *qstate) 364 { 365 ssize_t result; 366 int elem_type; 367 368 TRACE_IN(on_mp_read_session_mapper); 369 if (qstate->kevent_watermark == 0) { 370 qstate->kevent_watermark = sizeof(int); 371 } else { 372 result = qstate->read_func(qstate, &elem_type, sizeof(int)); 373 if (result != sizeof(int)) { 374 LOG_ERR_3("on_mp_read_session_mapper", 375 "read failed"); 376 TRACE_OUT(on_mp_read_session_mapper); 377 return (-1); 378 } 379 380 switch (elem_type) { 381 case CET_MP_READ_SESSION_READ_REQUEST: 382 qstate->kevent_watermark = 0; 383 qstate->process_func = 384 on_mp_read_session_read_request_process; 385 break; 386 case CET_MP_READ_SESSION_CLOSE_NOTIFICATION: 387 qstate->kevent_watermark = 0; 388 qstate->process_func = 389 on_mp_read_session_close_notification; 390 break; 391 default: 392 qstate->kevent_watermark = 0; 393 qstate->process_func = NULL; 394 LOG_ERR_3("on_mp_read_session_mapper", 395 "unknown element type"); 396 TRACE_OUT(on_mp_read_session_mapper); 397 return (-1); 398 } 399 } 400 TRACE_OUT(on_mp_read_session_mapper); 401 return (0); 402 } 403 404 /* 405 * The functions below are used to process multipart read sessions read 406 * requests. User doesn't have to pass any kind of data, besides the 407 * request identificator itself. So we don't need any XXX_read functions and 408 * start with the XXX_process function. 409 * - on_mp_read_session_read_request_process processes it 410 * - on_mp_read_session_read_response_write1 and 411 * on_mp_read_session_read_response_write2 sends the response 412 */ 413 static int 414 on_mp_read_session_read_request_process(struct query_state *qstate) 415 { 416 struct cache_mp_read_session_read_response *read_response; 417 418 TRACE_IN(on_mp_read_session_response_process); 419 init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE); 420 read_response = get_cache_mp_read_session_read_response( 421 &qstate->response); 422 423 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 424 read_response->error_code = cache_mp_read( 425 (cache_mp_read_session)qstate->mdata, NULL, 426 &read_response->data_size); 427 428 if (read_response->error_code == 0) { 429 read_response->data = malloc(read_response->data_size); 430 assert(read_response != NULL); 431 read_response->error_code = cache_mp_read( 432 (cache_mp_read_session)qstate->mdata, 433 read_response->data, 434 &read_response->data_size); 435 } 436 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 437 438 if (read_response->error_code == 0) 439 qstate->kevent_watermark = sizeof(size_t) + sizeof(int); 440 else 441 qstate->kevent_watermark = sizeof(int); 442 qstate->process_func = on_mp_read_session_read_response_write1; 443 qstate->kevent_filter = EVFILT_WRITE; 444 445 TRACE_OUT(on_mp_read_session_response_process); 446 return (0); 447 } 448 449 static int 450 on_mp_read_session_read_response_write1(struct query_state *qstate) 451 { 452 struct cache_mp_read_session_read_response *read_response; 453 ssize_t result; 454 455 TRACE_IN(on_mp_read_session_read_response_write1); 456 read_response = get_cache_mp_read_session_read_response( 457 &qstate->response); 458 459 result = qstate->write_func(qstate, &read_response->error_code, 460 sizeof(int)); 461 if (read_response->error_code == 0) { 462 result += qstate->write_func(qstate, &read_response->data_size, 463 sizeof(size_t)); 464 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 465 TRACE_OUT(on_mp_read_session_read_response_write1); 466 LOG_ERR_3("on_mp_read_session_read_response_write1", 467 "write failed"); 468 return (-1); 469 } 470 471 qstate->kevent_watermark = read_response->data_size; 472 qstate->process_func = on_mp_read_session_read_response_write2; 473 } else { 474 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 475 LOG_ERR_3("on_mp_read_session_read_response_write1", 476 "write failed"); 477 TRACE_OUT(on_mp_read_session_read_response_write1); 478 return (-1); 479 } 480 481 qstate->kevent_watermark = 0; 482 qstate->process_func = NULL; 483 } 484 485 TRACE_OUT(on_mp_read_session_read_response_write1); 486 return (0); 487 } 488 489 static int 490 on_mp_read_session_read_response_write2(struct query_state *qstate) 491 { 492 struct cache_mp_read_session_read_response *read_response; 493 ssize_t result; 494 495 TRACE_IN(on_mp_read_session_read_response_write2); 496 read_response = get_cache_mp_read_session_read_response( 497 &qstate->response); 498 result = qstate->write_func(qstate, read_response->data, 499 read_response->data_size); 500 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 501 LOG_ERR_3("on_mp_read_session_read_response_write2", 502 "write failed"); 503 TRACE_OUT(on_mp_read_session_read_response_write2); 504 return (-1); 505 } 506 507 finalize_comm_element(&qstate->request); 508 finalize_comm_element(&qstate->response); 509 510 qstate->kevent_watermark = sizeof(int); 511 qstate->process_func = on_mp_read_session_mapper; 512 qstate->kevent_filter = EVFILT_READ; 513 514 TRACE_OUT(on_mp_read_session_read_response_write2); 515 return (0); 516 } 517 518 /* 519 * Handles session close notification by calling close_cache_mp_read_session 520 * function. 521 */ 522 static int 523 on_mp_read_session_close_notification(struct query_state *qstate) 524 { 525 526 TRACE_IN(on_mp_read_session_close_notification); 527 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 528 close_cache_mp_read_session((cache_mp_read_session)qstate->mdata); 529 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 530 qstate->mdata = NULL; 531 qstate->kevent_watermark = 0; 532 qstate->process_func = NULL; 533 TRACE_OUT(on_mp_read_session_close_notification); 534 return (0); 535 } 536