// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
/*
 * Ring buffer operations.
 *
 * Copyright (C) 2020 Facebook, Inc.
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <linux/err.h>
#include <linux/bpf.h>
#include <asm/barrier.h>
#include <sys/mman.h>
#include <sys/epoll.h>
#include <time.h>

#include "libbpf.h"
#include "libbpf_internal.h"
#include "bpf.h"

struct ring {
	ring_buffer_sample_fn sample_cb;
	void *ctx;
	void *data;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	unsigned long mask;
	int map_fd;
};

struct ring_buffer {
	struct epoll_event *events;
	struct ring *rings;
	size_t page_size;
	int epoll_fd;
	int ring_cnt;
};

struct user_ring_buffer {
	struct epoll_event event;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	void *data;
	unsigned long mask;
	size_t page_size;
	int map_fd;
	int epoll_fd;
};

/* 8-byte ring buffer header structure */
struct ringbuf_hdr {
	__u32 len;
	__u32 pad;
};

static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
{
	if (r->consumer_pos) {
		munmap(r->consumer_pos, rb->page_size);
		r->consumer_pos = NULL;
	}
	if (r->producer_pos) {
		munmap(r->producer_pos, rb->page_size + 2 * (r->mask + 1));
		r->producer_pos = NULL;
	}
}

/* Add extra RINGBUF maps to this ring buffer manager */
int ring_buffer__add(struct ring_buffer *rb, int map_fd,
		     ring_buffer_sample_fn sample_cb, void *ctx)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	struct epoll_event *e;
	struct ring *r;
	void *tmp;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("ringbuf: failed to get map info for fd=%d: %d\n",
			map_fd, err);
		return libbpf_err(err);
	}

	if (info.type != BPF_MAP_TYPE_RINGBUF) {
		pr_warn("ringbuf: map fd=%d is not BPF_MAP_TYPE_RINGBUF\n",
			map_fd);
		return libbpf_err(-EINVAL);
	}

	tmp = libbpf_reallocarray(rb->rings, rb->ring_cnt + 1, sizeof(*rb->rings));
	if (!tmp)
		return libbpf_err(-ENOMEM);
	rb->rings = tmp;

	tmp = libbpf_reallocarray(rb->events, rb->ring_cnt + 1, sizeof(*rb->events));
	if (!tmp)
		return libbpf_err(-ENOMEM);
	rb->events = tmp;

	r = &rb->rings[rb->ring_cnt];
	memset(r, 0, sizeof(*r));

	r->map_fd = map_fd;
	r->sample_cb = sample_cb;
	r->ctx = ctx;
	r->mask = info.max_entries - 1;

	/* Map writable consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED,
		   map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
			map_fd, err);
		return libbpf_err(err);
	}
	r->consumer_pos = tmp;

	/* Map read-only producer page and data pages. We map twice as big
	 * data size to allow simple reading of samples that wrap around the
	 * end of a ring buffer. See kernel implementation for details.
	 */
	tmp = mmap(NULL, rb->page_size + 2 * info.max_entries, PROT_READ,
		   MAP_SHARED, map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		ringbuf_unmap_ring(rb, r);
		pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %d\n",
			map_fd, err);
		return libbpf_err(err);
	}
	r->producer_pos = tmp;
	r->data = tmp + rb->page_size;

	e = &rb->events[rb->ring_cnt];
	memset(e, 0, sizeof(*e));

	e->events = EPOLLIN;
	e->data.fd = rb->ring_cnt;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
		err = -errno;
		ringbuf_unmap_ring(rb, r);
		pr_warn("ringbuf: failed to epoll add map fd=%d: %d\n",
			map_fd, err);
		return libbpf_err(err);
	}

	rb->ring_cnt++;
	return 0;
}

void ring_buffer__free(struct ring_buffer *rb)
{
	int i;

	if (!rb)
		return;

	for (i = 0; i < rb->ring_cnt; ++i)
		ringbuf_unmap_ring(rb, &rb->rings[i]);
	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb->events);
	free(rb->rings);
	free(rb);
}

struct ring_buffer *
ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
		 const struct ring_buffer_opts *opts)
{
	struct ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, ring_buffer_opts))
		return errno = EINVAL, NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return errno = ENOMEM, NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("ringbuf: failed to create epoll instance: %d\n", err);
		goto err_out;
	}

	err = ring_buffer__add(rb, map_fd, sample_cb, ctx);
	if (err)
		goto err_out;

	return rb;

err_out:
	ring_buffer__free(rb);
	return errno = -err, NULL;
}
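
/* Example usage (illustrative sketch only, not part of this file): a typical
 * consumer wraps a BPF_MAP_TYPE_RINGBUF map fd (e.g. obtained via
 * bpf_map__fd()) in a manager and polls it in a loop. The handle_event
 * callback and the map_fd and exiting variables are hypothetical placeholders.
 *
 *	static int handle_event(void *ctx, void *data, size_t size)
 *	{
 *		// data points at one committed sample of 'size' bytes;
 *		// returning a negative value aborts consumption early
 *		return 0;
 *	}
 *
 *	struct ring_buffer *rb;
 *	int n;
 *
 *	rb = ring_buffer__new(map_fd, handle_event, NULL, NULL);
 *	if (!rb)
 *		return -errno;
 *	while (!exiting) {
 *		n = ring_buffer__poll(rb, 100);
 *		if (n < 0 && n != -EINTR)
 *			break;
 *	}
 *	ring_buffer__free(rb);
 */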

static inline int roundup_len(__u32 len)
{
	/* clear out top 2 bits (discard and busy, if set) */
	len <<= 2;
	len >>= 2;
	/* add length prefix */
	len += BPF_RINGBUF_HDR_SZ;
	/* round up to 8 byte alignment */
	return (len + 7) / 8 * 8;
}

static int64_t ringbuf_process_ring(struct ring *r)
{
	int *len_ptr, len, err;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	int64_t cnt = 0;
	unsigned long cons_pos, prod_pos;
	bool got_new_data;
	void *sample;

	cons_pos = smp_load_acquire(r->consumer_pos);
	do {
		got_new_data = false;
		prod_pos = smp_load_acquire(r->producer_pos);
		while (cons_pos < prod_pos) {
			len_ptr = r->data + (cons_pos & r->mask);
			len = smp_load_acquire(len_ptr);

			/* sample not committed yet, bail out for now */
			if (len & BPF_RINGBUF_BUSY_BIT)
				goto done;

			got_new_data = true;
			cons_pos += roundup_len(len);

			if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
				sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
				err = r->sample_cb(r->ctx, sample, len);
				if (err < 0) {
					/* update consumer pos and bail out */
					smp_store_release(r->consumer_pos,
							  cons_pos);
					return err;
				}
				cnt++;
			}

			smp_store_release(r->consumer_pos, cons_pos);
		}
	} while (got_new_data);
done:
	return cnt;
}
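
/* Record layout notes (informational): each record in the data area starts
 * with an 8-byte header; the low 30 bits of its first word hold the sample
 * length and the top two bits are the BUSY and DISCARD flags. roundup_len()
 * computes the stride to the next record: a 12-byte sample, for example,
 * occupies 8 (header) + 12 = 20 bytes, rounded up to 24, so the consumer
 * position advances by 24 whether the record was delivered to the callback
 * or discarded.
 */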

/* Consume available ring buffer(s) data without event polling.
 * Returns number of records consumed across all registered ring buffers (or
 * INT_MAX, whichever is less), or negative number if any of the callbacks
 * return error.
 */
int ring_buffer__consume(struct ring_buffer *rb)
{
	int64_t err, res = 0;
	int i;

	for (i = 0; i < rb->ring_cnt; i++) {
		struct ring *ring = &rb->rings[i];

		err = ringbuf_process_ring(ring);
		if (err < 0)
			return libbpf_err(err);
		res += err;
	}
	if (res > INT_MAX)
		return INT_MAX;
	return res;
}

/* Poll for available data and consume records, if any are available.
 * Returns number of records consumed (or INT_MAX, whichever is less), or
 * negative number, if any of the registered callbacks returned error.
 */
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
{
	int i, cnt;
	int64_t err, res = 0;

	cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
	if (cnt < 0)
		return libbpf_err(-errno);

	for (i = 0; i < cnt; i++) {
		__u32 ring_id = rb->events[i].data.fd;
		struct ring *ring = &rb->rings[ring_id];

		err = ringbuf_process_ring(ring);
		if (err < 0)
			return libbpf_err(err);
		res += err;
	}
	if (res > INT_MAX)
		return INT_MAX;
	return res;
}

/* Get an fd that can be used to sleep until data is available in the ring(s) */
int ring_buffer__epoll_fd(const struct ring_buffer *rb)
{
	return rb->epoll_fd;
}

static void user_ringbuf_unmap_ring(struct user_ring_buffer *rb)
{
	if (rb->consumer_pos) {
		munmap(rb->consumer_pos, rb->page_size);
		rb->consumer_pos = NULL;
	}
	if (rb->producer_pos) {
		munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
		rb->producer_pos = NULL;
	}
}

void user_ring_buffer__free(struct user_ring_buffer *rb)
{
	if (!rb)
		return;

	user_ringbuf_unmap_ring(rb);

	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb);
}

static int user_ringbuf_map(struct user_ring_buffer *rb, int map_fd)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	void *tmp;
	struct epoll_event *rb_epoll;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("user ringbuf: failed to get map info for fd=%d: %d\n", map_fd, err);
		return err;
	}

	if (info.type != BPF_MAP_TYPE_USER_RINGBUF) {
		pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n", map_fd);
		return -EINVAL;
	}

	rb->map_fd = map_fd;
	rb->mask = info.max_entries - 1;

	/* Map read-only consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
			map_fd, err);
		return err;
	}
	rb->consumer_pos = tmp;

	/* Map read-write the producer page and data pages. We map the data
	 * region as twice the total size of the ring buffer to allow the
	 * simple reading and writing of samples that wrap around the end of
	 * the buffer. See the kernel implementation for details.
	 */
	tmp = mmap(NULL, rb->page_size + 2 * info.max_entries,
		   PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n",
			map_fd, err);
		return err;
	}

	rb->producer_pos = tmp;
	rb->data = tmp + rb->page_size;

	rb_epoll = &rb->event;
	rb_epoll->events = EPOLLOUT;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
		err = -errno;
		pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n", map_fd, err);
		return err;
	}

	return 0;
}

struct user_ring_buffer *
user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts)
{
	struct user_ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, user_ring_buffer_opts))
		return errno = EINVAL, NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return errno = ENOMEM, NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("user ringbuf: failed to create epoll instance: %d\n", err);
		goto err_out;
	}

	err = user_ringbuf_map(rb, map_fd);
	if (err)
		goto err_out;

	return rb;

err_out:
	user_ring_buffer__free(rb);
	return errno = -err, NULL;
}
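
/* Example usage (illustrative sketch only, not part of this file): a
 * user-space producer reserves a sample, fills it in, and submits or
 * discards it; a BPF program later drains it with bpf_user_ringbuf_drain().
 * The struct my_msg type and the map_fd variable are hypothetical
 * placeholders.
 *
 *	struct user_ring_buffer *rb;
 *	struct my_msg *msg;
 *
 *	rb = user_ring_buffer__new(map_fd, NULL);
 *	if (!rb)
 *		return -errno;
 *	msg = user_ring_buffer__reserve(rb, sizeof(*msg));
 *	if (!msg) {
 *		// errno is E2BIG (sample larger than the ring) or ENOSPC
 *		// (no room until the kernel drains pending samples)
 *		user_ring_buffer__free(rb);
 *		return -errno;
 *	}
 *	msg->value = 42;
 *	user_ring_buffer__submit(rb, msg);
 *	user_ring_buffer__free(rb);
 */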

static void user_ringbuf_commit(struct user_ring_buffer *rb, void *sample, bool discard)
{
	__u32 new_len;
	struct ringbuf_hdr *hdr;
	uintptr_t hdr_offset;

	hdr_offset = rb->mask + 1 + (sample - rb->data) - BPF_RINGBUF_HDR_SZ;
	hdr = rb->data + (hdr_offset & rb->mask);

	new_len = hdr->len & ~BPF_RINGBUF_BUSY_BIT;
	if (discard)
		new_len |= BPF_RINGBUF_DISCARD_BIT;

	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	__atomic_exchange_n(&hdr->len, new_len, __ATOMIC_ACQ_REL);
}

void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample)
{
	user_ringbuf_commit(rb, sample, true);
}

void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
{
	user_ringbuf_commit(rb, sample, false);
}

void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
{
	__u32 avail_size, total_size, max_size;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	__u64 cons_pos, prod_pos;
	struct ringbuf_hdr *hdr;

	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	cons_pos = smp_load_acquire(rb->consumer_pos);
	/* Synchronizes with smp_store_release() in user_ringbuf_commit() */
	prod_pos = smp_load_acquire(rb->producer_pos);

	max_size = rb->mask + 1;
	avail_size = max_size - (prod_pos - cons_pos);
	/* Round up total size to a multiple of 8. */
	total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8;

	if (total_size > max_size)
		return errno = E2BIG, NULL;

	if (avail_size < total_size)
		return errno = ENOSPC, NULL;

	hdr = rb->data + (prod_pos & rb->mask);
	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
	hdr->pad = 0;

	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	smp_store_release(rb->producer_pos, prod_pos + total_size);

	return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
}
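
/* Sizing notes (informational): with max_entries = 4096, mask is 4095 and
 * max_size is 4096. Reserving a 100-byte sample consumes total_size =
 * (100 + 8 + 7) / 8 * 8 = 112 bytes of ring space: the 8-byte header plus
 * the sample, rounded up to 8-byte alignment. The pointer returned to the
 * caller skips the header, which is why user_ringbuf_commit() subtracts
 * BPF_RINGBUF_HDR_SZ (after adding mask + 1 so the unsigned arithmetic
 * cannot wrap before masking) to locate the header again at submit/discard
 * time.
 */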

static __u64 ns_elapsed_timespec(const struct timespec *start, const struct timespec *end)
{
	__u64 start_ns, end_ns, ns_per_s = 1000000000;

	start_ns = (__u64)start->tv_sec * ns_per_s + start->tv_nsec;
	end_ns = (__u64)end->tv_sec * ns_per_s + end->tv_nsec;

	return end_ns - start_ns;
}

void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
{
	void *sample;
	int err, ms_remaining = timeout_ms;
	struct timespec start;

	if (timeout_ms < 0 && timeout_ms != -1)
		return errno = EINVAL, NULL;

	if (timeout_ms != -1) {
		err = clock_gettime(CLOCK_MONOTONIC, &start);
		if (err)
			return NULL;
	}

	do {
		int cnt, ms_elapsed;
		struct timespec curr;
		__u64 ns_per_ms = 1000000;

		sample = user_ring_buffer__reserve(rb, size);
		if (sample)
			return sample;
		else if (errno != ENOSPC)
			return NULL;

		/* The kernel guarantees at least one event notification
		 * delivery whenever at least one sample is drained from the
		 * ring buffer in an invocation to bpf_ringbuf_drain(). Other
		 * additional events may be delivered at any time, but only one
		 * event is guaranteed per bpf_ringbuf_drain() invocation,
		 * provided that a sample is drained, and the BPF program did
		 * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain(). If
		 * BPF_RB_FORCE_WAKEUP is passed to bpf_ringbuf_drain(), a
		 * wakeup event will be delivered even if no samples are
		 * drained.
		 */
		cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
		if (cnt < 0)
			return NULL;

		if (timeout_ms == -1)
			continue;

		err = clock_gettime(CLOCK_MONOTONIC, &curr);
		if (err)
			return NULL;

		ms_elapsed = ns_elapsed_timespec(&start, &curr) / ns_per_ms;
		ms_remaining = timeout_ms - ms_elapsed;
	} while (ms_remaining > 0);

	/* Try one more time to reserve a sample after the specified timeout has elapsed. */
	return user_ring_buffer__reserve(rb, size);
}
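
/* Example usage (illustrative sketch only, not part of this file): the
 * blocking variant waits on the EPOLLOUT notification registered in
 * user_ringbuf_map() when the ring is full; a timeout of -1 blocks
 * indefinitely. The struct my_msg type and the rb variable are hypothetical
 * placeholders.
 *
 *	struct my_msg *msg;
 *
 *	msg = user_ring_buffer__reserve_blocking(rb, sizeof(*msg), 1000);
 *	if (!msg) {
 *		// errno is ENOSPC if the ring stayed full for the whole
 *		// 1000 ms, EINVAL for an invalid timeout, or whatever
 *		// epoll_wait()/clock_gettime() reported on failure
 *		return -errno;
 *	}
 *	msg->value = 42;
 *	user_ring_buffer__submit(rb, msg);
 */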