/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/ftrace_irq.h>
#include <linux/spinlock.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kmemcheck.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/fs.h>

#include <asm/local.h>
#include "trace.h"

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	int ret;

	ret = trace_seq_printf(s, "# compressed entry header\n");
	ret = trace_seq_printf(s, "\ttype_len : 5 bits\n");
	ret = trace_seq_printf(s, "\ttime_delta : 27 bits\n");
	ret = trace_seq_printf(s, "\tarray : 32 bits\n");
	ret = trace_seq_printf(s, "\n");
	ret = trace_seq_printf(s, "\tpadding : type == %d\n",
			       RINGBUF_TYPE_PADDING);
	ret = trace_seq_printf(s, "\ttime_extend : type == %d\n",
			       RINGBUF_TYPE_TIME_EXTEND);
	ret = trace_seq_printf(s, "\tdata max type_len == %d\n",
			       RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return ret;
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do
 * whatever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

/*
 * A fast way to enable or disable all ring buffers is to
 * call tracing_on or tracing_off. Turning off the ring buffers
 * prevents all ring buffers from being recorded to.
 * Turning this switch on makes it OK to write to the
 * ring buffer, if the ring buffer is enabled itself.
 *
 * There are three layers that must be on in order to write
 * to the ring buffer.
 *
 * 1) This global flag must be set.
 * 2) The ring buffer must be enabled for recording.
 * 3) The per cpu buffer must be enabled for recording.
 *
 * In case of an anomaly, this global flag has a bit set that
 * will permanently disable all ring buffers.
 */

/*
 * Global flag to disable all recording to ring buffers
 * This has two bits: ON, DISABLED
 *
 *   ON    DISABLED
 *  ----  ----------
 *    0       0      : ring buffers are off
 *    1       0      : ring buffers are on
 *    X       1      : ring buffers are permanently disabled
 */

enum {
	RB_BUFFERS_ON_BIT	= 0,
	RB_BUFFERS_DISABLED_BIT	= 1,
};

enum {
	RB_BUFFERS_ON		= 1 << RB_BUFFERS_ON_BIT,
	RB_BUFFERS_DISABLED	= 1 << RB_BUFFERS_DISABLED_BIT,
};

static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

/**
 * tracing_on - enable all tracing buffers
 *
 * This function enables all tracing buffers that may have been
 * disabled with tracing_off.
 */
void tracing_on(void)
{
	set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_on);

/**
 * tracing_off - turn off all tracing buffers
 *
 * This function stops all tracing buffers from recording data.
 * It does not disable any overhead the tracers themselves may
 * be causing. This function simply causes all recording to
 * the ring buffers to fail.
 */
void tracing_off(void)
{
	clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_off);

/**
 * tracing_off_permanent - permanently disable ring buffers
 *
 * This function, once called, will disable all ring buffers
 * permanently.
 */
void tracing_off_permanent(void)
{
	set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
}

/**
 * tracing_is_on - show state of ring buffers enabled
 */
int tracing_is_on(void)
{
	return ring_buffer_flags == RB_BUFFERS_ON;
}
EXPORT_SYMBOL_GPL(tracing_is_on);

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#if !defined(CONFIG_64BIT) || defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS)
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ...
RINGBUF_TYPE_DATA_TYPE_LEN_MAX 221 222 enum { 223 RB_LEN_TIME_EXTEND = 8, 224 RB_LEN_TIME_STAMP = 16, 225 }; 226 227 static inline int rb_null_event(struct ring_buffer_event *event) 228 { 229 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta; 230 } 231 232 static void rb_event_set_padding(struct ring_buffer_event *event) 233 { 234 /* padding has a NULL time_delta */ 235 event->type_len = RINGBUF_TYPE_PADDING; 236 event->time_delta = 0; 237 } 238 239 static unsigned 240 rb_event_data_length(struct ring_buffer_event *event) 241 { 242 unsigned length; 243 244 if (event->type_len) 245 length = event->type_len * RB_ALIGNMENT; 246 else 247 length = event->array[0]; 248 return length + RB_EVNT_HDR_SIZE; 249 } 250 251 /* inline for ring buffer fast paths */ 252 static unsigned 253 rb_event_length(struct ring_buffer_event *event) 254 { 255 switch (event->type_len) { 256 case RINGBUF_TYPE_PADDING: 257 if (rb_null_event(event)) 258 /* undefined */ 259 return -1; 260 return event->array[0] + RB_EVNT_HDR_SIZE; 261 262 case RINGBUF_TYPE_TIME_EXTEND: 263 return RB_LEN_TIME_EXTEND; 264 265 case RINGBUF_TYPE_TIME_STAMP: 266 return RB_LEN_TIME_STAMP; 267 268 case RINGBUF_TYPE_DATA: 269 return rb_event_data_length(event); 270 default: 271 BUG(); 272 } 273 /* not hit */ 274 return 0; 275 } 276 277 /** 278 * ring_buffer_event_length - return the length of the event 279 * @event: the event to get the length of 280 */ 281 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 282 { 283 unsigned length = rb_event_length(event); 284 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 285 return length; 286 length -= RB_EVNT_HDR_SIZE; 287 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 288 length -= sizeof(event->array[0]); 289 return length; 290 } 291 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 292 293 /* inline for ring buffer fast paths */ 294 static void * 295 rb_event_data(struct ring_buffer_event *event) 296 { 297 BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 298 /* If length is in len field, then array[0] has the data */ 299 if (event->type_len) 300 return (void *)&event->array[0]; 301 /* Otherwise length is in array[0] and array[1] has the data */ 302 return (void *)&event->array[1]; 303 } 304 305 /** 306 * ring_buffer_event_data - return the data of the event 307 * @event: the event to get the data from 308 */ 309 void *ring_buffer_event_data(struct ring_buffer_event *event) 310 { 311 return rb_event_data(event); 312 } 313 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 314 315 #define for_each_buffer_cpu(buffer, cpu) \ 316 for_each_cpu(cpu, buffer->cpumask) 317 318 #define TS_SHIFT 27 319 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 320 #define TS_DELTA_TEST (~TS_MASK) 321 322 struct buffer_data_page { 323 u64 time_stamp; /* page time stamp */ 324 local_t commit; /* write committed index */ 325 unsigned char data[]; /* data of buffer page */ 326 }; 327 328 /* 329 * Note, the buffer_page list must be first. The buffer pages 330 * are allocated in cache lines, which means that each buffer 331 * page will be at the beginning of a cache line, and thus 332 * the least significant bits will be zero. We use this to 333 * add flags in the list struct pointers, to make the ring buffer 334 * lockless. 
335 */ 336 struct buffer_page { 337 struct list_head list; /* list of buffer pages */ 338 local_t write; /* index for next write */ 339 unsigned read; /* index for next read */ 340 local_t entries; /* entries on this page */ 341 struct buffer_data_page *page; /* Actual data page */ 342 }; 343 344 /* 345 * The buffer page counters, write and entries, must be reset 346 * atomically when crossing page boundaries. To synchronize this 347 * update, two counters are inserted into the number. One is 348 * the actual counter for the write position or count on the page. 349 * 350 * The other is a counter of updaters. Before an update happens 351 * the update partition of the counter is incremented. This will 352 * allow the updater to update the counter atomically. 353 * 354 * The counter is 20 bits, and the state data is 12. 355 */ 356 #define RB_WRITE_MASK 0xfffff 357 #define RB_WRITE_INTCNT (1 << 20) 358 359 static void rb_init_page(struct buffer_data_page *bpage) 360 { 361 local_set(&bpage->commit, 0); 362 } 363 364 /** 365 * ring_buffer_page_len - the size of data on the page. 366 * @page: The page to read 367 * 368 * Returns the amount of data on the page, including buffer page header. 369 */ 370 size_t ring_buffer_page_len(void *page) 371 { 372 return local_read(&((struct buffer_data_page *)page)->commit) 373 + BUF_PAGE_HDR_SIZE; 374 } 375 376 /* 377 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing 378 * this issue out. 379 */ 380 static void free_buffer_page(struct buffer_page *bpage) 381 { 382 free_page((unsigned long)bpage->page); 383 kfree(bpage); 384 } 385 386 /* 387 * We need to fit the time_stamp delta into 27 bits. 388 */ 389 static inline int test_time_stamp(u64 delta) 390 { 391 if (delta & TS_DELTA_TEST) 392 return 1; 393 return 0; 394 } 395 396 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE) 397 398 /* Max payload is BUF_PAGE_SIZE - header (8bytes) */ 399 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2)) 400 401 /* Max number of timestamps that can fit on a page */ 402 #define RB_TIMESTAMPS_PER_PAGE (BUF_PAGE_SIZE / RB_LEN_TIME_STAMP) 403 404 int ring_buffer_print_page_header(struct trace_seq *s) 405 { 406 struct buffer_data_page field; 407 int ret; 408 409 ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t" 410 "offset:0;\tsize:%u;\tsigned:%u;\n", 411 (unsigned int)sizeof(field.time_stamp), 412 (unsigned int)is_signed_type(u64)); 413 414 ret = trace_seq_printf(s, "\tfield: local_t commit;\t" 415 "offset:%u;\tsize:%u;\tsigned:%u;\n", 416 (unsigned int)offsetof(typeof(field), commit), 417 (unsigned int)sizeof(field.commit), 418 (unsigned int)is_signed_type(long)); 419 420 ret = trace_seq_printf(s, "\tfield: char data;\t" 421 "offset:%u;\tsize:%u;\tsigned:%u;\n", 422 (unsigned int)offsetof(typeof(field), data), 423 (unsigned int)BUF_PAGE_SIZE, 424 (unsigned int)is_signed_type(char)); 425 426 return ret; 427 } 428 429 /* 430 * head_page == tail_page && head == tail then buffer is empty. 
431 */ 432 struct ring_buffer_per_cpu { 433 int cpu; 434 struct ring_buffer *buffer; 435 spinlock_t reader_lock; /* serialize readers */ 436 arch_spinlock_t lock; 437 struct lock_class_key lock_key; 438 struct list_head *pages; 439 struct buffer_page *head_page; /* read from head */ 440 struct buffer_page *tail_page; /* write to tail */ 441 struct buffer_page *commit_page; /* committed pages */ 442 struct buffer_page *reader_page; 443 local_t commit_overrun; 444 local_t overrun; 445 local_t entries; 446 local_t committing; 447 local_t commits; 448 unsigned long read; 449 u64 write_stamp; 450 u64 read_stamp; 451 atomic_t record_disabled; 452 }; 453 454 struct ring_buffer { 455 unsigned pages; 456 unsigned flags; 457 int cpus; 458 atomic_t record_disabled; 459 cpumask_var_t cpumask; 460 461 struct lock_class_key *reader_lock_key; 462 463 struct mutex mutex; 464 465 struct ring_buffer_per_cpu **buffers; 466 467 #ifdef CONFIG_HOTPLUG_CPU 468 struct notifier_block cpu_notify; 469 #endif 470 u64 (*clock)(void); 471 }; 472 473 struct ring_buffer_iter { 474 struct ring_buffer_per_cpu *cpu_buffer; 475 unsigned long head; 476 struct buffer_page *head_page; 477 struct buffer_page *cache_reader_page; 478 unsigned long cache_read; 479 u64 read_stamp; 480 }; 481 482 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 483 #define RB_WARN_ON(b, cond) \ 484 ({ \ 485 int _____ret = unlikely(cond); \ 486 if (_____ret) { \ 487 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \ 488 struct ring_buffer_per_cpu *__b = \ 489 (void *)b; \ 490 atomic_inc(&__b->buffer->record_disabled); \ 491 } else \ 492 atomic_inc(&b->record_disabled); \ 493 WARN_ON(1); \ 494 } \ 495 _____ret; \ 496 }) 497 498 /* Up this if you want to test the TIME_EXTENTS and normalization */ 499 #define DEBUG_SHIFT 0 500 501 static inline u64 rb_time_stamp(struct ring_buffer *buffer) 502 { 503 /* shift to debug/test normalization and TIME_EXTENTS */ 504 return buffer->clock() << DEBUG_SHIFT; 505 } 506 507 u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu) 508 { 509 u64 time; 510 511 preempt_disable_notrace(); 512 time = rb_time_stamp(buffer); 513 preempt_enable_no_resched_notrace(); 514 515 return time; 516 } 517 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 518 519 void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer, 520 int cpu, u64 *ts) 521 { 522 /* Just stupid testing the normalize function and deltas */ 523 *ts >>= DEBUG_SHIFT; 524 } 525 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 526 527 /* 528 * Making the ring buffer lockless makes things tricky. 529 * Although writes only happen on the CPU that they are on, 530 * and they only need to worry about interrupts. Reads can 531 * happen on any CPU. 532 * 533 * The reader page is always off the ring buffer, but when the 534 * reader finishes with a page, it needs to swap its page with 535 * a new one from the buffer. The reader needs to take from 536 * the head (writes go to the tail). But if a writer is in overwrite 537 * mode and wraps, it must push the head page forward. 538 * 539 * Here lies the problem. 540 * 541 * The reader must be careful to replace only the head page, and 542 * not another one. As described at the top of the file in the 543 * ASCII art, the reader sets its old page to point to the next 544 * page after head. It then sets the page after head to point to 545 * the old reader page. But if the writer moves the head page 546 * during this operation, the reader could end up with the tail. 
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 *  head->list->prev->next        bit 1          bit 0
 *                               -------        -------
 *  Normal page                     0              0
 *  Points to head page             0              1
 *  New head page                   1              0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static int rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
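 *
 * Illustrative example (hypothetical address, not from this file): if the
 * head page's list_head sits at 0xffff880012345600, the previous page's
 * ->next is stored as 0xffff880012345601 (RB_PAGE_HEAD in bit 0), and
 * rb_list_head() masks RB_FLAG_MASK back off before the pointer is ever
 * dereferenced.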
654 */ 655 static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer, 656 struct list_head *list) 657 { 658 unsigned long *ptr; 659 660 ptr = (unsigned long *)&list->next; 661 *ptr |= RB_PAGE_HEAD; 662 *ptr &= ~RB_PAGE_UPDATE; 663 } 664 665 /* 666 * rb_head_page_activate - sets up head page 667 */ 668 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer) 669 { 670 struct buffer_page *head; 671 672 head = cpu_buffer->head_page; 673 if (!head) 674 return; 675 676 /* 677 * Set the previous list pointer to have the HEAD flag. 678 */ 679 rb_set_list_to_head(cpu_buffer, head->list.prev); 680 } 681 682 static void rb_list_head_clear(struct list_head *list) 683 { 684 unsigned long *ptr = (unsigned long *)&list->next; 685 686 *ptr &= ~RB_FLAG_MASK; 687 } 688 689 /* 690 * rb_head_page_dactivate - clears head page ptr (for free list) 691 */ 692 static void 693 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer) 694 { 695 struct list_head *hd; 696 697 /* Go through the whole list and clear any pointers found. */ 698 rb_list_head_clear(cpu_buffer->pages); 699 700 list_for_each(hd, cpu_buffer->pages) 701 rb_list_head_clear(hd); 702 } 703 704 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer, 705 struct buffer_page *head, 706 struct buffer_page *prev, 707 int old_flag, int new_flag) 708 { 709 struct list_head *list; 710 unsigned long val = (unsigned long)&head->list; 711 unsigned long ret; 712 713 list = &prev->list; 714 715 val &= ~RB_FLAG_MASK; 716 717 ret = cmpxchg((unsigned long *)&list->next, 718 val | old_flag, val | new_flag); 719 720 /* check if the reader took the page */ 721 if ((ret & ~RB_FLAG_MASK) != val) 722 return RB_PAGE_MOVED; 723 724 return ret & RB_FLAG_MASK; 725 } 726 727 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer, 728 struct buffer_page *head, 729 struct buffer_page *prev, 730 int old_flag) 731 { 732 return rb_head_page_set(cpu_buffer, head, prev, 733 old_flag, RB_PAGE_UPDATE); 734 } 735 736 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer, 737 struct buffer_page *head, 738 struct buffer_page *prev, 739 int old_flag) 740 { 741 return rb_head_page_set(cpu_buffer, head, prev, 742 old_flag, RB_PAGE_HEAD); 743 } 744 745 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer, 746 struct buffer_page *head, 747 struct buffer_page *prev, 748 int old_flag) 749 { 750 return rb_head_page_set(cpu_buffer, head, prev, 751 old_flag, RB_PAGE_NORMAL); 752 } 753 754 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer, 755 struct buffer_page **bpage) 756 { 757 struct list_head *p = rb_list_head((*bpage)->list.next); 758 759 *bpage = list_entry(p, struct buffer_page, list); 760 } 761 762 static struct buffer_page * 763 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer) 764 { 765 struct buffer_page *head; 766 struct buffer_page *page; 767 struct list_head *list; 768 int i; 769 770 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page)) 771 return NULL; 772 773 /* sanity check */ 774 list = cpu_buffer->pages; 775 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list)) 776 return NULL; 777 778 page = head = cpu_buffer->head_page; 779 /* 780 * It is possible that the writer moves the header behind 781 * where we started, and we miss in one loop. 782 * A second loop should grab the header, but we'll do 783 * three loops just because I'm paranoid. 
784 */ 785 for (i = 0; i < 3; i++) { 786 do { 787 if (rb_is_head_page(cpu_buffer, page, page->list.prev)) { 788 cpu_buffer->head_page = page; 789 return page; 790 } 791 rb_inc_page(cpu_buffer, &page); 792 } while (page != head); 793 } 794 795 RB_WARN_ON(cpu_buffer, 1); 796 797 return NULL; 798 } 799 800 static int rb_head_page_replace(struct buffer_page *old, 801 struct buffer_page *new) 802 { 803 unsigned long *ptr = (unsigned long *)&old->list.prev->next; 804 unsigned long val; 805 unsigned long ret; 806 807 val = *ptr & ~RB_FLAG_MASK; 808 val |= RB_PAGE_HEAD; 809 810 ret = cmpxchg(ptr, val, (unsigned long)&new->list); 811 812 return ret == val; 813 } 814 815 /* 816 * rb_tail_page_update - move the tail page forward 817 * 818 * Returns 1 if moved tail page, 0 if someone else did. 819 */ 820 static int rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer, 821 struct buffer_page *tail_page, 822 struct buffer_page *next_page) 823 { 824 struct buffer_page *old_tail; 825 unsigned long old_entries; 826 unsigned long old_write; 827 int ret = 0; 828 829 /* 830 * The tail page now needs to be moved forward. 831 * 832 * We need to reset the tail page, but without messing 833 * with possible erasing of data brought in by interrupts 834 * that have moved the tail page and are currently on it. 835 * 836 * We add a counter to the write field to denote this. 837 */ 838 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write); 839 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries); 840 841 /* 842 * Just make sure we have seen our old_write and synchronize 843 * with any interrupts that come in. 844 */ 845 barrier(); 846 847 /* 848 * If the tail page is still the same as what we think 849 * it is, then it is up to us to update the tail 850 * pointer. 851 */ 852 if (tail_page == cpu_buffer->tail_page) { 853 /* Zero the write counter */ 854 unsigned long val = old_write & ~RB_WRITE_MASK; 855 unsigned long eval = old_entries & ~RB_WRITE_MASK; 856 857 /* 858 * This will only succeed if an interrupt did 859 * not come in and change it. In which case, we 860 * do not want to modify it. 861 * 862 * We add (void) to let the compiler know that we do not care 863 * about the return value of these functions. We use the 864 * cmpxchg to only update if an interrupt did not already 865 * do it for us. If the cmpxchg fails, we don't care. 866 */ 867 (void)local_cmpxchg(&next_page->write, old_write, val); 868 (void)local_cmpxchg(&next_page->entries, old_entries, eval); 869 870 /* 871 * No need to worry about races with clearing out the commit. 872 * it only can increment when a commit takes place. But that 873 * only happens in the outer most nested commit. 
874 */ 875 local_set(&next_page->page->commit, 0); 876 877 old_tail = cmpxchg(&cpu_buffer->tail_page, 878 tail_page, next_page); 879 880 if (old_tail == tail_page) 881 ret = 1; 882 } 883 884 return ret; 885 } 886 887 static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer, 888 struct buffer_page *bpage) 889 { 890 unsigned long val = (unsigned long)bpage; 891 892 if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK)) 893 return 1; 894 895 return 0; 896 } 897 898 /** 899 * rb_check_list - make sure a pointer to a list has the last bits zero 900 */ 901 static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer, 902 struct list_head *list) 903 { 904 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev)) 905 return 1; 906 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next)) 907 return 1; 908 return 0; 909 } 910 911 /** 912 * check_pages - integrity check of buffer pages 913 * @cpu_buffer: CPU buffer with pages to test 914 * 915 * As a safety measure we check to make sure the data pages have not 916 * been corrupted. 917 */ 918 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 919 { 920 struct list_head *head = cpu_buffer->pages; 921 struct buffer_page *bpage, *tmp; 922 923 rb_head_page_deactivate(cpu_buffer); 924 925 if (RB_WARN_ON(cpu_buffer, head->next->prev != head)) 926 return -1; 927 if (RB_WARN_ON(cpu_buffer, head->prev->next != head)) 928 return -1; 929 930 if (rb_check_list(cpu_buffer, head)) 931 return -1; 932 933 list_for_each_entry_safe(bpage, tmp, head, list) { 934 if (RB_WARN_ON(cpu_buffer, 935 bpage->list.next->prev != &bpage->list)) 936 return -1; 937 if (RB_WARN_ON(cpu_buffer, 938 bpage->list.prev->next != &bpage->list)) 939 return -1; 940 if (rb_check_list(cpu_buffer, &bpage->list)) 941 return -1; 942 } 943 944 rb_head_page_activate(cpu_buffer); 945 946 return 0; 947 } 948 949 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 950 unsigned nr_pages) 951 { 952 struct buffer_page *bpage, *tmp; 953 unsigned long addr; 954 LIST_HEAD(pages); 955 unsigned i; 956 957 WARN_ON(!nr_pages); 958 959 for (i = 0; i < nr_pages; i++) { 960 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 961 GFP_KERNEL, cpu_to_node(cpu_buffer->cpu)); 962 if (!bpage) 963 goto free_pages; 964 965 rb_check_bpage(cpu_buffer, bpage); 966 967 list_add(&bpage->list, &pages); 968 969 addr = __get_free_page(GFP_KERNEL); 970 if (!addr) 971 goto free_pages; 972 bpage->page = (void *)addr; 973 rb_init_page(bpage->page); 974 } 975 976 /* 977 * The ring buffer page list is a circular list that does not 978 * start and end with a list head. All page list items point to 979 * other pages. 
980 */ 981 cpu_buffer->pages = pages.next; 982 list_del(&pages); 983 984 rb_check_pages(cpu_buffer); 985 986 return 0; 987 988 free_pages: 989 list_for_each_entry_safe(bpage, tmp, &pages, list) { 990 list_del_init(&bpage->list); 991 free_buffer_page(bpage); 992 } 993 return -ENOMEM; 994 } 995 996 static struct ring_buffer_per_cpu * 997 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu) 998 { 999 struct ring_buffer_per_cpu *cpu_buffer; 1000 struct buffer_page *bpage; 1001 unsigned long addr; 1002 int ret; 1003 1004 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 1005 GFP_KERNEL, cpu_to_node(cpu)); 1006 if (!cpu_buffer) 1007 return NULL; 1008 1009 cpu_buffer->cpu = cpu; 1010 cpu_buffer->buffer = buffer; 1011 spin_lock_init(&cpu_buffer->reader_lock); 1012 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 1013 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 1014 1015 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1016 GFP_KERNEL, cpu_to_node(cpu)); 1017 if (!bpage) 1018 goto fail_free_buffer; 1019 1020 rb_check_bpage(cpu_buffer, bpage); 1021 1022 cpu_buffer->reader_page = bpage; 1023 addr = __get_free_page(GFP_KERNEL); 1024 if (!addr) 1025 goto fail_free_reader; 1026 bpage->page = (void *)addr; 1027 rb_init_page(bpage->page); 1028 1029 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 1030 1031 ret = rb_allocate_pages(cpu_buffer, buffer->pages); 1032 if (ret < 0) 1033 goto fail_free_reader; 1034 1035 cpu_buffer->head_page 1036 = list_entry(cpu_buffer->pages, struct buffer_page, list); 1037 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 1038 1039 rb_head_page_activate(cpu_buffer); 1040 1041 return cpu_buffer; 1042 1043 fail_free_reader: 1044 free_buffer_page(cpu_buffer->reader_page); 1045 1046 fail_free_buffer: 1047 kfree(cpu_buffer); 1048 return NULL; 1049 } 1050 1051 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 1052 { 1053 struct list_head *head = cpu_buffer->pages; 1054 struct buffer_page *bpage, *tmp; 1055 1056 free_buffer_page(cpu_buffer->reader_page); 1057 1058 rb_head_page_deactivate(cpu_buffer); 1059 1060 if (head) { 1061 list_for_each_entry_safe(bpage, tmp, head, list) { 1062 list_del_init(&bpage->list); 1063 free_buffer_page(bpage); 1064 } 1065 bpage = list_entry(head, struct buffer_page, list); 1066 free_buffer_page(bpage); 1067 } 1068 1069 kfree(cpu_buffer); 1070 } 1071 1072 #ifdef CONFIG_HOTPLUG_CPU 1073 static int rb_cpu_notify(struct notifier_block *self, 1074 unsigned long action, void *hcpu); 1075 #endif 1076 1077 /** 1078 * ring_buffer_alloc - allocate a new ring_buffer 1079 * @size: the size in bytes per cpu that is needed. 1080 * @flags: attributes to set for the ring buffer. 1081 * 1082 * Currently the only flag that is available is the RB_FL_OVERWRITE 1083 * flag. This flag means that the buffer will overwrite old data 1084 * when the buffer wraps. If this flag is not set, the buffer will 1085 * drop data when the tail hits the head. 
1086 */ 1087 struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 1088 struct lock_class_key *key) 1089 { 1090 struct ring_buffer *buffer; 1091 int bsize; 1092 int cpu; 1093 1094 /* keep it in its own cache line */ 1095 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 1096 GFP_KERNEL); 1097 if (!buffer) 1098 return NULL; 1099 1100 if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 1101 goto fail_free_buffer; 1102 1103 buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 1104 buffer->flags = flags; 1105 buffer->clock = trace_clock_local; 1106 buffer->reader_lock_key = key; 1107 1108 /* need at least two pages */ 1109 if (buffer->pages < 2) 1110 buffer->pages = 2; 1111 1112 /* 1113 * In case of non-hotplug cpu, if the ring-buffer is allocated 1114 * in early initcall, it will not be notified of secondary cpus. 1115 * In that off case, we need to allocate for all possible cpus. 1116 */ 1117 #ifdef CONFIG_HOTPLUG_CPU 1118 get_online_cpus(); 1119 cpumask_copy(buffer->cpumask, cpu_online_mask); 1120 #else 1121 cpumask_copy(buffer->cpumask, cpu_possible_mask); 1122 #endif 1123 buffer->cpus = nr_cpu_ids; 1124 1125 bsize = sizeof(void *) * nr_cpu_ids; 1126 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 1127 GFP_KERNEL); 1128 if (!buffer->buffers) 1129 goto fail_free_cpumask; 1130 1131 for_each_buffer_cpu(buffer, cpu) { 1132 buffer->buffers[cpu] = 1133 rb_allocate_cpu_buffer(buffer, cpu); 1134 if (!buffer->buffers[cpu]) 1135 goto fail_free_buffers; 1136 } 1137 1138 #ifdef CONFIG_HOTPLUG_CPU 1139 buffer->cpu_notify.notifier_call = rb_cpu_notify; 1140 buffer->cpu_notify.priority = 0; 1141 register_cpu_notifier(&buffer->cpu_notify); 1142 #endif 1143 1144 put_online_cpus(); 1145 mutex_init(&buffer->mutex); 1146 1147 return buffer; 1148 1149 fail_free_buffers: 1150 for_each_buffer_cpu(buffer, cpu) { 1151 if (buffer->buffers[cpu]) 1152 rb_free_cpu_buffer(buffer->buffers[cpu]); 1153 } 1154 kfree(buffer->buffers); 1155 1156 fail_free_cpumask: 1157 free_cpumask_var(buffer->cpumask); 1158 put_online_cpus(); 1159 1160 fail_free_buffer: 1161 kfree(buffer); 1162 return NULL; 1163 } 1164 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 1165 1166 /** 1167 * ring_buffer_free - free a ring buffer. 1168 * @buffer: the buffer to free. 
1169 */ 1170 void 1171 ring_buffer_free(struct ring_buffer *buffer) 1172 { 1173 int cpu; 1174 1175 get_online_cpus(); 1176 1177 #ifdef CONFIG_HOTPLUG_CPU 1178 unregister_cpu_notifier(&buffer->cpu_notify); 1179 #endif 1180 1181 for_each_buffer_cpu(buffer, cpu) 1182 rb_free_cpu_buffer(buffer->buffers[cpu]); 1183 1184 put_online_cpus(); 1185 1186 kfree(buffer->buffers); 1187 free_cpumask_var(buffer->cpumask); 1188 1189 kfree(buffer); 1190 } 1191 EXPORT_SYMBOL_GPL(ring_buffer_free); 1192 1193 void ring_buffer_set_clock(struct ring_buffer *buffer, 1194 u64 (*clock)(void)) 1195 { 1196 buffer->clock = clock; 1197 } 1198 1199 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer); 1200 1201 static void 1202 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages) 1203 { 1204 struct buffer_page *bpage; 1205 struct list_head *p; 1206 unsigned i; 1207 1208 spin_lock_irq(&cpu_buffer->reader_lock); 1209 rb_head_page_deactivate(cpu_buffer); 1210 1211 for (i = 0; i < nr_pages; i++) { 1212 if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages))) 1213 goto out; 1214 p = cpu_buffer->pages->next; 1215 bpage = list_entry(p, struct buffer_page, list); 1216 list_del_init(&bpage->list); 1217 free_buffer_page(bpage); 1218 } 1219 if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages))) 1220 goto out; 1221 1222 rb_reset_cpu(cpu_buffer); 1223 rb_check_pages(cpu_buffer); 1224 1225 out: 1226 spin_unlock_irq(&cpu_buffer->reader_lock); 1227 } 1228 1229 static void 1230 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer, 1231 struct list_head *pages, unsigned nr_pages) 1232 { 1233 struct buffer_page *bpage; 1234 struct list_head *p; 1235 unsigned i; 1236 1237 spin_lock_irq(&cpu_buffer->reader_lock); 1238 rb_head_page_deactivate(cpu_buffer); 1239 1240 for (i = 0; i < nr_pages; i++) { 1241 if (RB_WARN_ON(cpu_buffer, list_empty(pages))) 1242 goto out; 1243 p = pages->next; 1244 bpage = list_entry(p, struct buffer_page, list); 1245 list_del_init(&bpage->list); 1246 list_add_tail(&bpage->list, cpu_buffer->pages); 1247 } 1248 rb_reset_cpu(cpu_buffer); 1249 rb_check_pages(cpu_buffer); 1250 1251 out: 1252 spin_unlock_irq(&cpu_buffer->reader_lock); 1253 } 1254 1255 /** 1256 * ring_buffer_resize - resize the ring buffer 1257 * @buffer: the buffer to resize. 1258 * @size: the new size. 1259 * 1260 * Minimum size is 2 * BUF_PAGE_SIZE. 1261 * 1262 * Returns -1 on failure. 1263 */ 1264 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) 1265 { 1266 struct ring_buffer_per_cpu *cpu_buffer; 1267 unsigned nr_pages, rm_pages, new_pages; 1268 struct buffer_page *bpage, *tmp; 1269 unsigned long buffer_size; 1270 unsigned long addr; 1271 LIST_HEAD(pages); 1272 int i, cpu; 1273 1274 /* 1275 * Always succeed at resizing a non-existent buffer: 1276 */ 1277 if (!buffer) 1278 return size; 1279 1280 size = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 1281 size *= BUF_PAGE_SIZE; 1282 buffer_size = buffer->pages * BUF_PAGE_SIZE; 1283 1284 /* we need a minimum of two pages */ 1285 if (size < BUF_PAGE_SIZE * 2) 1286 size = BUF_PAGE_SIZE * 2; 1287 1288 if (size == buffer_size) 1289 return size; 1290 1291 atomic_inc(&buffer->record_disabled); 1292 1293 /* Make sure all writers are done with this buffer. 
*/ 1294 synchronize_sched(); 1295 1296 mutex_lock(&buffer->mutex); 1297 get_online_cpus(); 1298 1299 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 1300 1301 if (size < buffer_size) { 1302 1303 /* easy case, just free pages */ 1304 if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) 1305 goto out_fail; 1306 1307 rm_pages = buffer->pages - nr_pages; 1308 1309 for_each_buffer_cpu(buffer, cpu) { 1310 cpu_buffer = buffer->buffers[cpu]; 1311 rb_remove_pages(cpu_buffer, rm_pages); 1312 } 1313 goto out; 1314 } 1315 1316 /* 1317 * This is a bit more difficult. We only want to add pages 1318 * when we can allocate enough for all CPUs. We do this 1319 * by allocating all the pages and storing them on a local 1320 * link list. If we succeed in our allocation, then we 1321 * add these pages to the cpu_buffers. Otherwise we just free 1322 * them all and return -ENOMEM; 1323 */ 1324 if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) 1325 goto out_fail; 1326 1327 new_pages = nr_pages - buffer->pages; 1328 1329 for_each_buffer_cpu(buffer, cpu) { 1330 for (i = 0; i < new_pages; i++) { 1331 bpage = kzalloc_node(ALIGN(sizeof(*bpage), 1332 cache_line_size()), 1333 GFP_KERNEL, cpu_to_node(cpu)); 1334 if (!bpage) 1335 goto free_pages; 1336 list_add(&bpage->list, &pages); 1337 addr = __get_free_page(GFP_KERNEL); 1338 if (!addr) 1339 goto free_pages; 1340 bpage->page = (void *)addr; 1341 rb_init_page(bpage->page); 1342 } 1343 } 1344 1345 for_each_buffer_cpu(buffer, cpu) { 1346 cpu_buffer = buffer->buffers[cpu]; 1347 rb_insert_pages(cpu_buffer, &pages, new_pages); 1348 } 1349 1350 if (RB_WARN_ON(buffer, !list_empty(&pages))) 1351 goto out_fail; 1352 1353 out: 1354 buffer->pages = nr_pages; 1355 put_online_cpus(); 1356 mutex_unlock(&buffer->mutex); 1357 1358 atomic_dec(&buffer->record_disabled); 1359 1360 return size; 1361 1362 free_pages: 1363 list_for_each_entry_safe(bpage, tmp, &pages, list) { 1364 list_del_init(&bpage->list); 1365 free_buffer_page(bpage); 1366 } 1367 put_online_cpus(); 1368 mutex_unlock(&buffer->mutex); 1369 atomic_dec(&buffer->record_disabled); 1370 return -ENOMEM; 1371 1372 /* 1373 * Something went totally wrong, and we are too paranoid 1374 * to even clean up the mess. 
1375 */ 1376 out_fail: 1377 put_online_cpus(); 1378 mutex_unlock(&buffer->mutex); 1379 atomic_dec(&buffer->record_disabled); 1380 return -1; 1381 } 1382 EXPORT_SYMBOL_GPL(ring_buffer_resize); 1383 1384 static inline void * 1385 __rb_data_page_index(struct buffer_data_page *bpage, unsigned index) 1386 { 1387 return bpage->data + index; 1388 } 1389 1390 static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 1391 { 1392 return bpage->page->data + index; 1393 } 1394 1395 static inline struct ring_buffer_event * 1396 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 1397 { 1398 return __rb_page_index(cpu_buffer->reader_page, 1399 cpu_buffer->reader_page->read); 1400 } 1401 1402 static inline struct ring_buffer_event * 1403 rb_iter_head_event(struct ring_buffer_iter *iter) 1404 { 1405 return __rb_page_index(iter->head_page, iter->head); 1406 } 1407 1408 static inline unsigned long rb_page_write(struct buffer_page *bpage) 1409 { 1410 return local_read(&bpage->write) & RB_WRITE_MASK; 1411 } 1412 1413 static inline unsigned rb_page_commit(struct buffer_page *bpage) 1414 { 1415 return local_read(&bpage->page->commit); 1416 } 1417 1418 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 1419 { 1420 return local_read(&bpage->entries) & RB_WRITE_MASK; 1421 } 1422 1423 /* Size is determined by what has been commited */ 1424 static inline unsigned rb_page_size(struct buffer_page *bpage) 1425 { 1426 return rb_page_commit(bpage); 1427 } 1428 1429 static inline unsigned 1430 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 1431 { 1432 return rb_page_commit(cpu_buffer->commit_page); 1433 } 1434 1435 static inline unsigned 1436 rb_event_index(struct ring_buffer_event *event) 1437 { 1438 unsigned long addr = (unsigned long)event; 1439 1440 return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE; 1441 } 1442 1443 static inline int 1444 rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer, 1445 struct ring_buffer_event *event) 1446 { 1447 unsigned long addr = (unsigned long)event; 1448 unsigned long index; 1449 1450 index = rb_event_index(event); 1451 addr &= PAGE_MASK; 1452 1453 return cpu_buffer->commit_page->page == (void *)addr && 1454 rb_commit_index(cpu_buffer) == index; 1455 } 1456 1457 static void 1458 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 1459 { 1460 unsigned long max_count; 1461 1462 /* 1463 * We only race with interrupts and NMIs on this CPU. 1464 * If we own the commit event, then we can commit 1465 * all others that interrupted us, since the interruptions 1466 * are in stack format (they finish before they come 1467 * back to us). This allows us to do a simple loop to 1468 * assign the commit to the tail. 
 */
 again:
	max_count = cpu_buffer->buffer->pages * 100;

	while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
		if (RB_WARN_ON(cpu_buffer, !(--max_count)))
			return;
		if (RB_WARN_ON(cpu_buffer,
			       rb_is_reader_page(cpu_buffer->tail_page)))
			return;
		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp =
			cpu_buffer->commit_page->page->time_stamp;
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {

		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		RB_WARN_ON(cpu_buffer,
			   local_read(&cpu_buffer->commit_page->page->commit) &
			   ~RB_WRITE_MASK);
		barrier();
	}

	/* again, keep gcc from optimizing */
	barrier();

	/*
	 * If an interrupt came in just after the first while loop
	 * and pushed the tail page forward, we will be left with
	 * a dangling commit that will never go forward.
	 */
	if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
		goto again;
}

static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
	cpu_buffer->reader_page->read = 0;
}

static void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
	 */
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = rb_set_head_page(cpu_buffer);
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	iter->read_stamp = iter->head_page->page->time_stamp;
	iter->head = 0;
}

/**
 * ring_buffer_update_event - update event type and data
 * @event: the event to update
 * @type: the type of event
 * @length: the size of the event field in the ring buffer
 *
 * Update the type and data fields of the event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
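 *
 * Worked example (assuming RB_FORCE_8BYTE_ALIGNMENT == 0): a 12-byte
 * payload arrives here as length == 16 (payload plus the 4-byte header);
 * the DATA case below yields type_len = DIV_ROUND_UP(12, 4) = 3 and the
 * payload starts at array[0]. A payload bigger than RB_MAX_SMALL_DATA
 * leaves type_len at 0 and stores the length (payload plus the length
 * word) in array[0], with the data starting at array[1].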
 */
static void
rb_update_event(struct ring_buffer_event *event,
		unsigned type, unsigned length)
{
	event->type_len = type;

	switch (type) {

	case RINGBUF_TYPE_PADDING:
	case RINGBUF_TYPE_TIME_EXTEND:
	case RINGBUF_TYPE_TIME_STAMP:
		break;

	case 0:
		length -= RB_EVNT_HDR_SIZE;
		if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
			event->array[0] = length;
		else
			event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
		break;
	default:
		BUG();
	}
}

/*
 * rb_handle_head_page - writer hit the head page
 *
 * Returns: +1 to retry page
 *           0 to continue
 *          -1 on error
 */
static int
rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		    struct buffer_page *tail_page,
		    struct buffer_page *next_page)
{
	struct buffer_page *new_head;
	int entries;
	int type;
	int ret;

	entries = rb_page_entries(next_page);

	/*
	 * The hard part is here. We need to move the head
	 * forward, and protect against both readers on
	 * other CPUs and writers coming in via interrupts.
	 */
	type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
				       RB_PAGE_HEAD);

	/*
	 * type can be one of four:
	 *  NORMAL - an interrupt already moved it for us
	 *  HEAD   - we are the first to get here.
	 *  UPDATE - we are the interrupt interrupting
	 *           a current move.
	 *  MOVED  - a reader on another CPU moved the next
	 *           pointer to its reader page. Give up
	 *           and try again.
	 */

	switch (type) {
	case RB_PAGE_HEAD:
		/*
		 * We changed the head to UPDATE, thus
		 * it is our responsibility to update
		 * the counters.
		 */
		local_add(entries, &cpu_buffer->overrun);

		/*
		 * The entries will be zeroed out when we move the
		 * tail page.
		 */

		/* still more to do */
		break;

	case RB_PAGE_UPDATE:
		/*
		 * This is an interrupt that interrupted the
		 * previous update. Still more to do.
		 */
		break;
	case RB_PAGE_NORMAL:
		/*
		 * An interrupt came in before the update
		 * and processed this for us.
		 * Nothing left to do.
		 */
		return 1;
	case RB_PAGE_MOVED:
		/*
		 * The reader is on another CPU and just did
		 * a swap with our next_page.
		 * Try again.
		 */
		return 1;
	default:
		RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
		return -1;
	}

	/*
	 * Now that we are here, the old head pointer is
	 * set to UPDATE. This will keep the reader from
	 * swapping the head page with the reader page.
	 * The reader (on another CPU) will spin till
	 * we are finished.
	 *
	 * We just need to protect against interrupts
	 * doing the job. We will set the next pointer
	 * to HEAD. After that, we set the old pointer
	 * to NORMAL, but only if it was HEAD before.
	 * Otherwise we are an interrupt, and only
	 * want the outermost commit to reset it.
	 */
	new_head = next_page;
	rb_inc_page(cpu_buffer, &new_head);

	ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
				    RB_PAGE_NORMAL);

	/*
	 * Valid returns are:
	 *  HEAD   - an interrupt came in and already set it.
	 *  NORMAL - One of two things:
	 *            1) We really set it.
	 *            2) A bunch of interrupts came in and moved
	 *               the page forward again.
	 */
	switch (ret) {
	case RB_PAGE_HEAD:
	case RB_PAGE_NORMAL:
		/* OK */
		break;
	default:
		RB_WARN_ON(cpu_buffer, 1);
		return -1;
	}

	/*
	 * It is possible that an interrupt came in,
	 * set the head up, then more interrupts came in
	 * and moved it again. When we get back here,
	 * the page would have been set to NORMAL but we
	 * just set it back to HEAD.
	 *
	 * How do you detect this? Well, if that happened
	 * the tail page would have moved.
	 */
	if (ret == RB_PAGE_NORMAL) {
		/*
		 * If the tail had moved past next, then we need
		 * to reset the pointer.
		 */
		if (cpu_buffer->tail_page != tail_page &&
		    cpu_buffer->tail_page != next_page)
			rb_head_page_set_normal(cpu_buffer, new_head,
						next_page,
						RB_PAGE_HEAD);
	}

	/*
	 * If this was the outermost commit (the one that
	 * changed the original pointer from HEAD to UPDATE),
	 * then it is up to us to reset it to NORMAL.
	 */
	if (type == RB_PAGE_HEAD) {
		ret = rb_head_page_set_normal(cpu_buffer, next_page,
					      tail_page,
					      RB_PAGE_UPDATE);
		if (RB_WARN_ON(cpu_buffer,
			       ret != RB_PAGE_UPDATE))
			return -1;
	}

	return 0;
}

static unsigned rb_calculate_event_length(unsigned length)
{
	struct ring_buffer_event event; /* Used only for sizeof array */

	/* zero length can cause confusions */
	if (!length)
		length = 1;

	if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
		length += sizeof(event.array[0]);

	length += RB_EVNT_HDR_SIZE;
	length = ALIGN(length, RB_ARCH_ALIGNMENT);

	return length;
}

static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
	      struct buffer_page *tail_page,
	      unsigned long tail, unsigned long length)
{
	struct ring_buffer_event *event;

	/*
	 * Only the event that crossed the page boundary
	 * must fill the old tail_page with padding.
	 */
	if (tail >= BUF_PAGE_SIZE) {
		local_sub(length, &tail_page->write);
		return;
	}

	event = __rb_page_index(tail_page, tail);
	kmemcheck_annotate_bitfield(event, bitfield);

	/*
	 * If this event is bigger than the minimum size, then
	 * we need to be careful that we don't subtract the
	 * write counter enough to allow another writer to slip
	 * in on this page.
	 * We put in a discarded commit instead, to make sure
	 * that this space is not used again.
	 *
	 * If we are less than the minimum size, we don't need to
	 * worry about it.
1774 */ 1775 if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) { 1776 /* No room for any events */ 1777 1778 /* Mark the rest of the page with padding */ 1779 rb_event_set_padding(event); 1780 1781 /* Set the write back to the previous setting */ 1782 local_sub(length, &tail_page->write); 1783 return; 1784 } 1785 1786 /* Put in a discarded event */ 1787 event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE; 1788 event->type_len = RINGBUF_TYPE_PADDING; 1789 /* time delta must be non zero */ 1790 event->time_delta = 1; 1791 1792 /* Set write to end of buffer */ 1793 length = (tail + length) - BUF_PAGE_SIZE; 1794 local_sub(length, &tail_page->write); 1795 } 1796 1797 static struct ring_buffer_event * 1798 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 1799 unsigned long length, unsigned long tail, 1800 struct buffer_page *tail_page, u64 *ts) 1801 { 1802 struct buffer_page *commit_page = cpu_buffer->commit_page; 1803 struct ring_buffer *buffer = cpu_buffer->buffer; 1804 struct buffer_page *next_page; 1805 int ret; 1806 1807 next_page = tail_page; 1808 1809 rb_inc_page(cpu_buffer, &next_page); 1810 1811 /* 1812 * If for some reason, we had an interrupt storm that made 1813 * it all the way around the buffer, bail, and warn 1814 * about it. 1815 */ 1816 if (unlikely(next_page == commit_page)) { 1817 local_inc(&cpu_buffer->commit_overrun); 1818 goto out_reset; 1819 } 1820 1821 /* 1822 * This is where the fun begins! 1823 * 1824 * We are fighting against races between a reader that 1825 * could be on another CPU trying to swap its reader 1826 * page with the buffer head. 1827 * 1828 * We are also fighting against interrupts coming in and 1829 * moving the head or tail on us as well. 1830 * 1831 * If the next page is the head page then we have filled 1832 * the buffer, unless the commit page is still on the 1833 * reader page. 1834 */ 1835 if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) { 1836 1837 /* 1838 * If the commit is not on the reader page, then 1839 * move the header page. 1840 */ 1841 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 1842 /* 1843 * If we are not in overwrite mode, 1844 * this is easy, just stop here. 1845 */ 1846 if (!(buffer->flags & RB_FL_OVERWRITE)) 1847 goto out_reset; 1848 1849 ret = rb_handle_head_page(cpu_buffer, 1850 tail_page, 1851 next_page); 1852 if (ret < 0) 1853 goto out_reset; 1854 if (ret) 1855 goto out_again; 1856 } else { 1857 /* 1858 * We need to be careful here too. The 1859 * commit page could still be on the reader 1860 * page. We could have a small buffer, and 1861 * have filled up the buffer with events 1862 * from interrupts and such, and wrapped. 1863 * 1864 * Note, if the tail page is also the on the 1865 * reader_page, we let it move out. 
1866 */ 1867 if (unlikely((cpu_buffer->commit_page != 1868 cpu_buffer->tail_page) && 1869 (cpu_buffer->commit_page == 1870 cpu_buffer->reader_page))) { 1871 local_inc(&cpu_buffer->commit_overrun); 1872 goto out_reset; 1873 } 1874 } 1875 } 1876 1877 ret = rb_tail_page_update(cpu_buffer, tail_page, next_page); 1878 if (ret) { 1879 /* 1880 * Nested commits always have zero deltas, so 1881 * just reread the time stamp 1882 */ 1883 *ts = rb_time_stamp(buffer); 1884 next_page->page->time_stamp = *ts; 1885 } 1886 1887 out_again: 1888 1889 rb_reset_tail(cpu_buffer, tail_page, tail, length); 1890 1891 /* fail and let the caller try again */ 1892 return ERR_PTR(-EAGAIN); 1893 1894 out_reset: 1895 /* reset write */ 1896 rb_reset_tail(cpu_buffer, tail_page, tail, length); 1897 1898 return NULL; 1899 } 1900 1901 static struct ring_buffer_event * 1902 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 1903 unsigned type, unsigned long length, u64 *ts) 1904 { 1905 struct buffer_page *tail_page; 1906 struct ring_buffer_event *event; 1907 unsigned long tail, write; 1908 1909 tail_page = cpu_buffer->tail_page; 1910 write = local_add_return(length, &tail_page->write); 1911 1912 /* set write to only the index of the write */ 1913 write &= RB_WRITE_MASK; 1914 tail = write - length; 1915 1916 /* See if we shot pass the end of this buffer page */ 1917 if (write > BUF_PAGE_SIZE) 1918 return rb_move_tail(cpu_buffer, length, tail, 1919 tail_page, ts); 1920 1921 /* We reserved something on the buffer */ 1922 1923 event = __rb_page_index(tail_page, tail); 1924 kmemcheck_annotate_bitfield(event, bitfield); 1925 rb_update_event(event, type, length); 1926 1927 /* The passed in type is zero for DATA */ 1928 if (likely(!type)) 1929 local_inc(&tail_page->entries); 1930 1931 /* 1932 * If this is the first commit on the page, then update 1933 * its timestamp. 1934 */ 1935 if (!tail) 1936 tail_page->page->time_stamp = *ts; 1937 1938 return event; 1939 } 1940 1941 static inline int 1942 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 1943 struct ring_buffer_event *event) 1944 { 1945 unsigned long new_index, old_index; 1946 struct buffer_page *bpage; 1947 unsigned long index; 1948 unsigned long addr; 1949 1950 new_index = rb_event_index(event); 1951 old_index = new_index + rb_event_length(event); 1952 addr = (unsigned long)event; 1953 addr &= PAGE_MASK; 1954 1955 bpage = cpu_buffer->tail_page; 1956 1957 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 1958 unsigned long write_mask = 1959 local_read(&bpage->write) & ~RB_WRITE_MASK; 1960 /* 1961 * This is on the tail page. It is possible that 1962 * a write could come in and move the tail page 1963 * and write to the next page. That is fine 1964 * because we just shorten what is on this page. 1965 */ 1966 old_index += write_mask; 1967 new_index += write_mask; 1968 index = local_cmpxchg(&bpage->write, old_index, new_index); 1969 if (index == old_index) 1970 return 1; 1971 } 1972 1973 /* could not discard */ 1974 return 0; 1975 } 1976 1977 static int 1978 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 1979 u64 *ts, u64 *delta) 1980 { 1981 struct ring_buffer_event *event; 1982 static int once; 1983 int ret; 1984 1985 if (unlikely(*delta > (1ULL << 59) && !once++)) { 1986 printk(KERN_WARNING "Delta way too big! 
		       %llu"
		       " ts=%llu write stamp = %llu\n",
		       (unsigned long long)*delta,
		       (unsigned long long)*ts,
		       (unsigned long long)cpu_buffer->write_stamp);
		WARN_ON(1);
	}

	/*
	 * The delta is too big, we need to add a
	 * new timestamp.
	 */
	event = __rb_reserve_next(cpu_buffer,
				  RINGBUF_TYPE_TIME_EXTEND,
				  RB_LEN_TIME_EXTEND,
				  ts);
	if (!event)
		return -EBUSY;

	if (PTR_ERR(event) == -EAGAIN)
		return -EAGAIN;

	/* Only a committed time event can update the write stamp */
	if (rb_event_is_commit(cpu_buffer, event)) {
		/*
		 * If this is the first on the page, then it was
		 * updated with the page itself. Try to discard it
		 * and if we can't just make it zero.
		 */
		if (rb_event_index(event)) {
			event->time_delta = *delta & TS_MASK;
			event->array[0] = *delta >> TS_SHIFT;
		} else {
			/* try to discard, since we do not need this */
			if (!rb_try_to_discard(cpu_buffer, event)) {
				/* nope, just zero it */
				event->time_delta = 0;
				event->array[0] = 0;
			}
		}
		cpu_buffer->write_stamp = *ts;
		/* let the caller know this was the commit */
		ret = 1;
	} else {
		/* Try to discard the event */
		if (!rb_try_to_discard(cpu_buffer, event)) {
			/* Darn, this is just wasted space */
			event->time_delta = 0;
			event->array[0] = 0;
		}
		ret = 0;
	}

	*delta = 0;

	return ret;
}

static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	local_inc(&cpu_buffer->committing);
	local_inc(&cpu_buffer->commits);
}

static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long commits;

	if (RB_WARN_ON(cpu_buffer,
		       !local_read(&cpu_buffer->committing)))
		return;

 again:
	commits = local_read(&cpu_buffer->commits);
	/* synchronize with interrupts */
	barrier();
	if (local_read(&cpu_buffer->committing) == 1)
		rb_set_commit_to_write(cpu_buffer);

	local_dec(&cpu_buffer->committing);

	/* synchronize with interrupts */
	barrier();

	/*
	 * Need to account for interrupts coming in between the
	 * updating of the commit page and the clearing of the
	 * committing counter.
	 */
	if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
	    !local_read(&cpu_buffer->committing)) {
		local_inc(&cpu_buffer->committing);
		goto again;
	}
}

static struct ring_buffer_event *
rb_reserve_next_event(struct ring_buffer *buffer,
		      struct ring_buffer_per_cpu *cpu_buffer,
		      unsigned long length)
{
	struct ring_buffer_event *event;
	u64 ts, delta = 0;
	int commit = 0;
	int nr_loops = 0;

	rb_start_commit(cpu_buffer);

#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
	/*
	 * Due to the ability to swap a cpu buffer from a buffer
	 * it is possible it was swapped before we committed.
	 * (committing stops a swap). We check for it here and
	 * if it happened, we have to fail the write.
	 */
	barrier();
	if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) {
		local_dec(&cpu_buffer->committing);
		local_dec(&cpu_buffer->commits);
		return NULL;
	}
#endif

	length = rb_calculate_event_length(length);
 again:
	/*
	 * We allow for interrupts to reenter here and do a trace.
	 * If one does, it will cause this original code to loop
	 * back here.
Even with heavy interrupts happening, this 2115 * should only happen a few times in a row. If this happens 2116 * 1000 times in a row, there must be either an interrupt 2117 * storm or we have something buggy. 2118 * Bail! 2119 */ 2120 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 2121 goto out_fail; 2122 2123 ts = rb_time_stamp(cpu_buffer->buffer); 2124 2125 /* 2126 * Only the first commit can update the timestamp. 2127 * Yes there is a race here. If an interrupt comes in 2128 * just after the conditional and it traces too, then it 2129 * will also check the deltas. More than one timestamp may 2130 * also be made. But only the entry that did the actual 2131 * commit will be something other than zero. 2132 */ 2133 if (likely(cpu_buffer->tail_page == cpu_buffer->commit_page && 2134 rb_page_write(cpu_buffer->tail_page) == 2135 rb_commit_index(cpu_buffer))) { 2136 u64 diff; 2137 2138 diff = ts - cpu_buffer->write_stamp; 2139 2140 /* make sure this diff is calculated here */ 2141 barrier(); 2142 2143 /* Did the write stamp get updated already? */ 2144 if (unlikely(ts < cpu_buffer->write_stamp)) 2145 goto get_event; 2146 2147 delta = diff; 2148 if (unlikely(test_time_stamp(delta))) { 2149 2150 commit = rb_add_time_stamp(cpu_buffer, &ts, &delta); 2151 if (commit == -EBUSY) 2152 goto out_fail; 2153 2154 if (commit == -EAGAIN) 2155 goto again; 2156 2157 RB_WARN_ON(cpu_buffer, commit < 0); 2158 } 2159 } 2160 2161 get_event: 2162 event = __rb_reserve_next(cpu_buffer, 0, length, &ts); 2163 if (unlikely(PTR_ERR(event) == -EAGAIN)) 2164 goto again; 2165 2166 if (!event) 2167 goto out_fail; 2168 2169 if (!rb_event_is_commit(cpu_buffer, event)) 2170 delta = 0; 2171 2172 event->time_delta = delta; 2173 2174 return event; 2175 2176 out_fail: 2177 rb_end_commit(cpu_buffer); 2178 return NULL; 2179 } 2180 2181 #ifdef CONFIG_TRACING 2182 2183 #define TRACE_RECURSIVE_DEPTH 16 2184 2185 static int trace_recursive_lock(void) 2186 { 2187 current->trace_recursion++; 2188 2189 if (likely(current->trace_recursion < TRACE_RECURSIVE_DEPTH)) 2190 return 0; 2191 2192 /* Disable all tracing before we do anything else */ 2193 tracing_off_permanent(); 2194 2195 printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:" 2196 "HC[%lu]:SC[%lu]:NMI[%lu]\n", 2197 current->trace_recursion, 2198 hardirq_count() >> HARDIRQ_SHIFT, 2199 softirq_count() >> SOFTIRQ_SHIFT, 2200 in_nmi()); 2201 2202 WARN_ON_ONCE(1); 2203 return -1; 2204 } 2205 2206 static void trace_recursive_unlock(void) 2207 { 2208 WARN_ON_ONCE(!current->trace_recursion); 2209 2210 current->trace_recursion--; 2211 } 2212 2213 #else 2214 2215 #define trace_recursive_lock() (0) 2216 #define trace_recursive_unlock() do { } while (0) 2217 2218 #endif 2219 2220 static DEFINE_PER_CPU(int, rb_need_resched); 2221 2222 /** 2223 * ring_buffer_lock_reserve - reserve a part of the buffer 2224 * @buffer: the ring buffer to reserve from 2225 * @length: the length of the data to reserve (excluding event header) 2226 * 2227 * Returns a reseverd event on the ring buffer to copy directly to. 2228 * The user of this interface will need to get the body to write into 2229 * and can use the ring_buffer_event_data() interface. 2230 * 2231 * The length is the length of the data needed, not the event length 2232 * which also includes the event header. 2233 * 2234 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 2235 * If NULL is returned, then nothing has been allocated or locked. 
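 *
 * A minimal usage sketch (illustrative only, not part of the original
 * file; struct my_entry and its field are made-up placeholders):
 *
 *	struct ring_buffer_event *event;
 *	struct my_entry *entry;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (!event)
 *		return;
 *	entry = ring_buffer_event_data(event);
 *	entry->value = 42;
 *	ring_buffer_unlock_commit(buffer, event);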
2236 */ 2237 struct ring_buffer_event * 2238 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length) 2239 { 2240 struct ring_buffer_per_cpu *cpu_buffer; 2241 struct ring_buffer_event *event; 2242 int cpu, resched; 2243 2244 if (ring_buffer_flags != RB_BUFFERS_ON) 2245 return NULL; 2246 2247 /* If we are tracing schedule, we don't want to recurse */ 2248 resched = ftrace_preempt_disable(); 2249 2250 if (atomic_read(&buffer->record_disabled)) 2251 goto out_nocheck; 2252 2253 if (trace_recursive_lock()) 2254 goto out_nocheck; 2255 2256 cpu = raw_smp_processor_id(); 2257 2258 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2259 goto out; 2260 2261 cpu_buffer = buffer->buffers[cpu]; 2262 2263 if (atomic_read(&cpu_buffer->record_disabled)) 2264 goto out; 2265 2266 if (length > BUF_MAX_DATA_SIZE) 2267 goto out; 2268 2269 event = rb_reserve_next_event(buffer, cpu_buffer, length); 2270 if (!event) 2271 goto out; 2272 2273 /* 2274 * Need to store resched state on this cpu. 2275 * Only the first needs to. 2276 */ 2277 2278 if (preempt_count() == 1) 2279 per_cpu(rb_need_resched, cpu) = resched; 2280 2281 return event; 2282 2283 out: 2284 trace_recursive_unlock(); 2285 2286 out_nocheck: 2287 ftrace_preempt_enable(resched); 2288 return NULL; 2289 } 2290 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 2291 2292 static void 2293 rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2294 struct ring_buffer_event *event) 2295 { 2296 /* 2297 * The event first in the commit queue updates the 2298 * time stamp. 2299 */ 2300 if (rb_event_is_commit(cpu_buffer, event)) 2301 cpu_buffer->write_stamp += event->time_delta; 2302 } 2303 2304 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, 2305 struct ring_buffer_event *event) 2306 { 2307 local_inc(&cpu_buffer->entries); 2308 rb_update_write_stamp(cpu_buffer, event); 2309 rb_end_commit(cpu_buffer); 2310 } 2311 2312 /** 2313 * ring_buffer_unlock_commit - commit a reserved 2314 * @buffer: The buffer to commit to 2315 * @event: The event pointer to commit. 2316 * 2317 * This commits the data to the ring buffer, and releases any locks held. 2318 * 2319 * Must be paired with ring_buffer_lock_reserve. 2320 */ 2321 int ring_buffer_unlock_commit(struct ring_buffer *buffer, 2322 struct ring_buffer_event *event) 2323 { 2324 struct ring_buffer_per_cpu *cpu_buffer; 2325 int cpu = raw_smp_processor_id(); 2326 2327 cpu_buffer = buffer->buffers[cpu]; 2328 2329 rb_commit(cpu_buffer, event); 2330 2331 trace_recursive_unlock(); 2332 2333 /* 2334 * Only the last preempt count needs to restore preemption. 2335 */ 2336 if (preempt_count() == 1) 2337 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu)); 2338 else 2339 preempt_enable_no_resched_notrace(); 2340 2341 return 0; 2342 } 2343 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 2344 2345 static inline void rb_event_discard(struct ring_buffer_event *event) 2346 { 2347 /* array[0] holds the actual length for the discarded event */ 2348 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 2349 event->type_len = RINGBUF_TYPE_PADDING; 2350 /* time delta must be non zero */ 2351 if (!event->time_delta) 2352 event->time_delta = 1; 2353 } 2354 2355 /* 2356 * Decrement the entries to the page that an event is on. 2357 * The event does not even need to exist, only the pointer 2358 * to the page it is on. This may only be called before the commit 2359 * takes place. 
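 * (The event address is masked with PAGE_MASK to find the page it sits
 * on; the commit page is checked first, then the rest of the page list
 * is walked until the matching buffer page has its 'entries' counter
 * decremented.)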
2360 */ 2361 static inline void 2362 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 2363 struct ring_buffer_event *event) 2364 { 2365 unsigned long addr = (unsigned long)event; 2366 struct buffer_page *bpage = cpu_buffer->commit_page; 2367 struct buffer_page *start; 2368 2369 addr &= PAGE_MASK; 2370 2371 /* Do the likely case first */ 2372 if (likely(bpage->page == (void *)addr)) { 2373 local_dec(&bpage->entries); 2374 return; 2375 } 2376 2377 /* 2378 * Because the commit page may be on the reader page we 2379 * start with the next page and check the end loop there. 2380 */ 2381 rb_inc_page(cpu_buffer, &bpage); 2382 start = bpage; 2383 do { 2384 if (bpage->page == (void *)addr) { 2385 local_dec(&bpage->entries); 2386 return; 2387 } 2388 rb_inc_page(cpu_buffer, &bpage); 2389 } while (bpage != start); 2390 2391 /* commit not part of this buffer?? */ 2392 RB_WARN_ON(cpu_buffer, 1); 2393 } 2394 2395 /** 2396 * ring_buffer_commit_discard - discard an event that has not been committed 2397 * @buffer: the ring buffer 2398 * @event: non committed event to discard 2399 * 2400 * Sometimes an event that is in the ring buffer needs to be ignored. 2401 * This function lets the user discard an event in the ring buffer 2402 * and then that event will not be read later. 2403 * 2404 * This function only works if it is called before the the item has been 2405 * committed. It will try to free the event from the ring buffer 2406 * if another event has not been added behind it. 2407 * 2408 * If another event has been added behind it, it will set the event 2409 * up as discarded, and perform the commit. 2410 * 2411 * If this function is called, do not call ring_buffer_unlock_commit on 2412 * the event. 2413 */ 2414 void ring_buffer_discard_commit(struct ring_buffer *buffer, 2415 struct ring_buffer_event *event) 2416 { 2417 struct ring_buffer_per_cpu *cpu_buffer; 2418 int cpu; 2419 2420 /* The event is discarded regardless */ 2421 rb_event_discard(event); 2422 2423 cpu = smp_processor_id(); 2424 cpu_buffer = buffer->buffers[cpu]; 2425 2426 /* 2427 * This must only be called if the event has not been 2428 * committed yet. Thus we can assume that preemption 2429 * is still disabled. 2430 */ 2431 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 2432 2433 rb_decrement_entry(cpu_buffer, event); 2434 if (rb_try_to_discard(cpu_buffer, event)) 2435 goto out; 2436 2437 /* 2438 * The commit is still visible by the reader, so we 2439 * must still update the timestamp. 2440 */ 2441 rb_update_write_stamp(cpu_buffer, event); 2442 out: 2443 rb_end_commit(cpu_buffer); 2444 2445 trace_recursive_unlock(); 2446 2447 /* 2448 * Only the last preempt count needs to restore preemption. 2449 */ 2450 if (preempt_count() == 1) 2451 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu)); 2452 else 2453 preempt_enable_no_resched_notrace(); 2454 2455 } 2456 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 2457 2458 /** 2459 * ring_buffer_write - write data to the buffer without reserving 2460 * @buffer: The ring buffer to write to. 2461 * @length: The length of the data being written (excluding the event header) 2462 * @data: The data to write to the buffer. 2463 * 2464 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 2465 * one function. If you already have the data to write to the buffer, it 2466 * may be easier to simply call this function. 2467 * 2468 * Note, like ring_buffer_lock_reserve, the length is the length of the data 2469 * and not the length of the event which would hold the header. 
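 *
 * A minimal usage sketch (illustrative only):
 *
 *	char msg[] = "hello";
 *	int ret;
 *
 *	ret = ring_buffer_write(buffer, sizeof(msg), msg);
 *
 * A non-zero return value means the record was dropped, for example
 * because recording is disabled or the data does not fit in one event.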
2470 */ 2471 int ring_buffer_write(struct ring_buffer *buffer, 2472 unsigned long length, 2473 void *data) 2474 { 2475 struct ring_buffer_per_cpu *cpu_buffer; 2476 struct ring_buffer_event *event; 2477 void *body; 2478 int ret = -EBUSY; 2479 int cpu, resched; 2480 2481 if (ring_buffer_flags != RB_BUFFERS_ON) 2482 return -EBUSY; 2483 2484 resched = ftrace_preempt_disable(); 2485 2486 if (atomic_read(&buffer->record_disabled)) 2487 goto out; 2488 2489 cpu = raw_smp_processor_id(); 2490 2491 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2492 goto out; 2493 2494 cpu_buffer = buffer->buffers[cpu]; 2495 2496 if (atomic_read(&cpu_buffer->record_disabled)) 2497 goto out; 2498 2499 if (length > BUF_MAX_DATA_SIZE) 2500 goto out; 2501 2502 event = rb_reserve_next_event(buffer, cpu_buffer, length); 2503 if (!event) 2504 goto out; 2505 2506 body = rb_event_data(event); 2507 2508 memcpy(body, data, length); 2509 2510 rb_commit(cpu_buffer, event); 2511 2512 ret = 0; 2513 out: 2514 ftrace_preempt_enable(resched); 2515 2516 return ret; 2517 } 2518 EXPORT_SYMBOL_GPL(ring_buffer_write); 2519 2520 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 2521 { 2522 struct buffer_page *reader = cpu_buffer->reader_page; 2523 struct buffer_page *head = rb_set_head_page(cpu_buffer); 2524 struct buffer_page *commit = cpu_buffer->commit_page; 2525 2526 /* In case of error, head will be NULL */ 2527 if (unlikely(!head)) 2528 return 1; 2529 2530 return reader->read == rb_page_commit(reader) && 2531 (commit == reader || 2532 (commit == head && 2533 head->read == rb_page_commit(commit))); 2534 } 2535 2536 /** 2537 * ring_buffer_record_disable - stop all writes into the buffer 2538 * @buffer: The ring buffer to stop writes to. 2539 * 2540 * This prevents all writes to the buffer. Any attempt to write 2541 * to the buffer after this will fail and return NULL. 2542 * 2543 * The caller should call synchronize_sched() after this. 2544 */ 2545 void ring_buffer_record_disable(struct ring_buffer *buffer) 2546 { 2547 atomic_inc(&buffer->record_disabled); 2548 } 2549 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 2550 2551 /** 2552 * ring_buffer_record_enable - enable writes to the buffer 2553 * @buffer: The ring buffer to enable writes 2554 * 2555 * Note, multiple disables will need the same number of enables 2556 * to truly enable the writing (much like preempt_disable). 2557 */ 2558 void ring_buffer_record_enable(struct ring_buffer *buffer) 2559 { 2560 atomic_dec(&buffer->record_disabled); 2561 } 2562 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 2563 2564 /** 2565 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 2566 * @buffer: The ring buffer to stop writes to. 2567 * @cpu: The CPU buffer to stop 2568 * 2569 * This prevents all writes to the buffer. Any attempt to write 2570 * to the buffer after this will fail and return NULL. 2571 * 2572 * The caller should call synchronize_sched() after this. 2573 */ 2574 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu) 2575 { 2576 struct ring_buffer_per_cpu *cpu_buffer; 2577 2578 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2579 return; 2580 2581 cpu_buffer = buffer->buffers[cpu]; 2582 atomic_inc(&cpu_buffer->record_disabled); 2583 } 2584 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 2585 2586 /** 2587 * ring_buffer_record_enable_cpu - enable writes to the buffer 2588 * @buffer: The ring buffer to enable writes 2589 * @cpu: The CPU to enable. 
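 *
 * A minimal sketch of how this pairs with the disable side (illustrative
 * only; the middle step stands in for whatever the caller does while
 * recording is off):
 *
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	synchronize_sched();
 *	... read or reset the per cpu buffer ...
 *	ring_buffer_record_enable_cpu(buffer, cpu);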
2590 * 2591 * Note, multiple disables will need the same number of enables 2592 * to truly enable the writing (much like preempt_disable). 2593 */ 2594 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu) 2595 { 2596 struct ring_buffer_per_cpu *cpu_buffer; 2597 2598 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2599 return; 2600 2601 cpu_buffer = buffer->buffers[cpu]; 2602 atomic_dec(&cpu_buffer->record_disabled); 2603 } 2604 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 2605 2606 /** 2607 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 2608 * @buffer: The ring buffer 2609 * @cpu: The per CPU buffer to get the entries from. 2610 */ 2611 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu) 2612 { 2613 struct ring_buffer_per_cpu *cpu_buffer; 2614 unsigned long ret; 2615 2616 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2617 return 0; 2618 2619 cpu_buffer = buffer->buffers[cpu]; 2620 ret = (local_read(&cpu_buffer->entries) - local_read(&cpu_buffer->overrun)) 2621 - cpu_buffer->read; 2622 2623 return ret; 2624 } 2625 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 2626 2627 /** 2628 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer 2629 * @buffer: The ring buffer 2630 * @cpu: The per CPU buffer to get the number of overruns from 2631 */ 2632 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu) 2633 { 2634 struct ring_buffer_per_cpu *cpu_buffer; 2635 unsigned long ret; 2636 2637 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2638 return 0; 2639 2640 cpu_buffer = buffer->buffers[cpu]; 2641 ret = local_read(&cpu_buffer->overrun); 2642 2643 return ret; 2644 } 2645 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 2646 2647 /** 2648 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits 2649 * @buffer: The ring buffer 2650 * @cpu: The per CPU buffer to get the number of overruns from 2651 */ 2652 unsigned long 2653 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu) 2654 { 2655 struct ring_buffer_per_cpu *cpu_buffer; 2656 unsigned long ret; 2657 2658 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2659 return 0; 2660 2661 cpu_buffer = buffer->buffers[cpu]; 2662 ret = local_read(&cpu_buffer->commit_overrun); 2663 2664 return ret; 2665 } 2666 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 2667 2668 /** 2669 * ring_buffer_entries - get the number of entries in a buffer 2670 * @buffer: The ring buffer 2671 * 2672 * Returns the total number of entries in the ring buffer 2673 * (all CPU entries) 2674 */ 2675 unsigned long ring_buffer_entries(struct ring_buffer *buffer) 2676 { 2677 struct ring_buffer_per_cpu *cpu_buffer; 2678 unsigned long entries = 0; 2679 int cpu; 2680 2681 /* if you care about this being correct, lock the buffer */ 2682 for_each_buffer_cpu(buffer, cpu) { 2683 cpu_buffer = buffer->buffers[cpu]; 2684 entries += (local_read(&cpu_buffer->entries) - 2685 local_read(&cpu_buffer->overrun)) - cpu_buffer->read; 2686 } 2687 2688 return entries; 2689 } 2690 EXPORT_SYMBOL_GPL(ring_buffer_entries); 2691 2692 /** 2693 * ring_buffer_overruns - get the number of overruns in buffer 2694 * @buffer: The ring buffer 2695 * 2696 * Returns the total number of overruns in the ring buffer 2697 * (all CPU entries) 2698 */ 2699 unsigned long ring_buffer_overruns(struct ring_buffer *buffer) 2700 { 2701 struct ring_buffer_per_cpu *cpu_buffer; 2702 unsigned long overruns = 0; 2703 int cpu; 2704 2705 /* if you care about this being correct, lock the buffer */ 2706 
for_each_buffer_cpu(buffer, cpu) { 2707 cpu_buffer = buffer->buffers[cpu]; 2708 overruns += local_read(&cpu_buffer->overrun); 2709 } 2710 2711 return overruns; 2712 } 2713 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 2714 2715 static void rb_iter_reset(struct ring_buffer_iter *iter) 2716 { 2717 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2718 2719 /* Iterator usage is expected to have record disabled */ 2720 if (list_empty(&cpu_buffer->reader_page->list)) { 2721 iter->head_page = rb_set_head_page(cpu_buffer); 2722 if (unlikely(!iter->head_page)) 2723 return; 2724 iter->head = iter->head_page->read; 2725 } else { 2726 iter->head_page = cpu_buffer->reader_page; 2727 iter->head = cpu_buffer->reader_page->read; 2728 } 2729 if (iter->head) 2730 iter->read_stamp = cpu_buffer->read_stamp; 2731 else 2732 iter->read_stamp = iter->head_page->page->time_stamp; 2733 iter->cache_reader_page = cpu_buffer->reader_page; 2734 iter->cache_read = cpu_buffer->read; 2735 } 2736 2737 /** 2738 * ring_buffer_iter_reset - reset an iterator 2739 * @iter: The iterator to reset 2740 * 2741 * Resets the iterator, so that it will start from the beginning 2742 * again. 2743 */ 2744 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 2745 { 2746 struct ring_buffer_per_cpu *cpu_buffer; 2747 unsigned long flags; 2748 2749 if (!iter) 2750 return; 2751 2752 cpu_buffer = iter->cpu_buffer; 2753 2754 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2755 rb_iter_reset(iter); 2756 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2757 } 2758 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 2759 2760 /** 2761 * ring_buffer_iter_empty - check if an iterator has no more to read 2762 * @iter: The iterator to check 2763 */ 2764 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 2765 { 2766 struct ring_buffer_per_cpu *cpu_buffer; 2767 2768 cpu_buffer = iter->cpu_buffer; 2769 2770 return iter->head_page == cpu_buffer->commit_page && 2771 iter->head == rb_commit_index(cpu_buffer); 2772 } 2773 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 2774 2775 static void 2776 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2777 struct ring_buffer_event *event) 2778 { 2779 u64 delta; 2780 2781 switch (event->type_len) { 2782 case RINGBUF_TYPE_PADDING: 2783 return; 2784 2785 case RINGBUF_TYPE_TIME_EXTEND: 2786 delta = event->array[0]; 2787 delta <<= TS_SHIFT; 2788 delta += event->time_delta; 2789 cpu_buffer->read_stamp += delta; 2790 return; 2791 2792 case RINGBUF_TYPE_TIME_STAMP: 2793 /* FIXME: not implemented */ 2794 return; 2795 2796 case RINGBUF_TYPE_DATA: 2797 cpu_buffer->read_stamp += event->time_delta; 2798 return; 2799 2800 default: 2801 BUG(); 2802 } 2803 return; 2804 } 2805 2806 static void 2807 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 2808 struct ring_buffer_event *event) 2809 { 2810 u64 delta; 2811 2812 switch (event->type_len) { 2813 case RINGBUF_TYPE_PADDING: 2814 return; 2815 2816 case RINGBUF_TYPE_TIME_EXTEND: 2817 delta = event->array[0]; 2818 delta <<= TS_SHIFT; 2819 delta += event->time_delta; 2820 iter->read_stamp += delta; 2821 return; 2822 2823 case RINGBUF_TYPE_TIME_STAMP: 2824 /* FIXME: not implemented */ 2825 return; 2826 2827 case RINGBUF_TYPE_DATA: 2828 iter->read_stamp += event->time_delta; 2829 return; 2830 2831 default: 2832 BUG(); 2833 } 2834 return; 2835 } 2836 2837 static struct buffer_page * 2838 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 2839 { 2840 struct buffer_page *reader = NULL; 2841 unsigned long flags; 2842 int nr_loops = 0; 2843 int ret; 
2844 2845 local_irq_save(flags); 2846 arch_spin_lock(&cpu_buffer->lock); 2847 2848 again: 2849 /* 2850 * This should normally only loop twice. But because the 2851 * start of the reader inserts an empty page, it causes 2852 * a case where we will loop three times. There should be no 2853 * reason to loop four times (that I know of). 2854 */ 2855 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 2856 reader = NULL; 2857 goto out; 2858 } 2859 2860 reader = cpu_buffer->reader_page; 2861 2862 /* If there's more to read, return this page */ 2863 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 2864 goto out; 2865 2866 /* Never should we have an index greater than the size */ 2867 if (RB_WARN_ON(cpu_buffer, 2868 cpu_buffer->reader_page->read > rb_page_size(reader))) 2869 goto out; 2870 2871 /* check if we caught up to the tail */ 2872 reader = NULL; 2873 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 2874 goto out; 2875 2876 /* 2877 * Reset the reader page to size zero. 2878 */ 2879 local_set(&cpu_buffer->reader_page->write, 0); 2880 local_set(&cpu_buffer->reader_page->entries, 0); 2881 local_set(&cpu_buffer->reader_page->page->commit, 0); 2882 2883 spin: 2884 /* 2885 * Splice the empty reader page into the list around the head. 2886 */ 2887 reader = rb_set_head_page(cpu_buffer); 2888 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 2889 cpu_buffer->reader_page->list.prev = reader->list.prev; 2890 2891 /* 2892 * cpu_buffer->pages just needs to point to the buffer, it 2893 * has no specific buffer page to point to. Lets move it out 2894 * of our way so we don't accidently swap it. 2895 */ 2896 cpu_buffer->pages = reader->list.prev; 2897 2898 /* The reader page will be pointing to the new head */ 2899 rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list); 2900 2901 /* 2902 * Here's the tricky part. 2903 * 2904 * We need to move the pointer past the header page. 2905 * But we can only do that if a writer is not currently 2906 * moving it. The page before the header page has the 2907 * flag bit '1' set if it is pointing to the page we want. 2908 * but if the writer is in the process of moving it 2909 * than it will be '2' or already moved '0'. 2910 */ 2911 2912 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 2913 2914 /* 2915 * If we did not convert it, then we must try again. 2916 */ 2917 if (!ret) 2918 goto spin; 2919 2920 /* 2921 * Yeah! We succeeded in replacing the page. 2922 * 2923 * Now make the new head point back to the reader page. 
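 * (From here on, the page we just pulled out of the list becomes the
 * new reader page, and the head pointer is advanced to the page that
 * follows it.)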
2924 */ 2925 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 2926 rb_inc_page(cpu_buffer, &cpu_buffer->head_page); 2927 2928 /* Finally update the reader page to the new head */ 2929 cpu_buffer->reader_page = reader; 2930 rb_reset_reader_page(cpu_buffer); 2931 2932 goto again; 2933 2934 out: 2935 arch_spin_unlock(&cpu_buffer->lock); 2936 local_irq_restore(flags); 2937 2938 return reader; 2939 } 2940 2941 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 2942 { 2943 struct ring_buffer_event *event; 2944 struct buffer_page *reader; 2945 unsigned length; 2946 2947 reader = rb_get_reader_page(cpu_buffer); 2948 2949 /* This function should not be called when buffer is empty */ 2950 if (RB_WARN_ON(cpu_buffer, !reader)) 2951 return; 2952 2953 event = rb_reader_event(cpu_buffer); 2954 2955 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 2956 cpu_buffer->read++; 2957 2958 rb_update_read_stamp(cpu_buffer, event); 2959 2960 length = rb_event_length(event); 2961 cpu_buffer->reader_page->read += length; 2962 } 2963 2964 static void rb_advance_iter(struct ring_buffer_iter *iter) 2965 { 2966 struct ring_buffer *buffer; 2967 struct ring_buffer_per_cpu *cpu_buffer; 2968 struct ring_buffer_event *event; 2969 unsigned length; 2970 2971 cpu_buffer = iter->cpu_buffer; 2972 buffer = cpu_buffer->buffer; 2973 2974 /* 2975 * Check if we are at the end of the buffer. 2976 */ 2977 if (iter->head >= rb_page_size(iter->head_page)) { 2978 /* discarded commits can make the page empty */ 2979 if (iter->head_page == cpu_buffer->commit_page) 2980 return; 2981 rb_inc_iter(iter); 2982 return; 2983 } 2984 2985 event = rb_iter_head_event(iter); 2986 2987 length = rb_event_length(event); 2988 2989 /* 2990 * This should not be called to advance the header if we are 2991 * at the tail of the buffer. 2992 */ 2993 if (RB_WARN_ON(cpu_buffer, 2994 (iter->head_page == cpu_buffer->commit_page) && 2995 (iter->head + length > rb_commit_index(cpu_buffer)))) 2996 return; 2997 2998 rb_update_iter_read_stamp(iter, event); 2999 3000 iter->head += length; 3001 3002 /* check for end of page padding */ 3003 if ((iter->head >= rb_page_size(iter->head_page)) && 3004 (iter->head_page != cpu_buffer->commit_page)) 3005 rb_advance_iter(iter); 3006 } 3007 3008 static struct ring_buffer_event * 3009 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts) 3010 { 3011 struct ring_buffer_event *event; 3012 struct buffer_page *reader; 3013 int nr_loops = 0; 3014 3015 again: 3016 /* 3017 * We repeat when a timestamp is encountered. It is possible 3018 * to get multiple timestamps from an interrupt entering just 3019 * as one timestamp is about to be written, or from discarded 3020 * commits. The most that we can have is the number on a single page. 3021 */ 3022 if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE)) 3023 return NULL; 3024 3025 reader = rb_get_reader_page(cpu_buffer); 3026 if (!reader) 3027 return NULL; 3028 3029 event = rb_reader_event(cpu_buffer); 3030 3031 switch (event->type_len) { 3032 case RINGBUF_TYPE_PADDING: 3033 if (rb_null_event(event)) 3034 RB_WARN_ON(cpu_buffer, 1); 3035 /* 3036 * Because the writer could be discarding every 3037 * event it creates (which would probably be bad) 3038 * if we were to go back to "again" then we may never 3039 * catch up, and will trigger the warn on, or lock 3040 * the box. Return the padding, and we will release 3041 * the current locks, and try again. 
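 * (The callers, ring_buffer_peek() and ring_buffer_consume(), check
 * for RINGBUF_TYPE_PADDING and retry after dropping their locks.)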
3042 */ 3043 return event; 3044 3045 case RINGBUF_TYPE_TIME_EXTEND: 3046 /* Internal data, OK to advance */ 3047 rb_advance_reader(cpu_buffer); 3048 goto again; 3049 3050 case RINGBUF_TYPE_TIME_STAMP: 3051 /* FIXME: not implemented */ 3052 rb_advance_reader(cpu_buffer); 3053 goto again; 3054 3055 case RINGBUF_TYPE_DATA: 3056 if (ts) { 3057 *ts = cpu_buffer->read_stamp + event->time_delta; 3058 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 3059 cpu_buffer->cpu, ts); 3060 } 3061 return event; 3062 3063 default: 3064 BUG(); 3065 } 3066 3067 return NULL; 3068 } 3069 EXPORT_SYMBOL_GPL(ring_buffer_peek); 3070 3071 static struct ring_buffer_event * 3072 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 3073 { 3074 struct ring_buffer *buffer; 3075 struct ring_buffer_per_cpu *cpu_buffer; 3076 struct ring_buffer_event *event; 3077 int nr_loops = 0; 3078 3079 cpu_buffer = iter->cpu_buffer; 3080 buffer = cpu_buffer->buffer; 3081 3082 /* 3083 * Check if someone performed a consuming read to 3084 * the buffer. A consuming read invalidates the iterator 3085 * and we need to reset the iterator in this case. 3086 */ 3087 if (unlikely(iter->cache_read != cpu_buffer->read || 3088 iter->cache_reader_page != cpu_buffer->reader_page)) 3089 rb_iter_reset(iter); 3090 3091 again: 3092 if (ring_buffer_iter_empty(iter)) 3093 return NULL; 3094 3095 /* 3096 * We repeat when a timestamp is encountered. 3097 * We can get multiple timestamps by nested interrupts or also 3098 * if filtering is on (discarding commits). Since discarding 3099 * commits can be frequent we can get a lot of timestamps. 3100 * But we limit them by not adding timestamps if they begin 3101 * at the start of a page. 3102 */ 3103 if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE)) 3104 return NULL; 3105 3106 if (rb_per_cpu_empty(cpu_buffer)) 3107 return NULL; 3108 3109 if (iter->head >= local_read(&iter->head_page->page->commit)) { 3110 rb_inc_iter(iter); 3111 goto again; 3112 } 3113 3114 event = rb_iter_head_event(iter); 3115 3116 switch (event->type_len) { 3117 case RINGBUF_TYPE_PADDING: 3118 if (rb_null_event(event)) { 3119 rb_inc_iter(iter); 3120 goto again; 3121 } 3122 rb_advance_iter(iter); 3123 return event; 3124 3125 case RINGBUF_TYPE_TIME_EXTEND: 3126 /* Internal data, OK to advance */ 3127 rb_advance_iter(iter); 3128 goto again; 3129 3130 case RINGBUF_TYPE_TIME_STAMP: 3131 /* FIXME: not implemented */ 3132 rb_advance_iter(iter); 3133 goto again; 3134 3135 case RINGBUF_TYPE_DATA: 3136 if (ts) { 3137 *ts = iter->read_stamp + event->time_delta; 3138 ring_buffer_normalize_time_stamp(buffer, 3139 cpu_buffer->cpu, ts); 3140 } 3141 return event; 3142 3143 default: 3144 BUG(); 3145 } 3146 3147 return NULL; 3148 } 3149 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 3150 3151 static inline int rb_ok_to_lock(void) 3152 { 3153 /* 3154 * If an NMI die dumps out the content of the ring buffer 3155 * do not grab locks. We also permanently disable the ring 3156 * buffer too. A one time deal is all you get from reading 3157 * the ring buffer from an NMI. 3158 */ 3159 if (likely(!in_nmi())) 3160 return 1; 3161 3162 tracing_off_permanent(); 3163 return 0; 3164 } 3165 3166 /** 3167 * ring_buffer_peek - peek at the next event to be read 3168 * @buffer: The ring buffer to read 3169 * @cpu: The cpu to peak at 3170 * @ts: The timestamp counter of this event. 3171 * 3172 * This will return the event that will be read next, but does 3173 * not consume the data. 
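 *
 * A minimal usage sketch (illustrative only; process() is a made-up
 * placeholder for whatever the caller does with the data):
 *
 *	u64 ts;
 *	struct ring_buffer_event *event;
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts);
 *	if (event)
 *		process(ring_buffer_event_data(event), ts);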
3174 */ 3175 struct ring_buffer_event * 3176 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) 3177 { 3178 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3179 struct ring_buffer_event *event; 3180 unsigned long flags; 3181 int dolock; 3182 3183 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3184 return NULL; 3185 3186 dolock = rb_ok_to_lock(); 3187 again: 3188 local_irq_save(flags); 3189 if (dolock) 3190 spin_lock(&cpu_buffer->reader_lock); 3191 event = rb_buffer_peek(cpu_buffer, ts); 3192 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3193 rb_advance_reader(cpu_buffer); 3194 if (dolock) 3195 spin_unlock(&cpu_buffer->reader_lock); 3196 local_irq_restore(flags); 3197 3198 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3199 goto again; 3200 3201 return event; 3202 } 3203 3204 /** 3205 * ring_buffer_iter_peek - peek at the next event to be read 3206 * @iter: The ring buffer iterator 3207 * @ts: The timestamp counter of this event. 3208 * 3209 * This will return the event that will be read next, but does 3210 * not increment the iterator. 3211 */ 3212 struct ring_buffer_event * 3213 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 3214 { 3215 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3216 struct ring_buffer_event *event; 3217 unsigned long flags; 3218 3219 again: 3220 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3221 event = rb_iter_peek(iter, ts); 3222 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3223 3224 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3225 goto again; 3226 3227 return event; 3228 } 3229 3230 /** 3231 * ring_buffer_consume - return an event and consume it 3232 * @buffer: The ring buffer to get the next event from 3233 * 3234 * Returns the next event in the ring buffer, and that event is consumed. 3235 * Meaning, that sequential reads will keep returning a different event, 3236 * and eventually empty the ring buffer if the producer is slower. 3237 */ 3238 struct ring_buffer_event * 3239 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts) 3240 { 3241 struct ring_buffer_per_cpu *cpu_buffer; 3242 struct ring_buffer_event *event = NULL; 3243 unsigned long flags; 3244 int dolock; 3245 3246 dolock = rb_ok_to_lock(); 3247 3248 again: 3249 /* might be called in atomic */ 3250 preempt_disable(); 3251 3252 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3253 goto out; 3254 3255 cpu_buffer = buffer->buffers[cpu]; 3256 local_irq_save(flags); 3257 if (dolock) 3258 spin_lock(&cpu_buffer->reader_lock); 3259 3260 event = rb_buffer_peek(cpu_buffer, ts); 3261 if (event) 3262 rb_advance_reader(cpu_buffer); 3263 3264 if (dolock) 3265 spin_unlock(&cpu_buffer->reader_lock); 3266 local_irq_restore(flags); 3267 3268 out: 3269 preempt_enable(); 3270 3271 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3272 goto again; 3273 3274 return event; 3275 } 3276 EXPORT_SYMBOL_GPL(ring_buffer_consume); 3277 3278 /** 3279 * ring_buffer_read_start - start a non consuming read of the buffer 3280 * @buffer: The ring buffer to read from 3281 * @cpu: The cpu buffer to iterate over 3282 * 3283 * This starts up an iteration through the buffer. It also disables 3284 * the recording to the buffer until the reading is finished. 3285 * This prevents the reading from being corrupted. This is not 3286 * a consuming read, so a producer is not expected. 3287 * 3288 * Must be paired with ring_buffer_finish. 
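 *
 * A minimal usage sketch of a full non consuming read (illustrative
 * only; process() is a made-up placeholder):
 *
 *	u64 ts;
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *
 *	iter = ring_buffer_read_start(buffer, cpu);
 *	if (!iter)
 *		return;
 *	while ((event = ring_buffer_read(iter, &ts)))
 *		process(ring_buffer_event_data(event), ts);
 *	ring_buffer_read_finish(iter);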
3289 */ 3290 struct ring_buffer_iter * 3291 ring_buffer_read_start(struct ring_buffer *buffer, int cpu) 3292 { 3293 struct ring_buffer_per_cpu *cpu_buffer; 3294 struct ring_buffer_iter *iter; 3295 unsigned long flags; 3296 3297 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3298 return NULL; 3299 3300 iter = kmalloc(sizeof(*iter), GFP_KERNEL); 3301 if (!iter) 3302 return NULL; 3303 3304 cpu_buffer = buffer->buffers[cpu]; 3305 3306 iter->cpu_buffer = cpu_buffer; 3307 3308 atomic_inc(&cpu_buffer->record_disabled); 3309 synchronize_sched(); 3310 3311 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3312 arch_spin_lock(&cpu_buffer->lock); 3313 rb_iter_reset(iter); 3314 arch_spin_unlock(&cpu_buffer->lock); 3315 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3316 3317 return iter; 3318 } 3319 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 3320 3321 /** 3322 * ring_buffer_finish - finish reading the iterator of the buffer 3323 * @iter: The iterator retrieved by ring_buffer_start 3324 * 3325 * This re-enables the recording to the buffer, and frees the 3326 * iterator. 3327 */ 3328 void 3329 ring_buffer_read_finish(struct ring_buffer_iter *iter) 3330 { 3331 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3332 3333 atomic_dec(&cpu_buffer->record_disabled); 3334 kfree(iter); 3335 } 3336 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 3337 3338 /** 3339 * ring_buffer_read - read the next item in the ring buffer by the iterator 3340 * @iter: The ring buffer iterator 3341 * @ts: The time stamp of the event read. 3342 * 3343 * This reads the next event in the ring buffer and increments the iterator. 3344 */ 3345 struct ring_buffer_event * 3346 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts) 3347 { 3348 struct ring_buffer_event *event; 3349 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3350 unsigned long flags; 3351 3352 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3353 again: 3354 event = rb_iter_peek(iter, ts); 3355 if (!event) 3356 goto out; 3357 3358 if (event->type_len == RINGBUF_TYPE_PADDING) 3359 goto again; 3360 3361 rb_advance_iter(iter); 3362 out: 3363 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3364 3365 return event; 3366 } 3367 EXPORT_SYMBOL_GPL(ring_buffer_read); 3368 3369 /** 3370 * ring_buffer_size - return the size of the ring buffer (in bytes) 3371 * @buffer: The ring buffer. 
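 *
 * The value is BUF_PAGE_SIZE times the number of data pages of a single
 * per CPU buffer, not the sum over all CPUs.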
3372 */ 3373 unsigned long ring_buffer_size(struct ring_buffer *buffer) 3374 { 3375 return BUF_PAGE_SIZE * buffer->pages; 3376 } 3377 EXPORT_SYMBOL_GPL(ring_buffer_size); 3378 3379 static void 3380 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 3381 { 3382 rb_head_page_deactivate(cpu_buffer); 3383 3384 cpu_buffer->head_page 3385 = list_entry(cpu_buffer->pages, struct buffer_page, list); 3386 local_set(&cpu_buffer->head_page->write, 0); 3387 local_set(&cpu_buffer->head_page->entries, 0); 3388 local_set(&cpu_buffer->head_page->page->commit, 0); 3389 3390 cpu_buffer->head_page->read = 0; 3391 3392 cpu_buffer->tail_page = cpu_buffer->head_page; 3393 cpu_buffer->commit_page = cpu_buffer->head_page; 3394 3395 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 3396 local_set(&cpu_buffer->reader_page->write, 0); 3397 local_set(&cpu_buffer->reader_page->entries, 0); 3398 local_set(&cpu_buffer->reader_page->page->commit, 0); 3399 cpu_buffer->reader_page->read = 0; 3400 3401 local_set(&cpu_buffer->commit_overrun, 0); 3402 local_set(&cpu_buffer->overrun, 0); 3403 local_set(&cpu_buffer->entries, 0); 3404 local_set(&cpu_buffer->committing, 0); 3405 local_set(&cpu_buffer->commits, 0); 3406 cpu_buffer->read = 0; 3407 3408 cpu_buffer->write_stamp = 0; 3409 cpu_buffer->read_stamp = 0; 3410 3411 rb_head_page_activate(cpu_buffer); 3412 } 3413 3414 /** 3415 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 3416 * @buffer: The ring buffer to reset a per cpu buffer of 3417 * @cpu: The CPU buffer to be reset 3418 */ 3419 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu) 3420 { 3421 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3422 unsigned long flags; 3423 3424 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3425 return; 3426 3427 atomic_inc(&cpu_buffer->record_disabled); 3428 3429 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3430 3431 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 3432 goto out; 3433 3434 arch_spin_lock(&cpu_buffer->lock); 3435 3436 rb_reset_cpu(cpu_buffer); 3437 3438 arch_spin_unlock(&cpu_buffer->lock); 3439 3440 out: 3441 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3442 3443 atomic_dec(&cpu_buffer->record_disabled); 3444 } 3445 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 3446 3447 /** 3448 * ring_buffer_reset - reset a ring buffer 3449 * @buffer: The ring buffer to reset all cpu buffers 3450 */ 3451 void ring_buffer_reset(struct ring_buffer *buffer) 3452 { 3453 int cpu; 3454 3455 for_each_buffer_cpu(buffer, cpu) 3456 ring_buffer_reset_cpu(buffer, cpu); 3457 } 3458 EXPORT_SYMBOL_GPL(ring_buffer_reset); 3459 3460 /** 3461 * rind_buffer_empty - is the ring buffer empty? 3462 * @buffer: The ring buffer to test 3463 */ 3464 int ring_buffer_empty(struct ring_buffer *buffer) 3465 { 3466 struct ring_buffer_per_cpu *cpu_buffer; 3467 unsigned long flags; 3468 int dolock; 3469 int cpu; 3470 int ret; 3471 3472 dolock = rb_ok_to_lock(); 3473 3474 /* yes this is racy, but if you don't like the race, lock the buffer */ 3475 for_each_buffer_cpu(buffer, cpu) { 3476 cpu_buffer = buffer->buffers[cpu]; 3477 local_irq_save(flags); 3478 if (dolock) 3479 spin_lock(&cpu_buffer->reader_lock); 3480 ret = rb_per_cpu_empty(cpu_buffer); 3481 if (dolock) 3482 spin_unlock(&cpu_buffer->reader_lock); 3483 local_irq_restore(flags); 3484 3485 if (!ret) 3486 return 0; 3487 } 3488 3489 return 1; 3490 } 3491 EXPORT_SYMBOL_GPL(ring_buffer_empty); 3492 3493 /** 3494 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 
3495 * @buffer: The ring buffer 3496 * @cpu: The CPU buffer to test 3497 */ 3498 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu) 3499 { 3500 struct ring_buffer_per_cpu *cpu_buffer; 3501 unsigned long flags; 3502 int dolock; 3503 int ret; 3504 3505 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3506 return 1; 3507 3508 dolock = rb_ok_to_lock(); 3509 3510 cpu_buffer = buffer->buffers[cpu]; 3511 local_irq_save(flags); 3512 if (dolock) 3513 spin_lock(&cpu_buffer->reader_lock); 3514 ret = rb_per_cpu_empty(cpu_buffer); 3515 if (dolock) 3516 spin_unlock(&cpu_buffer->reader_lock); 3517 local_irq_restore(flags); 3518 3519 return ret; 3520 } 3521 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 3522 3523 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 3524 /** 3525 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 3526 * @buffer_a: One buffer to swap with 3527 * @buffer_b: The other buffer to swap with 3528 * 3529 * This function is useful for tracers that want to take a "snapshot" 3530 * of a CPU buffer and has another back up buffer lying around. 3531 * it is expected that the tracer handles the cpu buffer not being 3532 * used at the moment. 3533 */ 3534 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a, 3535 struct ring_buffer *buffer_b, int cpu) 3536 { 3537 struct ring_buffer_per_cpu *cpu_buffer_a; 3538 struct ring_buffer_per_cpu *cpu_buffer_b; 3539 int ret = -EINVAL; 3540 3541 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 3542 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 3543 goto out; 3544 3545 /* At least make sure the two buffers are somewhat the same */ 3546 if (buffer_a->pages != buffer_b->pages) 3547 goto out; 3548 3549 ret = -EAGAIN; 3550 3551 if (ring_buffer_flags != RB_BUFFERS_ON) 3552 goto out; 3553 3554 if (atomic_read(&buffer_a->record_disabled)) 3555 goto out; 3556 3557 if (atomic_read(&buffer_b->record_disabled)) 3558 goto out; 3559 3560 cpu_buffer_a = buffer_a->buffers[cpu]; 3561 cpu_buffer_b = buffer_b->buffers[cpu]; 3562 3563 if (atomic_read(&cpu_buffer_a->record_disabled)) 3564 goto out; 3565 3566 if (atomic_read(&cpu_buffer_b->record_disabled)) 3567 goto out; 3568 3569 /* 3570 * We can't do a synchronize_sched here because this 3571 * function can be called in atomic context. 3572 * Normally this will be called from the same CPU as cpu. 3573 * If not it's up to the caller to protect this. 3574 */ 3575 atomic_inc(&cpu_buffer_a->record_disabled); 3576 atomic_inc(&cpu_buffer_b->record_disabled); 3577 3578 ret = -EBUSY; 3579 if (local_read(&cpu_buffer_a->committing)) 3580 goto out_dec; 3581 if (local_read(&cpu_buffer_b->committing)) 3582 goto out_dec; 3583 3584 buffer_a->buffers[cpu] = cpu_buffer_b; 3585 buffer_b->buffers[cpu] = cpu_buffer_a; 3586 3587 cpu_buffer_b->buffer = buffer_a; 3588 cpu_buffer_a->buffer = buffer_b; 3589 3590 ret = 0; 3591 3592 out_dec: 3593 atomic_dec(&cpu_buffer_a->record_disabled); 3594 atomic_dec(&cpu_buffer_b->record_disabled); 3595 out: 3596 return ret; 3597 } 3598 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 3599 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 3600 3601 /** 3602 * ring_buffer_alloc_read_page - allocate a page to read from buffer 3603 * @buffer: the buffer to allocate for. 3604 * 3605 * This function is used in conjunction with ring_buffer_read_page. 3606 * When reading a full page from the ring buffer, these functions 3607 * can be used to speed up the process. The calling function should 3608 * allocate a few pages first with this function. 
Then when it 3609 * needs to get pages from the ring buffer, it passes the result 3610 * of this function into ring_buffer_read_page, which will swap 3611 * the page that was allocated, with the read page of the buffer. 3612 * 3613 * Returns: 3614 * The page allocated, or NULL on error. 3615 */ 3616 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer) 3617 { 3618 struct buffer_data_page *bpage; 3619 unsigned long addr; 3620 3621 addr = __get_free_page(GFP_KERNEL); 3622 if (!addr) 3623 return NULL; 3624 3625 bpage = (void *)addr; 3626 3627 rb_init_page(bpage); 3628 3629 return bpage; 3630 } 3631 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 3632 3633 /** 3634 * ring_buffer_free_read_page - free an allocated read page 3635 * @buffer: the buffer the page was allocate for 3636 * @data: the page to free 3637 * 3638 * Free a page allocated from ring_buffer_alloc_read_page. 3639 */ 3640 void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data) 3641 { 3642 free_page((unsigned long)data); 3643 } 3644 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 3645 3646 /** 3647 * ring_buffer_read_page - extract a page from the ring buffer 3648 * @buffer: buffer to extract from 3649 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 3650 * @len: amount to extract 3651 * @cpu: the cpu of the buffer to extract 3652 * @full: should the extraction only happen when the page is full. 3653 * 3654 * This function will pull out a page from the ring buffer and consume it. 3655 * @data_page must be the address of the variable that was returned 3656 * from ring_buffer_alloc_read_page. This is because the page might be used 3657 * to swap with a page in the ring buffer. 3658 * 3659 * for example: 3660 * rpage = ring_buffer_alloc_read_page(buffer); 3661 * if (!rpage) 3662 * return error; 3663 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); 3664 * if (ret >= 0) 3665 * process_page(rpage, ret); 3666 * 3667 * When @full is set, the function will not return true unless 3668 * the writer is off the reader page. 3669 * 3670 * Note: it is up to the calling functions to handle sleeps and wakeups. 3671 * The ring buffer can be used anywhere in the kernel and can not 3672 * blindly call wake_up. The layer that uses the ring buffer must be 3673 * responsible for that. 3674 * 3675 * Returns: 3676 * >=0 if data has been transferred, returns the offset of consumed data. 3677 * <0 if no data has been transferred. 3678 */ 3679 int ring_buffer_read_page(struct ring_buffer *buffer, 3680 void **data_page, size_t len, int cpu, int full) 3681 { 3682 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3683 struct ring_buffer_event *event; 3684 struct buffer_data_page *bpage; 3685 struct buffer_page *reader; 3686 unsigned long flags; 3687 unsigned int commit; 3688 unsigned int read; 3689 u64 save_timestamp; 3690 int ret = -1; 3691 3692 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3693 goto out; 3694 3695 /* 3696 * If len is not big enough to hold the page header, then 3697 * we can not copy anything. 
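 * Only len - BUF_PAGE_HDR_SIZE bytes of event data can be returned;
 * the rest of @len is taken up by the page header itself.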
3698 */ 3699 if (len <= BUF_PAGE_HDR_SIZE) 3700 goto out; 3701 3702 len -= BUF_PAGE_HDR_SIZE; 3703 3704 if (!data_page) 3705 goto out; 3706 3707 bpage = *data_page; 3708 if (!bpage) 3709 goto out; 3710 3711 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3712 3713 reader = rb_get_reader_page(cpu_buffer); 3714 if (!reader) 3715 goto out_unlock; 3716 3717 event = rb_reader_event(cpu_buffer); 3718 3719 read = reader->read; 3720 commit = rb_page_commit(reader); 3721 3722 /* 3723 * If this page has been partially read or 3724 * if len is not big enough to read the rest of the page or 3725 * a writer is still on the page, then 3726 * we must copy the data from the page to the buffer. 3727 * Otherwise, we can simply swap the page with the one passed in. 3728 */ 3729 if (read || (len < (commit - read)) || 3730 cpu_buffer->reader_page == cpu_buffer->commit_page) { 3731 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 3732 unsigned int rpos = read; 3733 unsigned int pos = 0; 3734 unsigned int size; 3735 3736 if (full) 3737 goto out_unlock; 3738 3739 if (len > (commit - read)) 3740 len = (commit - read); 3741 3742 size = rb_event_length(event); 3743 3744 if (len < size) 3745 goto out_unlock; 3746 3747 /* save the current timestamp, since the user will need it */ 3748 save_timestamp = cpu_buffer->read_stamp; 3749 3750 /* Need to copy one event at a time */ 3751 do { 3752 memcpy(bpage->data + pos, rpage->data + rpos, size); 3753 3754 len -= size; 3755 3756 rb_advance_reader(cpu_buffer); 3757 rpos = reader->read; 3758 pos += size; 3759 3760 event = rb_reader_event(cpu_buffer); 3761 size = rb_event_length(event); 3762 } while (len > size); 3763 3764 /* update bpage */ 3765 local_set(&bpage->commit, pos); 3766 bpage->time_stamp = save_timestamp; 3767 3768 /* we copied everything to the beginning */ 3769 read = 0; 3770 } else { 3771 /* update the entry counter */ 3772 cpu_buffer->read += rb_page_entries(reader); 3773 3774 /* swap the pages */ 3775 rb_init_page(bpage); 3776 bpage = reader->page; 3777 reader->page = *data_page; 3778 local_set(&reader->write, 0); 3779 local_set(&reader->entries, 0); 3780 reader->read = 0; 3781 *data_page = bpage; 3782 } 3783 ret = read; 3784 3785 out_unlock: 3786 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3787 3788 out: 3789 return ret; 3790 } 3791 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 3792 3793 #ifdef CONFIG_TRACING 3794 static ssize_t 3795 rb_simple_read(struct file *filp, char __user *ubuf, 3796 size_t cnt, loff_t *ppos) 3797 { 3798 unsigned long *p = filp->private_data; 3799 char buf[64]; 3800 int r; 3801 3802 if (test_bit(RB_BUFFERS_DISABLED_BIT, p)) 3803 r = sprintf(buf, "permanently disabled\n"); 3804 else 3805 r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p)); 3806 3807 return simple_read_from_buffer(ubuf, cnt, ppos, buf, r); 3808 } 3809 3810 static ssize_t 3811 rb_simple_write(struct file *filp, const char __user *ubuf, 3812 size_t cnt, loff_t *ppos) 3813 { 3814 unsigned long *p = filp->private_data; 3815 char buf[64]; 3816 unsigned long val; 3817 int ret; 3818 3819 if (cnt >= sizeof(buf)) 3820 return -EINVAL; 3821 3822 if (copy_from_user(&buf, ubuf, cnt)) 3823 return -EFAULT; 3824 3825 buf[cnt] = 0; 3826 3827 ret = strict_strtoul(buf, 10, &val); 3828 if (ret < 0) 3829 return ret; 3830 3831 if (val) 3832 set_bit(RB_BUFFERS_ON_BIT, p); 3833 else 3834 clear_bit(RB_BUFFERS_ON_BIT, p); 3835 3836 (*ppos)++; 3837 3838 return cnt; 3839 } 3840 3841 static const struct file_operations rb_simple_fops = { 3842 .open = 
tracing_open_generic, 3843 .read = rb_simple_read, 3844 .write = rb_simple_write, 3845 }; 3846 3847 3848 static __init int rb_init_debugfs(void) 3849 { 3850 struct dentry *d_tracer; 3851 3852 d_tracer = tracing_init_dentry(); 3853 3854 trace_create_file("tracing_on", 0644, d_tracer, 3855 &ring_buffer_flags, &rb_simple_fops); 3856 3857 return 0; 3858 } 3859 3860 fs_initcall(rb_init_debugfs); 3861 #endif 3862 3863 #ifdef CONFIG_HOTPLUG_CPU 3864 static int rb_cpu_notify(struct notifier_block *self, 3865 unsigned long action, void *hcpu) 3866 { 3867 struct ring_buffer *buffer = 3868 container_of(self, struct ring_buffer, cpu_notify); 3869 long cpu = (long)hcpu; 3870 3871 switch (action) { 3872 case CPU_UP_PREPARE: 3873 case CPU_UP_PREPARE_FROZEN: 3874 if (cpumask_test_cpu(cpu, buffer->cpumask)) 3875 return NOTIFY_OK; 3876 3877 buffer->buffers[cpu] = 3878 rb_allocate_cpu_buffer(buffer, cpu); 3879 if (!buffer->buffers[cpu]) { 3880 WARN(1, "failed to allocate ring buffer on CPU %ld\n", 3881 cpu); 3882 return NOTIFY_OK; 3883 } 3884 smp_wmb(); 3885 cpumask_set_cpu(cpu, buffer->cpumask); 3886 break; 3887 case CPU_DOWN_PREPARE: 3888 case CPU_DOWN_PREPARE_FROZEN: 3889 /* 3890 * Do nothing. 3891 * If we were to free the buffer, then the user would 3892 * lose any trace that was in the buffer. 3893 */ 3894 break; 3895 default: 3896 break; 3897 } 3898 return NOTIFY_OK; 3899 } 3900 #endif 3901