/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/ftrace_irq.h>
#include <linux/spinlock.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kmemcheck.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/fs.h>

#include "trace.h"

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
        int ret;

        ret = trace_seq_printf(s, "# compressed entry header\n");
        ret = trace_seq_printf(s, "\ttype_len : 5 bits\n");
        ret = trace_seq_printf(s, "\ttime_delta : 27 bits\n");
        ret = trace_seq_printf(s, "\tarray : 32 bits\n");
        ret = trace_seq_printf(s, "\n");
        ret = trace_seq_printf(s, "\tpadding : type == %d\n",
                               RINGBUF_TYPE_PADDING);
        ret = trace_seq_printf(s, "\ttime_extend : type == %d\n",
                               RINGBUF_TYPE_TIME_EXTEND);
        ret = trace_seq_printf(s, "\tdata max type_len == %d\n",
                               RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

        return ret;
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do
 * whatever it wants with that page. The writer will never write to that
 * page again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */
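/*
 * Very roughly, the swap sketched above amounts to the following
 * (illustrative pseudo-code only; the names are made up here, and the
 * real code also has to handle the head page moving while the swap is
 * being set up, as well as overwrite mode):
 *
 *	reader = cpu_buffer->reader_page;	(empty page the reader owns)
 *	head   = cpu_buffer->head_page;		(next page with data)
 *
 *	splice reader into the ring in place of head;
 *	cpu_buffer->reader_page = head;		(head is now out of the ring)
 *	cpu_buffer->head_page   = page following the spliced-in reader;
 *
 * Once head is outside the ring, the writer can never touch it again,
 * which is what lets the reader hand it to splice or keep it around.
 */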
/*
 * A fast way to enable or disable all ring buffers is to
 * call tracing_on or tracing_off. Turning off the ring buffers
 * prevents all ring buffers from being recorded to.
 * Turning this switch on makes it OK to write to the
 * ring buffer, if the ring buffer is enabled itself.
 *
 * There are three layers that must be on in order to write
 * to the ring buffer.
 *
 * 1) This global flag must be set.
 * 2) The ring buffer must be enabled for recording.
 * 3) The per cpu buffer must be enabled for recording.
 *
 * In case of an anomaly, this global flag has a bit set that
 * will permanently disable all ring buffers.
 */

/*
 * Global flag to disable all recording to ring buffers
 * This has two bits: ON, DISABLED
 *
 *   ON    DISABLED
 *  ----  ----------
 *    0       0       : ring buffers are off
 *    1       0       : ring buffers are on
 *    X       1       : ring buffers are permanently disabled
 */

enum {
        RB_BUFFERS_ON_BIT = 0,
        RB_BUFFERS_DISABLED_BIT = 1,
};

enum {
        RB_BUFFERS_ON = 1 << RB_BUFFERS_ON_BIT,
        RB_BUFFERS_DISABLED = 1 << RB_BUFFERS_DISABLED_BIT,
};

static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

/**
 * tracing_on - enable all tracing buffers
 *
 * This function enables all tracing buffers that may have been
 * disabled with tracing_off.
 */
void tracing_on(void)
{
        set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_on);

/**
 * tracing_off - turn off all tracing buffers
 *
 * This function stops all tracing buffers from recording data.
 * It does not disable any overhead the tracers themselves may
 * be causing. This function simply causes all recording to
 * the ring buffers to fail.
 */
void tracing_off(void)
{
        clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_off);

/**
 * tracing_off_permanent - permanently disable ring buffers
 *
 * This function, once called, will disable all ring buffers
 * permanently.
 */
void tracing_off_permanent(void)
{
        set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
}

/**
 * tracing_is_on - show the state of the ring buffers (enabled or not)
 */
int tracing_is_on(void)
{
        return ring_buffer_flags == RB_BUFFERS_ON;
}
EXPORT_SYMBOL_GPL(tracing_is_on);

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT 4U
#define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE 8U /* two 32bit words */

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... \
RINGBUF_TYPE_DATA_TYPE_LEN_MAX 213 214 enum { 215 RB_LEN_TIME_EXTEND = 8, 216 RB_LEN_TIME_STAMP = 16, 217 }; 218 219 static inline int rb_null_event(struct ring_buffer_event *event) 220 { 221 return event->type_len == RINGBUF_TYPE_PADDING 222 && event->time_delta == 0; 223 } 224 225 static inline int rb_discarded_event(struct ring_buffer_event *event) 226 { 227 return event->type_len == RINGBUF_TYPE_PADDING && event->time_delta; 228 } 229 230 static void rb_event_set_padding(struct ring_buffer_event *event) 231 { 232 event->type_len = RINGBUF_TYPE_PADDING; 233 event->time_delta = 0; 234 } 235 236 static unsigned 237 rb_event_data_length(struct ring_buffer_event *event) 238 { 239 unsigned length; 240 241 if (event->type_len) 242 length = event->type_len * RB_ALIGNMENT; 243 else 244 length = event->array[0]; 245 return length + RB_EVNT_HDR_SIZE; 246 } 247 248 /* inline for ring buffer fast paths */ 249 static unsigned 250 rb_event_length(struct ring_buffer_event *event) 251 { 252 switch (event->type_len) { 253 case RINGBUF_TYPE_PADDING: 254 if (rb_null_event(event)) 255 /* undefined */ 256 return -1; 257 return event->array[0] + RB_EVNT_HDR_SIZE; 258 259 case RINGBUF_TYPE_TIME_EXTEND: 260 return RB_LEN_TIME_EXTEND; 261 262 case RINGBUF_TYPE_TIME_STAMP: 263 return RB_LEN_TIME_STAMP; 264 265 case RINGBUF_TYPE_DATA: 266 return rb_event_data_length(event); 267 default: 268 BUG(); 269 } 270 /* not hit */ 271 return 0; 272 } 273 274 /** 275 * ring_buffer_event_length - return the length of the event 276 * @event: the event to get the length of 277 */ 278 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 279 { 280 unsigned length = rb_event_length(event); 281 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 282 return length; 283 length -= RB_EVNT_HDR_SIZE; 284 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 285 length -= sizeof(event->array[0]); 286 return length; 287 } 288 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 289 290 /* inline for ring buffer fast paths */ 291 static void * 292 rb_event_data(struct ring_buffer_event *event) 293 { 294 BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 295 /* If length is in len field, then array[0] has the data */ 296 if (event->type_len) 297 return (void *)&event->array[0]; 298 /* Otherwise length is in array[0] and array[1] has the data */ 299 return (void *)&event->array[1]; 300 } 301 302 /** 303 * ring_buffer_event_data - return the data of the event 304 * @event: the event to get the data from 305 */ 306 void *ring_buffer_event_data(struct ring_buffer_event *event) 307 { 308 return rb_event_data(event); 309 } 310 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 311 312 #define for_each_buffer_cpu(buffer, cpu) \ 313 for_each_cpu(cpu, buffer->cpumask) 314 315 #define TS_SHIFT 27 316 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 317 #define TS_DELTA_TEST (~TS_MASK) 318 319 struct buffer_data_page { 320 u64 time_stamp; /* page time stamp */ 321 local_t commit; /* write committed index */ 322 unsigned char data[]; /* data of buffer page */ 323 }; 324 325 struct buffer_page { 326 struct list_head list; /* list of buffer pages */ 327 local_t write; /* index for next write */ 328 unsigned read; /* index for next read */ 329 local_t entries; /* entries on this page */ 330 struct buffer_data_page *page; /* Actual data page */ 331 }; 332 333 static void rb_init_page(struct buffer_data_page *bpage) 334 { 335 local_set(&bpage->commit, 0); 336 } 337 338 /** 339 * ring_buffer_page_len - the size of data on the page. 
340 * @page: The page to read 341 * 342 * Returns the amount of data on the page, including buffer page header. 343 */ 344 size_t ring_buffer_page_len(void *page) 345 { 346 return local_read(&((struct buffer_data_page *)page)->commit) 347 + BUF_PAGE_HDR_SIZE; 348 } 349 350 /* 351 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing 352 * this issue out. 353 */ 354 static void free_buffer_page(struct buffer_page *bpage) 355 { 356 free_page((unsigned long)bpage->page); 357 kfree(bpage); 358 } 359 360 /* 361 * We need to fit the time_stamp delta into 27 bits. 362 */ 363 static inline int test_time_stamp(u64 delta) 364 { 365 if (delta & TS_DELTA_TEST) 366 return 1; 367 return 0; 368 } 369 370 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE) 371 372 /* Max payload is BUF_PAGE_SIZE - header (8bytes) */ 373 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2)) 374 375 /* Max number of timestamps that can fit on a page */ 376 #define RB_TIMESTAMPS_PER_PAGE (BUF_PAGE_SIZE / RB_LEN_TIME_STAMP) 377 378 int ring_buffer_print_page_header(struct trace_seq *s) 379 { 380 struct buffer_data_page field; 381 int ret; 382 383 ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t" 384 "offset:0;\tsize:%u;\n", 385 (unsigned int)sizeof(field.time_stamp)); 386 387 ret = trace_seq_printf(s, "\tfield: local_t commit;\t" 388 "offset:%u;\tsize:%u;\n", 389 (unsigned int)offsetof(typeof(field), commit), 390 (unsigned int)sizeof(field.commit)); 391 392 ret = trace_seq_printf(s, "\tfield: char data;\t" 393 "offset:%u;\tsize:%u;\n", 394 (unsigned int)offsetof(typeof(field), data), 395 (unsigned int)BUF_PAGE_SIZE); 396 397 return ret; 398 } 399 400 /* 401 * head_page == tail_page && head == tail then buffer is empty. 402 */ 403 struct ring_buffer_per_cpu { 404 int cpu; 405 struct ring_buffer *buffer; 406 spinlock_t reader_lock; /* serialize readers */ 407 raw_spinlock_t lock; 408 struct lock_class_key lock_key; 409 struct list_head pages; 410 struct buffer_page *head_page; /* read from head */ 411 struct buffer_page *tail_page; /* write to tail */ 412 struct buffer_page *commit_page; /* committed pages */ 413 struct buffer_page *reader_page; 414 unsigned long nmi_dropped; 415 unsigned long commit_overrun; 416 unsigned long overrun; 417 unsigned long read; 418 local_t entries; 419 local_t committing; 420 local_t commits; 421 u64 write_stamp; 422 u64 read_stamp; 423 atomic_t record_disabled; 424 }; 425 426 struct ring_buffer { 427 unsigned pages; 428 unsigned flags; 429 int cpus; 430 atomic_t record_disabled; 431 cpumask_var_t cpumask; 432 433 struct lock_class_key *reader_lock_key; 434 435 struct mutex mutex; 436 437 struct ring_buffer_per_cpu **buffers; 438 439 #ifdef CONFIG_HOTPLUG_CPU 440 struct notifier_block cpu_notify; 441 #endif 442 u64 (*clock)(void); 443 }; 444 445 struct ring_buffer_iter { 446 struct ring_buffer_per_cpu *cpu_buffer; 447 unsigned long head; 448 struct buffer_page *head_page; 449 u64 read_stamp; 450 }; 451 452 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 453 #define RB_WARN_ON(buffer, cond) \ 454 ({ \ 455 int _____ret = unlikely(cond); \ 456 if (_____ret) { \ 457 atomic_inc(&buffer->record_disabled); \ 458 WARN_ON(1); \ 459 } \ 460 _____ret; \ 461 }) 462 463 /* Up this if you want to test the TIME_EXTENTS and normalization */ 464 #define DEBUG_SHIFT 0 465 466 static inline u64 rb_time_stamp(struct ring_buffer *buffer, int cpu) 467 { 468 /* shift to debug/test normalization and TIME_EXTENTS */ 469 return buffer->clock() << DEBUG_SHIFT; 470 } 471 472 u64 
ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu) 473 { 474 u64 time; 475 476 preempt_disable_notrace(); 477 time = rb_time_stamp(buffer, cpu); 478 preempt_enable_no_resched_notrace(); 479 480 return time; 481 } 482 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 483 484 void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer, 485 int cpu, u64 *ts) 486 { 487 /* Just stupid testing the normalize function and deltas */ 488 *ts >>= DEBUG_SHIFT; 489 } 490 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 491 492 /** 493 * check_pages - integrity check of buffer pages 494 * @cpu_buffer: CPU buffer with pages to test 495 * 496 * As a safety measure we check to make sure the data pages have not 497 * been corrupted. 498 */ 499 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 500 { 501 struct list_head *head = &cpu_buffer->pages; 502 struct buffer_page *bpage, *tmp; 503 504 if (RB_WARN_ON(cpu_buffer, head->next->prev != head)) 505 return -1; 506 if (RB_WARN_ON(cpu_buffer, head->prev->next != head)) 507 return -1; 508 509 list_for_each_entry_safe(bpage, tmp, head, list) { 510 if (RB_WARN_ON(cpu_buffer, 511 bpage->list.next->prev != &bpage->list)) 512 return -1; 513 if (RB_WARN_ON(cpu_buffer, 514 bpage->list.prev->next != &bpage->list)) 515 return -1; 516 } 517 518 return 0; 519 } 520 521 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 522 unsigned nr_pages) 523 { 524 struct list_head *head = &cpu_buffer->pages; 525 struct buffer_page *bpage, *tmp; 526 unsigned long addr; 527 LIST_HEAD(pages); 528 unsigned i; 529 530 for (i = 0; i < nr_pages; i++) { 531 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 532 GFP_KERNEL, cpu_to_node(cpu_buffer->cpu)); 533 if (!bpage) 534 goto free_pages; 535 list_add(&bpage->list, &pages); 536 537 addr = __get_free_page(GFP_KERNEL); 538 if (!addr) 539 goto free_pages; 540 bpage->page = (void *)addr; 541 rb_init_page(bpage->page); 542 } 543 544 list_splice(&pages, head); 545 546 rb_check_pages(cpu_buffer); 547 548 return 0; 549 550 free_pages: 551 list_for_each_entry_safe(bpage, tmp, &pages, list) { 552 list_del_init(&bpage->list); 553 free_buffer_page(bpage); 554 } 555 return -ENOMEM; 556 } 557 558 static struct ring_buffer_per_cpu * 559 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu) 560 { 561 struct ring_buffer_per_cpu *cpu_buffer; 562 struct buffer_page *bpage; 563 unsigned long addr; 564 int ret; 565 566 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 567 GFP_KERNEL, cpu_to_node(cpu)); 568 if (!cpu_buffer) 569 return NULL; 570 571 cpu_buffer->cpu = cpu; 572 cpu_buffer->buffer = buffer; 573 spin_lock_init(&cpu_buffer->reader_lock); 574 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 575 cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED; 576 INIT_LIST_HEAD(&cpu_buffer->pages); 577 578 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 579 GFP_KERNEL, cpu_to_node(cpu)); 580 if (!bpage) 581 goto fail_free_buffer; 582 583 cpu_buffer->reader_page = bpage; 584 addr = __get_free_page(GFP_KERNEL); 585 if (!addr) 586 goto fail_free_reader; 587 bpage->page = (void *)addr; 588 rb_init_page(bpage->page); 589 590 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 591 592 ret = rb_allocate_pages(cpu_buffer, buffer->pages); 593 if (ret < 0) 594 goto fail_free_reader; 595 596 cpu_buffer->head_page 597 = list_entry(cpu_buffer->pages.next, struct buffer_page, list); 598 cpu_buffer->tail_page = cpu_buffer->commit_page = 
cpu_buffer->head_page; 599 600 return cpu_buffer; 601 602 fail_free_reader: 603 free_buffer_page(cpu_buffer->reader_page); 604 605 fail_free_buffer: 606 kfree(cpu_buffer); 607 return NULL; 608 } 609 610 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 611 { 612 struct list_head *head = &cpu_buffer->pages; 613 struct buffer_page *bpage, *tmp; 614 615 free_buffer_page(cpu_buffer->reader_page); 616 617 list_for_each_entry_safe(bpage, tmp, head, list) { 618 list_del_init(&bpage->list); 619 free_buffer_page(bpage); 620 } 621 kfree(cpu_buffer); 622 } 623 624 #ifdef CONFIG_HOTPLUG_CPU 625 static int rb_cpu_notify(struct notifier_block *self, 626 unsigned long action, void *hcpu); 627 #endif 628 629 /** 630 * ring_buffer_alloc - allocate a new ring_buffer 631 * @size: the size in bytes per cpu that is needed. 632 * @flags: attributes to set for the ring buffer. 633 * 634 * Currently the only flag that is available is the RB_FL_OVERWRITE 635 * flag. This flag means that the buffer will overwrite old data 636 * when the buffer wraps. If this flag is not set, the buffer will 637 * drop data when the tail hits the head. 638 */ 639 struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 640 struct lock_class_key *key) 641 { 642 struct ring_buffer *buffer; 643 int bsize; 644 int cpu; 645 646 /* keep it in its own cache line */ 647 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 648 GFP_KERNEL); 649 if (!buffer) 650 return NULL; 651 652 if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 653 goto fail_free_buffer; 654 655 buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 656 buffer->flags = flags; 657 buffer->clock = trace_clock_local; 658 buffer->reader_lock_key = key; 659 660 /* need at least two pages */ 661 if (buffer->pages < 2) 662 buffer->pages = 2; 663 664 /* 665 * In case of non-hotplug cpu, if the ring-buffer is allocated 666 * in early initcall, it will not be notified of secondary cpus. 667 * In that off case, we need to allocate for all possible cpus. 668 */ 669 #ifdef CONFIG_HOTPLUG_CPU 670 get_online_cpus(); 671 cpumask_copy(buffer->cpumask, cpu_online_mask); 672 #else 673 cpumask_copy(buffer->cpumask, cpu_possible_mask); 674 #endif 675 buffer->cpus = nr_cpu_ids; 676 677 bsize = sizeof(void *) * nr_cpu_ids; 678 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 679 GFP_KERNEL); 680 if (!buffer->buffers) 681 goto fail_free_cpumask; 682 683 for_each_buffer_cpu(buffer, cpu) { 684 buffer->buffers[cpu] = 685 rb_allocate_cpu_buffer(buffer, cpu); 686 if (!buffer->buffers[cpu]) 687 goto fail_free_buffers; 688 } 689 690 #ifdef CONFIG_HOTPLUG_CPU 691 buffer->cpu_notify.notifier_call = rb_cpu_notify; 692 buffer->cpu_notify.priority = 0; 693 register_cpu_notifier(&buffer->cpu_notify); 694 #endif 695 696 put_online_cpus(); 697 mutex_init(&buffer->mutex); 698 699 return buffer; 700 701 fail_free_buffers: 702 for_each_buffer_cpu(buffer, cpu) { 703 if (buffer->buffers[cpu]) 704 rb_free_cpu_buffer(buffer->buffers[cpu]); 705 } 706 kfree(buffer->buffers); 707 708 fail_free_cpumask: 709 free_cpumask_var(buffer->cpumask); 710 put_online_cpus(); 711 712 fail_free_buffer: 713 kfree(buffer); 714 return NULL; 715 } 716 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 717 718 /** 719 * ring_buffer_free - free a ring buffer. 720 * @buffer: the buffer to free. 
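 *
 * A minimal allocate/use/free sketch (illustrative; 1 << 16 is an
 * arbitrary per-cpu size, and ring_buffer_alloc() is the wrapper from
 * linux/ring_buffer.h that supplies the lockdep key for
 * __ring_buffer_alloc()):
 *
 *	struct ring_buffer *rb;
 *
 *	rb = ring_buffer_alloc(1 << 16, RB_FL_OVERWRITE);
 *	if (!rb)
 *		return -ENOMEM;
 *	...
 *	ring_buffer_free(rb);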
721 */ 722 void 723 ring_buffer_free(struct ring_buffer *buffer) 724 { 725 int cpu; 726 727 get_online_cpus(); 728 729 #ifdef CONFIG_HOTPLUG_CPU 730 unregister_cpu_notifier(&buffer->cpu_notify); 731 #endif 732 733 for_each_buffer_cpu(buffer, cpu) 734 rb_free_cpu_buffer(buffer->buffers[cpu]); 735 736 put_online_cpus(); 737 738 kfree(buffer->buffers); 739 free_cpumask_var(buffer->cpumask); 740 741 kfree(buffer); 742 } 743 EXPORT_SYMBOL_GPL(ring_buffer_free); 744 745 void ring_buffer_set_clock(struct ring_buffer *buffer, 746 u64 (*clock)(void)) 747 { 748 buffer->clock = clock; 749 } 750 751 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer); 752 753 static void 754 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages) 755 { 756 struct buffer_page *bpage; 757 struct list_head *p; 758 unsigned i; 759 760 atomic_inc(&cpu_buffer->record_disabled); 761 synchronize_sched(); 762 763 for (i = 0; i < nr_pages; i++) { 764 if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages))) 765 return; 766 p = cpu_buffer->pages.next; 767 bpage = list_entry(p, struct buffer_page, list); 768 list_del_init(&bpage->list); 769 free_buffer_page(bpage); 770 } 771 if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages))) 772 return; 773 774 rb_reset_cpu(cpu_buffer); 775 776 rb_check_pages(cpu_buffer); 777 778 atomic_dec(&cpu_buffer->record_disabled); 779 780 } 781 782 static void 783 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer, 784 struct list_head *pages, unsigned nr_pages) 785 { 786 struct buffer_page *bpage; 787 struct list_head *p; 788 unsigned i; 789 790 atomic_inc(&cpu_buffer->record_disabled); 791 synchronize_sched(); 792 793 for (i = 0; i < nr_pages; i++) { 794 if (RB_WARN_ON(cpu_buffer, list_empty(pages))) 795 return; 796 p = pages->next; 797 bpage = list_entry(p, struct buffer_page, list); 798 list_del_init(&bpage->list); 799 list_add_tail(&bpage->list, &cpu_buffer->pages); 800 } 801 rb_reset_cpu(cpu_buffer); 802 803 rb_check_pages(cpu_buffer); 804 805 atomic_dec(&cpu_buffer->record_disabled); 806 } 807 808 /** 809 * ring_buffer_resize - resize the ring buffer 810 * @buffer: the buffer to resize. 811 * @size: the new size. 812 * 813 * The tracer is responsible for making sure that the buffer is 814 * not being used while changing the size. 815 * Note: We may be able to change the above requirement by using 816 * RCU synchronizations. 817 * 818 * Minimum size is 2 * BUF_PAGE_SIZE. 819 * 820 * Returns -1 on failure. 
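 *
 * For example (an illustrative size), asking for roughly one megabyte
 * per cpu; the request is rounded up to whole buffer pages:
 *
 *	ret = ring_buffer_resize(buffer, 1024 * 1024);
 *
 * A negative return means the resize failed and the previous size is
 * still in use.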
821 */ 822 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) 823 { 824 struct ring_buffer_per_cpu *cpu_buffer; 825 unsigned nr_pages, rm_pages, new_pages; 826 struct buffer_page *bpage, *tmp; 827 unsigned long buffer_size; 828 unsigned long addr; 829 LIST_HEAD(pages); 830 int i, cpu; 831 832 /* 833 * Always succeed at resizing a non-existent buffer: 834 */ 835 if (!buffer) 836 return size; 837 838 size = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 839 size *= BUF_PAGE_SIZE; 840 buffer_size = buffer->pages * BUF_PAGE_SIZE; 841 842 /* we need a minimum of two pages */ 843 if (size < BUF_PAGE_SIZE * 2) 844 size = BUF_PAGE_SIZE * 2; 845 846 if (size == buffer_size) 847 return size; 848 849 mutex_lock(&buffer->mutex); 850 get_online_cpus(); 851 852 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 853 854 if (size < buffer_size) { 855 856 /* easy case, just free pages */ 857 if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) 858 goto out_fail; 859 860 rm_pages = buffer->pages - nr_pages; 861 862 for_each_buffer_cpu(buffer, cpu) { 863 cpu_buffer = buffer->buffers[cpu]; 864 rb_remove_pages(cpu_buffer, rm_pages); 865 } 866 goto out; 867 } 868 869 /* 870 * This is a bit more difficult. We only want to add pages 871 * when we can allocate enough for all CPUs. We do this 872 * by allocating all the pages and storing them on a local 873 * link list. If we succeed in our allocation, then we 874 * add these pages to the cpu_buffers. Otherwise we just free 875 * them all and return -ENOMEM; 876 */ 877 if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) 878 goto out_fail; 879 880 new_pages = nr_pages - buffer->pages; 881 882 for_each_buffer_cpu(buffer, cpu) { 883 for (i = 0; i < new_pages; i++) { 884 bpage = kzalloc_node(ALIGN(sizeof(*bpage), 885 cache_line_size()), 886 GFP_KERNEL, cpu_to_node(cpu)); 887 if (!bpage) 888 goto free_pages; 889 list_add(&bpage->list, &pages); 890 addr = __get_free_page(GFP_KERNEL); 891 if (!addr) 892 goto free_pages; 893 bpage->page = (void *)addr; 894 rb_init_page(bpage->page); 895 } 896 } 897 898 for_each_buffer_cpu(buffer, cpu) { 899 cpu_buffer = buffer->buffers[cpu]; 900 rb_insert_pages(cpu_buffer, &pages, new_pages); 901 } 902 903 if (RB_WARN_ON(buffer, !list_empty(&pages))) 904 goto out_fail; 905 906 out: 907 buffer->pages = nr_pages; 908 put_online_cpus(); 909 mutex_unlock(&buffer->mutex); 910 911 return size; 912 913 free_pages: 914 list_for_each_entry_safe(bpage, tmp, &pages, list) { 915 list_del_init(&bpage->list); 916 free_buffer_page(bpage); 917 } 918 put_online_cpus(); 919 mutex_unlock(&buffer->mutex); 920 return -ENOMEM; 921 922 /* 923 * Something went totally wrong, and we are too paranoid 924 * to even clean up the mess. 
925 */ 926 out_fail: 927 put_online_cpus(); 928 mutex_unlock(&buffer->mutex); 929 return -1; 930 } 931 EXPORT_SYMBOL_GPL(ring_buffer_resize); 932 933 static inline void * 934 __rb_data_page_index(struct buffer_data_page *bpage, unsigned index) 935 { 936 return bpage->data + index; 937 } 938 939 static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 940 { 941 return bpage->page->data + index; 942 } 943 944 static inline struct ring_buffer_event * 945 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 946 { 947 return __rb_page_index(cpu_buffer->reader_page, 948 cpu_buffer->reader_page->read); 949 } 950 951 static inline struct ring_buffer_event * 952 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer) 953 { 954 return __rb_page_index(cpu_buffer->head_page, 955 cpu_buffer->head_page->read); 956 } 957 958 static inline struct ring_buffer_event * 959 rb_iter_head_event(struct ring_buffer_iter *iter) 960 { 961 return __rb_page_index(iter->head_page, iter->head); 962 } 963 964 static inline unsigned rb_page_write(struct buffer_page *bpage) 965 { 966 return local_read(&bpage->write); 967 } 968 969 static inline unsigned rb_page_commit(struct buffer_page *bpage) 970 { 971 return local_read(&bpage->page->commit); 972 } 973 974 /* Size is determined by what has been commited */ 975 static inline unsigned rb_page_size(struct buffer_page *bpage) 976 { 977 return rb_page_commit(bpage); 978 } 979 980 static inline unsigned 981 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 982 { 983 return rb_page_commit(cpu_buffer->commit_page); 984 } 985 986 static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer) 987 { 988 return rb_page_commit(cpu_buffer->head_page); 989 } 990 991 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer, 992 struct buffer_page **bpage) 993 { 994 struct list_head *p = (*bpage)->list.next; 995 996 if (p == &cpu_buffer->pages) 997 p = p->next; 998 999 *bpage = list_entry(p, struct buffer_page, list); 1000 } 1001 1002 static inline unsigned 1003 rb_event_index(struct ring_buffer_event *event) 1004 { 1005 unsigned long addr = (unsigned long)event; 1006 1007 return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE; 1008 } 1009 1010 static inline int 1011 rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer, 1012 struct ring_buffer_event *event) 1013 { 1014 unsigned long addr = (unsigned long)event; 1015 unsigned long index; 1016 1017 index = rb_event_index(event); 1018 addr &= PAGE_MASK; 1019 1020 return cpu_buffer->commit_page->page == (void *)addr && 1021 rb_commit_index(cpu_buffer) == index; 1022 } 1023 1024 static void 1025 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 1026 { 1027 /* 1028 * We only race with interrupts and NMIs on this CPU. 1029 * If we own the commit event, then we can commit 1030 * all others that interrupted us, since the interruptions 1031 * are in stack format (they finish before they come 1032 * back to us). This allows us to do a simple loop to 1033 * assign the commit to the tail. 
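 *
 * Illustrative nesting (not taken from the code): writer A bumps
 * tail_page, is interrupted, the interrupt writes its events and
 * returns, then A resumes. Because the interrupt finished before A
 * continues, every page between commit_page and tail_page is already
 * fully written, so the loops below can simply walk commit_page
 * forward until it catches up with tail_page.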
1034 */ 1035 again: 1036 while (cpu_buffer->commit_page != cpu_buffer->tail_page) { 1037 cpu_buffer->commit_page->page->commit = 1038 cpu_buffer->commit_page->write; 1039 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page); 1040 cpu_buffer->write_stamp = 1041 cpu_buffer->commit_page->page->time_stamp; 1042 /* add barrier to keep gcc from optimizing too much */ 1043 barrier(); 1044 } 1045 while (rb_commit_index(cpu_buffer) != 1046 rb_page_write(cpu_buffer->commit_page)) { 1047 cpu_buffer->commit_page->page->commit = 1048 cpu_buffer->commit_page->write; 1049 barrier(); 1050 } 1051 1052 /* again, keep gcc from optimizing */ 1053 barrier(); 1054 1055 /* 1056 * If an interrupt came in just after the first while loop 1057 * and pushed the tail page forward, we will be left with 1058 * a dangling commit that will never go forward. 1059 */ 1060 if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page)) 1061 goto again; 1062 } 1063 1064 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 1065 { 1066 cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp; 1067 cpu_buffer->reader_page->read = 0; 1068 } 1069 1070 static void rb_inc_iter(struct ring_buffer_iter *iter) 1071 { 1072 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 1073 1074 /* 1075 * The iterator could be on the reader page (it starts there). 1076 * But the head could have moved, since the reader was 1077 * found. Check for this case and assign the iterator 1078 * to the head page instead of next. 1079 */ 1080 if (iter->head_page == cpu_buffer->reader_page) 1081 iter->head_page = cpu_buffer->head_page; 1082 else 1083 rb_inc_page(cpu_buffer, &iter->head_page); 1084 1085 iter->read_stamp = iter->head_page->page->time_stamp; 1086 iter->head = 0; 1087 } 1088 1089 /** 1090 * ring_buffer_update_event - update event type and data 1091 * @event: the even to update 1092 * @type: the type of event 1093 * @length: the size of the event field in the ring buffer 1094 * 1095 * Update the type and data fields of the event. The length 1096 * is the actual size that is written to the ring buffer, 1097 * and with this, we can determine what to place into the 1098 * data field. 1099 */ 1100 static void 1101 rb_update_event(struct ring_buffer_event *event, 1102 unsigned type, unsigned length) 1103 { 1104 event->type_len = type; 1105 1106 switch (type) { 1107 1108 case RINGBUF_TYPE_PADDING: 1109 case RINGBUF_TYPE_TIME_EXTEND: 1110 case RINGBUF_TYPE_TIME_STAMP: 1111 break; 1112 1113 case 0: 1114 length -= RB_EVNT_HDR_SIZE; 1115 if (length > RB_MAX_SMALL_DATA) 1116 event->array[0] = length; 1117 else 1118 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 1119 break; 1120 default: 1121 BUG(); 1122 } 1123 } 1124 1125 static unsigned rb_calculate_event_length(unsigned length) 1126 { 1127 struct ring_buffer_event event; /* Used only for sizeof array */ 1128 1129 /* zero length can cause confusions */ 1130 if (!length) 1131 length = 1; 1132 1133 if (length > RB_MAX_SMALL_DATA) 1134 length += sizeof(event.array[0]); 1135 1136 length += RB_EVNT_HDR_SIZE; 1137 length = ALIGN(length, RB_ALIGNMENT); 1138 1139 return length; 1140 } 1141 1142 static inline void 1143 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 1144 struct buffer_page *tail_page, 1145 unsigned long tail, unsigned long length) 1146 { 1147 struct ring_buffer_event *event; 1148 1149 /* 1150 * Only the event that crossed the page boundary 1151 * must fill the old tail_page with padding. 
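 *
 * Worked example (illustrative numbers): say BUF_PAGE_SIZE is 4080,
 * tail is 4000 and length is 96, so write became 4096. Only the event
 * that saw tail < BUF_PAGE_SIZE runs the code below: it turns bytes
 * 4000..4079 into a padding event and pulls the write index back by
 * (tail + length) - BUF_PAGE_SIZE so it ends exactly at 4080. Any
 * nested writer that saw tail >= BUF_PAGE_SIZE only undoes its own add.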
1152 */ 1153 if (tail >= BUF_PAGE_SIZE) { 1154 local_sub(length, &tail_page->write); 1155 return; 1156 } 1157 1158 event = __rb_page_index(tail_page, tail); 1159 kmemcheck_annotate_bitfield(event, bitfield); 1160 1161 /* 1162 * If this event is bigger than the minimum size, then 1163 * we need to be careful that we don't subtract the 1164 * write counter enough to allow another writer to slip 1165 * in on this page. 1166 * We put in a discarded commit instead, to make sure 1167 * that this space is not used again. 1168 * 1169 * If we are less than the minimum size, we don't need to 1170 * worry about it. 1171 */ 1172 if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) { 1173 /* No room for any events */ 1174 1175 /* Mark the rest of the page with padding */ 1176 rb_event_set_padding(event); 1177 1178 /* Set the write back to the previous setting */ 1179 local_sub(length, &tail_page->write); 1180 return; 1181 } 1182 1183 /* Put in a discarded event */ 1184 event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE; 1185 event->type_len = RINGBUF_TYPE_PADDING; 1186 /* time delta must be non zero */ 1187 event->time_delta = 1; 1188 /* Account for this as an entry */ 1189 local_inc(&tail_page->entries); 1190 local_inc(&cpu_buffer->entries); 1191 1192 /* Set write to end of buffer */ 1193 length = (tail + length) - BUF_PAGE_SIZE; 1194 local_sub(length, &tail_page->write); 1195 } 1196 1197 static struct ring_buffer_event * 1198 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 1199 unsigned long length, unsigned long tail, 1200 struct buffer_page *commit_page, 1201 struct buffer_page *tail_page, u64 *ts) 1202 { 1203 struct buffer_page *next_page, *head_page, *reader_page; 1204 struct ring_buffer *buffer = cpu_buffer->buffer; 1205 bool lock_taken = false; 1206 unsigned long flags; 1207 1208 next_page = tail_page; 1209 1210 local_irq_save(flags); 1211 /* 1212 * Since the write to the buffer is still not 1213 * fully lockless, we must be careful with NMIs. 1214 * The locks in the writers are taken when a write 1215 * crosses to a new page. The locks protect against 1216 * races with the readers (this will soon be fixed 1217 * with a lockless solution). 1218 * 1219 * Because we can not protect against NMIs, and we 1220 * want to keep traces reentrant, we need to manage 1221 * what happens when we are in an NMI. 1222 * 1223 * NMIs can happen after we take the lock. 1224 * If we are in an NMI, only take the lock 1225 * if it is not already taken. Otherwise 1226 * simply fail. 1227 */ 1228 if (unlikely(in_nmi())) { 1229 if (!__raw_spin_trylock(&cpu_buffer->lock)) { 1230 cpu_buffer->nmi_dropped++; 1231 goto out_reset; 1232 } 1233 } else 1234 __raw_spin_lock(&cpu_buffer->lock); 1235 1236 lock_taken = true; 1237 1238 rb_inc_page(cpu_buffer, &next_page); 1239 1240 head_page = cpu_buffer->head_page; 1241 reader_page = cpu_buffer->reader_page; 1242 1243 /* we grabbed the lock before incrementing */ 1244 if (RB_WARN_ON(cpu_buffer, next_page == reader_page)) 1245 goto out_reset; 1246 1247 /* 1248 * If for some reason, we had an interrupt storm that made 1249 * it all the way around the buffer, bail, and warn 1250 * about it. 1251 */ 1252 if (unlikely(next_page == commit_page)) { 1253 cpu_buffer->commit_overrun++; 1254 goto out_reset; 1255 } 1256 1257 if (next_page == head_page) { 1258 if (!(buffer->flags & RB_FL_OVERWRITE)) 1259 goto out_reset; 1260 1261 /* tail_page has not moved yet? 
*/ 1262 if (tail_page == cpu_buffer->tail_page) { 1263 /* count overflows */ 1264 cpu_buffer->overrun += 1265 local_read(&head_page->entries); 1266 1267 rb_inc_page(cpu_buffer, &head_page); 1268 cpu_buffer->head_page = head_page; 1269 cpu_buffer->head_page->read = 0; 1270 } 1271 } 1272 1273 /* 1274 * If the tail page is still the same as what we think 1275 * it is, then it is up to us to update the tail 1276 * pointer. 1277 */ 1278 if (tail_page == cpu_buffer->tail_page) { 1279 local_set(&next_page->write, 0); 1280 local_set(&next_page->entries, 0); 1281 local_set(&next_page->page->commit, 0); 1282 cpu_buffer->tail_page = next_page; 1283 1284 /* reread the time stamp */ 1285 *ts = rb_time_stamp(buffer, cpu_buffer->cpu); 1286 cpu_buffer->tail_page->page->time_stamp = *ts; 1287 } 1288 1289 rb_reset_tail(cpu_buffer, tail_page, tail, length); 1290 1291 __raw_spin_unlock(&cpu_buffer->lock); 1292 local_irq_restore(flags); 1293 1294 /* fail and let the caller try again */ 1295 return ERR_PTR(-EAGAIN); 1296 1297 out_reset: 1298 /* reset write */ 1299 rb_reset_tail(cpu_buffer, tail_page, tail, length); 1300 1301 if (likely(lock_taken)) 1302 __raw_spin_unlock(&cpu_buffer->lock); 1303 local_irq_restore(flags); 1304 return NULL; 1305 } 1306 1307 static struct ring_buffer_event * 1308 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 1309 unsigned type, unsigned long length, u64 *ts) 1310 { 1311 struct buffer_page *tail_page, *commit_page; 1312 struct ring_buffer_event *event; 1313 unsigned long tail, write; 1314 1315 commit_page = cpu_buffer->commit_page; 1316 /* we just need to protect against interrupts */ 1317 barrier(); 1318 tail_page = cpu_buffer->tail_page; 1319 write = local_add_return(length, &tail_page->write); 1320 tail = write - length; 1321 1322 /* See if we shot pass the end of this buffer page */ 1323 if (write > BUF_PAGE_SIZE) 1324 return rb_move_tail(cpu_buffer, length, tail, 1325 commit_page, tail_page, ts); 1326 1327 /* We reserved something on the buffer */ 1328 1329 event = __rb_page_index(tail_page, tail); 1330 kmemcheck_annotate_bitfield(event, bitfield); 1331 rb_update_event(event, type, length); 1332 1333 /* The passed in type is zero for DATA */ 1334 if (likely(!type)) 1335 local_inc(&tail_page->entries); 1336 1337 /* 1338 * If this is the first commit on the page, then update 1339 * its timestamp. 1340 */ 1341 if (!tail) 1342 tail_page->page->time_stamp = *ts; 1343 1344 return event; 1345 } 1346 1347 static inline int 1348 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 1349 struct ring_buffer_event *event) 1350 { 1351 unsigned long new_index, old_index; 1352 struct buffer_page *bpage; 1353 unsigned long index; 1354 unsigned long addr; 1355 1356 new_index = rb_event_index(event); 1357 old_index = new_index + rb_event_length(event); 1358 addr = (unsigned long)event; 1359 addr &= PAGE_MASK; 1360 1361 bpage = cpu_buffer->tail_page; 1362 1363 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 1364 /* 1365 * This is on the tail page. It is possible that 1366 * a write could come in and move the tail page 1367 * and write to the next page. That is fine 1368 * because we just shorten what is on this page. 
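 *
 * Worked example (illustrative offsets): an event starting at page
 * offset 100 with a total length of 40 gives new_index == 100 and
 * old_index == 140. The cmpxchg below only succeeds if the write
 * index is still 140, i.e. nothing was reserved after this event, and
 * in that case the 40 bytes are handed back to the page.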
1369 */ 1370 index = local_cmpxchg(&bpage->write, old_index, new_index); 1371 if (index == old_index) 1372 return 1; 1373 } 1374 1375 /* could not discard */ 1376 return 0; 1377 } 1378 1379 static int 1380 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 1381 u64 *ts, u64 *delta) 1382 { 1383 struct ring_buffer_event *event; 1384 static int once; 1385 int ret; 1386 1387 if (unlikely(*delta > (1ULL << 59) && !once++)) { 1388 printk(KERN_WARNING "Delta way too big! %llu" 1389 " ts=%llu write stamp = %llu\n", 1390 (unsigned long long)*delta, 1391 (unsigned long long)*ts, 1392 (unsigned long long)cpu_buffer->write_stamp); 1393 WARN_ON(1); 1394 } 1395 1396 /* 1397 * The delta is too big, we to add a 1398 * new timestamp. 1399 */ 1400 event = __rb_reserve_next(cpu_buffer, 1401 RINGBUF_TYPE_TIME_EXTEND, 1402 RB_LEN_TIME_EXTEND, 1403 ts); 1404 if (!event) 1405 return -EBUSY; 1406 1407 if (PTR_ERR(event) == -EAGAIN) 1408 return -EAGAIN; 1409 1410 /* Only a commited time event can update the write stamp */ 1411 if (rb_event_is_commit(cpu_buffer, event)) { 1412 /* 1413 * If this is the first on the page, then it was 1414 * updated with the page itself. Try to discard it 1415 * and if we can't just make it zero. 1416 */ 1417 if (rb_event_index(event)) { 1418 event->time_delta = *delta & TS_MASK; 1419 event->array[0] = *delta >> TS_SHIFT; 1420 } else { 1421 /* try to discard, since we do not need this */ 1422 if (!rb_try_to_discard(cpu_buffer, event)) { 1423 /* nope, just zero it */ 1424 event->time_delta = 0; 1425 event->array[0] = 0; 1426 } 1427 } 1428 cpu_buffer->write_stamp = *ts; 1429 /* let the caller know this was the commit */ 1430 ret = 1; 1431 } else { 1432 /* Try to discard the event */ 1433 if (!rb_try_to_discard(cpu_buffer, event)) { 1434 /* Darn, this is just wasted space */ 1435 event->time_delta = 0; 1436 event->array[0] = 0; 1437 } 1438 ret = 0; 1439 } 1440 1441 *delta = 0; 1442 1443 return ret; 1444 } 1445 1446 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 1447 { 1448 local_inc(&cpu_buffer->committing); 1449 local_inc(&cpu_buffer->commits); 1450 } 1451 1452 static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 1453 { 1454 unsigned long commits; 1455 1456 if (RB_WARN_ON(cpu_buffer, 1457 !local_read(&cpu_buffer->committing))) 1458 return; 1459 1460 again: 1461 commits = local_read(&cpu_buffer->commits); 1462 /* synchronize with interrupts */ 1463 barrier(); 1464 if (local_read(&cpu_buffer->committing) == 1) 1465 rb_set_commit_to_write(cpu_buffer); 1466 1467 local_dec(&cpu_buffer->committing); 1468 1469 /* synchronize with interrupts */ 1470 barrier(); 1471 1472 /* 1473 * Need to account for interrupts coming in between the 1474 * updating of the commit page and the clearing of the 1475 * committing counter. 1476 */ 1477 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 1478 !local_read(&cpu_buffer->committing)) { 1479 local_inc(&cpu_buffer->committing); 1480 goto again; 1481 } 1482 } 1483 1484 static struct ring_buffer_event * 1485 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer, 1486 unsigned long length) 1487 { 1488 struct ring_buffer_event *event; 1489 u64 ts, delta = 0; 1490 int commit = 0; 1491 int nr_loops = 0; 1492 1493 rb_start_commit(cpu_buffer); 1494 1495 length = rb_calculate_event_length(length); 1496 again: 1497 /* 1498 * We allow for interrupts to reenter here and do a trace. 1499 * If one does, it will cause this original code to loop 1500 * back here. 
Even with heavy interrupts happening, this 1501 * should only happen a few times in a row. If this happens 1502 * 1000 times in a row, there must be either an interrupt 1503 * storm or we have something buggy. 1504 * Bail! 1505 */ 1506 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 1507 goto out_fail; 1508 1509 ts = rb_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu); 1510 1511 /* 1512 * Only the first commit can update the timestamp. 1513 * Yes there is a race here. If an interrupt comes in 1514 * just after the conditional and it traces too, then it 1515 * will also check the deltas. More than one timestamp may 1516 * also be made. But only the entry that did the actual 1517 * commit will be something other than zero. 1518 */ 1519 if (likely(cpu_buffer->tail_page == cpu_buffer->commit_page && 1520 rb_page_write(cpu_buffer->tail_page) == 1521 rb_commit_index(cpu_buffer))) { 1522 u64 diff; 1523 1524 diff = ts - cpu_buffer->write_stamp; 1525 1526 /* make sure this diff is calculated here */ 1527 barrier(); 1528 1529 /* Did the write stamp get updated already? */ 1530 if (unlikely(ts < cpu_buffer->write_stamp)) 1531 goto get_event; 1532 1533 delta = diff; 1534 if (unlikely(test_time_stamp(delta))) { 1535 1536 commit = rb_add_time_stamp(cpu_buffer, &ts, &delta); 1537 if (commit == -EBUSY) 1538 goto out_fail; 1539 1540 if (commit == -EAGAIN) 1541 goto again; 1542 1543 RB_WARN_ON(cpu_buffer, commit < 0); 1544 } 1545 } 1546 1547 get_event: 1548 event = __rb_reserve_next(cpu_buffer, 0, length, &ts); 1549 if (unlikely(PTR_ERR(event) == -EAGAIN)) 1550 goto again; 1551 1552 if (!event) 1553 goto out_fail; 1554 1555 if (!rb_event_is_commit(cpu_buffer, event)) 1556 delta = 0; 1557 1558 event->time_delta = delta; 1559 1560 return event; 1561 1562 out_fail: 1563 rb_end_commit(cpu_buffer); 1564 return NULL; 1565 } 1566 1567 #ifdef CONFIG_TRACING 1568 1569 #define TRACE_RECURSIVE_DEPTH 16 1570 1571 static int trace_recursive_lock(void) 1572 { 1573 current->trace_recursion++; 1574 1575 if (likely(current->trace_recursion < TRACE_RECURSIVE_DEPTH)) 1576 return 0; 1577 1578 /* Disable all tracing before we do anything else */ 1579 tracing_off_permanent(); 1580 1581 printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:" 1582 "HC[%lu]:SC[%lu]:NMI[%lu]\n", 1583 current->trace_recursion, 1584 hardirq_count() >> HARDIRQ_SHIFT, 1585 softirq_count() >> SOFTIRQ_SHIFT, 1586 in_nmi()); 1587 1588 WARN_ON_ONCE(1); 1589 return -1; 1590 } 1591 1592 static void trace_recursive_unlock(void) 1593 { 1594 WARN_ON_ONCE(!current->trace_recursion); 1595 1596 current->trace_recursion--; 1597 } 1598 1599 #else 1600 1601 #define trace_recursive_lock() (0) 1602 #define trace_recursive_unlock() do { } while (0) 1603 1604 #endif 1605 1606 static DEFINE_PER_CPU(int, rb_need_resched); 1607 1608 /** 1609 * ring_buffer_lock_reserve - reserve a part of the buffer 1610 * @buffer: the ring buffer to reserve from 1611 * @length: the length of the data to reserve (excluding event header) 1612 * 1613 * Returns a reseverd event on the ring buffer to copy directly to. 1614 * The user of this interface will need to get the body to write into 1615 * and can use the ring_buffer_event_data() interface. 1616 * 1617 * The length is the length of the data needed, not the event length 1618 * which also includes the event header. 1619 * 1620 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 1621 * If NULL is returned, then nothing has been allocated or locked. 
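 *
 * A minimal usage sketch (illustrative; "struct my_entry" is a made-up
 * payload type):
 *
 *	struct ring_buffer_event *event;
 *	struct my_entry *entry;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (!event)
 *		return;
 *	entry = ring_buffer_event_data(event);
 *	entry->value = 42;
 *	ring_buffer_unlock_commit(buffer, event);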
1622 */ 1623 struct ring_buffer_event * 1624 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length) 1625 { 1626 struct ring_buffer_per_cpu *cpu_buffer; 1627 struct ring_buffer_event *event; 1628 int cpu, resched; 1629 1630 if (ring_buffer_flags != RB_BUFFERS_ON) 1631 return NULL; 1632 1633 if (atomic_read(&buffer->record_disabled)) 1634 return NULL; 1635 1636 /* If we are tracing schedule, we don't want to recurse */ 1637 resched = ftrace_preempt_disable(); 1638 1639 if (trace_recursive_lock()) 1640 goto out_nocheck; 1641 1642 cpu = raw_smp_processor_id(); 1643 1644 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1645 goto out; 1646 1647 cpu_buffer = buffer->buffers[cpu]; 1648 1649 if (atomic_read(&cpu_buffer->record_disabled)) 1650 goto out; 1651 1652 if (length > BUF_MAX_DATA_SIZE) 1653 goto out; 1654 1655 event = rb_reserve_next_event(cpu_buffer, length); 1656 if (!event) 1657 goto out; 1658 1659 /* 1660 * Need to store resched state on this cpu. 1661 * Only the first needs to. 1662 */ 1663 1664 if (preempt_count() == 1) 1665 per_cpu(rb_need_resched, cpu) = resched; 1666 1667 return event; 1668 1669 out: 1670 trace_recursive_unlock(); 1671 1672 out_nocheck: 1673 ftrace_preempt_enable(resched); 1674 return NULL; 1675 } 1676 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 1677 1678 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, 1679 struct ring_buffer_event *event) 1680 { 1681 local_inc(&cpu_buffer->entries); 1682 1683 /* 1684 * The event first in the commit queue updates the 1685 * time stamp. 1686 */ 1687 if (rb_event_is_commit(cpu_buffer, event)) 1688 cpu_buffer->write_stamp += event->time_delta; 1689 1690 rb_end_commit(cpu_buffer); 1691 } 1692 1693 /** 1694 * ring_buffer_unlock_commit - commit a reserved 1695 * @buffer: The buffer to commit to 1696 * @event: The event pointer to commit. 1697 * 1698 * This commits the data to the ring buffer, and releases any locks held. 1699 * 1700 * Must be paired with ring_buffer_lock_reserve. 1701 */ 1702 int ring_buffer_unlock_commit(struct ring_buffer *buffer, 1703 struct ring_buffer_event *event) 1704 { 1705 struct ring_buffer_per_cpu *cpu_buffer; 1706 int cpu = raw_smp_processor_id(); 1707 1708 cpu_buffer = buffer->buffers[cpu]; 1709 1710 rb_commit(cpu_buffer, event); 1711 1712 trace_recursive_unlock(); 1713 1714 /* 1715 * Only the last preempt count needs to restore preemption. 1716 */ 1717 if (preempt_count() == 1) 1718 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu)); 1719 else 1720 preempt_enable_no_resched_notrace(); 1721 1722 return 0; 1723 } 1724 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 1725 1726 static inline void rb_event_discard(struct ring_buffer_event *event) 1727 { 1728 /* array[0] holds the actual length for the discarded event */ 1729 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 1730 event->type_len = RINGBUF_TYPE_PADDING; 1731 /* time delta must be non zero */ 1732 if (!event->time_delta) 1733 event->time_delta = 1; 1734 } 1735 1736 /** 1737 * ring_buffer_event_discard - discard any event in the ring buffer 1738 * @event: the event to discard 1739 * 1740 * Sometimes a event that is in the ring buffer needs to be ignored. 1741 * This function lets the user discard an event in the ring buffer 1742 * and then that event will not be read later. 1743 * 1744 * Note, it is up to the user to be careful with this, and protect 1745 * against races. If the user discards an event that has been consumed 1746 * it is possible that it could corrupt the ring buffer. 
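 *
 * One possible pattern (illustrative; my_filter_rejects() is a made-up
 * predicate) is dropping a freshly reserved event before committing it:
 *
 *	event = ring_buffer_lock_reserve(buffer, len);
 *	... fill the event ...
 *	if (my_filter_rejects(event))
 *		ring_buffer_event_discard(event);
 *	ring_buffer_unlock_commit(buffer, event);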
1747 */ 1748 void ring_buffer_event_discard(struct ring_buffer_event *event) 1749 { 1750 rb_event_discard(event); 1751 } 1752 EXPORT_SYMBOL_GPL(ring_buffer_event_discard); 1753 1754 /** 1755 * ring_buffer_commit_discard - discard an event that has not been committed 1756 * @buffer: the ring buffer 1757 * @event: non committed event to discard 1758 * 1759 * This is similar to ring_buffer_event_discard but must only be 1760 * performed on an event that has not been committed yet. The difference 1761 * is that this will also try to free the event from the ring buffer 1762 * if another event has not been added behind it. 1763 * 1764 * If another event has been added behind it, it will set the event 1765 * up as discarded, and perform the commit. 1766 * 1767 * If this function is called, do not call ring_buffer_unlock_commit on 1768 * the event. 1769 */ 1770 void ring_buffer_discard_commit(struct ring_buffer *buffer, 1771 struct ring_buffer_event *event) 1772 { 1773 struct ring_buffer_per_cpu *cpu_buffer; 1774 int cpu; 1775 1776 /* The event is discarded regardless */ 1777 rb_event_discard(event); 1778 1779 cpu = smp_processor_id(); 1780 cpu_buffer = buffer->buffers[cpu]; 1781 1782 /* 1783 * This must only be called if the event has not been 1784 * committed yet. Thus we can assume that preemption 1785 * is still disabled. 1786 */ 1787 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 1788 1789 if (rb_try_to_discard(cpu_buffer, event)) 1790 goto out; 1791 1792 /* 1793 * The commit is still visible by the reader, so we 1794 * must increment entries. 1795 */ 1796 local_inc(&cpu_buffer->entries); 1797 out: 1798 rb_end_commit(cpu_buffer); 1799 1800 trace_recursive_unlock(); 1801 1802 /* 1803 * Only the last preempt count needs to restore preemption. 1804 */ 1805 if (preempt_count() == 1) 1806 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu)); 1807 else 1808 preempt_enable_no_resched_notrace(); 1809 1810 } 1811 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 1812 1813 /** 1814 * ring_buffer_write - write data to the buffer without reserving 1815 * @buffer: The ring buffer to write to. 1816 * @length: The length of the data being written (excluding the event header) 1817 * @data: The data to write to the buffer. 1818 * 1819 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 1820 * one function. If you already have the data to write to the buffer, it 1821 * may be easier to simply call this function. 1822 * 1823 * Note, like ring_buffer_lock_reserve, the length is the length of the data 1824 * and not the length of the event which would hold the header. 
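 *
 * For example (illustrative; "struct my_entry" is a made-up payload):
 *
 *	struct my_entry entry = { .value = 42 };
 *	int ret;
 *
 *	ret = ring_buffer_write(buffer, sizeof(entry), &entry);
 *
 * A non-zero return means the record was dropped (recording is off,
 * the buffer is disabled, or the payload exceeds BUF_MAX_DATA_SIZE).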
1825 */ 1826 int ring_buffer_write(struct ring_buffer *buffer, 1827 unsigned long length, 1828 void *data) 1829 { 1830 struct ring_buffer_per_cpu *cpu_buffer; 1831 struct ring_buffer_event *event; 1832 void *body; 1833 int ret = -EBUSY; 1834 int cpu, resched; 1835 1836 if (ring_buffer_flags != RB_BUFFERS_ON) 1837 return -EBUSY; 1838 1839 if (atomic_read(&buffer->record_disabled)) 1840 return -EBUSY; 1841 1842 resched = ftrace_preempt_disable(); 1843 1844 cpu = raw_smp_processor_id(); 1845 1846 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1847 goto out; 1848 1849 cpu_buffer = buffer->buffers[cpu]; 1850 1851 if (atomic_read(&cpu_buffer->record_disabled)) 1852 goto out; 1853 1854 if (length > BUF_MAX_DATA_SIZE) 1855 goto out; 1856 1857 event = rb_reserve_next_event(cpu_buffer, length); 1858 if (!event) 1859 goto out; 1860 1861 body = rb_event_data(event); 1862 1863 memcpy(body, data, length); 1864 1865 rb_commit(cpu_buffer, event); 1866 1867 ret = 0; 1868 out: 1869 ftrace_preempt_enable(resched); 1870 1871 return ret; 1872 } 1873 EXPORT_SYMBOL_GPL(ring_buffer_write); 1874 1875 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 1876 { 1877 struct buffer_page *reader = cpu_buffer->reader_page; 1878 struct buffer_page *head = cpu_buffer->head_page; 1879 struct buffer_page *commit = cpu_buffer->commit_page; 1880 1881 return reader->read == rb_page_commit(reader) && 1882 (commit == reader || 1883 (commit == head && 1884 head->read == rb_page_commit(commit))); 1885 } 1886 1887 /** 1888 * ring_buffer_record_disable - stop all writes into the buffer 1889 * @buffer: The ring buffer to stop writes to. 1890 * 1891 * This prevents all writes to the buffer. Any attempt to write 1892 * to the buffer after this will fail and return NULL. 1893 * 1894 * The caller should call synchronize_sched() after this. 1895 */ 1896 void ring_buffer_record_disable(struct ring_buffer *buffer) 1897 { 1898 atomic_inc(&buffer->record_disabled); 1899 } 1900 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 1901 1902 /** 1903 * ring_buffer_record_enable - enable writes to the buffer 1904 * @buffer: The ring buffer to enable writes 1905 * 1906 * Note, multiple disables will need the same number of enables 1907 * to truely enable the writing (much like preempt_disable). 1908 */ 1909 void ring_buffer_record_enable(struct ring_buffer *buffer) 1910 { 1911 atomic_dec(&buffer->record_disabled); 1912 } 1913 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 1914 1915 /** 1916 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 1917 * @buffer: The ring buffer to stop writes to. 1918 * @cpu: The CPU buffer to stop 1919 * 1920 * This prevents all writes to the buffer. Any attempt to write 1921 * to the buffer after this will fail and return NULL. 1922 * 1923 * The caller should call synchronize_sched() after this. 1924 */ 1925 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu) 1926 { 1927 struct ring_buffer_per_cpu *cpu_buffer; 1928 1929 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1930 return; 1931 1932 cpu_buffer = buffer->buffers[cpu]; 1933 atomic_inc(&cpu_buffer->record_disabled); 1934 } 1935 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 1936 1937 /** 1938 * ring_buffer_record_enable_cpu - enable writes to the buffer 1939 * @buffer: The ring buffer to enable writes 1940 * @cpu: The CPU to enable. 1941 * 1942 * Note, multiple disables will need the same number of enables 1943 * to truely enable the writing (much like preempt_disable). 
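 *
 * Illustrative nesting, mirroring the note above:
 *
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	ring_buffer_record_enable_cpu(buffer, cpu);	(still disabled)
 *	ring_buffer_record_enable_cpu(buffer, cpu);	(recording again)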
1944 */ 1945 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu) 1946 { 1947 struct ring_buffer_per_cpu *cpu_buffer; 1948 1949 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1950 return; 1951 1952 cpu_buffer = buffer->buffers[cpu]; 1953 atomic_dec(&cpu_buffer->record_disabled); 1954 } 1955 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 1956 1957 /** 1958 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 1959 * @buffer: The ring buffer 1960 * @cpu: The per CPU buffer to get the entries from. 1961 */ 1962 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu) 1963 { 1964 struct ring_buffer_per_cpu *cpu_buffer; 1965 unsigned long ret; 1966 1967 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1968 return 0; 1969 1970 cpu_buffer = buffer->buffers[cpu]; 1971 ret = (local_read(&cpu_buffer->entries) - cpu_buffer->overrun) 1972 - cpu_buffer->read; 1973 1974 return ret; 1975 } 1976 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 1977 1978 /** 1979 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer 1980 * @buffer: The ring buffer 1981 * @cpu: The per CPU buffer to get the number of overruns from 1982 */ 1983 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu) 1984 { 1985 struct ring_buffer_per_cpu *cpu_buffer; 1986 unsigned long ret; 1987 1988 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1989 return 0; 1990 1991 cpu_buffer = buffer->buffers[cpu]; 1992 ret = cpu_buffer->overrun; 1993 1994 return ret; 1995 } 1996 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 1997 1998 /** 1999 * ring_buffer_nmi_dropped_cpu - get the number of nmis that were dropped 2000 * @buffer: The ring buffer 2001 * @cpu: The per CPU buffer to get the number of overruns from 2002 */ 2003 unsigned long ring_buffer_nmi_dropped_cpu(struct ring_buffer *buffer, int cpu) 2004 { 2005 struct ring_buffer_per_cpu *cpu_buffer; 2006 unsigned long ret; 2007 2008 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2009 return 0; 2010 2011 cpu_buffer = buffer->buffers[cpu]; 2012 ret = cpu_buffer->nmi_dropped; 2013 2014 return ret; 2015 } 2016 EXPORT_SYMBOL_GPL(ring_buffer_nmi_dropped_cpu); 2017 2018 /** 2019 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits 2020 * @buffer: The ring buffer 2021 * @cpu: The per CPU buffer to get the number of overruns from 2022 */ 2023 unsigned long 2024 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu) 2025 { 2026 struct ring_buffer_per_cpu *cpu_buffer; 2027 unsigned long ret; 2028 2029 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2030 return 0; 2031 2032 cpu_buffer = buffer->buffers[cpu]; 2033 ret = cpu_buffer->commit_overrun; 2034 2035 return ret; 2036 } 2037 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 2038 2039 /** 2040 * ring_buffer_entries - get the number of entries in a buffer 2041 * @buffer: The ring buffer 2042 * 2043 * Returns the total number of entries in the ring buffer 2044 * (all CPU entries) 2045 */ 2046 unsigned long ring_buffer_entries(struct ring_buffer *buffer) 2047 { 2048 struct ring_buffer_per_cpu *cpu_buffer; 2049 unsigned long entries = 0; 2050 int cpu; 2051 2052 /* if you care about this being correct, lock the buffer */ 2053 for_each_buffer_cpu(buffer, cpu) { 2054 cpu_buffer = buffer->buffers[cpu]; 2055 entries += (local_read(&cpu_buffer->entries) - 2056 cpu_buffer->overrun) - cpu_buffer->read; 2057 } 2058 2059 return entries; 2060 } 2061 EXPORT_SYMBOL_GPL(ring_buffer_entries); 2062 2063 /** 2064 * ring_buffer_overrun_cpu - get the number of 

/**
 * ring_buffer_overruns - get the number of overruns in the buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of overruns in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long overruns = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		overruns += cpu_buffer->overrun;
	}

	return overruns;
}
EXPORT_SYMBOL_GPL(ring_buffer_overruns);

static void rb_iter_reset(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/* Iterator usage is expected to have record disabled */
	if (list_empty(&cpu_buffer->reader_page->list)) {
		iter->head_page = cpu_buffer->head_page;
		iter->head = cpu_buffer->head_page->read;
	} else {
		iter->head_page = cpu_buffer->reader_page;
		iter->head = cpu_buffer->reader_page->read;
	}
	if (iter->head)
		iter->read_stamp = cpu_buffer->read_stamp;
	else
		iter->read_stamp = iter->head_page->page->time_stamp;
}

/**
 * ring_buffer_iter_reset - reset an iterator
 * @iter: The iterator to reset
 *
 * Resets the iterator, so that it will start from the beginning
 * again.
 */
void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;

	if (!iter)
		return;

	cpu_buffer = iter->cpu_buffer;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	rb_iter_reset(iter);
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);

/**
 * ring_buffer_iter_empty - check if an iterator has no more to read
 * @iter: The iterator to check
 */
int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	cpu_buffer = iter->cpu_buffer;

	return iter->head_page == cpu_buffer->commit_page &&
		iter->head == rb_commit_index(cpu_buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);

static void
rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		     struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		cpu_buffer->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		cpu_buffer->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
	return;
}
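
/*
 * Editor's note: a TIME_EXTEND event carries a delta too large for the
 * 27-bit time_delta field of a normal event. The full delta is split
 * across the event as
 *
 *	delta = ((u64)event->array[0] << TS_SHIFT) + event->time_delta;
 *
 * so array[0] holds the upper bits and time_delta the lower TS_SHIFT (27)
 * bits. That is exactly what rb_update_read_stamp() above and
 * rb_update_iter_read_stamp() below reconstruct before adding the result
 * to the running read_stamp.
 */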

static void
rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
			  struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		iter->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		iter->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
	return;
}

static struct buffer_page *
rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = NULL;
	unsigned long flags;
	int nr_loops = 0;

	local_irq_save(flags);
	__raw_spin_lock(&cpu_buffer->lock);

 again:
	/*
	 * This should normally only loop twice. But because the
	 * start of the reader inserts an empty page, it causes
	 * a case where we will loop three times. There should be no
	 * reason to loop four times (that I know of).
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
		reader = NULL;
		goto out;
	}

	reader = cpu_buffer->reader_page;

	/* If there's more to read, return this page */
	if (cpu_buffer->reader_page->read < rb_page_size(reader))
		goto out;

	/* Never should we have an index greater than the size */
	if (RB_WARN_ON(cpu_buffer,
		       cpu_buffer->reader_page->read > rb_page_size(reader)))
		goto out;

	/* check if we caught up to the tail */
	reader = NULL;
	if (cpu_buffer->commit_page == cpu_buffer->reader_page)
		goto out;

	/*
	 * Splice the empty reader page into the list around the head.
	 * Reset the reader page to size zero.
	 */

	reader = cpu_buffer->head_page;
	cpu_buffer->reader_page->list.next = reader->list.next;
	cpu_buffer->reader_page->list.prev = reader->list.prev;

	local_set(&cpu_buffer->reader_page->write, 0);
	local_set(&cpu_buffer->reader_page->entries, 0);
	local_set(&cpu_buffer->reader_page->page->commit, 0);

	/* Make the reader page now replace the head */
	reader->list.prev->next = &cpu_buffer->reader_page->list;
	reader->list.next->prev = &cpu_buffer->reader_page->list;

	/*
	 * If the tail is on the reader, then we must set the head
	 * to the inserted page, otherwise we set it one before.
	 */
	cpu_buffer->head_page = cpu_buffer->reader_page;

	if (cpu_buffer->commit_page != reader)
		rb_inc_page(cpu_buffer, &cpu_buffer->head_page);

	/* Finally update the reader page to the new head */
	cpu_buffer->reader_page = reader;
	rb_reset_reader_page(cpu_buffer);

	goto again;

 out:
	__raw_spin_unlock(&cpu_buffer->lock);
	local_irq_restore(flags);

	return reader;
}

static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	unsigned length;

	reader = rb_get_reader_page(cpu_buffer);

	/* This function should not be called when buffer is empty */
	if (RB_WARN_ON(cpu_buffer, !reader))
		return;

	event = rb_reader_event(cpu_buffer);

	if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX
			|| rb_discarded_event(event))
		cpu_buffer->read++;

	rb_update_read_stamp(cpu_buffer, event);

	length = rb_event_length(event);
	cpu_buffer->reader_page->read += length;
}
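
/*
 * Editor's note: rb_advance_reader() moves the read cursor one event at a
 * time; reader_page->read always lands on an event boundary because it is
 * bumped by rb_event_length(). A page is fully consumed once the cursor
 * reaches the page's commit index, roughly (illustrative sketch only):
 *
 *	while (reader->read < rb_page_commit(reader)) {
 *		event = rb_reader_event(cpu_buffer);
 *		// ... hand the event to the caller ...
 *		rb_advance_reader(cpu_buffer);
 *	}
 *
 * after which the next rb_get_reader_page() call swaps in a fresh page.
 */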

static void rb_advance_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	unsigned length;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

	/*
	 * Check if we are at the end of the buffer.
	 */
	if (iter->head >= rb_page_size(iter->head_page)) {
		/* discarded commits can make the page empty */
		if (iter->head_page == cpu_buffer->commit_page)
			return;
		rb_inc_iter(iter);
		return;
	}

	event = rb_iter_head_event(iter);

	length = rb_event_length(event);

	/*
	 * This should not be called to advance the header if we are
	 * at the tail of the buffer.
	 */
	if (RB_WARN_ON(cpu_buffer,
		       (iter->head_page == cpu_buffer->commit_page) &&
		       (iter->head + length > rb_commit_index(cpu_buffer))))
		return;

	rb_update_iter_read_stamp(iter, event);

	iter->head += length;

	/* check for end of page padding */
	if ((iter->head >= rb_page_size(iter->head_page)) &&
	    (iter->head_page != cpu_buffer->commit_page))
		rb_advance_iter(iter);
}

static struct ring_buffer_event *
rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	int nr_loops = 0;

	cpu_buffer = buffer->buffers[cpu];

 again:
	/*
	 * We repeat when a timestamp is encountered. It is possible
	 * to get multiple timestamps from an interrupt entering just
	 * as one timestamp is about to be written, or from discarded
	 * commits. The most that we can have is the number on a single page.
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE))
		return NULL;

	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		return NULL;

	event = rb_reader_event(cpu_buffer);

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			RB_WARN_ON(cpu_buffer, 1);
		/*
		 * Because the writer could be discarding every
		 * event it creates (which would probably be bad)
		 * if we were to go back to "again" then we may never
		 * catch up, and will trigger the warn on, or lock
		 * the box. Return the padding, and we will release
		 * the current locks, and try again.
		 */
		return event;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = cpu_buffer->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(buffer,
							 cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_peek);
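
/*
 * Editor's note: when rb_buffer_peek() hits a PADDING event that is not a
 * null event, it hands the padding back to the caller instead of looping,
 * so the caller can drop its locks and retry. The public wrappers below
 * follow that contract (sketch of the caller side):
 *
 *	event = rb_buffer_peek(buffer, cpu, &ts);
 *	...
 *	if (event && event->type_len == RINGBUF_TYPE_PADDING) {
 *		cpu_relax();
 *		goto again;	// after releasing reader_lock and irqs
 *	}
 */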

static struct ring_buffer_event *
rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	int nr_loops = 0;

	if (ring_buffer_iter_empty(iter))
		return NULL;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

 again:
	/*
	 * We repeat when a timestamp is encountered.
	 * We can get multiple timestamps by nested interrupts or also
	 * if filtering is on (discarding commits). Since discarding
	 * commits can be frequent we can get a lot of timestamps.
	 * But we limit them by not adding timestamps if they begin
	 * at the start of a page.
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE))
		return NULL;

	if (rb_per_cpu_empty(cpu_buffer))
		return NULL;

	event = rb_iter_head_event(iter);

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event)) {
			rb_inc_iter(iter);
			goto again;
		}
		rb_advance_iter(iter);
		return event;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = iter->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(buffer,
							 cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);

static inline int rb_ok_to_lock(void)
{
	/*
	 * If an NMI is dumping out the content of the ring buffer,
	 * do not grab locks. We also permanently disable the ring
	 * buffer. A one time deal is all you get from reading
	 * the ring buffer from an NMI.
	 */
	if (likely(!in_nmi()))
		return 1;

	tracing_off_permanent();
	return 0;
}

/**
 * ring_buffer_peek - peek at the next event to be read
 * @buffer: The ring buffer to read
 * @cpu: The cpu to peek at
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not consume the data.
 */
struct ring_buffer_event *
ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	struct ring_buffer_event *event;
	unsigned long flags;
	int dolock;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return NULL;

	dolock = rb_ok_to_lock();
 again:
	local_irq_save(flags);
	if (dolock)
		spin_lock(&cpu_buffer->reader_lock);
	event = rb_buffer_peek(buffer, cpu, ts);
	if (event && event->type_len == RINGBUF_TYPE_PADDING)
		rb_advance_reader(cpu_buffer);
	if (dolock)
		spin_unlock(&cpu_buffer->reader_lock);
	local_irq_restore(flags);

	if (event && event->type_len == RINGBUF_TYPE_PADDING) {
		cpu_relax();
		goto again;
	}

	return event;
}

/**
 * ring_buffer_iter_peek - peek at the next event to be read
 * @iter: The ring buffer iterator
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not increment the iterator.
 */
struct ring_buffer_event *
ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
	struct ring_buffer_event *event;
	unsigned long flags;

 again:
	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	event = rb_iter_peek(iter, ts);
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	if (event && event->type_len == RINGBUF_TYPE_PADDING) {
		cpu_relax();
		goto again;
	}

	return event;
}
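
/*
 * Example (editor's sketch, not part of this file): peeking at the next
 * event on a CPU without consuming it, e.g. to decide whether a reader
 * should be woken up. The timestamp comes back already normalized by
 * ring_buffer_normalize_time_stamp().
 *
 *	static bool my_data_ready(struct ring_buffer *buffer, int cpu)
 *	{
 *		struct ring_buffer_event *event;
 *		u64 ts;
 *
 *		event = ring_buffer_peek(buffer, cpu, &ts);
 *		if (!event)
 *			return false;
 *
 *		pr_debug("next event: len=%u ts=%llu\n",
 *			 ring_buffer_event_length(event), ts);
 *		return true;
 *	}
 */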

/**
 * ring_buffer_consume - return an event and consume it
 * @buffer: The ring buffer to get the next event from
 * @cpu: The per CPU buffer to get the next event from
 * @ts: The timestamp counter of this event.
 *
 * Returns the next event in the ring buffer, and that event is consumed.
 * Meaning that sequential reads will keep returning a different event,
 * and eventually empty the ring buffer if the producer is slower.
 */
struct ring_buffer_event *
ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event = NULL;
	unsigned long flags;
	int dolock;

	dolock = rb_ok_to_lock();

 again:
	/* might be called in atomic */
	preempt_disable();

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];
	local_irq_save(flags);
	if (dolock)
		spin_lock(&cpu_buffer->reader_lock);

	event = rb_buffer_peek(buffer, cpu, ts);
	if (event)
		rb_advance_reader(cpu_buffer);

	if (dolock)
		spin_unlock(&cpu_buffer->reader_lock);
	local_irq_restore(flags);

 out:
	preempt_enable();

	if (event && event->type_len == RINGBUF_TYPE_PADDING) {
		cpu_relax();
		goto again;
	}

	return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_consume);
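
/*
 * Example (editor's sketch, not part of this file): draining one CPU's
 * buffer with the consuming interface. Each successful call returns a
 * different event until the buffer is empty (or the producer gets ahead
 * again).
 *
 *	static void my_drain_cpu(struct ring_buffer *buffer, int cpu)
 *	{
 *		struct ring_buffer_event *event;
 *		u64 ts;
 *
 *		while ((event = ring_buffer_consume(buffer, cpu, &ts))) {
 *			void *payload = ring_buffer_event_data(event);
 *
 *			// process payload and ts; the length is available
 *			// via ring_buffer_event_length(event)
 *		}
 *	}
 */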

/**
 * ring_buffer_read_start - start a non consuming read of the buffer
 * @buffer: The ring buffer to read from
 * @cpu: The cpu buffer to iterate over
 *
 * This starts up an iteration through the buffer. It also disables
 * the recording to the buffer until the reading is finished.
 * This prevents the reading from being corrupted. This is not
 * a consuming read, so a producer is not expected.
 *
 * Must be paired with ring_buffer_read_finish().
 */
struct ring_buffer_iter *
ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_iter *iter;
	unsigned long flags;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return NULL;

	iter = kmalloc(sizeof(*iter), GFP_KERNEL);
	if (!iter)
		return NULL;

	cpu_buffer = buffer->buffers[cpu];

	iter->cpu_buffer = cpu_buffer;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	__raw_spin_lock(&cpu_buffer->lock);
	rb_iter_reset(iter);
	__raw_spin_unlock(&cpu_buffer->lock);
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	return iter;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_start);

/**
 * ring_buffer_read_finish - finish reading the iterator of the buffer
 * @iter: The iterator retrieved by ring_buffer_read_start()
 *
 * This re-enables the recording to the buffer, and frees the
 * iterator.
 */
void
ring_buffer_read_finish(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	atomic_dec(&cpu_buffer->record_disabled);
	kfree(iter);
}
EXPORT_SYMBOL_GPL(ring_buffer_read_finish);

/**
 * ring_buffer_read - read the next item in the ring buffer by the iterator
 * @iter: The ring buffer iterator
 * @ts: The time stamp of the event read.
 *
 * This reads the next event in the ring buffer and increments the iterator.
 */
struct ring_buffer_event *
ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_event *event;
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
	unsigned long flags;

 again:
	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	event = rb_iter_peek(iter, ts);
	if (!event)
		goto out;

	rb_advance_iter(iter);
 out:
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	if (event && event->type_len == RINGBUF_TYPE_PADDING) {
		cpu_relax();
		goto again;
	}

	return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_read);
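
/*
 * Example (editor's sketch, not part of this file): a non-consuming pass
 * over one CPU's buffer using the iterator interface. Recording on that
 * CPU buffer stays disabled from ring_buffer_read_start() until
 * ring_buffer_read_finish().
 *
 *	static void my_walk_cpu(struct ring_buffer *buffer, int cpu)
 *	{
 *		struct ring_buffer_iter *iter;
 *		struct ring_buffer_event *event;
 *		u64 ts;
 *
 *		iter = ring_buffer_read_start(buffer, cpu);
 *		if (!iter)
 *			return;
 *
 *		while ((event = ring_buffer_read(iter, &ts)))
 *			;	// inspect event and ts; nothing is consumed
 *
 *		ring_buffer_read_finish(iter);
 *	}
 */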

/**
 * ring_buffer_size - return the size of the ring buffer (in bytes)
 * @buffer: The ring buffer.
 */
unsigned long ring_buffer_size(struct ring_buffer *buffer)
{
	return BUF_PAGE_SIZE * buffer->pages;
}
EXPORT_SYMBOL_GPL(ring_buffer_size);

static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
	local_set(&cpu_buffer->head_page->write, 0);
	local_set(&cpu_buffer->head_page->entries, 0);
	local_set(&cpu_buffer->head_page->page->commit, 0);

	cpu_buffer->head_page->read = 0;

	cpu_buffer->tail_page = cpu_buffer->head_page;
	cpu_buffer->commit_page = cpu_buffer->head_page;

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	local_set(&cpu_buffer->reader_page->write, 0);
	local_set(&cpu_buffer->reader_page->entries, 0);
	local_set(&cpu_buffer->reader_page->page->commit, 0);
	cpu_buffer->reader_page->read = 0;

	cpu_buffer->nmi_dropped = 0;
	cpu_buffer->commit_overrun = 0;
	cpu_buffer->overrun = 0;
	cpu_buffer->read = 0;
	local_set(&cpu_buffer->entries, 0);
	local_set(&cpu_buffer->committing, 0);
	local_set(&cpu_buffer->commits, 0);

	cpu_buffer->write_stamp = 0;
	cpu_buffer->read_stamp = 0;
}

/**
 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
 * @buffer: The ring buffer to reset a per cpu buffer of
 * @cpu: The CPU buffer to be reset
 */
void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	unsigned long flags;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;

	atomic_inc(&cpu_buffer->record_disabled);

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	__raw_spin_lock(&cpu_buffer->lock);

	rb_reset_cpu(cpu_buffer);

	__raw_spin_unlock(&cpu_buffer->lock);

	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	atomic_dec(&cpu_buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);

/**
 * ring_buffer_reset - reset a ring buffer
 * @buffer: The ring buffer to reset all cpu buffers
 */
void ring_buffer_reset(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		ring_buffer_reset_cpu(buffer, cpu);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset);
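
/*
 * Example (editor's sketch, not part of this file): clearing a buffer
 * between runs and sanity checking the result with ring_buffer_empty()
 * (defined below) and ring_buffer_size().
 *
 *	static void my_clear_buffer(struct ring_buffer *buffer)
 *	{
 *		ring_buffer_reset(buffer);
 *
 *		WARN_ON(!ring_buffer_empty(buffer));
 *		pr_debug("buffer cleared, capacity %lu bytes\n",
 *			 ring_buffer_size(buffer));
 *	}
 */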

/**
 * ring_buffer_empty - is the ring buffer empty?
 * @buffer: The ring buffer to test
 */
int ring_buffer_empty(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;
	int dolock;
	int cpu;
	int ret;

	dolock = rb_ok_to_lock();

	/* yes this is racy, but if you don't like the race, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		local_irq_save(flags);
		if (dolock)
			spin_lock(&cpu_buffer->reader_lock);
		ret = rb_per_cpu_empty(cpu_buffer);
		if (dolock)
			spin_unlock(&cpu_buffer->reader_lock);
		local_irq_restore(flags);

		if (!ret)
			return 0;
	}

	return 1;
}
EXPORT_SYMBOL_GPL(ring_buffer_empty);

/**
 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
 * @buffer: The ring buffer
 * @cpu: The CPU buffer to test
 */
int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;
	int dolock;
	int ret;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 1;

	dolock = rb_ok_to_lock();

	cpu_buffer = buffer->buffers[cpu];
	local_irq_save(flags);
	if (dolock)
		spin_lock(&cpu_buffer->reader_lock);
	ret = rb_per_cpu_empty(cpu_buffer);
	if (dolock)
		spin_unlock(&cpu_buffer->reader_lock);
	local_irq_restore(flags);

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);

/**
 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
 * @buffer_a: One buffer to swap with
 * @buffer_b: The other buffer to swap with
 * @cpu: The CPU buffer to swap
 *
 * This function is useful for tracers that want to take a "snapshot"
 * of a CPU buffer and have another backup buffer lying around.
 * It is expected that the tracer handles the cpu buffer not being
 * used at the moment.
 */
int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
			 struct ring_buffer *buffer_b, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer_a;
	struct ring_buffer_per_cpu *cpu_buffer_b;
	int ret = -EINVAL;

	if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
	    !cpumask_test_cpu(cpu, buffer_b->cpumask))
		goto out;

	/* At least make sure the two buffers are somewhat the same */
	if (buffer_a->pages != buffer_b->pages)
		goto out;

	ret = -EAGAIN;

	if (ring_buffer_flags != RB_BUFFERS_ON)
		goto out;

	if (atomic_read(&buffer_a->record_disabled))
		goto out;

	if (atomic_read(&buffer_b->record_disabled))
		goto out;

	cpu_buffer_a = buffer_a->buffers[cpu];
	cpu_buffer_b = buffer_b->buffers[cpu];

	if (atomic_read(&cpu_buffer_a->record_disabled))
		goto out;

	if (atomic_read(&cpu_buffer_b->record_disabled))
		goto out;

	/*
	 * We can't do a synchronize_sched here because this
	 * function can be called in atomic context.
	 * Normally this will be called from the same CPU as cpu.
	 * If not it's up to the caller to protect this.
	 */
	atomic_inc(&cpu_buffer_a->record_disabled);
	atomic_inc(&cpu_buffer_b->record_disabled);

	buffer_a->buffers[cpu] = cpu_buffer_b;
	buffer_b->buffers[cpu] = cpu_buffer_a;

	cpu_buffer_b->buffer = buffer_a;
	cpu_buffer_a->buffer = buffer_b;

	atomic_dec(&cpu_buffer_a->record_disabled);
	atomic_dec(&cpu_buffer_b->record_disabled);

	ret = 0;
 out:
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
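
/*
 * Example (editor's sketch, not part of this file): the "snapshot" pattern
 * ring_buffer_swap_cpu() is meant for. A tracer keeps a spare buffer of the
 * same size and swaps one CPU's live buffer out for later inspection.
 *
 *	static int my_snapshot_cpu(struct ring_buffer *live,
 *				   struct ring_buffer *spare, int cpu)
 *	{
 *		int ret;
 *
 *		ret = ring_buffer_swap_cpu(live, spare, cpu);
 *		if (ret)
 *			return ret;	// -EINVAL or -EAGAIN
 *
 *		// "spare" now holds the old live data for this cpu and can
 *		// be read at leisure, e.g. with ring_buffer_consume().
 *		return 0;
 *	}
 */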

/**
 * ring_buffer_alloc_read_page - allocate a page to read from buffer
 * @buffer: the buffer to allocate for.
 *
 * This function is used in conjunction with ring_buffer_read_page.
 * When reading a full page from the ring buffer, these functions
 * can be used to speed up the process. The calling function should
 * allocate a few pages first with this function. Then when it
 * needs to get pages from the ring buffer, it passes the result
 * of this function into ring_buffer_read_page, which will swap
 * the page that was allocated, with the read page of the buffer.
 *
 * Returns:
 *  The page allocated, or NULL on error.
 */
void *ring_buffer_alloc_read_page(struct ring_buffer *buffer)
{
	struct buffer_data_page *bpage;
	unsigned long addr;

	addr = __get_free_page(GFP_KERNEL);
	if (!addr)
		return NULL;

	bpage = (void *)addr;

	rb_init_page(bpage);

	return bpage;
}
EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);

/**
 * ring_buffer_free_read_page - free an allocated read page
 * @buffer: the buffer the page was allocated for
 * @data: the page to free
 *
 * Free a page allocated from ring_buffer_alloc_read_page.
 */
void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
{
	free_page((unsigned long)data);
}
EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);

/**
 * ring_buffer_read_page - extract a page from the ring buffer
 * @buffer: buffer to extract from
 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
 * @len: amount to extract
 * @cpu: the cpu of the buffer to extract
 * @full: should the extraction only happen when the page is full.
 *
 * This function will pull out a page from the ring buffer and consume it.
 * @data_page must be the address of the variable that was returned
 * from ring_buffer_alloc_read_page. This is because the page might be used
 * to swap with a page in the ring buffer.
 *
 * for example:
 *	rpage = ring_buffer_alloc_read_page(buffer);
 *	if (!rpage)
 *		return error;
 *	ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
 *	if (ret >= 0)
 *		process_page(rpage, ret);
 *
 * When @full is set, the read will not succeed unless the writer
 * is off the reader page.
 *
 * Note: it is up to the calling functions to handle sleeps and wakeups.
 *  The ring buffer can be used anywhere in the kernel and can not
 *  blindly call wake_up. The layer that uses the ring buffer must be
 *  responsible for that.
 *
 * Returns:
 *  >=0 if data has been transferred, returns the offset of consumed data.
 *  <0 if no data has been transferred.
 */
int ring_buffer_read_page(struct ring_buffer *buffer,
			  void **data_page, size_t len, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	struct ring_buffer_event *event;
	struct buffer_data_page *bpage;
	struct buffer_page *reader;
	unsigned long flags;
	unsigned int commit;
	unsigned int read;
	u64 save_timestamp;
	int ret = -1;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		goto out;

	/*
	 * If len is not big enough to hold the page header, then
	 * we can not copy anything.
	 */
	if (len <= BUF_PAGE_HDR_SIZE)
		goto out;

	len -= BUF_PAGE_HDR_SIZE;

	if (!data_page)
		goto out;

	bpage = *data_page;
	if (!bpage)
		goto out;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		goto out_unlock;

	event = rb_reader_event(cpu_buffer);

	read = reader->read;
	commit = rb_page_commit(reader);

	/*
	 * If this page has been partially read or
	 * if len is not big enough to read the rest of the page or
	 * a writer is still on the page, then
	 * we must copy the data from the page to the buffer.
	 * Otherwise, we can simply swap the page with the one passed in.
	 */
	if (read || (len < (commit - read)) ||
	    cpu_buffer->reader_page == cpu_buffer->commit_page) {
		struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
		unsigned int rpos = read;
		unsigned int pos = 0;
		unsigned int size;

		if (full)
			goto out_unlock;

		if (len > (commit - read))
			len = (commit - read);

		size = rb_event_length(event);

		if (len < size)
			goto out_unlock;

		/* save the current timestamp, since the user will need it */
		save_timestamp = cpu_buffer->read_stamp;

		/* Need to copy one event at a time */
		do {
			memcpy(bpage->data + pos, rpage->data + rpos, size);

			len -= size;

			rb_advance_reader(cpu_buffer);
			rpos = reader->read;
			pos += size;

			event = rb_reader_event(cpu_buffer);
			size = rb_event_length(event);
		} while (len > size);

		/* update bpage */
		local_set(&bpage->commit, pos);
		bpage->time_stamp = save_timestamp;

		/* we copied everything to the beginning */
		read = 0;
	} else {
		/* update the entry counter */
		cpu_buffer->read += local_read(&reader->entries);

		/* swap the pages */
		rb_init_page(bpage);
		bpage = reader->page;
		reader->page = *data_page;
		local_set(&reader->write, 0);
		local_set(&reader->entries, 0);
		reader->read = 0;
		*data_page = bpage;
	}
	ret = read;

 out_unlock:
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

 out:
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page);
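
/*
 * Example (editor's sketch, not part of this file): extracting whole pages
 * for a splice-style consumer. The page returned by
 * ring_buffer_alloc_read_page() may be swapped with a page of the ring
 * buffer, so always pass its address and keep using the (possibly new)
 * pointer afterwards.
 *
 *	static void my_read_pages(struct ring_buffer *buffer, int cpu)
 *	{
 *		void *page = ring_buffer_alloc_read_page(buffer);
 *		int ret;
 *
 *		if (!page)
 *			return;
 *
 *		// full == 1: only complete pages, no partial copies
 *		while ((ret = ring_buffer_read_page(buffer, &page,
 *						    PAGE_SIZE, cpu, 1)) >= 0) {
 *			// ret is the offset of the consumed data (see above)
 *		}
 *
 *		ring_buffer_free_read_page(buffer, page);
 *	}
 */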

#ifdef CONFIG_TRACING
static ssize_t
rb_simple_read(struct file *filp, char __user *ubuf,
	       size_t cnt, loff_t *ppos)
{
	unsigned long *p = filp->private_data;
	char buf[64];
	int r;

	if (test_bit(RB_BUFFERS_DISABLED_BIT, p))
		r = sprintf(buf, "permanently disabled\n");
	else
		r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p));

	return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
}

static ssize_t
rb_simple_write(struct file *filp, const char __user *ubuf,
		size_t cnt, loff_t *ppos)
{
	unsigned long *p = filp->private_data;
	char buf[64];
	unsigned long val;
	int ret;

	if (cnt >= sizeof(buf))
		return -EINVAL;

	if (copy_from_user(&buf, ubuf, cnt))
		return -EFAULT;

	buf[cnt] = 0;

	ret = strict_strtoul(buf, 10, &val);
	if (ret < 0)
		return ret;

	if (val)
		set_bit(RB_BUFFERS_ON_BIT, p);
	else
		clear_bit(RB_BUFFERS_ON_BIT, p);

	(*ppos)++;

	return cnt;
}

static const struct file_operations rb_simple_fops = {
	.open		= tracing_open_generic,
	.read		= rb_simple_read,
	.write		= rb_simple_write,
};


static __init int rb_init_debugfs(void)
{
	struct dentry *d_tracer;

	d_tracer = tracing_init_dentry();

	trace_create_file("tracing_on", 0644, d_tracer,
			  &ring_buffer_flags, &rb_simple_fops);

	return 0;
}

fs_initcall(rb_init_debugfs);
#endif

#ifdef CONFIG_HOTPLUG_CPU
static int rb_cpu_notify(struct notifier_block *self,
			 unsigned long action, void *hcpu)
{
	struct ring_buffer *buffer =
		container_of(self, struct ring_buffer, cpu_notify);
	long cpu = (long)hcpu;

	switch (action) {
	case CPU_UP_PREPARE:
	case CPU_UP_PREPARE_FROZEN:
		if (cpumask_test_cpu(cpu, buffer->cpumask))
			return NOTIFY_OK;

		buffer->buffers[cpu] =
			rb_allocate_cpu_buffer(buffer, cpu);
		if (!buffer->buffers[cpu]) {
			WARN(1, "failed to allocate ring buffer on CPU %ld\n",
			     cpu);
			return NOTIFY_OK;
		}
		smp_wmb();
		cpumask_set_cpu(cpu, buffer->cpumask);
		break;
	case CPU_DOWN_PREPARE:
	case CPU_DOWN_PREPARE_FROZEN:
		/*
		 * Do nothing.
		 *  If we were to free the buffer, then the user would
		 *  lose any trace that was in the buffer.
		 */
		break;
	default:
		break;
	}
	return NOTIFY_OK;
}
#endif