1 /*- 2 * Copyright (c) 2010-2011 Alexander V. Chernikov <melifaro@ipfw.ru> 3 * Copyright (c) 2004-2005 Gleb Smirnoff <glebius@FreeBSD.org> 4 * Copyright (c) 2001-2003 Roman V. Palagin <romanp@unshadow.net> 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 * 28 * $SourceForge: netflow.c,v 1.41 2004/09/05 11:41:10 glebius Exp $ 29 */ 30 31 static const char rcs_id[] = 32 "@(#) $FreeBSD$"; 33 34 #include "opt_inet6.h" 35 #include "opt_route.h" 36 #include <sys/param.h> 37 #include <sys/kernel.h> 38 #include <sys/limits.h> 39 #include <sys/mbuf.h> 40 #include <sys/syslog.h> 41 #include <sys/systm.h> 42 #include <sys/socket.h> 43 #include <sys/endian.h> 44 45 #include <machine/atomic.h> 46 #include <machine/stdarg.h> 47 48 #include <net/if.h> 49 #include <net/route.h> 50 #include <net/ethernet.h> 51 #include <netinet/in.h> 52 #include <netinet/in_systm.h> 53 #include <netinet/ip.h> 54 #include <netinet/ip6.h> 55 #include <netinet/tcp.h> 56 #include <netinet/udp.h> 57 58 #include <netgraph/ng_message.h> 59 #include <netgraph/netgraph.h> 60 61 #include <netgraph/netflow/netflow.h> 62 #include <netgraph/netflow/netflow_v9.h> 63 #include <netgraph/netflow/ng_netflow.h> 64 65 #define NBUCKETS (65536) /* must be power of 2 */ 66 67 /* This hash is for TCP or UDP packets. */ 68 #define FULL_HASH(addr1, addr2, port1, port2) \ 69 (((addr1 ^ (addr1 >> 16) ^ \ 70 htons(addr2 ^ (addr2 >> 16))) ^ \ 71 port1 ^ htons(port2)) & \ 72 (NBUCKETS - 1)) 73 74 /* This hash is for all other IP packets. */ 75 #define ADDR_HASH(addr1, addr2) \ 76 ((addr1 ^ (addr1 >> 16) ^ \ 77 htons(addr2 ^ (addr2 >> 16))) & \ 78 (NBUCKETS - 1)) 79 80 /* Macros to shorten logical constructions */ 81 /* XXX: priv must exist in namespace */ 82 #define INACTIVE(fle) (time_uptime - fle->f.last > priv->info.nfinfo_inact_t) 83 #define AGED(fle) (time_uptime - fle->f.first > priv->info.nfinfo_act_t) 84 #define ISFREE(fle) (fle->f.packets == 0) 85 86 /* 87 * 4 is a magical number: statistically number of 4-packet flows is 88 * bigger than 5,6,7...-packet flows by an order of magnitude. Most UDP/ICMP 89 * scans are 1 packet (~ 90% of flow cache). TCP scans are 2-packet in case 90 * of reachable host and 4-packet otherwise. 91 */ 92 #define SMALL(fle) (fle->f.packets <= 4) 93 94 95 MALLOC_DECLARE(M_NETFLOW_HASH); 96 MALLOC_DEFINE(M_NETFLOW_HASH, "netflow_hash", "NetFlow hash"); 97 98 static int export_add(item_p, struct flow_entry *); 99 static int export_send(priv_p, fib_export_p, item_p, int); 100 101 static int hash_insert(priv_p, struct flow_hash_entry *, struct flow_rec *, int, uint8_t); 102 #ifdef INET6 103 static int hash6_insert(priv_p, struct flow_hash_entry *, struct flow6_rec *, int, uint8_t); 104 #endif 105 106 static __inline void expire_flow(priv_p, fib_export_p, struct flow_entry *, int); 107 108 /* 109 * Generate hash for a given flow record. 110 * 111 * FIB is not used here, because: 112 * most VRFS will carry public IPv4 addresses which are unique even 113 * without FIB private addresses can overlap, but this is worked out 114 * via flow_rec bcmp() containing fib id. In IPv6 world addresses are 115 * all globally unique (it's not fully true, there is FC00::/7 for example, 116 * but chances of address overlap are MUCH smaller) 117 */ 118 static __inline uint32_t 119 ip_hash(struct flow_rec *r) 120 { 121 switch (r->r_ip_p) { 122 case IPPROTO_TCP: 123 case IPPROTO_UDP: 124 return FULL_HASH(r->r_src.s_addr, r->r_dst.s_addr, 125 r->r_sport, r->r_dport); 126 default: 127 return ADDR_HASH(r->r_src.s_addr, r->r_dst.s_addr); 128 } 129 } 130 131 #ifdef INET6 132 /* Generate hash for a given flow6 record. Use lower 4 octets from v6 addresses */ 133 static __inline uint32_t 134 ip6_hash(struct flow6_rec *r) 135 { 136 switch (r->r_ip_p) { 137 case IPPROTO_TCP: 138 case IPPROTO_UDP: 139 return FULL_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3], 140 r->dst.r_dst6.__u6_addr.__u6_addr32[3], r->r_sport, 141 r->r_dport); 142 default: 143 return ADDR_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3], 144 r->dst.r_dst6.__u6_addr.__u6_addr32[3]); 145 } 146 } 147 #endif 148 149 /* This is callback from uma(9), called on alloc. */ 150 static int 151 uma_ctor_flow(void *mem, int size, void *arg, int how) 152 { 153 priv_p priv = (priv_p )arg; 154 155 if (atomic_load_acq_32(&priv->info.nfinfo_used) >= CACHESIZE) 156 return (ENOMEM); 157 158 atomic_add_32(&priv->info.nfinfo_used, 1); 159 160 return (0); 161 } 162 163 /* This is callback from uma(9), called on free. */ 164 static void 165 uma_dtor_flow(void *mem, int size, void *arg) 166 { 167 priv_p priv = (priv_p )arg; 168 169 atomic_subtract_32(&priv->info.nfinfo_used, 1); 170 } 171 172 #ifdef INET6 173 /* This is callback from uma(9), called on alloc. */ 174 static int 175 uma_ctor_flow6(void *mem, int size, void *arg, int how) 176 { 177 priv_p priv = (priv_p )arg; 178 179 if (atomic_load_acq_32(&priv->info.nfinfo_used6) >= CACHESIZE) 180 return (ENOMEM); 181 182 atomic_add_32(&priv->info.nfinfo_used6, 1); 183 184 return (0); 185 } 186 187 /* This is callback from uma(9), called on free. */ 188 static void 189 uma_dtor_flow6(void *mem, int size, void *arg) 190 { 191 priv_p priv = (priv_p )arg; 192 193 atomic_subtract_32(&priv->info.nfinfo_used6, 1); 194 } 195 #endif 196 197 /* 198 * Detach export datagram from priv, if there is any. 199 * If there is no, allocate a new one. 200 */ 201 static item_p 202 get_export_dgram(priv_p priv, fib_export_p fe) 203 { 204 item_p item = NULL; 205 206 mtx_lock(&fe->export_mtx); 207 if (fe->exp.item != NULL) { 208 item = fe->exp.item; 209 fe->exp.item = NULL; 210 } 211 mtx_unlock(&fe->export_mtx); 212 213 if (item == NULL) { 214 struct netflow_v5_export_dgram *dgram; 215 struct mbuf *m; 216 217 m = m_getcl(M_DONTWAIT, MT_DATA, M_PKTHDR); 218 if (m == NULL) 219 return (NULL); 220 item = ng_package_data(m, NG_NOFLAGS); 221 if (item == NULL) 222 return (NULL); 223 dgram = mtod(m, struct netflow_v5_export_dgram *); 224 dgram->header.count = 0; 225 dgram->header.version = htons(NETFLOW_V5); 226 dgram->header.pad = 0; 227 228 } 229 230 return (item); 231 } 232 233 /* 234 * Re-attach incomplete datagram back to priv. 235 * If there is already another one, then send incomplete. */ 236 static void 237 return_export_dgram(priv_p priv, fib_export_p fe, item_p item, int flags) 238 { 239 /* 240 * It may happen on SMP, that some thread has already 241 * put its item there, in this case we bail out and 242 * send what we have to collector. 243 */ 244 mtx_lock(&fe->export_mtx); 245 if (fe->exp.item == NULL) { 246 fe->exp.item = item; 247 mtx_unlock(&fe->export_mtx); 248 } else { 249 mtx_unlock(&fe->export_mtx); 250 export_send(priv, fe, item, flags); 251 } 252 } 253 254 /* 255 * The flow is over. Call export_add() and free it. If datagram is 256 * full, then call export_send(). 257 */ 258 static __inline void 259 expire_flow(priv_p priv, fib_export_p fe, struct flow_entry *fle, int flags) 260 { 261 struct netflow_export_item exp; 262 uint16_t version = fle->f.version; 263 264 if ((priv->export != NULL) && (version == IPVERSION)) { 265 exp.item = get_export_dgram(priv, fe); 266 if (exp.item == NULL) { 267 atomic_add_32(&priv->info.nfinfo_export_failed, 1); 268 if (priv->export9 != NULL) 269 atomic_add_32(&priv->info.nfinfo_export9_failed, 1); 270 /* fle definitely contains IPv4 flow */ 271 uma_zfree_arg(priv->zone, fle, priv); 272 return; 273 } 274 275 if (export_add(exp.item, fle) > 0) 276 export_send(priv, fe, exp.item, flags); 277 else 278 return_export_dgram(priv, fe, exp.item, NG_QUEUE); 279 } 280 281 if (priv->export9 != NULL) { 282 exp.item9 = get_export9_dgram(priv, fe, &exp.item9_opt); 283 if (exp.item9 == NULL) { 284 atomic_add_32(&priv->info.nfinfo_export9_failed, 1); 285 if (version == IPVERSION) 286 uma_zfree_arg(priv->zone, fle, priv); 287 #ifdef INET6 288 else if (version == IP6VERSION) 289 uma_zfree_arg(priv->zone6, fle, priv); 290 #endif 291 else 292 panic("ng_netflow: Unknown IP proto: %d", version); 293 return; 294 } 295 296 if (export9_add(exp.item9, exp.item9_opt, fle) > 0) 297 export9_send(priv, fe, exp.item9, exp.item9_opt, flags); 298 else 299 return_export9_dgram(priv, fe, exp.item9, exp.item9_opt, NG_QUEUE); 300 } 301 302 if (version == IPVERSION) 303 uma_zfree_arg(priv->zone, fle, priv); 304 #ifdef INET6 305 else if (version == IP6VERSION) 306 uma_zfree_arg(priv->zone6, fle, priv); 307 #endif 308 } 309 310 /* Get a snapshot of node statistics */ 311 void 312 ng_netflow_copyinfo(priv_p priv, struct ng_netflow_info *i) 313 { 314 /* XXX: atomic */ 315 memcpy((void *)i, (void *)&priv->info, sizeof(priv->info)); 316 } 317 318 /* 319 * Insert a record into defined slot. 320 * 321 * First we get for us a free flow entry, then fill in all 322 * possible fields in it. 323 * 324 * TODO: consider dropping hash mutex while filling in datagram, 325 * as this was done in previous version. Need to test & profile 326 * to be sure. 327 */ 328 static __inline int 329 hash_insert(priv_p priv, struct flow_hash_entry *hsh, struct flow_rec *r, 330 int plen, uint8_t tcp_flags) 331 { 332 struct flow_entry *fle; 333 struct sockaddr_in sin; 334 struct rtentry *rt; 335 336 mtx_assert(&hsh->mtx, MA_OWNED); 337 338 fle = uma_zalloc_arg(priv->zone, priv, M_NOWAIT); 339 if (fle == NULL) { 340 atomic_add_32(&priv->info.nfinfo_alloc_failed, 1); 341 return (ENOMEM); 342 } 343 344 /* 345 * Now fle is totally ours. It is detached from all lists, 346 * we can safely edit it. 347 */ 348 349 fle->f.version = IPVERSION; 350 bcopy(r, &fle->f.r, sizeof(struct flow_rec)); 351 fle->f.bytes = plen; 352 fle->f.packets = 1; 353 fle->f.tcp_flags = tcp_flags; 354 355 fle->f.first = fle->f.last = time_uptime; 356 357 /* 358 * First we do route table lookup on destination address. So we can 359 * fill in out_ifx, dst_mask, nexthop, and dst_as in future releases. 360 */ 361 bzero(&sin, sizeof(sin)); 362 sin.sin_len = sizeof(struct sockaddr_in); 363 sin.sin_family = AF_INET; 364 sin.sin_addr = fle->f.r.r_dst; 365 rt = rtalloc1_fib((struct sockaddr *)&sin, 0, 0, r->fib); 366 if (rt != NULL) { 367 fle->f.fle_o_ifx = rt->rt_ifp->if_index; 368 369 if (rt->rt_flags & RTF_GATEWAY && 370 rt->rt_gateway->sa_family == AF_INET) 371 fle->f.next_hop = 372 ((struct sockaddr_in *)(rt->rt_gateway))->sin_addr; 373 374 if (rt_mask(rt)) 375 fle->f.dst_mask = bitcount32(((struct sockaddr_in *) 376 rt_mask(rt))->sin_addr.s_addr); 377 else if (rt->rt_flags & RTF_HOST) 378 /* Give up. We can't determine mask :( */ 379 fle->f.dst_mask = 32; 380 381 RTFREE_LOCKED(rt); 382 } 383 384 /* Do route lookup on source address, to fill in src_mask. */ 385 bzero(&sin, sizeof(sin)); 386 sin.sin_len = sizeof(struct sockaddr_in); 387 sin.sin_family = AF_INET; 388 sin.sin_addr = fle->f.r.r_src; 389 rt = rtalloc1_fib((struct sockaddr *)&sin, 0, 0, r->fib); 390 if (rt != NULL) { 391 if (rt_mask(rt)) 392 fle->f.src_mask = bitcount32(((struct sockaddr_in *) 393 rt_mask(rt))->sin_addr.s_addr); 394 else if (rt->rt_flags & RTF_HOST) 395 /* Give up. We can't determine mask :( */ 396 fle->f.src_mask = 32; 397 398 RTFREE_LOCKED(rt); 399 } 400 401 /* Push new flow at the and of hash. */ 402 TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash); 403 404 return (0); 405 } 406 407 #ifdef INET6 408 /* XXX: make normal function, instead of.. */ 409 #define ipv6_masklen(x) bitcount32((x).__u6_addr.__u6_addr32[0]) + \ 410 bitcount32((x).__u6_addr.__u6_addr32[1]) + \ 411 bitcount32((x).__u6_addr.__u6_addr32[2]) + \ 412 bitcount32((x).__u6_addr.__u6_addr32[3]) 413 /* XXX: Do we need inline here ? */ 414 static __inline int 415 hash6_insert(priv_p priv, struct flow_hash_entry *hsh6, struct flow6_rec *r, 416 int plen, uint8_t tcp_flags) 417 { 418 struct flow6_entry *fle6; 419 struct sockaddr_in6 *src, *dst; 420 struct rtentry *rt; 421 struct route_in6 rin6; 422 423 mtx_assert(&hsh6->mtx, MA_OWNED); 424 425 fle6 = uma_zalloc_arg(priv->zone6, priv, M_NOWAIT); 426 if (fle6 == NULL) { 427 atomic_add_32(&priv->info.nfinfo_alloc_failed, 1); 428 return (ENOMEM); 429 } 430 431 /* 432 * Now fle is totally ours. It is detached from all lists, 433 * we can safely edit it. 434 */ 435 436 fle6->f.version = IP6VERSION; 437 bcopy(r, &fle6->f.r, sizeof(struct flow6_rec)); 438 fle6->f.bytes = plen; 439 fle6->f.packets = 1; 440 fle6->f.tcp_flags = tcp_flags; 441 442 fle6->f.first = fle6->f.last = time_uptime; 443 444 /* 445 * First we do route table lookup on destination address. So we can 446 * fill in out_ifx, dst_mask, nexthop, and dst_as in future releases. 447 */ 448 bzero(&rin6, sizeof(struct route_in6)); 449 dst = (struct sockaddr_in6 *)&rin6.ro_dst; 450 dst->sin6_len = sizeof(struct sockaddr_in6); 451 dst->sin6_family = AF_INET6; 452 dst->sin6_addr = r->dst.r_dst6; 453 454 rin6.ro_rt = rtalloc1_fib((struct sockaddr *)dst, 0, 0, r->fib); 455 456 if (rin6.ro_rt != NULL) { 457 rt = rin6.ro_rt; 458 fle6->f.fle_o_ifx = rt->rt_ifp->if_index; 459 460 if (rt->rt_flags & RTF_GATEWAY && 461 rt->rt_gateway->sa_family == AF_INET6) 462 fle6->f.n.next_hop6 = 463 ((struct sockaddr_in6 *)(rt->rt_gateway))->sin6_addr; 464 465 if (rt_mask(rt)) 466 fle6->f.dst_mask = ipv6_masklen(((struct sockaddr_in6 *)rt_mask(rt))->sin6_addr); 467 else 468 fle6->f.dst_mask = 128; 469 470 RTFREE_LOCKED(rt); 471 } 472 473 /* Do route lookup on source address, to fill in src_mask. */ 474 bzero(&rin6, sizeof(struct route_in6)); 475 src = (struct sockaddr_in6 *)&rin6.ro_dst; 476 src->sin6_len = sizeof(struct sockaddr_in6); 477 src->sin6_family = AF_INET6; 478 src->sin6_addr = r->src.r_src6; 479 480 rin6.ro_rt = rtalloc1_fib((struct sockaddr *)src, 0, 0, r->fib); 481 482 if (rin6.ro_rt != NULL) { 483 rt = rin6.ro_rt; 484 485 if (rt_mask(rt)) 486 fle6->f.src_mask = ipv6_masklen(((struct sockaddr_in6 *)rt_mask(rt))->sin6_addr); 487 else 488 fle6->f.src_mask = 128; 489 490 RTFREE_LOCKED(rt); 491 } 492 493 /* Push new flow at the and of hash. */ 494 TAILQ_INSERT_TAIL(&hsh6->head, (struct flow_entry *)fle6, fle_hash); 495 496 return (0); 497 } 498 #endif 499 500 501 /* 502 * Non-static functions called from ng_netflow.c 503 */ 504 505 /* Allocate memory and set up flow cache */ 506 void 507 ng_netflow_cache_init(priv_p priv) 508 { 509 struct flow_hash_entry *hsh; 510 int i; 511 512 /* Initialize cache UMA zone. */ 513 priv->zone = uma_zcreate("NetFlow IPv4 cache", sizeof(struct flow_entry), 514 uma_ctor_flow, uma_dtor_flow, NULL, NULL, UMA_ALIGN_CACHE, 0); 515 uma_zone_set_max(priv->zone, CACHESIZE); 516 #ifdef INET6 517 priv->zone6 = uma_zcreate("NetFlow IPv6 cache", sizeof(struct flow6_entry), 518 uma_ctor_flow6, uma_dtor_flow6, NULL, NULL, UMA_ALIGN_CACHE, 0); 519 uma_zone_set_max(priv->zone6, CACHESIZE); 520 #endif 521 522 /* Allocate hash. */ 523 priv->hash = malloc(NBUCKETS * sizeof(struct flow_hash_entry), 524 M_NETFLOW_HASH, M_WAITOK | M_ZERO); 525 526 /* Initialize hash. */ 527 for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++) { 528 mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF); 529 TAILQ_INIT(&hsh->head); 530 } 531 532 #ifdef INET6 533 /* Allocate hash. */ 534 priv->hash6 = malloc(NBUCKETS * sizeof(struct flow_hash_entry), 535 M_NETFLOW_HASH, M_WAITOK | M_ZERO); 536 537 /* Initialize hash. */ 538 for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++) { 539 mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF); 540 TAILQ_INIT(&hsh->head); 541 } 542 #endif 543 544 ng_netflow_v9_cache_init(priv); 545 CTR0(KTR_NET, "ng_netflow startup()"); 546 } 547 548 /* Initialize new FIB table for v5 and v9 */ 549 int 550 ng_netflow_fib_init(priv_p priv, int fib) 551 { 552 fib_export_p fe = priv_to_fib(priv, fib); 553 554 CTR1(KTR_NET, "ng_netflow(): fib init: %d", fib); 555 556 if (fe != NULL) 557 return (0); 558 559 if ((fe = malloc(sizeof(struct fib_export), M_NETGRAPH, M_NOWAIT | M_ZERO)) == NULL) 560 return (1); 561 562 mtx_init(&fe->export_mtx, "export dgram lock", NULL, MTX_DEF); 563 mtx_init(&fe->export9_mtx, "export9 dgram lock", NULL, MTX_DEF); 564 fe->fib = fib; 565 fe->domain_id = fib; 566 567 if (atomic_cmpset_ptr((volatile uintptr_t *)&priv->fib_data[fib], (uintptr_t)NULL, (uintptr_t)fe) == 0) { 568 /* FIB already set up by other ISR */ 569 CTR3(KTR_NET, "ng_netflow(): fib init: %d setup %p but got %p", fib, fe, priv_to_fib(priv, fib)); 570 mtx_destroy(&fe->export_mtx); 571 mtx_destroy(&fe->export9_mtx); 572 free(fe, M_NETGRAPH); 573 } else { 574 /* Increase counter for statistics */ 575 CTR3(KTR_NET, "ng_netflow(): fib %d setup to %p (%p)", fib, fe, priv_to_fib(priv, fib)); 576 atomic_fetchadd_32(&priv->info.nfinfo_alloc_fibs, 1); 577 } 578 579 return (0); 580 } 581 582 /* Free all flow cache memory. Called from node close method. */ 583 void 584 ng_netflow_cache_flush(priv_p priv) 585 { 586 struct flow_entry *fle, *fle1; 587 struct flow_hash_entry *hsh; 588 struct netflow_export_item exp; 589 fib_export_p fe; 590 int i; 591 592 bzero(&exp, sizeof(exp)); 593 594 /* 595 * We are going to free probably billable data. 596 * Expire everything before freeing it. 597 * No locking is required since callout is already drained. 598 */ 599 for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++) 600 TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) { 601 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 602 fe = priv_to_fib(priv, fle->f.r.fib); 603 expire_flow(priv, fe, fle, NG_QUEUE); 604 } 605 #ifdef INET6 606 for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++) 607 TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) { 608 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 609 fe = priv_to_fib(priv, fle->f.r.fib); 610 expire_flow(priv, fe, fle, NG_QUEUE); 611 } 612 #endif 613 614 uma_zdestroy(priv->zone); 615 /* Destroy hash mutexes. */ 616 for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++) 617 mtx_destroy(&hsh->mtx); 618 619 /* Free hash memory. */ 620 if (priv->hash != NULL) 621 free(priv->hash, M_NETFLOW_HASH); 622 #ifdef INET6 623 uma_zdestroy(priv->zone6); 624 /* Destroy hash mutexes. */ 625 for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++) 626 mtx_destroy(&hsh->mtx); 627 628 /* Free hash memory. */ 629 if (priv->hash6 != NULL) 630 free(priv->hash6, M_NETFLOW_HASH); 631 #endif 632 633 for (i = 0; i < RT_NUMFIBS; i++) { 634 if ((fe = priv_to_fib(priv, i)) == NULL) 635 continue; 636 637 if (fe->exp.item != NULL) 638 export_send(priv, fe, fe->exp.item, NG_QUEUE); 639 640 if (fe->exp.item9 != NULL) 641 export9_send(priv, fe, fe->exp.item9, fe->exp.item9_opt, NG_QUEUE); 642 643 mtx_destroy(&fe->export_mtx); 644 mtx_destroy(&fe->export9_mtx); 645 free(fe, M_NETGRAPH); 646 } 647 648 ng_netflow_v9_cache_flush(priv); 649 } 650 651 /* Insert packet from into flow cache. */ 652 int 653 ng_netflow_flow_add(priv_p priv, fib_export_p fe, struct ip *ip, caddr_t upper_ptr, uint8_t upper_proto, 654 uint8_t is_frag, unsigned int src_if_index) 655 { 656 register struct flow_entry *fle, *fle1; 657 struct flow_hash_entry *hsh; 658 struct flow_rec r; 659 int hlen, plen; 660 int error = 0; 661 uint8_t tcp_flags = 0; 662 uint16_t eproto; 663 664 /* Try to fill flow_rec r */ 665 bzero(&r, sizeof(r)); 666 /* check version */ 667 if (ip->ip_v != IPVERSION) 668 return (EINVAL); 669 670 /* verify min header length */ 671 hlen = ip->ip_hl << 2; 672 673 if (hlen < sizeof(struct ip)) 674 return (EINVAL); 675 676 eproto = ETHERTYPE_IP; 677 /* Assume L4 template by default */ 678 r.flow_type = NETFLOW_V9_FLOW_V4_L4; 679 680 r.r_src = ip->ip_src; 681 r.r_dst = ip->ip_dst; 682 r.fib = fe->fib; 683 684 /* save packet length */ 685 plen = ntohs(ip->ip_len); 686 687 r.r_ip_p = ip->ip_p; 688 r.r_tos = ip->ip_tos; 689 690 r.r_i_ifx = src_if_index; 691 692 /* 693 * XXX NOTE: only first fragment of fragmented TCP, UDP and 694 * ICMP packet will be recorded with proper s_port and d_port. 695 * Following fragments will be recorded simply as IP packet with 696 * ip_proto = ip->ip_p and s_port, d_port set to zero. 697 * I know, it looks like bug. But I don't want to re-implement 698 * ip packet assebmling here. Anyway, (in)famous trafd works this way - 699 * and nobody complains yet :) 700 */ 701 if ((ip->ip_off & htons(IP_OFFMASK)) == 0) 702 switch(r.r_ip_p) { 703 case IPPROTO_TCP: 704 { 705 register struct tcphdr *tcp; 706 707 tcp = (struct tcphdr *)((caddr_t )ip + hlen); 708 r.r_sport = tcp->th_sport; 709 r.r_dport = tcp->th_dport; 710 tcp_flags = tcp->th_flags; 711 break; 712 } 713 case IPPROTO_UDP: 714 r.r_ports = *(uint32_t *)((caddr_t )ip + hlen); 715 break; 716 } 717 718 atomic_fetchadd_32(&priv->info.nfinfo_packets, 1); 719 /* XXX: atomic */ 720 priv->info.nfinfo_bytes += plen; 721 722 /* Find hash slot. */ 723 hsh = &priv->hash[ip_hash(&r)]; 724 725 mtx_lock(&hsh->mtx); 726 727 /* 728 * Go through hash and find our entry. If we encounter an 729 * entry, that should be expired, purge it. We do a reverse 730 * search since most active entries are first, and most 731 * searches are done on most active entries. 732 */ 733 TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) { 734 if (bcmp(&r, &fle->f.r, sizeof(struct flow_rec)) == 0) 735 break; 736 if ((INACTIVE(fle) && SMALL(fle)) || AGED(fle)) { 737 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 738 expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle, NG_QUEUE); 739 atomic_add_32(&priv->info.nfinfo_act_exp, 1); 740 } 741 } 742 743 if (fle) { /* An existent entry. */ 744 745 fle->f.bytes += plen; 746 fle->f.packets ++; 747 fle->f.tcp_flags |= tcp_flags; 748 fle->f.last = time_uptime; 749 750 /* 751 * We have the following reasons to expire flow in active way: 752 * - it hit active timeout 753 * - a TCP connection closed 754 * - it is going to overflow counter 755 */ 756 if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle) || 757 (fle->f.bytes >= (CNTR_MAX - IF_MAXMTU)) ) { 758 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 759 expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle, NG_QUEUE); 760 atomic_add_32(&priv->info.nfinfo_act_exp, 1); 761 } else { 762 /* 763 * It is the newest, move it to the tail, 764 * if it isn't there already. Next search will 765 * locate it quicker. 766 */ 767 if (fle != TAILQ_LAST(&hsh->head, fhead)) { 768 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 769 TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash); 770 } 771 } 772 } else /* A new flow entry. */ 773 error = hash_insert(priv, hsh, &r, plen, tcp_flags); 774 775 mtx_unlock(&hsh->mtx); 776 777 return (error); 778 } 779 780 #ifdef INET6 781 /* Insert IPv6 packet from into flow cache. */ 782 int 783 ng_netflow_flow6_add(priv_p priv, fib_export_p fe, struct ip6_hdr *ip6, caddr_t upper_ptr, uint8_t upper_proto, 784 uint8_t is_frag, unsigned int src_if_index) 785 { 786 register struct flow_entry *fle = NULL, *fle1; 787 register struct flow6_entry *fle6; 788 struct flow_hash_entry *hsh; 789 struct flow6_rec r; 790 int plen; 791 int error = 0; 792 uint8_t tcp_flags = 0; 793 794 /* check version */ 795 if ((ip6->ip6_vfc & IPV6_VERSION_MASK) != IPV6_VERSION) 796 return (EINVAL); 797 798 bzero(&r, sizeof(r)); 799 800 r.src.r_src6 = ip6->ip6_src; 801 r.dst.r_dst6 = ip6->ip6_dst; 802 r.fib = fe->fib; 803 804 /* Assume L4 template by default */ 805 r.flow_type = NETFLOW_V9_FLOW_V6_L4; 806 807 /* save packet length */ 808 plen = ntohs(ip6->ip6_plen) + sizeof(struct ip6_hdr); 809 810 /* XXX: set DSCP/CoS value */ 811 #if 0 812 r.r_tos = ip->ip_tos; 813 #endif 814 if (is_frag == 0) { 815 switch(upper_proto) { 816 case IPPROTO_TCP: 817 { 818 register struct tcphdr *tcp; 819 820 tcp = (struct tcphdr *)upper_ptr; 821 r.r_ports = *(uint32_t *)upper_ptr; 822 tcp_flags = tcp->th_flags; 823 break; 824 } 825 case IPPROTO_UDP: 826 case IPPROTO_SCTP: 827 { 828 r.r_ports = *(uint32_t *)upper_ptr; 829 break; 830 } 831 832 } 833 } 834 835 r.r_ip_p = upper_proto; 836 r.r_i_ifx = src_if_index; 837 838 atomic_fetchadd_32(&priv->info.nfinfo_packets6, 1); 839 /* XXX: atomic */ 840 priv->info.nfinfo_bytes6 += plen; 841 842 /* Find hash slot. */ 843 hsh = &priv->hash6[ip6_hash(&r)]; 844 845 mtx_lock(&hsh->mtx); 846 847 /* 848 * Go through hash and find our entry. If we encounter an 849 * entry, that should be expired, purge it. We do a reverse 850 * search since most active entries are first, and most 851 * searches are done on most active entries. 852 */ 853 TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) { 854 if (fle->f.version != IP6VERSION) 855 continue; 856 fle6 = (struct flow6_entry *)fle; 857 if (bcmp(&r, &fle6->f.r, sizeof(struct flow6_rec)) == 0) 858 break; 859 if ((INACTIVE(fle6) && SMALL(fle6)) || AGED(fle6)) { 860 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 861 expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle, 862 NG_QUEUE); 863 atomic_add_32(&priv->info.nfinfo_act_exp, 1); 864 } 865 } 866 867 if (fle != NULL) { /* An existent entry. */ 868 fle6 = (struct flow6_entry *)fle; 869 870 fle6->f.bytes += plen; 871 fle6->f.packets ++; 872 fle6->f.tcp_flags |= tcp_flags; 873 fle6->f.last = time_uptime; 874 875 /* 876 * We have the following reasons to expire flow in active way: 877 * - it hit active timeout 878 * - a TCP connection closed 879 * - it is going to overflow counter 880 */ 881 if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle6) || 882 (fle6->f.bytes >= (CNTR_MAX - IF_MAXMTU)) ) { 883 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 884 expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle, 885 NG_QUEUE); 886 atomic_add_32(&priv->info.nfinfo_act_exp, 1); 887 } else { 888 /* 889 * It is the newest, move it to the tail, 890 * if it isn't there already. Next search will 891 * locate it quicker. 892 */ 893 if (fle != TAILQ_LAST(&hsh->head, fhead)) { 894 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 895 TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash); 896 } 897 } 898 } else /* A new flow entry. */ 899 error = hash6_insert(priv, hsh, &r, plen, tcp_flags); 900 901 mtx_unlock(&hsh->mtx); 902 903 return (error); 904 } 905 #endif 906 907 /* 908 * Return records from cache to userland. 909 * 910 * TODO: matching particular IP should be done in kernel, here. 911 */ 912 int 913 ng_netflow_flow_show(priv_p priv, struct ngnf_show_header *req, 914 struct ngnf_show_header *resp) 915 { 916 struct flow_hash_entry *hsh; 917 struct flow_entry *fle; 918 struct flow_entry_data *data = (struct flow_entry_data *)(resp + 1); 919 #ifdef INET6 920 struct flow6_entry_data *data6 = (struct flow6_entry_data *)(resp + 1); 921 #endif 922 int i, max; 923 924 i = req->hash_id; 925 if (i > NBUCKETS-1) 926 return (EINVAL); 927 928 #ifdef INET6 929 if (req->version == 6) { 930 resp->version = 6; 931 hsh = priv->hash6 + i; 932 max = NREC6_AT_ONCE; 933 } else 934 #endif 935 if (req->version == 4) { 936 resp->version = 4; 937 hsh = priv->hash + i; 938 max = NREC_AT_ONCE; 939 } else 940 return (EINVAL); 941 942 /* 943 * We will transfer not more than NREC_AT_ONCE. More data 944 * will come in next message. 945 * We send current hash index and current record number in list 946 * to userland, and userland should return it back to us. 947 * Then, we will restart with new entry. 948 * 949 * The resulting cache snapshot can be inaccurate if flow expiration 950 * is taking place on hash item between userland data requests for 951 * this hash item id. 952 */ 953 resp->nentries = 0; 954 for (; i < NBUCKETS; hsh++, i++) { 955 int list_id; 956 957 if (mtx_trylock(&hsh->mtx) == 0) { 958 /* 959 * Requested hash index is not available, 960 * relay decision to skip or re-request data 961 * to userland. 962 */ 963 resp->hash_id = i; 964 resp->list_id = 0; 965 return (0); 966 } 967 968 list_id = 0; 969 TAILQ_FOREACH(fle, &hsh->head, fle_hash) { 970 if (hsh->mtx.mtx_lock & MTX_CONTESTED) { 971 resp->hash_id = i; 972 resp->list_id = list_id; 973 mtx_unlock(&hsh->mtx); 974 return (0); 975 } 976 977 list_id++; 978 /* Search for particular record in list. */ 979 if (req->list_id > 0) { 980 if (list_id < req->list_id) 981 continue; 982 983 /* Requested list position found. */ 984 req->list_id = 0; 985 } 986 #ifdef INET6 987 if (req->version == 6) { 988 struct flow6_entry *fle6; 989 990 fle6 = (struct flow6_entry *)fle; 991 bcopy(&fle6->f, data6 + resp->nentries, 992 sizeof(fle6->f)); 993 } else 994 #endif 995 bcopy(&fle->f, data + resp->nentries, 996 sizeof(fle->f)); 997 resp->nentries++; 998 if (resp->nentries == max) { 999 resp->hash_id = i; 1000 /* 1001 * If it was the last item in list 1002 * we simply skip to next hash_id. 1003 */ 1004 resp->list_id = list_id + 1; 1005 mtx_unlock(&hsh->mtx); 1006 return (0); 1007 } 1008 } 1009 mtx_unlock(&hsh->mtx); 1010 } 1011 1012 resp->hash_id = resp->list_id = 0; 1013 1014 return (0); 1015 } 1016 1017 /* We have full datagram in privdata. Send it to export hook. */ 1018 static int 1019 export_send(priv_p priv, fib_export_p fe, item_p item, int flags) 1020 { 1021 struct mbuf *m = NGI_M(item); 1022 struct netflow_v5_export_dgram *dgram = mtod(m, 1023 struct netflow_v5_export_dgram *); 1024 struct netflow_v5_header *header = &dgram->header; 1025 struct timespec ts; 1026 int error = 0; 1027 1028 /* Fill mbuf header. */ 1029 m->m_len = m->m_pkthdr.len = sizeof(struct netflow_v5_record) * 1030 header->count + sizeof(struct netflow_v5_header); 1031 1032 /* Fill export header. */ 1033 header->sys_uptime = htonl(MILLIUPTIME(time_uptime)); 1034 getnanotime(&ts); 1035 header->unix_secs = htonl(ts.tv_sec); 1036 header->unix_nsecs = htonl(ts.tv_nsec); 1037 header->engine_type = 0; 1038 header->engine_id = fe->domain_id; 1039 header->pad = 0; 1040 header->flow_seq = htonl(atomic_fetchadd_32(&fe->flow_seq, 1041 header->count)); 1042 header->count = htons(header->count); 1043 1044 if (priv->export != NULL) 1045 NG_FWD_ITEM_HOOK_FLAGS(error, item, priv->export, flags); 1046 else 1047 NG_FREE_ITEM(item); 1048 1049 return (error); 1050 } 1051 1052 1053 /* Add export record to dgram. */ 1054 static int 1055 export_add(item_p item, struct flow_entry *fle) 1056 { 1057 struct netflow_v5_export_dgram *dgram = mtod(NGI_M(item), 1058 struct netflow_v5_export_dgram *); 1059 struct netflow_v5_header *header = &dgram->header; 1060 struct netflow_v5_record *rec; 1061 1062 rec = &dgram->r[header->count]; 1063 header->count ++; 1064 1065 KASSERT(header->count <= NETFLOW_V5_MAX_RECORDS, 1066 ("ng_netflow: export too big")); 1067 1068 /* Fill in export record. */ 1069 rec->src_addr = fle->f.r.r_src.s_addr; 1070 rec->dst_addr = fle->f.r.r_dst.s_addr; 1071 rec->next_hop = fle->f.next_hop.s_addr; 1072 rec->i_ifx = htons(fle->f.fle_i_ifx); 1073 rec->o_ifx = htons(fle->f.fle_o_ifx); 1074 rec->packets = htonl(fle->f.packets); 1075 rec->octets = htonl(fle->f.bytes); 1076 rec->first = htonl(MILLIUPTIME(fle->f.first)); 1077 rec->last = htonl(MILLIUPTIME(fle->f.last)); 1078 rec->s_port = fle->f.r.r_sport; 1079 rec->d_port = fle->f.r.r_dport; 1080 rec->flags = fle->f.tcp_flags; 1081 rec->prot = fle->f.r.r_ip_p; 1082 rec->tos = fle->f.r.r_tos; 1083 rec->dst_mask = fle->f.dst_mask; 1084 rec->src_mask = fle->f.src_mask; 1085 rec->pad1 = 0; 1086 rec->pad2 = 0; 1087 1088 /* Not supported fields. */ 1089 rec->src_as = rec->dst_as = 0; 1090 1091 if (header->count == NETFLOW_V5_MAX_RECORDS) 1092 return (1); /* end of datagram */ 1093 else 1094 return (0); 1095 } 1096 1097 /* Periodic flow expiry run. */ 1098 void 1099 ng_netflow_expire(void *arg) 1100 { 1101 struct flow_entry *fle, *fle1; 1102 struct flow_hash_entry *hsh; 1103 priv_p priv = (priv_p )arg; 1104 uint32_t used; 1105 int i; 1106 1107 /* 1108 * Going through all the cache. 1109 */ 1110 for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++) { 1111 /* 1112 * Skip entries, that are already being worked on. 1113 */ 1114 if (mtx_trylock(&hsh->mtx) == 0) 1115 continue; 1116 1117 used = atomic_load_acq_32(&priv->info.nfinfo_used); 1118 TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) { 1119 /* 1120 * Interrupt thread wants this entry! 1121 * Quick! Quick! Bail out! 1122 */ 1123 if (hsh->mtx.mtx_lock & MTX_CONTESTED) 1124 break; 1125 1126 /* 1127 * Don't expire aggressively while hash collision 1128 * ratio is predicted small. 1129 */ 1130 if (used <= (NBUCKETS*2) && !INACTIVE(fle)) 1131 break; 1132 1133 if ((INACTIVE(fle) && (SMALL(fle) || 1134 (used > (NBUCKETS*2)))) || AGED(fle)) { 1135 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 1136 expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle, NG_NOFLAGS); 1137 used--; 1138 atomic_add_32(&priv->info.nfinfo_inact_exp, 1); 1139 } 1140 } 1141 mtx_unlock(&hsh->mtx); 1142 } 1143 1144 #ifdef INET6 1145 for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++) { 1146 struct flow6_entry *fle6; 1147 1148 /* 1149 * Skip entries, that are already being worked on. 1150 */ 1151 if (mtx_trylock(&hsh->mtx) == 0) 1152 continue; 1153 1154 used = atomic_load_acq_32(&priv->info.nfinfo_used6); 1155 TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) { 1156 fle6 = (struct flow6_entry *)fle; 1157 /* 1158 * Interrupt thread wants this entry! 1159 * Quick! Quick! Bail out! 1160 */ 1161 if (hsh->mtx.mtx_lock & MTX_CONTESTED) 1162 break; 1163 1164 /* 1165 * Don't expire aggressively while hash collision 1166 * ratio is predicted small. 1167 */ 1168 if (used <= (NBUCKETS*2) && !INACTIVE(fle6)) 1169 break; 1170 1171 if ((INACTIVE(fle6) && (SMALL(fle6) || 1172 (used > (NBUCKETS*2)))) || AGED(fle6)) { 1173 TAILQ_REMOVE(&hsh->head, fle, fle_hash); 1174 expire_flow(priv, priv_to_fib(priv, 1175 fle->f.r.fib), fle, NG_NOFLAGS); 1176 used--; 1177 atomic_add_32(&priv->info.nfinfo_inact_exp, 1); 1178 } 1179 } 1180 mtx_unlock(&hsh->mtx); 1181 } 1182 #endif 1183 1184 /* Schedule next expire. */ 1185 callout_reset(&priv->exp_callout, (1*hz), &ng_netflow_expire, 1186 (void *)priv); 1187 } 1188