1 /* 2 * Generic ring buffer 3 * 4 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com> 5 */ 6 #include <linux/ring_buffer.h> 7 #include <linux/trace_clock.h> 8 #include <linux/ftrace_irq.h> 9 #include <linux/spinlock.h> 10 #include <linux/debugfs.h> 11 #include <linux/uaccess.h> 12 #include <linux/hardirq.h> 13 #include <linux/kmemcheck.h> 14 #include <linux/module.h> 15 #include <linux/percpu.h> 16 #include <linux/mutex.h> 17 #include <linux/init.h> 18 #include <linux/hash.h> 19 #include <linux/list.h> 20 #include <linux/cpu.h> 21 #include <linux/fs.h> 22 23 #include "trace.h" 24 25 /* 26 * The ring buffer header is special. We must manually up keep it. 27 */ 28 int ring_buffer_print_entry_header(struct trace_seq *s) 29 { 30 int ret; 31 32 ret = trace_seq_printf(s, "# compressed entry header\n"); 33 ret = trace_seq_printf(s, "\ttype_len : 5 bits\n"); 34 ret = trace_seq_printf(s, "\ttime_delta : 27 bits\n"); 35 ret = trace_seq_printf(s, "\tarray : 32 bits\n"); 36 ret = trace_seq_printf(s, "\n"); 37 ret = trace_seq_printf(s, "\tpadding : type == %d\n", 38 RINGBUF_TYPE_PADDING); 39 ret = trace_seq_printf(s, "\ttime_extend : type == %d\n", 40 RINGBUF_TYPE_TIME_EXTEND); 41 ret = trace_seq_printf(s, "\tdata max type_len == %d\n", 42 RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 43 44 return ret; 45 } 46 47 /* 48 * The ring buffer is made up of a list of pages. A separate list of pages is 49 * allocated for each CPU. A writer may only write to a buffer that is 50 * associated with the CPU it is currently executing on. A reader may read 51 * from any per cpu buffer. 52 * 53 * The reader is special. For each per cpu buffer, the reader has its own 54 * reader page. When a reader has read the entire reader page, this reader 55 * page is swapped with another page in the ring buffer. 56 * 57 * Now, as long as the writer is off the reader page, the reader can do what 58 * ever it wants with that page. The writer will never write to that page 59 * again (as long as it is out of the ring buffer). 60 * 61 * Here's some silly ASCII art. 62 * 63 * +------+ 64 * |reader| RING BUFFER 65 * |page | 66 * +------+ +---+ +---+ +---+ 67 * | |-->| |-->| | 68 * +---+ +---+ +---+ 69 * ^ | 70 * | | 71 * +---------------+ 72 * 73 * 74 * +------+ 75 * |reader| RING BUFFER 76 * |page |------------------v 77 * +------+ +---+ +---+ +---+ 78 * | |-->| |-->| | 79 * +---+ +---+ +---+ 80 * ^ | 81 * | | 82 * +---------------+ 83 * 84 * 85 * +------+ 86 * |reader| RING BUFFER 87 * |page |------------------v 88 * +------+ +---+ +---+ +---+ 89 * ^ | |-->| |-->| | 90 * | +---+ +---+ +---+ 91 * | | 92 * | | 93 * +------------------------------+ 94 * 95 * 96 * +------+ 97 * |buffer| RING BUFFER 98 * |page |------------------v 99 * +------+ +---+ +---+ +---+ 100 * ^ | | | |-->| | 101 * | New +---+ +---+ +---+ 102 * | Reader------^ | 103 * | page | 104 * +------------------------------+ 105 * 106 * 107 * After we make this swap, the reader can hand this page off to the splice 108 * code and be done with it. It can even allocate a new page if it needs to 109 * and swap that into the ring buffer. 110 * 111 * We will be using cmpxchg soon to make all this lockless. 112 * 113 */ 114 115 /* 116 * A fast way to enable or disable all ring buffers is to 117 * call tracing_on or tracing_off. Turning off the ring buffers 118 * prevents all ring buffers from being recorded to. 119 * Turning this switch on, makes it OK to write to the 120 * ring buffer, if the ring buffer is enabled itself. 
121 * 122 * There's three layers that must be on in order to write 123 * to the ring buffer. 124 * 125 * 1) This global flag must be set. 126 * 2) The ring buffer must be enabled for recording. 127 * 3) The per cpu buffer must be enabled for recording. 128 * 129 * In case of an anomaly, this global flag has a bit set that 130 * will permantly disable all ring buffers. 131 */ 132 133 /* 134 * Global flag to disable all recording to ring buffers 135 * This has two bits: ON, DISABLED 136 * 137 * ON DISABLED 138 * ---- ---------- 139 * 0 0 : ring buffers are off 140 * 1 0 : ring buffers are on 141 * X 1 : ring buffers are permanently disabled 142 */ 143 144 enum { 145 RB_BUFFERS_ON_BIT = 0, 146 RB_BUFFERS_DISABLED_BIT = 1, 147 }; 148 149 enum { 150 RB_BUFFERS_ON = 1 << RB_BUFFERS_ON_BIT, 151 RB_BUFFERS_DISABLED = 1 << RB_BUFFERS_DISABLED_BIT, 152 }; 153 154 static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON; 155 156 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data) 157 158 /** 159 * tracing_on - enable all tracing buffers 160 * 161 * This function enables all tracing buffers that may have been 162 * disabled with tracing_off. 163 */ 164 void tracing_on(void) 165 { 166 set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags); 167 } 168 EXPORT_SYMBOL_GPL(tracing_on); 169 170 /** 171 * tracing_off - turn off all tracing buffers 172 * 173 * This function stops all tracing buffers from recording data. 174 * It does not disable any overhead the tracers themselves may 175 * be causing. This function simply causes all recording to 176 * the ring buffers to fail. 177 */ 178 void tracing_off(void) 179 { 180 clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags); 181 } 182 EXPORT_SYMBOL_GPL(tracing_off); 183 184 /** 185 * tracing_off_permanent - permanently disable ring buffers 186 * 187 * This function, once called, will disable all ring buffers 188 * permanently. 189 */ 190 void tracing_off_permanent(void) 191 { 192 set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags); 193 } 194 195 /** 196 * tracing_is_on - show state of ring buffers enabled 197 */ 198 int tracing_is_on(void) 199 { 200 return ring_buffer_flags == RB_BUFFERS_ON; 201 } 202 EXPORT_SYMBOL_GPL(tracing_is_on); 203 204 #include "trace.h" 205 206 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) 207 #define RB_ALIGNMENT 4U 208 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 209 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */ 210 211 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ 212 #define RINGBUF_TYPE_DATA 0 ... 
RINGBUF_TYPE_DATA_TYPE_LEN_MAX 213 214 enum { 215 RB_LEN_TIME_EXTEND = 8, 216 RB_LEN_TIME_STAMP = 16, 217 }; 218 219 static inline int rb_null_event(struct ring_buffer_event *event) 220 { 221 return event->type_len == RINGBUF_TYPE_PADDING 222 && event->time_delta == 0; 223 } 224 225 static inline int rb_discarded_event(struct ring_buffer_event *event) 226 { 227 return event->type_len == RINGBUF_TYPE_PADDING && event->time_delta; 228 } 229 230 static void rb_event_set_padding(struct ring_buffer_event *event) 231 { 232 event->type_len = RINGBUF_TYPE_PADDING; 233 event->time_delta = 0; 234 } 235 236 static unsigned 237 rb_event_data_length(struct ring_buffer_event *event) 238 { 239 unsigned length; 240 241 if (event->type_len) 242 length = event->type_len * RB_ALIGNMENT; 243 else 244 length = event->array[0]; 245 return length + RB_EVNT_HDR_SIZE; 246 } 247 248 /* inline for ring buffer fast paths */ 249 static unsigned 250 rb_event_length(struct ring_buffer_event *event) 251 { 252 switch (event->type_len) { 253 case RINGBUF_TYPE_PADDING: 254 if (rb_null_event(event)) 255 /* undefined */ 256 return -1; 257 return event->array[0] + RB_EVNT_HDR_SIZE; 258 259 case RINGBUF_TYPE_TIME_EXTEND: 260 return RB_LEN_TIME_EXTEND; 261 262 case RINGBUF_TYPE_TIME_STAMP: 263 return RB_LEN_TIME_STAMP; 264 265 case RINGBUF_TYPE_DATA: 266 return rb_event_data_length(event); 267 default: 268 BUG(); 269 } 270 /* not hit */ 271 return 0; 272 } 273 274 /** 275 * ring_buffer_event_length - return the length of the event 276 * @event: the event to get the length of 277 */ 278 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 279 { 280 unsigned length = rb_event_length(event); 281 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 282 return length; 283 length -= RB_EVNT_HDR_SIZE; 284 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 285 length -= sizeof(event->array[0]); 286 return length; 287 } 288 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 289 290 /* inline for ring buffer fast paths */ 291 static void * 292 rb_event_data(struct ring_buffer_event *event) 293 { 294 BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 295 /* If length is in len field, then array[0] has the data */ 296 if (event->type_len) 297 return (void *)&event->array[0]; 298 /* Otherwise length is in array[0] and array[1] has the data */ 299 return (void *)&event->array[1]; 300 } 301 302 /** 303 * ring_buffer_event_data - return the data of the event 304 * @event: the event to get the data from 305 */ 306 void *ring_buffer_event_data(struct ring_buffer_event *event) 307 { 308 return rb_event_data(event); 309 } 310 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 311 312 #define for_each_buffer_cpu(buffer, cpu) \ 313 for_each_cpu(cpu, buffer->cpumask) 314 315 #define TS_SHIFT 27 316 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 317 #define TS_DELTA_TEST (~TS_MASK) 318 319 struct buffer_data_page { 320 u64 time_stamp; /* page time stamp */ 321 local_t commit; /* write committed index */ 322 unsigned char data[]; /* data of buffer page */ 323 }; 324 325 struct buffer_page { 326 struct list_head list; /* list of buffer pages */ 327 local_t write; /* index for next write */ 328 unsigned read; /* index for next read */ 329 local_t entries; /* entries on this page */ 330 struct buffer_data_page *page; /* Actual data page */ 331 }; 332 333 static void rb_init_page(struct buffer_data_page *bpage) 334 { 335 local_set(&bpage->commit, 0); 336 } 337 338 /** 339 * ring_buffer_page_len - the size of data on the page. 
340 * @page: The page to read 341 * 342 * Returns the amount of data on the page, including buffer page header. 343 */ 344 size_t ring_buffer_page_len(void *page) 345 { 346 return local_read(&((struct buffer_data_page *)page)->commit) 347 + BUF_PAGE_HDR_SIZE; 348 } 349 350 /* 351 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing 352 * this issue out. 353 */ 354 static void free_buffer_page(struct buffer_page *bpage) 355 { 356 free_page((unsigned long)bpage->page); 357 kfree(bpage); 358 } 359 360 /* 361 * We need to fit the time_stamp delta into 27 bits. 362 */ 363 static inline int test_time_stamp(u64 delta) 364 { 365 if (delta & TS_DELTA_TEST) 366 return 1; 367 return 0; 368 } 369 370 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE) 371 372 /* Max payload is BUF_PAGE_SIZE - header (8bytes) */ 373 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2)) 374 375 /* Max number of timestamps that can fit on a page */ 376 #define RB_TIMESTAMPS_PER_PAGE (BUF_PAGE_SIZE / RB_LEN_TIME_STAMP) 377 378 int ring_buffer_print_page_header(struct trace_seq *s) 379 { 380 struct buffer_data_page field; 381 int ret; 382 383 ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t" 384 "offset:0;\tsize:%u;\n", 385 (unsigned int)sizeof(field.time_stamp)); 386 387 ret = trace_seq_printf(s, "\tfield: local_t commit;\t" 388 "offset:%u;\tsize:%u;\n", 389 (unsigned int)offsetof(typeof(field), commit), 390 (unsigned int)sizeof(field.commit)); 391 392 ret = trace_seq_printf(s, "\tfield: char data;\t" 393 "offset:%u;\tsize:%u;\n", 394 (unsigned int)offsetof(typeof(field), data), 395 (unsigned int)BUF_PAGE_SIZE); 396 397 return ret; 398 } 399 400 /* 401 * head_page == tail_page && head == tail then buffer is empty. 402 */ 403 struct ring_buffer_per_cpu { 404 int cpu; 405 struct ring_buffer *buffer; 406 spinlock_t reader_lock; /* serialize readers */ 407 raw_spinlock_t lock; 408 struct lock_class_key lock_key; 409 struct list_head pages; 410 struct buffer_page *head_page; /* read from head */ 411 struct buffer_page *tail_page; /* write to tail */ 412 struct buffer_page *commit_page; /* committed pages */ 413 struct buffer_page *reader_page; 414 unsigned long nmi_dropped; 415 unsigned long commit_overrun; 416 unsigned long overrun; 417 unsigned long read; 418 local_t entries; 419 local_t committing; 420 local_t commits; 421 u64 write_stamp; 422 u64 read_stamp; 423 atomic_t record_disabled; 424 }; 425 426 struct ring_buffer { 427 unsigned pages; 428 unsigned flags; 429 int cpus; 430 atomic_t record_disabled; 431 cpumask_var_t cpumask; 432 433 struct lock_class_key *reader_lock_key; 434 435 struct mutex mutex; 436 437 struct ring_buffer_per_cpu **buffers; 438 439 #ifdef CONFIG_HOTPLUG_CPU 440 struct notifier_block cpu_notify; 441 #endif 442 u64 (*clock)(void); 443 }; 444 445 struct ring_buffer_iter { 446 struct ring_buffer_per_cpu *cpu_buffer; 447 unsigned long head; 448 struct buffer_page *head_page; 449 u64 read_stamp; 450 }; 451 452 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 453 #define RB_WARN_ON(buffer, cond) \ 454 ({ \ 455 int _____ret = unlikely(cond); \ 456 if (_____ret) { \ 457 atomic_inc(&buffer->record_disabled); \ 458 WARN_ON(1); \ 459 } \ 460 _____ret; \ 461 }) 462 463 /* Up this if you want to test the TIME_EXTENTS and normalization */ 464 #define DEBUG_SHIFT 0 465 466 static inline u64 rb_time_stamp(struct ring_buffer *buffer, int cpu) 467 { 468 /* shift to debug/test normalization and TIME_EXTENTS */ 469 return buffer->clock() << DEBUG_SHIFT; 470 } 471 472 u64 
ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu) 473 { 474 u64 time; 475 476 preempt_disable_notrace(); 477 time = rb_time_stamp(buffer, cpu); 478 preempt_enable_no_resched_notrace(); 479 480 return time; 481 } 482 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 483 484 void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer, 485 int cpu, u64 *ts) 486 { 487 /* Just stupid testing the normalize function and deltas */ 488 *ts >>= DEBUG_SHIFT; 489 } 490 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 491 492 /** 493 * check_pages - integrity check of buffer pages 494 * @cpu_buffer: CPU buffer with pages to test 495 * 496 * As a safety measure we check to make sure the data pages have not 497 * been corrupted. 498 */ 499 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 500 { 501 struct list_head *head = &cpu_buffer->pages; 502 struct buffer_page *bpage, *tmp; 503 504 if (RB_WARN_ON(cpu_buffer, head->next->prev != head)) 505 return -1; 506 if (RB_WARN_ON(cpu_buffer, head->prev->next != head)) 507 return -1; 508 509 list_for_each_entry_safe(bpage, tmp, head, list) { 510 if (RB_WARN_ON(cpu_buffer, 511 bpage->list.next->prev != &bpage->list)) 512 return -1; 513 if (RB_WARN_ON(cpu_buffer, 514 bpage->list.prev->next != &bpage->list)) 515 return -1; 516 } 517 518 return 0; 519 } 520 521 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 522 unsigned nr_pages) 523 { 524 struct list_head *head = &cpu_buffer->pages; 525 struct buffer_page *bpage, *tmp; 526 unsigned long addr; 527 LIST_HEAD(pages); 528 unsigned i; 529 530 for (i = 0; i < nr_pages; i++) { 531 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 532 GFP_KERNEL, cpu_to_node(cpu_buffer->cpu)); 533 if (!bpage) 534 goto free_pages; 535 list_add(&bpage->list, &pages); 536 537 addr = __get_free_page(GFP_KERNEL); 538 if (!addr) 539 goto free_pages; 540 bpage->page = (void *)addr; 541 rb_init_page(bpage->page); 542 } 543 544 list_splice(&pages, head); 545 546 rb_check_pages(cpu_buffer); 547 548 return 0; 549 550 free_pages: 551 list_for_each_entry_safe(bpage, tmp, &pages, list) { 552 list_del_init(&bpage->list); 553 free_buffer_page(bpage); 554 } 555 return -ENOMEM; 556 } 557 558 static struct ring_buffer_per_cpu * 559 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu) 560 { 561 struct ring_buffer_per_cpu *cpu_buffer; 562 struct buffer_page *bpage; 563 unsigned long addr; 564 int ret; 565 566 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 567 GFP_KERNEL, cpu_to_node(cpu)); 568 if (!cpu_buffer) 569 return NULL; 570 571 cpu_buffer->cpu = cpu; 572 cpu_buffer->buffer = buffer; 573 spin_lock_init(&cpu_buffer->reader_lock); 574 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 575 cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED; 576 INIT_LIST_HEAD(&cpu_buffer->pages); 577 578 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 579 GFP_KERNEL, cpu_to_node(cpu)); 580 if (!bpage) 581 goto fail_free_buffer; 582 583 cpu_buffer->reader_page = bpage; 584 addr = __get_free_page(GFP_KERNEL); 585 if (!addr) 586 goto fail_free_reader; 587 bpage->page = (void *)addr; 588 rb_init_page(bpage->page); 589 590 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 591 592 ret = rb_allocate_pages(cpu_buffer, buffer->pages); 593 if (ret < 0) 594 goto fail_free_reader; 595 596 cpu_buffer->head_page 597 = list_entry(cpu_buffer->pages.next, struct buffer_page, list); 598 cpu_buffer->tail_page = cpu_buffer->commit_page = 
cpu_buffer->head_page; 599 600 return cpu_buffer; 601 602 fail_free_reader: 603 free_buffer_page(cpu_buffer->reader_page); 604 605 fail_free_buffer: 606 kfree(cpu_buffer); 607 return NULL; 608 } 609 610 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 611 { 612 struct list_head *head = &cpu_buffer->pages; 613 struct buffer_page *bpage, *tmp; 614 615 free_buffer_page(cpu_buffer->reader_page); 616 617 list_for_each_entry_safe(bpage, tmp, head, list) { 618 list_del_init(&bpage->list); 619 free_buffer_page(bpage); 620 } 621 kfree(cpu_buffer); 622 } 623 624 #ifdef CONFIG_HOTPLUG_CPU 625 static int rb_cpu_notify(struct notifier_block *self, 626 unsigned long action, void *hcpu); 627 #endif 628 629 /** 630 * ring_buffer_alloc - allocate a new ring_buffer 631 * @size: the size in bytes per cpu that is needed. 632 * @flags: attributes to set for the ring buffer. 633 * 634 * Currently the only flag that is available is the RB_FL_OVERWRITE 635 * flag. This flag means that the buffer will overwrite old data 636 * when the buffer wraps. If this flag is not set, the buffer will 637 * drop data when the tail hits the head. 638 */ 639 struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 640 struct lock_class_key *key) 641 { 642 struct ring_buffer *buffer; 643 int bsize; 644 int cpu; 645 646 /* keep it in its own cache line */ 647 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 648 GFP_KERNEL); 649 if (!buffer) 650 return NULL; 651 652 if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 653 goto fail_free_buffer; 654 655 buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 656 buffer->flags = flags; 657 buffer->clock = trace_clock_local; 658 buffer->reader_lock_key = key; 659 660 /* need at least two pages */ 661 if (buffer->pages < 2) 662 buffer->pages = 2; 663 664 /* 665 * In case of non-hotplug cpu, if the ring-buffer is allocated 666 * in early initcall, it will not be notified of secondary cpus. 667 * In that off case, we need to allocate for all possible cpus. 668 */ 669 #ifdef CONFIG_HOTPLUG_CPU 670 get_online_cpus(); 671 cpumask_copy(buffer->cpumask, cpu_online_mask); 672 #else 673 cpumask_copy(buffer->cpumask, cpu_possible_mask); 674 #endif 675 buffer->cpus = nr_cpu_ids; 676 677 bsize = sizeof(void *) * nr_cpu_ids; 678 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 679 GFP_KERNEL); 680 if (!buffer->buffers) 681 goto fail_free_cpumask; 682 683 for_each_buffer_cpu(buffer, cpu) { 684 buffer->buffers[cpu] = 685 rb_allocate_cpu_buffer(buffer, cpu); 686 if (!buffer->buffers[cpu]) 687 goto fail_free_buffers; 688 } 689 690 #ifdef CONFIG_HOTPLUG_CPU 691 buffer->cpu_notify.notifier_call = rb_cpu_notify; 692 buffer->cpu_notify.priority = 0; 693 register_cpu_notifier(&buffer->cpu_notify); 694 #endif 695 696 put_online_cpus(); 697 mutex_init(&buffer->mutex); 698 699 return buffer; 700 701 fail_free_buffers: 702 for_each_buffer_cpu(buffer, cpu) { 703 if (buffer->buffers[cpu]) 704 rb_free_cpu_buffer(buffer->buffers[cpu]); 705 } 706 kfree(buffer->buffers); 707 708 fail_free_cpumask: 709 free_cpumask_var(buffer->cpumask); 710 put_online_cpus(); 711 712 fail_free_buffer: 713 kfree(buffer); 714 return NULL; 715 } 716 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 717 718 /** 719 * ring_buffer_free - free a ring buffer. 720 * @buffer: the buffer to free. 
721 */ 722 void 723 ring_buffer_free(struct ring_buffer *buffer) 724 { 725 int cpu; 726 727 get_online_cpus(); 728 729 #ifdef CONFIG_HOTPLUG_CPU 730 unregister_cpu_notifier(&buffer->cpu_notify); 731 #endif 732 733 for_each_buffer_cpu(buffer, cpu) 734 rb_free_cpu_buffer(buffer->buffers[cpu]); 735 736 put_online_cpus(); 737 738 free_cpumask_var(buffer->cpumask); 739 740 kfree(buffer); 741 } 742 EXPORT_SYMBOL_GPL(ring_buffer_free); 743 744 void ring_buffer_set_clock(struct ring_buffer *buffer, 745 u64 (*clock)(void)) 746 { 747 buffer->clock = clock; 748 } 749 750 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer); 751 752 static void 753 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages) 754 { 755 struct buffer_page *bpage; 756 struct list_head *p; 757 unsigned i; 758 759 atomic_inc(&cpu_buffer->record_disabled); 760 synchronize_sched(); 761 762 for (i = 0; i < nr_pages; i++) { 763 if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages))) 764 return; 765 p = cpu_buffer->pages.next; 766 bpage = list_entry(p, struct buffer_page, list); 767 list_del_init(&bpage->list); 768 free_buffer_page(bpage); 769 } 770 if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages))) 771 return; 772 773 rb_reset_cpu(cpu_buffer); 774 775 rb_check_pages(cpu_buffer); 776 777 atomic_dec(&cpu_buffer->record_disabled); 778 779 } 780 781 static void 782 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer, 783 struct list_head *pages, unsigned nr_pages) 784 { 785 struct buffer_page *bpage; 786 struct list_head *p; 787 unsigned i; 788 789 atomic_inc(&cpu_buffer->record_disabled); 790 synchronize_sched(); 791 792 for (i = 0; i < nr_pages; i++) { 793 if (RB_WARN_ON(cpu_buffer, list_empty(pages))) 794 return; 795 p = pages->next; 796 bpage = list_entry(p, struct buffer_page, list); 797 list_del_init(&bpage->list); 798 list_add_tail(&bpage->list, &cpu_buffer->pages); 799 } 800 rb_reset_cpu(cpu_buffer); 801 802 rb_check_pages(cpu_buffer); 803 804 atomic_dec(&cpu_buffer->record_disabled); 805 } 806 807 /** 808 * ring_buffer_resize - resize the ring buffer 809 * @buffer: the buffer to resize. 810 * @size: the new size. 811 * 812 * The tracer is responsible for making sure that the buffer is 813 * not being used while changing the size. 814 * Note: We may be able to change the above requirement by using 815 * RCU synchronizations. 816 * 817 * Minimum size is 2 * BUF_PAGE_SIZE. 818 * 819 * Returns -1 on failure. 
820 */ 821 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) 822 { 823 struct ring_buffer_per_cpu *cpu_buffer; 824 unsigned nr_pages, rm_pages, new_pages; 825 struct buffer_page *bpage, *tmp; 826 unsigned long buffer_size; 827 unsigned long addr; 828 LIST_HEAD(pages); 829 int i, cpu; 830 831 /* 832 * Always succeed at resizing a non-existent buffer: 833 */ 834 if (!buffer) 835 return size; 836 837 size = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 838 size *= BUF_PAGE_SIZE; 839 buffer_size = buffer->pages * BUF_PAGE_SIZE; 840 841 /* we need a minimum of two pages */ 842 if (size < BUF_PAGE_SIZE * 2) 843 size = BUF_PAGE_SIZE * 2; 844 845 if (size == buffer_size) 846 return size; 847 848 mutex_lock(&buffer->mutex); 849 get_online_cpus(); 850 851 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 852 853 if (size < buffer_size) { 854 855 /* easy case, just free pages */ 856 if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) 857 goto out_fail; 858 859 rm_pages = buffer->pages - nr_pages; 860 861 for_each_buffer_cpu(buffer, cpu) { 862 cpu_buffer = buffer->buffers[cpu]; 863 rb_remove_pages(cpu_buffer, rm_pages); 864 } 865 goto out; 866 } 867 868 /* 869 * This is a bit more difficult. We only want to add pages 870 * when we can allocate enough for all CPUs. We do this 871 * by allocating all the pages and storing them on a local 872 * link list. If we succeed in our allocation, then we 873 * add these pages to the cpu_buffers. Otherwise we just free 874 * them all and return -ENOMEM; 875 */ 876 if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) 877 goto out_fail; 878 879 new_pages = nr_pages - buffer->pages; 880 881 for_each_buffer_cpu(buffer, cpu) { 882 for (i = 0; i < new_pages; i++) { 883 bpage = kzalloc_node(ALIGN(sizeof(*bpage), 884 cache_line_size()), 885 GFP_KERNEL, cpu_to_node(cpu)); 886 if (!bpage) 887 goto free_pages; 888 list_add(&bpage->list, &pages); 889 addr = __get_free_page(GFP_KERNEL); 890 if (!addr) 891 goto free_pages; 892 bpage->page = (void *)addr; 893 rb_init_page(bpage->page); 894 } 895 } 896 897 for_each_buffer_cpu(buffer, cpu) { 898 cpu_buffer = buffer->buffers[cpu]; 899 rb_insert_pages(cpu_buffer, &pages, new_pages); 900 } 901 902 if (RB_WARN_ON(buffer, !list_empty(&pages))) 903 goto out_fail; 904 905 out: 906 buffer->pages = nr_pages; 907 put_online_cpus(); 908 mutex_unlock(&buffer->mutex); 909 910 return size; 911 912 free_pages: 913 list_for_each_entry_safe(bpage, tmp, &pages, list) { 914 list_del_init(&bpage->list); 915 free_buffer_page(bpage); 916 } 917 put_online_cpus(); 918 mutex_unlock(&buffer->mutex); 919 return -ENOMEM; 920 921 /* 922 * Something went totally wrong, and we are too paranoid 923 * to even clean up the mess. 
924 */ 925 out_fail: 926 put_online_cpus(); 927 mutex_unlock(&buffer->mutex); 928 return -1; 929 } 930 EXPORT_SYMBOL_GPL(ring_buffer_resize); 931 932 static inline void * 933 __rb_data_page_index(struct buffer_data_page *bpage, unsigned index) 934 { 935 return bpage->data + index; 936 } 937 938 static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 939 { 940 return bpage->page->data + index; 941 } 942 943 static inline struct ring_buffer_event * 944 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 945 { 946 return __rb_page_index(cpu_buffer->reader_page, 947 cpu_buffer->reader_page->read); 948 } 949 950 static inline struct ring_buffer_event * 951 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer) 952 { 953 return __rb_page_index(cpu_buffer->head_page, 954 cpu_buffer->head_page->read); 955 } 956 957 static inline struct ring_buffer_event * 958 rb_iter_head_event(struct ring_buffer_iter *iter) 959 { 960 return __rb_page_index(iter->head_page, iter->head); 961 } 962 963 static inline unsigned rb_page_write(struct buffer_page *bpage) 964 { 965 return local_read(&bpage->write); 966 } 967 968 static inline unsigned rb_page_commit(struct buffer_page *bpage) 969 { 970 return local_read(&bpage->page->commit); 971 } 972 973 /* Size is determined by what has been commited */ 974 static inline unsigned rb_page_size(struct buffer_page *bpage) 975 { 976 return rb_page_commit(bpage); 977 } 978 979 static inline unsigned 980 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 981 { 982 return rb_page_commit(cpu_buffer->commit_page); 983 } 984 985 static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer) 986 { 987 return rb_page_commit(cpu_buffer->head_page); 988 } 989 990 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer, 991 struct buffer_page **bpage) 992 { 993 struct list_head *p = (*bpage)->list.next; 994 995 if (p == &cpu_buffer->pages) 996 p = p->next; 997 998 *bpage = list_entry(p, struct buffer_page, list); 999 } 1000 1001 static inline unsigned 1002 rb_event_index(struct ring_buffer_event *event) 1003 { 1004 unsigned long addr = (unsigned long)event; 1005 1006 return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE; 1007 } 1008 1009 static inline int 1010 rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer, 1011 struct ring_buffer_event *event) 1012 { 1013 unsigned long addr = (unsigned long)event; 1014 unsigned long index; 1015 1016 index = rb_event_index(event); 1017 addr &= PAGE_MASK; 1018 1019 return cpu_buffer->commit_page->page == (void *)addr && 1020 rb_commit_index(cpu_buffer) == index; 1021 } 1022 1023 static void 1024 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 1025 { 1026 /* 1027 * We only race with interrupts and NMIs on this CPU. 1028 * If we own the commit event, then we can commit 1029 * all others that interrupted us, since the interruptions 1030 * are in stack format (they finish before they come 1031 * back to us). This allows us to do a simple loop to 1032 * assign the commit to the tail. 
1033 */ 1034 again: 1035 while (cpu_buffer->commit_page != cpu_buffer->tail_page) { 1036 cpu_buffer->commit_page->page->commit = 1037 cpu_buffer->commit_page->write; 1038 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page); 1039 cpu_buffer->write_stamp = 1040 cpu_buffer->commit_page->page->time_stamp; 1041 /* add barrier to keep gcc from optimizing too much */ 1042 barrier(); 1043 } 1044 while (rb_commit_index(cpu_buffer) != 1045 rb_page_write(cpu_buffer->commit_page)) { 1046 cpu_buffer->commit_page->page->commit = 1047 cpu_buffer->commit_page->write; 1048 barrier(); 1049 } 1050 1051 /* again, keep gcc from optimizing */ 1052 barrier(); 1053 1054 /* 1055 * If an interrupt came in just after the first while loop 1056 * and pushed the tail page forward, we will be left with 1057 * a dangling commit that will never go forward. 1058 */ 1059 if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page)) 1060 goto again; 1061 } 1062 1063 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 1064 { 1065 cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp; 1066 cpu_buffer->reader_page->read = 0; 1067 } 1068 1069 static void rb_inc_iter(struct ring_buffer_iter *iter) 1070 { 1071 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 1072 1073 /* 1074 * The iterator could be on the reader page (it starts there). 1075 * But the head could have moved, since the reader was 1076 * found. Check for this case and assign the iterator 1077 * to the head page instead of next. 1078 */ 1079 if (iter->head_page == cpu_buffer->reader_page) 1080 iter->head_page = cpu_buffer->head_page; 1081 else 1082 rb_inc_page(cpu_buffer, &iter->head_page); 1083 1084 iter->read_stamp = iter->head_page->page->time_stamp; 1085 iter->head = 0; 1086 } 1087 1088 /** 1089 * ring_buffer_update_event - update event type and data 1090 * @event: the even to update 1091 * @type: the type of event 1092 * @length: the size of the event field in the ring buffer 1093 * 1094 * Update the type and data fields of the event. The length 1095 * is the actual size that is written to the ring buffer, 1096 * and with this, we can determine what to place into the 1097 * data field. 1098 */ 1099 static void 1100 rb_update_event(struct ring_buffer_event *event, 1101 unsigned type, unsigned length) 1102 { 1103 event->type_len = type; 1104 1105 switch (type) { 1106 1107 case RINGBUF_TYPE_PADDING: 1108 case RINGBUF_TYPE_TIME_EXTEND: 1109 case RINGBUF_TYPE_TIME_STAMP: 1110 break; 1111 1112 case 0: 1113 length -= RB_EVNT_HDR_SIZE; 1114 if (length > RB_MAX_SMALL_DATA) 1115 event->array[0] = length; 1116 else 1117 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 1118 break; 1119 default: 1120 BUG(); 1121 } 1122 } 1123 1124 static unsigned rb_calculate_event_length(unsigned length) 1125 { 1126 struct ring_buffer_event event; /* Used only for sizeof array */ 1127 1128 /* zero length can cause confusions */ 1129 if (!length) 1130 length = 1; 1131 1132 if (length > RB_MAX_SMALL_DATA) 1133 length += sizeof(event.array[0]); 1134 1135 length += RB_EVNT_HDR_SIZE; 1136 length = ALIGN(length, RB_ALIGNMENT); 1137 1138 return length; 1139 } 1140 1141 static inline void 1142 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 1143 struct buffer_page *tail_page, 1144 unsigned long tail, unsigned long length) 1145 { 1146 struct ring_buffer_event *event; 1147 1148 /* 1149 * Only the event that crossed the page boundary 1150 * must fill the old tail_page with padding. 
1151 */ 1152 if (tail >= BUF_PAGE_SIZE) { 1153 local_sub(length, &tail_page->write); 1154 return; 1155 } 1156 1157 event = __rb_page_index(tail_page, tail); 1158 kmemcheck_annotate_bitfield(event, bitfield); 1159 1160 /* 1161 * If this event is bigger than the minimum size, then 1162 * we need to be careful that we don't subtract the 1163 * write counter enough to allow another writer to slip 1164 * in on this page. 1165 * We put in a discarded commit instead, to make sure 1166 * that this space is not used again. 1167 * 1168 * If we are less than the minimum size, we don't need to 1169 * worry about it. 1170 */ 1171 if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) { 1172 /* No room for any events */ 1173 1174 /* Mark the rest of the page with padding */ 1175 rb_event_set_padding(event); 1176 1177 /* Set the write back to the previous setting */ 1178 local_sub(length, &tail_page->write); 1179 return; 1180 } 1181 1182 /* Put in a discarded event */ 1183 event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE; 1184 event->type_len = RINGBUF_TYPE_PADDING; 1185 /* time delta must be non zero */ 1186 event->time_delta = 1; 1187 /* Account for this as an entry */ 1188 local_inc(&tail_page->entries); 1189 local_inc(&cpu_buffer->entries); 1190 1191 /* Set write to end of buffer */ 1192 length = (tail + length) - BUF_PAGE_SIZE; 1193 local_sub(length, &tail_page->write); 1194 } 1195 1196 static struct ring_buffer_event * 1197 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 1198 unsigned long length, unsigned long tail, 1199 struct buffer_page *commit_page, 1200 struct buffer_page *tail_page, u64 *ts) 1201 { 1202 struct buffer_page *next_page, *head_page, *reader_page; 1203 struct ring_buffer *buffer = cpu_buffer->buffer; 1204 bool lock_taken = false; 1205 unsigned long flags; 1206 1207 next_page = tail_page; 1208 1209 local_irq_save(flags); 1210 /* 1211 * Since the write to the buffer is still not 1212 * fully lockless, we must be careful with NMIs. 1213 * The locks in the writers are taken when a write 1214 * crosses to a new page. The locks protect against 1215 * races with the readers (this will soon be fixed 1216 * with a lockless solution). 1217 * 1218 * Because we can not protect against NMIs, and we 1219 * want to keep traces reentrant, we need to manage 1220 * what happens when we are in an NMI. 1221 * 1222 * NMIs can happen after we take the lock. 1223 * If we are in an NMI, only take the lock 1224 * if it is not already taken. Otherwise 1225 * simply fail. 1226 */ 1227 if (unlikely(in_nmi())) { 1228 if (!__raw_spin_trylock(&cpu_buffer->lock)) { 1229 cpu_buffer->nmi_dropped++; 1230 goto out_reset; 1231 } 1232 } else 1233 __raw_spin_lock(&cpu_buffer->lock); 1234 1235 lock_taken = true; 1236 1237 rb_inc_page(cpu_buffer, &next_page); 1238 1239 head_page = cpu_buffer->head_page; 1240 reader_page = cpu_buffer->reader_page; 1241 1242 /* we grabbed the lock before incrementing */ 1243 if (RB_WARN_ON(cpu_buffer, next_page == reader_page)) 1244 goto out_reset; 1245 1246 /* 1247 * If for some reason, we had an interrupt storm that made 1248 * it all the way around the buffer, bail, and warn 1249 * about it. 1250 */ 1251 if (unlikely(next_page == commit_page)) { 1252 cpu_buffer->commit_overrun++; 1253 goto out_reset; 1254 } 1255 1256 if (next_page == head_page) { 1257 if (!(buffer->flags & RB_FL_OVERWRITE)) 1258 goto out_reset; 1259 1260 /* tail_page has not moved yet? 
*/ 1261 if (tail_page == cpu_buffer->tail_page) { 1262 /* count overflows */ 1263 cpu_buffer->overrun += 1264 local_read(&head_page->entries); 1265 1266 rb_inc_page(cpu_buffer, &head_page); 1267 cpu_buffer->head_page = head_page; 1268 cpu_buffer->head_page->read = 0; 1269 } 1270 } 1271 1272 /* 1273 * If the tail page is still the same as what we think 1274 * it is, then it is up to us to update the tail 1275 * pointer. 1276 */ 1277 if (tail_page == cpu_buffer->tail_page) { 1278 local_set(&next_page->write, 0); 1279 local_set(&next_page->entries, 0); 1280 local_set(&next_page->page->commit, 0); 1281 cpu_buffer->tail_page = next_page; 1282 1283 /* reread the time stamp */ 1284 *ts = rb_time_stamp(buffer, cpu_buffer->cpu); 1285 cpu_buffer->tail_page->page->time_stamp = *ts; 1286 } 1287 1288 rb_reset_tail(cpu_buffer, tail_page, tail, length); 1289 1290 __raw_spin_unlock(&cpu_buffer->lock); 1291 local_irq_restore(flags); 1292 1293 /* fail and let the caller try again */ 1294 return ERR_PTR(-EAGAIN); 1295 1296 out_reset: 1297 /* reset write */ 1298 rb_reset_tail(cpu_buffer, tail_page, tail, length); 1299 1300 if (likely(lock_taken)) 1301 __raw_spin_unlock(&cpu_buffer->lock); 1302 local_irq_restore(flags); 1303 return NULL; 1304 } 1305 1306 static struct ring_buffer_event * 1307 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 1308 unsigned type, unsigned long length, u64 *ts) 1309 { 1310 struct buffer_page *tail_page, *commit_page; 1311 struct ring_buffer_event *event; 1312 unsigned long tail, write; 1313 1314 commit_page = cpu_buffer->commit_page; 1315 /* we just need to protect against interrupts */ 1316 barrier(); 1317 tail_page = cpu_buffer->tail_page; 1318 write = local_add_return(length, &tail_page->write); 1319 tail = write - length; 1320 1321 /* See if we shot pass the end of this buffer page */ 1322 if (write > BUF_PAGE_SIZE) 1323 return rb_move_tail(cpu_buffer, length, tail, 1324 commit_page, tail_page, ts); 1325 1326 /* We reserved something on the buffer */ 1327 1328 event = __rb_page_index(tail_page, tail); 1329 kmemcheck_annotate_bitfield(event, bitfield); 1330 rb_update_event(event, type, length); 1331 1332 /* The passed in type is zero for DATA */ 1333 if (likely(!type)) 1334 local_inc(&tail_page->entries); 1335 1336 /* 1337 * If this is the first commit on the page, then update 1338 * its timestamp. 1339 */ 1340 if (!tail) 1341 tail_page->page->time_stamp = *ts; 1342 1343 return event; 1344 } 1345 1346 static inline int 1347 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 1348 struct ring_buffer_event *event) 1349 { 1350 unsigned long new_index, old_index; 1351 struct buffer_page *bpage; 1352 unsigned long index; 1353 unsigned long addr; 1354 1355 new_index = rb_event_index(event); 1356 old_index = new_index + rb_event_length(event); 1357 addr = (unsigned long)event; 1358 addr &= PAGE_MASK; 1359 1360 bpage = cpu_buffer->tail_page; 1361 1362 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 1363 /* 1364 * This is on the tail page. It is possible that 1365 * a write could come in and move the tail page 1366 * and write to the next page. That is fine 1367 * because we just shorten what is on this page. 
1368 */ 1369 index = local_cmpxchg(&bpage->write, old_index, new_index); 1370 if (index == old_index) 1371 return 1; 1372 } 1373 1374 /* could not discard */ 1375 return 0; 1376 } 1377 1378 static int 1379 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 1380 u64 *ts, u64 *delta) 1381 { 1382 struct ring_buffer_event *event; 1383 static int once; 1384 int ret; 1385 1386 if (unlikely(*delta > (1ULL << 59) && !once++)) { 1387 printk(KERN_WARNING "Delta way too big! %llu" 1388 " ts=%llu write stamp = %llu\n", 1389 (unsigned long long)*delta, 1390 (unsigned long long)*ts, 1391 (unsigned long long)cpu_buffer->write_stamp); 1392 WARN_ON(1); 1393 } 1394 1395 /* 1396 * The delta is too big, we to add a 1397 * new timestamp. 1398 */ 1399 event = __rb_reserve_next(cpu_buffer, 1400 RINGBUF_TYPE_TIME_EXTEND, 1401 RB_LEN_TIME_EXTEND, 1402 ts); 1403 if (!event) 1404 return -EBUSY; 1405 1406 if (PTR_ERR(event) == -EAGAIN) 1407 return -EAGAIN; 1408 1409 /* Only a commited time event can update the write stamp */ 1410 if (rb_event_is_commit(cpu_buffer, event)) { 1411 /* 1412 * If this is the first on the page, then it was 1413 * updated with the page itself. Try to discard it 1414 * and if we can't just make it zero. 1415 */ 1416 if (rb_event_index(event)) { 1417 event->time_delta = *delta & TS_MASK; 1418 event->array[0] = *delta >> TS_SHIFT; 1419 } else { 1420 /* try to discard, since we do not need this */ 1421 if (!rb_try_to_discard(cpu_buffer, event)) { 1422 /* nope, just zero it */ 1423 event->time_delta = 0; 1424 event->array[0] = 0; 1425 } 1426 } 1427 cpu_buffer->write_stamp = *ts; 1428 /* let the caller know this was the commit */ 1429 ret = 1; 1430 } else { 1431 /* Try to discard the event */ 1432 if (!rb_try_to_discard(cpu_buffer, event)) { 1433 /* Darn, this is just wasted space */ 1434 event->time_delta = 0; 1435 event->array[0] = 0; 1436 } 1437 ret = 0; 1438 } 1439 1440 *delta = 0; 1441 1442 return ret; 1443 } 1444 1445 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 1446 { 1447 local_inc(&cpu_buffer->committing); 1448 local_inc(&cpu_buffer->commits); 1449 } 1450 1451 static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 1452 { 1453 unsigned long commits; 1454 1455 if (RB_WARN_ON(cpu_buffer, 1456 !local_read(&cpu_buffer->committing))) 1457 return; 1458 1459 again: 1460 commits = local_read(&cpu_buffer->commits); 1461 /* synchronize with interrupts */ 1462 barrier(); 1463 if (local_read(&cpu_buffer->committing) == 1) 1464 rb_set_commit_to_write(cpu_buffer); 1465 1466 local_dec(&cpu_buffer->committing); 1467 1468 /* synchronize with interrupts */ 1469 barrier(); 1470 1471 /* 1472 * Need to account for interrupts coming in between the 1473 * updating of the commit page and the clearing of the 1474 * committing counter. 1475 */ 1476 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 1477 !local_read(&cpu_buffer->committing)) { 1478 local_inc(&cpu_buffer->committing); 1479 goto again; 1480 } 1481 } 1482 1483 static struct ring_buffer_event * 1484 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer, 1485 unsigned long length) 1486 { 1487 struct ring_buffer_event *event; 1488 u64 ts, delta = 0; 1489 int commit = 0; 1490 int nr_loops = 0; 1491 1492 rb_start_commit(cpu_buffer); 1493 1494 length = rb_calculate_event_length(length); 1495 again: 1496 /* 1497 * We allow for interrupts to reenter here and do a trace. 1498 * If one does, it will cause this original code to loop 1499 * back here. 
Even with heavy interrupts happening, this 1500 * should only happen a few times in a row. If this happens 1501 * 1000 times in a row, there must be either an interrupt 1502 * storm or we have something buggy. 1503 * Bail! 1504 */ 1505 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 1506 goto out_fail; 1507 1508 ts = rb_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu); 1509 1510 /* 1511 * Only the first commit can update the timestamp. 1512 * Yes there is a race here. If an interrupt comes in 1513 * just after the conditional and it traces too, then it 1514 * will also check the deltas. More than one timestamp may 1515 * also be made. But only the entry that did the actual 1516 * commit will be something other than zero. 1517 */ 1518 if (likely(cpu_buffer->tail_page == cpu_buffer->commit_page && 1519 rb_page_write(cpu_buffer->tail_page) == 1520 rb_commit_index(cpu_buffer))) { 1521 u64 diff; 1522 1523 diff = ts - cpu_buffer->write_stamp; 1524 1525 /* make sure this diff is calculated here */ 1526 barrier(); 1527 1528 /* Did the write stamp get updated already? */ 1529 if (unlikely(ts < cpu_buffer->write_stamp)) 1530 goto get_event; 1531 1532 delta = diff; 1533 if (unlikely(test_time_stamp(delta))) { 1534 1535 commit = rb_add_time_stamp(cpu_buffer, &ts, &delta); 1536 if (commit == -EBUSY) 1537 goto out_fail; 1538 1539 if (commit == -EAGAIN) 1540 goto again; 1541 1542 RB_WARN_ON(cpu_buffer, commit < 0); 1543 } 1544 } 1545 1546 get_event: 1547 event = __rb_reserve_next(cpu_buffer, 0, length, &ts); 1548 if (unlikely(PTR_ERR(event) == -EAGAIN)) 1549 goto again; 1550 1551 if (!event) 1552 goto out_fail; 1553 1554 if (!rb_event_is_commit(cpu_buffer, event)) 1555 delta = 0; 1556 1557 event->time_delta = delta; 1558 1559 return event; 1560 1561 out_fail: 1562 rb_end_commit(cpu_buffer); 1563 return NULL; 1564 } 1565 1566 #ifdef CONFIG_TRACING 1567 1568 #define TRACE_RECURSIVE_DEPTH 16 1569 1570 static int trace_recursive_lock(void) 1571 { 1572 current->trace_recursion++; 1573 1574 if (likely(current->trace_recursion < TRACE_RECURSIVE_DEPTH)) 1575 return 0; 1576 1577 /* Disable all tracing before we do anything else */ 1578 tracing_off_permanent(); 1579 1580 printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:" 1581 "HC[%lu]:SC[%lu]:NMI[%lu]\n", 1582 current->trace_recursion, 1583 hardirq_count() >> HARDIRQ_SHIFT, 1584 softirq_count() >> SOFTIRQ_SHIFT, 1585 in_nmi()); 1586 1587 WARN_ON_ONCE(1); 1588 return -1; 1589 } 1590 1591 static void trace_recursive_unlock(void) 1592 { 1593 WARN_ON_ONCE(!current->trace_recursion); 1594 1595 current->trace_recursion--; 1596 } 1597 1598 #else 1599 1600 #define trace_recursive_lock() (0) 1601 #define trace_recursive_unlock() do { } while (0) 1602 1603 #endif 1604 1605 static DEFINE_PER_CPU(int, rb_need_resched); 1606 1607 /** 1608 * ring_buffer_lock_reserve - reserve a part of the buffer 1609 * @buffer: the ring buffer to reserve from 1610 * @length: the length of the data to reserve (excluding event header) 1611 * 1612 * Returns a reseverd event on the ring buffer to copy directly to. 1613 * The user of this interface will need to get the body to write into 1614 * and can use the ring_buffer_event_data() interface. 1615 * 1616 * The length is the length of the data needed, not the event length 1617 * which also includes the event header. 1618 * 1619 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 1620 * If NULL is returned, then nothing has been allocated or locked. 
1621 */ 1622 struct ring_buffer_event * 1623 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length) 1624 { 1625 struct ring_buffer_per_cpu *cpu_buffer; 1626 struct ring_buffer_event *event; 1627 int cpu, resched; 1628 1629 if (ring_buffer_flags != RB_BUFFERS_ON) 1630 return NULL; 1631 1632 if (atomic_read(&buffer->record_disabled)) 1633 return NULL; 1634 1635 /* If we are tracing schedule, we don't want to recurse */ 1636 resched = ftrace_preempt_disable(); 1637 1638 if (trace_recursive_lock()) 1639 goto out_nocheck; 1640 1641 cpu = raw_smp_processor_id(); 1642 1643 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1644 goto out; 1645 1646 cpu_buffer = buffer->buffers[cpu]; 1647 1648 if (atomic_read(&cpu_buffer->record_disabled)) 1649 goto out; 1650 1651 if (length > BUF_MAX_DATA_SIZE) 1652 goto out; 1653 1654 event = rb_reserve_next_event(cpu_buffer, length); 1655 if (!event) 1656 goto out; 1657 1658 /* 1659 * Need to store resched state on this cpu. 1660 * Only the first needs to. 1661 */ 1662 1663 if (preempt_count() == 1) 1664 per_cpu(rb_need_resched, cpu) = resched; 1665 1666 return event; 1667 1668 out: 1669 trace_recursive_unlock(); 1670 1671 out_nocheck: 1672 ftrace_preempt_enable(resched); 1673 return NULL; 1674 } 1675 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 1676 1677 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, 1678 struct ring_buffer_event *event) 1679 { 1680 local_inc(&cpu_buffer->entries); 1681 1682 /* 1683 * The event first in the commit queue updates the 1684 * time stamp. 1685 */ 1686 if (rb_event_is_commit(cpu_buffer, event)) 1687 cpu_buffer->write_stamp += event->time_delta; 1688 1689 rb_end_commit(cpu_buffer); 1690 } 1691 1692 /** 1693 * ring_buffer_unlock_commit - commit a reserved 1694 * @buffer: The buffer to commit to 1695 * @event: The event pointer to commit. 1696 * 1697 * This commits the data to the ring buffer, and releases any locks held. 1698 * 1699 * Must be paired with ring_buffer_lock_reserve. 1700 */ 1701 int ring_buffer_unlock_commit(struct ring_buffer *buffer, 1702 struct ring_buffer_event *event) 1703 { 1704 struct ring_buffer_per_cpu *cpu_buffer; 1705 int cpu = raw_smp_processor_id(); 1706 1707 cpu_buffer = buffer->buffers[cpu]; 1708 1709 rb_commit(cpu_buffer, event); 1710 1711 trace_recursive_unlock(); 1712 1713 /* 1714 * Only the last preempt count needs to restore preemption. 1715 */ 1716 if (preempt_count() == 1) 1717 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu)); 1718 else 1719 preempt_enable_no_resched_notrace(); 1720 1721 return 0; 1722 } 1723 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 1724 1725 static inline void rb_event_discard(struct ring_buffer_event *event) 1726 { 1727 /* array[0] holds the actual length for the discarded event */ 1728 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 1729 event->type_len = RINGBUF_TYPE_PADDING; 1730 /* time delta must be non zero */ 1731 if (!event->time_delta) 1732 event->time_delta = 1; 1733 } 1734 1735 /** 1736 * ring_buffer_event_discard - discard any event in the ring buffer 1737 * @event: the event to discard 1738 * 1739 * Sometimes a event that is in the ring buffer needs to be ignored. 1740 * This function lets the user discard an event in the ring buffer 1741 * and then that event will not be read later. 1742 * 1743 * Note, it is up to the user to be careful with this, and protect 1744 * against races. If the user discards an event that has been consumed 1745 * it is possible that it could corrupt the ring buffer. 
1746 */ 1747 void ring_buffer_event_discard(struct ring_buffer_event *event) 1748 { 1749 rb_event_discard(event); 1750 } 1751 EXPORT_SYMBOL_GPL(ring_buffer_event_discard); 1752 1753 /** 1754 * ring_buffer_commit_discard - discard an event that has not been committed 1755 * @buffer: the ring buffer 1756 * @event: non committed event to discard 1757 * 1758 * This is similar to ring_buffer_event_discard but must only be 1759 * performed on an event that has not been committed yet. The difference 1760 * is that this will also try to free the event from the ring buffer 1761 * if another event has not been added behind it. 1762 * 1763 * If another event has been added behind it, it will set the event 1764 * up as discarded, and perform the commit. 1765 * 1766 * If this function is called, do not call ring_buffer_unlock_commit on 1767 * the event. 1768 */ 1769 void ring_buffer_discard_commit(struct ring_buffer *buffer, 1770 struct ring_buffer_event *event) 1771 { 1772 struct ring_buffer_per_cpu *cpu_buffer; 1773 int cpu; 1774 1775 /* The event is discarded regardless */ 1776 rb_event_discard(event); 1777 1778 cpu = smp_processor_id(); 1779 cpu_buffer = buffer->buffers[cpu]; 1780 1781 /* 1782 * This must only be called if the event has not been 1783 * committed yet. Thus we can assume that preemption 1784 * is still disabled. 1785 */ 1786 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 1787 1788 if (!rb_try_to_discard(cpu_buffer, event)) 1789 goto out; 1790 1791 /* 1792 * The commit is still visible by the reader, so we 1793 * must increment entries. 1794 */ 1795 local_inc(&cpu_buffer->entries); 1796 out: 1797 rb_end_commit(cpu_buffer); 1798 1799 trace_recursive_unlock(); 1800 1801 /* 1802 * Only the last preempt count needs to restore preemption. 1803 */ 1804 if (preempt_count() == 1) 1805 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu)); 1806 else 1807 preempt_enable_no_resched_notrace(); 1808 1809 } 1810 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 1811 1812 /** 1813 * ring_buffer_write - write data to the buffer without reserving 1814 * @buffer: The ring buffer to write to. 1815 * @length: The length of the data being written (excluding the event header) 1816 * @data: The data to write to the buffer. 1817 * 1818 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 1819 * one function. If you already have the data to write to the buffer, it 1820 * may be easier to simply call this function. 1821 * 1822 * Note, like ring_buffer_lock_reserve, the length is the length of the data 1823 * and not the length of the event which would hold the header. 
1824 */ 1825 int ring_buffer_write(struct ring_buffer *buffer, 1826 unsigned long length, 1827 void *data) 1828 { 1829 struct ring_buffer_per_cpu *cpu_buffer; 1830 struct ring_buffer_event *event; 1831 void *body; 1832 int ret = -EBUSY; 1833 int cpu, resched; 1834 1835 if (ring_buffer_flags != RB_BUFFERS_ON) 1836 return -EBUSY; 1837 1838 if (atomic_read(&buffer->record_disabled)) 1839 return -EBUSY; 1840 1841 resched = ftrace_preempt_disable(); 1842 1843 cpu = raw_smp_processor_id(); 1844 1845 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1846 goto out; 1847 1848 cpu_buffer = buffer->buffers[cpu]; 1849 1850 if (atomic_read(&cpu_buffer->record_disabled)) 1851 goto out; 1852 1853 if (length > BUF_MAX_DATA_SIZE) 1854 goto out; 1855 1856 event = rb_reserve_next_event(cpu_buffer, length); 1857 if (!event) 1858 goto out; 1859 1860 body = rb_event_data(event); 1861 1862 memcpy(body, data, length); 1863 1864 rb_commit(cpu_buffer, event); 1865 1866 ret = 0; 1867 out: 1868 ftrace_preempt_enable(resched); 1869 1870 return ret; 1871 } 1872 EXPORT_SYMBOL_GPL(ring_buffer_write); 1873 1874 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 1875 { 1876 struct buffer_page *reader = cpu_buffer->reader_page; 1877 struct buffer_page *head = cpu_buffer->head_page; 1878 struct buffer_page *commit = cpu_buffer->commit_page; 1879 1880 return reader->read == rb_page_commit(reader) && 1881 (commit == reader || 1882 (commit == head && 1883 head->read == rb_page_commit(commit))); 1884 } 1885 1886 /** 1887 * ring_buffer_record_disable - stop all writes into the buffer 1888 * @buffer: The ring buffer to stop writes to. 1889 * 1890 * This prevents all writes to the buffer. Any attempt to write 1891 * to the buffer after this will fail and return NULL. 1892 * 1893 * The caller should call synchronize_sched() after this. 1894 */ 1895 void ring_buffer_record_disable(struct ring_buffer *buffer) 1896 { 1897 atomic_inc(&buffer->record_disabled); 1898 } 1899 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 1900 1901 /** 1902 * ring_buffer_record_enable - enable writes to the buffer 1903 * @buffer: The ring buffer to enable writes 1904 * 1905 * Note, multiple disables will need the same number of enables 1906 * to truely enable the writing (much like preempt_disable). 1907 */ 1908 void ring_buffer_record_enable(struct ring_buffer *buffer) 1909 { 1910 atomic_dec(&buffer->record_disabled); 1911 } 1912 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 1913 1914 /** 1915 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 1916 * @buffer: The ring buffer to stop writes to. 1917 * @cpu: The CPU buffer to stop 1918 * 1919 * This prevents all writes to the buffer. Any attempt to write 1920 * to the buffer after this will fail and return NULL. 1921 * 1922 * The caller should call synchronize_sched() after this. 1923 */ 1924 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu) 1925 { 1926 struct ring_buffer_per_cpu *cpu_buffer; 1927 1928 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1929 return; 1930 1931 cpu_buffer = buffer->buffers[cpu]; 1932 atomic_inc(&cpu_buffer->record_disabled); 1933 } 1934 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 1935 1936 /** 1937 * ring_buffer_record_enable_cpu - enable writes to the buffer 1938 * @buffer: The ring buffer to enable writes 1939 * @cpu: The CPU to enable. 1940 * 1941 * Note, multiple disables will need the same number of enables 1942 * to truely enable the writing (much like preempt_disable). 
1943 */ 1944 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu) 1945 { 1946 struct ring_buffer_per_cpu *cpu_buffer; 1947 1948 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1949 return; 1950 1951 cpu_buffer = buffer->buffers[cpu]; 1952 atomic_dec(&cpu_buffer->record_disabled); 1953 } 1954 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 1955 1956 /** 1957 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 1958 * @buffer: The ring buffer 1959 * @cpu: The per CPU buffer to get the entries from. 1960 */ 1961 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu) 1962 { 1963 struct ring_buffer_per_cpu *cpu_buffer; 1964 unsigned long ret; 1965 1966 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1967 return 0; 1968 1969 cpu_buffer = buffer->buffers[cpu]; 1970 ret = (local_read(&cpu_buffer->entries) - cpu_buffer->overrun) 1971 - cpu_buffer->read; 1972 1973 return ret; 1974 } 1975 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 1976 1977 /** 1978 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer 1979 * @buffer: The ring buffer 1980 * @cpu: The per CPU buffer to get the number of overruns from 1981 */ 1982 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu) 1983 { 1984 struct ring_buffer_per_cpu *cpu_buffer; 1985 unsigned long ret; 1986 1987 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1988 return 0; 1989 1990 cpu_buffer = buffer->buffers[cpu]; 1991 ret = cpu_buffer->overrun; 1992 1993 return ret; 1994 } 1995 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 1996 1997 /** 1998 * ring_buffer_nmi_dropped_cpu - get the number of nmis that were dropped 1999 * @buffer: The ring buffer 2000 * @cpu: The per CPU buffer to get the number of overruns from 2001 */ 2002 unsigned long ring_buffer_nmi_dropped_cpu(struct ring_buffer *buffer, int cpu) 2003 { 2004 struct ring_buffer_per_cpu *cpu_buffer; 2005 unsigned long ret; 2006 2007 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2008 return 0; 2009 2010 cpu_buffer = buffer->buffers[cpu]; 2011 ret = cpu_buffer->nmi_dropped; 2012 2013 return ret; 2014 } 2015 EXPORT_SYMBOL_GPL(ring_buffer_nmi_dropped_cpu); 2016 2017 /** 2018 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits 2019 * @buffer: The ring buffer 2020 * @cpu: The per CPU buffer to get the number of overruns from 2021 */ 2022 unsigned long 2023 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu) 2024 { 2025 struct ring_buffer_per_cpu *cpu_buffer; 2026 unsigned long ret; 2027 2028 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2029 return 0; 2030 2031 cpu_buffer = buffer->buffers[cpu]; 2032 ret = cpu_buffer->commit_overrun; 2033 2034 return ret; 2035 } 2036 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 2037 2038 /** 2039 * ring_buffer_entries - get the number of entries in a buffer 2040 * @buffer: The ring buffer 2041 * 2042 * Returns the total number of entries in the ring buffer 2043 * (all CPU entries) 2044 */ 2045 unsigned long ring_buffer_entries(struct ring_buffer *buffer) 2046 { 2047 struct ring_buffer_per_cpu *cpu_buffer; 2048 unsigned long entries = 0; 2049 int cpu; 2050 2051 /* if you care about this being correct, lock the buffer */ 2052 for_each_buffer_cpu(buffer, cpu) { 2053 cpu_buffer = buffer->buffers[cpu]; 2054 entries += (local_read(&cpu_buffer->entries) - 2055 cpu_buffer->overrun) - cpu_buffer->read; 2056 } 2057 2058 return entries; 2059 } 2060 EXPORT_SYMBOL_GPL(ring_buffer_entries); 2061 2062 /** 2063 * ring_buffer_overrun_cpu - get the number of 
/**
 * ring_buffer_overruns - get the number of overruns in the buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of overruns in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long overruns = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		overruns += cpu_buffer->overrun;
	}

	return overruns;
}
EXPORT_SYMBOL_GPL(ring_buffer_overruns);

static void rb_iter_reset(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/* Iterator usage is expected to have record disabled */
	if (list_empty(&cpu_buffer->reader_page->list)) {
		iter->head_page = cpu_buffer->head_page;
		iter->head = cpu_buffer->head_page->read;
	} else {
		iter->head_page = cpu_buffer->reader_page;
		iter->head = cpu_buffer->reader_page->read;
	}
	if (iter->head)
		iter->read_stamp = cpu_buffer->read_stamp;
	else
		iter->read_stamp = iter->head_page->page->time_stamp;
}

/**
 * ring_buffer_iter_reset - reset an iterator
 * @iter: The iterator to reset
 *
 * Resets the iterator, so that it will start from the beginning
 * again.
 */
void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;

	if (!iter)
		return;

	cpu_buffer = iter->cpu_buffer;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	rb_iter_reset(iter);
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);

/**
 * ring_buffer_iter_empty - check if an iterator has no more to read
 * @iter: The iterator to check
 */
int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	cpu_buffer = iter->cpu_buffer;

	return iter->head_page == cpu_buffer->commit_page &&
		iter->head == rb_commit_index(cpu_buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);

static void
rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		     struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		cpu_buffer->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		cpu_buffer->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
	return;
}

static void
rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
			  struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		iter->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		iter->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
	return;
}

static struct buffer_page *
rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = NULL;
	unsigned long flags;
	int nr_loops = 0;

	local_irq_save(flags);
	__raw_spin_lock(&cpu_buffer->lock);

 again:
	/*
	 * This should normally only loop twice. But because the
	 * start of the reader inserts an empty page, it causes
	 * a case where we will loop three times. There should be no
	 * reason to loop four times (that I know of).
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
		reader = NULL;
		goto out;
	}

	reader = cpu_buffer->reader_page;

	/* If there's more to read, return this page */
	if (cpu_buffer->reader_page->read < rb_page_size(reader))
		goto out;

	/* Never should we have an index greater than the size */
	if (RB_WARN_ON(cpu_buffer,
		       cpu_buffer->reader_page->read > rb_page_size(reader)))
		goto out;

	/* check if we caught up to the tail */
	reader = NULL;
	if (cpu_buffer->commit_page == cpu_buffer->reader_page)
		goto out;

	/*
	 * Splice the empty reader page into the list around the head.
	 * Reset the reader page to size zero.
	 */

	reader = cpu_buffer->head_page;
	cpu_buffer->reader_page->list.next = reader->list.next;
	cpu_buffer->reader_page->list.prev = reader->list.prev;

	local_set(&cpu_buffer->reader_page->write, 0);
	local_set(&cpu_buffer->reader_page->entries, 0);
	local_set(&cpu_buffer->reader_page->page->commit, 0);

	/* Make the reader page now replace the head */
	reader->list.prev->next = &cpu_buffer->reader_page->list;
	reader->list.next->prev = &cpu_buffer->reader_page->list;

	/*
	 * If the tail is on the reader, then we must set the head
	 * to the inserted page, otherwise we set it one before.
	 */
	cpu_buffer->head_page = cpu_buffer->reader_page;

	if (cpu_buffer->commit_page != reader)
		rb_inc_page(cpu_buffer, &cpu_buffer->head_page);

	/* Finally update the reader page to the new head */
	cpu_buffer->reader_page = reader;
	rb_reset_reader_page(cpu_buffer);

	goto again;

 out:
	__raw_spin_unlock(&cpu_buffer->lock);
	local_irq_restore(flags);

	return reader;
}

static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	unsigned length;

	reader = rb_get_reader_page(cpu_buffer);

	/* This function should not be called when buffer is empty */
	if (RB_WARN_ON(cpu_buffer, !reader))
		return;

	event = rb_reader_event(cpu_buffer);

	if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX
			|| rb_discarded_event(event))
		cpu_buffer->read++;

	rb_update_read_stamp(cpu_buffer, event);

	length = rb_event_length(event);
	cpu_buffer->reader_page->read += length;
}

static void rb_advance_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	unsigned length;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

	/*
	 * Check if we are at the end of the buffer.
	 */
	if (iter->head >= rb_page_size(iter->head_page)) {
		/* discarded commits can make the page empty */
		if (iter->head_page == cpu_buffer->commit_page)
			return;
		rb_inc_iter(iter);
		return;
	}

	event = rb_iter_head_event(iter);

	length = rb_event_length(event);

	/*
	 * This should not be called to advance the header if we are
	 * at the tail of the buffer.
	 */
	if (RB_WARN_ON(cpu_buffer,
		       (iter->head_page == cpu_buffer->commit_page) &&
		       (iter->head + length > rb_commit_index(cpu_buffer))))
		return;

	rb_update_iter_read_stamp(iter, event);

	iter->head += length;

	/* check for end of page padding */
	if ((iter->head >= rb_page_size(iter->head_page)) &&
	    (iter->head_page != cpu_buffer->commit_page))
		rb_advance_iter(iter);
}

static struct ring_buffer_event *
rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	int nr_loops = 0;

	cpu_buffer = buffer->buffers[cpu];

 again:
	/*
	 * We repeat when a timestamp is encountered. It is possible
	 * to get multiple timestamps from an interrupt entering just
	 * as one timestamp is about to be written, or from discarded
	 * commits. The most that we can have is the number on a single page.
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE))
		return NULL;

	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		return NULL;

	event = rb_reader_event(cpu_buffer);

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			RB_WARN_ON(cpu_buffer, 1);
		/*
		 * Because the writer could be discarding every
		 * event it creates (which would probably be bad)
		 * if we were to go back to "again" then we may never
		 * catch up, and will trigger the warn on, or lock
		 * the box. Return the padding, and we will release
		 * the current locks, and try again.
		 */
		rb_advance_reader(cpu_buffer);
		return event;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = cpu_buffer->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(buffer,
							 cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_peek);

static struct ring_buffer_event *
rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	int nr_loops = 0;

	if (ring_buffer_iter_empty(iter))
		return NULL;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

 again:
	/*
	 * We repeat when a timestamp is encountered.
	 * We can get multiple timestamps by nested interrupts or also
	 * if filtering is on (discarding commits). Since discarding
	 * commits can be frequent we can get a lot of timestamps.
	 * But we limit them by not adding timestamps if they begin
	 * at the start of a page.
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE))
		return NULL;

	if (rb_per_cpu_empty(cpu_buffer))
		return NULL;

	event = rb_iter_head_event(iter);

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event)) {
			rb_inc_iter(iter);
			goto again;
		}
		rb_advance_iter(iter);
		return event;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = iter->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(buffer,
							 cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);

static inline int rb_ok_to_lock(void)
{
	/*
	 * If an NMI die dumps out the content of the ring buffer
	 * do not grab locks. We also permanently disable the ring
	 * buffer too. A one time deal is all you get from reading
	 * the ring buffer from an NMI.
	 */
	if (likely(!in_nmi() && !oops_in_progress))
		return 1;

	tracing_off_permanent();
	return 0;
}

/**
 * ring_buffer_peek - peek at the next event to be read
 * @buffer: The ring buffer to read
 * @cpu: The cpu to peek at
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not consume the data.
 */
struct ring_buffer_event *
ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	struct ring_buffer_event *event;
	unsigned long flags;
	int dolock;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return NULL;

	dolock = rb_ok_to_lock();
 again:
	local_irq_save(flags);
	if (dolock)
		spin_lock(&cpu_buffer->reader_lock);
	event = rb_buffer_peek(buffer, cpu, ts);
	if (dolock)
		spin_unlock(&cpu_buffer->reader_lock);
	local_irq_restore(flags);

	if (event && event->type_len == RINGBUF_TYPE_PADDING) {
		cpu_relax();
		goto again;
	}

	return event;
}

/**
 * ring_buffer_iter_peek - peek at the next event to be read
 * @iter: The ring buffer iterator
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not increment the iterator.
 */
struct ring_buffer_event *
ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
	struct ring_buffer_event *event;
	unsigned long flags;

 again:
	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	event = rb_iter_peek(iter, ts);
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	if (event && event->type_len == RINGBUF_TYPE_PADDING) {
		cpu_relax();
		goto again;
	}

	return event;
}
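/*
 * Illustration only: a sketch of a non-destructive peek at the next
 * event on one CPU buffer, using ring_buffer_peek() above.  The function
 * name is hypothetical.  The peeked event is not consumed, so a later
 * ring_buffer_consume() will return it again.
 */
static u64 __maybe_unused rb_example_next_stamp(struct ring_buffer *buffer,
						int cpu)
{
	struct ring_buffer_event *event;
	u64 ts = 0;

	event = ring_buffer_peek(buffer, cpu, &ts);
	if (!event)
		return 0;	/* nothing queued on this CPU */

	return ts;
}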
/**
 * ring_buffer_consume - return an event and consume it
 * @buffer: The ring buffer to get the next event from
 * @cpu: The per CPU buffer to get the next event from
 * @ts: The timestamp of the event read, if non NULL
 *
 * Returns the next event in the ring buffer, and that event is consumed.
 * Meaning that sequential reads will keep returning a different event,
 * and eventually empty the ring buffer if the producer is slower.
 */
struct ring_buffer_event *
ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event = NULL;
	unsigned long flags;
	int dolock;

	dolock = rb_ok_to_lock();

 again:
	/* might be called in atomic */
	preempt_disable();

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];
	local_irq_save(flags);
	if (dolock)
		spin_lock(&cpu_buffer->reader_lock);

	event = rb_buffer_peek(buffer, cpu, ts);
	if (!event)
		goto out_unlock;

	rb_advance_reader(cpu_buffer);

 out_unlock:
	if (dolock)
		spin_unlock(&cpu_buffer->reader_lock);
	local_irq_restore(flags);

 out:
	preempt_enable();

	if (event && event->type_len == RINGBUF_TYPE_PADDING) {
		cpu_relax();
		goto again;
	}

	return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_consume);

/**
 * ring_buffer_read_start - start a non consuming read of the buffer
 * @buffer: The ring buffer to read from
 * @cpu: The cpu buffer to iterate over
 *
 * This starts up an iteration through the buffer. It also disables
 * the recording to the buffer until the reading is finished.
 * This prevents the reading from being corrupted. This is not
 * a consuming read, so a producer is not expected.
 *
 * Must be paired with ring_buffer_read_finish.
 */
struct ring_buffer_iter *
ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_iter *iter;
	unsigned long flags;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return NULL;

	iter = kmalloc(sizeof(*iter), GFP_KERNEL);
	if (!iter)
		return NULL;

	cpu_buffer = buffer->buffers[cpu];

	iter->cpu_buffer = cpu_buffer;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	__raw_spin_lock(&cpu_buffer->lock);
	rb_iter_reset(iter);
	__raw_spin_unlock(&cpu_buffer->lock);
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	return iter;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_start);

/**
 * ring_buffer_read_finish - finish reading the iterator of the buffer
 * @iter: The iterator retrieved by ring_buffer_read_start
 *
 * This re-enables the recording to the buffer, and frees the
 * iterator.
 */
void
ring_buffer_read_finish(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	atomic_dec(&cpu_buffer->record_disabled);
	kfree(iter);
}
EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
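/*
 * Illustration only: a sketch of a consuming read that drains whatever
 * is currently queued on one CPU buffer via ring_buffer_consume().
 * ring_buffer_event_data() and ring_buffer_event_length() are the
 * public accessors from <linux/ring_buffer.h>; the handler callback and
 * function name are hypothetical.
 */
static void __maybe_unused
rb_example_drain_cpu(struct ring_buffer *buffer, int cpu,
		     void (*handler)(void *data, unsigned int len, u64 ts))
{
	struct ring_buffer_event *event;
	u64 ts;

	/* Each successful consume removes the event from the buffer */
	while ((event = ring_buffer_consume(buffer, cpu, &ts)))
		handler(ring_buffer_event_data(event),
			ring_buffer_event_length(event), ts);
}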
/**
 * ring_buffer_read - read the next item in the ring buffer by the iterator
 * @iter: The ring buffer iterator
 * @ts: The time stamp of the event read.
 *
 * This reads the next event in the ring buffer and increments the iterator.
 */
struct ring_buffer_event *
ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_event *event;
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
	unsigned long flags;

 again:
	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	event = rb_iter_peek(iter, ts);
	if (!event)
		goto out;

	rb_advance_iter(iter);
 out:
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	if (event && event->type_len == RINGBUF_TYPE_PADDING) {
		cpu_relax();
		goto again;
	}

	return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_read);

/**
 * ring_buffer_size - return the size of the ring buffer (in bytes)
 * @buffer: The ring buffer.
 */
unsigned long ring_buffer_size(struct ring_buffer *buffer)
{
	return BUF_PAGE_SIZE * buffer->pages;
}
EXPORT_SYMBOL_GPL(ring_buffer_size);

static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
	local_set(&cpu_buffer->head_page->write, 0);
	local_set(&cpu_buffer->head_page->entries, 0);
	local_set(&cpu_buffer->head_page->page->commit, 0);

	cpu_buffer->head_page->read = 0;

	cpu_buffer->tail_page = cpu_buffer->head_page;
	cpu_buffer->commit_page = cpu_buffer->head_page;

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	local_set(&cpu_buffer->reader_page->write, 0);
	local_set(&cpu_buffer->reader_page->entries, 0);
	local_set(&cpu_buffer->reader_page->page->commit, 0);
	cpu_buffer->reader_page->read = 0;

	cpu_buffer->nmi_dropped = 0;
	cpu_buffer->commit_overrun = 0;
	cpu_buffer->overrun = 0;
	cpu_buffer->read = 0;
	local_set(&cpu_buffer->entries, 0);
	local_set(&cpu_buffer->committing, 0);
	local_set(&cpu_buffer->commits, 0);

	cpu_buffer->write_stamp = 0;
	cpu_buffer->read_stamp = 0;
}

/**
 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
 * @buffer: The ring buffer to reset a per cpu buffer of
 * @cpu: The CPU buffer to be reset
 */
void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	unsigned long flags;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;

	atomic_inc(&cpu_buffer->record_disabled);

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	__raw_spin_lock(&cpu_buffer->lock);

	rb_reset_cpu(cpu_buffer);

	__raw_spin_unlock(&cpu_buffer->lock);

	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	atomic_dec(&cpu_buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);

/**
 * ring_buffer_reset - reset a ring buffer
 * @buffer: The ring buffer to reset all cpu buffers
 */
void ring_buffer_reset(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		ring_buffer_reset_cpu(buffer, cpu);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset);
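/*
 * Illustration only: a sketch of a non-consuming walk over one CPU
 * buffer using the iterator API above.  Recording on the CPU buffer
 * stays disabled between ring_buffer_read_start() and
 * ring_buffer_read_finish().  The function name and pr_info() output
 * are hypothetical.
 */
static void __maybe_unused rb_example_walk_cpu(struct ring_buffer *buffer,
					       int cpu)
{
	struct ring_buffer_iter *iter;
	struct ring_buffer_event *event;
	u64 ts;

	iter = ring_buffer_read_start(buffer, cpu);
	if (!iter)
		return;

	/* ring_buffer_read() advances the iterator without consuming */
	while ((event = ring_buffer_read(iter, &ts)))
		pr_info("event of %u bytes at %llu\n",
			ring_buffer_event_length(event),
			(unsigned long long)ts);

	ring_buffer_read_finish(iter);
}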
/**
 * ring_buffer_empty - is the ring buffer empty?
 * @buffer: The ring buffer to test
 */
int ring_buffer_empty(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;
	int dolock;
	int cpu;
	int ret;

	dolock = rb_ok_to_lock();

	/* yes this is racy, but if you don't like the race, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		local_irq_save(flags);
		if (dolock)
			spin_lock(&cpu_buffer->reader_lock);
		ret = rb_per_cpu_empty(cpu_buffer);
		if (dolock)
			spin_unlock(&cpu_buffer->reader_lock);
		local_irq_restore(flags);

		if (!ret)
			return 0;
	}

	return 1;
}
EXPORT_SYMBOL_GPL(ring_buffer_empty);

/**
 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
 * @buffer: The ring buffer
 * @cpu: The CPU buffer to test
 */
int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;
	int dolock;
	int ret;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 1;

	dolock = rb_ok_to_lock();

	cpu_buffer = buffer->buffers[cpu];
	local_irq_save(flags);
	if (dolock)
		spin_lock(&cpu_buffer->reader_lock);
	ret = rb_per_cpu_empty(cpu_buffer);
	if (dolock)
		spin_unlock(&cpu_buffer->reader_lock);
	local_irq_restore(flags);

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);

/**
 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
 * @buffer_a: One buffer to swap with
 * @buffer_b: The other buffer to swap with
 * @cpu: The CPU of the buffers to swap
 *
 * This function is useful for tracers that want to take a "snapshot"
 * of a CPU buffer and have another backup buffer lying around.
 * It is expected that the tracer handles the cpu buffer not being
 * used at the moment.
 */
int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
			 struct ring_buffer *buffer_b, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer_a;
	struct ring_buffer_per_cpu *cpu_buffer_b;
	int ret = -EINVAL;

	if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
	    !cpumask_test_cpu(cpu, buffer_b->cpumask))
		goto out;

	/* At least make sure the two buffers are somewhat the same */
	if (buffer_a->pages != buffer_b->pages)
		goto out;

	ret = -EAGAIN;

	if (ring_buffer_flags != RB_BUFFERS_ON)
		goto out;

	if (atomic_read(&buffer_a->record_disabled))
		goto out;

	if (atomic_read(&buffer_b->record_disabled))
		goto out;

	cpu_buffer_a = buffer_a->buffers[cpu];
	cpu_buffer_b = buffer_b->buffers[cpu];

	if (atomic_read(&cpu_buffer_a->record_disabled))
		goto out;

	if (atomic_read(&cpu_buffer_b->record_disabled))
		goto out;

	/*
	 * We can't do a synchronize_sched here because this
	 * function can be called in atomic context.
	 * Normally this will be called from the same CPU as cpu.
	 * If not it's up to the caller to protect this.
	 */
	atomic_inc(&cpu_buffer_a->record_disabled);
	atomic_inc(&cpu_buffer_b->record_disabled);

	buffer_a->buffers[cpu] = cpu_buffer_b;
	buffer_b->buffers[cpu] = cpu_buffer_a;

	cpu_buffer_b->buffer = buffer_a;
	cpu_buffer_a->buffer = buffer_b;

	atomic_dec(&cpu_buffer_a->record_disabled);
	atomic_dec(&cpu_buffer_b->record_disabled);

	ret = 0;
 out:
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);

/**
 * ring_buffer_alloc_read_page - allocate a page to read from buffer
 * @buffer: the buffer to allocate for.
 *
 * This function is used in conjunction with ring_buffer_read_page.
 * When reading a full page from the ring buffer, these functions
 * can be used to speed up the process. The calling function should
 * allocate a few pages first with this function. Then when it
 * needs to get pages from the ring buffer, it passes the result
 * of this function into ring_buffer_read_page, which will swap
 * the page that was allocated, with the read page of the buffer.
 *
 * Returns:
 *  The page allocated, or NULL on error.
 */
void *ring_buffer_alloc_read_page(struct ring_buffer *buffer)
{
	struct buffer_data_page *bpage;
	unsigned long addr;

	addr = __get_free_page(GFP_KERNEL);
	if (!addr)
		return NULL;

	bpage = (void *)addr;

	rb_init_page(bpage);

	return bpage;
}
EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);

/**
 * ring_buffer_free_read_page - free an allocated read page
 * @buffer: the buffer the page was allocated for
 * @data: the page to free
 *
 * Free a page allocated from ring_buffer_alloc_read_page.
 */
void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
{
	free_page((unsigned long)data);
}
EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
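/*
 * Illustration only: a sketch of the "snapshot" pattern mentioned in
 * the ring_buffer_swap_cpu() kernel-doc above.  The spare buffer is
 * assumed to have been allocated elsewhere with the same number of
 * pages as the live buffer; the function name is hypothetical.
 */
static int __maybe_unused
rb_example_snapshot_cpu(struct ring_buffer *live, struct ring_buffer *spare,
			int cpu)
{
	int ret;

	/* After a successful swap, the old live data can be read from @spare */
	ret = ring_buffer_swap_cpu(live, spare, cpu);
	if (ret)
		return ret;	/* -EINVAL or -EAGAIN, see above */

	return 0;
}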
/**
 * ring_buffer_read_page - extract a page from the ring buffer
 * @buffer: buffer to extract from
 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
 * @len: amount to extract
 * @cpu: the cpu of the buffer to extract
 * @full: should the extraction only happen when the page is full.
 *
 * This function will pull out a page from the ring buffer and consume it.
 * @data_page must be the address of the variable that was returned
 * from ring_buffer_alloc_read_page. This is because the page might be used
 * to swap with a page in the ring buffer.
 *
 * for example:
 *	rpage = ring_buffer_alloc_read_page(buffer);
 *	if (!rpage)
 *		return error;
 *	ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
 *	if (ret >= 0)
 *		process_page(rpage, ret);
 *
 * When @full is set, the read will not succeed unless the writer
 * is off the reader page.
 *
 * Note: it is up to the calling functions to handle sleeps and wakeups.
 *  The ring buffer can be used anywhere in the kernel and can not
 *  blindly call wake_up. The layer that uses the ring buffer must be
 *  responsible for that.
 *
 * Returns:
 *  >=0 if data has been transferred, returns the offset of consumed data.
 *  <0 if no data has been transferred.
 */
int ring_buffer_read_page(struct ring_buffer *buffer,
			  void **data_page, size_t len, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	struct ring_buffer_event *event;
	struct buffer_data_page *bpage;
	struct buffer_page *reader;
	unsigned long flags;
	unsigned int commit;
	unsigned int read;
	u64 save_timestamp;
	int ret = -1;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		goto out;

	/*
	 * If len is not big enough to hold the page header, then
	 * we can not copy anything.
	 */
	if (len <= BUF_PAGE_HDR_SIZE)
		goto out;

	len -= BUF_PAGE_HDR_SIZE;

	if (!data_page)
		goto out;

	bpage = *data_page;
	if (!bpage)
		goto out;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		goto out_unlock;

	event = rb_reader_event(cpu_buffer);

	read = reader->read;
	commit = rb_page_commit(reader);

	/*
	 * If this page has been partially read or
	 * if len is not big enough to read the rest of the page or
	 * a writer is still on the page, then
	 * we must copy the data from the page to the buffer.
	 * Otherwise, we can simply swap the page with the one passed in.
	 */
	if (read || (len < (commit - read)) ||
	    cpu_buffer->reader_page == cpu_buffer->commit_page) {
		struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
		unsigned int rpos = read;
		unsigned int pos = 0;
		unsigned int size;

		if (full)
			goto out_unlock;

		if (len > (commit - read))
			len = (commit - read);

		size = rb_event_length(event);

		if (len < size)
			goto out_unlock;

		/* save the current timestamp, since the user will need it */
		save_timestamp = cpu_buffer->read_stamp;

		/* Need to copy one event at a time */
		do {
			memcpy(bpage->data + pos, rpage->data + rpos, size);

			len -= size;

			rb_advance_reader(cpu_buffer);
			rpos = reader->read;
			pos += size;

			event = rb_reader_event(cpu_buffer);
			size = rb_event_length(event);
		} while (len > size);

		/* update bpage */
		local_set(&bpage->commit, pos);
		bpage->time_stamp = save_timestamp;

		/* we copied everything to the beginning */
		read = 0;
	} else {
		/* update the entry counter */
		cpu_buffer->read += local_read(&reader->entries);

		/* swap the pages */
		rb_init_page(bpage);
		bpage = reader->page;
		reader->page = *data_page;
		local_set(&reader->write, 0);
		local_set(&reader->entries, 0);
		reader->read = 0;
		*data_page = bpage;
	}
	ret = read;

 out_unlock:
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

 out:
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page);
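/*
 * Illustration only: a slightly fuller version of the usage example in
 * the ring_buffer_read_page() kernel-doc above.  The consume_page
 * callback and function name are hypothetical; everything else is the
 * API defined in this file.
 */
static int __maybe_unused
rb_example_read_one_page(struct ring_buffer *buffer, int cpu,
			 void (*consume_page)(void *page, int offset))
{
	void *rpage;
	int ret;

	rpage = ring_buffer_alloc_read_page(buffer);
	if (!rpage)
		return -ENOMEM;

	/* @full == 0: accept a partially filled page as well */
	ret = ring_buffer_read_page(buffer, &rpage, PAGE_SIZE, cpu, 0);
	if (ret >= 0)
		consume_page(rpage, ret);

	ring_buffer_free_read_page(buffer, rpage);

	return ret;
}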
#ifdef CONFIG_TRACING
static ssize_t
rb_simple_read(struct file *filp, char __user *ubuf,
	       size_t cnt, loff_t *ppos)
{
	unsigned long *p = filp->private_data;
	char buf[64];
	int r;

	if (test_bit(RB_BUFFERS_DISABLED_BIT, p))
		r = sprintf(buf, "permanently disabled\n");
	else
		r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p));

	return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
}

static ssize_t
rb_simple_write(struct file *filp, const char __user *ubuf,
		size_t cnt, loff_t *ppos)
{
	unsigned long *p = filp->private_data;
	char buf[64];
	unsigned long val;
	int ret;

	if (cnt >= sizeof(buf))
		return -EINVAL;

	if (copy_from_user(&buf, ubuf, cnt))
		return -EFAULT;

	buf[cnt] = 0;

	ret = strict_strtoul(buf, 10, &val);
	if (ret < 0)
		return ret;

	if (val)
		set_bit(RB_BUFFERS_ON_BIT, p);
	else
		clear_bit(RB_BUFFERS_ON_BIT, p);

	(*ppos)++;

	return cnt;
}

static const struct file_operations rb_simple_fops = {
	.open		= tracing_open_generic,
	.read		= rb_simple_read,
	.write		= rb_simple_write,
};


static __init int rb_init_debugfs(void)
{
	struct dentry *d_tracer;

	d_tracer = tracing_init_dentry();

	trace_create_file("tracing_on", 0644, d_tracer,
			  &ring_buffer_flags, &rb_simple_fops);

	return 0;
}

fs_initcall(rb_init_debugfs);
#endif

#ifdef CONFIG_HOTPLUG_CPU
static int rb_cpu_notify(struct notifier_block *self,
			 unsigned long action, void *hcpu)
{
	struct ring_buffer *buffer =
		container_of(self, struct ring_buffer, cpu_notify);
	long cpu = (long)hcpu;

	switch (action) {
	case CPU_UP_PREPARE:
	case CPU_UP_PREPARE_FROZEN:
		if (cpumask_test_cpu(cpu, buffer->cpumask))
			return NOTIFY_OK;

		buffer->buffers[cpu] =
			rb_allocate_cpu_buffer(buffer, cpu);
		if (!buffer->buffers[cpu]) {
			WARN(1, "failed to allocate ring buffer on CPU %ld\n",
			     cpu);
			return NOTIFY_OK;
		}
		smp_wmb();
		cpumask_set_cpu(cpu, buffer->cpumask);
		break;
	case CPU_DOWN_PREPARE:
	case CPU_DOWN_PREPARE_FROZEN:
		/*
		 * Do nothing.
		 *  If we were to free the buffer, then the user would
		 *  lose any trace that was in the buffer.
		 */
		break;
	default:
		break;
	}
	return NOTIFY_OK;
}
#endif