1*2b15cb3dSCy Schubert /* 2*2b15cb3dSCy Schubert * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson 3*2b15cb3dSCy Schubert * 4*2b15cb3dSCy Schubert * All rights reserved. 5*2b15cb3dSCy Schubert * 6*2b15cb3dSCy Schubert * Redistribution and use in source and binary forms, with or without 7*2b15cb3dSCy Schubert * modification, are permitted provided that the following conditions 8*2b15cb3dSCy Schubert * are met: 9*2b15cb3dSCy Schubert * 1. Redistributions of source code must retain the above copyright 10*2b15cb3dSCy Schubert * notice, this list of conditions and the following disclaimer. 11*2b15cb3dSCy Schubert * 2. Redistributions in binary form must reproduce the above copyright 12*2b15cb3dSCy Schubert * notice, this list of conditions and the following disclaimer in the 13*2b15cb3dSCy Schubert * documentation and/or other materials provided with the distribution. 14*2b15cb3dSCy Schubert * 3. The name of the author may not be used to endorse or promote products 15*2b15cb3dSCy Schubert * derived from this software without specific prior written permission. 16*2b15cb3dSCy Schubert * 17*2b15cb3dSCy Schubert * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18*2b15cb3dSCy Schubert * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19*2b15cb3dSCy Schubert * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20*2b15cb3dSCy Schubert * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21*2b15cb3dSCy Schubert * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22*2b15cb3dSCy Schubert * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23*2b15cb3dSCy Schubert * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24*2b15cb3dSCy Schubert * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25*2b15cb3dSCy Schubert * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26*2b15cb3dSCy Schubert * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27*2b15cb3dSCy Schubert */ 28*2b15cb3dSCy Schubert 29*2b15cb3dSCy Schubert #include "event2/event-config.h" 30*2b15cb3dSCy Schubert #include "evconfig-private.h" 31*2b15cb3dSCy Schubert 32*2b15cb3dSCy Schubert #ifdef EVENT__HAVE_SYS_TIME_H 33*2b15cb3dSCy Schubert #include <sys/time.h> 34*2b15cb3dSCy Schubert #endif 35*2b15cb3dSCy Schubert 36*2b15cb3dSCy Schubert #include <errno.h> 37*2b15cb3dSCy Schubert #include <stdio.h> 38*2b15cb3dSCy Schubert #include <stdlib.h> 39*2b15cb3dSCy Schubert #include <string.h> 40*2b15cb3dSCy Schubert #ifdef EVENT__HAVE_STDARG_H 41*2b15cb3dSCy Schubert #include <stdarg.h> 42*2b15cb3dSCy Schubert #endif 43*2b15cb3dSCy Schubert #ifdef EVENT__HAVE_UNISTD_H 44*2b15cb3dSCy Schubert #include <unistd.h> 45*2b15cb3dSCy Schubert #endif 46*2b15cb3dSCy Schubert 47*2b15cb3dSCy Schubert #ifdef _WIN32 48*2b15cb3dSCy Schubert #include <winsock2.h> 49*2b15cb3dSCy Schubert #include <ws2tcpip.h> 50*2b15cb3dSCy Schubert #endif 51*2b15cb3dSCy Schubert 52*2b15cb3dSCy Schubert #include <sys/queue.h> 53*2b15cb3dSCy Schubert 54*2b15cb3dSCy Schubert #include "event2/util.h" 55*2b15cb3dSCy Schubert #include "event2/bufferevent.h" 56*2b15cb3dSCy Schubert #include "event2/buffer.h" 57*2b15cb3dSCy Schubert #include "event2/bufferevent_struct.h" 58*2b15cb3dSCy Schubert #include "event2/event.h" 59*2b15cb3dSCy Schubert #include "event2/util.h" 60*2b15cb3dSCy Schubert #include "event-internal.h" 61*2b15cb3dSCy Schubert #include "log-internal.h" 62*2b15cb3dSCy Schubert #include "mm-internal.h" 63*2b15cb3dSCy Schubert #include "bufferevent-internal.h" 64*2b15cb3dSCy Schubert #include "util-internal.h" 65*2b15cb3dSCy Schubert #include "iocp-internal.h" 66*2b15cb3dSCy Schubert 67*2b15cb3dSCy Schubert #ifndef SO_UPDATE_CONNECT_CONTEXT 68*2b15cb3dSCy Schubert /* Mingw is sometimes missing this */ 69*2b15cb3dSCy Schubert #define SO_UPDATE_CONNECT_CONTEXT 0x7010 70*2b15cb3dSCy Schubert #endif 71*2b15cb3dSCy Schubert 72*2b15cb3dSCy Schubert /* prototypes */ 73*2b15cb3dSCy Schubert static int be_async_enable(struct bufferevent *, short); 74*2b15cb3dSCy Schubert static int be_async_disable(struct bufferevent *, short); 75*2b15cb3dSCy Schubert static void be_async_destruct(struct bufferevent *); 76*2b15cb3dSCy Schubert static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode); 77*2b15cb3dSCy Schubert static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 78*2b15cb3dSCy Schubert 79*2b15cb3dSCy Schubert struct bufferevent_async { 80*2b15cb3dSCy Schubert struct bufferevent_private bev; 81*2b15cb3dSCy Schubert struct event_overlapped connect_overlapped; 82*2b15cb3dSCy Schubert struct event_overlapped read_overlapped; 83*2b15cb3dSCy Schubert struct event_overlapped write_overlapped; 84*2b15cb3dSCy Schubert size_t read_in_progress; 85*2b15cb3dSCy Schubert size_t write_in_progress; 86*2b15cb3dSCy Schubert unsigned ok : 1; 87*2b15cb3dSCy Schubert unsigned read_added : 1; 88*2b15cb3dSCy Schubert unsigned write_added : 1; 89*2b15cb3dSCy Schubert }; 90*2b15cb3dSCy Schubert 91*2b15cb3dSCy Schubert const struct bufferevent_ops bufferevent_ops_async = { 92*2b15cb3dSCy Schubert "socket_async", 93*2b15cb3dSCy Schubert evutil_offsetof(struct bufferevent_async, bev.bev), 94*2b15cb3dSCy Schubert be_async_enable, 95*2b15cb3dSCy Schubert be_async_disable, 96*2b15cb3dSCy Schubert NULL, /* Unlink */ 97*2b15cb3dSCy Schubert be_async_destruct, 98*2b15cb3dSCy Schubert bufferevent_generic_adj_timeouts_, 99*2b15cb3dSCy Schubert be_async_flush, 100*2b15cb3dSCy Schubert be_async_ctrl, 101*2b15cb3dSCy Schubert }; 102*2b15cb3dSCy Schubert 103*2b15cb3dSCy Schubert static inline struct bufferevent_async * 104*2b15cb3dSCy Schubert upcast(struct bufferevent *bev) 105*2b15cb3dSCy Schubert { 106*2b15cb3dSCy Schubert struct bufferevent_async *bev_a; 107*2b15cb3dSCy Schubert if (bev->be_ops != &bufferevent_ops_async) 108*2b15cb3dSCy Schubert return NULL; 109*2b15cb3dSCy Schubert bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); 110*2b15cb3dSCy Schubert return bev_a; 111*2b15cb3dSCy Schubert } 112*2b15cb3dSCy Schubert 113*2b15cb3dSCy Schubert static inline struct bufferevent_async * 114*2b15cb3dSCy Schubert upcast_connect(struct event_overlapped *eo) 115*2b15cb3dSCy Schubert { 116*2b15cb3dSCy Schubert struct bufferevent_async *bev_a; 117*2b15cb3dSCy Schubert bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped); 118*2b15cb3dSCy Schubert EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 119*2b15cb3dSCy Schubert return bev_a; 120*2b15cb3dSCy Schubert } 121*2b15cb3dSCy Schubert 122*2b15cb3dSCy Schubert static inline struct bufferevent_async * 123*2b15cb3dSCy Schubert upcast_read(struct event_overlapped *eo) 124*2b15cb3dSCy Schubert { 125*2b15cb3dSCy Schubert struct bufferevent_async *bev_a; 126*2b15cb3dSCy Schubert bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped); 127*2b15cb3dSCy Schubert EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 128*2b15cb3dSCy Schubert return bev_a; 129*2b15cb3dSCy Schubert } 130*2b15cb3dSCy Schubert 131*2b15cb3dSCy Schubert static inline struct bufferevent_async * 132*2b15cb3dSCy Schubert upcast_write(struct event_overlapped *eo) 133*2b15cb3dSCy Schubert { 134*2b15cb3dSCy Schubert struct bufferevent_async *bev_a; 135*2b15cb3dSCy Schubert bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped); 136*2b15cb3dSCy Schubert EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 137*2b15cb3dSCy Schubert return bev_a; 138*2b15cb3dSCy Schubert } 139*2b15cb3dSCy Schubert 140*2b15cb3dSCy Schubert static void 141*2b15cb3dSCy Schubert bev_async_del_write(struct bufferevent_async *beva) 142*2b15cb3dSCy Schubert { 143*2b15cb3dSCy Schubert struct bufferevent *bev = &beva->bev.bev; 144*2b15cb3dSCy Schubert 145*2b15cb3dSCy Schubert if (beva->write_added) { 146*2b15cb3dSCy Schubert beva->write_added = 0; 147*2b15cb3dSCy Schubert event_base_del_virtual_(bev->ev_base); 148*2b15cb3dSCy Schubert } 149*2b15cb3dSCy Schubert } 150*2b15cb3dSCy Schubert 151*2b15cb3dSCy Schubert static void 152*2b15cb3dSCy Schubert bev_async_del_read(struct bufferevent_async *beva) 153*2b15cb3dSCy Schubert { 154*2b15cb3dSCy Schubert struct bufferevent *bev = &beva->bev.bev; 155*2b15cb3dSCy Schubert 156*2b15cb3dSCy Schubert if (beva->read_added) { 157*2b15cb3dSCy Schubert beva->read_added = 0; 158*2b15cb3dSCy Schubert event_base_del_virtual_(bev->ev_base); 159*2b15cb3dSCy Schubert } 160*2b15cb3dSCy Schubert } 161*2b15cb3dSCy Schubert 162*2b15cb3dSCy Schubert static void 163*2b15cb3dSCy Schubert bev_async_add_write(struct bufferevent_async *beva) 164*2b15cb3dSCy Schubert { 165*2b15cb3dSCy Schubert struct bufferevent *bev = &beva->bev.bev; 166*2b15cb3dSCy Schubert 167*2b15cb3dSCy Schubert if (!beva->write_added) { 168*2b15cb3dSCy Schubert beva->write_added = 1; 169*2b15cb3dSCy Schubert event_base_add_virtual_(bev->ev_base); 170*2b15cb3dSCy Schubert } 171*2b15cb3dSCy Schubert } 172*2b15cb3dSCy Schubert 173*2b15cb3dSCy Schubert static void 174*2b15cb3dSCy Schubert bev_async_add_read(struct bufferevent_async *beva) 175*2b15cb3dSCy Schubert { 176*2b15cb3dSCy Schubert struct bufferevent *bev = &beva->bev.bev; 177*2b15cb3dSCy Schubert 178*2b15cb3dSCy Schubert if (!beva->read_added) { 179*2b15cb3dSCy Schubert beva->read_added = 1; 180*2b15cb3dSCy Schubert event_base_add_virtual_(bev->ev_base); 181*2b15cb3dSCy Schubert } 182*2b15cb3dSCy Schubert } 183*2b15cb3dSCy Schubert 184*2b15cb3dSCy Schubert static void 185*2b15cb3dSCy Schubert bev_async_consider_writing(struct bufferevent_async *beva) 186*2b15cb3dSCy Schubert { 187*2b15cb3dSCy Schubert size_t at_most; 188*2b15cb3dSCy Schubert int limit; 189*2b15cb3dSCy Schubert struct bufferevent *bev = &beva->bev.bev; 190*2b15cb3dSCy Schubert 191*2b15cb3dSCy Schubert /* Don't write if there's a write in progress, or we do not 192*2b15cb3dSCy Schubert * want to write, or when there's nothing left to write. */ 193*2b15cb3dSCy Schubert if (beva->write_in_progress || beva->bev.connecting) 194*2b15cb3dSCy Schubert return; 195*2b15cb3dSCy Schubert if (!beva->ok || !(bev->enabled&EV_WRITE) || 196*2b15cb3dSCy Schubert !evbuffer_get_length(bev->output)) { 197*2b15cb3dSCy Schubert bev_async_del_write(beva); 198*2b15cb3dSCy Schubert return; 199*2b15cb3dSCy Schubert } 200*2b15cb3dSCy Schubert 201*2b15cb3dSCy Schubert at_most = evbuffer_get_length(bev->output); 202*2b15cb3dSCy Schubert 203*2b15cb3dSCy Schubert /* This is safe so long as bufferevent_get_write_max never returns 204*2b15cb3dSCy Schubert * more than INT_MAX. That's true for now. XXXX */ 205*2b15cb3dSCy Schubert limit = (int)bufferevent_get_write_max_(&beva->bev); 206*2b15cb3dSCy Schubert if (at_most >= (size_t)limit && limit >= 0) 207*2b15cb3dSCy Schubert at_most = limit; 208*2b15cb3dSCy Schubert 209*2b15cb3dSCy Schubert if (beva->bev.write_suspended) { 210*2b15cb3dSCy Schubert bev_async_del_write(beva); 211*2b15cb3dSCy Schubert return; 212*2b15cb3dSCy Schubert } 213*2b15cb3dSCy Schubert 214*2b15cb3dSCy Schubert /* XXXX doesn't respect low-water mark very well. */ 215*2b15cb3dSCy Schubert bufferevent_incref_(bev); 216*2b15cb3dSCy Schubert if (evbuffer_launch_write_(bev->output, at_most, 217*2b15cb3dSCy Schubert &beva->write_overlapped)) { 218*2b15cb3dSCy Schubert bufferevent_decref_(bev); 219*2b15cb3dSCy Schubert beva->ok = 0; 220*2b15cb3dSCy Schubert bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); 221*2b15cb3dSCy Schubert } else { 222*2b15cb3dSCy Schubert beva->write_in_progress = at_most; 223*2b15cb3dSCy Schubert bufferevent_decrement_write_buckets_(&beva->bev, at_most); 224*2b15cb3dSCy Schubert bev_async_add_write(beva); 225*2b15cb3dSCy Schubert } 226*2b15cb3dSCy Schubert } 227*2b15cb3dSCy Schubert 228*2b15cb3dSCy Schubert static void 229*2b15cb3dSCy Schubert bev_async_consider_reading(struct bufferevent_async *beva) 230*2b15cb3dSCy Schubert { 231*2b15cb3dSCy Schubert size_t cur_size; 232*2b15cb3dSCy Schubert size_t read_high; 233*2b15cb3dSCy Schubert size_t at_most; 234*2b15cb3dSCy Schubert int limit; 235*2b15cb3dSCy Schubert struct bufferevent *bev = &beva->bev.bev; 236*2b15cb3dSCy Schubert 237*2b15cb3dSCy Schubert /* Don't read if there is a read in progress, or we do not 238*2b15cb3dSCy Schubert * want to read. */ 239*2b15cb3dSCy Schubert if (beva->read_in_progress || beva->bev.connecting) 240*2b15cb3dSCy Schubert return; 241*2b15cb3dSCy Schubert if (!beva->ok || !(bev->enabled&EV_READ)) { 242*2b15cb3dSCy Schubert bev_async_del_read(beva); 243*2b15cb3dSCy Schubert return; 244*2b15cb3dSCy Schubert } 245*2b15cb3dSCy Schubert 246*2b15cb3dSCy Schubert /* Don't read if we're full */ 247*2b15cb3dSCy Schubert cur_size = evbuffer_get_length(bev->input); 248*2b15cb3dSCy Schubert read_high = bev->wm_read.high; 249*2b15cb3dSCy Schubert if (read_high) { 250*2b15cb3dSCy Schubert if (cur_size >= read_high) { 251*2b15cb3dSCy Schubert bev_async_del_read(beva); 252*2b15cb3dSCy Schubert return; 253*2b15cb3dSCy Schubert } 254*2b15cb3dSCy Schubert at_most = read_high - cur_size; 255*2b15cb3dSCy Schubert } else { 256*2b15cb3dSCy Schubert at_most = 16384; /* FIXME totally magic. */ 257*2b15cb3dSCy Schubert } 258*2b15cb3dSCy Schubert 259*2b15cb3dSCy Schubert /* XXXX This over-commits. */ 260*2b15cb3dSCy Schubert /* XXXX see also not above on cast on bufferevent_get_write_max_() */ 261*2b15cb3dSCy Schubert limit = (int)bufferevent_get_read_max_(&beva->bev); 262*2b15cb3dSCy Schubert if (at_most >= (size_t)limit && limit >= 0) 263*2b15cb3dSCy Schubert at_most = limit; 264*2b15cb3dSCy Schubert 265*2b15cb3dSCy Schubert if (beva->bev.read_suspended) { 266*2b15cb3dSCy Schubert bev_async_del_read(beva); 267*2b15cb3dSCy Schubert return; 268*2b15cb3dSCy Schubert } 269*2b15cb3dSCy Schubert 270*2b15cb3dSCy Schubert bufferevent_incref_(bev); 271*2b15cb3dSCy Schubert if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) { 272*2b15cb3dSCy Schubert beva->ok = 0; 273*2b15cb3dSCy Schubert bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); 274*2b15cb3dSCy Schubert bufferevent_decref_(bev); 275*2b15cb3dSCy Schubert } else { 276*2b15cb3dSCy Schubert beva->read_in_progress = at_most; 277*2b15cb3dSCy Schubert bufferevent_decrement_read_buckets_(&beva->bev, at_most); 278*2b15cb3dSCy Schubert bev_async_add_read(beva); 279*2b15cb3dSCy Schubert } 280*2b15cb3dSCy Schubert 281*2b15cb3dSCy Schubert return; 282*2b15cb3dSCy Schubert } 283*2b15cb3dSCy Schubert 284*2b15cb3dSCy Schubert static void 285*2b15cb3dSCy Schubert be_async_outbuf_callback(struct evbuffer *buf, 286*2b15cb3dSCy Schubert const struct evbuffer_cb_info *cbinfo, 287*2b15cb3dSCy Schubert void *arg) 288*2b15cb3dSCy Schubert { 289*2b15cb3dSCy Schubert struct bufferevent *bev = arg; 290*2b15cb3dSCy Schubert struct bufferevent_async *bev_async = upcast(bev); 291*2b15cb3dSCy Schubert 292*2b15cb3dSCy Schubert /* If we added data to the outbuf and were not writing before, 293*2b15cb3dSCy Schubert * we may want to write now. */ 294*2b15cb3dSCy Schubert 295*2b15cb3dSCy Schubert bufferevent_incref_and_lock_(bev); 296*2b15cb3dSCy Schubert 297*2b15cb3dSCy Schubert if (cbinfo->n_added) 298*2b15cb3dSCy Schubert bev_async_consider_writing(bev_async); 299*2b15cb3dSCy Schubert 300*2b15cb3dSCy Schubert bufferevent_decref_and_unlock_(bev); 301*2b15cb3dSCy Schubert } 302*2b15cb3dSCy Schubert 303*2b15cb3dSCy Schubert static void 304*2b15cb3dSCy Schubert be_async_inbuf_callback(struct evbuffer *buf, 305*2b15cb3dSCy Schubert const struct evbuffer_cb_info *cbinfo, 306*2b15cb3dSCy Schubert void *arg) 307*2b15cb3dSCy Schubert { 308*2b15cb3dSCy Schubert struct bufferevent *bev = arg; 309*2b15cb3dSCy Schubert struct bufferevent_async *bev_async = upcast(bev); 310*2b15cb3dSCy Schubert 311*2b15cb3dSCy Schubert /* If we drained data from the inbuf and were not reading before, 312*2b15cb3dSCy Schubert * we may want to read now */ 313*2b15cb3dSCy Schubert 314*2b15cb3dSCy Schubert bufferevent_incref_and_lock_(bev); 315*2b15cb3dSCy Schubert 316*2b15cb3dSCy Schubert if (cbinfo->n_deleted) 317*2b15cb3dSCy Schubert bev_async_consider_reading(bev_async); 318*2b15cb3dSCy Schubert 319*2b15cb3dSCy Schubert bufferevent_decref_and_unlock_(bev); 320*2b15cb3dSCy Schubert } 321*2b15cb3dSCy Schubert 322*2b15cb3dSCy Schubert static int 323*2b15cb3dSCy Schubert be_async_enable(struct bufferevent *buf, short what) 324*2b15cb3dSCy Schubert { 325*2b15cb3dSCy Schubert struct bufferevent_async *bev_async = upcast(buf); 326*2b15cb3dSCy Schubert 327*2b15cb3dSCy Schubert if (!bev_async->ok) 328*2b15cb3dSCy Schubert return -1; 329*2b15cb3dSCy Schubert 330*2b15cb3dSCy Schubert if (bev_async->bev.connecting) { 331*2b15cb3dSCy Schubert /* Don't launch anything during connection attempts. */ 332*2b15cb3dSCy Schubert return 0; 333*2b15cb3dSCy Schubert } 334*2b15cb3dSCy Schubert 335*2b15cb3dSCy Schubert if (what & EV_READ) 336*2b15cb3dSCy Schubert BEV_RESET_GENERIC_READ_TIMEOUT(buf); 337*2b15cb3dSCy Schubert if (what & EV_WRITE) 338*2b15cb3dSCy Schubert BEV_RESET_GENERIC_WRITE_TIMEOUT(buf); 339*2b15cb3dSCy Schubert 340*2b15cb3dSCy Schubert /* If we newly enable reading or writing, and we aren't reading or 341*2b15cb3dSCy Schubert writing already, consider launching a new read or write. */ 342*2b15cb3dSCy Schubert 343*2b15cb3dSCy Schubert if (what & EV_READ) 344*2b15cb3dSCy Schubert bev_async_consider_reading(bev_async); 345*2b15cb3dSCy Schubert if (what & EV_WRITE) 346*2b15cb3dSCy Schubert bev_async_consider_writing(bev_async); 347*2b15cb3dSCy Schubert return 0; 348*2b15cb3dSCy Schubert } 349*2b15cb3dSCy Schubert 350*2b15cb3dSCy Schubert static int 351*2b15cb3dSCy Schubert be_async_disable(struct bufferevent *bev, short what) 352*2b15cb3dSCy Schubert { 353*2b15cb3dSCy Schubert struct bufferevent_async *bev_async = upcast(bev); 354*2b15cb3dSCy Schubert /* XXXX If we disable reading or writing, we may want to consider 355*2b15cb3dSCy Schubert * canceling any in-progress read or write operation, though it might 356*2b15cb3dSCy Schubert * not work. */ 357*2b15cb3dSCy Schubert 358*2b15cb3dSCy Schubert if (what & EV_READ) { 359*2b15cb3dSCy Schubert BEV_DEL_GENERIC_READ_TIMEOUT(bev); 360*2b15cb3dSCy Schubert bev_async_del_read(bev_async); 361*2b15cb3dSCy Schubert } 362*2b15cb3dSCy Schubert if (what & EV_WRITE) { 363*2b15cb3dSCy Schubert BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 364*2b15cb3dSCy Schubert bev_async_del_write(bev_async); 365*2b15cb3dSCy Schubert } 366*2b15cb3dSCy Schubert 367*2b15cb3dSCy Schubert return 0; 368*2b15cb3dSCy Schubert } 369*2b15cb3dSCy Schubert 370*2b15cb3dSCy Schubert static void 371*2b15cb3dSCy Schubert be_async_destruct(struct bufferevent *bev) 372*2b15cb3dSCy Schubert { 373*2b15cb3dSCy Schubert struct bufferevent_async *bev_async = upcast(bev); 374*2b15cb3dSCy Schubert struct bufferevent_private *bev_p = BEV_UPCAST(bev); 375*2b15cb3dSCy Schubert evutil_socket_t fd; 376*2b15cb3dSCy Schubert 377*2b15cb3dSCy Schubert EVUTIL_ASSERT(!upcast(bev)->write_in_progress && 378*2b15cb3dSCy Schubert !upcast(bev)->read_in_progress); 379*2b15cb3dSCy Schubert 380*2b15cb3dSCy Schubert bev_async_del_read(bev_async); 381*2b15cb3dSCy Schubert bev_async_del_write(bev_async); 382*2b15cb3dSCy Schubert 383*2b15cb3dSCy Schubert fd = evbuffer_overlapped_get_fd_(bev->input); 384*2b15cb3dSCy Schubert if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) { 385*2b15cb3dSCy Schubert /* XXXX possible double-close */ 386*2b15cb3dSCy Schubert evutil_closesocket(fd); 387*2b15cb3dSCy Schubert } 388*2b15cb3dSCy Schubert } 389*2b15cb3dSCy Schubert 390*2b15cb3dSCy Schubert /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so 391*2b15cb3dSCy Schubert * we use WSAGetOverlappedResult to translate. */ 392*2b15cb3dSCy Schubert static void 393*2b15cb3dSCy Schubert bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo) 394*2b15cb3dSCy Schubert { 395*2b15cb3dSCy Schubert DWORD bytes, flags; 396*2b15cb3dSCy Schubert evutil_socket_t fd; 397*2b15cb3dSCy Schubert 398*2b15cb3dSCy Schubert fd = evbuffer_overlapped_get_fd_(bev->input); 399*2b15cb3dSCy Schubert WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags); 400*2b15cb3dSCy Schubert } 401*2b15cb3dSCy Schubert 402*2b15cb3dSCy Schubert static int 403*2b15cb3dSCy Schubert be_async_flush(struct bufferevent *bev, short what, 404*2b15cb3dSCy Schubert enum bufferevent_flush_mode mode) 405*2b15cb3dSCy Schubert { 406*2b15cb3dSCy Schubert return 0; 407*2b15cb3dSCy Schubert } 408*2b15cb3dSCy Schubert 409*2b15cb3dSCy Schubert static void 410*2b15cb3dSCy Schubert connect_complete(struct event_overlapped *eo, ev_uintptr_t key, 411*2b15cb3dSCy Schubert ev_ssize_t nbytes, int ok) 412*2b15cb3dSCy Schubert { 413*2b15cb3dSCy Schubert struct bufferevent_async *bev_a = upcast_connect(eo); 414*2b15cb3dSCy Schubert struct bufferevent *bev = &bev_a->bev.bev; 415*2b15cb3dSCy Schubert evutil_socket_t sock; 416*2b15cb3dSCy Schubert 417*2b15cb3dSCy Schubert BEV_LOCK(bev); 418*2b15cb3dSCy Schubert 419*2b15cb3dSCy Schubert EVUTIL_ASSERT(bev_a->bev.connecting); 420*2b15cb3dSCy Schubert bev_a->bev.connecting = 0; 421*2b15cb3dSCy Schubert sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input); 422*2b15cb3dSCy Schubert /* XXXX Handle error? */ 423*2b15cb3dSCy Schubert setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); 424*2b15cb3dSCy Schubert 425*2b15cb3dSCy Schubert if (ok) 426*2b15cb3dSCy Schubert bufferevent_async_set_connected_(bev); 427*2b15cb3dSCy Schubert else 428*2b15cb3dSCy Schubert bev_async_set_wsa_error(bev, eo); 429*2b15cb3dSCy Schubert 430*2b15cb3dSCy Schubert bufferevent_run_eventcb_(bev, 431*2b15cb3dSCy Schubert ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0); 432*2b15cb3dSCy Schubert 433*2b15cb3dSCy Schubert event_base_del_virtual_(bev->ev_base); 434*2b15cb3dSCy Schubert 435*2b15cb3dSCy Schubert bufferevent_decref_and_unlock_(bev); 436*2b15cb3dSCy Schubert } 437*2b15cb3dSCy Schubert 438*2b15cb3dSCy Schubert static void 439*2b15cb3dSCy Schubert read_complete(struct event_overlapped *eo, ev_uintptr_t key, 440*2b15cb3dSCy Schubert ev_ssize_t nbytes, int ok) 441*2b15cb3dSCy Schubert { 442*2b15cb3dSCy Schubert struct bufferevent_async *bev_a = upcast_read(eo); 443*2b15cb3dSCy Schubert struct bufferevent *bev = &bev_a->bev.bev; 444*2b15cb3dSCy Schubert short what = BEV_EVENT_READING; 445*2b15cb3dSCy Schubert ev_ssize_t amount_unread; 446*2b15cb3dSCy Schubert BEV_LOCK(bev); 447*2b15cb3dSCy Schubert EVUTIL_ASSERT(bev_a->read_in_progress); 448*2b15cb3dSCy Schubert 449*2b15cb3dSCy Schubert amount_unread = bev_a->read_in_progress - nbytes; 450*2b15cb3dSCy Schubert evbuffer_commit_read_(bev->input, nbytes); 451*2b15cb3dSCy Schubert bev_a->read_in_progress = 0; 452*2b15cb3dSCy Schubert if (amount_unread) 453*2b15cb3dSCy Schubert bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread); 454*2b15cb3dSCy Schubert 455*2b15cb3dSCy Schubert if (!ok) 456*2b15cb3dSCy Schubert bev_async_set_wsa_error(bev, eo); 457*2b15cb3dSCy Schubert 458*2b15cb3dSCy Schubert if (bev_a->ok) { 459*2b15cb3dSCy Schubert if (ok && nbytes) { 460*2b15cb3dSCy Schubert BEV_RESET_GENERIC_READ_TIMEOUT(bev); 461*2b15cb3dSCy Schubert bufferevent_trigger_nolock_(bev, EV_READ, 0); 462*2b15cb3dSCy Schubert bev_async_consider_reading(bev_a); 463*2b15cb3dSCy Schubert } else if (!ok) { 464*2b15cb3dSCy Schubert what |= BEV_EVENT_ERROR; 465*2b15cb3dSCy Schubert bev_a->ok = 0; 466*2b15cb3dSCy Schubert bufferevent_run_eventcb_(bev, what, 0); 467*2b15cb3dSCy Schubert } else if (!nbytes) { 468*2b15cb3dSCy Schubert what |= BEV_EVENT_EOF; 469*2b15cb3dSCy Schubert bev_a->ok = 0; 470*2b15cb3dSCy Schubert bufferevent_run_eventcb_(bev, what, 0); 471*2b15cb3dSCy Schubert } 472*2b15cb3dSCy Schubert } 473*2b15cb3dSCy Schubert 474*2b15cb3dSCy Schubert bufferevent_decref_and_unlock_(bev); 475*2b15cb3dSCy Schubert } 476*2b15cb3dSCy Schubert 477*2b15cb3dSCy Schubert static void 478*2b15cb3dSCy Schubert write_complete(struct event_overlapped *eo, ev_uintptr_t key, 479*2b15cb3dSCy Schubert ev_ssize_t nbytes, int ok) 480*2b15cb3dSCy Schubert { 481*2b15cb3dSCy Schubert struct bufferevent_async *bev_a = upcast_write(eo); 482*2b15cb3dSCy Schubert struct bufferevent *bev = &bev_a->bev.bev; 483*2b15cb3dSCy Schubert short what = BEV_EVENT_WRITING; 484*2b15cb3dSCy Schubert ev_ssize_t amount_unwritten; 485*2b15cb3dSCy Schubert 486*2b15cb3dSCy Schubert BEV_LOCK(bev); 487*2b15cb3dSCy Schubert EVUTIL_ASSERT(bev_a->write_in_progress); 488*2b15cb3dSCy Schubert 489*2b15cb3dSCy Schubert amount_unwritten = bev_a->write_in_progress - nbytes; 490*2b15cb3dSCy Schubert evbuffer_commit_write_(bev->output, nbytes); 491*2b15cb3dSCy Schubert bev_a->write_in_progress = 0; 492*2b15cb3dSCy Schubert 493*2b15cb3dSCy Schubert if (amount_unwritten) 494*2b15cb3dSCy Schubert bufferevent_decrement_write_buckets_(&bev_a->bev, 495*2b15cb3dSCy Schubert -amount_unwritten); 496*2b15cb3dSCy Schubert 497*2b15cb3dSCy Schubert 498*2b15cb3dSCy Schubert if (!ok) 499*2b15cb3dSCy Schubert bev_async_set_wsa_error(bev, eo); 500*2b15cb3dSCy Schubert 501*2b15cb3dSCy Schubert if (bev_a->ok) { 502*2b15cb3dSCy Schubert if (ok && nbytes) { 503*2b15cb3dSCy Schubert BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); 504*2b15cb3dSCy Schubert bufferevent_trigger_nolock_(bev, EV_WRITE, 0); 505*2b15cb3dSCy Schubert bev_async_consider_writing(bev_a); 506*2b15cb3dSCy Schubert } else if (!ok) { 507*2b15cb3dSCy Schubert what |= BEV_EVENT_ERROR; 508*2b15cb3dSCy Schubert bev_a->ok = 0; 509*2b15cb3dSCy Schubert bufferevent_run_eventcb_(bev, what, 0); 510*2b15cb3dSCy Schubert } else if (!nbytes) { 511*2b15cb3dSCy Schubert what |= BEV_EVENT_EOF; 512*2b15cb3dSCy Schubert bev_a->ok = 0; 513*2b15cb3dSCy Schubert bufferevent_run_eventcb_(bev, what, 0); 514*2b15cb3dSCy Schubert } 515*2b15cb3dSCy Schubert } 516*2b15cb3dSCy Schubert 517*2b15cb3dSCy Schubert bufferevent_decref_and_unlock_(bev); 518*2b15cb3dSCy Schubert } 519*2b15cb3dSCy Schubert 520*2b15cb3dSCy Schubert struct bufferevent * 521*2b15cb3dSCy Schubert bufferevent_async_new_(struct event_base *base, 522*2b15cb3dSCy Schubert evutil_socket_t fd, int options) 523*2b15cb3dSCy Schubert { 524*2b15cb3dSCy Schubert struct bufferevent_async *bev_a; 525*2b15cb3dSCy Schubert struct bufferevent *bev; 526*2b15cb3dSCy Schubert struct event_iocp_port *iocp; 527*2b15cb3dSCy Schubert 528*2b15cb3dSCy Schubert options |= BEV_OPT_THREADSAFE; 529*2b15cb3dSCy Schubert 530*2b15cb3dSCy Schubert if (!(iocp = event_base_get_iocp_(base))) 531*2b15cb3dSCy Schubert return NULL; 532*2b15cb3dSCy Schubert 533*2b15cb3dSCy Schubert if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) { 534*2b15cb3dSCy Schubert int err = GetLastError(); 535*2b15cb3dSCy Schubert /* We may have alrady associated this fd with a port. 536*2b15cb3dSCy Schubert * Let's hope it's this port, and that the error code 537*2b15cb3dSCy Schubert * for doing this neer changes. */ 538*2b15cb3dSCy Schubert if (err != ERROR_INVALID_PARAMETER) 539*2b15cb3dSCy Schubert return NULL; 540*2b15cb3dSCy Schubert } 541*2b15cb3dSCy Schubert 542*2b15cb3dSCy Schubert if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async)))) 543*2b15cb3dSCy Schubert return NULL; 544*2b15cb3dSCy Schubert 545*2b15cb3dSCy Schubert bev = &bev_a->bev.bev; 546*2b15cb3dSCy Schubert if (!(bev->input = evbuffer_overlapped_new_(fd))) { 547*2b15cb3dSCy Schubert mm_free(bev_a); 548*2b15cb3dSCy Schubert return NULL; 549*2b15cb3dSCy Schubert } 550*2b15cb3dSCy Schubert if (!(bev->output = evbuffer_overlapped_new_(fd))) { 551*2b15cb3dSCy Schubert evbuffer_free(bev->input); 552*2b15cb3dSCy Schubert mm_free(bev_a); 553*2b15cb3dSCy Schubert return NULL; 554*2b15cb3dSCy Schubert } 555*2b15cb3dSCy Schubert 556*2b15cb3dSCy Schubert if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async, 557*2b15cb3dSCy Schubert options)<0) 558*2b15cb3dSCy Schubert goto err; 559*2b15cb3dSCy Schubert 560*2b15cb3dSCy Schubert evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); 561*2b15cb3dSCy Schubert evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev); 562*2b15cb3dSCy Schubert 563*2b15cb3dSCy Schubert event_overlapped_init_(&bev_a->connect_overlapped, connect_complete); 564*2b15cb3dSCy Schubert event_overlapped_init_(&bev_a->read_overlapped, read_complete); 565*2b15cb3dSCy Schubert event_overlapped_init_(&bev_a->write_overlapped, write_complete); 566*2b15cb3dSCy Schubert 567*2b15cb3dSCy Schubert bev_a->ok = fd >= 0; 568*2b15cb3dSCy Schubert if (bev_a->ok) 569*2b15cb3dSCy Schubert bufferevent_init_generic_timeout_cbs_(bev); 570*2b15cb3dSCy Schubert 571*2b15cb3dSCy Schubert return bev; 572*2b15cb3dSCy Schubert err: 573*2b15cb3dSCy Schubert bufferevent_free(&bev_a->bev.bev); 574*2b15cb3dSCy Schubert return NULL; 575*2b15cb3dSCy Schubert } 576*2b15cb3dSCy Schubert 577*2b15cb3dSCy Schubert void 578*2b15cb3dSCy Schubert bufferevent_async_set_connected_(struct bufferevent *bev) 579*2b15cb3dSCy Schubert { 580*2b15cb3dSCy Schubert struct bufferevent_async *bev_async = upcast(bev); 581*2b15cb3dSCy Schubert bev_async->ok = 1; 582*2b15cb3dSCy Schubert bufferevent_init_generic_timeout_cbs_(bev); 583*2b15cb3dSCy Schubert /* Now's a good time to consider reading/writing */ 584*2b15cb3dSCy Schubert be_async_enable(bev, bev->enabled); 585*2b15cb3dSCy Schubert } 586*2b15cb3dSCy Schubert 587*2b15cb3dSCy Schubert int 588*2b15cb3dSCy Schubert bufferevent_async_can_connect_(struct bufferevent *bev) 589*2b15cb3dSCy Schubert { 590*2b15cb3dSCy Schubert const struct win32_extension_fns *ext = 591*2b15cb3dSCy Schubert event_get_win32_extension_fns_(); 592*2b15cb3dSCy Schubert 593*2b15cb3dSCy Schubert if (BEV_IS_ASYNC(bev) && 594*2b15cb3dSCy Schubert event_base_get_iocp_(bev->ev_base) && 595*2b15cb3dSCy Schubert ext && ext->ConnectEx) 596*2b15cb3dSCy Schubert return 1; 597*2b15cb3dSCy Schubert 598*2b15cb3dSCy Schubert return 0; 599*2b15cb3dSCy Schubert } 600*2b15cb3dSCy Schubert 601*2b15cb3dSCy Schubert int 602*2b15cb3dSCy Schubert bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd, 603*2b15cb3dSCy Schubert const struct sockaddr *sa, int socklen) 604*2b15cb3dSCy Schubert { 605*2b15cb3dSCy Schubert BOOL rc; 606*2b15cb3dSCy Schubert struct bufferevent_async *bev_async = upcast(bev); 607*2b15cb3dSCy Schubert struct sockaddr_storage ss; 608*2b15cb3dSCy Schubert const struct win32_extension_fns *ext = 609*2b15cb3dSCy Schubert event_get_win32_extension_fns_(); 610*2b15cb3dSCy Schubert 611*2b15cb3dSCy Schubert EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL); 612*2b15cb3dSCy Schubert 613*2b15cb3dSCy Schubert /* ConnectEx() requires that the socket be bound to an address 614*2b15cb3dSCy Schubert * with bind() before using, otherwise it will fail. We attempt 615*2b15cb3dSCy Schubert * to issue a bind() here, taking into account that the error 616*2b15cb3dSCy Schubert * code is set to WSAEINVAL when the socket is already bound. */ 617*2b15cb3dSCy Schubert memset(&ss, 0, sizeof(ss)); 618*2b15cb3dSCy Schubert if (sa->sa_family == AF_INET) { 619*2b15cb3dSCy Schubert struct sockaddr_in *sin = (struct sockaddr_in *)&ss; 620*2b15cb3dSCy Schubert sin->sin_family = AF_INET; 621*2b15cb3dSCy Schubert sin->sin_addr.s_addr = INADDR_ANY; 622*2b15cb3dSCy Schubert } else if (sa->sa_family == AF_INET6) { 623*2b15cb3dSCy Schubert struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss; 624*2b15cb3dSCy Schubert sin6->sin6_family = AF_INET6; 625*2b15cb3dSCy Schubert sin6->sin6_addr = in6addr_any; 626*2b15cb3dSCy Schubert } else { 627*2b15cb3dSCy Schubert /* Well, the user will have to bind() */ 628*2b15cb3dSCy Schubert return -1; 629*2b15cb3dSCy Schubert } 630*2b15cb3dSCy Schubert if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 && 631*2b15cb3dSCy Schubert WSAGetLastError() != WSAEINVAL) 632*2b15cb3dSCy Schubert return -1; 633*2b15cb3dSCy Schubert 634*2b15cb3dSCy Schubert event_base_add_virtual_(bev->ev_base); 635*2b15cb3dSCy Schubert bufferevent_incref_(bev); 636*2b15cb3dSCy Schubert rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL, 637*2b15cb3dSCy Schubert &bev_async->connect_overlapped.overlapped); 638*2b15cb3dSCy Schubert if (rc || WSAGetLastError() == ERROR_IO_PENDING) 639*2b15cb3dSCy Schubert return 0; 640*2b15cb3dSCy Schubert 641*2b15cb3dSCy Schubert event_base_del_virtual_(bev->ev_base); 642*2b15cb3dSCy Schubert bufferevent_decref_(bev); 643*2b15cb3dSCy Schubert 644*2b15cb3dSCy Schubert return -1; 645*2b15cb3dSCy Schubert } 646*2b15cb3dSCy Schubert 647*2b15cb3dSCy Schubert static int 648*2b15cb3dSCy Schubert be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 649*2b15cb3dSCy Schubert union bufferevent_ctrl_data *data) 650*2b15cb3dSCy Schubert { 651*2b15cb3dSCy Schubert switch (op) { 652*2b15cb3dSCy Schubert case BEV_CTRL_GET_FD: 653*2b15cb3dSCy Schubert data->fd = evbuffer_overlapped_get_fd_(bev->input); 654*2b15cb3dSCy Schubert return 0; 655*2b15cb3dSCy Schubert case BEV_CTRL_SET_FD: { 656*2b15cb3dSCy Schubert struct event_iocp_port *iocp; 657*2b15cb3dSCy Schubert 658*2b15cb3dSCy Schubert if (data->fd == evbuffer_overlapped_get_fd_(bev->input)) 659*2b15cb3dSCy Schubert return 0; 660*2b15cb3dSCy Schubert if (!(iocp = event_base_get_iocp_(bev->ev_base))) 661*2b15cb3dSCy Schubert return -1; 662*2b15cb3dSCy Schubert if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) 663*2b15cb3dSCy Schubert return -1; 664*2b15cb3dSCy Schubert evbuffer_overlapped_set_fd_(bev->input, data->fd); 665*2b15cb3dSCy Schubert evbuffer_overlapped_set_fd_(bev->output, data->fd); 666*2b15cb3dSCy Schubert return 0; 667*2b15cb3dSCy Schubert } 668*2b15cb3dSCy Schubert case BEV_CTRL_CANCEL_ALL: { 669*2b15cb3dSCy Schubert struct bufferevent_async *bev_a = upcast(bev); 670*2b15cb3dSCy Schubert evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input); 671*2b15cb3dSCy Schubert if (fd != (evutil_socket_t)INVALID_SOCKET && 672*2b15cb3dSCy Schubert (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) { 673*2b15cb3dSCy Schubert closesocket(fd); 674*2b15cb3dSCy Schubert } 675*2b15cb3dSCy Schubert bev_a->ok = 0; 676*2b15cb3dSCy Schubert return 0; 677*2b15cb3dSCy Schubert } 678*2b15cb3dSCy Schubert case BEV_CTRL_GET_UNDERLYING: 679*2b15cb3dSCy Schubert default: 680*2b15cb3dSCy Schubert return -1; 681*2b15cb3dSCy Schubert } 682*2b15cb3dSCy Schubert } 683*2b15cb3dSCy Schubert 684*2b15cb3dSCy Schubert 685