1 /*- 2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 */ 27 28 #include <sys/types.h> 29 #include <sys/event.h> 30 #include <sys/socket.h> 31 #include <sys/time.h> 32 33 #include <assert.h> 34 #include <errno.h> 35 #include <stdio.h> 36 #include <stdlib.h> 37 #include <string.h> 38 39 #include "cachelib.h" 40 #include "config.h" 41 #include "debug.h" 42 #include "log.h" 43 #include "query.h" 44 #include "mp_ws_query.h" 45 #include "singletons.h" 46 47 static int on_mp_write_session_abandon_notification(struct query_state *); 48 static int on_mp_write_session_close_notification(struct query_state *); 49 static void on_mp_write_session_destroy(struct query_state *); 50 static int on_mp_write_session_mapper(struct query_state *); 51 /* int on_mp_write_session_request_read1(struct query_state *); */ 52 static int on_mp_write_session_request_read2(struct query_state *); 53 static int on_mp_write_session_request_process(struct query_state *); 54 static int on_mp_write_session_response_write1(struct query_state *); 55 static int on_mp_write_session_write_request_read1(struct query_state *); 56 static int on_mp_write_session_write_request_read2(struct query_state *); 57 static int on_mp_write_session_write_request_process(struct query_state *); 58 static int on_mp_write_session_write_response_write1(struct query_state *); 59 60 /* 61 * This function is used as the query_state's destroy_func to make the 62 * proper cleanup in case of errors. 63 */ 64 static void 65 on_mp_write_session_destroy(struct query_state *qstate) 66 { 67 68 TRACE_IN(on_mp_write_session_destroy); 69 finalize_comm_element(&qstate->request); 70 finalize_comm_element(&qstate->response); 71 72 if (qstate->mdata != NULL) { 73 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 74 abandon_cache_mp_write_session( 75 (cache_mp_write_session)qstate->mdata); 76 configuration_unlock_entry(qstate->config_entry, 77 CELT_MULTIPART); 78 } 79 TRACE_OUT(on_mp_write_session_destroy); 80 } 81 82 /* 83 * The functions below are used to process multipart write session initiation 84 * requests. 85 * - on_mp_write_session_request_read1 and on_mp_write_session_request_read2 86 * read the request itself 87 * - on_mp_write_session_request_process processes it 88 * - on_mp_write_session_response_write1 sends the response 89 */ 90 int 91 on_mp_write_session_request_read1(struct query_state *qstate) 92 { 93 struct cache_mp_write_session_request *c_mp_ws_request; 94 ssize_t result; 95 96 TRACE_IN(on_mp_write_session_request_read1); 97 if (qstate->kevent_watermark == 0) 98 qstate->kevent_watermark = sizeof(size_t); 99 else { 100 init_comm_element(&qstate->request, 101 CET_MP_WRITE_SESSION_REQUEST); 102 c_mp_ws_request = get_cache_mp_write_session_request( 103 &qstate->request); 104 105 result = qstate->read_func(qstate, 106 &c_mp_ws_request->entry_length, sizeof(size_t)); 107 108 if (result != sizeof(size_t)) { 109 LOG_ERR_3("on_mp_write_session_request_read1", 110 "read failed"); 111 TRACE_OUT(on_mp_write_session_request_read1); 112 return (-1); 113 } 114 115 if (BUFSIZE_INVALID(c_mp_ws_request->entry_length)) { 116 LOG_ERR_3("on_mp_write_session_request_read1", 117 "invalid entry_length value"); 118 TRACE_OUT(on_mp_write_session_request_read1); 119 return (-1); 120 } 121 122 c_mp_ws_request->entry = calloc(1, 123 c_mp_ws_request->entry_length + 1); 124 assert(c_mp_ws_request->entry != NULL); 125 126 qstate->kevent_watermark = c_mp_ws_request->entry_length; 127 qstate->process_func = on_mp_write_session_request_read2; 128 } 129 TRACE_OUT(on_mp_write_session_request_read1); 130 return (0); 131 } 132 133 static int 134 on_mp_write_session_request_read2(struct query_state *qstate) 135 { 136 struct cache_mp_write_session_request *c_mp_ws_request; 137 ssize_t result; 138 139 TRACE_IN(on_mp_write_session_request_read2); 140 c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request); 141 142 result = qstate->read_func(qstate, c_mp_ws_request->entry, 143 c_mp_ws_request->entry_length); 144 145 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 146 LOG_ERR_3("on_mp_write_session_request_read2", 147 "read failed"); 148 TRACE_OUT(on_mp_write_session_request_read2); 149 return (-1); 150 } 151 152 qstate->kevent_watermark = 0; 153 qstate->process_func = on_mp_write_session_request_process; 154 155 TRACE_OUT(on_mp_write_session_request_read2); 156 return (0); 157 } 158 159 static int 160 on_mp_write_session_request_process(struct query_state *qstate) 161 { 162 struct cache_mp_write_session_request *c_mp_ws_request; 163 struct cache_mp_write_session_response *c_mp_ws_response; 164 cache_mp_write_session ws; 165 cache_entry c_entry; 166 char *dec_cache_entry_name; 167 168 TRACE_IN(on_mp_write_session_request_process); 169 init_comm_element(&qstate->response, CET_MP_WRITE_SESSION_RESPONSE); 170 c_mp_ws_response = get_cache_mp_write_session_response( 171 &qstate->response); 172 c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request); 173 174 qstate->config_entry = configuration_find_entry( 175 s_configuration, c_mp_ws_request->entry); 176 if (qstate->config_entry == NULL) { 177 c_mp_ws_response->error_code = ENOENT; 178 179 LOG_ERR_2("write_session_request", 180 "can't find configuration entry '%s'. " 181 "aborting request", c_mp_ws_request->entry); 182 goto fin; 183 } 184 185 if (qstate->config_entry->enabled == 0) { 186 c_mp_ws_response->error_code = EACCES; 187 188 LOG_ERR_2("write_session_request", 189 "configuration entry '%s' is disabled", 190 c_mp_ws_request->entry); 191 goto fin; 192 } 193 194 if (qstate->config_entry->perform_actual_lookups != 0) { 195 c_mp_ws_response->error_code = EOPNOTSUPP; 196 197 LOG_ERR_2("write_session_request", 198 "entry '%s' performs lookups by itself: " 199 "can't write to it", c_mp_ws_request->entry); 200 goto fin; 201 } else { 202 #ifdef NS_NSCD_EID_CHECKING 203 if (check_query_eids(qstate) != 0) { 204 c_mp_ws_response->error_code = EPERM; 205 goto fin; 206 } 207 #endif 208 } 209 210 /* 211 * All multipart entries are separated by their name decorations. 212 * For one configuration entry there will be a lot of multipart 213 * cache entries - each with its own decorated name. 214 */ 215 asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str, 216 qstate->config_entry->mp_cache_params.cep.entry_name); 217 assert(dec_cache_entry_name != NULL); 218 219 configuration_lock_rdlock(s_configuration); 220 c_entry = find_cache_entry(s_cache, 221 dec_cache_entry_name); 222 configuration_unlock(s_configuration); 223 224 if (c_entry == INVALID_CACHE_ENTRY) 225 c_entry = register_new_mp_cache_entry(qstate, 226 dec_cache_entry_name); 227 228 free(dec_cache_entry_name); 229 230 assert(c_entry != NULL); 231 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 232 ws = open_cache_mp_write_session(c_entry); 233 if (ws == INVALID_CACHE_MP_WRITE_SESSION) 234 c_mp_ws_response->error_code = -1; 235 else { 236 qstate->mdata = ws; 237 qstate->destroy_func = on_mp_write_session_destroy; 238 239 if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) || 240 (qstate->config_entry->mp_query_timeout.tv_usec != 0)) 241 memcpy(&qstate->timeout, 242 &qstate->config_entry->mp_query_timeout, 243 sizeof(struct timeval)); 244 } 245 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 246 247 fin: 248 qstate->process_func = on_mp_write_session_response_write1; 249 qstate->kevent_watermark = sizeof(int); 250 qstate->kevent_filter = EVFILT_WRITE; 251 252 TRACE_OUT(on_mp_write_session_request_process); 253 return (0); 254 } 255 256 static int 257 on_mp_write_session_response_write1(struct query_state *qstate) 258 { 259 struct cache_mp_write_session_response *c_mp_ws_response; 260 ssize_t result; 261 262 TRACE_IN(on_mp_write_session_response_write1); 263 c_mp_ws_response = get_cache_mp_write_session_response( 264 &qstate->response); 265 result = qstate->write_func(qstate, &c_mp_ws_response->error_code, 266 sizeof(int)); 267 if (result != sizeof(int)) { 268 LOG_ERR_3("on_mp_write_session_response_write1", 269 "write failed"); 270 TRACE_OUT(on_mp_write_session_response_write1); 271 return (-1); 272 } 273 274 if (c_mp_ws_response->error_code == 0) { 275 qstate->kevent_watermark = sizeof(int); 276 qstate->process_func = on_mp_write_session_mapper; 277 qstate->kevent_filter = EVFILT_READ; 278 } else { 279 qstate->kevent_watermark = 0; 280 qstate->process_func = NULL; 281 } 282 TRACE_OUT(on_mp_write_session_response_write1); 283 return (0); 284 } 285 286 /* 287 * Mapper function is used to avoid multiple connections for each session 288 * write or read requests. After processing the request, it does not close 289 * the connection, but waits for the next request. 290 */ 291 static int 292 on_mp_write_session_mapper(struct query_state *qstate) 293 { 294 ssize_t result; 295 int elem_type; 296 297 TRACE_IN(on_mp_write_session_mapper); 298 if (qstate->kevent_watermark == 0) { 299 qstate->kevent_watermark = sizeof(int); 300 } else { 301 result = qstate->read_func(qstate, &elem_type, sizeof(int)); 302 if (result != sizeof(int)) { 303 LOG_ERR_3("on_mp_write_session_mapper", 304 "read failed"); 305 TRACE_OUT(on_mp_write_session_mapper); 306 return (-1); 307 } 308 309 switch (elem_type) { 310 case CET_MP_WRITE_SESSION_WRITE_REQUEST: 311 qstate->kevent_watermark = sizeof(size_t); 312 qstate->process_func = 313 on_mp_write_session_write_request_read1; 314 break; 315 case CET_MP_WRITE_SESSION_ABANDON_NOTIFICATION: 316 qstate->kevent_watermark = 0; 317 qstate->process_func = 318 on_mp_write_session_abandon_notification; 319 break; 320 case CET_MP_WRITE_SESSION_CLOSE_NOTIFICATION: 321 qstate->kevent_watermark = 0; 322 qstate->process_func = 323 on_mp_write_session_close_notification; 324 break; 325 default: 326 qstate->kevent_watermark = 0; 327 qstate->process_func = NULL; 328 LOG_ERR_2("on_mp_write_session_mapper", 329 "unknown element type"); 330 TRACE_OUT(on_mp_write_session_mapper); 331 return (-1); 332 } 333 } 334 TRACE_OUT(on_mp_write_session_mapper); 335 return (0); 336 } 337 338 /* 339 * The functions below are used to process multipart write sessions write 340 * requests. 341 * - on_mp_write_session_write_request_read1 and 342 * on_mp_write_session_write_request_read2 read the request itself 343 * - on_mp_write_session_write_request_process processes it 344 * - on_mp_write_session_write_response_write1 sends the response 345 */ 346 static int 347 on_mp_write_session_write_request_read1(struct query_state *qstate) 348 { 349 struct cache_mp_write_session_write_request *write_request; 350 ssize_t result; 351 352 TRACE_IN(on_mp_write_session_write_request_read1); 353 init_comm_element(&qstate->request, 354 CET_MP_WRITE_SESSION_WRITE_REQUEST); 355 write_request = get_cache_mp_write_session_write_request( 356 &qstate->request); 357 358 result = qstate->read_func(qstate, &write_request->data_size, 359 sizeof(size_t)); 360 361 if (result != sizeof(size_t)) { 362 LOG_ERR_3("on_mp_write_session_write_request_read1", 363 "read failed"); 364 TRACE_OUT(on_mp_write_session_write_request_read1); 365 return (-1); 366 } 367 368 if (BUFSIZE_INVALID(write_request->data_size)) { 369 LOG_ERR_3("on_mp_write_session_write_request_read1", 370 "invalid data_size value"); 371 TRACE_OUT(on_mp_write_session_write_request_read1); 372 return (-1); 373 } 374 375 write_request->data = calloc(1, write_request->data_size); 376 assert(write_request->data != NULL); 377 378 qstate->kevent_watermark = write_request->data_size; 379 qstate->process_func = on_mp_write_session_write_request_read2; 380 TRACE_OUT(on_mp_write_session_write_request_read1); 381 return (0); 382 } 383 384 static int 385 on_mp_write_session_write_request_read2(struct query_state *qstate) 386 { 387 struct cache_mp_write_session_write_request *write_request; 388 ssize_t result; 389 390 TRACE_IN(on_mp_write_session_write_request_read2); 391 write_request = get_cache_mp_write_session_write_request( 392 &qstate->request); 393 394 result = qstate->read_func(qstate, write_request->data, 395 write_request->data_size); 396 397 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 398 LOG_ERR_3("on_mp_write_session_write_request_read2", 399 "read failed"); 400 TRACE_OUT(on_mp_write_session_write_request_read2); 401 return (-1); 402 } 403 404 qstate->kevent_watermark = 0; 405 qstate->process_func = on_mp_write_session_write_request_process; 406 TRACE_OUT(on_mp_write_session_write_request_read2); 407 return (0); 408 } 409 410 static int 411 on_mp_write_session_write_request_process(struct query_state *qstate) 412 { 413 struct cache_mp_write_session_write_request *write_request; 414 struct cache_mp_write_session_write_response *write_response; 415 416 TRACE_IN(on_mp_write_session_write_request_process); 417 init_comm_element(&qstate->response, 418 CET_MP_WRITE_SESSION_WRITE_RESPONSE); 419 write_response = get_cache_mp_write_session_write_response( 420 &qstate->response); 421 write_request = get_cache_mp_write_session_write_request( 422 &qstate->request); 423 424 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 425 write_response->error_code = cache_mp_write( 426 (cache_mp_write_session)qstate->mdata, 427 write_request->data, 428 write_request->data_size); 429 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 430 431 qstate->kevent_watermark = sizeof(int); 432 qstate->process_func = on_mp_write_session_write_response_write1; 433 qstate->kevent_filter = EVFILT_WRITE; 434 435 TRACE_OUT(on_mp_write_session_write_request_process); 436 return (0); 437 } 438 439 static int 440 on_mp_write_session_write_response_write1(struct query_state *qstate) 441 { 442 struct cache_mp_write_session_write_response *write_response; 443 ssize_t result; 444 445 TRACE_IN(on_mp_write_session_write_response_write1); 446 write_response = get_cache_mp_write_session_write_response( 447 &qstate->response); 448 result = qstate->write_func(qstate, &write_response->error_code, 449 sizeof(int)); 450 if (result != sizeof(int)) { 451 LOG_ERR_3("on_mp_write_session_write_response_write1", 452 "write failed"); 453 TRACE_OUT(on_mp_write_session_write_response_write1); 454 return (-1); 455 } 456 457 if (write_response->error_code == 0) { 458 finalize_comm_element(&qstate->request); 459 finalize_comm_element(&qstate->response); 460 461 qstate->kevent_watermark = sizeof(int); 462 qstate->process_func = on_mp_write_session_mapper; 463 qstate->kevent_filter = EVFILT_READ; 464 } else { 465 qstate->kevent_watermark = 0; 466 qstate->process_func = 0; 467 } 468 469 TRACE_OUT(on_mp_write_session_write_response_write1); 470 return (0); 471 } 472 473 /* 474 * Handles abandon notifications. Destroys the session by calling the 475 * abandon_cache_mp_write_session. 476 */ 477 static int 478 on_mp_write_session_abandon_notification(struct query_state *qstate) 479 { 480 TRACE_IN(on_mp_write_session_abandon_notification); 481 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 482 abandon_cache_mp_write_session((cache_mp_write_session)qstate->mdata); 483 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 484 qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION; 485 486 qstate->kevent_watermark = 0; 487 qstate->process_func = NULL; 488 TRACE_OUT(on_mp_write_session_abandon_notification); 489 return (0); 490 } 491 492 /* 493 * Handles close notifications. Commits the session by calling 494 * the close_cache_mp_write_session. 495 */ 496 static int 497 on_mp_write_session_close_notification(struct query_state *qstate) 498 { 499 TRACE_IN(on_mp_write_session_close_notification); 500 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 501 close_cache_mp_write_session((cache_mp_write_session)qstate->mdata); 502 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 503 qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION; 504 505 qstate->kevent_watermark = 0; 506 qstate->process_func = NULL; 507 TRACE_OUT(on_mp_write_session_close_notification); 508 return (0); 509 } 510 511 cache_entry register_new_mp_cache_entry(struct query_state *qstate, 512 const char *dec_cache_entry_name) 513 { 514 cache_entry c_entry; 515 char *en_bkp; 516 517 TRACE_IN(register_new_mp_cache_entry); 518 c_entry = INVALID_CACHE_ENTRY; 519 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 520 521 configuration_lock_wrlock(s_configuration); 522 en_bkp = qstate->config_entry->mp_cache_params.cep.entry_name; 523 qstate->config_entry->mp_cache_params.cep.entry_name = 524 (char *)dec_cache_entry_name; 525 register_cache_entry(s_cache, (struct cache_entry_params *) 526 &qstate->config_entry->mp_cache_params); 527 qstate->config_entry->mp_cache_params.cep.entry_name = en_bkp; 528 configuration_unlock(s_configuration); 529 530 configuration_lock_rdlock(s_configuration); 531 c_entry = find_cache_entry(s_cache, 532 dec_cache_entry_name); 533 configuration_unlock(s_configuration); 534 535 configuration_entry_add_mp_cache_entry(qstate->config_entry, 536 c_entry); 537 538 configuration_unlock_entry(qstate->config_entry, 539 CELT_MULTIPART); 540 541 TRACE_OUT(register_new_mp_cache_entry); 542 return (c_entry); 543 } 544