1 /* 2 * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 1. Redistributions of source code must retain the above copyright 8 * notice, this list of conditions and the following disclaimer. 9 * 2. Redistributions in binary form must reproduce the above copyright 10 * notice, this list of conditions and the following disclaimer in the 11 * documentation and/or other materials provided with the distribution. 12 * 3. The name of the author may not be used to endorse or promote products 13 * derived from this software without specific prior written permission. 14 * 15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 16 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 17 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 18 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 19 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 20 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 21 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 22 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 24 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25 */ 26 #include "../util-internal.h" 27 28 #include <stdio.h> 29 #include <stdlib.h> 30 #include <string.h> 31 #include <assert.h> 32 #include <math.h> 33 34 #ifdef _WIN32 35 #include <winsock2.h> 36 #include <ws2tcpip.h> 37 #else 38 #include <sys/socket.h> 39 #include <netinet/in.h> 40 # ifdef _XOPEN_SOURCE_EXTENDED 41 # include <arpa/inet.h> 42 # endif 43 #endif 44 #include <signal.h> 45 46 #include "event2/bufferevent.h" 47 #include "event2/buffer.h" 48 #include "event2/event.h" 49 #include "event2/util.h" 50 #include "event2/listener.h" 51 #include "event2/thread.h" 52 53 54 static int cfg_verbose = 0; 55 static int cfg_help = 0; 56 57 static int cfg_n_connections = 30; 58 static int cfg_duration = 5; 59 static int cfg_connlimit = 0; 60 static int cfg_grouplimit = 0; 61 static int cfg_tick_msec = 1000; 62 static int cfg_min_share = -1; 63 static int cfg_group_drain = 0; 64 65 static int cfg_connlimit_tolerance = -1; 66 static int cfg_grouplimit_tolerance = -1; 67 static int cfg_stddev_tolerance = -1; 68 69 #ifdef _WIN32 70 static int cfg_enable_iocp = 0; 71 #endif 72 73 static struct timeval cfg_tick = { 0, 500*1000 }; 74 75 static struct ev_token_bucket_cfg *conn_bucket_cfg = NULL; 76 static struct ev_token_bucket_cfg *group_bucket_cfg = NULL; 77 struct bufferevent_rate_limit_group *ratelim_group = NULL; 78 static double seconds_per_tick = 0.0; 79 80 struct client_state { 81 size_t queued; 82 ev_uint64_t received; 83 84 }; 85 static const struct timeval *ms100_common=NULL; 86 87 /* info from check_bucket_levels_cb */ 88 static int total_n_bev_checks = 0; 89 static ev_int64_t total_rbucket_level=0; 90 static ev_int64_t total_wbucket_level=0; 91 static ev_int64_t total_max_to_read=0; 92 static ev_int64_t total_max_to_write=0; 93 static ev_int64_t max_bucket_level=EV_INT64_MIN; 94 static ev_int64_t min_bucket_level=EV_INT64_MAX; 95 96 /* from check_group_bucket_levels_cb */ 97 static int total_n_group_bev_checks = 0; 98 static ev_int64_t total_group_rbucket_level = 0; 99 static ev_int64_t total_group_wbucket_level = 0; 100 101 static int n_echo_conns_open = 0; 102 103 /* Info on the open connections */ 104 struct bufferevent **bevs; 105 struct client_state *states; 106 struct bufferevent_rate_limit_group *group = NULL; 107 108 static void check_bucket_levels_cb(evutil_socket_t fd, short events, void *arg); 109 110 static void 111 loud_writecb(struct bufferevent *bev, void *ctx) 112 { 113 struct client_state *cs = ctx; 114 struct evbuffer *output = bufferevent_get_output(bev); 115 char buf[1024]; 116 #ifdef _WIN32 117 int r = rand() % 256; 118 #else 119 int r = random() % 256; 120 #endif 121 memset(buf, r, sizeof(buf)); 122 while (evbuffer_get_length(output) < 8192) { 123 evbuffer_add(output, buf, sizeof(buf)); 124 cs->queued += sizeof(buf); 125 } 126 } 127 128 static void 129 discard_readcb(struct bufferevent *bev, void *ctx) 130 { 131 struct client_state *cs = ctx; 132 struct evbuffer *input = bufferevent_get_input(bev); 133 size_t len = evbuffer_get_length(input); 134 evbuffer_drain(input, len); 135 cs->received += len; 136 } 137 138 static void 139 write_on_connectedcb(struct bufferevent *bev, short what, void *ctx) 140 { 141 if (what & BEV_EVENT_CONNECTED) { 142 loud_writecb(bev, ctx); 143 /* XXXX this shouldn't be needed. */ 144 bufferevent_enable(bev, EV_READ|EV_WRITE); 145 } 146 } 147 148 static void 149 echo_readcb(struct bufferevent *bev, void *ctx) 150 { 151 struct evbuffer *input = bufferevent_get_input(bev); 152 struct evbuffer *output = bufferevent_get_output(bev); 153 154 evbuffer_add_buffer(output, input); 155 if (evbuffer_get_length(output) > 1024000) 156 bufferevent_disable(bev, EV_READ); 157 } 158 159 static void 160 echo_writecb(struct bufferevent *bev, void *ctx) 161 { 162 struct evbuffer *output = bufferevent_get_output(bev); 163 if (evbuffer_get_length(output) < 512000) 164 bufferevent_enable(bev, EV_READ); 165 } 166 167 static void 168 echo_eventcb(struct bufferevent *bev, short what, void *ctx) 169 { 170 if (what & (BEV_EVENT_EOF|BEV_EVENT_ERROR)) { 171 --n_echo_conns_open; 172 bufferevent_free(bev); 173 } 174 } 175 176 static void 177 echo_listenercb(struct evconnlistener *listener, evutil_socket_t newsock, 178 struct sockaddr *sourceaddr, int socklen, void *ctx) 179 { 180 struct event_base *base = ctx; 181 int flags = BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE; 182 struct bufferevent *bev; 183 184 bev = bufferevent_socket_new(base, newsock, flags); 185 bufferevent_setcb(bev, echo_readcb, echo_writecb, echo_eventcb, NULL); 186 if (conn_bucket_cfg) { 187 struct event *check_event = 188 event_new(base, -1, EV_PERSIST, check_bucket_levels_cb, bev); 189 bufferevent_set_rate_limit(bev, conn_bucket_cfg); 190 191 assert(bufferevent_get_token_bucket_cfg(bev) != NULL); 192 event_add(check_event, ms100_common); 193 } 194 if (ratelim_group) 195 bufferevent_add_to_rate_limit_group(bev, ratelim_group); 196 ++n_echo_conns_open; 197 bufferevent_enable(bev, EV_READ|EV_WRITE); 198 } 199 200 /* Called periodically to check up on how full the buckets are */ 201 static void 202 check_bucket_levels_cb(evutil_socket_t fd, short events, void *arg) 203 { 204 struct bufferevent *bev = arg; 205 206 ev_ssize_t r = bufferevent_get_read_limit(bev); 207 ev_ssize_t w = bufferevent_get_write_limit(bev); 208 ev_ssize_t rm = bufferevent_get_max_to_read(bev); 209 ev_ssize_t wm = bufferevent_get_max_to_write(bev); 210 /* XXXX check that no value is above the cofigured burst 211 * limit */ 212 total_rbucket_level += r; 213 total_wbucket_level += w; 214 total_max_to_read += rm; 215 total_max_to_write += wm; 216 #define B(x) \ 217 if ((x) > max_bucket_level) \ 218 max_bucket_level = (x); \ 219 if ((x) < min_bucket_level) \ 220 min_bucket_level = (x) 221 B(r); 222 B(w); 223 #undef B 224 225 total_n_bev_checks++; 226 if (total_n_bev_checks >= .8 * (cfg_duration / cfg_tick_msec) * cfg_n_connections) { 227 event_free(event_base_get_running_event(bufferevent_get_base(bev))); 228 } 229 } 230 231 static void 232 check_group_bucket_levels_cb(evutil_socket_t fd, short events, void *arg) 233 { 234 if (ratelim_group) { 235 ev_ssize_t r = bufferevent_rate_limit_group_get_read_limit(ratelim_group); 236 ev_ssize_t w = bufferevent_rate_limit_group_get_write_limit(ratelim_group); 237 total_group_rbucket_level += r; 238 total_group_wbucket_level += w; 239 } 240 ++total_n_group_bev_checks; 241 } 242 243 static void 244 group_drain_cb(evutil_socket_t fd, short events, void *arg) 245 { 246 bufferevent_rate_limit_group_decrement_read(ratelim_group, cfg_group_drain); 247 bufferevent_rate_limit_group_decrement_write(ratelim_group, cfg_group_drain); 248 } 249 250 static int 251 test_ratelimiting(void) 252 { 253 struct event_base *base; 254 struct sockaddr_in sin; 255 struct evconnlistener *listener; 256 257 struct sockaddr_storage ss; 258 ev_socklen_t slen; 259 260 int i; 261 262 struct timeval tv; 263 264 ev_uint64_t total_received; 265 double total_sq_persec, total_persec; 266 double variance; 267 double expected_total_persec = -1.0, expected_avg_persec = -1.0; 268 int ok = 1; 269 struct event_config *base_cfg; 270 struct event *periodic_level_check; 271 struct event *group_drain_event=NULL; 272 273 memset(&sin, 0, sizeof(sin)); 274 sin.sin_family = AF_INET; 275 sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */ 276 sin.sin_port = 0; /* unspecified port */ 277 278 if (0) 279 event_enable_debug_mode(); 280 281 base_cfg = event_config_new(); 282 283 #ifdef _WIN32 284 if (cfg_enable_iocp) { 285 evthread_use_windows_threads(); 286 event_config_set_flag(base_cfg, EVENT_BASE_FLAG_STARTUP_IOCP); 287 } 288 #endif 289 290 base = event_base_new_with_config(base_cfg); 291 event_config_free(base_cfg); 292 if (! base) { 293 fprintf(stderr, "Couldn't create event_base"); 294 return 1; 295 } 296 297 listener = evconnlistener_new_bind(base, echo_listenercb, base, 298 LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1, 299 (struct sockaddr *)&sin, sizeof(sin)); 300 if (! listener) { 301 fprintf(stderr, "Couldn't create listener"); 302 return 1; 303 } 304 305 slen = sizeof(ss); 306 if (getsockname(evconnlistener_get_fd(listener), (struct sockaddr *)&ss, 307 &slen) < 0) { 308 perror("getsockname"); 309 return 1; 310 } 311 312 if (cfg_connlimit > 0) { 313 conn_bucket_cfg = ev_token_bucket_cfg_new( 314 cfg_connlimit, cfg_connlimit * 4, 315 cfg_connlimit, cfg_connlimit * 4, 316 &cfg_tick); 317 assert(conn_bucket_cfg); 318 } 319 320 if (cfg_grouplimit > 0) { 321 group_bucket_cfg = ev_token_bucket_cfg_new( 322 cfg_grouplimit, cfg_grouplimit * 4, 323 cfg_grouplimit, cfg_grouplimit * 4, 324 &cfg_tick); 325 group = ratelim_group = bufferevent_rate_limit_group_new( 326 base, group_bucket_cfg); 327 expected_total_persec = cfg_grouplimit - (cfg_group_drain / seconds_per_tick); 328 expected_avg_persec = cfg_grouplimit / cfg_n_connections; 329 if (cfg_connlimit > 0 && expected_avg_persec > cfg_connlimit) 330 expected_avg_persec = cfg_connlimit; 331 if (cfg_min_share >= 0) 332 bufferevent_rate_limit_group_set_min_share( 333 ratelim_group, cfg_min_share); 334 } 335 336 if (expected_avg_persec < 0 && cfg_connlimit > 0) 337 expected_avg_persec = cfg_connlimit; 338 339 if (expected_avg_persec > 0) 340 expected_avg_persec /= seconds_per_tick; 341 if (expected_total_persec > 0) 342 expected_total_persec /= seconds_per_tick; 343 344 bevs = calloc(cfg_n_connections, sizeof(struct bufferevent *)); 345 states = calloc(cfg_n_connections, sizeof(struct client_state)); 346 347 for (i = 0; i < cfg_n_connections; ++i) { 348 bevs[i] = bufferevent_socket_new(base, -1, 349 BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE); 350 assert(bevs[i]); 351 bufferevent_setcb(bevs[i], discard_readcb, loud_writecb, 352 write_on_connectedcb, &states[i]); 353 bufferevent_enable(bevs[i], EV_READ|EV_WRITE); 354 bufferevent_socket_connect(bevs[i], (struct sockaddr *)&ss, 355 slen); 356 } 357 358 tv.tv_sec = cfg_duration - 1; 359 tv.tv_usec = 995000; 360 361 event_base_loopexit(base, &tv); 362 363 tv.tv_sec = 0; 364 tv.tv_usec = 100*1000; 365 ms100_common = event_base_init_common_timeout(base, &tv); 366 367 periodic_level_check = event_new(base, -1, EV_PERSIST, check_group_bucket_levels_cb, NULL); 368 event_add(periodic_level_check, ms100_common); 369 370 if (cfg_group_drain && ratelim_group) { 371 group_drain_event = event_new(base, -1, EV_PERSIST, group_drain_cb, NULL); 372 event_add(group_drain_event, &cfg_tick); 373 } 374 375 event_base_dispatch(base); 376 377 ratelim_group = NULL; /* So no more responders get added */ 378 event_free(periodic_level_check); 379 if (group_drain_event) 380 event_del(group_drain_event); 381 382 for (i = 0; i < cfg_n_connections; ++i) { 383 bufferevent_free(bevs[i]); 384 } 385 evconnlistener_free(listener); 386 387 /* Make sure no new echo_conns get added to the group. */ 388 ratelim_group = NULL; 389 390 /* This should get _everybody_ freed */ 391 while (n_echo_conns_open) { 392 printf("waiting for %d conns\n", n_echo_conns_open); 393 tv.tv_sec = 0; 394 tv.tv_usec = 300000; 395 event_base_loopexit(base, &tv); 396 event_base_dispatch(base); 397 } 398 399 if (group) 400 bufferevent_rate_limit_group_free(group); 401 402 if (total_n_bev_checks) { 403 printf("Average read bucket level: %f\n", 404 (double)total_rbucket_level/total_n_bev_checks); 405 printf("Average write bucket level: %f\n", 406 (double)total_wbucket_level/total_n_bev_checks); 407 printf("Highest read bucket level: %f\n", 408 (double)max_bucket_level); 409 printf("Highest write bucket level: %f\n", 410 (double)min_bucket_level); 411 printf("Average max-to-read: %f\n", 412 ((double)total_max_to_read)/total_n_bev_checks); 413 printf("Average max-to-write: %f\n", 414 ((double)total_max_to_write)/total_n_bev_checks); 415 } 416 if (total_n_group_bev_checks) { 417 printf("Average group read bucket level: %f\n", 418 ((double)total_group_rbucket_level)/total_n_group_bev_checks); 419 printf("Average group write bucket level: %f\n", 420 ((double)total_group_wbucket_level)/total_n_group_bev_checks); 421 } 422 423 total_received = 0; 424 total_persec = 0.0; 425 total_sq_persec = 0.0; 426 for (i=0; i < cfg_n_connections; ++i) { 427 double persec = states[i].received; 428 persec /= cfg_duration; 429 total_received += states[i].received; 430 total_persec += persec; 431 total_sq_persec += persec*persec; 432 printf("%d: %f per second\n", i+1, persec); 433 } 434 printf(" total: %f per second\n", 435 ((double)total_received)/cfg_duration); 436 if (expected_total_persec > 0) { 437 double diff = expected_total_persec - 438 ((double)total_received/cfg_duration); 439 printf(" [Off by %lf]\n", diff); 440 if (cfg_grouplimit_tolerance > 0 && 441 fabs(diff) > cfg_grouplimit_tolerance) { 442 fprintf(stderr, "Group bandwidth out of bounds\n"); 443 ok = 0; 444 } 445 } 446 447 printf(" average: %f per second\n", 448 (((double)total_received)/cfg_duration)/cfg_n_connections); 449 if (expected_avg_persec > 0) { 450 double diff = expected_avg_persec - (((double)total_received)/cfg_duration)/cfg_n_connections; 451 printf(" [Off by %lf]\n", diff); 452 if (cfg_connlimit_tolerance > 0 && 453 fabs(diff) > cfg_connlimit_tolerance) { 454 fprintf(stderr, "Connection bandwidth out of bounds\n"); 455 ok = 0; 456 } 457 } 458 459 variance = total_sq_persec/cfg_n_connections - total_persec*total_persec/(cfg_n_connections*cfg_n_connections); 460 461 printf(" stddev: %f per second\n", sqrt(variance)); 462 if (cfg_stddev_tolerance > 0 && 463 sqrt(variance) > cfg_stddev_tolerance) { 464 fprintf(stderr, "Connection variance out of bounds\n"); 465 ok = 0; 466 } 467 468 event_base_free(base); 469 free(bevs); 470 free(states); 471 472 return ok ? 0 : 1; 473 } 474 475 static struct option { 476 const char *name; int *ptr; int min; int isbool; 477 } options[] = { 478 { "-v", &cfg_verbose, 0, 1 }, 479 { "-h", &cfg_help, 0, 1 }, 480 { "-n", &cfg_n_connections, 1, 0 }, 481 { "-d", &cfg_duration, 1, 0 }, 482 { "-c", &cfg_connlimit, 0, 0 }, 483 { "-g", &cfg_grouplimit, 0, 0 }, 484 { "-G", &cfg_group_drain, -100000, 0 }, 485 { "-t", &cfg_tick_msec, 10, 0 }, 486 { "--min-share", &cfg_min_share, 0, 0 }, 487 { "--check-connlimit", &cfg_connlimit_tolerance, 0, 0 }, 488 { "--check-grouplimit", &cfg_grouplimit_tolerance, 0, 0 }, 489 { "--check-stddev", &cfg_stddev_tolerance, 0, 0 }, 490 #ifdef _WIN32 491 { "--iocp", &cfg_enable_iocp, 0, 1 }, 492 #endif 493 { NULL, NULL, -1, 0 }, 494 }; 495 496 static int 497 handle_option(int argc, char **argv, int *i, const struct option *opt) 498 { 499 long val; 500 char *endptr = NULL; 501 if (opt->isbool) { 502 *opt->ptr = 1; 503 return 0; 504 } 505 if (*i + 1 == argc) { 506 fprintf(stderr, "Too few arguments to '%s'\n",argv[*i]); 507 return -1; 508 } 509 val = strtol(argv[*i+1], &endptr, 10); 510 if (*argv[*i+1] == '\0' || !endptr || *endptr != '\0') { 511 fprintf(stderr, "Couldn't parse numeric value '%s'\n", 512 argv[*i+1]); 513 return -1; 514 } 515 if (val < opt->min || val > 0x7fffffff) { 516 fprintf(stderr, "Value '%s' is out-of-range'\n", 517 argv[*i+1]); 518 return -1; 519 } 520 *opt->ptr = (int)val; 521 ++*i; 522 return 0; 523 } 524 525 static void 526 usage(void) 527 { 528 fprintf(stderr, 529 "test-ratelim [-v] [-n INT] [-d INT] [-c INT] [-g INT] [-t INT]\n\n" 530 "Pushes bytes through a number of possibly rate-limited connections, and\n" 531 "displays average throughput.\n\n" 532 " -n INT: Number of connections to open (default: 30)\n" 533 " -d INT: Duration of the test in seconds (default: 5 sec)\n"); 534 fprintf(stderr, 535 " -c INT: Connection-rate limit applied to each connection in bytes per second\n" 536 " (default: None.)\n" 537 " -g INT: Group-rate limit applied to sum of all usage in bytes per second\n" 538 " (default: None.)\n" 539 " -G INT: drain INT bytes from the group limit every tick. (default: 0)\n" 540 " -t INT: Granularity of timing, in milliseconds (default: 1000 msec)\n"); 541 } 542 543 int 544 main(int argc, char **argv) 545 { 546 int i,j; 547 double ratio; 548 549 #ifdef _WIN32 550 WORD wVersionRequested = MAKEWORD(2,2); 551 WSADATA wsaData; 552 553 (void) WSAStartup(wVersionRequested, &wsaData); 554 #endif 555 556 #ifndef _WIN32 557 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) 558 return 1; 559 #endif 560 for (i = 1; i < argc; ++i) { 561 for (j = 0; options[j].name; ++j) { 562 if (!strcmp(argv[i],options[j].name)) { 563 if (handle_option(argc,argv,&i,&options[j])<0) 564 return 1; 565 goto again; 566 } 567 } 568 fprintf(stderr, "Unknown option '%s'\n", argv[i]); 569 usage(); 570 return 1; 571 again: 572 ; 573 } 574 if (cfg_help) { 575 usage(); 576 return 0; 577 } 578 579 cfg_tick.tv_sec = cfg_tick_msec / 1000; 580 cfg_tick.tv_usec = (cfg_tick_msec % 1000)*1000; 581 582 seconds_per_tick = ratio = cfg_tick_msec / 1000.0; 583 584 cfg_connlimit *= ratio; 585 cfg_grouplimit *= ratio; 586 587 { 588 struct timeval tv; 589 evutil_gettimeofday(&tv, NULL); 590 #ifdef _WIN32 591 srand(tv.tv_usec); 592 #else 593 srandom(tv.tv_usec); 594 #endif 595 } 596 597 #ifndef EVENT__DISABLE_THREAD_SUPPORT 598 evthread_enable_lock_debugging(); 599 #endif 600 601 return test_ratelimiting(); 602 } 603