1 /* 2 * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson 3 * 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 3. The name of the author may not be used to endorse or promote products 15 * derived from this software without specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 #include "event2/event-config.h" 30 #include "evconfig-private.h" 31 32 #ifdef EVENT__HAVE_SYS_TIME_H 33 #include <sys/time.h> 34 #endif 35 36 #include <errno.h> 37 #include <stdio.h> 38 #include <stdlib.h> 39 #include <string.h> 40 #ifdef EVENT__HAVE_STDARG_H 41 #include <stdarg.h> 42 #endif 43 #ifdef EVENT__HAVE_UNISTD_H 44 #include <unistd.h> 45 #endif 46 47 #ifdef _WIN32 48 #include <winsock2.h> 49 #include <winerror.h> 50 #include <ws2tcpip.h> 51 #endif 52 53 #include <sys/queue.h> 54 55 #include "event2/util.h" 56 #include "event2/bufferevent.h" 57 #include "event2/buffer.h" 58 #include "event2/bufferevent_struct.h" 59 #include "event2/event.h" 60 #include "event2/util.h" 61 #include "event-internal.h" 62 #include "log-internal.h" 63 #include "mm-internal.h" 64 #include "bufferevent-internal.h" 65 #include "util-internal.h" 66 #include "iocp-internal.h" 67 68 #ifndef SO_UPDATE_CONNECT_CONTEXT 69 /* Mingw is sometimes missing this */ 70 #define SO_UPDATE_CONNECT_CONTEXT 0x7010 71 #endif 72 73 /* prototypes */ 74 static int be_async_enable(struct bufferevent *, short); 75 static int be_async_disable(struct bufferevent *, short); 76 static void be_async_destruct(struct bufferevent *); 77 static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode); 78 static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 79 80 struct bufferevent_async { 81 struct bufferevent_private bev; 82 struct event_overlapped connect_overlapped; 83 struct event_overlapped read_overlapped; 84 struct event_overlapped write_overlapped; 85 size_t read_in_progress; 86 size_t write_in_progress; 87 unsigned ok : 1; 88 unsigned read_added : 1; 89 unsigned write_added : 1; 90 }; 91 92 const struct bufferevent_ops bufferevent_ops_async = { 93 "socket_async", 94 evutil_offsetof(struct bufferevent_async, bev.bev), 95 be_async_enable, 96 be_async_disable, 97 NULL, /* Unlink */ 98 be_async_destruct, 99 bufferevent_generic_adj_timeouts_, 100 be_async_flush, 101 be_async_ctrl, 102 }; 103 104 static inline void 105 be_async_run_eventcb(struct bufferevent *bev, short what, int options) 106 { bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); } 107 108 static inline void 109 be_async_trigger_nolock(struct bufferevent *bev, short what, int options) 110 { bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); } 111 112 static inline int 113 fatal_error(int err) 114 { 115 switch (err) { 116 /* We may have already associated this fd with a port. 117 * Let's hope it's this port, and that the error code 118 * for doing this neer changes. */ 119 case ERROR_INVALID_PARAMETER: 120 return 0; 121 } 122 return 1; 123 } 124 125 static inline struct bufferevent_async * 126 upcast(struct bufferevent *bev) 127 { 128 struct bufferevent_async *bev_a; 129 if (!BEV_IS_ASYNC(bev)) 130 return NULL; 131 bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); 132 return bev_a; 133 } 134 135 static inline struct bufferevent_async * 136 upcast_connect(struct event_overlapped *eo) 137 { 138 struct bufferevent_async *bev_a; 139 bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped); 140 EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 141 return bev_a; 142 } 143 144 static inline struct bufferevent_async * 145 upcast_read(struct event_overlapped *eo) 146 { 147 struct bufferevent_async *bev_a; 148 bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped); 149 EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 150 return bev_a; 151 } 152 153 static inline struct bufferevent_async * 154 upcast_write(struct event_overlapped *eo) 155 { 156 struct bufferevent_async *bev_a; 157 bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped); 158 EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 159 return bev_a; 160 } 161 162 static void 163 bev_async_del_write(struct bufferevent_async *beva) 164 { 165 struct bufferevent *bev = &beva->bev.bev; 166 167 if (beva->write_added) { 168 beva->write_added = 0; 169 event_base_del_virtual_(bev->ev_base); 170 } 171 } 172 173 static void 174 bev_async_del_read(struct bufferevent_async *beva) 175 { 176 struct bufferevent *bev = &beva->bev.bev; 177 178 if (beva->read_added) { 179 beva->read_added = 0; 180 event_base_del_virtual_(bev->ev_base); 181 } 182 } 183 184 static void 185 bev_async_add_write(struct bufferevent_async *beva) 186 { 187 struct bufferevent *bev = &beva->bev.bev; 188 189 if (!beva->write_added) { 190 beva->write_added = 1; 191 event_base_add_virtual_(bev->ev_base); 192 } 193 } 194 195 static void 196 bev_async_add_read(struct bufferevent_async *beva) 197 { 198 struct bufferevent *bev = &beva->bev.bev; 199 200 if (!beva->read_added) { 201 beva->read_added = 1; 202 event_base_add_virtual_(bev->ev_base); 203 } 204 } 205 206 static void 207 bev_async_consider_writing(struct bufferevent_async *beva) 208 { 209 size_t at_most; 210 int limit; 211 struct bufferevent *bev = &beva->bev.bev; 212 213 /* Don't write if there's a write in progress, or we do not 214 * want to write, or when there's nothing left to write. */ 215 if (beva->write_in_progress || beva->bev.connecting) 216 return; 217 if (!beva->ok || !(bev->enabled&EV_WRITE) || 218 !evbuffer_get_length(bev->output)) { 219 bev_async_del_write(beva); 220 return; 221 } 222 223 at_most = evbuffer_get_length(bev->output); 224 225 /* This is safe so long as bufferevent_get_write_max never returns 226 * more than INT_MAX. That's true for now. XXXX */ 227 limit = (int)bufferevent_get_write_max_(&beva->bev); 228 if (at_most >= (size_t)limit && limit >= 0) 229 at_most = limit; 230 231 if (beva->bev.write_suspended) { 232 bev_async_del_write(beva); 233 return; 234 } 235 236 /* XXXX doesn't respect low-water mark very well. */ 237 bufferevent_incref_(bev); 238 if (evbuffer_launch_write_(bev->output, at_most, 239 &beva->write_overlapped)) { 240 bufferevent_decref_(bev); 241 beva->ok = 0; 242 be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0); 243 } else { 244 beva->write_in_progress = at_most; 245 bufferevent_decrement_write_buckets_(&beva->bev, at_most); 246 bev_async_add_write(beva); 247 } 248 } 249 250 static void 251 bev_async_consider_reading(struct bufferevent_async *beva) 252 { 253 size_t cur_size; 254 size_t read_high; 255 size_t at_most; 256 int limit; 257 struct bufferevent *bev = &beva->bev.bev; 258 259 /* Don't read if there is a read in progress, or we do not 260 * want to read. */ 261 if (beva->read_in_progress || beva->bev.connecting) 262 return; 263 if (!beva->ok || !(bev->enabled&EV_READ)) { 264 bev_async_del_read(beva); 265 return; 266 } 267 268 /* Don't read if we're full */ 269 cur_size = evbuffer_get_length(bev->input); 270 read_high = bev->wm_read.high; 271 if (read_high) { 272 if (cur_size >= read_high) { 273 bev_async_del_read(beva); 274 return; 275 } 276 at_most = read_high - cur_size; 277 } else { 278 at_most = 16384; /* FIXME totally magic. */ 279 } 280 281 /* XXXX This over-commits. */ 282 /* XXXX see also not above on cast on bufferevent_get_write_max_() */ 283 limit = (int)bufferevent_get_read_max_(&beva->bev); 284 if (at_most >= (size_t)limit && limit >= 0) 285 at_most = limit; 286 287 if (beva->bev.read_suspended) { 288 bev_async_del_read(beva); 289 return; 290 } 291 292 bufferevent_incref_(bev); 293 if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) { 294 beva->ok = 0; 295 be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0); 296 bufferevent_decref_(bev); 297 } else { 298 beva->read_in_progress = at_most; 299 bufferevent_decrement_read_buckets_(&beva->bev, at_most); 300 bev_async_add_read(beva); 301 } 302 303 return; 304 } 305 306 static void 307 be_async_outbuf_callback(struct evbuffer *buf, 308 const struct evbuffer_cb_info *cbinfo, 309 void *arg) 310 { 311 struct bufferevent *bev = arg; 312 struct bufferevent_async *bev_async = upcast(bev); 313 314 /* If we added data to the outbuf and were not writing before, 315 * we may want to write now. */ 316 317 bufferevent_incref_and_lock_(bev); 318 319 if (cbinfo->n_added) 320 bev_async_consider_writing(bev_async); 321 322 bufferevent_decref_and_unlock_(bev); 323 } 324 325 static void 326 be_async_inbuf_callback(struct evbuffer *buf, 327 const struct evbuffer_cb_info *cbinfo, 328 void *arg) 329 { 330 struct bufferevent *bev = arg; 331 struct bufferevent_async *bev_async = upcast(bev); 332 333 /* If we drained data from the inbuf and were not reading before, 334 * we may want to read now */ 335 336 bufferevent_incref_and_lock_(bev); 337 338 if (cbinfo->n_deleted) 339 bev_async_consider_reading(bev_async); 340 341 bufferevent_decref_and_unlock_(bev); 342 } 343 344 static int 345 be_async_enable(struct bufferevent *buf, short what) 346 { 347 struct bufferevent_async *bev_async = upcast(buf); 348 349 if (!bev_async->ok) 350 return -1; 351 352 if (bev_async->bev.connecting) { 353 /* Don't launch anything during connection attempts. */ 354 return 0; 355 } 356 357 if (what & EV_READ) 358 BEV_RESET_GENERIC_READ_TIMEOUT(buf); 359 if (what & EV_WRITE) 360 BEV_RESET_GENERIC_WRITE_TIMEOUT(buf); 361 362 /* If we newly enable reading or writing, and we aren't reading or 363 writing already, consider launching a new read or write. */ 364 365 if (what & EV_READ) 366 bev_async_consider_reading(bev_async); 367 if (what & EV_WRITE) 368 bev_async_consider_writing(bev_async); 369 return 0; 370 } 371 372 static int 373 be_async_disable(struct bufferevent *bev, short what) 374 { 375 struct bufferevent_async *bev_async = upcast(bev); 376 /* XXXX If we disable reading or writing, we may want to consider 377 * canceling any in-progress read or write operation, though it might 378 * not work. */ 379 380 if (what & EV_READ) { 381 BEV_DEL_GENERIC_READ_TIMEOUT(bev); 382 bev_async_del_read(bev_async); 383 } 384 if (what & EV_WRITE) { 385 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 386 bev_async_del_write(bev_async); 387 } 388 389 return 0; 390 } 391 392 static void 393 be_async_destruct(struct bufferevent *bev) 394 { 395 struct bufferevent_async *bev_async = upcast(bev); 396 struct bufferevent_private *bev_p = BEV_UPCAST(bev); 397 evutil_socket_t fd; 398 399 EVUTIL_ASSERT(!upcast(bev)->write_in_progress && 400 !upcast(bev)->read_in_progress); 401 402 bev_async_del_read(bev_async); 403 bev_async_del_write(bev_async); 404 405 fd = evbuffer_overlapped_get_fd_(bev->input); 406 if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET && 407 (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) { 408 evutil_closesocket(fd); 409 evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET); 410 } 411 } 412 413 /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so 414 * we use WSAGetOverlappedResult to translate. */ 415 static void 416 bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo) 417 { 418 DWORD bytes, flags; 419 evutil_socket_t fd; 420 421 fd = evbuffer_overlapped_get_fd_(bev->input); 422 WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags); 423 } 424 425 static int 426 be_async_flush(struct bufferevent *bev, short what, 427 enum bufferevent_flush_mode mode) 428 { 429 return 0; 430 } 431 432 static void 433 connect_complete(struct event_overlapped *eo, ev_uintptr_t key, 434 ev_ssize_t nbytes, int ok) 435 { 436 struct bufferevent_async *bev_a = upcast_connect(eo); 437 struct bufferevent *bev = &bev_a->bev.bev; 438 evutil_socket_t sock; 439 440 BEV_LOCK(bev); 441 442 EVUTIL_ASSERT(bev_a->bev.connecting); 443 bev_a->bev.connecting = 0; 444 sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input); 445 /* XXXX Handle error? */ 446 setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); 447 448 if (ok) 449 bufferevent_async_set_connected_(bev); 450 else 451 bev_async_set_wsa_error(bev, eo); 452 453 be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0); 454 455 event_base_del_virtual_(bev->ev_base); 456 457 bufferevent_decref_and_unlock_(bev); 458 } 459 460 static void 461 read_complete(struct event_overlapped *eo, ev_uintptr_t key, 462 ev_ssize_t nbytes, int ok) 463 { 464 struct bufferevent_async *bev_a = upcast_read(eo); 465 struct bufferevent *bev = &bev_a->bev.bev; 466 short what = BEV_EVENT_READING; 467 ev_ssize_t amount_unread; 468 BEV_LOCK(bev); 469 EVUTIL_ASSERT(bev_a->read_in_progress); 470 471 amount_unread = bev_a->read_in_progress - nbytes; 472 evbuffer_commit_read_(bev->input, nbytes); 473 bev_a->read_in_progress = 0; 474 if (amount_unread) 475 bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread); 476 477 if (!ok) 478 bev_async_set_wsa_error(bev, eo); 479 480 if (bev_a->ok) { 481 if (ok && nbytes) { 482 BEV_RESET_GENERIC_READ_TIMEOUT(bev); 483 be_async_trigger_nolock(bev, EV_READ, 0); 484 bev_async_consider_reading(bev_a); 485 } else if (!ok) { 486 what |= BEV_EVENT_ERROR; 487 bev_a->ok = 0; 488 be_async_run_eventcb(bev, what, 0); 489 } else if (!nbytes) { 490 what |= BEV_EVENT_EOF; 491 bev_a->ok = 0; 492 be_async_run_eventcb(bev, what, 0); 493 } 494 } 495 496 bufferevent_decref_and_unlock_(bev); 497 } 498 499 static void 500 write_complete(struct event_overlapped *eo, ev_uintptr_t key, 501 ev_ssize_t nbytes, int ok) 502 { 503 struct bufferevent_async *bev_a = upcast_write(eo); 504 struct bufferevent *bev = &bev_a->bev.bev; 505 short what = BEV_EVENT_WRITING; 506 ev_ssize_t amount_unwritten; 507 508 BEV_LOCK(bev); 509 EVUTIL_ASSERT(bev_a->write_in_progress); 510 511 amount_unwritten = bev_a->write_in_progress - nbytes; 512 evbuffer_commit_write_(bev->output, nbytes); 513 bev_a->write_in_progress = 0; 514 515 if (amount_unwritten) 516 bufferevent_decrement_write_buckets_(&bev_a->bev, 517 -amount_unwritten); 518 519 520 if (!ok) 521 bev_async_set_wsa_error(bev, eo); 522 523 if (bev_a->ok) { 524 if (ok && nbytes) { 525 BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); 526 be_async_trigger_nolock(bev, EV_WRITE, 0); 527 bev_async_consider_writing(bev_a); 528 } else if (!ok) { 529 what |= BEV_EVENT_ERROR; 530 bev_a->ok = 0; 531 be_async_run_eventcb(bev, what, 0); 532 } else if (!nbytes) { 533 what |= BEV_EVENT_EOF; 534 bev_a->ok = 0; 535 be_async_run_eventcb(bev, what, 0); 536 } 537 } 538 539 bufferevent_decref_and_unlock_(bev); 540 } 541 542 struct bufferevent * 543 bufferevent_async_new_(struct event_base *base, 544 evutil_socket_t fd, int options) 545 { 546 struct bufferevent_async *bev_a; 547 struct bufferevent *bev; 548 struct event_iocp_port *iocp; 549 550 options |= BEV_OPT_THREADSAFE; 551 552 if (!(iocp = event_base_get_iocp_(base))) 553 return NULL; 554 555 if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) { 556 if (fatal_error(GetLastError())) 557 return NULL; 558 } 559 560 if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async)))) 561 return NULL; 562 563 bev = &bev_a->bev.bev; 564 if (!(bev->input = evbuffer_overlapped_new_(fd))) { 565 mm_free(bev_a); 566 return NULL; 567 } 568 if (!(bev->output = evbuffer_overlapped_new_(fd))) { 569 evbuffer_free(bev->input); 570 mm_free(bev_a); 571 return NULL; 572 } 573 574 if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async, 575 options)<0) 576 goto err; 577 578 evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); 579 evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev); 580 581 event_overlapped_init_(&bev_a->connect_overlapped, connect_complete); 582 event_overlapped_init_(&bev_a->read_overlapped, read_complete); 583 event_overlapped_init_(&bev_a->write_overlapped, write_complete); 584 585 bufferevent_init_generic_timeout_cbs_(bev); 586 587 bev_a->ok = fd >= 0; 588 589 return bev; 590 err: 591 bufferevent_free(&bev_a->bev.bev); 592 return NULL; 593 } 594 595 void 596 bufferevent_async_set_connected_(struct bufferevent *bev) 597 { 598 struct bufferevent_async *bev_async = upcast(bev); 599 bev_async->ok = 1; 600 /* Now's a good time to consider reading/writing */ 601 be_async_enable(bev, bev->enabled); 602 } 603 604 int 605 bufferevent_async_can_connect_(struct bufferevent *bev) 606 { 607 const struct win32_extension_fns *ext = 608 event_get_win32_extension_fns_(); 609 610 if (BEV_IS_ASYNC(bev) && 611 event_base_get_iocp_(bev->ev_base) && 612 ext && ext->ConnectEx) 613 return 1; 614 615 return 0; 616 } 617 618 int 619 bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd, 620 const struct sockaddr *sa, int socklen) 621 { 622 BOOL rc; 623 struct bufferevent_async *bev_async = upcast(bev); 624 struct sockaddr_storage ss; 625 const struct win32_extension_fns *ext = 626 event_get_win32_extension_fns_(); 627 628 EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL); 629 630 /* ConnectEx() requires that the socket be bound to an address 631 * with bind() before using, otherwise it will fail. We attempt 632 * to issue a bind() here, taking into account that the error 633 * code is set to WSAEINVAL when the socket is already bound. */ 634 memset(&ss, 0, sizeof(ss)); 635 if (sa->sa_family == AF_INET) { 636 struct sockaddr_in *sin = (struct sockaddr_in *)&ss; 637 sin->sin_family = AF_INET; 638 sin->sin_addr.s_addr = INADDR_ANY; 639 } else if (sa->sa_family == AF_INET6) { 640 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss; 641 sin6->sin6_family = AF_INET6; 642 sin6->sin6_addr = in6addr_any; 643 } else { 644 /* Well, the user will have to bind() */ 645 return -1; 646 } 647 if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 && 648 WSAGetLastError() != WSAEINVAL) 649 return -1; 650 651 event_base_add_virtual_(bev->ev_base); 652 bufferevent_incref_(bev); 653 rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL, 654 &bev_async->connect_overlapped.overlapped); 655 if (rc || WSAGetLastError() == ERROR_IO_PENDING) 656 return 0; 657 658 event_base_del_virtual_(bev->ev_base); 659 bufferevent_decref_(bev); 660 661 return -1; 662 } 663 664 static int 665 be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 666 union bufferevent_ctrl_data *data) 667 { 668 switch (op) { 669 case BEV_CTRL_GET_FD: 670 data->fd = evbuffer_overlapped_get_fd_(bev->input); 671 return 0; 672 case BEV_CTRL_SET_FD: { 673 struct bufferevent_async *bev_a = upcast(bev); 674 struct event_iocp_port *iocp; 675 676 if (data->fd == evbuffer_overlapped_get_fd_(bev->input)) 677 return 0; 678 if (!(iocp = event_base_get_iocp_(bev->ev_base))) 679 return -1; 680 if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) { 681 if (fatal_error(GetLastError())) 682 return -1; 683 } 684 evbuffer_overlapped_set_fd_(bev->input, data->fd); 685 evbuffer_overlapped_set_fd_(bev->output, data->fd); 686 bev_a->ok = data->fd >= 0; 687 return 0; 688 } 689 case BEV_CTRL_CANCEL_ALL: { 690 struct bufferevent_async *bev_a = upcast(bev); 691 evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input); 692 if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET && 693 (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) { 694 closesocket(fd); 695 evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET); 696 } 697 bev_a->ok = 0; 698 return 0; 699 } 700 case BEV_CTRL_GET_UNDERLYING: 701 default: 702 return -1; 703 } 704 } 705 706 707