1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Generic ring buffer 4 * 5 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com> 6 */ 7 #include <linux/ring_buffer_types.h> 8 #include <linux/sched/isolation.h> 9 #include <linux/trace_recursion.h> 10 #include <linux/trace_events.h> 11 #include <linux/ring_buffer.h> 12 #include <linux/trace_clock.h> 13 #include <linux/sched/clock.h> 14 #include <linux/cacheflush.h> 15 #include <linux/trace_seq.h> 16 #include <linux/spinlock.h> 17 #include <linux/irq_work.h> 18 #include <linux/security.h> 19 #include <linux/uaccess.h> 20 #include <linux/hardirq.h> 21 #include <linux/kthread.h> /* for self test */ 22 #include <linux/module.h> 23 #include <linux/percpu.h> 24 #include <linux/mutex.h> 25 #include <linux/delay.h> 26 #include <linux/slab.h> 27 #include <linux/init.h> 28 #include <linux/hash.h> 29 #include <linux/list.h> 30 #include <linux/cpu.h> 31 #include <linux/oom.h> 32 #include <linux/mm.h> 33 34 #include <asm/local64.h> 35 #include <asm/local.h> 36 #include <asm/setup.h> 37 38 #include "trace.h" 39 40 /* 41 * The "absolute" timestamp in the buffer is only 59 bits. 42 * If a clock has the 5 MSBs set, it needs to be saved and 43 * reinserted. 44 */ 45 #define TS_MSB (0xf8ULL << 56) 46 #define ABS_TS_MASK (~TS_MSB) 47 48 static void update_pages_handler(struct work_struct *work); 49 50 #define RING_BUFFER_META_MAGIC 0xBADFEED 51 52 struct ring_buffer_meta { 53 int magic; 54 int struct_sizes; 55 unsigned long total_size; 56 unsigned long buffers_offset; 57 }; 58 59 struct ring_buffer_cpu_meta { 60 unsigned long first_buffer; 61 unsigned long head_buffer; 62 unsigned long commit_buffer; 63 __u32 subbuf_size; 64 __u32 nr_subbufs; 65 int buffers[]; 66 }; 67 68 /* 69 * The ring buffer header is special. We must manually up keep it. 
70 */ 71 int ring_buffer_print_entry_header(struct trace_seq *s) 72 { 73 trace_seq_puts(s, "# compressed entry header\n"); 74 trace_seq_puts(s, "\ttype_len : 5 bits\n"); 75 trace_seq_puts(s, "\ttime_delta : 27 bits\n"); 76 trace_seq_puts(s, "\tarray : 32 bits\n"); 77 trace_seq_putc(s, '\n'); 78 trace_seq_printf(s, "\tpadding : type == %d\n", 79 RINGBUF_TYPE_PADDING); 80 trace_seq_printf(s, "\ttime_extend : type == %d\n", 81 RINGBUF_TYPE_TIME_EXTEND); 82 trace_seq_printf(s, "\ttime_stamp : type == %d\n", 83 RINGBUF_TYPE_TIME_STAMP); 84 trace_seq_printf(s, "\tdata max type_len == %d\n", 85 RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 86 87 return !trace_seq_has_overflowed(s); 88 } 89 90 /* 91 * The ring buffer is made up of a list of pages. A separate list of pages is 92 * allocated for each CPU. A writer may only write to a buffer that is 93 * associated with the CPU it is currently executing on. A reader may read 94 * from any per cpu buffer. 95 * 96 * The reader is special. For each per cpu buffer, the reader has its own 97 * reader page. When a reader has read the entire reader page, this reader 98 * page is swapped with another page in the ring buffer. 99 * 100 * Now, as long as the writer is off the reader page, the reader can do what 101 * ever it wants with that page. The writer will never write to that page 102 * again (as long as it is out of the ring buffer). 103 * 104 * Here's some silly ASCII art. 
105 * 106 * +------+ 107 * |reader| RING BUFFER 108 * |page | 109 * +------+ +---+ +---+ +---+ 110 * | |-->| |-->| | 111 * +---+ +---+ +---+ 112 * ^ | 113 * | | 114 * +---------------+ 115 * 116 * 117 * +------+ 118 * |reader| RING BUFFER 119 * |page |------------------v 120 * +------+ +---+ +---+ +---+ 121 * | |-->| |-->| | 122 * +---+ +---+ +---+ 123 * ^ | 124 * | | 125 * +---------------+ 126 * 127 * 128 * +------+ 129 * |reader| RING BUFFER 130 * |page |------------------v 131 * +------+ +---+ +---+ +---+ 132 * ^ | |-->| |-->| | 133 * | +---+ +---+ +---+ 134 * | | 135 * | | 136 * +------------------------------+ 137 * 138 * 139 * +------+ 140 * |buffer| RING BUFFER 141 * |page |------------------v 142 * +------+ +---+ +---+ +---+ 143 * ^ | | | |-->| | 144 * | New +---+ +---+ +---+ 145 * | Reader------^ | 146 * | page | 147 * +------------------------------+ 148 * 149 * 150 * After we make this swap, the reader can hand this page off to the splice 151 * code and be done with it. It can even allocate a new page if it needs to 152 * and swap that into the ring buffer. 153 * 154 * We will be using cmpxchg soon to make all this lockless. 155 * 156 */ 157 158 /* Used for individual buffers (after the counter) */ 159 #define RB_BUFFER_OFF (1 << 20) 160 161 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ 162 #define RINGBUF_TYPE_DATA 0 ... 
RINGBUF_TYPE_DATA_TYPE_LEN_MAX 163 164 enum { 165 RB_LEN_TIME_EXTEND = 8, 166 RB_LEN_TIME_STAMP = 8, 167 }; 168 169 #define skip_time_extend(event) \ 170 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND)) 171 172 #define extended_time(event) \ 173 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND) 174 175 static inline bool rb_null_event(struct ring_buffer_event *event) 176 { 177 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta; 178 } 179 180 static void rb_event_set_padding(struct ring_buffer_event *event) 181 { 182 /* padding has a NULL time_delta */ 183 event->type_len = RINGBUF_TYPE_PADDING; 184 event->time_delta = 0; 185 } 186 187 static unsigned 188 rb_event_data_length(struct ring_buffer_event *event) 189 { 190 unsigned length; 191 192 if (event->type_len) 193 length = event->type_len * RB_ALIGNMENT; 194 else 195 length = event->array[0]; 196 return length + RB_EVNT_HDR_SIZE; 197 } 198 199 /* 200 * Return the length of the given event. Will return 201 * the length of the time extend if the event is a 202 * time extend. 203 */ 204 static inline unsigned 205 rb_event_length(struct ring_buffer_event *event) 206 { 207 switch (event->type_len) { 208 case RINGBUF_TYPE_PADDING: 209 if (rb_null_event(event)) 210 /* undefined */ 211 return -1; 212 return event->array[0] + RB_EVNT_HDR_SIZE; 213 214 case RINGBUF_TYPE_TIME_EXTEND: 215 return RB_LEN_TIME_EXTEND; 216 217 case RINGBUF_TYPE_TIME_STAMP: 218 return RB_LEN_TIME_STAMP; 219 220 case RINGBUF_TYPE_DATA: 221 return rb_event_data_length(event); 222 default: 223 WARN_ON_ONCE(1); 224 } 225 /* not hit */ 226 return 0; 227 } 228 229 /* 230 * Return total length of time extend and data, 231 * or just the event length for all other events. 
232 */ 233 static inline unsigned 234 rb_event_ts_length(struct ring_buffer_event *event) 235 { 236 unsigned len = 0; 237 238 if (extended_time(event)) { 239 /* time extends include the data event after it */ 240 len = RB_LEN_TIME_EXTEND; 241 event = skip_time_extend(event); 242 } 243 return len + rb_event_length(event); 244 } 245 246 /** 247 * ring_buffer_event_length - return the length of the event 248 * @event: the event to get the length of 249 * 250 * Returns the size of the data load of a data event. 251 * If the event is something other than a data event, it 252 * returns the size of the event itself. With the exception 253 * of a TIME EXTEND, where it still returns the size of the 254 * data load of the data event after it. 255 */ 256 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 257 { 258 unsigned length; 259 260 if (extended_time(event)) 261 event = skip_time_extend(event); 262 263 length = rb_event_length(event); 264 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 265 return length; 266 length -= RB_EVNT_HDR_SIZE; 267 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 268 length -= sizeof(event->array[0]); 269 return length; 270 } 271 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 272 273 /* inline for ring buffer fast paths */ 274 static __always_inline void * 275 rb_event_data(struct ring_buffer_event *event) 276 { 277 if (extended_time(event)) 278 event = skip_time_extend(event); 279 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 280 /* If length is in len field, then array[0] has the data */ 281 if (event->type_len) 282 return (void *)&event->array[0]; 283 /* Otherwise length is in array[0] and array[1] has the data */ 284 return (void *)&event->array[1]; 285 } 286 287 /** 288 * ring_buffer_event_data - return the data of the event 289 * @event: the event to get the data from 290 */ 291 void *ring_buffer_event_data(struct ring_buffer_event *event) 292 { 293 return rb_event_data(event); 294 } 295 
EXPORT_SYMBOL_GPL(ring_buffer_event_data); 296 297 #define for_each_buffer_cpu(buffer, cpu) \ 298 for_each_cpu(cpu, buffer->cpumask) 299 300 #define for_each_online_buffer_cpu(buffer, cpu) \ 301 for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask) 302 303 static u64 rb_event_time_stamp(struct ring_buffer_event *event) 304 { 305 u64 ts; 306 307 ts = event->array[0]; 308 ts <<= TS_SHIFT; 309 ts += event->time_delta; 310 311 return ts; 312 } 313 314 /* Flag when events were overwritten */ 315 #define RB_MISSED_EVENTS (1 << 31) 316 /* Missed count stored at end */ 317 #define RB_MISSED_STORED (1 << 30) 318 319 #define RB_MISSED_MASK (3 << 30) 320 321 struct buffer_data_read_page { 322 unsigned order; /* order of the page */ 323 struct buffer_data_page *data; /* actual data, stored in this page */ 324 }; 325 326 /* 327 * Note, the buffer_page list must be first. The buffer pages 328 * are allocated in cache lines, which means that each buffer 329 * page will be at the beginning of a cache line, and thus 330 * the least significant bits will be zero. We use this to 331 * add flags in the list struct pointers, to make the ring buffer 332 * lockless. 333 */ 334 struct buffer_page { 335 struct list_head list; /* list of buffer pages */ 336 local_t write; /* index for next write */ 337 unsigned read; /* index for next read */ 338 local_t entries; /* entries on this page */ 339 unsigned long real_end; /* real end of data */ 340 unsigned order; /* order of the page */ 341 u32 id:30; /* ID for external mapping */ 342 u32 range:1; /* Mapped via a range */ 343 struct buffer_data_page *page; /* Actual data page */ 344 }; 345 346 /* 347 * The buffer page counters, write and entries, must be reset 348 * atomically when crossing page boundaries. To synchronize this 349 * update, two counters are inserted into the number. One is 350 * the actual counter for the write position or count on the page. 351 * 352 * The other is a counter of updaters. 
Before an update happens 353 * the update partition of the counter is incremented. This will 354 * allow the updater to update the counter atomically. 355 * 356 * The counter is 20 bits, and the state data is 12. 357 */ 358 #define RB_WRITE_MASK 0xfffff 359 #define RB_WRITE_INTCNT (1 << 20) 360 361 static void rb_init_page(struct buffer_data_page *bpage) 362 { 363 local_set(&bpage->commit, 0); 364 } 365 366 static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage) 367 { 368 return local_read(&bpage->page->commit); 369 } 370 371 static void free_buffer_page(struct buffer_page *bpage) 372 { 373 /* Range pages are not to be freed */ 374 if (!bpage->range) 375 free_pages((unsigned long)bpage->page, bpage->order); 376 kfree(bpage); 377 } 378 379 /* 380 * For best performance, allocate cpu buffer data cache line sized 381 * and per CPU. 382 */ 383 #define alloc_cpu_buffer(cpu) (struct ring_buffer_per_cpu *) \ 384 kzalloc_node(ALIGN(sizeof(struct ring_buffer_per_cpu), \ 385 cache_line_size()), GFP_KERNEL, cpu_to_node(cpu)); 386 387 #define alloc_cpu_page(cpu) (struct buffer_page *) \ 388 kzalloc_node(ALIGN(sizeof(struct buffer_page), \ 389 cache_line_size()), GFP_KERNEL, cpu_to_node(cpu)); 390 391 static struct buffer_data_page *alloc_cpu_data(int cpu, int order) 392 { 393 struct buffer_data_page *dpage; 394 struct page *page; 395 gfp_t mflags; 396 397 /* 398 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails 399 * gracefully without invoking oom-killer and the system is not 400 * destabilized. 
401 */ 402 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL | __GFP_COMP | __GFP_ZERO; 403 404 page = alloc_pages_node(cpu_to_node(cpu), mflags, order); 405 if (!page) 406 return NULL; 407 408 dpage = page_address(page); 409 rb_init_page(dpage); 410 411 return dpage; 412 } 413 414 struct rb_irq_work { 415 struct irq_work work; 416 wait_queue_head_t waiters; 417 wait_queue_head_t full_waiters; 418 atomic_t seq; 419 bool waiters_pending; 420 bool full_waiters_pending; 421 bool wakeup_full; 422 }; 423 424 /* 425 * Structure to hold event state and handle nested events. 426 */ 427 struct rb_event_info { 428 u64 ts; 429 u64 delta; 430 u64 before; 431 u64 after; 432 unsigned long length; 433 struct buffer_page *tail_page; 434 int add_timestamp; 435 }; 436 437 /* 438 * Used for the add_timestamp 439 * NONE 440 * EXTEND - wants a time extend 441 * ABSOLUTE - the buffer requests all events to have absolute time stamps 442 * FORCE - force a full time stamp. 443 */ 444 enum { 445 RB_ADD_STAMP_NONE = 0, 446 RB_ADD_STAMP_EXTEND = BIT(1), 447 RB_ADD_STAMP_ABSOLUTE = BIT(2), 448 RB_ADD_STAMP_FORCE = BIT(3) 449 }; 450 /* 451 * Used for which event context the event is in. 452 * TRANSITION = 0 453 * NMI = 1 454 * IRQ = 2 455 * SOFTIRQ = 3 456 * NORMAL = 4 457 * 458 * See trace_recursive_lock() comment below for more details. 459 */ 460 enum { 461 RB_CTX_TRANSITION, 462 RB_CTX_NMI, 463 RB_CTX_IRQ, 464 RB_CTX_SOFTIRQ, 465 RB_CTX_NORMAL, 466 RB_CTX_MAX 467 }; 468 469 struct rb_time_struct { 470 local64_t time; 471 }; 472 typedef struct rb_time_struct rb_time_t; 473 474 #define MAX_NEST 5 475 476 /* 477 * head_page == tail_page && head == tail then buffer is empty. 
478 */ 479 struct ring_buffer_per_cpu { 480 int cpu; 481 atomic_t record_disabled; 482 atomic_t resize_disabled; 483 struct trace_buffer *buffer; 484 raw_spinlock_t reader_lock; /* serialize readers */ 485 arch_spinlock_t lock; 486 struct lock_class_key lock_key; 487 struct buffer_data_page *free_page; 488 unsigned long nr_pages; 489 unsigned int current_context; 490 struct list_head *pages; 491 /* pages generation counter, incremented when the list changes */ 492 unsigned long cnt; 493 struct buffer_page *head_page; /* read from head */ 494 struct buffer_page *tail_page; /* write to tail */ 495 struct buffer_page *commit_page; /* committed pages */ 496 struct buffer_page *reader_page; 497 unsigned long lost_events; 498 unsigned long last_overrun; 499 unsigned long nest; 500 local_t entries_bytes; 501 local_t entries; 502 local_t overrun; 503 local_t commit_overrun; 504 local_t dropped_events; 505 local_t committing; 506 local_t commits; 507 local_t pages_touched; 508 local_t pages_lost; 509 local_t pages_read; 510 long last_pages_touch; 511 size_t shortest_full; 512 unsigned long read; 513 unsigned long read_bytes; 514 rb_time_t write_stamp; 515 rb_time_t before_stamp; 516 u64 event_stamp[MAX_NEST]; 517 u64 read_stamp; 518 /* pages removed since last reset */ 519 unsigned long pages_removed; 520 521 unsigned int mapped; 522 unsigned int user_mapped; /* user space mapping */ 523 struct mutex mapping_lock; 524 struct buffer_page **subbuf_ids; /* ID to subbuf VA */ 525 struct trace_buffer_meta *meta_page; 526 struct ring_buffer_cpu_meta *ring_meta; 527 528 struct ring_buffer_remote *remote; 529 530 /* ring buffer pages to update, > 0 to add, < 0 to remove */ 531 long nr_pages_to_update; 532 struct list_head new_pages; /* new pages to add */ 533 struct work_struct update_pages_work; 534 struct completion update_done; 535 536 struct rb_irq_work irq_work; 537 }; 538 539 struct trace_buffer { 540 unsigned flags; 541 int cpus; 542 atomic_t record_disabled; 543 atomic_t 
resizing; 544 cpumask_var_t cpumask; 545 546 struct lock_class_key *reader_lock_key; 547 548 struct mutex mutex; 549 550 struct ring_buffer_per_cpu **buffers; 551 552 struct ring_buffer_remote *remote; 553 554 struct hlist_node node; 555 u64 (*clock)(void); 556 557 struct rb_irq_work irq_work; 558 bool time_stamp_abs; 559 560 unsigned long range_addr_start; 561 unsigned long range_addr_end; 562 563 struct ring_buffer_meta *meta; 564 565 unsigned int subbuf_size; 566 unsigned int subbuf_order; 567 unsigned int max_data_size; 568 }; 569 570 struct ring_buffer_iter { 571 struct ring_buffer_per_cpu *cpu_buffer; 572 unsigned long head; 573 unsigned long next_event; 574 struct buffer_page *head_page; 575 struct buffer_page *cache_reader_page; 576 unsigned long cache_read; 577 unsigned long cache_pages_removed; 578 u64 read_stamp; 579 u64 page_stamp; 580 struct ring_buffer_event *event; 581 size_t event_size; 582 int missed_events; 583 }; 584 585 int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s) 586 { 587 struct buffer_data_page field; 588 589 trace_seq_printf(s, "\tfield: u64 timestamp;\t" 590 "offset:0;\tsize:%u;\tsigned:%u;\n", 591 (unsigned int)sizeof(field.time_stamp), 592 (unsigned int)is_signed_type(u64)); 593 594 trace_seq_printf(s, "\tfield: local_t commit;\t" 595 "offset:%u;\tsize:%u;\tsigned:%u;\n", 596 (unsigned int)offsetof(typeof(field), commit), 597 (unsigned int)sizeof(field.commit), 598 (unsigned int)is_signed_type(long)); 599 600 trace_seq_printf(s, "\tfield: char overwrite;\t" 601 "offset:%u;\tsize:%u;\tsigned:%u;\n", 602 (unsigned int)offsetof(typeof(field), commit), 603 1, 604 (unsigned int)is_signed_type(char)); 605 606 trace_seq_printf(s, "\tfield: char data;\t" 607 "offset:%u;\tsize:%u;\tsigned:%u;\n", 608 (unsigned int)offsetof(typeof(field), data), 609 (unsigned int)(buffer ? 
buffer->subbuf_size : 610 PAGE_SIZE - BUF_PAGE_HDR_SIZE), 611 (unsigned int)is_signed_type(char)); 612 613 return !trace_seq_has_overflowed(s); 614 } 615 616 static inline void rb_time_read(rb_time_t *t, u64 *ret) 617 { 618 *ret = local64_read(&t->time); 619 } 620 static void rb_time_set(rb_time_t *t, u64 val) 621 { 622 local64_set(&t->time, val); 623 } 624 625 /* 626 * Enable this to make sure that the event passed to 627 * ring_buffer_event_time_stamp() is not committed and also 628 * is on the buffer that it passed in. 629 */ 630 //#define RB_VERIFY_EVENT 631 #ifdef RB_VERIFY_EVENT 632 static struct list_head *rb_list_head(struct list_head *list); 633 static void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 634 void *event) 635 { 636 struct buffer_page *page = cpu_buffer->commit_page; 637 struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page); 638 struct list_head *next; 639 long commit, write; 640 unsigned long addr = (unsigned long)event; 641 bool done = false; 642 int stop = 0; 643 644 /* Make sure the event exists and is not committed yet */ 645 do { 646 if (page == tail_page || WARN_ON_ONCE(stop++ > 100)) 647 done = true; 648 commit = local_read(&page->page->commit); 649 write = local_read(&page->write); 650 if (addr >= (unsigned long)&page->page->data[commit] && 651 addr < (unsigned long)&page->page->data[write]) 652 return; 653 654 next = rb_list_head(page->list.next); 655 page = list_entry(next, struct buffer_page, list); 656 } while (!done); 657 WARN_ON_ONCE(1); 658 } 659 #else 660 static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 661 void *event) 662 { 663 } 664 #endif 665 666 /* 667 * The absolute time stamp drops the 5 MSBs and some clocks may 668 * require them. The rb_fix_abs_ts() will take a previous full 669 * time stamp, and add the 5 MSB of that time stamp on to the 670 * saved absolute time stamp. 
Then they are compared in case of 671 * the unlikely event that the latest time stamp incremented 672 * the 5 MSB. 673 */ 674 static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts) 675 { 676 if (save_ts & TS_MSB) { 677 abs |= save_ts & TS_MSB; 678 /* Check for overflow */ 679 if (unlikely(abs < save_ts)) 680 abs += 1ULL << 59; 681 } 682 return abs; 683 } 684 685 static inline u64 rb_time_stamp(struct trace_buffer *buffer); 686 687 /** 688 * ring_buffer_event_time_stamp - return the event's current time stamp 689 * @buffer: The buffer that the event is on 690 * @event: the event to get the time stamp of 691 * 692 * Note, this must be called after @event is reserved, and before it is 693 * committed to the ring buffer. And must be called from the same 694 * context where the event was reserved (normal, softirq, irq, etc). 695 * 696 * Returns the time stamp associated with the current event. 697 * If the event has an extended time stamp, then that is used as 698 * the time stamp to return. 699 * In the highly unlikely case that the event was nested more than 700 * the max nesting, then the write_stamp of the buffer is returned, 701 * otherwise current time is returned, but that really neither of 702 * the last two cases should ever happen. 
703 */ 704 u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer, 705 struct ring_buffer_event *event) 706 { 707 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()]; 708 unsigned int nest; 709 u64 ts; 710 711 /* If the event includes an absolute time, then just use that */ 712 if (event->type_len == RINGBUF_TYPE_TIME_STAMP) { 713 ts = rb_event_time_stamp(event); 714 return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp); 715 } 716 717 nest = local_read(&cpu_buffer->committing); 718 verify_event(cpu_buffer, event); 719 if (WARN_ON_ONCE(!nest)) 720 goto fail; 721 722 /* Read the current saved nesting level time stamp */ 723 if (likely(--nest < MAX_NEST)) 724 return cpu_buffer->event_stamp[nest]; 725 726 /* Shouldn't happen, warn if it does */ 727 WARN_ONCE(1, "nest (%d) greater than max", nest); 728 729 fail: 730 rb_time_read(&cpu_buffer->write_stamp, &ts); 731 732 return ts; 733 } 734 735 /** 736 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer 737 * @buffer: The ring_buffer to get the number of pages from 738 * @cpu: The cpu of the ring_buffer to get the number of pages from 739 * 740 * Returns the number of pages that have content in the ring buffer. 
741 */ 742 size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu) 743 { 744 size_t read; 745 size_t lost; 746 size_t cnt; 747 748 read = local_read(&buffer->buffers[cpu]->pages_read); 749 lost = local_read(&buffer->buffers[cpu]->pages_lost); 750 cnt = local_read(&buffer->buffers[cpu]->pages_touched); 751 752 if (WARN_ON_ONCE(cnt < lost)) 753 return 0; 754 755 cnt -= lost; 756 757 /* The reader can read an empty page, but not more than that */ 758 if (cnt < read) { 759 WARN_ON_ONCE(read > cnt + 1); 760 return 0; 761 } 762 763 return cnt - read; 764 } 765 766 static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full) 767 { 768 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 769 size_t nr_pages; 770 size_t dirty; 771 772 nr_pages = cpu_buffer->nr_pages; 773 if (!nr_pages || !full) 774 return true; 775 776 /* 777 * Add one as dirty will never equal nr_pages, as the sub-buffer 778 * that the writer is on is not counted as dirty. 779 * This is needed if "buffer_percent" is set to 100. 780 */ 781 dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1; 782 783 return (dirty * 100) >= (full * nr_pages); 784 } 785 786 /* 787 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input 788 * 789 * Schedules a delayed work to wake up any task that is blocked on the 790 * ring buffer waiters queue. 
791 */ 792 static void rb_wake_up_waiters(struct irq_work *work) 793 { 794 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work); 795 796 /* For waiters waiting for the first wake up */ 797 (void)atomic_fetch_inc_release(&rbwork->seq); 798 799 wake_up_all(&rbwork->waiters); 800 if (rbwork->full_waiters_pending || rbwork->wakeup_full) { 801 /* Only cpu_buffer sets the above flags */ 802 struct ring_buffer_per_cpu *cpu_buffer = 803 container_of(rbwork, struct ring_buffer_per_cpu, irq_work); 804 805 /* Called from interrupt context */ 806 raw_spin_lock(&cpu_buffer->reader_lock); 807 rbwork->wakeup_full = false; 808 rbwork->full_waiters_pending = false; 809 810 /* Waking up all waiters, they will reset the shortest full */ 811 cpu_buffer->shortest_full = 0; 812 raw_spin_unlock(&cpu_buffer->reader_lock); 813 814 wake_up_all(&rbwork->full_waiters); 815 } 816 } 817 818 /** 819 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer 820 * @buffer: The ring buffer to wake waiters on 821 * @cpu: The CPU buffer to wake waiters on 822 * 823 * In the case of a file that represents a ring buffer is closing, 824 * it is prudent to wake up any waiters that are on this. 825 */ 826 void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu) 827 { 828 struct ring_buffer_per_cpu *cpu_buffer; 829 struct rb_irq_work *rbwork; 830 831 if (!buffer) 832 return; 833 834 if (cpu == RING_BUFFER_ALL_CPUS) { 835 836 /* Wake up individual ones too. 
One level recursion */ 837 for_each_buffer_cpu(buffer, cpu) 838 ring_buffer_wake_waiters(buffer, cpu); 839 840 rbwork = &buffer->irq_work; 841 } else { 842 if (WARN_ON_ONCE(!buffer->buffers)) 843 return; 844 if (WARN_ON_ONCE(cpu >= nr_cpu_ids)) 845 return; 846 847 cpu_buffer = buffer->buffers[cpu]; 848 /* The CPU buffer may not have been initialized yet */ 849 if (!cpu_buffer) 850 return; 851 rbwork = &cpu_buffer->irq_work; 852 } 853 854 /* This can be called in any context */ 855 irq_work_queue(&rbwork->work); 856 } 857 858 static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full) 859 { 860 struct ring_buffer_per_cpu *cpu_buffer; 861 bool ret = false; 862 863 /* Reads of all CPUs always waits for any data */ 864 if (cpu == RING_BUFFER_ALL_CPUS) 865 return !ring_buffer_empty(buffer); 866 867 cpu_buffer = buffer->buffers[cpu]; 868 869 if (!ring_buffer_empty_cpu(buffer, cpu)) { 870 unsigned long flags; 871 bool pagebusy; 872 873 if (!full) 874 return true; 875 876 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 877 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page; 878 ret = !pagebusy && full_hit(buffer, cpu, full); 879 880 if (!ret && (!cpu_buffer->shortest_full || 881 cpu_buffer->shortest_full > full)) { 882 cpu_buffer->shortest_full = full; 883 } 884 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 885 } 886 return ret; 887 } 888 889 static inline bool 890 rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer, 891 int cpu, int full, ring_buffer_cond_fn cond, void *data) 892 { 893 if (rb_watermark_hit(buffer, cpu, full)) 894 return true; 895 896 if (cond(data)) 897 return true; 898 899 /* 900 * The events can happen in critical sections where 901 * checking a work queue can cause deadlocks. 902 * After adding a task to the queue, this flag is set 903 * only to notify events to try to wake up the queue 904 * using irq_work. 905 * 906 * We don't clear it even if the buffer is no longer 907 * empty. 
The flag only causes the next event to run 908 * irq_work to do the work queue wake up. The worse 909 * that can happen if we race with !trace_empty() is that 910 * an event will cause an irq_work to try to wake up 911 * an empty queue. 912 * 913 * There's no reason to protect this flag either, as 914 * the work queue and irq_work logic will do the necessary 915 * synchronization for the wake ups. The only thing 916 * that is necessary is that the wake up happens after 917 * a task has been queued. It's OK for spurious wake ups. 918 */ 919 if (full) 920 rbwork->full_waiters_pending = true; 921 else 922 rbwork->waiters_pending = true; 923 924 return false; 925 } 926 927 struct rb_wait_data { 928 struct rb_irq_work *irq_work; 929 int seq; 930 }; 931 932 /* 933 * The default wait condition for ring_buffer_wait() is to just to exit the 934 * wait loop the first time it is woken up. 935 */ 936 static bool rb_wait_once(void *data) 937 { 938 struct rb_wait_data *rdata = data; 939 struct rb_irq_work *rbwork = rdata->irq_work; 940 941 return atomic_read_acquire(&rbwork->seq) != rdata->seq; 942 } 943 944 /** 945 * ring_buffer_wait - wait for input to the ring buffer 946 * @buffer: buffer to wait on 947 * @cpu: the cpu buffer to wait on 948 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 949 * @cond: condition function to break out of wait (NULL to run once) 950 * @data: the data to pass to @cond. 951 * 952 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 953 * as data is added to any of the @buffer's cpu buffers. Otherwise 954 * it will wait for data to be added to a specific cpu buffer. 
955 */ 956 int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full, 957 ring_buffer_cond_fn cond, void *data) 958 { 959 struct ring_buffer_per_cpu *cpu_buffer; 960 struct wait_queue_head *waitq; 961 struct rb_irq_work *rbwork; 962 struct rb_wait_data rdata; 963 int ret = 0; 964 965 /* 966 * Depending on what the caller is waiting for, either any 967 * data in any cpu buffer, or a specific buffer, put the 968 * caller on the appropriate wait queue. 969 */ 970 if (cpu == RING_BUFFER_ALL_CPUS) { 971 rbwork = &buffer->irq_work; 972 /* Full only makes sense on per cpu reads */ 973 full = 0; 974 } else { 975 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 976 return -ENODEV; 977 cpu_buffer = buffer->buffers[cpu]; 978 rbwork = &cpu_buffer->irq_work; 979 } 980 981 if (full) 982 waitq = &rbwork->full_waiters; 983 else 984 waitq = &rbwork->waiters; 985 986 /* Set up to exit loop as soon as it is woken */ 987 if (!cond) { 988 cond = rb_wait_once; 989 rdata.irq_work = rbwork; 990 rdata.seq = atomic_read_acquire(&rbwork->seq); 991 data = &rdata; 992 } 993 994 ret = wait_event_interruptible((*waitq), 995 rb_wait_cond(rbwork, buffer, cpu, full, cond, data)); 996 997 return ret; 998 } 999 1000 /** 1001 * ring_buffer_poll_wait - poll on buffer input 1002 * @buffer: buffer to wait on 1003 * @cpu: the cpu buffer to wait on 1004 * @filp: the file descriptor 1005 * @poll_table: The poll descriptor 1006 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 1007 * 1008 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 1009 * as data is added to any of the @buffer's cpu buffers. Otherwise 1010 * it will wait for data to be added to a specific cpu buffer. 1011 * 1012 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers, 1013 * zero otherwise. 
 */
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
			  struct file *filp, poll_table *poll_table, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	/* Pick the irq_work that owns the wait queues to poll on */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		/* @full is ignored when polling all CPUs */
		full = 0;
	} else {
		/* A CPU that is not in the buffer's cpumask is reported as EPOLLERR */
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return EPOLLERR;

		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full) {
		poll_wait(filp, &rbwork->full_waiters, poll_table);

		/* Already at or above the requested watermark: readable now */
		if (rb_watermark_hit(buffer, cpu, full))
			return EPOLLIN | EPOLLRDNORM;
		/*
		 * Only allow full_waiters_pending update to be seen after
		 * the shortest_full is set (in rb_watermark_hit). If the
		 * writer sees the full_waiters_pending flag set, it will
		 * compare the amount in the ring buffer to shortest_full.
		 * If the amount in the ring buffer is greater than the
		 * shortest_full percent, it will call the irq_work handler
		 * to wake up this list. The irq_handler will reset shortest_full
		 * back to zero. That's done under the reader_lock, but
		 * the below smp_mb() makes sure that the update to
		 * full_waiters_pending doesn't leak up into the above.
		 */
		smp_mb();
		rbwork->full_waiters_pending = true;
		return 0;
	}

	poll_wait(filp, &rbwork->waiters, poll_table);
	rbwork->waiters_pending = true;

	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty.  Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path.  We only need a memory barrier when
	 * the buffer goes from empty to having content.  But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	/* Report readable if the polled buffer(s) already hold data */
	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

/*
 * RB_WARN_ON - warn and disable recording when @cond is true
 *
 * buffer may be either ring_buffer or ring_buffer_per_cpu
 *
 * When @cond is true, record_disabled is incremented on the owning
 * buffer (reached through ->buffer for a ring_buffer_per_cpu) and
 * WARN_ON(1) fires. The macro evaluates to the truth value of @cond,
 * so it can be used directly inside an if ().
 */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

/*
 * Read the buffer's clock and apply DEBUG_SHIFT.
 *
 * When CONFIG_MITIGATION_RETPOLINE is enabled and the clock is
 * trace_clock_local, it is called directly instead of through the
 * buffer->clock function pointer.
 */
static inline u64 rb_time_stamp(struct trace_buffer *buffer)
{
	u64 ts;

	/* Skip retpolines :-( */
	if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local))
		ts = trace_clock_local();
	else
		ts = buffer->clock();

	/* shift to debug/test normalization and TIME_EXTENTS */
	return ts << DEBUG_SHIFT;
}

/*
 * Return the current time stamp of @buffer's clock. The clock is read
 * with preemption disabled.
 */
u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

/*
 * Undo the DEBUG_SHIFT applied by rb_time_stamp() on *@ts.
 * @buffer and @cpu are not used by the body.
 */
void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things
 * tricky.
 * Although writes only happen on the CPU that they are on,
 * and they only need to worry about interrupts. Reads can
 * happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too.
Thus: 1162 * 1163 * head->list->prev->next bit 1 bit 0 1164 * ------- ------- 1165 * Normal page 0 0 1166 * Points to head page 0 1 1167 * New head page 1 0 1168 * 1169 * Note we can not trust the prev pointer of the head page, because: 1170 * 1171 * +----+ +-----+ +-----+ 1172 * | |------>| T |---X--->| N | 1173 * | |<------| | | | 1174 * +----+ +-----+ +-----+ 1175 * ^ ^ | 1176 * | +-----+ | | 1177 * +----------| R |----------+ | 1178 * | |<-----------+ 1179 * +-----+ 1180 * 1181 * Key: ---X--> HEAD flag set in pointer 1182 * T Tail page 1183 * R Reader page 1184 * N Next page 1185 * 1186 * (see __rb_reserve_next() to see where this happens) 1187 * 1188 * What the above shows is that the reader just swapped out 1189 * the reader page with a page in the buffer, but before it 1190 * could make the new header point back to the new page added 1191 * it was preempted by a writer. The writer moved forward onto 1192 * the new page added by the reader and is about to move forward 1193 * again. 1194 * 1195 * You can see, it is legitimate for the previous pointer of 1196 * the head (or any page) not to point back to itself. But only 1197 * temporarily. 1198 */ 1199 1200 #define RB_PAGE_NORMAL 0UL 1201 #define RB_PAGE_HEAD 1UL 1202 #define RB_PAGE_UPDATE 2UL 1203 1204 1205 #define RB_FLAG_MASK 3UL 1206 1207 /* PAGE_MOVED is not part of the mask */ 1208 #define RB_PAGE_MOVED 4UL 1209 1210 /* 1211 * rb_list_head - remove any bit 1212 */ 1213 static struct list_head *rb_list_head(struct list_head *list) 1214 { 1215 unsigned long val = (unsigned long)list; 1216 1217 return (struct list_head *)(val & ~RB_FLAG_MASK); 1218 } 1219 1220 /* 1221 * rb_is_head_page - test if the given page is the head page 1222 * 1223 * Because the reader may move the head_page pointer, we can 1224 * not trust what the head page is (it may be pointing to 1225 * the reader page). But if the next page is a header page, 1226 * its flags will be non zero. 
 */
static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	/* @list no longer points at @page: report that the page moved */
	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	/* Otherwise hand back the flag bits stored in the pointer */
	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 *
 * Sets RB_PAGE_HEAD and clears RB_PAGE_UPDATE in @list->next with
 * plain (non atomic) read-modify-write operations.
 */
static void rb_set_list_to_head(struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 *
 * Does nothing when cpu_buffer->head_page is NULL. When the buffer has
 * a ring_meta, the address of the head data page is recorded in it too.
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(head->list.prev);

	if (cpu_buffer->ring_meta) {
		struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
		meta->head_buffer = (unsigned long)head->page;
	}
}

/* Clear all the flag bits stored in @list->next */
static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

/*
 * rb_head_page_set - change the flag bits of the pointer to @head
 *
 * Does a cmpxchg on @prev->list.next, expecting it to point at @head
 * with @old_flag set, and replacing the flag bits with @new_flag.
 *
 * Returns RB_PAGE_MOVED when the value found in the pointer does not
 * refer to @head, otherwise the flag bits of the value that was found.
 * @cpu_buffer is not used by the body.
 */
static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

/* rb_head_page_set() with RB_PAGE_UPDATE as the new flag */
static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

/* rb_head_page_set() with RB_PAGE_HEAD as the new flag */
static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

/* rb_head_page_set() with RB_PAGE_NORMAL as the new flag */
static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

/* Move *@bpage to the next page, ignoring flag bits in the next pointer */
static inline void rb_inc_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

/* Move *@bpage to the previous page, ignoring flag bits in the prev pointer */
static inline void rb_dec_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.prev);

	*bpage = list_entry(p, struct buffer_page, list);
}

/*
 * rb_set_head_page - find the page currently flagged as the head
 *
 * Walks the list starting at the cached cpu_buffer->head_page looking
 * for the page whose previous page's next pointer carries a flag. On
 * success the cached head_page is updated and the page is returned.
 *
 * Returns NULL (after RB_WARN_ON) if there is no cached head page, if
 * the list sanity check fails, or if no head is found in three passes.
 */
static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(&page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

/*
 * rb_head_page_replace - make the page before @old point at @new
 *
 * The expected value is whatever old->list.prev->next currently holds,
 * with its flag bits forced to exactly RB_PAGE_HEAD. The exchange
 * therefore only succeeds while that pointer carries the HEAD flag
 * alone. The new value is the address of @new->list with no flag bits.
 *
 * Returns true if the pointer was replaced.
 */
static bool rb_head_page_replace(struct buffer_page *old,
				struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page *tail_page,
			       struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * it only can increment when a commit takes place. But that
		 * only happens in the outer most nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Either we update tail_page or an interrupt does */
		if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
			local_inc(&cpu_buffer->pages_touched);
	}
}

/* Warn if the address of @bpage itself has any of the flag bits set */
static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			  struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
}

/*
 * Check that both neighbours of @list point back at @list once the
 * flag bits are stripped. Returns false (after RB_WARN_ON) if not.
 */
static bool rb_check_links(struct ring_buffer_per_cpu *cpu_buffer,
			   struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->next)->prev) != list))
		return false;

	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->prev)->next) != list))
		return false;

	return true;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head, *tmp;
	unsigned long buffer_cnt;
	unsigned long flags;
	int nr_loops = 0;

	/*
	 * Walk the linked list underpinning the ring buffer and validate all
	 * its next and prev links.
	 *
	 * The check acquires the reader_lock to avoid concurrent processing
	 * with code that could be modifying the list. However, the lock cannot
	 * be held for the entire duration of the walk, as this would make the
	 * time when interrupts are disabled non-deterministic, dependent on the
	 * ring buffer size. Therefore, the code releases and re-acquires the
	 * lock after checking each page. The ring_buffer_per_cpu.cnt variable
	 * is then used to detect if the list was modified while the lock was
	 * not held, in which case the check needs to be restarted.
	 *
	 * The code attempts to perform the check at most three times before
	 * giving up. This is acceptable because this is only a self-validation
	 * to detect problems early on. In practice, the list modification
	 * operations are fairly spaced, and so this check typically succeeds at
	 * most on the second try.
	 */
again:
	if (++nr_loops > 3)
		return;

	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	head = rb_list_head(cpu_buffer->pages);
	if (!rb_check_links(cpu_buffer, head))
		goto out_locked;
	/* Snapshot the modification counter to detect list changes */
	buffer_cnt = cpu_buffer->cnt;
	tmp = head;
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	while (true) {
		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

		if (buffer_cnt != cpu_buffer->cnt) {
			/* The list was updated, try again. */
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
			goto again;
		}

		tmp = rb_list_head(tmp->next);
		if (tmp == head)
			/* The iteration circled back, all is done. */
			goto out_locked;

		if (!rb_check_links(cpu_buffer, tmp))
			goto out_locked;

		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}

out_locked:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}

/*
 * Take an address, add the meta data size as well as the array of
 * array subbuffer indexes, then align it to a subbuffer size.
 *
 * This is used to help find the next per cpu subbuffer within a mapped range.
 */
static unsigned long
rb_range_align_subbuf(unsigned long addr, int subbuf_size, int nr_subbufs)
{
	addr += sizeof(struct ring_buffer_cpu_meta) +
		sizeof(int) * nr_subbufs;
	return ALIGN(addr, subbuf_size);
}

/*
 * Return the ring_buffer_meta for a given @cpu.
1596 */ 1597 static void *rb_range_meta(struct trace_buffer *buffer, int nr_pages, int cpu) 1598 { 1599 int subbuf_size = buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 1600 struct ring_buffer_cpu_meta *meta; 1601 struct ring_buffer_meta *bmeta; 1602 unsigned long ptr; 1603 int nr_subbufs; 1604 1605 bmeta = buffer->meta; 1606 if (!bmeta) 1607 return NULL; 1608 1609 ptr = (unsigned long)bmeta + bmeta->buffers_offset; 1610 meta = (struct ring_buffer_cpu_meta *)ptr; 1611 1612 /* When nr_pages passed in is zero, the first meta has already been initialized */ 1613 if (!nr_pages) { 1614 nr_subbufs = meta->nr_subbufs; 1615 } else { 1616 /* Include the reader page */ 1617 nr_subbufs = nr_pages + 1; 1618 } 1619 1620 /* 1621 * The first chunk may not be subbuffer aligned, where as 1622 * the rest of the chunks are. 1623 */ 1624 if (cpu) { 1625 ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs); 1626 ptr += subbuf_size * nr_subbufs; 1627 1628 /* We can use multiplication to find chunks greater than 1 */ 1629 if (cpu > 1) { 1630 unsigned long size; 1631 unsigned long p; 1632 1633 /* Save the beginning of this CPU chunk */ 1634 p = ptr; 1635 ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs); 1636 ptr += subbuf_size * nr_subbufs; 1637 1638 /* Now all chunks after this are the same size */ 1639 size = ptr - p; 1640 ptr += size * (cpu - 2); 1641 } 1642 } 1643 return (void *)ptr; 1644 } 1645 1646 /* Return the start of subbufs given the meta pointer */ 1647 static void *rb_subbufs_from_meta(struct ring_buffer_cpu_meta *meta) 1648 { 1649 int subbuf_size = meta->subbuf_size; 1650 unsigned long ptr; 1651 1652 ptr = (unsigned long)meta; 1653 ptr = rb_range_align_subbuf(ptr, subbuf_size, meta->nr_subbufs); 1654 1655 return (void *)ptr; 1656 } 1657 1658 /* 1659 * Return a specific sub-buffer for a given @cpu defined by @idx. 
1660 */ 1661 static void *rb_range_buffer(struct ring_buffer_per_cpu *cpu_buffer, int idx) 1662 { 1663 struct ring_buffer_cpu_meta *meta; 1664 unsigned long ptr; 1665 int subbuf_size; 1666 1667 meta = rb_range_meta(cpu_buffer->buffer, 0, cpu_buffer->cpu); 1668 if (!meta) 1669 return NULL; 1670 1671 if (WARN_ON_ONCE(idx >= meta->nr_subbufs)) 1672 return NULL; 1673 1674 subbuf_size = meta->subbuf_size; 1675 1676 /* Map this buffer to the order that's in meta->buffers[] */ 1677 idx = meta->buffers[idx]; 1678 1679 ptr = (unsigned long)rb_subbufs_from_meta(meta); 1680 1681 ptr += subbuf_size * idx; 1682 if (ptr + subbuf_size > cpu_buffer->buffer->range_addr_end) 1683 return NULL; 1684 1685 return (void *)ptr; 1686 } 1687 1688 /* 1689 * See if the existing memory contains a valid meta section. 1690 * if so, use that, otherwise initialize it. 1691 */ 1692 static bool rb_meta_init(struct trace_buffer *buffer, int scratch_size) 1693 { 1694 unsigned long ptr = buffer->range_addr_start; 1695 struct ring_buffer_meta *bmeta; 1696 unsigned long total_size; 1697 int struct_sizes; 1698 1699 bmeta = (struct ring_buffer_meta *)ptr; 1700 buffer->meta = bmeta; 1701 1702 total_size = buffer->range_addr_end - buffer->range_addr_start; 1703 1704 struct_sizes = sizeof(struct ring_buffer_cpu_meta); 1705 struct_sizes |= sizeof(*bmeta) << 16; 1706 1707 /* The first buffer will start word size after the meta page */ 1708 ptr += sizeof(*bmeta); 1709 ptr = ALIGN(ptr, sizeof(long)); 1710 ptr += scratch_size; 1711 1712 if (bmeta->magic != RING_BUFFER_META_MAGIC) { 1713 pr_info("Ring buffer boot meta mismatch of magic\n"); 1714 goto init; 1715 } 1716 1717 if (bmeta->struct_sizes != struct_sizes) { 1718 pr_info("Ring buffer boot meta mismatch of struct size\n"); 1719 goto init; 1720 } 1721 1722 if (bmeta->total_size != total_size) { 1723 pr_info("Ring buffer boot meta mismatch of total size\n"); 1724 goto init; 1725 } 1726 1727 if (bmeta->buffers_offset > bmeta->total_size) { 1728 pr_info("Ring 
buffer boot meta mismatch of offset outside of total size\n"); 1729 goto init; 1730 } 1731 1732 if (bmeta->buffers_offset != (void *)ptr - (void *)bmeta) { 1733 pr_info("Ring buffer boot meta mismatch of first buffer offset\n"); 1734 goto init; 1735 } 1736 1737 return true; 1738 1739 init: 1740 bmeta->magic = RING_BUFFER_META_MAGIC; 1741 bmeta->struct_sizes = struct_sizes; 1742 bmeta->total_size = total_size; 1743 bmeta->buffers_offset = (void *)ptr - (void *)bmeta; 1744 1745 /* Zero out the scratch pad */ 1746 memset((void *)bmeta + sizeof(*bmeta), 0, bmeta->buffers_offset - sizeof(*bmeta)); 1747 1748 return false; 1749 } 1750 1751 /* 1752 * See if the existing memory contains valid ring buffer data. 1753 * As the previous kernel must be the same as this kernel, all 1754 * the calculations (size of buffers and number of buffers) 1755 * must be the same. 1756 */ 1757 static bool rb_cpu_meta_valid(struct ring_buffer_cpu_meta *meta, int cpu, 1758 struct trace_buffer *buffer, int nr_pages, 1759 unsigned long *subbuf_mask) 1760 { 1761 int subbuf_size = PAGE_SIZE; 1762 struct buffer_data_page *subbuf; 1763 unsigned long buffers_start; 1764 unsigned long buffers_end; 1765 int i; 1766 1767 if (!subbuf_mask) 1768 return false; 1769 1770 buffers_start = meta->first_buffer; 1771 buffers_end = meta->first_buffer + (subbuf_size * meta->nr_subbufs); 1772 1773 /* Is the head and commit buffers within the range of buffers? 
*/ 1774 if (meta->head_buffer < buffers_start || 1775 meta->head_buffer >= buffers_end) { 1776 pr_info("Ring buffer boot meta [%d] head buffer out of range\n", cpu); 1777 return false; 1778 } 1779 1780 if (meta->commit_buffer < buffers_start || 1781 meta->commit_buffer >= buffers_end) { 1782 pr_info("Ring buffer boot meta [%d] commit buffer out of range\n", cpu); 1783 return false; 1784 } 1785 1786 subbuf = rb_subbufs_from_meta(meta); 1787 1788 bitmap_clear(subbuf_mask, 0, meta->nr_subbufs); 1789 1790 /* Is the meta buffers and the subbufs themselves have correct data? */ 1791 for (i = 0; i < meta->nr_subbufs; i++) { 1792 if (meta->buffers[i] < 0 || 1793 meta->buffers[i] >= meta->nr_subbufs) { 1794 pr_info("Ring buffer boot meta [%d] array out of range\n", cpu); 1795 return false; 1796 } 1797 1798 if ((unsigned)local_read(&subbuf->commit) > subbuf_size) { 1799 pr_info("Ring buffer boot meta [%d] buffer invalid commit\n", cpu); 1800 return false; 1801 } 1802 1803 if (test_bit(meta->buffers[i], subbuf_mask)) { 1804 pr_info("Ring buffer boot meta [%d] array has duplicates\n", cpu); 1805 return false; 1806 } 1807 1808 set_bit(meta->buffers[i], subbuf_mask); 1809 subbuf = (void *)subbuf + subbuf_size; 1810 } 1811 1812 return true; 1813 } 1814 1815 static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf); 1816 1817 static int rb_read_data_buffer(struct buffer_data_page *dpage, int tail, int cpu, 1818 unsigned long long *timestamp, u64 *delta_ptr) 1819 { 1820 struct ring_buffer_event *event; 1821 u64 ts, delta; 1822 int events = 0; 1823 int len; 1824 int e; 1825 1826 *delta_ptr = 0; 1827 *timestamp = 0; 1828 1829 ts = dpage->time_stamp; 1830 1831 for (e = 0; e < tail; e += len) { 1832 1833 event = (struct ring_buffer_event *)(dpage->data + e); 1834 len = rb_event_length(event); 1835 if (len <= 0 || len > tail - e) 1836 return -1; 1837 1838 switch (event->type_len) { 1839 1840 case RINGBUF_TYPE_TIME_EXTEND: 1841 delta = rb_event_time_stamp(event); 
1842 ts += delta; 1843 break; 1844 1845 case RINGBUF_TYPE_TIME_STAMP: 1846 delta = rb_event_time_stamp(event); 1847 delta = rb_fix_abs_ts(delta, ts); 1848 if (delta < ts) { 1849 *delta_ptr = delta; 1850 *timestamp = ts; 1851 return -1; 1852 } 1853 ts = delta; 1854 break; 1855 1856 case RINGBUF_TYPE_PADDING: 1857 if (event->time_delta == 1) 1858 break; 1859 fallthrough; 1860 case RINGBUF_TYPE_DATA: 1861 events++; 1862 ts += event->time_delta; 1863 break; 1864 1865 default: 1866 return -1; 1867 } 1868 } 1869 *timestamp = ts; 1870 return events; 1871 } 1872 1873 static int rb_validate_buffer(struct buffer_data_page *dpage, int cpu) 1874 { 1875 unsigned long long ts; 1876 u64 delta; 1877 int tail; 1878 1879 tail = local_read(&dpage->commit); 1880 return rb_read_data_buffer(dpage, tail, cpu, &ts, &delta); 1881 } 1882 1883 /* If the meta data has been validated, now validate the events */ 1884 static void rb_meta_validate_events(struct ring_buffer_per_cpu *cpu_buffer) 1885 { 1886 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 1887 struct buffer_page *head_page, *orig_head; 1888 unsigned long entry_bytes = 0; 1889 unsigned long entries = 0; 1890 int ret; 1891 u64 ts; 1892 int i; 1893 1894 if (!meta || !meta->head_buffer) 1895 return; 1896 1897 orig_head = head_page = cpu_buffer->head_page; 1898 1899 /* Do the reader page first */ 1900 ret = rb_validate_buffer(cpu_buffer->reader_page->page, cpu_buffer->cpu); 1901 if (ret < 0) { 1902 pr_info("Ring buffer reader page is invalid\n"); 1903 goto invalid; 1904 } 1905 entries += ret; 1906 entry_bytes += local_read(&cpu_buffer->reader_page->page->commit); 1907 local_set(&cpu_buffer->reader_page->entries, ret); 1908 1909 ts = head_page->page->time_stamp; 1910 1911 /* 1912 * Try to rewind the head so that we can read the pages which already 1913 * read in the previous boot. 
1914 */ 1915 if (head_page == cpu_buffer->tail_page) 1916 goto skip_rewind; 1917 1918 rb_dec_page(&head_page); 1919 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_dec_page(&head_page)) { 1920 1921 /* Rewind until tail (writer) page. */ 1922 if (head_page == cpu_buffer->tail_page) 1923 break; 1924 1925 /* Ensure the page has older data than head. */ 1926 if (ts < head_page->page->time_stamp) 1927 break; 1928 1929 ts = head_page->page->time_stamp; 1930 /* Ensure the page has correct timestamp and some data. */ 1931 if (!ts || rb_page_commit(head_page) == 0) 1932 break; 1933 1934 /* Stop rewind if the page is invalid. */ 1935 ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu); 1936 if (ret < 0) 1937 break; 1938 1939 /* Recover the number of entries and update stats. */ 1940 local_set(&head_page->entries, ret); 1941 if (ret) 1942 local_inc(&cpu_buffer->pages_touched); 1943 entries += ret; 1944 entry_bytes += rb_page_commit(head_page); 1945 } 1946 if (i) 1947 pr_info("Ring buffer [%d] rewound %d pages\n", cpu_buffer->cpu, i); 1948 1949 /* The last rewound page must be skipped. */ 1950 if (head_page != orig_head) 1951 rb_inc_page(&head_page); 1952 1953 /* 1954 * If the ring buffer was rewound, then inject the reader page 1955 * into the location just before the original head page. 1956 */ 1957 if (head_page != orig_head) { 1958 struct buffer_page *bpage = orig_head; 1959 1960 rb_dec_page(&bpage); 1961 /* 1962 * Insert the reader_page before the original head page. 1963 * Since the list encode RB_PAGE flags, general list 1964 * operations should be avoided. 
1965 */ 1966 cpu_buffer->reader_page->list.next = &orig_head->list; 1967 cpu_buffer->reader_page->list.prev = orig_head->list.prev; 1968 orig_head->list.prev = &cpu_buffer->reader_page->list; 1969 bpage->list.next = &cpu_buffer->reader_page->list; 1970 1971 /* Make the head_page the reader page */ 1972 cpu_buffer->reader_page = head_page; 1973 bpage = head_page; 1974 rb_inc_page(&head_page); 1975 head_page->list.prev = bpage->list.prev; 1976 rb_dec_page(&bpage); 1977 bpage->list.next = &head_page->list; 1978 rb_set_list_to_head(&bpage->list); 1979 cpu_buffer->pages = &head_page->list; 1980 1981 cpu_buffer->head_page = head_page; 1982 meta->head_buffer = (unsigned long)head_page->page; 1983 1984 /* Reset all the indexes */ 1985 bpage = cpu_buffer->reader_page; 1986 meta->buffers[0] = rb_meta_subbuf_idx(meta, bpage->page); 1987 bpage->id = 0; 1988 1989 for (i = 1, bpage = head_page; i < meta->nr_subbufs; 1990 i++, rb_inc_page(&bpage)) { 1991 meta->buffers[i] = rb_meta_subbuf_idx(meta, bpage->page); 1992 bpage->id = i; 1993 } 1994 1995 /* We'll restart verifying from orig_head */ 1996 head_page = orig_head; 1997 } 1998 1999 skip_rewind: 2000 /* If the commit_buffer is the reader page, update the commit page */ 2001 if (meta->commit_buffer == (unsigned long)cpu_buffer->reader_page->page) { 2002 cpu_buffer->commit_page = cpu_buffer->reader_page; 2003 /* Nothing more to do, the only page is the reader page */ 2004 goto done; 2005 } 2006 2007 /* Iterate until finding the commit page */ 2008 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_inc_page(&head_page)) { 2009 2010 /* Reader page has already been done */ 2011 if (head_page == cpu_buffer->reader_page) 2012 continue; 2013 2014 ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu); 2015 if (ret < 0) { 2016 pr_info("Ring buffer meta [%d] invalid buffer page\n", 2017 cpu_buffer->cpu); 2018 goto invalid; 2019 } 2020 2021 /* If the buffer has content, update pages_touched */ 2022 if (ret) 2023 
local_inc(&cpu_buffer->pages_touched); 2024 2025 entries += ret; 2026 entry_bytes += local_read(&head_page->page->commit); 2027 local_set(&head_page->entries, ret); 2028 2029 if (head_page == cpu_buffer->commit_page) 2030 break; 2031 } 2032 2033 if (head_page != cpu_buffer->commit_page) { 2034 pr_info("Ring buffer meta [%d] commit page not found\n", 2035 cpu_buffer->cpu); 2036 goto invalid; 2037 } 2038 done: 2039 local_set(&cpu_buffer->entries, entries); 2040 local_set(&cpu_buffer->entries_bytes, entry_bytes); 2041 2042 pr_info("Ring buffer meta [%d] is from previous boot!\n", cpu_buffer->cpu); 2043 return; 2044 2045 invalid: 2046 /* The content of the buffers are invalid, reset the meta data */ 2047 meta->head_buffer = 0; 2048 meta->commit_buffer = 0; 2049 2050 /* Reset the reader page */ 2051 local_set(&cpu_buffer->reader_page->entries, 0); 2052 local_set(&cpu_buffer->reader_page->page->commit, 0); 2053 2054 /* Reset all the subbuffers */ 2055 for (i = 0; i < meta->nr_subbufs - 1; i++, rb_inc_page(&head_page)) { 2056 local_set(&head_page->entries, 0); 2057 local_set(&head_page->page->commit, 0); 2058 } 2059 } 2060 2061 static void rb_range_meta_init(struct trace_buffer *buffer, int nr_pages, int scratch_size) 2062 { 2063 struct ring_buffer_cpu_meta *meta; 2064 unsigned long *subbuf_mask; 2065 unsigned long delta; 2066 void *subbuf; 2067 bool valid = false; 2068 int cpu; 2069 int i; 2070 2071 /* Create a mask to test the subbuf array */ 2072 subbuf_mask = bitmap_alloc(nr_pages + 1, GFP_KERNEL); 2073 /* If subbuf_mask fails to allocate, then rb_meta_valid() will return false */ 2074 2075 if (rb_meta_init(buffer, scratch_size)) 2076 valid = true; 2077 2078 for (cpu = 0; cpu < nr_cpu_ids; cpu++) { 2079 void *next_meta; 2080 2081 meta = rb_range_meta(buffer, nr_pages, cpu); 2082 2083 if (valid && rb_cpu_meta_valid(meta, cpu, buffer, nr_pages, subbuf_mask)) { 2084 /* Make the mappings match the current address */ 2085 subbuf = rb_subbufs_from_meta(meta); 2086 delta = 
(unsigned long)subbuf - meta->first_buffer; 2087 meta->first_buffer += delta; 2088 meta->head_buffer += delta; 2089 meta->commit_buffer += delta; 2090 continue; 2091 } 2092 2093 if (cpu < nr_cpu_ids - 1) 2094 next_meta = rb_range_meta(buffer, nr_pages, cpu + 1); 2095 else 2096 next_meta = (void *)buffer->range_addr_end; 2097 2098 memset(meta, 0, next_meta - (void *)meta); 2099 2100 meta->nr_subbufs = nr_pages + 1; 2101 meta->subbuf_size = PAGE_SIZE; 2102 2103 subbuf = rb_subbufs_from_meta(meta); 2104 2105 meta->first_buffer = (unsigned long)subbuf; 2106 2107 /* 2108 * The buffers[] array holds the order of the sub-buffers 2109 * that are after the meta data. The sub-buffers may 2110 * be swapped out when read and inserted into a different 2111 * location of the ring buffer. Although their addresses 2112 * remain the same, the buffers[] array contains the 2113 * index into the sub-buffers holding their actual order. 2114 */ 2115 for (i = 0; i < meta->nr_subbufs; i++) { 2116 meta->buffers[i] = i; 2117 rb_init_page(subbuf); 2118 subbuf += meta->subbuf_size; 2119 } 2120 } 2121 bitmap_free(subbuf_mask); 2122 } 2123 2124 static void *rbm_start(struct seq_file *m, loff_t *pos) 2125 { 2126 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2127 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2128 unsigned long val; 2129 2130 if (!meta) 2131 return NULL; 2132 2133 if (*pos > meta->nr_subbufs) 2134 return NULL; 2135 2136 val = *pos; 2137 val++; 2138 2139 return (void *)val; 2140 } 2141 2142 static void *rbm_next(struct seq_file *m, void *v, loff_t *pos) 2143 { 2144 (*pos)++; 2145 2146 return rbm_start(m, pos); 2147 } 2148 2149 static int rbm_show(struct seq_file *m, void *v) 2150 { 2151 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2152 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2153 unsigned long val = (unsigned long)v; 2154 2155 if (val == 1) { 2156 seq_printf(m, "head_buffer: %d\n", 2157 rb_meta_subbuf_idx(meta, (void 
*)meta->head_buffer));
		seq_printf(m, "commit_buffer: %d\n",
			   rb_meta_subbuf_idx(meta, (void *)meta->commit_buffer));
		seq_printf(m, "subbuf_size: %d\n", meta->subbuf_size);
		seq_printf(m, "nr_subbufs: %d\n", meta->nr_subbufs);
		return 0;
	}

	/* val is position + 1 and position 0 was the header, so index = val - 2 */
	val -= 2;
	seq_printf(m, "buffer[%ld]: %d\n", val, meta->buffers[val]);

	return 0;
}

/* seq_file ->stop(): nothing was acquired in ->start(), so nothing to release */
static void rbm_stop(struct seq_file *m, void *p)
{
}

/* seq_file operations used to dump a per CPU buffer's meta data */
static const struct seq_operations rb_meta_seq_ops = {
	.start		= rbm_start,
	.next		= rbm_next,
	.show		= rbm_show,
	.stop		= rbm_stop,
};

/*
 * Open @file as a seq_file using rb_meta_seq_ops and attach the per CPU
 * buffer of @cpu as its private data.
 *
 * Returns 0 on success, or the error returned by seq_open().
 */
int ring_buffer_meta_seq_init(struct file *file, struct trace_buffer *buffer, int cpu)
{
	struct seq_file *m;
	int ret;

	ret = seq_open(file, &rb_meta_seq_ops);
	if (ret)
		return ret;

	m = file->private_data;
	m->private = buffer->buffers[cpu];

	return 0;
}

/* Map the buffer_pages to the previous head and commit pages */
static void rb_meta_buffer_update(struct ring_buffer_per_cpu *cpu_buffer,
				  struct buffer_page *bpage)
{
	struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;

	if (meta->head_buffer == (unsigned long)bpage->page)
		cpu_buffer->head_page = bpage;

	/* The tail page is set to the same page as the commit page */
	if (meta->commit_buffer == (unsigned long)bpage->page) {
		cpu_buffer->commit_page = bpage;
		cpu_buffer->tail_page = bpage;
	}
}

/*
 * Find the ring_buffer_desc describing @cpu inside @trace_desc.
 *
 * Returns NULL when @trace_desc is NULL, when @cpu is not below
 * trace_desc->nr_cpus, or when no descriptor carries that CPU number.
 */
static struct ring_buffer_desc *ring_buffer_desc(struct trace_buffer_desc *trace_desc, int cpu)
{
	struct ring_buffer_desc *desc, *end;
	size_t len;
	int i;

	if (!trace_desc)
		return NULL;

	if (cpu >= trace_desc->nr_cpus)
		return NULL;

	end = (struct ring_buffer_desc *)((void *)trace_desc + trace_desc->struct_len);
	desc = __first_ring_buffer_desc(trace_desc);
	/* Fast path guess: every descriptor has the size of the first one */
	len = struct_size(desc, page_va, desc->nr_page_va);
	desc = (struct ring_buffer_desc *)((void *)desc +
(len * cpu)); 2228 2229 if (desc < end && desc->cpu == cpu) 2230 return desc; 2231 2232 /* Missing CPUs, need to linear search */ 2233 for_each_ring_buffer_desc(desc, i, trace_desc) { 2234 if (desc->cpu == cpu) 2235 return desc; 2236 } 2237 2238 return NULL; 2239 } 2240 2241 static void *ring_buffer_desc_page(struct ring_buffer_desc *desc, unsigned int page_id) 2242 { 2243 return page_id >= desc->nr_page_va ? NULL : (void *)desc->page_va[page_id]; 2244 } 2245 2246 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2247 long nr_pages, struct list_head *pages) 2248 { 2249 struct trace_buffer *buffer = cpu_buffer->buffer; 2250 struct ring_buffer_cpu_meta *meta = NULL; 2251 struct buffer_page *bpage, *tmp; 2252 bool user_thread = current->mm != NULL; 2253 struct ring_buffer_desc *desc = NULL; 2254 long i; 2255 2256 /* 2257 * Check if the available memory is there first. 2258 * Note, si_mem_available() only gives us a rough estimate of available 2259 * memory. It may not be accurate. But we don't care, we just want 2260 * to prevent doing any allocation when it is obvious that it is 2261 * not going to succeed. 2262 */ 2263 i = si_mem_available(); 2264 if (i < nr_pages) 2265 return -ENOMEM; 2266 2267 /* 2268 * If a user thread allocates too much, and si_mem_available() 2269 * reports there's enough memory, even though there is not. 2270 * Make sure the OOM killer kills this thread. This can happen 2271 * even with RETRY_MAYFAIL because another task may be doing 2272 * an allocation after this task has taken all memory. 2273 * This is the task the OOM killer needs to take out during this 2274 * loop, even if it was triggered by an allocation somewhere else. 
2275 */ 2276 if (user_thread) 2277 set_current_oom_origin(); 2278 2279 if (buffer->range_addr_start) 2280 meta = rb_range_meta(buffer, nr_pages, cpu_buffer->cpu); 2281 2282 if (buffer->remote) { 2283 desc = ring_buffer_desc(buffer->remote->desc, cpu_buffer->cpu); 2284 if (!desc || WARN_ON(desc->nr_page_va != (nr_pages + 1))) 2285 return -EINVAL; 2286 } 2287 2288 for (i = 0; i < nr_pages; i++) { 2289 2290 bpage = alloc_cpu_page(cpu_buffer->cpu); 2291 if (!bpage) 2292 goto free_pages; 2293 2294 rb_check_bpage(cpu_buffer, bpage); 2295 2296 /* 2297 * Append the pages as for mapped buffers we want to keep 2298 * the order 2299 */ 2300 list_add_tail(&bpage->list, pages); 2301 2302 if (meta) { 2303 /* A range was given. Use that for the buffer page */ 2304 bpage->page = rb_range_buffer(cpu_buffer, i + 1); 2305 if (!bpage->page) 2306 goto free_pages; 2307 /* If this is valid from a previous boot */ 2308 if (meta->head_buffer) 2309 rb_meta_buffer_update(cpu_buffer, bpage); 2310 bpage->range = 1; 2311 bpage->id = i + 1; 2312 } else if (desc) { 2313 void *p = ring_buffer_desc_page(desc, i + 1); 2314 2315 if (WARN_ON(!p)) 2316 goto free_pages; 2317 2318 bpage->page = p; 2319 bpage->range = 1; /* bpage->page can't be freed */ 2320 bpage->id = i + 1; 2321 cpu_buffer->subbuf_ids[i + 1] = bpage; 2322 } else { 2323 int order = cpu_buffer->buffer->subbuf_order; 2324 bpage->page = alloc_cpu_data(cpu_buffer->cpu, order); 2325 if (!bpage->page) 2326 goto free_pages; 2327 } 2328 bpage->order = cpu_buffer->buffer->subbuf_order; 2329 2330 if (user_thread && fatal_signal_pending(current)) 2331 goto free_pages; 2332 } 2333 if (user_thread) 2334 clear_current_oom_origin(); 2335 2336 return 0; 2337 2338 free_pages: 2339 list_for_each_entry_safe(bpage, tmp, pages, list) { 2340 list_del_init(&bpage->list); 2341 free_buffer_page(bpage); 2342 } 2343 if (user_thread) 2344 clear_current_oom_origin(); 2345 2346 return -ENOMEM; 2347 } 2348 2349 static int rb_allocate_pages(struct ring_buffer_per_cpu 
*cpu_buffer, 2350 unsigned long nr_pages) 2351 { 2352 LIST_HEAD(pages); 2353 2354 WARN_ON(!nr_pages); 2355 2356 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) 2357 return -ENOMEM; 2358 2359 /* 2360 * The ring buffer page list is a circular list that does not 2361 * start and end with a list head. All page list items point to 2362 * other pages. 2363 */ 2364 cpu_buffer->pages = pages.next; 2365 list_del(&pages); 2366 2367 cpu_buffer->nr_pages = nr_pages; 2368 2369 rb_check_pages(cpu_buffer); 2370 2371 return 0; 2372 } 2373 2374 static struct ring_buffer_per_cpu * 2375 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 2376 { 2377 struct ring_buffer_per_cpu *cpu_buffer __free(kfree) = 2378 alloc_cpu_buffer(cpu); 2379 struct ring_buffer_cpu_meta *meta; 2380 struct buffer_page *bpage; 2381 int ret; 2382 2383 if (!cpu_buffer) 2384 return NULL; 2385 2386 cpu_buffer->cpu = cpu; 2387 cpu_buffer->buffer = buffer; 2388 raw_spin_lock_init(&cpu_buffer->reader_lock); 2389 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 2390 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 2391 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 2392 init_completion(&cpu_buffer->update_done); 2393 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 2394 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 2395 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 2396 mutex_init(&cpu_buffer->mapping_lock); 2397 2398 bpage = alloc_cpu_page(cpu); 2399 if (!bpage) 2400 return NULL; 2401 2402 rb_check_bpage(cpu_buffer, bpage); 2403 2404 cpu_buffer->reader_page = bpage; 2405 2406 if (buffer->range_addr_start) { 2407 /* 2408 * Range mapped buffers have the same restrictions as memory 2409 * mapped ones do. 
2410 */ 2411 cpu_buffer->mapped = 1; 2412 cpu_buffer->ring_meta = rb_range_meta(buffer, nr_pages, cpu); 2413 bpage->page = rb_range_buffer(cpu_buffer, 0); 2414 if (!bpage->page) 2415 goto fail_free_reader; 2416 if (cpu_buffer->ring_meta->head_buffer) 2417 rb_meta_buffer_update(cpu_buffer, bpage); 2418 bpage->range = 1; 2419 } else if (buffer->remote) { 2420 struct ring_buffer_desc *desc = ring_buffer_desc(buffer->remote->desc, cpu); 2421 2422 if (!desc) 2423 goto fail_free_reader; 2424 2425 cpu_buffer->remote = buffer->remote; 2426 cpu_buffer->meta_page = (struct trace_buffer_meta *)(void *)desc->meta_va; 2427 cpu_buffer->nr_pages = nr_pages; 2428 cpu_buffer->subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, 2429 sizeof(*cpu_buffer->subbuf_ids), GFP_KERNEL); 2430 if (!cpu_buffer->subbuf_ids) 2431 goto fail_free_reader; 2432 2433 /* Remote buffers are read-only and immutable */ 2434 atomic_inc(&cpu_buffer->record_disabled); 2435 atomic_inc(&cpu_buffer->resize_disabled); 2436 2437 bpage->page = ring_buffer_desc_page(desc, cpu_buffer->meta_page->reader.id); 2438 if (!bpage->page) 2439 goto fail_free_reader; 2440 2441 bpage->range = 1; 2442 cpu_buffer->subbuf_ids[0] = bpage; 2443 } else { 2444 int order = cpu_buffer->buffer->subbuf_order; 2445 bpage->page = alloc_cpu_data(cpu, order); 2446 if (!bpage->page) 2447 goto fail_free_reader; 2448 } 2449 2450 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 2451 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2452 2453 ret = rb_allocate_pages(cpu_buffer, nr_pages); 2454 if (ret < 0) 2455 goto fail_free_reader; 2456 2457 rb_meta_validate_events(cpu_buffer); 2458 2459 /* If the boot meta was valid then this has already been updated */ 2460 meta = cpu_buffer->ring_meta; 2461 if (!meta || !meta->head_buffer || 2462 !cpu_buffer->head_page || !cpu_buffer->commit_page || !cpu_buffer->tail_page) { 2463 if (meta && meta->head_buffer && 2464 (cpu_buffer->head_page || cpu_buffer->commit_page || cpu_buffer->tail_page)) { 2465 pr_warn("Ring buffer 
meta buffers not all mapped\n"); 2466 if (!cpu_buffer->head_page) 2467 pr_warn(" Missing head_page\n"); 2468 if (!cpu_buffer->commit_page) 2469 pr_warn(" Missing commit_page\n"); 2470 if (!cpu_buffer->tail_page) 2471 pr_warn(" Missing tail_page\n"); 2472 } 2473 2474 cpu_buffer->head_page 2475 = list_entry(cpu_buffer->pages, struct buffer_page, list); 2476 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 2477 2478 rb_head_page_activate(cpu_buffer); 2479 2480 if (cpu_buffer->ring_meta) 2481 meta->commit_buffer = meta->head_buffer; 2482 } else { 2483 /* The valid meta buffer still needs to activate the head page */ 2484 rb_head_page_activate(cpu_buffer); 2485 } 2486 2487 return_ptr(cpu_buffer); 2488 2489 fail_free_reader: 2490 free_buffer_page(cpu_buffer->reader_page); 2491 2492 return NULL; 2493 } 2494 2495 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 2496 { 2497 struct list_head *head = cpu_buffer->pages; 2498 struct buffer_page *bpage, *tmp; 2499 2500 irq_work_sync(&cpu_buffer->irq_work.work); 2501 2502 if (cpu_buffer->remote) 2503 kfree(cpu_buffer->subbuf_ids); 2504 2505 free_buffer_page(cpu_buffer->reader_page); 2506 2507 if (head) { 2508 rb_head_page_deactivate(cpu_buffer); 2509 2510 list_for_each_entry_safe(bpage, tmp, head, list) { 2511 list_del_init(&bpage->list); 2512 free_buffer_page(bpage); 2513 } 2514 bpage = list_entry(head, struct buffer_page, list); 2515 free_buffer_page(bpage); 2516 } 2517 2518 free_page((unsigned long)cpu_buffer->free_page); 2519 2520 kfree(cpu_buffer); 2521 } 2522 2523 static struct trace_buffer *alloc_buffer(unsigned long size, unsigned flags, 2524 int order, unsigned long start, 2525 unsigned long end, 2526 unsigned long scratch_size, 2527 struct lock_class_key *key, 2528 struct ring_buffer_remote *remote) 2529 { 2530 struct trace_buffer *buffer __free(kfree) = NULL; 2531 long nr_pages; 2532 int subbuf_size; 2533 int bsize; 2534 int cpu; 2535 int ret; 2536 2537 /* keep it in its 
own cache line */ 2538 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 2539 GFP_KERNEL); 2540 if (!buffer) 2541 return NULL; 2542 2543 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 2544 return NULL; 2545 2546 buffer->subbuf_order = order; 2547 subbuf_size = (PAGE_SIZE << order); 2548 buffer->subbuf_size = subbuf_size - BUF_PAGE_HDR_SIZE; 2549 2550 /* Max payload is buffer page size - header (8bytes) */ 2551 buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2); 2552 2553 buffer->flags = flags; 2554 buffer->clock = trace_clock_local; 2555 buffer->reader_lock_key = key; 2556 2557 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 2558 init_waitqueue_head(&buffer->irq_work.waiters); 2559 2560 buffer->cpus = nr_cpu_ids; 2561 2562 bsize = sizeof(void *) * nr_cpu_ids; 2563 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 2564 GFP_KERNEL); 2565 if (!buffer->buffers) 2566 goto fail_free_cpumask; 2567 2568 cpu = raw_smp_processor_id(); 2569 2570 /* If start/end are specified, then that overrides size */ 2571 if (start && end) { 2572 unsigned long buffers_start; 2573 unsigned long ptr; 2574 int n; 2575 2576 /* Make sure that start is word aligned */ 2577 start = ALIGN(start, sizeof(long)); 2578 2579 /* scratch_size needs to be aligned too */ 2580 scratch_size = ALIGN(scratch_size, sizeof(long)); 2581 2582 /* Subtract the buffer meta data and word aligned */ 2583 buffers_start = start + sizeof(struct ring_buffer_cpu_meta); 2584 buffers_start = ALIGN(buffers_start, sizeof(long)); 2585 buffers_start += scratch_size; 2586 2587 /* Calculate the size for the per CPU data */ 2588 size = end - buffers_start; 2589 size = size / nr_cpu_ids; 2590 2591 /* 2592 * The number of sub-buffers (nr_pages) is determined by the 2593 * total size allocated minus the meta data size. 2594 * Then that is divided by the number of per CPU buffers 2595 * needed, plus account for the integer array index that 2596 * will be appended to the meta data. 
2597 */ 2598 nr_pages = (size - sizeof(struct ring_buffer_cpu_meta)) / 2599 (subbuf_size + sizeof(int)); 2600 /* Need at least two pages plus the reader page */ 2601 if (nr_pages < 3) 2602 goto fail_free_buffers; 2603 2604 again: 2605 /* Make sure that the size fits aligned */ 2606 for (n = 0, ptr = buffers_start; n < nr_cpu_ids; n++) { 2607 ptr += sizeof(struct ring_buffer_cpu_meta) + 2608 sizeof(int) * nr_pages; 2609 ptr = ALIGN(ptr, subbuf_size); 2610 ptr += subbuf_size * nr_pages; 2611 } 2612 if (ptr > end) { 2613 if (nr_pages <= 3) 2614 goto fail_free_buffers; 2615 nr_pages--; 2616 goto again; 2617 } 2618 2619 /* nr_pages should not count the reader page */ 2620 nr_pages--; 2621 buffer->range_addr_start = start; 2622 buffer->range_addr_end = end; 2623 2624 rb_range_meta_init(buffer, nr_pages, scratch_size); 2625 } else if (remote) { 2626 struct ring_buffer_desc *desc = ring_buffer_desc(remote->desc, cpu); 2627 2628 buffer->remote = remote; 2629 /* The writer is remote. This ring-buffer is read-only */ 2630 atomic_inc(&buffer->record_disabled); 2631 nr_pages = desc->nr_page_va - 1; 2632 if (nr_pages < 2) 2633 goto fail_free_buffers; 2634 } else { 2635 2636 /* need at least two pages */ 2637 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2638 if (nr_pages < 2) 2639 nr_pages = 2; 2640 } 2641 2642 cpumask_set_cpu(cpu, buffer->cpumask); 2643 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 2644 if (!buffer->buffers[cpu]) 2645 goto fail_free_buffers; 2646 2647 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2648 if (ret < 0) 2649 goto fail_free_buffers; 2650 2651 mutex_init(&buffer->mutex); 2652 2653 return_ptr(buffer); 2654 2655 fail_free_buffers: 2656 for_each_buffer_cpu(buffer, cpu) { 2657 if (buffer->buffers[cpu]) 2658 rb_free_cpu_buffer(buffer->buffers[cpu]); 2659 } 2660 kfree(buffer->buffers); 2661 2662 fail_free_cpumask: 2663 free_cpumask_var(buffer->cpumask); 2664 2665 return NULL; 2666 } 2667 2668 /** 2669 * 
__ring_buffer_alloc - allocate a new ring_buffer 2670 * @size: the size in bytes per cpu that is needed. 2671 * @flags: attributes to set for the ring buffer. 2672 * @key: ring buffer reader_lock_key. 2673 * 2674 * Currently the only flag that is available is the RB_FL_OVERWRITE 2675 * flag. This flag means that the buffer will overwrite old data 2676 * when the buffer wraps. If this flag is not set, the buffer will 2677 * drop data when the tail hits the head. 2678 */ 2679 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 2680 struct lock_class_key *key) 2681 { 2682 /* Default buffer page size - one system page */ 2683 return alloc_buffer(size, flags, 0, 0, 0, 0, key, NULL); 2684 2685 } 2686 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 2687 2688 /** 2689 * __ring_buffer_alloc_range - allocate a new ring_buffer from existing memory 2690 * @size: the size in bytes per cpu that is needed. 2691 * @flags: attributes to set for the ring buffer. 2692 * @order: sub-buffer order 2693 * @start: start of allocated range 2694 * @range_size: size of allocated range 2695 * @scratch_size: size of scratch area (for preallocated memory buffers) 2696 * @key: ring buffer reader_lock_key. 2697 * 2698 * Currently the only flag that is available is the RB_FL_OVERWRITE 2699 * flag. This flag means that the buffer will overwrite old data 2700 * when the buffer wraps. If this flag is not set, the buffer will 2701 * drop data when the tail hits the head. 2702 */ 2703 struct trace_buffer *__ring_buffer_alloc_range(unsigned long size, unsigned flags, 2704 int order, unsigned long start, 2705 unsigned long range_size, 2706 unsigned long scratch_size, 2707 struct lock_class_key *key) 2708 { 2709 return alloc_buffer(size, flags, order, start, start + range_size, 2710 scratch_size, key, NULL); 2711 } 2712 2713 /** 2714 * __ring_buffer_alloc_remote - allocate a new ring_buffer from a remote 2715 * @remote: Contains a description of the ring-buffer pages and remote callbacks. 
2716 * @key: ring buffer reader_lock_key. 2717 */ 2718 struct trace_buffer *__ring_buffer_alloc_remote(struct ring_buffer_remote *remote, 2719 struct lock_class_key *key) 2720 { 2721 return alloc_buffer(0, 0, 0, 0, 0, 0, key, remote); 2722 } 2723 2724 void *ring_buffer_meta_scratch(struct trace_buffer *buffer, unsigned int *size) 2725 { 2726 struct ring_buffer_meta *meta; 2727 void *ptr; 2728 2729 if (!buffer || !buffer->meta) 2730 return NULL; 2731 2732 meta = buffer->meta; 2733 2734 ptr = (void *)ALIGN((unsigned long)meta + sizeof(*meta), sizeof(long)); 2735 2736 if (size) 2737 *size = (void *)meta + meta->buffers_offset - ptr; 2738 2739 return ptr; 2740 } 2741 2742 /** 2743 * ring_buffer_free - free a ring buffer. 2744 * @buffer: the buffer to free. 2745 */ 2746 void 2747 ring_buffer_free(struct trace_buffer *buffer) 2748 { 2749 int cpu; 2750 2751 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2752 2753 irq_work_sync(&buffer->irq_work.work); 2754 2755 for_each_buffer_cpu(buffer, cpu) 2756 rb_free_cpu_buffer(buffer->buffers[cpu]); 2757 2758 kfree(buffer->buffers); 2759 free_cpumask_var(buffer->cpumask); 2760 2761 kfree(buffer); 2762 } 2763 EXPORT_SYMBOL_GPL(ring_buffer_free); 2764 2765 void ring_buffer_set_clock(struct trace_buffer *buffer, 2766 u64 (*clock)(void)) 2767 { 2768 buffer->clock = clock; 2769 } 2770 2771 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 2772 { 2773 buffer->time_stamp_abs = abs; 2774 } 2775 2776 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 2777 { 2778 return buffer->time_stamp_abs; 2779 } 2780 2781 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 2782 { 2783 return local_read(&bpage->entries) & RB_WRITE_MASK; 2784 } 2785 2786 static inline unsigned long rb_page_write(struct buffer_page *bpage) 2787 { 2788 return local_read(&bpage->write) & RB_WRITE_MASK; 2789 } 2790 2791 static bool 2792 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned 
long nr_pages)
{
	struct list_head *tail_page, *to_remove, *next_page;
	struct buffer_page *to_remove_page, *tmp_iter_page;
	struct buffer_page *last_page, *first_page;
	unsigned long nr_removed;
	unsigned long head_bit;
	int page_entries;

	head_bit = 0;

	raw_spin_lock_irq(&cpu_buffer->reader_lock);
	atomic_inc(&cpu_buffer->record_disabled);
	/*
	 * We don't race with the readers since we have acquired the reader
	 * lock. We also don't race with writers after disabling recording.
	 * This makes it easy to figure out the first and the last page to be
	 * removed from the list. We unlink all the pages in between including
	 * the first and last pages. This is done in a busy loop so that we
	 * lose the least number of traces.
	 * The pages are freed after we restart recording and unlock readers.
	 */
	tail_page = &cpu_buffer->tail_page->list;

	/*
	 * tail page might be on reader page, we remove the next page
	 * from the ring buffer
	 */
	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
		tail_page = rb_list_head(tail_page->next);
	to_remove = tail_page;

	/* start of pages to remove */
	first_page = list_entry(rb_list_head(to_remove->next),
				struct buffer_page, list);

	/* Walk nr_pages forward, remembering if any link carried the head flag */
	for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
		to_remove = rb_list_head(to_remove)->next;
		head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
	}
	/* Read iterators need to reset themselves when some pages removed */
	cpu_buffer->pages_removed += nr_removed;

	next_page = rb_list_head(to_remove)->next;

	/*
	 * Now we remove all pages between tail_page and next_page.
	 * Make sure that we have head_bit value preserved for the
	 * next page
	 */
	tail_page->next = (struct list_head *)((unsigned long)next_page |
						head_bit);
	next_page = rb_list_head(next_page);
	next_page->prev = tail_page;

	/* make sure pages points to a valid page in the ring buffer */
	cpu_buffer->pages = next_page;
	cpu_buffer->cnt++;

	/* update head page */
	if (head_bit)
		cpu_buffer->head_page = list_entry(next_page,
						struct buffer_page, list);

	/* pages are removed, resume tracing and then free the pages */
	atomic_dec(&cpu_buffer->record_disabled);
	raw_spin_unlock_irq(&cpu_buffer->reader_lock);

	RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));

	/* last buffer page to remove */
	last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
				list);
	tmp_iter_page = first_page;

	do {
		cond_resched();

		/* Step to the next page before the current one is freed */
		to_remove_page = tmp_iter_page;
		rb_inc_page(&tmp_iter_page);

		/* update the counters */
		page_entries = rb_page_entries(to_remove_page);
		if (page_entries) {
			/*
			 * If something was added to this page, it was full
			 * since it is not the tail page. So we deduct the
			 * bytes consumed in ring buffer from here.
			 * Increment overrun to account for the lost events.
			 */
			local_add(page_entries, &cpu_buffer->overrun);
			local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes);
			local_inc(&cpu_buffer->pages_lost);
		}

		/*
		 * We have already removed references to this list item, just
		 * free up the buffer_page and its page
		 */
		free_buffer_page(to_remove_page);
		nr_removed--;

	} while (to_remove_page != last_page);

	/* Every page counted while unlinking must have been freed above */
	RB_WARN_ON(cpu_buffer, nr_removed);

	return nr_removed == 0;
}

/*
 * Splice the pages queued on cpu_buffer->new_pages into the ring buffer,
 * right in front of the head page.
 *
 * Returns true when the pages were inserted. On failure the queued pages
 * are freed and false is returned.
 */
static bool
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *pages = &cpu_buffer->new_pages;
	unsigned long flags;
	bool success;
	int retries;

	/* Can be called at early boot up, where interrupts must not be enabled */
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	/*
	 * We are holding the reader lock, so the reader page won't be swapped
	 * in the ring buffer. Now we are racing with the writer trying to
	 * move head page and the tail page.
	 * We are going to adapt the reader page update process where:
	 * 1. We first splice the start and end of list of new pages between
	 *    the head page and its previous page.
	 * 2. We cmpxchg the prev_page->next to point from head page to the
	 *    start of new pages list.
	 * 3. Finally, we update the head->prev to the end of new list.
	 *
	 * We will try this process 10 times, to make sure that we don't keep
	 * spinning.
	 */
	retries = 10;
	success = false;
	while (retries--) {
		struct list_head *head_page, *prev_page;
		struct list_head *last_page, *first_page;
		struct list_head *head_page_with_bit;
		struct buffer_page *hpage = rb_set_head_page(cpu_buffer);

		if (!hpage)
			break;
		head_page = &hpage->list;
		prev_page = head_page->prev;

		first_page = pages->next;
		last_page  = pages->prev;

		head_page_with_bit = (struct list_head *)
				     ((unsigned long)head_page | RB_PAGE_HEAD);

		/* Step 1: point the new pages at their future neighbours */
		last_page->next = head_page_with_bit;
		first_page->prev = prev_page;

		/* caution: head_page_with_bit gets updated on cmpxchg failure */
		if (try_cmpxchg(&prev_page->next,
				&head_page_with_bit, first_page)) {
			/*
			 * yay, we replaced the page pointer to our new list,
			 * now, we just have to update to head page's prev
			 * pointer to point to end of list
			 */
			head_page->prev = last_page;
			cpu_buffer->cnt++;
			success = true;
			break;
		}
	}

	/* The pages now belong to the ring buffer, empty the staging list */
	if (success)
		INIT_LIST_HEAD(pages);
	/*
	 * If we weren't successful in adding in new pages, warn and stop
	 * tracing
	 */
	RB_WARN_ON(cpu_buffer, !success);
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	/* free pages if they weren't inserted */
	if (!success) {
		struct buffer_page *bpage, *tmp;
		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
					 list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
	}
	return success;
}

/*
 * Apply the pending cpu_buffer->nr_pages_to_update: a positive value
 * inserts the staged new pages, any other value removes that many pages.
 * nr_pages is only adjusted when the operation succeeded.
 */
static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	bool success;

	if (cpu_buffer->nr_pages_to_update > 0)
		success = rb_insert_pages(cpu_buffer);
	else
		success = rb_remove_pages(cpu_buffer,
					-cpu_buffer->nr_pages_to_update);

	if (success)
		cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
}

/*
 * Work queue callback: apply the pending page update of the per CPU buffer
 * embedding @work and signal update_done to the waiter.
 */
static void update_pages_handler(struct work_struct *work)
{
	struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
			struct ring_buffer_per_cpu, update_pages_work);
	rb_update_pages(cpu_buffer);
	complete(&cpu_buffer->update_done);
}

/**
 * ring_buffer_resize - resize the ring buffer
 * @buffer: the buffer to resize.
 * @size: the new size.
 * @cpu_id: the cpu buffer to resize
 *
 * Minimum size is 2 * buffer->subbuf_size.
 *
 * Returns 0 on success and < 0 on failure.
 */
int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
			int cpu_id)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long nr_pages;
	int cpu, err;

	/*
	 * Always succeed at resizing a non-existent buffer:
	 */
	if (!buffer)
		return 0;

	/* Make sure the requested buffer exists */
	if (cpu_id != RING_BUFFER_ALL_CPUS &&
	    !cpumask_test_cpu(cpu_id, buffer->cpumask))
		return 0;

	nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size);

	/* we need a minimum of two pages */
	if (nr_pages < 2)
		nr_pages = 2;

	/*
	 * Keep CPUs from coming online while resizing to synchronize
	 * with new per CPU buffers being created.
	 */
	guard(cpus_read_lock)();

	/* prevent another thread from changing buffer sizes */
	mutex_lock(&buffer->mutex);
	atomic_inc(&buffer->resizing);

	if (cpu_id == RING_BUFFER_ALL_CPUS) {
		/*
		 * Don't succeed if resizing is disabled, as a reader might be
		 * manipulating the ring buffer and is expecting a sane state while
		 * this is true.
		 */
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			if (atomic_read(&cpu_buffer->resize_disabled)) {
				err = -EBUSY;
				goto out_err_unlock;
			}
		}

		/* calculate the pages to update */
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];

			cpu_buffer->nr_pages_to_update = nr_pages -
							cpu_buffer->nr_pages;
			/*
			 * nothing more to do for removing pages or no update
			 */
			if (cpu_buffer->nr_pages_to_update <= 0)
				continue;
			/*
			 * to add pages, make sure all new pages can be
			 * allocated without receiving ENOMEM
			 */
			INIT_LIST_HEAD(&cpu_buffer->new_pages);
			if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
						&cpu_buffer->new_pages)) {
				/* not enough memory for new pages */
				err = -ENOMEM;
				goto out_err;
			}

			cond_resched();
		}

		/*
		 * Fire off all the required work handlers
		 * We can't schedule on offline CPUs, but it's not necessary
		 * since we can change their buffer sizes without any race.
		 */
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			if (!cpu_buffer->nr_pages_to_update)
				continue;

			/* Can't run something on an offline CPU. */
			if (!cpu_online(cpu)) {
				rb_update_pages(cpu_buffer);
				/* Done already: keeps the wait loop below from touching it */
				cpu_buffer->nr_pages_to_update = 0;
			} else {
				/* Run directly if possible. */
				migrate_disable();
				if (cpu != smp_processor_id()) {
					migrate_enable();
					schedule_work_on(cpu,
							 &cpu_buffer->update_pages_work);
				} else {
					update_pages_handler(&cpu_buffer->update_pages_work);
					migrate_enable();
				}
			}
		}

		/* wait for all the updates to complete */
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			if (!cpu_buffer->nr_pages_to_update)
				continue;

			if (cpu_online(cpu))
				wait_for_completion(&cpu_buffer->update_done);
			cpu_buffer->nr_pages_to_update = 0;
		}

	} else {
		cpu_buffer = buffer->buffers[cpu_id];

		/* Already the requested size, nothing to change */
		if (nr_pages == cpu_buffer->nr_pages)
			goto out;

		/*
		 * Don't succeed if resizing is disabled, as a reader might be
		 * manipulating the ring buffer and is expecting a sane state while
		 * this is true.
		 */
		if (atomic_read(&cpu_buffer->resize_disabled)) {
			err = -EBUSY;
			goto out_err_unlock;
		}

		cpu_buffer->nr_pages_to_update = nr_pages -
						cpu_buffer->nr_pages;

		/* Pages are only staged when growing; shrinking needs none */
		INIT_LIST_HEAD(&cpu_buffer->new_pages);
		if (cpu_buffer->nr_pages_to_update > 0 &&
			__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
					    &cpu_buffer->new_pages)) {
			err = -ENOMEM;
			goto out_err;
		}

		/* Can't run something on an offline CPU. */
		if (!cpu_online(cpu_id))
			rb_update_pages(cpu_buffer);
		else {
			/* Run directly if possible. */
			migrate_disable();
			if (cpu_id == smp_processor_id()) {
				rb_update_pages(cpu_buffer);
				migrate_enable();
			} else {
				migrate_enable();
				schedule_work_on(cpu_id,
						 &cpu_buffer->update_pages_work);
				wait_for_completion(&cpu_buffer->update_done);
			}
		}

		cpu_buffer->nr_pages_to_update = 0;
	}

 out:
	/*
	 * The ring buffer resize can happen with the ring buffer
	 * enabled, so that the update disturbs the tracing as little
	 * as possible. But if the buffer is disabled, we do not need
	 * to worry about that, and we can take the time to verify
	 * that the buffer is not corrupt.
	 */
	if (atomic_read(&buffer->record_disabled)) {
		atomic_inc(&buffer->record_disabled);
		/*
		 * Even though the buffer was disabled, we must make sure
		 * that it is truly disabled before calling rb_check_pages.
		 * There could have been a race between checking
		 * record_disable and incrementing it.
		 */
		synchronize_rcu();
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			rb_check_pages(cpu_buffer);
		}
		atomic_dec(&buffer->record_disabled);
	}

	atomic_dec(&buffer->resizing);
	mutex_unlock(&buffer->mutex);
	return 0;

 out_err:
	/* Drop every page that was staged for insertion but never inserted */
	for_each_buffer_cpu(buffer, cpu) {
		struct buffer_page *bpage, *tmp;

		cpu_buffer = buffer->buffers[cpu];
		cpu_buffer->nr_pages_to_update = 0;

		if (list_empty(&cpu_buffer->new_pages))
			continue;

		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
					list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);

			cond_resched();
		}
	}
 out_err_unlock:
	atomic_dec(&buffer->resizing);
	mutex_unlock(&buffer->mutex);
	return err;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);

/*
 * Set (@val non-zero) or clear (@val zero) RB_FL_OVERWRITE in the flags of
 * @buffer, under buffer->mutex.
 */
void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val)
{
mutex_lock(&buffer->mutex); 3231 if (val) 3232 buffer->flags |= RB_FL_OVERWRITE; 3233 else 3234 buffer->flags &= ~RB_FL_OVERWRITE; 3235 mutex_unlock(&buffer->mutex); 3236 } 3237 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 3238 3239 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 3240 { 3241 return bpage->page->data + index; 3242 } 3243 3244 static __always_inline struct ring_buffer_event * 3245 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 3246 { 3247 return __rb_page_index(cpu_buffer->reader_page, 3248 cpu_buffer->reader_page->read); 3249 } 3250 3251 static struct ring_buffer_event * 3252 rb_iter_head_event(struct ring_buffer_iter *iter) 3253 { 3254 struct ring_buffer_event *event; 3255 struct buffer_page *iter_head_page = iter->head_page; 3256 unsigned long commit; 3257 unsigned length; 3258 3259 if (iter->head != iter->next_event) 3260 return iter->event; 3261 3262 /* 3263 * When the writer goes across pages, it issues a cmpxchg which 3264 * is a mb(), which will synchronize with the rmb here. 3265 * (see rb_tail_page_update() and __rb_reserve_next()) 3266 */ 3267 commit = rb_page_commit(iter_head_page); 3268 smp_rmb(); 3269 3270 /* An event needs to be at least 8 bytes in size */ 3271 if (iter->head > commit - 8) 3272 goto reset; 3273 3274 event = __rb_page_index(iter_head_page, iter->head); 3275 length = rb_event_length(event); 3276 3277 /* 3278 * READ_ONCE() doesn't work on functions and we don't want the 3279 * compiler doing any crazy optimizations with length. 3280 */ 3281 barrier(); 3282 3283 if ((iter->head + length) > commit || length > iter->event_size) 3284 /* Writer corrupted the read? */ 3285 goto reset; 3286 3287 memcpy(iter->event, event, length); 3288 /* 3289 * If the page stamp is still the same after this rmb() then the 3290 * event was safely copied without the writer entering the page. 
3291 */ 3292 smp_rmb(); 3293 3294 /* Make sure the page didn't change since we read this */ 3295 if (iter->page_stamp != iter_head_page->page->time_stamp || 3296 commit > rb_page_commit(iter_head_page)) 3297 goto reset; 3298 3299 iter->next_event = iter->head + length; 3300 return iter->event; 3301 reset: 3302 /* Reset to the beginning */ 3303 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3304 iter->head = 0; 3305 iter->next_event = 0; 3306 iter->missed_events = 1; 3307 return NULL; 3308 } 3309 3310 /* Size is determined by what has been committed */ 3311 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 3312 { 3313 return rb_page_commit(bpage) & ~RB_MISSED_MASK; 3314 } 3315 3316 static __always_inline unsigned 3317 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 3318 { 3319 return rb_page_commit(cpu_buffer->commit_page); 3320 } 3321 3322 static __always_inline unsigned 3323 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 3324 { 3325 unsigned long addr = (unsigned long)event; 3326 3327 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 3328 3329 return addr - BUF_PAGE_HDR_SIZE; 3330 } 3331 3332 static void rb_inc_iter(struct ring_buffer_iter *iter) 3333 { 3334 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3335 3336 /* 3337 * The iterator could be on the reader page (it starts there). 3338 * But the head could have moved, since the reader was 3339 * found. Check for this case and assign the iterator 3340 * to the head page instead of next. 
	 */
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = rb_set_head_page(cpu_buffer);
	else
		rb_inc_page(&iter->head_page);

	/* Start reading the new page from its beginning */
	iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
	iter->head = 0;
	iter->next_event = 0;
}

/*
 * Return the index into the sub-buffers for a given sub-buffer
 *
 * The start of the sub-buffer area is taken as @meta advanced by
 * sizeof(int) * meta->nr_subbufs, rounded up to meta->subbuf_size. The
 * index is the distance of @subbuf from that start in units of
 * meta->subbuf_size.
 *
 * NOTE(review): the offset does not include sizeof(*meta) before it is
 * aligned. Confirm this matches how the sub-buffer area is located by
 * the code that lays it out; the two could disagree if the index array
 * ends exactly on a subbuf_size boundary.
 */
static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf)
{
	void *subbuf_array;

	subbuf_array = (void *)meta + sizeof(int) * meta->nr_subbufs;
	subbuf_array = (void *)ALIGN((unsigned long)subbuf_array, meta->subbuf_size);
	return (subbuf - subbuf_array) / meta->subbuf_size;
}

/*
 * Advance meta->head_buffer from the sub-buffer of @next_page to the
 * sub-buffer of the page that follows it.
 */
static void rb_update_meta_head(struct ring_buffer_per_cpu *cpu_buffer,
				struct buffer_page *next_page)
{
	struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
	unsigned long old_head = (unsigned long)next_page->page;
	unsigned long new_head;

	rb_inc_page(&next_page);
	new_head = (unsigned long)next_page->page;

	/*
	 * Only move it forward once, if something else came in and
	 * moved it forward, then we don't want to touch it.
	 */
	(void)cmpxchg(&meta->head_buffer, old_head, new_head);
}

/*
 * Update the meta data for a reader page swap: @reader takes id 0, the
 * old reader page takes over the id @reader had, meta->buffers[] is
 * updated for both slots, and the meta head is moved past @reader.
 */
static void rb_update_meta_reader(struct ring_buffer_per_cpu *cpu_buffer,
				  struct buffer_page *reader)
{
	struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
	void *old_reader = cpu_buffer->reader_page->page;
	void *new_reader = reader->page;
	int id;

	id = reader->id;
	cpu_buffer->reader_page->id = id;
	reader->id = 0;

	meta->buffers[0] = rb_meta_subbuf_idx(meta, new_reader);
	meta->buffers[id] = rb_meta_subbuf_idx(meta, old_reader);

	/* The head pointer is the one after the reader */
	rb_update_meta_head(cpu_buffer, reader);
}

/*
 * rb_handle_head_page - writer hit the head page
 *
 * Returns: +1 to retry page
 *           0 to continue
 *          -1 on error
 */
static int
rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		    struct buffer_page *tail_page,
		    struct buffer_page *next_page)
{
	struct buffer_page *new_head;
	int entries;
	int type;
	int ret;

	entries = rb_page_entries(next_page);

	/*
	 * The hard part is here. We need to move the head
	 * forward, and protect against both readers on
	 * other CPUs and writers coming in via interrupts.
	 */
	type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
				       RB_PAGE_HEAD);

	/*
	 * type can be one of four:
	 *  NORMAL - an interrupt already moved it for us
	 *  HEAD   - we are the first to get here.
	 *  UPDATE - we are the interrupt interrupting
	 *           a current move.
	 *  MOVED  - a reader on another CPU moved the next
	 *           pointer to its reader page. Give up
	 *           and try again.
	 */

	switch (type) {
	case RB_PAGE_HEAD:
		/*
		 * We changed the head to UPDATE, thus
		 * it is our responsibility to update
		 * the counters.
		 */
		local_add(entries, &cpu_buffer->overrun);
		local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes);
		local_inc(&cpu_buffer->pages_lost);

		if (cpu_buffer->ring_meta)
			rb_update_meta_head(cpu_buffer, next_page);
		/*
		 * The entries will be zeroed out when we move the
		 * tail page.
		 */

		/* still more to do */
		break;

	case RB_PAGE_UPDATE:
		/*
		 * This is an interrupt that interrupted the
		 * previous update. Still more to do.
		 */
		break;
	case RB_PAGE_NORMAL:
		/*
		 * An interrupt came in before the update
		 * and processed this for us.
		 * Nothing left to do.
		 */
		return 1;
	case RB_PAGE_MOVED:
		/*
		 * The reader is on another CPU and just did
		 * a swap with our next_page.
		 * Try again.
		 */
		return 1;
	default:
		RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
		return -1;
	}

	/*
	 * Now that we are here, the old head pointer is
	 * set to UPDATE. This will keep the reader from
	 * swapping the head page with the reader page.
	 * The reader (on another CPU) will spin till
	 * we are finished.
	 *
	 * We just need to protect against interrupts
	 * doing the job. We will set the next pointer
	 * to HEAD. After that, we set the old pointer
	 * to NORMAL, but only if it was HEAD before.
	 * otherwise we are an interrupt, and only
	 * want the outer most commit to reset it.
	 */
	new_head = next_page;
	rb_inc_page(&new_head);

	ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
				    RB_PAGE_NORMAL);

	/*
	 * Valid returns are:
	 *  HEAD   - an interrupt came in and already set it.
	 *  NORMAL - One of two things:
	 *            1) We really set it.
	 *            2) A bunch of interrupts came in and moved
	 *               the page forward again.
	 */
	switch (ret) {
	case RB_PAGE_HEAD:
	case RB_PAGE_NORMAL:
		/* OK */
		break;
	default:
		RB_WARN_ON(cpu_buffer, 1);
		return -1;
	}

	/*
	 * It is possible that an interrupt came in,
	 * set the head up, then more interrupts came in
	 * and moved it again. When we get back here,
	 * the page would have been set to NORMAL but we
	 * just set it back to HEAD.
	 *
	 * How do you detect this? Well, if that happened
	 * the tail page would have moved.
	 */
	if (ret == RB_PAGE_NORMAL) {
		struct buffer_page *buffer_tail_page;

		buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
		/*
		 * If the tail had moved past next, then we need
		 * to reset the pointer.
		 */
		if (buffer_tail_page != tail_page &&
		    buffer_tail_page != next_page)
			rb_head_page_set_normal(cpu_buffer, new_head,
						next_page,
						RB_PAGE_HEAD);
	}

	/*
	 * If this was the outer most commit (the one that
	 * changed the original pointer from HEAD to UPDATE),
	 * then it is up to us to reset it to NORMAL.
	 */
	if (type == RB_PAGE_HEAD) {
		ret = rb_head_page_set_normal(cpu_buffer, next_page,
					      tail_page,
					      RB_PAGE_UPDATE);
		if (RB_WARN_ON(cpu_buffer,
			       ret != RB_PAGE_UPDATE))
			return -1;
	}

	return 0;
}

/*
 * rb_reset_tail - clean up the tail page after a reservation ran off its end
 * @cpu_buffer: the per cpu buffer being written to
 * @tail: offset on info->tail_page at which padding is placed and which
 *        is recorded as the page's real_end
 * @info: carries the tail page and the reserved length
 *
 * Adjusts tail_page->write back down and, when @tail lies inside the
 * sub-buffer, marks the unused remainder of the page as padding.
 */
static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
	      unsigned long tail, struct rb_event_info *info)
{
	unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size);
	struct buffer_page *tail_page = info->tail_page;
	struct ring_buffer_event *event;
	unsigned long length = info->length;

	/*
	 * Only the event that crossed the page boundary
	 * must fill the old tail_page with padding.
	 */
	if (tail >= bsize) {
		/*
		 * If the page was filled, then we still need
		 * to update the real_end.
		 * Reset it to zero
		 * and the reader will ignore it.
		 */
		if (tail == bsize)
			tail_page->real_end = 0;

		local_sub(length, &tail_page->write);
		return;
	}

	event = __rb_page_index(tail_page, tail);

	/*
	 * Save the original length to the meta data.
	 * This will be used by the reader to add lost event
	 * counter.
	 */
	tail_page->real_end = tail;

	/*
	 * If this event is bigger than the minimum size, then
	 * we need to be careful that we don't subtract the
	 * write counter enough to allow another writer to slip
	 * in on this page.
	 * We put in a discarded commit instead, to make sure
	 * that this space is not used again, and this space will
	 * not be accounted into 'entries_bytes'.
	 *
	 * If we are less than the minimum size, we don't need to
	 * worry about it.
	 */
	if (tail > (bsize - RB_EVNT_MIN_SIZE)) {
		/* No room for any events */

		/* Mark the rest of the page with padding */
		rb_event_set_padding(event);

		/* Make sure the padding is visible before the write update */
		smp_wmb();

		/* Set the write back to the previous setting */
		local_sub(length, &tail_page->write);
		return;
	}

	/* Put in a discarded event */
	event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE;
	event->type_len = RINGBUF_TYPE_PADDING;
	/* time delta must be non zero */
	event->time_delta = 1;

	/* account for padding bytes */
	local_add(bsize - tail, &cpu_buffer->entries_bytes);

	/* Make sure the padding is visible before the tail_page->write update */
	smp_wmb();

	/* Set write to end of buffer */
	length = (tail + length) - bsize;
	local_sub(length, &tail_page->write);
}

/* Defined further down; rb_move_tail() needs it before the definition. */
static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);

/*
 * This is the slow path, force gcc not to inline it.
 *
 * Returns ERR_PTR(-EAGAIN) when the caller should retry the reservation
 * and NULL when the event has to be dropped.
 */
static noinline struct ring_buffer_event *
rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
	     unsigned long tail, struct rb_event_info *info)
{
	struct buffer_page *tail_page = info->tail_page;
	struct buffer_page *commit_page = cpu_buffer->commit_page;
	struct trace_buffer *buffer = cpu_buffer->buffer;
	struct buffer_page *next_page;
	int ret;

	next_page = tail_page;

	rb_inc_page(&next_page);

	/*
	 * If for some reason, we had an interrupt storm that made
	 * it all the way around the buffer, bail, and warn
	 * about it.
	 */
	if (unlikely(next_page == commit_page)) {
		local_inc(&cpu_buffer->commit_overrun);
		goto out_reset;
	}

	/*
	 * This is where the fun begins!
	 *
	 * We are fighting against races between a reader that
	 * could be on another CPU trying to swap its reader
	 * page with the buffer head.
	 *
	 * We are also fighting against interrupts coming in and
	 * moving the head or tail on us as well.
	 *
	 * If the next page is the head page then we have filled
	 * the buffer, unless the commit page is still on the
	 * reader page.
	 */
	if (rb_is_head_page(next_page, &tail_page->list)) {

		/*
		 * If the commit is not on the reader page, then
		 * move the header page.
		 */
		if (!rb_is_reader_page(cpu_buffer->commit_page)) {
			/*
			 * If we are not in overwrite mode,
			 * this is easy, just stop here.
			 */
			if (!(buffer->flags & RB_FL_OVERWRITE)) {
				local_inc(&cpu_buffer->dropped_events);
				goto out_reset;
			}

			ret = rb_handle_head_page(cpu_buffer,
						  tail_page,
						  next_page);
			if (ret < 0)
				goto out_reset;
			if (ret)
				goto out_again;
		} else {
			/*
			 * We need to be careful here too. The
			 * commit page could still be on the reader
			 * page.
			 * We could have a small buffer, and
			 * have filled up the buffer with events
			 * from interrupts and such, and wrapped.
			 *
			 * Note, if the tail page is also on the
			 * reader_page, we let it move out.
			 */
			if (unlikely((cpu_buffer->commit_page !=
				      cpu_buffer->tail_page) &&
				     (cpu_buffer->commit_page ==
				      cpu_buffer->reader_page))) {
				local_inc(&cpu_buffer->commit_overrun);
				goto out_reset;
			}
		}
	}

	rb_tail_page_update(cpu_buffer, tail_page, next_page);

 out_again:

	rb_reset_tail(cpu_buffer, tail, info);

	/* Commit what we have for now. */
	rb_end_commit(cpu_buffer);
	/* rb_end_commit() decs committing */
	local_inc(&cpu_buffer->committing);

	/* fail and let the caller try again */
	return ERR_PTR(-EAGAIN);

 out_reset:
	/* reset write */
	rb_reset_tail(cpu_buffer, tail, info);

	return NULL;
}

/*
 * Slow path
 *
 * Turn @event into a TIME_STAMP (when @abs) or TIME_EXTEND event that
 * carries @delta, split between time_delta and array[0]. Returns the
 * result of skip_time_extend() on @event.
 */
static struct ring_buffer_event *
rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		  struct ring_buffer_event *event, u64 delta, bool abs)
{
	if (abs)
		event->type_len = RINGBUF_TYPE_TIME_STAMP;
	else
		event->type_len = RINGBUF_TYPE_TIME_EXTEND;

	/* Not the first event on the page, or not delta? */
	if (abs || rb_event_index(cpu_buffer, event)) {
		event->time_delta = delta & TS_MASK;
		event->array[0] = delta >> TS_SHIFT;
	} else {
		/* nope, just zero it */
		event->time_delta = 0;
		event->array[0] = 0;
	}

	return skip_time_extend(event);
}

/* Local fallback used when CONFIG_HAVE_UNSTABLE_SCHED_CLOCK is not set */
#ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
static inline bool sched_clock_stable(void)
{
	return true;
}
#endif

/*
 * Warn once that the delta is too big, printing the delta, the time
 * stamps held in @info and the current write stamp. A hint about the
 * trace clock is appended when sched_clock_stable() is false.
 */
static void
rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
		   struct rb_event_info *info)
{
	u64 write_stamp;

	WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s",
		  (unsigned long long)info->delta,
		  (unsigned long long)info->ts,
		  (unsigned long long)info->before,
		  (unsigned long long)info->after,
		  (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}),
		  sched_clock_stable() ? "" :
		  "If you just came from a suspend/resume,\n"
		  "please switch to the trace global clock:\n"
		  " echo global > /sys/kernel/tracing/trace_clock\n"
		  "or add trace_clock=global to the kernel command line\n");
}

/*
 * Place a time stamp event at *@event and step *@event past it.
 * *@length is reduced by RB_LEN_TIME_EXTEND and *@delta is set to zero,
 * so the event that follows is written with a zero delta.
 */
static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
			     struct ring_buffer_event **event,
			     struct rb_event_info *info,
			     u64 *delta,
			     unsigned int *length)
{
	bool abs = info->add_timestamp &
		(RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE);

	if (unlikely(info->delta > (1ULL << 59))) {
		/*
		 * Some timers can use more than 59 bits, and when a timestamp
		 * is added to the buffer, it will lose those bits.
		 */
		if (abs && (info->ts & TS_MSB)) {
			info->delta &= ABS_TS_MASK;

		/* did the clock go backwards */
		} else if (info->before == info->after && info->before > info->ts) {
			/* not interrupted */
			static int once;

			/*
			 * This is possible with a recalibrating of the TSC.
			 * Do not produce a call stack, but just report it.
			 */
			if (!once) {
				once++;
				pr_warn("Ring buffer clock went backwards: %llu -> %llu\n",
					info->before, info->ts);
			}
		} else
			rb_check_timestamp(cpu_buffer, info);
		/* A relative delta this large is not recorded; use zero */
		if (!abs)
			info->delta = 0;
	}
	*event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs);
	*length -= RB_LEN_TIME_EXTEND;
	*delta = 0;
}

/**
 * rb_update_event - update event type and data
 * @cpu_buffer: The per cpu buffer of the @event
 * @event: the event to update
 * @info: The info to update the @event with (contains length and delta)
 *
 * Update the type and data fields of the @event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
 */
static void
rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
		struct ring_buffer_event *event,
		struct rb_event_info *info)
{
	unsigned length = info->length;
	u64 delta = info->delta;
	unsigned int nest = local_read(&cpu_buffer->committing) - 1;

	/* Remember this event's time stamp for the current nesting level */
	if (!WARN_ON_ONCE(nest >= MAX_NEST))
		cpu_buffer->event_stamp[nest] = info->ts;

	/*
	 * If we need to add a timestamp, then we
	 * add it to the start of the reserved space.
	 */
	if (unlikely(info->add_timestamp))
		rb_add_timestamp(cpu_buffer, &event, info, &delta, &length);

	event->time_delta = delta;
	length -= RB_EVNT_HDR_SIZE;
	/* Large (or forced 8 byte aligned) data keeps its length in array[0] */
	if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
		event->type_len = 0;
		event->array[0] = length;
	} else
		event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
}

/*
 * Return the number of bytes to reserve for an event that carries
 * @length bytes of data: the data, the event header and, where needed,
 * the array[0] length word, rounded up to RB_ARCH_ALIGNMENT.
 */
static unsigned rb_calculate_event_length(unsigned length)
{
	struct ring_buffer_event event; /* Used only for sizeof array */

	/* zero length can cause confusions */
	if (!length)
		length++;

	if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
		length += sizeof(event.array[0]);

	length += RB_EVNT_HDR_SIZE;
	length = ALIGN(length, RB_ARCH_ALIGNMENT);

	/*
	 * In case the time delta is larger than the 27 bits for it
	 * in the header, we need to add a timestamp. If another
	 * event comes in when trying to discard this one to increase
	 * the length, then the timestamp will be added in the allocated
	 * space of this event. If length is bigger than the size needed
	 * for the TIME_EXTEND, then padding has to be used. The events
	 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal
	 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
	 * As length is a multiple of 4, we only need to worry if it
	 * is 12 (RB_LEN_TIME_EXTEND + 4).
	 */
	if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
		length += RB_ALIGNMENT;

	return length;
}

/*
 * rb_try_to_discard - try to take @event back out of the tail page
 * @cpu_buffer: the per cpu buffer the @event was written to
 * @event: the event to remove
 *
 * If @event is still the last thing written on the current tail page,
 * the page's write index is moved back to where the event started.
 *
 * Returns true if the write index was moved back, false if the event
 * could not be discarded.
 */
static inline bool
rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
		  struct ring_buffer_event *event)
{
	unsigned long new_index, old_index;
	struct buffer_page *bpage;
	unsigned long addr;

	new_index = rb_event_index(cpu_buffer, event);
	old_index = new_index + rb_event_ts_length(event);
	/* Start address of the sub-buffer that holds the event */
	addr = (unsigned long)event;
	addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1);

	bpage = READ_ONCE(cpu_buffer->tail_page);

	/*
	 * Make sure the tail_page is still the same and
	 * the next write location is the end of this event
	 */
	if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
		unsigned long write_mask =
			local_read(&bpage->write) & ~RB_WRITE_MASK;
		unsigned long event_length = rb_event_length(event);

		/*
		 * Force the before_stamp to be different than the write_stamp
		 * to make sure that the next event adds an absolute
		 * value and does not rely on the saved write stamp, which
		 * is now going to be bogus.
		 *
		 * By setting the before_stamp to zero, the next event
		 * is not going to use the write_stamp and will instead
		 * create an absolute timestamp. This means there's no
		 * reason to update the write_stamp!
		 */
		rb_time_set(&cpu_buffer->before_stamp, 0);

		/*
		 * If an event were to come in now, it would see that the
		 * write_stamp and the before_stamp are different, and assume
		 * that this event just added itself before updating
		 * the write stamp. The interrupting event will fix the
		 * write stamp for us, and use an absolute timestamp.
		 */

		/*
		 * This is on the tail page. It is possible that
		 * a write could come in and move the tail page
		 * and write to the next page.
		 * That is fine
		 * because we just shorten what is on this page.
		 */
		old_index += write_mask;
		new_index += write_mask;

		/* caution: old_index gets updated on cmpxchg failure */
		if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) {
			/* update counters */
			local_sub(event_length, &cpu_buffer->entries_bytes);
			return true;
		}
	}

	/* could not discard */
	return false;
}

/* Begin a commit: increments both cpu_buffer->committing and ->commits. */
static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	local_inc(&cpu_buffer->committing);
	local_inc(&cpu_buffer->commits);
}

/*
 * Move the commit page forward until it is the tail page, then bring
 * that page's commit value up to its write index.
 */
static __always_inline void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long max_count;

	/*
	 * We only race with interrupts and NMIs on this CPU.
	 * If we own the commit event, then we can commit
	 * all others that interrupted us, since the interruptions
	 * are in stack format (they finish before they come
	 * back to us). This allows us to do a simple loop to
	 * assign the commit to the tail.
	 */
 again:
	/* Upper bound on loop iterations; hitting it triggers a warning */
	max_count = cpu_buffer->nr_pages * 100;

	while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
		if (RB_WARN_ON(cpu_buffer, !(--max_count)))
			return;
		if (RB_WARN_ON(cpu_buffer,
			       rb_is_reader_page(cpu_buffer->tail_page)))
			return;
		/*
		 * No need for a memory barrier here, as the update
		 * of the tail_page did it for this page.
		 */
		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		rb_inc_page(&cpu_buffer->commit_page);
		/* Mirror the new commit page into the meta data, if present */
		if (cpu_buffer->ring_meta) {
			struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
			meta->commit_buffer = (unsigned long)cpu_buffer->commit_page->page;
		}
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {

		/* Make sure the readers see the content of what is committed. */
		smp_wmb();
		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		RB_WARN_ON(cpu_buffer,
			   local_read(&cpu_buffer->commit_page->page->commit) &
			   ~RB_WRITE_MASK);
		barrier();
	}

	/* again, keep gcc from optimizing */
	barrier();

	/*
	 * If an interrupt came in just after the first while loop
	 * and pushed the tail page forward, we will be left with
	 * a dangling commit that will never go forward.
	 */
	if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
		goto again;
}

/*
 * Finish a commit. When this is the only commit in progress
 * (committing == 1) the commit position is first brought up to the
 * write position, then the committing counter is dropped.
 */
static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long commits;

	if (RB_WARN_ON(cpu_buffer,
		       !local_read(&cpu_buffer->committing)))
		return;

 again:
	commits = local_read(&cpu_buffer->commits);
	/* synchronize with interrupts */
	barrier();
	if (local_read(&cpu_buffer->committing) == 1)
		rb_set_commit_to_write(cpu_buffer);

	local_dec(&cpu_buffer->committing);

	/* synchronize with interrupts */
	barrier();

	/*
	 * Need to account for interrupts coming in between the
	 * updating of the commit page and the clearing of the
	 * committing counter.
	 */
	if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
	    !local_read(&cpu_buffer->committing)) {
		local_inc(&cpu_buffer->committing);
		goto again;
	}
}

/*
 * Turn @event into a padding event in place. If @event is an extended
 * time event, the event following it is the one converted.
 */
static inline void rb_event_discard(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);

	/* array[0] holds the actual length for the discarded event */
	event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
	event->type_len = RINGBUF_TYPE_PADDING;
	/* time delta must be non zero */
	if (!event->time_delta)
		event->time_delta = 1;
}

/* Count the new entry and finish the commit. */
static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	local_inc(&cpu_buffer->entries);
	rb_end_commit(cpu_buffer);
}

/*
 * Queue @irq_work: via irq_work_queue() when in NMI context, otherwise
 * via irq_work_queue_on() on the CPU picked by housekeeping_any_cpu().
 * Returns the value of the queueing call that was used.
 */
static bool
rb_irq_work_queue(struct rb_irq_work *irq_work)
{
	int cpu;

	/* irq_work_queue_on() is not NMI-safe */
	if (unlikely(in_nmi()))
		return irq_work_queue(&irq_work->work);

	/*
	 * If CPU isolation is not active, cpu is always the current
	 * CPU, and the following is equivalent to irq_work_queue().
	 */
	cpu = housekeeping_any_cpu(HK_TYPE_KERNEL_NOISE);
	return irq_work_queue_on(&irq_work->work, cpu);
}

/*
 * Queue the irq work for @buffer and @cpu_buffer if either has waiters
 * pending, then queue the "full" wakeup for @cpu_buffer when full_hit()
 * reports that the shortest_full level has been reached.
 */
static __always_inline void
rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
{
	if (buffer->irq_work.waiters_pending) {
		buffer->irq_work.waiters_pending = false;
		/* irq_work_queue() supplies its own memory barriers */
		rb_irq_work_queue(&buffer->irq_work);
	}

	if (cpu_buffer->irq_work.waiters_pending) {
		cpu_buffer->irq_work.waiters_pending = false;
		/* irq_work_queue() supplies its own memory barriers */
		rb_irq_work_queue(&cpu_buffer->irq_work);
	}

	/* No new page touched since the last check */
	if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
		return;

	if (cpu_buffer->reader_page == cpu_buffer->commit_page)
		return;

	if (!cpu_buffer->irq_work.full_waiters_pending)
		return;

	cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);

	if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
		return;

	cpu_buffer->irq_work.wakeup_full = true;
	cpu_buffer->irq_work.full_waiters_pending = false;
	/* irq_work_queue() supplies its own memory barriers */
	rb_irq_work_queue(&cpu_buffer->irq_work);
}

/* Record the recursion only when CONFIG_RING_BUFFER_RECORD_RECURSION is set */
#ifdef CONFIG_RING_BUFFER_RECORD_RECURSION
# define do_ring_buffer_record_recursion()	\
	do_ftrace_record_recursion(_THIS_IP_, _RET_IP_)
#else
# define do_ring_buffer_record_recursion() do { } while (0)
#endif

/*
 * The lock and unlock are done within a preempt disable section.
 * The current_context per_cpu variable can only be modified
 * by the current task between lock and unlock. But it can
 * be modified more than once via an interrupt.
 * To pass this
 * information from the lock to the unlock without having to
 * access the 'in_interrupt()' functions again (which do show
 * a bit of overhead in something as critical as function tracing),
 * we use a bitmask trick.
 *
 *  bit 1 =  NMI context
 *  bit 2 =  IRQ context
 *  bit 3 =  SoftIRQ context
 *  bit 4 =  normal context.
 *
 * This works because this is the order of contexts that can
 * preempt other contexts. A SoftIRQ never preempts an IRQ
 * context.
 *
 * When the context is determined, the corresponding bit is
 * checked and set (if it was set, then a recursion of that context
 * happened).
 *
 * On unlock, we need to clear this bit. To do so, just subtract
 * 1 from the current_context and AND it to itself.
 *
 * (binary)
 *  101 - 1 = 100
 *  101 & 100 = 100 (clearing bit zero)
 *
 *  1010 - 1 = 1001
 *  1010 & 1001 = 1000 (clearing bit 1)
 *
 * The least significant bit can be cleared this way, and it
 * just so happens that it is the same bit corresponding to
 * the current context.
 *
 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit
 * is set when a recursion is detected at the current context, and if
 * the TRANSITION bit is already set, it will fail the recursion.
 * This is needed because there's a lag between the changing of
 * interrupt context and updating the preempt count. In this case,
 * a false positive will be found. To handle this, one extra recursion
 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION
 * bit is already set, then it is considered a recursion and the function
 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned.
 *
 * On the trace_recursive_unlock(), the TRANSITION bit will be the first
 * to be cleared. Even if it wasn't the context that set it.
 * That is,
 * if an interrupt comes in while NORMAL bit is set and the ring buffer
 * is called before preempt_count() is updated, since the check will
 * be on the NORMAL bit, the TRANSITION bit will then be set. If an
 * NMI then comes in, it will set the NMI bit, but when the NMI code
 * does the trace_recursive_unlock() it will clear the TRANSITION bit
 * and leave the NMI bit set. But this is fine, because the interrupt
 * code that set the TRANSITION bit will then clear the NMI bit when it
 * calls trace_recursive_unlock(). If another NMI comes in, it will
 * set the TRANSITION bit and continue.
 *
 * Note: The TRANSITION bit only handles a single transition between context.
 */

/*
 * Returns true when a recursion was detected (the bit for the current
 * context and the TRANSITION bit were both already set), false after the
 * bit has been set in cpu_buffer->current_context. All bit positions
 * are shifted by cpu_buffer->nest.
 */
static __always_inline bool
trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned int val = cpu_buffer->current_context;
	int bit = interrupt_context_level();

	bit = RB_CTX_NORMAL - bit;

	if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) {
		/*
		 * It is possible that this was called by transitioning
		 * between interrupt context, and preempt_count() has not
		 * been updated yet. In this case, use the TRANSITION bit.
		 */
		bit = RB_CTX_TRANSITION;
		if (val & (1 << (bit + cpu_buffer->nest))) {
			do_ring_buffer_record_recursion();
			return true;
		}
	}

	val |= (1 << (bit + cpu_buffer->nest));
	cpu_buffer->current_context = val;

	return false;
}

/*
 * Clear the lowest bit of current_context that is set at or above bit
 * position cpu_buffer->nest (the subtract-and-AND trick described above,
 * applied at the current nest shift).
 */
static __always_inline void
trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->current_context &=
		cpu_buffer->current_context - (1 << cpu_buffer->nest);
}

/* The recursive locking above uses 5 bits */
#define NESTED_BITS 5

/**
 * ring_buffer_nest_start - Allow to trace while nested
 * @buffer: The ring buffer to modify
 *
 * The ring buffer has a safety mechanism to prevent recursion.
 * But there may be a case where a trace needs to be done while
 * tracing something else. In this case, calling this function
 * will allow this function to nest within a currently active
 * ring_buffer_lock_reserve().
 *
 * Call this function before calling another ring_buffer_lock_reserve() and
 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
 */
void ring_buffer_nest_start(struct trace_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu;

	/* Enabled by ring_buffer_nest_end() */
	preempt_disable_notrace();
	cpu = raw_smp_processor_id();
	cpu_buffer = buffer->buffers[cpu];
	/* This is the shift value for the above recursive locking */
	cpu_buffer->nest += NESTED_BITS;
}

/**
 * ring_buffer_nest_end - Allow to trace while nested
 * @buffer: The ring buffer to modify
 *
 * Must be called after ring_buffer_nest_start() and after the
 * ring_buffer_unlock_commit().
4283 */ 4284 void ring_buffer_nest_end(struct trace_buffer *buffer) 4285 { 4286 struct ring_buffer_per_cpu *cpu_buffer; 4287 int cpu; 4288 4289 /* disabled by ring_buffer_nest_start() */ 4290 cpu = raw_smp_processor_id(); 4291 cpu_buffer = buffer->buffers[cpu]; 4292 /* This is the shift value for the above recursive locking */ 4293 cpu_buffer->nest -= NESTED_BITS; 4294 preempt_enable_notrace(); 4295 } 4296 4297 /** 4298 * ring_buffer_unlock_commit - commit a reserved 4299 * @buffer: The buffer to commit to 4300 * 4301 * This commits the data to the ring buffer, and releases any locks held. 4302 * 4303 * Must be paired with ring_buffer_lock_reserve. 4304 */ 4305 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 4306 { 4307 struct ring_buffer_per_cpu *cpu_buffer; 4308 int cpu = raw_smp_processor_id(); 4309 4310 cpu_buffer = buffer->buffers[cpu]; 4311 4312 rb_commit(cpu_buffer); 4313 4314 rb_wakeups(buffer, cpu_buffer); 4315 4316 trace_recursive_unlock(cpu_buffer); 4317 4318 preempt_enable_notrace(); 4319 4320 return 0; 4321 } 4322 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 4323 4324 /* Special value to validate all deltas on a page. 
*/ 4325 #define CHECK_FULL_PAGE 1L 4326 4327 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 4328 4329 static const char *show_irq_str(int bits) 4330 { 4331 static const char * type[] = { 4332 ".", // 0 4333 "s", // 1 4334 "h", // 2 4335 "Hs", // 3 4336 "n", // 4 4337 "Ns", // 5 4338 "Nh", // 6 4339 "NHs", // 7 4340 }; 4341 4342 return type[bits]; 4343 } 4344 4345 /* Assume this is a trace event */ 4346 static const char *show_flags(struct ring_buffer_event *event) 4347 { 4348 struct trace_entry *entry; 4349 int bits = 0; 4350 4351 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4352 return "X"; 4353 4354 entry = ring_buffer_event_data(event); 4355 4356 if (entry->flags & TRACE_FLAG_SOFTIRQ) 4357 bits |= 1; 4358 4359 if (entry->flags & TRACE_FLAG_HARDIRQ) 4360 bits |= 2; 4361 4362 if (entry->flags & TRACE_FLAG_NMI) 4363 bits |= 4; 4364 4365 return show_irq_str(bits); 4366 } 4367 4368 static const char *show_irq(struct ring_buffer_event *event) 4369 { 4370 struct trace_entry *entry; 4371 4372 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4373 return ""; 4374 4375 entry = ring_buffer_event_data(event); 4376 if (entry->flags & TRACE_FLAG_IRQS_OFF) 4377 return "d"; 4378 return ""; 4379 } 4380 4381 static const char *show_interrupt_level(void) 4382 { 4383 unsigned long pc = preempt_count(); 4384 unsigned char level = 0; 4385 4386 if (pc & SOFTIRQ_OFFSET) 4387 level |= 1; 4388 4389 if (pc & HARDIRQ_MASK) 4390 level |= 2; 4391 4392 if (pc & NMI_MASK) 4393 level |= 4; 4394 4395 return show_irq_str(level); 4396 } 4397 4398 static void dump_buffer_page(struct buffer_data_page *bpage, 4399 struct rb_event_info *info, 4400 unsigned long tail) 4401 { 4402 struct ring_buffer_event *event; 4403 u64 ts, delta; 4404 int e; 4405 4406 ts = bpage->time_stamp; 4407 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 4408 4409 for (e = 0; e < tail; e += rb_event_length(event)) { 4410 4411 event = (struct ring_buffer_event *)(bpage->data + e); 4412 4413 
switch (event->type_len) { 4414 4415 case RINGBUF_TYPE_TIME_EXTEND: 4416 delta = rb_event_time_stamp(event); 4417 ts += delta; 4418 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 4419 e, ts, delta); 4420 break; 4421 4422 case RINGBUF_TYPE_TIME_STAMP: 4423 delta = rb_event_time_stamp(event); 4424 ts = rb_fix_abs_ts(delta, ts); 4425 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 4426 e, ts, delta); 4427 break; 4428 4429 case RINGBUF_TYPE_PADDING: 4430 ts += event->time_delta; 4431 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 4432 e, ts, event->time_delta); 4433 break; 4434 4435 case RINGBUF_TYPE_DATA: 4436 ts += event->time_delta; 4437 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 4438 e, ts, event->time_delta, 4439 show_flags(event), show_irq(event)); 4440 break; 4441 4442 default: 4443 break; 4444 } 4445 } 4446 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 4447 } 4448 4449 static DEFINE_PER_CPU(atomic_t, checking); 4450 static atomic_t ts_dump; 4451 4452 #define buffer_warn_return(fmt, ...) \ 4453 do { \ 4454 /* If another report is happening, ignore this one */ \ 4455 if (atomic_inc_return(&ts_dump) != 1) { \ 4456 atomic_dec(&ts_dump); \ 4457 goto out; \ 4458 } \ 4459 atomic_inc(&cpu_buffer->record_disabled); \ 4460 pr_warn(fmt, ##__VA_ARGS__); \ 4461 dump_buffer_page(bpage, info, tail); \ 4462 atomic_dec(&ts_dump); \ 4463 /* There's some cases in boot up that this can happen */ \ 4464 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \ 4465 /* Do not re-enable checking */ \ 4466 return; \ 4467 } while (0) 4468 4469 /* 4470 * Check if the current event time stamp matches the deltas on 4471 * the buffer page. 
*/
static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
			 struct rb_event_info *info,
			 unsigned long tail)
{
	struct buffer_data_page *bpage;
	u64 ts, delta;
	bool full = false;
	int ret;

	bpage = info->tail_page->page;

	if (tail == CHECK_FULL_PAGE) {
		/* Validate everything that has been committed on the page */
		full = true;
		tail = local_read(&bpage->commit);
	} else if (info->add_timestamp &
		   (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) {
		/* Ignore events with absolute time stamps */
		return;
	}

	/*
	 * Do not check the first event (skip possible extends too).
	 * Also do not check if previous events have not been committed.
	 */
	if (tail <= 8 || tail > local_read(&bpage->commit))
		return;

	/*
	 * If this interrupted another event that is already being checked
	 * on this CPU (the per-CPU counter was not zero), skip the check.
	 */
	if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
		goto out;

	ret = rb_read_data_buffer(bpage, tail, cpu_buffer->cpu, &ts, &delta);
	if (ret < 0) {
		if (delta < ts) {
			/*
			 * Note: buffer_warn_return() may jump to "out" or
			 * return from this function directly.
			 */
			buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld clock:%pS\n",
					   cpu_buffer->cpu, ts, delta,
					   cpu_buffer->buffer->clock);
			goto out;
		}
	}
	/*
	 * For a full page the last time stamp on the page must not be past
	 * the new event; otherwise the page time plus the new delta must
	 * land exactly on the new event's time stamp.
	 */
	if ((full && ts > info->ts) ||
	    (!full && ts + info->delta != info->ts)) {
		buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\ntrace clock:%pS",
				   cpu_buffer->cpu,
				   ts + info->delta, info->ts, info->delta,
				   info->before, info->after,
				   full ? " (full)" : "", show_interrupt_level(),
				   cpu_buffer->buffer->clock);
	}
out:
	atomic_dec(this_cpu_ptr(&checking));
}
#else
/* Time delta validation is compiled out: nothing to check */
static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
			 struct rb_event_info *info,
			 unsigned long tail)
{
}
#endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */

/*
 * Reserve info->length bytes on the current tail page and fill in the
 * event header for them.
 *
 * The A through F markers in the body label the points that the comments
 * refer to when reasoning about where an interrupting writer may have
 * come in. The statement order between those markers matters.
 *
 * Returns the reserved event, or the result of rb_move_tail() when the
 * reservation went past the end of the sub-buffer.
 */
static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  struct rb_event_info *info)
{
	struct ring_buffer_event *event;
	struct buffer_page *tail_page;
	unsigned long tail, write, w;

	/* Don't let the compiler play games with cpu_buffer->tail_page */
	tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);

 /*A*/	w = local_read(&tail_page->write) & RB_WRITE_MASK;
	barrier();
	rb_time_read(&cpu_buffer->before_stamp, &info->before);
	rb_time_read(&cpu_buffer->write_stamp, &info->after);
	barrier();
	info->ts = rb_time_stamp(cpu_buffer->buffer);

	if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) {
		info->delta = info->ts;
	} else {
		/*
		 * If interrupting an event time update, we may need an
		 * absolute timestamp.
		 * Don't bother if this is the start of a new page (w == 0).
		 */
		if (!w) {
			/* Use the sub-buffer timestamp */
			info->delta = 0;
		} else if (unlikely(info->before != info->after)) {
			info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
			info->length += RB_LEN_TIME_EXTEND;
		} else {
			info->delta = info->ts - info->after;
			if (unlikely(test_time_stamp(info->delta))) {
				info->add_timestamp |= RB_ADD_STAMP_EXTEND;
				info->length += RB_LEN_TIME_EXTEND;
			}
		}
	}

 /*B*/	rb_time_set(&cpu_buffer->before_stamp, info->ts);

 /*C*/	write = local_add_return(info->length, &tail_page->write);

	/* set write to only the index of the write */
	write &= RB_WRITE_MASK;

	/* Offset on the page where this event starts */
	tail = write - info->length;

	/* See if we shot pass the end of this buffer page */
	if (unlikely(write > cpu_buffer->buffer->subbuf_size)) {
		check_buffer(cpu_buffer, info, CHECK_FULL_PAGE);
		return rb_move_tail(cpu_buffer, tail, info);
	}

	if (likely(tail == w)) {
		/* Nothing interrupted us between A and C */
 /*D*/		rb_time_set(&cpu_buffer->write_stamp, info->ts);
		/*
		 * If something came in between C and D, the write stamp
		 * may now not be in sync. But that's fine as the before_stamp
		 * will be different and then next event will just be forced
		 * to use an absolute timestamp.
		 */
		if (likely(!(info->add_timestamp &
			     (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
			/* This did not interrupt any time update */
			info->delta = info->ts - info->after;
		else
			/* Just use full timestamp for interrupting event */
			info->delta = info->ts;
		check_buffer(cpu_buffer, info, tail);
	} else {
		u64 ts;
		/* SLOW PATH - Interrupted between A and C */

		/* Save the old before_stamp */
		rb_time_read(&cpu_buffer->before_stamp, &info->before);

		/*
		 * Read a new timestamp and update the before_stamp to make
		 * the next event after this one force using an absolute
		 * timestamp. This is in case an interrupt were to come in
		 * between E and F.
		 */
		ts = rb_time_stamp(cpu_buffer->buffer);
		rb_time_set(&cpu_buffer->before_stamp, ts);

		barrier();
 /*E*/		rb_time_read(&cpu_buffer->write_stamp, &info->after);
		barrier();
 /*F*/		if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
		    info->after == info->before && info->after < ts) {
			/*
			 * Nothing came after this event between C and F, it is
			 * safe to use info->after for the delta as it
			 * matched info->before and is still valid.
			 */
			info->delta = ts - info->after;
		} else {
			/*
			 * Interrupted between C and F:
			 * Lost the previous events time stamp. Just set the
			 * delta to zero, and this will be the same time as
			 * the event this event interrupted. And the events that
			 * came after this will still be correct (as they would
			 * have built their delta on the previous event).
			 */
			info->delta = 0;
		}
		info->ts = ts;
		info->add_timestamp &= ~RB_ADD_STAMP_FORCE;
	}

	/*
	 * If this is the first commit on the page, then it has the same
	 * timestamp as the page itself.
	 */
	if (unlikely(!tail && !(info->add_timestamp &
				(RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
		info->delta = 0;

	/* We reserved something on the buffer */

	event = __rb_page_index(tail_page, tail);
	rb_update_event(cpu_buffer, event, info);

	local_inc(&tail_page->entries);

	/*
	 * If this is the first commit on the page, then update
	 * its timestamp.
	 */
	if (unlikely(!tail))
		tail_page->page->time_stamp = info->ts;

	/* account for these added bytes */
	local_add(info->length, &cpu_buffer->entries_bytes);

	return event;
}

/*
 * Start a commit on @cpu_buffer and reserve an event able to carry
 * @length bytes of data.
 *
 * The reservation is retried for as long as __rb_reserve_next() reports
 * -EAGAIN, up to the nr_loops limit checked below.
 *
 * Returns the reserved event, or NULL when nothing was reserved.
 */
static __always_inline struct ring_buffer_event *
rb_reserve_next_event(struct trace_buffer *buffer,
		      struct ring_buffer_per_cpu *cpu_buffer,
		      unsigned long length)
{
	struct ring_buffer_event *event;
	struct rb_event_info info;
	int nr_loops = 0;
	int add_ts_default;

	/*
	 * ring buffer does cmpxchg as well as atomic64 operations
	 * (which some archs use locking for atomic64), make sure this
	 * is safe in NMI context
	 */
	if ((!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) ||
	     IS_ENABLED(CONFIG_GENERIC_ATOMIC64)) &&
	    (unlikely(in_nmi()))) {
		return NULL;
	}

	rb_start_commit(cpu_buffer);
	/* The commit page can not change after this */

#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
	/*
	 * Due to the ability to swap a cpu buffer from a buffer
	 * it is possible it was swapped before we committed.
	 * (committing stops a swap). We check for it here and
	 * if it happened, we have to fail the write.
	 */
	barrier();
	if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) {
		local_dec(&cpu_buffer->committing);
		local_dec(&cpu_buffer->commits);
		return NULL;
	}
#endif

	info.length = rb_calculate_event_length(length);

	if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
		/* Absolute time stamps take extra room in every event */
		add_ts_default = RB_ADD_STAMP_ABSOLUTE;
		info.length += RB_LEN_TIME_EXTEND;
		if (info.length > cpu_buffer->buffer->max_data_size)
			goto out_fail;
	} else {
		add_ts_default = RB_ADD_STAMP_NONE;
	}

 again:
	info.add_timestamp = add_ts_default;
	info.delta = 0;

	/*
	 * We allow for interrupts to reenter here and do a trace.
	 * If one does, it will cause this original code to loop
	 * back here. Even with heavy interrupts happening, this
	 * should only happen a few times in a row. If this happens
	 * 1000 times in a row, there must be either an interrupt
	 * storm or we have something buggy.
	 * Bail!
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
		goto out_fail;

	event = __rb_reserve_next(cpu_buffer, &info);

	if (unlikely(PTR_ERR(event) == -EAGAIN)) {
		/*
		 * Take back the time extend length added to info.length
		 * by the failed attempt before retrying.
		 */
		if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND))
			info.length -= RB_LEN_TIME_EXTEND;
		goto again;
	}

	if (likely(event))
		return event;
 out_fail:
	rb_end_commit(cpu_buffer);
	return NULL;
}

/**
 * ring_buffer_lock_reserve - reserve a part of the buffer
 * @buffer: the ring buffer to reserve from
 * @length: the length of the data to reserve (excluding event header)
 *
 * Returns a reserved event on the ring buffer to copy directly to.
 * The user of this interface will need to get the body to write into
 * and can use the ring_buffer_event_data() interface.
 *
 * The length is the length of the data needed, not the event length
 * which also includes the event header.
4770 * 4771 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 4772 * If NULL is returned, then nothing has been allocated or locked. 4773 */ 4774 struct ring_buffer_event * 4775 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 4776 { 4777 struct ring_buffer_per_cpu *cpu_buffer; 4778 struct ring_buffer_event *event; 4779 int cpu; 4780 4781 /* If we are tracing schedule, we don't want to recurse */ 4782 preempt_disable_notrace(); 4783 4784 if (unlikely(atomic_read(&buffer->record_disabled))) 4785 goto out; 4786 4787 cpu = raw_smp_processor_id(); 4788 4789 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 4790 goto out; 4791 4792 cpu_buffer = buffer->buffers[cpu]; 4793 4794 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 4795 goto out; 4796 4797 if (unlikely(length > buffer->max_data_size)) 4798 goto out; 4799 4800 if (unlikely(trace_recursive_lock(cpu_buffer))) 4801 goto out; 4802 4803 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4804 if (!event) 4805 goto out_unlock; 4806 4807 return event; 4808 4809 out_unlock: 4810 trace_recursive_unlock(cpu_buffer); 4811 out: 4812 preempt_enable_notrace(); 4813 return NULL; 4814 } 4815 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 4816 4817 /* 4818 * Decrement the entries to the page that an event is on. 4819 * The event does not even need to exist, only the pointer 4820 * to the page it is on. This may only be called before the commit 4821 * takes place. 
4822 */ 4823 static inline void 4824 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 4825 struct ring_buffer_event *event) 4826 { 4827 unsigned long addr = (unsigned long)event; 4828 struct buffer_page *bpage = cpu_buffer->commit_page; 4829 struct buffer_page *start; 4830 4831 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 4832 4833 /* Do the likely case first */ 4834 if (likely(bpage->page == (void *)addr)) { 4835 local_dec(&bpage->entries); 4836 return; 4837 } 4838 4839 /* 4840 * Because the commit page may be on the reader page we 4841 * start with the next page and check the end loop there. 4842 */ 4843 rb_inc_page(&bpage); 4844 start = bpage; 4845 do { 4846 if (bpage->page == (void *)addr) { 4847 local_dec(&bpage->entries); 4848 return; 4849 } 4850 rb_inc_page(&bpage); 4851 } while (bpage != start); 4852 4853 /* commit not part of this buffer?? */ 4854 RB_WARN_ON(cpu_buffer, 1); 4855 } 4856 4857 /** 4858 * ring_buffer_discard_commit - discard an event that has not been committed 4859 * @buffer: the ring buffer 4860 * @event: non committed event to discard 4861 * 4862 * Sometimes an event that is in the ring buffer needs to be ignored. 4863 * This function lets the user discard an event in the ring buffer 4864 * and then that event will not be read later. 4865 * 4866 * This function only works if it is called before the item has been 4867 * committed. It will try to free the event from the ring buffer 4868 * if another event has not been added behind it. 4869 * 4870 * If another event has been added behind it, it will set the event 4871 * up as discarded, and perform the commit. 4872 * 4873 * If this function is called, do not call ring_buffer_unlock_commit on 4874 * the event. 
4875 */ 4876 void ring_buffer_discard_commit(struct trace_buffer *buffer, 4877 struct ring_buffer_event *event) 4878 { 4879 struct ring_buffer_per_cpu *cpu_buffer; 4880 int cpu; 4881 4882 /* The event is discarded regardless */ 4883 rb_event_discard(event); 4884 4885 cpu = smp_processor_id(); 4886 cpu_buffer = buffer->buffers[cpu]; 4887 4888 /* 4889 * This must only be called if the event has not been 4890 * committed yet. Thus we can assume that preemption 4891 * is still disabled. 4892 */ 4893 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 4894 4895 rb_decrement_entry(cpu_buffer, event); 4896 rb_try_to_discard(cpu_buffer, event); 4897 rb_end_commit(cpu_buffer); 4898 4899 trace_recursive_unlock(cpu_buffer); 4900 4901 preempt_enable_notrace(); 4902 4903 } 4904 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 4905 4906 /** 4907 * ring_buffer_write - write data to the buffer without reserving 4908 * @buffer: The ring buffer to write to. 4909 * @length: The length of the data being written (excluding the event header) 4910 * @data: The data to write to the buffer. 4911 * 4912 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 4913 * one function. If you already have the data to write to the buffer, it 4914 * may be easier to simply call this function. 4915 * 4916 * Note, like ring_buffer_lock_reserve, the length is the length of the data 4917 * and not the length of the event which would hold the header. 
4918 */ 4919 int ring_buffer_write(struct trace_buffer *buffer, 4920 unsigned long length, 4921 void *data) 4922 { 4923 struct ring_buffer_per_cpu *cpu_buffer; 4924 struct ring_buffer_event *event; 4925 void *body; 4926 int ret = -EBUSY; 4927 int cpu; 4928 4929 guard(preempt_notrace)(); 4930 4931 if (atomic_read(&buffer->record_disabled)) 4932 return -EBUSY; 4933 4934 cpu = raw_smp_processor_id(); 4935 4936 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4937 return -EBUSY; 4938 4939 cpu_buffer = buffer->buffers[cpu]; 4940 4941 if (atomic_read(&cpu_buffer->record_disabled)) 4942 return -EBUSY; 4943 4944 if (length > buffer->max_data_size) 4945 return -EBUSY; 4946 4947 if (unlikely(trace_recursive_lock(cpu_buffer))) 4948 return -EBUSY; 4949 4950 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4951 if (!event) 4952 goto out_unlock; 4953 4954 body = rb_event_data(event); 4955 4956 memcpy(body, data, length); 4957 4958 rb_commit(cpu_buffer); 4959 4960 rb_wakeups(buffer, cpu_buffer); 4961 4962 ret = 0; 4963 4964 out_unlock: 4965 trace_recursive_unlock(cpu_buffer); 4966 return ret; 4967 } 4968 EXPORT_SYMBOL_GPL(ring_buffer_write); 4969 4970 /* 4971 * The total entries in the ring buffer is the running counter 4972 * of entries entered into the ring buffer, minus the sum of 4973 * the entries read from the ring buffer and the number of 4974 * entries that were overwritten. 4975 */ 4976 static inline unsigned long 4977 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4978 { 4979 return local_read(&cpu_buffer->entries) - 4980 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4981 } 4982 4983 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 4984 { 4985 return !rb_num_of_entries(cpu_buffer); 4986 } 4987 4988 /** 4989 * ring_buffer_record_disable - stop all writes into the buffer 4990 * @buffer: The ring buffer to stop writes to. 4991 * 4992 * This prevents all writes to the buffer. 
Any attempt to write 4993 * to the buffer after this will fail and return NULL. 4994 * 4995 * The caller should call synchronize_rcu() after this. 4996 */ 4997 void ring_buffer_record_disable(struct trace_buffer *buffer) 4998 { 4999 atomic_inc(&buffer->record_disabled); 5000 } 5001 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 5002 5003 /** 5004 * ring_buffer_record_enable - enable writes to the buffer 5005 * @buffer: The ring buffer to enable writes 5006 * 5007 * Note, multiple disables will need the same number of enables 5008 * to truly enable the writing (much like preempt_disable). 5009 */ 5010 void ring_buffer_record_enable(struct trace_buffer *buffer) 5011 { 5012 atomic_dec(&buffer->record_disabled); 5013 } 5014 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 5015 5016 /** 5017 * ring_buffer_record_off - stop all writes into the buffer 5018 * @buffer: The ring buffer to stop writes to. 5019 * 5020 * This prevents all writes to the buffer. Any attempt to write 5021 * to the buffer after this will fail and return NULL. 5022 * 5023 * This is different than ring_buffer_record_disable() as 5024 * it works like an on/off switch, where as the disable() version 5025 * must be paired with a enable(). 5026 */ 5027 void ring_buffer_record_off(struct trace_buffer *buffer) 5028 { 5029 unsigned int rd; 5030 unsigned int new_rd; 5031 5032 rd = atomic_read(&buffer->record_disabled); 5033 do { 5034 new_rd = rd | RB_BUFFER_OFF; 5035 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 5036 } 5037 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 5038 5039 /** 5040 * ring_buffer_record_on - restart writes into the buffer 5041 * @buffer: The ring buffer to start writes to. 5042 * 5043 * This enables all writes to the buffer that was disabled by 5044 * ring_buffer_record_off(). 5045 * 5046 * This is different than ring_buffer_record_enable() as 5047 * it works like an on/off switch, where as the enable() version 5048 * must be paired with a disable(). 
5049 */ 5050 void ring_buffer_record_on(struct trace_buffer *buffer) 5051 { 5052 unsigned int rd; 5053 unsigned int new_rd; 5054 5055 rd = atomic_read(&buffer->record_disabled); 5056 do { 5057 new_rd = rd & ~RB_BUFFER_OFF; 5058 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 5059 } 5060 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 5061 5062 /** 5063 * ring_buffer_record_is_on - return true if the ring buffer can write 5064 * @buffer: The ring buffer to see if write is enabled 5065 * 5066 * Returns true if the ring buffer is in a state that it accepts writes. 5067 */ 5068 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 5069 { 5070 return !atomic_read(&buffer->record_disabled); 5071 } 5072 5073 /** 5074 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 5075 * @buffer: The ring buffer to see if write is set enabled 5076 * 5077 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 5078 * Note that this does NOT mean it is in a writable state. 5079 * 5080 * It may return true when the ring buffer has been disabled by 5081 * ring_buffer_record_disable(), as that is a temporary disabling of 5082 * the ring buffer. 5083 */ 5084 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 5085 { 5086 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 5087 } 5088 5089 /** 5090 * ring_buffer_record_is_on_cpu - return true if the ring buffer can write 5091 * @buffer: The ring buffer to see if write is enabled 5092 * @cpu: The CPU to test if the ring buffer can write too 5093 * 5094 * Returns true if the ring buffer is in a state that it accepts writes 5095 * for a particular CPU. 
5096 */ 5097 bool ring_buffer_record_is_on_cpu(struct trace_buffer *buffer, int cpu) 5098 { 5099 struct ring_buffer_per_cpu *cpu_buffer; 5100 5101 cpu_buffer = buffer->buffers[cpu]; 5102 5103 return ring_buffer_record_is_set_on(buffer) && 5104 !atomic_read(&cpu_buffer->record_disabled); 5105 } 5106 5107 /** 5108 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 5109 * @buffer: The ring buffer to stop writes to. 5110 * @cpu: The CPU buffer to stop 5111 * 5112 * This prevents all writes to the buffer. Any attempt to write 5113 * to the buffer after this will fail and return NULL. 5114 * 5115 * The caller should call synchronize_rcu() after this. 5116 */ 5117 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 5118 { 5119 struct ring_buffer_per_cpu *cpu_buffer; 5120 5121 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5122 return; 5123 5124 cpu_buffer = buffer->buffers[cpu]; 5125 atomic_inc(&cpu_buffer->record_disabled); 5126 } 5127 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 5128 5129 /** 5130 * ring_buffer_record_enable_cpu - enable writes to the buffer 5131 * @buffer: The ring buffer to enable writes 5132 * @cpu: The CPU to enable. 5133 * 5134 * Note, multiple disables will need the same number of enables 5135 * to truly enable the writing (much like preempt_disable). 5136 */ 5137 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 5138 { 5139 struct ring_buffer_per_cpu *cpu_buffer; 5140 5141 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5142 return; 5143 5144 cpu_buffer = buffer->buffers[cpu]; 5145 atomic_dec(&cpu_buffer->record_disabled); 5146 } 5147 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 5148 5149 /** 5150 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 5151 * @buffer: The ring buffer 5152 * @cpu: The per CPU buffer to read from. 
5153 */ 5154 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 5155 { 5156 unsigned long flags; 5157 struct ring_buffer_per_cpu *cpu_buffer; 5158 struct buffer_page *bpage; 5159 u64 ret = 0; 5160 5161 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5162 return 0; 5163 5164 cpu_buffer = buffer->buffers[cpu]; 5165 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5166 /* 5167 * if the tail is on reader_page, oldest time stamp is on the reader 5168 * page 5169 */ 5170 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 5171 bpage = cpu_buffer->reader_page; 5172 else 5173 bpage = rb_set_head_page(cpu_buffer); 5174 if (bpage) 5175 ret = bpage->page->time_stamp; 5176 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5177 5178 return ret; 5179 } 5180 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 5181 5182 /** 5183 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 5184 * @buffer: The ring buffer 5185 * @cpu: The per CPU buffer to read from. 5186 */ 5187 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 5188 { 5189 struct ring_buffer_per_cpu *cpu_buffer; 5190 unsigned long ret; 5191 5192 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5193 return 0; 5194 5195 cpu_buffer = buffer->buffers[cpu]; 5196 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 5197 5198 return ret; 5199 } 5200 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 5201 5202 /** 5203 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 5204 * @buffer: The ring buffer 5205 * @cpu: The per CPU buffer to get the entries from. 
5206 */ 5207 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 5208 { 5209 struct ring_buffer_per_cpu *cpu_buffer; 5210 5211 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5212 return 0; 5213 5214 cpu_buffer = buffer->buffers[cpu]; 5215 5216 return rb_num_of_entries(cpu_buffer); 5217 } 5218 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 5219 5220 /** 5221 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 5222 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 5223 * @buffer: The ring buffer 5224 * @cpu: The per CPU buffer to get the number of overruns from 5225 */ 5226 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 5227 { 5228 struct ring_buffer_per_cpu *cpu_buffer; 5229 unsigned long ret; 5230 5231 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5232 return 0; 5233 5234 cpu_buffer = buffer->buffers[cpu]; 5235 ret = local_read(&cpu_buffer->overrun); 5236 5237 return ret; 5238 } 5239 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 5240 5241 /** 5242 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 5243 * commits failing due to the buffer wrapping around while there are uncommitted 5244 * events, such as during an interrupt storm. 5245 * @buffer: The ring buffer 5246 * @cpu: The per CPU buffer to get the number of overruns from 5247 */ 5248 unsigned long 5249 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 5250 { 5251 struct ring_buffer_per_cpu *cpu_buffer; 5252 unsigned long ret; 5253 5254 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5255 return 0; 5256 5257 cpu_buffer = buffer->buffers[cpu]; 5258 ret = local_read(&cpu_buffer->commit_overrun); 5259 5260 return ret; 5261 } 5262 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 5263 5264 /** 5265 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 5266 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the number of dropped events from
 */
unsigned long
ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long ret;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	ret = local_read(&cpu_buffer->dropped_events);

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu);

/**
 * ring_buffer_read_events_cpu - get the number of events successfully read
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the number of events read
 */
unsigned long
ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	return cpu_buffer->read;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu);

/**
 * ring_buffer_entries - get the number of entries in a buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of entries in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_entries(struct trace_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long entries = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		entries += rb_num_of_entries(cpu_buffer);
	}

	return entries;
}
EXPORT_SYMBOL_GPL(ring_buffer_entries);

/**
 * ring_buffer_overruns - get the number of overruns in buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of overruns in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_overruns(struct trace_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long overruns = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		overruns += local_read(&cpu_buffer->overrun);
	}

	return overruns;
}
EXPORT_SYMBOL_GPL(ring_buffer_overruns);

/*
 * Refresh the local entries, overrun, pages_touched and pages_lost counters
 * from the values currently published in the meta page.
 *
 * Returns true when rb_num_of_entries() is non-zero after the refresh.
 */
static bool rb_read_remote_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	local_set(&cpu_buffer->entries, READ_ONCE(cpu_buffer->meta_page->entries));
	local_set(&cpu_buffer->overrun, READ_ONCE(cpu_buffer->meta_page->overrun));
	local_set(&cpu_buffer->pages_touched, READ_ONCE(cpu_buffer->meta_page->pages_touched));
	local_set(&cpu_buffer->pages_lost, READ_ONCE(cpu_buffer->meta_page->pages_lost));

	return rb_num_of_entries(cpu_buffer);
}

/*
 * Re-derive head_page and commit_page of a remote buffer by comparing the
 * time stamps of neighbouring pages.
 *
 * The first loop moves head_page (and the HEAD flag) forward for as long as
 * the head page has a larger time stamp than the page after it. The second
 * loop starts commit_page at the new head and moves it forward for as long
 * as its time stamp is smaller than that of the page after it.
 *
 * Each loop warns and bails out once it has come back to its starting page
 * for the third time.
 */
static void rb_update_remote_head(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *next, *orig;
	int retry = 3;

	orig = next = cpu_buffer->head_page;
	rb_inc_page(&next);

	/* Run after the writer */
	while (cpu_buffer->head_page->page->time_stamp > next->page->time_stamp) {
		rb_inc_page(&next);

		/* Move the HEAD flag along with head_page */
		rb_list_head_clear(cpu_buffer->head_page->list.prev);
		rb_inc_page(&cpu_buffer->head_page);
		rb_set_list_to_head(cpu_buffer->head_page->list.prev);

		if (cpu_buffer->head_page == orig) {
			if (WARN_ON_ONCE(!(--retry)))
				return;
		}
	}

	orig = cpu_buffer->commit_page = cpu_buffer->head_page;
	retry = 3;

	while (cpu_buffer->commit_page->page->time_stamp < next->page->time_stamp) {
		rb_inc_page(&next);
		rb_inc_page(&cpu_buffer->commit_page);

		if (cpu_buffer->commit_page == orig) {
			if (WARN_ON_ONCE(!(--retry)))
				return;
		}
	}
}

/*
 * Point the iterator at the current read position of the reader page and
 * snapshot the cpu_buffer state (reader page, read count, pages removed)
 * that rb_iter_peek() later compares against to detect invalidation.
 *
 * For a remote buffer the meta page counters and the head page are
 * refreshed first.
 */
static void rb_iter_reset(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	if (cpu_buffer->remote) {
		rb_read_remote_meta_page(cpu_buffer);
		rb_update_remote_head(cpu_buffer);
	}

	/* Iterator usage is expected to have record disabled */
	iter->head_page = cpu_buffer->reader_page;
	iter->head = cpu_buffer->reader_page->read;
	iter->next_event = iter->head;

	iter->cache_reader_page = iter->head_page;
	iter->cache_read = cpu_buffer->read;
	iter->cache_pages_removed = cpu_buffer->pages_removed;

	/*
	 * Part way into the page: continue from the consumer's read_stamp.
	 * At the start of the page: begin from the page time stamp.
	 */
	if (iter->head) {
		iter->read_stamp = cpu_buffer->read_stamp;
		iter->page_stamp = cpu_buffer->reader_page->page->time_stamp;
	} else {
		iter->read_stamp = iter->head_page->page->time_stamp;
		iter->page_stamp = iter->read_stamp;
	}
}

/**
 * ring_buffer_iter_reset - reset an iterator
 * @iter: The iterator to reset
 *
 * Resets the iterator, so that it will start from the beginning
 * again.
 */
void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;

	if (!iter)
		return;

	cpu_buffer = iter->cpu_buffer;

	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	rb_iter_reset(iter);
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);

/**
 * ring_buffer_iter_empty - check if an iterator has no more to read
 * @iter: The iterator to check
 *
 * Returns 0 if the commit page or its time stamp changed while sampling
 * (meaning there is more data), otherwise non-zero when the iterator
 * position has reached the sampled commit position.
 */
int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *reader;
	struct buffer_page *head_page;
	struct buffer_page *commit_page;
	struct buffer_page *curr_commit_page;
	unsigned commit;
	u64 curr_commit_ts;
	u64 commit_ts;

	cpu_buffer = iter->cpu_buffer;
	reader = cpu_buffer->reader_page;
	head_page = cpu_buffer->head_page;
	commit_page = READ_ONCE(cpu_buffer->commit_page);
	commit_ts = commit_page->page->time_stamp;

	/*
	 * When the writer goes across pages, it issues a cmpxchg which
	 * is a mb(), which will synchronize with the rmb here.
	 * (see rb_tail_page_update())
	 */
	smp_rmb();
	commit = rb_page_commit(commit_page);
	/* We want to make sure that the commit page doesn't change */
	smp_rmb();

	/* Make sure commit page didn't change */
	curr_commit_page = READ_ONCE(cpu_buffer->commit_page);
	curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp);

	/* If the commit page changed, then there's more data */
	if (curr_commit_page != commit_page ||
	    curr_commit_ts != commit_ts)
		return 0;

	/* Still racy, as it may return a false positive, but that's OK */
	return ((iter->head_page == commit_page && iter->head >= commit) ||
		(iter->head_page == reader && commit_page == head_page &&
		 head_page->read == commit &&
		 iter->head == rb_page_size(cpu_buffer->reader_page)));
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);

/*
 * Fold @event into the consumer's running time stamp (read_stamp):
 * padding leaves it untouched, time extends and data events add their
 * delta, and an absolute time stamp replaces it.
 */
static void
rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		     struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = rb_event_time_stamp(event);
		cpu_buffer->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		delta = rb_event_time_stamp(event);
		delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp);
		cpu_buffer->read_stamp = delta;
		return;

	case RINGBUF_TYPE_DATA:
		cpu_buffer->read_stamp += event->time_delta;
		return;

	default:
		RB_WARN_ON(cpu_buffer, 1);
	}
}

/*
 * Same as rb_update_read_stamp(), but updates the iterator's private
 * read_stamp instead of the consumer's.
 */
static void
rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
			  struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = rb_event_time_stamp(event);
		iter->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		delta = rb_event_time_stamp(event);
		delta = rb_fix_abs_ts(delta, iter->read_stamp);
		iter->read_stamp = delta;
		return;

	case RINGBUF_TYPE_DATA:
		iter->read_stamp += event->time_delta;
		return;

	default:
		RB_WARN_ON(iter->cpu_buffer, 1);
	}
}

/*
 * Get the reader page of a remote buffer.
 *
 * Returns NULL when the refreshed meta page reports no entries. If the
 * current reader page still has unread data it is returned as is.
 * Otherwise the remote side is asked to swap the reader page, and the local
 * page list is relinked so that the previous reader page takes the place
 * of the page that became the new reader.
 *
 * Returns the new reader page, or NULL if it is empty or the reported
 * reader id is out of range.
 */
static struct buffer_page *
__rb_get_reader_page_from_remote(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *new_reader, *prev_reader, *prev_head, *new_head, *last;

	if (!rb_read_remote_meta_page(cpu_buffer))
		return NULL;

	/* More to read on the reader page */
	if (cpu_buffer->reader_page->read < rb_page_size(cpu_buffer->reader_page)) {
		if (!cpu_buffer->reader_page->read)
			cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
		return cpu_buffer->reader_page;
	}

	prev_reader = cpu_buffer->subbuf_ids[cpu_buffer->meta_page->reader.id];

	WARN_ON_ONCE(cpu_buffer->remote->swap_reader_page(cpu_buffer->cpu,
							  cpu_buffer->remote->priv));
	/* nr_pages doesn't include the reader page */
	if (WARN_ON_ONCE(cpu_buffer->meta_page->reader.id > cpu_buffer->nr_pages))
		return NULL;

	new_reader = cpu_buffer->subbuf_ids[cpu_buffer->meta_page->reader.id];

	WARN_ON_ONCE(prev_reader == new_reader);

	prev_head = new_reader;  /* New reader was also the previous head */
	new_head = prev_head;
	rb_inc_page(&new_head);
	last = prev_head;
	rb_dec_page(&last);

	/* Clear the old HEAD flag */
	rb_list_head_clear(cpu_buffer->head_page->list.prev);

	prev_reader->list.next = prev_head->list.next;
	prev_reader->list.prev = prev_head->list.prev;

	/* Swap prev_reader with new_reader */
	last->list.next = &prev_reader->list;
	new_head->list.prev = &prev_reader->list;

	new_reader->list.prev = &new_reader->list;
	new_reader->list.next = &new_head->list;

	/* Reactivate the HEAD flag */
	rb_set_list_to_head(&last->list);

	cpu_buffer->head_page = new_head;
	cpu_buffer->reader_page = new_reader;
	cpu_buffer->pages = &new_head->list;
	cpu_buffer->read_stamp = new_reader->page->time_stamp;
	cpu_buffer->lost_events = cpu_buffer->meta_page->reader.lost_events;

	return rb_page_size(cpu_buffer->reader_page) ? cpu_buffer->reader_page : NULL;
}

/*
 * Get the reader page of a local buffer, swapping the (fully read) reader
 * page with the head page of the ring when needed.
 *
 * Runs with interrupts disabled and cpu_buffer->lock held for the swap.
 * Returns the page to read from, or NULL if there is nothing to read or
 * one of the sanity checks fired.
 */
static struct buffer_page *
__rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = NULL;
	unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size);
	unsigned long overwrite;
	unsigned long flags;
	int nr_loops = 0;
	bool ret;

	local_irq_save(flags);
	arch_spin_lock(&cpu_buffer->lock);

 again:
	/*
	 * This should normally only loop twice. But because the
	 * start of the reader inserts an empty page, it causes
	 * a case where we will loop three times. There should be no
	 * reason to loop four times (that I know of).
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
		reader = NULL;
		goto out;
	}

	reader = cpu_buffer->reader_page;

	/* If there's more to read, return this page */
	if (cpu_buffer->reader_page->read < rb_page_size(reader))
		goto out;

	/* Never should we have an index greater than the size */
	if (RB_WARN_ON(cpu_buffer,
		       cpu_buffer->reader_page->read > rb_page_size(reader)))
		goto out;

	/* check if we caught up to the tail */
	reader = NULL;
	if (cpu_buffer->commit_page == cpu_buffer->reader_page)
		goto out;

	/* Don't bother swapping if the ring buffer is empty */
	if (rb_num_of_entries(cpu_buffer) == 0)
		goto out;

	/*
	 * Reset the reader page to size zero.
	 */
	local_set(&cpu_buffer->reader_page->write, 0);
	local_set(&cpu_buffer->reader_page->entries, 0);
	cpu_buffer->reader_page->real_end = 0;

 spin:
	/*
	 * Splice the empty reader page into the list around the head.
	 */
	reader = rb_set_head_page(cpu_buffer);
	if (!reader)
		goto out;
	cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
	cpu_buffer->reader_page->list.prev = reader->list.prev;

	/*
	 * cpu_buffer->pages just needs to point to the buffer, it
	 *  has no specific buffer page to point to. Lets move it out
	 *  of our way so we don't accidentally swap it.
	 */
	cpu_buffer->pages = reader->list.prev;

	/* The reader page will be pointing to the new head */
	rb_set_list_to_head(&cpu_buffer->reader_page->list);

	/*
	 * We want to make sure we read the overruns after we set up our
	 * pointers to the next object. The writer side does a
	 * cmpxchg to cross pages which acts as the mb on the writer
	 * side. Note, the reader will constantly fail the swap
	 * while the writer is updating the pointers, so this
	 * guarantees that the overwrite recorded here is the one we
	 * want to compare with the last_overrun.
	 */
	smp_mb();
	overwrite = local_read(&(cpu_buffer->overrun));

	/*
	 * Here's the tricky part.
	 *
	 * We need to move the pointer past the header page.
	 * But we can only do that if a writer is not currently
	 * moving it. The page before the header page has the
	 * flag bit '1' set if it is pointing to the page we want.
	 * but if the writer is in the process of moving it
	 * then it will be '2' or already moved '0'.
	 */

	ret = rb_head_page_replace(reader, cpu_buffer->reader_page);

	/*
	 * If we did not convert it, then we must try again.
	 */
	if (!ret)
		goto spin;

	if (cpu_buffer->ring_meta)
		rb_update_meta_reader(cpu_buffer, reader);

	/*
	 * Yay! We succeeded in replacing the page.
	 *
	 * Now make the new head point back to the reader page.
	 */
	rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
	rb_inc_page(&cpu_buffer->head_page);

	cpu_buffer->cnt++;
	local_inc(&cpu_buffer->pages_read);

	/* Finally update the reader page to the new head */
	cpu_buffer->reader_page = reader;
	cpu_buffer->reader_page->read = 0;

	/* Overruns seen since the last swap are reported as lost events */
	if (overwrite != cpu_buffer->last_overrun) {
		cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
		cpu_buffer->last_overrun = overwrite;
	}

	goto again;

 out:
	/* Update the read_stamp on the first event */
	if (reader && reader->read == 0)
		cpu_buffer->read_stamp = reader->page->time_stamp;

	arch_spin_unlock(&cpu_buffer->lock);
	local_irq_restore(flags);

	/*
	 * The writer has preempt disable, wait for it. But not forever
	 * Although, 1 second is pretty much "forever"
	 */
#define USECS_WAIT	1000000
	for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) {
		/* If the write is past the end of page, a writer is still updating it */
		if (likely(!reader || rb_page_write(reader) <= bsize))
			break;

		udelay(1);

		/* Get the latest version of the reader write value */
		smp_rmb();
	}

	/* The writer is not moving forward? Something is wrong */
	if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT))
		reader = NULL;

	/*
	 * Make sure we see any padding after the write update
	 * (see rb_reset_tail()).
	 *
	 * In addition, a writer may be writing on the reader page
	 * if the page has not been fully filled, so the read barrier
	 * is also needed to make sure we see the content of what is
	 * committed by the writer (see rb_set_commit_to_write()).
	 */
	smp_rmb();


	return reader;
}

/* Pick the remote or the local variant depending on cpu_buffer->remote */
static struct buffer_page *
rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	return cpu_buffer->remote ? __rb_get_reader_page_from_remote(cpu_buffer) :
				    __rb_get_reader_page(cpu_buffer);
}

/*
 * Consume the event at the current reader position: count it in
 * cpu_buffer->read if it is a data event, fold it into read_stamp, and move
 * the reader page read offset and read_bytes forward by the event length.
 */
static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	unsigned length;

	reader = rb_get_reader_page(cpu_buffer);

	/* This function should not be called when buffer is empty */
	if (RB_WARN_ON(cpu_buffer, !reader))
		return;

	event = rb_reader_event(cpu_buffer);

	if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		cpu_buffer->read++;

	rb_update_read_stamp(cpu_buffer, event);

	length = rb_event_length(event);
	cpu_buffer->reader_page->read += length;
	cpu_buffer->read_bytes += length;
}

/*
 * Move the iterator to its next event. When that lands at or past the end
 * of the current page, step to the next page instead, unless the current
 * page is the commit page.
 */
static void rb_advance_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	cpu_buffer = iter->cpu_buffer;

	/* If head == next_event then we need to jump to the next event */
	if (iter->head == iter->next_event) {
		/* If the event gets overwritten again, there's nothing to do */
		if (rb_iter_head_event(iter) == NULL)
			return;
	}

	iter->head = iter->next_event;

	/*
	 * Check if we are at the end of the buffer.
	 */
	if (iter->next_event >= rb_page_size(iter->head_page)) {
		/* discarded commits can make the page empty */
		if (iter->head_page == cpu_buffer->commit_page)
			return;
		rb_inc_iter(iter);
		return;
	}

	rb_update_iter_read_stamp(iter, iter->event);
}

/* Number of lost events currently recorded for this per CPU buffer */
static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
{
	return cpu_buffer->lost_events;
}

/*
 * Look at the next event of the consuming reader without consuming it.
 *
 * Time extend and absolute time stamp events are internal and are advanced
 * over here. Padding is returned to the caller, which is expected to drop
 * the locks and retry. For a data event, @ts (if not NULL) receives the
 * normalized time stamp and @lost_events (if not NULL) the lost event count.
 *
 * Returns NULL if there is no reader page, the loop limit is hit, or the
 * event type is unknown.
 */
static struct ring_buffer_event *
rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
	       unsigned long *lost_events)
{
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	int nr_loops = 0;

	if (ts)
		*ts = 0;
 again:
	/*
	 * We repeat when a time extend is encountered.
	 * Since the time extend is always attached to a data event,
	 * we should never loop more than once.
	 * (We never hit the following condition more than twice).
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
		return NULL;

	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		return NULL;

	event = rb_reader_event(cpu_buffer);

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			RB_WARN_ON(cpu_buffer, 1);
		/*
		 * Because the writer could be discarding every
		 * event it creates (which would probably be bad)
		 * if we were to go back to "again" then we may never
		 * catch up, and will trigger the warn on, or lock
		 * the box. Return the padding, and we will release
		 * the current locks, and try again.
		 */
		return event;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		if (ts) {
			*ts = rb_event_time_stamp(event);
			*ts = rb_fix_abs_ts(*ts, reader->page->time_stamp);
			ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
							 cpu_buffer->cpu, ts);
		}
		/* Internal data, OK to advance */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_DATA:
		/* Only compute from the delta if no absolute stamp was seen */
		if (ts && !(*ts)) {
			*ts = cpu_buffer->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
							 cpu_buffer->cpu, ts);
		}
		if (lost_events)
			*lost_events = rb_lost_events(cpu_buffer);
		return event;

	default:
		RB_WARN_ON(cpu_buffer, 1);
	}

	return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_peek);

/*
 * Look at the next event of a non-consuming iterator.
 *
 * The iterator is reset first if a consuming read or a page removal has
 * invalidated its cached state. Time extend and absolute time stamp events
 * are advanced over, as is null padding. Other padding is advanced over and
 * returned. Returns NULL when the iterator or buffer is empty, or after
 * three failed attempts.
 */
static struct ring_buffer_event *
rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
	struct trace_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	int nr_loops = 0;

	if (ts)
		*ts = 0;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

	/*
	 * Check if someone performed a consuming read to the buffer
	 * or removed some pages from the buffer. In these cases,
	 * iterator was invalidated and we need to reset it.
	 */
	if (unlikely(iter->cache_read != cpu_buffer->read ||
		     iter->cache_reader_page != cpu_buffer->reader_page ||
		     iter->cache_pages_removed != cpu_buffer->pages_removed))
		rb_iter_reset(iter);

 again:
	if (ring_buffer_iter_empty(iter))
		return NULL;

	/*
	 * As the writer can mess with what the iterator is trying
	 * to read, just give up if we fail to get an event after
	 * three tries. The iterator is not as reliable when reading
	 * the ring buffer with an active write as the consumer is.
	 * Do not warn if the three failures is reached.
	 */
	if (++nr_loops > 3)
		return NULL;

	if (rb_per_cpu_empty(cpu_buffer))
		return NULL;

	if (iter->head >= rb_page_size(iter->head_page)) {
		rb_inc_iter(iter);
		goto again;
	}

	event = rb_iter_head_event(iter);
	if (!event)
		goto again;

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event)) {
			rb_inc_iter(iter);
			goto again;
		}
		rb_advance_iter(iter);
		return event;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		if (ts) {
			*ts = rb_event_time_stamp(event);
			*ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp);
			ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
							 cpu_buffer->cpu, ts);
		}
		/* Internal data, OK to advance */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_DATA:
		/* Only compute from the delta if no absolute stamp was seen */
		if (ts && !(*ts)) {
			*ts = iter->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(buffer,
							 cpu_buffer->cpu, ts);
		}
		return event;

	default:
		RB_WARN_ON(cpu_buffer, 1);
	}

	return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);

/*
 * Take the reader_lock for a consuming read.
 *
 * Returns true if the lock was taken (and must be released with
 * rb_reader_unlock()), false if the caller proceeds without it. The latter
 * only happens in NMI context, and recording is disabled in that case.
 */
static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer)
{
	if (likely(!in_nmi())) {
		raw_spin_lock(&cpu_buffer->reader_lock);
		return true;
	}

	/*
	 * If an NMI die dumps out the content of the ring buffer
	 * trylock must be used to prevent a deadlock if the NMI
	 * preempted a task that holds the ring buffer locks. If
	 * we get the lock then all is fine, if not, then continue
	 * to do the read, but this can corrupt the ring buffer,
	 * so it must be permanently disabled from future writes.
	 * Reading from NMI is a oneshot deal.
	 */
	if (raw_spin_trylock(&cpu_buffer->reader_lock))
		return true;

	/* Continue without locking, but disable the ring buffer */
	atomic_inc(&cpu_buffer->record_disabled);
	return false;
}

/* Release the reader_lock, but only if rb_reader_lock() actually took it */
static inline void
rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked)
{
	if (likely(locked))
		raw_spin_unlock(&cpu_buffer->reader_lock);
}

/**
 * ring_buffer_peek - peek at the next event to be read
 * @buffer: The ring buffer to read
 * @cpu: The cpu to peek at
 * @ts: The timestamp counter of this event.
 * @lost_events: a variable to store if events were lost (may be NULL)
 *
 * This will return the event that will be read next, but does
 * not consume the data.
6053 */ 6054 struct ring_buffer_event * 6055 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 6056 unsigned long *lost_events) 6057 { 6058 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6059 struct ring_buffer_event *event; 6060 unsigned long flags; 6061 bool dolock; 6062 6063 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6064 return NULL; 6065 6066 again: 6067 local_irq_save(flags); 6068 dolock = rb_reader_lock(cpu_buffer); 6069 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 6070 if (event && event->type_len == RINGBUF_TYPE_PADDING) 6071 rb_advance_reader(cpu_buffer); 6072 rb_reader_unlock(cpu_buffer, dolock); 6073 local_irq_restore(flags); 6074 6075 if (event && event->type_len == RINGBUF_TYPE_PADDING) 6076 goto again; 6077 6078 return event; 6079 } 6080 6081 /** ring_buffer_iter_dropped - report if there are dropped events 6082 * @iter: The ring buffer iterator 6083 * 6084 * Returns true if there was dropped events since the last peek. 6085 */ 6086 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 6087 { 6088 bool ret = iter->missed_events != 0; 6089 6090 iter->missed_events = 0; 6091 return ret; 6092 } 6093 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 6094 6095 /** 6096 * ring_buffer_iter_peek - peek at the next event to be read 6097 * @iter: The ring buffer iterator 6098 * @ts: The timestamp counter of this event. 6099 * 6100 * This will return the event that will be read next, but does 6101 * not increment the iterator. 
6102 */ 6103 struct ring_buffer_event * 6104 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 6105 { 6106 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6107 struct ring_buffer_event *event; 6108 unsigned long flags; 6109 6110 again: 6111 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6112 event = rb_iter_peek(iter, ts); 6113 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6114 6115 if (event && event->type_len == RINGBUF_TYPE_PADDING) 6116 goto again; 6117 6118 return event; 6119 } 6120 6121 /** 6122 * ring_buffer_consume - return an event and consume it 6123 * @buffer: The ring buffer to get the next event from 6124 * @cpu: the cpu to read the buffer from 6125 * @ts: a variable to store the timestamp (may be NULL) 6126 * @lost_events: a variable to store if events were lost (may be NULL) 6127 * 6128 * Returns the next event in the ring buffer, and that event is consumed. 6129 * Meaning, that sequential reads will keep returning a different event, 6130 * and eventually empty the ring buffer if the producer is slower. 
6131 */ 6132 struct ring_buffer_event * 6133 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 6134 unsigned long *lost_events) 6135 { 6136 struct ring_buffer_per_cpu *cpu_buffer; 6137 struct ring_buffer_event *event = NULL; 6138 unsigned long flags; 6139 bool dolock; 6140 6141 again: 6142 /* might be called in atomic */ 6143 preempt_disable(); 6144 6145 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6146 goto out; 6147 6148 cpu_buffer = buffer->buffers[cpu]; 6149 local_irq_save(flags); 6150 dolock = rb_reader_lock(cpu_buffer); 6151 6152 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 6153 if (event) { 6154 cpu_buffer->lost_events = 0; 6155 rb_advance_reader(cpu_buffer); 6156 } 6157 6158 rb_reader_unlock(cpu_buffer, dolock); 6159 local_irq_restore(flags); 6160 6161 out: 6162 preempt_enable(); 6163 6164 if (event && event->type_len == RINGBUF_TYPE_PADDING) 6165 goto again; 6166 6167 return event; 6168 } 6169 EXPORT_SYMBOL_GPL(ring_buffer_consume); 6170 6171 /** 6172 * ring_buffer_read_start - start a non consuming read of the buffer 6173 * @buffer: The ring buffer to read from 6174 * @cpu: The cpu buffer to iterate over 6175 * @flags: gfp flags to use for memory allocation 6176 * 6177 * This creates an iterator to allow non-consuming iteration through 6178 * the buffer. If the buffer is disabled for writing, it will produce 6179 * the same information each time, but if the buffer is still writing 6180 * then the first hit of a write will cause the iteration to stop. 6181 * 6182 * Must be paired with ring_buffer_read_finish. 
6183 */ 6184 struct ring_buffer_iter * 6185 ring_buffer_read_start(struct trace_buffer *buffer, int cpu, gfp_t flags) 6186 { 6187 struct ring_buffer_per_cpu *cpu_buffer; 6188 struct ring_buffer_iter *iter; 6189 6190 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6191 return NULL; 6192 6193 iter = kzalloc_obj(*iter, flags); 6194 if (!iter) 6195 return NULL; 6196 6197 /* Holds the entire event: data and meta data */ 6198 iter->event_size = buffer->subbuf_size; 6199 iter->event = kmalloc(iter->event_size, flags); 6200 if (!iter->event) { 6201 kfree(iter); 6202 return NULL; 6203 } 6204 6205 cpu_buffer = buffer->buffers[cpu]; 6206 6207 iter->cpu_buffer = cpu_buffer; 6208 6209 atomic_inc(&cpu_buffer->resize_disabled); 6210 6211 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6212 arch_spin_lock(&cpu_buffer->lock); 6213 rb_iter_reset(iter); 6214 arch_spin_unlock(&cpu_buffer->lock); 6215 6216 return iter; 6217 } 6218 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 6219 6220 /** 6221 * ring_buffer_read_finish - finish reading the iterator of the buffer 6222 * @iter: The iterator retrieved by ring_buffer_start 6223 * 6224 * This re-enables resizing of the buffer, and frees the iterator. 6225 */ 6226 void 6227 ring_buffer_read_finish(struct ring_buffer_iter *iter) 6228 { 6229 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6230 6231 /* Use this opportunity to check the integrity of the ring buffer. */ 6232 rb_check_pages(cpu_buffer); 6233 6234 atomic_dec(&cpu_buffer->resize_disabled); 6235 kfree(iter->event); 6236 kfree(iter); 6237 } 6238 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 6239 6240 /** 6241 * ring_buffer_iter_advance - advance the iterator to the next location 6242 * @iter: The ring buffer iterator 6243 * 6244 * Move the location of the iterator such that the next read will 6245 * be the next location of the iterator. 
6246 */ 6247 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 6248 { 6249 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6250 unsigned long flags; 6251 6252 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6253 6254 rb_advance_iter(iter); 6255 6256 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6257 } 6258 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 6259 6260 /** 6261 * ring_buffer_size - return the size of the ring buffer (in bytes) 6262 * @buffer: The ring buffer. 6263 * @cpu: The CPU to get ring buffer size from. 6264 */ 6265 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 6266 { 6267 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6268 return 0; 6269 6270 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages; 6271 } 6272 EXPORT_SYMBOL_GPL(ring_buffer_size); 6273 6274 /** 6275 * ring_buffer_max_event_size - return the max data size of an event 6276 * @buffer: The ring buffer. 6277 * 6278 * Returns the maximum size an event can be. 6279 */ 6280 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 6281 { 6282 /* If abs timestamp is requested, events have a timestamp too */ 6283 if (ring_buffer_time_stamp_abs(buffer)) 6284 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 6285 return buffer->max_data_size; 6286 } 6287 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 6288 6289 static void rb_clear_buffer_page(struct buffer_page *page) 6290 { 6291 local_set(&page->write, 0); 6292 local_set(&page->entries, 0); 6293 rb_init_page(page->page); 6294 page->read = 0; 6295 } 6296 6297 /* 6298 * When the buffer is memory mapped to user space, each sub buffer 6299 * has a unique id that is used by the meta data to tell the user 6300 * where the current reader page is. 6301 * 6302 * For a normal allocated ring buffer, the id is saved in the buffer page 6303 * id field, and updated via this function. 
6304 * 6305 * But for a fixed memory mapped buffer, the id is already assigned for 6306 * fixed memory ordering in the memory layout and can not be used. Instead 6307 * the index of where the page lies in the memory layout is used. 6308 * 6309 * For the normal pages, set the buffer page id with the passed in @id 6310 * value and return that. 6311 * 6312 * For fixed memory mapped pages, get the page index in the memory layout 6313 * and return that as the id. 6314 */ 6315 static int rb_page_id(struct ring_buffer_per_cpu *cpu_buffer, 6316 struct buffer_page *bpage, int id) 6317 { 6318 /* 6319 * For boot buffers, the id is the index, 6320 * otherwise, set the buffer page with this id 6321 */ 6322 if (cpu_buffer->ring_meta) 6323 id = rb_meta_subbuf_idx(cpu_buffer->ring_meta, bpage->page); 6324 else 6325 bpage->id = id; 6326 6327 return id; 6328 } 6329 6330 static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6331 { 6332 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 6333 6334 if (!meta) 6335 return; 6336 6337 meta->reader.read = cpu_buffer->reader_page->read; 6338 meta->reader.id = rb_page_id(cpu_buffer, cpu_buffer->reader_page, 6339 cpu_buffer->reader_page->id); 6340 6341 meta->reader.lost_events = cpu_buffer->lost_events; 6342 6343 meta->entries = local_read(&cpu_buffer->entries); 6344 meta->overrun = local_read(&cpu_buffer->overrun); 6345 meta->read = cpu_buffer->read; 6346 meta->pages_lost = local_read(&cpu_buffer->pages_lost); 6347 meta->pages_touched = local_read(&cpu_buffer->pages_touched); 6348 6349 /* Some archs do not have data cache coherency between kernel and user-space */ 6350 flush_kernel_vmap_range(cpu_buffer->meta_page, PAGE_SIZE); 6351 } 6352 6353 static void 6354 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 6355 { 6356 struct buffer_page *page; 6357 6358 if (cpu_buffer->remote) { 6359 if (!cpu_buffer->remote->reset) 6360 return; 6361 6362 cpu_buffer->remote->reset(cpu_buffer->cpu, cpu_buffer->remote->priv); 6363 
rb_read_remote_meta_page(cpu_buffer); 6364 6365 /* Read related values, not covered by the meta-page */ 6366 local_set(&cpu_buffer->pages_read, 0); 6367 cpu_buffer->read = 0; 6368 cpu_buffer->read_bytes = 0; 6369 cpu_buffer->last_overrun = 0; 6370 cpu_buffer->reader_page->read = 0; 6371 6372 return; 6373 } 6374 6375 rb_head_page_deactivate(cpu_buffer); 6376 6377 cpu_buffer->head_page 6378 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6379 rb_clear_buffer_page(cpu_buffer->head_page); 6380 list_for_each_entry(page, cpu_buffer->pages, list) { 6381 rb_clear_buffer_page(page); 6382 } 6383 6384 cpu_buffer->tail_page = cpu_buffer->head_page; 6385 cpu_buffer->commit_page = cpu_buffer->head_page; 6386 6387 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 6388 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6389 rb_clear_buffer_page(cpu_buffer->reader_page); 6390 6391 local_set(&cpu_buffer->entries_bytes, 0); 6392 local_set(&cpu_buffer->overrun, 0); 6393 local_set(&cpu_buffer->commit_overrun, 0); 6394 local_set(&cpu_buffer->dropped_events, 0); 6395 local_set(&cpu_buffer->entries, 0); 6396 local_set(&cpu_buffer->committing, 0); 6397 local_set(&cpu_buffer->commits, 0); 6398 local_set(&cpu_buffer->pages_touched, 0); 6399 local_set(&cpu_buffer->pages_lost, 0); 6400 local_set(&cpu_buffer->pages_read, 0); 6401 cpu_buffer->last_pages_touch = 0; 6402 cpu_buffer->shortest_full = 0; 6403 cpu_buffer->read = 0; 6404 cpu_buffer->read_bytes = 0; 6405 6406 rb_time_set(&cpu_buffer->write_stamp, 0); 6407 rb_time_set(&cpu_buffer->before_stamp, 0); 6408 6409 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 6410 6411 cpu_buffer->lost_events = 0; 6412 cpu_buffer->last_overrun = 0; 6413 6414 rb_head_page_activate(cpu_buffer); 6415 cpu_buffer->pages_removed = 0; 6416 6417 if (cpu_buffer->mapped) { 6418 rb_update_meta_page(cpu_buffer); 6419 if (cpu_buffer->ring_meta) { 6420 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 6421 meta->commit_buffer = 
meta->head_buffer; 6422 } 6423 } 6424 } 6425 6426 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 6427 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6428 { 6429 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6430 6431 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 6432 return; 6433 6434 arch_spin_lock(&cpu_buffer->lock); 6435 6436 rb_reset_cpu(cpu_buffer); 6437 6438 arch_spin_unlock(&cpu_buffer->lock); 6439 } 6440 6441 /** 6442 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 6443 * @buffer: The ring buffer to reset a per cpu buffer of 6444 * @cpu: The CPU buffer to be reset 6445 */ 6446 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 6447 { 6448 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6449 6450 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6451 return; 6452 6453 /* prevent another thread from changing buffer sizes */ 6454 mutex_lock(&buffer->mutex); 6455 6456 atomic_inc(&cpu_buffer->resize_disabled); 6457 atomic_inc(&cpu_buffer->record_disabled); 6458 6459 /* Make sure all commits have finished */ 6460 synchronize_rcu(); 6461 6462 reset_disabled_cpu_buffer(cpu_buffer); 6463 6464 atomic_dec(&cpu_buffer->record_disabled); 6465 atomic_dec(&cpu_buffer->resize_disabled); 6466 6467 mutex_unlock(&buffer->mutex); 6468 } 6469 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 6470 6471 /* Flag to ensure proper resetting of atomic variables */ 6472 #define RESET_BIT (1 << 30) 6473 6474 /** 6475 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 6476 * @buffer: The ring buffer to reset a per cpu buffer of 6477 */ 6478 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 6479 { 6480 struct ring_buffer_per_cpu *cpu_buffer; 6481 int cpu; 6482 6483 /* prevent another thread from changing buffer sizes */ 6484 mutex_lock(&buffer->mutex); 6485 6486 for_each_online_buffer_cpu(buffer, cpu) { 6487 cpu_buffer = buffer->buffers[cpu]; 6488 
6489 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 6490 atomic_inc(&cpu_buffer->record_disabled); 6491 } 6492 6493 /* Make sure all commits have finished */ 6494 synchronize_rcu(); 6495 6496 for_each_buffer_cpu(buffer, cpu) { 6497 cpu_buffer = buffer->buffers[cpu]; 6498 6499 /* 6500 * If a CPU came online during the synchronize_rcu(), then 6501 * ignore it. 6502 */ 6503 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 6504 continue; 6505 6506 reset_disabled_cpu_buffer(cpu_buffer); 6507 6508 atomic_dec(&cpu_buffer->record_disabled); 6509 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 6510 } 6511 6512 mutex_unlock(&buffer->mutex); 6513 } 6514 6515 /** 6516 * ring_buffer_reset - reset a ring buffer 6517 * @buffer: The ring buffer to reset all cpu buffers 6518 */ 6519 void ring_buffer_reset(struct trace_buffer *buffer) 6520 { 6521 struct ring_buffer_per_cpu *cpu_buffer; 6522 int cpu; 6523 6524 /* prevent another thread from changing buffer sizes */ 6525 mutex_lock(&buffer->mutex); 6526 6527 for_each_buffer_cpu(buffer, cpu) { 6528 cpu_buffer = buffer->buffers[cpu]; 6529 6530 atomic_inc(&cpu_buffer->resize_disabled); 6531 atomic_inc(&cpu_buffer->record_disabled); 6532 } 6533 6534 /* Make sure all commits have finished */ 6535 synchronize_rcu(); 6536 6537 for_each_buffer_cpu(buffer, cpu) { 6538 cpu_buffer = buffer->buffers[cpu]; 6539 6540 reset_disabled_cpu_buffer(cpu_buffer); 6541 6542 atomic_dec(&cpu_buffer->record_disabled); 6543 atomic_dec(&cpu_buffer->resize_disabled); 6544 } 6545 6546 mutex_unlock(&buffer->mutex); 6547 } 6548 EXPORT_SYMBOL_GPL(ring_buffer_reset); 6549 6550 /** 6551 * ring_buffer_empty - is the ring buffer empty? 
6552 * @buffer: The ring buffer to test 6553 */ 6554 bool ring_buffer_empty(struct trace_buffer *buffer) 6555 { 6556 struct ring_buffer_per_cpu *cpu_buffer; 6557 unsigned long flags; 6558 bool dolock; 6559 bool ret; 6560 int cpu; 6561 6562 /* yes this is racy, but if you don't like the race, lock the buffer */ 6563 for_each_buffer_cpu(buffer, cpu) { 6564 cpu_buffer = buffer->buffers[cpu]; 6565 local_irq_save(flags); 6566 dolock = rb_reader_lock(cpu_buffer); 6567 ret = rb_per_cpu_empty(cpu_buffer); 6568 rb_reader_unlock(cpu_buffer, dolock); 6569 local_irq_restore(flags); 6570 6571 if (!ret) 6572 return false; 6573 } 6574 6575 return true; 6576 } 6577 EXPORT_SYMBOL_GPL(ring_buffer_empty); 6578 6579 /** 6580 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 6581 * @buffer: The ring buffer 6582 * @cpu: The CPU buffer to test 6583 */ 6584 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 6585 { 6586 struct ring_buffer_per_cpu *cpu_buffer; 6587 unsigned long flags; 6588 bool dolock; 6589 bool ret; 6590 6591 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6592 return true; 6593 6594 cpu_buffer = buffer->buffers[cpu]; 6595 local_irq_save(flags); 6596 dolock = rb_reader_lock(cpu_buffer); 6597 ret = rb_per_cpu_empty(cpu_buffer); 6598 rb_reader_unlock(cpu_buffer, dolock); 6599 local_irq_restore(flags); 6600 6601 return ret; 6602 } 6603 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 6604 6605 int ring_buffer_poll_remote(struct trace_buffer *buffer, int cpu) 6606 { 6607 struct ring_buffer_per_cpu *cpu_buffer; 6608 6609 if (cpu != RING_BUFFER_ALL_CPUS) { 6610 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6611 return -EINVAL; 6612 6613 cpu_buffer = buffer->buffers[cpu]; 6614 6615 guard(raw_spinlock)(&cpu_buffer->reader_lock); 6616 if (rb_read_remote_meta_page(cpu_buffer)) 6617 rb_wakeups(buffer, cpu_buffer); 6618 6619 return 0; 6620 } 6621 6622 guard(cpus_read_lock)(); 6623 6624 /* 6625 * Make sure all the ring buffers are up to date before we start reading 
6626 * them. 6627 */ 6628 for_each_buffer_cpu(buffer, cpu) { 6629 cpu_buffer = buffer->buffers[cpu]; 6630 6631 guard(raw_spinlock)(&cpu_buffer->reader_lock); 6632 rb_read_remote_meta_page(cpu_buffer); 6633 } 6634 6635 for_each_buffer_cpu(buffer, cpu) { 6636 cpu_buffer = buffer->buffers[cpu]; 6637 6638 if (rb_num_of_entries(cpu_buffer)) 6639 rb_wakeups(buffer, cpu_buffer); 6640 } 6641 6642 return 0; 6643 } 6644 6645 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 6646 /** 6647 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 6648 * @buffer_a: One buffer to swap with 6649 * @buffer_b: The other buffer to swap with 6650 * @cpu: the CPU of the buffers to swap 6651 * 6652 * This function is useful for tracers that want to take a "snapshot" 6653 * of a CPU buffer and has another back up buffer lying around. 6654 * it is expected that the tracer handles the cpu buffer not being 6655 * used at the moment. 6656 */ 6657 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 6658 struct trace_buffer *buffer_b, int cpu) 6659 { 6660 struct ring_buffer_per_cpu *cpu_buffer_a; 6661 struct ring_buffer_per_cpu *cpu_buffer_b; 6662 int ret = -EINVAL; 6663 6664 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 6665 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 6666 return -EINVAL; 6667 6668 cpu_buffer_a = buffer_a->buffers[cpu]; 6669 cpu_buffer_b = buffer_b->buffers[cpu]; 6670 6671 /* It's up to the callers to not try to swap mapped buffers */ 6672 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) 6673 return -EBUSY; 6674 6675 /* At least make sure the two buffers are somewhat the same */ 6676 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 6677 return -EINVAL; 6678 6679 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 6680 return -EINVAL; 6681 6682 if (atomic_read(&buffer_a->record_disabled)) 6683 return -EAGAIN; 6684 6685 if (atomic_read(&buffer_b->record_disabled)) 6686 return -EAGAIN; 6687 6688 if (atomic_read(&cpu_buffer_a->record_disabled)) 6689 
return -EAGAIN; 6690 6691 if (atomic_read(&cpu_buffer_b->record_disabled)) 6692 return -EAGAIN; 6693 6694 /* 6695 * We can't do a synchronize_rcu here because this 6696 * function can be called in atomic context. 6697 * Normally this will be called from the same CPU as cpu. 6698 * If not it's up to the caller to protect this. 6699 */ 6700 atomic_inc(&cpu_buffer_a->record_disabled); 6701 atomic_inc(&cpu_buffer_b->record_disabled); 6702 6703 ret = -EBUSY; 6704 if (local_read(&cpu_buffer_a->committing)) 6705 goto out_dec; 6706 if (local_read(&cpu_buffer_b->committing)) 6707 goto out_dec; 6708 6709 /* 6710 * When resize is in progress, we cannot swap it because 6711 * it will mess the state of the cpu buffer. 6712 */ 6713 if (atomic_read(&buffer_a->resizing)) 6714 goto out_dec; 6715 if (atomic_read(&buffer_b->resizing)) 6716 goto out_dec; 6717 6718 buffer_a->buffers[cpu] = cpu_buffer_b; 6719 buffer_b->buffers[cpu] = cpu_buffer_a; 6720 6721 cpu_buffer_b->buffer = buffer_a; 6722 cpu_buffer_a->buffer = buffer_b; 6723 6724 ret = 0; 6725 6726 out_dec: 6727 atomic_dec(&cpu_buffer_a->record_disabled); 6728 atomic_dec(&cpu_buffer_b->record_disabled); 6729 return ret; 6730 } 6731 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 6732 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 6733 6734 /** 6735 * ring_buffer_alloc_read_page - allocate a page to read from buffer 6736 * @buffer: the buffer to allocate for. 6737 * @cpu: the cpu buffer to allocate. 6738 * 6739 * This function is used in conjunction with ring_buffer_read_page. 6740 * When reading a full page from the ring buffer, these functions 6741 * can be used to speed up the process. The calling function should 6742 * allocate a few pages first with this function. Then when it 6743 * needs to get pages from the ring buffer, it passes the result 6744 * of this function into ring_buffer_read_page, which will swap 6745 * the page that was allocated, with the read page of the buffer. 
6746 * 6747 * Returns: 6748 * The page allocated, or ERR_PTR 6749 */ 6750 struct buffer_data_read_page * 6751 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 6752 { 6753 struct ring_buffer_per_cpu *cpu_buffer; 6754 struct buffer_data_read_page *bpage = NULL; 6755 unsigned long flags; 6756 6757 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6758 return ERR_PTR(-ENODEV); 6759 6760 bpage = kzalloc_obj(*bpage); 6761 if (!bpage) 6762 return ERR_PTR(-ENOMEM); 6763 6764 bpage->order = buffer->subbuf_order; 6765 cpu_buffer = buffer->buffers[cpu]; 6766 local_irq_save(flags); 6767 arch_spin_lock(&cpu_buffer->lock); 6768 6769 if (cpu_buffer->free_page) { 6770 bpage->data = cpu_buffer->free_page; 6771 cpu_buffer->free_page = NULL; 6772 } 6773 6774 arch_spin_unlock(&cpu_buffer->lock); 6775 local_irq_restore(flags); 6776 6777 if (bpage->data) { 6778 rb_init_page(bpage->data); 6779 } else { 6780 bpage->data = alloc_cpu_data(cpu, cpu_buffer->buffer->subbuf_order); 6781 if (!bpage->data) { 6782 kfree(bpage); 6783 return ERR_PTR(-ENOMEM); 6784 } 6785 } 6786 6787 return bpage; 6788 } 6789 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 6790 6791 /** 6792 * ring_buffer_free_read_page - free an allocated read page 6793 * @buffer: the buffer the page was allocate for 6794 * @cpu: the cpu buffer the page came from 6795 * @data_page: the page to free 6796 * 6797 * Free a page allocated from ring_buffer_alloc_read_page. 
6798 */ 6799 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 6800 struct buffer_data_read_page *data_page) 6801 { 6802 struct ring_buffer_per_cpu *cpu_buffer; 6803 struct buffer_data_page *bpage = data_page->data; 6804 struct page *page = virt_to_page(bpage); 6805 unsigned long flags; 6806 6807 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 6808 return; 6809 6810 cpu_buffer = buffer->buffers[cpu]; 6811 6812 /* 6813 * If the page is still in use someplace else, or order of the page 6814 * is different from the subbuffer order of the buffer - 6815 * we can't reuse it 6816 */ 6817 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 6818 goto out; 6819 6820 local_irq_save(flags); 6821 arch_spin_lock(&cpu_buffer->lock); 6822 6823 if (!cpu_buffer->free_page) { 6824 cpu_buffer->free_page = bpage; 6825 bpage = NULL; 6826 } 6827 6828 arch_spin_unlock(&cpu_buffer->lock); 6829 local_irq_restore(flags); 6830 6831 out: 6832 free_pages((unsigned long)bpage, data_page->order); 6833 kfree(data_page); 6834 } 6835 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 6836 6837 /** 6838 * ring_buffer_read_page - extract a page from the ring buffer 6839 * @buffer: buffer to extract from 6840 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 6841 * @len: amount to extract 6842 * @cpu: the cpu of the buffer to extract 6843 * @full: should the extraction only happen when the page is full. 6844 * 6845 * This function will pull out a page from the ring buffer and consume it. 6846 * @data_page must be the address of the variable that was returned 6847 * from ring_buffer_alloc_read_page. This is because the page might be used 6848 * to swap with a page in the ring buffer. 
6849 * 6850 * for example: 6851 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 6852 * if (IS_ERR(rpage)) 6853 * return PTR_ERR(rpage); 6854 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 6855 * if (ret >= 0) 6856 * process_page(ring_buffer_read_page_data(rpage), ret); 6857 * ring_buffer_free_read_page(buffer, cpu, rpage); 6858 * 6859 * When @full is set, the function will not return true unless 6860 * the writer is off the reader page. 6861 * 6862 * Note: it is up to the calling functions to handle sleeps and wakeups. 6863 * The ring buffer can be used anywhere in the kernel and can not 6864 * blindly call wake_up. The layer that uses the ring buffer must be 6865 * responsible for that. 6866 * 6867 * Returns: 6868 * >=0 if data has been transferred, returns the offset of consumed data. 6869 * <0 if no data has been transferred. 6870 */ 6871 int ring_buffer_read_page(struct trace_buffer *buffer, 6872 struct buffer_data_read_page *data_page, 6873 size_t len, int cpu, int full) 6874 { 6875 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6876 struct ring_buffer_event *event; 6877 struct buffer_data_page *bpage; 6878 struct buffer_page *reader; 6879 unsigned long missed_events; 6880 unsigned int commit; 6881 unsigned int read; 6882 u64 save_timestamp; 6883 bool force_memcpy; 6884 6885 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6886 return -1; 6887 6888 /* 6889 * If len is not big enough to hold the page header, then 6890 * we can not copy anything. 
6891 */ 6892 if (len <= BUF_PAGE_HDR_SIZE) 6893 return -1; 6894 6895 len -= BUF_PAGE_HDR_SIZE; 6896 6897 if (!data_page || !data_page->data) 6898 return -1; 6899 6900 if (data_page->order != buffer->subbuf_order) 6901 return -1; 6902 6903 bpage = data_page->data; 6904 if (!bpage) 6905 return -1; 6906 6907 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6908 6909 reader = rb_get_reader_page(cpu_buffer); 6910 if (!reader) 6911 return -1; 6912 6913 event = rb_reader_event(cpu_buffer); 6914 6915 read = reader->read; 6916 commit = rb_page_size(reader); 6917 6918 /* Check if any events were dropped */ 6919 missed_events = cpu_buffer->lost_events; 6920 6921 force_memcpy = cpu_buffer->mapped || cpu_buffer->remote; 6922 6923 /* 6924 * If this page has been partially read or 6925 * if len is not big enough to read the rest of the page or 6926 * a writer is still on the page, then 6927 * we must copy the data from the page to the buffer. 6928 * Otherwise, we can simply swap the page with the one passed in. 6929 */ 6930 if (read || (len < (commit - read)) || 6931 cpu_buffer->reader_page == cpu_buffer->commit_page || 6932 force_memcpy) { 6933 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 6934 unsigned int rpos = read; 6935 unsigned int pos = 0; 6936 unsigned int size; 6937 6938 /* 6939 * If a full page is expected, this can still be returned 6940 * if there's been a previous partial read and the 6941 * rest of the page can be read and the commit page is off 6942 * the reader page. 
6943 */ 6944 if (full && 6945 (!read || (len < (commit - read)) || 6946 cpu_buffer->reader_page == cpu_buffer->commit_page)) 6947 return -1; 6948 6949 if (len > (commit - read)) 6950 len = (commit - read); 6951 6952 /* Always keep the time extend and data together */ 6953 size = rb_event_ts_length(event); 6954 6955 if (len < size) 6956 return -1; 6957 6958 /* save the current timestamp, since the user will need it */ 6959 save_timestamp = cpu_buffer->read_stamp; 6960 6961 /* Need to copy one event at a time */ 6962 do { 6963 /* We need the size of one event, because 6964 * rb_advance_reader only advances by one event, 6965 * whereas rb_event_ts_length may include the size of 6966 * one or two events. 6967 * We have already ensured there's enough space if this 6968 * is a time extend. */ 6969 size = rb_event_length(event); 6970 memcpy(bpage->data + pos, rpage->data + rpos, size); 6971 6972 len -= size; 6973 6974 rb_advance_reader(cpu_buffer); 6975 rpos = reader->read; 6976 pos += size; 6977 6978 if (rpos >= commit) 6979 break; 6980 6981 event = rb_reader_event(cpu_buffer); 6982 /* Always keep the time extend and data together */ 6983 size = rb_event_ts_length(event); 6984 } while (len >= size); 6985 6986 /* update bpage */ 6987 local_set(&bpage->commit, pos); 6988 bpage->time_stamp = save_timestamp; 6989 6990 /* we copied everything to the beginning */ 6991 read = 0; 6992 } else { 6993 /* update the entry counter */ 6994 cpu_buffer->read += rb_page_entries(reader); 6995 cpu_buffer->read_bytes += rb_page_size(reader); 6996 6997 /* swap the pages */ 6998 rb_init_page(bpage); 6999 bpage = reader->page; 7000 reader->page = data_page->data; 7001 local_set(&reader->write, 0); 7002 local_set(&reader->entries, 0); 7003 reader->read = 0; 7004 data_page->data = bpage; 7005 7006 /* 7007 * Use the real_end for the data size, 7008 * This gives us a chance to store the lost events 7009 * on the page. 
7010 */ 7011 if (reader->real_end) 7012 local_set(&bpage->commit, reader->real_end); 7013 } 7014 7015 cpu_buffer->lost_events = 0; 7016 7017 commit = local_read(&bpage->commit); 7018 /* 7019 * Set a flag in the commit field if we lost events 7020 */ 7021 if (missed_events) { 7022 /* If there is room at the end of the page to save the 7023 * missed events, then record it there. 7024 */ 7025 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 7026 memcpy(&bpage->data[commit], &missed_events, 7027 sizeof(missed_events)); 7028 local_add(RB_MISSED_STORED, &bpage->commit); 7029 commit += sizeof(missed_events); 7030 } 7031 local_add(RB_MISSED_EVENTS, &bpage->commit); 7032 } 7033 7034 /* 7035 * This page may be off to user land. Zero it out here. 7036 */ 7037 if (commit < buffer->subbuf_size) 7038 memset(&bpage->data[commit], 0, buffer->subbuf_size - commit); 7039 7040 return read; 7041 } 7042 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 7043 7044 /** 7045 * ring_buffer_read_page_data - get pointer to the data in the page. 7046 * @page: the page to get the data from 7047 * 7048 * Returns pointer to the actual data in this page. 7049 */ 7050 void *ring_buffer_read_page_data(struct buffer_data_read_page *page) 7051 { 7052 return page->data; 7053 } 7054 EXPORT_SYMBOL_GPL(ring_buffer_read_page_data); 7055 7056 /** 7057 * ring_buffer_subbuf_size_get - get size of the sub buffer. 7058 * @buffer: the buffer to get the sub buffer size from 7059 * 7060 * Returns size of the sub buffer, in bytes. 7061 */ 7062 int ring_buffer_subbuf_size_get(struct trace_buffer *buffer) 7063 { 7064 return buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 7065 } 7066 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get); 7067 7068 /** 7069 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page. 7070 * @buffer: The ring_buffer to get the system sub page order from 7071 * 7072 * By default, one ring buffer sub page equals to one system page. 
This parameter 7073 * is configurable, per ring buffer. The size of the ring buffer sub page can be 7074 * extended, but must be an order of system page size. 7075 * 7076 * Returns the order of buffer sub page size, in system pages: 7077 * 0 means the sub buffer size is 1 system page and so forth. 7078 * In case of an error < 0 is returned. 7079 */ 7080 int ring_buffer_subbuf_order_get(struct trace_buffer *buffer) 7081 { 7082 if (!buffer) 7083 return -EINVAL; 7084 7085 return buffer->subbuf_order; 7086 } 7087 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get); 7088 7089 /** 7090 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page. 7091 * @buffer: The ring_buffer to set the new page size. 7092 * @order: Order of the system pages in one sub buffer page 7093 * 7094 * By default, one ring buffer pages equals to one system page. This API can be 7095 * used to set new size of the ring buffer page. The size must be order of 7096 * system page size, that's why the input parameter @order is the order of 7097 * system pages that are allocated for one ring buffer page: 7098 * 0 - 1 system page 7099 * 1 - 2 system pages 7100 * 3 - 4 system pages 7101 * ... 7102 * 7103 * Returns 0 on success or < 0 in case of an error. 
7104 */ 7105 int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) 7106 { 7107 struct ring_buffer_per_cpu *cpu_buffer; 7108 struct buffer_page *bpage, *tmp; 7109 int old_order, old_size; 7110 int nr_pages; 7111 int psize; 7112 int err; 7113 int cpu; 7114 7115 if (!buffer || order < 0) 7116 return -EINVAL; 7117 7118 if (buffer->subbuf_order == order) 7119 return 0; 7120 7121 psize = (1 << order) * PAGE_SIZE; 7122 if (psize <= BUF_PAGE_HDR_SIZE) 7123 return -EINVAL; 7124 7125 /* Size of a subbuf cannot be greater than the write counter */ 7126 if (psize > RB_WRITE_MASK + 1) 7127 return -EINVAL; 7128 7129 old_order = buffer->subbuf_order; 7130 old_size = buffer->subbuf_size; 7131 7132 /* prevent another thread from changing buffer sizes */ 7133 guard(mutex)(&buffer->mutex); 7134 atomic_inc(&buffer->record_disabled); 7135 7136 /* Make sure all commits have finished */ 7137 synchronize_rcu(); 7138 7139 buffer->subbuf_order = order; 7140 buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE; 7141 7142 /* Make sure all new buffers are allocated, before deleting the old ones */ 7143 for_each_buffer_cpu(buffer, cpu) { 7144 7145 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7146 continue; 7147 7148 cpu_buffer = buffer->buffers[cpu]; 7149 7150 if (cpu_buffer->mapped) { 7151 err = -EBUSY; 7152 goto error; 7153 } 7154 7155 /* Update the number of pages to match the new size */ 7156 nr_pages = old_size * buffer->buffers[cpu]->nr_pages; 7157 nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size); 7158 7159 /* we need a minimum of two pages */ 7160 if (nr_pages < 2) 7161 nr_pages = 2; 7162 7163 cpu_buffer->nr_pages_to_update = nr_pages; 7164 7165 /* Include the reader page */ 7166 nr_pages++; 7167 7168 /* Allocate the new size buffer */ 7169 INIT_LIST_HEAD(&cpu_buffer->new_pages); 7170 if (__rb_allocate_pages(cpu_buffer, nr_pages, 7171 &cpu_buffer->new_pages)) { 7172 /* not enough memory for new pages */ 7173 err = -ENOMEM; 7174 goto error; 7175 } 7176 } 7177 7178 
for_each_buffer_cpu(buffer, cpu) { 7179 struct buffer_data_page *old_free_data_page; 7180 struct list_head old_pages; 7181 unsigned long flags; 7182 7183 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7184 continue; 7185 7186 cpu_buffer = buffer->buffers[cpu]; 7187 7188 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7189 7190 /* Clear the head bit to make the link list normal to read */ 7191 rb_head_page_deactivate(cpu_buffer); 7192 7193 /* 7194 * Collect buffers from the cpu_buffer pages list and the 7195 * reader_page on old_pages, so they can be freed later when not 7196 * under a spinlock. The pages list is a linked list with no 7197 * head, adding old_pages turns it into a regular list with 7198 * old_pages being the head. 7199 */ 7200 list_add(&old_pages, cpu_buffer->pages); 7201 list_add(&cpu_buffer->reader_page->list, &old_pages); 7202 7203 /* One page was allocated for the reader page */ 7204 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next, 7205 struct buffer_page, list); 7206 list_del_init(&cpu_buffer->reader_page->list); 7207 7208 /* Install the new pages, remove the head from the list */ 7209 cpu_buffer->pages = cpu_buffer->new_pages.next; 7210 list_del_init(&cpu_buffer->new_pages); 7211 cpu_buffer->cnt++; 7212 7213 cpu_buffer->head_page 7214 = list_entry(cpu_buffer->pages, struct buffer_page, list); 7215 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 7216 7217 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update; 7218 cpu_buffer->nr_pages_to_update = 0; 7219 7220 old_free_data_page = cpu_buffer->free_page; 7221 cpu_buffer->free_page = NULL; 7222 7223 rb_head_page_activate(cpu_buffer); 7224 7225 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7226 7227 /* Free old sub buffers */ 7228 list_for_each_entry_safe(bpage, tmp, &old_pages, list) { 7229 list_del_init(&bpage->list); 7230 free_buffer_page(bpage); 7231 } 7232 free_pages((unsigned long)old_free_data_page, old_order); 7233 7234 
rb_check_pages(cpu_buffer); 7235 } 7236 7237 atomic_dec(&buffer->record_disabled); 7238 7239 return 0; 7240 7241 error: 7242 buffer->subbuf_order = old_order; 7243 buffer->subbuf_size = old_size; 7244 7245 atomic_dec(&buffer->record_disabled); 7246 7247 for_each_buffer_cpu(buffer, cpu) { 7248 cpu_buffer = buffer->buffers[cpu]; 7249 7250 if (!cpu_buffer->nr_pages_to_update) 7251 continue; 7252 7253 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) { 7254 list_del_init(&bpage->list); 7255 free_buffer_page(bpage); 7256 } 7257 } 7258 7259 return err; 7260 } 7261 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); 7262 7263 static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 7264 { 7265 struct page *page; 7266 7267 if (cpu_buffer->meta_page) 7268 return 0; 7269 7270 page = alloc_page(GFP_USER | __GFP_ZERO); 7271 if (!page) 7272 return -ENOMEM; 7273 7274 cpu_buffer->meta_page = page_to_virt(page); 7275 7276 return 0; 7277 } 7278 7279 static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 7280 { 7281 unsigned long addr = (unsigned long)cpu_buffer->meta_page; 7282 7283 free_page(addr); 7284 cpu_buffer->meta_page = NULL; 7285 } 7286 7287 static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, 7288 struct buffer_page **subbuf_ids) 7289 { 7290 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 7291 unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; 7292 struct buffer_page *first_subbuf, *subbuf; 7293 int cnt = 0; 7294 int id = 0; 7295 7296 id = rb_page_id(cpu_buffer, cpu_buffer->reader_page, id); 7297 subbuf_ids[id++] = cpu_buffer->reader_page; 7298 cnt++; 7299 7300 first_subbuf = subbuf = rb_set_head_page(cpu_buffer); 7301 do { 7302 id = rb_page_id(cpu_buffer, subbuf, id); 7303 7304 if (WARN_ON(id >= nr_subbufs)) 7305 break; 7306 7307 subbuf_ids[id] = subbuf; 7308 7309 rb_inc_page(&subbuf); 7310 id++; 7311 cnt++; 7312 } while (subbuf != first_subbuf); 7313 7314 WARN_ON(cnt != nr_subbufs); 7315 7316 /* 
install subbuf ID to bpage translation */ 7317 cpu_buffer->subbuf_ids = subbuf_ids; 7318 7319 meta->meta_struct_len = sizeof(*meta); 7320 meta->nr_subbufs = nr_subbufs; 7321 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 7322 meta->meta_page_size = meta->subbuf_size; 7323 7324 rb_update_meta_page(cpu_buffer); 7325 } 7326 7327 static struct ring_buffer_per_cpu * 7328 rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) 7329 { 7330 struct ring_buffer_per_cpu *cpu_buffer; 7331 7332 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7333 return ERR_PTR(-EINVAL); 7334 7335 cpu_buffer = buffer->buffers[cpu]; 7336 7337 mutex_lock(&cpu_buffer->mapping_lock); 7338 7339 if (!cpu_buffer->user_mapped) { 7340 mutex_unlock(&cpu_buffer->mapping_lock); 7341 return ERR_PTR(-ENODEV); 7342 } 7343 7344 return cpu_buffer; 7345 } 7346 7347 static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) 7348 { 7349 mutex_unlock(&cpu_buffer->mapping_lock); 7350 } 7351 7352 /* 7353 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need 7354 * to be set-up or torn-down. 
7355 */ 7356 static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer, 7357 bool inc) 7358 { 7359 unsigned long flags; 7360 7361 lockdep_assert_held(&cpu_buffer->mapping_lock); 7362 7363 /* mapped is always greater or equal to user_mapped */ 7364 if (WARN_ON(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7365 return -EINVAL; 7366 7367 if (inc && cpu_buffer->mapped == UINT_MAX) 7368 return -EBUSY; 7369 7370 if (WARN_ON(!inc && cpu_buffer->user_mapped == 0)) 7371 return -EINVAL; 7372 7373 mutex_lock(&cpu_buffer->buffer->mutex); 7374 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7375 7376 if (inc) { 7377 cpu_buffer->user_mapped++; 7378 cpu_buffer->mapped++; 7379 } else { 7380 cpu_buffer->user_mapped--; 7381 cpu_buffer->mapped--; 7382 } 7383 7384 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7385 mutex_unlock(&cpu_buffer->buffer->mutex); 7386 7387 return 0; 7388 } 7389 7390 /* 7391 * +--------------+ pgoff == 0 7392 * | meta page | 7393 * +--------------+ pgoff == 1 7394 * | subbuffer 0 | 7395 * | | 7396 * +--------------+ pgoff == (1 + (1 << subbuf_order)) 7397 * | subbuffer 1 | 7398 * | | 7399 * ... 7400 */ 7401 #ifdef CONFIG_MMU 7402 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7403 struct vm_area_struct *vma) 7404 { 7405 unsigned long nr_subbufs, nr_pages, nr_vma_pages, pgoff = vma->vm_pgoff; 7406 unsigned int subbuf_pages, subbuf_order; 7407 struct page **pages __free(kfree) = NULL; 7408 int p = 0, s = 0; 7409 int err; 7410 7411 /* Refuse MP_PRIVATE or writable mappings */ 7412 if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC || 7413 !(vma->vm_flags & VM_MAYSHARE)) 7414 return -EPERM; 7415 7416 subbuf_order = cpu_buffer->buffer->subbuf_order; 7417 subbuf_pages = 1 << subbuf_order; 7418 7419 if (subbuf_order && pgoff % subbuf_pages) 7420 return -EINVAL; 7421 7422 /* 7423 * Make sure the mapping cannot become writable later. Also tell the VM 7424 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND). 
7425 */ 7426 vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP, 7427 VM_MAYWRITE); 7428 7429 lockdep_assert_held(&cpu_buffer->mapping_lock); 7430 7431 nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */ 7432 nr_pages = ((nr_subbufs + 1) << subbuf_order); /* + meta-page */ 7433 if (nr_pages <= pgoff) 7434 return -EINVAL; 7435 7436 nr_pages -= pgoff; 7437 7438 nr_vma_pages = vma_pages(vma); 7439 if (!nr_vma_pages || nr_vma_pages > nr_pages) 7440 return -EINVAL; 7441 7442 nr_pages = nr_vma_pages; 7443 7444 pages = kzalloc_objs(*pages, nr_pages); 7445 if (!pages) 7446 return -ENOMEM; 7447 7448 if (!pgoff) { 7449 unsigned long meta_page_padding; 7450 7451 pages[p++] = virt_to_page(cpu_buffer->meta_page); 7452 7453 /* 7454 * Pad with the zero-page to align the meta-page with the 7455 * sub-buffers. 7456 */ 7457 meta_page_padding = subbuf_pages - 1; 7458 while (meta_page_padding-- && p < nr_pages) { 7459 unsigned long __maybe_unused zero_addr = 7460 vma->vm_start + (PAGE_SIZE * p); 7461 7462 pages[p++] = ZERO_PAGE(zero_addr); 7463 } 7464 } else { 7465 /* Skip the meta-page */ 7466 pgoff -= subbuf_pages; 7467 7468 s += pgoff / subbuf_pages; 7469 } 7470 7471 while (p < nr_pages) { 7472 struct buffer_page *subbuf; 7473 struct page *page; 7474 int off = 0; 7475 7476 if (WARN_ON_ONCE(s >= nr_subbufs)) 7477 return -EINVAL; 7478 7479 subbuf = cpu_buffer->subbuf_ids[s]; 7480 page = virt_to_page((void *)subbuf->page); 7481 7482 for (; off < (1 << (subbuf_order)); off++, page++) { 7483 if (p >= nr_pages) 7484 break; 7485 7486 pages[p++] = page; 7487 } 7488 s++; 7489 } 7490 7491 err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages); 7492 7493 return err; 7494 } 7495 #else 7496 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7497 struct vm_area_struct *vma) 7498 { 7499 return -EOPNOTSUPP; 7500 } 7501 #endif 7502 7503 int ring_buffer_map(struct trace_buffer *buffer, int cpu, 7504 struct vm_area_struct *vma) 7505 { 7506 struct 
ring_buffer_per_cpu *cpu_buffer; 7507 struct buffer_page **subbuf_ids; 7508 unsigned long flags; 7509 int err; 7510 7511 if (!cpumask_test_cpu(cpu, buffer->cpumask) || buffer->remote) 7512 return -EINVAL; 7513 7514 cpu_buffer = buffer->buffers[cpu]; 7515 7516 guard(mutex)(&cpu_buffer->mapping_lock); 7517 7518 if (cpu_buffer->user_mapped) { 7519 err = __rb_map_vma(cpu_buffer, vma); 7520 if (!err) 7521 err = __rb_inc_dec_mapped(cpu_buffer, true); 7522 return err; 7523 } 7524 7525 /* prevent another thread from changing buffer/sub-buffer sizes */ 7526 guard(mutex)(&buffer->mutex); 7527 7528 err = rb_alloc_meta_page(cpu_buffer); 7529 if (err) 7530 return err; 7531 7532 /* subbuf_ids includes the reader while nr_pages does not */ 7533 subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL); 7534 if (!subbuf_ids) { 7535 rb_free_meta_page(cpu_buffer); 7536 return -ENOMEM; 7537 } 7538 7539 atomic_inc(&cpu_buffer->resize_disabled); 7540 7541 /* 7542 * Lock all readers to block any subbuf swap until the subbuf IDs are 7543 * assigned. 7544 */ 7545 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7546 rb_setup_ids_meta_page(cpu_buffer, subbuf_ids); 7547 7548 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7549 7550 err = __rb_map_vma(cpu_buffer, vma); 7551 if (!err) { 7552 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7553 /* This is the first time it is mapped by user */ 7554 cpu_buffer->mapped++; 7555 cpu_buffer->user_mapped = 1; 7556 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7557 } else { 7558 kfree(cpu_buffer->subbuf_ids); 7559 cpu_buffer->subbuf_ids = NULL; 7560 rb_free_meta_page(cpu_buffer); 7561 atomic_dec(&cpu_buffer->resize_disabled); 7562 } 7563 7564 return err; 7565 } 7566 7567 /* 7568 * This is called when a VMA is duplicated (e.g., on fork()) to increment 7569 * the user_mapped counter without remapping pages. 
7570 */ 7571 void ring_buffer_map_dup(struct trace_buffer *buffer, int cpu) 7572 { 7573 struct ring_buffer_per_cpu *cpu_buffer; 7574 7575 if (WARN_ON(!cpumask_test_cpu(cpu, buffer->cpumask))) 7576 return; 7577 7578 cpu_buffer = buffer->buffers[cpu]; 7579 7580 guard(mutex)(&cpu_buffer->mapping_lock); 7581 7582 if (cpu_buffer->user_mapped) 7583 __rb_inc_dec_mapped(cpu_buffer, true); 7584 else 7585 WARN(1, "Unexpected buffer stat, it should be mapped"); 7586 } 7587 7588 int ring_buffer_unmap(struct trace_buffer *buffer, int cpu) 7589 { 7590 struct ring_buffer_per_cpu *cpu_buffer; 7591 unsigned long flags; 7592 7593 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7594 return -EINVAL; 7595 7596 cpu_buffer = buffer->buffers[cpu]; 7597 7598 guard(mutex)(&cpu_buffer->mapping_lock); 7599 7600 if (!cpu_buffer->user_mapped) { 7601 return -ENODEV; 7602 } else if (cpu_buffer->user_mapped > 1) { 7603 __rb_inc_dec_mapped(cpu_buffer, false); 7604 return 0; 7605 } 7606 7607 guard(mutex)(&buffer->mutex); 7608 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7609 7610 /* This is the last user space mapping */ 7611 if (!WARN_ON_ONCE(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7612 cpu_buffer->mapped--; 7613 cpu_buffer->user_mapped = 0; 7614 7615 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7616 7617 kfree(cpu_buffer->subbuf_ids); 7618 cpu_buffer->subbuf_ids = NULL; 7619 rb_free_meta_page(cpu_buffer); 7620 atomic_dec(&cpu_buffer->resize_disabled); 7621 7622 return 0; 7623 } 7624 7625 int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu) 7626 { 7627 struct ring_buffer_per_cpu *cpu_buffer; 7628 struct buffer_page *reader; 7629 unsigned long missed_events; 7630 unsigned long reader_size; 7631 unsigned long flags; 7632 7633 cpu_buffer = rb_get_mapped_buffer(buffer, cpu); 7634 if (IS_ERR(cpu_buffer)) 7635 return (int)PTR_ERR(cpu_buffer); 7636 7637 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7638 7639 consume: 7640 if 
(rb_per_cpu_empty(cpu_buffer)) 7641 goto out; 7642 7643 reader_size = rb_page_size(cpu_buffer->reader_page); 7644 7645 /* 7646 * There are data to be read on the current reader page, we can 7647 * return to the caller. But before that, we assume the latter will read 7648 * everything. Let's update the kernel reader accordingly. 7649 */ 7650 if (cpu_buffer->reader_page->read < reader_size) { 7651 while (cpu_buffer->reader_page->read < reader_size) 7652 rb_advance_reader(cpu_buffer); 7653 goto out; 7654 } 7655 7656 /* Did the reader catch up with the writer? */ 7657 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 7658 goto out; 7659 7660 reader = rb_get_reader_page(cpu_buffer); 7661 if (WARN_ON(!reader)) 7662 goto out; 7663 7664 /* Check if any events were dropped */ 7665 missed_events = cpu_buffer->lost_events; 7666 7667 if (missed_events) { 7668 if (cpu_buffer->reader_page != cpu_buffer->commit_page) { 7669 struct buffer_data_page *bpage = reader->page; 7670 unsigned int commit; 7671 /* 7672 * Use the real_end for the data size, 7673 * This gives us a chance to store the lost events 7674 * on the page. 7675 */ 7676 if (reader->real_end) 7677 local_set(&bpage->commit, reader->real_end); 7678 /* 7679 * If there is room at the end of the page to save the 7680 * missed events, then record it there. 7681 */ 7682 commit = rb_page_size(reader); 7683 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 7684 memcpy(&bpage->data[commit], &missed_events, 7685 sizeof(missed_events)); 7686 local_add(RB_MISSED_STORED, &bpage->commit); 7687 } 7688 local_add(RB_MISSED_EVENTS, &bpage->commit); 7689 } else if (!WARN_ONCE(cpu_buffer->reader_page == cpu_buffer->tail_page, 7690 "Reader on commit with %ld missed events", 7691 missed_events)) { 7692 /* 7693 * There shouldn't be any missed events if the tail_page 7694 * is on the reader page. 
But if the tail page is not on the 7695 * reader page and the commit_page is, that would mean that 7696 * there's a commit_overrun (an interrupt preempted an 7697 * addition of an event and then filled the buffer 7698 * with new events). In this case it's not an 7699 * error, but it should still be reported. 7700 * 7701 * TODO: Add missed events to the page for user space to know. 7702 */ 7703 pr_info("Ring buffer [%d] commit overrun lost %ld events at timestamp:%lld\n", 7704 cpu, missed_events, cpu_buffer->reader_page->page->time_stamp); 7705 } 7706 } 7707 7708 cpu_buffer->lost_events = 0; 7709 7710 goto consume; 7711 7712 out: 7713 /* Some archs do not have data cache coherency between kernel and user-space */ 7714 flush_kernel_vmap_range(cpu_buffer->reader_page->page, 7715 buffer->subbuf_size + BUF_PAGE_HDR_SIZE); 7716 7717 rb_update_meta_page(cpu_buffer); 7718 7719 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7720 rb_put_mapped_buffer(cpu_buffer); 7721 7722 return 0; 7723 } 7724 7725 static void rb_cpu_sync(void *data) 7726 { 7727 /* Not really needed, but documents what is happening */ 7728 smp_rmb(); 7729 } 7730 7731 /* 7732 * We only allocate new buffers, never free them if the CPU goes down. 7733 * If we were to free the buffer, then the user would lose any trace that was in 7734 * the buffer. 
7735 */ 7736 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 7737 { 7738 struct trace_buffer *buffer; 7739 long nr_pages_same; 7740 int cpu_i; 7741 unsigned long nr_pages; 7742 7743 buffer = container_of(node, struct trace_buffer, node); 7744 if (cpumask_test_cpu(cpu, buffer->cpumask)) 7745 return 0; 7746 7747 nr_pages = 0; 7748 nr_pages_same = 1; 7749 /* check if all cpu sizes are same */ 7750 for_each_buffer_cpu(buffer, cpu_i) { 7751 /* fill in the size from first enabled cpu */ 7752 if (nr_pages == 0) 7753 nr_pages = buffer->buffers[cpu_i]->nr_pages; 7754 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 7755 nr_pages_same = 0; 7756 break; 7757 } 7758 } 7759 /* allocate minimum pages, user can later expand it */ 7760 if (!nr_pages_same) 7761 nr_pages = 2; 7762 buffer->buffers[cpu] = 7763 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 7764 if (!buffer->buffers[cpu]) { 7765 WARN(1, "failed to allocate ring buffer on CPU %u\n", 7766 cpu); 7767 return -ENOMEM; 7768 } 7769 7770 /* 7771 * Ensure trace_buffer readers observe the newly allocated 7772 * ring_buffer_per_cpu before they check the cpumask. Instead of using a 7773 * read barrier for all readers, send an IPI. 7774 */ 7775 if (unlikely(system_state == SYSTEM_RUNNING)) { 7776 on_each_cpu(rb_cpu_sync, NULL, 1); 7777 /* Not really needed, but documents what is happening */ 7778 smp_wmb(); 7779 } 7780 7781 cpumask_set_cpu(cpu, buffer->cpumask); 7782 return 0; 7783 } 7784 7785 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 7786 /* 7787 * This is a basic integrity check of the ring buffer. 7788 * Late in the boot cycle this test will run when configured in. 7789 * It will kick off a thread per CPU that will go into a loop 7790 * writing to the per cpu ring buffer various sizes of data. 7791 * Some of the data will be large items, some small. 7792 * 7793 * Another thread is created that goes into a spin, sending out 7794 * IPIs to the other CPUs to also write into the ring buffer. 
7795 * this is to test the nesting ability of the buffer. 7796 * 7797 * Basic stats are recorded and reported. If something in the 7798 * ring buffer should happen that's not expected, a big warning 7799 * is displayed and all ring buffers are disabled. 7800 */ 7801 static struct task_struct *rb_threads[NR_CPUS] __initdata; 7802 7803 struct rb_test_data { 7804 struct trace_buffer *buffer; 7805 unsigned long events; 7806 unsigned long bytes_written; 7807 unsigned long bytes_alloc; 7808 unsigned long bytes_dropped; 7809 unsigned long events_nested; 7810 unsigned long bytes_written_nested; 7811 unsigned long bytes_alloc_nested; 7812 unsigned long bytes_dropped_nested; 7813 int min_size_nested; 7814 int max_size_nested; 7815 int max_size; 7816 int min_size; 7817 int cpu; 7818 int cnt; 7819 }; 7820 7821 static struct rb_test_data rb_data[NR_CPUS] __initdata; 7822 7823 /* 1 meg per cpu */ 7824 #define RB_TEST_BUFFER_SIZE 1048576 7825 7826 static char rb_string[] __initdata = 7827 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 7828 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 7829 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 7830 7831 static bool rb_test_started __initdata; 7832 7833 struct rb_item { 7834 int size; 7835 char str[]; 7836 }; 7837 7838 static __init int rb_write_something(struct rb_test_data *data, bool nested) 7839 { 7840 struct ring_buffer_event *event; 7841 struct rb_item *item; 7842 bool started; 7843 int event_len; 7844 int size; 7845 int len; 7846 int cnt; 7847 7848 /* Have nested writes different that what is written */ 7849 cnt = data->cnt + (nested ? 
27 : 0); 7850 7851 /* Multiply cnt by ~e, to make some unique increment */ 7852 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 7853 7854 len = size + sizeof(struct rb_item); 7855 7856 started = rb_test_started; 7857 /* read rb_test_started before checking buffer enabled */ 7858 smp_rmb(); 7859 7860 event = ring_buffer_lock_reserve(data->buffer, len); 7861 if (!event) { 7862 /* Ignore dropped events before test starts. */ 7863 if (started) { 7864 if (nested) 7865 data->bytes_dropped_nested += len; 7866 else 7867 data->bytes_dropped += len; 7868 } 7869 return len; 7870 } 7871 7872 event_len = ring_buffer_event_length(event); 7873 7874 if (RB_WARN_ON(data->buffer, event_len < len)) 7875 goto out; 7876 7877 item = ring_buffer_event_data(event); 7878 item->size = size; 7879 memcpy(item->str, rb_string, size); 7880 7881 if (nested) { 7882 data->bytes_alloc_nested += event_len; 7883 data->bytes_written_nested += len; 7884 data->events_nested++; 7885 if (!data->min_size_nested || len < data->min_size_nested) 7886 data->min_size_nested = len; 7887 if (len > data->max_size_nested) 7888 data->max_size_nested = len; 7889 } else { 7890 data->bytes_alloc += event_len; 7891 data->bytes_written += len; 7892 data->events++; 7893 if (!data->min_size || len < data->min_size) 7894 data->max_size = len; 7895 if (len > data->max_size) 7896 data->max_size = len; 7897 } 7898 7899 out: 7900 ring_buffer_unlock_commit(data->buffer); 7901 7902 return 0; 7903 } 7904 7905 static __init int rb_test(void *arg) 7906 { 7907 struct rb_test_data *data = arg; 7908 7909 while (!kthread_should_stop()) { 7910 rb_write_something(data, false); 7911 data->cnt++; 7912 7913 set_current_state(TASK_INTERRUPTIBLE); 7914 /* Now sleep between a min of 100-300us and a max of 1ms */ 7915 usleep_range(((data->cnt % 3) + 1) * 100, 1000); 7916 } 7917 7918 return 0; 7919 } 7920 7921 static __init void rb_ipi(void *ignore) 7922 { 7923 struct rb_test_data *data; 7924 int cpu = smp_processor_id(); 7925 7926 data = 
&rb_data[cpu]; 7927 rb_write_something(data, true); 7928 } 7929 7930 static __init int rb_hammer_test(void *arg) 7931 { 7932 while (!kthread_should_stop()) { 7933 7934 /* Send an IPI to all cpus to write data! */ 7935 smp_call_function(rb_ipi, NULL, 1); 7936 /* No sleep, but for non preempt, let others run */ 7937 schedule(); 7938 } 7939 7940 return 0; 7941 } 7942 7943 static __init int test_ringbuffer(void) 7944 { 7945 struct task_struct *rb_hammer; 7946 struct trace_buffer *buffer; 7947 int cpu; 7948 int ret = 0; 7949 7950 if (security_locked_down(LOCKDOWN_TRACEFS)) { 7951 pr_warn("Lockdown is enabled, skipping ring buffer tests\n"); 7952 return 0; 7953 } 7954 7955 pr_info("Running ring buffer tests...\n"); 7956 7957 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE); 7958 if (WARN_ON(!buffer)) 7959 return 0; 7960 7961 /* Disable buffer so that threads can't write to it yet */ 7962 ring_buffer_record_off(buffer); 7963 7964 for_each_online_cpu(cpu) { 7965 rb_data[cpu].buffer = buffer; 7966 rb_data[cpu].cpu = cpu; 7967 rb_data[cpu].cnt = cpu; 7968 rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu], 7969 cpu, "rbtester/%u"); 7970 if (WARN_ON(IS_ERR(rb_threads[cpu]))) { 7971 pr_cont("FAILED\n"); 7972 ret = PTR_ERR(rb_threads[cpu]); 7973 goto out_free; 7974 } 7975 } 7976 7977 /* Now create the rb hammer! */ 7978 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer"); 7979 if (WARN_ON(IS_ERR(rb_hammer))) { 7980 pr_cont("FAILED\n"); 7981 ret = PTR_ERR(rb_hammer); 7982 goto out_free; 7983 } 7984 7985 ring_buffer_record_on(buffer); 7986 /* 7987 * Show buffer is enabled before setting rb_test_started. 7988 * Yes there's a small race window where events could be 7989 * dropped and the thread won't catch it. But when a ring 7990 * buffer gets enabled, there will always be some kind of 7991 * delay before other CPUs see it. Thus, we don't care about 7992 * those dropped events. 
We care about events dropped after 7993 * the threads see that the buffer is active. 7994 */ 7995 smp_wmb(); 7996 rb_test_started = true; 7997 7998 set_current_state(TASK_INTERRUPTIBLE); 7999 /* Just run for 10 seconds */ 8000 schedule_timeout(10 * HZ); 8001 8002 kthread_stop(rb_hammer); 8003 8004 out_free: 8005 for_each_online_cpu(cpu) { 8006 if (!rb_threads[cpu]) 8007 break; 8008 kthread_stop(rb_threads[cpu]); 8009 } 8010 if (ret) { 8011 ring_buffer_free(buffer); 8012 return ret; 8013 } 8014 8015 /* Report! */ 8016 pr_info("finished\n"); 8017 for_each_online_cpu(cpu) { 8018 struct ring_buffer_event *event; 8019 struct rb_test_data *data = &rb_data[cpu]; 8020 struct rb_item *item; 8021 unsigned long total_events; 8022 unsigned long total_dropped; 8023 unsigned long total_written; 8024 unsigned long total_alloc; 8025 unsigned long total_read = 0; 8026 unsigned long total_size = 0; 8027 unsigned long total_len = 0; 8028 unsigned long total_lost = 0; 8029 unsigned long lost; 8030 int big_event_size; 8031 int small_event_size; 8032 8033 ret = -1; 8034 8035 total_events = data->events + data->events_nested; 8036 total_written = data->bytes_written + data->bytes_written_nested; 8037 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 8038 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 8039 8040 big_event_size = data->max_size + data->max_size_nested; 8041 small_event_size = data->min_size + data->min_size_nested; 8042 8043 pr_info("CPU %d:\n", cpu); 8044 pr_info(" events: %ld\n", total_events); 8045 pr_info(" dropped bytes: %ld\n", total_dropped); 8046 pr_info(" alloced bytes: %ld\n", total_alloc); 8047 pr_info(" written bytes: %ld\n", total_written); 8048 pr_info(" biggest event: %d\n", big_event_size); 8049 pr_info(" smallest event: %d\n", small_event_size); 8050 8051 if (RB_WARN_ON(buffer, total_dropped)) 8052 break; 8053 8054 ret = 0; 8055 8056 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 8057 total_lost += lost; 8058 
item = ring_buffer_event_data(event); 8059 total_len += ring_buffer_event_length(event); 8060 total_size += item->size + sizeof(struct rb_item); 8061 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 8062 pr_info("FAILED!\n"); 8063 pr_info("buffer had: %.*s\n", item->size, item->str); 8064 pr_info("expected: %.*s\n", item->size, rb_string); 8065 RB_WARN_ON(buffer, 1); 8066 ret = -1; 8067 break; 8068 } 8069 total_read++; 8070 } 8071 if (ret) 8072 break; 8073 8074 ret = -1; 8075 8076 pr_info(" read events: %ld\n", total_read); 8077 pr_info(" lost events: %ld\n", total_lost); 8078 pr_info(" total events: %ld\n", total_lost + total_read); 8079 pr_info(" recorded len bytes: %ld\n", total_len); 8080 pr_info(" recorded size bytes: %ld\n", total_size); 8081 if (total_lost) { 8082 pr_info(" With dropped events, record len and size may not match\n" 8083 " alloced and written from above\n"); 8084 } else { 8085 if (RB_WARN_ON(buffer, total_len != total_alloc || 8086 total_size != total_written)) 8087 break; 8088 } 8089 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 8090 break; 8091 8092 ret = 0; 8093 } 8094 if (!ret) 8095 pr_info("Ring buffer PASSED!\n"); 8096 8097 ring_buffer_free(buffer); 8098 return 0; 8099 } 8100 8101 late_initcall(test_ringbuffer); 8102 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 8103