/*
 * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
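
/* Overview: this file implements a bufferevent backend that performs its
 * I/O through Windows overlapped (IOCP) operations rather than readiness
 * notification.  Reads and writes are launched directly on the overlapped
 * evbuffers, and per-operation completion callbacks commit the results and
 * decide whether to launch the next operation. */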
#include "event2/event-config.h"
#include "evconfig-private.h"

#ifdef EVENT__HAVE_SYS_TIME_H
#include <sys/time.h>
#endif

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef EVENT__HAVE_STDARG_H
#include <stdarg.h>
#endif
#ifdef EVENT__HAVE_UNISTD_H
#include <unistd.h>
#endif

#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#endif

#include <sys/queue.h>

#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/buffer.h"
#include "event2/bufferevent_struct.h"
#include "event2/event.h"
#include "event-internal.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
#include "util-internal.h"
#include "iocp-internal.h"

#ifndef SO_UPDATE_CONNECT_CONTEXT
/* Mingw is sometimes missing this */
#define SO_UPDATE_CONNECT_CONTEXT 0x7010
#endif

/* prototypes */
static int be_async_enable(struct bufferevent *, short);
static int be_async_disable(struct bufferevent *, short);
static void be_async_destruct(struct bufferevent *);
static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);

struct bufferevent_async {
	struct bufferevent_private bev;
	struct event_overlapped connect_overlapped;
	struct event_overlapped read_overlapped;
	struct event_overlapped write_overlapped;
	size_t read_in_progress;
	size_t write_in_progress;
	unsigned ok : 1;
	unsigned read_added : 1;
	unsigned write_added : 1;
};

const struct bufferevent_ops bufferevent_ops_async = {
	"socket_async",
	evutil_offsetof(struct bufferevent_async, bev.bev),
	be_async_enable,
	be_async_disable,
	NULL, /* Unlink */
	be_async_destruct,
	bufferevent_generic_adj_timeouts_,
	be_async_flush,
	be_async_ctrl,
};
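
/* Illustrative sketch (not part of this file): roughly how a caller ends
 * up with this backend.  The ops table above is selected when the
 * event_base was started with IOCP support.  The calls below are the
 * public libevent API, but this exact setup sequence is an assumption
 * for illustration:
 *
 *	struct event_config *cfg = event_config_new();
 *	evthread_use_windows_threads();
 *	event_config_set_flags(cfg, EVENT_BASE_FLAG_STARTUP_IOCP);
 *	struct event_base *base = event_base_new_with_config(cfg);
 *	struct bufferevent *bev =
 *	    bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
 */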
static inline struct bufferevent_async *
upcast(struct bufferevent *bev)
{
	struct bufferevent_async *bev_a;
	if (bev->be_ops != &bufferevent_ops_async)
		return NULL;
	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
	return bev_a;
}

static inline struct bufferevent_async *
upcast_connect(struct event_overlapped *eo)
{
	struct bufferevent_async *bev_a;
	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
	return bev_a;
}

static inline struct bufferevent_async *
upcast_read(struct event_overlapped *eo)
{
	struct bufferevent_async *bev_a;
	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
	return bev_a;
}

static inline struct bufferevent_async *
upcast_write(struct event_overlapped *eo)
{
	struct bufferevent_async *bev_a;
	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
	return bev_a;
}

static void
bev_async_del_write(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (beva->write_added) {
		beva->write_added = 0;
		event_base_del_virtual_(bev->ev_base);
	}
}

static void
bev_async_del_read(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (beva->read_added) {
		beva->read_added = 0;
		event_base_del_virtual_(bev->ev_base);
	}
}

static void
bev_async_add_write(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (!beva->write_added) {
		beva->write_added = 1;
		event_base_add_virtual_(bev->ev_base);
	}
}

static void
bev_async_add_read(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (!beva->read_added) {
		beva->read_added = 1;
		event_base_add_virtual_(bev->ev_base);
	}
}
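
/* The add/del helpers above use "virtual" events: while an overlapped read
 * or write is outstanding there is no ordinary event registered with the
 * base, so event_base_add_virtual_() keeps the loop from exiting as if it
 * had nothing to wait for, and event_base_del_virtual_() releases that hold
 * once the operation completes or is abandoned. */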
static void
bev_async_consider_writing(struct bufferevent_async *beva)
{
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't write if there's a write in progress, or we do not
	 * want to write, or when there's nothing left to write. */
	if (beva->write_in_progress || beva->bev.connecting)
		return;
	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
	    !evbuffer_get_length(bev->output)) {
		bev_async_del_write(beva);
		return;
	}

	at_most = evbuffer_get_length(bev->output);

	/* This is safe so long as bufferevent_get_write_max_() never returns
	 * more than INT_MAX.  That's true for now. XXXX */
	limit = (int)bufferevent_get_write_max_(&beva->bev);
	if (at_most >= (size_t)limit && limit >= 0)
		at_most = limit;

	if (beva->bev.write_suspended) {
		bev_async_del_write(beva);
		return;
	}

	/* XXXX doesn't respect low-water mark very well. */
	bufferevent_incref_(bev);
	if (evbuffer_launch_write_(bev->output, at_most,
	    &beva->write_overlapped)) {
		bufferevent_decref_(bev);
		beva->ok = 0;
		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
	} else {
		beva->write_in_progress = at_most;
		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
		bev_async_add_write(beva);
	}
}
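
/* Note on accounting, mirrored in bev_async_consider_reading() below: the
 * rate-limit buckets are charged for the full at_most before the overlapped
 * operation is launched, and the completion callback refunds whatever
 * portion was not actually transferred (by passing a negative amount to the
 * same decrement helper). */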
static void
bev_async_consider_reading(struct bufferevent_async *beva)
{
	size_t cur_size;
	size_t read_high;
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't read if there is a read in progress, or we do not
	 * want to read. */
	if (beva->read_in_progress || beva->bev.connecting)
		return;
	if (!beva->ok || !(bev->enabled&EV_READ)) {
		bev_async_del_read(beva);
		return;
	}

	/* Don't read if we're full */
	cur_size = evbuffer_get_length(bev->input);
	read_high = bev->wm_read.high;
	if (read_high) {
		if (cur_size >= read_high) {
			bev_async_del_read(beva);
			return;
		}
		at_most = read_high - cur_size;
	} else {
		at_most = 16384; /* FIXME totally magic. */
	}

	/* XXXX This over-commits. */
	/* XXXX see also the note above on the cast of
	 * bufferevent_get_write_max_(). */
	limit = (int)bufferevent_get_read_max_(&beva->bev);
	if (at_most >= (size_t)limit && limit >= 0)
		at_most = limit;

	if (beva->bev.read_suspended) {
		bev_async_del_read(beva);
		return;
	}

	bufferevent_incref_(bev);
	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
		beva->ok = 0;
		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
		bufferevent_decref_(bev);
	} else {
		beva->read_in_progress = at_most;
		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
		bev_async_add_read(beva);
	}

	return;
}

static void
be_async_outbuf_callback(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bev = arg;
	struct bufferevent_async *bev_async = upcast(bev);

	/* If we added data to the outbuf and were not writing before,
	 * we may want to write now. */

	bufferevent_incref_and_lock_(bev);

	if (cbinfo->n_added)
		bev_async_consider_writing(bev_async);

	bufferevent_decref_and_unlock_(bev);
}

static void
be_async_inbuf_callback(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bev = arg;
	struct bufferevent_async *bev_async = upcast(bev);

	/* If we drained data from the inbuf and were not reading before,
	 * we may want to read now. */

	bufferevent_incref_and_lock_(bev);

	if (cbinfo->n_deleted)
		bev_async_consider_reading(bev_async);

	bufferevent_decref_and_unlock_(bev);
}
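
/* The two evbuffer callbacks above are what keep the overlapped pipeline
 * moving.  For example (illustrative; bufferevent_write() is the standard
 * public API, not code in this file):
 *
 *	bufferevent_write(bev, "hello", 5);
 *
 * appends to bev->output, which fires be_async_outbuf_callback() with
 * n_added != 0 and may launch an overlapped write.  Likewise, draining
 * bev->input from a read callback fires be_async_inbuf_callback() and can
 * re-arm reading once the buffer drops below the high watermark. */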
static int
be_async_enable(struct bufferevent *buf, short what)
{
	struct bufferevent_async *bev_async = upcast(buf);

	if (!bev_async->ok)
		return -1;

	if (bev_async->bev.connecting) {
		/* Don't launch anything during connection attempts. */
		return 0;
	}

	if (what & EV_READ)
		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
	if (what & EV_WRITE)
		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);

	/* If we newly enable reading or writing, and we aren't reading or
	   writing already, consider launching a new read or write. */

	if (what & EV_READ)
		bev_async_consider_reading(bev_async);
	if (what & EV_WRITE)
		bev_async_consider_writing(bev_async);
	return 0;
}

static int
be_async_disable(struct bufferevent *bev, short what)
{
	struct bufferevent_async *bev_async = upcast(bev);
	/* XXXX If we disable reading or writing, we may want to consider
	 * canceling any in-progress read or write operation, though it might
	 * not work. */

	if (what & EV_READ) {
		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
		bev_async_del_read(bev_async);
	}
	if (what & EV_WRITE) {
		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
		bev_async_del_write(bev_async);
	}

	return 0;
}

static void
be_async_destruct(struct bufferevent *bev)
{
	struct bufferevent_async *bev_async = upcast(bev);
	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
	evutil_socket_t fd;

	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
	    !upcast(bev)->read_in_progress);

	bev_async_del_read(bev_async);
	bev_async_del_write(bev_async);

	fd = evbuffer_overlapped_get_fd_(bev->input);
	if (fd != (evutil_socket_t)INVALID_SOCKET &&
	    (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
		evutil_closesocket(fd);
		evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
	}
}
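
/* Note that bev->input and bev->output wrap the same underlying socket
 * (both are created from the same fd in bufferevent_async_new_() below),
 * which is why the fd is always fetched from the input buffer, as in
 * be_async_destruct() above and bev_async_set_wsa_error() below. */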
/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
 * we use WSAGetOverlappedResult to translate. */
static void
bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
{
	DWORD bytes, flags;
	evutil_socket_t fd;

	fd = evbuffer_overlapped_get_fd_(bev->input);
	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
}

static int
be_async_flush(struct bufferevent *bev, short what,
    enum bufferevent_flush_mode mode)
{
	return 0;
}
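
/* A socket connected with ConnectEx() stays in its default state until
 * SO_UPDATE_CONNECT_CONTEXT is set on it; without that, calls such as
 * getpeername() or shutdown() on the socket can fail.  That is why
 * connect_complete() below applies the option before reporting the
 * result. */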
static void
connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_connect(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	evutil_socket_t sock;

	BEV_LOCK(bev);

	EVUTIL_ASSERT(bev_a->bev.connecting);
	bev_a->bev.connecting = 0;
	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
	/* XXXX Handle error? */
	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);

	if (ok)
		bufferevent_async_set_connected_(bev);
	else
		bev_async_set_wsa_error(bev, eo);

	bufferevent_run_eventcb_(bev,
	    ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);

	event_base_del_virtual_(bev->ev_base);

	bufferevent_decref_and_unlock_(bev);
}

static void
read_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_read(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_READING;
	ev_ssize_t amount_unread;

	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->read_in_progress);

	amount_unread = bev_a->read_in_progress - nbytes;
	evbuffer_commit_read_(bev->input, nbytes);
	bev_a->read_in_progress = 0;
	if (amount_unread)
		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);

	if (!ok)
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
			bufferevent_trigger_nolock_(bev, EV_READ, 0);
			bev_async_consider_reading(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			bufferevent_run_eventcb_(bev, what, 0);
		} else if (!nbytes) {
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			bufferevent_run_eventcb_(bev, what, 0);
		}
	}

	bufferevent_decref_and_unlock_(bev);
}
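
/* Completion of an overlapped write.  The structure mirrors read_complete()
 * above: commit the transferred bytes, refund any unused rate-limit budget,
 * then either keep the pipeline going or report error/EOF.  As in
 * read_complete(), a successful completion of zero bytes is treated as
 * EOF. */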
static void
write_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_write(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_WRITING;
	ev_ssize_t amount_unwritten;

	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->write_in_progress);

	amount_unwritten = bev_a->write_in_progress - nbytes;
	evbuffer_commit_write_(bev->output, nbytes);
	bev_a->write_in_progress = 0;

	if (amount_unwritten)
		bufferevent_decrement_write_buckets_(&bev_a->bev,
		    -amount_unwritten);

	if (!ok)
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
			bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
			bev_async_consider_writing(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			bufferevent_run_eventcb_(bev, what, 0);
		} else if (!nbytes) {
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			bufferevent_run_eventcb_(bev, what, 0);
		}
	}

	bufferevent_decref_and_unlock_(bev);
}

struct bufferevent *
bufferevent_async_new_(struct event_base *base,
    evutil_socket_t fd, int options)
{
	struct bufferevent_async *bev_a;
	struct bufferevent *bev;
	struct event_iocp_port *iocp;

	options |= BEV_OPT_THREADSAFE;

	if (!(iocp = event_base_get_iocp_(base)))
		return NULL;

	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1) < 0) {
		int err = GetLastError();
		/* We may have already associated this fd with a port.
		 * Let's hope it's this port, and that the error code
		 * for doing this never changes. */
		if (err != ERROR_INVALID_PARAMETER)
			return NULL;
	}

	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
		return NULL;

	bev = &bev_a->bev.bev;
	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
		mm_free(bev_a);
		return NULL;
	}
	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
		evbuffer_free(bev->input);
		mm_free(bev_a);
		return NULL;
	}

	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
	    options) < 0)
		goto err;

	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);

	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
	event_overlapped_init_(&bev_a->write_overlapped, write_complete);

	bufferevent_init_generic_timeout_cbs_(bev);

	bev_a->ok = fd >= 0;

	return bev;
err:
	bufferevent_free(&bev_a->bev.bev);
	return NULL;
}

void
bufferevent_async_set_connected_(struct bufferevent *bev)
{
	struct bufferevent_async *bev_async = upcast(bev);
	bev_async->ok = 1;
	bufferevent_init_generic_timeout_cbs_(bev);
	/* Now's a good time to consider reading/writing */
	be_async_enable(bev, bev->enabled);
}
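
/* Illustrative sketch (an assumption about the caller, not code in this
 * file): the public connect path checks bufferevent_async_can_connect_()
 * and, when it returns 1, hands the socket to bufferevent_async_connect_()
 * below instead of issuing an ordinary non-blocking connect():
 *
 *	if (bufferevent_async_can_connect_(bev))
 *		return bufferevent_async_connect_(bev, fd, sa, socklen);
 */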
int
bufferevent_async_can_connect_(struct bufferevent *bev)
{
	const struct win32_extension_fns *ext =
	    event_get_win32_extension_fns_();

	if (BEV_IS_ASYNC(bev) &&
	    event_base_get_iocp_(bev->ev_base) &&
	    ext && ext->ConnectEx)
		return 1;

	return 0;
}

int
bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
    const struct sockaddr *sa, int socklen)
{
	BOOL rc;
	struct bufferevent_async *bev_async = upcast(bev);
	struct sockaddr_storage ss;
	const struct win32_extension_fns *ext =
	    event_get_win32_extension_fns_();

	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);

	/* ConnectEx() requires that the socket be bound to an address
	 * with bind() before using, otherwise it will fail. We attempt
	 * to issue a bind() here, taking into account that the error
	 * code is set to WSAEINVAL when the socket is already bound. */
	memset(&ss, 0, sizeof(ss));
	if (sa->sa_family == AF_INET) {
		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
		sin->sin_family = AF_INET;
		sin->sin_addr.s_addr = INADDR_ANY;
	} else if (sa->sa_family == AF_INET6) {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
		sin6->sin6_family = AF_INET6;
		sin6->sin6_addr = in6addr_any;
	} else {
		/* Well, the user will have to bind() */
		return -1;
	}
	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
	    WSAGetLastError() != WSAEINVAL)
		return -1;

	event_base_add_virtual_(bev->ev_base);
	bufferevent_incref_(bev);
	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
	    &bev_async->connect_overlapped.overlapped);
	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
		return 0;

	event_base_del_virtual_(bev->ev_base);
	bufferevent_decref_(bev);

	return -1;
}
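
/* Generic control hook: get/set the underlying socket and cancel all
 * pending I/O.  Note that BEV_CTRL_SET_FD must associate the replacement
 * socket with the base's IOCP port before the overlapped evbuffers can
 * use it. */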
static int
be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    union bufferevent_ctrl_data *data)
{
	switch (op) {
	case BEV_CTRL_GET_FD:
		data->fd = evbuffer_overlapped_get_fd_(bev->input);
		return 0;
	case BEV_CTRL_SET_FD: {
		struct event_iocp_port *iocp;

		if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
			return 0;
		if (!(iocp = event_base_get_iocp_(bev->ev_base)))
			return -1;
		if (event_iocp_port_associate_(iocp, data->fd, 1) < 0)
			return -1;
		evbuffer_overlapped_set_fd_(bev->input, data->fd);
		evbuffer_overlapped_set_fd_(bev->output, data->fd);
		return 0;
	}
	case BEV_CTRL_CANCEL_ALL: {
		struct bufferevent_async *bev_a = upcast(bev);
		evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
		if (fd != (evutil_socket_t)INVALID_SOCKET &&
		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
			closesocket(fd);
			evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
		}
		bev_a->ok = 0;
		return 0;
	}
	case BEV_CTRL_GET_UNDERLYING:
	default:
		return -1;
	}
}
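
/* Illustrative usage of the ctrl ops via the public API (a sketch; the
 * wrappers are standard libevent functions, but this snippet is not part
 * of this file):
 *
 *	evutil_socket_t old_fd = bufferevent_getfd(bev);
 *	if (bufferevent_setfd(bev, new_fd) == 0) {
 *		// new_fd is now associated with the IOCP port and with
 *		// both evbuffers; old_fd is left untouched.
 *	}
 */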