xref: /freebsd/contrib/ntp/sntp/libevent/bufferevent_async.c (revision a466cc55373fc3cf86837f09da729535b57e69a1)
12b15cb3dSCy Schubert /*
22b15cb3dSCy Schubert  * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
32b15cb3dSCy Schubert  *
42b15cb3dSCy Schubert  * All rights reserved.
52b15cb3dSCy Schubert  *
62b15cb3dSCy Schubert  * Redistribution and use in source and binary forms, with or without
72b15cb3dSCy Schubert  * modification, are permitted provided that the following conditions
82b15cb3dSCy Schubert  * are met:
92b15cb3dSCy Schubert  * 1. Redistributions of source code must retain the above copyright
102b15cb3dSCy Schubert  *    notice, this list of conditions and the following disclaimer.
112b15cb3dSCy Schubert  * 2. Redistributions in binary form must reproduce the above copyright
122b15cb3dSCy Schubert  *    notice, this list of conditions and the following disclaimer in the
132b15cb3dSCy Schubert  *    documentation and/or other materials provided with the distribution.
142b15cb3dSCy Schubert  * 3. The name of the author may not be used to endorse or promote products
152b15cb3dSCy Schubert  *    derived from this software without specific prior written permission.
162b15cb3dSCy Schubert  *
172b15cb3dSCy Schubert  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
182b15cb3dSCy Schubert  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
192b15cb3dSCy Schubert  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
202b15cb3dSCy Schubert  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
212b15cb3dSCy Schubert  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
222b15cb3dSCy Schubert  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
232b15cb3dSCy Schubert  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
242b15cb3dSCy Schubert  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
252b15cb3dSCy Schubert  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
262b15cb3dSCy Schubert  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
272b15cb3dSCy Schubert  */
282b15cb3dSCy Schubert 
292b15cb3dSCy Schubert #include "event2/event-config.h"
302b15cb3dSCy Schubert #include "evconfig-private.h"
312b15cb3dSCy Schubert 
322b15cb3dSCy Schubert #ifdef EVENT__HAVE_SYS_TIME_H
332b15cb3dSCy Schubert #include <sys/time.h>
342b15cb3dSCy Schubert #endif
352b15cb3dSCy Schubert 
362b15cb3dSCy Schubert #include <errno.h>
372b15cb3dSCy Schubert #include <stdio.h>
382b15cb3dSCy Schubert #include <stdlib.h>
392b15cb3dSCy Schubert #include <string.h>
402b15cb3dSCy Schubert #ifdef EVENT__HAVE_STDARG_H
412b15cb3dSCy Schubert #include <stdarg.h>
422b15cb3dSCy Schubert #endif
432b15cb3dSCy Schubert #ifdef EVENT__HAVE_UNISTD_H
442b15cb3dSCy Schubert #include <unistd.h>
452b15cb3dSCy Schubert #endif
462b15cb3dSCy Schubert 
472b15cb3dSCy Schubert #ifdef _WIN32
482b15cb3dSCy Schubert #include <winsock2.h>
49*a466cc55SCy Schubert #include <winerror.h>
502b15cb3dSCy Schubert #include <ws2tcpip.h>
512b15cb3dSCy Schubert #endif
522b15cb3dSCy Schubert 
532b15cb3dSCy Schubert #include <sys/queue.h>
542b15cb3dSCy Schubert 
552b15cb3dSCy Schubert #include "event2/util.h"
562b15cb3dSCy Schubert #include "event2/bufferevent.h"
572b15cb3dSCy Schubert #include "event2/buffer.h"
582b15cb3dSCy Schubert #include "event2/bufferevent_struct.h"
592b15cb3dSCy Schubert #include "event2/event.h"
602b15cb3dSCy Schubert #include "event2/util.h"
612b15cb3dSCy Schubert #include "event-internal.h"
622b15cb3dSCy Schubert #include "log-internal.h"
632b15cb3dSCy Schubert #include "mm-internal.h"
642b15cb3dSCy Schubert #include "bufferevent-internal.h"
652b15cb3dSCy Schubert #include "util-internal.h"
662b15cb3dSCy Schubert #include "iocp-internal.h"
672b15cb3dSCy Schubert 
/* Some MinGW header sets do not provide SO_UPDATE_CONNECT_CONTEXT; define
 * it ourselves when it is absent. */
#if !defined(SO_UPDATE_CONNECT_CONTEXT)
#define SO_UPDATE_CONNECT_CONTEXT 0x7010
#endif
722b15cb3dSCy Schubert 
/* Forward declarations of the callbacks stored in bufferevent_ops_async. */
static int be_async_enable(struct bufferevent *bev, short what);
static int be_async_disable(struct bufferevent *bev, short what);
static void be_async_destruct(struct bufferevent *bev);
static int be_async_flush(struct bufferevent *bev, short what,
    enum bufferevent_flush_mode mode);
static int be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    union bufferevent_ctrl_data *data);
792b15cb3dSCy Schubert 
802b15cb3dSCy Schubert struct bufferevent_async {
812b15cb3dSCy Schubert 	struct bufferevent_private bev;
822b15cb3dSCy Schubert 	struct event_overlapped connect_overlapped;
832b15cb3dSCy Schubert 	struct event_overlapped read_overlapped;
842b15cb3dSCy Schubert 	struct event_overlapped write_overlapped;
852b15cb3dSCy Schubert 	size_t read_in_progress;
862b15cb3dSCy Schubert 	size_t write_in_progress;
872b15cb3dSCy Schubert 	unsigned ok : 1;
882b15cb3dSCy Schubert 	unsigned read_added : 1;
892b15cb3dSCy Schubert 	unsigned write_added : 1;
902b15cb3dSCy Schubert };
912b15cb3dSCy Schubert 
922b15cb3dSCy Schubert const struct bufferevent_ops bufferevent_ops_async = {
932b15cb3dSCy Schubert 	"socket_async",
942b15cb3dSCy Schubert 	evutil_offsetof(struct bufferevent_async, bev.bev),
952b15cb3dSCy Schubert 	be_async_enable,
962b15cb3dSCy Schubert 	be_async_disable,
972b15cb3dSCy Schubert 	NULL, /* Unlink */
982b15cb3dSCy Schubert 	be_async_destruct,
992b15cb3dSCy Schubert 	bufferevent_generic_adj_timeouts_,
1002b15cb3dSCy Schubert 	be_async_flush,
1012b15cb3dSCy Schubert 	be_async_ctrl,
1022b15cb3dSCy Schubert };
1032b15cb3dSCy Schubert 
104*a466cc55SCy Schubert static inline void
be_async_run_eventcb(struct bufferevent * bev,short what,int options)105*a466cc55SCy Schubert be_async_run_eventcb(struct bufferevent *bev, short what, int options)
106*a466cc55SCy Schubert { bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
107*a466cc55SCy Schubert 
108*a466cc55SCy Schubert static inline void
be_async_trigger_nolock(struct bufferevent * bev,short what,int options)109*a466cc55SCy Schubert be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
110*a466cc55SCy Schubert { bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
111*a466cc55SCy Schubert 
112*a466cc55SCy Schubert static inline int
fatal_error(int err)113*a466cc55SCy Schubert fatal_error(int err)
114*a466cc55SCy Schubert {
115*a466cc55SCy Schubert 	switch (err) {
116*a466cc55SCy Schubert 		/* We may have already associated this fd with a port.
117*a466cc55SCy Schubert 		 * Let's hope it's this port, and that the error code
118*a466cc55SCy Schubert 		 * for doing this neer changes. */
119*a466cc55SCy Schubert 		case ERROR_INVALID_PARAMETER:
120*a466cc55SCy Schubert 			return 0;
121*a466cc55SCy Schubert 	}
122*a466cc55SCy Schubert 	return 1;
123*a466cc55SCy Schubert }
124*a466cc55SCy Schubert 
1252b15cb3dSCy Schubert static inline struct bufferevent_async *
upcast(struct bufferevent * bev)1262b15cb3dSCy Schubert upcast(struct bufferevent *bev)
1272b15cb3dSCy Schubert {
1282b15cb3dSCy Schubert 	struct bufferevent_async *bev_a;
129*a466cc55SCy Schubert 	if (!BEV_IS_ASYNC(bev))
1302b15cb3dSCy Schubert 		return NULL;
1312b15cb3dSCy Schubert 	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
1322b15cb3dSCy Schubert 	return bev_a;
1332b15cb3dSCy Schubert }
1342b15cb3dSCy Schubert 
1352b15cb3dSCy Schubert static inline struct bufferevent_async *
upcast_connect(struct event_overlapped * eo)1362b15cb3dSCy Schubert upcast_connect(struct event_overlapped *eo)
1372b15cb3dSCy Schubert {
1382b15cb3dSCy Schubert 	struct bufferevent_async *bev_a;
1392b15cb3dSCy Schubert 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
1402b15cb3dSCy Schubert 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
1412b15cb3dSCy Schubert 	return bev_a;
1422b15cb3dSCy Schubert }
1432b15cb3dSCy Schubert 
1442b15cb3dSCy Schubert static inline struct bufferevent_async *
upcast_read(struct event_overlapped * eo)1452b15cb3dSCy Schubert upcast_read(struct event_overlapped *eo)
1462b15cb3dSCy Schubert {
1472b15cb3dSCy Schubert 	struct bufferevent_async *bev_a;
1482b15cb3dSCy Schubert 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
1492b15cb3dSCy Schubert 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
1502b15cb3dSCy Schubert 	return bev_a;
1512b15cb3dSCy Schubert }
1522b15cb3dSCy Schubert 
1532b15cb3dSCy Schubert static inline struct bufferevent_async *
upcast_write(struct event_overlapped * eo)1542b15cb3dSCy Schubert upcast_write(struct event_overlapped *eo)
1552b15cb3dSCy Schubert {
1562b15cb3dSCy Schubert 	struct bufferevent_async *bev_a;
1572b15cb3dSCy Schubert 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
1582b15cb3dSCy Schubert 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
1592b15cb3dSCy Schubert 	return bev_a;
1602b15cb3dSCy Schubert }
1612b15cb3dSCy Schubert 
1622b15cb3dSCy Schubert static void
bev_async_del_write(struct bufferevent_async * beva)1632b15cb3dSCy Schubert bev_async_del_write(struct bufferevent_async *beva)
1642b15cb3dSCy Schubert {
1652b15cb3dSCy Schubert 	struct bufferevent *bev = &beva->bev.bev;
1662b15cb3dSCy Schubert 
1672b15cb3dSCy Schubert 	if (beva->write_added) {
1682b15cb3dSCy Schubert 		beva->write_added = 0;
1692b15cb3dSCy Schubert 		event_base_del_virtual_(bev->ev_base);
1702b15cb3dSCy Schubert 	}
1712b15cb3dSCy Schubert }
1722b15cb3dSCy Schubert 
1732b15cb3dSCy Schubert static void
bev_async_del_read(struct bufferevent_async * beva)1742b15cb3dSCy Schubert bev_async_del_read(struct bufferevent_async *beva)
1752b15cb3dSCy Schubert {
1762b15cb3dSCy Schubert 	struct bufferevent *bev = &beva->bev.bev;
1772b15cb3dSCy Schubert 
1782b15cb3dSCy Schubert 	if (beva->read_added) {
1792b15cb3dSCy Schubert 		beva->read_added = 0;
1802b15cb3dSCy Schubert 		event_base_del_virtual_(bev->ev_base);
1812b15cb3dSCy Schubert 	}
1822b15cb3dSCy Schubert }
1832b15cb3dSCy Schubert 
1842b15cb3dSCy Schubert static void
bev_async_add_write(struct bufferevent_async * beva)1852b15cb3dSCy Schubert bev_async_add_write(struct bufferevent_async *beva)
1862b15cb3dSCy Schubert {
1872b15cb3dSCy Schubert 	struct bufferevent *bev = &beva->bev.bev;
1882b15cb3dSCy Schubert 
1892b15cb3dSCy Schubert 	if (!beva->write_added) {
1902b15cb3dSCy Schubert 		beva->write_added = 1;
1912b15cb3dSCy Schubert 		event_base_add_virtual_(bev->ev_base);
1922b15cb3dSCy Schubert 	}
1932b15cb3dSCy Schubert }
1942b15cb3dSCy Schubert 
1952b15cb3dSCy Schubert static void
bev_async_add_read(struct bufferevent_async * beva)1962b15cb3dSCy Schubert bev_async_add_read(struct bufferevent_async *beva)
1972b15cb3dSCy Schubert {
1982b15cb3dSCy Schubert 	struct bufferevent *bev = &beva->bev.bev;
1992b15cb3dSCy Schubert 
2002b15cb3dSCy Schubert 	if (!beva->read_added) {
2012b15cb3dSCy Schubert 		beva->read_added = 1;
2022b15cb3dSCy Schubert 		event_base_add_virtual_(bev->ev_base);
2032b15cb3dSCy Schubert 	}
2042b15cb3dSCy Schubert }
2052b15cb3dSCy Schubert 
2062b15cb3dSCy Schubert static void
bev_async_consider_writing(struct bufferevent_async * beva)2072b15cb3dSCy Schubert bev_async_consider_writing(struct bufferevent_async *beva)
2082b15cb3dSCy Schubert {
2092b15cb3dSCy Schubert 	size_t at_most;
2102b15cb3dSCy Schubert 	int limit;
2112b15cb3dSCy Schubert 	struct bufferevent *bev = &beva->bev.bev;
2122b15cb3dSCy Schubert 
2132b15cb3dSCy Schubert 	/* Don't write if there's a write in progress, or we do not
2142b15cb3dSCy Schubert 	 * want to write, or when there's nothing left to write. */
2152b15cb3dSCy Schubert 	if (beva->write_in_progress || beva->bev.connecting)
2162b15cb3dSCy Schubert 		return;
2172b15cb3dSCy Schubert 	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
2182b15cb3dSCy Schubert 	    !evbuffer_get_length(bev->output)) {
2192b15cb3dSCy Schubert 		bev_async_del_write(beva);
2202b15cb3dSCy Schubert 		return;
2212b15cb3dSCy Schubert 	}
2222b15cb3dSCy Schubert 
2232b15cb3dSCy Schubert 	at_most = evbuffer_get_length(bev->output);
2242b15cb3dSCy Schubert 
2252b15cb3dSCy Schubert 	/* This is safe so long as bufferevent_get_write_max never returns
2262b15cb3dSCy Schubert 	 * more than INT_MAX.  That's true for now. XXXX */
2272b15cb3dSCy Schubert 	limit = (int)bufferevent_get_write_max_(&beva->bev);
2282b15cb3dSCy Schubert 	if (at_most >= (size_t)limit && limit >= 0)
2292b15cb3dSCy Schubert 		at_most = limit;
2302b15cb3dSCy Schubert 
2312b15cb3dSCy Schubert 	if (beva->bev.write_suspended) {
2322b15cb3dSCy Schubert 		bev_async_del_write(beva);
2332b15cb3dSCy Schubert 		return;
2342b15cb3dSCy Schubert 	}
2352b15cb3dSCy Schubert 
2362b15cb3dSCy Schubert 	/*  XXXX doesn't respect low-water mark very well. */
2372b15cb3dSCy Schubert 	bufferevent_incref_(bev);
2382b15cb3dSCy Schubert 	if (evbuffer_launch_write_(bev->output, at_most,
2392b15cb3dSCy Schubert 	    &beva->write_overlapped)) {
2402b15cb3dSCy Schubert 		bufferevent_decref_(bev);
2412b15cb3dSCy Schubert 		beva->ok = 0;
242*a466cc55SCy Schubert 		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
2432b15cb3dSCy Schubert 	} else {
2442b15cb3dSCy Schubert 		beva->write_in_progress = at_most;
2452b15cb3dSCy Schubert 		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
2462b15cb3dSCy Schubert 		bev_async_add_write(beva);
2472b15cb3dSCy Schubert 	}
2482b15cb3dSCy Schubert }
2492b15cb3dSCy Schubert 
2502b15cb3dSCy Schubert static void
bev_async_consider_reading(struct bufferevent_async * beva)2512b15cb3dSCy Schubert bev_async_consider_reading(struct bufferevent_async *beva)
2522b15cb3dSCy Schubert {
2532b15cb3dSCy Schubert 	size_t cur_size;
2542b15cb3dSCy Schubert 	size_t read_high;
2552b15cb3dSCy Schubert 	size_t at_most;
2562b15cb3dSCy Schubert 	int limit;
2572b15cb3dSCy Schubert 	struct bufferevent *bev = &beva->bev.bev;
2582b15cb3dSCy Schubert 
2592b15cb3dSCy Schubert 	/* Don't read if there is a read in progress, or we do not
2602b15cb3dSCy Schubert 	 * want to read. */
2612b15cb3dSCy Schubert 	if (beva->read_in_progress || beva->bev.connecting)
2622b15cb3dSCy Schubert 		return;
2632b15cb3dSCy Schubert 	if (!beva->ok || !(bev->enabled&EV_READ)) {
2642b15cb3dSCy Schubert 		bev_async_del_read(beva);
2652b15cb3dSCy Schubert 		return;
2662b15cb3dSCy Schubert 	}
2672b15cb3dSCy Schubert 
2682b15cb3dSCy Schubert 	/* Don't read if we're full */
2692b15cb3dSCy Schubert 	cur_size = evbuffer_get_length(bev->input);
2702b15cb3dSCy Schubert 	read_high = bev->wm_read.high;
2712b15cb3dSCy Schubert 	if (read_high) {
2722b15cb3dSCy Schubert 		if (cur_size >= read_high) {
2732b15cb3dSCy Schubert 			bev_async_del_read(beva);
2742b15cb3dSCy Schubert 			return;
2752b15cb3dSCy Schubert 		}
2762b15cb3dSCy Schubert 		at_most = read_high - cur_size;
2772b15cb3dSCy Schubert 	} else {
2782b15cb3dSCy Schubert 		at_most = 16384; /* FIXME totally magic. */
2792b15cb3dSCy Schubert 	}
2802b15cb3dSCy Schubert 
2812b15cb3dSCy Schubert 	/* XXXX This over-commits. */
2822b15cb3dSCy Schubert 	/* XXXX see also not above on cast on bufferevent_get_write_max_() */
2832b15cb3dSCy Schubert 	limit = (int)bufferevent_get_read_max_(&beva->bev);
2842b15cb3dSCy Schubert 	if (at_most >= (size_t)limit && limit >= 0)
2852b15cb3dSCy Schubert 		at_most = limit;
2862b15cb3dSCy Schubert 
2872b15cb3dSCy Schubert 	if (beva->bev.read_suspended) {
2882b15cb3dSCy Schubert 		bev_async_del_read(beva);
2892b15cb3dSCy Schubert 		return;
2902b15cb3dSCy Schubert 	}
2912b15cb3dSCy Schubert 
2922b15cb3dSCy Schubert 	bufferevent_incref_(bev);
2932b15cb3dSCy Schubert 	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
2942b15cb3dSCy Schubert 		beva->ok = 0;
295*a466cc55SCy Schubert 		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
2962b15cb3dSCy Schubert 		bufferevent_decref_(bev);
2972b15cb3dSCy Schubert 	} else {
2982b15cb3dSCy Schubert 		beva->read_in_progress = at_most;
2992b15cb3dSCy Schubert 		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
3002b15cb3dSCy Schubert 		bev_async_add_read(beva);
3012b15cb3dSCy Schubert 	}
3022b15cb3dSCy Schubert 
3032b15cb3dSCy Schubert 	return;
3042b15cb3dSCy Schubert }
3052b15cb3dSCy Schubert 
3062b15cb3dSCy Schubert static void
be_async_outbuf_callback(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)3072b15cb3dSCy Schubert be_async_outbuf_callback(struct evbuffer *buf,
3082b15cb3dSCy Schubert     const struct evbuffer_cb_info *cbinfo,
3092b15cb3dSCy Schubert     void *arg)
3102b15cb3dSCy Schubert {
3112b15cb3dSCy Schubert 	struct bufferevent *bev = arg;
3122b15cb3dSCy Schubert 	struct bufferevent_async *bev_async = upcast(bev);
3132b15cb3dSCy Schubert 
3142b15cb3dSCy Schubert 	/* If we added data to the outbuf and were not writing before,
3152b15cb3dSCy Schubert 	 * we may want to write now. */
3162b15cb3dSCy Schubert 
3172b15cb3dSCy Schubert 	bufferevent_incref_and_lock_(bev);
3182b15cb3dSCy Schubert 
3192b15cb3dSCy Schubert 	if (cbinfo->n_added)
3202b15cb3dSCy Schubert 		bev_async_consider_writing(bev_async);
3212b15cb3dSCy Schubert 
3222b15cb3dSCy Schubert 	bufferevent_decref_and_unlock_(bev);
3232b15cb3dSCy Schubert }
3242b15cb3dSCy Schubert 
3252b15cb3dSCy Schubert static void
be_async_inbuf_callback(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)3262b15cb3dSCy Schubert be_async_inbuf_callback(struct evbuffer *buf,
3272b15cb3dSCy Schubert     const struct evbuffer_cb_info *cbinfo,
3282b15cb3dSCy Schubert     void *arg)
3292b15cb3dSCy Schubert {
3302b15cb3dSCy Schubert 	struct bufferevent *bev = arg;
3312b15cb3dSCy Schubert 	struct bufferevent_async *bev_async = upcast(bev);
3322b15cb3dSCy Schubert 
3332b15cb3dSCy Schubert 	/* If we drained data from the inbuf and were not reading before,
3342b15cb3dSCy Schubert 	 * we may want to read now */
3352b15cb3dSCy Schubert 
3362b15cb3dSCy Schubert 	bufferevent_incref_and_lock_(bev);
3372b15cb3dSCy Schubert 
3382b15cb3dSCy Schubert 	if (cbinfo->n_deleted)
3392b15cb3dSCy Schubert 		bev_async_consider_reading(bev_async);
3402b15cb3dSCy Schubert 
3412b15cb3dSCy Schubert 	bufferevent_decref_and_unlock_(bev);
3422b15cb3dSCy Schubert }
3432b15cb3dSCy Schubert 
3442b15cb3dSCy Schubert static int
be_async_enable(struct bufferevent * buf,short what)3452b15cb3dSCy Schubert be_async_enable(struct bufferevent *buf, short what)
3462b15cb3dSCy Schubert {
3472b15cb3dSCy Schubert 	struct bufferevent_async *bev_async = upcast(buf);
3482b15cb3dSCy Schubert 
3492b15cb3dSCy Schubert 	if (!bev_async->ok)
3502b15cb3dSCy Schubert 		return -1;
3512b15cb3dSCy Schubert 
3522b15cb3dSCy Schubert 	if (bev_async->bev.connecting) {
3532b15cb3dSCy Schubert 		/* Don't launch anything during connection attempts. */
3542b15cb3dSCy Schubert 		return 0;
3552b15cb3dSCy Schubert 	}
3562b15cb3dSCy Schubert 
3572b15cb3dSCy Schubert 	if (what & EV_READ)
3582b15cb3dSCy Schubert 		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
3592b15cb3dSCy Schubert 	if (what & EV_WRITE)
3602b15cb3dSCy Schubert 		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
3612b15cb3dSCy Schubert 
3622b15cb3dSCy Schubert 	/* If we newly enable reading or writing, and we aren't reading or
3632b15cb3dSCy Schubert 	   writing already, consider launching a new read or write. */
3642b15cb3dSCy Schubert 
3652b15cb3dSCy Schubert 	if (what & EV_READ)
3662b15cb3dSCy Schubert 		bev_async_consider_reading(bev_async);
3672b15cb3dSCy Schubert 	if (what & EV_WRITE)
3682b15cb3dSCy Schubert 		bev_async_consider_writing(bev_async);
3692b15cb3dSCy Schubert 	return 0;
3702b15cb3dSCy Schubert }
3712b15cb3dSCy Schubert 
3722b15cb3dSCy Schubert static int
be_async_disable(struct bufferevent * bev,short what)3732b15cb3dSCy Schubert be_async_disable(struct bufferevent *bev, short what)
3742b15cb3dSCy Schubert {
3752b15cb3dSCy Schubert 	struct bufferevent_async *bev_async = upcast(bev);
3762b15cb3dSCy Schubert 	/* XXXX If we disable reading or writing, we may want to consider
3772b15cb3dSCy Schubert 	 * canceling any in-progress read or write operation, though it might
3782b15cb3dSCy Schubert 	 * not work. */
3792b15cb3dSCy Schubert 
3802b15cb3dSCy Schubert 	if (what & EV_READ) {
3812b15cb3dSCy Schubert 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
3822b15cb3dSCy Schubert 		bev_async_del_read(bev_async);
3832b15cb3dSCy Schubert 	}
3842b15cb3dSCy Schubert 	if (what & EV_WRITE) {
3852b15cb3dSCy Schubert 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
3862b15cb3dSCy Schubert 		bev_async_del_write(bev_async);
3872b15cb3dSCy Schubert 	}
3882b15cb3dSCy Schubert 
3892b15cb3dSCy Schubert 	return 0;
3902b15cb3dSCy Schubert }
3912b15cb3dSCy Schubert 
3922b15cb3dSCy Schubert static void
be_async_destruct(struct bufferevent * bev)3932b15cb3dSCy Schubert be_async_destruct(struct bufferevent *bev)
3942b15cb3dSCy Schubert {
3952b15cb3dSCy Schubert 	struct bufferevent_async *bev_async = upcast(bev);
3962b15cb3dSCy Schubert 	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
3972b15cb3dSCy Schubert 	evutil_socket_t fd;
3982b15cb3dSCy Schubert 
3992b15cb3dSCy Schubert 	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
4002b15cb3dSCy Schubert 			!upcast(bev)->read_in_progress);
4012b15cb3dSCy Schubert 
4022b15cb3dSCy Schubert 	bev_async_del_read(bev_async);
4032b15cb3dSCy Schubert 	bev_async_del_write(bev_async);
4042b15cb3dSCy Schubert 
4052b15cb3dSCy Schubert 	fd = evbuffer_overlapped_get_fd_(bev->input);
406*a466cc55SCy Schubert 	if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
407a25439b6SCy Schubert 		(bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
4082b15cb3dSCy Schubert 		evutil_closesocket(fd);
409*a466cc55SCy Schubert 		evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
4102b15cb3dSCy Schubert 	}
4112b15cb3dSCy Schubert }
4122b15cb3dSCy Schubert 
4132b15cb3dSCy Schubert /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
4142b15cb3dSCy Schubert  * we use WSAGetOverlappedResult to translate. */
4152b15cb3dSCy Schubert static void
bev_async_set_wsa_error(struct bufferevent * bev,struct event_overlapped * eo)4162b15cb3dSCy Schubert bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
4172b15cb3dSCy Schubert {
4182b15cb3dSCy Schubert 	DWORD bytes, flags;
4192b15cb3dSCy Schubert 	evutil_socket_t fd;
4202b15cb3dSCy Schubert 
4212b15cb3dSCy Schubert 	fd = evbuffer_overlapped_get_fd_(bev->input);
4222b15cb3dSCy Schubert 	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
4232b15cb3dSCy Schubert }
4242b15cb3dSCy Schubert 
4252b15cb3dSCy Schubert static int
be_async_flush(struct bufferevent * bev,short what,enum bufferevent_flush_mode mode)4262b15cb3dSCy Schubert be_async_flush(struct bufferevent *bev, short what,
4272b15cb3dSCy Schubert     enum bufferevent_flush_mode mode)
4282b15cb3dSCy Schubert {
4292b15cb3dSCy Schubert 	return 0;
4302b15cb3dSCy Schubert }
4312b15cb3dSCy Schubert 
4322b15cb3dSCy Schubert static void
connect_complete(struct event_overlapped * eo,ev_uintptr_t key,ev_ssize_t nbytes,int ok)4332b15cb3dSCy Schubert connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
4342b15cb3dSCy Schubert     ev_ssize_t nbytes, int ok)
4352b15cb3dSCy Schubert {
4362b15cb3dSCy Schubert 	struct bufferevent_async *bev_a = upcast_connect(eo);
4372b15cb3dSCy Schubert 	struct bufferevent *bev = &bev_a->bev.bev;
4382b15cb3dSCy Schubert 	evutil_socket_t sock;
4392b15cb3dSCy Schubert 
4402b15cb3dSCy Schubert 	BEV_LOCK(bev);
4412b15cb3dSCy Schubert 
4422b15cb3dSCy Schubert 	EVUTIL_ASSERT(bev_a->bev.connecting);
4432b15cb3dSCy Schubert 	bev_a->bev.connecting = 0;
4442b15cb3dSCy Schubert 	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
4452b15cb3dSCy Schubert 	/* XXXX Handle error? */
4462b15cb3dSCy Schubert 	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
4472b15cb3dSCy Schubert 
4482b15cb3dSCy Schubert 	if (ok)
4492b15cb3dSCy Schubert 		bufferevent_async_set_connected_(bev);
4502b15cb3dSCy Schubert 	else
4512b15cb3dSCy Schubert 		bev_async_set_wsa_error(bev, eo);
4522b15cb3dSCy Schubert 
453*a466cc55SCy Schubert 	be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
4542b15cb3dSCy Schubert 
4552b15cb3dSCy Schubert 	event_base_del_virtual_(bev->ev_base);
4562b15cb3dSCy Schubert 
4572b15cb3dSCy Schubert 	bufferevent_decref_and_unlock_(bev);
4582b15cb3dSCy Schubert }
4592b15cb3dSCy Schubert 
4602b15cb3dSCy Schubert static void
read_complete(struct event_overlapped * eo,ev_uintptr_t key,ev_ssize_t nbytes,int ok)4612b15cb3dSCy Schubert read_complete(struct event_overlapped *eo, ev_uintptr_t key,
4622b15cb3dSCy Schubert     ev_ssize_t nbytes, int ok)
4632b15cb3dSCy Schubert {
4642b15cb3dSCy Schubert 	struct bufferevent_async *bev_a = upcast_read(eo);
4652b15cb3dSCy Schubert 	struct bufferevent *bev = &bev_a->bev.bev;
4662b15cb3dSCy Schubert 	short what = BEV_EVENT_READING;
4672b15cb3dSCy Schubert 	ev_ssize_t amount_unread;
4682b15cb3dSCy Schubert 	BEV_LOCK(bev);
4692b15cb3dSCy Schubert 	EVUTIL_ASSERT(bev_a->read_in_progress);
4702b15cb3dSCy Schubert 
4712b15cb3dSCy Schubert 	amount_unread = bev_a->read_in_progress - nbytes;
4722b15cb3dSCy Schubert 	evbuffer_commit_read_(bev->input, nbytes);
4732b15cb3dSCy Schubert 	bev_a->read_in_progress = 0;
4742b15cb3dSCy Schubert 	if (amount_unread)
4752b15cb3dSCy Schubert 		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
4762b15cb3dSCy Schubert 
4772b15cb3dSCy Schubert 	if (!ok)
4782b15cb3dSCy Schubert 		bev_async_set_wsa_error(bev, eo);
4792b15cb3dSCy Schubert 
4802b15cb3dSCy Schubert 	if (bev_a->ok) {
4812b15cb3dSCy Schubert 		if (ok && nbytes) {
4822b15cb3dSCy Schubert 			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
483*a466cc55SCy Schubert 			be_async_trigger_nolock(bev, EV_READ, 0);
4842b15cb3dSCy Schubert 			bev_async_consider_reading(bev_a);
4852b15cb3dSCy Schubert 		} else if (!ok) {
4862b15cb3dSCy Schubert 			what |= BEV_EVENT_ERROR;
4872b15cb3dSCy Schubert 			bev_a->ok = 0;
488*a466cc55SCy Schubert 			be_async_run_eventcb(bev, what, 0);
4892b15cb3dSCy Schubert 		} else if (!nbytes) {
4902b15cb3dSCy Schubert 			what |= BEV_EVENT_EOF;
4912b15cb3dSCy Schubert 			bev_a->ok = 0;
492*a466cc55SCy Schubert 			be_async_run_eventcb(bev, what, 0);
4932b15cb3dSCy Schubert 		}
4942b15cb3dSCy Schubert 	}
4952b15cb3dSCy Schubert 
4962b15cb3dSCy Schubert 	bufferevent_decref_and_unlock_(bev);
4972b15cb3dSCy Schubert }
4982b15cb3dSCy Schubert 
4992b15cb3dSCy Schubert static void
write_complete(struct event_overlapped * eo,ev_uintptr_t key,ev_ssize_t nbytes,int ok)5002b15cb3dSCy Schubert write_complete(struct event_overlapped *eo, ev_uintptr_t key,
5012b15cb3dSCy Schubert     ev_ssize_t nbytes, int ok)
5022b15cb3dSCy Schubert {
5032b15cb3dSCy Schubert 	struct bufferevent_async *bev_a = upcast_write(eo);
5042b15cb3dSCy Schubert 	struct bufferevent *bev = &bev_a->bev.bev;
5052b15cb3dSCy Schubert 	short what = BEV_EVENT_WRITING;
5062b15cb3dSCy Schubert 	ev_ssize_t amount_unwritten;
5072b15cb3dSCy Schubert 
5082b15cb3dSCy Schubert 	BEV_LOCK(bev);
5092b15cb3dSCy Schubert 	EVUTIL_ASSERT(bev_a->write_in_progress);
5102b15cb3dSCy Schubert 
5112b15cb3dSCy Schubert 	amount_unwritten = bev_a->write_in_progress - nbytes;
5122b15cb3dSCy Schubert 	evbuffer_commit_write_(bev->output, nbytes);
5132b15cb3dSCy Schubert 	bev_a->write_in_progress = 0;
5142b15cb3dSCy Schubert 
5152b15cb3dSCy Schubert 	if (amount_unwritten)
5162b15cb3dSCy Schubert 		bufferevent_decrement_write_buckets_(&bev_a->bev,
5172b15cb3dSCy Schubert 		                                     -amount_unwritten);
5182b15cb3dSCy Schubert 
5192b15cb3dSCy Schubert 
5202b15cb3dSCy Schubert 	if (!ok)
5212b15cb3dSCy Schubert 		bev_async_set_wsa_error(bev, eo);
5222b15cb3dSCy Schubert 
5232b15cb3dSCy Schubert 	if (bev_a->ok) {
5242b15cb3dSCy Schubert 		if (ok && nbytes) {
5252b15cb3dSCy Schubert 			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
526*a466cc55SCy Schubert 			be_async_trigger_nolock(bev, EV_WRITE, 0);
5272b15cb3dSCy Schubert 			bev_async_consider_writing(bev_a);
5282b15cb3dSCy Schubert 		} else if (!ok) {
5292b15cb3dSCy Schubert 			what |= BEV_EVENT_ERROR;
5302b15cb3dSCy Schubert 			bev_a->ok = 0;
531*a466cc55SCy Schubert 			be_async_run_eventcb(bev, what, 0);
5322b15cb3dSCy Schubert 		} else if (!nbytes) {
5332b15cb3dSCy Schubert 			what |= BEV_EVENT_EOF;
5342b15cb3dSCy Schubert 			bev_a->ok = 0;
535*a466cc55SCy Schubert 			be_async_run_eventcb(bev, what, 0);
5362b15cb3dSCy Schubert 		}
5372b15cb3dSCy Schubert 	}
5382b15cb3dSCy Schubert 
5392b15cb3dSCy Schubert 	bufferevent_decref_and_unlock_(bev);
5402b15cb3dSCy Schubert }
5412b15cb3dSCy Schubert 
5422b15cb3dSCy Schubert struct bufferevent *
bufferevent_async_new_(struct event_base * base,evutil_socket_t fd,int options)5432b15cb3dSCy Schubert bufferevent_async_new_(struct event_base *base,
5442b15cb3dSCy Schubert     evutil_socket_t fd, int options)
5452b15cb3dSCy Schubert {
5462b15cb3dSCy Schubert 	struct bufferevent_async *bev_a;
5472b15cb3dSCy Schubert 	struct bufferevent *bev;
5482b15cb3dSCy Schubert 	struct event_iocp_port *iocp;
5492b15cb3dSCy Schubert 
5502b15cb3dSCy Schubert 	options |= BEV_OPT_THREADSAFE;
5512b15cb3dSCy Schubert 
5522b15cb3dSCy Schubert 	if (!(iocp = event_base_get_iocp_(base)))
5532b15cb3dSCy Schubert 		return NULL;
5542b15cb3dSCy Schubert 
5552b15cb3dSCy Schubert 	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
556*a466cc55SCy Schubert 		if (fatal_error(GetLastError()))
5572b15cb3dSCy Schubert 			return NULL;
5582b15cb3dSCy Schubert 	}
5592b15cb3dSCy Schubert 
5602b15cb3dSCy Schubert 	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
5612b15cb3dSCy Schubert 		return NULL;
5622b15cb3dSCy Schubert 
5632b15cb3dSCy Schubert 	bev = &bev_a->bev.bev;
5642b15cb3dSCy Schubert 	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
5652b15cb3dSCy Schubert 		mm_free(bev_a);
5662b15cb3dSCy Schubert 		return NULL;
5672b15cb3dSCy Schubert 	}
5682b15cb3dSCy Schubert 	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
5692b15cb3dSCy Schubert 		evbuffer_free(bev->input);
5702b15cb3dSCy Schubert 		mm_free(bev_a);
5712b15cb3dSCy Schubert 		return NULL;
5722b15cb3dSCy Schubert 	}
5732b15cb3dSCy Schubert 
5742b15cb3dSCy Schubert 	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
5752b15cb3dSCy Schubert 		options)<0)
5762b15cb3dSCy Schubert 		goto err;
5772b15cb3dSCy Schubert 
5782b15cb3dSCy Schubert 	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
5792b15cb3dSCy Schubert 	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
5802b15cb3dSCy Schubert 
5812b15cb3dSCy Schubert 	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
5822b15cb3dSCy Schubert 	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
5832b15cb3dSCy Schubert 	event_overlapped_init_(&bev_a->write_overlapped, write_complete);
5842b15cb3dSCy Schubert 
5852b15cb3dSCy Schubert 	bufferevent_init_generic_timeout_cbs_(bev);
5862b15cb3dSCy Schubert 
587a25439b6SCy Schubert 	bev_a->ok = fd >= 0;
588a25439b6SCy Schubert 
5892b15cb3dSCy Schubert 	return bev;
5902b15cb3dSCy Schubert err:
5912b15cb3dSCy Schubert 	bufferevent_free(&bev_a->bev.bev);
5922b15cb3dSCy Schubert 	return NULL;
5932b15cb3dSCy Schubert }
5942b15cb3dSCy Schubert 
5952b15cb3dSCy Schubert void
bufferevent_async_set_connected_(struct bufferevent * bev)5962b15cb3dSCy Schubert bufferevent_async_set_connected_(struct bufferevent *bev)
5972b15cb3dSCy Schubert {
5982b15cb3dSCy Schubert 	struct bufferevent_async *bev_async = upcast(bev);
5992b15cb3dSCy Schubert 	bev_async->ok = 1;
6002b15cb3dSCy Schubert 	/* Now's a good time to consider reading/writing */
6012b15cb3dSCy Schubert 	be_async_enable(bev, bev->enabled);
6022b15cb3dSCy Schubert }
6032b15cb3dSCy Schubert 
6042b15cb3dSCy Schubert int
bufferevent_async_can_connect_(struct bufferevent * bev)6052b15cb3dSCy Schubert bufferevent_async_can_connect_(struct bufferevent *bev)
6062b15cb3dSCy Schubert {
6072b15cb3dSCy Schubert 	const struct win32_extension_fns *ext =
6082b15cb3dSCy Schubert 	    event_get_win32_extension_fns_();
6092b15cb3dSCy Schubert 
6102b15cb3dSCy Schubert 	if (BEV_IS_ASYNC(bev) &&
6112b15cb3dSCy Schubert 	    event_base_get_iocp_(bev->ev_base) &&
6122b15cb3dSCy Schubert 	    ext && ext->ConnectEx)
6132b15cb3dSCy Schubert 		return 1;
6142b15cb3dSCy Schubert 
6152b15cb3dSCy Schubert 	return 0;
6162b15cb3dSCy Schubert }
6172b15cb3dSCy Schubert 
6182b15cb3dSCy Schubert int
bufferevent_async_connect_(struct bufferevent * bev,evutil_socket_t fd,const struct sockaddr * sa,int socklen)6192b15cb3dSCy Schubert bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
6202b15cb3dSCy Schubert 	const struct sockaddr *sa, int socklen)
6212b15cb3dSCy Schubert {
6222b15cb3dSCy Schubert 	BOOL rc;
6232b15cb3dSCy Schubert 	struct bufferevent_async *bev_async = upcast(bev);
6242b15cb3dSCy Schubert 	struct sockaddr_storage ss;
6252b15cb3dSCy Schubert 	const struct win32_extension_fns *ext =
6262b15cb3dSCy Schubert 	    event_get_win32_extension_fns_();
6272b15cb3dSCy Schubert 
6282b15cb3dSCy Schubert 	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
6292b15cb3dSCy Schubert 
6302b15cb3dSCy Schubert 	/* ConnectEx() requires that the socket be bound to an address
6312b15cb3dSCy Schubert 	 * with bind() before using, otherwise it will fail. We attempt
6322b15cb3dSCy Schubert 	 * to issue a bind() here, taking into account that the error
6332b15cb3dSCy Schubert 	 * code is set to WSAEINVAL when the socket is already bound. */
6342b15cb3dSCy Schubert 	memset(&ss, 0, sizeof(ss));
6352b15cb3dSCy Schubert 	if (sa->sa_family == AF_INET) {
6362b15cb3dSCy Schubert 		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
6372b15cb3dSCy Schubert 		sin->sin_family = AF_INET;
6382b15cb3dSCy Schubert 		sin->sin_addr.s_addr = INADDR_ANY;
6392b15cb3dSCy Schubert 	} else if (sa->sa_family == AF_INET6) {
6402b15cb3dSCy Schubert 		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
6412b15cb3dSCy Schubert 		sin6->sin6_family = AF_INET6;
6422b15cb3dSCy Schubert 		sin6->sin6_addr = in6addr_any;
6432b15cb3dSCy Schubert 	} else {
6442b15cb3dSCy Schubert 		/* Well, the user will have to bind() */
6452b15cb3dSCy Schubert 		return -1;
6462b15cb3dSCy Schubert 	}
6472b15cb3dSCy Schubert 	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
6482b15cb3dSCy Schubert 	    WSAGetLastError() != WSAEINVAL)
6492b15cb3dSCy Schubert 		return -1;
6502b15cb3dSCy Schubert 
6512b15cb3dSCy Schubert 	event_base_add_virtual_(bev->ev_base);
6522b15cb3dSCy Schubert 	bufferevent_incref_(bev);
6532b15cb3dSCy Schubert 	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
6542b15cb3dSCy Schubert 			    &bev_async->connect_overlapped.overlapped);
6552b15cb3dSCy Schubert 	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
6562b15cb3dSCy Schubert 		return 0;
6572b15cb3dSCy Schubert 
6582b15cb3dSCy Schubert 	event_base_del_virtual_(bev->ev_base);
6592b15cb3dSCy Schubert 	bufferevent_decref_(bev);
6602b15cb3dSCy Schubert 
6612b15cb3dSCy Schubert 	return -1;
6622b15cb3dSCy Schubert }
6632b15cb3dSCy Schubert 
6642b15cb3dSCy Schubert static int
be_async_ctrl(struct bufferevent * bev,enum bufferevent_ctrl_op op,union bufferevent_ctrl_data * data)6652b15cb3dSCy Schubert be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
6662b15cb3dSCy Schubert     union bufferevent_ctrl_data *data)
6672b15cb3dSCy Schubert {
6682b15cb3dSCy Schubert 	switch (op) {
6692b15cb3dSCy Schubert 	case BEV_CTRL_GET_FD:
6702b15cb3dSCy Schubert 		data->fd = evbuffer_overlapped_get_fd_(bev->input);
6712b15cb3dSCy Schubert 		return 0;
6722b15cb3dSCy Schubert 	case BEV_CTRL_SET_FD: {
673*a466cc55SCy Schubert 		struct bufferevent_async *bev_a = upcast(bev);
6742b15cb3dSCy Schubert 		struct event_iocp_port *iocp;
6752b15cb3dSCy Schubert 
6762b15cb3dSCy Schubert 		if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
6772b15cb3dSCy Schubert 			return 0;
6782b15cb3dSCy Schubert 		if (!(iocp = event_base_get_iocp_(bev->ev_base)))
6792b15cb3dSCy Schubert 			return -1;
680*a466cc55SCy Schubert 		if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
681*a466cc55SCy Schubert 			if (fatal_error(GetLastError()))
6822b15cb3dSCy Schubert 				return -1;
683*a466cc55SCy Schubert 		}
6842b15cb3dSCy Schubert 		evbuffer_overlapped_set_fd_(bev->input, data->fd);
6852b15cb3dSCy Schubert 		evbuffer_overlapped_set_fd_(bev->output, data->fd);
686*a466cc55SCy Schubert 		bev_a->ok = data->fd >= 0;
6872b15cb3dSCy Schubert 		return 0;
6882b15cb3dSCy Schubert 	}
6892b15cb3dSCy Schubert 	case BEV_CTRL_CANCEL_ALL: {
6902b15cb3dSCy Schubert 		struct bufferevent_async *bev_a = upcast(bev);
6912b15cb3dSCy Schubert 		evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
692*a466cc55SCy Schubert 		if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
6932b15cb3dSCy Schubert 		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
6942b15cb3dSCy Schubert 			closesocket(fd);
695*a466cc55SCy Schubert 			evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
6962b15cb3dSCy Schubert 		}
6972b15cb3dSCy Schubert 		bev_a->ok = 0;
6982b15cb3dSCy Schubert 		return 0;
6992b15cb3dSCy Schubert 	}
7002b15cb3dSCy Schubert 	case BEV_CTRL_GET_UNDERLYING:
7012b15cb3dSCy Schubert 	default:
7022b15cb3dSCy Schubert 		return -1;
7032b15cb3dSCy Schubert 	}
7042b15cb3dSCy Schubert }
7052b15cb3dSCy Schubert 
7062b15cb3dSCy Schubert 
707