1 /* 2 * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu> 3 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 #include "event2/event-config.h" 28 #include "evconfig-private.h" 29 30 #ifdef _WIN32 31 #define WIN32_LEAN_AND_MEAN 32 #include <winsock2.h> 33 #include <windows.h> 34 #undef WIN32_LEAN_AND_MEAN 35 #endif 36 37 #include <sys/types.h> 38 #ifndef _WIN32 39 #include <sys/socket.h> 40 #endif 41 #ifdef EVENT__HAVE_SYS_TIME_H 42 #include <sys/time.h> 43 #endif 44 #include <sys/queue.h> 45 #include <stdio.h> 46 #include <stdlib.h> 47 #ifndef _WIN32 48 #include <unistd.h> 49 #endif 50 #include <errno.h> 51 #include <signal.h> 52 #include <string.h> 53 54 #include <sys/queue.h> 55 56 #include "event2/event.h" 57 #include "event2/event_struct.h" 58 #include "event2/rpc.h" 59 #include "event2/rpc_struct.h" 60 #include "evrpc-internal.h" 61 #include "event2/http.h" 62 #include "event2/buffer.h" 63 #include "event2/tag.h" 64 #include "event2/http_struct.h" 65 #include "event2/http_compat.h" 66 #include "event2/util.h" 67 #include "util-internal.h" 68 #include "log-internal.h" 69 #include "mm-internal.h" 70 71 struct evrpc_base * 72 evrpc_init(struct evhttp *http_server) 73 { 74 struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base)); 75 if (base == NULL) 76 return (NULL); 77 78 /* we rely on the tagging sub system */ 79 evtag_init(); 80 81 TAILQ_INIT(&base->registered_rpcs); 82 TAILQ_INIT(&base->input_hooks); 83 TAILQ_INIT(&base->output_hooks); 84 85 TAILQ_INIT(&base->paused_requests); 86 87 base->http_server = http_server; 88 89 return (base); 90 } 91 92 void 93 evrpc_free(struct evrpc_base *base) 94 { 95 struct evrpc *rpc; 96 struct evrpc_hook *hook; 97 struct evrpc_hook_ctx *pause; 98 int r; 99 100 while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) { 101 r = evrpc_unregister_rpc(base, rpc->uri); 102 EVUTIL_ASSERT(r == 0); 103 } 104 while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) { 105 TAILQ_REMOVE(&base->paused_requests, pause, next); 106 mm_free(pause); 107 } 108 while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) { 109 r = evrpc_remove_hook(base, EVRPC_INPUT, hook); 110 EVUTIL_ASSERT(r); 111 } 112 while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) { 113 r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook); 114 EVUTIL_ASSERT(r); 115 } 116 mm_free(base); 117 } 118 119 void * 120 evrpc_add_hook(void *vbase, 121 enum EVRPC_HOOK_TYPE hook_type, 122 int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *), 123 void *cb_arg) 124 { 125 struct evrpc_hooks_ *base = vbase; 126 struct evrpc_hook_list *head = NULL; 127 struct evrpc_hook *hook = NULL; 128 switch (hook_type) { 129 case EVRPC_INPUT: 130 head = &base->in_hooks; 131 break; 132 case EVRPC_OUTPUT: 133 head = &base->out_hooks; 134 break; 135 default: 136 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 137 } 138 139 hook = mm_calloc(1, sizeof(struct evrpc_hook)); 140 EVUTIL_ASSERT(hook != NULL); 141 142 hook->process = cb; 143 hook->process_arg = cb_arg; 144 TAILQ_INSERT_TAIL(head, hook, next); 145 146 return (hook); 147 } 148 149 static int 150 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle) 151 { 152 struct evrpc_hook *hook = NULL; 153 TAILQ_FOREACH(hook, head, next) { 154 if (hook == handle) { 155 TAILQ_REMOVE(head, hook, next); 156 mm_free(hook); 157 return (1); 158 } 159 } 160 161 return (0); 162 } 163 164 /* 165 * remove the hook specified by the handle 166 */ 167 168 int 169 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle) 170 { 171 struct evrpc_hooks_ *base = vbase; 172 struct evrpc_hook_list *head = NULL; 173 switch (hook_type) { 174 case EVRPC_INPUT: 175 head = &base->in_hooks; 176 break; 177 case EVRPC_OUTPUT: 178 head = &base->out_hooks; 179 break; 180 default: 181 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 182 } 183 184 return (evrpc_remove_hook_internal(head, handle)); 185 } 186 187 static int 188 evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx, 189 struct evhttp_request *req, struct evbuffer *evbuf) 190 { 191 struct evrpc_hook *hook; 192 TAILQ_FOREACH(hook, head, next) { 193 int res = hook->process(ctx, req, evbuf, hook->process_arg); 194 if (res != EVRPC_CONTINUE) 195 return (res); 196 } 197 198 return (EVRPC_CONTINUE); 199 } 200 201 static void evrpc_pool_schedule(struct evrpc_pool *pool); 202 static void evrpc_request_cb(struct evhttp_request *, void *); 203 204 /* 205 * Registers a new RPC with the HTTP server. The evrpc object is expected 206 * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn 207 * calls this function. 208 */ 209 210 static char * 211 evrpc_construct_uri(const char *uri) 212 { 213 char *constructed_uri; 214 size_t constructed_uri_len; 215 216 constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1; 217 if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL) 218 event_err(1, "%s: failed to register rpc at %s", 219 __func__, uri); 220 memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX)); 221 memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri)); 222 constructed_uri[constructed_uri_len - 1] = '\0'; 223 224 return (constructed_uri); 225 } 226 227 int 228 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, 229 void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg) 230 { 231 char *constructed_uri = evrpc_construct_uri(rpc->uri); 232 233 rpc->base = base; 234 rpc->cb = cb; 235 rpc->cb_arg = cb_arg; 236 237 TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next); 238 239 evhttp_set_cb(base->http_server, 240 constructed_uri, 241 evrpc_request_cb, 242 rpc); 243 244 mm_free(constructed_uri); 245 246 return (0); 247 } 248 249 int 250 evrpc_unregister_rpc(struct evrpc_base *base, const char *name) 251 { 252 char *registered_uri = NULL; 253 struct evrpc *rpc; 254 int r; 255 256 /* find the right rpc; linear search might be slow */ 257 TAILQ_FOREACH(rpc, &base->registered_rpcs, next) { 258 if (strcmp(rpc->uri, name) == 0) 259 break; 260 } 261 if (rpc == NULL) { 262 /* We did not find an RPC with this name */ 263 return (-1); 264 } 265 TAILQ_REMOVE(&base->registered_rpcs, rpc, next); 266 267 registered_uri = evrpc_construct_uri(name); 268 269 /* remove the http server callback */ 270 r = evhttp_del_cb(base->http_server, registered_uri); 271 EVUTIL_ASSERT(r == 0); 272 273 mm_free(registered_uri); 274 275 mm_free((char *)rpc->uri); 276 mm_free(rpc); 277 return (0); 278 } 279 280 static int evrpc_pause_request(void *vbase, void *ctx, 281 void (*cb)(void *, enum EVRPC_HOOK_RESULT)); 282 static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT); 283 284 static void 285 evrpc_request_cb(struct evhttp_request *req, void *arg) 286 { 287 struct evrpc *rpc = arg; 288 struct evrpc_req_generic *rpc_state = NULL; 289 290 /* let's verify the outside parameters */ 291 if (req->type != EVHTTP_REQ_POST || 292 evbuffer_get_length(req->input_buffer) <= 0) 293 goto error; 294 295 rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic)); 296 if (rpc_state == NULL) 297 goto error; 298 rpc_state->rpc = rpc; 299 rpc_state->http_req = req; 300 rpc_state->rpc_data = NULL; 301 302 if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) { 303 int hook_res; 304 305 evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 306 307 /* 308 * allow hooks to modify the outgoing request 309 */ 310 hook_res = evrpc_process_hooks(&rpc->base->input_hooks, 311 rpc_state, req, req->input_buffer); 312 switch (hook_res) { 313 case EVRPC_TERMINATE: 314 goto error; 315 case EVRPC_PAUSE: 316 evrpc_pause_request(rpc->base, rpc_state, 317 evrpc_request_cb_closure); 318 return; 319 case EVRPC_CONTINUE: 320 break; 321 default: 322 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 323 hook_res == EVRPC_CONTINUE || 324 hook_res == EVRPC_PAUSE); 325 } 326 } 327 328 evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE); 329 return; 330 331 error: 332 evrpc_reqstate_free_(rpc_state); 333 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 334 return; 335 } 336 337 static void 338 evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 339 { 340 struct evrpc_req_generic *rpc_state = arg; 341 struct evrpc *rpc; 342 struct evhttp_request *req; 343 344 EVUTIL_ASSERT(rpc_state); 345 rpc = rpc_state->rpc; 346 req = rpc_state->http_req; 347 348 if (hook_res == EVRPC_TERMINATE) 349 goto error; 350 351 /* let's check that we can parse the request */ 352 rpc_state->request = rpc->request_new(rpc->request_new_arg); 353 if (rpc_state->request == NULL) 354 goto error; 355 356 if (rpc->request_unmarshal( 357 rpc_state->request, req->input_buffer) == -1) { 358 /* we failed to parse the request; that's a bummer */ 359 goto error; 360 } 361 362 /* at this point, we have a well formed request, prepare the reply */ 363 364 rpc_state->reply = rpc->reply_new(rpc->reply_new_arg); 365 if (rpc_state->reply == NULL) 366 goto error; 367 368 /* give the rpc to the user; they can deal with it */ 369 rpc->cb(rpc_state, rpc->cb_arg); 370 371 return; 372 373 error: 374 evrpc_reqstate_free_(rpc_state); 375 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 376 return; 377 } 378 379 380 void 381 evrpc_reqstate_free_(struct evrpc_req_generic* rpc_state) 382 { 383 struct evrpc *rpc; 384 EVUTIL_ASSERT(rpc_state != NULL); 385 rpc = rpc_state->rpc; 386 387 /* clean up all memory */ 388 if (rpc_state->hook_meta != NULL) 389 evrpc_hook_context_free_(rpc_state->hook_meta); 390 if (rpc_state->request != NULL) 391 rpc->request_free(rpc_state->request); 392 if (rpc_state->reply != NULL) 393 rpc->reply_free(rpc_state->reply); 394 if (rpc_state->rpc_data != NULL) 395 evbuffer_free(rpc_state->rpc_data); 396 mm_free(rpc_state); 397 } 398 399 static void 400 evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT); 401 402 void 403 evrpc_request_done(struct evrpc_req_generic *rpc_state) 404 { 405 struct evhttp_request *req; 406 struct evrpc *rpc; 407 408 EVUTIL_ASSERT(rpc_state); 409 410 req = rpc_state->http_req; 411 rpc = rpc_state->rpc; 412 413 if (rpc->reply_complete(rpc_state->reply) == -1) { 414 /* the reply was not completely filled in. error out */ 415 goto error; 416 } 417 418 if ((rpc_state->rpc_data = evbuffer_new()) == NULL) { 419 /* out of memory */ 420 goto error; 421 } 422 423 /* serialize the reply */ 424 rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); 425 426 if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) { 427 int hook_res; 428 429 evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 430 431 /* do hook based tweaks to the request */ 432 hook_res = evrpc_process_hooks(&rpc->base->output_hooks, 433 rpc_state, req, rpc_state->rpc_data); 434 switch (hook_res) { 435 case EVRPC_TERMINATE: 436 goto error; 437 case EVRPC_PAUSE: 438 if (evrpc_pause_request(rpc->base, rpc_state, 439 evrpc_request_done_closure) == -1) 440 goto error; 441 return; 442 case EVRPC_CONTINUE: 443 break; 444 default: 445 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 446 hook_res == EVRPC_CONTINUE || 447 hook_res == EVRPC_PAUSE); 448 } 449 } 450 451 evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE); 452 return; 453 454 error: 455 evrpc_reqstate_free_(rpc_state); 456 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 457 return; 458 } 459 460 void * 461 evrpc_get_request(struct evrpc_req_generic *req) 462 { 463 return req->request; 464 } 465 466 void * 467 evrpc_get_reply(struct evrpc_req_generic *req) 468 { 469 return req->reply; 470 } 471 472 static void 473 evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 474 { 475 struct evrpc_req_generic *rpc_state = arg; 476 struct evhttp_request *req; 477 EVUTIL_ASSERT(rpc_state); 478 req = rpc_state->http_req; 479 480 if (hook_res == EVRPC_TERMINATE) 481 goto error; 482 483 /* on success, we are going to transmit marshaled binary data */ 484 if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) { 485 evhttp_add_header(req->output_headers, 486 "Content-Type", "application/octet-stream"); 487 } 488 evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data); 489 490 evrpc_reqstate_free_(rpc_state); 491 492 return; 493 494 error: 495 evrpc_reqstate_free_(rpc_state); 496 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 497 return; 498 } 499 500 501 /* Client implementation of RPC site */ 502 503 static int evrpc_schedule_request(struct evhttp_connection *connection, 504 struct evrpc_request_wrapper *ctx); 505 506 struct evrpc_pool * 507 evrpc_pool_new(struct event_base *base) 508 { 509 struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool)); 510 if (pool == NULL) 511 return (NULL); 512 513 TAILQ_INIT(&pool->connections); 514 TAILQ_INIT(&pool->requests); 515 516 TAILQ_INIT(&pool->paused_requests); 517 518 TAILQ_INIT(&pool->input_hooks); 519 TAILQ_INIT(&pool->output_hooks); 520 521 pool->base = base; 522 pool->timeout = -1; 523 524 return (pool); 525 } 526 527 static void 528 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request) 529 { 530 if (request->hook_meta != NULL) 531 evrpc_hook_context_free_(request->hook_meta); 532 mm_free(request->name); 533 mm_free(request); 534 } 535 536 void 537 evrpc_pool_free(struct evrpc_pool *pool) 538 { 539 struct evhttp_connection *connection; 540 struct evrpc_request_wrapper *request; 541 struct evrpc_hook_ctx *pause; 542 struct evrpc_hook *hook; 543 int r; 544 545 while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { 546 TAILQ_REMOVE(&pool->requests, request, next); 547 evrpc_request_wrapper_free(request); 548 } 549 550 while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) { 551 TAILQ_REMOVE(&pool->paused_requests, pause, next); 552 mm_free(pause); 553 } 554 555 while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { 556 TAILQ_REMOVE(&pool->connections, connection, next); 557 evhttp_connection_free(connection); 558 } 559 560 while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) { 561 r = evrpc_remove_hook(pool, EVRPC_INPUT, hook); 562 EVUTIL_ASSERT(r); 563 } 564 565 while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) { 566 r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook); 567 EVUTIL_ASSERT(r); 568 } 569 570 mm_free(pool); 571 } 572 573 /* 574 * Add a connection to the RPC pool. A request scheduled on the pool 575 * may use any available connection. 576 */ 577 578 void 579 evrpc_pool_add_connection(struct evrpc_pool *pool, 580 struct evhttp_connection *connection) 581 { 582 EVUTIL_ASSERT(connection->http_server == NULL); 583 TAILQ_INSERT_TAIL(&pool->connections, connection, next); 584 585 /* 586 * associate an event base with this connection 587 */ 588 if (pool->base != NULL) 589 evhttp_connection_set_base(connection, pool->base); 590 591 /* 592 * unless a timeout was specifically set for a connection, 593 * the connection inherits the timeout from the pool. 594 */ 595 if (!evutil_timerisset(&connection->timeout)) 596 evhttp_connection_set_timeout(connection, pool->timeout); 597 598 /* 599 * if we have any requests pending, schedule them with the new 600 * connections. 601 */ 602 603 if (TAILQ_FIRST(&pool->requests) != NULL) { 604 struct evrpc_request_wrapper *request = 605 TAILQ_FIRST(&pool->requests); 606 TAILQ_REMOVE(&pool->requests, request, next); 607 evrpc_schedule_request(connection, request); 608 } 609 } 610 611 void 612 evrpc_pool_remove_connection(struct evrpc_pool *pool, 613 struct evhttp_connection *connection) 614 { 615 TAILQ_REMOVE(&pool->connections, connection, next); 616 } 617 618 void 619 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs) 620 { 621 struct evhttp_connection *evcon; 622 TAILQ_FOREACH(evcon, &pool->connections, next) { 623 evhttp_connection_set_timeout(evcon, timeout_in_secs); 624 } 625 pool->timeout = timeout_in_secs; 626 } 627 628 629 static void evrpc_reply_done(struct evhttp_request *, void *); 630 static void evrpc_request_timeout(evutil_socket_t, short, void *); 631 632 /* 633 * Finds a connection object associated with the pool that is currently 634 * idle and can be used to make a request. 635 */ 636 static struct evhttp_connection * 637 evrpc_pool_find_connection(struct evrpc_pool *pool) 638 { 639 struct evhttp_connection *connection; 640 TAILQ_FOREACH(connection, &pool->connections, next) { 641 if (TAILQ_FIRST(&connection->requests) == NULL) 642 return (connection); 643 } 644 645 return (NULL); 646 } 647 648 /* 649 * Prototypes responsible for evrpc scheduling and hooking 650 */ 651 652 static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT); 653 654 /* 655 * We assume that the ctx is no longer queued on the pool. 656 */ 657 static int 658 evrpc_schedule_request(struct evhttp_connection *connection, 659 struct evrpc_request_wrapper *ctx) 660 { 661 struct evhttp_request *req = NULL; 662 struct evrpc_pool *pool = ctx->pool; 663 struct evrpc_status status; 664 665 if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) 666 goto error; 667 668 /* serialize the request data into the output buffer */ 669 ctx->request_marshal(req->output_buffer, ctx->request); 670 671 /* we need to know the connection that we might have to abort */ 672 ctx->evcon = connection; 673 674 /* if we get paused we also need to know the request */ 675 ctx->req = req; 676 677 if (TAILQ_FIRST(&pool->output_hooks) != NULL) { 678 int hook_res; 679 680 evrpc_hook_associate_meta_(&ctx->hook_meta, connection); 681 682 /* apply hooks to the outgoing request */ 683 hook_res = evrpc_process_hooks(&pool->output_hooks, 684 ctx, req, req->output_buffer); 685 686 switch (hook_res) { 687 case EVRPC_TERMINATE: 688 goto error; 689 case EVRPC_PAUSE: 690 /* we need to be explicitly resumed */ 691 if (evrpc_pause_request(pool, ctx, 692 evrpc_schedule_request_closure) == -1) 693 goto error; 694 return (0); 695 case EVRPC_CONTINUE: 696 /* we can just continue */ 697 break; 698 default: 699 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 700 hook_res == EVRPC_CONTINUE || 701 hook_res == EVRPC_PAUSE); 702 } 703 } 704 705 evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE); 706 return (0); 707 708 error: 709 memset(&status, 0, sizeof(status)); 710 status.error = EVRPC_STATUS_ERR_UNSTARTED; 711 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 712 evrpc_request_wrapper_free(ctx); 713 return (-1); 714 } 715 716 static void 717 evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 718 { 719 struct evrpc_request_wrapper *ctx = arg; 720 struct evhttp_connection *connection = ctx->evcon; 721 struct evhttp_request *req = ctx->req; 722 struct evrpc_pool *pool = ctx->pool; 723 struct evrpc_status status; 724 char *uri = NULL; 725 int res = 0; 726 727 if (hook_res == EVRPC_TERMINATE) 728 goto error; 729 730 uri = evrpc_construct_uri(ctx->name); 731 if (uri == NULL) 732 goto error; 733 734 if (pool->timeout > 0) { 735 /* 736 * a timeout after which the whole rpc is going to be aborted. 737 */ 738 struct timeval tv; 739 evutil_timerclear(&tv); 740 tv.tv_sec = pool->timeout; 741 evtimer_add(&ctx->ev_timeout, &tv); 742 } 743 744 /* start the request over the connection */ 745 res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri); 746 mm_free(uri); 747 748 if (res == -1) 749 goto error; 750 751 return; 752 753 error: 754 memset(&status, 0, sizeof(status)); 755 status.error = EVRPC_STATUS_ERR_UNSTARTED; 756 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 757 evrpc_request_wrapper_free(ctx); 758 } 759 760 /* we just queue the paused request on the pool under the req object */ 761 static int 762 evrpc_pause_request(void *vbase, void *ctx, 763 void (*cb)(void *, enum EVRPC_HOOK_RESULT)) 764 { 765 struct evrpc_hooks_ *base = vbase; 766 struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause)); 767 if (pause == NULL) 768 return (-1); 769 770 pause->ctx = ctx; 771 pause->cb = cb; 772 773 TAILQ_INSERT_TAIL(&base->pause_requests, pause, next); 774 return (0); 775 } 776 777 int 778 evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res) 779 { 780 struct evrpc_hooks_ *base = vbase; 781 struct evrpc_pause_list *head = &base->pause_requests; 782 struct evrpc_hook_ctx *pause; 783 784 TAILQ_FOREACH(pause, head, next) { 785 if (pause->ctx == ctx) 786 break; 787 } 788 789 if (pause == NULL) 790 return (-1); 791 792 (*pause->cb)(pause->ctx, res); 793 TAILQ_REMOVE(head, pause, next); 794 mm_free(pause); 795 return (0); 796 } 797 798 int 799 evrpc_make_request(struct evrpc_request_wrapper *ctx) 800 { 801 struct evrpc_pool *pool = ctx->pool; 802 803 /* initialize the event structure for this rpc */ 804 evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx); 805 806 /* we better have some available connections on the pool */ 807 EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL); 808 809 /* 810 * if no connection is available, we queue the request on the pool, 811 * the next time a connection is empty, the rpc will be send on that. 812 */ 813 TAILQ_INSERT_TAIL(&pool->requests, ctx, next); 814 815 evrpc_pool_schedule(pool); 816 817 return (0); 818 } 819 820 821 struct evrpc_request_wrapper * 822 evrpc_make_request_ctx( 823 struct evrpc_pool *pool, void *request, void *reply, 824 const char *rpcname, 825 void (*req_marshal)(struct evbuffer*, void *), 826 void (*rpl_clear)(void *), 827 int (*rpl_unmarshal)(void *, struct evbuffer *), 828 void (*cb)(struct evrpc_status *, void *, void *, void *), 829 void *cbarg) 830 { 831 struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *) 832 mm_malloc(sizeof(struct evrpc_request_wrapper)); 833 if (ctx == NULL) 834 return (NULL); 835 836 ctx->pool = pool; 837 ctx->hook_meta = NULL; 838 ctx->evcon = NULL; 839 ctx->name = mm_strdup(rpcname); 840 if (ctx->name == NULL) { 841 mm_free(ctx); 842 return (NULL); 843 } 844 ctx->cb = cb; 845 ctx->cb_arg = cbarg; 846 ctx->request = request; 847 ctx->reply = reply; 848 ctx->request_marshal = req_marshal; 849 ctx->reply_clear = rpl_clear; 850 ctx->reply_unmarshal = rpl_unmarshal; 851 852 return (ctx); 853 } 854 855 static void 856 evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT); 857 858 static void 859 evrpc_reply_done(struct evhttp_request *req, void *arg) 860 { 861 struct evrpc_request_wrapper *ctx = arg; 862 struct evrpc_pool *pool = ctx->pool; 863 int hook_res = EVRPC_CONTINUE; 864 865 /* cancel any timeout we might have scheduled */ 866 event_del(&ctx->ev_timeout); 867 868 ctx->req = req; 869 870 /* we need to get the reply now */ 871 if (req == NULL) { 872 evrpc_reply_done_closure(ctx, EVRPC_CONTINUE); 873 return; 874 } 875 876 if (TAILQ_FIRST(&pool->input_hooks) != NULL) { 877 evrpc_hook_associate_meta_(&ctx->hook_meta, ctx->evcon); 878 879 /* apply hooks to the incoming request */ 880 hook_res = evrpc_process_hooks(&pool->input_hooks, 881 ctx, req, req->input_buffer); 882 883 switch (hook_res) { 884 case EVRPC_TERMINATE: 885 case EVRPC_CONTINUE: 886 break; 887 case EVRPC_PAUSE: 888 /* 889 * if we get paused we also need to know the 890 * request. unfortunately, the underlying 891 * layer is going to free it. we need to 892 * request ownership explicitly 893 */ 894 if (req != NULL) 895 evhttp_request_own(req); 896 897 evrpc_pause_request(pool, ctx, 898 evrpc_reply_done_closure); 899 return; 900 default: 901 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 902 hook_res == EVRPC_CONTINUE || 903 hook_res == EVRPC_PAUSE); 904 } 905 } 906 907 evrpc_reply_done_closure(ctx, hook_res); 908 909 /* http request is being freed by underlying layer */ 910 } 911 912 static void 913 evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 914 { 915 struct evrpc_request_wrapper *ctx = arg; 916 struct evhttp_request *req = ctx->req; 917 struct evrpc_pool *pool = ctx->pool; 918 struct evrpc_status status; 919 int res = -1; 920 921 memset(&status, 0, sizeof(status)); 922 status.http_req = req; 923 924 /* we need to get the reply now */ 925 if (req == NULL) { 926 status.error = EVRPC_STATUS_ERR_TIMEOUT; 927 } else if (hook_res == EVRPC_TERMINATE) { 928 status.error = EVRPC_STATUS_ERR_HOOKABORTED; 929 } else { 930 res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); 931 if (res == -1) 932 status.error = EVRPC_STATUS_ERR_BADPAYLOAD; 933 } 934 935 if (res == -1) { 936 /* clear everything that we might have written previously */ 937 ctx->reply_clear(ctx->reply); 938 } 939 940 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 941 942 evrpc_request_wrapper_free(ctx); 943 944 /* the http layer owned the original request structure, but if we 945 * got paused, we asked for ownership and need to free it here. */ 946 if (req != NULL && evhttp_request_is_owned(req)) 947 evhttp_request_free(req); 948 949 /* see if we can schedule another request */ 950 evrpc_pool_schedule(pool); 951 } 952 953 static void 954 evrpc_pool_schedule(struct evrpc_pool *pool) 955 { 956 struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests); 957 struct evhttp_connection *evcon; 958 959 /* if no requests are pending, we have no work */ 960 if (ctx == NULL) 961 return; 962 963 if ((evcon = evrpc_pool_find_connection(pool)) != NULL) { 964 TAILQ_REMOVE(&pool->requests, ctx, next); 965 evrpc_schedule_request(evcon, ctx); 966 } 967 } 968 969 static void 970 evrpc_request_timeout(evutil_socket_t fd, short what, void *arg) 971 { 972 struct evrpc_request_wrapper *ctx = arg; 973 struct evhttp_connection *evcon = ctx->evcon; 974 EVUTIL_ASSERT(evcon != NULL); 975 976 evhttp_connection_fail_(evcon, EVREQ_HTTP_TIMEOUT); 977 } 978 979 /* 980 * frees potential meta data associated with a request. 981 */ 982 983 static void 984 evrpc_meta_data_free(struct evrpc_meta_list *meta_data) 985 { 986 struct evrpc_meta *entry; 987 EVUTIL_ASSERT(meta_data != NULL); 988 989 while ((entry = TAILQ_FIRST(meta_data)) != NULL) { 990 TAILQ_REMOVE(meta_data, entry, next); 991 mm_free(entry->key); 992 mm_free(entry->data); 993 mm_free(entry); 994 } 995 } 996 997 static struct evrpc_hook_meta * 998 evrpc_hook_meta_new_(void) 999 { 1000 struct evrpc_hook_meta *ctx; 1001 ctx = mm_malloc(sizeof(struct evrpc_hook_meta)); 1002 EVUTIL_ASSERT(ctx != NULL); 1003 1004 TAILQ_INIT(&ctx->meta_data); 1005 ctx->evcon = NULL; 1006 1007 return (ctx); 1008 } 1009 1010 static void 1011 evrpc_hook_associate_meta_(struct evrpc_hook_meta **pctx, 1012 struct evhttp_connection *evcon) 1013 { 1014 struct evrpc_hook_meta *ctx = *pctx; 1015 if (ctx == NULL) 1016 *pctx = ctx = evrpc_hook_meta_new_(); 1017 ctx->evcon = evcon; 1018 } 1019 1020 static void 1021 evrpc_hook_context_free_(struct evrpc_hook_meta *ctx) 1022 { 1023 evrpc_meta_data_free(&ctx->meta_data); 1024 mm_free(ctx); 1025 } 1026 1027 /* Adds meta data */ 1028 void 1029 evrpc_hook_add_meta(void *ctx, const char *key, 1030 const void *data, size_t data_size) 1031 { 1032 struct evrpc_request_wrapper *req = ctx; 1033 struct evrpc_hook_meta *store = NULL; 1034 struct evrpc_meta *meta = NULL; 1035 1036 if ((store = req->hook_meta) == NULL) 1037 store = req->hook_meta = evrpc_hook_meta_new_(); 1038 1039 meta = mm_malloc(sizeof(struct evrpc_meta)); 1040 EVUTIL_ASSERT(meta != NULL); 1041 meta->key = mm_strdup(key); 1042 EVUTIL_ASSERT(meta->key != NULL); 1043 meta->data_size = data_size; 1044 meta->data = mm_malloc(data_size); 1045 EVUTIL_ASSERT(meta->data != NULL); 1046 memcpy(meta->data, data, data_size); 1047 1048 TAILQ_INSERT_TAIL(&store->meta_data, meta, next); 1049 } 1050 1051 int 1052 evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size) 1053 { 1054 struct evrpc_request_wrapper *req = ctx; 1055 struct evrpc_meta *meta = NULL; 1056 1057 if (req->hook_meta == NULL) 1058 return (-1); 1059 1060 TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) { 1061 if (strcmp(meta->key, key) == 0) { 1062 *data = meta->data; 1063 *data_size = meta->data_size; 1064 return (0); 1065 } 1066 } 1067 1068 return (-1); 1069 } 1070 1071 struct evhttp_connection * 1072 evrpc_hook_get_connection(void *ctx) 1073 { 1074 struct evrpc_request_wrapper *req = ctx; 1075 return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL); 1076 } 1077 1078 int 1079 evrpc_send_request_generic(struct evrpc_pool *pool, 1080 void *request, void *reply, 1081 void (*cb)(struct evrpc_status *, void *, void *, void *), 1082 void *cb_arg, 1083 const char *rpcname, 1084 void (*req_marshal)(struct evbuffer *, void *), 1085 void (*rpl_clear)(void *), 1086 int (*rpl_unmarshal)(void *, struct evbuffer *)) 1087 { 1088 struct evrpc_status status; 1089 struct evrpc_request_wrapper *ctx; 1090 ctx = evrpc_make_request_ctx(pool, request, reply, 1091 rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg); 1092 if (ctx == NULL) 1093 goto error; 1094 return (evrpc_make_request(ctx)); 1095 error: 1096 memset(&status, 0, sizeof(status)); 1097 status.error = EVRPC_STATUS_ERR_UNSTARTED; 1098 (*(cb))(&status, request, reply, cb_arg); 1099 return (-1); 1100 } 1101 1102 /** Takes a request object and fills it in with the right magic */ 1103 static struct evrpc * 1104 evrpc_register_object(const char *name, 1105 void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *), 1106 int (*req_unmarshal)(void *, struct evbuffer *), 1107 void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *), 1108 int (*rpl_complete)(void *), 1109 void (*rpl_marshal)(struct evbuffer *, void *)) 1110 { 1111 struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc)); 1112 if (rpc == NULL) 1113 return (NULL); 1114 rpc->uri = mm_strdup(name); 1115 if (rpc->uri == NULL) { 1116 mm_free(rpc); 1117 return (NULL); 1118 } 1119 rpc->request_new = req_new; 1120 rpc->request_new_arg = req_new_arg; 1121 rpc->request_free = req_free; 1122 rpc->request_unmarshal = req_unmarshal; 1123 rpc->reply_new = rpl_new; 1124 rpc->reply_new_arg = rpl_new_arg; 1125 rpc->reply_free = rpl_free; 1126 rpc->reply_complete = rpl_complete; 1127 rpc->reply_marshal = rpl_marshal; 1128 return (rpc); 1129 } 1130 1131 int 1132 evrpc_register_generic(struct evrpc_base *base, const char *name, 1133 void (*callback)(struct evrpc_req_generic *, void *), void *cbarg, 1134 void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *), 1135 int (*req_unmarshal)(void *, struct evbuffer *), 1136 void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *), 1137 int (*rpl_complete)(void *), 1138 void (*rpl_marshal)(struct evbuffer *, void *)) 1139 { 1140 struct evrpc* rpc = 1141 evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal, 1142 rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal); 1143 if (rpc == NULL) 1144 return (-1); 1145 evrpc_register_rpc(base, rpc, 1146 (void (*)(struct evrpc_req_generic*, void *))callback, cbarg); 1147 return (0); 1148 } 1149 1150 /** accessors for obscure and undocumented functionality */ 1151 struct evrpc_pool * 1152 evrpc_request_get_pool(struct evrpc_request_wrapper *ctx) 1153 { 1154 return (ctx->pool); 1155 } 1156 1157 void 1158 evrpc_request_set_pool(struct evrpc_request_wrapper *ctx, 1159 struct evrpc_pool *pool) 1160 { 1161 ctx->pool = pool; 1162 } 1163 1164 void 1165 evrpc_request_set_cb(struct evrpc_request_wrapper *ctx, 1166 void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg), 1167 void *cb_arg) 1168 { 1169 ctx->cb = cb; 1170 ctx->cb_arg = cb_arg; 1171 } 1172