/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/spinlock.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/sched.h>	/* used for sched_clock() (for now) */
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/fs.h>

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

/* FIXME!!! */
u64 ring_buffer_time_stamp(int cpu)
{
	/* shift to debug/test normalization and TIME_EXTENTS */
	return sched_clock() << DEBUG_SHIFT;
}

void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}

#define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
#define RB_ALIGNMENT_SHIFT	2
#define RB_ALIGNMENT		(1 << RB_ALIGNMENT_SHIFT)
#define RB_MAX_SMALL_DATA	28

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 16,
};

/* inline for ring buffer fast paths */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		/* undefined */
		return -1;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		if (event->len)
			length = event->len << RB_ALIGNMENT_SHIFT;
		else
			length = event->array[0];
		return length + RB_EVNT_HDR_SIZE;
	default:
		BUG();
	}
	/* not hit */
	return 0;
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	return rb_event_length(event);
}

/* inline for ring buffer fast paths */
static inline void *
rb_event_data(struct ring_buffer_event *event)
{
	BUG_ON(event->type != RINGBUF_TYPE_DATA);
	/* If length is in len field, then array[0] has the data */
	if (event->len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}

#define for_each_buffer_cpu(buffer, cpu)	\
	for_each_cpu_mask(cpu, buffer->cpumask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

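/*
 * Worked example (illustrative; the numbers assume sched_clock() returns
 * nanoseconds): with TS_SHIFT = 27, an event's time_delta field can only
 * hold deltas below 2^27 ns, roughly 134 ms.  A delta of, say, 200000000 ns
 * (200 ms) has bits set above TS_MASK, so TS_DELTA_TEST catches it and a
 * TIME_EXTEND event is emitted instead: the low 27 bits go into time_delta
 * and the remaining bits (200000000 >> 27 == 1) go into array[0].
 */
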
/*
 * This hack stolen from mm/slob.c.
 * We can store per page timing information in the page frame of the page.
 * Thanks to Peter Zijlstra for suggesting this idea.
 */
struct buffer_page {
	u64		time_stamp;	/* page time stamp */
	local_t		write;		/* index for next write */
	local_t		commit;		/* write committed index */
	unsigned	read;		/* index for next read */
	struct list_head list;		/* list of free pages */
	void		*page;		/* Actual data page */
};

/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 */
static inline void free_buffer_page(struct buffer_page *bpage)
{
	if (bpage->page)
		free_page((unsigned long)bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
	if (delta & TS_DELTA_TEST)
		return 1;
	return 0;
}

#define BUF_PAGE_SIZE PAGE_SIZE

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int			cpu;
	struct ring_buffer	*buffer;
	spinlock_t		lock;
	struct lock_class_key	lock_key;
	struct list_head	pages;
	struct buffer_page	*head_page;	/* read from head */
	struct buffer_page	*tail_page;	/* write to tail */
	struct buffer_page	*commit_page;	/* committed pages */
	struct buffer_page	*reader_page;
	unsigned long		overrun;
	unsigned long		entries;
	u64			write_stamp;
	u64			read_stamp;
	atomic_t		record_disabled;
};

struct ring_buffer {
	unsigned long		size;
	unsigned		pages;
	unsigned		flags;
	int			cpus;
	cpumask_t		cpumask;
	atomic_t		record_disabled;

	struct mutex		mutex;

	struct ring_buffer_per_cpu **buffers;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	struct buffer_page		*head_page;
	u64				read_stamp;
};

#define RB_WARN_ON(buffer, cond)				\
	do {							\
		if (unlikely(cond)) {				\
			atomic_inc(&buffer->record_disabled);	\
			WARN_ON(1);				\
		}						\
	} while (0)

#define RB_WARN_ON_RET(buffer, cond)				\
	do {							\
		if (unlikely(cond)) {				\
			atomic_inc(&buffer->record_disabled);	\
			WARN_ON(1);				\
			return -1;				\
		}						\
	} while (0)

#define RB_WARN_ON_ONCE(buffer, cond)				\
	do {							\
		static int once;				\
		if (unlikely(cond) && !once) {			\
			once++;					\
			atomic_inc(&buffer->record_disabled);	\
			WARN_ON(1);				\
		}						\
	} while (0)

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *page, *tmp;

	RB_WARN_ON_RET(cpu_buffer, head->next->prev != head);
	RB_WARN_ON_RET(cpu_buffer, head->prev->next != head);

	list_for_each_entry_safe(page, tmp, head, list) {
		RB_WARN_ON_RET(cpu_buffer,
			       page->list.next->prev != &page->list);
		RB_WARN_ON_RET(cpu_buffer,
			       page->list.prev->next != &page->list);
	}

	return 0;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned nr_pages)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *page, *tmp;
	unsigned long addr;
	LIST_HEAD(pages);
	unsigned i;

	for (i = 0; i < nr_pages; i++) {
		page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
				    GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
		if (!page)
			goto free_pages;
		list_add(&page->list, &pages);

		addr = __get_free_page(GFP_KERNEL);
		if (!addr)
			goto free_pages;
		page->page = (void *)addr;
	}

	list_splice(&pages, head);

	rb_check_pages(cpu_buffer);

	return 0;

 free_pages:
	list_for_each_entry_safe(page, tmp, &pages, list) {
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	return -ENOMEM;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *page;
	unsigned long addr;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	spin_lock_init(&cpu_buffer->lock);
	INIT_LIST_HEAD(&cpu_buffer->pages);

	page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
			    GFP_KERNEL, cpu_to_node(cpu));
	if (!page)
		goto fail_free_buffer;

	cpu_buffer->reader_page = page;
	addr = __get_free_page(GFP_KERNEL);
	if (!addr)
		goto fail_free_reader;
	page->page = (void *)addr;

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);

	ret = rb_allocate_pages(cpu_buffer, buffer->pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *page, *tmp;

	list_del_init(&cpu_buffer->reader_page->list);
	free_buffer_page(cpu_buffer->reader_page);

	list_for_each_entry_safe(page, tmp, head, list) {
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	kfree(cpu_buffer);
}

/*
 * Causes compile errors if the struct buffer_page gets bigger
 * than the struct page.
 */
extern int ring_buffer_page_too_big(void);

/**
 * ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes that is needed.
 * @flags: attributes to set for the ring buffer.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
{
	struct ring_buffer *buffer;
	int bsize;
	int cpu;

	/* Paranoid! Optimizes out when all is well */
	if (sizeof(struct buffer_page) > sizeof(struct page))
		ring_buffer_page_too_big();


	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;

	/* need at least two pages */
	if (buffer->pages == 1)
		buffer->pages++;

	buffer->cpumask = cpu_possible_map;
	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_buffer;

	for_each_buffer_cpu(buffer, cpu) {
		buffer->buffers[cpu] =
			rb_allocate_cpu_buffer(buffer, cpu);
		if (!buffer->buffers[cpu])
			goto fail_free_buffers;
	}

	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}

/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	kfree(buffer);
}

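/*
 * Usage sketch (illustrative only, not compiled): a tracer would typically
 * allocate one buffer at init time and free it on teardown.  The size and
 * error handling below are made up for the example.
 *
 *	struct ring_buffer *buffer;
 *
 *	buffer = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);
 *	if (!buffer)
 *		return -ENOMEM;
 *	...
 *	ring_buffer_free(buffer);
 *
 * Note that the requested size is rounded up to whole buffer pages, so the
 * buffer may end up slightly larger than asked for.
 */
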
static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static void
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
{
	struct buffer_page *page;
	struct list_head *p;
	unsigned i;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	for (i = 0; i < nr_pages; i++) {
		BUG_ON(list_empty(&cpu_buffer->pages));
		p = cpu_buffer->pages.next;
		page = list_entry(p, struct buffer_page, list);
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	BUG_ON(list_empty(&cpu_buffer->pages));

	rb_reset_cpu(cpu_buffer);

	rb_check_pages(cpu_buffer);

	atomic_dec(&cpu_buffer->record_disabled);

}

static void
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
		struct list_head *pages, unsigned nr_pages)
{
	struct buffer_page *page;
	struct list_head *p;
	unsigned i;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	for (i = 0; i < nr_pages; i++) {
		BUG_ON(list_empty(pages));
		p = pages->next;
		page = list_entry(p, struct buffer_page, list);
		list_del_init(&page->list);
		list_add_tail(&page->list, &cpu_buffer->pages);
	}
	rb_reset_cpu(cpu_buffer);

	rb_check_pages(cpu_buffer);

	atomic_dec(&cpu_buffer->record_disabled);
}

/**
 * ring_buffer_resize - resize the ring buffer
 * @buffer: the buffer to resize.
 * @size: the new size.
 *
 * The tracer is responsible for making sure that the buffer is
 * not being used while changing the size.
 * Note: We may be able to change the above requirement by using
 * RCU synchronizations.
 *
 * Minimum size is 2 * BUF_PAGE_SIZE.
 *
 * Returns -1 on failure.
 */
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned nr_pages, rm_pages, new_pages;
	struct buffer_page *page, *tmp;
	unsigned long buffer_size;
	unsigned long addr;
	LIST_HEAD(pages);
	int i, cpu;

	size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	size *= BUF_PAGE_SIZE;
	buffer_size = buffer->pages * BUF_PAGE_SIZE;

	/* we need a minimum of two pages */
	if (size < BUF_PAGE_SIZE * 2)
		size = BUF_PAGE_SIZE * 2;

	if (size == buffer_size)
		return size;

	mutex_lock(&buffer->mutex);

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

	if (size < buffer_size) {

		/* easy case, just free pages */
		BUG_ON(nr_pages >= buffer->pages);

		rm_pages = buffer->pages - nr_pages;

		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			rb_remove_pages(cpu_buffer, rm_pages);
		}
		goto out;
	}

	/*
	 * This is a bit more difficult. We only want to add pages
	 * when we can allocate enough for all CPUs. We do this
	 * by allocating all the pages and storing them on a local
	 * linked list. If we succeed in our allocation, then we
	 * add these pages to the cpu_buffers. Otherwise we just free
	 * them all and return -ENOMEM;
	 */
	BUG_ON(nr_pages <= buffer->pages);
	new_pages = nr_pages - buffer->pages;

	for_each_buffer_cpu(buffer, cpu) {
		for (i = 0; i < new_pages; i++) {
			page = kzalloc_node(ALIGN(sizeof(*page),
						  cache_line_size()),
					    GFP_KERNEL, cpu_to_node(cpu));
			if (!page)
				goto free_pages;
			list_add(&page->list, &pages);
			addr = __get_free_page(GFP_KERNEL);
			if (!addr)
				goto free_pages;
			page->page = (void *)addr;
		}
	}

	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		rb_insert_pages(cpu_buffer, &pages, new_pages);
	}

	BUG_ON(!list_empty(&pages));

 out:
	buffer->pages = nr_pages;
	mutex_unlock(&buffer->mutex);

	return size;

 free_pages:
	list_for_each_entry_safe(page, tmp, &pages, list) {
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	return -ENOMEM;
}

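/*
 * Worked example (illustrative, assuming 4K pages so BUF_PAGE_SIZE == 4096):
 * ring_buffer_resize(buffer, 10000) rounds the request up to
 * DIV_ROUND_UP(10000, 4096) = 3 pages per CPU and returns 12288, the size
 * actually used.  A request below 2 * BUF_PAGE_SIZE is silently raised to
 * two pages, since the ring buffer never operates on fewer than two.
 */
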
static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type == RINGBUF_TYPE_PADDING;
}

static inline void *__rb_page_index(struct buffer_page *page, unsigned index)
{
	return page->page + index;
}

static inline struct ring_buffer_event *
rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->reader_page,
			       cpu_buffer->reader_page->read);
}

static inline struct ring_buffer_event *
rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->head_page,
			       cpu_buffer->head_page->read);
}

static inline struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
	return __rb_page_index(iter->head_page, iter->head);
}

static inline unsigned rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write);
}

static inline unsigned rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->commit);
}

/* Size is determined by what has been committed */
static inline unsigned rb_page_size(struct buffer_page *bpage)
{
	return rb_page_commit(bpage);
}

static inline unsigned
rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->commit_page);
}

static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->head_page);
}

/*
 * When the tail hits the head and the buffer is in overwrite mode,
 * the head jumps to the next page and all content on the previous
 * page is discarded. But before doing so, we update the overrun
 * variable of the buffer.
 */
static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	unsigned long head;

	for (head = 0; head < rb_head_size(cpu_buffer);
	     head += rb_event_length(event)) {

		event = __rb_page_index(cpu_buffer->head_page, head);
		BUG_ON(rb_null_event(event));
		/* Only count data entries */
		if (event->type != RINGBUF_TYPE_DATA)
			continue;
		cpu_buffer->overrun++;
		cpu_buffer->entries--;
	}
}

static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **page)
{
	struct list_head *p = (*page)->list.next;

	if (p == &cpu_buffer->pages)
		p = p->next;

	*page = list_entry(p, struct buffer_page, list);
}

static inline unsigned
rb_event_index(struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;

	return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE);
}

static inline int
rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
	     struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;
	unsigned long index;

	index = rb_event_index(event);
	addr &= PAGE_MASK;

	return cpu_buffer->commit_page->page == (void *)addr &&
		rb_commit_index(cpu_buffer) == index;
}

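/*
 * Worked example (illustrative): since BUF_PAGE_SIZE equals PAGE_SIZE here,
 * rb_event_index() reduces to the event's byte offset within its data page.
 * For an event at page offset 0x40, rb_is_commit() checks that the commit
 * page is the page holding that event and that the committed index is
 * exactly 0x40, i.e. the commit pointer sits right at this event.
 */
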
static inline void
rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer,
		    struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;
	unsigned long index;

	index = rb_event_index(event);
	addr &= PAGE_MASK;

	while (cpu_buffer->commit_page->page != (void *)addr) {
		RB_WARN_ON(cpu_buffer,
			   cpu_buffer->commit_page == cpu_buffer->tail_page);
		cpu_buffer->commit_page->commit =
			cpu_buffer->commit_page->write;
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp = cpu_buffer->commit_page->time_stamp;
	}

	/* Now set the commit to the event's index */
	local_set(&cpu_buffer->commit_page->commit, index);
}

static inline void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
	/*
	 * We only race with interrupts and NMIs on this CPU.
	 * If we own the commit event, then we can commit
	 * all others that interrupted us, since the interruptions
	 * are in stack format (they finish before they come
	 * back to us). This allows us to do a simple loop to
	 * assign the commit to the tail.
	 */
	while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
		cpu_buffer->commit_page->commit =
			cpu_buffer->commit_page->write;
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp = cpu_buffer->commit_page->time_stamp;
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {
		cpu_buffer->commit_page->commit =
			cpu_buffer->commit_page->write;
		barrier();
	}
}

static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->read_stamp = cpu_buffer->reader_page->time_stamp;
	cpu_buffer->reader_page->read = 0;
}

static inline void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
	 */
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = cpu_buffer->head_page;
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	iter->read_stamp = iter->head_page->time_stamp;
	iter->head = 0;
}

/**
 * rb_update_event - update event type and data
 * @event: the event to update
 * @type: the type of event
 * @length: the size of the event field in the ring buffer
 *
 * Update the type and data fields of the event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
 */
static inline void
rb_update_event(struct ring_buffer_event *event,
		unsigned type, unsigned length)
{
	event->type = type;

	switch (type) {

	case RINGBUF_TYPE_PADDING:
		break;

	case RINGBUF_TYPE_TIME_EXTEND:
		event->len =
			(RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
			>> RB_ALIGNMENT_SHIFT;
		break;

	case RINGBUF_TYPE_TIME_STAMP:
		event->len =
			(RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
			>> RB_ALIGNMENT_SHIFT;
		break;

	case RINGBUF_TYPE_DATA:
		length -= RB_EVNT_HDR_SIZE;
		if (length > RB_MAX_SMALL_DATA) {
			event->len = 0;
			event->array[0] = length;
		} else
			event->len =
				(length + (RB_ALIGNMENT-1))
				>> RB_ALIGNMENT_SHIFT;
		break;
	default:
		BUG();
	}
}

static inline unsigned rb_calculate_event_length(unsigned length)
{
	struct ring_buffer_event event; /* Used only for sizeof array */

	/* zero length can cause confusion */
	if (!length)
		length = 1;

	if (length > RB_MAX_SMALL_DATA)
		length += sizeof(event.array[0]);

	length += RB_EVNT_HDR_SIZE;
	length = ALIGN(length, RB_ALIGNMENT);

	return length;
}

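/*
 * Worked example (illustrative, assuming the usual 4 byte event header):
 * reserving a 10 byte payload.  rb_calculate_event_length(10) adds the
 * header and rounds up to RB_ALIGNMENT, giving a 16 byte slot on the page.
 * rb_update_event() subtracts the header again and stores the 12 byte body
 * in units of RB_ALIGNMENT (event->len = 3), so rb_event_length() later
 * recovers (3 << 2) + header = 16 bytes.  Payloads larger than
 * RB_MAX_SMALL_DATA (28 bytes) set len to 0 and keep the body length in
 * array[0] instead, with the data starting at array[1].
 */
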
static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  unsigned type, unsigned long length, u64 *ts)
{
	struct buffer_page *tail_page, *head_page, *reader_page;
	unsigned long tail, write;
	struct ring_buffer *buffer = cpu_buffer->buffer;
	struct ring_buffer_event *event;
	unsigned long flags;

	tail_page = cpu_buffer->tail_page;
	write = local_add_return(length, &tail_page->write);
	tail = write - length;

	/* See if we shot past the end of this buffer page */
	if (write > BUF_PAGE_SIZE) {
		struct buffer_page *next_page = tail_page;

		spin_lock_irqsave(&cpu_buffer->lock, flags);

		rb_inc_page(cpu_buffer, &next_page);

		head_page = cpu_buffer->head_page;
		reader_page = cpu_buffer->reader_page;

		/* we grabbed the lock before incrementing */
		RB_WARN_ON(cpu_buffer, next_page == reader_page);

		/*
		 * If for some reason, we had an interrupt storm that made
		 * it all the way around the buffer, bail, and warn
		 * about it.
		 */
		if (unlikely(next_page == cpu_buffer->commit_page)) {
			WARN_ON_ONCE(1);
			goto out_unlock;
		}

		if (next_page == head_page) {
			if (!(buffer->flags & RB_FL_OVERWRITE)) {
				/* reset write */
				if (tail <= BUF_PAGE_SIZE)
					local_set(&tail_page->write, tail);
				goto out_unlock;
			}

			/* tail_page has not moved yet? */
			if (tail_page == cpu_buffer->tail_page) {
				/* count overflows */
				rb_update_overflow(cpu_buffer);

				rb_inc_page(cpu_buffer, &head_page);
				cpu_buffer->head_page = head_page;
				cpu_buffer->head_page->read = 0;
			}
		}

		/*
		 * If the tail page is still the same as what we think
		 * it is, then it is up to us to update the tail
		 * pointer.
		 */
		if (tail_page == cpu_buffer->tail_page) {
			local_set(&next_page->write, 0);
			local_set(&next_page->commit, 0);
			cpu_buffer->tail_page = next_page;

			/* reread the time stamp */
			*ts = ring_buffer_time_stamp(cpu_buffer->cpu);
			cpu_buffer->tail_page->time_stamp = *ts;
		}

		/*
		 * The actual tail page has moved forward.
		 */
		if (tail < BUF_PAGE_SIZE) {
			/* Mark the rest of the page with padding */
			event = __rb_page_index(tail_page, tail);
			event->type = RINGBUF_TYPE_PADDING;
		}

		if (tail <= BUF_PAGE_SIZE)
			/* Set the write back to the previous setting */
			local_set(&tail_page->write, tail);

		/*
		 * If this was a commit entry that failed,
		 * increment that too
		 */
		if (tail_page == cpu_buffer->commit_page &&
		    tail == rb_commit_index(cpu_buffer)) {
			rb_set_commit_to_write(cpu_buffer);
		}

		spin_unlock_irqrestore(&cpu_buffer->lock, flags);

		/* fail and let the caller try again */
		return ERR_PTR(-EAGAIN);
	}

	/* We reserved something on the buffer */

	BUG_ON(write > BUF_PAGE_SIZE);

	event = __rb_page_index(tail_page, tail);
	rb_update_event(event, type, length);

	/*
	 * If this is a commit and the tail is zero, then update
	 * this page's time stamp.
	 */
	if (!tail && rb_is_commit(cpu_buffer, event))
		cpu_buffer->commit_page->time_stamp = *ts;

	return event;

 out_unlock:
	spin_unlock_irqrestore(&cpu_buffer->lock, flags);
	return NULL;
}

static int
rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		  u64 *ts, u64 *delta)
{
	struct ring_buffer_event *event;
	static int once;
	int ret;

	if (unlikely(*delta > (1ULL << 59) && !once++)) {
		printk(KERN_WARNING "Delta way too big! %llu"
		       " ts=%llu write stamp = %llu\n",
		       *delta, *ts, cpu_buffer->write_stamp);
		WARN_ON(1);
	}

	/*
	 * The delta is too big, we need to add a
	 * new timestamp.
	 */
	event = __rb_reserve_next(cpu_buffer,
				  RINGBUF_TYPE_TIME_EXTEND,
				  RB_LEN_TIME_EXTEND,
				  ts);
	if (!event)
		return -EBUSY;

	if (PTR_ERR(event) == -EAGAIN)
		return -EAGAIN;

	/* Only a committed time event can update the write stamp */
	if (rb_is_commit(cpu_buffer, event)) {
		/*
		 * If this is the first on the page, then we need to
		 * update the page itself, and just put in a zero.
		 */
		if (rb_event_index(event)) {
			event->time_delta = *delta & TS_MASK;
			event->array[0] = *delta >> TS_SHIFT;
		} else {
			cpu_buffer->commit_page->time_stamp = *ts;
			event->time_delta = 0;
			event->array[0] = 0;
		}
		cpu_buffer->write_stamp = *ts;
		/* let the caller know this was the commit */
		ret = 1;
	} else {
		/* Darn, this is just wasted space */
		event->time_delta = 0;
		event->array[0] = 0;
		ret = 0;
	}

	*delta = 0;

	return ret;
}

static struct ring_buffer_event *
rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
		      unsigned type, unsigned long length)
{
	struct ring_buffer_event *event;
	u64 ts, delta;
	int commit = 0;

 again:
	ts = ring_buffer_time_stamp(cpu_buffer->cpu);

	/*
	 * Only the first commit can update the timestamp.
	 * Yes there is a race here. If an interrupt comes in
	 * just after the conditional and it traces too, then it
	 * will also check the deltas. More than one timestamp may
	 * also be made. But only the entry that did the actual
	 * commit will be something other than zero.
	 */
	if (cpu_buffer->tail_page == cpu_buffer->commit_page &&
	    rb_page_write(cpu_buffer->tail_page) ==
	    rb_commit_index(cpu_buffer)) {

		delta = ts - cpu_buffer->write_stamp;

		/* make sure this delta is calculated here */
		barrier();

		/* Did the write stamp get updated already? */
		if (unlikely(ts < cpu_buffer->write_stamp))
			goto again;

		if (test_time_stamp(delta)) {

			commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);

			if (commit == -EBUSY)
				return NULL;

			if (commit == -EAGAIN)
				goto again;

			RB_WARN_ON(cpu_buffer, commit < 0);
		}
	} else
		/* Non commits have zero deltas */
		delta = 0;

	event = __rb_reserve_next(cpu_buffer, type, length, &ts);
	if (PTR_ERR(event) == -EAGAIN)
		goto again;

	if (!event) {
		if (unlikely(commit))
			/*
			 * Ouch! We needed a timestamp and it was committed.
			 * But we didn't get our event reserved.
			 */
			rb_set_commit_to_write(cpu_buffer);
		return NULL;
	}

	/*
	 * If the timestamp was committed, make the commit our entry
	 * now so that we will update it when needed.
	 */
	if (commit)
		rb_set_commit_event(cpu_buffer, event);
	else if (!rb_is_commit(cpu_buffer, event))
		delta = 0;

	event->time_delta = delta;

	return event;
}

static DEFINE_PER_CPU(int, rb_need_resched);

/**
 * ring_buffer_lock_reserve - reserve a part of the buffer
 * @buffer: the ring buffer to reserve from
 * @length: the length of the data to reserve (excluding event header)
 * @flags: a pointer to save the interrupt flags
 *
 * Returns a reserved event on the ring buffer to copy directly to.
 * The user of this interface will need to get the body to write into
 * and can use the ring_buffer_event_data() interface.
 *
 * The length is the length of the data needed, not the event length
 * which also includes the event header.
 *
 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
 * If NULL is returned, then nothing has been allocated or locked.
 */
struct ring_buffer_event *
ring_buffer_lock_reserve(struct ring_buffer *buffer,
			 unsigned long length,
			 unsigned long *flags)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	int cpu, resched;

	if (atomic_read(&buffer->record_disabled))
		return NULL;

	/* If we are tracing schedule, we don't want to recurse */
	resched = need_resched();
	preempt_disable_notrace();

	cpu = raw_smp_processor_id();

	if (!cpu_isset(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;

	length = rb_calculate_event_length(length);
	if (length > BUF_PAGE_SIZE)
		goto out;

	event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
	if (!event)
		goto out;

	/*
	 * Need to store resched state on this cpu.
	 * Only the first needs to.
	 */

	if (preempt_count() == 1)
		per_cpu(rb_need_resched, cpu) = resched;

	return event;

 out:
	if (resched)
		preempt_enable_no_resched_notrace();
	else
		preempt_enable_notrace();
	return NULL;
}

static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
		      struct ring_buffer_event *event)
{
	cpu_buffer->entries++;

	/* Only process further if we own the commit */
	if (!rb_is_commit(cpu_buffer, event))
		return;

	cpu_buffer->write_stamp += event->time_delta;

	rb_set_commit_to_write(cpu_buffer);
}

/**
 * ring_buffer_unlock_commit - commit a reserved event
 * @buffer: The buffer to commit to
 * @event: The event pointer to commit.
 * @flags: the interrupt flags received from ring_buffer_lock_reserve.
 *
 * This commits the data to the ring buffer, and releases any locks held.
 *
 * Must be paired with ring_buffer_lock_reserve.
 */
int ring_buffer_unlock_commit(struct ring_buffer *buffer,
			      struct ring_buffer_event *event,
			      unsigned long flags)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu = raw_smp_processor_id();

	cpu_buffer = buffer->buffers[cpu];

	rb_commit(cpu_buffer, event);

	/*
	 * Only the last preempt count needs to restore preemption.
	 */
	if (preempt_count() == 1) {
		if (per_cpu(rb_need_resched, cpu))
			preempt_enable_no_resched_notrace();
		else
			preempt_enable_notrace();
	} else
		preempt_enable_no_resched_notrace();

	return 0;
}

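/*
 * Usage sketch (illustrative only, not compiled): how a tracer would pair
 * the reserve and commit calls.  "struct sample_entry" and its field are
 * made up for this example.
 *
 *	struct ring_buffer_event *event;
 *	struct sample_entry *entry;
 *	unsigned long irq_flags;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry), &irq_flags);
 *	if (!event)
 *		return;
 *	entry = ring_buffer_event_data(event);
 *	entry->ip = ip;
 *	ring_buffer_unlock_commit(buffer, event, irq_flags);
 */
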
/**
 * ring_buffer_write - write data to the buffer without reserving
 * @buffer: The ring buffer to write to.
 * @length: The length of the data being written (excluding the event header)
 * @data: The data to write to the buffer.
 *
 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
 * one function. If you already have the data to write to the buffer, it
 * may be easier to simply call this function.
 *
 * Note, like ring_buffer_lock_reserve, the length is the length of the data
 * and not the length of the event which would hold the header.
 */
int ring_buffer_write(struct ring_buffer *buffer,
		      unsigned long length,
		      void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	unsigned long event_length;
	void *body;
	int ret = -EBUSY;
	int cpu, resched;

	if (atomic_read(&buffer->record_disabled))
		return -EBUSY;

	resched = need_resched();
	preempt_disable_notrace();

	cpu = raw_smp_processor_id();

	if (!cpu_isset(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;

	event_length = rb_calculate_event_length(length);
	event = rb_reserve_next_event(cpu_buffer,
				      RINGBUF_TYPE_DATA, event_length);
	if (!event)
		goto out;

	body = rb_event_data(event);

	memcpy(body, data, length);

	rb_commit(cpu_buffer, event);

	ret = 0;
 out:
	if (resched)
		preempt_enable_no_resched_notrace();
	else
		preempt_enable_notrace();

	return ret;
}

static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = cpu_buffer->reader_page;
	struct buffer_page *head = cpu_buffer->head_page;
	struct buffer_page *commit = cpu_buffer->commit_page;

	return reader->read == rb_page_commit(reader) &&
		(commit == reader ||
		 (commit == head &&
		  head->read == rb_page_commit(commit)));
}

/**
 * ring_buffer_record_disable - stop all writes into the buffer
 * @buffer: The ring buffer to stop writes to.
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable(struct ring_buffer *buffer)
{
	atomic_inc(&buffer->record_disabled);
}

/**
 * ring_buffer_record_enable - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable(struct ring_buffer *buffer)
{
	atomic_dec(&buffer->record_disabled);
}

/**
 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
 * @buffer: The ring buffer to stop writes to.
 * @cpu: The CPU buffer to stop
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_inc(&cpu_buffer->record_disabled);
}

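/*
 * Usage sketch (illustrative only, not compiled): the disable/enable pair
 * nests like preempt_disable(), and per the comments above the caller is
 * expected to wait for in-flight writers before touching the buffer:
 *
 *	ring_buffer_record_disable(buffer);
 *	synchronize_sched();
 *	... inspect or reset the buffer ...
 *	ring_buffer_record_enable(buffer);
 */
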
/**
 * ring_buffer_record_enable_cpu - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 * @cpu: The CPU to enable.
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_dec(&cpu_buffer->record_disabled);
}

/**
 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the entries from.
 */
unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	return cpu_buffer->entries;
}

/**
 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the number of overruns from
 */
unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	return cpu_buffer->overrun;
}

/**
 * ring_buffer_entries - get the number of entries in a buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of entries in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_entries(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long entries = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		entries += cpu_buffer->entries;
	}

	return entries;
}

/**
 * ring_buffer_overruns - get the number of overruns in the buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of overruns in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long overruns = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		overruns += cpu_buffer->overrun;
	}

	return overruns;
}

/**
 * ring_buffer_iter_reset - reset an iterator
 * @iter: The iterator to reset
 *
 * Resets the iterator, so that it will start from the beginning
 * again.
 */
void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/* Iterator usage is expected to have record disabled */
	if (list_empty(&cpu_buffer->reader_page->list)) {
		iter->head_page = cpu_buffer->head_page;
		iter->head = cpu_buffer->head_page->read;
	} else {
		iter->head_page = cpu_buffer->reader_page;
		iter->head = cpu_buffer->reader_page->read;
	}
	if (iter->head)
		iter->read_stamp = cpu_buffer->read_stamp;
	else
		iter->read_stamp = iter->head_page->time_stamp;
}

/**
 * ring_buffer_iter_empty - check if an iterator has no more to read
 * @iter: The iterator to check
 */
int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	cpu_buffer = iter->cpu_buffer;

	return iter->head_page == cpu_buffer->commit_page &&
		iter->head == rb_commit_index(cpu_buffer);
}

static void
rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		     struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		cpu_buffer->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		cpu_buffer->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
	return;
}

static void
rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
			  struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		iter->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		iter->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
	return;
}

static struct buffer_page *
rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = NULL;
	unsigned long flags;

	spin_lock_irqsave(&cpu_buffer->lock, flags);

 again:
	reader = cpu_buffer->reader_page;

	/* If there's more to read, return this page */
	if (cpu_buffer->reader_page->read < rb_page_size(reader))
		goto out;

	/* Never should we have an index greater than the size */
	RB_WARN_ON(cpu_buffer,
		   cpu_buffer->reader_page->read > rb_page_size(reader));

	/* check if we caught up to the tail */
	reader = NULL;
	if (cpu_buffer->commit_page == cpu_buffer->reader_page)
		goto out;

	/*
	 * Splice the empty reader page into the list around the head.
	 * Reset the reader page to size zero.
	 */

	reader = cpu_buffer->head_page;
	cpu_buffer->reader_page->list.next = reader->list.next;
	cpu_buffer->reader_page->list.prev = reader->list.prev;

	local_set(&cpu_buffer->reader_page->write, 0);
	local_set(&cpu_buffer->reader_page->commit, 0);

	/* Make the reader page now replace the head */
	reader->list.prev->next = &cpu_buffer->reader_page->list;
	reader->list.next->prev = &cpu_buffer->reader_page->list;

	/*
	 * If the tail is on the reader, then we must set the head
	 * to the inserted page, otherwise we set it one before.
	 */
	cpu_buffer->head_page = cpu_buffer->reader_page;

	if (cpu_buffer->commit_page != reader)
		rb_inc_page(cpu_buffer, &cpu_buffer->head_page);

	/* Finally update the reader page to the new head */
	cpu_buffer->reader_page = reader;
	rb_reset_reader_page(cpu_buffer);

	goto again;

 out:
	spin_unlock_irqrestore(&cpu_buffer->lock, flags);

	return reader;
}

static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	unsigned length;

	reader = rb_get_reader_page(cpu_buffer);

	/* This function should not be called when buffer is empty */
	BUG_ON(!reader);

	event = rb_reader_event(cpu_buffer);

	if (event->type == RINGBUF_TYPE_DATA)
		cpu_buffer->entries--;

	rb_update_read_stamp(cpu_buffer, event);

	length = rb_event_length(event);
	cpu_buffer->reader_page->read += length;
}

static void rb_advance_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	unsigned length;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

	/*
	 * Check if we are at the end of the buffer.
	 */
	if (iter->head >= rb_page_size(iter->head_page)) {
		BUG_ON(iter->head_page == cpu_buffer->commit_page);
		rb_inc_iter(iter);
		return;
	}

	event = rb_iter_head_event(iter);

	length = rb_event_length(event);

	/*
	 * This should not be called to advance the header if we are
	 * at the tail of the buffer.
	 */
	BUG_ON((iter->head_page == cpu_buffer->commit_page) &&
	       (iter->head + length > rb_commit_index(cpu_buffer)));

	rb_update_iter_read_stamp(iter, event);

	iter->head += length;

	/* check for end of page padding */
	if ((iter->head >= rb_page_size(iter->head_page)) &&
	    (iter->head_page != cpu_buffer->commit_page))
		rb_advance_iter(iter);
}

/**
 * ring_buffer_peek - peek at the next event to be read
 * @buffer: The ring buffer to read
 * @cpu: The cpu to peek at
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not consume the data.
 */
struct ring_buffer_event *
ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	struct buffer_page *reader;

	if (!cpu_isset(cpu, buffer->cpumask))
		return NULL;

	cpu_buffer = buffer->buffers[cpu];

 again:
	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		return NULL;

	event = rb_reader_event(cpu_buffer);

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		RB_WARN_ON(cpu_buffer, 1);
		rb_advance_reader(cpu_buffer);
		return NULL;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = cpu_buffer->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}

/**
 * ring_buffer_iter_peek - peek at the next event to be read
 * @iter: The ring buffer iterator
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not increment the iterator.
 */
struct ring_buffer_event *
ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;

	if (ring_buffer_iter_empty(iter))
		return NULL;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

 again:
	if (rb_per_cpu_empty(cpu_buffer))
		return NULL;

	event = rb_iter_head_event(iter);

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		rb_inc_iter(iter);
		goto again;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = iter->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}

/**
 * ring_buffer_consume - return an event and consume it
 * @buffer: The ring buffer to get the next event from
 * @cpu: The per CPU buffer to read from
 * @ts: The timestamp of the event read (may be NULL)
 *
 * Returns the next event in the ring buffer, and that event is consumed.
 * Meaning, that sequential reads will keep returning a different event,
 * and eventually empty the ring buffer if the producer is slower.
 */
struct ring_buffer_event *
ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;

	if (!cpu_isset(cpu, buffer->cpumask))
		return NULL;

	event = ring_buffer_peek(buffer, cpu, ts);
	if (!event)
		return NULL;

	cpu_buffer = buffer->buffers[cpu];
	rb_advance_reader(cpu_buffer);

	return event;
}

/**
 * ring_buffer_read_start - start a non consuming read of the buffer
 * @buffer: The ring buffer to read from
 * @cpu: The cpu buffer to iterate over
 *
 * This starts up an iteration through the buffer. It also disables
 * the recording to the buffer until the reading is finished.
 * This prevents the reading from being corrupted. This is not
 * a consuming read, so a producer is not expected.
 *
 * Must be paired with ring_buffer_read_finish.
 */
struct ring_buffer_iter *
ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_iter *iter;
	unsigned long flags;

	if (!cpu_isset(cpu, buffer->cpumask))
		return NULL;

	iter = kmalloc(sizeof(*iter), GFP_KERNEL);
	if (!iter)
		return NULL;

	cpu_buffer = buffer->buffers[cpu];

	iter->cpu_buffer = cpu_buffer;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	spin_lock_irqsave(&cpu_buffer->lock, flags);
	ring_buffer_iter_reset(iter);
	spin_unlock_irqrestore(&cpu_buffer->lock, flags);

	return iter;
}

/**
 * ring_buffer_read_finish - finish reading the iterator of the buffer
 * @iter: The iterator retrieved by ring_buffer_read_start
 *
 * This re-enables the recording to the buffer, and frees the
 * iterator.
 */
void
ring_buffer_read_finish(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	atomic_dec(&cpu_buffer->record_disabled);
	kfree(iter);
}

/**
 * ring_buffer_read - read the next item in the ring buffer by the iterator
 * @iter: The ring buffer iterator
 * @ts: The time stamp of the event read.
 *
 * This reads the next event in the ring buffer and increments the iterator.
 */
struct ring_buffer_event *
ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_event *event;

	event = ring_buffer_iter_peek(iter, ts);
	if (!event)
		return NULL;

	rb_advance_iter(iter);

	return event;
}

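/*
 * Usage sketch (illustrative only, not compiled): a non consuming dump of
 * one CPU's buffer with the iterator API.  Writers to that CPU buffer are
 * disabled for the lifetime of the iterator.  process() here stands in for
 * whatever the caller does with each event.
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_start(buffer, cpu);
 *	if (!iter)
 *		return;
 *	while ((event = ring_buffer_read(iter, &ts)) != NULL)
 *		process(ring_buffer_event_data(event),
 *			ring_buffer_event_length(event), ts);
 *	ring_buffer_read_finish(iter);
 */
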
/**
 * ring_buffer_size - return the size of the ring buffer (in bytes)
 * @buffer: The ring buffer.
 */
unsigned long ring_buffer_size(struct ring_buffer *buffer)
{
	return BUF_PAGE_SIZE * buffer->pages;
}

static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
	local_set(&cpu_buffer->head_page->write, 0);
	local_set(&cpu_buffer->head_page->commit, 0);

	cpu_buffer->head_page->read = 0;

	cpu_buffer->tail_page = cpu_buffer->head_page;
	cpu_buffer->commit_page = cpu_buffer->head_page;

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	local_set(&cpu_buffer->reader_page->write, 0);
	local_set(&cpu_buffer->reader_page->commit, 0);
	cpu_buffer->reader_page->read = 0;

	cpu_buffer->overrun = 0;
	cpu_buffer->entries = 0;
}

/**
 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
 * @buffer: The ring buffer to reset a per cpu buffer of
 * @cpu: The CPU buffer to be reset
 */
void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	unsigned long flags;

	if (!cpu_isset(cpu, buffer->cpumask))
		return;

	spin_lock_irqsave(&cpu_buffer->lock, flags);

	rb_reset_cpu(cpu_buffer);

	spin_unlock_irqrestore(&cpu_buffer->lock, flags);
}

/**
 * ring_buffer_reset - reset a ring buffer
 * @buffer: The ring buffer to reset all cpu buffers
 */
void ring_buffer_reset(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		ring_buffer_reset_cpu(buffer, cpu);
}

/**
 * ring_buffer_empty - is the ring buffer empty?
 * @buffer: The ring buffer to test
 */
int ring_buffer_empty(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu;

	/* yes this is racy, but if you don't like the race, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		if (!rb_per_cpu_empty(cpu_buffer))
			return 0;
	}
	return 1;
}

/**
 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
 * @buffer: The ring buffer
 * @cpu: The CPU buffer to test
 */
int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return 1;

	cpu_buffer = buffer->buffers[cpu];
	return rb_per_cpu_empty(cpu_buffer);
}

/**
 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
 * @buffer_a: One buffer to swap with
 * @buffer_b: The other buffer to swap with
 * @cpu: the CPU of the buffers to swap
 *
 * This function is useful for tracers that want to take a "snapshot"
 * of a CPU buffer and has another backup buffer lying around.
 * It is expected that the tracer handles the cpu buffer not being
 * used at the moment.
 */
int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
			 struct ring_buffer *buffer_b, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer_a;
	struct ring_buffer_per_cpu *cpu_buffer_b;

	if (!cpu_isset(cpu, buffer_a->cpumask) ||
	    !cpu_isset(cpu, buffer_b->cpumask))
		return -EINVAL;

	/* At least make sure the two buffers are somewhat the same */
	if (buffer_a->size != buffer_b->size ||
	    buffer_a->pages != buffer_b->pages)
		return -EINVAL;

	cpu_buffer_a = buffer_a->buffers[cpu];
	cpu_buffer_b = buffer_b->buffers[cpu];

	/*
	 * We can't do a synchronize_sched here because this
	 * function can be called in atomic context.
	 * Normally this will be called from the same CPU as cpu.
	 * If not it's up to the caller to protect this.
	 */
	atomic_inc(&cpu_buffer_a->record_disabled);
	atomic_inc(&cpu_buffer_b->record_disabled);

	buffer_a->buffers[cpu] = cpu_buffer_b;
	buffer_b->buffers[cpu] = cpu_buffer_a;

	cpu_buffer_b->buffer = buffer_a;
	cpu_buffer_a->buffer = buffer_b;

	atomic_dec(&cpu_buffer_a->record_disabled);
	atomic_dec(&cpu_buffer_b->record_disabled);

	return 0;
}
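
/*
 * Usage sketch (illustrative only, not compiled): a "max latency" style
 * snapshot, where a tracer keeps a spare buffer and swaps the interesting
 * CPU buffer into it.  "max_buffer" is a hypothetical second ring buffer
 * allocated with the same size as the live one.
 *
 *	if (ring_buffer_swap_cpu(max_buffer, trace_buffer, cpu) == 0) {
 *		... read max_buffer at leisure ...
 *		ring_buffer_reset_cpu(trace_buffer, cpu);
 *	}
 */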