1 /* 2 * Copyright (c) 2002-2007 Niels Provos <provos@citi.umich.edu> 3 * Copyright (c) 2007-2012 Niels Provos, Nick Mathewson 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "event2/event-config.h" 29 #include "evconfig-private.h" 30 31 #include <sys/types.h> 32 33 #ifdef EVENT__HAVE_SYS_TIME_H 34 #include <sys/time.h> 35 #endif 36 37 #include <errno.h> 38 #include <stdio.h> 39 #include <stdlib.h> 40 #include <string.h> 41 #ifdef EVENT__HAVE_STDARG_H 42 #include <stdarg.h> 43 #endif 44 45 #ifdef _WIN32 46 #include <winsock2.h> 47 #endif 48 49 #include "event2/util.h" 50 #include "event2/buffer.h" 51 #include "event2/buffer_compat.h" 52 #include "event2/bufferevent.h" 53 #include "event2/bufferevent_struct.h" 54 #include "event2/bufferevent_compat.h" 55 #include "event2/event.h" 56 #include "event-internal.h" 57 #include "log-internal.h" 58 #include "mm-internal.h" 59 #include "bufferevent-internal.h" 60 #include "evbuffer-internal.h" 61 #include "util-internal.h" 62 63 static void bufferevent_cancel_all_(struct bufferevent *bev); 64 static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_); 65 66 void 67 bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what) 68 { 69 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 70 BEV_LOCK(bufev); 71 if (!bufev_private->read_suspended) 72 bufev->be_ops->disable(bufev, EV_READ); 73 bufev_private->read_suspended |= what; 74 BEV_UNLOCK(bufev); 75 } 76 77 void 78 bufferevent_unsuspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what) 79 { 80 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 81 BEV_LOCK(bufev); 82 bufev_private->read_suspended &= ~what; 83 if (!bufev_private->read_suspended && (bufev->enabled & EV_READ)) 84 bufev->be_ops->enable(bufev, EV_READ); 85 BEV_UNLOCK(bufev); 86 } 87 88 void 89 bufferevent_suspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what) 90 { 91 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 92 BEV_LOCK(bufev); 93 if (!bufev_private->write_suspended) 94 bufev->be_ops->disable(bufev, EV_WRITE); 95 bufev_private->write_suspended |= what; 96 BEV_UNLOCK(bufev); 97 } 98 99 void 100 bufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what) 101 { 102 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 103 BEV_LOCK(bufev); 104 bufev_private->write_suspended &= ~what; 105 if (!bufev_private->write_suspended && (bufev->enabled & EV_WRITE)) 106 bufev->be_ops->enable(bufev, EV_WRITE); 107 BEV_UNLOCK(bufev); 108 } 109 110 /** 111 * Sometimes bufferevent's implementation can overrun high watermarks 112 * (one of examples is openssl) and in this case if the read callback 113 * will not handle enough data do over condition above the read 114 * callback will never be called again (due to suspend above). 115 * 116 * To avoid this we are scheduling read callback again here, but only 117 * from the user callback to avoid multiple scheduling: 118 * - when the data had been added to it 119 * - when the data had been drained from it (user specified read callback) 120 */ 121 static void bufferevent_inbuf_wm_check(struct bufferevent *bev) 122 { 123 if (!bev->wm_read.high) 124 return; 125 if (!(bev->enabled & EV_READ)) 126 return; 127 if (evbuffer_get_length(bev->input) < bev->wm_read.high) 128 return; 129 130 bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); 131 } 132 133 /* Callback to implement watermarks on the input buffer. Only enabled 134 * if the watermark is set. */ 135 static void 136 bufferevent_inbuf_wm_cb(struct evbuffer *buf, 137 const struct evbuffer_cb_info *cbinfo, 138 void *arg) 139 { 140 struct bufferevent *bufev = arg; 141 size_t size; 142 143 size = evbuffer_get_length(buf); 144 145 if (size >= bufev->wm_read.high) 146 bufferevent_wm_suspend_read(bufev); 147 else 148 bufferevent_wm_unsuspend_read(bufev); 149 } 150 151 static void 152 bufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg) 153 { 154 struct bufferevent_private *bufev_private = arg; 155 struct bufferevent *bufev = &bufev_private->bev; 156 157 BEV_LOCK(bufev); 158 if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) && 159 bufev->errorcb) { 160 /* The "connected" happened before any reads or writes, so 161 send it first. */ 162 bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED; 163 bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg); 164 } 165 if (bufev_private->readcb_pending && bufev->readcb) { 166 bufev_private->readcb_pending = 0; 167 bufev->readcb(bufev, bufev->cbarg); 168 bufferevent_inbuf_wm_check(bufev); 169 } 170 if (bufev_private->writecb_pending && bufev->writecb) { 171 bufev_private->writecb_pending = 0; 172 bufev->writecb(bufev, bufev->cbarg); 173 } 174 if (bufev_private->eventcb_pending && bufev->errorcb) { 175 short what = bufev_private->eventcb_pending; 176 int err = bufev_private->errno_pending; 177 bufev_private->eventcb_pending = 0; 178 bufev_private->errno_pending = 0; 179 EVUTIL_SET_SOCKET_ERROR(err); 180 bufev->errorcb(bufev, what, bufev->cbarg); 181 } 182 bufferevent_decref_and_unlock_(bufev); 183 } 184 185 static void 186 bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg) 187 { 188 struct bufferevent_private *bufev_private = arg; 189 struct bufferevent *bufev = &bufev_private->bev; 190 191 BEV_LOCK(bufev); 192 #define UNLOCKED(stmt) \ 193 do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0) 194 195 if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) && 196 bufev->errorcb) { 197 /* The "connected" happened before any reads or writes, so 198 send it first. */ 199 bufferevent_event_cb errorcb = bufev->errorcb; 200 void *cbarg = bufev->cbarg; 201 bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED; 202 UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg)); 203 } 204 if (bufev_private->readcb_pending && bufev->readcb) { 205 bufferevent_data_cb readcb = bufev->readcb; 206 void *cbarg = bufev->cbarg; 207 bufev_private->readcb_pending = 0; 208 UNLOCKED(readcb(bufev, cbarg)); 209 bufferevent_inbuf_wm_check(bufev); 210 } 211 if (bufev_private->writecb_pending && bufev->writecb) { 212 bufferevent_data_cb writecb = bufev->writecb; 213 void *cbarg = bufev->cbarg; 214 bufev_private->writecb_pending = 0; 215 UNLOCKED(writecb(bufev, cbarg)); 216 } 217 if (bufev_private->eventcb_pending && bufev->errorcb) { 218 bufferevent_event_cb errorcb = bufev->errorcb; 219 void *cbarg = bufev->cbarg; 220 short what = bufev_private->eventcb_pending; 221 int err = bufev_private->errno_pending; 222 bufev_private->eventcb_pending = 0; 223 bufev_private->errno_pending = 0; 224 EVUTIL_SET_SOCKET_ERROR(err); 225 UNLOCKED(errorcb(bufev,what,cbarg)); 226 } 227 bufferevent_decref_and_unlock_(bufev); 228 #undef UNLOCKED 229 } 230 231 #define SCHEDULE_DEFERRED(bevp) \ 232 do { \ 233 if (event_deferred_cb_schedule_( \ 234 (bevp)->bev.ev_base, \ 235 &(bevp)->deferred)) \ 236 bufferevent_incref_(&(bevp)->bev); \ 237 } while (0) 238 239 240 void 241 bufferevent_run_readcb_(struct bufferevent *bufev, int options) 242 { 243 /* Requires that we hold the lock and a reference */ 244 struct bufferevent_private *p = BEV_UPCAST(bufev); 245 if (bufev->readcb == NULL) 246 return; 247 if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) { 248 p->readcb_pending = 1; 249 SCHEDULE_DEFERRED(p); 250 } else { 251 bufev->readcb(bufev, bufev->cbarg); 252 bufferevent_inbuf_wm_check(bufev); 253 } 254 } 255 256 void 257 bufferevent_run_writecb_(struct bufferevent *bufev, int options) 258 { 259 /* Requires that we hold the lock and a reference */ 260 struct bufferevent_private *p = BEV_UPCAST(bufev); 261 if (bufev->writecb == NULL) 262 return; 263 if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) { 264 p->writecb_pending = 1; 265 SCHEDULE_DEFERRED(p); 266 } else { 267 bufev->writecb(bufev, bufev->cbarg); 268 } 269 } 270 271 #define BEV_TRIG_ALL_OPTS ( \ 272 BEV_TRIG_IGNORE_WATERMARKS| \ 273 BEV_TRIG_DEFER_CALLBACKS \ 274 ) 275 276 void 277 bufferevent_trigger(struct bufferevent *bufev, short iotype, int options) 278 { 279 bufferevent_incref_and_lock_(bufev); 280 bufferevent_trigger_nolock_(bufev, iotype, options&BEV_TRIG_ALL_OPTS); 281 bufferevent_decref_and_unlock_(bufev); 282 } 283 284 void 285 bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options) 286 { 287 /* Requires that we hold the lock and a reference */ 288 struct bufferevent_private *p = BEV_UPCAST(bufev); 289 if (bufev->errorcb == NULL) 290 return; 291 if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) { 292 p->eventcb_pending |= what; 293 p->errno_pending = EVUTIL_SOCKET_ERROR(); 294 SCHEDULE_DEFERRED(p); 295 } else { 296 bufev->errorcb(bufev, what, bufev->cbarg); 297 } 298 } 299 300 void 301 bufferevent_trigger_event(struct bufferevent *bufev, short what, int options) 302 { 303 bufferevent_incref_and_lock_(bufev); 304 bufferevent_run_eventcb_(bufev, what, options&BEV_TRIG_ALL_OPTS); 305 bufferevent_decref_and_unlock_(bufev); 306 } 307 308 int 309 bufferevent_init_common_(struct bufferevent_private *bufev_private, 310 struct event_base *base, 311 const struct bufferevent_ops *ops, 312 enum bufferevent_options options) 313 { 314 struct bufferevent *bufev = &bufev_private->bev; 315 316 if (!bufev->input) { 317 if ((bufev->input = evbuffer_new()) == NULL) 318 goto err; 319 } 320 321 if (!bufev->output) { 322 if ((bufev->output = evbuffer_new()) == NULL) 323 goto err; 324 } 325 326 bufev_private->refcnt = 1; 327 bufev->ev_base = base; 328 329 /* Disable timeouts. */ 330 evutil_timerclear(&bufev->timeout_read); 331 evutil_timerclear(&bufev->timeout_write); 332 333 bufev->be_ops = ops; 334 335 if (bufferevent_ratelim_init_(bufev_private)) 336 goto err; 337 338 /* 339 * Set to EV_WRITE so that using bufferevent_write is going to 340 * trigger a callback. Reading needs to be explicitly enabled 341 * because otherwise no data will be available. 342 */ 343 bufev->enabled = EV_WRITE; 344 345 #ifndef EVENT__DISABLE_THREAD_SUPPORT 346 if (options & BEV_OPT_THREADSAFE) { 347 if (bufferevent_enable_locking_(bufev, NULL) < 0) 348 goto err; 349 } 350 #endif 351 if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS)) 352 == BEV_OPT_UNLOCK_CALLBACKS) { 353 event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS"); 354 goto err; 355 } 356 if (options & BEV_OPT_UNLOCK_CALLBACKS) 357 event_deferred_cb_init_( 358 &bufev_private->deferred, 359 event_base_get_npriorities(base) / 2, 360 bufferevent_run_deferred_callbacks_unlocked, 361 bufev_private); 362 else 363 event_deferred_cb_init_( 364 &bufev_private->deferred, 365 event_base_get_npriorities(base) / 2, 366 bufferevent_run_deferred_callbacks_locked, 367 bufev_private); 368 369 bufev_private->options = options; 370 371 evbuffer_set_parent_(bufev->input, bufev); 372 evbuffer_set_parent_(bufev->output, bufev); 373 374 return 0; 375 376 err: 377 if (bufev->input) { 378 evbuffer_free(bufev->input); 379 bufev->input = NULL; 380 } 381 if (bufev->output) { 382 evbuffer_free(bufev->output); 383 bufev->output = NULL; 384 } 385 return -1; 386 } 387 388 void 389 bufferevent_setcb(struct bufferevent *bufev, 390 bufferevent_data_cb readcb, bufferevent_data_cb writecb, 391 bufferevent_event_cb eventcb, void *cbarg) 392 { 393 BEV_LOCK(bufev); 394 395 bufev->readcb = readcb; 396 bufev->writecb = writecb; 397 bufev->errorcb = eventcb; 398 399 bufev->cbarg = cbarg; 400 BEV_UNLOCK(bufev); 401 } 402 403 void 404 bufferevent_getcb(struct bufferevent *bufev, 405 bufferevent_data_cb *readcb_ptr, 406 bufferevent_data_cb *writecb_ptr, 407 bufferevent_event_cb *eventcb_ptr, 408 void **cbarg_ptr) 409 { 410 BEV_LOCK(bufev); 411 if (readcb_ptr) 412 *readcb_ptr = bufev->readcb; 413 if (writecb_ptr) 414 *writecb_ptr = bufev->writecb; 415 if (eventcb_ptr) 416 *eventcb_ptr = bufev->errorcb; 417 if (cbarg_ptr) 418 *cbarg_ptr = bufev->cbarg; 419 420 BEV_UNLOCK(bufev); 421 } 422 423 struct evbuffer * 424 bufferevent_get_input(struct bufferevent *bufev) 425 { 426 return bufev->input; 427 } 428 429 struct evbuffer * 430 bufferevent_get_output(struct bufferevent *bufev) 431 { 432 return bufev->output; 433 } 434 435 struct event_base * 436 bufferevent_get_base(struct bufferevent *bufev) 437 { 438 return bufev->ev_base; 439 } 440 441 int 442 bufferevent_get_priority(const struct bufferevent *bufev) 443 { 444 if (event_initialized(&bufev->ev_read)) { 445 return event_get_priority(&bufev->ev_read); 446 } else { 447 return event_base_get_npriorities(bufev->ev_base) / 2; 448 } 449 } 450 451 int 452 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) 453 { 454 if (evbuffer_add(bufev->output, data, size) == -1) 455 return (-1); 456 457 return 0; 458 } 459 460 int 461 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) 462 { 463 if (evbuffer_add_buffer(bufev->output, buf) == -1) 464 return (-1); 465 466 return 0; 467 } 468 469 size_t 470 bufferevent_read(struct bufferevent *bufev, void *data, size_t size) 471 { 472 return (evbuffer_remove(bufev->input, data, size)); 473 } 474 475 int 476 bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf) 477 { 478 return (evbuffer_add_buffer(buf, bufev->input)); 479 } 480 481 int 482 bufferevent_enable(struct bufferevent *bufev, short event) 483 { 484 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 485 short impl_events = event; 486 int r = 0; 487 488 bufferevent_incref_and_lock_(bufev); 489 if (bufev_private->read_suspended) 490 impl_events &= ~EV_READ; 491 if (bufev_private->write_suspended) 492 impl_events &= ~EV_WRITE; 493 494 bufev->enabled |= event; 495 496 if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0) 497 r = -1; 498 if (r) 499 event_debug(("%s: cannot enable 0x%hx on %p", __func__, event, bufev)); 500 501 bufferevent_decref_and_unlock_(bufev); 502 return r; 503 } 504 505 int 506 bufferevent_set_timeouts(struct bufferevent *bufev, 507 const struct timeval *tv_read, 508 const struct timeval *tv_write) 509 { 510 int r = 0; 511 BEV_LOCK(bufev); 512 if (tv_read) { 513 bufev->timeout_read = *tv_read; 514 } else { 515 evutil_timerclear(&bufev->timeout_read); 516 } 517 if (tv_write) { 518 bufev->timeout_write = *tv_write; 519 } else { 520 evutil_timerclear(&bufev->timeout_write); 521 } 522 523 if (bufev->be_ops->adj_timeouts) 524 r = bufev->be_ops->adj_timeouts(bufev); 525 BEV_UNLOCK(bufev); 526 527 return r; 528 } 529 530 531 /* Obsolete; use bufferevent_set_timeouts */ 532 void 533 bufferevent_settimeout(struct bufferevent *bufev, 534 int timeout_read, int timeout_write) 535 { 536 struct timeval tv_read, tv_write; 537 struct timeval *ptv_read = NULL, *ptv_write = NULL; 538 539 memset(&tv_read, 0, sizeof(tv_read)); 540 memset(&tv_write, 0, sizeof(tv_write)); 541 542 if (timeout_read) { 543 tv_read.tv_sec = timeout_read; 544 ptv_read = &tv_read; 545 } 546 if (timeout_write) { 547 tv_write.tv_sec = timeout_write; 548 ptv_write = &tv_write; 549 } 550 551 bufferevent_set_timeouts(bufev, ptv_read, ptv_write); 552 } 553 554 555 int 556 bufferevent_disable_hard_(struct bufferevent *bufev, short event) 557 { 558 int r = 0; 559 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 560 561 BEV_LOCK(bufev); 562 bufev->enabled &= ~event; 563 564 bufev_private->connecting = 0; 565 if (bufev->be_ops->disable(bufev, event) < 0) 566 r = -1; 567 568 BEV_UNLOCK(bufev); 569 return r; 570 } 571 572 int 573 bufferevent_disable(struct bufferevent *bufev, short event) 574 { 575 int r = 0; 576 577 BEV_LOCK(bufev); 578 bufev->enabled &= ~event; 579 580 if (bufev->be_ops->disable(bufev, event) < 0) 581 r = -1; 582 if (r) 583 event_debug(("%s: cannot disable 0x%hx on %p", __func__, event, bufev)); 584 585 BEV_UNLOCK(bufev); 586 return r; 587 } 588 589 /* 590 * Sets the water marks 591 */ 592 593 void 594 bufferevent_setwatermark(struct bufferevent *bufev, short events, 595 size_t lowmark, size_t highmark) 596 { 597 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 598 599 BEV_LOCK(bufev); 600 if (events & EV_WRITE) { 601 bufev->wm_write.low = lowmark; 602 bufev->wm_write.high = highmark; 603 } 604 605 if (events & EV_READ) { 606 bufev->wm_read.low = lowmark; 607 bufev->wm_read.high = highmark; 608 609 if (highmark) { 610 /* There is now a new high-water mark for read. 611 enable the callback if needed, and see if we should 612 suspend/bufferevent_wm_unsuspend. */ 613 614 if (bufev_private->read_watermarks_cb == NULL) { 615 bufev_private->read_watermarks_cb = 616 evbuffer_add_cb(bufev->input, 617 bufferevent_inbuf_wm_cb, 618 bufev); 619 } 620 evbuffer_cb_set_flags(bufev->input, 621 bufev_private->read_watermarks_cb, 622 EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER); 623 624 if (evbuffer_get_length(bufev->input) >= highmark) 625 bufferevent_wm_suspend_read(bufev); 626 else if (evbuffer_get_length(bufev->input) < highmark) 627 bufferevent_wm_unsuspend_read(bufev); 628 } else { 629 /* There is now no high-water mark for read. */ 630 if (bufev_private->read_watermarks_cb) 631 evbuffer_cb_clear_flags(bufev->input, 632 bufev_private->read_watermarks_cb, 633 EVBUFFER_CB_ENABLED); 634 bufferevent_wm_unsuspend_read(bufev); 635 } 636 } 637 BEV_UNLOCK(bufev); 638 } 639 640 int 641 bufferevent_getwatermark(struct bufferevent *bufev, short events, 642 size_t *lowmark, size_t *highmark) 643 { 644 if (events == EV_WRITE) { 645 BEV_LOCK(bufev); 646 if (lowmark) 647 *lowmark = bufev->wm_write.low; 648 if (highmark) 649 *highmark = bufev->wm_write.high; 650 BEV_UNLOCK(bufev); 651 return 0; 652 } 653 654 if (events == EV_READ) { 655 BEV_LOCK(bufev); 656 if (lowmark) 657 *lowmark = bufev->wm_read.low; 658 if (highmark) 659 *highmark = bufev->wm_read.high; 660 BEV_UNLOCK(bufev); 661 return 0; 662 } 663 return -1; 664 } 665 666 int 667 bufferevent_flush(struct bufferevent *bufev, 668 short iotype, 669 enum bufferevent_flush_mode mode) 670 { 671 int r = -1; 672 BEV_LOCK(bufev); 673 if (bufev->be_ops->flush) 674 r = bufev->be_ops->flush(bufev, iotype, mode); 675 BEV_UNLOCK(bufev); 676 return r; 677 } 678 679 void 680 bufferevent_incref_and_lock_(struct bufferevent *bufev) 681 { 682 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 683 BEV_LOCK(bufev); 684 ++bufev_private->refcnt; 685 } 686 687 #if 0 688 static void 689 bufferevent_transfer_lock_ownership_(struct bufferevent *donor, 690 struct bufferevent *recipient) 691 { 692 struct bufferevent_private *d = BEV_UPCAST(donor); 693 struct bufferevent_private *r = BEV_UPCAST(recipient); 694 if (d->lock != r->lock) 695 return; 696 if (r->own_lock) 697 return; 698 if (d->own_lock) { 699 d->own_lock = 0; 700 r->own_lock = 1; 701 } 702 } 703 #endif 704 705 int 706 bufferevent_decref_and_unlock_(struct bufferevent *bufev) 707 { 708 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 709 int n_cbs = 0; 710 #define MAX_CBS 16 711 struct event_callback *cbs[MAX_CBS]; 712 713 EVUTIL_ASSERT(bufev_private->refcnt > 0); 714 715 if (--bufev_private->refcnt) { 716 BEV_UNLOCK(bufev); 717 return 0; 718 } 719 720 if (bufev->be_ops->unlink) 721 bufev->be_ops->unlink(bufev); 722 723 /* Okay, we're out of references. Let's finalize this once all the 724 * callbacks are done running. */ 725 cbs[0] = &bufev->ev_read.ev_evcallback; 726 cbs[1] = &bufev->ev_write.ev_evcallback; 727 cbs[2] = &bufev_private->deferred; 728 n_cbs = 3; 729 if (bufev_private->rate_limiting) { 730 struct event *e = &bufev_private->rate_limiting->refill_bucket_event; 731 if (event_initialized(e)) 732 cbs[n_cbs++] = &e->ev_evcallback; 733 } 734 n_cbs += evbuffer_get_callbacks_(bufev->input, cbs+n_cbs, MAX_CBS-n_cbs); 735 n_cbs += evbuffer_get_callbacks_(bufev->output, cbs+n_cbs, MAX_CBS-n_cbs); 736 737 event_callback_finalize_many_(bufev->ev_base, n_cbs, cbs, 738 bufferevent_finalize_cb_); 739 740 #undef MAX_CBS 741 BEV_UNLOCK(bufev); 742 743 return 1; 744 } 745 746 static void 747 bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_) 748 { 749 struct bufferevent *bufev = arg_; 750 struct bufferevent *underlying; 751 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 752 753 BEV_LOCK(bufev); 754 underlying = bufferevent_get_underlying(bufev); 755 756 /* Clean up the shared info */ 757 if (bufev->be_ops->destruct) 758 bufev->be_ops->destruct(bufev); 759 760 /* XXX what happens if refcnt for these buffers is > 1? 761 * The buffers can share a lock with this bufferevent object, 762 * but the lock might be destroyed below. */ 763 /* evbuffer will free the callbacks */ 764 evbuffer_free(bufev->input); 765 evbuffer_free(bufev->output); 766 767 if (bufev_private->rate_limiting) { 768 if (bufev_private->rate_limiting->group) 769 bufferevent_remove_from_rate_limit_group_internal_(bufev,0); 770 mm_free(bufev_private->rate_limiting); 771 bufev_private->rate_limiting = NULL; 772 } 773 774 775 BEV_UNLOCK(bufev); 776 777 if (bufev_private->own_lock) 778 EVTHREAD_FREE_LOCK(bufev_private->lock, 779 EVTHREAD_LOCKTYPE_RECURSIVE); 780 781 /* Free the actual allocated memory. */ 782 mm_free(((char*)bufev) - bufev->be_ops->mem_offset); 783 784 /* Release the reference to underlying now that we no longer need the 785 * reference to it. We wait this long mainly in case our lock is 786 * shared with underlying. 787 * 788 * The 'destruct' function will also drop a reference to underlying 789 * if BEV_OPT_CLOSE_ON_FREE is set. 790 * 791 * XXX Should we/can we just refcount evbuffer/bufferevent locks? 792 * It would probably save us some headaches. 793 */ 794 if (underlying) 795 bufferevent_decref_(underlying); 796 } 797 798 int 799 bufferevent_decref(struct bufferevent *bufev) 800 { 801 BEV_LOCK(bufev); 802 return bufferevent_decref_and_unlock_(bufev); 803 } 804 805 void 806 bufferevent_free(struct bufferevent *bufev) 807 { 808 BEV_LOCK(bufev); 809 bufferevent_setcb(bufev, NULL, NULL, NULL, NULL); 810 bufferevent_cancel_all_(bufev); 811 bufferevent_decref_and_unlock_(bufev); 812 } 813 814 void 815 bufferevent_incref(struct bufferevent *bufev) 816 { 817 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 818 819 /* XXX: now that this function is public, we might want to 820 * - return the count from this function 821 * - create a new function to atomically grab the current refcount 822 */ 823 BEV_LOCK(bufev); 824 ++bufev_private->refcnt; 825 BEV_UNLOCK(bufev); 826 } 827 828 int 829 bufferevent_enable_locking_(struct bufferevent *bufev, void *lock) 830 { 831 #ifdef EVENT__DISABLE_THREAD_SUPPORT 832 return -1; 833 #else 834 struct bufferevent *underlying; 835 836 if (BEV_UPCAST(bufev)->lock) 837 return -1; 838 underlying = bufferevent_get_underlying(bufev); 839 840 if (!lock && underlying && BEV_UPCAST(underlying)->lock) { 841 lock = BEV_UPCAST(underlying)->lock; 842 BEV_UPCAST(bufev)->lock = lock; 843 BEV_UPCAST(bufev)->own_lock = 0; 844 } else if (!lock) { 845 EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE); 846 if (!lock) 847 return -1; 848 BEV_UPCAST(bufev)->lock = lock; 849 BEV_UPCAST(bufev)->own_lock = 1; 850 } else { 851 BEV_UPCAST(bufev)->lock = lock; 852 BEV_UPCAST(bufev)->own_lock = 0; 853 } 854 evbuffer_enable_locking(bufev->input, lock); 855 evbuffer_enable_locking(bufev->output, lock); 856 857 if (underlying && !BEV_UPCAST(underlying)->lock) 858 bufferevent_enable_locking_(underlying, lock); 859 860 return 0; 861 #endif 862 } 863 864 int 865 bufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd) 866 { 867 union bufferevent_ctrl_data d; 868 int res = -1; 869 d.fd = fd; 870 BEV_LOCK(bev); 871 if (bev->be_ops->ctrl) 872 res = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &d); 873 if (res) 874 event_debug(("%s: cannot set fd for %p to "EV_SOCK_FMT, __func__, bev, fd)); 875 BEV_UNLOCK(bev); 876 return res; 877 } 878 879 evutil_socket_t 880 bufferevent_getfd(struct bufferevent *bev) 881 { 882 union bufferevent_ctrl_data d; 883 int res = -1; 884 d.fd = -1; 885 BEV_LOCK(bev); 886 if (bev->be_ops->ctrl) 887 res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d); 888 if (res) 889 event_debug(("%s: cannot get fd for %p", __func__, bev)); 890 BEV_UNLOCK(bev); 891 return (res<0) ? -1 : d.fd; 892 } 893 894 enum bufferevent_options 895 bufferevent_get_options_(struct bufferevent *bev) 896 { 897 struct bufferevent_private *bev_p = BEV_UPCAST(bev); 898 enum bufferevent_options options; 899 900 BEV_LOCK(bev); 901 options = bev_p->options; 902 BEV_UNLOCK(bev); 903 return options; 904 } 905 906 907 static void 908 bufferevent_cancel_all_(struct bufferevent *bev) 909 { 910 union bufferevent_ctrl_data d; 911 memset(&d, 0, sizeof(d)); 912 BEV_LOCK(bev); 913 if (bev->be_ops->ctrl) 914 bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d); 915 BEV_UNLOCK(bev); 916 } 917 918 short 919 bufferevent_get_enabled(struct bufferevent *bufev) 920 { 921 short r; 922 BEV_LOCK(bufev); 923 r = bufev->enabled; 924 BEV_UNLOCK(bufev); 925 return r; 926 } 927 928 struct bufferevent * 929 bufferevent_get_underlying(struct bufferevent *bev) 930 { 931 union bufferevent_ctrl_data d; 932 int res = -1; 933 d.ptr = NULL; 934 BEV_LOCK(bev); 935 if (bev->be_ops->ctrl) 936 res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_UNDERLYING, &d); 937 BEV_UNLOCK(bev); 938 return (res<0) ? NULL : d.ptr; 939 } 940 941 static void 942 bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx) 943 { 944 struct bufferevent *bev = ctx; 945 bufferevent_incref_and_lock_(bev); 946 bufferevent_disable(bev, EV_READ); 947 bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING, 0); 948 bufferevent_decref_and_unlock_(bev); 949 } 950 static void 951 bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx) 952 { 953 struct bufferevent *bev = ctx; 954 bufferevent_incref_and_lock_(bev); 955 bufferevent_disable(bev, EV_WRITE); 956 bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, 0); 957 bufferevent_decref_and_unlock_(bev); 958 } 959 960 void 961 bufferevent_init_generic_timeout_cbs_(struct bufferevent *bev) 962 { 963 event_assign(&bev->ev_read, bev->ev_base, -1, EV_FINALIZE, 964 bufferevent_generic_read_timeout_cb, bev); 965 event_assign(&bev->ev_write, bev->ev_base, -1, EV_FINALIZE, 966 bufferevent_generic_write_timeout_cb, bev); 967 } 968 969 int 970 bufferevent_generic_adj_timeouts_(struct bufferevent *bev) 971 { 972 const short enabled = bev->enabled; 973 struct bufferevent_private *bev_p = BEV_UPCAST(bev); 974 int r1=0, r2=0; 975 if ((enabled & EV_READ) && !bev_p->read_suspended && 976 evutil_timerisset(&bev->timeout_read)) 977 r1 = event_add(&bev->ev_read, &bev->timeout_read); 978 else 979 r1 = event_del(&bev->ev_read); 980 981 if ((enabled & EV_WRITE) && !bev_p->write_suspended && 982 evutil_timerisset(&bev->timeout_write) && 983 evbuffer_get_length(bev->output)) 984 r2 = event_add(&bev->ev_write, &bev->timeout_write); 985 else 986 r2 = event_del(&bev->ev_write); 987 if (r1 < 0 || r2 < 0) 988 return -1; 989 return 0; 990 } 991 992 int 993 bufferevent_generic_adj_existing_timeouts_(struct bufferevent *bev) 994 { 995 int r = 0; 996 if (event_pending(&bev->ev_read, EV_READ, NULL)) { 997 if (evutil_timerisset(&bev->timeout_read)) { 998 if (bufferevent_add_event_(&bev->ev_read, &bev->timeout_read) < 0) 999 r = -1; 1000 } else { 1001 event_remove_timer(&bev->ev_read); 1002 } 1003 } 1004 if (event_pending(&bev->ev_write, EV_WRITE, NULL)) { 1005 if (evutil_timerisset(&bev->timeout_write)) { 1006 if (bufferevent_add_event_(&bev->ev_write, &bev->timeout_write) < 0) 1007 r = -1; 1008 } else { 1009 event_remove_timer(&bev->ev_write); 1010 } 1011 } 1012 return r; 1013 } 1014 1015 int 1016 bufferevent_add_event_(struct event *ev, const struct timeval *tv) 1017 { 1018 if (!evutil_timerisset(tv)) 1019 return event_add(ev, NULL); 1020 else 1021 return event_add(ev, tv); 1022 } 1023 1024 /* For use by user programs only; internally, we should be calling 1025 either bufferevent_incref_and_lock_(), or BEV_LOCK. */ 1026 void 1027 bufferevent_lock(struct bufferevent *bev) 1028 { 1029 bufferevent_incref_and_lock_(bev); 1030 } 1031 1032 void 1033 bufferevent_unlock(struct bufferevent *bev) 1034 { 1035 bufferevent_decref_and_unlock_(bev); 1036 } 1037