1 /* 2 * Generic ring buffer 3 * 4 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com> 5 */ 6 #include <linux/ring_buffer.h> 7 #include <linux/trace_clock.h> 8 #include <linux/ftrace_irq.h> 9 #include <linux/spinlock.h> 10 #include <linux/debugfs.h> 11 #include <linux/uaccess.h> 12 #include <linux/hardirq.h> 13 #include <linux/kmemcheck.h> 14 #include <linux/module.h> 15 #include <linux/percpu.h> 16 #include <linux/mutex.h> 17 #include <linux/slab.h> 18 #include <linux/init.h> 19 #include <linux/hash.h> 20 #include <linux/list.h> 21 #include <linux/cpu.h> 22 #include <linux/fs.h> 23 24 #include <asm/local.h> 25 #include "trace.h" 26 27 /* 28 * The ring buffer header is special. We must manually up keep it. 29 */ 30 int ring_buffer_print_entry_header(struct trace_seq *s) 31 { 32 int ret; 33 34 ret = trace_seq_printf(s, "# compressed entry header\n"); 35 ret = trace_seq_printf(s, "\ttype_len : 5 bits\n"); 36 ret = trace_seq_printf(s, "\ttime_delta : 27 bits\n"); 37 ret = trace_seq_printf(s, "\tarray : 32 bits\n"); 38 ret = trace_seq_printf(s, "\n"); 39 ret = trace_seq_printf(s, "\tpadding : type == %d\n", 40 RINGBUF_TYPE_PADDING); 41 ret = trace_seq_printf(s, "\ttime_extend : type == %d\n", 42 RINGBUF_TYPE_TIME_EXTEND); 43 ret = trace_seq_printf(s, "\tdata max type_len == %d\n", 44 RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 45 46 return ret; 47 } 48 49 /* 50 * The ring buffer is made up of a list of pages. A separate list of pages is 51 * allocated for each CPU. A writer may only write to a buffer that is 52 * associated with the CPU it is currently executing on. A reader may read 53 * from any per cpu buffer. 54 * 55 * The reader is special. For each per cpu buffer, the reader has its own 56 * reader page. When a reader has read the entire reader page, this reader 57 * page is swapped with another page in the ring buffer. 58 * 59 * Now, as long as the writer is off the reader page, the reader can do what 60 * ever it wants with that page. The writer will never write to that page 61 * again (as long as it is out of the ring buffer). 62 * 63 * Here's some silly ASCII art. 64 * 65 * +------+ 66 * |reader| RING BUFFER 67 * |page | 68 * +------+ +---+ +---+ +---+ 69 * | |-->| |-->| | 70 * +---+ +---+ +---+ 71 * ^ | 72 * | | 73 * +---------------+ 74 * 75 * 76 * +------+ 77 * |reader| RING BUFFER 78 * |page |------------------v 79 * +------+ +---+ +---+ +---+ 80 * | |-->| |-->| | 81 * +---+ +---+ +---+ 82 * ^ | 83 * | | 84 * +---------------+ 85 * 86 * 87 * +------+ 88 * |reader| RING BUFFER 89 * |page |------------------v 90 * +------+ +---+ +---+ +---+ 91 * ^ | |-->| |-->| | 92 * | +---+ +---+ +---+ 93 * | | 94 * | | 95 * +------------------------------+ 96 * 97 * 98 * +------+ 99 * |buffer| RING BUFFER 100 * |page |------------------v 101 * +------+ +---+ +---+ +---+ 102 * ^ | | | |-->| | 103 * | New +---+ +---+ +---+ 104 * | Reader------^ | 105 * | page | 106 * +------------------------------+ 107 * 108 * 109 * After we make this swap, the reader can hand this page off to the splice 110 * code and be done with it. It can even allocate a new page if it needs to 111 * and swap that into the ring buffer. 112 * 113 * We will be using cmpxchg soon to make all this lockless. 114 * 115 */ 116 117 /* 118 * A fast way to enable or disable all ring buffers is to 119 * call tracing_on or tracing_off. Turning off the ring buffers 120 * prevents all ring buffers from being recorded to. 121 * Turning this switch on, makes it OK to write to the 122 * ring buffer, if the ring buffer is enabled itself. 
123 * 124 * There's three layers that must be on in order to write 125 * to the ring buffer. 126 * 127 * 1) This global flag must be set. 128 * 2) The ring buffer must be enabled for recording. 129 * 3) The per cpu buffer must be enabled for recording. 130 * 131 * In case of an anomaly, this global flag has a bit set that 132 * will permantly disable all ring buffers. 133 */ 134 135 /* 136 * Global flag to disable all recording to ring buffers 137 * This has two bits: ON, DISABLED 138 * 139 * ON DISABLED 140 * ---- ---------- 141 * 0 0 : ring buffers are off 142 * 1 0 : ring buffers are on 143 * X 1 : ring buffers are permanently disabled 144 */ 145 146 enum { 147 RB_BUFFERS_ON_BIT = 0, 148 RB_BUFFERS_DISABLED_BIT = 1, 149 }; 150 151 enum { 152 RB_BUFFERS_ON = 1 << RB_BUFFERS_ON_BIT, 153 RB_BUFFERS_DISABLED = 1 << RB_BUFFERS_DISABLED_BIT, 154 }; 155 156 static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON; 157 158 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data) 159 160 /** 161 * tracing_on - enable all tracing buffers 162 * 163 * This function enables all tracing buffers that may have been 164 * disabled with tracing_off. 165 */ 166 void tracing_on(void) 167 { 168 set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags); 169 } 170 EXPORT_SYMBOL_GPL(tracing_on); 171 172 /** 173 * tracing_off - turn off all tracing buffers 174 * 175 * This function stops all tracing buffers from recording data. 176 * It does not disable any overhead the tracers themselves may 177 * be causing. This function simply causes all recording to 178 * the ring buffers to fail. 179 */ 180 void tracing_off(void) 181 { 182 clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags); 183 } 184 EXPORT_SYMBOL_GPL(tracing_off); 185 186 /** 187 * tracing_off_permanent - permanently disable ring buffers 188 * 189 * This function, once called, will disable all ring buffers 190 * permanently. 191 */ 192 void tracing_off_permanent(void) 193 { 194 set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags); 195 } 196 197 /** 198 * tracing_is_on - show state of ring buffers enabled 199 */ 200 int tracing_is_on(void) 201 { 202 return ring_buffer_flags == RB_BUFFERS_ON; 203 } 204 EXPORT_SYMBOL_GPL(tracing_is_on); 205 206 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) 207 #define RB_ALIGNMENT 4U 208 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 209 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */ 210 211 #if !defined(CONFIG_64BIT) || defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS) 212 # define RB_FORCE_8BYTE_ALIGNMENT 0 213 # define RB_ARCH_ALIGNMENT RB_ALIGNMENT 214 #else 215 # define RB_FORCE_8BYTE_ALIGNMENT 1 216 # define RB_ARCH_ALIGNMENT 8U 217 #endif 218 219 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ 220 #define RINGBUF_TYPE_DATA 0 ... 
RINGBUF_TYPE_DATA_TYPE_LEN_MAX 221 222 enum { 223 RB_LEN_TIME_EXTEND = 8, 224 RB_LEN_TIME_STAMP = 16, 225 }; 226 227 static inline int rb_null_event(struct ring_buffer_event *event) 228 { 229 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta; 230 } 231 232 static void rb_event_set_padding(struct ring_buffer_event *event) 233 { 234 /* padding has a NULL time_delta */ 235 event->type_len = RINGBUF_TYPE_PADDING; 236 event->time_delta = 0; 237 } 238 239 static unsigned 240 rb_event_data_length(struct ring_buffer_event *event) 241 { 242 unsigned length; 243 244 if (event->type_len) 245 length = event->type_len * RB_ALIGNMENT; 246 else 247 length = event->array[0]; 248 return length + RB_EVNT_HDR_SIZE; 249 } 250 251 /* inline for ring buffer fast paths */ 252 static unsigned 253 rb_event_length(struct ring_buffer_event *event) 254 { 255 switch (event->type_len) { 256 case RINGBUF_TYPE_PADDING: 257 if (rb_null_event(event)) 258 /* undefined */ 259 return -1; 260 return event->array[0] + RB_EVNT_HDR_SIZE; 261 262 case RINGBUF_TYPE_TIME_EXTEND: 263 return RB_LEN_TIME_EXTEND; 264 265 case RINGBUF_TYPE_TIME_STAMP: 266 return RB_LEN_TIME_STAMP; 267 268 case RINGBUF_TYPE_DATA: 269 return rb_event_data_length(event); 270 default: 271 BUG(); 272 } 273 /* not hit */ 274 return 0; 275 } 276 277 /** 278 * ring_buffer_event_length - return the length of the event 279 * @event: the event to get the length of 280 */ 281 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 282 { 283 unsigned length = rb_event_length(event); 284 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 285 return length; 286 length -= RB_EVNT_HDR_SIZE; 287 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 288 length -= sizeof(event->array[0]); 289 return length; 290 } 291 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 292 293 /* inline for ring buffer fast paths */ 294 static void * 295 rb_event_data(struct ring_buffer_event *event) 296 { 297 BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 298 /* If length is in len field, then array[0] has the data */ 299 if (event->type_len) 300 return (void *)&event->array[0]; 301 /* Otherwise length is in array[0] and array[1] has the data */ 302 return (void *)&event->array[1]; 303 } 304 305 /** 306 * ring_buffer_event_data - return the data of the event 307 * @event: the event to get the data from 308 */ 309 void *ring_buffer_event_data(struct ring_buffer_event *event) 310 { 311 return rb_event_data(event); 312 } 313 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 314 315 #define for_each_buffer_cpu(buffer, cpu) \ 316 for_each_cpu(cpu, buffer->cpumask) 317 318 #define TS_SHIFT 27 319 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 320 #define TS_DELTA_TEST (~TS_MASK) 321 322 struct buffer_data_page { 323 u64 time_stamp; /* page time stamp */ 324 local_t commit; /* write committed index */ 325 unsigned char data[]; /* data of buffer page */ 326 }; 327 328 /* 329 * Note, the buffer_page list must be first. The buffer pages 330 * are allocated in cache lines, which means that each buffer 331 * page will be at the beginning of a cache line, and thus 332 * the least significant bits will be zero. We use this to 333 * add flags in the list struct pointers, to make the ring buffer 334 * lockless. 
335 */ 336 struct buffer_page { 337 struct list_head list; /* list of buffer pages */ 338 local_t write; /* index for next write */ 339 unsigned read; /* index for next read */ 340 local_t entries; /* entries on this page */ 341 struct buffer_data_page *page; /* Actual data page */ 342 }; 343 344 /* 345 * The buffer page counters, write and entries, must be reset 346 * atomically when crossing page boundaries. To synchronize this 347 * update, two counters are inserted into the number. One is 348 * the actual counter for the write position or count on the page. 349 * 350 * The other is a counter of updaters. Before an update happens 351 * the update partition of the counter is incremented. This will 352 * allow the updater to update the counter atomically. 353 * 354 * The counter is 20 bits, and the state data is 12. 355 */ 356 #define RB_WRITE_MASK 0xfffff 357 #define RB_WRITE_INTCNT (1 << 20) 358 359 static void rb_init_page(struct buffer_data_page *bpage) 360 { 361 local_set(&bpage->commit, 0); 362 } 363 364 /** 365 * ring_buffer_page_len - the size of data on the page. 366 * @page: The page to read 367 * 368 * Returns the amount of data on the page, including buffer page header. 369 */ 370 size_t ring_buffer_page_len(void *page) 371 { 372 return local_read(&((struct buffer_data_page *)page)->commit) 373 + BUF_PAGE_HDR_SIZE; 374 } 375 376 /* 377 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing 378 * this issue out. 379 */ 380 static void free_buffer_page(struct buffer_page *bpage) 381 { 382 free_page((unsigned long)bpage->page); 383 kfree(bpage); 384 } 385 386 /* 387 * We need to fit the time_stamp delta into 27 bits. 388 */ 389 static inline int test_time_stamp(u64 delta) 390 { 391 if (delta & TS_DELTA_TEST) 392 return 1; 393 return 0; 394 } 395 396 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE) 397 398 /* Max payload is BUF_PAGE_SIZE - header (8bytes) */ 399 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2)) 400 401 /* Max number of timestamps that can fit on a page */ 402 #define RB_TIMESTAMPS_PER_PAGE (BUF_PAGE_SIZE / RB_LEN_TIME_STAMP) 403 404 int ring_buffer_print_page_header(struct trace_seq *s) 405 { 406 struct buffer_data_page field; 407 int ret; 408 409 ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t" 410 "offset:0;\tsize:%u;\tsigned:%u;\n", 411 (unsigned int)sizeof(field.time_stamp), 412 (unsigned int)is_signed_type(u64)); 413 414 ret = trace_seq_printf(s, "\tfield: local_t commit;\t" 415 "offset:%u;\tsize:%u;\tsigned:%u;\n", 416 (unsigned int)offsetof(typeof(field), commit), 417 (unsigned int)sizeof(field.commit), 418 (unsigned int)is_signed_type(long)); 419 420 ret = trace_seq_printf(s, "\tfield: char data;\t" 421 "offset:%u;\tsize:%u;\tsigned:%u;\n", 422 (unsigned int)offsetof(typeof(field), data), 423 (unsigned int)BUF_PAGE_SIZE, 424 (unsigned int)is_signed_type(char)); 425 426 return ret; 427 } 428 429 /* 430 * head_page == tail_page && head == tail then buffer is empty. 
431 */ 432 struct ring_buffer_per_cpu { 433 int cpu; 434 struct ring_buffer *buffer; 435 spinlock_t reader_lock; /* serialize readers */ 436 arch_spinlock_t lock; 437 struct lock_class_key lock_key; 438 struct list_head *pages; 439 struct buffer_page *head_page; /* read from head */ 440 struct buffer_page *tail_page; /* write to tail */ 441 struct buffer_page *commit_page; /* committed pages */ 442 struct buffer_page *reader_page; 443 local_t commit_overrun; 444 local_t overrun; 445 local_t entries; 446 local_t committing; 447 local_t commits; 448 unsigned long read; 449 u64 write_stamp; 450 u64 read_stamp; 451 atomic_t record_disabled; 452 }; 453 454 struct ring_buffer { 455 unsigned pages; 456 unsigned flags; 457 int cpus; 458 atomic_t record_disabled; 459 cpumask_var_t cpumask; 460 461 struct lock_class_key *reader_lock_key; 462 463 struct mutex mutex; 464 465 struct ring_buffer_per_cpu **buffers; 466 467 #ifdef CONFIG_HOTPLUG_CPU 468 struct notifier_block cpu_notify; 469 #endif 470 u64 (*clock)(void); 471 }; 472 473 struct ring_buffer_iter { 474 struct ring_buffer_per_cpu *cpu_buffer; 475 unsigned long head; 476 struct buffer_page *head_page; 477 struct buffer_page *cache_reader_page; 478 unsigned long cache_read; 479 u64 read_stamp; 480 }; 481 482 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 483 #define RB_WARN_ON(b, cond) \ 484 ({ \ 485 int _____ret = unlikely(cond); \ 486 if (_____ret) { \ 487 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \ 488 struct ring_buffer_per_cpu *__b = \ 489 (void *)b; \ 490 atomic_inc(&__b->buffer->record_disabled); \ 491 } else \ 492 atomic_inc(&b->record_disabled); \ 493 WARN_ON(1); \ 494 } \ 495 _____ret; \ 496 }) 497 498 /* Up this if you want to test the TIME_EXTENTS and normalization */ 499 #define DEBUG_SHIFT 0 500 501 static inline u64 rb_time_stamp(struct ring_buffer *buffer) 502 { 503 /* shift to debug/test normalization and TIME_EXTENTS */ 504 return buffer->clock() << DEBUG_SHIFT; 505 } 506 507 u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu) 508 { 509 u64 time; 510 511 preempt_disable_notrace(); 512 time = rb_time_stamp(buffer); 513 preempt_enable_no_resched_notrace(); 514 515 return time; 516 } 517 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 518 519 void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer, 520 int cpu, u64 *ts) 521 { 522 /* Just stupid testing the normalize function and deltas */ 523 *ts >>= DEBUG_SHIFT; 524 } 525 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 526 527 /* 528 * Making the ring buffer lockless makes things tricky. 529 * Although writes only happen on the CPU that they are on, 530 * and they only need to worry about interrupts. Reads can 531 * happen on any CPU. 532 * 533 * The reader page is always off the ring buffer, but when the 534 * reader finishes with a page, it needs to swap its page with 535 * a new one from the buffer. The reader needs to take from 536 * the head (writes go to the tail). But if a writer is in overwrite 537 * mode and wraps, it must push the head page forward. 538 * 539 * Here lies the problem. 540 * 541 * The reader must be careful to replace only the head page, and 542 * not another one. As described at the top of the file in the 543 * ASCII art, the reader sets its old page to point to the next 544 * page after head. It then sets the page after head to point to 545 * the old reader page. But if the writer moves the head page 546 * during this operation, the reader could end up with the tail. 
547 * 548 * We use cmpxchg to help prevent this race. We also do something 549 * special with the page before head. We set the LSB to 1. 550 * 551 * When the writer must push the page forward, it will clear the 552 * bit that points to the head page, move the head, and then set 553 * the bit that points to the new head page. 554 * 555 * We also don't want an interrupt coming in and moving the head 556 * page on another writer. Thus we use the second LSB to catch 557 * that too. Thus: 558 * 559 * head->list->prev->next bit 1 bit 0 560 * ------- ------- 561 * Normal page 0 0 562 * Points to head page 0 1 563 * New head page 1 0 564 * 565 * Note we can not trust the prev pointer of the head page, because: 566 * 567 * +----+ +-----+ +-----+ 568 * | |------>| T |---X--->| N | 569 * | |<------| | | | 570 * +----+ +-----+ +-----+ 571 * ^ ^ | 572 * | +-----+ | | 573 * +----------| R |----------+ | 574 * | |<-----------+ 575 * +-----+ 576 * 577 * Key: ---X--> HEAD flag set in pointer 578 * T Tail page 579 * R Reader page 580 * N Next page 581 * 582 * (see __rb_reserve_next() to see where this happens) 583 * 584 * What the above shows is that the reader just swapped out 585 * the reader page with a page in the buffer, but before it 586 * could make the new header point back to the new page added 587 * it was preempted by a writer. The writer moved forward onto 588 * the new page added by the reader and is about to move forward 589 * again. 590 * 591 * You can see, it is legitimate for the previous pointer of 592 * the head (or any page) not to point back to itself. But only 593 * temporarially. 594 */ 595 596 #define RB_PAGE_NORMAL 0UL 597 #define RB_PAGE_HEAD 1UL 598 #define RB_PAGE_UPDATE 2UL 599 600 601 #define RB_FLAG_MASK 3UL 602 603 /* PAGE_MOVED is not part of the mask */ 604 #define RB_PAGE_MOVED 4UL 605 606 /* 607 * rb_list_head - remove any bit 608 */ 609 static struct list_head *rb_list_head(struct list_head *list) 610 { 611 unsigned long val = (unsigned long)list; 612 613 return (struct list_head *)(val & ~RB_FLAG_MASK); 614 } 615 616 /* 617 * rb_is_head_page - test if the given page is the head page 618 * 619 * Because the reader may move the head_page pointer, we can 620 * not trust what the head page is (it may be pointing to 621 * the reader page). But if the next page is a header page, 622 * its flags will be non zero. 623 */ 624 static int inline 625 rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer, 626 struct buffer_page *page, struct list_head *list) 627 { 628 unsigned long val; 629 630 val = (unsigned long)list->next; 631 632 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list) 633 return RB_PAGE_MOVED; 634 635 return val & RB_FLAG_MASK; 636 } 637 638 /* 639 * rb_is_reader_page 640 * 641 * The unique thing about the reader page, is that, if the 642 * writer is ever on it, the previous pointer never points 643 * back to the reader page. 644 */ 645 static int rb_is_reader_page(struct buffer_page *page) 646 { 647 struct list_head *list = page->list.prev; 648 649 return rb_list_head(list->next) != &page->list; 650 } 651 652 /* 653 * rb_set_list_to_head - set a list_head to be pointing to head. 
654 */ 655 static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer, 656 struct list_head *list) 657 { 658 unsigned long *ptr; 659 660 ptr = (unsigned long *)&list->next; 661 *ptr |= RB_PAGE_HEAD; 662 *ptr &= ~RB_PAGE_UPDATE; 663 } 664 665 /* 666 * rb_head_page_activate - sets up head page 667 */ 668 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer) 669 { 670 struct buffer_page *head; 671 672 head = cpu_buffer->head_page; 673 if (!head) 674 return; 675 676 /* 677 * Set the previous list pointer to have the HEAD flag. 678 */ 679 rb_set_list_to_head(cpu_buffer, head->list.prev); 680 } 681 682 static void rb_list_head_clear(struct list_head *list) 683 { 684 unsigned long *ptr = (unsigned long *)&list->next; 685 686 *ptr &= ~RB_FLAG_MASK; 687 } 688 689 /* 690 * rb_head_page_dactivate - clears head page ptr (for free list) 691 */ 692 static void 693 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer) 694 { 695 struct list_head *hd; 696 697 /* Go through the whole list and clear any pointers found. */ 698 rb_list_head_clear(cpu_buffer->pages); 699 700 list_for_each(hd, cpu_buffer->pages) 701 rb_list_head_clear(hd); 702 } 703 704 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer, 705 struct buffer_page *head, 706 struct buffer_page *prev, 707 int old_flag, int new_flag) 708 { 709 struct list_head *list; 710 unsigned long val = (unsigned long)&head->list; 711 unsigned long ret; 712 713 list = &prev->list; 714 715 val &= ~RB_FLAG_MASK; 716 717 ret = cmpxchg((unsigned long *)&list->next, 718 val | old_flag, val | new_flag); 719 720 /* check if the reader took the page */ 721 if ((ret & ~RB_FLAG_MASK) != val) 722 return RB_PAGE_MOVED; 723 724 return ret & RB_FLAG_MASK; 725 } 726 727 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer, 728 struct buffer_page *head, 729 struct buffer_page *prev, 730 int old_flag) 731 { 732 return rb_head_page_set(cpu_buffer, head, prev, 733 old_flag, RB_PAGE_UPDATE); 734 } 735 736 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer, 737 struct buffer_page *head, 738 struct buffer_page *prev, 739 int old_flag) 740 { 741 return rb_head_page_set(cpu_buffer, head, prev, 742 old_flag, RB_PAGE_HEAD); 743 } 744 745 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer, 746 struct buffer_page *head, 747 struct buffer_page *prev, 748 int old_flag) 749 { 750 return rb_head_page_set(cpu_buffer, head, prev, 751 old_flag, RB_PAGE_NORMAL); 752 } 753 754 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer, 755 struct buffer_page **bpage) 756 { 757 struct list_head *p = rb_list_head((*bpage)->list.next); 758 759 *bpage = list_entry(p, struct buffer_page, list); 760 } 761 762 static struct buffer_page * 763 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer) 764 { 765 struct buffer_page *head; 766 struct buffer_page *page; 767 struct list_head *list; 768 int i; 769 770 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page)) 771 return NULL; 772 773 /* sanity check */ 774 list = cpu_buffer->pages; 775 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list)) 776 return NULL; 777 778 page = head = cpu_buffer->head_page; 779 /* 780 * It is possible that the writer moves the header behind 781 * where we started, and we miss in one loop. 782 * A second loop should grab the header, but we'll do 783 * three loops just because I'm paranoid. 
784 */ 785 for (i = 0; i < 3; i++) { 786 do { 787 if (rb_is_head_page(cpu_buffer, page, page->list.prev)) { 788 cpu_buffer->head_page = page; 789 return page; 790 } 791 rb_inc_page(cpu_buffer, &page); 792 } while (page != head); 793 } 794 795 RB_WARN_ON(cpu_buffer, 1); 796 797 return NULL; 798 } 799 800 static int rb_head_page_replace(struct buffer_page *old, 801 struct buffer_page *new) 802 { 803 unsigned long *ptr = (unsigned long *)&old->list.prev->next; 804 unsigned long val; 805 unsigned long ret; 806 807 val = *ptr & ~RB_FLAG_MASK; 808 val |= RB_PAGE_HEAD; 809 810 ret = cmpxchg(ptr, val, (unsigned long)&new->list); 811 812 return ret == val; 813 } 814 815 /* 816 * rb_tail_page_update - move the tail page forward 817 * 818 * Returns 1 if moved tail page, 0 if someone else did. 819 */ 820 static int rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer, 821 struct buffer_page *tail_page, 822 struct buffer_page *next_page) 823 { 824 struct buffer_page *old_tail; 825 unsigned long old_entries; 826 unsigned long old_write; 827 int ret = 0; 828 829 /* 830 * The tail page now needs to be moved forward. 831 * 832 * We need to reset the tail page, but without messing 833 * with possible erasing of data brought in by interrupts 834 * that have moved the tail page and are currently on it. 835 * 836 * We add a counter to the write field to denote this. 837 */ 838 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write); 839 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries); 840 841 /* 842 * Just make sure we have seen our old_write and synchronize 843 * with any interrupts that come in. 844 */ 845 barrier(); 846 847 /* 848 * If the tail page is still the same as what we think 849 * it is, then it is up to us to update the tail 850 * pointer. 851 */ 852 if (tail_page == cpu_buffer->tail_page) { 853 /* Zero the write counter */ 854 unsigned long val = old_write & ~RB_WRITE_MASK; 855 unsigned long eval = old_entries & ~RB_WRITE_MASK; 856 857 /* 858 * This will only succeed if an interrupt did 859 * not come in and change it. In which case, we 860 * do not want to modify it. 861 * 862 * We add (void) to let the compiler know that we do not care 863 * about the return value of these functions. We use the 864 * cmpxchg to only update if an interrupt did not already 865 * do it for us. If the cmpxchg fails, we don't care. 866 */ 867 (void)local_cmpxchg(&next_page->write, old_write, val); 868 (void)local_cmpxchg(&next_page->entries, old_entries, eval); 869 870 /* 871 * No need to worry about races with clearing out the commit. 872 * it only can increment when a commit takes place. But that 873 * only happens in the outer most nested commit. 
874 */ 875 local_set(&next_page->page->commit, 0); 876 877 old_tail = cmpxchg(&cpu_buffer->tail_page, 878 tail_page, next_page); 879 880 if (old_tail == tail_page) 881 ret = 1; 882 } 883 884 return ret; 885 } 886 887 static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer, 888 struct buffer_page *bpage) 889 { 890 unsigned long val = (unsigned long)bpage; 891 892 if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK)) 893 return 1; 894 895 return 0; 896 } 897 898 /** 899 * rb_check_list - make sure a pointer to a list has the last bits zero 900 */ 901 static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer, 902 struct list_head *list) 903 { 904 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev)) 905 return 1; 906 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next)) 907 return 1; 908 return 0; 909 } 910 911 /** 912 * check_pages - integrity check of buffer pages 913 * @cpu_buffer: CPU buffer with pages to test 914 * 915 * As a safety measure we check to make sure the data pages have not 916 * been corrupted. 917 */ 918 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 919 { 920 struct list_head *head = cpu_buffer->pages; 921 struct buffer_page *bpage, *tmp; 922 923 rb_head_page_deactivate(cpu_buffer); 924 925 if (RB_WARN_ON(cpu_buffer, head->next->prev != head)) 926 return -1; 927 if (RB_WARN_ON(cpu_buffer, head->prev->next != head)) 928 return -1; 929 930 if (rb_check_list(cpu_buffer, head)) 931 return -1; 932 933 list_for_each_entry_safe(bpage, tmp, head, list) { 934 if (RB_WARN_ON(cpu_buffer, 935 bpage->list.next->prev != &bpage->list)) 936 return -1; 937 if (RB_WARN_ON(cpu_buffer, 938 bpage->list.prev->next != &bpage->list)) 939 return -1; 940 if (rb_check_list(cpu_buffer, &bpage->list)) 941 return -1; 942 } 943 944 rb_head_page_activate(cpu_buffer); 945 946 return 0; 947 } 948 949 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 950 unsigned nr_pages) 951 { 952 struct buffer_page *bpage, *tmp; 953 unsigned long addr; 954 LIST_HEAD(pages); 955 unsigned i; 956 957 WARN_ON(!nr_pages); 958 959 for (i = 0; i < nr_pages; i++) { 960 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 961 GFP_KERNEL, cpu_to_node(cpu_buffer->cpu)); 962 if (!bpage) 963 goto free_pages; 964 965 rb_check_bpage(cpu_buffer, bpage); 966 967 list_add(&bpage->list, &pages); 968 969 addr = __get_free_page(GFP_KERNEL); 970 if (!addr) 971 goto free_pages; 972 bpage->page = (void *)addr; 973 rb_init_page(bpage->page); 974 } 975 976 /* 977 * The ring buffer page list is a circular list that does not 978 * start and end with a list head. All page list items point to 979 * other pages. 
980 */ 981 cpu_buffer->pages = pages.next; 982 list_del(&pages); 983 984 rb_check_pages(cpu_buffer); 985 986 return 0; 987 988 free_pages: 989 list_for_each_entry_safe(bpage, tmp, &pages, list) { 990 list_del_init(&bpage->list); 991 free_buffer_page(bpage); 992 } 993 return -ENOMEM; 994 } 995 996 static struct ring_buffer_per_cpu * 997 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu) 998 { 999 struct ring_buffer_per_cpu *cpu_buffer; 1000 struct buffer_page *bpage; 1001 unsigned long addr; 1002 int ret; 1003 1004 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 1005 GFP_KERNEL, cpu_to_node(cpu)); 1006 if (!cpu_buffer) 1007 return NULL; 1008 1009 cpu_buffer->cpu = cpu; 1010 cpu_buffer->buffer = buffer; 1011 spin_lock_init(&cpu_buffer->reader_lock); 1012 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 1013 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 1014 1015 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1016 GFP_KERNEL, cpu_to_node(cpu)); 1017 if (!bpage) 1018 goto fail_free_buffer; 1019 1020 rb_check_bpage(cpu_buffer, bpage); 1021 1022 cpu_buffer->reader_page = bpage; 1023 addr = __get_free_page(GFP_KERNEL); 1024 if (!addr) 1025 goto fail_free_reader; 1026 bpage->page = (void *)addr; 1027 rb_init_page(bpage->page); 1028 1029 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 1030 1031 ret = rb_allocate_pages(cpu_buffer, buffer->pages); 1032 if (ret < 0) 1033 goto fail_free_reader; 1034 1035 cpu_buffer->head_page 1036 = list_entry(cpu_buffer->pages, struct buffer_page, list); 1037 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 1038 1039 rb_head_page_activate(cpu_buffer); 1040 1041 return cpu_buffer; 1042 1043 fail_free_reader: 1044 free_buffer_page(cpu_buffer->reader_page); 1045 1046 fail_free_buffer: 1047 kfree(cpu_buffer); 1048 return NULL; 1049 } 1050 1051 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 1052 { 1053 struct list_head *head = cpu_buffer->pages; 1054 struct buffer_page *bpage, *tmp; 1055 1056 free_buffer_page(cpu_buffer->reader_page); 1057 1058 rb_head_page_deactivate(cpu_buffer); 1059 1060 if (head) { 1061 list_for_each_entry_safe(bpage, tmp, head, list) { 1062 list_del_init(&bpage->list); 1063 free_buffer_page(bpage); 1064 } 1065 bpage = list_entry(head, struct buffer_page, list); 1066 free_buffer_page(bpage); 1067 } 1068 1069 kfree(cpu_buffer); 1070 } 1071 1072 #ifdef CONFIG_HOTPLUG_CPU 1073 static int rb_cpu_notify(struct notifier_block *self, 1074 unsigned long action, void *hcpu); 1075 #endif 1076 1077 /** 1078 * ring_buffer_alloc - allocate a new ring_buffer 1079 * @size: the size in bytes per cpu that is needed. 1080 * @flags: attributes to set for the ring buffer. 1081 * 1082 * Currently the only flag that is available is the RB_FL_OVERWRITE 1083 * flag. This flag means that the buffer will overwrite old data 1084 * when the buffer wraps. If this flag is not set, the buffer will 1085 * drop data when the tail hits the head. 
1086 */ 1087 struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 1088 struct lock_class_key *key) 1089 { 1090 struct ring_buffer *buffer; 1091 int bsize; 1092 int cpu; 1093 1094 /* keep it in its own cache line */ 1095 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 1096 GFP_KERNEL); 1097 if (!buffer) 1098 return NULL; 1099 1100 if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 1101 goto fail_free_buffer; 1102 1103 buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 1104 buffer->flags = flags; 1105 buffer->clock = trace_clock_local; 1106 buffer->reader_lock_key = key; 1107 1108 /* need at least two pages */ 1109 if (buffer->pages < 2) 1110 buffer->pages = 2; 1111 1112 /* 1113 * In case of non-hotplug cpu, if the ring-buffer is allocated 1114 * in early initcall, it will not be notified of secondary cpus. 1115 * In that off case, we need to allocate for all possible cpus. 1116 */ 1117 #ifdef CONFIG_HOTPLUG_CPU 1118 get_online_cpus(); 1119 cpumask_copy(buffer->cpumask, cpu_online_mask); 1120 #else 1121 cpumask_copy(buffer->cpumask, cpu_possible_mask); 1122 #endif 1123 buffer->cpus = nr_cpu_ids; 1124 1125 bsize = sizeof(void *) * nr_cpu_ids; 1126 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 1127 GFP_KERNEL); 1128 if (!buffer->buffers) 1129 goto fail_free_cpumask; 1130 1131 for_each_buffer_cpu(buffer, cpu) { 1132 buffer->buffers[cpu] = 1133 rb_allocate_cpu_buffer(buffer, cpu); 1134 if (!buffer->buffers[cpu]) 1135 goto fail_free_buffers; 1136 } 1137 1138 #ifdef CONFIG_HOTPLUG_CPU 1139 buffer->cpu_notify.notifier_call = rb_cpu_notify; 1140 buffer->cpu_notify.priority = 0; 1141 register_cpu_notifier(&buffer->cpu_notify); 1142 #endif 1143 1144 put_online_cpus(); 1145 mutex_init(&buffer->mutex); 1146 1147 return buffer; 1148 1149 fail_free_buffers: 1150 for_each_buffer_cpu(buffer, cpu) { 1151 if (buffer->buffers[cpu]) 1152 rb_free_cpu_buffer(buffer->buffers[cpu]); 1153 } 1154 kfree(buffer->buffers); 1155 1156 fail_free_cpumask: 1157 free_cpumask_var(buffer->cpumask); 1158 put_online_cpus(); 1159 1160 fail_free_buffer: 1161 kfree(buffer); 1162 return NULL; 1163 } 1164 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 1165 1166 /** 1167 * ring_buffer_free - free a ring buffer. 1168 * @buffer: the buffer to free. 
1169 */ 1170 void 1171 ring_buffer_free(struct ring_buffer *buffer) 1172 { 1173 int cpu; 1174 1175 get_online_cpus(); 1176 1177 #ifdef CONFIG_HOTPLUG_CPU 1178 unregister_cpu_notifier(&buffer->cpu_notify); 1179 #endif 1180 1181 for_each_buffer_cpu(buffer, cpu) 1182 rb_free_cpu_buffer(buffer->buffers[cpu]); 1183 1184 put_online_cpus(); 1185 1186 kfree(buffer->buffers); 1187 free_cpumask_var(buffer->cpumask); 1188 1189 kfree(buffer); 1190 } 1191 EXPORT_SYMBOL_GPL(ring_buffer_free); 1192 1193 void ring_buffer_set_clock(struct ring_buffer *buffer, 1194 u64 (*clock)(void)) 1195 { 1196 buffer->clock = clock; 1197 } 1198 1199 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer); 1200 1201 static void 1202 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages) 1203 { 1204 struct buffer_page *bpage; 1205 struct list_head *p; 1206 unsigned i; 1207 1208 spin_lock_irq(&cpu_buffer->reader_lock); 1209 rb_head_page_deactivate(cpu_buffer); 1210 1211 for (i = 0; i < nr_pages; i++) { 1212 if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages))) 1213 return; 1214 p = cpu_buffer->pages->next; 1215 bpage = list_entry(p, struct buffer_page, list); 1216 list_del_init(&bpage->list); 1217 free_buffer_page(bpage); 1218 } 1219 if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages))) 1220 return; 1221 1222 rb_reset_cpu(cpu_buffer); 1223 rb_check_pages(cpu_buffer); 1224 1225 spin_unlock_irq(&cpu_buffer->reader_lock); 1226 } 1227 1228 static void 1229 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer, 1230 struct list_head *pages, unsigned nr_pages) 1231 { 1232 struct buffer_page *bpage; 1233 struct list_head *p; 1234 unsigned i; 1235 1236 spin_lock_irq(&cpu_buffer->reader_lock); 1237 rb_head_page_deactivate(cpu_buffer); 1238 1239 for (i = 0; i < nr_pages; i++) { 1240 if (RB_WARN_ON(cpu_buffer, list_empty(pages))) 1241 return; 1242 p = pages->next; 1243 bpage = list_entry(p, struct buffer_page, list); 1244 list_del_init(&bpage->list); 1245 list_add_tail(&bpage->list, cpu_buffer->pages); 1246 } 1247 rb_reset_cpu(cpu_buffer); 1248 rb_check_pages(cpu_buffer); 1249 1250 spin_unlock_irq(&cpu_buffer->reader_lock); 1251 } 1252 1253 /** 1254 * ring_buffer_resize - resize the ring buffer 1255 * @buffer: the buffer to resize. 1256 * @size: the new size. 1257 * 1258 * Minimum size is 2 * BUF_PAGE_SIZE. 1259 * 1260 * Returns -1 on failure. 1261 */ 1262 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) 1263 { 1264 struct ring_buffer_per_cpu *cpu_buffer; 1265 unsigned nr_pages, rm_pages, new_pages; 1266 struct buffer_page *bpage, *tmp; 1267 unsigned long buffer_size; 1268 unsigned long addr; 1269 LIST_HEAD(pages); 1270 int i, cpu; 1271 1272 /* 1273 * Always succeed at resizing a non-existent buffer: 1274 */ 1275 if (!buffer) 1276 return size; 1277 1278 size = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 1279 size *= BUF_PAGE_SIZE; 1280 buffer_size = buffer->pages * BUF_PAGE_SIZE; 1281 1282 /* we need a minimum of two pages */ 1283 if (size < BUF_PAGE_SIZE * 2) 1284 size = BUF_PAGE_SIZE * 2; 1285 1286 if (size == buffer_size) 1287 return size; 1288 1289 atomic_inc(&buffer->record_disabled); 1290 1291 /* Make sure all writers are done with this buffer. 
*/ 1292 synchronize_sched(); 1293 1294 mutex_lock(&buffer->mutex); 1295 get_online_cpus(); 1296 1297 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 1298 1299 if (size < buffer_size) { 1300 1301 /* easy case, just free pages */ 1302 if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) 1303 goto out_fail; 1304 1305 rm_pages = buffer->pages - nr_pages; 1306 1307 for_each_buffer_cpu(buffer, cpu) { 1308 cpu_buffer = buffer->buffers[cpu]; 1309 rb_remove_pages(cpu_buffer, rm_pages); 1310 } 1311 goto out; 1312 } 1313 1314 /* 1315 * This is a bit more difficult. We only want to add pages 1316 * when we can allocate enough for all CPUs. We do this 1317 * by allocating all the pages and storing them on a local 1318 * link list. If we succeed in our allocation, then we 1319 * add these pages to the cpu_buffers. Otherwise we just free 1320 * them all and return -ENOMEM; 1321 */ 1322 if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) 1323 goto out_fail; 1324 1325 new_pages = nr_pages - buffer->pages; 1326 1327 for_each_buffer_cpu(buffer, cpu) { 1328 for (i = 0; i < new_pages; i++) { 1329 bpage = kzalloc_node(ALIGN(sizeof(*bpage), 1330 cache_line_size()), 1331 GFP_KERNEL, cpu_to_node(cpu)); 1332 if (!bpage) 1333 goto free_pages; 1334 list_add(&bpage->list, &pages); 1335 addr = __get_free_page(GFP_KERNEL); 1336 if (!addr) 1337 goto free_pages; 1338 bpage->page = (void *)addr; 1339 rb_init_page(bpage->page); 1340 } 1341 } 1342 1343 for_each_buffer_cpu(buffer, cpu) { 1344 cpu_buffer = buffer->buffers[cpu]; 1345 rb_insert_pages(cpu_buffer, &pages, new_pages); 1346 } 1347 1348 if (RB_WARN_ON(buffer, !list_empty(&pages))) 1349 goto out_fail; 1350 1351 out: 1352 buffer->pages = nr_pages; 1353 put_online_cpus(); 1354 mutex_unlock(&buffer->mutex); 1355 1356 atomic_dec(&buffer->record_disabled); 1357 1358 return size; 1359 1360 free_pages: 1361 list_for_each_entry_safe(bpage, tmp, &pages, list) { 1362 list_del_init(&bpage->list); 1363 free_buffer_page(bpage); 1364 } 1365 put_online_cpus(); 1366 mutex_unlock(&buffer->mutex); 1367 atomic_dec(&buffer->record_disabled); 1368 return -ENOMEM; 1369 1370 /* 1371 * Something went totally wrong, and we are too paranoid 1372 * to even clean up the mess. 
1373 */ 1374 out_fail: 1375 put_online_cpus(); 1376 mutex_unlock(&buffer->mutex); 1377 atomic_dec(&buffer->record_disabled); 1378 return -1; 1379 } 1380 EXPORT_SYMBOL_GPL(ring_buffer_resize); 1381 1382 static inline void * 1383 __rb_data_page_index(struct buffer_data_page *bpage, unsigned index) 1384 { 1385 return bpage->data + index; 1386 } 1387 1388 static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 1389 { 1390 return bpage->page->data + index; 1391 } 1392 1393 static inline struct ring_buffer_event * 1394 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 1395 { 1396 return __rb_page_index(cpu_buffer->reader_page, 1397 cpu_buffer->reader_page->read); 1398 } 1399 1400 static inline struct ring_buffer_event * 1401 rb_iter_head_event(struct ring_buffer_iter *iter) 1402 { 1403 return __rb_page_index(iter->head_page, iter->head); 1404 } 1405 1406 static inline unsigned long rb_page_write(struct buffer_page *bpage) 1407 { 1408 return local_read(&bpage->write) & RB_WRITE_MASK; 1409 } 1410 1411 static inline unsigned rb_page_commit(struct buffer_page *bpage) 1412 { 1413 return local_read(&bpage->page->commit); 1414 } 1415 1416 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 1417 { 1418 return local_read(&bpage->entries) & RB_WRITE_MASK; 1419 } 1420 1421 /* Size is determined by what has been commited */ 1422 static inline unsigned rb_page_size(struct buffer_page *bpage) 1423 { 1424 return rb_page_commit(bpage); 1425 } 1426 1427 static inline unsigned 1428 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 1429 { 1430 return rb_page_commit(cpu_buffer->commit_page); 1431 } 1432 1433 static inline unsigned 1434 rb_event_index(struct ring_buffer_event *event) 1435 { 1436 unsigned long addr = (unsigned long)event; 1437 1438 return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE; 1439 } 1440 1441 static inline int 1442 rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer, 1443 struct ring_buffer_event *event) 1444 { 1445 unsigned long addr = (unsigned long)event; 1446 unsigned long index; 1447 1448 index = rb_event_index(event); 1449 addr &= PAGE_MASK; 1450 1451 return cpu_buffer->commit_page->page == (void *)addr && 1452 rb_commit_index(cpu_buffer) == index; 1453 } 1454 1455 static void 1456 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 1457 { 1458 unsigned long max_count; 1459 1460 /* 1461 * We only race with interrupts and NMIs on this CPU. 1462 * If we own the commit event, then we can commit 1463 * all others that interrupted us, since the interruptions 1464 * are in stack format (they finish before they come 1465 * back to us). This allows us to do a simple loop to 1466 * assign the commit to the tail. 
1467 */ 1468 again: 1469 max_count = cpu_buffer->buffer->pages * 100; 1470 1471 while (cpu_buffer->commit_page != cpu_buffer->tail_page) { 1472 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 1473 return; 1474 if (RB_WARN_ON(cpu_buffer, 1475 rb_is_reader_page(cpu_buffer->tail_page))) 1476 return; 1477 local_set(&cpu_buffer->commit_page->page->commit, 1478 rb_page_write(cpu_buffer->commit_page)); 1479 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page); 1480 cpu_buffer->write_stamp = 1481 cpu_buffer->commit_page->page->time_stamp; 1482 /* add barrier to keep gcc from optimizing too much */ 1483 barrier(); 1484 } 1485 while (rb_commit_index(cpu_buffer) != 1486 rb_page_write(cpu_buffer->commit_page)) { 1487 1488 local_set(&cpu_buffer->commit_page->page->commit, 1489 rb_page_write(cpu_buffer->commit_page)); 1490 RB_WARN_ON(cpu_buffer, 1491 local_read(&cpu_buffer->commit_page->page->commit) & 1492 ~RB_WRITE_MASK); 1493 barrier(); 1494 } 1495 1496 /* again, keep gcc from optimizing */ 1497 barrier(); 1498 1499 /* 1500 * If an interrupt came in just after the first while loop 1501 * and pushed the tail page forward, we will be left with 1502 * a dangling commit that will never go forward. 1503 */ 1504 if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page)) 1505 goto again; 1506 } 1507 1508 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 1509 { 1510 cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp; 1511 cpu_buffer->reader_page->read = 0; 1512 } 1513 1514 static void rb_inc_iter(struct ring_buffer_iter *iter) 1515 { 1516 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 1517 1518 /* 1519 * The iterator could be on the reader page (it starts there). 1520 * But the head could have moved, since the reader was 1521 * found. Check for this case and assign the iterator 1522 * to the head page instead of next. 1523 */ 1524 if (iter->head_page == cpu_buffer->reader_page) 1525 iter->head_page = rb_set_head_page(cpu_buffer); 1526 else 1527 rb_inc_page(cpu_buffer, &iter->head_page); 1528 1529 iter->read_stamp = iter->head_page->page->time_stamp; 1530 iter->head = 0; 1531 } 1532 1533 /** 1534 * ring_buffer_update_event - update event type and data 1535 * @event: the even to update 1536 * @type: the type of event 1537 * @length: the size of the event field in the ring buffer 1538 * 1539 * Update the type and data fields of the event. The length 1540 * is the actual size that is written to the ring buffer, 1541 * and with this, we can determine what to place into the 1542 * data field. 
1543 */ 1544 static void 1545 rb_update_event(struct ring_buffer_event *event, 1546 unsigned type, unsigned length) 1547 { 1548 event->type_len = type; 1549 1550 switch (type) { 1551 1552 case RINGBUF_TYPE_PADDING: 1553 case RINGBUF_TYPE_TIME_EXTEND: 1554 case RINGBUF_TYPE_TIME_STAMP: 1555 break; 1556 1557 case 0: 1558 length -= RB_EVNT_HDR_SIZE; 1559 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 1560 event->array[0] = length; 1561 else 1562 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 1563 break; 1564 default: 1565 BUG(); 1566 } 1567 } 1568 1569 /* 1570 * rb_handle_head_page - writer hit the head page 1571 * 1572 * Returns: +1 to retry page 1573 * 0 to continue 1574 * -1 on error 1575 */ 1576 static int 1577 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 1578 struct buffer_page *tail_page, 1579 struct buffer_page *next_page) 1580 { 1581 struct buffer_page *new_head; 1582 int entries; 1583 int type; 1584 int ret; 1585 1586 entries = rb_page_entries(next_page); 1587 1588 /* 1589 * The hard part is here. We need to move the head 1590 * forward, and protect against both readers on 1591 * other CPUs and writers coming in via interrupts. 1592 */ 1593 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 1594 RB_PAGE_HEAD); 1595 1596 /* 1597 * type can be one of four: 1598 * NORMAL - an interrupt already moved it for us 1599 * HEAD - we are the first to get here. 1600 * UPDATE - we are the interrupt interrupting 1601 * a current move. 1602 * MOVED - a reader on another CPU moved the next 1603 * pointer to its reader page. Give up 1604 * and try again. 1605 */ 1606 1607 switch (type) { 1608 case RB_PAGE_HEAD: 1609 /* 1610 * We changed the head to UPDATE, thus 1611 * it is our responsibility to update 1612 * the counters. 1613 */ 1614 local_add(entries, &cpu_buffer->overrun); 1615 1616 /* 1617 * The entries will be zeroed out when we move the 1618 * tail page. 1619 */ 1620 1621 /* still more to do */ 1622 break; 1623 1624 case RB_PAGE_UPDATE: 1625 /* 1626 * This is an interrupt that interrupt the 1627 * previous update. Still more to do. 1628 */ 1629 break; 1630 case RB_PAGE_NORMAL: 1631 /* 1632 * An interrupt came in before the update 1633 * and processed this for us. 1634 * Nothing left to do. 1635 */ 1636 return 1; 1637 case RB_PAGE_MOVED: 1638 /* 1639 * The reader is on another CPU and just did 1640 * a swap with our next_page. 1641 * Try again. 1642 */ 1643 return 1; 1644 default: 1645 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 1646 return -1; 1647 } 1648 1649 /* 1650 * Now that we are here, the old head pointer is 1651 * set to UPDATE. This will keep the reader from 1652 * swapping the head page with the reader page. 1653 * The reader (on another CPU) will spin till 1654 * we are finished. 1655 * 1656 * We just need to protect against interrupts 1657 * doing the job. We will set the next pointer 1658 * to HEAD. After that, we set the old pointer 1659 * to NORMAL, but only if it was HEAD before. 1660 * otherwise we are an interrupt, and only 1661 * want the outer most commit to reset it. 1662 */ 1663 new_head = next_page; 1664 rb_inc_page(cpu_buffer, &new_head); 1665 1666 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 1667 RB_PAGE_NORMAL); 1668 1669 /* 1670 * Valid returns are: 1671 * HEAD - an interrupt came in and already set it. 1672 * NORMAL - One of two things: 1673 * 1) We really set it. 1674 * 2) A bunch of interrupts came in and moved 1675 * the page forward again. 
1676 */ 1677 switch (ret) { 1678 case RB_PAGE_HEAD: 1679 case RB_PAGE_NORMAL: 1680 /* OK */ 1681 break; 1682 default: 1683 RB_WARN_ON(cpu_buffer, 1); 1684 return -1; 1685 } 1686 1687 /* 1688 * It is possible that an interrupt came in, 1689 * set the head up, then more interrupts came in 1690 * and moved it again. When we get back here, 1691 * the page would have been set to NORMAL but we 1692 * just set it back to HEAD. 1693 * 1694 * How do you detect this? Well, if that happened 1695 * the tail page would have moved. 1696 */ 1697 if (ret == RB_PAGE_NORMAL) { 1698 /* 1699 * If the tail had moved passed next, then we need 1700 * to reset the pointer. 1701 */ 1702 if (cpu_buffer->tail_page != tail_page && 1703 cpu_buffer->tail_page != next_page) 1704 rb_head_page_set_normal(cpu_buffer, new_head, 1705 next_page, 1706 RB_PAGE_HEAD); 1707 } 1708 1709 /* 1710 * If this was the outer most commit (the one that 1711 * changed the original pointer from HEAD to UPDATE), 1712 * then it is up to us to reset it to NORMAL. 1713 */ 1714 if (type == RB_PAGE_HEAD) { 1715 ret = rb_head_page_set_normal(cpu_buffer, next_page, 1716 tail_page, 1717 RB_PAGE_UPDATE); 1718 if (RB_WARN_ON(cpu_buffer, 1719 ret != RB_PAGE_UPDATE)) 1720 return -1; 1721 } 1722 1723 return 0; 1724 } 1725 1726 static unsigned rb_calculate_event_length(unsigned length) 1727 { 1728 struct ring_buffer_event event; /* Used only for sizeof array */ 1729 1730 /* zero length can cause confusions */ 1731 if (!length) 1732 length = 1; 1733 1734 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 1735 length += sizeof(event.array[0]); 1736 1737 length += RB_EVNT_HDR_SIZE; 1738 length = ALIGN(length, RB_ARCH_ALIGNMENT); 1739 1740 return length; 1741 } 1742 1743 static inline void 1744 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 1745 struct buffer_page *tail_page, 1746 unsigned long tail, unsigned long length) 1747 { 1748 struct ring_buffer_event *event; 1749 1750 /* 1751 * Only the event that crossed the page boundary 1752 * must fill the old tail_page with padding. 1753 */ 1754 if (tail >= BUF_PAGE_SIZE) { 1755 local_sub(length, &tail_page->write); 1756 return; 1757 } 1758 1759 event = __rb_page_index(tail_page, tail); 1760 kmemcheck_annotate_bitfield(event, bitfield); 1761 1762 /* 1763 * If this event is bigger than the minimum size, then 1764 * we need to be careful that we don't subtract the 1765 * write counter enough to allow another writer to slip 1766 * in on this page. 1767 * We put in a discarded commit instead, to make sure 1768 * that this space is not used again. 1769 * 1770 * If we are less than the minimum size, we don't need to 1771 * worry about it. 
1772 */ 1773 if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) { 1774 /* No room for any events */ 1775 1776 /* Mark the rest of the page with padding */ 1777 rb_event_set_padding(event); 1778 1779 /* Set the write back to the previous setting */ 1780 local_sub(length, &tail_page->write); 1781 return; 1782 } 1783 1784 /* Put in a discarded event */ 1785 event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE; 1786 event->type_len = RINGBUF_TYPE_PADDING; 1787 /* time delta must be non zero */ 1788 event->time_delta = 1; 1789 1790 /* Set write to end of buffer */ 1791 length = (tail + length) - BUF_PAGE_SIZE; 1792 local_sub(length, &tail_page->write); 1793 } 1794 1795 static struct ring_buffer_event * 1796 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 1797 unsigned long length, unsigned long tail, 1798 struct buffer_page *tail_page, u64 *ts) 1799 { 1800 struct buffer_page *commit_page = cpu_buffer->commit_page; 1801 struct ring_buffer *buffer = cpu_buffer->buffer; 1802 struct buffer_page *next_page; 1803 int ret; 1804 1805 next_page = tail_page; 1806 1807 rb_inc_page(cpu_buffer, &next_page); 1808 1809 /* 1810 * If for some reason, we had an interrupt storm that made 1811 * it all the way around the buffer, bail, and warn 1812 * about it. 1813 */ 1814 if (unlikely(next_page == commit_page)) { 1815 local_inc(&cpu_buffer->commit_overrun); 1816 goto out_reset; 1817 } 1818 1819 /* 1820 * This is where the fun begins! 1821 * 1822 * We are fighting against races between a reader that 1823 * could be on another CPU trying to swap its reader 1824 * page with the buffer head. 1825 * 1826 * We are also fighting against interrupts coming in and 1827 * moving the head or tail on us as well. 1828 * 1829 * If the next page is the head page then we have filled 1830 * the buffer, unless the commit page is still on the 1831 * reader page. 1832 */ 1833 if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) { 1834 1835 /* 1836 * If the commit is not on the reader page, then 1837 * move the header page. 1838 */ 1839 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 1840 /* 1841 * If we are not in overwrite mode, 1842 * this is easy, just stop here. 1843 */ 1844 if (!(buffer->flags & RB_FL_OVERWRITE)) 1845 goto out_reset; 1846 1847 ret = rb_handle_head_page(cpu_buffer, 1848 tail_page, 1849 next_page); 1850 if (ret < 0) 1851 goto out_reset; 1852 if (ret) 1853 goto out_again; 1854 } else { 1855 /* 1856 * We need to be careful here too. The 1857 * commit page could still be on the reader 1858 * page. We could have a small buffer, and 1859 * have filled up the buffer with events 1860 * from interrupts and such, and wrapped. 1861 * 1862 * Note, if the tail page is also the on the 1863 * reader_page, we let it move out. 
1864 */ 1865 if (unlikely((cpu_buffer->commit_page != 1866 cpu_buffer->tail_page) && 1867 (cpu_buffer->commit_page == 1868 cpu_buffer->reader_page))) { 1869 local_inc(&cpu_buffer->commit_overrun); 1870 goto out_reset; 1871 } 1872 } 1873 } 1874 1875 ret = rb_tail_page_update(cpu_buffer, tail_page, next_page); 1876 if (ret) { 1877 /* 1878 * Nested commits always have zero deltas, so 1879 * just reread the time stamp 1880 */ 1881 *ts = rb_time_stamp(buffer); 1882 next_page->page->time_stamp = *ts; 1883 } 1884 1885 out_again: 1886 1887 rb_reset_tail(cpu_buffer, tail_page, tail, length); 1888 1889 /* fail and let the caller try again */ 1890 return ERR_PTR(-EAGAIN); 1891 1892 out_reset: 1893 /* reset write */ 1894 rb_reset_tail(cpu_buffer, tail_page, tail, length); 1895 1896 return NULL; 1897 } 1898 1899 static struct ring_buffer_event * 1900 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 1901 unsigned type, unsigned long length, u64 *ts) 1902 { 1903 struct buffer_page *tail_page; 1904 struct ring_buffer_event *event; 1905 unsigned long tail, write; 1906 1907 tail_page = cpu_buffer->tail_page; 1908 write = local_add_return(length, &tail_page->write); 1909 1910 /* set write to only the index of the write */ 1911 write &= RB_WRITE_MASK; 1912 tail = write - length; 1913 1914 /* See if we shot pass the end of this buffer page */ 1915 if (write > BUF_PAGE_SIZE) 1916 return rb_move_tail(cpu_buffer, length, tail, 1917 tail_page, ts); 1918 1919 /* We reserved something on the buffer */ 1920 1921 event = __rb_page_index(tail_page, tail); 1922 kmemcheck_annotate_bitfield(event, bitfield); 1923 rb_update_event(event, type, length); 1924 1925 /* The passed in type is zero for DATA */ 1926 if (likely(!type)) 1927 local_inc(&tail_page->entries); 1928 1929 /* 1930 * If this is the first commit on the page, then update 1931 * its timestamp. 1932 */ 1933 if (!tail) 1934 tail_page->page->time_stamp = *ts; 1935 1936 return event; 1937 } 1938 1939 static inline int 1940 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 1941 struct ring_buffer_event *event) 1942 { 1943 unsigned long new_index, old_index; 1944 struct buffer_page *bpage; 1945 unsigned long index; 1946 unsigned long addr; 1947 1948 new_index = rb_event_index(event); 1949 old_index = new_index + rb_event_length(event); 1950 addr = (unsigned long)event; 1951 addr &= PAGE_MASK; 1952 1953 bpage = cpu_buffer->tail_page; 1954 1955 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 1956 unsigned long write_mask = 1957 local_read(&bpage->write) & ~RB_WRITE_MASK; 1958 /* 1959 * This is on the tail page. It is possible that 1960 * a write could come in and move the tail page 1961 * and write to the next page. That is fine 1962 * because we just shorten what is on this page. 1963 */ 1964 old_index += write_mask; 1965 new_index += write_mask; 1966 index = local_cmpxchg(&bpage->write, old_index, new_index); 1967 if (index == old_index) 1968 return 1; 1969 } 1970 1971 /* could not discard */ 1972 return 0; 1973 } 1974 1975 static int 1976 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 1977 u64 *ts, u64 *delta) 1978 { 1979 struct ring_buffer_event *event; 1980 static int once; 1981 int ret; 1982 1983 if (unlikely(*delta > (1ULL << 59) && !once++)) { 1984 printk(KERN_WARNING "Delta way too big! 
%llu" 1985 " ts=%llu write stamp = %llu\n", 1986 (unsigned long long)*delta, 1987 (unsigned long long)*ts, 1988 (unsigned long long)cpu_buffer->write_stamp); 1989 WARN_ON(1); 1990 } 1991 1992 /* 1993 * The delta is too big, we to add a 1994 * new timestamp. 1995 */ 1996 event = __rb_reserve_next(cpu_buffer, 1997 RINGBUF_TYPE_TIME_EXTEND, 1998 RB_LEN_TIME_EXTEND, 1999 ts); 2000 if (!event) 2001 return -EBUSY; 2002 2003 if (PTR_ERR(event) == -EAGAIN) 2004 return -EAGAIN; 2005 2006 /* Only a commited time event can update the write stamp */ 2007 if (rb_event_is_commit(cpu_buffer, event)) { 2008 /* 2009 * If this is the first on the page, then it was 2010 * updated with the page itself. Try to discard it 2011 * and if we can't just make it zero. 2012 */ 2013 if (rb_event_index(event)) { 2014 event->time_delta = *delta & TS_MASK; 2015 event->array[0] = *delta >> TS_SHIFT; 2016 } else { 2017 /* try to discard, since we do not need this */ 2018 if (!rb_try_to_discard(cpu_buffer, event)) { 2019 /* nope, just zero it */ 2020 event->time_delta = 0; 2021 event->array[0] = 0; 2022 } 2023 } 2024 cpu_buffer->write_stamp = *ts; 2025 /* let the caller know this was the commit */ 2026 ret = 1; 2027 } else { 2028 /* Try to discard the event */ 2029 if (!rb_try_to_discard(cpu_buffer, event)) { 2030 /* Darn, this is just wasted space */ 2031 event->time_delta = 0; 2032 event->array[0] = 0; 2033 } 2034 ret = 0; 2035 } 2036 2037 *delta = 0; 2038 2039 return ret; 2040 } 2041 2042 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 2043 { 2044 local_inc(&cpu_buffer->committing); 2045 local_inc(&cpu_buffer->commits); 2046 } 2047 2048 static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 2049 { 2050 unsigned long commits; 2051 2052 if (RB_WARN_ON(cpu_buffer, 2053 !local_read(&cpu_buffer->committing))) 2054 return; 2055 2056 again: 2057 commits = local_read(&cpu_buffer->commits); 2058 /* synchronize with interrupts */ 2059 barrier(); 2060 if (local_read(&cpu_buffer->committing) == 1) 2061 rb_set_commit_to_write(cpu_buffer); 2062 2063 local_dec(&cpu_buffer->committing); 2064 2065 /* synchronize with interrupts */ 2066 barrier(); 2067 2068 /* 2069 * Need to account for interrupts coming in between the 2070 * updating of the commit page and the clearing of the 2071 * committing counter. 2072 */ 2073 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 2074 !local_read(&cpu_buffer->committing)) { 2075 local_inc(&cpu_buffer->committing); 2076 goto again; 2077 } 2078 } 2079 2080 static struct ring_buffer_event * 2081 rb_reserve_next_event(struct ring_buffer *buffer, 2082 struct ring_buffer_per_cpu *cpu_buffer, 2083 unsigned long length) 2084 { 2085 struct ring_buffer_event *event; 2086 u64 ts, delta = 0; 2087 int commit = 0; 2088 int nr_loops = 0; 2089 2090 rb_start_commit(cpu_buffer); 2091 2092 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 2093 /* 2094 * Due to the ability to swap a cpu buffer from a buffer 2095 * it is possible it was swapped before we committed. 2096 * (committing stops a swap). We check for it here and 2097 * if it happened, we have to fail the write. 2098 */ 2099 barrier(); 2100 if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) { 2101 local_dec(&cpu_buffer->committing); 2102 local_dec(&cpu_buffer->commits); 2103 return NULL; 2104 } 2105 #endif 2106 2107 length = rb_calculate_event_length(length); 2108 again: 2109 /* 2110 * We allow for interrupts to reenter here and do a trace. 2111 * If one does, it will cause this original code to loop 2112 * back here. 
Even with heavy interrupts happening, this 2113 * should only happen a few times in a row. If this happens 2114 * 1000 times in a row, there must be either an interrupt 2115 * storm or we have something buggy. 2116 * Bail! 2117 */ 2118 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 2119 goto out_fail; 2120 2121 ts = rb_time_stamp(cpu_buffer->buffer); 2122 2123 /* 2124 * Only the first commit can update the timestamp. 2125 * Yes there is a race here. If an interrupt comes in 2126 * just after the conditional and it traces too, then it 2127 * will also check the deltas. More than one timestamp may 2128 * also be made. But only the entry that did the actual 2129 * commit will be something other than zero. 2130 */ 2131 if (likely(cpu_buffer->tail_page == cpu_buffer->commit_page && 2132 rb_page_write(cpu_buffer->tail_page) == 2133 rb_commit_index(cpu_buffer))) { 2134 u64 diff; 2135 2136 diff = ts - cpu_buffer->write_stamp; 2137 2138 /* make sure this diff is calculated here */ 2139 barrier(); 2140 2141 /* Did the write stamp get updated already? */ 2142 if (unlikely(ts < cpu_buffer->write_stamp)) 2143 goto get_event; 2144 2145 delta = diff; 2146 if (unlikely(test_time_stamp(delta))) { 2147 2148 commit = rb_add_time_stamp(cpu_buffer, &ts, &delta); 2149 if (commit == -EBUSY) 2150 goto out_fail; 2151 2152 if (commit == -EAGAIN) 2153 goto again; 2154 2155 RB_WARN_ON(cpu_buffer, commit < 0); 2156 } 2157 } 2158 2159 get_event: 2160 event = __rb_reserve_next(cpu_buffer, 0, length, &ts); 2161 if (unlikely(PTR_ERR(event) == -EAGAIN)) 2162 goto again; 2163 2164 if (!event) 2165 goto out_fail; 2166 2167 if (!rb_event_is_commit(cpu_buffer, event)) 2168 delta = 0; 2169 2170 event->time_delta = delta; 2171 2172 return event; 2173 2174 out_fail: 2175 rb_end_commit(cpu_buffer); 2176 return NULL; 2177 } 2178 2179 #ifdef CONFIG_TRACING 2180 2181 #define TRACE_RECURSIVE_DEPTH 16 2182 2183 static int trace_recursive_lock(void) 2184 { 2185 current->trace_recursion++; 2186 2187 if (likely(current->trace_recursion < TRACE_RECURSIVE_DEPTH)) 2188 return 0; 2189 2190 /* Disable all tracing before we do anything else */ 2191 tracing_off_permanent(); 2192 2193 printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:" 2194 "HC[%lu]:SC[%lu]:NMI[%lu]\n", 2195 current->trace_recursion, 2196 hardirq_count() >> HARDIRQ_SHIFT, 2197 softirq_count() >> SOFTIRQ_SHIFT, 2198 in_nmi()); 2199 2200 WARN_ON_ONCE(1); 2201 return -1; 2202 } 2203 2204 static void trace_recursive_unlock(void) 2205 { 2206 WARN_ON_ONCE(!current->trace_recursion); 2207 2208 current->trace_recursion--; 2209 } 2210 2211 #else 2212 2213 #define trace_recursive_lock() (0) 2214 #define trace_recursive_unlock() do { } while (0) 2215 2216 #endif 2217 2218 static DEFINE_PER_CPU(int, rb_need_resched); 2219 2220 /** 2221 * ring_buffer_lock_reserve - reserve a part of the buffer 2222 * @buffer: the ring buffer to reserve from 2223 * @length: the length of the data to reserve (excluding event header) 2224 * 2225 * Returns a reseverd event on the ring buffer to copy directly to. 2226 * The user of this interface will need to get the body to write into 2227 * and can use the ring_buffer_event_data() interface. 2228 * 2229 * The length is the length of the data needed, not the event length 2230 * which also includes the event header. 2231 * 2232 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 2233 * If NULL is returned, then nothing has been allocated or locked. 
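 *
 * A minimal sketch of the intended usage (my_buffer, my_data and
 * my_len are illustrative names, not identifiers from this file):
 *
 *	event = ring_buffer_lock_reserve(my_buffer, my_len);
 *	if (event) {
 *		memcpy(ring_buffer_event_data(event), my_data, my_len);
 *		ring_buffer_unlock_commit(my_buffer, event);
 *	}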
2234 */ 2235 struct ring_buffer_event * 2236 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length) 2237 { 2238 struct ring_buffer_per_cpu *cpu_buffer; 2239 struct ring_buffer_event *event; 2240 int cpu, resched; 2241 2242 if (ring_buffer_flags != RB_BUFFERS_ON) 2243 return NULL; 2244 2245 /* If we are tracing schedule, we don't want to recurse */ 2246 resched = ftrace_preempt_disable(); 2247 2248 if (atomic_read(&buffer->record_disabled)) 2249 goto out_nocheck; 2250 2251 if (trace_recursive_lock()) 2252 goto out_nocheck; 2253 2254 cpu = raw_smp_processor_id(); 2255 2256 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2257 goto out; 2258 2259 cpu_buffer = buffer->buffers[cpu]; 2260 2261 if (atomic_read(&cpu_buffer->record_disabled)) 2262 goto out; 2263 2264 if (length > BUF_MAX_DATA_SIZE) 2265 goto out; 2266 2267 event = rb_reserve_next_event(buffer, cpu_buffer, length); 2268 if (!event) 2269 goto out; 2270 2271 /* 2272 * Need to store resched state on this cpu. 2273 * Only the first needs to. 2274 */ 2275 2276 if (preempt_count() == 1) 2277 per_cpu(rb_need_resched, cpu) = resched; 2278 2279 return event; 2280 2281 out: 2282 trace_recursive_unlock(); 2283 2284 out_nocheck: 2285 ftrace_preempt_enable(resched); 2286 return NULL; 2287 } 2288 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 2289 2290 static void 2291 rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2292 struct ring_buffer_event *event) 2293 { 2294 /* 2295 * The event first in the commit queue updates the 2296 * time stamp. 2297 */ 2298 if (rb_event_is_commit(cpu_buffer, event)) 2299 cpu_buffer->write_stamp += event->time_delta; 2300 } 2301 2302 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, 2303 struct ring_buffer_event *event) 2304 { 2305 local_inc(&cpu_buffer->entries); 2306 rb_update_write_stamp(cpu_buffer, event); 2307 rb_end_commit(cpu_buffer); 2308 } 2309 2310 /** 2311 * ring_buffer_unlock_commit - commit a reserved 2312 * @buffer: The buffer to commit to 2313 * @event: The event pointer to commit. 2314 * 2315 * This commits the data to the ring buffer, and releases any locks held. 2316 * 2317 * Must be paired with ring_buffer_lock_reserve. 2318 */ 2319 int ring_buffer_unlock_commit(struct ring_buffer *buffer, 2320 struct ring_buffer_event *event) 2321 { 2322 struct ring_buffer_per_cpu *cpu_buffer; 2323 int cpu = raw_smp_processor_id(); 2324 2325 cpu_buffer = buffer->buffers[cpu]; 2326 2327 rb_commit(cpu_buffer, event); 2328 2329 trace_recursive_unlock(); 2330 2331 /* 2332 * Only the last preempt count needs to restore preemption. 2333 */ 2334 if (preempt_count() == 1) 2335 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu)); 2336 else 2337 preempt_enable_no_resched_notrace(); 2338 2339 return 0; 2340 } 2341 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 2342 2343 static inline void rb_event_discard(struct ring_buffer_event *event) 2344 { 2345 /* array[0] holds the actual length for the discarded event */ 2346 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 2347 event->type_len = RINGBUF_TYPE_PADDING; 2348 /* time delta must be non zero */ 2349 if (!event->time_delta) 2350 event->time_delta = 1; 2351 } 2352 2353 /* 2354 * Decrement the entries to the page that an event is on. 2355 * The event does not even need to exist, only the pointer 2356 * to the page it is on. This may only be called before the commit 2357 * takes place. 
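 * It is used by ring_buffer_discard_commit() below to keep the
 * per page entry count accurate when an event is removed.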
2358 */ 2359 static inline void 2360 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 2361 struct ring_buffer_event *event) 2362 { 2363 unsigned long addr = (unsigned long)event; 2364 struct buffer_page *bpage = cpu_buffer->commit_page; 2365 struct buffer_page *start; 2366 2367 addr &= PAGE_MASK; 2368 2369 /* Do the likely case first */ 2370 if (likely(bpage->page == (void *)addr)) { 2371 local_dec(&bpage->entries); 2372 return; 2373 } 2374 2375 /* 2376 * Because the commit page may be on the reader page, we 2377 * start with the next page and check for the end of the loop there. 2378 */ 2379 rb_inc_page(cpu_buffer, &bpage); 2380 start = bpage; 2381 do { 2382 if (bpage->page == (void *)addr) { 2383 local_dec(&bpage->entries); 2384 return; 2385 } 2386 rb_inc_page(cpu_buffer, &bpage); 2387 } while (bpage != start); 2388 2389 /* commit not part of this buffer?? */ 2390 RB_WARN_ON(cpu_buffer, 1); 2391 } 2392 2393 /** 2394 * ring_buffer_discard_commit - discard an event that has not been committed 2395 * @buffer: the ring buffer 2396 * @event: non-committed event to discard 2397 * 2398 * Sometimes an event that is in the ring buffer needs to be ignored. 2399 * This function lets the user discard an event in the ring buffer 2400 * and then that event will not be read later. 2401 * 2402 * This function only works if it is called before the item has been 2403 * committed. It will try to free the event from the ring buffer 2404 * if another event has not been added behind it. 2405 * 2406 * If another event has been added behind it, it will set the event 2407 * up as discarded, and perform the commit. 2408 * 2409 * If this function is called, do not call ring_buffer_unlock_commit on 2410 * the event. 2411 */ 2412 void ring_buffer_discard_commit(struct ring_buffer *buffer, 2413 struct ring_buffer_event *event) 2414 { 2415 struct ring_buffer_per_cpu *cpu_buffer; 2416 int cpu; 2417 2418 /* The event is discarded regardless */ 2419 rb_event_discard(event); 2420 2421 cpu = smp_processor_id(); 2422 cpu_buffer = buffer->buffers[cpu]; 2423 2424 /* 2425 * This must only be called if the event has not been 2426 * committed yet. Thus we can assume that preemption 2427 * is still disabled. 2428 */ 2429 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 2430 2431 rb_decrement_entry(cpu_buffer, event); 2432 if (rb_try_to_discard(cpu_buffer, event)) 2433 goto out; 2434 2435 /* 2436 * The commit is still visible to the reader, so we 2437 * must still update the timestamp. 2438 */ 2439 rb_update_write_stamp(cpu_buffer, event); 2440 out: 2441 rb_end_commit(cpu_buffer); 2442 2443 trace_recursive_unlock(); 2444 2445 /* 2446 * Only the last preempt count needs to restore preemption. 2447 */ 2448 if (preempt_count() == 1) 2449 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu)); 2450 else 2451 preempt_enable_no_resched_notrace(); 2452 2453 } 2454 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 2455 2456 /** 2457 * ring_buffer_write - write data to the buffer without reserving 2458 * @buffer: The ring buffer to write to. 2459 * @length: The length of the data being written (excluding the event header) 2460 * @data: The data to write to the buffer. 2461 * 2462 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 2463 * one function. If you already have the data to write to the buffer, it 2464 * may be easier to simply call this function. 2465 * 2466 * Note, like ring_buffer_lock_reserve, the length is the length of the data 2467 * and not the length of the event which would hold the header.
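 *
 * A minimal usage sketch (my_buffer and my_record are illustrative
 * names):
 *
 *	if (ring_buffer_write(my_buffer, sizeof(my_record), &my_record))
 *		pr_debug("ring buffer write failed\n");
 *
 * Returns 0 if the data was copied and committed, -EBUSY otherwise.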
2468 */ 2469 int ring_buffer_write(struct ring_buffer *buffer, 2470 unsigned long length, 2471 void *data) 2472 { 2473 struct ring_buffer_per_cpu *cpu_buffer; 2474 struct ring_buffer_event *event; 2475 void *body; 2476 int ret = -EBUSY; 2477 int cpu, resched; 2478 2479 if (ring_buffer_flags != RB_BUFFERS_ON) 2480 return -EBUSY; 2481 2482 resched = ftrace_preempt_disable(); 2483 2484 if (atomic_read(&buffer->record_disabled)) 2485 goto out; 2486 2487 cpu = raw_smp_processor_id(); 2488 2489 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2490 goto out; 2491 2492 cpu_buffer = buffer->buffers[cpu]; 2493 2494 if (atomic_read(&cpu_buffer->record_disabled)) 2495 goto out; 2496 2497 if (length > BUF_MAX_DATA_SIZE) 2498 goto out; 2499 2500 event = rb_reserve_next_event(buffer, cpu_buffer, length); 2501 if (!event) 2502 goto out; 2503 2504 body = rb_event_data(event); 2505 2506 memcpy(body, data, length); 2507 2508 rb_commit(cpu_buffer, event); 2509 2510 ret = 0; 2511 out: 2512 ftrace_preempt_enable(resched); 2513 2514 return ret; 2515 } 2516 EXPORT_SYMBOL_GPL(ring_buffer_write); 2517 2518 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 2519 { 2520 struct buffer_page *reader = cpu_buffer->reader_page; 2521 struct buffer_page *head = rb_set_head_page(cpu_buffer); 2522 struct buffer_page *commit = cpu_buffer->commit_page; 2523 2524 /* In case of error, head will be NULL */ 2525 if (unlikely(!head)) 2526 return 1; 2527 2528 return reader->read == rb_page_commit(reader) && 2529 (commit == reader || 2530 (commit == head && 2531 head->read == rb_page_commit(commit))); 2532 } 2533 2534 /** 2535 * ring_buffer_record_disable - stop all writes into the buffer 2536 * @buffer: The ring buffer to stop writes to. 2537 * 2538 * This prevents all writes to the buffer. Any attempt to write 2539 * to the buffer after this will fail and return NULL. 2540 * 2541 * The caller should call synchronize_sched() after this. 2542 */ 2543 void ring_buffer_record_disable(struct ring_buffer *buffer) 2544 { 2545 atomic_inc(&buffer->record_disabled); 2546 } 2547 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 2548 2549 /** 2550 * ring_buffer_record_enable - enable writes to the buffer 2551 * @buffer: The ring buffer to enable writes 2552 * 2553 * Note, multiple disables will need the same number of enables 2554 * to truly enable the writing (much like preempt_disable). 2555 */ 2556 void ring_buffer_record_enable(struct ring_buffer *buffer) 2557 { 2558 atomic_dec(&buffer->record_disabled); 2559 } 2560 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 2561 2562 /** 2563 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 2564 * @buffer: The ring buffer to stop writes to. 2565 * @cpu: The CPU buffer to stop 2566 * 2567 * This prevents all writes to the buffer. Any attempt to write 2568 * to the buffer after this will fail and return NULL. 2569 * 2570 * The caller should call synchronize_sched() after this. 2571 */ 2572 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu) 2573 { 2574 struct ring_buffer_per_cpu *cpu_buffer; 2575 2576 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2577 return; 2578 2579 cpu_buffer = buffer->buffers[cpu]; 2580 atomic_inc(&cpu_buffer->record_disabled); 2581 } 2582 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 2583 2584 /** 2585 * ring_buffer_record_enable_cpu - enable writes to the buffer 2586 * @buffer: The ring buffer to enable writes 2587 * @cpu: The CPU to enable. 
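 *
 * A sketch of the pairing with ring_buffer_record_disable_cpu()
 * (my_buffer and the read step are only illustrative):
 *
 *	ring_buffer_record_disable_cpu(my_buffer, cpu);
 *	synchronize_sched();
 *	... read the cpu buffer without new events coming in ...
 *	ring_buffer_record_enable_cpu(my_buffer, cpu);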
2588 * 2589 * Note, multiple disables will need the same number of enables 2590 * to truly enable the writing (much like preempt_disable). 2591 */ 2592 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu) 2593 { 2594 struct ring_buffer_per_cpu *cpu_buffer; 2595 2596 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2597 return; 2598 2599 cpu_buffer = buffer->buffers[cpu]; 2600 atomic_dec(&cpu_buffer->record_disabled); 2601 } 2602 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 2603 2604 /** 2605 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 2606 * @buffer: The ring buffer 2607 * @cpu: The per CPU buffer to get the entries from. 2608 */ 2609 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu) 2610 { 2611 struct ring_buffer_per_cpu *cpu_buffer; 2612 unsigned long ret; 2613 2614 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2615 return 0; 2616 2617 cpu_buffer = buffer->buffers[cpu]; 2618 ret = (local_read(&cpu_buffer->entries) - local_read(&cpu_buffer->overrun)) 2619 - cpu_buffer->read; 2620 2621 return ret; 2622 } 2623 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 2624 2625 /** 2626 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer 2627 * @buffer: The ring buffer 2628 * @cpu: The per CPU buffer to get the number of overruns from 2629 */ 2630 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu) 2631 { 2632 struct ring_buffer_per_cpu *cpu_buffer; 2633 unsigned long ret; 2634 2635 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2636 return 0; 2637 2638 cpu_buffer = buffer->buffers[cpu]; 2639 ret = local_read(&cpu_buffer->overrun); 2640 2641 return ret; 2642 } 2643 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 2644 2645 /** 2646 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits 2647 * @buffer: The ring buffer 2648 * @cpu: The per CPU buffer to get the number of overruns from 2649 */ 2650 unsigned long 2651 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu) 2652 { 2653 struct ring_buffer_per_cpu *cpu_buffer; 2654 unsigned long ret; 2655 2656 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2657 return 0; 2658 2659 cpu_buffer = buffer->buffers[cpu]; 2660 ret = local_read(&cpu_buffer->commit_overrun); 2661 2662 return ret; 2663 } 2664 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 2665 2666 /** 2667 * ring_buffer_entries - get the number of entries in a buffer 2668 * @buffer: The ring buffer 2669 * 2670 * Returns the total number of entries in the ring buffer 2671 * (all CPU entries) 2672 */ 2673 unsigned long ring_buffer_entries(struct ring_buffer *buffer) 2674 { 2675 struct ring_buffer_per_cpu *cpu_buffer; 2676 unsigned long entries = 0; 2677 int cpu; 2678 2679 /* if you care about this being correct, lock the buffer */ 2680 for_each_buffer_cpu(buffer, cpu) { 2681 cpu_buffer = buffer->buffers[cpu]; 2682 entries += (local_read(&cpu_buffer->entries) - 2683 local_read(&cpu_buffer->overrun)) - cpu_buffer->read; 2684 } 2685 2686 return entries; 2687 } 2688 EXPORT_SYMBOL_GPL(ring_buffer_entries); 2689 2690 /** 2691 * ring_buffer_overruns - get the number of overruns in buffer 2692 * @buffer: The ring buffer 2693 * 2694 * Returns the total number of overruns in the ring buffer 2695 * (all CPU entries) 2696 */ 2697 unsigned long ring_buffer_overruns(struct ring_buffer *buffer) 2698 { 2699 struct ring_buffer_per_cpu *cpu_buffer; 2700 unsigned long overruns = 0; 2701 int cpu; 2702 2703 /* if you care about this being correct, lock the buffer */ 2704 
for_each_buffer_cpu(buffer, cpu) { 2705 cpu_buffer = buffer->buffers[cpu]; 2706 overruns += local_read(&cpu_buffer->overrun); 2707 } 2708 2709 return overruns; 2710 } 2711 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 2712 2713 static void rb_iter_reset(struct ring_buffer_iter *iter) 2714 { 2715 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2716 2717 /* Iterator usage is expected to have record disabled */ 2718 if (list_empty(&cpu_buffer->reader_page->list)) { 2719 iter->head_page = rb_set_head_page(cpu_buffer); 2720 if (unlikely(!iter->head_page)) 2721 return; 2722 iter->head = iter->head_page->read; 2723 } else { 2724 iter->head_page = cpu_buffer->reader_page; 2725 iter->head = cpu_buffer->reader_page->read; 2726 } 2727 if (iter->head) 2728 iter->read_stamp = cpu_buffer->read_stamp; 2729 else 2730 iter->read_stamp = iter->head_page->page->time_stamp; 2731 iter->cache_reader_page = cpu_buffer->reader_page; 2732 iter->cache_read = cpu_buffer->read; 2733 } 2734 2735 /** 2736 * ring_buffer_iter_reset - reset an iterator 2737 * @iter: The iterator to reset 2738 * 2739 * Resets the iterator, so that it will start from the beginning 2740 * again. 2741 */ 2742 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 2743 { 2744 struct ring_buffer_per_cpu *cpu_buffer; 2745 unsigned long flags; 2746 2747 if (!iter) 2748 return; 2749 2750 cpu_buffer = iter->cpu_buffer; 2751 2752 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2753 rb_iter_reset(iter); 2754 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2755 } 2756 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 2757 2758 /** 2759 * ring_buffer_iter_empty - check if an iterator has no more to read 2760 * @iter: The iterator to check 2761 */ 2762 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 2763 { 2764 struct ring_buffer_per_cpu *cpu_buffer; 2765 2766 cpu_buffer = iter->cpu_buffer; 2767 2768 return iter->head_page == cpu_buffer->commit_page && 2769 iter->head == rb_commit_index(cpu_buffer); 2770 } 2771 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 2772 2773 static void 2774 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2775 struct ring_buffer_event *event) 2776 { 2777 u64 delta; 2778 2779 switch (event->type_len) { 2780 case RINGBUF_TYPE_PADDING: 2781 return; 2782 2783 case RINGBUF_TYPE_TIME_EXTEND: 2784 delta = event->array[0]; 2785 delta <<= TS_SHIFT; 2786 delta += event->time_delta; 2787 cpu_buffer->read_stamp += delta; 2788 return; 2789 2790 case RINGBUF_TYPE_TIME_STAMP: 2791 /* FIXME: not implemented */ 2792 return; 2793 2794 case RINGBUF_TYPE_DATA: 2795 cpu_buffer->read_stamp += event->time_delta; 2796 return; 2797 2798 default: 2799 BUG(); 2800 } 2801 return; 2802 } 2803 2804 static void 2805 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 2806 struct ring_buffer_event *event) 2807 { 2808 u64 delta; 2809 2810 switch (event->type_len) { 2811 case RINGBUF_TYPE_PADDING: 2812 return; 2813 2814 case RINGBUF_TYPE_TIME_EXTEND: 2815 delta = event->array[0]; 2816 delta <<= TS_SHIFT; 2817 delta += event->time_delta; 2818 iter->read_stamp += delta; 2819 return; 2820 2821 case RINGBUF_TYPE_TIME_STAMP: 2822 /* FIXME: not implemented */ 2823 return; 2824 2825 case RINGBUF_TYPE_DATA: 2826 iter->read_stamp += event->time_delta; 2827 return; 2828 2829 default: 2830 BUG(); 2831 } 2832 return; 2833 } 2834 2835 static struct buffer_page * 2836 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 2837 { 2838 struct buffer_page *reader = NULL; 2839 unsigned long flags; 2840 int nr_loops = 0; 2841 int ret; 
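	/*
	 * Swap the (empty) reader page with the current head page:
	 * the reader page is spliced into the list in place of the
	 * head, and the old head becomes the new reader page. The
	 * spin label below retries the swap if a writer moves the
	 * head page underneath us.
	 */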
2842 2843 local_irq_save(flags); 2844 arch_spin_lock(&cpu_buffer->lock); 2845 2846 again: 2847 /* 2848 * This should normally only loop twice. But because the 2849 * start of the reader inserts an empty page, it causes 2850 * a case where we will loop three times. There should be no 2851 * reason to loop four times (that I know of). 2852 */ 2853 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 2854 reader = NULL; 2855 goto out; 2856 } 2857 2858 reader = cpu_buffer->reader_page; 2859 2860 /* If there's more to read, return this page */ 2861 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 2862 goto out; 2863 2864 /* Never should we have an index greater than the size */ 2865 if (RB_WARN_ON(cpu_buffer, 2866 cpu_buffer->reader_page->read > rb_page_size(reader))) 2867 goto out; 2868 2869 /* check if we caught up to the tail */ 2870 reader = NULL; 2871 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 2872 goto out; 2873 2874 /* 2875 * Reset the reader page to size zero. 2876 */ 2877 local_set(&cpu_buffer->reader_page->write, 0); 2878 local_set(&cpu_buffer->reader_page->entries, 0); 2879 local_set(&cpu_buffer->reader_page->page->commit, 0); 2880 2881 spin: 2882 /* 2883 * Splice the empty reader page into the list around the head. 2884 */ 2885 reader = rb_set_head_page(cpu_buffer); 2886 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 2887 cpu_buffer->reader_page->list.prev = reader->list.prev; 2888 2889 /* 2890 * cpu_buffer->pages just needs to point to the buffer, it 2891 * has no specific buffer page to point to. Let's move it out 2892 * of our way so we don't accidentally swap it. 2893 */ 2894 cpu_buffer->pages = reader->list.prev; 2895 2896 /* The reader page will be pointing to the new head */ 2897 rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list); 2898 2899 /* 2900 * Here's the tricky part. 2901 * 2902 * We need to move the pointer past the header page. 2903 * But we can only do that if a writer is not currently 2904 * moving it. The page before the header page has the 2905 * flag bit '1' set if it is pointing to the page we want, 2906 * but if the writer is in the process of moving it 2907 * then it will be '2', or '0' if it has already been moved. 2908 */ 2909 2910 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 2911 2912 /* 2913 * If we did not convert it, then we must try again. 2914 */ 2915 if (!ret) 2916 goto spin; 2917 2918 /* 2919 * Yeah! We succeeded in replacing the page. 2920 * 2921 * Now make the new head point back to the reader page.
2922 */ 2923 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 2924 rb_inc_page(cpu_buffer, &cpu_buffer->head_page); 2925 2926 /* Finally update the reader page to the new head */ 2927 cpu_buffer->reader_page = reader; 2928 rb_reset_reader_page(cpu_buffer); 2929 2930 goto again; 2931 2932 out: 2933 arch_spin_unlock(&cpu_buffer->lock); 2934 local_irq_restore(flags); 2935 2936 return reader; 2937 } 2938 2939 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 2940 { 2941 struct ring_buffer_event *event; 2942 struct buffer_page *reader; 2943 unsigned length; 2944 2945 reader = rb_get_reader_page(cpu_buffer); 2946 2947 /* This function should not be called when buffer is empty */ 2948 if (RB_WARN_ON(cpu_buffer, !reader)) 2949 return; 2950 2951 event = rb_reader_event(cpu_buffer); 2952 2953 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 2954 cpu_buffer->read++; 2955 2956 rb_update_read_stamp(cpu_buffer, event); 2957 2958 length = rb_event_length(event); 2959 cpu_buffer->reader_page->read += length; 2960 } 2961 2962 static void rb_advance_iter(struct ring_buffer_iter *iter) 2963 { 2964 struct ring_buffer *buffer; 2965 struct ring_buffer_per_cpu *cpu_buffer; 2966 struct ring_buffer_event *event; 2967 unsigned length; 2968 2969 cpu_buffer = iter->cpu_buffer; 2970 buffer = cpu_buffer->buffer; 2971 2972 /* 2973 * Check if we are at the end of the buffer. 2974 */ 2975 if (iter->head >= rb_page_size(iter->head_page)) { 2976 /* discarded commits can make the page empty */ 2977 if (iter->head_page == cpu_buffer->commit_page) 2978 return; 2979 rb_inc_iter(iter); 2980 return; 2981 } 2982 2983 event = rb_iter_head_event(iter); 2984 2985 length = rb_event_length(event); 2986 2987 /* 2988 * This should not be called to advance the header if we are 2989 * at the tail of the buffer. 2990 */ 2991 if (RB_WARN_ON(cpu_buffer, 2992 (iter->head_page == cpu_buffer->commit_page) && 2993 (iter->head + length > rb_commit_index(cpu_buffer)))) 2994 return; 2995 2996 rb_update_iter_read_stamp(iter, event); 2997 2998 iter->head += length; 2999 3000 /* check for end of page padding */ 3001 if ((iter->head >= rb_page_size(iter->head_page)) && 3002 (iter->head_page != cpu_buffer->commit_page)) 3003 rb_advance_iter(iter); 3004 } 3005 3006 static struct ring_buffer_event * 3007 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts) 3008 { 3009 struct ring_buffer_event *event; 3010 struct buffer_page *reader; 3011 int nr_loops = 0; 3012 3013 again: 3014 /* 3015 * We repeat when a timestamp is encountered. It is possible 3016 * to get multiple timestamps from an interrupt entering just 3017 * as one timestamp is about to be written, or from discarded 3018 * commits. The most that we can have is the number on a single page. 3019 */ 3020 if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE)) 3021 return NULL; 3022 3023 reader = rb_get_reader_page(cpu_buffer); 3024 if (!reader) 3025 return NULL; 3026 3027 event = rb_reader_event(cpu_buffer); 3028 3029 switch (event->type_len) { 3030 case RINGBUF_TYPE_PADDING: 3031 if (rb_null_event(event)) 3032 RB_WARN_ON(cpu_buffer, 1); 3033 /* 3034 * Because the writer could be discarding every 3035 * event it creates (which would probably be bad) 3036 * if we were to go back to "again" then we may never 3037 * catch up, and will trigger the warn on, or lock 3038 * the box. Return the padding, and we will release 3039 * the current locks, and try again. 
3040 */ 3041 return event; 3042 3043 case RINGBUF_TYPE_TIME_EXTEND: 3044 /* Internal data, OK to advance */ 3045 rb_advance_reader(cpu_buffer); 3046 goto again; 3047 3048 case RINGBUF_TYPE_TIME_STAMP: 3049 /* FIXME: not implemented */ 3050 rb_advance_reader(cpu_buffer); 3051 goto again; 3052 3053 case RINGBUF_TYPE_DATA: 3054 if (ts) { 3055 *ts = cpu_buffer->read_stamp + event->time_delta; 3056 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 3057 cpu_buffer->cpu, ts); 3058 } 3059 return event; 3060 3061 default: 3062 BUG(); 3063 } 3064 3065 return NULL; 3066 } 3067 EXPORT_SYMBOL_GPL(ring_buffer_peek); 3068 3069 static struct ring_buffer_event * 3070 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 3071 { 3072 struct ring_buffer *buffer; 3073 struct ring_buffer_per_cpu *cpu_buffer; 3074 struct ring_buffer_event *event; 3075 int nr_loops = 0; 3076 3077 cpu_buffer = iter->cpu_buffer; 3078 buffer = cpu_buffer->buffer; 3079 3080 /* 3081 * Check if someone performed a consuming read to 3082 * the buffer. A consuming read invalidates the iterator 3083 * and we need to reset the iterator in this case. 3084 */ 3085 if (unlikely(iter->cache_read != cpu_buffer->read || 3086 iter->cache_reader_page != cpu_buffer->reader_page)) 3087 rb_iter_reset(iter); 3088 3089 again: 3090 if (ring_buffer_iter_empty(iter)) 3091 return NULL; 3092 3093 /* 3094 * We repeat when a timestamp is encountered. 3095 * We can get multiple timestamps by nested interrupts or also 3096 * if filtering is on (discarding commits). Since discarding 3097 * commits can be frequent we can get a lot of timestamps. 3098 * But we limit them by not adding timestamps if they begin 3099 * at the start of a page. 3100 */ 3101 if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE)) 3102 return NULL; 3103 3104 if (rb_per_cpu_empty(cpu_buffer)) 3105 return NULL; 3106 3107 if (iter->head >= local_read(&iter->head_page->page->commit)) { 3108 rb_inc_iter(iter); 3109 goto again; 3110 } 3111 3112 event = rb_iter_head_event(iter); 3113 3114 switch (event->type_len) { 3115 case RINGBUF_TYPE_PADDING: 3116 if (rb_null_event(event)) { 3117 rb_inc_iter(iter); 3118 goto again; 3119 } 3120 rb_advance_iter(iter); 3121 return event; 3122 3123 case RINGBUF_TYPE_TIME_EXTEND: 3124 /* Internal data, OK to advance */ 3125 rb_advance_iter(iter); 3126 goto again; 3127 3128 case RINGBUF_TYPE_TIME_STAMP: 3129 /* FIXME: not implemented */ 3130 rb_advance_iter(iter); 3131 goto again; 3132 3133 case RINGBUF_TYPE_DATA: 3134 if (ts) { 3135 *ts = iter->read_stamp + event->time_delta; 3136 ring_buffer_normalize_time_stamp(buffer, 3137 cpu_buffer->cpu, ts); 3138 } 3139 return event; 3140 3141 default: 3142 BUG(); 3143 } 3144 3145 return NULL; 3146 } 3147 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 3148 3149 static inline int rb_ok_to_lock(void) 3150 { 3151 /* 3152 * If an NMI die dumps out the content of the ring buffer 3153 * do not grab locks. We also permanently disable the ring 3154 * buffer too. A one time deal is all you get from reading 3155 * the ring buffer from an NMI. 3156 */ 3157 if (likely(!in_nmi())) 3158 return 1; 3159 3160 tracing_off_permanent(); 3161 return 0; 3162 } 3163 3164 /** 3165 * ring_buffer_peek - peek at the next event to be read 3166 * @buffer: The ring buffer to read 3167 * @cpu: The cpu to peak at 3168 * @ts: The timestamp counter of this event. 3169 * 3170 * This will return the event that will be read next, but does 3171 * not consume the data. 
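 *
 * A rough sketch of how a reader might use it (my_buffer is an
 * illustrative name):
 *
 *	u64 ts;
 *
 *	if (ring_buffer_peek(my_buffer, cpu, &ts))
 *		event = ring_buffer_consume(my_buffer, cpu, &ts);
 *
 * Only ring_buffer_consume() actually advances past the event.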
3172 */ 3173 struct ring_buffer_event * 3174 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) 3175 { 3176 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3177 struct ring_buffer_event *event; 3178 unsigned long flags; 3179 int dolock; 3180 3181 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3182 return NULL; 3183 3184 dolock = rb_ok_to_lock(); 3185 again: 3186 local_irq_save(flags); 3187 if (dolock) 3188 spin_lock(&cpu_buffer->reader_lock); 3189 event = rb_buffer_peek(cpu_buffer, ts); 3190 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3191 rb_advance_reader(cpu_buffer); 3192 if (dolock) 3193 spin_unlock(&cpu_buffer->reader_lock); 3194 local_irq_restore(flags); 3195 3196 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3197 goto again; 3198 3199 return event; 3200 } 3201 3202 /** 3203 * ring_buffer_iter_peek - peek at the next event to be read 3204 * @iter: The ring buffer iterator 3205 * @ts: The timestamp counter of this event. 3206 * 3207 * This will return the event that will be read next, but does 3208 * not increment the iterator. 3209 */ 3210 struct ring_buffer_event * 3211 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 3212 { 3213 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3214 struct ring_buffer_event *event; 3215 unsigned long flags; 3216 3217 again: 3218 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3219 event = rb_iter_peek(iter, ts); 3220 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3221 3222 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3223 goto again; 3224 3225 return event; 3226 } 3227 3228 /** 3229 * ring_buffer_consume - return an event and consume it 3230 * @buffer: The ring buffer to get the next event from 3231 * 3232 * Returns the next event in the ring buffer, and that event is consumed. 3233 * Meaning, that sequential reads will keep returning a different event, 3234 * and eventually empty the ring buffer if the producer is slower. 3235 */ 3236 struct ring_buffer_event * 3237 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts) 3238 { 3239 struct ring_buffer_per_cpu *cpu_buffer; 3240 struct ring_buffer_event *event = NULL; 3241 unsigned long flags; 3242 int dolock; 3243 3244 dolock = rb_ok_to_lock(); 3245 3246 again: 3247 /* might be called in atomic */ 3248 preempt_disable(); 3249 3250 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3251 goto out; 3252 3253 cpu_buffer = buffer->buffers[cpu]; 3254 local_irq_save(flags); 3255 if (dolock) 3256 spin_lock(&cpu_buffer->reader_lock); 3257 3258 event = rb_buffer_peek(cpu_buffer, ts); 3259 if (event) 3260 rb_advance_reader(cpu_buffer); 3261 3262 if (dolock) 3263 spin_unlock(&cpu_buffer->reader_lock); 3264 local_irq_restore(flags); 3265 3266 out: 3267 preempt_enable(); 3268 3269 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3270 goto again; 3271 3272 return event; 3273 } 3274 EXPORT_SYMBOL_GPL(ring_buffer_consume); 3275 3276 /** 3277 * ring_buffer_read_start - start a non consuming read of the buffer 3278 * @buffer: The ring buffer to read from 3279 * @cpu: The cpu buffer to iterate over 3280 * 3281 * This starts up an iteration through the buffer. It also disables 3282 * the recording to the buffer until the reading is finished. 3283 * This prevents the reading from being corrupted. This is not 3284 * a consuming read, so a producer is not expected. 3285 * 3286 * Must be paired with ring_buffer_finish. 
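 *
 * A minimal sketch of a non consuming read (my_buffer and process()
 * are illustrative names):
 *
 *	iter = ring_buffer_read_start(my_buffer, cpu);
 *	if (iter) {
 *		while ((event = ring_buffer_read(iter, &ts)))
 *			process(event);
 *		ring_buffer_read_finish(iter);
 *	}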
3287 */ 3288 struct ring_buffer_iter * 3289 ring_buffer_read_start(struct ring_buffer *buffer, int cpu) 3290 { 3291 struct ring_buffer_per_cpu *cpu_buffer; 3292 struct ring_buffer_iter *iter; 3293 unsigned long flags; 3294 3295 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3296 return NULL; 3297 3298 iter = kmalloc(sizeof(*iter), GFP_KERNEL); 3299 if (!iter) 3300 return NULL; 3301 3302 cpu_buffer = buffer->buffers[cpu]; 3303 3304 iter->cpu_buffer = cpu_buffer; 3305 3306 atomic_inc(&cpu_buffer->record_disabled); 3307 synchronize_sched(); 3308 3309 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3310 arch_spin_lock(&cpu_buffer->lock); 3311 rb_iter_reset(iter); 3312 arch_spin_unlock(&cpu_buffer->lock); 3313 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3314 3315 return iter; 3316 } 3317 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 3318 3319 /** 3320 * ring_buffer_finish - finish reading the iterator of the buffer 3321 * @iter: The iterator retrieved by ring_buffer_start 3322 * 3323 * This re-enables the recording to the buffer, and frees the 3324 * iterator. 3325 */ 3326 void 3327 ring_buffer_read_finish(struct ring_buffer_iter *iter) 3328 { 3329 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3330 3331 atomic_dec(&cpu_buffer->record_disabled); 3332 kfree(iter); 3333 } 3334 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 3335 3336 /** 3337 * ring_buffer_read - read the next item in the ring buffer by the iterator 3338 * @iter: The ring buffer iterator 3339 * @ts: The time stamp of the event read. 3340 * 3341 * This reads the next event in the ring buffer and increments the iterator. 3342 */ 3343 struct ring_buffer_event * 3344 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts) 3345 { 3346 struct ring_buffer_event *event; 3347 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3348 unsigned long flags; 3349 3350 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3351 again: 3352 event = rb_iter_peek(iter, ts); 3353 if (!event) 3354 goto out; 3355 3356 if (event->type_len == RINGBUF_TYPE_PADDING) 3357 goto again; 3358 3359 rb_advance_iter(iter); 3360 out: 3361 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3362 3363 return event; 3364 } 3365 EXPORT_SYMBOL_GPL(ring_buffer_read); 3366 3367 /** 3368 * ring_buffer_size - return the size of the ring buffer (in bytes) 3369 * @buffer: The ring buffer. 
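 *
 * Note: as computed below, this is BUF_PAGE_SIZE times the number of
 * pages in a single per CPU buffer, not the combined size of all the
 * CPU buffers.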
3370 */ 3371 unsigned long ring_buffer_size(struct ring_buffer *buffer) 3372 { 3373 return BUF_PAGE_SIZE * buffer->pages; 3374 } 3375 EXPORT_SYMBOL_GPL(ring_buffer_size); 3376 3377 static void 3378 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 3379 { 3380 rb_head_page_deactivate(cpu_buffer); 3381 3382 cpu_buffer->head_page 3383 = list_entry(cpu_buffer->pages, struct buffer_page, list); 3384 local_set(&cpu_buffer->head_page->write, 0); 3385 local_set(&cpu_buffer->head_page->entries, 0); 3386 local_set(&cpu_buffer->head_page->page->commit, 0); 3387 3388 cpu_buffer->head_page->read = 0; 3389 3390 cpu_buffer->tail_page = cpu_buffer->head_page; 3391 cpu_buffer->commit_page = cpu_buffer->head_page; 3392 3393 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 3394 local_set(&cpu_buffer->reader_page->write, 0); 3395 local_set(&cpu_buffer->reader_page->entries, 0); 3396 local_set(&cpu_buffer->reader_page->page->commit, 0); 3397 cpu_buffer->reader_page->read = 0; 3398 3399 local_set(&cpu_buffer->commit_overrun, 0); 3400 local_set(&cpu_buffer->overrun, 0); 3401 local_set(&cpu_buffer->entries, 0); 3402 local_set(&cpu_buffer->committing, 0); 3403 local_set(&cpu_buffer->commits, 0); 3404 cpu_buffer->read = 0; 3405 3406 cpu_buffer->write_stamp = 0; 3407 cpu_buffer->read_stamp = 0; 3408 3409 rb_head_page_activate(cpu_buffer); 3410 } 3411 3412 /** 3413 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 3414 * @buffer: The ring buffer to reset a per cpu buffer of 3415 * @cpu: The CPU buffer to be reset 3416 */ 3417 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu) 3418 { 3419 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3420 unsigned long flags; 3421 3422 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3423 return; 3424 3425 atomic_inc(&cpu_buffer->record_disabled); 3426 3427 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3428 3429 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 3430 goto out; 3431 3432 arch_spin_lock(&cpu_buffer->lock); 3433 3434 rb_reset_cpu(cpu_buffer); 3435 3436 arch_spin_unlock(&cpu_buffer->lock); 3437 3438 out: 3439 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3440 3441 atomic_dec(&cpu_buffer->record_disabled); 3442 } 3443 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 3444 3445 /** 3446 * ring_buffer_reset - reset a ring buffer 3447 * @buffer: The ring buffer to reset all cpu buffers 3448 */ 3449 void ring_buffer_reset(struct ring_buffer *buffer) 3450 { 3451 int cpu; 3452 3453 for_each_buffer_cpu(buffer, cpu) 3454 ring_buffer_reset_cpu(buffer, cpu); 3455 } 3456 EXPORT_SYMBOL_GPL(ring_buffer_reset); 3457 3458 /** 3459 * rind_buffer_empty - is the ring buffer empty? 3460 * @buffer: The ring buffer to test 3461 */ 3462 int ring_buffer_empty(struct ring_buffer *buffer) 3463 { 3464 struct ring_buffer_per_cpu *cpu_buffer; 3465 unsigned long flags; 3466 int dolock; 3467 int cpu; 3468 int ret; 3469 3470 dolock = rb_ok_to_lock(); 3471 3472 /* yes this is racy, but if you don't like the race, lock the buffer */ 3473 for_each_buffer_cpu(buffer, cpu) { 3474 cpu_buffer = buffer->buffers[cpu]; 3475 local_irq_save(flags); 3476 if (dolock) 3477 spin_lock(&cpu_buffer->reader_lock); 3478 ret = rb_per_cpu_empty(cpu_buffer); 3479 if (dolock) 3480 spin_unlock(&cpu_buffer->reader_lock); 3481 local_irq_restore(flags); 3482 3483 if (!ret) 3484 return 0; 3485 } 3486 3487 return 1; 3488 } 3489 EXPORT_SYMBOL_GPL(ring_buffer_empty); 3490 3491 /** 3492 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 
3493 * @buffer: The ring buffer 3494 * @cpu: The CPU buffer to test 3495 */ 3496 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu) 3497 { 3498 struct ring_buffer_per_cpu *cpu_buffer; 3499 unsigned long flags; 3500 int dolock; 3501 int ret; 3502 3503 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3504 return 1; 3505 3506 dolock = rb_ok_to_lock(); 3507 3508 cpu_buffer = buffer->buffers[cpu]; 3509 local_irq_save(flags); 3510 if (dolock) 3511 spin_lock(&cpu_buffer->reader_lock); 3512 ret = rb_per_cpu_empty(cpu_buffer); 3513 if (dolock) 3514 spin_unlock(&cpu_buffer->reader_lock); 3515 local_irq_restore(flags); 3516 3517 return ret; 3518 } 3519 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 3520 3521 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 3522 /** 3523 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 3524 * @buffer_a: One buffer to swap with 3525 * @buffer_b: The other buffer to swap with 3526 * 3527 * This function is useful for tracers that want to take a "snapshot" 3528 * of a CPU buffer and has another back up buffer lying around. 3529 * it is expected that the tracer handles the cpu buffer not being 3530 * used at the moment. 3531 */ 3532 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a, 3533 struct ring_buffer *buffer_b, int cpu) 3534 { 3535 struct ring_buffer_per_cpu *cpu_buffer_a; 3536 struct ring_buffer_per_cpu *cpu_buffer_b; 3537 int ret = -EINVAL; 3538 3539 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 3540 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 3541 goto out; 3542 3543 /* At least make sure the two buffers are somewhat the same */ 3544 if (buffer_a->pages != buffer_b->pages) 3545 goto out; 3546 3547 ret = -EAGAIN; 3548 3549 if (ring_buffer_flags != RB_BUFFERS_ON) 3550 goto out; 3551 3552 if (atomic_read(&buffer_a->record_disabled)) 3553 goto out; 3554 3555 if (atomic_read(&buffer_b->record_disabled)) 3556 goto out; 3557 3558 cpu_buffer_a = buffer_a->buffers[cpu]; 3559 cpu_buffer_b = buffer_b->buffers[cpu]; 3560 3561 if (atomic_read(&cpu_buffer_a->record_disabled)) 3562 goto out; 3563 3564 if (atomic_read(&cpu_buffer_b->record_disabled)) 3565 goto out; 3566 3567 /* 3568 * We can't do a synchronize_sched here because this 3569 * function can be called in atomic context. 3570 * Normally this will be called from the same CPU as cpu. 3571 * If not it's up to the caller to protect this. 3572 */ 3573 atomic_inc(&cpu_buffer_a->record_disabled); 3574 atomic_inc(&cpu_buffer_b->record_disabled); 3575 3576 ret = -EBUSY; 3577 if (local_read(&cpu_buffer_a->committing)) 3578 goto out_dec; 3579 if (local_read(&cpu_buffer_b->committing)) 3580 goto out_dec; 3581 3582 buffer_a->buffers[cpu] = cpu_buffer_b; 3583 buffer_b->buffers[cpu] = cpu_buffer_a; 3584 3585 cpu_buffer_b->buffer = buffer_a; 3586 cpu_buffer_a->buffer = buffer_b; 3587 3588 ret = 0; 3589 3590 out_dec: 3591 atomic_dec(&cpu_buffer_a->record_disabled); 3592 atomic_dec(&cpu_buffer_b->record_disabled); 3593 out: 3594 return ret; 3595 } 3596 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 3597 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 3598 3599 /** 3600 * ring_buffer_alloc_read_page - allocate a page to read from buffer 3601 * @buffer: the buffer to allocate for. 3602 * 3603 * This function is used in conjunction with ring_buffer_read_page. 3604 * When reading a full page from the ring buffer, these functions 3605 * can be used to speed up the process. The calling function should 3606 * allocate a few pages first with this function. 
Then when it 3607 * needs to get pages from the ring buffer, it passes the result 3608 * of this function into ring_buffer_read_page, which will swap 3609 * the page that was allocated, with the read page of the buffer. 3610 * 3611 * Returns: 3612 * The page allocated, or NULL on error. 3613 */ 3614 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer) 3615 { 3616 struct buffer_data_page *bpage; 3617 unsigned long addr; 3618 3619 addr = __get_free_page(GFP_KERNEL); 3620 if (!addr) 3621 return NULL; 3622 3623 bpage = (void *)addr; 3624 3625 rb_init_page(bpage); 3626 3627 return bpage; 3628 } 3629 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 3630 3631 /** 3632 * ring_buffer_free_read_page - free an allocated read page 3633 * @buffer: the buffer the page was allocate for 3634 * @data: the page to free 3635 * 3636 * Free a page allocated from ring_buffer_alloc_read_page. 3637 */ 3638 void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data) 3639 { 3640 free_page((unsigned long)data); 3641 } 3642 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 3643 3644 /** 3645 * ring_buffer_read_page - extract a page from the ring buffer 3646 * @buffer: buffer to extract from 3647 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 3648 * @len: amount to extract 3649 * @cpu: the cpu of the buffer to extract 3650 * @full: should the extraction only happen when the page is full. 3651 * 3652 * This function will pull out a page from the ring buffer and consume it. 3653 * @data_page must be the address of the variable that was returned 3654 * from ring_buffer_alloc_read_page. This is because the page might be used 3655 * to swap with a page in the ring buffer. 3656 * 3657 * for example: 3658 * rpage = ring_buffer_alloc_read_page(buffer); 3659 * if (!rpage) 3660 * return error; 3661 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); 3662 * if (ret >= 0) 3663 * process_page(rpage, ret); 3664 * 3665 * When @full is set, the function will not return true unless 3666 * the writer is off the reader page. 3667 * 3668 * Note: it is up to the calling functions to handle sleeps and wakeups. 3669 * The ring buffer can be used anywhere in the kernel and can not 3670 * blindly call wake_up. The layer that uses the ring buffer must be 3671 * responsible for that. 3672 * 3673 * Returns: 3674 * >=0 if data has been transferred, returns the offset of consumed data. 3675 * <0 if no data has been transferred. 3676 */ 3677 int ring_buffer_read_page(struct ring_buffer *buffer, 3678 void **data_page, size_t len, int cpu, int full) 3679 { 3680 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3681 struct ring_buffer_event *event; 3682 struct buffer_data_page *bpage; 3683 struct buffer_page *reader; 3684 unsigned long flags; 3685 unsigned int commit; 3686 unsigned int read; 3687 u64 save_timestamp; 3688 int ret = -1; 3689 3690 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3691 goto out; 3692 3693 /* 3694 * If len is not big enough to hold the page header, then 3695 * we can not copy anything. 
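 * (The header is the time_stamp and commit fields that sit in front
 * of the data array in struct buffer_data_page, so at most
 * len - BUF_PAGE_HDR_SIZE bytes of event data can be copied out.)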
3696 */ 3697 if (len <= BUF_PAGE_HDR_SIZE) 3698 goto out; 3699 3700 len -= BUF_PAGE_HDR_SIZE; 3701 3702 if (!data_page) 3703 goto out; 3704 3705 bpage = *data_page; 3706 if (!bpage) 3707 goto out; 3708 3709 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3710 3711 reader = rb_get_reader_page(cpu_buffer); 3712 if (!reader) 3713 goto out_unlock; 3714 3715 event = rb_reader_event(cpu_buffer); 3716 3717 read = reader->read; 3718 commit = rb_page_commit(reader); 3719 3720 /* 3721 * If this page has been partially read or 3722 * if len is not big enough to read the rest of the page or 3723 * a writer is still on the page, then 3724 * we must copy the data from the page to the buffer. 3725 * Otherwise, we can simply swap the page with the one passed in. 3726 */ 3727 if (read || (len < (commit - read)) || 3728 cpu_buffer->reader_page == cpu_buffer->commit_page) { 3729 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 3730 unsigned int rpos = read; 3731 unsigned int pos = 0; 3732 unsigned int size; 3733 3734 if (full) 3735 goto out_unlock; 3736 3737 if (len > (commit - read)) 3738 len = (commit - read); 3739 3740 size = rb_event_length(event); 3741 3742 if (len < size) 3743 goto out_unlock; 3744 3745 /* save the current timestamp, since the user will need it */ 3746 save_timestamp = cpu_buffer->read_stamp; 3747 3748 /* Need to copy one event at a time */ 3749 do { 3750 memcpy(bpage->data + pos, rpage->data + rpos, size); 3751 3752 len -= size; 3753 3754 rb_advance_reader(cpu_buffer); 3755 rpos = reader->read; 3756 pos += size; 3757 3758 event = rb_reader_event(cpu_buffer); 3759 size = rb_event_length(event); 3760 } while (len > size); 3761 3762 /* update bpage */ 3763 local_set(&bpage->commit, pos); 3764 bpage->time_stamp = save_timestamp; 3765 3766 /* we copied everything to the beginning */ 3767 read = 0; 3768 } else { 3769 /* update the entry counter */ 3770 cpu_buffer->read += rb_page_entries(reader); 3771 3772 /* swap the pages */ 3773 rb_init_page(bpage); 3774 bpage = reader->page; 3775 reader->page = *data_page; 3776 local_set(&reader->write, 0); 3777 local_set(&reader->entries, 0); 3778 reader->read = 0; 3779 *data_page = bpage; 3780 } 3781 ret = read; 3782 3783 out_unlock: 3784 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3785 3786 out: 3787 return ret; 3788 } 3789 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 3790 3791 #ifdef CONFIG_TRACING 3792 static ssize_t 3793 rb_simple_read(struct file *filp, char __user *ubuf, 3794 size_t cnt, loff_t *ppos) 3795 { 3796 unsigned long *p = filp->private_data; 3797 char buf[64]; 3798 int r; 3799 3800 if (test_bit(RB_BUFFERS_DISABLED_BIT, p)) 3801 r = sprintf(buf, "permanently disabled\n"); 3802 else 3803 r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p)); 3804 3805 return simple_read_from_buffer(ubuf, cnt, ppos, buf, r); 3806 } 3807 3808 static ssize_t 3809 rb_simple_write(struct file *filp, const char __user *ubuf, 3810 size_t cnt, loff_t *ppos) 3811 { 3812 unsigned long *p = filp->private_data; 3813 char buf[64]; 3814 unsigned long val; 3815 int ret; 3816 3817 if (cnt >= sizeof(buf)) 3818 return -EINVAL; 3819 3820 if (copy_from_user(&buf, ubuf, cnt)) 3821 return -EFAULT; 3822 3823 buf[cnt] = 0; 3824 3825 ret = strict_strtoul(buf, 10, &val); 3826 if (ret < 0) 3827 return ret; 3828 3829 if (val) 3830 set_bit(RB_BUFFERS_ON_BIT, p); 3831 else 3832 clear_bit(RB_BUFFERS_ON_BIT, p); 3833 3834 (*ppos)++; 3835 3836 return cnt; 3837 } 3838 3839 static const struct file_operations rb_simple_fops = { 3840 .open = 
tracing_open_generic, 3841 .read = rb_simple_read, 3842 .write = rb_simple_write, 3843 }; 3844 3845 3846 static __init int rb_init_debugfs(void) 3847 { 3848 struct dentry *d_tracer; 3849 3850 d_tracer = tracing_init_dentry(); 3851 3852 trace_create_file("tracing_on", 0644, d_tracer, 3853 &ring_buffer_flags, &rb_simple_fops); 3854 3855 return 0; 3856 } 3857 3858 fs_initcall(rb_init_debugfs); 3859 #endif 3860 3861 #ifdef CONFIG_HOTPLUG_CPU 3862 static int rb_cpu_notify(struct notifier_block *self, 3863 unsigned long action, void *hcpu) 3864 { 3865 struct ring_buffer *buffer = 3866 container_of(self, struct ring_buffer, cpu_notify); 3867 long cpu = (long)hcpu; 3868 3869 switch (action) { 3870 case CPU_UP_PREPARE: 3871 case CPU_UP_PREPARE_FROZEN: 3872 if (cpumask_test_cpu(cpu, buffer->cpumask)) 3873 return NOTIFY_OK; 3874 3875 buffer->buffers[cpu] = 3876 rb_allocate_cpu_buffer(buffer, cpu); 3877 if (!buffer->buffers[cpu]) { 3878 WARN(1, "failed to allocate ring buffer on CPU %ld\n", 3879 cpu); 3880 return NOTIFY_OK; 3881 } 3882 smp_wmb(); 3883 cpumask_set_cpu(cpu, buffer->cpumask); 3884 break; 3885 case CPU_DOWN_PREPARE: 3886 case CPU_DOWN_PREPARE_FROZEN: 3887 /* 3888 * Do nothing. 3889 * If we were to free the buffer, then the user would 3890 * lose any trace that was in the buffer. 3891 */ 3892 break; 3893 default: 3894 break; 3895 } 3896 return NOTIFY_OK; 3897 } 3898 #endif 3899
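
/*
 * The "tracing_on" debugfs file created above is the user space view
 * of the global ring_buffer_flags switch. Assuming debugfs is mounted
 * at /sys/kernel/debug, recording can be toggled with something like:
 *
 *	echo 0 > /sys/kernel/debug/tracing/tracing_on
 *	echo 1 > /sys/kernel/debug/tracing/tracing_on
 *
 * Reading the file reports 0, 1 or "permanently disabled", as
 * implemented by rb_simple_read() above.
 */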