1 /* 2 * Generic ring buffer 3 * 4 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com> 5 */ 6 #include <linux/ring_buffer.h> 7 #include <linux/spinlock.h> 8 #include <linux/debugfs.h> 9 #include <linux/uaccess.h> 10 #include <linux/module.h> 11 #include <linux/percpu.h> 12 #include <linux/mutex.h> 13 #include <linux/sched.h> /* used for sched_clock() (for now) */ 14 #include <linux/init.h> 15 #include <linux/hash.h> 16 #include <linux/list.h> 17 #include <linux/fs.h> 18 19 /* Up this if you want to test the TIME_EXTENTS and normalization */ 20 #define DEBUG_SHIFT 0 21 22 /* FIXME!!! */ 23 u64 ring_buffer_time_stamp(int cpu) 24 { 25 /* shift to debug/test normalization and TIME_EXTENTS */ 26 return sched_clock() << DEBUG_SHIFT; 27 } 28 29 void ring_buffer_normalize_time_stamp(int cpu, u64 *ts) 30 { 31 /* Just stupid testing the normalize function and deltas */ 32 *ts >>= DEBUG_SHIFT; 33 } 34 35 #define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event)) 36 #define RB_ALIGNMENT_SHIFT 2 37 #define RB_ALIGNMENT (1 << RB_ALIGNMENT_SHIFT) 38 #define RB_MAX_SMALL_DATA 28 39 40 enum { 41 RB_LEN_TIME_EXTEND = 8, 42 RB_LEN_TIME_STAMP = 16, 43 }; 44 45 /* inline for ring buffer fast paths */ 46 static inline unsigned 47 rb_event_length(struct ring_buffer_event *event) 48 { 49 unsigned length; 50 51 switch (event->type) { 52 case RINGBUF_TYPE_PADDING: 53 /* undefined */ 54 return -1; 55 56 case RINGBUF_TYPE_TIME_EXTEND: 57 return RB_LEN_TIME_EXTEND; 58 59 case RINGBUF_TYPE_TIME_STAMP: 60 return RB_LEN_TIME_STAMP; 61 62 case RINGBUF_TYPE_DATA: 63 if (event->len) 64 length = event->len << RB_ALIGNMENT_SHIFT; 65 else 66 length = event->array[0]; 67 return length + RB_EVNT_HDR_SIZE; 68 default: 69 BUG(); 70 } 71 /* not hit */ 72 return 0; 73 } 74 75 /** 76 * ring_buffer_event_length - return the length of the event 77 * @event: the event to get the length of 78 */ 79 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 80 { 81 return rb_event_length(event); 82 
} 83 84 /* inline for ring buffer fast paths */ 85 static inline void * 86 rb_event_data(struct ring_buffer_event *event) 87 { 88 BUG_ON(event->type != RINGBUF_TYPE_DATA); 89 /* If length is in len field, then array[0] has the data */ 90 if (event->len) 91 return (void *)&event->array[0]; 92 /* Otherwise length is in array[0] and array[1] has the data */ 93 return (void *)&event->array[1]; 94 } 95 96 /** 97 * ring_buffer_event_data - return the data of the event 98 * @event: the event to get the data from 99 */ 100 void *ring_buffer_event_data(struct ring_buffer_event *event) 101 { 102 return rb_event_data(event); 103 } 104 105 #define for_each_buffer_cpu(buffer, cpu) \ 106 for_each_cpu_mask(cpu, buffer->cpumask) 107 108 #define TS_SHIFT 27 109 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 110 #define TS_DELTA_TEST (~TS_MASK) 111 112 /* 113 * This hack stolen from mm/slob.c. 114 * We can store per page timing information in the page frame of the page. 115 * Thanks to Peter Zijlstra for suggesting this idea. 116 */ 117 struct buffer_page { 118 u64 time_stamp; /* page time stamp */ 119 local_t write; /* index for next write */ 120 local_t commit; /* write commited index */ 121 unsigned read; /* index for next read */ 122 struct list_head list; /* list of free pages */ 123 void *page; /* Actual data page */ 124 }; 125 126 /* 127 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing 128 * this issue out. 129 */ 130 static inline void free_buffer_page(struct buffer_page *bpage) 131 { 132 if (bpage->page) 133 free_page((unsigned long)bpage->page); 134 kfree(bpage); 135 } 136 137 /* 138 * We need to fit the time_stamp delta into 27 bits. 139 */ 140 static inline int test_time_stamp(u64 delta) 141 { 142 if (delta & TS_DELTA_TEST) 143 return 1; 144 return 0; 145 } 146 147 #define BUF_PAGE_SIZE PAGE_SIZE 148 149 /* 150 * head_page == tail_page && head == tail then buffer is empty. 
151 */ 152 struct ring_buffer_per_cpu { 153 int cpu; 154 struct ring_buffer *buffer; 155 spinlock_t lock; 156 struct lock_class_key lock_key; 157 struct list_head pages; 158 struct buffer_page *head_page; /* read from head */ 159 struct buffer_page *tail_page; /* write to tail */ 160 struct buffer_page *commit_page; /* commited pages */ 161 struct buffer_page *reader_page; 162 unsigned long overrun; 163 unsigned long entries; 164 u64 write_stamp; 165 u64 read_stamp; 166 atomic_t record_disabled; 167 }; 168 169 struct ring_buffer { 170 unsigned long size; 171 unsigned pages; 172 unsigned flags; 173 int cpus; 174 cpumask_t cpumask; 175 atomic_t record_disabled; 176 177 struct mutex mutex; 178 179 struct ring_buffer_per_cpu **buffers; 180 }; 181 182 struct ring_buffer_iter { 183 struct ring_buffer_per_cpu *cpu_buffer; 184 unsigned long head; 185 struct buffer_page *head_page; 186 u64 read_stamp; 187 }; 188 189 #define RB_WARN_ON(buffer, cond) \ 190 do { \ 191 if (unlikely(cond)) { \ 192 atomic_inc(&buffer->record_disabled); \ 193 WARN_ON(1); \ 194 } \ 195 } while (0) 196 197 #define RB_WARN_ON_RET(buffer, cond) \ 198 do { \ 199 if (unlikely(cond)) { \ 200 atomic_inc(&buffer->record_disabled); \ 201 WARN_ON(1); \ 202 return -1; \ 203 } \ 204 } while (0) 205 206 #define RB_WARN_ON_ONCE(buffer, cond) \ 207 do { \ 208 static int once; \ 209 if (unlikely(cond) && !once) { \ 210 once++; \ 211 atomic_inc(&buffer->record_disabled); \ 212 WARN_ON(1); \ 213 } \ 214 } while (0) 215 216 /** 217 * check_pages - integrity check of buffer pages 218 * @cpu_buffer: CPU buffer with pages to test 219 * 220 * As a safty measure we check to make sure the data pages have not 221 * been corrupted. 
222 */ 223 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 224 { 225 struct list_head *head = &cpu_buffer->pages; 226 struct buffer_page *page, *tmp; 227 228 RB_WARN_ON_RET(cpu_buffer, head->next->prev != head); 229 RB_WARN_ON_RET(cpu_buffer, head->prev->next != head); 230 231 list_for_each_entry_safe(page, tmp, head, list) { 232 RB_WARN_ON_RET(cpu_buffer, 233 page->list.next->prev != &page->list); 234 RB_WARN_ON_RET(cpu_buffer, 235 page->list.prev->next != &page->list); 236 } 237 238 return 0; 239 } 240 241 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 242 unsigned nr_pages) 243 { 244 struct list_head *head = &cpu_buffer->pages; 245 struct buffer_page *page, *tmp; 246 unsigned long addr; 247 LIST_HEAD(pages); 248 unsigned i; 249 250 for (i = 0; i < nr_pages; i++) { 251 page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()), 252 GFP_KERNEL, cpu_to_node(cpu_buffer->cpu)); 253 if (!page) 254 goto free_pages; 255 list_add(&page->list, &pages); 256 257 addr = __get_free_page(GFP_KERNEL); 258 if (!addr) 259 goto free_pages; 260 page->page = (void *)addr; 261 } 262 263 list_splice(&pages, head); 264 265 rb_check_pages(cpu_buffer); 266 267 return 0; 268 269 free_pages: 270 list_for_each_entry_safe(page, tmp, &pages, list) { 271 list_del_init(&page->list); 272 free_buffer_page(page); 273 } 274 return -ENOMEM; 275 } 276 277 static struct ring_buffer_per_cpu * 278 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu) 279 { 280 struct ring_buffer_per_cpu *cpu_buffer; 281 struct buffer_page *page; 282 unsigned long addr; 283 int ret; 284 285 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 286 GFP_KERNEL, cpu_to_node(cpu)); 287 if (!cpu_buffer) 288 return NULL; 289 290 cpu_buffer->cpu = cpu; 291 cpu_buffer->buffer = buffer; 292 spin_lock_init(&cpu_buffer->lock); 293 INIT_LIST_HEAD(&cpu_buffer->pages); 294 295 page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()), 296 GFP_KERNEL, 
cpu_to_node(cpu)); 297 if (!page) 298 goto fail_free_buffer; 299 300 cpu_buffer->reader_page = page; 301 addr = __get_free_page(GFP_KERNEL); 302 if (!addr) 303 goto fail_free_reader; 304 page->page = (void *)addr; 305 306 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 307 308 ret = rb_allocate_pages(cpu_buffer, buffer->pages); 309 if (ret < 0) 310 goto fail_free_reader; 311 312 cpu_buffer->head_page 313 = list_entry(cpu_buffer->pages.next, struct buffer_page, list); 314 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 315 316 return cpu_buffer; 317 318 fail_free_reader: 319 free_buffer_page(cpu_buffer->reader_page); 320 321 fail_free_buffer: 322 kfree(cpu_buffer); 323 return NULL; 324 } 325 326 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 327 { 328 struct list_head *head = &cpu_buffer->pages; 329 struct buffer_page *page, *tmp; 330 331 list_del_init(&cpu_buffer->reader_page->list); 332 free_buffer_page(cpu_buffer->reader_page); 333 334 list_for_each_entry_safe(page, tmp, head, list) { 335 list_del_init(&page->list); 336 free_buffer_page(page); 337 } 338 kfree(cpu_buffer); 339 } 340 341 /* 342 * Causes compile errors if the struct buffer_page gets bigger 343 * than the struct page. 344 */ 345 extern int ring_buffer_page_too_big(void); 346 347 /** 348 * ring_buffer_alloc - allocate a new ring_buffer 349 * @size: the size in bytes that is needed. 350 * @flags: attributes to set for the ring buffer. 351 * 352 * Currently the only flag that is available is the RB_FL_OVERWRITE 353 * flag. This flag means that the buffer will overwrite old data 354 * when the buffer wraps. If this flag is not set, the buffer will 355 * drop data when the tail hits the head. 356 */ 357 struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags) 358 { 359 struct ring_buffer *buffer; 360 int bsize; 361 int cpu; 362 363 /* Paranoid! 
Optimizes out when all is well */ 364 if (sizeof(struct buffer_page) > sizeof(struct page)) 365 ring_buffer_page_too_big(); 366 367 368 /* keep it in its own cache line */ 369 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 370 GFP_KERNEL); 371 if (!buffer) 372 return NULL; 373 374 buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 375 buffer->flags = flags; 376 377 /* need at least two pages */ 378 if (buffer->pages == 1) 379 buffer->pages++; 380 381 buffer->cpumask = cpu_possible_map; 382 buffer->cpus = nr_cpu_ids; 383 384 bsize = sizeof(void *) * nr_cpu_ids; 385 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 386 GFP_KERNEL); 387 if (!buffer->buffers) 388 goto fail_free_buffer; 389 390 for_each_buffer_cpu(buffer, cpu) { 391 buffer->buffers[cpu] = 392 rb_allocate_cpu_buffer(buffer, cpu); 393 if (!buffer->buffers[cpu]) 394 goto fail_free_buffers; 395 } 396 397 mutex_init(&buffer->mutex); 398 399 return buffer; 400 401 fail_free_buffers: 402 for_each_buffer_cpu(buffer, cpu) { 403 if (buffer->buffers[cpu]) 404 rb_free_cpu_buffer(buffer->buffers[cpu]); 405 } 406 kfree(buffer->buffers); 407 408 fail_free_buffer: 409 kfree(buffer); 410 return NULL; 411 } 412 413 /** 414 * ring_buffer_free - free a ring buffer. 415 * @buffer: the buffer to free. 
416 */ 417 void 418 ring_buffer_free(struct ring_buffer *buffer) 419 { 420 int cpu; 421 422 for_each_buffer_cpu(buffer, cpu) 423 rb_free_cpu_buffer(buffer->buffers[cpu]); 424 425 kfree(buffer); 426 } 427 428 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer); 429 430 static void 431 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages) 432 { 433 struct buffer_page *page; 434 struct list_head *p; 435 unsigned i; 436 437 atomic_inc(&cpu_buffer->record_disabled); 438 synchronize_sched(); 439 440 for (i = 0; i < nr_pages; i++) { 441 BUG_ON(list_empty(&cpu_buffer->pages)); 442 p = cpu_buffer->pages.next; 443 page = list_entry(p, struct buffer_page, list); 444 list_del_init(&page->list); 445 free_buffer_page(page); 446 } 447 BUG_ON(list_empty(&cpu_buffer->pages)); 448 449 rb_reset_cpu(cpu_buffer); 450 451 rb_check_pages(cpu_buffer); 452 453 atomic_dec(&cpu_buffer->record_disabled); 454 455 } 456 457 static void 458 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer, 459 struct list_head *pages, unsigned nr_pages) 460 { 461 struct buffer_page *page; 462 struct list_head *p; 463 unsigned i; 464 465 atomic_inc(&cpu_buffer->record_disabled); 466 synchronize_sched(); 467 468 for (i = 0; i < nr_pages; i++) { 469 BUG_ON(list_empty(pages)); 470 p = pages->next; 471 page = list_entry(p, struct buffer_page, list); 472 list_del_init(&page->list); 473 list_add_tail(&page->list, &cpu_buffer->pages); 474 } 475 rb_reset_cpu(cpu_buffer); 476 477 rb_check_pages(cpu_buffer); 478 479 atomic_dec(&cpu_buffer->record_disabled); 480 } 481 482 /** 483 * ring_buffer_resize - resize the ring buffer 484 * @buffer: the buffer to resize. 485 * @size: the new size. 486 * 487 * The tracer is responsible for making sure that the buffer is 488 * not being used while changing the size. 489 * Note: We may be able to change the above requirement by using 490 * RCU synchronizations. 491 * 492 * Minimum size is 2 * BUF_PAGE_SIZE. 493 * 494 * Returns -1 on failure. 
495 */ 496 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) 497 { 498 struct ring_buffer_per_cpu *cpu_buffer; 499 unsigned nr_pages, rm_pages, new_pages; 500 struct buffer_page *page, *tmp; 501 unsigned long buffer_size; 502 unsigned long addr; 503 LIST_HEAD(pages); 504 int i, cpu; 505 506 size = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 507 size *= BUF_PAGE_SIZE; 508 buffer_size = buffer->pages * BUF_PAGE_SIZE; 509 510 /* we need a minimum of two pages */ 511 if (size < BUF_PAGE_SIZE * 2) 512 size = BUF_PAGE_SIZE * 2; 513 514 if (size == buffer_size) 515 return size; 516 517 mutex_lock(&buffer->mutex); 518 519 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 520 521 if (size < buffer_size) { 522 523 /* easy case, just free pages */ 524 BUG_ON(nr_pages >= buffer->pages); 525 526 rm_pages = buffer->pages - nr_pages; 527 528 for_each_buffer_cpu(buffer, cpu) { 529 cpu_buffer = buffer->buffers[cpu]; 530 rb_remove_pages(cpu_buffer, rm_pages); 531 } 532 goto out; 533 } 534 535 /* 536 * This is a bit more difficult. We only want to add pages 537 * when we can allocate enough for all CPUs. We do this 538 * by allocating all the pages and storing them on a local 539 * link list. If we succeed in our allocation, then we 540 * add these pages to the cpu_buffers. 
Otherwise we just free 541 * them all and return -ENOMEM; 542 */ 543 BUG_ON(nr_pages <= buffer->pages); 544 new_pages = nr_pages - buffer->pages; 545 546 for_each_buffer_cpu(buffer, cpu) { 547 for (i = 0; i < new_pages; i++) { 548 page = kzalloc_node(ALIGN(sizeof(*page), 549 cache_line_size()), 550 GFP_KERNEL, cpu_to_node(cpu)); 551 if (!page) 552 goto free_pages; 553 list_add(&page->list, &pages); 554 addr = __get_free_page(GFP_KERNEL); 555 if (!addr) 556 goto free_pages; 557 page->page = (void *)addr; 558 } 559 } 560 561 for_each_buffer_cpu(buffer, cpu) { 562 cpu_buffer = buffer->buffers[cpu]; 563 rb_insert_pages(cpu_buffer, &pages, new_pages); 564 } 565 566 BUG_ON(!list_empty(&pages)); 567 568 out: 569 buffer->pages = nr_pages; 570 mutex_unlock(&buffer->mutex); 571 572 return size; 573 574 free_pages: 575 list_for_each_entry_safe(page, tmp, &pages, list) { 576 list_del_init(&page->list); 577 free_buffer_page(page); 578 } 579 return -ENOMEM; 580 } 581 582 static inline int rb_null_event(struct ring_buffer_event *event) 583 { 584 return event->type == RINGBUF_TYPE_PADDING; 585 } 586 587 static inline void *__rb_page_index(struct buffer_page *page, unsigned index) 588 { 589 return page->page + index; 590 } 591 592 static inline struct ring_buffer_event * 593 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 594 { 595 return __rb_page_index(cpu_buffer->reader_page, 596 cpu_buffer->reader_page->read); 597 } 598 599 static inline struct ring_buffer_event * 600 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer) 601 { 602 return __rb_page_index(cpu_buffer->head_page, 603 cpu_buffer->head_page->read); 604 } 605 606 static inline struct ring_buffer_event * 607 rb_iter_head_event(struct ring_buffer_iter *iter) 608 { 609 return __rb_page_index(iter->head_page, iter->head); 610 } 611 612 static inline unsigned rb_page_write(struct buffer_page *bpage) 613 { 614 return local_read(&bpage->write); 615 } 616 617 static inline unsigned rb_page_commit(struct buffer_page 
*bpage) 618 { 619 return local_read(&bpage->commit); 620 } 621 622 /* Size is determined by what has been commited */ 623 static inline unsigned rb_page_size(struct buffer_page *bpage) 624 { 625 return rb_page_commit(bpage); 626 } 627 628 static inline unsigned 629 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 630 { 631 return rb_page_commit(cpu_buffer->commit_page); 632 } 633 634 static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer) 635 { 636 return rb_page_commit(cpu_buffer->head_page); 637 } 638 639 /* 640 * When the tail hits the head and the buffer is in overwrite mode, 641 * the head jumps to the next page and all content on the previous 642 * page is discarded. But before doing so, we update the overrun 643 * variable of the buffer. 644 */ 645 static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer) 646 { 647 struct ring_buffer_event *event; 648 unsigned long head; 649 650 for (head = 0; head < rb_head_size(cpu_buffer); 651 head += rb_event_length(event)) { 652 653 event = __rb_page_index(cpu_buffer->head_page, head); 654 BUG_ON(rb_null_event(event)); 655 /* Only count data entries */ 656 if (event->type != RINGBUF_TYPE_DATA) 657 continue; 658 cpu_buffer->overrun++; 659 cpu_buffer->entries--; 660 } 661 } 662 663 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer, 664 struct buffer_page **page) 665 { 666 struct list_head *p = (*page)->list.next; 667 668 if (p == &cpu_buffer->pages) 669 p = p->next; 670 671 *page = list_entry(p, struct buffer_page, list); 672 } 673 674 static inline unsigned 675 rb_event_index(struct ring_buffer_event *event) 676 { 677 unsigned long addr = (unsigned long)event; 678 679 return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE); 680 } 681 682 static inline int 683 rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer, 684 struct ring_buffer_event *event) 685 { 686 unsigned long addr = (unsigned long)event; 687 unsigned long index; 688 689 index = 
rb_event_index(event); 690 addr &= PAGE_MASK; 691 692 return cpu_buffer->commit_page->page == (void *)addr && 693 rb_commit_index(cpu_buffer) == index; 694 } 695 696 static inline void 697 rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer, 698 struct ring_buffer_event *event) 699 { 700 unsigned long addr = (unsigned long)event; 701 unsigned long index; 702 703 index = rb_event_index(event); 704 addr &= PAGE_MASK; 705 706 while (cpu_buffer->commit_page->page != (void *)addr) { 707 RB_WARN_ON(cpu_buffer, 708 cpu_buffer->commit_page == cpu_buffer->tail_page); 709 cpu_buffer->commit_page->commit = 710 cpu_buffer->commit_page->write; 711 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page); 712 cpu_buffer->write_stamp = cpu_buffer->commit_page->time_stamp; 713 } 714 715 /* Now set the commit to the event's index */ 716 local_set(&cpu_buffer->commit_page->commit, index); 717 } 718 719 static inline void 720 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 721 { 722 /* 723 * We only race with interrupts and NMIs on this CPU. 724 * If we own the commit event, then we can commit 725 * all others that interrupted us, since the interruptions 726 * are in stack format (they finish before they come 727 * back to us). This allows us to do a simple loop to 728 * assign the commit to the tail. 
729 */ 730 while (cpu_buffer->commit_page != cpu_buffer->tail_page) { 731 cpu_buffer->commit_page->commit = 732 cpu_buffer->commit_page->write; 733 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page); 734 cpu_buffer->write_stamp = cpu_buffer->commit_page->time_stamp; 735 /* add barrier to keep gcc from optimizing too much */ 736 barrier(); 737 } 738 while (rb_commit_index(cpu_buffer) != 739 rb_page_write(cpu_buffer->commit_page)) { 740 cpu_buffer->commit_page->commit = 741 cpu_buffer->commit_page->write; 742 barrier(); 743 } 744 } 745 746 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 747 { 748 cpu_buffer->read_stamp = cpu_buffer->reader_page->time_stamp; 749 cpu_buffer->reader_page->read = 0; 750 } 751 752 static inline void rb_inc_iter(struct ring_buffer_iter *iter) 753 { 754 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 755 756 /* 757 * The iterator could be on the reader page (it starts there). 758 * But the head could have moved, since the reader was 759 * found. Check for this case and assign the iterator 760 * to the head page instead of next. 761 */ 762 if (iter->head_page == cpu_buffer->reader_page) 763 iter->head_page = cpu_buffer->head_page; 764 else 765 rb_inc_page(cpu_buffer, &iter->head_page); 766 767 iter->read_stamp = iter->head_page->time_stamp; 768 iter->head = 0; 769 } 770 771 /** 772 * ring_buffer_update_event - update event type and data 773 * @event: the even to update 774 * @type: the type of event 775 * @length: the size of the event field in the ring buffer 776 * 777 * Update the type and data fields of the event. The length 778 * is the actual size that is written to the ring buffer, 779 * and with this, we can determine what to place into the 780 * data field. 
 */
static inline void
rb_update_event(struct ring_buffer_event *event,
			 unsigned type, unsigned length)
{
	event->type = type;

	switch (type) {

	case RINGBUF_TYPE_PADDING:
		break;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* fixed size, stored in RB_ALIGNMENT units */
		event->len =
			(RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
			>> RB_ALIGNMENT_SHIFT;
		break;

	case RINGBUF_TYPE_TIME_STAMP:
		/* fixed size, stored in RB_ALIGNMENT units */
		event->len =
			(RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
			>> RB_ALIGNMENT_SHIFT;
		break;

	case RINGBUF_TYPE_DATA:
		/* @length includes the header; only the payload is recorded */
		length -= RB_EVNT_HDR_SIZE;
		if (length > RB_MAX_SMALL_DATA) {
			/* too big for the len field: len = 0, size in array[0] */
			event->len = 0;
			event->array[0] = length;
		} else
			event->len =
				(length + (RB_ALIGNMENT-1))
				>> RB_ALIGNMENT_SHIFT;
		break;
	default:
		BUG();
	}
}

/*
 * Convert a payload @length into the full event length: a zero length
 * is bumped to 1, payloads above RB_MAX_SMALL_DATA get room for the
 * array[0] length word, the event header is added and the result is
 * rounded up to RB_ALIGNMENT.
 */
static inline unsigned rb_calculate_event_length(unsigned length)
{
	struct ring_buffer_event event; /* Used only for sizeof array */

	/* zero length can cause confusions */
	if (!length)
		length = 1;

	if (length > RB_MAX_SMALL_DATA)
		length += sizeof(event.array[0]);

	length += RB_EVNT_HDR_SIZE;
	length = ALIGN(length, RB_ALIGNMENT);

	return length;
}

/*
 * Reserve @length bytes for an event of @type on the tail page.
 *
 * Returns the reserved event on success, ERR_PTR(-EAGAIN) when the
 * reservation ran past the end of the page (the caller is expected to
 * retry), or NULL when the next page is the commit page, or when the
 * next page is the head page and RB_FL_OVERWRITE is not set.
 *
 * *@ts is refreshed when this call moves the tail to a new page.
 */
static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  unsigned type, unsigned long length, u64 *ts)
{
	struct buffer_page *tail_page, *head_page, *reader_page;
	unsigned long tail, write;
	struct ring_buffer *buffer = cpu_buffer->buffer;
	struct ring_buffer_event *event;
	unsigned long flags;

	/* claim the space first; 'tail' is where our event would start */
	tail_page = cpu_buffer->tail_page;
	write = local_add_return(length, &tail_page->write);
	tail = write - length;

	/* See if we shot pass the end of this buffer page */
	if (write > BUF_PAGE_SIZE) {
		struct buffer_page *next_page = tail_page;

		spin_lock_irqsave(&cpu_buffer->lock, flags);

		rb_inc_page(cpu_buffer, &next_page);

		head_page = cpu_buffer->head_page;
		reader_page = cpu_buffer->reader_page;

		/* we grabbed the lock before incrementing */
		RB_WARN_ON(cpu_buffer, next_page == reader_page);

		/*
		 * If for some reason, we had an interrupt storm that made
		 * it all the way around the buffer, bail, and warn
		 * about it.
		 */
		if (unlikely(next_page == cpu_buffer->commit_page)) {
			WARN_ON_ONCE(1);
			goto out_unlock;
		}

		if (next_page == head_page) {
			if (!(buffer->flags & RB_FL_OVERWRITE)) {
				/* reset write */
				if (tail <= BUF_PAGE_SIZE)
					local_set(&tail_page->write, tail);
				goto out_unlock;
			}

			/* tail_page has not moved yet? */
			if (tail_page == cpu_buffer->tail_page) {
				/* count overflows */
				rb_update_overflow(cpu_buffer);

				rb_inc_page(cpu_buffer, &head_page);
				cpu_buffer->head_page = head_page;
				cpu_buffer->head_page->read = 0;
			}
		}

		/*
		 * If the tail page is still the same as what we think
		 * it is, then it is up to us to update the tail
		 * pointer.
		 */
		if (tail_page == cpu_buffer->tail_page) {
			local_set(&next_page->write, 0);
			local_set(&next_page->commit, 0);
			cpu_buffer->tail_page = next_page;

			/* reread the time stamp */
			*ts = ring_buffer_time_stamp(cpu_buffer->cpu);
			cpu_buffer->tail_page->time_stamp = *ts;
		}

		/*
		 * The actual tail page has moved forward.
		 */
		if (tail < BUF_PAGE_SIZE) {
			/* Mark the rest of the page with padding */
			event = __rb_page_index(tail_page, tail);
			event->type = RINGBUF_TYPE_PADDING;
		}

		if (tail <= BUF_PAGE_SIZE)
			/* Set the write back to the previous setting */
			local_set(&tail_page->write, tail);

		/*
		 * If this was a commit entry that failed,
		 * increment that too
		 */
		if (tail_page == cpu_buffer->commit_page &&
		    tail == rb_commit_index(cpu_buffer)) {
			rb_set_commit_to_write(cpu_buffer);
		}

		spin_unlock_irqrestore(&cpu_buffer->lock, flags);

		/* fail and let the caller try again */
		return ERR_PTR(-EAGAIN);
	}

	/* We reserved something on the buffer */

	BUG_ON(write > BUF_PAGE_SIZE);

	event = __rb_page_index(tail_page, tail);
	rb_update_event(event, type, length);

	/*
	 * If this is a commit and the tail is zero, then update
	 * this page's time stamp.
	 */
	if (!tail && rb_is_commit(cpu_buffer, event))
		cpu_buffer->commit_page->time_stamp = *ts;

	return event;

 out_unlock:
	spin_unlock_irqrestore(&cpu_buffer->lock, flags);
	return NULL;
}

/*
 * Reserve a TIME_EXTEND event carrying *@delta.
 *
 * Returns 1 if the time event is the commit (write_stamp is then set
 * to *@ts), 0 if it is not (the event is written with a zero delta),
 * -EBUSY if nothing could be reserved, or -EAGAIN if the caller should
 * retry.  On the 0 and 1 returns *@delta is cleared.
 */
static int
rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		  u64 *ts, u64 *delta)
{
	struct ring_buffer_event *event;
	static int once;
	int ret;

	/* complain (once) about an implausibly large delta */
	if (unlikely(*delta > (1ULL << 59) && !once++)) {
		printk(KERN_WARNING "Delta way too big! %llu"
		       " ts=%llu write stamp = %llu\n",
		       (unsigned long long)*delta,
		       (unsigned long long)*ts,
		       (unsigned long long)cpu_buffer->write_stamp);
		WARN_ON(1);
	}

	/*
	 * The delta is too big, we need to add a
	 * new timestamp.
	 */
	event = __rb_reserve_next(cpu_buffer,
				  RINGBUF_TYPE_TIME_EXTEND,
				  RB_LEN_TIME_EXTEND,
				  ts);
	if (!event)
		return -EBUSY;

	if (PTR_ERR(event) == -EAGAIN)
		return -EAGAIN;

	/* Only a committed time event can update the write stamp */
	if (rb_is_commit(cpu_buffer, event)) {
		/*
		 * If this is the first on the page, then we need to
		 * update the page itself, and just put in a zero.
		 */
		if (rb_event_index(event)) {
			/* low TS_SHIFT bits in time_delta, the rest in array[0] */
			event->time_delta = *delta & TS_MASK;
			event->array[0] = *delta >> TS_SHIFT;
		} else {
			cpu_buffer->commit_page->time_stamp = *ts;
			event->time_delta = 0;
			event->array[0] = 0;
		}
		cpu_buffer->write_stamp = *ts;
		/* let the caller know this was the commit */
		ret = 1;
	} else {
		/* Darn, this is just wasted space */
		event->time_delta = 0;
		event->array[0] = 0;
		ret = 0;
	}

	*delta = 0;

	return ret;
}

/*
 * Reserve an event of @type and @length (full event length) on
 * @cpu_buffer, inserting a TIME_EXTEND event first when the delta to
 * the last write stamp does not fit into the event's time_delta field.
 *
 * Returns the reserved event with time_delta filled in, or NULL if
 * nothing could be reserved or the retry limit was exceeded.
 */
static struct ring_buffer_event *
rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
		      unsigned type, unsigned long length)
{
	struct ring_buffer_event *event;
	u64 ts, delta;
	int commit = 0;
	int nr_loops = 0;

 again:
	/*
	 * We allow for interrupts to reenter here and do a trace.
	 * If one does, it will cause this original code to loop
	 * back here. Even with heavy interrupts happening, this
	 * should only happen a few times in a row. If this happens
	 * 1000 times in a row, there must be either an interrupt
	 * storm or we have something buggy.
	 * Bail!
	 */
	if (unlikely(++nr_loops > 1000)) {
		RB_WARN_ON(cpu_buffer, 1);
		return NULL;
	}

	ts = ring_buffer_time_stamp(cpu_buffer->cpu);

	/*
	 * Only the first commit can update the timestamp.
	 * Yes there is a race here. If an interrupt comes in
	 * just after the conditional and it traces too, then it
	 * will also check the deltas. More than one timestamp may
	 * also be made. But only the entry that did the actual
	 * commit will be something other than zero.
	 */
	if (cpu_buffer->tail_page == cpu_buffer->commit_page &&
	    rb_page_write(cpu_buffer->tail_page) ==
	    rb_commit_index(cpu_buffer)) {

		delta = ts - cpu_buffer->write_stamp;

		/* make sure this delta is calculated here */
		barrier();

		/* Did the write stamp get updated already? */
		if (unlikely(ts < cpu_buffer->write_stamp))
			goto again;

		if (test_time_stamp(delta)) {

			commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);

			if (commit == -EBUSY)
				return NULL;

			if (commit == -EAGAIN)
				goto again;

			RB_WARN_ON(cpu_buffer, commit < 0);
		}
	} else
		/* Non commits have zero deltas */
		delta = 0;

	event = __rb_reserve_next(cpu_buffer, type, length, &ts);
	if (PTR_ERR(event) == -EAGAIN)
		goto again;

	if (!event) {
		if (unlikely(commit))
			/*
			 * Ouch! We needed a timestamp and it was committed. But
			 * we didn't get our event reserved.
			 */
			rb_set_commit_to_write(cpu_buffer);
		return NULL;
	}

	/*
	 * If the timestamp was committed, make the commit our entry
	 * now so that we will update it when needed.
	 */
	if (commit)
		rb_set_commit_event(cpu_buffer, event);
	else if (!rb_is_commit(cpu_buffer, event))
		delta = 0;

	event->time_delta = delta;

	return event;
}

/* need_resched() state saved by the outermost ring_buffer_lock_reserve() */
static DEFINE_PER_CPU(int, rb_need_resched);

/**
 * ring_buffer_lock_reserve - reserve a part of the buffer
 * @buffer: the ring buffer to reserve from
 * @length: the length of the data to reserve (excluding event header)
 * @flags: a pointer to save the interrupt flags
 *
 * Returns a reserved event on the ring buffer to copy directly to.
1118 * The user of this interface will need to get the body to write into 1119 * and can use the ring_buffer_event_data() interface. 1120 * 1121 * The length is the length of the data needed, not the event length 1122 * which also includes the event header. 1123 * 1124 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 1125 * If NULL is returned, then nothing has been allocated or locked. 1126 */ 1127 struct ring_buffer_event * 1128 ring_buffer_lock_reserve(struct ring_buffer *buffer, 1129 unsigned long length, 1130 unsigned long *flags) 1131 { 1132 struct ring_buffer_per_cpu *cpu_buffer; 1133 struct ring_buffer_event *event; 1134 int cpu, resched; 1135 1136 if (atomic_read(&buffer->record_disabled)) 1137 return NULL; 1138 1139 /* If we are tracing schedule, we don't want to recurse */ 1140 resched = need_resched(); 1141 preempt_disable_notrace(); 1142 1143 cpu = raw_smp_processor_id(); 1144 1145 if (!cpu_isset(cpu, buffer->cpumask)) 1146 goto out; 1147 1148 cpu_buffer = buffer->buffers[cpu]; 1149 1150 if (atomic_read(&cpu_buffer->record_disabled)) 1151 goto out; 1152 1153 length = rb_calculate_event_length(length); 1154 if (length > BUF_PAGE_SIZE) 1155 goto out; 1156 1157 event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length); 1158 if (!event) 1159 goto out; 1160 1161 /* 1162 * Need to store resched state on this cpu. 1163 * Only the first needs to. 
1164 */ 1165 1166 if (preempt_count() == 1) 1167 per_cpu(rb_need_resched, cpu) = resched; 1168 1169 return event; 1170 1171 out: 1172 if (resched) 1173 preempt_enable_notrace(); 1174 else 1175 preempt_enable_notrace(); 1176 return NULL; 1177 } 1178 1179 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, 1180 struct ring_buffer_event *event) 1181 { 1182 cpu_buffer->entries++; 1183 1184 /* Only process further if we own the commit */ 1185 if (!rb_is_commit(cpu_buffer, event)) 1186 return; 1187 1188 cpu_buffer->write_stamp += event->time_delta; 1189 1190 rb_set_commit_to_write(cpu_buffer); 1191 } 1192 1193 /** 1194 * ring_buffer_unlock_commit - commit a reserved 1195 * @buffer: The buffer to commit to 1196 * @event: The event pointer to commit. 1197 * @flags: the interrupt flags received from ring_buffer_lock_reserve. 1198 * 1199 * This commits the data to the ring buffer, and releases any locks held. 1200 * 1201 * Must be paired with ring_buffer_lock_reserve. 1202 */ 1203 int ring_buffer_unlock_commit(struct ring_buffer *buffer, 1204 struct ring_buffer_event *event, 1205 unsigned long flags) 1206 { 1207 struct ring_buffer_per_cpu *cpu_buffer; 1208 int cpu = raw_smp_processor_id(); 1209 1210 cpu_buffer = buffer->buffers[cpu]; 1211 1212 rb_commit(cpu_buffer, event); 1213 1214 /* 1215 * Only the last preempt count needs to restore preemption. 1216 */ 1217 if (preempt_count() == 1) { 1218 if (per_cpu(rb_need_resched, cpu)) 1219 preempt_enable_no_resched_notrace(); 1220 else 1221 preempt_enable_notrace(); 1222 } else 1223 preempt_enable_no_resched_notrace(); 1224 1225 return 0; 1226 } 1227 1228 /** 1229 * ring_buffer_write - write data to the buffer without reserving 1230 * @buffer: The ring buffer to write to. 1231 * @length: The length of the data being written (excluding the event header) 1232 * @data: The data to write to the buffer. 1233 * 1234 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 1235 * one function. 
If you already have the data to write to the buffer, it 1236 * may be easier to simply call this function. 1237 * 1238 * Note, like ring_buffer_lock_reserve, the length is the length of the data 1239 * and not the length of the event which would hold the header. 1240 */ 1241 int ring_buffer_write(struct ring_buffer *buffer, 1242 unsigned long length, 1243 void *data) 1244 { 1245 struct ring_buffer_per_cpu *cpu_buffer; 1246 struct ring_buffer_event *event; 1247 unsigned long event_length; 1248 void *body; 1249 int ret = -EBUSY; 1250 int cpu, resched; 1251 1252 if (atomic_read(&buffer->record_disabled)) 1253 return -EBUSY; 1254 1255 resched = need_resched(); 1256 preempt_disable_notrace(); 1257 1258 cpu = raw_smp_processor_id(); 1259 1260 if (!cpu_isset(cpu, buffer->cpumask)) 1261 goto out; 1262 1263 cpu_buffer = buffer->buffers[cpu]; 1264 1265 if (atomic_read(&cpu_buffer->record_disabled)) 1266 goto out; 1267 1268 event_length = rb_calculate_event_length(length); 1269 event = rb_reserve_next_event(cpu_buffer, 1270 RINGBUF_TYPE_DATA, event_length); 1271 if (!event) 1272 goto out; 1273 1274 body = rb_event_data(event); 1275 1276 memcpy(body, data, length); 1277 1278 rb_commit(cpu_buffer, event); 1279 1280 ret = 0; 1281 out: 1282 if (resched) 1283 preempt_enable_no_resched_notrace(); 1284 else 1285 preempt_enable_notrace(); 1286 1287 return ret; 1288 } 1289 1290 static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 1291 { 1292 struct buffer_page *reader = cpu_buffer->reader_page; 1293 struct buffer_page *head = cpu_buffer->head_page; 1294 struct buffer_page *commit = cpu_buffer->commit_page; 1295 1296 return reader->read == rb_page_commit(reader) && 1297 (commit == reader || 1298 (commit == head && 1299 head->read == rb_page_commit(commit))); 1300 } 1301 1302 /** 1303 * ring_buffer_record_disable - stop all writes into the buffer 1304 * @buffer: The ring buffer to stop writes to. 1305 * 1306 * This prevents all writes to the buffer. 
Any attempt to write 1307 * to the buffer after this will fail and return NULL. 1308 * 1309 * The caller should call synchronize_sched() after this. 1310 */ 1311 void ring_buffer_record_disable(struct ring_buffer *buffer) 1312 { 1313 atomic_inc(&buffer->record_disabled); 1314 } 1315 1316 /** 1317 * ring_buffer_record_enable - enable writes to the buffer 1318 * @buffer: The ring buffer to enable writes 1319 * 1320 * Note, multiple disables will need the same number of enables 1321 * to truely enable the writing (much like preempt_disable). 1322 */ 1323 void ring_buffer_record_enable(struct ring_buffer *buffer) 1324 { 1325 atomic_dec(&buffer->record_disabled); 1326 } 1327 1328 /** 1329 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 1330 * @buffer: The ring buffer to stop writes to. 1331 * @cpu: The CPU buffer to stop 1332 * 1333 * This prevents all writes to the buffer. Any attempt to write 1334 * to the buffer after this will fail and return NULL. 1335 * 1336 * The caller should call synchronize_sched() after this. 1337 */ 1338 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu) 1339 { 1340 struct ring_buffer_per_cpu *cpu_buffer; 1341 1342 if (!cpu_isset(cpu, buffer->cpumask)) 1343 return; 1344 1345 cpu_buffer = buffer->buffers[cpu]; 1346 atomic_inc(&cpu_buffer->record_disabled); 1347 } 1348 1349 /** 1350 * ring_buffer_record_enable_cpu - enable writes to the buffer 1351 * @buffer: The ring buffer to enable writes 1352 * @cpu: The CPU to enable. 1353 * 1354 * Note, multiple disables will need the same number of enables 1355 * to truely enable the writing (much like preempt_disable). 
1356 */ 1357 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu) 1358 { 1359 struct ring_buffer_per_cpu *cpu_buffer; 1360 1361 if (!cpu_isset(cpu, buffer->cpumask)) 1362 return; 1363 1364 cpu_buffer = buffer->buffers[cpu]; 1365 atomic_dec(&cpu_buffer->record_disabled); 1366 } 1367 1368 /** 1369 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 1370 * @buffer: The ring buffer 1371 * @cpu: The per CPU buffer to get the entries from. 1372 */ 1373 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu) 1374 { 1375 struct ring_buffer_per_cpu *cpu_buffer; 1376 1377 if (!cpu_isset(cpu, buffer->cpumask)) 1378 return 0; 1379 1380 cpu_buffer = buffer->buffers[cpu]; 1381 return cpu_buffer->entries; 1382 } 1383 1384 /** 1385 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer 1386 * @buffer: The ring buffer 1387 * @cpu: The per CPU buffer to get the number of overruns from 1388 */ 1389 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu) 1390 { 1391 struct ring_buffer_per_cpu *cpu_buffer; 1392 1393 if (!cpu_isset(cpu, buffer->cpumask)) 1394 return 0; 1395 1396 cpu_buffer = buffer->buffers[cpu]; 1397 return cpu_buffer->overrun; 1398 } 1399 1400 /** 1401 * ring_buffer_entries - get the number of entries in a buffer 1402 * @buffer: The ring buffer 1403 * 1404 * Returns the total number of entries in the ring buffer 1405 * (all CPU entries) 1406 */ 1407 unsigned long ring_buffer_entries(struct ring_buffer *buffer) 1408 { 1409 struct ring_buffer_per_cpu *cpu_buffer; 1410 unsigned long entries = 0; 1411 int cpu; 1412 1413 /* if you care about this being correct, lock the buffer */ 1414 for_each_buffer_cpu(buffer, cpu) { 1415 cpu_buffer = buffer->buffers[cpu]; 1416 entries += cpu_buffer->entries; 1417 } 1418 1419 return entries; 1420 } 1421 1422 /** 1423 * ring_buffer_overrun_cpu - get the number of overruns in buffer 1424 * @buffer: The ring buffer 1425 * 1426 * Returns the total 
number of overruns in the ring buffer 1427 * (all CPU entries) 1428 */ 1429 unsigned long ring_buffer_overruns(struct ring_buffer *buffer) 1430 { 1431 struct ring_buffer_per_cpu *cpu_buffer; 1432 unsigned long overruns = 0; 1433 int cpu; 1434 1435 /* if you care about this being correct, lock the buffer */ 1436 for_each_buffer_cpu(buffer, cpu) { 1437 cpu_buffer = buffer->buffers[cpu]; 1438 overruns += cpu_buffer->overrun; 1439 } 1440 1441 return overruns; 1442 } 1443 1444 /** 1445 * ring_buffer_iter_reset - reset an iterator 1446 * @iter: The iterator to reset 1447 * 1448 * Resets the iterator, so that it will start from the beginning 1449 * again. 1450 */ 1451 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 1452 { 1453 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 1454 1455 /* Iterator usage is expected to have record disabled */ 1456 if (list_empty(&cpu_buffer->reader_page->list)) { 1457 iter->head_page = cpu_buffer->head_page; 1458 iter->head = cpu_buffer->head_page->read; 1459 } else { 1460 iter->head_page = cpu_buffer->reader_page; 1461 iter->head = cpu_buffer->reader_page->read; 1462 } 1463 if (iter->head) 1464 iter->read_stamp = cpu_buffer->read_stamp; 1465 else 1466 iter->read_stamp = iter->head_page->time_stamp; 1467 } 1468 1469 /** 1470 * ring_buffer_iter_empty - check if an iterator has no more to read 1471 * @iter: The iterator to check 1472 */ 1473 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 1474 { 1475 struct ring_buffer_per_cpu *cpu_buffer; 1476 1477 cpu_buffer = iter->cpu_buffer; 1478 1479 return iter->head_page == cpu_buffer->commit_page && 1480 iter->head == rb_commit_index(cpu_buffer); 1481 } 1482 1483 static void 1484 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 1485 struct ring_buffer_event *event) 1486 { 1487 u64 delta; 1488 1489 switch (event->type) { 1490 case RINGBUF_TYPE_PADDING: 1491 return; 1492 1493 case RINGBUF_TYPE_TIME_EXTEND: 1494 delta = event->array[0]; 1495 delta <<= TS_SHIFT; 
1496 delta += event->time_delta; 1497 cpu_buffer->read_stamp += delta; 1498 return; 1499 1500 case RINGBUF_TYPE_TIME_STAMP: 1501 /* FIXME: not implemented */ 1502 return; 1503 1504 case RINGBUF_TYPE_DATA: 1505 cpu_buffer->read_stamp += event->time_delta; 1506 return; 1507 1508 default: 1509 BUG(); 1510 } 1511 return; 1512 } 1513 1514 static void 1515 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 1516 struct ring_buffer_event *event) 1517 { 1518 u64 delta; 1519 1520 switch (event->type) { 1521 case RINGBUF_TYPE_PADDING: 1522 return; 1523 1524 case RINGBUF_TYPE_TIME_EXTEND: 1525 delta = event->array[0]; 1526 delta <<= TS_SHIFT; 1527 delta += event->time_delta; 1528 iter->read_stamp += delta; 1529 return; 1530 1531 case RINGBUF_TYPE_TIME_STAMP: 1532 /* FIXME: not implemented */ 1533 return; 1534 1535 case RINGBUF_TYPE_DATA: 1536 iter->read_stamp += event->time_delta; 1537 return; 1538 1539 default: 1540 BUG(); 1541 } 1542 return; 1543 } 1544 1545 static struct buffer_page * 1546 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 1547 { 1548 struct buffer_page *reader = NULL; 1549 unsigned long flags; 1550 int nr_loops = 0; 1551 1552 spin_lock_irqsave(&cpu_buffer->lock, flags); 1553 1554 again: 1555 /* 1556 * This should normally only loop twice. But because the 1557 * start of the reader inserts an empty page, it causes 1558 * a case where we will loop three times. There should be no 1559 * reason to loop four times (that I know of). 
1560 */ 1561 if (unlikely(++nr_loops > 3)) { 1562 RB_WARN_ON(cpu_buffer, 1); 1563 reader = NULL; 1564 goto out; 1565 } 1566 1567 reader = cpu_buffer->reader_page; 1568 1569 /* If there's more to read, return this page */ 1570 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 1571 goto out; 1572 1573 /* Never should we have an index greater than the size */ 1574 RB_WARN_ON(cpu_buffer, 1575 cpu_buffer->reader_page->read > rb_page_size(reader)); 1576 1577 /* check if we caught up to the tail */ 1578 reader = NULL; 1579 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 1580 goto out; 1581 1582 /* 1583 * Splice the empty reader page into the list around the head. 1584 * Reset the reader page to size zero. 1585 */ 1586 1587 reader = cpu_buffer->head_page; 1588 cpu_buffer->reader_page->list.next = reader->list.next; 1589 cpu_buffer->reader_page->list.prev = reader->list.prev; 1590 1591 local_set(&cpu_buffer->reader_page->write, 0); 1592 local_set(&cpu_buffer->reader_page->commit, 0); 1593 1594 /* Make the reader page now replace the head */ 1595 reader->list.prev->next = &cpu_buffer->reader_page->list; 1596 reader->list.next->prev = &cpu_buffer->reader_page->list; 1597 1598 /* 1599 * If the tail is on the reader, then we must set the head 1600 * to the inserted page, otherwise we set it one before. 
1601 */ 1602 cpu_buffer->head_page = cpu_buffer->reader_page; 1603 1604 if (cpu_buffer->commit_page != reader) 1605 rb_inc_page(cpu_buffer, &cpu_buffer->head_page); 1606 1607 /* Finally update the reader page to the new head */ 1608 cpu_buffer->reader_page = reader; 1609 rb_reset_reader_page(cpu_buffer); 1610 1611 goto again; 1612 1613 out: 1614 spin_unlock_irqrestore(&cpu_buffer->lock, flags); 1615 1616 return reader; 1617 } 1618 1619 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 1620 { 1621 struct ring_buffer_event *event; 1622 struct buffer_page *reader; 1623 unsigned length; 1624 1625 reader = rb_get_reader_page(cpu_buffer); 1626 1627 /* This function should not be called when buffer is empty */ 1628 BUG_ON(!reader); 1629 1630 event = rb_reader_event(cpu_buffer); 1631 1632 if (event->type == RINGBUF_TYPE_DATA) 1633 cpu_buffer->entries--; 1634 1635 rb_update_read_stamp(cpu_buffer, event); 1636 1637 length = rb_event_length(event); 1638 cpu_buffer->reader_page->read += length; 1639 } 1640 1641 static void rb_advance_iter(struct ring_buffer_iter *iter) 1642 { 1643 struct ring_buffer *buffer; 1644 struct ring_buffer_per_cpu *cpu_buffer; 1645 struct ring_buffer_event *event; 1646 unsigned length; 1647 1648 cpu_buffer = iter->cpu_buffer; 1649 buffer = cpu_buffer->buffer; 1650 1651 /* 1652 * Check if we are at the end of the buffer. 1653 */ 1654 if (iter->head >= rb_page_size(iter->head_page)) { 1655 BUG_ON(iter->head_page == cpu_buffer->commit_page); 1656 rb_inc_iter(iter); 1657 return; 1658 } 1659 1660 event = rb_iter_head_event(iter); 1661 1662 length = rb_event_length(event); 1663 1664 /* 1665 * This should not be called to advance the header if we are 1666 * at the tail of the buffer. 
1667 */ 1668 BUG_ON((iter->head_page == cpu_buffer->commit_page) && 1669 (iter->head + length > rb_commit_index(cpu_buffer))); 1670 1671 rb_update_iter_read_stamp(iter, event); 1672 1673 iter->head += length; 1674 1675 /* check for end of page padding */ 1676 if ((iter->head >= rb_page_size(iter->head_page)) && 1677 (iter->head_page != cpu_buffer->commit_page)) 1678 rb_advance_iter(iter); 1679 } 1680 1681 /** 1682 * ring_buffer_peek - peek at the next event to be read 1683 * @buffer: The ring buffer to read 1684 * @cpu: The cpu to peak at 1685 * @ts: The timestamp counter of this event. 1686 * 1687 * This will return the event that will be read next, but does 1688 * not consume the data. 1689 */ 1690 struct ring_buffer_event * 1691 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) 1692 { 1693 struct ring_buffer_per_cpu *cpu_buffer; 1694 struct ring_buffer_event *event; 1695 struct buffer_page *reader; 1696 int nr_loops = 0; 1697 1698 if (!cpu_isset(cpu, buffer->cpumask)) 1699 return NULL; 1700 1701 cpu_buffer = buffer->buffers[cpu]; 1702 1703 again: 1704 /* 1705 * We repeat when a timestamp is encountered. It is possible 1706 * to get multiple timestamps from an interrupt entering just 1707 * as one timestamp is about to be written. The max times 1708 * that this can happen is the number of nested interrupts we 1709 * can have. Nesting 10 deep of interrupts is clearly 1710 * an anomaly. 
1711 */ 1712 if (unlikely(++nr_loops > 10)) { 1713 RB_WARN_ON(cpu_buffer, 1); 1714 return NULL; 1715 } 1716 1717 reader = rb_get_reader_page(cpu_buffer); 1718 if (!reader) 1719 return NULL; 1720 1721 event = rb_reader_event(cpu_buffer); 1722 1723 switch (event->type) { 1724 case RINGBUF_TYPE_PADDING: 1725 RB_WARN_ON(cpu_buffer, 1); 1726 rb_advance_reader(cpu_buffer); 1727 return NULL; 1728 1729 case RINGBUF_TYPE_TIME_EXTEND: 1730 /* Internal data, OK to advance */ 1731 rb_advance_reader(cpu_buffer); 1732 goto again; 1733 1734 case RINGBUF_TYPE_TIME_STAMP: 1735 /* FIXME: not implemented */ 1736 rb_advance_reader(cpu_buffer); 1737 goto again; 1738 1739 case RINGBUF_TYPE_DATA: 1740 if (ts) { 1741 *ts = cpu_buffer->read_stamp + event->time_delta; 1742 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts); 1743 } 1744 return event; 1745 1746 default: 1747 BUG(); 1748 } 1749 1750 return NULL; 1751 } 1752 1753 /** 1754 * ring_buffer_iter_peek - peek at the next event to be read 1755 * @iter: The ring buffer iterator 1756 * @ts: The timestamp counter of this event. 1757 * 1758 * This will return the event that will be read next, but does 1759 * not increment the iterator. 1760 */ 1761 struct ring_buffer_event * 1762 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 1763 { 1764 struct ring_buffer *buffer; 1765 struct ring_buffer_per_cpu *cpu_buffer; 1766 struct ring_buffer_event *event; 1767 int nr_loops = 0; 1768 1769 if (ring_buffer_iter_empty(iter)) 1770 return NULL; 1771 1772 cpu_buffer = iter->cpu_buffer; 1773 buffer = cpu_buffer->buffer; 1774 1775 again: 1776 /* 1777 * We repeat when a timestamp is encountered. It is possible 1778 * to get multiple timestamps from an interrupt entering just 1779 * as one timestamp is about to be written. The max times 1780 * that this can happen is the number of nested interrupts we 1781 * can have. Nesting 10 deep of interrupts is clearly 1782 * an anomaly. 
1783 */ 1784 if (unlikely(++nr_loops > 10)) { 1785 RB_WARN_ON(cpu_buffer, 1); 1786 return NULL; 1787 } 1788 1789 if (rb_per_cpu_empty(cpu_buffer)) 1790 return NULL; 1791 1792 event = rb_iter_head_event(iter); 1793 1794 switch (event->type) { 1795 case RINGBUF_TYPE_PADDING: 1796 rb_inc_iter(iter); 1797 goto again; 1798 1799 case RINGBUF_TYPE_TIME_EXTEND: 1800 /* Internal data, OK to advance */ 1801 rb_advance_iter(iter); 1802 goto again; 1803 1804 case RINGBUF_TYPE_TIME_STAMP: 1805 /* FIXME: not implemented */ 1806 rb_advance_iter(iter); 1807 goto again; 1808 1809 case RINGBUF_TYPE_DATA: 1810 if (ts) { 1811 *ts = iter->read_stamp + event->time_delta; 1812 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts); 1813 } 1814 return event; 1815 1816 default: 1817 BUG(); 1818 } 1819 1820 return NULL; 1821 } 1822 1823 /** 1824 * ring_buffer_consume - return an event and consume it 1825 * @buffer: The ring buffer to get the next event from 1826 * 1827 * Returns the next event in the ring buffer, and that event is consumed. 1828 * Meaning, that sequential reads will keep returning a different event, 1829 * and eventually empty the ring buffer if the producer is slower. 1830 */ 1831 struct ring_buffer_event * 1832 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts) 1833 { 1834 struct ring_buffer_per_cpu *cpu_buffer; 1835 struct ring_buffer_event *event; 1836 1837 if (!cpu_isset(cpu, buffer->cpumask)) 1838 return NULL; 1839 1840 event = ring_buffer_peek(buffer, cpu, ts); 1841 if (!event) 1842 return NULL; 1843 1844 cpu_buffer = buffer->buffers[cpu]; 1845 rb_advance_reader(cpu_buffer); 1846 1847 return event; 1848 } 1849 1850 /** 1851 * ring_buffer_read_start - start a non consuming read of the buffer 1852 * @buffer: The ring buffer to read from 1853 * @cpu: The cpu buffer to iterate over 1854 * 1855 * This starts up an iteration through the buffer. It also disables 1856 * the recording to the buffer until the reading is finished. 
1857 * This prevents the reading from being corrupted. This is not 1858 * a consuming read, so a producer is not expected. 1859 * 1860 * Must be paired with ring_buffer_finish. 1861 */ 1862 struct ring_buffer_iter * 1863 ring_buffer_read_start(struct ring_buffer *buffer, int cpu) 1864 { 1865 struct ring_buffer_per_cpu *cpu_buffer; 1866 struct ring_buffer_iter *iter; 1867 unsigned long flags; 1868 1869 if (!cpu_isset(cpu, buffer->cpumask)) 1870 return NULL; 1871 1872 iter = kmalloc(sizeof(*iter), GFP_KERNEL); 1873 if (!iter) 1874 return NULL; 1875 1876 cpu_buffer = buffer->buffers[cpu]; 1877 1878 iter->cpu_buffer = cpu_buffer; 1879 1880 atomic_inc(&cpu_buffer->record_disabled); 1881 synchronize_sched(); 1882 1883 spin_lock_irqsave(&cpu_buffer->lock, flags); 1884 ring_buffer_iter_reset(iter); 1885 spin_unlock_irqrestore(&cpu_buffer->lock, flags); 1886 1887 return iter; 1888 } 1889 1890 /** 1891 * ring_buffer_finish - finish reading the iterator of the buffer 1892 * @iter: The iterator retrieved by ring_buffer_start 1893 * 1894 * This re-enables the recording to the buffer, and frees the 1895 * iterator. 1896 */ 1897 void 1898 ring_buffer_read_finish(struct ring_buffer_iter *iter) 1899 { 1900 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 1901 1902 atomic_dec(&cpu_buffer->record_disabled); 1903 kfree(iter); 1904 } 1905 1906 /** 1907 * ring_buffer_read - read the next item in the ring buffer by the iterator 1908 * @iter: The ring buffer iterator 1909 * @ts: The time stamp of the event read. 1910 * 1911 * This reads the next event in the ring buffer and increments the iterator. 
1912 */ 1913 struct ring_buffer_event * 1914 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts) 1915 { 1916 struct ring_buffer_event *event; 1917 1918 event = ring_buffer_iter_peek(iter, ts); 1919 if (!event) 1920 return NULL; 1921 1922 rb_advance_iter(iter); 1923 1924 return event; 1925 } 1926 1927 /** 1928 * ring_buffer_size - return the size of the ring buffer (in bytes) 1929 * @buffer: The ring buffer. 1930 */ 1931 unsigned long ring_buffer_size(struct ring_buffer *buffer) 1932 { 1933 return BUF_PAGE_SIZE * buffer->pages; 1934 } 1935 1936 static void 1937 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 1938 { 1939 cpu_buffer->head_page 1940 = list_entry(cpu_buffer->pages.next, struct buffer_page, list); 1941 local_set(&cpu_buffer->head_page->write, 0); 1942 local_set(&cpu_buffer->head_page->commit, 0); 1943 1944 cpu_buffer->head_page->read = 0; 1945 1946 cpu_buffer->tail_page = cpu_buffer->head_page; 1947 cpu_buffer->commit_page = cpu_buffer->head_page; 1948 1949 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 1950 local_set(&cpu_buffer->reader_page->write, 0); 1951 local_set(&cpu_buffer->reader_page->commit, 0); 1952 cpu_buffer->reader_page->read = 0; 1953 1954 cpu_buffer->overrun = 0; 1955 cpu_buffer->entries = 0; 1956 } 1957 1958 /** 1959 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 1960 * @buffer: The ring buffer to reset a per cpu buffer of 1961 * @cpu: The CPU buffer to be reset 1962 */ 1963 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu) 1964 { 1965 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 1966 unsigned long flags; 1967 1968 if (!cpu_isset(cpu, buffer->cpumask)) 1969 return; 1970 1971 spin_lock_irqsave(&cpu_buffer->lock, flags); 1972 1973 rb_reset_cpu(cpu_buffer); 1974 1975 spin_unlock_irqrestore(&cpu_buffer->lock, flags); 1976 } 1977 1978 /** 1979 * ring_buffer_reset - reset a ring buffer 1980 * @buffer: The ring buffer to reset all cpu buffers 1981 */ 1982 void ring_buffer_reset(struct 
ring_buffer *buffer) 1983 { 1984 int cpu; 1985 1986 for_each_buffer_cpu(buffer, cpu) 1987 ring_buffer_reset_cpu(buffer, cpu); 1988 } 1989 1990 /** 1991 * rind_buffer_empty - is the ring buffer empty? 1992 * @buffer: The ring buffer to test 1993 */ 1994 int ring_buffer_empty(struct ring_buffer *buffer) 1995 { 1996 struct ring_buffer_per_cpu *cpu_buffer; 1997 int cpu; 1998 1999 /* yes this is racy, but if you don't like the race, lock the buffer */ 2000 for_each_buffer_cpu(buffer, cpu) { 2001 cpu_buffer = buffer->buffers[cpu]; 2002 if (!rb_per_cpu_empty(cpu_buffer)) 2003 return 0; 2004 } 2005 return 1; 2006 } 2007 2008 /** 2009 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 2010 * @buffer: The ring buffer 2011 * @cpu: The CPU buffer to test 2012 */ 2013 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu) 2014 { 2015 struct ring_buffer_per_cpu *cpu_buffer; 2016 2017 if (!cpu_isset(cpu, buffer->cpumask)) 2018 return 1; 2019 2020 cpu_buffer = buffer->buffers[cpu]; 2021 return rb_per_cpu_empty(cpu_buffer); 2022 } 2023 2024 /** 2025 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 2026 * @buffer_a: One buffer to swap with 2027 * @buffer_b: The other buffer to swap with 2028 * 2029 * This function is useful for tracers that want to take a "snapshot" 2030 * of a CPU buffer and has another back up buffer lying around. 2031 * it is expected that the tracer handles the cpu buffer not being 2032 * used at the moment. 
2033 */ 2034 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a, 2035 struct ring_buffer *buffer_b, int cpu) 2036 { 2037 struct ring_buffer_per_cpu *cpu_buffer_a; 2038 struct ring_buffer_per_cpu *cpu_buffer_b; 2039 2040 if (!cpu_isset(cpu, buffer_a->cpumask) || 2041 !cpu_isset(cpu, buffer_b->cpumask)) 2042 return -EINVAL; 2043 2044 /* At least make sure the two buffers are somewhat the same */ 2045 if (buffer_a->size != buffer_b->size || 2046 buffer_a->pages != buffer_b->pages) 2047 return -EINVAL; 2048 2049 cpu_buffer_a = buffer_a->buffers[cpu]; 2050 cpu_buffer_b = buffer_b->buffers[cpu]; 2051 2052 /* 2053 * We can't do a synchronize_sched here because this 2054 * function can be called in atomic context. 2055 * Normally this will be called from the same CPU as cpu. 2056 * If not it's up to the caller to protect this. 2057 */ 2058 atomic_inc(&cpu_buffer_a->record_disabled); 2059 atomic_inc(&cpu_buffer_b->record_disabled); 2060 2061 buffer_a->buffers[cpu] = cpu_buffer_b; 2062 buffer_b->buffers[cpu] = cpu_buffer_a; 2063 2064 cpu_buffer_b->buffer = buffer_a; 2065 cpu_buffer_a->buffer = buffer_b; 2066 2067 atomic_dec(&cpu_buffer_a->record_disabled); 2068 atomic_dec(&cpu_buffer_b->record_disabled); 2069 2070 return 0; 2071 } 2072 2073