12b15cb3dSCy Schubert /*
22b15cb3dSCy Schubert * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu>
32b15cb3dSCy Schubert * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
42b15cb3dSCy Schubert *
52b15cb3dSCy Schubert * Redistribution and use in source and binary forms, with or without
62b15cb3dSCy Schubert * modification, are permitted provided that the following conditions
72b15cb3dSCy Schubert * are met:
82b15cb3dSCy Schubert * 1. Redistributions of source code must retain the above copyright
92b15cb3dSCy Schubert * notice, this list of conditions and the following disclaimer.
102b15cb3dSCy Schubert * 2. Redistributions in binary form must reproduce the above copyright
112b15cb3dSCy Schubert * notice, this list of conditions and the following disclaimer in the
122b15cb3dSCy Schubert * documentation and/or other materials provided with the distribution.
132b15cb3dSCy Schubert * 3. The name of the author may not be used to endorse or promote products
142b15cb3dSCy Schubert * derived from this software without specific prior written permission.
152b15cb3dSCy Schubert *
162b15cb3dSCy Schubert * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
172b15cb3dSCy Schubert * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
182b15cb3dSCy Schubert * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
192b15cb3dSCy Schubert * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
202b15cb3dSCy Schubert * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
212b15cb3dSCy Schubert * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
222b15cb3dSCy Schubert * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
232b15cb3dSCy Schubert * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
242b15cb3dSCy Schubert * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
252b15cb3dSCy Schubert * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
262b15cb3dSCy Schubert */
272b15cb3dSCy Schubert #include "event2/event-config.h"
282b15cb3dSCy Schubert #include "evconfig-private.h"
292b15cb3dSCy Schubert
302b15cb3dSCy Schubert #ifdef _WIN32
312b15cb3dSCy Schubert #define WIN32_LEAN_AND_MEAN
322b15cb3dSCy Schubert #include <winsock2.h>
332b15cb3dSCy Schubert #include <windows.h>
342b15cb3dSCy Schubert #undef WIN32_LEAN_AND_MEAN
352b15cb3dSCy Schubert #endif
362b15cb3dSCy Schubert
372b15cb3dSCy Schubert #include <sys/types.h>
382b15cb3dSCy Schubert #ifndef _WIN32
392b15cb3dSCy Schubert #include <sys/socket.h>
402b15cb3dSCy Schubert #endif
412b15cb3dSCy Schubert #ifdef EVENT__HAVE_SYS_TIME_H
422b15cb3dSCy Schubert #include <sys/time.h>
432b15cb3dSCy Schubert #endif
442b15cb3dSCy Schubert #include <sys/queue.h>
452b15cb3dSCy Schubert #include <stdio.h>
462b15cb3dSCy Schubert #include <stdlib.h>
472b15cb3dSCy Schubert #ifndef _WIN32
482b15cb3dSCy Schubert #include <unistd.h>
492b15cb3dSCy Schubert #endif
502b15cb3dSCy Schubert #include <errno.h>
512b15cb3dSCy Schubert #include <signal.h>
522b15cb3dSCy Schubert #include <string.h>
532b15cb3dSCy Schubert
542b15cb3dSCy Schubert #include <sys/queue.h>
552b15cb3dSCy Schubert
562b15cb3dSCy Schubert #include "event2/event.h"
572b15cb3dSCy Schubert #include "event2/event_struct.h"
582b15cb3dSCy Schubert #include "event2/rpc.h"
592b15cb3dSCy Schubert #include "event2/rpc_struct.h"
602b15cb3dSCy Schubert #include "evrpc-internal.h"
612b15cb3dSCy Schubert #include "event2/http.h"
622b15cb3dSCy Schubert #include "event2/buffer.h"
632b15cb3dSCy Schubert #include "event2/tag.h"
642b15cb3dSCy Schubert #include "event2/http_struct.h"
652b15cb3dSCy Schubert #include "event2/http_compat.h"
662b15cb3dSCy Schubert #include "event2/util.h"
672b15cb3dSCy Schubert #include "util-internal.h"
682b15cb3dSCy Schubert #include "log-internal.h"
692b15cb3dSCy Schubert #include "mm-internal.h"
702b15cb3dSCy Schubert
712b15cb3dSCy Schubert struct evrpc_base *
evrpc_init(struct evhttp * http_server)722b15cb3dSCy Schubert evrpc_init(struct evhttp *http_server)
732b15cb3dSCy Schubert {
742b15cb3dSCy Schubert struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base));
752b15cb3dSCy Schubert if (base == NULL)
762b15cb3dSCy Schubert return (NULL);
772b15cb3dSCy Schubert
782b15cb3dSCy Schubert /* we rely on the tagging sub system */
792b15cb3dSCy Schubert evtag_init();
802b15cb3dSCy Schubert
812b15cb3dSCy Schubert TAILQ_INIT(&base->registered_rpcs);
822b15cb3dSCy Schubert TAILQ_INIT(&base->input_hooks);
832b15cb3dSCy Schubert TAILQ_INIT(&base->output_hooks);
842b15cb3dSCy Schubert
852b15cb3dSCy Schubert TAILQ_INIT(&base->paused_requests);
862b15cb3dSCy Schubert
872b15cb3dSCy Schubert base->http_server = http_server;
882b15cb3dSCy Schubert
892b15cb3dSCy Schubert return (base);
902b15cb3dSCy Schubert }
912b15cb3dSCy Schubert
922b15cb3dSCy Schubert void
evrpc_free(struct evrpc_base * base)932b15cb3dSCy Schubert evrpc_free(struct evrpc_base *base)
942b15cb3dSCy Schubert {
952b15cb3dSCy Schubert struct evrpc *rpc;
962b15cb3dSCy Schubert struct evrpc_hook *hook;
972b15cb3dSCy Schubert struct evrpc_hook_ctx *pause;
982b15cb3dSCy Schubert int r;
992b15cb3dSCy Schubert
1002b15cb3dSCy Schubert while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
1012b15cb3dSCy Schubert r = evrpc_unregister_rpc(base, rpc->uri);
1022b15cb3dSCy Schubert EVUTIL_ASSERT(r == 0);
1032b15cb3dSCy Schubert }
1042b15cb3dSCy Schubert while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) {
1052b15cb3dSCy Schubert TAILQ_REMOVE(&base->paused_requests, pause, next);
1062b15cb3dSCy Schubert mm_free(pause);
1072b15cb3dSCy Schubert }
1082b15cb3dSCy Schubert while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
1092b15cb3dSCy Schubert r = evrpc_remove_hook(base, EVRPC_INPUT, hook);
1102b15cb3dSCy Schubert EVUTIL_ASSERT(r);
1112b15cb3dSCy Schubert }
1122b15cb3dSCy Schubert while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
1132b15cb3dSCy Schubert r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook);
1142b15cb3dSCy Schubert EVUTIL_ASSERT(r);
1152b15cb3dSCy Schubert }
1162b15cb3dSCy Schubert mm_free(base);
1172b15cb3dSCy Schubert }
1182b15cb3dSCy Schubert
1192b15cb3dSCy Schubert void *
evrpc_add_hook(void * vbase,enum EVRPC_HOOK_TYPE hook_type,int (* cb)(void *,struct evhttp_request *,struct evbuffer *,void *),void * cb_arg)1202b15cb3dSCy Schubert evrpc_add_hook(void *vbase,
1212b15cb3dSCy Schubert enum EVRPC_HOOK_TYPE hook_type,
1222b15cb3dSCy Schubert int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *),
1232b15cb3dSCy Schubert void *cb_arg)
1242b15cb3dSCy Schubert {
1252b15cb3dSCy Schubert struct evrpc_hooks_ *base = vbase;
1262b15cb3dSCy Schubert struct evrpc_hook_list *head = NULL;
1272b15cb3dSCy Schubert struct evrpc_hook *hook = NULL;
1282b15cb3dSCy Schubert switch (hook_type) {
1292b15cb3dSCy Schubert case EVRPC_INPUT:
1302b15cb3dSCy Schubert head = &base->in_hooks;
1312b15cb3dSCy Schubert break;
1322b15cb3dSCy Schubert case EVRPC_OUTPUT:
1332b15cb3dSCy Schubert head = &base->out_hooks;
1342b15cb3dSCy Schubert break;
1352b15cb3dSCy Schubert default:
1362b15cb3dSCy Schubert EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
1372b15cb3dSCy Schubert }
1382b15cb3dSCy Schubert
1392b15cb3dSCy Schubert hook = mm_calloc(1, sizeof(struct evrpc_hook));
1402b15cb3dSCy Schubert EVUTIL_ASSERT(hook != NULL);
1412b15cb3dSCy Schubert
1422b15cb3dSCy Schubert hook->process = cb;
1432b15cb3dSCy Schubert hook->process_arg = cb_arg;
1442b15cb3dSCy Schubert TAILQ_INSERT_TAIL(head, hook, next);
1452b15cb3dSCy Schubert
1462b15cb3dSCy Schubert return (hook);
1472b15cb3dSCy Schubert }
1482b15cb3dSCy Schubert
1492b15cb3dSCy Schubert static int
evrpc_remove_hook_internal(struct evrpc_hook_list * head,void * handle)1502b15cb3dSCy Schubert evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
1512b15cb3dSCy Schubert {
1522b15cb3dSCy Schubert struct evrpc_hook *hook = NULL;
1532b15cb3dSCy Schubert TAILQ_FOREACH(hook, head, next) {
1542b15cb3dSCy Schubert if (hook == handle) {
1552b15cb3dSCy Schubert TAILQ_REMOVE(head, hook, next);
1562b15cb3dSCy Schubert mm_free(hook);
1572b15cb3dSCy Schubert return (1);
1582b15cb3dSCy Schubert }
1592b15cb3dSCy Schubert }
1602b15cb3dSCy Schubert
1612b15cb3dSCy Schubert return (0);
1622b15cb3dSCy Schubert }
1632b15cb3dSCy Schubert
1642b15cb3dSCy Schubert /*
1652b15cb3dSCy Schubert * remove the hook specified by the handle
1662b15cb3dSCy Schubert */
1672b15cb3dSCy Schubert
1682b15cb3dSCy Schubert int
evrpc_remove_hook(void * vbase,enum EVRPC_HOOK_TYPE hook_type,void * handle)1692b15cb3dSCy Schubert evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
1702b15cb3dSCy Schubert {
1712b15cb3dSCy Schubert struct evrpc_hooks_ *base = vbase;
1722b15cb3dSCy Schubert struct evrpc_hook_list *head = NULL;
1732b15cb3dSCy Schubert switch (hook_type) {
1742b15cb3dSCy Schubert case EVRPC_INPUT:
1752b15cb3dSCy Schubert head = &base->in_hooks;
1762b15cb3dSCy Schubert break;
1772b15cb3dSCy Schubert case EVRPC_OUTPUT:
1782b15cb3dSCy Schubert head = &base->out_hooks;
1792b15cb3dSCy Schubert break;
1802b15cb3dSCy Schubert default:
1812b15cb3dSCy Schubert EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
1822b15cb3dSCy Schubert }
1832b15cb3dSCy Schubert
1842b15cb3dSCy Schubert return (evrpc_remove_hook_internal(head, handle));
1852b15cb3dSCy Schubert }
1862b15cb3dSCy Schubert
1872b15cb3dSCy Schubert static int
evrpc_process_hooks(struct evrpc_hook_list * head,void * ctx,struct evhttp_request * req,struct evbuffer * evbuf)1882b15cb3dSCy Schubert evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx,
1892b15cb3dSCy Schubert struct evhttp_request *req, struct evbuffer *evbuf)
1902b15cb3dSCy Schubert {
1912b15cb3dSCy Schubert struct evrpc_hook *hook;
1922b15cb3dSCy Schubert TAILQ_FOREACH(hook, head, next) {
1932b15cb3dSCy Schubert int res = hook->process(ctx, req, evbuf, hook->process_arg);
1942b15cb3dSCy Schubert if (res != EVRPC_CONTINUE)
1952b15cb3dSCy Schubert return (res);
1962b15cb3dSCy Schubert }
1972b15cb3dSCy Schubert
1982b15cb3dSCy Schubert return (EVRPC_CONTINUE);
1992b15cb3dSCy Schubert }
2002b15cb3dSCy Schubert
2012b15cb3dSCy Schubert static void evrpc_pool_schedule(struct evrpc_pool *pool);
2022b15cb3dSCy Schubert static void evrpc_request_cb(struct evhttp_request *, void *);
2032b15cb3dSCy Schubert
/*
 * evrpc_register_rpc() (defined below, after the evrpc_construct_uri()
 * helper) registers a new RPC with the HTTP server.  The evrpc object is
 * expected to have been filled in via the EVRPC_REGISTER_OBJECT macro,
 * which in turn calls that function.
 */
2092b15cb3dSCy Schubert
2102b15cb3dSCy Schubert static char *
evrpc_construct_uri(const char * uri)2112b15cb3dSCy Schubert evrpc_construct_uri(const char *uri)
2122b15cb3dSCy Schubert {
2132b15cb3dSCy Schubert char *constructed_uri;
2142b15cb3dSCy Schubert size_t constructed_uri_len;
2152b15cb3dSCy Schubert
2162b15cb3dSCy Schubert constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
2172b15cb3dSCy Schubert if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL)
2182b15cb3dSCy Schubert event_err(1, "%s: failed to register rpc at %s",
2192b15cb3dSCy Schubert __func__, uri);
2202b15cb3dSCy Schubert memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
2212b15cb3dSCy Schubert memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
2222b15cb3dSCy Schubert constructed_uri[constructed_uri_len - 1] = '\0';
2232b15cb3dSCy Schubert
2242b15cb3dSCy Schubert return (constructed_uri);
2252b15cb3dSCy Schubert }
2262b15cb3dSCy Schubert
2272b15cb3dSCy Schubert int
evrpc_register_rpc(struct evrpc_base * base,struct evrpc * rpc,void (* cb)(struct evrpc_req_generic *,void *),void * cb_arg)2282b15cb3dSCy Schubert evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
2292b15cb3dSCy Schubert void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
2302b15cb3dSCy Schubert {
2312b15cb3dSCy Schubert char *constructed_uri = evrpc_construct_uri(rpc->uri);
2322b15cb3dSCy Schubert
2332b15cb3dSCy Schubert rpc->base = base;
2342b15cb3dSCy Schubert rpc->cb = cb;
2352b15cb3dSCy Schubert rpc->cb_arg = cb_arg;
2362b15cb3dSCy Schubert
2372b15cb3dSCy Schubert TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
2382b15cb3dSCy Schubert
2392b15cb3dSCy Schubert evhttp_set_cb(base->http_server,
2402b15cb3dSCy Schubert constructed_uri,
2412b15cb3dSCy Schubert evrpc_request_cb,
2422b15cb3dSCy Schubert rpc);
2432b15cb3dSCy Schubert
2442b15cb3dSCy Schubert mm_free(constructed_uri);
2452b15cb3dSCy Schubert
2462b15cb3dSCy Schubert return (0);
2472b15cb3dSCy Schubert }
2482b15cb3dSCy Schubert
2492b15cb3dSCy Schubert int
evrpc_unregister_rpc(struct evrpc_base * base,const char * name)2502b15cb3dSCy Schubert evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
2512b15cb3dSCy Schubert {
2522b15cb3dSCy Schubert char *registered_uri = NULL;
2532b15cb3dSCy Schubert struct evrpc *rpc;
2542b15cb3dSCy Schubert int r;
2552b15cb3dSCy Schubert
2562b15cb3dSCy Schubert /* find the right rpc; linear search might be slow */
2572b15cb3dSCy Schubert TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
2582b15cb3dSCy Schubert if (strcmp(rpc->uri, name) == 0)
2592b15cb3dSCy Schubert break;
2602b15cb3dSCy Schubert }
2612b15cb3dSCy Schubert if (rpc == NULL) {
2622b15cb3dSCy Schubert /* We did not find an RPC with this name */
2632b15cb3dSCy Schubert return (-1);
2642b15cb3dSCy Schubert }
2652b15cb3dSCy Schubert TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
2662b15cb3dSCy Schubert
2672b15cb3dSCy Schubert registered_uri = evrpc_construct_uri(name);
2682b15cb3dSCy Schubert
2692b15cb3dSCy Schubert /* remove the http server callback */
2702b15cb3dSCy Schubert r = evhttp_del_cb(base->http_server, registered_uri);
2712b15cb3dSCy Schubert EVUTIL_ASSERT(r == 0);
2722b15cb3dSCy Schubert
2732b15cb3dSCy Schubert mm_free(registered_uri);
2742b15cb3dSCy Schubert
2752b15cb3dSCy Schubert mm_free((char *)rpc->uri);
2762b15cb3dSCy Schubert mm_free(rpc);
2772b15cb3dSCy Schubert return (0);
2782b15cb3dSCy Schubert }
2792b15cb3dSCy Schubert
2802b15cb3dSCy Schubert static int evrpc_pause_request(void *vbase, void *ctx,
2812b15cb3dSCy Schubert void (*cb)(void *, enum EVRPC_HOOK_RESULT));
2822b15cb3dSCy Schubert static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT);
2832b15cb3dSCy Schubert
2842b15cb3dSCy Schubert static void
evrpc_request_cb(struct evhttp_request * req,void * arg)2852b15cb3dSCy Schubert evrpc_request_cb(struct evhttp_request *req, void *arg)
2862b15cb3dSCy Schubert {
2872b15cb3dSCy Schubert struct evrpc *rpc = arg;
2882b15cb3dSCy Schubert struct evrpc_req_generic *rpc_state = NULL;
2892b15cb3dSCy Schubert
2902b15cb3dSCy Schubert /* let's verify the outside parameters */
2912b15cb3dSCy Schubert if (req->type != EVHTTP_REQ_POST ||
2922b15cb3dSCy Schubert evbuffer_get_length(req->input_buffer) <= 0)
2932b15cb3dSCy Schubert goto error;
2942b15cb3dSCy Schubert
2952b15cb3dSCy Schubert rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic));
2962b15cb3dSCy Schubert if (rpc_state == NULL)
2972b15cb3dSCy Schubert goto error;
2982b15cb3dSCy Schubert rpc_state->rpc = rpc;
2992b15cb3dSCy Schubert rpc_state->http_req = req;
3002b15cb3dSCy Schubert rpc_state->rpc_data = NULL;
3012b15cb3dSCy Schubert
3022b15cb3dSCy Schubert if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) {
3032b15cb3dSCy Schubert int hook_res;
3042b15cb3dSCy Schubert
3052b15cb3dSCy Schubert evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon);
3062b15cb3dSCy Schubert
3072b15cb3dSCy Schubert /*
3082b15cb3dSCy Schubert * allow hooks to modify the outgoing request
3092b15cb3dSCy Schubert */
3102b15cb3dSCy Schubert hook_res = evrpc_process_hooks(&rpc->base->input_hooks,
3112b15cb3dSCy Schubert rpc_state, req, req->input_buffer);
3122b15cb3dSCy Schubert switch (hook_res) {
3132b15cb3dSCy Schubert case EVRPC_TERMINATE:
3142b15cb3dSCy Schubert goto error;
3152b15cb3dSCy Schubert case EVRPC_PAUSE:
3162b15cb3dSCy Schubert evrpc_pause_request(rpc->base, rpc_state,
3172b15cb3dSCy Schubert evrpc_request_cb_closure);
3182b15cb3dSCy Schubert return;
3192b15cb3dSCy Schubert case EVRPC_CONTINUE:
3202b15cb3dSCy Schubert break;
3212b15cb3dSCy Schubert default:
3222b15cb3dSCy Schubert EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
3232b15cb3dSCy Schubert hook_res == EVRPC_CONTINUE ||
3242b15cb3dSCy Schubert hook_res == EVRPC_PAUSE);
3252b15cb3dSCy Schubert }
3262b15cb3dSCy Schubert }
3272b15cb3dSCy Schubert
3282b15cb3dSCy Schubert evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE);
3292b15cb3dSCy Schubert return;
3302b15cb3dSCy Schubert
3312b15cb3dSCy Schubert error:
332*a466cc55SCy Schubert if (rpc_state)
3332b15cb3dSCy Schubert evrpc_reqstate_free_(rpc_state);
3342b15cb3dSCy Schubert evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
3352b15cb3dSCy Schubert return;
3362b15cb3dSCy Schubert }
3372b15cb3dSCy Schubert
3382b15cb3dSCy Schubert static void
evrpc_request_cb_closure(void * arg,enum EVRPC_HOOK_RESULT hook_res)3392b15cb3dSCy Schubert evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
3402b15cb3dSCy Schubert {
3412b15cb3dSCy Schubert struct evrpc_req_generic *rpc_state = arg;
3422b15cb3dSCy Schubert struct evrpc *rpc;
3432b15cb3dSCy Schubert struct evhttp_request *req;
3442b15cb3dSCy Schubert
3452b15cb3dSCy Schubert EVUTIL_ASSERT(rpc_state);
3462b15cb3dSCy Schubert rpc = rpc_state->rpc;
3472b15cb3dSCy Schubert req = rpc_state->http_req;
3482b15cb3dSCy Schubert
3492b15cb3dSCy Schubert if (hook_res == EVRPC_TERMINATE)
3502b15cb3dSCy Schubert goto error;
3512b15cb3dSCy Schubert
3522b15cb3dSCy Schubert /* let's check that we can parse the request */
3532b15cb3dSCy Schubert rpc_state->request = rpc->request_new(rpc->request_new_arg);
3542b15cb3dSCy Schubert if (rpc_state->request == NULL)
3552b15cb3dSCy Schubert goto error;
3562b15cb3dSCy Schubert
3572b15cb3dSCy Schubert if (rpc->request_unmarshal(
3582b15cb3dSCy Schubert rpc_state->request, req->input_buffer) == -1) {
3592b15cb3dSCy Schubert /* we failed to parse the request; that's a bummer */
3602b15cb3dSCy Schubert goto error;
3612b15cb3dSCy Schubert }
3622b15cb3dSCy Schubert
3632b15cb3dSCy Schubert /* at this point, we have a well formed request, prepare the reply */
3642b15cb3dSCy Schubert
3652b15cb3dSCy Schubert rpc_state->reply = rpc->reply_new(rpc->reply_new_arg);
3662b15cb3dSCy Schubert if (rpc_state->reply == NULL)
3672b15cb3dSCy Schubert goto error;
3682b15cb3dSCy Schubert
3692b15cb3dSCy Schubert /* give the rpc to the user; they can deal with it */
3702b15cb3dSCy Schubert rpc->cb(rpc_state, rpc->cb_arg);
3712b15cb3dSCy Schubert
3722b15cb3dSCy Schubert return;
3732b15cb3dSCy Schubert
3742b15cb3dSCy Schubert error:
3752b15cb3dSCy Schubert evrpc_reqstate_free_(rpc_state);
3762b15cb3dSCy Schubert evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
3772b15cb3dSCy Schubert return;
3782b15cb3dSCy Schubert }
3792b15cb3dSCy Schubert
3802b15cb3dSCy Schubert
3812b15cb3dSCy Schubert void
evrpc_reqstate_free_(struct evrpc_req_generic * rpc_state)3822b15cb3dSCy Schubert evrpc_reqstate_free_(struct evrpc_req_generic* rpc_state)
3832b15cb3dSCy Schubert {
3842b15cb3dSCy Schubert struct evrpc *rpc;
3852b15cb3dSCy Schubert EVUTIL_ASSERT(rpc_state != NULL);
3862b15cb3dSCy Schubert rpc = rpc_state->rpc;
3872b15cb3dSCy Schubert
3882b15cb3dSCy Schubert /* clean up all memory */
3892b15cb3dSCy Schubert if (rpc_state->hook_meta != NULL)
3902b15cb3dSCy Schubert evrpc_hook_context_free_(rpc_state->hook_meta);
3912b15cb3dSCy Schubert if (rpc_state->request != NULL)
3922b15cb3dSCy Schubert rpc->request_free(rpc_state->request);
3932b15cb3dSCy Schubert if (rpc_state->reply != NULL)
3942b15cb3dSCy Schubert rpc->reply_free(rpc_state->reply);
3952b15cb3dSCy Schubert if (rpc_state->rpc_data != NULL)
3962b15cb3dSCy Schubert evbuffer_free(rpc_state->rpc_data);
3972b15cb3dSCy Schubert mm_free(rpc_state);
3982b15cb3dSCy Schubert }
3992b15cb3dSCy Schubert
4002b15cb3dSCy Schubert static void
4012b15cb3dSCy Schubert evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT);
4022b15cb3dSCy Schubert
4032b15cb3dSCy Schubert void
evrpc_request_done(struct evrpc_req_generic * rpc_state)4042b15cb3dSCy Schubert evrpc_request_done(struct evrpc_req_generic *rpc_state)
4052b15cb3dSCy Schubert {
4062b15cb3dSCy Schubert struct evhttp_request *req;
4072b15cb3dSCy Schubert struct evrpc *rpc;
4082b15cb3dSCy Schubert
4092b15cb3dSCy Schubert EVUTIL_ASSERT(rpc_state);
4102b15cb3dSCy Schubert
4112b15cb3dSCy Schubert req = rpc_state->http_req;
4122b15cb3dSCy Schubert rpc = rpc_state->rpc;
4132b15cb3dSCy Schubert
4142b15cb3dSCy Schubert if (rpc->reply_complete(rpc_state->reply) == -1) {
4152b15cb3dSCy Schubert /* the reply was not completely filled in. error out */
4162b15cb3dSCy Schubert goto error;
4172b15cb3dSCy Schubert }
4182b15cb3dSCy Schubert
4192b15cb3dSCy Schubert if ((rpc_state->rpc_data = evbuffer_new()) == NULL) {
4202b15cb3dSCy Schubert /* out of memory */
4212b15cb3dSCy Schubert goto error;
4222b15cb3dSCy Schubert }
4232b15cb3dSCy Schubert
4242b15cb3dSCy Schubert /* serialize the reply */
4252b15cb3dSCy Schubert rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply);
4262b15cb3dSCy Schubert
4272b15cb3dSCy Schubert if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) {
4282b15cb3dSCy Schubert int hook_res;
4292b15cb3dSCy Schubert
4302b15cb3dSCy Schubert evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon);
4312b15cb3dSCy Schubert
4322b15cb3dSCy Schubert /* do hook based tweaks to the request */
4332b15cb3dSCy Schubert hook_res = evrpc_process_hooks(&rpc->base->output_hooks,
4342b15cb3dSCy Schubert rpc_state, req, rpc_state->rpc_data);
4352b15cb3dSCy Schubert switch (hook_res) {
4362b15cb3dSCy Schubert case EVRPC_TERMINATE:
4372b15cb3dSCy Schubert goto error;
4382b15cb3dSCy Schubert case EVRPC_PAUSE:
4392b15cb3dSCy Schubert if (evrpc_pause_request(rpc->base, rpc_state,
4402b15cb3dSCy Schubert evrpc_request_done_closure) == -1)
4412b15cb3dSCy Schubert goto error;
4422b15cb3dSCy Schubert return;
4432b15cb3dSCy Schubert case EVRPC_CONTINUE:
4442b15cb3dSCy Schubert break;
4452b15cb3dSCy Schubert default:
4462b15cb3dSCy Schubert EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
4472b15cb3dSCy Schubert hook_res == EVRPC_CONTINUE ||
4482b15cb3dSCy Schubert hook_res == EVRPC_PAUSE);
4492b15cb3dSCy Schubert }
4502b15cb3dSCy Schubert }
4512b15cb3dSCy Schubert
4522b15cb3dSCy Schubert evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE);
4532b15cb3dSCy Schubert return;
4542b15cb3dSCy Schubert
4552b15cb3dSCy Schubert error:
4562b15cb3dSCy Schubert evrpc_reqstate_free_(rpc_state);
4572b15cb3dSCy Schubert evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
4582b15cb3dSCy Schubert return;
4592b15cb3dSCy Schubert }
4602b15cb3dSCy Schubert
4612b15cb3dSCy Schubert void *
evrpc_get_request(struct evrpc_req_generic * req)4622b15cb3dSCy Schubert evrpc_get_request(struct evrpc_req_generic *req)
4632b15cb3dSCy Schubert {
4642b15cb3dSCy Schubert return req->request;
4652b15cb3dSCy Schubert }
4662b15cb3dSCy Schubert
4672b15cb3dSCy Schubert void *
evrpc_get_reply(struct evrpc_req_generic * req)4682b15cb3dSCy Schubert evrpc_get_reply(struct evrpc_req_generic *req)
4692b15cb3dSCy Schubert {
4702b15cb3dSCy Schubert return req->reply;
4712b15cb3dSCy Schubert }
4722b15cb3dSCy Schubert
4732b15cb3dSCy Schubert static void
evrpc_request_done_closure(void * arg,enum EVRPC_HOOK_RESULT hook_res)4742b15cb3dSCy Schubert evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
4752b15cb3dSCy Schubert {
4762b15cb3dSCy Schubert struct evrpc_req_generic *rpc_state = arg;
4772b15cb3dSCy Schubert struct evhttp_request *req;
4782b15cb3dSCy Schubert EVUTIL_ASSERT(rpc_state);
4792b15cb3dSCy Schubert req = rpc_state->http_req;
4802b15cb3dSCy Schubert
4812b15cb3dSCy Schubert if (hook_res == EVRPC_TERMINATE)
4822b15cb3dSCy Schubert goto error;
4832b15cb3dSCy Schubert
4842b15cb3dSCy Schubert /* on success, we are going to transmit marshaled binary data */
4852b15cb3dSCy Schubert if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
4862b15cb3dSCy Schubert evhttp_add_header(req->output_headers,
4872b15cb3dSCy Schubert "Content-Type", "application/octet-stream");
4882b15cb3dSCy Schubert }
4892b15cb3dSCy Schubert evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data);
4902b15cb3dSCy Schubert
4912b15cb3dSCy Schubert evrpc_reqstate_free_(rpc_state);
4922b15cb3dSCy Schubert
4932b15cb3dSCy Schubert return;
4942b15cb3dSCy Schubert
4952b15cb3dSCy Schubert error:
4962b15cb3dSCy Schubert evrpc_reqstate_free_(rpc_state);
4972b15cb3dSCy Schubert evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
4982b15cb3dSCy Schubert return;
4992b15cb3dSCy Schubert }
5002b15cb3dSCy Schubert
5012b15cb3dSCy Schubert
/* Client side implementation of RPC */
5032b15cb3dSCy Schubert
5042b15cb3dSCy Schubert static int evrpc_schedule_request(struct evhttp_connection *connection,
5052b15cb3dSCy Schubert struct evrpc_request_wrapper *ctx);
5062b15cb3dSCy Schubert
5072b15cb3dSCy Schubert struct evrpc_pool *
evrpc_pool_new(struct event_base * base)5082b15cb3dSCy Schubert evrpc_pool_new(struct event_base *base)
5092b15cb3dSCy Schubert {
5102b15cb3dSCy Schubert struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool));
5112b15cb3dSCy Schubert if (pool == NULL)
5122b15cb3dSCy Schubert return (NULL);
5132b15cb3dSCy Schubert
5142b15cb3dSCy Schubert TAILQ_INIT(&pool->connections);
5152b15cb3dSCy Schubert TAILQ_INIT(&pool->requests);
5162b15cb3dSCy Schubert
5172b15cb3dSCy Schubert TAILQ_INIT(&pool->paused_requests);
5182b15cb3dSCy Schubert
5192b15cb3dSCy Schubert TAILQ_INIT(&pool->input_hooks);
5202b15cb3dSCy Schubert TAILQ_INIT(&pool->output_hooks);
5212b15cb3dSCy Schubert
5222b15cb3dSCy Schubert pool->base = base;
5232b15cb3dSCy Schubert pool->timeout = -1;
5242b15cb3dSCy Schubert
5252b15cb3dSCy Schubert return (pool);
5262b15cb3dSCy Schubert }
5272b15cb3dSCy Schubert
5282b15cb3dSCy Schubert static void
evrpc_request_wrapper_free(struct evrpc_request_wrapper * request)5292b15cb3dSCy Schubert evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
5302b15cb3dSCy Schubert {
5312b15cb3dSCy Schubert if (request->hook_meta != NULL)
5322b15cb3dSCy Schubert evrpc_hook_context_free_(request->hook_meta);
5332b15cb3dSCy Schubert mm_free(request->name);
5342b15cb3dSCy Schubert mm_free(request);
5352b15cb3dSCy Schubert }
5362b15cb3dSCy Schubert
5372b15cb3dSCy Schubert void
evrpc_pool_free(struct evrpc_pool * pool)5382b15cb3dSCy Schubert evrpc_pool_free(struct evrpc_pool *pool)
5392b15cb3dSCy Schubert {
5402b15cb3dSCy Schubert struct evhttp_connection *connection;
5412b15cb3dSCy Schubert struct evrpc_request_wrapper *request;
5422b15cb3dSCy Schubert struct evrpc_hook_ctx *pause;
5432b15cb3dSCy Schubert struct evrpc_hook *hook;
5442b15cb3dSCy Schubert int r;
5452b15cb3dSCy Schubert
5462b15cb3dSCy Schubert while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
5472b15cb3dSCy Schubert TAILQ_REMOVE(&pool->requests, request, next);
5482b15cb3dSCy Schubert evrpc_request_wrapper_free(request);
5492b15cb3dSCy Schubert }
5502b15cb3dSCy Schubert
5512b15cb3dSCy Schubert while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) {
5522b15cb3dSCy Schubert TAILQ_REMOVE(&pool->paused_requests, pause, next);
5532b15cb3dSCy Schubert mm_free(pause);
5542b15cb3dSCy Schubert }
5552b15cb3dSCy Schubert
5562b15cb3dSCy Schubert while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
5572b15cb3dSCy Schubert TAILQ_REMOVE(&pool->connections, connection, next);
5582b15cb3dSCy Schubert evhttp_connection_free(connection);
5592b15cb3dSCy Schubert }
5602b15cb3dSCy Schubert
5612b15cb3dSCy Schubert while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
5622b15cb3dSCy Schubert r = evrpc_remove_hook(pool, EVRPC_INPUT, hook);
5632b15cb3dSCy Schubert EVUTIL_ASSERT(r);
5642b15cb3dSCy Schubert }
5652b15cb3dSCy Schubert
5662b15cb3dSCy Schubert while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
5672b15cb3dSCy Schubert r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook);
5682b15cb3dSCy Schubert EVUTIL_ASSERT(r);
5692b15cb3dSCy Schubert }
5702b15cb3dSCy Schubert
5712b15cb3dSCy Schubert mm_free(pool);
5722b15cb3dSCy Schubert }
5732b15cb3dSCy Schubert
5742b15cb3dSCy Schubert /*
5752b15cb3dSCy Schubert * Add a connection to the RPC pool. A request scheduled on the pool
5762b15cb3dSCy Schubert * may use any available connection.
5772b15cb3dSCy Schubert */
5782b15cb3dSCy Schubert
5792b15cb3dSCy Schubert void
evrpc_pool_add_connection(struct evrpc_pool * pool,struct evhttp_connection * connection)5802b15cb3dSCy Schubert evrpc_pool_add_connection(struct evrpc_pool *pool,
5812b15cb3dSCy Schubert struct evhttp_connection *connection)
5822b15cb3dSCy Schubert {
5832b15cb3dSCy Schubert EVUTIL_ASSERT(connection->http_server == NULL);
5842b15cb3dSCy Schubert TAILQ_INSERT_TAIL(&pool->connections, connection, next);
5852b15cb3dSCy Schubert
5862b15cb3dSCy Schubert /*
5872b15cb3dSCy Schubert * associate an event base with this connection
5882b15cb3dSCy Schubert */
5892b15cb3dSCy Schubert if (pool->base != NULL)
5902b15cb3dSCy Schubert evhttp_connection_set_base(connection, pool->base);
5912b15cb3dSCy Schubert
5922b15cb3dSCy Schubert /*
5932b15cb3dSCy Schubert * unless a timeout was specifically set for a connection,
5942b15cb3dSCy Schubert * the connection inherits the timeout from the pool.
5952b15cb3dSCy Schubert */
5962b15cb3dSCy Schubert if (!evutil_timerisset(&connection->timeout))
5972b15cb3dSCy Schubert evhttp_connection_set_timeout(connection, pool->timeout);
5982b15cb3dSCy Schubert
5992b15cb3dSCy Schubert /*
6002b15cb3dSCy Schubert * if we have any requests pending, schedule them with the new
6012b15cb3dSCy Schubert * connections.
6022b15cb3dSCy Schubert */
6032b15cb3dSCy Schubert
6042b15cb3dSCy Schubert if (TAILQ_FIRST(&pool->requests) != NULL) {
6052b15cb3dSCy Schubert struct evrpc_request_wrapper *request =
6062b15cb3dSCy Schubert TAILQ_FIRST(&pool->requests);
6072b15cb3dSCy Schubert TAILQ_REMOVE(&pool->requests, request, next);
6082b15cb3dSCy Schubert evrpc_schedule_request(connection, request);
6092b15cb3dSCy Schubert }
6102b15cb3dSCy Schubert }
6112b15cb3dSCy Schubert
6122b15cb3dSCy Schubert void
evrpc_pool_remove_connection(struct evrpc_pool * pool,struct evhttp_connection * connection)6132b15cb3dSCy Schubert evrpc_pool_remove_connection(struct evrpc_pool *pool,
6142b15cb3dSCy Schubert struct evhttp_connection *connection)
6152b15cb3dSCy Schubert {
6162b15cb3dSCy Schubert TAILQ_REMOVE(&pool->connections, connection, next);
6172b15cb3dSCy Schubert }
6182b15cb3dSCy Schubert
6192b15cb3dSCy Schubert void
evrpc_pool_set_timeout(struct evrpc_pool * pool,int timeout_in_secs)6202b15cb3dSCy Schubert evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
6212b15cb3dSCy Schubert {
6222b15cb3dSCy Schubert struct evhttp_connection *evcon;
6232b15cb3dSCy Schubert TAILQ_FOREACH(evcon, &pool->connections, next) {
6242b15cb3dSCy Schubert evhttp_connection_set_timeout(evcon, timeout_in_secs);
6252b15cb3dSCy Schubert }
6262b15cb3dSCy Schubert pool->timeout = timeout_in_secs;
6272b15cb3dSCy Schubert }
6282b15cb3dSCy Schubert
6292b15cb3dSCy Schubert
6302b15cb3dSCy Schubert static void evrpc_reply_done(struct evhttp_request *, void *);
6312b15cb3dSCy Schubert static void evrpc_request_timeout(evutil_socket_t, short, void *);
6322b15cb3dSCy Schubert
6332b15cb3dSCy Schubert /*
6342b15cb3dSCy Schubert * Finds a connection object associated with the pool that is currently
6352b15cb3dSCy Schubert * idle and can be used to make a request.
6362b15cb3dSCy Schubert */
6372b15cb3dSCy Schubert static struct evhttp_connection *
evrpc_pool_find_connection(struct evrpc_pool * pool)6382b15cb3dSCy Schubert evrpc_pool_find_connection(struct evrpc_pool *pool)
6392b15cb3dSCy Schubert {
6402b15cb3dSCy Schubert struct evhttp_connection *connection;
6412b15cb3dSCy Schubert TAILQ_FOREACH(connection, &pool->connections, next) {
6422b15cb3dSCy Schubert if (TAILQ_FIRST(&connection->requests) == NULL)
6432b15cb3dSCy Schubert return (connection);
6442b15cb3dSCy Schubert }
6452b15cb3dSCy Schubert
6462b15cb3dSCy Schubert return (NULL);
6472b15cb3dSCy Schubert }
6482b15cb3dSCy Schubert
6492b15cb3dSCy Schubert /*
6502b15cb3dSCy Schubert * Prototypes responsible for evrpc scheduling and hooking
6512b15cb3dSCy Schubert */
6522b15cb3dSCy Schubert
6532b15cb3dSCy Schubert static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT);
6542b15cb3dSCy Schubert
6552b15cb3dSCy Schubert /*
6562b15cb3dSCy Schubert * We assume that the ctx is no longer queued on the pool.
6572b15cb3dSCy Schubert */
6582b15cb3dSCy Schubert static int
evrpc_schedule_request(struct evhttp_connection * connection,struct evrpc_request_wrapper * ctx)6592b15cb3dSCy Schubert evrpc_schedule_request(struct evhttp_connection *connection,
6602b15cb3dSCy Schubert struct evrpc_request_wrapper *ctx)
6612b15cb3dSCy Schubert {
6622b15cb3dSCy Schubert struct evhttp_request *req = NULL;
6632b15cb3dSCy Schubert struct evrpc_pool *pool = ctx->pool;
6642b15cb3dSCy Schubert struct evrpc_status status;
6652b15cb3dSCy Schubert
6662b15cb3dSCy Schubert if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
6672b15cb3dSCy Schubert goto error;
6682b15cb3dSCy Schubert
6692b15cb3dSCy Schubert /* serialize the request data into the output buffer */
6702b15cb3dSCy Schubert ctx->request_marshal(req->output_buffer, ctx->request);
6712b15cb3dSCy Schubert
6722b15cb3dSCy Schubert /* we need to know the connection that we might have to abort */
6732b15cb3dSCy Schubert ctx->evcon = connection;
6742b15cb3dSCy Schubert
6752b15cb3dSCy Schubert /* if we get paused we also need to know the request */
6762b15cb3dSCy Schubert ctx->req = req;
6772b15cb3dSCy Schubert
6782b15cb3dSCy Schubert if (TAILQ_FIRST(&pool->output_hooks) != NULL) {
6792b15cb3dSCy Schubert int hook_res;
6802b15cb3dSCy Schubert
6812b15cb3dSCy Schubert evrpc_hook_associate_meta_(&ctx->hook_meta, connection);
6822b15cb3dSCy Schubert
6832b15cb3dSCy Schubert /* apply hooks to the outgoing request */
6842b15cb3dSCy Schubert hook_res = evrpc_process_hooks(&pool->output_hooks,
6852b15cb3dSCy Schubert ctx, req, req->output_buffer);
6862b15cb3dSCy Schubert
6872b15cb3dSCy Schubert switch (hook_res) {
6882b15cb3dSCy Schubert case EVRPC_TERMINATE:
6892b15cb3dSCy Schubert goto error;
6902b15cb3dSCy Schubert case EVRPC_PAUSE:
6912b15cb3dSCy Schubert /* we need to be explicitly resumed */
6922b15cb3dSCy Schubert if (evrpc_pause_request(pool, ctx,
6932b15cb3dSCy Schubert evrpc_schedule_request_closure) == -1)
6942b15cb3dSCy Schubert goto error;
6952b15cb3dSCy Schubert return (0);
6962b15cb3dSCy Schubert case EVRPC_CONTINUE:
6972b15cb3dSCy Schubert /* we can just continue */
6982b15cb3dSCy Schubert break;
6992b15cb3dSCy Schubert default:
7002b15cb3dSCy Schubert EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
7012b15cb3dSCy Schubert hook_res == EVRPC_CONTINUE ||
7022b15cb3dSCy Schubert hook_res == EVRPC_PAUSE);
7032b15cb3dSCy Schubert }
7042b15cb3dSCy Schubert }
7052b15cb3dSCy Schubert
7062b15cb3dSCy Schubert evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE);
7072b15cb3dSCy Schubert return (0);
7082b15cb3dSCy Schubert
7092b15cb3dSCy Schubert error:
7102b15cb3dSCy Schubert memset(&status, 0, sizeof(status));
7112b15cb3dSCy Schubert status.error = EVRPC_STATUS_ERR_UNSTARTED;
7122b15cb3dSCy Schubert (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
7132b15cb3dSCy Schubert evrpc_request_wrapper_free(ctx);
7142b15cb3dSCy Schubert return (-1);
7152b15cb3dSCy Schubert }
7162b15cb3dSCy Schubert
7172b15cb3dSCy Schubert static void
evrpc_schedule_request_closure(void * arg,enum EVRPC_HOOK_RESULT hook_res)7182b15cb3dSCy Schubert evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
7192b15cb3dSCy Schubert {
7202b15cb3dSCy Schubert struct evrpc_request_wrapper *ctx = arg;
7212b15cb3dSCy Schubert struct evhttp_connection *connection = ctx->evcon;
7222b15cb3dSCy Schubert struct evhttp_request *req = ctx->req;
7232b15cb3dSCy Schubert struct evrpc_pool *pool = ctx->pool;
7242b15cb3dSCy Schubert struct evrpc_status status;
7252b15cb3dSCy Schubert char *uri = NULL;
7262b15cb3dSCy Schubert int res = 0;
7272b15cb3dSCy Schubert
7282b15cb3dSCy Schubert if (hook_res == EVRPC_TERMINATE)
7292b15cb3dSCy Schubert goto error;
7302b15cb3dSCy Schubert
7312b15cb3dSCy Schubert uri = evrpc_construct_uri(ctx->name);
7322b15cb3dSCy Schubert if (uri == NULL)
7332b15cb3dSCy Schubert goto error;
7342b15cb3dSCy Schubert
7352b15cb3dSCy Schubert if (pool->timeout > 0) {
7362b15cb3dSCy Schubert /*
7372b15cb3dSCy Schubert * a timeout after which the whole rpc is going to be aborted.
7382b15cb3dSCy Schubert */
7392b15cb3dSCy Schubert struct timeval tv;
7402b15cb3dSCy Schubert evutil_timerclear(&tv);
7412b15cb3dSCy Schubert tv.tv_sec = pool->timeout;
7422b15cb3dSCy Schubert evtimer_add(&ctx->ev_timeout, &tv);
7432b15cb3dSCy Schubert }
7442b15cb3dSCy Schubert
7452b15cb3dSCy Schubert /* start the request over the connection */
7462b15cb3dSCy Schubert res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
7472b15cb3dSCy Schubert mm_free(uri);
7482b15cb3dSCy Schubert
7492b15cb3dSCy Schubert if (res == -1)
7502b15cb3dSCy Schubert goto error;
7512b15cb3dSCy Schubert
7522b15cb3dSCy Schubert return;
7532b15cb3dSCy Schubert
7542b15cb3dSCy Schubert error:
7552b15cb3dSCy Schubert memset(&status, 0, sizeof(status));
7562b15cb3dSCy Schubert status.error = EVRPC_STATUS_ERR_UNSTARTED;
7572b15cb3dSCy Schubert (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
7582b15cb3dSCy Schubert evrpc_request_wrapper_free(ctx);
7592b15cb3dSCy Schubert }
7602b15cb3dSCy Schubert
7612b15cb3dSCy Schubert /* we just queue the paused request on the pool under the req object */
7622b15cb3dSCy Schubert static int
evrpc_pause_request(void * vbase,void * ctx,void (* cb)(void *,enum EVRPC_HOOK_RESULT))7632b15cb3dSCy Schubert evrpc_pause_request(void *vbase, void *ctx,
7642b15cb3dSCy Schubert void (*cb)(void *, enum EVRPC_HOOK_RESULT))
7652b15cb3dSCy Schubert {
7662b15cb3dSCy Schubert struct evrpc_hooks_ *base = vbase;
7672b15cb3dSCy Schubert struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause));
7682b15cb3dSCy Schubert if (pause == NULL)
7692b15cb3dSCy Schubert return (-1);
7702b15cb3dSCy Schubert
7712b15cb3dSCy Schubert pause->ctx = ctx;
7722b15cb3dSCy Schubert pause->cb = cb;
7732b15cb3dSCy Schubert
7742b15cb3dSCy Schubert TAILQ_INSERT_TAIL(&base->pause_requests, pause, next);
7752b15cb3dSCy Schubert return (0);
7762b15cb3dSCy Schubert }
7772b15cb3dSCy Schubert
7782b15cb3dSCy Schubert int
evrpc_resume_request(void * vbase,void * ctx,enum EVRPC_HOOK_RESULT res)7792b15cb3dSCy Schubert evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res)
7802b15cb3dSCy Schubert {
7812b15cb3dSCy Schubert struct evrpc_hooks_ *base = vbase;
7822b15cb3dSCy Schubert struct evrpc_pause_list *head = &base->pause_requests;
7832b15cb3dSCy Schubert struct evrpc_hook_ctx *pause;
7842b15cb3dSCy Schubert
7852b15cb3dSCy Schubert TAILQ_FOREACH(pause, head, next) {
7862b15cb3dSCy Schubert if (pause->ctx == ctx)
7872b15cb3dSCy Schubert break;
7882b15cb3dSCy Schubert }
7892b15cb3dSCy Schubert
7902b15cb3dSCy Schubert if (pause == NULL)
7912b15cb3dSCy Schubert return (-1);
7922b15cb3dSCy Schubert
7932b15cb3dSCy Schubert (*pause->cb)(pause->ctx, res);
7942b15cb3dSCy Schubert TAILQ_REMOVE(head, pause, next);
7952b15cb3dSCy Schubert mm_free(pause);
7962b15cb3dSCy Schubert return (0);
7972b15cb3dSCy Schubert }
7982b15cb3dSCy Schubert
7992b15cb3dSCy Schubert int
evrpc_make_request(struct evrpc_request_wrapper * ctx)8002b15cb3dSCy Schubert evrpc_make_request(struct evrpc_request_wrapper *ctx)
8012b15cb3dSCy Schubert {
8022b15cb3dSCy Schubert struct evrpc_pool *pool = ctx->pool;
8032b15cb3dSCy Schubert
8042b15cb3dSCy Schubert /* initialize the event structure for this rpc */
8052b15cb3dSCy Schubert evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx);
8062b15cb3dSCy Schubert
8072b15cb3dSCy Schubert /* we better have some available connections on the pool */
8082b15cb3dSCy Schubert EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL);
8092b15cb3dSCy Schubert
8102b15cb3dSCy Schubert /*
8112b15cb3dSCy Schubert * if no connection is available, we queue the request on the pool,
8122b15cb3dSCy Schubert * the next time a connection is empty, the rpc will be send on that.
8132b15cb3dSCy Schubert */
8142b15cb3dSCy Schubert TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
8152b15cb3dSCy Schubert
8162b15cb3dSCy Schubert evrpc_pool_schedule(pool);
8172b15cb3dSCy Schubert
8182b15cb3dSCy Schubert return (0);
8192b15cb3dSCy Schubert }
8202b15cb3dSCy Schubert
8212b15cb3dSCy Schubert
8222b15cb3dSCy Schubert struct evrpc_request_wrapper *
evrpc_make_request_ctx(struct evrpc_pool * pool,void * request,void * reply,const char * rpcname,void (* req_marshal)(struct evbuffer *,void *),void (* rpl_clear)(void *),int (* rpl_unmarshal)(void *,struct evbuffer *),void (* cb)(struct evrpc_status *,void *,void *,void *),void * cbarg)8232b15cb3dSCy Schubert evrpc_make_request_ctx(
8242b15cb3dSCy Schubert struct evrpc_pool *pool, void *request, void *reply,
8252b15cb3dSCy Schubert const char *rpcname,
8262b15cb3dSCy Schubert void (*req_marshal)(struct evbuffer*, void *),
8272b15cb3dSCy Schubert void (*rpl_clear)(void *),
8282b15cb3dSCy Schubert int (*rpl_unmarshal)(void *, struct evbuffer *),
8292b15cb3dSCy Schubert void (*cb)(struct evrpc_status *, void *, void *, void *),
8302b15cb3dSCy Schubert void *cbarg)
8312b15cb3dSCy Schubert {
8322b15cb3dSCy Schubert struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *)
8332b15cb3dSCy Schubert mm_malloc(sizeof(struct evrpc_request_wrapper));
8342b15cb3dSCy Schubert if (ctx == NULL)
8352b15cb3dSCy Schubert return (NULL);
8362b15cb3dSCy Schubert
8372b15cb3dSCy Schubert ctx->pool = pool;
8382b15cb3dSCy Schubert ctx->hook_meta = NULL;
8392b15cb3dSCy Schubert ctx->evcon = NULL;
8402b15cb3dSCy Schubert ctx->name = mm_strdup(rpcname);
8412b15cb3dSCy Schubert if (ctx->name == NULL) {
8422b15cb3dSCy Schubert mm_free(ctx);
8432b15cb3dSCy Schubert return (NULL);
8442b15cb3dSCy Schubert }
8452b15cb3dSCy Schubert ctx->cb = cb;
8462b15cb3dSCy Schubert ctx->cb_arg = cbarg;
8472b15cb3dSCy Schubert ctx->request = request;
8482b15cb3dSCy Schubert ctx->reply = reply;
8492b15cb3dSCy Schubert ctx->request_marshal = req_marshal;
8502b15cb3dSCy Schubert ctx->reply_clear = rpl_clear;
8512b15cb3dSCy Schubert ctx->reply_unmarshal = rpl_unmarshal;
8522b15cb3dSCy Schubert
8532b15cb3dSCy Schubert return (ctx);
8542b15cb3dSCy Schubert }
8552b15cb3dSCy Schubert
8562b15cb3dSCy Schubert static void
8572b15cb3dSCy Schubert evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT);
8582b15cb3dSCy Schubert
8592b15cb3dSCy Schubert static void
evrpc_reply_done(struct evhttp_request * req,void * arg)8602b15cb3dSCy Schubert evrpc_reply_done(struct evhttp_request *req, void *arg)
8612b15cb3dSCy Schubert {
8622b15cb3dSCy Schubert struct evrpc_request_wrapper *ctx = arg;
8632b15cb3dSCy Schubert struct evrpc_pool *pool = ctx->pool;
8642b15cb3dSCy Schubert int hook_res = EVRPC_CONTINUE;
8652b15cb3dSCy Schubert
8662b15cb3dSCy Schubert /* cancel any timeout we might have scheduled */
8672b15cb3dSCy Schubert event_del(&ctx->ev_timeout);
8682b15cb3dSCy Schubert
8692b15cb3dSCy Schubert ctx->req = req;
8702b15cb3dSCy Schubert
8712b15cb3dSCy Schubert /* we need to get the reply now */
8722b15cb3dSCy Schubert if (req == NULL) {
8732b15cb3dSCy Schubert evrpc_reply_done_closure(ctx, EVRPC_CONTINUE);
8742b15cb3dSCy Schubert return;
8752b15cb3dSCy Schubert }
8762b15cb3dSCy Schubert
8772b15cb3dSCy Schubert if (TAILQ_FIRST(&pool->input_hooks) != NULL) {
8782b15cb3dSCy Schubert evrpc_hook_associate_meta_(&ctx->hook_meta, ctx->evcon);
8792b15cb3dSCy Schubert
8802b15cb3dSCy Schubert /* apply hooks to the incoming request */
8812b15cb3dSCy Schubert hook_res = evrpc_process_hooks(&pool->input_hooks,
8822b15cb3dSCy Schubert ctx, req, req->input_buffer);
8832b15cb3dSCy Schubert
8842b15cb3dSCy Schubert switch (hook_res) {
8852b15cb3dSCy Schubert case EVRPC_TERMINATE:
8862b15cb3dSCy Schubert case EVRPC_CONTINUE:
8872b15cb3dSCy Schubert break;
8882b15cb3dSCy Schubert case EVRPC_PAUSE:
8892b15cb3dSCy Schubert /*
8902b15cb3dSCy Schubert * if we get paused we also need to know the
8912b15cb3dSCy Schubert * request. unfortunately, the underlying
8922b15cb3dSCy Schubert * layer is going to free it. we need to
8932b15cb3dSCy Schubert * request ownership explicitly
8942b15cb3dSCy Schubert */
8952b15cb3dSCy Schubert evhttp_request_own(req);
8962b15cb3dSCy Schubert
8972b15cb3dSCy Schubert evrpc_pause_request(pool, ctx,
8982b15cb3dSCy Schubert evrpc_reply_done_closure);
8992b15cb3dSCy Schubert return;
9002b15cb3dSCy Schubert default:
9012b15cb3dSCy Schubert EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
9022b15cb3dSCy Schubert hook_res == EVRPC_CONTINUE ||
9032b15cb3dSCy Schubert hook_res == EVRPC_PAUSE);
9042b15cb3dSCy Schubert }
9052b15cb3dSCy Schubert }
9062b15cb3dSCy Schubert
9072b15cb3dSCy Schubert evrpc_reply_done_closure(ctx, hook_res);
9082b15cb3dSCy Schubert
9092b15cb3dSCy Schubert /* http request is being freed by underlying layer */
9102b15cb3dSCy Schubert }
9112b15cb3dSCy Schubert
9122b15cb3dSCy Schubert static void
evrpc_reply_done_closure(void * arg,enum EVRPC_HOOK_RESULT hook_res)9132b15cb3dSCy Schubert evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
9142b15cb3dSCy Schubert {
9152b15cb3dSCy Schubert struct evrpc_request_wrapper *ctx = arg;
9162b15cb3dSCy Schubert struct evhttp_request *req = ctx->req;
9172b15cb3dSCy Schubert struct evrpc_pool *pool = ctx->pool;
9182b15cb3dSCy Schubert struct evrpc_status status;
9192b15cb3dSCy Schubert int res = -1;
9202b15cb3dSCy Schubert
9212b15cb3dSCy Schubert memset(&status, 0, sizeof(status));
9222b15cb3dSCy Schubert status.http_req = req;
9232b15cb3dSCy Schubert
9242b15cb3dSCy Schubert /* we need to get the reply now */
9252b15cb3dSCy Schubert if (req == NULL) {
9262b15cb3dSCy Schubert status.error = EVRPC_STATUS_ERR_TIMEOUT;
9272b15cb3dSCy Schubert } else if (hook_res == EVRPC_TERMINATE) {
9282b15cb3dSCy Schubert status.error = EVRPC_STATUS_ERR_HOOKABORTED;
9292b15cb3dSCy Schubert } else {
9302b15cb3dSCy Schubert res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
9312b15cb3dSCy Schubert if (res == -1)
9322b15cb3dSCy Schubert status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
9332b15cb3dSCy Schubert }
9342b15cb3dSCy Schubert
9352b15cb3dSCy Schubert if (res == -1) {
9362b15cb3dSCy Schubert /* clear everything that we might have written previously */
9372b15cb3dSCy Schubert ctx->reply_clear(ctx->reply);
9382b15cb3dSCy Schubert }
9392b15cb3dSCy Schubert
9402b15cb3dSCy Schubert (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
9412b15cb3dSCy Schubert
9422b15cb3dSCy Schubert evrpc_request_wrapper_free(ctx);
9432b15cb3dSCy Schubert
9442b15cb3dSCy Schubert /* the http layer owned the original request structure, but if we
9452b15cb3dSCy Schubert * got paused, we asked for ownership and need to free it here. */
9462b15cb3dSCy Schubert if (req != NULL && evhttp_request_is_owned(req))
9472b15cb3dSCy Schubert evhttp_request_free(req);
9482b15cb3dSCy Schubert
9492b15cb3dSCy Schubert /* see if we can schedule another request */
9502b15cb3dSCy Schubert evrpc_pool_schedule(pool);
9512b15cb3dSCy Schubert }
9522b15cb3dSCy Schubert
9532b15cb3dSCy Schubert static void
evrpc_pool_schedule(struct evrpc_pool * pool)9542b15cb3dSCy Schubert evrpc_pool_schedule(struct evrpc_pool *pool)
9552b15cb3dSCy Schubert {
9562b15cb3dSCy Schubert struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
9572b15cb3dSCy Schubert struct evhttp_connection *evcon;
9582b15cb3dSCy Schubert
9592b15cb3dSCy Schubert /* if no requests are pending, we have no work */
9602b15cb3dSCy Schubert if (ctx == NULL)
9612b15cb3dSCy Schubert return;
9622b15cb3dSCy Schubert
9632b15cb3dSCy Schubert if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
9642b15cb3dSCy Schubert TAILQ_REMOVE(&pool->requests, ctx, next);
9652b15cb3dSCy Schubert evrpc_schedule_request(evcon, ctx);
9662b15cb3dSCy Schubert }
9672b15cb3dSCy Schubert }
9682b15cb3dSCy Schubert
9692b15cb3dSCy Schubert static void
evrpc_request_timeout(evutil_socket_t fd,short what,void * arg)9702b15cb3dSCy Schubert evrpc_request_timeout(evutil_socket_t fd, short what, void *arg)
9712b15cb3dSCy Schubert {
9722b15cb3dSCy Schubert struct evrpc_request_wrapper *ctx = arg;
9732b15cb3dSCy Schubert struct evhttp_connection *evcon = ctx->evcon;
9742b15cb3dSCy Schubert EVUTIL_ASSERT(evcon != NULL);
9752b15cb3dSCy Schubert
9762b15cb3dSCy Schubert evhttp_connection_fail_(evcon, EVREQ_HTTP_TIMEOUT);
9772b15cb3dSCy Schubert }
9782b15cb3dSCy Schubert
9792b15cb3dSCy Schubert /*
9802b15cb3dSCy Schubert * frees potential meta data associated with a request.
9812b15cb3dSCy Schubert */
9822b15cb3dSCy Schubert
9832b15cb3dSCy Schubert static void
evrpc_meta_data_free(struct evrpc_meta_list * meta_data)9842b15cb3dSCy Schubert evrpc_meta_data_free(struct evrpc_meta_list *meta_data)
9852b15cb3dSCy Schubert {
9862b15cb3dSCy Schubert struct evrpc_meta *entry;
9872b15cb3dSCy Schubert EVUTIL_ASSERT(meta_data != NULL);
9882b15cb3dSCy Schubert
9892b15cb3dSCy Schubert while ((entry = TAILQ_FIRST(meta_data)) != NULL) {
9902b15cb3dSCy Schubert TAILQ_REMOVE(meta_data, entry, next);
9912b15cb3dSCy Schubert mm_free(entry->key);
9922b15cb3dSCy Schubert mm_free(entry->data);
9932b15cb3dSCy Schubert mm_free(entry);
9942b15cb3dSCy Schubert }
9952b15cb3dSCy Schubert }
9962b15cb3dSCy Schubert
9972b15cb3dSCy Schubert static struct evrpc_hook_meta *
evrpc_hook_meta_new_(void)9982b15cb3dSCy Schubert evrpc_hook_meta_new_(void)
9992b15cb3dSCy Schubert {
10002b15cb3dSCy Schubert struct evrpc_hook_meta *ctx;
10012b15cb3dSCy Schubert ctx = mm_malloc(sizeof(struct evrpc_hook_meta));
10022b15cb3dSCy Schubert EVUTIL_ASSERT(ctx != NULL);
10032b15cb3dSCy Schubert
10042b15cb3dSCy Schubert TAILQ_INIT(&ctx->meta_data);
10052b15cb3dSCy Schubert ctx->evcon = NULL;
10062b15cb3dSCy Schubert
10072b15cb3dSCy Schubert return (ctx);
10082b15cb3dSCy Schubert }
10092b15cb3dSCy Schubert
10102b15cb3dSCy Schubert static void
evrpc_hook_associate_meta_(struct evrpc_hook_meta ** pctx,struct evhttp_connection * evcon)10112b15cb3dSCy Schubert evrpc_hook_associate_meta_(struct evrpc_hook_meta **pctx,
10122b15cb3dSCy Schubert struct evhttp_connection *evcon)
10132b15cb3dSCy Schubert {
10142b15cb3dSCy Schubert struct evrpc_hook_meta *ctx = *pctx;
10152b15cb3dSCy Schubert if (ctx == NULL)
10162b15cb3dSCy Schubert *pctx = ctx = evrpc_hook_meta_new_();
10172b15cb3dSCy Schubert ctx->evcon = evcon;
10182b15cb3dSCy Schubert }
10192b15cb3dSCy Schubert
10202b15cb3dSCy Schubert static void
evrpc_hook_context_free_(struct evrpc_hook_meta * ctx)10212b15cb3dSCy Schubert evrpc_hook_context_free_(struct evrpc_hook_meta *ctx)
10222b15cb3dSCy Schubert {
10232b15cb3dSCy Schubert evrpc_meta_data_free(&ctx->meta_data);
10242b15cb3dSCy Schubert mm_free(ctx);
10252b15cb3dSCy Schubert }
10262b15cb3dSCy Schubert
10272b15cb3dSCy Schubert /* Adds meta data */
10282b15cb3dSCy Schubert void
evrpc_hook_add_meta(void * ctx,const char * key,const void * data,size_t data_size)10292b15cb3dSCy Schubert evrpc_hook_add_meta(void *ctx, const char *key,
10302b15cb3dSCy Schubert const void *data, size_t data_size)
10312b15cb3dSCy Schubert {
10322b15cb3dSCy Schubert struct evrpc_request_wrapper *req = ctx;
10332b15cb3dSCy Schubert struct evrpc_hook_meta *store = NULL;
10342b15cb3dSCy Schubert struct evrpc_meta *meta = NULL;
10352b15cb3dSCy Schubert
10362b15cb3dSCy Schubert if ((store = req->hook_meta) == NULL)
10372b15cb3dSCy Schubert store = req->hook_meta = evrpc_hook_meta_new_();
10382b15cb3dSCy Schubert
10392b15cb3dSCy Schubert meta = mm_malloc(sizeof(struct evrpc_meta));
10402b15cb3dSCy Schubert EVUTIL_ASSERT(meta != NULL);
10412b15cb3dSCy Schubert meta->key = mm_strdup(key);
10422b15cb3dSCy Schubert EVUTIL_ASSERT(meta->key != NULL);
10432b15cb3dSCy Schubert meta->data_size = data_size;
10442b15cb3dSCy Schubert meta->data = mm_malloc(data_size);
10452b15cb3dSCy Schubert EVUTIL_ASSERT(meta->data != NULL);
10462b15cb3dSCy Schubert memcpy(meta->data, data, data_size);
10472b15cb3dSCy Schubert
10482b15cb3dSCy Schubert TAILQ_INSERT_TAIL(&store->meta_data, meta, next);
10492b15cb3dSCy Schubert }
10502b15cb3dSCy Schubert
10512b15cb3dSCy Schubert int
evrpc_hook_find_meta(void * ctx,const char * key,void ** data,size_t * data_size)10522b15cb3dSCy Schubert evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size)
10532b15cb3dSCy Schubert {
10542b15cb3dSCy Schubert struct evrpc_request_wrapper *req = ctx;
10552b15cb3dSCy Schubert struct evrpc_meta *meta = NULL;
10562b15cb3dSCy Schubert
10572b15cb3dSCy Schubert if (req->hook_meta == NULL)
10582b15cb3dSCy Schubert return (-1);
10592b15cb3dSCy Schubert
10602b15cb3dSCy Schubert TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) {
10612b15cb3dSCy Schubert if (strcmp(meta->key, key) == 0) {
10622b15cb3dSCy Schubert *data = meta->data;
10632b15cb3dSCy Schubert *data_size = meta->data_size;
10642b15cb3dSCy Schubert return (0);
10652b15cb3dSCy Schubert }
10662b15cb3dSCy Schubert }
10672b15cb3dSCy Schubert
10682b15cb3dSCy Schubert return (-1);
10692b15cb3dSCy Schubert }
10702b15cb3dSCy Schubert
10712b15cb3dSCy Schubert struct evhttp_connection *
evrpc_hook_get_connection(void * ctx)10722b15cb3dSCy Schubert evrpc_hook_get_connection(void *ctx)
10732b15cb3dSCy Schubert {
10742b15cb3dSCy Schubert struct evrpc_request_wrapper *req = ctx;
10752b15cb3dSCy Schubert return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL);
10762b15cb3dSCy Schubert }
10772b15cb3dSCy Schubert
10782b15cb3dSCy Schubert int
evrpc_send_request_generic(struct evrpc_pool * pool,void * request,void * reply,void (* cb)(struct evrpc_status *,void *,void *,void *),void * cb_arg,const char * rpcname,void (* req_marshal)(struct evbuffer *,void *),void (* rpl_clear)(void *),int (* rpl_unmarshal)(void *,struct evbuffer *))10792b15cb3dSCy Schubert evrpc_send_request_generic(struct evrpc_pool *pool,
10802b15cb3dSCy Schubert void *request, void *reply,
10812b15cb3dSCy Schubert void (*cb)(struct evrpc_status *, void *, void *, void *),
10822b15cb3dSCy Schubert void *cb_arg,
10832b15cb3dSCy Schubert const char *rpcname,
10842b15cb3dSCy Schubert void (*req_marshal)(struct evbuffer *, void *),
10852b15cb3dSCy Schubert void (*rpl_clear)(void *),
10862b15cb3dSCy Schubert int (*rpl_unmarshal)(void *, struct evbuffer *))
10872b15cb3dSCy Schubert {
10882b15cb3dSCy Schubert struct evrpc_status status;
10892b15cb3dSCy Schubert struct evrpc_request_wrapper *ctx;
10902b15cb3dSCy Schubert ctx = evrpc_make_request_ctx(pool, request, reply,
10912b15cb3dSCy Schubert rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg);
10922b15cb3dSCy Schubert if (ctx == NULL)
10932b15cb3dSCy Schubert goto error;
10942b15cb3dSCy Schubert return (evrpc_make_request(ctx));
10952b15cb3dSCy Schubert error:
10962b15cb3dSCy Schubert memset(&status, 0, sizeof(status));
10972b15cb3dSCy Schubert status.error = EVRPC_STATUS_ERR_UNSTARTED;
10982b15cb3dSCy Schubert (*(cb))(&status, request, reply, cb_arg);
10992b15cb3dSCy Schubert return (-1);
11002b15cb3dSCy Schubert }
11012b15cb3dSCy Schubert
11022b15cb3dSCy Schubert /** Takes a request object and fills it in with the right magic */
11032b15cb3dSCy Schubert static struct evrpc *
evrpc_register_object(const char * name,void * (* req_new)(void *),void * req_new_arg,void (* req_free)(void *),int (* req_unmarshal)(void *,struct evbuffer *),void * (* rpl_new)(void *),void * rpl_new_arg,void (* rpl_free)(void *),int (* rpl_complete)(void *),void (* rpl_marshal)(struct evbuffer *,void *))11042b15cb3dSCy Schubert evrpc_register_object(const char *name,
11052b15cb3dSCy Schubert void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *),
11062b15cb3dSCy Schubert int (*req_unmarshal)(void *, struct evbuffer *),
11072b15cb3dSCy Schubert void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *),
11082b15cb3dSCy Schubert int (*rpl_complete)(void *),
11092b15cb3dSCy Schubert void (*rpl_marshal)(struct evbuffer *, void *))
11102b15cb3dSCy Schubert {
11112b15cb3dSCy Schubert struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc));
11122b15cb3dSCy Schubert if (rpc == NULL)
11132b15cb3dSCy Schubert return (NULL);
11142b15cb3dSCy Schubert rpc->uri = mm_strdup(name);
11152b15cb3dSCy Schubert if (rpc->uri == NULL) {
11162b15cb3dSCy Schubert mm_free(rpc);
11172b15cb3dSCy Schubert return (NULL);
11182b15cb3dSCy Schubert }
11192b15cb3dSCy Schubert rpc->request_new = req_new;
11202b15cb3dSCy Schubert rpc->request_new_arg = req_new_arg;
11212b15cb3dSCy Schubert rpc->request_free = req_free;
11222b15cb3dSCy Schubert rpc->request_unmarshal = req_unmarshal;
11232b15cb3dSCy Schubert rpc->reply_new = rpl_new;
11242b15cb3dSCy Schubert rpc->reply_new_arg = rpl_new_arg;
11252b15cb3dSCy Schubert rpc->reply_free = rpl_free;
11262b15cb3dSCy Schubert rpc->reply_complete = rpl_complete;
11272b15cb3dSCy Schubert rpc->reply_marshal = rpl_marshal;
11282b15cb3dSCy Schubert return (rpc);
11292b15cb3dSCy Schubert }
11302b15cb3dSCy Schubert
/*
 * Creates an rpc object from the given name and helpers and registers it
 * on base together with the callback and its argument.
 *
 * Returns -1 when the rpc object could not be created and 0 otherwise;
 * the result of evrpc_register_rpc() is not inspected.
 */
int
evrpc_register_generic(struct evrpc_base *base, const char *name,
    void (*callback)(struct evrpc_req_generic *, void *), void *cbarg,
    void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *),
    int (*req_unmarshal)(void *, struct evbuffer *),
    void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *),
    int (*rpl_complete)(void *),
    void (*rpl_marshal)(struct evbuffer *, void *))
{
	struct evrpc *obj;

	obj = evrpc_register_object(name,
	    req_new, req_new_arg, req_free, req_unmarshal,
	    rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal);
	if (obj == NULL)
		return (-1);

	evrpc_register_rpc(base, obj,
	    (void (*)(struct evrpc_req_generic*, void *))callback, cbarg);

	return (0);
}
11492b15cb3dSCy Schubert
11502b15cb3dSCy Schubert /** accessors for obscure and undocumented functionality */
11512b15cb3dSCy Schubert struct evrpc_pool *
evrpc_request_get_pool(struct evrpc_request_wrapper * ctx)11522b15cb3dSCy Schubert evrpc_request_get_pool(struct evrpc_request_wrapper *ctx)
11532b15cb3dSCy Schubert {
11542b15cb3dSCy Schubert return (ctx->pool);
11552b15cb3dSCy Schubert }
11562b15cb3dSCy Schubert
11572b15cb3dSCy Schubert void
evrpc_request_set_pool(struct evrpc_request_wrapper * ctx,struct evrpc_pool * pool)11582b15cb3dSCy Schubert evrpc_request_set_pool(struct evrpc_request_wrapper *ctx,
11592b15cb3dSCy Schubert struct evrpc_pool *pool)
11602b15cb3dSCy Schubert {
11612b15cb3dSCy Schubert ctx->pool = pool;
11622b15cb3dSCy Schubert }
11632b15cb3dSCy Schubert
11642b15cb3dSCy Schubert void
evrpc_request_set_cb(struct evrpc_request_wrapper * ctx,void (* cb)(struct evrpc_status *,void * request,void * reply,void * arg),void * cb_arg)11652b15cb3dSCy Schubert evrpc_request_set_cb(struct evrpc_request_wrapper *ctx,
11662b15cb3dSCy Schubert void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg),
11672b15cb3dSCy Schubert void *cb_arg)
11682b15cb3dSCy Schubert {
11692b15cb3dSCy Schubert ctx->cb = cb;
11702b15cb3dSCy Schubert ctx->cb_arg = cb_arg;
11712b15cb3dSCy Schubert }
1172