1 /* 2 * Copyright (c) 2003-2007 Niels Provos <provos@citi.umich.edu> 3 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 #include "util-internal.h" 28 29 /* The old tests here need assertions to work. */ 30 #undef NDEBUG 31 32 #ifdef _WIN32 33 #include <winsock2.h> 34 #include <windows.h> 35 #endif 36 37 #include "event2/event-config.h" 38 39 #include <sys/types.h> 40 #include <sys/stat.h> 41 #ifdef EVENT__HAVE_SYS_TIME_H 42 #include <sys/time.h> 43 #endif 44 #include <sys/queue.h> 45 #ifndef _WIN32 46 #include <sys/socket.h> 47 #include <sys/wait.h> 48 #include <signal.h> 49 #include <unistd.h> 50 #include <netdb.h> 51 #include <netinet/in.h> 52 #endif 53 #include <fcntl.h> 54 #include <signal.h> 55 #include <stdlib.h> 56 #include <stdio.h> 57 #include <string.h> 58 #include <errno.h> 59 #include <assert.h> 60 61 #ifdef EVENT__HAVE_ARPA_INET_H 62 #include <arpa/inet.h> 63 #endif 64 65 #include "event2/event-config.h" 66 #include "event2/event.h" 67 #include "event2/event_struct.h" 68 #include "event2/event_compat.h" 69 #include "event2/tag.h" 70 #include "event2/buffer.h" 71 #include "event2/bufferevent.h" 72 #include "event2/bufferevent_compat.h" 73 #include "event2/bufferevent_struct.h" 74 #include "event2/listener.h" 75 #include "event2/util.h" 76 77 #include "bufferevent-internal.h" 78 #include "evthread-internal.h" 79 #include "util-internal.h" 80 #ifdef _WIN32 81 #include "iocp-internal.h" 82 #endif 83 84 #include "regress.h" 85 #include "regress_testutils.h" 86 87 /* 88 * simple bufferevent test 89 */ 90 91 static void 92 readcb(struct bufferevent *bev, void *arg) 93 { 94 if (evbuffer_get_length(bev->input) == 8333) { 95 struct evbuffer *evbuf = evbuffer_new(); 96 assert(evbuf != NULL); 97 98 /* gratuitous test of bufferevent_read_buffer */ 99 bufferevent_read_buffer(bev, evbuf); 100 101 bufferevent_disable(bev, EV_READ); 102 103 if (evbuffer_get_length(evbuf) == 8333) { 104 test_ok++; 105 } 106 107 evbuffer_free(evbuf); 108 } 109 } 110 111 static void 112 writecb(struct bufferevent *bev, void *arg) 113 { 114 if (evbuffer_get_length(bev->output) == 0) { 115 test_ok++; 116 } 117 } 118 119 static void 120 errorcb(struct bufferevent *bev, short what, void *arg) 121 { 122 test_ok = -2; 123 } 124 125 static void 126 test_bufferevent_impl(int use_pair, int flush) 127 { 128 struct bufferevent *bev1 = NULL, *bev2 = NULL; 129 char buffer[8333]; 130 int i; 131 int expected = 2; 132 133 if (use_pair) { 134 struct bufferevent *pair[2]; 135 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair)); 136 bev1 = pair[0]; 137 bev2 = pair[1]; 138 bufferevent_setcb(bev1, readcb, writecb, errorcb, bev1); 139 bufferevent_setcb(bev2, readcb, writecb, errorcb, NULL); 140 tt_int_op(bufferevent_getfd(bev1), ==, -1); 141 tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL); 142 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, bev2); 143 tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, bev1); 144 } else { 145 bev1 = bufferevent_new(pair[0], readcb, writecb, errorcb, NULL); 146 bev2 = bufferevent_new(pair[1], readcb, writecb, errorcb, NULL); 147 tt_int_op(bufferevent_getfd(bev1), ==, pair[0]); 148 tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL); 149 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, NULL); 150 tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, NULL); 151 } 152 153 { 154 /* Test getcb. */ 155 bufferevent_data_cb r, w; 156 bufferevent_event_cb e; 157 void *a; 158 bufferevent_getcb(bev1, &r, &w, &e, &a); 159 tt_ptr_op(r, ==, readcb); 160 tt_ptr_op(w, ==, writecb); 161 tt_ptr_op(e, ==, errorcb); 162 tt_ptr_op(a, ==, use_pair ? bev1 : NULL); 163 } 164 165 bufferevent_disable(bev1, EV_READ); 166 bufferevent_enable(bev2, EV_READ); 167 168 tt_int_op(bufferevent_get_enabled(bev1), ==, EV_WRITE); 169 tt_int_op(bufferevent_get_enabled(bev2), ==, EV_WRITE|EV_READ); 170 171 for (i = 0; i < (int)sizeof(buffer); i++) 172 buffer[i] = i; 173 174 bufferevent_write(bev1, buffer, sizeof(buffer)); 175 if (flush >= 0) { 176 tt_int_op(bufferevent_flush(bev1, EV_WRITE, flush), >=, 0); 177 } 178 179 event_dispatch(); 180 181 bufferevent_free(bev2); 182 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, NULL); 183 bufferevent_free(bev1); 184 185 /** Only pair call errorcb for BEV_FINISHED */ 186 if (use_pair && flush == BEV_FINISHED) { 187 expected = -1; 188 } 189 if (test_ok != expected) 190 test_ok = 0; 191 end: 192 ; 193 } 194 195 static void test_bufferevent(void) { test_bufferevent_impl(0, -1); } 196 static void test_bufferevent_pair(void) { test_bufferevent_impl(1, -1); } 197 198 static void test_bufferevent_flush_normal(void) { test_bufferevent_impl(0, BEV_NORMAL); } 199 static void test_bufferevent_flush_flush(void) { test_bufferevent_impl(0, BEV_FLUSH); } 200 static void test_bufferevent_flush_finished(void) { test_bufferevent_impl(0, BEV_FINISHED); } 201 202 static void test_bufferevent_pair_flush_normal(void) { test_bufferevent_impl(1, BEV_NORMAL); } 203 static void test_bufferevent_pair_flush_flush(void) { test_bufferevent_impl(1, BEV_FLUSH); } 204 static void test_bufferevent_pair_flush_finished(void) { test_bufferevent_impl(1, BEV_FINISHED); } 205 206 #if defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED) 207 /** 208 * Trace lock/unlock/alloc/free for locks. 209 * (More heavier then evthread_debug*) 210 */ 211 typedef struct 212 { 213 void *lock; 214 enum { 215 ALLOC, FREE, 216 } status; 217 size_t locked /** allow recursive locking */; 218 } lock_wrapper; 219 struct lock_unlock_base 220 { 221 /* Original callbacks */ 222 struct evthread_lock_callbacks cbs; 223 /* Map of locks */ 224 lock_wrapper *locks; 225 size_t nr_locks; 226 } lu_base = { 227 .locks = NULL, 228 }; 229 230 static lock_wrapper *lu_find(void *lock_) 231 { 232 size_t i; 233 for (i = 0; i < lu_base.nr_locks; ++i) { 234 lock_wrapper *lock = &lu_base.locks[i]; 235 if (lock->lock == lock_) 236 return lock; 237 } 238 return NULL; 239 } 240 241 static void *trace_lock_alloc(unsigned locktype) 242 { 243 void *lock; 244 ++lu_base.nr_locks; 245 lu_base.locks = realloc(lu_base.locks, 246 sizeof(lock_wrapper) * lu_base.nr_locks); 247 lock = lu_base.cbs.alloc(locktype); 248 lu_base.locks[lu_base.nr_locks - 1] = (lock_wrapper){ lock, ALLOC, 0 }; 249 return lock; 250 } 251 static void trace_lock_free(void *lock_, unsigned locktype) 252 { 253 lock_wrapper *lock = lu_find(lock_); 254 if (!lock || lock->status == FREE || lock->locked) { 255 TT_FAIL(("lock: free error")); 256 } else { 257 lock->status = FREE; 258 lu_base.cbs.free(lock_, locktype); 259 } 260 } 261 static int trace_lock_lock(unsigned mode, void *lock_) 262 { 263 lock_wrapper *lock = lu_find(lock_); 264 if (!lock || lock->status == FREE) { 265 TT_FAIL(("lock: lock error")); 266 return -1; 267 } else { 268 ++lock->locked; 269 return lu_base.cbs.lock(mode, lock_); 270 } 271 } 272 static int trace_lock_unlock(unsigned mode, void *lock_) 273 { 274 lock_wrapper *lock = lu_find(lock_); 275 if (!lock || lock->status == FREE || !lock->locked) { 276 TT_FAIL(("lock: unlock error")); 277 return -1; 278 } else { 279 --lock->locked; 280 return lu_base.cbs.unlock(mode, lock_); 281 } 282 } 283 static void lock_unlock_free_thread_cbs(void) 284 { 285 event_base_free(NULL); 286 287 if (libevent_tests_running_in_debug_mode) 288 libevent_global_shutdown(); 289 290 /** drop immutable flag */ 291 evthread_set_lock_callbacks(NULL); 292 /** avoid calling of event_global_setup_locks_() for new cbs */ 293 libevent_global_shutdown(); 294 /** drop immutable flag for non-debug ops (since called after shutdown) */ 295 evthread_set_lock_callbacks(NULL); 296 } 297 298 static int use_lock_unlock_profiler(void) 299 { 300 struct evthread_lock_callbacks cbs = { 301 EVTHREAD_LOCK_API_VERSION, 302 EVTHREAD_LOCKTYPE_RECURSIVE, 303 trace_lock_alloc, 304 trace_lock_free, 305 trace_lock_lock, 306 trace_lock_unlock, 307 }; 308 memcpy(&lu_base.cbs, evthread_get_lock_callbacks(), 309 sizeof(lu_base.cbs)); 310 { 311 lock_unlock_free_thread_cbs(); 312 313 evthread_set_lock_callbacks(&cbs); 314 /** re-create debug locks correctly */ 315 evthread_enable_lock_debugging(); 316 317 event_init(); 318 } 319 return 0; 320 } 321 static void free_lock_unlock_profiler(struct basic_test_data *data) 322 { 323 /** fix "held_by" for kqueue */ 324 evthread_set_lock_callbacks(NULL); 325 326 lock_unlock_free_thread_cbs(); 327 free(lu_base.locks); 328 data->base = NULL; 329 } 330 331 static void test_bufferevent_pair_release_lock(void *arg) 332 { 333 struct basic_test_data *data = arg; 334 use_lock_unlock_profiler(); 335 { 336 struct bufferevent *pair[2]; 337 if (!bufferevent_pair_new(NULL, BEV_OPT_THREADSAFE, pair)) { 338 bufferevent_free(pair[0]); 339 bufferevent_free(pair[1]); 340 } else 341 tt_abort_perror("bufferevent_pair_new"); 342 } 343 free_lock_unlock_profiler(data); 344 end: 345 ; 346 } 347 #endif 348 349 /* 350 * test watermarks and bufferevent 351 */ 352 353 static void 354 wm_readcb(struct bufferevent *bev, void *arg) 355 { 356 struct evbuffer *evbuf = evbuffer_new(); 357 int len = (int)evbuffer_get_length(bev->input); 358 static int nread; 359 360 assert(len >= 10 && len <= 20); 361 362 assert(evbuf != NULL); 363 364 /* gratuitous test of bufferevent_read_buffer */ 365 bufferevent_read_buffer(bev, evbuf); 366 367 nread += len; 368 if (nread == 65000) { 369 bufferevent_disable(bev, EV_READ); 370 test_ok++; 371 } 372 373 evbuffer_free(evbuf); 374 } 375 376 static void 377 wm_writecb(struct bufferevent *bev, void *arg) 378 { 379 assert(evbuffer_get_length(bev->output) <= 100); 380 if (evbuffer_get_length(bev->output) == 0) { 381 evbuffer_drain(bev->output, evbuffer_get_length(bev->output)); 382 test_ok++; 383 } 384 } 385 386 static void 387 wm_errorcb(struct bufferevent *bev, short what, void *arg) 388 { 389 test_ok = -2; 390 } 391 392 static void 393 test_bufferevent_watermarks_impl(int use_pair) 394 { 395 struct bufferevent *bev1 = NULL, *bev2 = NULL; 396 char buffer[65000]; 397 size_t low, high; 398 int i; 399 test_ok = 0; 400 401 if (use_pair) { 402 struct bufferevent *pair[2]; 403 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair)); 404 bev1 = pair[0]; 405 bev2 = pair[1]; 406 bufferevent_setcb(bev1, NULL, wm_writecb, errorcb, NULL); 407 bufferevent_setcb(bev2, wm_readcb, NULL, errorcb, NULL); 408 } else { 409 bev1 = bufferevent_new(pair[0], NULL, wm_writecb, wm_errorcb, NULL); 410 bev2 = bufferevent_new(pair[1], wm_readcb, NULL, wm_errorcb, NULL); 411 } 412 tt_assert(bev1); 413 tt_assert(bev2); 414 bufferevent_disable(bev1, EV_READ); 415 bufferevent_enable(bev2, EV_READ); 416 417 /* By default, low watermarks are set to 0 */ 418 bufferevent_getwatermark(bev1, EV_READ, &low, NULL); 419 tt_int_op(low, ==, 0); 420 bufferevent_getwatermark(bev2, EV_WRITE, &low, NULL); 421 tt_int_op(low, ==, 0); 422 423 for (i = 0; i < (int)sizeof(buffer); i++) 424 buffer[i] = (char)i; 425 426 /* limit the reading on the receiving bufferevent */ 427 bufferevent_setwatermark(bev2, EV_READ, 10, 20); 428 429 bufferevent_getwatermark(bev2, EV_READ, &low, &high); 430 tt_int_op(low, ==, 10); 431 tt_int_op(high, ==, 20); 432 433 /* Tell the sending bufferevent not to notify us till it's down to 434 100 bytes. */ 435 bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000); 436 437 bufferevent_getwatermark(bev1, EV_WRITE, &low, &high); 438 tt_int_op(low, ==, 100); 439 tt_int_op(high, ==, 2000); 440 441 { 442 int r = bufferevent_getwatermark(bev1, EV_WRITE | EV_READ, &low, &high); 443 tt_int_op(r, !=, 0); 444 } 445 446 bufferevent_write(bev1, buffer, sizeof(buffer)); 447 448 event_dispatch(); 449 450 tt_int_op(test_ok, ==, 2); 451 452 /* The write callback drained all the data from outbuf, so we 453 * should have removed the write event... */ 454 tt_assert(!event_pending(&bev2->ev_write, EV_WRITE, NULL)); 455 456 end: 457 if (bev1) 458 bufferevent_free(bev1); 459 if (bev2) 460 bufferevent_free(bev2); 461 } 462 463 static void 464 test_bufferevent_watermarks(void) 465 { 466 test_bufferevent_watermarks_impl(0); 467 } 468 469 static void 470 test_bufferevent_pair_watermarks(void) 471 { 472 test_bufferevent_watermarks_impl(1); 473 } 474 475 /* 476 * Test bufferevent filters 477 */ 478 479 /* strip an 'x' from each byte */ 480 481 static enum bufferevent_filter_result 482 bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst, 483 ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx) 484 { 485 const unsigned char *buffer; 486 unsigned i; 487 488 buffer = evbuffer_pullup(src, evbuffer_get_length(src)); 489 for (i = 0; i < evbuffer_get_length(src); i += 2) { 490 if (buffer[i] == '-') 491 continue; 492 493 assert(buffer[i] == 'x'); 494 evbuffer_add(dst, buffer + i + 1, 1); 495 } 496 497 evbuffer_drain(src, i); 498 return (BEV_OK); 499 } 500 501 /* add an 'x' before each byte */ 502 503 static enum bufferevent_filter_result 504 bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst, 505 ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx) 506 { 507 const unsigned char *buffer; 508 unsigned i; 509 struct bufferevent **bevp = ctx; 510 511 ++test_ok; 512 513 if (test_ok == 1) { 514 buffer = evbuffer_pullup(src, evbuffer_get_length(src)); 515 for (i = 0; i < evbuffer_get_length(src); ++i) { 516 evbuffer_add(dst, "x", 1); 517 evbuffer_add(dst, buffer + i, 1); 518 } 519 evbuffer_drain(src, evbuffer_get_length(src)); 520 } else { 521 return BEV_ERROR; 522 } 523 524 if (bevp && test_ok == 1) { 525 int prev = ++test_ok; 526 bufferevent_write(*bevp, "-", 1); 527 /* check that during this bufferevent_write() 528 * bufferevent_output_filter() will not be called again */ 529 assert(test_ok == prev); 530 --test_ok; 531 } 532 533 return (BEV_OK); 534 } 535 536 static void 537 test_bufferevent_filters_impl(int use_pair, int disable) 538 { 539 struct bufferevent *bev1 = NULL, *bev2 = NULL; 540 struct bufferevent *bev1_base = NULL, *bev2_base = NULL; 541 char buffer[8333]; 542 int i; 543 544 test_ok = 0; 545 546 if (use_pair) { 547 struct bufferevent *pair[2]; 548 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair)); 549 bev1 = pair[0]; 550 bev2 = pair[1]; 551 } else { 552 bev1 = bufferevent_socket_new(NULL, pair[0], 0); 553 bev2 = bufferevent_socket_new(NULL, pair[1], 0); 554 } 555 bev1_base = bev1; 556 bev2_base = bev2; 557 558 for (i = 0; i < (int)sizeof(buffer); i++) 559 buffer[i] = i; 560 561 bev1 = bufferevent_filter_new(bev1, NULL, bufferevent_output_filter, 562 BEV_OPT_CLOSE_ON_FREE, NULL, 563 disable ? &bev1 : NULL); 564 565 bev2 = bufferevent_filter_new(bev2, bufferevent_input_filter, 566 NULL, BEV_OPT_CLOSE_ON_FREE, NULL, NULL); 567 bufferevent_setcb(bev1, NULL, writecb, errorcb, NULL); 568 bufferevent_setcb(bev2, readcb, NULL, errorcb, NULL); 569 570 tt_ptr_op(bufferevent_get_underlying(bev1), ==, bev1_base); 571 tt_ptr_op(bufferevent_get_underlying(bev2), ==, bev2_base); 572 tt_int_op(bufferevent_getfd(bev1), ==, -1); 573 tt_int_op(bufferevent_getfd(bev2), ==, -1); 574 575 bufferevent_disable(bev1, EV_READ); 576 bufferevent_enable(bev2, EV_READ); 577 /* insert some filters */ 578 bufferevent_write(bev1, buffer, sizeof(buffer)); 579 580 event_dispatch(); 581 582 if (test_ok != 3 + !!disable) 583 test_ok = 0; 584 585 end: 586 if (bev1) 587 bufferevent_free(bev1); 588 if (bev2) 589 bufferevent_free(bev2); 590 591 } 592 593 static void test_bufferevent_filters(void) 594 { test_bufferevent_filters_impl(0, 0); } 595 static void test_bufferevent_pair_filters(void) 596 { test_bufferevent_filters_impl(1, 0); } 597 static void test_bufferevent_filters_disable(void) 598 { test_bufferevent_filters_impl(0, 1); } 599 static void test_bufferevent_pair_filters_disable(void) 600 { test_bufferevent_filters_impl(1, 1); } 601 602 603 static void 604 sender_writecb(struct bufferevent *bev, void *ctx) 605 { 606 if (evbuffer_get_length(bufferevent_get_output(bev)) == 0) { 607 bufferevent_disable(bev,EV_READ|EV_WRITE); 608 TT_BLATHER(("Flushed %d: freeing it.", (int)bufferevent_getfd(bev))); 609 bufferevent_free(bev); 610 } 611 } 612 613 static void 614 sender_errorcb(struct bufferevent *bev, short what, void *ctx) 615 { 616 TT_FAIL(("Got sender error %d",(int)what)); 617 } 618 619 static int bufferevent_connect_test_flags = 0; 620 static int bufferevent_trigger_test_flags = 0; 621 static int n_strings_read = 0; 622 static int n_reads_invoked = 0; 623 static int n_events_invoked = 0; 624 625 #define TEST_STR "Now is the time for all good events to signal for " \ 626 "the good of their protocol" 627 static void 628 listen_cb(struct evconnlistener *listener, evutil_socket_t fd, 629 struct sockaddr *sa, int socklen, void *arg) 630 { 631 struct event_base *base = arg; 632 struct bufferevent *bev; 633 const char s[] = TEST_STR; 634 TT_BLATHER(("Got a request on socket %d", (int)fd )); 635 bev = bufferevent_socket_new(base, fd, bufferevent_connect_test_flags); 636 tt_assert(bev); 637 bufferevent_setcb(bev, NULL, sender_writecb, sender_errorcb, NULL); 638 bufferevent_write(bev, s, sizeof(s)); 639 end: 640 ; 641 } 642 643 static int 644 fake_listener_create(struct sockaddr_in *localhost) 645 { 646 struct sockaddr *sa = (struct sockaddr *)localhost; 647 evutil_socket_t fd = -1; 648 ev_socklen_t slen = sizeof(*localhost); 649 650 memset(localhost, 0, sizeof(*localhost)); 651 localhost->sin_port = 0; /* have the kernel pick a port */ 652 localhost->sin_addr.s_addr = htonl(0x7f000001L); 653 localhost->sin_family = AF_INET; 654 655 /* bind, but don't listen or accept. should trigger 656 "Connection refused" reliably on most platforms. */ 657 fd = socket(localhost->sin_family, SOCK_STREAM, 0); 658 tt_assert(fd >= 0); 659 tt_assert(bind(fd, sa, slen) == 0); 660 tt_assert(getsockname(fd, sa, &slen) == 0); 661 662 return fd; 663 664 end: 665 return -1; 666 } 667 668 static void 669 reader_eventcb(struct bufferevent *bev, short what, void *ctx) 670 { 671 struct event_base *base = ctx; 672 if (what & BEV_EVENT_ERROR) { 673 perror("foobar"); 674 TT_FAIL(("got connector error %d", (int)what)); 675 return; 676 } 677 if (what & BEV_EVENT_CONNECTED) { 678 TT_BLATHER(("connected on %d", (int)bufferevent_getfd(bev))); 679 bufferevent_enable(bev, EV_READ); 680 } 681 if (what & BEV_EVENT_EOF) { 682 char buf[512]; 683 size_t n; 684 n = bufferevent_read(bev, buf, sizeof(buf)-1); 685 tt_int_op(n, >=, 0); 686 buf[n] = '\0'; 687 tt_str_op(buf, ==, TEST_STR); 688 if (++n_strings_read == 2) 689 event_base_loopexit(base, NULL); 690 TT_BLATHER(("EOF on %d: %d strings read.", 691 (int)bufferevent_getfd(bev), n_strings_read)); 692 } 693 end: 694 ; 695 } 696 697 static void 698 reader_eventcb_simple(struct bufferevent *bev, short what, void *ctx) 699 { 700 TT_BLATHER(("Read eventcb simple invoked on %d.", 701 (int)bufferevent_getfd(bev))); 702 n_events_invoked++; 703 } 704 705 static void 706 reader_readcb(struct bufferevent *bev, void *ctx) 707 { 708 TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev))); 709 n_reads_invoked++; 710 } 711 712 static void 713 test_bufferevent_connect(void *arg) 714 { 715 struct basic_test_data *data = arg; 716 struct evconnlistener *lev=NULL; 717 struct bufferevent *bev1=NULL, *bev2=NULL; 718 struct sockaddr_in localhost; 719 struct sockaddr_storage ss; 720 struct sockaddr *sa; 721 ev_socklen_t slen; 722 723 int be_flags=BEV_OPT_CLOSE_ON_FREE; 724 725 if (strstr((char*)data->setup_data, "defer")) { 726 be_flags |= BEV_OPT_DEFER_CALLBACKS; 727 } 728 if (strstr((char*)data->setup_data, "unlocked")) { 729 be_flags |= BEV_OPT_UNLOCK_CALLBACKS; 730 } 731 if (strstr((char*)data->setup_data, "lock")) { 732 be_flags |= BEV_OPT_THREADSAFE; 733 } 734 bufferevent_connect_test_flags = be_flags; 735 #ifdef _WIN32 736 if (!strcmp((char*)data->setup_data, "unset_connectex")) { 737 struct win32_extension_fns *ext = 738 (struct win32_extension_fns *) 739 event_get_win32_extension_fns_(); 740 ext->ConnectEx = NULL; 741 } 742 #endif 743 744 memset(&localhost, 0, sizeof(localhost)); 745 746 localhost.sin_port = 0; /* pick-a-port */ 747 localhost.sin_addr.s_addr = htonl(0x7f000001L); 748 localhost.sin_family = AF_INET; 749 sa = (struct sockaddr *)&localhost; 750 lev = evconnlistener_new_bind(data->base, listen_cb, data->base, 751 LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, 752 16, sa, sizeof(localhost)); 753 tt_assert(lev); 754 755 sa = (struct sockaddr *)&ss; 756 slen = sizeof(ss); 757 if (regress_get_listener_addr(lev, sa, &slen) < 0) { 758 tt_abort_perror("getsockname"); 759 } 760 761 tt_assert(!evconnlistener_enable(lev)); 762 bev1 = bufferevent_socket_new(data->base, -1, be_flags); 763 bev2 = bufferevent_socket_new(data->base, -1, be_flags); 764 tt_assert(bev1); 765 tt_assert(bev2); 766 bufferevent_setcb(bev1, reader_readcb,NULL, reader_eventcb, data->base); 767 bufferevent_setcb(bev2, reader_readcb,NULL, reader_eventcb, data->base); 768 769 bufferevent_enable(bev1, EV_READ); 770 bufferevent_enable(bev2, EV_READ); 771 772 tt_want(!bufferevent_socket_connect(bev1, sa, sizeof(localhost))); 773 tt_want(!bufferevent_socket_connect(bev2, sa, sizeof(localhost))); 774 775 event_base_dispatch(data->base); 776 777 tt_int_op(n_strings_read, ==, 2); 778 tt_int_op(n_reads_invoked, >=, 2); 779 end: 780 if (lev) 781 evconnlistener_free(lev); 782 783 if (bev1) 784 bufferevent_free(bev1); 785 786 if (bev2) 787 bufferevent_free(bev2); 788 } 789 790 static void 791 test_bufferevent_connect_fail_eventcb(void *arg) 792 { 793 struct basic_test_data *data = arg; 794 int flags = BEV_OPT_CLOSE_ON_FREE | (long)data->setup_data; 795 struct bufferevent *bev = NULL; 796 struct evconnlistener *lev = NULL; 797 struct sockaddr_in localhost; 798 ev_socklen_t slen = sizeof(localhost); 799 evutil_socket_t fake_listener = -1; 800 801 fake_listener = fake_listener_create(&localhost); 802 803 tt_int_op(n_events_invoked, ==, 0); 804 805 bev = bufferevent_socket_new(data->base, -1, flags); 806 tt_assert(bev); 807 bufferevent_setcb(bev, reader_readcb, reader_readcb, 808 reader_eventcb_simple, data->base); 809 bufferevent_enable(bev, EV_READ|EV_WRITE); 810 tt_int_op(n_events_invoked, ==, 0); 811 tt_int_op(n_reads_invoked, ==, 0); 812 /** @see also test_bufferevent_connect_fail() */ 813 bufferevent_socket_connect(bev, (struct sockaddr *)&localhost, slen); 814 tt_int_op(n_events_invoked, ==, 0); 815 tt_int_op(n_reads_invoked, ==, 0); 816 event_base_dispatch(data->base); 817 tt_int_op(n_events_invoked, ==, 1); 818 tt_int_op(n_reads_invoked, ==, 0); 819 820 end: 821 if (lev) 822 evconnlistener_free(lev); 823 if (bev) 824 bufferevent_free(bev); 825 if (fake_listener >= 0) 826 evutil_closesocket(fake_listener); 827 } 828 829 static void 830 want_fail_eventcb(struct bufferevent *bev, short what, void *ctx) 831 { 832 struct event_base *base = ctx; 833 const char *err; 834 evutil_socket_t s; 835 836 if (what & BEV_EVENT_ERROR) { 837 s = bufferevent_getfd(bev); 838 err = evutil_socket_error_to_string(evutil_socket_geterror(s)); 839 TT_BLATHER(("connection failure on "EV_SOCK_FMT": %s", 840 EV_SOCK_ARG(s), err)); 841 test_ok = 1; 842 } else { 843 TT_FAIL(("didn't fail? what %hd", what)); 844 } 845 846 event_base_loopexit(base, NULL); 847 } 848 849 static void 850 close_socket_cb(evutil_socket_t fd, short what, void *arg) 851 { 852 evutil_socket_t *fdp = arg; 853 if (*fdp >= 0) { 854 evutil_closesocket(*fdp); 855 *fdp = -1; 856 } 857 } 858 859 static void 860 test_bufferevent_connect_fail(void *arg) 861 { 862 struct basic_test_data *data = (struct basic_test_data *)arg; 863 struct bufferevent *bev=NULL; 864 struct event close_listener_event; 865 int close_listener_event_added = 0; 866 struct timeval one_second = { 1, 0 }; 867 struct sockaddr_in localhost; 868 ev_socklen_t slen = sizeof(localhost); 869 evutil_socket_t fake_listener = -1; 870 int r; 871 872 test_ok = 0; 873 874 fake_listener = fake_listener_create(&localhost); 875 bev = bufferevent_socket_new(data->base, -1, 876 BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS); 877 tt_assert(bev); 878 bufferevent_setcb(bev, NULL, NULL, want_fail_eventcb, data->base); 879 880 r = bufferevent_socket_connect(bev, (struct sockaddr *)&localhost, slen); 881 /* XXXX we'd like to test the '0' case everywhere, but FreeBSD tells 882 * detects the error immediately, which is not really wrong of it. */ 883 tt_want(r == 0 || r == -1); 884 885 /* Close the listener socket after a second. This should trigger 886 "connection refused" on some other platforms, including OSX. */ 887 evtimer_assign(&close_listener_event, data->base, close_socket_cb, 888 &fake_listener); 889 event_add(&close_listener_event, &one_second); 890 close_listener_event_added = 1; 891 892 event_base_dispatch(data->base); 893 894 tt_int_op(test_ok, ==, 1); 895 896 end: 897 if (fake_listener >= 0) 898 evutil_closesocket(fake_listener); 899 900 if (bev) 901 bufferevent_free(bev); 902 903 if (close_listener_event_added) 904 event_del(&close_listener_event); 905 } 906 907 struct timeout_cb_result { 908 struct timeval read_timeout_at; 909 struct timeval write_timeout_at; 910 struct timeval last_wrote_at; 911 int n_read_timeouts; 912 int n_write_timeouts; 913 int total_calls; 914 }; 915 916 static void 917 bev_timeout_write_cb(struct bufferevent *bev, void *arg) 918 { 919 struct timeout_cb_result *res = arg; 920 evutil_gettimeofday(&res->last_wrote_at, NULL); 921 } 922 923 static void 924 bev_timeout_event_cb(struct bufferevent *bev, short what, void *arg) 925 { 926 struct timeout_cb_result *res = arg; 927 ++res->total_calls; 928 929 if ((what & (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) 930 == (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) { 931 evutil_gettimeofday(&res->read_timeout_at, NULL); 932 ++res->n_read_timeouts; 933 } 934 if ((what & (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) 935 == (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) { 936 evutil_gettimeofday(&res->write_timeout_at, NULL); 937 ++res->n_write_timeouts; 938 } 939 } 940 941 static void 942 test_bufferevent_timeouts(void *arg) 943 { 944 /* "arg" is a string containing "pair" and/or "filter". */ 945 struct bufferevent *bev1 = NULL, *bev2 = NULL; 946 struct basic_test_data *data = arg; 947 int use_pair = 0, use_filter = 0; 948 struct timeval tv_w, tv_r, started_at; 949 struct timeout_cb_result res1, res2; 950 char buf[1024]; 951 952 memset(&res1, 0, sizeof(res1)); 953 memset(&res2, 0, sizeof(res2)); 954 955 if (strstr((char*)data->setup_data, "pair")) 956 use_pair = 1; 957 if (strstr((char*)data->setup_data, "filter")) 958 use_filter = 1; 959 960 if (use_pair) { 961 struct bufferevent *p[2]; 962 tt_int_op(0, ==, bufferevent_pair_new(data->base, 0, p)); 963 bev1 = p[0]; 964 bev2 = p[1]; 965 } else { 966 bev1 = bufferevent_socket_new(data->base, data->pair[0], 0); 967 bev2 = bufferevent_socket_new(data->base, data->pair[1], 0); 968 } 969 970 tt_assert(bev1); 971 tt_assert(bev2); 972 973 if (use_filter) { 974 struct bufferevent *bevf1, *bevf2; 975 bevf1 = bufferevent_filter_new(bev1, NULL, NULL, 976 BEV_OPT_CLOSE_ON_FREE, NULL, NULL); 977 bevf2 = bufferevent_filter_new(bev2, NULL, NULL, 978 BEV_OPT_CLOSE_ON_FREE, NULL, NULL); 979 tt_assert(bevf1); 980 tt_assert(bevf2); 981 bev1 = bevf1; 982 bev2 = bevf2; 983 } 984 985 /* Do this nice and early. */ 986 bufferevent_disable(bev2, EV_READ); 987 988 /* bev1 will try to write and read. Both will time out. */ 989 evutil_gettimeofday(&started_at, NULL); 990 tv_w.tv_sec = tv_r.tv_sec = 0; 991 tv_w.tv_usec = 100*1000; 992 tv_r.tv_usec = 150*1000; 993 bufferevent_setcb(bev1, NULL, bev_timeout_write_cb, 994 bev_timeout_event_cb, &res1); 995 bufferevent_setwatermark(bev1, EV_WRITE, 1024*1024+10, 0); 996 bufferevent_set_timeouts(bev1, &tv_r, &tv_w); 997 if (use_pair) { 998 /* For a pair, the fact that the other side isn't reading 999 * makes the writer stall */ 1000 bufferevent_write(bev1, "ABCDEFG", 7); 1001 } else { 1002 /* For a real socket, the kernel's TCP buffers can eat a 1003 * fair number of bytes; make sure that at some point we 1004 * have some bytes that will stall. */ 1005 struct evbuffer *output = bufferevent_get_output(bev1); 1006 int i; 1007 memset(buf, 0xbb, sizeof(buf)); 1008 for (i=0;i<1024;++i) { 1009 evbuffer_add_reference(output, buf, sizeof(buf), 1010 NULL, NULL); 1011 } 1012 } 1013 bufferevent_enable(bev1, EV_READ|EV_WRITE); 1014 1015 /* bev2 has nothing to say, and isn't listening. */ 1016 bufferevent_setcb(bev2, NULL, bev_timeout_write_cb, 1017 bev_timeout_event_cb, &res2); 1018 tv_w.tv_sec = tv_r.tv_sec = 0; 1019 tv_w.tv_usec = 200*1000; 1020 tv_r.tv_usec = 100*1000; 1021 bufferevent_set_timeouts(bev2, &tv_r, &tv_w); 1022 bufferevent_enable(bev2, EV_WRITE); 1023 1024 tv_r.tv_sec = 0; 1025 tv_r.tv_usec = 350000; 1026 1027 event_base_loopexit(data->base, &tv_r); 1028 event_base_dispatch(data->base); 1029 1030 /* XXXX Test that actually reading or writing a little resets the 1031 * timeouts. */ 1032 1033 /* Each buf1 timeout happens, and happens only once. */ 1034 tt_want(res1.n_read_timeouts); 1035 tt_want(res1.n_write_timeouts); 1036 tt_want(res1.n_read_timeouts == 1); 1037 tt_want(res1.n_write_timeouts == 1); 1038 1039 test_timeval_diff_eq(&started_at, &res1.read_timeout_at, 150); 1040 test_timeval_diff_eq(&started_at, &res1.write_timeout_at, 100); 1041 1042 end: 1043 if (bev1) 1044 bufferevent_free(bev1); 1045 if (bev2) 1046 bufferevent_free(bev2); 1047 } 1048 1049 static void 1050 trigger_failure_cb(evutil_socket_t fd, short what, void *ctx) 1051 { 1052 TT_FAIL(("The triggered callback did not fire or the machine is really slow (try increasing timeout).")); 1053 } 1054 1055 static void 1056 trigger_eventcb(struct bufferevent *bev, short what, void *ctx) 1057 { 1058 struct event_base *base = ctx; 1059 if (what == ~0) { 1060 TT_BLATHER(("Event successfully triggered.")); 1061 event_base_loopexit(base, NULL); 1062 return; 1063 } 1064 reader_eventcb(bev, what, ctx); 1065 } 1066 1067 static void 1068 trigger_readcb_triggered(struct bufferevent *bev, void *ctx) 1069 { 1070 TT_BLATHER(("Read successfully triggered.")); 1071 n_reads_invoked++; 1072 bufferevent_trigger_event(bev, ~0, bufferevent_trigger_test_flags); 1073 } 1074 1075 static void 1076 trigger_readcb(struct bufferevent *bev, void *ctx) 1077 { 1078 struct timeval timeout = { 30, 0 }; 1079 struct event_base *base = ctx; 1080 size_t low, high, len; 1081 int expected_reads; 1082 1083 TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev))); 1084 expected_reads = ++n_reads_invoked; 1085 1086 bufferevent_setcb(bev, trigger_readcb_triggered, NULL, trigger_eventcb, ctx); 1087 1088 bufferevent_getwatermark(bev, EV_READ, &low, &high); 1089 len = evbuffer_get_length(bufferevent_get_input(bev)); 1090 1091 bufferevent_setwatermark(bev, EV_READ, len + 1, 0); 1092 bufferevent_trigger(bev, EV_READ, bufferevent_trigger_test_flags); 1093 /* no callback expected */ 1094 tt_int_op(n_reads_invoked, ==, expected_reads); 1095 1096 if ((bufferevent_trigger_test_flags & BEV_TRIG_DEFER_CALLBACKS) || 1097 (bufferevent_connect_test_flags & BEV_OPT_DEFER_CALLBACKS)) { 1098 /* will be deferred */ 1099 } else { 1100 expected_reads++; 1101 } 1102 1103 event_base_once(base, -1, EV_TIMEOUT, trigger_failure_cb, NULL, &timeout); 1104 1105 bufferevent_trigger(bev, EV_READ, 1106 bufferevent_trigger_test_flags | BEV_TRIG_IGNORE_WATERMARKS); 1107 tt_int_op(n_reads_invoked, ==, expected_reads); 1108 1109 bufferevent_setwatermark(bev, EV_READ, low, high); 1110 end: 1111 ; 1112 } 1113 1114 static void 1115 test_bufferevent_trigger(void *arg) 1116 { 1117 struct basic_test_data *data = arg; 1118 struct evconnlistener *lev=NULL; 1119 struct bufferevent *bev=NULL; 1120 struct sockaddr_in localhost; 1121 struct sockaddr_storage ss; 1122 struct sockaddr *sa; 1123 ev_socklen_t slen; 1124 1125 int be_flags=BEV_OPT_CLOSE_ON_FREE; 1126 int trig_flags=0; 1127 1128 if (strstr((char*)data->setup_data, "defer")) { 1129 be_flags |= BEV_OPT_DEFER_CALLBACKS; 1130 } 1131 bufferevent_connect_test_flags = be_flags; 1132 1133 if (strstr((char*)data->setup_data, "postpone")) { 1134 trig_flags |= BEV_TRIG_DEFER_CALLBACKS; 1135 } 1136 bufferevent_trigger_test_flags = trig_flags; 1137 1138 memset(&localhost, 0, sizeof(localhost)); 1139 1140 localhost.sin_port = 0; /* pick-a-port */ 1141 localhost.sin_addr.s_addr = htonl(0x7f000001L); 1142 localhost.sin_family = AF_INET; 1143 sa = (struct sockaddr *)&localhost; 1144 lev = evconnlistener_new_bind(data->base, listen_cb, data->base, 1145 LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, 1146 16, sa, sizeof(localhost)); 1147 tt_assert(lev); 1148 1149 sa = (struct sockaddr *)&ss; 1150 slen = sizeof(ss); 1151 if (regress_get_listener_addr(lev, sa, &slen) < 0) { 1152 tt_abort_perror("getsockname"); 1153 } 1154 1155 tt_assert(!evconnlistener_enable(lev)); 1156 bev = bufferevent_socket_new(data->base, -1, be_flags); 1157 tt_assert(bev); 1158 bufferevent_setcb(bev, trigger_readcb, NULL, trigger_eventcb, data->base); 1159 1160 bufferevent_enable(bev, EV_READ); 1161 1162 tt_want(!bufferevent_socket_connect(bev, sa, sizeof(localhost))); 1163 1164 event_base_dispatch(data->base); 1165 1166 tt_int_op(n_reads_invoked, ==, 2); 1167 end: 1168 if (lev) 1169 evconnlistener_free(lev); 1170 1171 if (bev) 1172 bufferevent_free(bev); 1173 } 1174 1175 static void 1176 test_bufferevent_socket_filter_inactive(void *arg) 1177 { 1178 struct basic_test_data *data = arg; 1179 struct bufferevent *bev = NULL, *bevf = NULL; 1180 1181 bev = bufferevent_socket_new(data->base, -1, 0); 1182 tt_assert(bev); 1183 bevf = bufferevent_filter_new(bev, NULL, NULL, 0, NULL, NULL); 1184 tt_assert(bevf); 1185 1186 end: 1187 if (bevf) 1188 bufferevent_free(bevf); 1189 if (bev) 1190 bufferevent_free(bev); 1191 } 1192 1193 static void 1194 pair_flush_eventcb(struct bufferevent *bev, short what, void *ctx) 1195 { 1196 int *callback_what = ctx; 1197 *callback_what = what; 1198 } 1199 1200 static void 1201 test_bufferevent_pair_flush(void *arg) 1202 { 1203 struct basic_test_data *data = arg; 1204 struct bufferevent *pair[2]; 1205 struct bufferevent *bev1 = NULL; 1206 struct bufferevent *bev2 = NULL; 1207 int callback_what = 0; 1208 1209 tt_assert(0 == bufferevent_pair_new(data->base, 0, pair)); 1210 bev1 = pair[0]; 1211 bev2 = pair[1]; 1212 tt_assert(0 == bufferevent_enable(bev1, EV_WRITE)); 1213 tt_assert(0 == bufferevent_enable(bev2, EV_READ)); 1214 1215 bufferevent_setcb(bev2, NULL, NULL, pair_flush_eventcb, &callback_what); 1216 1217 bufferevent_flush(bev1, EV_WRITE, BEV_FINISHED); 1218 1219 event_base_loop(data->base, EVLOOP_ONCE); 1220 1221 tt_assert(callback_what == (BEV_EVENT_READING | BEV_EVENT_EOF)); 1222 1223 end: 1224 if (bev1) 1225 bufferevent_free(bev1); 1226 if (bev2) 1227 bufferevent_free(bev2); 1228 } 1229 1230 struct bufferevent_filter_data_stuck { 1231 size_t header_size; 1232 size_t total_read; 1233 }; 1234 1235 static void 1236 bufferevent_filter_data_stuck_readcb(struct bufferevent *bev, void *arg) 1237 { 1238 struct bufferevent_filter_data_stuck *filter_data = arg; 1239 struct evbuffer *input = bufferevent_get_input(bev); 1240 size_t read_size = evbuffer_get_length(input); 1241 evbuffer_drain(input, read_size); 1242 filter_data->total_read += read_size; 1243 } 1244 1245 /** 1246 * This filter prepends header once before forwarding data. 1247 */ 1248 static enum bufferevent_filter_result 1249 bufferevent_filter_data_stuck_inputcb( 1250 struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit, 1251 enum bufferevent_flush_mode mode, void *ctx) 1252 { 1253 struct bufferevent_filter_data_stuck *filter_data = ctx; 1254 static int header_inserted = 0; 1255 size_t payload_size; 1256 size_t header_size = 0; 1257 1258 if (!header_inserted) { 1259 char *header = calloc(filter_data->header_size, 1); 1260 evbuffer_add(dst, header, filter_data->header_size); 1261 free(header); 1262 header_size = filter_data->header_size; 1263 header_inserted = 1; 1264 } 1265 1266 payload_size = evbuffer_get_length(src); 1267 if (payload_size > dst_limit - header_size) { 1268 payload_size = dst_limit - header_size; 1269 } 1270 1271 tt_int_op(payload_size, ==, evbuffer_remove_buffer(src, dst, payload_size)); 1272 1273 end: 1274 return BEV_OK; 1275 } 1276 1277 static void 1278 test_bufferevent_filter_data_stuck(void *arg) 1279 { 1280 const size_t read_high_wm = 4096; 1281 struct bufferevent_filter_data_stuck filter_data; 1282 struct basic_test_data *data = arg; 1283 struct bufferevent *pair[2]; 1284 struct bufferevent *filter = NULL; 1285 1286 int options = BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS; 1287 1288 char payload[4096]; 1289 int payload_size = sizeof(payload); 1290 1291 memset(&filter_data, 0, sizeof(filter_data)); 1292 filter_data.header_size = 20; 1293 1294 tt_assert(bufferevent_pair_new(data->base, options, pair) == 0); 1295 1296 bufferevent_setwatermark(pair[0], EV_READ, 0, read_high_wm); 1297 bufferevent_setwatermark(pair[1], EV_READ, 0, read_high_wm); 1298 1299 tt_assert( 1300 filter = 1301 bufferevent_filter_new(pair[1], 1302 bufferevent_filter_data_stuck_inputcb, 1303 NULL, 1304 options, 1305 NULL, 1306 &filter_data)); 1307 1308 bufferevent_setcb(filter, 1309 bufferevent_filter_data_stuck_readcb, 1310 NULL, 1311 NULL, 1312 &filter_data); 1313 1314 tt_assert(bufferevent_enable(filter, EV_READ|EV_WRITE) == 0); 1315 1316 bufferevent_setwatermark(filter, EV_READ, 0, read_high_wm); 1317 1318 tt_assert(bufferevent_write(pair[0], payload, sizeof(payload)) == 0); 1319 1320 event_base_dispatch(data->base); 1321 1322 tt_int_op(filter_data.total_read, ==, payload_size + filter_data.header_size); 1323 end: 1324 if (pair[0]) 1325 bufferevent_free(pair[0]); 1326 if (filter) 1327 bufferevent_free(filter); 1328 } 1329 1330 struct testcase_t bufferevent_testcases[] = { 1331 1332 LEGACY(bufferevent, TT_ISOLATED), 1333 LEGACY(bufferevent_pair, TT_ISOLATED), 1334 LEGACY(bufferevent_flush_normal, TT_ISOLATED), 1335 LEGACY(bufferevent_flush_flush, TT_ISOLATED), 1336 LEGACY(bufferevent_flush_finished, TT_ISOLATED), 1337 LEGACY(bufferevent_pair_flush_normal, TT_ISOLATED), 1338 LEGACY(bufferevent_pair_flush_flush, TT_ISOLATED), 1339 LEGACY(bufferevent_pair_flush_finished, TT_ISOLATED), 1340 #if defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED) 1341 { "bufferevent_pair_release_lock", test_bufferevent_pair_release_lock, 1342 TT_FORK|TT_ISOLATED|TT_NEED_THREADS|TT_NEED_BASE|TT_LEGACY, 1343 &basic_setup, NULL }, 1344 #endif 1345 LEGACY(bufferevent_watermarks, TT_ISOLATED), 1346 LEGACY(bufferevent_pair_watermarks, TT_ISOLATED), 1347 LEGACY(bufferevent_filters, TT_ISOLATED), 1348 LEGACY(bufferevent_pair_filters, TT_ISOLATED), 1349 LEGACY(bufferevent_filters_disable, TT_ISOLATED), 1350 LEGACY(bufferevent_pair_filters_disable, TT_ISOLATED), 1351 { "bufferevent_connect", test_bufferevent_connect, TT_FORK|TT_NEED_BASE, 1352 &basic_setup, (void*)"" }, 1353 { "bufferevent_connect_defer", test_bufferevent_connect, 1354 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" }, 1355 { "bufferevent_connect_lock", test_bufferevent_connect, 1356 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, (void*)"lock" }, 1357 { "bufferevent_connect_lock_defer", test_bufferevent_connect, 1358 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1359 (void*)"defer lock" }, 1360 { "bufferevent_connect_unlocked_cbs", test_bufferevent_connect, 1361 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1362 (void*)"lock defer unlocked" }, 1363 { "bufferevent_connect_fail", test_bufferevent_connect_fail, 1364 TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, 1365 { "bufferevent_timeout", test_bufferevent_timeouts, 1366 TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR, &basic_setup, (void*)"" }, 1367 { "bufferevent_timeout_pair", test_bufferevent_timeouts, 1368 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"pair" }, 1369 { "bufferevent_timeout_filter", test_bufferevent_timeouts, 1370 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter" }, 1371 { "bufferevent_timeout_filter_pair", test_bufferevent_timeouts, 1372 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter pair" }, 1373 { "bufferevent_trigger", test_bufferevent_trigger, TT_FORK|TT_NEED_BASE, 1374 &basic_setup, (void*)"" }, 1375 { "bufferevent_trigger_defer", test_bufferevent_trigger, 1376 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" }, 1377 { "bufferevent_trigger_postpone", test_bufferevent_trigger, 1378 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1379 (void*)"postpone" }, 1380 { "bufferevent_trigger_defer_postpone", test_bufferevent_trigger, 1381 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1382 (void*)"defer postpone" }, 1383 #ifdef EVENT__HAVE_LIBZ 1384 LEGACY(bufferevent_zlib, TT_ISOLATED), 1385 #else 1386 { "bufferevent_zlib", NULL, TT_SKIP, NULL, NULL }, 1387 #endif 1388 1389 { "bufferevent_connect_fail_eventcb_defer", 1390 test_bufferevent_connect_fail_eventcb, 1391 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)BEV_OPT_DEFER_CALLBACKS }, 1392 { "bufferevent_connect_fail_eventcb", 1393 test_bufferevent_connect_fail_eventcb, 1394 TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, 1395 1396 { "bufferevent_socket_filter_inactive", 1397 test_bufferevent_socket_filter_inactive, 1398 TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, 1399 { "bufferevent_pair_flush", 1400 test_bufferevent_pair_flush, 1401 TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, 1402 { "bufferevent_filter_data_stuck", 1403 test_bufferevent_filter_data_stuck, 1404 TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, 1405 1406 END_OF_TESTCASES, 1407 }; 1408 1409 struct testcase_t bufferevent_iocp_testcases[] = { 1410 1411 LEGACY(bufferevent, TT_ISOLATED|TT_ENABLE_IOCP), 1412 LEGACY(bufferevent_flush_normal, TT_ISOLATED), 1413 LEGACY(bufferevent_flush_flush, TT_ISOLATED), 1414 LEGACY(bufferevent_flush_finished, TT_ISOLATED), 1415 LEGACY(bufferevent_watermarks, TT_ISOLATED|TT_ENABLE_IOCP), 1416 LEGACY(bufferevent_filters, TT_ISOLATED|TT_ENABLE_IOCP), 1417 LEGACY(bufferevent_filters_disable, TT_ISOLATED|TT_ENABLE_IOCP), 1418 { "bufferevent_connect", test_bufferevent_connect, 1419 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"" }, 1420 { "bufferevent_connect_defer", test_bufferevent_connect, 1421 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"defer" }, 1422 { "bufferevent_connect_lock", test_bufferevent_connect, 1423 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup, 1424 (void*)"lock" }, 1425 { "bufferevent_connect_lock_defer", test_bufferevent_connect, 1426 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup, 1427 (void*)"defer lock" }, 1428 { "bufferevent_connect_fail", test_bufferevent_connect_fail, 1429 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL }, 1430 { "bufferevent_connect_nonblocking", test_bufferevent_connect, 1431 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, 1432 (void*)"unset_connectex" }, 1433 1434 { "bufferevent_connect_fail_eventcb_defer", 1435 test_bufferevent_connect_fail_eventcb, 1436 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, 1437 (void*)BEV_OPT_DEFER_CALLBACKS }, 1438 { "bufferevent_connect_fail_eventcb", 1439 test_bufferevent_connect_fail_eventcb, 1440 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL }, 1441 1442 END_OF_TESTCASES, 1443 }; 1444