xref: /freebsd/sys/netgraph/netflow/netflow.c (revision 3311ff84eac3b7e82f28e331df0586036c6d361c)
1 /*-
2  * Copyright (c) 2010-2011 Alexander V. Chernikov <melifaro@ipfw.ru>
3  * Copyright (c) 2004-2005 Gleb Smirnoff <glebius@FreeBSD.org>
4  * Copyright (c) 2001-2003 Roman V. Palagin <romanp@unshadow.net>
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26  * SUCH DAMAGE.
27  *
28  * $SourceForge: netflow.c,v 1.41 2004/09/05 11:41:10 glebius Exp $
29  */
30 
31 #include <sys/cdefs.h>
32 __FBSDID("$FreeBSD$");
33 
34 #include "opt_inet6.h"
35 #include "opt_route.h"
36 #include <sys/param.h>
37 #include <sys/systm.h>
38 #include <sys/counter.h>
39 #include <sys/kernel.h>
40 #include <sys/limits.h>
41 #include <sys/mbuf.h>
42 #include <sys/syslog.h>
43 #include <sys/socket.h>
44 
45 #include <net/if.h>
46 #include <net/if_var.h>
47 #include <net/route.h>
48 #include <net/ethernet.h>
49 #include <netinet/in.h>
50 #include <netinet/in_systm.h>
51 #include <netinet/ip.h>
52 #include <netinet/ip6.h>
53 #include <netinet/tcp.h>
54 #include <netinet/udp.h>
55 
56 #include <netgraph/ng_message.h>
57 #include <netgraph/netgraph.h>
58 
59 #include <netgraph/netflow/netflow.h>
60 #include <netgraph/netflow/netflow_v9.h>
61 #include <netgraph/netflow/ng_netflow.h>
62 
#define	NBUCKETS	(65536)		/* must be power of 2 */

/*
 * This hash is for TCP or UDP packets: fold both addresses and both
 * ports into a bucket index.  Masking with (NBUCKETS - 1) is valid
 * only because NBUCKETS is a power of 2.
 */
#define FULL_HASH(addr1, addr2, port1, port2)	\
	(((addr1 ^ (addr1 >> 16) ^ 		\
	htons(addr2 ^ (addr2 >> 16))) ^ 	\
	port1 ^ htons(port2)) &			\
	(NBUCKETS - 1))

/* This hash is for all other IP packets: addresses only, no ports. */
#define ADDR_HASH(addr1, addr2)			\
	((addr1 ^ (addr1 >> 16) ^ 		\
	htons(addr2 ^ (addr2 >> 16))) &		\
	(NBUCKETS - 1))

/* Macros to shorten logical constructions */
/* XXX: priv must exist in namespace */
/* Flow saw no packet for longer than the inactive timeout. */
#define	INACTIVE(fle)	(time_uptime - fle->f.last > priv->nfinfo_inact_t)
/* Flow has existed for longer than the active timeout. */
#define	AGED(fle)	(time_uptime - fle->f.first > priv->nfinfo_act_t)
/* Entry is unused: a live flow always accounts at least one packet. */
#define	ISFREE(fle)	(fle->f.packets == 0)

/*
 * 4 is a magical number: statistically number of 4-packet flows is
 * bigger than 5,6,7...-packet flows by an order of magnitude. Most UDP/ICMP
 * scans are 1 packet (~ 90% of flow cache). TCP scans are 2-packet in case
 * of reachable host and 4-packet otherwise.
 */
#define	SMALL(fle)	(fle->f.packets <= 4)
/* Malloc type used for the flow hash bucket arrays. */
MALLOC_DEFINE(M_NETFLOW_HASH, "netflow_hash", "NetFlow hash");

/* Forward declarations for the export and hash helpers defined below. */
static int export_add(item_p, struct flow_entry *);
static int export_send(priv_p, fib_export_p, item_p, int);

static int hash_insert(priv_p, struct flow_hash_entry *, struct flow_rec *,
    int, uint8_t, uint8_t);
#ifdef INET6
static int hash6_insert(priv_p, struct flow_hash_entry *, struct flow6_rec *,
    int, uint8_t, uint8_t);
#endif

static void expire_flow(priv_p, fib_export_p, struct flow_entry *, int);
105 
106 /*
107  * Generate hash for a given flow record.
108  *
109  * FIB is not used here, because:
110  * most VRFS will carry public IPv4 addresses which are unique even
111  * without FIB private addresses can overlap, but this is worked out
112  * via flow_rec bcmp() containing fib id. In IPv6 world addresses are
113  * all globally unique (it's not fully true, there is FC00::/7 for example,
114  * but chances of address overlap are MUCH smaller)
115  */
116 static inline uint32_t
117 ip_hash(struct flow_rec *r)
118 {
119 
120 	switch (r->r_ip_p) {
121 	case IPPROTO_TCP:
122 	case IPPROTO_UDP:
123 		return FULL_HASH(r->r_src.s_addr, r->r_dst.s_addr,
124 		    r->r_sport, r->r_dport);
125 	default:
126 		return ADDR_HASH(r->r_src.s_addr, r->r_dst.s_addr);
127 	}
128 }
129 
#ifdef INET6
/* Generate hash for a given flow6 record. Use lower 4 octets from v6 addresses */
static inline uint32_t
ip6_hash(struct flow6_rec *r)
{

	switch (r->r_ip_p) {
	case IPPROTO_TCP:
	case IPPROTO_UDP:
		/* Transport ports are known: mix them into the hash. */
		return FULL_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3],
		    r->dst.r_dst6.__u6_addr.__u6_addr32[3], r->r_sport,
		    r->r_dport);
	default:
		/* No (or unknown) transport header: addresses only. */
		return ADDR_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3],
		    r->dst.r_dst6.__u6_addr.__u6_addr32[3]);
	}
}
#endif
148 
149 /*
150  * Detach export datagram from priv, if there is any.
151  * If there is no, allocate a new one.
152  */
153 static item_p
154 get_export_dgram(priv_p priv, fib_export_p fe)
155 {
156 	item_p	item = NULL;
157 
158 	mtx_lock(&fe->export_mtx);
159 	if (fe->exp.item != NULL) {
160 		item = fe->exp.item;
161 		fe->exp.item = NULL;
162 	}
163 	mtx_unlock(&fe->export_mtx);
164 
165 	if (item == NULL) {
166 		struct netflow_v5_export_dgram *dgram;
167 		struct mbuf *m;
168 
169 		m = m_getcl(M_NOWAIT, MT_DATA, M_PKTHDR);
170 		if (m == NULL)
171 			return (NULL);
172 		item = ng_package_data(m, NG_NOFLAGS);
173 		if (item == NULL)
174 			return (NULL);
175 		dgram = mtod(m, struct netflow_v5_export_dgram *);
176 		dgram->header.count = 0;
177 		dgram->header.version = htons(NETFLOW_V5);
178 		dgram->header.pad = 0;
179 	}
180 
181 	return (item);
182 }
183 
184 /*
185  * Re-attach incomplete datagram back to priv.
186  * If there is already another one, then send incomplete. */
187 static void
188 return_export_dgram(priv_p priv, fib_export_p fe, item_p item, int flags)
189 {
190 
191 	/*
192 	 * It may happen on SMP, that some thread has already
193 	 * put its item there, in this case we bail out and
194 	 * send what we have to collector.
195 	 */
196 	mtx_lock(&fe->export_mtx);
197 	if (fe->exp.item == NULL) {
198 		fe->exp.item = item;
199 		mtx_unlock(&fe->export_mtx);
200 	} else {
201 		mtx_unlock(&fe->export_mtx);
202 		export_send(priv, fe, item, flags);
203 	}
204 }
205 
/*
 * The flow is over. Call export_add() and free it. If datagram is
 * full, then call export_send().
 *
 * Consumes the flow entry: it is returned to its UMA zone on every
 * path out of this function.
 */
static void
expire_flow(priv_p priv, fib_export_p fe, struct flow_entry *fle, int flags)
{
	struct netflow_export_item exp;
	uint16_t version = fle->f.version;

	/* NetFlow v5 carries IPv4 flows only. */
	if ((priv->export != NULL) && (version == IPVERSION)) {
		exp.item = get_export_dgram(priv, fe);
		if (exp.item == NULL) {
			priv->nfinfo_export_failed++;
			if (priv->export9 != NULL)
				priv->nfinfo_export9_failed++;
			/* fle definitely contains IPv4 flow. */
			uma_zfree_arg(priv->zone, fle, priv);
			return;
		}

		/* A positive return means the datagram became full. */
		if (export_add(exp.item, fle) > 0)
			export_send(priv, fe, exp.item, flags);
		else
			return_export_dgram(priv, fe, exp.item, NG_QUEUE);
	}

	/* NetFlow v9 handles both IPv4 and IPv6 flows. */
	if (priv->export9 != NULL) {
		exp.item9 = get_export9_dgram(priv, fe, &exp.item9_opt);
		if (exp.item9 == NULL) {
			priv->nfinfo_export9_failed++;
			/* Free the entry before bailing out. */
			if (version == IPVERSION)
				uma_zfree_arg(priv->zone, fle, priv);
#ifdef INET6
			else if (version == IP6VERSION)
				uma_zfree_arg(priv->zone6, fle, priv);
#endif
			else
				panic("ng_netflow: Unknown IP proto: %d",
				    version);
			return;
		}

		if (export9_add(exp.item9, exp.item9_opt, fle) > 0)
			export9_send(priv, fe, exp.item9, exp.item9_opt, flags);
		else
			return_export9_dgram(priv, fe, exp.item9,
			    exp.item9_opt, NG_QUEUE);
	}

	/* Return the entry to the zone matching its IP version. */
	if (version == IPVERSION)
		uma_zfree_arg(priv->zone, fle, priv);
#ifdef INET6
	else if (version == IP6VERSION)
		uma_zfree_arg(priv->zone6, fle, priv);
#endif
}
263 
/*
 * Get a snapshot of node statistics.
 *
 * Copies per-CPU counters, current zone usage and plain counters from
 * priv into the caller-supplied structure.  The snapshot is not atomic
 * as a whole: each counter is fetched independently.
 */
void
ng_netflow_copyinfo(priv_p priv, struct ng_netflow_info *i)
{

	/* Per-CPU counter(9) statistics. */
	i->nfinfo_bytes = counter_u64_fetch(priv->nfinfo_bytes);
	i->nfinfo_packets = counter_u64_fetch(priv->nfinfo_packets);
	i->nfinfo_bytes6 = counter_u64_fetch(priv->nfinfo_bytes6);
	i->nfinfo_packets6 = counter_u64_fetch(priv->nfinfo_packets6);
	i->nfinfo_sbytes = counter_u64_fetch(priv->nfinfo_sbytes);
	i->nfinfo_spackets = counter_u64_fetch(priv->nfinfo_spackets);
	i->nfinfo_sbytes6 = counter_u64_fetch(priv->nfinfo_sbytes6);
	i->nfinfo_spackets6 = counter_u64_fetch(priv->nfinfo_spackets6);
	i->nfinfo_act_exp = counter_u64_fetch(priv->nfinfo_act_exp);
	i->nfinfo_inact_exp = counter_u64_fetch(priv->nfinfo_inact_exp);

	/* Number of flow entries currently allocated from the zones. */
	i->nfinfo_used = uma_zone_get_cur(priv->zone);
#ifdef INET6
	i->nfinfo_used6 = uma_zone_get_cur(priv->zone6);
#endif

	/* Plain (non per-CPU) counters and configured timeouts. */
	i->nfinfo_alloc_failed = priv->nfinfo_alloc_failed;
	i->nfinfo_export_failed = priv->nfinfo_export_failed;
	i->nfinfo_export9_failed = priv->nfinfo_export9_failed;
	i->nfinfo_realloc_mbuf = priv->nfinfo_realloc_mbuf;
	i->nfinfo_alloc_fibs = priv->nfinfo_alloc_fibs;
	i->nfinfo_inact_t = priv->nfinfo_inact_t;
	i->nfinfo_act_t = priv->nfinfo_act_t;
}
293 
/*
 * Insert a record into defined slot.
 *
 * First we get for us a free flow entry, then fill in all
 * possible fields in it.
 *
 * Returns 0 on success or ENOMEM if a flow entry cannot be
 * allocated.  The caller must hold the hash slot mutex.
 *
 * TODO: consider dropping hash mutex while filling in datagram,
 * as this was done in previous version. Need to test & profile
 * to be sure.
 */
static int
hash_insert(priv_p priv, struct flow_hash_entry *hsh, struct flow_rec *r,
	int plen, uint8_t flags, uint8_t tcp_flags)
{
	struct flow_entry *fle;
	struct sockaddr_in sin;
	struct rtentry *rt;

	mtx_assert(&hsh->mtx, MA_OWNED);

	fle = uma_zalloc_arg(priv->zone, priv, M_NOWAIT);
	if (fle == NULL) {
		priv->nfinfo_alloc_failed++;
		return (ENOMEM);
	}

	/*
	 * Now fle is totally ours. It is detached from all lists,
	 * we can safely edit it.
	 */
	fle->f.version = IPVERSION;
	bcopy(r, &fle->f.r, sizeof(struct flow_rec));
	fle->f.bytes = plen;
	fle->f.packets = 1;
	fle->f.tcp_flags = tcp_flags;

	fle->f.first = fle->f.last = time_uptime;

	/*
	 * First we do route table lookup on destination address. So we can
	 * fill in out_ifx, dst_mask, nexthop, and dst_as in future releases.
	 */
	if ((flags & NG_NETFLOW_CONF_NODSTLOOKUP) == 0) {
		bzero(&sin, sizeof(sin));
		sin.sin_len = sizeof(struct sockaddr_in);
		sin.sin_family = AF_INET;
		sin.sin_addr = fle->f.r.r_dst;
		rt = rtalloc1_fib((struct sockaddr *)&sin, 0, 0, r->fib);
		if (rt != NULL) {
			fle->f.fle_o_ifx = rt->rt_ifp->if_index;

			if (rt->rt_flags & RTF_GATEWAY &&
			    rt->rt_gateway->sa_family == AF_INET)
				fle->f.next_hop =
				    ((struct sockaddr_in *)(rt->rt_gateway))->sin_addr;

			/* Netmask popcount gives the CIDR prefix length. */
			if (rt_mask(rt))
				fle->f.dst_mask =
				    bitcount32(((struct sockaddr_in *)rt_mask(rt))->sin_addr.s_addr);
			else if (rt->rt_flags & RTF_HOST)
				/* Give up. We can't determine mask :( */
				fle->f.dst_mask = 32;

			RTFREE_LOCKED(rt);
		}
	}

	/* Do route lookup on source address, to fill in src_mask. */
	if ((flags & NG_NETFLOW_CONF_NOSRCLOOKUP) == 0) {
		bzero(&sin, sizeof(sin));
		sin.sin_len = sizeof(struct sockaddr_in);
		sin.sin_family = AF_INET;
		sin.sin_addr = fle->f.r.r_src;
		rt = rtalloc1_fib((struct sockaddr *)&sin, 0, 0, r->fib);
		if (rt != NULL) {
			if (rt_mask(rt))
				fle->f.src_mask =
				    bitcount32(((struct sockaddr_in *)rt_mask(rt))->sin_addr.s_addr);
			else if (rt->rt_flags & RTF_HOST)
				/* Give up. We can't determine mask :( */
				fle->f.src_mask = 32;

			RTFREE_LOCKED(rt);
		}
	}

	/* Push new flow at the end of hash. */
	TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);

	return (0);
}
385 
#ifdef INET6
/* XXX: make normal function, instead of.. */
/* Prefix length of an IPv6 netmask: popcount of all four 32-bit words. */
#define ipv6_masklen(x)		bitcount32((x).__u6_addr.__u6_addr32[0]) + \
				bitcount32((x).__u6_addr.__u6_addr32[1]) + \
				bitcount32((x).__u6_addr.__u6_addr32[2]) + \
				bitcount32((x).__u6_addr.__u6_addr32[3])
#define RT_MASK6(x)	(ipv6_masklen(((struct sockaddr_in6 *)rt_mask(x))->sin6_addr))
/*
 * IPv6 counterpart of hash_insert(): allocate a flow6 entry, fill it
 * in (including optional route lookups for masks/next hop) and link
 * it at the tail of the given hash slot.  Returns 0 or ENOMEM.
 * The caller must hold the hash slot mutex.
 */
static int
hash6_insert(priv_p priv, struct flow_hash_entry *hsh6, struct flow6_rec *r,
	int plen, uint8_t flags, uint8_t tcp_flags)
{
	struct flow6_entry *fle6;
	struct sockaddr_in6 sin6;
	struct rtentry *rt;

	mtx_assert(&hsh6->mtx, MA_OWNED);

	fle6 = uma_zalloc_arg(priv->zone6, priv, M_NOWAIT);
	if (fle6 == NULL) {
		priv->nfinfo_alloc_failed++;
		return (ENOMEM);
	}

	/*
	 * Now fle is totally ours. It is detached from all lists,
	 * we can safely edit it.
	 */

	fle6->f.version = IP6VERSION;
	bcopy(r, &fle6->f.r, sizeof(struct flow6_rec));
	fle6->f.bytes = plen;
	fle6->f.packets = 1;
	fle6->f.tcp_flags = tcp_flags;

	fle6->f.first = fle6->f.last = time_uptime;

	/*
	 * First we do route table lookup on destination address. So we can
	 * fill in out_ifx, dst_mask, nexthop, and dst_as in future releases.
	 */
	if ((flags & NG_NETFLOW_CONF_NODSTLOOKUP) == 0) {
		bzero(&sin6, sizeof(struct sockaddr_in6));
		sin6.sin6_len = sizeof(struct sockaddr_in6);
		sin6.sin6_family = AF_INET6;
		sin6.sin6_addr = r->dst.r_dst6;

		rt = rtalloc1_fib((struct sockaddr *)&sin6, 0, 0, r->fib);

		if (rt != NULL) {
			fle6->f.fle_o_ifx = rt->rt_ifp->if_index;

			if (rt->rt_flags & RTF_GATEWAY &&
			    rt->rt_gateway->sa_family == AF_INET6)
				fle6->f.n.next_hop6 =
				    ((struct sockaddr_in6 *)(rt->rt_gateway))->sin6_addr;

			if (rt_mask(rt))
				fle6->f.dst_mask = RT_MASK6(rt);
			else
				fle6->f.dst_mask = 128;

			RTFREE_LOCKED(rt);
		}
	}

	if ((flags & NG_NETFLOW_CONF_NOSRCLOOKUP) == 0) {
		/* Do route lookup on source address, to fill in src_mask. */
		bzero(&sin6, sizeof(struct sockaddr_in6));
		sin6.sin6_len = sizeof(struct sockaddr_in6);
		sin6.sin6_family = AF_INET6;
		sin6.sin6_addr = r->src.r_src6;

		rt = rtalloc1_fib((struct sockaddr *)&sin6, 0, 0, r->fib);

		if (rt != NULL) {
			if (rt_mask(rt))
				fle6->f.src_mask = RT_MASK6(rt);
			else
				fle6->f.src_mask = 128;

			RTFREE_LOCKED(rt);
		}
	}

	/* Push new flow at the end of hash. */
	TAILQ_INSERT_TAIL(&hsh6->head, (struct flow_entry *)fle6, fle_hash);

	return (0);
}
#undef ipv6_masklen
#undef RT_MASK6
#endif
478 
479 
480 /*
481  * Non-static functions called from ng_netflow.c
482  */
483 
/*
 * Allocate memory and set up flow cache.
 *
 * Creates the UMA zones for flow entries, allocates and initializes
 * the per-bucket hash tables (IPv4 and, if compiled in, IPv6),
 * allocates the per-CPU statistics counters and initializes the
 * NetFlow v9 state.  All allocations may sleep (M_WAITOK).
 */
void
ng_netflow_cache_init(priv_p priv)
{
	struct flow_hash_entry *hsh;
	int i;

	/* Initialize cache UMA zone. */
	priv->zone = uma_zcreate("NetFlow IPv4 cache",
	    sizeof(struct flow_entry), NULL, NULL, NULL, NULL,
	    UMA_ALIGN_CACHE, 0);
	uma_zone_set_max(priv->zone, CACHESIZE);
#ifdef INET6
	priv->zone6 = uma_zcreate("NetFlow IPv6 cache",
	    sizeof(struct flow6_entry), NULL, NULL, NULL, NULL,
	    UMA_ALIGN_CACHE, 0);
	uma_zone_set_max(priv->zone6, CACHESIZE);
#endif

	/* Allocate hash. */
	priv->hash = malloc(NBUCKETS * sizeof(struct flow_hash_entry),
	    M_NETFLOW_HASH, M_WAITOK | M_ZERO);

	/* Initialize hash: each bucket gets its own mutex and tailq. */
	for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++) {
		mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF);
		TAILQ_INIT(&hsh->head);
	}

#ifdef INET6
	/* Allocate hash. */
	priv->hash6 = malloc(NBUCKETS * sizeof(struct flow_hash_entry),
	    M_NETFLOW_HASH, M_WAITOK | M_ZERO);

	/* Initialize hash. */
	for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++) {
		mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF);
		TAILQ_INIT(&hsh->head);
	}
#endif

	/* Per-CPU statistics counters, freed in ng_netflow_cache_flush(). */
	priv->nfinfo_bytes = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_packets = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_bytes6 = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_packets6 = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_sbytes = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_spackets = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_sbytes6 = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_spackets6 = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_act_exp = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_inact_exp = counter_u64_alloc(M_WAITOK);

	ng_netflow_v9_cache_init(priv);
	CTR0(KTR_NET, "ng_netflow startup()");
}
539 
/*
 * Initialize new FIB table for v5 and v9.
 *
 * Allocates and publishes the per-FIB export state via an atomic
 * compare-and-set so that concurrent callers racing on the same fib
 * do not leak: the loser tears its copy down again.  Returns 0 on
 * success (including the "already initialized" case) or ENOMEM.
 */
int
ng_netflow_fib_init(priv_p priv, int fib)
{
	fib_export_p	fe = priv_to_fib(priv, fib);

	CTR1(KTR_NET, "ng_netflow(): fib init: %d", fib);

	/* Already set up — nothing to do. */
	if (fe != NULL)
		return (0);

	if ((fe = malloc(sizeof(struct fib_export), M_NETGRAPH,
	    M_NOWAIT | M_ZERO)) == NULL)
		return (ENOMEM);

	mtx_init(&fe->export_mtx, "export dgram lock", NULL, MTX_DEF);
	mtx_init(&fe->export9_mtx, "export9 dgram lock", NULL, MTX_DEF);
	fe->fib = fib;
	fe->domain_id = fib;

	/* Publish atomically; only one initializer may win the race. */
	if (atomic_cmpset_ptr((volatile uintptr_t *)&priv->fib_data[fib],
	    (uintptr_t)NULL, (uintptr_t)fe) == 0) {
		/* FIB already set up by other ISR */
		CTR3(KTR_NET, "ng_netflow(): fib init: %d setup %p but got %p",
		    fib, fe, priv_to_fib(priv, fib));
		mtx_destroy(&fe->export_mtx);
		mtx_destroy(&fe->export9_mtx);
		free(fe, M_NETGRAPH);
	} else {
		/* Increase counter for statistics */
		CTR3(KTR_NET, "ng_netflow(): fib %d setup to %p (%p)",
		    fib, fe, priv_to_fib(priv, fib));
		priv->nfinfo_alloc_fibs++;
	}

	return (0);
}
577 
/*
 * Free all flow cache memory. Called from node close method.
 *
 * Expires every cached flow (so it still gets exported), then tears
 * down the zones, hash tables, per-FIB export state and counters.
 */
void
ng_netflow_cache_flush(priv_p priv)
{
	struct flow_entry	*fle, *fle1;
	struct flow_hash_entry	*hsh;
	struct netflow_export_item exp;
	fib_export_p fe;
	int i;

	bzero(&exp, sizeof(exp));

	/*
	 * We are going to free probably billable data.
	 * Expire everything before freeing it.
	 * No locking is required since callout is already drained.
	 */
	for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++)
		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			fe = priv_to_fib(priv, fle->f.r.fib);
			expire_flow(priv, fe, fle, NG_QUEUE);
		}
#ifdef INET6
	for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++)
		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			fe = priv_to_fib(priv, fle->f.r.fib);
			expire_flow(priv, fe, fle, NG_QUEUE);
		}
#endif

	uma_zdestroy(priv->zone);
	/* Destroy hash mutexes. */
	for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++)
		mtx_destroy(&hsh->mtx);

	/* Free hash memory. */
	if (priv->hash != NULL)
		free(priv->hash, M_NETFLOW_HASH);
#ifdef INET6
	uma_zdestroy(priv->zone6);
	/* Destroy hash mutexes. */
	for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++)
		mtx_destroy(&hsh->mtx);

	/* Free hash memory. */
	if (priv->hash6 != NULL)
		free(priv->hash6, M_NETFLOW_HASH);
#endif

	/* Flush any half-filled datagrams and free per-FIB state. */
	for (i = 0; i < priv->maxfibs; i++) {
		if ((fe = priv_to_fib(priv, i)) == NULL)
			continue;

		if (fe->exp.item != NULL)
			export_send(priv, fe, fe->exp.item, NG_QUEUE);

		if (fe->exp.item9 != NULL)
			export9_send(priv, fe, fe->exp.item9,
			    fe->exp.item9_opt, NG_QUEUE);

		mtx_destroy(&fe->export_mtx);
		mtx_destroy(&fe->export9_mtx);
		free(fe, M_NETGRAPH);
	}

	counter_u64_free(priv->nfinfo_bytes);
	counter_u64_free(priv->nfinfo_packets);
	counter_u64_free(priv->nfinfo_bytes6);
	counter_u64_free(priv->nfinfo_packets6);
	counter_u64_free(priv->nfinfo_sbytes);
	counter_u64_free(priv->nfinfo_spackets);
	counter_u64_free(priv->nfinfo_sbytes6);
	counter_u64_free(priv->nfinfo_spackets6);
	counter_u64_free(priv->nfinfo_act_exp);
	counter_u64_free(priv->nfinfo_inact_exp);

	ng_netflow_v9_cache_flush(priv);
}
658 
/*
 * Insert an IPv4 packet into the flow cache.
 *
 * Builds a flow record from the packet headers, finds (or creates)
 * the matching flow entry in the hash, updates its counters and
 * expires it actively when a timeout/TCP-close/counter-overflow
 * condition is met.  Returns 0 or an errno value.
 */
int
ng_netflow_flow_add(priv_p priv, fib_export_p fe, struct ip *ip,
    caddr_t upper_ptr, uint8_t upper_proto, uint8_t flags,
    unsigned int src_if_index)
{
	struct flow_entry	*fle, *fle1;
	struct flow_hash_entry	*hsh;
	struct flow_rec		r;
	int			hlen, plen;
	int			error = 0;
	uint16_t		eproto;
	uint8_t			tcp_flags = 0;

	bzero(&r, sizeof(r));

	if (ip->ip_v != IPVERSION)
		return (EINVAL);

	/* Sanity-check the header length field. */
	hlen = ip->ip_hl << 2;
	if (hlen < sizeof(struct ip))
		return (EINVAL);

	eproto = ETHERTYPE_IP;
	/* Assume L4 template by default */
	r.flow_type = NETFLOW_V9_FLOW_V4_L4;

	r.r_src = ip->ip_src;
	r.r_dst = ip->ip_dst;
	r.fib = fe->fib;

	plen = ntohs(ip->ip_len);

	r.r_ip_p = ip->ip_p;
	r.r_tos = ip->ip_tos;

	r.r_i_ifx = src_if_index;

	/*
	 * XXX NOTE: only first fragment of fragmented TCP, UDP and
	 * ICMP packet will be recorded with proper s_port and d_port.
	 * Following fragments will be recorded simply as IP packet with
	 * ip_proto = ip->ip_p and s_port, d_port set to zero.
	 * I know, it looks like bug. But I don't want to re-implement
	 * ip packet assembling here. Anyway, (in)famous trafd works this way -
	 * and nobody complains yet :)
	 */
	if ((ip->ip_off & htons(IP_OFFMASK)) == 0)
		switch(r.r_ip_p) {
		case IPPROTO_TCP:
		    {
			struct tcphdr *tcp;

			tcp = (struct tcphdr *)((caddr_t )ip + hlen);
			r.r_sport = tcp->th_sport;
			r.r_dport = tcp->th_dport;
			tcp_flags = tcp->th_flags;
			break;
		    }
		case IPPROTO_UDP:
			/* Grab both ports with a single 32-bit read. */
			r.r_ports = *(uint32_t *)((caddr_t )ip + hlen);
			break;
		}

	counter_u64_add(priv->nfinfo_packets, 1);
	counter_u64_add(priv->nfinfo_bytes, plen);

	/* Find hash slot. */
	hsh = &priv->hash[ip_hash(&r)];

	mtx_lock(&hsh->mtx);

	/*
	 * Go through hash and find our entry. If we encounter an
	 * entry, that should be expired, purge it. We do a reverse
	 * search since most active entries are first, and most
	 * searches are done on most active entries.
	 */
	TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) {
		if (bcmp(&r, &fle->f.r, sizeof(struct flow_rec)) == 0)
			break;
		if ((INACTIVE(fle) && SMALL(fle)) || AGED(fle)) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib),
			    fle, NG_QUEUE);
			counter_u64_add(priv->nfinfo_act_exp, 1);
		}
	}

	if (fle) {			/* An existent entry. */

		fle->f.bytes += plen;
		fle->f.packets ++;
		fle->f.tcp_flags |= tcp_flags;
		fle->f.last = time_uptime;

		/*
		 * We have the following reasons to expire flow in active way:
		 * - it hit active timeout
		 * - a TCP connection closed
		 * - it is going to overflow counter
		 */
		if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle) ||
		    (fle->f.bytes >= (CNTR_MAX - IF_MAXMTU)) ) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib),
			    fle, NG_QUEUE);
			counter_u64_add(priv->nfinfo_act_exp, 1);
		} else {
			/*
			 * It is the newest, move it to the tail,
			 * if it isn't there already. Next search will
			 * locate it quicker.
			 */
			if (fle != TAILQ_LAST(&hsh->head, fhead)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
			}
		}
	} else				/* A new flow entry. */
		error = hash_insert(priv, hsh, &r, plen, flags, tcp_flags);

	mtx_unlock(&hsh->mtx);

	return (error);
}
785 
#ifdef INET6
/*
 * Insert an IPv6 packet into the flow cache.
 *
 * IPv6 counterpart of ng_netflow_flow_add(): the caller has already
 * walked the extension-header chain and supplies the upper-layer
 * protocol and a pointer to its header.  Returns 0 or an errno value.
 */
int
ng_netflow_flow6_add(priv_p priv, fib_export_p fe, struct ip6_hdr *ip6,
    caddr_t upper_ptr, uint8_t upper_proto, uint8_t flags,
    unsigned int src_if_index)
{
	struct flow_entry	*fle = NULL, *fle1;
	struct flow6_entry	*fle6;
	struct flow_hash_entry	*hsh;
	struct flow6_rec	r;
	int			plen;
	int			error = 0;
	uint8_t			tcp_flags = 0;

	/* check version */
	if ((ip6->ip6_vfc & IPV6_VERSION_MASK) != IPV6_VERSION)
		return (EINVAL);

	bzero(&r, sizeof(r));

	r.src.r_src6 = ip6->ip6_src;
	r.dst.r_dst6 = ip6->ip6_dst;
	r.fib = fe->fib;

	/* Assume L4 template by default */
	r.flow_type = NETFLOW_V9_FLOW_V6_L4;

	/* ip6_plen excludes the fixed header; add it back for byte count. */
	plen = ntohs(ip6->ip6_plen) + sizeof(struct ip6_hdr);

#if 0
	/* XXX: set DSCP/CoS value */
	r.r_tos = ip->ip_tos;
#endif
	if ((flags & NG_NETFLOW_IS_FRAG) == 0) {
		switch(upper_proto) {
		case IPPROTO_TCP:
		    {
			struct tcphdr *tcp;

			tcp = (struct tcphdr *)upper_ptr;
			r.r_ports = *(uint32_t *)upper_ptr;
			tcp_flags = tcp->th_flags;
			break;
		    }
 		case IPPROTO_UDP:
		case IPPROTO_SCTP:
			/* Both ports in one 32-bit read. */
			r.r_ports = *(uint32_t *)upper_ptr;
			break;
		}
	}

	r.r_ip_p = upper_proto;
	r.r_i_ifx = src_if_index;

	counter_u64_add(priv->nfinfo_packets6, 1);
	counter_u64_add(priv->nfinfo_bytes6, plen);

	/* Find hash slot. */
	hsh = &priv->hash6[ip6_hash(&r)];

	mtx_lock(&hsh->mtx);

	/*
	 * Go through hash and find our entry. If we encounter an
	 * entry, that should be expired, purge it. We do a reverse
	 * search since most active entries are first, and most
	 * searches are done on most active entries.
	 */
	TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) {
		if (fle->f.version != IP6VERSION)
			continue;
		fle6 = (struct flow6_entry *)fle;
		if (bcmp(&r, &fle6->f.r, sizeof(struct flow6_rec)) == 0)
			break;
		if ((INACTIVE(fle6) && SMALL(fle6)) || AGED(fle6)) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle,
			    NG_QUEUE);
			counter_u64_add(priv->nfinfo_act_exp, 1);
		}
	}

	if (fle != NULL) {			/* An existent entry. */
		fle6 = (struct flow6_entry *)fle;

		fle6->f.bytes += plen;
		fle6->f.packets ++;
		fle6->f.tcp_flags |= tcp_flags;
		fle6->f.last = time_uptime;

		/*
		 * We have the following reasons to expire flow in active way:
		 * - it hit active timeout
		 * - a TCP connection closed
		 * - it is going to overflow counter
		 */
		if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle6) ||
		    (fle6->f.bytes >= (CNTR_MAX - IF_MAXMTU)) ) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle,
			    NG_QUEUE);
			counter_u64_add(priv->nfinfo_act_exp, 1);
		} else {
			/*
			 * It is the newest, move it to the tail,
			 * if it isn't there already. Next search will
			 * locate it quicker.
			 */
			if (fle != TAILQ_LAST(&hsh->head, fhead)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
			}
		}
	} else				/* A new flow entry. */
		error = hash6_insert(priv, hsh, &r, plen, flags, tcp_flags);

	mtx_unlock(&hsh->mtx);

	return (error);
}
#endif
908 
/*
 * Return records from cache to userland.
 *
 * Transfers at most NREC_AT_ONCE (or NREC6_AT_ONCE) entries per call,
 * resuming from the hash/list position the caller sends back in req.
 * Buckets whose mutex is busy or contested are skipped by returning
 * the current position to userland for a retry.
 *
 * TODO: matching particular IP should be done in kernel, here.
 */
int
ng_netflow_flow_show(priv_p priv, struct ngnf_show_header *req,
struct ngnf_show_header *resp)
{
	struct flow_hash_entry	*hsh;
	struct flow_entry	*fle;
	struct flow_entry_data	*data = (struct flow_entry_data *)(resp + 1);
#ifdef INET6
	struct flow6_entry_data	*data6 = (struct flow6_entry_data *)(resp + 1);
#endif
	int	i, max;

	i = req->hash_id;
	if (i > NBUCKETS-1)
		return (EINVAL);

	/* Pick the hash table and batch size matching the IP version. */
#ifdef INET6
	if (req->version == 6) {
		resp->version = 6;
		hsh = priv->hash6 + i;
		max = NREC6_AT_ONCE;
	} else
#endif
	if (req->version == 4) {
		resp->version = 4;
		hsh = priv->hash + i;
		max = NREC_AT_ONCE;
	} else
		return (EINVAL);

	/*
	 * We will transfer not more than NREC_AT_ONCE. More data
	 * will come in next message.
	 * We send current hash index and current record number in list
	 * to userland, and userland should return it back to us.
	 * Then, we will restart with new entry.
	 *
	 * The resulting cache snapshot can be inaccurate if flow expiration
	 * is taking place on hash item between userland data requests for
	 * this hash item id.
	 */
	resp->nentries = 0;
	for (; i < NBUCKETS; hsh++, i++) {
		int list_id;

		if (mtx_trylock(&hsh->mtx) == 0) {
			/*
			 * Requested hash index is not available,
			 * relay decision to skip or re-request data
			 * to userland.
			 */
			resp->hash_id = i;
			resp->list_id = 0;
			return (0);
		}

		list_id = 0;
		TAILQ_FOREACH(fle, &hsh->head, fle_hash) {
			/* Yield the bucket if someone is waiting for it. */
			if (hsh->mtx.mtx_lock & MTX_CONTESTED) {
				resp->hash_id = i;
				resp->list_id = list_id;
				mtx_unlock(&hsh->mtx);
				return (0);
			}

			list_id++;
			/* Search for particular record in list. */
			if (req->list_id > 0) {
				if (list_id < req->list_id)
					continue;

				/* Requested list position found. */
				req->list_id = 0;
			}
#ifdef INET6
			if (req->version == 6) {
				struct flow6_entry *fle6;

				fle6 = (struct flow6_entry *)fle;
				bcopy(&fle6->f, data6 + resp->nentries,
				    sizeof(fle6->f));
			} else
#endif
				bcopy(&fle->f, data + resp->nentries,
				    sizeof(fle->f));
			resp->nentries++;
			if (resp->nentries == max) {
				resp->hash_id = i;
				/*
				 * If it was the last item in list
				 * we simply skip to next hash_id.
				 */
				resp->list_id = list_id + 1;
				mtx_unlock(&hsh->mtx);
				return (0);
			}
		}
		mtx_unlock(&hsh->mtx);
	}

	/* Whole cache walked: signal completion to userland. */
	resp->hash_id = resp->list_id = 0;

	return (0);
}
1018 
1019 /* We have full datagram in privdata. Send it to export hook. */
1020 static int
1021 export_send(priv_p priv, fib_export_p fe, item_p item, int flags)
1022 {
1023 	struct mbuf *m = NGI_M(item);
1024 	struct netflow_v5_export_dgram *dgram = mtod(m,
1025 					struct netflow_v5_export_dgram *);
1026 	struct netflow_v5_header *header = &dgram->header;
1027 	struct timespec ts;
1028 	int error = 0;
1029 
1030 	/* Fill mbuf header. */
1031 	m->m_len = m->m_pkthdr.len = sizeof(struct netflow_v5_record) *
1032 	   header->count + sizeof(struct netflow_v5_header);
1033 
1034 	/* Fill export header. */
1035 	header->sys_uptime = htonl(MILLIUPTIME(time_uptime));
1036 	getnanotime(&ts);
1037 	header->unix_secs  = htonl(ts.tv_sec);
1038 	header->unix_nsecs = htonl(ts.tv_nsec);
1039 	header->engine_type = 0;
1040 	header->engine_id = fe->domain_id;
1041 	header->pad = 0;
1042 	header->flow_seq = htonl(atomic_fetchadd_32(&fe->flow_seq,
1043 	    header->count));
1044 	header->count = htons(header->count);
1045 
1046 	if (priv->export != NULL)
1047 		NG_FWD_ITEM_HOOK_FLAGS(error, item, priv->export, flags);
1048 	else
1049 		NG_FREE_ITEM(item);
1050 
1051 	return (error);
1052 }
1053 
1054 
1055 /* Add export record to dgram. */
1056 static int
1057 export_add(item_p item, struct flow_entry *fle)
1058 {
1059 	struct netflow_v5_export_dgram *dgram = mtod(NGI_M(item),
1060 					struct netflow_v5_export_dgram *);
1061 	struct netflow_v5_header *header = &dgram->header;
1062 	struct netflow_v5_record *rec;
1063 
1064 	rec = &dgram->r[header->count];
1065 	header->count ++;
1066 
1067 	KASSERT(header->count <= NETFLOW_V5_MAX_RECORDS,
1068 	    ("ng_netflow: export too big"));
1069 
1070 	/* Fill in export record. */
1071 	rec->src_addr = fle->f.r.r_src.s_addr;
1072 	rec->dst_addr = fle->f.r.r_dst.s_addr;
1073 	rec->next_hop = fle->f.next_hop.s_addr;
1074 	rec->i_ifx    = htons(fle->f.fle_i_ifx);
1075 	rec->o_ifx    = htons(fle->f.fle_o_ifx);
1076 	rec->packets  = htonl(fle->f.packets);
1077 	rec->octets   = htonl(fle->f.bytes);
1078 	rec->first    = htonl(MILLIUPTIME(fle->f.first));
1079 	rec->last     = htonl(MILLIUPTIME(fle->f.last));
1080 	rec->s_port   = fle->f.r.r_sport;
1081 	rec->d_port   = fle->f.r.r_dport;
1082 	rec->flags    = fle->f.tcp_flags;
1083 	rec->prot     = fle->f.r.r_ip_p;
1084 	rec->tos      = fle->f.r.r_tos;
1085 	rec->dst_mask = fle->f.dst_mask;
1086 	rec->src_mask = fle->f.src_mask;
1087 	rec->pad1     = 0;
1088 	rec->pad2     = 0;
1089 
1090 	/* Not supported fields. */
1091 	rec->src_as = rec->dst_as = 0;
1092 
1093 	if (header->count == NETFLOW_V5_MAX_RECORDS)
1094 		return (1); /* end of datagram */
1095 	else
1096 		return (0);
1097 }
1098 
/*
 * Periodic flow expiry run.
 *
 * Walks the IPv4 flow cache (and the IPv6 cache when INET6 is
 * compiled in) and exports entries that qualify as inactive, small,
 * or aged, via expire_flow().  Reschedules itself every second on
 * priv->exp_callout, so it runs from callout context; 'arg' is the
 * node private data (priv_p).
 *
 * Locking: each hash bucket is taken with mtx_trylock() only, and the
 * walk bails out of a bucket as soon as the lock becomes contested, so
 * this housekeeping never makes the data-path wait on a bucket lock.
 */
void
ng_netflow_expire(void *arg)
{
	struct flow_entry	*fle, *fle1;
	struct flow_hash_entry	*hsh;
	priv_p			priv = (priv_p )arg;
	int			used, i;

	/*
	 * Going through all the cache.
	 */
	/* Snapshot of how many flow entries are currently allocated. */
	used = uma_zone_get_cur(priv->zone);
	for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++) {
		/*
		 * Skip entries, that are already being worked on.
		 */
		if (mtx_trylock(&hsh->mtx) == 0)
			continue;

		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			/*
			 * Interrupt thread wants this entry!
			 * Quick! Quick! Bail out!
			 */
			if (hsh->mtx.mtx_lock & MTX_CONTESTED)
				break;

			/*
			 * Don't expire aggressively while hash collision
			 * ratio is predicted small.
			 *
			 * NOTE(review): breaking (not continuing) here
			 * presumably relies on list entries being ordered
			 * oldest-first, so the rest of the list is no
			 * "more expired" than this one — confirm against
			 * the insertion code, which is outside this view.
			 */
			if (used <= (NBUCKETS*2) && !INACTIVE(fle))
				break;

			/*
			 * Expire an inactive flow when it is small or the
			 * cache is crowded (used > 2*NBUCKETS), and always
			 * expire flows that exceeded the active timeout.
			 */
			if ((INACTIVE(fle) && (SMALL(fle) ||
			    (used > (NBUCKETS*2)))) || AGED(fle)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				expire_flow(priv, priv_to_fib(priv,
				    fle->f.r.fib), fle, NG_NOFLAGS);
				/* Keep the heuristic in sync without
				 * re-querying the zone. */
				used--;
				counter_u64_add(priv->nfinfo_inact_exp, 1);
			}
		}
		mtx_unlock(&hsh->mtx);
	}

#ifdef INET6
	/* Same pass over the IPv6 cache; entries are flow6_entry. */
	used = uma_zone_get_cur(priv->zone6);
	for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++) {
		struct flow6_entry	*fle6;

		/*
		 * Skip entries, that are already being worked on.
		 */
		if (mtx_trylock(&hsh->mtx) == 0)
			continue;

		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			/* List links are in the common prefix; the
			 * timers/sizes live in the flow6 layout. */
			fle6 = (struct flow6_entry *)fle;
			/*
			 * Interrupt thread wants this entry!
			 * Quick! Quick! Bail out!
			 */
			if (hsh->mtx.mtx_lock & MTX_CONTESTED)
				break;

			/*
			 * Don't expire aggressively while hash collision
			 * ratio is predicted small.
			 */
			if (used <= (NBUCKETS*2) && !INACTIVE(fle6))
				break;

			if ((INACTIVE(fle6) && (SMALL(fle6) ||
			    (used > (NBUCKETS*2)))) || AGED(fle6)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				expire_flow(priv, priv_to_fib(priv,
				    fle->f.r.fib), fle, NG_NOFLAGS);
				used--;
				counter_u64_add(priv->nfinfo_inact_exp, 1);
			}
		}
		mtx_unlock(&hsh->mtx);
	}
#endif

	/* Schedule next expire. */
	callout_reset(&priv->exp_callout, (1*hz), &ng_netflow_expire,
	    (void *)priv);
}
1190