12b15cb3dSCy Schubert /* 22b15cb3dSCy Schubert * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson 32b15cb3dSCy Schubert * 42b15cb3dSCy Schubert * All rights reserved. 52b15cb3dSCy Schubert * 62b15cb3dSCy Schubert * Redistribution and use in source and binary forms, with or without 72b15cb3dSCy Schubert * modification, are permitted provided that the following conditions 82b15cb3dSCy Schubert * are met: 92b15cb3dSCy Schubert * 1. Redistributions of source code must retain the above copyright 102b15cb3dSCy Schubert * notice, this list of conditions and the following disclaimer. 112b15cb3dSCy Schubert * 2. Redistributions in binary form must reproduce the above copyright 122b15cb3dSCy Schubert * notice, this list of conditions and the following disclaimer in the 132b15cb3dSCy Schubert * documentation and/or other materials provided with the distribution. 142b15cb3dSCy Schubert * 3. The name of the author may not be used to endorse or promote products 152b15cb3dSCy Schubert * derived from this software without specific prior written permission. 162b15cb3dSCy Schubert * 172b15cb3dSCy Schubert * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 182b15cb3dSCy Schubert * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 192b15cb3dSCy Schubert * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 202b15cb3dSCy Schubert * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 212b15cb3dSCy Schubert * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 222b15cb3dSCy Schubert * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 232b15cb3dSCy Schubert * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 242b15cb3dSCy Schubert * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 252b15cb3dSCy Schubert * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 262b15cb3dSCy Schubert * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 272b15cb3dSCy Schubert */ 282b15cb3dSCy Schubert 292b15cb3dSCy Schubert #include "event2/event-config.h" 302b15cb3dSCy Schubert #include "evconfig-private.h" 312b15cb3dSCy Schubert 322b15cb3dSCy Schubert #ifdef EVENT__HAVE_SYS_TIME_H 332b15cb3dSCy Schubert #include <sys/time.h> 342b15cb3dSCy Schubert #endif 352b15cb3dSCy Schubert 362b15cb3dSCy Schubert #include <errno.h> 372b15cb3dSCy Schubert #include <stdio.h> 382b15cb3dSCy Schubert #include <stdlib.h> 392b15cb3dSCy Schubert #include <string.h> 402b15cb3dSCy Schubert #ifdef EVENT__HAVE_STDARG_H 412b15cb3dSCy Schubert #include <stdarg.h> 422b15cb3dSCy Schubert #endif 432b15cb3dSCy Schubert #ifdef EVENT__HAVE_UNISTD_H 442b15cb3dSCy Schubert #include <unistd.h> 452b15cb3dSCy Schubert #endif 462b15cb3dSCy Schubert 472b15cb3dSCy Schubert #ifdef _WIN32 482b15cb3dSCy Schubert #include <winsock2.h> 492b15cb3dSCy Schubert #include <ws2tcpip.h> 502b15cb3dSCy Schubert #endif 512b15cb3dSCy Schubert 522b15cb3dSCy Schubert #include <sys/queue.h> 532b15cb3dSCy Schubert 542b15cb3dSCy Schubert #include "event2/util.h" 552b15cb3dSCy Schubert #include "event2/bufferevent.h" 562b15cb3dSCy Schubert #include "event2/buffer.h" 572b15cb3dSCy Schubert #include "event2/bufferevent_struct.h" 582b15cb3dSCy Schubert #include "event2/event.h" 592b15cb3dSCy Schubert #include "event2/util.h" 602b15cb3dSCy Schubert #include "event-internal.h" 612b15cb3dSCy Schubert #include "log-internal.h" 622b15cb3dSCy Schubert #include "mm-internal.h" 632b15cb3dSCy Schubert #include "bufferevent-internal.h" 642b15cb3dSCy Schubert #include "util-internal.h" 652b15cb3dSCy Schubert #include "iocp-internal.h" 662b15cb3dSCy Schubert 672b15cb3dSCy Schubert #ifndef SO_UPDATE_CONNECT_CONTEXT 682b15cb3dSCy Schubert /* Mingw is sometimes missing this */ 692b15cb3dSCy Schubert #define SO_UPDATE_CONNECT_CONTEXT 0x7010 702b15cb3dSCy Schubert #endif 712b15cb3dSCy Schubert 722b15cb3dSCy Schubert /* prototypes */ 732b15cb3dSCy Schubert static int be_async_enable(struct bufferevent *, short); 742b15cb3dSCy Schubert static int be_async_disable(struct bufferevent *, short); 752b15cb3dSCy Schubert static void be_async_destruct(struct bufferevent *); 762b15cb3dSCy Schubert static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode); 772b15cb3dSCy Schubert static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 782b15cb3dSCy Schubert 792b15cb3dSCy Schubert struct bufferevent_async { 802b15cb3dSCy Schubert struct bufferevent_private bev; 812b15cb3dSCy Schubert struct event_overlapped connect_overlapped; 822b15cb3dSCy Schubert struct event_overlapped read_overlapped; 832b15cb3dSCy Schubert struct event_overlapped write_overlapped; 842b15cb3dSCy Schubert size_t read_in_progress; 852b15cb3dSCy Schubert size_t write_in_progress; 862b15cb3dSCy Schubert unsigned ok : 1; 872b15cb3dSCy Schubert unsigned read_added : 1; 882b15cb3dSCy Schubert unsigned write_added : 1; 892b15cb3dSCy Schubert }; 902b15cb3dSCy Schubert 912b15cb3dSCy Schubert const struct bufferevent_ops bufferevent_ops_async = { 922b15cb3dSCy Schubert "socket_async", 932b15cb3dSCy Schubert evutil_offsetof(struct bufferevent_async, bev.bev), 942b15cb3dSCy Schubert be_async_enable, 952b15cb3dSCy Schubert be_async_disable, 962b15cb3dSCy Schubert NULL, /* Unlink */ 972b15cb3dSCy Schubert be_async_destruct, 982b15cb3dSCy Schubert bufferevent_generic_adj_timeouts_, 992b15cb3dSCy Schubert be_async_flush, 1002b15cb3dSCy Schubert be_async_ctrl, 1012b15cb3dSCy Schubert }; 1022b15cb3dSCy Schubert 1032b15cb3dSCy Schubert static inline struct bufferevent_async * 1042b15cb3dSCy Schubert upcast(struct bufferevent *bev) 1052b15cb3dSCy Schubert { 1062b15cb3dSCy Schubert struct bufferevent_async *bev_a; 1072b15cb3dSCy Schubert if (bev->be_ops != &bufferevent_ops_async) 1082b15cb3dSCy Schubert return NULL; 1092b15cb3dSCy Schubert bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); 1102b15cb3dSCy Schubert return bev_a; 1112b15cb3dSCy Schubert } 1122b15cb3dSCy Schubert 1132b15cb3dSCy Schubert static inline struct bufferevent_async * 1142b15cb3dSCy Schubert upcast_connect(struct event_overlapped *eo) 1152b15cb3dSCy Schubert { 1162b15cb3dSCy Schubert struct bufferevent_async *bev_a; 1172b15cb3dSCy Schubert bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped); 1182b15cb3dSCy Schubert EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 1192b15cb3dSCy Schubert return bev_a; 1202b15cb3dSCy Schubert } 1212b15cb3dSCy Schubert 1222b15cb3dSCy Schubert static inline struct bufferevent_async * 1232b15cb3dSCy Schubert upcast_read(struct event_overlapped *eo) 1242b15cb3dSCy Schubert { 1252b15cb3dSCy Schubert struct bufferevent_async *bev_a; 1262b15cb3dSCy Schubert bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped); 1272b15cb3dSCy Schubert EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 1282b15cb3dSCy Schubert return bev_a; 1292b15cb3dSCy Schubert } 1302b15cb3dSCy Schubert 1312b15cb3dSCy Schubert static inline struct bufferevent_async * 1322b15cb3dSCy Schubert upcast_write(struct event_overlapped *eo) 1332b15cb3dSCy Schubert { 1342b15cb3dSCy Schubert struct bufferevent_async *bev_a; 1352b15cb3dSCy Schubert bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped); 1362b15cb3dSCy Schubert EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 1372b15cb3dSCy Schubert return bev_a; 1382b15cb3dSCy Schubert } 1392b15cb3dSCy Schubert 1402b15cb3dSCy Schubert static void 1412b15cb3dSCy Schubert bev_async_del_write(struct bufferevent_async *beva) 1422b15cb3dSCy Schubert { 1432b15cb3dSCy Schubert struct bufferevent *bev = &beva->bev.bev; 1442b15cb3dSCy Schubert 1452b15cb3dSCy Schubert if (beva->write_added) { 1462b15cb3dSCy Schubert beva->write_added = 0; 1472b15cb3dSCy Schubert event_base_del_virtual_(bev->ev_base); 1482b15cb3dSCy Schubert } 1492b15cb3dSCy Schubert } 1502b15cb3dSCy Schubert 1512b15cb3dSCy Schubert static void 1522b15cb3dSCy Schubert bev_async_del_read(struct bufferevent_async *beva) 1532b15cb3dSCy Schubert { 1542b15cb3dSCy Schubert struct bufferevent *bev = &beva->bev.bev; 1552b15cb3dSCy Schubert 1562b15cb3dSCy Schubert if (beva->read_added) { 1572b15cb3dSCy Schubert beva->read_added = 0; 1582b15cb3dSCy Schubert event_base_del_virtual_(bev->ev_base); 1592b15cb3dSCy Schubert } 1602b15cb3dSCy Schubert } 1612b15cb3dSCy Schubert 1622b15cb3dSCy Schubert static void 1632b15cb3dSCy Schubert bev_async_add_write(struct bufferevent_async *beva) 1642b15cb3dSCy Schubert { 1652b15cb3dSCy Schubert struct bufferevent *bev = &beva->bev.bev; 1662b15cb3dSCy Schubert 1672b15cb3dSCy Schubert if (!beva->write_added) { 1682b15cb3dSCy Schubert beva->write_added = 1; 1692b15cb3dSCy Schubert event_base_add_virtual_(bev->ev_base); 1702b15cb3dSCy Schubert } 1712b15cb3dSCy Schubert } 1722b15cb3dSCy Schubert 1732b15cb3dSCy Schubert static void 1742b15cb3dSCy Schubert bev_async_add_read(struct bufferevent_async *beva) 1752b15cb3dSCy Schubert { 1762b15cb3dSCy Schubert struct bufferevent *bev = &beva->bev.bev; 1772b15cb3dSCy Schubert 1782b15cb3dSCy Schubert if (!beva->read_added) { 1792b15cb3dSCy Schubert beva->read_added = 1; 1802b15cb3dSCy Schubert event_base_add_virtual_(bev->ev_base); 1812b15cb3dSCy Schubert } 1822b15cb3dSCy Schubert } 1832b15cb3dSCy Schubert 1842b15cb3dSCy Schubert static void 1852b15cb3dSCy Schubert bev_async_consider_writing(struct bufferevent_async *beva) 1862b15cb3dSCy Schubert { 1872b15cb3dSCy Schubert size_t at_most; 1882b15cb3dSCy Schubert int limit; 1892b15cb3dSCy Schubert struct bufferevent *bev = &beva->bev.bev; 1902b15cb3dSCy Schubert 1912b15cb3dSCy Schubert /* Don't write if there's a write in progress, or we do not 1922b15cb3dSCy Schubert * want to write, or when there's nothing left to write. */ 1932b15cb3dSCy Schubert if (beva->write_in_progress || beva->bev.connecting) 1942b15cb3dSCy Schubert return; 1952b15cb3dSCy Schubert if (!beva->ok || !(bev->enabled&EV_WRITE) || 1962b15cb3dSCy Schubert !evbuffer_get_length(bev->output)) { 1972b15cb3dSCy Schubert bev_async_del_write(beva); 1982b15cb3dSCy Schubert return; 1992b15cb3dSCy Schubert } 2002b15cb3dSCy Schubert 2012b15cb3dSCy Schubert at_most = evbuffer_get_length(bev->output); 2022b15cb3dSCy Schubert 2032b15cb3dSCy Schubert /* This is safe so long as bufferevent_get_write_max never returns 2042b15cb3dSCy Schubert * more than INT_MAX. That's true for now. XXXX */ 2052b15cb3dSCy Schubert limit = (int)bufferevent_get_write_max_(&beva->bev); 2062b15cb3dSCy Schubert if (at_most >= (size_t)limit && limit >= 0) 2072b15cb3dSCy Schubert at_most = limit; 2082b15cb3dSCy Schubert 2092b15cb3dSCy Schubert if (beva->bev.write_suspended) { 2102b15cb3dSCy Schubert bev_async_del_write(beva); 2112b15cb3dSCy Schubert return; 2122b15cb3dSCy Schubert } 2132b15cb3dSCy Schubert 2142b15cb3dSCy Schubert /* XXXX doesn't respect low-water mark very well. */ 2152b15cb3dSCy Schubert bufferevent_incref_(bev); 2162b15cb3dSCy Schubert if (evbuffer_launch_write_(bev->output, at_most, 2172b15cb3dSCy Schubert &beva->write_overlapped)) { 2182b15cb3dSCy Schubert bufferevent_decref_(bev); 2192b15cb3dSCy Schubert beva->ok = 0; 2202b15cb3dSCy Schubert bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); 2212b15cb3dSCy Schubert } else { 2222b15cb3dSCy Schubert beva->write_in_progress = at_most; 2232b15cb3dSCy Schubert bufferevent_decrement_write_buckets_(&beva->bev, at_most); 2242b15cb3dSCy Schubert bev_async_add_write(beva); 2252b15cb3dSCy Schubert } 2262b15cb3dSCy Schubert } 2272b15cb3dSCy Schubert 2282b15cb3dSCy Schubert static void 2292b15cb3dSCy Schubert bev_async_consider_reading(struct bufferevent_async *beva) 2302b15cb3dSCy Schubert { 2312b15cb3dSCy Schubert size_t cur_size; 2322b15cb3dSCy Schubert size_t read_high; 2332b15cb3dSCy Schubert size_t at_most; 2342b15cb3dSCy Schubert int limit; 2352b15cb3dSCy Schubert struct bufferevent *bev = &beva->bev.bev; 2362b15cb3dSCy Schubert 2372b15cb3dSCy Schubert /* Don't read if there is a read in progress, or we do not 2382b15cb3dSCy Schubert * want to read. */ 2392b15cb3dSCy Schubert if (beva->read_in_progress || beva->bev.connecting) 2402b15cb3dSCy Schubert return; 2412b15cb3dSCy Schubert if (!beva->ok || !(bev->enabled&EV_READ)) { 2422b15cb3dSCy Schubert bev_async_del_read(beva); 2432b15cb3dSCy Schubert return; 2442b15cb3dSCy Schubert } 2452b15cb3dSCy Schubert 2462b15cb3dSCy Schubert /* Don't read if we're full */ 2472b15cb3dSCy Schubert cur_size = evbuffer_get_length(bev->input); 2482b15cb3dSCy Schubert read_high = bev->wm_read.high; 2492b15cb3dSCy Schubert if (read_high) { 2502b15cb3dSCy Schubert if (cur_size >= read_high) { 2512b15cb3dSCy Schubert bev_async_del_read(beva); 2522b15cb3dSCy Schubert return; 2532b15cb3dSCy Schubert } 2542b15cb3dSCy Schubert at_most = read_high - cur_size; 2552b15cb3dSCy Schubert } else { 2562b15cb3dSCy Schubert at_most = 16384; /* FIXME totally magic. */ 2572b15cb3dSCy Schubert } 2582b15cb3dSCy Schubert 2592b15cb3dSCy Schubert /* XXXX This over-commits. */ 2602b15cb3dSCy Schubert /* XXXX see also not above on cast on bufferevent_get_write_max_() */ 2612b15cb3dSCy Schubert limit = (int)bufferevent_get_read_max_(&beva->bev); 2622b15cb3dSCy Schubert if (at_most >= (size_t)limit && limit >= 0) 2632b15cb3dSCy Schubert at_most = limit; 2642b15cb3dSCy Schubert 2652b15cb3dSCy Schubert if (beva->bev.read_suspended) { 2662b15cb3dSCy Schubert bev_async_del_read(beva); 2672b15cb3dSCy Schubert return; 2682b15cb3dSCy Schubert } 2692b15cb3dSCy Schubert 2702b15cb3dSCy Schubert bufferevent_incref_(bev); 2712b15cb3dSCy Schubert if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) { 2722b15cb3dSCy Schubert beva->ok = 0; 2732b15cb3dSCy Schubert bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); 2742b15cb3dSCy Schubert bufferevent_decref_(bev); 2752b15cb3dSCy Schubert } else { 2762b15cb3dSCy Schubert beva->read_in_progress = at_most; 2772b15cb3dSCy Schubert bufferevent_decrement_read_buckets_(&beva->bev, at_most); 2782b15cb3dSCy Schubert bev_async_add_read(beva); 2792b15cb3dSCy Schubert } 2802b15cb3dSCy Schubert 2812b15cb3dSCy Schubert return; 2822b15cb3dSCy Schubert } 2832b15cb3dSCy Schubert 2842b15cb3dSCy Schubert static void 2852b15cb3dSCy Schubert be_async_outbuf_callback(struct evbuffer *buf, 2862b15cb3dSCy Schubert const struct evbuffer_cb_info *cbinfo, 2872b15cb3dSCy Schubert void *arg) 2882b15cb3dSCy Schubert { 2892b15cb3dSCy Schubert struct bufferevent *bev = arg; 2902b15cb3dSCy Schubert struct bufferevent_async *bev_async = upcast(bev); 2912b15cb3dSCy Schubert 2922b15cb3dSCy Schubert /* If we added data to the outbuf and were not writing before, 2932b15cb3dSCy Schubert * we may want to write now. */ 2942b15cb3dSCy Schubert 2952b15cb3dSCy Schubert bufferevent_incref_and_lock_(bev); 2962b15cb3dSCy Schubert 2972b15cb3dSCy Schubert if (cbinfo->n_added) 2982b15cb3dSCy Schubert bev_async_consider_writing(bev_async); 2992b15cb3dSCy Schubert 3002b15cb3dSCy Schubert bufferevent_decref_and_unlock_(bev); 3012b15cb3dSCy Schubert } 3022b15cb3dSCy Schubert 3032b15cb3dSCy Schubert static void 3042b15cb3dSCy Schubert be_async_inbuf_callback(struct evbuffer *buf, 3052b15cb3dSCy Schubert const struct evbuffer_cb_info *cbinfo, 3062b15cb3dSCy Schubert void *arg) 3072b15cb3dSCy Schubert { 3082b15cb3dSCy Schubert struct bufferevent *bev = arg; 3092b15cb3dSCy Schubert struct bufferevent_async *bev_async = upcast(bev); 3102b15cb3dSCy Schubert 3112b15cb3dSCy Schubert /* If we drained data from the inbuf and were not reading before, 3122b15cb3dSCy Schubert * we may want to read now */ 3132b15cb3dSCy Schubert 3142b15cb3dSCy Schubert bufferevent_incref_and_lock_(bev); 3152b15cb3dSCy Schubert 3162b15cb3dSCy Schubert if (cbinfo->n_deleted) 3172b15cb3dSCy Schubert bev_async_consider_reading(bev_async); 3182b15cb3dSCy Schubert 3192b15cb3dSCy Schubert bufferevent_decref_and_unlock_(bev); 3202b15cb3dSCy Schubert } 3212b15cb3dSCy Schubert 3222b15cb3dSCy Schubert static int 3232b15cb3dSCy Schubert be_async_enable(struct bufferevent *buf, short what) 3242b15cb3dSCy Schubert { 3252b15cb3dSCy Schubert struct bufferevent_async *bev_async = upcast(buf); 3262b15cb3dSCy Schubert 3272b15cb3dSCy Schubert if (!bev_async->ok) 3282b15cb3dSCy Schubert return -1; 3292b15cb3dSCy Schubert 3302b15cb3dSCy Schubert if (bev_async->bev.connecting) { 3312b15cb3dSCy Schubert /* Don't launch anything during connection attempts. */ 3322b15cb3dSCy Schubert return 0; 3332b15cb3dSCy Schubert } 3342b15cb3dSCy Schubert 3352b15cb3dSCy Schubert if (what & EV_READ) 3362b15cb3dSCy Schubert BEV_RESET_GENERIC_READ_TIMEOUT(buf); 3372b15cb3dSCy Schubert if (what & EV_WRITE) 3382b15cb3dSCy Schubert BEV_RESET_GENERIC_WRITE_TIMEOUT(buf); 3392b15cb3dSCy Schubert 3402b15cb3dSCy Schubert /* If we newly enable reading or writing, and we aren't reading or 3412b15cb3dSCy Schubert writing already, consider launching a new read or write. */ 3422b15cb3dSCy Schubert 3432b15cb3dSCy Schubert if (what & EV_READ) 3442b15cb3dSCy Schubert bev_async_consider_reading(bev_async); 3452b15cb3dSCy Schubert if (what & EV_WRITE) 3462b15cb3dSCy Schubert bev_async_consider_writing(bev_async); 3472b15cb3dSCy Schubert return 0; 3482b15cb3dSCy Schubert } 3492b15cb3dSCy Schubert 3502b15cb3dSCy Schubert static int 3512b15cb3dSCy Schubert be_async_disable(struct bufferevent *bev, short what) 3522b15cb3dSCy Schubert { 3532b15cb3dSCy Schubert struct bufferevent_async *bev_async = upcast(bev); 3542b15cb3dSCy Schubert /* XXXX If we disable reading or writing, we may want to consider 3552b15cb3dSCy Schubert * canceling any in-progress read or write operation, though it might 3562b15cb3dSCy Schubert * not work. */ 3572b15cb3dSCy Schubert 3582b15cb3dSCy Schubert if (what & EV_READ) { 3592b15cb3dSCy Schubert BEV_DEL_GENERIC_READ_TIMEOUT(bev); 3602b15cb3dSCy Schubert bev_async_del_read(bev_async); 3612b15cb3dSCy Schubert } 3622b15cb3dSCy Schubert if (what & EV_WRITE) { 3632b15cb3dSCy Schubert BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 3642b15cb3dSCy Schubert bev_async_del_write(bev_async); 3652b15cb3dSCy Schubert } 3662b15cb3dSCy Schubert 3672b15cb3dSCy Schubert return 0; 3682b15cb3dSCy Schubert } 3692b15cb3dSCy Schubert 3702b15cb3dSCy Schubert static void 3712b15cb3dSCy Schubert be_async_destruct(struct bufferevent *bev) 3722b15cb3dSCy Schubert { 3732b15cb3dSCy Schubert struct bufferevent_async *bev_async = upcast(bev); 3742b15cb3dSCy Schubert struct bufferevent_private *bev_p = BEV_UPCAST(bev); 3752b15cb3dSCy Schubert evutil_socket_t fd; 3762b15cb3dSCy Schubert 3772b15cb3dSCy Schubert EVUTIL_ASSERT(!upcast(bev)->write_in_progress && 3782b15cb3dSCy Schubert !upcast(bev)->read_in_progress); 3792b15cb3dSCy Schubert 3802b15cb3dSCy Schubert bev_async_del_read(bev_async); 3812b15cb3dSCy Schubert bev_async_del_write(bev_async); 3822b15cb3dSCy Schubert 3832b15cb3dSCy Schubert fd = evbuffer_overlapped_get_fd_(bev->input); 384*a25439b6SCy Schubert if (fd != (evutil_socket_t)INVALID_SOCKET && 385*a25439b6SCy Schubert (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) { 3862b15cb3dSCy Schubert evutil_closesocket(fd); 387*a25439b6SCy Schubert evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET); 3882b15cb3dSCy Schubert } 3892b15cb3dSCy Schubert } 3902b15cb3dSCy Schubert 3912b15cb3dSCy Schubert /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so 3922b15cb3dSCy Schubert * we use WSAGetOverlappedResult to translate. */ 3932b15cb3dSCy Schubert static void 3942b15cb3dSCy Schubert bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo) 3952b15cb3dSCy Schubert { 3962b15cb3dSCy Schubert DWORD bytes, flags; 3972b15cb3dSCy Schubert evutil_socket_t fd; 3982b15cb3dSCy Schubert 3992b15cb3dSCy Schubert fd = evbuffer_overlapped_get_fd_(bev->input); 4002b15cb3dSCy Schubert WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags); 4012b15cb3dSCy Schubert } 4022b15cb3dSCy Schubert 4032b15cb3dSCy Schubert static int 4042b15cb3dSCy Schubert be_async_flush(struct bufferevent *bev, short what, 4052b15cb3dSCy Schubert enum bufferevent_flush_mode mode) 4062b15cb3dSCy Schubert { 4072b15cb3dSCy Schubert return 0; 4082b15cb3dSCy Schubert } 4092b15cb3dSCy Schubert 4102b15cb3dSCy Schubert static void 4112b15cb3dSCy Schubert connect_complete(struct event_overlapped *eo, ev_uintptr_t key, 4122b15cb3dSCy Schubert ev_ssize_t nbytes, int ok) 4132b15cb3dSCy Schubert { 4142b15cb3dSCy Schubert struct bufferevent_async *bev_a = upcast_connect(eo); 4152b15cb3dSCy Schubert struct bufferevent *bev = &bev_a->bev.bev; 4162b15cb3dSCy Schubert evutil_socket_t sock; 4172b15cb3dSCy Schubert 4182b15cb3dSCy Schubert BEV_LOCK(bev); 4192b15cb3dSCy Schubert 4202b15cb3dSCy Schubert EVUTIL_ASSERT(bev_a->bev.connecting); 4212b15cb3dSCy Schubert bev_a->bev.connecting = 0; 4222b15cb3dSCy Schubert sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input); 4232b15cb3dSCy Schubert /* XXXX Handle error? */ 4242b15cb3dSCy Schubert setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); 4252b15cb3dSCy Schubert 4262b15cb3dSCy Schubert if (ok) 4272b15cb3dSCy Schubert bufferevent_async_set_connected_(bev); 4282b15cb3dSCy Schubert else 4292b15cb3dSCy Schubert bev_async_set_wsa_error(bev, eo); 4302b15cb3dSCy Schubert 4312b15cb3dSCy Schubert bufferevent_run_eventcb_(bev, 4322b15cb3dSCy Schubert ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0); 4332b15cb3dSCy Schubert 4342b15cb3dSCy Schubert event_base_del_virtual_(bev->ev_base); 4352b15cb3dSCy Schubert 4362b15cb3dSCy Schubert bufferevent_decref_and_unlock_(bev); 4372b15cb3dSCy Schubert } 4382b15cb3dSCy Schubert 4392b15cb3dSCy Schubert static void 4402b15cb3dSCy Schubert read_complete(struct event_overlapped *eo, ev_uintptr_t key, 4412b15cb3dSCy Schubert ev_ssize_t nbytes, int ok) 4422b15cb3dSCy Schubert { 4432b15cb3dSCy Schubert struct bufferevent_async *bev_a = upcast_read(eo); 4442b15cb3dSCy Schubert struct bufferevent *bev = &bev_a->bev.bev; 4452b15cb3dSCy Schubert short what = BEV_EVENT_READING; 4462b15cb3dSCy Schubert ev_ssize_t amount_unread; 4472b15cb3dSCy Schubert BEV_LOCK(bev); 4482b15cb3dSCy Schubert EVUTIL_ASSERT(bev_a->read_in_progress); 4492b15cb3dSCy Schubert 4502b15cb3dSCy Schubert amount_unread = bev_a->read_in_progress - nbytes; 4512b15cb3dSCy Schubert evbuffer_commit_read_(bev->input, nbytes); 4522b15cb3dSCy Schubert bev_a->read_in_progress = 0; 4532b15cb3dSCy Schubert if (amount_unread) 4542b15cb3dSCy Schubert bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread); 4552b15cb3dSCy Schubert 4562b15cb3dSCy Schubert if (!ok) 4572b15cb3dSCy Schubert bev_async_set_wsa_error(bev, eo); 4582b15cb3dSCy Schubert 4592b15cb3dSCy Schubert if (bev_a->ok) { 4602b15cb3dSCy Schubert if (ok && nbytes) { 4612b15cb3dSCy Schubert BEV_RESET_GENERIC_READ_TIMEOUT(bev); 4622b15cb3dSCy Schubert bufferevent_trigger_nolock_(bev, EV_READ, 0); 4632b15cb3dSCy Schubert bev_async_consider_reading(bev_a); 4642b15cb3dSCy Schubert } else if (!ok) { 4652b15cb3dSCy Schubert what |= BEV_EVENT_ERROR; 4662b15cb3dSCy Schubert bev_a->ok = 0; 4672b15cb3dSCy Schubert bufferevent_run_eventcb_(bev, what, 0); 4682b15cb3dSCy Schubert } else if (!nbytes) { 4692b15cb3dSCy Schubert what |= BEV_EVENT_EOF; 4702b15cb3dSCy Schubert bev_a->ok = 0; 4712b15cb3dSCy Schubert bufferevent_run_eventcb_(bev, what, 0); 4722b15cb3dSCy Schubert } 4732b15cb3dSCy Schubert } 4742b15cb3dSCy Schubert 4752b15cb3dSCy Schubert bufferevent_decref_and_unlock_(bev); 4762b15cb3dSCy Schubert } 4772b15cb3dSCy Schubert 4782b15cb3dSCy Schubert static void 4792b15cb3dSCy Schubert write_complete(struct event_overlapped *eo, ev_uintptr_t key, 4802b15cb3dSCy Schubert ev_ssize_t nbytes, int ok) 4812b15cb3dSCy Schubert { 4822b15cb3dSCy Schubert struct bufferevent_async *bev_a = upcast_write(eo); 4832b15cb3dSCy Schubert struct bufferevent *bev = &bev_a->bev.bev; 4842b15cb3dSCy Schubert short what = BEV_EVENT_WRITING; 4852b15cb3dSCy Schubert ev_ssize_t amount_unwritten; 4862b15cb3dSCy Schubert 4872b15cb3dSCy Schubert BEV_LOCK(bev); 4882b15cb3dSCy Schubert EVUTIL_ASSERT(bev_a->write_in_progress); 4892b15cb3dSCy Schubert 4902b15cb3dSCy Schubert amount_unwritten = bev_a->write_in_progress - nbytes; 4912b15cb3dSCy Schubert evbuffer_commit_write_(bev->output, nbytes); 4922b15cb3dSCy Schubert bev_a->write_in_progress = 0; 4932b15cb3dSCy Schubert 4942b15cb3dSCy Schubert if (amount_unwritten) 4952b15cb3dSCy Schubert bufferevent_decrement_write_buckets_(&bev_a->bev, 4962b15cb3dSCy Schubert -amount_unwritten); 4972b15cb3dSCy Schubert 4982b15cb3dSCy Schubert 4992b15cb3dSCy Schubert if (!ok) 5002b15cb3dSCy Schubert bev_async_set_wsa_error(bev, eo); 5012b15cb3dSCy Schubert 5022b15cb3dSCy Schubert if (bev_a->ok) { 5032b15cb3dSCy Schubert if (ok && nbytes) { 5042b15cb3dSCy Schubert BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); 5052b15cb3dSCy Schubert bufferevent_trigger_nolock_(bev, EV_WRITE, 0); 5062b15cb3dSCy Schubert bev_async_consider_writing(bev_a); 5072b15cb3dSCy Schubert } else if (!ok) { 5082b15cb3dSCy Schubert what |= BEV_EVENT_ERROR; 5092b15cb3dSCy Schubert bev_a->ok = 0; 5102b15cb3dSCy Schubert bufferevent_run_eventcb_(bev, what, 0); 5112b15cb3dSCy Schubert } else if (!nbytes) { 5122b15cb3dSCy Schubert what |= BEV_EVENT_EOF; 5132b15cb3dSCy Schubert bev_a->ok = 0; 5142b15cb3dSCy Schubert bufferevent_run_eventcb_(bev, what, 0); 5152b15cb3dSCy Schubert } 5162b15cb3dSCy Schubert } 5172b15cb3dSCy Schubert 5182b15cb3dSCy Schubert bufferevent_decref_and_unlock_(bev); 5192b15cb3dSCy Schubert } 5202b15cb3dSCy Schubert 5212b15cb3dSCy Schubert struct bufferevent * 5222b15cb3dSCy Schubert bufferevent_async_new_(struct event_base *base, 5232b15cb3dSCy Schubert evutil_socket_t fd, int options) 5242b15cb3dSCy Schubert { 5252b15cb3dSCy Schubert struct bufferevent_async *bev_a; 5262b15cb3dSCy Schubert struct bufferevent *bev; 5272b15cb3dSCy Schubert struct event_iocp_port *iocp; 5282b15cb3dSCy Schubert 5292b15cb3dSCy Schubert options |= BEV_OPT_THREADSAFE; 5302b15cb3dSCy Schubert 5312b15cb3dSCy Schubert if (!(iocp = event_base_get_iocp_(base))) 5322b15cb3dSCy Schubert return NULL; 5332b15cb3dSCy Schubert 5342b15cb3dSCy Schubert if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) { 5352b15cb3dSCy Schubert int err = GetLastError(); 5362b15cb3dSCy Schubert /* We may have alrady associated this fd with a port. 5372b15cb3dSCy Schubert * Let's hope it's this port, and that the error code 5382b15cb3dSCy Schubert * for doing this neer changes. */ 5392b15cb3dSCy Schubert if (err != ERROR_INVALID_PARAMETER) 5402b15cb3dSCy Schubert return NULL; 5412b15cb3dSCy Schubert } 5422b15cb3dSCy Schubert 5432b15cb3dSCy Schubert if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async)))) 5442b15cb3dSCy Schubert return NULL; 5452b15cb3dSCy Schubert 5462b15cb3dSCy Schubert bev = &bev_a->bev.bev; 5472b15cb3dSCy Schubert if (!(bev->input = evbuffer_overlapped_new_(fd))) { 5482b15cb3dSCy Schubert mm_free(bev_a); 5492b15cb3dSCy Schubert return NULL; 5502b15cb3dSCy Schubert } 5512b15cb3dSCy Schubert if (!(bev->output = evbuffer_overlapped_new_(fd))) { 5522b15cb3dSCy Schubert evbuffer_free(bev->input); 5532b15cb3dSCy Schubert mm_free(bev_a); 5542b15cb3dSCy Schubert return NULL; 5552b15cb3dSCy Schubert } 5562b15cb3dSCy Schubert 5572b15cb3dSCy Schubert if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async, 5582b15cb3dSCy Schubert options)<0) 5592b15cb3dSCy Schubert goto err; 5602b15cb3dSCy Schubert 5612b15cb3dSCy Schubert evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); 5622b15cb3dSCy Schubert evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev); 5632b15cb3dSCy Schubert 5642b15cb3dSCy Schubert event_overlapped_init_(&bev_a->connect_overlapped, connect_complete); 5652b15cb3dSCy Schubert event_overlapped_init_(&bev_a->read_overlapped, read_complete); 5662b15cb3dSCy Schubert event_overlapped_init_(&bev_a->write_overlapped, write_complete); 5672b15cb3dSCy Schubert 5682b15cb3dSCy Schubert bufferevent_init_generic_timeout_cbs_(bev); 5692b15cb3dSCy Schubert 570*a25439b6SCy Schubert bev_a->ok = fd >= 0; 571*a25439b6SCy Schubert 5722b15cb3dSCy Schubert return bev; 5732b15cb3dSCy Schubert err: 5742b15cb3dSCy Schubert bufferevent_free(&bev_a->bev.bev); 5752b15cb3dSCy Schubert return NULL; 5762b15cb3dSCy Schubert } 5772b15cb3dSCy Schubert 5782b15cb3dSCy Schubert void 5792b15cb3dSCy Schubert bufferevent_async_set_connected_(struct bufferevent *bev) 5802b15cb3dSCy Schubert { 5812b15cb3dSCy Schubert struct bufferevent_async *bev_async = upcast(bev); 5822b15cb3dSCy Schubert bev_async->ok = 1; 5832b15cb3dSCy Schubert bufferevent_init_generic_timeout_cbs_(bev); 5842b15cb3dSCy Schubert /* Now's a good time to consider reading/writing */ 5852b15cb3dSCy Schubert be_async_enable(bev, bev->enabled); 5862b15cb3dSCy Schubert } 5872b15cb3dSCy Schubert 5882b15cb3dSCy Schubert int 5892b15cb3dSCy Schubert bufferevent_async_can_connect_(struct bufferevent *bev) 5902b15cb3dSCy Schubert { 5912b15cb3dSCy Schubert const struct win32_extension_fns *ext = 5922b15cb3dSCy Schubert event_get_win32_extension_fns_(); 5932b15cb3dSCy Schubert 5942b15cb3dSCy Schubert if (BEV_IS_ASYNC(bev) && 5952b15cb3dSCy Schubert event_base_get_iocp_(bev->ev_base) && 5962b15cb3dSCy Schubert ext && ext->ConnectEx) 5972b15cb3dSCy Schubert return 1; 5982b15cb3dSCy Schubert 5992b15cb3dSCy Schubert return 0; 6002b15cb3dSCy Schubert } 6012b15cb3dSCy Schubert 6022b15cb3dSCy Schubert int 6032b15cb3dSCy Schubert bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd, 6042b15cb3dSCy Schubert const struct sockaddr *sa, int socklen) 6052b15cb3dSCy Schubert { 6062b15cb3dSCy Schubert BOOL rc; 6072b15cb3dSCy Schubert struct bufferevent_async *bev_async = upcast(bev); 6082b15cb3dSCy Schubert struct sockaddr_storage ss; 6092b15cb3dSCy Schubert const struct win32_extension_fns *ext = 6102b15cb3dSCy Schubert event_get_win32_extension_fns_(); 6112b15cb3dSCy Schubert 6122b15cb3dSCy Schubert EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL); 6132b15cb3dSCy Schubert 6142b15cb3dSCy Schubert /* ConnectEx() requires that the socket be bound to an address 6152b15cb3dSCy Schubert * with bind() before using, otherwise it will fail. We attempt 6162b15cb3dSCy Schubert * to issue a bind() here, taking into account that the error 6172b15cb3dSCy Schubert * code is set to WSAEINVAL when the socket is already bound. */ 6182b15cb3dSCy Schubert memset(&ss, 0, sizeof(ss)); 6192b15cb3dSCy Schubert if (sa->sa_family == AF_INET) { 6202b15cb3dSCy Schubert struct sockaddr_in *sin = (struct sockaddr_in *)&ss; 6212b15cb3dSCy Schubert sin->sin_family = AF_INET; 6222b15cb3dSCy Schubert sin->sin_addr.s_addr = INADDR_ANY; 6232b15cb3dSCy Schubert } else if (sa->sa_family == AF_INET6) { 6242b15cb3dSCy Schubert struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss; 6252b15cb3dSCy Schubert sin6->sin6_family = AF_INET6; 6262b15cb3dSCy Schubert sin6->sin6_addr = in6addr_any; 6272b15cb3dSCy Schubert } else { 6282b15cb3dSCy Schubert /* Well, the user will have to bind() */ 6292b15cb3dSCy Schubert return -1; 6302b15cb3dSCy Schubert } 6312b15cb3dSCy Schubert if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 && 6322b15cb3dSCy Schubert WSAGetLastError() != WSAEINVAL) 6332b15cb3dSCy Schubert return -1; 6342b15cb3dSCy Schubert 6352b15cb3dSCy Schubert event_base_add_virtual_(bev->ev_base); 6362b15cb3dSCy Schubert bufferevent_incref_(bev); 6372b15cb3dSCy Schubert rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL, 6382b15cb3dSCy Schubert &bev_async->connect_overlapped.overlapped); 6392b15cb3dSCy Schubert if (rc || WSAGetLastError() == ERROR_IO_PENDING) 6402b15cb3dSCy Schubert return 0; 6412b15cb3dSCy Schubert 6422b15cb3dSCy Schubert event_base_del_virtual_(bev->ev_base); 6432b15cb3dSCy Schubert bufferevent_decref_(bev); 6442b15cb3dSCy Schubert 6452b15cb3dSCy Schubert return -1; 6462b15cb3dSCy Schubert } 6472b15cb3dSCy Schubert 6482b15cb3dSCy Schubert static int 6492b15cb3dSCy Schubert be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 6502b15cb3dSCy Schubert union bufferevent_ctrl_data *data) 6512b15cb3dSCy Schubert { 6522b15cb3dSCy Schubert switch (op) { 6532b15cb3dSCy Schubert case BEV_CTRL_GET_FD: 6542b15cb3dSCy Schubert data->fd = evbuffer_overlapped_get_fd_(bev->input); 6552b15cb3dSCy Schubert return 0; 6562b15cb3dSCy Schubert case BEV_CTRL_SET_FD: { 6572b15cb3dSCy Schubert struct event_iocp_port *iocp; 6582b15cb3dSCy Schubert 6592b15cb3dSCy Schubert if (data->fd == evbuffer_overlapped_get_fd_(bev->input)) 6602b15cb3dSCy Schubert return 0; 6612b15cb3dSCy Schubert if (!(iocp = event_base_get_iocp_(bev->ev_base))) 6622b15cb3dSCy Schubert return -1; 6632b15cb3dSCy Schubert if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) 6642b15cb3dSCy Schubert return -1; 6652b15cb3dSCy Schubert evbuffer_overlapped_set_fd_(bev->input, data->fd); 6662b15cb3dSCy Schubert evbuffer_overlapped_set_fd_(bev->output, data->fd); 6672b15cb3dSCy Schubert return 0; 6682b15cb3dSCy Schubert } 6692b15cb3dSCy Schubert case BEV_CTRL_CANCEL_ALL: { 6702b15cb3dSCy Schubert struct bufferevent_async *bev_a = upcast(bev); 6712b15cb3dSCy Schubert evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input); 6722b15cb3dSCy Schubert if (fd != (evutil_socket_t)INVALID_SOCKET && 6732b15cb3dSCy Schubert (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) { 6742b15cb3dSCy Schubert closesocket(fd); 675*a25439b6SCy Schubert evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET); 6762b15cb3dSCy Schubert } 6772b15cb3dSCy Schubert bev_a->ok = 0; 6782b15cb3dSCy Schubert return 0; 6792b15cb3dSCy Schubert } 6802b15cb3dSCy Schubert case BEV_CTRL_GET_UNDERLYING: 6812b15cb3dSCy Schubert default: 6822b15cb3dSCy Schubert return -1; 6832b15cb3dSCy Schubert } 6842b15cb3dSCy Schubert } 6852b15cb3dSCy Schubert 6862b15cb3dSCy Schubert 687