/*-
 * Copyright (c) 2010-2011 Alexander V. Chernikov <melifaro@ipfw.ru>
 * Copyright (c) 2004-2005 Gleb Smirnoff <glebius@FreeBSD.org>
 * Copyright (c) 2001-2003 Roman V. Palagin <romanp@unshadow.net>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $SourceForge: netflow.c,v 1.41 2004/09/05 11:41:10 glebius Exp $
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include "opt_inet6.h"
#include "opt_route.h"
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/kernel.h>
#include <sys/limits.h>
#include <sys/mbuf.h>
#include <sys/syslog.h>
#include <sys/socket.h>

#include <net/if.h>
#include <net/if_var.h>
#include <net/route.h>
#include <net/ethernet.h>
#include <netinet/in.h>
#include <netinet/in_systm.h>
#include <netinet/ip.h>
#include <netinet/ip6.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>

#include <netgraph/ng_message.h>
#include <netgraph/netgraph.h>

#include <netgraph/netflow/netflow.h>
#include <netgraph/netflow/netflow_v9.h>
#include <netgraph/netflow/ng_netflow.h>

#define	NBUCKETS	(65536)		/* must be power of 2 */

/* This hash is for TCP or UDP packets. */
#define FULL_HASH(addr1, addr2, port1, port2)	\
	(((addr1 ^ (addr1 >> 16) ^		\
	htons(addr2 ^ (addr2 >> 16))) ^		\
	port1 ^ htons(port2)) &			\
	(NBUCKETS - 1))

/* This hash is for all other IP packets. */
#define ADDR_HASH(addr1, addr2)			\
	((addr1 ^ (addr1 >> 16) ^		\
	htons(addr2 ^ (addr2 >> 16))) &		\
	(NBUCKETS - 1))
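
/*
 * Both hashes fold the upper and lower 16 bits of each address together
 * and then mask with (NBUCKETS - 1).  The mask is a cheap modulo, which
 * is why NBUCKETS must be a power of 2: with NBUCKETS == 65536 the bucket
 * index is simply the low 16 bits of the folded value.
 */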

/* Macros to shorten logical constructions */
/* XXX: priv must exist in namespace */
#define	INACTIVE(fle)	(time_uptime - fle->f.last > priv->nfinfo_inact_t)
#define	AGED(fle)	(time_uptime - fle->f.first > priv->nfinfo_act_t)
#define	ISFREE(fle)	(fle->f.packets == 0)

/*
 * 4 is a magical number: statistically, the number of 4-packet flows is
 * bigger than that of 5-, 6-, 7-...packet flows by an order of magnitude.
 * Most UDP/ICMP scans are 1 packet (~90% of the flow cache).  TCP scans
 * are 2 packets for a reachable host and 4 packets otherwise.
 */
#define	SMALL(fle)	(fle->f.packets <= 4)

MALLOC_DEFINE(M_NETFLOW_HASH, "netflow_hash", "NetFlow hash");

static int export_add(item_p, struct flow_entry *);
static int export_send(priv_p, fib_export_p, item_p, int);

static int hash_insert(priv_p, struct flow_hash_entry *, struct flow_rec *,
    int, uint8_t, uint8_t);
#ifdef INET6
static int hash6_insert(priv_p, struct flow_hash_entry *, struct flow6_rec *,
    int, uint8_t, uint8_t);
#endif

static void expire_flow(priv_p, fib_export_p, struct flow_entry *, int);

/*
 * Generate hash for a given flow record.
 *
 * The FIB is not used here because most VRFs will carry public IPv4
 * addresses, which are unique even without the FIB.  Private addresses
 * can overlap, but that is handled by the flow_rec bcmp(), which includes
 * the fib id.  In the IPv6 world addresses are all globally unique (not
 * strictly true, there is FC00::/7 for example, but the chances of an
 * address overlap are MUCH smaller).
 */
static inline uint32_t
ip_hash(struct flow_rec *r)
{

	switch (r->r_ip_p) {
	case IPPROTO_TCP:
	case IPPROTO_UDP:
		return FULL_HASH(r->r_src.s_addr, r->r_dst.s_addr,
		    r->r_sport, r->r_dport);
	default:
		return ADDR_HASH(r->r_src.s_addr, r->r_dst.s_addr);
	}
}

#ifdef INET6
/* Generate hash for a given flow6 record.  Use the lower 4 octets of the v6 addresses. */
static inline uint32_t
ip6_hash(struct flow6_rec *r)
{

	switch (r->r_ip_p) {
	case IPPROTO_TCP:
	case IPPROTO_UDP:
		return FULL_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3],
		    r->dst.r_dst6.__u6_addr.__u6_addr32[3], r->r_sport,
		    r->r_dport);
	default:
		return ADDR_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3],
		    r->dst.r_dst6.__u6_addr.__u6_addr32[3]);
	}
}
#endif

/*
 * Detach the export datagram from priv, if there is any.
 * If there is none, allocate a new one.
 */
static item_p
get_export_dgram(priv_p priv, fib_export_p fe)
{
	item_p	item = NULL;

	mtx_lock(&fe->export_mtx);
	if (fe->exp.item != NULL) {
		item = fe->exp.item;
		fe->exp.item = NULL;
	}
	mtx_unlock(&fe->export_mtx);

	if (item == NULL) {
		struct netflow_v5_export_dgram *dgram;
		struct mbuf *m;

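		/*
		 * A single mbuf cluster is presumably large enough for a
		 * full v5 datagram (a 24-byte header plus at most
		 * NETFLOW_V5_MAX_RECORDS 48-byte records), which is why a
		 * plain m_getcl() suffices here.
		 */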
		m = m_getcl(M_NOWAIT, MT_DATA, M_PKTHDR);
		if (m == NULL)
			return (NULL);
		item = ng_package_data(m, NG_NOFLAGS);
		if (item == NULL)
			return (NULL);
		dgram = mtod(m, struct netflow_v5_export_dgram *);
		dgram->header.count = 0;
		dgram->header.version = htons(NETFLOW_V5);
		dgram->header.pad = 0;
	}

	return (item);
}

/*
 * Re-attach an incomplete datagram back to priv.
 * If there is already another one, send the incomplete one instead.
 */
static void
return_export_dgram(priv_p priv, fib_export_p fe, item_p item, int flags)
{

	/*
	 * It may happen on SMP that some other thread has already
	 * put its item there; in this case we bail out and send
	 * what we have to the collector.
	 */
	mtx_lock(&fe->export_mtx);
	if (fe->exp.item == NULL) {
		fe->exp.item = item;
		mtx_unlock(&fe->export_mtx);
	} else {
		mtx_unlock(&fe->export_mtx);
		export_send(priv, fe, item, flags);
	}
}

/*
 * The flow is over.  Call export_add() and free the entry.  If the
 * datagram is full, call export_send().
 */
static void
expire_flow(priv_p priv, fib_export_p fe, struct flow_entry *fle, int flags)
{
	struct netflow_export_item exp;
	uint16_t version = fle->f.version;

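	/*
	 * NetFlow v5 can carry only IPv4 flows, so the v5 export path
	 * below is taken for IPv4 entries only; IPv6 flows are exported
	 * solely through the v9 path.
	 */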
	if ((priv->export != NULL) && (version == IPVERSION)) {
		exp.item = get_export_dgram(priv, fe);
		if (exp.item == NULL) {
			priv->nfinfo_export_failed++;
			if (priv->export9 != NULL)
				priv->nfinfo_export9_failed++;
			/* fle definitely contains an IPv4 flow. */
			uma_zfree_arg(priv->zone, fle, priv);
			return;
		}

		if (export_add(exp.item, fle) > 0)
			export_send(priv, fe, exp.item, flags);
		else
			return_export_dgram(priv, fe, exp.item, NG_QUEUE);
	}

	if (priv->export9 != NULL) {
		exp.item9 = get_export9_dgram(priv, fe, &exp.item9_opt);
		if (exp.item9 == NULL) {
			priv->nfinfo_export9_failed++;
			if (version == IPVERSION)
				uma_zfree_arg(priv->zone, fle, priv);
#ifdef INET6
			else if (version == IP6VERSION)
				uma_zfree_arg(priv->zone6, fle, priv);
#endif
			else
				panic("ng_netflow: Unknown IP proto: %d",
				    version);
			return;
		}

		if (export9_add(exp.item9, exp.item9_opt, fle) > 0)
			export9_send(priv, fe, exp.item9, exp.item9_opt, flags);
		else
			return_export9_dgram(priv, fe, exp.item9,
			    exp.item9_opt, NG_QUEUE);
	}

	if (version == IPVERSION)
		uma_zfree_arg(priv->zone, fle, priv);
#ifdef INET6
	else if (version == IP6VERSION)
		uma_zfree_arg(priv->zone6, fle, priv);
#endif
}

/* Get a snapshot of node statistics */
void
ng_netflow_copyinfo(priv_p priv, struct ng_netflow_info *i)
{

	i->nfinfo_bytes = counter_u64_fetch(priv->nfinfo_bytes);
	i->nfinfo_packets = counter_u64_fetch(priv->nfinfo_packets);
	i->nfinfo_bytes6 = counter_u64_fetch(priv->nfinfo_bytes6);
	i->nfinfo_packets6 = counter_u64_fetch(priv->nfinfo_packets6);
	i->nfinfo_sbytes = counter_u64_fetch(priv->nfinfo_sbytes);
	i->nfinfo_spackets = counter_u64_fetch(priv->nfinfo_spackets);
	i->nfinfo_sbytes6 = counter_u64_fetch(priv->nfinfo_sbytes6);
	i->nfinfo_spackets6 = counter_u64_fetch(priv->nfinfo_spackets6);
	i->nfinfo_act_exp = counter_u64_fetch(priv->nfinfo_act_exp);
	i->nfinfo_inact_exp = counter_u64_fetch(priv->nfinfo_inact_exp);

	i->nfinfo_used = uma_zone_get_cur(priv->zone);
#ifdef INET6
	i->nfinfo_used6 = uma_zone_get_cur(priv->zone6);
#endif

	i->nfinfo_alloc_failed = priv->nfinfo_alloc_failed;
	i->nfinfo_export_failed = priv->nfinfo_export_failed;
	i->nfinfo_export9_failed = priv->nfinfo_export9_failed;
	i->nfinfo_realloc_mbuf = priv->nfinfo_realloc_mbuf;
	i->nfinfo_alloc_fibs = priv->nfinfo_alloc_fibs;
	i->nfinfo_inact_t = priv->nfinfo_inact_t;
	i->nfinfo_act_t = priv->nfinfo_act_t;
}

/*
 * Insert a record into the defined slot.
 *
 * First we allocate a free flow entry, then fill in all
 * possible fields.
 *
 * TODO: consider dropping the hash mutex while filling in the datagram,
 * as was done in a previous version.  Need to test & profile
 * to be sure.
 */
static int
hash_insert(priv_p priv, struct flow_hash_entry *hsh, struct flow_rec *r,
	int plen, uint8_t flags, uint8_t tcp_flags)
{
	struct flow_entry *fle;
	struct sockaddr_in sin;
	struct rtentry *rt;

	mtx_assert(&hsh->mtx, MA_OWNED);

	fle = uma_zalloc_arg(priv->zone, priv, M_NOWAIT);
	if (fle == NULL) {
		priv->nfinfo_alloc_failed++;
		return (ENOMEM);
	}

	/*
	 * Now fle is totally ours.  It is detached from all lists,
	 * so we can safely edit it.
	 */
	fle->f.version = IPVERSION;
	bcopy(r, &fle->f.r, sizeof(struct flow_rec));
	fle->f.bytes = plen;
	fle->f.packets = 1;
	fle->f.tcp_flags = tcp_flags;

	fle->f.first = fle->f.last = time_uptime;

	/*
	 * First we do a route table lookup on the destination address, so we
	 * can fill in out_ifx, dst_mask, nexthop and, in future releases,
	 * dst_as.
	 */
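	/*
	 * Note: rtalloc1_fib() returns the rtentry locked and with a
	 * reference held, which is why RTFREE_LOCKED() is used below to
	 * drop both.
	 */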
	if ((flags & NG_NETFLOW_CONF_NODSTLOOKUP) == 0) {
		bzero(&sin, sizeof(sin));
		sin.sin_len = sizeof(struct sockaddr_in);
		sin.sin_family = AF_INET;
		sin.sin_addr = fle->f.r.r_dst;
		rt = rtalloc1_fib((struct sockaddr *)&sin, 0, 0, r->fib);
		if (rt != NULL) {
			fle->f.fle_o_ifx = rt->rt_ifp->if_index;

			if (rt->rt_flags & RTF_GATEWAY &&
			    rt->rt_gateway->sa_family == AF_INET)
				fle->f.next_hop =
				    ((struct sockaddr_in *)(rt->rt_gateway))->sin_addr;

			if (rt_mask(rt))
				fle->f.dst_mask =
				    bitcount32(((struct sockaddr_in *)rt_mask(rt))->sin_addr.s_addr);
			else if (rt->rt_flags & RTF_HOST)
				/* Give up. We can't determine mask :( */
				fle->f.dst_mask = 32;

			RTFREE_LOCKED(rt);
		}
	}

	/* Do a route lookup on the source address too, to fill in src_mask. */
	if ((flags & NG_NETFLOW_CONF_NOSRCLOOKUP) == 0) {
		bzero(&sin, sizeof(sin));
		sin.sin_len = sizeof(struct sockaddr_in);
		sin.sin_family = AF_INET;
		sin.sin_addr = fle->f.r.r_src;
		rt = rtalloc1_fib((struct sockaddr *)&sin, 0, 0, r->fib);
		if (rt != NULL) {
			if (rt_mask(rt))
				fle->f.src_mask =
				    bitcount32(((struct sockaddr_in *)rt_mask(rt))->sin_addr.s_addr);
			else if (rt->rt_flags & RTF_HOST)
				/* Give up. We can't determine mask :( */
				fle->f.src_mask = 32;

			RTFREE_LOCKED(rt);
		}
	}

	/* Push the new flow at the end of the hash chain. */
	TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);

	return (0);
}

#ifdef INET6
/* XXX: make this a normal function instead of a macro. */
#define ipv6_masklen(x)		bitcount32((x).__u6_addr.__u6_addr32[0]) + \
				bitcount32((x).__u6_addr.__u6_addr32[1]) + \
				bitcount32((x).__u6_addr.__u6_addr32[2]) + \
				bitcount32((x).__u6_addr.__u6_addr32[3])
#define RT_MASK6(x)	(ipv6_masklen(((struct sockaddr_in6 *)rt_mask(x))->sin6_addr))
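/*
 * Note that ipv6_masklen() simply counts the set bits in all four words,
 * which yields the prefix length only for contiguous netmasks; that is
 * the normal case for routes in the kernel routing table.
 */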
static int
hash6_insert(priv_p priv, struct flow_hash_entry *hsh6, struct flow6_rec *r,
	int plen, uint8_t flags, uint8_t tcp_flags)
{
	struct flow6_entry *fle6;
	struct sockaddr_in6 *src, *dst;
	struct rtentry *rt;
	struct route_in6 rin6;

	mtx_assert(&hsh6->mtx, MA_OWNED);

	fle6 = uma_zalloc_arg(priv->zone6, priv, M_NOWAIT);
	if (fle6 == NULL) {
		priv->nfinfo_alloc_failed++;
		return (ENOMEM);
	}

	/*
	 * Now fle6 is totally ours.  It is detached from all lists,
	 * so we can safely edit it.
	 */

	fle6->f.version = IP6VERSION;
	bcopy(r, &fle6->f.r, sizeof(struct flow6_rec));
	fle6->f.bytes = plen;
	fle6->f.packets = 1;
	fle6->f.tcp_flags = tcp_flags;

	fle6->f.first = fle6->f.last = time_uptime;

	/*
	 * First we do a route table lookup on the destination address, so we
	 * can fill in out_ifx, dst_mask, nexthop and, in future releases,
	 * dst_as.
	 */
	if ((flags & NG_NETFLOW_CONF_NODSTLOOKUP) == 0) {
		bzero(&rin6, sizeof(struct route_in6));
		dst = (struct sockaddr_in6 *)&rin6.ro_dst;
		dst->sin6_len = sizeof(struct sockaddr_in6);
		dst->sin6_family = AF_INET6;
		dst->sin6_addr = r->dst.r_dst6;

		rin6.ro_rt = rtalloc1_fib((struct sockaddr *)dst, 0, 0, r->fib);

		if (rin6.ro_rt != NULL) {
			rt = rin6.ro_rt;
			fle6->f.fle_o_ifx = rt->rt_ifp->if_index;

			if (rt->rt_flags & RTF_GATEWAY &&
			    rt->rt_gateway->sa_family == AF_INET6)
				fle6->f.n.next_hop6 =
				    ((struct sockaddr_in6 *)(rt->rt_gateway))->sin6_addr;

			if (rt_mask(rt))
				fle6->f.dst_mask = RT_MASK6(rt);
			else
				fle6->f.dst_mask = 128;

			RTFREE_LOCKED(rt);
		}
	}

	/* Do a route lookup on the source address too, to fill in src_mask. */
	if ((flags & NG_NETFLOW_CONF_NOSRCLOOKUP) == 0) {
		bzero(&rin6, sizeof(struct route_in6));
		src = (struct sockaddr_in6 *)&rin6.ro_dst;
		src->sin6_len = sizeof(struct sockaddr_in6);
		src->sin6_family = AF_INET6;
		src->sin6_addr = r->src.r_src6;

		rin6.ro_rt = rtalloc1_fib((struct sockaddr *)src, 0, 0, r->fib);

		if (rin6.ro_rt != NULL) {
			rt = rin6.ro_rt;

			if (rt_mask(rt))
				fle6->f.src_mask = RT_MASK6(rt);
			else
				fle6->f.src_mask = 128;

			RTFREE_LOCKED(rt);
		}
	}

	/* Push the new flow at the end of the hash chain. */
	TAILQ_INSERT_TAIL(&hsh6->head, (struct flow_entry *)fle6, fle_hash);

	return (0);
}
#undef ipv6_masklen
#undef RT_MASK6
#endif

/*
 * Non-static functions called from ng_netflow.c
 */

/* Allocate memory and set up flow cache */
void
ng_netflow_cache_init(priv_p priv)
{
	struct flow_hash_entry *hsh;
	int i;

	/* Initialize cache UMA zone. */
	priv->zone = uma_zcreate("NetFlow IPv4 cache",
	    sizeof(struct flow_entry), NULL, NULL, NULL, NULL,
	    UMA_ALIGN_CACHE, 0);
	uma_zone_set_max(priv->zone, CACHESIZE);
#ifdef INET6
	priv->zone6 = uma_zcreate("NetFlow IPv6 cache",
	    sizeof(struct flow6_entry), NULL, NULL, NULL, NULL,
	    UMA_ALIGN_CACHE, 0);
	uma_zone_set_max(priv->zone6, CACHESIZE);
#endif

	/* Allocate hash. */
	priv->hash = malloc(NBUCKETS * sizeof(struct flow_hash_entry),
	    M_NETFLOW_HASH, M_WAITOK | M_ZERO);

	/* Initialize hash. */
	for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++) {
		mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF);
		TAILQ_INIT(&hsh->head);
	}

#ifdef INET6
	/* Allocate hash. */
	priv->hash6 = malloc(NBUCKETS * sizeof(struct flow_hash_entry),
	    M_NETFLOW_HASH, M_WAITOK | M_ZERO);

	/* Initialize hash. */
	for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++) {
		mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF);
		TAILQ_INIT(&hsh->head);
	}
#endif

	priv->nfinfo_bytes = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_packets = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_bytes6 = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_packets6 = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_sbytes = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_spackets = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_sbytes6 = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_spackets6 = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_act_exp = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_inact_exp = counter_u64_alloc(M_WAITOK);

	ng_netflow_v9_cache_init(priv);
	CTR0(KTR_NET, "ng_netflow startup()");
}

/* Initialize a new FIB table for v5 and v9. */
int
ng_netflow_fib_init(priv_p priv, int fib)
{
	fib_export_p	fe = priv_to_fib(priv, fib);

	CTR1(KTR_NET, "ng_netflow(): fib init: %d", fib);

	if (fe != NULL)
		return (0);

	if ((fe = malloc(sizeof(struct fib_export), M_NETGRAPH,
	    M_NOWAIT | M_ZERO)) == NULL)
		return (ENOMEM);

	mtx_init(&fe->export_mtx, "export dgram lock", NULL, MTX_DEF);
	mtx_init(&fe->export9_mtx, "export9 dgram lock", NULL, MTX_DEF);
	fe->fib = fib;
	fe->domain_id = fib;

	if (atomic_cmpset_ptr((volatile uintptr_t *)&priv->fib_data[fib],
	    (uintptr_t)NULL, (uintptr_t)fe) == 0) {
		/* FIB already set up by another ISR. */
		CTR3(KTR_NET, "ng_netflow(): fib init: %d setup %p but got %p",
		    fib, fe, priv_to_fib(priv, fib));
		mtx_destroy(&fe->export_mtx);
		mtx_destroy(&fe->export9_mtx);
		free(fe, M_NETGRAPH);
	} else {
		/* Increase counter for statistics. */
		CTR3(KTR_NET, "ng_netflow(): fib %d setup to %p (%p)",
		    fib, fe, priv_to_fib(priv, fib));
		priv->nfinfo_alloc_fibs++;
	}

	return (0);
}

/* Free all flow cache memory. Called from node close method. */
void
ng_netflow_cache_flush(priv_p priv)
{
	struct flow_entry	*fle, *fle1;
	struct flow_hash_entry	*hsh;
	struct netflow_export_item exp;
	fib_export_p fe;
	int i;

	bzero(&exp, sizeof(exp));

	/*
	 * We are going to free probably billable data.
	 * Expire everything before freeing it.
	 * No locking is required since callout is already drained.
	 */
	for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++)
		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			fe = priv_to_fib(priv, fle->f.r.fib);
			expire_flow(priv, fe, fle, NG_QUEUE);
		}
#ifdef INET6
	for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++)
		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			fe = priv_to_fib(priv, fle->f.r.fib);
			expire_flow(priv, fe, fle, NG_QUEUE);
		}
#endif

	uma_zdestroy(priv->zone);
	/* Destroy hash mutexes. */
	for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++)
		mtx_destroy(&hsh->mtx);

	/* Free hash memory. */
	if (priv->hash != NULL)
		free(priv->hash, M_NETFLOW_HASH);
#ifdef INET6
	uma_zdestroy(priv->zone6);
	/* Destroy hash mutexes. */
	for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++)
		mtx_destroy(&hsh->mtx);

	/* Free hash memory. */
	if (priv->hash6 != NULL)
		free(priv->hash6, M_NETFLOW_HASH);
#endif

	for (i = 0; i < priv->maxfibs; i++) {
		if ((fe = priv_to_fib(priv, i)) == NULL)
			continue;

		if (fe->exp.item != NULL)
			export_send(priv, fe, fe->exp.item, NG_QUEUE);

		if (fe->exp.item9 != NULL)
			export9_send(priv, fe, fe->exp.item9,
			    fe->exp.item9_opt, NG_QUEUE);

		mtx_destroy(&fe->export_mtx);
		mtx_destroy(&fe->export9_mtx);
		free(fe, M_NETGRAPH);
	}

	counter_u64_free(priv->nfinfo_bytes);
	counter_u64_free(priv->nfinfo_packets);
	counter_u64_free(priv->nfinfo_bytes6);
	counter_u64_free(priv->nfinfo_packets6);
	counter_u64_free(priv->nfinfo_sbytes);
	counter_u64_free(priv->nfinfo_spackets);
	counter_u64_free(priv->nfinfo_sbytes6);
	counter_u64_free(priv->nfinfo_spackets6);
	counter_u64_free(priv->nfinfo_act_exp);
	counter_u64_free(priv->nfinfo_inact_exp);

	ng_netflow_v9_cache_flush(priv);
}

/* Insert an IPv4 packet into the flow cache. */
int
ng_netflow_flow_add(priv_p priv, fib_export_p fe, struct ip *ip,
    caddr_t upper_ptr, uint8_t upper_proto, uint8_t flags,
    unsigned int src_if_index)
{
	struct flow_entry	*fle, *fle1;
	struct flow_hash_entry	*hsh;
	struct flow_rec		r;
	int			hlen, plen;
	int			error = 0;
	uint16_t		eproto;
	uint8_t			tcp_flags = 0;

	bzero(&r, sizeof(r));

	if (ip->ip_v != IPVERSION)
		return (EINVAL);

	hlen = ip->ip_hl << 2;
	if (hlen < sizeof(struct ip))
		return (EINVAL);

	eproto = ETHERTYPE_IP;
	/* Assume L4 template by default */
	r.flow_type = NETFLOW_V9_FLOW_V4_L4;

	r.r_src = ip->ip_src;
	r.r_dst = ip->ip_dst;
	r.fib = fe->fib;

	plen = ntohs(ip->ip_len);

	r.r_ip_p = ip->ip_p;
	r.r_tos = ip->ip_tos;

	r.r_i_ifx = src_if_index;

	/*
	 * XXX NOTE: only the first fragment of a fragmented TCP, UDP or
	 * ICMP packet will be recorded with proper s_port and d_port.
	 * Subsequent fragments will be recorded simply as IP packets with
	 * ip_proto = ip->ip_p and s_port, d_port set to zero.
	 * I know it looks like a bug.  But I don't want to re-implement
	 * IP packet assembling here.  Anyway, the (in)famous trafd works
	 * this way, and nobody has complained yet :)
	 */
	if ((ip->ip_off & htons(IP_OFFMASK)) == 0)
		switch (r.r_ip_p) {
		case IPPROTO_TCP:
		    {
			struct tcphdr *tcp;

			tcp = (struct tcphdr *)((caddr_t)ip + hlen);
			r.r_sport = tcp->th_sport;
			r.r_dport = tcp->th_dport;
			tcp_flags = tcp->th_flags;
			break;
		    }
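		/*
		 * For UDP, both ports are picked up with a single 32-bit
		 * load: r_ports overlays the adjacent r_sport and r_dport
		 * fields of the flow record.
		 */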
		case IPPROTO_UDP:
			r.r_ports = *(uint32_t *)((caddr_t)ip + hlen);
			break;
		}

	counter_u64_add(priv->nfinfo_packets, 1);
	counter_u64_add(priv->nfinfo_bytes, plen);

	/* Find hash slot. */
	hsh = &priv->hash[ip_hash(&r)];

	mtx_lock(&hsh->mtx);

	/*
	 * Go through the hash and find our entry.  If we encounter an
	 * entry that should be expired, purge it.  We do a reverse
	 * search since most active entries are first, and most
	 * searches are done on most active entries.
	 */
	TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) {
		if (bcmp(&r, &fle->f.r, sizeof(struct flow_rec)) == 0)
			break;
		if ((INACTIVE(fle) && SMALL(fle)) || AGED(fle)) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib),
			    fle, NG_QUEUE);
			counter_u64_add(priv->nfinfo_act_exp, 1);
		}
	}

	if (fle) {			/* An existing entry. */

		fle->f.bytes += plen;
		fle->f.packets++;
		fle->f.tcp_flags |= tcp_flags;
		fle->f.last = time_uptime;

		/*
		 * We have the following reasons to expire a flow actively:
		 * - it hit the active timeout
		 * - a TCP connection closed
		 * - it is about to overflow the counter
		 */
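		/*
		 * (The CNTR_MAX - IF_MAXMTU guard presumably keeps the byte
		 * counter from wrapping: the flow is expired while there is
		 * still room for one more maximum-size packet.)
		 */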
		if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle) ||
		    (fle->f.bytes >= (CNTR_MAX - IF_MAXMTU))) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib),
			    fle, NG_QUEUE);
			counter_u64_add(priv->nfinfo_act_exp, 1);
		} else {
			/*
			 * It is the newest, move it to the tail,
			 * if it isn't there already. Next search will
			 * locate it quicker.
			 */
			if (fle != TAILQ_LAST(&hsh->head, fhead)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
			}
		}
	} else				/* A new flow entry. */
		error = hash_insert(priv, hsh, &r, plen, flags, tcp_flags);

	mtx_unlock(&hsh->mtx);

	return (error);
}


#ifdef INET6
/* Insert an IPv6 packet into the flow cache. */
int
ng_netflow_flow6_add(priv_p priv, fib_export_p fe, struct ip6_hdr *ip6,
    caddr_t upper_ptr, uint8_t upper_proto, uint8_t flags,
    unsigned int src_if_index)
{
	struct flow_entry	*fle = NULL, *fle1;
	struct flow6_entry	*fle6;
	struct flow_hash_entry	*hsh;
	struct flow6_rec	r;
	int			plen;
	int			error = 0;
	uint8_t			tcp_flags = 0;

	/* check version */
	if ((ip6->ip6_vfc & IPV6_VERSION_MASK) != IPV6_VERSION)
		return (EINVAL);

	bzero(&r, sizeof(r));

	r.src.r_src6 = ip6->ip6_src;
	r.dst.r_dst6 = ip6->ip6_dst;
	r.fib = fe->fib;

	/* Assume L4 template by default */
	r.flow_type = NETFLOW_V9_FLOW_V6_L4;

	plen = ntohs(ip6->ip6_plen) + sizeof(struct ip6_hdr);

#if 0
	/* XXX: set DSCP/CoS value */
	r.r_tos = ip->ip_tos;
#endif
	if ((flags & NG_NETFLOW_IS_FRAG) == 0) {
		switch (upper_proto) {
		case IPPROTO_TCP:
		    {
			struct tcphdr *tcp;

			tcp = (struct tcphdr *)upper_ptr;
			r.r_ports = *(uint32_t *)upper_ptr;
			tcp_flags = tcp->th_flags;
			break;
		    }
		case IPPROTO_UDP:
		case IPPROTO_SCTP:
			r.r_ports = *(uint32_t *)upper_ptr;
			break;
		}
	}

	r.r_ip_p = upper_proto;
	r.r_i_ifx = src_if_index;

	counter_u64_add(priv->nfinfo_packets6, 1);
	counter_u64_add(priv->nfinfo_bytes6, plen);

	/* Find hash slot. */
	hsh = &priv->hash6[ip6_hash(&r)];

	mtx_lock(&hsh->mtx);

	/*
	 * Go through the hash and find our entry.  If we encounter an
	 * entry that should be expired, purge it.  We do a reverse
	 * search since most active entries are first, and most
	 * searches are done on most active entries.
	 */
	TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) {
		if (fle->f.version != IP6VERSION)
			continue;
		fle6 = (struct flow6_entry *)fle;
		if (bcmp(&r, &fle6->f.r, sizeof(struct flow6_rec)) == 0)
			break;
		if ((INACTIVE(fle6) && SMALL(fle6)) || AGED(fle6)) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle,
			    NG_QUEUE);
			counter_u64_add(priv->nfinfo_act_exp, 1);
		}
	}

	if (fle != NULL) {			/* An existing entry. */
		fle6 = (struct flow6_entry *)fle;

		fle6->f.bytes += plen;
		fle6->f.packets++;
		fle6->f.tcp_flags |= tcp_flags;
		fle6->f.last = time_uptime;

		/*
		 * We have the following reasons to expire a flow actively:
		 * - it hit the active timeout
		 * - a TCP connection closed
		 * - it is about to overflow the counter
		 */
		if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle6) ||
		    (fle6->f.bytes >= (CNTR_MAX - IF_MAXMTU))) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle,
			    NG_QUEUE);
			counter_u64_add(priv->nfinfo_act_exp, 1);
		} else {
			/*
			 * It is the newest, move it to the tail,
			 * if it isn't there already. Next search will
			 * locate it quicker.
			 */
			if (fle != TAILQ_LAST(&hsh->head, fhead)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
			}
		}
	} else				/* A new flow entry. */
		error = hash6_insert(priv, hsh, &r, plen, flags, tcp_flags);

	mtx_unlock(&hsh->mtx);

	return (error);
}
#endif

/*
 * Return records from the cache to userland.
 *
 * TODO: matching a particular IP should be done in the kernel, here.
 */
int
ng_netflow_flow_show(priv_p priv, struct ngnf_show_header *req,
    struct ngnf_show_header *resp)
{
	struct flow_hash_entry	*hsh;
	struct flow_entry	*fle;
	struct flow_entry_data	*data = (struct flow_entry_data *)(resp + 1);
#ifdef INET6
	struct flow6_entry_data	*data6 = (struct flow6_entry_data *)(resp + 1);
#endif
	int	i, max;

	i = req->hash_id;
	if (i > NBUCKETS - 1)
		return (EINVAL);

#ifdef INET6
	if (req->version == 6) {
		resp->version = 6;
		hsh = priv->hash6 + i;
		max = NREC6_AT_ONCE;
	} else
#endif
	if (req->version == 4) {
		resp->version = 4;
		hsh = priv->hash + i;
		max = NREC_AT_ONCE;
	} else
		return (EINVAL);

	/*
	 * We will transfer no more than NREC_AT_ONCE records; more data
	 * will come in the next message.
	 * We send the current hash index and the current record number in
	 * the list to userland, and userland should return them back to us.
	 * Then we will restart with the new entry.
	 *
	 * The resulting cache snapshot can be inaccurate if flow expiration
	 * takes place on a hash item between userland data requests for
	 * this hash item id.
	 */
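	/*
	 * A userland consumer would, roughly, page through the cache like
	 * this (a sketch; the exact message plumbing lives in ng_netflow.c):
	 *
	 *	req.hash_id = req.list_id = 0;
	 *	do {
	 *		send NGM_NETFLOW_SHOW with req, receive resp;
	 *		process resp.nentries records;
	 *		req.hash_id = resp.hash_id;
	 *		req.list_id = resp.list_id;
	 *	} while (resp.hash_id != 0 || resp.list_id != 0);
	 */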
	resp->nentries = 0;
	for (; i < NBUCKETS; hsh++, i++) {
		int list_id;

		if (mtx_trylock(&hsh->mtx) == 0) {
			/*
			 * The requested hash index is not available;
			 * let userland decide whether to skip it or to
			 * re-request the data.
			 */
			resp->hash_id = i;
			resp->list_id = 0;
			return (0);
		}

		list_id = 0;
		TAILQ_FOREACH(fle, &hsh->head, fle_hash) {
			if (hsh->mtx.mtx_lock & MTX_CONTESTED) {
				resp->hash_id = i;
				resp->list_id = list_id;
				mtx_unlock(&hsh->mtx);
				return (0);
			}

			list_id++;
			/* Search for the particular record in the list. */
			if (req->list_id > 0) {
				if (list_id < req->list_id)
					continue;

				/* Requested list position found. */
				req->list_id = 0;
			}
#ifdef INET6
			if (req->version == 6) {
				struct flow6_entry *fle6;

				fle6 = (struct flow6_entry *)fle;
				bcopy(&fle6->f, data6 + resp->nentries,
				    sizeof(fle6->f));
			} else
#endif
				bcopy(&fle->f, data + resp->nentries,
				    sizeof(fle->f));
			resp->nentries++;
			if (resp->nentries == max) {
				resp->hash_id = i;
				/*
				 * If it was the last item in the list,
				 * we simply skip to the next hash_id.
				 */
				resp->list_id = list_id + 1;
				mtx_unlock(&hsh->mtx);
				return (0);
			}
		}
		mtx_unlock(&hsh->mtx);
	}

	resp->hash_id = resp->list_id = 0;

	return (0);
}

/* We have a full datagram in privdata.  Send it to the export hook. */
static int
export_send(priv_p priv, fib_export_p fe, item_p item, int flags)
{
	struct mbuf *m = NGI_M(item);
	struct netflow_v5_export_dgram *dgram = mtod(m,
					struct netflow_v5_export_dgram *);
	struct netflow_v5_header *header = &dgram->header;
	struct timespec ts;
	int error = 0;

	/* Fill mbuf header. */
	m->m_len = m->m_pkthdr.len = sizeof(struct netflow_v5_record) *
	    header->count + sizeof(struct netflow_v5_header);

	/* Fill export header. */
	header->sys_uptime = htonl(MILLIUPTIME(time_uptime));
	getnanotime(&ts);
	header->unix_secs  = htonl(ts.tv_sec);
	header->unix_nsecs = htonl(ts.tv_nsec);
	header->engine_type = 0;
	header->engine_id = fe->domain_id;
	header->pad = 0;
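	/*
	 * flow_seq counts flows (records), not datagrams.  The atomic
	 * fetch-and-add both reserves this datagram's sequence range and
	 * advances the shared counter, so concurrent senders on the same
	 * FIB cannot hand out overlapping sequence numbers.
	 */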
	header->flow_seq = htonl(atomic_fetchadd_32(&fe->flow_seq,
	    header->count));
	header->count = htons(header->count);

	if (priv->export != NULL)
		NG_FWD_ITEM_HOOK_FLAGS(error, item, priv->export, flags);
	else
		NG_FREE_ITEM(item);

	return (error);
}

/* Add export record to dgram. */
static int
export_add(item_p item, struct flow_entry *fle)
{
	struct netflow_v5_export_dgram *dgram = mtod(NGI_M(item),
					struct netflow_v5_export_dgram *);
	struct netflow_v5_header *header = &dgram->header;
	struct netflow_v5_record *rec;

	rec = &dgram->r[header->count];
	header->count++;

	KASSERT(header->count <= NETFLOW_V5_MAX_RECORDS,
	    ("ng_netflow: export too big"));
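	/*
	 * Addresses and ports are stored in the flow record in network
	 * byte order already, so they are copied as-is; only counters,
	 * timestamps and interface indexes need hton*() conversion here.
	 */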

	/* Fill in export record. */
	rec->src_addr = fle->f.r.r_src.s_addr;
	rec->dst_addr = fle->f.r.r_dst.s_addr;
	rec->next_hop = fle->f.next_hop.s_addr;
	rec->i_ifx    = htons(fle->f.fle_i_ifx);
	rec->o_ifx    = htons(fle->f.fle_o_ifx);
	rec->packets  = htonl(fle->f.packets);
	rec->octets   = htonl(fle->f.bytes);
	rec->first    = htonl(MILLIUPTIME(fle->f.first));
	rec->last     = htonl(MILLIUPTIME(fle->f.last));
	rec->s_port   = fle->f.r.r_sport;
	rec->d_port   = fle->f.r.r_dport;
	rec->flags    = fle->f.tcp_flags;
	rec->prot     = fle->f.r.r_ip_p;
	rec->tos      = fle->f.r.r_tos;
	rec->dst_mask = fle->f.dst_mask;
	rec->src_mask = fle->f.src_mask;
	rec->pad1     = 0;
	rec->pad2     = 0;

	/* Fields not supported. */
	rec->src_as = rec->dst_as = 0;

	if (header->count == NETFLOW_V5_MAX_RECORDS)
		return (1); /* end of datagram */
	else
		return (0);
}

/* Periodic flow expiry run. */
void
ng_netflow_expire(void *arg)
{
	struct flow_entry	*fle, *fle1;
	struct flow_hash_entry	*hsh;
	priv_p			priv = (priv_p)arg;
	int			used, i;

	/*
	 * Walk through the whole cache.
	 */
	used = uma_zone_get_cur(priv->zone);
	for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++) {
		/*
		 * Skip buckets that are already being worked on.
		 */
		if (mtx_trylock(&hsh->mtx) == 0)
			continue;

		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			/*
			 * Interrupt thread wants this entry!
			 * Quick! Quick! Bail out!
			 */
			if (hsh->mtx.mtx_lock & MTX_CONTESTED)
				break;

			/*
			 * Don't expire aggressively while the hash collision
			 * ratio is predicted to be small.
			 */
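			/*
			 * Each bucket list is kept in last-use order (a hit
			 * moves a flow to the tail), so once we meet a flow
			 * that is still active, the rest of the chain is
			 * newer and the scan can stop early.
			 */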
			if (used <= (NBUCKETS*2) && !INACTIVE(fle))
				break;

			if ((INACTIVE(fle) && (SMALL(fle) ||
			    (used > (NBUCKETS*2)))) || AGED(fle)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				expire_flow(priv, priv_to_fib(priv,
				    fle->f.r.fib), fle, NG_NOFLAGS);
				used--;
				counter_u64_add(priv->nfinfo_inact_exp, 1);
			}
		}
		mtx_unlock(&hsh->mtx);
	}

#ifdef INET6
	used = uma_zone_get_cur(priv->zone6);
	for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++) {
		struct flow6_entry	*fle6;

		/*
		 * Skip buckets that are already being worked on.
		 */
		if (mtx_trylock(&hsh->mtx) == 0)
			continue;

		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			fle6 = (struct flow6_entry *)fle;
			/*
			 * Interrupt thread wants this entry!
			 * Quick! Quick! Bail out!
			 */
			if (hsh->mtx.mtx_lock & MTX_CONTESTED)
				break;

			/*
			 * Don't expire aggressively while the hash collision
			 * ratio is predicted to be small.
			 */
			if (used <= (NBUCKETS*2) && !INACTIVE(fle6))
				break;

			if ((INACTIVE(fle6) && (SMALL(fle6) ||
			    (used > (NBUCKETS*2)))) || AGED(fle6)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				expire_flow(priv, priv_to_fib(priv,
				    fle->f.r.fib), fle, NG_NOFLAGS);
				used--;
				counter_u64_add(priv->nfinfo_inact_exp, 1);
			}
		}
		mtx_unlock(&hsh->mtx);
	}
#endif

	/* Schedule next expire. */
	callout_reset(&priv->exp_callout, (1*hz), &ng_netflow_expire,
	    (void *)priv);
}
1196