// SPDX-License-Identifier: GPL-2.0

/* bpf_fq is intended for testing the bpf qdisc infrastructure and not a direct
 * copy of sch_fq. bpf_fq implements the scheduling algorithm of sch_fq before
 * 29f834aa326e ("net_sched: sch_fq: add 3 bands and WRR scheduling") was
 * introduced. It gives each flow a fair chance to transmit packets in a
 * round-robin fashion. Note that for flow pacing, bpf_fq currently only
 * respects skb->tstamp but not skb->sk->sk_pacing_rate. In addition, if there
 * are multiple bpf_fq instances, they will have a shared view of flows and
 * configuration since some key data structures such as fq_prio_flows,
 * fq_nonprio_flows, and fq_bpf_data are global.
 *
 * To use bpf_fq alone without running selftests, use the following commands.
 *
 * 1. Register bpf_fq to the kernel
 *    bpftool struct_ops register bpf_qdisc_fq.bpf.o /sys/fs/bpf
 * 2. Add bpf_fq to an interface
 *    tc qdisc add dev <interface name> root handle <handle> bpf_fq
 * 3. Delete bpf_fq attached to the interface
 *    tc qdisc delete dev <interface name> root
 * 4. Unregister bpf_fq
 *    bpftool struct_ops unregister name fq
 *
 * The qdisc name, bpf_fq, used in tc commands is defined by Qdisc_ops.id.
 * The struct_ops_map_name, fq, used in the bpftool command is the name of the
 * Qdisc_ops.
 *
 * SEC(".struct_ops")
 * struct Qdisc_ops fq = {
 *         ...
 *         .id = "bpf_fq",
 * };
 */

#include <vmlinux.h>
#include <errno.h>
#include <bpf/bpf_helpers.h>
#include "bpf_experimental.h"
#include "bpf_qdisc_common.h"

char _license[] SEC("license") = "GPL";

#define NSEC_PER_USEC 1000L
#define NSEC_PER_SEC 1000000000L

#define NUM_QUEUE (1 << 20)

struct fq_bpf_data {
        u32 quantum;
        u32 initial_quantum;
        u32 flow_refill_delay;
        u32 flow_plimit;
        u64 horizon;
        u32 orphan_mask;
        u32 timer_slack;
        u64 time_next_delayed_flow;
        u64 unthrottle_latency_ns;
        u8 horizon_drop;
        u32 new_flow_cnt;
        u32 old_flow_cnt;
        u64 ktime_cache;
};

enum {
        CLS_RET_PRIO = 0,
        CLS_RET_NONPRIO = 1,
        CLS_RET_ERR = 2,
};

struct skb_node {
        u64 tstamp;
        struct sk_buff __kptr * skb;
        struct bpf_rb_node node;
};

struct fq_flow_node {
        int credit;
        u32 qlen;
        u64 age;
        u64 time_next_packet;
        struct bpf_list_node list_node;
        struct bpf_rb_node rb_node;
        struct bpf_rb_root queue __contains(skb_node, node);
        struct bpf_spin_lock lock;
        struct bpf_refcount refcount;
};

struct dequeue_nonprio_ctx {
        bool stop_iter;
        u64 expire;
        u64 now;
};

struct remove_flows_ctx {
        bool gc_only;
        u32 reset_cnt;
        u32 reset_max;
};

struct unset_throttled_flows_ctx {
        bool unset_all;
        u64 now;
};

struct fq_stashed_flow {
        struct fq_flow_node __kptr * flow;
};

struct {
        __uint(type, BPF_MAP_TYPE_HASH);
        __type(key, __u64);
        __type(value, struct fq_stashed_flow);
        __uint(max_entries, NUM_QUEUE);
} fq_nonprio_flows SEC(".maps");

struct {
        __uint(type, BPF_MAP_TYPE_HASH);
        __type(key, __u64);
        __type(value, struct fq_stashed_flow);
        __uint(max_entries, 1);
} fq_prio_flows SEC(".maps");

private(A) struct bpf_spin_lock fq_delayed_lock;
private(A) struct bpf_rb_root fq_delayed __contains(fq_flow_node, rb_node);

private(B) struct bpf_spin_lock fq_new_flows_lock;
private(B) struct bpf_list_head fq_new_flows __contains(fq_flow_node, list_node);

private(C) struct bpf_spin_lock fq_old_flows_lock;
private(C) struct bpf_list_head fq_old_flows __contains(fq_flow_node, list_node);

private(D) struct fq_bpf_data q;
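/* Flow bookkeeping:
 * - fq_nonprio_flows and fq_prio_flows stash fq_flow_node kptrs keyed by flow
 *   hash (the prio map has a single entry at hash 0).
 * - fq_new_flows and fq_old_flows are the round-robin lists scanned at
 *   dequeue time; q.new_flow_cnt and q.old_flow_cnt track their lengths.
 * - fq_delayed holds throttled flows ordered by time_next_packet.
 */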
/* Wrapper for bpf_kptr_xchg that expects NULL dst */
static void bpf_kptr_xchg_back(void *map_val, void *ptr)
{
        void *ret;

        ret = bpf_kptr_xchg(map_val, ptr);
        if (ret)
                bpf_obj_drop(ret);
}

static bool skbn_tstamp_less(struct bpf_rb_node *a, const struct bpf_rb_node *b)
{
        struct skb_node *skbn_a;
        struct skb_node *skbn_b;

        skbn_a = container_of(a, struct skb_node, node);
        skbn_b = container_of(b, struct skb_node, node);

        return skbn_a->tstamp < skbn_b->tstamp;
}

static bool fn_time_next_packet_less(struct bpf_rb_node *a, const struct bpf_rb_node *b)
{
        struct fq_flow_node *flow_a;
        struct fq_flow_node *flow_b;

        flow_a = container_of(a, struct fq_flow_node, rb_node);
        flow_b = container_of(b, struct fq_flow_node, rb_node);

        return flow_a->time_next_packet < flow_b->time_next_packet;
}

static void
fq_flows_add_head(struct bpf_list_head *head, struct bpf_spin_lock *lock,
                  struct fq_flow_node *flow, u32 *flow_cnt)
{
        bpf_spin_lock(lock);
        bpf_list_push_front(head, &flow->list_node);
        bpf_spin_unlock(lock);
        *flow_cnt += 1;
}

static void
fq_flows_add_tail(struct bpf_list_head *head, struct bpf_spin_lock *lock,
                  struct fq_flow_node *flow, u32 *flow_cnt)
{
        bpf_spin_lock(lock);
        bpf_list_push_back(head, &flow->list_node);
        bpf_spin_unlock(lock);
        *flow_cnt += 1;
}

static void
fq_flows_remove_front(struct bpf_list_head *head, struct bpf_spin_lock *lock,
                      struct bpf_list_node **node, u32 *flow_cnt)
{
        bpf_spin_lock(lock);
        *node = bpf_list_pop_front(head);
        bpf_spin_unlock(lock);
        *flow_cnt -= 1;
}

static bool
fq_flows_is_empty(struct bpf_list_head *head, struct bpf_spin_lock *lock)
{
        struct bpf_list_node *node;

        bpf_spin_lock(lock);
        node = bpf_list_pop_front(head);
        if (node) {
                bpf_list_push_front(head, node);
                bpf_spin_unlock(lock);
                return false;
        }
        bpf_spin_unlock(lock);

        return true;
}

/* flow->age is used to denote the state of the flow (not-detached, detached, throttled)
 * as well as the timestamp when the flow is detached.
 *
 * 0: not-detached
 * 1 - (~0ULL-1): detached
 * ~0ULL: throttled
 */
static void fq_flow_set_detached(struct fq_flow_node *flow)
{
        flow->age = bpf_jiffies64();
}

static bool fq_flow_is_detached(struct fq_flow_node *flow)
{
        return flow->age != 0 && flow->age != ~0ULL;
}
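/* Illustrative companion to the encoding above, not part of the original
 * file: the throttled state could be tested explicitly as sketched below.
 * The rest of this file never needs such a check, since throttled flows are
 * identified by their membership in fq_delayed, so this helper is unused.
 */
static inline bool __attribute__((unused))
fq_flow_is_throttled(struct fq_flow_node *flow)
{
        return flow->age == ~0ULL;
}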
static bool sk_listener(struct sock *sk)
{
        return (1 << sk->__sk_common.skc_state) & (TCPF_LISTEN | TCPF_NEW_SYN_RECV);
}

static void fq_gc(void);

static int fq_new_flow(void *flow_map, struct fq_stashed_flow **sflow, u64 hash)
{
        struct fq_stashed_flow tmp = {};
        struct fq_flow_node *flow;
        int ret;

        flow = bpf_obj_new(typeof(*flow));
        if (!flow)
                return -ENOMEM;

        flow->credit = q.initial_quantum;
        flow->qlen = 0;
        flow->age = 1;
        flow->time_next_packet = 0;

        ret = bpf_map_update_elem(flow_map, &hash, &tmp, 0);
        if (ret == -ENOMEM || ret == -E2BIG) {
                fq_gc();
                bpf_map_update_elem(&fq_nonprio_flows, &hash, &tmp, 0);
        }

        *sflow = bpf_map_lookup_elem(flow_map, &hash);
        if (!*sflow) {
                bpf_obj_drop(flow);
                return -ENOMEM;
        }

        bpf_kptr_xchg_back(&(*sflow)->flow, flow);
        return 0;
}

static int
fq_classify(struct sk_buff *skb, struct fq_stashed_flow **sflow)
{
        struct sock *sk = skb->sk;
        int ret = CLS_RET_NONPRIO;
        u64 hash = 0;

        if ((skb->priority & TC_PRIO_MAX) == TC_PRIO_CONTROL) {
                *sflow = bpf_map_lookup_elem(&fq_prio_flows, &hash);
                ret = CLS_RET_PRIO;
        } else {
                if (!sk || sk_listener(sk)) {
                        hash = bpf_skb_get_hash(skb) & q.orphan_mask;
                        /* Avoid collision with an existing flow hash, which
                         * only uses the lower 32 bits of hash, by setting the
                         * upper half of hash to 1.
                         */
                        hash |= (1ULL << 32);
                } else if (sk->__sk_common.skc_state == TCP_CLOSE) {
                        hash = bpf_skb_get_hash(skb) & q.orphan_mask;
                        hash |= (1ULL << 32);
                } else {
                        hash = sk->__sk_common.skc_hash;
                }
                *sflow = bpf_map_lookup_elem(&fq_nonprio_flows, &hash);
        }

        if (!*sflow)
                ret = fq_new_flow(&fq_nonprio_flows, sflow, hash) < 0 ?
                      CLS_RET_ERR : CLS_RET_NONPRIO;

        return ret;
}
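/* Pacing horizon: a packet whose skb->tstamp lies more than q.horizon
 * nanoseconds past the cached clock is either dropped (when q.horizon_drop is
 * set) or has its tstamp capped to ktime_cache + horizon at enqueue time.
 */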
static bool fq_packet_beyond_horizon(struct sk_buff *skb)
{
        return (s64)skb->tstamp > (s64)(q.ktime_cache + q.horizon);
}

SEC("struct_ops/bpf_fq_enqueue")
int BPF_PROG(bpf_fq_enqueue, struct sk_buff *skb, struct Qdisc *sch,
             struct bpf_sk_buff_ptr *to_free)
{
        struct fq_flow_node *flow = NULL, *flow_copy;
        struct fq_stashed_flow *sflow;
        u64 time_to_send, jiffies;
        struct skb_node *skbn;
        int ret;

        if (sch->q.qlen >= sch->limit)
                goto drop;

        if (!skb->tstamp) {
                time_to_send = q.ktime_cache = bpf_ktime_get_ns();
        } else {
                if (fq_packet_beyond_horizon(skb)) {
                        q.ktime_cache = bpf_ktime_get_ns();
                        if (fq_packet_beyond_horizon(skb)) {
                                if (q.horizon_drop)
                                        goto drop;

                                skb->tstamp = q.ktime_cache + q.horizon;
                        }
                }
                time_to_send = skb->tstamp;
        }

        ret = fq_classify(skb, &sflow);
        if (ret == CLS_RET_ERR)
                goto drop;

        flow = bpf_kptr_xchg(&sflow->flow, flow);
        if (!flow)
                goto drop;

        if (ret == CLS_RET_NONPRIO) {
                if (flow->qlen >= q.flow_plimit) {
                        bpf_kptr_xchg_back(&sflow->flow, flow);
                        goto drop;
                }

                if (fq_flow_is_detached(flow)) {
                        flow_copy = bpf_refcount_acquire(flow);

                        jiffies = bpf_jiffies64();
                        if ((s64)(jiffies - (flow_copy->age + q.flow_refill_delay)) > 0) {
                                if (flow_copy->credit < q.quantum)
                                        flow_copy->credit = q.quantum;
                        }
                        flow_copy->age = 0;
                        fq_flows_add_tail(&fq_new_flows, &fq_new_flows_lock, flow_copy,
                                          &q.new_flow_cnt);
                }
        }

        skbn = bpf_obj_new(typeof(*skbn));
        if (!skbn) {
                bpf_kptr_xchg_back(&sflow->flow, flow);
                goto drop;
        }

        skbn->tstamp = skb->tstamp = time_to_send;

        sch->qstats.backlog += qdisc_pkt_len(skb);

        skb = bpf_kptr_xchg(&skbn->skb, skb);
        if (skb)
                bpf_qdisc_skb_drop(skb, to_free);

        bpf_spin_lock(&flow->lock);
        bpf_rbtree_add(&flow->queue, &skbn->node, skbn_tstamp_less);
        bpf_spin_unlock(&flow->lock);

        flow->qlen++;
        bpf_kptr_xchg_back(&sflow->flow, flow);

        sch->q.qlen++;
        return NET_XMIT_SUCCESS;

drop:
        bpf_qdisc_skb_drop(skb, to_free);
        sch->qstats.drops++;
        return NET_XMIT_DROP;
}

static int fq_unset_throttled_flows(u32 index, struct unset_throttled_flows_ctx *ctx)
{
        struct bpf_rb_node *node = NULL;
        struct fq_flow_node *flow;

        bpf_spin_lock(&fq_delayed_lock);

        node = bpf_rbtree_first(&fq_delayed);
        if (!node) {
                bpf_spin_unlock(&fq_delayed_lock);
                return 1;
        }

        flow = container_of(node, struct fq_flow_node, rb_node);
        if (!ctx->unset_all && flow->time_next_packet > ctx->now) {
                q.time_next_delayed_flow = flow->time_next_packet;
                bpf_spin_unlock(&fq_delayed_lock);
                return 1;
        }

        node = bpf_rbtree_remove(&fq_delayed, &flow->rb_node);

        bpf_spin_unlock(&fq_delayed_lock);

        if (!node)
                return 1;

        flow = container_of(node, struct fq_flow_node, rb_node);
        flow->age = 0;
        fq_flows_add_tail(&fq_old_flows, &fq_old_flows_lock, flow, &q.old_flow_cnt);

        return 0;
}

static void fq_flow_set_throttled(struct fq_flow_node *flow)
{
        flow->age = ~0ULL;

        if (q.time_next_delayed_flow > flow->time_next_packet)
                q.time_next_delayed_flow = flow->time_next_packet;

        bpf_spin_lock(&fq_delayed_lock);
        bpf_rbtree_add(&fq_delayed, &flow->rb_node, fn_time_next_packet_less);
        bpf_spin_unlock(&fq_delayed_lock);
}
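/* Move flows whose time_next_packet has passed from fq_delayed back to the
 * old-flow list. unthrottle_latency_ns is a rough EWMA (weight 1/8) of how
 * late the unthrottling happens; e.g. a single 8us-late sample moves a zero
 * estimate to 1us.
 */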
static void fq_check_throttled(u64 now)
{
        struct unset_throttled_flows_ctx ctx = {
                .unset_all = false,
                .now = now,
        };
        unsigned long sample;

        if (q.time_next_delayed_flow > now)
                return;

        sample = (unsigned long)(now - q.time_next_delayed_flow);
        q.unthrottle_latency_ns -= q.unthrottle_latency_ns >> 3;
        q.unthrottle_latency_ns += sample >> 3;

        q.time_next_delayed_flow = ~0ULL;
        bpf_loop(NUM_QUEUE, fq_unset_throttled_flows, &ctx, 0);
}
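/* One round of the round-robin scan, driven by bpf_for() in bpf_fq_dequeue:
 * service the new-flow list first, then the old-flow list. A flow that has
 * exhausted its credit is refilled with q.quantum and rotated to the tail of
 * the old list; a flow whose head packet is not yet due is moved to
 * fq_delayed; a flow with an empty queue is either demoted to the old list
 * (if it came from the new list and the old list is not empty) or detached
 * and dropped.
 */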
static struct sk_buff*
fq_dequeue_nonprio_flows(u32 index, struct dequeue_nonprio_ctx *ctx)
{
        u64 time_next_packet, time_to_send;
        struct bpf_rb_node *rb_node;
        struct sk_buff *skb = NULL;
        struct bpf_list_head *head;
        struct bpf_list_node *node;
        struct bpf_spin_lock *lock;
        struct fq_flow_node *flow;
        struct skb_node *skbn;
        bool is_empty;
        u32 *cnt;

        if (q.new_flow_cnt) {
                head = &fq_new_flows;
                lock = &fq_new_flows_lock;
                cnt = &q.new_flow_cnt;
        } else if (q.old_flow_cnt) {
                head = &fq_old_flows;
                lock = &fq_old_flows_lock;
                cnt = &q.old_flow_cnt;
        } else {
                if (q.time_next_delayed_flow != ~0ULL)
                        ctx->expire = q.time_next_delayed_flow;
                goto break_loop;
        }

        fq_flows_remove_front(head, lock, &node, cnt);
        if (!node)
                goto break_loop;

        flow = container_of(node, struct fq_flow_node, list_node);
        if (flow->credit <= 0) {
                flow->credit += q.quantum;
                fq_flows_add_tail(&fq_old_flows, &fq_old_flows_lock, flow, &q.old_flow_cnt);
                return NULL;
        }

        bpf_spin_lock(&flow->lock);
        rb_node = bpf_rbtree_first(&flow->queue);
        if (!rb_node) {
                bpf_spin_unlock(&flow->lock);
                is_empty = fq_flows_is_empty(&fq_old_flows, &fq_old_flows_lock);
                if (head == &fq_new_flows && !is_empty) {
                        fq_flows_add_tail(&fq_old_flows, &fq_old_flows_lock, flow, &q.old_flow_cnt);
                } else {
                        fq_flow_set_detached(flow);
                        bpf_obj_drop(flow);
                }
                return NULL;
        }

        skbn = container_of(rb_node, struct skb_node, node);
        time_to_send = skbn->tstamp;

        time_next_packet = (time_to_send > flow->time_next_packet) ?
                           time_to_send : flow->time_next_packet;
        if (ctx->now < time_next_packet) {
                bpf_spin_unlock(&flow->lock);
                flow->time_next_packet = time_next_packet;
                fq_flow_set_throttled(flow);
                return NULL;
        }

        rb_node = bpf_rbtree_remove(&flow->queue, rb_node);
        bpf_spin_unlock(&flow->lock);

        if (!rb_node)
                goto add_flow_and_break;

        skbn = container_of(rb_node, struct skb_node, node);
        skb = bpf_kptr_xchg(&skbn->skb, skb);
        bpf_obj_drop(skbn);

        if (!skb)
                goto add_flow_and_break;

        flow->credit -= qdisc_skb_cb(skb)->pkt_len;
        flow->qlen--;

add_flow_and_break:
        fq_flows_add_head(head, lock, flow, cnt);

break_loop:
        ctx->stop_iter = true;
        return skb;
}

static struct sk_buff *fq_dequeue_prio(void)
{
        struct fq_flow_node *flow = NULL;
        struct fq_stashed_flow *sflow;
        struct bpf_rb_node *rb_node;
        struct sk_buff *skb = NULL;
        struct skb_node *skbn;
        u64 hash = 0;

        sflow = bpf_map_lookup_elem(&fq_prio_flows, &hash);
        if (!sflow)
                return NULL;

        flow = bpf_kptr_xchg(&sflow->flow, flow);
        if (!flow)
                return NULL;

        bpf_spin_lock(&flow->lock);
        rb_node = bpf_rbtree_first(&flow->queue);
        if (!rb_node) {
                bpf_spin_unlock(&flow->lock);
                goto out;
        }

        skbn = container_of(rb_node, struct skb_node, node);
        rb_node = bpf_rbtree_remove(&flow->queue, &skbn->node);
        bpf_spin_unlock(&flow->lock);

        if (!rb_node)
                goto out;

        skbn = container_of(rb_node, struct skb_node, node);
        skb = bpf_kptr_xchg(&skbn->skb, skb);
        bpf_obj_drop(skbn);

out:
        bpf_kptr_xchg_back(&sflow->flow, flow);

        return skb;
}

SEC("struct_ops/bpf_fq_dequeue")
struct sk_buff *BPF_PROG(bpf_fq_dequeue, struct Qdisc *sch)
{
        struct dequeue_nonprio_ctx cb_ctx = {};
        struct sk_buff *skb = NULL;
        int i;

        if (!sch->q.qlen)
                goto out;

        skb = fq_dequeue_prio();
        if (skb)
                goto dequeue;

        q.ktime_cache = cb_ctx.now = bpf_ktime_get_ns();
        fq_check_throttled(q.ktime_cache);
        bpf_for(i, 0, sch->limit) {
                skb = fq_dequeue_nonprio_flows(i, &cb_ctx);
                if (cb_ctx.stop_iter)
                        break;
        };

        if (skb) {
dequeue:
                sch->q.qlen--;
                sch->qstats.backlog -= qdisc_pkt_len(skb);
                bpf_qdisc_bstats_update(sch, skb);
                return skb;
        }

        if (cb_ctx.expire)
                bpf_qdisc_watchdog_schedule(sch, cb_ctx.expire, q.timer_slack);
out:
        return NULL;
}

static int fq_remove_flows_in_list(u32 index, void *ctx)
{
        struct bpf_list_node *node;
        struct fq_flow_node *flow;

        bpf_spin_lock(&fq_new_flows_lock);
        node = bpf_list_pop_front(&fq_new_flows);
        bpf_spin_unlock(&fq_new_flows_lock);
        if (!node) {
                bpf_spin_lock(&fq_old_flows_lock);
                node = bpf_list_pop_front(&fq_old_flows);
                bpf_spin_unlock(&fq_old_flows_lock);
                if (!node)
                        return 1;
        }

        flow = container_of(node, struct fq_flow_node, list_node);
        bpf_obj_drop(flow);

        return 0;
}

extern unsigned CONFIG_HZ __kconfig;

/* limit number of collected flows per round */
#define FQ_GC_MAX 8
#define FQ_GC_AGE (3*CONFIG_HZ)

static bool fq_gc_candidate(struct fq_flow_node *flow)
{
        u64 jiffies = bpf_jiffies64();

        return fq_flow_is_detached(flow) &&
               ((s64)(jiffies - (flow->age + FQ_GC_AGE)) > 0);
}
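/* Shared map-iteration callback: fq_gc() runs it with gc_only set to reap at
 * most reset_max detached flows older than FQ_GC_AGE, while bpf_fq_reset()
 * runs it to drop every stashed flow. Returning 1 stops
 * bpf_for_each_map_elem() once reset_max entries have been removed.
 */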
static int
fq_remove_flows(struct bpf_map *flow_map, u64 *hash,
                struct fq_stashed_flow *sflow, struct remove_flows_ctx *ctx)
{
        if (sflow->flow &&
            (!ctx->gc_only || fq_gc_candidate(sflow->flow))) {
                bpf_map_delete_elem(flow_map, hash);
                ctx->reset_cnt++;
        }

        return ctx->reset_cnt < ctx->reset_max ? 0 : 1;
}

static void fq_gc(void)
{
        struct remove_flows_ctx cb_ctx = {
                .gc_only = true,
                .reset_cnt = 0,
                .reset_max = FQ_GC_MAX,
        };

        bpf_for_each_map_elem(&fq_nonprio_flows, fq_remove_flows, &cb_ctx, 0);
}

SEC("struct_ops/bpf_fq_reset")
void BPF_PROG(bpf_fq_reset, struct Qdisc *sch)
{
        struct unset_throttled_flows_ctx utf_ctx = {
                .unset_all = true,
        };
        struct remove_flows_ctx rf_ctx = {
                .gc_only = false,
                .reset_cnt = 0,
                .reset_max = NUM_QUEUE,
        };
        struct fq_stashed_flow *sflow;
        u64 hash = 0;

        sch->q.qlen = 0;
        sch->qstats.backlog = 0;

        bpf_for_each_map_elem(&fq_nonprio_flows, fq_remove_flows, &rf_ctx, 0);

        rf_ctx.reset_cnt = 0;
        bpf_for_each_map_elem(&fq_prio_flows, fq_remove_flows, &rf_ctx, 0);
        fq_new_flow(&fq_prio_flows, &sflow, hash);

        bpf_loop(NUM_QUEUE, fq_remove_flows_in_list, NULL, 0);
        q.new_flow_cnt = 0;
        q.old_flow_cnt = 0;

        bpf_loop(NUM_QUEUE, fq_unset_throttled_flows, &utf_ctx, 0);
}

SEC("struct_ops/bpf_fq_init")
int BPF_PROG(bpf_fq_init, struct Qdisc *sch, struct nlattr *opt,
             struct netlink_ext_ack *extack)
{
        struct net_device *dev = sch->dev_queue->dev;
        u32 psched_mtu = dev->mtu + dev->hard_header_len;
        struct fq_stashed_flow *sflow;
        u64 hash = 0;

        if (fq_new_flow(&fq_prio_flows, &sflow, hash) < 0)
                return -ENOMEM;

        sch->limit = 10000;
        q.initial_quantum = 10 * psched_mtu;
        q.quantum = 2 * psched_mtu;
        q.flow_refill_delay = 40;
        q.flow_plimit = 100;
        q.horizon = 10ULL * NSEC_PER_SEC;
        q.horizon_drop = 1;
        q.orphan_mask = 1024 - 1;
        q.timer_slack = 10 * NSEC_PER_USEC;
        q.time_next_delayed_flow = ~0ULL;
        q.unthrottle_latency_ns = 0ULL;
        q.new_flow_cnt = 0;
        q.old_flow_cnt = 0;

        return 0;
}

SEC("struct_ops")
void BPF_PROG(bpf_fq_destroy, struct Qdisc *sch)
{
}

SEC(".struct_ops")
struct Qdisc_ops fq = {
        .enqueue = (void *)bpf_fq_enqueue,
        .dequeue = (void *)bpf_fq_dequeue,
        .reset = (void *)bpf_fq_reset,
        .init = (void *)bpf_fq_init,
        .destroy = (void *)bpf_fq_destroy,
        .id = "bpf_fq",
};