xref: /freebsd/contrib/ntp/sntp/libevent/bufferevent_async.c (revision a25439b68651d176ae05867f5090d45fd85e9f24)
12b15cb3dSCy Schubert /*
22b15cb3dSCy Schubert  * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
32b15cb3dSCy Schubert  *
42b15cb3dSCy Schubert  * All rights reserved.
52b15cb3dSCy Schubert  *
62b15cb3dSCy Schubert  * Redistribution and use in source and binary forms, with or without
72b15cb3dSCy Schubert  * modification, are permitted provided that the following conditions
82b15cb3dSCy Schubert  * are met:
92b15cb3dSCy Schubert  * 1. Redistributions of source code must retain the above copyright
102b15cb3dSCy Schubert  *    notice, this list of conditions and the following disclaimer.
112b15cb3dSCy Schubert  * 2. Redistributions in binary form must reproduce the above copyright
122b15cb3dSCy Schubert  *    notice, this list of conditions and the following disclaimer in the
132b15cb3dSCy Schubert  *    documentation and/or other materials provided with the distribution.
142b15cb3dSCy Schubert  * 3. The name of the author may not be used to endorse or promote products
152b15cb3dSCy Schubert  *    derived from this software without specific prior written permission.
162b15cb3dSCy Schubert  *
172b15cb3dSCy Schubert  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
182b15cb3dSCy Schubert  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
192b15cb3dSCy Schubert  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
202b15cb3dSCy Schubert  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
212b15cb3dSCy Schubert  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
222b15cb3dSCy Schubert  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
232b15cb3dSCy Schubert  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
242b15cb3dSCy Schubert  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
252b15cb3dSCy Schubert  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
262b15cb3dSCy Schubert  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
272b15cb3dSCy Schubert  */
282b15cb3dSCy Schubert 
292b15cb3dSCy Schubert #include "event2/event-config.h"
302b15cb3dSCy Schubert #include "evconfig-private.h"
312b15cb3dSCy Schubert 
322b15cb3dSCy Schubert #ifdef EVENT__HAVE_SYS_TIME_H
332b15cb3dSCy Schubert #include <sys/time.h>
342b15cb3dSCy Schubert #endif
352b15cb3dSCy Schubert 
362b15cb3dSCy Schubert #include <errno.h>
372b15cb3dSCy Schubert #include <stdio.h>
382b15cb3dSCy Schubert #include <stdlib.h>
392b15cb3dSCy Schubert #include <string.h>
402b15cb3dSCy Schubert #ifdef EVENT__HAVE_STDARG_H
412b15cb3dSCy Schubert #include <stdarg.h>
422b15cb3dSCy Schubert #endif
432b15cb3dSCy Schubert #ifdef EVENT__HAVE_UNISTD_H
442b15cb3dSCy Schubert #include <unistd.h>
452b15cb3dSCy Schubert #endif
462b15cb3dSCy Schubert 
472b15cb3dSCy Schubert #ifdef _WIN32
482b15cb3dSCy Schubert #include <winsock2.h>
492b15cb3dSCy Schubert #include <ws2tcpip.h>
502b15cb3dSCy Schubert #endif
512b15cb3dSCy Schubert 
522b15cb3dSCy Schubert #include <sys/queue.h>
532b15cb3dSCy Schubert 
542b15cb3dSCy Schubert #include "event2/util.h"
552b15cb3dSCy Schubert #include "event2/bufferevent.h"
562b15cb3dSCy Schubert #include "event2/buffer.h"
572b15cb3dSCy Schubert #include "event2/bufferevent_struct.h"
582b15cb3dSCy Schubert #include "event2/event.h"
592b15cb3dSCy Schubert #include "event2/util.h"
602b15cb3dSCy Schubert #include "event-internal.h"
612b15cb3dSCy Schubert #include "log-internal.h"
622b15cb3dSCy Schubert #include "mm-internal.h"
632b15cb3dSCy Schubert #include "bufferevent-internal.h"
642b15cb3dSCy Schubert #include "util-internal.h"
652b15cb3dSCy Schubert #include "iocp-internal.h"
662b15cb3dSCy Schubert 
#if !defined(SO_UPDATE_CONNECT_CONTEXT)
/* Some MinGW header sets do not provide this socket option, so fall back
 * to its numeric value. */
#define SO_UPDATE_CONNECT_CONTEXT 0x7010
#endif
712b15cb3dSCy Schubert 
/* Forward declarations of the methods stored in bufferevent_ops_async. */
static int be_async_enable(struct bufferevent *bev, short what);
static int be_async_disable(struct bufferevent *bev, short what);
static void be_async_destruct(struct bufferevent *bev);
static int be_async_flush(struct bufferevent *bev, short what,
    enum bufferevent_flush_mode mode);
static int be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    union bufferevent_ctrl_data *data);
782b15cb3dSCy Schubert 
792b15cb3dSCy Schubert struct bufferevent_async {
802b15cb3dSCy Schubert 	struct bufferevent_private bev;
812b15cb3dSCy Schubert 	struct event_overlapped connect_overlapped;
822b15cb3dSCy Schubert 	struct event_overlapped read_overlapped;
832b15cb3dSCy Schubert 	struct event_overlapped write_overlapped;
842b15cb3dSCy Schubert 	size_t read_in_progress;
852b15cb3dSCy Schubert 	size_t write_in_progress;
862b15cb3dSCy Schubert 	unsigned ok : 1;
872b15cb3dSCy Schubert 	unsigned read_added : 1;
882b15cb3dSCy Schubert 	unsigned write_added : 1;
892b15cb3dSCy Schubert };
902b15cb3dSCy Schubert 
912b15cb3dSCy Schubert const struct bufferevent_ops bufferevent_ops_async = {
922b15cb3dSCy Schubert 	"socket_async",
932b15cb3dSCy Schubert 	evutil_offsetof(struct bufferevent_async, bev.bev),
942b15cb3dSCy Schubert 	be_async_enable,
952b15cb3dSCy Schubert 	be_async_disable,
962b15cb3dSCy Schubert 	NULL, /* Unlink */
972b15cb3dSCy Schubert 	be_async_destruct,
982b15cb3dSCy Schubert 	bufferevent_generic_adj_timeouts_,
992b15cb3dSCy Schubert 	be_async_flush,
1002b15cb3dSCy Schubert 	be_async_ctrl,
1012b15cb3dSCy Schubert };
1022b15cb3dSCy Schubert 
1032b15cb3dSCy Schubert static inline struct bufferevent_async *
1042b15cb3dSCy Schubert upcast(struct bufferevent *bev)
1052b15cb3dSCy Schubert {
1062b15cb3dSCy Schubert 	struct bufferevent_async *bev_a;
1072b15cb3dSCy Schubert 	if (bev->be_ops != &bufferevent_ops_async)
1082b15cb3dSCy Schubert 		return NULL;
1092b15cb3dSCy Schubert 	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
1102b15cb3dSCy Schubert 	return bev_a;
1112b15cb3dSCy Schubert }
1122b15cb3dSCy Schubert 
1132b15cb3dSCy Schubert static inline struct bufferevent_async *
1142b15cb3dSCy Schubert upcast_connect(struct event_overlapped *eo)
1152b15cb3dSCy Schubert {
1162b15cb3dSCy Schubert 	struct bufferevent_async *bev_a;
1172b15cb3dSCy Schubert 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
1182b15cb3dSCy Schubert 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
1192b15cb3dSCy Schubert 	return bev_a;
1202b15cb3dSCy Schubert }
1212b15cb3dSCy Schubert 
1222b15cb3dSCy Schubert static inline struct bufferevent_async *
1232b15cb3dSCy Schubert upcast_read(struct event_overlapped *eo)
1242b15cb3dSCy Schubert {
1252b15cb3dSCy Schubert 	struct bufferevent_async *bev_a;
1262b15cb3dSCy Schubert 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
1272b15cb3dSCy Schubert 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
1282b15cb3dSCy Schubert 	return bev_a;
1292b15cb3dSCy Schubert }
1302b15cb3dSCy Schubert 
1312b15cb3dSCy Schubert static inline struct bufferevent_async *
1322b15cb3dSCy Schubert upcast_write(struct event_overlapped *eo)
1332b15cb3dSCy Schubert {
1342b15cb3dSCy Schubert 	struct bufferevent_async *bev_a;
1352b15cb3dSCy Schubert 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
1362b15cb3dSCy Schubert 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
1372b15cb3dSCy Schubert 	return bev_a;
1382b15cb3dSCy Schubert }
1392b15cb3dSCy Schubert 
1402b15cb3dSCy Schubert static void
1412b15cb3dSCy Schubert bev_async_del_write(struct bufferevent_async *beva)
1422b15cb3dSCy Schubert {
1432b15cb3dSCy Schubert 	struct bufferevent *bev = &beva->bev.bev;
1442b15cb3dSCy Schubert 
1452b15cb3dSCy Schubert 	if (beva->write_added) {
1462b15cb3dSCy Schubert 		beva->write_added = 0;
1472b15cb3dSCy Schubert 		event_base_del_virtual_(bev->ev_base);
1482b15cb3dSCy Schubert 	}
1492b15cb3dSCy Schubert }
1502b15cb3dSCy Schubert 
1512b15cb3dSCy Schubert static void
1522b15cb3dSCy Schubert bev_async_del_read(struct bufferevent_async *beva)
1532b15cb3dSCy Schubert {
1542b15cb3dSCy Schubert 	struct bufferevent *bev = &beva->bev.bev;
1552b15cb3dSCy Schubert 
1562b15cb3dSCy Schubert 	if (beva->read_added) {
1572b15cb3dSCy Schubert 		beva->read_added = 0;
1582b15cb3dSCy Schubert 		event_base_del_virtual_(bev->ev_base);
1592b15cb3dSCy Schubert 	}
1602b15cb3dSCy Schubert }
1612b15cb3dSCy Schubert 
1622b15cb3dSCy Schubert static void
1632b15cb3dSCy Schubert bev_async_add_write(struct bufferevent_async *beva)
1642b15cb3dSCy Schubert {
1652b15cb3dSCy Schubert 	struct bufferevent *bev = &beva->bev.bev;
1662b15cb3dSCy Schubert 
1672b15cb3dSCy Schubert 	if (!beva->write_added) {
1682b15cb3dSCy Schubert 		beva->write_added = 1;
1692b15cb3dSCy Schubert 		event_base_add_virtual_(bev->ev_base);
1702b15cb3dSCy Schubert 	}
1712b15cb3dSCy Schubert }
1722b15cb3dSCy Schubert 
1732b15cb3dSCy Schubert static void
1742b15cb3dSCy Schubert bev_async_add_read(struct bufferevent_async *beva)
1752b15cb3dSCy Schubert {
1762b15cb3dSCy Schubert 	struct bufferevent *bev = &beva->bev.bev;
1772b15cb3dSCy Schubert 
1782b15cb3dSCy Schubert 	if (!beva->read_added) {
1792b15cb3dSCy Schubert 		beva->read_added = 1;
1802b15cb3dSCy Schubert 		event_base_add_virtual_(bev->ev_base);
1812b15cb3dSCy Schubert 	}
1822b15cb3dSCy Schubert }
1832b15cb3dSCy Schubert 
1842b15cb3dSCy Schubert static void
1852b15cb3dSCy Schubert bev_async_consider_writing(struct bufferevent_async *beva)
1862b15cb3dSCy Schubert {
1872b15cb3dSCy Schubert 	size_t at_most;
1882b15cb3dSCy Schubert 	int limit;
1892b15cb3dSCy Schubert 	struct bufferevent *bev = &beva->bev.bev;
1902b15cb3dSCy Schubert 
1912b15cb3dSCy Schubert 	/* Don't write if there's a write in progress, or we do not
1922b15cb3dSCy Schubert 	 * want to write, or when there's nothing left to write. */
1932b15cb3dSCy Schubert 	if (beva->write_in_progress || beva->bev.connecting)
1942b15cb3dSCy Schubert 		return;
1952b15cb3dSCy Schubert 	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
1962b15cb3dSCy Schubert 	    !evbuffer_get_length(bev->output)) {
1972b15cb3dSCy Schubert 		bev_async_del_write(beva);
1982b15cb3dSCy Schubert 		return;
1992b15cb3dSCy Schubert 	}
2002b15cb3dSCy Schubert 
2012b15cb3dSCy Schubert 	at_most = evbuffer_get_length(bev->output);
2022b15cb3dSCy Schubert 
2032b15cb3dSCy Schubert 	/* This is safe so long as bufferevent_get_write_max never returns
2042b15cb3dSCy Schubert 	 * more than INT_MAX.  That's true for now. XXXX */
2052b15cb3dSCy Schubert 	limit = (int)bufferevent_get_write_max_(&beva->bev);
2062b15cb3dSCy Schubert 	if (at_most >= (size_t)limit && limit >= 0)
2072b15cb3dSCy Schubert 		at_most = limit;
2082b15cb3dSCy Schubert 
2092b15cb3dSCy Schubert 	if (beva->bev.write_suspended) {
2102b15cb3dSCy Schubert 		bev_async_del_write(beva);
2112b15cb3dSCy Schubert 		return;
2122b15cb3dSCy Schubert 	}
2132b15cb3dSCy Schubert 
2142b15cb3dSCy Schubert 	/*  XXXX doesn't respect low-water mark very well. */
2152b15cb3dSCy Schubert 	bufferevent_incref_(bev);
2162b15cb3dSCy Schubert 	if (evbuffer_launch_write_(bev->output, at_most,
2172b15cb3dSCy Schubert 	    &beva->write_overlapped)) {
2182b15cb3dSCy Schubert 		bufferevent_decref_(bev);
2192b15cb3dSCy Schubert 		beva->ok = 0;
2202b15cb3dSCy Schubert 		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
2212b15cb3dSCy Schubert 	} else {
2222b15cb3dSCy Schubert 		beva->write_in_progress = at_most;
2232b15cb3dSCy Schubert 		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
2242b15cb3dSCy Schubert 		bev_async_add_write(beva);
2252b15cb3dSCy Schubert 	}
2262b15cb3dSCy Schubert }
2272b15cb3dSCy Schubert 
2282b15cb3dSCy Schubert static void
2292b15cb3dSCy Schubert bev_async_consider_reading(struct bufferevent_async *beva)
2302b15cb3dSCy Schubert {
2312b15cb3dSCy Schubert 	size_t cur_size;
2322b15cb3dSCy Schubert 	size_t read_high;
2332b15cb3dSCy Schubert 	size_t at_most;
2342b15cb3dSCy Schubert 	int limit;
2352b15cb3dSCy Schubert 	struct bufferevent *bev = &beva->bev.bev;
2362b15cb3dSCy Schubert 
2372b15cb3dSCy Schubert 	/* Don't read if there is a read in progress, or we do not
2382b15cb3dSCy Schubert 	 * want to read. */
2392b15cb3dSCy Schubert 	if (beva->read_in_progress || beva->bev.connecting)
2402b15cb3dSCy Schubert 		return;
2412b15cb3dSCy Schubert 	if (!beva->ok || !(bev->enabled&EV_READ)) {
2422b15cb3dSCy Schubert 		bev_async_del_read(beva);
2432b15cb3dSCy Schubert 		return;
2442b15cb3dSCy Schubert 	}
2452b15cb3dSCy Schubert 
2462b15cb3dSCy Schubert 	/* Don't read if we're full */
2472b15cb3dSCy Schubert 	cur_size = evbuffer_get_length(bev->input);
2482b15cb3dSCy Schubert 	read_high = bev->wm_read.high;
2492b15cb3dSCy Schubert 	if (read_high) {
2502b15cb3dSCy Schubert 		if (cur_size >= read_high) {
2512b15cb3dSCy Schubert 			bev_async_del_read(beva);
2522b15cb3dSCy Schubert 			return;
2532b15cb3dSCy Schubert 		}
2542b15cb3dSCy Schubert 		at_most = read_high - cur_size;
2552b15cb3dSCy Schubert 	} else {
2562b15cb3dSCy Schubert 		at_most = 16384; /* FIXME totally magic. */
2572b15cb3dSCy Schubert 	}
2582b15cb3dSCy Schubert 
2592b15cb3dSCy Schubert 	/* XXXX This over-commits. */
2602b15cb3dSCy Schubert 	/* XXXX see also not above on cast on bufferevent_get_write_max_() */
2612b15cb3dSCy Schubert 	limit = (int)bufferevent_get_read_max_(&beva->bev);
2622b15cb3dSCy Schubert 	if (at_most >= (size_t)limit && limit >= 0)
2632b15cb3dSCy Schubert 		at_most = limit;
2642b15cb3dSCy Schubert 
2652b15cb3dSCy Schubert 	if (beva->bev.read_suspended) {
2662b15cb3dSCy Schubert 		bev_async_del_read(beva);
2672b15cb3dSCy Schubert 		return;
2682b15cb3dSCy Schubert 	}
2692b15cb3dSCy Schubert 
2702b15cb3dSCy Schubert 	bufferevent_incref_(bev);
2712b15cb3dSCy Schubert 	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
2722b15cb3dSCy Schubert 		beva->ok = 0;
2732b15cb3dSCy Schubert 		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
2742b15cb3dSCy Schubert 		bufferevent_decref_(bev);
2752b15cb3dSCy Schubert 	} else {
2762b15cb3dSCy Schubert 		beva->read_in_progress = at_most;
2772b15cb3dSCy Schubert 		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
2782b15cb3dSCy Schubert 		bev_async_add_read(beva);
2792b15cb3dSCy Schubert 	}
2802b15cb3dSCy Schubert 
2812b15cb3dSCy Schubert 	return;
2822b15cb3dSCy Schubert }
2832b15cb3dSCy Schubert 
2842b15cb3dSCy Schubert static void
2852b15cb3dSCy Schubert be_async_outbuf_callback(struct evbuffer *buf,
2862b15cb3dSCy Schubert     const struct evbuffer_cb_info *cbinfo,
2872b15cb3dSCy Schubert     void *arg)
2882b15cb3dSCy Schubert {
2892b15cb3dSCy Schubert 	struct bufferevent *bev = arg;
2902b15cb3dSCy Schubert 	struct bufferevent_async *bev_async = upcast(bev);
2912b15cb3dSCy Schubert 
2922b15cb3dSCy Schubert 	/* If we added data to the outbuf and were not writing before,
2932b15cb3dSCy Schubert 	 * we may want to write now. */
2942b15cb3dSCy Schubert 
2952b15cb3dSCy Schubert 	bufferevent_incref_and_lock_(bev);
2962b15cb3dSCy Schubert 
2972b15cb3dSCy Schubert 	if (cbinfo->n_added)
2982b15cb3dSCy Schubert 		bev_async_consider_writing(bev_async);
2992b15cb3dSCy Schubert 
3002b15cb3dSCy Schubert 	bufferevent_decref_and_unlock_(bev);
3012b15cb3dSCy Schubert }
3022b15cb3dSCy Schubert 
3032b15cb3dSCy Schubert static void
3042b15cb3dSCy Schubert be_async_inbuf_callback(struct evbuffer *buf,
3052b15cb3dSCy Schubert     const struct evbuffer_cb_info *cbinfo,
3062b15cb3dSCy Schubert     void *arg)
3072b15cb3dSCy Schubert {
3082b15cb3dSCy Schubert 	struct bufferevent *bev = arg;
3092b15cb3dSCy Schubert 	struct bufferevent_async *bev_async = upcast(bev);
3102b15cb3dSCy Schubert 
3112b15cb3dSCy Schubert 	/* If we drained data from the inbuf and were not reading before,
3122b15cb3dSCy Schubert 	 * we may want to read now */
3132b15cb3dSCy Schubert 
3142b15cb3dSCy Schubert 	bufferevent_incref_and_lock_(bev);
3152b15cb3dSCy Schubert 
3162b15cb3dSCy Schubert 	if (cbinfo->n_deleted)
3172b15cb3dSCy Schubert 		bev_async_consider_reading(bev_async);
3182b15cb3dSCy Schubert 
3192b15cb3dSCy Schubert 	bufferevent_decref_and_unlock_(bev);
3202b15cb3dSCy Schubert }
3212b15cb3dSCy Schubert 
3222b15cb3dSCy Schubert static int
3232b15cb3dSCy Schubert be_async_enable(struct bufferevent *buf, short what)
3242b15cb3dSCy Schubert {
3252b15cb3dSCy Schubert 	struct bufferevent_async *bev_async = upcast(buf);
3262b15cb3dSCy Schubert 
3272b15cb3dSCy Schubert 	if (!bev_async->ok)
3282b15cb3dSCy Schubert 		return -1;
3292b15cb3dSCy Schubert 
3302b15cb3dSCy Schubert 	if (bev_async->bev.connecting) {
3312b15cb3dSCy Schubert 		/* Don't launch anything during connection attempts. */
3322b15cb3dSCy Schubert 		return 0;
3332b15cb3dSCy Schubert 	}
3342b15cb3dSCy Schubert 
3352b15cb3dSCy Schubert 	if (what & EV_READ)
3362b15cb3dSCy Schubert 		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
3372b15cb3dSCy Schubert 	if (what & EV_WRITE)
3382b15cb3dSCy Schubert 		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
3392b15cb3dSCy Schubert 
3402b15cb3dSCy Schubert 	/* If we newly enable reading or writing, and we aren't reading or
3412b15cb3dSCy Schubert 	   writing already, consider launching a new read or write. */
3422b15cb3dSCy Schubert 
3432b15cb3dSCy Schubert 	if (what & EV_READ)
3442b15cb3dSCy Schubert 		bev_async_consider_reading(bev_async);
3452b15cb3dSCy Schubert 	if (what & EV_WRITE)
3462b15cb3dSCy Schubert 		bev_async_consider_writing(bev_async);
3472b15cb3dSCy Schubert 	return 0;
3482b15cb3dSCy Schubert }
3492b15cb3dSCy Schubert 
3502b15cb3dSCy Schubert static int
3512b15cb3dSCy Schubert be_async_disable(struct bufferevent *bev, short what)
3522b15cb3dSCy Schubert {
3532b15cb3dSCy Schubert 	struct bufferevent_async *bev_async = upcast(bev);
3542b15cb3dSCy Schubert 	/* XXXX If we disable reading or writing, we may want to consider
3552b15cb3dSCy Schubert 	 * canceling any in-progress read or write operation, though it might
3562b15cb3dSCy Schubert 	 * not work. */
3572b15cb3dSCy Schubert 
3582b15cb3dSCy Schubert 	if (what & EV_READ) {
3592b15cb3dSCy Schubert 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
3602b15cb3dSCy Schubert 		bev_async_del_read(bev_async);
3612b15cb3dSCy Schubert 	}
3622b15cb3dSCy Schubert 	if (what & EV_WRITE) {
3632b15cb3dSCy Schubert 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
3642b15cb3dSCy Schubert 		bev_async_del_write(bev_async);
3652b15cb3dSCy Schubert 	}
3662b15cb3dSCy Schubert 
3672b15cb3dSCy Schubert 	return 0;
3682b15cb3dSCy Schubert }
3692b15cb3dSCy Schubert 
3702b15cb3dSCy Schubert static void
3712b15cb3dSCy Schubert be_async_destruct(struct bufferevent *bev)
3722b15cb3dSCy Schubert {
3732b15cb3dSCy Schubert 	struct bufferevent_async *bev_async = upcast(bev);
3742b15cb3dSCy Schubert 	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
3752b15cb3dSCy Schubert 	evutil_socket_t fd;
3762b15cb3dSCy Schubert 
3772b15cb3dSCy Schubert 	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
3782b15cb3dSCy Schubert 			!upcast(bev)->read_in_progress);
3792b15cb3dSCy Schubert 
3802b15cb3dSCy Schubert 	bev_async_del_read(bev_async);
3812b15cb3dSCy Schubert 	bev_async_del_write(bev_async);
3822b15cb3dSCy Schubert 
3832b15cb3dSCy Schubert 	fd = evbuffer_overlapped_get_fd_(bev->input);
384*a25439b6SCy Schubert 	if (fd != (evutil_socket_t)INVALID_SOCKET &&
385*a25439b6SCy Schubert 		(bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
3862b15cb3dSCy Schubert 		evutil_closesocket(fd);
387*a25439b6SCy Schubert 		evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
3882b15cb3dSCy Schubert 	}
3892b15cb3dSCy Schubert }
3902b15cb3dSCy Schubert 
3912b15cb3dSCy Schubert /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
3922b15cb3dSCy Schubert  * we use WSAGetOverlappedResult to translate. */
3932b15cb3dSCy Schubert static void
3942b15cb3dSCy Schubert bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
3952b15cb3dSCy Schubert {
3962b15cb3dSCy Schubert 	DWORD bytes, flags;
3972b15cb3dSCy Schubert 	evutil_socket_t fd;
3982b15cb3dSCy Schubert 
3992b15cb3dSCy Schubert 	fd = evbuffer_overlapped_get_fd_(bev->input);
4002b15cb3dSCy Schubert 	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
4012b15cb3dSCy Schubert }
4022b15cb3dSCy Schubert 
4032b15cb3dSCy Schubert static int
4042b15cb3dSCy Schubert be_async_flush(struct bufferevent *bev, short what,
4052b15cb3dSCy Schubert     enum bufferevent_flush_mode mode)
4062b15cb3dSCy Schubert {
4072b15cb3dSCy Schubert 	return 0;
4082b15cb3dSCy Schubert }
4092b15cb3dSCy Schubert 
4102b15cb3dSCy Schubert static void
4112b15cb3dSCy Schubert connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
4122b15cb3dSCy Schubert     ev_ssize_t nbytes, int ok)
4132b15cb3dSCy Schubert {
4142b15cb3dSCy Schubert 	struct bufferevent_async *bev_a = upcast_connect(eo);
4152b15cb3dSCy Schubert 	struct bufferevent *bev = &bev_a->bev.bev;
4162b15cb3dSCy Schubert 	evutil_socket_t sock;
4172b15cb3dSCy Schubert 
4182b15cb3dSCy Schubert 	BEV_LOCK(bev);
4192b15cb3dSCy Schubert 
4202b15cb3dSCy Schubert 	EVUTIL_ASSERT(bev_a->bev.connecting);
4212b15cb3dSCy Schubert 	bev_a->bev.connecting = 0;
4222b15cb3dSCy Schubert 	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
4232b15cb3dSCy Schubert 	/* XXXX Handle error? */
4242b15cb3dSCy Schubert 	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
4252b15cb3dSCy Schubert 
4262b15cb3dSCy Schubert 	if (ok)
4272b15cb3dSCy Schubert 		bufferevent_async_set_connected_(bev);
4282b15cb3dSCy Schubert 	else
4292b15cb3dSCy Schubert 		bev_async_set_wsa_error(bev, eo);
4302b15cb3dSCy Schubert 
4312b15cb3dSCy Schubert 	bufferevent_run_eventcb_(bev,
4322b15cb3dSCy Schubert 			ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
4332b15cb3dSCy Schubert 
4342b15cb3dSCy Schubert 	event_base_del_virtual_(bev->ev_base);
4352b15cb3dSCy Schubert 
4362b15cb3dSCy Schubert 	bufferevent_decref_and_unlock_(bev);
4372b15cb3dSCy Schubert }
4382b15cb3dSCy Schubert 
4392b15cb3dSCy Schubert static void
4402b15cb3dSCy Schubert read_complete(struct event_overlapped *eo, ev_uintptr_t key,
4412b15cb3dSCy Schubert     ev_ssize_t nbytes, int ok)
4422b15cb3dSCy Schubert {
4432b15cb3dSCy Schubert 	struct bufferevent_async *bev_a = upcast_read(eo);
4442b15cb3dSCy Schubert 	struct bufferevent *bev = &bev_a->bev.bev;
4452b15cb3dSCy Schubert 	short what = BEV_EVENT_READING;
4462b15cb3dSCy Schubert 	ev_ssize_t amount_unread;
4472b15cb3dSCy Schubert 	BEV_LOCK(bev);
4482b15cb3dSCy Schubert 	EVUTIL_ASSERT(bev_a->read_in_progress);
4492b15cb3dSCy Schubert 
4502b15cb3dSCy Schubert 	amount_unread = bev_a->read_in_progress - nbytes;
4512b15cb3dSCy Schubert 	evbuffer_commit_read_(bev->input, nbytes);
4522b15cb3dSCy Schubert 	bev_a->read_in_progress = 0;
4532b15cb3dSCy Schubert 	if (amount_unread)
4542b15cb3dSCy Schubert 		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
4552b15cb3dSCy Schubert 
4562b15cb3dSCy Schubert 	if (!ok)
4572b15cb3dSCy Schubert 		bev_async_set_wsa_error(bev, eo);
4582b15cb3dSCy Schubert 
4592b15cb3dSCy Schubert 	if (bev_a->ok) {
4602b15cb3dSCy Schubert 		if (ok && nbytes) {
4612b15cb3dSCy Schubert 			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
4622b15cb3dSCy Schubert 			bufferevent_trigger_nolock_(bev, EV_READ, 0);
4632b15cb3dSCy Schubert 			bev_async_consider_reading(bev_a);
4642b15cb3dSCy Schubert 		} else if (!ok) {
4652b15cb3dSCy Schubert 			what |= BEV_EVENT_ERROR;
4662b15cb3dSCy Schubert 			bev_a->ok = 0;
4672b15cb3dSCy Schubert 			bufferevent_run_eventcb_(bev, what, 0);
4682b15cb3dSCy Schubert 		} else if (!nbytes) {
4692b15cb3dSCy Schubert 			what |= BEV_EVENT_EOF;
4702b15cb3dSCy Schubert 			bev_a->ok = 0;
4712b15cb3dSCy Schubert 			bufferevent_run_eventcb_(bev, what, 0);
4722b15cb3dSCy Schubert 		}
4732b15cb3dSCy Schubert 	}
4742b15cb3dSCy Schubert 
4752b15cb3dSCy Schubert 	bufferevent_decref_and_unlock_(bev);
4762b15cb3dSCy Schubert }
4772b15cb3dSCy Schubert 
4782b15cb3dSCy Schubert static void
4792b15cb3dSCy Schubert write_complete(struct event_overlapped *eo, ev_uintptr_t key,
4802b15cb3dSCy Schubert     ev_ssize_t nbytes, int ok)
4812b15cb3dSCy Schubert {
4822b15cb3dSCy Schubert 	struct bufferevent_async *bev_a = upcast_write(eo);
4832b15cb3dSCy Schubert 	struct bufferevent *bev = &bev_a->bev.bev;
4842b15cb3dSCy Schubert 	short what = BEV_EVENT_WRITING;
4852b15cb3dSCy Schubert 	ev_ssize_t amount_unwritten;
4862b15cb3dSCy Schubert 
4872b15cb3dSCy Schubert 	BEV_LOCK(bev);
4882b15cb3dSCy Schubert 	EVUTIL_ASSERT(bev_a->write_in_progress);
4892b15cb3dSCy Schubert 
4902b15cb3dSCy Schubert 	amount_unwritten = bev_a->write_in_progress - nbytes;
4912b15cb3dSCy Schubert 	evbuffer_commit_write_(bev->output, nbytes);
4922b15cb3dSCy Schubert 	bev_a->write_in_progress = 0;
4932b15cb3dSCy Schubert 
4942b15cb3dSCy Schubert 	if (amount_unwritten)
4952b15cb3dSCy Schubert 		bufferevent_decrement_write_buckets_(&bev_a->bev,
4962b15cb3dSCy Schubert 		                                     -amount_unwritten);
4972b15cb3dSCy Schubert 
4982b15cb3dSCy Schubert 
4992b15cb3dSCy Schubert 	if (!ok)
5002b15cb3dSCy Schubert 		bev_async_set_wsa_error(bev, eo);
5012b15cb3dSCy Schubert 
5022b15cb3dSCy Schubert 	if (bev_a->ok) {
5032b15cb3dSCy Schubert 		if (ok && nbytes) {
5042b15cb3dSCy Schubert 			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
5052b15cb3dSCy Schubert 			bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
5062b15cb3dSCy Schubert 			bev_async_consider_writing(bev_a);
5072b15cb3dSCy Schubert 		} else if (!ok) {
5082b15cb3dSCy Schubert 			what |= BEV_EVENT_ERROR;
5092b15cb3dSCy Schubert 			bev_a->ok = 0;
5102b15cb3dSCy Schubert 			bufferevent_run_eventcb_(bev, what, 0);
5112b15cb3dSCy Schubert 		} else if (!nbytes) {
5122b15cb3dSCy Schubert 			what |= BEV_EVENT_EOF;
5132b15cb3dSCy Schubert 			bev_a->ok = 0;
5142b15cb3dSCy Schubert 			bufferevent_run_eventcb_(bev, what, 0);
5152b15cb3dSCy Schubert 		}
5162b15cb3dSCy Schubert 	}
5172b15cb3dSCy Schubert 
5182b15cb3dSCy Schubert 	bufferevent_decref_and_unlock_(bev);
5192b15cb3dSCy Schubert }
5202b15cb3dSCy Schubert 
5212b15cb3dSCy Schubert struct bufferevent *
5222b15cb3dSCy Schubert bufferevent_async_new_(struct event_base *base,
5232b15cb3dSCy Schubert     evutil_socket_t fd, int options)
5242b15cb3dSCy Schubert {
5252b15cb3dSCy Schubert 	struct bufferevent_async *bev_a;
5262b15cb3dSCy Schubert 	struct bufferevent *bev;
5272b15cb3dSCy Schubert 	struct event_iocp_port *iocp;
5282b15cb3dSCy Schubert 
5292b15cb3dSCy Schubert 	options |= BEV_OPT_THREADSAFE;
5302b15cb3dSCy Schubert 
5312b15cb3dSCy Schubert 	if (!(iocp = event_base_get_iocp_(base)))
5322b15cb3dSCy Schubert 		return NULL;
5332b15cb3dSCy Schubert 
5342b15cb3dSCy Schubert 	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
5352b15cb3dSCy Schubert 		int err = GetLastError();
5362b15cb3dSCy Schubert 		/* We may have alrady associated this fd with a port.
5372b15cb3dSCy Schubert 		 * Let's hope it's this port, and that the error code
5382b15cb3dSCy Schubert 		 * for doing this neer changes. */
5392b15cb3dSCy Schubert 		if (err != ERROR_INVALID_PARAMETER)
5402b15cb3dSCy Schubert 			return NULL;
5412b15cb3dSCy Schubert 	}
5422b15cb3dSCy Schubert 
5432b15cb3dSCy Schubert 	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
5442b15cb3dSCy Schubert 		return NULL;
5452b15cb3dSCy Schubert 
5462b15cb3dSCy Schubert 	bev = &bev_a->bev.bev;
5472b15cb3dSCy Schubert 	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
5482b15cb3dSCy Schubert 		mm_free(bev_a);
5492b15cb3dSCy Schubert 		return NULL;
5502b15cb3dSCy Schubert 	}
5512b15cb3dSCy Schubert 	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
5522b15cb3dSCy Schubert 		evbuffer_free(bev->input);
5532b15cb3dSCy Schubert 		mm_free(bev_a);
5542b15cb3dSCy Schubert 		return NULL;
5552b15cb3dSCy Schubert 	}
5562b15cb3dSCy Schubert 
5572b15cb3dSCy Schubert 	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
5582b15cb3dSCy Schubert 		options)<0)
5592b15cb3dSCy Schubert 		goto err;
5602b15cb3dSCy Schubert 
5612b15cb3dSCy Schubert 	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
5622b15cb3dSCy Schubert 	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
5632b15cb3dSCy Schubert 
5642b15cb3dSCy Schubert 	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
5652b15cb3dSCy Schubert 	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
5662b15cb3dSCy Schubert 	event_overlapped_init_(&bev_a->write_overlapped, write_complete);
5672b15cb3dSCy Schubert 
5682b15cb3dSCy Schubert 	bufferevent_init_generic_timeout_cbs_(bev);
5692b15cb3dSCy Schubert 
570*a25439b6SCy Schubert 	bev_a->ok = fd >= 0;
571*a25439b6SCy Schubert 
5722b15cb3dSCy Schubert 	return bev;
5732b15cb3dSCy Schubert err:
5742b15cb3dSCy Schubert 	bufferevent_free(&bev_a->bev.bev);
5752b15cb3dSCy Schubert 	return NULL;
5762b15cb3dSCy Schubert }
5772b15cb3dSCy Schubert 
5782b15cb3dSCy Schubert void
5792b15cb3dSCy Schubert bufferevent_async_set_connected_(struct bufferevent *bev)
5802b15cb3dSCy Schubert {
5812b15cb3dSCy Schubert 	struct bufferevent_async *bev_async = upcast(bev);
5822b15cb3dSCy Schubert 	bev_async->ok = 1;
5832b15cb3dSCy Schubert 	bufferevent_init_generic_timeout_cbs_(bev);
5842b15cb3dSCy Schubert 	/* Now's a good time to consider reading/writing */
5852b15cb3dSCy Schubert 	be_async_enable(bev, bev->enabled);
5862b15cb3dSCy Schubert }
5872b15cb3dSCy Schubert 
5882b15cb3dSCy Schubert int
5892b15cb3dSCy Schubert bufferevent_async_can_connect_(struct bufferevent *bev)
5902b15cb3dSCy Schubert {
5912b15cb3dSCy Schubert 	const struct win32_extension_fns *ext =
5922b15cb3dSCy Schubert 	    event_get_win32_extension_fns_();
5932b15cb3dSCy Schubert 
5942b15cb3dSCy Schubert 	if (BEV_IS_ASYNC(bev) &&
5952b15cb3dSCy Schubert 	    event_base_get_iocp_(bev->ev_base) &&
5962b15cb3dSCy Schubert 	    ext && ext->ConnectEx)
5972b15cb3dSCy Schubert 		return 1;
5982b15cb3dSCy Schubert 
5992b15cb3dSCy Schubert 	return 0;
6002b15cb3dSCy Schubert }
6012b15cb3dSCy Schubert 
6022b15cb3dSCy Schubert int
6032b15cb3dSCy Schubert bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
6042b15cb3dSCy Schubert 	const struct sockaddr *sa, int socklen)
6052b15cb3dSCy Schubert {
6062b15cb3dSCy Schubert 	BOOL rc;
6072b15cb3dSCy Schubert 	struct bufferevent_async *bev_async = upcast(bev);
6082b15cb3dSCy Schubert 	struct sockaddr_storage ss;
6092b15cb3dSCy Schubert 	const struct win32_extension_fns *ext =
6102b15cb3dSCy Schubert 	    event_get_win32_extension_fns_();
6112b15cb3dSCy Schubert 
6122b15cb3dSCy Schubert 	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
6132b15cb3dSCy Schubert 
6142b15cb3dSCy Schubert 	/* ConnectEx() requires that the socket be bound to an address
6152b15cb3dSCy Schubert 	 * with bind() before using, otherwise it will fail. We attempt
6162b15cb3dSCy Schubert 	 * to issue a bind() here, taking into account that the error
6172b15cb3dSCy Schubert 	 * code is set to WSAEINVAL when the socket is already bound. */
6182b15cb3dSCy Schubert 	memset(&ss, 0, sizeof(ss));
6192b15cb3dSCy Schubert 	if (sa->sa_family == AF_INET) {
6202b15cb3dSCy Schubert 		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
6212b15cb3dSCy Schubert 		sin->sin_family = AF_INET;
6222b15cb3dSCy Schubert 		sin->sin_addr.s_addr = INADDR_ANY;
6232b15cb3dSCy Schubert 	} else if (sa->sa_family == AF_INET6) {
6242b15cb3dSCy Schubert 		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
6252b15cb3dSCy Schubert 		sin6->sin6_family = AF_INET6;
6262b15cb3dSCy Schubert 		sin6->sin6_addr = in6addr_any;
6272b15cb3dSCy Schubert 	} else {
6282b15cb3dSCy Schubert 		/* Well, the user will have to bind() */
6292b15cb3dSCy Schubert 		return -1;
6302b15cb3dSCy Schubert 	}
6312b15cb3dSCy Schubert 	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
6322b15cb3dSCy Schubert 	    WSAGetLastError() != WSAEINVAL)
6332b15cb3dSCy Schubert 		return -1;
6342b15cb3dSCy Schubert 
6352b15cb3dSCy Schubert 	event_base_add_virtual_(bev->ev_base);
6362b15cb3dSCy Schubert 	bufferevent_incref_(bev);
6372b15cb3dSCy Schubert 	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
6382b15cb3dSCy Schubert 			    &bev_async->connect_overlapped.overlapped);
6392b15cb3dSCy Schubert 	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
6402b15cb3dSCy Schubert 		return 0;
6412b15cb3dSCy Schubert 
6422b15cb3dSCy Schubert 	event_base_del_virtual_(bev->ev_base);
6432b15cb3dSCy Schubert 	bufferevent_decref_(bev);
6442b15cb3dSCy Schubert 
6452b15cb3dSCy Schubert 	return -1;
6462b15cb3dSCy Schubert }
6472b15cb3dSCy Schubert 
6482b15cb3dSCy Schubert static int
6492b15cb3dSCy Schubert be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
6502b15cb3dSCy Schubert     union bufferevent_ctrl_data *data)
6512b15cb3dSCy Schubert {
6522b15cb3dSCy Schubert 	switch (op) {
6532b15cb3dSCy Schubert 	case BEV_CTRL_GET_FD:
6542b15cb3dSCy Schubert 		data->fd = evbuffer_overlapped_get_fd_(bev->input);
6552b15cb3dSCy Schubert 		return 0;
6562b15cb3dSCy Schubert 	case BEV_CTRL_SET_FD: {
6572b15cb3dSCy Schubert 		struct event_iocp_port *iocp;
6582b15cb3dSCy Schubert 
6592b15cb3dSCy Schubert 		if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
6602b15cb3dSCy Schubert 			return 0;
6612b15cb3dSCy Schubert 		if (!(iocp = event_base_get_iocp_(bev->ev_base)))
6622b15cb3dSCy Schubert 			return -1;
6632b15cb3dSCy Schubert 		if (event_iocp_port_associate_(iocp, data->fd, 1) < 0)
6642b15cb3dSCy Schubert 			return -1;
6652b15cb3dSCy Schubert 		evbuffer_overlapped_set_fd_(bev->input, data->fd);
6662b15cb3dSCy Schubert 		evbuffer_overlapped_set_fd_(bev->output, data->fd);
6672b15cb3dSCy Schubert 		return 0;
6682b15cb3dSCy Schubert 	}
6692b15cb3dSCy Schubert 	case BEV_CTRL_CANCEL_ALL: {
6702b15cb3dSCy Schubert 		struct bufferevent_async *bev_a = upcast(bev);
6712b15cb3dSCy Schubert 		evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
6722b15cb3dSCy Schubert 		if (fd != (evutil_socket_t)INVALID_SOCKET &&
6732b15cb3dSCy Schubert 		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
6742b15cb3dSCy Schubert 			closesocket(fd);
675*a25439b6SCy Schubert 			evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
6762b15cb3dSCy Schubert 		}
6772b15cb3dSCy Schubert 		bev_a->ok = 0;
6782b15cb3dSCy Schubert 		return 0;
6792b15cb3dSCy Schubert 	}
6802b15cb3dSCy Schubert 	case BEV_CTRL_GET_UNDERLYING:
6812b15cb3dSCy Schubert 	default:
6822b15cb3dSCy Schubert 		return -1;
6832b15cb3dSCy Schubert 	}
6842b15cb3dSCy Schubert }
6852b15cb3dSCy Schubert 
6862b15cb3dSCy Schubert 
687