1 /* 2 * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu> 3 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 #include "event2/event-config.h" 28 #include "evconfig-private.h" 29 30 #ifdef _WIN32 31 #define WIN32_LEAN_AND_MEAN 32 #include <winsock2.h> 33 #include <windows.h> 34 #undef WIN32_LEAN_AND_MEAN 35 #endif 36 37 #include <sys/types.h> 38 #ifndef _WIN32 39 #include <sys/socket.h> 40 #endif 41 #ifdef EVENT__HAVE_SYS_TIME_H 42 #include <sys/time.h> 43 #endif 44 #include <sys/queue.h> 45 #include <stdio.h> 46 #include <stdlib.h> 47 #ifndef _WIN32 48 #include <unistd.h> 49 #endif 50 #include <errno.h> 51 #include <signal.h> 52 #include <string.h> 53 54 #include <sys/queue.h> 55 56 #include "event2/event.h" 57 #include "event2/event_struct.h" 58 #include "event2/rpc.h" 59 #include "event2/rpc_struct.h" 60 #include "evrpc-internal.h" 61 #include "event2/http.h" 62 #include "event2/buffer.h" 63 #include "event2/tag.h" 64 #include "event2/http_struct.h" 65 #include "event2/http_compat.h" 66 #include "event2/util.h" 67 #include "util-internal.h" 68 #include "log-internal.h" 69 #include "mm-internal.h" 70 71 struct evrpc_base * 72 evrpc_init(struct evhttp *http_server) 73 { 74 struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base)); 75 if (base == NULL) 76 return (NULL); 77 78 /* we rely on the tagging sub system */ 79 evtag_init(); 80 81 TAILQ_INIT(&base->registered_rpcs); 82 TAILQ_INIT(&base->input_hooks); 83 TAILQ_INIT(&base->output_hooks); 84 85 TAILQ_INIT(&base->paused_requests); 86 87 base->http_server = http_server; 88 89 return (base); 90 } 91 92 void 93 evrpc_free(struct evrpc_base *base) 94 { 95 struct evrpc *rpc; 96 struct evrpc_hook *hook; 97 struct evrpc_hook_ctx *pause; 98 int r; 99 100 while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) { 101 r = evrpc_unregister_rpc(base, rpc->uri); 102 EVUTIL_ASSERT(r == 0); 103 } 104 while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) { 105 TAILQ_REMOVE(&base->paused_requests, pause, next); 106 mm_free(pause); 107 } 108 while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) { 109 r = evrpc_remove_hook(base, EVRPC_INPUT, hook); 110 EVUTIL_ASSERT(r); 111 } 112 while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) { 113 r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook); 114 EVUTIL_ASSERT(r); 115 } 116 mm_free(base); 117 } 118 119 void * 120 evrpc_add_hook(void *vbase, 121 enum EVRPC_HOOK_TYPE hook_type, 122 int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *), 123 void *cb_arg) 124 { 125 struct evrpc_hooks_ *base = vbase; 126 struct evrpc_hook_list *head = NULL; 127 struct evrpc_hook *hook = NULL; 128 switch (hook_type) { 129 case EVRPC_INPUT: 130 head = &base->in_hooks; 131 break; 132 case EVRPC_OUTPUT: 133 head = &base->out_hooks; 134 break; 135 default: 136 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 137 } 138 139 hook = mm_calloc(1, sizeof(struct evrpc_hook)); 140 EVUTIL_ASSERT(hook != NULL); 141 142 hook->process = cb; 143 hook->process_arg = cb_arg; 144 TAILQ_INSERT_TAIL(head, hook, next); 145 146 return (hook); 147 } 148 149 static int 150 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle) 151 { 152 struct evrpc_hook *hook = NULL; 153 TAILQ_FOREACH(hook, head, next) { 154 if (hook == handle) { 155 TAILQ_REMOVE(head, hook, next); 156 mm_free(hook); 157 return (1); 158 } 159 } 160 161 return (0); 162 } 163 164 /* 165 * remove the hook specified by the handle 166 */ 167 168 int 169 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle) 170 { 171 struct evrpc_hooks_ *base = vbase; 172 struct evrpc_hook_list *head = NULL; 173 switch (hook_type) { 174 case EVRPC_INPUT: 175 head = &base->in_hooks; 176 break; 177 case EVRPC_OUTPUT: 178 head = &base->out_hooks; 179 break; 180 default: 181 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 182 } 183 184 return (evrpc_remove_hook_internal(head, handle)); 185 } 186 187 static int 188 evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx, 189 struct evhttp_request *req, struct evbuffer *evbuf) 190 { 191 struct evrpc_hook *hook; 192 TAILQ_FOREACH(hook, head, next) { 193 int res = hook->process(ctx, req, evbuf, hook->process_arg); 194 if (res != EVRPC_CONTINUE) 195 return (res); 196 } 197 198 return (EVRPC_CONTINUE); 199 } 200 201 static void evrpc_pool_schedule(struct evrpc_pool *pool); 202 static void evrpc_request_cb(struct evhttp_request *, void *); 203 204 /* 205 * Registers a new RPC with the HTTP server. The evrpc object is expected 206 * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn 207 * calls this function. 208 */ 209 210 static char * 211 evrpc_construct_uri(const char *uri) 212 { 213 char *constructed_uri; 214 size_t constructed_uri_len; 215 216 constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1; 217 if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL) 218 event_err(1, "%s: failed to register rpc at %s", 219 __func__, uri); 220 memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX)); 221 memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri)); 222 constructed_uri[constructed_uri_len - 1] = '\0'; 223 224 return (constructed_uri); 225 } 226 227 int 228 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, 229 void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg) 230 { 231 char *constructed_uri = evrpc_construct_uri(rpc->uri); 232 233 rpc->base = base; 234 rpc->cb = cb; 235 rpc->cb_arg = cb_arg; 236 237 TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next); 238 239 evhttp_set_cb(base->http_server, 240 constructed_uri, 241 evrpc_request_cb, 242 rpc); 243 244 mm_free(constructed_uri); 245 246 return (0); 247 } 248 249 int 250 evrpc_unregister_rpc(struct evrpc_base *base, const char *name) 251 { 252 char *registered_uri = NULL; 253 struct evrpc *rpc; 254 int r; 255 256 /* find the right rpc; linear search might be slow */ 257 TAILQ_FOREACH(rpc, &base->registered_rpcs, next) { 258 if (strcmp(rpc->uri, name) == 0) 259 break; 260 } 261 if (rpc == NULL) { 262 /* We did not find an RPC with this name */ 263 return (-1); 264 } 265 TAILQ_REMOVE(&base->registered_rpcs, rpc, next); 266 267 registered_uri = evrpc_construct_uri(name); 268 269 /* remove the http server callback */ 270 r = evhttp_del_cb(base->http_server, registered_uri); 271 EVUTIL_ASSERT(r == 0); 272 273 mm_free(registered_uri); 274 275 mm_free((char *)rpc->uri); 276 mm_free(rpc); 277 return (0); 278 } 279 280 static int evrpc_pause_request(void *vbase, void *ctx, 281 void (*cb)(void *, enum EVRPC_HOOK_RESULT)); 282 static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT); 283 284 static void 285 evrpc_request_cb(struct evhttp_request *req, void *arg) 286 { 287 struct evrpc *rpc = arg; 288 struct evrpc_req_generic *rpc_state = NULL; 289 290 /* let's verify the outside parameters */ 291 if (req->type != EVHTTP_REQ_POST || 292 evbuffer_get_length(req->input_buffer) <= 0) 293 goto error; 294 295 rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic)); 296 if (rpc_state == NULL) 297 goto error; 298 rpc_state->rpc = rpc; 299 rpc_state->http_req = req; 300 rpc_state->rpc_data = NULL; 301 302 if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) { 303 int hook_res; 304 305 evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 306 307 /* 308 * allow hooks to modify the outgoing request 309 */ 310 hook_res = evrpc_process_hooks(&rpc->base->input_hooks, 311 rpc_state, req, req->input_buffer); 312 switch (hook_res) { 313 case EVRPC_TERMINATE: 314 goto error; 315 case EVRPC_PAUSE: 316 evrpc_pause_request(rpc->base, rpc_state, 317 evrpc_request_cb_closure); 318 return; 319 case EVRPC_CONTINUE: 320 break; 321 default: 322 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 323 hook_res == EVRPC_CONTINUE || 324 hook_res == EVRPC_PAUSE); 325 } 326 } 327 328 evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE); 329 return; 330 331 error: 332 if (rpc_state) 333 evrpc_reqstate_free_(rpc_state); 334 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 335 return; 336 } 337 338 static void 339 evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 340 { 341 struct evrpc_req_generic *rpc_state = arg; 342 struct evrpc *rpc; 343 struct evhttp_request *req; 344 345 EVUTIL_ASSERT(rpc_state); 346 rpc = rpc_state->rpc; 347 req = rpc_state->http_req; 348 349 if (hook_res == EVRPC_TERMINATE) 350 goto error; 351 352 /* let's check that we can parse the request */ 353 rpc_state->request = rpc->request_new(rpc->request_new_arg); 354 if (rpc_state->request == NULL) 355 goto error; 356 357 if (rpc->request_unmarshal( 358 rpc_state->request, req->input_buffer) == -1) { 359 /* we failed to parse the request; that's a bummer */ 360 goto error; 361 } 362 363 /* at this point, we have a well formed request, prepare the reply */ 364 365 rpc_state->reply = rpc->reply_new(rpc->reply_new_arg); 366 if (rpc_state->reply == NULL) 367 goto error; 368 369 /* give the rpc to the user; they can deal with it */ 370 rpc->cb(rpc_state, rpc->cb_arg); 371 372 return; 373 374 error: 375 evrpc_reqstate_free_(rpc_state); 376 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 377 return; 378 } 379 380 381 void 382 evrpc_reqstate_free_(struct evrpc_req_generic* rpc_state) 383 { 384 struct evrpc *rpc; 385 EVUTIL_ASSERT(rpc_state != NULL); 386 rpc = rpc_state->rpc; 387 388 /* clean up all memory */ 389 if (rpc_state->hook_meta != NULL) 390 evrpc_hook_context_free_(rpc_state->hook_meta); 391 if (rpc_state->request != NULL) 392 rpc->request_free(rpc_state->request); 393 if (rpc_state->reply != NULL) 394 rpc->reply_free(rpc_state->reply); 395 if (rpc_state->rpc_data != NULL) 396 evbuffer_free(rpc_state->rpc_data); 397 mm_free(rpc_state); 398 } 399 400 static void 401 evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT); 402 403 void 404 evrpc_request_done(struct evrpc_req_generic *rpc_state) 405 { 406 struct evhttp_request *req; 407 struct evrpc *rpc; 408 409 EVUTIL_ASSERT(rpc_state); 410 411 req = rpc_state->http_req; 412 rpc = rpc_state->rpc; 413 414 if (rpc->reply_complete(rpc_state->reply) == -1) { 415 /* the reply was not completely filled in. error out */ 416 goto error; 417 } 418 419 if ((rpc_state->rpc_data = evbuffer_new()) == NULL) { 420 /* out of memory */ 421 goto error; 422 } 423 424 /* serialize the reply */ 425 rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); 426 427 if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) { 428 int hook_res; 429 430 evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 431 432 /* do hook based tweaks to the request */ 433 hook_res = evrpc_process_hooks(&rpc->base->output_hooks, 434 rpc_state, req, rpc_state->rpc_data); 435 switch (hook_res) { 436 case EVRPC_TERMINATE: 437 goto error; 438 case EVRPC_PAUSE: 439 if (evrpc_pause_request(rpc->base, rpc_state, 440 evrpc_request_done_closure) == -1) 441 goto error; 442 return; 443 case EVRPC_CONTINUE: 444 break; 445 default: 446 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 447 hook_res == EVRPC_CONTINUE || 448 hook_res == EVRPC_PAUSE); 449 } 450 } 451 452 evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE); 453 return; 454 455 error: 456 evrpc_reqstate_free_(rpc_state); 457 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 458 return; 459 } 460 461 void * 462 evrpc_get_request(struct evrpc_req_generic *req) 463 { 464 return req->request; 465 } 466 467 void * 468 evrpc_get_reply(struct evrpc_req_generic *req) 469 { 470 return req->reply; 471 } 472 473 static void 474 evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 475 { 476 struct evrpc_req_generic *rpc_state = arg; 477 struct evhttp_request *req; 478 EVUTIL_ASSERT(rpc_state); 479 req = rpc_state->http_req; 480 481 if (hook_res == EVRPC_TERMINATE) 482 goto error; 483 484 /* on success, we are going to transmit marshaled binary data */ 485 if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) { 486 evhttp_add_header(req->output_headers, 487 "Content-Type", "application/octet-stream"); 488 } 489 evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data); 490 491 evrpc_reqstate_free_(rpc_state); 492 493 return; 494 495 error: 496 evrpc_reqstate_free_(rpc_state); 497 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 498 return; 499 } 500 501 502 /* Client implementation of RPC site */ 503 504 static int evrpc_schedule_request(struct evhttp_connection *connection, 505 struct evrpc_request_wrapper *ctx); 506 507 struct evrpc_pool * 508 evrpc_pool_new(struct event_base *base) 509 { 510 struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool)); 511 if (pool == NULL) 512 return (NULL); 513 514 TAILQ_INIT(&pool->connections); 515 TAILQ_INIT(&pool->requests); 516 517 TAILQ_INIT(&pool->paused_requests); 518 519 TAILQ_INIT(&pool->input_hooks); 520 TAILQ_INIT(&pool->output_hooks); 521 522 pool->base = base; 523 pool->timeout = -1; 524 525 return (pool); 526 } 527 528 static void 529 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request) 530 { 531 if (request->hook_meta != NULL) 532 evrpc_hook_context_free_(request->hook_meta); 533 mm_free(request->name); 534 mm_free(request); 535 } 536 537 void 538 evrpc_pool_free(struct evrpc_pool *pool) 539 { 540 struct evhttp_connection *connection; 541 struct evrpc_request_wrapper *request; 542 struct evrpc_hook_ctx *pause; 543 struct evrpc_hook *hook; 544 int r; 545 546 while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { 547 TAILQ_REMOVE(&pool->requests, request, next); 548 evrpc_request_wrapper_free(request); 549 } 550 551 while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) { 552 TAILQ_REMOVE(&pool->paused_requests, pause, next); 553 mm_free(pause); 554 } 555 556 while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { 557 TAILQ_REMOVE(&pool->connections, connection, next); 558 evhttp_connection_free(connection); 559 } 560 561 while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) { 562 r = evrpc_remove_hook(pool, EVRPC_INPUT, hook); 563 EVUTIL_ASSERT(r); 564 } 565 566 while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) { 567 r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook); 568 EVUTIL_ASSERT(r); 569 } 570 571 mm_free(pool); 572 } 573 574 /* 575 * Add a connection to the RPC pool. A request scheduled on the pool 576 * may use any available connection. 577 */ 578 579 void 580 evrpc_pool_add_connection(struct evrpc_pool *pool, 581 struct evhttp_connection *connection) 582 { 583 EVUTIL_ASSERT(connection->http_server == NULL); 584 TAILQ_INSERT_TAIL(&pool->connections, connection, next); 585 586 /* 587 * associate an event base with this connection 588 */ 589 if (pool->base != NULL) 590 evhttp_connection_set_base(connection, pool->base); 591 592 /* 593 * unless a timeout was specifically set for a connection, 594 * the connection inherits the timeout from the pool. 595 */ 596 if (!evutil_timerisset(&connection->timeout)) 597 evhttp_connection_set_timeout(connection, pool->timeout); 598 599 /* 600 * if we have any requests pending, schedule them with the new 601 * connections. 602 */ 603 604 if (TAILQ_FIRST(&pool->requests) != NULL) { 605 struct evrpc_request_wrapper *request = 606 TAILQ_FIRST(&pool->requests); 607 TAILQ_REMOVE(&pool->requests, request, next); 608 evrpc_schedule_request(connection, request); 609 } 610 } 611 612 void 613 evrpc_pool_remove_connection(struct evrpc_pool *pool, 614 struct evhttp_connection *connection) 615 { 616 TAILQ_REMOVE(&pool->connections, connection, next); 617 } 618 619 void 620 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs) 621 { 622 struct evhttp_connection *evcon; 623 TAILQ_FOREACH(evcon, &pool->connections, next) { 624 evhttp_connection_set_timeout(evcon, timeout_in_secs); 625 } 626 pool->timeout = timeout_in_secs; 627 } 628 629 630 static void evrpc_reply_done(struct evhttp_request *, void *); 631 static void evrpc_request_timeout(evutil_socket_t, short, void *); 632 633 /* 634 * Finds a connection object associated with the pool that is currently 635 * idle and can be used to make a request. 636 */ 637 static struct evhttp_connection * 638 evrpc_pool_find_connection(struct evrpc_pool *pool) 639 { 640 struct evhttp_connection *connection; 641 TAILQ_FOREACH(connection, &pool->connections, next) { 642 if (TAILQ_FIRST(&connection->requests) == NULL) 643 return (connection); 644 } 645 646 return (NULL); 647 } 648 649 /* 650 * Prototypes responsible for evrpc scheduling and hooking 651 */ 652 653 static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT); 654 655 /* 656 * We assume that the ctx is no longer queued on the pool. 657 */ 658 static int 659 evrpc_schedule_request(struct evhttp_connection *connection, 660 struct evrpc_request_wrapper *ctx) 661 { 662 struct evhttp_request *req = NULL; 663 struct evrpc_pool *pool = ctx->pool; 664 struct evrpc_status status; 665 666 if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) 667 goto error; 668 669 /* serialize the request data into the output buffer */ 670 ctx->request_marshal(req->output_buffer, ctx->request); 671 672 /* we need to know the connection that we might have to abort */ 673 ctx->evcon = connection; 674 675 /* if we get paused we also need to know the request */ 676 ctx->req = req; 677 678 if (TAILQ_FIRST(&pool->output_hooks) != NULL) { 679 int hook_res; 680 681 evrpc_hook_associate_meta_(&ctx->hook_meta, connection); 682 683 /* apply hooks to the outgoing request */ 684 hook_res = evrpc_process_hooks(&pool->output_hooks, 685 ctx, req, req->output_buffer); 686 687 switch (hook_res) { 688 case EVRPC_TERMINATE: 689 goto error; 690 case EVRPC_PAUSE: 691 /* we need to be explicitly resumed */ 692 if (evrpc_pause_request(pool, ctx, 693 evrpc_schedule_request_closure) == -1) 694 goto error; 695 return (0); 696 case EVRPC_CONTINUE: 697 /* we can just continue */ 698 break; 699 default: 700 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 701 hook_res == EVRPC_CONTINUE || 702 hook_res == EVRPC_PAUSE); 703 } 704 } 705 706 evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE); 707 return (0); 708 709 error: 710 memset(&status, 0, sizeof(status)); 711 status.error = EVRPC_STATUS_ERR_UNSTARTED; 712 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 713 evrpc_request_wrapper_free(ctx); 714 return (-1); 715 } 716 717 static void 718 evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 719 { 720 struct evrpc_request_wrapper *ctx = arg; 721 struct evhttp_connection *connection = ctx->evcon; 722 struct evhttp_request *req = ctx->req; 723 struct evrpc_pool *pool = ctx->pool; 724 struct evrpc_status status; 725 char *uri = NULL; 726 int res = 0; 727 728 if (hook_res == EVRPC_TERMINATE) 729 goto error; 730 731 uri = evrpc_construct_uri(ctx->name); 732 if (uri == NULL) 733 goto error; 734 735 if (pool->timeout > 0) { 736 /* 737 * a timeout after which the whole rpc is going to be aborted. 738 */ 739 struct timeval tv; 740 evutil_timerclear(&tv); 741 tv.tv_sec = pool->timeout; 742 evtimer_add(&ctx->ev_timeout, &tv); 743 } 744 745 /* start the request over the connection */ 746 res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri); 747 mm_free(uri); 748 749 if (res == -1) 750 goto error; 751 752 return; 753 754 error: 755 memset(&status, 0, sizeof(status)); 756 status.error = EVRPC_STATUS_ERR_UNSTARTED; 757 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 758 evrpc_request_wrapper_free(ctx); 759 } 760 761 /* we just queue the paused request on the pool under the req object */ 762 static int 763 evrpc_pause_request(void *vbase, void *ctx, 764 void (*cb)(void *, enum EVRPC_HOOK_RESULT)) 765 { 766 struct evrpc_hooks_ *base = vbase; 767 struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause)); 768 if (pause == NULL) 769 return (-1); 770 771 pause->ctx = ctx; 772 pause->cb = cb; 773 774 TAILQ_INSERT_TAIL(&base->pause_requests, pause, next); 775 return (0); 776 } 777 778 int 779 evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res) 780 { 781 struct evrpc_hooks_ *base = vbase; 782 struct evrpc_pause_list *head = &base->pause_requests; 783 struct evrpc_hook_ctx *pause; 784 785 TAILQ_FOREACH(pause, head, next) { 786 if (pause->ctx == ctx) 787 break; 788 } 789 790 if (pause == NULL) 791 return (-1); 792 793 (*pause->cb)(pause->ctx, res); 794 TAILQ_REMOVE(head, pause, next); 795 mm_free(pause); 796 return (0); 797 } 798 799 int 800 evrpc_make_request(struct evrpc_request_wrapper *ctx) 801 { 802 struct evrpc_pool *pool = ctx->pool; 803 804 /* initialize the event structure for this rpc */ 805 evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx); 806 807 /* we better have some available connections on the pool */ 808 EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL); 809 810 /* 811 * if no connection is available, we queue the request on the pool, 812 * the next time a connection is empty, the rpc will be send on that. 813 */ 814 TAILQ_INSERT_TAIL(&pool->requests, ctx, next); 815 816 evrpc_pool_schedule(pool); 817 818 return (0); 819 } 820 821 822 struct evrpc_request_wrapper * 823 evrpc_make_request_ctx( 824 struct evrpc_pool *pool, void *request, void *reply, 825 const char *rpcname, 826 void (*req_marshal)(struct evbuffer*, void *), 827 void (*rpl_clear)(void *), 828 int (*rpl_unmarshal)(void *, struct evbuffer *), 829 void (*cb)(struct evrpc_status *, void *, void *, void *), 830 void *cbarg) 831 { 832 struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *) 833 mm_malloc(sizeof(struct evrpc_request_wrapper)); 834 if (ctx == NULL) 835 return (NULL); 836 837 ctx->pool = pool; 838 ctx->hook_meta = NULL; 839 ctx->evcon = NULL; 840 ctx->name = mm_strdup(rpcname); 841 if (ctx->name == NULL) { 842 mm_free(ctx); 843 return (NULL); 844 } 845 ctx->cb = cb; 846 ctx->cb_arg = cbarg; 847 ctx->request = request; 848 ctx->reply = reply; 849 ctx->request_marshal = req_marshal; 850 ctx->reply_clear = rpl_clear; 851 ctx->reply_unmarshal = rpl_unmarshal; 852 853 return (ctx); 854 } 855 856 static void 857 evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT); 858 859 static void 860 evrpc_reply_done(struct evhttp_request *req, void *arg) 861 { 862 struct evrpc_request_wrapper *ctx = arg; 863 struct evrpc_pool *pool = ctx->pool; 864 int hook_res = EVRPC_CONTINUE; 865 866 /* cancel any timeout we might have scheduled */ 867 event_del(&ctx->ev_timeout); 868 869 ctx->req = req; 870 871 /* we need to get the reply now */ 872 if (req == NULL) { 873 evrpc_reply_done_closure(ctx, EVRPC_CONTINUE); 874 return; 875 } 876 877 if (TAILQ_FIRST(&pool->input_hooks) != NULL) { 878 evrpc_hook_associate_meta_(&ctx->hook_meta, ctx->evcon); 879 880 /* apply hooks to the incoming request */ 881 hook_res = evrpc_process_hooks(&pool->input_hooks, 882 ctx, req, req->input_buffer); 883 884 switch (hook_res) { 885 case EVRPC_TERMINATE: 886 case EVRPC_CONTINUE: 887 break; 888 case EVRPC_PAUSE: 889 /* 890 * if we get paused we also need to know the 891 * request. unfortunately, the underlying 892 * layer is going to free it. we need to 893 * request ownership explicitly 894 */ 895 evhttp_request_own(req); 896 897 evrpc_pause_request(pool, ctx, 898 evrpc_reply_done_closure); 899 return; 900 default: 901 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 902 hook_res == EVRPC_CONTINUE || 903 hook_res == EVRPC_PAUSE); 904 } 905 } 906 907 evrpc_reply_done_closure(ctx, hook_res); 908 909 /* http request is being freed by underlying layer */ 910 } 911 912 static void 913 evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 914 { 915 struct evrpc_request_wrapper *ctx = arg; 916 struct evhttp_request *req = ctx->req; 917 struct evrpc_pool *pool = ctx->pool; 918 struct evrpc_status status; 919 int res = -1; 920 921 memset(&status, 0, sizeof(status)); 922 status.http_req = req; 923 924 /* we need to get the reply now */ 925 if (req == NULL) { 926 status.error = EVRPC_STATUS_ERR_TIMEOUT; 927 } else if (hook_res == EVRPC_TERMINATE) { 928 status.error = EVRPC_STATUS_ERR_HOOKABORTED; 929 } else { 930 res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); 931 if (res == -1) 932 status.error = EVRPC_STATUS_ERR_BADPAYLOAD; 933 } 934 935 if (res == -1) { 936 /* clear everything that we might have written previously */ 937 ctx->reply_clear(ctx->reply); 938 } 939 940 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 941 942 evrpc_request_wrapper_free(ctx); 943 944 /* the http layer owned the original request structure, but if we 945 * got paused, we asked for ownership and need to free it here. */ 946 if (req != NULL && evhttp_request_is_owned(req)) 947 evhttp_request_free(req); 948 949 /* see if we can schedule another request */ 950 evrpc_pool_schedule(pool); 951 } 952 953 static void 954 evrpc_pool_schedule(struct evrpc_pool *pool) 955 { 956 struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests); 957 struct evhttp_connection *evcon; 958 959 /* if no requests are pending, we have no work */ 960 if (ctx == NULL) 961 return; 962 963 if ((evcon = evrpc_pool_find_connection(pool)) != NULL) { 964 TAILQ_REMOVE(&pool->requests, ctx, next); 965 evrpc_schedule_request(evcon, ctx); 966 } 967 } 968 969 static void 970 evrpc_request_timeout(evutil_socket_t fd, short what, void *arg) 971 { 972 struct evrpc_request_wrapper *ctx = arg; 973 struct evhttp_connection *evcon = ctx->evcon; 974 EVUTIL_ASSERT(evcon != NULL); 975 976 evhttp_connection_fail_(evcon, EVREQ_HTTP_TIMEOUT); 977 } 978 979 /* 980 * frees potential meta data associated with a request. 981 */ 982 983 static void 984 evrpc_meta_data_free(struct evrpc_meta_list *meta_data) 985 { 986 struct evrpc_meta *entry; 987 EVUTIL_ASSERT(meta_data != NULL); 988 989 while ((entry = TAILQ_FIRST(meta_data)) != NULL) { 990 TAILQ_REMOVE(meta_data, entry, next); 991 mm_free(entry->key); 992 mm_free(entry->data); 993 mm_free(entry); 994 } 995 } 996 997 static struct evrpc_hook_meta * 998 evrpc_hook_meta_new_(void) 999 { 1000 struct evrpc_hook_meta *ctx; 1001 ctx = mm_malloc(sizeof(struct evrpc_hook_meta)); 1002 EVUTIL_ASSERT(ctx != NULL); 1003 1004 TAILQ_INIT(&ctx->meta_data); 1005 ctx->evcon = NULL; 1006 1007 return (ctx); 1008 } 1009 1010 static void 1011 evrpc_hook_associate_meta_(struct evrpc_hook_meta **pctx, 1012 struct evhttp_connection *evcon) 1013 { 1014 struct evrpc_hook_meta *ctx = *pctx; 1015 if (ctx == NULL) 1016 *pctx = ctx = evrpc_hook_meta_new_(); 1017 ctx->evcon = evcon; 1018 } 1019 1020 static void 1021 evrpc_hook_context_free_(struct evrpc_hook_meta *ctx) 1022 { 1023 evrpc_meta_data_free(&ctx->meta_data); 1024 mm_free(ctx); 1025 } 1026 1027 /* Adds meta data */ 1028 void 1029 evrpc_hook_add_meta(void *ctx, const char *key, 1030 const void *data, size_t data_size) 1031 { 1032 struct evrpc_request_wrapper *req = ctx; 1033 struct evrpc_hook_meta *store = NULL; 1034 struct evrpc_meta *meta = NULL; 1035 1036 if ((store = req->hook_meta) == NULL) 1037 store = req->hook_meta = evrpc_hook_meta_new_(); 1038 1039 meta = mm_malloc(sizeof(struct evrpc_meta)); 1040 EVUTIL_ASSERT(meta != NULL); 1041 meta->key = mm_strdup(key); 1042 EVUTIL_ASSERT(meta->key != NULL); 1043 meta->data_size = data_size; 1044 meta->data = mm_malloc(data_size); 1045 EVUTIL_ASSERT(meta->data != NULL); 1046 memcpy(meta->data, data, data_size); 1047 1048 TAILQ_INSERT_TAIL(&store->meta_data, meta, next); 1049 } 1050 1051 int 1052 evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size) 1053 { 1054 struct evrpc_request_wrapper *req = ctx; 1055 struct evrpc_meta *meta = NULL; 1056 1057 if (req->hook_meta == NULL) 1058 return (-1); 1059 1060 TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) { 1061 if (strcmp(meta->key, key) == 0) { 1062 *data = meta->data; 1063 *data_size = meta->data_size; 1064 return (0); 1065 } 1066 } 1067 1068 return (-1); 1069 } 1070 1071 struct evhttp_connection * 1072 evrpc_hook_get_connection(void *ctx) 1073 { 1074 struct evrpc_request_wrapper *req = ctx; 1075 return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL); 1076 } 1077 1078 int 1079 evrpc_send_request_generic(struct evrpc_pool *pool, 1080 void *request, void *reply, 1081 void (*cb)(struct evrpc_status *, void *, void *, void *), 1082 void *cb_arg, 1083 const char *rpcname, 1084 void (*req_marshal)(struct evbuffer *, void *), 1085 void (*rpl_clear)(void *), 1086 int (*rpl_unmarshal)(void *, struct evbuffer *)) 1087 { 1088 struct evrpc_status status; 1089 struct evrpc_request_wrapper *ctx; 1090 ctx = evrpc_make_request_ctx(pool, request, reply, 1091 rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg); 1092 if (ctx == NULL) 1093 goto error; 1094 return (evrpc_make_request(ctx)); 1095 error: 1096 memset(&status, 0, sizeof(status)); 1097 status.error = EVRPC_STATUS_ERR_UNSTARTED; 1098 (*(cb))(&status, request, reply, cb_arg); 1099 return (-1); 1100 } 1101 1102 /** Takes a request object and fills it in with the right magic */ 1103 static struct evrpc * 1104 evrpc_register_object(const char *name, 1105 void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *), 1106 int (*req_unmarshal)(void *, struct evbuffer *), 1107 void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *), 1108 int (*rpl_complete)(void *), 1109 void (*rpl_marshal)(struct evbuffer *, void *)) 1110 { 1111 struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc)); 1112 if (rpc == NULL) 1113 return (NULL); 1114 rpc->uri = mm_strdup(name); 1115 if (rpc->uri == NULL) { 1116 mm_free(rpc); 1117 return (NULL); 1118 } 1119 rpc->request_new = req_new; 1120 rpc->request_new_arg = req_new_arg; 1121 rpc->request_free = req_free; 1122 rpc->request_unmarshal = req_unmarshal; 1123 rpc->reply_new = rpl_new; 1124 rpc->reply_new_arg = rpl_new_arg; 1125 rpc->reply_free = rpl_free; 1126 rpc->reply_complete = rpl_complete; 1127 rpc->reply_marshal = rpl_marshal; 1128 return (rpc); 1129 } 1130 1131 int 1132 evrpc_register_generic(struct evrpc_base *base, const char *name, 1133 void (*callback)(struct evrpc_req_generic *, void *), void *cbarg, 1134 void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *), 1135 int (*req_unmarshal)(void *, struct evbuffer *), 1136 void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *), 1137 int (*rpl_complete)(void *), 1138 void (*rpl_marshal)(struct evbuffer *, void *)) 1139 { 1140 struct evrpc* rpc = 1141 evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal, 1142 rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal); 1143 if (rpc == NULL) 1144 return (-1); 1145 evrpc_register_rpc(base, rpc, 1146 (void (*)(struct evrpc_req_generic*, void *))callback, cbarg); 1147 return (0); 1148 } 1149 1150 /** accessors for obscure and undocumented functionality */ 1151 struct evrpc_pool * 1152 evrpc_request_get_pool(struct evrpc_request_wrapper *ctx) 1153 { 1154 return (ctx->pool); 1155 } 1156 1157 void 1158 evrpc_request_set_pool(struct evrpc_request_wrapper *ctx, 1159 struct evrpc_pool *pool) 1160 { 1161 ctx->pool = pool; 1162 } 1163 1164 void 1165 evrpc_request_set_cb(struct evrpc_request_wrapper *ctx, 1166 void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg), 1167 void *cb_arg) 1168 { 1169 ctx->cb = cb; 1170 ctx->cb_arg = cb_arg; 1171 } 1172