1 /*- 2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 */ 27 28 #include <sys/cdefs.h> 29 __FBSDID("$FreeBSD$"); 30 31 #include <sys/types.h> 32 #include <sys/event.h> 33 #include <sys/socket.h> 34 #include <sys/time.h> 35 36 #include <assert.h> 37 #include <errno.h> 38 #include <stdio.h> 39 #include <stdlib.h> 40 #include <string.h> 41 42 #include "cachelib.h" 43 #include "config.h" 44 #include "debug.h" 45 #include "log.h" 46 #include "query.h" 47 #include "mp_ws_query.h" 48 #include "singletons.h" 49 50 static int on_mp_write_session_abandon_notification(struct query_state *); 51 static int on_mp_write_session_close_notification(struct query_state *); 52 static void on_mp_write_session_destroy(struct query_state *); 53 static int on_mp_write_session_mapper(struct query_state *); 54 /* int on_mp_write_session_request_read1(struct query_state *); */ 55 static int on_mp_write_session_request_read2(struct query_state *); 56 static int on_mp_write_session_request_process(struct query_state *); 57 static int on_mp_write_session_response_write1(struct query_state *); 58 static int on_mp_write_session_write_request_read1(struct query_state *); 59 static int on_mp_write_session_write_request_read2(struct query_state *); 60 static int on_mp_write_session_write_request_process(struct query_state *); 61 static int on_mp_write_session_write_response_write1(struct query_state *); 62 63 /* 64 * This function is used as the query_state's destroy_func to make the 65 * proper cleanup in case of errors. 66 */ 67 static void 68 on_mp_write_session_destroy(struct query_state *qstate) 69 { 70 71 TRACE_IN(on_mp_write_session_destroy); 72 finalize_comm_element(&qstate->request); 73 finalize_comm_element(&qstate->response); 74 75 if (qstate->mdata != NULL) { 76 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 77 abandon_cache_mp_write_session( 78 (cache_mp_write_session)qstate->mdata); 79 configuration_unlock_entry(qstate->config_entry, 80 CELT_MULTIPART); 81 } 82 TRACE_OUT(on_mp_write_session_destroy); 83 } 84 85 /* 86 * The functions below are used to process multipart write session initiation 87 * requests. 88 * - on_mp_write_session_request_read1 and on_mp_write_session_request_read2 89 * read the request itself 90 * - on_mp_write_session_request_process processes it 91 * - on_mp_write_session_response_write1 sends the response 92 */ 93 int 94 on_mp_write_session_request_read1(struct query_state *qstate) 95 { 96 struct cache_mp_write_session_request *c_mp_ws_request; 97 ssize_t result; 98 99 TRACE_IN(on_mp_write_session_request_read1); 100 if (qstate->kevent_watermark == 0) 101 qstate->kevent_watermark = sizeof(size_t); 102 else { 103 init_comm_element(&qstate->request, 104 CET_MP_WRITE_SESSION_REQUEST); 105 c_mp_ws_request = get_cache_mp_write_session_request( 106 &qstate->request); 107 108 result = qstate->read_func(qstate, 109 &c_mp_ws_request->entry_length, sizeof(size_t)); 110 111 if (result != sizeof(size_t)) { 112 LOG_ERR_3("on_mp_write_session_request_read1", 113 "read failed"); 114 TRACE_OUT(on_mp_write_session_request_read1); 115 return (-1); 116 } 117 118 if (BUFSIZE_INVALID(c_mp_ws_request->entry_length)) { 119 LOG_ERR_3("on_mp_write_session_request_read1", 120 "invalid entry_length value"); 121 TRACE_OUT(on_mp_write_session_request_read1); 122 return (-1); 123 } 124 125 c_mp_ws_request->entry = calloc(1, 126 c_mp_ws_request->entry_length + 1); 127 assert(c_mp_ws_request->entry != NULL); 128 129 qstate->kevent_watermark = c_mp_ws_request->entry_length; 130 qstate->process_func = on_mp_write_session_request_read2; 131 } 132 TRACE_OUT(on_mp_write_session_request_read1); 133 return (0); 134 } 135 136 static int 137 on_mp_write_session_request_read2(struct query_state *qstate) 138 { 139 struct cache_mp_write_session_request *c_mp_ws_request; 140 ssize_t result; 141 142 TRACE_IN(on_mp_write_session_request_read2); 143 c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request); 144 145 result = qstate->read_func(qstate, c_mp_ws_request->entry, 146 c_mp_ws_request->entry_length); 147 148 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 149 LOG_ERR_3("on_mp_write_session_request_read2", 150 "read failed"); 151 TRACE_OUT(on_mp_write_session_request_read2); 152 return (-1); 153 } 154 155 qstate->kevent_watermark = 0; 156 qstate->process_func = on_mp_write_session_request_process; 157 158 TRACE_OUT(on_mp_write_session_request_read2); 159 return (0); 160 } 161 162 static int 163 on_mp_write_session_request_process(struct query_state *qstate) 164 { 165 struct cache_mp_write_session_request *c_mp_ws_request; 166 struct cache_mp_write_session_response *c_mp_ws_response; 167 cache_mp_write_session ws; 168 cache_entry c_entry; 169 char *dec_cache_entry_name; 170 171 TRACE_IN(on_mp_write_session_request_process); 172 init_comm_element(&qstate->response, CET_MP_WRITE_SESSION_RESPONSE); 173 c_mp_ws_response = get_cache_mp_write_session_response( 174 &qstate->response); 175 c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request); 176 177 qstate->config_entry = configuration_find_entry( 178 s_configuration, c_mp_ws_request->entry); 179 if (qstate->config_entry == NULL) { 180 c_mp_ws_response->error_code = ENOENT; 181 182 LOG_ERR_2("write_session_request", 183 "can't find configuration entry '%s'. " 184 "aborting request", c_mp_ws_request->entry); 185 goto fin; 186 } 187 188 if (qstate->config_entry->enabled == 0) { 189 c_mp_ws_response->error_code = EACCES; 190 191 LOG_ERR_2("write_session_request", 192 "configuration entry '%s' is disabled", 193 c_mp_ws_request->entry); 194 goto fin; 195 } 196 197 if (qstate->config_entry->perform_actual_lookups != 0) { 198 c_mp_ws_response->error_code = EOPNOTSUPP; 199 200 LOG_ERR_2("write_session_request", 201 "entry '%s' performs lookups by itself: " 202 "can't write to it", c_mp_ws_request->entry); 203 goto fin; 204 } else { 205 #ifdef NS_NSCD_EID_CHECKING 206 if (check_query_eids(qstate) != 0) { 207 c_mp_ws_response->error_code = EPERM; 208 goto fin; 209 } 210 #endif 211 } 212 213 /* 214 * All multipart entries are separated by their name decorations. 215 * For one configuration entry there will be a lot of multipart 216 * cache entries - each with its own decorated name. 217 */ 218 asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str, 219 qstate->config_entry->mp_cache_params.cep.entry_name); 220 assert(dec_cache_entry_name != NULL); 221 222 configuration_lock_rdlock(s_configuration); 223 c_entry = find_cache_entry(s_cache, 224 dec_cache_entry_name); 225 configuration_unlock(s_configuration); 226 227 if (c_entry == INVALID_CACHE_ENTRY) 228 c_entry = register_new_mp_cache_entry(qstate, 229 dec_cache_entry_name); 230 231 free(dec_cache_entry_name); 232 233 assert(c_entry != NULL); 234 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 235 ws = open_cache_mp_write_session(c_entry); 236 if (ws == INVALID_CACHE_MP_WRITE_SESSION) 237 c_mp_ws_response->error_code = -1; 238 else { 239 qstate->mdata = ws; 240 qstate->destroy_func = on_mp_write_session_destroy; 241 242 if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) || 243 (qstate->config_entry->mp_query_timeout.tv_usec != 0)) 244 memcpy(&qstate->timeout, 245 &qstate->config_entry->mp_query_timeout, 246 sizeof(struct timeval)); 247 } 248 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 249 250 fin: 251 qstate->process_func = on_mp_write_session_response_write1; 252 qstate->kevent_watermark = sizeof(int); 253 qstate->kevent_filter = EVFILT_WRITE; 254 255 TRACE_OUT(on_mp_write_session_request_process); 256 return (0); 257 } 258 259 static int 260 on_mp_write_session_response_write1(struct query_state *qstate) 261 { 262 struct cache_mp_write_session_response *c_mp_ws_response; 263 ssize_t result; 264 265 TRACE_IN(on_mp_write_session_response_write1); 266 c_mp_ws_response = get_cache_mp_write_session_response( 267 &qstate->response); 268 result = qstate->write_func(qstate, &c_mp_ws_response->error_code, 269 sizeof(int)); 270 if (result != sizeof(int)) { 271 LOG_ERR_3("on_mp_write_session_response_write1", 272 "write failed"); 273 TRACE_OUT(on_mp_write_session_response_write1); 274 return (-1); 275 } 276 277 if (c_mp_ws_response->error_code == 0) { 278 qstate->kevent_watermark = sizeof(int); 279 qstate->process_func = on_mp_write_session_mapper; 280 qstate->kevent_filter = EVFILT_READ; 281 } else { 282 qstate->kevent_watermark = 0; 283 qstate->process_func = NULL; 284 } 285 TRACE_OUT(on_mp_write_session_response_write1); 286 return (0); 287 } 288 289 /* 290 * Mapper function is used to avoid multiple connections for each session 291 * write or read requests. After processing the request, it does not close 292 * the connection, but waits for the next request. 293 */ 294 static int 295 on_mp_write_session_mapper(struct query_state *qstate) 296 { 297 ssize_t result; 298 int elem_type; 299 300 TRACE_IN(on_mp_write_session_mapper); 301 if (qstate->kevent_watermark == 0) { 302 qstate->kevent_watermark = sizeof(int); 303 } else { 304 result = qstate->read_func(qstate, &elem_type, sizeof(int)); 305 if (result != sizeof(int)) { 306 LOG_ERR_3("on_mp_write_session_mapper", 307 "read failed"); 308 TRACE_OUT(on_mp_write_session_mapper); 309 return (-1); 310 } 311 312 switch (elem_type) { 313 case CET_MP_WRITE_SESSION_WRITE_REQUEST: 314 qstate->kevent_watermark = sizeof(size_t); 315 qstate->process_func = 316 on_mp_write_session_write_request_read1; 317 break; 318 case CET_MP_WRITE_SESSION_ABANDON_NOTIFICATION: 319 qstate->kevent_watermark = 0; 320 qstate->process_func = 321 on_mp_write_session_abandon_notification; 322 break; 323 case CET_MP_WRITE_SESSION_CLOSE_NOTIFICATION: 324 qstate->kevent_watermark = 0; 325 qstate->process_func = 326 on_mp_write_session_close_notification; 327 break; 328 default: 329 qstate->kevent_watermark = 0; 330 qstate->process_func = NULL; 331 LOG_ERR_2("on_mp_write_session_mapper", 332 "unknown element type"); 333 TRACE_OUT(on_mp_write_session_mapper); 334 return (-1); 335 } 336 } 337 TRACE_OUT(on_mp_write_session_mapper); 338 return (0); 339 } 340 341 /* 342 * The functions below are used to process multipart write sessions write 343 * requests. 344 * - on_mp_write_session_write_request_read1 and 345 * on_mp_write_session_write_request_read2 read the request itself 346 * - on_mp_write_session_write_request_process processes it 347 * - on_mp_write_session_write_response_write1 sends the response 348 */ 349 static int 350 on_mp_write_session_write_request_read1(struct query_state *qstate) 351 { 352 struct cache_mp_write_session_write_request *write_request; 353 ssize_t result; 354 355 TRACE_IN(on_mp_write_session_write_request_read1); 356 init_comm_element(&qstate->request, 357 CET_MP_WRITE_SESSION_WRITE_REQUEST); 358 write_request = get_cache_mp_write_session_write_request( 359 &qstate->request); 360 361 result = qstate->read_func(qstate, &write_request->data_size, 362 sizeof(size_t)); 363 364 if (result != sizeof(size_t)) { 365 LOG_ERR_3("on_mp_write_session_write_request_read1", 366 "read failed"); 367 TRACE_OUT(on_mp_write_session_write_request_read1); 368 return (-1); 369 } 370 371 if (BUFSIZE_INVALID(write_request->data_size)) { 372 LOG_ERR_3("on_mp_write_session_write_request_read1", 373 "invalid data_size value"); 374 TRACE_OUT(on_mp_write_session_write_request_read1); 375 return (-1); 376 } 377 378 write_request->data = calloc(1, write_request->data_size); 379 assert(write_request->data != NULL); 380 381 qstate->kevent_watermark = write_request->data_size; 382 qstate->process_func = on_mp_write_session_write_request_read2; 383 TRACE_OUT(on_mp_write_session_write_request_read1); 384 return (0); 385 } 386 387 static int 388 on_mp_write_session_write_request_read2(struct query_state *qstate) 389 { 390 struct cache_mp_write_session_write_request *write_request; 391 ssize_t result; 392 393 TRACE_IN(on_mp_write_session_write_request_read2); 394 write_request = get_cache_mp_write_session_write_request( 395 &qstate->request); 396 397 result = qstate->read_func(qstate, write_request->data, 398 write_request->data_size); 399 400 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 401 LOG_ERR_3("on_mp_write_session_write_request_read2", 402 "read failed"); 403 TRACE_OUT(on_mp_write_session_write_request_read2); 404 return (-1); 405 } 406 407 qstate->kevent_watermark = 0; 408 qstate->process_func = on_mp_write_session_write_request_process; 409 TRACE_OUT(on_mp_write_session_write_request_read2); 410 return (0); 411 } 412 413 static int 414 on_mp_write_session_write_request_process(struct query_state *qstate) 415 { 416 struct cache_mp_write_session_write_request *write_request; 417 struct cache_mp_write_session_write_response *write_response; 418 419 TRACE_IN(on_mp_write_session_write_request_process); 420 init_comm_element(&qstate->response, 421 CET_MP_WRITE_SESSION_WRITE_RESPONSE); 422 write_response = get_cache_mp_write_session_write_response( 423 &qstate->response); 424 write_request = get_cache_mp_write_session_write_request( 425 &qstate->request); 426 427 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 428 write_response->error_code = cache_mp_write( 429 (cache_mp_write_session)qstate->mdata, 430 write_request->data, 431 write_request->data_size); 432 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 433 434 qstate->kevent_watermark = sizeof(int); 435 qstate->process_func = on_mp_write_session_write_response_write1; 436 qstate->kevent_filter = EVFILT_WRITE; 437 438 TRACE_OUT(on_mp_write_session_write_request_process); 439 return (0); 440 } 441 442 static int 443 on_mp_write_session_write_response_write1(struct query_state *qstate) 444 { 445 struct cache_mp_write_session_write_response *write_response; 446 ssize_t result; 447 448 TRACE_IN(on_mp_write_session_write_response_write1); 449 write_response = get_cache_mp_write_session_write_response( 450 &qstate->response); 451 result = qstate->write_func(qstate, &write_response->error_code, 452 sizeof(int)); 453 if (result != sizeof(int)) { 454 LOG_ERR_3("on_mp_write_session_write_response_write1", 455 "write failed"); 456 TRACE_OUT(on_mp_write_session_write_response_write1); 457 return (-1); 458 } 459 460 if (write_response->error_code == 0) { 461 finalize_comm_element(&qstate->request); 462 finalize_comm_element(&qstate->response); 463 464 qstate->kevent_watermark = sizeof(int); 465 qstate->process_func = on_mp_write_session_mapper; 466 qstate->kevent_filter = EVFILT_READ; 467 } else { 468 qstate->kevent_watermark = 0; 469 qstate->process_func = 0; 470 } 471 472 TRACE_OUT(on_mp_write_session_write_response_write1); 473 return (0); 474 } 475 476 /* 477 * Handles abandon notifications. Destroys the session by calling the 478 * abandon_cache_mp_write_session. 479 */ 480 static int 481 on_mp_write_session_abandon_notification(struct query_state *qstate) 482 { 483 TRACE_IN(on_mp_write_session_abandon_notification); 484 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 485 abandon_cache_mp_write_session((cache_mp_write_session)qstate->mdata); 486 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 487 qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION; 488 489 qstate->kevent_watermark = 0; 490 qstate->process_func = NULL; 491 TRACE_OUT(on_mp_write_session_abandon_notification); 492 return (0); 493 } 494 495 /* 496 * Handles close notifications. Commits the session by calling 497 * the close_cache_mp_write_session. 498 */ 499 static int 500 on_mp_write_session_close_notification(struct query_state *qstate) 501 { 502 TRACE_IN(on_mp_write_session_close_notification); 503 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 504 close_cache_mp_write_session((cache_mp_write_session)qstate->mdata); 505 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 506 qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION; 507 508 qstate->kevent_watermark = 0; 509 qstate->process_func = NULL; 510 TRACE_OUT(on_mp_write_session_close_notification); 511 return (0); 512 } 513 514 cache_entry register_new_mp_cache_entry(struct query_state *qstate, 515 const char *dec_cache_entry_name) 516 { 517 cache_entry c_entry; 518 char *en_bkp; 519 520 TRACE_IN(register_new_mp_cache_entry); 521 c_entry = INVALID_CACHE_ENTRY; 522 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 523 524 configuration_lock_wrlock(s_configuration); 525 en_bkp = qstate->config_entry->mp_cache_params.cep.entry_name; 526 qstate->config_entry->mp_cache_params.cep.entry_name = 527 (char *)dec_cache_entry_name; 528 register_cache_entry(s_cache, (struct cache_entry_params *) 529 &qstate->config_entry->mp_cache_params); 530 qstate->config_entry->mp_cache_params.cep.entry_name = en_bkp; 531 configuration_unlock(s_configuration); 532 533 configuration_lock_rdlock(s_configuration); 534 c_entry = find_cache_entry(s_cache, 535 dec_cache_entry_name); 536 configuration_unlock(s_configuration); 537 538 configuration_entry_add_mp_cache_entry(qstate->config_entry, 539 c_entry); 540 541 configuration_unlock_entry(qstate->config_entry, 542 CELT_MULTIPART); 543 544 TRACE_OUT(register_new_mp_cache_entry); 545 return (c_entry); 546 } 547