// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer_types.h>
#include <linux/sched/isolation.h>
#include <linux/trace_recursion.h>
#include <linux/panic_notifier.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/cacheflush.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>
#include <linux/mm.h>

#include <asm/ring_buffer.h>
#include <asm/local64.h>
#include <asm/local.h>
#include <asm/setup.h>

#include "trace.h"

/*
 * The "absolute" timestamp in the buffer is only 59 bits.
 * If a clock has the 5 MSBs set, it needs to be saved and
 * reinserted.
 *
 * TS_MSB selects those top 5 bits (0xf8 shifted into bits 59-63) and
 * ABS_TS_MASK selects the remaining low 59 bits.
 */
#define TS_MSB		(0xf8ULL << 56)
#define ABS_TS_MASK	(~TS_MSB)

/* Forward declaration; the handler is defined later in this file */
static void update_pages_handler(struct work_struct *work);

/*
 * NOTE(review): presumably the value expected in ring_buffer_meta::magic;
 * confirm against the code that validates the meta data.
 */
#define RING_BUFFER_META_MAGIC	0xBADFEED

/*
 * Descriptor for a ring buffer meta data area.
 * NOTE(review): the field meanings below are inferred from their names
 * only; confirm against the code that fills them in.
 */
struct ring_buffer_meta {
	int		magic;		/* presumably RING_BUFFER_META_MAGIC */
	int		struct_sizes;
	unsigned long	total_size;
	unsigned long	buffers_offset;
};

/*
 * Per CPU meta data. The trailing flexible array must stay last, and the
 * two CONFIG_RING_BUFFER_PERSISTENT_INJECT fields change the layout, so
 * do not reorder the members.
 */
struct ring_buffer_cpu_meta {
	unsigned long	first_buffer;
	unsigned long	head_buffer;
	unsigned long	commit_buffer;
	__u32		subbuf_size;
	__u32		nr_subbufs;
#ifdef CONFIG_RING_BUFFER_PERSISTENT_INJECT
	__u32		nr_invalid;
	__u32		entry_bytes;
#endif
	int		buffers[];
};

/*
 * The ring buffer header is special. We must manually keep it up to date.
76 */ 77 int ring_buffer_print_entry_header(struct trace_seq *s) 78 { 79 trace_seq_puts(s, "# compressed entry header\n"); 80 trace_seq_puts(s, "\ttype_len : 5 bits\n"); 81 trace_seq_puts(s, "\ttime_delta : 27 bits\n"); 82 trace_seq_puts(s, "\tarray : 32 bits\n"); 83 trace_seq_putc(s, '\n'); 84 trace_seq_printf(s, "\tpadding : type == %d\n", 85 RINGBUF_TYPE_PADDING); 86 trace_seq_printf(s, "\ttime_extend : type == %d\n", 87 RINGBUF_TYPE_TIME_EXTEND); 88 trace_seq_printf(s, "\ttime_stamp : type == %d\n", 89 RINGBUF_TYPE_TIME_STAMP); 90 trace_seq_printf(s, "\tdata max type_len == %d\n", 91 RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 92 93 return !trace_seq_has_overflowed(s); 94 } 95 96 /* 97 * The ring buffer is made up of a list of pages. A separate list of pages is 98 * allocated for each CPU. A writer may only write to a buffer that is 99 * associated with the CPU it is currently executing on. A reader may read 100 * from any per cpu buffer. 101 * 102 * The reader is special. For each per cpu buffer, the reader has its own 103 * reader page. When a reader has read the entire reader page, this reader 104 * page is swapped with another page in the ring buffer. 105 * 106 * Now, as long as the writer is off the reader page, the reader can do what 107 * ever it wants with that page. The writer will never write to that page 108 * again (as long as it is out of the ring buffer). 109 * 110 * Here's some silly ASCII art. 
111 * 112 * +------+ 113 * |reader| RING BUFFER 114 * |page | 115 * +------+ +---+ +---+ +---+ 116 * | |-->| |-->| | 117 * +---+ +---+ +---+ 118 * ^ | 119 * | | 120 * +---------------+ 121 * 122 * 123 * +------+ 124 * |reader| RING BUFFER 125 * |page |------------------v 126 * +------+ +---+ +---+ +---+ 127 * | |-->| |-->| | 128 * +---+ +---+ +---+ 129 * ^ | 130 * | | 131 * +---------------+ 132 * 133 * 134 * +------+ 135 * |reader| RING BUFFER 136 * |page |------------------v 137 * +------+ +---+ +---+ +---+ 138 * ^ | |-->| |-->| | 139 * | +---+ +---+ +---+ 140 * | | 141 * | | 142 * +------------------------------+ 143 * 144 * 145 * +------+ 146 * |buffer| RING BUFFER 147 * |page |------------------v 148 * +------+ +---+ +---+ +---+ 149 * ^ | | | |-->| | 150 * | New +---+ +---+ +---+ 151 * | Reader------^ | 152 * | page | 153 * +------------------------------+ 154 * 155 * 156 * After we make this swap, the reader can hand this page off to the splice 157 * code and be done with it. It can even allocate a new page if it needs to 158 * and swap that into the ring buffer. 159 * 160 * We will be using cmpxchg soon to make all this lockless. 161 * 162 */ 163 164 /* Used for individual buffers (after the counter) */ 165 #define RB_BUFFER_OFF (1 << 20) 166 167 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ 168 #define RINGBUF_TYPE_DATA 0 ... 
RINGBUF_TYPE_DATA_TYPE_LEN_MAX 169 170 enum { 171 RB_LEN_TIME_EXTEND = 8, 172 RB_LEN_TIME_STAMP = 8, 173 }; 174 175 #define skip_time_extend(event) \ 176 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND)) 177 178 #define extended_time(event) \ 179 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND) 180 181 static inline bool rb_null_event(struct ring_buffer_event *event) 182 { 183 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta; 184 } 185 186 static void rb_event_set_padding(struct ring_buffer_event *event) 187 { 188 /* padding has a NULL time_delta */ 189 event->type_len = RINGBUF_TYPE_PADDING; 190 event->time_delta = 0; 191 } 192 193 static unsigned 194 rb_event_data_length(struct ring_buffer_event *event) 195 { 196 unsigned length; 197 198 if (event->type_len) 199 length = event->type_len * RB_ALIGNMENT; 200 else 201 length = event->array[0]; 202 return length + RB_EVNT_HDR_SIZE; 203 } 204 205 /* 206 * Return the length of the given event. Will return 207 * the length of the time extend if the event is a 208 * time extend. 209 */ 210 static inline unsigned 211 rb_event_length(struct ring_buffer_event *event) 212 { 213 switch (event->type_len) { 214 case RINGBUF_TYPE_PADDING: 215 if (rb_null_event(event)) 216 /* undefined */ 217 return -1; 218 return event->array[0] + RB_EVNT_HDR_SIZE; 219 220 case RINGBUF_TYPE_TIME_EXTEND: 221 return RB_LEN_TIME_EXTEND; 222 223 case RINGBUF_TYPE_TIME_STAMP: 224 return RB_LEN_TIME_STAMP; 225 226 case RINGBUF_TYPE_DATA: 227 return rb_event_data_length(event); 228 default: 229 WARN_ON_ONCE(1); 230 } 231 /* not hit */ 232 return 0; 233 } 234 235 /* 236 * Return total length of time extend and data, 237 * or just the event length for all other events. 
238 */ 239 static inline unsigned 240 rb_event_ts_length(struct ring_buffer_event *event) 241 { 242 unsigned len = 0; 243 244 if (extended_time(event)) { 245 /* time extends include the data event after it */ 246 len = RB_LEN_TIME_EXTEND; 247 event = skip_time_extend(event); 248 } 249 return len + rb_event_length(event); 250 } 251 252 /** 253 * ring_buffer_event_length - return the length of the event 254 * @event: the event to get the length of 255 * 256 * Returns the size of the data load of a data event. 257 * If the event is something other than a data event, it 258 * returns the size of the event itself. With the exception 259 * of a TIME EXTEND, where it still returns the size of the 260 * data load of the data event after it. 261 */ 262 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 263 { 264 unsigned length; 265 266 if (extended_time(event)) 267 event = skip_time_extend(event); 268 269 length = rb_event_length(event); 270 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 271 return length; 272 length -= RB_EVNT_HDR_SIZE; 273 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 274 length -= sizeof(event->array[0]); 275 return length; 276 } 277 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 278 279 /* inline for ring buffer fast paths */ 280 static __always_inline void * 281 rb_event_data(struct ring_buffer_event *event) 282 { 283 if (extended_time(event)) 284 event = skip_time_extend(event); 285 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 286 /* If length is in len field, then array[0] has the data */ 287 if (event->type_len) 288 return (void *)&event->array[0]; 289 /* Otherwise length is in array[0] and array[1] has the data */ 290 return (void *)&event->array[1]; 291 } 292 293 /** 294 * ring_buffer_event_data - return the data of the event 295 * @event: the event to get the data from 296 */ 297 void *ring_buffer_event_data(struct ring_buffer_event *event) 298 { 299 return rb_event_data(event); 300 } 301 
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

/* Iterate @cpu over every CPU set in @buffer's cpumask */
#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

/* As above, but restricted to CPUs that are also in cpu_online_mask */
#define for_each_online_buffer_cpu(buffer, cpu)		\
	for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)

/*
 * Rebuild the time value carried by a time extend / time stamp event:
 * array[0] supplies the upper bits (shifted up by TS_SHIFT) and
 * time_delta supplies the lower bits.
 */
static u64 rb_event_time_stamp(struct ring_buffer_event *event)
{
	u64 ts;

	ts = event->array[0];
	ts <<= TS_SHIFT;
	ts += event->time_delta;

	return ts;
}

/*
 * NOTE(review): the three masks below are plain (signed) int expressions
 * that reach into the sign bit. How they widen when combined with a long
 * depends on that, so do not change their type without auditing every
 * user.
 */
/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

/* Both of the above flag bits */
#define RB_MISSED_MASK		(3 << 30)

struct buffer_data_read_page {
	unsigned		order;	/* order of the page */
	struct buffer_data_page	*data;	/* actual data, stored in this page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	unsigned	 order;		/* order of the page */
	u32		 id:30;		/* ID for external mapping */
	u32		 range:1;	/* Mapped via a range */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters.
 * Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

/* Reset a data page header: no committed data and a zero time stamp */
static void rb_init_data_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
	bpage->time_stamp = 0;
}

/* Raw commit word of a data page, flag bits included */
static __always_inline long rb_data_page_commit(struct buffer_data_page *dpage)
{
	return local_read(&dpage->commit);
}

/* Commit word of a data page with the RB_MISSED_MASK flag bits cleared */
static __always_inline long rb_data_page_size(struct buffer_data_page *dpage)
{
	return rb_data_page_commit(dpage) & ~RB_MISSED_MASK;
}

/*
 * Same as rb_data_page_commit() for the data page behind @bpage. Note
 * that the long result is narrowed to unsigned int here.
 */
static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
{
	return rb_data_page_commit(bpage->page);
}

/* Same as rb_data_page_size() for the data page behind @bpage */
static __always_inline unsigned int rb_page_size(struct buffer_page *bpage)
{
	return rb_data_page_size(bpage->page);
}

/*
 * Release a buffer_page descriptor. Its data page is only handed back to
 * the page allocator when it did not come from a mapped range.
 */
static void free_buffer_page(struct buffer_page *bpage)
{
	/* Range pages are not to be freed */
	if (!bpage->range)
		free_pages((unsigned long)bpage->page, bpage->order);
	kfree(bpage);
}

/*
 * For best performance, allocate cpu buffer data cache line sized
 * and per CPU.
 */
/*
 * NOTE(review): both macros below expand to an unparenthesized cast
 * expression that already ends in ';'. They are therefore only safe as
 * the complete right hand side of an assignment or initializer, and
 * every use produces an empty statement after the caller's own ';'.
 */
#define alloc_cpu_buffer(cpu) (struct ring_buffer_per_cpu *)		\
	kzalloc_node(ALIGN(sizeof(struct ring_buffer_per_cpu),		\
		     cache_line_size()), GFP_KERNEL, cpu_to_node(cpu));

#define alloc_cpu_page(cpu) (struct buffer_page *)			\
	kzalloc_node(ALIGN(sizeof(struct buffer_page),			\
		     cache_line_size()), GFP_KERNEL, cpu_to_node(cpu));

/*
 * Allocate a zeroed data page of the given @order on the memory node of
 * @cpu and initialize its header.
 *
 * Returns the new data page, or NULL if the allocation failed.
 */
static struct buffer_data_page *alloc_cpu_data(int cpu, int order)
{
	struct buffer_data_page *dpage;
	struct page *page;
	gfp_t mflags;

	/*
	 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
	 * gracefully without invoking oom-killer and the system is not
	 * destabilized.
	 */
	mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL | __GFP_COMP | __GFP_ZERO;

	page = alloc_pages_node(cpu_to_node(cpu), mflags, order);
	if (!page)
		return NULL;

	dpage = page_address(page);
	rb_init_data_page(dpage);

	return dpage;
}

/*
 * Wake up state: one irq_work plus two wait queues, one for tasks waiting
 * for any data (waiters) and one for tasks waiting for a fill level
 * (full_waiters). The *_pending flags are set by the waiting side; seq is
 * incremented each time the irq_work handler runs.
 */
struct rb_irq_work {
	struct irq_work			work;
	wait_queue_head_t		waiters;
	wait_queue_head_t		full_waiters;
	atomic_t			seq;
	bool				waiters_pending;
	bool				full_waiters_pending;
	bool				wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	u64			before;
	u64			after;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;	/* RB_ADD_STAMP_* bits */
};

/*
 * Used for the add_timestamp
 *  NONE
 *  EXTEND - wants a time extend
 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
 *  FORCE - force a full time stamp.
 */
enum {
	RB_ADD_STAMP_NONE		= 0,
	RB_ADD_STAMP_EXTEND		= BIT(1),
	RB_ADD_STAMP_ABSOLUTE		= BIT(2),
	RB_ADD_STAMP_FORCE		= BIT(3)
};
/*
 * Used for which event context the event is in.
 *  TRANSITION = 0
 *  NMI     = 1
 *  IRQ     = 2
 *  SOFTIRQ = 3
 *  NORMAL  = 4
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_TRANSITION,
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};

/* Time stamp storage, accessed through rb_time_read() and rb_time_set() */
struct rb_time_struct {
	local64_t	time;
};
typedef struct rb_time_struct rb_time_t;

/* Number of event_stamp[] slots, one per nesting level */
#define MAX_NEST	5

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	struct trace_buffer	*buffer;
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct buffer_data_page		*free_page;
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	/* pages generation counter, incremented when the list changes */
	unsigned long			cnt;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	unsigned long			nest;
	local_t				entries_bytes;
	local_t				entries;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	/* read as the nesting level by ring_buffer_event_time_stamp() */
	local_t				committing;
	local_t				commits;
	/* the three counters below feed ring_buffer_nr_dirty_pages() */
	local_t				pages_touched;
	local_t				pages_lost;
	local_t				pages_read;
	long				last_pages_touch;
	/* smallest "full" percentage any waiter asked for, 0 when unset */
	size_t				shortest_full;
	unsigned long			read;
	unsigned long			read_bytes;
	rb_time_t			write_stamp;
	rb_time_t			before_stamp;
	/* saved time stamps, indexed by nesting level */
	u64				event_stamp[MAX_NEST];
	u64				read_stamp;
	/* pages removed since last reset */
	unsigned long			pages_removed;

	unsigned int			mapped;
	unsigned int			user_mapped;	/* user space mapping */
	struct mutex			mapping_lock;
	struct buffer_page		**subbuf_ids;	/* ID to subbuf VA */
	struct trace_buffer_meta	*meta_page;
	struct ring_buffer_cpu_meta	*ring_meta;

	struct ring_buffer_remote	*remote;

	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages; /* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	/* wake up state for waiters on this CPU buffer */
	struct rb_irq_work		irq_work;
};

/* The ring buffer as a whole: shared state plus one buffer per CPU */
struct trace_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	atomic_t			resizing;
	/* CPUs that have a buffer, see for_each_buffer_cpu() */
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	/* per CPU buffers, indexed by CPU number */
	struct ring_buffer_per_cpu	**buffers;

	struct ring_buffer_remote	*remote;

	struct hlist_node		node;
	/* clock used by rb_time_stamp() to stamp events */
	u64				(*clock)(void);

	/* wake up state for RING_BUFFER_ALL_CPUS waiters */
	struct rb_irq_work		irq_work;
	bool				time_stamp_abs;

	unsigned long			range_addr_start;
	unsigned long			range_addr_end;
	struct notifier_block		flush_nb;

	struct ring_buffer_meta		*meta;

	unsigned int			subbuf_size;
	unsigned int			subbuf_order;
	unsigned int			max_data_size;
};

/* State kept by an iterator that walks one per CPU buffer */
struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	unsigned long			next_event;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	unsigned long			cache_pages_removed;
	u64				read_stamp;
	u64				page_stamp;
	struct ring_buffer_event	*event;
	size_t				event_size;
	int				missed_events;
};

/*
 * Describe the layout of a sub-buffer header (struct buffer_data_page)
 * in @s, one "field:" line per member.
 *
 * The "overwrite" pseudo field is reported at the same offset as commit
 * with a size of 1. When @buffer is NULL the size of the data field
 * falls back to PAGE_SIZE - BUF_PAGE_HDR_SIZE.
 *
 * Returns non-zero if everything fit in @s, zero if @s overflowed.
 */
int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s)
{
	/* Only used for sizeof()/offsetof(), never read or written */
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(char));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)(buffer ? buffer->subbuf_size :
					PAGE_SIZE - BUF_PAGE_HDR_SIZE),
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

/* Read the value of @t into @ret */
static inline void rb_time_read(rb_time_t *t, u64 *ret)
{
	*ret = local64_read(&t->time);
}
/* Store @val into @t */
static void rb_time_set(rb_time_t *t, u64 val)
{
	local64_set(&t->time, val);
}

/*
 * Enable this to make sure that the event passed to
 * ring_buffer_event_time_stamp() is not committed and also
 * is on the buffer that it passed in.
 */
//#define RB_VERIFY_EVENT
#ifdef RB_VERIFY_EVENT
static struct list_head *rb_list_head(struct list_head *list);
/*
 * Walk from the commit page towards the tail page and warn if @event
 * does not lie between the commit and write index of one of those pages.
 */
static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
			 void *event)
{
	struct buffer_page *page = cpu_buffer->commit_page;
	struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
	struct list_head *next;
	long commit, write;
	unsigned long addr = (unsigned long)event;
	bool done = false;
	int stop = 0;

	/* Make sure the event exists and is not committed yet */
	do {
		/* The tail page is the last one checked; bail out after ~100 pages */
		if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
			done = true;
		commit = rb_page_commit(page);
		write = local_read(&page->write);
		if (addr >= (unsigned long)&page->page->data[commit] &&
		    addr < (unsigned long)&page->page->data[write])
			return;

		next = rb_list_head(page->list.next);
		page = list_entry(next, struct buffer_page, list);
	} while (!done);
	WARN_ON_ONCE(1);
}
#else
/* Verification disabled: compiles away to nothing */
static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
			 void *event)
{
}
#endif

/*
 * The absolute time stamp drops the 5 MSBs and some clocks may
 * require them. The rb_fix_abs_ts() will take a previous full
 * time stamp, and add the 5 MSB of that time stamp on to the
 * saved absolute time stamp. Then they are compared in case of
 * the unlikely event that the latest time stamp incremented
 * the 5 MSB.
 */
static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
{
	if (save_ts & TS_MSB) {
		abs |= save_ts & TS_MSB;
		/* Check for overflow */
		if (unlikely(abs < save_ts))
			abs += 1ULL << 59;
	}
	return abs;
}

static inline u64 rb_time_stamp(struct trace_buffer *buffer);

/**
 * ring_buffer_event_time_stamp - return the event's current time stamp
 * @buffer: The buffer that the event is on
 * @event: the event to get the time stamp of
 *
 * Note, this must be called after @event is reserved, and before it is
 * committed to the ring buffer. And must be called from the same
 * context where the event was reserved (normal, softirq, irq, etc).
 *
 * Returns the time stamp associated with the current event.
 * If the event has an extended time stamp, then that is used as
 * the time stamp to return.
 * In the highly unlikely case that no commit is in progress, or that
 * the event was nested more than the max nesting, the write_stamp of
 * the buffer is returned instead, but really neither of those two
 * cases should ever happen.
726 */ 727 u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer, 728 struct ring_buffer_event *event) 729 { 730 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()]; 731 unsigned int nest; 732 u64 ts; 733 734 /* If the event includes an absolute time, then just use that */ 735 if (event->type_len == RINGBUF_TYPE_TIME_STAMP) { 736 ts = rb_event_time_stamp(event); 737 return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp); 738 } 739 740 nest = local_read(&cpu_buffer->committing); 741 verify_event(cpu_buffer, event); 742 if (WARN_ON_ONCE(!nest)) 743 goto fail; 744 745 /* Read the current saved nesting level time stamp */ 746 if (likely(--nest < MAX_NEST)) 747 return cpu_buffer->event_stamp[nest]; 748 749 /* Shouldn't happen, warn if it does */ 750 WARN_ONCE(1, "nest (%d) greater than max", nest); 751 752 fail: 753 rb_time_read(&cpu_buffer->write_stamp, &ts); 754 755 return ts; 756 } 757 758 /** 759 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer 760 * @buffer: The ring_buffer to get the number of pages from 761 * @cpu: The cpu of the ring_buffer to get the number of pages from 762 * 763 * Returns the number of pages that have content in the ring buffer. 
764 */ 765 size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu) 766 { 767 size_t read; 768 size_t lost; 769 size_t cnt; 770 771 read = local_read(&buffer->buffers[cpu]->pages_read); 772 lost = local_read(&buffer->buffers[cpu]->pages_lost); 773 cnt = local_read(&buffer->buffers[cpu]->pages_touched); 774 775 if (WARN_ON_ONCE(cnt < lost)) 776 return 0; 777 778 cnt -= lost; 779 780 /* The reader can read an empty page, but not more than that */ 781 if (cnt < read) { 782 WARN_ON_ONCE(read > cnt + 1); 783 return 0; 784 } 785 786 return cnt - read; 787 } 788 789 static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full) 790 { 791 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 792 size_t nr_pages; 793 size_t dirty; 794 795 nr_pages = cpu_buffer->nr_pages; 796 if (!nr_pages || !full) 797 return true; 798 799 /* 800 * Add one as dirty will never equal nr_pages, as the sub-buffer 801 * that the writer is on is not counted as dirty. 802 * This is needed if "buffer_percent" is set to 100. 803 */ 804 dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1; 805 806 return (dirty * 100) >= (full * nr_pages); 807 } 808 809 /* 810 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input 811 * 812 * Schedules a delayed work to wake up any task that is blocked on the 813 * ring buffer waiters queue. 
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	/* For waiters waiting for the first wake up */
	(void)atomic_fetch_inc_release(&rbwork->seq);

	wake_up_all(&rbwork->waiters);
	if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
		/* Only cpu_buffer sets the above flags */
		struct ring_buffer_per_cpu *cpu_buffer =
			container_of(rbwork, struct ring_buffer_per_cpu, irq_work);

		/* Called from interrupt context */
		raw_spin_lock(&cpu_buffer->reader_lock);
		rbwork->wakeup_full = false;
		rbwork->full_waiters_pending = false;

		/* Waking up all waiters, they will reset the shortest full */
		cpu_buffer->shortest_full = 0;
		raw_spin_unlock(&cpu_buffer->reader_lock);

		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on
 *
 * In the case of a file that represents a ring buffer is closing,
 * it is prudent to wake up any waiters that are on this.
 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (!buffer)
		return;

	if (cpu == RING_BUFFER_ALL_CPUS) {

		/* Wake up individual ones too. One level recursion */
		for_each_buffer_cpu(buffer, cpu)
			ring_buffer_wake_waiters(buffer, cpu);

		rbwork = &buffer->irq_work;
	} else {
		if (WARN_ON_ONCE(!buffer->buffers))
			return;
		if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
			return;

		cpu_buffer = buffer->buffers[cpu];
		/* The CPU buffer may not have been initialized yet */
		if (!cpu_buffer)
			return;
		rbwork = &cpu_buffer->irq_work;
	}

	/* This can be called in any context */
	irq_work_queue(&rbwork->work);
}

/*
 * Check whether a waiter on @cpu may stop waiting.
 *
 * For RING_BUFFER_ALL_CPUS, or when @full is zero, any data is enough.
 * Otherwise the buffer must have reached @full percent and the reader
 * page must not be the commit page. When the watermark is not reached,
 * @full is recorded in shortest_full if it is smaller than the value
 * already stored there (or if none is stored).
 */
static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	bool ret = false;

	/* Reads of all CPUs always waits for any data */
	if (cpu == RING_BUFFER_ALL_CPUS)
		return !ring_buffer_empty(buffer);

	cpu_buffer = buffer->buffers[cpu];

	if (!ring_buffer_empty_cpu(buffer, cpu)) {
		unsigned long flags;
		bool pagebusy;

		if (!full)
			return true;

		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
		pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
		ret = !pagebusy && full_hit(buffer, cpu, full);

		if (!ret && (!cpu_buffer->shortest_full ||
			     cpu_buffer->shortest_full > full)) {
		    cpu_buffer->shortest_full = full;
		}
		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}
	return ret;
}

/*
 * Wait condition used by ring_buffer_wait(): true when the watermark is
 * hit or when @cond(@data) says so. Otherwise the matching *_pending
 * flag is set before returning false.
 */
static inline bool
rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
	     int cpu, int full, ring_buffer_cond_fn cond, void *data)
{
	if (rb_watermark_hit(buffer, cpu, full))
		return true;

	if (cond(data))
		return true;

	/*
	 * The events can happen in critical sections where
	 * checking a work queue can cause deadlocks.
	 * After adding a task to the queue, this flag is set
	 * only to notify events to try to wake up the queue
	 * using irq_work.
	 *
	 * We don't clear it even if the buffer is no longer
	 * empty. The flag only causes the next event to run
	 * irq_work to do the work queue wake up. The worst
	 * that can happen if we race with !trace_empty() is that
	 * an event will cause an irq_work to try to wake up
	 * an empty queue.
	 *
	 * There's no reason to protect this flag either, as
	 * the work queue and irq_work logic will do the necessary
	 * synchronization for the wake ups. The only thing
	 * that is necessary is that the wake up happens after
	 * a task has been queued. It's OK for spurious wake ups.
	 */
	if (full)
		rbwork->full_waiters_pending = true;
	else
		rbwork->waiters_pending = true;

	return false;
}

/* Argument for rb_wait_once(): the wake up state and the seq seen at entry */
struct rb_wait_data {
	struct rb_irq_work		*irq_work;
	int				seq;
};

/*
 * The default wait condition for ring_buffer_wait() is to just exit the
 * wait loop the first time it is woken up.
 */
static bool rb_wait_once(void *data)
{
	struct rb_wait_data *rdata = data;
	struct rb_irq_work *rbwork = rdata->irq_work;

	/* seq moves every time the irq_work handler runs */
	return atomic_read_acquire(&rbwork->seq) != rdata->seq;
}

/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 * @cond: condition function to break out of wait (NULL to run once)
 * @data: the data to pass to @cond.
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
978 */ 979 int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full, 980 ring_buffer_cond_fn cond, void *data) 981 { 982 struct ring_buffer_per_cpu *cpu_buffer; 983 struct wait_queue_head *waitq; 984 struct rb_irq_work *rbwork; 985 struct rb_wait_data rdata; 986 int ret = 0; 987 988 /* 989 * Depending on what the caller is waiting for, either any 990 * data in any cpu buffer, or a specific buffer, put the 991 * caller on the appropriate wait queue. 992 */ 993 if (cpu == RING_BUFFER_ALL_CPUS) { 994 rbwork = &buffer->irq_work; 995 /* Full only makes sense on per cpu reads */ 996 full = 0; 997 } else { 998 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 999 return -ENODEV; 1000 cpu_buffer = buffer->buffers[cpu]; 1001 rbwork = &cpu_buffer->irq_work; 1002 } 1003 1004 if (full) 1005 waitq = &rbwork->full_waiters; 1006 else 1007 waitq = &rbwork->waiters; 1008 1009 /* Set up to exit loop as soon as it is woken */ 1010 if (!cond) { 1011 cond = rb_wait_once; 1012 rdata.irq_work = rbwork; 1013 rdata.seq = atomic_read_acquire(&rbwork->seq); 1014 data = &rdata; 1015 } 1016 1017 ret = wait_event_interruptible((*waitq), 1018 rb_wait_cond(rbwork, buffer, cpu, full, cond, data)); 1019 1020 return ret; 1021 } 1022 1023 /** 1024 * ring_buffer_poll_wait - poll on buffer input 1025 * @buffer: buffer to wait on 1026 * @cpu: the cpu buffer to wait on 1027 * @filp: the file descriptor 1028 * @poll_table: The poll descriptor 1029 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 1030 * 1031 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 1032 * as data is added to any of the @buffer's cpu buffers. Otherwise 1033 * it will wait for data to be added to a specific cpu buffer. 1034 * 1035 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers, 1036 * zero otherwise. 
 */
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
			  struct file *filp, poll_table *poll_table, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		/* a fill level is only tracked per CPU */
		full = 0;
	} else {
		/* @cpu has no buffer: report an error to the poller */
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return EPOLLERR;

		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full) {
		poll_wait(filp, &rbwork->full_waiters, poll_table);

		if (rb_watermark_hit(buffer, cpu, full))
			return EPOLLIN | EPOLLRDNORM;
		/*
		 * Only allow full_waiters_pending update to be seen after
		 * the shortest_full is set (in rb_watermark_hit). If the
		 * writer sees the full_waiters_pending flag set, it will
		 * compare the amount in the ring buffer to shortest_full.
		 * If the amount in the ring buffer is greater than the
		 * shortest_full percent, it will call the irq_work handler
		 * to wake up this list. The irq_handler will reset shortest_full
		 * back to zero. That's done under the reader_lock, but
		 * the below smp_mb() makes sure that the update to
		 * full_waiters_pending doesn't leak up into the above.
		 */
		smp_mb();
		rbwork->full_waiters_pending = true;
		return 0;
	}

	poll_wait(filp, &rbwork->waiters, poll_table);
	rbwork->waiters_pending = true;

	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty.  Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path.  We only need a memory barrier when
	 * the buffer goes from empty to having content.  But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

/*
 * Evaluates to the truth value of @cond. When @cond is true, recording is
 * disabled (record_disabled is incremented) on the owning buffer and a
 * warning is issued.
 *
 * b may be either a struct trace_buffer or a struct ring_buffer_per_cpu
 */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

/*
 * Read the clock of @buffer. With retpolines enabled, the common
 * trace_clock_local case is called directly instead of through the
 * function pointer.
 */
static inline u64 rb_time_stamp(struct trace_buffer *buffer)
{
	u64 ts;

	/* Skip retpolines :-( */
	if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local))
		ts = trace_clock_local();
	else
		ts = buffer->clock();

	/* shift to debug/test normalization and TIME_EXTENTS */
	return ts << DEBUG_SHIFT;
}

/*
 * Exported wrapper around rb_time_stamp() that reads the clock with
 * preemption disabled.
 */
u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

/*
 * Undo the DEBUG_SHIFT applied by rb_time_stamp(). @buffer and @cpu are
 * not used.
 */
void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things
tricky. 1156 * Although writes only happen on the CPU that they are on, 1157 * and they only need to worry about interrupts. Reads can 1158 * happen on any CPU. 1159 * 1160 * The reader page is always off the ring buffer, but when the 1161 * reader finishes with a page, it needs to swap its page with 1162 * a new one from the buffer. The reader needs to take from 1163 * the head (writes go to the tail). But if a writer is in overwrite 1164 * mode and wraps, it must push the head page forward. 1165 * 1166 * Here lies the problem. 1167 * 1168 * The reader must be careful to replace only the head page, and 1169 * not another one. As described at the top of the file in the 1170 * ASCII art, the reader sets its old page to point to the next 1171 * page after head. It then sets the page after head to point to 1172 * the old reader page. But if the writer moves the head page 1173 * during this operation, the reader could end up with the tail. 1174 * 1175 * We use cmpxchg to help prevent this race. We also do something 1176 * special with the page before head. We set the LSB to 1. 1177 * 1178 * When the writer must push the page forward, it will clear the 1179 * bit that points to the head page, move the head, and then set 1180 * the bit that points to the new head page. 1181 * 1182 * We also don't want an interrupt coming in and moving the head 1183 * page on another writer. Thus we use the second LSB to catch 1184 * that too. 
Thus: 1185 * 1186 * head->list->prev->next bit 1 bit 0 1187 * ------- ------- 1188 * Normal page 0 0 1189 * Points to head page 0 1 1190 * New head page 1 0 1191 * 1192 * Note we can not trust the prev pointer of the head page, because: 1193 * 1194 * +----+ +-----+ +-----+ 1195 * | |------>| T |---X--->| N | 1196 * | |<------| | | | 1197 * +----+ +-----+ +-----+ 1198 * ^ ^ | 1199 * | +-----+ | | 1200 * +----------| R |----------+ | 1201 * | |<-----------+ 1202 * +-----+ 1203 * 1204 * Key: ---X--> HEAD flag set in pointer 1205 * T Tail page 1206 * R Reader page 1207 * N Next page 1208 * 1209 * (see __rb_reserve_next() to see where this happens) 1210 * 1211 * What the above shows is that the reader just swapped out 1212 * the reader page with a page in the buffer, but before it 1213 * could make the new header point back to the new page added 1214 * it was preempted by a writer. The writer moved forward onto 1215 * the new page added by the reader and is about to move forward 1216 * again. 1217 * 1218 * You can see, it is legitimate for the previous pointer of 1219 * the head (or any page) not to point back to itself. But only 1220 * temporarily. 1221 */ 1222 1223 #define RB_PAGE_NORMAL 0UL 1224 #define RB_PAGE_HEAD 1UL 1225 #define RB_PAGE_UPDATE 2UL 1226 1227 1228 #define RB_FLAG_MASK 3UL 1229 1230 /* PAGE_MOVED is not part of the mask */ 1231 #define RB_PAGE_MOVED 4UL 1232 1233 /* 1234 * rb_list_head - remove any bit 1235 */ 1236 static struct list_head *rb_list_head(struct list_head *list) 1237 { 1238 unsigned long val = (unsigned long)list; 1239 1240 return (struct list_head *)(val & ~RB_FLAG_MASK); 1241 } 1242 1243 /* 1244 * rb_is_head_page - test if the given page is the head page 1245 * 1246 * Because the reader may move the head_page pointer, we can 1247 * not trust what the head page is (it may be pointing to 1248 * the reader page). But if the next page is a header page, 1249 * its flags will be non zero. 
 */
static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	/* Read the (possibly flag-tagged) next pointer of @list */
	val = (unsigned long)list->next;

	/* @list no longer points at @page: report that it moved */
	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	/* Otherwise hand back the flag bits stored in the pointer */
	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 *
 * Sets RB_PAGE_HEAD and clears RB_PAGE_UPDATE in @list->next using
 * plain (non-atomic) read-modify-write operations.
 */
static void rb_set_list_to_head(struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 *
 * Does nothing if the cpu buffer has no head page. When the cpu buffer
 * has a ring_meta, the address of the head data page is also recorded
 * in meta->head_buffer.
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(head->list.prev);

	if (cpu_buffer->ring_meta) {
		struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
		meta->head_buffer = (unsigned long)head->page;
	}
}

/* Clear both flag bits stored in @list->next */
static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

/*
 * Atomically change the flag bits in @prev->list.next from @old_flag to
 * @new_flag, provided the pointer still refers to @head.
 *
 * Returns RB_PAGE_MOVED if the pointer found no longer refers to @head,
 * otherwise the flag bits that were found in the pointer (which equal
 * @old_flag only when the cmpxchg succeeded). @cpu_buffer is not used.
 */
static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

/* rb_head_page_set() with the new flag fixed to RB_PAGE_UPDATE */
static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

/* rb_head_page_set() with the new flag fixed to RB_PAGE_HEAD */
static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

/* rb_head_page_set() with the new flag fixed to RB_PAGE_NORMAL */
static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

/* Advance *@bpage to the next buffer page, ignoring flag bits in the link */
static inline void rb_inc_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

/* Move *@bpage to the previous buffer page, ignoring flag bits in the link */
static inline void rb_dec_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.prev);

	*bpage = list_entry(p, struct buffer_page, list);
}

/*
 * Find the page whose predecessor link carries a flag, starting the
 * search at cpu_buffer->head_page, cache it in cpu_buffer->head_page
 * and return it. Returns NULL (after RB_WARN_ON) if there is no head
 * page, the list fails the sanity check, or no flagged page is found.
 */
static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(&page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

/*
 * Try to make the link that points at @old (with the HEAD flag set)
 * point at @new instead, with no flags. Returns true only if the link
 * still held "@old | RB_PAGE_HEAD" when the cmpxchg ran.
 */
static bool rb_head_page_replace(struct buffer_page *old,
				struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page *tail_page,
			       struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * it only can increment when a commit takes place. But that
		 * only happens in the outer most nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Either we update tail_page or an interrupt does */
		if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
			local_inc(&cpu_buffer->pages_touched);
	}
}

/* Warn (via RB_WARN_ON) if the @bpage pointer has any flag bits set */
static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			  struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
}

/*
 * Check that the neighbours of @list link back to it in both
 * directions, ignoring flag bits. Returns false (after RB_WARN_ON) on
 * the first mismatch, true otherwise.
 */
static bool rb_check_links(struct ring_buffer_per_cpu *cpu_buffer,
			   struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->next)->prev) != list))
		return false;

	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->prev)->next) != list))
		return false;

	return true;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head, *tmp;
	unsigned long buffer_cnt;
	unsigned long flags;
	int nr_loops = 0;

	/*
	 * Walk the linked list underpinning the ring buffer and validate all
	 * its next and prev links.
	 *
	 * The check acquires the reader_lock to avoid concurrent processing
	 * with code that could be modifying the list. However, the lock cannot
	 * be held for the entire duration of the walk, as this would make the
	 * time when interrupts are disabled non-deterministic, dependent on the
	 * ring buffer size. Therefore, the code releases and re-acquires the
	 * lock after checking each page. The ring_buffer_per_cpu.cnt variable
	 * is then used to detect if the list was modified while the lock was
	 * not held, in which case the check needs to be restarted.
	 *
	 * The code attempts to perform the check at most three times before
	 * giving up. This is acceptable because this is only a self-validation
	 * to detect problems early on. In practice, the list modification
	 * operations are fairly spaced, and so this check typically succeeds at
	 * most on the second try.
	 */
again:
	if (++nr_loops > 3)
		return;

	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	head = rb_list_head(cpu_buffer->pages);
	if (!rb_check_links(cpu_buffer, head))
		goto out_locked;
	/* Snapshot the modification counter for this pass */
	buffer_cnt = cpu_buffer->cnt;
	tmp = head;
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	while (true) {
		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

		if (buffer_cnt != cpu_buffer->cnt) {
			/* The list was updated, try again. */
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
			goto again;
		}

		tmp = rb_list_head(tmp->next);
		if (tmp == head)
			/* The iteration circled back, all is done. */
			goto out_locked;

		if (!rb_check_links(cpu_buffer, tmp))
			goto out_locked;

		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}

out_locked:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}

/*
 * Take an address, add the meta data size as well as the array of
 * array subbuffer indexes, then align it to a subbuffer size.
 *
 * This is used to help find the next per cpu subbuffer within a mapped range.
 */
static unsigned long
rb_range_align_subbuf(unsigned long addr, int subbuf_size, int nr_subbufs)
{
	addr += sizeof(struct ring_buffer_cpu_meta) +
		sizeof(int) * nr_subbufs;
	return ALIGN(addr, subbuf_size);
}

/*
 * Return the ring_buffer_meta for a given @cpu.
1619 */ 1620 static void *rb_range_meta(struct trace_buffer *buffer, int nr_pages, int cpu) 1621 { 1622 int subbuf_size = buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 1623 struct ring_buffer_cpu_meta *meta; 1624 struct ring_buffer_meta *bmeta; 1625 unsigned long ptr; 1626 int nr_subbufs; 1627 1628 bmeta = buffer->meta; 1629 if (!bmeta) 1630 return NULL; 1631 1632 ptr = (unsigned long)bmeta + bmeta->buffers_offset; 1633 meta = (struct ring_buffer_cpu_meta *)ptr; 1634 1635 /* When nr_pages passed in is zero, the first meta has already been initialized */ 1636 if (!nr_pages) { 1637 nr_subbufs = meta->nr_subbufs; 1638 } else { 1639 /* Include the reader page */ 1640 nr_subbufs = nr_pages + 1; 1641 } 1642 1643 /* 1644 * The first chunk may not be subbuffer aligned, where as 1645 * the rest of the chunks are. 1646 */ 1647 if (cpu) { 1648 ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs); 1649 ptr += subbuf_size * nr_subbufs; 1650 1651 /* We can use multiplication to find chunks greater than 1 */ 1652 if (cpu > 1) { 1653 unsigned long size; 1654 unsigned long p; 1655 1656 /* Save the beginning of this CPU chunk */ 1657 p = ptr; 1658 ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs); 1659 ptr += subbuf_size * nr_subbufs; 1660 1661 /* Now all chunks after this are the same size */ 1662 size = ptr - p; 1663 ptr += size * (cpu - 2); 1664 } 1665 } 1666 return (void *)ptr; 1667 } 1668 1669 /* Return the start of subbufs given the meta pointer */ 1670 static void *rb_subbufs_from_meta(struct ring_buffer_cpu_meta *meta) 1671 { 1672 int subbuf_size = meta->subbuf_size; 1673 unsigned long ptr; 1674 1675 ptr = (unsigned long)meta; 1676 ptr = rb_range_align_subbuf(ptr, subbuf_size, meta->nr_subbufs); 1677 1678 return (void *)ptr; 1679 } 1680 1681 /* 1682 * Return a specific sub-buffer for a given @cpu defined by @idx. 
1683 */ 1684 static void *rb_range_buffer(struct ring_buffer_per_cpu *cpu_buffer, int idx) 1685 { 1686 struct ring_buffer_cpu_meta *meta; 1687 unsigned long ptr; 1688 int subbuf_size; 1689 1690 meta = rb_range_meta(cpu_buffer->buffer, 0, cpu_buffer->cpu); 1691 if (!meta) 1692 return NULL; 1693 1694 if (WARN_ON_ONCE(idx >= meta->nr_subbufs)) 1695 return NULL; 1696 1697 subbuf_size = meta->subbuf_size; 1698 1699 /* Map this buffer to the order that's in meta->buffers[] */ 1700 idx = meta->buffers[idx]; 1701 1702 ptr = (unsigned long)rb_subbufs_from_meta(meta); 1703 1704 ptr += subbuf_size * idx; 1705 if (ptr + subbuf_size > cpu_buffer->buffer->range_addr_end) 1706 return NULL; 1707 1708 return (void *)ptr; 1709 } 1710 1711 /* 1712 * See if the existing memory contains a valid meta section. 1713 * if so, use that, otherwise initialize it. 1714 */ 1715 static bool rb_meta_init(struct trace_buffer *buffer, int scratch_size) 1716 { 1717 unsigned long ptr = buffer->range_addr_start; 1718 struct ring_buffer_meta *bmeta; 1719 unsigned long total_size; 1720 int struct_sizes; 1721 1722 bmeta = (struct ring_buffer_meta *)ptr; 1723 buffer->meta = bmeta; 1724 1725 total_size = buffer->range_addr_end - buffer->range_addr_start; 1726 1727 struct_sizes = sizeof(struct ring_buffer_cpu_meta); 1728 struct_sizes |= sizeof(*bmeta) << 16; 1729 1730 /* The first buffer will start word size after the meta page */ 1731 ptr += sizeof(*bmeta); 1732 ptr = ALIGN(ptr, sizeof(long)); 1733 ptr += scratch_size; 1734 1735 if (bmeta->magic != RING_BUFFER_META_MAGIC) { 1736 pr_info("Ring buffer boot meta mismatch of magic\n"); 1737 goto init; 1738 } 1739 1740 if (bmeta->struct_sizes != struct_sizes) { 1741 pr_info("Ring buffer boot meta mismatch of struct size\n"); 1742 goto init; 1743 } 1744 1745 if (bmeta->total_size != total_size) { 1746 pr_info("Ring buffer boot meta mismatch of total size\n"); 1747 goto init; 1748 } 1749 1750 if (bmeta->buffers_offset > bmeta->total_size) { 1751 pr_info("Ring 
buffer boot meta mismatch of offset outside of total size\n"); 1752 goto init; 1753 } 1754 1755 if (bmeta->buffers_offset != (void *)ptr - (void *)bmeta) { 1756 pr_info("Ring buffer boot meta mismatch of first buffer offset\n"); 1757 goto init; 1758 } 1759 1760 return true; 1761 1762 init: 1763 bmeta->magic = RING_BUFFER_META_MAGIC; 1764 bmeta->struct_sizes = struct_sizes; 1765 bmeta->total_size = total_size; 1766 bmeta->buffers_offset = (void *)ptr - (void *)bmeta; 1767 1768 /* Zero out the scratch pad */ 1769 memset((void *)bmeta + sizeof(*bmeta), 0, bmeta->buffers_offset - sizeof(*bmeta)); 1770 1771 return false; 1772 } 1773 1774 /* 1775 * See if the existing memory contains valid ring buffer data. 1776 * As the previous kernel must be the same as this kernel, all 1777 * the calculations (size of buffers and number of buffers) 1778 * must be the same. 1779 */ 1780 static bool rb_cpu_meta_valid(struct ring_buffer_cpu_meta *meta, int cpu, 1781 struct trace_buffer *buffer, int nr_pages, 1782 unsigned long *subbuf_mask) 1783 { 1784 int subbuf_size = PAGE_SIZE; 1785 unsigned long buffers_start; 1786 unsigned long buffers_end; 1787 int i; 1788 1789 if (!subbuf_mask) 1790 return false; 1791 1792 if (meta->subbuf_size != PAGE_SIZE) { 1793 pr_info("Ring buffer boot meta [%d] invalid subbuf_size\n", cpu); 1794 return false; 1795 } 1796 1797 buffers_start = meta->first_buffer; 1798 buffers_end = meta->first_buffer + (subbuf_size * meta->nr_subbufs); 1799 1800 /* Is the head and commit buffers within the range of buffers? 
*/ 1801 if (meta->head_buffer < buffers_start || 1802 meta->head_buffer >= buffers_end) { 1803 pr_info("Ring buffer boot meta [%d] head buffer out of range\n", cpu); 1804 return false; 1805 } 1806 1807 if (meta->commit_buffer < buffers_start || 1808 meta->commit_buffer >= buffers_end) { 1809 pr_info("Ring buffer boot meta [%d] commit buffer out of range\n", cpu); 1810 return false; 1811 } 1812 1813 bitmap_clear(subbuf_mask, 0, meta->nr_subbufs); 1814 1815 /* 1816 * Ensure the meta::buffers array has correct data. The data in each subbufs 1817 * are checked later in rb_meta_validate_events(). 1818 */ 1819 for (i = 0; i < meta->nr_subbufs; i++) { 1820 if (meta->buffers[i] < 0 || 1821 meta->buffers[i] >= meta->nr_subbufs) { 1822 pr_info("Ring buffer boot meta [%d] array out of range\n", cpu); 1823 return false; 1824 } 1825 1826 if (test_bit(meta->buffers[i], subbuf_mask)) { 1827 pr_info("Ring buffer boot meta [%d] array has duplicates\n", cpu); 1828 return false; 1829 } 1830 1831 set_bit(meta->buffers[i], subbuf_mask); 1832 } 1833 1834 return true; 1835 } 1836 1837 static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf); 1838 1839 static int rb_read_data_buffer(struct buffer_data_page *dpage, int tail, int cpu, 1840 unsigned long long *timestamp, u64 *delta_ptr) 1841 { 1842 struct ring_buffer_event *event; 1843 u64 ts, delta; 1844 int events = 0; 1845 int len; 1846 int e; 1847 1848 *delta_ptr = 0; 1849 *timestamp = 0; 1850 1851 ts = dpage->time_stamp; 1852 1853 for (e = 0; e < tail; e += len) { 1854 1855 event = (struct ring_buffer_event *)(dpage->data + e); 1856 len = rb_event_length(event); 1857 if (len <= 0 || len > tail - e) 1858 return -1; 1859 1860 switch (event->type_len) { 1861 1862 case RINGBUF_TYPE_TIME_EXTEND: 1863 delta = rb_event_time_stamp(event); 1864 ts += delta; 1865 break; 1866 1867 case RINGBUF_TYPE_TIME_STAMP: 1868 delta = rb_event_time_stamp(event); 1869 delta = rb_fix_abs_ts(delta, ts); 1870 if (delta < ts) { 1871 
*delta_ptr = delta; 1872 *timestamp = ts; 1873 return -1; 1874 } 1875 ts = delta; 1876 break; 1877 1878 case RINGBUF_TYPE_PADDING: 1879 if (event->time_delta == 1) 1880 break; 1881 fallthrough; 1882 case RINGBUF_TYPE_DATA: 1883 events++; 1884 ts += event->time_delta; 1885 break; 1886 1887 default: 1888 return -1; 1889 } 1890 } 1891 *timestamp = ts; 1892 return events; 1893 } 1894 1895 struct rb_validation_state { 1896 unsigned long entries; 1897 unsigned long entry_bytes; 1898 int discarded; 1899 u64 ts; 1900 }; 1901 1902 static int __rb_validate_buffer(struct buffer_page *bpage, int cpu, 1903 struct ring_buffer_cpu_meta *meta, 1904 u64 prev_ts, u64 next_ts) 1905 { 1906 struct buffer_data_page *dpage = bpage->page; 1907 unsigned long long ts; 1908 unsigned long tail; 1909 u64 delta; 1910 int ret; 1911 1912 /* 1913 * When a sub-buffer is recovered from a read, the commit value may 1914 * have RB_MISSED_* bits set, as these bits are reset on reuse. 1915 * Even after clearing these bits, a commit value greater than the 1916 * subbuf_size is considered invalid. 1917 */ 1918 tail = rb_data_page_commit(dpage); 1919 if (tail <= meta->subbuf_size - BUF_PAGE_HDR_SIZE) 1920 ret = rb_read_data_buffer(dpage, tail, cpu, &ts, &delta); 1921 else 1922 ret = -1; 1923 1924 /* 1925 * The timestamp must be greater than @prev_ts and smaller than @next_ts. 1926 * Since this function works in both forward (verify) and reverse (unwind) 1927 * loop, we don't know both @prev_ts and @next_ts at the same time. 1928 * So use the known boundary as the boundary. 1929 */ 1930 if (ret < 0 || (prev_ts && prev_ts > ts) || (next_ts && ts > next_ts)) { 1931 local_set(&bpage->entries, 0); 1932 /* 1933 * Note, the RB_MISSED_EVENTS is only set inside the main write 1934 * buffer by this verification logic. The normal ring buffer 1935 * has this bit set when the page is read and passed to the 1936 * consumers. 1937 */ 1938 local_set(&dpage->commit, RB_MISSED_EVENTS); 1939 dpage->time_stamp = prev_ts ? 
prev_ts : next_ts; 1940 ret = -1; 1941 } else { 1942 local_set(&bpage->entries, ret); 1943 } 1944 1945 return ret; 1946 } 1947 1948 /** 1949 * rb_validate_buffer - validates a single buffer page and updates the state. 1950 * @bpage: buffer page to validate 1951 * @cpu_buffer: cpu_buffer this page belongs to 1952 * @meta: meta of the cpu_buffer 1953 * @state: validation state 1954 * @prev_ts: previous buffer's timestamp (optional) 1955 * @next_ts: next buffer's timestamp (optional) 1956 * 1957 * If the page is invalid (wrong event length or timestamp), it increments the 1958 * discarded counter and warns it. Otherwise, it updates the validation state. 1959 */ 1960 static void rb_validate_buffer(struct buffer_page *bpage, 1961 struct ring_buffer_per_cpu *cpu_buffer, 1962 struct ring_buffer_cpu_meta *meta, 1963 struct rb_validation_state *state, 1964 u64 prev_ts, u64 next_ts) 1965 { 1966 int ret; 1967 1968 ret = __rb_validate_buffer(bpage, cpu_buffer->cpu, meta, prev_ts, next_ts); 1969 if (ret < 0) { 1970 if (!state->discarded) 1971 pr_info("Ring buffer meta [%d] invalid buffer page detected\n", 1972 cpu_buffer->cpu); 1973 state->discarded++; 1974 } else { 1975 /* If the buffer has content, update pages_touched */ 1976 if (ret) 1977 local_inc(&cpu_buffer->pages_touched); 1978 1979 state->entries += ret; 1980 state->entry_bytes += rb_page_size(bpage); 1981 state->ts = bpage->page->time_stamp; 1982 } 1983 } 1984 1985 static void rb_meta_inject_reader_page(struct ring_buffer_per_cpu *cpu_buffer, 1986 struct ring_buffer_cpu_meta *meta, 1987 struct buffer_page *orig_head, 1988 struct buffer_page *head_page) 1989 { 1990 struct buffer_page *bpage = orig_head; 1991 int i; 1992 1993 rb_dec_page(&bpage); 1994 /* 1995 * Insert the reader_page before the original head page. 1996 * Since the list encode RB_PAGE flags, general list 1997 * operations should be avoided. 
1998 */ 1999 cpu_buffer->reader_page->list.next = &orig_head->list; 2000 cpu_buffer->reader_page->list.prev = orig_head->list.prev; 2001 orig_head->list.prev = &cpu_buffer->reader_page->list; 2002 bpage->list.next = &cpu_buffer->reader_page->list; 2003 2004 /* Make the head_page the reader page */ 2005 cpu_buffer->reader_page = head_page; 2006 bpage = head_page; 2007 rb_inc_page(&head_page); 2008 head_page->list.prev = bpage->list.prev; 2009 rb_dec_page(&bpage); 2010 bpage->list.next = &head_page->list; 2011 rb_set_list_to_head(&bpage->list); 2012 cpu_buffer->pages = &head_page->list; 2013 2014 cpu_buffer->head_page = head_page; 2015 meta->head_buffer = (unsigned long)head_page->page; 2016 2017 /* Reset all the indexes */ 2018 bpage = cpu_buffer->reader_page; 2019 meta->buffers[0] = rb_meta_subbuf_idx(meta, bpage->page); 2020 bpage->id = 0; 2021 2022 for (i = 1, bpage = head_page; i < meta->nr_subbufs; 2023 i++, rb_inc_page(&bpage)) { 2024 meta->buffers[i] = rb_meta_subbuf_idx(meta, bpage->page); 2025 bpage->id = i; 2026 } 2027 } 2028 2029 /* If the meta data has been validated, now validate the events */ 2030 static void rb_meta_validate_events(struct ring_buffer_per_cpu *cpu_buffer) 2031 { 2032 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2033 struct buffer_page *head_page, *orig_head, *orig_reader; 2034 struct rb_validation_state state = { 0 }; 2035 bool skip = false; 2036 int ret; 2037 int i; 2038 2039 if (!meta || !meta->head_buffer) 2040 return; 2041 2042 orig_head = head_page = cpu_buffer->head_page; 2043 orig_reader = cpu_buffer->reader_page; 2044 2045 /* Do the head page first */ 2046 ret = __rb_validate_buffer(head_page, cpu_buffer->cpu, meta, 0, 0); 2047 if (ret < 0) { 2048 pr_info("Ring buffer meta [%d] invalid head page detected\n", 2049 cpu_buffer->cpu); 2050 /* Don't bother rewinding */ 2051 skip = true; 2052 state.ts = 0; 2053 } else { 2054 state.ts = head_page->page->time_stamp; 2055 } 2056 2057 /* Do the reader page - reader must be 
previous to head. */ 2058 rb_validate_buffer(orig_reader, cpu_buffer, meta, &state, 0, state.ts); 2059 2060 if (skip) 2061 goto skip_rewind; 2062 2063 /* 2064 * Try to rewind the head so that we can read the pages which are already 2065 * read in the previous boot. 2066 */ 2067 if (head_page == cpu_buffer->tail_page) 2068 goto skip_rewind; 2069 2070 rb_dec_page(&head_page); 2071 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_dec_page(&head_page)) { 2072 2073 /* Rewind until tail (writer) page. */ 2074 if (head_page == cpu_buffer->tail_page) 2075 break; 2076 2077 /* Rewind until unused page (no timestamp, no commit). */ 2078 if (!head_page->page->time_stamp && rb_page_commit(head_page) == 0) 2079 break; 2080 2081 /* 2082 * Skip if the page is invalid, or its timestamp is newer than the 2083 * previous valid page. 2084 */ 2085 rb_validate_buffer(head_page, cpu_buffer, meta, &state, 0, state.ts); 2086 } 2087 if (i) 2088 pr_info("Ring buffer [%d] rewound %d pages\n", cpu_buffer->cpu, i); 2089 2090 /* The last rewound page must be skipped. */ 2091 if (head_page != orig_head) 2092 rb_inc_page(&head_page); 2093 2094 /* 2095 * If the ring buffer was rewound, then inject the reader page 2096 * into the location just before the original head page. 
2097 */ 2098 if (head_page != orig_head) { 2099 rb_meta_inject_reader_page(cpu_buffer, meta, orig_head, head_page); 2100 /* We'll restart verifying from orig_head */ 2101 head_page = orig_head; 2102 } 2103 2104 skip_rewind: 2105 /* If the commit_buffer is the reader page, update the commit page */ 2106 if (meta->commit_buffer == (unsigned long)cpu_buffer->reader_page->page) { 2107 cpu_buffer->commit_page = cpu_buffer->reader_page; 2108 /* Nothing more to do, the only page is the reader page */ 2109 goto done; 2110 } 2111 state.ts = head_page->page->time_stamp; 2112 2113 /* Iterate until finding the commit page */ 2114 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_inc_page(&head_page)) { 2115 2116 /* The original reader page has already been checked/counted. */ 2117 if (head_page == orig_reader) 2118 continue; 2119 2120 rb_validate_buffer(head_page, cpu_buffer, meta, &state, state.ts, 0); 2121 2122 if (head_page == cpu_buffer->commit_page) 2123 break; 2124 } 2125 2126 if (head_page != cpu_buffer->commit_page) { 2127 pr_info("Ring buffer meta [%d] commit page not found\n", 2128 cpu_buffer->cpu); 2129 goto invalid; 2130 } 2131 done: 2132 local_set(&cpu_buffer->entries, state.entries); 2133 local_set(&cpu_buffer->entries_bytes, state.entry_bytes); 2134 2135 pr_info("Ring buffer meta [%d] is from previous boot!", cpu_buffer->cpu); 2136 if (state.discarded) 2137 pr_cont(" (%d pages discarded)", state.discarded); 2138 pr_cont("\n"); 2139 2140 #ifdef CONFIG_RING_BUFFER_PERSISTENT_INJECT 2141 if (meta->nr_invalid) 2142 pr_warn("Ring buffer testing [%d] invalid pages: %s (%d/%d)\n", 2143 cpu_buffer->cpu, 2144 (state.discarded == meta->nr_invalid) ? "PASSED" : "FAILED", 2145 state.discarded, meta->nr_invalid); 2146 if (meta->entry_bytes) 2147 pr_warn("Ring buffer testing [%d] entry_bytes: %s (%ld/%ld)\n", 2148 cpu_buffer->cpu, 2149 (state.entry_bytes == meta->entry_bytes) ? 
"PASSED" : "FAILED", 2150 (long)state.entry_bytes, (long)meta->entry_bytes); 2151 meta->nr_invalid = 0; 2152 meta->entry_bytes = 0; 2153 #endif 2154 return; 2155 2156 invalid: 2157 /* The content of the buffers are invalid, reset the meta data */ 2158 meta->head_buffer = 0; 2159 meta->commit_buffer = 0; 2160 2161 /* Reset the reader page */ 2162 local_set(&cpu_buffer->reader_page->entries, 0); 2163 rb_init_data_page(cpu_buffer->reader_page->page); 2164 2165 /* Reset all the subbuffers */ 2166 for (i = 0; i < meta->nr_subbufs - 1; i++, rb_inc_page(&head_page)) { 2167 local_set(&head_page->entries, 0); 2168 rb_init_data_page(head_page->page); 2169 } 2170 } 2171 2172 static void rb_range_meta_init(struct trace_buffer *buffer, int nr_pages, int scratch_size) 2173 { 2174 struct ring_buffer_cpu_meta *meta; 2175 unsigned long *subbuf_mask; 2176 unsigned long delta; 2177 void *subbuf; 2178 bool valid = false; 2179 int cpu; 2180 int i; 2181 2182 /* Create a mask to test the subbuf array */ 2183 subbuf_mask = bitmap_alloc(nr_pages + 1, GFP_KERNEL); 2184 /* If subbuf_mask fails to allocate, then rb_meta_valid() will return false */ 2185 2186 if (rb_meta_init(buffer, scratch_size)) 2187 valid = true; 2188 2189 for (cpu = 0; cpu < nr_cpu_ids; cpu++) { 2190 void *next_meta; 2191 2192 meta = rb_range_meta(buffer, nr_pages, cpu); 2193 2194 if (valid && rb_cpu_meta_valid(meta, cpu, buffer, nr_pages, subbuf_mask)) { 2195 /* Make the mappings match the current address */ 2196 subbuf = rb_subbufs_from_meta(meta); 2197 delta = (unsigned long)subbuf - meta->first_buffer; 2198 meta->first_buffer += delta; 2199 meta->head_buffer += delta; 2200 meta->commit_buffer += delta; 2201 continue; 2202 } 2203 2204 if (cpu < nr_cpu_ids - 1) 2205 next_meta = rb_range_meta(buffer, nr_pages, cpu + 1); 2206 else 2207 next_meta = (void *)buffer->range_addr_end; 2208 2209 memset(meta, 0, next_meta - (void *)meta); 2210 2211 meta->nr_subbufs = nr_pages + 1; 2212 meta->subbuf_size = PAGE_SIZE; 2213 2214 
subbuf = rb_subbufs_from_meta(meta); 2215 2216 meta->first_buffer = (unsigned long)subbuf; 2217 2218 /* 2219 * The buffers[] array holds the order of the sub-buffers 2220 * that are after the meta data. The sub-buffers may 2221 * be swapped out when read and inserted into a different 2222 * location of the ring buffer. Although their addresses 2223 * remain the same, the buffers[] array contains the 2224 * index into the sub-buffers holding their actual order. 2225 */ 2226 for (i = 0; i < meta->nr_subbufs; i++) { 2227 meta->buffers[i] = i; 2228 rb_init_data_page(subbuf); 2229 subbuf += meta->subbuf_size; 2230 } 2231 } 2232 bitmap_free(subbuf_mask); 2233 } 2234 2235 static void *rbm_start(struct seq_file *m, loff_t *pos) 2236 { 2237 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2238 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2239 unsigned long val; 2240 2241 if (!meta) 2242 return NULL; 2243 2244 if (*pos > meta->nr_subbufs) 2245 return NULL; 2246 2247 val = *pos; 2248 val++; 2249 2250 return (void *)val; 2251 } 2252 2253 static void *rbm_next(struct seq_file *m, void *v, loff_t *pos) 2254 { 2255 (*pos)++; 2256 2257 return rbm_start(m, pos); 2258 } 2259 2260 static int rbm_show(struct seq_file *m, void *v) 2261 { 2262 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2263 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2264 unsigned long val = (unsigned long)v; 2265 struct buffer_data_page *dpage; 2266 2267 if (val == 1) { 2268 seq_printf(m, "head_buffer: %d\n", 2269 rb_meta_subbuf_idx(meta, (void *)meta->head_buffer)); 2270 seq_printf(m, "commit_buffer: %d\n", 2271 rb_meta_subbuf_idx(meta, (void *)meta->commit_buffer)); 2272 seq_printf(m, "subbuf_size: %d\n", meta->subbuf_size); 2273 seq_printf(m, "nr_subbufs: %d\n", meta->nr_subbufs); 2274 return 0; 2275 } 2276 2277 val -= 2; 2278 dpage = rb_range_buffer(cpu_buffer, val); 2279 seq_printf(m, "buffer[%ld]: %d (commit: %ld)\n", 2280 val, meta->buffers[val], dpage ? 
rb_data_page_commit(dpage) : -1); 2281 2282 return 0; 2283 } 2284 2285 static void rbm_stop(struct seq_file *m, void *p) 2286 { 2287 } 2288 2289 static const struct seq_operations rb_meta_seq_ops = { 2290 .start = rbm_start, 2291 .next = rbm_next, 2292 .show = rbm_show, 2293 .stop = rbm_stop, 2294 }; 2295 2296 int ring_buffer_meta_seq_init(struct file *file, struct trace_buffer *buffer, int cpu) 2297 { 2298 struct seq_file *m; 2299 int ret; 2300 2301 ret = seq_open(file, &rb_meta_seq_ops); 2302 if (ret) 2303 return ret; 2304 2305 m = file->private_data; 2306 m->private = buffer->buffers[cpu]; 2307 2308 return 0; 2309 } 2310 2311 /* Map the buffer_pages to the previous head and commit pages */ 2312 static void rb_meta_buffer_update(struct ring_buffer_per_cpu *cpu_buffer, 2313 struct buffer_page *bpage) 2314 { 2315 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2316 2317 if (meta->head_buffer == (unsigned long)bpage->page) 2318 cpu_buffer->head_page = bpage; 2319 2320 if (meta->commit_buffer == (unsigned long)bpage->page) { 2321 cpu_buffer->commit_page = bpage; 2322 cpu_buffer->tail_page = bpage; 2323 } 2324 } 2325 2326 static struct ring_buffer_desc *ring_buffer_desc(struct trace_buffer_desc *trace_desc, int cpu) 2327 { 2328 struct ring_buffer_desc *desc, *end; 2329 size_t len; 2330 int i; 2331 2332 if (!trace_desc) 2333 return NULL; 2334 2335 if (cpu >= trace_desc->nr_cpus) 2336 return NULL; 2337 2338 end = (struct ring_buffer_desc *)((void *)trace_desc + trace_desc->struct_len); 2339 desc = __first_ring_buffer_desc(trace_desc); 2340 len = struct_size(desc, page_va, desc->nr_page_va); 2341 desc = (struct ring_buffer_desc *)((void *)desc + (len * cpu)); 2342 2343 if (desc < end && desc->cpu == cpu) 2344 return desc; 2345 2346 /* Missing CPUs, need to linear search */ 2347 for_each_ring_buffer_desc(desc, i, trace_desc) { 2348 if (desc->cpu == cpu) 2349 return desc; 2350 } 2351 2352 return NULL; 2353 } 2354 2355 static void *ring_buffer_desc_page(struct 
ring_buffer_desc *desc, unsigned int page_id) 2356 { 2357 return page_id >= desc->nr_page_va ? NULL : (void *)desc->page_va[page_id]; 2358 } 2359 2360 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2361 long nr_pages, struct list_head *pages) 2362 { 2363 struct trace_buffer *buffer = cpu_buffer->buffer; 2364 struct ring_buffer_cpu_meta *meta = NULL; 2365 struct buffer_page *bpage, *tmp; 2366 bool user_thread = current->mm != NULL; 2367 struct ring_buffer_desc *desc = NULL; 2368 long i; 2369 2370 /* 2371 * Check if the available memory is there first. 2372 * Note, si_mem_available() only gives us a rough estimate of available 2373 * memory. It may not be accurate. But we don't care, we just want 2374 * to prevent doing any allocation when it is obvious that it is 2375 * not going to succeed. 2376 */ 2377 i = si_mem_available(); 2378 if (i < nr_pages) 2379 return -ENOMEM; 2380 2381 /* 2382 * If a user thread allocates too much, and si_mem_available() 2383 * reports there's enough memory, even though there is not. 2384 * Make sure the OOM killer kills this thread. This can happen 2385 * even with RETRY_MAYFAIL because another task may be doing 2386 * an allocation after this task has taken all memory. 2387 * This is the task the OOM killer needs to take out during this 2388 * loop, even if it was triggered by an allocation somewhere else. 
2389 */ 2390 if (user_thread) 2391 set_current_oom_origin(); 2392 2393 if (buffer->range_addr_start) 2394 meta = rb_range_meta(buffer, nr_pages, cpu_buffer->cpu); 2395 2396 if (buffer->remote) { 2397 desc = ring_buffer_desc(buffer->remote->desc, cpu_buffer->cpu); 2398 if (!desc || WARN_ON(desc->nr_page_va != (nr_pages + 1))) 2399 return -EINVAL; 2400 } 2401 2402 for (i = 0; i < nr_pages; i++) { 2403 2404 bpage = alloc_cpu_page(cpu_buffer->cpu); 2405 if (!bpage) 2406 goto free_pages; 2407 2408 rb_check_bpage(cpu_buffer, bpage); 2409 2410 /* 2411 * Append the pages as for mapped buffers we want to keep 2412 * the order 2413 */ 2414 list_add_tail(&bpage->list, pages); 2415 2416 if (meta) { 2417 /* A range was given. Use that for the buffer page */ 2418 bpage->page = rb_range_buffer(cpu_buffer, i + 1); 2419 if (!bpage->page) 2420 goto free_pages; 2421 /* If this is valid from a previous boot */ 2422 if (meta->head_buffer) 2423 rb_meta_buffer_update(cpu_buffer, bpage); 2424 bpage->range = 1; 2425 bpage->id = i + 1; 2426 } else if (desc) { 2427 void *p = ring_buffer_desc_page(desc, i + 1); 2428 2429 if (WARN_ON(!p)) 2430 goto free_pages; 2431 2432 bpage->page = p; 2433 bpage->range = 1; /* bpage->page can't be freed */ 2434 bpage->id = i + 1; 2435 cpu_buffer->subbuf_ids[i + 1] = bpage; 2436 } else { 2437 int order = cpu_buffer->buffer->subbuf_order; 2438 bpage->page = alloc_cpu_data(cpu_buffer->cpu, order); 2439 if (!bpage->page) 2440 goto free_pages; 2441 } 2442 bpage->order = cpu_buffer->buffer->subbuf_order; 2443 2444 if (user_thread && fatal_signal_pending(current)) 2445 goto free_pages; 2446 } 2447 if (user_thread) 2448 clear_current_oom_origin(); 2449 2450 return 0; 2451 2452 free_pages: 2453 list_for_each_entry_safe(bpage, tmp, pages, list) { 2454 list_del_init(&bpage->list); 2455 free_buffer_page(bpage); 2456 } 2457 if (user_thread) 2458 clear_current_oom_origin(); 2459 2460 return -ENOMEM; 2461 } 2462 2463 static int rb_allocate_pages(struct ring_buffer_per_cpu 
*cpu_buffer, 2464 unsigned long nr_pages) 2465 { 2466 LIST_HEAD(pages); 2467 2468 WARN_ON(!nr_pages); 2469 2470 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) 2471 return -ENOMEM; 2472 2473 /* 2474 * The ring buffer page list is a circular list that does not 2475 * start and end with a list head. All page list items point to 2476 * other pages. 2477 */ 2478 cpu_buffer->pages = pages.next; 2479 list_del(&pages); 2480 2481 cpu_buffer->nr_pages = nr_pages; 2482 2483 rb_check_pages(cpu_buffer); 2484 2485 return 0; 2486 } 2487 2488 static struct ring_buffer_per_cpu * 2489 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 2490 { 2491 struct ring_buffer_per_cpu *cpu_buffer __free(kfree) = 2492 alloc_cpu_buffer(cpu); 2493 struct ring_buffer_cpu_meta *meta; 2494 struct buffer_page *bpage; 2495 int ret; 2496 2497 if (!cpu_buffer) 2498 return NULL; 2499 2500 cpu_buffer->cpu = cpu; 2501 cpu_buffer->buffer = buffer; 2502 raw_spin_lock_init(&cpu_buffer->reader_lock); 2503 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 2504 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 2505 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 2506 init_completion(&cpu_buffer->update_done); 2507 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 2508 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 2509 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 2510 mutex_init(&cpu_buffer->mapping_lock); 2511 2512 bpage = alloc_cpu_page(cpu); 2513 if (!bpage) 2514 return NULL; 2515 2516 rb_check_bpage(cpu_buffer, bpage); 2517 2518 cpu_buffer->reader_page = bpage; 2519 2520 if (buffer->range_addr_start) { 2521 /* 2522 * Range mapped buffers have the same restrictions as memory 2523 * mapped ones do. 
2524 */ 2525 cpu_buffer->mapped = 1; 2526 cpu_buffer->ring_meta = rb_range_meta(buffer, nr_pages, cpu); 2527 bpage->page = rb_range_buffer(cpu_buffer, 0); 2528 if (!bpage->page) 2529 goto fail_free_reader; 2530 if (cpu_buffer->ring_meta->head_buffer) 2531 rb_meta_buffer_update(cpu_buffer, bpage); 2532 bpage->range = 1; 2533 } else if (buffer->remote) { 2534 struct ring_buffer_desc *desc = ring_buffer_desc(buffer->remote->desc, cpu); 2535 2536 if (!desc) 2537 goto fail_free_reader; 2538 2539 cpu_buffer->remote = buffer->remote; 2540 cpu_buffer->meta_page = (struct trace_buffer_meta *)(void *)desc->meta_va; 2541 cpu_buffer->nr_pages = nr_pages; 2542 cpu_buffer->subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, 2543 sizeof(*cpu_buffer->subbuf_ids), GFP_KERNEL); 2544 if (!cpu_buffer->subbuf_ids) 2545 goto fail_free_reader; 2546 2547 /* Remote buffers are read-only and immutable */ 2548 atomic_inc(&cpu_buffer->record_disabled); 2549 atomic_inc(&cpu_buffer->resize_disabled); 2550 2551 bpage->page = ring_buffer_desc_page(desc, cpu_buffer->meta_page->reader.id); 2552 if (!bpage->page) 2553 goto fail_free_reader; 2554 2555 bpage->range = 1; 2556 cpu_buffer->subbuf_ids[0] = bpage; 2557 } else { 2558 int order = cpu_buffer->buffer->subbuf_order; 2559 bpage->page = alloc_cpu_data(cpu, order); 2560 if (!bpage->page) 2561 goto fail_free_reader; 2562 } 2563 2564 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 2565 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2566 2567 ret = rb_allocate_pages(cpu_buffer, nr_pages); 2568 if (ret < 0) 2569 goto fail_free_reader; 2570 2571 rb_meta_validate_events(cpu_buffer); 2572 2573 /* If the boot meta was valid then this has already been updated */ 2574 meta = cpu_buffer->ring_meta; 2575 if (!meta || !meta->head_buffer || 2576 !cpu_buffer->head_page || !cpu_buffer->commit_page || !cpu_buffer->tail_page) { 2577 if (meta && meta->head_buffer && 2578 (cpu_buffer->head_page || cpu_buffer->commit_page || cpu_buffer->tail_page)) { 2579 pr_warn("Ring buffer 
meta buffers not all mapped\n"); 2580 if (!cpu_buffer->head_page) 2581 pr_warn(" Missing head_page\n"); 2582 if (!cpu_buffer->commit_page) 2583 pr_warn(" Missing commit_page\n"); 2584 if (!cpu_buffer->tail_page) 2585 pr_warn(" Missing tail_page\n"); 2586 } 2587 2588 cpu_buffer->head_page 2589 = list_entry(cpu_buffer->pages, struct buffer_page, list); 2590 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 2591 2592 rb_head_page_activate(cpu_buffer); 2593 2594 if (cpu_buffer->ring_meta) 2595 meta->commit_buffer = meta->head_buffer; 2596 } else { 2597 /* The valid meta buffer still needs to activate the head page */ 2598 rb_head_page_activate(cpu_buffer); 2599 } 2600 2601 return_ptr(cpu_buffer); 2602 2603 fail_free_reader: 2604 free_buffer_page(cpu_buffer->reader_page); 2605 2606 return NULL; 2607 } 2608 2609 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 2610 { 2611 struct list_head *head = cpu_buffer->pages; 2612 struct buffer_page *bpage, *tmp; 2613 2614 irq_work_sync(&cpu_buffer->irq_work.work); 2615 2616 if (cpu_buffer->remote) 2617 kfree(cpu_buffer->subbuf_ids); 2618 2619 free_buffer_page(cpu_buffer->reader_page); 2620 2621 if (head) { 2622 rb_head_page_deactivate(cpu_buffer); 2623 2624 list_for_each_entry_safe(bpage, tmp, head, list) { 2625 list_del_init(&bpage->list); 2626 free_buffer_page(bpage); 2627 } 2628 bpage = list_entry(head, struct buffer_page, list); 2629 free_buffer_page(bpage); 2630 } 2631 2632 free_page((unsigned long)cpu_buffer->free_page); 2633 2634 kfree(cpu_buffer); 2635 } 2636 2637 #ifdef CONFIG_RING_BUFFER_PERSISTENT_INJECT 2638 static void rb_test_inject_invalid_pages(struct trace_buffer *buffer) 2639 { 2640 struct ring_buffer_per_cpu *cpu_buffer; 2641 struct ring_buffer_cpu_meta *meta; 2642 struct buffer_data_page *dpage; 2643 unsigned long entry_bytes = 0; 2644 unsigned long ptr; 2645 int subbuf_size; 2646 int invalid = 0; 2647 int cpu; 2648 int i; 2649 2650 if (!(buffer->flags & 
RB_FL_TESTING)) 2651 return; 2652 2653 guard(preempt)(); 2654 cpu = smp_processor_id(); 2655 2656 cpu_buffer = buffer->buffers[cpu]; 2657 if (!cpu_buffer) 2658 return; 2659 meta = cpu_buffer->ring_meta; 2660 if (!meta) 2661 return; 2662 2663 ptr = (unsigned long)rb_subbufs_from_meta(meta); 2664 subbuf_size = meta->subbuf_size; 2665 2666 for (i = 0; i < meta->nr_subbufs; i++) { 2667 unsigned long idx = meta->buffers[i]; 2668 2669 dpage = (void *)(ptr + idx * subbuf_size); 2670 /* Skip unused pages */ 2671 if (!rb_data_page_commit(dpage)) 2672 continue; 2673 2674 /* 2675 * Invalidate even pages or multiples of 5. This will cause 3 2676 * contiguous invalidated(empty) pages. 2677 */ 2678 if (!(i & 0x1) || !(i % 5)) { 2679 local_add(subbuf_size + 1, &dpage->commit); 2680 invalid++; 2681 } else { 2682 /* Count total commit bytes. */ 2683 entry_bytes += rb_data_page_size(dpage); 2684 } 2685 } 2686 2687 pr_info("Inject invalidated %d pages on CPU%d, total size: %ld\n", 2688 invalid, cpu, (long)entry_bytes); 2689 meta->nr_invalid = invalid; 2690 meta->entry_bytes = entry_bytes; 2691 } 2692 #else /* !CONFIG_RING_BUFFER_PERSISTENT_INJECT */ 2693 #define rb_test_inject_invalid_pages(buffer) do { } while (0) 2694 #endif 2695 2696 /* Stop recording on a persistent buffer and flush cache if needed. 
*/ 2697 static int rb_flush_buffer_cb(struct notifier_block *nb, unsigned long event, void *data) 2698 { 2699 struct trace_buffer *buffer = container_of(nb, struct trace_buffer, flush_nb); 2700 2701 ring_buffer_record_off(buffer); 2702 rb_test_inject_invalid_pages(buffer); 2703 arch_ring_buffer_flush_range(buffer->range_addr_start, buffer->range_addr_end); 2704 return NOTIFY_DONE; 2705 } 2706 2707 static struct trace_buffer *alloc_buffer(unsigned long size, unsigned flags, 2708 int order, unsigned long start, 2709 unsigned long end, 2710 unsigned long scratch_size, 2711 struct lock_class_key *key, 2712 struct ring_buffer_remote *remote) 2713 { 2714 struct trace_buffer *buffer __free(kfree) = NULL; 2715 long nr_pages; 2716 int subbuf_size; 2717 int bsize; 2718 int cpu; 2719 int ret; 2720 2721 /* keep it in its own cache line */ 2722 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 2723 GFP_KERNEL); 2724 if (!buffer) 2725 return NULL; 2726 2727 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 2728 return NULL; 2729 2730 buffer->subbuf_order = order; 2731 subbuf_size = (PAGE_SIZE << order); 2732 buffer->subbuf_size = subbuf_size - BUF_PAGE_HDR_SIZE; 2733 2734 /* Max payload is buffer page size - header (8bytes) */ 2735 buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2); 2736 2737 buffer->flags = flags; 2738 buffer->clock = trace_clock_local; 2739 buffer->reader_lock_key = key; 2740 2741 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 2742 init_waitqueue_head(&buffer->irq_work.waiters); 2743 2744 buffer->cpus = nr_cpu_ids; 2745 2746 bsize = sizeof(void *) * nr_cpu_ids; 2747 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 2748 GFP_KERNEL); 2749 if (!buffer->buffers) 2750 goto fail_free_cpumask; 2751 2752 cpu = raw_smp_processor_id(); 2753 2754 /* If start/end are specified, then that overrides size */ 2755 if (start && end) { 2756 unsigned long buffers_start; 2757 unsigned long ptr; 2758 int n; 2759 2760 /* Make sure 
that start is word aligned */ 2761 start = ALIGN(start, sizeof(long)); 2762 2763 /* scratch_size needs to be aligned too */ 2764 scratch_size = ALIGN(scratch_size, sizeof(long)); 2765 2766 /* Subtract the buffer meta data and word aligned */ 2767 buffers_start = start + sizeof(struct ring_buffer_cpu_meta); 2768 buffers_start = ALIGN(buffers_start, sizeof(long)); 2769 buffers_start += scratch_size; 2770 2771 /* Calculate the size for the per CPU data */ 2772 size = end - buffers_start; 2773 size = size / nr_cpu_ids; 2774 2775 /* 2776 * The number of sub-buffers (nr_pages) is determined by the 2777 * total size allocated minus the meta data size. 2778 * Then that is divided by the number of per CPU buffers 2779 * needed, plus account for the integer array index that 2780 * will be appended to the meta data. 2781 */ 2782 nr_pages = (size - sizeof(struct ring_buffer_cpu_meta)) / 2783 (subbuf_size + sizeof(int)); 2784 /* Need at least two pages plus the reader page */ 2785 if (nr_pages < 3) 2786 goto fail_free_buffers; 2787 2788 again: 2789 /* Make sure that the size fits aligned */ 2790 for (n = 0, ptr = buffers_start; n < nr_cpu_ids; n++) { 2791 ptr += sizeof(struct ring_buffer_cpu_meta) + 2792 sizeof(int) * nr_pages; 2793 ptr = ALIGN(ptr, subbuf_size); 2794 ptr += subbuf_size * nr_pages; 2795 } 2796 if (ptr > end) { 2797 if (nr_pages <= 3) 2798 goto fail_free_buffers; 2799 nr_pages--; 2800 goto again; 2801 } 2802 2803 /* nr_pages should not count the reader page */ 2804 nr_pages--; 2805 buffer->range_addr_start = start; 2806 buffer->range_addr_end = end; 2807 2808 rb_range_meta_init(buffer, nr_pages, scratch_size); 2809 } else if (remote) { 2810 struct ring_buffer_desc *desc = ring_buffer_desc(remote->desc, cpu); 2811 2812 buffer->remote = remote; 2813 /* The writer is remote. 
This ring-buffer is read-only */ 2814 atomic_inc(&buffer->record_disabled); 2815 nr_pages = desc->nr_page_va - 1; 2816 if (nr_pages < 2) 2817 goto fail_free_buffers; 2818 } else { 2819 2820 /* need at least two pages */ 2821 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2822 if (nr_pages < 2) 2823 nr_pages = 2; 2824 } 2825 2826 cpumask_set_cpu(cpu, buffer->cpumask); 2827 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 2828 if (!buffer->buffers[cpu]) 2829 goto fail_free_buffers; 2830 2831 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2832 if (ret < 0) 2833 goto fail_free_buffers; 2834 2835 mutex_init(&buffer->mutex); 2836 2837 /* Persistent ring buffer needs to flush cache before reboot. */ 2838 if (start && end) { 2839 buffer->flush_nb.notifier_call = rb_flush_buffer_cb; 2840 atomic_notifier_chain_register(&panic_notifier_list, &buffer->flush_nb); 2841 } 2842 2843 return_ptr(buffer); 2844 2845 fail_free_buffers: 2846 for_each_buffer_cpu(buffer, cpu) { 2847 if (buffer->buffers[cpu]) 2848 rb_free_cpu_buffer(buffer->buffers[cpu]); 2849 } 2850 kfree(buffer->buffers); 2851 2852 fail_free_cpumask: 2853 free_cpumask_var(buffer->cpumask); 2854 2855 return NULL; 2856 } 2857 2858 /** 2859 * __ring_buffer_alloc - allocate a new ring_buffer 2860 * @size: the size in bytes per cpu that is needed. 2861 * @flags: attributes to set for the ring buffer. 2862 * @key: ring buffer reader_lock_key. 2863 * 2864 * Currently the only flag that is available is the RB_FL_OVERWRITE 2865 * flag. This flag means that the buffer will overwrite old data 2866 * when the buffer wraps. If this flag is not set, the buffer will 2867 * drop data when the tail hits the head. 
2868 */ 2869 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 2870 struct lock_class_key *key) 2871 { 2872 /* Default buffer page size - one system page */ 2873 return alloc_buffer(size, flags, 0, 0, 0, 0, key, NULL); 2874 2875 } 2876 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 2877 2878 /** 2879 * __ring_buffer_alloc_range - allocate a new ring_buffer from existing memory 2880 * @size: the size in bytes per cpu that is needed. 2881 * @flags: attributes to set for the ring buffer. 2882 * @order: sub-buffer order 2883 * @start: start of allocated range 2884 * @range_size: size of allocated range 2885 * @scratch_size: size of scratch area (for preallocated memory buffers) 2886 * @key: ring buffer reader_lock_key. 2887 * 2888 * Currently the only flag that is available is the RB_FL_OVERWRITE 2889 * flag. This flag means that the buffer will overwrite old data 2890 * when the buffer wraps. If this flag is not set, the buffer will 2891 * drop data when the tail hits the head. 2892 */ 2893 struct trace_buffer *__ring_buffer_alloc_range(unsigned long size, unsigned flags, 2894 int order, unsigned long start, 2895 unsigned long range_size, 2896 unsigned long scratch_size, 2897 struct lock_class_key *key) 2898 { 2899 return alloc_buffer(size, flags, order, start, start + range_size, 2900 scratch_size, key, NULL); 2901 } 2902 2903 /** 2904 * __ring_buffer_alloc_remote - allocate a new ring_buffer from a remote 2905 * @remote: Contains a description of the ring-buffer pages and remote callbacks. 2906 * @key: ring buffer reader_lock_key. 
2907 */ 2908 struct trace_buffer *__ring_buffer_alloc_remote(struct ring_buffer_remote *remote, 2909 struct lock_class_key *key) 2910 { 2911 return alloc_buffer(0, 0, 0, 0, 0, 0, key, remote); 2912 } 2913 2914 void *ring_buffer_meta_scratch(struct trace_buffer *buffer, unsigned int *size) 2915 { 2916 struct ring_buffer_meta *meta; 2917 void *ptr; 2918 2919 if (!buffer || !buffer->meta) 2920 return NULL; 2921 2922 meta = buffer->meta; 2923 2924 ptr = (void *)ALIGN((unsigned long)meta + sizeof(*meta), sizeof(long)); 2925 2926 if (size) 2927 *size = (void *)meta + meta->buffers_offset - ptr; 2928 2929 return ptr; 2930 } 2931 2932 /** 2933 * ring_buffer_free - free a ring buffer. 2934 * @buffer: the buffer to free. 2935 */ 2936 void 2937 ring_buffer_free(struct trace_buffer *buffer) 2938 { 2939 int cpu; 2940 2941 if (buffer->range_addr_start && buffer->range_addr_end) 2942 atomic_notifier_chain_unregister(&panic_notifier_list, &buffer->flush_nb); 2943 2944 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2945 2946 irq_work_sync(&buffer->irq_work.work); 2947 2948 for_each_buffer_cpu(buffer, cpu) 2949 rb_free_cpu_buffer(buffer->buffers[cpu]); 2950 2951 kfree(buffer->buffers); 2952 free_cpumask_var(buffer->cpumask); 2953 2954 kfree(buffer); 2955 } 2956 EXPORT_SYMBOL_GPL(ring_buffer_free); 2957 2958 void ring_buffer_set_clock(struct trace_buffer *buffer, 2959 u64 (*clock)(void)) 2960 { 2961 buffer->clock = clock; 2962 } 2963 2964 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 2965 { 2966 buffer->time_stamp_abs = abs; 2967 } 2968 2969 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 2970 { 2971 return buffer->time_stamp_abs; 2972 } 2973 2974 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 2975 { 2976 return local_read(&bpage->entries) & RB_WRITE_MASK; 2977 } 2978 2979 static inline unsigned long rb_page_write(struct buffer_page *bpage) 2980 { 2981 return local_read(&bpage->write) & 
RB_WRITE_MASK;
}

/*
 * rb_remove_pages - unlink and free @nr_pages pages from the ring
 * @cpu_buffer: the per CPU buffer to shrink
 * @nr_pages: number of pages to remove
 *
 * The pages following the tail page are unlinked while holding the
 * reader lock with recording disabled; they are freed afterwards, once
 * recording has been resumed and the lock dropped.
 *
 * Returns true if exactly @nr_pages pages were freed.
 */
static bool
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
{
	struct list_head *tail_page, *to_remove, *next_page;
	struct buffer_page *to_remove_page, *tmp_iter_page;
	struct buffer_page *last_page, *first_page;
	unsigned long nr_removed;
	unsigned long head_bit;
	int page_entries;

	head_bit = 0;

	raw_spin_lock_irq(&cpu_buffer->reader_lock);
	atomic_inc(&cpu_buffer->record_disabled);
	/*
	 * We don't race with the readers since we have acquired the reader
	 * lock. We also don't race with writers after disabling recording.
	 * This makes it easy to figure out the first and the last page to be
	 * removed from the list. We unlink all the pages in between including
	 * the first and last pages. This is done in a busy loop so that we
	 * lose the least number of traces.
	 * The pages are freed after we restart recording and unlock readers.
	 */
	tail_page = &cpu_buffer->tail_page->list;

	/*
	 * tail page might be on reader page, we remove the next page
	 * from the ring buffer
	 */
	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
		tail_page = rb_list_head(tail_page->next);
	to_remove = tail_page;

	/* start of pages to remove */
	first_page = list_entry(rb_list_head(to_remove->next),
				struct buffer_page, list);

	/*
	 * Walk to the last page to remove, remembering whether any of the
	 * traversed links carried the RB_PAGE_HEAD flag.
	 */
	for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
		to_remove = rb_list_head(to_remove)->next;
		head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
	}
	/* Read iterators need to reset themselves when some pages removed */
	cpu_buffer->pages_removed += nr_removed;

	next_page = rb_list_head(to_remove)->next;

	/*
	 * Now we remove all pages between tail_page and next_page.
	 * Make sure that we have head_bit value preserved for the
	 * next page
	 */
	tail_page->next = (struct list_head *)((unsigned long)next_page |
						head_bit);
	next_page = rb_list_head(next_page);
	next_page->prev = tail_page;

	/* make sure pages points to a valid page in the ring buffer */
	cpu_buffer->pages = next_page;
	cpu_buffer->cnt++;

	/* update head page */
	if (head_bit)
		cpu_buffer->head_page = list_entry(next_page,
						struct buffer_page, list);

	/* pages are removed, resume tracing and then free the pages */
	atomic_dec(&cpu_buffer->record_disabled);
	raw_spin_unlock_irq(&cpu_buffer->reader_lock);

	RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));

	/* last buffer page to remove */
	last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
				list);
	tmp_iter_page = first_page;

	do {
		cond_resched();

		/* Step the iterator first: to_remove_page is freed below */
		to_remove_page = tmp_iter_page;
		rb_inc_page(&tmp_iter_page);

		/* update the counters */
		page_entries = rb_page_entries(to_remove_page);
		if (page_entries) {
			/*
			 * If something was added to this page, it was full
			 * since it is not the tail page. So we deduct the
			 * bytes consumed in ring buffer from here.
			 * Increment overrun to account for the lost events.
			 */
			local_add(page_entries, &cpu_buffer->overrun);
			local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes);
			local_inc(&cpu_buffer->pages_lost);
		}

		/*
		 * We have already removed references to this list item, just
		 * free up the buffer_page and its page
		 */
		free_buffer_page(to_remove_page);
		nr_removed--;

	} while (to_remove_page != last_page);

	/* Every page counted while unlinking must have been freed */
	RB_WARN_ON(cpu_buffer, nr_removed);

	return nr_removed == 0;
}

/*
 * rb_insert_pages - splice cpu_buffer->new_pages into the ring
 * @cpu_buffer: the per CPU buffer to grow
 *
 * The new pages are inserted between the head page and the page before
 * it. The insertion is attempted up to 10 times. If it does not
 * succeed, the pages on new_pages are freed.
 *
 * Returns true if the pages were inserted.
 */
static bool
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *pages = &cpu_buffer->new_pages;
	unsigned long flags;
	bool success;
	int retries;

	/* Can be called at early boot up, where interrupts must not be enabled */
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	/*
	 * We are holding the reader lock, so the reader page won't be swapped
	 * in the ring buffer. Now we are racing with the writer trying to
	 * move head page and the tail page.
	 * We are going to adapt the reader page update process where:
	 * 1. We first splice the start and end of list of new pages between
	 *    the head page and its previous page.
	 * 2. We cmpxchg the prev_page->next to point from head page to the
	 *    start of new pages list.
	 * 3. Finally, we update the head->prev to the end of new list.
	 *
	 * We will try this process 10 times, to make sure that we don't keep
	 * spinning.
	 */
	retries = 10;
	success = false;
	while (retries--) {
		struct list_head *head_page, *prev_page;
		struct list_head *last_page, *first_page;
		struct list_head *head_page_with_bit;
		struct buffer_page *hpage = rb_set_head_page(cpu_buffer);

		if (!hpage)
			break;
		head_page = &hpage->list;
		prev_page = head_page->prev;

		first_page = pages->next;
		last_page  = pages->prev;

		/* The value prev_page->next is expected to hold right now */
		head_page_with_bit = (struct list_head *)
				     ((unsigned long)head_page | RB_PAGE_HEAD);

		last_page->next = head_page_with_bit;
		first_page->prev = prev_page;

		/* caution: head_page_with_bit gets updated on cmpxchg failure */
		if (try_cmpxchg(&prev_page->next,
				&head_page_with_bit, first_page)) {
			/*
			 * yay, we replaced the page pointer to our new list,
			 * now, we just have to update to head page's prev
			 * pointer to point to end of list
			 */
			head_page->prev = last_page;
			cpu_buffer->cnt++;
			success = true;
			break;
		}
	}

	/* The pages now belong to the ring; leave new_pages empty */
	if (success)
		INIT_LIST_HEAD(pages);
	/*
	 * If we weren't successful in adding in new pages, warn and stop
	 * tracing
	 */
	RB_WARN_ON(cpu_buffer, !success);
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	/* free pages if they weren't inserted */
	if (!success) {
		struct buffer_page *bpage, *tmp;
		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
					 list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
	}
	return success;
}

/*
 * Apply the pending page count change stored in nr_pages_to_update:
 * a positive value inserts the pages on new_pages, otherwise that many
 * pages are removed. nr_pages is only adjusted when the change succeeded.
 */
static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	bool success;

	if (cpu_buffer->nr_pages_to_update > 0)
		success = rb_insert_pages(cpu_buffer);
	else
		success = rb_remove_pages(cpu_buffer,
					-cpu_buffer->nr_pages_to_update);

	if (success)
		cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
}

static void update_pages_handler(struct work_struct *work) 3191 { 3192 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 3193 struct ring_buffer_per_cpu, update_pages_work); 3194 rb_update_pages(cpu_buffer); 3195 complete(&cpu_buffer->update_done); 3196 } 3197 3198 /** 3199 * ring_buffer_resize - resize the ring buffer 3200 * @buffer: the buffer to resize. 3201 * @size: the new size. 3202 * @cpu_id: the cpu buffer to resize 3203 * 3204 * Minimum size is 2 * buffer->subbuf_size. 3205 * 3206 * Returns 0 on success and < 0 on failure. 3207 */ 3208 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 3209 int cpu_id) 3210 { 3211 struct ring_buffer_per_cpu *cpu_buffer; 3212 unsigned long nr_pages; 3213 int cpu, err; 3214 3215 /* 3216 * Always succeed at resizing a non-existent buffer: 3217 */ 3218 if (!buffer) 3219 return 0; 3220 3221 /* Make sure the requested buffer exists */ 3222 if (cpu_id != RING_BUFFER_ALL_CPUS && 3223 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 3224 return 0; 3225 3226 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 3227 3228 /* we need a minimum of two pages */ 3229 if (nr_pages < 2) 3230 nr_pages = 2; 3231 3232 /* 3233 * Keep CPUs from coming online while resizing to synchronize 3234 * with new per CPU buffers being created. 3235 */ 3236 guard(cpus_read_lock)(); 3237 3238 /* prevent another thread from changing buffer sizes */ 3239 mutex_lock(&buffer->mutex); 3240 atomic_inc(&buffer->resizing); 3241 3242 if (cpu_id == RING_BUFFER_ALL_CPUS) { 3243 /* 3244 * Don't succeed if resizing is disabled, as a reader might be 3245 * manipulating the ring buffer and is expecting a sane state while 3246 * this is true. 
3247 */ 3248 for_each_buffer_cpu(buffer, cpu) { 3249 cpu_buffer = buffer->buffers[cpu]; 3250 if (atomic_read(&cpu_buffer->resize_disabled)) { 3251 err = -EBUSY; 3252 goto out_err_unlock; 3253 } 3254 } 3255 3256 /* calculate the pages to update */ 3257 for_each_buffer_cpu(buffer, cpu) { 3258 cpu_buffer = buffer->buffers[cpu]; 3259 3260 cpu_buffer->nr_pages_to_update = nr_pages - 3261 cpu_buffer->nr_pages; 3262 /* 3263 * nothing more to do for removing pages or no update 3264 */ 3265 if (cpu_buffer->nr_pages_to_update <= 0) 3266 continue; 3267 /* 3268 * to add pages, make sure all new pages can be 3269 * allocated without receiving ENOMEM 3270 */ 3271 INIT_LIST_HEAD(&cpu_buffer->new_pages); 3272 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 3273 &cpu_buffer->new_pages)) { 3274 /* not enough memory for new pages */ 3275 err = -ENOMEM; 3276 goto out_err; 3277 } 3278 3279 cond_resched(); 3280 } 3281 3282 /* 3283 * Fire off all the required work handlers 3284 * We can't schedule on offline CPUs, but it's not necessary 3285 * since we can change their buffer sizes without any race. 3286 */ 3287 for_each_buffer_cpu(buffer, cpu) { 3288 cpu_buffer = buffer->buffers[cpu]; 3289 if (!cpu_buffer->nr_pages_to_update) 3290 continue; 3291 3292 /* Can't run something on an offline CPU. */ 3293 if (!cpu_online(cpu)) { 3294 rb_update_pages(cpu_buffer); 3295 cpu_buffer->nr_pages_to_update = 0; 3296 } else { 3297 /* Run directly if possible. 
*/ 3298 migrate_disable(); 3299 if (cpu != smp_processor_id()) { 3300 migrate_enable(); 3301 schedule_work_on(cpu, 3302 &cpu_buffer->update_pages_work); 3303 } else { 3304 update_pages_handler(&cpu_buffer->update_pages_work); 3305 migrate_enable(); 3306 } 3307 } 3308 } 3309 3310 /* wait for all the updates to complete */ 3311 for_each_buffer_cpu(buffer, cpu) { 3312 cpu_buffer = buffer->buffers[cpu]; 3313 if (!cpu_buffer->nr_pages_to_update) 3314 continue; 3315 3316 if (cpu_online(cpu)) 3317 wait_for_completion(&cpu_buffer->update_done); 3318 cpu_buffer->nr_pages_to_update = 0; 3319 } 3320 3321 } else { 3322 cpu_buffer = buffer->buffers[cpu_id]; 3323 3324 if (nr_pages == cpu_buffer->nr_pages) 3325 goto out; 3326 3327 /* 3328 * Don't succeed if resizing is disabled, as a reader might be 3329 * manipulating the ring buffer and is expecting a sane state while 3330 * this is true. 3331 */ 3332 if (atomic_read(&cpu_buffer->resize_disabled)) { 3333 err = -EBUSY; 3334 goto out_err_unlock; 3335 } 3336 3337 cpu_buffer->nr_pages_to_update = nr_pages - 3338 cpu_buffer->nr_pages; 3339 3340 INIT_LIST_HEAD(&cpu_buffer->new_pages); 3341 if (cpu_buffer->nr_pages_to_update > 0 && 3342 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 3343 &cpu_buffer->new_pages)) { 3344 err = -ENOMEM; 3345 goto out_err; 3346 } 3347 3348 /* Can't run something on an offline CPU. */ 3349 if (!cpu_online(cpu_id)) 3350 rb_update_pages(cpu_buffer); 3351 else { 3352 /* Run directly if possible. 
*/ 3353 migrate_disable(); 3354 if (cpu_id == smp_processor_id()) { 3355 rb_update_pages(cpu_buffer); 3356 migrate_enable(); 3357 } else { 3358 migrate_enable(); 3359 schedule_work_on(cpu_id, 3360 &cpu_buffer->update_pages_work); 3361 wait_for_completion(&cpu_buffer->update_done); 3362 } 3363 } 3364 3365 cpu_buffer->nr_pages_to_update = 0; 3366 } 3367 3368 out: 3369 /* 3370 * The ring buffer resize can happen with the ring buffer 3371 * enabled, so that the update disturbs the tracing as little 3372 * as possible. But if the buffer is disabled, we do not need 3373 * to worry about that, and we can take the time to verify 3374 * that the buffer is not corrupt. 3375 */ 3376 if (atomic_read(&buffer->record_disabled)) { 3377 atomic_inc(&buffer->record_disabled); 3378 /* 3379 * Even though the buffer was disabled, we must make sure 3380 * that it is truly disabled before calling rb_check_pages. 3381 * There could have been a race between checking 3382 * record_disable and incrementing it. 3383 */ 3384 synchronize_rcu(); 3385 for_each_buffer_cpu(buffer, cpu) { 3386 cpu_buffer = buffer->buffers[cpu]; 3387 rb_check_pages(cpu_buffer); 3388 } 3389 atomic_dec(&buffer->record_disabled); 3390 } 3391 3392 atomic_dec(&buffer->resizing); 3393 mutex_unlock(&buffer->mutex); 3394 return 0; 3395 3396 out_err: 3397 for_each_buffer_cpu(buffer, cpu) { 3398 struct buffer_page *bpage, *tmp; 3399 3400 cpu_buffer = buffer->buffers[cpu]; 3401 cpu_buffer->nr_pages_to_update = 0; 3402 3403 if (list_empty(&cpu_buffer->new_pages)) 3404 continue; 3405 3406 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 3407 list) { 3408 list_del_init(&bpage->list); 3409 free_buffer_page(bpage); 3410 3411 cond_resched(); 3412 } 3413 } 3414 out_err_unlock: 3415 atomic_dec(&buffer->resizing); 3416 mutex_unlock(&buffer->mutex); 3417 return err; 3418 } 3419 EXPORT_SYMBOL_GPL(ring_buffer_resize); 3420 3421 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 3422 { 3423 
mutex_lock(&buffer->mutex); 3424 if (val) 3425 buffer->flags |= RB_FL_OVERWRITE; 3426 else 3427 buffer->flags &= ~RB_FL_OVERWRITE; 3428 mutex_unlock(&buffer->mutex); 3429 } 3430 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 3431 3432 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 3433 { 3434 return bpage->page->data + index; 3435 } 3436 3437 static __always_inline struct ring_buffer_event * 3438 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 3439 { 3440 return __rb_page_index(cpu_buffer->reader_page, 3441 cpu_buffer->reader_page->read); 3442 } 3443 3444 static struct ring_buffer_event * 3445 rb_iter_head_event(struct ring_buffer_iter *iter) 3446 { 3447 struct ring_buffer_event *event; 3448 struct buffer_page *iter_head_page = iter->head_page; 3449 unsigned long commit; 3450 unsigned length; 3451 3452 if (iter->head != iter->next_event) 3453 return iter->event; 3454 3455 /* 3456 * When the writer goes across pages, it issues a cmpxchg which 3457 * is a mb(), which will synchronize with the rmb here. 3458 * (see rb_tail_page_update() and __rb_reserve_next()) 3459 */ 3460 commit = rb_page_size(iter_head_page); 3461 smp_rmb(); 3462 3463 /* An event needs to be at least 8 bytes in size */ 3464 if (iter->head > commit - 8) 3465 goto reset; 3466 3467 event = __rb_page_index(iter_head_page, iter->head); 3468 length = rb_event_length(event); 3469 3470 /* 3471 * READ_ONCE() doesn't work on functions and we don't want the 3472 * compiler doing any crazy optimizations with length. 3473 */ 3474 barrier(); 3475 3476 if ((iter->head + length) > commit || length > iter->event_size) 3477 /* Writer corrupted the read? */ 3478 goto reset; 3479 3480 memcpy(iter->event, event, length); 3481 /* 3482 * If the page stamp is still the same after this rmb() then the 3483 * event was safely copied without the writer entering the page. 
3484 */ 3485 smp_rmb(); 3486 3487 /* Make sure the page didn't change since we read this */ 3488 if (iter->page_stamp != iter_head_page->page->time_stamp || 3489 commit > rb_page_size(iter_head_page)) 3490 goto reset; 3491 3492 iter->next_event = iter->head + length; 3493 return iter->event; 3494 reset: 3495 /* Reset to the beginning */ 3496 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3497 iter->head = 0; 3498 iter->next_event = 0; 3499 iter->missed_events = 1; 3500 return NULL; 3501 } 3502 3503 static __always_inline unsigned 3504 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 3505 { 3506 return rb_page_commit(cpu_buffer->commit_page); 3507 } 3508 3509 static __always_inline unsigned 3510 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 3511 { 3512 unsigned long addr = (unsigned long)event; 3513 3514 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 3515 3516 return addr - BUF_PAGE_HDR_SIZE; 3517 } 3518 3519 static void rb_inc_iter(struct ring_buffer_iter *iter) 3520 { 3521 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3522 3523 /* 3524 * The iterator could be on the reader page (it starts there). 3525 * But the head could have moved, since the reader was 3526 * found. Check for this case and assign the iterator 3527 * to the head page instead of next. 
3528 */ 3529 if (iter->head_page == cpu_buffer->reader_page) 3530 iter->head_page = rb_set_head_page(cpu_buffer); 3531 else 3532 rb_inc_page(&iter->head_page); 3533 3534 if (rb_page_commit(iter->head_page) & RB_MISSED_EVENTS) 3535 iter->missed_events = -1; 3536 3537 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3538 iter->head = 0; 3539 iter->next_event = 0; 3540 } 3541 3542 /* Return the index into the sub-buffers for a given sub-buffer */ 3543 static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf) 3544 { 3545 void *subbuf_array; 3546 3547 subbuf_array = (void *)meta + sizeof(int) * meta->nr_subbufs; 3548 subbuf_array = (void *)ALIGN((unsigned long)subbuf_array, meta->subbuf_size); 3549 return (subbuf - subbuf_array) / meta->subbuf_size; 3550 } 3551 3552 static void rb_update_meta_head(struct ring_buffer_per_cpu *cpu_buffer, 3553 struct buffer_page *next_page) 3554 { 3555 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3556 unsigned long old_head = (unsigned long)next_page->page; 3557 unsigned long new_head; 3558 3559 rb_inc_page(&next_page); 3560 new_head = (unsigned long)next_page->page; 3561 3562 /* 3563 * Only move it forward once, if something else came in and 3564 * moved it forward, then we don't want to touch it. 
3565 */ 3566 (void)cmpxchg(&meta->head_buffer, old_head, new_head); 3567 } 3568 3569 static void rb_update_meta_reader(struct ring_buffer_per_cpu *cpu_buffer, 3570 struct buffer_page *reader) 3571 { 3572 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3573 void *old_reader = cpu_buffer->reader_page->page; 3574 void *new_reader = reader->page; 3575 int id; 3576 3577 id = reader->id; 3578 cpu_buffer->reader_page->id = id; 3579 reader->id = 0; 3580 3581 meta->buffers[0] = rb_meta_subbuf_idx(meta, new_reader); 3582 meta->buffers[id] = rb_meta_subbuf_idx(meta, old_reader); 3583 3584 /* The head pointer is the one after the reader */ 3585 rb_update_meta_head(cpu_buffer, reader); 3586 } 3587 3588 /* 3589 * rb_handle_head_page - writer hit the head page 3590 * 3591 * Returns: +1 to retry page 3592 * 0 to continue 3593 * -1 on error 3594 */ 3595 static int 3596 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 3597 struct buffer_page *tail_page, 3598 struct buffer_page *next_page) 3599 { 3600 struct buffer_page *new_head; 3601 int entries; 3602 int type; 3603 int ret; 3604 3605 entries = rb_page_entries(next_page); 3606 3607 /* 3608 * The hard part is here. We need to move the head 3609 * forward, and protect against both readers on 3610 * other CPUs and writers coming in via interrupts. 3611 */ 3612 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 3613 RB_PAGE_HEAD); 3614 3615 /* 3616 * type can be one of four: 3617 * NORMAL - an interrupt already moved it for us 3618 * HEAD - we are the first to get here. 3619 * UPDATE - we are the interrupt interrupting 3620 * a current move. 3621 * MOVED - a reader on another CPU moved the next 3622 * pointer to its reader page. Give up 3623 * and try again. 3624 */ 3625 3626 switch (type) { 3627 case RB_PAGE_HEAD: 3628 /* 3629 * We changed the head to UPDATE, thus 3630 * it is our responsibility to update 3631 * the counters. 
3632 */ 3633 local_add(entries, &cpu_buffer->overrun); 3634 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes); 3635 local_inc(&cpu_buffer->pages_lost); 3636 3637 if (cpu_buffer->ring_meta) 3638 rb_update_meta_head(cpu_buffer, next_page); 3639 /* 3640 * The entries will be zeroed out when we move the 3641 * tail page. 3642 */ 3643 3644 /* still more to do */ 3645 break; 3646 3647 case RB_PAGE_UPDATE: 3648 /* 3649 * This is an interrupt that interrupt the 3650 * previous update. Still more to do. 3651 */ 3652 break; 3653 case RB_PAGE_NORMAL: 3654 /* 3655 * An interrupt came in before the update 3656 * and processed this for us. 3657 * Nothing left to do. 3658 */ 3659 return 1; 3660 case RB_PAGE_MOVED: 3661 /* 3662 * The reader is on another CPU and just did 3663 * a swap with our next_page. 3664 * Try again. 3665 */ 3666 return 1; 3667 default: 3668 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 3669 return -1; 3670 } 3671 3672 /* 3673 * Now that we are here, the old head pointer is 3674 * set to UPDATE. This will keep the reader from 3675 * swapping the head page with the reader page. 3676 * The reader (on another CPU) will spin till 3677 * we are finished. 3678 * 3679 * We just need to protect against interrupts 3680 * doing the job. We will set the next pointer 3681 * to HEAD. After that, we set the old pointer 3682 * to NORMAL, but only if it was HEAD before. 3683 * otherwise we are an interrupt, and only 3684 * want the outer most commit to reset it. 3685 */ 3686 new_head = next_page; 3687 rb_inc_page(&new_head); 3688 3689 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 3690 RB_PAGE_NORMAL); 3691 3692 /* 3693 * Valid returns are: 3694 * HEAD - an interrupt came in and already set it. 3695 * NORMAL - One of two things: 3696 * 1) We really set it. 3697 * 2) A bunch of interrupts came in and moved 3698 * the page forward again. 
3699 */ 3700 switch (ret) { 3701 case RB_PAGE_HEAD: 3702 case RB_PAGE_NORMAL: 3703 /* OK */ 3704 break; 3705 default: 3706 RB_WARN_ON(cpu_buffer, 1); 3707 return -1; 3708 } 3709 3710 /* 3711 * It is possible that an interrupt came in, 3712 * set the head up, then more interrupts came in 3713 * and moved it again. When we get back here, 3714 * the page would have been set to NORMAL but we 3715 * just set it back to HEAD. 3716 * 3717 * How do you detect this? Well, if that happened 3718 * the tail page would have moved. 3719 */ 3720 if (ret == RB_PAGE_NORMAL) { 3721 struct buffer_page *buffer_tail_page; 3722 3723 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 3724 /* 3725 * If the tail had moved passed next, then we need 3726 * to reset the pointer. 3727 */ 3728 if (buffer_tail_page != tail_page && 3729 buffer_tail_page != next_page) 3730 rb_head_page_set_normal(cpu_buffer, new_head, 3731 next_page, 3732 RB_PAGE_HEAD); 3733 } 3734 3735 /* 3736 * If this was the outer most commit (the one that 3737 * changed the original pointer from HEAD to UPDATE), 3738 * then it is up to us to reset it to NORMAL. 3739 */ 3740 if (type == RB_PAGE_HEAD) { 3741 ret = rb_head_page_set_normal(cpu_buffer, next_page, 3742 tail_page, 3743 RB_PAGE_UPDATE); 3744 if (RB_WARN_ON(cpu_buffer, 3745 ret != RB_PAGE_UPDATE)) 3746 return -1; 3747 } 3748 3749 return 0; 3750 } 3751 3752 static inline void 3753 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 3754 unsigned long tail, struct rb_event_info *info) 3755 { 3756 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 3757 struct buffer_page *tail_page = info->tail_page; 3758 struct ring_buffer_event *event; 3759 unsigned long length = info->length; 3760 3761 /* 3762 * Only the event that crossed the page boundary 3763 * must fill the old tail_page with padding. 3764 */ 3765 if (tail >= bsize) { 3766 /* 3767 * If the page was filled, then we still need 3768 * to update the real_end. 
Reset it to zero 3769 * and the reader will ignore it. 3770 */ 3771 if (tail == bsize) 3772 tail_page->real_end = 0; 3773 3774 local_sub(length, &tail_page->write); 3775 return; 3776 } 3777 3778 event = __rb_page_index(tail_page, tail); 3779 3780 /* 3781 * Save the original length to the meta data. 3782 * This will be used by the reader to add lost event 3783 * counter. 3784 */ 3785 tail_page->real_end = tail; 3786 3787 /* 3788 * If this event is bigger than the minimum size, then 3789 * we need to be careful that we don't subtract the 3790 * write counter enough to allow another writer to slip 3791 * in on this page. 3792 * We put in a discarded commit instead, to make sure 3793 * that this space is not used again, and this space will 3794 * not be accounted into 'entries_bytes'. 3795 * 3796 * If we are less than the minimum size, we don't need to 3797 * worry about it. 3798 */ 3799 if (tail > (bsize - RB_EVNT_MIN_SIZE)) { 3800 /* No room for any events */ 3801 3802 /* Mark the rest of the page with padding */ 3803 rb_event_set_padding(event); 3804 3805 /* Make sure the padding is visible before the write update */ 3806 smp_wmb(); 3807 3808 /* Set the write back to the previous setting */ 3809 local_sub(length, &tail_page->write); 3810 return; 3811 } 3812 3813 /* Put in a discarded event */ 3814 event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE; 3815 event->type_len = RINGBUF_TYPE_PADDING; 3816 /* time delta must be non zero */ 3817 event->time_delta = 1; 3818 3819 /* account for padding bytes */ 3820 local_add(bsize - tail, &cpu_buffer->entries_bytes); 3821 3822 /* Make sure the padding is visible before the tail_page->write update */ 3823 smp_wmb(); 3824 3825 /* Set write to end of buffer */ 3826 length = (tail + length) - bsize; 3827 local_sub(length, &tail_page->write); 3828 } 3829 3830 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 3831 3832 /* 3833 * This is the slow path, force gcc not to inline it. 
3834 */ 3835 static noinline struct ring_buffer_event * 3836 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 3837 unsigned long tail, struct rb_event_info *info) 3838 { 3839 struct buffer_page *tail_page = info->tail_page; 3840 struct buffer_page *commit_page = cpu_buffer->commit_page; 3841 struct trace_buffer *buffer = cpu_buffer->buffer; 3842 struct buffer_page *next_page; 3843 int ret; 3844 3845 next_page = tail_page; 3846 3847 rb_inc_page(&next_page); 3848 3849 /* 3850 * If for some reason, we had an interrupt storm that made 3851 * it all the way around the buffer, bail, and warn 3852 * about it. 3853 */ 3854 if (unlikely(next_page == commit_page)) { 3855 local_inc(&cpu_buffer->commit_overrun); 3856 goto out_reset; 3857 } 3858 3859 /* 3860 * This is where the fun begins! 3861 * 3862 * We are fighting against races between a reader that 3863 * could be on another CPU trying to swap its reader 3864 * page with the buffer head. 3865 * 3866 * We are also fighting against interrupts coming in and 3867 * moving the head or tail on us as well. 3868 * 3869 * If the next page is the head page then we have filled 3870 * the buffer, unless the commit page is still on the 3871 * reader page. 3872 */ 3873 if (rb_is_head_page(next_page, &tail_page->list)) { 3874 3875 /* 3876 * If the commit is not on the reader page, then 3877 * move the header page. 3878 */ 3879 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 3880 /* 3881 * If we are not in overwrite mode, 3882 * this is easy, just stop here. 3883 */ 3884 if (!(buffer->flags & RB_FL_OVERWRITE)) { 3885 local_inc(&cpu_buffer->dropped_events); 3886 goto out_reset; 3887 } 3888 3889 ret = rb_handle_head_page(cpu_buffer, 3890 tail_page, 3891 next_page); 3892 if (ret < 0) 3893 goto out_reset; 3894 if (ret) 3895 goto out_again; 3896 } else { 3897 /* 3898 * We need to be careful here too. The 3899 * commit page could still be on the reader 3900 * page. 
We could have a small buffer, and 3901 * have filled up the buffer with events 3902 * from interrupts and such, and wrapped. 3903 * 3904 * Note, if the tail page is also on the 3905 * reader_page, we let it move out. 3906 */ 3907 if (unlikely((cpu_buffer->commit_page != 3908 cpu_buffer->tail_page) && 3909 (cpu_buffer->commit_page == 3910 cpu_buffer->reader_page))) { 3911 local_inc(&cpu_buffer->commit_overrun); 3912 goto out_reset; 3913 } 3914 } 3915 } 3916 3917 rb_tail_page_update(cpu_buffer, tail_page, next_page); 3918 3919 out_again: 3920 3921 rb_reset_tail(cpu_buffer, tail, info); 3922 3923 /* Commit what we have for now. */ 3924 rb_end_commit(cpu_buffer); 3925 /* rb_end_commit() decs committing */ 3926 local_inc(&cpu_buffer->committing); 3927 3928 /* fail and let the caller try again */ 3929 return ERR_PTR(-EAGAIN); 3930 3931 out_reset: 3932 /* reset write */ 3933 rb_reset_tail(cpu_buffer, tail, info); 3934 3935 return NULL; 3936 } 3937 3938 /* Slow path */ 3939 static struct ring_buffer_event * 3940 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 3941 struct ring_buffer_event *event, u64 delta, bool abs) 3942 { 3943 if (abs) 3944 event->type_len = RINGBUF_TYPE_TIME_STAMP; 3945 else 3946 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 3947 3948 /* Not the first event on the page, or not delta? */ 3949 if (abs || rb_event_index(cpu_buffer, event)) { 3950 event->time_delta = delta & TS_MASK; 3951 event->array[0] = delta >> TS_SHIFT; 3952 } else { 3953 /* nope, just zero it */ 3954 event->time_delta = 0; 3955 event->array[0] = 0; 3956 } 3957 3958 return skip_time_extend(event); 3959 } 3960 3961 static void 3962 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3963 struct rb_event_info *info) 3964 { 3965 u64 write_stamp; 3966 3967 WARN_ONCE(1, "Delta way too big! 
%llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 3968 (unsigned long long)info->delta, 3969 (unsigned long long)info->ts, 3970 (unsigned long long)info->before, 3971 (unsigned long long)info->after, 3972 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}), 3973 sched_clock_stable() ? "" : 3974 "If you just came from a suspend/resume,\n" 3975 "please switch to the trace global clock:\n" 3976 " echo global > /sys/kernel/tracing/trace_clock\n" 3977 "or add trace_clock=global to the kernel command line\n"); 3978 } 3979 3980 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3981 struct ring_buffer_event **event, 3982 struct rb_event_info *info, 3983 u64 *delta, 3984 unsigned int *length) 3985 { 3986 bool abs = info->add_timestamp & 3987 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 3988 3989 if (unlikely(info->delta > (1ULL << 59))) { 3990 /* 3991 * Some timers can use more than 59 bits, and when a timestamp 3992 * is added to the buffer, it will lose those bits. 3993 */ 3994 if (abs && (info->ts & TS_MSB)) { 3995 info->delta &= ABS_TS_MASK; 3996 3997 /* did the clock go backwards */ 3998 } else if (info->before == info->after && info->before > info->ts) { 3999 /* not interrupted */ 4000 static int once; 4001 4002 /* 4003 * This is possible with a recalibrating of the TSC. 4004 * Do not produce a call stack, but just report it. 
4005 */ 4006 if (!once) { 4007 once++; 4008 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 4009 info->before, info->ts); 4010 } 4011 } else 4012 rb_check_timestamp(cpu_buffer, info); 4013 if (!abs) 4014 info->delta = 0; 4015 } 4016 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs); 4017 *length -= RB_LEN_TIME_EXTEND; 4018 *delta = 0; 4019 } 4020 4021 /** 4022 * rb_update_event - update event type and data 4023 * @cpu_buffer: The per cpu buffer of the @event 4024 * @event: the event to update 4025 * @info: The info to update the @event with (contains length and delta) 4026 * 4027 * Update the type and data fields of the @event. The length 4028 * is the actual size that is written to the ring buffer, 4029 * and with this, we can determine what to place into the 4030 * data field. 4031 */ 4032 static void 4033 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 4034 struct ring_buffer_event *event, 4035 struct rb_event_info *info) 4036 { 4037 unsigned length = info->length; 4038 u64 delta = info->delta; 4039 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 4040 4041 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 4042 cpu_buffer->event_stamp[nest] = info->ts; 4043 4044 /* 4045 * If we need to add a timestamp, then we 4046 * add it to the start of the reserved space. 
4047 */ 4048 if (unlikely(info->add_timestamp)) 4049 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 4050 4051 event->time_delta = delta; 4052 length -= RB_EVNT_HDR_SIZE; 4053 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 4054 event->type_len = 0; 4055 event->array[0] = length; 4056 } else 4057 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 4058 } 4059 4060 static unsigned rb_calculate_event_length(unsigned length) 4061 { 4062 struct ring_buffer_event event; /* Used only for sizeof array */ 4063 4064 /* zero length can cause confusions */ 4065 if (!length) 4066 length++; 4067 4068 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 4069 length += sizeof(event.array[0]); 4070 4071 length += RB_EVNT_HDR_SIZE; 4072 length = ALIGN(length, RB_ARCH_ALIGNMENT); 4073 4074 /* 4075 * In case the time delta is larger than the 27 bits for it 4076 * in the header, we need to add a timestamp. If another 4077 * event comes in when trying to discard this one to increase 4078 * the length, then the timestamp will be added in the allocated 4079 * space of this event. If length is bigger than the size needed 4080 * for the TIME_EXTEND, then padding has to be used. The events 4081 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 4082 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 4083 * As length is a multiple of 4, we only need to worry if it 4084 * is 12 (RB_LEN_TIME_EXTEND + 4). 
4085 */ 4086 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 4087 length += RB_ALIGNMENT; 4088 4089 return length; 4090 } 4091 4092 static inline bool 4093 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 4094 struct ring_buffer_event *event) 4095 { 4096 unsigned long new_index, old_index; 4097 struct buffer_page *bpage; 4098 unsigned long addr; 4099 4100 new_index = rb_event_index(cpu_buffer, event); 4101 old_index = new_index + rb_event_ts_length(event); 4102 addr = (unsigned long)event; 4103 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 4104 4105 bpage = READ_ONCE(cpu_buffer->tail_page); 4106 4107 /* 4108 * Make sure the tail_page is still the same and 4109 * the next write location is the end of this event 4110 */ 4111 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 4112 unsigned long write_mask = 4113 local_read(&bpage->write) & ~RB_WRITE_MASK; 4114 unsigned long event_length = rb_event_length(event); 4115 4116 /* 4117 * For the before_stamp to be different than the write_stamp 4118 * to make sure that the next event adds an absolute 4119 * value and does not rely on the saved write stamp, which 4120 * is now going to be bogus. 4121 * 4122 * By setting the before_stamp to zero, the next event 4123 * is not going to use the write_stamp and will instead 4124 * create an absolute timestamp. This means there's no 4125 * reason to update the wirte_stamp! 4126 */ 4127 rb_time_set(&cpu_buffer->before_stamp, 0); 4128 4129 /* 4130 * If an event were to come in now, it would see that the 4131 * write_stamp and the before_stamp are different, and assume 4132 * that this event just added itself before updating 4133 * the write stamp. The interrupting event will fix the 4134 * write stamp for us, and use an absolute timestamp. 4135 */ 4136 4137 /* 4138 * This is on the tail page. It is possible that 4139 * a write could come in and move the tail page 4140 * and write to the next page. 
That is fine
		 * because we just shorten what is on this page.
		 */
		old_index += write_mask;
		new_index += write_mask;

		/* caution: old_index gets updated on cmpxchg failure */
		if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) {
			/* update counters */
			local_sub(event_length, &cpu_buffer->entries_bytes);
			return true;
		}
	}

	/* could not discard */
	return false;
}

/*
 * Mark the start of a commit on this CPU buffer by bumping both the
 * "committing" (currently in flight) and "commits" (running total)
 * counters. rb_end_commit() compares against these.
 */
static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	local_inc(&cpu_buffer->committing);
	local_inc(&cpu_buffer->commits);
}

/*
 * Move the commit forward until it matches the write position:
 * first walk commit_page up to tail_page, publishing each page's write
 * index as its commit, then bring the commit index of the final page up
 * to that page's write index. The whole sequence is retried if the
 * tail page moved in the meantime.
 */
static __always_inline void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long max_count;

	/*
	 * We only race with interrupts and NMIs on this CPU.
	 * If we own the commit event, then we can commit
	 * all others that interrupted us, since the interruptions
	 * are in stack format (they finish before they come
	 * back to us). This allows us to do a simple loop to
	 * assign the commit to the tail.
	 */
 again:
	/* Upper bound on page steps before the loop is declared broken */
	max_count = cpu_buffer->nr_pages * 100;

	while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
		if (RB_WARN_ON(cpu_buffer, !(--max_count)))
			return;
		if (RB_WARN_ON(cpu_buffer,
			       rb_is_reader_page(cpu_buffer->tail_page)))
			return;
		/*
		 * No need for a memory barrier here, as the update
		 * of the tail_page did it for this page.
		 */
		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		rb_inc_page(&cpu_buffer->commit_page);
		/* Mirror the new commit page into the meta data, when present */
		if (cpu_buffer->ring_meta) {
			struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
			meta->commit_buffer = (unsigned long)cpu_buffer->commit_page->page;
		}
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {

		/* Make sure the readers see the content of what is committed. */
		smp_wmb();
		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		RB_WARN_ON(cpu_buffer,
			   rb_page_commit(cpu_buffer->commit_page) & ~RB_WRITE_MASK);
		barrier();
	}

	/* again, keep gcc from optimizing */
	barrier();

	/*
	 * If an interrupt came in just after the first while loop
	 * and pushed the tail page forward, we will be left with
	 * a dangling commit that will never go forward.
	 */
	if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
		goto again;
}

/*
 * Finish a commit started with rb_start_commit(). Only when the
 * "committing" counter reads 1 is the commit position pushed forward
 * to the write position. The counter is then dropped, and the sequence
 * is redone if "commits" changed meanwhile and nobody else is left
 * committing.
 */
static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long commits;

	/* Ending a commit that was never started is a bug */
	if (RB_WARN_ON(cpu_buffer,
		       !local_read(&cpu_buffer->committing)))
		return;

 again:
	/* Snapshot the total so a commit that sneaks in can be detected below */
	commits = local_read(&cpu_buffer->commits);
	/* synchronize with interrupts */
	barrier();
	if (local_read(&cpu_buffer->committing) == 1)
		rb_set_commit_to_write(cpu_buffer);

	local_dec(&cpu_buffer->committing);

	/* synchronize with interrupts */
	barrier();

	/*
	 * Need to account for interrupts coming in between the
	 * updating of the commit page and the clearing of the
	 * committing counter.
	 */
	if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
	    !local_read(&cpu_buffer->committing)) {
		local_inc(&cpu_buffer->committing);
		goto again;
	}
}

/*
 * Turn an event into padding in place. A leading time extend is
 * skipped first, so that the event following it is the one converted.
 */
static inline void rb_event_discard(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);

	/* array[0] holds the actual length for the discarded event */
	event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
	event->type_len = RINGBUF_TYPE_PADDING;
	/* time delta must be non zero */
	if (!event->time_delta)
		event->time_delta = 1;
}

/* Account for one more entry on this CPU buffer and end the commit. */
static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	local_inc(&cpu_buffer->entries);
	rb_end_commit(cpu_buffer);
}

/*
 * Queue the wakeup irq_work. In NMI context it is queued with
 * irq_work_queue(); otherwise it is queued on the CPU returned by
 * housekeeping_any_cpu(HK_TYPE_KERNEL_NOISE).
 *
 * Returns what the irq_work queueing function returned.
 */
static bool
rb_irq_work_queue(struct rb_irq_work *irq_work)
{
	int cpu;

	/* irq_work_queue_on() is not NMI-safe */
	if (unlikely(in_nmi()))
		return irq_work_queue(&irq_work->work);

	/*
	 * If CPU isolation is not active, cpu is always the current
	 * CPU, and the following is equivalent to irq_work_queue().
	 */
	cpu = housekeeping_any_cpu(HK_TYPE_KERNEL_NOISE);
	return irq_work_queue_on(&irq_work->work, cpu);
}

/*
 * Kick any pending waiters after a write: first the waiters on the whole
 * buffer, then the waiters on this CPU buffer, and finally the "full"
 * waiters, but only once full_hit() reports that the requested
 * fill level (shortest_full) has been reached.
 */
static __always_inline void
rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
{
	if (buffer->irq_work.waiters_pending) {
		buffer->irq_work.waiters_pending = false;
		/* irq_work_queue() supplies its own memory barriers */
		rb_irq_work_queue(&buffer->irq_work);
	}

	if (cpu_buffer->irq_work.waiters_pending) {
		cpu_buffer->irq_work.waiters_pending = false;
		/* irq_work_queue() supplies its own memory barriers */
		rb_irq_work_queue(&cpu_buffer->irq_work);
	}

	/* No new page was touched since the last check: nothing changed */
	if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
		return;

	if (cpu_buffer->reader_page == cpu_buffer->commit_page)
		return;

	if (!cpu_buffer->irq_work.full_waiters_pending)
		return;

	cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);

	if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
		return;

	cpu_buffer->irq_work.wakeup_full = true;
	cpu_buffer->irq_work.full_waiters_pending = false;
	/* irq_work_queue() supplies its own memory barriers */
	rb_irq_work_queue(&cpu_buffer->irq_work);
}

/* Report a detected recursion only when the recording option is built in */
#ifdef CONFIG_RING_BUFFER_RECORD_RECURSION
# define do_ring_buffer_record_recursion()	\
	do_ftrace_record_recursion(_THIS_IP_, _RET_IP_)
#else
# define do_ring_buffer_record_recursion() do { } while (0)
#endif

/*
 * The lock and unlock are done within a preempt disable section.
 * The current_context per_cpu variable can only be modified
 * by the current task between lock and unlock. But it can
 * be modified more than once via an interrupt.
To pass this 4339 * information from the lock to the unlock without having to 4340 * access the 'in_interrupt()' functions again (which do show 4341 * a bit of overhead in something as critical as function tracing, 4342 * we use a bitmask trick. 4343 * 4344 * bit 1 = NMI context 4345 * bit 2 = IRQ context 4346 * bit 3 = SoftIRQ context 4347 * bit 4 = normal context. 4348 * 4349 * This works because this is the order of contexts that can 4350 * preempt other contexts. A SoftIRQ never preempts an IRQ 4351 * context. 4352 * 4353 * When the context is determined, the corresponding bit is 4354 * checked and set (if it was set, then a recursion of that context 4355 * happened). 4356 * 4357 * On unlock, we need to clear this bit. To do so, just subtract 4358 * 1 from the current_context and AND it to itself. 4359 * 4360 * (binary) 4361 * 101 - 1 = 100 4362 * 101 & 100 = 100 (clearing bit zero) 4363 * 4364 * 1010 - 1 = 1001 4365 * 1010 & 1001 = 1000 (clearing bit 1) 4366 * 4367 * The least significant bit can be cleared this way, and it 4368 * just so happens that it is the same bit corresponding to 4369 * the current context. 4370 * 4371 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 4372 * is set when a recursion is detected at the current context, and if 4373 * the TRANSITION bit is already set, it will fail the recursion. 4374 * This is needed because there's a lag between the changing of 4375 * interrupt context and updating the preempt count. In this case, 4376 * a false positive will be found. To handle this, one extra recursion 4377 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 4378 * bit is already set, then it is considered a recursion and the function 4379 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 4380 * 4381 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 4382 * to be cleared. Even if it wasn't the context that set it. 
That is, 4383 * if an interrupt comes in while NORMAL bit is set and the ring buffer 4384 * is called before preempt_count() is updated, since the check will 4385 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 4386 * NMI then comes in, it will set the NMI bit, but when the NMI code 4387 * does the trace_recursive_unlock() it will clear the TRANSITION bit 4388 * and leave the NMI bit set. But this is fine, because the interrupt 4389 * code that set the TRANSITION bit will then clear the NMI bit when it 4390 * calls trace_recursive_unlock(). If another NMI comes in, it will 4391 * set the TRANSITION bit and continue. 4392 * 4393 * Note: The TRANSITION bit only handles a single transition between context. 4394 */ 4395 4396 static __always_inline bool 4397 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 4398 { 4399 unsigned int val = cpu_buffer->current_context; 4400 int bit = interrupt_context_level(); 4401 4402 bit = RB_CTX_NORMAL - bit; 4403 4404 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 4405 /* 4406 * It is possible that this was called by transitioning 4407 * between interrupt context, and preempt_count() has not 4408 * been updated yet. In this case, use the TRANSITION bit. 
4409 */ 4410 bit = RB_CTX_TRANSITION; 4411 if (val & (1 << (bit + cpu_buffer->nest))) { 4412 do_ring_buffer_record_recursion(); 4413 return true; 4414 } 4415 } 4416 4417 val |= (1 << (bit + cpu_buffer->nest)); 4418 cpu_buffer->current_context = val; 4419 4420 return false; 4421 } 4422 4423 static __always_inline void 4424 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 4425 { 4426 cpu_buffer->current_context &= 4427 cpu_buffer->current_context - (1 << cpu_buffer->nest); 4428 } 4429 4430 /* The recursive locking above uses 5 bits */ 4431 #define NESTED_BITS 5 4432 4433 /** 4434 * ring_buffer_nest_start - Allow to trace while nested 4435 * @buffer: The ring buffer to modify 4436 * 4437 * The ring buffer has a safety mechanism to prevent recursion. 4438 * But there may be a case where a trace needs to be done while 4439 * tracing something else. In this case, calling this function 4440 * will allow this function to nest within a currently active 4441 * ring_buffer_lock_reserve(). 4442 * 4443 * Call this function before calling another ring_buffer_lock_reserve() and 4444 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 4445 */ 4446 void ring_buffer_nest_start(struct trace_buffer *buffer) 4447 { 4448 struct ring_buffer_per_cpu *cpu_buffer; 4449 int cpu; 4450 4451 /* Enabled by ring_buffer_nest_end() */ 4452 preempt_disable_notrace(); 4453 cpu = raw_smp_processor_id(); 4454 cpu_buffer = buffer->buffers[cpu]; 4455 /* This is the shift value for the above recursive locking */ 4456 cpu_buffer->nest += NESTED_BITS; 4457 } 4458 4459 /** 4460 * ring_buffer_nest_end - Allow to trace while nested 4461 * @buffer: The ring buffer to modify 4462 * 4463 * Must be called after ring_buffer_nest_start() and after the 4464 * ring_buffer_unlock_commit(). 
4465 */ 4466 void ring_buffer_nest_end(struct trace_buffer *buffer) 4467 { 4468 struct ring_buffer_per_cpu *cpu_buffer; 4469 int cpu; 4470 4471 /* disabled by ring_buffer_nest_start() */ 4472 cpu = raw_smp_processor_id(); 4473 cpu_buffer = buffer->buffers[cpu]; 4474 /* This is the shift value for the above recursive locking */ 4475 cpu_buffer->nest -= NESTED_BITS; 4476 preempt_enable_notrace(); 4477 } 4478 4479 /** 4480 * ring_buffer_unlock_commit - commit a reserved 4481 * @buffer: The buffer to commit to 4482 * 4483 * This commits the data to the ring buffer, and releases any locks held. 4484 * 4485 * Must be paired with ring_buffer_lock_reserve. 4486 */ 4487 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 4488 { 4489 struct ring_buffer_per_cpu *cpu_buffer; 4490 int cpu = raw_smp_processor_id(); 4491 4492 cpu_buffer = buffer->buffers[cpu]; 4493 4494 rb_commit(cpu_buffer); 4495 4496 rb_wakeups(buffer, cpu_buffer); 4497 4498 trace_recursive_unlock(cpu_buffer); 4499 4500 preempt_enable_notrace(); 4501 4502 return 0; 4503 } 4504 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 4505 4506 /* Special value to validate all deltas on a page. 
*/ 4507 #define CHECK_FULL_PAGE 1L 4508 4509 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 4510 4511 static const char *show_irq_str(int bits) 4512 { 4513 static const char * type[] = { 4514 ".", // 0 4515 "s", // 1 4516 "h", // 2 4517 "Hs", // 3 4518 "n", // 4 4519 "Ns", // 5 4520 "Nh", // 6 4521 "NHs", // 7 4522 }; 4523 4524 return type[bits]; 4525 } 4526 4527 /* Assume this is a trace event */ 4528 static const char *show_flags(struct ring_buffer_event *event) 4529 { 4530 struct trace_entry *entry; 4531 int bits = 0; 4532 4533 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4534 return "X"; 4535 4536 entry = ring_buffer_event_data(event); 4537 4538 if (entry->flags & TRACE_FLAG_SOFTIRQ) 4539 bits |= 1; 4540 4541 if (entry->flags & TRACE_FLAG_HARDIRQ) 4542 bits |= 2; 4543 4544 if (entry->flags & TRACE_FLAG_NMI) 4545 bits |= 4; 4546 4547 return show_irq_str(bits); 4548 } 4549 4550 static const char *show_irq(struct ring_buffer_event *event) 4551 { 4552 struct trace_entry *entry; 4553 4554 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4555 return ""; 4556 4557 entry = ring_buffer_event_data(event); 4558 if (entry->flags & TRACE_FLAG_IRQS_OFF) 4559 return "d"; 4560 return ""; 4561 } 4562 4563 static const char *show_interrupt_level(void) 4564 { 4565 unsigned long pc = preempt_count(); 4566 unsigned char level = 0; 4567 4568 if (pc & SOFTIRQ_OFFSET) 4569 level |= 1; 4570 4571 if (pc & HARDIRQ_MASK) 4572 level |= 2; 4573 4574 if (pc & NMI_MASK) 4575 level |= 4; 4576 4577 return show_irq_str(level); 4578 } 4579 4580 static void dump_buffer_page(struct buffer_data_page *dpage, 4581 struct rb_event_info *info, 4582 unsigned long tail) 4583 { 4584 struct ring_buffer_event *event; 4585 u64 ts, delta; 4586 int e; 4587 4588 ts = dpage->time_stamp; 4589 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 4590 4591 for (e = 0; e < tail; e += rb_event_length(event)) { 4592 4593 event = (struct ring_buffer_event *)(dpage->data + e); 4594 4595 
switch (event->type_len) { 4596 4597 case RINGBUF_TYPE_TIME_EXTEND: 4598 delta = rb_event_time_stamp(event); 4599 ts += delta; 4600 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 4601 e, ts, delta); 4602 break; 4603 4604 case RINGBUF_TYPE_TIME_STAMP: 4605 delta = rb_event_time_stamp(event); 4606 ts = rb_fix_abs_ts(delta, ts); 4607 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 4608 e, ts, delta); 4609 break; 4610 4611 case RINGBUF_TYPE_PADDING: 4612 ts += event->time_delta; 4613 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 4614 e, ts, event->time_delta); 4615 break; 4616 4617 case RINGBUF_TYPE_DATA: 4618 ts += event->time_delta; 4619 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 4620 e, ts, event->time_delta, 4621 show_flags(event), show_irq(event)); 4622 break; 4623 4624 default: 4625 break; 4626 } 4627 } 4628 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 4629 } 4630 4631 static DEFINE_PER_CPU(atomic_t, checking); 4632 static atomic_t ts_dump; 4633 4634 #define buffer_warn_return(fmt, ...) \ 4635 do { \ 4636 /* If another report is happening, ignore this one */ \ 4637 if (atomic_inc_return(&ts_dump) != 1) { \ 4638 atomic_dec(&ts_dump); \ 4639 goto out; \ 4640 } \ 4641 atomic_inc(&cpu_buffer->record_disabled); \ 4642 pr_warn(fmt, ##__VA_ARGS__); \ 4643 dump_buffer_page(dpage, info, tail); \ 4644 atomic_dec(&ts_dump); \ 4645 /* There's some cases in boot up that this can happen */ \ 4646 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \ 4647 /* Do not re-enable checking */ \ 4648 return; \ 4649 } while (0) 4650 4651 /* 4652 * Check if the current event time stamp matches the deltas on 4653 * the buffer page. 
 */
static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
			 struct rb_event_info *info,
			 unsigned long tail)
{
	struct buffer_data_page *dpage;
	u64 ts, delta;
	bool full = false;
	int ret;

	dpage = info->tail_page->page;

	if (tail == CHECK_FULL_PAGE) {
		/* Validate everything that has been committed on the page */
		full = true;
		tail = rb_data_page_commit(dpage);
	} else if (info->add_timestamp &
		   (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) {
		/* Ignore events with absolute time stamps */
		return;
	}

	/*
	 * Do not check the first event (skip possible extends too).
	 * Also do not check if previous events have not been committed.
	 */
	if (tail <= 8 || tail > rb_data_page_commit(dpage))
		return;

	/*
	 * If this interrupted another event that is already being
	 * checked on this CPU, skip the check: only the first one in
	 * performs it.
	 */
	if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
		goto out;

	ret = rb_read_data_buffer(dpage, tail, cpu_buffer->cpu, &ts, &delta);
	if (ret < 0) {
		if (delta < ts) {
			buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld clock:%pS\n",
					   cpu_buffer->cpu, ts, delta,
					   cpu_buffer->buffer->clock);
			goto out;
		}
	}
	/*
	 * A full page check only requires that the page does not end
	 * after the event's time stamp; otherwise the accumulated time
	 * plus this event's delta must match the time stamp exactly.
	 */
	if ((full && ts > info->ts) ||
	    (!full && ts + info->delta != info->ts)) {
		buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\ntrace clock:%pS",
				   cpu_buffer->cpu,
				   ts + info->delta, info->ts, info->delta,
				   info->before, info->after,
				   full ? " (full)" : "", show_interrupt_level(),
				   cpu_buffer->buffer->clock);
	}
out:
	atomic_dec(this_cpu_ptr(&checking));
}
#else
/* Time delta validation is compiled out: nothing to check */
static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
			 struct rb_event_info *info,
			 unsigned long tail)
{
}
#endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */

/*
 * Reserve info->length bytes on the current tail page and fill in the
 * event header there. The markers A to F name the points between which
 * an interrupting writer changes how the time delta has to be computed.
 *
 * Returns the reserved event, or whatever rb_move_tail() returns when
 * the reservation ran past the end of the sub-buffer.
 */
static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  struct rb_event_info *info)
{
	struct ring_buffer_event *event;
	struct buffer_page *tail_page;
	unsigned long tail, write, w;

	/* Don't let the compiler play games with cpu_buffer->tail_page */
	tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);

 /*A*/	w = local_read(&tail_page->write) & RB_WRITE_MASK;
	barrier();
	rb_time_read(&cpu_buffer->before_stamp, &info->before);
	rb_time_read(&cpu_buffer->write_stamp, &info->after);
	barrier();
	info->ts = rb_time_stamp(cpu_buffer->buffer);

	if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) {
		info->delta = info->ts;
	} else {
		/*
		 * If interrupting an event time update, we may need an
		 * absolute timestamp.
		 * Don't bother if this is the start of a new page (w == 0).
		 */
		if (!w) {
			/* Use the sub-buffer timestamp */
			info->delta = 0;
		} else if (unlikely(info->before != info->after)) {
			info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
			info->length += RB_LEN_TIME_EXTEND;
		} else {
			info->delta = info->ts - info->after;
			/* The delta needs a time extend event in front */
			if (unlikely(test_time_stamp(info->delta))) {
				info->add_timestamp |= RB_ADD_STAMP_EXTEND;
				info->length += RB_LEN_TIME_EXTEND;
			}
		}
	}

 /*B*/	rb_time_set(&cpu_buffer->before_stamp, info->ts);

 /*C*/	write = local_add_return(info->length, &tail_page->write);

	/* set write to only the index of the write */
	write &= RB_WRITE_MASK;

	/* Where this event starts on the page */
	tail = write - info->length;

	/* See if we shot pass the end of this buffer page */
	if (unlikely(write > cpu_buffer->buffer->subbuf_size)) {
		check_buffer(cpu_buffer, info, CHECK_FULL_PAGE);
		return rb_move_tail(cpu_buffer, tail, info);
	}

	if (likely(tail == w)) {
		/* Nothing interrupted us between A and C */
 /*D*/		rb_time_set(&cpu_buffer->write_stamp, info->ts);
		/*
		 * If something came in between C and D, the write stamp
		 * may now not be in sync. But that's fine as the before_stamp
		 * will be different and then next event will just be forced
		 * to use an absolute timestamp.
		 */
		if (likely(!(info->add_timestamp &
			     (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
			/* This did not interrupt any time update */
			info->delta = info->ts - info->after;
		else
			/* Just use full timestamp for interrupting event */
			info->delta = info->ts;
		check_buffer(cpu_buffer, info, tail);
	} else {
		u64 ts;
		/* SLOW PATH - Interrupted between A and C */

		/* Save the old before_stamp */
		rb_time_read(&cpu_buffer->before_stamp, &info->before);

		/*
		 * Read a new timestamp and update the before_stamp to make
		 * the next event after this one force using an absolute
		 * timestamp. This is in case an interrupt were to come in
		 * between E and F.
		 */
		ts = rb_time_stamp(cpu_buffer->buffer);
		rb_time_set(&cpu_buffer->before_stamp, ts);

		barrier();
 /*E*/		rb_time_read(&cpu_buffer->write_stamp, &info->after);
		barrier();
 /*F*/		if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
		    info->after == info->before && info->after < ts) {
			/*
			 * Nothing came after this event between C and F, it is
			 * safe to use info->after for the delta as it
			 * matched info->before and is still valid.
			 */
			info->delta = ts - info->after;
		} else {
			/*
			 * Interrupted between C and F:
			 * Lost the previous events time stamp. Just set the
			 * delta to zero, and this will be the same time as
			 * the event this event interrupted. And the events that
			 * came after this will still be correct (as they would
			 * have built their delta on the previous event.
			 */
			info->delta = 0;
		}
		info->ts = ts;
		info->add_timestamp &= ~RB_ADD_STAMP_FORCE;
	}

	/*
	 * If this is the first commit on the page, then it has the same
	 * timestamp as the page itself.
	 */
	if (unlikely(!tail && !(info->add_timestamp &
				(RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
		info->delta = 0;

	/* We reserved something on the buffer */

	event = __rb_page_index(tail_page, tail);
	rb_update_event(cpu_buffer, event, info);

	local_inc(&tail_page->entries);

	/*
	 * If this is the first commit on the page, then update
	 * its timestamp.
	 */
	if (unlikely(!tail))
		tail_page->page->time_stamp = info->ts;

	/* account for these added bytes */
	local_add(info->length, &cpu_buffer->entries_bytes);

	return event;
}

/*
 * Start a commit and reserve room for an event with @length bytes of
 * payload, retrying while __rb_reserve_next() reports -EAGAIN.
 *
 * Returns the reserved event with the commit left open, or NULL on
 * failure, in which case the commit has already been ended or undone.
 */
static __always_inline struct ring_buffer_event *
rb_reserve_next_event(struct trace_buffer *buffer,
		      struct ring_buffer_per_cpu *cpu_buffer,
		      unsigned long length)
{
	struct ring_buffer_event *event;
	struct rb_event_info info;
	int nr_loops = 0;
	int add_ts_default;

	/*
	 * ring buffer does cmpxchg as well as atomic64 operations
	 * (which some archs use locking for atomic64), make sure this
	 * is safe in NMI context
	 */
	if ((!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) ||
	     IS_ENABLED(CONFIG_GENERIC_ATOMIC64)) &&
	    (unlikely(in_nmi()))) {
		return NULL;
	}

	rb_start_commit(cpu_buffer);
	/* The commit page can not change after this */

#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
	/*
	 * Due to the ability to swap a cpu buffer from a buffer
	 * it is possible it was swapped before we committed.
	 * (committing stops a swap). We check for it here and
	 * if it happened, we have to fail the write.
	 */
	barrier();
	if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) {
		/* Undo rb_start_commit() by hand */
		local_dec(&cpu_buffer->committing);
		local_dec(&cpu_buffer->commits);
		return NULL;
	}
#endif

	info.length = rb_calculate_event_length(length);

	if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
		/* Absolute time stamps always carry the extra time event */
		add_ts_default = RB_ADD_STAMP_ABSOLUTE;
		info.length += RB_LEN_TIME_EXTEND;
		if (info.length > cpu_buffer->buffer->max_data_size)
			goto out_fail;
	} else {
		add_ts_default = RB_ADD_STAMP_NONE;
	}

 again:
	info.add_timestamp = add_ts_default;
	info.delta = 0;

	/*
	 * We allow for interrupts to reenter here and do a trace.
	 * If one does, it will cause this original code to loop
	 * back here. Even with heavy interrupts happening, this
	 * should only happen a few times in a row. If this happens
	 * 1000 times in a row, there must be either an interrupt
	 * storm or we have something buggy.
	 * Bail!
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
		goto out_fail;

	event = __rb_reserve_next(cpu_buffer, &info);

	if (unlikely(PTR_ERR(event) == -EAGAIN)) {
		/* Take back the time extend length added by the failed try */
		if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND))
			info.length -= RB_LEN_TIME_EXTEND;
		goto again;
	}

	if (likely(event))
		return event;
 out_fail:
	rb_end_commit(cpu_buffer);
	return NULL;
}

/**
 * ring_buffer_lock_reserve - reserve a part of the buffer
 * @buffer: the ring buffer to reserve from
 * @length: the length of the data to reserve (excluding event header)
 *
 * Returns a reserved event on the ring buffer to copy directly to.
 * The user of this interface will need to get the body to write into
 * and can use the ring_buffer_event_data() interface.
 *
 * The length is the length of the data needed, not the event length
 * which also includes the event header.
4952 * 4953 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 4954 * If NULL is returned, then nothing has been allocated or locked. 4955 */ 4956 struct ring_buffer_event * 4957 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 4958 { 4959 struct ring_buffer_per_cpu *cpu_buffer; 4960 struct ring_buffer_event *event; 4961 int cpu; 4962 4963 /* If we are tracing schedule, we don't want to recurse */ 4964 preempt_disable_notrace(); 4965 4966 if (unlikely(atomic_read(&buffer->record_disabled))) 4967 goto out; 4968 4969 cpu = raw_smp_processor_id(); 4970 4971 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 4972 goto out; 4973 4974 cpu_buffer = buffer->buffers[cpu]; 4975 4976 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 4977 goto out; 4978 4979 if (unlikely(length > buffer->max_data_size)) 4980 goto out; 4981 4982 if (unlikely(trace_recursive_lock(cpu_buffer))) 4983 goto out; 4984 4985 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4986 if (!event) 4987 goto out_unlock; 4988 4989 return event; 4990 4991 out_unlock: 4992 trace_recursive_unlock(cpu_buffer); 4993 out: 4994 preempt_enable_notrace(); 4995 return NULL; 4996 } 4997 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 4998 4999 /* 5000 * Decrement the entries to the page that an event is on. 5001 * The event does not even need to exist, only the pointer 5002 * to the page it is on. This may only be called before the commit 5003 * takes place. 
5004 */ 5005 static inline void 5006 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 5007 struct ring_buffer_event *event) 5008 { 5009 unsigned long addr = (unsigned long)event; 5010 struct buffer_page *bpage = cpu_buffer->commit_page; 5011 struct buffer_page *start; 5012 5013 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 5014 5015 /* Do the likely case first */ 5016 if (likely(bpage->page == (void *)addr)) { 5017 local_dec(&bpage->entries); 5018 return; 5019 } 5020 5021 /* 5022 * Because the commit page may be on the reader page we 5023 * start with the next page and check the end loop there. 5024 */ 5025 rb_inc_page(&bpage); 5026 start = bpage; 5027 do { 5028 if (bpage->page == (void *)addr) { 5029 local_dec(&bpage->entries); 5030 return; 5031 } 5032 rb_inc_page(&bpage); 5033 } while (bpage != start); 5034 5035 /* commit not part of this buffer?? */ 5036 RB_WARN_ON(cpu_buffer, 1); 5037 } 5038 5039 /** 5040 * ring_buffer_discard_commit - discard an event that has not been committed 5041 * @buffer: the ring buffer 5042 * @event: non committed event to discard 5043 * 5044 * Sometimes an event that is in the ring buffer needs to be ignored. 5045 * This function lets the user discard an event in the ring buffer 5046 * and then that event will not be read later. 5047 * 5048 * This function only works if it is called before the item has been 5049 * committed. It will try to free the event from the ring buffer 5050 * if another event has not been added behind it. 5051 * 5052 * If another event has been added behind it, it will set the event 5053 * up as discarded, and perform the commit. 5054 * 5055 * If this function is called, do not call ring_buffer_unlock_commit on 5056 * the event. 
5057 */ 5058 void ring_buffer_discard_commit(struct trace_buffer *buffer, 5059 struct ring_buffer_event *event) 5060 { 5061 struct ring_buffer_per_cpu *cpu_buffer; 5062 int cpu; 5063 5064 /* The event is discarded regardless */ 5065 rb_event_discard(event); 5066 5067 cpu = smp_processor_id(); 5068 cpu_buffer = buffer->buffers[cpu]; 5069 5070 /* 5071 * This must only be called if the event has not been 5072 * committed yet. Thus we can assume that preemption 5073 * is still disabled. 5074 */ 5075 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 5076 5077 rb_decrement_entry(cpu_buffer, event); 5078 rb_try_to_discard(cpu_buffer, event); 5079 rb_end_commit(cpu_buffer); 5080 5081 trace_recursive_unlock(cpu_buffer); 5082 5083 preempt_enable_notrace(); 5084 5085 } 5086 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 5087 5088 /** 5089 * ring_buffer_write - write data to the buffer without reserving 5090 * @buffer: The ring buffer to write to. 5091 * @length: The length of the data being written (excluding the event header) 5092 * @data: The data to write to the buffer. 5093 * 5094 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 5095 * one function. If you already have the data to write to the buffer, it 5096 * may be easier to simply call this function. 5097 * 5098 * Note, like ring_buffer_lock_reserve, the length is the length of the data 5099 * and not the length of the event which would hold the header. 
5100 */ 5101 int ring_buffer_write(struct trace_buffer *buffer, 5102 unsigned long length, 5103 void *data) 5104 { 5105 struct ring_buffer_per_cpu *cpu_buffer; 5106 struct ring_buffer_event *event; 5107 void *body; 5108 int ret = -EBUSY; 5109 int cpu; 5110 5111 guard(preempt_notrace)(); 5112 5113 if (atomic_read(&buffer->record_disabled)) 5114 return -EBUSY; 5115 5116 cpu = raw_smp_processor_id(); 5117 5118 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5119 return -EBUSY; 5120 5121 cpu_buffer = buffer->buffers[cpu]; 5122 5123 if (atomic_read(&cpu_buffer->record_disabled)) 5124 return -EBUSY; 5125 5126 if (length > buffer->max_data_size) 5127 return -EBUSY; 5128 5129 if (unlikely(trace_recursive_lock(cpu_buffer))) 5130 return -EBUSY; 5131 5132 event = rb_reserve_next_event(buffer, cpu_buffer, length); 5133 if (!event) 5134 goto out_unlock; 5135 5136 body = rb_event_data(event); 5137 5138 memcpy(body, data, length); 5139 5140 rb_commit(cpu_buffer); 5141 5142 rb_wakeups(buffer, cpu_buffer); 5143 5144 ret = 0; 5145 5146 out_unlock: 5147 trace_recursive_unlock(cpu_buffer); 5148 return ret; 5149 } 5150 EXPORT_SYMBOL_GPL(ring_buffer_write); 5151 5152 /* 5153 * The total entries in the ring buffer is the running counter 5154 * of entries entered into the ring buffer, minus the sum of 5155 * the entries read from the ring buffer and the number of 5156 * entries that were overwritten. 5157 */ 5158 static inline unsigned long 5159 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 5160 { 5161 return local_read(&cpu_buffer->entries) - 5162 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 5163 } 5164 5165 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 5166 { 5167 return !rb_num_of_entries(cpu_buffer); 5168 } 5169 5170 /** 5171 * ring_buffer_record_disable - stop all writes into the buffer 5172 * @buffer: The ring buffer to stop writes to. 5173 * 5174 * This prevents all writes to the buffer. 
Any attempt to write 5175 * to the buffer after this will fail and return NULL. 5176 * 5177 * The caller should call synchronize_rcu() after this. 5178 */ 5179 void ring_buffer_record_disable(struct trace_buffer *buffer) 5180 { 5181 atomic_inc(&buffer->record_disabled); 5182 } 5183 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 5184 5185 /** 5186 * ring_buffer_record_enable - enable writes to the buffer 5187 * @buffer: The ring buffer to enable writes 5188 * 5189 * Note, multiple disables will need the same number of enables 5190 * to truly enable the writing (much like preempt_disable). 5191 */ 5192 void ring_buffer_record_enable(struct trace_buffer *buffer) 5193 { 5194 atomic_dec(&buffer->record_disabled); 5195 } 5196 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 5197 5198 /** 5199 * ring_buffer_record_off - stop all writes into the buffer 5200 * @buffer: The ring buffer to stop writes to. 5201 * 5202 * This prevents all writes to the buffer. Any attempt to write 5203 * to the buffer after this will fail and return NULL. 5204 * 5205 * This is different than ring_buffer_record_disable() as 5206 * it works like an on/off switch, where as the disable() version 5207 * must be paired with a enable(). 5208 */ 5209 void ring_buffer_record_off(struct trace_buffer *buffer) 5210 { 5211 unsigned int rd; 5212 unsigned int new_rd; 5213 5214 rd = atomic_read(&buffer->record_disabled); 5215 do { 5216 new_rd = rd | RB_BUFFER_OFF; 5217 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 5218 } 5219 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 5220 5221 /** 5222 * ring_buffer_record_on - restart writes into the buffer 5223 * @buffer: The ring buffer to start writes to. 5224 * 5225 * This enables all writes to the buffer that was disabled by 5226 * ring_buffer_record_off(). 5227 * 5228 * This is different than ring_buffer_record_enable() as 5229 * it works like an on/off switch, where as the enable() version 5230 * must be paired with a disable(). 
5231 */ 5232 void ring_buffer_record_on(struct trace_buffer *buffer) 5233 { 5234 unsigned int rd; 5235 unsigned int new_rd; 5236 5237 rd = atomic_read(&buffer->record_disabled); 5238 do { 5239 new_rd = rd & ~RB_BUFFER_OFF; 5240 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 5241 } 5242 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 5243 5244 /** 5245 * ring_buffer_record_is_on - return true if the ring buffer can write 5246 * @buffer: The ring buffer to see if write is enabled 5247 * 5248 * Returns true if the ring buffer is in a state that it accepts writes. 5249 */ 5250 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 5251 { 5252 return !atomic_read(&buffer->record_disabled); 5253 } 5254 5255 /** 5256 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 5257 * @buffer: The ring buffer to see if write is set enabled 5258 * 5259 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 5260 * Note that this does NOT mean it is in a writable state. 5261 * 5262 * It may return true when the ring buffer has been disabled by 5263 * ring_buffer_record_disable(), as that is a temporary disabling of 5264 * the ring buffer. 5265 */ 5266 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 5267 { 5268 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 5269 } 5270 5271 /** 5272 * ring_buffer_record_is_on_cpu - return true if the ring buffer can write 5273 * @buffer: The ring buffer to see if write is enabled 5274 * @cpu: The CPU to test if the ring buffer can write too 5275 * 5276 * Returns true if the ring buffer is in a state that it accepts writes 5277 * for a particular CPU. 
5278 */ 5279 bool ring_buffer_record_is_on_cpu(struct trace_buffer *buffer, int cpu) 5280 { 5281 struct ring_buffer_per_cpu *cpu_buffer; 5282 5283 cpu_buffer = buffer->buffers[cpu]; 5284 5285 return ring_buffer_record_is_set_on(buffer) && 5286 !atomic_read(&cpu_buffer->record_disabled); 5287 } 5288 5289 /** 5290 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 5291 * @buffer: The ring buffer to stop writes to. 5292 * @cpu: The CPU buffer to stop 5293 * 5294 * This prevents all writes to the buffer. Any attempt to write 5295 * to the buffer after this will fail and return NULL. 5296 * 5297 * The caller should call synchronize_rcu() after this. 5298 */ 5299 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 5300 { 5301 struct ring_buffer_per_cpu *cpu_buffer; 5302 5303 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5304 return; 5305 5306 cpu_buffer = buffer->buffers[cpu]; 5307 atomic_inc(&cpu_buffer->record_disabled); 5308 } 5309 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 5310 5311 /** 5312 * ring_buffer_record_enable_cpu - enable writes to the buffer 5313 * @buffer: The ring buffer to enable writes 5314 * @cpu: The CPU to enable. 5315 * 5316 * Note, multiple disables will need the same number of enables 5317 * to truly enable the writing (much like preempt_disable). 5318 */ 5319 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 5320 { 5321 struct ring_buffer_per_cpu *cpu_buffer; 5322 5323 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5324 return; 5325 5326 cpu_buffer = buffer->buffers[cpu]; 5327 atomic_dec(&cpu_buffer->record_disabled); 5328 } 5329 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 5330 5331 /** 5332 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 5333 * @buffer: The ring buffer 5334 * @cpu: The per CPU buffer to read from. 
5335 */ 5336 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 5337 { 5338 unsigned long flags; 5339 struct ring_buffer_per_cpu *cpu_buffer; 5340 struct buffer_page *bpage; 5341 u64 ret = 0; 5342 5343 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5344 return 0; 5345 5346 cpu_buffer = buffer->buffers[cpu]; 5347 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5348 /* 5349 * if the tail is on reader_page, oldest time stamp is on the reader 5350 * page 5351 */ 5352 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 5353 bpage = cpu_buffer->reader_page; 5354 else 5355 bpage = rb_set_head_page(cpu_buffer); 5356 if (bpage) 5357 ret = bpage->page->time_stamp; 5358 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5359 5360 return ret; 5361 } 5362 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 5363 5364 /** 5365 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 5366 * @buffer: The ring buffer 5367 * @cpu: The per CPU buffer to read from. 5368 */ 5369 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 5370 { 5371 struct ring_buffer_per_cpu *cpu_buffer; 5372 unsigned long ret; 5373 5374 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5375 return 0; 5376 5377 cpu_buffer = buffer->buffers[cpu]; 5378 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 5379 5380 return ret; 5381 } 5382 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 5383 5384 /** 5385 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 5386 * @buffer: The ring buffer 5387 * @cpu: The per CPU buffer to get the entries from. 
5388 */ 5389 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 5390 { 5391 struct ring_buffer_per_cpu *cpu_buffer; 5392 5393 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5394 return 0; 5395 5396 cpu_buffer = buffer->buffers[cpu]; 5397 5398 return rb_num_of_entries(cpu_buffer); 5399 } 5400 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 5401 5402 /** 5403 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 5404 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 5405 * @buffer: The ring buffer 5406 * @cpu: The per CPU buffer to get the number of overruns from 5407 */ 5408 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 5409 { 5410 struct ring_buffer_per_cpu *cpu_buffer; 5411 unsigned long ret; 5412 5413 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5414 return 0; 5415 5416 cpu_buffer = buffer->buffers[cpu]; 5417 ret = local_read(&cpu_buffer->overrun); 5418 5419 return ret; 5420 } 5421 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 5422 5423 /** 5424 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 5425 * commits failing due to the buffer wrapping around while there are uncommitted 5426 * events, such as during an interrupt storm. 5427 * @buffer: The ring buffer 5428 * @cpu: The per CPU buffer to get the number of overruns from 5429 */ 5430 unsigned long 5431 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 5432 { 5433 struct ring_buffer_per_cpu *cpu_buffer; 5434 unsigned long ret; 5435 5436 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5437 return 0; 5438 5439 cpu_buffer = buffer->buffers[cpu]; 5440 ret = local_read(&cpu_buffer->commit_overrun); 5441 5442 return ret; 5443 } 5444 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 5445 5446 /** 5447 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 5448 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 
5449 * @buffer: The ring buffer 5450 * @cpu: The per CPU buffer to get the number of overruns from 5451 */ 5452 unsigned long 5453 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 5454 { 5455 struct ring_buffer_per_cpu *cpu_buffer; 5456 unsigned long ret; 5457 5458 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5459 return 0; 5460 5461 cpu_buffer = buffer->buffers[cpu]; 5462 ret = local_read(&cpu_buffer->dropped_events); 5463 5464 return ret; 5465 } 5466 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 5467 5468 /** 5469 * ring_buffer_read_events_cpu - get the number of events successfully read 5470 * @buffer: The ring buffer 5471 * @cpu: The per CPU buffer to get the number of events read 5472 */ 5473 unsigned long 5474 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 5475 { 5476 struct ring_buffer_per_cpu *cpu_buffer; 5477 5478 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5479 return 0; 5480 5481 cpu_buffer = buffer->buffers[cpu]; 5482 return cpu_buffer->read; 5483 } 5484 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 5485 5486 /** 5487 * ring_buffer_entries - get the number of entries in a buffer 5488 * @buffer: The ring buffer 5489 * 5490 * Returns the total number of entries in the ring buffer 5491 * (all CPU entries) 5492 */ 5493 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 5494 { 5495 struct ring_buffer_per_cpu *cpu_buffer; 5496 unsigned long entries = 0; 5497 int cpu; 5498 5499 /* if you care about this being correct, lock the buffer */ 5500 for_each_buffer_cpu(buffer, cpu) { 5501 cpu_buffer = buffer->buffers[cpu]; 5502 entries += rb_num_of_entries(cpu_buffer); 5503 } 5504 5505 return entries; 5506 } 5507 EXPORT_SYMBOL_GPL(ring_buffer_entries); 5508 5509 /** 5510 * ring_buffer_overruns - get the number of overruns in buffer 5511 * @buffer: The ring buffer 5512 * 5513 * Returns the total number of overruns in the ring buffer 5514 * (all CPU entries) 5515 */ 5516 unsigned long ring_buffer_overruns(struct 
trace_buffer *buffer) 5517 { 5518 struct ring_buffer_per_cpu *cpu_buffer; 5519 unsigned long overruns = 0; 5520 int cpu; 5521 5522 /* if you care about this being correct, lock the buffer */ 5523 for_each_buffer_cpu(buffer, cpu) { 5524 cpu_buffer = buffer->buffers[cpu]; 5525 overruns += local_read(&cpu_buffer->overrun); 5526 } 5527 5528 return overruns; 5529 } 5530 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 5531 5532 static bool rb_read_remote_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 5533 { 5534 local_set(&cpu_buffer->entries, READ_ONCE(cpu_buffer->meta_page->entries)); 5535 local_set(&cpu_buffer->overrun, READ_ONCE(cpu_buffer->meta_page->overrun)); 5536 local_set(&cpu_buffer->pages_touched, READ_ONCE(cpu_buffer->meta_page->pages_touched)); 5537 local_set(&cpu_buffer->pages_lost, READ_ONCE(cpu_buffer->meta_page->pages_lost)); 5538 5539 return rb_num_of_entries(cpu_buffer); 5540 } 5541 5542 static void rb_update_remote_head(struct ring_buffer_per_cpu *cpu_buffer) 5543 { 5544 struct buffer_page *next, *orig; 5545 int retry = 3; 5546 5547 orig = next = cpu_buffer->head_page; 5548 rb_inc_page(&next); 5549 5550 /* Run after the writer */ 5551 while (cpu_buffer->head_page->page->time_stamp > next->page->time_stamp) { 5552 rb_inc_page(&next); 5553 5554 rb_list_head_clear(cpu_buffer->head_page->list.prev); 5555 rb_inc_page(&cpu_buffer->head_page); 5556 rb_set_list_to_head(cpu_buffer->head_page->list.prev); 5557 5558 if (cpu_buffer->head_page == orig) { 5559 if (WARN_ON_ONCE(!(--retry))) 5560 return; 5561 } 5562 } 5563 5564 orig = cpu_buffer->commit_page = cpu_buffer->head_page; 5565 retry = 3; 5566 5567 while (cpu_buffer->commit_page->page->time_stamp < next->page->time_stamp) { 5568 rb_inc_page(&next); 5569 rb_inc_page(&cpu_buffer->commit_page); 5570 5571 if (cpu_buffer->commit_page == orig) { 5572 if (WARN_ON_ONCE(!(--retry))) 5573 return; 5574 } 5575 } 5576 } 5577 5578 static void rb_iter_reset(struct ring_buffer_iter *iter) 5579 { 5580 struct ring_buffer_per_cpu 
*cpu_buffer = iter->cpu_buffer; 5581 5582 if (cpu_buffer->remote) { 5583 rb_read_remote_meta_page(cpu_buffer); 5584 rb_update_remote_head(cpu_buffer); 5585 } 5586 5587 /* Iterator usage is expected to have record disabled */ 5588 iter->head_page = cpu_buffer->reader_page; 5589 iter->head = cpu_buffer->reader_page->read; 5590 iter->next_event = iter->head; 5591 iter->missed_events = 0; 5592 5593 iter->cache_reader_page = iter->head_page; 5594 iter->cache_read = cpu_buffer->read; 5595 iter->cache_pages_removed = cpu_buffer->pages_removed; 5596 5597 if (iter->head) { 5598 iter->read_stamp = cpu_buffer->read_stamp; 5599 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 5600 } else { 5601 iter->read_stamp = iter->head_page->page->time_stamp; 5602 iter->page_stamp = iter->read_stamp; 5603 } 5604 } 5605 5606 /** 5607 * ring_buffer_iter_reset - reset an iterator 5608 * @iter: The iterator to reset 5609 * 5610 * Resets the iterator, so that it will start from the beginning 5611 * again. 
5612 */ 5613 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 5614 { 5615 struct ring_buffer_per_cpu *cpu_buffer; 5616 unsigned long flags; 5617 5618 if (!iter) 5619 return; 5620 5621 cpu_buffer = iter->cpu_buffer; 5622 5623 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5624 rb_iter_reset(iter); 5625 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5626 } 5627 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 5628 5629 /** 5630 * ring_buffer_iter_empty - check if an iterator has no more to read 5631 * @iter: The iterator to check 5632 */ 5633 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 5634 { 5635 struct ring_buffer_per_cpu *cpu_buffer; 5636 struct buffer_page *reader; 5637 struct buffer_page *head_page; 5638 struct buffer_page *commit_page; 5639 struct buffer_page *curr_commit_page; 5640 unsigned commit; 5641 u64 curr_commit_ts; 5642 u64 commit_ts; 5643 5644 cpu_buffer = iter->cpu_buffer; 5645 reader = cpu_buffer->reader_page; 5646 head_page = cpu_buffer->head_page; 5647 commit_page = READ_ONCE(cpu_buffer->commit_page); 5648 commit_ts = commit_page->page->time_stamp; 5649 5650 /* 5651 * When the writer goes across pages, it issues a cmpxchg which 5652 * is a mb(), which will synchronize with the rmb here. 
5653 * (see rb_tail_page_update()) 5654 */ 5655 smp_rmb(); 5656 commit = rb_page_size(commit_page); 5657 /* We want to make sure that the commit page doesn't change */ 5658 smp_rmb(); 5659 5660 /* Make sure commit page didn't change */ 5661 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 5662 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 5663 5664 /* If the commit page changed, then there's more data */ 5665 if (curr_commit_page != commit_page || 5666 curr_commit_ts != commit_ts) 5667 return 0; 5668 5669 /* Still racy, as it may return a false positive, but that's OK */ 5670 return ((iter->head_page == commit_page && iter->head >= commit) || 5671 (iter->head_page == reader && commit_page == head_page && 5672 head_page->read == commit && 5673 iter->head == rb_page_size(cpu_buffer->reader_page))); 5674 } 5675 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 5676 5677 static void 5678 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 5679 struct ring_buffer_event *event) 5680 { 5681 u64 delta; 5682 5683 switch (event->type_len) { 5684 case RINGBUF_TYPE_PADDING: 5685 return; 5686 5687 case RINGBUF_TYPE_TIME_EXTEND: 5688 delta = rb_event_time_stamp(event); 5689 cpu_buffer->read_stamp += delta; 5690 return; 5691 5692 case RINGBUF_TYPE_TIME_STAMP: 5693 delta = rb_event_time_stamp(event); 5694 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 5695 cpu_buffer->read_stamp = delta; 5696 return; 5697 5698 case RINGBUF_TYPE_DATA: 5699 cpu_buffer->read_stamp += event->time_delta; 5700 return; 5701 5702 default: 5703 RB_WARN_ON(cpu_buffer, 1); 5704 } 5705 } 5706 5707 static void 5708 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 5709 struct ring_buffer_event *event) 5710 { 5711 u64 delta; 5712 5713 switch (event->type_len) { 5714 case RINGBUF_TYPE_PADDING: 5715 return; 5716 5717 case RINGBUF_TYPE_TIME_EXTEND: 5718 delta = rb_event_time_stamp(event); 5719 iter->read_stamp += delta; 5720 return; 5721 5722 case RINGBUF_TYPE_TIME_STAMP: 
5723 delta = rb_event_time_stamp(event); 5724 delta = rb_fix_abs_ts(delta, iter->read_stamp); 5725 iter->read_stamp = delta; 5726 return; 5727 5728 case RINGBUF_TYPE_DATA: 5729 iter->read_stamp += event->time_delta; 5730 return; 5731 5732 default: 5733 RB_WARN_ON(iter->cpu_buffer, 1); 5734 } 5735 } 5736 5737 static struct buffer_page * 5738 __rb_get_reader_page_from_remote(struct ring_buffer_per_cpu *cpu_buffer) 5739 { 5740 struct buffer_page *new_reader, *prev_reader, *prev_head, *new_head, *last; 5741 5742 if (!rb_read_remote_meta_page(cpu_buffer)) 5743 return NULL; 5744 5745 /* More to read on the reader page */ 5746 if (cpu_buffer->reader_page->read < rb_page_size(cpu_buffer->reader_page)) { 5747 if (!cpu_buffer->reader_page->read) 5748 cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp; 5749 return cpu_buffer->reader_page; 5750 } 5751 5752 prev_reader = cpu_buffer->subbuf_ids[cpu_buffer->meta_page->reader.id]; 5753 5754 WARN_ON_ONCE(cpu_buffer->remote->swap_reader_page(cpu_buffer->cpu, 5755 cpu_buffer->remote->priv)); 5756 /* nr_pages doesn't include the reader page */ 5757 if (WARN_ON_ONCE(cpu_buffer->meta_page->reader.id > cpu_buffer->nr_pages)) 5758 return NULL; 5759 5760 new_reader = cpu_buffer->subbuf_ids[cpu_buffer->meta_page->reader.id]; 5761 5762 WARN_ON_ONCE(prev_reader == new_reader); 5763 5764 prev_head = new_reader; /* New reader was also the previous head */ 5765 new_head = prev_head; 5766 rb_inc_page(&new_head); 5767 last = prev_head; 5768 rb_dec_page(&last); 5769 5770 /* Clear the old HEAD flag */ 5771 rb_list_head_clear(cpu_buffer->head_page->list.prev); 5772 5773 prev_reader->list.next = prev_head->list.next; 5774 prev_reader->list.prev = prev_head->list.prev; 5775 5776 /* Swap prev_reader with new_reader */ 5777 last->list.next = &prev_reader->list; 5778 new_head->list.prev = &prev_reader->list; 5779 5780 new_reader->list.prev = &new_reader->list; 5781 new_reader->list.next = &new_head->list; 5782 5783 /* Reactivate the HEAD 
flag */ 5784 rb_set_list_to_head(&last->list); 5785 5786 cpu_buffer->head_page = new_head; 5787 cpu_buffer->reader_page = new_reader; 5788 cpu_buffer->pages = &new_head->list; 5789 cpu_buffer->read_stamp = new_reader->page->time_stamp; 5790 cpu_buffer->lost_events = cpu_buffer->meta_page->reader.lost_events; 5791 5792 return rb_page_size(cpu_buffer->reader_page) ? cpu_buffer->reader_page : NULL; 5793 } 5794 5795 static struct buffer_page * 5796 __rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 5797 { 5798 int max_loops = cpu_buffer->ring_meta ? cpu_buffer->nr_pages : 3; 5799 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 5800 struct buffer_page *reader = NULL; 5801 unsigned long overwrite; 5802 unsigned long flags; 5803 int missed_events = 0; 5804 int nr_loops = 0; 5805 bool ret; 5806 5807 local_irq_save(flags); 5808 arch_spin_lock(&cpu_buffer->lock); 5809 5810 again: 5811 /* 5812 * This should normally only loop twice. But because the 5813 * start of the reader inserts an empty page, it causes a 5814 * case where we will loop three times. There should be no 5815 * reason to loop four times unless the ring buffer is a 5816 * recovered persistent ring buffer. For persistent ring buffers, 5817 * invalid pages are reset during recovery, so there may be more 5818 * than 3 contiguous pages can be empty, but less than nr_pages. 
5819 */ 5820 if (RB_WARN_ON(cpu_buffer, ++nr_loops > max_loops)) { 5821 reader = NULL; 5822 goto out; 5823 } 5824 5825 reader = cpu_buffer->reader_page; 5826 5827 /* If there's more to read, return this page */ 5828 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 5829 goto out; 5830 5831 /* Never should we have an index greater than the size */ 5832 if (RB_WARN_ON(cpu_buffer, 5833 cpu_buffer->reader_page->read > rb_page_size(reader))) 5834 goto out; 5835 5836 /* check if we caught up to the tail */ 5837 reader = NULL; 5838 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 5839 goto out; 5840 5841 /* Don't bother swapping if the ring buffer is empty */ 5842 if (rb_num_of_entries(cpu_buffer) == 0) 5843 goto out; 5844 5845 /* 5846 * Reset the reader page to size zero. 5847 */ 5848 local_set(&cpu_buffer->reader_page->write, 0); 5849 local_set(&cpu_buffer->reader_page->entries, 0); 5850 rb_init_data_page(cpu_buffer->reader_page->page); 5851 cpu_buffer->reader_page->real_end = 0; 5852 5853 spin: 5854 /* 5855 * Splice the empty reader page into the list around the head. 5856 */ 5857 reader = rb_set_head_page(cpu_buffer); 5858 if (!reader) 5859 goto out; 5860 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 5861 cpu_buffer->reader_page->list.prev = reader->list.prev; 5862 5863 /* 5864 * cpu_buffer->pages just needs to point to the buffer, it 5865 * has no specific buffer page to point to. Lets move it out 5866 * of our way so we don't accidentally swap it. 5867 */ 5868 cpu_buffer->pages = reader->list.prev; 5869 5870 /* The reader page will be pointing to the new head */ 5871 rb_set_list_to_head(&cpu_buffer->reader_page->list); 5872 5873 /* 5874 * We want to make sure we read the overruns after we set up our 5875 * pointers to the next object. The writer side does a 5876 * cmpxchg to cross pages which acts as the mb on the writer 5877 * side. 
Note, the reader will constantly fail the swap 5878 * while the writer is updating the pointers, so this 5879 * guarantees that the overwrite recorded here is the one we 5880 * want to compare with the last_overrun. 5881 */ 5882 smp_mb(); 5883 overwrite = local_read(&(cpu_buffer->overrun)); 5884 5885 /* 5886 * Here's the tricky part. 5887 * 5888 * We need to move the pointer past the header page. 5889 * But we can only do that if a writer is not currently 5890 * moving it. The page before the header page has the 5891 * flag bit '1' set if it is pointing to the page we want. 5892 * but if the writer is in the process of moving it 5893 * then it will be '2' or already moved '0'. 5894 */ 5895 5896 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 5897 5898 /* 5899 * If we did not convert it, then we must try again. 5900 */ 5901 if (!ret) 5902 goto spin; 5903 5904 if (rb_page_commit(reader) & RB_MISSED_EVENTS) 5905 missed_events = -1; 5906 5907 if (cpu_buffer->ring_meta) 5908 rb_update_meta_reader(cpu_buffer, reader); 5909 5910 /* 5911 * Yay! We succeeded in replacing the page. 5912 * 5913 * Now make the new head point back to the reader page. 
5914 */ 5915 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 5916 rb_inc_page(&cpu_buffer->head_page); 5917 5918 cpu_buffer->cnt++; 5919 local_inc(&cpu_buffer->pages_read); 5920 5921 /* Finally update the reader page to the new head */ 5922 cpu_buffer->reader_page = reader; 5923 cpu_buffer->reader_page->read = 0; 5924 5925 if (overwrite != cpu_buffer->last_overrun) { 5926 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 5927 cpu_buffer->last_overrun = overwrite; 5928 } 5929 5930 goto again; 5931 5932 out: 5933 /* Update the read_stamp on the first event */ 5934 if (reader && reader->read == 0) 5935 cpu_buffer->read_stamp = reader->page->time_stamp; 5936 5937 arch_spin_unlock(&cpu_buffer->lock); 5938 local_irq_restore(flags); 5939 5940 /* 5941 * The writer has preempt disable, wait for it. But not forever 5942 * Although, 1 second is pretty much "forever" 5943 */ 5944 #define USECS_WAIT 1000000 5945 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 5946 /* If the write is past the end of page, a writer is still updating it */ 5947 if (likely(!reader || rb_page_write(reader) <= bsize)) 5948 break; 5949 5950 udelay(1); 5951 5952 /* Get the latest version of the reader write value */ 5953 smp_rmb(); 5954 } 5955 5956 /* The writer is not moving forward? Something is wrong */ 5957 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 5958 reader = NULL; 5959 5960 /* 5961 * Make sure we see any padding after the write update 5962 * (see rb_reset_tail()). 5963 * 5964 * In addition, a writer may be writing on the reader page 5965 * if the page has not been fully filled, so the read barrier 5966 * is also needed to make sure we see the content of what is 5967 * committed by the writer (see rb_set_commit_to_write()). 
5968 */ 5969 smp_rmb(); 5970 5971 if (!cpu_buffer->lost_events) 5972 cpu_buffer->lost_events = missed_events; 5973 5974 return reader; 5975 } 5976 5977 static struct buffer_page * 5978 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 5979 { 5980 return cpu_buffer->remote ? __rb_get_reader_page_from_remote(cpu_buffer) : 5981 __rb_get_reader_page(cpu_buffer); 5982 } 5983 5984 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 5985 { 5986 struct ring_buffer_event *event; 5987 struct buffer_page *reader; 5988 unsigned length; 5989 5990 reader = rb_get_reader_page(cpu_buffer); 5991 5992 /* This function should not be called when buffer is empty */ 5993 if (RB_WARN_ON(cpu_buffer, !reader)) 5994 return; 5995 5996 event = rb_reader_event(cpu_buffer); 5997 5998 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 5999 cpu_buffer->read++; 6000 6001 rb_update_read_stamp(cpu_buffer, event); 6002 6003 length = rb_event_length(event); 6004 cpu_buffer->reader_page->read += length; 6005 cpu_buffer->read_bytes += length; 6006 } 6007 6008 static void rb_advance_iter(struct ring_buffer_iter *iter) 6009 { 6010 struct ring_buffer_per_cpu *cpu_buffer; 6011 6012 cpu_buffer = iter->cpu_buffer; 6013 6014 /* If head == next_event then we need to jump to the next event */ 6015 if (iter->head == iter->next_event) { 6016 /* If the event gets overwritten again, there's nothing to do */ 6017 if (rb_iter_head_event(iter) == NULL) 6018 return; 6019 } 6020 6021 iter->head = iter->next_event; 6022 6023 /* 6024 * Check if we are at the end of the buffer. 
6025 */ 6026 if (iter->next_event >= rb_page_size(iter->head_page)) { 6027 /* discarded commits can make the page empty */ 6028 if (iter->head_page == cpu_buffer->commit_page) 6029 return; 6030 rb_inc_iter(iter); 6031 return; 6032 } 6033 6034 rb_update_iter_read_stamp(iter, iter->event); 6035 } 6036 6037 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 6038 { 6039 return cpu_buffer->lost_events; 6040 } 6041 6042 static struct ring_buffer_event * 6043 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 6044 unsigned long *lost_events) 6045 { 6046 struct ring_buffer_event *event; 6047 struct buffer_page *reader; 6048 int nr_loops = 0; 6049 6050 if (ts) 6051 *ts = 0; 6052 again: 6053 /* 6054 * We repeat when a time extend is encountered. 6055 * Since the time extend is always attached to a data event, 6056 * we should never loop more than once. 6057 * (We never hit the following condition more than twice). 6058 */ 6059 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 6060 return NULL; 6061 6062 reader = rb_get_reader_page(cpu_buffer); 6063 if (!reader) 6064 return NULL; 6065 6066 event = rb_reader_event(cpu_buffer); 6067 6068 switch (event->type_len) { 6069 case RINGBUF_TYPE_PADDING: 6070 if (rb_null_event(event)) 6071 RB_WARN_ON(cpu_buffer, 1); 6072 /* 6073 * Because the writer could be discarding every 6074 * event it creates (which would probably be bad) 6075 * if we were to go back to "again" then we may never 6076 * catch up, and will trigger the warn on, or lock 6077 * the box. Return the padding, and we will release 6078 * the current locks, and try again. 
6079 */ 6080 return event; 6081 6082 case RINGBUF_TYPE_TIME_EXTEND: 6083 /* Internal data, OK to advance */ 6084 rb_advance_reader(cpu_buffer); 6085 goto again; 6086 6087 case RINGBUF_TYPE_TIME_STAMP: 6088 if (ts) { 6089 *ts = rb_event_time_stamp(event); 6090 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 6091 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 6092 cpu_buffer->cpu, ts); 6093 } 6094 /* Internal data, OK to advance */ 6095 rb_advance_reader(cpu_buffer); 6096 goto again; 6097 6098 case RINGBUF_TYPE_DATA: 6099 if (ts && !(*ts)) { 6100 *ts = cpu_buffer->read_stamp + event->time_delta; 6101 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 6102 cpu_buffer->cpu, ts); 6103 } 6104 if (lost_events) 6105 *lost_events = rb_lost_events(cpu_buffer); 6106 return event; 6107 6108 default: 6109 RB_WARN_ON(cpu_buffer, 1); 6110 } 6111 6112 return NULL; 6113 } 6114 EXPORT_SYMBOL_GPL(ring_buffer_peek); 6115 6116 static struct ring_buffer_event * 6117 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 6118 { 6119 struct trace_buffer *buffer; 6120 struct ring_buffer_per_cpu *cpu_buffer; 6121 struct ring_buffer_event *event; 6122 int nr_loops = 0; 6123 int max_loops; 6124 6125 if (ts) 6126 *ts = 0; 6127 6128 cpu_buffer = iter->cpu_buffer; 6129 buffer = cpu_buffer->buffer; 6130 max_loops = cpu_buffer->ring_meta ? cpu_buffer->nr_pages : 3; 6131 6132 /* 6133 * Check if someone performed a consuming read to the buffer 6134 * or removed some pages from the buffer. In these cases, 6135 * iterator was invalidated and we need to reset it. 
6136 */ 6137 if (unlikely(iter->cache_read != cpu_buffer->read || 6138 iter->cache_reader_page != cpu_buffer->reader_page || 6139 iter->cache_pages_removed != cpu_buffer->pages_removed)) 6140 rb_iter_reset(iter); 6141 6142 again: 6143 if (ring_buffer_iter_empty(iter)) 6144 return NULL; 6145 6146 /* 6147 * As the writer can mess with what the iterator is trying 6148 * to read, just give up if we fail to get an event after 6149 * three tries. The iterator is not as reliable when reading 6150 * the ring buffer with an active write as the consumer is. 6151 * Do not warn if the three failures is reached. 6152 */ 6153 if (++nr_loops > max_loops) 6154 return NULL; 6155 6156 if (rb_per_cpu_empty(cpu_buffer)) 6157 return NULL; 6158 6159 if (iter->head >= rb_page_size(iter->head_page)) { 6160 rb_inc_iter(iter); 6161 goto again; 6162 } 6163 6164 event = rb_iter_head_event(iter); 6165 if (!event) 6166 goto again; 6167 6168 switch (event->type_len) { 6169 case RINGBUF_TYPE_PADDING: 6170 if (rb_null_event(event)) { 6171 rb_inc_iter(iter); 6172 goto again; 6173 } 6174 rb_advance_iter(iter); 6175 return event; 6176 6177 case RINGBUF_TYPE_TIME_EXTEND: 6178 /* Internal data, OK to advance */ 6179 rb_advance_iter(iter); 6180 goto again; 6181 6182 case RINGBUF_TYPE_TIME_STAMP: 6183 if (ts) { 6184 *ts = rb_event_time_stamp(event); 6185 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 6186 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 6187 cpu_buffer->cpu, ts); 6188 } 6189 /* Internal data, OK to advance */ 6190 rb_advance_iter(iter); 6191 goto again; 6192 6193 case RINGBUF_TYPE_DATA: 6194 if (ts && !(*ts)) { 6195 *ts = iter->read_stamp + event->time_delta; 6196 ring_buffer_normalize_time_stamp(buffer, 6197 cpu_buffer->cpu, ts); 6198 } 6199 return event; 6200 6201 default: 6202 RB_WARN_ON(cpu_buffer, 1); 6203 } 6204 6205 return NULL; 6206 } 6207 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 6208 6209 static inline bool rb_reader_lock(struct ring_buffer_per_cpu 
*cpu_buffer) 6210 { 6211 if (likely(!in_nmi())) { 6212 raw_spin_lock(&cpu_buffer->reader_lock); 6213 return true; 6214 } 6215 6216 /* 6217 * If an NMI die dumps out the content of the ring buffer 6218 * trylock must be used to prevent a deadlock if the NMI 6219 * preempted a task that holds the ring buffer locks. If 6220 * we get the lock then all is fine, if not, then continue 6221 * to do the read, but this can corrupt the ring buffer, 6222 * so it must be permanently disabled from future writes. 6223 * Reading from NMI is a oneshot deal. 6224 */ 6225 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 6226 return true; 6227 6228 /* Continue without locking, but disable the ring buffer */ 6229 atomic_inc(&cpu_buffer->record_disabled); 6230 return false; 6231 } 6232 6233 static inline void 6234 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 6235 { 6236 if (likely(locked)) 6237 raw_spin_unlock(&cpu_buffer->reader_lock); 6238 } 6239 6240 /** 6241 * ring_buffer_peek - peek at the next event to be read 6242 * @buffer: The ring buffer to read 6243 * @cpu: The cpu to peak at 6244 * @ts: The timestamp counter of this event. 6245 * @lost_events: a variable to store if events were lost (may be NULL) 6246 * 6247 * This will return the event that will be read next, but does 6248 * not consume the data. 
6249 */ 6250 struct ring_buffer_event * 6251 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 6252 unsigned long *lost_events) 6253 { 6254 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6255 struct ring_buffer_event *event; 6256 unsigned long flags; 6257 bool dolock; 6258 6259 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6260 return NULL; 6261 6262 again: 6263 local_irq_save(flags); 6264 dolock = rb_reader_lock(cpu_buffer); 6265 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 6266 if (event && event->type_len == RINGBUF_TYPE_PADDING) 6267 rb_advance_reader(cpu_buffer); 6268 rb_reader_unlock(cpu_buffer, dolock); 6269 local_irq_restore(flags); 6270 6271 if (event && event->type_len == RINGBUF_TYPE_PADDING) 6272 goto again; 6273 6274 return event; 6275 } 6276 6277 /** ring_buffer_iter_dropped - report if there are dropped events 6278 * @iter: The ring buffer iterator 6279 * 6280 * Returns true if there was dropped events since the last peek. 6281 */ 6282 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 6283 { 6284 return iter->missed_events != 0; 6285 } 6286 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 6287 6288 /** 6289 * ring_buffer_iter_peek - peek at the next event to be read 6290 * @iter: The ring buffer iterator 6291 * @ts: The timestamp counter of this event. 6292 * 6293 * This will return the event that will be read next, but does 6294 * not increment the iterator. 
6295 */ 6296 struct ring_buffer_event * 6297 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 6298 { 6299 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6300 struct ring_buffer_event *event; 6301 unsigned long flags; 6302 6303 again: 6304 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6305 event = rb_iter_peek(iter, ts); 6306 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6307 6308 if (event && event->type_len == RINGBUF_TYPE_PADDING) 6309 goto again; 6310 6311 return event; 6312 } 6313 6314 /** 6315 * ring_buffer_consume - return an event and consume it 6316 * @buffer: The ring buffer to get the next event from 6317 * @cpu: the cpu to read the buffer from 6318 * @ts: a variable to store the timestamp (may be NULL) 6319 * @lost_events: a variable to store if events were lost (may be NULL) 6320 * 6321 * Returns the next event in the ring buffer, and that event is consumed. 6322 * Meaning, that sequential reads will keep returning a different event, 6323 * and eventually empty the ring buffer if the producer is slower. 
6324 */ 6325 struct ring_buffer_event * 6326 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 6327 unsigned long *lost_events) 6328 { 6329 struct ring_buffer_per_cpu *cpu_buffer; 6330 struct ring_buffer_event *event = NULL; 6331 unsigned long flags; 6332 bool dolock; 6333 6334 again: 6335 /* might be called in atomic */ 6336 preempt_disable(); 6337 6338 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6339 goto out; 6340 6341 cpu_buffer = buffer->buffers[cpu]; 6342 local_irq_save(flags); 6343 dolock = rb_reader_lock(cpu_buffer); 6344 6345 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 6346 if (event) { 6347 cpu_buffer->lost_events = 0; 6348 rb_advance_reader(cpu_buffer); 6349 } 6350 6351 rb_reader_unlock(cpu_buffer, dolock); 6352 local_irq_restore(flags); 6353 6354 out: 6355 preempt_enable(); 6356 6357 if (event && event->type_len == RINGBUF_TYPE_PADDING) 6358 goto again; 6359 6360 return event; 6361 } 6362 EXPORT_SYMBOL_GPL(ring_buffer_consume); 6363 6364 /** 6365 * ring_buffer_read_start - start a non consuming read of the buffer 6366 * @buffer: The ring buffer to read from 6367 * @cpu: The cpu buffer to iterate over 6368 * @flags: gfp flags to use for memory allocation 6369 * 6370 * This creates an iterator to allow non-consuming iteration through 6371 * the buffer. If the buffer is disabled for writing, it will produce 6372 * the same information each time, but if the buffer is still writing 6373 * then the first hit of a write will cause the iteration to stop. 6374 * 6375 * Must be paired with ring_buffer_read_finish. 
6376 */ 6377 struct ring_buffer_iter * 6378 ring_buffer_read_start(struct trace_buffer *buffer, int cpu, gfp_t flags) 6379 { 6380 struct ring_buffer_per_cpu *cpu_buffer; 6381 struct ring_buffer_iter *iter; 6382 6383 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6384 return NULL; 6385 6386 iter = kzalloc_obj(*iter, flags); 6387 if (!iter) 6388 return NULL; 6389 6390 /* Holds the entire event: data and meta data */ 6391 iter->event_size = buffer->subbuf_size; 6392 iter->event = kmalloc(iter->event_size, flags); 6393 if (!iter->event) { 6394 kfree(iter); 6395 return NULL; 6396 } 6397 6398 cpu_buffer = buffer->buffers[cpu]; 6399 6400 iter->cpu_buffer = cpu_buffer; 6401 6402 atomic_inc(&cpu_buffer->resize_disabled); 6403 6404 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6405 arch_spin_lock(&cpu_buffer->lock); 6406 rb_iter_reset(iter); 6407 arch_spin_unlock(&cpu_buffer->lock); 6408 6409 return iter; 6410 } 6411 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 6412 6413 /** 6414 * ring_buffer_read_finish - finish reading the iterator of the buffer 6415 * @iter: The iterator retrieved by ring_buffer_start 6416 * 6417 * This re-enables resizing of the buffer, and frees the iterator. 6418 */ 6419 void 6420 ring_buffer_read_finish(struct ring_buffer_iter *iter) 6421 { 6422 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6423 6424 /* Use this opportunity to check the integrity of the ring buffer. */ 6425 rb_check_pages(cpu_buffer); 6426 6427 atomic_dec(&cpu_buffer->resize_disabled); 6428 kfree(iter->event); 6429 kfree(iter); 6430 } 6431 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 6432 6433 /** 6434 * ring_buffer_iter_advance - advance the iterator to the next location 6435 * @iter: The ring buffer iterator 6436 * 6437 * Move the location of the iterator such that the next read will 6438 * be the next location of the iterator. 
6439 */ 6440 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 6441 { 6442 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6443 unsigned long flags; 6444 6445 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6446 iter->missed_events = 0; 6447 rb_advance_iter(iter); 6448 6449 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6450 } 6451 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 6452 6453 /** 6454 * ring_buffer_size - return the size of the ring buffer (in bytes) 6455 * @buffer: The ring buffer. 6456 * @cpu: The CPU to get ring buffer size from. 6457 */ 6458 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 6459 { 6460 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6461 return 0; 6462 6463 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages; 6464 } 6465 EXPORT_SYMBOL_GPL(ring_buffer_size); 6466 6467 /** 6468 * ring_buffer_max_event_size - return the max data size of an event 6469 * @buffer: The ring buffer. 6470 * 6471 * Returns the maximum size an event can be. 6472 */ 6473 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 6474 { 6475 /* If abs timestamp is requested, events have a timestamp too */ 6476 if (ring_buffer_time_stamp_abs(buffer)) 6477 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 6478 return buffer->max_data_size; 6479 } 6480 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 6481 6482 static void rb_clear_buffer_page(struct buffer_page *page) 6483 { 6484 local_set(&page->write, 0); 6485 local_set(&page->entries, 0); 6486 rb_init_data_page(page->page); 6487 page->read = 0; 6488 } 6489 6490 /* 6491 * When the buffer is memory mapped to user space, each sub buffer 6492 * has a unique id that is used by the meta data to tell the user 6493 * where the current reader page is. 6494 * 6495 * For a normal allocated ring buffer, the id is saved in the buffer page 6496 * id field, and updated via this function. 
6497 * 6498 * But for a fixed memory mapped buffer, the id is already assigned for 6499 * fixed memory ordering in the memory layout and can not be used. Instead 6500 * the index of where the page lies in the memory layout is used. 6501 * 6502 * For the normal pages, set the buffer page id with the passed in @id 6503 * value and return that. 6504 * 6505 * For fixed memory mapped pages, get the page index in the memory layout 6506 * and return that as the id. 6507 */ 6508 static int rb_page_id(struct ring_buffer_per_cpu *cpu_buffer, 6509 struct buffer_page *bpage, int id) 6510 { 6511 /* 6512 * For boot buffers, the id is the index, 6513 * otherwise, set the buffer page with this id 6514 */ 6515 if (cpu_buffer->ring_meta) 6516 id = rb_meta_subbuf_idx(cpu_buffer->ring_meta, bpage->page); 6517 else 6518 bpage->id = id; 6519 6520 return id; 6521 } 6522 6523 static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6524 { 6525 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 6526 6527 if (!meta) 6528 return; 6529 6530 meta->reader.read = cpu_buffer->reader_page->read; 6531 meta->reader.id = rb_page_id(cpu_buffer, cpu_buffer->reader_page, 6532 cpu_buffer->reader_page->id); 6533 6534 meta->reader.lost_events = cpu_buffer->lost_events; 6535 6536 meta->entries = local_read(&cpu_buffer->entries); 6537 meta->overrun = local_read(&cpu_buffer->overrun); 6538 meta->read = cpu_buffer->read; 6539 meta->pages_lost = local_read(&cpu_buffer->pages_lost); 6540 meta->pages_touched = local_read(&cpu_buffer->pages_touched); 6541 6542 /* Some archs do not have data cache coherency between kernel and user-space */ 6543 flush_kernel_vmap_range(cpu_buffer->meta_page, PAGE_SIZE); 6544 } 6545 6546 static void 6547 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 6548 { 6549 struct buffer_page *page; 6550 6551 if (cpu_buffer->remote) { 6552 if (!cpu_buffer->remote->reset) 6553 return; 6554 6555 cpu_buffer->remote->reset(cpu_buffer->cpu, cpu_buffer->remote->priv); 6556 
rb_read_remote_meta_page(cpu_buffer); 6557 6558 /* Read related values, not covered by the meta-page */ 6559 local_set(&cpu_buffer->pages_read, 0); 6560 cpu_buffer->read = 0; 6561 cpu_buffer->read_bytes = 0; 6562 cpu_buffer->last_overrun = 0; 6563 cpu_buffer->reader_page->read = 0; 6564 6565 return; 6566 } 6567 6568 rb_head_page_deactivate(cpu_buffer); 6569 6570 cpu_buffer->head_page 6571 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6572 rb_clear_buffer_page(cpu_buffer->head_page); 6573 list_for_each_entry(page, cpu_buffer->pages, list) { 6574 rb_clear_buffer_page(page); 6575 } 6576 6577 cpu_buffer->tail_page = cpu_buffer->head_page; 6578 cpu_buffer->commit_page = cpu_buffer->head_page; 6579 6580 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 6581 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6582 rb_clear_buffer_page(cpu_buffer->reader_page); 6583 6584 local_set(&cpu_buffer->entries_bytes, 0); 6585 local_set(&cpu_buffer->overrun, 0); 6586 local_set(&cpu_buffer->commit_overrun, 0); 6587 local_set(&cpu_buffer->dropped_events, 0); 6588 local_set(&cpu_buffer->entries, 0); 6589 local_set(&cpu_buffer->committing, 0); 6590 local_set(&cpu_buffer->commits, 0); 6591 local_set(&cpu_buffer->pages_touched, 0); 6592 local_set(&cpu_buffer->pages_lost, 0); 6593 local_set(&cpu_buffer->pages_read, 0); 6594 cpu_buffer->last_pages_touch = 0; 6595 cpu_buffer->shortest_full = 0; 6596 cpu_buffer->read = 0; 6597 cpu_buffer->read_bytes = 0; 6598 6599 rb_time_set(&cpu_buffer->write_stamp, 0); 6600 rb_time_set(&cpu_buffer->before_stamp, 0); 6601 6602 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 6603 6604 cpu_buffer->lost_events = 0; 6605 cpu_buffer->last_overrun = 0; 6606 6607 rb_head_page_activate(cpu_buffer); 6608 cpu_buffer->pages_removed = 0; 6609 6610 if (cpu_buffer->mapped) { 6611 rb_update_meta_page(cpu_buffer); 6612 if (cpu_buffer->ring_meta) { 6613 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 6614 meta->commit_buffer = 
meta->head_buffer; 6615 } 6616 } 6617 } 6618 6619 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 6620 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6621 { 6622 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6623 6624 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 6625 return; 6626 6627 arch_spin_lock(&cpu_buffer->lock); 6628 6629 rb_reset_cpu(cpu_buffer); 6630 6631 arch_spin_unlock(&cpu_buffer->lock); 6632 } 6633 6634 /** 6635 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 6636 * @buffer: The ring buffer to reset a per cpu buffer of 6637 * @cpu: The CPU buffer to be reset 6638 */ 6639 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 6640 { 6641 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6642 6643 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6644 return; 6645 6646 /* prevent another thread from changing buffer sizes */ 6647 mutex_lock(&buffer->mutex); 6648 6649 atomic_inc(&cpu_buffer->resize_disabled); 6650 atomic_inc(&cpu_buffer->record_disabled); 6651 6652 /* Make sure all commits have finished */ 6653 synchronize_rcu(); 6654 6655 reset_disabled_cpu_buffer(cpu_buffer); 6656 6657 atomic_dec(&cpu_buffer->record_disabled); 6658 atomic_dec(&cpu_buffer->resize_disabled); 6659 6660 mutex_unlock(&buffer->mutex); 6661 } 6662 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 6663 6664 /* Flag to ensure proper resetting of atomic variables */ 6665 #define RESET_BIT (1 << 30) 6666 6667 /** 6668 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 6669 * @buffer: The ring buffer to reset a per cpu buffer of 6670 */ 6671 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 6672 { 6673 struct ring_buffer_per_cpu *cpu_buffer; 6674 int cpu; 6675 6676 /* prevent another thread from changing buffer sizes */ 6677 mutex_lock(&buffer->mutex); 6678 6679 for_each_online_buffer_cpu(buffer, cpu) { 6680 cpu_buffer = buffer->buffers[cpu]; 6681 
6682 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 6683 atomic_inc(&cpu_buffer->record_disabled); 6684 } 6685 6686 /* Make sure all commits have finished */ 6687 synchronize_rcu(); 6688 6689 for_each_buffer_cpu(buffer, cpu) { 6690 cpu_buffer = buffer->buffers[cpu]; 6691 6692 /* 6693 * If a CPU came online during the synchronize_rcu(), then 6694 * ignore it. 6695 */ 6696 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 6697 continue; 6698 6699 reset_disabled_cpu_buffer(cpu_buffer); 6700 6701 atomic_dec(&cpu_buffer->record_disabled); 6702 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 6703 } 6704 6705 mutex_unlock(&buffer->mutex); 6706 } 6707 6708 /** 6709 * ring_buffer_reset - reset a ring buffer 6710 * @buffer: The ring buffer to reset all cpu buffers 6711 */ 6712 void ring_buffer_reset(struct trace_buffer *buffer) 6713 { 6714 struct ring_buffer_per_cpu *cpu_buffer; 6715 int cpu; 6716 6717 /* prevent another thread from changing buffer sizes */ 6718 mutex_lock(&buffer->mutex); 6719 6720 for_each_buffer_cpu(buffer, cpu) { 6721 cpu_buffer = buffer->buffers[cpu]; 6722 6723 atomic_inc(&cpu_buffer->resize_disabled); 6724 atomic_inc(&cpu_buffer->record_disabled); 6725 } 6726 6727 /* Make sure all commits have finished */ 6728 synchronize_rcu(); 6729 6730 for_each_buffer_cpu(buffer, cpu) { 6731 cpu_buffer = buffer->buffers[cpu]; 6732 6733 reset_disabled_cpu_buffer(cpu_buffer); 6734 6735 atomic_dec(&cpu_buffer->record_disabled); 6736 atomic_dec(&cpu_buffer->resize_disabled); 6737 } 6738 6739 mutex_unlock(&buffer->mutex); 6740 } 6741 EXPORT_SYMBOL_GPL(ring_buffer_reset); 6742 6743 /** 6744 * ring_buffer_empty - is the ring buffer empty? 
6745 * @buffer: The ring buffer to test 6746 */ 6747 bool ring_buffer_empty(struct trace_buffer *buffer) 6748 { 6749 struct ring_buffer_per_cpu *cpu_buffer; 6750 unsigned long flags; 6751 bool dolock; 6752 bool ret; 6753 int cpu; 6754 6755 /* yes this is racy, but if you don't like the race, lock the buffer */ 6756 for_each_buffer_cpu(buffer, cpu) { 6757 cpu_buffer = buffer->buffers[cpu]; 6758 local_irq_save(flags); 6759 dolock = rb_reader_lock(cpu_buffer); 6760 ret = rb_per_cpu_empty(cpu_buffer); 6761 rb_reader_unlock(cpu_buffer, dolock); 6762 local_irq_restore(flags); 6763 6764 if (!ret) 6765 return false; 6766 } 6767 6768 return true; 6769 } 6770 EXPORT_SYMBOL_GPL(ring_buffer_empty); 6771 6772 /** 6773 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 6774 * @buffer: The ring buffer 6775 * @cpu: The CPU buffer to test 6776 */ 6777 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 6778 { 6779 struct ring_buffer_per_cpu *cpu_buffer; 6780 unsigned long flags; 6781 bool dolock; 6782 bool ret; 6783 6784 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6785 return true; 6786 6787 cpu_buffer = buffer->buffers[cpu]; 6788 local_irq_save(flags); 6789 dolock = rb_reader_lock(cpu_buffer); 6790 ret = rb_per_cpu_empty(cpu_buffer); 6791 rb_reader_unlock(cpu_buffer, dolock); 6792 local_irq_restore(flags); 6793 6794 return ret; 6795 } 6796 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 6797 6798 int ring_buffer_poll_remote(struct trace_buffer *buffer, int cpu) 6799 { 6800 struct ring_buffer_per_cpu *cpu_buffer; 6801 6802 if (cpu != RING_BUFFER_ALL_CPUS) { 6803 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6804 return -EINVAL; 6805 6806 cpu_buffer = buffer->buffers[cpu]; 6807 6808 guard(raw_spinlock)(&cpu_buffer->reader_lock); 6809 if (rb_read_remote_meta_page(cpu_buffer)) 6810 rb_wakeups(buffer, cpu_buffer); 6811 6812 return 0; 6813 } 6814 6815 guard(cpus_read_lock)(); 6816 6817 /* 6818 * Make sure all the ring buffers are up to date before we start reading 
6819 * them. 6820 */ 6821 for_each_buffer_cpu(buffer, cpu) { 6822 cpu_buffer = buffer->buffers[cpu]; 6823 6824 guard(raw_spinlock)(&cpu_buffer->reader_lock); 6825 rb_read_remote_meta_page(cpu_buffer); 6826 } 6827 6828 for_each_buffer_cpu(buffer, cpu) { 6829 cpu_buffer = buffer->buffers[cpu]; 6830 6831 if (rb_num_of_entries(cpu_buffer)) 6832 rb_wakeups(buffer, cpu_buffer); 6833 } 6834 6835 return 0; 6836 } 6837 6838 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 6839 /** 6840 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 6841 * @buffer_a: One buffer to swap with 6842 * @buffer_b: The other buffer to swap with 6843 * @cpu: the CPU of the buffers to swap 6844 * 6845 * This function is useful for tracers that want to take a "snapshot" 6846 * of a CPU buffer and has another back up buffer lying around. 6847 * it is expected that the tracer handles the cpu buffer not being 6848 * used at the moment. 6849 */ 6850 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 6851 struct trace_buffer *buffer_b, int cpu) 6852 { 6853 struct ring_buffer_per_cpu *cpu_buffer_a; 6854 struct ring_buffer_per_cpu *cpu_buffer_b; 6855 int ret = -EINVAL; 6856 6857 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 6858 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 6859 return -EINVAL; 6860 6861 cpu_buffer_a = buffer_a->buffers[cpu]; 6862 cpu_buffer_b = buffer_b->buffers[cpu]; 6863 6864 /* It's up to the callers to not try to swap mapped buffers */ 6865 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) 6866 return -EBUSY; 6867 6868 /* At least make sure the two buffers are somewhat the same */ 6869 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 6870 return -EINVAL; 6871 6872 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 6873 return -EINVAL; 6874 6875 if (atomic_read(&buffer_a->record_disabled)) 6876 return -EAGAIN; 6877 6878 if (atomic_read(&buffer_b->record_disabled)) 6879 return -EAGAIN; 6880 6881 if (atomic_read(&cpu_buffer_a->record_disabled)) 6882 
return -EAGAIN; 6883 6884 if (atomic_read(&cpu_buffer_b->record_disabled)) 6885 return -EAGAIN; 6886 6887 /* 6888 * We can't do a synchronize_rcu here because this 6889 * function can be called in atomic context. 6890 * Normally this will be called from the same CPU as cpu. 6891 * If not it's up to the caller to protect this. 6892 */ 6893 atomic_inc(&cpu_buffer_a->record_disabled); 6894 atomic_inc(&cpu_buffer_b->record_disabled); 6895 6896 ret = -EBUSY; 6897 if (local_read(&cpu_buffer_a->committing)) 6898 goto out_dec; 6899 if (local_read(&cpu_buffer_b->committing)) 6900 goto out_dec; 6901 6902 /* 6903 * When resize is in progress, we cannot swap it because 6904 * it will mess the state of the cpu buffer. 6905 */ 6906 if (atomic_read(&buffer_a->resizing)) 6907 goto out_dec; 6908 if (atomic_read(&buffer_b->resizing)) 6909 goto out_dec; 6910 6911 buffer_a->buffers[cpu] = cpu_buffer_b; 6912 buffer_b->buffers[cpu] = cpu_buffer_a; 6913 6914 cpu_buffer_b->buffer = buffer_a; 6915 cpu_buffer_a->buffer = buffer_b; 6916 6917 ret = 0; 6918 6919 out_dec: 6920 atomic_dec(&cpu_buffer_a->record_disabled); 6921 atomic_dec(&cpu_buffer_b->record_disabled); 6922 return ret; 6923 } 6924 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 6925 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 6926 6927 /** 6928 * ring_buffer_alloc_read_page - allocate a page to read from buffer 6929 * @buffer: the buffer to allocate for. 6930 * @cpu: the cpu buffer to allocate. 6931 * 6932 * This function is used in conjunction with ring_buffer_read_page. 6933 * When reading a full page from the ring buffer, these functions 6934 * can be used to speed up the process. The calling function should 6935 * allocate a few pages first with this function. Then when it 6936 * needs to get pages from the ring buffer, it passes the result 6937 * of this function into ring_buffer_read_page, which will swap 6938 * the page that was allocated, with the read page of the buffer. 
6939 * 6940 * Returns: 6941 * The page allocated, or ERR_PTR 6942 */ 6943 struct buffer_data_read_page * 6944 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 6945 { 6946 struct ring_buffer_per_cpu *cpu_buffer; 6947 struct buffer_data_read_page *bpage = NULL; 6948 unsigned long flags; 6949 6950 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6951 return ERR_PTR(-ENODEV); 6952 6953 bpage = kzalloc_obj(*bpage); 6954 if (!bpage) 6955 return ERR_PTR(-ENOMEM); 6956 6957 bpage->order = buffer->subbuf_order; 6958 cpu_buffer = buffer->buffers[cpu]; 6959 local_irq_save(flags); 6960 arch_spin_lock(&cpu_buffer->lock); 6961 6962 if (cpu_buffer->free_page) { 6963 bpage->data = cpu_buffer->free_page; 6964 cpu_buffer->free_page = NULL; 6965 } 6966 6967 arch_spin_unlock(&cpu_buffer->lock); 6968 local_irq_restore(flags); 6969 6970 if (bpage->data) { 6971 rb_init_data_page(bpage->data); 6972 } else { 6973 bpage->data = alloc_cpu_data(cpu, cpu_buffer->buffer->subbuf_order); 6974 if (!bpage->data) { 6975 kfree(bpage); 6976 return ERR_PTR(-ENOMEM); 6977 } 6978 } 6979 6980 return bpage; 6981 } 6982 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 6983 6984 /** 6985 * ring_buffer_free_read_page - free an allocated read page 6986 * @buffer: the buffer the page was allocate for 6987 * @cpu: the cpu buffer the page came from 6988 * @data_page: the page to free 6989 * 6990 * Free a page allocated from ring_buffer_alloc_read_page. 
6991 */ 6992 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 6993 struct buffer_data_read_page *data_page) 6994 { 6995 struct ring_buffer_per_cpu *cpu_buffer; 6996 struct buffer_data_page *dpage = data_page->data; 6997 struct page *page = virt_to_page(dpage); 6998 unsigned long flags; 6999 7000 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 7001 return; 7002 7003 cpu_buffer = buffer->buffers[cpu]; 7004 7005 /* 7006 * If the page is still in use someplace else, or order of the page 7007 * is different from the subbuffer order of the buffer - 7008 * we can't reuse it 7009 */ 7010 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 7011 goto out; 7012 7013 local_irq_save(flags); 7014 arch_spin_lock(&cpu_buffer->lock); 7015 7016 if (!cpu_buffer->free_page) { 7017 cpu_buffer->free_page = dpage; 7018 dpage = NULL; 7019 } 7020 7021 arch_spin_unlock(&cpu_buffer->lock); 7022 local_irq_restore(flags); 7023 7024 out: 7025 free_pages((unsigned long)dpage, data_page->order); 7026 kfree(data_page); 7027 } 7028 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 7029 7030 /** 7031 * ring_buffer_read_page - extract a page from the ring buffer 7032 * @buffer: buffer to extract from 7033 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 7034 * @len: amount to extract 7035 * @cpu: the cpu of the buffer to extract 7036 * @full: should the extraction only happen when the page is full. 7037 * 7038 * This function will pull out a page from the ring buffer and consume it. 7039 * @data_page must be the address of the variable that was returned 7040 * from ring_buffer_alloc_read_page. This is because the page might be used 7041 * to swap with a page in the ring buffer. 
7042 * 7043 * for example: 7044 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 7045 * if (IS_ERR(rpage)) 7046 * return PTR_ERR(rpage); 7047 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 7048 * if (ret >= 0) 7049 * process_page(ring_buffer_read_page_data(rpage), ret); 7050 * ring_buffer_free_read_page(buffer, cpu, rpage); 7051 * 7052 * When @full is set, the function will not return true unless 7053 * the writer is off the reader page. 7054 * 7055 * Note: it is up to the calling functions to handle sleeps and wakeups. 7056 * The ring buffer can be used anywhere in the kernel and can not 7057 * blindly call wake_up. The layer that uses the ring buffer must be 7058 * responsible for that. 7059 * 7060 * Returns: 7061 * >=0 if data has been transferred, returns the offset of consumed data. 7062 * <0 if no data has been transferred. 7063 */ 7064 int ring_buffer_read_page(struct trace_buffer *buffer, 7065 struct buffer_data_read_page *data_page, 7066 size_t len, int cpu, int full) 7067 { 7068 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 7069 struct ring_buffer_event *event; 7070 struct buffer_data_page *dpage; 7071 struct buffer_page *reader; 7072 long missed_events; 7073 unsigned int commit; 7074 unsigned int size; 7075 unsigned int read; 7076 u64 save_timestamp; 7077 bool force_memcpy; 7078 7079 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7080 return -1; 7081 7082 /* 7083 * If len is not big enough to hold the page header, then 7084 * we can not copy anything. 
7085 */ 7086 if (len <= BUF_PAGE_HDR_SIZE) 7087 return -1; 7088 7089 len -= BUF_PAGE_HDR_SIZE; 7090 7091 if (!data_page || !data_page->data) 7092 return -1; 7093 7094 if (data_page->order != buffer->subbuf_order) 7095 return -1; 7096 7097 dpage = data_page->data; 7098 if (!dpage) 7099 return -1; 7100 7101 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 7102 7103 reader = rb_get_reader_page(cpu_buffer); 7104 if (!reader) 7105 return -1; 7106 7107 event = rb_reader_event(cpu_buffer); 7108 7109 read = reader->read; 7110 commit = rb_page_commit(reader); 7111 size = rb_page_size(reader); 7112 7113 /* Check if any events were dropped */ 7114 missed_events = cpu_buffer->lost_events; 7115 7116 force_memcpy = cpu_buffer->mapped || cpu_buffer->remote; 7117 7118 /* 7119 * If this page has been partially read or 7120 * if len is not big enough to read the rest of the page or 7121 * a writer is still on the page, then 7122 * we must copy the data from the page to the buffer. 7123 * Otherwise, we can simply swap the page with the one passed in. 7124 */ 7125 if (read || (len < (size - read)) || 7126 cpu_buffer->reader_page == cpu_buffer->commit_page || 7127 force_memcpy) { 7128 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 7129 unsigned int rpos = read; 7130 unsigned int pos = 0; 7131 unsigned int event_size; 7132 unsigned int flags = 0; 7133 7134 /* 7135 * If a full page is expected, this can still be returned 7136 * if there's been a previous partial read and the 7137 * rest of the page can be read and the commit page is off 7138 * the reader page. 
7139 */ 7140 if (full && 7141 (!read || (len < (size - read)) || 7142 cpu_buffer->reader_page == cpu_buffer->commit_page)) 7143 return -1; 7144 7145 if (len > (size - read)) 7146 len = (size - read); 7147 7148 /* Always keep the time extend and data together */ 7149 event_size = rb_event_ts_length(event); 7150 7151 if (len < event_size) 7152 return -1; 7153 7154 if (commit & RB_MISSED_EVENTS) 7155 flags = RB_MISSED_EVENTS; 7156 7157 /* save the current timestamp, since the user will need it */ 7158 save_timestamp = cpu_buffer->read_stamp; 7159 7160 /* Need to copy one event at a time */ 7161 do { 7162 /* We need the size of one event, because 7163 * rb_advance_reader only advances by one event, 7164 * whereas rb_event_ts_length may include the size of 7165 * one or two events. 7166 * We have already ensured there's enough space if this 7167 * is a time extend. */ 7168 event_size = rb_event_length(event); 7169 memcpy(dpage->data + pos, rpage->data + rpos, event_size); 7170 7171 len -= event_size; 7172 7173 rb_advance_reader(cpu_buffer); 7174 rpos = reader->read; 7175 pos += event_size; 7176 7177 if (rpos >= event_size) 7178 break; 7179 7180 event = rb_reader_event(cpu_buffer); 7181 /* Always keep the time extend and data together */ 7182 event_size = rb_event_ts_length(event); 7183 } while (len >= event_size); 7184 7185 /* update dpage */ 7186 local_set(&dpage->commit, pos | flags); 7187 dpage->time_stamp = save_timestamp; 7188 7189 /* we copied everything to the beginning */ 7190 read = 0; 7191 } else { 7192 /* update the entry counter */ 7193 cpu_buffer->read += rb_page_entries(reader); 7194 cpu_buffer->read_bytes += rb_page_size(reader); 7195 7196 /* swap the pages */ 7197 rb_init_data_page(dpage); 7198 dpage = reader->page; 7199 reader->page = data_page->data; 7200 local_set(&reader->write, 0); 7201 local_set(&reader->entries, 0); 7202 reader->read = 0; 7203 data_page->data = dpage; 7204 if (!missed_events && rb_data_page_commit(dpage) & RB_MISSED_EVENTS) 7205 
missed_events = -1; 7206 7207 /* 7208 * Use the real_end for the data size, 7209 * This gives us a chance to store the lost events 7210 * on the page. 7211 */ 7212 if (reader->real_end) 7213 local_set(&dpage->commit, reader->real_end); 7214 } 7215 7216 cpu_buffer->lost_events = 0; 7217 7218 size = rb_data_page_size(dpage); 7219 /* 7220 * Set a flag in the commit field if we lost events 7221 */ 7222 if (missed_events) { 7223 /* 7224 * If there is room at the end of the page to save the 7225 * missed events, then record it there. 7226 */ 7227 if (missed_events > 0 && 7228 buffer->subbuf_size - size >= sizeof(missed_events)) { 7229 memcpy(&dpage->data[size], &missed_events, 7230 sizeof(missed_events)); 7231 local_add(RB_MISSED_STORED, &dpage->commit); 7232 size += sizeof(missed_events); 7233 } 7234 /* 7235 * Note, for the persistent ring buffer, the RB_MISSED_EVENTS 7236 * may have been set in the main buffer via the verification code. 7237 * But here, dpage is a copy of that page and has not yet had 7238 * the RB_MISSED_EVENTS set. As for the normal buffers, 7239 * the main write buffer does not set these bits and it needs 7240 * to be set here. 7241 */ 7242 local_add(RB_MISSED_EVENTS, &dpage->commit); 7243 } 7244 7245 /* 7246 * This page may be off to user land. Zero it out here. 7247 */ 7248 if (size < buffer->subbuf_size) 7249 memset(&dpage->data[size], 0, buffer->subbuf_size - size); 7250 7251 return read; 7252 } 7253 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 7254 7255 /** 7256 * ring_buffer_read_page_data - get pointer to the data in the page. 7257 * @page: the page to get the data from 7258 * 7259 * Returns pointer to the actual data in this page. 7260 */ 7261 void *ring_buffer_read_page_data(struct buffer_data_read_page *page) 7262 { 7263 return page->data; 7264 } 7265 EXPORT_SYMBOL_GPL(ring_buffer_read_page_data); 7266 7267 /** 7268 * ring_buffer_subbuf_size_get - get size of the sub buffer. 
7269 * @buffer: the buffer to get the sub buffer size from 7270 * 7271 * Returns size of the sub buffer, in bytes. 7272 */ 7273 int ring_buffer_subbuf_size_get(struct trace_buffer *buffer) 7274 { 7275 return buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 7276 } 7277 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get); 7278 7279 /** 7280 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page. 7281 * @buffer: The ring_buffer to get the system sub page order from 7282 * 7283 * By default, one ring buffer sub page equals to one system page. This parameter 7284 * is configurable, per ring buffer. The size of the ring buffer sub page can be 7285 * extended, but must be an order of system page size. 7286 * 7287 * Returns the order of buffer sub page size, in system pages: 7288 * 0 means the sub buffer size is 1 system page and so forth. 7289 * In case of an error < 0 is returned. 7290 */ 7291 int ring_buffer_subbuf_order_get(struct trace_buffer *buffer) 7292 { 7293 if (!buffer) 7294 return -EINVAL; 7295 7296 return buffer->subbuf_order; 7297 } 7298 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get); 7299 7300 /** 7301 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page. 7302 * @buffer: The ring_buffer to set the new page size. 7303 * @order: Order of the system pages in one sub buffer page 7304 * 7305 * By default, one ring buffer pages equals to one system page. This API can be 7306 * used to set new size of the ring buffer page. The size must be order of 7307 * system page size, that's why the input parameter @order is the order of 7308 * system pages that are allocated for one ring buffer page: 7309 * 0 - 1 system page 7310 * 1 - 2 system pages 7311 * 3 - 4 system pages 7312 * ... 7313 * 7314 * Returns 0 on success or < 0 in case of an error. 
 */
int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage, *tmp;
	int old_order, old_size;
	int nr_pages;
	int psize;
	int err;
	int cpu;

	if (!buffer || order < 0)
		return -EINVAL;

	/* Already at the requested order: nothing to do */
	if (buffer->subbuf_order == order)
		return 0;

	/*
	 * NOTE(review): psize is an int and @order is only checked for being
	 * non-negative before this shift - confirm that callers bound @order
	 * so that the computation cannot overflow.
	 */
	psize = (1 << order) * PAGE_SIZE;
	if (psize <= BUF_PAGE_HDR_SIZE)
		return -EINVAL;

	/* Size of a subbuf cannot be greater than the write counter */
	if (psize > RB_WRITE_MASK + 1)
		return -EINVAL;

	/* Remembered so that the error path can restore them */
	old_order = buffer->subbuf_order;
	old_size = buffer->subbuf_size;

	/* prevent another thread from changing buffer sizes */
	guard(mutex)(&buffer->mutex);
	atomic_inc(&buffer->record_disabled);

	/* Make sure all commits have finished */
	synchronize_rcu();

	buffer->subbuf_order = order;
	buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE;

	/* Make sure all new buffers are allocated, before deleting the old ones */
	for_each_buffer_cpu(buffer, cpu) {

		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			continue;

		cpu_buffer = buffer->buffers[cpu];

		/* The sub buffer size of a mapped buffer can not be changed */
		if (cpu_buffer->mapped) {
			err = -EBUSY;
			goto error;
		}

		/* Update the number of pages to match the new size */
		nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
		nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size);

		/* we need a minimum of two pages */
		if (nr_pages < 2)
			nr_pages = 2;

		cpu_buffer->nr_pages_to_update = nr_pages;

		/* Include the reader page */
		nr_pages++;

		/* Allocate the new size buffer */
		INIT_LIST_HEAD(&cpu_buffer->new_pages);
		if (__rb_allocate_pages(cpu_buffer, nr_pages,
					&cpu_buffer->new_pages)) {
			/* not enough memory for new pages */
			err = -ENOMEM;
			goto error;
		}
	}

	/* Every allocation succeeded: switch each cpu buffer to its new pages */
	for_each_buffer_cpu(buffer, cpu) {
		struct buffer_data_page *old_free_data_page;
		struct list_head old_pages;
		unsigned long flags;

		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			continue;

		cpu_buffer = buffer->buffers[cpu];

		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

		/* Clear the head bit to make the link list normal to read */
		rb_head_page_deactivate(cpu_buffer);

		/*
		 * Collect buffers from the cpu_buffer pages list and the
		 * reader_page on old_pages, so they can be freed later when not
		 * under a spinlock. The pages list is a linked list with no
		 * head, adding old_pages turns it into a regular list with
		 * old_pages being the head.
		 */
		list_add(&old_pages, cpu_buffer->pages);
		list_add(&cpu_buffer->reader_page->list, &old_pages);

		/* One page was allocated for the reader page */
		cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next,
						     struct buffer_page, list);
		list_del_init(&cpu_buffer->reader_page->list);

		/* Install the new pages, remove the head from the list */
		cpu_buffer->pages = cpu_buffer->new_pages.next;
		list_del_init(&cpu_buffer->new_pages);
		cpu_buffer->cnt++;

		/* Head, tail and commit all start on the first of the new pages */
		cpu_buffer->head_page
			= list_entry(cpu_buffer->pages, struct buffer_page, list);
		cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

		cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update;
		cpu_buffer->nr_pages_to_update = 0;

		/* Detach the spare page here, it is freed below with the old order */
		old_free_data_page = cpu_buffer->free_page;
		cpu_buffer->free_page = NULL;

		rb_head_page_activate(cpu_buffer);

		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

		/* Free old sub buffers */
		list_for_each_entry_safe(bpage, tmp, &old_pages, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
		free_pages((unsigned long)old_free_data_page, old_order);

		rb_check_pages(cpu_buffer);
	}

	atomic_dec(&buffer->record_disabled);

	return 0;

error:
	/* Nothing was switched over yet: put the old geometry back */
	buffer->subbuf_order = old_order;
	buffer->subbuf_size = old_size;

	atomic_dec(&buffer->record_disabled);

	/* Release the pages that were already allocated for the new size */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];

		if (!cpu_buffer->nr_pages_to_update)
			continue;

		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
	}

	return err;
}
EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set);

/*
 * Allocate the zeroed meta page of @cpu_buffer, unless it already has one.
 * Returns 0 on success or when the page already exists, -ENOMEM otherwise.
 */
static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct page *page;

	if (cpu_buffer->meta_page)
		return 0;

	page = alloc_page(GFP_USER | __GFP_ZERO);
	if (!page)
		return -ENOMEM;

	cpu_buffer->meta_page = page_to_virt(page);

	return 0;
}

/* Free the meta page of @cpu_buffer and clear the pointer to it */
static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long addr = (unsigned long)cpu_buffer->meta_page;

	free_page(addr);
	cpu_buffer->meta_page = NULL;
}

static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page **subbuf_ids)
{
	struct trace_buffer_meta *meta = cpu_buffer->meta_page;
	unsigned int nr_subbufs = cpu_buffer->nr_pages + 1;
	struct buffer_page *first_subbuf, *subbuf;
	int cnt = 0;
	int id = 0;

	id = rb_page_id(cpu_buffer, cpu_buffer->reader_page, id);
	subbuf_ids[id++] = cpu_buffer->reader_page;
	cnt++;

	first_subbuf = subbuf = rb_set_head_page(cpu_buffer);
	do {
		id = rb_page_id(cpu_buffer, subbuf, id);

		if (WARN_ON(id >= nr_subbufs))
			break;

		subbuf_ids[id] = subbuf;

		rb_inc_page(&subbuf);
		id++;
		cnt++;
	} while (subbuf != first_subbuf);

	WARN_ON(cnt != nr_subbufs);

	/*
install subbuf ID to bpage translation */ 7528 cpu_buffer->subbuf_ids = subbuf_ids; 7529 7530 meta->meta_struct_len = sizeof(*meta); 7531 meta->nr_subbufs = nr_subbufs; 7532 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 7533 meta->meta_page_size = meta->subbuf_size; 7534 7535 rb_update_meta_page(cpu_buffer); 7536 } 7537 7538 static struct ring_buffer_per_cpu * 7539 rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) 7540 { 7541 struct ring_buffer_per_cpu *cpu_buffer; 7542 7543 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7544 return ERR_PTR(-EINVAL); 7545 7546 cpu_buffer = buffer->buffers[cpu]; 7547 7548 mutex_lock(&cpu_buffer->mapping_lock); 7549 7550 if (!cpu_buffer->user_mapped) { 7551 mutex_unlock(&cpu_buffer->mapping_lock); 7552 return ERR_PTR(-ENODEV); 7553 } 7554 7555 return cpu_buffer; 7556 } 7557 7558 static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) 7559 { 7560 mutex_unlock(&cpu_buffer->mapping_lock); 7561 } 7562 7563 /* 7564 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need 7565 * to be set-up or torn-down. 
7566 */ 7567 static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer, 7568 bool inc) 7569 { 7570 unsigned long flags; 7571 7572 lockdep_assert_held(&cpu_buffer->mapping_lock); 7573 7574 /* mapped is always greater or equal to user_mapped */ 7575 if (WARN_ON(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7576 return -EINVAL; 7577 7578 if (inc && cpu_buffer->mapped == UINT_MAX) 7579 return -EBUSY; 7580 7581 if (WARN_ON(!inc && cpu_buffer->user_mapped == 0)) 7582 return -EINVAL; 7583 7584 mutex_lock(&cpu_buffer->buffer->mutex); 7585 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7586 7587 if (inc) { 7588 cpu_buffer->user_mapped++; 7589 cpu_buffer->mapped++; 7590 } else { 7591 cpu_buffer->user_mapped--; 7592 cpu_buffer->mapped--; 7593 } 7594 7595 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7596 mutex_unlock(&cpu_buffer->buffer->mutex); 7597 7598 return 0; 7599 } 7600 7601 /* 7602 * +--------------+ pgoff == 0 7603 * | meta page | 7604 * +--------------+ pgoff == 1 7605 * | subbuffer 0 | 7606 * | | 7607 * +--------------+ pgoff == (1 + (1 << subbuf_order)) 7608 * | subbuffer 1 | 7609 * | | 7610 * ... 7611 */ 7612 #ifdef CONFIG_MMU 7613 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7614 struct vm_area_struct *vma) 7615 { 7616 unsigned long nr_subbufs, nr_pages, nr_vma_pages, pgoff = vma->vm_pgoff; 7617 unsigned int subbuf_pages, subbuf_order; 7618 struct page **pages __free(kfree) = NULL; 7619 int p = 0, s = 0; 7620 int err; 7621 7622 /* Refuse MP_PRIVATE or writable mappings */ 7623 if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC || 7624 !(vma->vm_flags & VM_MAYSHARE)) 7625 return -EPERM; 7626 7627 subbuf_order = cpu_buffer->buffer->subbuf_order; 7628 subbuf_pages = 1 << subbuf_order; 7629 7630 if (subbuf_order && pgoff % subbuf_pages) 7631 return -EINVAL; 7632 7633 /* 7634 * Make sure the mapping cannot become writable later. Also tell the VM 7635 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND). 
7636 */ 7637 vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP, 7638 VM_MAYWRITE); 7639 7640 lockdep_assert_held(&cpu_buffer->mapping_lock); 7641 7642 nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */ 7643 nr_pages = ((nr_subbufs + 1) << subbuf_order); /* + meta-page */ 7644 if (nr_pages <= pgoff) 7645 return -EINVAL; 7646 7647 nr_pages -= pgoff; 7648 7649 nr_vma_pages = vma_pages(vma); 7650 if (!nr_vma_pages || nr_vma_pages > nr_pages) 7651 return -EINVAL; 7652 7653 nr_pages = nr_vma_pages; 7654 7655 pages = kzalloc_objs(*pages, nr_pages); 7656 if (!pages) 7657 return -ENOMEM; 7658 7659 if (!pgoff) { 7660 unsigned long meta_page_padding; 7661 7662 pages[p++] = virt_to_page(cpu_buffer->meta_page); 7663 7664 /* 7665 * Pad with the zero-page to align the meta-page with the 7666 * sub-buffers. 7667 */ 7668 meta_page_padding = subbuf_pages - 1; 7669 while (meta_page_padding-- && p < nr_pages) { 7670 unsigned long __maybe_unused zero_addr = 7671 vma->vm_start + (PAGE_SIZE * p); 7672 7673 pages[p++] = ZERO_PAGE(zero_addr); 7674 } 7675 } else { 7676 /* Skip the meta-page */ 7677 pgoff -= subbuf_pages; 7678 7679 s += pgoff / subbuf_pages; 7680 } 7681 7682 while (p < nr_pages) { 7683 struct buffer_page *subbuf; 7684 struct page *page; 7685 int off = 0; 7686 7687 if (WARN_ON_ONCE(s >= nr_subbufs)) 7688 return -EINVAL; 7689 7690 subbuf = cpu_buffer->subbuf_ids[s]; 7691 page = virt_to_page((void *)subbuf->page); 7692 7693 for (; off < (1 << (subbuf_order)); off++, page++) { 7694 if (p >= nr_pages) 7695 break; 7696 7697 pages[p++] = page; 7698 } 7699 s++; 7700 } 7701 7702 err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages); 7703 7704 return err; 7705 } 7706 #else 7707 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7708 struct vm_area_struct *vma) 7709 { 7710 return -EOPNOTSUPP; 7711 } 7712 #endif 7713 7714 int ring_buffer_map(struct trace_buffer *buffer, int cpu, 7715 struct vm_area_struct *vma) 7716 { 7717 struct 
ring_buffer_per_cpu *cpu_buffer; 7718 struct buffer_page **subbuf_ids; 7719 unsigned long flags; 7720 int err; 7721 7722 if (!cpumask_test_cpu(cpu, buffer->cpumask) || buffer->remote) 7723 return -EINVAL; 7724 7725 cpu_buffer = buffer->buffers[cpu]; 7726 7727 guard(mutex)(&cpu_buffer->mapping_lock); 7728 7729 if (cpu_buffer->user_mapped) { 7730 err = __rb_map_vma(cpu_buffer, vma); 7731 if (!err) 7732 err = __rb_inc_dec_mapped(cpu_buffer, true); 7733 return err; 7734 } 7735 7736 /* prevent another thread from changing buffer/sub-buffer sizes */ 7737 guard(mutex)(&buffer->mutex); 7738 7739 err = rb_alloc_meta_page(cpu_buffer); 7740 if (err) 7741 return err; 7742 7743 /* subbuf_ids includes the reader while nr_pages does not */ 7744 subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL); 7745 if (!subbuf_ids) { 7746 rb_free_meta_page(cpu_buffer); 7747 return -ENOMEM; 7748 } 7749 7750 atomic_inc(&cpu_buffer->resize_disabled); 7751 7752 /* 7753 * Lock all readers to block any subbuf swap until the subbuf IDs are 7754 * assigned. 7755 */ 7756 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7757 rb_setup_ids_meta_page(cpu_buffer, subbuf_ids); 7758 7759 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7760 7761 err = __rb_map_vma(cpu_buffer, vma); 7762 if (!err) { 7763 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7764 /* This is the first time it is mapped by user */ 7765 cpu_buffer->mapped++; 7766 cpu_buffer->user_mapped = 1; 7767 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7768 } else { 7769 kfree(cpu_buffer->subbuf_ids); 7770 cpu_buffer->subbuf_ids = NULL; 7771 rb_free_meta_page(cpu_buffer); 7772 atomic_dec(&cpu_buffer->resize_disabled); 7773 } 7774 7775 return err; 7776 } 7777 7778 /* 7779 * This is called when a VMA is duplicated (e.g., on fork()) to increment 7780 * the user_mapped counter without remapping pages. 
7781 */ 7782 void ring_buffer_map_dup(struct trace_buffer *buffer, int cpu) 7783 { 7784 struct ring_buffer_per_cpu *cpu_buffer; 7785 7786 if (WARN_ON(!cpumask_test_cpu(cpu, buffer->cpumask))) 7787 return; 7788 7789 cpu_buffer = buffer->buffers[cpu]; 7790 7791 guard(mutex)(&cpu_buffer->mapping_lock); 7792 7793 if (cpu_buffer->user_mapped) 7794 __rb_inc_dec_mapped(cpu_buffer, true); 7795 else 7796 WARN(1, "Unexpected buffer stat, it should be mapped"); 7797 } 7798 7799 int ring_buffer_unmap(struct trace_buffer *buffer, int cpu) 7800 { 7801 struct ring_buffer_per_cpu *cpu_buffer; 7802 unsigned long flags; 7803 7804 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7805 return -EINVAL; 7806 7807 cpu_buffer = buffer->buffers[cpu]; 7808 7809 guard(mutex)(&cpu_buffer->mapping_lock); 7810 7811 if (!cpu_buffer->user_mapped) { 7812 return -ENODEV; 7813 } else if (cpu_buffer->user_mapped > 1) { 7814 __rb_inc_dec_mapped(cpu_buffer, false); 7815 return 0; 7816 } 7817 7818 guard(mutex)(&buffer->mutex); 7819 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7820 7821 /* This is the last user space mapping */ 7822 if (!WARN_ON_ONCE(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7823 cpu_buffer->mapped--; 7824 cpu_buffer->user_mapped = 0; 7825 7826 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7827 7828 kfree(cpu_buffer->subbuf_ids); 7829 cpu_buffer->subbuf_ids = NULL; 7830 rb_free_meta_page(cpu_buffer); 7831 atomic_dec(&cpu_buffer->resize_disabled); 7832 7833 return 0; 7834 } 7835 7836 int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu) 7837 { 7838 struct ring_buffer_per_cpu *cpu_buffer; 7839 struct buffer_page *reader; 7840 unsigned long missed_events; 7841 unsigned long reader_size; 7842 unsigned long flags; 7843 7844 cpu_buffer = rb_get_mapped_buffer(buffer, cpu); 7845 if (IS_ERR(cpu_buffer)) 7846 return (int)PTR_ERR(cpu_buffer); 7847 7848 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7849 7850 consume: 7851 if 
(rb_per_cpu_empty(cpu_buffer)) 7852 goto out; 7853 7854 reader_size = rb_page_size(cpu_buffer->reader_page); 7855 7856 /* 7857 * There are data to be read on the current reader page, we can 7858 * return to the caller. But before that, we assume the latter will read 7859 * everything. Let's update the kernel reader accordingly. 7860 */ 7861 if (cpu_buffer->reader_page->read < reader_size) { 7862 while (cpu_buffer->reader_page->read < reader_size) 7863 rb_advance_reader(cpu_buffer); 7864 goto out; 7865 } 7866 7867 /* Did the reader catch up with the writer? */ 7868 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 7869 goto out; 7870 7871 reader = rb_get_reader_page(cpu_buffer); 7872 if (WARN_ON(!reader)) 7873 goto out; 7874 7875 /* Check if any events were dropped */ 7876 missed_events = cpu_buffer->lost_events; 7877 7878 if (missed_events) { 7879 if (cpu_buffer->reader_page != cpu_buffer->commit_page) { 7880 struct buffer_data_page *dpage = reader->page; 7881 unsigned int commit; 7882 /* 7883 * Use the real_end for the data size, 7884 * This gives us a chance to store the lost events 7885 * on the page. 7886 */ 7887 if (reader->real_end) 7888 local_set(&dpage->commit, reader->real_end); 7889 /* 7890 * If there is room at the end of the page to save the 7891 * missed events, then record it there. 7892 */ 7893 commit = rb_page_size(reader); 7894 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 7895 memcpy(&dpage->data[commit], &missed_events, 7896 sizeof(missed_events)); 7897 local_add(RB_MISSED_STORED, &dpage->commit); 7898 } 7899 local_add(RB_MISSED_EVENTS, &dpage->commit); 7900 } else if (!WARN_ONCE(cpu_buffer->reader_page == cpu_buffer->tail_page, 7901 "Reader on commit with %ld missed events", 7902 missed_events)) { 7903 /* 7904 * There shouldn't be any missed events if the tail_page 7905 * is on the reader page. 
But if the tail page is not on the 7906 * reader page and the commit_page is, that would mean that 7907 * there's a commit_overrun (an interrupt preempted an 7908 * addition of an event and then filled the buffer 7909 * with new events). In this case it's not an 7910 * error, but it should still be reported. 7911 * 7912 * TODO: Add missed events to the page for user space to know. 7913 */ 7914 pr_info("Ring buffer [%d] commit overrun lost %ld events at timestamp:%lld\n", 7915 cpu, missed_events, cpu_buffer->reader_page->page->time_stamp); 7916 } 7917 } 7918 7919 cpu_buffer->lost_events = 0; 7920 7921 goto consume; 7922 7923 out: 7924 /* Some archs do not have data cache coherency between kernel and user-space */ 7925 flush_kernel_vmap_range(cpu_buffer->reader_page->page, 7926 buffer->subbuf_size + BUF_PAGE_HDR_SIZE); 7927 7928 rb_update_meta_page(cpu_buffer); 7929 7930 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7931 rb_put_mapped_buffer(cpu_buffer); 7932 7933 return 0; 7934 } 7935 7936 static void rb_cpu_sync(void *data) 7937 { 7938 /* Not really needed, but documents what is happening */ 7939 smp_rmb(); 7940 } 7941 7942 /* 7943 * We only allocate new buffers, never free them if the CPU goes down. 7944 * If we were to free the buffer, then the user would lose any trace that was in 7945 * the buffer. 
7946 */ 7947 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 7948 { 7949 struct trace_buffer *buffer; 7950 long nr_pages_same; 7951 int cpu_i; 7952 unsigned long nr_pages; 7953 7954 buffer = container_of(node, struct trace_buffer, node); 7955 if (cpumask_test_cpu(cpu, buffer->cpumask)) 7956 return 0; 7957 7958 nr_pages = 0; 7959 nr_pages_same = 1; 7960 /* check if all cpu sizes are same */ 7961 for_each_buffer_cpu(buffer, cpu_i) { 7962 /* fill in the size from first enabled cpu */ 7963 if (nr_pages == 0) 7964 nr_pages = buffer->buffers[cpu_i]->nr_pages; 7965 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 7966 nr_pages_same = 0; 7967 break; 7968 } 7969 } 7970 /* allocate minimum pages, user can later expand it */ 7971 if (!nr_pages_same) 7972 nr_pages = 2; 7973 buffer->buffers[cpu] = 7974 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 7975 if (!buffer->buffers[cpu]) { 7976 WARN(1, "failed to allocate ring buffer on CPU %u\n", 7977 cpu); 7978 return -ENOMEM; 7979 } 7980 7981 /* 7982 * Ensure trace_buffer readers observe the newly allocated 7983 * ring_buffer_per_cpu before they check the cpumask. Instead of using a 7984 * read barrier for all readers, send an IPI. 7985 */ 7986 if (unlikely(system_state == SYSTEM_RUNNING)) { 7987 on_each_cpu(rb_cpu_sync, NULL, 1); 7988 /* Not really needed, but documents what is happening */ 7989 smp_wmb(); 7990 } 7991 7992 cpumask_set_cpu(cpu, buffer->cpumask); 7993 return 0; 7994 } 7995 7996 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 7997 /* 7998 * This is a basic integrity check of the ring buffer. 7999 * Late in the boot cycle this test will run when configured in. 8000 * It will kick off a thread per CPU that will go into a loop 8001 * writing to the per cpu ring buffer various sizes of data. 8002 * Some of the data will be large items, some small. 8003 * 8004 * Another thread is created that goes into a spin, sending out 8005 * IPIs to the other CPUs to also write into the ring buffer. 
8006 * this is to test the nesting ability of the buffer. 8007 * 8008 * Basic stats are recorded and reported. If something in the 8009 * ring buffer should happen that's not expected, a big warning 8010 * is displayed and all ring buffers are disabled. 8011 */ 8012 static struct task_struct *rb_threads[NR_CPUS] __initdata; 8013 8014 struct rb_test_data { 8015 struct trace_buffer *buffer; 8016 unsigned long events; 8017 unsigned long bytes_written; 8018 unsigned long bytes_alloc; 8019 unsigned long bytes_dropped; 8020 unsigned long events_nested; 8021 unsigned long bytes_written_nested; 8022 unsigned long bytes_alloc_nested; 8023 unsigned long bytes_dropped_nested; 8024 int min_size_nested; 8025 int max_size_nested; 8026 int max_size; 8027 int min_size; 8028 int cpu; 8029 int cnt; 8030 }; 8031 8032 static struct rb_test_data rb_data[NR_CPUS] __initdata; 8033 8034 /* 1 meg per cpu */ 8035 #define RB_TEST_BUFFER_SIZE 1048576 8036 8037 static char rb_string[] __initdata = 8038 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 8039 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 8040 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 8041 8042 static bool rb_test_started __initdata; 8043 8044 struct rb_item { 8045 int size; 8046 char str[]; 8047 }; 8048 8049 static __init int rb_write_something(struct rb_test_data *data, bool nested) 8050 { 8051 struct ring_buffer_event *event; 8052 struct rb_item *item; 8053 bool started; 8054 int event_len; 8055 int size; 8056 int len; 8057 int cnt; 8058 8059 /* Have nested writes different that what is written */ 8060 cnt = data->cnt + (nested ? 
27 : 0); 8061 8062 /* Multiply cnt by ~e, to make some unique increment */ 8063 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 8064 8065 len = size + sizeof(struct rb_item); 8066 8067 started = rb_test_started; 8068 /* read rb_test_started before checking buffer enabled */ 8069 smp_rmb(); 8070 8071 event = ring_buffer_lock_reserve(data->buffer, len); 8072 if (!event) { 8073 /* Ignore dropped events before test starts. */ 8074 if (started) { 8075 if (nested) 8076 data->bytes_dropped_nested += len; 8077 else 8078 data->bytes_dropped += len; 8079 } 8080 return len; 8081 } 8082 8083 event_len = ring_buffer_event_length(event); 8084 8085 if (RB_WARN_ON(data->buffer, event_len < len)) 8086 goto out; 8087 8088 item = ring_buffer_event_data(event); 8089 item->size = size; 8090 memcpy(item->str, rb_string, size); 8091 8092 if (nested) { 8093 data->bytes_alloc_nested += event_len; 8094 data->bytes_written_nested += len; 8095 data->events_nested++; 8096 if (!data->min_size_nested || len < data->min_size_nested) 8097 data->min_size_nested = len; 8098 if (len > data->max_size_nested) 8099 data->max_size_nested = len; 8100 } else { 8101 data->bytes_alloc += event_len; 8102 data->bytes_written += len; 8103 data->events++; 8104 if (!data->min_size || len < data->min_size) 8105 data->max_size = len; 8106 if (len > data->max_size) 8107 data->max_size = len; 8108 } 8109 8110 out: 8111 ring_buffer_unlock_commit(data->buffer); 8112 8113 return 0; 8114 } 8115 8116 static __init int rb_test(void *arg) 8117 { 8118 struct rb_test_data *data = arg; 8119 8120 while (!kthread_should_stop()) { 8121 rb_write_something(data, false); 8122 data->cnt++; 8123 8124 set_current_state(TASK_INTERRUPTIBLE); 8125 /* Now sleep between a min of 100-300us and a max of 1ms */ 8126 usleep_range(((data->cnt % 3) + 1) * 100, 1000); 8127 } 8128 8129 return 0; 8130 } 8131 8132 static __init void rb_ipi(void *ignore) 8133 { 8134 struct rb_test_data *data; 8135 int cpu = smp_processor_id(); 8136 8137 data = 
&rb_data[cpu]; 8138 rb_write_something(data, true); 8139 } 8140 8141 static __init int rb_hammer_test(void *arg) 8142 { 8143 while (!kthread_should_stop()) { 8144 8145 /* Send an IPI to all cpus to write data! */ 8146 smp_call_function(rb_ipi, NULL, 1); 8147 /* No sleep, but for non preempt, let others run */ 8148 schedule(); 8149 } 8150 8151 return 0; 8152 } 8153 8154 static __init int test_ringbuffer(void) 8155 { 8156 struct task_struct *rb_hammer; 8157 struct trace_buffer *buffer; 8158 int cpu; 8159 int ret = 0; 8160 8161 if (security_locked_down(LOCKDOWN_TRACEFS)) { 8162 pr_warn("Lockdown is enabled, skipping ring buffer tests\n"); 8163 return 0; 8164 } 8165 8166 pr_info("Running ring buffer tests...\n"); 8167 8168 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE); 8169 if (WARN_ON(!buffer)) 8170 return 0; 8171 8172 /* Disable buffer so that threads can't write to it yet */ 8173 ring_buffer_record_off(buffer); 8174 8175 for_each_online_cpu(cpu) { 8176 rb_data[cpu].buffer = buffer; 8177 rb_data[cpu].cpu = cpu; 8178 rb_data[cpu].cnt = cpu; 8179 rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu], 8180 cpu, "rbtester/%u"); 8181 if (WARN_ON(IS_ERR(rb_threads[cpu]))) { 8182 pr_cont("FAILED\n"); 8183 ret = PTR_ERR(rb_threads[cpu]); 8184 goto out_free; 8185 } 8186 } 8187 8188 /* Now create the rb hammer! */ 8189 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer"); 8190 if (WARN_ON(IS_ERR(rb_hammer))) { 8191 pr_cont("FAILED\n"); 8192 ret = PTR_ERR(rb_hammer); 8193 goto out_free; 8194 } 8195 8196 ring_buffer_record_on(buffer); 8197 /* 8198 * Show buffer is enabled before setting rb_test_started. 8199 * Yes there's a small race window where events could be 8200 * dropped and the thread won't catch it. But when a ring 8201 * buffer gets enabled, there will always be some kind of 8202 * delay before other CPUs see it. Thus, we don't care about 8203 * those dropped events. 
We care about events dropped after 8204 * the threads see that the buffer is active. 8205 */ 8206 smp_wmb(); 8207 rb_test_started = true; 8208 8209 set_current_state(TASK_INTERRUPTIBLE); 8210 /* Just run for 10 seconds */ 8211 schedule_timeout(10 * HZ); 8212 8213 kthread_stop(rb_hammer); 8214 8215 out_free: 8216 for_each_online_cpu(cpu) { 8217 if (!rb_threads[cpu]) 8218 break; 8219 kthread_stop(rb_threads[cpu]); 8220 } 8221 if (ret) { 8222 ring_buffer_free(buffer); 8223 return ret; 8224 } 8225 8226 /* Report! */ 8227 pr_info("finished\n"); 8228 for_each_online_cpu(cpu) { 8229 struct ring_buffer_event *event; 8230 struct rb_test_data *data = &rb_data[cpu]; 8231 struct rb_item *item; 8232 unsigned long total_events; 8233 unsigned long total_dropped; 8234 unsigned long total_written; 8235 unsigned long total_alloc; 8236 unsigned long total_read = 0; 8237 unsigned long total_size = 0; 8238 unsigned long total_len = 0; 8239 unsigned long total_lost = 0; 8240 unsigned long lost; 8241 int big_event_size; 8242 int small_event_size; 8243 8244 ret = -1; 8245 8246 total_events = data->events + data->events_nested; 8247 total_written = data->bytes_written + data->bytes_written_nested; 8248 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 8249 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 8250 8251 big_event_size = data->max_size + data->max_size_nested; 8252 small_event_size = data->min_size + data->min_size_nested; 8253 8254 pr_info("CPU %d:\n", cpu); 8255 pr_info(" events: %ld\n", total_events); 8256 pr_info(" dropped bytes: %ld\n", total_dropped); 8257 pr_info(" alloced bytes: %ld\n", total_alloc); 8258 pr_info(" written bytes: %ld\n", total_written); 8259 pr_info(" biggest event: %d\n", big_event_size); 8260 pr_info(" smallest event: %d\n", small_event_size); 8261 8262 if (RB_WARN_ON(buffer, total_dropped)) 8263 break; 8264 8265 ret = 0; 8266 8267 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 8268 total_lost += lost; 8269 
item = ring_buffer_event_data(event); 8270 total_len += ring_buffer_event_length(event); 8271 total_size += item->size + sizeof(struct rb_item); 8272 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 8273 pr_info("FAILED!\n"); 8274 pr_info("buffer had: %.*s\n", item->size, item->str); 8275 pr_info("expected: %.*s\n", item->size, rb_string); 8276 RB_WARN_ON(buffer, 1); 8277 ret = -1; 8278 break; 8279 } 8280 total_read++; 8281 } 8282 if (ret) 8283 break; 8284 8285 ret = -1; 8286 8287 pr_info(" read events: %ld\n", total_read); 8288 pr_info(" lost events: %ld\n", total_lost); 8289 pr_info(" total events: %ld\n", total_lost + total_read); 8290 pr_info(" recorded len bytes: %ld\n", total_len); 8291 pr_info(" recorded size bytes: %ld\n", total_size); 8292 if (total_lost) { 8293 pr_info(" With dropped events, record len and size may not match\n" 8294 " alloced and written from above\n"); 8295 } else { 8296 if (RB_WARN_ON(buffer, total_len != total_alloc || 8297 total_size != total_written)) 8298 break; 8299 } 8300 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 8301 break; 8302 8303 ret = 0; 8304 } 8305 if (!ret) 8306 pr_info("Ring buffer PASSED!\n"); 8307 8308 ring_buffer_free(buffer); 8309 return 0; 8310 } 8311 8312 late_initcall(test_ringbuffer); 8313 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 8314