1c43e99fdSEd Maste /*
2c43e99fdSEd Maste * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
3c43e99fdSEd Maste *
4c43e99fdSEd Maste * All rights reserved.
5c43e99fdSEd Maste *
6c43e99fdSEd Maste * Redistribution and use in source and binary forms, with or without
7c43e99fdSEd Maste * modification, are permitted provided that the following conditions
8c43e99fdSEd Maste * are met:
9c43e99fdSEd Maste * 1. Redistributions of source code must retain the above copyright
10c43e99fdSEd Maste * notice, this list of conditions and the following disclaimer.
11c43e99fdSEd Maste * 2. Redistributions in binary form must reproduce the above copyright
12c43e99fdSEd Maste * notice, this list of conditions and the following disclaimer in the
13c43e99fdSEd Maste * documentation and/or other materials provided with the distribution.
14c43e99fdSEd Maste * 3. The name of the author may not be used to endorse or promote products
15c43e99fdSEd Maste * derived from this software without specific prior written permission.
16c43e99fdSEd Maste *
17c43e99fdSEd Maste * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18c43e99fdSEd Maste * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19c43e99fdSEd Maste * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20c43e99fdSEd Maste * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21c43e99fdSEd Maste * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22c43e99fdSEd Maste * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23c43e99fdSEd Maste * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24c43e99fdSEd Maste * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25c43e99fdSEd Maste * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26c43e99fdSEd Maste * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27c43e99fdSEd Maste */
28c43e99fdSEd Maste
29c43e99fdSEd Maste #include "event2/event-config.h"
30c43e99fdSEd Maste #include "evconfig-private.h"
31c43e99fdSEd Maste
32c43e99fdSEd Maste #ifdef EVENT__HAVE_SYS_TIME_H
33c43e99fdSEd Maste #include <sys/time.h>
34c43e99fdSEd Maste #endif
35c43e99fdSEd Maste
36c43e99fdSEd Maste #include <errno.h>
37c43e99fdSEd Maste #include <stdio.h>
38c43e99fdSEd Maste #include <stdlib.h>
39c43e99fdSEd Maste #include <string.h>
40c43e99fdSEd Maste #ifdef EVENT__HAVE_STDARG_H
41c43e99fdSEd Maste #include <stdarg.h>
42c43e99fdSEd Maste #endif
43c43e99fdSEd Maste #ifdef EVENT__HAVE_UNISTD_H
44c43e99fdSEd Maste #include <unistd.h>
45c43e99fdSEd Maste #endif
46c43e99fdSEd Maste
47c43e99fdSEd Maste #ifdef _WIN32
48c43e99fdSEd Maste #include <winsock2.h>
49*b50261e2SCy Schubert #include <winerror.h>
50c43e99fdSEd Maste #include <ws2tcpip.h>
51c43e99fdSEd Maste #endif
52c43e99fdSEd Maste
53c43e99fdSEd Maste #include <sys/queue.h>
54c43e99fdSEd Maste
55c43e99fdSEd Maste #include "event2/util.h"
56c43e99fdSEd Maste #include "event2/bufferevent.h"
57c43e99fdSEd Maste #include "event2/buffer.h"
58c43e99fdSEd Maste #include "event2/bufferevent_struct.h"
59c43e99fdSEd Maste #include "event2/event.h"
60c43e99fdSEd Maste #include "event2/util.h"
61c43e99fdSEd Maste #include "event-internal.h"
62c43e99fdSEd Maste #include "log-internal.h"
63c43e99fdSEd Maste #include "mm-internal.h"
64c43e99fdSEd Maste #include "bufferevent-internal.h"
65c43e99fdSEd Maste #include "util-internal.h"
66c43e99fdSEd Maste #include "iocp-internal.h"
67c43e99fdSEd Maste
68c43e99fdSEd Maste #ifndef SO_UPDATE_CONNECT_CONTEXT
69c43e99fdSEd Maste /* Mingw is sometimes missing this */
70c43e99fdSEd Maste #define SO_UPDATE_CONNECT_CONTEXT 0x7010
71c43e99fdSEd Maste #endif
72c43e99fdSEd Maste
/* Forward declarations for the handlers referenced by
 * bufferevent_ops_async below. */
static int be_async_enable(struct bufferevent *bev, short what);
static int be_async_disable(struct bufferevent *bev, short what);
static void be_async_destruct(struct bufferevent *bev);
static int be_async_flush(struct bufferevent *bev, short what,
    enum bufferevent_flush_mode mode);
static int be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    union bufferevent_ctrl_data *data);
79c43e99fdSEd Maste
80c43e99fdSEd Maste struct bufferevent_async {
81c43e99fdSEd Maste struct bufferevent_private bev;
82c43e99fdSEd Maste struct event_overlapped connect_overlapped;
83c43e99fdSEd Maste struct event_overlapped read_overlapped;
84c43e99fdSEd Maste struct event_overlapped write_overlapped;
85c43e99fdSEd Maste size_t read_in_progress;
86c43e99fdSEd Maste size_t write_in_progress;
87c43e99fdSEd Maste unsigned ok : 1;
88c43e99fdSEd Maste unsigned read_added : 1;
89c43e99fdSEd Maste unsigned write_added : 1;
90c43e99fdSEd Maste };
91c43e99fdSEd Maste
92c43e99fdSEd Maste const struct bufferevent_ops bufferevent_ops_async = {
93c43e99fdSEd Maste "socket_async",
94c43e99fdSEd Maste evutil_offsetof(struct bufferevent_async, bev.bev),
95c43e99fdSEd Maste be_async_enable,
96c43e99fdSEd Maste be_async_disable,
97c43e99fdSEd Maste NULL, /* Unlink */
98c43e99fdSEd Maste be_async_destruct,
99c43e99fdSEd Maste bufferevent_generic_adj_timeouts_,
100c43e99fdSEd Maste be_async_flush,
101c43e99fdSEd Maste be_async_ctrl,
102c43e99fdSEd Maste };
103c43e99fdSEd Maste
104*b50261e2SCy Schubert static inline void
be_async_run_eventcb(struct bufferevent * bev,short what,int options)105*b50261e2SCy Schubert be_async_run_eventcb(struct bufferevent *bev, short what, int options)
106*b50261e2SCy Schubert { bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
107*b50261e2SCy Schubert
108*b50261e2SCy Schubert static inline void
be_async_trigger_nolock(struct bufferevent * bev,short what,int options)109*b50261e2SCy Schubert be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
110*b50261e2SCy Schubert { bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
111*b50261e2SCy Schubert
112*b50261e2SCy Schubert static inline int
fatal_error(int err)113*b50261e2SCy Schubert fatal_error(int err)
114*b50261e2SCy Schubert {
115*b50261e2SCy Schubert switch (err) {
116*b50261e2SCy Schubert /* We may have already associated this fd with a port.
117*b50261e2SCy Schubert * Let's hope it's this port, and that the error code
118*b50261e2SCy Schubert * for doing this neer changes. */
119*b50261e2SCy Schubert case ERROR_INVALID_PARAMETER:
120*b50261e2SCy Schubert return 0;
121*b50261e2SCy Schubert }
122*b50261e2SCy Schubert return 1;
123*b50261e2SCy Schubert }
124*b50261e2SCy Schubert
125c43e99fdSEd Maste static inline struct bufferevent_async *
upcast(struct bufferevent * bev)126c43e99fdSEd Maste upcast(struct bufferevent *bev)
127c43e99fdSEd Maste {
128c43e99fdSEd Maste struct bufferevent_async *bev_a;
129*b50261e2SCy Schubert if (!BEV_IS_ASYNC(bev))
130c43e99fdSEd Maste return NULL;
131c43e99fdSEd Maste bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
132c43e99fdSEd Maste return bev_a;
133c43e99fdSEd Maste }
134c43e99fdSEd Maste
135c43e99fdSEd Maste static inline struct bufferevent_async *
upcast_connect(struct event_overlapped * eo)136c43e99fdSEd Maste upcast_connect(struct event_overlapped *eo)
137c43e99fdSEd Maste {
138c43e99fdSEd Maste struct bufferevent_async *bev_a;
139c43e99fdSEd Maste bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
140c43e99fdSEd Maste EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
141c43e99fdSEd Maste return bev_a;
142c43e99fdSEd Maste }
143c43e99fdSEd Maste
144c43e99fdSEd Maste static inline struct bufferevent_async *
upcast_read(struct event_overlapped * eo)145c43e99fdSEd Maste upcast_read(struct event_overlapped *eo)
146c43e99fdSEd Maste {
147c43e99fdSEd Maste struct bufferevent_async *bev_a;
148c43e99fdSEd Maste bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
149c43e99fdSEd Maste EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
150c43e99fdSEd Maste return bev_a;
151c43e99fdSEd Maste }
152c43e99fdSEd Maste
153c43e99fdSEd Maste static inline struct bufferevent_async *
upcast_write(struct event_overlapped * eo)154c43e99fdSEd Maste upcast_write(struct event_overlapped *eo)
155c43e99fdSEd Maste {
156c43e99fdSEd Maste struct bufferevent_async *bev_a;
157c43e99fdSEd Maste bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
158c43e99fdSEd Maste EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
159c43e99fdSEd Maste return bev_a;
160c43e99fdSEd Maste }
161c43e99fdSEd Maste
162c43e99fdSEd Maste static void
bev_async_del_write(struct bufferevent_async * beva)163c43e99fdSEd Maste bev_async_del_write(struct bufferevent_async *beva)
164c43e99fdSEd Maste {
165c43e99fdSEd Maste struct bufferevent *bev = &beva->bev.bev;
166c43e99fdSEd Maste
167c43e99fdSEd Maste if (beva->write_added) {
168c43e99fdSEd Maste beva->write_added = 0;
169c43e99fdSEd Maste event_base_del_virtual_(bev->ev_base);
170c43e99fdSEd Maste }
171c43e99fdSEd Maste }
172c43e99fdSEd Maste
173c43e99fdSEd Maste static void
bev_async_del_read(struct bufferevent_async * beva)174c43e99fdSEd Maste bev_async_del_read(struct bufferevent_async *beva)
175c43e99fdSEd Maste {
176c43e99fdSEd Maste struct bufferevent *bev = &beva->bev.bev;
177c43e99fdSEd Maste
178c43e99fdSEd Maste if (beva->read_added) {
179c43e99fdSEd Maste beva->read_added = 0;
180c43e99fdSEd Maste event_base_del_virtual_(bev->ev_base);
181c43e99fdSEd Maste }
182c43e99fdSEd Maste }
183c43e99fdSEd Maste
184c43e99fdSEd Maste static void
bev_async_add_write(struct bufferevent_async * beva)185c43e99fdSEd Maste bev_async_add_write(struct bufferevent_async *beva)
186c43e99fdSEd Maste {
187c43e99fdSEd Maste struct bufferevent *bev = &beva->bev.bev;
188c43e99fdSEd Maste
189c43e99fdSEd Maste if (!beva->write_added) {
190c43e99fdSEd Maste beva->write_added = 1;
191c43e99fdSEd Maste event_base_add_virtual_(bev->ev_base);
192c43e99fdSEd Maste }
193c43e99fdSEd Maste }
194c43e99fdSEd Maste
195c43e99fdSEd Maste static void
bev_async_add_read(struct bufferevent_async * beva)196c43e99fdSEd Maste bev_async_add_read(struct bufferevent_async *beva)
197c43e99fdSEd Maste {
198c43e99fdSEd Maste struct bufferevent *bev = &beva->bev.bev;
199c43e99fdSEd Maste
200c43e99fdSEd Maste if (!beva->read_added) {
201c43e99fdSEd Maste beva->read_added = 1;
202c43e99fdSEd Maste event_base_add_virtual_(bev->ev_base);
203c43e99fdSEd Maste }
204c43e99fdSEd Maste }
205c43e99fdSEd Maste
206c43e99fdSEd Maste static void
bev_async_consider_writing(struct bufferevent_async * beva)207c43e99fdSEd Maste bev_async_consider_writing(struct bufferevent_async *beva)
208c43e99fdSEd Maste {
209c43e99fdSEd Maste size_t at_most;
210c43e99fdSEd Maste int limit;
211c43e99fdSEd Maste struct bufferevent *bev = &beva->bev.bev;
212c43e99fdSEd Maste
213c43e99fdSEd Maste /* Don't write if there's a write in progress, or we do not
214c43e99fdSEd Maste * want to write, or when there's nothing left to write. */
215c43e99fdSEd Maste if (beva->write_in_progress || beva->bev.connecting)
216c43e99fdSEd Maste return;
217c43e99fdSEd Maste if (!beva->ok || !(bev->enabled&EV_WRITE) ||
218c43e99fdSEd Maste !evbuffer_get_length(bev->output)) {
219c43e99fdSEd Maste bev_async_del_write(beva);
220c43e99fdSEd Maste return;
221c43e99fdSEd Maste }
222c43e99fdSEd Maste
223c43e99fdSEd Maste at_most = evbuffer_get_length(bev->output);
224c43e99fdSEd Maste
225c43e99fdSEd Maste /* This is safe so long as bufferevent_get_write_max never returns
226c43e99fdSEd Maste * more than INT_MAX. That's true for now. XXXX */
227c43e99fdSEd Maste limit = (int)bufferevent_get_write_max_(&beva->bev);
228c43e99fdSEd Maste if (at_most >= (size_t)limit && limit >= 0)
229c43e99fdSEd Maste at_most = limit;
230c43e99fdSEd Maste
231c43e99fdSEd Maste if (beva->bev.write_suspended) {
232c43e99fdSEd Maste bev_async_del_write(beva);
233c43e99fdSEd Maste return;
234c43e99fdSEd Maste }
235c43e99fdSEd Maste
236c43e99fdSEd Maste /* XXXX doesn't respect low-water mark very well. */
237c43e99fdSEd Maste bufferevent_incref_(bev);
238c43e99fdSEd Maste if (evbuffer_launch_write_(bev->output, at_most,
239c43e99fdSEd Maste &beva->write_overlapped)) {
240c43e99fdSEd Maste bufferevent_decref_(bev);
241c43e99fdSEd Maste beva->ok = 0;
242*b50261e2SCy Schubert be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
243c43e99fdSEd Maste } else {
244c43e99fdSEd Maste beva->write_in_progress = at_most;
245c43e99fdSEd Maste bufferevent_decrement_write_buckets_(&beva->bev, at_most);
246c43e99fdSEd Maste bev_async_add_write(beva);
247c43e99fdSEd Maste }
248c43e99fdSEd Maste }
249c43e99fdSEd Maste
250c43e99fdSEd Maste static void
bev_async_consider_reading(struct bufferevent_async * beva)251c43e99fdSEd Maste bev_async_consider_reading(struct bufferevent_async *beva)
252c43e99fdSEd Maste {
253c43e99fdSEd Maste size_t cur_size;
254c43e99fdSEd Maste size_t read_high;
255c43e99fdSEd Maste size_t at_most;
256c43e99fdSEd Maste int limit;
257c43e99fdSEd Maste struct bufferevent *bev = &beva->bev.bev;
258c43e99fdSEd Maste
259c43e99fdSEd Maste /* Don't read if there is a read in progress, or we do not
260c43e99fdSEd Maste * want to read. */
261c43e99fdSEd Maste if (beva->read_in_progress || beva->bev.connecting)
262c43e99fdSEd Maste return;
263c43e99fdSEd Maste if (!beva->ok || !(bev->enabled&EV_READ)) {
264c43e99fdSEd Maste bev_async_del_read(beva);
265c43e99fdSEd Maste return;
266c43e99fdSEd Maste }
267c43e99fdSEd Maste
268c43e99fdSEd Maste /* Don't read if we're full */
269c43e99fdSEd Maste cur_size = evbuffer_get_length(bev->input);
270c43e99fdSEd Maste read_high = bev->wm_read.high;
271c43e99fdSEd Maste if (read_high) {
272c43e99fdSEd Maste if (cur_size >= read_high) {
273c43e99fdSEd Maste bev_async_del_read(beva);
274c43e99fdSEd Maste return;
275c43e99fdSEd Maste }
276c43e99fdSEd Maste at_most = read_high - cur_size;
277c43e99fdSEd Maste } else {
278c43e99fdSEd Maste at_most = 16384; /* FIXME totally magic. */
279c43e99fdSEd Maste }
280c43e99fdSEd Maste
281c43e99fdSEd Maste /* XXXX This over-commits. */
282c43e99fdSEd Maste /* XXXX see also not above on cast on bufferevent_get_write_max_() */
283c43e99fdSEd Maste limit = (int)bufferevent_get_read_max_(&beva->bev);
284c43e99fdSEd Maste if (at_most >= (size_t)limit && limit >= 0)
285c43e99fdSEd Maste at_most = limit;
286c43e99fdSEd Maste
287c43e99fdSEd Maste if (beva->bev.read_suspended) {
288c43e99fdSEd Maste bev_async_del_read(beva);
289c43e99fdSEd Maste return;
290c43e99fdSEd Maste }
291c43e99fdSEd Maste
292c43e99fdSEd Maste bufferevent_incref_(bev);
293c43e99fdSEd Maste if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
294c43e99fdSEd Maste beva->ok = 0;
295*b50261e2SCy Schubert be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
296c43e99fdSEd Maste bufferevent_decref_(bev);
297c43e99fdSEd Maste } else {
298c43e99fdSEd Maste beva->read_in_progress = at_most;
299c43e99fdSEd Maste bufferevent_decrement_read_buckets_(&beva->bev, at_most);
300c43e99fdSEd Maste bev_async_add_read(beva);
301c43e99fdSEd Maste }
302c43e99fdSEd Maste
303c43e99fdSEd Maste return;
304c43e99fdSEd Maste }
305c43e99fdSEd Maste
306c43e99fdSEd Maste static void
be_async_outbuf_callback(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)307c43e99fdSEd Maste be_async_outbuf_callback(struct evbuffer *buf,
308c43e99fdSEd Maste const struct evbuffer_cb_info *cbinfo,
309c43e99fdSEd Maste void *arg)
310c43e99fdSEd Maste {
311c43e99fdSEd Maste struct bufferevent *bev = arg;
312c43e99fdSEd Maste struct bufferevent_async *bev_async = upcast(bev);
313c43e99fdSEd Maste
314c43e99fdSEd Maste /* If we added data to the outbuf and were not writing before,
315c43e99fdSEd Maste * we may want to write now. */
316c43e99fdSEd Maste
317c43e99fdSEd Maste bufferevent_incref_and_lock_(bev);
318c43e99fdSEd Maste
319c43e99fdSEd Maste if (cbinfo->n_added)
320c43e99fdSEd Maste bev_async_consider_writing(bev_async);
321c43e99fdSEd Maste
322c43e99fdSEd Maste bufferevent_decref_and_unlock_(bev);
323c43e99fdSEd Maste }
324c43e99fdSEd Maste
325c43e99fdSEd Maste static void
be_async_inbuf_callback(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)326c43e99fdSEd Maste be_async_inbuf_callback(struct evbuffer *buf,
327c43e99fdSEd Maste const struct evbuffer_cb_info *cbinfo,
328c43e99fdSEd Maste void *arg)
329c43e99fdSEd Maste {
330c43e99fdSEd Maste struct bufferevent *bev = arg;
331c43e99fdSEd Maste struct bufferevent_async *bev_async = upcast(bev);
332c43e99fdSEd Maste
333c43e99fdSEd Maste /* If we drained data from the inbuf and were not reading before,
334c43e99fdSEd Maste * we may want to read now */
335c43e99fdSEd Maste
336c43e99fdSEd Maste bufferevent_incref_and_lock_(bev);
337c43e99fdSEd Maste
338c43e99fdSEd Maste if (cbinfo->n_deleted)
339c43e99fdSEd Maste bev_async_consider_reading(bev_async);
340c43e99fdSEd Maste
341c43e99fdSEd Maste bufferevent_decref_and_unlock_(bev);
342c43e99fdSEd Maste }
343c43e99fdSEd Maste
344c43e99fdSEd Maste static int
be_async_enable(struct bufferevent * buf,short what)345c43e99fdSEd Maste be_async_enable(struct bufferevent *buf, short what)
346c43e99fdSEd Maste {
347c43e99fdSEd Maste struct bufferevent_async *bev_async = upcast(buf);
348c43e99fdSEd Maste
349c43e99fdSEd Maste if (!bev_async->ok)
350c43e99fdSEd Maste return -1;
351c43e99fdSEd Maste
352c43e99fdSEd Maste if (bev_async->bev.connecting) {
353c43e99fdSEd Maste /* Don't launch anything during connection attempts. */
354c43e99fdSEd Maste return 0;
355c43e99fdSEd Maste }
356c43e99fdSEd Maste
357c43e99fdSEd Maste if (what & EV_READ)
358c43e99fdSEd Maste BEV_RESET_GENERIC_READ_TIMEOUT(buf);
359c43e99fdSEd Maste if (what & EV_WRITE)
360c43e99fdSEd Maste BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
361c43e99fdSEd Maste
362c43e99fdSEd Maste /* If we newly enable reading or writing, and we aren't reading or
363c43e99fdSEd Maste writing already, consider launching a new read or write. */
364c43e99fdSEd Maste
365c43e99fdSEd Maste if (what & EV_READ)
366c43e99fdSEd Maste bev_async_consider_reading(bev_async);
367c43e99fdSEd Maste if (what & EV_WRITE)
368c43e99fdSEd Maste bev_async_consider_writing(bev_async);
369c43e99fdSEd Maste return 0;
370c43e99fdSEd Maste }
371c43e99fdSEd Maste
372c43e99fdSEd Maste static int
be_async_disable(struct bufferevent * bev,short what)373c43e99fdSEd Maste be_async_disable(struct bufferevent *bev, short what)
374c43e99fdSEd Maste {
375c43e99fdSEd Maste struct bufferevent_async *bev_async = upcast(bev);
376c43e99fdSEd Maste /* XXXX If we disable reading or writing, we may want to consider
377c43e99fdSEd Maste * canceling any in-progress read or write operation, though it might
378c43e99fdSEd Maste * not work. */
379c43e99fdSEd Maste
380c43e99fdSEd Maste if (what & EV_READ) {
381c43e99fdSEd Maste BEV_DEL_GENERIC_READ_TIMEOUT(bev);
382c43e99fdSEd Maste bev_async_del_read(bev_async);
383c43e99fdSEd Maste }
384c43e99fdSEd Maste if (what & EV_WRITE) {
385c43e99fdSEd Maste BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
386c43e99fdSEd Maste bev_async_del_write(bev_async);
387c43e99fdSEd Maste }
388c43e99fdSEd Maste
389c43e99fdSEd Maste return 0;
390c43e99fdSEd Maste }
391c43e99fdSEd Maste
392c43e99fdSEd Maste static void
be_async_destruct(struct bufferevent * bev)393c43e99fdSEd Maste be_async_destruct(struct bufferevent *bev)
394c43e99fdSEd Maste {
395c43e99fdSEd Maste struct bufferevent_async *bev_async = upcast(bev);
396c43e99fdSEd Maste struct bufferevent_private *bev_p = BEV_UPCAST(bev);
397c43e99fdSEd Maste evutil_socket_t fd;
398c43e99fdSEd Maste
399c43e99fdSEd Maste EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
400c43e99fdSEd Maste !upcast(bev)->read_in_progress);
401c43e99fdSEd Maste
402c43e99fdSEd Maste bev_async_del_read(bev_async);
403c43e99fdSEd Maste bev_async_del_write(bev_async);
404c43e99fdSEd Maste
405c43e99fdSEd Maste fd = evbuffer_overlapped_get_fd_(bev->input);
406*b50261e2SCy Schubert if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
407c43e99fdSEd Maste (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
408c43e99fdSEd Maste evutil_closesocket(fd);
409*b50261e2SCy Schubert evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
410c43e99fdSEd Maste }
411c43e99fdSEd Maste }
412c43e99fdSEd Maste
413c43e99fdSEd Maste /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
414c43e99fdSEd Maste * we use WSAGetOverlappedResult to translate. */
415c43e99fdSEd Maste static void
bev_async_set_wsa_error(struct bufferevent * bev,struct event_overlapped * eo)416c43e99fdSEd Maste bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
417c43e99fdSEd Maste {
418c43e99fdSEd Maste DWORD bytes, flags;
419c43e99fdSEd Maste evutil_socket_t fd;
420c43e99fdSEd Maste
421c43e99fdSEd Maste fd = evbuffer_overlapped_get_fd_(bev->input);
422c43e99fdSEd Maste WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
423c43e99fdSEd Maste }
424c43e99fdSEd Maste
425c43e99fdSEd Maste static int
be_async_flush(struct bufferevent * bev,short what,enum bufferevent_flush_mode mode)426c43e99fdSEd Maste be_async_flush(struct bufferevent *bev, short what,
427c43e99fdSEd Maste enum bufferevent_flush_mode mode)
428c43e99fdSEd Maste {
429c43e99fdSEd Maste return 0;
430c43e99fdSEd Maste }
431c43e99fdSEd Maste
432c43e99fdSEd Maste static void
connect_complete(struct event_overlapped * eo,ev_uintptr_t key,ev_ssize_t nbytes,int ok)433c43e99fdSEd Maste connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
434c43e99fdSEd Maste ev_ssize_t nbytes, int ok)
435c43e99fdSEd Maste {
436c43e99fdSEd Maste struct bufferevent_async *bev_a = upcast_connect(eo);
437c43e99fdSEd Maste struct bufferevent *bev = &bev_a->bev.bev;
438c43e99fdSEd Maste evutil_socket_t sock;
439c43e99fdSEd Maste
440c43e99fdSEd Maste BEV_LOCK(bev);
441c43e99fdSEd Maste
442c43e99fdSEd Maste EVUTIL_ASSERT(bev_a->bev.connecting);
443c43e99fdSEd Maste bev_a->bev.connecting = 0;
444c43e99fdSEd Maste sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
445c43e99fdSEd Maste /* XXXX Handle error? */
446c43e99fdSEd Maste setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
447c43e99fdSEd Maste
448c43e99fdSEd Maste if (ok)
449c43e99fdSEd Maste bufferevent_async_set_connected_(bev);
450c43e99fdSEd Maste else
451c43e99fdSEd Maste bev_async_set_wsa_error(bev, eo);
452c43e99fdSEd Maste
453*b50261e2SCy Schubert be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
454c43e99fdSEd Maste
455c43e99fdSEd Maste event_base_del_virtual_(bev->ev_base);
456c43e99fdSEd Maste
457c43e99fdSEd Maste bufferevent_decref_and_unlock_(bev);
458c43e99fdSEd Maste }
459c43e99fdSEd Maste
460c43e99fdSEd Maste static void
read_complete(struct event_overlapped * eo,ev_uintptr_t key,ev_ssize_t nbytes,int ok)461c43e99fdSEd Maste read_complete(struct event_overlapped *eo, ev_uintptr_t key,
462c43e99fdSEd Maste ev_ssize_t nbytes, int ok)
463c43e99fdSEd Maste {
464c43e99fdSEd Maste struct bufferevent_async *bev_a = upcast_read(eo);
465c43e99fdSEd Maste struct bufferevent *bev = &bev_a->bev.bev;
466c43e99fdSEd Maste short what = BEV_EVENT_READING;
467c43e99fdSEd Maste ev_ssize_t amount_unread;
468c43e99fdSEd Maste BEV_LOCK(bev);
469c43e99fdSEd Maste EVUTIL_ASSERT(bev_a->read_in_progress);
470c43e99fdSEd Maste
471c43e99fdSEd Maste amount_unread = bev_a->read_in_progress - nbytes;
472c43e99fdSEd Maste evbuffer_commit_read_(bev->input, nbytes);
473c43e99fdSEd Maste bev_a->read_in_progress = 0;
474c43e99fdSEd Maste if (amount_unread)
475c43e99fdSEd Maste bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
476c43e99fdSEd Maste
477c43e99fdSEd Maste if (!ok)
478c43e99fdSEd Maste bev_async_set_wsa_error(bev, eo);
479c43e99fdSEd Maste
480c43e99fdSEd Maste if (bev_a->ok) {
481c43e99fdSEd Maste if (ok && nbytes) {
482c43e99fdSEd Maste BEV_RESET_GENERIC_READ_TIMEOUT(bev);
483*b50261e2SCy Schubert be_async_trigger_nolock(bev, EV_READ, 0);
484c43e99fdSEd Maste bev_async_consider_reading(bev_a);
485c43e99fdSEd Maste } else if (!ok) {
486c43e99fdSEd Maste what |= BEV_EVENT_ERROR;
487c43e99fdSEd Maste bev_a->ok = 0;
488*b50261e2SCy Schubert be_async_run_eventcb(bev, what, 0);
489c43e99fdSEd Maste } else if (!nbytes) {
490c43e99fdSEd Maste what |= BEV_EVENT_EOF;
491c43e99fdSEd Maste bev_a->ok = 0;
492*b50261e2SCy Schubert be_async_run_eventcb(bev, what, 0);
493c43e99fdSEd Maste }
494c43e99fdSEd Maste }
495c43e99fdSEd Maste
496c43e99fdSEd Maste bufferevent_decref_and_unlock_(bev);
497c43e99fdSEd Maste }
498c43e99fdSEd Maste
499c43e99fdSEd Maste static void
write_complete(struct event_overlapped * eo,ev_uintptr_t key,ev_ssize_t nbytes,int ok)500c43e99fdSEd Maste write_complete(struct event_overlapped *eo, ev_uintptr_t key,
501c43e99fdSEd Maste ev_ssize_t nbytes, int ok)
502c43e99fdSEd Maste {
503c43e99fdSEd Maste struct bufferevent_async *bev_a = upcast_write(eo);
504c43e99fdSEd Maste struct bufferevent *bev = &bev_a->bev.bev;
505c43e99fdSEd Maste short what = BEV_EVENT_WRITING;
506c43e99fdSEd Maste ev_ssize_t amount_unwritten;
507c43e99fdSEd Maste
508c43e99fdSEd Maste BEV_LOCK(bev);
509c43e99fdSEd Maste EVUTIL_ASSERT(bev_a->write_in_progress);
510c43e99fdSEd Maste
511c43e99fdSEd Maste amount_unwritten = bev_a->write_in_progress - nbytes;
512c43e99fdSEd Maste evbuffer_commit_write_(bev->output, nbytes);
513c43e99fdSEd Maste bev_a->write_in_progress = 0;
514c43e99fdSEd Maste
515c43e99fdSEd Maste if (amount_unwritten)
516c43e99fdSEd Maste bufferevent_decrement_write_buckets_(&bev_a->bev,
517c43e99fdSEd Maste -amount_unwritten);
518c43e99fdSEd Maste
519c43e99fdSEd Maste
520c43e99fdSEd Maste if (!ok)
521c43e99fdSEd Maste bev_async_set_wsa_error(bev, eo);
522c43e99fdSEd Maste
523c43e99fdSEd Maste if (bev_a->ok) {
524c43e99fdSEd Maste if (ok && nbytes) {
525c43e99fdSEd Maste BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
526*b50261e2SCy Schubert be_async_trigger_nolock(bev, EV_WRITE, 0);
527c43e99fdSEd Maste bev_async_consider_writing(bev_a);
528c43e99fdSEd Maste } else if (!ok) {
529c43e99fdSEd Maste what |= BEV_EVENT_ERROR;
530c43e99fdSEd Maste bev_a->ok = 0;
531*b50261e2SCy Schubert be_async_run_eventcb(bev, what, 0);
532c43e99fdSEd Maste } else if (!nbytes) {
533c43e99fdSEd Maste what |= BEV_EVENT_EOF;
534c43e99fdSEd Maste bev_a->ok = 0;
535*b50261e2SCy Schubert be_async_run_eventcb(bev, what, 0);
536c43e99fdSEd Maste }
537c43e99fdSEd Maste }
538c43e99fdSEd Maste
539c43e99fdSEd Maste bufferevent_decref_and_unlock_(bev);
540c43e99fdSEd Maste }
541c43e99fdSEd Maste
542c43e99fdSEd Maste struct bufferevent *
bufferevent_async_new_(struct event_base * base,evutil_socket_t fd,int options)543c43e99fdSEd Maste bufferevent_async_new_(struct event_base *base,
544c43e99fdSEd Maste evutil_socket_t fd, int options)
545c43e99fdSEd Maste {
546c43e99fdSEd Maste struct bufferevent_async *bev_a;
547c43e99fdSEd Maste struct bufferevent *bev;
548c43e99fdSEd Maste struct event_iocp_port *iocp;
549c43e99fdSEd Maste
550c43e99fdSEd Maste options |= BEV_OPT_THREADSAFE;
551c43e99fdSEd Maste
552c43e99fdSEd Maste if (!(iocp = event_base_get_iocp_(base)))
553c43e99fdSEd Maste return NULL;
554c43e99fdSEd Maste
555c43e99fdSEd Maste if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
556*b50261e2SCy Schubert if (fatal_error(GetLastError()))
557c43e99fdSEd Maste return NULL;
558c43e99fdSEd Maste }
559c43e99fdSEd Maste
560c43e99fdSEd Maste if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
561c43e99fdSEd Maste return NULL;
562c43e99fdSEd Maste
563c43e99fdSEd Maste bev = &bev_a->bev.bev;
564c43e99fdSEd Maste if (!(bev->input = evbuffer_overlapped_new_(fd))) {
565c43e99fdSEd Maste mm_free(bev_a);
566c43e99fdSEd Maste return NULL;
567c43e99fdSEd Maste }
568c43e99fdSEd Maste if (!(bev->output = evbuffer_overlapped_new_(fd))) {
569c43e99fdSEd Maste evbuffer_free(bev->input);
570c43e99fdSEd Maste mm_free(bev_a);
571c43e99fdSEd Maste return NULL;
572c43e99fdSEd Maste }
573c43e99fdSEd Maste
574c43e99fdSEd Maste if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
575c43e99fdSEd Maste options)<0)
576c43e99fdSEd Maste goto err;
577c43e99fdSEd Maste
578c43e99fdSEd Maste evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
579c43e99fdSEd Maste evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
580c43e99fdSEd Maste
581c43e99fdSEd Maste event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
582c43e99fdSEd Maste event_overlapped_init_(&bev_a->read_overlapped, read_complete);
583c43e99fdSEd Maste event_overlapped_init_(&bev_a->write_overlapped, write_complete);
584c43e99fdSEd Maste
585c43e99fdSEd Maste bufferevent_init_generic_timeout_cbs_(bev);
586c43e99fdSEd Maste
587c43e99fdSEd Maste bev_a->ok = fd >= 0;
588c43e99fdSEd Maste
589c43e99fdSEd Maste return bev;
590c43e99fdSEd Maste err:
591c43e99fdSEd Maste bufferevent_free(&bev_a->bev.bev);
592c43e99fdSEd Maste return NULL;
593c43e99fdSEd Maste }
594c43e99fdSEd Maste
595c43e99fdSEd Maste void
bufferevent_async_set_connected_(struct bufferevent * bev)596c43e99fdSEd Maste bufferevent_async_set_connected_(struct bufferevent *bev)
597c43e99fdSEd Maste {
598c43e99fdSEd Maste struct bufferevent_async *bev_async = upcast(bev);
599c43e99fdSEd Maste bev_async->ok = 1;
600c43e99fdSEd Maste /* Now's a good time to consider reading/writing */
601c43e99fdSEd Maste be_async_enable(bev, bev->enabled);
602c43e99fdSEd Maste }
603c43e99fdSEd Maste
604c43e99fdSEd Maste int
bufferevent_async_can_connect_(struct bufferevent * bev)605c43e99fdSEd Maste bufferevent_async_can_connect_(struct bufferevent *bev)
606c43e99fdSEd Maste {
607c43e99fdSEd Maste const struct win32_extension_fns *ext =
608c43e99fdSEd Maste event_get_win32_extension_fns_();
609c43e99fdSEd Maste
610c43e99fdSEd Maste if (BEV_IS_ASYNC(bev) &&
611c43e99fdSEd Maste event_base_get_iocp_(bev->ev_base) &&
612c43e99fdSEd Maste ext && ext->ConnectEx)
613c43e99fdSEd Maste return 1;
614c43e99fdSEd Maste
615c43e99fdSEd Maste return 0;
616c43e99fdSEd Maste }
617c43e99fdSEd Maste
618c43e99fdSEd Maste int
bufferevent_async_connect_(struct bufferevent * bev,evutil_socket_t fd,const struct sockaddr * sa,int socklen)619c43e99fdSEd Maste bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
620c43e99fdSEd Maste const struct sockaddr *sa, int socklen)
621c43e99fdSEd Maste {
622c43e99fdSEd Maste BOOL rc;
623c43e99fdSEd Maste struct bufferevent_async *bev_async = upcast(bev);
624c43e99fdSEd Maste struct sockaddr_storage ss;
625c43e99fdSEd Maste const struct win32_extension_fns *ext =
626c43e99fdSEd Maste event_get_win32_extension_fns_();
627c43e99fdSEd Maste
628c43e99fdSEd Maste EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
629c43e99fdSEd Maste
630c43e99fdSEd Maste /* ConnectEx() requires that the socket be bound to an address
631c43e99fdSEd Maste * with bind() before using, otherwise it will fail. We attempt
632c43e99fdSEd Maste * to issue a bind() here, taking into account that the error
633c43e99fdSEd Maste * code is set to WSAEINVAL when the socket is already bound. */
634c43e99fdSEd Maste memset(&ss, 0, sizeof(ss));
635c43e99fdSEd Maste if (sa->sa_family == AF_INET) {
636c43e99fdSEd Maste struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
637c43e99fdSEd Maste sin->sin_family = AF_INET;
638c43e99fdSEd Maste sin->sin_addr.s_addr = INADDR_ANY;
639c43e99fdSEd Maste } else if (sa->sa_family == AF_INET6) {
640c43e99fdSEd Maste struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
641c43e99fdSEd Maste sin6->sin6_family = AF_INET6;
642c43e99fdSEd Maste sin6->sin6_addr = in6addr_any;
643c43e99fdSEd Maste } else {
644c43e99fdSEd Maste /* Well, the user will have to bind() */
645c43e99fdSEd Maste return -1;
646c43e99fdSEd Maste }
647c43e99fdSEd Maste if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
648c43e99fdSEd Maste WSAGetLastError() != WSAEINVAL)
649c43e99fdSEd Maste return -1;
650c43e99fdSEd Maste
651c43e99fdSEd Maste event_base_add_virtual_(bev->ev_base);
652c43e99fdSEd Maste bufferevent_incref_(bev);
653c43e99fdSEd Maste rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
654c43e99fdSEd Maste &bev_async->connect_overlapped.overlapped);
655c43e99fdSEd Maste if (rc || WSAGetLastError() == ERROR_IO_PENDING)
656c43e99fdSEd Maste return 0;
657c43e99fdSEd Maste
658c43e99fdSEd Maste event_base_del_virtual_(bev->ev_base);
659c43e99fdSEd Maste bufferevent_decref_(bev);
660c43e99fdSEd Maste
661c43e99fdSEd Maste return -1;
662c43e99fdSEd Maste }
663c43e99fdSEd Maste
664c43e99fdSEd Maste static int
be_async_ctrl(struct bufferevent * bev,enum bufferevent_ctrl_op op,union bufferevent_ctrl_data * data)665c43e99fdSEd Maste be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
666c43e99fdSEd Maste union bufferevent_ctrl_data *data)
667c43e99fdSEd Maste {
668c43e99fdSEd Maste switch (op) {
669c43e99fdSEd Maste case BEV_CTRL_GET_FD:
670c43e99fdSEd Maste data->fd = evbuffer_overlapped_get_fd_(bev->input);
671c43e99fdSEd Maste return 0;
672c43e99fdSEd Maste case BEV_CTRL_SET_FD: {
673*b50261e2SCy Schubert struct bufferevent_async *bev_a = upcast(bev);
674c43e99fdSEd Maste struct event_iocp_port *iocp;
675c43e99fdSEd Maste
676c43e99fdSEd Maste if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
677c43e99fdSEd Maste return 0;
678c43e99fdSEd Maste if (!(iocp = event_base_get_iocp_(bev->ev_base)))
679c43e99fdSEd Maste return -1;
680*b50261e2SCy Schubert if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
681*b50261e2SCy Schubert if (fatal_error(GetLastError()))
682c43e99fdSEd Maste return -1;
683*b50261e2SCy Schubert }
684c43e99fdSEd Maste evbuffer_overlapped_set_fd_(bev->input, data->fd);
685c43e99fdSEd Maste evbuffer_overlapped_set_fd_(bev->output, data->fd);
686*b50261e2SCy Schubert bev_a->ok = data->fd >= 0;
687c43e99fdSEd Maste return 0;
688c43e99fdSEd Maste }
689c43e99fdSEd Maste case BEV_CTRL_CANCEL_ALL: {
690c43e99fdSEd Maste struct bufferevent_async *bev_a = upcast(bev);
691c43e99fdSEd Maste evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
692*b50261e2SCy Schubert if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
693c43e99fdSEd Maste (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
694c43e99fdSEd Maste closesocket(fd);
695*b50261e2SCy Schubert evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
696c43e99fdSEd Maste }
697c43e99fdSEd Maste bev_a->ok = 0;
698c43e99fdSEd Maste return 0;
699c43e99fdSEd Maste }
700c43e99fdSEd Maste case BEV_CTRL_GET_UNDERLYING:
701c43e99fdSEd Maste default:
702c43e99fdSEd Maste return -1;
703c43e99fdSEd Maste }
704c43e99fdSEd Maste }
705c43e99fdSEd Maste
706c43e99fdSEd Maste
707