// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/trace_recursion.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>

#include <asm/local.h>

/*
 * The "absolute" timestamp in the buffer is only 59 bits.
 * If a clock has the 5 MSBs set, it needs to be saved and
 * reinserted.
 */
#define TS_MSB		(0xf8ULL << 56)
#define ABS_TS_MASK	(~TS_MSB)

static void update_pages_handler(struct work_struct *work);

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len : 5 bits\n");
	trace_seq_puts(s, "\ttime_delta : 27 bits\n");
	trace_seq_puts(s, "\tarray : 32 bits\n");
	trace_seq_putc(s, '\n');
	trace_seq_printf(s, "\tpadding : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do what
 * ever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

#define RB_ALIGN_DATA		__aligned(RB_ARCH_ALIGNMENT)

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 8,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

#define extended_time(event) \
	(event->type_len >= RINGBUF_TYPE_TIME_EXTEND)

static inline bool rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		WARN_ON_ONCE(1);
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 * or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (extended_time(event)) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (extended_time(event))
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);
	WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define for_each_online_buffer_cpu(buffer, cpu)		\
	for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

static u64 rb_event_time_stamp(struct ring_buffer_event *event)
{
	u64 ts;

	ts = event->array[0];
	ts <<= TS_SHIFT;
	ts += event->time_delta;

	return ts;
}

/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

struct buffer_data_page {
	u64		time_stamp;	/* page time stamp */
	local_t		commit;		/* write committed index */
	unsigned char	data[] RB_ALIGN_DATA;	/* data of buffer page */
};
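
/*
 * For a rough sense of scale (assuming a 64-bit kernel with 4K pages):
 * time_stamp is 8 bytes and commit is 8 bytes, so BUF_PAGE_HDR_SIZE works
 * out to 16 and each data page carries PAGE_SIZE - 16 = 4080 bytes of
 * event payload (see BUF_PAGE_SIZE below).
 */
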
/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		write;		/* index for next write */
	unsigned	read;		/* index for next read */
	local_t		entries;	/* entries on this page */
	unsigned long	real_end;	/* real end of data */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

static void free_buffer_page(struct buffer_page *bpage)
{
	free_page((unsigned long)bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline bool test_time_stamp(u64 delta)
{
	return !!(delta & TS_DELTA_TEST);
}

#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)

/* Max payload is BUF_PAGE_SIZE - header (8bytes) */
#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))

int ring_buffer_print_page_header(struct trace_seq *s)
{
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)BUF_PAGE_SIZE,
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

struct rb_irq_work {
	struct irq_work		work;
	wait_queue_head_t	waiters;
	wait_queue_head_t	full_waiters;
	long			wait_index;
	bool			waiters_pending;
	bool			full_waiters_pending;
	bool			wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	u64			before;
	u64			after;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;
};

/*
 * Used for the add_timestamp
 *  NONE
 *  EXTEND - wants a time extend
 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
 *  FORCE - force a full time stamp.
 */
enum {
	RB_ADD_STAMP_NONE	= 0,
	RB_ADD_STAMP_EXTEND	= BIT(1),
	RB_ADD_STAMP_ABSOLUTE	= BIT(2),
	RB_ADD_STAMP_FORCE	= BIT(3)
};
/*
 * Used for which event context the event is in.
 *  TRANSITION = 0
 *  NMI     = 1
 *  IRQ     = 2
 *  SOFTIRQ = 3
 *  NORMAL  = 4
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_TRANSITION,
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};

#if BITS_PER_LONG == 32
#define RB_TIME_32
#endif

/* To test on 64 bit machines */
//#define RB_TIME_32

#ifdef RB_TIME_32

struct rb_time_struct {
	local_t		cnt;
	local_t		top;
	local_t		bottom;
	local_t		msb;
};
#else
#include <asm/local64.h>
struct rb_time_struct {
	local64_t	time;
};
#endif
typedef struct rb_time_struct rb_time_t;

#define MAX_NEST	5

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int			cpu;
	atomic_t		record_disabled;
	atomic_t		resize_disabled;
	struct trace_buffer	*buffer;
	raw_spinlock_t		reader_lock;	/* serialize readers */
	arch_spinlock_t		lock;
	struct lock_class_key	lock_key;
	struct buffer_data_page	*free_page;
	unsigned long		nr_pages;
	unsigned int		current_context;
	struct list_head	*pages;
	struct buffer_page	*head_page;	/* read from head */
	struct buffer_page	*tail_page;	/* write to tail */
	struct buffer_page	*commit_page;	/* committed pages */
	struct buffer_page	*reader_page;
	unsigned long		lost_events;
	unsigned long		last_overrun;
	unsigned long		nest;
	local_t			entries_bytes;
	local_t			entries;
	local_t			overrun;
	local_t			commit_overrun;
	local_t			dropped_events;
	local_t			committing;
	local_t			commits;
	local_t			pages_touched;
	local_t			pages_lost;
	local_t			pages_read;
	long			last_pages_touch;
	size_t			shortest_full;
	unsigned long		read;
	unsigned long		read_bytes;
	rb_time_t		write_stamp;
	rb_time_t		before_stamp;
	u64			event_stamp[MAX_NEST];
	u64			read_stamp;
	/* pages removed since last reset */
	unsigned long		pages_removed;
	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long			nr_pages_to_update;
	struct list_head	new_pages;	/* new pages to add */
	struct work_struct	update_pages_work;
	struct completion	update_done;

	struct rb_irq_work	irq_work;
};

struct trace_buffer {
	unsigned		flags;
	int			cpus;
	atomic_t		record_disabled;
	atomic_t		resizing;
	cpumask_var_t		cpumask;

	struct lock_class_key	*reader_lock_key;

	struct mutex		mutex;

	struct ring_buffer_per_cpu **buffers;

	struct hlist_node	node;
	u64			(*clock)(void);

	struct rb_irq_work	irq_work;
	bool			time_stamp_abs;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	unsigned long			next_event;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	unsigned long			cache_pages_removed;
	u64				read_stamp;
	u64				page_stamp;
	struct ring_buffer_event	*event;
	int				missed_events;
};

#ifdef RB_TIME_32

/*
 * On 32 bit machines, local64_t is very expensive. As the ring
 * buffer doesn't need all the features of a true 64 bit atomic,
 * on 32 bit, it uses these functions (64 still uses local64_t).
 *
 * For the ring buffer, the required 64 bit operations for the time are
 * the following:
 *
 *  - A read may fail if it interrupted a modification of the time stamp.
 *      It will succeed if it did not interrupt another write even if
 *      the read itself is interrupted by a write.
 *      It returns whether it was successful or not.
 *
 *  - Writes always succeed and will overwrite other writes and writes
 *      that were done by events interrupting the current write.
 *
 *  - A write followed by a read of the same time stamp will always succeed,
 *      but may not contain the same value.
 *
 *  - A cmpxchg will fail if it interrupted another write or cmpxchg.
 *      Other than that, it acts like a normal cmpxchg.
 *
 * The 60 bit time stamp is broken up by 30 bits in a top and bottom half
 *  (bottom being the least significant 30 bits of the 60 bit time stamp).
 *
 * The two most significant bits of each half hold a 2 bit counter (0-3).
 * Each update will increment this counter by one.
 * When reading the top and bottom, if the two counter bits match then the
 *  top and bottom together make a valid 60 bit number.
 */
#define RB_TIME_SHIFT	30
#define RB_TIME_VAL_MASK ((1 << RB_TIME_SHIFT) - 1)
#define RB_TIME_MSB_SHIFT	60

static inline int rb_time_cnt(unsigned long val)
{
	return (val >> RB_TIME_SHIFT) & 3;
}

static inline u64 rb_time_val(unsigned long top, unsigned long bottom)
{
	u64 val;

	val = top & RB_TIME_VAL_MASK;
	val <<= RB_TIME_SHIFT;
	val |= bottom & RB_TIME_VAL_MASK;

	return val;
}

static inline bool __rb_time_read(rb_time_t *t, u64 *ret, unsigned long *cnt)
{
	unsigned long top, bottom, msb;
	unsigned long c;

	/*
	 * If the read is interrupted by a write, then the cnt will
	 * be different. Loop until both top and bottom have been read
	 * without interruption.
	 */
	do {
		c = local_read(&t->cnt);
		top = local_read(&t->top);
		bottom = local_read(&t->bottom);
		msb = local_read(&t->msb);
	} while (c != local_read(&t->cnt));

	*cnt = rb_time_cnt(top);

	/* If top, msb or bottom counts don't match, this interrupted a write */
	if (*cnt != rb_time_cnt(msb) || *cnt != rb_time_cnt(bottom))
		return false;

	/* The shift to msb will lose its cnt bits */
	*ret = rb_time_val(top, bottom) | ((u64)msb << RB_TIME_MSB_SHIFT);
	return true;
}

static bool rb_time_read(rb_time_t *t, u64 *ret)
{
	unsigned long cnt;

	return __rb_time_read(t, ret, &cnt);
}

static inline unsigned long rb_time_val_cnt(unsigned long val, unsigned long cnt)
{
	return (val & RB_TIME_VAL_MASK) | ((cnt & 3) << RB_TIME_SHIFT);
}

static inline void rb_time_split(u64 val, unsigned long *top, unsigned long *bottom,
				 unsigned long *msb)
{
	*top = (unsigned long)((val >> RB_TIME_SHIFT) & RB_TIME_VAL_MASK);
	*bottom = (unsigned long)(val & RB_TIME_VAL_MASK);
	*msb = (unsigned long)(val >> RB_TIME_MSB_SHIFT);
}

static inline void rb_time_val_set(local_t *t, unsigned long val, unsigned long cnt)
{
	val = rb_time_val_cnt(val, cnt);
	local_set(t, val);
}

static void rb_time_set(rb_time_t *t, u64 val)
{
	unsigned long cnt, top, bottom, msb;

	rb_time_split(val, &top, &bottom, &msb);

	/* Writes always succeed with a valid number even if it gets interrupted. */
	do {
		cnt = local_inc_return(&t->cnt);
		rb_time_val_set(&t->top, top, cnt);
		rb_time_val_set(&t->bottom, bottom, cnt);
		rb_time_val_set(&t->msb, val >> RB_TIME_MSB_SHIFT, cnt);
	} while (cnt != local_read(&t->cnt));
}

static inline bool
rb_time_read_cmpxchg(local_t *l, unsigned long expect, unsigned long set)
{
	return local_try_cmpxchg(l, &expect, set);
}

static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
{
	unsigned long cnt, top, bottom, msb;
	unsigned long cnt2, top2, bottom2, msb2;
	u64 val;

	/* Any interruptions in this function should cause a failure */
	cnt = local_read(&t->cnt);

	/* The cmpxchg always fails if it interrupted an update */
	if (!__rb_time_read(t, &val, &cnt2))
		return false;

	if (val != expect)
		return false;

	if ((cnt & 3) != cnt2)
		return false;

	cnt2 = cnt + 1;

	rb_time_split(val, &top, &bottom, &msb);
	msb = rb_time_val_cnt(msb, cnt);
	top = rb_time_val_cnt(top, cnt);
	bottom = rb_time_val_cnt(bottom, cnt);

	rb_time_split(set, &top2, &bottom2, &msb2);
	msb2 = rb_time_val_cnt(msb2, cnt);
	top2 = rb_time_val_cnt(top2, cnt2);
	bottom2 = rb_time_val_cnt(bottom2, cnt2);

	if (!rb_time_read_cmpxchg(&t->cnt, cnt, cnt2))
		return false;
	if (!rb_time_read_cmpxchg(&t->msb, msb, msb2))
		return false;
	if (!rb_time_read_cmpxchg(&t->top, top, top2))
		return false;
	if (!rb_time_read_cmpxchg(&t->bottom, bottom, bottom2))
		return false;
	return true;
}

#else /* 64 bits */

/* local64_t always succeeds */

static inline bool rb_time_read(rb_time_t *t, u64 *ret)
{
	*ret = local64_read(&t->time);
	return true;
}
static void rb_time_set(rb_time_t *t, u64 val)
{
	local64_set(&t->time, val);
}

static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
{
	return local64_try_cmpxchg(&t->time, &expect, set);
}
#endif
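
/*
 * To illustrate the 32 bit encoding above (assuming a 60 bit time stamp):
 * rb_time_split() puts bits 0-29 of the stamp in "bottom", bits 30-59 in
 * "top" and anything above bit 59 in "msb". Each word then has the 2 bit
 * update counter stored in its bits 30-31 by rb_time_val_cnt(). A read is
 * only considered valid when all three counters agree; a writer that is
 * interrupted between updating the words momentarily leaves them with
 * mismatched counters, which is how __rb_time_read() detects the torn
 * update and fails.
 */
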
/*
 * Enable this to make sure that the event passed to
 * ring_buffer_event_time_stamp() is not committed and also
 * is on the buffer that it passed in.
 */
//#define RB_VERIFY_EVENT
#ifdef RB_VERIFY_EVENT
static struct list_head *rb_list_head(struct list_head *list);
static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
			 void *event)
{
	struct buffer_page *page = cpu_buffer->commit_page;
	struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
	struct list_head *next;
	long commit, write;
	unsigned long addr = (unsigned long)event;
	bool done = false;
	int stop = 0;

	/* Make sure the event exists and is not committed yet */
	do {
		if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
			done = true;
		commit = local_read(&page->page->commit);
		write = local_read(&page->write);
		if (addr >= (unsigned long)&page->page->data[commit] &&
		    addr < (unsigned long)&page->page->data[write])
			return;

		next = rb_list_head(page->list.next);
		page = list_entry(next, struct buffer_page, list);
	} while (!done);
	WARN_ON_ONCE(1);
}
#else
static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
				void *event)
{
}
#endif

/*
 * The absolute time stamp drops the 5 MSBs and some clocks may
 * require them. The rb_fix_abs_ts() will take a previous full
 * time stamp, and add the 5 MSB of that time stamp on to the
 * saved absolute time stamp. Then they are compared in case of
 * the unlikely event that the latest time stamp incremented
 * the 5 MSB.
 */
static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
{
	if (save_ts & TS_MSB) {
		abs |= save_ts & TS_MSB;
		/* Check for overflow */
		if (unlikely(abs < save_ts))
			abs += 1ULL << 59;
	}
	return abs;
}

static inline u64 rb_time_stamp(struct trace_buffer *buffer);

/**
 * ring_buffer_event_time_stamp - return the event's current time stamp
 * @buffer: The buffer that the event is on
 * @event: the event to get the time stamp of
 *
 * Note, this must be called after @event is reserved, and before it is
 * committed to the ring buffer. And must be called from the same
 * context where the event was reserved (normal, softirq, irq, etc).
 *
 * Returns the time stamp associated with the current event.
 * If the event has an extended time stamp, then that is used as
 * the time stamp to return.
 * In the highly unlikely case that the event was nested more than
 * the max nesting, then the write_stamp of the buffer is returned,
 * otherwise the current time is returned, though neither of the
 * last two cases should ever happen.
 */
u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
				 struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
	unsigned int nest;
	u64 ts;

	/* If the event includes an absolute time, then just use that */
	if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
		ts = rb_event_time_stamp(event);
		return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
	}

	nest = local_read(&cpu_buffer->committing);
	verify_event(cpu_buffer, event);
	if (WARN_ON_ONCE(!nest))
		goto fail;

	/* Read the current saved nesting level time stamp */
	if (likely(--nest < MAX_NEST))
		return cpu_buffer->event_stamp[nest];

	/* Shouldn't happen, warn if it does */
	WARN_ONCE(1, "nest (%d) greater than max", nest);

 fail:
	/* Can only fail on 32 bit */
	if (!rb_time_read(&cpu_buffer->write_stamp, &ts))
		/* Screw it, just read the current time */
		ts = rb_time_stamp(cpu_buffer->buffer);

	return ts;
}

/**
 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages used by a per_cpu buffer of the ring buffer.
 */
size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu)
{
	return buffer->buffers[cpu]->nr_pages;
}

/**
 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages that have content in the ring buffer.
 */
size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
{
	size_t read;
	size_t lost;
	size_t cnt;

	read = local_read(&buffer->buffers[cpu]->pages_read);
	lost = local_read(&buffer->buffers[cpu]->pages_lost);
	cnt = local_read(&buffer->buffers[cpu]->pages_touched);

	if (WARN_ON_ONCE(cnt < lost))
		return 0;

	cnt -= lost;

	/* The reader can read an empty page, but not more than that */
	if (cnt < read) {
		WARN_ON_ONCE(read > cnt + 1);
		return 0;
	}

	return cnt - read;
}

static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	size_t nr_pages;
	size_t dirty;

	nr_pages = cpu_buffer->nr_pages;
	if (!nr_pages || !full)
		return true;

	dirty = ring_buffer_nr_dirty_pages(buffer, cpu);

	return (dirty * 100) > (full * nr_pages);
}

/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	wake_up_all(&rbwork->waiters);
	if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
		rbwork->wakeup_full = false;
		rbwork->full_waiters_pending = false;
		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on
 *
 * When a file that represents a ring buffer is closing,
 * it is prudent to wake up any waiters that are on it.
 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (!buffer)
		return;

	if (cpu == RING_BUFFER_ALL_CPUS) {

		/* Wake up individual ones too. One level recursion */
		for_each_buffer_cpu(buffer, cpu)
			ring_buffer_wake_waiters(buffer, cpu);

		rbwork = &buffer->irq_work;
	} else {
		if (WARN_ON_ONCE(!buffer->buffers))
			return;
		if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
			return;

		cpu_buffer = buffer->buffers[cpu];
		/* The CPU buffer may not have been initialized yet */
		if (!cpu_buffer)
			return;
		rbwork = &cpu_buffer->irq_work;
	}

	rbwork->wait_index++;
	/* make sure the waiters see the new index */
	smp_wmb();

	rb_wake_up_waiters(&rbwork->work);
}

/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	DEFINE_WAIT(wait);
	struct rb_irq_work *work;
	long wait_index;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		work = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		work = &cpu_buffer->irq_work;
	}

	wait_index = READ_ONCE(work->wait_index);

	while (true) {
		if (full)
			prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
		else
			prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);

		/*
		 * The events can happen in critical sections where
		 * checking a work queue can cause deadlocks.
		 * After adding a task to the queue, this flag is set
		 * only to notify events to try to wake up the queue
		 * using irq_work.
		 *
		 * We don't clear it even if the buffer is no longer
		 * empty. The flag only causes the next event to run
		 * irq_work to do the work queue wake up. The worst
		 * that can happen if we race with !trace_empty() is that
		 * an event will cause an irq_work to try to wake up
		 * an empty queue.
		 *
		 * There's no reason to protect this flag either, as
		 * the work queue and irq_work logic will do the necessary
		 * synchronization for the wake ups. The only thing
		 * that is necessary is that the wake up happens after
		 * a task has been queued. It's OK for spurious wake ups.
		 */
		if (full)
			work->full_waiters_pending = true;
		else
			work->waiters_pending = true;

		if (signal_pending(current)) {
			ret = -EINTR;
			break;
		}

		if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
			break;

		if (cpu != RING_BUFFER_ALL_CPUS &&
		    !ring_buffer_empty_cpu(buffer, cpu)) {
			unsigned long flags;
			bool pagebusy;
			bool done;

			if (!full)
				break;

			raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
			pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
			done = !pagebusy && full_hit(buffer, cpu, full);

			if (!cpu_buffer->shortest_full ||
			    cpu_buffer->shortest_full > full)
				cpu_buffer->shortest_full = full;
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
			if (done)
				break;
		}

		schedule();

		/* Make sure to see the new wait index */
		smp_rmb();
		if (wait_index != work->wait_index)
			break;
	}

	if (full)
		finish_wait(&work->full_waiters, &wait);
	else
		finish_wait(&work->waiters, &wait);

	return ret;
}

/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 * zero otherwise.
 */
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
			       struct file *filp, poll_table *poll_table, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *work;

	if (cpu == RING_BUFFER_ALL_CPUS) {
		work = &buffer->irq_work;
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -EINVAL;

		cpu_buffer = buffer->buffers[cpu];
		work = &cpu_buffer->irq_work;
	}

	if (full) {
		poll_wait(filp, &work->full_waiters, poll_table);
		work->full_waiters_pending = true;
		if (!cpu_buffer->shortest_full ||
		    cpu_buffer->shortest_full > full)
			cpu_buffer->shortest_full = full;
	} else {
		poll_wait(filp, &work->waiters, poll_table);
		work->waiters_pending = true;
	}

	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty. Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path. We only need a memory barrier when
	 * the buffer goes from empty to having content.
	 * But as this race is extremely small, and it's not a problem if
	 * another event comes in, we will fix it later.
	 */
	smp_mb();

	if (full)
		return full_hit(buffer, cpu, full) ? EPOLLIN | EPOLLRDNORM : 0;

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct trace_buffer *buffer)
{
	u64 ts;

	/* Skip retpolines :-( */
	if (IS_ENABLED(CONFIG_RETPOLINE) && likely(buffer->clock == trace_clock_local))
		ts = trace_clock_local();
	else
		ts = buffer->clock();

	/* shift to debug/test normalization and TIME_EXTENTS */
	return ts << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Although writes only happen on the CPU that they are on,
 * and they only need to worry about interrupts, reads can
 * happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too.  Thus:
 *
 * head->list->prev->next	bit 1	  bit 0
 *				-------	 -------
 * Normal page			  0	    0
 * Points to head page		  0	    1
 * New head page		  1	    0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(head->list.prev);
}
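
/*
 * To illustrate the flag encoding above: buffer pages are allocated
 * cache line aligned, so the two least significant bits of a ->next
 * pointer are normally zero. The page *before* the head page carries
 * RB_PAGE_HEAD (bit 0) in its ->next pointer, and a writer that is in
 * the middle of moving the head switches that flag to RB_PAGE_UPDATE
 * (bit 1). rb_list_head() masks off RB_FLAG_MASK to recover the real
 * pointer, while rb_is_head_page() returns just the flag bits.
 */
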
static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(&page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static bool rb_head_page_replace(struct buffer_page *old,
				 struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
				struct buffer_page *tail_page,
				struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	local_inc(&cpu_buffer->pages_touched);
	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * it only can increment when a commit takes place. But that
		 * only happens in the outer most nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Again, either we update tail_page or an interrupt does */
		(void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
	}
}

static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			   struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = rb_list_head(cpu_buffer->pages);
	struct list_head *tmp;

	if (RB_WARN_ON(cpu_buffer,
			rb_list_head(rb_list_head(head->next)->prev) != head))
		return;

	if (RB_WARN_ON(cpu_buffer,
			rb_list_head(rb_list_head(head->prev)->next) != head))
		return;

	for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) {
		if (RB_WARN_ON(cpu_buffer,
				rb_list_head(rb_list_head(tmp->next)->prev) != tmp))
			return;

		if (RB_WARN_ON(cpu_buffer,
				rb_list_head(rb_list_head(tmp->prev)->next) != tmp))
			return;
	}
}

static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			       long nr_pages, struct list_head *pages)
{
	struct buffer_page *bpage, *tmp;
	bool user_thread = current->mm != NULL;
	gfp_t mflags;
	long i;

	/*
	 * Check if the available memory is there first.
	 * Note, si_mem_available() only gives us a rough estimate of available
	 * memory. It may not be accurate. But we don't care, we just want
	 * to prevent doing any allocation when it is obvious that it is
	 * not going to succeed.
	 */
	i = si_mem_available();
	if (i < nr_pages)
		return -ENOMEM;

	/*
	 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
	 * gracefully without invoking oom-killer and the system is not
	 * destabilized.
	 */
	mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;

	/*
	 * If a user thread allocates too much, si_mem_available() may have
	 * reported there's enough memory, even though there is not.
	 * Make sure the OOM killer kills this thread. This can happen
	 * even with RETRY_MAYFAIL because another task may be doing
	 * an allocation after this task has taken all memory.
	 * This is the task the OOM killer needs to take out during this
	 * loop, even if it was triggered by an allocation somewhere else.
	 */
	if (user_thread)
		set_current_oom_origin();
	for (i = 0; i < nr_pages; i++) {
		struct page *page;

		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
				     mflags, cpu_to_node(cpu_buffer->cpu));
		if (!bpage)
			goto free_pages;

		rb_check_bpage(cpu_buffer, bpage);

		list_add(&bpage->list, pages);

		page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), mflags, 0);
		if (!page)
			goto free_pages;
		bpage->page = page_address(page);
		rb_init_page(bpage->page);

		if (user_thread && fatal_signal_pending(current))
			goto free_pages;
	}
	if (user_thread)
		clear_current_oom_origin();

	return 0;

free_pages:
	list_for_each_entry_safe(bpage, tmp, pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	if (user_thread)
		clear_current_oom_origin();

	return -ENOMEM;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned long nr_pages)
{
	LIST_HEAD(pages);

	WARN_ON(!nr_pages);

	if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages))
		return -ENOMEM;

	/*
	 * The ring buffer page list is a circular list that does not
	 * start and end with a list head. All page list items point to
	 * other pages.
	 */
	cpu_buffer->pages = pages.next;
	list_del(&pages);

	cpu_buffer->nr_pages = nr_pages;

	rb_check_pages(cpu_buffer);

	return 0;
}
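
/*
 * Note that the resulting page list is purely circular: cpu_buffer->pages
 * points at the list member of one of the buffer pages rather than at a
 * dedicated list_head, so walking the list means masking the flag bits
 * with rb_list_head() and stopping when the starting page comes around
 * again (see rb_check_pages() above for an example of such a walk).
 */
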
static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	struct page *page;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	raw_spin_lock_init(&cpu_buffer->reader_lock);
	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
	init_completion(&cpu_buffer->update_done);
	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			     GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	rb_check_bpage(cpu_buffer, bpage);

	cpu_buffer->reader_page = bpage;
	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
	if (!page)
		goto fail_free_reader;
	bpage->page = page_address(page);
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	INIT_LIST_HEAD(&cpu_buffer->new_pages);

	ret = rb_allocate_pages(cpu_buffer, nr_pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	rb_head_page_activate(cpu_buffer);

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	irq_work_sync(&cpu_buffer->irq_work.work);

	free_buffer_page(cpu_buffer->reader_page);

	if (head) {
		rb_head_page_deactivate(cpu_buffer);

		list_for_each_entry_safe(bpage, tmp, head, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
		bpage = list_entry(head, struct buffer_page, list);
		free_buffer_page(bpage);
	}

	free_page((unsigned long)cpu_buffer->free_page);

	kfree(cpu_buffer);
}

/**
 * __ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 * @key: ring buffer reader_lock_key.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					 struct lock_class_key *key)
{
	struct trace_buffer *buffer;
	long nr_pages;
	int bsize;
	int cpu;
	int ret;

	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;
	buffer->clock = trace_clock_local;
	buffer->reader_lock_key = key;

	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&buffer->irq_work.waiters);

	/* need at least two pages */
	if (nr_pages < 2)
		nr_pages = 2;

	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_cpumask;

	cpu = raw_smp_processor_id();
	cpumask_set_cpu(cpu, buffer->cpumask);
	buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
	if (!buffer->buffers[cpu])
		goto fail_free_buffers;

	ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
	if (ret < 0)
		goto fail_free_buffers;

	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_cpumask:
	free_cpumask_var(buffer->cpumask);

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);

/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct trace_buffer *buffer)
{
	int cpu;

	cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);

	irq_work_sync(&buffer->irq_work.work);

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	kfree(buffer->buffers);
	free_cpumask_var(buffer->cpumask);

	kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);

void ring_buffer_set_clock(struct trace_buffer *buffer,
			   u64 (*clock)(void))
{
	buffer->clock = clock;
}

void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs)
{
	buffer->time_stamp_abs = abs;
}

bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer)
{
	return buffer->time_stamp_abs;
}

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
	return local_read(&bpage->entries) & RB_WRITE_MASK;
}

static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write) & RB_WRITE_MASK;
}

static bool
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
{
	struct list_head *tail_page, *to_remove, *next_page;
	struct buffer_page *to_remove_page, *tmp_iter_page;
	struct buffer_page *last_page, *first_page;
	unsigned long nr_removed;
	unsigned long head_bit;
	int page_entries;

	head_bit = 0;

	raw_spin_lock_irq(&cpu_buffer->reader_lock);
	atomic_inc(&cpu_buffer->record_disabled);
	/*
	 * We don't race with the readers since we have acquired the reader
	 * lock. We also don't race with writers after disabling recording.
	 * This makes it easy to figure out the first and the last page to be
	 * removed from the list. We unlink all the pages in between including
	 * the first and last pages. This is done in a busy loop so that we
	 * lose the least number of traces.
	 * The pages are freed after we restart recording and unlock readers.
	 */
	tail_page = &cpu_buffer->tail_page->list;

	/*
	 * tail page might be on reader page, we remove the next page
	 * from the ring buffer
	 */
	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
		tail_page = rb_list_head(tail_page->next);
	to_remove = tail_page;

	/* start of pages to remove */
	first_page = list_entry(rb_list_head(to_remove->next),
				struct buffer_page, list);

	for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
		to_remove = rb_list_head(to_remove)->next;
		head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
	}
	/* Read iterators need to reset themselves when some pages removed */
	cpu_buffer->pages_removed += nr_removed;

	next_page = rb_list_head(to_remove)->next;

	/*
	 * Now we remove all pages between tail_page and next_page.
1977 * Make sure that we have head_bit value preserved for the 1978 * next page 1979 */ 1980 tail_page->next = (struct list_head *)((unsigned long)next_page | 1981 head_bit); 1982 next_page = rb_list_head(next_page); 1983 next_page->prev = tail_page; 1984 1985 /* make sure pages points to a valid page in the ring buffer */ 1986 cpu_buffer->pages = next_page; 1987 1988 /* update head page */ 1989 if (head_bit) 1990 cpu_buffer->head_page = list_entry(next_page, 1991 struct buffer_page, list); 1992 1993 /* pages are removed, resume tracing and then free the pages */ 1994 atomic_dec(&cpu_buffer->record_disabled); 1995 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 1996 1997 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 1998 1999 /* last buffer page to remove */ 2000 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 2001 list); 2002 tmp_iter_page = first_page; 2003 2004 do { 2005 cond_resched(); 2006 2007 to_remove_page = tmp_iter_page; 2008 rb_inc_page(&tmp_iter_page); 2009 2010 /* update the counters */ 2011 page_entries = rb_page_entries(to_remove_page); 2012 if (page_entries) { 2013 /* 2014 * If something was added to this page, it was full 2015 * since it is not the tail page. So we deduct the 2016 * bytes consumed in ring buffer from here. 2017 * Increment overrun to account for the lost events. 2018 */ 2019 local_add(page_entries, &cpu_buffer->overrun); 2020 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes); 2021 local_inc(&cpu_buffer->pages_lost); 2022 } 2023 2024 /* 2025 * We have already removed references to this list item, just 2026 * free up the buffer_page and its page 2027 */ 2028 free_buffer_page(to_remove_page); 2029 nr_removed--; 2030 2031 } while (to_remove_page != last_page); 2032 2033 RB_WARN_ON(cpu_buffer, nr_removed); 2034 2035 return nr_removed == 0; 2036 } 2037 2038 static bool 2039 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 2040 { 2041 struct list_head *pages = &cpu_buffer->new_pages; 2042 unsigned long flags; 2043 bool success; 2044 int retries; 2045 2046 /* Can be called at early boot up, where interrupts must not been enabled */ 2047 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2048 /* 2049 * We are holding the reader lock, so the reader page won't be swapped 2050 * in the ring buffer. Now we are racing with the writer trying to 2051 * move head page and the tail page. 2052 * We are going to adapt the reader page update process where: 2053 * 1. We first splice the start and end of list of new pages between 2054 * the head page and its previous page. 2055 * 2. We cmpxchg the prev_page->next to point from head page to the 2056 * start of new pages list. 2057 * 3. Finally, we update the head->prev to the end of new list. 2058 * 2059 * We will try this process 10 times, to make sure that we don't keep 2060 * spinning. 
2061 */ 2062 retries = 10; 2063 success = false; 2064 while (retries--) { 2065 struct list_head *head_page, *prev_page; 2066 struct list_head *last_page, *first_page; 2067 struct list_head *head_page_with_bit; 2068 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 2069 2070 if (!hpage) 2071 break; 2072 head_page = &hpage->list; 2073 prev_page = head_page->prev; 2074 2075 first_page = pages->next; 2076 last_page = pages->prev; 2077 2078 head_page_with_bit = (struct list_head *) 2079 ((unsigned long)head_page | RB_PAGE_HEAD); 2080 2081 last_page->next = head_page_with_bit; 2082 first_page->prev = prev_page; 2083 2084 /* caution: head_page_with_bit gets updated on cmpxchg failure */ 2085 if (try_cmpxchg(&prev_page->next, 2086 &head_page_with_bit, first_page)) { 2087 /* 2088 * yay, we replaced the page pointer to our new list, 2089 * now, we just have to update to head page's prev 2090 * pointer to point to end of list 2091 */ 2092 head_page->prev = last_page; 2093 success = true; 2094 break; 2095 } 2096 } 2097 2098 if (success) 2099 INIT_LIST_HEAD(pages); 2100 /* 2101 * If we weren't successful in adding in new pages, warn and stop 2102 * tracing 2103 */ 2104 RB_WARN_ON(cpu_buffer, !success); 2105 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2106 2107 /* free pages if they weren't inserted */ 2108 if (!success) { 2109 struct buffer_page *bpage, *tmp; 2110 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2111 list) { 2112 list_del_init(&bpage->list); 2113 free_buffer_page(bpage); 2114 } 2115 } 2116 return success; 2117 } 2118 2119 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 2120 { 2121 bool success; 2122 2123 if (cpu_buffer->nr_pages_to_update > 0) 2124 success = rb_insert_pages(cpu_buffer); 2125 else 2126 success = rb_remove_pages(cpu_buffer, 2127 -cpu_buffer->nr_pages_to_update); 2128 2129 if (success) 2130 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2131 } 2132 2133 static void update_pages_handler(struct work_struct *work) 2134 { 2135 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2136 struct ring_buffer_per_cpu, update_pages_work); 2137 rb_update_pages(cpu_buffer); 2138 complete(&cpu_buffer->update_done); 2139 } 2140 2141 /** 2142 * ring_buffer_resize - resize the ring buffer 2143 * @buffer: the buffer to resize. 2144 * @size: the new size. 2145 * @cpu_id: the cpu buffer to resize 2146 * 2147 * Minimum size is 2 * BUF_PAGE_SIZE. 2148 * 2149 * Returns 0 on success and < 0 on failure. 2150 */ 2151 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2152 int cpu_id) 2153 { 2154 struct ring_buffer_per_cpu *cpu_buffer; 2155 unsigned long nr_pages; 2156 int cpu, err; 2157 2158 /* 2159 * Always succeed at resizing a non-existent buffer: 2160 */ 2161 if (!buffer) 2162 return 0; 2163 2164 /* Make sure the requested buffer exists */ 2165 if (cpu_id != RING_BUFFER_ALL_CPUS && 2166 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2167 return 0; 2168 2169 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 2170 2171 /* we need a minimum of two pages */ 2172 if (nr_pages < 2) 2173 nr_pages = 2; 2174 2175 /* prevent another thread from changing buffer sizes */ 2176 mutex_lock(&buffer->mutex); 2177 atomic_inc(&buffer->resizing); 2178 2179 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2180 /* 2181 * Don't succeed if resizing is disabled, as a reader might be 2182 * manipulating the ring buffer and is expecting a sane state while 2183 * this is true. 
2184 */ 2185 for_each_buffer_cpu(buffer, cpu) { 2186 cpu_buffer = buffer->buffers[cpu]; 2187 if (atomic_read(&cpu_buffer->resize_disabled)) { 2188 err = -EBUSY; 2189 goto out_err_unlock; 2190 } 2191 } 2192 2193 /* calculate the pages to update */ 2194 for_each_buffer_cpu(buffer, cpu) { 2195 cpu_buffer = buffer->buffers[cpu]; 2196 2197 cpu_buffer->nr_pages_to_update = nr_pages - 2198 cpu_buffer->nr_pages; 2199 /* 2200 * nothing more to do for removing pages or no update 2201 */ 2202 if (cpu_buffer->nr_pages_to_update <= 0) 2203 continue; 2204 /* 2205 * to add pages, make sure all new pages can be 2206 * allocated without receiving ENOMEM 2207 */ 2208 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2209 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2210 &cpu_buffer->new_pages)) { 2211 /* not enough memory for new pages */ 2212 err = -ENOMEM; 2213 goto out_err; 2214 } 2215 2216 cond_resched(); 2217 } 2218 2219 cpus_read_lock(); 2220 /* 2221 * Fire off all the required work handlers 2222 * We can't schedule on offline CPUs, but it's not necessary 2223 * since we can change their buffer sizes without any race. 2224 */ 2225 for_each_buffer_cpu(buffer, cpu) { 2226 cpu_buffer = buffer->buffers[cpu]; 2227 if (!cpu_buffer->nr_pages_to_update) 2228 continue; 2229 2230 /* Can't run something on an offline CPU. */ 2231 if (!cpu_online(cpu)) { 2232 rb_update_pages(cpu_buffer); 2233 cpu_buffer->nr_pages_to_update = 0; 2234 } else { 2235 /* Run directly if possible. */ 2236 migrate_disable(); 2237 if (cpu != smp_processor_id()) { 2238 migrate_enable(); 2239 schedule_work_on(cpu, 2240 &cpu_buffer->update_pages_work); 2241 } else { 2242 update_pages_handler(&cpu_buffer->update_pages_work); 2243 migrate_enable(); 2244 } 2245 } 2246 } 2247 2248 /* wait for all the updates to complete */ 2249 for_each_buffer_cpu(buffer, cpu) { 2250 cpu_buffer = buffer->buffers[cpu]; 2251 if (!cpu_buffer->nr_pages_to_update) 2252 continue; 2253 2254 if (cpu_online(cpu)) 2255 wait_for_completion(&cpu_buffer->update_done); 2256 cpu_buffer->nr_pages_to_update = 0; 2257 } 2258 2259 cpus_read_unlock(); 2260 } else { 2261 cpu_buffer = buffer->buffers[cpu_id]; 2262 2263 if (nr_pages == cpu_buffer->nr_pages) 2264 goto out; 2265 2266 /* 2267 * Don't succeed if resizing is disabled, as a reader might be 2268 * manipulating the ring buffer and is expecting a sane state while 2269 * this is true. 2270 */ 2271 if (atomic_read(&cpu_buffer->resize_disabled)) { 2272 err = -EBUSY; 2273 goto out_err_unlock; 2274 } 2275 2276 cpu_buffer->nr_pages_to_update = nr_pages - 2277 cpu_buffer->nr_pages; 2278 2279 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2280 if (cpu_buffer->nr_pages_to_update > 0 && 2281 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2282 &cpu_buffer->new_pages)) { 2283 err = -ENOMEM; 2284 goto out_err; 2285 } 2286 2287 cpus_read_lock(); 2288 2289 /* Can't run something on an offline CPU. */ 2290 if (!cpu_online(cpu_id)) 2291 rb_update_pages(cpu_buffer); 2292 else { 2293 /* Run directly if possible. 
*/ 2294 migrate_disable(); 2295 if (cpu_id == smp_processor_id()) { 2296 rb_update_pages(cpu_buffer); 2297 migrate_enable(); 2298 } else { 2299 migrate_enable(); 2300 schedule_work_on(cpu_id, 2301 &cpu_buffer->update_pages_work); 2302 wait_for_completion(&cpu_buffer->update_done); 2303 } 2304 } 2305 2306 cpu_buffer->nr_pages_to_update = 0; 2307 cpus_read_unlock(); 2308 } 2309 2310 out: 2311 /* 2312 * The ring buffer resize can happen with the ring buffer 2313 * enabled, so that the update disturbs the tracing as little 2314 * as possible. But if the buffer is disabled, we do not need 2315 * to worry about that, and we can take the time to verify 2316 * that the buffer is not corrupt. 2317 */ 2318 if (atomic_read(&buffer->record_disabled)) { 2319 atomic_inc(&buffer->record_disabled); 2320 /* 2321 * Even though the buffer was disabled, we must make sure 2322 * that it is truly disabled before calling rb_check_pages. 2323 * There could have been a race between checking 2324 * record_disable and incrementing it. 2325 */ 2326 synchronize_rcu(); 2327 for_each_buffer_cpu(buffer, cpu) { 2328 cpu_buffer = buffer->buffers[cpu]; 2329 rb_check_pages(cpu_buffer); 2330 } 2331 atomic_dec(&buffer->record_disabled); 2332 } 2333 2334 atomic_dec(&buffer->resizing); 2335 mutex_unlock(&buffer->mutex); 2336 return 0; 2337 2338 out_err: 2339 for_each_buffer_cpu(buffer, cpu) { 2340 struct buffer_page *bpage, *tmp; 2341 2342 cpu_buffer = buffer->buffers[cpu]; 2343 cpu_buffer->nr_pages_to_update = 0; 2344 2345 if (list_empty(&cpu_buffer->new_pages)) 2346 continue; 2347 2348 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2349 list) { 2350 list_del_init(&bpage->list); 2351 free_buffer_page(bpage); 2352 } 2353 } 2354 out_err_unlock: 2355 atomic_dec(&buffer->resizing); 2356 mutex_unlock(&buffer->mutex); 2357 return err; 2358 } 2359 EXPORT_SYMBOL_GPL(ring_buffer_resize); 2360 2361 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 2362 { 2363 mutex_lock(&buffer->mutex); 2364 if (val) 2365 buffer->flags |= RB_FL_OVERWRITE; 2366 else 2367 buffer->flags &= ~RB_FL_OVERWRITE; 2368 mutex_unlock(&buffer->mutex); 2369 } 2370 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 2371 2372 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 2373 { 2374 return bpage->page->data + index; 2375 } 2376 2377 static __always_inline struct ring_buffer_event * 2378 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 2379 { 2380 return __rb_page_index(cpu_buffer->reader_page, 2381 cpu_buffer->reader_page->read); 2382 } 2383 2384 static struct ring_buffer_event * 2385 rb_iter_head_event(struct ring_buffer_iter *iter) 2386 { 2387 struct ring_buffer_event *event; 2388 struct buffer_page *iter_head_page = iter->head_page; 2389 unsigned long commit; 2390 unsigned length; 2391 2392 if (iter->head != iter->next_event) 2393 return iter->event; 2394 2395 /* 2396 * When the writer goes across pages, it issues a cmpxchg which 2397 * is a mb(), which will synchronize with the rmb here. 2398 * (see rb_tail_page_update() and __rb_reserve_next()) 2399 */ 2400 commit = rb_page_commit(iter_head_page); 2401 smp_rmb(); 2402 2403 /* An event needs to be at least 8 bytes in size */ 2404 if (iter->head > commit - 8) 2405 goto reset; 2406 2407 event = __rb_page_index(iter_head_page, iter->head); 2408 length = rb_event_length(event); 2409 2410 /* 2411 * READ_ONCE() doesn't work on functions and we don't want the 2412 * compiler doing any crazy optimizations with length. 
2413 */ 2414 barrier(); 2415 2416 if ((iter->head + length) > commit || length > BUF_PAGE_SIZE) 2417 /* Writer corrupted the read? */ 2418 goto reset; 2419 2420 memcpy(iter->event, event, length); 2421 /* 2422 * If the page stamp is still the same after this rmb() then the 2423 * event was safely copied without the writer entering the page. 2424 */ 2425 smp_rmb(); 2426 2427 /* Make sure the page didn't change since we read this */ 2428 if (iter->page_stamp != iter_head_page->page->time_stamp || 2429 commit > rb_page_commit(iter_head_page)) 2430 goto reset; 2431 2432 iter->next_event = iter->head + length; 2433 return iter->event; 2434 reset: 2435 /* Reset to the beginning */ 2436 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2437 iter->head = 0; 2438 iter->next_event = 0; 2439 iter->missed_events = 1; 2440 return NULL; 2441 } 2442 2443 /* Size is determined by what has been committed */ 2444 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 2445 { 2446 return rb_page_commit(bpage); 2447 } 2448 2449 static __always_inline unsigned 2450 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 2451 { 2452 return rb_page_commit(cpu_buffer->commit_page); 2453 } 2454 2455 static __always_inline unsigned 2456 rb_event_index(struct ring_buffer_event *event) 2457 { 2458 unsigned long addr = (unsigned long)event; 2459 2460 return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE; 2461 } 2462 2463 static void rb_inc_iter(struct ring_buffer_iter *iter) 2464 { 2465 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2466 2467 /* 2468 * The iterator could be on the reader page (it starts there). 2469 * But the head could have moved, since the reader was 2470 * found. Check for this case and assign the iterator 2471 * to the head page instead of next. 2472 */ 2473 if (iter->head_page == cpu_buffer->reader_page) 2474 iter->head_page = rb_set_head_page(cpu_buffer); 2475 else 2476 rb_inc_page(&iter->head_page); 2477 2478 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2479 iter->head = 0; 2480 iter->next_event = 0; 2481 } 2482 2483 /* 2484 * rb_handle_head_page - writer hit the head page 2485 * 2486 * Returns: +1 to retry page 2487 * 0 to continue 2488 * -1 on error 2489 */ 2490 static int 2491 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 2492 struct buffer_page *tail_page, 2493 struct buffer_page *next_page) 2494 { 2495 struct buffer_page *new_head; 2496 int entries; 2497 int type; 2498 int ret; 2499 2500 entries = rb_page_entries(next_page); 2501 2502 /* 2503 * The hard part is here. We need to move the head 2504 * forward, and protect against both readers on 2505 * other CPUs and writers coming in via interrupts. 2506 */ 2507 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 2508 RB_PAGE_HEAD); 2509 2510 /* 2511 * type can be one of four: 2512 * NORMAL - an interrupt already moved it for us 2513 * HEAD - we are the first to get here. 2514 * UPDATE - we are the interrupt interrupting 2515 * a current move. 2516 * MOVED - a reader on another CPU moved the next 2517 * pointer to its reader page. Give up 2518 * and try again. 2519 */ 2520 2521 switch (type) { 2522 case RB_PAGE_HEAD: 2523 /* 2524 * We changed the head to UPDATE, thus 2525 * it is our responsibility to update 2526 * the counters. 
2527 */ 2528 local_add(entries, &cpu_buffer->overrun); 2529 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes); 2530 local_inc(&cpu_buffer->pages_lost); 2531 2532 /* 2533 * The entries will be zeroed out when we move the 2534 * tail page. 2535 */ 2536 2537 /* still more to do */ 2538 break; 2539 2540 case RB_PAGE_UPDATE: 2541 /* 2542 * This is an interrupt that interrupt the 2543 * previous update. Still more to do. 2544 */ 2545 break; 2546 case RB_PAGE_NORMAL: 2547 /* 2548 * An interrupt came in before the update 2549 * and processed this for us. 2550 * Nothing left to do. 2551 */ 2552 return 1; 2553 case RB_PAGE_MOVED: 2554 /* 2555 * The reader is on another CPU and just did 2556 * a swap with our next_page. 2557 * Try again. 2558 */ 2559 return 1; 2560 default: 2561 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 2562 return -1; 2563 } 2564 2565 /* 2566 * Now that we are here, the old head pointer is 2567 * set to UPDATE. This will keep the reader from 2568 * swapping the head page with the reader page. 2569 * The reader (on another CPU) will spin till 2570 * we are finished. 2571 * 2572 * We just need to protect against interrupts 2573 * doing the job. We will set the next pointer 2574 * to HEAD. After that, we set the old pointer 2575 * to NORMAL, but only if it was HEAD before. 2576 * otherwise we are an interrupt, and only 2577 * want the outer most commit to reset it. 2578 */ 2579 new_head = next_page; 2580 rb_inc_page(&new_head); 2581 2582 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 2583 RB_PAGE_NORMAL); 2584 2585 /* 2586 * Valid returns are: 2587 * HEAD - an interrupt came in and already set it. 2588 * NORMAL - One of two things: 2589 * 1) We really set it. 2590 * 2) A bunch of interrupts came in and moved 2591 * the page forward again. 2592 */ 2593 switch (ret) { 2594 case RB_PAGE_HEAD: 2595 case RB_PAGE_NORMAL: 2596 /* OK */ 2597 break; 2598 default: 2599 RB_WARN_ON(cpu_buffer, 1); 2600 return -1; 2601 } 2602 2603 /* 2604 * It is possible that an interrupt came in, 2605 * set the head up, then more interrupts came in 2606 * and moved it again. When we get back here, 2607 * the page would have been set to NORMAL but we 2608 * just set it back to HEAD. 2609 * 2610 * How do you detect this? Well, if that happened 2611 * the tail page would have moved. 2612 */ 2613 if (ret == RB_PAGE_NORMAL) { 2614 struct buffer_page *buffer_tail_page; 2615 2616 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 2617 /* 2618 * If the tail had moved passed next, then we need 2619 * to reset the pointer. 2620 */ 2621 if (buffer_tail_page != tail_page && 2622 buffer_tail_page != next_page) 2623 rb_head_page_set_normal(cpu_buffer, new_head, 2624 next_page, 2625 RB_PAGE_HEAD); 2626 } 2627 2628 /* 2629 * If this was the outer most commit (the one that 2630 * changed the original pointer from HEAD to UPDATE), 2631 * then it is up to us to reset it to NORMAL. 
2632 */ 2633 if (type == RB_PAGE_HEAD) { 2634 ret = rb_head_page_set_normal(cpu_buffer, next_page, 2635 tail_page, 2636 RB_PAGE_UPDATE); 2637 if (RB_WARN_ON(cpu_buffer, 2638 ret != RB_PAGE_UPDATE)) 2639 return -1; 2640 } 2641 2642 return 0; 2643 } 2644 2645 static inline void 2646 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 2647 unsigned long tail, struct rb_event_info *info) 2648 { 2649 struct buffer_page *tail_page = info->tail_page; 2650 struct ring_buffer_event *event; 2651 unsigned long length = info->length; 2652 2653 /* 2654 * Only the event that crossed the page boundary 2655 * must fill the old tail_page with padding. 2656 */ 2657 if (tail >= BUF_PAGE_SIZE) { 2658 /* 2659 * If the page was filled, then we still need 2660 * to update the real_end. Reset it to zero 2661 * and the reader will ignore it. 2662 */ 2663 if (tail == BUF_PAGE_SIZE) 2664 tail_page->real_end = 0; 2665 2666 local_sub(length, &tail_page->write); 2667 return; 2668 } 2669 2670 event = __rb_page_index(tail_page, tail); 2671 2672 /* 2673 * Save the original length to the meta data. 2674 * This will be used by the reader to add lost event 2675 * counter. 2676 */ 2677 tail_page->real_end = tail; 2678 2679 /* 2680 * If this event is bigger than the minimum size, then 2681 * we need to be careful that we don't subtract the 2682 * write counter enough to allow another writer to slip 2683 * in on this page. 2684 * We put in a discarded commit instead, to make sure 2685 * that this space is not used again, and this space will 2686 * not be accounted into 'entries_bytes'. 2687 * 2688 * If we are less than the minimum size, we don't need to 2689 * worry about it. 2690 */ 2691 if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) { 2692 /* No room for any events */ 2693 2694 /* Mark the rest of the page with padding */ 2695 rb_event_set_padding(event); 2696 2697 /* Make sure the padding is visible before the write update */ 2698 smp_wmb(); 2699 2700 /* Set the write back to the previous setting */ 2701 local_sub(length, &tail_page->write); 2702 return; 2703 } 2704 2705 /* Put in a discarded event */ 2706 event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE; 2707 event->type_len = RINGBUF_TYPE_PADDING; 2708 /* time delta must be non zero */ 2709 event->time_delta = 1; 2710 2711 /* account for padding bytes */ 2712 local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes); 2713 2714 /* Make sure the padding is visible before the tail_page->write update */ 2715 smp_wmb(); 2716 2717 /* Set write to end of buffer */ 2718 length = (tail + length) - BUF_PAGE_SIZE; 2719 local_sub(length, &tail_page->write); 2720 } 2721 2722 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 2723 2724 /* 2725 * This is the slow path, force gcc not to inline it. 2726 */ 2727 static noinline struct ring_buffer_event * 2728 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 2729 unsigned long tail, struct rb_event_info *info) 2730 { 2731 struct buffer_page *tail_page = info->tail_page; 2732 struct buffer_page *commit_page = cpu_buffer->commit_page; 2733 struct trace_buffer *buffer = cpu_buffer->buffer; 2734 struct buffer_page *next_page; 2735 int ret; 2736 2737 next_page = tail_page; 2738 2739 rb_inc_page(&next_page); 2740 2741 /* 2742 * If for some reason, we had an interrupt storm that made 2743 * it all the way around the buffer, bail, and warn 2744 * about it. 
2745 */ 2746 if (unlikely(next_page == commit_page)) { 2747 local_inc(&cpu_buffer->commit_overrun); 2748 goto out_reset; 2749 } 2750 2751 /* 2752 * This is where the fun begins! 2753 * 2754 * We are fighting against races between a reader that 2755 * could be on another CPU trying to swap its reader 2756 * page with the buffer head. 2757 * 2758 * We are also fighting against interrupts coming in and 2759 * moving the head or tail on us as well. 2760 * 2761 * If the next page is the head page then we have filled 2762 * the buffer, unless the commit page is still on the 2763 * reader page. 2764 */ 2765 if (rb_is_head_page(next_page, &tail_page->list)) { 2766 2767 /* 2768 * If the commit is not on the reader page, then 2769 * move the header page. 2770 */ 2771 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 2772 /* 2773 * If we are not in overwrite mode, 2774 * this is easy, just stop here. 2775 */ 2776 if (!(buffer->flags & RB_FL_OVERWRITE)) { 2777 local_inc(&cpu_buffer->dropped_events); 2778 goto out_reset; 2779 } 2780 2781 ret = rb_handle_head_page(cpu_buffer, 2782 tail_page, 2783 next_page); 2784 if (ret < 0) 2785 goto out_reset; 2786 if (ret) 2787 goto out_again; 2788 } else { 2789 /* 2790 * We need to be careful here too. The 2791 * commit page could still be on the reader 2792 * page. We could have a small buffer, and 2793 * have filled up the buffer with events 2794 * from interrupts and such, and wrapped. 2795 * 2796 * Note, if the tail page is also on the 2797 * reader_page, we let it move out. 2798 */ 2799 if (unlikely((cpu_buffer->commit_page != 2800 cpu_buffer->tail_page) && 2801 (cpu_buffer->commit_page == 2802 cpu_buffer->reader_page))) { 2803 local_inc(&cpu_buffer->commit_overrun); 2804 goto out_reset; 2805 } 2806 } 2807 } 2808 2809 rb_tail_page_update(cpu_buffer, tail_page, next_page); 2810 2811 out_again: 2812 2813 rb_reset_tail(cpu_buffer, tail, info); 2814 2815 /* Commit what we have for now. */ 2816 rb_end_commit(cpu_buffer); 2817 /* rb_end_commit() decs committing */ 2818 local_inc(&cpu_buffer->committing); 2819 2820 /* fail and let the caller try again */ 2821 return ERR_PTR(-EAGAIN); 2822 2823 out_reset: 2824 /* reset write */ 2825 rb_reset_tail(cpu_buffer, tail, info); 2826 2827 return NULL; 2828 } 2829 2830 /* Slow path */ 2831 static struct ring_buffer_event * 2832 rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs) 2833 { 2834 if (abs) 2835 event->type_len = RINGBUF_TYPE_TIME_STAMP; 2836 else 2837 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 2838 2839 /* Not the first event on the page, or not delta? */ 2840 if (abs || rb_event_index(event)) { 2841 event->time_delta = delta & TS_MASK; 2842 event->array[0] = delta >> TS_SHIFT; 2843 } else { 2844 /* nope, just zero it */ 2845 event->time_delta = 0; 2846 event->array[0] = 0; 2847 } 2848 2849 return skip_time_extend(event); 2850 } 2851 2852 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 2853 static inline bool sched_clock_stable(void) 2854 { 2855 return true; 2856 } 2857 #endif 2858 2859 static void 2860 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2861 struct rb_event_info *info) 2862 { 2863 u64 write_stamp; 2864 2865 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 2866 (unsigned long long)info->delta, 2867 (unsigned long long)info->ts, 2868 (unsigned long long)info->before, 2869 (unsigned long long)info->after, 2870 (unsigned long long)(rb_time_read(&cpu_buffer->write_stamp, &write_stamp) ? write_stamp : 0), 2871 sched_clock_stable() ? 
"" : 2872 "If you just came from a suspend/resume,\n" 2873 "please switch to the trace global clock:\n" 2874 " echo global > /sys/kernel/tracing/trace_clock\n" 2875 "or add trace_clock=global to the kernel command line\n"); 2876 } 2877 2878 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2879 struct ring_buffer_event **event, 2880 struct rb_event_info *info, 2881 u64 *delta, 2882 unsigned int *length) 2883 { 2884 bool abs = info->add_timestamp & 2885 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 2886 2887 if (unlikely(info->delta > (1ULL << 59))) { 2888 /* 2889 * Some timers can use more than 59 bits, and when a timestamp 2890 * is added to the buffer, it will lose those bits. 2891 */ 2892 if (abs && (info->ts & TS_MSB)) { 2893 info->delta &= ABS_TS_MASK; 2894 2895 /* did the clock go backwards */ 2896 } else if (info->before == info->after && info->before > info->ts) { 2897 /* not interrupted */ 2898 static int once; 2899 2900 /* 2901 * This is possible with a recalibrating of the TSC. 2902 * Do not produce a call stack, but just report it. 2903 */ 2904 if (!once) { 2905 once++; 2906 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 2907 info->before, info->ts); 2908 } 2909 } else 2910 rb_check_timestamp(cpu_buffer, info); 2911 if (!abs) 2912 info->delta = 0; 2913 } 2914 *event = rb_add_time_stamp(*event, info->delta, abs); 2915 *length -= RB_LEN_TIME_EXTEND; 2916 *delta = 0; 2917 } 2918 2919 /** 2920 * rb_update_event - update event type and data 2921 * @cpu_buffer: The per cpu buffer of the @event 2922 * @event: the event to update 2923 * @info: The info to update the @event with (contains length and delta) 2924 * 2925 * Update the type and data fields of the @event. The length 2926 * is the actual size that is written to the ring buffer, 2927 * and with this, we can determine what to place into the 2928 * data field. 2929 */ 2930 static void 2931 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 2932 struct ring_buffer_event *event, 2933 struct rb_event_info *info) 2934 { 2935 unsigned length = info->length; 2936 u64 delta = info->delta; 2937 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 2938 2939 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 2940 cpu_buffer->event_stamp[nest] = info->ts; 2941 2942 /* 2943 * If we need to add a timestamp, then we 2944 * add it to the start of the reserved space. 2945 */ 2946 if (unlikely(info->add_timestamp)) 2947 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 2948 2949 event->time_delta = delta; 2950 length -= RB_EVNT_HDR_SIZE; 2951 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 2952 event->type_len = 0; 2953 event->array[0] = length; 2954 } else 2955 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 2956 } 2957 2958 static unsigned rb_calculate_event_length(unsigned length) 2959 { 2960 struct ring_buffer_event event; /* Used only for sizeof array */ 2961 2962 /* zero length can cause confusions */ 2963 if (!length) 2964 length++; 2965 2966 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 2967 length += sizeof(event.array[0]); 2968 2969 length += RB_EVNT_HDR_SIZE; 2970 length = ALIGN(length, RB_ARCH_ALIGNMENT); 2971 2972 /* 2973 * In case the time delta is larger than the 27 bits for it 2974 * in the header, we need to add a timestamp. If another 2975 * event comes in when trying to discard this one to increase 2976 * the length, then the timestamp will be added in the allocated 2977 * space of this event. 
If length is bigger than the size needed
         * for the TIME_EXTEND, then padding has to be used. The events
         * length must be either RB_LEN_TIME_EXTEND, or greater than or equal
         * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
         * As length is a multiple of 4, we only need to worry if it
         * is 12 (RB_LEN_TIME_EXTEND + 4).
         */
        if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
                length += RB_ALIGNMENT;

        return length;
}

static inline bool
rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
                  struct ring_buffer_event *event)
{
        unsigned long new_index, old_index;
        struct buffer_page *bpage;
        unsigned long addr;

        new_index = rb_event_index(event);
        old_index = new_index + rb_event_ts_length(event);
        addr = (unsigned long)event;
        addr &= PAGE_MASK;

        bpage = READ_ONCE(cpu_buffer->tail_page);

        /*
         * Make sure the tail_page is still the same and
         * the next write location is the end of this event
         */
        if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
                unsigned long write_mask =
                        local_read(&bpage->write) & ~RB_WRITE_MASK;
                unsigned long event_length = rb_event_length(event);

                /*
                 * The before_stamp must be made different from the write_stamp
                 * so that the next event adds an absolute
                 * value and does not rely on the saved write stamp, which
                 * is now going to be bogus.
                 *
                 * By setting the before_stamp to zero, the next event
                 * is not going to use the write_stamp and will instead
                 * create an absolute timestamp. This means there's no
                 * reason to update the write_stamp!
                 */
                rb_time_set(&cpu_buffer->before_stamp, 0);

                /*
                 * If an event were to come in now, it would see that the
                 * write_stamp and the before_stamp are different, and assume
                 * that this event just added itself before updating
                 * the write stamp. The interrupting event will fix the
                 * write stamp for us, and use an absolute timestamp.
                 */

                /*
                 * This is on the tail page. It is possible that
                 * a write could come in and move the tail page
                 * and write to the next page. That is fine
                 * because we just shorten what is on this page.
                 */
                old_index += write_mask;
                new_index += write_mask;

                /* caution: old_index gets updated on cmpxchg failure */
                if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) {
                        /* update counters */
                        local_sub(event_length, &cpu_buffer->entries_bytes);
                        return true;
                }
        }

        /* could not discard */
        return false;
}

static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
        local_inc(&cpu_buffer->committing);
        local_inc(&cpu_buffer->commits);
}

static __always_inline void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
        unsigned long max_count;

        /*
         * We only race with interrupts and NMIs on this CPU.
         * If we own the commit event, then we can commit
         * all others that interrupted us, since the interruptions
         * are in stack format (they finish before they come
         * back to us). This allows us to do a simple loop to
         * assign the commit to the tail.
3074 */ 3075 again: 3076 max_count = cpu_buffer->nr_pages * 100; 3077 3078 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 3079 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 3080 return; 3081 if (RB_WARN_ON(cpu_buffer, 3082 rb_is_reader_page(cpu_buffer->tail_page))) 3083 return; 3084 /* 3085 * No need for a memory barrier here, as the update 3086 * of the tail_page did it for this page. 3087 */ 3088 local_set(&cpu_buffer->commit_page->page->commit, 3089 rb_page_write(cpu_buffer->commit_page)); 3090 rb_inc_page(&cpu_buffer->commit_page); 3091 /* add barrier to keep gcc from optimizing too much */ 3092 barrier(); 3093 } 3094 while (rb_commit_index(cpu_buffer) != 3095 rb_page_write(cpu_buffer->commit_page)) { 3096 3097 /* Make sure the readers see the content of what is committed. */ 3098 smp_wmb(); 3099 local_set(&cpu_buffer->commit_page->page->commit, 3100 rb_page_write(cpu_buffer->commit_page)); 3101 RB_WARN_ON(cpu_buffer, 3102 local_read(&cpu_buffer->commit_page->page->commit) & 3103 ~RB_WRITE_MASK); 3104 barrier(); 3105 } 3106 3107 /* again, keep gcc from optimizing */ 3108 barrier(); 3109 3110 /* 3111 * If an interrupt came in just after the first while loop 3112 * and pushed the tail page forward, we will be left with 3113 * a dangling commit that will never go forward. 3114 */ 3115 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3116 goto again; 3117 } 3118 3119 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3120 { 3121 unsigned long commits; 3122 3123 if (RB_WARN_ON(cpu_buffer, 3124 !local_read(&cpu_buffer->committing))) 3125 return; 3126 3127 again: 3128 commits = local_read(&cpu_buffer->commits); 3129 /* synchronize with interrupts */ 3130 barrier(); 3131 if (local_read(&cpu_buffer->committing) == 1) 3132 rb_set_commit_to_write(cpu_buffer); 3133 3134 local_dec(&cpu_buffer->committing); 3135 3136 /* synchronize with interrupts */ 3137 barrier(); 3138 3139 /* 3140 * Need to account for interrupts coming in between the 3141 * updating of the commit page and the clearing of the 3142 * committing counter. 
 */
        if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
            !local_read(&cpu_buffer->committing)) {
                local_inc(&cpu_buffer->committing);
                goto again;
        }
}

static inline void rb_event_discard(struct ring_buffer_event *event)
{
        if (extended_time(event))
                event = skip_time_extend(event);

        /* array[0] holds the actual length for the discarded event */
        event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
        event->type_len = RINGBUF_TYPE_PADDING;
        /* time delta must be non zero */
        if (!event->time_delta)
                event->time_delta = 1;
}

static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
        local_inc(&cpu_buffer->entries);
        rb_end_commit(cpu_buffer);
}

static __always_inline void
rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
{
        if (buffer->irq_work.waiters_pending) {
                buffer->irq_work.waiters_pending = false;
                /* irq_work_queue() supplies its own memory barriers */
                irq_work_queue(&buffer->irq_work.work);
        }

        if (cpu_buffer->irq_work.waiters_pending) {
                cpu_buffer->irq_work.waiters_pending = false;
                /* irq_work_queue() supplies its own memory barriers */
                irq_work_queue(&cpu_buffer->irq_work.work);
        }

        if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
                return;

        if (cpu_buffer->reader_page == cpu_buffer->commit_page)
                return;

        if (!cpu_buffer->irq_work.full_waiters_pending)
                return;

        cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);

        if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
                return;

        cpu_buffer->irq_work.wakeup_full = true;
        cpu_buffer->irq_work.full_waiters_pending = false;
        /* irq_work_queue() supplies its own memory barriers */
        irq_work_queue(&cpu_buffer->irq_work.work);
}

#ifdef CONFIG_RING_BUFFER_RECORD_RECURSION
# define do_ring_buffer_record_recursion() \
        do_ftrace_record_recursion(_THIS_IP_, _RET_IP_)
#else
# define do_ring_buffer_record_recursion() do { } while (0)
#endif

/*
 * The lock and unlock are done within a preempt disable section.
 * The current_context per_cpu variable can only be modified
 * by the current task between lock and unlock. But it can
 * be modified more than once via an interrupt. To pass this
 * information from the lock to the unlock without having to
 * access the 'in_interrupt()' functions again (which do show
 * a bit of overhead in something as critical as function tracing),
 * we use a bitmask trick.
 *
 * bit 1 = NMI context
 * bit 2 = IRQ context
 * bit 3 = SoftIRQ context
 * bit 4 = normal context.
 *
 * This works because this is the order of contexts that can
 * preempt other contexts. A SoftIRQ never preempts an IRQ
 * context.
 *
 * When the context is determined, the corresponding bit is
 * checked and set (if it was set, then a recursion of that context
 * happened).
 *
 * On unlock, we need to clear this bit. To do so, just subtract
 * 1 from the current_context and AND it to itself.
3237 * 3238 * (binary) 3239 * 101 - 1 = 100 3240 * 101 & 100 = 100 (clearing bit zero) 3241 * 3242 * 1010 - 1 = 1001 3243 * 1010 & 1001 = 1000 (clearing bit 1) 3244 * 3245 * The least significant bit can be cleared this way, and it 3246 * just so happens that it is the same bit corresponding to 3247 * the current context. 3248 * 3249 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 3250 * is set when a recursion is detected at the current context, and if 3251 * the TRANSITION bit is already set, it will fail the recursion. 3252 * This is needed because there's a lag between the changing of 3253 * interrupt context and updating the preempt count. In this case, 3254 * a false positive will be found. To handle this, one extra recursion 3255 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 3256 * bit is already set, then it is considered a recursion and the function 3257 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 3258 * 3259 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 3260 * to be cleared. Even if it wasn't the context that set it. That is, 3261 * if an interrupt comes in while NORMAL bit is set and the ring buffer 3262 * is called before preempt_count() is updated, since the check will 3263 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 3264 * NMI then comes in, it will set the NMI bit, but when the NMI code 3265 * does the trace_recursive_unlock() it will clear the TRANSITION bit 3266 * and leave the NMI bit set. But this is fine, because the interrupt 3267 * code that set the TRANSITION bit will then clear the NMI bit when it 3268 * calls trace_recursive_unlock(). If another NMI comes in, it will 3269 * set the TRANSITION bit and continue. 3270 * 3271 * Note: The TRANSITION bit only handles a single transition between context. 3272 */ 3273 3274 static __always_inline bool 3275 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 3276 { 3277 unsigned int val = cpu_buffer->current_context; 3278 int bit = interrupt_context_level(); 3279 3280 bit = RB_CTX_NORMAL - bit; 3281 3282 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 3283 /* 3284 * It is possible that this was called by transitioning 3285 * between interrupt context, and preempt_count() has not 3286 * been updated yet. In this case, use the TRANSITION bit. 3287 */ 3288 bit = RB_CTX_TRANSITION; 3289 if (val & (1 << (bit + cpu_buffer->nest))) { 3290 do_ring_buffer_record_recursion(); 3291 return true; 3292 } 3293 } 3294 3295 val |= (1 << (bit + cpu_buffer->nest)); 3296 cpu_buffer->current_context = val; 3297 3298 return false; 3299 } 3300 3301 static __always_inline void 3302 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 3303 { 3304 cpu_buffer->current_context &= 3305 cpu_buffer->current_context - (1 << cpu_buffer->nest); 3306 } 3307 3308 /* The recursive locking above uses 5 bits */ 3309 #define NESTED_BITS 5 3310 3311 /** 3312 * ring_buffer_nest_start - Allow to trace while nested 3313 * @buffer: The ring buffer to modify 3314 * 3315 * The ring buffer has a safety mechanism to prevent recursion. 3316 * But there may be a case where a trace needs to be done while 3317 * tracing something else. In this case, calling this function 3318 * will allow this function to nest within a currently active 3319 * ring_buffer_lock_reserve(). 
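 *
 * An illustrative sketch of the resulting call pattern (the buffer,
 * event length, and payload below come from a hypothetical caller and
 * are not code from this file):
 *
 *      ring_buffer_nest_start(buffer);
 *      event = ring_buffer_lock_reserve(buffer, sizeof(u64));
 *      if (event) {
 *              *(u64 *)ring_buffer_event_data(event) = value;
 *              ring_buffer_unlock_commit(buffer);
 *      }
 *      ring_buffer_nest_end(buffer);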
3320 * 3321 * Call this function before calling another ring_buffer_lock_reserve() and 3322 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 3323 */ 3324 void ring_buffer_nest_start(struct trace_buffer *buffer) 3325 { 3326 struct ring_buffer_per_cpu *cpu_buffer; 3327 int cpu; 3328 3329 /* Enabled by ring_buffer_nest_end() */ 3330 preempt_disable_notrace(); 3331 cpu = raw_smp_processor_id(); 3332 cpu_buffer = buffer->buffers[cpu]; 3333 /* This is the shift value for the above recursive locking */ 3334 cpu_buffer->nest += NESTED_BITS; 3335 } 3336 3337 /** 3338 * ring_buffer_nest_end - Allow to trace while nested 3339 * @buffer: The ring buffer to modify 3340 * 3341 * Must be called after ring_buffer_nest_start() and after the 3342 * ring_buffer_unlock_commit(). 3343 */ 3344 void ring_buffer_nest_end(struct trace_buffer *buffer) 3345 { 3346 struct ring_buffer_per_cpu *cpu_buffer; 3347 int cpu; 3348 3349 /* disabled by ring_buffer_nest_start() */ 3350 cpu = raw_smp_processor_id(); 3351 cpu_buffer = buffer->buffers[cpu]; 3352 /* This is the shift value for the above recursive locking */ 3353 cpu_buffer->nest -= NESTED_BITS; 3354 preempt_enable_notrace(); 3355 } 3356 3357 /** 3358 * ring_buffer_unlock_commit - commit a reserved 3359 * @buffer: The buffer to commit to 3360 * 3361 * This commits the data to the ring buffer, and releases any locks held. 3362 * 3363 * Must be paired with ring_buffer_lock_reserve. 3364 */ 3365 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 3366 { 3367 struct ring_buffer_per_cpu *cpu_buffer; 3368 int cpu = raw_smp_processor_id(); 3369 3370 cpu_buffer = buffer->buffers[cpu]; 3371 3372 rb_commit(cpu_buffer); 3373 3374 rb_wakeups(buffer, cpu_buffer); 3375 3376 trace_recursive_unlock(cpu_buffer); 3377 3378 preempt_enable_notrace(); 3379 3380 return 0; 3381 } 3382 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 3383 3384 /* Special value to validate all deltas on a page. */ 3385 #define CHECK_FULL_PAGE 1L 3386 3387 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 3388 static void dump_buffer_page(struct buffer_data_page *bpage, 3389 struct rb_event_info *info, 3390 unsigned long tail) 3391 { 3392 struct ring_buffer_event *event; 3393 u64 ts, delta; 3394 int e; 3395 3396 ts = bpage->time_stamp; 3397 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 3398 3399 for (e = 0; e < tail; e += rb_event_length(event)) { 3400 3401 event = (struct ring_buffer_event *)(bpage->data + e); 3402 3403 switch (event->type_len) { 3404 3405 case RINGBUF_TYPE_TIME_EXTEND: 3406 delta = rb_event_time_stamp(event); 3407 ts += delta; 3408 pr_warn(" [%lld] delta:%lld TIME EXTEND\n", ts, delta); 3409 break; 3410 3411 case RINGBUF_TYPE_TIME_STAMP: 3412 delta = rb_event_time_stamp(event); 3413 ts = rb_fix_abs_ts(delta, ts); 3414 pr_warn(" [%lld] absolute:%lld TIME STAMP\n", ts, delta); 3415 break; 3416 3417 case RINGBUF_TYPE_PADDING: 3418 ts += event->time_delta; 3419 pr_warn(" [%lld] delta:%d PADDING\n", ts, event->time_delta); 3420 break; 3421 3422 case RINGBUF_TYPE_DATA: 3423 ts += event->time_delta; 3424 pr_warn(" [%lld] delta:%d\n", ts, event->time_delta); 3425 break; 3426 3427 default: 3428 break; 3429 } 3430 } 3431 } 3432 3433 static DEFINE_PER_CPU(atomic_t, checking); 3434 static atomic_t ts_dump; 3435 3436 /* 3437 * Check if the current event time stamp matches the deltas on 3438 * the buffer page. 
3439 */ 3440 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 3441 struct rb_event_info *info, 3442 unsigned long tail) 3443 { 3444 struct ring_buffer_event *event; 3445 struct buffer_data_page *bpage; 3446 u64 ts, delta; 3447 bool full = false; 3448 int e; 3449 3450 bpage = info->tail_page->page; 3451 3452 if (tail == CHECK_FULL_PAGE) { 3453 full = true; 3454 tail = local_read(&bpage->commit); 3455 } else if (info->add_timestamp & 3456 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { 3457 /* Ignore events with absolute time stamps */ 3458 return; 3459 } 3460 3461 /* 3462 * Do not check the first event (skip possible extends too). 3463 * Also do not check if previous events have not been committed. 3464 */ 3465 if (tail <= 8 || tail > local_read(&bpage->commit)) 3466 return; 3467 3468 /* 3469 * If this interrupted another event, 3470 */ 3471 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) 3472 goto out; 3473 3474 ts = bpage->time_stamp; 3475 3476 for (e = 0; e < tail; e += rb_event_length(event)) { 3477 3478 event = (struct ring_buffer_event *)(bpage->data + e); 3479 3480 switch (event->type_len) { 3481 3482 case RINGBUF_TYPE_TIME_EXTEND: 3483 delta = rb_event_time_stamp(event); 3484 ts += delta; 3485 break; 3486 3487 case RINGBUF_TYPE_TIME_STAMP: 3488 delta = rb_event_time_stamp(event); 3489 ts = rb_fix_abs_ts(delta, ts); 3490 break; 3491 3492 case RINGBUF_TYPE_PADDING: 3493 if (event->time_delta == 1) 3494 break; 3495 fallthrough; 3496 case RINGBUF_TYPE_DATA: 3497 ts += event->time_delta; 3498 break; 3499 3500 default: 3501 RB_WARN_ON(cpu_buffer, 1); 3502 } 3503 } 3504 if ((full && ts > info->ts) || 3505 (!full && ts + info->delta != info->ts)) { 3506 /* If another report is happening, ignore this one */ 3507 if (atomic_inc_return(&ts_dump) != 1) { 3508 atomic_dec(&ts_dump); 3509 goto out; 3510 } 3511 atomic_inc(&cpu_buffer->record_disabled); 3512 /* There's some cases in boot up that this can happen */ 3513 WARN_ON_ONCE(system_state != SYSTEM_BOOTING); 3514 pr_warn("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s\n", 3515 cpu_buffer->cpu, 3516 ts + info->delta, info->ts, info->delta, 3517 info->before, info->after, 3518 full ? 
" (full)" : ""); 3519 dump_buffer_page(bpage, info, tail); 3520 atomic_dec(&ts_dump); 3521 /* Do not re-enable checking */ 3522 return; 3523 } 3524 out: 3525 atomic_dec(this_cpu_ptr(&checking)); 3526 } 3527 #else 3528 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 3529 struct rb_event_info *info, 3530 unsigned long tail) 3531 { 3532 } 3533 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ 3534 3535 static struct ring_buffer_event * 3536 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 3537 struct rb_event_info *info) 3538 { 3539 struct ring_buffer_event *event; 3540 struct buffer_page *tail_page; 3541 unsigned long tail, write, w; 3542 bool a_ok; 3543 bool b_ok; 3544 3545 /* Don't let the compiler play games with cpu_buffer->tail_page */ 3546 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 3547 3548 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 3549 barrier(); 3550 b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before); 3551 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3552 barrier(); 3553 info->ts = rb_time_stamp(cpu_buffer->buffer); 3554 3555 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 3556 info->delta = info->ts; 3557 } else { 3558 /* 3559 * If interrupting an event time update, we may need an 3560 * absolute timestamp. 3561 * Don't bother if this is the start of a new page (w == 0). 3562 */ 3563 if (!w) { 3564 /* Use the sub-buffer timestamp */ 3565 info->delta = 0; 3566 } else if (unlikely(!a_ok || !b_ok || info->before != info->after)) { 3567 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 3568 info->length += RB_LEN_TIME_EXTEND; 3569 } else { 3570 info->delta = info->ts - info->after; 3571 if (unlikely(test_time_stamp(info->delta))) { 3572 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 3573 info->length += RB_LEN_TIME_EXTEND; 3574 } 3575 } 3576 } 3577 3578 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 3579 3580 /*C*/ write = local_add_return(info->length, &tail_page->write); 3581 3582 /* set write to only the index of the write */ 3583 write &= RB_WRITE_MASK; 3584 3585 tail = write - info->length; 3586 3587 /* See if we shot pass the end of this buffer page */ 3588 if (unlikely(write > BUF_PAGE_SIZE)) { 3589 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE); 3590 return rb_move_tail(cpu_buffer, tail, info); 3591 } 3592 3593 if (likely(tail == w)) { 3594 /* Nothing interrupted us between A and C */ 3595 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 3596 /* 3597 * If something came in between C and D, the write stamp 3598 * may now not be in sync. But that's fine as the before_stamp 3599 * will be different and then next event will just be forced 3600 * to use an absolute timestamp. 
3601 */ 3602 if (likely(!(info->add_timestamp & 3603 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3604 /* This did not interrupt any time update */ 3605 info->delta = info->ts - info->after; 3606 else 3607 /* Just use full timestamp for interrupting event */ 3608 info->delta = info->ts; 3609 check_buffer(cpu_buffer, info, tail); 3610 } else { 3611 u64 ts; 3612 /* SLOW PATH - Interrupted between A and C */ 3613 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3614 /* Was interrupted before here, write_stamp must be valid */ 3615 RB_WARN_ON(cpu_buffer, !a_ok); 3616 ts = rb_time_stamp(cpu_buffer->buffer); 3617 barrier(); 3618 /*E*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 3619 info->after < ts && 3620 rb_time_cmpxchg(&cpu_buffer->write_stamp, 3621 info->after, ts)) { 3622 /* Nothing came after this event between C and E */ 3623 info->delta = ts - info->after; 3624 } else { 3625 /* 3626 * Interrupted between C and E: 3627 * Lost the previous events time stamp. Just set the 3628 * delta to zero, and this will be the same time as 3629 * the event this event interrupted. And the events that 3630 * came after this will still be correct (as they would 3631 * have built their delta on the previous event. 3632 */ 3633 info->delta = 0; 3634 } 3635 info->ts = ts; 3636 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 3637 } 3638 3639 /* 3640 * If this is the first commit on the page, then it has the same 3641 * timestamp as the page itself. 3642 */ 3643 if (unlikely(!tail && !(info->add_timestamp & 3644 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3645 info->delta = 0; 3646 3647 /* We reserved something on the buffer */ 3648 3649 event = __rb_page_index(tail_page, tail); 3650 rb_update_event(cpu_buffer, event, info); 3651 3652 local_inc(&tail_page->entries); 3653 3654 /* 3655 * If this is the first commit on the page, then update 3656 * its timestamp. 3657 */ 3658 if (unlikely(!tail)) 3659 tail_page->page->time_stamp = info->ts; 3660 3661 /* account for these added bytes */ 3662 local_add(info->length, &cpu_buffer->entries_bytes); 3663 3664 return event; 3665 } 3666 3667 static __always_inline struct ring_buffer_event * 3668 rb_reserve_next_event(struct trace_buffer *buffer, 3669 struct ring_buffer_per_cpu *cpu_buffer, 3670 unsigned long length) 3671 { 3672 struct ring_buffer_event *event; 3673 struct rb_event_info info; 3674 int nr_loops = 0; 3675 int add_ts_default; 3676 3677 /* ring buffer does cmpxchg, make sure it is safe in NMI context */ 3678 if (!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) && 3679 (unlikely(in_nmi()))) { 3680 return NULL; 3681 } 3682 3683 rb_start_commit(cpu_buffer); 3684 /* The commit page can not change after this */ 3685 3686 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 3687 /* 3688 * Due to the ability to swap a cpu buffer from a buffer 3689 * it is possible it was swapped before we committed. 3690 * (committing stops a swap). We check for it here and 3691 * if it happened, we have to fail the write. 
3692 */ 3693 barrier(); 3694 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 3695 local_dec(&cpu_buffer->committing); 3696 local_dec(&cpu_buffer->commits); 3697 return NULL; 3698 } 3699 #endif 3700 3701 info.length = rb_calculate_event_length(length); 3702 3703 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 3704 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 3705 info.length += RB_LEN_TIME_EXTEND; 3706 if (info.length > BUF_MAX_DATA_SIZE) 3707 goto out_fail; 3708 } else { 3709 add_ts_default = RB_ADD_STAMP_NONE; 3710 } 3711 3712 again: 3713 info.add_timestamp = add_ts_default; 3714 info.delta = 0; 3715 3716 /* 3717 * We allow for interrupts to reenter here and do a trace. 3718 * If one does, it will cause this original code to loop 3719 * back here. Even with heavy interrupts happening, this 3720 * should only happen a few times in a row. If this happens 3721 * 1000 times in a row, there must be either an interrupt 3722 * storm or we have something buggy. 3723 * Bail! 3724 */ 3725 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 3726 goto out_fail; 3727 3728 event = __rb_reserve_next(cpu_buffer, &info); 3729 3730 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 3731 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 3732 info.length -= RB_LEN_TIME_EXTEND; 3733 goto again; 3734 } 3735 3736 if (likely(event)) 3737 return event; 3738 out_fail: 3739 rb_end_commit(cpu_buffer); 3740 return NULL; 3741 } 3742 3743 /** 3744 * ring_buffer_lock_reserve - reserve a part of the buffer 3745 * @buffer: the ring buffer to reserve from 3746 * @length: the length of the data to reserve (excluding event header) 3747 * 3748 * Returns a reserved event on the ring buffer to copy directly to. 3749 * The user of this interface will need to get the body to write into 3750 * and can use the ring_buffer_event_data() interface. 3751 * 3752 * The length is the length of the data needed, not the event length 3753 * which also includes the event header. 3754 * 3755 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 3756 * If NULL is returned, then nothing has been allocated or locked. 3757 */ 3758 struct ring_buffer_event * 3759 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 3760 { 3761 struct ring_buffer_per_cpu *cpu_buffer; 3762 struct ring_buffer_event *event; 3763 int cpu; 3764 3765 /* If we are tracing schedule, we don't want to recurse */ 3766 preempt_disable_notrace(); 3767 3768 if (unlikely(atomic_read(&buffer->record_disabled))) 3769 goto out; 3770 3771 cpu = raw_smp_processor_id(); 3772 3773 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 3774 goto out; 3775 3776 cpu_buffer = buffer->buffers[cpu]; 3777 3778 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 3779 goto out; 3780 3781 if (unlikely(length > BUF_MAX_DATA_SIZE)) 3782 goto out; 3783 3784 if (unlikely(trace_recursive_lock(cpu_buffer))) 3785 goto out; 3786 3787 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3788 if (!event) 3789 goto out_unlock; 3790 3791 return event; 3792 3793 out_unlock: 3794 trace_recursive_unlock(cpu_buffer); 3795 out: 3796 preempt_enable_notrace(); 3797 return NULL; 3798 } 3799 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 3800 3801 /* 3802 * Decrement the entries to the page that an event is on. 3803 * The event does not even need to exist, only the pointer 3804 * to the page it is on. This may only be called before the commit 3805 * takes place. 
3806 */ 3807 static inline void 3808 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 3809 struct ring_buffer_event *event) 3810 { 3811 unsigned long addr = (unsigned long)event; 3812 struct buffer_page *bpage = cpu_buffer->commit_page; 3813 struct buffer_page *start; 3814 3815 addr &= PAGE_MASK; 3816 3817 /* Do the likely case first */ 3818 if (likely(bpage->page == (void *)addr)) { 3819 local_dec(&bpage->entries); 3820 return; 3821 } 3822 3823 /* 3824 * Because the commit page may be on the reader page we 3825 * start with the next page and check the end loop there. 3826 */ 3827 rb_inc_page(&bpage); 3828 start = bpage; 3829 do { 3830 if (bpage->page == (void *)addr) { 3831 local_dec(&bpage->entries); 3832 return; 3833 } 3834 rb_inc_page(&bpage); 3835 } while (bpage != start); 3836 3837 /* commit not part of this buffer?? */ 3838 RB_WARN_ON(cpu_buffer, 1); 3839 } 3840 3841 /** 3842 * ring_buffer_discard_commit - discard an event that has not been committed 3843 * @buffer: the ring buffer 3844 * @event: non committed event to discard 3845 * 3846 * Sometimes an event that is in the ring buffer needs to be ignored. 3847 * This function lets the user discard an event in the ring buffer 3848 * and then that event will not be read later. 3849 * 3850 * This function only works if it is called before the item has been 3851 * committed. It will try to free the event from the ring buffer 3852 * if another event has not been added behind it. 3853 * 3854 * If another event has been added behind it, it will set the event 3855 * up as discarded, and perform the commit. 3856 * 3857 * If this function is called, do not call ring_buffer_unlock_commit on 3858 * the event. 3859 */ 3860 void ring_buffer_discard_commit(struct trace_buffer *buffer, 3861 struct ring_buffer_event *event) 3862 { 3863 struct ring_buffer_per_cpu *cpu_buffer; 3864 int cpu; 3865 3866 /* The event is discarded regardless */ 3867 rb_event_discard(event); 3868 3869 cpu = smp_processor_id(); 3870 cpu_buffer = buffer->buffers[cpu]; 3871 3872 /* 3873 * This must only be called if the event has not been 3874 * committed yet. Thus we can assume that preemption 3875 * is still disabled. 3876 */ 3877 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 3878 3879 rb_decrement_entry(cpu_buffer, event); 3880 if (rb_try_to_discard(cpu_buffer, event)) 3881 goto out; 3882 3883 out: 3884 rb_end_commit(cpu_buffer); 3885 3886 trace_recursive_unlock(cpu_buffer); 3887 3888 preempt_enable_notrace(); 3889 3890 } 3891 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 3892 3893 /** 3894 * ring_buffer_write - write data to the buffer without reserving 3895 * @buffer: The ring buffer to write to. 3896 * @length: The length of the data being written (excluding the event header) 3897 * @data: The data to write to the buffer. 3898 * 3899 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 3900 * one function. If you already have the data to write to the buffer, it 3901 * may be easier to simply call this function. 3902 * 3903 * Note, like ring_buffer_lock_reserve, the length is the length of the data 3904 * and not the length of the event which would hold the header. 
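 *
 * An illustrative usage sketch (the buffer and payload shown here are
 * from a hypothetical caller, not part of this API): a non-zero return
 * means the write was rejected, for example because recording is
 * disabled.
 *
 *      u64 payload = 0;
 *
 *      if (ring_buffer_write(buffer, sizeof(payload), &payload))
 *              pr_debug("ring_buffer_write failed\n");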
3905 */ 3906 int ring_buffer_write(struct trace_buffer *buffer, 3907 unsigned long length, 3908 void *data) 3909 { 3910 struct ring_buffer_per_cpu *cpu_buffer; 3911 struct ring_buffer_event *event; 3912 void *body; 3913 int ret = -EBUSY; 3914 int cpu; 3915 3916 preempt_disable_notrace(); 3917 3918 if (atomic_read(&buffer->record_disabled)) 3919 goto out; 3920 3921 cpu = raw_smp_processor_id(); 3922 3923 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3924 goto out; 3925 3926 cpu_buffer = buffer->buffers[cpu]; 3927 3928 if (atomic_read(&cpu_buffer->record_disabled)) 3929 goto out; 3930 3931 if (length > BUF_MAX_DATA_SIZE) 3932 goto out; 3933 3934 if (unlikely(trace_recursive_lock(cpu_buffer))) 3935 goto out; 3936 3937 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3938 if (!event) 3939 goto out_unlock; 3940 3941 body = rb_event_data(event); 3942 3943 memcpy(body, data, length); 3944 3945 rb_commit(cpu_buffer); 3946 3947 rb_wakeups(buffer, cpu_buffer); 3948 3949 ret = 0; 3950 3951 out_unlock: 3952 trace_recursive_unlock(cpu_buffer); 3953 3954 out: 3955 preempt_enable_notrace(); 3956 3957 return ret; 3958 } 3959 EXPORT_SYMBOL_GPL(ring_buffer_write); 3960 3961 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 3962 { 3963 struct buffer_page *reader = cpu_buffer->reader_page; 3964 struct buffer_page *head = rb_set_head_page(cpu_buffer); 3965 struct buffer_page *commit = cpu_buffer->commit_page; 3966 3967 /* In case of error, head will be NULL */ 3968 if (unlikely(!head)) 3969 return true; 3970 3971 /* Reader should exhaust content in reader page */ 3972 if (reader->read != rb_page_commit(reader)) 3973 return false; 3974 3975 /* 3976 * If writers are committing on the reader page, knowing all 3977 * committed content has been read, the ring buffer is empty. 3978 */ 3979 if (commit == reader) 3980 return true; 3981 3982 /* 3983 * If writers are committing on a page other than reader page 3984 * and head page, there should always be content to read. 3985 */ 3986 if (commit != head) 3987 return false; 3988 3989 /* 3990 * Writers are committing on the head page, we just need 3991 * to care about there're committed data, and the reader will 3992 * swap reader page with head page when it is to read data. 3993 */ 3994 return rb_page_commit(commit) == 0; 3995 } 3996 3997 /** 3998 * ring_buffer_record_disable - stop all writes into the buffer 3999 * @buffer: The ring buffer to stop writes to. 4000 * 4001 * This prevents all writes to the buffer. Any attempt to write 4002 * to the buffer after this will fail and return NULL. 4003 * 4004 * The caller should call synchronize_rcu() after this. 4005 */ 4006 void ring_buffer_record_disable(struct trace_buffer *buffer) 4007 { 4008 atomic_inc(&buffer->record_disabled); 4009 } 4010 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 4011 4012 /** 4013 * ring_buffer_record_enable - enable writes to the buffer 4014 * @buffer: The ring buffer to enable writes 4015 * 4016 * Note, multiple disables will need the same number of enables 4017 * to truly enable the writing (much like preempt_disable). 4018 */ 4019 void ring_buffer_record_enable(struct trace_buffer *buffer) 4020 { 4021 atomic_dec(&buffer->record_disabled); 4022 } 4023 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 4024 4025 /** 4026 * ring_buffer_record_off - stop all writes into the buffer 4027 * @buffer: The ring buffer to stop writes to. 4028 * 4029 * This prevents all writes to the buffer. Any attempt to write 4030 * to the buffer after this will fail and return NULL. 
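 *
 * For illustration (do_noisy_work() is a placeholder, not part of this
 * API):
 *
 *	ring_buffer_record_off(buffer);
 *	do_noisy_work();
 *	ring_buffer_record_on(buffer);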
4031 * 4032 * This is different than ring_buffer_record_disable() as 4033 * it works like an on/off switch, where as the disable() version 4034 * must be paired with a enable(). 4035 */ 4036 void ring_buffer_record_off(struct trace_buffer *buffer) 4037 { 4038 unsigned int rd; 4039 unsigned int new_rd; 4040 4041 rd = atomic_read(&buffer->record_disabled); 4042 do { 4043 new_rd = rd | RB_BUFFER_OFF; 4044 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4045 } 4046 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 4047 4048 /** 4049 * ring_buffer_record_on - restart writes into the buffer 4050 * @buffer: The ring buffer to start writes to. 4051 * 4052 * This enables all writes to the buffer that was disabled by 4053 * ring_buffer_record_off(). 4054 * 4055 * This is different than ring_buffer_record_enable() as 4056 * it works like an on/off switch, where as the enable() version 4057 * must be paired with a disable(). 4058 */ 4059 void ring_buffer_record_on(struct trace_buffer *buffer) 4060 { 4061 unsigned int rd; 4062 unsigned int new_rd; 4063 4064 rd = atomic_read(&buffer->record_disabled); 4065 do { 4066 new_rd = rd & ~RB_BUFFER_OFF; 4067 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4068 } 4069 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 4070 4071 /** 4072 * ring_buffer_record_is_on - return true if the ring buffer can write 4073 * @buffer: The ring buffer to see if write is enabled 4074 * 4075 * Returns true if the ring buffer is in a state that it accepts writes. 4076 */ 4077 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 4078 { 4079 return !atomic_read(&buffer->record_disabled); 4080 } 4081 4082 /** 4083 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 4084 * @buffer: The ring buffer to see if write is set enabled 4085 * 4086 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 4087 * Note that this does NOT mean it is in a writable state. 4088 * 4089 * It may return true when the ring buffer has been disabled by 4090 * ring_buffer_record_disable(), as that is a temporary disabling of 4091 * the ring buffer. 4092 */ 4093 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 4094 { 4095 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 4096 } 4097 4098 /** 4099 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 4100 * @buffer: The ring buffer to stop writes to. 4101 * @cpu: The CPU buffer to stop 4102 * 4103 * This prevents all writes to the buffer. Any attempt to write 4104 * to the buffer after this will fail and return NULL. 4105 * 4106 * The caller should call synchronize_rcu() after this. 4107 */ 4108 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 4109 { 4110 struct ring_buffer_per_cpu *cpu_buffer; 4111 4112 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4113 return; 4114 4115 cpu_buffer = buffer->buffers[cpu]; 4116 atomic_inc(&cpu_buffer->record_disabled); 4117 } 4118 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 4119 4120 /** 4121 * ring_buffer_record_enable_cpu - enable writes to the buffer 4122 * @buffer: The ring buffer to enable writes 4123 * @cpu: The CPU to enable. 4124 * 4125 * Note, multiple disables will need the same number of enables 4126 * to truly enable the writing (much like preempt_disable). 
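 *
 * A minimal sketch of the per CPU pairing (reader_work() is a placeholder
 * for whatever the caller does while writes are stopped):
 *
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	synchronize_rcu();
 *	reader_work();
 *	ring_buffer_record_enable_cpu(buffer, cpu);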
4127 */ 4128 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 4129 { 4130 struct ring_buffer_per_cpu *cpu_buffer; 4131 4132 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4133 return; 4134 4135 cpu_buffer = buffer->buffers[cpu]; 4136 atomic_dec(&cpu_buffer->record_disabled); 4137 } 4138 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 4139 4140 /* 4141 * The total entries in the ring buffer is the running counter 4142 * of entries entered into the ring buffer, minus the sum of 4143 * the entries read from the ring buffer and the number of 4144 * entries that were overwritten. 4145 */ 4146 static inline unsigned long 4147 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4148 { 4149 return local_read(&cpu_buffer->entries) - 4150 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4151 } 4152 4153 /** 4154 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 4155 * @buffer: The ring buffer 4156 * @cpu: The per CPU buffer to read from. 4157 */ 4158 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 4159 { 4160 unsigned long flags; 4161 struct ring_buffer_per_cpu *cpu_buffer; 4162 struct buffer_page *bpage; 4163 u64 ret = 0; 4164 4165 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4166 return 0; 4167 4168 cpu_buffer = buffer->buffers[cpu]; 4169 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4170 /* 4171 * if the tail is on reader_page, oldest time stamp is on the reader 4172 * page 4173 */ 4174 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 4175 bpage = cpu_buffer->reader_page; 4176 else 4177 bpage = rb_set_head_page(cpu_buffer); 4178 if (bpage) 4179 ret = bpage->page->time_stamp; 4180 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4181 4182 return ret; 4183 } 4184 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 4185 4186 /** 4187 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 4188 * @buffer: The ring buffer 4189 * @cpu: The per CPU buffer to read from. 4190 */ 4191 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 4192 { 4193 struct ring_buffer_per_cpu *cpu_buffer; 4194 unsigned long ret; 4195 4196 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4197 return 0; 4198 4199 cpu_buffer = buffer->buffers[cpu]; 4200 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 4201 4202 return ret; 4203 } 4204 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 4205 4206 /** 4207 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 4208 * @buffer: The ring buffer 4209 * @cpu: The per CPU buffer to get the entries from. 4210 */ 4211 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 4212 { 4213 struct ring_buffer_per_cpu *cpu_buffer; 4214 4215 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4216 return 0; 4217 4218 cpu_buffer = buffer->buffers[cpu]; 4219 4220 return rb_num_of_entries(cpu_buffer); 4221 } 4222 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 4223 4224 /** 4225 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 4226 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 
4227 * @buffer: The ring buffer 4228 * @cpu: The per CPU buffer to get the number of overruns from 4229 */ 4230 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 4231 { 4232 struct ring_buffer_per_cpu *cpu_buffer; 4233 unsigned long ret; 4234 4235 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4236 return 0; 4237 4238 cpu_buffer = buffer->buffers[cpu]; 4239 ret = local_read(&cpu_buffer->overrun); 4240 4241 return ret; 4242 } 4243 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 4244 4245 /** 4246 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 4247 * commits failing due to the buffer wrapping around while there are uncommitted 4248 * events, such as during an interrupt storm. 4249 * @buffer: The ring buffer 4250 * @cpu: The per CPU buffer to get the number of overruns from 4251 */ 4252 unsigned long 4253 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 4254 { 4255 struct ring_buffer_per_cpu *cpu_buffer; 4256 unsigned long ret; 4257 4258 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4259 return 0; 4260 4261 cpu_buffer = buffer->buffers[cpu]; 4262 ret = local_read(&cpu_buffer->commit_overrun); 4263 4264 return ret; 4265 } 4266 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 4267 4268 /** 4269 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 4270 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 4271 * @buffer: The ring buffer 4272 * @cpu: The per CPU buffer to get the number of overruns from 4273 */ 4274 unsigned long 4275 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 4276 { 4277 struct ring_buffer_per_cpu *cpu_buffer; 4278 unsigned long ret; 4279 4280 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4281 return 0; 4282 4283 cpu_buffer = buffer->buffers[cpu]; 4284 ret = local_read(&cpu_buffer->dropped_events); 4285 4286 return ret; 4287 } 4288 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 4289 4290 /** 4291 * ring_buffer_read_events_cpu - get the number of events successfully read 4292 * @buffer: The ring buffer 4293 * @cpu: The per CPU buffer to get the number of events read 4294 */ 4295 unsigned long 4296 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 4297 { 4298 struct ring_buffer_per_cpu *cpu_buffer; 4299 4300 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4301 return 0; 4302 4303 cpu_buffer = buffer->buffers[cpu]; 4304 return cpu_buffer->read; 4305 } 4306 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 4307 4308 /** 4309 * ring_buffer_entries - get the number of entries in a buffer 4310 * @buffer: The ring buffer 4311 * 4312 * Returns the total number of entries in the ring buffer 4313 * (all CPU entries) 4314 */ 4315 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 4316 { 4317 struct ring_buffer_per_cpu *cpu_buffer; 4318 unsigned long entries = 0; 4319 int cpu; 4320 4321 /* if you care about this being correct, lock the buffer */ 4322 for_each_buffer_cpu(buffer, cpu) { 4323 cpu_buffer = buffer->buffers[cpu]; 4324 entries += rb_num_of_entries(cpu_buffer); 4325 } 4326 4327 return entries; 4328 } 4329 EXPORT_SYMBOL_GPL(ring_buffer_entries); 4330 4331 /** 4332 * ring_buffer_overruns - get the number of overruns in buffer 4333 * @buffer: The ring buffer 4334 * 4335 * Returns the total number of overruns in the ring buffer 4336 * (all CPU entries) 4337 */ 4338 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 4339 { 4340 struct ring_buffer_per_cpu *cpu_buffer; 4341 unsigned long overruns = 0; 4342 int cpu; 4343 4344 /* 
if you care about this being correct, lock the buffer */ 4345 for_each_buffer_cpu(buffer, cpu) { 4346 cpu_buffer = buffer->buffers[cpu]; 4347 overruns += local_read(&cpu_buffer->overrun); 4348 } 4349 4350 return overruns; 4351 } 4352 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 4353 4354 static void rb_iter_reset(struct ring_buffer_iter *iter) 4355 { 4356 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4357 4358 /* Iterator usage is expected to have record disabled */ 4359 iter->head_page = cpu_buffer->reader_page; 4360 iter->head = cpu_buffer->reader_page->read; 4361 iter->next_event = iter->head; 4362 4363 iter->cache_reader_page = iter->head_page; 4364 iter->cache_read = cpu_buffer->read; 4365 iter->cache_pages_removed = cpu_buffer->pages_removed; 4366 4367 if (iter->head) { 4368 iter->read_stamp = cpu_buffer->read_stamp; 4369 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 4370 } else { 4371 iter->read_stamp = iter->head_page->page->time_stamp; 4372 iter->page_stamp = iter->read_stamp; 4373 } 4374 } 4375 4376 /** 4377 * ring_buffer_iter_reset - reset an iterator 4378 * @iter: The iterator to reset 4379 * 4380 * Resets the iterator, so that it will start from the beginning 4381 * again. 4382 */ 4383 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 4384 { 4385 struct ring_buffer_per_cpu *cpu_buffer; 4386 unsigned long flags; 4387 4388 if (!iter) 4389 return; 4390 4391 cpu_buffer = iter->cpu_buffer; 4392 4393 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4394 rb_iter_reset(iter); 4395 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4396 } 4397 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 4398 4399 /** 4400 * ring_buffer_iter_empty - check if an iterator has no more to read 4401 * @iter: The iterator to check 4402 */ 4403 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 4404 { 4405 struct ring_buffer_per_cpu *cpu_buffer; 4406 struct buffer_page *reader; 4407 struct buffer_page *head_page; 4408 struct buffer_page *commit_page; 4409 struct buffer_page *curr_commit_page; 4410 unsigned commit; 4411 u64 curr_commit_ts; 4412 u64 commit_ts; 4413 4414 cpu_buffer = iter->cpu_buffer; 4415 reader = cpu_buffer->reader_page; 4416 head_page = cpu_buffer->head_page; 4417 commit_page = cpu_buffer->commit_page; 4418 commit_ts = commit_page->page->time_stamp; 4419 4420 /* 4421 * When the writer goes across pages, it issues a cmpxchg which 4422 * is a mb(), which will synchronize with the rmb here. 
4423 * (see rb_tail_page_update()) 4424 */ 4425 smp_rmb(); 4426 commit = rb_page_commit(commit_page); 4427 /* We want to make sure that the commit page doesn't change */ 4428 smp_rmb(); 4429 4430 /* Make sure commit page didn't change */ 4431 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 4432 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 4433 4434 /* If the commit page changed, then there's more data */ 4435 if (curr_commit_page != commit_page || 4436 curr_commit_ts != commit_ts) 4437 return 0; 4438 4439 /* Still racy, as it may return a false positive, but that's OK */ 4440 return ((iter->head_page == commit_page && iter->head >= commit) || 4441 (iter->head_page == reader && commit_page == head_page && 4442 head_page->read == commit && 4443 iter->head == rb_page_commit(cpu_buffer->reader_page))); 4444 } 4445 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 4446 4447 static void 4448 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 4449 struct ring_buffer_event *event) 4450 { 4451 u64 delta; 4452 4453 switch (event->type_len) { 4454 case RINGBUF_TYPE_PADDING: 4455 return; 4456 4457 case RINGBUF_TYPE_TIME_EXTEND: 4458 delta = rb_event_time_stamp(event); 4459 cpu_buffer->read_stamp += delta; 4460 return; 4461 4462 case RINGBUF_TYPE_TIME_STAMP: 4463 delta = rb_event_time_stamp(event); 4464 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 4465 cpu_buffer->read_stamp = delta; 4466 return; 4467 4468 case RINGBUF_TYPE_DATA: 4469 cpu_buffer->read_stamp += event->time_delta; 4470 return; 4471 4472 default: 4473 RB_WARN_ON(cpu_buffer, 1); 4474 } 4475 } 4476 4477 static void 4478 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 4479 struct ring_buffer_event *event) 4480 { 4481 u64 delta; 4482 4483 switch (event->type_len) { 4484 case RINGBUF_TYPE_PADDING: 4485 return; 4486 4487 case RINGBUF_TYPE_TIME_EXTEND: 4488 delta = rb_event_time_stamp(event); 4489 iter->read_stamp += delta; 4490 return; 4491 4492 case RINGBUF_TYPE_TIME_STAMP: 4493 delta = rb_event_time_stamp(event); 4494 delta = rb_fix_abs_ts(delta, iter->read_stamp); 4495 iter->read_stamp = delta; 4496 return; 4497 4498 case RINGBUF_TYPE_DATA: 4499 iter->read_stamp += event->time_delta; 4500 return; 4501 4502 default: 4503 RB_WARN_ON(iter->cpu_buffer, 1); 4504 } 4505 } 4506 4507 static struct buffer_page * 4508 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 4509 { 4510 struct buffer_page *reader = NULL; 4511 unsigned long overwrite; 4512 unsigned long flags; 4513 int nr_loops = 0; 4514 bool ret; 4515 4516 local_irq_save(flags); 4517 arch_spin_lock(&cpu_buffer->lock); 4518 4519 again: 4520 /* 4521 * This should normally only loop twice. But because the 4522 * start of the reader inserts an empty page, it causes 4523 * a case where we will loop three times. There should be no 4524 * reason to loop four times (that I know of). 
4525 */ 4526 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 4527 reader = NULL; 4528 goto out; 4529 } 4530 4531 reader = cpu_buffer->reader_page; 4532 4533 /* If there's more to read, return this page */ 4534 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 4535 goto out; 4536 4537 /* Never should we have an index greater than the size */ 4538 if (RB_WARN_ON(cpu_buffer, 4539 cpu_buffer->reader_page->read > rb_page_size(reader))) 4540 goto out; 4541 4542 /* check if we caught up to the tail */ 4543 reader = NULL; 4544 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 4545 goto out; 4546 4547 /* Don't bother swapping if the ring buffer is empty */ 4548 if (rb_num_of_entries(cpu_buffer) == 0) 4549 goto out; 4550 4551 /* 4552 * Reset the reader page to size zero. 4553 */ 4554 local_set(&cpu_buffer->reader_page->write, 0); 4555 local_set(&cpu_buffer->reader_page->entries, 0); 4556 local_set(&cpu_buffer->reader_page->page->commit, 0); 4557 cpu_buffer->reader_page->real_end = 0; 4558 4559 spin: 4560 /* 4561 * Splice the empty reader page into the list around the head. 4562 */ 4563 reader = rb_set_head_page(cpu_buffer); 4564 if (!reader) 4565 goto out; 4566 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 4567 cpu_buffer->reader_page->list.prev = reader->list.prev; 4568 4569 /* 4570 * cpu_buffer->pages just needs to point to the buffer, it 4571 * has no specific buffer page to point to. Lets move it out 4572 * of our way so we don't accidentally swap it. 4573 */ 4574 cpu_buffer->pages = reader->list.prev; 4575 4576 /* The reader page will be pointing to the new head */ 4577 rb_set_list_to_head(&cpu_buffer->reader_page->list); 4578 4579 /* 4580 * We want to make sure we read the overruns after we set up our 4581 * pointers to the next object. The writer side does a 4582 * cmpxchg to cross pages which acts as the mb on the writer 4583 * side. Note, the reader will constantly fail the swap 4584 * while the writer is updating the pointers, so this 4585 * guarantees that the overwrite recorded here is the one we 4586 * want to compare with the last_overrun. 4587 */ 4588 smp_mb(); 4589 overwrite = local_read(&(cpu_buffer->overrun)); 4590 4591 /* 4592 * Here's the tricky part. 4593 * 4594 * We need to move the pointer past the header page. 4595 * But we can only do that if a writer is not currently 4596 * moving it. The page before the header page has the 4597 * flag bit '1' set if it is pointing to the page we want. 4598 * but if the writer is in the process of moving it 4599 * than it will be '2' or already moved '0'. 4600 */ 4601 4602 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 4603 4604 /* 4605 * If we did not convert it, then we must try again. 4606 */ 4607 if (!ret) 4608 goto spin; 4609 4610 /* 4611 * Yay! We succeeded in replacing the page. 4612 * 4613 * Now make the new head point back to the reader page. 
4614 */ 4615 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 4616 rb_inc_page(&cpu_buffer->head_page); 4617 4618 local_inc(&cpu_buffer->pages_read); 4619 4620 /* Finally update the reader page to the new head */ 4621 cpu_buffer->reader_page = reader; 4622 cpu_buffer->reader_page->read = 0; 4623 4624 if (overwrite != cpu_buffer->last_overrun) { 4625 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 4626 cpu_buffer->last_overrun = overwrite; 4627 } 4628 4629 goto again; 4630 4631 out: 4632 /* Update the read_stamp on the first event */ 4633 if (reader && reader->read == 0) 4634 cpu_buffer->read_stamp = reader->page->time_stamp; 4635 4636 arch_spin_unlock(&cpu_buffer->lock); 4637 local_irq_restore(flags); 4638 4639 /* 4640 * The writer has preempt disable, wait for it. But not forever 4641 * Although, 1 second is pretty much "forever" 4642 */ 4643 #define USECS_WAIT 1000000 4644 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 4645 /* If the write is past the end of page, a writer is still updating it */ 4646 if (likely(!reader || rb_page_write(reader) <= BUF_PAGE_SIZE)) 4647 break; 4648 4649 udelay(1); 4650 4651 /* Get the latest version of the reader write value */ 4652 smp_rmb(); 4653 } 4654 4655 /* The writer is not moving forward? Something is wrong */ 4656 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 4657 reader = NULL; 4658 4659 /* 4660 * Make sure we see any padding after the write update 4661 * (see rb_reset_tail()). 4662 * 4663 * In addition, a writer may be writing on the reader page 4664 * if the page has not been fully filled, so the read barrier 4665 * is also needed to make sure we see the content of what is 4666 * committed by the writer (see rb_set_commit_to_write()). 4667 */ 4668 smp_rmb(); 4669 4670 4671 return reader; 4672 } 4673 4674 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 4675 { 4676 struct ring_buffer_event *event; 4677 struct buffer_page *reader; 4678 unsigned length; 4679 4680 reader = rb_get_reader_page(cpu_buffer); 4681 4682 /* This function should not be called when buffer is empty */ 4683 if (RB_WARN_ON(cpu_buffer, !reader)) 4684 return; 4685 4686 event = rb_reader_event(cpu_buffer); 4687 4688 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 4689 cpu_buffer->read++; 4690 4691 rb_update_read_stamp(cpu_buffer, event); 4692 4693 length = rb_event_length(event); 4694 cpu_buffer->reader_page->read += length; 4695 cpu_buffer->read_bytes += length; 4696 } 4697 4698 static void rb_advance_iter(struct ring_buffer_iter *iter) 4699 { 4700 struct ring_buffer_per_cpu *cpu_buffer; 4701 4702 cpu_buffer = iter->cpu_buffer; 4703 4704 /* If head == next_event then we need to jump to the next event */ 4705 if (iter->head == iter->next_event) { 4706 /* If the event gets overwritten again, there's nothing to do */ 4707 if (rb_iter_head_event(iter) == NULL) 4708 return; 4709 } 4710 4711 iter->head = iter->next_event; 4712 4713 /* 4714 * Check if we are at the end of the buffer. 
4715 */ 4716 if (iter->next_event >= rb_page_size(iter->head_page)) { 4717 /* discarded commits can make the page empty */ 4718 if (iter->head_page == cpu_buffer->commit_page) 4719 return; 4720 rb_inc_iter(iter); 4721 return; 4722 } 4723 4724 rb_update_iter_read_stamp(iter, iter->event); 4725 } 4726 4727 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 4728 { 4729 return cpu_buffer->lost_events; 4730 } 4731 4732 static struct ring_buffer_event * 4733 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 4734 unsigned long *lost_events) 4735 { 4736 struct ring_buffer_event *event; 4737 struct buffer_page *reader; 4738 int nr_loops = 0; 4739 4740 if (ts) 4741 *ts = 0; 4742 again: 4743 /* 4744 * We repeat when a time extend is encountered. 4745 * Since the time extend is always attached to a data event, 4746 * we should never loop more than once. 4747 * (We never hit the following condition more than twice). 4748 */ 4749 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 4750 return NULL; 4751 4752 reader = rb_get_reader_page(cpu_buffer); 4753 if (!reader) 4754 return NULL; 4755 4756 event = rb_reader_event(cpu_buffer); 4757 4758 switch (event->type_len) { 4759 case RINGBUF_TYPE_PADDING: 4760 if (rb_null_event(event)) 4761 RB_WARN_ON(cpu_buffer, 1); 4762 /* 4763 * Because the writer could be discarding every 4764 * event it creates (which would probably be bad) 4765 * if we were to go back to "again" then we may never 4766 * catch up, and will trigger the warn on, or lock 4767 * the box. Return the padding, and we will release 4768 * the current locks, and try again. 4769 */ 4770 return event; 4771 4772 case RINGBUF_TYPE_TIME_EXTEND: 4773 /* Internal data, OK to advance */ 4774 rb_advance_reader(cpu_buffer); 4775 goto again; 4776 4777 case RINGBUF_TYPE_TIME_STAMP: 4778 if (ts) { 4779 *ts = rb_event_time_stamp(event); 4780 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 4781 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4782 cpu_buffer->cpu, ts); 4783 } 4784 /* Internal data, OK to advance */ 4785 rb_advance_reader(cpu_buffer); 4786 goto again; 4787 4788 case RINGBUF_TYPE_DATA: 4789 if (ts && !(*ts)) { 4790 *ts = cpu_buffer->read_stamp + event->time_delta; 4791 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4792 cpu_buffer->cpu, ts); 4793 } 4794 if (lost_events) 4795 *lost_events = rb_lost_events(cpu_buffer); 4796 return event; 4797 4798 default: 4799 RB_WARN_ON(cpu_buffer, 1); 4800 } 4801 4802 return NULL; 4803 } 4804 EXPORT_SYMBOL_GPL(ring_buffer_peek); 4805 4806 static struct ring_buffer_event * 4807 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4808 { 4809 struct trace_buffer *buffer; 4810 struct ring_buffer_per_cpu *cpu_buffer; 4811 struct ring_buffer_event *event; 4812 int nr_loops = 0; 4813 4814 if (ts) 4815 *ts = 0; 4816 4817 cpu_buffer = iter->cpu_buffer; 4818 buffer = cpu_buffer->buffer; 4819 4820 /* 4821 * Check if someone performed a consuming read to the buffer 4822 * or removed some pages from the buffer. In these cases, 4823 * iterator was invalidated and we need to reset it. 4824 */ 4825 if (unlikely(iter->cache_read != cpu_buffer->read || 4826 iter->cache_reader_page != cpu_buffer->reader_page || 4827 iter->cache_pages_removed != cpu_buffer->pages_removed)) 4828 rb_iter_reset(iter); 4829 4830 again: 4831 if (ring_buffer_iter_empty(iter)) 4832 return NULL; 4833 4834 /* 4835 * As the writer can mess with what the iterator is trying 4836 * to read, just give up if we fail to get an event after 4837 * three tries. 
The iterator is not as reliable when reading 4838 * the ring buffer with an active write as the consumer is. 4839 * Do not warn if the three failures is reached. 4840 */ 4841 if (++nr_loops > 3) 4842 return NULL; 4843 4844 if (rb_per_cpu_empty(cpu_buffer)) 4845 return NULL; 4846 4847 if (iter->head >= rb_page_size(iter->head_page)) { 4848 rb_inc_iter(iter); 4849 goto again; 4850 } 4851 4852 event = rb_iter_head_event(iter); 4853 if (!event) 4854 goto again; 4855 4856 switch (event->type_len) { 4857 case RINGBUF_TYPE_PADDING: 4858 if (rb_null_event(event)) { 4859 rb_inc_iter(iter); 4860 goto again; 4861 } 4862 rb_advance_iter(iter); 4863 return event; 4864 4865 case RINGBUF_TYPE_TIME_EXTEND: 4866 /* Internal data, OK to advance */ 4867 rb_advance_iter(iter); 4868 goto again; 4869 4870 case RINGBUF_TYPE_TIME_STAMP: 4871 if (ts) { 4872 *ts = rb_event_time_stamp(event); 4873 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 4874 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4875 cpu_buffer->cpu, ts); 4876 } 4877 /* Internal data, OK to advance */ 4878 rb_advance_iter(iter); 4879 goto again; 4880 4881 case RINGBUF_TYPE_DATA: 4882 if (ts && !(*ts)) { 4883 *ts = iter->read_stamp + event->time_delta; 4884 ring_buffer_normalize_time_stamp(buffer, 4885 cpu_buffer->cpu, ts); 4886 } 4887 return event; 4888 4889 default: 4890 RB_WARN_ON(cpu_buffer, 1); 4891 } 4892 4893 return NULL; 4894 } 4895 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 4896 4897 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 4898 { 4899 if (likely(!in_nmi())) { 4900 raw_spin_lock(&cpu_buffer->reader_lock); 4901 return true; 4902 } 4903 4904 /* 4905 * If an NMI die dumps out the content of the ring buffer 4906 * trylock must be used to prevent a deadlock if the NMI 4907 * preempted a task that holds the ring buffer locks. If 4908 * we get the lock then all is fine, if not, then continue 4909 * to do the read, but this can corrupt the ring buffer, 4910 * so it must be permanently disabled from future writes. 4911 * Reading from NMI is a oneshot deal. 4912 */ 4913 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 4914 return true; 4915 4916 /* Continue without locking, but disable the ring buffer */ 4917 atomic_inc(&cpu_buffer->record_disabled); 4918 return false; 4919 } 4920 4921 static inline void 4922 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 4923 { 4924 if (likely(locked)) 4925 raw_spin_unlock(&cpu_buffer->reader_lock); 4926 } 4927 4928 /** 4929 * ring_buffer_peek - peek at the next event to be read 4930 * @buffer: The ring buffer to read 4931 * @cpu: The cpu to peak at 4932 * @ts: The timestamp counter of this event. 4933 * @lost_events: a variable to store if events were lost (may be NULL) 4934 * 4935 * This will return the event that will be read next, but does 4936 * not consume the data. 
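 *
 * A rough usage sketch (process_event() is a hypothetical consumer):
 *
 *	u64 ts;
 *	struct ring_buffer_event *event;
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, NULL);
 *	if (event)
 *		process_event(ring_buffer_event_data(event), ts);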
4937 */ 4938 struct ring_buffer_event * 4939 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 4940 unsigned long *lost_events) 4941 { 4942 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4943 struct ring_buffer_event *event; 4944 unsigned long flags; 4945 bool dolock; 4946 4947 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4948 return NULL; 4949 4950 again: 4951 local_irq_save(flags); 4952 dolock = rb_reader_lock(cpu_buffer); 4953 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 4954 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4955 rb_advance_reader(cpu_buffer); 4956 rb_reader_unlock(cpu_buffer, dolock); 4957 local_irq_restore(flags); 4958 4959 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4960 goto again; 4961 4962 return event; 4963 } 4964 4965 /** ring_buffer_iter_dropped - report if there are dropped events 4966 * @iter: The ring buffer iterator 4967 * 4968 * Returns true if there was dropped events since the last peek. 4969 */ 4970 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 4971 { 4972 bool ret = iter->missed_events != 0; 4973 4974 iter->missed_events = 0; 4975 return ret; 4976 } 4977 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 4978 4979 /** 4980 * ring_buffer_iter_peek - peek at the next event to be read 4981 * @iter: The ring buffer iterator 4982 * @ts: The timestamp counter of this event. 4983 * 4984 * This will return the event that will be read next, but does 4985 * not increment the iterator. 4986 */ 4987 struct ring_buffer_event * 4988 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4989 { 4990 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4991 struct ring_buffer_event *event; 4992 unsigned long flags; 4993 4994 again: 4995 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4996 event = rb_iter_peek(iter, ts); 4997 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4998 4999 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5000 goto again; 5001 5002 return event; 5003 } 5004 5005 /** 5006 * ring_buffer_consume - return an event and consume it 5007 * @buffer: The ring buffer to get the next event from 5008 * @cpu: the cpu to read the buffer from 5009 * @ts: a variable to store the timestamp (may be NULL) 5010 * @lost_events: a variable to store if events were lost (may be NULL) 5011 * 5012 * Returns the next event in the ring buffer, and that event is consumed. 5013 * Meaning, that sequential reads will keep returning a different event, 5014 * and eventually empty the ring buffer if the producer is slower. 
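 *
 * For example, draining one CPU buffer might look roughly like this
 * (process_event() is a hypothetical consumer):
 *
 *	u64 ts;
 *	unsigned long lost;
 *	struct ring_buffer_event *event;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)))
 *		process_event(ring_buffer_event_data(event), ts, lost);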
5015 */ 5016 struct ring_buffer_event * 5017 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 5018 unsigned long *lost_events) 5019 { 5020 struct ring_buffer_per_cpu *cpu_buffer; 5021 struct ring_buffer_event *event = NULL; 5022 unsigned long flags; 5023 bool dolock; 5024 5025 again: 5026 /* might be called in atomic */ 5027 preempt_disable(); 5028 5029 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5030 goto out; 5031 5032 cpu_buffer = buffer->buffers[cpu]; 5033 local_irq_save(flags); 5034 dolock = rb_reader_lock(cpu_buffer); 5035 5036 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5037 if (event) { 5038 cpu_buffer->lost_events = 0; 5039 rb_advance_reader(cpu_buffer); 5040 } 5041 5042 rb_reader_unlock(cpu_buffer, dolock); 5043 local_irq_restore(flags); 5044 5045 out: 5046 preempt_enable(); 5047 5048 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5049 goto again; 5050 5051 return event; 5052 } 5053 EXPORT_SYMBOL_GPL(ring_buffer_consume); 5054 5055 /** 5056 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 5057 * @buffer: The ring buffer to read from 5058 * @cpu: The cpu buffer to iterate over 5059 * @flags: gfp flags to use for memory allocation 5060 * 5061 * This performs the initial preparations necessary to iterate 5062 * through the buffer. Memory is allocated, buffer recording 5063 * is disabled, and the iterator pointer is returned to the caller. 5064 * 5065 * Disabling buffer recording prevents the reading from being 5066 * corrupted. This is not a consuming read, so a producer is not 5067 * expected. 5068 * 5069 * After a sequence of ring_buffer_read_prepare calls, the user is 5070 * expected to make at least one call to ring_buffer_read_prepare_sync. 5071 * Afterwards, ring_buffer_read_start is invoked to get things going 5072 * for real. 5073 * 5074 * This overall must be paired with ring_buffer_read_finish. 5075 */ 5076 struct ring_buffer_iter * 5077 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) 5078 { 5079 struct ring_buffer_per_cpu *cpu_buffer; 5080 struct ring_buffer_iter *iter; 5081 5082 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5083 return NULL; 5084 5085 iter = kzalloc(sizeof(*iter), flags); 5086 if (!iter) 5087 return NULL; 5088 5089 /* Holds the entire event: data and meta data */ 5090 iter->event = kmalloc(BUF_PAGE_SIZE, flags); 5091 if (!iter->event) { 5092 kfree(iter); 5093 return NULL; 5094 } 5095 5096 cpu_buffer = buffer->buffers[cpu]; 5097 5098 iter->cpu_buffer = cpu_buffer; 5099 5100 atomic_inc(&cpu_buffer->resize_disabled); 5101 5102 return iter; 5103 } 5104 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 5105 5106 /** 5107 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 5108 * 5109 * All previously invoked ring_buffer_read_prepare calls to prepare 5110 * iterators will be synchronized. Afterwards, read_buffer_read_start 5111 * calls on those iterators are allowed. 5112 */ 5113 void 5114 ring_buffer_read_prepare_sync(void) 5115 { 5116 synchronize_rcu(); 5117 } 5118 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 5119 5120 /** 5121 * ring_buffer_read_start - start a non consuming read of the buffer 5122 * @iter: The iterator returned by ring_buffer_read_prepare 5123 * 5124 * This finalizes the startup of an iteration through the buffer. 5125 * The iterator comes from a call to ring_buffer_read_prepare and 5126 * an intervening ring_buffer_read_prepare_sync must have been 5127 * performed. 5128 * 5129 * Must be paired with ring_buffer_read_finish. 
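 *
 * The whole non-consuming sequence, as a sketch only (handle_event() is
 * hypothetical and error handling is omitted):
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_iter_peek(iter, &ts))) {
 *		handle_event(event, ts);
 *		ring_buffer_iter_advance(iter);
 *	}
 *	ring_buffer_read_finish(iter);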
5130 */ 5131 void 5132 ring_buffer_read_start(struct ring_buffer_iter *iter) 5133 { 5134 struct ring_buffer_per_cpu *cpu_buffer; 5135 unsigned long flags; 5136 5137 if (!iter) 5138 return; 5139 5140 cpu_buffer = iter->cpu_buffer; 5141 5142 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5143 arch_spin_lock(&cpu_buffer->lock); 5144 rb_iter_reset(iter); 5145 arch_spin_unlock(&cpu_buffer->lock); 5146 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5147 } 5148 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 5149 5150 /** 5151 * ring_buffer_read_finish - finish reading the iterator of the buffer 5152 * @iter: The iterator retrieved by ring_buffer_start 5153 * 5154 * This re-enables the recording to the buffer, and frees the 5155 * iterator. 5156 */ 5157 void 5158 ring_buffer_read_finish(struct ring_buffer_iter *iter) 5159 { 5160 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5161 unsigned long flags; 5162 5163 /* 5164 * Ring buffer is disabled from recording, here's a good place 5165 * to check the integrity of the ring buffer. 5166 * Must prevent readers from trying to read, as the check 5167 * clears the HEAD page and readers require it. 5168 */ 5169 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5170 rb_check_pages(cpu_buffer); 5171 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5172 5173 atomic_dec(&cpu_buffer->resize_disabled); 5174 kfree(iter->event); 5175 kfree(iter); 5176 } 5177 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 5178 5179 /** 5180 * ring_buffer_iter_advance - advance the iterator to the next location 5181 * @iter: The ring buffer iterator 5182 * 5183 * Move the location of the iterator such that the next read will 5184 * be the next location of the iterator. 5185 */ 5186 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 5187 { 5188 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5189 unsigned long flags; 5190 5191 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5192 5193 rb_advance_iter(iter); 5194 5195 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5196 } 5197 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 5198 5199 /** 5200 * ring_buffer_size - return the size of the ring buffer (in bytes) 5201 * @buffer: The ring buffer. 5202 * @cpu: The CPU to get ring buffer size from. 5203 */ 5204 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 5205 { 5206 /* 5207 * Earlier, this method returned 5208 * BUF_PAGE_SIZE * buffer->nr_pages 5209 * Since the nr_pages field is now removed, we have converted this to 5210 * return the per cpu buffer value. 
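 *
 * A caller that still wants the old total can sum the per CPU values,
 * for instance (illustrative only):
 *
 *	unsigned long total = 0;
 *	int cpu;
 *
 *	for_each_online_cpu(cpu)
 *		total += ring_buffer_size(buffer, cpu);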
5211 */ 5212 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5213 return 0; 5214 5215 return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages; 5216 } 5217 EXPORT_SYMBOL_GPL(ring_buffer_size); 5218 5219 static void rb_clear_buffer_page(struct buffer_page *page) 5220 { 5221 local_set(&page->write, 0); 5222 local_set(&page->entries, 0); 5223 rb_init_page(page->page); 5224 page->read = 0; 5225 } 5226 5227 static void 5228 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 5229 { 5230 struct buffer_page *page; 5231 5232 rb_head_page_deactivate(cpu_buffer); 5233 5234 cpu_buffer->head_page 5235 = list_entry(cpu_buffer->pages, struct buffer_page, list); 5236 rb_clear_buffer_page(cpu_buffer->head_page); 5237 list_for_each_entry(page, cpu_buffer->pages, list) { 5238 rb_clear_buffer_page(page); 5239 } 5240 5241 cpu_buffer->tail_page = cpu_buffer->head_page; 5242 cpu_buffer->commit_page = cpu_buffer->head_page; 5243 5244 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 5245 INIT_LIST_HEAD(&cpu_buffer->new_pages); 5246 rb_clear_buffer_page(cpu_buffer->reader_page); 5247 5248 local_set(&cpu_buffer->entries_bytes, 0); 5249 local_set(&cpu_buffer->overrun, 0); 5250 local_set(&cpu_buffer->commit_overrun, 0); 5251 local_set(&cpu_buffer->dropped_events, 0); 5252 local_set(&cpu_buffer->entries, 0); 5253 local_set(&cpu_buffer->committing, 0); 5254 local_set(&cpu_buffer->commits, 0); 5255 local_set(&cpu_buffer->pages_touched, 0); 5256 local_set(&cpu_buffer->pages_lost, 0); 5257 local_set(&cpu_buffer->pages_read, 0); 5258 cpu_buffer->last_pages_touch = 0; 5259 cpu_buffer->shortest_full = 0; 5260 cpu_buffer->read = 0; 5261 cpu_buffer->read_bytes = 0; 5262 5263 rb_time_set(&cpu_buffer->write_stamp, 0); 5264 rb_time_set(&cpu_buffer->before_stamp, 0); 5265 5266 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 5267 5268 cpu_buffer->lost_events = 0; 5269 cpu_buffer->last_overrun = 0; 5270 5271 rb_head_page_activate(cpu_buffer); 5272 cpu_buffer->pages_removed = 0; 5273 } 5274 5275 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 5276 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 5277 { 5278 unsigned long flags; 5279 5280 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5281 5282 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 5283 goto out; 5284 5285 arch_spin_lock(&cpu_buffer->lock); 5286 5287 rb_reset_cpu(cpu_buffer); 5288 5289 arch_spin_unlock(&cpu_buffer->lock); 5290 5291 out: 5292 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5293 } 5294 5295 /** 5296 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 5297 * @buffer: The ring buffer to reset a per cpu buffer of 5298 * @cpu: The CPU buffer to be reset 5299 */ 5300 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 5301 { 5302 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5303 5304 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5305 return; 5306 5307 /* prevent another thread from changing buffer sizes */ 5308 mutex_lock(&buffer->mutex); 5309 5310 atomic_inc(&cpu_buffer->resize_disabled); 5311 atomic_inc(&cpu_buffer->record_disabled); 5312 5313 /* Make sure all commits have finished */ 5314 synchronize_rcu(); 5315 5316 reset_disabled_cpu_buffer(cpu_buffer); 5317 5318 atomic_dec(&cpu_buffer->record_disabled); 5319 atomic_dec(&cpu_buffer->resize_disabled); 5320 5321 mutex_unlock(&buffer->mutex); 5322 } 5323 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 5324 5325 /* Flag to ensure proper resetting of atomic variables */ 5326 
#define RESET_BIT (1 << 30) 5327 5328 /** 5329 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 5330 * @buffer: The ring buffer to reset a per cpu buffer of 5331 */ 5332 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 5333 { 5334 struct ring_buffer_per_cpu *cpu_buffer; 5335 int cpu; 5336 5337 /* prevent another thread from changing buffer sizes */ 5338 mutex_lock(&buffer->mutex); 5339 5340 for_each_online_buffer_cpu(buffer, cpu) { 5341 cpu_buffer = buffer->buffers[cpu]; 5342 5343 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 5344 atomic_inc(&cpu_buffer->record_disabled); 5345 } 5346 5347 /* Make sure all commits have finished */ 5348 synchronize_rcu(); 5349 5350 for_each_buffer_cpu(buffer, cpu) { 5351 cpu_buffer = buffer->buffers[cpu]; 5352 5353 /* 5354 * If a CPU came online during the synchronize_rcu(), then 5355 * ignore it. 5356 */ 5357 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 5358 continue; 5359 5360 reset_disabled_cpu_buffer(cpu_buffer); 5361 5362 atomic_dec(&cpu_buffer->record_disabled); 5363 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 5364 } 5365 5366 mutex_unlock(&buffer->mutex); 5367 } 5368 5369 /** 5370 * ring_buffer_reset - reset a ring buffer 5371 * @buffer: The ring buffer to reset all cpu buffers 5372 */ 5373 void ring_buffer_reset(struct trace_buffer *buffer) 5374 { 5375 struct ring_buffer_per_cpu *cpu_buffer; 5376 int cpu; 5377 5378 /* prevent another thread from changing buffer sizes */ 5379 mutex_lock(&buffer->mutex); 5380 5381 for_each_buffer_cpu(buffer, cpu) { 5382 cpu_buffer = buffer->buffers[cpu]; 5383 5384 atomic_inc(&cpu_buffer->resize_disabled); 5385 atomic_inc(&cpu_buffer->record_disabled); 5386 } 5387 5388 /* Make sure all commits have finished */ 5389 synchronize_rcu(); 5390 5391 for_each_buffer_cpu(buffer, cpu) { 5392 cpu_buffer = buffer->buffers[cpu]; 5393 5394 reset_disabled_cpu_buffer(cpu_buffer); 5395 5396 atomic_dec(&cpu_buffer->record_disabled); 5397 atomic_dec(&cpu_buffer->resize_disabled); 5398 } 5399 5400 mutex_unlock(&buffer->mutex); 5401 } 5402 EXPORT_SYMBOL_GPL(ring_buffer_reset); 5403 5404 /** 5405 * ring_buffer_empty - is the ring buffer empty? 5406 * @buffer: The ring buffer to test 5407 */ 5408 bool ring_buffer_empty(struct trace_buffer *buffer) 5409 { 5410 struct ring_buffer_per_cpu *cpu_buffer; 5411 unsigned long flags; 5412 bool dolock; 5413 bool ret; 5414 int cpu; 5415 5416 /* yes this is racy, but if you don't like the race, lock the buffer */ 5417 for_each_buffer_cpu(buffer, cpu) { 5418 cpu_buffer = buffer->buffers[cpu]; 5419 local_irq_save(flags); 5420 dolock = rb_reader_lock(cpu_buffer); 5421 ret = rb_per_cpu_empty(cpu_buffer); 5422 rb_reader_unlock(cpu_buffer, dolock); 5423 local_irq_restore(flags); 5424 5425 if (!ret) 5426 return false; 5427 } 5428 5429 return true; 5430 } 5431 EXPORT_SYMBOL_GPL(ring_buffer_empty); 5432 5433 /** 5434 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 
5435 * @buffer: The ring buffer 5436 * @cpu: The CPU buffer to test 5437 */ 5438 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 5439 { 5440 struct ring_buffer_per_cpu *cpu_buffer; 5441 unsigned long flags; 5442 bool dolock; 5443 bool ret; 5444 5445 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5446 return true; 5447 5448 cpu_buffer = buffer->buffers[cpu]; 5449 local_irq_save(flags); 5450 dolock = rb_reader_lock(cpu_buffer); 5451 ret = rb_per_cpu_empty(cpu_buffer); 5452 rb_reader_unlock(cpu_buffer, dolock); 5453 local_irq_restore(flags); 5454 5455 return ret; 5456 } 5457 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 5458 5459 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 5460 /** 5461 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 5462 * @buffer_a: One buffer to swap with 5463 * @buffer_b: The other buffer to swap with 5464 * @cpu: the CPU of the buffers to swap 5465 * 5466 * This function is useful for tracers that want to take a "snapshot" 5467 * of a CPU buffer and has another back up buffer lying around. 5468 * it is expected that the tracer handles the cpu buffer not being 5469 * used at the moment. 5470 */ 5471 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 5472 struct trace_buffer *buffer_b, int cpu) 5473 { 5474 struct ring_buffer_per_cpu *cpu_buffer_a; 5475 struct ring_buffer_per_cpu *cpu_buffer_b; 5476 int ret = -EINVAL; 5477 5478 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 5479 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 5480 goto out; 5481 5482 cpu_buffer_a = buffer_a->buffers[cpu]; 5483 cpu_buffer_b = buffer_b->buffers[cpu]; 5484 5485 /* At least make sure the two buffers are somewhat the same */ 5486 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 5487 goto out; 5488 5489 ret = -EAGAIN; 5490 5491 if (atomic_read(&buffer_a->record_disabled)) 5492 goto out; 5493 5494 if (atomic_read(&buffer_b->record_disabled)) 5495 goto out; 5496 5497 if (atomic_read(&cpu_buffer_a->record_disabled)) 5498 goto out; 5499 5500 if (atomic_read(&cpu_buffer_b->record_disabled)) 5501 goto out; 5502 5503 /* 5504 * We can't do a synchronize_rcu here because this 5505 * function can be called in atomic context. 5506 * Normally this will be called from the same CPU as cpu. 5507 * If not it's up to the caller to protect this. 5508 */ 5509 atomic_inc(&cpu_buffer_a->record_disabled); 5510 atomic_inc(&cpu_buffer_b->record_disabled); 5511 5512 ret = -EBUSY; 5513 if (local_read(&cpu_buffer_a->committing)) 5514 goto out_dec; 5515 if (local_read(&cpu_buffer_b->committing)) 5516 goto out_dec; 5517 5518 /* 5519 * When resize is in progress, we cannot swap it because 5520 * it will mess the state of the cpu buffer. 5521 */ 5522 if (atomic_read(&buffer_a->resizing)) 5523 goto out_dec; 5524 if (atomic_read(&buffer_b->resizing)) 5525 goto out_dec; 5526 5527 buffer_a->buffers[cpu] = cpu_buffer_b; 5528 buffer_b->buffers[cpu] = cpu_buffer_a; 5529 5530 cpu_buffer_b->buffer = buffer_a; 5531 cpu_buffer_a->buffer = buffer_b; 5532 5533 ret = 0; 5534 5535 out_dec: 5536 atomic_dec(&cpu_buffer_a->record_disabled); 5537 atomic_dec(&cpu_buffer_b->record_disabled); 5538 out: 5539 return ret; 5540 } 5541 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 5542 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 5543 5544 /** 5545 * ring_buffer_alloc_read_page - allocate a page to read from buffer 5546 * @buffer: the buffer to allocate for. 5547 * @cpu: the cpu buffer to allocate. 5548 * 5549 * This function is used in conjunction with ring_buffer_read_page. 
5550 * When reading a full page from the ring buffer, these functions 5551 * can be used to speed up the process. The calling function should 5552 * allocate a few pages first with this function. Then when it 5553 * needs to get pages from the ring buffer, it passes the result 5554 * of this function into ring_buffer_read_page, which will swap 5555 * the page that was allocated, with the read page of the buffer. 5556 * 5557 * Returns: 5558 * The page allocated, or ERR_PTR 5559 */ 5560 void *ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 5561 { 5562 struct ring_buffer_per_cpu *cpu_buffer; 5563 struct buffer_data_page *bpage = NULL; 5564 unsigned long flags; 5565 struct page *page; 5566 5567 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5568 return ERR_PTR(-ENODEV); 5569 5570 cpu_buffer = buffer->buffers[cpu]; 5571 local_irq_save(flags); 5572 arch_spin_lock(&cpu_buffer->lock); 5573 5574 if (cpu_buffer->free_page) { 5575 bpage = cpu_buffer->free_page; 5576 cpu_buffer->free_page = NULL; 5577 } 5578 5579 arch_spin_unlock(&cpu_buffer->lock); 5580 local_irq_restore(flags); 5581 5582 if (bpage) 5583 goto out; 5584 5585 page = alloc_pages_node(cpu_to_node(cpu), 5586 GFP_KERNEL | __GFP_NORETRY, 0); 5587 if (!page) 5588 return ERR_PTR(-ENOMEM); 5589 5590 bpage = page_address(page); 5591 5592 out: 5593 rb_init_page(bpage); 5594 5595 return bpage; 5596 } 5597 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 5598 5599 /** 5600 * ring_buffer_free_read_page - free an allocated read page 5601 * @buffer: the buffer the page was allocate for 5602 * @cpu: the cpu buffer the page came from 5603 * @data: the page to free 5604 * 5605 * Free a page allocated from ring_buffer_alloc_read_page. 5606 */ 5607 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, void *data) 5608 { 5609 struct ring_buffer_per_cpu *cpu_buffer; 5610 struct buffer_data_page *bpage = data; 5611 struct page *page = virt_to_page(bpage); 5612 unsigned long flags; 5613 5614 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 5615 return; 5616 5617 cpu_buffer = buffer->buffers[cpu]; 5618 5619 /* If the page is still in use someplace else, we can't reuse it */ 5620 if (page_ref_count(page) > 1) 5621 goto out; 5622 5623 local_irq_save(flags); 5624 arch_spin_lock(&cpu_buffer->lock); 5625 5626 if (!cpu_buffer->free_page) { 5627 cpu_buffer->free_page = bpage; 5628 bpage = NULL; 5629 } 5630 5631 arch_spin_unlock(&cpu_buffer->lock); 5632 local_irq_restore(flags); 5633 5634 out: 5635 free_page((unsigned long)bpage); 5636 } 5637 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 5638 5639 /** 5640 * ring_buffer_read_page - extract a page from the ring buffer 5641 * @buffer: buffer to extract from 5642 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 5643 * @len: amount to extract 5644 * @cpu: the cpu of the buffer to extract 5645 * @full: should the extraction only happen when the page is full. 5646 * 5647 * This function will pull out a page from the ring buffer and consume it. 5648 * @data_page must be the address of the variable that was returned 5649 * from ring_buffer_alloc_read_page. This is because the page might be used 5650 * to swap with a page in the ring buffer. 
5651 * 5652 * for example: 5653 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 5654 * if (IS_ERR(rpage)) 5655 * return PTR_ERR(rpage); 5656 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); 5657 * if (ret >= 0) 5658 * process_page(rpage, ret); 5659 * 5660 * When @full is set, the function will not return true unless 5661 * the writer is off the reader page. 5662 * 5663 * Note: it is up to the calling functions to handle sleeps and wakeups. 5664 * The ring buffer can be used anywhere in the kernel and can not 5665 * blindly call wake_up. The layer that uses the ring buffer must be 5666 * responsible for that. 5667 * 5668 * Returns: 5669 * >=0 if data has been transferred, returns the offset of consumed data. 5670 * <0 if no data has been transferred. 5671 */ 5672 int ring_buffer_read_page(struct trace_buffer *buffer, 5673 void **data_page, size_t len, int cpu, int full) 5674 { 5675 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5676 struct ring_buffer_event *event; 5677 struct buffer_data_page *bpage; 5678 struct buffer_page *reader; 5679 unsigned long missed_events; 5680 unsigned long flags; 5681 unsigned int commit; 5682 unsigned int read; 5683 u64 save_timestamp; 5684 int ret = -1; 5685 5686 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5687 goto out; 5688 5689 /* 5690 * If len is not big enough to hold the page header, then 5691 * we can not copy anything. 5692 */ 5693 if (len <= BUF_PAGE_HDR_SIZE) 5694 goto out; 5695 5696 len -= BUF_PAGE_HDR_SIZE; 5697 5698 if (!data_page) 5699 goto out; 5700 5701 bpage = *data_page; 5702 if (!bpage) 5703 goto out; 5704 5705 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5706 5707 reader = rb_get_reader_page(cpu_buffer); 5708 if (!reader) 5709 goto out_unlock; 5710 5711 event = rb_reader_event(cpu_buffer); 5712 5713 read = reader->read; 5714 commit = rb_page_commit(reader); 5715 5716 /* Check if any events were dropped */ 5717 missed_events = cpu_buffer->lost_events; 5718 5719 /* 5720 * If this page has been partially read or 5721 * if len is not big enough to read the rest of the page or 5722 * a writer is still on the page, then 5723 * we must copy the data from the page to the buffer. 5724 * Otherwise, we can simply swap the page with the one passed in. 5725 */ 5726 if (read || (len < (commit - read)) || 5727 cpu_buffer->reader_page == cpu_buffer->commit_page) { 5728 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 5729 unsigned int rpos = read; 5730 unsigned int pos = 0; 5731 unsigned int size; 5732 5733 /* 5734 * If a full page is expected, this can still be returned 5735 * if there's been a previous partial read and the 5736 * rest of the page can be read and the commit page is off 5737 * the reader page. 5738 */ 5739 if (full && 5740 (!read || (len < (commit - read)) || 5741 cpu_buffer->reader_page == cpu_buffer->commit_page)) 5742 goto out_unlock; 5743 5744 if (len > (commit - read)) 5745 len = (commit - read); 5746 5747 /* Always keep the time extend and data together */ 5748 size = rb_event_ts_length(event); 5749 5750 if (len < size) 5751 goto out_unlock; 5752 5753 /* save the current timestamp, since the user will need it */ 5754 save_timestamp = cpu_buffer->read_stamp; 5755 5756 /* Need to copy one event at a time */ 5757 do { 5758 /* We need the size of one event, because 5759 * rb_advance_reader only advances by one event, 5760 * whereas rb_event_ts_length may include the size of 5761 * one or two events. 
5762 * We have already ensured there's enough space if this 5763 * is a time extend. */ 5764 size = rb_event_length(event); 5765 memcpy(bpage->data + pos, rpage->data + rpos, size); 5766 5767 len -= size; 5768 5769 rb_advance_reader(cpu_buffer); 5770 rpos = reader->read; 5771 pos += size; 5772 5773 if (rpos >= commit) 5774 break; 5775 5776 event = rb_reader_event(cpu_buffer); 5777 /* Always keep the time extend and data together */ 5778 size = rb_event_ts_length(event); 5779 } while (len >= size); 5780 5781 /* update bpage */ 5782 local_set(&bpage->commit, pos); 5783 bpage->time_stamp = save_timestamp; 5784 5785 /* we copied everything to the beginning */ 5786 read = 0; 5787 } else { 5788 /* update the entry counter */ 5789 cpu_buffer->read += rb_page_entries(reader); 5790 cpu_buffer->read_bytes += rb_page_commit(reader); 5791 5792 /* swap the pages */ 5793 rb_init_page(bpage); 5794 bpage = reader->page; 5795 reader->page = *data_page; 5796 local_set(&reader->write, 0); 5797 local_set(&reader->entries, 0); 5798 reader->read = 0; 5799 *data_page = bpage; 5800 5801 /* 5802 * Use the real_end for the data size, 5803 * This gives us a chance to store the lost events 5804 * on the page. 5805 */ 5806 if (reader->real_end) 5807 local_set(&bpage->commit, reader->real_end); 5808 } 5809 ret = read; 5810 5811 cpu_buffer->lost_events = 0; 5812 5813 commit = local_read(&bpage->commit); 5814 /* 5815 * Set a flag in the commit field if we lost events 5816 */ 5817 if (missed_events) { 5818 /* If there is room at the end of the page to save the 5819 * missed events, then record it there. 5820 */ 5821 if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) { 5822 memcpy(&bpage->data[commit], &missed_events, 5823 sizeof(missed_events)); 5824 local_add(RB_MISSED_STORED, &bpage->commit); 5825 commit += sizeof(missed_events); 5826 } 5827 local_add(RB_MISSED_EVENTS, &bpage->commit); 5828 } 5829 5830 /* 5831 * This page may be off to user land. Zero it out here. 5832 */ 5833 if (commit < BUF_PAGE_SIZE) 5834 memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit); 5835 5836 out_unlock: 5837 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5838 5839 out: 5840 return ret; 5841 } 5842 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 5843 5844 /* 5845 * We only allocate new buffers, never free them if the CPU goes down. 5846 * If we were to free the buffer, then the user would lose any trace that was in 5847 * the buffer. 
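 *
 * This is the CPU hotplug "prepare" callback for ring buffers; the tracing
 * core is expected to register it along these lines (a sketch, not a
 * verbatim quote of that code):
 *
 *	cpuhp_setup_state_multi(CPUHP_TRACE_RB_PREPARE, "trace/RB:prepare",
 *				trace_rb_cpu_prepare, NULL);
 *
 * with each ring buffer added as an instance through its @node member.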

/*
 * We only allocate new buffers, never free them if the CPU goes down.
 * If we were to free the buffer, then the user would lose any trace that was in
 * the buffer.
 */
int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
{
	struct trace_buffer *buffer;
	long nr_pages_same;
	int cpu_i;
	unsigned long nr_pages;

	buffer = container_of(node, struct trace_buffer, node);
	if (cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	nr_pages = 0;
	nr_pages_same = 1;
	/* check if all cpu sizes are the same */
	for_each_buffer_cpu(buffer, cpu_i) {
		/* fill in the size from the first enabled cpu */
		if (nr_pages == 0)
			nr_pages = buffer->buffers[cpu_i]->nr_pages;
		if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
			nr_pages_same = 0;
			break;
		}
	}
	/* allocate minimum pages, user can later expand it */
	if (!nr_pages_same)
		nr_pages = 2;
	buffer->buffers[cpu] =
		rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
	if (!buffer->buffers[cpu]) {
		WARN(1, "failed to allocate ring buffer on CPU %u\n",
		     cpu);
		return -ENOMEM;
	}
	smp_wmb();
	cpumask_set_cpu(cpu, buffer->cpumask);
	return 0;
}
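
/*
 * For reference, a rough sketch of how this callback is expected to be
 * wired up.  The actual registration lives outside this file, so the
 * calls below are an illustrative sketch (the hotplug state and helpers
 * are real, the surrounding code is not quoted verbatim):
 *
 *	ret = cpuhp_setup_state_multi(CPUHP_TRACE_RB_PREPARE,
 *				      "trace/RB:prepare",
 *				      trace_rb_cpu_prepare, NULL);
 *
 * and each ring buffer then adds itself as an instance, roughly:
 *
 *	cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
 *
 * so trace_rb_cpu_prepare() runs for every registered buffer whenever a
 * new CPU comes online.  No teardown callback is registered, which
 * matches the "never free" policy described above.
 */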

#ifdef CONFIG_RING_BUFFER_STARTUP_TEST
/*
 * This is a basic integrity check of the ring buffer.
 * Late in the boot cycle this test will run when configured in.
 * It will kick off a thread per CPU that will go into a loop
 * writing to the per cpu ring buffer various sizes of data.
 * Some of the data will be large items, some small.
 *
 * Another thread is created that goes into a spin, sending out
 * IPIs to the other CPUs to also write into the ring buffer.
 * This is to test the nesting ability of the buffer.
 *
 * Basic stats are recorded and reported. If something in the
 * ring buffer should happen that's not expected, a big warning
 * is displayed and all ring buffers are disabled.
 */
static struct task_struct *rb_threads[NR_CPUS] __initdata;

struct rb_test_data {
	struct trace_buffer *buffer;
	unsigned long events;
	unsigned long bytes_written;
	unsigned long bytes_alloc;
	unsigned long bytes_dropped;
	unsigned long events_nested;
	unsigned long bytes_written_nested;
	unsigned long bytes_alloc_nested;
	unsigned long bytes_dropped_nested;
	int min_size_nested;
	int max_size_nested;
	int max_size;
	int min_size;
	int cpu;
	int cnt;
};

static struct rb_test_data rb_data[NR_CPUS] __initdata;

/* 1 meg per cpu */
#define RB_TEST_BUFFER_SIZE	1048576

static char rb_string[] __initdata =
	"abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
	"?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
	"!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";

static bool rb_test_started __initdata;

struct rb_item {
	int size;
	char str[];
};

static __init int rb_write_something(struct rb_test_data *data, bool nested)
{
	struct ring_buffer_event *event;
	struct rb_item *item;
	bool started;
	int event_len;
	int size;
	int len;
	int cnt;

	/* Have nested writes different than what is written */
	cnt = data->cnt + (nested ? 27 : 0);

	/* Multiply cnt by ~e, to make some unique increment */
	size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);

	len = size + sizeof(struct rb_item);

	started = rb_test_started;
	/* read rb_test_started before checking buffer enabled */
	smp_rmb();

	event = ring_buffer_lock_reserve(data->buffer, len);
	if (!event) {
		/* Ignore dropped events before test starts. */
		if (started) {
			if (nested)
				data->bytes_dropped_nested += len;
			else
				data->bytes_dropped += len;
		}
		return len;
	}

	event_len = ring_buffer_event_length(event);

	if (RB_WARN_ON(data->buffer, event_len < len))
		goto out;

	item = ring_buffer_event_data(event);
	item->size = size;
	memcpy(item->str, rb_string, size);

	if (nested) {
		data->bytes_alloc_nested += event_len;
		data->bytes_written_nested += len;
		data->events_nested++;
		if (!data->min_size_nested || len < data->min_size_nested)
			data->min_size_nested = len;
		if (len > data->max_size_nested)
			data->max_size_nested = len;
	} else {
		data->bytes_alloc += event_len;
		data->bytes_written += len;
		data->events++;
		if (!data->min_size || len < data->min_size)
			data->min_size = len;
		if (len > data->max_size)
			data->max_size = len;
	}

 out:
	ring_buffer_unlock_commit(data->buffer);

	return 0;
}
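
/*
 * A quick worked example of the sizing in rb_write_something() (purely
 * illustrative; exact numbers depend on the architecture's sizeof(int)).
 * With cnt == 10, and assuming sizeof(struct rb_item) is 4 bytes (the
 * flexible array member adds nothing):
 *
 *	size = (10 * 68 / 25) % (sizeof(rb_string) - 1) = 27
 *	len  = 27 + 4 = 31
 *
 * so 31 bytes are requested from ring_buffer_lock_reserve().  The ring
 * buffer may round the reservation up for alignment, which is why
 * event_len is read back with ring_buffer_event_length() and only
 * checked to be at least len rather than exactly equal to it.
 */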

static __init int rb_test(void *arg)
{
	struct rb_test_data *data = arg;

	while (!kthread_should_stop()) {
		rb_write_something(data, false);
		data->cnt++;

		set_current_state(TASK_INTERRUPTIBLE);
		/* Now sleep between a min of 100-300us and a max of 1ms */
		usleep_range(((data->cnt % 3) + 1) * 100, 1000);
	}

	return 0;
}

static __init void rb_ipi(void *ignore)
{
	struct rb_test_data *data;
	int cpu = smp_processor_id();

	data = &rb_data[cpu];
	rb_write_something(data, true);
}

static __init int rb_hammer_test(void *arg)
{
	while (!kthread_should_stop()) {

		/* Send an IPI to all cpus to write data! */
		smp_call_function(rb_ipi, NULL, 1);
		/* No sleep, but for non-preempt, let others run */
		schedule();
	}

	return 0;
}

static __init int test_ringbuffer(void)
{
	struct task_struct *rb_hammer;
	struct trace_buffer *buffer;
	int cpu;
	int ret = 0;

	if (security_locked_down(LOCKDOWN_TRACEFS)) {
		pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
		return 0;
	}

	pr_info("Running ring buffer tests...\n");

	buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
	if (WARN_ON(!buffer))
		return 0;

	/* Disable the buffer so that the threads can't write to it yet */
	ring_buffer_record_off(buffer);

	for_each_online_cpu(cpu) {
		rb_data[cpu].buffer = buffer;
		rb_data[cpu].cpu = cpu;
		rb_data[cpu].cnt = cpu;
		rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
						     cpu, "rbtester/%u");
		if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
			pr_cont("FAILED\n");
			ret = PTR_ERR(rb_threads[cpu]);
			goto out_free;
		}
	}

	/* Now create the rb hammer! */
	rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
	if (WARN_ON(IS_ERR(rb_hammer))) {
		pr_cont("FAILED\n");
		ret = PTR_ERR(rb_hammer);
		goto out_free;
	}

	ring_buffer_record_on(buffer);
	/*
	 * Show the buffer is enabled before setting rb_test_started.
	 * Yes, there's a small race window where events could be
	 * dropped and the thread won't catch it. But when a ring
	 * buffer gets enabled, there will always be some kind of
	 * delay before other CPUs see it. Thus, we don't care about
	 * those dropped events. We care about events dropped after
	 * the threads see that the buffer is active.
	 */
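	/*
	 * Pairs with the smp_rmb() in rb_write_something(): a writer that
	 * sees rb_test_started set will also see the buffer as enabled, so
	 * only genuine post-start drops get counted.
	 */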
	smp_wmb();
	rb_test_started = true;

	set_current_state(TASK_INTERRUPTIBLE);
	/* Just run for 10 seconds */
	schedule_timeout(10 * HZ);

	kthread_stop(rb_hammer);

 out_free:
	for_each_online_cpu(cpu) {
		if (!rb_threads[cpu])
			break;
		kthread_stop(rb_threads[cpu]);
	}
	if (ret) {
		ring_buffer_free(buffer);
		return ret;
	}

	/* Report! */
	pr_info("finished\n");
	for_each_online_cpu(cpu) {
		struct ring_buffer_event *event;
		struct rb_test_data *data = &rb_data[cpu];
		struct rb_item *item;
		unsigned long total_events;
		unsigned long total_dropped;
		unsigned long total_written;
		unsigned long total_alloc;
		unsigned long total_read = 0;
		unsigned long total_size = 0;
		unsigned long total_len = 0;
		unsigned long total_lost = 0;
		unsigned long lost;
		int big_event_size;
		int small_event_size;

		ret = -1;

		total_events = data->events + data->events_nested;
		total_written = data->bytes_written + data->bytes_written_nested;
		total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
		total_dropped = data->bytes_dropped + data->bytes_dropped_nested;

		big_event_size = data->max_size + data->max_size_nested;
		small_event_size = data->min_size + data->min_size_nested;

		pr_info("CPU %d:\n", cpu);
		pr_info(" events: %ld\n", total_events);
		pr_info(" dropped bytes: %ld\n", total_dropped);
		pr_info(" alloced bytes: %ld\n", total_alloc);
		pr_info(" written bytes: %ld\n", total_written);
		pr_info(" biggest event: %d\n", big_event_size);
		pr_info(" smallest event: %d\n", small_event_size);

		if (RB_WARN_ON(buffer, total_dropped))
			break;

		ret = 0;

		while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
			total_lost += lost;
			item = ring_buffer_event_data(event);
			total_len += ring_buffer_event_length(event);
			total_size += item->size + sizeof(struct rb_item);
			if (memcmp(&item->str[0], rb_string, item->size) != 0) {
				pr_info("FAILED!\n");
				pr_info("buffer had: %.*s\n", item->size, item->str);
				pr_info("expected: %.*s\n", item->size, rb_string);
				RB_WARN_ON(buffer, 1);
				ret = -1;
				break;
			}
			total_read++;
		}
		if (ret)
			break;

		ret = -1;

		pr_info(" read events: %ld\n", total_read);
		pr_info(" lost events: %ld\n", total_lost);
		pr_info(" total events: %ld\n", total_lost + total_read);
		pr_info(" recorded len bytes: %ld\n", total_len);
		pr_info(" recorded size bytes: %ld\n", total_size);
		if (total_lost) {
			pr_info(" With dropped events, record len and size may not match\n"
				" alloced and written from above\n");
		} else {
			if (RB_WARN_ON(buffer, total_len != total_alloc ||
				       total_size != total_written))
				break;
		}
		if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
			break;

		ret = 0;
	}
	if (!ret)
		pr_info("Ring buffer PASSED!\n");

	ring_buffer_free(buffer);
	return 0;
}

late_initcall(test_ringbuffer);
#endif /* CONFIG_RING_BUFFER_STARTUP_TEST */