1 /*- 2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 */ 27 28 #include <sys/cdefs.h> 29 __FBSDID("$FreeBSD$"); 30 31 #include <sys/socket.h> 32 #include <sys/time.h> 33 #include <sys/types.h> 34 #include <sys/event.h> 35 #include <assert.h> 36 #include <errno.h> 37 #include <stdlib.h> 38 #include <string.h> 39 #include <stdio.h> 40 41 #include "cachelib.h" 42 #include "config.h" 43 #include "debug.h" 44 #include "log.h" 45 #include "query.h" 46 #include "mp_rs_query.h" 47 #include "mp_ws_query.h" 48 #include "singletons.h" 49 50 static int on_mp_read_session_close_notification(struct query_state *); 51 static void on_mp_read_session_destroy(struct query_state *); 52 static int on_mp_read_session_mapper(struct query_state *); 53 /* int on_mp_read_session_request_read1(struct query_state *); */ 54 static int on_mp_read_session_request_read2(struct query_state *); 55 static int on_mp_read_session_request_process(struct query_state *); 56 static int on_mp_read_session_response_write1(struct query_state *); 57 static int on_mp_read_session_read_request_process(struct query_state *); 58 static int on_mp_read_session_read_response_write1(struct query_state *); 59 static int on_mp_read_session_read_response_write2(struct query_state *); 60 61 /* 62 * This function is used as the query_state's destroy_func to make the 63 * proper cleanup in case of errors. 64 */ 65 static void 66 on_mp_read_session_destroy(struct query_state *qstate) 67 { 68 TRACE_IN(on_mp_read_session_destroy); 69 finalize_comm_element(&qstate->request); 70 finalize_comm_element(&qstate->response); 71 72 if (qstate->mdata != NULL) { 73 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 74 close_cache_mp_read_session( 75 (cache_mp_read_session)qstate->mdata); 76 configuration_unlock_entry(qstate->config_entry, 77 CELT_MULTIPART); 78 } 79 TRACE_OUT(on_mp_read_session_destroy); 80 } 81 82 /* 83 * The functions below are used to process multipart read session initiation 84 * requests. 85 * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read 86 * the request itself 87 * - on_mp_read_session_request_process processes it 88 * - on_mp_read_session_response_write1 sends the response 89 */ 90 int 91 on_mp_read_session_request_read1(struct query_state *qstate) 92 { 93 struct cache_mp_read_session_request *c_mp_rs_request; 94 ssize_t result; 95 96 TRACE_IN(on_mp_read_session_request_read1); 97 if (qstate->kevent_watermark == 0) 98 qstate->kevent_watermark = sizeof(size_t); 99 else { 100 init_comm_element(&qstate->request, 101 CET_MP_READ_SESSION_REQUEST); 102 c_mp_rs_request = get_cache_mp_read_session_request( 103 &qstate->request); 104 105 result = qstate->read_func(qstate, 106 &c_mp_rs_request->entry_length, sizeof(size_t)); 107 108 if (result != sizeof(size_t)) { 109 TRACE_OUT(on_mp_read_session_request_read1); 110 return (-1); 111 } 112 113 if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) { 114 TRACE_OUT(on_mp_read_session_request_read1); 115 return (-1); 116 } 117 118 c_mp_rs_request->entry = (char *)malloc( 119 c_mp_rs_request->entry_length + 1); 120 assert(c_mp_rs_request->entry != NULL); 121 memset(c_mp_rs_request->entry, 0, 122 c_mp_rs_request->entry_length + 1); 123 124 qstate->kevent_watermark = c_mp_rs_request->entry_length; 125 qstate->process_func = on_mp_read_session_request_read2; 126 } 127 TRACE_OUT(on_mp_read_session_request_read1); 128 return (0); 129 } 130 131 static int 132 on_mp_read_session_request_read2(struct query_state *qstate) 133 { 134 struct cache_mp_read_session_request *c_mp_rs_request; 135 ssize_t result; 136 137 TRACE_IN(on_mp_read_session_request_read2); 138 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request); 139 140 result = qstate->read_func(qstate, c_mp_rs_request->entry, 141 c_mp_rs_request->entry_length); 142 143 if (result != qstate->kevent_watermark) { 144 LOG_ERR_3("on_mp_read_session_request_read2", 145 "read failed"); 146 TRACE_OUT(on_mp_read_session_request_read2); 147 return (-1); 148 } 149 150 qstate->kevent_watermark = 0; 151 qstate->process_func = on_mp_read_session_request_process; 152 TRACE_OUT(on_mp_read_session_request_read2); 153 return (0); 154 } 155 156 static int 157 on_mp_read_session_request_process(struct query_state *qstate) 158 { 159 struct cache_mp_read_session_request *c_mp_rs_request; 160 struct cache_mp_read_session_response *c_mp_rs_response; 161 cache_mp_read_session rs; 162 cache_entry c_entry; 163 char *dec_cache_entry_name; 164 165 char *buffer; 166 size_t buffer_size; 167 cache_mp_write_session ws; 168 struct agent *lookup_agent; 169 struct multipart_agent *mp_agent; 170 void *mdata; 171 int res; 172 173 TRACE_IN(on_mp_read_session_request_process); 174 init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE); 175 c_mp_rs_response = get_cache_mp_read_session_response( 176 &qstate->response); 177 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request); 178 179 qstate->config_entry = configuration_find_entry( 180 s_configuration, c_mp_rs_request->entry); 181 if (qstate->config_entry == NULL) { 182 c_mp_rs_response->error_code = ENOENT; 183 184 LOG_ERR_2("read_session_request", 185 "can't find configuration entry '%s'." 186 " aborting request", c_mp_rs_request->entry); 187 goto fin; 188 } 189 190 if (qstate->config_entry->enabled == 0) { 191 c_mp_rs_response->error_code = EACCES; 192 193 LOG_ERR_2("read_session_request", 194 "configuration entry '%s' is disabled", 195 c_mp_rs_request->entry); 196 goto fin; 197 } 198 199 if (qstate->config_entry->perform_actual_lookups != 0) 200 dec_cache_entry_name = strdup( 201 qstate->config_entry->mp_cache_params.entry_name); 202 else { 203 #ifdef NS_CACHED_EID_CHECKING 204 if (check_query_eids(qstate) != 0) { 205 c_mp_rs_response->error_code = EPERM; 206 goto fin; 207 } 208 #endif 209 210 asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str, 211 qstate->config_entry->mp_cache_params.entry_name); 212 } 213 214 assert(dec_cache_entry_name != NULL); 215 216 configuration_lock_rdlock(s_configuration); 217 c_entry = find_cache_entry(s_cache, dec_cache_entry_name); 218 configuration_unlock(s_configuration); 219 220 if ((c_entry == INVALID_CACHE) && 221 (qstate->config_entry->perform_actual_lookups != 0)) 222 c_entry = register_new_mp_cache_entry(qstate, 223 dec_cache_entry_name); 224 225 free(dec_cache_entry_name); 226 227 if (c_entry != INVALID_CACHE_ENTRY) { 228 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 229 rs = open_cache_mp_read_session(c_entry); 230 configuration_unlock_entry(qstate->config_entry, 231 CELT_MULTIPART); 232 233 if ((rs == INVALID_CACHE_MP_READ_SESSION) && 234 (qstate->config_entry->perform_actual_lookups != 0)) { 235 lookup_agent = find_agent(s_agent_table, 236 c_mp_rs_request->entry, MULTIPART_AGENT); 237 238 if ((lookup_agent != NULL) && 239 (lookup_agent->type == MULTIPART_AGENT)) { 240 mp_agent = (struct multipart_agent *) 241 lookup_agent; 242 mdata = mp_agent->mp_init_func(); 243 244 /* 245 * Multipart agents read the whole snapshot 246 * of the data at one time. 247 */ 248 configuration_lock_entry(qstate->config_entry, 249 CELT_MULTIPART); 250 ws = open_cache_mp_write_session(c_entry); 251 configuration_unlock_entry(qstate->config_entry, 252 CELT_MULTIPART); 253 if (ws != NULL) { 254 do { 255 buffer = NULL; 256 res = mp_agent->mp_lookup_func(&buffer, 257 &buffer_size, 258 mdata); 259 260 if ((res & NS_TERMINATE) && 261 (buffer != NULL)) { 262 configuration_lock_entry( 263 qstate->config_entry, 264 CELT_MULTIPART); 265 if (cache_mp_write(ws, buffer, 266 buffer_size) != 0) { 267 abandon_cache_mp_write_session(ws); 268 ws = NULL; 269 } 270 configuration_unlock_entry( 271 qstate->config_entry, 272 CELT_MULTIPART); 273 274 free(buffer); 275 buffer = NULL; 276 } else { 277 configuration_lock_entry( 278 qstate->config_entry, 279 CELT_MULTIPART); 280 close_cache_mp_write_session(ws); 281 configuration_unlock_entry( 282 qstate->config_entry, 283 CELT_MULTIPART); 284 285 free(buffer); 286 buffer = NULL; 287 } 288 } while ((res & NS_TERMINATE) && 289 (ws != NULL)); 290 } 291 292 configuration_lock_entry(qstate->config_entry, 293 CELT_MULTIPART); 294 rs = open_cache_mp_read_session(c_entry); 295 configuration_unlock_entry(qstate->config_entry, 296 CELT_MULTIPART); 297 } 298 } 299 300 if (rs == INVALID_CACHE_MP_READ_SESSION) 301 c_mp_rs_response->error_code = -1; 302 else { 303 qstate->mdata = rs; 304 qstate->destroy_func = on_mp_read_session_destroy; 305 306 configuration_lock_entry(qstate->config_entry, 307 CELT_MULTIPART); 308 if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) || 309 (qstate->config_entry->mp_query_timeout.tv_usec != 0)) 310 memcpy(&qstate->timeout, 311 &qstate->config_entry->mp_query_timeout, 312 sizeof(struct timeval)); 313 configuration_unlock_entry(qstate->config_entry, 314 CELT_MULTIPART); 315 } 316 } else 317 c_mp_rs_response->error_code = -1; 318 319 fin: 320 qstate->process_func = on_mp_read_session_response_write1; 321 qstate->kevent_watermark = sizeof(int); 322 qstate->kevent_filter = EVFILT_WRITE; 323 324 TRACE_OUT(on_mp_read_session_request_process); 325 return (0); 326 } 327 328 static int 329 on_mp_read_session_response_write1(struct query_state *qstate) 330 { 331 struct cache_mp_read_session_response *c_mp_rs_response; 332 ssize_t result; 333 334 TRACE_IN(on_mp_read_session_response_write1); 335 c_mp_rs_response = get_cache_mp_read_session_response( 336 &qstate->response); 337 result = qstate->write_func(qstate, &c_mp_rs_response->error_code, 338 sizeof(int)); 339 340 if (result != sizeof(int)) { 341 LOG_ERR_3("on_mp_read_session_response_write1", 342 "write failed"); 343 TRACE_OUT(on_mp_read_session_response_write1); 344 return (-1); 345 } 346 347 if (c_mp_rs_response->error_code == 0) { 348 qstate->kevent_watermark = sizeof(int); 349 qstate->process_func = on_mp_read_session_mapper; 350 qstate->kevent_filter = EVFILT_READ; 351 } else { 352 qstate->kevent_watermark = 0; 353 qstate->process_func = NULL; 354 } 355 TRACE_OUT(on_mp_read_session_response_write1); 356 return (0); 357 } 358 359 /* 360 * Mapper function is used to avoid multiple connections for each session 361 * write or read requests. After processing the request, it does not close 362 * the connection, but waits for the next request. 363 */ 364 static int 365 on_mp_read_session_mapper(struct query_state *qstate) 366 { 367 ssize_t result; 368 int elem_type; 369 370 TRACE_IN(on_mp_read_session_mapper); 371 if (qstate->kevent_watermark == 0) { 372 qstate->kevent_watermark = sizeof(int); 373 } else { 374 result = qstate->read_func(qstate, &elem_type, sizeof(int)); 375 if (result != sizeof(int)) { 376 LOG_ERR_3("on_mp_read_session_mapper", 377 "read failed"); 378 TRACE_OUT(on_mp_read_session_mapper); 379 return (-1); 380 } 381 382 switch (elem_type) { 383 case CET_MP_READ_SESSION_READ_REQUEST: 384 qstate->kevent_watermark = 0; 385 qstate->process_func = 386 on_mp_read_session_read_request_process; 387 break; 388 case CET_MP_READ_SESSION_CLOSE_NOTIFICATION: 389 qstate->kevent_watermark = 0; 390 qstate->process_func = 391 on_mp_read_session_close_notification; 392 break; 393 default: 394 qstate->kevent_watermark = 0; 395 qstate->process_func = NULL; 396 LOG_ERR_3("on_mp_read_session_mapper", 397 "unknown element type"); 398 TRACE_OUT(on_mp_read_session_mapper); 399 return (-1); 400 } 401 } 402 TRACE_OUT(on_mp_read_session_mapper); 403 return (0); 404 } 405 406 /* 407 * The functions below are used to process multipart read sessions read 408 * requests. User doesn't have to pass any kind of data, besides the 409 * request identificator itself. So we don't need any XXX_read functions and 410 * start with the XXX_process function. 411 * - on_mp_read_session_read_request_process processes it 412 * - on_mp_read_session_read_response_write1 and 413 * on_mp_read_session_read_response_write2 sends the response 414 */ 415 static int 416 on_mp_read_session_read_request_process(struct query_state *qstate) 417 { 418 struct cache_mp_read_session_read_response *read_response; 419 420 TRACE_IN(on_mp_read_session_response_process); 421 init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE); 422 read_response = get_cache_mp_read_session_read_response( 423 &qstate->response); 424 425 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 426 read_response->error_code = cache_mp_read( 427 (cache_mp_read_session)qstate->mdata, NULL, 428 &read_response->data_size); 429 430 if (read_response->error_code == 0) { 431 read_response->data = (char *)malloc(read_response->data_size); 432 assert(read_response != NULL); 433 read_response->error_code = cache_mp_read( 434 (cache_mp_read_session)qstate->mdata, 435 read_response->data, 436 &read_response->data_size); 437 } 438 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 439 440 if (read_response->error_code == 0) 441 qstate->kevent_watermark = sizeof(size_t) + sizeof(int); 442 else 443 qstate->kevent_watermark = sizeof(int); 444 qstate->process_func = on_mp_read_session_read_response_write1; 445 qstate->kevent_filter = EVFILT_WRITE; 446 447 TRACE_OUT(on_mp_read_session_response_process); 448 return (0); 449 } 450 451 static int 452 on_mp_read_session_read_response_write1(struct query_state *qstate) 453 { 454 struct cache_mp_read_session_read_response *read_response; 455 ssize_t result; 456 457 TRACE_IN(on_mp_read_session_read_response_write1); 458 read_response = get_cache_mp_read_session_read_response( 459 &qstate->response); 460 461 result = qstate->write_func(qstate, &read_response->error_code, 462 sizeof(int)); 463 if (read_response->error_code == 0) { 464 result += qstate->write_func(qstate, &read_response->data_size, 465 sizeof(size_t)); 466 if (result != qstate->kevent_watermark) { 467 TRACE_OUT(on_mp_read_session_read_response_write1); 468 LOG_ERR_3("on_mp_read_session_read_response_write1", 469 "write failed"); 470 return (-1); 471 } 472 473 qstate->kevent_watermark = read_response->data_size; 474 qstate->process_func = on_mp_read_session_read_response_write2; 475 } else { 476 if (result != qstate->kevent_watermark) { 477 LOG_ERR_3("on_mp_read_session_read_response_write1", 478 "write failed"); 479 TRACE_OUT(on_mp_read_session_read_response_write1); 480 return (-1); 481 } 482 483 qstate->kevent_watermark = 0; 484 qstate->process_func = NULL; 485 } 486 487 TRACE_OUT(on_mp_read_session_read_response_write1); 488 return (0); 489 } 490 491 static int 492 on_mp_read_session_read_response_write2(struct query_state *qstate) 493 { 494 struct cache_mp_read_session_read_response *read_response; 495 ssize_t result; 496 497 TRACE_IN(on_mp_read_session_read_response_write2); 498 read_response = get_cache_mp_read_session_read_response( 499 &qstate->response); 500 result = qstate->write_func(qstate, read_response->data, 501 read_response->data_size); 502 if (result != qstate->kevent_watermark) { 503 LOG_ERR_3("on_mp_read_session_read_response_write2", 504 "write failed"); 505 TRACE_OUT(on_mp_read_session_read_response_write2); 506 return (-1); 507 } 508 509 finalize_comm_element(&qstate->request); 510 finalize_comm_element(&qstate->response); 511 512 qstate->kevent_watermark = sizeof(int); 513 qstate->process_func = on_mp_read_session_mapper; 514 qstate->kevent_filter = EVFILT_READ; 515 516 TRACE_OUT(on_mp_read_session_read_response_write2); 517 return (0); 518 } 519 520 /* 521 * Handles session close notification by calling close_cache_mp_read_session 522 * function. 523 */ 524 static int 525 on_mp_read_session_close_notification(struct query_state *qstate) 526 { 527 528 TRACE_IN(on_mp_read_session_close_notification); 529 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 530 close_cache_mp_read_session((cache_mp_read_session)qstate->mdata); 531 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 532 qstate->mdata = NULL; 533 qstate->kevent_watermark = 0; 534 qstate->process_func = NULL; 535 TRACE_OUT(on_mp_read_session_close_notification); 536 return (0); 537 } 538