/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/ftrace_irq.h>
#include <linux/spinlock.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kmemcheck.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/fs.h>

#include "trace.h"

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	int ret;

	ret = trace_seq_printf(s, "# compressed entry header\n");
	ret = trace_seq_printf(s, "\ttype_len    :    5 bits\n");
	ret = trace_seq_printf(s, "\ttime_delta  :   27 bits\n");
	ret = trace_seq_printf(s, "\tarray       :   32 bits\n");
	ret = trace_seq_printf(s, "\n");
	ret = trace_seq_printf(s, "\tpadding     : type == %d\n",
			       RINGBUF_TYPE_PADDING);
	ret = trace_seq_printf(s, "\ttime_extend : type == %d\n",
			       RINGBUF_TYPE_TIME_EXTEND);
	ret = trace_seq_printf(s, "\tdata max type_len  == %d\n",
			       RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return ret;
}
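/*
 * For reference, the "compressed entry header" printed above mirrors the
 * bit layout of struct ring_buffer_event declared in <linux/ring_buffer.h>.
 * A minimal sketch of that layout (the real struct also carries kmemcheck
 * bitfield annotations):
 *
 *	struct ring_buffer_event {
 *		u32	type_len:5, time_delta:27;
 *		u32	array[];
 *	};
 *
 * type_len holds either an event type or the payload length in 4-byte
 * words; time_delta is the offset from the previous event's time stamp.
 */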
/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do
 * whatever it wants with that page. The writer will never write to that
 * page again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |    New     +---+   +---+   +---+
 *      |   Reader------^               |
 *      |   page                        |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 */

/*
 * A fast way to enable or disable all ring buffers is to
 * call tracing_on or tracing_off. Turning off the ring buffers
 * prevents all ring buffers from being recorded to.
 * Turning this switch on makes it OK to write to the
 * ring buffer, if the ring buffer is enabled itself.
 *
 * There are three layers that must be on in order to write
 * to the ring buffer.
 *
 * 1) This global flag must be set.
 * 2) The ring buffer must be enabled for recording.
 * 3) The per cpu buffer must be enabled for recording.
 *
 * In case of an anomaly, this global flag has a bit set that
 * will permanently disable all ring buffers.
 */

/*
 * Global flag to disable all recording to ring buffers
 *  This has two bits: ON, DISABLED
 *
 *  ON   DISABLED
 * ---- ----------
 *   0      0        : ring buffers are off
 *   1      0        : ring buffers are on
 *   X      1        : ring buffers are permanently disabled
 */

enum {
	RB_BUFFERS_ON_BIT	= 0,
	RB_BUFFERS_DISABLED_BIT	= 1,
};

enum {
	RB_BUFFERS_ON		= 1 << RB_BUFFERS_ON_BIT,
	RB_BUFFERS_DISABLED	= 1 << RB_BUFFERS_DISABLED_BIT,
};

static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

/**
 * tracing_on - enable all tracing buffers
 *
 * This function enables all tracing buffers that may have been
 * disabled with tracing_off.
 */
void tracing_on(void)
{
	set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_on);

/**
 * tracing_off - turn off all tracing buffers
 *
 * This function stops all tracing buffers from recording data.
 * It does not disable any overhead the tracers themselves may
 * be causing. This function simply causes all recording to
 * the ring buffers to fail.
 */
void tracing_off(void)
{
	clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_off);

/**
 * tracing_off_permanent - permanently disable ring buffers
 *
 * This function, once called, will disable all ring buffers
 * permanently.
 */
void tracing_off_permanent(void)
{
	set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
}

/**
 * tracing_is_on - show state of ring buffers enabled
 */
int tracing_is_on(void)
{
	return ring_buffer_flags == RB_BUFFERS_ON;
}
EXPORT_SYMBOL_GPL(tracing_is_on);

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
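/*
 * Worked example of the encoding constants above (arithmetic only, not
 * from the original source): with RB_ALIGNMENT == 4 and
 * RINGBUF_TYPE_DATA_TYPE_LEN_MAX == 28, RB_MAX_SMALL_DATA is 112 bytes.
 * A payload of up to 112 bytes encodes its length directly in the 5-bit
 * type_len field (in 4-byte words); anything larger keeps type_len == 0
 * and stores the byte length in array[0], costing one extra word.
 */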
enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 16,
};

static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING
			&& event->time_delta == 0;
}

static inline int rb_discarded_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/* inline for ring buffer fast paths */
static unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		BUG();
	}
	/* not hit */
	return 0;
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static void *
rb_event_data(struct ring_buffer_event *event)
{
	BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)
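/*
 * Capacity note (simple arithmetic, not from the original source): with
 * TS_SHIFT == 27, a time_delta can hold 2^27 - 1 time units. At one
 * nanosecond per unit that is roughly 134 ms between events before a
 * TIME_EXTEND event has to be inserted; TS_DELTA_TEST checks exactly
 * the bits above that range.
 */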
struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[];	/* data of buffer page */
};

struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	struct buffer_data_page *page;	/* Actual data page */
};

static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

/**
 * ring_buffer_page_len - the size of data on the page.
 * @page: The page to read
 *
 * Returns the amount of data on the page, including buffer page header.
 */
size_t ring_buffer_page_len(void *page)
{
	return local_read(&((struct buffer_data_page *)page)->commit)
		+ BUF_PAGE_HDR_SIZE;
}

/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 */
static void free_buffer_page(struct buffer_page *bpage)
{
	free_page((unsigned long)bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
	if (delta & TS_DELTA_TEST)
		return 1;
	return 0;
}

#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)

/* Max payload is BUF_PAGE_SIZE - header (8bytes) */
#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))

/* Max number of timestamps that can fit on a page */
#define RB_TIMESTAMPS_PER_PAGE	(BUF_PAGE_SIZE / RB_LEN_TIME_STAMP)

int ring_buffer_print_page_header(struct trace_seq *s)
{
	struct buffer_data_page field;
	int ret;

	ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			       "offset:0;\tsize:%u;\n",
			       (unsigned int)sizeof(field.time_stamp));

	ret = trace_seq_printf(s, "\tfield: local_t commit;\t"
			       "offset:%u;\tsize:%u;\n",
			       (unsigned int)offsetof(typeof(field), commit),
			       (unsigned int)sizeof(field.commit));

	ret = trace_seq_printf(s, "\tfield: char data;\t"
			       "offset:%u;\tsize:%u;\n",
			       (unsigned int)offsetof(typeof(field), data),
			       (unsigned int)BUF_PAGE_SIZE);

	return ret;
}
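/*
 * Illustrative output of ring_buffer_print_page_header() (a sketch only:
 * it assumes a 64-bit kernel with 4K pages, where sizeof(local_t) == 8
 * and BUF_PAGE_SIZE == 4096 - 16 == 4080; both are arch-dependent):
 *
 *	field: u64 timestamp;	offset:0;	size:8;
 *	field: local_t commit;	offset:8;	size:8;
 *	field: char data;	offset:16;	size:4080;
 */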
/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	struct ring_buffer		*buffer;
	spinlock_t			reader_lock;	/* serialize readers */
	raw_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct list_head		pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			nmi_dropped;
	unsigned long			commit_overrun;
	unsigned long			overrun;
	unsigned long			read;
	local_t				entries;
	local_t				committing;
	local_t				commits;
	u64				write_stamp;
	u64				read_stamp;
	atomic_t			record_disabled;
};

struct ring_buffer {
	unsigned			pages;
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

#ifdef CONFIG_HOTPLUG_CPU
	struct notifier_block		cpu_notify;
#endif
	u64				(*clock)(void);
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	struct buffer_page		*head_page;
	u64				read_stamp;
};

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(buffer, cond)				\
	({							\
		int _____ret = unlikely(cond);			\
		if (_____ret) {					\
			atomic_inc(&buffer->record_disabled);	\
			WARN_ON(1);				\
		}						\
		_____ret;					\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct ring_buffer *buffer, int cpu)
{
	/* shift to debug/test normalization and TIME_EXTENTS */
	return buffer->clock() << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer, cpu);
	preempt_enable_no_resched_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
		return -1;
	if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
		return -1;

	list_for_each_entry_safe(bpage, tmp, head, list) {
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.next->prev != &bpage->list))
			return -1;
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.prev->next != &bpage->list))
			return -1;
	}

	return 0;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned nr_pages)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;
	unsigned long addr;
	LIST_HEAD(pages);
	unsigned i;

	for (i = 0; i < nr_pages; i++) {
		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
				     GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
		if (!bpage)
			goto free_pages;
		list_add(&bpage->list, &pages);

		addr = __get_free_page(GFP_KERNEL);
		if (!addr)
			goto free_pages;
		bpage->page = (void *)addr;
		rb_init_page(bpage->page);
	}

	list_splice(&pages, head);

	rb_check_pages(cpu_buffer);

	return 0;

 free_pages:
	list_for_each_entry_safe(bpage, tmp, &pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	return -ENOMEM;
}
static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	unsigned long addr;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	spin_lock_init(&cpu_buffer->reader_lock);
	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
	cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
	INIT_LIST_HEAD(&cpu_buffer->pages);

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			     GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	cpu_buffer->reader_page = bpage;
	addr = __get_free_page(GFP_KERNEL);
	if (!addr)
		goto fail_free_reader;
	bpage->page = (void *)addr;
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);

	ret = rb_allocate_pages(cpu_buffer, buffer->pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	free_buffer_page(cpu_buffer->reader_page);

	list_for_each_entry_safe(bpage, tmp, head, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	kfree(cpu_buffer);
}

#ifdef CONFIG_HOTPLUG_CPU
static int rb_cpu_notify(struct notifier_block *self,
			 unsigned long action, void *hcpu);
#endif

/**
 * ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					struct lock_class_key *key)
{
	struct ring_buffer *buffer;
	int bsize;
	int cpu;

	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;
	buffer->clock = trace_clock_local;
	buffer->reader_lock_key = key;

	/* need at least two pages */
	if (buffer->pages < 2)
		buffer->pages = 2;

	/*
	 * In case of non-hotplug cpu, if the ring-buffer is allocated
	 * in early initcall, it will not be notified of secondary cpus.
	 * In that case, we need to allocate for all possible cpus.
	 */
#ifdef CONFIG_HOTPLUG_CPU
	get_online_cpus();
	cpumask_copy(buffer->cpumask, cpu_online_mask);
#else
	cpumask_copy(buffer->cpumask, cpu_possible_mask);
#endif
	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_cpumask;

	for_each_buffer_cpu(buffer, cpu) {
		buffer->buffers[cpu] =
			rb_allocate_cpu_buffer(buffer, cpu);
		if (!buffer->buffers[cpu])
			goto fail_free_buffers;
	}

#ifdef CONFIG_HOTPLUG_CPU
	buffer->cpu_notify.notifier_call = rb_cpu_notify;
	buffer->cpu_notify.priority = 0;
	register_cpu_notifier(&buffer->cpu_notify);
#endif

	put_online_cpus();
	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_cpumask:
	free_cpumask_var(buffer->cpumask);
	put_online_cpus();

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
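/*
 * Usage sketch (illustrative only, not part of this file): allocate a
 * 1MB-per-cpu overwriting buffer, write one record without an explicit
 * reserve/commit pair, then tear the buffer down. "struct my_rec" is a
 * hypothetical payload type; ring_buffer_alloc() is the wrapper macro
 * around __ring_buffer_alloc() from <linux/ring_buffer.h>.
 *
 *	struct ring_buffer *buffer;
 *	struct my_rec rec = { .val = 42 };
 *
 *	buffer = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);
 *	if (buffer) {
 *		ring_buffer_write(buffer, sizeof(rec), &rec);
 *		ring_buffer_free(buffer);
 *	}
 */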
/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct ring_buffer *buffer)
{
	int cpu;

	get_online_cpus();

#ifdef CONFIG_HOTPLUG_CPU
	unregister_cpu_notifier(&buffer->cpu_notify);
#endif

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	put_online_cpus();

	free_cpumask_var(buffer->cpumask);

	kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);

void ring_buffer_set_clock(struct ring_buffer *buffer,
			   u64 (*clock)(void))
{
	buffer->clock = clock;
}

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static void
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
{
	struct buffer_page *bpage;
	struct list_head *p;
	unsigned i;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	for (i = 0; i < nr_pages; i++) {
		if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
			return;
		p = cpu_buffer->pages.next;
		bpage = list_entry(p, struct buffer_page, list);
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
		return;

	rb_reset_cpu(cpu_buffer);

	rb_check_pages(cpu_buffer);

	atomic_dec(&cpu_buffer->record_disabled);
}

static void
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
		struct list_head *pages, unsigned nr_pages)
{
	struct buffer_page *bpage;
	struct list_head *p;
	unsigned i;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	for (i = 0; i < nr_pages; i++) {
		if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
			return;
		p = pages->next;
		bpage = list_entry(p, struct buffer_page, list);
		list_del_init(&bpage->list);
		list_add_tail(&bpage->list, &cpu_buffer->pages);
	}
	rb_reset_cpu(cpu_buffer);

	rb_check_pages(cpu_buffer);

	atomic_dec(&cpu_buffer->record_disabled);
}
/**
 * ring_buffer_resize - resize the ring buffer
 * @buffer: the buffer to resize.
 * @size: the new size.
 *
 * The tracer is responsible for making sure that the buffer is
 * not being used while changing the size.
 * Note: We may be able to change the above requirement by using
 *  RCU synchronizations.
 *
 * Minimum size is 2 * BUF_PAGE_SIZE.
 *
 * Returns the new size on success, -ENOMEM if page allocation
 * failed, and -1 on any other failure.
 */
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned nr_pages, rm_pages, new_pages;
	struct buffer_page *bpage, *tmp;
	unsigned long buffer_size;
	unsigned long addr;
	LIST_HEAD(pages);
	int i, cpu;

	/*
	 * Always succeed at resizing a non-existent buffer:
	 */
	if (!buffer)
		return size;

	size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	size *= BUF_PAGE_SIZE;
	buffer_size = buffer->pages * BUF_PAGE_SIZE;

	/* we need a minimum of two pages */
	if (size < BUF_PAGE_SIZE * 2)
		size = BUF_PAGE_SIZE * 2;

	if (size == buffer_size)
		return size;

	mutex_lock(&buffer->mutex);
	get_online_cpus();

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

	if (size < buffer_size) {

		/* easy case, just free pages */
		if (RB_WARN_ON(buffer, nr_pages >= buffer->pages))
			goto out_fail;

		rm_pages = buffer->pages - nr_pages;

		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			rb_remove_pages(cpu_buffer, rm_pages);
		}
		goto out;
	}

	/*
	 * This is a bit more difficult. We only want to add pages
	 * when we can allocate enough for all CPUs. We do this
	 * by allocating all the pages and storing them on a local
	 * link list. If we succeed in our allocation, then we
	 * add these pages to the cpu_buffers. Otherwise we just free
	 * them all and return -ENOMEM;
	 */
	if (RB_WARN_ON(buffer, nr_pages <= buffer->pages))
		goto out_fail;

	new_pages = nr_pages - buffer->pages;

	for_each_buffer_cpu(buffer, cpu) {
		for (i = 0; i < new_pages; i++) {
			bpage = kzalloc_node(ALIGN(sizeof(*bpage),
						   cache_line_size()),
					     GFP_KERNEL, cpu_to_node(cpu));
			if (!bpage)
				goto free_pages;
			list_add(&bpage->list, &pages);
			addr = __get_free_page(GFP_KERNEL);
			if (!addr)
				goto free_pages;
			bpage->page = (void *)addr;
			rb_init_page(bpage->page);
		}
	}

	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		rb_insert_pages(cpu_buffer, &pages, new_pages);
	}

	if (RB_WARN_ON(buffer, !list_empty(&pages)))
		goto out_fail;

 out:
	buffer->pages = nr_pages;
	put_online_cpus();
	mutex_unlock(&buffer->mutex);

	return size;

 free_pages:
	list_for_each_entry_safe(bpage, tmp, &pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	put_online_cpus();
	mutex_unlock(&buffer->mutex);
	return -ENOMEM;

	/*
	 * Something went totally wrong, and we are too paranoid
	 * to even clean up the mess.
	 */
 out_fail:
	put_online_cpus();
	mutex_unlock(&buffer->mutex);
	return -1;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);
static inline void *
__rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
{
	return bpage->data + index;
}

static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
{
	return bpage->page->data + index;
}

static inline struct ring_buffer_event *
rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->reader_page,
			       cpu_buffer->reader_page->read);
}

static inline struct ring_buffer_event *
rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->head_page,
			       cpu_buffer->head_page->read);
}

static inline struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
	return __rb_page_index(iter->head_page, iter->head);
}

static inline unsigned rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write);
}

static inline unsigned rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

/* Size is determined by what has been committed */
static inline unsigned rb_page_size(struct buffer_page *bpage)
{
	return rb_page_commit(bpage);
}

static inline unsigned
rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->commit_page);
}

static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->head_page);
}

static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **bpage)
{
	struct list_head *p = (*bpage)->list.next;

	if (p == &cpu_buffer->pages)
		p = p->next;

	*bpage = list_entry(p, struct buffer_page, list);
}

static inline unsigned
rb_event_index(struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;

	return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
}

static inline int
rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
		   struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;
	unsigned long index;

	index = rb_event_index(event);
	addr &= PAGE_MASK;

	return cpu_buffer->commit_page->page == (void *)addr &&
		rb_commit_index(cpu_buffer) == index;
}

static void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
	/*
	 * We only race with interrupts and NMIs on this CPU.
	 * If we own the commit event, then we can commit
	 * all others that interrupted us, since the interruptions
	 * are in stack format (they finish before they come
	 * back to us). This allows us to do a simple loop to
	 * assign the commit to the tail.
	 */
 again:
	while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
		cpu_buffer->commit_page->page->commit =
			cpu_buffer->commit_page->write;
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp =
			cpu_buffer->commit_page->page->time_stamp;
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {
		cpu_buffer->commit_page->page->commit =
			cpu_buffer->commit_page->write;
		barrier();
	}

	/* again, keep gcc from optimizing */
	barrier();

	/*
	 * If an interrupt came in just after the first while loop
	 * and pushed the tail page forward, we will be left with
	 * a dangling commit that will never go forward.
	 */
	if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
		goto again;
}
static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
	cpu_buffer->reader_page->read = 0;
}

static void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
	 */
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = cpu_buffer->head_page;
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	iter->read_stamp = iter->head_page->page->time_stamp;
	iter->head = 0;
}

/**
 * rb_update_event - update event type and data
 * @event: the event to update
 * @type: the type of event
 * @length: the size of the event field in the ring buffer
 *
 * Update the type and data fields of the event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
 */
static void
rb_update_event(struct ring_buffer_event *event,
		unsigned type, unsigned length)
{
	event->type_len = type;

	switch (type) {

	case RINGBUF_TYPE_PADDING:
	case RINGBUF_TYPE_TIME_EXTEND:
	case RINGBUF_TYPE_TIME_STAMP:
		break;

	case 0:
		length -= RB_EVNT_HDR_SIZE;
		if (length > RB_MAX_SMALL_DATA)
			event->array[0] = length;
		else
			event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
		break;
	default:
		BUG();
	}
}

static unsigned rb_calculate_event_length(unsigned length)
{
	struct ring_buffer_event event; /* Used only for sizeof array */

	/* zero length can cause confusion */
	if (!length)
		length = 1;

	if (length > RB_MAX_SMALL_DATA)
		length += sizeof(event.array[0]);

	length += RB_EVNT_HDR_SIZE;
	length = ALIGN(length, RB_ALIGNMENT);

	return length;
}
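/*
 * Worked example (arithmetic only, not from the original source): for a
 * 6 byte payload, rb_calculate_event_length(6) gives 6 + 4 bytes of
 * header = 10, aligned up to 12. rb_update_event() then stores
 * type_len = DIV_ROUND_UP(12 - 4, 4) = 2, and rb_event_data_length()
 * later recovers 2 * 4 + 4 = 12 bytes for the whole event.
 */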
static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
	      struct buffer_page *tail_page,
	      unsigned long tail, unsigned long length)
{
	struct ring_buffer_event *event;

	/*
	 * Only the event that crossed the page boundary
	 * must fill the old tail_page with padding.
	 */
	if (tail >= BUF_PAGE_SIZE) {
		local_sub(length, &tail_page->write);
		return;
	}

	event = __rb_page_index(tail_page, tail);
	kmemcheck_annotate_bitfield(event, bitfield);

	/*
	 * If this event is bigger than the minimum size, then
	 * we need to be careful that we don't subtract the
	 * write counter enough to allow another writer to slip
	 * in on this page.
	 * We put in a discarded commit instead, to make sure
	 * that this space is not used again.
	 *
	 * If we are less than the minimum size, we don't need to
	 * worry about it.
	 */
	if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
		/* No room for any events */

		/* Mark the rest of the page with padding */
		rb_event_set_padding(event);

		/* Set the write back to the previous setting */
		local_sub(length, &tail_page->write);
		return;
	}

	/* Put in a discarded event */
	event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
	event->type_len = RINGBUF_TYPE_PADDING;
	/* time delta must be non zero */
	event->time_delta = 1;
	/* Account for this as an entry */
	local_inc(&tail_page->entries);
	local_inc(&cpu_buffer->entries);

	/* Set write to end of buffer */
	length = (tail + length) - BUF_PAGE_SIZE;
	local_sub(length, &tail_page->write);
}
static struct ring_buffer_event *
rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
	     unsigned long length, unsigned long tail,
	     struct buffer_page *commit_page,
	     struct buffer_page *tail_page, u64 *ts)
{
	struct buffer_page *next_page, *head_page, *reader_page;
	struct ring_buffer *buffer = cpu_buffer->buffer;
	bool lock_taken = false;
	unsigned long flags;

	next_page = tail_page;

	local_irq_save(flags);
	/*
	 * Since the write to the buffer is still not
	 * fully lockless, we must be careful with NMIs.
	 * The locks in the writers are taken when a write
	 * crosses to a new page. The locks protect against
	 * races with the readers (this will soon be fixed
	 * with a lockless solution).
	 *
	 * Because we can not protect against NMIs, and we
	 * want to keep traces reentrant, we need to manage
	 * what happens when we are in an NMI.
	 *
	 * NMIs can happen after we take the lock.
	 * If we are in an NMI, only take the lock
	 * if it is not already taken. Otherwise
	 * simply fail.
	 */
	if (unlikely(in_nmi())) {
		if (!__raw_spin_trylock(&cpu_buffer->lock)) {
			cpu_buffer->nmi_dropped++;
			goto out_reset;
		}
	} else
		__raw_spin_lock(&cpu_buffer->lock);

	lock_taken = true;

	rb_inc_page(cpu_buffer, &next_page);

	head_page = cpu_buffer->head_page;
	reader_page = cpu_buffer->reader_page;

	/* we grabbed the lock before incrementing */
	if (RB_WARN_ON(cpu_buffer, next_page == reader_page))
		goto out_reset;

	/*
	 * If for some reason, we had an interrupt storm that made
	 * it all the way around the buffer, bail, and warn
	 * about it.
	 */
	if (unlikely(next_page == commit_page)) {
		cpu_buffer->commit_overrun++;
		goto out_reset;
	}

	if (next_page == head_page) {
		if (!(buffer->flags & RB_FL_OVERWRITE))
			goto out_reset;

		/* tail_page has not moved yet? */
		if (tail_page == cpu_buffer->tail_page) {
			/* count overflows */
			cpu_buffer->overrun +=
				local_read(&head_page->entries);

			rb_inc_page(cpu_buffer, &head_page);
			cpu_buffer->head_page = head_page;
			cpu_buffer->head_page->read = 0;
		}
	}

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == cpu_buffer->tail_page) {
		local_set(&next_page->write, 0);
		local_set(&next_page->entries, 0);
		local_set(&next_page->page->commit, 0);
		cpu_buffer->tail_page = next_page;

		/* reread the time stamp */
		*ts = rb_time_stamp(buffer, cpu_buffer->cpu);
		cpu_buffer->tail_page->page->time_stamp = *ts;
	}

	rb_reset_tail(cpu_buffer, tail_page, tail, length);

	__raw_spin_unlock(&cpu_buffer->lock);
	local_irq_restore(flags);

	/* fail and let the caller try again */
	return ERR_PTR(-EAGAIN);

 out_reset:
	/* reset write */
	rb_reset_tail(cpu_buffer, tail_page, tail, length);

	if (likely(lock_taken))
		__raw_spin_unlock(&cpu_buffer->lock);
	local_irq_restore(flags);
	return NULL;
}

static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  unsigned type, unsigned long length, u64 *ts)
{
	struct buffer_page *tail_page, *commit_page;
	struct ring_buffer_event *event;
	unsigned long tail, write;

	commit_page = cpu_buffer->commit_page;
	/* we just need to protect against interrupts */
	barrier();
	tail_page = cpu_buffer->tail_page;
	write = local_add_return(length, &tail_page->write);
	tail = write - length;

	/* See if we shot past the end of this buffer page */
	if (write > BUF_PAGE_SIZE)
		return rb_move_tail(cpu_buffer, length, tail,
				    commit_page, tail_page, ts);

	/* We reserved something on the buffer */

	event = __rb_page_index(tail_page, tail);
	kmemcheck_annotate_bitfield(event, bitfield);
	rb_update_event(event, type, length);

	/* The passed in type is zero for DATA */
	if (likely(!type))
		local_inc(&tail_page->entries);

	/*
	 * If this is the first commit on the page, then update
	 * its timestamp.
	 */
	if (!tail)
		tail_page->page->time_stamp = *ts;

	return event;
}
static inline int
rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
		  struct ring_buffer_event *event)
{
	unsigned long new_index, old_index;
	struct buffer_page *bpage;
	unsigned long index;
	unsigned long addr;

	new_index = rb_event_index(event);
	old_index = new_index + rb_event_length(event);
	addr = (unsigned long)event;
	addr &= PAGE_MASK;

	bpage = cpu_buffer->tail_page;

	if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
		/*
		 * This is on the tail page. It is possible that
		 * a write could come in and move the tail page
		 * and write to the next page. That is fine
		 * because we just shorten what is on this page.
		 */
		index = local_cmpxchg(&bpage->write, old_index, new_index);
		if (index == old_index)
			return 1;
	}

	/* could not discard */
	return 0;
}

static int
rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		  u64 *ts, u64 *delta)
{
	struct ring_buffer_event *event;
	static int once;
	int ret;

	if (unlikely(*delta > (1ULL << 59) && !once++)) {
		printk(KERN_WARNING "Delta way too big! %llu"
		       " ts=%llu write stamp = %llu\n",
		       (unsigned long long)*delta,
		       (unsigned long long)*ts,
		       (unsigned long long)cpu_buffer->write_stamp);
		WARN_ON(1);
	}

	/*
	 * The delta is too big, we need to add a
	 * new timestamp.
	 */
	event = __rb_reserve_next(cpu_buffer,
				  RINGBUF_TYPE_TIME_EXTEND,
				  RB_LEN_TIME_EXTEND,
				  ts);
	if (!event)
		return -EBUSY;

	if (PTR_ERR(event) == -EAGAIN)
		return -EAGAIN;

	/* Only a committed time event can update the write stamp */
	if (rb_event_is_commit(cpu_buffer, event)) {
		/*
		 * If this is the first on the page, then it was
		 * updated with the page itself. Try to discard it
		 * and if we can't just make it zero.
		 */
		if (rb_event_index(event)) {
			event->time_delta = *delta & TS_MASK;
			event->array[0] = *delta >> TS_SHIFT;
		} else {
			/* try to discard, since we do not need this */
			if (!rb_try_to_discard(cpu_buffer, event)) {
				/* nope, just zero it */
				event->time_delta = 0;
				event->array[0] = 0;
			}
		}
		cpu_buffer->write_stamp = *ts;
		/* let the caller know this was the commit */
		ret = 1;
	} else {
		/* Try to discard the event */
		if (!rb_try_to_discard(cpu_buffer, event)) {
			/* Darn, this is just wasted space */
			event->time_delta = 0;
			event->array[0] = 0;
		}
		ret = 0;
	}

	*delta = 0;

	return ret;
}

static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	local_inc(&cpu_buffer->committing);
	local_inc(&cpu_buffer->commits);
}

static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long commits;

	if (RB_WARN_ON(cpu_buffer,
		       !local_read(&cpu_buffer->committing)))
		return;

 again:
	commits = local_read(&cpu_buffer->commits);
	/* synchronize with interrupts */
	barrier();
	if (local_read(&cpu_buffer->committing) == 1)
		rb_set_commit_to_write(cpu_buffer);

	local_dec(&cpu_buffer->committing);

	/* synchronize with interrupts */
	barrier();

	/*
	 * Need to account for interrupts coming in between the
	 * updating of the commit page and the clearing of the
	 * committing counter.
	 */
	if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
	    !local_read(&cpu_buffer->committing)) {
		local_inc(&cpu_buffer->committing);
		goto again;
	}
}
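/*
 * Illustrative walk-through of the committing/commits pair above (not
 * from the original source): a writer calls rb_start_commit()
 * (committing == 1, commits == 1). An interrupt arrives and reserves
 * its own event (committing == 2, commits == 2). The interrupt's
 * rb_end_commit() sees committing == 2 and only decrements it; the
 * interrupted writer's rb_end_commit() then sees committing == 1 and
 * pushes the commit page forward for both events.
 */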
static struct ring_buffer_event *
rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
		      unsigned long length)
{
	struct ring_buffer_event *event;
	u64 ts, delta = 0;
	int commit = 0;
	int nr_loops = 0;

	rb_start_commit(cpu_buffer);

	length = rb_calculate_event_length(length);
 again:
	/*
	 * We allow for interrupts to reenter here and do a trace.
	 * If one does, it will cause this original code to loop
	 * back here. Even with heavy interrupts happening, this
	 * should only happen a few times in a row. If this happens
	 * 1000 times in a row, there must be either an interrupt
	 * storm or we have something buggy.
	 * Bail!
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
		goto out_fail;

	ts = rb_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu);

	/*
	 * Only the first commit can update the timestamp.
	 * Yes there is a race here. If an interrupt comes in
	 * just after the conditional and it traces too, then it
	 * will also check the deltas. More than one timestamp may
	 * also be made. But only the entry that did the actual
	 * commit will be something other than zero.
	 */
	if (likely(cpu_buffer->tail_page == cpu_buffer->commit_page &&
		   rb_page_write(cpu_buffer->tail_page) ==
		   rb_commit_index(cpu_buffer))) {
		u64 diff;

		diff = ts - cpu_buffer->write_stamp;

		/* make sure this diff is calculated here */
		barrier();

		/* Did the write stamp get updated already? */
		if (unlikely(ts < cpu_buffer->write_stamp))
			goto get_event;

		delta = diff;
		if (unlikely(test_time_stamp(delta))) {

			commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);
			if (commit == -EBUSY)
				goto out_fail;

			if (commit == -EAGAIN)
				goto again;

			RB_WARN_ON(cpu_buffer, commit < 0);
		}
	}

 get_event:
	event = __rb_reserve_next(cpu_buffer, 0, length, &ts);
	if (unlikely(PTR_ERR(event) == -EAGAIN))
		goto again;

	if (!event)
		goto out_fail;

	if (!rb_event_is_commit(cpu_buffer, event))
		delta = 0;

	event->time_delta = delta;

	return event;

 out_fail:
	rb_end_commit(cpu_buffer);
	return NULL;
}

#define TRACE_RECURSIVE_DEPTH 16

static int trace_recursive_lock(void)
{
	current->trace_recursion++;

	if (likely(current->trace_recursion < TRACE_RECURSIVE_DEPTH))
		return 0;

	/* Disable all tracing before we do anything else */
	tracing_off_permanent();

	printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:"
		    "HC[%lu]:SC[%lu]:NMI[%lu]\n",
		    current->trace_recursion,
		    hardirq_count() >> HARDIRQ_SHIFT,
		    softirq_count() >> SOFTIRQ_SHIFT,
		    in_nmi());

	WARN_ON_ONCE(1);
	return -1;
}

static void trace_recursive_unlock(void)
{
	WARN_ON_ONCE(!current->trace_recursion);

	current->trace_recursion--;
}

static DEFINE_PER_CPU(int, rb_need_resched);
/**
 * ring_buffer_lock_reserve - reserve a part of the buffer
 * @buffer: the ring buffer to reserve from
 * @length: the length of the data to reserve (excluding event header)
 *
 * Returns a reserved event on the ring buffer to copy directly to.
 * The user of this interface will need to get the body to write into
 * and can use the ring_buffer_event_data() interface.
 *
 * The length is the length of the data needed, not the event length
 * which also includes the event header.
 *
 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
 * If NULL is returned, then nothing has been allocated or locked.
 */
struct ring_buffer_event *
ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	int cpu, resched;

	if (ring_buffer_flags != RB_BUFFERS_ON)
		return NULL;

	if (atomic_read(&buffer->record_disabled))
		return NULL;

	/* If we are tracing schedule, we don't want to recurse */
	resched = ftrace_preempt_disable();

	if (trace_recursive_lock())
		goto out_nocheck;

	cpu = raw_smp_processor_id();

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;

	if (length > BUF_MAX_DATA_SIZE)
		goto out;

	event = rb_reserve_next_event(cpu_buffer, length);
	if (!event)
		goto out;

	/*
	 * Need to store resched state on this cpu.
	 * Only the first needs to.
	 */

	if (preempt_count() == 1)
		per_cpu(rb_need_resched, cpu) = resched;

	return event;

 out:
	trace_recursive_unlock();

 out_nocheck:
	ftrace_preempt_enable(resched);
	return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);

static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
		      struct ring_buffer_event *event)
{
	local_inc(&cpu_buffer->entries);

	/*
	 * The event first in the commit queue updates the
	 * time stamp.
	 */
	if (rb_event_is_commit(cpu_buffer, event))
		cpu_buffer->write_stamp += event->time_delta;

	rb_end_commit(cpu_buffer);
}

/**
 * ring_buffer_unlock_commit - commit a reserved event
 * @buffer: The buffer to commit to
 * @event: The event pointer to commit.
 *
 * This commits the data to the ring buffer, and releases any locks held.
 *
 * Must be paired with ring_buffer_lock_reserve.
 */
int ring_buffer_unlock_commit(struct ring_buffer *buffer,
			      struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu = raw_smp_processor_id();

	cpu_buffer = buffer->buffers[cpu];

	rb_commit(cpu_buffer, event);

	trace_recursive_unlock();

	/*
	 * Only the last preempt count needs to restore preemption.
	 */
	if (preempt_count() == 1)
		ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
	else
		preempt_enable_no_resched_notrace();

	return 0;
}
EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
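/*
 * The canonical pairing of the two interfaces above (a sketch;
 * "struct my_event" is a hypothetical payload type):
 *
 *	struct ring_buffer_event *event;
 *	struct my_event *entry;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (event) {
 *		entry = ring_buffer_event_data(event);
 *		entry->val = 42;
 *		ring_buffer_unlock_commit(buffer, event);
 *	}
 */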
static inline void rb_event_discard(struct ring_buffer_event *event)
{
	/* array[0] holds the actual length for the discarded event */
	event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
	event->type_len = RINGBUF_TYPE_PADDING;
	/* time delta must be non zero */
	if (!event->time_delta)
		event->time_delta = 1;
}

/**
 * ring_buffer_event_discard - discard any event in the ring buffer
 * @event: the event to discard
 *
 * Sometimes an event that is in the ring buffer needs to be ignored.
 * This function lets the user discard an event in the ring buffer
 * and then that event will not be read later.
 *
 * Note, it is up to the user to be careful with this, and protect
 * against races. If the user discards an event that has been consumed
 * it is possible that it could corrupt the ring buffer.
 */
void ring_buffer_event_discard(struct ring_buffer_event *event)
{
	rb_event_discard(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_discard);

/**
 * ring_buffer_discard_commit - discard an event that has not been committed
 * @buffer: the ring buffer
 * @event: non committed event to discard
 *
 * This is similar to ring_buffer_event_discard but must only be
 * performed on an event that has not been committed yet. The difference
 * is that this will also try to free the event from the ring buffer
 * if another event has not been added behind it.
 *
 * If another event has been added behind it, it will set the event
 * up as discarded, and perform the commit.
 *
 * If this function is called, do not call ring_buffer_unlock_commit on
 * the event.
 */
void ring_buffer_discard_commit(struct ring_buffer *buffer,
				struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu;

	/* The event is discarded regardless */
	rb_event_discard(event);

	cpu = smp_processor_id();
	cpu_buffer = buffer->buffers[cpu];

	/*
	 * This must only be called if the event has not been
	 * committed yet. Thus we can assume that preemption
	 * is still disabled.
	 */
	RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));

	if (!rb_try_to_discard(cpu_buffer, event))
		goto out;

	/*
	 * The commit is still visible by the reader, so we
	 * must increment entries.
	 */
	local_inc(&cpu_buffer->entries);
 out:
	rb_end_commit(cpu_buffer);

	trace_recursive_unlock();

	/*
	 * Only the last preempt count needs to restore preemption.
	 */
	if (preempt_count() == 1)
		ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
	else
		preempt_enable_no_resched_notrace();
}
EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
/**
 * ring_buffer_write - write data to the buffer without reserving
 * @buffer: The ring buffer to write to.
 * @length: The length of the data being written (excluding the event header)
 * @data: The data to write to the buffer.
 *
 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
 * one function. If you already have the data to write to the buffer, it
 * may be easier to simply call this function.
 *
 * Note, like ring_buffer_lock_reserve, the length is the length of the data
 * and not the length of the event which would hold the header.
 */
int ring_buffer_write(struct ring_buffer *buffer,
		      unsigned long length,
		      void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	void *body;
	int ret = -EBUSY;
	int cpu, resched;

	if (ring_buffer_flags != RB_BUFFERS_ON)
		return -EBUSY;

	if (atomic_read(&buffer->record_disabled))
		return -EBUSY;

	resched = ftrace_preempt_disable();

	cpu = raw_smp_processor_id();

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;

	if (length > BUF_MAX_DATA_SIZE)
		goto out;

	event = rb_reserve_next_event(cpu_buffer, length);
	if (!event)
		goto out;

	body = rb_event_data(event);

	memcpy(body, data, length);

	rb_commit(cpu_buffer, event);

	ret = 0;
 out:
	ftrace_preempt_enable(resched);

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_write);

static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = cpu_buffer->reader_page;
	struct buffer_page *head = cpu_buffer->head_page;
	struct buffer_page *commit = cpu_buffer->commit_page;

	return reader->read == rb_page_commit(reader) &&
		(commit == reader ||
		 (commit == head &&
		  head->read == rb_page_commit(commit)));
}

/**
 * ring_buffer_record_disable - stop all writes into the buffer
 * @buffer: The ring buffer to stop writes to.
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable(struct ring_buffer *buffer)
{
	atomic_inc(&buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_disable);

/**
 * ring_buffer_record_enable - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable(struct ring_buffer *buffer)
{
	atomic_dec(&buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_enable);

/**
 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
 * @buffer: The ring buffer to stop writes to.
 * @cpu: The CPU buffer to stop
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_inc(&cpu_buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
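/*
 * Typical quiescing pattern built from the calls above (a sketch; the
 * synchronize_sched() step follows from the comments on
 * ring_buffer_record_disable()):
 *
 *	ring_buffer_record_disable(buffer);
 *	synchronize_sched();
 *	... safely read or reset the per-cpu buffers ...
 *	ring_buffer_record_enable(buffer);
 */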
/**
 * ring_buffer_record_enable_cpu - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 * @cpu: The CPU to enable.
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_dec(&cpu_buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);

/**
 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the entries from.
 */
unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long ret;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	ret = (local_read(&cpu_buffer->entries) - cpu_buffer->overrun)
		- cpu_buffer->read;

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);

/**
 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the number of overruns from
 */
unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long ret;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	ret = cpu_buffer->overrun;

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);

/**
 * ring_buffer_nmi_dropped_cpu - get the number of nmis that were dropped
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the number of nmi drops from
 */
unsigned long ring_buffer_nmi_dropped_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long ret;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	ret = cpu_buffer->nmi_dropped;

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_nmi_dropped_cpu);

/**
 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the number of overruns from
 */
unsigned long
ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long ret;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	ret = cpu_buffer->commit_overrun;

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);

/**
 * ring_buffer_entries - get the number of entries in a buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of entries in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_entries(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long entries = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		entries += (local_read(&cpu_buffer->entries) -
			    cpu_buffer->overrun) - cpu_buffer->read;
	}

	return entries;
}
EXPORT_SYMBOL_GPL(ring_buffer_entries);

/**
 * ring_buffer_overruns - get the number of overruns in the buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of overruns in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
{
        struct ring_buffer_per_cpu *cpu_buffer;
        unsigned long overruns = 0;
        int cpu;

        /* if you care about this being correct, lock the buffer */
        for_each_buffer_cpu(buffer, cpu) {
                cpu_buffer = buffer->buffers[cpu];
                overruns += cpu_buffer->overrun;
        }

        return overruns;
}
EXPORT_SYMBOL_GPL(ring_buffer_overruns);

static void rb_iter_reset(struct ring_buffer_iter *iter)
{
        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

        /* Iterator usage is expected to have record disabled */
        if (list_empty(&cpu_buffer->reader_page->list)) {
                iter->head_page = cpu_buffer->head_page;
                iter->head = cpu_buffer->head_page->read;
        } else {
                iter->head_page = cpu_buffer->reader_page;
                iter->head = cpu_buffer->reader_page->read;
        }
        if (iter->head)
                iter->read_stamp = cpu_buffer->read_stamp;
        else
                iter->read_stamp = iter->head_page->page->time_stamp;
}

/**
 * ring_buffer_iter_reset - reset an iterator
 * @iter: The iterator to reset
 *
 * Resets the iterator, so that it will start from the beginning
 * again.
 */
void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
{
        struct ring_buffer_per_cpu *cpu_buffer;
        unsigned long flags;

        if (!iter)
                return;

        cpu_buffer = iter->cpu_buffer;

        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
        rb_iter_reset(iter);
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);

/**
 * ring_buffer_iter_empty - check if an iterator has no more to read
 * @iter: The iterator to check
 */
int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
{
        struct ring_buffer_per_cpu *cpu_buffer;

        cpu_buffer = iter->cpu_buffer;

        return iter->head_page == cpu_buffer->commit_page &&
                iter->head == rb_commit_index(cpu_buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);

static void
rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
                     struct ring_buffer_event *event)
{
        u64 delta;

        switch (event->type_len) {
        case RINGBUF_TYPE_PADDING:
                return;

        case RINGBUF_TYPE_TIME_EXTEND:
                delta = event->array[0];
                delta <<= TS_SHIFT;
                delta += event->time_delta;
                cpu_buffer->read_stamp += delta;
                return;

        case RINGBUF_TYPE_TIME_STAMP:
                /* FIXME: not implemented */
                return;

        case RINGBUF_TYPE_DATA:
                cpu_buffer->read_stamp += event->time_delta;
                return;

        default:
                BUG();
        }
        return;
}

static void
rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
                          struct ring_buffer_event *event)
{
        u64 delta;

        switch (event->type_len) {
        case RINGBUF_TYPE_PADDING:
                return;

        case RINGBUF_TYPE_TIME_EXTEND:
                delta = event->array[0];
                delta <<= TS_SHIFT;
                delta += event->time_delta;
                iter->read_stamp += delta;
                return;

        case RINGBUF_TYPE_TIME_STAMP:
                /* FIXME: not implemented */
                return;

        case RINGBUF_TYPE_DATA:
                iter->read_stamp += event->time_delta;
                return;

        default:
                BUG();
        }
        return;
}
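
/*
 * Both helpers above decode a time-extend event the same way: the
 * upper bits of the delta live in array[0] and the lower TS_SHIFT
 * bits in the event's time_delta field. A hypothetical helper that
 * makes the reconstruction explicit (a sketch, not part of the API):
 *
 *	static u64 rb_full_time_delta(struct ring_buffer_event *event)
 *	{
 *		return ((u64)event->array[0] << TS_SHIFT) +
 *			event->time_delta;
 *	}
 */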

static struct buffer_page *
rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
        struct buffer_page *reader = NULL;
        unsigned long flags;
        int nr_loops = 0;

        local_irq_save(flags);
        __raw_spin_lock(&cpu_buffer->lock);

 again:
        /*
         * This should normally only loop twice. But because the
         * start of the reader inserts an empty page, it causes
         * a case where we will loop three times. There should be no
         * reason to loop four times (that I know of).
         */
        if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
                reader = NULL;
                goto out;
        }

        reader = cpu_buffer->reader_page;

        /* If there's more to read, return this page */
        if (cpu_buffer->reader_page->read < rb_page_size(reader))
                goto out;

        /* Never should we have an index greater than the size */
        if (RB_WARN_ON(cpu_buffer,
                       cpu_buffer->reader_page->read > rb_page_size(reader)))
                goto out;

        /* check if we caught up to the tail */
        reader = NULL;
        if (cpu_buffer->commit_page == cpu_buffer->reader_page)
                goto out;

        /*
         * Splice the empty reader page into the list around the head.
         * Reset the reader page to size zero.
         */

        reader = cpu_buffer->head_page;
        cpu_buffer->reader_page->list.next = reader->list.next;
        cpu_buffer->reader_page->list.prev = reader->list.prev;

        local_set(&cpu_buffer->reader_page->write, 0);
        local_set(&cpu_buffer->reader_page->entries, 0);
        local_set(&cpu_buffer->reader_page->page->commit, 0);

        /* Make the reader page now replace the head */
        reader->list.prev->next = &cpu_buffer->reader_page->list;
        reader->list.next->prev = &cpu_buffer->reader_page->list;

        /*
         * If the tail is on the reader, then we must set the head
         * to the inserted page, otherwise we set it one before.
         */
        cpu_buffer->head_page = cpu_buffer->reader_page;

        if (cpu_buffer->commit_page != reader)
                rb_inc_page(cpu_buffer, &cpu_buffer->head_page);

        /* Finally update the reader page to the new head */
        cpu_buffer->reader_page = reader;
        rb_reset_reader_page(cpu_buffer);

        goto again;

 out:
        __raw_spin_unlock(&cpu_buffer->lock);
        local_irq_restore(flags);

        return reader;
}

static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
{
        struct ring_buffer_event *event;
        struct buffer_page *reader;
        unsigned length;

        reader = rb_get_reader_page(cpu_buffer);

        /* This function should not be called when buffer is empty */
        if (RB_WARN_ON(cpu_buffer, !reader))
                return;

        event = rb_reader_event(cpu_buffer);

        if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX
                        || rb_discarded_event(event))
                cpu_buffer->read++;

        rb_update_read_stamp(cpu_buffer, event);

        length = rb_event_length(event);
        cpu_buffer->reader_page->read += length;
}

static void rb_advance_iter(struct ring_buffer_iter *iter)
{
        struct ring_buffer *buffer;
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_event *event;
        unsigned length;

        cpu_buffer = iter->cpu_buffer;
        buffer = cpu_buffer->buffer;

        /*
         * Check if we are at the end of the buffer.
         */
        if (iter->head >= rb_page_size(iter->head_page)) {
                /* discarded commits can make the page empty */
                if (iter->head_page == cpu_buffer->commit_page)
                        return;
                rb_inc_iter(iter);
                return;
        }

        event = rb_iter_head_event(iter);

        length = rb_event_length(event);

        /*
         * This should not be called to advance the header if we are
         * at the tail of the buffer.
         */
        if (RB_WARN_ON(cpu_buffer,
                       (iter->head_page == cpu_buffer->commit_page) &&
                       (iter->head + length > rb_commit_index(cpu_buffer))))
                return;

        rb_update_iter_read_stamp(iter, event);

        iter->head += length;

        /* check for end of page padding */
        if ((iter->head >= rb_page_size(iter->head_page)) &&
            (iter->head_page != cpu_buffer->commit_page))
                rb_advance_iter(iter);
}

static struct ring_buffer_event *
rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
{
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_event *event;
        struct buffer_page *reader;
        int nr_loops = 0;

        cpu_buffer = buffer->buffers[cpu];

 again:
        /*
         * We repeat when a timestamp is encountered. It is possible
         * to get multiple timestamps from an interrupt entering just
         * as one timestamp is about to be written, or from discarded
         * commits. The most that we can have is the number on a single page.
         */
        if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE))
                return NULL;

        reader = rb_get_reader_page(cpu_buffer);
        if (!reader)
                return NULL;

        event = rb_reader_event(cpu_buffer);

        switch (event->type_len) {
        case RINGBUF_TYPE_PADDING:
                if (rb_null_event(event))
                        RB_WARN_ON(cpu_buffer, 1);
                /*
                 * Because the writer could be discarding every
                 * event it creates (which would probably be bad)
                 * if we were to go back to "again" then we may never
                 * catch up, and will trigger the warn on, or lock
                 * the box. Return the padding, and we will release
                 * the current locks, and try again.
                 */
                rb_advance_reader(cpu_buffer);
                return event;

        case RINGBUF_TYPE_TIME_EXTEND:
                /* Internal data, OK to advance */
                rb_advance_reader(cpu_buffer);
                goto again;

        case RINGBUF_TYPE_TIME_STAMP:
                /* FIXME: not implemented */
                rb_advance_reader(cpu_buffer);
                goto again;

        case RINGBUF_TYPE_DATA:
                if (ts) {
                        *ts = cpu_buffer->read_stamp + event->time_delta;
                        ring_buffer_normalize_time_stamp(buffer,
                                                         cpu_buffer->cpu, ts);
                }
                return event;

        default:
                BUG();
        }

        return NULL;
}

static struct ring_buffer_event *
rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
        struct ring_buffer *buffer;
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_event *event;
        int nr_loops = 0;

        if (ring_buffer_iter_empty(iter))
                return NULL;

        cpu_buffer = iter->cpu_buffer;
        buffer = cpu_buffer->buffer;

 again:
        /*
         * We repeat when a timestamp is encountered.
         * We can get multiple timestamps by nested interrupts or also
         * if filtering is on (discarding commits). Since discarding
         * commits can be frequent we can get a lot of timestamps.
         * But we limit them by not adding timestamps if they begin
         * at the start of a page.
         */
        if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE))
                return NULL;

        if (rb_per_cpu_empty(cpu_buffer))
                return NULL;

        event = rb_iter_head_event(iter);

        switch (event->type_len) {
        case RINGBUF_TYPE_PADDING:
                if (rb_null_event(event)) {
                        rb_inc_iter(iter);
                        goto again;
                }
                rb_advance_iter(iter);
                return event;

        case RINGBUF_TYPE_TIME_EXTEND:
                /* Internal data, OK to advance */
                rb_advance_iter(iter);
                goto again;

        case RINGBUF_TYPE_TIME_STAMP:
                /* FIXME: not implemented */
                rb_advance_iter(iter);
                goto again;

        case RINGBUF_TYPE_DATA:
                if (ts) {
                        *ts = iter->read_stamp + event->time_delta;
                        ring_buffer_normalize_time_stamp(buffer,
                                                         cpu_buffer->cpu, ts);
                }
                return event;

        default:
                BUG();
        }

        return NULL;
}

static inline int rb_ok_to_lock(void)
{
        /*
         * If an NMI die dumps out the content of the ring buffer
         * do not grab locks. We also permanently disable the ring
         * buffer too. A one time deal is all you get from reading
         * the ring buffer from an NMI.
         */
        if (likely(!in_nmi() && !oops_in_progress))
                return 1;

        tracing_off_permanent();
        return 0;
}

/**
 * ring_buffer_peek - peek at the next event to be read
 * @buffer: The ring buffer to read
 * @cpu: The cpu to peek at
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not consume the data.
 */
struct ring_buffer_event *
ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
{
        struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
        struct ring_buffer_event *event;
        unsigned long flags;
        int dolock;

        if (!cpumask_test_cpu(cpu, buffer->cpumask))
                return NULL;

        dolock = rb_ok_to_lock();
 again:
        local_irq_save(flags);
        if (dolock)
                spin_lock(&cpu_buffer->reader_lock);
        event = rb_buffer_peek(buffer, cpu, ts);
        if (dolock)
                spin_unlock(&cpu_buffer->reader_lock);
        local_irq_restore(flags);

        if (event && event->type_len == RINGBUF_TYPE_PADDING) {
                cpu_relax();
                goto again;
        }

        return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_peek);

/**
 * ring_buffer_iter_peek - peek at the next event to be read
 * @iter: The ring buffer iterator
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not increment the iterator.
 */
struct ring_buffer_event *
ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
        struct ring_buffer_event *event;
        unsigned long flags;

 again:
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
        event = rb_iter_peek(iter, ts);
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

        if (event && event->type_len == RINGBUF_TYPE_PADDING) {
                cpu_relax();
                goto again;
        }

        return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
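
/*
 * Example: the non-consuming read. A sketch only; peeking returns
 * the next event without advancing, so a caller can inspect the
 * timestamp before deciding to consume. "buffer", "cpu" and
 * "deadline" are placeholders:
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts);
 *	if (event && ts <= deadline)
 *		event = ring_buffer_consume(buffer, cpu, &ts);
 */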

/**
 * ring_buffer_consume - return an event and consume it
 * @buffer: The ring buffer to get the next event from
 * @cpu: The per CPU buffer to read from
 * @ts: The timestamp of the event that is read
 *
 * Returns the next event in the ring buffer, and that event is consumed.
 * Meaning that sequential reads will keep returning a different event,
 * and eventually empty the ring buffer if the producer is slower.
 */
struct ring_buffer_event *
ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
{
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_event *event = NULL;
        unsigned long flags;
        int dolock;

        dolock = rb_ok_to_lock();

 again:
        /* might be called in atomic */
        preempt_disable();

        if (!cpumask_test_cpu(cpu, buffer->cpumask))
                goto out;

        cpu_buffer = buffer->buffers[cpu];
        local_irq_save(flags);
        if (dolock)
                spin_lock(&cpu_buffer->reader_lock);

        event = rb_buffer_peek(buffer, cpu, ts);
        if (!event)
                goto out_unlock;

        rb_advance_reader(cpu_buffer);

 out_unlock:
        if (dolock)
                spin_unlock(&cpu_buffer->reader_lock);
        local_irq_restore(flags);

 out:
        preempt_enable();

        if (event && event->type_len == RINGBUF_TYPE_PADDING) {
                cpu_relax();
                goto again;
        }

        return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_consume);

/**
 * ring_buffer_read_start - start a non consuming read of the buffer
 * @buffer: The ring buffer to read from
 * @cpu: The cpu buffer to iterate over
 *
 * This starts up an iteration through the buffer. It also disables
 * the recording to the buffer until the reading is finished.
 * This prevents the reading from being corrupted. This is not
 * a consuming read, so a producer is not expected.
 *
 * Must be paired with ring_buffer_read_finish.
 */
struct ring_buffer_iter *
ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
{
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_iter *iter;
        unsigned long flags;

        if (!cpumask_test_cpu(cpu, buffer->cpumask))
                return NULL;

        iter = kmalloc(sizeof(*iter), GFP_KERNEL);
        if (!iter)
                return NULL;

        cpu_buffer = buffer->buffers[cpu];

        iter->cpu_buffer = cpu_buffer;

        atomic_inc(&cpu_buffer->record_disabled);
        synchronize_sched();

        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
        __raw_spin_lock(&cpu_buffer->lock);
        rb_iter_reset(iter);
        __raw_spin_unlock(&cpu_buffer->lock);
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

        return iter;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_start);

/**
 * ring_buffer_read_finish - finish reading the iterator of the buffer
 * @iter: The iterator retrieved by ring_buffer_read_start
 *
 * This re-enables the recording to the buffer, and frees the
 * iterator.
 */
void
ring_buffer_read_finish(struct ring_buffer_iter *iter)
{
        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

        atomic_dec(&cpu_buffer->record_disabled);
        kfree(iter);
}
EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
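
/*
 * Example: a consuming read loop that drains one CPU buffer until
 * it is empty. A minimal sketch; my_handle_event() is a
 * hypothetical callback:
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts)))
 *		my_handle_event(ring_buffer_event_data(event),
 *				ring_buffer_event_length(event), ts);
 */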

/**
 * ring_buffer_read - read the next item in the ring buffer by the iterator
 * @iter: The ring buffer iterator
 * @ts: The time stamp of the event read.
 *
 * This reads the next event in the ring buffer and increments the iterator.
 */
struct ring_buffer_event *
ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
{
        struct ring_buffer_event *event;
        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
        unsigned long flags;

 again:
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
        event = rb_iter_peek(iter, ts);
        if (!event)
                goto out;

        rb_advance_iter(iter);
 out:
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

        if (event && event->type_len == RINGBUF_TYPE_PADDING) {
                cpu_relax();
                goto again;
        }

        return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_read);

/**
 * ring_buffer_size - return the size of the ring buffer (in bytes)
 * @buffer: The ring buffer.
 */
unsigned long ring_buffer_size(struct ring_buffer *buffer)
{
        return BUF_PAGE_SIZE * buffer->pages;
}
EXPORT_SYMBOL_GPL(ring_buffer_size);

static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
        cpu_buffer->head_page
                = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
        local_set(&cpu_buffer->head_page->write, 0);
        local_set(&cpu_buffer->head_page->entries, 0);
        local_set(&cpu_buffer->head_page->page->commit, 0);

        cpu_buffer->head_page->read = 0;

        cpu_buffer->tail_page = cpu_buffer->head_page;
        cpu_buffer->commit_page = cpu_buffer->head_page;

        INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
        local_set(&cpu_buffer->reader_page->write, 0);
        local_set(&cpu_buffer->reader_page->entries, 0);
        local_set(&cpu_buffer->reader_page->page->commit, 0);
        cpu_buffer->reader_page->read = 0;

        cpu_buffer->nmi_dropped = 0;
        cpu_buffer->commit_overrun = 0;
        cpu_buffer->overrun = 0;
        cpu_buffer->read = 0;
        local_set(&cpu_buffer->entries, 0);
        local_set(&cpu_buffer->committing, 0);
        local_set(&cpu_buffer->commits, 0);

        cpu_buffer->write_stamp = 0;
        cpu_buffer->read_stamp = 0;
}

/**
 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
 * @buffer: The ring buffer to reset a per cpu buffer of
 * @cpu: The CPU buffer to be reset
 */
void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
{
        struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
        unsigned long flags;

        if (!cpumask_test_cpu(cpu, buffer->cpumask))
                return;

        atomic_inc(&cpu_buffer->record_disabled);

        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

        __raw_spin_lock(&cpu_buffer->lock);

        rb_reset_cpu(cpu_buffer);

        __raw_spin_unlock(&cpu_buffer->lock);

        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

        atomic_dec(&cpu_buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);

/**
 * ring_buffer_reset - reset a ring buffer
 * @buffer: The ring buffer to reset all cpu buffers
 */
void ring_buffer_reset(struct ring_buffer *buffer)
{
        int cpu;

        for_each_buffer_cpu(buffer, cpu)
                ring_buffer_reset_cpu(buffer, cpu);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset);
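
/*
 * Example: a complete non-consuming read session with the iterator
 * API. A sketch only; recording on this CPU buffer stays disabled
 * for the whole walk, and my_handle_event() is hypothetical:
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_start(buffer, cpu);
 *	if (!iter)
 *		return;
 *	while ((event = ring_buffer_read(iter, &ts)))
 *		my_handle_event(ring_buffer_event_data(event), ts);
 *	ring_buffer_read_finish(iter);
 */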

/**
 * ring_buffer_empty - is the ring buffer empty?
 * @buffer: The ring buffer to test
 */
int ring_buffer_empty(struct ring_buffer *buffer)
{
        struct ring_buffer_per_cpu *cpu_buffer;
        unsigned long flags;
        int dolock;
        int cpu;
        int ret;

        dolock = rb_ok_to_lock();

        /* yes this is racy, but if you don't like the race, lock the buffer */
        for_each_buffer_cpu(buffer, cpu) {
                cpu_buffer = buffer->buffers[cpu];
                local_irq_save(flags);
                if (dolock)
                        spin_lock(&cpu_buffer->reader_lock);
                ret = rb_per_cpu_empty(cpu_buffer);
                if (dolock)
                        spin_unlock(&cpu_buffer->reader_lock);
                local_irq_restore(flags);

                if (!ret)
                        return 0;
        }

        return 1;
}
EXPORT_SYMBOL_GPL(ring_buffer_empty);

/**
 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
 * @buffer: The ring buffer
 * @cpu: The CPU buffer to test
 */
int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
{
        struct ring_buffer_per_cpu *cpu_buffer;
        unsigned long flags;
        int dolock;
        int ret;

        if (!cpumask_test_cpu(cpu, buffer->cpumask))
                return 1;

        dolock = rb_ok_to_lock();

        cpu_buffer = buffer->buffers[cpu];
        local_irq_save(flags);
        if (dolock)
                spin_lock(&cpu_buffer->reader_lock);
        ret = rb_per_cpu_empty(cpu_buffer);
        if (dolock)
                spin_unlock(&cpu_buffer->reader_lock);
        local_irq_restore(flags);

        return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);

/**
 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
 * @buffer_a: One buffer to swap with
 * @buffer_b: The other buffer to swap with
 * @cpu: The CPU buffer to swap
 *
 * This function is useful for tracers that want to take a "snapshot"
 * of a CPU buffer and have another backup buffer lying around.
 * It is expected that the tracer handles the cpu buffer not being
 * used at the moment.
 */
int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
                         struct ring_buffer *buffer_b, int cpu)
{
        struct ring_buffer_per_cpu *cpu_buffer_a;
        struct ring_buffer_per_cpu *cpu_buffer_b;
        int ret = -EINVAL;

        if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
            !cpumask_test_cpu(cpu, buffer_b->cpumask))
                goto out;

        /* At least make sure the two buffers are somewhat the same */
        if (buffer_a->pages != buffer_b->pages)
                goto out;

        ret = -EAGAIN;

        if (ring_buffer_flags != RB_BUFFERS_ON)
                goto out;

        if (atomic_read(&buffer_a->record_disabled))
                goto out;

        if (atomic_read(&buffer_b->record_disabled))
                goto out;

        cpu_buffer_a = buffer_a->buffers[cpu];
        cpu_buffer_b = buffer_b->buffers[cpu];

        if (atomic_read(&cpu_buffer_a->record_disabled))
                goto out;

        if (atomic_read(&cpu_buffer_b->record_disabled))
                goto out;

        /*
         * We can't do a synchronize_sched here because this
         * function can be called in atomic context.
         * Normally this will be called from the same CPU as cpu.
         * If not it's up to the caller to protect this.
         */
        atomic_inc(&cpu_buffer_a->record_disabled);
        atomic_inc(&cpu_buffer_b->record_disabled);

        buffer_a->buffers[cpu] = cpu_buffer_b;
        buffer_b->buffers[cpu] = cpu_buffer_a;

        cpu_buffer_b->buffer = buffer_a;
        cpu_buffer_a->buffer = buffer_b;

        atomic_dec(&cpu_buffer_a->record_disabled);
        atomic_dec(&cpu_buffer_b->record_disabled);

        ret = 0;
 out:
        return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
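
/*
 * Example: the snapshot pattern that ring_buffer_swap_cpu() enables.
 * A sketch only; "live" and "spare" are hypothetical ring buffers
 * allocated with the same number of pages, and drain_buffer() is a
 * hypothetical consumer built on ring_buffer_consume():
 *
 *	if (!ring_buffer_swap_cpu(live, spare, cpu))
 *		drain_buffer(spare, cpu);
 *
 * After a successful swap, "spare" holds the captured events for
 * that CPU while writers continue filling "live".
 */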

/**
 * ring_buffer_alloc_read_page - allocate a page to read from buffer
 * @buffer: the buffer to allocate for.
 *
 * This function is used in conjunction with ring_buffer_read_page.
 * When reading a full page from the ring buffer, these functions
 * can be used to speed up the process. The calling function should
 * allocate a few pages first with this function. Then when it
 * needs to get pages from the ring buffer, it passes the result
 * of this function into ring_buffer_read_page, which will swap
 * the page that was allocated, with the read page of the buffer.
 *
 * Returns:
 *  The page allocated, or NULL on error.
 */
void *ring_buffer_alloc_read_page(struct ring_buffer *buffer)
{
        struct buffer_data_page *bpage;
        unsigned long addr;

        addr = __get_free_page(GFP_KERNEL);
        if (!addr)
                return NULL;

        bpage = (void *)addr;

        rb_init_page(bpage);

        return bpage;
}
EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);

/**
 * ring_buffer_free_read_page - free an allocated read page
 * @buffer: the buffer the page was allocated for
 * @data: the page to free
 *
 * Free a page allocated from ring_buffer_alloc_read_page.
 */
void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
{
        free_page((unsigned long)data);
}
EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);

/**
 * ring_buffer_read_page - extract a page from the ring buffer
 * @buffer: buffer to extract from
 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
 * @len: amount to extract
 * @cpu: the cpu of the buffer to extract
 * @full: should the extraction only happen when the page is full.
 *
 * This function will pull out a page from the ring buffer and consume it.
 * @data_page must be the address of the variable that was returned
 * from ring_buffer_alloc_read_page. This is because the page might be used
 * to swap with a page in the ring buffer.
 *
 * for example:
 *	rpage = ring_buffer_alloc_read_page(buffer);
 *	if (!rpage)
 *		return error;
 *	ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
 *	if (ret >= 0)
 *		process_page(rpage, ret);
 *
 * When @full is set, the function will only succeed if the writer
 * is off the reader page.
 *
 * Note: it is up to the calling functions to handle sleeps and wakeups.
 *  The ring buffer can be used anywhere in the kernel and can not
 *  blindly call wake_up. The layer that uses the ring buffer must be
 *  responsible for that.
 *
 * Returns:
 *  >=0 if data has been transferred, returns the offset of consumed data.
 *  <0 if no data has been transferred.
 */
int ring_buffer_read_page(struct ring_buffer *buffer,
                          void **data_page, size_t len, int cpu, int full)
{
        struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
        struct ring_buffer_event *event;
        struct buffer_data_page *bpage;
        struct buffer_page *reader;
        unsigned long flags;
        unsigned int commit;
        unsigned int read;
        u64 save_timestamp;
        int ret = -1;

        if (!cpumask_test_cpu(cpu, buffer->cpumask))
                goto out;

        /*
         * If len is not big enough to hold the page header, then
         * we can not copy anything.
         */
        if (len <= BUF_PAGE_HDR_SIZE)
                goto out;

        len -= BUF_PAGE_HDR_SIZE;

        if (!data_page)
                goto out;

        bpage = *data_page;
        if (!bpage)
                goto out;

        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

        reader = rb_get_reader_page(cpu_buffer);
        if (!reader)
                goto out_unlock;

        event = rb_reader_event(cpu_buffer);

        read = reader->read;
        commit = rb_page_commit(reader);

        /*
         * If this page has been partially read or
         * if len is not big enough to read the rest of the page or
         * a writer is still on the page, then
         * we must copy the data from the page to the buffer.
         * Otherwise, we can simply swap the page with the one passed in.
         */
        if (read || (len < (commit - read)) ||
            cpu_buffer->reader_page == cpu_buffer->commit_page) {
                struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
                unsigned int rpos = read;
                unsigned int pos = 0;
                unsigned int size;

                if (full)
                        goto out_unlock;

                if (len > (commit - read))
                        len = (commit - read);

                size = rb_event_length(event);

                if (len < size)
                        goto out_unlock;

                /* save the current timestamp, since the user will need it */
                save_timestamp = cpu_buffer->read_stamp;

                /* Need to copy one event at a time */
                do {
                        memcpy(bpage->data + pos, rpage->data + rpos, size);

                        len -= size;

                        rb_advance_reader(cpu_buffer);
                        rpos = reader->read;
                        pos += size;

                        event = rb_reader_event(cpu_buffer);
                        size = rb_event_length(event);
                } while (len > size);

                /* update bpage */
                local_set(&bpage->commit, pos);
                bpage->time_stamp = save_timestamp;

                /* we copied everything to the beginning */
                read = 0;
        } else {
                /* update the entry counter */
                cpu_buffer->read += local_read(&reader->entries);

                /* swap the pages */
                rb_init_page(bpage);
                bpage = reader->page;
                reader->page = *data_page;
                local_set(&reader->write, 0);
                local_set(&reader->entries, 0);
                reader->read = 0;
                *data_page = bpage;
        }
        ret = read;

 out_unlock:
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

 out:
        return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page);
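
/*
 * Example: extending the kernel-doc example above with the full
 * page lifecycle. A sketch only; one read page can be reused across
 * calls, because a successful swap hands a fresh page back through
 * @data_page, and process_page() is hypothetical:
 *
 *	void *rpage = ring_buffer_alloc_read_page(buffer);
 *	int ret;
 *
 *	if (!rpage)
 *		return -ENOMEM;
 *	while ((ret = ring_buffer_read_page(buffer, &rpage,
 *					    PAGE_SIZE, cpu, 0)) >= 0)
 *		process_page(rpage, ret);
 *	ring_buffer_free_read_page(buffer, rpage);
 */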

static ssize_t
rb_simple_read(struct file *filp, char __user *ubuf,
               size_t cnt, loff_t *ppos)
{
        unsigned long *p = filp->private_data;
        char buf[64];
        int r;

        if (test_bit(RB_BUFFERS_DISABLED_BIT, p))
                r = sprintf(buf, "permanently disabled\n");
        else
                r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p));

        return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
}

static ssize_t
rb_simple_write(struct file *filp, const char __user *ubuf,
                size_t cnt, loff_t *ppos)
{
        unsigned long *p = filp->private_data;
        char buf[64];
        unsigned long val;
        int ret;

        if (cnt >= sizeof(buf))
                return -EINVAL;

        if (copy_from_user(&buf, ubuf, cnt))
                return -EFAULT;

        buf[cnt] = 0;

        ret = strict_strtoul(buf, 10, &val);
        if (ret < 0)
                return ret;

        if (val)
                set_bit(RB_BUFFERS_ON_BIT, p);
        else
                clear_bit(RB_BUFFERS_ON_BIT, p);

        (*ppos)++;

        return cnt;
}

static const struct file_operations rb_simple_fops = {
        .open           = tracing_open_generic,
        .read           = rb_simple_read,
        .write          = rb_simple_write,
};


static __init int rb_init_debugfs(void)
{
        struct dentry *d_tracer;

        d_tracer = tracing_init_dentry();

        trace_create_file("tracing_on", 0644, d_tracer,
                          &ring_buffer_flags, &rb_simple_fops);

        return 0;
}

fs_initcall(rb_init_debugfs);

#ifdef CONFIG_HOTPLUG_CPU
static int rb_cpu_notify(struct notifier_block *self,
                         unsigned long action, void *hcpu)
{
        struct ring_buffer *buffer =
                container_of(self, struct ring_buffer, cpu_notify);
        long cpu = (long)hcpu;

        switch (action) {
        case CPU_UP_PREPARE:
        case CPU_UP_PREPARE_FROZEN:
                if (cpumask_test_cpu(cpu, buffer->cpumask))
                        return NOTIFY_OK;

                buffer->buffers[cpu] =
                        rb_allocate_cpu_buffer(buffer, cpu);
                if (!buffer->buffers[cpu]) {
                        WARN(1, "failed to allocate ring buffer on CPU %ld\n",
                             cpu);
                        return NOTIFY_OK;
                }
                smp_wmb();
                cpumask_set_cpu(cpu, buffer->cpumask);
                break;
        case CPU_DOWN_PREPARE:
        case CPU_DOWN_PREPARE_FROZEN:
                /*
                 * Do nothing.
                 *  If we were to free the buffer, then the user would
                 *  lose any trace that was in the buffer.
                 */
                break;
        default:
                break;
        }
        return NOTIFY_OK;
}
#endif
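
/*
 * Example: the "tracing_on" file created by rb_init_debugfs() lives
 * in the tracing debugfs directory. Assuming debugfs is mounted at
 * the usual location, recording can be toggled from user space with:
 *
 *	echo 0 > /sys/kernel/debug/tracing/tracing_on
 *	echo 1 > /sys/kernel/debug/tracing/tracing_on
 */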