/*-
 * Copyright (c) 2010-2011 Alexander V. Chernikov <melifaro@ipfw.ru>
 * Copyright (c) 2004-2005 Gleb Smirnoff <glebius@FreeBSD.org>
 * Copyright (c) 2001-2003 Roman V. Palagin <romanp@unshadow.net>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $SourceForge: netflow.c,v 1.41 2004/09/05 11:41:10 glebius Exp $
 */

static const char rcs_id[] =
    "@(#) $FreeBSD$";

#include "opt_inet6.h"
#include "opt_route.h"
#include <sys/param.h>
#include <sys/kernel.h>
#include <sys/limits.h>
#include <sys/mbuf.h>
#include <sys/syslog.h>
#include <sys/systm.h>
#include <sys/socket.h>
#include <sys/endian.h>

#include <machine/atomic.h>
#include <machine/stdarg.h>

#include <net/if.h>
#include <net/route.h>
#include <net/ethernet.h>
#include <netinet/in.h>
#include <netinet/in_systm.h>
#include <netinet/ip.h>
#include <netinet/ip6.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>

#include <netgraph/ng_message.h>
#include <netgraph/netgraph.h>

#include <netgraph/netflow/netflow.h>
#include <netgraph/netflow/netflow_v9.h>
#include <netgraph/netflow/ng_netflow.h>

#define NBUCKETS (65536)	/* must be power of 2 */

/* This hash is for TCP or UDP packets. */
#define FULL_HASH(addr1, addr2, port1, port2)	\
	(((addr1 ^ (addr1 >> 16) ^		\
	htons(addr2 ^ (addr2 >> 16))) ^		\
	port1 ^ htons(port2)) &			\
	(NBUCKETS - 1))

/* This hash is for all other IP packets. */
#define ADDR_HASH(addr1, addr2)			\
	((addr1 ^ (addr1 >> 16) ^		\
	htons(addr2 ^ (addr2 >> 16))) &		\
	(NBUCKETS - 1))

/* Macros to shorten logical constructions */
/* XXX: priv must exist in namespace */
#define INACTIVE(fle)	(time_uptime - fle->f.last > priv->info.nfinfo_inact_t)
#define AGED(fle)	(time_uptime - fle->f.first > priv->info.nfinfo_act_t)
#define ISFREE(fle)	(fle->f.packets == 0)

/*
 * 4 is a magic number: statistically, the number of 4-packet flows is
 * an order of magnitude larger than the number of 5-, 6-, 7-...-packet
 * flows. Most UDP/ICMP scans are 1 packet (~90% of the flow cache).
 * TCP scans are 2 packets for a reachable host and 4 packets otherwise.
 */
#define SMALL(fle)	(fle->f.packets <= 4)
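/*
 * Worked example for the macros above (illustrative; the actual
 * timeout values live in priv->info.nfinfo_inact_t/nfinfo_act_t and
 * are only assumed here to be the usual 15 s inactive / 1800 s active
 * defaults): a 2-packet TCP scan probe is SMALL(), so once it has seen
 * no traffic for the inactive timeout it satisfies INACTIVE() && SMALL()
 * and is purged on the next pass over its hash chain, long before
 * AGED() would fire.
 */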

MALLOC_DECLARE(M_NETFLOW_HASH);
MALLOC_DEFINE(M_NETFLOW_HASH, "netflow_hash", "NetFlow hash");

static int export_add(item_p, struct flow_entry *);
static int export_send(priv_p, fib_export_p, item_p, int);

static int hash_insert(priv_p, struct flow_hash_entry *, struct flow_rec *,
    int, uint8_t, uint8_t);
#ifdef INET6
static int hash6_insert(priv_p, struct flow_hash_entry *, struct flow6_rec *,
    int, uint8_t, uint8_t);
#endif

static __inline void expire_flow(priv_p, fib_export_p, struct flow_entry *,
    int);

/*
 * Generate hash for a given flow record.
 *
 * FIB is not used here, because:
 * most VRFs will carry public IPv4 addresses, which are unique even
 * without the FIB. Private addresses can overlap, but that is handled
 * via the flow_rec bcmp(), whose coverage includes the fib id. In the
 * IPv6 world addresses are all globally unique (not entirely true,
 * there is FC00::/7 for example, but the chances of address overlap
 * are MUCH smaller).
 */
static __inline uint32_t
ip_hash(struct flow_rec *r)
{
	switch (r->r_ip_p) {
	case IPPROTO_TCP:
	case IPPROTO_UDP:
		return FULL_HASH(r->r_src.s_addr, r->r_dst.s_addr,
		    r->r_sport, r->r_dport);
	default:
		return ADDR_HASH(r->r_src.s_addr, r->r_dst.s_addr);
	}
}

#ifdef INET6
/* Generate hash for a given flow6 record. Use the lower 4 octets of the v6 addresses. */
static __inline uint32_t
ip6_hash(struct flow6_rec *r)
{
	switch (r->r_ip_p) {
	case IPPROTO_TCP:
	case IPPROTO_UDP:
		return FULL_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3],
		    r->dst.r_dst6.__u6_addr.__u6_addr32[3], r->r_sport,
		    r->r_dport);
	default:
		return ADDR_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3],
		    r->dst.r_dst6.__u6_addr.__u6_addr32[3]);
	}
}
#endif

/* This is a callback from uma(9), called on alloc. */
static int
uma_ctor_flow(void *mem, int size, void *arg, int how)
{
	priv_p priv = (priv_p)arg;

	if (atomic_load_acq_32(&priv->info.nfinfo_used) >= CACHESIZE)
		return (ENOMEM);

	atomic_add_32(&priv->info.nfinfo_used, 1);

	return (0);
}

/* This is a callback from uma(9), called on free. */
static void
uma_dtor_flow(void *mem, int size, void *arg)
{
	priv_p priv = (priv_p)arg;

	atomic_subtract_32(&priv->info.nfinfo_used, 1);
}

#ifdef INET6
/* This is a callback from uma(9), called on alloc. */
static int
uma_ctor_flow6(void *mem, int size, void *arg, int how)
{
	priv_p priv = (priv_p)arg;

	if (atomic_load_acq_32(&priv->info.nfinfo_used6) >= CACHESIZE)
		return (ENOMEM);

	atomic_add_32(&priv->info.nfinfo_used6, 1);

	return (0);
}

/* This is a callback from uma(9), called on free. */
static void
uma_dtor_flow6(void *mem, int size, void *arg)
{
	priv_p priv = (priv_p)arg;

	atomic_subtract_32(&priv->info.nfinfo_used6, 1);
}
#endif
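
/*
 * Illustrative sketch (not compiled): the ctor/dtor pair above is what
 * enforces the CACHESIZE cap on cache entries. Allocation sites pass
 * priv as the uma(9) argument so the callbacks can reach the counters:
 */
#if 0
	fle = uma_zalloc_arg(priv->zone, priv, M_NOWAIT); /* ctor may veto */
	uma_zfree_arg(priv->zone, fle, priv);		  /* dtor decrements */
#endif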

/*
 * Detach the export datagram from priv, if there is one.
 * If there is none, allocate a new one.
 */
static item_p
get_export_dgram(priv_p priv, fib_export_p fe)
{
	item_p item = NULL;

	mtx_lock(&fe->export_mtx);
	if (fe->exp.item != NULL) {
		item = fe->exp.item;
		fe->exp.item = NULL;
	}
	mtx_unlock(&fe->export_mtx);

	if (item == NULL) {
		struct netflow_v5_export_dgram *dgram;
		struct mbuf *m;

		m = m_getcl(M_DONTWAIT, MT_DATA, M_PKTHDR);
		if (m == NULL)
			return (NULL);
		item = ng_package_data(m, NG_NOFLAGS);
		if (item == NULL)
			return (NULL);
		dgram = mtod(m, struct netflow_v5_export_dgram *);
		dgram->header.count = 0;
		dgram->header.version = htons(NETFLOW_V5);
		dgram->header.pad = 0;
	}

	return (item);
}

/*
 * Re-attach an incomplete datagram back to priv.
 * If there is already another one there, send the incomplete one.
 */
static void
return_export_dgram(priv_p priv, fib_export_p fe, item_p item, int flags)
{
	/*
	 * It may happen on SMP that some thread has already
	 * put its item there; in this case we bail out and
	 * send what we have to the collector.
	 */
	mtx_lock(&fe->export_mtx);
	if (fe->exp.item == NULL) {
		fe->exp.item = item;
		mtx_unlock(&fe->export_mtx);
	} else {
		mtx_unlock(&fe->export_mtx);
		export_send(priv, fe, item, flags);
	}
}

/*
 * The flow is over. Call export_add() and free it. If the datagram is
 * full, then call export_send().
 */
static __inline void
expire_flow(priv_p priv, fib_export_p fe, struct flow_entry *fle, int flags)
{
	struct netflow_export_item exp;
	uint16_t version = fle->f.version;

	if ((priv->export != NULL) && (version == IPVERSION)) {
		exp.item = get_export_dgram(priv, fe);
		if (exp.item == NULL) {
			atomic_add_32(&priv->info.nfinfo_export_failed, 1);
			if (priv->export9 != NULL)
				atomic_add_32(&priv->info.nfinfo_export9_failed, 1);
			/* fle definitely contains an IPv4 flow. */
			uma_zfree_arg(priv->zone, fle, priv);
			return;
		}

		if (export_add(exp.item, fle) > 0)
			export_send(priv, fe, exp.item, flags);
		else
			return_export_dgram(priv, fe, exp.item, NG_QUEUE);
	}

	if (priv->export9 != NULL) {
		exp.item9 = get_export9_dgram(priv, fe, &exp.item9_opt);
		if (exp.item9 == NULL) {
			atomic_add_32(&priv->info.nfinfo_export9_failed, 1);
			if (version == IPVERSION)
				uma_zfree_arg(priv->zone, fle, priv);
#ifdef INET6
			else if (version == IP6VERSION)
				uma_zfree_arg(priv->zone6, fle, priv);
#endif
			else
				panic("ng_netflow: unknown IP version: %d",
				    version);
			return;
		}

		if (export9_add(exp.item9, exp.item9_opt, fle) > 0)
			export9_send(priv, fe, exp.item9, exp.item9_opt,
			    flags);
		else
			return_export9_dgram(priv, fe, exp.item9,
			    exp.item9_opt, NG_QUEUE);
	}

	if (version == IPVERSION)
		uma_zfree_arg(priv->zone, fle, priv);
#ifdef INET6
	else if (version == IP6VERSION)
		uma_zfree_arg(priv->zone6, fle, priv);
#endif
}

/* Get a snapshot of node statistics. */
void
ng_netflow_copyinfo(priv_p priv, struct ng_netflow_info *i)
{
	/* XXX: atomic */
	memcpy((void *)i, (void *)&priv->info, sizeof(priv->info));
}
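
/*
 * Usage pattern for get_export_dgram()/return_export_dgram() above
 * (illustrative sketch, not compiled; mirrors the v5 branch of
 * expire_flow()): a datagram is borrowed from the fib, a record is
 * appended, and the datagram is either sent when full or parked for
 * the next expired flow.
 */
#if 0
	item = get_export_dgram(priv, fe);
	if (export_add(item, fle) > 0)		/* datagram now full */
		export_send(priv, fe, item, NG_QUEUE);
	else					/* room left, keep it */
		return_export_dgram(priv, fe, item, NG_QUEUE);
#endif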

/*
 * Insert a record into the defined slot.
 *
 * First we get a free flow entry, then fill in all
 * possible fields in it.
 *
 * TODO: consider dropping the hash mutex while filling in the datagram,
 * as was done in a previous version. Need to test & profile
 * to be sure.
 */
static int
hash_insert(priv_p priv, struct flow_hash_entry *hsh, struct flow_rec *r,
    int plen, uint8_t flags, uint8_t tcp_flags)
{
	struct flow_entry *fle;
	struct sockaddr_in sin;
	struct rtentry *rt;

	mtx_assert(&hsh->mtx, MA_OWNED);

	fle = uma_zalloc_arg(priv->zone, priv, M_NOWAIT);
	if (fle == NULL) {
		atomic_add_32(&priv->info.nfinfo_alloc_failed, 1);
		return (ENOMEM);
	}

	/*
	 * Now fle is totally ours. It is detached from all lists,
	 * so we can safely edit it.
	 */
	fle->f.version = IPVERSION;
	bcopy(r, &fle->f.r, sizeof(struct flow_rec));
	fle->f.bytes = plen;
	fle->f.packets = 1;
	fle->f.tcp_flags = tcp_flags;

	fle->f.first = fle->f.last = time_uptime;

	/*
	 * First we do a route table lookup on the destination address,
	 * so we can fill in out_ifx, dst_mask and nexthop (and dst_as,
	 * in future releases).
	 */
	if ((flags & NG_NETFLOW_CONF_NODSTLOOKUP) == 0) {
		bzero(&sin, sizeof(sin));
		sin.sin_len = sizeof(struct sockaddr_in);
		sin.sin_family = AF_INET;
		sin.sin_addr = fle->f.r.r_dst;
		rt = rtalloc1_fib((struct sockaddr *)&sin, 0, 0, r->fib);
		if (rt != NULL) {
			fle->f.fle_o_ifx = rt->rt_ifp->if_index;

			if (rt->rt_flags & RTF_GATEWAY &&
			    rt->rt_gateway->sa_family == AF_INET)
				fle->f.next_hop =
				    ((struct sockaddr_in *)(rt->rt_gateway))->sin_addr;

			if (rt_mask(rt))
				fle->f.dst_mask =
				    bitcount32(((struct sockaddr_in *)
				    rt_mask(rt))->sin_addr.s_addr);
			else if (rt->rt_flags & RTF_HOST)
				/* Give up. We can't determine the mask :( */
				fle->f.dst_mask = 32;

			RTFREE_LOCKED(rt);
		}
	}

	/* Do a route lookup on the source address, to fill in src_mask. */
	if ((flags & NG_NETFLOW_CONF_NOSRCLOOKUP) == 0) {
		bzero(&sin, sizeof(sin));
		sin.sin_len = sizeof(struct sockaddr_in);
		sin.sin_family = AF_INET;
		sin.sin_addr = fle->f.r.r_src;
		rt = rtalloc1_fib((struct sockaddr *)&sin, 0, 0, r->fib);
		if (rt != NULL) {
			if (rt_mask(rt))
				fle->f.src_mask =
				    bitcount32(((struct sockaddr_in *)
				    rt_mask(rt))->sin_addr.s_addr);
			else if (rt->rt_flags & RTF_HOST)
				/* Give up. We can't determine the mask :( */
				fle->f.src_mask = 32;

			RTFREE_LOCKED(rt);
		}
	}

	/* Push the new flow onto the tail of the hash chain. */
	TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);

	return (0);
}
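
/*
 * Note on the mask computation above: bitcount32() just counts set
 * bits, so the result is byte-order independent; a /24 netmask of
 * 255.255.255.0 yields 24 whether s_addr reads 0xffffff00 or
 * 0x00ffffff. It does assume a contiguous netmask; a non-contiguous
 * mask (legal, if exotic, in a BSD routing table) would produce a
 * misleading prefix length.
 */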

#ifdef INET6
/* XXX: make this a normal function instead of a macro. */
#define ipv6_masklen(x)	(bitcount32((x).__u6_addr.__u6_addr32[0]) + \
			bitcount32((x).__u6_addr.__u6_addr32[1]) + \
			bitcount32((x).__u6_addr.__u6_addr32[2]) + \
			bitcount32((x).__u6_addr.__u6_addr32[3]))
#define RT_MASK6(x)	(ipv6_masklen(((struct sockaddr_in6 *)rt_mask(x))->sin6_addr))
static int
hash6_insert(priv_p priv, struct flow_hash_entry *hsh6, struct flow6_rec *r,
    int plen, uint8_t flags, uint8_t tcp_flags)
{
	struct flow6_entry *fle6;
	struct sockaddr_in6 *src, *dst;
	struct rtentry *rt;
	struct route_in6 rin6;

	mtx_assert(&hsh6->mtx, MA_OWNED);

	fle6 = uma_zalloc_arg(priv->zone6, priv, M_NOWAIT);
	if (fle6 == NULL) {
		atomic_add_32(&priv->info.nfinfo_alloc_failed, 1);
		return (ENOMEM);
	}

	/*
	 * Now fle6 is totally ours. It is detached from all lists,
	 * so we can safely edit it.
	 */
	fle6->f.version = IP6VERSION;
	bcopy(r, &fle6->f.r, sizeof(struct flow6_rec));
	fle6->f.bytes = plen;
	fle6->f.packets = 1;
	fle6->f.tcp_flags = tcp_flags;

	fle6->f.first = fle6->f.last = time_uptime;

	/*
	 * First we do a route table lookup on the destination address,
	 * so we can fill in out_ifx, dst_mask and nexthop (and dst_as,
	 * in future releases).
	 */
	if ((flags & NG_NETFLOW_CONF_NODSTLOOKUP) == 0) {
		bzero(&rin6, sizeof(struct route_in6));
		dst = (struct sockaddr_in6 *)&rin6.ro_dst;
		dst->sin6_len = sizeof(struct sockaddr_in6);
		dst->sin6_family = AF_INET6;
		dst->sin6_addr = r->dst.r_dst6;

		rin6.ro_rt = rtalloc1_fib((struct sockaddr *)dst, 0, 0, r->fib);

		if (rin6.ro_rt != NULL) {
			rt = rin6.ro_rt;
			fle6->f.fle_o_ifx = rt->rt_ifp->if_index;

			if (rt->rt_flags & RTF_GATEWAY &&
			    rt->rt_gateway->sa_family == AF_INET6)
				fle6->f.n.next_hop6 =
				    ((struct sockaddr_in6 *)(rt->rt_gateway))->sin6_addr;

			if (rt_mask(rt))
				fle6->f.dst_mask = RT_MASK6(rt);
			else
				fle6->f.dst_mask = 128;

			RTFREE_LOCKED(rt);
		}
	}

	/* Do a route lookup on the source address, to fill in src_mask. */
	if ((flags & NG_NETFLOW_CONF_NOSRCLOOKUP) == 0) {
		bzero(&rin6, sizeof(struct route_in6));
		src = (struct sockaddr_in6 *)&rin6.ro_dst;
		src->sin6_len = sizeof(struct sockaddr_in6);
		src->sin6_family = AF_INET6;
		src->sin6_addr = r->src.r_src6;

		rin6.ro_rt = rtalloc1_fib((struct sockaddr *)src, 0, 0, r->fib);

		if (rin6.ro_rt != NULL) {
			rt = rin6.ro_rt;

			if (rt_mask(rt))
				fle6->f.src_mask = RT_MASK6(rt);
			else
				fle6->f.src_mask = 128;

			RTFREE_LOCKED(rt);
		}
	}

	/* Push the new flow onto the tail of the hash chain. */
	TAILQ_INSERT_TAIL(&hsh6->head, (struct flow_entry *)fle6, fle_hash);

	return (0);
}
#undef ipv6_masklen
#undef RT_MASK6
#endif
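
/*
 * The flow_entry cast in hash6_insert() above is deliberate: struct
 * flow6_entry is expected to share its leading layout (the fle_hash
 * linkage and the f.version field; see ng_netflow.h) with struct
 * flow_entry, so v4 and v6 flows can live on the same kind of hash
 * chain and expire_flow() can dispatch on fle->f.version alone.
 */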

/*
 * Non-static functions called from ng_netflow.c.
 */

/* Allocate memory and set up the flow cache. */
void
ng_netflow_cache_init(priv_p priv)
{
	struct flow_hash_entry *hsh;
	int i;

	/* Initialize cache UMA zones. */
	priv->zone = uma_zcreate("NetFlow IPv4 cache",
	    sizeof(struct flow_entry), uma_ctor_flow, uma_dtor_flow,
	    NULL, NULL, UMA_ALIGN_CACHE, 0);
	uma_zone_set_max(priv->zone, CACHESIZE);
#ifdef INET6
	priv->zone6 = uma_zcreate("NetFlow IPv6 cache",
	    sizeof(struct flow6_entry), uma_ctor_flow6, uma_dtor_flow6,
	    NULL, NULL, UMA_ALIGN_CACHE, 0);
	uma_zone_set_max(priv->zone6, CACHESIZE);
#endif

	/* Allocate hash. */
	priv->hash = malloc(NBUCKETS * sizeof(struct flow_hash_entry),
	    M_NETFLOW_HASH, M_WAITOK | M_ZERO);

	/* Initialize hash. */
	for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++) {
		mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF);
		TAILQ_INIT(&hsh->head);
	}

#ifdef INET6
	/* Allocate hash. */
	priv->hash6 = malloc(NBUCKETS * sizeof(struct flow_hash_entry),
	    M_NETFLOW_HASH, M_WAITOK | M_ZERO);

	/* Initialize hash. */
	for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++) {
		mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF);
		TAILQ_INIT(&hsh->head);
	}
#endif

	ng_netflow_v9_cache_init(priv);
	CTR0(KTR_NET, "ng_netflow startup()");
}

/* Initialize a new FIB table for v5 and v9. */
int
ng_netflow_fib_init(priv_p priv, int fib)
{
	fib_export_p fe = priv_to_fib(priv, fib);

	CTR1(KTR_NET, "ng_netflow(): fib init: %d", fib);

	if (fe != NULL)
		return (0);

	if ((fe = malloc(sizeof(struct fib_export), M_NETGRAPH,
	    M_NOWAIT | M_ZERO)) == NULL)
		return (1);

	mtx_init(&fe->export_mtx, "export dgram lock", NULL, MTX_DEF);
	mtx_init(&fe->export9_mtx, "export9 dgram lock", NULL, MTX_DEF);
	fe->fib = fib;
	fe->domain_id = fib;

	if (atomic_cmpset_ptr((volatile uintptr_t *)&priv->fib_data[fib],
	    (uintptr_t)NULL, (uintptr_t)fe) == 0) {
		/* FIB already set up by another ISR. */
		CTR3(KTR_NET, "ng_netflow(): fib init: %d setup %p but got %p",
		    fib, fe, priv_to_fib(priv, fib));
		mtx_destroy(&fe->export_mtx);
		mtx_destroy(&fe->export9_mtx);
		free(fe, M_NETGRAPH);
	} else {
		/* Increase counter for statistics. */
		CTR3(KTR_NET, "ng_netflow(): fib %d setup to %p (%p)",
		    fib, fe, priv_to_fib(priv, fib));
		atomic_fetchadd_32(&priv->info.nfinfo_alloc_fibs, 1);
	}

	return (0);
}

/* Free all flow cache memory. Called from the node close method. */
void
ng_netflow_cache_flush(priv_p priv)
{
	struct flow_entry *fle, *fle1;
	struct flow_hash_entry *hsh;
	struct netflow_export_item exp;
	fib_export_p fe;
	int i;

	bzero(&exp, sizeof(exp));

	/*
	 * We are going to free probably billable data.
	 * Expire everything before freeing it.
	 * No locking is required since the callout is already drained.
	 */
	for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++)
		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			fe = priv_to_fib(priv, fle->f.r.fib);
			expire_flow(priv, fe, fle, NG_QUEUE);
		}
#ifdef INET6
	for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++)
		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			fe = priv_to_fib(priv, fle->f.r.fib);
			expire_flow(priv, fe, fle, NG_QUEUE);
		}
#endif

	uma_zdestroy(priv->zone);
	/* Destroy hash mutexes. */
	for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++)
		mtx_destroy(&hsh->mtx);

	/* Free hash memory. */
	if (priv->hash != NULL)
		free(priv->hash, M_NETFLOW_HASH);
#ifdef INET6
	uma_zdestroy(priv->zone6);
	/* Destroy hash mutexes. */
	for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++)
		mtx_destroy(&hsh->mtx);

	/* Free hash memory. */
	if (priv->hash6 != NULL)
		free(priv->hash6, M_NETFLOW_HASH);
#endif

	for (i = 0; i < priv->maxfibs; i++) {
		if ((fe = priv_to_fib(priv, i)) == NULL)
			continue;

		if (fe->exp.item != NULL)
			export_send(priv, fe, fe->exp.item, NG_QUEUE);

		if (fe->exp.item9 != NULL)
			export9_send(priv, fe, fe->exp.item9,
			    fe->exp.item9_opt, NG_QUEUE);

		mtx_destroy(&fe->export_mtx);
		mtx_destroy(&fe->export9_mtx);
		free(fe, M_NETGRAPH);
	}

	ng_netflow_v9_cache_flush(priv);
}
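
/*
 * Big picture (illustrative): ng_netflow_cache_init() is called from
 * the node constructor, ng_netflow_fib_init() runs lazily from the
 * data path the first time a given fib is seen (hence the
 * atomic_cmpset_ptr() race handling above), and
 * ng_netflow_cache_flush() runs from the node close path once the
 * expiry callout has been drained.
 */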

/* Insert an IPv4 packet into the flow cache. */
int
ng_netflow_flow_add(priv_p priv, fib_export_p fe, struct ip *ip,
    caddr_t upper_ptr, uint8_t upper_proto, uint8_t flags,
    unsigned int src_if_index)
{
	register struct flow_entry *fle, *fle1;
	struct flow_hash_entry *hsh;
	struct flow_rec r;
	int hlen, plen;
	int error = 0;
	uint8_t tcp_flags = 0;
	uint16_t eproto;

	/* Try to fill flow_rec r. */
	bzero(&r, sizeof(r));
	/* Check version. */
	if (ip->ip_v != IPVERSION)
		return (EINVAL);

	/* Verify minimum header length. */
	hlen = ip->ip_hl << 2;

	if (hlen < sizeof(struct ip))
		return (EINVAL);

	eproto = ETHERTYPE_IP;
	/* Assume L4 template by default. */
	r.flow_type = NETFLOW_V9_FLOW_V4_L4;

	r.r_src = ip->ip_src;
	r.r_dst = ip->ip_dst;
	r.fib = fe->fib;

	/* Save packet length. */
	plen = ntohs(ip->ip_len);

	r.r_ip_p = ip->ip_p;
	r.r_tos = ip->ip_tos;

	r.r_i_ifx = src_if_index;

	/*
	 * XXX NOTE: only the first fragment of a fragmented TCP, UDP or
	 * ICMP packet will be recorded with proper s_port and d_port.
	 * Following fragments will be recorded simply as IP packets with
	 * ip_proto = ip->ip_p and s_port, d_port set to zero.
	 * I know, it looks like a bug. But I don't want to re-implement
	 * IP packet reassembly here. Anyway, the (in)famous trafd works
	 * this way - and nobody has complained yet :)
	 */
	if ((ip->ip_off & htons(IP_OFFMASK)) == 0)
		switch (r.r_ip_p) {
		case IPPROTO_TCP:
		{
			register struct tcphdr *tcp;

			tcp = (struct tcphdr *)((caddr_t)ip + hlen);
			r.r_sport = tcp->th_sport;
			r.r_dport = tcp->th_dport;
			tcp_flags = tcp->th_flags;
			break;
		}
		case IPPROTO_UDP:
			r.r_ports = *(uint32_t *)((caddr_t)ip + hlen);
			break;
		}

	atomic_fetchadd_32(&priv->info.nfinfo_packets, 1);
	/* XXX: atomic */
	priv->info.nfinfo_bytes += plen;

	/* Find hash slot. */
	hsh = &priv->hash[ip_hash(&r)];

	mtx_lock(&hsh->mtx);

	/*
	 * Go through the hash chain and find our entry. If we encounter
	 * an entry that should be expired, purge it. We do a reverse
	 * search, since the most active entries live at the tail and
	 * most searches are done on the most active entries.
	 */
	TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) {
		if (bcmp(&r, &fle->f.r, sizeof(struct flow_rec)) == 0)
			break;
		if ((INACTIVE(fle) && SMALL(fle)) || AGED(fle)) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib),
			    fle, NG_QUEUE);
			atomic_add_32(&priv->info.nfinfo_act_exp, 1);
		}
	}

	if (fle) {			/* An existing entry. */
		fle->f.bytes += plen;
		fle->f.packets++;
		fle->f.tcp_flags |= tcp_flags;
		fle->f.last = time_uptime;

		/*
		 * We have the following reasons to expire a flow actively:
		 * - it hit the active timeout
		 * - a TCP connection closed
		 * - it is about to overflow its counter
		 */
		if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle) ||
		    (fle->f.bytes >= (CNTR_MAX - IF_MAXMTU))) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib),
			    fle, NG_QUEUE);
			atomic_add_32(&priv->info.nfinfo_act_exp, 1);
		} else {
			/*
			 * It is the newest entry; move it to the tail if it
			 * isn't there already. The next search will locate
			 * it quicker.
			 */
			if (fle != TAILQ_LAST(&hsh->head, fhead)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
			}
		}
	} else				/* A new flow entry. */
		error = hash_insert(priv, hsh, &r, plen, flags, tcp_flags);

	mtx_unlock(&hsh->mtx);

	return (error);
}
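
/*
 * Illustrative caller sketch (not compiled; a simplification of the
 * hook receive path in ng_netflow.c, with hypothetical locals m, ifp,
 * upper_ptr and upper_proto): the caller parses up to the IP header,
 * resolves the fib's export state and hands the packet in.
 */
#if 0
	struct ip *ip = mtod(m, struct ip *);
	fib_export_p fe = priv_to_fib(priv, M_GETFIB(m));

	error = ng_netflow_flow_add(priv, fe, ip, upper_ptr, upper_proto,
	    flags, ifp->if_index);
#endif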
778 */ 779 if (fle != TAILQ_LAST(&hsh->head, fhead)) { 780 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 781 TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash); 782 } 783 } 784 } else /* A new flow entry. */ 785 error = hash_insert(priv, hsh, &r, plen, flags, tcp_flags); 786 787 mtx_unlock(&hsh->mtx); 788 789 return (error); 790 } 791 792 #ifdef INET6 793 /* Insert IPv6 packet from into flow cache. */ 794 int 795 ng_netflow_flow6_add(priv_p priv, fib_export_p fe, struct ip6_hdr *ip6, caddr_t upper_ptr, uint8_t upper_proto, 796 uint8_t flags, unsigned int src_if_index) 797 { 798 register struct flow_entry *fle = NULL, *fle1; 799 register struct flow6_entry *fle6; 800 struct flow_hash_entry *hsh; 801 struct flow6_rec r; 802 int plen; 803 int error = 0; 804 uint8_t tcp_flags = 0; 805 806 /* check version */ 807 if ((ip6->ip6_vfc & IPV6_VERSION_MASK) != IPV6_VERSION) 808 return (EINVAL); 809 810 bzero(&r, sizeof(r)); 811 812 r.src.r_src6 = ip6->ip6_src; 813 r.dst.r_dst6 = ip6->ip6_dst; 814 r.fib = fe->fib; 815 816 /* Assume L4 template by default */ 817 r.flow_type = NETFLOW_V9_FLOW_V6_L4; 818 819 /* save packet length */ 820 plen = ntohs(ip6->ip6_plen) + sizeof(struct ip6_hdr); 821 822 /* XXX: set DSCP/CoS value */ 823 #if 0 824 r.r_tos = ip->ip_tos; 825 #endif 826 if ((flags & NG_NETFLOW_IS_FRAG) == 0) { 827 switch(upper_proto) { 828 case IPPROTO_TCP: 829 { 830 register struct tcphdr *tcp; 831 832 tcp = (struct tcphdr *)upper_ptr; 833 r.r_ports = *(uint32_t *)upper_ptr; 834 tcp_flags = tcp->th_flags; 835 break; 836 } 837 case IPPROTO_UDP: 838 case IPPROTO_SCTP: 839 { 840 r.r_ports = *(uint32_t *)upper_ptr; 841 break; 842 } 843 844 } 845 } 846 847 r.r_ip_p = upper_proto; 848 r.r_i_ifx = src_if_index; 849 850 atomic_fetchadd_32(&priv->info.nfinfo_packets6, 1); 851 /* XXX: atomic */ 852 priv->info.nfinfo_bytes6 += plen; 853 854 /* Find hash slot. */ 855 hsh = &priv->hash6[ip6_hash(&r)]; 856 857 mtx_lock(&hsh->mtx); 858 859 /* 860 * Go through hash and find our entry. If we encounter an 861 * entry, that should be expired, purge it. We do a reverse 862 * search since most active entries are first, and most 863 * searches are done on most active entries. 864 */ 865 TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) { 866 if (fle->f.version != IP6VERSION) 867 continue; 868 fle6 = (struct flow6_entry *)fle; 869 if (bcmp(&r, &fle6->f.r, sizeof(struct flow6_rec)) == 0) 870 break; 871 if ((INACTIVE(fle6) && SMALL(fle6)) || AGED(fle6)) { 872 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 873 expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle, 874 NG_QUEUE); 875 atomic_add_32(&priv->info.nfinfo_act_exp, 1); 876 } 877 } 878 879 if (fle != NULL) { /* An existent entry. */ 880 fle6 = (struct flow6_entry *)fle; 881 882 fle6->f.bytes += plen; 883 fle6->f.packets ++; 884 fle6->f.tcp_flags |= tcp_flags; 885 fle6->f.last = time_uptime; 886 887 /* 888 * We have the following reasons to expire flow in active way: 889 * - it hit active timeout 890 * - a TCP connection closed 891 * - it is going to overflow counter 892 */ 893 if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle6) || 894 (fle6->f.bytes >= (CNTR_MAX - IF_MAXMTU)) ) { 895 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 896 expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle, 897 NG_QUEUE); 898 atomic_add_32(&priv->info.nfinfo_act_exp, 1); 899 } else { 900 /* 901 * It is the newest, move it to the tail, 902 * if it isn't there already. Next search will 903 * locate it quicker. 
904 */ 905 if (fle != TAILQ_LAST(&hsh->head, fhead)) { 906 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 907 TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash); 908 } 909 } 910 } else /* A new flow entry. */ 911 error = hash6_insert(priv, hsh, &r, plen, flags, tcp_flags); 912 913 mtx_unlock(&hsh->mtx); 914 915 return (error); 916 } 917 #endif 918 919 /* 920 * Return records from cache to userland. 921 * 922 * TODO: matching particular IP should be done in kernel, here. 923 */ 924 int 925 ng_netflow_flow_show(priv_p priv, struct ngnf_show_header *req, 926 struct ngnf_show_header *resp) 927 { 928 struct flow_hash_entry *hsh; 929 struct flow_entry *fle; 930 struct flow_entry_data *data = (struct flow_entry_data *)(resp + 1); 931 #ifdef INET6 932 struct flow6_entry_data *data6 = (struct flow6_entry_data *)(resp + 1); 933 #endif 934 int i, max; 935 936 i = req->hash_id; 937 if (i > NBUCKETS-1) 938 return (EINVAL); 939 940 #ifdef INET6 941 if (req->version == 6) { 942 resp->version = 6; 943 hsh = priv->hash6 + i; 944 max = NREC6_AT_ONCE; 945 } else 946 #endif 947 if (req->version == 4) { 948 resp->version = 4; 949 hsh = priv->hash + i; 950 max = NREC_AT_ONCE; 951 } else 952 return (EINVAL); 953 954 /* 955 * We will transfer not more than NREC_AT_ONCE. More data 956 * will come in next message. 957 * We send current hash index and current record number in list 958 * to userland, and userland should return it back to us. 959 * Then, we will restart with new entry. 960 * 961 * The resulting cache snapshot can be inaccurate if flow expiration 962 * is taking place on hash item between userland data requests for 963 * this hash item id. 964 */ 965 resp->nentries = 0; 966 for (; i < NBUCKETS; hsh++, i++) { 967 int list_id; 968 969 if (mtx_trylock(&hsh->mtx) == 0) { 970 /* 971 * Requested hash index is not available, 972 * relay decision to skip or re-request data 973 * to userland. 974 */ 975 resp->hash_id = i; 976 resp->list_id = 0; 977 return (0); 978 } 979 980 list_id = 0; 981 TAILQ_FOREACH(fle, &hsh->head, fle_hash) { 982 if (hsh->mtx.mtx_lock & MTX_CONTESTED) { 983 resp->hash_id = i; 984 resp->list_id = list_id; 985 mtx_unlock(&hsh->mtx); 986 return (0); 987 } 988 989 list_id++; 990 /* Search for particular record in list. */ 991 if (req->list_id > 0) { 992 if (list_id < req->list_id) 993 continue; 994 995 /* Requested list position found. */ 996 req->list_id = 0; 997 } 998 #ifdef INET6 999 if (req->version == 6) { 1000 struct flow6_entry *fle6; 1001 1002 fle6 = (struct flow6_entry *)fle; 1003 bcopy(&fle6->f, data6 + resp->nentries, 1004 sizeof(fle6->f)); 1005 } else 1006 #endif 1007 bcopy(&fle->f, data + resp->nentries, 1008 sizeof(fle->f)); 1009 resp->nentries++; 1010 if (resp->nentries == max) { 1011 resp->hash_id = i; 1012 /* 1013 * If it was the last item in list 1014 * we simply skip to next hash_id. 1015 */ 1016 resp->list_id = list_id + 1; 1017 mtx_unlock(&hsh->mtx); 1018 return (0); 1019 } 1020 } 1021 mtx_unlock(&hsh->mtx); 1022 } 1023 1024 resp->hash_id = resp->list_id = 0; 1025 1026 return (0); 1027 } 1028 1029 /* We have full datagram in privdata. Send it to export hook. */ 1030 static int 1031 export_send(priv_p priv, fib_export_p fe, item_p item, int flags) 1032 { 1033 struct mbuf *m = NGI_M(item); 1034 struct netflow_v5_export_dgram *dgram = mtod(m, 1035 struct netflow_v5_export_dgram *); 1036 struct netflow_v5_header *header = &dgram->header; 1037 struct timespec ts; 1038 int error = 0; 1039 1040 /* Fill mbuf header. 

/* We have a full datagram in privdata. Send it to the export hook. */
static int
export_send(priv_p priv, fib_export_p fe, item_p item, int flags)
{
	struct mbuf *m = NGI_M(item);
	struct netflow_v5_export_dgram *dgram = mtod(m,
	    struct netflow_v5_export_dgram *);
	struct netflow_v5_header *header = &dgram->header;
	struct timespec ts;
	int error = 0;

	/* Fill mbuf header. */
	m->m_len = m->m_pkthdr.len = sizeof(struct netflow_v5_record) *
	    header->count + sizeof(struct netflow_v5_header);

	/* Fill export header. */
	header->sys_uptime = htonl(MILLIUPTIME(time_uptime));
	getnanotime(&ts);
	header->unix_secs = htonl(ts.tv_sec);
	header->unix_nsecs = htonl(ts.tv_nsec);
	header->engine_type = 0;
	header->engine_id = fe->domain_id;
	header->pad = 0;
	header->flow_seq = htonl(atomic_fetchadd_32(&fe->flow_seq,
	    header->count));
	header->count = htons(header->count);

	if (priv->export != NULL)
		NG_FWD_ITEM_HOOK_FLAGS(error, item, priv->export, flags);
	else
		NG_FREE_ITEM(item);

	return (error);
}

/* Add an export record to the datagram. */
static int
export_add(item_p item, struct flow_entry *fle)
{
	struct netflow_v5_export_dgram *dgram = mtod(NGI_M(item),
	    struct netflow_v5_export_dgram *);
	struct netflow_v5_header *header = &dgram->header;
	struct netflow_v5_record *rec;

	rec = &dgram->r[header->count];
	header->count++;

	KASSERT(header->count <= NETFLOW_V5_MAX_RECORDS,
	    ("ng_netflow: export too big"));

	/* Fill in export record. */
	rec->src_addr = fle->f.r.r_src.s_addr;
	rec->dst_addr = fle->f.r.r_dst.s_addr;
	rec->next_hop = fle->f.next_hop.s_addr;
	rec->i_ifx = htons(fle->f.fle_i_ifx);
	rec->o_ifx = htons(fle->f.fle_o_ifx);
	rec->packets = htonl(fle->f.packets);
	rec->octets = htonl(fle->f.bytes);
	rec->first = htonl(MILLIUPTIME(fle->f.first));
	rec->last = htonl(MILLIUPTIME(fle->f.last));
	rec->s_port = fle->f.r.r_sport;
	rec->d_port = fle->f.r.r_dport;
	rec->flags = fle->f.tcp_flags;
	rec->prot = fle->f.r.r_ip_p;
	rec->tos = fle->f.r.r_tos;
	rec->dst_mask = fle->f.dst_mask;
	rec->src_mask = fle->f.src_mask;
	rec->pad1 = 0;
	rec->pad2 = 0;

	/* Fields not supported. */
	rec->src_as = rec->dst_as = 0;

	if (header->count == NETFLOW_V5_MAX_RECORDS)
		return (1);	/* end of datagram */
	else
		return (0);
}
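
/*
 * Size sanity note: a full v5 datagram is sizeof(struct
 * netflow_v5_header) + NETFLOW_V5_MAX_RECORDS * sizeof(struct
 * netflow_v5_record), i.e. 24 + 30 * 48 = 1464 bytes for standard
 * NetFlow v5, so even a maximal export fits the single mbuf cluster
 * allocated in get_export_dgram() and, with the UDP/IP headers an
 * export transport typically prepends, a 1500-byte Ethernet MTU.
 */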
1141 */ 1142 if (used <= (NBUCKETS*2) && !INACTIVE(fle)) 1143 break; 1144 1145 if ((INACTIVE(fle) && (SMALL(fle) || 1146 (used > (NBUCKETS*2)))) || AGED(fle)) { 1147 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 1148 expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle, NG_NOFLAGS); 1149 used--; 1150 atomic_add_32(&priv->info.nfinfo_inact_exp, 1); 1151 } 1152 } 1153 mtx_unlock(&hsh->mtx); 1154 } 1155 1156 #ifdef INET6 1157 for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++) { 1158 struct flow6_entry *fle6; 1159 1160 /* 1161 * Skip entries, that are already being worked on. 1162 */ 1163 if (mtx_trylock(&hsh->mtx) == 0) 1164 continue; 1165 1166 used = atomic_load_acq_32(&priv->info.nfinfo_used6); 1167 TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) { 1168 fle6 = (struct flow6_entry *)fle; 1169 /* 1170 * Interrupt thread wants this entry! 1171 * Quick! Quick! Bail out! 1172 */ 1173 if (hsh->mtx.mtx_lock & MTX_CONTESTED) 1174 break; 1175 1176 /* 1177 * Don't expire aggressively while hash collision 1178 * ratio is predicted small. 1179 */ 1180 if (used <= (NBUCKETS*2) && !INACTIVE(fle6)) 1181 break; 1182 1183 if ((INACTIVE(fle6) && (SMALL(fle6) || 1184 (used > (NBUCKETS*2)))) || AGED(fle6)) { 1185 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 1186 expire_flow(priv, priv_to_fib(priv, 1187 fle->f.r.fib), fle, NG_NOFLAGS); 1188 used--; 1189 atomic_add_32(&priv->info.nfinfo_inact_exp, 1); 1190 } 1191 } 1192 mtx_unlock(&hsh->mtx); 1193 } 1194 #endif 1195 1196 /* Schedule next expire. */ 1197 callout_reset(&priv->exp_callout, (1*hz), &ng_netflow_expire, 1198 (void *)priv); 1199 } 1200