/*-
 * Copyright (c) 2010-2011 Alexander V. Chernikov <melifaro@ipfw.ru>
 * Copyright (c) 2004-2005 Gleb Smirnoff <glebius@FreeBSD.org>
 * Copyright (c) 2001-2003 Roman V. Palagin <romanp@unshadow.net>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $SourceForge: netflow.c,v 1.41 2004/09/05 11:41:10 glebius Exp $
 */

static const char rcs_id[] =
    "@(#) $FreeBSD$";

#include "opt_inet6.h"
#include "opt_route.h"
#include <sys/param.h>
#include <sys/kernel.h>
#include <sys/limits.h>
#include <sys/mbuf.h>
#include <sys/syslog.h>
#include <sys/systm.h>
#include <sys/socket.h>
#include <sys/endian.h>

#include <machine/atomic.h>
#include <machine/stdarg.h>

#include <net/if.h>
#include <net/if_var.h>
#include <net/route.h>
#include <net/ethernet.h>
#include <netinet/in.h>
#include <netinet/in_systm.h>
#include <netinet/ip.h>
#include <netinet/ip6.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>

#include <netgraph/ng_message.h>
#include <netgraph/netgraph.h>

#include <netgraph/netflow/netflow.h>
#include <netgraph/netflow/netflow_v9.h>
#include <netgraph/netflow/ng_netflow.h>

#define	NBUCKETS	(65536)		/* must be power of 2 */

/* This hash is for TCP or UDP packets. */
#define FULL_HASH(addr1, addr2, port1, port2)	\
	(((addr1 ^ (addr1 >> 16) ^		\
	htons(addr2 ^ (addr2 >> 16))) ^		\
	port1 ^ htons(port2)) &			\
	(NBUCKETS - 1))

/* This hash is for all other IP packets. */
#define ADDR_HASH(addr1, addr2)			\
	((addr1 ^ (addr1 >> 16) ^		\
	htons(addr2 ^ (addr2 >> 16))) &		\
	(NBUCKETS - 1))

/* Macros to shorten logical constructions */
/* XXX: priv must exist in namespace */
#define	INACTIVE(fle)	(time_uptime - fle->f.last > priv->info.nfinfo_inact_t)
#define	AGED(fle)	(time_uptime - fle->f.first > priv->info.nfinfo_act_t)
#define	ISFREE(fle)	(fle->f.packets == 0)

/*
 * 4 is a magical number: statistically, the number of 4-packet flows is
 * bigger than that of 5-, 6-, 7-...-packet flows by an order of magnitude.
 * Most UDP/ICMP scans are 1 packet (~90% of the flow cache). TCP scans are
 * 2-packet in case of a reachable host and 4-packet otherwise.
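 *
 * SMALL() is used together with INACTIVE() in ng_netflow_expire() below:
 * idle small flows are always candidates for expiry, larger idle flows
 * are reclaimed only once the cache holds more than 2 * NBUCKETS entries
 * (more than two flows per bucket on average), and AGED() flows are
 * expired unconditionally.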
 */
#define	SMALL(fle)	(fle->f.packets <= 4)

MALLOC_DEFINE(M_NETFLOW_HASH, "netflow_hash", "NetFlow hash");

static int export_add(item_p, struct flow_entry *);
static int export_send(priv_p, fib_export_p, item_p, int);

static int hash_insert(priv_p, struct flow_hash_entry *, struct flow_rec *,
    int, uint8_t, uint8_t);
#ifdef INET6
static int hash6_insert(priv_p, struct flow_hash_entry *, struct flow6_rec *,
    int, uint8_t, uint8_t);
#endif

static void expire_flow(priv_p, fib_export_p, struct flow_entry *, int);

/*
 * Generate hash for a given flow record.
 *
 * FIB is not used here, because:
 * most VRFs will carry public IPv4 addresses, which are unique even
 * without a FIB; private addresses can overlap, but this is worked out
 * via the flow_rec bcmp(), which includes the fib id. In the IPv6 world
 * addresses are all globally unique (not entirely true, there is FC00::/7
 * for example, but the chances of address overlap are MUCH smaller).
 */
static inline uint32_t
ip_hash(struct flow_rec *r)
{

	switch (r->r_ip_p) {
	case IPPROTO_TCP:
	case IPPROTO_UDP:
		return FULL_HASH(r->r_src.s_addr, r->r_dst.s_addr,
		    r->r_sport, r->r_dport);
	default:
		return ADDR_HASH(r->r_src.s_addr, r->r_dst.s_addr);
	}
}

#ifdef INET6
/* Generate hash for a given flow6 record. Use lower 4 octets from v6 addresses */
static inline uint32_t
ip6_hash(struct flow6_rec *r)
{

	switch (r->r_ip_p) {
	case IPPROTO_TCP:
	case IPPROTO_UDP:
		return FULL_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3],
		    r->dst.r_dst6.__u6_addr.__u6_addr32[3], r->r_sport,
		    r->r_dport);
	default:
		return ADDR_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3],
		    r->dst.r_dst6.__u6_addr.__u6_addr32[3]);
	}
}
#endif

/* This is a callback from uma(9), called on alloc. */
static int
uma_ctor_flow(void *mem, int size, void *arg, int how)
{
	priv_p priv = (priv_p )arg;

	if (atomic_load_acq_32(&priv->info.nfinfo_used) >= CACHESIZE)
		return (ENOMEM);

	atomic_add_32(&priv->info.nfinfo_used, 1);

	return (0);
}

/* This is a callback from uma(9), called on free. */
static void
uma_dtor_flow(void *mem, int size, void *arg)
{
	priv_p priv = (priv_p )arg;

	atomic_subtract_32(&priv->info.nfinfo_used, 1);
}

#ifdef INET6
/* This is a callback from uma(9), called on alloc. */
static int
uma_ctor_flow6(void *mem, int size, void *arg, int how)
{
	priv_p priv = (priv_p )arg;

	if (atomic_load_acq_32(&priv->info.nfinfo_used6) >= CACHESIZE)
		return (ENOMEM);

	atomic_add_32(&priv->info.nfinfo_used6, 1);

	return (0);
}

/* This is a callback from uma(9), called on free. */
static void
uma_dtor_flow6(void *mem, int size, void *arg)
{
	priv_p priv = (priv_p )arg;

	atomic_subtract_32(&priv->info.nfinfo_used6, 1);
}
#endif

/*
 * Detach the export datagram from priv, if there is any.
 * If there is none, allocate a new one.
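 *
 * A typical export cycle (error handling omitted), as performed by
 * expire_flow() below:
 *
 *	item = get_export_dgram(priv, fe);
 *	if (export_add(item, fle) > 0)
 *		export_send(priv, fe, item, flags);
 *	else
 *		return_export_dgram(priv, fe, item, NG_QUEUE);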
 */
static item_p
get_export_dgram(priv_p priv, fib_export_p fe)
{
	item_p item = NULL;

	mtx_lock(&fe->export_mtx);
	if (fe->exp.item != NULL) {
		item = fe->exp.item;
		fe->exp.item = NULL;
	}
	mtx_unlock(&fe->export_mtx);

	if (item == NULL) {
		struct netflow_v5_export_dgram *dgram;
		struct mbuf *m;

		m = m_getcl(M_NOWAIT, MT_DATA, M_PKTHDR);
		if (m == NULL)
			return (NULL);
		item = ng_package_data(m, NG_NOFLAGS);
		if (item == NULL)
			return (NULL);
		dgram = mtod(m, struct netflow_v5_export_dgram *);
		dgram->header.count = 0;
		dgram->header.version = htons(NETFLOW_V5);
		dgram->header.pad = 0;
	}

	return (item);
}

/*
 * Re-attach an incomplete datagram back to priv.
 * If there is already another one there, send the incomplete one instead.
 */
static void
return_export_dgram(priv_p priv, fib_export_p fe, item_p item, int flags)
{

	/*
	 * It may happen on SMP that some thread has already
	 * put its item there; in this case we bail out and
	 * send what we have to the collector.
	 */
	mtx_lock(&fe->export_mtx);
	if (fe->exp.item == NULL) {
		fe->exp.item = item;
		mtx_unlock(&fe->export_mtx);
	} else {
		mtx_unlock(&fe->export_mtx);
		export_send(priv, fe, item, flags);
	}
}

/*
 * The flow is over. Call export_add() and free the entry. If the datagram
 * is full, then call export_send().
 */
static void
expire_flow(priv_p priv, fib_export_p fe, struct flow_entry *fle, int flags)
{
	struct netflow_export_item exp;
	uint16_t version = fle->f.version;

	if ((priv->export != NULL) && (version == IPVERSION)) {
		exp.item = get_export_dgram(priv, fe);
		if (exp.item == NULL) {
			atomic_add_32(&priv->info.nfinfo_export_failed, 1);
			if (priv->export9 != NULL)
				atomic_add_32(&priv->info.nfinfo_export9_failed, 1);
			/* fle definitely contains an IPv4 flow. */
			uma_zfree_arg(priv->zone, fle, priv);
			return;
		}

		if (export_add(exp.item, fle) > 0)
			export_send(priv, fe, exp.item, flags);
		else
			return_export_dgram(priv, fe, exp.item, NG_QUEUE);
	}

	if (priv->export9 != NULL) {
		exp.item9 = get_export9_dgram(priv, fe, &exp.item9_opt);
		if (exp.item9 == NULL) {
			atomic_add_32(&priv->info.nfinfo_export9_failed, 1);
			if (version == IPVERSION)
				uma_zfree_arg(priv->zone, fle, priv);
#ifdef INET6
			else if (version == IP6VERSION)
				uma_zfree_arg(priv->zone6, fle, priv);
#endif
			else
				panic("ng_netflow: Unknown IP proto: %d",
				    version);
			return;
		}

		if (export9_add(exp.item9, exp.item9_opt, fle) > 0)
			export9_send(priv, fe, exp.item9, exp.item9_opt, flags);
		else
			return_export9_dgram(priv, fe, exp.item9,
			    exp.item9_opt, NG_QUEUE);
	}

	if (version == IPVERSION)
		uma_zfree_arg(priv->zone, fle, priv);
#ifdef INET6
	else if (version == IP6VERSION)
		uma_zfree_arg(priv->zone6, fle, priv);
#endif
}

/* Get a snapshot of node statistics */
void
ng_netflow_copyinfo(priv_p priv, struct ng_netflow_info *i)
{

	/* XXX: atomic */
	memcpy((void *)i, (void *)&priv->info, sizeof(priv->info));
}

/*
 * Insert a record into the defined slot.
 *
 * First we get a free flow entry, then fill in all
 * possible fields in it.
 *
 * TODO: consider dropping the hash mutex while filling in the datagram,
 * as was done in the previous version.
 * Need to test & profile to be sure.
 */
static int
hash_insert(priv_p priv, struct flow_hash_entry *hsh, struct flow_rec *r,
    int plen, uint8_t flags, uint8_t tcp_flags)
{
	struct flow_entry *fle;
	struct sockaddr_in sin;
	struct rtentry *rt;

	mtx_assert(&hsh->mtx, MA_OWNED);

	fle = uma_zalloc_arg(priv->zone, priv, M_NOWAIT);
	if (fle == NULL) {
		atomic_add_32(&priv->info.nfinfo_alloc_failed, 1);
		return (ENOMEM);
	}

	/*
	 * Now fle is totally ours. It is detached from all lists,
	 * so we can safely edit it.
	 */
	fle->f.version = IPVERSION;
	bcopy(r, &fle->f.r, sizeof(struct flow_rec));
	fle->f.bytes = plen;
	fle->f.packets = 1;
	fle->f.tcp_flags = tcp_flags;

	fle->f.first = fle->f.last = time_uptime;

	/*
	 * First we do a route table lookup on the destination address, so we
	 * can fill in out_ifx, dst_mask, nexthop, and dst_as in future
	 * releases.
	 */
	if ((flags & NG_NETFLOW_CONF_NODSTLOOKUP) == 0) {
		bzero(&sin, sizeof(sin));
		sin.sin_len = sizeof(struct sockaddr_in);
		sin.sin_family = AF_INET;
		sin.sin_addr = fle->f.r.r_dst;
		rt = rtalloc1_fib((struct sockaddr *)&sin, 0, 0, r->fib);
		if (rt != NULL) {
			fle->f.fle_o_ifx = rt->rt_ifp->if_index;

			if (rt->rt_flags & RTF_GATEWAY &&
			    rt->rt_gateway->sa_family == AF_INET)
				fle->f.next_hop =
				    ((struct sockaddr_in *)(rt->rt_gateway))->sin_addr;

			if (rt_mask(rt))
				fle->f.dst_mask =
				    bitcount32(((struct sockaddr_in *)rt_mask(rt))->sin_addr.s_addr);
			else if (rt->rt_flags & RTF_HOST)
				/* Give up. We can't determine the mask :( */
				fle->f.dst_mask = 32;

			RTFREE_LOCKED(rt);
		}
	}

	/* Do a route lookup on the source address, to fill in src_mask. */
	if ((flags & NG_NETFLOW_CONF_NOSRCLOOKUP) == 0) {
		bzero(&sin, sizeof(sin));
		sin.sin_len = sizeof(struct sockaddr_in);
		sin.sin_family = AF_INET;
		sin.sin_addr = fle->f.r.r_src;
		rt = rtalloc1_fib((struct sockaddr *)&sin, 0, 0, r->fib);
		if (rt != NULL) {
			if (rt_mask(rt))
				fle->f.src_mask =
				    bitcount32(((struct sockaddr_in *)rt_mask(rt))->sin_addr.s_addr);
			else if (rt->rt_flags & RTF_HOST)
				/* Give up. We can't determine the mask :( */
				fle->f.src_mask = 32;

			RTFREE_LOCKED(rt);
		}
	}

	/* Push the new flow at the end of the hash. */
	TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);

	return (0);
}
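
/*
 * Worked example for the mask computation above: a destination covered
 * by a /24 route carries the netmask 255.255.255.0 in rt_mask(rt), so
 *
 *	bitcount32(((struct sockaddr_in *)rt_mask(rt))->sin_addr.s_addr)
 *
 * counts 24 set bits and dst_mask/src_mask become 24. Only set bits are
 * counted, so the byte order of s_addr does not matter here. The
 * ipv6_masklen() macro below applies the same idea to the four 32-bit
 * words of an IPv6 netmask.
 */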
#ifdef INET6
/* XXX: make normal function, instead of.. */
#define ipv6_masklen(x)		bitcount32((x).__u6_addr.__u6_addr32[0]) + \
				bitcount32((x).__u6_addr.__u6_addr32[1]) + \
				bitcount32((x).__u6_addr.__u6_addr32[2]) + \
				bitcount32((x).__u6_addr.__u6_addr32[3])
#define RT_MASK6(x)	(ipv6_masklen(((struct sockaddr_in6 *)rt_mask(x))->sin6_addr))
static int
hash6_insert(priv_p priv, struct flow_hash_entry *hsh6, struct flow6_rec *r,
    int plen, uint8_t flags, uint8_t tcp_flags)
{
	struct flow6_entry *fle6;
	struct sockaddr_in6 *src, *dst;
	struct rtentry *rt;
	struct route_in6 rin6;

	mtx_assert(&hsh6->mtx, MA_OWNED);

	fle6 = uma_zalloc_arg(priv->zone6, priv, M_NOWAIT);
	if (fle6 == NULL) {
		atomic_add_32(&priv->info.nfinfo_alloc_failed, 1);
		return (ENOMEM);
	}

	/*
	 * Now fle6 is totally ours. It is detached from all lists,
	 * so we can safely edit it.
	 */

	fle6->f.version = IP6VERSION;
	bcopy(r, &fle6->f.r, sizeof(struct flow6_rec));
	fle6->f.bytes = plen;
	fle6->f.packets = 1;
	fle6->f.tcp_flags = tcp_flags;

	fle6->f.first = fle6->f.last = time_uptime;

	/*
	 * First we do a route table lookup on the destination address, so we
	 * can fill in out_ifx, dst_mask, nexthop, and dst_as in future
	 * releases.
	 */
	if ((flags & NG_NETFLOW_CONF_NODSTLOOKUP) == 0) {
		bzero(&rin6, sizeof(struct route_in6));
		dst = (struct sockaddr_in6 *)&rin6.ro_dst;
		dst->sin6_len = sizeof(struct sockaddr_in6);
		dst->sin6_family = AF_INET6;
		dst->sin6_addr = r->dst.r_dst6;

		rin6.ro_rt = rtalloc1_fib((struct sockaddr *)dst, 0, 0, r->fib);

		if (rin6.ro_rt != NULL) {
			rt = rin6.ro_rt;
			fle6->f.fle_o_ifx = rt->rt_ifp->if_index;

			if (rt->rt_flags & RTF_GATEWAY &&
			    rt->rt_gateway->sa_family == AF_INET6)
				fle6->f.n.next_hop6 =
				    ((struct sockaddr_in6 *)(rt->rt_gateway))->sin6_addr;

			if (rt_mask(rt))
				fle6->f.dst_mask = RT_MASK6(rt);
			else
				fle6->f.dst_mask = 128;

			RTFREE_LOCKED(rt);
		}
	}

	if ((flags & NG_NETFLOW_CONF_NOSRCLOOKUP) == 0) {
		/* Do a route lookup on the source address, to fill in src_mask. */
		bzero(&rin6, sizeof(struct route_in6));
		src = (struct sockaddr_in6 *)&rin6.ro_dst;
		src->sin6_len = sizeof(struct sockaddr_in6);
		src->sin6_family = AF_INET6;
		src->sin6_addr = r->src.r_src6;

		rin6.ro_rt = rtalloc1_fib((struct sockaddr *)src, 0, 0, r->fib);

		if (rin6.ro_rt != NULL) {
			rt = rin6.ro_rt;

			if (rt_mask(rt))
				fle6->f.src_mask = RT_MASK6(rt);
			else
				fle6->f.src_mask = 128;

			RTFREE_LOCKED(rt);
		}
	}

	/* Push the new flow at the end of the hash. */
	TAILQ_INSERT_TAIL(&hsh6->head, (struct flow_entry *)fle6, fle_hash);

	return (0);
}
#undef ipv6_masklen
#undef RT_MASK6
#endif


/*
 * Non-static functions called from ng_netflow.c
 */

/* Allocate memory and set up flow cache */
void
ng_netflow_cache_init(priv_p priv)
{
	struct flow_hash_entry *hsh;
	int i;

	/* Initialize cache UMA zone. */
	priv->zone = uma_zcreate("NetFlow IPv4 cache",
	    sizeof(struct flow_entry), uma_ctor_flow, uma_dtor_flow, NULL,
	    NULL, UMA_ALIGN_CACHE, 0);
	uma_zone_set_max(priv->zone, CACHESIZE);
#ifdef INET6
	priv->zone6 = uma_zcreate("NetFlow IPv6 cache",
	    sizeof(struct flow6_entry), uma_ctor_flow6, uma_dtor_flow6, NULL,
	    NULL, UMA_ALIGN_CACHE, 0);
	uma_zone_set_max(priv->zone6, CACHESIZE);
#endif

	/* Allocate hash. */
	priv->hash = malloc(NBUCKETS * sizeof(struct flow_hash_entry),
	    M_NETFLOW_HASH, M_WAITOK | M_ZERO);

	/* Initialize hash. */
	for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++) {
		mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF);
		TAILQ_INIT(&hsh->head);
	}

#ifdef INET6
	/* Allocate hash. */
	priv->hash6 = malloc(NBUCKETS * sizeof(struct flow_hash_entry),
	    M_NETFLOW_HASH, M_WAITOK | M_ZERO);

	/* Initialize hash. */
	for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++) {
		mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF);
		TAILQ_INIT(&hsh->head);
	}
#endif

	ng_netflow_v9_cache_init(priv);
	CTR0(KTR_NET, "ng_netflow startup()");
}

/* Initialize a new FIB table for v5 and v9 */
int
ng_netflow_fib_init(priv_p priv, int fib)
{
	fib_export_p fe = priv_to_fib(priv, fib);

	CTR1(KTR_NET, "ng_netflow(): fib init: %d", fib);

	if (fe != NULL)
		return (0);

	if ((fe = malloc(sizeof(struct fib_export), M_NETGRAPH,
	    M_NOWAIT | M_ZERO)) == NULL)
		return (ENOMEM);

	mtx_init(&fe->export_mtx, "export dgram lock", NULL, MTX_DEF);
	mtx_init(&fe->export9_mtx, "export9 dgram lock", NULL, MTX_DEF);
	fe->fib = fib;
	fe->domain_id = fib;

	if (atomic_cmpset_ptr((volatile uintptr_t *)&priv->fib_data[fib],
	    (uintptr_t)NULL, (uintptr_t)fe) == 0) {
		/* FIB already set up by another ISR. */
		CTR3(KTR_NET, "ng_netflow(): fib init: %d setup %p but got %p",
		    fib, fe, priv_to_fib(priv, fib));
		mtx_destroy(&fe->export_mtx);
		mtx_destroy(&fe->export9_mtx);
		free(fe, M_NETGRAPH);
	} else {
		/* Increase the counter for statistics. */
		CTR3(KTR_NET, "ng_netflow(): fib %d setup to %p (%p)",
		    fib, fe, priv_to_fib(priv, fib));
		atomic_fetchadd_32(&priv->info.nfinfo_alloc_fibs, 1);
	}

	return (0);
}

/* Free all flow cache memory. Called from the node close method. */
void
ng_netflow_cache_flush(priv_p priv)
{
	struct flow_entry *fle, *fle1;
	struct flow_hash_entry *hsh;
	struct netflow_export_item exp;
	fib_export_p fe;
	int i;

	bzero(&exp, sizeof(exp));

	/*
	 * We are going to free probably billable data.
	 * Expire everything before freeing it.
	 * No locking is required since the callout is already drained.
	 */
	for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++)
		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			fe = priv_to_fib(priv, fle->f.r.fib);
			expire_flow(priv, fe, fle, NG_QUEUE);
		}
#ifdef INET6
	for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++)
		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			fe = priv_to_fib(priv, fle->f.r.fib);
			expire_flow(priv, fe, fle, NG_QUEUE);
		}
#endif

	uma_zdestroy(priv->zone);
	/* Destroy hash mutexes. */
	for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++)
		mtx_destroy(&hsh->mtx);

	/* Free hash memory. */
	if (priv->hash != NULL)
		free(priv->hash, M_NETFLOW_HASH);
#ifdef INET6
	uma_zdestroy(priv->zone6);
	/* Destroy hash mutexes. */
	for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++)
		mtx_destroy(&hsh->mtx);

	/* Free hash memory. */
	if (priv->hash6 != NULL)
		free(priv->hash6, M_NETFLOW_HASH);
#endif

	for (i = 0; i < priv->maxfibs; i++) {
		if ((fe = priv_to_fib(priv, i)) == NULL)
			continue;

		if (fe->exp.item != NULL)
			export_send(priv, fe, fe->exp.item, NG_QUEUE);

		if (fe->exp.item9 != NULL)
			export9_send(priv, fe, fe->exp.item9,
			    fe->exp.item9_opt, NG_QUEUE);

		mtx_destroy(&fe->export_mtx);
		mtx_destroy(&fe->export9_mtx);
		free(fe, M_NETGRAPH);
	}

	ng_netflow_v9_cache_flush(priv);
}

/*
 * Insert an IPv4 packet into the flow cache.
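 *
 * The flow key is built from the IP header fields, hashed with ip_hash(),
 * and the matching bucket is searched in reverse; an existing entry is
 * updated in place (and possibly actively expired), otherwise a new one
 * is created with hash_insert().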
 */
int
ng_netflow_flow_add(priv_p priv, fib_export_p fe, struct ip *ip,
    caddr_t upper_ptr, uint8_t upper_proto, uint8_t flags,
    unsigned int src_if_index)
{
	struct flow_entry *fle, *fle1;
	struct flow_hash_entry *hsh;
	struct flow_rec r;
	int hlen, plen;
	int error = 0;
	uint16_t eproto;
	uint8_t tcp_flags = 0;

	bzero(&r, sizeof(r));

	if (ip->ip_v != IPVERSION)
		return (EINVAL);

	hlen = ip->ip_hl << 2;
	if (hlen < sizeof(struct ip))
		return (EINVAL);

	eproto = ETHERTYPE_IP;
	/* Assume the L4 template by default. */
	r.flow_type = NETFLOW_V9_FLOW_V4_L4;

	r.r_src = ip->ip_src;
	r.r_dst = ip->ip_dst;
	r.fib = fe->fib;

	plen = ntohs(ip->ip_len);

	r.r_ip_p = ip->ip_p;
	r.r_tos = ip->ip_tos;

	r.r_i_ifx = src_if_index;

	/*
	 * XXX NOTE: only the first fragment of a fragmented TCP, UDP or
	 * ICMP packet will be recorded with proper s_port and d_port.
	 * Subsequent fragments will be recorded simply as IP packets with
	 * ip_proto = ip->ip_p and s_port, d_port set to zero.
	 * I know it looks like a bug, but I don't want to re-implement
	 * IP packet reassembly here. Anyway, the (in)famous trafd works
	 * this way, and nobody has complained yet :)
	 */
	if ((ip->ip_off & htons(IP_OFFMASK)) == 0)
		switch(r.r_ip_p) {
		case IPPROTO_TCP:
		{
			struct tcphdr *tcp;

			tcp = (struct tcphdr *)((caddr_t )ip + hlen);
			r.r_sport = tcp->th_sport;
			r.r_dport = tcp->th_dport;
			tcp_flags = tcp->th_flags;
			break;
		}
		case IPPROTO_UDP:
			r.r_ports = *(uint32_t *)((caddr_t )ip + hlen);
			break;
		}

	atomic_fetchadd_32(&priv->info.nfinfo_packets, 1);
	/* XXX: atomic */
	priv->info.nfinfo_bytes += plen;

	/* Find the hash slot. */
	hsh = &priv->hash[ip_hash(&r)];

	mtx_lock(&hsh->mtx);

	/*
	 * Go through the hash and find our entry. If we encounter an
	 * entry that should be expired, purge it. We do a reverse
	 * search since most active entries are first, and most
	 * searches are done on most active entries.
	 */
	TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) {
		if (bcmp(&r, &fle->f.r, sizeof(struct flow_rec)) == 0)
			break;
		if ((INACTIVE(fle) && SMALL(fle)) || AGED(fle)) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib),
			    fle, NG_QUEUE);
			atomic_add_32(&priv->info.nfinfo_act_exp, 1);
		}
	}

	if (fle) {			/* An existing entry. */

		fle->f.bytes += plen;
		fle->f.packets++;
		fle->f.tcp_flags |= tcp_flags;
		fle->f.last = time_uptime;

		/*
		 * We have the following reasons to expire a flow actively:
		 * - it hit the active timeout;
		 * - a TCP connection closed;
		 * - its byte counter is about to overflow.
		 */
		if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle) ||
		    (fle->f.bytes >= (CNTR_MAX - IF_MAXMTU)) ) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib),
			    fle, NG_QUEUE);
			atomic_add_32(&priv->info.nfinfo_act_exp, 1);
		} else {
			/*
			 * It is the newest; move it to the tail
			 * if it isn't there already. The next search will
			 * locate it quicker.
			 */
			if (fle != TAILQ_LAST(&hsh->head, fhead)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
			}
		}
	} else				/* A new flow entry. */
		error = hash_insert(priv, hsh, &r, plen, flags, tcp_flags);

	mtx_unlock(&hsh->mtx);

	return (error);
}

#ifdef INET6
/* Insert an IPv6 packet into the flow cache. */
int
ng_netflow_flow6_add(priv_p priv, fib_export_p fe, struct ip6_hdr *ip6,
    caddr_t upper_ptr, uint8_t upper_proto, uint8_t flags,
    unsigned int src_if_index)
{
	struct flow_entry *fle = NULL, *fle1;
	struct flow6_entry *fle6;
	struct flow_hash_entry *hsh;
	struct flow6_rec r;
	int plen;
	int error = 0;
	uint8_t tcp_flags = 0;

	/* check version */
	if ((ip6->ip6_vfc & IPV6_VERSION_MASK) != IPV6_VERSION)
		return (EINVAL);

	bzero(&r, sizeof(r));

	r.src.r_src6 = ip6->ip6_src;
	r.dst.r_dst6 = ip6->ip6_dst;
	r.fib = fe->fib;

	/* Assume the L4 template by default. */
	r.flow_type = NETFLOW_V9_FLOW_V6_L4;

	plen = ntohs(ip6->ip6_plen) + sizeof(struct ip6_hdr);

#if 0
	/* XXX: set DSCP/CoS value */
	r.r_tos = ip->ip_tos;
#endif
	if ((flags & NG_NETFLOW_IS_FRAG) == 0) {
		switch(upper_proto) {
		case IPPROTO_TCP:
		{
			struct tcphdr *tcp;

			tcp = (struct tcphdr *)upper_ptr;
			r.r_ports = *(uint32_t *)upper_ptr;
			tcp_flags = tcp->th_flags;
			break;
		}
		case IPPROTO_UDP:
		case IPPROTO_SCTP:
			r.r_ports = *(uint32_t *)upper_ptr;
			break;
		}
	}

	r.r_ip_p = upper_proto;
	r.r_i_ifx = src_if_index;

	atomic_fetchadd_32(&priv->info.nfinfo_packets6, 1);
	/* XXX: atomic */
	priv->info.nfinfo_bytes6 += plen;

	/* Find the hash slot. */
	hsh = &priv->hash6[ip6_hash(&r)];

	mtx_lock(&hsh->mtx);

	/*
	 * Go through the hash and find our entry. If we encounter an
	 * entry that should be expired, purge it. We do a reverse
	 * search since most active entries are first, and most
	 * searches are done on most active entries.
	 */
	TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) {
		if (fle->f.version != IP6VERSION)
			continue;
		fle6 = (struct flow6_entry *)fle;
		if (bcmp(&r, &fle6->f.r, sizeof(struct flow6_rec)) == 0)
			break;
		if ((INACTIVE(fle6) && SMALL(fle6)) || AGED(fle6)) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle,
			    NG_QUEUE);
			atomic_add_32(&priv->info.nfinfo_act_exp, 1);
		}
	}

	if (fle != NULL) {		/* An existing entry. */
		fle6 = (struct flow6_entry *)fle;

		fle6->f.bytes += plen;
		fle6->f.packets++;
		fle6->f.tcp_flags |= tcp_flags;
		fle6->f.last = time_uptime;

		/*
		 * We have the following reasons to expire a flow actively:
		 * - it hit the active timeout;
		 * - a TCP connection closed;
		 * - its byte counter is about to overflow.
		 */
		if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle6) ||
		    (fle6->f.bytes >= (CNTR_MAX - IF_MAXMTU)) ) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle,
			    NG_QUEUE);
			atomic_add_32(&priv->info.nfinfo_act_exp, 1);
		} else {
			/*
			 * It is the newest; move it to the tail
			 * if it isn't there already. The next search will
			 * locate it quicker.
			 */
			if (fle != TAILQ_LAST(&hsh->head, fhead)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
			}
		}
	} else				/* A new flow entry. */
		error = hash6_insert(priv, hsh, &r, plen, flags, tcp_flags);

	mtx_unlock(&hsh->mtx);

	return (error);
}
#endif

/*
 * Return records from the cache to userland.
 *
 * TODO: matching of a particular IP should be done in the kernel, here.
 */
int
ng_netflow_flow_show(priv_p priv, struct ngnf_show_header *req,
    struct ngnf_show_header *resp)
{
	struct flow_hash_entry *hsh;
	struct flow_entry *fle;
	struct flow_entry_data *data = (struct flow_entry_data *)(resp + 1);
#ifdef INET6
	struct flow6_entry_data *data6 = (struct flow6_entry_data *)(resp + 1);
#endif
	int i, max;

	i = req->hash_id;
	if (i > NBUCKETS-1)
		return (EINVAL);

#ifdef INET6
	if (req->version == 6) {
		resp->version = 6;
		hsh = priv->hash6 + i;
		max = NREC6_AT_ONCE;
	} else
#endif
	if (req->version == 4) {
		resp->version = 4;
		hsh = priv->hash + i;
		max = NREC_AT_ONCE;
	} else
		return (EINVAL);

	/*
	 * We will transfer no more than NREC_AT_ONCE records; more data
	 * will come in the next message.
	 * We send the current hash index and the current record number in
	 * the list to userland, and userland should return them back to us.
	 * Then, we will restart with the new entry.
	 *
	 * The resulting cache snapshot can be inaccurate if flow expiration
	 * is taking place on the hash item between userland data requests
	 * for this hash item id.
	 */
	resp->nentries = 0;
	for (; i < NBUCKETS; hsh++, i++) {
		int list_id;

		if (mtx_trylock(&hsh->mtx) == 0) {
			/*
			 * The requested hash index is not available;
			 * relay the decision to skip it or to re-request
			 * the data to userland.
			 */
			resp->hash_id = i;
			resp->list_id = 0;
			return (0);
		}

		list_id = 0;
		TAILQ_FOREACH(fle, &hsh->head, fle_hash) {
			if (hsh->mtx.mtx_lock & MTX_CONTESTED) {
				resp->hash_id = i;
				resp->list_id = list_id;
				mtx_unlock(&hsh->mtx);
				return (0);
			}

			list_id++;
			/* Search for the particular record in the list. */
			if (req->list_id > 0) {
				if (list_id < req->list_id)
					continue;

				/* Requested list position found. */
				req->list_id = 0;
			}
#ifdef INET6
			if (req->version == 6) {
				struct flow6_entry *fle6;

				fle6 = (struct flow6_entry *)fle;
				bcopy(&fle6->f, data6 + resp->nentries,
				    sizeof(fle6->f));
			} else
#endif
				bcopy(&fle->f, data + resp->nentries,
				    sizeof(fle->f));
			resp->nentries++;
			if (resp->nentries == max) {
				resp->hash_id = i;
				/*
				 * If it was the last item in the list
				 * we simply skip to the next hash_id.
				 */
				resp->list_id = list_id + 1;
				mtx_unlock(&hsh->mtx);
				return (0);
			}
		}
		mtx_unlock(&hsh->mtx);
	}

	resp->hash_id = resp->list_id = 0;

	return (0);
}

/* We have a full datagram in privdata. Send it to the export hook. */
static int
export_send(priv_p priv, fib_export_p fe, item_p item, int flags)
{
	struct mbuf *m = NGI_M(item);
	struct netflow_v5_export_dgram *dgram = mtod(m,
	    struct netflow_v5_export_dgram *);
	struct netflow_v5_header *header = &dgram->header;
	struct timespec ts;
	int error = 0;

	/* Fill the mbuf header. */
	m->m_len = m->m_pkthdr.len = sizeof(struct netflow_v5_record) *
	    header->count + sizeof(struct netflow_v5_header);

	/*
	 * Fill the export header.
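	 *
	 * sys_uptime is the uptime in milliseconds, unix_secs/unix_nsecs
	 * the current wall clock time, and flow_seq the sequence number of
	 * the first record in this datagram.  count is kept in host byte
	 * order (it is used as an array index while the datagram is being
	 * filled) and is converted to network byte order last.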
	 */
	header->sys_uptime = htonl(MILLIUPTIME(time_uptime));
	getnanotime(&ts);
	header->unix_secs = htonl(ts.tv_sec);
	header->unix_nsecs = htonl(ts.tv_nsec);
	header->engine_type = 0;
	header->engine_id = fe->domain_id;
	header->pad = 0;
	header->flow_seq = htonl(atomic_fetchadd_32(&fe->flow_seq,
	    header->count));
	header->count = htons(header->count);

	if (priv->export != NULL)
		NG_FWD_ITEM_HOOK_FLAGS(error, item, priv->export, flags);
	else
		NG_FREE_ITEM(item);

	return (error);
}


/* Add an export record to the dgram. */
static int
export_add(item_p item, struct flow_entry *fle)
{
	struct netflow_v5_export_dgram *dgram = mtod(NGI_M(item),
	    struct netflow_v5_export_dgram *);
	struct netflow_v5_header *header = &dgram->header;
	struct netflow_v5_record *rec;

	rec = &dgram->r[header->count];
	header->count++;

	KASSERT(header->count <= NETFLOW_V5_MAX_RECORDS,
	    ("ng_netflow: export too big"));

	/* Fill in the export record. */
	rec->src_addr = fle->f.r.r_src.s_addr;
	rec->dst_addr = fle->f.r.r_dst.s_addr;
	rec->next_hop = fle->f.next_hop.s_addr;
	rec->i_ifx = htons(fle->f.fle_i_ifx);
	rec->o_ifx = htons(fle->f.fle_o_ifx);
	rec->packets = htonl(fle->f.packets);
	rec->octets = htonl(fle->f.bytes);
	rec->first = htonl(MILLIUPTIME(fle->f.first));
	rec->last = htonl(MILLIUPTIME(fle->f.last));
	rec->s_port = fle->f.r.r_sport;
	rec->d_port = fle->f.r.r_dport;
	rec->flags = fle->f.tcp_flags;
	rec->prot = fle->f.r.r_ip_p;
	rec->tos = fle->f.r.r_tos;
	rec->dst_mask = fle->f.dst_mask;
	rec->src_mask = fle->f.src_mask;
	rec->pad1 = 0;
	rec->pad2 = 0;

	/* Unsupported fields. */
	rec->src_as = rec->dst_as = 0;

	if (header->count == NETFLOW_V5_MAX_RECORDS)
		return (1); /* end of datagram */
	else
		return (0);
}

/* Periodic flow expiry run. */
void
ng_netflow_expire(void *arg)
{
	struct flow_entry *fle, *fle1;
	struct flow_hash_entry *hsh;
	priv_p priv = (priv_p )arg;
	uint32_t used;
	int i;

	/*
	 * Go through the whole cache.
	 */
	for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++) {
		/*
		 * Skip entries that are already being worked on.
		 */
		if (mtx_trylock(&hsh->mtx) == 0)
			continue;

		used = atomic_load_acq_32(&priv->info.nfinfo_used);
		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			/*
			 * The interrupt thread wants this entry!
			 * Quick! Quick! Bail out!
			 */
			if (hsh->mtx.mtx_lock & MTX_CONTESTED)
				break;

			/*
			 * Don't expire aggressively while the hash collision
			 * ratio is predicted to be small.
			 */
			if (used <= (NBUCKETS*2) && !INACTIVE(fle))
				break;

			if ((INACTIVE(fle) && (SMALL(fle) ||
			    (used > (NBUCKETS*2)))) || AGED(fle)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				expire_flow(priv, priv_to_fib(priv,
				    fle->f.r.fib), fle, NG_NOFLAGS);
				used--;
				atomic_add_32(&priv->info.nfinfo_inact_exp, 1);
			}
		}
		mtx_unlock(&hsh->mtx);
	}

#ifdef INET6
	for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++) {
		struct flow6_entry *fle6;

		/*
		 * Skip entries that are already being worked on.
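		 * (mtx_trylock() returns 0 when the bucket mutex is already
		 * held, so the expiry callout does not contend with the
		 * packet-processing path for busy buckets.)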
		 */
		if (mtx_trylock(&hsh->mtx) == 0)
			continue;

		used = atomic_load_acq_32(&priv->info.nfinfo_used6);
		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			fle6 = (struct flow6_entry *)fle;
			/*
			 * The interrupt thread wants this entry!
			 * Quick! Quick! Bail out!
			 */
			if (hsh->mtx.mtx_lock & MTX_CONTESTED)
				break;

			/*
			 * Don't expire aggressively while the hash collision
			 * ratio is predicted to be small.
			 */
			if (used <= (NBUCKETS*2) && !INACTIVE(fle6))
				break;

			if ((INACTIVE(fle6) && (SMALL(fle6) ||
			    (used > (NBUCKETS*2)))) || AGED(fle6)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				expire_flow(priv, priv_to_fib(priv,
				    fle->f.r.fib), fle, NG_NOFLAGS);
				used--;
				atomic_add_32(&priv->info.nfinfo_inact_exp, 1);
			}
		}
		mtx_unlock(&hsh->mtx);
	}
#endif

	/* Schedule the next expiry run. */
	callout_reset(&priv->exp_callout, (1*hz), &ng_netflow_expire,
	    (void *)priv);
}