// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
/*
 * Ring buffer operations.
 *
 * Copyright (C) 2020 Facebook, Inc.
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <linux/err.h>
#include <linux/bpf.h>
#include <asm/barrier.h>
#include <sys/mman.h>
#include <sys/epoll.h>
#include <time.h>

#include "libbpf.h"
#include "libbpf_internal.h"
#include "bpf.h"
#include "str_error.h"

struct ring {
	ring_buffer_sample_fn sample_cb;
	void *ctx;
	void *data;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	unsigned long mask;
	int map_fd;
};

struct ring_buffer {
	struct epoll_event *events;
	struct ring **rings;
	size_t page_size;
	int epoll_fd;
	int ring_cnt;
};

struct user_ring_buffer {
	struct epoll_event event;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	void *data;
	unsigned long mask;
	size_t page_size;
	int map_fd;
	int epoll_fd;
};

/* 8-byte ring buffer header structure */
struct ringbuf_hdr {
	__u32 len;
	__u32 pad;
};

static void ringbuf_free_ring(struct ring_buffer *rb, struct ring *r)
{
	if (r->consumer_pos) {
		munmap(r->consumer_pos, rb->page_size);
		r->consumer_pos = NULL;
	}
	if (r->producer_pos) {
		munmap(r->producer_pos, rb->page_size + 2 * (r->mask + 1));
		r->producer_pos = NULL;
	}

	free(r);
}

/* Add extra RINGBUF maps to this ring buffer manager */
int ring_buffer__add(struct ring_buffer *rb, int map_fd,
		     ring_buffer_sample_fn sample_cb, void *ctx)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	struct epoll_event *e;
	struct ring *r;
	__u64 mmap_sz;
	void *tmp;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_map_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("ringbuf: failed to get map info for fd=%d: %s\n",
			map_fd, errstr(err));
		return libbpf_err(err);
	}

	if (info.type != BPF_MAP_TYPE_RINGBUF) {
		pr_warn("ringbuf: map fd=%d is not BPF_MAP_TYPE_RINGBUF\n",
			map_fd);
		return libbpf_err(-EINVAL);
	}

	tmp = libbpf_reallocarray(rb->rings, rb->ring_cnt + 1, sizeof(*rb->rings));
	if (!tmp)
		return libbpf_err(-ENOMEM);
	rb->rings = tmp;

	tmp = libbpf_reallocarray(rb->events, rb->ring_cnt + 1, sizeof(*rb->events));
	if (!tmp)
		return libbpf_err(-ENOMEM);
	rb->events = tmp;

	r = calloc(1, sizeof(*r));
	if (!r)
		return libbpf_err(-ENOMEM);
	rb->rings[rb->ring_cnt] = r;

	r->map_fd = map_fd;
	r->sample_cb = sample_cb;
	r->ctx = ctx;
	r->mask = info.max_entries - 1;

	/* Map writable consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("ringbuf: failed to mmap consumer page for map fd=%d: %s\n",
			map_fd, errstr(err));
		goto err_out;
	}
	r->consumer_pos = tmp;

	/* Map read-only producer page and data pages. We map twice as big
	 * data size to allow simple reading of samples that wrap around the
	 * end of a ring buffer. See kernel implementation for details.
	 */
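	/* Illustrative example (values are hypothetical): with a 4096-byte
	 * page size and max_entries = 4096, mmap_sz below is 4096 + 2 * 4096.
	 * Because the kernel maps the same data pages twice back to back, a
	 * record that starts near the end of the buffer (say at offset 4090)
	 * can be read as one contiguous chunk; its tail simply lands in the
	 * second, aliased copy of the data pages.
	 */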
	mmap_sz = rb->page_size + 2 * (__u64)info.max_entries;
	if (mmap_sz != (__u64)(size_t)mmap_sz) {
		err = -E2BIG;
		pr_warn("ringbuf: ring buffer size (%u) is too big\n", info.max_entries);
		goto err_out;
	}
	tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ, MAP_SHARED, map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %s\n",
			map_fd, errstr(err));
		goto err_out;
	}
	r->producer_pos = tmp;
	r->data = tmp + rb->page_size;

	e = &rb->events[rb->ring_cnt];
	memset(e, 0, sizeof(*e));

	e->events = EPOLLIN;
	e->data.fd = rb->ring_cnt;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
		err = -errno;
		pr_warn("ringbuf: failed to epoll add map fd=%d: %s\n",
			map_fd, errstr(err));
		goto err_out;
	}

	rb->ring_cnt++;
	return 0;

err_out:
	ringbuf_free_ring(rb, r);
	return libbpf_err(err);
}

void ring_buffer__free(struct ring_buffer *rb)
{
	int i;

	if (!rb)
		return;

	for (i = 0; i < rb->ring_cnt; ++i)
		ringbuf_free_ring(rb, rb->rings[i]);
	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb->events);
	free(rb->rings);
	free(rb);
}

struct ring_buffer *
ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
		 const struct ring_buffer_opts *opts)
{
	struct ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, ring_buffer_opts))
		return errno = EINVAL, NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return errno = ENOMEM, NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("ringbuf: failed to create epoll instance: %s\n", errstr(err));
		goto err_out;
	}

	err = ring_buffer__add(rb, map_fd, sample_cb, ctx);
	if (err)
		goto err_out;

	return rb;

err_out:
	ring_buffer__free(rb);
	return errno = -err, NULL;
}

static inline int roundup_len(__u32 len)
{
	/* clear out top 2 bits (discard and busy, if set) */
	len <<= 2;
	len >>= 2;
	/* add length prefix */
	len += BPF_RINGBUF_HDR_SZ;
	/* round up to 8 byte alignment */
	return (len + 7) / 8 * 8;
}

static int64_t ringbuf_process_ring(struct ring *r, size_t n)
{
	int *len_ptr, len, err;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	int64_t cnt = 0;
	unsigned long cons_pos, prod_pos;
	bool got_new_data;
	void *sample;

	cons_pos = smp_load_acquire(r->consumer_pos);
	do {
		got_new_data = false;
		prod_pos = smp_load_acquire(r->producer_pos);
		while (cons_pos < prod_pos) {
			len_ptr = r->data + (cons_pos & r->mask);
			len = smp_load_acquire(len_ptr);

			/* sample not committed yet, bail out for now */
			if (len & BPF_RINGBUF_BUSY_BIT)
				goto done;

			got_new_data = true;
			cons_pos += roundup_len(len);

			if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
				sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
				err = r->sample_cb(r->ctx, sample, len);
				if (err < 0) {
					/* update consumer pos and bail out */
					smp_store_release(r->consumer_pos,
							  cons_pos);
					return err;
				}
				cnt++;
			}

			smp_store_release(r->consumer_pos, cons_pos);

			if (cnt >= n)
				goto done;
		}
	} while (got_new_data);
done:
	return cnt;
}
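
/* Minimal consumer-side usage sketch (illustrative only, not compiled as
 * part of this file; "map_fd", "stop" and "handle_event" are hypothetical
 * application-side names):
 *
 *	static int handle_event(void *ctx, void *data, size_t size)
 *	{
 *		// process one committed sample; a negative return value
 *		// aborts consumption and is propagated to the caller
 *		return 0;
 *	}
 *
 *	struct ring_buffer *rb = ring_buffer__new(map_fd, handle_event, NULL, NULL);
 *
 *	if (!rb)
 *		return -errno;
 *	while (!stop)
 *		ring_buffer__poll(rb, 100);	// wait up to 100 ms, then consume
 *	ring_buffer__free(rb);
 */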

/* Consume available ring buffer(s) data without event polling, up to n
 * records.
 *
 * Returns number of records consumed across all registered ring buffers (or
 * n, whichever is less), or negative number if any of the callbacks return
 * error.
 */
int ring_buffer__consume_n(struct ring_buffer *rb, size_t n)
{
	int64_t err, res = 0;
	int i;

	for (i = 0; i < rb->ring_cnt; i++) {
		struct ring *ring = rb->rings[i];

		err = ringbuf_process_ring(ring, n);
		if (err < 0)
			return libbpf_err(err);
		res += err;
		n -= err;

		if (n == 0)
			break;
	}
	return res > INT_MAX ? INT_MAX : res;
}

/* Consume available ring buffer(s) data without event polling.
 * Returns number of records consumed across all registered ring buffers (or
 * INT_MAX, whichever is less), or negative number if any of the callbacks
 * return error.
 */
int ring_buffer__consume(struct ring_buffer *rb)
{
	int64_t err, res = 0;
	int i;

	for (i = 0; i < rb->ring_cnt; i++) {
		struct ring *ring = rb->rings[i];

		err = ringbuf_process_ring(ring, INT_MAX);
		if (err < 0)
			return libbpf_err(err);
		res += err;
		if (res > INT_MAX) {
			res = INT_MAX;
			break;
		}
	}
	return res;
}

/* Poll for available data and consume records, if any are available.
 * Returns number of records consumed (or INT_MAX, whichever is less), or
 * negative number, if any of the registered callbacks returned error.
 */
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
{
	int i, cnt;
	int64_t err, res = 0;

	cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
	if (cnt < 0)
		return libbpf_err(-errno);

	for (i = 0; i < cnt; i++) {
		__u32 ring_id = rb->events[i].data.fd;
		struct ring *ring = rb->rings[ring_id];

		err = ringbuf_process_ring(ring, INT_MAX);
		if (err < 0)
			return libbpf_err(err);
		res += err;
	}
	if (res > INT_MAX)
		res = INT_MAX;
	return res;
}

/* Get an fd that can be used to sleep until data is available in the ring(s) */
int ring_buffer__epoll_fd(const struct ring_buffer *rb)
{
	return rb->epoll_fd;
}

struct ring *ring_buffer__ring(struct ring_buffer *rb, unsigned int idx)
{
	if (idx >= rb->ring_cnt)
		return errno = ERANGE, NULL;

	return rb->rings[idx];
}

unsigned long ring__consumer_pos(const struct ring *r)
{
	/* Synchronizes with smp_store_release() in ringbuf_process_ring(). */
	return smp_load_acquire(r->consumer_pos);
}

unsigned long ring__producer_pos(const struct ring *r)
{
	/* Synchronizes with smp_store_release() in __bpf_ringbuf_reserve() in
	 * the kernel.
	 */
	return smp_load_acquire(r->producer_pos);
}

size_t ring__avail_data_size(const struct ring *r)
{
	unsigned long cons_pos, prod_pos;

	cons_pos = ring__consumer_pos(r);
	prod_pos = ring__producer_pos(r);
	return prod_pos - cons_pos;
}

size_t ring__size(const struct ring *r)
{
	return r->mask + 1;
}

int ring__map_fd(const struct ring *r)
{
	return r->map_fd;
}

int ring__consume_n(struct ring *r, size_t n)
{
	int64_t res;

	res = ringbuf_process_ring(r, n);
	if (res < 0)
		return libbpf_err(res);

	return res > INT_MAX ? INT_MAX : res;
}
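
/* Illustrative note (index 0 is just an example): ring__consume_n() and
 * ring__consume() operate on a single ring obtained from the manager via
 * ring_buffer__ring():
 *
 *	struct ring *r = ring_buffer__ring(rb, 0);
 *
 *	if (r)
 *		ring__consume(r);
 */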

int ring__consume(struct ring *r)
{
	return ring__consume_n(r, INT_MAX);
}

static void user_ringbuf_unmap_ring(struct user_ring_buffer *rb)
{
	if (rb->consumer_pos) {
		munmap(rb->consumer_pos, rb->page_size);
		rb->consumer_pos = NULL;
	}
	if (rb->producer_pos) {
		munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
		rb->producer_pos = NULL;
	}
}

void user_ring_buffer__free(struct user_ring_buffer *rb)
{
	if (!rb)
		return;

	user_ringbuf_unmap_ring(rb);

	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb);
}

static int user_ringbuf_map(struct user_ring_buffer *rb, int map_fd)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	__u64 mmap_sz;
	void *tmp;
	struct epoll_event *rb_epoll;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_map_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("user ringbuf: failed to get map info for fd=%d: %s\n",
			map_fd, errstr(err));
		return err;
	}

	if (info.type != BPF_MAP_TYPE_USER_RINGBUF) {
		pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n", map_fd);
		return -EINVAL;
	}

	rb->map_fd = map_fd;
	rb->mask = info.max_entries - 1;

	/* Map read-only consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %s\n",
			map_fd, errstr(err));
		return err;
	}
	rb->consumer_pos = tmp;

	/* Map read-write the producer page and data pages. We map the data
	 * region as twice the total size of the ring buffer to allow the
	 * simple reading and writing of samples that wrap around the end of
	 * the buffer. See the kernel implementation for details.
	 */
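	/* Note that the protections are the mirror image of the kernel-producer
	 * ring buffer mapped in ring_buffer__add(): there the consumer page is
	 * writable and the producer/data pages are read-only, while here user
	 * space is the producer, so it only reads the consumer page and writes
	 * the producer page and data pages.
	 */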
	mmap_sz = rb->page_size + 2 * (__u64)info.max_entries;
	if (mmap_sz != (__u64)(size_t)mmap_sz) {
		pr_warn("user ringbuf: ring buf size (%u) is too big\n", info.max_entries);
		return -E2BIG;
	}
	tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ | PROT_WRITE, MAP_SHARED,
		   map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %s\n",
			map_fd, errstr(err));
		return err;
	}

	rb->producer_pos = tmp;
	rb->data = tmp + rb->page_size;

	rb_epoll = &rb->event;
	rb_epoll->events = EPOLLOUT;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
		err = -errno;
		pr_warn("user ringbuf: failed to epoll add map fd=%d: %s\n", map_fd, errstr(err));
		return err;
	}

	return 0;
}

struct user_ring_buffer *
user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts)
{
	struct user_ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, user_ring_buffer_opts))
		return errno = EINVAL, NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return errno = ENOMEM, NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("user ringbuf: failed to create epoll instance: %s\n", errstr(err));
		goto err_out;
	}

	err = user_ringbuf_map(rb, map_fd);
	if (err)
		goto err_out;

	return rb;

err_out:
	user_ring_buffer__free(rb);
	return errno = -err, NULL;
}

static void user_ringbuf_commit(struct user_ring_buffer *rb, void *sample, bool discard)
{
	__u32 new_len;
	struct ringbuf_hdr *hdr;
	uintptr_t hdr_offset;

	hdr_offset = rb->mask + 1 + (sample - rb->data) - BPF_RINGBUF_HDR_SZ;
	hdr = rb->data + (hdr_offset & rb->mask);

	new_len = hdr->len & ~BPF_RINGBUF_BUSY_BIT;
	if (discard)
		new_len |= BPF_RINGBUF_DISCARD_BIT;

	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	__atomic_exchange_n(&hdr->len, new_len, __ATOMIC_ACQ_REL);
}

void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample)
{
	user_ringbuf_commit(rb, sample, true);
}

void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
{
	user_ringbuf_commit(rb, sample, false);
}

void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
{
	__u32 avail_size, total_size, max_size;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	__u64 cons_pos, prod_pos;
	struct ringbuf_hdr *hdr;

	/* The top two bits are used as special flags */
	if (size & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT))
		return errno = E2BIG, NULL;

	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	cons_pos = smp_load_acquire(rb->consumer_pos);
	/* Synchronizes with smp_store_release() in user_ringbuf_commit() */
	prod_pos = smp_load_acquire(rb->producer_pos);

	max_size = rb->mask + 1;
	avail_size = max_size - (prod_pos - cons_pos);
	/* Round up total size to a multiple of 8. */
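	/* For example (hypothetical value), size = 100 gives
	 * total_size = (100 + 8 + 7) / 8 * 8 = 112, i.e. the 8-byte header
	 * plus the payload, rounded up to an 8-byte boundary.
	 */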
	total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8;

	if (total_size > max_size)
		return errno = E2BIG, NULL;

	if (avail_size < total_size)
		return errno = ENOSPC, NULL;

	hdr = rb->data + (prod_pos & rb->mask);
	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
	hdr->pad = 0;

	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	smp_store_release(rb->producer_pos, prod_pos + total_size);

	return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
}

static __u64 ns_elapsed_timespec(const struct timespec *start, const struct timespec *end)
{
	__u64 start_ns, end_ns, ns_per_s = 1000000000;

	start_ns = (__u64)start->tv_sec * ns_per_s + start->tv_nsec;
	end_ns = (__u64)end->tv_sec * ns_per_s + end->tv_nsec;

	return end_ns - start_ns;
}

void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
{
	void *sample;
	int err, ms_remaining = timeout_ms;
	struct timespec start;

	if (timeout_ms < 0 && timeout_ms != -1)
		return errno = EINVAL, NULL;

	if (timeout_ms != -1) {
		err = clock_gettime(CLOCK_MONOTONIC, &start);
		if (err)
			return NULL;
	}

	do {
		int cnt, ms_elapsed;
		struct timespec curr;
		__u64 ns_per_ms = 1000000;

		sample = user_ring_buffer__reserve(rb, size);
		if (sample)
			return sample;
		else if (errno != ENOSPC)
			return NULL;

		/* The kernel guarantees at least one event notification
		 * delivery whenever at least one sample is drained from the
		 * ring buffer in an invocation to bpf_ringbuf_drain(). Other
		 * additional events may be delivered at any time, but only one
		 * event is guaranteed per bpf_ringbuf_drain() invocation,
		 * provided that a sample is drained, and the BPF program did
		 * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain(). If
		 * BPF_RB_FORCE_WAKEUP is passed to bpf_ringbuf_drain(), a
		 * wakeup event will be delivered even if no samples are
		 * drained.
		 */
		cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
		if (cnt < 0)
			return NULL;

		if (timeout_ms == -1)
			continue;

		err = clock_gettime(CLOCK_MONOTONIC, &curr);
		if (err)
			return NULL;

		ms_elapsed = ns_elapsed_timespec(&start, &curr) / ns_per_ms;
		ms_remaining = timeout_ms - ms_elapsed;
	} while (ms_remaining > 0);

	/* Try one more time to reserve a sample after the specified timeout has elapsed. */
	return user_ring_buffer__reserve(rb, size);
}
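
/* Minimal producer-side usage sketch for a BPF_MAP_TYPE_USER_RINGBUF map
 * (illustrative only, not compiled as part of this file; "map_fd" and
 * "struct my_msg" are hypothetical application-side names):
 *
 *	struct user_ring_buffer *rb = user_ring_buffer__new(map_fd, NULL);
 *	struct my_msg *msg;
 *
 *	if (!rb)
 *		return -errno;
 *	msg = user_ring_buffer__reserve(rb, sizeof(*msg));
 *	if (!msg) {
 *		user_ring_buffer__free(rb);
 *		return -errno;	// E2BIG or ENOSPC
 *	}
 *	msg->value = 42;
 *	user_ring_buffer__submit(rb, msg);	// or user_ring_buffer__discard(rb, msg)
 *	user_ring_buffer__free(rb);
 */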