// SPDX-License-Identifier: GPL-2.0-only
/* bpf/cpumap.c
 *
 * Copyright (c) 2017 Jesper Dangaard Brouer, Red Hat Inc.
 */

/**
 * DOC: cpu map
 * The 'cpumap' is primarily used as a backend map for the XDP BPF helper
 * call bpf_redirect_map() and the XDP_REDIRECT action, like 'devmap'.
 *
 * Unlike devmap, which redirects XDP frames out to another NIC device,
 * this map type redirects raw XDP frames to another CPU. The remote
 * CPU will do the SKB allocation and call the normal network stack.
 */
/*
 * This is a scalability and isolation mechanism that allows
 * separating the early driver network XDP layer from the rest of the
 * netstack, and assigning dedicated CPUs for this stage. This
 * basically allows for 10G wirespeed pre-filtering via bpf.
 */
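/* Usage sketch (illustrative only, not compiled here): an XDP program picks
 * a destination CPU and redirects into a BPF_MAP_TYPE_CPUMAP entry via the
 * bpf_redirect_map() helper. A minimal BPF-side example, assuming libbpf
 * BTF-style map definitions and that struct bpf_cpumap_val is available from
 * the UAPI headers; the CPU-selection policy (here a fixed CPU) is
 * hypothetical:
 *
 *        struct {
 *                __uint(type, BPF_MAP_TYPE_CPUMAP);
 *                __uint(max_entries, 64);
 *                __type(key, __u32);
 *                __type(value, struct bpf_cpumap_val);
 *        } cpu_map SEC(".maps");
 *
 *        SEC("xdp")
 *        int xdp_redirect_to_cpu(struct xdp_md *ctx)
 *        {
 *                __u32 cpu = 0;
 *
 *                return bpf_redirect_map(&cpu_map, cpu, XDP_PASS);
 *        }
 *
 * The low bits of the flags argument select the action returned when the
 * map lookup fails (XDP_PASS above, i.e. fall back to the normal stack).
 */
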
#include <linux/bitops.h>
#include <linux/bpf.h>
#include <linux/filter.h>
#include <linux/ptr_ring.h>
#include <net/xdp.h>
#include <net/hotdata.h>

#include <linux/sched.h>
#include <linux/workqueue.h>
#include <linux/kthread.h>
#include <linux/completion.h>
#include <trace/events/xdp.h>
#include <linux/btf_ids.h>

#include <linux/netdevice.h>   /* netif_receive_skb_list */
#include <linux/etherdevice.h> /* eth_type_trans */

/* General idea: XDP packets getting XDP-redirected to another CPU will
 * at most be stored/queued for one driver ->poll() call. It is
 * guaranteed that queueing the frame and the flush operation happen on
 * the same CPU. Thus, the cpu_map_flush operation can deduce via
 * this_cpu_ptr() which queue in bpf_cpu_map_entry contains packets.
 */

#define CPU_MAP_BULK_SIZE 8  /* 8 == one cacheline on 64-bit archs */
struct bpf_cpu_map_entry;
struct bpf_cpu_map;

struct xdp_bulk_queue {
        void *q[CPU_MAP_BULK_SIZE];
        struct list_head flush_node;
        struct bpf_cpu_map_entry *obj;
        unsigned int count;
};

/* Struct for every remote "destination" CPU in map */
struct bpf_cpu_map_entry {
        u32 cpu;    /* kthread CPU and map index */
        int map_id; /* Back reference to map */

        /* XDP can run multiple RX-ring queues, need __percpu enqueue store */
        struct xdp_bulk_queue __percpu *bulkq;

        /* Queue with potential multi-producers, and single-consumer kthread */
        struct ptr_ring *queue;
        struct task_struct *kthread;

        struct bpf_cpumap_val value;
        struct bpf_prog *prog;

        struct completion kthread_running;
        struct rcu_work free_work;
};

struct bpf_cpu_map {
        struct bpf_map map;
        /* Below members specific for map type */
        struct bpf_cpu_map_entry __rcu **cpu_map;
};

static struct bpf_map *cpu_map_alloc(union bpf_attr *attr)
{
        u32 value_size = attr->value_size;
        struct bpf_cpu_map *cmap;

        /* check sanity of attributes */
        if (attr->max_entries == 0 || attr->key_size != 4 ||
            (value_size != offsetofend(struct bpf_cpumap_val, qsize) &&
             value_size != offsetofend(struct bpf_cpumap_val, bpf_prog.fd)) ||
            attr->map_flags & ~BPF_F_NUMA_NODE)
                return ERR_PTR(-EINVAL);

        /* Pre-limit array size based on NR_CPUS, not final CPU check */
        if (attr->max_entries > NR_CPUS)
                return ERR_PTR(-E2BIG);

        cmap = bpf_map_area_alloc(sizeof(*cmap), NUMA_NO_NODE);
        if (!cmap)
                return ERR_PTR(-ENOMEM);

        bpf_map_init_from_attr(&cmap->map, attr);

        /* Alloc array for possible remote "destination" CPUs */
        cmap->cpu_map = bpf_map_area_alloc(cmap->map.max_entries *
                                           sizeof(struct bpf_cpu_map_entry *),
                                           cmap->map.numa_node);
        if (!cmap->cpu_map) {
                bpf_map_area_free(cmap);
                return ERR_PTR(-ENOMEM);
        }

        return &cmap->map;
}

static void __cpu_map_ring_cleanup(struct ptr_ring *ring)
{
        /* The tear-down procedure should have made sure that the queue is
         * empty. See __cpu_map_entry_replace() and the work-queue
         * invoked cpu_map_kthread_stop(). Catch any broken behaviour
         * gracefully and warn once.
         */
        void *ptr;

        while ((ptr = ptr_ring_consume(ring))) {
                WARN_ON_ONCE(1);
                if (unlikely(__ptr_test_bit(0, &ptr))) {
                        __ptr_clear_bit(0, &ptr);
                        kfree_skb(ptr);
                        continue;
                }
                xdp_return_frame(ptr);
        }
}

static void cpu_map_bpf_prog_run_skb(struct bpf_cpu_map_entry *rcpu,
                                     struct list_head *listp,
                                     struct xdp_cpumap_stats *stats)
{
        struct sk_buff *skb, *tmp;
        struct xdp_buff xdp;
        u32 act;
        int err;

        list_for_each_entry_safe(skb, tmp, listp, list) {
                act = bpf_prog_run_generic_xdp(skb, &xdp, rcpu->prog);
                switch (act) {
                case XDP_PASS:
                        break;
                case XDP_REDIRECT:
                        skb_list_del_init(skb);
                        err = xdp_do_generic_redirect(skb->dev, skb, &xdp,
                                                      rcpu->prog);
                        if (unlikely(err)) {
                                kfree_skb(skb);
                                stats->drop++;
                        } else {
                                stats->redirect++;
                        }
                        return;
                default:
                        bpf_warn_invalid_xdp_action(NULL, rcpu->prog, act);
                        fallthrough;
                case XDP_ABORTED:
                        trace_xdp_exception(skb->dev, rcpu->prog, act);
                        fallthrough;
                case XDP_DROP:
                        skb_list_del_init(skb);
                        kfree_skb(skb);
                        stats->drop++;
                        return;
                }
        }
}

static int cpu_map_bpf_prog_run_xdp(struct bpf_cpu_map_entry *rcpu,
                                    void **frames, int n,
                                    struct xdp_cpumap_stats *stats)
{
        struct xdp_rxq_info rxq = {};
        struct xdp_buff xdp;
        int i, nframes = 0;

        xdp_set_return_frame_no_direct();
        xdp.rxq = &rxq;

        for (i = 0; i < n; i++) {
                struct xdp_frame *xdpf = frames[i];
                u32 act;
                int err;

                rxq.dev = xdpf->dev_rx;
                rxq.mem = xdpf->mem;
                /* TODO: report queue_index to xdp_rxq_info */

                xdp_convert_frame_to_buff(xdpf, &xdp);

                act = bpf_prog_run_xdp(rcpu->prog, &xdp);
                switch (act) {
                case XDP_PASS:
                        err = xdp_update_frame_from_buff(&xdp, xdpf);
                        if (err < 0) {
                                xdp_return_frame(xdpf);
                                stats->drop++;
                        } else {
                                frames[nframes++] = xdpf;
                                stats->pass++;
                        }
                        break;
                case XDP_REDIRECT:
                        err = xdp_do_redirect(xdpf->dev_rx, &xdp,
                                              rcpu->prog);
                        if (unlikely(err)) {
                                xdp_return_frame(xdpf);
                                stats->drop++;
                        } else {
                                stats->redirect++;
                        }
                        break;
                default:
                        bpf_warn_invalid_xdp_action(NULL, rcpu->prog, act);
                        fallthrough;
                case XDP_DROP:
                        xdp_return_frame(xdpf);
                        stats->drop++;
                        break;
                }
        }

        xdp_clear_return_frame_no_direct();

        return nframes;
}

#define CPUMAP_BATCH 8

static int cpu_map_bpf_prog_run(struct bpf_cpu_map_entry *rcpu, void **frames,
                                int xdp_n, struct xdp_cpumap_stats *stats,
                                struct list_head *list)
{
        struct bpf_net_context __bpf_net_ctx, *bpf_net_ctx;
        int nframes;

        if (!rcpu->prog)
                return xdp_n;

        rcu_read_lock_bh();
        bpf_net_ctx = bpf_net_ctx_set(&__bpf_net_ctx);

        nframes = cpu_map_bpf_prog_run_xdp(rcpu, frames, xdp_n, stats);

        if (stats->redirect)
                xdp_do_flush();

        if (unlikely(!list_empty(list)))
                cpu_map_bpf_prog_run_skb(rcpu, list, stats);

        bpf_net_ctx_clear(bpf_net_ctx);
        rcu_read_unlock_bh(); /* resched point, may call do_softirq() */

        return nframes;
}

static int cpu_map_kthread_run(void *data)
{
        struct bpf_cpu_map_entry *rcpu = data;
        unsigned long last_qs = jiffies;

        complete(&rcpu->kthread_running);
        set_current_state(TASK_INTERRUPTIBLE);

        /* When the kthread is given the stop order, the rcpu has already been
         * disconnected from the map, thus no new packets can enter. Remaining
         * in-flight per-CPU stored packets are flushed to this queue. Wait,
         * honoring the kthread_stop signal, until the queue is empty.
         */
        while (!kthread_should_stop() || !__ptr_ring_empty(rcpu->queue)) {
                struct xdp_cpumap_stats stats = {}; /* zero stats */
                unsigned int kmem_alloc_drops = 0, sched = 0;
                gfp_t gfp = __GFP_ZERO | GFP_ATOMIC;
                int i, n, m, nframes, xdp_n;
                void *frames[CPUMAP_BATCH];
                void *skbs[CPUMAP_BATCH];
                LIST_HEAD(list);

                /* Release CPU reschedule checks */
                if (__ptr_ring_empty(rcpu->queue)) {
                        set_current_state(TASK_INTERRUPTIBLE);
                        /* Recheck to avoid lost wake-up */
                        if (__ptr_ring_empty(rcpu->queue)) {
                                schedule();
                                sched = 1;
                                last_qs = jiffies;
                        } else {
                                __set_current_state(TASK_RUNNING);
                        }
                } else {
                        rcu_softirq_qs_periodic(last_qs);
                        sched = cond_resched();
                }

                /*
                 * The bpf_cpu_map_entry is single consumer, with this
                 * kthread CPU pinned. Lockless access to ptr_ring
                 * consume side valid as no-resize allowed of queue.
                 */
                n = __ptr_ring_consume_batched(rcpu->queue, frames,
                                               CPUMAP_BATCH);
                for (i = 0, xdp_n = 0; i < n; i++) {
                        void *f = frames[i];
                        struct page *page;

                        if (unlikely(__ptr_test_bit(0, &f))) {
                                struct sk_buff *skb = f;

                                __ptr_clear_bit(0, &skb);
                                list_add_tail(&skb->list, &list);
                                continue;
                        }

                        frames[xdp_n++] = f;
                        page = virt_to_page(f);

                        /* Bring struct page memory area to curr CPU. Read by
                         * build_skb_around via page_is_pfmemalloc(), and when
                         * freed written by page_frag_free call.
                         */
                        prefetchw(page);
                }

                /* Support running another XDP prog on this CPU */
                nframes = cpu_map_bpf_prog_run(rcpu, frames, xdp_n, &stats, &list);
                if (nframes) {
                        m = kmem_cache_alloc_bulk(net_hotdata.skbuff_cache,
                                                  gfp, nframes, skbs);
                        if (unlikely(m == 0)) {
                                for (i = 0; i < nframes; i++)
                                        skbs[i] = NULL; /* effect: xdp_return_frame */
                                kmem_alloc_drops += nframes;
                        }
                }

                local_bh_disable();
                for (i = 0; i < nframes; i++) {
                        struct xdp_frame *xdpf = frames[i];
                        struct sk_buff *skb = skbs[i];

                        skb = __xdp_build_skb_from_frame(xdpf, skb,
                                                         xdpf->dev_rx);
                        if (!skb) {
                                xdp_return_frame(xdpf);
                                continue;
                        }

                        list_add_tail(&skb->list, &list);
                }
                netif_receive_skb_list(&list);

                /* Feedback loop via tracepoint */
                trace_xdp_cpumap_kthread(rcpu->map_id, n, kmem_alloc_drops,
                                         sched, &stats);

                local_bh_enable(); /* resched point, may call do_softirq() */
        }
        __set_current_state(TASK_RUNNING);

        return 0;
}

static int __cpu_map_load_bpf_program(struct bpf_cpu_map_entry *rcpu,
                                      struct bpf_map *map, int fd)
{
        struct bpf_prog *prog;

        prog = bpf_prog_get_type(fd, BPF_PROG_TYPE_XDP);
        if (IS_ERR(prog))
                return PTR_ERR(prog);

        if (prog->expected_attach_type != BPF_XDP_CPUMAP ||
            !bpf_prog_map_compatible(map, prog)) {
                bpf_prog_put(prog);
                return -EINVAL;
        }

        rcpu->value.bpf_prog.id = prog->aux->id;
        rcpu->prog = prog;

        return 0;
}

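/* Usage sketch (illustrative only): a program supplied through the
 * bpf_prog.fd member of struct bpf_cpumap_val must be BPF_PROG_TYPE_XDP
 * with expected_attach_type BPF_XDP_CPUMAP, or the check above rejects it.
 * A minimal BPF-side example, assuming a recent libbpf where the
 * "xdp/cpumap" section name selects this attach type:
 *
 *        SEC("xdp/cpumap")
 *        int xdp_on_remote_cpu(struct xdp_md *ctx)
 *        {
 *                return XDP_PASS;
 *        }
 *
 * Frames this program passes are the ones cpu_map_kthread_run() turns into
 * SKBs and hands to netif_receive_skb_list().
 */
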
static struct bpf_cpu_map_entry *
__cpu_map_entry_alloc(struct bpf_map *map, struct bpf_cpumap_val *value,
                      u32 cpu)
{
        int numa, err, i, fd = value->bpf_prog.fd;
        gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
        struct bpf_cpu_map_entry *rcpu;
        struct xdp_bulk_queue *bq;

        /* Have map->numa_node, but choose node of redirect target CPU */
        numa = cpu_to_node(cpu);

        rcpu = bpf_map_kmalloc_node(map, sizeof(*rcpu), gfp | __GFP_ZERO, numa);
        if (!rcpu)
                return NULL;

        /* Alloc percpu bulkq */
        rcpu->bulkq = bpf_map_alloc_percpu(map, sizeof(*rcpu->bulkq),
                                           sizeof(void *), gfp);
        if (!rcpu->bulkq)
                goto free_rcu;

        for_each_possible_cpu(i) {
                bq = per_cpu_ptr(rcpu->bulkq, i);
                bq->obj = rcpu;
        }

        /* Alloc queue */
        rcpu->queue = bpf_map_kmalloc_node(map, sizeof(*rcpu->queue), gfp,
                                           numa);
        if (!rcpu->queue)
                goto free_bulkq;

        err = ptr_ring_init(rcpu->queue, value->qsize, gfp);
        if (err)
                goto free_queue;

        rcpu->cpu = cpu;
        rcpu->map_id = map->id;
        rcpu->value.qsize = value->qsize;

        if (fd > 0 && __cpu_map_load_bpf_program(rcpu, map, fd))
                goto free_ptr_ring;

        /* Setup kthread */
        init_completion(&rcpu->kthread_running);
        rcpu->kthread = kthread_create_on_node(cpu_map_kthread_run, rcpu, numa,
                                               "cpumap/%d/map:%d", cpu,
                                               map->id);
        if (IS_ERR(rcpu->kthread))
                goto free_prog;

        /* Make sure kthread runs on a single CPU */
        kthread_bind(rcpu->kthread, cpu);
        wake_up_process(rcpu->kthread);

        /* Make sure kthread has been running, so kthread_stop() will not
         * stop the kthread prematurely and all pending frames or skbs
         * will be handled by the kthread before kthread_stop() returns.
         */
        wait_for_completion(&rcpu->kthread_running);

        return rcpu;

free_prog:
        if (rcpu->prog)
                bpf_prog_put(rcpu->prog);
free_ptr_ring:
        ptr_ring_cleanup(rcpu->queue, NULL);
free_queue:
        kfree(rcpu->queue);
free_bulkq:
        free_percpu(rcpu->bulkq);
free_rcu:
        kfree(rcpu);
        return NULL;
}

static void __cpu_map_entry_free(struct work_struct *work)
{
        struct bpf_cpu_map_entry *rcpu;

        /* This cpu_map_entry has been disconnected from the map and one
         * RCU grace period has elapsed. Thus, XDP cannot queue any
         * new packets and cannot change/set flush_needed that can
         * find this entry.
         */
        rcpu = container_of(to_rcu_work(work), struct bpf_cpu_map_entry, free_work);

        /* kthread_stop will wake_up_process and wait for it to complete.
         * cpu_map_kthread_run() makes sure the pointer ring is empty
         * before exiting.
         */
        kthread_stop(rcpu->kthread);

        if (rcpu->prog)
                bpf_prog_put(rcpu->prog);
        /* The queue should be empty at this point */
        __cpu_map_ring_cleanup(rcpu->queue);
        ptr_ring_cleanup(rcpu->queue, NULL);
        kfree(rcpu->queue);
        free_percpu(rcpu->bulkq);
        kfree(rcpu);
}

/* After the xchg of the bpf_cpu_map_entry pointer, we need to make sure the old
 * entry is no longer in use before freeing. We use queue_rcu_work() to call
 * __cpu_map_entry_free() in a separate workqueue after waiting for an RCU grace
 * period. This means that (a) all pending enqueue and flush operations have
 * completed (because of the RCU callback), and (b) we are in a workqueue
 * context where we can stop the kthread and wait for it to exit before freeing
 * everything.
 */
static void __cpu_map_entry_replace(struct bpf_cpu_map *cmap,
                                    u32 key_cpu, struct bpf_cpu_map_entry *rcpu)
{
        struct bpf_cpu_map_entry *old_rcpu;

        old_rcpu = unrcu_pointer(xchg(&cmap->cpu_map[key_cpu], RCU_INITIALIZER(rcpu)));
        if (old_rcpu) {
                INIT_RCU_WORK(&old_rcpu->free_work, __cpu_map_entry_free);
                queue_rcu_work(system_wq, &old_rcpu->free_work);
        }
}

static long cpu_map_delete_elem(struct bpf_map *map, void *key)
{
        struct bpf_cpu_map *cmap = container_of(map, struct bpf_cpu_map, map);
        u32 key_cpu = *(u32 *)key;

        if (key_cpu >= map->max_entries)
                return -EINVAL;

        /* notice caller map_delete_elem() uses rcu_read_lock() */
        __cpu_map_entry_replace(cmap, key_cpu, NULL);
        return 0;
}

static long cpu_map_update_elem(struct bpf_map *map, void *key, void *value,
                                u64 map_flags)
{
        struct bpf_cpu_map *cmap = container_of(map, struct bpf_cpu_map, map);
        struct bpf_cpumap_val cpumap_value = {};
        struct bpf_cpu_map_entry *rcpu;
        /* Array index key corresponds to CPU number */
        u32 key_cpu = *(u32 *)key;

        memcpy(&cpumap_value, value, map->value_size);

        if (unlikely(map_flags > BPF_EXIST))
                return -EINVAL;
        if (unlikely(key_cpu >= cmap->map.max_entries))
                return -E2BIG;
        if (unlikely(map_flags == BPF_NOEXIST))
                return -EEXIST;
        if (unlikely(cpumap_value.qsize > 16384)) /* sanity limit on qsize */
                return -EOVERFLOW;

        /* Make sure CPU is a valid possible cpu */
        if (key_cpu >= nr_cpumask_bits || !cpu_possible(key_cpu))
                return -ENODEV;

        if (cpumap_value.qsize == 0) {
                rcpu = NULL; /* Same as deleting */
        } else {
                /* Updating qsize causes re-allocation of bpf_cpu_map_entry */
                rcpu = __cpu_map_entry_alloc(map, &cpumap_value, key_cpu);
                if (!rcpu)
                        return -ENOMEM;
        }
        rcu_read_lock();
        __cpu_map_entry_replace(cmap, key_cpu, rcpu);
        rcu_read_unlock();
        return 0;
}

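/* Usage sketch (illustrative only): from user space an entry is configured
 * with bpf_map_update_elem(), where the value is a struct bpf_cpumap_val;
 * qsize sets the per-CPU ptr_ring size and bpf_prog.fd optionally attaches
 * a BPF_XDP_CPUMAP program (0 means no program). The prog_fd and cpumap_fd
 * below are hypothetical descriptors obtained elsewhere via libbpf:
 *
 *        struct bpf_cpumap_val val = {
 *                .qsize = 2048,
 *                .bpf_prog.fd = prog_fd,
 *        };
 *        __u32 cpu = 2;
 *        int err;
 *
 *        err = bpf_map_update_elem(cpumap_fd, &cpu, &val, 0);
 *
 * As cpu_map_update_elem() above shows, the update fails for CPUs that are
 * not possible, for qsize above 16384, and for BPF_NOEXIST; writing
 * qsize == 0 (or deleting the element) tears the entry down again.
 */
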
static void cpu_map_free(struct bpf_map *map)
{
        struct bpf_cpu_map *cmap = container_of(map, struct bpf_cpu_map, map);
        u32 i;

        /* At this point bpf_prog->aux->refcnt == 0 and this map->refcnt == 0,
         * so the bpf programs (can be more than one that used this map) were
         * disconnected from events. Wait for outstanding critical sections in
         * these programs to complete. synchronize_rcu() below not only
         * guarantees no further "XDP/bpf-side" reads against
         * bpf_cpu_map->cpu_map, but also ensures pending flush operations
         * (if any) are completed.
         */
        synchronize_rcu();

        /* The only possible user of bpf_cpu_map_entry is
         * cpu_map_kthread_run().
         */
        for (i = 0; i < cmap->map.max_entries; i++) {
                struct bpf_cpu_map_entry *rcpu;

                rcpu = rcu_dereference_raw(cmap->cpu_map[i]);
                if (!rcpu)
                        continue;

                /* Stop kthread and cleanup entry directly */
                __cpu_map_entry_free(&rcpu->free_work.work);
        }
        bpf_map_area_free(cmap->cpu_map);
        bpf_map_area_free(cmap);
}

/* Elements are kept alive by RCU; either by rcu_read_lock() (from syscall) or
 * by local_bh_disable() (from XDP calls inside NAPI). The
 * rcu_read_lock_bh_held() below makes lockdep accept both.
 */
static void *__cpu_map_lookup_elem(struct bpf_map *map, u32 key)
{
        struct bpf_cpu_map *cmap = container_of(map, struct bpf_cpu_map, map);
        struct bpf_cpu_map_entry *rcpu;

        if (key >= map->max_entries)
                return NULL;

        rcpu = rcu_dereference_check(cmap->cpu_map[key],
                                     rcu_read_lock_bh_held());
        return rcpu;
}

static void *cpu_map_lookup_elem(struct bpf_map *map, void *key)
{
        struct bpf_cpu_map_entry *rcpu =
                __cpu_map_lookup_elem(map, *(u32 *)key);

        return rcpu ? &rcpu->value : NULL;
}

static int cpu_map_get_next_key(struct bpf_map *map, void *key, void *next_key)
{
        struct bpf_cpu_map *cmap = container_of(map, struct bpf_cpu_map, map);
        u32 index = key ? *(u32 *)key : U32_MAX;
        u32 *next = next_key;

        if (index >= cmap->map.max_entries) {
                *next = 0;
                return 0;
        }

        if (index == cmap->map.max_entries - 1)
                return -ENOENT;
        *next = index + 1;
        return 0;
}

static long cpu_map_redirect(struct bpf_map *map, u64 index, u64 flags)
{
        return __bpf_xdp_redirect_map(map, index, flags, 0,
                                      __cpu_map_lookup_elem);
}

static u64 cpu_map_mem_usage(const struct bpf_map *map)
{
        u64 usage = sizeof(struct bpf_cpu_map);

        /* Currently the dynamically allocated elements are not counted */
        usage += (u64)map->max_entries * sizeof(struct bpf_cpu_map_entry *);
        return usage;
}

BTF_ID_LIST_SINGLE(cpu_map_btf_ids, struct, bpf_cpu_map)
const struct bpf_map_ops cpu_map_ops = {
        .map_meta_equal   = bpf_map_meta_equal,
        .map_alloc        = cpu_map_alloc,
        .map_free         = cpu_map_free,
        .map_delete_elem  = cpu_map_delete_elem,
        .map_update_elem  = cpu_map_update_elem,
        .map_lookup_elem  = cpu_map_lookup_elem,
        .map_get_next_key = cpu_map_get_next_key,
        .map_check_btf    = map_check_no_btf,
        .map_mem_usage    = cpu_map_mem_usage,
        .map_btf_id       = &cpu_map_btf_ids[0],
        .map_redirect     = cpu_map_redirect,
};

static void bq_flush_to_queue(struct xdp_bulk_queue *bq)
{
        struct bpf_cpu_map_entry *rcpu = bq->obj;
        unsigned int processed = 0, drops = 0;
        const int to_cpu = rcpu->cpu;
        struct ptr_ring *q;
        int i;

        if (unlikely(!bq->count))
                return;

        q = rcpu->queue;
        spin_lock(&q->producer_lock);

        for (i = 0; i < bq->count; i++) {
                struct xdp_frame *xdpf = bq->q[i];
                int err;

                err = __ptr_ring_produce(q, xdpf);
                if (err) {
                        drops++;
                        xdp_return_frame_rx_napi(xdpf);
                }
                processed++;
        }
        bq->count = 0;
        spin_unlock(&q->producer_lock);

        __list_del_clearprev(&bq->flush_node);

        /* Feedback loop via tracepoints */
        trace_xdp_cpumap_enqueue(rcpu->map_id, processed, drops, to_cpu);
}

/* Runs under RCU-read-side, plus in softirq under NAPI protection.
 * Thus, safe percpu variable access.
 */
static void bq_enqueue(struct bpf_cpu_map_entry *rcpu, struct xdp_frame *xdpf)
{
        struct xdp_bulk_queue *bq = this_cpu_ptr(rcpu->bulkq);

        if (unlikely(bq->count == CPU_MAP_BULK_SIZE))
                bq_flush_to_queue(bq);

        /* Notice, the xdp_buff/page MUST be queued here, long enough for the
         * driver code invoking us to finish, due to driver recycle tricks
         * based on page-refcnt (e.g. ixgbe).
         *
         * Thus, the incoming xdp_frame is always queued here (else we race
         * with another CPU on page-refcnt and remaining driver code).
         * Queue time is very short, as the driver will invoke the flush
         * operation when completing its napi->poll call.
         */
        bq->q[bq->count++] = xdpf;

        if (!bq->flush_node.prev) {
                struct list_head *flush_list = bpf_net_ctx_get_cpu_map_flush_list();

                list_add(&bq->flush_node, flush_list);
        }
}

int cpu_map_enqueue(struct bpf_cpu_map_entry *rcpu, struct xdp_frame *xdpf,
                    struct net_device *dev_rx)
{
        /* Info needed when constructing SKB on remote CPU */
        xdpf->dev_rx = dev_rx;

        bq_enqueue(rcpu, xdpf);
        return 0;
}

int cpu_map_generic_redirect(struct bpf_cpu_map_entry *rcpu,
                             struct sk_buff *skb)
{
        int ret;

        __skb_pull(skb, skb->mac_len);
        skb_set_redirected(skb, false);
        __ptr_set_bit(0, &skb);

        ret = ptr_ring_produce(rcpu->queue, skb);
        if (ret < 0)
                goto trace;

        wake_up_process(rcpu->kthread);
trace:
        trace_xdp_cpumap_enqueue(rcpu->map_id, !ret, !!ret, rcpu->cpu);
        return ret;
}

void __cpu_map_flush(struct list_head *flush_list)
{
        struct xdp_bulk_queue *bq, *tmp;

        list_for_each_entry_safe(bq, tmp, flush_list, flush_node) {
                bq_flush_to_queue(bq);

                /* If already running, costs spin_lock_irqsave + smp_mb() */
                wake_up_process(bq->obj->kthread);
        }
}