Lines Matching +full:rpc +full:- +full:if
2 * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu>
3 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 #include "event2/event-config.h"
28 #include "evconfig-private.h"
58 #include "event2/rpc.h"
60 #include "evrpc-internal.h"
67 #include "util-internal.h"
68 #include "log-internal.h"
69 #include "mm-internal.h"
75 if (base == NULL) in evrpc_init()
81 TAILQ_INIT(&base->registered_rpcs); in evrpc_init()
82 TAILQ_INIT(&base->input_hooks); in evrpc_init()
83 TAILQ_INIT(&base->output_hooks); in evrpc_init()
85 TAILQ_INIT(&base->paused_requests); in evrpc_init()
87 base->http_server = http_server; in evrpc_init()
95 struct evrpc *rpc; in evrpc_free() local
100 while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) { in evrpc_free()
101 r = evrpc_unregister_rpc(base, rpc->uri); in evrpc_free()
104 while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) { in evrpc_free()
105 TAILQ_REMOVE(&base->paused_requests, pause, next); in evrpc_free()
108 while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) { in evrpc_free()
112 while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) { in evrpc_free()
130 head = &base->in_hooks; in evrpc_add_hook()
133 head = &base->out_hooks; in evrpc_add_hook()
142 hook->process = cb; in evrpc_add_hook()
143 hook->process_arg = cb_arg; in evrpc_add_hook()
154 if (hook == handle) { in evrpc_remove_hook_internal()
175 head = &base->in_hooks; in evrpc_remove_hook()
178 head = &base->out_hooks; in evrpc_remove_hook()
193 int res = hook->process(ctx, req, evbuf, hook->process_arg); in evrpc_process_hooks()
194 if (res != EVRPC_CONTINUE) in evrpc_process_hooks()
205 * Registers a new RPC with the HTTP server. The evrpc object is expected
217 if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL) in evrpc_construct_uri()
218 event_err(1, "%s: failed to register rpc at %s", in evrpc_construct_uri()
222 constructed_uri[constructed_uri_len - 1] = '\0'; in evrpc_construct_uri()
228 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, in evrpc_register_rpc() argument
231 char *constructed_uri = evrpc_construct_uri(rpc->uri); in evrpc_register_rpc()
233 rpc->base = base; in evrpc_register_rpc()
234 rpc->cb = cb; in evrpc_register_rpc()
235 rpc->cb_arg = cb_arg; in evrpc_register_rpc()
237 TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next); in evrpc_register_rpc()
239 evhttp_set_cb(base->http_server, in evrpc_register_rpc()
242 rpc); in evrpc_register_rpc()
253 struct evrpc *rpc; in evrpc_unregister_rpc() local
256 /* find the right rpc; linear search might be slow */ in evrpc_unregister_rpc()
257 TAILQ_FOREACH(rpc, &base->registered_rpcs, next) { in evrpc_unregister_rpc()
258 if (strcmp(rpc->uri, name) == 0) in evrpc_unregister_rpc()
261 if (rpc == NULL) { in evrpc_unregister_rpc()
262 /* We did not find an RPC with this name */ in evrpc_unregister_rpc()
263 return (-1); in evrpc_unregister_rpc()
265 TAILQ_REMOVE(&base->registered_rpcs, rpc, next); in evrpc_unregister_rpc()
270 r = evhttp_del_cb(base->http_server, registered_uri); in evrpc_unregister_rpc()
275 mm_free((char *)rpc->uri); in evrpc_unregister_rpc()
276 mm_free(rpc); in evrpc_unregister_rpc()
287 struct evrpc *rpc = arg; in evrpc_request_cb() local
291 if (req->type != EVHTTP_REQ_POST || in evrpc_request_cb()
292 evbuffer_get_length(req->input_buffer) <= 0) in evrpc_request_cb()
296 if (rpc_state == NULL) in evrpc_request_cb()
298 rpc_state->rpc = rpc; in evrpc_request_cb()
299 rpc_state->http_req = req; in evrpc_request_cb()
300 rpc_state->rpc_data = NULL; in evrpc_request_cb()
302 if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) { in evrpc_request_cb()
305 evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); in evrpc_request_cb()
310 hook_res = evrpc_process_hooks(&rpc->base->input_hooks, in evrpc_request_cb()
311 rpc_state, req, req->input_buffer); in evrpc_request_cb()
316 evrpc_pause_request(rpc->base, rpc_state, in evrpc_request_cb()
332 if (rpc_state) in evrpc_request_cb()
342 struct evrpc *rpc; in evrpc_request_cb_closure() local
346 rpc = rpc_state->rpc; in evrpc_request_cb_closure()
347 req = rpc_state->http_req; in evrpc_request_cb_closure()
349 if (hook_res == EVRPC_TERMINATE) in evrpc_request_cb_closure()
353 rpc_state->request = rpc->request_new(rpc->request_new_arg); in evrpc_request_cb_closure()
354 if (rpc_state->request == NULL) in evrpc_request_cb_closure()
357 if (rpc->request_unmarshal( in evrpc_request_cb_closure()
358 rpc_state->request, req->input_buffer) == -1) { in evrpc_request_cb_closure()
365 rpc_state->reply = rpc->reply_new(rpc->reply_new_arg); in evrpc_request_cb_closure()
366 if (rpc_state->reply == NULL) in evrpc_request_cb_closure()
369 /* give the rpc to the user; they can deal with it */ in evrpc_request_cb_closure()
370 rpc->cb(rpc_state, rpc->cb_arg); in evrpc_request_cb_closure()
384 struct evrpc *rpc; in evrpc_reqstate_free_() local
386 rpc = rpc_state->rpc; in evrpc_reqstate_free_()
389 if (rpc_state->hook_meta != NULL) in evrpc_reqstate_free_()
390 evrpc_hook_context_free_(rpc_state->hook_meta); in evrpc_reqstate_free_()
391 if (rpc_state->request != NULL) in evrpc_reqstate_free_()
392 rpc->request_free(rpc_state->request); in evrpc_reqstate_free_()
393 if (rpc_state->reply != NULL) in evrpc_reqstate_free_()
394 rpc->reply_free(rpc_state->reply); in evrpc_reqstate_free_()
395 if (rpc_state->rpc_data != NULL) in evrpc_reqstate_free_()
396 evbuffer_free(rpc_state->rpc_data); in evrpc_reqstate_free_()
407 struct evrpc *rpc; in evrpc_request_done() local
411 req = rpc_state->http_req; in evrpc_request_done()
412 rpc = rpc_state->rpc; in evrpc_request_done()
414 if (rpc->reply_complete(rpc_state->reply) == -1) { in evrpc_request_done()
419 if ((rpc_state->rpc_data = evbuffer_new()) == NULL) { in evrpc_request_done()
425 rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); in evrpc_request_done()
427 if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) { in evrpc_request_done()
430 evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); in evrpc_request_done()
433 hook_res = evrpc_process_hooks(&rpc->base->output_hooks, in evrpc_request_done()
434 rpc_state, req, rpc_state->rpc_data); in evrpc_request_done()
439 if (evrpc_pause_request(rpc->base, rpc_state, in evrpc_request_done()
440 evrpc_request_done_closure) == -1) in evrpc_request_done()
464 return req->request; in evrpc_get_request()
470 return req->reply; in evrpc_get_reply()
479 req = rpc_state->http_req; in evrpc_request_done_closure()
481 if (hook_res == EVRPC_TERMINATE) in evrpc_request_done_closure()
485 if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) { in evrpc_request_done_closure()
486 evhttp_add_header(req->output_headers, in evrpc_request_done_closure()
487 "Content-Type", "application/octet-stream"); in evrpc_request_done_closure()
489 evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data); in evrpc_request_done_closure()
502 /* Client implementation of RPC site */
511 if (pool == NULL) in evrpc_pool_new()
514 TAILQ_INIT(&pool->connections); in evrpc_pool_new()
515 TAILQ_INIT(&pool->requests); in evrpc_pool_new()
517 TAILQ_INIT(&pool->paused_requests); in evrpc_pool_new()
519 TAILQ_INIT(&pool->input_hooks); in evrpc_pool_new()
520 TAILQ_INIT(&pool->output_hooks); in evrpc_pool_new()
522 pool->base = base; in evrpc_pool_new()
523 pool->timeout = -1; in evrpc_pool_new()
531 if (request->hook_meta != NULL) in evrpc_request_wrapper_free()
532 evrpc_hook_context_free_(request->hook_meta); in evrpc_request_wrapper_free()
533 mm_free(request->name); in evrpc_request_wrapper_free()
546 while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { in evrpc_pool_free()
547 TAILQ_REMOVE(&pool->requests, request, next); in evrpc_pool_free()
551 while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) { in evrpc_pool_free()
552 TAILQ_REMOVE(&pool->paused_requests, pause, next); in evrpc_pool_free()
556 while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { in evrpc_pool_free()
557 TAILQ_REMOVE(&pool->connections, connection, next); in evrpc_pool_free()
561 while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) { in evrpc_pool_free()
566 while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) { in evrpc_pool_free()
575 * Add a connection to the RPC pool. A request scheduled on the pool
583 EVUTIL_ASSERT(connection->http_server == NULL); in evrpc_pool_add_connection()
584 TAILQ_INSERT_TAIL(&pool->connections, connection, next); in evrpc_pool_add_connection()
589 if (pool->base != NULL) in evrpc_pool_add_connection()
590 evhttp_connection_set_base(connection, pool->base); in evrpc_pool_add_connection()
596 if (!evutil_timerisset(&connection->timeout)) in evrpc_pool_add_connection()
597 evhttp_connection_set_timeout(connection, pool->timeout); in evrpc_pool_add_connection()
600 * if we have any requests pending, schedule them with the new in evrpc_pool_add_connection()
604 if (TAILQ_FIRST(&pool->requests) != NULL) { in evrpc_pool_add_connection()
606 TAILQ_FIRST(&pool->requests); in evrpc_pool_add_connection()
607 TAILQ_REMOVE(&pool->requests, request, next); in evrpc_pool_add_connection()
616 TAILQ_REMOVE(&pool->connections, connection, next); in evrpc_pool_remove_connection()
623 TAILQ_FOREACH(evcon, &pool->connections, next) { in evrpc_pool_set_timeout()
626 pool->timeout = timeout_in_secs; in evrpc_pool_set_timeout()
641 TAILQ_FOREACH(connection, &pool->connections, next) { in evrpc_pool_find_connection()
642 if (TAILQ_FIRST(&connection->requests) == NULL) in evrpc_pool_find_connection()
663 struct evrpc_pool *pool = ctx->pool; in evrpc_schedule_request()
666 if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) in evrpc_schedule_request()
670 ctx->request_marshal(req->output_buffer, ctx->request); in evrpc_schedule_request()
673 ctx->evcon = connection; in evrpc_schedule_request()
675 /* if we get paused we also need to know the request */ in evrpc_schedule_request()
676 ctx->req = req; in evrpc_schedule_request()
678 if (TAILQ_FIRST(&pool->output_hooks) != NULL) { in evrpc_schedule_request()
681 evrpc_hook_associate_meta_(&ctx->hook_meta, connection); in evrpc_schedule_request()
684 hook_res = evrpc_process_hooks(&pool->output_hooks, in evrpc_schedule_request()
685 ctx, req, req->output_buffer); in evrpc_schedule_request()
692 if (evrpc_pause_request(pool, ctx, in evrpc_schedule_request()
693 evrpc_schedule_request_closure) == -1) in evrpc_schedule_request()
712 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); in evrpc_schedule_request()
714 return (-1); in evrpc_schedule_request()
721 struct evhttp_connection *connection = ctx->evcon; in evrpc_schedule_request_closure()
722 struct evhttp_request *req = ctx->req; in evrpc_schedule_request_closure()
723 struct evrpc_pool *pool = ctx->pool; in evrpc_schedule_request_closure()
728 if (hook_res == EVRPC_TERMINATE) in evrpc_schedule_request_closure()
731 uri = evrpc_construct_uri(ctx->name); in evrpc_schedule_request_closure()
732 if (uri == NULL) in evrpc_schedule_request_closure()
735 if (pool->timeout > 0) { in evrpc_schedule_request_closure()
737 * a timeout after which the whole rpc is going to be aborted. in evrpc_schedule_request_closure()
741 tv.tv_sec = pool->timeout; in evrpc_schedule_request_closure()
742 evtimer_add(&ctx->ev_timeout, &tv); in evrpc_schedule_request_closure()
749 if (res == -1) in evrpc_schedule_request_closure()
757 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); in evrpc_schedule_request_closure()
768 if (pause == NULL) in evrpc_pause_request()
769 return (-1); in evrpc_pause_request()
771 pause->ctx = ctx; in evrpc_pause_request()
772 pause->cb = cb; in evrpc_pause_request()
774 TAILQ_INSERT_TAIL(&base->pause_requests, pause, next); in evrpc_pause_request()
782 struct evrpc_pause_list *head = &base->pause_requests; in evrpc_resume_request()
786 if (pause->ctx == ctx) in evrpc_resume_request()
790 if (pause == NULL) in evrpc_resume_request()
791 return (-1); in evrpc_resume_request()
793 (*pause->cb)(pause->ctx, res); in evrpc_resume_request()
802 struct evrpc_pool *pool = ctx->pool; in evrpc_make_request()
804 /* initialize the event structure for this rpc */ in evrpc_make_request()
805 evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx); in evrpc_make_request()
808 EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL); in evrpc_make_request()
811 * if no connection is available, we queue the request on the pool, in evrpc_make_request()
812 * the next time a connection is empty, the rpc will be send on that. in evrpc_make_request()
814 TAILQ_INSERT_TAIL(&pool->requests, ctx, next); in evrpc_make_request()
834 if (ctx == NULL) in evrpc_make_request_ctx()
837 ctx->pool = pool; in evrpc_make_request_ctx()
838 ctx->hook_meta = NULL; in evrpc_make_request_ctx()
839 ctx->evcon = NULL; in evrpc_make_request_ctx()
840 ctx->name = mm_strdup(rpcname); in evrpc_make_request_ctx()
841 if (ctx->name == NULL) { in evrpc_make_request_ctx()
845 ctx->cb = cb; in evrpc_make_request_ctx()
846 ctx->cb_arg = cbarg; in evrpc_make_request_ctx()
847 ctx->request = request; in evrpc_make_request_ctx()
848 ctx->reply = reply; in evrpc_make_request_ctx()
849 ctx->request_marshal = req_marshal; in evrpc_make_request_ctx()
850 ctx->reply_clear = rpl_clear; in evrpc_make_request_ctx()
851 ctx->reply_unmarshal = rpl_unmarshal; in evrpc_make_request_ctx()
863 struct evrpc_pool *pool = ctx->pool; in evrpc_reply_done()
867 event_del(&ctx->ev_timeout); in evrpc_reply_done()
869 ctx->req = req; in evrpc_reply_done()
872 if (req == NULL) { in evrpc_reply_done()
877 if (TAILQ_FIRST(&pool->input_hooks) != NULL) { in evrpc_reply_done()
878 evrpc_hook_associate_meta_(&ctx->hook_meta, ctx->evcon); in evrpc_reply_done()
881 hook_res = evrpc_process_hooks(&pool->input_hooks, in evrpc_reply_done()
882 ctx, req, req->input_buffer); in evrpc_reply_done()
890 * if we get paused we also need to know the in evrpc_reply_done()
916 struct evhttp_request *req = ctx->req; in evrpc_reply_done_closure()
917 struct evrpc_pool *pool = ctx->pool; in evrpc_reply_done_closure()
919 int res = -1; in evrpc_reply_done_closure()
925 if (req == NULL) { in evrpc_reply_done_closure()
927 } else if (hook_res == EVRPC_TERMINATE) { in evrpc_reply_done_closure()
930 res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); in evrpc_reply_done_closure()
931 if (res == -1) in evrpc_reply_done_closure()
935 if (res == -1) { in evrpc_reply_done_closure()
937 ctx->reply_clear(ctx->reply); in evrpc_reply_done_closure()
940 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); in evrpc_reply_done_closure()
944 /* the http layer owned the original request structure, but if we in evrpc_reply_done_closure()
946 if (req != NULL && evhttp_request_is_owned(req)) in evrpc_reply_done_closure()
949 /* see if we can schedule another request */ in evrpc_reply_done_closure()
956 struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests); in evrpc_pool_schedule()
959 /* if no requests are pending, we have no work */ in evrpc_pool_schedule()
960 if (ctx == NULL) in evrpc_pool_schedule()
963 if ((evcon = evrpc_pool_find_connection(pool)) != NULL) { in evrpc_pool_schedule()
964 TAILQ_REMOVE(&pool->requests, ctx, next); in evrpc_pool_schedule()
973 struct evhttp_connection *evcon = ctx->evcon; in evrpc_request_timeout()
991 mm_free(entry->key); in evrpc_meta_data_free()
992 mm_free(entry->data); in evrpc_meta_data_free()
1004 TAILQ_INIT(&ctx->meta_data); in evrpc_hook_meta_new_()
1005 ctx->evcon = NULL; in evrpc_hook_meta_new_()
1015 if (ctx == NULL) in evrpc_hook_associate_meta_()
1017 ctx->evcon = evcon; in evrpc_hook_associate_meta_()
1023 evrpc_meta_data_free(&ctx->meta_data); in evrpc_hook_context_free_()
1036 if ((store = req->hook_meta) == NULL) in evrpc_hook_add_meta()
1037 store = req->hook_meta = evrpc_hook_meta_new_(); in evrpc_hook_add_meta()
1041 meta->key = mm_strdup(key); in evrpc_hook_add_meta()
1042 EVUTIL_ASSERT(meta->key != NULL); in evrpc_hook_add_meta()
1043 meta->data_size = data_size; in evrpc_hook_add_meta()
1044 meta->data = mm_malloc(data_size); in evrpc_hook_add_meta()
1045 EVUTIL_ASSERT(meta->data != NULL); in evrpc_hook_add_meta()
1046 memcpy(meta->data, data, data_size); in evrpc_hook_add_meta()
1048 TAILQ_INSERT_TAIL(&store->meta_data, meta, next); in evrpc_hook_add_meta()
1057 if (req->hook_meta == NULL) in evrpc_hook_find_meta()
1058 return (-1); in evrpc_hook_find_meta()
1060 TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) { in evrpc_hook_find_meta()
1061 if (strcmp(meta->key, key) == 0) { in evrpc_hook_find_meta()
1062 *data = meta->data; in evrpc_hook_find_meta()
1063 *data_size = meta->data_size; in evrpc_hook_find_meta()
1068 return (-1); in evrpc_hook_find_meta()
1075 return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL); in evrpc_hook_get_connection()
1092 if (ctx == NULL) in evrpc_send_request_generic()
1099 return (-1); in evrpc_send_request_generic()
1111 struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc)); in evrpc_register_object() local
1112 if (rpc == NULL) in evrpc_register_object()
1114 rpc->uri = mm_strdup(name); in evrpc_register_object()
1115 if (rpc->uri == NULL) { in evrpc_register_object()
1116 mm_free(rpc); in evrpc_register_object()
1119 rpc->request_new = req_new; in evrpc_register_object()
1120 rpc->request_new_arg = req_new_arg; in evrpc_register_object()
1121 rpc->request_free = req_free; in evrpc_register_object()
1122 rpc->request_unmarshal = req_unmarshal; in evrpc_register_object()
1123 rpc->reply_new = rpl_new; in evrpc_register_object()
1124 rpc->reply_new_arg = rpl_new_arg; in evrpc_register_object()
1125 rpc->reply_free = rpl_free; in evrpc_register_object()
1126 rpc->reply_complete = rpl_complete; in evrpc_register_object()
1127 rpc->reply_marshal = rpl_marshal; in evrpc_register_object()
1128 return (rpc); in evrpc_register_object()
1140 struct evrpc* rpc = in evrpc_register_generic() local
1143 if (rpc == NULL) in evrpc_register_generic()
1144 return (-1); in evrpc_register_generic()
1145 evrpc_register_rpc(base, rpc, in evrpc_register_generic()
1154 return (ctx->pool); in evrpc_request_get_pool()
1161 ctx->pool = pool; in evrpc_request_set_pool()
1169 ctx->cb = cb; in evrpc_request_set_cb()
1170 ctx->cb_arg = cb_arg; in evrpc_request_set_cb()