1 /*- 2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 */ 27 28 #include <sys/cdefs.h> 29 __FBSDID("$FreeBSD$"); 30 31 #include <sys/socket.h> 32 #include <sys/time.h> 33 #include <sys/types.h> 34 #include <sys/event.h> 35 #include <assert.h> 36 #include <errno.h> 37 #include <stdlib.h> 38 #include <string.h> 39 #include <stdio.h> 40 41 #include "cachelib.h" 42 #include "config.h" 43 #include "debug.h" 44 #include "log.h" 45 #include "query.h" 46 #include "mp_ws_query.h" 47 #include "singletons.h" 48 49 static int on_mp_write_session_abandon_notification(struct query_state *); 50 static int on_mp_write_session_close_notification(struct query_state *); 51 static void on_mp_write_session_destroy(struct query_state *); 52 static int on_mp_write_session_mapper(struct query_state *); 53 /* int on_mp_write_session_request_read1(struct query_state *); */ 54 static int on_mp_write_session_request_read2(struct query_state *); 55 static int on_mp_write_session_request_process(struct query_state *); 56 static int on_mp_write_session_response_write1(struct query_state *); 57 static int on_mp_write_session_write_request_read1(struct query_state *); 58 static int on_mp_write_session_write_request_read2(struct query_state *); 59 static int on_mp_write_session_write_request_process(struct query_state *); 60 static int on_mp_write_session_write_response_write1(struct query_state *); 61 62 /* 63 * This function is used as the query_state's destroy_func to make the 64 * proper cleanup in case of errors. 65 */ 66 static void 67 on_mp_write_session_destroy(struct query_state *qstate) 68 { 69 70 TRACE_IN(on_mp_write_session_destroy); 71 finalize_comm_element(&qstate->request); 72 finalize_comm_element(&qstate->response); 73 74 if (qstate->mdata != NULL) { 75 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 76 abandon_cache_mp_write_session( 77 (cache_mp_write_session)qstate->mdata); 78 configuration_unlock_entry(qstate->config_entry, 79 CELT_MULTIPART); 80 } 81 TRACE_OUT(on_mp_write_session_destroy); 82 } 83 84 /* 85 * The functions below are used to process multipart write session initiation 86 * requests. 87 * - on_mp_write_session_request_read1 and on_mp_write_session_request_read2 88 * read the request itself 89 * - on_mp_write_session_request_process processes it 90 * - on_mp_write_session_response_write1 sends the response 91 */ 92 int 93 on_mp_write_session_request_read1(struct query_state *qstate) 94 { 95 struct cache_mp_write_session_request *c_mp_ws_request; 96 ssize_t result; 97 98 TRACE_IN(on_mp_write_session_request_read1); 99 if (qstate->kevent_watermark == 0) 100 qstate->kevent_watermark = sizeof(size_t); 101 else { 102 init_comm_element(&qstate->request, 103 CET_MP_WRITE_SESSION_REQUEST); 104 c_mp_ws_request = get_cache_mp_write_session_request( 105 &qstate->request); 106 107 result = qstate->read_func(qstate, 108 &c_mp_ws_request->entry_length, sizeof(size_t)); 109 110 if (result != sizeof(size_t)) { 111 LOG_ERR_3("on_mp_write_session_request_read1", 112 "read failed"); 113 TRACE_OUT(on_mp_write_session_request_read1); 114 return (-1); 115 } 116 117 if (BUFSIZE_INVALID(c_mp_ws_request->entry_length)) { 118 LOG_ERR_3("on_mp_write_session_request_read1", 119 "invalid entry_length value"); 120 TRACE_OUT(on_mp_write_session_request_read1); 121 return (-1); 122 } 123 124 c_mp_ws_request->entry = (char *)malloc( 125 c_mp_ws_request->entry_length + 1); 126 assert(c_mp_ws_request->entry != NULL); 127 memset(c_mp_ws_request->entry, 0, 128 c_mp_ws_request->entry_length + 1); 129 130 qstate->kevent_watermark = c_mp_ws_request->entry_length; 131 qstate->process_func = on_mp_write_session_request_read2; 132 } 133 TRACE_OUT(on_mp_write_session_request_read1); 134 return (0); 135 } 136 137 static int 138 on_mp_write_session_request_read2(struct query_state *qstate) 139 { 140 struct cache_mp_write_session_request *c_mp_ws_request; 141 ssize_t result; 142 143 TRACE_IN(on_mp_write_session_request_read2); 144 c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request); 145 146 result = qstate->read_func(qstate, c_mp_ws_request->entry, 147 c_mp_ws_request->entry_length); 148 149 if (result != qstate->kevent_watermark) { 150 LOG_ERR_3("on_mp_write_session_request_read2", 151 "read failed"); 152 TRACE_OUT(on_mp_write_session_request_read2); 153 return (-1); 154 } 155 156 qstate->kevent_watermark = 0; 157 qstate->process_func = on_mp_write_session_request_process; 158 159 TRACE_OUT(on_mp_write_session_request_read2); 160 return (0); 161 } 162 163 static int 164 on_mp_write_session_request_process(struct query_state *qstate) 165 { 166 struct cache_mp_write_session_request *c_mp_ws_request; 167 struct cache_mp_write_session_response *c_mp_ws_response; 168 cache_mp_write_session ws; 169 cache_entry c_entry; 170 char *dec_cache_entry_name; 171 172 TRACE_IN(on_mp_write_session_request_process); 173 init_comm_element(&qstate->response, CET_MP_WRITE_SESSION_RESPONSE); 174 c_mp_ws_response = get_cache_mp_write_session_response( 175 &qstate->response); 176 c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request); 177 178 qstate->config_entry = configuration_find_entry( 179 s_configuration, c_mp_ws_request->entry); 180 if (qstate->config_entry == NULL) { 181 c_mp_ws_response->error_code = ENOENT; 182 183 LOG_ERR_2("write_session_request", 184 "can't find configuration entry '%s'. " 185 "aborting request", c_mp_ws_request->entry); 186 goto fin; 187 } 188 189 if (qstate->config_entry->enabled == 0) { 190 c_mp_ws_response->error_code = EACCES; 191 192 LOG_ERR_2("write_session_request", 193 "configuration entry '%s' is disabled", 194 c_mp_ws_request->entry); 195 goto fin; 196 } 197 198 if (qstate->config_entry->perform_actual_lookups != 0) { 199 c_mp_ws_response->error_code = EOPNOTSUPP; 200 201 LOG_ERR_2("write_session_request", 202 "entry '%s' performs lookups by itself: " 203 "can't write to it", c_mp_ws_request->entry); 204 goto fin; 205 } else { 206 #ifdef NS_NSCD_EID_CHECKING 207 if (check_query_eids(qstate) != 0) { 208 c_mp_ws_response->error_code = EPERM; 209 goto fin; 210 } 211 #endif 212 } 213 214 /* 215 * All multipart entries are separated by their name decorations. 216 * For one configuration entry there will be a lot of multipart 217 * cache entries - each with its own decorated name. 218 */ 219 asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str, 220 qstate->config_entry->mp_cache_params.entry_name); 221 assert(dec_cache_entry_name != NULL); 222 223 configuration_lock_rdlock(s_configuration); 224 c_entry = find_cache_entry(s_cache, 225 dec_cache_entry_name); 226 configuration_unlock(s_configuration); 227 228 if (c_entry == INVALID_CACHE_ENTRY) 229 c_entry = register_new_mp_cache_entry(qstate, 230 dec_cache_entry_name); 231 232 free(dec_cache_entry_name); 233 234 assert(c_entry != NULL); 235 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 236 ws = open_cache_mp_write_session(c_entry); 237 if (ws == INVALID_CACHE_MP_WRITE_SESSION) 238 c_mp_ws_response->error_code = -1; 239 else { 240 qstate->mdata = ws; 241 qstate->destroy_func = on_mp_write_session_destroy; 242 243 if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) || 244 (qstate->config_entry->mp_query_timeout.tv_usec != 0)) 245 memcpy(&qstate->timeout, 246 &qstate->config_entry->mp_query_timeout, 247 sizeof(struct timeval)); 248 } 249 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 250 251 fin: 252 qstate->process_func = on_mp_write_session_response_write1; 253 qstate->kevent_watermark = sizeof(int); 254 qstate->kevent_filter = EVFILT_WRITE; 255 256 TRACE_OUT(on_mp_write_session_request_process); 257 return (0); 258 } 259 260 static int 261 on_mp_write_session_response_write1(struct query_state *qstate) 262 { 263 struct cache_mp_write_session_response *c_mp_ws_response; 264 ssize_t result; 265 266 TRACE_IN(on_mp_write_session_response_write1); 267 c_mp_ws_response = get_cache_mp_write_session_response( 268 &qstate->response); 269 result = qstate->write_func(qstate, &c_mp_ws_response->error_code, 270 sizeof(int)); 271 if (result != sizeof(int)) { 272 LOG_ERR_3("on_mp_write_session_response_write1", 273 "write failed"); 274 TRACE_OUT(on_mp_write_session_response_write1); 275 return (-1); 276 } 277 278 if (c_mp_ws_response->error_code == 0) { 279 qstate->kevent_watermark = sizeof(int); 280 qstate->process_func = on_mp_write_session_mapper; 281 qstate->kevent_filter = EVFILT_READ; 282 } else { 283 qstate->kevent_watermark = 0; 284 qstate->process_func = NULL; 285 } 286 TRACE_OUT(on_mp_write_session_response_write1); 287 return (0); 288 } 289 290 /* 291 * Mapper function is used to avoid multiple connections for each session 292 * write or read requests. After processing the request, it does not close 293 * the connection, but waits for the next request. 294 */ 295 static int 296 on_mp_write_session_mapper(struct query_state *qstate) 297 { 298 ssize_t result; 299 int elem_type; 300 301 TRACE_IN(on_mp_write_session_mapper); 302 if (qstate->kevent_watermark == 0) { 303 qstate->kevent_watermark = sizeof(int); 304 } else { 305 result = qstate->read_func(qstate, &elem_type, sizeof(int)); 306 if (result != sizeof(int)) { 307 LOG_ERR_3("on_mp_write_session_mapper", 308 "read failed"); 309 TRACE_OUT(on_mp_write_session_mapper); 310 return (-1); 311 } 312 313 switch (elem_type) { 314 case CET_MP_WRITE_SESSION_WRITE_REQUEST: 315 qstate->kevent_watermark = sizeof(size_t); 316 qstate->process_func = 317 on_mp_write_session_write_request_read1; 318 break; 319 case CET_MP_WRITE_SESSION_ABANDON_NOTIFICATION: 320 qstate->kevent_watermark = 0; 321 qstate->process_func = 322 on_mp_write_session_abandon_notification; 323 break; 324 case CET_MP_WRITE_SESSION_CLOSE_NOTIFICATION: 325 qstate->kevent_watermark = 0; 326 qstate->process_func = 327 on_mp_write_session_close_notification; 328 break; 329 default: 330 qstate->kevent_watermark = 0; 331 qstate->process_func = NULL; 332 LOG_ERR_2("on_mp_write_session_mapper", 333 "unknown element type"); 334 TRACE_OUT(on_mp_write_session_mapper); 335 return (-1); 336 } 337 } 338 TRACE_OUT(on_mp_write_session_mapper); 339 return (0); 340 } 341 342 /* 343 * The functions below are used to process multipart write sessions write 344 * requests. 345 * - on_mp_write_session_write_request_read1 and 346 * on_mp_write_session_write_request_read2 read the request itself 347 * - on_mp_write_session_write_request_process processes it 348 * - on_mp_write_session_write_response_write1 sends the response 349 */ 350 static int 351 on_mp_write_session_write_request_read1(struct query_state *qstate) 352 { 353 struct cache_mp_write_session_write_request *write_request; 354 ssize_t result; 355 356 TRACE_IN(on_mp_write_session_write_request_read1); 357 init_comm_element(&qstate->request, 358 CET_MP_WRITE_SESSION_WRITE_REQUEST); 359 write_request = get_cache_mp_write_session_write_request( 360 &qstate->request); 361 362 result = qstate->read_func(qstate, &write_request->data_size, 363 sizeof(size_t)); 364 365 if (result != sizeof(size_t)) { 366 LOG_ERR_3("on_mp_write_session_write_request_read1", 367 "read failed"); 368 TRACE_OUT(on_mp_write_session_write_request_read1); 369 return (-1); 370 } 371 372 if (BUFSIZE_INVALID(write_request->data_size)) { 373 LOG_ERR_3("on_mp_write_session_write_request_read1", 374 "invalid data_size value"); 375 TRACE_OUT(on_mp_write_session_write_request_read1); 376 return (-1); 377 } 378 379 write_request->data = (char *)malloc(write_request->data_size); 380 assert(write_request->data != NULL); 381 memset(write_request->data, 0, write_request->data_size); 382 383 qstate->kevent_watermark = write_request->data_size; 384 qstate->process_func = on_mp_write_session_write_request_read2; 385 TRACE_OUT(on_mp_write_session_write_request_read1); 386 return (0); 387 } 388 389 static int 390 on_mp_write_session_write_request_read2(struct query_state *qstate) 391 { 392 struct cache_mp_write_session_write_request *write_request; 393 ssize_t result; 394 395 TRACE_IN(on_mp_write_session_write_request_read2); 396 write_request = get_cache_mp_write_session_write_request( 397 &qstate->request); 398 399 result = qstate->read_func(qstate, write_request->data, 400 write_request->data_size); 401 402 if (result != qstate->kevent_watermark) { 403 LOG_ERR_3("on_mp_write_session_write_request_read2", 404 "read failed"); 405 TRACE_OUT(on_mp_write_session_write_request_read2); 406 return (-1); 407 } 408 409 qstate->kevent_watermark = 0; 410 qstate->process_func = on_mp_write_session_write_request_process; 411 TRACE_OUT(on_mp_write_session_write_request_read2); 412 return (0); 413 } 414 415 static int 416 on_mp_write_session_write_request_process(struct query_state *qstate) 417 { 418 struct cache_mp_write_session_write_request *write_request; 419 struct cache_mp_write_session_write_response *write_response; 420 421 TRACE_IN(on_mp_write_session_write_request_process); 422 init_comm_element(&qstate->response, 423 CET_MP_WRITE_SESSION_WRITE_RESPONSE); 424 write_response = get_cache_mp_write_session_write_response( 425 &qstate->response); 426 write_request = get_cache_mp_write_session_write_request( 427 &qstate->request); 428 429 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 430 write_response->error_code = cache_mp_write( 431 (cache_mp_write_session)qstate->mdata, 432 write_request->data, 433 write_request->data_size); 434 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 435 436 qstate->kevent_watermark = sizeof(int); 437 qstate->process_func = on_mp_write_session_write_response_write1; 438 qstate->kevent_filter = EVFILT_WRITE; 439 440 TRACE_OUT(on_mp_write_session_write_request_process); 441 return (0); 442 } 443 444 static int 445 on_mp_write_session_write_response_write1(struct query_state *qstate) 446 { 447 struct cache_mp_write_session_write_response *write_response; 448 ssize_t result; 449 450 TRACE_IN(on_mp_write_session_write_response_write1); 451 write_response = get_cache_mp_write_session_write_response( 452 &qstate->response); 453 result = qstate->write_func(qstate, &write_response->error_code, 454 sizeof(int)); 455 if (result != sizeof(int)) { 456 LOG_ERR_3("on_mp_write_session_write_response_write1", 457 "write failed"); 458 TRACE_OUT(on_mp_write_session_write_response_write1); 459 return (-1); 460 } 461 462 if (write_response->error_code == 0) { 463 finalize_comm_element(&qstate->request); 464 finalize_comm_element(&qstate->response); 465 466 qstate->kevent_watermark = sizeof(int); 467 qstate->process_func = on_mp_write_session_mapper; 468 qstate->kevent_filter = EVFILT_READ; 469 } else { 470 qstate->kevent_watermark = 0; 471 qstate->process_func = 0; 472 } 473 474 TRACE_OUT(on_mp_write_session_write_response_write1); 475 return (0); 476 } 477 478 /* 479 * Handles abandon notifications. Destroys the session by calling the 480 * abandon_cache_mp_write_session. 481 */ 482 static int 483 on_mp_write_session_abandon_notification(struct query_state *qstate) 484 { 485 TRACE_IN(on_mp_write_session_abandon_notification); 486 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 487 abandon_cache_mp_write_session((cache_mp_write_session)qstate->mdata); 488 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 489 qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION; 490 491 qstate->kevent_watermark = 0; 492 qstate->process_func = NULL; 493 TRACE_OUT(on_mp_write_session_abandon_notification); 494 return (0); 495 } 496 497 /* 498 * Handles close notifications. Commits the session by calling 499 * the close_cache_mp_write_session. 500 */ 501 static int 502 on_mp_write_session_close_notification(struct query_state *qstate) 503 { 504 TRACE_IN(on_mp_write_session_close_notification); 505 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 506 close_cache_mp_write_session((cache_mp_write_session)qstate->mdata); 507 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 508 qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION; 509 510 qstate->kevent_watermark = 0; 511 qstate->process_func = NULL; 512 TRACE_OUT(on_mp_write_session_close_notification); 513 return (0); 514 } 515 516 cache_entry register_new_mp_cache_entry(struct query_state *qstate, 517 const char *dec_cache_entry_name) 518 { 519 cache_entry c_entry; 520 char *en_bkp; 521 522 TRACE_IN(register_new_mp_cache_entry); 523 c_entry = INVALID_CACHE_ENTRY; 524 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 525 526 configuration_lock_wrlock(s_configuration); 527 en_bkp = qstate->config_entry->mp_cache_params.entry_name; 528 qstate->config_entry->mp_cache_params.entry_name = 529 (char *)dec_cache_entry_name; 530 register_cache_entry(s_cache, (struct cache_entry_params *) 531 &qstate->config_entry->mp_cache_params); 532 qstate->config_entry->mp_cache_params.entry_name = en_bkp; 533 configuration_unlock(s_configuration); 534 535 configuration_lock_rdlock(s_configuration); 536 c_entry = find_cache_entry(s_cache, 537 dec_cache_entry_name); 538 configuration_unlock(s_configuration); 539 540 configuration_entry_add_mp_cache_entry(qstate->config_entry, 541 c_entry); 542 543 configuration_unlock_entry(qstate->config_entry, 544 CELT_MULTIPART); 545 546 TRACE_OUT(register_new_mp_cache_entry); 547 return (c_entry); 548 } 549