1 /*- 2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 */ 27 28 #include <sys/cdefs.h> 29 #include <sys/types.h> 30 #include <sys/event.h> 31 #include <sys/socket.h> 32 #include <sys/time.h> 33 34 #include <assert.h> 35 #include <errno.h> 36 #include <stdio.h> 37 #include <stdlib.h> 38 #include <string.h> 39 40 #include "cachelib.h" 41 #include "config.h" 42 #include "debug.h" 43 #include "log.h" 44 #include "query.h" 45 #include "mp_ws_query.h" 46 #include "singletons.h" 47 48 static int on_mp_write_session_abandon_notification(struct query_state *); 49 static int on_mp_write_session_close_notification(struct query_state *); 50 static void on_mp_write_session_destroy(struct query_state *); 51 static int on_mp_write_session_mapper(struct query_state *); 52 /* int on_mp_write_session_request_read1(struct query_state *); */ 53 static int on_mp_write_session_request_read2(struct query_state *); 54 static int on_mp_write_session_request_process(struct query_state *); 55 static int on_mp_write_session_response_write1(struct query_state *); 56 static int on_mp_write_session_write_request_read1(struct query_state *); 57 static int on_mp_write_session_write_request_read2(struct query_state *); 58 static int on_mp_write_session_write_request_process(struct query_state *); 59 static int on_mp_write_session_write_response_write1(struct query_state *); 60 61 /* 62 * This function is used as the query_state's destroy_func to make the 63 * proper cleanup in case of errors. 64 */ 65 static void 66 on_mp_write_session_destroy(struct query_state *qstate) 67 { 68 69 TRACE_IN(on_mp_write_session_destroy); 70 finalize_comm_element(&qstate->request); 71 finalize_comm_element(&qstate->response); 72 73 if (qstate->mdata != NULL) { 74 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 75 abandon_cache_mp_write_session( 76 (cache_mp_write_session)qstate->mdata); 77 configuration_unlock_entry(qstate->config_entry, 78 CELT_MULTIPART); 79 } 80 TRACE_OUT(on_mp_write_session_destroy); 81 } 82 83 /* 84 * The functions below are used to process multipart write session initiation 85 * requests. 86 * - on_mp_write_session_request_read1 and on_mp_write_session_request_read2 87 * read the request itself 88 * - on_mp_write_session_request_process processes it 89 * - on_mp_write_session_response_write1 sends the response 90 */ 91 int 92 on_mp_write_session_request_read1(struct query_state *qstate) 93 { 94 struct cache_mp_write_session_request *c_mp_ws_request; 95 ssize_t result; 96 97 TRACE_IN(on_mp_write_session_request_read1); 98 if (qstate->kevent_watermark == 0) 99 qstate->kevent_watermark = sizeof(size_t); 100 else { 101 init_comm_element(&qstate->request, 102 CET_MP_WRITE_SESSION_REQUEST); 103 c_mp_ws_request = get_cache_mp_write_session_request( 104 &qstate->request); 105 106 result = qstate->read_func(qstate, 107 &c_mp_ws_request->entry_length, sizeof(size_t)); 108 109 if (result != sizeof(size_t)) { 110 LOG_ERR_3("on_mp_write_session_request_read1", 111 "read failed"); 112 TRACE_OUT(on_mp_write_session_request_read1); 113 return (-1); 114 } 115 116 if (BUFSIZE_INVALID(c_mp_ws_request->entry_length)) { 117 LOG_ERR_3("on_mp_write_session_request_read1", 118 "invalid entry_length value"); 119 TRACE_OUT(on_mp_write_session_request_read1); 120 return (-1); 121 } 122 123 c_mp_ws_request->entry = calloc(1, 124 c_mp_ws_request->entry_length + 1); 125 assert(c_mp_ws_request->entry != NULL); 126 127 qstate->kevent_watermark = c_mp_ws_request->entry_length; 128 qstate->process_func = on_mp_write_session_request_read2; 129 } 130 TRACE_OUT(on_mp_write_session_request_read1); 131 return (0); 132 } 133 134 static int 135 on_mp_write_session_request_read2(struct query_state *qstate) 136 { 137 struct cache_mp_write_session_request *c_mp_ws_request; 138 ssize_t result; 139 140 TRACE_IN(on_mp_write_session_request_read2); 141 c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request); 142 143 result = qstate->read_func(qstate, c_mp_ws_request->entry, 144 c_mp_ws_request->entry_length); 145 146 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 147 LOG_ERR_3("on_mp_write_session_request_read2", 148 "read failed"); 149 TRACE_OUT(on_mp_write_session_request_read2); 150 return (-1); 151 } 152 153 qstate->kevent_watermark = 0; 154 qstate->process_func = on_mp_write_session_request_process; 155 156 TRACE_OUT(on_mp_write_session_request_read2); 157 return (0); 158 } 159 160 static int 161 on_mp_write_session_request_process(struct query_state *qstate) 162 { 163 struct cache_mp_write_session_request *c_mp_ws_request; 164 struct cache_mp_write_session_response *c_mp_ws_response; 165 cache_mp_write_session ws; 166 cache_entry c_entry; 167 char *dec_cache_entry_name; 168 169 TRACE_IN(on_mp_write_session_request_process); 170 init_comm_element(&qstate->response, CET_MP_WRITE_SESSION_RESPONSE); 171 c_mp_ws_response = get_cache_mp_write_session_response( 172 &qstate->response); 173 c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request); 174 175 qstate->config_entry = configuration_find_entry( 176 s_configuration, c_mp_ws_request->entry); 177 if (qstate->config_entry == NULL) { 178 c_mp_ws_response->error_code = ENOENT; 179 180 LOG_ERR_2("write_session_request", 181 "can't find configuration entry '%s'. " 182 "aborting request", c_mp_ws_request->entry); 183 goto fin; 184 } 185 186 if (qstate->config_entry->enabled == 0) { 187 c_mp_ws_response->error_code = EACCES; 188 189 LOG_ERR_2("write_session_request", 190 "configuration entry '%s' is disabled", 191 c_mp_ws_request->entry); 192 goto fin; 193 } 194 195 if (qstate->config_entry->perform_actual_lookups != 0) { 196 c_mp_ws_response->error_code = EOPNOTSUPP; 197 198 LOG_ERR_2("write_session_request", 199 "entry '%s' performs lookups by itself: " 200 "can't write to it", c_mp_ws_request->entry); 201 goto fin; 202 } else { 203 #ifdef NS_NSCD_EID_CHECKING 204 if (check_query_eids(qstate) != 0) { 205 c_mp_ws_response->error_code = EPERM; 206 goto fin; 207 } 208 #endif 209 } 210 211 /* 212 * All multipart entries are separated by their name decorations. 213 * For one configuration entry there will be a lot of multipart 214 * cache entries - each with its own decorated name. 215 */ 216 asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str, 217 qstate->config_entry->mp_cache_params.cep.entry_name); 218 assert(dec_cache_entry_name != NULL); 219 220 configuration_lock_rdlock(s_configuration); 221 c_entry = find_cache_entry(s_cache, 222 dec_cache_entry_name); 223 configuration_unlock(s_configuration); 224 225 if (c_entry == INVALID_CACHE_ENTRY) 226 c_entry = register_new_mp_cache_entry(qstate, 227 dec_cache_entry_name); 228 229 free(dec_cache_entry_name); 230 231 assert(c_entry != NULL); 232 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 233 ws = open_cache_mp_write_session(c_entry); 234 if (ws == INVALID_CACHE_MP_WRITE_SESSION) 235 c_mp_ws_response->error_code = -1; 236 else { 237 qstate->mdata = ws; 238 qstate->destroy_func = on_mp_write_session_destroy; 239 240 if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) || 241 (qstate->config_entry->mp_query_timeout.tv_usec != 0)) 242 memcpy(&qstate->timeout, 243 &qstate->config_entry->mp_query_timeout, 244 sizeof(struct timeval)); 245 } 246 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 247 248 fin: 249 qstate->process_func = on_mp_write_session_response_write1; 250 qstate->kevent_watermark = sizeof(int); 251 qstate->kevent_filter = EVFILT_WRITE; 252 253 TRACE_OUT(on_mp_write_session_request_process); 254 return (0); 255 } 256 257 static int 258 on_mp_write_session_response_write1(struct query_state *qstate) 259 { 260 struct cache_mp_write_session_response *c_mp_ws_response; 261 ssize_t result; 262 263 TRACE_IN(on_mp_write_session_response_write1); 264 c_mp_ws_response = get_cache_mp_write_session_response( 265 &qstate->response); 266 result = qstate->write_func(qstate, &c_mp_ws_response->error_code, 267 sizeof(int)); 268 if (result != sizeof(int)) { 269 LOG_ERR_3("on_mp_write_session_response_write1", 270 "write failed"); 271 TRACE_OUT(on_mp_write_session_response_write1); 272 return (-1); 273 } 274 275 if (c_mp_ws_response->error_code == 0) { 276 qstate->kevent_watermark = sizeof(int); 277 qstate->process_func = on_mp_write_session_mapper; 278 qstate->kevent_filter = EVFILT_READ; 279 } else { 280 qstate->kevent_watermark = 0; 281 qstate->process_func = NULL; 282 } 283 TRACE_OUT(on_mp_write_session_response_write1); 284 return (0); 285 } 286 287 /* 288 * Mapper function is used to avoid multiple connections for each session 289 * write or read requests. After processing the request, it does not close 290 * the connection, but waits for the next request. 291 */ 292 static int 293 on_mp_write_session_mapper(struct query_state *qstate) 294 { 295 ssize_t result; 296 int elem_type; 297 298 TRACE_IN(on_mp_write_session_mapper); 299 if (qstate->kevent_watermark == 0) { 300 qstate->kevent_watermark = sizeof(int); 301 } else { 302 result = qstate->read_func(qstate, &elem_type, sizeof(int)); 303 if (result != sizeof(int)) { 304 LOG_ERR_3("on_mp_write_session_mapper", 305 "read failed"); 306 TRACE_OUT(on_mp_write_session_mapper); 307 return (-1); 308 } 309 310 switch (elem_type) { 311 case CET_MP_WRITE_SESSION_WRITE_REQUEST: 312 qstate->kevent_watermark = sizeof(size_t); 313 qstate->process_func = 314 on_mp_write_session_write_request_read1; 315 break; 316 case CET_MP_WRITE_SESSION_ABANDON_NOTIFICATION: 317 qstate->kevent_watermark = 0; 318 qstate->process_func = 319 on_mp_write_session_abandon_notification; 320 break; 321 case CET_MP_WRITE_SESSION_CLOSE_NOTIFICATION: 322 qstate->kevent_watermark = 0; 323 qstate->process_func = 324 on_mp_write_session_close_notification; 325 break; 326 default: 327 qstate->kevent_watermark = 0; 328 qstate->process_func = NULL; 329 LOG_ERR_2("on_mp_write_session_mapper", 330 "unknown element type"); 331 TRACE_OUT(on_mp_write_session_mapper); 332 return (-1); 333 } 334 } 335 TRACE_OUT(on_mp_write_session_mapper); 336 return (0); 337 } 338 339 /* 340 * The functions below are used to process multipart write sessions write 341 * requests. 342 * - on_mp_write_session_write_request_read1 and 343 * on_mp_write_session_write_request_read2 read the request itself 344 * - on_mp_write_session_write_request_process processes it 345 * - on_mp_write_session_write_response_write1 sends the response 346 */ 347 static int 348 on_mp_write_session_write_request_read1(struct query_state *qstate) 349 { 350 struct cache_mp_write_session_write_request *write_request; 351 ssize_t result; 352 353 TRACE_IN(on_mp_write_session_write_request_read1); 354 init_comm_element(&qstate->request, 355 CET_MP_WRITE_SESSION_WRITE_REQUEST); 356 write_request = get_cache_mp_write_session_write_request( 357 &qstate->request); 358 359 result = qstate->read_func(qstate, &write_request->data_size, 360 sizeof(size_t)); 361 362 if (result != sizeof(size_t)) { 363 LOG_ERR_3("on_mp_write_session_write_request_read1", 364 "read failed"); 365 TRACE_OUT(on_mp_write_session_write_request_read1); 366 return (-1); 367 } 368 369 if (BUFSIZE_INVALID(write_request->data_size)) { 370 LOG_ERR_3("on_mp_write_session_write_request_read1", 371 "invalid data_size value"); 372 TRACE_OUT(on_mp_write_session_write_request_read1); 373 return (-1); 374 } 375 376 write_request->data = calloc(1, write_request->data_size); 377 assert(write_request->data != NULL); 378 379 qstate->kevent_watermark = write_request->data_size; 380 qstate->process_func = on_mp_write_session_write_request_read2; 381 TRACE_OUT(on_mp_write_session_write_request_read1); 382 return (0); 383 } 384 385 static int 386 on_mp_write_session_write_request_read2(struct query_state *qstate) 387 { 388 struct cache_mp_write_session_write_request *write_request; 389 ssize_t result; 390 391 TRACE_IN(on_mp_write_session_write_request_read2); 392 write_request = get_cache_mp_write_session_write_request( 393 &qstate->request); 394 395 result = qstate->read_func(qstate, write_request->data, 396 write_request->data_size); 397 398 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 399 LOG_ERR_3("on_mp_write_session_write_request_read2", 400 "read failed"); 401 TRACE_OUT(on_mp_write_session_write_request_read2); 402 return (-1); 403 } 404 405 qstate->kevent_watermark = 0; 406 qstate->process_func = on_mp_write_session_write_request_process; 407 TRACE_OUT(on_mp_write_session_write_request_read2); 408 return (0); 409 } 410 411 static int 412 on_mp_write_session_write_request_process(struct query_state *qstate) 413 { 414 struct cache_mp_write_session_write_request *write_request; 415 struct cache_mp_write_session_write_response *write_response; 416 417 TRACE_IN(on_mp_write_session_write_request_process); 418 init_comm_element(&qstate->response, 419 CET_MP_WRITE_SESSION_WRITE_RESPONSE); 420 write_response = get_cache_mp_write_session_write_response( 421 &qstate->response); 422 write_request = get_cache_mp_write_session_write_request( 423 &qstate->request); 424 425 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 426 write_response->error_code = cache_mp_write( 427 (cache_mp_write_session)qstate->mdata, 428 write_request->data, 429 write_request->data_size); 430 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 431 432 qstate->kevent_watermark = sizeof(int); 433 qstate->process_func = on_mp_write_session_write_response_write1; 434 qstate->kevent_filter = EVFILT_WRITE; 435 436 TRACE_OUT(on_mp_write_session_write_request_process); 437 return (0); 438 } 439 440 static int 441 on_mp_write_session_write_response_write1(struct query_state *qstate) 442 { 443 struct cache_mp_write_session_write_response *write_response; 444 ssize_t result; 445 446 TRACE_IN(on_mp_write_session_write_response_write1); 447 write_response = get_cache_mp_write_session_write_response( 448 &qstate->response); 449 result = qstate->write_func(qstate, &write_response->error_code, 450 sizeof(int)); 451 if (result != sizeof(int)) { 452 LOG_ERR_3("on_mp_write_session_write_response_write1", 453 "write failed"); 454 TRACE_OUT(on_mp_write_session_write_response_write1); 455 return (-1); 456 } 457 458 if (write_response->error_code == 0) { 459 finalize_comm_element(&qstate->request); 460 finalize_comm_element(&qstate->response); 461 462 qstate->kevent_watermark = sizeof(int); 463 qstate->process_func = on_mp_write_session_mapper; 464 qstate->kevent_filter = EVFILT_READ; 465 } else { 466 qstate->kevent_watermark = 0; 467 qstate->process_func = 0; 468 } 469 470 TRACE_OUT(on_mp_write_session_write_response_write1); 471 return (0); 472 } 473 474 /* 475 * Handles abandon notifications. Destroys the session by calling the 476 * abandon_cache_mp_write_session. 477 */ 478 static int 479 on_mp_write_session_abandon_notification(struct query_state *qstate) 480 { 481 TRACE_IN(on_mp_write_session_abandon_notification); 482 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 483 abandon_cache_mp_write_session((cache_mp_write_session)qstate->mdata); 484 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 485 qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION; 486 487 qstate->kevent_watermark = 0; 488 qstate->process_func = NULL; 489 TRACE_OUT(on_mp_write_session_abandon_notification); 490 return (0); 491 } 492 493 /* 494 * Handles close notifications. Commits the session by calling 495 * the close_cache_mp_write_session. 496 */ 497 static int 498 on_mp_write_session_close_notification(struct query_state *qstate) 499 { 500 TRACE_IN(on_mp_write_session_close_notification); 501 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 502 close_cache_mp_write_session((cache_mp_write_session)qstate->mdata); 503 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 504 qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION; 505 506 qstate->kevent_watermark = 0; 507 qstate->process_func = NULL; 508 TRACE_OUT(on_mp_write_session_close_notification); 509 return (0); 510 } 511 512 cache_entry register_new_mp_cache_entry(struct query_state *qstate, 513 const char *dec_cache_entry_name) 514 { 515 cache_entry c_entry; 516 char *en_bkp; 517 518 TRACE_IN(register_new_mp_cache_entry); 519 c_entry = INVALID_CACHE_ENTRY; 520 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 521 522 configuration_lock_wrlock(s_configuration); 523 en_bkp = qstate->config_entry->mp_cache_params.cep.entry_name; 524 qstate->config_entry->mp_cache_params.cep.entry_name = 525 (char *)dec_cache_entry_name; 526 register_cache_entry(s_cache, (struct cache_entry_params *) 527 &qstate->config_entry->mp_cache_params); 528 qstate->config_entry->mp_cache_params.cep.entry_name = en_bkp; 529 configuration_unlock(s_configuration); 530 531 configuration_lock_rdlock(s_configuration); 532 c_entry = find_cache_entry(s_cache, 533 dec_cache_entry_name); 534 configuration_unlock(s_configuration); 535 536 configuration_entry_add_mp_cache_entry(qstate->config_entry, 537 c_entry); 538 539 configuration_unlock_entry(qstate->config_entry, 540 CELT_MULTIPART); 541 542 TRACE_OUT(register_new_mp_cache_entry); 543 return (c_entry); 544 } 545