// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer_types.h>
#include <linux/sched/isolation.h>
#include <linux/trace_recursion.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/cacheflush.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>
#include <linux/mm.h>

#include <asm/local64.h>
#include <asm/local.h>
#include <asm/setup.h>

#include "trace.h"

/*
 * The "absolute" timestamp in the buffer is only 59 bits.
 * If a clock has the 5 MSBs set, it needs to be saved and
 * reinserted.
 *
 * TS_MSB selects those 5 most significant bits of a 64-bit time stamp,
 * ABS_TS_MASK selects the remaining 59 bits.
 */
#define TS_MSB		(0xf8ULL << 56)
#define ABS_TS_MASK	(~TS_MSB)

static void update_pages_handler(struct work_struct *work);

#define RING_BUFFER_META_MAGIC	0xBADFEED

/*
 * Descriptor for a whole ring buffer.
 *
 * NOTE(review): presumably @magic is compared against
 * RING_BUFFER_META_MAGIC and the offsets are relative to this
 * structure -- confirm against the code that fills it in, which is
 * not part of this section of the file.
 */
struct ring_buffer_meta {
	int		magic;
	int		struct_sizes;
	unsigned long	total_size;
	unsigned long	buffers_offset;
};

/*
 * Per CPU descriptor; @buffers is a flexible array and must stay last.
 *
 * NOTE(review): @buffers presumably holds @nr_subbufs entries --
 * confirm against the allocation code.
 */
struct ring_buffer_cpu_meta {
	unsigned long	first_buffer;
	unsigned long	head_buffer;
	unsigned long	commit_buffer;
	__u32		subbuf_size;
	__u32		nr_subbufs;
	int		buffers[];
};

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	/* Bit layout of the compressed event header */
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len : 5 bits\n");
	trace_seq_puts(s, "\ttime_delta : 27 bits\n");
	trace_seq_puts(s, "\tarray : 32 bits\n");
	trace_seq_putc(s, '\n');
	/* Numeric values of the special type_len encodings */
	trace_seq_printf(s, "\tpadding : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	/* Non-zero only when everything above fit into @s */
	return !trace_seq_has_overflowed(s);
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on.  A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do
 * whatever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)

/*
 * define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:'
 *
 * This expands to a case range (a GNU C extension), so it is only
 * usable as a case label.
 */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

/* Sizes in bytes of the two time keeping events */
enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP =  8,
};

/* The event that directly follows a time extend */
#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

/* True for both TIME_EXTEND and anything with a larger type_len */
#define extended_time(event) \
	(event->type_len >= RINGBUF_TYPE_TIME_EXTEND)

/* A null event is a padding event whose time_delta is zero */
static inline bool rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

/* Turn @event into a null event (see rb_null_event()) */
static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

/*
 * Length of a data event including its header. A non-zero type_len is
 * the payload size in units of RB_ALIGNMENT, otherwise the size is
 * taken from array[0].
 */
static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 *
 * A null event has no defined length; -1 converted to the unsigned
 * return type is returned for it. An unknown type_len warns once
 * and yields 0.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return  event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		WARN_ON_ONCE(1);
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 *   or just the event length for all other events.
232 */ 233 static inline unsigned 234 rb_event_ts_length(struct ring_buffer_event *event) 235 { 236 unsigned len = 0; 237 238 if (extended_time(event)) { 239 /* time extends include the data event after it */ 240 len = RB_LEN_TIME_EXTEND; 241 event = skip_time_extend(event); 242 } 243 return len + rb_event_length(event); 244 } 245 246 /** 247 * ring_buffer_event_length - return the length of the event 248 * @event: the event to get the length of 249 * 250 * Returns the size of the data load of a data event. 251 * If the event is something other than a data event, it 252 * returns the size of the event itself. With the exception 253 * of a TIME EXTEND, where it still returns the size of the 254 * data load of the data event after it. 255 */ 256 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 257 { 258 unsigned length; 259 260 if (extended_time(event)) 261 event = skip_time_extend(event); 262 263 length = rb_event_length(event); 264 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 265 return length; 266 length -= RB_EVNT_HDR_SIZE; 267 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 268 length -= sizeof(event->array[0]); 269 return length; 270 } 271 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 272 273 /* inline for ring buffer fast paths */ 274 static __always_inline void * 275 rb_event_data(struct ring_buffer_event *event) 276 { 277 if (extended_time(event)) 278 event = skip_time_extend(event); 279 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 280 /* If length is in len field, then array[0] has the data */ 281 if (event->type_len) 282 return (void *)&event->array[0]; 283 /* Otherwise length is in array[0] and array[1] has the data */ 284 return (void *)&event->array[1]; 285 } 286 287 /** 288 * ring_buffer_event_data - return the data of the event 289 * @event: the event to get the data from 290 */ 291 void *ring_buffer_event_data(struct ring_buffer_event *event) 292 { 293 return rb_event_data(event); 294 } 295 
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

/* Iterate over every CPU set in the buffer's cpumask */
#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

/* Same, restricted to CPUs that are also in cpu_online_mask */
#define for_each_online_buffer_cpu(buffer, cpu)		\
	for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)

/*
 * Rebuild the time stamp carried by @event: array[0] holds the bits
 * above TS_SHIFT and time_delta the bits below it.
 */
static u64 rb_event_time_stamp(struct ring_buffer_event *event)
{
	u64 ts;

	ts = event->array[0];
	ts <<= TS_SHIFT;
	ts += event->time_delta;

	return ts;
}

/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

/* Both of the flags above */
#define RB_MISSED_MASK		(3 << 30)

struct buffer_data_read_page {
	unsigned		order;	/* order of the page */
	struct buffer_data_page	*data;	/* actual data, stored in this page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	unsigned	 order;		/* order of the page */
	u32		 id:30;		/* ID for external mapping */
	u32		 range:1;	/* Mapped via a range */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters.
 * Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

/* Reset the commit index of a data page to zero */
static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

/* Current commit index of the data page behind @bpage */
static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

/*
 * Free the buffer_page descriptor and, unless the page is marked as
 * being part of a range, the data page it points to.
 */
static void free_buffer_page(struct buffer_page *bpage)
{
	/* Range pages are not to be freed */
	if (!bpage->range)
		free_pages((unsigned long)bpage->page, bpage->order);
	kfree(bpage);
}

/*
 * For best performance, allocate cpu buffer data cache line sized
 * and per CPU.
 *
 * NOTE(review): both macros expand to an expression followed by a
 * ';', so they can only be used as the right hand side of a
 * complete statement.
 */
#define alloc_cpu_buffer(cpu) (struct ring_buffer_per_cpu *)		\
	kzalloc_node(ALIGN(sizeof(struct ring_buffer_per_cpu),		\
			   cache_line_size()), GFP_KERNEL, cpu_to_node(cpu));

#define alloc_cpu_page(cpu) (struct buffer_page *)			\
	kzalloc_node(ALIGN(sizeof(struct buffer_page),			\
			   cache_line_size()), GFP_KERNEL, cpu_to_node(cpu));

/*
 * Allocate a zeroed data page of the given @order on the memory node
 * of @cpu and initialize its commit index.
 *
 * Returns the data page, or NULL if the allocation failed.
 */
static struct buffer_data_page *alloc_cpu_data(int cpu, int order)
{
	struct buffer_data_page *dpage;
	struct page *page;
	gfp_t mflags;

	/*
	 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
	 * gracefully without invoking oom-killer and the system is not
	 * destabilized.
	 */
	mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL | __GFP_COMP | __GFP_ZERO;

	page = alloc_pages_node(cpu_to_node(cpu), mflags, order);
	if (!page)
		return NULL;

	dpage = page_address(page);
	rb_init_page(dpage);

	return dpage;
}

/*
 * Wake up state. @waiters and @full_waiters are separate wait queues,
 * each with its own "pending" flag. @seq is incremented on every
 * wake up (see rb_wake_up_waiters()).
 */
struct rb_irq_work {
	struct irq_work			work;
	wait_queue_head_t		waiters;
	wait_queue_head_t		full_waiters;
	atomic_t			seq;
	bool				waiters_pending;
	bool				full_waiters_pending;
	bool				wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	u64			before;
	u64			after;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;
};

/*
 * Used for the add_timestamp
 *  NONE
 *  EXTEND - wants a time extend
 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
 *  FORCE - force a full time stamp.
 */
enum {
	RB_ADD_STAMP_NONE		= 0,
	RB_ADD_STAMP_EXTEND		= BIT(1),
	RB_ADD_STAMP_ABSOLUTE		= BIT(2),
	RB_ADD_STAMP_FORCE		= BIT(3)
};
/*
 * Used for which event context the event is in.
 *  TRANSITION = 0
 *  NMI     = 1
 *  IRQ     = 2
 *  SOFTIRQ = 3
 *  NORMAL  = 4
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_TRANSITION,
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};

/* A 64-bit time stamp kept in a local64_t */
struct rb_time_struct {
	local64_t	time;
};
typedef struct rb_time_struct rb_time_t;

/* Number of slots in ring_buffer_per_cpu::event_stamp[] */
#define MAX_NEST	5

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	struct trace_buffer	*buffer;
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct buffer_data_page		*free_page;
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	/* pages generation counter, incremented when the list changes */
	unsigned long			cnt;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	unsigned long			nest;
	local_t				entries_bytes;
	local_t				entries;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	local_t				committing;
	local_t				commits;
	local_t				pages_touched;
	local_t				pages_lost;
	local_t				pages_read;
	long				last_pages_touch;
	size_t				shortest_full;
	unsigned long			read;
	unsigned long			read_bytes;
	rb_time_t			write_stamp;
	rb_time_t			before_stamp;
	/* indexed by nesting level, see ring_buffer_event_time_stamp() */
	u64				event_stamp[MAX_NEST];
	u64				read_stamp;
	/* pages removed since last reset */
	unsigned long			pages_removed;

	unsigned int			mapped;
	unsigned int			user_mapped;	/* user space mapping */
	struct mutex			mapping_lock;
	struct buffer_page		**subbuf_ids;	/* ID to subbuf VA */
	struct trace_buffer_meta	*meta_page;
	struct ring_buffer_cpu_meta	*ring_meta;

	struct ring_buffer_remote	*remote;

	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages; /* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	struct rb_irq_work		irq_work;
};

/*
 * The ring buffer as a whole: @buffers is indexed by CPU number and
 * @cpumask tells which of its entries are in use.
 */
struct trace_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	atomic_t			resizing;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

	struct ring_buffer_remote	*remote;

	struct hlist_node		node;
	u64				(*clock)(void);	/* see rb_time_stamp() */

	struct rb_irq_work		irq_work;
	bool				time_stamp_abs;

	unsigned long			range_addr_start;
	unsigned long			range_addr_end;

	struct ring_buffer_meta		*meta;

	unsigned int			subbuf_size;
	unsigned int			subbuf_order;
	unsigned int			max_data_size;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	unsigned long			next_event;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	unsigned long			cache_pages_removed;
	u64				read_stamp;
	u64				page_stamp;
	struct ring_buffer_event	*event;
	size_t				event_size;
	int				missed_events;
};

/*
 * Describe the layout of struct buffer_data_page in @s, one "field:"
 * line per member. @buffer may be NULL, in which case the size of the
 * data field is reported as PAGE_SIZE - BUF_PAGE_HDR_SIZE instead of
 * the buffer's subbuf_size.
 *
 * Returns non-zero if everything fit into @s.
 */
int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s)
{
	/* Only used for sizeof()/offsetof(), never read or written */
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	/* "overwrite" is reported at the same offset as "commit" */
	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)(buffer ? buffer->subbuf_size :
					PAGE_SIZE - BUF_PAGE_HDR_SIZE),
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

/* Read the 64-bit time stamp @t into *@ret */
static inline void rb_time_read(rb_time_t *t, u64 *ret)
{
	*ret = local64_read(&t->time);
}
/* Store @val into the 64-bit time stamp @t */
static void rb_time_set(rb_time_t *t, u64 val)
{
	local64_set(&t->time, val);
}

/*
 * Enable this to make sure that the event passed to
 * ring_buffer_event_time_stamp() is not committed and also
 * is on the buffer that it passed in.
 */
//#define RB_VERIFY_EVENT
#ifdef RB_VERIFY_EVENT
static struct list_head *rb_list_head(struct list_head *list);
/*
 * Walk from the commit page towards the tail page and check that
 * @event lies between the commit and write index of one of them.
 * Warns if it does not, or if more than 100 pages were walked.
 */
static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
			 void *event)
{
	struct buffer_page *page = cpu_buffer->commit_page;
	struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
	struct list_head *next;
	long commit, write;
	unsigned long addr = (unsigned long)event;
	bool done = false;
	int stop = 0;

	/* Make sure the event exists and is not committed yet */
	do {
		/* The tail page is the last one that gets checked */
		if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
			done = true;
		commit = local_read(&page->page->commit);
		write = local_read(&page->write);
		if (addr >= (unsigned long)&page->page->data[commit] &&
		    addr < (unsigned long)&page->page->data[write])
			return;

		next = rb_list_head(page->list.next);
		page = list_entry(next, struct buffer_page, list);
	} while (!done);
	WARN_ON_ONCE(1);
}
#else
/* Verification compiled out */
static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
			 void *event)
{
}
#endif

/*
 * The absolute time stamp drops the 5 MSBs and some clocks may
 * require them. The rb_fix_abs_ts() will take a previous full
 * time stamp, and add the 5 MSB of that time stamp on to the
 * saved absolute time stamp.
Then they are compared in case of 671 * the unlikely event that the latest time stamp incremented 672 * the 5 MSB. 673 */ 674 static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts) 675 { 676 if (save_ts & TS_MSB) { 677 abs |= save_ts & TS_MSB; 678 /* Check for overflow */ 679 if (unlikely(abs < save_ts)) 680 abs += 1ULL << 59; 681 } 682 return abs; 683 } 684 685 static inline u64 rb_time_stamp(struct trace_buffer *buffer); 686 687 /** 688 * ring_buffer_event_time_stamp - return the event's current time stamp 689 * @buffer: The buffer that the event is on 690 * @event: the event to get the time stamp of 691 * 692 * Note, this must be called after @event is reserved, and before it is 693 * committed to the ring buffer. And must be called from the same 694 * context where the event was reserved (normal, softirq, irq, etc). 695 * 696 * Returns the time stamp associated with the current event. 697 * If the event has an extended time stamp, then that is used as 698 * the time stamp to return. 699 * In the highly unlikely case that the event was nested more than 700 * the max nesting, then the write_stamp of the buffer is returned, 701 * otherwise current time is returned, but that really neither of 702 * the last two cases should ever happen. 
703 */ 704 u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer, 705 struct ring_buffer_event *event) 706 { 707 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()]; 708 unsigned int nest; 709 u64 ts; 710 711 /* If the event includes an absolute time, then just use that */ 712 if (event->type_len == RINGBUF_TYPE_TIME_STAMP) { 713 ts = rb_event_time_stamp(event); 714 return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp); 715 } 716 717 nest = local_read(&cpu_buffer->committing); 718 verify_event(cpu_buffer, event); 719 if (WARN_ON_ONCE(!nest)) 720 goto fail; 721 722 /* Read the current saved nesting level time stamp */ 723 if (likely(--nest < MAX_NEST)) 724 return cpu_buffer->event_stamp[nest]; 725 726 /* Shouldn't happen, warn if it does */ 727 WARN_ONCE(1, "nest (%d) greater than max", nest); 728 729 fail: 730 rb_time_read(&cpu_buffer->write_stamp, &ts); 731 732 return ts; 733 } 734 735 /** 736 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer 737 * @buffer: The ring_buffer to get the number of pages from 738 * @cpu: The cpu of the ring_buffer to get the number of pages from 739 * 740 * Returns the number of pages that have content in the ring buffer. 
741 */ 742 size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu) 743 { 744 size_t read; 745 size_t lost; 746 size_t cnt; 747 748 read = local_read(&buffer->buffers[cpu]->pages_read); 749 lost = local_read(&buffer->buffers[cpu]->pages_lost); 750 cnt = local_read(&buffer->buffers[cpu]->pages_touched); 751 752 if (WARN_ON_ONCE(cnt < lost)) 753 return 0; 754 755 cnt -= lost; 756 757 /* The reader can read an empty page, but not more than that */ 758 if (cnt < read) { 759 WARN_ON_ONCE(read > cnt + 1); 760 return 0; 761 } 762 763 return cnt - read; 764 } 765 766 static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full) 767 { 768 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 769 size_t nr_pages; 770 size_t dirty; 771 772 nr_pages = cpu_buffer->nr_pages; 773 if (!nr_pages || !full) 774 return true; 775 776 /* 777 * Add one as dirty will never equal nr_pages, as the sub-buffer 778 * that the writer is on is not counted as dirty. 779 * This is needed if "buffer_percent" is set to 100. 780 */ 781 dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1; 782 783 return (dirty * 100) >= (full * nr_pages); 784 } 785 786 /* 787 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input 788 * 789 * Schedules a delayed work to wake up any task that is blocked on the 790 * ring buffer waiters queue. 
 *
 * This is the irq_work callback itself: it bumps the wake up sequence
 * counter and wakes all tasks on the "waiters" queue, plus those on the
 * "full_waiters" queue when a full wake up is pending.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	/* For waiters waiting for the first wake up */
	(void)atomic_fetch_inc_release(&rbwork->seq);

	wake_up_all(&rbwork->waiters);
	if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
		/* Only cpu_buffer sets the above flags */
		struct ring_buffer_per_cpu *cpu_buffer =
			container_of(rbwork, struct ring_buffer_per_cpu, irq_work);

		/* Called from interrupt context */
		raw_spin_lock(&cpu_buffer->reader_lock);
		rbwork->wakeup_full = false;
		rbwork->full_waiters_pending = false;

		/* Waking up all waiters, they will reset the shortest full */
		cpu_buffer->shortest_full = 0;
		raw_spin_unlock(&cpu_buffer->reader_lock);

		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on
 *
 * In the case of a file that represents a ring buffer is closing,
 * it is prudent to wake up any waiters that are on this.
 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (!buffer)
		return;

	if (cpu == RING_BUFFER_ALL_CPUS) {

		/* Wake up individual ones too. One level recursion */
		for_each_buffer_cpu(buffer, cpu)
			ring_buffer_wake_waiters(buffer, cpu);

		/* Then the queue of the buffer as a whole */
		rbwork = &buffer->irq_work;
	} else {
		if (WARN_ON_ONCE(!buffer->buffers))
			return;
		if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
			return;

		cpu_buffer = buffer->buffers[cpu];
		/* The CPU buffer may not have been initialized yet */
		if (!cpu_buffer)
			return;
		rbwork = &cpu_buffer->irq_work;
	}

	/* This can be called in any context */
	irq_work_queue(&rbwork->work);
}

/*
 * Check whether a waiter on @cpu with a watermark of @full percent has
 * something to read.
 *
 * For RING_BUFFER_ALL_CPUS any data in any cpu buffer is enough. For a
 * single CPU with @full == 0 a non-empty buffer is enough. Otherwise
 * the reader page must not be the commit page and full_hit() must
 * hold. If it does not, @full is recorded in shortest_full when it is
 * smaller than the current value (or when none is recorded).
 */
static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	bool ret = false;

	/* Reads of all CPUs always waits for any data */
	if (cpu == RING_BUFFER_ALL_CPUS)
		return !ring_buffer_empty(buffer);

	cpu_buffer = buffer->buffers[cpu];

	if (!ring_buffer_empty_cpu(buffer, cpu)) {
		unsigned long flags;
		bool pagebusy;

		if (!full)
			return true;

		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
		pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
		ret = !pagebusy && full_hit(buffer, cpu, full);

		if (!ret && (!cpu_buffer->shortest_full ||
			     cpu_buffer->shortest_full > full)) {
			cpu_buffer->shortest_full = full;
		}
		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}
	return ret;
}

/*
 * Wait condition for ring_buffer_wait(): true when the watermark is
 * hit or when @cond(@data) says so. Otherwise the matching "pending"
 * flag is set and false is returned.
 */
static inline bool
rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
	     int cpu, int full, ring_buffer_cond_fn cond, void *data)
{
	if (rb_watermark_hit(buffer, cpu, full))
		return true;

	if (cond(data))
		return true;

	/*
	 * The events can happen in critical sections where
	 * checking a work queue can cause deadlocks.
	 * After adding a task to the queue, this flag is set
	 * only to notify events to try to wake up the queue
	 * using irq_work.
	 *
	 * We don't clear it even if the buffer is no longer
	 * empty. The flag only causes the next event to run
	 * irq_work to do the work queue wake up. The worst
	 * that can happen if we race with !trace_empty() is that
	 * an event will cause an irq_work to try to wake up
	 * an empty queue.
	 *
	 * There's no reason to protect this flag either, as
	 * the work queue and irq_work logic will do the necessary
	 * synchronization for the wake ups. The only thing
	 * that is necessary is that the wake up happens after
	 * a task has been queued. It's OK for spurious wake ups.
	 */
	if (full)
		rbwork->full_waiters_pending = true;
	else
		rbwork->waiters_pending = true;

	return false;
}

/* State for rb_wait_once(): the irq_work and its seq value at wait start */
struct rb_wait_data {
	struct rb_irq_work		*irq_work;
	int				seq;
};

/*
 * The default wait condition for ring_buffer_wait() is to just exit the
 * wait loop the first time it is woken up.
 *
 * A wake up is detected by the irq_work sequence counter no longer
 * matching the value saved in @data.
 */
static bool rb_wait_once(void *data)
{
	struct rb_wait_data *rdata = data;
	struct rb_irq_work *rbwork = rdata->irq_work;

	return atomic_read_acquire(&rbwork->seq) != rdata->seq;
}

/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 * @cond: condition function to break out of wait (NULL to run once)
 * @data: the data to pass to @cond.
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
		     ring_buffer_cond_fn cond, void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct wait_queue_head *waitq;
	struct rb_irq_work *rbwork;
	struct rb_wait_data rdata;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		/* -ENODEV for a CPU that is not part of this buffer */
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full)
		waitq = &rbwork->full_waiters;
	else
		waitq = &rbwork->waiters;

	/* Set up to exit loop as soon as it is woken */
	if (!cond) {
		cond = rb_wait_once;
		rdata.irq_work = rbwork;
		/* Snapshot the sequence; rb_wait_once() looks for a change */
		rdata.seq = atomic_read_acquire(&rbwork->seq);
		data = &rdata;
	}

	/* The result of the interruptible wait is handed to the caller as is */
	ret = wait_event_interruptible((*waitq),
				rb_wait_cond(rbwork, buffer, cpu, full, cond, data));

	return ret;
}

/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 * zero otherwise.
 */
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
			  struct file *filp, poll_table *poll_table, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		/* A watermark is only used for a single cpu buffer */
		full = 0;
	} else {
		/* EPOLLERR for a CPU that is not part of this buffer */
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return EPOLLERR;

		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full) {
		poll_wait(filp, &rbwork->full_waiters, poll_table);

		if (rb_watermark_hit(buffer, cpu, full))
			return EPOLLIN | EPOLLRDNORM;
		/*
		 * Only allow full_waiters_pending update to be seen after
		 * the shortest_full is set (in rb_watermark_hit). If the
		 * writer sees the full_waiters_pending flag set, it will
		 * compare the amount in the ring buffer to shortest_full.
		 * If the amount in the ring buffer is greater than the
		 * shortest_full percent, it will call the irq_work handler
		 * to wake up this list. The irq_handler will reset shortest_full
		 * back to zero. That's done under the reader_lock, but
		 * the below smp_mb() makes sure that the update to
		 * full_waiters_pending doesn't leak up into the above.
		 */
		smp_mb();
		rbwork->full_waiters_pending = true;
		return 0;
	}

	poll_wait(filp, &rbwork->waiters, poll_table);
	rbwork->waiters_pending = true;

	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty.  Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path.  We only need a memory barrier when
	 * the buffer goes from empty to having content.  But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

/*
 * buffer may be either ring_buffer or ring_buffer_per_cpu
 *
 * When @cond is true, record_disabled of the (owning) buffer is
 * incremented and a warning is issued. Evaluates to the truth value
 * of @cond.
 */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENDS and normalization */
#define DEBUG_SHIFT 0

/*
 * Read the clock of @buffer, shifted left by DEBUG_SHIFT.
 */
static inline u64 rb_time_stamp(struct trace_buffer *buffer)
{
	u64 ts;

	/* Skip retpolines :-( */
	if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local))
		ts = trace_clock_local();
	else
		ts = buffer->clock();

	/* shift to debug/test normalization and TIME_EXTENDS */
	return ts << DEBUG_SHIFT;
}

/* rb_time_stamp() with preemption disabled around the clock read */
u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

/* Undo the DEBUG_SHIFT applied by rb_time_stamp(); @buffer and @cpu are unused */
void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things
tricky.
 * Writes only happen on the CPU that they are on, and they only
 * need to worry about interrupts. Reads, however, can happen on
 * any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too.
Thus: 1162 * 1163 * head->list->prev->next bit 1 bit 0 1164 * ------- ------- 1165 * Normal page 0 0 1166 * Points to head page 0 1 1167 * New head page 1 0 1168 * 1169 * Note we can not trust the prev pointer of the head page, because: 1170 * 1171 * +----+ +-----+ +-----+ 1172 * | |------>| T |---X--->| N | 1173 * | |<------| | | | 1174 * +----+ +-----+ +-----+ 1175 * ^ ^ | 1176 * | +-----+ | | 1177 * +----------| R |----------+ | 1178 * | |<-----------+ 1179 * +-----+ 1180 * 1181 * Key: ---X--> HEAD flag set in pointer 1182 * T Tail page 1183 * R Reader page 1184 * N Next page 1185 * 1186 * (see __rb_reserve_next() to see where this happens) 1187 * 1188 * What the above shows is that the reader just swapped out 1189 * the reader page with a page in the buffer, but before it 1190 * could make the new header point back to the new page added 1191 * it was preempted by a writer. The writer moved forward onto 1192 * the new page added by the reader and is about to move forward 1193 * again. 1194 * 1195 * You can see, it is legitimate for the previous pointer of 1196 * the head (or any page) not to point back to itself. But only 1197 * temporarily. 1198 */ 1199 1200 #define RB_PAGE_NORMAL 0UL 1201 #define RB_PAGE_HEAD 1UL 1202 #define RB_PAGE_UPDATE 2UL 1203 1204 1205 #define RB_FLAG_MASK 3UL 1206 1207 /* PAGE_MOVED is not part of the mask */ 1208 #define RB_PAGE_MOVED 4UL 1209 1210 /* 1211 * rb_list_head - remove any bit 1212 */ 1213 static struct list_head *rb_list_head(struct list_head *list) 1214 { 1215 unsigned long val = (unsigned long)list; 1216 1217 return (struct list_head *)(val & ~RB_FLAG_MASK); 1218 } 1219 1220 /* 1221 * rb_is_head_page - test if the given page is the head page 1222 * 1223 * Because the reader may move the head_page pointer, we can 1224 * not trust what the head page is (it may be pointing to 1225 * the reader page). But if the next page is a header page, 1226 * its flags will be non zero. 
 */
static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	/* @list->next no longer leads to @page: report it as moved */
	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	/* Otherwise hand back the flag bits stored in the pointer */
	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 *
 * Sets RB_PAGE_HEAD and clears RB_PAGE_UPDATE in @list->next with two
 * plain (non-atomic) read-modify-write operations.
 */
static void rb_set_list_to_head(struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 *
 * Does nothing when @cpu_buffer has no head_page. When the buffer has
 * ring_meta attached, the head page's data address is recorded in it too.
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(head->list.prev);

	if (cpu_buffer->ring_meta) {
		struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
		meta->head_buffer = (unsigned long)head->page;
	}
}

/* Clear every RB_FLAG_MASK bit stored in @list->next */
static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

/*
 * rb_head_page_set - atomically change the flag on the pointer to @head
 *
 * Uses cmpxchg on @prev->list.next to replace "@head | @old_flag" with
 * "@head | @new_flag".
 *
 * Returns RB_PAGE_MOVED if @prev->list.next no longer points to @head,
 * otherwise the flag bits that were found in the pointer. The exchange
 * took place only if that value equals @old_flag. @cpu_buffer is not
 * used here.
 */
static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

/* rb_head_page_set() with RB_PAGE_UPDATE as the new flag */
static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

/* rb_head_page_set() with RB_PAGE_HEAD as the new flag */
static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

/* rb_head_page_set() with RB_PAGE_NORMAL as the new flag */
static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

/* Advance *@bpage to the next page, ignoring flag bits in the link */
static inline void rb_inc_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

/* Step *@bpage back to the previous page, ignoring flag bits in the link */
static inline void rb_dec_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.prev);

	*bpage = list_entry(p, struct buffer_page, list);
}

/*
 * rb_set_head_page - locate the current head page and cache it
 *
 * Starting at cpu_buffer->head_page, walks the page list looking for the
 * page for which rb_is_head_page() reports a flag on the incoming pointer.
 * On success cpu_buffer->head_page is updated and the page returned.
 * Returns NULL (after an RB_WARN_ON) if there is no head_page, the list
 * fails a sanity check, or no head is found within three passes.
 */
static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(&page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

/*
 * rb_head_page_replace - swap @new in where @old is currently the head
 *
 * The pointer leading to @old (old->list.prev->next) is replaced with a
 * flag-free pointer to @new, but only if it still holds @old's address
 * with exactly RB_PAGE_HEAD set. Returns true if the exchange happened.
 */
static bool rb_head_page_replace(struct buffer_page *old,
				struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page *tail_page,
			       struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * it only can increment when a commit takes place. But that
		 * only happens in the outer most nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Either we update tail_page or an interrupt does */
		if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
			local_inc(&cpu_buffer->pages_touched);
	}
}

/*
 * Warn if the address of @bpage has any of the RB_FLAG_MASK bits set,
 * as those bits are used to carry flags in list pointers.
 */
static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			  struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
}

/*
 * Check that both neighbours of @list link back to it (flag bits ignored).
 * Returns false, after an RB_WARN_ON, on the first broken link.
 */
static bool rb_check_links(struct ring_buffer_per_cpu *cpu_buffer,
			   struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->next)->prev) != list))
		return false;

	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->prev)->next) != list))
		return false;

	return true;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head, *tmp;
	unsigned long buffer_cnt;
	unsigned long flags;
	int nr_loops = 0;

	/*
	 * Walk the linked list underpinning the ring buffer and validate all
	 * its next and prev links.
	 *
	 * The check acquires the reader_lock to avoid concurrent processing
	 * with code that could be modifying the list. However, the lock cannot
	 * be held for the entire duration of the walk, as this would make the
	 * time when interrupts are disabled non-deterministic, dependent on the
	 * ring buffer size. Therefore, the code releases and re-acquires the
	 * lock after checking each page. The ring_buffer_per_cpu.cnt variable
	 * is then used to detect if the list was modified while the lock was
	 * not held, in which case the check needs to be restarted.
	 *
	 * The code attempts to perform the check at most three times before
	 * giving up. This is acceptable because this is only a self-validation
	 * to detect problems early on. In practice, the list modification
	 * operations are fairly spaced, and so this check typically succeeds at
	 * most on the second try.
	 */
again:
	if (++nr_loops > 3)
		return;

	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	head = rb_list_head(cpu_buffer->pages);
	if (!rb_check_links(cpu_buffer, head))
		goto out_locked;
	/* Snapshot the modification counter for this pass */
	buffer_cnt = cpu_buffer->cnt;
	tmp = head;
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	while (true) {
		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

		if (buffer_cnt != cpu_buffer->cnt) {
			/* The list was updated, try again. */
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
			goto again;
		}

		tmp = rb_list_head(tmp->next);
		if (tmp == head)
			/* The iteration circled back, all is done. */
			goto out_locked;

		if (!rb_check_links(cpu_buffer, tmp))
			goto out_locked;

		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}

out_locked:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}

/*
 * Take an address, add the meta data size as well as the array of
 * array subbuffer indexes, then align it to a subbuffer size.
 *
 * This is used to help find the next per cpu subbuffer within a mapped range.
 */
static unsigned long
rb_range_align_subbuf(unsigned long addr, int subbuf_size, int nr_subbufs)
{
	addr += sizeof(struct ring_buffer_cpu_meta) +
		sizeof(int) * nr_subbufs;
	return ALIGN(addr, subbuf_size);
}

/*
 * Return the ring_buffer_meta for a given @cpu.
1596 */ 1597 static void *rb_range_meta(struct trace_buffer *buffer, int nr_pages, int cpu) 1598 { 1599 int subbuf_size = buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 1600 struct ring_buffer_cpu_meta *meta; 1601 struct ring_buffer_meta *bmeta; 1602 unsigned long ptr; 1603 int nr_subbufs; 1604 1605 bmeta = buffer->meta; 1606 if (!bmeta) 1607 return NULL; 1608 1609 ptr = (unsigned long)bmeta + bmeta->buffers_offset; 1610 meta = (struct ring_buffer_cpu_meta *)ptr; 1611 1612 /* When nr_pages passed in is zero, the first meta has already been initialized */ 1613 if (!nr_pages) { 1614 nr_subbufs = meta->nr_subbufs; 1615 } else { 1616 /* Include the reader page */ 1617 nr_subbufs = nr_pages + 1; 1618 } 1619 1620 /* 1621 * The first chunk may not be subbuffer aligned, where as 1622 * the rest of the chunks are. 1623 */ 1624 if (cpu) { 1625 ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs); 1626 ptr += subbuf_size * nr_subbufs; 1627 1628 /* We can use multiplication to find chunks greater than 1 */ 1629 if (cpu > 1) { 1630 unsigned long size; 1631 unsigned long p; 1632 1633 /* Save the beginning of this CPU chunk */ 1634 p = ptr; 1635 ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs); 1636 ptr += subbuf_size * nr_subbufs; 1637 1638 /* Now all chunks after this are the same size */ 1639 size = ptr - p; 1640 ptr += size * (cpu - 2); 1641 } 1642 } 1643 return (void *)ptr; 1644 } 1645 1646 /* Return the start of subbufs given the meta pointer */ 1647 static void *rb_subbufs_from_meta(struct ring_buffer_cpu_meta *meta) 1648 { 1649 int subbuf_size = meta->subbuf_size; 1650 unsigned long ptr; 1651 1652 ptr = (unsigned long)meta; 1653 ptr = rb_range_align_subbuf(ptr, subbuf_size, meta->nr_subbufs); 1654 1655 return (void *)ptr; 1656 } 1657 1658 /* 1659 * Return a specific sub-buffer for a given @cpu defined by @idx. 
1660 */ 1661 static void *rb_range_buffer(struct ring_buffer_per_cpu *cpu_buffer, int idx) 1662 { 1663 struct ring_buffer_cpu_meta *meta; 1664 unsigned long ptr; 1665 int subbuf_size; 1666 1667 meta = rb_range_meta(cpu_buffer->buffer, 0, cpu_buffer->cpu); 1668 if (!meta) 1669 return NULL; 1670 1671 if (WARN_ON_ONCE(idx >= meta->nr_subbufs)) 1672 return NULL; 1673 1674 subbuf_size = meta->subbuf_size; 1675 1676 /* Map this buffer to the order that's in meta->buffers[] */ 1677 idx = meta->buffers[idx]; 1678 1679 ptr = (unsigned long)rb_subbufs_from_meta(meta); 1680 1681 ptr += subbuf_size * idx; 1682 if (ptr + subbuf_size > cpu_buffer->buffer->range_addr_end) 1683 return NULL; 1684 1685 return (void *)ptr; 1686 } 1687 1688 /* 1689 * See if the existing memory contains a valid meta section. 1690 * if so, use that, otherwise initialize it. 1691 */ 1692 static bool rb_meta_init(struct trace_buffer *buffer, int scratch_size) 1693 { 1694 unsigned long ptr = buffer->range_addr_start; 1695 struct ring_buffer_meta *bmeta; 1696 unsigned long total_size; 1697 int struct_sizes; 1698 1699 bmeta = (struct ring_buffer_meta *)ptr; 1700 buffer->meta = bmeta; 1701 1702 total_size = buffer->range_addr_end - buffer->range_addr_start; 1703 1704 struct_sizes = sizeof(struct ring_buffer_cpu_meta); 1705 struct_sizes |= sizeof(*bmeta) << 16; 1706 1707 /* The first buffer will start word size after the meta page */ 1708 ptr += sizeof(*bmeta); 1709 ptr = ALIGN(ptr, sizeof(long)); 1710 ptr += scratch_size; 1711 1712 if (bmeta->magic != RING_BUFFER_META_MAGIC) { 1713 pr_info("Ring buffer boot meta mismatch of magic\n"); 1714 goto init; 1715 } 1716 1717 if (bmeta->struct_sizes != struct_sizes) { 1718 pr_info("Ring buffer boot meta mismatch of struct size\n"); 1719 goto init; 1720 } 1721 1722 if (bmeta->total_size != total_size) { 1723 pr_info("Ring buffer boot meta mismatch of total size\n"); 1724 goto init; 1725 } 1726 1727 if (bmeta->buffers_offset > bmeta->total_size) { 1728 pr_info("Ring 
buffer boot meta mismatch of offset outside of total size\n"); 1729 goto init; 1730 } 1731 1732 if (bmeta->buffers_offset != (void *)ptr - (void *)bmeta) { 1733 pr_info("Ring buffer boot meta mismatch of first buffer offset\n"); 1734 goto init; 1735 } 1736 1737 return true; 1738 1739 init: 1740 bmeta->magic = RING_BUFFER_META_MAGIC; 1741 bmeta->struct_sizes = struct_sizes; 1742 bmeta->total_size = total_size; 1743 bmeta->buffers_offset = (void *)ptr - (void *)bmeta; 1744 1745 /* Zero out the scratch pad */ 1746 memset((void *)bmeta + sizeof(*bmeta), 0, bmeta->buffers_offset - sizeof(*bmeta)); 1747 1748 return false; 1749 } 1750 1751 /* 1752 * See if the existing memory contains valid ring buffer data. 1753 * As the previous kernel must be the same as this kernel, all 1754 * the calculations (size of buffers and number of buffers) 1755 * must be the same. 1756 */ 1757 static bool rb_cpu_meta_valid(struct ring_buffer_cpu_meta *meta, int cpu, 1758 struct trace_buffer *buffer, int nr_pages, 1759 unsigned long *subbuf_mask) 1760 { 1761 int subbuf_size = PAGE_SIZE; 1762 struct buffer_data_page *subbuf; 1763 unsigned long buffers_start; 1764 unsigned long buffers_end; 1765 int i; 1766 1767 if (!subbuf_mask) 1768 return false; 1769 1770 buffers_start = meta->first_buffer; 1771 buffers_end = meta->first_buffer + (subbuf_size * meta->nr_subbufs); 1772 1773 /* Is the head and commit buffers within the range of buffers? 
*/ 1774 if (meta->head_buffer < buffers_start || 1775 meta->head_buffer >= buffers_end) { 1776 pr_info("Ring buffer boot meta [%d] head buffer out of range\n", cpu); 1777 return false; 1778 } 1779 1780 if (meta->commit_buffer < buffers_start || 1781 meta->commit_buffer >= buffers_end) { 1782 pr_info("Ring buffer boot meta [%d] commit buffer out of range\n", cpu); 1783 return false; 1784 } 1785 1786 subbuf = rb_subbufs_from_meta(meta); 1787 1788 bitmap_clear(subbuf_mask, 0, meta->nr_subbufs); 1789 1790 /* Is the meta buffers and the subbufs themselves have correct data? */ 1791 for (i = 0; i < meta->nr_subbufs; i++) { 1792 if (meta->buffers[i] < 0 || 1793 meta->buffers[i] >= meta->nr_subbufs) { 1794 pr_info("Ring buffer boot meta [%d] array out of range\n", cpu); 1795 return false; 1796 } 1797 1798 if ((unsigned)local_read(&subbuf->commit) > subbuf_size) { 1799 pr_info("Ring buffer boot meta [%d] buffer invalid commit\n", cpu); 1800 return false; 1801 } 1802 1803 if (test_bit(meta->buffers[i], subbuf_mask)) { 1804 pr_info("Ring buffer boot meta [%d] array has duplicates\n", cpu); 1805 return false; 1806 } 1807 1808 set_bit(meta->buffers[i], subbuf_mask); 1809 subbuf = (void *)subbuf + subbuf_size; 1810 } 1811 1812 return true; 1813 } 1814 1815 static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf); 1816 1817 static int rb_read_data_buffer(struct buffer_data_page *dpage, int tail, int cpu, 1818 unsigned long long *timestamp, u64 *delta_ptr) 1819 { 1820 struct ring_buffer_event *event; 1821 u64 ts, delta; 1822 int events = 0; 1823 int len; 1824 int e; 1825 1826 *delta_ptr = 0; 1827 *timestamp = 0; 1828 1829 ts = dpage->time_stamp; 1830 1831 for (e = 0; e < tail; e += len) { 1832 1833 event = (struct ring_buffer_event *)(dpage->data + e); 1834 len = rb_event_length(event); 1835 if (len <= 0 || len > tail - e) 1836 return -1; 1837 1838 switch (event->type_len) { 1839 1840 case RINGBUF_TYPE_TIME_EXTEND: 1841 delta = rb_event_time_stamp(event); 
1842 ts += delta; 1843 break; 1844 1845 case RINGBUF_TYPE_TIME_STAMP: 1846 delta = rb_event_time_stamp(event); 1847 delta = rb_fix_abs_ts(delta, ts); 1848 if (delta < ts) { 1849 *delta_ptr = delta; 1850 *timestamp = ts; 1851 return -1; 1852 } 1853 ts = delta; 1854 break; 1855 1856 case RINGBUF_TYPE_PADDING: 1857 if (event->time_delta == 1) 1858 break; 1859 fallthrough; 1860 case RINGBUF_TYPE_DATA: 1861 events++; 1862 ts += event->time_delta; 1863 break; 1864 1865 default: 1866 return -1; 1867 } 1868 } 1869 *timestamp = ts; 1870 return events; 1871 } 1872 1873 static int rb_validate_buffer(struct buffer_data_page *dpage, int cpu) 1874 { 1875 unsigned long long ts; 1876 u64 delta; 1877 int tail; 1878 1879 tail = local_read(&dpage->commit); 1880 return rb_read_data_buffer(dpage, tail, cpu, &ts, &delta); 1881 } 1882 1883 /* If the meta data has been validated, now validate the events */ 1884 static void rb_meta_validate_events(struct ring_buffer_per_cpu *cpu_buffer) 1885 { 1886 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 1887 struct buffer_page *head_page, *orig_head; 1888 unsigned long entry_bytes = 0; 1889 unsigned long entries = 0; 1890 int ret; 1891 u64 ts; 1892 int i; 1893 1894 if (!meta || !meta->head_buffer) 1895 return; 1896 1897 orig_head = head_page = cpu_buffer->head_page; 1898 1899 /* Do the reader page first */ 1900 ret = rb_validate_buffer(cpu_buffer->reader_page->page, cpu_buffer->cpu); 1901 if (ret < 0) { 1902 pr_info("Ring buffer reader page is invalid\n"); 1903 goto invalid; 1904 } 1905 entries += ret; 1906 entry_bytes += local_read(&cpu_buffer->reader_page->page->commit); 1907 local_set(&cpu_buffer->reader_page->entries, ret); 1908 1909 ts = head_page->page->time_stamp; 1910 1911 /* 1912 * Try to rewind the head so that we can read the pages which already 1913 * read in the previous boot. 
1914 */ 1915 if (head_page == cpu_buffer->tail_page) 1916 goto skip_rewind; 1917 1918 rb_dec_page(&head_page); 1919 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_dec_page(&head_page)) { 1920 1921 /* Rewind until tail (writer) page. */ 1922 if (head_page == cpu_buffer->tail_page) 1923 break; 1924 1925 /* Ensure the page has older data than head. */ 1926 if (ts < head_page->page->time_stamp) 1927 break; 1928 1929 ts = head_page->page->time_stamp; 1930 /* Ensure the page has correct timestamp and some data. */ 1931 if (!ts || rb_page_commit(head_page) == 0) 1932 break; 1933 1934 /* Stop rewind if the page is invalid. */ 1935 ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu); 1936 if (ret < 0) 1937 break; 1938 1939 /* Recover the number of entries and update stats. */ 1940 local_set(&head_page->entries, ret); 1941 if (ret) 1942 local_inc(&cpu_buffer->pages_touched); 1943 entries += ret; 1944 entry_bytes += rb_page_commit(head_page); 1945 } 1946 if (i) 1947 pr_info("Ring buffer [%d] rewound %d pages\n", cpu_buffer->cpu, i); 1948 1949 /* The last rewound page must be skipped. */ 1950 if (head_page != orig_head) 1951 rb_inc_page(&head_page); 1952 1953 /* 1954 * If the ring buffer was rewound, then inject the reader page 1955 * into the location just before the original head page. 1956 */ 1957 if (head_page != orig_head) { 1958 struct buffer_page *bpage = orig_head; 1959 1960 rb_dec_page(&bpage); 1961 /* 1962 * Insert the reader_page before the original head page. 1963 * Since the list encode RB_PAGE flags, general list 1964 * operations should be avoided. 
1965 */ 1966 cpu_buffer->reader_page->list.next = &orig_head->list; 1967 cpu_buffer->reader_page->list.prev = orig_head->list.prev; 1968 orig_head->list.prev = &cpu_buffer->reader_page->list; 1969 bpage->list.next = &cpu_buffer->reader_page->list; 1970 1971 /* Make the head_page the reader page */ 1972 cpu_buffer->reader_page = head_page; 1973 bpage = head_page; 1974 rb_inc_page(&head_page); 1975 head_page->list.prev = bpage->list.prev; 1976 rb_dec_page(&bpage); 1977 bpage->list.next = &head_page->list; 1978 rb_set_list_to_head(&bpage->list); 1979 cpu_buffer->pages = &head_page->list; 1980 1981 cpu_buffer->head_page = head_page; 1982 meta->head_buffer = (unsigned long)head_page->page; 1983 1984 /* Reset all the indexes */ 1985 bpage = cpu_buffer->reader_page; 1986 meta->buffers[0] = rb_meta_subbuf_idx(meta, bpage->page); 1987 bpage->id = 0; 1988 1989 for (i = 1, bpage = head_page; i < meta->nr_subbufs; 1990 i++, rb_inc_page(&bpage)) { 1991 meta->buffers[i] = rb_meta_subbuf_idx(meta, bpage->page); 1992 bpage->id = i; 1993 } 1994 1995 /* We'll restart verifying from orig_head */ 1996 head_page = orig_head; 1997 } 1998 1999 skip_rewind: 2000 /* If the commit_buffer is the reader page, update the commit page */ 2001 if (meta->commit_buffer == (unsigned long)cpu_buffer->reader_page->page) { 2002 cpu_buffer->commit_page = cpu_buffer->reader_page; 2003 /* Nothing more to do, the only page is the reader page */ 2004 goto done; 2005 } 2006 2007 /* Iterate until finding the commit page */ 2008 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_inc_page(&head_page)) { 2009 2010 /* Reader page has already been done */ 2011 if (head_page == cpu_buffer->reader_page) 2012 continue; 2013 2014 ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu); 2015 if (ret < 0) { 2016 pr_info("Ring buffer meta [%d] invalid buffer page\n", 2017 cpu_buffer->cpu); 2018 goto invalid; 2019 } 2020 2021 /* If the buffer has content, update pages_touched */ 2022 if (ret) 2023 
local_inc(&cpu_buffer->pages_touched); 2024 2025 entries += ret; 2026 entry_bytes += local_read(&head_page->page->commit); 2027 local_set(&head_page->entries, ret); 2028 2029 if (head_page == cpu_buffer->commit_page) 2030 break; 2031 } 2032 2033 if (head_page != cpu_buffer->commit_page) { 2034 pr_info("Ring buffer meta [%d] commit page not found\n", 2035 cpu_buffer->cpu); 2036 goto invalid; 2037 } 2038 done: 2039 local_set(&cpu_buffer->entries, entries); 2040 local_set(&cpu_buffer->entries_bytes, entry_bytes); 2041 2042 pr_info("Ring buffer meta [%d] is from previous boot!\n", cpu_buffer->cpu); 2043 return; 2044 2045 invalid: 2046 /* The content of the buffers are invalid, reset the meta data */ 2047 meta->head_buffer = 0; 2048 meta->commit_buffer = 0; 2049 2050 /* Reset the reader page */ 2051 local_set(&cpu_buffer->reader_page->entries, 0); 2052 local_set(&cpu_buffer->reader_page->page->commit, 0); 2053 2054 /* Reset all the subbuffers */ 2055 for (i = 0; i < meta->nr_subbufs - 1; i++, rb_inc_page(&head_page)) { 2056 local_set(&head_page->entries, 0); 2057 local_set(&head_page->page->commit, 0); 2058 } 2059 } 2060 2061 static void rb_range_meta_init(struct trace_buffer *buffer, int nr_pages, int scratch_size) 2062 { 2063 struct ring_buffer_cpu_meta *meta; 2064 unsigned long *subbuf_mask; 2065 unsigned long delta; 2066 void *subbuf; 2067 bool valid = false; 2068 int cpu; 2069 int i; 2070 2071 /* Create a mask to test the subbuf array */ 2072 subbuf_mask = bitmap_alloc(nr_pages + 1, GFP_KERNEL); 2073 /* If subbuf_mask fails to allocate, then rb_meta_valid() will return false */ 2074 2075 if (rb_meta_init(buffer, scratch_size)) 2076 valid = true; 2077 2078 for (cpu = 0; cpu < nr_cpu_ids; cpu++) { 2079 void *next_meta; 2080 2081 meta = rb_range_meta(buffer, nr_pages, cpu); 2082 2083 if (valid && rb_cpu_meta_valid(meta, cpu, buffer, nr_pages, subbuf_mask)) { 2084 /* Make the mappings match the current address */ 2085 subbuf = rb_subbufs_from_meta(meta); 2086 delta = 
(unsigned long)subbuf - meta->first_buffer;
			meta->first_buffer += delta;
			meta->head_buffer += delta;
			meta->commit_buffer += delta;
			continue;
		}

		/*
		 * The meta data of this CPU could not be reused: wipe
		 * everything from this meta up to the next CPU's meta
		 * (or the end of the range for the last CPU) and
		 * initialize it from scratch.
		 */
		if (cpu < nr_cpu_ids - 1)
			next_meta = rb_range_meta(buffer, nr_pages, cpu + 1);
		else
			next_meta = (void *)buffer->range_addr_end;

		memset(meta, 0, next_meta - (void *)meta);

		/* nr_pages does not count the reader page, nr_subbufs does */
		meta->nr_subbufs = nr_pages + 1;
		meta->subbuf_size = PAGE_SIZE;

		subbuf = rb_subbufs_from_meta(meta);

		meta->first_buffer = (unsigned long)subbuf;

		/*
		 * The buffers[] array holds the order of the sub-buffers
		 * that are after the meta data. The sub-buffers may
		 * be swapped out when read and inserted into a different
		 * location of the ring buffer. Although their addresses
		 * remain the same, the buffers[] array contains the
		 * index into the sub-buffers holding their actual order.
		 */
		for (i = 0; i < meta->nr_subbufs; i++) {
			meta->buffers[i] = i;
			rb_init_page(subbuf);
			subbuf += meta->subbuf_size;
		}
	}
	bitmap_free(subbuf_mask);
}

/*
 * seq_file ->start() for the per CPU meta data file.
 *
 * Position 0 is the header record and positions 1..nr_subbufs are the
 * entries of meta->buffers[]. The returned cookie is *pos + 1, so that
 * position 0 is never encoded as NULL. NULL is returned when the
 * cpu_buffer has no ring_meta or when *pos is past the last entry.
 */
static void *rbm_start(struct seq_file *m, loff_t *pos)
{
	struct ring_buffer_per_cpu *cpu_buffer = m->private;
	struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
	unsigned long val;

	if (!meta)
		return NULL;

	if (*pos > meta->nr_subbufs)
		return NULL;

	val = *pos;
	val++;

	return (void *)val;
}

/* seq_file ->next(): advance the position and re-encode it via rbm_start() */
static void *rbm_next(struct seq_file *m, void *v, loff_t *pos)
{
	(*pos)++;

	return rbm_start(m, pos);
}

/*
 * seq_file ->show(): @v is the cookie produced by rbm_start().
 *
 * Cookie 1 prints the header (head/commit sub-buffer index, sub-buffer
 * size and count). Any other cookie prints one meta->buffers[] entry,
 * at index (cookie - 2).
 */
static int rbm_show(struct seq_file *m, void *v)
{
	struct ring_buffer_per_cpu *cpu_buffer = m->private;
	struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
	unsigned long val = (unsigned long)v;

	if (val == 1) {
		seq_printf(m, "head_buffer: %d\n",
			   rb_meta_subbuf_idx(meta, (void *)meta->head_buffer));
		seq_printf(m, "commit_buffer: %d\n",
			   rb_meta_subbuf_idx(meta, (void *)meta->commit_buffer));
		seq_printf(m, "subbuf_size: %d\n", meta->subbuf_size);
		seq_printf(m, "nr_subbufs: %d\n", meta->nr_subbufs);
		return 0;
	}

	/* Undo the +1 of the cookie and the header occupying position 0 */
	val -= 2;
	seq_printf(m, "buffer[%ld]: %d\n", val, meta->buffers[val]);

	return 0;
}

/* seq_file ->stop(): nothing was acquired in ->start(), nothing to release */
static void rbm_stop(struct seq_file *m, void *p)
{
}

static const struct seq_operations rb_meta_seq_ops = {
	.start		= rbm_start,
	.next		= rbm_next,
	.show		= rbm_show,
	.stop		= rbm_stop,
};

/*
 * Open @file as a seq_file that dumps the ring buffer meta data of @cpu.
 *
 * Returns 0 on success or the error returned by seq_open().
 * NOTE(review): @cpu is used to index buffer->buffers[] without any
 * check here - the caller presumably guarantees it is valid; confirm.
 */
int ring_buffer_meta_seq_init(struct file *file, struct trace_buffer *buffer, int cpu)
{
	struct seq_file *m;
	int ret;

	ret = seq_open(file, &rb_meta_seq_ops);
	if (ret)
		return ret;

	m = file->private_data;
	m->private = buffer->buffers[cpu];

	return 0;
}

/* Map the buffer_pages to the previous head and commit pages */
static void rb_meta_buffer_update(struct ring_buffer_per_cpu *cpu_buffer,
				  struct buffer_page *bpage)
{
	struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;

	if (meta->head_buffer == (unsigned long)bpage->page)
		cpu_buffer->head_page = bpage;

	/* The commit page found in the meta data is also the tail page */
	if (meta->commit_buffer == (unsigned long)bpage->page) {
		cpu_buffer->commit_page = bpage;
		cpu_buffer->tail_page = bpage;
	}
}

/*
 * Find the ring_buffer_desc describing @cpu inside @trace_desc.
 *
 * Returns NULL if @trace_desc is NULL, if @cpu is not below
 * trace_desc->nr_cpus or if no descriptor carries that CPU number.
 */
static struct ring_buffer_desc *ring_buffer_desc(struct trace_buffer_desc *trace_desc, int cpu)
{
	struct ring_buffer_desc *desc, *end;
	size_t len;
	int i;

	if (!trace_desc)
		return NULL;

	if (cpu >= trace_desc->nr_cpus)
		return NULL;

	/*
	 * Fast path: jump @cpu descriptors forward, using the size of the
	 * first descriptor as the stride.
	 */
	end = (struct ring_buffer_desc *)((void *)trace_desc + trace_desc->struct_len);
	desc = __first_ring_buffer_desc(trace_desc);
	len = struct_size(desc, page_va, desc->nr_page_va);
	desc = (struct ring_buffer_desc *)((void *)desc +
(len * cpu)); 2228 2229 if (desc < end && desc->cpu == cpu) 2230 return desc; 2231 2232 /* Missing CPUs, need to linear search */ 2233 for_each_ring_buffer_desc(desc, i, trace_desc) { 2234 if (desc->cpu == cpu) 2235 return desc; 2236 } 2237 2238 return NULL; 2239 } 2240 2241 static void *ring_buffer_desc_page(struct ring_buffer_desc *desc, int page_id) 2242 { 2243 return page_id > desc->nr_page_va ? NULL : (void *)desc->page_va[page_id]; 2244 } 2245 2246 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2247 long nr_pages, struct list_head *pages) 2248 { 2249 struct trace_buffer *buffer = cpu_buffer->buffer; 2250 struct ring_buffer_cpu_meta *meta = NULL; 2251 struct buffer_page *bpage, *tmp; 2252 bool user_thread = current->mm != NULL; 2253 struct ring_buffer_desc *desc = NULL; 2254 long i; 2255 2256 /* 2257 * Check if the available memory is there first. 2258 * Note, si_mem_available() only gives us a rough estimate of available 2259 * memory. It may not be accurate. But we don't care, we just want 2260 * to prevent doing any allocation when it is obvious that it is 2261 * not going to succeed. 2262 */ 2263 i = si_mem_available(); 2264 if (i < nr_pages) 2265 return -ENOMEM; 2266 2267 /* 2268 * If a user thread allocates too much, and si_mem_available() 2269 * reports there's enough memory, even though there is not. 2270 * Make sure the OOM killer kills this thread. This can happen 2271 * even with RETRY_MAYFAIL because another task may be doing 2272 * an allocation after this task has taken all memory. 2273 * This is the task the OOM killer needs to take out during this 2274 * loop, even if it was triggered by an allocation somewhere else. 
2275 */ 2276 if (user_thread) 2277 set_current_oom_origin(); 2278 2279 if (buffer->range_addr_start) 2280 meta = rb_range_meta(buffer, nr_pages, cpu_buffer->cpu); 2281 2282 if (buffer->remote) { 2283 desc = ring_buffer_desc(buffer->remote->desc, cpu_buffer->cpu); 2284 if (!desc || WARN_ON(desc->nr_page_va != (nr_pages + 1))) 2285 return -EINVAL; 2286 } 2287 2288 for (i = 0; i < nr_pages; i++) { 2289 2290 bpage = alloc_cpu_page(cpu_buffer->cpu); 2291 if (!bpage) 2292 goto free_pages; 2293 2294 rb_check_bpage(cpu_buffer, bpage); 2295 2296 /* 2297 * Append the pages as for mapped buffers we want to keep 2298 * the order 2299 */ 2300 list_add_tail(&bpage->list, pages); 2301 2302 if (meta) { 2303 /* A range was given. Use that for the buffer page */ 2304 bpage->page = rb_range_buffer(cpu_buffer, i + 1); 2305 if (!bpage->page) 2306 goto free_pages; 2307 /* If this is valid from a previous boot */ 2308 if (meta->head_buffer) 2309 rb_meta_buffer_update(cpu_buffer, bpage); 2310 bpage->range = 1; 2311 bpage->id = i + 1; 2312 } else if (desc) { 2313 void *p = ring_buffer_desc_page(desc, i + 1); 2314 2315 if (WARN_ON(!p)) 2316 goto free_pages; 2317 2318 bpage->page = p; 2319 bpage->range = 1; /* bpage->page can't be freed */ 2320 bpage->id = i + 1; 2321 cpu_buffer->subbuf_ids[i + 1] = bpage; 2322 } else { 2323 int order = cpu_buffer->buffer->subbuf_order; 2324 bpage->page = alloc_cpu_data(cpu_buffer->cpu, order); 2325 if (!bpage->page) 2326 goto free_pages; 2327 } 2328 bpage->order = cpu_buffer->buffer->subbuf_order; 2329 2330 if (user_thread && fatal_signal_pending(current)) 2331 goto free_pages; 2332 } 2333 if (user_thread) 2334 clear_current_oom_origin(); 2335 2336 return 0; 2337 2338 free_pages: 2339 list_for_each_entry_safe(bpage, tmp, pages, list) { 2340 list_del_init(&bpage->list); 2341 free_buffer_page(bpage); 2342 } 2343 if (user_thread) 2344 clear_current_oom_origin(); 2345 2346 return -ENOMEM; 2347 } 2348 2349 static int rb_allocate_pages(struct ring_buffer_per_cpu 
*cpu_buffer, 2350 unsigned long nr_pages) 2351 { 2352 LIST_HEAD(pages); 2353 2354 WARN_ON(!nr_pages); 2355 2356 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) 2357 return -ENOMEM; 2358 2359 /* 2360 * The ring buffer page list is a circular list that does not 2361 * start and end with a list head. All page list items point to 2362 * other pages. 2363 */ 2364 cpu_buffer->pages = pages.next; 2365 list_del(&pages); 2366 2367 cpu_buffer->nr_pages = nr_pages; 2368 2369 rb_check_pages(cpu_buffer); 2370 2371 return 0; 2372 } 2373 2374 static struct ring_buffer_per_cpu * 2375 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 2376 { 2377 struct ring_buffer_per_cpu *cpu_buffer __free(kfree) = 2378 alloc_cpu_buffer(cpu); 2379 struct ring_buffer_cpu_meta *meta; 2380 struct buffer_page *bpage; 2381 int ret; 2382 2383 if (!cpu_buffer) 2384 return NULL; 2385 2386 cpu_buffer->cpu = cpu; 2387 cpu_buffer->buffer = buffer; 2388 raw_spin_lock_init(&cpu_buffer->reader_lock); 2389 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 2390 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 2391 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 2392 init_completion(&cpu_buffer->update_done); 2393 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 2394 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 2395 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 2396 mutex_init(&cpu_buffer->mapping_lock); 2397 2398 bpage = alloc_cpu_page(cpu); 2399 if (!bpage) 2400 return NULL; 2401 2402 rb_check_bpage(cpu_buffer, bpage); 2403 2404 cpu_buffer->reader_page = bpage; 2405 2406 if (buffer->range_addr_start) { 2407 /* 2408 * Range mapped buffers have the same restrictions as memory 2409 * mapped ones do. 
2410 */ 2411 cpu_buffer->mapped = 1; 2412 cpu_buffer->ring_meta = rb_range_meta(buffer, nr_pages, cpu); 2413 bpage->page = rb_range_buffer(cpu_buffer, 0); 2414 if (!bpage->page) 2415 goto fail_free_reader; 2416 if (cpu_buffer->ring_meta->head_buffer) 2417 rb_meta_buffer_update(cpu_buffer, bpage); 2418 bpage->range = 1; 2419 } else if (buffer->remote) { 2420 struct ring_buffer_desc *desc = ring_buffer_desc(buffer->remote->desc, cpu); 2421 2422 if (!desc) 2423 goto fail_free_reader; 2424 2425 cpu_buffer->remote = buffer->remote; 2426 cpu_buffer->meta_page = (struct trace_buffer_meta *)(void *)desc->meta_va; 2427 cpu_buffer->nr_pages = nr_pages; 2428 cpu_buffer->subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, 2429 sizeof(*cpu_buffer->subbuf_ids), GFP_KERNEL); 2430 if (!cpu_buffer->subbuf_ids) 2431 goto fail_free_reader; 2432 2433 /* Remote buffers are read-only and immutable */ 2434 atomic_inc(&cpu_buffer->record_disabled); 2435 atomic_inc(&cpu_buffer->resize_disabled); 2436 2437 bpage->page = ring_buffer_desc_page(desc, cpu_buffer->meta_page->reader.id); 2438 if (!bpage->page) 2439 goto fail_free_reader; 2440 2441 bpage->range = 1; 2442 cpu_buffer->subbuf_ids[0] = bpage; 2443 } else { 2444 int order = cpu_buffer->buffer->subbuf_order; 2445 bpage->page = alloc_cpu_data(cpu, order); 2446 if (!bpage->page) 2447 goto fail_free_reader; 2448 } 2449 2450 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 2451 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2452 2453 ret = rb_allocate_pages(cpu_buffer, nr_pages); 2454 if (ret < 0) 2455 goto fail_free_reader; 2456 2457 rb_meta_validate_events(cpu_buffer); 2458 2459 /* If the boot meta was valid then this has already been updated */ 2460 meta = cpu_buffer->ring_meta; 2461 if (!meta || !meta->head_buffer || 2462 !cpu_buffer->head_page || !cpu_buffer->commit_page || !cpu_buffer->tail_page) { 2463 if (meta && meta->head_buffer && 2464 (cpu_buffer->head_page || cpu_buffer->commit_page || cpu_buffer->tail_page)) { 2465 pr_warn("Ring buffer 
meta buffers not all mapped\n"); 2466 if (!cpu_buffer->head_page) 2467 pr_warn(" Missing head_page\n"); 2468 if (!cpu_buffer->commit_page) 2469 pr_warn(" Missing commit_page\n"); 2470 if (!cpu_buffer->tail_page) 2471 pr_warn(" Missing tail_page\n"); 2472 } 2473 2474 cpu_buffer->head_page 2475 = list_entry(cpu_buffer->pages, struct buffer_page, list); 2476 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 2477 2478 rb_head_page_activate(cpu_buffer); 2479 2480 if (cpu_buffer->ring_meta) 2481 meta->commit_buffer = meta->head_buffer; 2482 } else { 2483 /* The valid meta buffer still needs to activate the head page */ 2484 rb_head_page_activate(cpu_buffer); 2485 } 2486 2487 return_ptr(cpu_buffer); 2488 2489 fail_free_reader: 2490 free_buffer_page(cpu_buffer->reader_page); 2491 2492 return NULL; 2493 } 2494 2495 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 2496 { 2497 struct list_head *head = cpu_buffer->pages; 2498 struct buffer_page *bpage, *tmp; 2499 2500 irq_work_sync(&cpu_buffer->irq_work.work); 2501 2502 if (cpu_buffer->remote) 2503 kfree(cpu_buffer->subbuf_ids); 2504 2505 free_buffer_page(cpu_buffer->reader_page); 2506 2507 if (head) { 2508 rb_head_page_deactivate(cpu_buffer); 2509 2510 list_for_each_entry_safe(bpage, tmp, head, list) { 2511 list_del_init(&bpage->list); 2512 free_buffer_page(bpage); 2513 } 2514 bpage = list_entry(head, struct buffer_page, list); 2515 free_buffer_page(bpage); 2516 } 2517 2518 free_page((unsigned long)cpu_buffer->free_page); 2519 2520 kfree(cpu_buffer); 2521 } 2522 2523 static struct trace_buffer *alloc_buffer(unsigned long size, unsigned flags, 2524 int order, unsigned long start, 2525 unsigned long end, 2526 unsigned long scratch_size, 2527 struct lock_class_key *key, 2528 struct ring_buffer_remote *remote) 2529 { 2530 struct trace_buffer *buffer __free(kfree) = NULL; 2531 long nr_pages; 2532 int subbuf_size; 2533 int bsize; 2534 int cpu; 2535 int ret; 2536 2537 /* keep it in its 
own cache line */ 2538 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 2539 GFP_KERNEL); 2540 if (!buffer) 2541 return NULL; 2542 2543 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 2544 return NULL; 2545 2546 buffer->subbuf_order = order; 2547 subbuf_size = (PAGE_SIZE << order); 2548 buffer->subbuf_size = subbuf_size - BUF_PAGE_HDR_SIZE; 2549 2550 /* Max payload is buffer page size - header (8bytes) */ 2551 buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2); 2552 2553 buffer->flags = flags; 2554 buffer->clock = trace_clock_local; 2555 buffer->reader_lock_key = key; 2556 2557 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 2558 init_waitqueue_head(&buffer->irq_work.waiters); 2559 2560 buffer->cpus = nr_cpu_ids; 2561 2562 bsize = sizeof(void *) * nr_cpu_ids; 2563 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 2564 GFP_KERNEL); 2565 if (!buffer->buffers) 2566 goto fail_free_cpumask; 2567 2568 cpu = raw_smp_processor_id(); 2569 2570 /* If start/end are specified, then that overrides size */ 2571 if (start && end) { 2572 unsigned long buffers_start; 2573 unsigned long ptr; 2574 int n; 2575 2576 /* Make sure that start is word aligned */ 2577 start = ALIGN(start, sizeof(long)); 2578 2579 /* scratch_size needs to be aligned too */ 2580 scratch_size = ALIGN(scratch_size, sizeof(long)); 2581 2582 /* Subtract the buffer meta data and word aligned */ 2583 buffers_start = start + sizeof(struct ring_buffer_cpu_meta); 2584 buffers_start = ALIGN(buffers_start, sizeof(long)); 2585 buffers_start += scratch_size; 2586 2587 /* Calculate the size for the per CPU data */ 2588 size = end - buffers_start; 2589 size = size / nr_cpu_ids; 2590 2591 /* 2592 * The number of sub-buffers (nr_pages) is determined by the 2593 * total size allocated minus the meta data size. 2594 * Then that is divided by the number of per CPU buffers 2595 * needed, plus account for the integer array index that 2596 * will be appended to the meta data. 
2597 */ 2598 nr_pages = (size - sizeof(struct ring_buffer_cpu_meta)) / 2599 (subbuf_size + sizeof(int)); 2600 /* Need at least two pages plus the reader page */ 2601 if (nr_pages < 3) 2602 goto fail_free_buffers; 2603 2604 again: 2605 /* Make sure that the size fits aligned */ 2606 for (n = 0, ptr = buffers_start; n < nr_cpu_ids; n++) { 2607 ptr += sizeof(struct ring_buffer_cpu_meta) + 2608 sizeof(int) * nr_pages; 2609 ptr = ALIGN(ptr, subbuf_size); 2610 ptr += subbuf_size * nr_pages; 2611 } 2612 if (ptr > end) { 2613 if (nr_pages <= 3) 2614 goto fail_free_buffers; 2615 nr_pages--; 2616 goto again; 2617 } 2618 2619 /* nr_pages should not count the reader page */ 2620 nr_pages--; 2621 buffer->range_addr_start = start; 2622 buffer->range_addr_end = end; 2623 2624 rb_range_meta_init(buffer, nr_pages, scratch_size); 2625 } else if (remote) { 2626 struct ring_buffer_desc *desc = ring_buffer_desc(remote->desc, cpu); 2627 2628 buffer->remote = remote; 2629 /* The writer is remote. This ring-buffer is read-only */ 2630 atomic_inc(&buffer->record_disabled); 2631 nr_pages = desc->nr_page_va - 1; 2632 if (nr_pages < 2) 2633 goto fail_free_buffers; 2634 } else { 2635 2636 /* need at least two pages */ 2637 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2638 if (nr_pages < 2) 2639 nr_pages = 2; 2640 } 2641 2642 cpumask_set_cpu(cpu, buffer->cpumask); 2643 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 2644 if (!buffer->buffers[cpu]) 2645 goto fail_free_buffers; 2646 2647 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2648 if (ret < 0) 2649 goto fail_free_buffers; 2650 2651 mutex_init(&buffer->mutex); 2652 2653 return_ptr(buffer); 2654 2655 fail_free_buffers: 2656 for_each_buffer_cpu(buffer, cpu) { 2657 if (buffer->buffers[cpu]) 2658 rb_free_cpu_buffer(buffer->buffers[cpu]); 2659 } 2660 kfree(buffer->buffers); 2661 2662 fail_free_cpumask: 2663 free_cpumask_var(buffer->cpumask); 2664 2665 return NULL; 2666 } 2667 2668 /** 2669 * 
__ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 * @key: ring buffer reader_lock_key.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 *
 * Returns the new buffer, or NULL if the allocation failed.
 */
struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					struct lock_class_key *key)
{
	/* Default buffer page size - one system page */
	return alloc_buffer(size, flags, 0, 0, 0, 0, key, NULL);

}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);

/**
 * __ring_buffer_alloc_range - allocate a new ring_buffer from existing memory
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 * @order: sub-buffer order
 * @start: start of allocated range
 * @range_size: size of allocated range
 * @scratch_size: size of scratch area (for preallocated memory buffers)
 * @key: ring buffer reader_lock_key.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 *
 * The range handed to the allocator is [@start, @start + @range_size).
 *
 * Returns the new buffer, or NULL if the allocation failed.
 */
struct trace_buffer *__ring_buffer_alloc_range(unsigned long size, unsigned flags,
					       int order, unsigned long start,
					       unsigned long range_size,
					       unsigned long scratch_size,
					       struct lock_class_key *key)
{
	return alloc_buffer(size, flags, order, start, start + range_size,
			    scratch_size, key, NULL);
}

/**
 * __ring_buffer_alloc_remote - allocate a new ring_buffer from a remote
 * @remote: Contains a description of the ring-buffer pages and remote callbacks.
 * @key: ring buffer reader_lock_key.
 *
 * Returns the new buffer, or NULL if the allocation failed.
 */
struct trace_buffer *__ring_buffer_alloc_remote(struct ring_buffer_remote *remote,
						struct lock_class_key *key)
{
	return alloc_buffer(0, 0, 0, 0, 0, 0, key, remote);
}

/*
 * Return the scratch area that follows the ring_buffer_meta header of
 * @buffer, or NULL if @buffer is NULL or has no meta data.
 *
 * The area starts at the first word aligned address after the header and
 * ends at meta->buffers_offset. If @size is not NULL, the length of the
 * area in bytes is stored in it.
 */
void *ring_buffer_meta_scratch(struct trace_buffer *buffer, unsigned int *size)
{
	struct ring_buffer_meta *meta;
	void *ptr;

	if (!buffer || !buffer->meta)
		return NULL;

	meta = buffer->meta;

	ptr = (void *)ALIGN((unsigned long)meta + sizeof(*meta), sizeof(long));

	if (size)
		*size = (void *)meta + meta->buffers_offset - ptr;

	return ptr;
}

/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 *
 * The buffer is removed from the CPU hotplug state first, then every
 * per CPU buffer is released, followed by the buffer itself.
 */
void
ring_buffer_free(struct trace_buffer *buffer)
{
	int cpu;

	cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);

	irq_work_sync(&buffer->irq_work.work);

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	kfree(buffer->buffers);
	free_cpumask_var(buffer->cpumask);

	kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);

/* Set the function used by @buffer to read its clock */
void ring_buffer_set_clock(struct trace_buffer *buffer,
			   u64 (*clock)(void))
{
	buffer->clock = clock;
}

/* Set the time_stamp_abs flag of @buffer to @abs */
void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs)
{
	buffer->time_stamp_abs = abs;
}

/* Return the time_stamp_abs flag of @buffer */
bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer)
{
	return buffer->time_stamp_abs;
}

/* Number of entries on @bpage: the entries counter masked by RB_WRITE_MASK */
static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
	return local_read(&bpage->entries) & RB_WRITE_MASK;
}

/* Write index of @bpage: the write counter masked by RB_WRITE_MASK */
static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write) & RB_WRITE_MASK;
}

/*
 * Unlink @nr_pages pages that follow the tail page from the ring of
 * @cpu_buffer and free them. Returns true if exactly @nr_pages pages
 * were freed.
 */
static bool
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned
long nr_pages)
{
	struct list_head *tail_page, *to_remove, *next_page;
	struct buffer_page *to_remove_page, *tmp_iter_page;
	struct buffer_page *last_page, *first_page;
	unsigned long nr_removed;
	unsigned long head_bit;
	int page_entries;

	head_bit = 0;

	raw_spin_lock_irq(&cpu_buffer->reader_lock);
	atomic_inc(&cpu_buffer->record_disabled);
	/*
	 * We don't race with the readers since we have acquired the reader
	 * lock. We also don't race with writers after disabling recording.
	 * This makes it easy to figure out the first and the last page to be
	 * removed from the list. We unlink all the pages in between including
	 * the first and last pages. This is done in a busy loop so that we
	 * lose the least number of traces.
	 * The pages are freed after we restart recording and unlock readers.
	 */
	tail_page = &cpu_buffer->tail_page->list;

	/*
	 * tail page might be on reader page, we remove the next page
	 * from the ring buffer
	 */
	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
		tail_page = rb_list_head(tail_page->next);
	to_remove = tail_page;

	/* start of pages to remove */
	first_page = list_entry(rb_list_head(to_remove->next),
				struct buffer_page, list);

	/*
	 * Walk nr_pages forward. The raw ->next pointers carry flag bits,
	 * remember if any of them had the RB_PAGE_HEAD bit set.
	 */
	for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
		to_remove = rb_list_head(to_remove)->next;
		head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
	}
	/* Read iterators need to reset themselves when some pages removed */
	cpu_buffer->pages_removed += nr_removed;

	next_page = rb_list_head(to_remove)->next;

	/*
	 * Now we remove all pages between tail_page and next_page.
	 * Make sure that we have head_bit value preserved for the
	 * next page
	 */
	tail_page->next = (struct list_head *)((unsigned long)next_page |
						head_bit);
	next_page = rb_list_head(next_page);
	next_page->prev = tail_page;

	/* make sure pages points to a valid page in the ring buffer */
	cpu_buffer->pages = next_page;
	cpu_buffer->cnt++;

	/* update head page */
	if (head_bit)
		cpu_buffer->head_page = list_entry(next_page,
						struct buffer_page, list);

	/* pages are removed, resume tracing and then free the pages */
	atomic_dec(&cpu_buffer->record_disabled);
	raw_spin_unlock_irq(&cpu_buffer->reader_lock);

	RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));

	/* last buffer page to remove */
	last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
				list);
	tmp_iter_page = first_page;

	do {
		cond_resched();

		/* Advance before freeing: the page's links are read here */
		to_remove_page = tmp_iter_page;
		rb_inc_page(&tmp_iter_page);

		/* update the counters */
		page_entries = rb_page_entries(to_remove_page);
		if (page_entries) {
			/*
			 * If something was added to this page, it was full
			 * since it is not the tail page. So we deduct the
			 * bytes consumed in ring buffer from here.
			 * Increment overrun to account for the lost events.
			 */
			local_add(page_entries, &cpu_buffer->overrun);
			local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes);
			local_inc(&cpu_buffer->pages_lost);
		}

		/*
		 * We have already removed references to this list item, just
		 * free up the buffer_page and its page
		 */
		free_buffer_page(to_remove_page);
		nr_removed--;

	} while (to_remove_page != last_page);

	/* Every page counted while unlinking must have been freed */
	RB_WARN_ON(cpu_buffer, nr_removed);

	return nr_removed == 0;
}

/*
 * Splice the pages queued on cpu_buffer->new_pages into the ring, right
 * in front of the head page.
 *
 * Returns true if the pages were inserted. On failure the queued pages
 * are freed and false is returned.
 */
static bool
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *pages = &cpu_buffer->new_pages;
	unsigned long flags;
	bool success;
	int retries;

	/* Can be called at early boot up, where interrupts must not be enabled */
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	/*
	 * We are holding the reader lock, so the reader page won't be swapped
	 * in the ring buffer. Now we are racing with the writer trying to
	 * move head page and the tail page.
	 * We are going to adapt the reader page update process where:
	 * 1. We first splice the start and end of list of new pages between
	 *    the head page and its previous page.
	 * 2. We cmpxchg the prev_page->next to point from head page to the
	 *    start of new pages list.
	 * 3. Finally, we update the head->prev to the end of new list.
	 *
	 * We will try this process 10 times, to make sure that we don't keep
	 * spinning.
	 */
	retries = 10;
	success = false;
	while (retries--) {
		struct list_head *head_page, *prev_page;
		struct list_head *last_page, *first_page;
		struct list_head *head_page_with_bit;
		struct buffer_page *hpage = rb_set_head_page(cpu_buffer);

		if (!hpage)
			break;
		head_page = &hpage->list;
		prev_page = head_page->prev;

		first_page = pages->next;
		last_page  = pages->prev;

		head_page_with_bit = (struct list_head *)
				     ((unsigned long)head_page | RB_PAGE_HEAD);

		/* Step 1: link the new pages to their future neighbours */
		last_page->next = head_page_with_bit;
		first_page->prev = prev_page;

		/* caution: head_page_with_bit gets updated on cmpxchg failure */
		if (try_cmpxchg(&prev_page->next,
				&head_page_with_bit, first_page)) {
			/*
			 * yay, we replaced the page pointer to our new list,
			 * now, we just have to update to head page's prev
			 * pointer to point to end of list
			 */
			head_page->prev = last_page;
			cpu_buffer->cnt++;
			success = true;
			break;
		}
	}

	/* The pages now belong to the ring: empty the new_pages list head */
	if (success)
		INIT_LIST_HEAD(pages);
	/*
	 * If we weren't successful in adding in new pages, warn and stop
	 * tracing
	 */
	RB_WARN_ON(cpu_buffer, !success);
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	/* free pages if they weren't inserted */
	if (!success) {
		struct buffer_page *bpage, *tmp;
		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
					 list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
	}
	return success;
}

/*
 * Apply cpu_buffer->nr_pages_to_update: a positive value inserts the
 * pages queued on new_pages, any other value removes that many pages.
 * nr_pages is only adjusted when the operation succeeded.
 */
static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	bool success;

	if (cpu_buffer->nr_pages_to_update > 0)
		success = rb_insert_pages(cpu_buffer);
	else
		success = rb_remove_pages(cpu_buffer,
					-cpu_buffer->nr_pages_to_update);

	if (success)
		cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
}

/*
 * Work handler used to resize a per CPU buffer from its own CPU: apply
 * the pending page update, then signal update_done to the waiter.
 */
static void update_pages_handler(struct work_struct *work)
{
	struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
			struct ring_buffer_per_cpu, update_pages_work);
	rb_update_pages(cpu_buffer);
	complete(&cpu_buffer->update_done);
}

/**
 * ring_buffer_resize - resize the ring buffer
 * @buffer: the buffer to resize.
 * @size: the new size.
 * @cpu_id: the cpu buffer to resize
 *
 * Minimum size is 2 * buffer->subbuf_size.
 *
 * A NULL @buffer, or a @cpu_id that is not part of the buffer, is not
 * an error: 0 is returned and nothing is done. -EBUSY is returned if
 * resizing is disabled on a per CPU buffer that would be resized, and
 * -ENOMEM if the new pages cannot be allocated.
 *
 * Returns 0 on success and < 0 on failure.
 */
int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
			int cpu_id)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long nr_pages;
	int cpu, err;

	/*
	 * Always succeed at resizing a non-existent buffer:
	 */
	if (!buffer)
		return 0;

	/* Make sure the requested buffer exists */
	if (cpu_id != RING_BUFFER_ALL_CPUS &&
	    !cpumask_test_cpu(cpu_id, buffer->cpumask))
		return 0;

	nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size);

	/* we need a minimum of two pages */
	if (nr_pages < 2)
		nr_pages = 2;

	/*
	 * Keep CPUs from coming online while resizing to synchronize
	 * with new per CPU buffers being created.
	 */
	guard(cpus_read_lock)();

	/* prevent another thread from changing buffer sizes */
	mutex_lock(&buffer->mutex);
	atomic_inc(&buffer->resizing);

	if (cpu_id == RING_BUFFER_ALL_CPUS) {
		/*
		 * Don't succeed if resizing is disabled, as a reader might be
		 * manipulating the ring buffer and is expecting a sane state while
		 * this is true.
		 */
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			if (atomic_read(&cpu_buffer->resize_disabled)) {
				err = -EBUSY;
				goto out_err_unlock;
			}
		}

		/* calculate the pages to update */
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];

			cpu_buffer->nr_pages_to_update = nr_pages -
							cpu_buffer->nr_pages;
			/*
			 * nothing more to do for removing pages or no update
			 */
			if (cpu_buffer->nr_pages_to_update <= 0)
				continue;
			/*
			 * to add pages, make sure all new pages can be
			 * allocated without receiving ENOMEM
			 */
			INIT_LIST_HEAD(&cpu_buffer->new_pages);
			if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
						&cpu_buffer->new_pages)) {
				/* not enough memory for new pages */
				err = -ENOMEM;
				goto out_err;
			}

			cond_resched();
		}

		/*
		 * Fire off all the required work handlers
		 * We can't schedule on offline CPUs, but it's not necessary
		 * since we can change their buffer sizes without any race.
		 */
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			if (!cpu_buffer->nr_pages_to_update)
				continue;

			/* Can't run something on an offline CPU. */
			if (!cpu_online(cpu)) {
				rb_update_pages(cpu_buffer);
				cpu_buffer->nr_pages_to_update = 0;
			} else {
				/* Run directly if possible. */
				migrate_disable();
				if (cpu != smp_processor_id()) {
					migrate_enable();
					schedule_work_on(cpu,
							 &cpu_buffer->update_pages_work);
				} else {
					update_pages_handler(&cpu_buffer->update_pages_work);
					migrate_enable();
				}
			}
		}

		/* wait for all the updates to complete */
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			/* Offline CPUs were updated and cleared above */
			if (!cpu_buffer->nr_pages_to_update)
				continue;

			if (cpu_online(cpu))
				wait_for_completion(&cpu_buffer->update_done);
			cpu_buffer->nr_pages_to_update = 0;
		}

	} else {
		cpu_buffer = buffer->buffers[cpu_id];

		if (nr_pages == cpu_buffer->nr_pages)
			goto out;

		/*
		 * Don't succeed if resizing is disabled, as a reader might be
		 * manipulating the ring buffer and is expecting a sane state while
		 * this is true.
		 */
		if (atomic_read(&cpu_buffer->resize_disabled)) {
			err = -EBUSY;
			goto out_err_unlock;
		}

		cpu_buffer->nr_pages_to_update = nr_pages -
						cpu_buffer->nr_pages;

		/* Only growing the buffer needs new pages to be allocated */
		INIT_LIST_HEAD(&cpu_buffer->new_pages);
		if (cpu_buffer->nr_pages_to_update > 0 &&
			__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
					    &cpu_buffer->new_pages)) {
			err = -ENOMEM;
			goto out_err;
		}

		/* Can't run something on an offline CPU. */
		if (!cpu_online(cpu_id))
			rb_update_pages(cpu_buffer);
		else {
			/* Run directly if possible. */
			migrate_disable();
			if (cpu_id == smp_processor_id()) {
				rb_update_pages(cpu_buffer);
				migrate_enable();
			} else {
				migrate_enable();
				schedule_work_on(cpu_id,
						 &cpu_buffer->update_pages_work);
				wait_for_completion(&cpu_buffer->update_done);
			}
		}

		cpu_buffer->nr_pages_to_update = 0;
	}

 out:
	/*
	 * The ring buffer resize can happen with the ring buffer
	 * enabled, so that the update disturbs the tracing as little
	 * as possible. But if the buffer is disabled, we do not need
	 * to worry about that, and we can take the time to verify
	 * that the buffer is not corrupt.
	 */
	if (atomic_read(&buffer->record_disabled)) {
		atomic_inc(&buffer->record_disabled);
		/*
		 * Even though the buffer was disabled, we must make sure
		 * that it is truly disabled before calling rb_check_pages.
		 * There could have been a race between checking
		 * record_disabled and incrementing it.
		 */
		synchronize_rcu();
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			rb_check_pages(cpu_buffer);
		}
		atomic_dec(&buffer->record_disabled);
	}

	atomic_dec(&buffer->resizing);
	mutex_unlock(&buffer->mutex);
	return 0;

 out_err:
	/* Drop the pending update and free the pages that were allocated for it */
	for_each_buffer_cpu(buffer, cpu) {
		struct buffer_page *bpage, *tmp;

		cpu_buffer = buffer->buffers[cpu];
		cpu_buffer->nr_pages_to_update = 0;

		if (list_empty(&cpu_buffer->new_pages))
			continue;

		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
					list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);

			cond_resched();
		}
	}
 out_err_unlock:
	atomic_dec(&buffer->resizing);
	mutex_unlock(&buffer->mutex);
	return err;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);

/* Set (@val != 0) or clear (@val == 0) RB_FL_OVERWRITE, under buffer->mutex */
void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val)
{
mutex_lock(&buffer->mutex); 3231 if (val) 3232 buffer->flags |= RB_FL_OVERWRITE; 3233 else 3234 buffer->flags &= ~RB_FL_OVERWRITE; 3235 mutex_unlock(&buffer->mutex); 3236 } 3237 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 3238 3239 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 3240 { 3241 return bpage->page->data + index; 3242 } 3243 3244 static __always_inline struct ring_buffer_event * 3245 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 3246 { 3247 return __rb_page_index(cpu_buffer->reader_page, 3248 cpu_buffer->reader_page->read); 3249 } 3250 3251 static struct ring_buffer_event * 3252 rb_iter_head_event(struct ring_buffer_iter *iter) 3253 { 3254 struct ring_buffer_event *event; 3255 struct buffer_page *iter_head_page = iter->head_page; 3256 unsigned long commit; 3257 unsigned length; 3258 3259 if (iter->head != iter->next_event) 3260 return iter->event; 3261 3262 /* 3263 * When the writer goes across pages, it issues a cmpxchg which 3264 * is a mb(), which will synchronize with the rmb here. 3265 * (see rb_tail_page_update() and __rb_reserve_next()) 3266 */ 3267 commit = rb_page_commit(iter_head_page); 3268 smp_rmb(); 3269 3270 /* An event needs to be at least 8 bytes in size */ 3271 if (iter->head > commit - 8) 3272 goto reset; 3273 3274 event = __rb_page_index(iter_head_page, iter->head); 3275 length = rb_event_length(event); 3276 3277 /* 3278 * READ_ONCE() doesn't work on functions and we don't want the 3279 * compiler doing any crazy optimizations with length. 3280 */ 3281 barrier(); 3282 3283 if ((iter->head + length) > commit || length > iter->event_size) 3284 /* Writer corrupted the read? */ 3285 goto reset; 3286 3287 memcpy(iter->event, event, length); 3288 /* 3289 * If the page stamp is still the same after this rmb() then the 3290 * event was safely copied without the writer entering the page. 
3291 */ 3292 smp_rmb(); 3293 3294 /* Make sure the page didn't change since we read this */ 3295 if (iter->page_stamp != iter_head_page->page->time_stamp || 3296 commit > rb_page_commit(iter_head_page)) 3297 goto reset; 3298 3299 iter->next_event = iter->head + length; 3300 return iter->event; 3301 reset: 3302 /* Reset to the beginning */ 3303 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3304 iter->head = 0; 3305 iter->next_event = 0; 3306 iter->missed_events = 1; 3307 return NULL; 3308 } 3309 3310 /* Size is determined by what has been committed */ 3311 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 3312 { 3313 return rb_page_commit(bpage) & ~RB_MISSED_MASK; 3314 } 3315 3316 static __always_inline unsigned 3317 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 3318 { 3319 return rb_page_commit(cpu_buffer->commit_page); 3320 } 3321 3322 static __always_inline unsigned 3323 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 3324 { 3325 unsigned long addr = (unsigned long)event; 3326 3327 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 3328 3329 return addr - BUF_PAGE_HDR_SIZE; 3330 } 3331 3332 static void rb_inc_iter(struct ring_buffer_iter *iter) 3333 { 3334 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3335 3336 /* 3337 * The iterator could be on the reader page (it starts there). 3338 * But the head could have moved, since the reader was 3339 * found. Check for this case and assign the iterator 3340 * to the head page instead of next. 
3341 */ 3342 if (iter->head_page == cpu_buffer->reader_page) 3343 iter->head_page = rb_set_head_page(cpu_buffer); 3344 else 3345 rb_inc_page(&iter->head_page); 3346 3347 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3348 iter->head = 0; 3349 iter->next_event = 0; 3350 } 3351 3352 /* Return the index into the sub-buffers for a given sub-buffer */ 3353 static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf) 3354 { 3355 void *subbuf_array; 3356 3357 subbuf_array = (void *)meta + sizeof(int) * meta->nr_subbufs; 3358 subbuf_array = (void *)ALIGN((unsigned long)subbuf_array, meta->subbuf_size); 3359 return (subbuf - subbuf_array) / meta->subbuf_size; 3360 } 3361 3362 static void rb_update_meta_head(struct ring_buffer_per_cpu *cpu_buffer, 3363 struct buffer_page *next_page) 3364 { 3365 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3366 unsigned long old_head = (unsigned long)next_page->page; 3367 unsigned long new_head; 3368 3369 rb_inc_page(&next_page); 3370 new_head = (unsigned long)next_page->page; 3371 3372 /* 3373 * Only move it forward once, if something else came in and 3374 * moved it forward, then we don't want to touch it. 
3375 */ 3376 (void)cmpxchg(&meta->head_buffer, old_head, new_head); 3377 } 3378 3379 static void rb_update_meta_reader(struct ring_buffer_per_cpu *cpu_buffer, 3380 struct buffer_page *reader) 3381 { 3382 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3383 void *old_reader = cpu_buffer->reader_page->page; 3384 void *new_reader = reader->page; 3385 int id; 3386 3387 id = reader->id; 3388 cpu_buffer->reader_page->id = id; 3389 reader->id = 0; 3390 3391 meta->buffers[0] = rb_meta_subbuf_idx(meta, new_reader); 3392 meta->buffers[id] = rb_meta_subbuf_idx(meta, old_reader); 3393 3394 /* The head pointer is the one after the reader */ 3395 rb_update_meta_head(cpu_buffer, reader); 3396 } 3397 3398 /* 3399 * rb_handle_head_page - writer hit the head page 3400 * 3401 * Returns: +1 to retry page 3402 * 0 to continue 3403 * -1 on error 3404 */ 3405 static int 3406 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 3407 struct buffer_page *tail_page, 3408 struct buffer_page *next_page) 3409 { 3410 struct buffer_page *new_head; 3411 int entries; 3412 int type; 3413 int ret; 3414 3415 entries = rb_page_entries(next_page); 3416 3417 /* 3418 * The hard part is here. We need to move the head 3419 * forward, and protect against both readers on 3420 * other CPUs and writers coming in via interrupts. 3421 */ 3422 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 3423 RB_PAGE_HEAD); 3424 3425 /* 3426 * type can be one of four: 3427 * NORMAL - an interrupt already moved it for us 3428 * HEAD - we are the first to get here. 3429 * UPDATE - we are the interrupt interrupting 3430 * a current move. 3431 * MOVED - a reader on another CPU moved the next 3432 * pointer to its reader page. Give up 3433 * and try again. 3434 */ 3435 3436 switch (type) { 3437 case RB_PAGE_HEAD: 3438 /* 3439 * We changed the head to UPDATE, thus 3440 * it is our responsibility to update 3441 * the counters. 
3442 */ 3443 local_add(entries, &cpu_buffer->overrun); 3444 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes); 3445 local_inc(&cpu_buffer->pages_lost); 3446 3447 if (cpu_buffer->ring_meta) 3448 rb_update_meta_head(cpu_buffer, next_page); 3449 /* 3450 * The entries will be zeroed out when we move the 3451 * tail page. 3452 */ 3453 3454 /* still more to do */ 3455 break; 3456 3457 case RB_PAGE_UPDATE: 3458 /* 3459 * This is an interrupt that interrupt the 3460 * previous update. Still more to do. 3461 */ 3462 break; 3463 case RB_PAGE_NORMAL: 3464 /* 3465 * An interrupt came in before the update 3466 * and processed this for us. 3467 * Nothing left to do. 3468 */ 3469 return 1; 3470 case RB_PAGE_MOVED: 3471 /* 3472 * The reader is on another CPU and just did 3473 * a swap with our next_page. 3474 * Try again. 3475 */ 3476 return 1; 3477 default: 3478 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 3479 return -1; 3480 } 3481 3482 /* 3483 * Now that we are here, the old head pointer is 3484 * set to UPDATE. This will keep the reader from 3485 * swapping the head page with the reader page. 3486 * The reader (on another CPU) will spin till 3487 * we are finished. 3488 * 3489 * We just need to protect against interrupts 3490 * doing the job. We will set the next pointer 3491 * to HEAD. After that, we set the old pointer 3492 * to NORMAL, but only if it was HEAD before. 3493 * otherwise we are an interrupt, and only 3494 * want the outer most commit to reset it. 3495 */ 3496 new_head = next_page; 3497 rb_inc_page(&new_head); 3498 3499 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 3500 RB_PAGE_NORMAL); 3501 3502 /* 3503 * Valid returns are: 3504 * HEAD - an interrupt came in and already set it. 3505 * NORMAL - One of two things: 3506 * 1) We really set it. 3507 * 2) A bunch of interrupts came in and moved 3508 * the page forward again. 
/*
 * rb_reset_tail - clean up the tail page after a reservation ran off its end
 * @cpu_buffer: The per CPU buffer that was being written to
 * @tail: Offset in the tail page where the failed reservation started
 * @info: Reservation info; supplies the tail page and the reserved length
 *
 * When the reservation started inside the sub-buffer, the unused space
 * from @tail to the end of the sub-buffer is marked as padding so that it
 * is not used again. The write counter of the tail page is then reduced:
 * by the full reserved length, or, when a discarded event was put in, only
 * by the part of the reservation that went past the end of the sub-buffer.
 */
static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
	      unsigned long tail, struct rb_event_info *info)
{
	unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size);
	struct buffer_page *tail_page = info->tail_page;
	struct ring_buffer_event *event;
	unsigned long length = info->length;

	/*
	 * Only the event that crossed the page boundary
	 * must fill the old tail_page with padding.
	 */
	if (tail >= bsize) {
		/*
		 * If the page was filled, then we still need
		 * to update the real_end. Reset it to zero
		 * and the reader will ignore it.
		 */
		if (tail == bsize)
			tail_page->real_end = 0;

		local_sub(length, &tail_page->write);
		return;
	}

	event = __rb_page_index(tail_page, tail);

	/*
	 * Save the original length to the meta data.
	 * This will be used by the reader to add the lost event
	 * counter.
	 */
	tail_page->real_end = tail;

	/*
	 * If this event is bigger than the minimum size, then
	 * we need to be careful that we don't subtract the
	 * write counter enough to allow another writer to slip
	 * in on this page.
	 * We put in a discarded commit instead, to make sure
	 * that this space is not used again, and this space will
	 * not be accounted into 'entries_bytes'.
	 *
	 * If we are less than the minimum size, we don't need to
	 * worry about it.
	 */
	if (tail > (bsize - RB_EVNT_MIN_SIZE)) {
		/* No room for any events */

		/* Mark the rest of the page with padding */
		rb_event_set_padding(event);

		/* Make sure the padding is visible before the write update */
		smp_wmb();

		/* Set the write back to the previous setting */
		local_sub(length, &tail_page->write);
		return;
	}

	/* Put in a discarded event that covers the rest of the sub-buffer */
	event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE;
	event->type_len = RINGBUF_TYPE_PADDING;
	/* time delta must be non zero */
	event->time_delta = 1;

	/* account for padding bytes */
	local_add(bsize - tail, &cpu_buffer->entries_bytes);

	/* Make sure the padding is visible before the tail_page->write update */
	smp_wmb();

	/* Set write to end of buffer: only drop what went past bsize */
	length = (tail + length) - bsize;
	local_sub(length, &tail_page->write);
}

/* Forward declaration: rb_end_commit() is defined further down in this file */
static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);
3644 */ 3645 static noinline struct ring_buffer_event * 3646 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 3647 unsigned long tail, struct rb_event_info *info) 3648 { 3649 struct buffer_page *tail_page = info->tail_page; 3650 struct buffer_page *commit_page = cpu_buffer->commit_page; 3651 struct trace_buffer *buffer = cpu_buffer->buffer; 3652 struct buffer_page *next_page; 3653 int ret; 3654 3655 next_page = tail_page; 3656 3657 rb_inc_page(&next_page); 3658 3659 /* 3660 * If for some reason, we had an interrupt storm that made 3661 * it all the way around the buffer, bail, and warn 3662 * about it. 3663 */ 3664 if (unlikely(next_page == commit_page)) { 3665 local_inc(&cpu_buffer->commit_overrun); 3666 goto out_reset; 3667 } 3668 3669 /* 3670 * This is where the fun begins! 3671 * 3672 * We are fighting against races between a reader that 3673 * could be on another CPU trying to swap its reader 3674 * page with the buffer head. 3675 * 3676 * We are also fighting against interrupts coming in and 3677 * moving the head or tail on us as well. 3678 * 3679 * If the next page is the head page then we have filled 3680 * the buffer, unless the commit page is still on the 3681 * reader page. 3682 */ 3683 if (rb_is_head_page(next_page, &tail_page->list)) { 3684 3685 /* 3686 * If the commit is not on the reader page, then 3687 * move the header page. 3688 */ 3689 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 3690 /* 3691 * If we are not in overwrite mode, 3692 * this is easy, just stop here. 3693 */ 3694 if (!(buffer->flags & RB_FL_OVERWRITE)) { 3695 local_inc(&cpu_buffer->dropped_events); 3696 goto out_reset; 3697 } 3698 3699 ret = rb_handle_head_page(cpu_buffer, 3700 tail_page, 3701 next_page); 3702 if (ret < 0) 3703 goto out_reset; 3704 if (ret) 3705 goto out_again; 3706 } else { 3707 /* 3708 * We need to be careful here too. The 3709 * commit page could still be on the reader 3710 * page. 
We could have a small buffer, and 3711 * have filled up the buffer with events 3712 * from interrupts and such, and wrapped. 3713 * 3714 * Note, if the tail page is also on the 3715 * reader_page, we let it move out. 3716 */ 3717 if (unlikely((cpu_buffer->commit_page != 3718 cpu_buffer->tail_page) && 3719 (cpu_buffer->commit_page == 3720 cpu_buffer->reader_page))) { 3721 local_inc(&cpu_buffer->commit_overrun); 3722 goto out_reset; 3723 } 3724 } 3725 } 3726 3727 rb_tail_page_update(cpu_buffer, tail_page, next_page); 3728 3729 out_again: 3730 3731 rb_reset_tail(cpu_buffer, tail, info); 3732 3733 /* Commit what we have for now. */ 3734 rb_end_commit(cpu_buffer); 3735 /* rb_end_commit() decs committing */ 3736 local_inc(&cpu_buffer->committing); 3737 3738 /* fail and let the caller try again */ 3739 return ERR_PTR(-EAGAIN); 3740 3741 out_reset: 3742 /* reset write */ 3743 rb_reset_tail(cpu_buffer, tail, info); 3744 3745 return NULL; 3746 } 3747 3748 /* Slow path */ 3749 static struct ring_buffer_event * 3750 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 3751 struct ring_buffer_event *event, u64 delta, bool abs) 3752 { 3753 if (abs) 3754 event->type_len = RINGBUF_TYPE_TIME_STAMP; 3755 else 3756 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 3757 3758 /* Not the first event on the page, or not delta? */ 3759 if (abs || rb_event_index(cpu_buffer, event)) { 3760 event->time_delta = delta & TS_MASK; 3761 event->array[0] = delta >> TS_SHIFT; 3762 } else { 3763 /* nope, just zero it */ 3764 event->time_delta = 0; 3765 event->array[0] = 0; 3766 } 3767 3768 return skip_time_extend(event); 3769 } 3770 3771 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 3772 static inline bool sched_clock_stable(void) 3773 { 3774 return true; 3775 } 3776 #endif 3777 3778 static void 3779 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3780 struct rb_event_info *info) 3781 { 3782 u64 write_stamp; 3783 3784 WARN_ONCE(1, "Delta way too big! 
%llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 3785 (unsigned long long)info->delta, 3786 (unsigned long long)info->ts, 3787 (unsigned long long)info->before, 3788 (unsigned long long)info->after, 3789 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}), 3790 sched_clock_stable() ? "" : 3791 "If you just came from a suspend/resume,\n" 3792 "please switch to the trace global clock:\n" 3793 " echo global > /sys/kernel/tracing/trace_clock\n" 3794 "or add trace_clock=global to the kernel command line\n"); 3795 } 3796 3797 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3798 struct ring_buffer_event **event, 3799 struct rb_event_info *info, 3800 u64 *delta, 3801 unsigned int *length) 3802 { 3803 bool abs = info->add_timestamp & 3804 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 3805 3806 if (unlikely(info->delta > (1ULL << 59))) { 3807 /* 3808 * Some timers can use more than 59 bits, and when a timestamp 3809 * is added to the buffer, it will lose those bits. 3810 */ 3811 if (abs && (info->ts & TS_MSB)) { 3812 info->delta &= ABS_TS_MASK; 3813 3814 /* did the clock go backwards */ 3815 } else if (info->before == info->after && info->before > info->ts) { 3816 /* not interrupted */ 3817 static int once; 3818 3819 /* 3820 * This is possible with a recalibrating of the TSC. 3821 * Do not produce a call stack, but just report it. 
3822 */ 3823 if (!once) { 3824 once++; 3825 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 3826 info->before, info->ts); 3827 } 3828 } else 3829 rb_check_timestamp(cpu_buffer, info); 3830 if (!abs) 3831 info->delta = 0; 3832 } 3833 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs); 3834 *length -= RB_LEN_TIME_EXTEND; 3835 *delta = 0; 3836 } 3837 3838 /** 3839 * rb_update_event - update event type and data 3840 * @cpu_buffer: The per cpu buffer of the @event 3841 * @event: the event to update 3842 * @info: The info to update the @event with (contains length and delta) 3843 * 3844 * Update the type and data fields of the @event. The length 3845 * is the actual size that is written to the ring buffer, 3846 * and with this, we can determine what to place into the 3847 * data field. 3848 */ 3849 static void 3850 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 3851 struct ring_buffer_event *event, 3852 struct rb_event_info *info) 3853 { 3854 unsigned length = info->length; 3855 u64 delta = info->delta; 3856 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 3857 3858 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 3859 cpu_buffer->event_stamp[nest] = info->ts; 3860 3861 /* 3862 * If we need to add a timestamp, then we 3863 * add it to the start of the reserved space. 
3864 */ 3865 if (unlikely(info->add_timestamp)) 3866 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 3867 3868 event->time_delta = delta; 3869 length -= RB_EVNT_HDR_SIZE; 3870 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 3871 event->type_len = 0; 3872 event->array[0] = length; 3873 } else 3874 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 3875 } 3876 3877 static unsigned rb_calculate_event_length(unsigned length) 3878 { 3879 struct ring_buffer_event event; /* Used only for sizeof array */ 3880 3881 /* zero length can cause confusions */ 3882 if (!length) 3883 length++; 3884 3885 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 3886 length += sizeof(event.array[0]); 3887 3888 length += RB_EVNT_HDR_SIZE; 3889 length = ALIGN(length, RB_ARCH_ALIGNMENT); 3890 3891 /* 3892 * In case the time delta is larger than the 27 bits for it 3893 * in the header, we need to add a timestamp. If another 3894 * event comes in when trying to discard this one to increase 3895 * the length, then the timestamp will be added in the allocated 3896 * space of this event. If length is bigger than the size needed 3897 * for the TIME_EXTEND, then padding has to be used. The events 3898 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 3899 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 3900 * As length is a multiple of 4, we only need to worry if it 3901 * is 12 (RB_LEN_TIME_EXTEND + 4). 
3902 */ 3903 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 3904 length += RB_ALIGNMENT; 3905 3906 return length; 3907 } 3908 3909 static inline bool 3910 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 3911 struct ring_buffer_event *event) 3912 { 3913 unsigned long new_index, old_index; 3914 struct buffer_page *bpage; 3915 unsigned long addr; 3916 3917 new_index = rb_event_index(cpu_buffer, event); 3918 old_index = new_index + rb_event_ts_length(event); 3919 addr = (unsigned long)event; 3920 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 3921 3922 bpage = READ_ONCE(cpu_buffer->tail_page); 3923 3924 /* 3925 * Make sure the tail_page is still the same and 3926 * the next write location is the end of this event 3927 */ 3928 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 3929 unsigned long write_mask = 3930 local_read(&bpage->write) & ~RB_WRITE_MASK; 3931 unsigned long event_length = rb_event_length(event); 3932 3933 /* 3934 * For the before_stamp to be different than the write_stamp 3935 * to make sure that the next event adds an absolute 3936 * value and does not rely on the saved write stamp, which 3937 * is now going to be bogus. 3938 * 3939 * By setting the before_stamp to zero, the next event 3940 * is not going to use the write_stamp and will instead 3941 * create an absolute timestamp. This means there's no 3942 * reason to update the wirte_stamp! 3943 */ 3944 rb_time_set(&cpu_buffer->before_stamp, 0); 3945 3946 /* 3947 * If an event were to come in now, it would see that the 3948 * write_stamp and the before_stamp are different, and assume 3949 * that this event just added itself before updating 3950 * the write stamp. The interrupting event will fix the 3951 * write stamp for us, and use an absolute timestamp. 3952 */ 3953 3954 /* 3955 * This is on the tail page. It is possible that 3956 * a write could come in and move the tail page 3957 * and write to the next page. 
/*
 * Mark the start of a commit: bumps both the nesting counter (committing)
 * and the running count of commits.
 */
static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	local_inc(&cpu_buffer->committing);
	local_inc(&cpu_buffer->commits);
}

/*
 * Move the commit page forward until it reaches the tail page, storing the
 * write index of every page passed as that page's commit index, then bring
 * the commit index of the final page up to its write index.
 */
static __always_inline void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long max_count;

	/*
	 * We only race with interrupts and NMIs on this CPU.
	 * If we own the commit event, then we can commit
	 * all others that interrupted us, since the interruptions
	 * are in stack format (they finish before they come
	 * back to us). This allows us to do a simple loop to
	 * assign the commit to the tail.
	 */
 again:
	/* Bound the loop so that a corrupted page list cannot spin forever */
	max_count = cpu_buffer->nr_pages * 100;

	while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
		if (RB_WARN_ON(cpu_buffer, !(--max_count)))
			return;
		if (RB_WARN_ON(cpu_buffer,
			       rb_is_reader_page(cpu_buffer->tail_page)))
			return;
		/*
		 * No need for a memory barrier here, as the update
		 * of the tail_page did it for this page.
		 */
		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		rb_inc_page(&cpu_buffer->commit_page);
		if (cpu_buffer->ring_meta) {
			/* Keep the meta data's commit_buffer in step with commit_page */
			struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
			meta->commit_buffer = (unsigned long)cpu_buffer->commit_page->page;
		}
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {

		/* Make sure the readers see the content of what is committed. */
		smp_wmb();
		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		RB_WARN_ON(cpu_buffer,
			   local_read(&cpu_buffer->commit_page->page->commit) &
			   ~RB_WRITE_MASK);
		barrier();
	}

	/* again, keep gcc from optimizing */
	barrier();

	/*
	 * If an interrupt came in just after the first while loop
	 * and pushed the tail page forward, we will be left with
	 * a dangling commit that will never go forward.
	 */
	if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
		goto again;
}

/*
 * Finish a commit. Only the outermost committer (committing == 1) moves
 * the commit forward; the committing counter is then dropped. If more
 * commits were started in between and nobody is left committing, the
 * work is redone.
 */
static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long commits;

	if (RB_WARN_ON(cpu_buffer,
		       !local_read(&cpu_buffer->committing)))
		return;

 again:
	/* Snapshot the commit count to detect commits that start meanwhile */
	commits = local_read(&cpu_buffer->commits);
	/* synchronize with interrupts */
	barrier();
	if (local_read(&cpu_buffer->committing) == 1)
		rb_set_commit_to_write(cpu_buffer);

	local_dec(&cpu_buffer->committing);

	/* synchronize with interrupts */
	barrier();

	/*
	 * Need to account for interrupts coming in between the
	 * updating of the commit page and the clearing of the
	 * committing counter.
	 */
	if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
	    !local_read(&cpu_buffer->committing)) {
		local_inc(&cpu_buffer->committing);
		goto again;
	}
}

/*
 * Turn @event into a padding event. If @event is an extended time stamp,
 * the event following it is the one that gets converted.
 */
static inline void rb_event_discard(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);

	/* array[0] holds the actual length for the discarded event */
	event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
	event->type_len = RINGBUF_TYPE_PADDING;
	/* time delta must be non zero */
	if (!event->time_delta)
		event->time_delta = 1;
}

/* Count one more entry and finish the commit */
static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	local_inc(&cpu_buffer->entries);
	rb_end_commit(cpu_buffer);
}

/*
 * Queue @irq_work. In NMI context it is queued with irq_work_queue(),
 * otherwise on the CPU picked by housekeeping_any_cpu(). Returns the
 * result of the queueing call.
 */
static bool
rb_irq_work_queue(struct rb_irq_work *irq_work)
{
	int cpu;

	/* irq_work_queue_on() is not NMI-safe */
	if (unlikely(in_nmi()))
		return irq_work_queue(&irq_work->work);

	/*
	 * If CPU isolation is not active, cpu is always the current
	 * CPU, and the following is equivalent to irq_work_queue().
	 */
	cpu = housekeeping_any_cpu(HK_TYPE_KERNEL_NOISE);
	return irq_work_queue_on(&irq_work->work, cpu);
}

/*
 * Queue the wake up work for pending waiters on @buffer and @cpu_buffer,
 * and for waiters on a "full" condition once full_hit() reports that it
 * has been reached.
 */
static __always_inline void
rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
{
	if (buffer->irq_work.waiters_pending) {
		buffer->irq_work.waiters_pending = false;
		/* irq_work_queue() supplies its own memory barriers */
		rb_irq_work_queue(&buffer->irq_work);
	}

	if (cpu_buffer->irq_work.waiters_pending) {
		cpu_buffer->irq_work.waiters_pending = false;
		/* irq_work_queue() supplies its own memory barriers */
		rb_irq_work_queue(&cpu_buffer->irq_work);
	}

	/* No page was touched since the last check */
	if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
		return;

	/* The commit is still on the reader page */
	if (cpu_buffer->reader_page == cpu_buffer->commit_page)
		return;

	/* Nobody is waiting for the buffer to fill up */
	if (!cpu_buffer->irq_work.full_waiters_pending)
		return;

	cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);

	if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
		return;

	cpu_buffer->irq_work.wakeup_full = true;
	cpu_buffer->irq_work.full_waiters_pending = false;
	/* irq_work_queue() supplies its own memory barriers */
	rb_irq_work_queue(&cpu_buffer->irq_work);
}

/*
 * Record a detected recursion when CONFIG_RING_BUFFER_RECORD_RECURSION is
 * set, otherwise this expands to an empty statement.
 */
#ifdef CONFIG_RING_BUFFER_RECORD_RECURSION
# define do_ring_buffer_record_recursion()	\
	do_ftrace_record_recursion(_THIS_IP_, _RET_IP_)
#else
# define do_ring_buffer_record_recursion() do { } while (0)
#endif
 * To pass this information from the lock to the unlock without having to
 * access the 'in_interrupt()' functions again (which do show
 * a bit of overhead in something as critical as function tracing),
 * we use a bitmask trick.
 *
 * bit 1 = NMI context
 * bit 2 = IRQ context
 * bit 3 = SoftIRQ context
 * bit 4 = normal context.
 *
 * This works because this is the order of contexts that can
 * preempt other contexts. A SoftIRQ never preempts an IRQ
 * context.
 *
 * When the context is determined, the corresponding bit is
 * checked and set (if it was set, then a recursion of that context
 * happened).
 *
 * On unlock, we need to clear this bit. To do so, just subtract
 * 1 from the current_context and AND it to itself.
 *
 * (binary)
 * 101 - 1 = 100
 * 101 & 100 = 100 (clearing bit zero)
 *
 * 1010 - 1 = 1001
 * 1010 & 1001 = 1000 (clearing bit 1)
 *
 * The least significant set bit can be cleared this way, and it
 * just so happens that it is the same bit corresponding to
 * the current context.
 *
 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit
 * is set when a recursion is detected at the current context, and if
 * the TRANSITION bit is already set, it will fail the recursion.
 * This is needed because there's a lag between the changing of
 * interrupt context and updating the preempt count. In this case,
 * a false positive will be found. To handle this, one extra recursion
 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION
 * bit is already set, then it is considered a recursion and the function
 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned.
 *
 * On the trace_recursive_unlock(), the TRANSITION bit will be the first
 * to be cleared. Even if it wasn't the context that set it.
That is, 4201 * if an interrupt comes in while NORMAL bit is set and the ring buffer 4202 * is called before preempt_count() is updated, since the check will 4203 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 4204 * NMI then comes in, it will set the NMI bit, but when the NMI code 4205 * does the trace_recursive_unlock() it will clear the TRANSITION bit 4206 * and leave the NMI bit set. But this is fine, because the interrupt 4207 * code that set the TRANSITION bit will then clear the NMI bit when it 4208 * calls trace_recursive_unlock(). If another NMI comes in, it will 4209 * set the TRANSITION bit and continue. 4210 * 4211 * Note: The TRANSITION bit only handles a single transition between context. 4212 */ 4213 4214 static __always_inline bool 4215 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 4216 { 4217 unsigned int val = cpu_buffer->current_context; 4218 int bit = interrupt_context_level(); 4219 4220 bit = RB_CTX_NORMAL - bit; 4221 4222 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 4223 /* 4224 * It is possible that this was called by transitioning 4225 * between interrupt context, and preempt_count() has not 4226 * been updated yet. In this case, use the TRANSITION bit. 
4227 */ 4228 bit = RB_CTX_TRANSITION; 4229 if (val & (1 << (bit + cpu_buffer->nest))) { 4230 do_ring_buffer_record_recursion(); 4231 return true; 4232 } 4233 } 4234 4235 val |= (1 << (bit + cpu_buffer->nest)); 4236 cpu_buffer->current_context = val; 4237 4238 return false; 4239 } 4240 4241 static __always_inline void 4242 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 4243 { 4244 cpu_buffer->current_context &= 4245 cpu_buffer->current_context - (1 << cpu_buffer->nest); 4246 } 4247 4248 /* The recursive locking above uses 5 bits */ 4249 #define NESTED_BITS 5 4250 4251 /** 4252 * ring_buffer_nest_start - Allow to trace while nested 4253 * @buffer: The ring buffer to modify 4254 * 4255 * The ring buffer has a safety mechanism to prevent recursion. 4256 * But there may be a case where a trace needs to be done while 4257 * tracing something else. In this case, calling this function 4258 * will allow this function to nest within a currently active 4259 * ring_buffer_lock_reserve(). 4260 * 4261 * Call this function before calling another ring_buffer_lock_reserve() and 4262 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 4263 */ 4264 void ring_buffer_nest_start(struct trace_buffer *buffer) 4265 { 4266 struct ring_buffer_per_cpu *cpu_buffer; 4267 int cpu; 4268 4269 /* Enabled by ring_buffer_nest_end() */ 4270 preempt_disable_notrace(); 4271 cpu = raw_smp_processor_id(); 4272 cpu_buffer = buffer->buffers[cpu]; 4273 /* This is the shift value for the above recursive locking */ 4274 cpu_buffer->nest += NESTED_BITS; 4275 } 4276 4277 /** 4278 * ring_buffer_nest_end - Allow to trace while nested 4279 * @buffer: The ring buffer to modify 4280 * 4281 * Must be called after ring_buffer_nest_start() and after the 4282 * ring_buffer_unlock_commit(). 
4283 */ 4284 void ring_buffer_nest_end(struct trace_buffer *buffer) 4285 { 4286 struct ring_buffer_per_cpu *cpu_buffer; 4287 int cpu; 4288 4289 /* disabled by ring_buffer_nest_start() */ 4290 cpu = raw_smp_processor_id(); 4291 cpu_buffer = buffer->buffers[cpu]; 4292 /* This is the shift value for the above recursive locking */ 4293 cpu_buffer->nest -= NESTED_BITS; 4294 preempt_enable_notrace(); 4295 } 4296 4297 /** 4298 * ring_buffer_unlock_commit - commit a reserved 4299 * @buffer: The buffer to commit to 4300 * 4301 * This commits the data to the ring buffer, and releases any locks held. 4302 * 4303 * Must be paired with ring_buffer_lock_reserve. 4304 */ 4305 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 4306 { 4307 struct ring_buffer_per_cpu *cpu_buffer; 4308 int cpu = raw_smp_processor_id(); 4309 4310 cpu_buffer = buffer->buffers[cpu]; 4311 4312 rb_commit(cpu_buffer); 4313 4314 rb_wakeups(buffer, cpu_buffer); 4315 4316 trace_recursive_unlock(cpu_buffer); 4317 4318 preempt_enable_notrace(); 4319 4320 return 0; 4321 } 4322 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 4323 4324 /* Special value to validate all deltas on a page. 
*/ 4325 #define CHECK_FULL_PAGE 1L 4326 4327 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 4328 4329 static const char *show_irq_str(int bits) 4330 { 4331 static const char * type[] = { 4332 ".", // 0 4333 "s", // 1 4334 "h", // 2 4335 "Hs", // 3 4336 "n", // 4 4337 "Ns", // 5 4338 "Nh", // 6 4339 "NHs", // 7 4340 }; 4341 4342 return type[bits]; 4343 } 4344 4345 /* Assume this is a trace event */ 4346 static const char *show_flags(struct ring_buffer_event *event) 4347 { 4348 struct trace_entry *entry; 4349 int bits = 0; 4350 4351 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4352 return "X"; 4353 4354 entry = ring_buffer_event_data(event); 4355 4356 if (entry->flags & TRACE_FLAG_SOFTIRQ) 4357 bits |= 1; 4358 4359 if (entry->flags & TRACE_FLAG_HARDIRQ) 4360 bits |= 2; 4361 4362 if (entry->flags & TRACE_FLAG_NMI) 4363 bits |= 4; 4364 4365 return show_irq_str(bits); 4366 } 4367 4368 static const char *show_irq(struct ring_buffer_event *event) 4369 { 4370 struct trace_entry *entry; 4371 4372 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4373 return ""; 4374 4375 entry = ring_buffer_event_data(event); 4376 if (entry->flags & TRACE_FLAG_IRQS_OFF) 4377 return "d"; 4378 return ""; 4379 } 4380 4381 static const char *show_interrupt_level(void) 4382 { 4383 unsigned long pc = preempt_count(); 4384 unsigned char level = 0; 4385 4386 if (pc & SOFTIRQ_OFFSET) 4387 level |= 1; 4388 4389 if (pc & HARDIRQ_MASK) 4390 level |= 2; 4391 4392 if (pc & NMI_MASK) 4393 level |= 4; 4394 4395 return show_irq_str(level); 4396 } 4397 4398 static void dump_buffer_page(struct buffer_data_page *bpage, 4399 struct rb_event_info *info, 4400 unsigned long tail) 4401 { 4402 struct ring_buffer_event *event; 4403 u64 ts, delta; 4404 int e; 4405 4406 ts = bpage->time_stamp; 4407 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 4408 4409 for (e = 0; e < tail; e += rb_event_length(event)) { 4410 4411 event = (struct ring_buffer_event *)(bpage->data + e); 4412 4413 
switch (event->type_len) { 4414 4415 case RINGBUF_TYPE_TIME_EXTEND: 4416 delta = rb_event_time_stamp(event); 4417 ts += delta; 4418 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 4419 e, ts, delta); 4420 break; 4421 4422 case RINGBUF_TYPE_TIME_STAMP: 4423 delta = rb_event_time_stamp(event); 4424 ts = rb_fix_abs_ts(delta, ts); 4425 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 4426 e, ts, delta); 4427 break; 4428 4429 case RINGBUF_TYPE_PADDING: 4430 ts += event->time_delta; 4431 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 4432 e, ts, event->time_delta); 4433 break; 4434 4435 case RINGBUF_TYPE_DATA: 4436 ts += event->time_delta; 4437 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 4438 e, ts, event->time_delta, 4439 show_flags(event), show_irq(event)); 4440 break; 4441 4442 default: 4443 break; 4444 } 4445 } 4446 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 4447 } 4448 4449 static DEFINE_PER_CPU(atomic_t, checking); 4450 static atomic_t ts_dump; 4451 4452 #define buffer_warn_return(fmt, ...) \ 4453 do { \ 4454 /* If another report is happening, ignore this one */ \ 4455 if (atomic_inc_return(&ts_dump) != 1) { \ 4456 atomic_dec(&ts_dump); \ 4457 goto out; \ 4458 } \ 4459 atomic_inc(&cpu_buffer->record_disabled); \ 4460 pr_warn(fmt, ##__VA_ARGS__); \ 4461 dump_buffer_page(bpage, info, tail); \ 4462 atomic_dec(&ts_dump); \ 4463 /* There's some cases in boot up that this can happen */ \ 4464 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \ 4465 /* Do not re-enable checking */ \ 4466 return; \ 4467 } while (0) 4468 4469 /* 4470 * Check if the current event time stamp matches the deltas on 4471 * the buffer page. 
*/
static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
			 struct rb_event_info *info,
			 unsigned long tail)
{
	struct buffer_data_page *bpage;
	u64 ts, delta;
	bool full = false;
	int ret;

	bpage = info->tail_page->page;

	if (tail == CHECK_FULL_PAGE) {
		/* Validate everything that has been committed on the page */
		full = true;
		tail = local_read(&bpage->commit);
	} else if (info->add_timestamp &
		   (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) {
		/* Ignore events with absolute time stamps */
		return;
	}

	/*
	 * Do not check the first event (skip possible extends too).
	 * Also do not check if previous events have not been committed.
	 */
	if (tail <= 8 || tail > local_read(&bpage->commit))
		return;

	/*
	 * If this interrupted another event that is already being checked
	 * on this CPU, skip the check (the counter is dropped again at out).
	 */
	if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
		goto out;

	ret = rb_read_data_buffer(bpage, tail, cpu_buffer->cpu, &ts, &delta);
	if (ret < 0) {
		/* NOTE(review): ret < 0 presumably flags a bad absolute stamp - confirm */
		if (delta < ts) {
			buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n",
					   cpu_buffer->cpu, ts, delta);
			goto out;
		}
	}
	/*
	 * Full page: the rebuilt stamp must not be ahead of the new event.
	 * Single event: rebuilt stamp plus the event's delta must match.
	 */
	if ((full && ts > info->ts) ||
	    (!full && ts + info->delta != info->ts)) {
		buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n",
				   cpu_buffer->cpu,
				   ts + info->delta, info->ts, info->delta,
				   info->before, info->after,
				   full ? " (full)" : "", show_interrupt_level());
	}
out:
	atomic_dec(this_cpu_ptr(&checking));
}
#else
/* Time delta validation is compiled out: nothing to check */
static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
			 struct rb_event_info *info,
			 unsigned long tail)
{
}
#endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */

/*
 * Reserve info->length bytes for a new event on the current tail page.
 *
 * The reservation itself is the local_add_return() at C. The code around
 * it works out which time delta the event may carry, depending on whether
 * before_stamp and write_stamp were seen equal and on whether something
 * moved tail_page->write between A and C.
 *
 * Returns the initialized event, or whatever rb_move_tail() returns when
 * the reservation ran past the end of the sub-buffer.
 */
static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  struct rb_event_info *info)
{
	struct ring_buffer_event *event;
	struct buffer_page *tail_page;
	unsigned long tail, write, w;

	/* Don't let the compiler play games with cpu_buffer->tail_page */
	tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);

 /*A*/	w = local_read(&tail_page->write) & RB_WRITE_MASK;
	barrier();
	rb_time_read(&cpu_buffer->before_stamp, &info->before);
	rb_time_read(&cpu_buffer->write_stamp, &info->after);
	barrier();
	info->ts = rb_time_stamp(cpu_buffer->buffer);

	if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) {
		info->delta = info->ts;
	} else {
		/*
		 * If interrupting an event time update, we may need an
		 * absolute timestamp.
		 * Don't bother if this is the start of a new page (w == 0).
		 */
		if (!w) {
			/* Use the sub-buffer timestamp */
			info->delta = 0;
		} else if (unlikely(info->before != info->after)) {
			/* Stamps disagree: make room for a forced time extend */
			info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
			info->length += RB_LEN_TIME_EXTEND;
		} else {
			info->delta = info->ts - info->after;
			if (unlikely(test_time_stamp(info->delta))) {
				/* Delta needs a time extend: make room for it */
				info->add_timestamp |= RB_ADD_STAMP_EXTEND;
				info->length += RB_LEN_TIME_EXTEND;
			}
		}
	}

 /*B*/	rb_time_set(&cpu_buffer->before_stamp, info->ts);

 /*C*/	write = local_add_return(info->length, &tail_page->write);

	/* set write to only the index of the write */
	write &= RB_WRITE_MASK;

	/* Offset on the page where this event starts */
	tail = write - info->length;

	/* See if we shot past the end of this buffer page */
	if (unlikely(write > cpu_buffer->buffer->subbuf_size)) {
		check_buffer(cpu_buffer, info, CHECK_FULL_PAGE);
		return rb_move_tail(cpu_buffer, tail, info);
	}

	if (likely(tail == w)) {
		/* Nothing interrupted us between A and C */
 /*D*/		rb_time_set(&cpu_buffer->write_stamp, info->ts);
		/*
		 * If something came in between C and D, the write stamp
		 * may now not be in sync. But that's fine as the before_stamp
		 * will be different and then next event will just be forced
		 * to use an absolute timestamp.
		 */
		if (likely(!(info->add_timestamp &
			     (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
			/* This did not interrupt any time update */
			info->delta = info->ts - info->after;
		else
			/* Just use full timestamp for interrupting event */
			info->delta = info->ts;
		check_buffer(cpu_buffer, info, tail);
	} else {
		u64 ts;
		/* SLOW PATH - Interrupted between A and C */

		/* Save the old before_stamp */
		rb_time_read(&cpu_buffer->before_stamp, &info->before);

		/*
		 * Read a new timestamp and update the before_stamp to make
		 * the next event after this one force using an absolute
		 * timestamp. This is in case an interrupt were to come in
		 * between E and F.
		 */
		ts = rb_time_stamp(cpu_buffer->buffer);
		rb_time_set(&cpu_buffer->before_stamp, ts);

		barrier();
 /*E*/		rb_time_read(&cpu_buffer->write_stamp, &info->after);
		barrier();
 /*F*/		if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
		    info->after == info->before && info->after < ts) {
			/*
			 * Nothing came after this event between C and F, it is
			 * safe to use info->after for the delta as it
			 * matched info->before and is still valid.
			 */
			info->delta = ts - info->after;
		} else {
			/*
			 * Interrupted between C and F:
			 * Lost the previous events time stamp. Just set the
			 * delta to zero, and this will be the same time as
			 * the event this event interrupted. And the events that
			 * came after this will still be correct (as they would
			 * have built their delta on the previous event).
			 */
			info->delta = 0;
		}
		info->ts = ts;
		info->add_timestamp &= ~RB_ADD_STAMP_FORCE;
	}

	/*
	 * If this is the first commit on the page, then it has the same
	 * timestamp as the page itself.
	 */
	if (unlikely(!tail && !(info->add_timestamp &
				(RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
		info->delta = 0;

	/* We reserved something on the buffer */

	event = __rb_page_index(tail_page, tail);
	rb_update_event(cpu_buffer, event, info);

	local_inc(&tail_page->entries);

	/*
	 * If this is the first commit on the page, then update
	 * its timestamp.
	 */
	if (unlikely(!tail))
		tail_page->page->time_stamp = info->ts;

	/* account for these added bytes */
	local_add(info->length, &cpu_buffer->entries_bytes);

	return event;
}

/*
 * Start a commit and reserve room for an event carrying @length bytes of
 * data, retrying for as long as __rb_reserve_next() reports -EAGAIN.
 *
 * Returns the reserved event, or NULL if the write has to be dropped. On
 * the out_fail path the commit opened by rb_start_commit() is closed with
 * rb_end_commit() before returning.
 */
static __always_inline struct ring_buffer_event *
rb_reserve_next_event(struct trace_buffer *buffer,
		      struct ring_buffer_per_cpu *cpu_buffer,
		      unsigned long length)
{
	struct ring_buffer_event *event;
	struct rb_event_info info;
	int nr_loops = 0;
	int add_ts_default;

	/*
	 * ring buffer does cmpxchg as well as atomic64 operations
	 * (which some archs use locking for atomic64), make sure this
	 * is safe in NMI context
	 */
	if ((!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) ||
	     IS_ENABLED(CONFIG_GENERIC_ATOMIC64)) &&
	    (unlikely(in_nmi()))) {
		return NULL;
	}

	rb_start_commit(cpu_buffer);
	/* The commit page can not change after this */

#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
	/*
	 * Due to the ability to swap a cpu buffer from a buffer
	 * it is possible it was swapped before we committed.
	 * (committing stops a swap). We check for it here and
	 * if it happened, we have to fail the write.
	 */
	barrier();
	if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) {
		/* NOTE(review): presumably undoes rb_start_commit()'s counts - confirm */
		local_dec(&cpu_buffer->committing);
		local_dec(&cpu_buffer->commits);
		return NULL;
	}
#endif

	info.length = rb_calculate_event_length(length);

	if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
		/* Absolute stamps always carry the extra time-extend bytes */
		add_ts_default = RB_ADD_STAMP_ABSOLUTE;
		info.length += RB_LEN_TIME_EXTEND;
		if (info.length > cpu_buffer->buffer->max_data_size)
			goto out_fail;
	} else {
		add_ts_default = RB_ADD_STAMP_NONE;
	}

 again:
	/* Each attempt starts from the default timestamp flags */
	info.add_timestamp = add_ts_default;
	info.delta = 0;

	/*
	 * We allow for interrupts to reenter here and do a trace.
	 * If one does, it will cause this original code to loop
	 * back here. Even with heavy interrupts happening, this
	 * should only happen a few times in a row. If this happens
	 * 1000 times in a row, there must be either an interrupt
	 * storm or we have something buggy.
	 * Bail!
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
		goto out_fail;

	event = __rb_reserve_next(cpu_buffer, &info);

	if (unlikely(PTR_ERR(event) == -EAGAIN)) {
		/*
		 * The failed attempt grew info.length when it set FORCE or
		 * EXTEND; take that back out before trying again.
		 */
		if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND))
			info.length -= RB_LEN_TIME_EXTEND;
		goto again;
	}

	if (likely(event))
		return event;
 out_fail:
	rb_end_commit(cpu_buffer);
	return NULL;
}

/**
 * ring_buffer_lock_reserve - reserve a part of the buffer
 * @buffer: the ring buffer to reserve from
 * @length: the length of the data to reserve (excluding event header)
 *
 * Returns a reserved event on the ring buffer to copy directly to.
 * The user of this interface will need to get the body to write into
 * and can use the ring_buffer_event_data() interface.
 *
 * The length is the length of the data needed, not the event length
 * which also includes the event header.
4768 * 4769 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 4770 * If NULL is returned, then nothing has been allocated or locked. 4771 */ 4772 struct ring_buffer_event * 4773 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 4774 { 4775 struct ring_buffer_per_cpu *cpu_buffer; 4776 struct ring_buffer_event *event; 4777 int cpu; 4778 4779 /* If we are tracing schedule, we don't want to recurse */ 4780 preempt_disable_notrace(); 4781 4782 if (unlikely(atomic_read(&buffer->record_disabled))) 4783 goto out; 4784 4785 cpu = raw_smp_processor_id(); 4786 4787 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 4788 goto out; 4789 4790 cpu_buffer = buffer->buffers[cpu]; 4791 4792 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 4793 goto out; 4794 4795 if (unlikely(length > buffer->max_data_size)) 4796 goto out; 4797 4798 if (unlikely(trace_recursive_lock(cpu_buffer))) 4799 goto out; 4800 4801 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4802 if (!event) 4803 goto out_unlock; 4804 4805 return event; 4806 4807 out_unlock: 4808 trace_recursive_unlock(cpu_buffer); 4809 out: 4810 preempt_enable_notrace(); 4811 return NULL; 4812 } 4813 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 4814 4815 /* 4816 * Decrement the entries to the page that an event is on. 4817 * The event does not even need to exist, only the pointer 4818 * to the page it is on. This may only be called before the commit 4819 * takes place. 
4820 */ 4821 static inline void 4822 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 4823 struct ring_buffer_event *event) 4824 { 4825 unsigned long addr = (unsigned long)event; 4826 struct buffer_page *bpage = cpu_buffer->commit_page; 4827 struct buffer_page *start; 4828 4829 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 4830 4831 /* Do the likely case first */ 4832 if (likely(bpage->page == (void *)addr)) { 4833 local_dec(&bpage->entries); 4834 return; 4835 } 4836 4837 /* 4838 * Because the commit page may be on the reader page we 4839 * start with the next page and check the end loop there. 4840 */ 4841 rb_inc_page(&bpage); 4842 start = bpage; 4843 do { 4844 if (bpage->page == (void *)addr) { 4845 local_dec(&bpage->entries); 4846 return; 4847 } 4848 rb_inc_page(&bpage); 4849 } while (bpage != start); 4850 4851 /* commit not part of this buffer?? */ 4852 RB_WARN_ON(cpu_buffer, 1); 4853 } 4854 4855 /** 4856 * ring_buffer_discard_commit - discard an event that has not been committed 4857 * @buffer: the ring buffer 4858 * @event: non committed event to discard 4859 * 4860 * Sometimes an event that is in the ring buffer needs to be ignored. 4861 * This function lets the user discard an event in the ring buffer 4862 * and then that event will not be read later. 4863 * 4864 * This function only works if it is called before the item has been 4865 * committed. It will try to free the event from the ring buffer 4866 * if another event has not been added behind it. 4867 * 4868 * If another event has been added behind it, it will set the event 4869 * up as discarded, and perform the commit. 4870 * 4871 * If this function is called, do not call ring_buffer_unlock_commit on 4872 * the event. 
4873 */ 4874 void ring_buffer_discard_commit(struct trace_buffer *buffer, 4875 struct ring_buffer_event *event) 4876 { 4877 struct ring_buffer_per_cpu *cpu_buffer; 4878 int cpu; 4879 4880 /* The event is discarded regardless */ 4881 rb_event_discard(event); 4882 4883 cpu = smp_processor_id(); 4884 cpu_buffer = buffer->buffers[cpu]; 4885 4886 /* 4887 * This must only be called if the event has not been 4888 * committed yet. Thus we can assume that preemption 4889 * is still disabled. 4890 */ 4891 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 4892 4893 rb_decrement_entry(cpu_buffer, event); 4894 rb_try_to_discard(cpu_buffer, event); 4895 rb_end_commit(cpu_buffer); 4896 4897 trace_recursive_unlock(cpu_buffer); 4898 4899 preempt_enable_notrace(); 4900 4901 } 4902 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 4903 4904 /** 4905 * ring_buffer_write - write data to the buffer without reserving 4906 * @buffer: The ring buffer to write to. 4907 * @length: The length of the data being written (excluding the event header) 4908 * @data: The data to write to the buffer. 4909 * 4910 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 4911 * one function. If you already have the data to write to the buffer, it 4912 * may be easier to simply call this function. 4913 * 4914 * Note, like ring_buffer_lock_reserve, the length is the length of the data 4915 * and not the length of the event which would hold the header. 
4916 */ 4917 int ring_buffer_write(struct trace_buffer *buffer, 4918 unsigned long length, 4919 void *data) 4920 { 4921 struct ring_buffer_per_cpu *cpu_buffer; 4922 struct ring_buffer_event *event; 4923 void *body; 4924 int ret = -EBUSY; 4925 int cpu; 4926 4927 guard(preempt_notrace)(); 4928 4929 if (atomic_read(&buffer->record_disabled)) 4930 return -EBUSY; 4931 4932 cpu = raw_smp_processor_id(); 4933 4934 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4935 return -EBUSY; 4936 4937 cpu_buffer = buffer->buffers[cpu]; 4938 4939 if (atomic_read(&cpu_buffer->record_disabled)) 4940 return -EBUSY; 4941 4942 if (length > buffer->max_data_size) 4943 return -EBUSY; 4944 4945 if (unlikely(trace_recursive_lock(cpu_buffer))) 4946 return -EBUSY; 4947 4948 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4949 if (!event) 4950 goto out_unlock; 4951 4952 body = rb_event_data(event); 4953 4954 memcpy(body, data, length); 4955 4956 rb_commit(cpu_buffer); 4957 4958 rb_wakeups(buffer, cpu_buffer); 4959 4960 ret = 0; 4961 4962 out_unlock: 4963 trace_recursive_unlock(cpu_buffer); 4964 return ret; 4965 } 4966 EXPORT_SYMBOL_GPL(ring_buffer_write); 4967 4968 /* 4969 * The total entries in the ring buffer is the running counter 4970 * of entries entered into the ring buffer, minus the sum of 4971 * the entries read from the ring buffer and the number of 4972 * entries that were overwritten. 4973 */ 4974 static inline unsigned long 4975 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4976 { 4977 return local_read(&cpu_buffer->entries) - 4978 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4979 } 4980 4981 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 4982 { 4983 return !rb_num_of_entries(cpu_buffer); 4984 } 4985 4986 /** 4987 * ring_buffer_record_disable - stop all writes into the buffer 4988 * @buffer: The ring buffer to stop writes to. 4989 * 4990 * This prevents all writes to the buffer. 
Any attempt to write 4991 * to the buffer after this will fail and return NULL. 4992 * 4993 * The caller should call synchronize_rcu() after this. 4994 */ 4995 void ring_buffer_record_disable(struct trace_buffer *buffer) 4996 { 4997 atomic_inc(&buffer->record_disabled); 4998 } 4999 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 5000 5001 /** 5002 * ring_buffer_record_enable - enable writes to the buffer 5003 * @buffer: The ring buffer to enable writes 5004 * 5005 * Note, multiple disables will need the same number of enables 5006 * to truly enable the writing (much like preempt_disable). 5007 */ 5008 void ring_buffer_record_enable(struct trace_buffer *buffer) 5009 { 5010 atomic_dec(&buffer->record_disabled); 5011 } 5012 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 5013 5014 /** 5015 * ring_buffer_record_off - stop all writes into the buffer 5016 * @buffer: The ring buffer to stop writes to. 5017 * 5018 * This prevents all writes to the buffer. Any attempt to write 5019 * to the buffer after this will fail and return NULL. 5020 * 5021 * This is different than ring_buffer_record_disable() as 5022 * it works like an on/off switch, where as the disable() version 5023 * must be paired with a enable(). 5024 */ 5025 void ring_buffer_record_off(struct trace_buffer *buffer) 5026 { 5027 unsigned int rd; 5028 unsigned int new_rd; 5029 5030 rd = atomic_read(&buffer->record_disabled); 5031 do { 5032 new_rd = rd | RB_BUFFER_OFF; 5033 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 5034 } 5035 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 5036 5037 /** 5038 * ring_buffer_record_on - restart writes into the buffer 5039 * @buffer: The ring buffer to start writes to. 5040 * 5041 * This enables all writes to the buffer that was disabled by 5042 * ring_buffer_record_off(). 5043 * 5044 * This is different than ring_buffer_record_enable() as 5045 * it works like an on/off switch, where as the enable() version 5046 * must be paired with a disable(). 
5047 */ 5048 void ring_buffer_record_on(struct trace_buffer *buffer) 5049 { 5050 unsigned int rd; 5051 unsigned int new_rd; 5052 5053 rd = atomic_read(&buffer->record_disabled); 5054 do { 5055 new_rd = rd & ~RB_BUFFER_OFF; 5056 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 5057 } 5058 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 5059 5060 /** 5061 * ring_buffer_record_is_on - return true if the ring buffer can write 5062 * @buffer: The ring buffer to see if write is enabled 5063 * 5064 * Returns true if the ring buffer is in a state that it accepts writes. 5065 */ 5066 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 5067 { 5068 return !atomic_read(&buffer->record_disabled); 5069 } 5070 5071 /** 5072 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 5073 * @buffer: The ring buffer to see if write is set enabled 5074 * 5075 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 5076 * Note that this does NOT mean it is in a writable state. 5077 * 5078 * It may return true when the ring buffer has been disabled by 5079 * ring_buffer_record_disable(), as that is a temporary disabling of 5080 * the ring buffer. 5081 */ 5082 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 5083 { 5084 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 5085 } 5086 5087 /** 5088 * ring_buffer_record_is_on_cpu - return true if the ring buffer can write 5089 * @buffer: The ring buffer to see if write is enabled 5090 * @cpu: The CPU to test if the ring buffer can write too 5091 * 5092 * Returns true if the ring buffer is in a state that it accepts writes 5093 * for a particular CPU. 
5094 */ 5095 bool ring_buffer_record_is_on_cpu(struct trace_buffer *buffer, int cpu) 5096 { 5097 struct ring_buffer_per_cpu *cpu_buffer; 5098 5099 cpu_buffer = buffer->buffers[cpu]; 5100 5101 return ring_buffer_record_is_set_on(buffer) && 5102 !atomic_read(&cpu_buffer->record_disabled); 5103 } 5104 5105 /** 5106 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 5107 * @buffer: The ring buffer to stop writes to. 5108 * @cpu: The CPU buffer to stop 5109 * 5110 * This prevents all writes to the buffer. Any attempt to write 5111 * to the buffer after this will fail and return NULL. 5112 * 5113 * The caller should call synchronize_rcu() after this. 5114 */ 5115 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 5116 { 5117 struct ring_buffer_per_cpu *cpu_buffer; 5118 5119 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5120 return; 5121 5122 cpu_buffer = buffer->buffers[cpu]; 5123 atomic_inc(&cpu_buffer->record_disabled); 5124 } 5125 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 5126 5127 /** 5128 * ring_buffer_record_enable_cpu - enable writes to the buffer 5129 * @buffer: The ring buffer to enable writes 5130 * @cpu: The CPU to enable. 5131 * 5132 * Note, multiple disables will need the same number of enables 5133 * to truly enable the writing (much like preempt_disable). 5134 */ 5135 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 5136 { 5137 struct ring_buffer_per_cpu *cpu_buffer; 5138 5139 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5140 return; 5141 5142 cpu_buffer = buffer->buffers[cpu]; 5143 atomic_dec(&cpu_buffer->record_disabled); 5144 } 5145 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 5146 5147 /** 5148 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 5149 * @buffer: The ring buffer 5150 * @cpu: The per CPU buffer to read from. 
5151 */ 5152 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 5153 { 5154 unsigned long flags; 5155 struct ring_buffer_per_cpu *cpu_buffer; 5156 struct buffer_page *bpage; 5157 u64 ret = 0; 5158 5159 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5160 return 0; 5161 5162 cpu_buffer = buffer->buffers[cpu]; 5163 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5164 /* 5165 * if the tail is on reader_page, oldest time stamp is on the reader 5166 * page 5167 */ 5168 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 5169 bpage = cpu_buffer->reader_page; 5170 else 5171 bpage = rb_set_head_page(cpu_buffer); 5172 if (bpage) 5173 ret = bpage->page->time_stamp; 5174 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5175 5176 return ret; 5177 } 5178 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 5179 5180 /** 5181 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 5182 * @buffer: The ring buffer 5183 * @cpu: The per CPU buffer to read from. 5184 */ 5185 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 5186 { 5187 struct ring_buffer_per_cpu *cpu_buffer; 5188 unsigned long ret; 5189 5190 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5191 return 0; 5192 5193 cpu_buffer = buffer->buffers[cpu]; 5194 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 5195 5196 return ret; 5197 } 5198 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 5199 5200 /** 5201 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 5202 * @buffer: The ring buffer 5203 * @cpu: The per CPU buffer to get the entries from. 
5204 */ 5205 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 5206 { 5207 struct ring_buffer_per_cpu *cpu_buffer; 5208 5209 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5210 return 0; 5211 5212 cpu_buffer = buffer->buffers[cpu]; 5213 5214 return rb_num_of_entries(cpu_buffer); 5215 } 5216 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 5217 5218 /** 5219 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 5220 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 5221 * @buffer: The ring buffer 5222 * @cpu: The per CPU buffer to get the number of overruns from 5223 */ 5224 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 5225 { 5226 struct ring_buffer_per_cpu *cpu_buffer; 5227 unsigned long ret; 5228 5229 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5230 return 0; 5231 5232 cpu_buffer = buffer->buffers[cpu]; 5233 ret = local_read(&cpu_buffer->overrun); 5234 5235 return ret; 5236 } 5237 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 5238 5239 /** 5240 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 5241 * commits failing due to the buffer wrapping around while there are uncommitted 5242 * events, such as during an interrupt storm. 5243 * @buffer: The ring buffer 5244 * @cpu: The per CPU buffer to get the number of overruns from 5245 */ 5246 unsigned long 5247 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 5248 { 5249 struct ring_buffer_per_cpu *cpu_buffer; 5250 unsigned long ret; 5251 5252 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5253 return 0; 5254 5255 cpu_buffer = buffer->buffers[cpu]; 5256 ret = local_read(&cpu_buffer->commit_overrun); 5257 5258 return ret; 5259 } 5260 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 5261 5262 /** 5263 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 5264 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 
5265 * @buffer: The ring buffer 5266 * @cpu: The per CPU buffer to get the number of overruns from 5267 */ 5268 unsigned long 5269 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 5270 { 5271 struct ring_buffer_per_cpu *cpu_buffer; 5272 unsigned long ret; 5273 5274 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5275 return 0; 5276 5277 cpu_buffer = buffer->buffers[cpu]; 5278 ret = local_read(&cpu_buffer->dropped_events); 5279 5280 return ret; 5281 } 5282 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 5283 5284 /** 5285 * ring_buffer_read_events_cpu - get the number of events successfully read 5286 * @buffer: The ring buffer 5287 * @cpu: The per CPU buffer to get the number of events read 5288 */ 5289 unsigned long 5290 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 5291 { 5292 struct ring_buffer_per_cpu *cpu_buffer; 5293 5294 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5295 return 0; 5296 5297 cpu_buffer = buffer->buffers[cpu]; 5298 return cpu_buffer->read; 5299 } 5300 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 5301 5302 /** 5303 * ring_buffer_entries - get the number of entries in a buffer 5304 * @buffer: The ring buffer 5305 * 5306 * Returns the total number of entries in the ring buffer 5307 * (all CPU entries) 5308 */ 5309 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 5310 { 5311 struct ring_buffer_per_cpu *cpu_buffer; 5312 unsigned long entries = 0; 5313 int cpu; 5314 5315 /* if you care about this being correct, lock the buffer */ 5316 for_each_buffer_cpu(buffer, cpu) { 5317 cpu_buffer = buffer->buffers[cpu]; 5318 entries += rb_num_of_entries(cpu_buffer); 5319 } 5320 5321 return entries; 5322 } 5323 EXPORT_SYMBOL_GPL(ring_buffer_entries); 5324 5325 /** 5326 * ring_buffer_overruns - get the number of overruns in buffer 5327 * @buffer: The ring buffer 5328 * 5329 * Returns the total number of overruns in the ring buffer 5330 * (all CPU entries) 5331 */ 5332 unsigned long ring_buffer_overruns(struct 
trace_buffer *buffer) 5333 { 5334 struct ring_buffer_per_cpu *cpu_buffer; 5335 unsigned long overruns = 0; 5336 int cpu; 5337 5338 /* if you care about this being correct, lock the buffer */ 5339 for_each_buffer_cpu(buffer, cpu) { 5340 cpu_buffer = buffer->buffers[cpu]; 5341 overruns += local_read(&cpu_buffer->overrun); 5342 } 5343 5344 return overruns; 5345 } 5346 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 5347 5348 static bool rb_read_remote_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 5349 { 5350 local_set(&cpu_buffer->entries, READ_ONCE(cpu_buffer->meta_page->entries)); 5351 local_set(&cpu_buffer->overrun, READ_ONCE(cpu_buffer->meta_page->overrun)); 5352 local_set(&cpu_buffer->pages_touched, READ_ONCE(cpu_buffer->meta_page->pages_touched)); 5353 local_set(&cpu_buffer->pages_lost, READ_ONCE(cpu_buffer->meta_page->pages_lost)); 5354 5355 return rb_num_of_entries(cpu_buffer); 5356 } 5357 5358 static void rb_update_remote_head(struct ring_buffer_per_cpu *cpu_buffer) 5359 { 5360 struct buffer_page *next, *orig; 5361 int retry = 3; 5362 5363 orig = next = cpu_buffer->head_page; 5364 rb_inc_page(&next); 5365 5366 /* Run after the writer */ 5367 while (cpu_buffer->head_page->page->time_stamp > next->page->time_stamp) { 5368 rb_inc_page(&next); 5369 5370 rb_list_head_clear(cpu_buffer->head_page->list.prev); 5371 rb_inc_page(&cpu_buffer->head_page); 5372 rb_set_list_to_head(cpu_buffer->head_page->list.prev); 5373 5374 if (cpu_buffer->head_page == orig) { 5375 if (WARN_ON_ONCE(!(--retry))) 5376 return; 5377 } 5378 } 5379 5380 orig = cpu_buffer->commit_page = cpu_buffer->head_page; 5381 retry = 3; 5382 5383 while (cpu_buffer->commit_page->page->time_stamp < next->page->time_stamp) { 5384 rb_inc_page(&next); 5385 rb_inc_page(&cpu_buffer->commit_page); 5386 5387 if (cpu_buffer->commit_page == orig) { 5388 if (WARN_ON_ONCE(!(--retry))) 5389 return; 5390 } 5391 } 5392 } 5393 5394 static void rb_iter_reset(struct ring_buffer_iter *iter) 5395 { 5396 struct ring_buffer_per_cpu 
*cpu_buffer = iter->cpu_buffer; 5397 5398 if (cpu_buffer->remote) { 5399 rb_read_remote_meta_page(cpu_buffer); 5400 rb_update_remote_head(cpu_buffer); 5401 } 5402 5403 /* Iterator usage is expected to have record disabled */ 5404 iter->head_page = cpu_buffer->reader_page; 5405 iter->head = cpu_buffer->reader_page->read; 5406 iter->next_event = iter->head; 5407 5408 iter->cache_reader_page = iter->head_page; 5409 iter->cache_read = cpu_buffer->read; 5410 iter->cache_pages_removed = cpu_buffer->pages_removed; 5411 5412 if (iter->head) { 5413 iter->read_stamp = cpu_buffer->read_stamp; 5414 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 5415 } else { 5416 iter->read_stamp = iter->head_page->page->time_stamp; 5417 iter->page_stamp = iter->read_stamp; 5418 } 5419 } 5420 5421 /** 5422 * ring_buffer_iter_reset - reset an iterator 5423 * @iter: The iterator to reset 5424 * 5425 * Resets the iterator, so that it will start from the beginning 5426 * again. 5427 */ 5428 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 5429 { 5430 struct ring_buffer_per_cpu *cpu_buffer; 5431 unsigned long flags; 5432 5433 if (!iter) 5434 return; 5435 5436 cpu_buffer = iter->cpu_buffer; 5437 5438 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5439 rb_iter_reset(iter); 5440 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5441 } 5442 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 5443 5444 /** 5445 * ring_buffer_iter_empty - check if an iterator has no more to read 5446 * @iter: The iterator to check 5447 */ 5448 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 5449 { 5450 struct ring_buffer_per_cpu *cpu_buffer; 5451 struct buffer_page *reader; 5452 struct buffer_page *head_page; 5453 struct buffer_page *commit_page; 5454 struct buffer_page *curr_commit_page; 5455 unsigned commit; 5456 u64 curr_commit_ts; 5457 u64 commit_ts; 5458 5459 cpu_buffer = iter->cpu_buffer; 5460 reader = cpu_buffer->reader_page; 5461 head_page = cpu_buffer->head_page; 
5462 commit_page = READ_ONCE(cpu_buffer->commit_page); 5463 commit_ts = commit_page->page->time_stamp; 5464 5465 /* 5466 * When the writer goes across pages, it issues a cmpxchg which 5467 * is a mb(), which will synchronize with the rmb here. 5468 * (see rb_tail_page_update()) 5469 */ 5470 smp_rmb(); 5471 commit = rb_page_commit(commit_page); 5472 /* We want to make sure that the commit page doesn't change */ 5473 smp_rmb(); 5474 5475 /* Make sure commit page didn't change */ 5476 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 5477 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 5478 5479 /* If the commit page changed, then there's more data */ 5480 if (curr_commit_page != commit_page || 5481 curr_commit_ts != commit_ts) 5482 return 0; 5483 5484 /* Still racy, as it may return a false positive, but that's OK */ 5485 return ((iter->head_page == commit_page && iter->head >= commit) || 5486 (iter->head_page == reader && commit_page == head_page && 5487 head_page->read == commit && 5488 iter->head == rb_page_size(cpu_buffer->reader_page))); 5489 } 5490 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 5491 5492 static void 5493 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 5494 struct ring_buffer_event *event) 5495 { 5496 u64 delta; 5497 5498 switch (event->type_len) { 5499 case RINGBUF_TYPE_PADDING: 5500 return; 5501 5502 case RINGBUF_TYPE_TIME_EXTEND: 5503 delta = rb_event_time_stamp(event); 5504 cpu_buffer->read_stamp += delta; 5505 return; 5506 5507 case RINGBUF_TYPE_TIME_STAMP: 5508 delta = rb_event_time_stamp(event); 5509 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 5510 cpu_buffer->read_stamp = delta; 5511 return; 5512 5513 case RINGBUF_TYPE_DATA: 5514 cpu_buffer->read_stamp += event->time_delta; 5515 return; 5516 5517 default: 5518 RB_WARN_ON(cpu_buffer, 1); 5519 } 5520 } 5521 5522 static void 5523 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 5524 struct ring_buffer_event *event) 5525 { 5526 u64 delta; 5527 
5528 switch (event->type_len) { 5529 case RINGBUF_TYPE_PADDING: 5530 return; 5531 5532 case RINGBUF_TYPE_TIME_EXTEND: 5533 delta = rb_event_time_stamp(event); 5534 iter->read_stamp += delta; 5535 return; 5536 5537 case RINGBUF_TYPE_TIME_STAMP: 5538 delta = rb_event_time_stamp(event); 5539 delta = rb_fix_abs_ts(delta, iter->read_stamp); 5540 iter->read_stamp = delta; 5541 return; 5542 5543 case RINGBUF_TYPE_DATA: 5544 iter->read_stamp += event->time_delta; 5545 return; 5546 5547 default: 5548 RB_WARN_ON(iter->cpu_buffer, 1); 5549 } 5550 } 5551 5552 static struct buffer_page * 5553 __rb_get_reader_page_from_remote(struct ring_buffer_per_cpu *cpu_buffer) 5554 { 5555 struct buffer_page *new_reader, *prev_reader, *prev_head, *new_head, *last; 5556 5557 if (!rb_read_remote_meta_page(cpu_buffer)) 5558 return NULL; 5559 5560 /* More to read on the reader page */ 5561 if (cpu_buffer->reader_page->read < rb_page_size(cpu_buffer->reader_page)) { 5562 if (!cpu_buffer->reader_page->read) 5563 cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp; 5564 return cpu_buffer->reader_page; 5565 } 5566 5567 prev_reader = cpu_buffer->subbuf_ids[cpu_buffer->meta_page->reader.id]; 5568 5569 WARN_ON_ONCE(cpu_buffer->remote->swap_reader_page(cpu_buffer->cpu, 5570 cpu_buffer->remote->priv)); 5571 /* nr_pages doesn't include the reader page */ 5572 if (WARN_ON_ONCE(cpu_buffer->meta_page->reader.id > cpu_buffer->nr_pages)) 5573 return NULL; 5574 5575 new_reader = cpu_buffer->subbuf_ids[cpu_buffer->meta_page->reader.id]; 5576 5577 WARN_ON_ONCE(prev_reader == new_reader); 5578 5579 prev_head = new_reader; /* New reader was also the previous head */ 5580 new_head = prev_head; 5581 rb_inc_page(&new_head); 5582 last = prev_head; 5583 rb_dec_page(&last); 5584 5585 /* Clear the old HEAD flag */ 5586 rb_list_head_clear(cpu_buffer->head_page->list.prev); 5587 5588 prev_reader->list.next = prev_head->list.next; 5589 prev_reader->list.prev = prev_head->list.prev; 5590 5591 /* Swap 
prev_reader with new_reader */ 5592 last->list.next = &prev_reader->list; 5593 new_head->list.prev = &prev_reader->list; 5594 5595 new_reader->list.prev = &new_reader->list; 5596 new_reader->list.next = &new_head->list; 5597 5598 /* Reactivate the HEAD flag */ 5599 rb_set_list_to_head(&last->list); 5600 5601 cpu_buffer->head_page = new_head; 5602 cpu_buffer->reader_page = new_reader; 5603 cpu_buffer->pages = &new_head->list; 5604 cpu_buffer->read_stamp = new_reader->page->time_stamp; 5605 cpu_buffer->lost_events = cpu_buffer->meta_page->reader.lost_events; 5606 5607 return rb_page_size(cpu_buffer->reader_page) ? cpu_buffer->reader_page : NULL; 5608 } 5609 5610 static struct buffer_page * 5611 __rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 5612 { 5613 struct buffer_page *reader = NULL; 5614 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 5615 unsigned long overwrite; 5616 unsigned long flags; 5617 int nr_loops = 0; 5618 bool ret; 5619 5620 local_irq_save(flags); 5621 arch_spin_lock(&cpu_buffer->lock); 5622 5623 again: 5624 /* 5625 * This should normally only loop twice. But because the 5626 * start of the reader inserts an empty page, it causes 5627 * a case where we will loop three times. There should be no 5628 * reason to loop four times (that I know of). 
5629 */ 5630 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 5631 reader = NULL; 5632 goto out; 5633 } 5634 5635 reader = cpu_buffer->reader_page; 5636 5637 /* If there's more to read, return this page */ 5638 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 5639 goto out; 5640 5641 /* Never should we have an index greater than the size */ 5642 if (RB_WARN_ON(cpu_buffer, 5643 cpu_buffer->reader_page->read > rb_page_size(reader))) 5644 goto out; 5645 5646 /* check if we caught up to the tail */ 5647 reader = NULL; 5648 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 5649 goto out; 5650 5651 /* Don't bother swapping if the ring buffer is empty */ 5652 if (rb_num_of_entries(cpu_buffer) == 0) 5653 goto out; 5654 5655 /* 5656 * Reset the reader page to size zero. 5657 */ 5658 local_set(&cpu_buffer->reader_page->write, 0); 5659 local_set(&cpu_buffer->reader_page->entries, 0); 5660 cpu_buffer->reader_page->real_end = 0; 5661 5662 spin: 5663 /* 5664 * Splice the empty reader page into the list around the head. 5665 */ 5666 reader = rb_set_head_page(cpu_buffer); 5667 if (!reader) 5668 goto out; 5669 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 5670 cpu_buffer->reader_page->list.prev = reader->list.prev; 5671 5672 /* 5673 * cpu_buffer->pages just needs to point to the buffer, it 5674 * has no specific buffer page to point to. Lets move it out 5675 * of our way so we don't accidentally swap it. 5676 */ 5677 cpu_buffer->pages = reader->list.prev; 5678 5679 /* The reader page will be pointing to the new head */ 5680 rb_set_list_to_head(&cpu_buffer->reader_page->list); 5681 5682 /* 5683 * We want to make sure we read the overruns after we set up our 5684 * pointers to the next object. The writer side does a 5685 * cmpxchg to cross pages which acts as the mb on the writer 5686 * side. 
Note, the reader will constantly fail the swap 5687 * while the writer is updating the pointers, so this 5688 * guarantees that the overwrite recorded here is the one we 5689 * want to compare with the last_overrun. 5690 */ 5691 smp_mb(); 5692 overwrite = local_read(&(cpu_buffer->overrun)); 5693 5694 /* 5695 * Here's the tricky part. 5696 * 5697 * We need to move the pointer past the header page. 5698 * But we can only do that if a writer is not currently 5699 * moving it. The page before the header page has the 5700 * flag bit '1' set if it is pointing to the page we want. 5701 * but if the writer is in the process of moving it 5702 * then it will be '2' or already moved '0'. 5703 */ 5704 5705 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 5706 5707 /* 5708 * If we did not convert it, then we must try again. 5709 */ 5710 if (!ret) 5711 goto spin; 5712 5713 if (cpu_buffer->ring_meta) 5714 rb_update_meta_reader(cpu_buffer, reader); 5715 5716 /* 5717 * Yay! We succeeded in replacing the page. 5718 * 5719 * Now make the new head point back to the reader page. 5720 */ 5721 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 5722 rb_inc_page(&cpu_buffer->head_page); 5723 5724 cpu_buffer->cnt++; 5725 local_inc(&cpu_buffer->pages_read); 5726 5727 /* Finally update the reader page to the new head */ 5728 cpu_buffer->reader_page = reader; 5729 cpu_buffer->reader_page->read = 0; 5730 5731 if (overwrite != cpu_buffer->last_overrun) { 5732 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 5733 cpu_buffer->last_overrun = overwrite; 5734 } 5735 5736 goto again; 5737 5738 out: 5739 /* Update the read_stamp on the first event */ 5740 if (reader && reader->read == 0) 5741 cpu_buffer->read_stamp = reader->page->time_stamp; 5742 5743 arch_spin_unlock(&cpu_buffer->lock); 5744 local_irq_restore(flags); 5745 5746 /* 5747 * The writer has preempt disable, wait for it. 
But not forever 5748 * Although, 1 second is pretty much "forever" 5749 */ 5750 #define USECS_WAIT 1000000 5751 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 5752 /* If the write is past the end of page, a writer is still updating it */ 5753 if (likely(!reader || rb_page_write(reader) <= bsize)) 5754 break; 5755 5756 udelay(1); 5757 5758 /* Get the latest version of the reader write value */ 5759 smp_rmb(); 5760 } 5761 5762 /* The writer is not moving forward? Something is wrong */ 5763 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 5764 reader = NULL; 5765 5766 /* 5767 * Make sure we see any padding after the write update 5768 * (see rb_reset_tail()). 5769 * 5770 * In addition, a writer may be writing on the reader page 5771 * if the page has not been fully filled, so the read barrier 5772 * is also needed to make sure we see the content of what is 5773 * committed by the writer (see rb_set_commit_to_write()). 5774 */ 5775 smp_rmb(); 5776 5777 5778 return reader; 5779 } 5780 5781 static struct buffer_page * 5782 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 5783 { 5784 return cpu_buffer->remote ? 
__rb_get_reader_page_from_remote(cpu_buffer) : 5785 __rb_get_reader_page(cpu_buffer); 5786 } 5787 5788 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 5789 { 5790 struct ring_buffer_event *event; 5791 struct buffer_page *reader; 5792 unsigned length; 5793 5794 reader = rb_get_reader_page(cpu_buffer); 5795 5796 /* This function should not be called when buffer is empty */ 5797 if (RB_WARN_ON(cpu_buffer, !reader)) 5798 return; 5799 5800 event = rb_reader_event(cpu_buffer); 5801 5802 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 5803 cpu_buffer->read++; 5804 5805 rb_update_read_stamp(cpu_buffer, event); 5806 5807 length = rb_event_length(event); 5808 cpu_buffer->reader_page->read += length; 5809 cpu_buffer->read_bytes += length; 5810 } 5811 5812 static void rb_advance_iter(struct ring_buffer_iter *iter) 5813 { 5814 struct ring_buffer_per_cpu *cpu_buffer; 5815 5816 cpu_buffer = iter->cpu_buffer; 5817 5818 /* If head == next_event then we need to jump to the next event */ 5819 if (iter->head == iter->next_event) { 5820 /* If the event gets overwritten again, there's nothing to do */ 5821 if (rb_iter_head_event(iter) == NULL) 5822 return; 5823 } 5824 5825 iter->head = iter->next_event; 5826 5827 /* 5828 * Check if we are at the end of the buffer. 
5829 */ 5830 if (iter->next_event >= rb_page_size(iter->head_page)) { 5831 /* discarded commits can make the page empty */ 5832 if (iter->head_page == cpu_buffer->commit_page) 5833 return; 5834 rb_inc_iter(iter); 5835 return; 5836 } 5837 5838 rb_update_iter_read_stamp(iter, iter->event); 5839 } 5840 5841 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 5842 { 5843 return cpu_buffer->lost_events; 5844 } 5845 5846 static struct ring_buffer_event * 5847 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 5848 unsigned long *lost_events) 5849 { 5850 struct ring_buffer_event *event; 5851 struct buffer_page *reader; 5852 int nr_loops = 0; 5853 5854 if (ts) 5855 *ts = 0; 5856 again: 5857 /* 5858 * We repeat when a time extend is encountered. 5859 * Since the time extend is always attached to a data event, 5860 * we should never loop more than once. 5861 * (We never hit the following condition more than twice). 5862 */ 5863 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 5864 return NULL; 5865 5866 reader = rb_get_reader_page(cpu_buffer); 5867 if (!reader) 5868 return NULL; 5869 5870 event = rb_reader_event(cpu_buffer); 5871 5872 switch (event->type_len) { 5873 case RINGBUF_TYPE_PADDING: 5874 if (rb_null_event(event)) 5875 RB_WARN_ON(cpu_buffer, 1); 5876 /* 5877 * Because the writer could be discarding every 5878 * event it creates (which would probably be bad) 5879 * if we were to go back to "again" then we may never 5880 * catch up, and will trigger the warn on, or lock 5881 * the box. Return the padding, and we will release 5882 * the current locks, and try again. 
5883 */ 5884 return event; 5885 5886 case RINGBUF_TYPE_TIME_EXTEND: 5887 /* Internal data, OK to advance */ 5888 rb_advance_reader(cpu_buffer); 5889 goto again; 5890 5891 case RINGBUF_TYPE_TIME_STAMP: 5892 if (ts) { 5893 *ts = rb_event_time_stamp(event); 5894 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 5895 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5896 cpu_buffer->cpu, ts); 5897 } 5898 /* Internal data, OK to advance */ 5899 rb_advance_reader(cpu_buffer); 5900 goto again; 5901 5902 case RINGBUF_TYPE_DATA: 5903 if (ts && !(*ts)) { 5904 *ts = cpu_buffer->read_stamp + event->time_delta; 5905 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5906 cpu_buffer->cpu, ts); 5907 } 5908 if (lost_events) 5909 *lost_events = rb_lost_events(cpu_buffer); 5910 return event; 5911 5912 default: 5913 RB_WARN_ON(cpu_buffer, 1); 5914 } 5915 5916 return NULL; 5917 } 5918 EXPORT_SYMBOL_GPL(ring_buffer_peek); 5919 5920 static struct ring_buffer_event * 5921 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5922 { 5923 struct trace_buffer *buffer; 5924 struct ring_buffer_per_cpu *cpu_buffer; 5925 struct ring_buffer_event *event; 5926 int nr_loops = 0; 5927 5928 if (ts) 5929 *ts = 0; 5930 5931 cpu_buffer = iter->cpu_buffer; 5932 buffer = cpu_buffer->buffer; 5933 5934 /* 5935 * Check if someone performed a consuming read to the buffer 5936 * or removed some pages from the buffer. In these cases, 5937 * iterator was invalidated and we need to reset it. 5938 */ 5939 if (unlikely(iter->cache_read != cpu_buffer->read || 5940 iter->cache_reader_page != cpu_buffer->reader_page || 5941 iter->cache_pages_removed != cpu_buffer->pages_removed)) 5942 rb_iter_reset(iter); 5943 5944 again: 5945 if (ring_buffer_iter_empty(iter)) 5946 return NULL; 5947 5948 /* 5949 * As the writer can mess with what the iterator is trying 5950 * to read, just give up if we fail to get an event after 5951 * three tries. 
The iterator is not as reliable when reading 5952 * the ring buffer with an active write as the consumer is. 5953 * Do not warn if the three failures is reached. 5954 */ 5955 if (++nr_loops > 3) 5956 return NULL; 5957 5958 if (rb_per_cpu_empty(cpu_buffer)) 5959 return NULL; 5960 5961 if (iter->head >= rb_page_size(iter->head_page)) { 5962 rb_inc_iter(iter); 5963 goto again; 5964 } 5965 5966 event = rb_iter_head_event(iter); 5967 if (!event) 5968 goto again; 5969 5970 switch (event->type_len) { 5971 case RINGBUF_TYPE_PADDING: 5972 if (rb_null_event(event)) { 5973 rb_inc_iter(iter); 5974 goto again; 5975 } 5976 rb_advance_iter(iter); 5977 return event; 5978 5979 case RINGBUF_TYPE_TIME_EXTEND: 5980 /* Internal data, OK to advance */ 5981 rb_advance_iter(iter); 5982 goto again; 5983 5984 case RINGBUF_TYPE_TIME_STAMP: 5985 if (ts) { 5986 *ts = rb_event_time_stamp(event); 5987 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 5988 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5989 cpu_buffer->cpu, ts); 5990 } 5991 /* Internal data, OK to advance */ 5992 rb_advance_iter(iter); 5993 goto again; 5994 5995 case RINGBUF_TYPE_DATA: 5996 if (ts && !(*ts)) { 5997 *ts = iter->read_stamp + event->time_delta; 5998 ring_buffer_normalize_time_stamp(buffer, 5999 cpu_buffer->cpu, ts); 6000 } 6001 return event; 6002 6003 default: 6004 RB_WARN_ON(cpu_buffer, 1); 6005 } 6006 6007 return NULL; 6008 } 6009 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 6010 6011 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 6012 { 6013 if (likely(!in_nmi())) { 6014 raw_spin_lock(&cpu_buffer->reader_lock); 6015 return true; 6016 } 6017 6018 /* 6019 * If an NMI die dumps out the content of the ring buffer 6020 * trylock must be used to prevent a deadlock if the NMI 6021 * preempted a task that holds the ring buffer locks. 
If 6022 * we get the lock then all is fine, if not, then continue 6023 * to do the read, but this can corrupt the ring buffer, 6024 * so it must be permanently disabled from future writes. 6025 * Reading from NMI is a oneshot deal. 6026 */ 6027 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 6028 return true; 6029 6030 /* Continue without locking, but disable the ring buffer */ 6031 atomic_inc(&cpu_buffer->record_disabled); 6032 return false; 6033 } 6034 6035 static inline void 6036 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 6037 { 6038 if (likely(locked)) 6039 raw_spin_unlock(&cpu_buffer->reader_lock); 6040 } 6041 6042 /** 6043 * ring_buffer_peek - peek at the next event to be read 6044 * @buffer: The ring buffer to read 6045 * @cpu: The cpu to peak at 6046 * @ts: The timestamp counter of this event. 6047 * @lost_events: a variable to store if events were lost (may be NULL) 6048 * 6049 * This will return the event that will be read next, but does 6050 * not consume the data. 
6051 */ 6052 struct ring_buffer_event * 6053 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 6054 unsigned long *lost_events) 6055 { 6056 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6057 struct ring_buffer_event *event; 6058 unsigned long flags; 6059 bool dolock; 6060 6061 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6062 return NULL; 6063 6064 again: 6065 local_irq_save(flags); 6066 dolock = rb_reader_lock(cpu_buffer); 6067 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 6068 if (event && event->type_len == RINGBUF_TYPE_PADDING) 6069 rb_advance_reader(cpu_buffer); 6070 rb_reader_unlock(cpu_buffer, dolock); 6071 local_irq_restore(flags); 6072 6073 if (event && event->type_len == RINGBUF_TYPE_PADDING) 6074 goto again; 6075 6076 return event; 6077 } 6078 6079 /** ring_buffer_iter_dropped - report if there are dropped events 6080 * @iter: The ring buffer iterator 6081 * 6082 * Returns true if there was dropped events since the last peek. 6083 */ 6084 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 6085 { 6086 bool ret = iter->missed_events != 0; 6087 6088 iter->missed_events = 0; 6089 return ret; 6090 } 6091 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 6092 6093 /** 6094 * ring_buffer_iter_peek - peek at the next event to be read 6095 * @iter: The ring buffer iterator 6096 * @ts: The timestamp counter of this event. 6097 * 6098 * This will return the event that will be read next, but does 6099 * not increment the iterator. 
6100 */ 6101 struct ring_buffer_event * 6102 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 6103 { 6104 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6105 struct ring_buffer_event *event; 6106 unsigned long flags; 6107 6108 again: 6109 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6110 event = rb_iter_peek(iter, ts); 6111 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6112 6113 if (event && event->type_len == RINGBUF_TYPE_PADDING) 6114 goto again; 6115 6116 return event; 6117 } 6118 6119 /** 6120 * ring_buffer_consume - return an event and consume it 6121 * @buffer: The ring buffer to get the next event from 6122 * @cpu: the cpu to read the buffer from 6123 * @ts: a variable to store the timestamp (may be NULL) 6124 * @lost_events: a variable to store if events were lost (may be NULL) 6125 * 6126 * Returns the next event in the ring buffer, and that event is consumed. 6127 * Meaning, that sequential reads will keep returning a different event, 6128 * and eventually empty the ring buffer if the producer is slower. 
6129 */ 6130 struct ring_buffer_event * 6131 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 6132 unsigned long *lost_events) 6133 { 6134 struct ring_buffer_per_cpu *cpu_buffer; 6135 struct ring_buffer_event *event = NULL; 6136 unsigned long flags; 6137 bool dolock; 6138 6139 again: 6140 /* might be called in atomic */ 6141 preempt_disable(); 6142 6143 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6144 goto out; 6145 6146 cpu_buffer = buffer->buffers[cpu]; 6147 local_irq_save(flags); 6148 dolock = rb_reader_lock(cpu_buffer); 6149 6150 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 6151 if (event) { 6152 cpu_buffer->lost_events = 0; 6153 rb_advance_reader(cpu_buffer); 6154 } 6155 6156 rb_reader_unlock(cpu_buffer, dolock); 6157 local_irq_restore(flags); 6158 6159 out: 6160 preempt_enable(); 6161 6162 if (event && event->type_len == RINGBUF_TYPE_PADDING) 6163 goto again; 6164 6165 return event; 6166 } 6167 EXPORT_SYMBOL_GPL(ring_buffer_consume); 6168 6169 /** 6170 * ring_buffer_read_start - start a non consuming read of the buffer 6171 * @buffer: The ring buffer to read from 6172 * @cpu: The cpu buffer to iterate over 6173 * @flags: gfp flags to use for memory allocation 6174 * 6175 * This creates an iterator to allow non-consuming iteration through 6176 * the buffer. If the buffer is disabled for writing, it will produce 6177 * the same information each time, but if the buffer is still writing 6178 * then the first hit of a write will cause the iteration to stop. 6179 * 6180 * Must be paired with ring_buffer_read_finish. 
6181 */ 6182 struct ring_buffer_iter * 6183 ring_buffer_read_start(struct trace_buffer *buffer, int cpu, gfp_t flags) 6184 { 6185 struct ring_buffer_per_cpu *cpu_buffer; 6186 struct ring_buffer_iter *iter; 6187 6188 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6189 return NULL; 6190 6191 iter = kzalloc_obj(*iter, flags); 6192 if (!iter) 6193 return NULL; 6194 6195 /* Holds the entire event: data and meta data */ 6196 iter->event_size = buffer->subbuf_size; 6197 iter->event = kmalloc(iter->event_size, flags); 6198 if (!iter->event) { 6199 kfree(iter); 6200 return NULL; 6201 } 6202 6203 cpu_buffer = buffer->buffers[cpu]; 6204 6205 iter->cpu_buffer = cpu_buffer; 6206 6207 atomic_inc(&cpu_buffer->resize_disabled); 6208 6209 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6210 arch_spin_lock(&cpu_buffer->lock); 6211 rb_iter_reset(iter); 6212 arch_spin_unlock(&cpu_buffer->lock); 6213 6214 return iter; 6215 } 6216 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 6217 6218 /** 6219 * ring_buffer_read_finish - finish reading the iterator of the buffer 6220 * @iter: The iterator retrieved by ring_buffer_start 6221 * 6222 * This re-enables resizing of the buffer, and frees the iterator. 6223 */ 6224 void 6225 ring_buffer_read_finish(struct ring_buffer_iter *iter) 6226 { 6227 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6228 6229 /* Use this opportunity to check the integrity of the ring buffer. */ 6230 rb_check_pages(cpu_buffer); 6231 6232 atomic_dec(&cpu_buffer->resize_disabled); 6233 kfree(iter->event); 6234 kfree(iter); 6235 } 6236 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 6237 6238 /** 6239 * ring_buffer_iter_advance - advance the iterator to the next location 6240 * @iter: The ring buffer iterator 6241 * 6242 * Move the location of the iterator such that the next read will 6243 * be the next location of the iterator. 
6244 */ 6245 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 6246 { 6247 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6248 unsigned long flags; 6249 6250 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6251 6252 rb_advance_iter(iter); 6253 6254 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6255 } 6256 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 6257 6258 /** 6259 * ring_buffer_size - return the size of the ring buffer (in bytes) 6260 * @buffer: The ring buffer. 6261 * @cpu: The CPU to get ring buffer size from. 6262 */ 6263 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 6264 { 6265 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6266 return 0; 6267 6268 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages; 6269 } 6270 EXPORT_SYMBOL_GPL(ring_buffer_size); 6271 6272 /** 6273 * ring_buffer_max_event_size - return the max data size of an event 6274 * @buffer: The ring buffer. 6275 * 6276 * Returns the maximum size an event can be. 6277 */ 6278 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 6279 { 6280 /* If abs timestamp is requested, events have a timestamp too */ 6281 if (ring_buffer_time_stamp_abs(buffer)) 6282 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 6283 return buffer->max_data_size; 6284 } 6285 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 6286 6287 static void rb_clear_buffer_page(struct buffer_page *page) 6288 { 6289 local_set(&page->write, 0); 6290 local_set(&page->entries, 0); 6291 rb_init_page(page->page); 6292 page->read = 0; 6293 } 6294 6295 /* 6296 * When the buffer is memory mapped to user space, each sub buffer 6297 * has a unique id that is used by the meta data to tell the user 6298 * where the current reader page is. 6299 * 6300 * For a normal allocated ring buffer, the id is saved in the buffer page 6301 * id field, and updated via this function. 
 *
 * But for a fixed memory mapped buffer, the id is already assigned for
 * fixed memory ordering in the memory layout and can not be used. Instead
 * the index of where the page lies in the memory layout is used.
 *
 * For the normal pages, set the buffer page id with the passed in @id
 * value and return that.
 *
 * For fixed memory mapped pages, get the page index in the memory layout
 * and return that as the id.
 */
static int rb_page_id(struct ring_buffer_per_cpu *cpu_buffer,
		      struct buffer_page *bpage, int id)
{
	/*
	 * For boot buffers, the id is the index,
	 * otherwise, set the buffer page with this id
	 */
	if (cpu_buffer->ring_meta)
		id = rb_meta_subbuf_idx(cpu_buffer->ring_meta, bpage->page);
	else
		bpage->id = id;

	return id;
}

/*
 * Refresh the meta page of @cpu_buffer with the current reader page state
 * and the per CPU counters, then flush it so that the other side of the
 * mapping observes the update. Does nothing if no meta page is allocated.
 */
static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct trace_buffer_meta *meta = cpu_buffer->meta_page;

	if (!meta)
		return;

	/* Reader page: read offset, id and events lost before it */
	meta->reader.read = cpu_buffer->reader_page->read;
	meta->reader.id = rb_page_id(cpu_buffer, cpu_buffer->reader_page,
				     cpu_buffer->reader_page->id);

	meta->reader.lost_events = cpu_buffer->lost_events;

	/* Per CPU buffer statistics */
	meta->entries = local_read(&cpu_buffer->entries);
	meta->overrun = local_read(&cpu_buffer->overrun);
	meta->read = cpu_buffer->read;
	meta->pages_lost = local_read(&cpu_buffer->pages_lost);
	meta->pages_touched = local_read(&cpu_buffer->pages_touched);

	/* Some archs do not have data cache coherency between kernel and user-space */
	flush_kernel_vmap_range(cpu_buffer->meta_page, PAGE_SIZE);
}

/*
 * Reset @cpu_buffer to an empty state.
 *
 * A remote buffer is reset through its ->reset() callback (nothing is done
 * if the callback is missing), after which the remote meta page is re-read
 * and only the local read-side bookkeeping is cleared.
 *
 * For a local buffer every page is cleared, head/tail/commit all point at
 * the first page again and all counters and time stamps are zeroed. If the
 * buffer is mapped, the meta page is refreshed as well.
 */
static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *page;

	if (cpu_buffer->remote) {
		if (!cpu_buffer->remote->reset)
			return;

		cpu_buffer->remote->reset(cpu_buffer->cpu, cpu_buffer->remote->priv);
		rb_read_remote_meta_page(cpu_buffer);

		/* Read related values, not covered by the meta-page */
		local_set(&cpu_buffer->pages_read, 0);
		cpu_buffer->read = 0;
		cpu_buffer->read_bytes = 0;
		cpu_buffer->last_overrun = 0;
		cpu_buffer->reader_page->read = 0;

		return;
	}

	rb_head_page_deactivate(cpu_buffer);

	/*
	 * cpu_buffer->pages is used as the list head by the iteration
	 * below, so the page it points to is not visited by the loop and
	 * has to be cleared explicitly.
	 */
	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	rb_clear_buffer_page(cpu_buffer->head_page);
	list_for_each_entry(page, cpu_buffer->pages, list) {
		rb_clear_buffer_page(page);
	}

	cpu_buffer->tail_page = cpu_buffer->head_page;
	cpu_buffer->commit_page = cpu_buffer->head_page;

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	INIT_LIST_HEAD(&cpu_buffer->new_pages);
	rb_clear_buffer_page(cpu_buffer->reader_page);

	/* Zero all the statistics and in-flight commit tracking */
	local_set(&cpu_buffer->entries_bytes, 0);
	local_set(&cpu_buffer->overrun, 0);
	local_set(&cpu_buffer->commit_overrun, 0);
	local_set(&cpu_buffer->dropped_events, 0);
	local_set(&cpu_buffer->entries, 0);
	local_set(&cpu_buffer->committing, 0);
	local_set(&cpu_buffer->commits, 0);
	local_set(&cpu_buffer->pages_touched, 0);
	local_set(&cpu_buffer->pages_lost, 0);
	local_set(&cpu_buffer->pages_read, 0);
	cpu_buffer->last_pages_touch = 0;
	cpu_buffer->shortest_full = 0;
	cpu_buffer->read = 0;
	cpu_buffer->read_bytes = 0;

	rb_time_set(&cpu_buffer->write_stamp, 0);
	rb_time_set(&cpu_buffer->before_stamp, 0);

	memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp));

	cpu_buffer->lost_events = 0;
	cpu_buffer->last_overrun = 0;

	rb_head_page_activate(cpu_buffer);
	cpu_buffer->pages_removed = 0;

	if (cpu_buffer->mapped) {
		rb_update_meta_page(cpu_buffer);
		if (cpu_buffer->ring_meta) {
			/* Nothing is committed anymore: commit restarts at the head */
			struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
			meta->commit_buffer = meta->head_buffer;
		}
	}
}

/* Must have disabled the cpu buffer then done a synchronize_rcu */
static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	/* Held for the rest of the function, released on every return path */
	guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock);

	/* A commit still in flight is unexpected here: warn and leave the buffer alone */
	if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
		return;

	arch_spin_lock(&cpu_buffer->lock);

	rb_reset_cpu(cpu_buffer);

	arch_spin_unlock(&cpu_buffer->lock);
}

/**
 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
 * @buffer: The ring buffer to reset a per cpu buffer of
 * @cpu: The CPU buffer to be reset
 *
 * Does nothing if @cpu is not part of @buffer's cpumask. Recording and
 * resizing of the per CPU buffer are disabled for the duration of the reset.
 */
void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;

	/* prevent another thread from changing buffer sizes */
	mutex_lock(&buffer->mutex);

	atomic_inc(&cpu_buffer->resize_disabled);
	atomic_inc(&cpu_buffer->record_disabled);

	/* Make sure all commits have finished */
	synchronize_rcu();

	reset_disabled_cpu_buffer(cpu_buffer);

	atomic_dec(&cpu_buffer->record_disabled);
	atomic_dec(&cpu_buffer->resize_disabled);

	mutex_unlock(&buffer->mutex);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);

/* Flag to ensure proper resetting of atomic variables */
#define RESET_BIT	(1 << 30)

/**
 * ring_buffer_reset_online_cpus - reset the per CPU buffers of all online CPUs
 * @buffer: The ring buffer to reset the online per cpu buffers of
 *
 * Each per CPU buffer selected by the first loop is tagged by adding
 * RESET_BIT to its resize_disabled counter; the second loop only resets
 * (and untags) the buffers carrying that tag.
 */
void ring_buffer_reset_online_cpus(struct trace_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu;

	/* prevent another thread from changing buffer sizes */
	mutex_lock(&buffer->mutex);

	for_each_online_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];

		atomic_add(RESET_BIT, &cpu_buffer->resize_disabled);
		atomic_inc(&cpu_buffer->record_disabled);
	}

	/* Make sure all commits have finished */
	synchronize_rcu();

	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];

		/*
		 * If a CPU came online during the synchronize_rcu(), then
		 * ignore it.
		 */
		if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT))
			continue;

		reset_disabled_cpu_buffer(cpu_buffer);

		atomic_dec(&cpu_buffer->record_disabled);
		atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled);
	}

	mutex_unlock(&buffer->mutex);
}

/**
 * ring_buffer_reset - reset a ring buffer
 * @buffer: The ring buffer to reset all cpu buffers
 *
 * Recording and resizing are disabled on every per CPU buffer before any
 * of them is reset, and re-enabled per buffer right after its reset.
 */
void ring_buffer_reset(struct trace_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu;

	/* prevent another thread from changing buffer sizes */
	mutex_lock(&buffer->mutex);

	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];

		atomic_inc(&cpu_buffer->resize_disabled);
		atomic_inc(&cpu_buffer->record_disabled);
	}

	/* Make sure all commits have finished */
	synchronize_rcu();

	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];

		reset_disabled_cpu_buffer(cpu_buffer);

		atomic_dec(&cpu_buffer->record_disabled);
		atomic_dec(&cpu_buffer->resize_disabled);
	}

	mutex_unlock(&buffer->mutex);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset);

/**
 * ring_buffer_empty - is the ring buffer empty?
6550 * @buffer: The ring buffer to test 6551 */ 6552 bool ring_buffer_empty(struct trace_buffer *buffer) 6553 { 6554 struct ring_buffer_per_cpu *cpu_buffer; 6555 unsigned long flags; 6556 bool dolock; 6557 bool ret; 6558 int cpu; 6559 6560 /* yes this is racy, but if you don't like the race, lock the buffer */ 6561 for_each_buffer_cpu(buffer, cpu) { 6562 cpu_buffer = buffer->buffers[cpu]; 6563 local_irq_save(flags); 6564 dolock = rb_reader_lock(cpu_buffer); 6565 ret = rb_per_cpu_empty(cpu_buffer); 6566 rb_reader_unlock(cpu_buffer, dolock); 6567 local_irq_restore(flags); 6568 6569 if (!ret) 6570 return false; 6571 } 6572 6573 return true; 6574 } 6575 EXPORT_SYMBOL_GPL(ring_buffer_empty); 6576 6577 /** 6578 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 6579 * @buffer: The ring buffer 6580 * @cpu: The CPU buffer to test 6581 */ 6582 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 6583 { 6584 struct ring_buffer_per_cpu *cpu_buffer; 6585 unsigned long flags; 6586 bool dolock; 6587 bool ret; 6588 6589 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6590 return true; 6591 6592 cpu_buffer = buffer->buffers[cpu]; 6593 local_irq_save(flags); 6594 dolock = rb_reader_lock(cpu_buffer); 6595 ret = rb_per_cpu_empty(cpu_buffer); 6596 rb_reader_unlock(cpu_buffer, dolock); 6597 local_irq_restore(flags); 6598 6599 return ret; 6600 } 6601 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 6602 6603 int ring_buffer_poll_remote(struct trace_buffer *buffer, int cpu) 6604 { 6605 struct ring_buffer_per_cpu *cpu_buffer; 6606 6607 if (cpu != RING_BUFFER_ALL_CPUS) { 6608 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6609 return -EINVAL; 6610 6611 cpu_buffer = buffer->buffers[cpu]; 6612 6613 guard(raw_spinlock)(&cpu_buffer->reader_lock); 6614 if (rb_read_remote_meta_page(cpu_buffer)) 6615 rb_wakeups(buffer, cpu_buffer); 6616 6617 return 0; 6618 } 6619 6620 guard(cpus_read_lock)(); 6621 6622 /* 6623 * Make sure all the ring buffers are up to date before we start reading 
6624 * them. 6625 */ 6626 for_each_buffer_cpu(buffer, cpu) { 6627 cpu_buffer = buffer->buffers[cpu]; 6628 6629 guard(raw_spinlock)(&cpu_buffer->reader_lock); 6630 rb_read_remote_meta_page(cpu_buffer); 6631 } 6632 6633 for_each_buffer_cpu(buffer, cpu) { 6634 cpu_buffer = buffer->buffers[cpu]; 6635 6636 if (rb_num_of_entries(cpu_buffer)) 6637 rb_wakeups(buffer, cpu_buffer); 6638 } 6639 6640 return 0; 6641 } 6642 6643 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 6644 /** 6645 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 6646 * @buffer_a: One buffer to swap with 6647 * @buffer_b: The other buffer to swap with 6648 * @cpu: the CPU of the buffers to swap 6649 * 6650 * This function is useful for tracers that want to take a "snapshot" 6651 * of a CPU buffer and has another back up buffer lying around. 6652 * it is expected that the tracer handles the cpu buffer not being 6653 * used at the moment. 6654 */ 6655 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 6656 struct trace_buffer *buffer_b, int cpu) 6657 { 6658 struct ring_buffer_per_cpu *cpu_buffer_a; 6659 struct ring_buffer_per_cpu *cpu_buffer_b; 6660 int ret = -EINVAL; 6661 6662 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 6663 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 6664 return -EINVAL; 6665 6666 cpu_buffer_a = buffer_a->buffers[cpu]; 6667 cpu_buffer_b = buffer_b->buffers[cpu]; 6668 6669 /* It's up to the callers to not try to swap mapped buffers */ 6670 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) 6671 return -EBUSY; 6672 6673 /* At least make sure the two buffers are somewhat the same */ 6674 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 6675 return -EINVAL; 6676 6677 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 6678 return -EINVAL; 6679 6680 if (atomic_read(&buffer_a->record_disabled)) 6681 return -EAGAIN; 6682 6683 if (atomic_read(&buffer_b->record_disabled)) 6684 return -EAGAIN; 6685 6686 if (atomic_read(&cpu_buffer_a->record_disabled)) 6687 
return -EAGAIN; 6688 6689 if (atomic_read(&cpu_buffer_b->record_disabled)) 6690 return -EAGAIN; 6691 6692 /* 6693 * We can't do a synchronize_rcu here because this 6694 * function can be called in atomic context. 6695 * Normally this will be called from the same CPU as cpu. 6696 * If not it's up to the caller to protect this. 6697 */ 6698 atomic_inc(&cpu_buffer_a->record_disabled); 6699 atomic_inc(&cpu_buffer_b->record_disabled); 6700 6701 ret = -EBUSY; 6702 if (local_read(&cpu_buffer_a->committing)) 6703 goto out_dec; 6704 if (local_read(&cpu_buffer_b->committing)) 6705 goto out_dec; 6706 6707 /* 6708 * When resize is in progress, we cannot swap it because 6709 * it will mess the state of the cpu buffer. 6710 */ 6711 if (atomic_read(&buffer_a->resizing)) 6712 goto out_dec; 6713 if (atomic_read(&buffer_b->resizing)) 6714 goto out_dec; 6715 6716 buffer_a->buffers[cpu] = cpu_buffer_b; 6717 buffer_b->buffers[cpu] = cpu_buffer_a; 6718 6719 cpu_buffer_b->buffer = buffer_a; 6720 cpu_buffer_a->buffer = buffer_b; 6721 6722 ret = 0; 6723 6724 out_dec: 6725 atomic_dec(&cpu_buffer_a->record_disabled); 6726 atomic_dec(&cpu_buffer_b->record_disabled); 6727 return ret; 6728 } 6729 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 6730 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 6731 6732 /** 6733 * ring_buffer_alloc_read_page - allocate a page to read from buffer 6734 * @buffer: the buffer to allocate for. 6735 * @cpu: the cpu buffer to allocate. 6736 * 6737 * This function is used in conjunction with ring_buffer_read_page. 6738 * When reading a full page from the ring buffer, these functions 6739 * can be used to speed up the process. The calling function should 6740 * allocate a few pages first with this function. Then when it 6741 * needs to get pages from the ring buffer, it passes the result 6742 * of this function into ring_buffer_read_page, which will swap 6743 * the page that was allocated, with the read page of the buffer. 
6744 * 6745 * Returns: 6746 * The page allocated, or ERR_PTR 6747 */ 6748 struct buffer_data_read_page * 6749 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 6750 { 6751 struct ring_buffer_per_cpu *cpu_buffer; 6752 struct buffer_data_read_page *bpage = NULL; 6753 unsigned long flags; 6754 6755 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6756 return ERR_PTR(-ENODEV); 6757 6758 bpage = kzalloc_obj(*bpage); 6759 if (!bpage) 6760 return ERR_PTR(-ENOMEM); 6761 6762 bpage->order = buffer->subbuf_order; 6763 cpu_buffer = buffer->buffers[cpu]; 6764 local_irq_save(flags); 6765 arch_spin_lock(&cpu_buffer->lock); 6766 6767 if (cpu_buffer->free_page) { 6768 bpage->data = cpu_buffer->free_page; 6769 cpu_buffer->free_page = NULL; 6770 } 6771 6772 arch_spin_unlock(&cpu_buffer->lock); 6773 local_irq_restore(flags); 6774 6775 if (bpage->data) { 6776 rb_init_page(bpage->data); 6777 } else { 6778 bpage->data = alloc_cpu_data(cpu, cpu_buffer->buffer->subbuf_order); 6779 if (!bpage->data) { 6780 kfree(bpage); 6781 return ERR_PTR(-ENOMEM); 6782 } 6783 } 6784 6785 return bpage; 6786 } 6787 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 6788 6789 /** 6790 * ring_buffer_free_read_page - free an allocated read page 6791 * @buffer: the buffer the page was allocate for 6792 * @cpu: the cpu buffer the page came from 6793 * @data_page: the page to free 6794 * 6795 * Free a page allocated from ring_buffer_alloc_read_page. 
6796 */ 6797 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 6798 struct buffer_data_read_page *data_page) 6799 { 6800 struct ring_buffer_per_cpu *cpu_buffer; 6801 struct buffer_data_page *bpage = data_page->data; 6802 struct page *page = virt_to_page(bpage); 6803 unsigned long flags; 6804 6805 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 6806 return; 6807 6808 cpu_buffer = buffer->buffers[cpu]; 6809 6810 /* 6811 * If the page is still in use someplace else, or order of the page 6812 * is different from the subbuffer order of the buffer - 6813 * we can't reuse it 6814 */ 6815 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 6816 goto out; 6817 6818 local_irq_save(flags); 6819 arch_spin_lock(&cpu_buffer->lock); 6820 6821 if (!cpu_buffer->free_page) { 6822 cpu_buffer->free_page = bpage; 6823 bpage = NULL; 6824 } 6825 6826 arch_spin_unlock(&cpu_buffer->lock); 6827 local_irq_restore(flags); 6828 6829 out: 6830 free_pages((unsigned long)bpage, data_page->order); 6831 kfree(data_page); 6832 } 6833 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 6834 6835 /** 6836 * ring_buffer_read_page - extract a page from the ring buffer 6837 * @buffer: buffer to extract from 6838 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 6839 * @len: amount to extract 6840 * @cpu: the cpu of the buffer to extract 6841 * @full: should the extraction only happen when the page is full. 6842 * 6843 * This function will pull out a page from the ring buffer and consume it. 6844 * @data_page must be the address of the variable that was returned 6845 * from ring_buffer_alloc_read_page. This is because the page might be used 6846 * to swap with a page in the ring buffer. 
 *
 * for example:
 *	rpage = ring_buffer_alloc_read_page(buffer, cpu);
 *	if (IS_ERR(rpage))
 *		return PTR_ERR(rpage);
 *	ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0);
 *	if (ret >= 0)
 *		process_page(ring_buffer_read_page_data(rpage), ret);
 *	ring_buffer_free_read_page(buffer, cpu, rpage);
 *
 * When @full is set, the function will not return any data unless
 * the writer is off the reader page.
 *
 * Note: it is up to the calling functions to handle sleeps and wakeups.
 *  The ring buffer can be used anywhere in the kernel and can not
 *  blindly call wake_up. The layer that uses the ring buffer must be
 *  responsible for that.
 *
 * Returns:
 *  >=0 if data has been transferred, returns the offset of consumed data.
 *  <0 if no data has been transferred.
 */
int ring_buffer_read_page(struct trace_buffer *buffer,
			  struct buffer_data_read_page *data_page,
			  size_t len, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	struct ring_buffer_event *event;
	struct buffer_data_page *bpage;
	struct buffer_page *reader;
	unsigned long missed_events;
	unsigned int commit;
	unsigned int read;
	u64 save_timestamp;
	bool force_memcpy;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return -1;

	/*
	 * If len is not big enough to hold the page header, then
	 * we can not copy anything.
	 */
	if (len <= BUF_PAGE_HDR_SIZE)
		return -1;

	/* From here on, len only counts the room available for event data */
	len -= BUF_PAGE_HDR_SIZE;

	if (!data_page || !data_page->data)
		return -1;

	/* The caller's page must have the same order as the sub-buffers */
	if (data_page->order != buffer->subbuf_order)
		return -1;

	bpage = data_page->data;
	if (!bpage)
		return -1;

	/* Held until the function returns, whatever the return path */
	guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock);

	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		return -1;

	event = rb_reader_event(cpu_buffer);

	read = reader->read;
	commit = rb_page_size(reader);

	/* Check if any events were dropped */
	missed_events = cpu_buffer->lost_events;

	/* Mapped and remote buffers are always copied, never swapped */
	force_memcpy = cpu_buffer->mapped || cpu_buffer->remote;

	/*
	 * If this page has been partially read or
	 * if len is not big enough to read the rest of the page or
	 * a writer is still on the page, then
	 * we must copy the data from the page to the buffer.
	 * Otherwise, we can simply swap the page with the one passed in.
	 */
	if (read || (len < (commit - read)) ||
	    cpu_buffer->reader_page == cpu_buffer->commit_page ||
	    force_memcpy) {
		struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
		unsigned int rpos = read;
		unsigned int pos = 0;
		unsigned int size;

		/*
		 * If a full page is expected, this can still be returned
		 * if there's been a previous partial read and the
		 * rest of the page can be read and the commit page is off
		 * the reader page.
		 */
		if (full &&
		    (!read || (len < (commit - read)) ||
		     cpu_buffer->reader_page == cpu_buffer->commit_page))
			return -1;

		/* Never copy more than what is left unread on the page */
		if (len > (commit - read))
			len = (commit - read);

		/* Always keep the time extend and data together */
		size = rb_event_ts_length(event);

		if (len < size)
			return -1;

		/* save the current timestamp, since the user will need it */
		save_timestamp = cpu_buffer->read_stamp;

		/* Need to copy one event at a time */
		do {
			/* We need the size of one event, because
			 * rb_advance_reader only advances by one event,
			 * whereas rb_event_ts_length may include the size of
			 * one or two events.
			 * We have already ensured there's enough space if this
			 * is a time extend. */
			size = rb_event_length(event);
			memcpy(bpage->data + pos, rpage->data + rpos, size);

			len -= size;

			rb_advance_reader(cpu_buffer);
			rpos = reader->read;
			pos += size;

			/* Everything committed on the reader page was consumed */
			if (rpos >= commit)
				break;

			event = rb_reader_event(cpu_buffer);
			/* Always keep the time extend and data together */
			size = rb_event_ts_length(event);
		} while (len >= size);

		/* update bpage */
		local_set(&bpage->commit, pos);
		bpage->time_stamp = save_timestamp;

		/* we copied everything to the beginning */
		read = 0;
	} else {
		/* update the entry counter */
		cpu_buffer->read += rb_page_entries(reader);
		cpu_buffer->read_bytes += rb_page_size(reader);

		/* swap the pages */
		rb_init_page(bpage);
		bpage = reader->page;
		reader->page = data_page->data;
		local_set(&reader->write, 0);
		local_set(&reader->entries, 0);
		reader->read = 0;
		data_page->data = bpage;

		/*
		 * Use the real_end for the data size,
		 * This gives us a chance to store the lost events
		 * on the page.
		 */
		if (reader->real_end)
			local_set(&bpage->commit, reader->real_end);
	}

	cpu_buffer->lost_events = 0;

	commit = local_read(&bpage->commit);
	/*
	 * Set a flag in the commit field if we lost events
	 */
	if (missed_events) {
		/* If there is room at the end of the page to save the
		 * missed events, then record it there.
		 */
		if (buffer->subbuf_size - commit >= sizeof(missed_events)) {
			memcpy(&bpage->data[commit], &missed_events,
			       sizeof(missed_events));
			local_add(RB_MISSED_STORED, &bpage->commit);
			commit += sizeof(missed_events);
		}
		local_add(RB_MISSED_EVENTS, &bpage->commit);
	}

	/*
	 * This page may be off to user land. Zero it out here.
	 */
	if (commit < buffer->subbuf_size)
		memset(&bpage->data[commit], 0, buffer->subbuf_size - commit);

	return read;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page);

/**
 * ring_buffer_read_page_data - get pointer to the data in the page.
 * @page:  the page to get the data from
 *
 * Returns pointer to the actual data in this page.
 */
void *ring_buffer_read_page_data(struct buffer_data_read_page *page)
{
	return page->data;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page_data);

/**
 * ring_buffer_subbuf_size_get - get size of the sub buffer.
 * @buffer: the buffer to get the sub buffer size from
 *
 * Returns size of the sub buffer, in bytes. The returned size includes
 * the sub buffer header (BUF_PAGE_HDR_SIZE).
 */
int ring_buffer_subbuf_size_get(struct trace_buffer *buffer)
{
	return buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
}
EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get);

/**
 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page.
 * @buffer: The ring_buffer to get the system sub page order from
 *
 * By default, one ring buffer sub page equals to one system page.
This parameter 7071 * is configurable, per ring buffer. The size of the ring buffer sub page can be 7072 * extended, but must be an order of system page size. 7073 * 7074 * Returns the order of buffer sub page size, in system pages: 7075 * 0 means the sub buffer size is 1 system page and so forth. 7076 * In case of an error < 0 is returned. 7077 */ 7078 int ring_buffer_subbuf_order_get(struct trace_buffer *buffer) 7079 { 7080 if (!buffer) 7081 return -EINVAL; 7082 7083 return buffer->subbuf_order; 7084 } 7085 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get); 7086 7087 /** 7088 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page. 7089 * @buffer: The ring_buffer to set the new page size. 7090 * @order: Order of the system pages in one sub buffer page 7091 * 7092 * By default, one ring buffer pages equals to one system page. This API can be 7093 * used to set new size of the ring buffer page. The size must be order of 7094 * system page size, that's why the input parameter @order is the order of 7095 * system pages that are allocated for one ring buffer page: 7096 * 0 - 1 system page 7097 * 1 - 2 system pages 7098 * 3 - 4 system pages 7099 * ... 7100 * 7101 * Returns 0 on success or < 0 in case of an error. 
7102 */ 7103 int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) 7104 { 7105 struct ring_buffer_per_cpu *cpu_buffer; 7106 struct buffer_page *bpage, *tmp; 7107 int old_order, old_size; 7108 int nr_pages; 7109 int psize; 7110 int err; 7111 int cpu; 7112 7113 if (!buffer || order < 0) 7114 return -EINVAL; 7115 7116 if (buffer->subbuf_order == order) 7117 return 0; 7118 7119 psize = (1 << order) * PAGE_SIZE; 7120 if (psize <= BUF_PAGE_HDR_SIZE) 7121 return -EINVAL; 7122 7123 /* Size of a subbuf cannot be greater than the write counter */ 7124 if (psize > RB_WRITE_MASK + 1) 7125 return -EINVAL; 7126 7127 old_order = buffer->subbuf_order; 7128 old_size = buffer->subbuf_size; 7129 7130 /* prevent another thread from changing buffer sizes */ 7131 guard(mutex)(&buffer->mutex); 7132 atomic_inc(&buffer->record_disabled); 7133 7134 /* Make sure all commits have finished */ 7135 synchronize_rcu(); 7136 7137 buffer->subbuf_order = order; 7138 buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE; 7139 7140 /* Make sure all new buffers are allocated, before deleting the old ones */ 7141 for_each_buffer_cpu(buffer, cpu) { 7142 7143 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7144 continue; 7145 7146 cpu_buffer = buffer->buffers[cpu]; 7147 7148 if (cpu_buffer->mapped) { 7149 err = -EBUSY; 7150 goto error; 7151 } 7152 7153 /* Update the number of pages to match the new size */ 7154 nr_pages = old_size * buffer->buffers[cpu]->nr_pages; 7155 nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size); 7156 7157 /* we need a minimum of two pages */ 7158 if (nr_pages < 2) 7159 nr_pages = 2; 7160 7161 cpu_buffer->nr_pages_to_update = nr_pages; 7162 7163 /* Include the reader page */ 7164 nr_pages++; 7165 7166 /* Allocate the new size buffer */ 7167 INIT_LIST_HEAD(&cpu_buffer->new_pages); 7168 if (__rb_allocate_pages(cpu_buffer, nr_pages, 7169 &cpu_buffer->new_pages)) { 7170 /* not enough memory for new pages */ 7171 err = -ENOMEM; 7172 goto error; 7173 } 7174 } 7175 7176 
for_each_buffer_cpu(buffer, cpu) { 7177 struct buffer_data_page *old_free_data_page; 7178 struct list_head old_pages; 7179 unsigned long flags; 7180 7181 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7182 continue; 7183 7184 cpu_buffer = buffer->buffers[cpu]; 7185 7186 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7187 7188 /* Clear the head bit to make the link list normal to read */ 7189 rb_head_page_deactivate(cpu_buffer); 7190 7191 /* 7192 * Collect buffers from the cpu_buffer pages list and the 7193 * reader_page on old_pages, so they can be freed later when not 7194 * under a spinlock. The pages list is a linked list with no 7195 * head, adding old_pages turns it into a regular list with 7196 * old_pages being the head. 7197 */ 7198 list_add(&old_pages, cpu_buffer->pages); 7199 list_add(&cpu_buffer->reader_page->list, &old_pages); 7200 7201 /* One page was allocated for the reader page */ 7202 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next, 7203 struct buffer_page, list); 7204 list_del_init(&cpu_buffer->reader_page->list); 7205 7206 /* Install the new pages, remove the head from the list */ 7207 cpu_buffer->pages = cpu_buffer->new_pages.next; 7208 list_del_init(&cpu_buffer->new_pages); 7209 cpu_buffer->cnt++; 7210 7211 cpu_buffer->head_page 7212 = list_entry(cpu_buffer->pages, struct buffer_page, list); 7213 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 7214 7215 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update; 7216 cpu_buffer->nr_pages_to_update = 0; 7217 7218 old_free_data_page = cpu_buffer->free_page; 7219 cpu_buffer->free_page = NULL; 7220 7221 rb_head_page_activate(cpu_buffer); 7222 7223 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7224 7225 /* Free old sub buffers */ 7226 list_for_each_entry_safe(bpage, tmp, &old_pages, list) { 7227 list_del_init(&bpage->list); 7228 free_buffer_page(bpage); 7229 } 7230 free_pages((unsigned long)old_free_data_page, old_order); 7231 7232 
rb_check_pages(cpu_buffer); 7233 } 7234 7235 atomic_dec(&buffer->record_disabled); 7236 7237 return 0; 7238 7239 error: 7240 buffer->subbuf_order = old_order; 7241 buffer->subbuf_size = old_size; 7242 7243 atomic_dec(&buffer->record_disabled); 7244 7245 for_each_buffer_cpu(buffer, cpu) { 7246 cpu_buffer = buffer->buffers[cpu]; 7247 7248 if (!cpu_buffer->nr_pages_to_update) 7249 continue; 7250 7251 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) { 7252 list_del_init(&bpage->list); 7253 free_buffer_page(bpage); 7254 } 7255 } 7256 7257 return err; 7258 } 7259 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); 7260 7261 static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 7262 { 7263 struct page *page; 7264 7265 if (cpu_buffer->meta_page) 7266 return 0; 7267 7268 page = alloc_page(GFP_USER | __GFP_ZERO); 7269 if (!page) 7270 return -ENOMEM; 7271 7272 cpu_buffer->meta_page = page_to_virt(page); 7273 7274 return 0; 7275 } 7276 7277 static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 7278 { 7279 unsigned long addr = (unsigned long)cpu_buffer->meta_page; 7280 7281 free_page(addr); 7282 cpu_buffer->meta_page = NULL; 7283 } 7284 7285 static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, 7286 struct buffer_page **subbuf_ids) 7287 { 7288 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 7289 unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; 7290 struct buffer_page *first_subbuf, *subbuf; 7291 int cnt = 0; 7292 int id = 0; 7293 7294 id = rb_page_id(cpu_buffer, cpu_buffer->reader_page, id); 7295 subbuf_ids[id++] = cpu_buffer->reader_page; 7296 cnt++; 7297 7298 first_subbuf = subbuf = rb_set_head_page(cpu_buffer); 7299 do { 7300 id = rb_page_id(cpu_buffer, subbuf, id); 7301 7302 if (WARN_ON(id >= nr_subbufs)) 7303 break; 7304 7305 subbuf_ids[id] = subbuf; 7306 7307 rb_inc_page(&subbuf); 7308 id++; 7309 cnt++; 7310 } while (subbuf != first_subbuf); 7311 7312 WARN_ON(cnt != nr_subbufs); 7313 7314 /* 
install subbuf ID to bpage translation */ 7315 cpu_buffer->subbuf_ids = subbuf_ids; 7316 7317 meta->meta_struct_len = sizeof(*meta); 7318 meta->nr_subbufs = nr_subbufs; 7319 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 7320 meta->meta_page_size = meta->subbuf_size; 7321 7322 rb_update_meta_page(cpu_buffer); 7323 } 7324 7325 static struct ring_buffer_per_cpu * 7326 rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) 7327 { 7328 struct ring_buffer_per_cpu *cpu_buffer; 7329 7330 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7331 return ERR_PTR(-EINVAL); 7332 7333 cpu_buffer = buffer->buffers[cpu]; 7334 7335 mutex_lock(&cpu_buffer->mapping_lock); 7336 7337 if (!cpu_buffer->user_mapped) { 7338 mutex_unlock(&cpu_buffer->mapping_lock); 7339 return ERR_PTR(-ENODEV); 7340 } 7341 7342 return cpu_buffer; 7343 } 7344 7345 static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) 7346 { 7347 mutex_unlock(&cpu_buffer->mapping_lock); 7348 } 7349 7350 /* 7351 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need 7352 * to be set-up or torn-down. 
 */
static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer,
			       bool inc)
{
	unsigned long flags;

	lockdep_assert_held(&cpu_buffer->mapping_lock);

	/* mapped is always greater or equal to user_mapped */
	if (WARN_ON(cpu_buffer->mapped < cpu_buffer->user_mapped))
		return -EINVAL;

	/* One more mapping would wrap the counter */
	if (inc && cpu_buffer->mapped == UINT_MAX)
		return -EBUSY;

	/* Unmapping something that is not user mapped is a caller bug */
	if (WARN_ON(!inc && cpu_buffer->user_mapped == 0))
		return -EINVAL;

	/* Both counters are updated under the buffer mutex and the reader lock */
	mutex_lock(&cpu_buffer->buffer->mutex);
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	if (inc) {
		cpu_buffer->user_mapped++;
		cpu_buffer->mapped++;
	} else {
		cpu_buffer->user_mapped--;
		cpu_buffer->mapped--;
	}

	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	mutex_unlock(&cpu_buffer->buffer->mutex);

	return 0;
}

/*
 *   +--------------+  pgoff == 0
 *   |   meta page  |
 *   +--------------+  pgoff == 1
 *   | subbuffer 0  |
 *   |              |
 *   +--------------+  pgoff == (1 + (1 << subbuf_order))
 *   | subbuffer 1  |
 *   |              |
 *         ...
 *
 * NOTE(review): the code below pads the meta-page with zero pages up to a
 * full sub-buffer (subbuf_pages pages) and skips subbuf_pages pages for it,
 * so for subbuf_order > 0 sub-buffer 0 appears to start at
 * pgoff == (1 << subbuf_order) rather than 1 - confirm the diagram.
 */
#ifdef CONFIG_MMU
/*
 * Insert the pages backing @cpu_buffer into @vma, starting at the page
 * offset requested by the VMA: the meta-page (and its zero-page padding)
 * when the offset is 0, then the pages of the sub-buffers in sub-buffer ID
 * order. Must be called with the mapping_lock held.
 *
 * Returns 0 on success, -EPERM for a mapping that is writable, executable
 * or not shared, -EINVAL for an offset or length that does not fit the
 * buffer, -ENOMEM if the temporary page array can not be allocated, or the
 * error returned by vm_insert_pages().
 */
static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer,
			struct vm_area_struct *vma)
{
	unsigned long nr_subbufs, nr_pages, nr_vma_pages, pgoff = vma->vm_pgoff;
	unsigned int subbuf_pages, subbuf_order;
	struct page **pages __free(kfree) = NULL;
	int p = 0, s = 0;
	int err;

	/* Refuse MAP_PRIVATE (non-shared), writable or executable mappings */
	if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC ||
	    !(vma->vm_flags & VM_MAYSHARE))
		return -EPERM;

	subbuf_order = cpu_buffer->buffer->subbuf_order;
	subbuf_pages = 1 << subbuf_order;

	/* The mapping must start on a sub-buffer boundary */
	if (subbuf_order && pgoff % subbuf_pages)
		return -EINVAL;

	/*
	 * Make sure the mapping cannot become writable later. Also tell the VM
	 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND).
	 */
	vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP,
		     VM_MAYWRITE);

	lockdep_assert_held(&cpu_buffer->mapping_lock);

	nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */
	nr_pages = ((nr_subbufs + 1) << subbuf_order); /* + meta-page */
	if (nr_pages <= pgoff)
		return -EINVAL;

	/* Number of pages that remain mappable from the requested offset */
	nr_pages -= pgoff;

	nr_vma_pages = vma_pages(vma);
	if (!nr_vma_pages || nr_vma_pages > nr_pages)
		return -EINVAL;

	nr_pages = nr_vma_pages;

	/* Temporary array, released automatically on return (__free(kfree)) */
	pages = kzalloc_objs(*pages, nr_pages);
	if (!pages)
		return -ENOMEM;

	if (!pgoff) {
		unsigned long meta_page_padding;

		pages[p++] = virt_to_page(cpu_buffer->meta_page);

		/*
		 * Pad with the zero-page to align the meta-page with the
		 * sub-buffers.
		 */
		meta_page_padding = subbuf_pages - 1;
		while (meta_page_padding-- && p < nr_pages) {
			unsigned long __maybe_unused zero_addr =
				vma->vm_start + (PAGE_SIZE * p);

			pages[p++] = ZERO_PAGE(zero_addr);
		}
	} else {
		/* Skip the meta-page */
		pgoff -= subbuf_pages;

		/* Index of the first sub-buffer covered by the mapping */
		s += pgoff / subbuf_pages;
	}

	while (p < nr_pages) {
		struct buffer_page *subbuf;
		struct page *page;
		int off = 0;

		if (WARN_ON_ONCE(s >= nr_subbufs))
			return -EINVAL;

		subbuf = cpu_buffer->subbuf_ids[s];
		page = virt_to_page((void *)subbuf->page);

		/* Add each system page of this sub-buffer, as long as the VMA has room */
		for (; off < (1 << (subbuf_order)); off++, page++) {
			if (p >= nr_pages)
				break;

			pages[p++] = page;
		}
		s++;
	}

	err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages);

	return err;
}
#else
/* Without an MMU the ring buffer can not be mapped */
static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer,
			struct vm_area_struct *vma)
{
	return -EOPNOTSUPP;
}
#endif

/*
 * Map the per CPU buffer of @cpu into @vma.
 *
 * If the buffer is already user mapped, only the pages are inserted into
 * @vma and the mapping counters are incremented. Otherwise this is the
 * first user mapping: the meta page and the sub-buffer ID table are set up
 * and resizing of the per CPU buffer is disabled; all of that is undone if
 * inserting the pages fails.
 *
 * Returns 0 on success, -EINVAL if @cpu is not part of @buffer's cpumask or
 * if @buffer is a remote buffer, -ENOMEM on allocation failure, or the
 * error returned by __rb_map_vma() / __rb_inc_dec_mapped().
 */
int ring_buffer_map(struct trace_buffer *buffer, int cpu,
		    struct vm_area_struct *vma)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page **subbuf_ids;
	unsigned long flags;
	int err;

	if (!cpumask_test_cpu(cpu, buffer->cpumask) || buffer->remote)
		return -EINVAL;

	cpu_buffer = buffer->buffers[cpu];

	/* Held until the function returns */
	guard(mutex)(&cpu_buffer->mapping_lock);

	if (cpu_buffer->user_mapped) {
		err = __rb_map_vma(cpu_buffer, vma);
		if (!err)
			err = __rb_inc_dec_mapped(cpu_buffer, true);
		return err;
	}

	/* prevent another thread from changing buffer/sub-buffer sizes */
	guard(mutex)(&buffer->mutex);

	err = rb_alloc_meta_page(cpu_buffer);
	if (err)
		return err;

	/* subbuf_ids includes the reader while nr_pages does not */
	subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL);
	if (!subbuf_ids) {
		rb_free_meta_page(cpu_buffer);
		return -ENOMEM;
	}

	atomic_inc(&cpu_buffer->resize_disabled);

	/*
	 * Lock all readers to block any subbuf swap until the subbuf IDs are
	 * assigned.
	 */
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	rb_setup_ids_meta_page(cpu_buffer, subbuf_ids);

	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	err = __rb_map_vma(cpu_buffer, vma);
	if (!err) {
		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
		/* This is the first time it is mapped by user */
		cpu_buffer->mapped++;
		cpu_buffer->user_mapped = 1;
		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	} else {
		/* Undo the set-up done above */
		kfree(cpu_buffer->subbuf_ids);
		cpu_buffer->subbuf_ids = NULL;
		rb_free_meta_page(cpu_buffer);
		atomic_dec(&cpu_buffer->resize_disabled);
	}

	return err;
}

/*
 * This is called when a VMA is duplicated (e.g., on fork()) to increment
 * the user_mapped counter without remapping pages.
 */
void ring_buffer_map_dup(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (WARN_ON(!cpumask_test_cpu(cpu, buffer->cpumask)))
		return;

	cpu_buffer = buffer->buffers[cpu];

	/* Held until the function returns. */
	guard(mutex)(&cpu_buffer->mapping_lock);

	/*
	 * A duplicated VMA implies an existing user mapping.
	 * NOTE(review): the return value of __rb_inc_dec_mapped() is
	 * discarded here.
	 */
	if (cpu_buffer->user_mapped)
		__rb_inc_dec_mapped(cpu_buffer, true);
	else
		WARN(1, "Unexpected buffer stat, it should be mapped");
}

/*
 * Drop one user-space mapping of the ring buffer of @cpu in @buffer.
 *
 * While other user mappings remain, only the counters are decremented. When
 * the last one goes away, the sub-buffer ID table and the meta page are freed
 * and resizing is enabled again.
 *
 * Returns 0 on success, -EINVAL if @cpu is not part of the buffer's cpumask,
 * or -ENODEV if the per-CPU buffer is not mapped by user space.
 */
int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return -EINVAL;

	cpu_buffer = buffer->buffers[cpu];

	/* Held until the function returns. */
	guard(mutex)(&cpu_buffer->mapping_lock);

	if (!cpu_buffer->user_mapped) {
		return -ENODEV;
	} else if (cpu_buffer->user_mapped > 1) {
		/*
		 * Not the last mapping: only drop a reference.
		 * NOTE(review): the helper's return value is discarded and 0
		 * is returned regardless.
		 */
		__rb_inc_dec_mapped(cpu_buffer, false);
		return 0;
	}

	/* Taken after the early returns; held until the function returns. */
	guard(mutex)(&buffer->mutex);
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	/* This is the last user space mapping */
	if (!WARN_ON_ONCE(cpu_buffer->mapped < cpu_buffer->user_mapped))
		cpu_buffer->mapped--;
	cpu_buffer->user_mapped = 0;

	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	/* Tear down what ring_buffer_map() set up for the first mapping. */
	kfree(cpu_buffer->subbuf_ids);
	cpu_buffer->subbuf_ids = NULL;
	rb_free_meta_page(cpu_buffer);
	atomic_dec(&cpu_buffer->resize_disabled);

	return 0;
}

/*
 * Advance the reader of the mapped ring buffer of @cpu in @buffer: consume
 * what is left on the current reader page or move on to the next page with
 * data, then refresh the meta page.
 *
 * Returns 0, or the error encoded in the pointer returned by
 * rb_get_mapped_buffer().
 */
int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *reader;
	unsigned long missed_events;
	unsigned long reader_size;
	unsigned long flags;

	/*
	 * Paired with rb_put_mapped_buffer() at the end of the function;
	 * presumably validates that the buffer is mapped - confirm.
	 */
	cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
	if (IS_ERR(cpu_buffer))
		return (int)PTR_ERR(cpu_buffer);

	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

consume:
	if (rb_per_cpu_empty(cpu_buffer))
		goto out;

	reader_size = rb_page_size(cpu_buffer->reader_page);

	/*
	 * There are data to be read on the current reader page, we can
	 * return to the caller. But before that, we assume the latter will read
	 * everything. Let's update the kernel reader accordingly.
	 */
	if (cpu_buffer->reader_page->read < reader_size) {
		while (cpu_buffer->reader_page->read < reader_size)
			rb_advance_reader(cpu_buffer);
		goto out;
	}

	/* Did the reader catch up with the writer? */
	if (cpu_buffer->reader_page == cpu_buffer->commit_page)
		goto out;

	reader = rb_get_reader_page(cpu_buffer);
	if (WARN_ON(!reader))
		goto out;

	/* Check if any events were dropped */
	missed_events = cpu_buffer->lost_events;

	if (missed_events) {
		if (cpu_buffer->reader_page != cpu_buffer->commit_page) {
			struct buffer_data_page *bpage = reader->page;
			unsigned int commit;
			/*
			 * Use the real_end for the data size,
			 * This gives us a chance to store the lost events
			 * on the page.
			 */
			if (reader->real_end)
				local_set(&bpage->commit, reader->real_end);
			/*
			 * If there is room at the end of the page to save the
			 * missed events, then record it there.
			 */
			commit = rb_page_size(reader);
			if (buffer->subbuf_size - commit >= sizeof(missed_events)) {
				memcpy(&bpage->data[commit], &missed_events,
				       sizeof(missed_events));
				local_add(RB_MISSED_STORED, &bpage->commit);
			}
			/* Flag the page as having missed events, stored or not. */
			local_add(RB_MISSED_EVENTS, &bpage->commit);
		} else if (!WARN_ONCE(cpu_buffer->reader_page == cpu_buffer->tail_page,
				      "Reader on commit with %ld missed events",
				      missed_events)) {
			/*
			 * There shouldn't be any missed events if the tail_page
			 * is on the reader page. But if the tail page is not on the
			 * reader page and the commit_page is, that would mean that
			 * there's a commit_overrun (an interrupt preempted an
			 * addition of an event and then filled the buffer
			 * with new events). In this case it's not an
			 * error, but it should still be reported.
			 *
			 * TODO: Add missed events to the page for user space to know.
			 */
			pr_info("Ring buffer [%d] commit overrun lost %ld events at timestamp:%lld\n",
				cpu, missed_events, cpu_buffer->reader_page->page->time_stamp);
		}
	}

	cpu_buffer->lost_events = 0;

	/* Re-evaluate with the new reader page. */
	goto consume;

out:
	/* Some archs do not have data cache coherency between kernel and user-space */
	flush_kernel_vmap_range(cpu_buffer->reader_page->page,
				buffer->subbuf_size + BUF_PAGE_HDR_SIZE);

	rb_update_meta_page(cpu_buffer);

	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	rb_put_mapped_buffer(cpu_buffer);

	return 0;
}

/*
 * We only allocate new buffers, never free them if the CPU goes down.
 * If we were to free the buffer, then the user would lose any trace that was in
 * the buffer.
7727 */ 7728 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 7729 { 7730 struct trace_buffer *buffer; 7731 long nr_pages_same; 7732 int cpu_i; 7733 unsigned long nr_pages; 7734 7735 buffer = container_of(node, struct trace_buffer, node); 7736 if (cpumask_test_cpu(cpu, buffer->cpumask)) 7737 return 0; 7738 7739 nr_pages = 0; 7740 nr_pages_same = 1; 7741 /* check if all cpu sizes are same */ 7742 for_each_buffer_cpu(buffer, cpu_i) { 7743 /* fill in the size from first enabled cpu */ 7744 if (nr_pages == 0) 7745 nr_pages = buffer->buffers[cpu_i]->nr_pages; 7746 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 7747 nr_pages_same = 0; 7748 break; 7749 } 7750 } 7751 /* allocate minimum pages, user can later expand it */ 7752 if (!nr_pages_same) 7753 nr_pages = 2; 7754 buffer->buffers[cpu] = 7755 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 7756 if (!buffer->buffers[cpu]) { 7757 WARN(1, "failed to allocate ring buffer on CPU %u\n", 7758 cpu); 7759 return -ENOMEM; 7760 } 7761 smp_wmb(); 7762 cpumask_set_cpu(cpu, buffer->cpumask); 7763 return 0; 7764 } 7765 7766 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 7767 /* 7768 * This is a basic integrity check of the ring buffer. 7769 * Late in the boot cycle this test will run when configured in. 7770 * It will kick off a thread per CPU that will go into a loop 7771 * writing to the per cpu ring buffer various sizes of data. 7772 * Some of the data will be large items, some small. 7773 * 7774 * Another thread is created that goes into a spin, sending out 7775 * IPIs to the other CPUs to also write into the ring buffer. 7776 * this is to test the nesting ability of the buffer. 7777 * 7778 * Basic stats are recorded and reported. If something in the 7779 * ring buffer should happen that's not expected, a big warning 7780 * is displayed and all ring buffers are disabled. 
7781 */ 7782 static struct task_struct *rb_threads[NR_CPUS] __initdata; 7783 7784 struct rb_test_data { 7785 struct trace_buffer *buffer; 7786 unsigned long events; 7787 unsigned long bytes_written; 7788 unsigned long bytes_alloc; 7789 unsigned long bytes_dropped; 7790 unsigned long events_nested; 7791 unsigned long bytes_written_nested; 7792 unsigned long bytes_alloc_nested; 7793 unsigned long bytes_dropped_nested; 7794 int min_size_nested; 7795 int max_size_nested; 7796 int max_size; 7797 int min_size; 7798 int cpu; 7799 int cnt; 7800 }; 7801 7802 static struct rb_test_data rb_data[NR_CPUS] __initdata; 7803 7804 /* 1 meg per cpu */ 7805 #define RB_TEST_BUFFER_SIZE 1048576 7806 7807 static char rb_string[] __initdata = 7808 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 7809 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 7810 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 7811 7812 static bool rb_test_started __initdata; 7813 7814 struct rb_item { 7815 int size; 7816 char str[]; 7817 }; 7818 7819 static __init int rb_write_something(struct rb_test_data *data, bool nested) 7820 { 7821 struct ring_buffer_event *event; 7822 struct rb_item *item; 7823 bool started; 7824 int event_len; 7825 int size; 7826 int len; 7827 int cnt; 7828 7829 /* Have nested writes different that what is written */ 7830 cnt = data->cnt + (nested ? 27 : 0); 7831 7832 /* Multiply cnt by ~e, to make some unique increment */ 7833 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 7834 7835 len = size + sizeof(struct rb_item); 7836 7837 started = rb_test_started; 7838 /* read rb_test_started before checking buffer enabled */ 7839 smp_rmb(); 7840 7841 event = ring_buffer_lock_reserve(data->buffer, len); 7842 if (!event) { 7843 /* Ignore dropped events before test starts. 
*/ 7844 if (started) { 7845 if (nested) 7846 data->bytes_dropped_nested += len; 7847 else 7848 data->bytes_dropped += len; 7849 } 7850 return len; 7851 } 7852 7853 event_len = ring_buffer_event_length(event); 7854 7855 if (RB_WARN_ON(data->buffer, event_len < len)) 7856 goto out; 7857 7858 item = ring_buffer_event_data(event); 7859 item->size = size; 7860 memcpy(item->str, rb_string, size); 7861 7862 if (nested) { 7863 data->bytes_alloc_nested += event_len; 7864 data->bytes_written_nested += len; 7865 data->events_nested++; 7866 if (!data->min_size_nested || len < data->min_size_nested) 7867 data->min_size_nested = len; 7868 if (len > data->max_size_nested) 7869 data->max_size_nested = len; 7870 } else { 7871 data->bytes_alloc += event_len; 7872 data->bytes_written += len; 7873 data->events++; 7874 if (!data->min_size || len < data->min_size) 7875 data->max_size = len; 7876 if (len > data->max_size) 7877 data->max_size = len; 7878 } 7879 7880 out: 7881 ring_buffer_unlock_commit(data->buffer); 7882 7883 return 0; 7884 } 7885 7886 static __init int rb_test(void *arg) 7887 { 7888 struct rb_test_data *data = arg; 7889 7890 while (!kthread_should_stop()) { 7891 rb_write_something(data, false); 7892 data->cnt++; 7893 7894 set_current_state(TASK_INTERRUPTIBLE); 7895 /* Now sleep between a min of 100-300us and a max of 1ms */ 7896 usleep_range(((data->cnt % 3) + 1) * 100, 1000); 7897 } 7898 7899 return 0; 7900 } 7901 7902 static __init void rb_ipi(void *ignore) 7903 { 7904 struct rb_test_data *data; 7905 int cpu = smp_processor_id(); 7906 7907 data = &rb_data[cpu]; 7908 rb_write_something(data, true); 7909 } 7910 7911 static __init int rb_hammer_test(void *arg) 7912 { 7913 while (!kthread_should_stop()) { 7914 7915 /* Send an IPI to all cpus to write data! 
*/ 7916 smp_call_function(rb_ipi, NULL, 1); 7917 /* No sleep, but for non preempt, let others run */ 7918 schedule(); 7919 } 7920 7921 return 0; 7922 } 7923 7924 static __init int test_ringbuffer(void) 7925 { 7926 struct task_struct *rb_hammer; 7927 struct trace_buffer *buffer; 7928 int cpu; 7929 int ret = 0; 7930 7931 if (security_locked_down(LOCKDOWN_TRACEFS)) { 7932 pr_warn("Lockdown is enabled, skipping ring buffer tests\n"); 7933 return 0; 7934 } 7935 7936 pr_info("Running ring buffer tests...\n"); 7937 7938 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE); 7939 if (WARN_ON(!buffer)) 7940 return 0; 7941 7942 /* Disable buffer so that threads can't write to it yet */ 7943 ring_buffer_record_off(buffer); 7944 7945 for_each_online_cpu(cpu) { 7946 rb_data[cpu].buffer = buffer; 7947 rb_data[cpu].cpu = cpu; 7948 rb_data[cpu].cnt = cpu; 7949 rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu], 7950 cpu, "rbtester/%u"); 7951 if (WARN_ON(IS_ERR(rb_threads[cpu]))) { 7952 pr_cont("FAILED\n"); 7953 ret = PTR_ERR(rb_threads[cpu]); 7954 goto out_free; 7955 } 7956 } 7957 7958 /* Now create the rb hammer! */ 7959 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer"); 7960 if (WARN_ON(IS_ERR(rb_hammer))) { 7961 pr_cont("FAILED\n"); 7962 ret = PTR_ERR(rb_hammer); 7963 goto out_free; 7964 } 7965 7966 ring_buffer_record_on(buffer); 7967 /* 7968 * Show buffer is enabled before setting rb_test_started. 7969 * Yes there's a small race window where events could be 7970 * dropped and the thread won't catch it. But when a ring 7971 * buffer gets enabled, there will always be some kind of 7972 * delay before other CPUs see it. Thus, we don't care about 7973 * those dropped events. We care about events dropped after 7974 * the threads see that the buffer is active. 
7975 */ 7976 smp_wmb(); 7977 rb_test_started = true; 7978 7979 set_current_state(TASK_INTERRUPTIBLE); 7980 /* Just run for 10 seconds */ 7981 schedule_timeout(10 * HZ); 7982 7983 kthread_stop(rb_hammer); 7984 7985 out_free: 7986 for_each_online_cpu(cpu) { 7987 if (!rb_threads[cpu]) 7988 break; 7989 kthread_stop(rb_threads[cpu]); 7990 } 7991 if (ret) { 7992 ring_buffer_free(buffer); 7993 return ret; 7994 } 7995 7996 /* Report! */ 7997 pr_info("finished\n"); 7998 for_each_online_cpu(cpu) { 7999 struct ring_buffer_event *event; 8000 struct rb_test_data *data = &rb_data[cpu]; 8001 struct rb_item *item; 8002 unsigned long total_events; 8003 unsigned long total_dropped; 8004 unsigned long total_written; 8005 unsigned long total_alloc; 8006 unsigned long total_read = 0; 8007 unsigned long total_size = 0; 8008 unsigned long total_len = 0; 8009 unsigned long total_lost = 0; 8010 unsigned long lost; 8011 int big_event_size; 8012 int small_event_size; 8013 8014 ret = -1; 8015 8016 total_events = data->events + data->events_nested; 8017 total_written = data->bytes_written + data->bytes_written_nested; 8018 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 8019 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 8020 8021 big_event_size = data->max_size + data->max_size_nested; 8022 small_event_size = data->min_size + data->min_size_nested; 8023 8024 pr_info("CPU %d:\n", cpu); 8025 pr_info(" events: %ld\n", total_events); 8026 pr_info(" dropped bytes: %ld\n", total_dropped); 8027 pr_info(" alloced bytes: %ld\n", total_alloc); 8028 pr_info(" written bytes: %ld\n", total_written); 8029 pr_info(" biggest event: %d\n", big_event_size); 8030 pr_info(" smallest event: %d\n", small_event_size); 8031 8032 if (RB_WARN_ON(buffer, total_dropped)) 8033 break; 8034 8035 ret = 0; 8036 8037 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 8038 total_lost += lost; 8039 item = ring_buffer_event_data(event); 8040 total_len += 
ring_buffer_event_length(event); 8041 total_size += item->size + sizeof(struct rb_item); 8042 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 8043 pr_info("FAILED!\n"); 8044 pr_info("buffer had: %.*s\n", item->size, item->str); 8045 pr_info("expected: %.*s\n", item->size, rb_string); 8046 RB_WARN_ON(buffer, 1); 8047 ret = -1; 8048 break; 8049 } 8050 total_read++; 8051 } 8052 if (ret) 8053 break; 8054 8055 ret = -1; 8056 8057 pr_info(" read events: %ld\n", total_read); 8058 pr_info(" lost events: %ld\n", total_lost); 8059 pr_info(" total events: %ld\n", total_lost + total_read); 8060 pr_info(" recorded len bytes: %ld\n", total_len); 8061 pr_info(" recorded size bytes: %ld\n", total_size); 8062 if (total_lost) { 8063 pr_info(" With dropped events, record len and size may not match\n" 8064 " alloced and written from above\n"); 8065 } else { 8066 if (RB_WARN_ON(buffer, total_len != total_alloc || 8067 total_size != total_written)) 8068 break; 8069 } 8070 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 8071 break; 8072 8073 ret = 0; 8074 } 8075 if (!ret) 8076 pr_info("Ring buffer PASSED!\n"); 8077 8078 ring_buffer_free(buffer); 8079 return 0; 8080 } 8081 8082 late_initcall(test_ringbuffer); 8083 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 8084