xref: /freebsd/contrib/libevent/bufferevent_async.c (revision b50261e21f39a6c7249a49e7b60aa878c98512a8)
1c43e99fdSEd Maste /*
2c43e99fdSEd Maste  * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
3c43e99fdSEd Maste  *
4c43e99fdSEd Maste  * All rights reserved.
5c43e99fdSEd Maste  *
6c43e99fdSEd Maste  * Redistribution and use in source and binary forms, with or without
7c43e99fdSEd Maste  * modification, are permitted provided that the following conditions
8c43e99fdSEd Maste  * are met:
9c43e99fdSEd Maste  * 1. Redistributions of source code must retain the above copyright
10c43e99fdSEd Maste  *    notice, this list of conditions and the following disclaimer.
11c43e99fdSEd Maste  * 2. Redistributions in binary form must reproduce the above copyright
12c43e99fdSEd Maste  *    notice, this list of conditions and the following disclaimer in the
13c43e99fdSEd Maste  *    documentation and/or other materials provided with the distribution.
14c43e99fdSEd Maste  * 3. The name of the author may not be used to endorse or promote products
15c43e99fdSEd Maste  *    derived from this software without specific prior written permission.
16c43e99fdSEd Maste  *
17c43e99fdSEd Maste  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18c43e99fdSEd Maste  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19c43e99fdSEd Maste  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20c43e99fdSEd Maste  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21c43e99fdSEd Maste  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22c43e99fdSEd Maste  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23c43e99fdSEd Maste  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24c43e99fdSEd Maste  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25c43e99fdSEd Maste  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26c43e99fdSEd Maste  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27c43e99fdSEd Maste  */
28c43e99fdSEd Maste 
29c43e99fdSEd Maste #include "event2/event-config.h"
30c43e99fdSEd Maste #include "evconfig-private.h"
31c43e99fdSEd Maste 
32c43e99fdSEd Maste #ifdef EVENT__HAVE_SYS_TIME_H
33c43e99fdSEd Maste #include <sys/time.h>
34c43e99fdSEd Maste #endif
35c43e99fdSEd Maste 
36c43e99fdSEd Maste #include <errno.h>
37c43e99fdSEd Maste #include <stdio.h>
38c43e99fdSEd Maste #include <stdlib.h>
39c43e99fdSEd Maste #include <string.h>
40c43e99fdSEd Maste #ifdef EVENT__HAVE_STDARG_H
41c43e99fdSEd Maste #include <stdarg.h>
42c43e99fdSEd Maste #endif
43c43e99fdSEd Maste #ifdef EVENT__HAVE_UNISTD_H
44c43e99fdSEd Maste #include <unistd.h>
45c43e99fdSEd Maste #endif
46c43e99fdSEd Maste 
47c43e99fdSEd Maste #ifdef _WIN32
48c43e99fdSEd Maste #include <winsock2.h>
49*b50261e2SCy Schubert #include <winerror.h>
50c43e99fdSEd Maste #include <ws2tcpip.h>
51c43e99fdSEd Maste #endif
52c43e99fdSEd Maste 
53c43e99fdSEd Maste #include <sys/queue.h>
54c43e99fdSEd Maste 
55c43e99fdSEd Maste #include "event2/util.h"
56c43e99fdSEd Maste #include "event2/bufferevent.h"
57c43e99fdSEd Maste #include "event2/buffer.h"
58c43e99fdSEd Maste #include "event2/bufferevent_struct.h"
59c43e99fdSEd Maste #include "event2/event.h"
60c43e99fdSEd Maste #include "event2/util.h"
61c43e99fdSEd Maste #include "event-internal.h"
62c43e99fdSEd Maste #include "log-internal.h"
63c43e99fdSEd Maste #include "mm-internal.h"
64c43e99fdSEd Maste #include "bufferevent-internal.h"
65c43e99fdSEd Maste #include "util-internal.h"
66c43e99fdSEd Maste #include "iocp-internal.h"
67c43e99fdSEd Maste 
68c43e99fdSEd Maste #ifndef SO_UPDATE_CONNECT_CONTEXT
69c43e99fdSEd Maste /* Mingw is sometimes missing this */
70c43e99fdSEd Maste #define SO_UPDATE_CONNECT_CONTEXT 0x7010
71c43e99fdSEd Maste #endif
72c43e99fdSEd Maste 
73c43e99fdSEd Maste /* prototypes */
74c43e99fdSEd Maste static int be_async_enable(struct bufferevent *, short);
75c43e99fdSEd Maste static int be_async_disable(struct bufferevent *, short);
76c43e99fdSEd Maste static void be_async_destruct(struct bufferevent *);
77c43e99fdSEd Maste static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
78c43e99fdSEd Maste static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
79c43e99fdSEd Maste 
80c43e99fdSEd Maste struct bufferevent_async {
81c43e99fdSEd Maste 	struct bufferevent_private bev;
82c43e99fdSEd Maste 	struct event_overlapped connect_overlapped;
83c43e99fdSEd Maste 	struct event_overlapped read_overlapped;
84c43e99fdSEd Maste 	struct event_overlapped write_overlapped;
85c43e99fdSEd Maste 	size_t read_in_progress;
86c43e99fdSEd Maste 	size_t write_in_progress;
87c43e99fdSEd Maste 	unsigned ok : 1;
88c43e99fdSEd Maste 	unsigned read_added : 1;
89c43e99fdSEd Maste 	unsigned write_added : 1;
90c43e99fdSEd Maste };
91c43e99fdSEd Maste 
92c43e99fdSEd Maste const struct bufferevent_ops bufferevent_ops_async = {
93c43e99fdSEd Maste 	"socket_async",
94c43e99fdSEd Maste 	evutil_offsetof(struct bufferevent_async, bev.bev),
95c43e99fdSEd Maste 	be_async_enable,
96c43e99fdSEd Maste 	be_async_disable,
97c43e99fdSEd Maste 	NULL, /* Unlink */
98c43e99fdSEd Maste 	be_async_destruct,
99c43e99fdSEd Maste 	bufferevent_generic_adj_timeouts_,
100c43e99fdSEd Maste 	be_async_flush,
101c43e99fdSEd Maste 	be_async_ctrl,
102c43e99fdSEd Maste };
103c43e99fdSEd Maste 
104*b50261e2SCy Schubert static inline void
be_async_run_eventcb(struct bufferevent * bev,short what,int options)105*b50261e2SCy Schubert be_async_run_eventcb(struct bufferevent *bev, short what, int options)
106*b50261e2SCy Schubert { bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
107*b50261e2SCy Schubert 
108*b50261e2SCy Schubert static inline void
be_async_trigger_nolock(struct bufferevent * bev,short what,int options)109*b50261e2SCy Schubert be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
110*b50261e2SCy Schubert { bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
111*b50261e2SCy Schubert 
112*b50261e2SCy Schubert static inline int
fatal_error(int err)113*b50261e2SCy Schubert fatal_error(int err)
114*b50261e2SCy Schubert {
115*b50261e2SCy Schubert 	switch (err) {
116*b50261e2SCy Schubert 		/* We may have already associated this fd with a port.
117*b50261e2SCy Schubert 		 * Let's hope it's this port, and that the error code
118*b50261e2SCy Schubert 		 * for doing this neer changes. */
119*b50261e2SCy Schubert 		case ERROR_INVALID_PARAMETER:
120*b50261e2SCy Schubert 			return 0;
121*b50261e2SCy Schubert 	}
122*b50261e2SCy Schubert 	return 1;
123*b50261e2SCy Schubert }
124*b50261e2SCy Schubert 
125c43e99fdSEd Maste static inline struct bufferevent_async *
upcast(struct bufferevent * bev)126c43e99fdSEd Maste upcast(struct bufferevent *bev)
127c43e99fdSEd Maste {
128c43e99fdSEd Maste 	struct bufferevent_async *bev_a;
129*b50261e2SCy Schubert 	if (!BEV_IS_ASYNC(bev))
130c43e99fdSEd Maste 		return NULL;
131c43e99fdSEd Maste 	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
132c43e99fdSEd Maste 	return bev_a;
133c43e99fdSEd Maste }
134c43e99fdSEd Maste 
135c43e99fdSEd Maste static inline struct bufferevent_async *
upcast_connect(struct event_overlapped * eo)136c43e99fdSEd Maste upcast_connect(struct event_overlapped *eo)
137c43e99fdSEd Maste {
138c43e99fdSEd Maste 	struct bufferevent_async *bev_a;
139c43e99fdSEd Maste 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
140c43e99fdSEd Maste 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
141c43e99fdSEd Maste 	return bev_a;
142c43e99fdSEd Maste }
143c43e99fdSEd Maste 
144c43e99fdSEd Maste static inline struct bufferevent_async *
upcast_read(struct event_overlapped * eo)145c43e99fdSEd Maste upcast_read(struct event_overlapped *eo)
146c43e99fdSEd Maste {
147c43e99fdSEd Maste 	struct bufferevent_async *bev_a;
148c43e99fdSEd Maste 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
149c43e99fdSEd Maste 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
150c43e99fdSEd Maste 	return bev_a;
151c43e99fdSEd Maste }
152c43e99fdSEd Maste 
153c43e99fdSEd Maste static inline struct bufferevent_async *
upcast_write(struct event_overlapped * eo)154c43e99fdSEd Maste upcast_write(struct event_overlapped *eo)
155c43e99fdSEd Maste {
156c43e99fdSEd Maste 	struct bufferevent_async *bev_a;
157c43e99fdSEd Maste 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
158c43e99fdSEd Maste 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
159c43e99fdSEd Maste 	return bev_a;
160c43e99fdSEd Maste }
161c43e99fdSEd Maste 
162c43e99fdSEd Maste static void
bev_async_del_write(struct bufferevent_async * beva)163c43e99fdSEd Maste bev_async_del_write(struct bufferevent_async *beva)
164c43e99fdSEd Maste {
165c43e99fdSEd Maste 	struct bufferevent *bev = &beva->bev.bev;
166c43e99fdSEd Maste 
167c43e99fdSEd Maste 	if (beva->write_added) {
168c43e99fdSEd Maste 		beva->write_added = 0;
169c43e99fdSEd Maste 		event_base_del_virtual_(bev->ev_base);
170c43e99fdSEd Maste 	}
171c43e99fdSEd Maste }
172c43e99fdSEd Maste 
173c43e99fdSEd Maste static void
bev_async_del_read(struct bufferevent_async * beva)174c43e99fdSEd Maste bev_async_del_read(struct bufferevent_async *beva)
175c43e99fdSEd Maste {
176c43e99fdSEd Maste 	struct bufferevent *bev = &beva->bev.bev;
177c43e99fdSEd Maste 
178c43e99fdSEd Maste 	if (beva->read_added) {
179c43e99fdSEd Maste 		beva->read_added = 0;
180c43e99fdSEd Maste 		event_base_del_virtual_(bev->ev_base);
181c43e99fdSEd Maste 	}
182c43e99fdSEd Maste }
183c43e99fdSEd Maste 
184c43e99fdSEd Maste static void
bev_async_add_write(struct bufferevent_async * beva)185c43e99fdSEd Maste bev_async_add_write(struct bufferevent_async *beva)
186c43e99fdSEd Maste {
187c43e99fdSEd Maste 	struct bufferevent *bev = &beva->bev.bev;
188c43e99fdSEd Maste 
189c43e99fdSEd Maste 	if (!beva->write_added) {
190c43e99fdSEd Maste 		beva->write_added = 1;
191c43e99fdSEd Maste 		event_base_add_virtual_(bev->ev_base);
192c43e99fdSEd Maste 	}
193c43e99fdSEd Maste }
194c43e99fdSEd Maste 
195c43e99fdSEd Maste static void
bev_async_add_read(struct bufferevent_async * beva)196c43e99fdSEd Maste bev_async_add_read(struct bufferevent_async *beva)
197c43e99fdSEd Maste {
198c43e99fdSEd Maste 	struct bufferevent *bev = &beva->bev.bev;
199c43e99fdSEd Maste 
200c43e99fdSEd Maste 	if (!beva->read_added) {
201c43e99fdSEd Maste 		beva->read_added = 1;
202c43e99fdSEd Maste 		event_base_add_virtual_(bev->ev_base);
203c43e99fdSEd Maste 	}
204c43e99fdSEd Maste }
205c43e99fdSEd Maste 
/* Launch an overlapped write on 'beva' if one is wanted and none is
 * already outstanding.  Called with the bufferevent lock held. */
static void
bev_async_consider_writing(struct bufferevent_async *beva)
{
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't write if there's a write in progress, or we do not
	 * want to write, or when there's nothing left to write. */
	if (beva->write_in_progress || beva->bev.connecting)
		return;
	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
	    !evbuffer_get_length(bev->output)) {
		bev_async_del_write(beva);
		return;
	}

	at_most = evbuffer_get_length(bev->output);

	/* This is safe so long as bufferevent_get_write_max never returns
	 * more than INT_MAX.  That's true for now. XXXX */
	limit = (int)bufferevent_get_write_max_(&beva->bev);
	/* limit < 0 means "unlimited"; the cast makes the comparison false
	 * in that case, so at_most is left alone. */
	if (at_most >= (size_t)limit && limit >= 0)
		at_most = limit;

	if (beva->bev.write_suspended) {
		bev_async_del_write(beva);
		return;
	}

	/*  XXXX doesn't respect low-water mark very well. */
	/* Hold a reference for the duration of the overlapped operation;
	 * write_complete() (or the error path below) releases it. */
	bufferevent_incref_(bev);
	if (evbuffer_launch_write_(bev->output, at_most,
	    &beva->write_overlapped)) {
		/* Launch failed: drop the reference, mark the bufferevent
		 * broken, and report the error (deferred). */
		bufferevent_decref_(bev);
		beva->ok = 0;
		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
	} else {
		/* Track the outstanding byte count, charge the rate limit,
		 * and keep the base alive via a virtual event. */
		beva->write_in_progress = at_most;
		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
		bev_async_add_write(beva);
	}
}
249c43e99fdSEd Maste 
/* Launch an overlapped read on 'beva' if one is wanted and none is
 * already outstanding.  Called with the bufferevent lock held. */
static void
bev_async_consider_reading(struct bufferevent_async *beva)
{
	size_t cur_size;
	size_t read_high;
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't read if there is a read in progress, or we do not
	 * want to read. */
	if (beva->read_in_progress || beva->bev.connecting)
		return;
	if (!beva->ok || !(bev->enabled&EV_READ)) {
		bev_async_del_read(beva);
		return;
	}

	/* Don't read if we're full */
	cur_size = evbuffer_get_length(bev->input);
	read_high = bev->wm_read.high;
	if (read_high) {
		if (cur_size >= read_high) {
			bev_async_del_read(beva);
			return;
		}
		/* Read only up to the high-water mark. */
		at_most = read_high - cur_size;
	} else {
		at_most = 16384; /* FIXME totally magic. */
	}

	/* XXXX This over-commits. */
	/* XXXX see also note above on cast on bufferevent_get_write_max_() */
	limit = (int)bufferevent_get_read_max_(&beva->bev);
	if (at_most >= (size_t)limit && limit >= 0)
		at_most = limit;

	if (beva->bev.read_suspended) {
		bev_async_del_read(beva);
		return;
	}

	/* Hold a reference for the duration of the overlapped operation;
	 * read_complete() (or the error path below) releases it. */
	bufferevent_incref_(bev);
	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
		/* Launch failed: mark broken, report (deferred), and drop
		 * the reference we just took. */
		beva->ok = 0;
		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
		bufferevent_decref_(bev);
	} else {
		/* Track the outstanding byte count, charge the rate limit,
		 * and keep the base alive via a virtual event. */
		beva->read_in_progress = at_most;
		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
		bev_async_add_read(beva);
	}

	return;
}
305c43e99fdSEd Maste 
306c43e99fdSEd Maste static void
be_async_outbuf_callback(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)307c43e99fdSEd Maste be_async_outbuf_callback(struct evbuffer *buf,
308c43e99fdSEd Maste     const struct evbuffer_cb_info *cbinfo,
309c43e99fdSEd Maste     void *arg)
310c43e99fdSEd Maste {
311c43e99fdSEd Maste 	struct bufferevent *bev = arg;
312c43e99fdSEd Maste 	struct bufferevent_async *bev_async = upcast(bev);
313c43e99fdSEd Maste 
314c43e99fdSEd Maste 	/* If we added data to the outbuf and were not writing before,
315c43e99fdSEd Maste 	 * we may want to write now. */
316c43e99fdSEd Maste 
317c43e99fdSEd Maste 	bufferevent_incref_and_lock_(bev);
318c43e99fdSEd Maste 
319c43e99fdSEd Maste 	if (cbinfo->n_added)
320c43e99fdSEd Maste 		bev_async_consider_writing(bev_async);
321c43e99fdSEd Maste 
322c43e99fdSEd Maste 	bufferevent_decref_and_unlock_(bev);
323c43e99fdSEd Maste }
324c43e99fdSEd Maste 
325c43e99fdSEd Maste static void
be_async_inbuf_callback(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)326c43e99fdSEd Maste be_async_inbuf_callback(struct evbuffer *buf,
327c43e99fdSEd Maste     const struct evbuffer_cb_info *cbinfo,
328c43e99fdSEd Maste     void *arg)
329c43e99fdSEd Maste {
330c43e99fdSEd Maste 	struct bufferevent *bev = arg;
331c43e99fdSEd Maste 	struct bufferevent_async *bev_async = upcast(bev);
332c43e99fdSEd Maste 
333c43e99fdSEd Maste 	/* If we drained data from the inbuf and were not reading before,
334c43e99fdSEd Maste 	 * we may want to read now */
335c43e99fdSEd Maste 
336c43e99fdSEd Maste 	bufferevent_incref_and_lock_(bev);
337c43e99fdSEd Maste 
338c43e99fdSEd Maste 	if (cbinfo->n_deleted)
339c43e99fdSEd Maste 		bev_async_consider_reading(bev_async);
340c43e99fdSEd Maste 
341c43e99fdSEd Maste 	bufferevent_decref_and_unlock_(bev);
342c43e99fdSEd Maste }
343c43e99fdSEd Maste 
344c43e99fdSEd Maste static int
be_async_enable(struct bufferevent * buf,short what)345c43e99fdSEd Maste be_async_enable(struct bufferevent *buf, short what)
346c43e99fdSEd Maste {
347c43e99fdSEd Maste 	struct bufferevent_async *bev_async = upcast(buf);
348c43e99fdSEd Maste 
349c43e99fdSEd Maste 	if (!bev_async->ok)
350c43e99fdSEd Maste 		return -1;
351c43e99fdSEd Maste 
352c43e99fdSEd Maste 	if (bev_async->bev.connecting) {
353c43e99fdSEd Maste 		/* Don't launch anything during connection attempts. */
354c43e99fdSEd Maste 		return 0;
355c43e99fdSEd Maste 	}
356c43e99fdSEd Maste 
357c43e99fdSEd Maste 	if (what & EV_READ)
358c43e99fdSEd Maste 		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
359c43e99fdSEd Maste 	if (what & EV_WRITE)
360c43e99fdSEd Maste 		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
361c43e99fdSEd Maste 
362c43e99fdSEd Maste 	/* If we newly enable reading or writing, and we aren't reading or
363c43e99fdSEd Maste 	   writing already, consider launching a new read or write. */
364c43e99fdSEd Maste 
365c43e99fdSEd Maste 	if (what & EV_READ)
366c43e99fdSEd Maste 		bev_async_consider_reading(bev_async);
367c43e99fdSEd Maste 	if (what & EV_WRITE)
368c43e99fdSEd Maste 		bev_async_consider_writing(bev_async);
369c43e99fdSEd Maste 	return 0;
370c43e99fdSEd Maste }
371c43e99fdSEd Maste 
372c43e99fdSEd Maste static int
be_async_disable(struct bufferevent * bev,short what)373c43e99fdSEd Maste be_async_disable(struct bufferevent *bev, short what)
374c43e99fdSEd Maste {
375c43e99fdSEd Maste 	struct bufferevent_async *bev_async = upcast(bev);
376c43e99fdSEd Maste 	/* XXXX If we disable reading or writing, we may want to consider
377c43e99fdSEd Maste 	 * canceling any in-progress read or write operation, though it might
378c43e99fdSEd Maste 	 * not work. */
379c43e99fdSEd Maste 
380c43e99fdSEd Maste 	if (what & EV_READ) {
381c43e99fdSEd Maste 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
382c43e99fdSEd Maste 		bev_async_del_read(bev_async);
383c43e99fdSEd Maste 	}
384c43e99fdSEd Maste 	if (what & EV_WRITE) {
385c43e99fdSEd Maste 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
386c43e99fdSEd Maste 		bev_async_del_write(bev_async);
387c43e99fdSEd Maste 	}
388c43e99fdSEd Maste 
389c43e99fdSEd Maste 	return 0;
390c43e99fdSEd Maste }
391c43e99fdSEd Maste 
/* Destructor: drop any virtual events and optionally close the
 * underlying socket.  No overlapped I/O may still be outstanding. */
static void
be_async_destruct(struct bufferevent *bev)
{
	struct bufferevent_async *bev_async = upcast(bev);
	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
	evutil_socket_t fd;

	/* Outstanding overlapped operations hold references, so the
	 * destructor cannot legally run while one is pending. */
	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
			!upcast(bev)->read_in_progress);

	bev_async_del_read(bev_async);
	bev_async_del_write(bev_async);

	/* Input and output evbuffers share one fd; fetching and closing
	 * it via the input side covers both. */
	fd = evbuffer_overlapped_get_fd_(bev->input);
	if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
		(bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
		evutil_closesocket(fd);
		evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
	}
}
412c43e99fdSEd Maste 
/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
 * we use WSAGetOverlappedResult to translate. */
static void
bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
{
	DWORD bytes, flags;
	evutil_socket_t fd;

	fd = evbuffer_overlapped_get_fd_(bev->input);
	/* Called only for its side effect of setting the thread's last
	 * WSA error; the return value and out-params are ignored. */
	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
}
424c43e99fdSEd Maste 
/* Flush is a no-op for this backend; always report success. */
static int
be_async_flush(struct bufferevent *bev, short what,
    enum bufferevent_flush_mode mode)
{
	return 0;
}
431c43e99fdSEd Maste 
/* IOCP completion callback for an overlapped connect.  Reports success
 * or failure to the user (deferred) and releases the bookkeeping taken
 * in bufferevent_async_connect_(). */
static void
connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_connect(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	evutil_socket_t sock;

	BEV_LOCK(bev);

	EVUTIL_ASSERT(bev_a->bev.connecting);
	bev_a->bev.connecting = 0;
	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
	/* XXXX Handle error? */
	/* Needed after ConnectEx so the socket behaves like one connected
	 * via plain connect(). */
	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);

	if (ok)
		bufferevent_async_set_connected_(bev);
	else
		/* Make WSAGetLastError() report why the connect failed. */
		bev_async_set_wsa_error(bev, eo);

	be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);

	/* Balance the virtual event added in bufferevent_async_connect_(). */
	event_base_del_virtual_(bev->ev_base);

	/* Drop the reference taken in bufferevent_async_connect_(). */
	bufferevent_decref_and_unlock_(bev);
}
459c43e99fdSEd Maste 
/* IOCP completion callback for reads.  'nbytes' is the number of bytes
 * actually transferred; 'ok' is false if the operation failed.  Runs
 * holding the reference taken in bev_async_consider_reading(). */
static void
read_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_read(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_READING;
	ev_ssize_t amount_unread;
	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->read_in_progress);

	amount_unread = bev_a->read_in_progress - nbytes;
	evbuffer_commit_read_(bev->input, nbytes);
	bev_a->read_in_progress = 0;
	/* Refund the rate-limit bucket for bytes we reserved but did not
	 * actually read. */
	if (amount_unread)
		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);

	/* Make WSAGetLastError() report why the operation failed. */
	if (!ok)
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			/* Got data: rearm the timeout, deliver the read
			 * callback (deferred), maybe launch another read. */
			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
			be_async_trigger_nolock(bev, EV_READ, 0);
			bev_async_consider_reading(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		} else if (!nbytes) {
			/* Successful zero-byte read is treated as EOF. */
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		}
	}

	/* Drop the reference taken when the read was launched. */
	bufferevent_decref_and_unlock_(bev);
}
498c43e99fdSEd Maste 
/* IOCP completion callback for writes.  'nbytes' is the number of bytes
 * actually transferred; 'ok' is false if the operation failed.  Runs
 * holding the reference taken in bev_async_consider_writing(). */
static void
write_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_write(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_WRITING;
	ev_ssize_t amount_unwritten;

	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->write_in_progress);

	amount_unwritten = bev_a->write_in_progress - nbytes;
	evbuffer_commit_write_(bev->output, nbytes);
	bev_a->write_in_progress = 0;

	/* Refund the rate-limit bucket for bytes we reserved but did not
	 * actually write. */
	if (amount_unwritten)
		bufferevent_decrement_write_buckets_(&bev_a->bev,
		                                     -amount_unwritten);


	/* Make WSAGetLastError() report why the operation failed. */
	if (!ok)
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			/* Progress: rearm the timeout, deliver the write
			 * callback (deferred), maybe launch another write. */
			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
			be_async_trigger_nolock(bev, EV_WRITE, 0);
			bev_async_consider_writing(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		} else if (!nbytes) {
			/* Successful zero-byte write is treated as EOF. */
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		}
	}

	/* Drop the reference taken when the write was launched. */
	bufferevent_decref_and_unlock_(bev);
}
541c43e99fdSEd Maste 
/* Create an IOCP-backed bufferevent for 'fd'.  'fd' may be negative;
 * the socket can then be supplied later via BEV_CTRL_SET_FD.  Returns
 * NULL on failure. */
struct bufferevent *
bufferevent_async_new_(struct event_base *base,
    evutil_socket_t fd, int options)
{
	struct bufferevent_async *bev_a;
	struct bufferevent *bev;
	struct event_iocp_port *iocp;

	/* IOCP completions can arrive on other threads, so locking is
	 * forced on. */
	options |= BEV_OPT_THREADSAFE;

	if (!(iocp = event_base_get_iocp_(base)))
		return NULL;

	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
		/* Association may fail benignly if the fd was already
		 * associated; see fatal_error(). */
		if (fatal_error(GetLastError()))
			return NULL;
	}

	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
		return NULL;

	bev = &bev_a->bev.bev;
	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
		mm_free(bev_a);
		return NULL;
	}
	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
		evbuffer_free(bev->input);
		mm_free(bev_a);
		return NULL;
	}

	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
		options)<0)
		goto err;

	/* Watch the buffers so new output data / drained input data can
	 * trigger launching overlapped I/O. */
	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);

	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
	event_overlapped_init_(&bev_a->write_overlapped, write_complete);

	bufferevent_init_generic_timeout_cbs_(bev);

	/* With no real socket yet, the bufferevent is not usable for I/O. */
	bev_a->ok = fd >= 0;

	return bev;
err:
	bufferevent_free(&bev_a->bev.bev);
	return NULL;
}
594c43e99fdSEd Maste 
/* Mark 'bev' as connected and usable, then re-run enable so any
 * already-enabled directions can start I/O. */
void
bufferevent_async_set_connected_(struct bufferevent *bev)
{
	struct bufferevent_async *bev_async = upcast(bev);
	bev_async->ok = 1;
	/* Now's a good time to consider reading/writing */
	be_async_enable(bev, bev->enabled);
}
603c43e99fdSEd Maste 
604c43e99fdSEd Maste int
bufferevent_async_can_connect_(struct bufferevent * bev)605c43e99fdSEd Maste bufferevent_async_can_connect_(struct bufferevent *bev)
606c43e99fdSEd Maste {
607c43e99fdSEd Maste 	const struct win32_extension_fns *ext =
608c43e99fdSEd Maste 	    event_get_win32_extension_fns_();
609c43e99fdSEd Maste 
610c43e99fdSEd Maste 	if (BEV_IS_ASYNC(bev) &&
611c43e99fdSEd Maste 	    event_base_get_iocp_(bev->ev_base) &&
612c43e99fdSEd Maste 	    ext && ext->ConnectEx)
613c43e99fdSEd Maste 		return 1;
614c43e99fdSEd Maste 
615c43e99fdSEd Maste 	return 0;
616c43e99fdSEd Maste }
617c43e99fdSEd Maste 
/* Launch an overlapped connect on 'fd' to 'sa' via ConnectEx.  Returns
 * 0 if the connect was launched (completion is reported through
 * connect_complete()), -1 on immediate failure. */
int
bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
	const struct sockaddr *sa, int socklen)
{
	BOOL rc;
	struct bufferevent_async *bev_async = upcast(bev);
	struct sockaddr_storage ss;
	const struct win32_extension_fns *ext =
	    event_get_win32_extension_fns_();

	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);

	/* ConnectEx() requires that the socket be bound to an address
	 * with bind() before using, otherwise it will fail. We attempt
	 * to issue a bind() here, taking into account that the error
	 * code is set to WSAEINVAL when the socket is already bound. */
	memset(&ss, 0, sizeof(ss));
	if (sa->sa_family == AF_INET) {
		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
		sin->sin_family = AF_INET;
		sin->sin_addr.s_addr = INADDR_ANY;
	} else if (sa->sa_family == AF_INET6) {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
		sin6->sin6_family = AF_INET6;
		sin6->sin6_addr = in6addr_any;
	} else {
		/* Well, the user will have to bind() */
		return -1;
	}
	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
	    WSAGetLastError() != WSAEINVAL)
		return -1;

	/* Keep the base and the bufferevent alive until the overlapped
	 * connect completes; connect_complete() releases both. */
	event_base_add_virtual_(bev->ev_base);
	bufferevent_incref_(bev);
	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
			    &bev_async->connect_overlapped.overlapped);
	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
		return 0;

	/* Immediate failure: undo the bookkeeping taken above. */
	event_base_del_virtual_(bev->ev_base);
	bufferevent_decref_(bev);

	return -1;
}
663c43e99fdSEd Maste 
/* Control operations: get/set the underlying fd and cancel all I/O.
 * Returns 0 on success, -1 on failure or unsupported op. */
static int
be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    union bufferevent_ctrl_data *data)
{
	switch (op) {
	case BEV_CTRL_GET_FD:
		data->fd = evbuffer_overlapped_get_fd_(bev->input);
		return 0;
	case BEV_CTRL_SET_FD: {
		struct bufferevent_async *bev_a = upcast(bev);
		struct event_iocp_port *iocp;

		if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
			return 0;
		if (!(iocp = event_base_get_iocp_(bev->ev_base)))
			return -1;
		if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
			/* Benign if the fd was already associated with
			 * this port; see fatal_error(). */
			if (fatal_error(GetLastError()))
				return -1;
		}
		/* Both evbuffers must track the same fd. */
		evbuffer_overlapped_set_fd_(bev->input, data->fd);
		evbuffer_overlapped_set_fd_(bev->output, data->fd);
		bev_a->ok = data->fd >= 0;
		return 0;
	}
	case BEV_CTRL_CANCEL_ALL: {
		struct bufferevent_async *bev_a = upcast(bev);
		evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
		/* Closing the socket also aborts outstanding overlapped
		 * operations on it. */
		if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
			closesocket(fd);
			evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
		}
		bev_a->ok = 0;
		return 0;
	}
	case BEV_CTRL_GET_UNDERLYING:
	default:
		return -1;
	}
}
705c43e99fdSEd Maste 
706c43e99fdSEd Maste 
707