1 /* 2 * Copyright (c) 2003-2007 Niels Provos <provos@citi.umich.edu> 3 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 #include "util-internal.h" 28 29 /* The old tests here need assertions to work. */ 30 #undef NDEBUG 31 32 #ifdef _WIN32 33 #include <winsock2.h> 34 #include <windows.h> 35 #endif 36 37 #include "event2/event-config.h" 38 39 #include <sys/types.h> 40 #include <sys/stat.h> 41 #ifdef EVENT__HAVE_SYS_TIME_H 42 #include <sys/time.h> 43 #endif 44 #include <sys/queue.h> 45 #ifndef _WIN32 46 #include <sys/socket.h> 47 #include <sys/wait.h> 48 #include <signal.h> 49 #include <unistd.h> 50 #include <netdb.h> 51 #include <netinet/in.h> 52 #endif 53 #include <fcntl.h> 54 #include <signal.h> 55 #include <stdlib.h> 56 #include <stdio.h> 57 #include <string.h> 58 #include <errno.h> 59 #include <assert.h> 60 61 #ifdef EVENT__HAVE_ARPA_INET_H 62 #include <arpa/inet.h> 63 #endif 64 65 #include "event2/event-config.h" 66 #include "event2/event.h" 67 #include "event2/event_struct.h" 68 #include "event2/event_compat.h" 69 #include "event2/tag.h" 70 #include "event2/buffer.h" 71 #include "event2/bufferevent.h" 72 #include "event2/bufferevent_compat.h" 73 #include "event2/bufferevent_struct.h" 74 #include "event2/listener.h" 75 #include "event2/util.h" 76 77 #include "bufferevent-internal.h" 78 #include "evthread-internal.h" 79 #include "util-internal.h" 80 #ifdef _WIN32 81 #include "iocp-internal.h" 82 #endif 83 84 #include "regress.h" 85 #include "regress_testutils.h" 86 87 /* 88 * simple bufferevent test 89 */ 90 91 static void 92 readcb(struct bufferevent *bev, void *arg) 93 { 94 if (evbuffer_get_length(bev->input) == 8333) { 95 struct evbuffer *evbuf = evbuffer_new(); 96 assert(evbuf != NULL); 97 98 /* gratuitous test of bufferevent_read_buffer */ 99 bufferevent_read_buffer(bev, evbuf); 100 101 bufferevent_disable(bev, EV_READ); 102 103 if (evbuffer_get_length(evbuf) == 8333) { 104 test_ok++; 105 } 106 107 evbuffer_free(evbuf); 108 } 109 } 110 111 static void 112 writecb(struct bufferevent *bev, void *arg) 113 { 114 if (evbuffer_get_length(bev->output) == 0) { 115 test_ok++; 116 } 117 } 118 119 static void 120 errorcb(struct bufferevent *bev, short what, void *arg) 121 { 122 test_ok = -2; 123 } 124 125 static void 126 test_bufferevent_impl(int use_pair) 127 { 128 struct bufferevent *bev1 = NULL, *bev2 = NULL; 129 char buffer[8333]; 130 int i; 131 132 if (use_pair) { 133 struct bufferevent *pair[2]; 134 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair)); 135 bev1 = pair[0]; 136 bev2 = pair[1]; 137 bufferevent_setcb(bev1, readcb, writecb, errorcb, bev1); 138 bufferevent_setcb(bev2, readcb, writecb, errorcb, NULL); 139 tt_int_op(bufferevent_getfd(bev1), ==, -1); 140 tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL); 141 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, bev2); 142 tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, bev1); 143 } else { 144 bev1 = bufferevent_new(pair[0], readcb, writecb, errorcb, NULL); 145 bev2 = bufferevent_new(pair[1], readcb, writecb, errorcb, NULL); 146 tt_int_op(bufferevent_getfd(bev1), ==, pair[0]); 147 tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL); 148 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, NULL); 149 tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, NULL); 150 } 151 152 { 153 /* Test getcb. */ 154 bufferevent_data_cb r, w; 155 bufferevent_event_cb e; 156 void *a; 157 bufferevent_getcb(bev1, &r, &w, &e, &a); 158 tt_ptr_op(r, ==, readcb); 159 tt_ptr_op(w, ==, writecb); 160 tt_ptr_op(e, ==, errorcb); 161 tt_ptr_op(a, ==, use_pair ? bev1 : NULL); 162 } 163 164 bufferevent_disable(bev1, EV_READ); 165 bufferevent_enable(bev2, EV_READ); 166 167 tt_int_op(bufferevent_get_enabled(bev1), ==, EV_WRITE); 168 tt_int_op(bufferevent_get_enabled(bev2), ==, EV_WRITE|EV_READ); 169 170 for (i = 0; i < (int)sizeof(buffer); i++) 171 buffer[i] = i; 172 173 bufferevent_write(bev1, buffer, sizeof(buffer)); 174 175 event_dispatch(); 176 177 bufferevent_free(bev2); 178 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, NULL); 179 bufferevent_free(bev1); 180 181 if (test_ok != 2) 182 test_ok = 0; 183 end: 184 ; 185 } 186 187 static void 188 test_bufferevent(void) 189 { 190 test_bufferevent_impl(0); 191 } 192 193 static void 194 test_bufferevent_pair(void) 195 { 196 test_bufferevent_impl(1); 197 } 198 199 #if defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED) 200 /** 201 * Trace lock/unlock/alloc/free for locks. 202 * (More heavier then evthread_debug*) 203 */ 204 typedef struct 205 { 206 void *lock; 207 enum { 208 ALLOC, FREE, 209 } status; 210 size_t locked /** allow recursive locking */; 211 } lock_wrapper; 212 struct lock_unlock_base 213 { 214 /* Original callbacks */ 215 struct evthread_lock_callbacks cbs; 216 /* Map of locks */ 217 lock_wrapper *locks; 218 size_t nr_locks; 219 } lu_base = { 220 .locks = NULL, 221 }; 222 223 static lock_wrapper *lu_find(void *lock_) 224 { 225 size_t i; 226 for (i = 0; i < lu_base.nr_locks; ++i) { 227 lock_wrapper *lock = &lu_base.locks[i]; 228 if (lock->lock == lock_) 229 return lock; 230 } 231 return NULL; 232 } 233 234 static void *trace_lock_alloc(unsigned locktype) 235 { 236 ++lu_base.nr_locks; 237 lu_base.locks = realloc(lu_base.locks, 238 sizeof(lock_wrapper) * lu_base.nr_locks); 239 void *lock = lu_base.cbs.alloc(locktype); 240 lu_base.locks[lu_base.nr_locks - 1] = (lock_wrapper){ lock, ALLOC, 0 }; 241 return lock; 242 } 243 static void trace_lock_free(void *lock_, unsigned locktype) 244 { 245 lock_wrapper *lock = lu_find(lock_); 246 if (!lock || lock->status == FREE || lock->locked) { 247 __asm__("int3"); 248 TT_FAIL(("lock: free error")); 249 } else { 250 lock->status = FREE; 251 lu_base.cbs.free(lock_, locktype); 252 } 253 } 254 static int trace_lock_lock(unsigned mode, void *lock_) 255 { 256 lock_wrapper *lock = lu_find(lock_); 257 if (!lock || lock->status == FREE) { 258 TT_FAIL(("lock: lock error")); 259 return -1; 260 } else { 261 ++lock->locked; 262 return lu_base.cbs.lock(mode, lock_); 263 } 264 } 265 static int trace_lock_unlock(unsigned mode, void *lock_) 266 { 267 lock_wrapper *lock = lu_find(lock_); 268 if (!lock || lock->status == FREE || !lock->locked) { 269 TT_FAIL(("lock: unlock error")); 270 return -1; 271 } else { 272 --lock->locked; 273 return lu_base.cbs.unlock(mode, lock_); 274 } 275 } 276 static void lock_unlock_free_thread_cbs() 277 { 278 event_base_free(NULL); 279 280 /** drop immutable flag */ 281 evthread_set_lock_callbacks(NULL); 282 /** avoid calling of event_global_setup_locks_() for new cbs */ 283 libevent_global_shutdown(); 284 /** drop immutable flag for non-debug ops (since called after shutdown) */ 285 evthread_set_lock_callbacks(NULL); 286 } 287 288 static int use_lock_unlock_profiler(void) 289 { 290 struct evthread_lock_callbacks cbs = { 291 EVTHREAD_LOCK_API_VERSION, 292 EVTHREAD_LOCKTYPE_RECURSIVE, 293 trace_lock_alloc, 294 trace_lock_free, 295 trace_lock_lock, 296 trace_lock_unlock, 297 }; 298 memcpy(&lu_base.cbs, evthread_get_lock_callbacks(), 299 sizeof(lu_base.cbs)); 300 { 301 lock_unlock_free_thread_cbs(); 302 303 evthread_set_lock_callbacks(&cbs); 304 /** re-create debug locks correctly */ 305 evthread_enable_lock_debugging(); 306 307 event_init(); 308 } 309 return 0; 310 } 311 static void free_lock_unlock_profiler(struct basic_test_data *data) 312 { 313 lock_unlock_free_thread_cbs(); 314 free(lu_base.locks); 315 data->base = NULL; 316 } 317 318 static void test_bufferevent_pair_release_lock(void *arg) 319 { 320 struct basic_test_data *data = arg; 321 use_lock_unlock_profiler(); 322 { 323 struct bufferevent *pair[2]; 324 if (!bufferevent_pair_new(NULL, BEV_OPT_THREADSAFE, pair)) { 325 bufferevent_free(pair[0]); 326 bufferevent_free(pair[1]); 327 } else 328 tt_abort_perror("bufferevent_pair_new"); 329 } 330 free_lock_unlock_profiler(data); 331 end: 332 ; 333 } 334 #endif 335 336 /* 337 * test watermarks and bufferevent 338 */ 339 340 static void 341 wm_readcb(struct bufferevent *bev, void *arg) 342 { 343 struct evbuffer *evbuf = evbuffer_new(); 344 int len = (int)evbuffer_get_length(bev->input); 345 static int nread; 346 347 assert(len >= 10 && len <= 20); 348 349 assert(evbuf != NULL); 350 351 /* gratuitous test of bufferevent_read_buffer */ 352 bufferevent_read_buffer(bev, evbuf); 353 354 nread += len; 355 if (nread == 65000) { 356 bufferevent_disable(bev, EV_READ); 357 test_ok++; 358 } 359 360 evbuffer_free(evbuf); 361 } 362 363 static void 364 wm_writecb(struct bufferevent *bev, void *arg) 365 { 366 assert(evbuffer_get_length(bev->output) <= 100); 367 if (evbuffer_get_length(bev->output) == 0) { 368 evbuffer_drain(bev->output, evbuffer_get_length(bev->output)); 369 test_ok++; 370 } 371 } 372 373 static void 374 wm_errorcb(struct bufferevent *bev, short what, void *arg) 375 { 376 test_ok = -2; 377 } 378 379 static void 380 test_bufferevent_watermarks_impl(int use_pair) 381 { 382 struct bufferevent *bev1 = NULL, *bev2 = NULL; 383 char buffer[65000]; 384 size_t low, high; 385 int i; 386 test_ok = 0; 387 388 if (use_pair) { 389 struct bufferevent *pair[2]; 390 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair)); 391 bev1 = pair[0]; 392 bev2 = pair[1]; 393 bufferevent_setcb(bev1, NULL, wm_writecb, errorcb, NULL); 394 bufferevent_setcb(bev2, wm_readcb, NULL, errorcb, NULL); 395 } else { 396 bev1 = bufferevent_new(pair[0], NULL, wm_writecb, wm_errorcb, NULL); 397 bev2 = bufferevent_new(pair[1], wm_readcb, NULL, wm_errorcb, NULL); 398 } 399 tt_assert(bev1); 400 tt_assert(bev2); 401 bufferevent_disable(bev1, EV_READ); 402 bufferevent_enable(bev2, EV_READ); 403 404 /* By default, low watermarks are set to 0 */ 405 bufferevent_getwatermark(bev1, EV_READ, &low, NULL); 406 tt_int_op(low, ==, 0); 407 bufferevent_getwatermark(bev2, EV_WRITE, &low, NULL); 408 tt_int_op(low, ==, 0); 409 410 for (i = 0; i < (int)sizeof(buffer); i++) 411 buffer[i] = (char)i; 412 413 /* limit the reading on the receiving bufferevent */ 414 bufferevent_setwatermark(bev2, EV_READ, 10, 20); 415 416 bufferevent_getwatermark(bev2, EV_READ, &low, &high); 417 tt_int_op(low, ==, 10); 418 tt_int_op(high, ==, 20); 419 420 /* Tell the sending bufferevent not to notify us till it's down to 421 100 bytes. */ 422 bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000); 423 424 bufferevent_getwatermark(bev1, EV_WRITE, &low, &high); 425 tt_int_op(low, ==, 100); 426 tt_int_op(high, ==, 2000); 427 428 { 429 int r = bufferevent_getwatermark(bev1, EV_WRITE | EV_READ, &low, &high); 430 tt_int_op(r, !=, 0); 431 } 432 433 bufferevent_write(bev1, buffer, sizeof(buffer)); 434 435 event_dispatch(); 436 437 tt_int_op(test_ok, ==, 2); 438 439 /* The write callback drained all the data from outbuf, so we 440 * should have removed the write event... */ 441 tt_assert(!event_pending(&bev2->ev_write, EV_WRITE, NULL)); 442 443 end: 444 if (bev1) 445 bufferevent_free(bev1); 446 if (bev2) 447 bufferevent_free(bev2); 448 } 449 450 static void 451 test_bufferevent_watermarks(void) 452 { 453 test_bufferevent_watermarks_impl(0); 454 } 455 456 static void 457 test_bufferevent_pair_watermarks(void) 458 { 459 test_bufferevent_watermarks_impl(1); 460 } 461 462 /* 463 * Test bufferevent filters 464 */ 465 466 /* strip an 'x' from each byte */ 467 468 static enum bufferevent_filter_result 469 bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst, 470 ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx) 471 { 472 const unsigned char *buffer; 473 unsigned i; 474 475 buffer = evbuffer_pullup(src, evbuffer_get_length(src)); 476 for (i = 0; i < evbuffer_get_length(src); i += 2) { 477 assert(buffer[i] == 'x'); 478 evbuffer_add(dst, buffer + i + 1, 1); 479 480 if (i + 2 > evbuffer_get_length(src)) 481 break; 482 } 483 484 evbuffer_drain(src, i); 485 return (BEV_OK); 486 } 487 488 /* add an 'x' before each byte */ 489 490 static enum bufferevent_filter_result 491 bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst, 492 ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx) 493 { 494 const unsigned char *buffer; 495 unsigned i; 496 497 buffer = evbuffer_pullup(src, evbuffer_get_length(src)); 498 for (i = 0; i < evbuffer_get_length(src); ++i) { 499 evbuffer_add(dst, "x", 1); 500 evbuffer_add(dst, buffer + i, 1); 501 } 502 503 evbuffer_drain(src, evbuffer_get_length(src)); 504 return (BEV_OK); 505 } 506 507 static void 508 test_bufferevent_filters_impl(int use_pair) 509 { 510 struct bufferevent *bev1 = NULL, *bev2 = NULL; 511 struct bufferevent *bev1_base = NULL, *bev2_base = NULL; 512 char buffer[8333]; 513 int i; 514 515 test_ok = 0; 516 517 if (use_pair) { 518 struct bufferevent *pair[2]; 519 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair)); 520 bev1 = pair[0]; 521 bev2 = pair[1]; 522 } else { 523 bev1 = bufferevent_socket_new(NULL, pair[0], 0); 524 bev2 = bufferevent_socket_new(NULL, pair[1], 0); 525 } 526 bev1_base = bev1; 527 bev2_base = bev2; 528 529 for (i = 0; i < (int)sizeof(buffer); i++) 530 buffer[i] = i; 531 532 bev1 = bufferevent_filter_new(bev1, NULL, bufferevent_output_filter, 533 BEV_OPT_CLOSE_ON_FREE, NULL, NULL); 534 535 bev2 = bufferevent_filter_new(bev2, bufferevent_input_filter, 536 NULL, BEV_OPT_CLOSE_ON_FREE, NULL, NULL); 537 bufferevent_setcb(bev1, NULL, writecb, errorcb, NULL); 538 bufferevent_setcb(bev2, readcb, NULL, errorcb, NULL); 539 540 tt_ptr_op(bufferevent_get_underlying(bev1), ==, bev1_base); 541 tt_ptr_op(bufferevent_get_underlying(bev2), ==, bev2_base); 542 tt_int_op(bufferevent_getfd(bev1), ==, -1); 543 tt_int_op(bufferevent_getfd(bev2), ==, -1); 544 545 bufferevent_disable(bev1, EV_READ); 546 bufferevent_enable(bev2, EV_READ); 547 /* insert some filters */ 548 bufferevent_write(bev1, buffer, sizeof(buffer)); 549 550 event_dispatch(); 551 552 if (test_ok != 2) 553 test_ok = 0; 554 555 end: 556 if (bev1) 557 bufferevent_free(bev1); 558 if (bev2) 559 bufferevent_free(bev2); 560 561 } 562 563 static void 564 test_bufferevent_filters(void) 565 { 566 test_bufferevent_filters_impl(0); 567 } 568 569 static void 570 test_bufferevent_pair_filters(void) 571 { 572 test_bufferevent_filters_impl(1); 573 } 574 575 576 static void 577 sender_writecb(struct bufferevent *bev, void *ctx) 578 { 579 if (evbuffer_get_length(bufferevent_get_output(bev)) == 0) { 580 bufferevent_disable(bev,EV_READ|EV_WRITE); 581 TT_BLATHER(("Flushed %d: freeing it.", (int)bufferevent_getfd(bev))); 582 bufferevent_free(bev); 583 } 584 } 585 586 static void 587 sender_errorcb(struct bufferevent *bev, short what, void *ctx) 588 { 589 TT_FAIL(("Got sender error %d",(int)what)); 590 } 591 592 static int bufferevent_connect_test_flags = 0; 593 static int bufferevent_trigger_test_flags = 0; 594 static int n_strings_read = 0; 595 static int n_reads_invoked = 0; 596 597 #define TEST_STR "Now is the time for all good events to signal for " \ 598 "the good of their protocol" 599 static void 600 listen_cb(struct evconnlistener *listener, evutil_socket_t fd, 601 struct sockaddr *sa, int socklen, void *arg) 602 { 603 struct event_base *base = arg; 604 struct bufferevent *bev; 605 const char s[] = TEST_STR; 606 TT_BLATHER(("Got a request on socket %d", (int)fd )); 607 bev = bufferevent_socket_new(base, fd, bufferevent_connect_test_flags); 608 tt_assert(bev); 609 bufferevent_setcb(bev, NULL, sender_writecb, sender_errorcb, NULL); 610 bufferevent_write(bev, s, sizeof(s)); 611 end: 612 ; 613 } 614 615 static void 616 reader_eventcb(struct bufferevent *bev, short what, void *ctx) 617 { 618 struct event_base *base = ctx; 619 if (what & BEV_EVENT_ERROR) { 620 perror("foobar"); 621 TT_FAIL(("got connector error %d", (int)what)); 622 return; 623 } 624 if (what & BEV_EVENT_CONNECTED) { 625 TT_BLATHER(("connected on %d", (int)bufferevent_getfd(bev))); 626 bufferevent_enable(bev, EV_READ); 627 } 628 if (what & BEV_EVENT_EOF) { 629 char buf[512]; 630 size_t n; 631 n = bufferevent_read(bev, buf, sizeof(buf)-1); 632 tt_int_op(n, >=, 0); 633 buf[n] = '\0'; 634 tt_str_op(buf, ==, TEST_STR); 635 if (++n_strings_read == 2) 636 event_base_loopexit(base, NULL); 637 TT_BLATHER(("EOF on %d: %d strings read.", 638 (int)bufferevent_getfd(bev), n_strings_read)); 639 } 640 end: 641 ; 642 } 643 644 static void 645 reader_readcb(struct bufferevent *bev, void *ctx) 646 { 647 TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev))); 648 n_reads_invoked++; 649 } 650 651 static void 652 test_bufferevent_connect(void *arg) 653 { 654 struct basic_test_data *data = arg; 655 struct evconnlistener *lev=NULL; 656 struct bufferevent *bev1=NULL, *bev2=NULL; 657 struct sockaddr_in localhost; 658 struct sockaddr_storage ss; 659 struct sockaddr *sa; 660 ev_socklen_t slen; 661 662 int be_flags=BEV_OPT_CLOSE_ON_FREE; 663 664 if (strstr((char*)data->setup_data, "defer")) { 665 be_flags |= BEV_OPT_DEFER_CALLBACKS; 666 } 667 if (strstr((char*)data->setup_data, "unlocked")) { 668 be_flags |= BEV_OPT_UNLOCK_CALLBACKS; 669 } 670 if (strstr((char*)data->setup_data, "lock")) { 671 be_flags |= BEV_OPT_THREADSAFE; 672 } 673 bufferevent_connect_test_flags = be_flags; 674 #ifdef _WIN32 675 if (!strcmp((char*)data->setup_data, "unset_connectex")) { 676 struct win32_extension_fns *ext = 677 (struct win32_extension_fns *) 678 event_get_win32_extension_fns_(); 679 ext->ConnectEx = NULL; 680 } 681 #endif 682 683 memset(&localhost, 0, sizeof(localhost)); 684 685 localhost.sin_port = 0; /* pick-a-port */ 686 localhost.sin_addr.s_addr = htonl(0x7f000001L); 687 localhost.sin_family = AF_INET; 688 sa = (struct sockaddr *)&localhost; 689 lev = evconnlistener_new_bind(data->base, listen_cb, data->base, 690 LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, 691 16, sa, sizeof(localhost)); 692 tt_assert(lev); 693 694 sa = (struct sockaddr *)&ss; 695 slen = sizeof(ss); 696 if (regress_get_listener_addr(lev, sa, &slen) < 0) { 697 tt_abort_perror("getsockname"); 698 } 699 700 tt_assert(!evconnlistener_enable(lev)); 701 bev1 = bufferevent_socket_new(data->base, -1, be_flags); 702 bev2 = bufferevent_socket_new(data->base, -1, be_flags); 703 tt_assert(bev1); 704 tt_assert(bev2); 705 bufferevent_setcb(bev1, reader_readcb,NULL, reader_eventcb, data->base); 706 bufferevent_setcb(bev2, reader_readcb,NULL, reader_eventcb, data->base); 707 708 bufferevent_enable(bev1, EV_READ); 709 bufferevent_enable(bev2, EV_READ); 710 711 tt_want(!bufferevent_socket_connect(bev1, sa, sizeof(localhost))); 712 tt_want(!bufferevent_socket_connect(bev2, sa, sizeof(localhost))); 713 714 event_base_dispatch(data->base); 715 716 tt_int_op(n_strings_read, ==, 2); 717 tt_int_op(n_reads_invoked, >=, 2); 718 end: 719 if (lev) 720 evconnlistener_free(lev); 721 722 if (bev1) 723 bufferevent_free(bev1); 724 725 if (bev2) 726 bufferevent_free(bev2); 727 } 728 729 static void 730 want_fail_eventcb(struct bufferevent *bev, short what, void *ctx) 731 { 732 struct event_base *base = ctx; 733 const char *err; 734 evutil_socket_t s; 735 736 if (what & BEV_EVENT_ERROR) { 737 s = bufferevent_getfd(bev); 738 err = evutil_socket_error_to_string(evutil_socket_geterror(s)); 739 TT_BLATHER(("connection failure on "EV_SOCK_FMT": %s", 740 EV_SOCK_ARG(s), err)); 741 test_ok = 1; 742 } else { 743 TT_FAIL(("didn't fail? what %hd", what)); 744 } 745 746 event_base_loopexit(base, NULL); 747 } 748 749 static void 750 close_socket_cb(evutil_socket_t fd, short what, void *arg) 751 { 752 evutil_socket_t *fdp = arg; 753 if (*fdp >= 0) { 754 evutil_closesocket(*fdp); 755 *fdp = -1; 756 } 757 } 758 759 static void 760 test_bufferevent_connect_fail(void *arg) 761 { 762 struct basic_test_data *data = (struct basic_test_data *)arg; 763 struct bufferevent *bev=NULL; 764 struct sockaddr_in localhost; 765 struct sockaddr *sa = (struct sockaddr*)&localhost; 766 evutil_socket_t fake_listener = -1; 767 ev_socklen_t slen = sizeof(localhost); 768 struct event close_listener_event; 769 int close_listener_event_added = 0; 770 struct timeval one_second = { 1, 0 }; 771 int r; 772 773 test_ok = 0; 774 775 memset(&localhost, 0, sizeof(localhost)); 776 localhost.sin_port = 0; /* have the kernel pick a port */ 777 localhost.sin_addr.s_addr = htonl(0x7f000001L); 778 localhost.sin_family = AF_INET; 779 780 /* bind, but don't listen or accept. should trigger 781 "Connection refused" reliably on most platforms. */ 782 fake_listener = socket(localhost.sin_family, SOCK_STREAM, 0); 783 tt_assert(fake_listener >= 0); 784 tt_assert(bind(fake_listener, sa, slen) == 0); 785 tt_assert(getsockname(fake_listener, sa, &slen) == 0); 786 bev = bufferevent_socket_new(data->base, -1, 787 BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS); 788 tt_assert(bev); 789 bufferevent_setcb(bev, NULL, NULL, want_fail_eventcb, data->base); 790 791 r = bufferevent_socket_connect(bev, sa, slen); 792 /* XXXX we'd like to test the '0' case everywhere, but FreeBSD tells 793 * detects the error immediately, which is not really wrong of it. */ 794 tt_want(r == 0 || r == -1); 795 796 /* Close the listener socket after a second. This should trigger 797 "connection refused" on some other platforms, including OSX. */ 798 evtimer_assign(&close_listener_event, data->base, close_socket_cb, 799 &fake_listener); 800 event_add(&close_listener_event, &one_second); 801 close_listener_event_added = 1; 802 803 event_base_dispatch(data->base); 804 805 tt_int_op(test_ok, ==, 1); 806 807 end: 808 if (fake_listener >= 0) 809 evutil_closesocket(fake_listener); 810 811 if (bev) 812 bufferevent_free(bev); 813 814 if (close_listener_event_added) 815 event_del(&close_listener_event); 816 } 817 818 struct timeout_cb_result { 819 struct timeval read_timeout_at; 820 struct timeval write_timeout_at; 821 struct timeval last_wrote_at; 822 int n_read_timeouts; 823 int n_write_timeouts; 824 int total_calls; 825 }; 826 827 static void 828 bev_timeout_write_cb(struct bufferevent *bev, void *arg) 829 { 830 struct timeout_cb_result *res = arg; 831 evutil_gettimeofday(&res->last_wrote_at, NULL); 832 } 833 834 static void 835 bev_timeout_event_cb(struct bufferevent *bev, short what, void *arg) 836 { 837 struct timeout_cb_result *res = arg; 838 ++res->total_calls; 839 840 if ((what & (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) 841 == (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) { 842 evutil_gettimeofday(&res->read_timeout_at, NULL); 843 ++res->n_read_timeouts; 844 } 845 if ((what & (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) 846 == (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) { 847 evutil_gettimeofday(&res->write_timeout_at, NULL); 848 ++res->n_write_timeouts; 849 } 850 } 851 852 static void 853 test_bufferevent_timeouts(void *arg) 854 { 855 /* "arg" is a string containing "pair" and/or "filter". */ 856 struct bufferevent *bev1 = NULL, *bev2 = NULL; 857 struct basic_test_data *data = arg; 858 int use_pair = 0, use_filter = 0; 859 struct timeval tv_w, tv_r, started_at; 860 struct timeout_cb_result res1, res2; 861 char buf[1024]; 862 863 memset(&res1, 0, sizeof(res1)); 864 memset(&res2, 0, sizeof(res2)); 865 866 if (strstr((char*)data->setup_data, "pair")) 867 use_pair = 1; 868 if (strstr((char*)data->setup_data, "filter")) 869 use_filter = 1; 870 871 if (use_pair) { 872 struct bufferevent *p[2]; 873 tt_int_op(0, ==, bufferevent_pair_new(data->base, 0, p)); 874 bev1 = p[0]; 875 bev2 = p[1]; 876 } else { 877 bev1 = bufferevent_socket_new(data->base, data->pair[0], 0); 878 bev2 = bufferevent_socket_new(data->base, data->pair[1], 0); 879 } 880 881 tt_assert(bev1); 882 tt_assert(bev2); 883 884 if (use_filter) { 885 struct bufferevent *bevf1, *bevf2; 886 bevf1 = bufferevent_filter_new(bev1, NULL, NULL, 887 BEV_OPT_CLOSE_ON_FREE, NULL, NULL); 888 bevf2 = bufferevent_filter_new(bev2, NULL, NULL, 889 BEV_OPT_CLOSE_ON_FREE, NULL, NULL); 890 tt_assert(bevf1); 891 tt_assert(bevf2); 892 bev1 = bevf1; 893 bev2 = bevf2; 894 } 895 896 /* Do this nice and early. */ 897 bufferevent_disable(bev2, EV_READ); 898 899 /* bev1 will try to write and read. Both will time out. */ 900 evutil_gettimeofday(&started_at, NULL); 901 tv_w.tv_sec = tv_r.tv_sec = 0; 902 tv_w.tv_usec = 100*1000; 903 tv_r.tv_usec = 150*1000; 904 bufferevent_setcb(bev1, NULL, bev_timeout_write_cb, 905 bev_timeout_event_cb, &res1); 906 bufferevent_setwatermark(bev1, EV_WRITE, 1024*1024+10, 0); 907 bufferevent_set_timeouts(bev1, &tv_r, &tv_w); 908 if (use_pair) { 909 /* For a pair, the fact that the other side isn't reading 910 * makes the writer stall */ 911 bufferevent_write(bev1, "ABCDEFG", 7); 912 } else { 913 /* For a real socket, the kernel's TCP buffers can eat a 914 * fair number of bytes; make sure that at some point we 915 * have some bytes that will stall. */ 916 struct evbuffer *output = bufferevent_get_output(bev1); 917 int i; 918 memset(buf, 0xbb, sizeof(buf)); 919 for (i=0;i<1024;++i) { 920 evbuffer_add_reference(output, buf, sizeof(buf), 921 NULL, NULL); 922 } 923 } 924 bufferevent_enable(bev1, EV_READ|EV_WRITE); 925 926 /* bev2 has nothing to say, and isn't listening. */ 927 bufferevent_setcb(bev2, NULL, bev_timeout_write_cb, 928 bev_timeout_event_cb, &res2); 929 tv_w.tv_sec = tv_r.tv_sec = 0; 930 tv_w.tv_usec = 200*1000; 931 tv_r.tv_usec = 100*1000; 932 bufferevent_set_timeouts(bev2, &tv_r, &tv_w); 933 bufferevent_enable(bev2, EV_WRITE); 934 935 tv_r.tv_sec = 0; 936 tv_r.tv_usec = 350000; 937 938 event_base_loopexit(data->base, &tv_r); 939 event_base_dispatch(data->base); 940 941 /* XXXX Test that actually reading or writing a little resets the 942 * timeouts. */ 943 944 /* Each buf1 timeout happens, and happens only once. */ 945 tt_want(res1.n_read_timeouts); 946 tt_want(res1.n_write_timeouts); 947 tt_want(res1.n_read_timeouts == 1); 948 tt_want(res1.n_write_timeouts == 1); 949 950 test_timeval_diff_eq(&started_at, &res1.read_timeout_at, 150); 951 test_timeval_diff_eq(&started_at, &res1.write_timeout_at, 100); 952 953 end: 954 if (bev1) 955 bufferevent_free(bev1); 956 if (bev2) 957 bufferevent_free(bev2); 958 } 959 960 static void 961 trigger_failure_cb(evutil_socket_t fd, short what, void *ctx) 962 { 963 TT_FAIL(("The triggered callback did not fire or the machine is really slow (try increasing timeout).")); 964 } 965 966 static void 967 trigger_eventcb(struct bufferevent *bev, short what, void *ctx) 968 { 969 struct event_base *base = ctx; 970 if (what == ~0) { 971 TT_BLATHER(("Event successfully triggered.")); 972 event_base_loopexit(base, NULL); 973 return; 974 } 975 reader_eventcb(bev, what, ctx); 976 } 977 978 static void 979 trigger_readcb_triggered(struct bufferevent *bev, void *ctx) 980 { 981 TT_BLATHER(("Read successfully triggered.")); 982 n_reads_invoked++; 983 bufferevent_trigger_event(bev, ~0, bufferevent_trigger_test_flags); 984 } 985 986 static void 987 trigger_readcb(struct bufferevent *bev, void *ctx) 988 { 989 struct timeval timeout = { 30, 0 }; 990 struct event_base *base = ctx; 991 size_t low, high, len; 992 int expected_reads; 993 994 TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev))); 995 expected_reads = ++n_reads_invoked; 996 997 bufferevent_setcb(bev, trigger_readcb_triggered, NULL, trigger_eventcb, ctx); 998 999 bufferevent_getwatermark(bev, EV_READ, &low, &high); 1000 len = evbuffer_get_length(bufferevent_get_input(bev)); 1001 1002 bufferevent_setwatermark(bev, EV_READ, len + 1, 0); 1003 bufferevent_trigger(bev, EV_READ, bufferevent_trigger_test_flags); 1004 /* no callback expected */ 1005 tt_int_op(n_reads_invoked, ==, expected_reads); 1006 1007 if ((bufferevent_trigger_test_flags & BEV_TRIG_DEFER_CALLBACKS) || 1008 (bufferevent_connect_test_flags & BEV_OPT_DEFER_CALLBACKS)) { 1009 /* will be deferred */ 1010 } else { 1011 expected_reads++; 1012 } 1013 1014 event_base_once(base, -1, EV_TIMEOUT, trigger_failure_cb, NULL, &timeout); 1015 1016 bufferevent_trigger(bev, EV_READ, 1017 bufferevent_trigger_test_flags | BEV_TRIG_IGNORE_WATERMARKS); 1018 tt_int_op(n_reads_invoked, ==, expected_reads); 1019 1020 bufferevent_setwatermark(bev, EV_READ, low, high); 1021 end: 1022 ; 1023 } 1024 1025 static void 1026 test_bufferevent_trigger(void *arg) 1027 { 1028 struct basic_test_data *data = arg; 1029 struct evconnlistener *lev=NULL; 1030 struct bufferevent *bev=NULL; 1031 struct sockaddr_in localhost; 1032 struct sockaddr_storage ss; 1033 struct sockaddr *sa; 1034 ev_socklen_t slen; 1035 1036 int be_flags=BEV_OPT_CLOSE_ON_FREE; 1037 int trig_flags=0; 1038 1039 if (strstr((char*)data->setup_data, "defer")) { 1040 be_flags |= BEV_OPT_DEFER_CALLBACKS; 1041 } 1042 bufferevent_connect_test_flags = be_flags; 1043 1044 if (strstr((char*)data->setup_data, "postpone")) { 1045 trig_flags |= BEV_TRIG_DEFER_CALLBACKS; 1046 } 1047 bufferevent_trigger_test_flags = trig_flags; 1048 1049 memset(&localhost, 0, sizeof(localhost)); 1050 1051 localhost.sin_port = 0; /* pick-a-port */ 1052 localhost.sin_addr.s_addr = htonl(0x7f000001L); 1053 localhost.sin_family = AF_INET; 1054 sa = (struct sockaddr *)&localhost; 1055 lev = evconnlistener_new_bind(data->base, listen_cb, data->base, 1056 LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, 1057 16, sa, sizeof(localhost)); 1058 tt_assert(lev); 1059 1060 sa = (struct sockaddr *)&ss; 1061 slen = sizeof(ss); 1062 if (regress_get_listener_addr(lev, sa, &slen) < 0) { 1063 tt_abort_perror("getsockname"); 1064 } 1065 1066 tt_assert(!evconnlistener_enable(lev)); 1067 bev = bufferevent_socket_new(data->base, -1, be_flags); 1068 tt_assert(bev); 1069 bufferevent_setcb(bev, trigger_readcb, NULL, trigger_eventcb, data->base); 1070 1071 bufferevent_enable(bev, EV_READ); 1072 1073 tt_want(!bufferevent_socket_connect(bev, sa, sizeof(localhost))); 1074 1075 event_base_dispatch(data->base); 1076 1077 tt_int_op(n_reads_invoked, ==, 2); 1078 end: 1079 if (lev) 1080 evconnlistener_free(lev); 1081 1082 if (bev) 1083 bufferevent_free(bev); 1084 } 1085 1086 struct testcase_t bufferevent_testcases[] = { 1087 1088 LEGACY(bufferevent, TT_ISOLATED), 1089 LEGACY(bufferevent_pair, TT_ISOLATED), 1090 #if defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED) 1091 { "bufferevent_pair_release_lock", test_bufferevent_pair_release_lock, 1092 TT_FORK|TT_ISOLATED|TT_NEED_THREADS|TT_NEED_BASE|TT_LEGACY, 1093 &basic_setup, NULL }, 1094 #endif 1095 LEGACY(bufferevent_watermarks, TT_ISOLATED), 1096 LEGACY(bufferevent_pair_watermarks, TT_ISOLATED), 1097 LEGACY(bufferevent_filters, TT_ISOLATED), 1098 LEGACY(bufferevent_pair_filters, TT_ISOLATED), 1099 { "bufferevent_connect", test_bufferevent_connect, TT_FORK|TT_NEED_BASE, 1100 &basic_setup, (void*)"" }, 1101 { "bufferevent_connect_defer", test_bufferevent_connect, 1102 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" }, 1103 { "bufferevent_connect_lock", test_bufferevent_connect, 1104 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, (void*)"lock" }, 1105 { "bufferevent_connect_lock_defer", test_bufferevent_connect, 1106 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1107 (void*)"defer lock" }, 1108 { "bufferevent_connect_unlocked_cbs", test_bufferevent_connect, 1109 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1110 (void*)"lock defer unlocked" }, 1111 { "bufferevent_connect_fail", test_bufferevent_connect_fail, 1112 TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, 1113 { "bufferevent_timeout", test_bufferevent_timeouts, 1114 TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR, &basic_setup, (void*)"" }, 1115 { "bufferevent_timeout_pair", test_bufferevent_timeouts, 1116 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"pair" }, 1117 { "bufferevent_timeout_filter", test_bufferevent_timeouts, 1118 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter" }, 1119 { "bufferevent_timeout_filter_pair", test_bufferevent_timeouts, 1120 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter pair" }, 1121 { "bufferevent_trigger", test_bufferevent_trigger, TT_FORK|TT_NEED_BASE, 1122 &basic_setup, (void*)"" }, 1123 { "bufferevent_trigger_defer", test_bufferevent_trigger, 1124 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" }, 1125 { "bufferevent_trigger_postpone", test_bufferevent_trigger, 1126 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1127 (void*)"postpone" }, 1128 { "bufferevent_trigger_defer_postpone", test_bufferevent_trigger, 1129 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, 1130 (void*)"defer postpone" }, 1131 #ifdef EVENT__HAVE_LIBZ 1132 LEGACY(bufferevent_zlib, TT_ISOLATED), 1133 #else 1134 { "bufferevent_zlib", NULL, TT_SKIP, NULL, NULL }, 1135 #endif 1136 1137 END_OF_TESTCASES, 1138 }; 1139 1140 struct testcase_t bufferevent_iocp_testcases[] = { 1141 1142 LEGACY(bufferevent, TT_ISOLATED|TT_ENABLE_IOCP), 1143 LEGACY(bufferevent_watermarks, TT_ISOLATED|TT_ENABLE_IOCP), 1144 LEGACY(bufferevent_filters, TT_ISOLATED|TT_ENABLE_IOCP), 1145 { "bufferevent_connect", test_bufferevent_connect, 1146 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"" }, 1147 { "bufferevent_connect_defer", test_bufferevent_connect, 1148 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"defer" }, 1149 { "bufferevent_connect_lock", test_bufferevent_connect, 1150 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup, 1151 (void*)"lock" }, 1152 { "bufferevent_connect_lock_defer", test_bufferevent_connect, 1153 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup, 1154 (void*)"defer lock" }, 1155 { "bufferevent_connect_fail", test_bufferevent_connect_fail, 1156 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL }, 1157 { "bufferevent_connect_nonblocking", test_bufferevent_connect, 1158 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, 1159 (void*)"unset_connectex" }, 1160 1161 END_OF_TESTCASES, 1162 }; 1163