xref: /freebsd/contrib/ntp/sntp/libevent/bufferevent_ratelim.c (revision ba3c1f5972d7b90feb6e6da47905ff2757e0fe57)
1 /*
2  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3  * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. The name of the author may not be used to endorse or promote products
15  *    derived from this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27  */
28 #include "evconfig-private.h"
29 
30 #include <sys/types.h>
31 #include <limits.h>
32 #include <string.h>
33 #include <stdlib.h>
34 
35 #include "event2/event.h"
36 #include "event2/event_struct.h"
37 #include "event2/util.h"
38 #include "event2/bufferevent.h"
39 #include "event2/bufferevent_struct.h"
40 #include "event2/buffer.h"
41 
42 #include "ratelim-internal.h"
43 
44 #include "bufferevent-internal.h"
45 #include "mm-internal.h"
46 #include "util-internal.h"
47 #include "event-internal.h"
48 
49 int
50 ev_token_bucket_init_(struct ev_token_bucket *bucket,
51     const struct ev_token_bucket_cfg *cfg,
52     ev_uint32_t current_tick,
53     int reinitialize)
54 {
55 	if (reinitialize) {
56 		/* on reinitialization, we only clip downwards, since we've
57 		   already used who-knows-how-much bandwidth this tick.  We
58 		   leave "last_updated" as it is; the next update will add the
59 		   appropriate amount of bandwidth to the bucket.
60 		*/
61 		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
62 			bucket->read_limit = cfg->read_maximum;
63 		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
64 			bucket->write_limit = cfg->write_maximum;
65 	} else {
66 		bucket->read_limit = cfg->read_rate;
67 		bucket->write_limit = cfg->write_rate;
68 		bucket->last_updated = current_tick;
69 	}
70 	return 0;
71 }
72 
73 int
74 ev_token_bucket_update_(struct ev_token_bucket *bucket,
75     const struct ev_token_bucket_cfg *cfg,
76     ev_uint32_t current_tick)
77 {
78 	/* It's okay if the tick number overflows, since we'll just
79 	 * wrap around when we do the unsigned substraction. */
80 	unsigned n_ticks = current_tick - bucket->last_updated;
81 
82 	/* Make sure some ticks actually happened, and that time didn't
83 	 * roll back. */
84 	if (n_ticks == 0 || n_ticks > INT_MAX)
85 		return 0;
86 
87 	/* Naively, we would say
88 		bucket->limit += n_ticks * cfg->rate;
89 
90 		if (bucket->limit > cfg->maximum)
91 			bucket->limit = cfg->maximum;
92 
93 	   But we're worried about overflow, so we do it like this:
94 	*/
95 
96 	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
97 		bucket->read_limit = cfg->read_maximum;
98 	else
99 		bucket->read_limit += n_ticks * cfg->read_rate;
100 
101 
102 	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
103 		bucket->write_limit = cfg->write_maximum;
104 	else
105 		bucket->write_limit += n_ticks * cfg->write_rate;
106 
107 
108 	bucket->last_updated = current_tick;
109 
110 	return 1;
111 }
112 
113 static inline void
114 bufferevent_update_buckets(struct bufferevent_private *bev)
115 {
116 	/* Must hold lock on bev. */
117 	struct timeval now;
118 	unsigned tick;
119 	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
120 	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
121 	if (tick != bev->rate_limiting->limit.last_updated)
122 		ev_token_bucket_update_(&bev->rate_limiting->limit,
123 		    bev->rate_limiting->cfg, tick);
124 }
125 
126 ev_uint32_t
127 ev_token_bucket_get_tick_(const struct timeval *tv,
128     const struct ev_token_bucket_cfg *cfg)
129 {
130 	/* This computation uses two multiplies and a divide.  We could do
131 	 * fewer if we knew that the tick length was an integer number of
132 	 * seconds, or if we knew it divided evenly into a second.  We should
133 	 * investigate that more.
134 	 */
135 
136 	/* We cast to an ev_uint64_t first, since we don't want to overflow
137 	 * before we do the final divide. */
138 	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
139 	return (unsigned)(msec / cfg->msec_per_tick);
140 }
141 
142 struct ev_token_bucket_cfg *
143 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
144     size_t write_rate, size_t write_burst,
145     const struct timeval *tick_len)
146 {
147 	struct ev_token_bucket_cfg *r;
148 	struct timeval g;
149 	if (! tick_len) {
150 		g.tv_sec = 1;
151 		g.tv_usec = 0;
152 		tick_len = &g;
153 	}
154 	if (read_rate > read_burst || write_rate > write_burst ||
155 	    read_rate < 1 || write_rate < 1)
156 		return NULL;
157 	if (read_rate > EV_RATE_LIMIT_MAX ||
158 	    write_rate > EV_RATE_LIMIT_MAX ||
159 	    read_burst > EV_RATE_LIMIT_MAX ||
160 	    write_burst > EV_RATE_LIMIT_MAX)
161 		return NULL;
162 	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
163 	if (!r)
164 		return NULL;
165 	r->read_rate = read_rate;
166 	r->write_rate = write_rate;
167 	r->read_maximum = read_burst;
168 	r->write_maximum = write_burst;
169 	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
170 	r->msec_per_tick = (tick_len->tv_sec * 1000) +
171 	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
172 	return r;
173 }
174 
/* Release a token-bucket configuration by handing it to mm_free(). */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
180 
/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Acquire / release the lock stored in the rate-limit group 'g'. */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Forward declarations: these group-wide helpers are defined further down
 * but are called by the bucket-decrement functions that precede them. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
192 
193 /** Helper: figure out the maximum amount we should write if is_write, or
194     the maximum amount we should read if is_read.  Return that maximum, or
195     0 if our bucket is wholly exhausted.
196  */
197 static inline ev_ssize_t
198 bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
199 {
200 	/* needs lock on bev. */
201 	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
202 
203 #define LIM(x)						\
204 	(is_write ? (x).write_limit : (x).read_limit)
205 
206 #define GROUP_SUSPENDED(g)			\
207 	(is_write ? (g)->write_suspended : (g)->read_suspended)
208 
209 	/* Sets max_so_far to MIN(x, max_so_far) */
210 #define CLAMPTO(x)				\
211 	do {					\
212 		if (max_so_far > (x))		\
213 			max_so_far = (x);	\
214 	} while (0);
215 
216 	if (!bev->rate_limiting)
217 		return max_so_far;
218 
219 	/* If rate-limiting is enabled at all, update the appropriate
220 	   bucket, and take the smaller of our rate limit and the group
221 	   rate limit.
222 	 */
223 
224 	if (bev->rate_limiting->cfg) {
225 		bufferevent_update_buckets(bev);
226 		max_so_far = LIM(bev->rate_limiting->limit);
227 	}
228 	if (bev->rate_limiting->group) {
229 		struct bufferevent_rate_limit_group *g =
230 		    bev->rate_limiting->group;
231 		ev_ssize_t share;
232 		LOCK_GROUP(g);
233 		if (GROUP_SUSPENDED(g)) {
234 			/* We can get here if we failed to lock this
235 			 * particular bufferevent while suspending the whole
236 			 * group. */
237 			if (is_write)
238 				bufferevent_suspend_write_(&bev->bev,
239 				    BEV_SUSPEND_BW_GROUP);
240 			else
241 				bufferevent_suspend_read_(&bev->bev,
242 				    BEV_SUSPEND_BW_GROUP);
243 			share = 0;
244 		} else {
245 			/* XXXX probably we should divide among the active
246 			 * members, not the total members. */
247 			share = LIM(g->rate_limit) / g->n_members;
248 			if (share < g->min_share)
249 				share = g->min_share;
250 		}
251 		UNLOCK_GROUP(g);
252 		CLAMPTO(share);
253 	}
254 
255 	if (max_so_far < 0)
256 		max_so_far = 0;
257 	return max_so_far;
258 }
259 
260 ev_ssize_t
261 bufferevent_get_read_max_(struct bufferevent_private *bev)
262 {
263 	return bufferevent_get_rlim_max_(bev, 0);
264 }
265 
266 ev_ssize_t
267 bufferevent_get_write_max_(struct bufferevent_private *bev)
268 {
269 	return bufferevent_get_rlim_max_(bev, 1);
270 }
271 
272 int
273 bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
274 {
275 	/* XXXXX Make sure all users of this function check its return value */
276 	int r = 0;
277 	/* need to hold lock on bev */
278 	if (!bev->rate_limiting)
279 		return 0;
280 
281 	if (bev->rate_limiting->cfg) {
282 		bev->rate_limiting->limit.read_limit -= bytes;
283 		if (bev->rate_limiting->limit.read_limit <= 0) {
284 			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
285 			if (event_add(&bev->rate_limiting->refill_bucket_event,
286 				&bev->rate_limiting->cfg->tick_timeout) < 0)
287 				r = -1;
288 		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
289 			if (!(bev->write_suspended & BEV_SUSPEND_BW))
290 				event_del(&bev->rate_limiting->refill_bucket_event);
291 			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
292 		}
293 	}
294 
295 	if (bev->rate_limiting->group) {
296 		LOCK_GROUP(bev->rate_limiting->group);
297 		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
298 		bev->rate_limiting->group->total_read += bytes;
299 		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
300 			bev_group_suspend_reading_(bev->rate_limiting->group);
301 		} else if (bev->rate_limiting->group->read_suspended) {
302 			bev_group_unsuspend_reading_(bev->rate_limiting->group);
303 		}
304 		UNLOCK_GROUP(bev->rate_limiting->group);
305 	}
306 
307 	return r;
308 }
309 
310 int
311 bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
312 {
313 	/* XXXXX Make sure all users of this function check its return value */
314 	int r = 0;
315 	/* need to hold lock */
316 	if (!bev->rate_limiting)
317 		return 0;
318 
319 	if (bev->rate_limiting->cfg) {
320 		bev->rate_limiting->limit.write_limit -= bytes;
321 		if (bev->rate_limiting->limit.write_limit <= 0) {
322 			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
323 			if (event_add(&bev->rate_limiting->refill_bucket_event,
324 				&bev->rate_limiting->cfg->tick_timeout) < 0)
325 				r = -1;
326 		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
327 			if (!(bev->read_suspended & BEV_SUSPEND_BW))
328 				event_del(&bev->rate_limiting->refill_bucket_event);
329 			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
330 		}
331 	}
332 
333 	if (bev->rate_limiting->group) {
334 		LOCK_GROUP(bev->rate_limiting->group);
335 		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
336 		bev->rate_limiting->group->total_written += bytes;
337 		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
338 			bev_group_suspend_writing_(bev->rate_limiting->group);
339 		} else if (bev->rate_limiting->group->write_suspended) {
340 			bev_group_unsuspend_writing_(bev->rate_limiting->group);
341 		}
342 		UNLOCK_GROUP(bev->rate_limiting->group);
343 	}
344 
345 	return r;
346 }
347 
348 /** Stop reading on every bufferevent in <b>g</b> */
349 static int
350 bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
351 {
352 	/* Needs group lock */
353 	struct bufferevent_private *bev;
354 	g->read_suspended = 1;
355 	g->pending_unsuspend_read = 0;
356 
357 	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
358 	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
359 	   the bufferevent locks.  If we are unable to lock any individual
360 	   bufferevent, it will find out later when it looks at its limit
361 	   and sees that its group is suspended.)
362 	*/
363 	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
364 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
365 			bufferevent_suspend_read_(&bev->bev,
366 			    BEV_SUSPEND_BW_GROUP);
367 			EVLOCK_UNLOCK(bev->lock, 0);
368 		}
369 	}
370 	return 0;
371 }
372 
373 /** Stop writing on every bufferevent in <b>g</b> */
374 static int
375 bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
376 {
377 	/* Needs group lock */
378 	struct bufferevent_private *bev;
379 	g->write_suspended = 1;
380 	g->pending_unsuspend_write = 0;
381 	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
382 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
383 			bufferevent_suspend_write_(&bev->bev,
384 			    BEV_SUSPEND_BW_GROUP);
385 			EVLOCK_UNLOCK(bev->lock, 0);
386 		}
387 	}
388 	return 0;
389 }
390 
391 /** Timer callback invoked on a single bufferevent with one or more exhausted
392     buckets when they are ready to refill. */
393 static void
394 bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
395 {
396 	unsigned tick;
397 	struct timeval now;
398 	struct bufferevent_private *bev = arg;
399 	int again = 0;
400 	BEV_LOCK(&bev->bev);
401 	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
402 		BEV_UNLOCK(&bev->bev);
403 		return;
404 	}
405 
406 	/* First, update the bucket */
407 	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
408 	tick = ev_token_bucket_get_tick_(&now,
409 	    bev->rate_limiting->cfg);
410 	ev_token_bucket_update_(&bev->rate_limiting->limit,
411 	    bev->rate_limiting->cfg,
412 	    tick);
413 
414 	/* Now unsuspend any read/write operations as appropriate. */
415 	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
416 		if (bev->rate_limiting->limit.read_limit > 0)
417 			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
418 		else
419 			again = 1;
420 	}
421 	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
422 		if (bev->rate_limiting->limit.write_limit > 0)
423 			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
424 		else
425 			again = 1;
426 	}
427 	if (again) {
428 		/* One or more of the buckets may need another refill if they
429 		   started negative.
430 
431 		   XXXX if we need to be quiet for more ticks, we should
432 		   maybe figure out what timeout we really want.
433 		*/
434 		/* XXXX Handle event_add failure somehow */
435 		event_add(&bev->rate_limiting->refill_bucket_event,
436 		    &bev->rate_limiting->cfg->tick_timeout);
437 	}
438 	BEV_UNLOCK(&bev->bev);
439 }
440 
441 /** Helper: grab a random element from a bufferevent group.
442  *
443  * Requires that we hold the lock on the group.
444  */
445 static struct bufferevent_private *
446 bev_group_random_element_(struct bufferevent_rate_limit_group *group)
447 {
448 	int which;
449 	struct bufferevent_private *bev;
450 
451 	/* requires group lock */
452 
453 	if (!group->n_members)
454 		return NULL;
455 
456 	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
457 
458 	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
459 
460 	bev = LIST_FIRST(&group->members);
461 	while (which--)
462 		bev = LIST_NEXT(bev, rate_limiting->next_in_group);
463 
464 	return bev;
465 }
466 
/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    The macro takes only 'block' as an argument: 'g', 'bev' and 'first'
    are not parameters and must already be declared in the enclosing
    scope.  The first loop runs from the randomly chosen element to the
    end of the member list; the second runs from the head of the list up
    to (not including) that starting element, so each member is visited
    once.  With an empty group 'first' is NULL and neither loop runs.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
486 
487 static void
488 bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
489 {
490 	int again = 0;
491 	struct bufferevent_private *bev, *first;
492 
493 	g->read_suspended = 0;
494 	FOREACH_RANDOM_ORDER({
495 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
496 			bufferevent_unsuspend_read_(&bev->bev,
497 			    BEV_SUSPEND_BW_GROUP);
498 			EVLOCK_UNLOCK(bev->lock, 0);
499 		} else {
500 			again = 1;
501 		}
502 	});
503 	g->pending_unsuspend_read = again;
504 }
505 
506 static void
507 bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
508 {
509 	int again = 0;
510 	struct bufferevent_private *bev, *first;
511 	g->write_suspended = 0;
512 
513 	FOREACH_RANDOM_ORDER({
514 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
515 			bufferevent_unsuspend_write_(&bev->bev,
516 			    BEV_SUSPEND_BW_GROUP);
517 			EVLOCK_UNLOCK(bev->lock, 0);
518 		} else {
519 			again = 1;
520 		}
521 	});
522 	g->pending_unsuspend_write = again;
523 }
524 
525 /** Callback invoked every tick to add more elements to the group bucket
526     and unsuspend group members as needed.
527  */
528 static void
529 bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
530 {
531 	struct bufferevent_rate_limit_group *g = arg;
532 	unsigned tick;
533 	struct timeval now;
534 
535 	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
536 
537 	LOCK_GROUP(g);
538 
539 	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
540 	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
541 
542 	if (g->pending_unsuspend_read ||
543 	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
544 		bev_group_unsuspend_reading_(g);
545 	}
546 	if (g->pending_unsuspend_write ||
547 	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
548 		bev_group_unsuspend_writing_(g);
549 	}
550 
551 	/* XXXX Rather than waiting to the next tick to unsuspend stuff
552 	 * with pending_unsuspend_write/read, we should do it on the
553 	 * next iteration of the mainloop.
554 	 */
555 
556 	UNLOCK_GROUP(g);
557 }
558 
559 int
560 bufferevent_set_rate_limit(struct bufferevent *bev,
561     struct ev_token_bucket_cfg *cfg)
562 {
563 	struct bufferevent_private *bevp = BEV_UPCAST(bev);
564 	int r = -1;
565 	struct bufferevent_rate_limit *rlim;
566 	struct timeval now;
567 	ev_uint32_t tick;
568 	int reinit = 0, suspended = 0;
569 	/* XXX reference-count cfg */
570 
571 	BEV_LOCK(bev);
572 
573 	if (cfg == NULL) {
574 		if (bevp->rate_limiting) {
575 			rlim = bevp->rate_limiting;
576 			rlim->cfg = NULL;
577 			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
578 			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
579 			if (event_initialized(&rlim->refill_bucket_event))
580 				event_del(&rlim->refill_bucket_event);
581 		}
582 		r = 0;
583 		goto done;
584 	}
585 
586 	event_base_gettimeofday_cached(bev->ev_base, &now);
587 	tick = ev_token_bucket_get_tick_(&now, cfg);
588 
589 	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
590 		/* no-op */
591 		r = 0;
592 		goto done;
593 	}
594 	if (bevp->rate_limiting == NULL) {
595 		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
596 		if (!rlim)
597 			goto done;
598 		bevp->rate_limiting = rlim;
599 	} else {
600 		rlim = bevp->rate_limiting;
601 	}
602 	reinit = rlim->cfg != NULL;
603 
604 	rlim->cfg = cfg;
605 	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
606 
607 	if (reinit) {
608 		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
609 		event_del(&rlim->refill_bucket_event);
610 	}
611 	event_assign(&rlim->refill_bucket_event, bev->ev_base,
612 	    -1, EV_FINALIZE, bev_refill_callback_, bevp);
613 
614 	if (rlim->limit.read_limit > 0) {
615 		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
616 	} else {
617 		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
618 		suspended=1;
619 	}
620 	if (rlim->limit.write_limit > 0) {
621 		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
622 	} else {
623 		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
624 		suspended = 1;
625 	}
626 
627 	if (suspended)
628 		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
629 
630 	r = 0;
631 
632 done:
633 	BEV_UNLOCK(bev);
634 	return r;
635 }
636 
637 struct bufferevent_rate_limit_group *
638 bufferevent_rate_limit_group_new(struct event_base *base,
639     const struct ev_token_bucket_cfg *cfg)
640 {
641 	struct bufferevent_rate_limit_group *g;
642 	struct timeval now;
643 	ev_uint32_t tick;
644 
645 	event_base_gettimeofday_cached(base, &now);
646 	tick = ev_token_bucket_get_tick_(&now, cfg);
647 
648 	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
649 	if (!g)
650 		return NULL;
651 	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
652 	LIST_INIT(&g->members);
653 
654 	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
655 
656 	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
657 	    bev_group_refill_callback_, g);
658 	/*XXXX handle event_add failure */
659 	event_add(&g->master_refill_event, &cfg->tick_timeout);
660 
661 	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
662 
663 	bufferevent_rate_limit_group_set_min_share(g, 64);
664 
665 	evutil_weakrand_seed_(&g->weakrand_seed,
666 	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
667 
668 	return g;
669 }
670 
671 int
672 bufferevent_rate_limit_group_set_cfg(
673 	struct bufferevent_rate_limit_group *g,
674 	const struct ev_token_bucket_cfg *cfg)
675 {
676 	int same_tick;
677 	if (!g || !cfg)
678 		return -1;
679 
680 	LOCK_GROUP(g);
681 	same_tick = evutil_timercmp(
682 		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
683 	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
684 
685 	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
686 		g->rate_limit.read_limit = cfg->read_maximum;
687 	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
688 		g->rate_limit.write_limit = cfg->write_maximum;
689 
690 	if (!same_tick) {
691 		/* This can cause a hiccup in the schedule */
692 		event_add(&g->master_refill_event, &cfg->tick_timeout);
693 	}
694 
695 	/* The new limits might force us to adjust min_share differently. */
696 	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
697 
698 	UNLOCK_GROUP(g);
699 	return 0;
700 }
701 
702 int
703 bufferevent_rate_limit_group_set_min_share(
704 	struct bufferevent_rate_limit_group *g,
705 	size_t share)
706 {
707 	if (share > EV_SSIZE_MAX)
708 		return -1;
709 
710 	g->configured_min_share = share;
711 
712 	/* Can't set share to less than the one-tick maximum.  IOW, at steady
713 	 * state, at least one connection can go per tick. */
714 	if (share > g->rate_limit_cfg.read_rate)
715 		share = g->rate_limit_cfg.read_rate;
716 	if (share > g->rate_limit_cfg.write_rate)
717 		share = g->rate_limit_cfg.write_rate;
718 
719 	g->min_share = share;
720 	return 0;
721 }
722 
723 void
724 bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
725 {
726 	LOCK_GROUP(g);
727 	EVUTIL_ASSERT(0 == g->n_members);
728 	event_del(&g->master_refill_event);
729 	UNLOCK_GROUP(g);
730 	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
731 	mm_free(g);
732 }
733 
734 int
735 bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
736     struct bufferevent_rate_limit_group *g)
737 {
738 	int wsuspend, rsuspend;
739 	struct bufferevent_private *bevp = BEV_UPCAST(bev);
740 	BEV_LOCK(bev);
741 
742 	if (!bevp->rate_limiting) {
743 		struct bufferevent_rate_limit *rlim;
744 		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
745 		if (!rlim) {
746 			BEV_UNLOCK(bev);
747 			return -1;
748 		}
749 		event_assign(&rlim->refill_bucket_event, bev->ev_base,
750 		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
751 		bevp->rate_limiting = rlim;
752 	}
753 
754 	if (bevp->rate_limiting->group == g) {
755 		BEV_UNLOCK(bev);
756 		return 0;
757 	}
758 	if (bevp->rate_limiting->group)
759 		bufferevent_remove_from_rate_limit_group(bev);
760 
761 	LOCK_GROUP(g);
762 	bevp->rate_limiting->group = g;
763 	++g->n_members;
764 	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
765 
766 	rsuspend = g->read_suspended;
767 	wsuspend = g->write_suspended;
768 
769 	UNLOCK_GROUP(g);
770 
771 	if (rsuspend)
772 		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
773 	if (wsuspend)
774 		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
775 
776 	BEV_UNLOCK(bev);
777 	return 0;
778 }
779 
/* Remove 'bev' from its rate-limit group, if any, and lift any
 * group-imposed suspension.  Returns the result of the internal helper. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	const int unsuspend = 1;

	return bufferevent_remove_from_rate_limit_group_internal_(bev, unsuspend);
}
785 
786 int
787 bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
788     int unsuspend)
789 {
790 	struct bufferevent_private *bevp = BEV_UPCAST(bev);
791 	BEV_LOCK(bev);
792 	if (bevp->rate_limiting && bevp->rate_limiting->group) {
793 		struct bufferevent_rate_limit_group *g =
794 		    bevp->rate_limiting->group;
795 		LOCK_GROUP(g);
796 		bevp->rate_limiting->group = NULL;
797 		--g->n_members;
798 		LIST_REMOVE(bevp, rate_limiting->next_in_group);
799 		UNLOCK_GROUP(g);
800 	}
801 	if (unsuspend) {
802 		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
803 		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
804 	}
805 	BEV_UNLOCK(bev);
806 	return 0;
807 }
808 
809 /* ===
810  * API functions to expose rate limits.
811  *
812  * Don't use these from inside Libevent; they're meant to be for use by
813  * the program.
814  * === */
815 
816 /* Mostly you don't want to use this function from inside libevent;
817  * bufferevent_get_read_max_() is more likely what you want*/
818 ev_ssize_t
819 bufferevent_get_read_limit(struct bufferevent *bev)
820 {
821 	ev_ssize_t r;
822 	struct bufferevent_private *bevp;
823 	BEV_LOCK(bev);
824 	bevp = BEV_UPCAST(bev);
825 	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
826 		bufferevent_update_buckets(bevp);
827 		r = bevp->rate_limiting->limit.read_limit;
828 	} else {
829 		r = EV_SSIZE_MAX;
830 	}
831 	BEV_UNLOCK(bev);
832 	return r;
833 }
834 
835 /* Mostly you don't want to use this function from inside libevent;
836  * bufferevent_get_write_max_() is more likely what you want*/
837 ev_ssize_t
838 bufferevent_get_write_limit(struct bufferevent *bev)
839 {
840 	ev_ssize_t r;
841 	struct bufferevent_private *bevp;
842 	BEV_LOCK(bev);
843 	bevp = BEV_UPCAST(bev);
844 	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
845 		bufferevent_update_buckets(bevp);
846 		r = bevp->rate_limiting->limit.write_limit;
847 	} else {
848 		r = EV_SSIZE_MAX;
849 	}
850 	BEV_UNLOCK(bev);
851 	return r;
852 }
853 
854 int
855 bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
856 {
857 	struct bufferevent_private *bevp;
858 	BEV_LOCK(bev);
859 	bevp = BEV_UPCAST(bev);
860 	if (size == 0 || size > EV_SSIZE_MAX)
861 		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
862 	else
863 		bevp->max_single_read = size;
864 	BEV_UNLOCK(bev);
865 	return 0;
866 }
867 
868 int
869 bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
870 {
871 	struct bufferevent_private *bevp;
872 	BEV_LOCK(bev);
873 	bevp = BEV_UPCAST(bev);
874 	if (size == 0 || size > EV_SSIZE_MAX)
875 		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
876 	else
877 		bevp->max_single_write = size;
878 	BEV_UNLOCK(bev);
879 	return 0;
880 }
881 
882 ev_ssize_t
883 bufferevent_get_max_single_read(struct bufferevent *bev)
884 {
885 	ev_ssize_t r;
886 
887 	BEV_LOCK(bev);
888 	r = BEV_UPCAST(bev)->max_single_read;
889 	BEV_UNLOCK(bev);
890 	return r;
891 }
892 
893 ev_ssize_t
894 bufferevent_get_max_single_write(struct bufferevent *bev)
895 {
896 	ev_ssize_t r;
897 
898 	BEV_LOCK(bev);
899 	r = BEV_UPCAST(bev)->max_single_write;
900 	BEV_UNLOCK(bev);
901 	return r;
902 }
903 
904 ev_ssize_t
905 bufferevent_get_max_to_read(struct bufferevent *bev)
906 {
907 	ev_ssize_t r;
908 	BEV_LOCK(bev);
909 	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
910 	BEV_UNLOCK(bev);
911 	return r;
912 }
913 
914 ev_ssize_t
915 bufferevent_get_max_to_write(struct bufferevent *bev)
916 {
917 	ev_ssize_t r;
918 	BEV_LOCK(bev);
919 	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
920 	BEV_UNLOCK(bev);
921 	return r;
922 }
923 
924 const struct ev_token_bucket_cfg *
925 bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
926 	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
927 	struct ev_token_bucket_cfg *cfg;
928 
929 	BEV_LOCK(bev);
930 
931 	if (bufev_private->rate_limiting) {
932 		cfg = bufev_private->rate_limiting->cfg;
933 	} else {
934 		cfg = NULL;
935 	}
936 
937 	BEV_UNLOCK(bev);
938 
939 	return cfg;
940 }
941 
942 /* Mostly you don't want to use this function from inside libevent;
943  * bufferevent_get_read_max_() is more likely what you want*/
944 ev_ssize_t
945 bufferevent_rate_limit_group_get_read_limit(
946 	struct bufferevent_rate_limit_group *grp)
947 {
948 	ev_ssize_t r;
949 	LOCK_GROUP(grp);
950 	r = grp->rate_limit.read_limit;
951 	UNLOCK_GROUP(grp);
952 	return r;
953 }
954 
955 /* Mostly you don't want to use this function from inside libevent;
956  * bufferevent_get_write_max_() is more likely what you want. */
957 ev_ssize_t
958 bufferevent_rate_limit_group_get_write_limit(
959 	struct bufferevent_rate_limit_group *grp)
960 {
961 	ev_ssize_t r;
962 	LOCK_GROUP(grp);
963 	r = grp->rate_limit.write_limit;
964 	UNLOCK_GROUP(grp);
965 	return r;
966 }
967 
968 int
969 bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
970 {
971 	int r = 0;
972 	ev_ssize_t old_limit, new_limit;
973 	struct bufferevent_private *bevp;
974 	BEV_LOCK(bev);
975 	bevp = BEV_UPCAST(bev);
976 	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
977 	old_limit = bevp->rate_limiting->limit.read_limit;
978 
979 	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
980 	if (old_limit > 0 && new_limit <= 0) {
981 		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
982 		if (event_add(&bevp->rate_limiting->refill_bucket_event,
983 			&bevp->rate_limiting->cfg->tick_timeout) < 0)
984 			r = -1;
985 	} else if (old_limit <= 0 && new_limit > 0) {
986 		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
987 			event_del(&bevp->rate_limiting->refill_bucket_event);
988 		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
989 	}
990 
991 	BEV_UNLOCK(bev);
992 	return r;
993 }
994 
995 int
996 bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
997 {
998 	/* XXXX this is mostly copy-and-paste from
999 	 * bufferevent_decrement_read_limit */
1000 	int r = 0;
1001 	ev_ssize_t old_limit, new_limit;
1002 	struct bufferevent_private *bevp;
1003 	BEV_LOCK(bev);
1004 	bevp = BEV_UPCAST(bev);
1005 	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
1006 	old_limit = bevp->rate_limiting->limit.write_limit;
1007 
1008 	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
1009 	if (old_limit > 0 && new_limit <= 0) {
1010 		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
1011 		if (event_add(&bevp->rate_limiting->refill_bucket_event,
1012 			&bevp->rate_limiting->cfg->tick_timeout) < 0)
1013 			r = -1;
1014 	} else if (old_limit <= 0 && new_limit > 0) {
1015 		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
1016 			event_del(&bevp->rate_limiting->refill_bucket_event);
1017 		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
1018 	}
1019 
1020 	BEV_UNLOCK(bev);
1021 	return r;
1022 }
1023 
1024 int
1025 bufferevent_rate_limit_group_decrement_read(
1026 	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1027 {
1028 	int r = 0;
1029 	ev_ssize_t old_limit, new_limit;
1030 	LOCK_GROUP(grp);
1031 	old_limit = grp->rate_limit.read_limit;
1032 	new_limit = (grp->rate_limit.read_limit -= decr);
1033 
1034 	if (old_limit > 0 && new_limit <= 0) {
1035 		bev_group_suspend_reading_(grp);
1036 	} else if (old_limit <= 0 && new_limit > 0) {
1037 		bev_group_unsuspend_reading_(grp);
1038 	}
1039 
1040 	UNLOCK_GROUP(grp);
1041 	return r;
1042 }
1043 
1044 int
1045 bufferevent_rate_limit_group_decrement_write(
1046 	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1047 {
1048 	int r = 0;
1049 	ev_ssize_t old_limit, new_limit;
1050 	LOCK_GROUP(grp);
1051 	old_limit = grp->rate_limit.write_limit;
1052 	new_limit = (grp->rate_limit.write_limit -= decr);
1053 
1054 	if (old_limit > 0 && new_limit <= 0) {
1055 		bev_group_suspend_writing_(grp);
1056 	} else if (old_limit <= 0 && new_limit > 0) {
1057 		bev_group_unsuspend_writing_(grp);
1058 	}
1059 
1060 	UNLOCK_GROUP(grp);
1061 	return r;
1062 }
1063 
1064 void
1065 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
1066     ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
1067 {
1068 	EVUTIL_ASSERT(grp != NULL);
1069 	if (total_read_out)
1070 		*total_read_out = grp->total_read;
1071 	if (total_written_out)
1072 		*total_written_out = grp->total_written;
1073 }
1074 
1075 void
1076 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1077 {
1078 	grp->total_read = grp->total_written = 0;
1079 }
1080 
1081 int
1082 bufferevent_ratelim_init_(struct bufferevent_private *bev)
1083 {
1084 	bev->rate_limiting = NULL;
1085 	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
1086 	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
1087 
1088 	return 0;
1089 }
1090